Source code for ska_oso_scripting.functions.sb

import copy
import logging
import os
from typing import Union

from ska_oso_pdm.entities.common.sb_definition import SBDefinition
from ska_oso_pdm.schemas import CODEC as pdm_CODEC
from ska_ser_skuid.client import SkuidClient
from ska_ser_skuid.services import SKAEntityTypeService

LOGGER = logging.getLogger(__name__)


[docs]def create_sbi(sbd: SBDefinition) -> SBDefinition: """ Create a Scheduling Block Instance from a Scheduling Block Definition. Currently, an SBI is a snapshot of an SBD but with EB and PB IDs replaced. """ # TODO inject this dependency uid_client = SkuidClient(os.environ["SKUID_URL"]) # Create a new ID for the SBI sbi_map = { sbd.sbd_id: uid_client.fetch_skuid( SKAEntityTypeService.DefaultEntityTypes.SchedulingBlockInstance ) } LOGGER.info(f"New SBI ID mapping: {sbd.sbd_id} -> {sbi_map[sbd.sbd_id]}") # Create a mapping of old processing block IDs to new IDs if sbd.sdp_configuration is not None: sdp_config = sbd.sdp_configuration pb_map = { pb.pb_id: uid_client.fetch_skuid( SKAEntityTypeService.DefaultEntityTypes.ProcessingBlock ) for pb in sdp_config.processing_blocks if sdp_config is not None } for old_id, new_id in pb_map.items(): LOGGER.info(f"New PB ID mapping: {old_id} -> {new_id}") sbi = copy.deepcopy(sbd) # update the top-level ID. This is the SBI ID. sbi.sbd_id = sbi_map[sbd.sbd_id] if sbd.sdp_configuration is not None: sdp_config = sbi.sdp_configuration # Get EB ID for SDP Execution Block # # Currently, SBIs are effectively straight copies of SBDs. Eventually, # SBIs will become real entities with an entity ID, stored in the ODA. # These entities will be lightweight, *referencing* a specific SBD # version, rather than including all the SBD content in the SBI. # Once this is implemented, copying of EB ID into the SDP config would # be done in the PDM transform rather than when creating an SBI. sdp_config.execution_block.eb_id = uid_client.fetch_skuid( SKAEntityTypeService.DefaultEntityTypes.ExecutionBlock ) # Trawl through the SDP Processing Blocks replacing old IDs with new for pb in sdp_config.processing_blocks: pb.pb_id = pb_map[pb.pb_id] # Expect the mapping of PBs to SBDs and (or??) SBIs to become # more complicated. For a commensal SB built from a host SB and # guest SB (e.g., interferometric observing SB plus a pulsar # search SB), certain processing blocks might only reference their # originating SBs. That's moot at the moment as we only operate on # one SB. if sbi.sbd_id not in pb.sbi_ids: pb.sbi_ids.append(sbi.sbd_id) if pb.dependencies: for dependency in pb.dependencies: dependency.pb_id = pb_map[dependency.pb_id] return sbi
[docs]def load_sbd(path: Union[str, os.PathLike]) -> SBDefinition: """ Load an SBDefinition from a JSON file on disk. :param path: path to SBD. :return: SBDefinition object """ if not os.path.isfile(path): msg = f"SB file not found: {path}" LOGGER.error(msg) raise IOError(msg) return pdm_CODEC.load_from_file(SBDefinition, path)
[docs]def save_sbi(sbi: SBDefinition, path: str): """ Save an SBI to disk. Saves an SBI (really, an SBD but with fixed IDs) the specified path. :param sbi: SBI to serialise :param path: output file to write """ sbi_json = pdm_CODEC.dumps(sbi) with open(path, "w") as outfile: outfile.write(sbi_json) return path