Source code for ska_oso_scripting.functions.pdm_transforms.common

"""
The pdm_transforms module contains code to transform Project Data Model (PDM)
entities to Configuration Data Model (CDM) entities. The pdm_transforms code
is called by observing scripts to convert the PDM Scheduling Block to the
equivalent CDM configurations, which are then sent to TMC devices to control
the telescope.
"""
import logging
from typing import Union

from ska_oso_pdm import Target
from ska_oso_pdm._shared.target import Beam, EquatorialCoordinatesReferenceFrame
from ska_oso_pdm.sb_definition import ScanDefinition
from ska_tmc_cdm.messages.skydirection import (
    AltAzField,
    ICRSField,
    SkyDirection,
    SolarSystemObject,
    SpecialField,
)
from ska_tmc_cdm.messages.subarray_node.configure.tmc import (
    TMCConfiguration as cdm_TMCConfiguration,
)

LOG = logging.getLogger(__name__)
FORMAT = "%(asctime)-15s %(message)s"

logging.basicConfig(level=logging.INFO, format=FORMAT)

# Not every function in this module should be called externally
__all__ = [
    "convert_tmcconfiguration",
]


[docs] def convert_tmcconfiguration( scan_definition: ScanDefinition, ) -> cdm_TMCConfiguration: """ Convert a PDM ScanDefinition to the equivalent TMC configuration """ if isinstance(scan_definition, ScanDefinition): return cdm_TMCConfiguration(scan_duration=scan_definition.scan_duration_ms) raise TypeError(f"Expected PDM ScanDefinition, got {type(scan_definition)}")
def convert_target(target: Union[Target, Beam]) -> SkyDirection: """ Convert a PDM target or PSS/PST/VLBI beam into the equivalent CDM SkyDirection. @param target: the sky coordinate to convert @raises NotImplementedError: if reference frame is not handled yet """ if isinstance(target, Beam): return _convert_beam(target) else: return _convert_target(target) def _convert_beam(beam: Beam) -> SkyDirection: """ Convert a PDM Beam into the appropriate CDM SkyDirection. @param beam: the PSS/PST/VLBI beam to convert """ match beam.beam_coordinate.reference_frame: case EquatorialCoordinatesReferenceFrame.ICRS: coord = beam.beam_coordinate optionals = dict( pm_c1=coord.pm_ra, pm_c2=coord.pm_dec, parallax=coord.parallax ) optionals = {k: v for k, v in optionals.items() if v not in (0.0, None)} sky_coord = coord._coord return ICRSField( target_name=beam.beam_coordinate.target_id, attrs=ICRSField.Attrs( c1=sky_coord.ra.degree, c2=sky_coord.dec.degree, **optionals ), ) case _: raise NotImplementedError( f"Reference frame {beam.beam_coordinate.reference_frame} not handled." ) def _convert_target(target: Target) -> SkyDirection: """ Convert a PDM target into the appropriate CDM SkyDirection. TODO: * Handle Galactic and TLE reference frames as/when they are added to the PDM. * Handle proper motion * Handle parallax @param target: the sky coordinate to convert @raises NotImplementedError: if reference frame is not handled yet """ match target.reference_coordinate.reference_frame: case "icrs": kwargs = {} if target.radial_velocity.quantity.value != 0.0: in_si_units = target.radial_velocity.quantity.to("m/s").value kwargs["radial_velocity"] = in_si_units # delegate to Astropy for conversion of ra,dec to floating # point degrees. sky_coord = target.reference_coordinate._coord return ICRSField( target_name=target.target_id, attrs=ICRSField.Attrs( c1=sky_coord.ra.degree, c2=sky_coord.dec.degree, **kwargs ), ) case "altaz": return AltAzField( target_name=target.target_id, attrs=AltAzField.Attrs( # az/el should already be degree floats c1=target.reference_coordinate.az, c2=target.reference_coordinate.el, ), ) case "special": return SpecialField( target_name=SolarSystemObject(target.reference_coordinate.name) ) case _: raise NotImplementedError( f"Reference frame {target.reference_frame} not handled." )