Source code for realtime.receive.core.scan

import enum
import logging
import warnings
from dataclasses import dataclass, field
from typing import Sequence

import numpy as np
from astropy import units as u
from astropy.coordinates import Angle, SkyCoord
from dataclass_type_validator import dataclass_validate

from realtime.receive.core.channel_range import ChannelRange

logger = logging.getLogger(__name__)


[docs] @dataclass_validate(strict=False) @dataclass class SpectralWindow: """A spectral window specifying channel numbers and a frequency range""" spectral_window_id: str """ID of this spectral window""" count: int """Number of channels""" start: int """First channel""" # From https://developer.skao.int/projects/ska-telmodel/en/latest/schemas/ska-sdp-assignres.html#scan-channels-0-4 freq_min: float """The lower bound of the frequency of the first channel in Hz""" freq_max: float """The upper bound of the frequency of the last channel in Hz""" stride: int = 1 """Stride in channel numbers"""
[docs] def try_as_channel_range(self, start_id: int | None = None, count: int | None = None): """ Construct a ChannelRange from this SpectralWindow, adding the specified constraints. If this isn't possible, return None. :param start_id: The first channel of the created range. Defaults to the channel at the start of the SpectralWindow :param count: The number of channels in the new range. Defaults to the number of channels in the spectral window after start_id """ sw_range = self.as_channel_range() sw_range = sw_range.try_create_subrange(start_id=start_id, max_channels=count) if not sw_range: return None if count is not None and sw_range.count != count: return None return sw_range
[docs] def as_channel_range(self): """Construct a ChannelRange from this SpectralWindow""" return ChannelRange(self.start, self.count, self.stride)
@property def frequencies(self): """The center points (in Hz) of each channel""" frequencies = self._unstrided_channels[:: self.stride] return frequencies @property def channel_width(self): """The gap (in Hz) between the center points of each channel""" return self.channel_bandwidth * self.stride @property def channel_bandwidth(self): """The amount of bandwidth (in Hz) that each channel will have""" return (self.freq_max - self.freq_min) / self._unstrided_channel_count @property def _unstrided_channels(self): """ Filling in gaps as if every channel ID was present (i.e. if stride=1 and channel_bandwidth stayed the same), the center points (in Hz) of every channel in this spectral window. """ freq_offset = self.channel_bandwidth / 2 frequencies = np.linspace( self.freq_min + freq_offset, self.freq_max - freq_offset, self._unstrided_channel_count, ) return frequencies @property def _unstrided_channel_count(self): """ Filling in gaps as if every channel ID was present (i.e. if stride=1 and channel_bandwidth stayed the same), the number of channels in this spectral window. """ return self.count * self.stride - (self.stride - 1)
[docs] def frequencies_for_range(self, channel_range: ChannelRange): """ Return the frequencies of the channels that the specified range refers to. ``channel_range`` must be a valid subrange of this spectral window. """ assert self.as_channel_range().contains_range(channel_range) return self._unstrided_channels[channel_range.as_slice(indexed_from=self.start)]
[docs] @dataclass_validate(strict=False) @dataclass class Channels: """A named collection of channels, expressed as spectral windows""" channels_id: str """ID of this collection of channels""" spectral_windows: Sequence[SpectralWindow] """Spectral windows making up this collection of channels""" @property def num_channels(self): """ Number of channels in all child spectral windows, given they are all the same. Raises an error if they are not. """ count = set(map(lambda sw: sw.count, self.spectral_windows)) if len(count) > 1: raise ValueError("Varying spectral window num_channels not supported") return next(iter(count))
[docs] @dataclass_validate(strict=False) @dataclass(eq=False) class PhaseDirection: """A phase direction""" ra: Angle """Right Ascension polinomial""" dec: Angle """Declination polinomial""" reference_time: str """Reference time for RA/Dec polinomials""" reference_frame: str = "icrs" """Reference frame in which coordinates are given""" def __eq__(self, other): return ( (Angle(self.ra.rad, u.rad) == Angle(other.ra.rad, u.rad)).all() and (Angle(self.dec.rad, u.rad) == Angle(other.dec.rad, u.rad)).all() and self.reference_time == other.reference_time and self.reference_frame == other.reference_frame )
[docs] def as_SkyCoord(self): """Convert to astropy SkyCoord""" return SkyCoord(ra=self.ra, dec=self.dec, frame=self.reference_frame)
[docs] @dataclass_validate(strict=False) @dataclass class Field: """A named field with one or many coordinates""" field_id: str """ID of this field""" phase_dir: PhaseDirection """The phase direction where this field can be found""" pointing_fqdn: str | None = None """The FQDN of the Tango attribute where live pointing information can be retrieved from"""
class FrequencyType(enum.IntEnum): """ Types of freqeuncy frames aligned with casacore::MFrequency::Types enumeration for consistency and ease of use when writing them into a measurement set. //# Enumerations // Types of known MFrequencies // <note role=warning> The order defines the order in the translation // matrix FromTo // in the getConvert routine. Do not change the order without // changing the array. Additions should be made before N_types, and // an additional row and column should be coded in FromTo, and // in showType().</note> enum Types { REST, LSRK, LSRD, BARY, GEO, TOPO, GALACTO, LGROUP, CMB, N_Types, Undefined = 64, N_Other, // all extra bits EXTRA = 64, // Defaults DEFAULT=LSRK, // Synonyms LSR=LSRK }; """ REST = 0 LSRK = 1 LSRD = 2 BARY = 3 GEO = 4 TOPO = 5 GALACTO = 6 LGROUP = 7 CMB = 8 Undefined = (64,)
[docs] class StokesType(enum.IntEnum): """ A type of stoke for correlations. The values correspond to the casacore::Stokes::StokesTypes enumeration for consistency and ease of use when writing them into a Measurement Set. """ RR = 5 RL = 6 LR = 7 LL = 8 XX = 9 XY = 10 YX = 11 YY = 12 @property def product(self): """Returns the correlator product""" normalized = self.value - 1 return np.array([np.fmod(normalized // 2, 2), np.fmod(normalized, 2)])
[docs] @dataclass_validate(strict=False) @dataclass class Polarisations: """A named collection of correlation types""" polarisation_id: str """ID of this collection of correlation types""" correlation_type: Sequence[StokesType] """The correlation types""" @property def num_pols(self): """Number of polarisations""" return len(self.correlation_type)
[docs] @dataclass_validate(strict=False) @dataclass class Beam: """A beam configuration, containing channels, polarisations and a field""" beam_id: str """ID of this beam""" function: str """The functional purpose of this beam""" channels: Channels """The channels this beam is configured with""" polarisations: Polarisations """The polarisations this beam is configured with""" field: Field """The field this beam is pointing to""" search_beam_id: int | None = None """If this beam's function is "pulsar search", the beam ID of this beam as sent by CBF.""" timing_beam_id: int | None = None """If this beam's function is "pulsar timing", the beam ID of this beam as sent by CBF.""" vlbi_beam_id: int | None = None """If this beam's function is "vlbi", the beam ID of this beam as sent by CBF.""" visibility_beam_id: int | None = None """If this beam's function is "visibilities", the beam ID of this beam as sent by CBF."""
[docs] @dataclass_validate(strict=False) @dataclass class ScanType: """A scan type, consisting on one or more beam configurations""" scan_type_id: str """ID of this scan type""" beams: Sequence[Beam] """The beams making up this scan type""" scan_intents: list = field(default_factory=list) """Scan intent""" integration_time: float | None = None """Correlator integration time in seconds""" averaging_channels: int = 1 "Number of channels to average over when using Frequency Averaging Processor" averaging_samples: int = 1 "Number of time samples to average over when using Time Averaging Processor" def __post_init__(self): if self.integration_time is None: logger.warning("Integration time not provided, defaulting to 1.0") self.integration_time = 1.0 @property def num_channels(self): """ N_f, the number of frequency channels across all beams. Valid only if all beams declare the same number of channels, raises an error otherwise. """ warnings.warn( "obtain number of channels from specific spectral window", DeprecationWarning, stacklevel=2, ) count = set(map(lambda b: b.channels.num_channels, self.beams)) # all beam spectral windows need the same number of channels if len(count) > 1: raise ValueError("Varying beam num_channels not supported") return next(iter(count)) @property def num_pols(self): """ N_p, The number of poloarizations across all beams. Valid only if all beams declare the same number of channels, raises an error otherwise. """ warnings.warn( "obtain number of polarisations from specific beam", DeprecationWarning, stacklevel=2, ) count = set(map(lambda b: b.polarisations.num_pols, self.beams)) # all beam polarizations need the same number of pols if len(count) > 1: raise ValueError("Varying beam num_pols not supported") return next(iter(count))
[docs] def beam_for_visibility_beam_id(self, visibility_beam_id: int) -> Beam | None: """Find the Beam with the given visibility beam ID, if any""" vis_beams = (beam for beam in self.beams if beam.visibility_beam_id == visibility_beam_id) return next(vis_beams, None)
[docs] def beam_with_visibility_function(self) -> Beam | None: """Find the beam with function "visibilities", if any""" beams = {beam.beam_id: beam for beam in self.beams if beam.function == "visibilities"} if len(beams) > 1: raise ValueError( "More than one beam with function 'visibilities', at most one expected" ) return next(iter(beams.values()), None)
[docs] @dataclass_validate(strict=False) @dataclass class Scan: """A scan, identified by a number and associated to a scan type""" # TODO(rtobar): rename to scan_id in next major release to clarify semantics scan_number: int """The scan number""" scan_type: ScanType """The scan type associated to this scan"""