"""
The pdm_transforms module contains code to transform Project Data Model (PDM)
entities to Configuration Data Model (CDM) entities. The pdm_transforms code
is called by observing scripts to convert the PDM Scheduling Block to the
equivalent CDM configurations, which are then sent to TMC devices to control
the telescope.
"""
import logging
from typing import Union
from ska_oso_pdm import Target
from ska_oso_pdm._shared.target import Beam, EquatorialCoordinatesReferenceFrame
from ska_oso_pdm.sb_definition import ScanDefinition
from ska_tmc_cdm.messages.skydirection import (
AltAzField,
ICRSField,
SkyDirection,
SolarSystemObject,
SpecialField,
)
from ska_tmc_cdm.messages.subarray_node.configure.tmc import (
TMCConfiguration as cdm_TMCConfiguration,
)
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Log line layout: timestamp (left-aligned, padded to 15 chars) then message.
FORMAT = "%(asctime)-15s %(message)s"
# NOTE: this runs at import time and configures the root logger at INFO
# level; logging.basicConfig does nothing if the root logger already has
# handlers configured.
logging.basicConfig(level=logging.INFO, format=FORMAT)
# Not every function in this module should be called externally
# NOTE(review): convert_target has a public name but is not listed here, so
# "from <module> import *" will not export it - confirm this is intentional.
__all__ = [
    "convert_tmcconfiguration",
]
[docs]
def convert_tmcconfiguration(
    scan_definition: ScanDefinition,
) -> cdm_TMCConfiguration:
    """
    Build the CDM TMC configuration that corresponds to a PDM ScanDefinition.

    @param scan_definition: the PDM scan definition to convert
    @return: a TMCConfiguration whose scan_duration is taken from the scan
        definition's scan_duration_ms
    @raises TypeError: if scan_definition is not a PDM ScanDefinition
    """
    # Guard clause: refuse anything that is not a ScanDefinition up front.
    if not isinstance(scan_definition, ScanDefinition):
        raise TypeError(f"Expected PDM ScanDefinition, got {type(scan_definition)}")

    duration = scan_definition.scan_duration_ms
    return cdm_TMCConfiguration(scan_duration=duration)
def convert_target(target: Union[Target, Beam]) -> SkyDirection:
    """
    Translate a PDM target, or a PSS/PST/VLBI beam, into the equivalent CDM
    SkyDirection.

    Beam instances are handed to _convert_beam; every other input is handed
    to _convert_target.

    @param target: the sky coordinate to convert
    @raises NotImplementedError: if reference frame is not handled yet
    """
    # Pick the converter from the input type, then apply it.
    converter = _convert_beam if isinstance(target, Beam) else _convert_target
    return converter(target)
def _convert_beam(beam: Beam) -> SkyDirection:
    """
    Map a PDM Beam onto the CDM SkyDirection for its reference frame.

    Only the ICRS reference frame is handled. Proper motion and parallax are
    forwarded only when they are set to something other than 0.0 or None.

    @param beam: the PSS/PST/VLBI beam to convert
    @raises NotImplementedError: if reference frame is not handled yet
    """
    frame = beam.beam_coordinate.reference_frame
    if frame == EquatorialCoordinatesReferenceFrame.ICRS:
        position = beam.beam_coordinate
        candidates = {
            "pm_c1": position.pm_ra,
            "pm_c2": position.pm_dec,
            "parallax": position.parallax,
        }
        # Drop unset (None) and zero values so those keywords are not passed
        # to ICRSField.Attrs at all.
        extras = {}
        for keyword, value in candidates.items():
            if value not in (0.0, None):
                extras[keyword] = value

        sky_position = position._coord
        name = position.target_id
        attrs = ICRSField.Attrs(
            c1=sky_position.ra.degree,
            c2=sky_position.dec.degree,
            **extras,
        )
        return ICRSField(target_name=name, attrs=attrs)

    # Any other reference frame is not supported.
    raise NotImplementedError(
        f"Reference frame {beam.beam_coordinate.reference_frame} not handled."
    )
def _convert_target(target: Target) -> SkyDirection:
"""
Convert a PDM target into the appropriate CDM SkyDirection.
TODO:
* Handle Galactic and TLE reference frames as/when they are added to
the PDM.
* Handle proper motion
* Handle parallax
@param target: the sky coordinate to convert
@raises NotImplementedError: if reference frame is not handled yet
"""
match target.reference_coordinate.reference_frame:
case "icrs":
kwargs = {}
if target.radial_velocity.quantity.value != 0.0:
in_si_units = target.radial_velocity.quantity.to("m/s").value
kwargs["radial_velocity"] = in_si_units
# delegate to Astropy for conversion of ra,dec to floating
# point degrees.
sky_coord = target.reference_coordinate._coord
return ICRSField(
target_name=target.target_id,
attrs=ICRSField.Attrs(
c1=sky_coord.ra.degree, c2=sky_coord.dec.degree, **kwargs
),
)
case "altaz":
return AltAzField(
target_name=target.target_id,
attrs=AltAzField.Attrs(
# az/el should already be degree floats
c1=target.reference_coordinate.az,
c2=target.reference_coordinate.el,
),
)
case "special":
return SpecialField(
target_name=SolarSystemObject(target.reference_coordinate.name)
)
case _:
raise NotImplementedError(
f"Reference frame {target.reference_frame} not handled."
)