Entities

Common

Common type definitions for SDP config entity models.

class ska_sdp_config.entity.common.AnyTRL(trl: str | TRL | AnyTRL)[source]

Base type for all TRL models.

property attribute_name: str | None

The Tango attribute name part of the TRL, or None.

property dbase: bool

Whether the Tango database is used. True for ‘dbase=yes’.

property device_name: str

The Tango device name part of the TRL.

property eval: TRL

TRL with all defaults evaluated (i.e. TANGO_HOST, #dbase).

property host: str | None

The host part of the TRL, or None.

property port: int | None

The port part of the TRL, or None.

property property_name: str | None

Property part of the TRL, or None.

property scheme: str | None

The scheme part of the TRL, or None.

property url: AnyUrl

The Uniform Resource Locator (URL) with all defaults evaluated (i.e. TANGO_HOST, dbase).

Is RFC 3986 compliant for wider library and tool compatibility.

class ska_sdp_config.entity.common.AttributeTRL(trl: str | TRL | AnyTRL)[source]

Base type for a TRL to a Tango Attribute.

class ska_sdp_config.entity.common.KafkaUrl(uri: str)[source]

Kafka client specific URL for connecting to bootstrap servers.

Supports initialization using either a URL or bootstrap address.

>>> from ska_sdp_config.entity.common import KafkaUrl
>>> from pydantic import TypeAdapter
>>> assert TypeAdapter(KafkaUrl).validate_python(
...     "kafka://localhost:9092"
... ).bootstrap_address == "localhost:9092"
>>> assert TypeAdapter(KafkaUrl).validate_python(
...     "localhost:9092"
... ).bootstrap_address == "localhost:9092"

property bootstrap_address

Bootstrap address format used by Python Consumers and Producers.
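The normalisation shown in the doctest above can be approximated with the standard library alone. This is a minimal sketch, assuming the bootstrap address is simply `host:port`; `to_bootstrap_address` is a hypothetical helper and not part of ska_sdp_config.

```python
from urllib.parse import urlsplit


def to_bootstrap_address(value: str) -> str:
    """Return 'host:port' from a kafka:// URL or a bare bootstrap address.

    Hypothetical helper for illustration; not the KafkaUrl implementation.
    """
    if "://" not in value:
        # Bare address: prefix a scheme so urlsplit parses the netloc.
        value = f"kafka://{value}"
    parts = urlsplit(value)
    if parts.hostname is None or parts.port is None:
        raise ValueError(f"expected host:port, got {value!r}")
    return f"{parts.hostname}:{parts.port}"


print(to_bootstrap_address("kafka://localhost:9092"))
print(to_bootstrap_address("localhost:9092"))
```

Both calls print the same `host:port` string, mirroring the two input forms that KafkaUrl is documented to accept.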

class ska_sdp_config.entity.common.PVCPath(*, k8s_namespaces: str | Sequence[str], k8s_pvc_name: str, pvc_mount_path: Annotated[PurePath, PlainValidator(func=_validate_pure_path, json_schema_input_type=str), PlainSerializer(func=str, return_type=str, when_used=always), WrapValidator(func=_validate_is_absolute, json_schema_input_type=str), PlainSerializer(func=str, return_type=str, when_used=always)], pvc_subpath: Annotated[PurePath, PlainValidator(func=_validate_pure_path, json_schema_input_type=str), PlainSerializer(func=str, return_type=str, when_used=always), WrapValidator(func=_validate_is_relative, json_schema_input_type=str), PlainSerializer(func=str, return_type=str, when_used=always)])[source]

Mounted path within a Kubernetes Persistent Volume Claim.

k8s_namespaces: str | Sequence[str]

Kubernetes PVC namespaces.

k8s_pvc_name: str

Kubernetes persistent volume claim name.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': False, 'populate_by_name': True, 'serialize_by_alias': True, 'strict': True, 'validate_assignment': True, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

property path: PlainSerializer(func=str, return_type=str, when_used=always), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'path', 'type': 'string'})]

The absolute path to the subpath as mounted in the container.

pvc_mount_path: PlainSerializer(func=str, return_type=str, when_used=always), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'path', 'type': 'string'})]

Mount path of the persistent volume claim.

pvc_subpath: PlainSerializer(func=str, return_type=str, when_used=always), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'path', 'type': 'string'})]

Subpath within the persistent volume claim.
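The relationship between the mount path, the subpath, and the combined path can be sketched with `pathlib`. This assumes that `path` is the mount path joined with the subpath, and that the absolute/relative checks behave as the validator names in the signature suggest; `container_path` is a hypothetical helper, not library code.

```python
from pathlib import PurePosixPath


def container_path(pvc_mount_path: str, pvc_subpath: str) -> PurePosixPath:
    """Join a PVC mount path and subpath (assumed behaviour of `path`)."""
    mount = PurePosixPath(pvc_mount_path)
    sub = PurePosixPath(pvc_subpath)
    if not mount.is_absolute():
        raise ValueError("pvc_mount_path must be absolute")
    if sub.is_absolute():
        raise ValueError("pvc_subpath must be relative")
    # Pure paths never touch the filesystem, so this is safe to run anywhere.
    return mount / sub


print(container_path("/mnt/data", "product/eb-test"))
```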

class ska_sdp_config.entity.common.TRL(value: Any)[source]

Tango Resource Locator (TRL) of the form:

[protocol://][host:port/]device-name[/attribute][->property][#dbase=xx]

For more information see: https://tango-controls.readthedocs.io/projects/rfc/en/latest/16/TangoResourceLocator.html
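To make the TRL form concrete, the following sketch splits a TRL string into its optional parts with a regular expression. It is a simplified illustration and not the parser used by the TRL class: it assumes the device name has exactly three slash-separated parts (domain/family/member), and `parse_trl` is a hypothetical name.

```python
import re

# Simplified pattern for illustration; a real TRL grammar may be stricter.
_TRL = re.compile(
    r"^(?:(?P<scheme>[A-Za-z][A-Za-z0-9+.-]*)://)?"      # [protocol://]
    r"(?:(?P<host>[^/:#]+):(?P<port>\d+)/)?"             # [host:port/]
    r"(?P<device_name>[^/#]+/[^/#]+/(?:(?!->)[^/#])+)"   # device-name
    r"(?:/(?P<attribute_name>(?:(?!->)[^/#])+))?"        # [/attribute]
    r"(?:->(?P<property_name>[^#]+))?"                   # [->property]
    r"(?:#dbase=(?P<dbase>yes|no))?$"                    # [#dbase=xx]
)


def parse_trl(trl: str) -> dict:
    """Return the named parts of a TRL; absent parts are None."""
    match = _TRL.match(trl)
    if match is None:
        raise ValueError(f"not a recognised TRL: {trl!r}")
    return match.groupdict()


print(parse_trl("tango://tangodb:10000/mid-sdp/subarray/01/some_attribute#dbase=no"))
print(parse_trl("sys/tg_test/1->myprop"))
```

Defaults such as TANGO_HOST are deliberately not applied here; in the documented class that is the role of the `eval` property.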

property attribute_name: str | None

The Tango attribute name part of the TRL, or None.

property dbase: bool

Whether the Tango database is used. True for ‘dbase=yes’.

property device_name: str

The Tango device name part of the TRL.

property eval: TRL

TRL with all defaults evaluated (i.e. TANGO_HOST, #dbase).

property host: str | None

The host part of the TRL, or None.

property port: int | None

The port part of the TRL, or None.

property property_name: str | None

Property part of the TRL, or None.

property scheme: str | None

The scheme part of the TRL, or None.

property url: Url

The Uniform Resource Locator (URL) with all defaults evaluated (i.e. TANGO_HOST, dbase).

Is RFC 3986 compliant for wider library and tool compatibility.

class ska_sdp_config.entity.common.TangoAttributeUrl(url: str | Url | _BaseUrl)[source]

Tango URL to an attribute on a device instance.

Supported input expressions are of the form: tango://<host>[:<port>]/<device_domain>/<device_family>/<device_member>/<device_attribute>[#dbase=yes|no]

Defaults:
- port=10000
- dbase=yes

Pre-conditions:
- host required if dbase=yes

For more information see: https://tango-controls.readthedocs.io/en/9.2.5/manual/C-naming.html

classmethod validate_url(value: AnyUrl | TRL | str, handler: ValidatorFunctionWrapHandler) TangoAttributeUrl[source]

Wrap-validate a URL string or TRL instance into a Tango attribute URL.

class ska_sdp_config.entity.common.TangoUrl(url: str | Url | _BaseUrl)[source]

Tango URL compatible with RFC 3986 parsers.

Supported input expressions are of the form: tango://<host>[:<port>]/<device_domain>/<device_family>/<device_member>/[<device_attribute>][#dbase=yes|no]

Defaults:
- port=10000
- dbase=yes

Pre-conditions:
- host required if dbase=yes

Additionally supports construction from a TRL.

For more info see: https://tango-controls.readthedocs.io/en/9.2.5/manual/C-naming.html
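Because the URL form is RFC 3986 compatible, a generic URL parser can split it. The sketch below uses `urllib.parse` to pull out the parts and applies the documented defaults (port 10000, dbase=yes); `split_tango_url` is a hypothetical helper and does not reproduce the validation performed by TangoUrl.

```python
from urllib.parse import urlsplit

DEFAULT_PORT = 10000  # default stated in the documentation above


def split_tango_url(url: str) -> dict:
    """Split a tango:// URL into host, port, device parts and dbase flag."""
    parts = urlsplit(url)
    if parts.scheme != "tango":
        raise ValueError(f"expected tango scheme, got {parts.scheme!r}")
    segments = [s for s in parts.path.split("/") if s]
    if len(segments) not in (3, 4):
        raise ValueError("expected domain/family/member[/attribute]")
    if parts.fragment not in ("", "dbase=yes", "dbase=no"):
        raise ValueError(f"unexpected fragment {parts.fragment!r}")
    return {
        "host": parts.hostname,
        "port": parts.port if parts.port is not None else DEFAULT_PORT,
        "device_name": "/".join(segments[:3]),
        "attribute_name": segments[3] if len(segments) == 4 else None,
        # dbase defaults to yes when the fragment is absent
        "dbase": parts.fragment != "dbase=no",
    }


print(split_tango_url("tango://tangodb/mid-sdp/subarray/01/some_attribute"))
```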

property attribute_name: str

The attribute name part of the URL.

property dbase: bool

Whether the Tango database is used. True for ‘dbase=yes’.

property device_name: str

The Tango device name part of the TRL.

property device_url: TangoUrl

The device URL.

property domain_name: str

The device domain part of the URL.

property family_name: str

The device family part of the URL.

property member_name: str

The device member part of the URL.

property property_name: str

The Tango property name part of the TRL, or None.

classmethod validate_url(value: AnyTRL | AnyUrl | str, handler: ValidatorFunctionWrapHandler) TangoUrl[source]

Wrap-validate a URL string or TRL instance into a Tango URL.

Owner

Owner configuration entities.

class ska_sdp_config.entity.owner.Owner(*, pid: int, hostname: str, command: list[str])[source]

Owner information for an entity, uniquely identifying the process owning the entity on a distributed system.

command: list[str]

The command line used to start the process.

hostname: str

The name of the host where the process is running.

pid: int

The process ID of the entity’s owner.
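The three fields map naturally onto values any Python process can report about itself. The sketch below gathers them with the standard library; how ska_sdp_config itself populates an Owner is not shown in this reference, so `current_owner` is a hypothetical helper returning a plain dictionary.

```python
import os
import socket
import sys


def current_owner() -> dict:
    """Collect pid, hostname and command line for the running process."""
    return {
        "pid": os.getpid(),
        "hostname": socket.gethostname(),
        "command": list(sys.argv),
    }


print(current_owner())
```

The pid alone is only unique per host, which is why the hostname is needed to identify a process on a distributed system.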

System

System config model entities

class ska_sdp_config.entity.system.System(*, version: Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')], components: dict[str, SystemComponent], dependencies: dict[str, SystemDependency])[source]

Configuration details for SDP including components, dependencies and their versions.

components: dict[str, SystemComponent]

The components comprising SDP.

dependencies: dict[str, SystemDependency]

The dependencies of SDP.

version: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')])]

The version of SDP.

class ska_sdp_config.entity.system.SystemComponent(*, devicename: Annotated[str | None, _PydanticGeneralMetadata(pattern='^[^/]+/[^/]+/[^/]+$')] = None, image: Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_:\\./-]+$')], version: Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')])[source]

SDP component entity e.g. lmc-controller.

devicename: str | None

The tango device name of this component.

image: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_:\\./-]+$')])]

The image used to launch this component.

version: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')])]

The version of this component.

class ska_sdp_config.entity.system.SystemDependency(*, repository: Annotated[Url, UrlConstraints(max_length=None, allowed_schemes=['https', 'file'], host_required=None, default_host=None, default_port=None, default_path=None, preserve_empty_path=None)] | Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='(alias:|\\@)\\w([-_]?\\w){0,}$')])], version: Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')])[source]

SDP dependency entity e.g. ska-tango-base.

repository: Annotated[Url, UrlConstraints(max_length=None, allowed_schemes=['https', 'file'], host_required=None, default_host=None, default_port=None, default_path=None, preserve_empty_path=None)] | Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='(alias:|\\@)\\w([-_]?\\w){0,}$')])]

The repository containing this dependency.

version: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')])]

The version of this dependency.

Execution Block

Execution block configuration entities.

class ska_sdp_config.entity.eb.ExecutionBlock(*, key: ~typing.Annotated[str, _PydanticGeneralMetadata(pattern='^eb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')], beams: list = <factory>, channels: list = <factory>, context: dict = <factory>, fields: list = <factory>, max_length: float | None = None, pb_batch: list[~typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])]] = <factory>, pb_realtime: list[~typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])]] = <factory>, polarisations: list = <factory>, resources: dict = <factory>, scan_types: list = <factory>, subarray_id: ~types.Annotated[str | None, _PydanticGeneralMetadata(pattern='^[0-9]+$')] = None, telmodel: ~ska_sdp_config.entity.eb.TelModel | None = None)[source]

Execution block entity.

Collects configuration information relating to an execution block for the SDP.

beams: Annotated[list, FieldInfo(annotation=NoneType, required=False, default_factory=list)]

Beam parameters for the purpose of the Science Data Processor

channels: Annotated[list, FieldInfo(annotation=NoneType, required=False, default_factory=list)]

Channels. This is basically a list of spectral windows per channel configuration.

context: Annotated[dict, FieldInfo(annotation=NoneType, required=False, default_factory=dict)]

Free-form information from OET to describe the observing context

fields: Annotated[list, FieldInfo(annotation=NoneType, required=False, default_factory=list)]

List of fields/targets

key: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^eb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])]

ID of the Execution Block
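The key pattern in the signature can be tried out directly with `re`. This sketch copies the execution block pattern from above; the sample IDs are invented for illustration and `is_eb_id` is a hypothetical helper.

```python
import re

# Pattern copied from the ExecutionBlock.key annotation above.
EB_ID = re.compile(r"^eb\-[a-z0-9]+\-[0-9]{8}\-[a-z0-9]+$")


def is_eb_id(candidate: str) -> bool:
    """Return True if the string has the documented execution block ID shape."""
    return EB_ID.fullmatch(candidate) is not None


for candidate in (
    "eb-test-20240101-00001",   # lower-case prefix, 8 digits, suffix
    "eb-Test-20240101-00001",   # upper-case letter is rejected
    "pb-test-20240101-00001",   # processing block prefix is rejected
):
    print(candidate, is_eb_id(candidate))
```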

max_length: float | None

Maximum duration of the execution block in seconds.

pb_batch: Annotated[list[Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])]], FieldInfo(annotation=NoneType, required=False, default_factory=list)]

Batch processing blocks.

pb_realtime: Annotated[list[Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])]], FieldInfo(annotation=NoneType, required=False, default_factory=list)]

Real-time processing blocks.

polarisations: Annotated[list, FieldInfo(annotation=NoneType, required=False, default_factory=list)]

Polarisation definitions

resources: Annotated[dict, FieldInfo(annotation=NoneType, required=False, default_factory=dict)]

Free-form Resources allocated to the execution block.

scan_types: Annotated[list, FieldInfo(annotation=NoneType, required=False, default_factory=list)]

Scan types. Associates scans with per-beam fields & channel configurations

subarray_id: Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, metadata=[_PydanticGeneralMetadata(pattern='^[0-9]+$')])]

Subarray with which this EB is associated.

telmodel: TelModel | None

Telescope model data to use in processing.

class ska_sdp_config.entity.eb.TelModel(*, source_uris: list[str], array_layout_path: str, station_data_path_prefix: str | None)[source]

Telescope model data.

array_layout_path: str

Path of array layout and static delays in telescope model.

source_uris: list[str]

List of source URIs to use to look up telescope model data.

station_data_path_prefix: str | None

Prefix of paths to station data in telescope model.

Processing Block

Processing block configuration entities.

class ska_sdp_config.entity.pb.FlowDependency(*, purpose: list[str], flow_key: Key)[source]

A flow dependency

flow_key: Key

The identifying key of the flow that is depended on

purpose: list[str]

What the flow is to be used for (e.g. calibration).

class ska_sdp_config.entity.pb.PBDependency(*, kind: list[str], pb_id: Annotated[str, _PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])[source]

A Processing Block dependency.

kind: list[str]

How this dependency is meant to be interpreted.

pb_id: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])]

The ID of the Processing Block that is depended on.

class ska_sdp_config.entity.pb.ProcessingBlock(*, key: ~typing.Annotated[str, _PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')], eb_id: ~typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^eb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])] | None, script: ~ska_sdp_config.entity.script.Script.Key, parameters: dict = <factory>, dependencies: list[~ska_sdp_config.entity.pb.FlowDependency | ~ska_sdp_config.entity.pb.PBDependency] = <factory>)[source]

Processing block entity.

Collects configuration information relating to a processing job for the SDP. This might be either real-time (supporting a running observation) or batch (to process data after the fact).

Actual execution of processing steps will be performed by a (parameterised) processing script interpreting processing block information.

dependencies: Annotated[list[FlowDependency | PBDependency], FieldInfo(annotation=NoneType, required=False, default_factory=list)]

Dependencies of this Processing Block on other Processing Blocks and Flows.

eb_id: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^eb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])] | None

ID of the Execution Block associated with this Processing Block, if any.

key: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])]

The primary key to this entity.

parameters: Annotated[dict, FieldInfo(annotation=NoneType, required=False, default_factory=dict)]

Parameters for the Processing Block.

script: Key

The script that will execute this Processing Block.

Deployment

Deployment configuration entities.

class ska_sdp_config.entity.deployment.Deployment(*, key: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9\\-]{1,96}$')], kind: Literal['helm', 'slurm'], args: dict)[source]

Deployment entity.

Collects configuration information relating to a cluster configuration change.

args: dict

A dictionary of values used to customise the deployment. For Helm deployments, these are Helm values. For Slurm deployments, they are used to construct a Slurm script.

key: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[A-Za-z0-9\\-]{1,96}$')])]

The primary key to this entity.

kind: Literal['helm', 'slurm']

The kind of deployment, currently “helm” and “slurm” are supported.

Script

Script model.

class ska_sdp_config.entity.script.Script(*, key: ~ska_sdp_config.entity.script.Script.Key, image: ~typing.Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_:\\./-]+$')], parameters: dict = <factory>, sdp_version: ~types.Annotated[str | None, _PydanticGeneralMetadata(pattern='^(?:[><]=?|==)\\d+\\.\\d+\\.\\d+(?:,\\s*(?:[><]=?|==)\\d+\\.\\d+\\.\\d+)*$')] = None, resources: list[~ska_sdp_config.entity.script.ScriptResources] = [])[source]

An SDP Script.

class Key(*, kind: Annotated[str, _PydanticGeneralMetadata(pattern='^(realtime)|(batch)$')], name: Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_-]+$')], version: Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')])[source]

An SDP Script primary key.

kind: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^(realtime)|(batch)$')])]

The kind of this script (realtime or batch).

name: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_-]+$')])]

The name of this script.

version: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')])]

The version of this script.

image: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_:\\./-]+$')])]

The OCI image used to launch this script.

key: Annotated[Key, FieldInfo(annotation=NoneType, required=True, exclude=True)]

The primary key to this entity.

parameters: Annotated[dict, FieldInfo(annotation=NoneType, required=False, default_factory=dict)]

Parameters for the script.

resources: list[ScriptResources]
sdp_version: Annotated[str | None, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^(?:[><]=?|==)\\d+\\.\\d+\\.\\d+(?:,\\s*(?:[><]=?|==)\\d+\\.\\d+\\.\\d+)*$')])]

The range of SDP versions this script is compatible with.
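The pattern on `sdp_version` accepts comma-separated comparisons against three-part versions. The sketch below copies that pattern and checks a few invented strings; it only tests the syntax and does not evaluate whether a given SDP version falls inside the range. `is_valid_range` is a hypothetical helper.

```python
import re

# Pattern copied from the Script.sdp_version annotation above.
SDP_VERSION = re.compile(
    r"^(?:[><]=?|==)\d+\.\d+\.\d+(?:,\s*(?:[><]=?|==)\d+\.\d+\.\d+)*$"
)


def is_valid_range(spec: str) -> bool:
    """Return True if the string matches the documented range syntax."""
    return SDP_VERSION.fullmatch(spec) is not None


for spec in (">=0.21.0", ">=0.21.0, <1.0.0", "0.21.0", "~=0.21.0", ">=0.21"):
    print(repr(spec), is_valid_range(spec))
```

Note that a bare version such as `0.21.0` is rejected: every clause needs one of the operators `>`, `>=`, `<`, `<=` or `==`.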

class ska_sdp_config.entity.script.ScriptResources(*, kind: Annotated[Literal['performance-buffer-storage', 'capacity-buffer-storage', 'processing-node'], _PydanticGeneralMetadata(pattern='^[a-z-]{1,96}$')], name: Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_-]+$')], phases: list[str] = [], quantity: str)[source]

The Resources estimated to be required by an SDP Script.

kind: Annotated[Literal['performance-buffer-storage', 'capacity-buffer-storage', 'processing-node'], FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-z-]{1,96}$')])]

The kind of resource.

name: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_-]+$')])]

The name of the resource definition.

phases: list[str]

The phases that this applies to.

quantity: str

The quantity as a Python-parsable expression.

classmethod quantity_valid_syntax(quantity_definition: str) str[source]

Ensure the quantity is specified as a Python-parsable expression.
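One way to check that a string is a parsable Python expression is `ast.parse` in `eval` mode. The sketch below is an assumption about how such a check could look, not the body of `quantity_valid_syntax`; the variable name `n_channels` in the sample is invented.

```python
import ast


def check_quantity_syntax(quantity_definition: str) -> str:
    """Return the string unchanged if it parses as a Python expression."""
    try:
        # mode="eval" accepts a single expression and rejects statements.
        ast.parse(quantity_definition, mode="eval")
    except SyntaxError as exc:
        raise ValueError(
            f"quantity is not a valid expression: {quantity_definition!r}"
        ) from exc
    return quantity_definition


print(check_quantity_syntax("2 * n_channels"))
```

Parsing only checks syntax: names such as `n_channels` are not resolved and nothing is evaluated.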

Flow

Flow entity Model and related sub-entities.

ska_sdp_config.entity.flow.ChannelAddressMapping

A contiguous, ordered mapping of observation channels to receivers.

dict[start_channel, (receiver_id, port_start, port_increment)], e.g.

>>> from pydantic import TypeAdapter
>>> from ska_sdp_config.entity.flow import ChannelAddressMapping
>>> channel_map = TypeAdapter(ChannelAddressMapping).validate_python({
...     0: (0, 8000, 1),
...     400: (1, 8000, 1),
...     800: (2, 8000, 1),
... })

ska_sdp_config.entity.flow.ChannelMap

A contiguous, ordered mapping of channels to a value. Tuples represent (start_channel, value), e.g.

>>> from ipaddress import IPv4Address
>>> from pydantic import TypeAdapter
>>> from ska_sdp_config.entity.flow import ChannelMap
>>> channel_map = TypeAdapter(ChannelMap[IPv4Address]).validate_python([
...     (0, '192.168.0.1'),
...     (400, '192.168.0.2')
... ])

alias of Sequence[tuple[Annotated[int, Ge(ge=0)], _T]]
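A lookup against such a map can be sketched with `bisect`. This assumes each entry applies from its start channel up to, but not including, the next entry's start channel, with the last entry open-ended; `lookup` is a hypothetical helper and not part of ska_sdp_config.

```python
from bisect import bisect_right
from typing import Sequence, TypeVar

T = TypeVar("T")


def lookup(channel_map: Sequence[tuple[int, T]], channel: int) -> T:
    """Return the value whose start channel is the closest one <= channel."""
    starts = [start for start, _ in channel_map]  # assumed sorted ascending
    index = bisect_right(starts, channel) - 1
    if index < 0:
        raise KeyError(f"channel {channel} precedes the first entry")
    return channel_map[index][1]


channel_map = [(0, "192.168.0.1"), (400, "192.168.0.2")]
print(lookup(channel_map, 399))
print(lookup(channel_map, 400))
```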

class ska_sdp_config.entity.flow.DataProduct(*, kind: Literal['data-product'] = 'data-product', data_dir: PlainSerializer(func=str, return_type=str, when_used=always), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'path', 'type': 'string'})], _PydanticGeneralMetadata(union_mode='left_to_right')], paths: list[~pathlib.Annotated[~pathlib.PurePath, ~pydantic.functional_validators.PlainValidator(func=~ska_sdp_config.entity.common.path._validate_pure_path, json_schema_input_type=str), ~pydantic.functional_serializers.PlainSerializer(func=str, return_type=str, when_used=always), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'path', 'type': 'string'})]])[source]

Data Product flow entity e.g. Measurement Set.

data_dir: Annotated[PVCPath | AnyPurePath, Field(union_mode='left_to_right', description='Output directory of data products.')]
kind: Literal['data-product']
paths: list[AnyPurePath]

Channel mapping of output data products relative to the persistent volume directory.

class ska_sdp_config.entity.flow.DataProductPersist(*, kind: Literal['data-product-persist'] = 'data-product-persist', phase: Literal['SOLID', 'LIQUID', 'GAS'], expires_at: AwareDatetime | None = None)[source]

Data Product Persist flow entity.

Represents an external persistent storage endpoint.

expires_at: AwareDatetime | None

UTC timestamp for the data product archive to expire at.

If None, no archive expiry.

kind: Literal['data-product-persist']
phase: Literal['SOLID', 'LIQUID', 'GAS']

Level of resilience in archival heuristic.

class ska_sdp_config.entity.flow.DataQueue(*, kind: Literal['data-queue'] = 'data-queue', topics: Annotated[str, FieldInfo(annotation=NoneType, required=True, description='\n            A Kafka topic name. Legal characters including alphanumeric,\n            `-`, `_`, and `.`', metadata=[_PydanticGeneralMetadata(pattern='^[A-Za-z0-9-_.]+$')])] | Sequence[tuple[Annotated[int, Ge(ge=0)], Annotated[str, FieldInfo(annotation=NoneType, required=True, description='\n            A Kafka topic name. Legal characters including alphanumeric,\n            `-`, `_`, and `.`', metadata=[_PydanticGeneralMetadata(pattern='^[A-Za-z0-9-_.]+$')])]]], host: KafkaUrl, format: Literal['json', 'npy', 'npz', 'msgpack_numpy'])[source]

Data Queue flow entity e.g. Kafka topic.

format: Literal['json', 'npy', 'npz', 'msgpack_numpy']

The data encoding format.

host: KafkaUrl

The host URL of the data queue service.

kind: Literal['data-queue']
topics: KafkaTopicName | ChannelMap[KafkaTopicName]

The topic names.

class ska_sdp_config.entity.flow.Dependency(*, key: Key, expiry_time: Annotated[int, Ge(ge=-1)], description: str | None = None)[source]

Dependencies are used to lock flows, preventing deletion until the dependency is cleared.

class Key(*, pb_id: Annotated[str, _PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')], kind: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')], name: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')], origin: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}')])[source]

A Dependency primary key.

kind: Annotated[str, Field(pattern='^[A-Za-z0-9-]{1,96}$', description='\n                    The kind of sink this flow moves data to.\n                    In flow, this is post-initialized with sink kind.\n                    provided.')]
origin: Annotated[str, Field(pattern='^[A-Za-z0-9-]{1,96}', description='\n                    The name of the origin component that issued this lock.\n                    This could also be a processing block.')]
description: str | None

A free form description.

expiry_time: Annotated[int, Ge(-1)]

After this time the dependency should be released. The time is specified in seconds. A value of -1 means the dependency never expires.

key: Key

The reservation key.

class ska_sdp_config.entity.flow.Display(*, kind: Literal['display'] = 'display', widget: str, endpoint: AnyUrl)[source]

Data Display flow entity e.g. QA Display.

endpoint: AnyUrl

The display URL endpoint.

kind: Literal['display']
widget: str

The type of display widget.

class ska_sdp_config.entity.flow.Flow(*, key: Key, sink: SharedMem | DataQueue | Display | DataProduct | DataProductPersist | TangoAttribute | TangoAttributeMap | SpeadStream, sources: list[FlowSource], data_model: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9\\,\\[\\]]+$')], expiry_time: int = -1)[source]

Flow datastream entity.

Contains configuration information relating to streaming data connections related to a processing job for SDP. Flows are associated with and persist for the lifetime of a ProcessingBlock.

class Key(*, pb_id: Annotated[str, _PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')], kind: Annotated[str | None, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')] = None, name: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')])[source]

An SDP Flow primary key.

kind: Annotated[str | None, Field(pattern='^[A-Za-z0-9-]{1,96}$', description='The kind of sink this flow moves data to.')]
name: Annotated[str, Field(pattern='^[A-Za-z0-9-]{1,96}$', description='The name of this flow.')]
pb_id: ProcessingBlockId

The ID of the Processing Block that this flow belongs to.

data_model: Annotated[str, Field(pattern='^[A-Za-z0-9\\,\\[\\]]+$', description='The DataModel that the flow data represents.')]
expiry_time: int

The amount of time in seconds to wait to expire. Negative numbers mean never expire.

key: Annotated[Key, Field(exclude=True)]

The primary key to this entity.

model_post_init(context: Any, /) None[source]

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

sink: Annotated[SharedMem | DataQueue | Display | DataProduct | DataProductPersist | TangoAttribute | TangoAttributeMap | SpeadStream, Field(discriminator='kind', description='The kind of sink this flow moves data to.')]
sources: list[FlowSource]

A list containing sources of flow data.

class ska_sdp_config.entity.flow.FlowSource[source]

A generic flow source pointing to one of:
- an existing flow entry in the configuration database
- an external source, such as a Tango attribute
- an external URL

function: str | None

Python function, class or OCI image that will flow data from source to sink

parameters: dict | None

Function, class or OCI image parameters

uri: Annotated[Flow.Key | AttributeTRL | AnyTRL | AnyUrl, Field(union_mode='left_to_right', description='\n                Reference to the data source:\n                - Flow.Key: key of a data flow entry\n                - TangoAttributeUrl: tango attribute reference\n                - AnyURL: url of any other kind\n                ')]
ska_sdp_config.entity.flow.ReceiverId

SPEAD Receiver ID.

alias of Annotated[int, Ge(ge=0)]

class ska_sdp_config.entity.flow.SharedMem(*, kind: Literal['sharedmem'] = 'sharedmem', impl: Literal['plasma'], host_path: Annotated[PurePath, PlainValidator(func=_validate_pure_path, json_schema_input_type=str), PlainSerializer(func=str, return_type=str, when_used=always)])[source]

Shared Memory flow entity e.g. Plasma.

host_path: AnyPurePath

The Path of the shared memory socket.

impl: Literal['plasma']

The shared memory implementation.

kind: Literal['sharedmem']
class ska_sdp_config.entity.flow.SpeadStream(*, kind: Literal['spead'] = 'spead', channel_map: dict[Annotated[str, FieldInfo(annotation=NoneType, required=True, description='Observation ScanType ID.', metadata=[_PydanticGeneralMetadata(pattern='^[a-z0-9-:]+$')])], dict[Annotated[str, FieldInfo(annotation=NoneType, required=True, description='Observation Beam ID.', metadata=[_PydanticGeneralMetadata(pattern='^[a-z0-9]+$')])], dict[Annotated[int, Ge(ge=0)], tuple[Annotated[int, Ge(ge=0)], Annotated[int, Ge(ge=0)], Annotated[int, Ge(ge=0)]]]]], receiver_version: str, streams: int = 1, transport_protocol: Literal['tcp', 'udp'] = 'udp', continuous_mode: bool = False)[source]

A SPEAD flow entity.

channel_map: dict[ScanTypeId, dict[BeamId, ChannelAddressMapping]]

The channel to network address mapping for scans and beams.

continuous_mode: bool

Flag for enabling receivers to re-create the streams and resume receiving data after every stream has reached its end of stream.

property instances: int

Number of receiver instances required for all scans.

kind: Literal['spead']
receiver_version: str

Receiver OCI image version.

streams: int

Total number of streams per receiver.

transport_protocol: Literal['tcp', 'udp']

The SPEAD stream transport protocol.

class ska_sdp_config.entity.flow.TangoAttribute(*, kind: Literal['tango'] = 'tango', attribute_url: AttributeTRL, dtype: Literal['DevEncoded', 'DevString', 'DevBoolean', 'DevUChar', 'DevUShort', 'DevULong', 'DevULong64', 'DevShort', 'DevLong', 'DevLong64', 'DevFloat', 'DevDouble'], max_dim_x: Annotated[int, Ge(ge=0)] = 0, max_dim_y: Annotated[int, Ge(ge=0)] = 0, default_value: tuple[str, bytes] | int | float | bool | str | None = None)[source]

Tango attribute flow entity.

This entity directs data to flow into a single Tango attribute.

attribute_trl: Annotated[AttributeTRL, Field(alias='attribute_url', description='\n                Tango attribute TRL.\n\n                e.g. `"tango://mid-sdp/subarray/01/some_attribute"`.\n                ')]
property attribute_url: TangoAttributeUrl

Fully qualified tango attribute URL.

e.g. “tango://tangodb:10000/mid-sdp/subarray/01/some_attribute”.

default_value: TangoDataType | None

Initial attribute value, cast to the configured dtype to allow support for non-JSON Tango types.

If None, the attribute is initialized with quality ATTR_INVALID.

dtype: TangoArgType

Attribute data type corresponding to a tango.ArgType.

kind: Literal['tango']
max_dim_x: NonNegativeInt

Size of the slowest changing dimension.

max_dim_y: NonNegativeInt

Size of the fastest changing dimension.

classmethod warn_deprecated(values)[source]

Warn about deprecations during validation.

class ska_sdp_config.entity.flow.TangoAttributeMap(*, kind: Literal['tangomap'] = 'tangomap', attributes: list[tuple[TangoAttribute, DataQuery]])[source]

Tango attribute map flow entity.

This entity directs data to flow into multiple Tango attributes.

class DataQuery(*, when: str | None = None, select: str = '@')[source]

Decomposed JMESPath query for locating data within a data model.

select: str

JMESPath select query clause on the flow data model.

when: str | None

JMESPath when query clause on the flow data model.

attributes: list[tuple[TangoAttribute, DataQuery]]

Mapping of TangoAttributes with unique attribute URL to a data query.

kind: Literal['tangomap']
model_post_init(context: Any, /) None[source]

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Resource Management

Entities related to resource management.

class ska_sdp_config.entity.resource_management.Allocation(*, key: Key, quantity: Annotated[int, Ge(ge=0)] = 0, tags: list[str] = [], instances: list[str] = [], resource_link: Key | None = None, request_link: Key | None = None)[source]

Allocation for a resource request, created by the processing controller.

class Key(*, pb_id: Annotated[str, _PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')], kind: Annotated[str, _PydanticGeneralMetadata(pattern='performance-buffer-storage|capacity-buffer-storage|processing-node')], name: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')])[source]

A request primary key. No overrides. It needs to be redefined here for Key pattern validation.

key: Key

The primary key to this entity.

request_link: Key | None

Key to the Request this Allocation fulfills.

resource_link: Key | None

Key to the Resource this Allocation is taken from.

validate_data() Allocation[source]

Run post-init validation.

class ska_sdp_config.entity.resource_management.Request(*, key: Key, quantity: Annotated[int, Ge(ge=0)] = 0, tags: list[str] = [], instances: list[str] = [], phases: list[Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^\\d+-[a-zA-Z]+$')])]] = <factory>)[source]

Indicates a request for a resource. Created by processing scripts to request resources of a certain kind.

class Key(*, pb_id: Annotated[str, _PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')], kind: Annotated[str, _PydanticGeneralMetadata(pattern='performance-buffer-storage|capacity-buffer-storage|processing-node')], name: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')])[source]

A request primary key. No overrides. It needs to be redefined here for Key pattern validation.

key: Key

The primary key to this entity.

phases: Annotated[list[Annotated[str, Field(pattern=PHASES_NAME_PATTERN)]], Field(default_factory=list)]
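
The Request signature above prints the phase-name pattern as '^\\d+-[a-zA-Z]+$': one or more digits, a hyphen, then one or more ASCII letters. The following is a minimal sketch of that check using the standard re module. It is not the library's validation code. The sample phase names are hypothetical, and it assumes that PHASES_NAME_PATTERN has the value shown in the signature.

```python
import re

# Assumed to be the value of PHASES_NAME_PATTERN, as printed in the signature.
PHASES_NAME_PATTERN = r"^\d+-[a-zA-Z]+$"

# Hypothetical phase names used only for illustration.
candidates = ["1-staging", "20-processing", "staging", "1-stage-two", "3-"]

# Keep the names that satisfy the pattern.
valid = [name for name in candidates if re.search(PHASES_NAME_PATTERN, name)]
print(valid)
```

Names without a numeric prefix, with a second hyphen, or with an empty letter part are rejected by this pattern.
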
class ska_sdp_config.entity.resource_management.RequestAllocationKey(*, pb_id: Annotated[str, _PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')], kind: Annotated[str, _PydanticGeneralMetadata(pattern='performance-buffer-storage|capacity-buffer-storage|processing-node')], name: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')])[source]

Primary key for Request or Allocation.

kind: Annotated[str, Field(pattern=KIND_PATTERN)]

The kind of this particular request/allocation/resource.

name: Annotated[str, Field(pattern='^[A-Za-z0-9-]{1,96}$')]

Unique name of a request/allocation/resource of the given kind. For an allocation, the name must match the name of the corresponding request.

pb_id: ProcessingBlockId

The ID of the Processing Block that the request/allocation belongs to.
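
The three field patterns printed in the RequestAllocationKey signature can be tried out with the standard re module. The following is a minimal sketch, not the library's implementation: the helper check_key_fields and the sample values are hypothetical, and it assumes the patterns are applied with search semantics, so the unanchored kind pattern may match anywhere in the string.

```python
import re

# Patterns copied from the RequestAllocationKey signature.
PB_ID_PATTERN = r"^pb\-[a-z0-9]+\-[0-9]{8}\-[a-z0-9]+$"
KIND_PATTERN = "performance-buffer-storage|capacity-buffer-storage|processing-node"
NAME_PATTERN = r"^[A-Za-z0-9-]{1,96}$"


def check_key_fields(pb_id: str, kind: str, name: str) -> list[str]:
    """Return the names of the fields that fail their pattern (hypothetical helper)."""
    checks = {
        "pb_id": (PB_ID_PATTERN, pb_id),
        "kind": (KIND_PATTERN, kind),
        "name": (NAME_PATTERN, name),
    }
    # Assumption: patterns are searched rather than full-matched.
    return [
        field
        for field, (pattern, value) in checks.items()
        if re.search(pattern, value) is None
    ]


# Hypothetical sample values: the first set is well formed, the second is not.
ok = check_key_fields("pb-test-20240101-abc", "processing-node", "my-request-1")
bad = check_key_fields("pb_test", "gpu", "bad_name")
print(ok, bad)
```

An empty list means every field matched its pattern. Otherwise the list names the offending fields in declaration order.
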

class ska_sdp_config.entity.resource_management.Resource(*, key: Key, quantity: Annotated[int, Ge(ge=0)] = 0, tags: list[str] = [], instances: list[str] = [], information: dict | None = None)[source]

Indicates platform availability of a resource. The resource manager will update free space quantity and status on each cycle. Resource types can include, but are not limited to: buffer, CPU, memory, and nodes.

class Key(*, kind: Annotated[str, _PydanticGeneralMetadata(pattern='performance-buffer-storage|capacity-buffer-storage|processing-node')], name: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')])[source]

A resource primary key.

kind: Annotated[str, Field(pattern=KIND_PATTERN)]

The kind of this particular resource.

name: Annotated[str, Field(pattern='^[A-Za-z0-9-]{1,96}$')]

The name of this resource (e.g. a PVC of choice).

information: dict | None

Optional specifications such as contingency reserve.

key: Key

The primary key to this entity.

class ska_sdp_config.entity.resource_management.ResourceBaseModel(*, key: Annotated[str, _PydanticGeneralMetadata(pattern='.+')], quantity: Annotated[int, Ge(ge=0)] = 0, tags: list[str] = [], instances: list[str] = [])[source]

Shared model between resource management entities.

are_tags_subset_of(other_entity: ResourceBaseModel) bool[source]

Are the tags of this entity a subset of another?

instances: list[str]

List of identifiers of a certain resource type, e.g. node names, subnets, etc.

is_quantity_subset_of(other_entity: ResourceBaseModel) bool[source]

Is this entity’s quantity <= the quantity of another?

is_same_kind_as(other_entity: ResourceBaseModel) bool[source]

Does this entity share a kind with another one?

is_subset_of(other_entity: ResourceBaseModel) bool[source]

Is this entity a subset of another based on kind, quantity, and tags?

quantity: NonNegativeInt

Quantity of request/allocation/resource.

tags: list[str]

Tags specifying the sub-resource types. E.g. “cpu_node”, “gpu_node”.

property unit: str

Unit of the quantity.

validate_data() ResourceBaseModel[source]

Run post-init validation.
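
The subset checks described for ResourceBaseModel (are_tags_subset_of, is_quantity_subset_of, is_same_kind_as, is_subset_of) can be illustrated with a plain dataclass. This is a sketch of the semantics stated in the docstrings, not the library code: EntitySketch is a hypothetical stand-in, and storing the kind directly on the object (rather than deriving it from the key) is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class EntitySketch:
    """Hypothetical stand-in for ResourceBaseModel; not the library class."""

    kind: str  # assumption: the real model derives the kind from its key
    quantity: int = 0
    tags: list[str] = field(default_factory=list)

    def are_tags_subset_of(self, other: "EntitySketch") -> bool:
        # Every tag of this entity must also appear on the other entity.
        return set(self.tags).issubset(other.tags)

    def is_quantity_subset_of(self, other: "EntitySketch") -> bool:
        return self.quantity <= other.quantity

    def is_same_kind_as(self, other: "EntitySketch") -> bool:
        return self.kind == other.kind

    def is_subset_of(self, other: "EntitySketch") -> bool:
        # Combines the three checks named in the docstring: kind, quantity, tags.
        return (
            self.is_same_kind_as(other)
            and self.is_quantity_subset_of(other)
            and self.are_tags_subset_of(other)
        )


request = EntitySketch("processing-node", quantity=2, tags=["gpu_node"])
resource = EntitySketch("processing-node", quantity=8, tags=["cpu_node", "gpu_node"])
fits = request.is_subset_of(resource)
reverse = resource.is_subset_of(request)
print(fits, reverse)
```

In this sketch a request for two GPU nodes fits within a resource offering eight nodes tagged for both CPU and GPU use, while the reverse comparison fails on both quantity and tags.
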