Source code for ska_tango_base.software_bus._observer

#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
#
import functools
import inspect
import typing
from collections import defaultdict

from .. import type_hints
from .._autodoc_hacks import _TANGO_IS_MOCKED_BY_AUTODOC
from ._bus import current_signal_bus_logger
from ._signal import NoValue, Signal

ObserverT = typing.TypeVar("ObserverT", bound="Observer")
SharingObserverT = typing.TypeVar("SharingObserverT", bound="SharingObserver")
ValueT = typing.TypeVar("ValueT")


[docs] class ListenerMethod(typing.Protocol): """Method on an :py:class:`Observer` which can receive a signal.""" @property def __listen_to_signal__(self) -> typing.Any: """Signal to listen to.""" @__listen_to_signal__.setter def __listen_to_signal__(self, value: typing.Any) -> None: """Signal to listen to.""" def __call__(self, observer: ObserverT, value: typing.Any) -> None: """Respond to emission."""
class OnOwnerSharedBusDescriptor(typing.Protocol): """A descriptor that gets called when the bus is shared with its owner.""" def on_owner_shared_bus(self, owner: SharingObserverT) -> None: """Respond to a bus being shared with the owner.""" _ListenerDecorator: typing.TypeAlias = typing.Callable[ [typing.Callable[[ObserverT, typing.Any], None]], ListenerMethod ] # We are using a object here, rather than a function, to make it easier to # avoid circular references. If we just used a single function as an observer, # then users would have to be careful to make sure closures only hold weak # references to objects holding a reference to the bus. By making the observer # a class, we can manage this problem in the _SignalBus ourselves.
[docs] class Observer: """ An observer that handles different signals with different methods. Sub-classes can mark a method as a :py:const:`ListenerMethod` using the :py:func:`listen_to_signal()` decorator. :py:const:`ListenerMethod` will only be called for signals they are registered to. """ _static_listener_methods: typing.ClassVar[ dict[str | Signal[typing.Any], list[ListenerMethod]] ] @property def _listener_methods(self) -> dict[str, list[ListenerMethod]]: return typing.cast( dict[str, list[ListenerMethod]], self._static_listener_methods ) @classmethod def __init_subclass__(cls, **kwargs: typing.Any) -> None: """Gather all the listener methods on subclasses.""" super().__init_subclass__(**kwargs) listeners = defaultdict[typing.Any, list[ListenerMethod]](lambda: []) for key in dir(cls): obj = getattr(cls, key) if inspect.isroutine(obj) and hasattr(obj, "__listen_to_signal__"): listeners[obj.__listen_to_signal__].append( typing.cast(ListenerMethod, obj) ) cls._static_listener_methods = dict(listeners)
[docs] def notify_emission(self, signal: str, value: typing.Any) -> None: """Call all the listener methods for the given signal.""" for unbound_method in self._listener_methods.get(signal, []): try: unbound_method(self, value) except Exception: logger = current_signal_bus_logger.get() logger.exception( ( "Listener method %r threw an unexpected exception for %r. " + "Continuing with remaining listeners." ), unbound_method, value, )
@typing.overload def listen_to_signal(signal: str) -> _ListenerDecorator[ObserverT]: ... @typing.overload def listen_to_signal( signal: Signal[ValueT], ) -> _ListenerDecorator[SharingObserverT]: ... @typing.overload def listen_to_signal( signal: str, listener: typing.Callable[[ObserverT, typing.Any], None] ) -> ListenerMethod: ... @typing.overload def listen_to_signal( signal: Signal[ValueT], listener: typing.Callable[[SharingObserverT, typing.Any], None], ) -> ListenerMethod: ...
[docs] def listen_to_signal( signal: str | Signal[ValueT], listener: typing.Callable[[ObserverT, typing.Any], None] | None = None, ) -> ( _ListenerDecorator[ObserverT] | _ListenerDecorator[SharingObserverT] | ListenerMethod ): """ Mark a method as listening to a signal. This function will be called asynchronously whenever a value is emitted for the signal. For :py:class:`Observer` objects, only a `signal` specified as a `str` is supported, however, for :py:class:`SharingObserver` objects both `str` and :py:class:`Signal` objects are accepted. For a :py:class:`SharingObserver`, signal names are relative to the parent object. """ # This functools.partial trick allows us to use listen_to_signal as a # decorator: # # ``` # @listen_to_signal("signal") # def on_signal(self, old_value, new_value): # ... # ``` # and as a function: # ``` # listener = listen_to_signal("signal", method) # ``` if listener is None: return typing.cast( _ListenerDecorator[typing.Any], functools.partial(listen_to_signal, signal) ) # Used by Observer.__init_subclass__ to know that this method wants to # listen to this signal. listener = typing.cast(ListenerMethod, listener) listener.__listen_to_signal__ = signal return listener
[docs] class SharingObserver(Observer): """ An observer that shares its bus with sub-objects. When a :py:class:`~ska_tango_base.type_hints.BusProtocol` is assigned to :py:attr:`shared_bus` this will recursively set on all sub-objects which also inherit from :py:class:`SharingObserver`. Subclasses can be notified when a new :py:attr:`shared_bus` is available by overriding :py:meth:`on_new_shared_bus`. Each :py:class:`SharingObserver` object has a :py:attr:`observer_prefix` that prefixes any signals listened to by that object. The root object, where the bus was originally assigned, has an :py:attr:`observer_prefix` of "." and each sub-object has an :py:attr:`observer_prefix` based on the path to access that sub-object from the root object. For example: .. code :: python class SubObj(SharingObserver): bar = Signal[str]() @listen_to_signal(bar) def on_bar(self, value: str): print(value) class MySharer(SharingObserver): def __init__(self): self.foo = SubObj() self.shared_bus = SignalBus() self.shared_bus.start_thread() sharer = MySharer() assert sharer.observer_prefix == "." assert sharer.foo.observer_prefix == ".foo" Here, the ``sharer.foo.on_bar`` listener method will receive values emitted on the bus whenever ``sharer.foo.bar`` is set. """ _canonicalised_listener_methods: dict[str, list[ListenerMethod]] _on_owner_shared_bus_descriptors: typing.ClassVar[list[OnOwnerSharedBusDescriptor]] _shared_bus: type_hints.BusProtocol | None ROOT_OBSERVER_PREFIX: typing.ClassVar[str] = "."
[docs] def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None: """Initialise the device.""" self._shared_bus = None self._path_from_root = self.ROOT_OBSERVER_PREFIX super().__init__(*args, **kwargs)
@classmethod def __init_subclass__(cls, **kwargs: typing.Any) -> None: """Gather all the on share descriptors on subclasses.""" super().__init_subclass__(**kwargs) descriptors = [] if not _TANGO_IS_MOCKED_BY_AUTODOC: for key in dir(cls): obj = getattr(cls, key) if inspect.isroutine(getattr(obj, "on_owner_shared_bus", None)): descriptors.append(typing.cast(OnOwnerSharedBusDescriptor, obj)) cls._on_owner_shared_bus_descriptors = descriptors @property def observer_prefix(self) -> str: """Return the path from the root ``SharingObserver`` holding the bus.""" return self._path_from_root @property def shared_bus(self) -> type_hints.BusProtocol: """The shared bus used by this ``SharingObserver``.""" if self._shared_bus is None: raise RuntimeError("The bus has not been shared!") return self._shared_bus @shared_bus.setter def shared_bus(self, bus: type_hints.BusProtocol) -> None: self._shared_bus = bus self._shared_bus.register_observer(self) self.on_new_shared_bus() for key, obj in vars(self).items(): if isinstance(obj, SharingObserver): obj._path_from_root = f"{self._path_from_root}{key}." obj.shared_bus = self._shared_bus all_vars: dict[str, typing.Any] = {} for cls in reversed(self.__class__.mro()): all_vars.update(vars(cls)) for key, obj in all_vars.items(): if isinstance(obj, Signal) and obj._initial_value is not NoValue: obj.__set__(self, obj._initial_value) @property def _listener_methods(self) -> dict[str, list[ListenerMethod]]: return self._canonicalised_listener_methods
[docs] def on_new_shared_bus(self) -> None: """ Notify that a new bus is available. When overriding this method you must call ``super().on_new_shared_bus``. """ listeners = defaultdict(lambda: []) for signal, methods in self._static_listener_methods.items(): if isinstance(signal, str): canonical = canonicalise_relative_to(self, signal) elif isinstance(signal, Signal): canonical = signal._absolute_name_for(self) else: raise TypeError(f"Unknown signal type '{type(signal)}'") listeners[canonical].extend(methods) self._canonicalised_listener_methods = dict(listeners) for descriptor in self._on_owner_shared_bus_descriptors: descriptor.on_owner_shared_bus(self)
[docs] def canonicalise_relative_to( obj: type_hints.SharingObserverProtocol, signal: str ) -> str: """ Return the absolute signal name, relative to obj. If signal is already absolute, it is returned as is. """ if signal.startswith(obj.ROOT_OBSERVER_PREFIX): return signal return f"{obj.observer_prefix}{signal}"
[docs] class RootObserver(SharingObserver):
[docs] def on_new_shared_bus(self) -> None: """Register the transformer on the shared bus with a weak reference.""" super().on_new_shared_bus() self.shared_bus.register_transformer(self)
[docs] def transform_emission_cascade( self, emissions: dict[str, typing.Any], store_manager: type_hints.EmissionStoreManagerProtocol, ) -> dict[str, typing.Any]: """ Transform a cascade of emissions. This method is called for each :py:meth:`~ska_tango_base.type_hints.BusProtocol.emit` call, receiving all signals being emitted in that cascade. You can modify the emissions - change values, add new signals, or remove signals by not including them in the returned dict. The transformation is applied to the whole cascade of emissions at once, so the transformation can use the relationships between signals to determine how to transform them. Signals not included in the returned dict will be suppressed and not sent to observers. New signals can be added to generate derived emissions. Any implementation must call ``super().transform_emission_cascade(emissions)`` in order to not break the transformation chain: .. code::python def transform_emission_cascade(self, emissions, store_manager): emissions = super().transform_emission_cascade(emissions, store_manager) # Do some transformation to emissions here ... return emissions :param emissions: the cascade of emissions to be transformed, as a dict mapping signal name to emitted value. :param store_manager: context manager for atomically updating stored emissions :return: the transformed cascade of emissions, as a dict mapping signal name to emitted value. """ return emissions