Type Hints
Common types used through out ska-tango-base.
- class ska_tango_base.type_hints.BusProtocol[source]
A bus which emits signals and stores the most recently emitted value.
- register_observer(observer: ObserverProtocol) None[source]
Register the observer to be notified when a value is emitted for a signal.
Once registered, the observer’s
ObserverProtocol.notify_emission()method will be called asynchronously for every signal emitted on the bus. The observer remains registered until it is garbage collected.Example usage:
class MyObserver: def notify_emission(self, signal, value): if "temperature" in signal: print(f"Temperature: {value}") observer = MyObserver() bus.register_observer(observer) # Now receives all emissions
- Parameters:
observer – The observer to register
- register_transformer(transformer: TransformerProtocol) None[source]
Register a transformer to intercept and modify signal emissions.
The transformer’s
TransformerProtocol.transform_emission_cascade()method will be called for each emission, allowing it to modify, filter, or generate signals. Only one transformer can be registered at a time - registering a new transformer will replace the previous one.- Parameters:
transformer – The transformer to register
- mark_auto_store(signal: str) None[source]
Mark the signal to automatically store its emitted values on the bus.
When a signal is registered for auto-storage, the most recently emitted value is kept in memory and can be retrieved with
get_last_emitted_value(). This is useful for signals that represent state that needs to be accessed later without waiting for a new emission.Example usage:
# Register signals that represent component state self.shared_bus.mark_auto_store(".error_count") # Later, retrieve the last emitted values error_count = self.shared_bus.get_last_emitted_value(".error_count")
- Parameters:
signal – Absolute name of the signal
Note
This replaces the deprecated
store=Trueparameter ofemit(). New code should use this method instead ofemit(..., store=True).
- emit(signal: str, value: Any) None[source]
- emit(signal: str, value: Any, *, store: bool = False) None
Emit a new value for the signal.
Any observes registered for this signal are notified asynchronously by the background thread which is started by
start_thread().If a transformer is registered, it will be called to potentially modify the signal before notification.
If the signal is registered via
mark_auto_store(), the emitted value is also stored and can be retrieved later withget_last_emitted_value().Example usage:
self.shared_bus.mark_auto_store(".config") self.shared_bus.emit(".config", {"timeout": 30}) config = self.shared_bus.get_last_emitted_value(".config")
Note
If the internal queue is full, the emission may be discarded with a warning logged. This indicates that observers cannot keep up with the emission rate.
- Parameters:
signal – Absolute name of the signal
value – New value to emit
store – Deprecated since 1.7.0. Use
mark_auto_store()instead to mark signals for storage
- Raises:
ValueError – If signal name starts with “!” (reserved for internal use)
- get_last_emitted_value(signal: str) Any[source]
Get the last emission that was stored for the given signal.
- Parameters:
signal – Absolute name of the signal.
- Raises:
KeyError – If no value has been stored for the signal.
- delete_last_emitted_value(signal: str) None[source]
Delete the last emission that was stored for the given signal.
- Parameters:
signal – Absolute name of the signal.
- Raises:
KeyError – If no value has been stored for the signal.
- wait_for_thread(timeout: float | None = 5.0) None[source]
Synchronise the calling thread with the background thread.
When this function returns, all emissions sequenced before this call will have been processed by the background thread.
- wait_for_signal_value(signal: str, conditional: Callable[[Any], bool] | None = None, timeout: float | None = None) bool[source]
Wait for a value to be emitted for signal where conditional returns True.
Whenever a value is emitted for the signal, conditional(old_value, new_value) is called. The thread calling this method will block until conditional returns True or the timeout is reached. If conditional is None, this method will unblock on any value emitted for the signal.
- __init__(*args, **kwargs)
- class ska_tango_base.type_hints.CommandTrackerProtocol[source]
All things to do with commands.
- new_command(command_name: str, completed_callback: Callable[[], None] | None = None, started_callback: Callable[[], None] | None = None) str[source]
Create a new command.
- Parameters:
command_name – the command name
completed_callback – an optional callback for command completion
started_callback – an optional callback for when a command starts
- Returns:
a unique command id
- update_command_info(command_id: str, status: TaskStatus | None = None, progress: int | None = None, result: tuple[ResultCode, str] | None = None, exception: Exception | None = None) None[source]
Update status information on the command.
- Parameters:
command_id – the unique command id
status – the status of the asynchronous task
progress – the progress of the asynchronous task
result – the result of the completed asynchronous task
exception – any exception caught in the running task
- get_command_status(command_id: str) TaskStatus[source]
Return the current status of a running command.
- Parameters:
command_id – the unique command id
- Returns:
a status of the asynchronous task.
- property commands_in_queue: list[tuple[str, str]]
Return a list of commands in the queue.
- Returns:
a list of (command_id, command_name) tuples, ordered by when invoked.
- property command_statuses: list[tuple[str, TaskStatus]]
Return a list of command statuses.
- Returns:
a list of (command_id, status) tuples, ordered by when invoked.
- property command_progresses: list[tuple[str, int | str]]
Return a list of command progresses for commands in the queue.
- Returns:
a list of (command_id, progress) tuples, ordered by when invoked.
- property command_result: tuple[str, JSONData] | None
Return the result of the most recently completed command.
- Returns:
a (command_id, result) tuple. If no command has completed yet, then None.
- property command_exception: tuple[str, Exception] | None
Return the most recent exception, if any.
- Returns:
a (command_id, exception) tuple. If no command has raised an uncaught exception, then None.
- __init__(*args, **kwargs)
- class ska_tango_base.type_hints.LRCCallbackType[source]
Expected LRC callback signature.
The callback will be called with some combination of the following arguments:
status:TaskStatusprogress:intresult:Anyerror:tuple[DevError]
Each of the above arguments is optional and the callback must check which are present by testing them for None. The callback cannot assume that only one argument will be provided per call.
It must accept a generic **kwargs parameter for forwards compatibility.
- __init__(*args, **kwargs)
- class ska_tango_base.type_hints.LRCSubscriptionsProtocol[source]
LRC event subscriptions that is returned by invoke_lrc.
Unsubscribes from all events when the instance is deleted.
- property protocol_version: int
The LRC client-server protocol version used.
- Returns:
the protocol version.
- __init__(*args, **kwargs)
- class ska_tango_base.type_hints.ObserverProtocol[source]
An object which observes signals being emitted on a bus.
The observer must be registered with the bus first with
BusProtocol.register_observer().- notify_emission(signal: str, value: Any) None[source]
Notify the observer of an emission asynchronously.
While this function is being called a logger is available at
current_signal_bus_logger.get()which should be used (sparingly) for reporting problems.
- __init__(*args, **kwargs)
- class ska_tango_base.type_hints.ProgressCallbackType[source]
Structural subtyping protocol for a
progress_callback.A
progress_callbackshould be called with a single integer indicating the progress of the task.- __init__(*args, **kwargs)
- class ska_tango_base.type_hints.LockedEmissionContextProtocol[source]
An object for atomically updating a stored emission with thread-safe locking.
This protocol is yielded by
EmissionStoreManagerProtocol.lock_emission()and provides thread-safe access to a stored emission. All operations within the context are protected by a unique lock, ensuring atomic read-modify-write operations.Example usage:
def transform_emission_cascade(self, emissions, store_manager): emissions = super().transform_emission_cascade( emissions, store_manager ) with store_manager.lock_emission("counter") as counter: current_count = counter.get() counter.set(current_count + 1) return emissions
- set(value: Any) None[source]
Set the stored value for the locked emission.
- Parameters:
value – New value to store
- __init__(*args, **kwargs)
- class ska_tango_base.type_hints.EmissionStoreManagerProtocol[source]
Context manager for atomically updating a stored emission with thread-safe locking.
The
EmissionStoreManagerProtocol.lock_emission()method yields anLockedEmissionContextProtocolobject that provides thread-safe, atomic read-modify-write operations for a stored emission. All operations within the context are protected by a lock unique to the signal, ensuring that concurrent updates cannot interleave.Example usage:
def transform_emission_cascade(self, emissions, store_manager): emissions = super().transform_emission_cascade( emissions, store_manager ) with store_manager.lock_emission("counter") as counter: current_count = counter.get() counter.set(current_count + 1) return emissions
- lock_emission(signal: str) AbstractContextManager[LockedEmissionContextProtocol][source]
Get a locked context for atomically updating a stored emission.
This context manager provides thread-safe, atomic read-modify-write operations for a stored emission. All operations within the context are protected by a lock unique to the signal, ensuring that concurrent updates cannot interleave.
Should only be used within the
transform_emission_cascadefunction.Example usage:
def transform_emission_cascade(self, emissions, store_manager): emissions = super().transform_emission_cascade(emissions) with store_manager.lock_emission("counter") as counter: current_count = counter.get() counter.set(current_count + 1) return emissions
- Parameters:
signal – Absolute name of the signal for which to return a locked context
- Returns:
A context manager yielding an
LockedEmissionContextfor the signal- Raises:
TypeError – If the signal is marked for auto-storing of emissions
- __init__(*args, **kwargs)
- class ska_tango_base.type_hints.SharingObserverProtocol[source]
An object with access to an shared bus.
Bus to emit signals on.
- __init__(*args, **kwargs)
- class ska_tango_base.type_hints.TransformerProtocol[source]
An object which transforms a cascade of emissions before they are sent to observers.
Transformers allow you to intercept and modify signal emissions at the bus level. A transformer’s
transform_emission_cascade()method is called once perBusProtocol.emit()call, receiving all signals emitted in that cascade.The transformer can suppress emissions, modify values, or generate additional emissions. Only one transformer can be registered with a bus at a time via
BusProtocol.register_transformer().The transformer must be registered with the bus first with
BusProtocol.register_transformer().- transform_emission_cascade(emissions: dict[str, Any], store_manager: EmissionStoreManagerProtocol) dict[str, Any][source]
Transform a cascade of emissions.
This method is called for each
BusProtocol.emit()call, receiving all signals being emitted in that cascade. You can modify the emissions - change values, add new signals, or remove signals by not including them in the returned dict.The transformation is applied to the whole cascade of emissions at once, so the transformation can use the relationships between signals to determine how to transform them. Signals not included in the returned dict will be suppressed and not sent to observers. New signals can be added to generate derived emissions.
- Parameters:
emissions – the cascade of emissions to be transformed, as a dict mapping signal name to emitted value.
store_manager – a handle to the transformer, which provides access to the bus and allows for thread-safe locking of stored emissions.
- Returns:
the transformed cascade of emissions, as a dict mapping signal name to emitted value.
- __init__(*args, **kwargs)
- class ska_tango_base.type_hints.TaskCallbackType[source]
Structural subtyping protocol for a
task_callback.A
task_callbackwill be called with some combination of the following arguments:status:TaskStatusof the task.progress:intprogress of the task.result: JSON serialisable result of the task.exception:Exceptionraised from the task.
Each of the above arguments is optional and the callback must check which are present by testing them for None. The callback cannot assume that only one argument will be provided per call.
- __call__(status: TaskStatus | None = None, progress: int | None = None, result: JSONData | None = None, exception: Exception | None = None) None[source]
Call the callback.
- __init__(*args, **kwargs)
- class ska_tango_base.type_hints.TaskFunctionType[source]
Perform some task, managing the task’s status.
The task should call
task_callback(status=IN_PROGRESS)when it starts andtask_callback(status=COMPLETED, result=<JSONData>)when it finishes normally.It can periodically call
task_callback(progress=<int>)to update clients to its progress. Also, the task must periodically check thetask_abort_eventand promptly abort if it is set, returning once cleanup has been completed. To acknowledge that the task has been aborted, the task should calltask_callback(status=ABORTED, result=<JSONData).Note
Due to limitations of the python 3.10 type system, this protocol is not used for type checking. However, this protocol describes the interface assumed by ska-tango-base.
- Parameters:
task_callback – callback to notify when the task makes progress and of status changes
task_abort_event – set whenever the task is requested to abort
- __call__(*args: ~typing.~P, task_callback: ~ska_tango_base.type_hints.TaskCallbackType | None, task_abort_event: ~threading.Event | None, **kwargs: ~typing.~P) None[source]
Run the task.
- __init__(*args, **kwargs)
- class ska_tango_base.type_hints.SimpleTaskFunctionType[source]
Perform some task without managing the task’s status.
This task periodically calls
progress_callbackwith a value from 0-100 indicating the percentage completion of the task, and will periodically check thetask_abort_event. If thetask_abort_eventis set the task will abort itself and raiseTaskAborted.With the exception of
TaskAborted, this task does not raise an exception.Note
Due to limitations of the python 3.10 type system, this protocol is not used for type checking. However, this protocol describes the interface assumed by ska-tango-base.
- Parameters:
progress_callback – callback to notify when the task makes progress
task_abort_event – set whenever the task is requested to abort
- Raises:
TaskAborted – when
task_abort_eventis set- Returns:
a JSON encodable result indicating the success or failure of the task
- __call__(*args: ~typing.~P, progress_callback: ~ska_tango_base.type_hints.ProgressCallbackType, task_abort_event: ~threading.Event, **kwargs: ~typing.~P) JSONData[source]
Run the task.
- __init__(*args, **kwargs)
- class ska_tango_base.type_hints.TaskExecutorProtocol[source]
Protocol for a TaskExecutor.
- submit(func: Any, args: Any | None = None, kwargs: Any | None = None, is_cmd_allowed: Callable[[], bool] | None = None, task_callback: TaskCallbackType | None = None) tuple[TaskStatus, str][source]
Submit a new task.
- Parameters:
func – the task function to be executed.
args – positional arguments to the task function
kwargs – keyword arguments to the task function
is_cmd_allowed – sanity check for task execution
task_callback – the callback to be called when the status or progress of the task execution changes
- Returns:
(TaskStatus, message)
- abort(task_callback: TaskCallbackType | None = None) tuple[TaskStatus, str][source]
Tell this executor to abort execution.
- Parameters:
task_callback – callback for abort complete
- Returns:
tuple of task status & message
- __init__(*args, **kwargs)
- ska_tango_base.type_hints.EventDataType: TypeAlias = tango._tango.EventData | tango._tango.AttrConfEventData | tango._tango.DataReadyEventData | tango._tango.DevIntrChangeEventData
Tango event data.
- ska_tango_base.type_hints.EventCallbackCallable
A Tango event callback using the __call__ operator.
- class ska_tango_base.type_hints.EventCallbackPushEventProtocol[source]
A Tango event callback using the push_event method.
- __init__(*args, **kwargs)
- push_event(event: EventData | AttrConfEventData | DataReadyEventData | DevIntrChangeEventData) None[source]
Process the event data.
- ska_tango_base.type_hints.EventCallbackType: TypeAlias = collections.abc.Callable[[tango._tango.EventData | tango._tango.AttrConfEventData | tango._tango.DataReadyEventData | tango._tango.DevIntrChangeEventData], None] | ska_tango_base.type_hints.EventCallbackPushEventProtocol
An object that can be used to process Tango events.
- class ska_tango_base.type_hints.JSONData
A type hint for any JSON-encodable data.
JSONData = ( None | bool | int | float | str | list["JSONData"] # A list can contain more JSON-encodable data | dict[str, "JSONData"] # A dict must have str keys and JSON-encodable data | tuple["JSONData", ...] # A tuple can contain more JSON-encodable data )
- class ska_tango_base.type_hints.DevVarLongStringArrayType
A type hint for the DevVarLongStringArray Tango data type.
DevVarLongStringArrayType = tuple[list[ResultCode], list[str]]
- class ska_tango_base.type_hints.ReadAttrType
A type hint for attribute read methods which may return a Tango data triple.
AttrT = TypeVar("AttrT") ReadAttrType = AttrT | tuple[AttrT, float, AttrQuality]
Usage:
class MyDevice(Device): @attribute(dtype="DevLong") def myAttr(self) -> ReadAttrType[int]: return 0, time.now(), AttrQuality.ATTR_WARNING