Source code for ska_oso_scripting.functions.pdm_transforms.csp

"""
The pdm_transforms module contains code to transform Project Data Model (PDM)
entities to Configuration Data Model (CDM) entities. The pdm_transforms code
is called by observing scripts to convert the PDM Scheduling Block to the
equivalent CDM configurations, which are then sent to TMC devices to control
the telescope.
"""
import copy
import logging
from datetime import timedelta
from typing import List, Optional, Union

import ska_oso_pdm.entities.csp as pdm
import ska_tmc_cdm.messages.subarray_node.configure.csp as cdm
from ska_oso_pdm.entities.dish.dish_configuration import ReceiverBand

from ska_oso_scripting.functions.pdm_transforms.dish import convert_frequency_band

# Module-level logger, named after this module's import path.
LOG = logging.getLogger(__name__)
# Log record layout: timestamp (left-aligned, min width 15) followed by the message.
FORMAT = "%(asctime)-15s %(message)s"

# NOTE(review): this runs at import time, so importing this module configures
# logging for the whole process (INFO level, FORMAT above) if no handlers have
# been set up yet — confirm this is intended for a library module.
logging.basicConfig(level=logging.INFO, format=FORMAT)

# Public API of this module. Only the top-level CSP conversion is exported via
# ``from ... import *``; the remaining converters are internal helpers.
__all__ = [
    "convert_cspconfiguration",
]

# Interface (schema) URLs written into the generated CDM CSPConfiguration.
# convert_cspconfiguration uses the 2.0 URL when the PDM configuration has a
# subarray_config, and the LOW URL otherwise.
CSP_SCHEMA_2_0_URL = "https://schema.skao.int/ska-csp-configure/2.0"
CSP_SCHEMA_LOW_0_0_URL = "https://schema.skao.int/ska-low-csp-configure/0.0"


def convert_csp_id(pdm_val: Optional[str]) -> Optional[str]:
    """
    Translate a PDM CSP identifier into the CDM CSP identifier.

    Both identifiers are plain strings today, so the conversion amounts to a
    ``str()`` coercion; a missing identifier is passed through unchanged.

    :param pdm_val: PDM CSP ID to convert, or None
    :return: the ID as a string, or None when no ID was given
    """
    return None if pdm_val is None else str(pdm_val)


def convert_cspconfiguration(
    pdm_config: pdm.CSPConfiguration,
    receiver_band: Union[ReceiverBand, cdm.core.ReceiverBand, None] = None,
) -> cdm.CSPConfiguration:
    """
    Convert a PDM CSPConfiguration to the equivalent CDM CSPConfiguration.

    A configuration that carries a ``subarray_config`` is treated as a MID
    configuration: it requires a receiver band and is tagged with the CSP 2.0
    interface URL. A configuration without one is tagged with the LOW
    interface URL.

    :param pdm_config: The PDM configuration to convert
    :param receiver_band: receiver band to set for this configuration, given
        as either a PDM or a CDM ReceiverBand. Mandatory when pdm_config has
        a subarray_config.
    :return: the equivalent CDM configuration
    :raises TypeError: if pdm_config is not a PDM CSPConfiguration
    :raises ValueError: if pdm_config has a subarray_config but no
        receiver_band was supplied
    """
    if not isinstance(pdm_config, pdm.CSPConfiguration):
        raise TypeError(f"Expected PDM CSPConfiguration, got {type(pdm_config)}")

    # Presence of a subarray configuration is what distinguishes the two
    # supported interface schemas.
    has_subarray_config = pdm_config.subarray_config is not None

    if has_subarray_config and receiver_band is None:
        raise ValueError(
            "A valid ReceiverBand must be specified for a MID Scheduling Block."
        )

    interface_url = (
        CSP_SCHEMA_2_0_URL if has_subarray_config else CSP_SCHEMA_LOW_0_0_URL
    )

    # Each sub-configuration converter passes None through unchanged, so
    # sections absent from the PDM entity remain absent in the CDM entity.
    common_configuration = convert_commonconfiguration(
        pdm_config.common_config, pdm_config.config_id, receiver_band
    )
    subarray_config = convert_subarrayconfiguration(pdm_config.subarray_config)
    cbf_configuration = convert_cbfconfiguration(pdm_config.cbf_config)
    lowcbf_configuration = convert_lowcbfconfiguration(pdm_config.lowcbf_config)
    pss_config = convert_pss_config(pdm_config.pss_config)
    pst_config = convert_pst_config(pdm_config.pst_config)

    return cdm.CSPConfiguration(
        interface=interface_url,
        subarray=subarray_config,
        common=common_configuration,
        cbf_config=cbf_configuration,
        lowcbf=lowcbf_configuration,
        pst_config=pst_config,
        pss_config=pss_config,
    )
# Pulsar Search
def convert_pss_config(
    pdm_val: Optional[pdm.PSSConfiguration],
) -> Optional[cdm.PSSConfiguration]:
    """
    Convert a PDM PSS configuration to its CDM equivalent.

    PSS configurations have not been defined yet so this function will always
    fail unless the incoming configuration is also undefined.

    :param pdm_val: PDM PSS configuration to convert
    :return: CDM PSS configuration (always None at present)
    :raises NotImplementedError: if pdm_val is anything other than None
    """
    if pdm_val is not None:
        raise NotImplementedError("Cannot convert PSS configurations")
    return None


# Pulsar Timing
def convert_pst_config(
    pdm_val: Optional[pdm.PSTConfiguration],
) -> Optional[cdm.PSTConfiguration]:
    """
    Convert a PDM PST configuration to its CDM equivalent.

    PST configurations have not been defined yet so this function will always
    fail unless the incoming configuration is also undefined.

    :param pdm_val: PDM PST configuration to convert
    :return: CDM PST configuration (always None at present)
    :raises NotImplementedError: if pdm_val is anything other than None
    """
    if pdm_val is not None:
        raise NotImplementedError("Cannot convert PST configurations")
    return None


def convert_subarrayconfiguration(
    pdm_val: Optional[pdm.SubarrayConfiguration],
) -> Optional[cdm.SubarrayConfiguration]:
    """
    Convert a PDM SubarrayConfiguration to a CDM SubarrayConfiguration.

    :param pdm_val: PDM entity to convert
    :return: equivalent CDM SubarrayConfiguration, or None if pdm_val is None
    :raises TypeError: if pdm_val is not a PDM SubarrayConfiguration
    """
    if pdm_val is None:
        return None
    if not isinstance(pdm_val, pdm.SubarrayConfiguration):
        raise TypeError(f"Expected PDM SubarrayConfiguration, got {type(pdm_val)}")
    return cdm.SubarrayConfiguration(subarray_name=pdm_val.subarray_name)


# Correlator Beamformer
def convert_cbfconfiguration(
    pdm_val: Optional[pdm.CBFConfiguration],
) -> Optional[cdm.CBFConfiguration]:
    """
    Convert a PDM CBFConfiguration to a CDM CBFConfiguration

    :param pdm_val: PDM entity to convert
    :return: equivalent CDM CBFConfiguration, or None if pdm_val is None
    :raises TypeError: if pdm_val is not a PDM CBFConfiguration
    :raises NotImplementedError: if pdm_val carries a VLBI configuration
    """
    if pdm_val is None:
        return None
    if not isinstance(pdm_val, pdm.CBFConfiguration):
        raise TypeError(f"Expected PDM CBFConfiguration, got {type(pdm_val)}")
    if pdm_val.vlbi_config is not None:
        raise NotImplementedError("Cannot convert CBF configurations for VLBI")

    fsp_configs = [convert_fspconfiguration(o) for o in pdm_val.fsp_configs]
    return cdm.CBFConfiguration(fsp_configs=fsp_configs, vlbi_config=None)


def convert_station_beam_mac(
    pdm_val: Optional[pdm.StationBeamMac],
) -> Optional[List]:
    """
    Convert a PDM StationBeamMac to a list

    :param pdm_val: PDM entity to convert
    :return: ``[sdp_channel, server_mac]``, or None if pdm_val is None
    :raises TypeError: if pdm_val is not a PDM StationBeamMac
    """
    if pdm_val is None:
        return None
    if not isinstance(pdm_val, pdm.StationBeamMac):
        raise TypeError(f"Expected PDM StationBeamMac, got {type(pdm_val)}")
    station_beam_mac = [pdm_val.sdp_channel, pdm_val.server_mac]
    return station_beam_mac


def convert_station_beam_host(
    pdm_val: Optional[pdm.StationBeamHost],
) -> Optional[List]:
    """
    Convert a PDM StationBeamHost to a list

    :param pdm_val: PDM entity to convert
    :return: ``[sdp_channel, ip_addr]``, or None if pdm_val is None
    :raises TypeError: if pdm_val is not a PDM StationBeamHost
    """
    if pdm_val is None:
        return None
    if not isinstance(pdm_val, pdm.StationBeamHost):
        raise TypeError(f"Expected PDM StationBeamHost, got {type(pdm_val)}")
    station_beam_host = [pdm_val.sdp_channel, pdm_val.ip_addr]
    return station_beam_host


def convert_station_beam_port(
    pdm_val: Optional[pdm.StationBeamPort],
) -> Optional[List]:
    """
    Convert a PDM StationBeamPort to a list

    :param pdm_val: PDM entity to convert
    :return: ``[sdp_channel, udp_port, stride]``, or None if pdm_val is None
    :raises TypeError: if pdm_val is not a PDM StationBeamPort
    """
    if pdm_val is None:
        return None
    if not isinstance(pdm_val, pdm.StationBeamPort):
        raise TypeError(f"Expected PDM StationBeamPort, got {type(pdm_val)}")
    station_beam_port = [pdm_val.sdp_channel, pdm_val.udp_port, pdm_val.stride]
    return station_beam_port


def convert_vis_station_beam_configuration(
    pdm_val: Optional[pdm.VisStationBeamConfiguration],
) -> Optional[cdm.StnBeamConfiguration]:
    """
    Convert a PDM VisStationBeamConfiguration to a CDM StnBeamConfiguration

    :param pdm_val: PDM entity to convert
    :return: equivalent CDM StnBeamConfiguration, or None if pdm_val is None
    :raises TypeError: if pdm_val is not a PDM VisStationBeamConfiguration
    """
    if pdm_val is None:
        return None
    if not isinstance(pdm_val, pdm.VisStationBeamConfiguration):
        raise TypeError(
            f"Expected PDM VisStationBeamConfiguration, got {type(pdm_val)}"
        )

    # Each port/mac/host entry is flattened into a plain list.
    port = [
        convert_station_beam_port(station_beam_port)
        for station_beam_port in pdm_val.port
    ]
    mac = [
        convert_station_beam_mac(station_beam_mac) for station_beam_mac in pdm_val.mac
    ]
    host = [
        convert_station_beam_host(station_beam_host)
        for station_beam_host in pdm_val.host
    ]

    station_beam_config = cdm.StnBeamConfiguration(
        stn_beam_id=pdm_val.stn_beam_id,
        host=host,
        port=port,
        mac=mac,
        # Dividing by a one-millisecond timedelta yields a number of
        # milliseconds; presumably the PDM value is a timedelta — TODO confirm.
        integration_ms=pdm_val.integration_ms / timedelta(milliseconds=1.0),
    )
    return station_beam_config


def convert_station_beam_configuration(
    pdm_val: Optional[pdm.StationBeamConfiguration],
) -> Optional[cdm.StnBeamConfiguration]:
    """
    Convert a PDM StationBeamConfiguration to a CDM StnBeamConfiguration

    :param pdm_val: PDM entity to convert
    :return: equivalent CDM StnBeamConfiguration, or None if pdm_val is None
    :raises TypeError: if pdm_val is not a PDM StationBeamConfiguration
    """
    if pdm_val is None:
        return None
    if not isinstance(pdm_val, pdm.StationBeamConfiguration):
        raise TypeError(f"Expected PDM StationBeamConfiguration, got {type(pdm_val)}")
    station_beam_config = cdm.StnBeamConfiguration(
        stn_beam_id=pdm_val.stn_beam_id, freq_ids=pdm_val.freq_ids
    )
    return station_beam_config


def convert_vis_fsp_configuration(
    pdm_val: Optional[pdm.VisFSPConfiguration],
) -> Optional[cdm.VisFspConfiguration]:
    """
    Convert a PDM VisFSPConfiguration to a CDM VisFspConfiguration

    :param pdm_val: PDM entity to convert
    :return: equivalent CDM VisFspConfiguration, or None if pdm_val is None
    :raises TypeError: if pdm_val is not a PDM VisFSPConfiguration
    """
    if pdm_val is None:
        return None
    if not isinstance(pdm_val, pdm.VisFSPConfiguration):
        raise TypeError(f"Expected PDM VisFSPConfiguration, got {type(pdm_val)}")
    vis_fsp_config = cdm.VisFspConfiguration(
        function_mode=pdm_val.function_mode, fsp_ids=pdm_val.fsp_ids
    )
    return vis_fsp_config


def convert_vis_configuration(
    pdm_val: Optional[pdm.VisConfiguration],
) -> Optional[cdm.VisConfiguration]:
    """
    Convert a PDM VisConfiguration to a CDM VisConfiguration

    :param pdm_val: PDM entity to convert
    :return: equivalent CDM VisConfiguration, or None if pdm_val is None
    :raises TypeError: if pdm_val is not a PDM VisConfiguration
    """
    if pdm_val is None:
        return None
    if not isinstance(pdm_val, pdm.VisConfiguration):
        raise TypeError(f"Expected PDM VisConfiguration, got {type(pdm_val)}")
    vis_config = cdm.VisConfiguration(
        fsp=convert_vis_fsp_configuration(pdm_val.fsp),
        stn_beams=[
            convert_vis_station_beam_configuration(vis_station_beam_configs)
            for vis_station_beam_configs in pdm_val.stn_beams
        ],
    )
    return vis_config


def convert_station_configuration(
    pdm_val: Optional[pdm.StationConfiguration],
) -> Optional[cdm.StationConfiguration]:
    """
    Convert a PDM StationConfiguration to a CDM StationConfiguration

    :param pdm_val: PDM entity to convert
    :return: equivalent CDM StationConfiguration, or None if pdm_val is None
    :raises TypeError: if pdm_val is not a PDM StationConfiguration
    """
    if pdm_val is None:
        return None
    if not isinstance(pdm_val, pdm.StationConfiguration):
        raise TypeError(f"Expected PDM StationConfiguration, got {type(pdm_val)}")
    station_config = cdm.StationConfiguration(
        stns=pdm_val.stns,
        stn_beams=[
            convert_station_beam_configuration(stn_beams)
            for stn_beams in pdm_val.stn_beams
        ],
    )
    return station_config


def convert_lowcbfconfiguration(
    pdm_val: Optional[pdm.LowCBFConfiguration],
) -> Optional[cdm.LowCBFConfiguration]:
    """
    Convert a PDM LowCBFConfiguration to a CDM LowCBFConfiguration

    :param pdm_val: PDM entity to convert
    :return: equivalent CDM LowCBFConfiguration, or None if pdm_val is None
    :raises TypeError: if pdm_val is not a PDM LowCBFConfiguration
    """
    if pdm_val is None:
        return None
    if not isinstance(pdm_val, pdm.LowCBFConfiguration):
        raise TypeError(f"Expected PDM LowCBFConfiguration, got {type(pdm_val)}")
    lowcbf = cdm.LowCBFConfiguration(
        vis=convert_vis_configuration(pdm_val.vis),
        stations=convert_station_configuration(pdm_val.stations),
    )
    return lowcbf


# Frequency Slice Processor
def convert_fspconfiguration(
    pdm_val: pdm.FSPConfiguration,
) -> cdm.FSPConfiguration:
    """
    Convert a PDM FSPConfiguration to the equivalent CDM FSPConfiguration.

    Unlike the other converters in this module, None is not accepted here.

    :param pdm_val: PDM FSPConfiguration to convert
    :return: equivalent CDM FSPConfiguration
    :raises TypeError: if pdm_val is not a PDM FSPConfiguration
    """
    if not isinstance(pdm_val, pdm.FSPConfiguration):
        raise TypeError(f"Expected PDM FSPConfiguration, got {type(pdm_val)}")

    return cdm.FSPConfiguration(
        fsp_id=pdm_val.fsp_id,
        # The CDM enum member is looked up by name using the PDM enum's value.
        function_mode=cdm.FSPFunctionMode[pdm_val.function_mode.value],
        frequency_slice_id=pdm_val.frequency_slice_id,
        integration_factor=pdm_val.integration_factor,
        zoom_factor=pdm_val.zoom_factor,
        # Deep copies keep the CDM object from sharing these containers with
        # the PDM entity.
        channel_averaging_map=copy.deepcopy(pdm_val.channel_averaging_map),
        output_link_map=copy.deepcopy(pdm_val.output_link_map),
        channel_offset=pdm_val.channel_offset,
        zoom_window_tuning=pdm_val.zoom_window_tuning,
    )


def convert_commonconfiguration(
    pdm_val: Optional[pdm.CommonConfiguration],
    csp_id: pdm.CSPConfigurationID,
    receiver_band: Optional[Union[ReceiverBand, cdm.core.ReceiverBand]],
) -> Optional[cdm.CommonConfiguration]:
    """
    Convert a PDM CommonConfiguration to a CDM CommonConfiguration

    receiver_band can be provided as either a PDM or CDM receiver band. When
    no receiver band is given, only the configuration ID is set on the
    result; subarray_id and band_5_tuning are not copied across.

    :param pdm_val: PDM entity to convert
    :param csp_id: CSP configuration ID
    :param receiver_band: receiver band to set for this configuration
    :return: equivalent CDM CommonConfiguration, or None if pdm_val is None
    :raises TypeError: if pdm_val is not a PDM CommonConfiguration
    """
    if pdm_val is None:
        return None
    if not isinstance(pdm_val, pdm.CommonConfiguration):
        raise TypeError(f"Expected PDM CommonConfiguration, got {type(pdm_val)}")

    if receiver_band is None:
        return cdm.CommonConfiguration(
            config_id=convert_csp_id(csp_id),
        )

    # A CDM band is used as-is; anything else goes through the PDM->CDM
    # band conversion.
    if isinstance(receiver_band, cdm.core.ReceiverBand):
        frequency_band = receiver_band
    else:
        frequency_band = convert_frequency_band(receiver_band)

    return cdm.CommonConfiguration(
        config_id=convert_csp_id(csp_id),
        frequency_band=frequency_band,
        subarray_id=pdm_val.subarray_id,
        band_5_tuning=pdm_val.band_5_tuning,
    )