Source code for ska_oso_scripting.functions.pdm_transforms.sdp

"""
The pdm_transforms.sdp module contains code to transform SDP Project Data
Model (PDM) entities to Configuration Data Model (CDM) entities.
"""
import datetime
import logging
from typing import Dict, List, Optional

import astropy.units as u
from astropy.coordinates import SkyCoord
from ska_oso_pdm.entities.common.scan_definition import ScanDefinition
from ska_oso_pdm.entities.common.target import (
    EquatorialCoordinates,
    FivePointParameters,
    PointingPatternParameters,
    SinglePointParameters,
    Target,
)
from ska_oso_pdm.entities.sdp import (
    Beam,
    BeamMapping,
    Channels,
    ExecutionBlock,
    PbDependency,
    Polarisation,
    ProcessingBlock,
    Resources,
    ScanType,
    Script,
    SDPConfiguration,
    SpectralWindow,
)
from ska_tmc_cdm.messages.central_node.sdp import (
    BeamConfiguration as cdm_BeamConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import Channel as cdm_Channel
from ska_tmc_cdm.messages.central_node.sdp import (
    ChannelConfiguration as cdm_ChannelConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import EBScanType as cdm_EBScanType
from ska_tmc_cdm.messages.central_node.sdp import EBScanTypeBeam as cdm_ScanTypeBeam
from ska_tmc_cdm.messages.central_node.sdp import (
    ExecutionBlockConfiguration as cdm_EBConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import (
    FieldConfiguration as cdm_FieldConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import PbDependency as cdm_PbDependency
from ska_tmc_cdm.messages.central_node.sdp import PhaseDir as cdm_PhaseDir
from ska_tmc_cdm.messages.central_node.sdp import (
    PolarisationConfiguration as cdm_PolarisationConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import (
    ProcessingBlockConfiguration as cdm_ProcessingBlockConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import (
    ScriptConfiguration as cdm_ScriptConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import (
    SDPConfiguration as cdm_centralnode_SDPConfiguration,
)
from ska_tmc_cdm.messages.subarray_node.configure.sdp import (
    SDPConfiguration as cdm_subarraynode_SDPConfiguration,
)

# Module-level logger, named after this module's import path.
LOG = logging.getLogger(__name__)
# Log record layout: timestamp (left-justified, minimum width 15) followed by
# the log message.
FORMAT = "%(asctime)-15s %(message)s"

# NOTE(review): this configures the root logger as a side effect of importing
# this module. Presumably intentional because the module runs inside observing
# scripts - confirm before reusing this module as a library elsewhere.
logging.basicConfig(level=logging.INFO, format=FORMAT)


# Not every function in this module should be called externally. Only the two
# top-level SDPConfiguration converters are exported.
__all__ = [
    "convert_sdpconfiguration_centralnode",
    "convert_sdpconfiguration_subarraynode",
]


def _utcnow() -> datetime.datetime:
    """
    Return the current time as a timezone-aware UTC datetime.

    This is coded as a private function rather than included inline to make
    testing easier, as tests can substitute this function to pin "now".

    :return: the current instant with tzinfo set to datetime.timezone.utc
    """
    # datetime.utcnow() is deprecated (Python 3.12+) and returns a naive
    # datetime that then needs its tzinfo patched in. Asking for "now" in UTC
    # directly gives the same aware value in one step.
    return datetime.datetime.now(tz=datetime.timezone.utc)


def convert_sdpconfiguration_centralnode(
    pdm_config: SDPConfiguration,
    pdm_targets: List[Target],
) -> cdm_centralnode_SDPConfiguration:
    """
    Translate a PDM SDPConfiguration into its CDM (CentralNode) counterpart.

    In a SchedulingBlockDefinition, Targets are recorded exactly once as PDM
    Targets, separate from and external to any SDPConfiguration. The Targets
    that should appear in the output SDPConfiguration must therefore be
    passed to this function explicitly.

    The execution block and resources are converted only when they are
    truthy, and the processing blocks only when they are not None; otherwise
    the corresponding CDM attribute is set to None.

    :param pdm_config: the SDPConfiguration to convert
    :param pdm_targets: Targets to inject into output SDP configuration
    :return: the equivalent CDM SDPConfiguration
    :raises TypeError: if pdm_config is not an SDPConfiguration
    """
    if not isinstance(pdm_config, SDPConfiguration):
        raise TypeError(f"Expected PDM SDPConfiguration, got {type(pdm_config)}")

    cdm_execution_block = (
        convert_execution_block(pdm_config.execution_block, pdm_targets)
        if pdm_config.execution_block
        else None
    )

    # An empty list of processing blocks is converted to an empty list; only
    # an explicit None is passed through as None.
    cdm_processing_blocks = (
        None
        if pdm_config.processing_blocks is None
        else list(map(convert_processing_block, pdm_config.processing_blocks))
    )

    cdm_resources = (
        convert_resources(pdm_config.resources) if pdm_config.resources else None
    )

    return cdm_centralnode_SDPConfiguration(
        interface="https://schema.skao.int/ska-sdp-assignres/0.4",
        execution_block=cdm_execution_block,
        processing_blocks=cdm_processing_blocks,
        resources=cdm_resources,
    )


def convert_execution_block(
    pdm_config: ExecutionBlock,
    pdm_targets: List[Target],
) -> cdm_EBConfiguration:
    """
    Build the CDM SDP EBConfiguration corresponding to a PDM ExecutionBlock.

    Beams, scan types, channels and polarisations come from the
    ExecutionBlock itself; the fields are derived from the supplied Targets.

    :param pdm_config: the ExecutionBlock to convert
    :param pdm_targets: Targets to insert into the EBConfiguration
    :return: the equivalent CDM EBConfiguration
    :raises TypeError: if pdm_config is not an ExecutionBlock
    """
    if not isinstance(pdm_config, ExecutionBlock):
        raise TypeError(f"Expected PDM ExecutionBlock, got {type(pdm_config)}")

    cdm_beams = list(map(convert_beam, pdm_config.beams))
    cdm_scan_types = list(map(convert_scantypes, pdm_config.scan_types))
    cdm_channels = list(map(convert_channels, pdm_config.channels))
    cdm_polarisations = list(map(convert_polarisation, pdm_config.polarisations))
    cdm_fields = list(map(convert_target_to_fieldconfiguration, pdm_targets))

    return cdm_EBConfiguration(
        eb_id=pdm_config.eb_id,
        max_length=pdm_config.max_length,
        context=pdm_config.context,
        beams=cdm_beams,
        scan_types=cdm_scan_types,
        channels=cdm_channels,
        polarisations=cdm_polarisations,
        fields=cdm_fields,
    )


def convert_processing_block(
    pdm_config: ProcessingBlock,
) -> cdm_ProcessingBlockConfiguration:
    """
    Build the CDM ProcessingBlockConfiguration for a PDM ProcessingBlock.

    Dependencies are converted when present; a missing or empty dependency
    collection results in None being passed to the CDM object.

    :param pdm_config: the ProcessingBlock to convert
    :return: the equivalent CDM ProcessingBlockConfiguration
    """
    source_dependencies = pdm_config.dependencies
    cdm_dependencies = (
        list(map(convert_dependencies, source_dependencies))
        if source_dependencies
        else None
    )

    cdm_script = convert_script(pdm_config.script)

    LOG.info(f"Setting ProcessingBlock Id : {pdm_config.pb_id} ")
    return cdm_ProcessingBlockConfiguration(
        pb_id=pdm_config.pb_id,
        script=cdm_script,
        parameters=pdm_config.parameters,
        dependencies=cdm_dependencies,
        sbi_ids=pdm_config.sbi_ids,
    )


def convert_script(pdm_script: Script) -> cdm_ScriptConfiguration:
    """
    Build the CDM ScriptConfiguration for a PDM Script.

    The script kind is passed to the CDM as the ``.value`` of the PDM kind.

    :param pdm_script: the Script to convert
    :return: the equivalent CDM ScriptConfiguration
    """
    LOG.info(f"Setting ScriptConfiguration : {pdm_script.name} ")
    script_kind = pdm_script.kind.value
    return cdm_ScriptConfiguration(
        kind=script_kind,
        name=pdm_script.name,
        version=pdm_script.version,
    )


def convert_dependencies(pdm_config: PbDependency) -> cdm_PbDependency:
    """
    Build the CDM PbDependency for a PDM PbDependency.

    :param pdm_config: the PbDependency to convert
    :return: the equivalent CDM PbDependency
    """
    LOG.info(f"Setting PbDependency Id : {pdm_config.pb_id} ")
    dependency_id = pdm_config.pb_id
    dependency_kind = pdm_config.kind
    return cdm_PbDependency(pb_id=dependency_id, kind=dependency_kind)


def convert_polarisation(
    pdm_instance: Polarisation,
) -> cdm_PolarisationConfiguration:
    """
    Build the CDM PolarisationConfiguration for a PDM Polarisation.

    :param pdm_instance: the Polarisation to convert
    :return: the equivalent CDM PolarisationConfiguration
    """
    polarisations_id = pdm_instance.polarisations_id
    corr_type = pdm_instance.corr_type
    return cdm_PolarisationConfiguration(
        polarisations_id=polarisations_id,
        corr_type=corr_type,
    )


def convert_resources(pdm_resources: Resources) -> Dict:
    """
    Build a resources dict from a PDM Resources instance.

    Only the attributes that hold a truthy value are copied to the output,
    so an unset or empty attribute produces no key at all.

    :param pdm_resources: the Resources to convert
    :return: dict containing the populated resource entries
    """
    populated = {}
    for attribute in ("csp_links", "receptors", "receive_nodes"):
        value = getattr(pdm_resources, attribute)
        if value:
            populated[attribute] = value
    return populated


def convert_scantypes(
    pdm_instance: ScanType,
) -> cdm_EBScanType:
    """
    Build the CDM EBScanType for a PDM ScanType.

    The beam mappings of the ScanType are converted and keyed by beam ID.

    :param pdm_instance: the ScanType to convert
    :return: the equivalent CDM EBScanType
    """
    beams_by_id = {}
    for mapping in pdm_instance.beams:
        beam_id = mapping.beam_id
        beams_by_id[beam_id] = convert_beam_mapping(mapping)

    return cdm_EBScanType(
        scan_type_id=pdm_instance.scan_type_id,
        beams=beams_by_id,
        derive_from=pdm_instance.derive_from,
    )


def convert_beam_mapping(pdm_instance: BeamMapping) -> cdm_ScanTypeBeam:
    """
    Build the CDM EBScanTypeBeam for a PDM BeamMapping.

    :param pdm_instance: the BeamMapping to convert
    :return: the equivalent CDM EBScanTypeBeam
    """
    field_id = pdm_instance.field_id
    channels_id = pdm_instance.channels_id
    polarisations_id = pdm_instance.polarisations_id
    return cdm_ScanTypeBeam(
        field_id=field_id,
        channels_id=channels_id,
        polarisations_id=polarisations_id,
    )


def convert_spectral_window(pdm_instance: SpectralWindow) -> cdm_Channel:
    """
    Build the CDM Channel for a PDM SpectralWindow.

    :param pdm_instance: the SpectralWindow to convert
    :return: the equivalent CDM Channel
    """
    LOG.info(
        f"Setting channel attribute -> count:{pdm_instance.count} ,"
        f" start:{pdm_instance.start} , stride:{pdm_instance.stride},"
        f" freq_min:{pdm_instance.freq_min}, freq_max:{pdm_instance.freq_max} ,"
        f" link_map:{pdm_instance.link_map} "
    )
    cdm_channel = cdm_Channel(
        count=pdm_instance.count,
        start=pdm_instance.start,
        stride=pdm_instance.stride,
        freq_min=pdm_instance.freq_min,
        freq_max=pdm_instance.freq_max,
        link_map=pdm_instance.link_map,
        spectral_window_id=pdm_instance.spectral_window_id,
    )
    return cdm_channel


def convert_channels(pdm_instance: Channels) -> cdm_ChannelConfiguration:
    """
    Build the CDM ChannelConfiguration for a PDM Channels instance.

    Every spectral window of the Channels instance is converted to a CDM
    Channel.

    :param pdm_instance: the Channels to convert
    :return: the equivalent CDM ChannelConfiguration
    """
    LOG.info(
        f"Setting ChannelConfiguration id:{pdm_instance.channels_id} ,"
        f" spectral_windows:{pdm_instance.spectral_windows} "
    )
    cdm_windows = list(map(convert_spectral_window, pdm_instance.spectral_windows))
    return cdm_ChannelConfiguration(
        channels_id=pdm_instance.channels_id,
        spectral_windows=cdm_windows,
    )


class TargetConversion:
    """
    Namespace of static methods related to PDM Target conversion.

    This class should become a separate module as/when this SDP module is
    refactored into an SDP package containing a module for each SDP entity.
    """

    # Pointing pattern kinds that can be converted for SDP.
    SUPPORTED_PATTERNS = [SinglePointParameters.kind, FivePointParameters.kind]

    @staticmethod
    def convert_target_to_fieldconfiguration(
        target: Target,
        reference_time: Optional[datetime.datetime] = None,
        pointing_fqdn: Optional[str] = None,
    ) -> cdm_FieldConfiguration:
        """
        Build the CDM FieldConfiguration for a PDM Target.

        Only Equatorial reference coordinates combined with a supported
        pointing pattern (see SUPPORTED_PATTERNS) can be converted. Any other
        coordinate or pattern raises a NotImplementedError.

        ADR-63 states that: a "reference_time" field (given according to
        ADR-37 How to represent time as an ISO 8601 string) should be given
        that specifies the time where the direction matches the zeroth-order
        Taylor term. When no reference time is supplied, the current UTC time
        is inserted into the output FieldConfiguration.

        :param target: the PDM Target to convert
        :param reference_time: reference time for CDM Ephemeris targets.
        :param pointing_fqdn: Tango FQDN of device outputting ephemeris coords
        :return: the equivalent CDM FieldConfiguration
        :raises NotImplementedError: if converting an unsupported coordinate
            system or pointing pattern.
        """
        LOG.info(
            f"Setting target attribute -> target_id:{target.target_id} ,"
            f" reference_coordinate:{target.reference_coordinate} ",
        )

        effective_time = _utcnow() if reference_time is None else reference_time
        iso_reference_time = effective_time.isoformat()

        # Phase centre after applying the pointing pattern, still expressed in
        # the original PDM reference frame ...
        pattern_coord = TargetConversion._apply_pattern_to_coord(target)
        # ... then moved into a reference frame supported by SDP ...
        sdp_coord = TargetConversion._convert_to_supported_sdp_frame(pattern_coord)
        # ... and finally expressed as a CDM object rather than a SkyCoord.
        phase_dir = TargetConversion._as_phase_dir(sdp_coord, iso_reference_time)

        LOG.info(
            f"Resulting SDP coordinate for targetid:{target.target_id} ->"
            f" system:{phase_dir.reference_frame} , ra:{phase_dir.ra} , dec:{phase_dir.dec}"
        )

        return cdm_FieldConfiguration(
            field_id=target.target_id,
            pointing_fqdn=pointing_fqdn,
            phase_dir=phase_dir,
        )

    @staticmethod
    def _apply_pattern_to_coord(target: Target) -> SkyCoord:
        """
        Get the target phase centre as modified by the active pointing pattern.

        For a single point pattern the reference coordinate is shifted by the
        pattern's x/y offsets (in arcseconds); for a five point pattern the
        reference coordinate is returned unchanged.

        :param target: Target to process
        :return: the resulting phase centre
        :raises NotImplementedError: if coordinate type or pattern is
            unsupported
        """
        # SDP doesn't support az/el coordinates yet
        if not isinstance(target.reference_coordinate, EquatorialCoordinates):
            raise NotImplementedError("Only EquatorialCoordinates are supported")

        # Both the original phase centre and the pattern to apply are needed
        coord = target.reference_coordinate.coord
        pattern_parameters = TargetConversion._pattern_parameters_for(target)

        if isinstance(pattern_parameters, SinglePointParameters):
            x_offset = pattern_parameters.offset_x_arcsec * u.arcsec
            y_offset = pattern_parameters.offset_y_arcsec * u.arcsec
            return coord.spherical_offsets_by(x_offset, y_offset)

        if isinstance(pattern_parameters, FivePointParameters):
            return coord

        raise NotImplementedError(
            f"Unhandled pointing pattern parameters for SDP: {pattern_parameters.kind}"
        )

    @staticmethod
    def _pattern_parameters_for(target: Target) -> PointingPatternParameters:
        """
        Get the parameters of the active pointing pattern of a target.

        :param target: Target whose pointing pattern should be inspected
        :return: the parameters instance whose kind matches the active pattern
        :raises NotImplementedError: if the active pointing pattern is not a
            supported type.
        """
        active_pattern = target.pointing_pattern.active

        # Only the pattern kinds listed in SUPPORTED_PATTERNS can be converted
        # for SDP; anything else is rejected here.
        if active_pattern not in TargetConversion.SUPPORTED_PATTERNS:
            raise NotImplementedError(
                f"Unhandled pointing pattern parameters for SDP: {active_pattern}"
            )

        # PDM guarantees one pattern params per type so it should be safe to
        # index the parameter instances by their kind
        parameters_by_kind = {}
        for candidate in target.pointing_pattern.parameters:
            parameters_by_kind[candidate.kind] = candidate

        return parameters_by_kind[active_pattern]

    @staticmethod
    def _convert_to_supported_sdp_frame(coord: SkyCoord) -> SkyCoord:
        """
        Convert a SkyCoord to the closest supported SDP frame.

        The current algorithm is to convert all coordinates to ICRS, which is
        the closest equivalent to the ICRF3 frame supported by SDP.

        :param coord: the coordinate to convert
        :return: the coordinate transformed to the ICRS frame
        """
        icrs_coord = coord.transform_to("icrs")
        return icrs_coord

    @staticmethod
    def _as_phase_dir(coord: SkyCoord, reference_time: str) -> cdm_PhaseDir:
        """
        Convert an AstroPy SkyCoord to a CDM SDP PhaseDir.

        Currently ICRS is simply relabelled as ICRF3 in the output, as the
        ICRS and ICRF3 frames are virtually identical and astropy doesn't
        support ICRS -> ICRF3 conversion yet.

        :param coord: the ICRS coordinate to convert
        :param reference_time: reference time string to store in the PhaseDir
        :return: PhaseDir holding the coordinate's RA and dec in degrees
        :raises ValueError: if the coordinate to transform is not in a
            transformable reference frame.
        """
        if coord.frame.name != "icrs":
            raise ValueError("Cannot convert non-ICRS reference frames")

        # ADR-63 says that RA and dec are defined in degrees. The .degree
        # attribute is read rather than .value to ensure this is so.
        return cdm_PhaseDir(
            ra=[coord.ra.degree],
            dec=[coord.dec.degree],
            reference_time=reference_time,
            reference_frame="ICRF3",
        )


convert_target_to_fieldconfiguration = TargetConversion.convert_target_to_fieldconfiguration


def convert_sdpconfiguration_subarraynode(
    scan_definition: ScanDefinition,
) -> cdm_subarraynode_SDPConfiguration:
    """
    Build the SDP configuration aspect of a TMC SubArrayNode.Configure call
    from a PDM ScanDefinition.

    The scan type ID of the ScanDefinition becomes the scan type of the CDM
    SDPConfiguration.

    :param scan_definition: the ScanDefinition to convert
    :return: the equivalent CDM (SubArrayNode) SDPConfiguration
    :raises TypeError: if scan_definition is not a ScanDefinition
    """
    if isinstance(scan_definition, ScanDefinition):
        return cdm_subarraynode_SDPConfiguration(
            scan_type=scan_definition.scan_type_id
        )

    raise TypeError(f"Expected PDM ScanDefinition, got {type(scan_definition)}")


def convert_beam(beam: Beam) -> cdm_BeamConfiguration:
    """
    Build the CDM BeamConfiguration for a PDM Beam.

    The beam function is passed to the CDM as the ``.value`` of the PDM
    function; all other attributes are copied across unchanged.

    :param beam: the Beam to convert
    :return: the equivalent CDM BeamConfiguration
    :raises TypeError: if beam is not a Beam
    """
    if isinstance(beam, Beam):
        return cdm_BeamConfiguration(
            beam_id=beam.beam_id,
            function=beam.function.value,
            timing_beam_id=beam.timing_beam_id,
            vlbi_beam_id=beam.vlbi_beam_id,
            search_beam_id=beam.search_beam_id,
        )

    raise TypeError(f"Expected PDM Beam, got {type(beam)}")