Source code for ska_tango_base.callback_scheduler

"""
A module for managing Tango callbacks robustly.

The aim of the :class:`CallbackScheduler` class is to allow you to
decouple the performance of callbacks from each other, allowing you to locally
reason about the callback.

Currently :class:`CallbackScheduler` only supports callbacks for Tango
events but it may support SKA Long Running Command callbacks and
asynchronous Tango request callbacks in the future.
"""

from __future__ import annotations

import collections
import concurrent.futures as _futs
import heapq
import itertools
import logging
import threading
import time
import typing
import weakref
from dataclasses import dataclass, field

import tango
from packaging import version

from . import type_hints

__all__ = ["CallbackScheduler", "CallbackID", "Queue", "BrokenCallbackSchedulerError"]


# Distinct static type for callback identifiers.  ``typing.NewType`` does not
# create a new runtime class, so at runtime a ``CallbackID`` is a plain ``int``.
CallbackID = typing.NewType("CallbackID", int)
"""
Callback Identifier.

Used to unregister a callback.  Can be discarded if unregistering
is not needed.
"""


class Queue:
    """
    Opaque reference to an internal event queue.

    If there are no references to the :class:`Queue` then the corresponding
    internal event queue will be deleted.  The :class:`CallbackScheduler` will
    keep a reference to the :class:`Queue` while the internal event queue is
    in use.
    """

    # Instances carry no state of their own.  ``__weakref__`` is the only slot
    # so that a ``Queue`` can still be weakly referenced, e.g. as the key of a
    # ``weakref.WeakKeyDictionary``.
    __slots__ = ("__weakref__",)
class BrokenCallbackSchedulerError(Exception):
    """Raised when a CallbackScheduler has already been shutdown."""

    def __init__(self) -> None:
        """Initialise exception message."""
        super().__init__("CallbackScheduler already shutdown")
class CallbackScheduler:
    """
    A class for managing Tango callbacks robustly.

    Tango events are quickly moved from the Tango thread to internal queues
    where background threads call the supplied callbacks.  This decouples the
    speed of event processing of the supplied callbacks from the rest of the
    Tango process.

    Queues are bounded and will discard old data if the supplied callbacks
    cannot keep up.  The queue can be configured by manually allocating it with
    the :meth:`allocate_queue` method and then passing the returned queue to
    :meth:`register_event_callback` or :meth:`connect_event_stream`.  Multiple
    event streams can be allocated to the same queue to ensure events from each
    stream are processed in order relative to each other.

    Queues are processed by worker threads with each queue being assigned a
    priority that is lower the more events that have been processed recently.
    The worker threads process one event on the queue at a time, selecting a
    new queue to process in between each event.
    """

    # Process-wide counter used to build default scheduler names.
    _name_counter = itertools.count().__next__

    def __init__(
        self,
        *,
        thread_count: int = 1,
        name: str | None = None,
        logger: logging.Logger | None = None,
    ) -> None:
        """
        Initialise the object.

        .. warning::

            If ``thread_count > 1`` then registered callbacks for different
            event streams can be executed concurrently and it is the users
            responsibility to ensure this is safe.  This is different from
            Tango event subscriptions where callbacks are all called from the
            same thread.

        :param thread_count: Number of worker threads to spawn.
        :param name: Name of the CallbackScheduler, if ``None`` a default name
            will be provided.
        :param logger: Logger object to use, if ``None`` the module logger
            will be used.
        """
        if name is None:
            name = f"CallbackScheduler-{CallbackScheduler._name_counter()}"
        self.name = name

        # If empty the event handler has been shutdown and
        # we raise BrokenCallbackSchedulerError as a courtesy
        self._threads: set[threading.Thread] = set()

        self._callback_id_counter = itertools.count().__next__
        # Lock invariant: Only 1 thread is calling _callback_id_counter at a
        # time.
        self._callback_id_lock = threading.Lock()

        # We do not need a lock to guard _callbacks as CallbackID is an int so
        # __eq__ does not run arbitrary code.  See
        # https://docs.python.org/3/library/threadsafety.html#thread-safety-dict.
        self._callbacks: dict[
            CallbackID, tuple[_EventStream, type_hints.EventCallbackType]
        ] = {}

        self._queues = _QueueSet()

        self._proxies = weakref.WeakValueDictionary[str, tango.DeviceProxy]()
        # Lock invariant: At most one DeviceProxy exists for each trl
        self._proxies_lock = threading.Lock()

        self._event_streams: dict[_EventStream, _EventStreamMeta] = {}
        # Lock invariant: At most one _EventStreamMeta exists for each
        # _EventStream, including any _EventStreamMeta that is "on the stack".
        #
        # If both this lock and an _EventStreamMeta.lock need to be held at the
        # same time, then this must be acquired first
        self._event_streams_lock = threading.Lock()

        # NOTE: __del__ uses the presence of this attribute to decide whether
        # the instance was initialised far enough for shutdown() to work, so
        # keep it after the attributes above.
        self._active_heap = _ActiveHeap()

        self._logger = logger if logger is not None else logging.getLogger(__name__)

        for i in range(thread_count):
            thread_name = f"{name}-{i}"
            t = threading.Thread(
                name=thread_name,
                target=_worker,
                args=(
                    self._queues,
                    self._active_heap,
                    self.name,
                    self._logger,
                ),
                daemon=True,
            )
            t.start()
            self._threads.add(t)

    def shutdown(self) -> None:
        """Shutdown background threads and unsubscribe from all events."""
        threads = self._begin_shutdown()
        self._wait_shutdown(threads)

    def __del__(self) -> None:
        """Shutdown CallbackScheduler."""
        # If __init__ did not get as far as creating the active heap there are
        # no worker threads or subscriptions to clean up, and shutdown() would
        # fail on the missing attributes.
        if hasattr(self, "_active_heap"):
            self.shutdown()

    @typing.overload
    def register_event_callback(
        self,
        device_trl: str,
        event_type: tango.EventType,
        callback: type_hints.EventCallbackType,
        /,
        initial_event: bool = True,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[CallbackID]: ...

    @typing.overload
    def register_event_callback(
        self,
        device_trl: str,
        attr: str,
        event_type: tango.EventType,
        callback: type_hints.EventCallbackType,
        /,
        initial_event: bool = True,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[CallbackID]: ...

    @typing.overload
    def register_event_callback(
        self,
        device: tango.DeviceProxy,
        event_type: tango.EventType,
        callback: type_hints.EventCallbackType,
        /,
        initial_event: bool = True,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[CallbackID]: ...

    @typing.overload
    def register_event_callback(
        self,
        device: tango.DeviceProxy,
        attr: str,
        event_type: tango.EventType,
        callback: type_hints.EventCallbackType,
        /,
        initial_event: bool = True,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[CallbackID]: ...

    def register_event_callback(
        self,
        *args: typing.Any,
        initial_event: bool = True,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[CallbackID]:
        """
        Register a callback to an event stream.

        If it is not already connected, this will create a temporary
        connection.  A temporary connection will automatically be disconnected
        when there are no registered callbacks.  See
        :meth:`connect_event_stream` for details.

        If unregistering independently from other callbacks is not required,
        the returned :class:`CallbackID` and/or
        :class:`~concurrent.futures.Future` can be safely discarded.

        >>> eh = CallbackScheduler()
        >>> fut = eh.register_event_callback(
                "foo/bar/1", "myAttr", tango.EventType.CHANGE_EVENT, my_callback)
        >>> cid = fut.result(timeout=10)  # Wait for connection to be established
        >>> ...
        >>> eh.unregister_callback(cid)

        :param device: The Tango device producing the event stream.
        :param device_trl: The Tango resource locator of the device
            producing the event stream.
        :param attr: The name of the attribute to connect to.  Absent for
            ``INTERFACE_CHANGE_EVENT``.
        :param event_type: The type of the event stream.
        :param callback: Callback to call with event data.
        :param initial_event: If true and an event has already been received
            for the stream, the callback is queued a call with that most
            recent event as soon as it is attached.
        :param queue_factory: Returns a queue to use for this event stream if
            not yet connected.
        :return: Future returning callback ID to unregister with.
        :raises BrokenCallbackSchedulerError: If the scheduler has been
            shutdown.
        """
        result = _futs.Future[CallbackID]()
        callback: typing.Callable[..., None]
        stream, callback = _resolve_stream_args(*args)

        with self._callback_id_lock:
            cid = CallbackID(self._callback_id_counter())
        self._callbacks[cid] = (stream, callback)

        def on_connection(fut: _ConnectionFuture) -> None:
            try:
                if not result.set_running_or_notify_cancel():
                    # The user cancelled before we connected: forget the ID.
                    self._callbacks.pop(cid, None)
                    return

                try:
                    meta, finalisers = fut.result()
                except Exception:
                    # The connection failed so the callback was never
                    # attached; do not leave a dangling ID behind.
                    self._callbacks.pop(cid, None)
                    raise

                # CoW so that events received before this point do
                # not get sent to the new callback.
                new = meta.callbacks.copy()
                new.append(callback)
                meta.callbacks = new

                if initial_event and meta.last_event is not None:
                    # FIXME(tri): It would be nice if we batched up these initial
                    # calls when multiple callbacks are waiting for the
                    # connection...  I'm not quite sure how to do it with
                    # the current architecture.
                    queue = self._queues.mapping[meta.queue]
                    queue.put(_Invocation(meta.last_event, [callback]))

                finalisers.append(lambda: result.set_result(cid))
            except Exception as ex:
                result.set_exception(ex)

        con_fut = _ConnectionFuture()
        con_fut.add_done_callback(on_connection)
        try:
            self._ensure_connected(stream, queue_factory, con_fut)
        except Exception:
            # The caller never receives ``result`` (and therefore the ID) when
            # we raise, so make sure the registration does not leak.
            self._callbacks.pop(cid, None)
            raise

        return result

    def unregister_callback(
        self,
        callback: CallbackID,
    ) -> None:
        """
        Unregister a callback.

        If there are no more callbacks associated with the event stream, then
        the event stream will be disconnected unless it was marked as
        permanent via :meth:`connect_event_stream`.

        :param callback: The callback to unregister.
        :raises ValueError: If the callback ID is not known.
        """
        try:
            stream, cb = self._callbacks.pop(callback)
        except KeyError:
            # The KeyError is an implementation detail; hide it from the
            # traceback.
            raise ValueError(f"Unknown callback ID {callback}.") from None

        with self._event_streams_lock:
            meta = self._event_streams[stream]
            with meta.lock:
                # CoW: queued invocations may still reference the old list.
                new = meta.callbacks.copy()
                new.remove(cb)
                meta.callbacks = new
                if meta.should_delete():
                    del self._event_streams[stream]

        # Unsubscribing resolves futures (i.e. may run user code), so it is
        # done after both locks have been released.
        if meta.should_delete():
            meta.unsub_and_cancel(stream)

    @typing.overload
    def connect_event_stream(
        self,
        device_trl: str,
        event_type: tango.EventType,
        /,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[None]: ...

    @typing.overload
    def connect_event_stream(
        self,
        device_trl: str,
        attr: str,
        event_type: tango.EventType,
        /,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[None]: ...

    @typing.overload
    def connect_event_stream(
        self,
        device: tango.DeviceProxy,
        event_type: tango.EventType,
        /,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[None]: ...

    @typing.overload
    def connect_event_stream(
        self,
        device: tango.DeviceProxy,
        attr: str,
        event_type: tango.EventType,
        /,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[None]: ...

    def connect_event_stream(
        self,
        *args: typing.Any,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[None]:
        """
        Connect to an event stream and maintain connection.

        The connection is established via a Tango event subscription with a
        callback that will push events to the queue associated with this event
        stream.

        Calling this function marks an event stream as "permanent", meaning it
        will not get disconnected even if there are no callbacks subscribed.
        This can be useful to avoid multiple Tango subscription requests to
        the Tango device when callbacks only want to be registered
        temporarily.

        If the connection has not already been made and
        ``queue_factory is None`` then a new queue will be allocated with the
        default arguments.  Otherwise the ``queue_factory`` will be called to
        construct a new queue.

        Events are discarded rather than added to the queue if there are no
        callbacks subscribed.

        Connecting to the event stream is handled asynchronously and the
        returned future can be used to track the progress of the connection.
        If the connection is already active when this function is called, then
        the returned future will be fulfilled.

        Call :meth:`~concurrent.futures.Future.result` to wait until the
        connection has completed.  The future may be
        :meth:`~concurrent.futures.Future.cancel`'d to abort the connection if
        it has not already been completed.

        .. warning::

            The connection is considered completed as soon as Tango calls the
            internal subscription callback.  This may be with an error event
            informing us that we are unable to connect to the Tango device for
            some reason.

        Example::

            >>> eh = CallbackScheduler()
            >>> fut = eh.connect_event_stream(
                    "foo/bar/1", "myAttr", tango.EventType.CHANGE_EVENT)
            >>> fut.result(timeout=10)  # Wait for connection to be established

        :param device: The Tango device producing the event stream.
        :param device_trl: The Tango resource locator of the device
            producing the event stream.
        :param attr: The name of the attribute to connect to.  Absent for
            ``INTERFACE_CHANGE_EVENT``.
        :param event_type: The type of the event stream.
        :param queue_factory: Returns a queue to use for this event stream if
            not yet connected.
        :return: Future to track the progress of the connection.
        :raises BrokenCallbackSchedulerError: If the scheduler has been
            shutdown.
        """
        result = _futs.Future[None]()
        stream, _ = _resolve_stream_args(*args)

        def on_connection(fut: _ConnectionFuture) -> None:
            try:
                if result.set_running_or_notify_cancel():
                    meta, finalisers = fut.result()
                    meta.temporary = False
                    finalisers.append(lambda: result.set_result(None))
            except Exception as ex:
                result.set_exception(ex)

        con_fut = _ConnectionFuture()
        con_fut.add_done_callback(on_connection)
        self._ensure_connected(stream, queue_factory, con_fut)

        return result

    @typing.overload
    def disconnect_event_stream(
        self,
        device_trl: str,
        event_type: tango.EventType,
        /,
    ) -> None: ...

    @typing.overload
    def disconnect_event_stream(
        self,
        device_trl: str,
        attr: str,
        event_type: tango.EventType,
        /,
    ) -> None: ...

    @typing.overload
    def disconnect_event_stream(
        self,
        device: tango.DeviceProxy,
        event_type: tango.EventType,
        /,
    ) -> None: ...

    @typing.overload
    def disconnect_event_stream(
        self,
        device: tango.DeviceProxy,
        attr: str,
        event_type: tango.EventType,
        /,
    ) -> None: ...

    def disconnect_event_stream(
        self,
        *args: typing.Any,
    ) -> None:
        """
        Disconnect from an event stream.

        If there are callbacks associated with the event stream, then this
        function will just mark the event stream as "temporary" and the event
        stream will be disconnected when the last callback is unregistered.

        :param device: The Tango device producing the event stream.
        :param device_trl: The Tango resource locator of the device
            producing the event stream.
        :param attr: The name of the attribute to connect to.  Absent for
            ``INTERFACE_CHANGE_EVENT``.
        :param event_type: The type of the event stream.
        :raises KeyError: If the event stream is not known.
        """
        stream, _ = _resolve_stream_args(*args)

        with self._event_streams_lock:
            meta = self._event_streams[stream]
            with meta.lock:
                meta.temporary = True
                if meta.should_delete():
                    del self._event_streams[stream]

        # Done outside the locks as it may run user code via futures.
        if meta.should_delete():
            meta.unsub_and_cancel(stream)

    def disconnect_all(self) -> None:
        """
        Unregister all callbacks and disconnect all event streams.

        Unlike :meth:`shutdown`, the worker threads are still running and the
        :class:`CallbackScheduler` can still be used.
        """
        with self._event_streams_lock:
            # Swap in a fresh dictionary so the old streams can be torn down
            # without holding the lock.
            streams = self._event_streams
            self._callbacks.clear()
            self._event_streams = {}

        for stream, meta in streams.items():
            meta.unsub_and_cancel(stream)

    def allocate_queue(self, *, queue_size: int = 8) -> Queue:
        """
        Allocate an event queue.

        :param queue_size: Size of the queue.  When full old entries will be
            discarded to make space for the new.  Must be at least 1.
        :return: The allocated queue.
        :raises BrokenCallbackSchedulerError: If the scheduler has been
            shutdown.
        :raises ValueError: If ``queue_size`` is less than 1.
        """
        if not self._threads:
            raise BrokenCallbackSchedulerError()

        # A queue that cannot hold a single item would fail on the first
        # event it is given, inside the Tango event thread.  Reject it here
        # where the caller can see the problem.
        if queue_size < 1:
            raise ValueError(f"queue_size must be at least 1, got {queue_size}")

        return self._queues.allocate(self._active_heap, queue_size)

    @typing.overload
    def get_queue(
        self,
        device_trl: str,
        event_type: tango.EventType,
        /,
    ) -> Queue: ...

    @typing.overload
    def get_queue(
        self,
        device_trl: str,
        attr: str,
        event_type: tango.EventType,
        /,
    ) -> Queue: ...

    @typing.overload
    def get_queue(
        self,
        device: tango.DeviceProxy,
        event_type: tango.EventType,
        /,
    ) -> Queue: ...

    @typing.overload
    def get_queue(
        self,
        device: tango.DeviceProxy,
        attr: str,
        event_type: tango.EventType,
        /,
    ) -> Queue: ...

    def get_queue(self, *args: typing.Any) -> Queue:
        """
        Return the queue used by a particular event stream.

        :param device: The Tango device producing the event stream.
        :param device_trl: The Tango resource locator of the device
            producing the event stream.
        :param attr: The name of the attribute of the event stream.  Absent
            for ``INTERFACE_CHANGE_EVENT``.
        :param event_type: The type of the event stream.
        :return: The queue used by the event stream.
        :raises BrokenCallbackSchedulerError: If the scheduler has been
            shutdown.
        :raises KeyError: If the event stream is not known.
        """
        if not self._threads:
            raise BrokenCallbackSchedulerError()

        stream, _ = _resolve_stream_args(*args)
        try:
            meta = self._event_streams[stream]
        except KeyError:
            # Re-raise with a readable message, without chaining the bare
            # lookup failure.
            raise KeyError(f"Unknown event stream {stream!r}") from None
        return meta.queue

    def _get_proxy(self, trl: str) -> tango.DeviceProxy:
        """
        Return the shared ``DeviceProxy`` for ``trl``, creating it if needed.

        :param trl: Tango resource locator of the device.
        :return: The proxy for the device.
        """
        with self._proxies_lock:
            # _proxies only holds weak references, so an entry can vanish at
            # any moment.  Fetch it once and keep a strong reference instead
            # of testing membership and then indexing.
            proxy = self._proxies.get(trl)
            if proxy is None:
                proxy = tango.DeviceProxy(trl)
                self._proxies[trl] = proxy
            return proxy

    def _ensure_connected(
        self,
        stream: _EventStream,
        queue_factory: typing.Callable[[], Queue] | None,
        fut: _ConnectionFuture,
    ) -> None:
        """
        Ensure an event stream is connected.

        To maximise concurrency and avoid deadlocks this method ensures the
        following:

        1. We release the `_event_streams_lock` ASAP to allow other threads
           to access other event streams.
        2. We grab the `_EventStreamMeta.lock` before we release the
           `_event_streams_lock` so that we can mark the event stream as in
           use before it can be deleted.
        3. We do not call "user code" while holding a lock as this may lead
           to a deadlock.

        `fut` will be set with the `_EventStreamMeta` object and a list of
        finalisers while holding the `_EventStreamMeta.lock` or an exception if
        the event stream is disconnected while the connection is still
        pending.  After this the `_EventStreamMeta` will be checked to see if
        it should be deleted, so the `fut` should have a done callback to mark
        the meta as in use when required.  After this check, the `finalisers`
        will be called while not holding the `_EventStreamMeta.lock`.  These
        should be used to set the result of user `Future`s.

        :param stream: The event stream to connect to.
        :param queue_factory: Returns the queue to use if the stream is not
            yet known, if ``None`` a default queue is allocated.
        :param fut: Future to resolve once the stream is connected.
        :raises BrokenCallbackSchedulerError: If the scheduler has been
            shutdown.
        """
        if not self._threads:
            raise BrokenCallbackSchedulerError()

        fut.set_running_or_notify_cancel()  # Mark as uncancelable

        # If true, we need to do a Tango event subscription to connect
        # this event stream.
        sub_required = False
        with self._event_streams_lock:
            if stream in self._event_streams:
                meta = self._event_streams[stream]
            else:
                if queue_factory is None:
                    queue = self.allocate_queue()
                else:
                    queue = queue_factory()
                meta = _EventStreamMeta(queue)
                self._event_streams[stream] = meta
                sub_required = True

            # Taken before _event_streams_lock is released (point 2 above).
            meta.lock.acquire()

        finalisers: list[typing.Callable[[], None]] = []
        try:
            if meta.pending_futures is not None:
                # Still connecting: resolved by on_event_data below.
                meta.pending_futures.append(fut)
            else:
                fut.set_result((meta, finalisers))
        finally:
            meta.lock.release()

        for cb in finalisers:
            cb()

        if not sub_required:
            return

        connected = False

        def on_event_data(
            event: tango.EventData,
            metaref: weakref.ref[_EventStreamMeta] = weakref.ref(meta),
        ) -> None:
            # Use a weakref to avoid this callback keeping the Meta alive.
            meta = metaref()
            if meta is None:
                return

            with meta.lock:
                meta.last_event = event
                if meta.callbacks:
                    queue = self._queues.mapping[meta.queue]
                    queue.put(_Invocation(event, meta.callbacks))

            # On initial connection (successful or otherwise) inform all waiting
            # _ConnectionFutures so the done callback can update the meta with
            # either a EventCallbackType or marking it as permanent.
            nonlocal connected
            if not connected:
                # Callbacks to update user futures _after_ we have resolved
                # the connection.
                finalisers: list[typing.Callable[[], None]] = []
                with meta.lock:
                    futs = meta.pending_futures
                    meta.pending_futures = None
                    assert futs is not None
                    for f in futs:
                        # This calls a done callback as we always add the
                        # callback to the _ConnectionFuture before we pass it
                        # here.
                        f.set_result((meta, finalisers))

                # If all the user futures were cancelled, this event stream might
                # need to be disconnected as no callbacks were added and it wasn't
                # marked permanent.
                with self._event_streams_lock:
                    with meta.lock:
                        if meta.should_delete():
                            del self._event_streams[stream]

                if meta.should_delete():
                    meta.unsub_and_cancel(stream)

                connected = True

                for cb in finalisers:
                    cb()

        try:
            meta.proxy = self._get_proxy(stream.device_trl)
            args = stream.sub_args
            if version.parse(tango.__version__) < version.parse("10.1.0"):
                meta.sub_id = meta.proxy.subscribe_event(
                    *args, on_event_data, stateless=True
                )
            else:
                meta.sub_id = meta.proxy.subscribe_event(
                    *args,
                    on_event_data,
                    sub_mode=tango.EventSubMode.AsyncRead,
                )
        except Exception:
            with self._event_streams_lock:
                with meta.lock:
                    try:
                        del self._event_streams[stream]
                    except KeyError:
                        pass
            # Fails any pending futures, which may run user code, so this is
            # done without holding the locks.
            meta.unsub_and_cancel(stream)
            raise

    def _begin_shutdown(self) -> list[threading.Thread]:
        """
        Disconnect everything and ask the worker threads to exit.

        :return: The worker threads that were asked to exit.
        """
        threads = list(self._threads)
        self._threads.clear()
        self.disconnect_all()
        # One sentinel per worker: a worker returns when it is handed
        # _NULL_QUEUE.
        for _ in threads:
            self._active_heap.submit(_NULL_QUEUE)

        return threads

    def _wait_shutdown(self, threads: list[threading.Thread]) -> None:
        """
        Wait for the given worker threads to exit.

        :param threads: Threads returned by :meth:`_begin_shutdown`.
        """
        current = threading.current_thread()
        for t in threads:
            # shutdown() may be running on one of the workers (e.g. called
            # from a callback); joining the current thread raises
            # RuntimeError, and that worker exits by itself once the callback
            # returns and it receives its sentinel.
            if t is not current:
                t.join()
class _ActiveHeap: def __init__(self) -> None: self.work_available = threading.Condition() self.heap: list[_InvocationQueue] = [] self.ticket_counter = itertools.count(1).__next__ def submit(self, queue: _InvocationQueue) -> None: # queue lock must be held with self.work_available: if queue is not _NULL_QUEUE: queue.active = True queue.ticket = self.ticket_counter() heapq.heappush(self.heap, queue) self.work_available.notify() def next_queue(self, current: _InvocationQueue) -> _InvocationQueue: if current is not _NULL_QUEUE: with current.mutex: if current.items: with self.work_available: current.ticket = self.ticket_counter() return heapq.heappushpop(self.heap, current) current.active = False del current with self.work_available: while not self.heap: # Reset the ticket counter while nothing is going on to avoid # the ticket numbers getting too large self.ticket_counter = itertools.count(1).__next__ self.work_available.wait() return heapq.heappop(self.heap) @dataclass(slots=True) class _Invocation: event: type_hints.EventDataType callbacks: list[type_hints.EventCallbackType] class _InvocationQueue: """ A thread-safe, bounded queue that never blocks. Unlike queue.SimpleQueue, this queue is bounded. Unlike queue.Queue, this queue will discard the oldest data to make room for new data. The queue will add itself to it's ``_ActiveHeap`` when it has data. """ def __init__(self, active_heap: _ActiveHeap, max_size: int) -> None: self.items = collections.deque[_Invocation]() self.active_heap = active_heap self.max_size = max_size self.active = False # Lower priority is higher precedence, 0.0 is the highest # allowed precedence self.priority = 0.0 # Unique for all active queues. Used as a tie breaker to ensure # round-robin. self.ticket = 0 # Must be held for all operations. self.mutex = threading.Lock() def put(self, item: _Invocation) -> None: """ Put an item into the queue. If the queue is full, the first (oldest) item will be removed to make room. 
:param item: Item to insert """ with self.mutex: # This should only ever run once, but we defensively use # a while loop to make sure we make enough room. while len(self.items) >= self.max_size: self.items.popleft() self.items.append(item) if not self.active: self.active_heap.submit(self) def get(self) -> _Invocation | None: """Get an item from the front of the queue.""" with self.mutex: if len(self.items): return self.items.popleft() return None def __lt__(self, other: _InvocationQueue) -> bool: return (self.priority, self.ticket) < (other.priority, other.ticket) _NULL_QUEUE = _InvocationQueue(typing.cast(_ActiveHeap, None), 0) @dataclass(init=True, unsafe_hash=True, eq=True, slots=True, frozen=True) class _EventStream: device_trl: str attr: str | None event_type: tango.EventType @property def sub_args(self) -> tuple[typing.Any, ...]: if self.attr is None: return (self.event_type,) return (self.attr, self.event_type) _T = typing.TypeVar("_T") def _resolve_stream_args( device_proxy_or_trl: tango.DeviceProxy | str, attr_or_type: str | tango.EventType, event_type: tango.EventType | _T, arg: _T | None = None, ) -> tuple[_EventStream, _T]: if isinstance(attr_or_type, str): attr = attr_or_type else: attr = None event_type = attr_or_type arg = typing.cast(_T, event_type) if isinstance(device_proxy_or_trl, str): if not device_proxy_or_trl.startswith("tango://"): db = tango.Database() db_host = db.get_db_host() db_port = db.get_db_port() device_trl = f"tango://{db_host}:{db_port}/{device_proxy_or_trl}" else: device_trl = device_proxy_or_trl else: db_port = device_proxy_or_trl.get_db_port() dev_name = device_proxy_or_trl.dev_name() if db_port == "Unused": dev_host = device_proxy_or_trl.get_dev_host() dev_port = device_proxy_or_trl.get_dev_port() device_trl = f"tango://{dev_host}:{dev_port}/{dev_name}#dbase=no" else: db_host = device_proxy_or_trl.get_db_host() device_trl = f"tango://{db_host}:{db_port}/{dev_name}" return _EventStream(device_trl, attr, event_type), 
typing.cast(_T, arg) @dataclass class _EventStreamMeta: # Queue to add subscriptions to. queue: Queue # Lock invariants: # - (callbacks or not temporary) or the meta is not in the dictionary # - pending_futures is None and last_event is not None or the # event stream is still connecting # - if sub_id is not None, then there is an active Tango subscription lock: threading.Lock = field(default_factory=threading.Lock) # Proxy used to connect with. # Invariant: If proxy is None then so is sub_id. proxy: tango.DeviceProxy | None = None # The Tango subscription ID for this event stream. # Invariant: If sub_id is None then so is proxy. sub_id: int | None = None # The most recent event received for the event stream. This is used to # provide the initial call when we register a callback. last_event: typing.Any = None # A possibly empty list of callbacks to call whenever an event is received. # This list must be treated as CoW as the existing list might be reference # by an _Invocation in some _InvocationQueue. callbacks: list[type_hints.EventCallbackType] = field(default_factory=list) # Futures to be notified when the connection is established. Each future # must have a done callback to mark the meta as "in use" when called. If # None then the meta has already established the connection. pending_futures: list[_ConnectionFuture] | None = field(default_factory=list) # If True then the meta will be deleted when there are no remaining # subscriptions. 
# Invariant: if temporary is True then callbacks is not empty temporary: bool = True def should_delete(self) -> bool: return self.temporary and not self.callbacks def unsub_and_cancel(self, stream: _EventStream) -> None: if self.proxy is not None and self.sub_id is not None: self.proxy.unsubscribe_event(self.sub_id) self.proxy = None self.sub_id = None if self.pending_futures is not None: for fut in self.pending_futures: fut.set_exception(RuntimeError(f"{stream!r} has been disconnected")) _DECAY_PERIOD = 10 _DECAY_FACTOR = 5 / 8 @dataclass class _QueueSet: mapping: weakref.WeakKeyDictionary[Queue, _InvocationQueue] = field( default_factory=weakref.WeakKeyDictionary ) # Lock invariant: Only one thread is either iterating or adding to # mapping at a time. lock: threading.Lock = field(default_factory=threading.Lock) next_decay: float = field(default_factory=lambda: time.monotonic() + _DECAY_PERIOD) def allocate(self, active_heap: _ActiveHeap, size: int) -> Queue: with self.lock: qid = Queue() self.mapping[qid] = _InvocationQueue(active_heap, size) return qid def decay_priorities(self) -> None: with self.lock: now = time.monotonic() while now > self.next_decay: self.next_decay += _DECAY_PERIOD for q in self.mapping.values(): q.priority *= _DECAY_FACTOR _ConnectionFuture: typing.TypeAlias = _futs.Future[ tuple[_EventStreamMeta, list[typing.Callable[[], None]]] ] def _worker( queues: _QueueSet, active_heap: _ActiveHeap, name: str, logger: logging.Logger, ) -> None: with tango.EnsureOmniThread(): queue = _NULL_QUEUE while True: queues.decay_priorities() queue = active_heap.next_queue(queue) if queue is _NULL_QUEUE: return item = queue.get() if item is not None: for cb in item.callbacks: try: if callable(cb): cb(item.event) else: cb.push_event(item.event) except Exception: logger.exception( f"{name}: Callback {cb} raised an exception " f"for {item.event}. Continuing." ) queue.priority += 1