Type Hints

Common types used throughout ska-tango-base.

class ska_tango_base.type_hints.BusProtocol[source]

A bus which emits signals and stores the most recently emitted value.

register_observer(observer: ObserverProtocol) None[source]

Register the observer to be notified when a value is emitted for a signal.

Once registered, the observer’s ObserverProtocol.notify_emission() method will be called asynchronously for every signal emitted on the bus. The observer remains registered until it is garbage collected.

Example usage:

class MyObserver:
    def notify_emission(self, signal, value):
        if "temperature" in signal:
            print(f"Temperature: {value}")


observer = MyObserver()
bus.register_observer(observer)  # Now receives all emissions

Parameters:

observer – The observer to register

register_transformer(transformer: TransformerProtocol) None[source]

Register a transformer to intercept and modify signal emissions.

The transformer’s TransformerProtocol.transform_emission_cascade() method will be called for each emission, allowing it to modify, filter, or generate signals. Only one transformer can be registered at a time - registering a new transformer will replace the previous one.

Parameters:

transformer – The transformer to register

mark_auto_store(signal: str) None[source]

Mark the signal to automatically store its emitted values on the bus.

When a signal is registered for auto-storage, the most recently emitted value is kept in memory and can be retrieved with get_last_emitted_value(). This is useful for signals that represent state that needs to be accessed later without waiting for a new emission.

Example usage:

# Register signals that represent component state
self.shared_bus.mark_auto_store(".error_count")

# Later, retrieve the last emitted values
error_count = self.shared_bus.get_last_emitted_value(".error_count")

Parameters:

signal – Absolute name of the signal

Note

This replaces the deprecated store=True parameter of emit(). New code should use this method instead of emit(..., store=True).

emit(signal: str, value: Any) None[source]
emit(signal: str, value: Any, *, store: bool = False) None

Emit a new value for the signal.

Any observers registered for this signal are notified asynchronously by the background thread, which is started by start_thread().

If a transformer is registered, it will be called to potentially modify the signal before notification.

If the signal is registered via mark_auto_store(), the emitted value is also stored and can be retrieved later with get_last_emitted_value().

Example usage:

self.shared_bus.mark_auto_store(".config")
self.shared_bus.emit(".config", {"timeout": 30})
config = self.shared_bus.get_last_emitted_value(".config")

Note

If the internal queue is full, the emission may be discarded with a warning logged. This indicates that observers cannot keep up with the emission rate.

Parameters:
  • signal – Absolute name of the signal

  • value – New value to emit

  • store – Deprecated since 1.7.0. Use mark_auto_store() instead to mark signals for storage.

Raises:

ValueError – If signal name starts with “!” (reserved for internal use)

get_last_emitted_value(signal: str) Any[source]

Get the last emission that was stored for the given signal.

Parameters:

signal – Absolute name of the signal.

Raises:

KeyError – If no value has been stored for the signal.

delete_last_emitted_value(signal: str) None[source]

Delete the last emission that was stored for the given signal.

Parameters:

signal – Absolute name of the signal.

Raises:

KeyError – If no value has been stored for the signal.
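The storage rules described for mark_auto_store(), emit(), get_last_emitted_value() and delete_last_emitted_value() can be pictured with a small stand-in. The sketch below is not the ska-tango-base implementation: ToyBus is a hypothetical class that leaves out observers, transformers, the background thread and the internal queue. It only illustrates when a value is stored and which exceptions the entries above name.

```python
from typing import Any


class ToyBus:
    """Hypothetical stand-in covering only the storage side of a bus."""

    def __init__(self) -> None:
        self._auto_store: set[str] = set()
        self._stored: dict[str, Any] = {}

    def mark_auto_store(self, signal: str) -> None:
        # Remember that emissions for this signal should be kept.
        self._auto_store.add(signal)

    def emit(self, signal: str, value: Any) -> None:
        if signal.startswith("!"):
            raise ValueError("signal names starting with '!' are reserved")
        # Only signals marked for auto-storage keep their last value.
        if signal in self._auto_store:
            self._stored[signal] = value

    def get_last_emitted_value(self, signal: str) -> Any:
        # Raises KeyError if nothing has been stored for the signal.
        return self._stored[signal]

    def delete_last_emitted_value(self, signal: str) -> None:
        # Raises KeyError if nothing has been stored for the signal.
        del self._stored[signal]


bus = ToyBus()
bus.mark_auto_store(".config")
bus.emit(".config", {"timeout": 30})  # stored
bus.emit(".other", 1)  # not marked, so not stored
config = bus.get_last_emitted_value(".config")
```

In this sketch, reading ".other" raises KeyError because that signal was never marked for auto-storage.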

wait_for_thread(timeout: float | None = 5.0) None[source]

Synchronise the calling thread with the background thread.

When this function returns, all emissions sequenced before this call will have been processed by the background thread.

wait_for_signal_value(signal: str, conditional: Callable[[Any], bool] | None = None, timeout: float | None = None) bool[source]

Wait for a value to be emitted for signal where conditional returns True.

Whenever a value is emitted for the signal, conditional(old_value, new_value) is called. The thread calling this method will block until conditional returns True or the timeout is reached. If conditional is None, this method will unblock on any value emitted for the signal.
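The blocking behaviour can be illustrated with a condition variable. This is a sketch under stated assumptions, not the library's code: ToySignalWaiter, record() and wait_for() are hypothetical names, the predicate takes a single value as in the type annotation of conditional, and values recorded before the wait starts also count as matches here, which the documentation above does not specify.

```python
import threading
from typing import Any, Callable, Optional


class ToySignalWaiter:
    """Hypothetical helper: block until a matching value is recorded."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._values: list[Any] = []

    def record(self, value: Any) -> None:
        # Store the value and wake up any waiting threads.
        with self._cond:
            self._values.append(value)
            self._cond.notify_all()

    def wait_for(
        self,
        conditional: Optional[Callable[[Any], bool]] = None,
        timeout: Optional[float] = None,
    ) -> bool:
        # With no conditional, any recorded value unblocks the caller.
        check = conditional or (lambda _value: True)
        with self._cond:
            # Returns False if the timeout expires before a match.
            return self._cond.wait_for(
                lambda: any(check(v) for v in self._values), timeout
            )


waiter = ToySignalWaiter()
# Record a value from another thread shortly after the wait begins.
threading.Timer(0.05, waiter.record, args=(42,)).start()
matched = waiter.wait_for(lambda value: value > 10, timeout=2.0)
timed_out = waiter.wait_for(lambda value: value < 0, timeout=0.05)
```

The first call returns once a value greater than 10 has been recorded; the second never sees a negative value and returns False when its timeout expires.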

__init__(*args, **kwargs)
class ska_tango_base.type_hints.CommandTrackerProtocol[source]

All things to do with commands.

new_command(command_name: str, completed_callback: Callable[[], None] | None = None, started_callback: Callable[[], None] | None = None) str[source]

Create a new command.

Parameters:
  • command_name – the command name

  • completed_callback – an optional callback for command completion

  • started_callback – an optional callback for when a command starts

Returns:

a unique command id

update_command_info(command_id: str, status: TaskStatus | None = None, progress: int | None = None, result: tuple[ResultCode, str] | None = None, exception: Exception | None = None) None[source]

Update status information on the command.

Parameters:
  • command_id – the unique command id

  • status – the status of the asynchronous task

  • progress – the progress of the asynchronous task

  • result – the result of the completed asynchronous task

  • exception – any exception caught in the running task

get_command_status(command_id: str) TaskStatus[source]

Return the current status of a running command.

Parameters:

command_id – the unique command id

Returns:

a status of the asynchronous task.

property commands_in_queue: list[tuple[str, str]]

Return a list of commands in the queue.

Returns:

a list of (command_id, command_name) tuples, ordered by when invoked.

property command_statuses: list[tuple[str, TaskStatus]]

Return a list of command statuses.

Returns:

a list of (command_id, status) tuples, ordered by when invoked.

property command_progresses: list[tuple[str, int | str]]

Return a list of command progresses for commands in the queue.

Returns:

a list of (command_id, progress) tuples, ordered by when invoked.

property command_result: tuple[str, JSONData] | None

Return the result of the most recently completed command.

Returns:

a (command_id, result) tuple. If no command has completed yet, then None.

property command_exception: tuple[str, Exception] | None

Return the most recent exception, if any.

Returns:

a (command_id, exception) tuple. If no command has raised an uncaught exception, then None.

__init__(*args, **kwargs)
class ska_tango_base.type_hints.LRCCallbackType[source]

Expected LRC callback signature.

The callback will be called with some combination of the following arguments:

  • status: TaskStatus

  • progress: int

  • result: Any

  • error: tuple[DevError]

Each of the above arguments is optional and the callback must check which are present by testing them for None. The callback cannot assume that only one argument will be provided per call.

It must accept a generic **kwargs parameter for forwards compatibility.

__call__(**kwargs: Any) None[source]

Call the callback.
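A callback matching this description might look like the sketch below. The name lrc_callback, the received list and the placeholder argument values are illustrative assumptions; real calls would pass a TaskStatus and Tango DevError objects rather than plain strings.

```python
from typing import Any

# Collected updates, kept only so the example has something to inspect.
received: list[dict[str, Any]] = []


def lrc_callback(
    status: Any = None,
    progress: Any = None,
    result: Any = None,
    error: Any = None,
    **kwargs: Any,  # accepted and ignored for forwards compatibility
) -> None:
    """Record whichever of the known arguments were supplied in this call."""
    update: dict[str, Any] = {}
    if status is not None:
        update["status"] = status
    if progress is not None:
        update["progress"] = progress
    if result is not None:
        update["result"] = result
    if error is not None:
        update["error"] = error
    received.append(update)


lrc_callback(progress=50)
# Several arguments may arrive together, along with ones added in the future.
lrc_callback(status="COMPLETED", result=[0, "ok"], future_field=1)
```

Testing each argument for None separately, rather than with if/elif, is what allows the callback to handle calls that carry more than one argument.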

__init__(*args, **kwargs)
class ska_tango_base.type_hints.LRCSubscriptionsProtocol[source]

The LRC event subscriptions object returned by invoke_lrc.

Unsubscribes from all events when the instance is deleted.

property command_id: str

The command ID.

Returns:

the command ID.

property protocol_version: int

The LRC client-server protocol version used.

Returns:

the protocol version.

__init__(*args, **kwargs)
unsubscribe_lrc_events() None[source]

Unsubscribe from LRC attributes’ events.

class ska_tango_base.type_hints.ObserverProtocol[source]

An object which observes signals being emitted on a bus.

The observer must be registered with the bus first with BusProtocol.register_observer().

notify_emission(signal: str, value: Any) None[source]

Notify the observer of an emission asynchronously.

While this function is being called a logger is available at current_signal_bus_logger.get() which should be used (sparingly) for reporting problems.

__init__(*args, **kwargs)
class ska_tango_base.type_hints.ProgressCallbackType[source]

Structural subtyping protocol for a progress_callback.

A progress_callback should be called with a single integer indicating the progress of the task.

__call__(progress: int) None[source]

Call the callback.

__init__(*args, **kwargs)
class ska_tango_base.type_hints.LockedEmissionContextProtocol[source]

An object for atomically updating a stored emission with thread-safe locking.

This protocol is yielded by EmissionStoreManagerProtocol.lock_emission() and provides thread-safe access to a stored emission. All operations within the context are protected by a unique lock, ensuring atomic read-modify-write operations.

Example usage:

def transform_emission_cascade(self, emissions, store_manager):
    emissions = super().transform_emission_cascade(
        emissions, store_manager
    )
    with store_manager.lock_emission("counter") as counter:
        current_count = counter.get()
        counter.set(current_count + 1)
    return emissions

set(value: Any) None[source]

Set the stored value for the locked emission.

Parameters:

value – New value to store

get() Any[source]

Get the stored value for the locked emission.

delete() None[source]

Delete the stored value for the locked emission.

__init__(*args, **kwargs)
class ska_tango_base.type_hints.EmissionStoreManagerProtocol[source]

Context manager for atomically updating a stored emission with thread-safe locking.

The EmissionStoreManagerProtocol.lock_emission() method yields a LockedEmissionContextProtocol object that provides thread-safe, atomic read-modify-write operations for a stored emission. All operations within the context are protected by a lock unique to the signal, ensuring that concurrent updates cannot interleave.

Example usage:

def transform_emission_cascade(self, emissions, store_manager):
    emissions = super().transform_emission_cascade(
        emissions, store_manager
    )
    with store_manager.lock_emission("counter") as counter:
        current_count = counter.get()
        counter.set(current_count + 1)
    return emissions

lock_emission(signal: str) AbstractContextManager[LockedEmissionContextProtocol][source]

Get a locked context for atomically updating a stored emission.

This context manager provides thread-safe, atomic read-modify-write operations for a stored emission. All operations within the context are protected by a lock unique to the signal, ensuring that concurrent updates cannot interleave.

Should only be used within the transform_emission_cascade function.

Example usage:

def transform_emission_cascade(self, emissions, store_manager):
    emissions = super().transform_emission_cascade(emissions, store_manager)
    with store_manager.lock_emission("counter") as counter:
        current_count = counter.get()
        counter.set(current_count + 1)
    return emissions

Parameters:

signal – Absolute name of the signal for which to return a locked context

Returns:

A context manager yielding a LockedEmissionContextProtocol for the signal

Raises:

TypeError – If the signal is marked for auto-storing of emissions
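To show why a per-signal lock makes the read-modify-write pattern safe, here is a self-contained sketch. ToyStoreManager and ToyLockedEmission are hypothetical stand-ins, not the ska-tango-base classes: they do not model the TypeError for auto-stored signals, and get() on a signal with no stored value simply raises KeyError here, which is an assumption.

```python
import threading
from contextlib import contextmanager
from typing import Any, Iterator


class ToyLockedEmission:
    """Hypothetical handle to one stored value, valid while its lock is held."""

    def __init__(self, store: dict[str, Any], signal: str) -> None:
        self._store = store
        self._signal = signal

    def get(self) -> Any:
        return self._store[self._signal]

    def set(self, value: Any) -> None:
        self._store[self._signal] = value

    def delete(self) -> None:
        del self._store[self._signal]


class ToyStoreManager:
    """Hypothetical store manager with one lock per signal name."""

    def __init__(self) -> None:
        self._store: dict[str, Any] = {}
        self._locks: dict[str, threading.Lock] = {}
        self._locks_guard = threading.Lock()

    @contextmanager
    def lock_emission(self, signal: str) -> Iterator[ToyLockedEmission]:
        # Create the per-signal lock on first use, guarded against races.
        with self._locks_guard:
            lock = self._locks.setdefault(signal, threading.Lock())
        with lock:
            yield ToyLockedEmission(self._store, signal)


manager = ToyStoreManager()
with manager.lock_emission("counter") as counter:
    counter.set(0)


def increment() -> None:
    for _ in range(1000):
        # get() and set() happen under the same lock, so no update is lost.
        with manager.lock_emission("counter") as counter:
            counter.set(counter.get() + 1)


threads = [threading.Thread(target=increment) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
with manager.lock_emission("counter") as counter:
    total = counter.get()
```

Because every increment holds the signal's lock across both the read and the write, four threads performing 1000 increments each leave the counter at 4000.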

__init__(*args, **kwargs)
class ska_tango_base.type_hints.SharingObserverProtocol[source]

An object with access to a shared bus.

shared_bus: BusProtocol

Bus to emit signals on.

property observer_prefix: str

Path from the root of the bus to this object.

__init__(*args, **kwargs)
class ska_tango_base.type_hints.TransformerProtocol[source]

An object which transforms a cascade of emissions before they are sent to observers.

Transformers allow you to intercept and modify signal emissions at the bus level. A transformer’s transform_emission_cascade() method is called once per BusProtocol.emit() call, receiving all signals emitted in that cascade.

The transformer can suppress emissions, modify values, or generate additional emissions. Only one transformer can be registered with a bus at a time via BusProtocol.register_transformer().

The transformer must be registered with the bus first with BusProtocol.register_transformer().

transform_emission_cascade(emissions: dict[str, Any], store_manager: EmissionStoreManagerProtocol) dict[str, Any][source]

Transform a cascade of emissions.

This method is called for each BusProtocol.emit() call, receiving all signals being emitted in that cascade. You can modify the emissions - change values, add new signals, or remove signals by not including them in the returned dict.

The transformation is applied to the whole cascade of emissions at once, so the transformation can use the relationships between signals to determine how to transform them. Signals not included in the returned dict will be suppressed and not sent to observers. New signals can be added to generate derived emissions.

Parameters:
  • emissions – the cascade of emissions to be transformed, as a dict mapping signal name to emitted value.

  • store_manager – a handle to the transformer, which provides access to the bus and allows for thread-safe locking of stored emissions.

Returns:

the transformed cascade of emissions, as a dict mapping signal name to emitted value.
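As a minimal sketch of the three operations described above (suppress, modify, derive), consider the class below. AlarmTransformer and the signal names ".debug", ".temperature" and ".temperature_alarm" are made up for illustration, the 30-degree threshold is arbitrary, and the store_manager argument is accepted but unused.

```python
from typing import Any


class AlarmTransformer:
    """Hypothetical transformer: suppress one signal and derive another."""

    def transform_emission_cascade(
        self, emissions: dict[str, Any], store_manager: Any
    ) -> dict[str, Any]:
        # Work on a copy so the caller's dict is left untouched.
        transformed = dict(emissions)
        # Suppress: a signal missing from the returned dict is not sent on.
        transformed.pop(".debug", None)
        # Derive: add a new signal computed from one in the cascade.
        if ".temperature" in transformed:
            transformed[".temperature_alarm"] = transformed[".temperature"] > 30
        return transformed


cascade = {".temperature": 35, ".debug": "noise"}
result = AlarmTransformer().transform_emission_cascade(cascade, store_manager=None)
```

Here result contains ".temperature" and the derived ".temperature_alarm", while ".debug" has been dropped.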

__init__(*args, **kwargs)
class ska_tango_base.type_hints.TaskCallbackType[source]

Structural subtyping protocol for a task_callback.

A task_callback will be called with some combination of the following arguments:

  • status: TaskStatus of the task.

  • progress: int progress of the task.

  • result: JSON serialisable result of the task.

  • exception: Exception raised from the task.

Each of the above arguments is optional and the callback must check which are present by testing them for None. The callback cannot assume that only one argument will be provided per call.

__call__(status: TaskStatus | None = None, progress: int | None = None, result: JSONData | None = None, exception: Exception | None = None) None[source]

Call the callback.

__init__(*args, **kwargs)
class ska_tango_base.type_hints.TaskFunctionType[source]

Perform some task, managing the task’s status.

The task should call task_callback(status=IN_PROGRESS) when it starts and task_callback(status=COMPLETED, result=<JSONData>) when it finishes normally.

It can periodically call task_callback(progress=<int>) to update clients on its progress. Also, the task must periodically check the task_abort_event and promptly abort if it is set, returning once cleanup has been completed. To acknowledge that the task has been aborted, the task should call task_callback(status=ABORTED, result=<JSONData>).

Note

Due to limitations of the Python 3.10 type system, this protocol is not used for type checking. However, this protocol describes the interface assumed by ska-tango-base.

Parameters:
  • task_callback – callback to notify when the task makes progress and of status changes

  • task_abort_event – set whenever the task is requested to abort

__call__(*args: P.args, task_callback: TaskCallbackType | None, task_abort_event: threading.Event | None, **kwargs: P.kwargs) None[source]

Run the task.
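The lifecycle described above could be sketched as follows. The TaskStatus enum here is a local stand-in with only the members the example needs, and count_task, its steps parameter and the string results are hypothetical.

```python
import enum
import threading
from typing import Any, Callable, Optional


class TaskStatus(enum.Enum):
    """Local stand-in for the real TaskStatus enum (assumption)."""

    IN_PROGRESS = enum.auto()
    COMPLETED = enum.auto()
    ABORTED = enum.auto()


def count_task(
    steps: int,
    *,
    task_callback: Optional[Callable[..., None]] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """Hypothetical task that reports its own status and progress."""

    def report(**kwargs: Any) -> None:
        # The callback is optional, so guard every notification.
        if task_callback is not None:
            task_callback(**kwargs)

    report(status=TaskStatus.IN_PROGRESS)
    for step in range(steps):
        # Check for an abort request before each unit of work.
        if task_abort_event is not None and task_abort_event.is_set():
            report(status=TaskStatus.ABORTED, result=f"aborted at step {step}")
            return
        report(progress=(step + 1) * 100 // steps)
    report(status=TaskStatus.COMPLETED, result="done")


calls: list[dict[str, Any]] = []
count_task(
    2,
    task_callback=lambda **kw: calls.append(kw),
    task_abort_event=threading.Event(),
)
```

After this run, calls holds an IN_PROGRESS status update, two progress updates (50 and 100) and a final COMPLETED update with its result.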

__init__(*args, **kwargs)
class ska_tango_base.type_hints.SimpleTaskFunctionType[source]

Perform some task without managing the task’s status.

This task periodically calls progress_callback with a value from 0-100 indicating the percentage completion of the task, and will periodically check the task_abort_event. If the task_abort_event is set the task will abort itself and raise TaskAborted.

With the exception of TaskAborted, this task does not raise an exception.

Note

Due to limitations of the Python 3.10 type system, this protocol is not used for type checking. However, this protocol describes the interface assumed by ska-tango-base.

Parameters:
  • progress_callback – callback to notify when the task makes progress

  • task_abort_event – set whenever the task is requested to abort

Raises:

TaskAborted – when task_abort_event is set

Returns:

a JSON encodable result indicating the success or failure of the task

__call__(*args: P.args, progress_callback: ProgressCallbackType, task_abort_event: threading.Event, **kwargs: P.kwargs) JSONData[source]

Run the task.
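By contrast with a task that manages its own status, a simple task only reports progress, returns a result and raises on abort. The sketch below assumes a local TaskAborted stand-in for the exception named above; copy_items and its items parameter are hypothetical.

```python
import threading
from typing import Callable


class TaskAborted(Exception):
    """Local stand-in for the TaskAborted exception (assumption)."""


def copy_items(
    items: list[int],
    *,
    progress_callback: Callable[[int], None],
    task_abort_event: threading.Event,
) -> list[int]:
    """Hypothetical simple task: copy items, reporting percentage progress."""
    copied: list[int] = []
    for index, item in enumerate(items, start=1):
        # Abort by raising, rather than by reporting a status.
        if task_abort_event.is_set():
            raise TaskAborted()
        copied.append(item)
        progress_callback(index * 100 // len(items))
    # The return value is the JSON-encodable result of the task.
    return copied


progress: list[int] = []
result = copy_items(
    [1, 2, 3, 4],
    progress_callback=progress.append,
    task_abort_event=threading.Event(),
)
```

The task never calls a task_callback itself; in this sketch the caller is left to translate the return value or a TaskAborted exception into whatever status reporting it needs.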

__init__(*args, **kwargs)
class ska_tango_base.type_hints.TaskExecutorProtocol[source]

Protocol for a TaskExecutor.

property max_executing_tasks: int

Get the maximum number of simultaneously executing tasks.

property max_queued_tasks: int

Get the maximum task queue size.

submit(func: Any, args: Any | None = None, kwargs: Any | None = None, is_cmd_allowed: Callable[[], bool] | None = None, task_callback: TaskCallbackType | None = None) tuple[TaskStatus, str][source]

Submit a new task.

Parameters:
  • func – the task function to be executed.

  • args – positional arguments to the task function

  • kwargs – keyword arguments to the task function

  • is_cmd_allowed – sanity check for task execution

  • task_callback – the callback to be called when the status or progress of the task execution changes

Returns:

(TaskStatus, message)

abort(task_callback: TaskCallbackType | None = None) tuple[TaskStatus, str][source]

Tell this executor to abort execution.

Parameters:

task_callback – callback for abort complete

Returns:

tuple of task status & message

shutdown() None[source]

Permanently shutdown the executor.

__init__(*args, **kwargs)
ska_tango_base.type_hints.EventDataType: TypeAlias = tango._tango.EventData | tango._tango.AttrConfEventData | tango._tango.DataReadyEventData | tango._tango.DevIntrChangeEventData

Tango event data.

ska_tango_base.type_hints.EventCallbackCallable

A Tango event callback using the __call__ operator.

class ska_tango_base.type_hints.EventCallbackPushEventProtocol[source]

A Tango event callback using the push_event method.

__init__(*args, **kwargs)
push_event(event: EventData | AttrConfEventData | DataReadyEventData | DevIntrChangeEventData) None[source]

Process the event data.

ska_tango_base.type_hints.EventCallbackType: TypeAlias = collections.abc.Callable[[tango._tango.EventData | tango._tango.AttrConfEventData | tango._tango.DataReadyEventData | tango._tango.DevIntrChangeEventData], None] | ska_tango_base.type_hints.EventCallbackPushEventProtocol

An object that can be used to process Tango events.
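Code that accepts either kind of callback has to decide how to invoke it. One way to do that is sketched below; dispatch_event and Recorder are hypothetical helpers, not part of ska-tango-base or PyTango, and plain strings stand in for Tango event data.

```python
from typing import Any


def dispatch_event(callback: Any, event: Any) -> None:
    """Deliver an event to either style of callback."""
    push_event = getattr(callback, "push_event", None)
    if callable(push_event):
        # Object style: the callback exposes a push_event method.
        push_event(event)
    else:
        # Callable style: the callback is invoked directly.
        callback(event)


class Recorder:
    """Hypothetical push_event-style callback that stores what it receives."""

    def __init__(self) -> None:
        self.events: list[Any] = []

    def push_event(self, event: Any) -> None:
        self.events.append(event)


seen: list[Any] = []
recorder = Recorder()
dispatch_event(seen.append, "event-1")  # plain callable
dispatch_event(recorder, "event-2")  # object with push_event
```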

class ska_tango_base.type_hints.JSONData

A type hint for any JSON-encodable data.

JSONData = (
   None
   | bool
   | int
   | float
   | str
   | list["JSONData"]  # A list can contain more JSON-encodable data
   | dict[str, "JSONData"]  # A dict must have str keys and JSON-encodable data
   | tuple["JSONData", ...]  # A tuple can contain more JSON-encodable data
)
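A runtime check that mirrors this recursive definition can be useful when validating task results. The function below is a sketch, not part of ska-tango-base; is_json_data is a hypothetical name, and it checks structure only, so it does not reject float values such as NaN that strict JSON disallows.

```python
from typing import Any


def is_json_data(value: Any) -> bool:
    """Return True if value fits the recursive JSONData shape above."""
    # Scalars allowed by the alias.
    if value is None or isinstance(value, (bool, int, float, str)):
        return True
    # Lists and tuples may contain any JSON-encodable data.
    if isinstance(value, (list, tuple)):
        return all(is_json_data(item) for item in value)
    # Dicts must have str keys and JSON-encodable values.
    if isinstance(value, dict):
        return all(
            isinstance(key, str) and is_json_data(item)
            for key, item in value.items()
        )
    return False


nested_ok = is_json_data({"a": [1, (2.0, None)], "b": "text"})
bad_key = is_json_data({1: "x"})
bad_value = is_json_data({"a": {1, 2}})
```

The nested dict passes, while the dict with an int key and the dict containing a set are both rejected.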
class ska_tango_base.type_hints.DevVarLongStringArrayType

A type hint for the DevVarLongStringArray Tango data type.

DevVarLongStringArrayType = tuple[list[ResultCode], list[str]]

class ska_tango_base.type_hints.ReadAttrType

A type hint for attribute read methods which may return a Tango data triple.

AttrT = TypeVar("AttrT")
ReadAttrType = AttrT | tuple[AttrT, float, AttrQuality]

Usage:

import time

class MyDevice(Device):
   @attribute(dtype="DevLong")
   def myAttr(self) -> ReadAttrType[int]:
      # Return a (value, timestamp, quality) triple
      return 0, time.time(), AttrQuality.ATTR_WARNING