#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
import contextlib
import contextvars
import logging
import queue
import sys
import threading
import time
import typing
import warnings
import weakref
if sys.version_info >= (3, 13):
from warnings import deprecated
else:
from typing_extensions import deprecated
import tango
from .. import type_hints
from ._signal import NoValue
# Logger used by a signal bus when the caller does not supply one.
module_logger = logging.getLogger("software_bus")

# Context variable exposing the logger of the signal bus whose background
# thread is currently executing; it falls back to ``module_logger`` when no
# bus thread has set it in the current context.
current_signal_bus_logger: contextvars.ContextVar[logging.Logger] = (
    contextvars.ContextVar("current_signal_bus_logger", default=module_logger)
)
class TimedOutError(Exception):
    """Timed out waiting for the signal bus background thread."""

    # Used when the class docstring is unavailable (e.g. stripped by
    # ``python -OO`` or missing on a subclass), so the exception never ends up
    # with ``None`` as its message.
    _DEFAULT_MESSAGE = "Timed out waiting for the signal bus background thread."

    def __init__(self) -> None:
        """
        Initialise the exception with the default error message.

        The message is the docstring of the concrete exception class, so a
        subclass can customise it simply by providing its own docstring.
        """
        super().__init__(type(self).__doc__ or self._DEFAULT_MESSAGE)
class _WaitForSignalObserver:
"""An observer for SignalBus.wait_for_signal_value."""
def __init__(
self,
signal: str,
conditional: typing.Callable[[typing.Any], bool],
event: threading.Event,
):
self._signal = signal
self._conditional = conditional
self._event = event
def notify_emission(self, signal: str, value: typing.Any) -> None:
"""
Unblock the `wait_for_signal_value` caller when `self._conditional` passes.
First checks the signal is the one waited on, then unblocks the event if the
value is desired as indicated by self._conditional.
"""
if signal == self._signal:
if self._conditional(value):
self._event.set()
class _LockedEmissionContext:
"""An object for atomically updating a stored emission with thread-safe locking."""
def __init__(self, bus: "_SignalBus", signal: str) -> None:
self._bus = bus
self._signal = signal
def set(self, value: typing.Any) -> None:
"""
Set the stored value for the locked emission.
:param value: New value to store
"""
self._bus._emission_store[self._signal] = value
def get(self) -> typing.Any:
"""Get the stored value for the locked emission."""
return self._bus.get_last_emitted_value(self._signal)
def delete(self) -> None:
"""Delete the stored value for the locked emission."""
self._bus.delete_last_emitted_value(self._signal)
class _EmissionStoreManager:
    """
    Hands out locked contexts for updating the emissions stored on a bus.

    :py:meth:`_EmissionStoreManager.lock_emission` yields a
    :py:class:`_LockedEmissionContext` while holding the lock that belongs to
    the requested signal, so a read-modify-write sequence performed inside the
    ``with`` block cannot interleave with another update that takes the same
    lock.
    """

    def __init__(self, bus: "_SignalBus") -> None:
        """
        Initialise the manager.

        :param bus: Bus whose emission store this manager gives access to
        """
        self._bus = bus

    @contextlib.contextmanager
    def lock_emission(self, signal: str) -> typing.Iterator[_LockedEmissionContext]:
        """
        Get a locked context for atomically updating a stored emission.

        The lock unique to ``signal`` is held for the whole duration of the
        ``with`` block, giving thread-safe, atomic read-modify-write access to
        the stored emission.

        Example usage:

        .. code-block:: python

            def transform_emission_cascade(self, emissions, store_manager):
                emissions = super().transform_emission_cascade(emissions)
                with store_manager.lock_emission("counter") as counter:
                    current_count = counter.get()
                    counter.set(current_count + 1)
                return emissions

        :param signal: Absolute name of the signal for which to return a locked
            context
        :returns: A context manager yielding a ``_LockedEmissionContext`` for
            the signal
        :raises TypeError: If the signal is marked for auto-storing of emissions
        """
        bus = self._bus
        if signal in bus._emission_store_set:
            raise TypeError(
                'The "lock_emission" context manager may not be used for signals '
                f'marked for auto-storing, such as signal "{signal}"'
            )

        # One re-entrant lock per signal, created on first use.
        signal_lock = bus._emission_store_locks.setdefault(signal, threading.RLock())
        with signal_lock:
            yield _LockedEmissionContext(bus, signal)
class _SignalBus:
"""
A software bus that notifies observers listening to signals.
The bus can :py:func:`emit()` values for a given signal. In order to
get notified when a value is emitted for any signals, an observer
can be registered with :py:func:`register_observer()`. Observers are stored
in a class :py:class:`weakref.WeakSet` and so must be kept alive by the
caller.
Each signal is identified by a user-provided string. When a value is emitted
for a given signal an "emission" is added to an
internal queue. This queue is serviced by a separate background thread,
which must be started with :py:func:`start_thread()`.
When background thread receives an emission from the internal queue it
will notify all the registered observer objects that still exist by
calling :py:func:`ObserverProtocol.notify_emission`. This notification occurs in an
unspecified order. Emissions are processed in the order they are received.
The internal queue has a maximum size and attempting to :py:func:`emit()` while the
queue is full will log a warning message before discarding the emission.
The expectation is that observers should return quickly compared to the rate
that signals are emitted, so the internal queue filling up should be considered
a misuse of ``SignalBus``. The limit is in place so that the failure mode is more
explicit than some queue growing and eating up all the memory on the system.
:param max_queue_size: Maximum size of the internal queue
:param name: name to use for the background thread
:param emit_timeout: how long to wait in seconds if the queue is full
"""
def __init__(
self,
logger: logging.Logger | None = None,
max_queue_size: int = 8196,
name: str = "",
emit_timeout: float = 10.0,
) -> None:
"""
Initialise the object.
:param logger: to use to for logging
:param max_queue_size: maximum number of emissions that can be stored
in the internal queue
"""
self._logger = module_logger if logger is None else logger
# We don't want to create circular references with this bus, so we only
# ever hold on to observers with weak references. If the observer goes
# away, we don't care and will just stop notifying it.
self._observers = weakref.WeakSet[type_hints.ObserverProtocol]()
self._transformer_ref: (
weakref.WeakMethod[type_hints.TransformerCallable] | None
) = None
self._emit_timeout = emit_timeout
self._blocking_emissions_lock = threading.Lock()
self._last_emission_queue_full_log = 0.0
self._blocking_emissions = 0
self._emission_queue = queue.Queue[tuple[str, typing.Any]](
maxsize=max_queue_size
)
self._emission_store_set: set[str] = set()
self._emission_store: dict[str, typing.Any] = {}
self._emission_store_locks: dict[str, threading.RLock] = {}
if name:
name += " "
self._thread = threading.Thread(
target=self._run_bus_thread,
name=f"{name}SignalBus Thread",
)
def register_transformer(self, transformer: type_hints.TransformerProtocol) -> None:
"""
Register a transformer to transform emissions before they are emitted.
The transformer's :py:meth:`TransformerProtocol.transform_emission_cascade`
method will be called for each emission, allowing it to modify, filter, or
generate new emissions. Only one transformer can be registered at a time -
registering a new transformer will replace the previous one.
:param transformer: The transformer to register
"""
self._transformer_ref = weakref.WeakMethod(
transformer.transform_emission_cascade
)
def register_observer(self, observer: type_hints.ObserverProtocol) -> None:
"""
Register an observer to be notified when a signal is emitted.
`observer.notify_emission` will be called for every signal emitted
on the `SignalBus`. It is the responsibility of the observer to
determine whether the signal is interesting.
If `register_observer` is called while the SignalBus background thread is
running, the observer will be registered after any queued emissions
(calls to `SignalBus.emit`). `wait_for_thread` is supplied for synchronising
with the background thread.
If the observer is destroyed it will automatically be unregistered.
:param observer: The observer to register.
"""
if self._thread.is_alive():
self._emission_queue.put(("!register", observer))
return
self._observers.add(observer)
def mark_auto_store(self, signal: str) -> None:
"""
Mark the signal to automatically store its emitted values on the bus.
When a signal is registered for auto-storage, the most recently emitted value
is kept in memory and can be retrieved with :py:meth:`get_last_emitted_value`.
This is useful for signals that represent state that needs to be accessed
later without waiting for a new emission.
Example usage:
.. code-block:: python
# Register signals that represent component state
self.shared_bus.mark_auto_store(".error_count")
# Later, retrieve the last emitted values
error_count = self.shared_bus.get_last_emitted_value(".error_count")
:param signal: Absolute name of the signal
.. note::
This replaces the deprecated ``store=True`` parameter of :py:meth:`emit`.
New code should use this method instead of ``emit(..., store=True)``.
"""
self._emission_store_set.add(signal)
_LOG_QUEUE_FULL_PERIOD = 10.0
@typing.overload
def emit(self, signal: str, value: typing.Any) -> None: ...
@typing.overload
@deprecated(
'The "store" keyword argument is deprecated since ska-tango-base 1.7.0. '
'To store the value being emitted, use the "mark_auto_store" method instead.',
)
def emit(self, signal: str, value: typing.Any, *, store: bool = False) -> None: ...
def emit(self, signal: str, value: typing.Any, *, store: bool = False) -> None:
"""
Emit a new value for the signal.
Any observes registered for this signal are notified asynchronously by the
background thread which is started by :py:func:`start_thread()`.
If a transformer is registered, it will be called to potentially modify the
signal before notification.
If the signal is registered via :py:meth:`mark_auto_store`, the emitted
value is also stored and can be retrieved later with
:py:meth:`get_last_emitted_value`.
Example usage:
.. code-block:: python
self.shared_bus.mark_auto_store(".config")
self.shared_bus.emit(".config", {"timeout": 30})
config = self.shared_bus.get_last_emitted_value(".config")
.. note::
If the internal queue is full, the emission may be discarded with a warning
logged. This indicates that observers cannot keep up with the emission rate.
:param signal: Absolute name of the signal
:param value: New value to emit
:param store: **Deprecated since 1.7.0.** Use :py:meth:`mark_auto_store`
instead to mark signals for storage
:raises ValueError: If signal name starts with "!" (reserved for internal use)
"""
if signal.startswith("!"):
raise ValueError(
f'Invalid signal "{signal}". Signals may not start with a "!"'
)
# TODO: DEPRECATED - Remove in next major release
if store:
self.mark_auto_store(signal)
# Call the transformer on the emission if one has been registered
cascade: dict[str, typing.Any] = {signal: value}
transformer_func = self._transformer_ref() if self._transformer_ref else None
if transformer_func:
cascade = transformer_func(cascade, _EmissionStoreManager(self))
# Put the cascade of emissions on the queue and store them if required
for sig, val in cascade.items():
store_lock = None
if val is NoValue:
warnings.warn(
'Assigning "NoValue" to a "Signal" is deprecated since '
"ska-tango-base 1.5.0. It will raise an exception in the next "
'major release. To delete the stored value, use the "del" keyword '
"on the signal instead. To change a linked attribute's quality to "
'"ATTR_INVALID", assign "None" instead.',
DeprecationWarning,
)
try:
self.delete_last_emitted_value(sig)
except KeyError:
pass
elif sig in self._emission_store_set:
store_lock = self._emission_store_locks.setdefault(
sig, threading.RLock()
)
store_lock.acquire()
try:
self._emission_queue.put((sig, val), block=False)
except queue.Full:
with self._blocking_emissions_lock:
self._blocking_emissions += 1
try:
self._emission_queue.put((sig, val), timeout=self._emit_timeout)
except queue.Full:
self._logger.exception(
"Observers cannot keep up with rate signals are being emitted. "
"Emission %r has been discarded after waiting %s seconds.",
(sig, val),
self._emit_timeout,
)
else:
if store_lock:
self._emission_store[sig] = val
else:
if store_lock:
self._emission_store[sig] = val
finally:
if store_lock:
store_lock.release()
with self._blocking_emissions_lock:
now = time.monotonic()
if (
now - self._last_emission_queue_full_log > self._LOG_QUEUE_FULL_PERIOD
and self._blocking_emissions != 0
):
prefix = "has"
suffix = ""
if self._blocking_emissions != 1:
prefix = f"and {self._blocking_emissions - 1} other emissions have"
if self._last_emission_queue_full_log != 0.0:
since_last_log = now - self._last_emission_queue_full_log
suffix = f" in the last {since_last_log} seconds"
self._logger.warning(
"Observers cannot keep up with rate signals "
"are being emitted. "
"Emission %r %s had to block waiting for "
"space in the queue%s.",
(signal, value),
prefix,
suffix,
)
self._last_emission_queue_full_log = now
self._blocking_emissions = 0
def get_last_emitted_value(self, signal: str) -> typing.Any:
"""
Get the last emission that was stored for the given signal.
:param signal: Absolute name of the signal.
:raises KeyError: If no value has been stored for the signal.
"""
with self._emission_store_locks.setdefault(signal, threading.RLock()):
return self._emission_store[signal]
def delete_last_emitted_value(self, signal: str) -> None:
"""
Delete the last emission that was stored for the given signal.
:param signal: Absolute name of the signal.
:raises KeyError: If no value has been stored for the signal.
"""
with self._emission_store_locks.setdefault(signal, threading.RLock()):
del self._emission_store[signal]
def start_thread(self) -> None:
"""Start the background thread to notify observers about emissions."""
self._thread.start()
def shutdown_thread(self) -> None:
"""
Shutdown the background thread.
This waits for the thread to finish processing remaining emissions.
If the background thread is not running, this does nothing.
"""
# Wait for all emissions to be processed so we don't miss any events
if self._thread.is_alive():
self._emission_queue.put(("!shutdown", None))
self._thread.join()
def wait_for_thread(self, timeout: float | None = 5.0) -> None:
"""
Synchronise the calling thread with the background thread.
When this function returns, all emissions sequenced before this call
will have been processed by the background thread.
:param timeout: Time to wait for the background thread. A timeout of
``None`` will wait forever.
:raises TimedOutError: If the timeout is exceeded while waiting for
the background thread.
"""
event = threading.Event()
self._emission_queue.put(("!fence", (event, None)))
if not event.wait(timeout=timeout):
raise TimedOutError()
def wait_for_signal_value(
self,
signal: str,
conditional: typing.Callable[[typing.Any], bool] | None = None,
timeout: float | None = None,
) -> bool:
"""
Wait for a value to be emitted for `signal` where `conditional` returns True.
Whenever a value is emitted for the signal, `conditional(old_value, new_value)`
is called. The thread calling this method will block until `conditional` returns
True or the timeout is reached. If `conditional` is None, this method will
unblock on any value emitted for the signal.
:param signal: The signal to wait for a value emission.
:param conditional: Callable used to check for the desired signal value.
:param timeout: Time to wait for signal value. A timeout of None (the default)
will wait forever.
:returns: True if a desired value was emitted for the signal within the timeout,
False otherwise.
"""
if conditional is None:
def default(*_: typing.Any) -> bool:
return True
conditional = default
# Event gets set() by _run_bus_thread
event = threading.Event()
# The observer will exist at least until it is added to self._observers by the
# background thread. If the timeout is longer than it takes to be registered,
# the only strong reference to the observer will be in this method, and when
# execution leaves this scope, the garbage collector will delete the observer.
observer = _WaitForSignalObserver(signal, conditional, event)
self.register_observer(observer)
return event.wait(timeout)
def __del__(self) -> None:
"""Delete the object."""
if self._thread.is_alive():
self.shutdown_thread()
def _process_emission(self, signal: str, value: typing.Any) -> None:
for obs in self._observers:
try:
obs.notify_emission(signal, value)
except Exception:
self._logger.exception(
(
"Observer %r threw an unexpected exception while "
+ "processing emission %r. Continuing with remaining observers."
),
obs,
(signal, value),
)
def _run_bus_thread(self) -> None:
current_signal_bus_logger.set(self._logger)
with tango.EnsureOmniThread():
while True:
signal, value = self._emission_queue.get()
if signal.startswith("!"):
match signal:
case "!shutdown":
break
case "!fence":
received, pause_request = value
received.set()
if pause_request is not None:
self._pause(*pause_request)
case "!register":
self._observers.add(value)
case _:
self._logger.error(
f'Invalid control signal "{signal}". Skipping.'
)
else:
self._process_emission(signal, value)
# Workarounds for pytango#688, see comment in SignalBusMixin.__init__()
def _pause(
self, should_unpause_now: typing.Callable[[], bool], max_duration: float
) -> None:
"""Pause thread until should_unpause_now returns True."""
end_timestamp = time.monotonic() + max_duration
while time.monotonic() < end_timestamp:
if should_unpause_now():
return
time.sleep(0.1)
self._logger.error(
"SignalBus background thread pause exceed "
f"maximum duration of {max_duration} ms."
)
def _pause_thread_until(
self,
callback: typing.Callable[[], bool],
max_pause_duration: int = 10000,
ack_timeout: int = 1000,
) -> None:
"""
Pause the background thread until the callback returns True.
This function will first clear the queue, before pausing the
background thread. Signals emitted after this function is called,
will not be processed until the background thread is unpaused.
The thread will emit and error and unpause if it exceeds the
maximum duration.
This function exists to workaround pytango#688 and should not
be used outside of ska-tango-base.
:param callback: Returns True when we should stop pausing
:param max_pause_duration: Maximum duration to pause for
:param ack_timeout: Timeout for pause acknowledgement
:returns: Once the queue has been cleared and the background
thread has acknowledged the pause request.
"""
event = threading.Event()
self._emission_queue.put(("!fence", (event, (callback, max_pause_duration))))
if not event.wait(timeout=ack_timeout):
raise TimedOutError()