API documentation
CLI interface
Unknown arguments will be passed to your processor create() function
plasma_processor [-h] [-v] [-s PLASMA_SOCKET] [-r READINESS_FILE] [--max-scans MAX_SCANS]
[--input INPUT] [--input-channel-range INPUT_CHANNEL_RANGE]
[--use-sdp-metadata USE_SDP_METADATA] [--num-receivers NUM_RECEIVERS]
[--telescope TELESCOPE]
user_processor_class
plasma_processor positional arguments
user_processor_class- One or more <module_path>.<class_name> processor paths implementing BaseProcessor. A chain of processors can be specified using a comma-separated list. (default:None)
plasma_processor options
-v,--verbose- If set, more verbose output will be produced-sPLASMA_SOCKET,--plasma_socketPLASMA_SOCKET- The socket where Plasma is listening for connections (default:/tmp/plasma)-rREADINESS_FILE,--readiness-fileREADINESS_FILE- An empty file that will be created after the processor has finished setting up, signalling it’s ready to receive data (default:None)--max-scansMAX_SCANS- The number of scans to process data for before automatically exiting. (default:None)--inputINPUT- Emulate data reception using a measurement set file (default:None)--input-channel-rangeINPUT_CHANNEL_RANGE- Map the channel data in the measurement set to the specified channel ids using start:count[:stride] notation. (default:None)--use-sdp-metadataUSE_SDP_METADATA- Use SDP metadata support, ignored when used with –input (default:True)--num-receiversNUM_RECEIVERS- Number of receivers used in the pipeline (default:1)--telescopeTELESCOPE- Telescope type ('mid'or'low'), ignored unless used with –input (default:Telescope.LOW)
Processor Interface
- class realtime.receive.processors.sdp.base_processor.BaseProcessor
Base class for all Processors.
Subclasses should override the create, process, timeout and close methods as appropriate.
- cancel()
Cancel remaining tasks.
- async close()
Flush and close any remaining resources.
DataProducts are gracefully finalized transitioned to
COMPLETEDstatus where possible.If
cancel()was invoked, pending work is cancelled and pending DataProducts transition toCANCELLED.
- abstract static create(argv: Iterable[str]) BaseProcessor
Create an instance of this class from the given command line parameters. This allows user-provided classes to have their own command line parsing logic, and receive arbitrary user-provided parameters.
- Parameters:
argv – A list of command line parameters.
- Returns:
A new instance of this class.
- async end_scan(scan_id: int) Visibility | Iterable[Visibility] | None
Call when a scan has ended. The default implementation ignores this event, but subclasses might want to react to this.
- Parameters:
scan_id – the ID of the scan that has ended.
- Returns:
Visibility to pass to the next processor, or None to skip subsequent processors.
- get_storage(subpath_components: Iterable[Path | str]) DataProductStorage
Return the Storage object for a given subpath, creating it if it does not exist.
- initialise_storage(storage_class: Callable[[Iterable[Path | str]], DataProductStorage])
Initialise storage configuration.
- abstract async process(dataset: Visibility) Visibility | Iterable[Visibility] | None
Process the given visibility dataset.
- Parameters:
dataset – the visibility to process
- Returns:
Visibility or iterable collection of Visibility objects to pass to the next processor, or None to skip subsequent processors.
- set_plasma_socket(plasma_socket: str)
Set the path to the socket through which communication with Plasma takes place. The processor should not usually need to communicate with Plasma, but it can if necessary.
- Parameters:
plasma_socket – The plasma socket used by this processor.
- async start_scan(scan_id: int) None
Call when a new scan has started. The default implementation ignores this event, but subclasses might want to react to this.
- Parameters:
scan_id – the ID of the scan that has started.
- property storage: DataProductStorage
Get the default Storage object.
Storage
DataProduct Storage module.
- class realtime.receive.processors.storage.DataProductStorage(subpath_components: Iterable[Path | str] | None = None)
Processing Block connected volume storage for a single DataProduct.
Used by processors to resolve ProcessingBlock file locations and generate DataProduct metadata.
Usage: >>> storage = DataProductStorage() >>> file = storage.declare_file(“data.file”) >>> with open(file.local_path, ‘w’) as ostream: … dumps(ostream, data) >>> file.update_status(“done”) # metadata written
- class File(file: File, parent: DataProductStorage)
File handle to assist in metadata generation using DataProductStorage.
- Parameters:
file – An SDP metadata File class pointing to the path of the file.
parent – DataProductStorage instance for writing metadata.
- property exists: bool
Whether the file exists on disk.
- property local_path: Path
Absolute path to the file on the local filesystem.
- property sdp_path: PurePath | None
File path relative to the volume root.
- update_status(status: str)
Update the status of this file to status and flushes to disk.
- Parameters:
status – The new status of the file.
- property abs_path: Path
Absolute path to processing block storage directory on the local filesystem.
- declare_file(path: str, description: str) File
Declare a file in the storage. The file might or might not exist already.
- Parameters:
path – The path to the file
description – A description of the file if the file does not exist.
- Returns:
A File object representing the file
- property metadata: dict
Get the DataProduct Metadata.
- write_metadata()
Write data into ska-data-product.yaml file.
- class realtime.receive.processors.storage.DataProductStorageStub(root: Path | None = None, subpath_components: Iterable[Path | str] | None = None)
Stub DataProductStorage with no connected ProcessingBlock configuration.
- class File(path: str)
File handle to assist in metadata generation using DataProductStorage.
- Parameters:
path – An SDP metadata File class pointing to the path of the file.
- property exists: bool
Whether the file exists on disk.
- property local_path: Path
The path to this file on the local filesystem.
- property sdp_path: Path | None
The path to this file as a globally accessible SDP path. If not running in the context of SDP, returns None.
- update_status(_status)
Update the status of this file to status and flushes to disk.
- Parameters:
status – The new status of the file.
- property abs_path: Path
Absolute path to processing block storage directory on the local filesystem.
- declare_file(path: str, _description: str) File
Declare a file in the storage. The file might or might not exist already.
- Parameters:
path – File path. Relative paths are joined with self._root; absolute paths ignore self._root.
description – A description of the file if the file does not exist.
- Returns:
A File object representing the file
- property metadata: dict
Get the DataProduct Metadata.
- write_metadata()
Write metadata to ska-data-product.yaml file.
- realtime.receive.processors.storage.File
alias of
CompatFile
- realtime.receive.processors.storage.Storage
alias of
CompatStorage
In-built Processors
- class realtime.receive.processors.sdp.mswriter_processor.MSWriterProcessor(output_ms_path: str | Path, telescope_name: str | None, use_plasmastman: bool = False, timestamp_output: bool = False, max_time_centroid_variation: float = 0.1, upper_triangular_baselines: bool = False, pointing_config: PointingSettings | None = None, ms_closed_handlers: list[ContextEventHandler[MsClosedEventData]] | None = None, msflow_config: Config | None = None, ms_version: MSVersion = MSVersion.MSV2, station_data_key_path: str = 'stations')
Processor that writes incoming payloads into a Measurement Set.
Aggregator
Module for signal display metric aggregators.
- class realtime.receive.aggregator.metrics_aggregator.Aggregator
Generic class for aggregating metric payloads.
- append_payload(payload: MetricPayload)
Append a payload to the aggregator after performing metric_data_type and spectral window validation.
- build() MetricPayload | None
Assemble a new payload by combining multiple input payloads in ascending frequency order.
- Returns:
A new payload representing the combined metric data, or None if input payloads are missing or inconsistent.
- metric_data_attrs: list[str]
List of attributes in
MetricPayload.datacontaining arrays of numerical metric data. Needs to be provided by derived classes.
- metric_data_type: MetricDataTypes
The type of metric this aggregator class handles. Needs to be provided by derived classes.
- class realtime.receive.aggregator.metrics_aggregator.AmplitudeAggregator
Aggregates and appends amplitude payloads of type MetricPayload containing DataAndComponentPayload entries.
Gaps in frequency coverage between payloads are filled with NaNs to ensure consistent channel alignment across baselines/polarizations.
- metric_data_attrs: list[str] = ['data', 'component']
List of attributes in
MetricPayload.datacontaining arrays of numerical metric data. Needs to be provided by derived classes.
- metric_data_type: MetricDataTypes = 'amplitude'
The type of metric this aggregator class handles. Needs to be provided by derived classes.
- class realtime.receive.aggregator.metrics_aggregator.AmplitudePhaseVarianceAggregator
Aggregates the Variance metrics.
- metric_data_attrs: list[str] = ['amplitude_variance', 'phase_variance', 'uv_distance', 'distance']
List of attributes in
MetricPayload.datacontaining arrays of numerical metric data. Needs to be provided by derived classes.
- metric_data_type: MetricDataTypes = 'amplitudeandphasevariance'
The type of metric this aggregator class handles. Needs to be provided by derived classes.
- class realtime.receive.aggregator.metrics_aggregator.AveragedAmplitudeAggregator
Aggregates averaged amplitude payloads of type MetricPayload containing DataAndComponentPayload entries.
This aggregator recomputes global statistics using weighted means, where weights are based on the spectral window channel counts.
- metric_data_attrs: list[str] = ['data', 'component']
List of attributes in
MetricPayload.datacontaining arrays of numerical metric data. Needs to be provided by derived classes.
- metric_data_type: MetricDataTypes = 'averagedamplitude'
The type of metric this aggregator class handles. Needs to be provided by derived classes.
- class realtime.receive.aggregator.metrics_aggregator.BandCrossCorrAggregator
Aggregates and appends band averaged cross correlation payloads of type MetricPayload containing DataPayload entries.
- metric_data_attrs: list[str] = []
List of attributes in
MetricPayload.datacontaining arrays of numerical metric data. Needs to be provided by derived classes.
- metric_data_type: MetricDataTypes = 'bandaveragedxcorr'
The type of metric this aggregator class handles. Needs to be provided by derived classes.
- class realtime.receive.aggregator.metrics_aggregator.LagplotAggregator
Aggregates and appends lag_plot payloads of type MetricPayload containing DataPayload entries.
Gaps in frequency coverage between payloads are filled with NaNs to ensure consistent channel alignment across baselines/polarizations.
- metric_data_attrs: list[str] = ['data', 'component']
List of attributes in
MetricPayload.datacontaining arrays of numerical metric data. Needs to be provided by derived classes.
- metric_data_type: MetricDataTypes = 'lagplot'
The type of metric this aggregator class handles. Needs to be provided by derived classes.
- class realtime.receive.aggregator.metrics_aggregator.PhaseAggregator
Aggregates and appends phase payloads of type MetricPayload containing DataAndComponentPayload entries.
Gaps in frequency coverage between payloads are filled with NaNs to ensure consistent channel alignment across baselines/polarizations.
- metric_data_attrs: list[str] = ['data', 'component']
List of attributes in
MetricPayload.datacontaining arrays of numerical metric data. Needs to be provided by derived classes.
- metric_data_type: MetricDataTypes = 'phase'
The type of metric this aggregator class handles. Needs to be provided by derived classes.
- class realtime.receive.aggregator.metrics_aggregator.SpectrumAggregator
Aggregates and appends spectrum payloads of type MetricPayload containing SpectrumPayload entries.
Gaps in frequency coverage between payloads are filled with NaNs to ensure consistent channel alignment across baselines/polarizations.
- metric_data_attrs: list[str] = ['power', 'angle']
List of attributes in
MetricPayload.datacontaining arrays of numerical metric data. Needs to be provided by derived classes.
- metric_data_type: MetricDataTypes = 'spectrum'
The type of metric this aggregator class handles. Needs to be provided by derived classes.
- class realtime.receive.aggregator.metrics_aggregator.StatsAggregator
Combines and appends stats payload data.
- append_payload(payload: VisReceiveStatistics)
Append a new stats payload to the aggregator.
- build() VisReceiveStatistics | None
Assemble a new stats VisReceiveStatistics payload by combining multiple input stats payloads. This aggregator is independent and does not subclass Aggregator because it uses different payload logic.
- Returns:
A new payload representing the combined stats data, or None if input payloads are missing or inconsistent.
- class realtime.receive.aggregator.metrics_aggregator.UVCoverageAggregator
Aggregates and appends UV Coverage payloads of type MetricPayload containing UVCoveragePayload entries.
- metric_data_attrs: list[str] = []
List of attributes in
MetricPayload.datacontaining arrays of numerical metric data. Needs to be provided by derived classes.
- metric_data_type: MetricDataTypes = 'uvcoverage'
The type of metric this aggregator class handles. Needs to be provided by derived classes.
- realtime.receive.aggregator.metrics_aggregator.find_spectral_window_for_frequency_range(scan_type: ScanType, freq_start: float, freq_end: float) SpectralWindow | None
Given a frequency range, find the SpectralWindow in the scan_type object that it overlaps with.
- Parameters:
scan_type – The ScanType object
freq_start – Start of frequency range of the slice
freq_end – End of frequency range of the slice
- Returns:
Spectral window that overlaps with the range, otherwise None
Others
- class realtime.receive.processors.file_executor.FileExecutor
Executes jobs in the background for files that get scheduled to this instance.
- class realtime.receive.processors.file_executor.FunctionFileExecutor(fun: Callable[[str], None])
A file executor that schedules the execution of a function on the output filename in a background thread.
Functions are executed on a single background thread, one after another.
- class realtime.receive.processors.file_executor.CommandFileExecutor(command_template: list[str])
A file executor that schedules the execution of a bash command on the output filename in a background thread.
Commands are executed on a single background thread, one after another.