"""
A module for managing Tango callbacks robustly.
The aim of the :class:`CallbackScheduler` class is to allow you to
decouple the performance of callbacks from each other, allowing you to locally
reason about the callback.
Currently :class:`CallbackScheduler` only supports callbacks for Tango
events but it may support SKA Long Running Command callbacks and
asynchronous Tango request callbacks in the future.
"""
from __future__ import annotations
import collections
import concurrent.futures as _futs
import heapq
import itertools
import logging
import threading
import time
import typing
import weakref
from dataclasses import dataclass, field
import tango
from packaging import version
from . import type_hints
__all__ = ["CallbackScheduler", "CallbackID", "Queue", "BrokenCallbackSchedulerError"]
CallbackID = typing.NewType("CallbackID", int)
"""
Callback Identifier.
Used to unregister a callback. Can be discarded if unregistering
is not needed.
"""
[docs]
class Queue:
"""
Opaque reference to an internal event queue.
If there are no references to the :class:`Queue` then the corresponding
internal event queue will be deleted. The class:`CallbackScheduler` will
keep a reference to the :class:`Queue` while the internal event queue is in
use.
"""
__slots__ = ("__weakref__",)
[docs]
class BrokenCallbackSchedulerError(Exception):
"""Raised when a CallbackScheduler has already been shutdown."""
[docs]
def __init__(self) -> None:
"""Initialise exception message."""
super().__init__("CallbackSheduler already shutdown")
[docs]
class CallbackScheduler:
"""
A class for managing Tango callbacks robustly.
Tango events are quickly moved from the Tango thread to internal queues where
background threads call the supplied callbacks. This decouples the speed of
event processing of the supplied callbacks from the rest of the Tango process.
Queues are bounded and will discard old data if the supplied callbacks
cannot keep up. The queue can be configured by manually allocating it with
the :meth:`allocate_queue` method and then passing the returned queue to
:meth:`register_event_callback` or :meth:`connect_event_stream`. Multiple event
streams can be allocated to the same queue to ensure events from each stream
are processed in order relative to each other.
Queues are processed by worker threads with each queue being assigned a
priority that is lower the more events that have been processed recently.
The worker threads process one event on the queue at a time, selecting a new
queue to process in between each event.
"""
_name_counter = itertools.count().__next__
[docs]
def __init__(
self,
*,
thread_count: int = 1,
name: str | None = None,
logger: logging.Logger | None = None,
) -> None:
"""
Initialise the object.
.. warning::
If ``thread_count > 1`` then registered callbacks for different
event streams can be executed concurrently and it is the users
responsibility to ensure this is safe. This is different from Tango
event subscriptions where callbacks are all called from the same
thread.
:param thread_count: Number of worker threads to spawn.
:param name: Name of the CallbackScheduler, if ``None`` a default name will be
provided.
:param logger: Logger object to use, if ``None`` the module logger will be used.
"""
if name is None:
name = f"CallbackScheduler-{CallbackScheduler._name_counter()}"
self.name = name
# If empty the event handler has been shutdown and
# we raise BrokenCallbackSchedulerError as a courtesy
self._threads: set[threading.Thread] = set()
self._callback_id_counter = itertools.count().__next__
# Lock invariant: Only 1 thread is calling _callback_id_counter at a
# time.
self._callback_id_lock = threading.Lock()
# We do not need a lock to guard _callbacks as CallbackID is an int so
# __eq__ does not run arbitrary code. See
# https://docs.python.org/3/library/threadsafety.html#thread-safety-dict.
self._callbacks: dict[
CallbackID, tuple[_EventStream, type_hints.EventCallbackType]
] = {}
self._queues = _QueueSet()
self._proxies = weakref.WeakValueDictionary[str, tango.DeviceProxy]()
# Lock invariant: At most one DeviceProxy exists for each trl
self._proxies_lock = threading.Lock()
self._event_streams: dict[_EventStream, _EventStreamMeta] = {}
# Lock invariant: At most one _EventStreamMeta exists for each
# _EventStream, including any _EventStreamMeta that is "on the stack".
#
# If both this lock and an _EventStreamMeta.lock need to be held at the
# same time, then this must be acquired first
self._event_streams_lock = threading.Lock()
self._active_heap = _ActiveHeap()
self._logger = logger if logger is not None else logging.getLogger(__name__)
for i in range(thread_count):
thread_name = f"{name}-{i}"
t = threading.Thread(
name=thread_name,
target=_worker,
args=(
self._queues,
self._active_heap,
self.name,
self._logger,
),
daemon=True,
)
t.start()
self._threads.add(t)
[docs]
def shutdown(self) -> None:
"""Shutdown background threads and subscribe from all events."""
threads = self._begin_shutdown()
self._wait_shutdown(threads)
def __del__(self) -> None:
"""Shutdown CallbackScheduler."""
self.shutdown()
@typing.overload
def register_event_callback(
self,
device_trl: str,
event_type: tango.EventType,
callback: type_hints.EventCallbackType,
/,
initial_event: bool = True,
queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[CallbackID]: ...
@typing.overload
def register_event_callback(
self,
device_trl: str,
attr: str,
event_type: tango.EventType,
callback: type_hints.EventCallbackType,
/,
initial_event: bool = True,
queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[CallbackID]: ...
@typing.overload
def register_event_callback(
self,
device: tango.DeviceProxy,
event_type: tango.EventType,
callback: type_hints.EventCallbackType,
/,
initial_event: bool = True,
queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[CallbackID]: ...
@typing.overload
def register_event_callback(
self,
device: tango.DeviceProxy,
attr: str,
event_type: tango.EventType,
callback: type_hints.EventCallbackType,
/,
initial_event: bool = True,
queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[CallbackID]: ...
[docs]
def register_event_callback(
self,
*args: typing.Any,
initial_event: bool = True,
queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[CallbackID]:
"""
Register a callback to an event stream.
If it is not already connected, this will create a temporary connection.
A temporary connection will automatically be disconnected when there are
no registered callbacks. See :meth:`connect_event_stream` for details.
If unregistering independently from other callbacks is not required, the
returned :class:`CallbackID` and/or :class:`~concurrent.futures.Future`
can be safely discarded.
>>> eh = CallbackScheduler()
>>> fut = eh.register_callback(
"foo/bar/1", "myAttr", tango.EventType.CHANGE_EVENT, my_callback)
>>> cid = fut.result(timeout=10) # Wait for connection to be established
>>> ...
>>> eh.unregister_callback(cid)
:param device: The Tango device producing the event stream.
:param device_trl: The Tango resource locator of the device producing the event
stream.
:param attr: The name of the attribute to connect to. Absent for
``INTERFACE_CHANGE_EVENT``.
:param event_type: The type of the event stream.
:param callback: Callback to call with event data.
:param queue_factory: Returns a queue to use for this event stream if not yet
connected.
:return: Future returning callback ID to unregister with.
"""
result = _futs.Future[CallbackID]()
callback: typing.Callable[..., None]
stream, callback = _resolve_stream_args(*args)
with self._callback_id_lock:
cid = CallbackID(self._callback_id_counter())
self._callbacks[cid] = (stream, callback)
def on_connection(fut: _ConnectionFuture) -> None:
try:
if result.set_running_or_notify_cancel():
meta, finalisers = fut.result()
# CoW so that events received before this point do
# not get sent to the new callback.
new = meta.callbacks.copy()
new.append(callback)
meta.callbacks = new
if initial_event and meta.last_event is not None:
# FIXME(tri): It would be nice if we batched up these initial
# calls when multiple callbacks are waiting for the
# connection... I'm not quite sure how to do it with
# the current architecture.
queue = self._queues.mapping[meta.queue]
queue.put(_Invocation(meta.last_event, [callback]))
finalisers.append(lambda: result.set_result(cid))
else:
try:
del self._callbacks[cid]
except KeyError:
pass
except Exception as ex:
result.set_exception(ex)
con_fut = _ConnectionFuture()
con_fut.add_done_callback(on_connection)
self._ensure_connected(stream, queue_factory, con_fut)
return result
[docs]
def unregister_callback(
self,
callback: CallbackID,
) -> None:
"""
Unregister a callback.
If there are no more callbacks associated with the event stream, then the
event stream will be disconnected unless it was marked as permanent via
:meth:`connect_event_stream`.
:param callback: The callback to unregister.
:raises ValueError: If the callback ID is not known.
"""
try:
stream, cb = self._callbacks.pop(callback)
except KeyError:
raise ValueError(f"Unknown callback ID {callback}.")
with self._event_streams_lock:
meta = self._event_streams[stream]
with meta.lock:
new = meta.callbacks.copy()
new.remove(cb)
meta.callbacks = new
if meta.should_delete():
del self._event_streams[stream]
if meta.should_delete():
meta.unsub_and_cancel(stream)
@typing.overload
def connect_event_stream(
self,
device_trl: str,
event_type: tango.EventType,
/,
queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[None]: ...
@typing.overload
def connect_event_stream(
self,
device_trl: str,
attr: str,
event_type: tango.EventType,
/,
queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[None]: ...
@typing.overload
def connect_event_stream(
self,
device: tango.DeviceProxy,
event_type: tango.EventType,
/,
queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[None]: ...
@typing.overload
def connect_event_stream(
self,
device: tango.DeviceProxy,
attr: str,
event_type: tango.EventType,
/,
queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[None]: ...
[docs]
def connect_event_stream(
self,
*args: typing.Any,
queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[None]:
"""
Connect to an event stream and maintain connection.
The connection is established via a Tango event subscription with a callback
that will push events to the queue associated with this event stream.
Calling this function marks an event stream as "permanent", meaning it will
not get disconnected even if there are no callbacks subscribed. This can be
useful to avoid multiple Tango subscription requests to the Tango device when
callbacks only want to be registered temporarily.
If the connection has not already been made and ``queue_factory is
None`` then a new queue will be allocated with the default arguments.
Otherwise the ``queue_factory`` will be called to construct a new queue.
Events are discarded rather than added to the queue if there are no
callbacks subscribed.
Connecting to the event stream is handled asynchronously and the
returned future can be used to track the progress of the connection. If
the connection is already active when this function is called, then the
returned future will be fulfilled. Call
:meth:`~concurrent.futures.Future.result` to wait until the connection has
completed. The future may be :meth:`~concurrent.futures.Future.cancel`'d to
abort the connection if it has not already been completed.
.. warning::
The connection is considered completed as soon as Tango calls the internal
subscription callback. This may be with an error event informing us that
we are unable to connect to the Tango device for some reason.
Example::
>>> eh = CallbackScheduler()
>>> fut = eh.connect_event_stream(
"foo/bar/1", "myAttr", tango.EventType.CHANGE_EVENT)
>>> fut.result(timeout=10) # Wait for connection to be established
:param device: The Tango device producing the event stream.
:param device_trl: The Tango resource locator of the device producing the event
stream.
:param attr: The name of the attribute to connect to. Absent for
``INTERFACE_CHANGE_EVENT``.
:param event_type: The type of the event stream.
:param queue_factory: Returns a queue to use for this event stream if not yet
connected.
:return: Future to track the progress of the connection.
"""
result = _futs.Future[None]()
stream, _ = _resolve_stream_args(*args)
def on_connection(fut: _ConnectionFuture) -> None:
try:
if result.set_running_or_notify_cancel():
meta, finalisers = fut.result()
meta.temporary = False
finalisers.append(lambda: result.set_result(None))
except Exception as ex:
result.set_exception(ex)
con_fut = _ConnectionFuture()
con_fut.add_done_callback(on_connection)
self._ensure_connected(stream, queue_factory, con_fut)
return result
@typing.overload
def disconnect_event_stream(
self,
device_trl: str,
event_type: tango.EventType,
/,
) -> None: ...
@typing.overload
def disconnect_event_stream(
self,
device_trl: str,
attr: str,
event_type: tango.EventType,
/,
) -> None: ...
@typing.overload
def disconnect_event_stream(
self,
device: tango.DeviceProxy,
event_type: tango.EventType,
/,
) -> None: ...
@typing.overload
def disconnect_event_stream(
self,
device: tango.DeviceProxy,
attr: str,
event_type: tango.EventType,
/,
) -> None: ...
[docs]
def disconnect_event_stream(
self,
*args: typing.Any,
) -> None:
"""
Disconnect from an event stream.
If there are callbacks associated with the event stream, then this
function will just mark the event stream as "temporary" and the event
stream will be disconnected when the last callback is unregistered.
:param device: The Tango device producing the event stream.
:param device_trl: The Tango resource locator of the device producing the event
stream.
:param attr: The name of the attribute to connect to. Absent for
``INTERFACE_CHANGE_EVENT``.
:param event_type: The type of the event stream.
"""
stream, _ = _resolve_stream_args(*args)
with self._event_streams_lock:
meta = self._event_streams[stream]
with meta.lock:
meta.temporary = True
if meta.should_delete():
del self._event_streams[stream]
if meta.should_delete():
meta.unsub_and_cancel(stream)
[docs]
def disconnect_all(self) -> None:
"""
Unregister all callbacks and disconnect all event streams.
Unlike :meth:`shutdown`, the worker threads are still running and the
:class:`CallbackScheduler` can still be used.
"""
with self._event_streams_lock:
streams = self._event_streams
self._callbacks.clear()
self._event_streams = {}
for stream, meta in streams.items():
meta.unsub_and_cancel(stream)
[docs]
def allocate_queue(self, *, queue_size: int = 8) -> Queue:
"""
Allocate an event queue.
:param queue_size: Size of the queue. When full old entries will be discarded
to make space for the new.
:return: The allocated queue.
"""
if not self._threads:
raise BrokenCallbackSchedulerError()
return self._queues.allocate(self._active_heap, queue_size)
@typing.overload
def get_queue(
self,
device_trl: str,
event_type: tango.EventType,
/,
) -> Queue: ...
@typing.overload
def get_queue(
self,
device_trl: str,
attr: str,
event_type: tango.EventType,
/,
) -> Queue: ...
@typing.overload
def get_queue(
self,
device: tango.DeviceProxy,
event_type: tango.EventType,
/,
) -> Queue: ...
@typing.overload
def get_queue(
self,
device: tango.DeviceProxy,
attr: str,
event_type: tango.EventType,
/,
) -> Queue: ...
[docs]
def get_queue(self, *args: typing.Any) -> Queue:
"""
Return the queue used by a particular event stream.
:param trl: The Tango resource locator of the origin of the event stream.
:param event_type: The type of the event stream.
:return: ID of the queue.
"""
if not self._threads:
raise BrokenCallbackSchedulerError()
stream, _ = _resolve_stream_args(*args)
try:
meta = self._event_streams[stream]
return meta.queue
except KeyError:
raise KeyError(f"Unknown event stream {stream!r}")
def _get_proxy(self, trl: str) -> tango.DeviceProxy:
with self._proxies_lock:
if trl not in self._proxies:
result = tango.DeviceProxy(trl)
self._proxies[trl] = result
return result
return self._proxies[trl]
def _ensure_connected(
self,
stream: _EventStream,
queue_factory: typing.Callable[[], Queue] | None,
fut: _ConnectionFuture,
) -> None:
"""
Ensure an event stream is connected.
To maximise concurrency and avoid deadlocks this method ensures
the following:
1. We release the `_event_streams_lock` ASAP to allow other threads
to access other event streams.
2. We grab the `_EventStreamData.lock` before we release the
`_event_stream_lock` so that we can mark the data stream as in use before
it can be deleted.
3. We do not call "user code" while holding a lock as this may lead to a
deadlock.
`fut` will be set with the `_EventStreamData` object and a list of
finalisers while holding the `_EventStreamData.lock` or an exception if
the event stream is disconnected while the connection is still pending.
After this the `_EventStreamData` will be checked to see if it should be
deleted, so the `fut` should have a done callback to mark the data as in
use when required. After this check, the `finalisers` will be called while
not holding the `_EventStreamData.lock`. These should be used to set the result
of user `Future`s.
"""
if not self._threads:
raise BrokenCallbackSchedulerError()
fut.set_running_or_notify_cancel() # Mark as uncancelable
# If true, we need to do a Tango event subscription to connect
# this event stream.
sub_required = False
with self._event_streams_lock:
if stream in self._event_streams:
meta = self._event_streams[stream]
else:
if queue_factory is None:
queue = self.allocate_queue()
else:
queue = queue_factory()
meta = _EventStreamMeta(queue)
self._event_streams[stream] = meta
sub_required = True
meta.lock.acquire()
finalisers: list[typing.Callable[[], None]] = []
try:
if meta.pending_futures is not None:
meta.pending_futures.append(fut)
else:
fut.set_result((meta, finalisers))
finally:
meta.lock.release()
for cb in finalisers:
cb()
if not sub_required:
return
connected = False
def on_event_data(
event: tango.EventData,
metaref: weakref.ref[_EventStreamMeta] = weakref.ref(meta),
) -> None:
# Use a weakref to avoid this callback keeping the Meta alive.
meta = metaref()
if meta is None:
return
with meta.lock:
meta.last_event = event
if meta.callbacks:
queue = self._queues.mapping[meta.queue]
queue.put(_Invocation(event, meta.callbacks))
# On initial connection (successful or otherwise) inform all waiting
# _ConnectionFutures so the done callback can update the meta with
# either a EventCallbackType or marking it as permanent.
nonlocal connected
if not connected:
# Callbacks to update user futures _after_ we have resolved
# the connection.
finalisers: list[typing.Callable[[], None]] = []
with meta.lock:
futs = meta.pending_futures
meta.pending_futures = None
assert futs is not None
for f in futs:
# This calls a done callback as we always add the
# callback to the _ConnectionFuture before we pass it
# here.
f.set_result((meta, finalisers))
# If all the user futures were cancelled, this event stream might
# need to be disconnected as no callbacks were added and it wasn't
# marked permanent.
with self._event_streams_lock:
with meta.lock:
if meta.should_delete():
del self._event_streams[stream]
if meta.should_delete():
meta.unsub_and_cancel(stream)
connected = True
for cb in finalisers:
cb()
try:
meta.proxy = self._get_proxy(stream.device_trl)
args = stream.sub_args
if version.parse(tango.__version__) < version.parse("10.1.0"):
meta.sub_id = meta.proxy.subscribe_event(
*args, on_event_data, stateless=True
)
else:
meta.sub_id = meta.proxy.subscribe_event(
*args,
on_event_data,
sub_mode=tango.EventSubMode.AsyncRead,
)
except Exception:
with self._event_streams_lock:
with meta.lock:
try:
del self._event_streams[stream]
except KeyError:
pass
meta.unsub_and_cancel(stream)
raise
def _begin_shutdown(self) -> list[threading.Thread]:
threads = list(self._threads)
self._threads.clear()
self.disconnect_all()
for _ in threads:
self._active_heap.submit(_NULL_QUEUE)
return threads
def _wait_shutdown(self, threads: list[threading.Thread]) -> None:
for t in threads:
t.join()
class _ActiveHeap:
def __init__(self) -> None:
self.work_available = threading.Condition()
self.heap: list[_InvocationQueue] = []
self.ticket_counter = itertools.count(1).__next__
def submit(self, queue: _InvocationQueue) -> None:
# queue lock must be held
with self.work_available:
if queue is not _NULL_QUEUE:
queue.active = True
queue.ticket = self.ticket_counter()
heapq.heappush(self.heap, queue)
self.work_available.notify()
def next_queue(self, current: _InvocationQueue) -> _InvocationQueue:
if current is not _NULL_QUEUE:
with current.mutex:
if current.items:
with self.work_available:
current.ticket = self.ticket_counter()
return heapq.heappushpop(self.heap, current)
current.active = False
del current
with self.work_available:
while not self.heap:
# Reset the ticket counter while nothing is going on to avoid
# the ticket numbers getting too large
self.ticket_counter = itertools.count(1).__next__
self.work_available.wait()
return heapq.heappop(self.heap)
@dataclass(slots=True)
class _Invocation:
event: type_hints.EventDataType
callbacks: list[type_hints.EventCallbackType]
class _InvocationQueue:
"""
A thread-safe, bounded queue that never blocks.
Unlike queue.SimpleQueue, this queue is bounded.
Unlike queue.Queue, this queue will discard the oldest data to
make room for new data.
The queue will add itself to it's ``_ActiveHeap`` when it has data.
"""
def __init__(self, active_heap: _ActiveHeap, max_size: int) -> None:
self.items = collections.deque[_Invocation]()
self.active_heap = active_heap
self.max_size = max_size
self.active = False
# Lower priority is higher precedence, 0.0 is the highest
# allowed precedence
self.priority = 0.0
# Unique for all active queues. Used as a tie breaker to ensure
# round-robin.
self.ticket = 0
# Must be held for all operations.
self.mutex = threading.Lock()
def put(self, item: _Invocation) -> None:
"""
Put an item into the queue.
If the queue is full, the first (oldest) item will be removed to make room.
:param item: Item to insert
"""
with self.mutex:
# This should only ever run once, but we defensively use
# a while loop to make sure we make enough room.
while len(self.items) >= self.max_size:
self.items.popleft()
self.items.append(item)
if not self.active:
self.active_heap.submit(self)
def get(self) -> _Invocation | None:
"""Get an item from the front of the queue."""
with self.mutex:
if len(self.items):
return self.items.popleft()
return None
def __lt__(self, other: _InvocationQueue) -> bool:
return (self.priority, self.ticket) < (other.priority, other.ticket)
_NULL_QUEUE = _InvocationQueue(typing.cast(_ActiveHeap, None), 0)
@dataclass(init=True, unsafe_hash=True, eq=True, slots=True, frozen=True)
class _EventStream:
device_trl: str
attr: str | None
event_type: tango.EventType
@property
def sub_args(self) -> tuple[typing.Any, ...]:
if self.attr is None:
return (self.event_type,)
return (self.attr, self.event_type)
_T = typing.TypeVar("_T")
def _resolve_stream_args(
device_proxy_or_trl: tango.DeviceProxy | str,
attr_or_type: str | tango.EventType,
event_type: tango.EventType | _T,
arg: _T | None = None,
) -> tuple[_EventStream, _T]:
if isinstance(attr_or_type, str):
attr = attr_or_type
else:
attr = None
event_type = attr_or_type
arg = typing.cast(_T, event_type)
if isinstance(device_proxy_or_trl, str):
if not device_proxy_or_trl.startswith("tango://"):
db = tango.Database()
db_host = db.get_db_host()
db_port = db.get_db_port()
device_trl = f"tango://{db_host}:{db_port}/{device_proxy_or_trl}"
else:
device_trl = device_proxy_or_trl
else:
db_port = device_proxy_or_trl.get_db_port()
dev_name = device_proxy_or_trl.dev_name()
if db_port == "Unused":
dev_host = device_proxy_or_trl.get_dev_host()
dev_port = device_proxy_or_trl.get_dev_port()
device_trl = f"tango://{dev_host}:{dev_port}/{dev_name}#dbase=no"
else:
db_host = device_proxy_or_trl.get_db_host()
device_trl = f"tango://{db_host}:{db_port}/{dev_name}"
return _EventStream(device_trl, attr, event_type), typing.cast(_T, arg)
@dataclass
class _EventStreamMeta:
# Queue to add subscriptions to.
queue: Queue
# Lock invariants:
# - (callbacks or not temporary) or the meta is not in the dictionary
# - pending_futures is None and last_event is not None or the
# event stream is still connecting
# - if sub_id is not None, then there is an active Tango subscription
lock: threading.Lock = field(default_factory=threading.Lock)
# Proxy used to connect with.
# Invariant: If proxy is None then so is sub_id.
proxy: tango.DeviceProxy | None = None
# The Tango subscription ID for this event stream.
# Invariant: If sub_id is None then so is proxy.
sub_id: int | None = None
# The most recent event received for the event stream. This is used to
# provide the initial call when we register a callback.
last_event: typing.Any = None
# A possibly empty list of callbacks to call whenever an event is received.
# This list must be treated as CoW as the existing list might be reference
# by an _Invocation in some _InvocationQueue.
callbacks: list[type_hints.EventCallbackType] = field(default_factory=list)
# Futures to be notified when the connection is established. Each future
# must have a done callback to mark the meta as "in use" when called. If
# None then the meta has already established the connection.
pending_futures: list[_ConnectionFuture] | None = field(default_factory=list)
# If True then the meta will be deleted when there are no remaining
# subscriptions.
# Invariant: if temporary is True then callbacks is not empty
temporary: bool = True
def should_delete(self) -> bool:
return self.temporary and not self.callbacks
def unsub_and_cancel(self, stream: _EventStream) -> None:
if self.proxy is not None and self.sub_id is not None:
self.proxy.unsubscribe_event(self.sub_id)
self.proxy = None
self.sub_id = None
if self.pending_futures is not None:
for fut in self.pending_futures:
fut.set_exception(RuntimeError(f"{stream!r} has been disconnected"))
_DECAY_PERIOD = 10
_DECAY_FACTOR = 5 / 8
@dataclass
class _QueueSet:
mapping: weakref.WeakKeyDictionary[Queue, _InvocationQueue] = field(
default_factory=weakref.WeakKeyDictionary
)
# Lock invariant: Only one thread is either iterating or adding to
# mapping at a time.
lock: threading.Lock = field(default_factory=threading.Lock)
next_decay: float = field(default_factory=lambda: time.monotonic() + _DECAY_PERIOD)
def allocate(self, active_heap: _ActiveHeap, size: int) -> Queue:
with self.lock:
qid = Queue()
self.mapping[qid] = _InvocationQueue(active_heap, size)
return qid
def decay_priorities(self) -> None:
with self.lock:
now = time.monotonic()
while now > self.next_decay:
self.next_decay += _DECAY_PERIOD
for q in self.mapping.values():
q.priority *= _DECAY_FACTOR
_ConnectionFuture: typing.TypeAlias = _futs.Future[
tuple[_EventStreamMeta, list[typing.Callable[[], None]]]
]
def _worker(
queues: _QueueSet,
active_heap: _ActiveHeap,
name: str,
logger: logging.Logger,
) -> None:
with tango.EnsureOmniThread():
queue = _NULL_QUEUE
while True:
queues.decay_priorities()
queue = active_heap.next_queue(queue)
if queue is _NULL_QUEUE:
return
item = queue.get()
if item is not None:
for cb in item.callbacks:
try:
if callable(cb):
cb(item.event)
else:
cb.push_event(item.event)
except Exception:
logger.exception(
f"{name}: Callback {cb} raised an exception "
f"for {item.event}. Continuing."
)
queue.priority += 1