Writing scripts for the OET
The Observation Execution Tool (OET) can run observing scripts in a headless, non-interactive manner. For efficiency, OET script execution is split into two phases: an initialisation phase and an execution phase. Scripts that are expected to be run by the OET should have two entry points corresponding to these two phases, as in the template below:
def init(subarray: int, *args, **kwargs):
    # Called by the OET when the script is loaded and initialised by someone
    # calling 'oet prepare'. Add your script initialisation code here. Note that
    # the target subarray is supplied to this function as the first argument.
    pass

def main(*args, **kwargs):
    # Called by the OET when the prepared script is told to run by someone
    # calling 'oet start'. Add the main body of your script to this function.
    pass
The initialisation phase occurs when the script is loaded and the script’s init function
is called (if defined) to perform any preparation and/or initialisation. Expensive and
slow operations that can be performed ahead of the main body of script execution can be
run in the initialisation phase. Typical actions performed in init are I/O-intensive
operations such as cloning a git repository, creating multiple Tango device proxies, or
subscribing to Tango events. When run by the Observation Execution Tool (OET), the init
function is passed an integer subarray ID declaring which subarray the control script is
intended to control.
Subsequently, at some point a user may call oet start, requesting that the initialised
script begin the main body of its execution. When this occurs, the OET calls the script’s
main function, which should carry out the main task of the script. For an observing
script, this would involve the configuration and control of a subarray.
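The two-phase contract can be illustrated from the caller's side with a small stand-alone harness. This is only a sketch of the calling convention described above, not the OET's actual implementation: load_script, prepare and start are hypothetical helpers, and the script source is held in a string purely to keep the example self-contained.

```python
import types

# A tiny two-phase script, held as a string so the example is self-contained.
SCRIPT_SOURCE = '''
BOUND = {}

def init(subarray, *args, **kwargs):
    BOUND["subarray"] = subarray

def main(*args, **kwargs):
    return BOUND["subarray"], args
'''


def load_script(source: str, name: str = "user_script") -> types.ModuleType:
    """Execute the script source in a fresh module namespace."""
    module = types.ModuleType(name)
    exec(source, module.__dict__)
    return module


def prepare(module: types.ModuleType, subarray_id: int) -> None:
    """Initialisation phase: call init() only if the script defines it."""
    init = getattr(module, "init", None)
    if callable(init):
        init(subarray_id)


def start(module: types.ModuleType, *args, **kwargs):
    """Execution phase: look up main() at call time and run it."""
    return module.main(*args, **kwargs)


script = load_script(SCRIPT_SOURCE)
prepare(script, 1)               # analogous to 'oet prepare'
result = start(script, "sb.json")  # analogous to 'oet start'
print(result)
```

Looking up main only when the script is started, rather than when it is loaded, matters for scripts whose init function replaces main, as the example script later in this section does.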
The example below is a real script from the scripts folder of this project.
SKA: Allocate Resources and Perform Observation
Allocating resources and performing scans requires communication with TMC CentralNode and
TMC SubarrayNode, and targets a specific subarray. This script’s init function pre-applies
the subarray ID argument to the main function using functools.partial, so that main only
needs to be given the remaining arguments when the script is started. Note that this
script does not perform any Tango calls directly, but uses
ska_oso_scripting.functions.devicecontrol functions to perform all the required Tango
interactions (command invocation, event subscription, event monitoring).
"""
Example script for running an SB-driven observation. Last updated
07/08/24. ConfigureRequest is created from the SB in its entirety.
5PointScan observations are supported.
"""
import functools
import logging
import os

from ska_oso_oet.event import topics
from ska_oso_oet.tango import SCAN_ID_GENERATOR
from ska_oso_pdm import SBDefinition
from ska_oso_pdm._shared import TelescopeType

from ska_oso_scripting import WORKAROUNDS, oda_helper
from ska_oso_scripting.functions import (
    devicecontrol,
    environment,
    messages,
    pdm_transforms,
    sb,
)
from ska_oso_scripting.functions.devicecontrol import release_all_resources
from ska_oso_scripting.functions.devicecontrol.common import ValueTransitionError
from ska_oso_scripting.objects import SubArray

LOG = logging.getLogger(__name__)
FORMAT = "%(asctime)-15s %(message)s"

logging.basicConfig(level=logging.INFO, format=FORMAT)


def init(subarray_id: int):
    """
    Initialise the script, binding the sub-array ID to the script.
    """
    LOG.debug(f"Initializing script {__name__} with subarray_id={subarray_id}")
    global main
    main = functools.partial(_main, subarray_id)
    LOG.info(f"Script bound to sub-array {subarray_id}")


def assign_resources(subarray: SubArray, sbi: SBDefinition):
    """
    assign resources to a target sub-array using a Scheduling Block (SB).
    :param subarray: subarray ID
    :param sbi: ska_oso_pdm.SBDefinition
    :return:
    """
    LOG.info(
        f"Running assign_resources(subarray={subarray.id} sbi.sbd_id={sbi.sbd_id})"
    )

    cdm_allocation = pdm_transforms.create_cdm_assign_resources_request_from_scheduling_block(
        subarray.id, sbi
    )

    response = devicecontrol.assign_resources_from_cdm(subarray.id, cdm_allocation)
    LOG.info(f"Resources Allocated: {response}")

    messages.send_message(topics.sb.lifecycle.allocated, sb_id=sbi.sbd_id)
    LOG.info("Allocation complete")


def observe(subarray: SubArray, sbi: SBDefinition):
    """
    Observe using a Scheduling Block (SB) and template CDM file.

    :param subarray: SubArray instance containing subarray ID
    :param sbi: Instance of a SBDefinition
    :return:
    """

    LOG.info(
        f"Starting observing for Scheduling Block: {sbi.sbd_id}, subarray_id={subarray.id})"
    )

    if not sbi.scan_sequence:
        LOG.info(f"No scans defined in Scheduling Block {sbi.sbd_id}. No observation performed.")
        return
    cdm_configure_requests = (
        pdm_transforms.create_cdm_configure_request_from_scheduling_block(sbi)
    )

    for scan_definition_id in sbi.scan_sequence:
        LOG.info(f"Configuring for scan definition: {scan_definition_id}")
        messages.send_message(topics.sb.lifecycle.observation.started, sb_id=sbi.sbd_id)
        cdm_configs = cdm_configure_requests[scan_definition_id]
        for cdm_config in cdm_configs:
            # Get the scan ID. This is only used for logging, not for any
            # business logic.
            scan_id = SCAN_ID_GENERATOR.next()
            try:
                # With the CDM modified, we can now issue the Configure instruction...
                LOG.info(f"Configuring subarray {subarray.id} for scan {scan_id}")
                messages.send_message(
                    topics.scan.lifecycle.configure.started,
                    sb_id=sbi.sbd_id,
                    scan_id=scan_id,
                )
                devicecontrol.configure_from_cdm(subarray.id, cdm_config)
            except ValueTransitionError as err:
                LOG.error(f"Error configuring subarray: {err}")
                messages.send_message(
                    topics.scan.lifecycle.configure.failed,
                    sb_id=sbi.sbd_id,
                    scan_id=scan_id,
                )
                messages.send_message(
                    topics.sb.lifecycle.observation.finished.failed, sb_id=sbi.sbd_id
                )
                raise err
            else:
                messages.send_message(
                    topics.scan.lifecycle.configure.complete,
                    sb_id=sbi.sbd_id,
                    scan_id=scan_id,
                )

            try:
                # with configuration complete, we can begin the scan.
                LOG.info(f"Starting scan {scan_id}")
                messages.send_message(
                    topics.scan.lifecycle.start, sb_id=sbi.sbd_id, scan_id=scan_id
                )
                devicecontrol.scan(subarray.id)
            except ValueTransitionError as err:
                LOG.error(f"Error when executing scan {scan_id}: {err}")
                messages.send_message(
                    topics.scan.lifecycle.end.failed, sb_id=sbi.sbd_id, scan_id=scan_id
                )
                messages.send_message(
                    topics.sb.lifecycle.observation.finished.failed, sb_id=sbi.sbd_id
                )
                raise err
            else:
                messages.send_message(
                    topics.scan.lifecycle.end.succeeded,
                    sb_id=sbi.sbd_id,
                    scan_id=scan_id,
                )

    # All scans are complete. Observations are concluded with an 'end'
    # command.
    messages.send_message(
        topics.sb.lifecycle.observation.finished.succeeded, sb_id=sbi.sbd_id
    )
    LOG.info(f"End scheduling block: {sbi.sbd_id}")
    devicecontrol.end(subarray.id)

    LOG.info("Observation script complete")


def _main(subarray_id: int, sb_json: str, sbi_id: str):
    LOG.info(f"Running OS process {os.getpid()}")
    LOG.info(f"Called with main(subarray_id={subarray_id}, sbi_id={sbi_id})")
    LOG.debug(f"main() sb_json={sb_json}")
    sbd: SBDefinition = sb.load_sbd(sb_json)
    eb_id = oda_helper.create_eb(sbd.telescope, sbi_ref=sbi_id)
    LOG.info(f"Created Execution Block {eb_id}")
    subarray = SubArray(subarray_id)
    assign_resources(subarray, sbd)
    observe(subarray, sbd)
    release_all_resources(subarray_id)
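The observe function above repeats one pattern for both the configure and scan steps: announce that the step has started, attempt the command, publish a failure event and re-raise if it fails, and otherwise publish a success event. The sketch below reproduces that control flow with plain Python stand-ins. CommandError, publish, run_step and the topic strings are all hypothetical; they are not part of ska_oso_scripting or the OET.

```python
from typing import Callable, List, Tuple

# Recorded (topic, scan_id) pairs; a stand-in for a real message bus.
EVENTS: List[Tuple[str, int]] = []


class CommandError(Exception):
    """Hypothetical error raised when a device command fails."""


def publish(topic: str, scan_id: int) -> None:
    EVENTS.append((topic, scan_id))


def run_step(name: str, scan_id: int, command: Callable[[], None]) -> None:
    """Announce a step, run it, then publish its outcome."""
    try:
        publish(f"{name}.started", scan_id)
        command()
    except CommandError:
        publish(f"{name}.failed", scan_id)
        raise  # let the caller see the failure
    else:
        # Runs only when the try block raised nothing.
        publish(f"{name}.complete", scan_id)


def failing_command() -> None:
    raise CommandError("device rejected the request")


run_step("configure", 1, lambda: None)
try:
    run_step("scan", 1, failing_command)
except CommandError:
    pass
print(EVENTS)
```

Because the failure is re-raised, nothing after the failing call runs. In the example script this means an error during observe propagates out of _main before release_all_resources is reached.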