Source code for ska_tango_base.software_bus._bus

#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
import contextlib
import contextvars
import logging
import queue
import sys
import threading
import time
import typing
import warnings
import weakref

if sys.version_info >= (3, 13):
    from warnings import deprecated
else:
    from typing_extensions import deprecated

import tango

from .. import type_hints
from ._signal import NoValue

module_logger = logging.getLogger("software_bus")

current_signal_bus_logger = contextvars.ContextVar[logging.Logger](
    "current_signal_bus_logger", default=module_logger
)


[docs] class TimedOutError(Exception): """Timed out waiting for the signal bus background thread."""
[docs] def __init__(self) -> None: """Initalise the exception with the default error message.""" super().__init__(self.__class__.__doc__)
class _WaitForSignalObserver: """An observer for SignalBus.wait_for_signal_value.""" def __init__( self, signal: str, conditional: typing.Callable[[typing.Any], bool], event: threading.Event, ): self._signal = signal self._conditional = conditional self._event = event def notify_emission(self, signal: str, value: typing.Any) -> None: """ Unblock the `wait_for_signal_value` caller when `self._conditional` passes. First checks the signal is the one waited on, then unblocks the event if the value is desired as indicated by self._conditional. """ if signal == self._signal: if self._conditional(value): self._event.set() class _LockedEmissionContext: """An object for atomically updating a stored emission with thread-safe locking.""" def __init__(self, bus: "_SignalBus", signal: str) -> None: self._bus = bus self._signal = signal def set(self, value: typing.Any) -> None: """ Set the stored value for the locked emission. :param value: New value to store """ self._bus._emission_store[self._signal] = value def get(self) -> typing.Any: """Get the stored value for the locked emission.""" return self._bus.get_last_emitted_value(self._signal) def delete(self) -> None: """Delete the stored value for the locked emission.""" self._bus.delete_last_emitted_value(self._signal) class _EmissionStoreManager: """ Context manager for atomically updating a stored emission with thread-safe locking. The :py:meth:`_EmissionStoreManager.lock_emission` method yields an :py:class:`_LockedEmissionContext` object that provides thread-safe, atomic read-modify-write operations for a stored emission. All operations within the context are protected by a lock unique to the signal, ensuring that concurrent updates cannot interleave. """ def __init__(self, bus: "_SignalBus") -> None: self._bus = bus @contextlib.contextmanager def lock_emission(self, signal: str) -> typing.Iterator[_LockedEmissionContext]: """ Get a locked context for atomically updating a stored emission. This context manager provides thread-safe, atomic read-modify-write operations for a stored emission. All operations within the context are protected by a lock unique to the signal, ensuring that concurrent updates cannot interleave. Example usage: .. code-block:: python def transform_emission_cascade(self, emissions, store_manager): emissions = super().transform_emission_cascade(emissions) with store_manager.lock_emission("counter") as counter: current_count = counter.get() counter.set(current_count + 1) return emissions :param signal: Absolute name of the signal for which to return a locked context :returns: A context manager yielding an ``LockedEmissionContext`` for the signal :raises TypeError: If the signal is marked for auto-storing of emissions """ if signal in self._bus._emission_store_set: raise TypeError( 'The "lock_emission" context manager may not be used for signals ' f'marked for auto-storing, such as signal "{signal}"' ) with self._bus._emission_store_locks.setdefault(signal, threading.RLock()): yield _LockedEmissionContext(self._bus, signal) class _SignalBus: """ A software bus that notifies observers listening to signals. The bus can :py:func:`emit()` values for a given signal. In order to get notified when a value is emitted for any signals, an observer can be registered with :py:func:`register_observer()`. Observers are stored in a class :py:class:`weakref.WeakSet` and so must be kept alive by the caller. Each signal is identified by a user-provided string. When a value is emitted for a given signal an "emission" is added to an internal queue. This queue is serviced by a separate background thread, which must be started with :py:func:`start_thread()`. When background thread receives an emission from the internal queue it will notify all the registered observer objects that still exist by calling :py:func:`ObserverProtocol.notify_emission`. This notification occurs in an unspecified order. Emissions are processed in the order they are received. The internal queue has a maximum size and attempting to :py:func:`emit()` while the queue is full will log a warning message before discarding the emission. The expectation is that observers should return quickly compared to the rate that signals are emitted, so the internal queue filling up should be considered a misuse of ``SignalBus``. The limit is in place so that the failure mode is more explicit than some queue growing and eating up all the memory on the system. :param max_queue_size: Maximum size of the internal queue :param name: name to use for the background thread :param emit_timeout: how long to wait in seconds if the queue is full """ def __init__( self, logger: logging.Logger | None = None, max_queue_size: int = 8196, name: str = "", emit_timeout: float = 10.0, ) -> None: """ Initialise the object. :param logger: to use to for logging :param max_queue_size: maximum number of emissions that can be stored in the internal queue """ self._logger = module_logger if logger is None else logger # We don't want to create circular references with this bus, so we only # ever hold on to observers with weak references. If the observer goes # away, we don't care and will just stop notifying it. self._observers = weakref.WeakSet[type_hints.ObserverProtocol]() self._transformer_ref: ( weakref.WeakMethod[type_hints.TransformerCallable] | None ) = None self._emit_timeout = emit_timeout self._blocking_emissions_lock = threading.Lock() self._last_emission_queue_full_log = 0.0 self._blocking_emissions = 0 self._emission_queue = queue.Queue[tuple[str, typing.Any]]( maxsize=max_queue_size ) self._emission_store_set: set[str] = set() self._emission_store: dict[str, typing.Any] = {} self._emission_store_locks: dict[str, threading.RLock] = {} if name: name += " " self._thread = threading.Thread( target=self._run_bus_thread, name=f"{name}SignalBus Thread", ) def register_transformer(self, transformer: type_hints.TransformerProtocol) -> None: """ Register a transformer to transform emissions before they are emitted. The transformer's :py:meth:`TransformerProtocol.transform_emission_cascade` method will be called for each emission, allowing it to modify, filter, or generate new emissions. Only one transformer can be registered at a time - registering a new transformer will replace the previous one. :param transformer: The transformer to register """ self._transformer_ref = weakref.WeakMethod( transformer.transform_emission_cascade ) def register_observer(self, observer: type_hints.ObserverProtocol) -> None: """ Register an observer to be notified when a signal is emitted. `observer.notify_emission` will be called for every signal emitted on the `SignalBus`. It is the responsibility of the observer to determine whether the signal is interesting. If `register_observer` is called while the SignalBus background thread is running, the observer will be registered after any queued emissions (calls to `SignalBus.emit`). `wait_for_thread` is supplied for synchronising with the background thread. If the observer is destroyed it will automatically be unregistered. :param observer: The observer to register. """ if self._thread.is_alive(): self._emission_queue.put(("!register", observer)) return self._observers.add(observer) def mark_auto_store(self, signal: str) -> None: """ Mark the signal to automatically store its emitted values on the bus. When a signal is registered for auto-storage, the most recently emitted value is kept in memory and can be retrieved with :py:meth:`get_last_emitted_value`. This is useful for signals that represent state that needs to be accessed later without waiting for a new emission. Example usage: .. code-block:: python # Register signals that represent component state self.shared_bus.mark_auto_store(".error_count") # Later, retrieve the last emitted values error_count = self.shared_bus.get_last_emitted_value(".error_count") :param signal: Absolute name of the signal .. note:: This replaces the deprecated ``store=True`` parameter of :py:meth:`emit`. New code should use this method instead of ``emit(..., store=True)``. """ self._emission_store_set.add(signal) _LOG_QUEUE_FULL_PERIOD = 10.0 @typing.overload def emit(self, signal: str, value: typing.Any) -> None: ... @typing.overload @deprecated( 'The "store" keyword argument is deprecated since ska-tango-base 1.7.0. ' 'To store the value being emitted, use the "mark_auto_store" method instead.', ) def emit(self, signal: str, value: typing.Any, *, store: bool = False) -> None: ... def emit(self, signal: str, value: typing.Any, *, store: bool = False) -> None: """ Emit a new value for the signal. Any observes registered for this signal are notified asynchronously by the background thread which is started by :py:func:`start_thread()`. If a transformer is registered, it will be called to potentially modify the signal before notification. If the signal is registered via :py:meth:`mark_auto_store`, the emitted value is also stored and can be retrieved later with :py:meth:`get_last_emitted_value`. Example usage: .. code-block:: python self.shared_bus.mark_auto_store(".config") self.shared_bus.emit(".config", {"timeout": 30}) config = self.shared_bus.get_last_emitted_value(".config") .. note:: If the internal queue is full, the emission may be discarded with a warning logged. This indicates that observers cannot keep up with the emission rate. :param signal: Absolute name of the signal :param value: New value to emit :param store: **Deprecated since 1.7.0.** Use :py:meth:`mark_auto_store` instead to mark signals for storage :raises ValueError: If signal name starts with "!" (reserved for internal use) """ if signal.startswith("!"): raise ValueError( f'Invalid signal "{signal}". Signals may not start with a "!"' ) # TODO: DEPRECATED - Remove in next major release if store: self.mark_auto_store(signal) # Call the transformer on the emission if one has been registered cascade: dict[str, typing.Any] = {signal: value} transformer_func = self._transformer_ref() if self._transformer_ref else None if transformer_func: cascade = transformer_func(cascade, _EmissionStoreManager(self)) # Put the cascade of emissions on the queue and store them if required for sig, val in cascade.items(): store_lock = None if val is NoValue: warnings.warn( 'Assigning "NoValue" to a "Signal" is deprecated since ' "ska-tango-base 1.5.0. It will raise an exception in the next " 'major release. To delete the stored value, use the "del" keyword ' "on the signal instead. To change a linked attribute's quality to " '"ATTR_INVALID", assign "None" instead.', DeprecationWarning, ) try: self.delete_last_emitted_value(sig) except KeyError: pass elif sig in self._emission_store_set: store_lock = self._emission_store_locks.setdefault( sig, threading.RLock() ) store_lock.acquire() try: self._emission_queue.put((sig, val), block=False) except queue.Full: with self._blocking_emissions_lock: self._blocking_emissions += 1 try: self._emission_queue.put((sig, val), timeout=self._emit_timeout) except queue.Full: self._logger.exception( "Observers cannot keep up with rate signals are being emitted. " "Emission %r has been discarded after waiting %s seconds.", (sig, val), self._emit_timeout, ) else: if store_lock: self._emission_store[sig] = val else: if store_lock: self._emission_store[sig] = val finally: if store_lock: store_lock.release() with self._blocking_emissions_lock: now = time.monotonic() if ( now - self._last_emission_queue_full_log > self._LOG_QUEUE_FULL_PERIOD and self._blocking_emissions != 0 ): prefix = "has" suffix = "" if self._blocking_emissions != 1: prefix = f"and {self._blocking_emissions - 1} other emissions have" if self._last_emission_queue_full_log != 0.0: since_last_log = now - self._last_emission_queue_full_log suffix = f" in the last {since_last_log} seconds" self._logger.warning( "Observers cannot keep up with rate signals " "are being emitted. " "Emission %r %s had to block waiting for " "space in the queue%s.", (signal, value), prefix, suffix, ) self._last_emission_queue_full_log = now self._blocking_emissions = 0 def get_last_emitted_value(self, signal: str) -> typing.Any: """ Get the last emission that was stored for the given signal. :param signal: Absolute name of the signal. :raises KeyError: If no value has been stored for the signal. """ with self._emission_store_locks.setdefault(signal, threading.RLock()): return self._emission_store[signal] def delete_last_emitted_value(self, signal: str) -> None: """ Delete the last emission that was stored for the given signal. :param signal: Absolute name of the signal. :raises KeyError: If no value has been stored for the signal. """ with self._emission_store_locks.setdefault(signal, threading.RLock()): del self._emission_store[signal] def start_thread(self) -> None: """Start the background thread to notify observers about emissions.""" self._thread.start() def shutdown_thread(self) -> None: """ Shutdown the background thread. This waits for the thread to finish processing remaining emissions. If the background thread is not running, this does nothing. """ # Wait for all emissions to be processed so we don't miss any events if self._thread.is_alive(): self._emission_queue.put(("!shutdown", None)) self._thread.join() def wait_for_thread(self, timeout: float | None = 5.0) -> None: """ Synchronise the calling thread with the background thread. When this function returns, all emissions sequenced before this call will have been processed by the background thread. :param timeout: Time to wait for the background thread. A timeout of ``None`` will wait forever. :raises TimedOutError: If the timeout is exceeded while waiting for the background thread. """ event = threading.Event() self._emission_queue.put(("!fence", (event, None))) if not event.wait(timeout=timeout): raise TimedOutError() def wait_for_signal_value( self, signal: str, conditional: typing.Callable[[typing.Any], bool] | None = None, timeout: float | None = None, ) -> bool: """ Wait for a value to be emitted for `signal` where `conditional` returns True. Whenever a value is emitted for the signal, `conditional(old_value, new_value)` is called. The thread calling this method will block until `conditional` returns True or the timeout is reached. If `conditional` is None, this method will unblock on any value emitted for the signal. :param signal: The signal to wait for a value emission. :param conditional: Callable used to check for the desired signal value. :param timeout: Time to wait for signal value. A timeout of None (the default) will wait forever. :returns: True if a desired value was emitted for the signal within the timeout, False otherwise. """ if conditional is None: def default(*_: typing.Any) -> bool: return True conditional = default # Event gets set() by _run_bus_thread event = threading.Event() # The observer will exist at least until it is added to self._observers by the # background thread. If the timeout is longer than it takes to be registered, # the only strong reference to the observer will be in this method, and when # execution leaves this scope, the garbage collector will delete the observer. observer = _WaitForSignalObserver(signal, conditional, event) self.register_observer(observer) return event.wait(timeout) def __del__(self) -> None: """Delete the object.""" if self._thread.is_alive(): self.shutdown_thread() def _process_emission(self, signal: str, value: typing.Any) -> None: for obs in self._observers: try: obs.notify_emission(signal, value) except Exception: self._logger.exception( ( "Observer %r threw an unexpected exception while " + "processing emission %r. Continuing with remaining observers." ), obs, (signal, value), ) def _run_bus_thread(self) -> None: current_signal_bus_logger.set(self._logger) with tango.EnsureOmniThread(): while True: signal, value = self._emission_queue.get() if signal.startswith("!"): match signal: case "!shutdown": break case "!fence": received, pause_request = value received.set() if pause_request is not None: self._pause(*pause_request) case "!register": self._observers.add(value) case _: self._logger.error( f'Invalid control signal "{signal}". Skipping.' ) else: self._process_emission(signal, value) # Workarounds for pytango#688, see comment in SignalBusMixin.__init__() def _pause( self, should_unpause_now: typing.Callable[[], bool], max_duration: float ) -> None: """Pause thread until should_unpause_now returns True.""" end_timestamp = time.monotonic() + max_duration while time.monotonic() < end_timestamp: if should_unpause_now(): return time.sleep(0.1) self._logger.error( "SignalBus background thread pause exceed " f"maximum duration of {max_duration} ms." ) def _pause_thread_until( self, callback: typing.Callable[[], bool], max_pause_duration: int = 10000, ack_timeout: int = 1000, ) -> None: """ Pause the background thread until the callback returns True. This function will first clear the queue, before pausing the background thread. Signals emitted after this function is called, will not be processed until the background thread is unpaused. The thread will emit and error and unpause if it exceeds the maximum duration. This function exists to workaround pytango#688 and should not be used outside of ska-tango-base. :param callback: Returns True when we should stop pausing :param max_pause_duration: Maximum duration to pause for :param ack_timeout: Timeout for pause acknowledgement :returns: Once the queue has been cleared and the background thread has acknowledged the pause request. """ event = threading.Event() self._emission_queue.put(("!fence", (event, (callback, max_pause_duration)))) if not event.wait(timeout=ack_timeout): raise TimedOutError()