.subscribing package
Subpackages
Submodules
.subscribing.base module
- class AttributeInt(name: str, value: Any, timestamp=None)[source]
Bases:
object
An abstraction of a Device Attribute so that it provides the minimum set of fields and methods necessary to work with subscription module
- class EventDataInt(producer_name: str, attr_name: str, val: Optional[Any] = None, timestamp=None)[source]
Bases:
object
An abstraction of event data that provides the minium set of attributes and methods necessary to work with subscription module. It is derived from the specific tango EventData so that the EventData from tango devices can also work in place.
- class EventItemBase[source]
Bases:
object
TODO
- handler: Optional[ska_ser_skallop.subscribing.base.MessageHandlerBase]
- subscription: ska_ser_skallop.subscribing.base.SubscriptionBase
- class EventTimeInt(timestamp: Optional[float] = None)[source]
Bases:
object
An abstraction of a Device event’s time of occurrence so as to be compatible with the Tango Device attribute’s time object
- isoformat() str [source]
renders a datetime into isformatted string
- Returns
the isoformattted string
- todatetime() datetime.datetime [source]
renders the object as a datetime type
- class EventsPusherBase[source]
Bases:
ska_ser_skallop.subscribing.base.Subscriber
- handler: Optional[ska_ser_skallop.subscribing.base.MessageHandlerBase]
- push_event(event: ska_ser_skallop.subscribing.base.EventDataInt) None [source]
the method that is called by the producer when an event occurs on a specific subscription
- class MessageBoardBase[source]
Bases:
object
- add_subscription(producer: ska_ser_skallop.subscribing.base.BaseProducer, attr: str, handler: ska_ser_skallop.subscribing.base.MessageHandlerBase, polling: bool = False) ska_ser_skallop.subscribing.base.SubscriptionBase [source]
adds a new subscription
- archived_subscriptions: Set[ska_ser_skallop.subscribing.base.SubscriptionBase]
- board: queue.Queue
- abstract get_current_items() List[ska_ser_skallop.subscribing.base.EventItemBase] [source]
Get the current events contained in the buffer of a message board object.
Note this should be a method of the MessageBoard Class in skallop
- Returns
A list of current event items
- abstract get_items(timeout: float = 0) Iterator[ska_ser_skallop.subscribing.base.EventItemBase] [source]
Iterates over the
EventItem
instances placed in its internal queue as a consequence of subscriptions toProducer
instances. If the queue is currently empty it will wait for a given timeout until either returning with aStopIteration
or raising anska_ser_skallop.subscribing.exceptions.EventTimedOut
exception (depending on whether a subscription is configured to suppress or not suppress timeouts)- Parameters
timeout – the maximum amount of time to wait for any events to be placed on the queue, defaults to 0 in the case of call back subscriptions, otherwise in case of a polling subscription, it will wait at least for the
ska_ser_skallop.subscribing.message_board.MessageBoard.gathered_sleep_time
amount of time.- Raises
EventTimedOut – if timeout is not suppressed by subscription (note a timeout condition will result in all remaining subscriptions to be canceled)
- Returns
a
StopIteration
during a suppressed timeout or when all subscriptions have been removed.- Yield
yields an item in the queue if not empty
- abstract log(message: str, time: Optional[datetime.datetime] = None, label=None)[source]
puts a log message on the message board
- property non_expendable_subscriptions: Set[ska_ser_skallop.subscribing.base.SubscriptionBase]
- abstract play_log_book(filter_log: bool = True, log_filter_pattern: str = '') str [source]
Returns the contents of the logbook as a string of messages seperated by newline characters
- Parameters
filter_log – whether the log book should filter out messages labeled as “log_filter”, defaults to True
- Returns
the logbook contents
- abstract remove_subscription(subscription: ska_ser_skallop.subscribing.base.SubscriptionBase)[source]
removes a given subscription
- subscriptions: Set[ska_ser_skallop.subscribing.base.SubscriptionBase]
- class MessageHandlerBase[source]
Bases:
object
- handle_event(event: ska_ser_skallop.subscribing.base.EventDataInt, subscription: ska_ser_skallop.subscribing.base.SubscriptionBase, *args) None [source]
- handle_timedout(producer: ska_ser_skallop.subscribing.base.Producer, attr: str, *args, **kwargs) str [source]
- print_event(event: ska_ser_skallop.subscribing.base.EventDataInt, ignore_first=False) str [source]
- class Producer(name: str)[source]
Bases:
ska_ser_skallop.subscribing.base.BaseProducer
something that can be subscribed or unsubscribed to the interface is an generalization of the DeviceProxy so that the DeviceProxy can be a specific instance of a producer without having to change its interface. A specific instance of a Producer must have at least the following methods:
- get_events(subscription_id: int) List[ska_ser_skallop.subscribing.base.EventDataInt] [source]
returns events generated on a specific subscription that was held in an internal buffer. This is for when the subscription is polled based.
- poll_attribute(attr: str, period: int)[source]
sets a poll period on a producer to periodically check if an attribute value has change and only then publish it
- subscribe_event(attr: str, event_type: Any, subscriber: Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]]) int [source]
sets up an subscription by registering a subscriber to be notified (via the push_event method) when a particular event type occurs on the specified attribute. In an integer is given in stead of a Subscriber the subscription is polled and the producers stores any events internally for collection by the subscriber at a later stage
- class Subscriber[source]
Bases:
object
- push_event(event: ska_ser_skallop.subscribing.base.EventDataInt) None [source]
the method that is called by the producer when an event occurs on a specific subscription
- class SubscriptionBase[source]
Bases:
object
- buffer_size = 100
- producer: ska_ser_skallop.subscribing.base.Producer = <ska_ser_skallop.subscribing.base.Producer object>
- subscribe_by_callback(board: queue.Queue) None [source]
.subscribing.configuration module
- class MessageboardContainer(*args: Any, **kwargs: Any)[source]
Bases:
object
- get_message_board() ska_ser_skallop.subscribing.base.MessageBoardBase [source]
retrieves the message board for use in the app, if this is the first time call has been made, the object shall first determine a default messageboard
- inject(board: ska_ser_skallop.subscribing.base.MessageBoardBase, provided_by: str)[source]
- class Provider(provider, provided_by)[source]
Bases:
NamedTuple
- provider: ska_ser_skallop.subscribing.base.MessageBoardBase
Alias for field number 0
- determine_messageboard() ska_ser_skallop.subscribing.base.MessageBoardBase [source]
- patch_messageboard(board: ska_ser_skallop.subscribing.base.MessageBoardBase = None, provided_by: str = '')[source]
- set_messageboard(board: ska_ser_skallop.subscribing.base.MessageBoardBase, provided_by: str)[source]
.subscribing.event_item module
- class EventItem(event: ska_ser_skallop.subscribing.base.EventDataInt, subscription: ska_ser_skallop.subscribing.base.SubscriptionBase, handler: Optional[ska_ser_skallop.subscribing.base.MessageHandlerBase])[source]
Bases:
ska_ser_skallop.subscribing.base.EventItemBase
Class for tying an incoming event from a subscription to the subscription itself as well as a registered event handler and allow the extraction of these items
- describe() Tuple[str, str, str, str] [source]
Returns a description of the event data in the form of a tuple of strings in the following order:
The name of the producer (device)
The name of the attribute for which the subscription was based
The value of the attribute
The date at which the event was generated
- Returns
[description]
- get_attr_name() str [source]
Returns the attribute of the producer upon which events were raised
- Returns
the name of the attribute
- get_attr_value_str() str [source]
Returns the current value of the attribute rendered as a string
- Returns
the current value of the attribute
- get_date_init() datetime.datetime [source]
Returns the date the event item (not the event it self) was created
- Returns
the date of event item creation
- get_date_lodged() datetime.datetime [source]
Returns the date the event was created by the producer
- Returns
the date when the event was lodged
- get_date_lodged_isoformat() str [source]
Returns the date the event was generated, but rendered as a string in isoformat (usefull for logging purposes)
- Returns
the isoformatted date
- get_producer_name() str [source]
Returns the name of the producer (or device) that generatted the evnt
- Returns
the name of the producer
- handler: Optional[ska_ser_skallop.subscribing.base.MessageHandlerBase]
- subscription: ska_ser_skallop.subscribing.base.SubscriptionBase
.subscribing.exceptions module
- exception EventTimedOut(message: str, remaining_events='')[source]
Bases:
Exception
Used to indicate a timeout has occurred whilst waiting for events to come in from a set of subscriptions. The timeout message will contain a list of diagnostic messages from remaining subscriptions in order to assist with fault finding.
.subscribing.helpers module
- class LogBook[source]
Bases:
object
- log(message: str, timestamp: Optional[datetime.datetime] = None, label=None)[source]
- log_filer = 'log'
- class LogMessage(time, log, label)
Bases:
tuple
- label
Alias for field number 2
- log
Alias for field number 1
- time
Alias for field number 0
- class Tracer(message: Optional[str] = None)[source]
Bases:
object
class used to record messages at specific events
- class TracerMessage(time, message)[source]
Bases:
NamedTuple
- time: datetime.datetime
Alias for field number 0
- describe_event(event: ska_ser_skallop.subscribing.base.EventDataInt, init_date: datetime.datetime = datetime.datetime(2023, 6, 20, 8, 24, 51, 839901)) Tuple[str, str, str, str] [source]
Return an event as a tuple of strings describing the event.
- Returns
an event as a tuple of strings describing the event.
- get_attr_name(event: ska_ser_skallop.subscribing.base.EventDataInt) str [source]
returns the event attribute for which the value haven been set
- get_attr_value_as_str(attr: ska_ser_skallop.subscribing.base.AttributeInt) str [source]
transform a tango base.DeviceAttribute value into a string as determined by its type (name)
- get_attr_value_str(event: ska_ser_skallop.subscribing.base.EventDataInt) str [source]
returns the attribute value for an event as a string
- get_date_lodged(event: ska_ser_skallop.subscribing.base.EventDataInt, init_date: Optional[datetime.datetime] = None) datetime.datetime [source]
returns the initial date an event was generated (if it exists). If it does not exist a new date can either be injected as a parameter or generated at the time of call
- get_date_lodged_isoformat(event: ska_ser_skallop.subscribing.base.EventDataInt, init_date: datetime.datetime = datetime.datetime(2023, 6, 20, 8, 24, 51, 839893)) str [source]
renders the date for an event as an isoformated string
- get_device_name(event: ska_ser_skallop.subscribing.base.EventDataInt) str [source]
returns the tango device owning the event
- i_can_subscribe(producer: ska_ser_skallop.subscribing.base.Producer, attr: str, event_type: Any) bool [source]
- print_tracers(tracers: List[ska_ser_skallop.subscribing.helpers.Tracer]) str [source]
- unpack_event(event: ska_ser_skallop.subscribing.base.EventDataInt, init_date: datetime.datetime = datetime.datetime(2023, 6, 20, 8, 24, 51, 839906)) Tuple[str, str, str, datetime.datetime] [source]
returns a tuple of key attributes for an event as device name, attribute, value and date lodged
.subscribing.message_board module
Contains the MessageBoard class and related classes to implement a Message Board
- class MessageBoard[source]
Bases:
ska_ser_skallop.subscribing.base.MessageBoardBase
Encapsulates a queue containing events that gets places by events pushers onto it. Also keeps track of subscriptions and allow adding and removing of a subscription. Lastly, it keeps log of various internal messages and transitions to assist in describing the occurrence of events.
- add_subscription(producer: ska_ser_skallop.subscribing.base.Producer, attr: str, handler: ska_ser_skallop.subscribing.message_handler.MessageHandler, polling: bool = False) ska_ser_skallop.subscribing.subscription.Subscription [source]
Registers and initiate a new subscription on a given producer with a specific attribute. A subscription can be seen as a request to be notified when an event occurs on a particular attribute of an entity. In this implementation an event is seen as when the value of an attribute of that entity has changed. The role of the producer is then to notify the subscriber with two possible ways:
- By means of a given call to be made by a producer (call back) on a given
object
- By storing the event temporarily in a buffer for later retrieval
(polling = True)
However, both of these options achieve the same external result on the
MessageBoard
by resulting in events being placed on its internal queue. These events can then be retrieved later from theMessageBoard
, together with information about the subscription and a given handler to consume the results of the event.- Parameters
producer – the object responsible for generating the events and upon which the subscription must be placed.
attr – the attribute (item of of interest) for which a change in value must result in a notification event
handler – the object responsible for consuming the contents of the event. Note this is different from the subscriber handler that will be given to the producer for handling the incoming event, in this case the handler is just an optional attrubute of the
EventItem
that will be placed on the queue, providing a means for consuming events as they get pulled from the buffer.polling – whether the internal event generation must be done via polling or call back. In general the call back mechanism is simpler but in cases where a producer is not able to call a subscriber directly then the polling method is also available, defaults to False
- Returns
returns the created subscription in cases the client code wants to use the subscription later
- archived_subscriptions: Set[ska_ser_skallop.subscribing.base.SubscriptionBase]
- board: queue.Queue
- gathered_sleep_time = 0.05
- get_current_items() List[ska_ser_skallop.subscribing.event_item.EventItem] [source]
Get the current events contained in the buffer of a message board object.
Note this should be a method of the MessageBoard Class in skallop
- Returns
A list of current event items
- get_items(timeout: float = 0) Iterator[ska_ser_skallop.subscribing.event_item.EventItem] [source]
Iterates over the
EventItem
instances placed in its internal queue as a consequence of subscriptions toProducer
instances. If the queue is currently empty it will wait for a given timeout until either returning with aStopIteration
or raising anska_ser_skallop.subscribing.exceptions.EventTimedOut
exception (depending on whether a subscription is configured to suppress or not suppress timeouts)- Parameters
timeout – the maximum amount of time to wait for any events to be placed on the queue, defaults to 0 in the case of call back subscriptions, otherwise in case of a polling subscription, it will wait at least for the
gathered_sleep_time
amount of time.- Raises
EventTimedOut – if timeout is not suppressed by subscription (note a timeout condition will result in all remaining subscriptions to be canceled)
- Returns
returns a
StopIteration
during a suppressed timeout or when all subscriptions have been removed.- Yield
yields an item in the queue if not empty
- log(message: str, time: Optional[datetime.datetime] = None, label=None) None [source]
Logs a message with optional time of occurrence
- Parameters
message – the messages to log
time – the time at which this message was deemed to take place (if None then it will use its own time), defaults to None
label – any labels to annotate the message with, defaults to None
- log_filter = 'log'
- play_log_book(filter_log: bool = True, log_filter_pattern: str = '') str [source]
Returns the contents of the logbook as a string of messages seperated by newline characters
- Parameters
filter_log – whether the log book should filter out messages labeled as
log_filter
, defaults to True- Returns
the logbook contents
- print_remaining_subscriptions() str [source]
Returns a description of all current subscriptions
- Returns
the description of the subscriptions
- remove_all_subscriptions() None [source]
Remove all subscriptions by unsubscribing on the producers. Note all removed subscriptions are placed in the archived list for diagnostic purposes.
- remove_subscription(subscription: ska_ser_skallop.subscribing.base.SubscriptionBase) None [source]
Remove a given subscription by unsubscribing on the producer. Note the removed subscriptions is placed in the archived list for diagnostic purposes.
- Parameters
subscription – the subscription to remove
- replay_subscription(subscription: ska_ser_skallop.subscribing.base.SubscriptionBase) str [source]
Replay the tracer logs generated on a particular subscription
- Parameters
subscription – the subscription to replay
- Returns
the tracer logs
- replay_subscriptions() str [source]
Replay the tracer logs from current and archived subscriptions
- Returns
the tracer logs
- subscriptions: Set[ska_ser_skallop.subscribing.base.SubscriptionBase]
.subscribing.message_handler module
- class MessageHandler(board: ska_ser_skallop.subscribing.base.MessageBoardBase, handler_annotation: str = '', enable_pre_handling_annotations: bool = True)[source]
Bases:
ska_ser_skallop.subscribing.base.MessageHandlerBase
A basic implementation of the
MessageHandlerBase
containing typical generic event handling behaviour for convenience. Developers can inherit from this base class in order to implement event handling. The following basic features are provided:Recording and updating of current state from event data (see
load_event()
,replay()
)Subscription behavior: remove the subscription generating this event or remove all from messageboard
A pre and post context for handling an event (see
handle_context()
andhandle_event()
)Logging/Printing of the data (see
print_event()
,replay()
)
- describe_self() str [source]
Currently returns empty string. When inheriting you should overide this method and return a string representation of what the current event handler does
- Returns
a description of what the handler does
- handle_context(event: ska_ser_skallop.subscribing.base.EventDataInt, subscription: ska_ser_skallop.subscribing.base.SubscriptionBase) Iterator[None] [source]
Context manager for use with an handler event method. When you place your event handling code within this context then the basic pre and post functionality will be included in the handling:
Pre-handling: determine the basic ordering state (wether it is the first, second or subsequent events). In addition it may also unsubscribe automatically before processing the event if the attribute
cancel_at_next_event
has been setPost-handling: redords on the message board when the event item retrieved is finished handling it.
When inheriting you can update the pre and post methods (
_pre_handling
,_post_handling
) with more sophisticated behaviour:
- handle_event(event: ska_ser_skallop.subscribing.base.EventDataInt, subscription: ska_ser_skallop.subscribing.base.SubscriptionBase, *args) None [source]
This is the basic method that normally gets called by the client to handle an event on the messageboard. It should be overridded when you want to create specific behaviour.
- handle_timedout(producer: ska_ser_skallop.subscribing.base.BaseProducer, attr: str, print_header: bool = True, print_tracer: bool = False) str [source]
called by the messageboard when it has waited too long for events on a subscription. The result of this is a string containing diagnostic data about the event handler to assist a tester in determining the cause of the time out.
- Parameters
producer – The producer of the event data
attr – the attribute upon which the subscription is based
print_header – whether the diagnostic data should be preceded by a print header, defaults to True
- Returns
the actual diagnostic data used to understand the cause of the time out
- load_event(event: ska_ser_skallop.subscribing.base.EventDataInt, subscription: ska_ser_skallop.subscribing.base.SubscriptionBase) None [source]
Updates the state of the handler by setting the contents of attr:current_event and attr:current_subscription to input parameters
- print_event(event: ska_ser_skallop.subscribing.base.EventDataInt, ignore_first: bool = False) str [source]
This method is typically called by the client to assist the testing script. It returns a string describing the received event with annotations about its order and context. It also writes the same string to the messageboard so that all messages coming from event handlers can be centralized and replayed later.
- Parameters
event – The eventdata as extracted from the messageboard
EventItem
objectignore_first – whether a the first event resulting from a subscription should be ignored. This is typically when a subscription on a tango device results in an immediate event being generated before a test execution has ran. In such cases it may be useful to treat those events as not part of the actual test scenario, defaults to False
- Returns
the event described in a useful context in order to assist the tester in understanding the dynamic situation.
- replay() str [source]
returns a string that lists all the messages logged by the handler during its lifetime in order to assist with diagnosis”
- Returns
the diagnostic string
- suppress_timeout() bool [source]
This method is used by the messageboard to determine whether or not it should raise and exception when the timeout occurred or wether it should simply halt the iteration. Note, this is set to always return False. Override this method, if you want your handler to behave differently.
- Returns
indicates whether it should
- unsubscribe(subscription: ska_ser_skallop.subscribing.base.SubscriptionBase)[source]
Unsubscribes the given subscription fom the messageboard. Use this typically if the incoming event indicates no further events is expected (and thus no further waiting is necessary)
- Parameters
subscription – The subscription used to generate the event data (also extracted from the messageboard
EventItem
object)
.subscribing.producers module
- class BufferedSubscriber[source]
Bases:
ska_ser_skallop.subscribing.base.Subscriber
- push_event(event: ska_ser_skallop.subscribing.base.EventDataInt) None [source]
the method that is called by the producer when an event occurs on a specific subscription
- class Producer(name: str)[source]
Bases:
ska_ser_skallop.subscribing.base.Producer
An emulation of a producer with the ability to be subscribed to and to respond when being called to push events by in turn calling all its subscribers. This Class could be usefull to test subscriptions without needing externally running applications.
- describe_subscription(subscription_id: int) Dict [source]
Gives a description of a current subscription
- Parameters
subscription_id – the identification of the subscription
- Returns
the description
- get_events(subscription_id: int) List[ska_ser_skallop.subscribing.base.EventDataInt] [source]
Used for when polling based supscription is used
- Parameters
subscription_id – the subscription id
- Returns
a list of events generated since last retrieval
- push_event(attr: str, event: ska_ser_skallop.subscribing.base.EventDataInt) None [source]
- subscribe_event(attr: str, event_type: Any, subscriber: Union[ska_ser_skallop.subscribing.base.Subscriber, int]) int [source]
Registers a subscription on a producer based on a given attr.
- Parameters
attr – the attribute for which events must be generated when an event has been pushed
event_type – The event type is to ensure the interface matches to the tango Device interface even though only events of Change type are considered.
subscriber – The object that will be called by it’s
push_event()
method when an event has occurred. If instead of an object an integer is given then the subscription will result in internal buffer being populated when a new event is pushed (polling)
- Returns
The subscription ID as a sequential index nr starting from 0
.subscribing.subscription module
- class EventsPusher(queue: queue.Queue, handler: Optional[ska_ser_skallop.subscribing.base.MessageHandlerBase] = None)[source]
Bases:
ska_ser_skallop.subscribing.base.EventsPusherBase
Object that pushes events onto a given buffer when called by a push event
- handler: Optional[ska_ser_skallop.subscribing.base.MessageHandlerBase]
- push_event(event: ska_ser_skallop.subscribing.base.EventDataInt) None [source]
Called by a callback from a producer with a new incoming event and results in a new “class”EventItem being created and placed on the provided queue
- Parameters
event – the incoming event
- set_subscription(subscription: ska_ser_skallop.subscribing.base.SubscriptionBase) None [source]
This method is used by the calling
Subscription
object during the initiation of a subscription on aProducer
. Because object is needed as part of the subscription call, the subscription can only be created after theEventsPusher
and thus requiring this method.
- class Subscription(producer: ska_ser_skallop.subscribing.base.Producer, attr: str, handler: ska_ser_skallop.subscribing.base.MessageHandlerBase, master: bool = False)[source]
Bases:
ska_ser_skallop.subscribing.base.SubscriptionBase
class that ties a subscription to a producer in order to keep record of subscriptions. It handles the basic subscribing and unsubscribing behaviour to the producer.
- buffer_size = 100
- describe() Tuple[str, str, Optional[int]] [source]
Describe it self in terms of a tuple of items describing key attributes about the subscription in the following order:
Producer Name
Attribute Name
Subscripton id (may be None if a subscription has been removed or not executed yet)
- Returns
the tuple of items
- get_event_pushing_logs() str [source]
returns any logs generated by the events pusher object used as a subscriber onto a producer during callbacks
- Returns
the diagnostic logs
- get_handler_logs() str [source]
returns logs generated by the handler during the handling of events to do with this particular subscription
- Returns
the logs messages
- get_internal_logs() str [source]
returns any internal logs generated by its tracing object for diagnostic purposes
- Returns
the internal logs
- handle_timedout(*args, **kwargs) str [source]
Delivers diagnostic information about a subscription in case of a timeout
- Returns
the diagnostic data
- poll() List[ska_ser_skallop.subscribing.event_item.EventItem] [source]
Used when a polling based subscription is made on a producer and gets any internally buffered events that may have occurred since the last time it was queried.
- Returns
A list (empty if none was generated) of
EventItem
instances
- subscribe_buffer(buffersize: Optional[int] = 100) None [source]
Registers a subscription on its Producer by means of an internal events buffer that holds newly generated events temporarily until collected with the poll
- Parameters
buffersize – the size of the internal buffer to be used by the producer, defaults to buffer_size
- subscribe_by_callback(board: queue.Queue) None [source]
Registers a subscription on its Producer by means of a callback being called on a provided object that will result in placing the event as an
EventItem
on the provided queue- Parameters
board – the queue that must be used by the callback object to place new
EventItem
instances on
Module contents
Subscribing allows the monitoring of events generated from diverse producers at a central place. If you have producer A and B, for which you expect over the course of a test to produce events 1,2,3; then you set up a message board to which these producers can post if and when they generate the expected event. Then at the appropriate time, you can access these events, check their order, analyse their timing and or verify their content within your test.
The example below illustrates the concept:
import logging
import sys
from ska_ser_skallop.connectors import configuration
from ska_ser_skallop.subscribing import configuration as sub_conf
from ska_ser_skallop.subscribing.base import MessageBoardBase
from ska_ser_skallop.subscribing.message_handler import MessageHandler
logger = logging.getLogger(__name__)
def subscribe(board: MessageBoardBase):
# adds two subscriptions
board.add_subscription(
configuration.get_producer("sys/tg_test/1", fast_load=True),
"state",
MessageHandler(board),
)
board.add_subscription(
configuration.get_producer("sys/tg_test/1", fast_load=True),
"short_scalar",
MessageHandler(board),
)
def handle_events(board: MessageBoardBase, option):
# handle next item on the messageboard...
if option == "handle most recent":
next(board.get_items(timeout=1)).handle_event()
# or if you want to get the next 2 events
elif option == "block until all handled":
for index, item in enumerate(board.get_items()):
item.handle_event()
print(item.handler.print_event(item.event))
if index == 2:
board.remove_all_subscriptions()
def set_logging_for_stdout():
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
if __name__ == "__main__":
set_logging_for_stdout()
brd = sub_conf.get_messageboard()
subscribe(brd)
handle_events(brd, "block until all handled")