import enum
import logging
import warnings
from dataclasses import dataclass, field
from typing import Sequence
import numpy as np
from astropy import units as u
from astropy.coordinates import Angle, SkyCoord
from dataclass_type_validator import dataclass_validate
from realtime.receive.core.channel_range import ChannelRange
logger = logging.getLogger(__name__)
[docs]
@dataclass_validate(strict=False)
@dataclass
class SpectralWindow:
"""A spectral window specifying channel numbers and a frequency range"""
spectral_window_id: str
"""ID of this spectral window"""
count: int
"""Number of channels"""
start: int
"""First channel"""
# From https://developer.skao.int/projects/ska-telmodel/en/latest/schemas/ska-sdp-assignres.html#scan-channels-0-4
freq_min: float
"""The lower bound of the frequency of the first channel in Hz"""
freq_max: float
"""The upper bound of the frequency of the last channel in Hz"""
stride: int = 1
"""Stride in channel numbers"""
[docs]
def try_as_channel_range(self, start_id: int | None = None, count: int | None = None):
"""
Construct a ChannelRange from this SpectralWindow,
adding the specified constraints. If this isn't possible, return None.
:param start_id: The first channel of the created range. Defaults
to the channel at the start of the SpectralWindow
:param count: The number of channels in the new range. Defaults to
the number of channels in the spectral window after start_id
"""
sw_range = self.as_channel_range()
sw_range = sw_range.try_create_subrange(start_id=start_id, max_channels=count)
if not sw_range:
return None
if count is not None and sw_range.count != count:
return None
return sw_range
[docs]
def as_channel_range(self):
"""Construct a ChannelRange from this SpectralWindow"""
return ChannelRange(self.start, self.count, self.stride)
@property
def frequencies(self):
"""The center points (in Hz) of each channel"""
frequencies = self._unstrided_channels[:: self.stride]
return frequencies
@property
def channel_width(self):
"""The gap (in Hz) between the center points of each channel"""
return self.channel_bandwidth * self.stride
@property
def channel_bandwidth(self):
"""The amount of bandwidth (in Hz) that each channel will have"""
return (self.freq_max - self.freq_min) / self._unstrided_channel_count
@property
def _unstrided_channels(self):
"""
Filling in gaps as if every channel ID was present
(i.e. if stride=1 and channel_bandwidth stayed the same),
the center points (in Hz) of every channel in this spectral window.
"""
freq_offset = self.channel_bandwidth / 2
frequencies = np.linspace(
self.freq_min + freq_offset,
self.freq_max - freq_offset,
self._unstrided_channel_count,
)
return frequencies
@property
def _unstrided_channel_count(self):
"""
Filling in gaps as if every channel ID was present
(i.e. if stride=1 and channel_bandwidth stayed the same),
the number of channels in this spectral window.
"""
return self.count * self.stride - (self.stride - 1)
[docs]
def frequencies_for_range(self, channel_range: ChannelRange):
"""
Return the frequencies of the channels that the
specified range refers to. ``channel_range`` must be a valid
subrange of this spectral window.
"""
assert self.as_channel_range().contains_range(channel_range)
return self._unstrided_channels[channel_range.as_slice(indexed_from=self.start)]
[docs]
@dataclass_validate(strict=False)
@dataclass
class Channels:
"""A named collection of channels, expressed as spectral windows"""
channels_id: str
"""ID of this collection of channels"""
spectral_windows: Sequence[SpectralWindow]
"""Spectral windows making up this collection of channels"""
@property
def num_channels(self):
"""
Number of channels in all child spectral windows, given they are all the
same. Raises an error if they are not.
"""
count = set(map(lambda sw: sw.count, self.spectral_windows))
if len(count) > 1:
raise ValueError("Varying spectral window num_channels not supported")
return next(iter(count))
[docs]
@dataclass_validate(strict=False)
@dataclass(eq=False)
class PhaseDirection:
"""A phase direction"""
ra: Angle
"""Right Ascension polinomial"""
dec: Angle
"""Declination polinomial"""
reference_time: str
"""Reference time for RA/Dec polinomials"""
reference_frame: str = "icrs"
"""Reference frame in which coordinates are given"""
def __eq__(self, other):
return (
(Angle(self.ra.rad, u.rad) == Angle(other.ra.rad, u.rad)).all()
and (Angle(self.dec.rad, u.rad) == Angle(other.dec.rad, u.rad)).all()
and self.reference_time == other.reference_time
and self.reference_frame == other.reference_frame
)
[docs]
def as_SkyCoord(self):
"""Convert to astropy SkyCoord"""
return SkyCoord(ra=self.ra, dec=self.dec, frame=self.reference_frame)
[docs]
@dataclass_validate(strict=False)
@dataclass
class Field:
"""A named field with one or many coordinates"""
field_id: str
"""ID of this field"""
phase_dir: PhaseDirection
"""The phase direction where this field can be found"""
pointing_fqdn: str | None = None
"""The FQDN of the Tango attribute where live pointing information can be retrieved from"""
class FrequencyType(enum.IntEnum):
"""
Types of freqeuncy frames aligned with casacore::MFrequency::Types
enumeration for consistency and ease of use when writing them into a
measurement set.
//# Enumerations
// Types of known MFrequencies
// <note role=warning> The order defines the order in the translation
// matrix FromTo
// in the getConvert routine. Do not change the order without
// changing the array. Additions should be made before N_types, and
// an additional row and column should be coded in FromTo, and
// in showType().</note>
enum Types {
REST,
LSRK,
LSRD,
BARY,
GEO,
TOPO,
GALACTO,
LGROUP,
CMB,
N_Types,
Undefined = 64,
N_Other,
// all extra bits
EXTRA = 64,
// Defaults
DEFAULT=LSRK,
// Synonyms
LSR=LSRK };
"""
REST = 0
LSRK = 1
LSRD = 2
BARY = 3
GEO = 4
TOPO = 5
GALACTO = 6
LGROUP = 7
CMB = 8
Undefined = (64,)
[docs]
class StokesType(enum.IntEnum):
"""
A type of stoke for correlations.
The values correspond to the casacore::Stokes::StokesTypes enumeration for
consistency and ease of use when writing them into a Measurement Set.
"""
RR = 5
RL = 6
LR = 7
LL = 8
XX = 9
XY = 10
YX = 11
YY = 12
@property
def product(self):
"""Returns the correlator product"""
normalized = self.value - 1
return np.array([np.fmod(normalized // 2, 2), np.fmod(normalized, 2)])
[docs]
@dataclass_validate(strict=False)
@dataclass
class Polarisations:
"""A named collection of correlation types"""
polarisation_id: str
"""ID of this collection of correlation types"""
correlation_type: Sequence[StokesType]
"""The correlation types"""
@property
def num_pols(self):
"""Number of polarisations"""
return len(self.correlation_type)
[docs]
@dataclass_validate(strict=False)
@dataclass
class Beam:
"""A beam configuration, containing channels, polarisations and a field"""
beam_id: str
"""ID of this beam"""
function: str
"""The functional purpose of this beam"""
channels: Channels
"""The channels this beam is configured with"""
polarisations: Polarisations
"""The polarisations this beam is configured with"""
field: Field
"""The field this beam is pointing to"""
search_beam_id: int | None = None
"""If this beam's function is "pulsar search", the beam ID of this beam as sent by CBF."""
timing_beam_id: int | None = None
"""If this beam's function is "pulsar timing", the beam ID of this beam as sent by CBF."""
vlbi_beam_id: int | None = None
"""If this beam's function is "vlbi", the beam ID of this beam as sent by CBF."""
visibility_beam_id: int | None = None
"""If this beam's function is "visibilities", the beam ID of this beam as sent by CBF."""
[docs]
@dataclass_validate(strict=False)
@dataclass
class ScanType:
"""A scan type, consisting on one or more beam configurations"""
scan_type_id: str
"""ID of this scan type"""
beams: Sequence[Beam]
"""The beams making up this scan type"""
scan_intents: list = field(default_factory=list)
"""Scan intent"""
integration_time: float | None = None
"""Correlator integration time in seconds"""
averaging_channels: int = 1
"Number of channels to average over when using Frequency Averaging Processor"
averaging_samples: int = 1
"Number of time samples to average over when using Time Averaging Processor"
def __post_init__(self):
if self.integration_time is None:
logger.warning("Integration time not provided, defaulting to 1.0")
self.integration_time = 1.0
@property
def num_channels(self):
"""
N_f, the number of frequency channels across all beams. Valid only if
all beams declare the same number of channels, raises an error otherwise.
"""
warnings.warn(
"obtain number of channels from specific spectral window",
DeprecationWarning,
stacklevel=2,
)
count = set(map(lambda b: b.channels.num_channels, self.beams))
# all beam spectral windows need the same number of channels
if len(count) > 1:
raise ValueError("Varying beam num_channels not supported")
return next(iter(count))
@property
def num_pols(self):
"""
N_p, The number of poloarizations across all beams. Valid only if all
beams declare the same number of channels, raises an error otherwise.
"""
warnings.warn(
"obtain number of polarisations from specific beam",
DeprecationWarning,
stacklevel=2,
)
count = set(map(lambda b: b.polarisations.num_pols, self.beams))
# all beam polarizations need the same number of pols
if len(count) > 1:
raise ValueError("Varying beam num_pols not supported")
return next(iter(count))
[docs]
def beam_for_visibility_beam_id(self, visibility_beam_id: int) -> Beam | None:
"""Find the Beam with the given visibility beam ID, if any"""
vis_beams = (beam for beam in self.beams if beam.visibility_beam_id == visibility_beam_id)
return next(vis_beams, None)
[docs]
def beam_with_visibility_function(self) -> Beam | None:
"""Find the beam with function "visibilities", if any"""
beams = {beam.beam_id: beam for beam in self.beams if beam.function == "visibilities"}
if len(beams) > 1:
raise ValueError(
"More than one beam with function 'visibilities', at most one expected"
)
return next(iter(beams.values()), None)
[docs]
@dataclass_validate(strict=False)
@dataclass
class Scan:
"""A scan, identified by a number and associated to a scan type"""
# TODO(rtobar): rename to scan_id in next major release to clarify semantics
scan_number: int
"""The scan number"""
scan_type: ScanType
"""The scan type associated to this scan"""