API documentation

CLI interface

Unknown arguments will be passed to your processor create() function

plasma_processor [-h] [-v] [-s PLASMA_SOCKET] [-r READINESS_FILE] [--max-scans MAX_SCANS]
                 [--input INPUT] [--input-channel-range INPUT_CHANNEL_RANGE]
                 [--use-sdp-metadata USE_SDP_METADATA] [--num-receivers NUM_RECEIVERS]
                 [--telescope TELESCOPE]
                 user_processor_class

plasma_processor positional arguments

  • user_processor_class - One or more <module_path>.<class_name> processor paths implementing BaseProcessor. A chain of processors can be specified using a comma-separated list. (default: None)

plasma_processor options

  • -h, --help - show this help message and exit

  • -v, --verbose - If set, more verbose output will be produced

  • -s PLASMA_SOCKET, --plasma_socket PLASMA_SOCKET - The socket where Plasma is listening for connections (default: /tmp/plasma)

  • -r READINESS_FILE, --readiness-file READINESS_FILE - An empty file that will be created after the processor has finished setting up, signalling it’s ready to receive data (default: None)

  • --max-scans MAX_SCANS - The number of scans to process data for before automatically exiting. (default: None)

  • --input INPUT - Emulate data reception using a measurement set file (default: None)

  • --input-channel-range INPUT_CHANNEL_RANGE - Map the channel data in the measurement set to the specified channel ids using start:count[:stride] notation. (default: None)

  • --use-sdp-metadata USE_SDP_METADATA - Use SDP metadata support, ignored when used with –input (default: True)

  • --num-receivers NUM_RECEIVERS - Number of receivers used in the pipeline (default: 1)

  • --telescope TELESCOPE - Telescope type ('mid' or 'low'), ignored unless used with –input (default: Telescope.LOW)

Processor Interface

class realtime.receive.processors.sdp.base_processor.BaseProcessor

Base class for all Processors.

Subclasses should override the create, process, timeout and close methods as appropriate.

cancel()

Cancel remaining tasks.

async close()

Flush and close any remaining resources.

DataProducts are gracefully finalized transitioned to COMPLETED status where possible.

If cancel() was invoked, pending work is cancelled and pending DataProducts transition to CANCELLED.

abstract static create(argv: Iterable[str]) BaseProcessor

Create an instance of this class from the given command line parameters. This allows user-provided classes to have their own command line parsing logic, and receive arbitrary user-provided parameters.

Parameters:

argv – A list of command line parameters.

Returns:

A new instance of this class.

async end_scan(scan_id: int) Visibility | Iterable[Visibility] | None

Call when a scan has ended. The default implementation ignores this event, but subclasses might want to react to this.

Parameters:

scan_id – the ID of the scan that has ended.

Returns:

Visibility to pass to the next processor, or None to skip subsequent processors.

get_storage(subpath_components: Iterable[Path | str]) DataProductStorage

Return the Storage object for a given subpath, creating it if it does not exist.

initialise_storage(storage_class: Callable[[Iterable[Path | str]], DataProductStorage])

Initialise storage configuration.

abstract async process(dataset: Visibility) Visibility | Iterable[Visibility] | None

Process the given visibility dataset.

Parameters:

dataset – the visibility to process

Returns:

Visibility or iterable collection of Visibility objects to pass to the next processor, or None to skip subsequent processors.

set_plasma_socket(plasma_socket: str)

Set the path to the socket through which communication with Plasma takes place. The processor should not usually need to communicate with Plasma, but it can if necessary.

Parameters:

plasma_socket – The plasma socket used by this processor.

async start_scan(scan_id: int) None

Call when a new scan has started. The default implementation ignores this event, but subclasses might want to react to this.

Parameters:

scan_id – the ID of the scan that has started.

property storage: DataProductStorage

Get the default Storage object.

Storage

DataProduct Storage module.

class realtime.receive.processors.storage.DataProductStorage(subpath_components: Iterable[Path | str] | None = None)

Processing Block connected volume storage for a single DataProduct.

Used by processors to resolve ProcessingBlock file locations and generate DataProduct metadata.

Usage: >>> storage = DataProductStorage() >>> file = storage.declare_file(“data.file”) >>> with open(file.local_path, ‘w’) as ostream: … dumps(ostream, data) >>> file.update_status(“done”) # metadata written

class File(file: File, parent: DataProductStorage)

File handle to assist in metadata generation using DataProductStorage.

Parameters:
  • file – An SDP metadata File class pointing to the path of the file.

  • parent – DataProductStorage instance for writing metadata.

property exists: bool

Whether the file exists on disk.

property local_path: Path

Absolute path to the file on the local filesystem.

property sdp_path: PurePath | None

File path relative to the volume root.

update_status(status: str)

Update the status of this file to status and flushes to disk.

Parameters:

status – The new status of the file.

property abs_path: Path

Absolute path to processing block storage directory on the local filesystem.

declare_file(path: str, description: str) File

Declare a file in the storage. The file might or might not exist already.

Parameters:
  • path – The path to the file

  • description – A description of the file if the file does not exist.

Returns:

A File object representing the file

property metadata: dict

Get the DataProduct Metadata.

write_metadata()

Write data into ska-data-product.yaml file.

class realtime.receive.processors.storage.DataProductStorageStub(root: Path | None = None, subpath_components: Iterable[Path | str] | None = None)

Stub DataProductStorage with no connected ProcessingBlock configuration.

class File(path: str)

File handle to assist in metadata generation using DataProductStorage.

Parameters:

path – An SDP metadata File class pointing to the path of the file.

property exists: bool

Whether the file exists on disk.

property local_path: Path

The path to this file on the local filesystem.

property sdp_path: Path | None

The path to this file as a globally accessible SDP path. If not running in the context of SDP, returns None.

update_status(_status)

Update the status of this file to status and flushes to disk.

Parameters:

status – The new status of the file.

property abs_path: Path

Absolute path to processing block storage directory on the local filesystem.

declare_file(path: str, _description: str) File

Declare a file in the storage. The file might or might not exist already.

Parameters:
  • path – File path. Relative paths are joined with self._root; absolute paths ignore self._root.

  • description – A description of the file if the file does not exist.

Returns:

A File object representing the file

property metadata: dict

Get the DataProduct Metadata.

write_metadata()

Write metadata to ska-data-product.yaml file.

realtime.receive.processors.storage.File

alias of CompatFile

realtime.receive.processors.storage.Storage

alias of CompatStorage

In-built Processors

class realtime.receive.processors.sdp.mswriter_processor.MSWriterProcessor(output_ms_path: str | Path, telescope_name: str | None, use_plasmastman: bool = False, timestamp_output: bool = False, max_time_centroid_variation: float = 0.1, upper_triangular_baselines: bool = False, pointing_config: PointingSettings | None = None, ms_closed_handlers: list[ContextEventHandler[MsClosedEventData]] | None = None, msflow_config: Config | None = None, ms_version: MSVersion = MSVersion.MSV2, station_data_key_path: str = 'stations')

Processor that writes incoming payloads into a Measurement Set.

Aggregator

Module for signal display metric aggregators.

class realtime.receive.aggregator.metrics_aggregator.Aggregator

Generic class for aggregating metric payloads.

append_payload(payload: MetricPayload)

Append a payload to the aggregator after performing metric_data_type and spectral window validation.

build() MetricPayload | None

Assemble a new payload by combining multiple input payloads in ascending frequency order.

Returns:

A new payload representing the combined metric data, or None if input payloads are missing or inconsistent.

metric_data_attrs: list[str]

List of attributes in MetricPayload.data containing arrays of numerical metric data. Needs to be provided by derived classes.

metric_data_type: MetricDataTypes

The type of metric this aggregator class handles. Needs to be provided by derived classes.

set_scan_type(scan_type: ScanType)

Set scan_type to be used for the aggregator. The scan_type must be set before appending payload data.

class realtime.receive.aggregator.metrics_aggregator.AmplitudeAggregator

Aggregates and appends amplitude payloads of type MetricPayload containing DataAndComponentPayload entries.

Gaps in frequency coverage between payloads are filled with NaNs to ensure consistent channel alignment across baselines/polarizations.

metric_data_attrs: list[str] = ['data', 'component']

List of attributes in MetricPayload.data containing arrays of numerical metric data. Needs to be provided by derived classes.

metric_data_type: MetricDataTypes = 'amplitude'

The type of metric this aggregator class handles. Needs to be provided by derived classes.

class realtime.receive.aggregator.metrics_aggregator.AmplitudePhaseVarianceAggregator

Aggregates the Variance metrics.

metric_data_attrs: list[str] = ['amplitude_variance', 'phase_variance', 'uv_distance', 'distance']

List of attributes in MetricPayload.data containing arrays of numerical metric data. Needs to be provided by derived classes.

metric_data_type: MetricDataTypes = 'amplitudeandphasevariance'

The type of metric this aggregator class handles. Needs to be provided by derived classes.

class realtime.receive.aggregator.metrics_aggregator.AveragedAmplitudeAggregator

Aggregates averaged amplitude payloads of type MetricPayload containing DataAndComponentPayload entries.

This aggregator recomputes global statistics using weighted means, where weights are based on the spectral window channel counts.

metric_data_attrs: list[str] = ['data', 'component']

List of attributes in MetricPayload.data containing arrays of numerical metric data. Needs to be provided by derived classes.

metric_data_type: MetricDataTypes = 'averagedamplitude'

The type of metric this aggregator class handles. Needs to be provided by derived classes.

class realtime.receive.aggregator.metrics_aggregator.BandCrossCorrAggregator

Aggregates and appends band averaged cross correlation payloads of type MetricPayload containing DataPayload entries.

metric_data_attrs: list[str] = []

List of attributes in MetricPayload.data containing arrays of numerical metric data. Needs to be provided by derived classes.

metric_data_type: MetricDataTypes = 'bandaveragedxcorr'

The type of metric this aggregator class handles. Needs to be provided by derived classes.

class realtime.receive.aggregator.metrics_aggregator.LagplotAggregator

Aggregates and appends lag_plot payloads of type MetricPayload containing DataPayload entries.

Gaps in frequency coverage between payloads are filled with NaNs to ensure consistent channel alignment across baselines/polarizations.

metric_data_attrs: list[str] = ['data', 'component']

List of attributes in MetricPayload.data containing arrays of numerical metric data. Needs to be provided by derived classes.

metric_data_type: MetricDataTypes = 'lagplot'

The type of metric this aggregator class handles. Needs to be provided by derived classes.

class realtime.receive.aggregator.metrics_aggregator.PhaseAggregator

Aggregates and appends phase payloads of type MetricPayload containing DataAndComponentPayload entries.

Gaps in frequency coverage between payloads are filled with NaNs to ensure consistent channel alignment across baselines/polarizations.

metric_data_attrs: list[str] = ['data', 'component']

List of attributes in MetricPayload.data containing arrays of numerical metric data. Needs to be provided by derived classes.

metric_data_type: MetricDataTypes = 'phase'

The type of metric this aggregator class handles. Needs to be provided by derived classes.

class realtime.receive.aggregator.metrics_aggregator.SpectrumAggregator

Aggregates and appends spectrum payloads of type MetricPayload containing SpectrumPayload entries.

Gaps in frequency coverage between payloads are filled with NaNs to ensure consistent channel alignment across baselines/polarizations.

metric_data_attrs: list[str] = ['power', 'angle']

List of attributes in MetricPayload.data containing arrays of numerical metric data. Needs to be provided by derived classes.

metric_data_type: MetricDataTypes = 'spectrum'

The type of metric this aggregator class handles. Needs to be provided by derived classes.

class realtime.receive.aggregator.metrics_aggregator.StatsAggregator

Combines and appends stats payload data.

append_payload(payload: VisReceiveStatistics)

Append a new stats payload to the aggregator.

build() VisReceiveStatistics | None

Assemble a new stats VisReceiveStatistics payload by combining multiple input stats payloads. This aggregator is independent and does not subclass Aggregator because it uses different payload logic.

Returns:

A new payload representing the combined stats data, or None if input payloads are missing or inconsistent.

class realtime.receive.aggregator.metrics_aggregator.UVCoverageAggregator

Aggregates and appends UV Coverage payloads of type MetricPayload containing UVCoveragePayload entries.

metric_data_attrs: list[str] = []

List of attributes in MetricPayload.data containing arrays of numerical metric data. Needs to be provided by derived classes.

metric_data_type: MetricDataTypes = 'uvcoverage'

The type of metric this aggregator class handles. Needs to be provided by derived classes.

realtime.receive.aggregator.metrics_aggregator.find_spectral_window_for_frequency_range(scan_type: ScanType, freq_start: float, freq_end: float) SpectralWindow | None

Given a frequency range, find the SpectralWindow in the scan_type object that it overlaps with.

Parameters:
  • scan_type – The ScanType object

  • freq_start – Start of frequency range of the slice

  • freq_end – End of frequency range of the slice

Returns:

Spectral window that overlaps with the range, otherwise None

Others

class realtime.receive.processors.file_executor.FileExecutor

Executes jobs in the background for files that get scheduled to this instance.

class realtime.receive.processors.file_executor.FunctionFileExecutor(fun: Callable[[str], None])

A file executor that schedules the execution of a function on the output filename in a background thread.

Functions are executed on a single background thread, one after another.

class realtime.receive.processors.file_executor.CommandFileExecutor(command_template: list[str])

A file executor that schedules the execution of a bash command on the output filename in a background thread.

Commands are executed on a single background thread, one after another.