"""
The pdm_transforms.sdp module contains code to transform SDP Project Data
Model (PDM) entities to Configuration Data Model (CDM) entities.
"""
import datetime
import logging
from typing import Dict, List, Optional
import astropy.units as u
from astropy.coordinates import SkyCoord
from ska_oso_pdm.entities.common.scan_definition import ScanDefinition
from ska_oso_pdm.entities.common.target import (
EquatorialCoordinates,
FivePointParameters,
PointingPatternParameters,
SinglePointParameters,
Target,
)
from ska_oso_pdm.entities.sdp import (
Beam,
BeamMapping,
Channels,
ExecutionBlock,
PbDependency,
Polarisation,
ProcessingBlock,
Resources,
ScanType,
Script,
SDPConfiguration,
SpectralWindow,
)
from ska_tmc_cdm.messages.central_node.sdp import (
BeamConfiguration as cdm_BeamConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import Channel as cdm_Channel
from ska_tmc_cdm.messages.central_node.sdp import (
ChannelConfiguration as cdm_ChannelConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import EBScanType as cdm_EBScanType
from ska_tmc_cdm.messages.central_node.sdp import EBScanTypeBeam as cdm_ScanTypeBeam
from ska_tmc_cdm.messages.central_node.sdp import (
ExecutionBlockConfiguration as cdm_EBConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import (
FieldConfiguration as cdm_FieldConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import PbDependency as cdm_PbDependency
from ska_tmc_cdm.messages.central_node.sdp import PhaseDir as cdm_PhaseDir
from ska_tmc_cdm.messages.central_node.sdp import (
PolarisationConfiguration as cdm_PolarisationConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import (
ProcessingBlockConfiguration as cdm_ProcessingBlockConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import (
ScriptConfiguration as cdm_ScriptConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import (
SDPConfiguration as cdm_centralnode_SDPConfiguration,
)
from ska_tmc_cdm.messages.subarray_node.configure.sdp import (
SDPConfiguration as cdm_subarraynode_SDPConfiguration,
)
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Log record layout: timestamp (padded to 15 characters) followed by the message.
FORMAT = "%(asctime)-15s %(message)s"
# NOTE(review): basicConfig is called at import time, so merely importing this
# module may configure the root logger (INFO level, FORMAT above) - confirm
# that this is intended for library use.
logging.basicConfig(level=logging.INFO, format=FORMAT)
# Not every function in this module should be called externally: only the two
# SDPConfiguration converters listed here are exported by a star-import.
__all__ = [
"convert_sdpconfiguration_centralnode",
"convert_sdpconfiguration_subarraynode",
]
def _utcnow() -> datetime.datetime:
    """
    Return the current time as a timezone-aware UTC datetime.

    This is coded as a private function rather than included inline to make
    testing easier.

    :return: the current instant with tzinfo set to datetime.timezone.utc
    """
    # datetime.utcnow() is deprecated since Python 3.12. Asking now() for UTC
    # directly yields the same aware value without the naive intermediate.
    return datetime.datetime.now(datetime.timezone.utc)
def convert_sdpconfiguration_centralnode(
    pdm_config: SDPConfiguration,
    pdm_targets: List[Target],
) -> cdm_centralnode_SDPConfiguration:
    """
    Convert a PDM SDPConfiguration to the equivalent CDM SDPConfiguration.

    In a SchedulingBlockDefinition, Targets are recorded exactly once as PDM
    Targets separate and external to any SDPConfiguration. Targets to be
    inserted into the output SDPConfiguration should be passed to this
    function.

    :param pdm_config: the SDPConfiguration to convert
    :param pdm_targets: Targets to inject into output SDP configuration
    :return: the CDM central node SDPConfiguration; sections that are not set
        on the PDM input are passed through as None
    :raises TypeError: if pdm_config is not an SDPConfiguration
    """
    if not isinstance(pdm_config, SDPConfiguration):
        raise TypeError(f"Expected PDM SDPConfiguration, got {type(pdm_config)}")

    # Targets are only consumed by the execution block (as SDP fields).
    execution_block = (
        convert_execution_block(pdm_config.execution_block, pdm_targets)
        if pdm_config.execution_block
        else None
    )

    # Explicit None test: an empty list of processing blocks is converted to
    # an empty list rather than being collapsed to None.
    processing_blocks = (
        [convert_processing_block(pb) for pb in pdm_config.processing_blocks]
        if pdm_config.processing_blocks is not None
        else None
    )

    resources = (
        convert_resources(pdm_config.resources) if pdm_config.resources else None
    )

    return cdm_centralnode_SDPConfiguration(
        interface="https://schema.skao.int/ska-sdp-assignres/0.4",
        execution_block=execution_block,
        processing_blocks=processing_blocks,
        resources=resources,
    )
def convert_execution_block(
    pdm_config: ExecutionBlock,
    pdm_targets: List[Target],
) -> cdm_EBConfiguration:
    """
    Build the CDM SDP EBConfiguration that corresponds to a PDM ExecutionBlock.

    Each collection on the execution block (beams, scan types, channels,
    polarisations) is converted element by element, and every supplied Target
    becomes one CDM field.

    :param pdm_config: the ExecutionBlock to convert
    :param pdm_targets: Targets to insert into the EBConfiguration as fields
    :return: the equivalent CDM EBConfiguration
    :raises TypeError: if pdm_config is not an ExecutionBlock
    """
    if not isinstance(pdm_config, ExecutionBlock):
        raise TypeError(f"Expected PDM ExecutionBlock, got {type(pdm_config)}")

    cdm_beams = list(map(convert_beam, pdm_config.beams))
    cdm_scan_types = list(map(convert_scantypes, pdm_config.scan_types))
    cdm_channels = list(map(convert_channels, pdm_config.channels))
    cdm_polarisations = list(map(convert_polarisation, pdm_config.polarisations))
    # Fields come from the externally supplied Targets, not from the block.
    cdm_fields = list(map(convert_target_to_fieldconfiguration, pdm_targets))

    return cdm_EBConfiguration(
        eb_id=pdm_config.eb_id,
        max_length=pdm_config.max_length,
        context=pdm_config.context,
        beams=cdm_beams,
        scan_types=cdm_scan_types,
        channels=cdm_channels,
        polarisations=cdm_polarisations,
        fields=cdm_fields,
    )
def convert_processing_block(
    pdm_config: ProcessingBlock,
) -> cdm_ProcessingBlockConfiguration:
    """
    Build the CDM ProcessingBlockConfiguration for a PDM ProcessingBlock.

    :param pdm_config: the ProcessingBlock to convert
    :return: the equivalent CDM ProcessingBlockConfiguration; its dependencies
        are None when the PDM block declares none
    """
    # Stay at None unless the PDM block actually lists dependencies.
    cdm_dependencies = None
    if pdm_config.dependencies:
        cdm_dependencies = list(map(convert_dependencies, pdm_config.dependencies))

    script = convert_script(pdm_config.script)

    LOG.info(f"Setting ProcessingBlock Id : {pdm_config.pb_id} ")
    return cdm_ProcessingBlockConfiguration(
        pb_id=pdm_config.pb_id,
        script=script,
        parameters=pdm_config.parameters,
        dependencies=cdm_dependencies,
        sbi_ids=pdm_config.sbi_ids,
    )
def convert_script(pdm_script: Script) -> cdm_ScriptConfiguration:
    """
    Build the CDM ScriptConfiguration for a PDM Script.

    :param pdm_script: the Script to convert
    :return: the equivalent CDM ScriptConfiguration, with the script kind
        passed as the value of the PDM kind
    """
    LOG.info(f"Setting ScriptConfiguration : {pdm_script.name} ")
    script_fields = {
        # The CDM receives the underlying value, not the PDM kind object.
        "kind": pdm_script.kind.value,
        "name": pdm_script.name,
        "version": pdm_script.version,
    }
    return cdm_ScriptConfiguration(**script_fields)
def convert_dependencies(pdm_config: PbDependency) -> cdm_PbDependency:
    """
    Build the CDM PbDependency for a PDM PbDependency.

    :param pdm_config: the PDM PbDependency to convert
    :return: a CDM PbDependency carrying the same pb_id and kind
    """
    LOG.info(f"Setting PbDependency Id : {pdm_config.pb_id} ")
    dependency_fields = {"pb_id": pdm_config.pb_id, "kind": pdm_config.kind}
    return cdm_PbDependency(**dependency_fields)
def convert_polarisation(
    pdm_instance: Polarisation,
) -> cdm_PolarisationConfiguration:
    """
    Build the CDM PolarisationConfiguration for a PDM Polarisation.

    :param pdm_instance: the Polarisation to convert
    :return: a CDM PolarisationConfiguration carrying the same
        polarisations_id and corr_type
    """
    polarisation_fields = {
        "polarisations_id": pdm_instance.polarisations_id,
        "corr_type": pdm_instance.corr_type,
    }
    return cdm_PolarisationConfiguration(**polarisation_fields)
def convert_resources(pdm_resources: Resources) -> Dict:
    """
    Express PDM Resources as a plain resources dictionary.

    Only sections holding a truthy value are copied, so unset or empty
    sections are left out of the result altogether.

    :param pdm_resources: the Resources to convert
    :return: dict with any of the keys "csp_links", "receptors" and
        "receive_nodes", in that order
    """
    sections = (
        ("csp_links", pdm_resources.csp_links),
        ("receptors", pdm_resources.receptors),
        ("receive_nodes", pdm_resources.receive_nodes),
    )
    return {key: value for key, value in sections if value}
def convert_scantypes(
    pdm_instance: ScanType,
) -> cdm_EBScanType:
    """
    Build the CDM EBScanType for a PDM ScanType.

    :param pdm_instance: the ScanType to convert
    :return: the equivalent CDM EBScanType, whose beams are keyed by beam ID
    """
    # The CDM wants a mapping of beam ID -> beam settings, whereas the PDM
    # holds a sequence of mappings that each carry their own beam ID.
    beams_by_id = {}
    for mapping in pdm_instance.beams:
        beam_key = mapping.beam_id
        beams_by_id[beam_key] = convert_beam_mapping(mapping)

    return cdm_EBScanType(
        scan_type_id=pdm_instance.scan_type_id,
        beams=beams_by_id,
        derive_from=pdm_instance.derive_from,
    )
def convert_beam_mapping(pdm_instance: BeamMapping) -> cdm_ScanTypeBeam:
    """
    Build the CDM EBScanTypeBeam for a PDM BeamMapping.

    :param pdm_instance: the BeamMapping to convert
    :return: a CDM EBScanTypeBeam carrying the same field, channels and
        polarisations IDs
    """
    mapping_fields = {
        "field_id": pdm_instance.field_id,
        "channels_id": pdm_instance.channels_id,
        "polarisations_id": pdm_instance.polarisations_id,
    }
    return cdm_ScanTypeBeam(**mapping_fields)
def convert_spectral_window(pdm_instance: SpectralWindow) -> cdm_Channel:
    """
    Build the CDM Channel for a PDM SpectralWindow.

    :param pdm_instance: the SpectralWindow to convert
    :return: a CDM Channel carrying the same attribute values
    """
    LOG.info(
        f"Setting channel attribute -> count:{pdm_instance.count} ,"
        f" start:{pdm_instance.start} , stride:{pdm_instance.stride},"
        f" freq_min:{pdm_instance.freq_min}, freq_max:{pdm_instance.freq_max} ,"
        f" link_map:{pdm_instance.link_map} "
    )
    # Attributes are copied one-to-one; the names match on both sides.
    channel_fields = {
        "count": pdm_instance.count,
        "start": pdm_instance.start,
        "stride": pdm_instance.stride,
        "freq_min": pdm_instance.freq_min,
        "freq_max": pdm_instance.freq_max,
        "link_map": pdm_instance.link_map,
        "spectral_window_id": pdm_instance.spectral_window_id,
    }
    return cdm_Channel(**channel_fields)
def convert_channels(pdm_instance: Channels) -> cdm_ChannelConfiguration:
    """
    Build the CDM ChannelConfiguration for a PDM Channels entity.

    :param pdm_instance: the Channels to convert
    :return: a CDM ChannelConfiguration with the same channels_id and one CDM
        Channel per PDM spectral window
    """
    LOG.info(
        f"Setting ChannelConfiguration id:{pdm_instance.channels_id} ,"
        f" spectral_windows:{pdm_instance.spectral_windows} "
    )
    cdm_windows = list(map(convert_spectral_window, pdm_instance.spectral_windows))
    return cdm_ChannelConfiguration(
        channels_id=pdm_instance.channels_id,
        spectral_windows=cdm_windows,
    )
class TargetConversion:
    """
    Namespace of static helpers for converting a PDM Target into the CDM SDP
    field description.

    This class should become a separate module as/when we refactor this SDP
    module into an SDP package containing a module for each SDP entity.
    """

    # Kinds of pointing pattern parameters that can be converted for SDP.
    SUPPORTED_PATTERNS = [SinglePointParameters.kind, FivePointParameters.kind]

    @staticmethod
    def convert_target_to_fieldconfiguration(
        target: Target,
        reference_time: Optional[datetime.datetime] = None,
        pointing_fqdn: Optional[str] = None,
    ) -> cdm_FieldConfiguration:
        """
        Convert a PDM Target to the equivalent CDM FieldConfiguration.

        Only targets with an Equatorial reference coordinate and an active
        single-point or five-point pointing pattern can be converted. Any
        other coordinate or pattern raises NotImplementedError.

        ADR-63 states that:

            a "reference_time" field (given according to ADR-37 How to
            represent time as an ISO 8601 string) should be given that
            specifies the time where the direction matches the zeroth-order
            Taylor term.

        When no reference time is supplied, the current UTC time is inserted
        into the output FieldConfiguration.

        :param target: the PDM Target to convert
        :param reference_time: reference time for CDM Ephemeris targets;
            defaults to now
        :param pointing_fqdn: Tango FQDN of device outputting ephemeris coords
        :return: FieldConfiguration whose field_id is the target ID
        :raises NotImplementedError: if converting an unsupported coordinate
            system or pointing pattern.
        """
        LOG.info(
            f"Setting target attribute -> target_id:{target.target_id} ,"
            f" reference_coordinate:{target.reference_coordinate} ",
        )

        # The CDM carries the reference time as an ISO 8601 string.
        moment = _utcnow() if reference_time is None else reference_time
        iso_reference_time = moment.isoformat()

        # Step 1: phase centre with the pointing pattern applied, still in
        # the original PDM reference frame.
        pdm_frame_coord = TargetConversion._apply_pattern_to_coord(target)
        # Step 2: the same position in a reference frame supported by SDP.
        sdp_frame_coord = TargetConversion._convert_to_supported_sdp_frame(
            pdm_frame_coord
        )
        # Step 3: the Astropy coordinate expressed as a CDM object.
        phase_dir = TargetConversion._as_phase_dir(
            sdp_frame_coord, iso_reference_time
        )

        LOG.info(
            f"Resulting SDP coordinate for targetid:{target.target_id} ->"
            f" system:{phase_dir.reference_frame} , ra:{phase_dir.ra} , dec:{phase_dir.dec}"
        )
        return cdm_FieldConfiguration(
            field_id=target.target_id,
            pointing_fqdn=pointing_fqdn,
            phase_dir=phase_dir,
        )

    @staticmethod
    def _apply_pattern_to_coord(target: Target) -> SkyCoord:
        """
        Get target phase centre transformed to match target pointing pattern.

        A single-point pattern shifts the reference coordinate by its x/y
        offsets (given in arcseconds); a five-point pattern returns the
        reference coordinate unchanged.

        :param target: Target to process
        :return: the resulting coordinate
        :raises NotImplementedError: if coordinate type or pattern is unsupported
        """
        # SDP doesn't support az/el coordinates yet
        if not isinstance(target.reference_coordinate, EquatorialCoordinates):
            raise NotImplementedError("Only EquatorialCoordinates are supported")

        # Original phase centre, plus the parameters of the pattern to apply.
        phase_centre = target.reference_coordinate.coord
        params = TargetConversion._pattern_parameters_for(target)

        if isinstance(params, SinglePointParameters):
            return phase_centre.spherical_offsets_by(
                params.offset_x_arcsec * u.arcsec,
                params.offset_y_arcsec * u.arcsec,
            )
        if isinstance(params, FivePointParameters):
            return phase_centre
        # Defensive fallback for a kind listed as supported but not handled
        # by either branch above.
        raise NotImplementedError(
            f"Unhandled pointing pattern parameters for SDP: {params.kind}"
        )

    @staticmethod
    def _pattern_parameters_for(target: Target) -> PointingPatternParameters:
        """
        Get the active pointing pattern parameters for a target.

        :param target: Target whose pointing pattern is inspected
        :return: the parameters instance whose kind matches the active pattern
        :raises NotImplementedError: if the active pointing pattern is not a
            supported type.
        """
        active_kind = target.pointing_pattern.active

        # Only the kinds in SUPPORTED_PATTERNS can be converted for SDP.
        if active_kind not in TargetConversion.SUPPORTED_PATTERNS:
            raise NotImplementedError(
                f"Unhandled pointing pattern parameters for SDP: {active_kind}"
            )

        # PDM guarantees one pattern params per type so should be safe to
        # index the parameter instances by their kind.
        params_by_kind = {
            params.kind: params for params in target.pointing_pattern.parameters
        }
        return params_by_kind[active_kind]

    @staticmethod
    def _convert_to_supported_sdp_frame(coord: SkyCoord) -> SkyCoord:
        """
        Convert a SkyCoord to the closest supported SDP frame.

        The current algorithm is to convert all coordinates to ICRS, which is
        the closest equivalent to the ICRF3 frame supported by SDP.

        :param coord: coordinate in any frame Astropy can transform to ICRS
        :return: the coordinate transformed to ICRS
        """
        icrs_coord = coord.transform_to("icrs")
        return icrs_coord

    @staticmethod
    def _as_phase_dir(coord: SkyCoord, reference_time: str) -> cdm_PhaseDir:
        """
        Convert an AstroPy SkyCoord to a CDM SDP PhaseDir.

        Currently we just replace ICRS with ICRF3 in the output coord as
        ICRS and ICRF3 frames are virtually identical and astropy doesn't
        support ICRS -> ICRF3 conversion yet.

        :param coord: coordinate to convert; must be in the ICRS frame
        :param reference_time: reference time, already formatted as a string
        :return: PhaseDir with single-element ra and dec lists
        :raises ValueError: if the coordinate to transform is not in a
            transformable reference frame.
        """
        if coord.frame.name != "icrs":
            raise ValueError("Cannot convert non-ICRS reference frames")

        # ADR-63 says that RA and dec are defined in degrees. Read .degree
        # attribute rather than .value attribute to ensure this is so.
        return cdm_PhaseDir(
            ra=[coord.ra.degree],
            dec=[coord.dec.degree],
            reference_time=reference_time,
            reference_frame="ICRF3",
        )
# Module-level alias so the Target conversion can be called as a plain
# function; convert_execution_block uses it under this name.
convert_target_to_fieldconfiguration = (
TargetConversion.convert_target_to_fieldconfiguration
)
def convert_sdpconfiguration_subarraynode(
    scan_definition: ScanDefinition,
) -> cdm_subarraynode_SDPConfiguration:
    """
    Convert a PDM Scan Definition to an SDP Configuration aspect
    of a TMC SubArrayNode.Configure call

    :param scan_definition: the ScanDefinition supplying the scan type ID
    :return: CDM SDPConfiguration whose scan_type is the definition's
        scan_type_id
    :raises TypeError: if scan_definition is not a ScanDefinition
    """
    if not isinstance(scan_definition, ScanDefinition):
        raise TypeError(f"Expected PDM ScanDefinition, got {type(scan_definition)}")

    # The scan type ID is the only part of the definition SDP needs here.
    return cdm_subarraynode_SDPConfiguration(scan_type=scan_definition.scan_type_id)
def convert_beam(beam: Beam) -> cdm_BeamConfiguration:
    """
    Build the CDM BeamConfiguration for a PDM Beam.

    :param beam: the Beam to convert
    :return: a CDM BeamConfiguration with the same IDs and the value of the
        beam function
    :raises TypeError: if beam is not a PDM Beam
    """
    if not isinstance(beam, Beam):
        raise TypeError(f"Expected PDM Beam, got {type(beam)}")

    beam_fields = {
        "beam_id": beam.beam_id,
        # The CDM receives the underlying value, not the PDM function object.
        "function": beam.function.value,
        "timing_beam_id": beam.timing_beam_id,
        "vlbi_beam_id": beam.vlbi_beam_id,
        "search_beam_id": beam.search_beam_id,
    }
    return cdm_BeamConfiguration(**beam_fields)