Writing scripts for the OET

The Observation Execution Tool (OET) can run observing scripts in a headless, non-interactive manner. For efficiency, OET script execution is split into two phases: an initialisation phase and an execution phase. Scripts that are expected to be run by the OET should be structured to have two entry points corresponding to these two phases, as in the template below:

Observing script template
def init(subarray: int, *args, **kwargs):
    # Called by the OET when the script is loaded and initialised by someone
    # calling 'oet prepare'. Add your script initialisation code here. Note that
    # the target subarray is supplied to this function as the first argument.
    pass

def main(*args, **kwargs):
    # Called by the OET when the prepared script is told to run by someone
    # calling 'oet start'. Add the main body of your script to this function.
    pass

The initialisation phase occurs when the script is loaded and the script’s init function is called (if defined) to perform any preparation and/or initialisation. Expensive and slow operations that do not need to wait for the main body of the script should be run in this phase. Typical actions performed in init are I/O-intensive operations, e.g., cloning a git repository, creating multiple Tango device proxies, or subscribing to Tango events. When run by the OET, the init function is passed an integer subarray ID declaring which subarray the script is intended to control.

Subsequently, at some point a user may call oet start, requesting that the initialised script begin the main body of its execution. When this occurs, the OET calls the script’s main function, which should perform the main work of the script. For an observing script, this would involve the configuration and control of a subarray.
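To make the calling sequence concrete, here is a simplified, hypothetical driver that loads a script, calls init if it is defined, and later calls main. It is not the OET's implementation; the prepare and start functions are named after the 'oet prepare' and 'oet start' commands purely for illustration.

```python
import types

# A tiny two-phase script, held as a string so the example is self-contained.
SCRIPT_SOURCE = '''
calls = []

def init(subarray, *args, **kwargs):
    calls.append(("init", subarray))

def main(*args, **kwargs):
    calls.append(("main", args, kwargs))
'''


def prepare(source: str, subarray_id: int) -> types.ModuleType:
    """Load the script and run its init function, if it defines one."""
    module = types.ModuleType("user_script")
    exec(source, module.__dict__)
    init_fn = getattr(module, "init", None)
    if callable(init_fn):
        init_fn(subarray_id)
    return module


def start(module: types.ModuleType, *args, **kwargs):
    """Run the main body of an already-prepared script."""
    # main is looked up at call time, so a main rebound by init is honoured.
    return module.main(*args, **kwargs)


script = prepare(SCRIPT_SOURCE, subarray_id=1)
start(script, "scan", duration=10)
print(script.calls)
```

Looking up main only when start is called matters for scripts whose init function replaces main, as the example script later in this page does.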

Below is a real example script from the scripts folder of this project.

SKA : Allocate Resources and Perform Observation

Allocating resources and performing scans requires communication with TMC CentralNode and TMC SubarrayNode, and targets a specific subarray. This script’s init function pre-applies the subarray ID argument to the main function. Note that this script does not perform any Tango calls directly, but uses ska_oso_scripting.api functions to perform all the required Tango interactions (command invocation; event subscriptions; event monitoring).
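The pre-application of the subarray ID can be sketched in isolation with functools.partial. The function bodies and argument values below are illustrative assumptions; only the binding pattern mirrors the script.

```python
import functools


def _main(subarray_id: int, sb_json: str, **runtime_args):
    # The real work would happen here; this sketch just reports its inputs.
    return f"subarray {subarray_id} observing {sb_json}"


def init(subarray_id: int, **init_args):
    # Rebind the module-level name 'main' so that later callers do not
    # need to supply the subarray ID again.
    global main
    main = functools.partial(_main, subarray_id=subarray_id)


init(3)
message = main(sb_json="sb.json")
print(message)
```

Because subarray_id is bound by keyword, the remaining arguments in this sketch must also be passed by keyword: calling main("sb.json") positionally would raise a TypeError, since the positional value would collide with the already-bound subarray_id.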

Resource allocation and perform observation script for an SKA MID/LOW subarray
"""
Standard observing script for SBDefinition execution.

Creates an AssignResources command from the SBDefinition and sends to TMC CentralNode, then
creates ConfigureRequests and Scan commands to send to TMC SubarrayNode.

At the end of the observation, all resources are released.

This script and the transform functions to telescope commands should support
all functionality currently available in the ODT (e.g. PST, 5 point scans, etc)
"""
import functools
import logging
import os
from typing import Any

from ska_oso_pdm import SBDefinition, TelescopeType

from ska_oso_scripting import api
from ska_oso_scripting.api import execution_block
from ska_oso_scripting.api.objects import SubArray
from ska_oso_scripting.core.execution import ValueTransitionError
from ska_oso_scripting.pdm_transforms import (
    create_cdm_assign_resources_request_from_scheduling_block,
    create_cdm_configure_requests_from_scheduling_block,
)

from ska_oso_scripting.topics import user_topics
from ska_oso_scripting.engineering.low import workarounds as LOW_WORKAROUNDS
from ska_oso_scripting.engineering.scripting_workarounds import TEMP_WORKAROUNDS

LOG = logging.getLogger(__name__)
FORMAT = "%(asctime)-15s %(message)s"

logging.basicConfig(level=logging.INFO, format=FORMAT)


def init(subarray_id: int | str, **init_args):
    """
    Initialise the script, binding the script runtime args to the script.
    """
    try:
        subarray_id = int(subarray_id)
    except ValueError as err:
        raise TypeError("subarray_id must be an integer") from err

    LOG.debug(f"Initializing script {__name__} with subarray_id={subarray_id}")

    for arg,value in init_args.items():
        LOG.debug(f"Initializing script {__name__} with argument(s) {arg}={value}")

    global main

    main = functools.partial(_main, subarray_id=subarray_id)
    LOG.info(f"Script bound to sub-array {subarray_id}")


def assign_resources(subarray: SubArray, sbi: SBDefinition, apply_low_workarounds: bool =False):
    """
    assign resources to a target sub-array using a Scheduling Block (SB).
    :param subarray: subarray ID
    :param sbi: ska_oso_pdm.SBDefinition
    :return:
    """
    LOG.info(
        f"Running assign_resources(subarray={subarray.id}, sbi.sbd_id={sbi.sbd_id})"
    )

    cdm_allocation = create_cdm_assign_resources_request_from_scheduling_block(
            subarray.id, sbi
        )

    if apply_low_workarounds:
        cdm_allocation = LOW_WORKAROUNDS.update_assign_resources_request(cdm_allocation)
        LOW_WORKAROUNDS.pre_obs_checks(
            cdm_allocation.model_dump(mode="json", exclude_none=True, by_alias=True)
        )

    response = api.assign_resources_from_cdm(subarray.id, cdm_allocation)
    LOG.info(f"Resources Allocated: {response}")

    LOG.info("Allocation complete")


def observe(*, subarray: SubArray, sbi: SBDefinition, runtime_args: dict[str,Any], apply_low_workarounds:bool =False):
    """
    Observe using a Scheduling Block (SB) and template CDM file.

    :param subarray:  SubArray instance containing subarray ID
    :param sbi: Instance of a SBDefinition
    :param runtime_args: Any script runtime arguments to be considered.
    :return:
    """

    LOG.info(
        f"Starting observing for Scheduling Block: {sbi.sbd_id}, subarray_id={subarray.id})"
    )

    if runtime_args:
        for arg,value in runtime_args.items():
            LOG.info(f"runtime argument(s) {arg}={value}")

    scan_sequence = sbi.dish_allocations.scan_sequence if sbi.telescope == TelescopeType.SKA_MID else sbi.mccs_allocation.subarray_beams[0].scan_sequence

    if not scan_sequence:
        LOG.info(f"No scans defined in Scheduling Block {sbi.sbd_id}. No observation performed.")
        return

    cdm_configure_requests = (
        create_cdm_configure_requests_from_scheduling_block(sbi,runtime_args)
    )

    if apply_low_workarounds:
        LOW_WORKAROUNDS.update_configure_requests(cdm_configure_requests)

    for scan_definition_idx in range(len(scan_sequence)):
        cdm_configs = cdm_configure_requests[scan_definition_idx]
        for index, cdm_config in enumerate(cdm_configs):
            scan_id_string = (f"{scan_definition_idx} "
                              f"({str(index + 1)}/{str(len(cdm_configs))})" if len(cdm_configs) > 1 else '')
            try:
                # With the CDM modified, we can now issue the Configure instruction...
                LOG.info(f"Configuring subarray {subarray.id} for scan: {scan_id_string}")
                api.send_message(
                    user_topics.script.announce,
                    msg=f"Configuring subarray {subarray.id} for scan: {scan_id_string}"
                )
                api.configure_from_cdm(subarray.id, cdm_config)
            except ValueTransitionError as err:
                LOG.error(f"Error configuring subarray: {err}")
                api.send_message(
                    user_topics.script.announce,
                    msg=f"Error configuring subarray for scan {scan_id_string}"
                )
                raise err
            else:
                LOG.info(f"Configuration for scan {scan_id_string} complete")
                api.send_message(
                    user_topics.script.announce,
                    msg=f"Configuration for scan {scan_id_string} complete"
                )
            try:
                # with configuration complete, we can begin the scan.
                LOG.info(f"Starting scan: {scan_id_string}")
                api.send_message(
                    user_topics.script.announce,
                    msg=f"Starting scan: {scan_id_string}"
                )
                api.scan(subarray.id)
            except ValueTransitionError as err:
                LOG.error(f"Error when executing scan: {scan_id_string}: {err}")
                api.send_message(
                    user_topics.script.announce,
                    msg=f"Error when executing scan: {scan_id_string}"
                )
                raise err
            else:
                LOG.info(f"Scan {scan_id_string} complete")
                api.send_message(
                    user_topics.script.announce,
                    msg=f"Scan {scan_id_string} complete"
                )

    # All scans are complete. Observations are concluded with an 'end'
    # command.
    LOG.info(f"End scheduling block: {sbi.sbd_id}")
    api.end(subarray.id)

    LOG.info("Observation script complete")


def _resolve_context_bool(
    context: dict | None,
    *,
    key: str,
    fallback: bool,
) -> bool:
    """
    Resolve a boolean context value, falling back when not provided.

    :param context: Optional script context.
    :param key: Context key to resolve.
    :param fallback: Value to use when key is absent.
    :raises TypeError: If the provided context value is not boolean.
    """
    if not context or key not in context:
        return fallback

    value = context[key]
    if isinstance(value, bool):
        return value

    raise TypeError(
        f"context['{key}'] must be a boolean, got {value!r} "
        f"(type={type(value).__name__})"
    )


def _main(subarray_id: int, sb_json: str, sbi_id: str, context: dict | None = None, **runtime_args):
    LOG.info(f"Running OS process {os.getpid()}")
    LOG.info(f"Called with main(subarray_id={subarray_id}, sbi_id={sbi_id})")
    LOG.debug(f"main() sb_json={sb_json}")
    sbd: SBDefinition = api.load_sbd(sb_json)

    qa_enabled = _resolve_context_bool(
        context,
        key='wait_for_qa_ready',
        fallback=TEMP_WORKAROUNDS.enable_adr111,
    )
    mccs_early_scan = _resolve_context_bool(
        context,
        key='mccs_early_scan',
        fallback=TEMP_WORKAROUNDS.mccs_early_scan,
    )

    api.init_qa_wait(qa_enabled, mccs_early_scan=mccs_early_scan)

    apply_low_workarounds = apply_low_workarounds_in_args(runtime_args)

    if apply_low_workarounds:
        sbd = LOW_WORKAROUNDS.update_scheduling_block_definition(sbd)

    eb_id = execution_block.create_eb(telescope=sbd.telescope, sbi_ref=sbi_id)
    try:
        LOG.info(f"Created Execution Block {eb_id}")
        subarray = SubArray(subarray_id)
        assign_resources(subarray, sbd, apply_low_workarounds)
        api.configure_subarray_quality_monitor(subarray.id, sbd)
        if apply_low_workarounds:
            LOW_WORKAROUNDS.check_mccs_subarray_leaf_node_obsstate(subarray_id)
        observe(subarray=subarray, sbi=sbd, apply_low_workarounds=apply_low_workarounds, runtime_args=runtime_args)
        api.release_all_resources(subarray_id)
        execution_block.mark_execution_block_observed(eb_id)
    except Exception as err:
        LOG.error(f"Scheduling block execution failed: {err}", exc_info=True)
        if apply_low_workarounds:
            LOW_WORKAROUNDS.back_to_empty(subarray_id)
        execution_block.mark_execution_block_failed(eb_id=eb_id, error_message=str(err))
        raise


def apply_low_workarounds_in_args(runtime_args: dict) -> bool:
    if (workarounds := runtime_args.get("workarounds")) is None:
        return False

    if "low" in str(workarounds).lower():
        return True

    return False
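The two small helpers at the end of the script decide how runtime arguments and context values are interpreted. The standalone sketch below copies their logic (in condensed form, under the assumed name resolve_context_bool for the second helper) so their behaviour can be tried without the ska_oso_scripting package installed; the input values are made up for illustration.

```python
def apply_low_workarounds_in_args(runtime_args: dict) -> bool:
    """Same test as the script: case-insensitive substring match on 'low'."""
    workarounds = runtime_args.get("workarounds")
    if workarounds is None:
        return False
    return "low" in str(workarounds).lower()


def resolve_context_bool(context, *, key: str, fallback: bool) -> bool:
    """Condensed copy of _resolve_context_bool from the script."""
    if not context or key not in context:
        return fallback
    value = context[key]
    if isinstance(value, bool):
        return value
    raise TypeError(f"context['{key}'] must be a boolean, got {value!r}")


# Runtime arguments: any value whose string form contains "low" enables
# the LOW workarounds; a missing key or an unrelated value does not.
flags = [
    apply_low_workarounds_in_args({}),
    apply_low_workarounds_in_args({"workarounds": "mid"}),
    apply_low_workarounds_in_args({"workarounds": "LOW"}),
    apply_low_workarounds_in_args({"workarounds": ["mid", "Low"]}),
]
print(flags)

# Context values: a missing context falls back, an explicit bool overrides
# the fallback, and a non-boolean such as the string "true" is rejected.
from_fallback = resolve_context_bool(None, key="wait_for_qa_ready", fallback=True)
explicit = resolve_context_bool(
    {"wait_for_qa_ready": False}, key="wait_for_qa_ready", fallback=True
)
try:
    resolve_context_bool(
        {"wait_for_qa_ready": "true"}, key="wait_for_qa_ready", fallback=True
    )
    rejected = False
except TypeError:
    rejected = True
print(from_fallback, explicit, rejected)
```

Note that the substring test is loose: a value such as "allow" also contains "low" and would enable the workarounds, so callers may prefer to pass an unambiguous value.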