.tangobridge package
Module contents
Implements a tango bridge to a tango gql service for rest and websocket connections.
The tango bridge is essentially a facade in front of a rest and a websocket controller, each one responsible for monitoring the connection health and ensuring successful calls to the services. The websocket service indirectly implements tango based subscriptions to devices that get used by a subscribing module before being accessed by the tango bridge.
Submodules
.authentication module
Handles user authentication to obtain access to tango gql services.
- class AuthenticatedUser(cookies: Any, username: str, password: str)[source]
Bases:
NamedTuple
Bundle authenticated user values as a single object.
- property auth: bool
Whether the object represents a successful authentication.
- Returns
True if authentication successful
- cookies: Any
Alias for field number 0
- class Authenticator(env: ska_ser_skallop.connectors.remoting.tangobridge.configuration.Environment)[source]
Bases:
object
Generates an authenticated user.
- get_authenticated_user() ska_ser_skallop.connectors.remoting.tangobridge.authentication.AuthenticatedUser [source]
Return authenticated user values for gaining access to tango gql services.
- Raises
AuthException – when authentication unsuccessful
- Returns
authenticated user values as an AuthenticatedUser object.
.configuration module
Manage configuration settings and credentials for connecting to tango gql services.
- class CredentailsDict[source]
Bases:
TypedDict
Bundle user username and password as single dictionary object.
- class Credentials(username: str, password: str)[source]
Bases:
NamedTuple
Bundle user username and password as single object.
- asdict() ska_ser_skallop.connectors.remoting.tangobridge.configuration.CredentailsDict [source]
Return credentials as a dictionary.
- Returns
the credentials as a dictionary
- class Environment(username: str, password: str, domain: str, kube_branch: str, telescope: str, kubehost: str, tango_bridge_ip: Optional[str] = None, bypass_auth: bool = False, kube_namespace: Optional[str] = None)[source]
Bases:
NamedTuple
Stores environment settings as a collection into a single object.
- get_credentials() Optional[ska_ser_skallop.connectors.remoting.tangobridge.configuration.Credentials] [source]
Generate a Credentials object if credentials exist.
- Returns
the credentials if they exist otherwise return None.
- class Settings(service_name: str, tangogql: str)[source]
Bases:
NamedTuple
Bundle service name and tango gql end point.
- get_env() ska_ser_skallop.connectors.remoting.tangobridge.configuration.Environment [source]
Collect the relevant environment settings from host env.
The following environment settings are relevant for remoting:
KUBE_HOST: the domain name hosting the k8 cluster (default k8s.skao.stfc)
BYPASS_AUTH: whether authentication must be skipped (default is not to skip)
TARANTA_USER: the taranta username needed for authentication on tangogql service
TARANTA_PASSWORD: the taranta password needed for authentication on tangogql service
DOMAIN: The type of environment in which the SUT is deployed, namely branch, integration or staging
KUBE_BRANCH: The branch name (in case of branch domain) used for the k8 deployment
TANGO_BRIDGE_IP: an alternative raw ip address to use for connecting to k8 cluster
Note KUBE_BRANCH is needed only when the DOMAIN variable is set to branch. The branch value is the name of the git branch used to deploy an instance of the SUT. If TANGO_BRIDGE_IP is given then the DOMAIN and KUBE_BRANCH variables become irrelevant.
- Returns
The environment settings as a single Environment object
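As a rough illustration of how these variables could be gathered, the sketch below reads them from a mapping using only the standard library. The function name `read_remoting_env` and the returned dictionary are assumptions made for illustration and are not the library's `get_env` implementation; only the variable names and the two documented defaults come from the list above. How BYPASS_AUTH is interpreted as a boolean is also an assumption.

```python
import os
from typing import Dict, Mapping, Union


def read_remoting_env(
    environ: Mapping[str, str] = os.environ,
) -> Dict[str, Union[str, bool, None]]:
    """Collect the remoting variables from a mapping (hypothetical helper)."""
    # Assumption: any of these strings switches authentication off.
    bypass = environ.get("BYPASS_AUTH", "").lower() in ("1", "true", "yes")
    return {
        "kubehost": environ.get("KUBE_HOST", "k8s.skao.stfc"),
        "bypass_auth": bypass,
        "username": environ.get("TARANTA_USER"),
        "password": environ.get("TARANTA_PASSWORD"),
        "domain": environ.get("DOMAIN"),
        "kube_branch": environ.get("KUBE_BRANCH"),
        "tango_bridge_ip": environ.get("TANGO_BRIDGE_IP"),
    }


# Passing an explicit mapping keeps the example independent of the host.
settings = read_remoting_env({"DOMAIN": "branch", "KUBE_BRANCH": "my-branch"})
```

Unset variables come back as None here, which mirrors the Optional fields on Environment but is not guaranteed to match the real defaults.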
- get_tango_gql_rest_url(settings: ska_ser_skallop.connectors.remoting.tangobridge.configuration.Settings, env: ska_ser_skallop.connectors.remoting.tangobridge.configuration.Environment) str [source]
Generate a tango gql rest service url from a given environment and tangogql settings.
- Parameters
settings (Settings) – The settings used to name the tangogql service
env (Environment) – The environment values obtained from the host.
- Returns
A tangogql rest http endpoint
- get_tango_gql_ws_url(settings: ska_ser_skallop.connectors.remoting.tangobridge.configuration.Settings, env: ska_ser_skallop.connectors.remoting.tangobridge.configuration.Environment) str [source]
Generate a tango gql websocket service url from a given environment and tangogql settings.
- Parameters
settings (Settings) – The settings used to name the tangogql service
env (Environment) – The environment values obtained from the host.
- Returns
A tangogql websocket endpoint
.control module
Enable concurrent monitoring and control as either asyncio routines or concurrent threads.
- class BaseControllerTasks[source]
Bases:
object
Base object containing the common methods to handle a list of Tasks/Futures.
- cancelled_tasks: List[Union[_asyncio.Task, ska_ser_skallop.connectors.remoting.tangobridge.control.NamedFuture]] = []
- done_tasks: List[Union[_asyncio.Task, ska_ser_skallop.connectors.remoting.tangobridge.control.NamedFuture]] = []
- failed_tasks: List[BaseException] = []
- raise_any_exceptions()[source]
Raise any exceptions generated from failed tasks.
- Raises
Exception – The exception with the list of task exceptions
- submitted_tasks: List[Union[_asyncio.Task, ska_ser_skallop.connectors.remoting.tangobridge.control.NamedFuture]] = []
- class Controller[source]
Bases:
object
Class to manage running asyncio tasks on a separate thread.
The controller creates a separate daemon thread named “asyncio”. The daemon thread contains an asyncio loop that is used to manage asyncio tasks.
Thus the controller allows for a separate thread to run asyncio operations within a pytest environment. These asyncio operations can then be used by the Tango bridge for calls to external services.
The controller allows for two separate ways of achieving concurrency:
From within a daemon-like asynchronous routine that is running on the controller thread
From within the main thread as a dispatched Future resulting from running the async routine on that thread.
The most typical use case is thus to first create a main asyncio routine that works like a daemon loop waiting for all asyncio tasks to complete in a gather command.
    import asyncio
    from ska_ser_skallop.connectors.remoting.tangobridge.control import Controller

    controller = Controller()

    async def main():
        do_a = controller.create_async_task(doA())
        do_b = controller.create_async_task(doB())
        await asyncio.gather(do_a, do_b)

    async def doA():
        await asyncio.sleep(1)

    async def doB():
        await asyncio.sleep(2)
The main routine is then executed as a concurrent thread loaded onto the controller:
    main_thread = controller.dispatch_concurrent_routine(main())
    main_thread.result()  # wait until doA and doB have completed
    # gracefully tear down all tasks and futures that may still be running
    controller.stop()
Note the controller has the ability to cancel all pending tasks (and Futures) when it gets terminated via the stop command (which is registered on the atexit method). This will result in a cancelled exception on an asyncio routine.
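The underlying mechanism can be sketched with the standard library alone: an event loop running forever on a daemon thread, with coroutines handed to it from the main thread. The class `LoopThread` below is a hypothetical stand-in, not the Controller implementation; it only illustrates the threading model described above.

```python
import asyncio
import threading
from concurrent.futures import Future
from typing import Any, Coroutine, List


class LoopThread:
    """Run an asyncio event loop on a daemon thread (illustrative stand-in)."""

    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self.loop.run_forever, name="asyncio", daemon=True
        )
        self._thread.start()

    def dispatch(self, routine: Coroutine[Any, Any, Any]) -> "Future[Any]":
        # Schedule the coroutine on the loop thread; returns a concurrent Future.
        return asyncio.run_coroutine_threadsafe(routine, self.loop)

    def stop(self) -> None:
        # Ask the loop to stop from outside its thread, then wait for the thread.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join(timeout=5)


async def do_a() -> str:
    await asyncio.sleep(0.01)
    return "a"


async def do_b() -> str:
    await asyncio.sleep(0.02)
    return "b"


async def main() -> List[str]:
    # Inside the loop thread, tasks are created and gathered as usual.
    task_a = asyncio.create_task(do_a())
    task_b = asyncio.create_task(do_b())
    return await asyncio.gather(task_a, task_b)


runner = LoopThread()
results = runner.dispatch(main()).result(timeout=5)  # blocks the main thread
runner.stop()
```

Because the thread is a daemon, it cannot keep the interpreter alive on its own, which is one plausible reason for registering a stop command at exit so that pending tasks get cancelled in an orderly way.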
- create_async_task(routine: Coroutine[Any, Any, ska_ser_skallop.connectors.remoting.tangobridge.control.T], name=None) _asyncio.Task [source]
Run an asynchronous routine concurrently within a given loop.
Note this must be called from within a currently executing routine within that loop.
- Parameters
routine (Coroutine[Any, Any, T]) – the routine to run
name (str) – The name to be given to the task, defaults to None which will result in the name being the same as the coroutine function name.
- Returns
the coroutine wrapped as a future allowing asynchronous awaiting
- dispatch_concurrent_routine(routine: Coroutine[Any, Any, ska_ser_skallop.connectors.remoting.tangobridge.control.T], name='') concurrent.futures._base.Future[ska_ser_skallop.connectors.remoting.tangobridge.control.T] [source]
Dispatch a separate thread to run an asynchronous task on its event loop.
- Parameters
routine (Coroutine[Any, Any, T]) – an asynchronous routine that will be run on an event loop in a separate thread. Note any running or pending tasks in the thread will be cancelled during tear down of the thread.
name (str) – The name to be given to the task, defaults to “” which will result in the name being the same as the coroutine function name.
- Returns
a Future representing the concurrent execution of the task, that can be waited upon at some later time to get the result.
- get_loop() asyncio.events.AbstractEventLoop [source]
Return the event loop being used to generate async tasks on the thread.
- Returns
the event loop.
- run_async_task(routine: Coroutine[Any, Any, ska_ser_skallop.connectors.remoting.tangobridge.control.T], name='', timeout=100) ska_ser_skallop.connectors.remoting.tangobridge.control.T [source]
Run an async task on a separate controller thread.
Block until the task has finished or raised an exception.
- Parameters
routine (Coroutine[Any, Any, T]) – an asynchronous routine that will be run on an event loop in a separate thread. Note any running or pending tasks in the thread will be cancelled during tear down of the thread.
timeout – A maximum amount of time to wait for the result of the task, default is 100s
name (str) – The name to be given to the task, defaults to “” which will result in the name being the same as the coroutine function name.
- Raises
TimeoutError – if the task did not return within given timeout period
- Returns
the result of the asynchronous task
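A blocking call with a timeout, as described for run_async_task, can be approximated with `asyncio.run_coroutine_threadsafe` and `Future.result`. The helper name `run_blocking` and the module-level `loop` are assumptions made for this sketch; the real method may handle cancellation and naming differently.

```python
import asyncio
import concurrent.futures
import threading

# A loop running on a daemon thread, standing in for the controller thread.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, name="asyncio", daemon=True).start()


def run_blocking(routine, timeout: float = 100):
    """Run a coroutine on the loop thread and block for its result (sketch)."""
    future = asyncio.run_coroutine_threadsafe(routine, loop)
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # Do not leave the coroutine running once the caller has given up.
        future.cancel()
        raise TimeoutError(f"task did not return within {timeout}s") from None


async def quick() -> int:
    await asyncio.sleep(0.01)
    return 42


async def slow() -> None:
    await asyncio.sleep(10)


answer = run_blocking(quick(), timeout=5)
try:
    run_blocking(slow(), timeout=0.05)
    timed_out = False
except TimeoutError:
    timed_out = True
```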
- task_polling_period = 0.2
- class ControllerState[source]
Bases:
NamedTuple
Represent a Controller state as a tuple of running and not running (mutually exclusively).
- not_running = <threading.Event object>
- running = <threading.Event object>
- class Futures[source]
Bases:
ska_ser_skallop.connectors.remoting.tangobridge.control.BaseControllerTasks
A subclass of BaseControllerTasks as a set of Future tasks.
- add(future: concurrent.futures._base.Future, name='')[source]
Add a new Future to be monitored.
- Parameters
future (Future) – The concurrent future to monitor
name (str) – A name to give to the future for logging purposes, defaults to “”
- cancel_pending_and_join()[source]
Cancel any pending Futures and join the remaining ones until they have finished.
- cancelled_tasks: List[ska_ser_skallop.connectors.remoting.tangobridge.control.NamedFuture] = []
- done_tasks: List[ska_ser_skallop.connectors.remoting.tangobridge.control.NamedFuture] = []
- failed_tasks: List[BaseException] = []
- submitted_tasks: List[ska_ser_skallop.connectors.remoting.tangobridge.control.NamedFuture] = []
- class MonitoredTask(future: concurrent.futures._base.Future, result: Any = 'pending', exception: Union[None, Exception] = None)[source]
Bases:
NamedTuple
Represents a concurrent task being monitored.
- future: concurrent.futures._base.Future
Alias for field number 0
- result: Any
Alias for field number 1
- class NamedFuture(future: concurrent.futures._base.Future[Any], name: str)[source]
Bases:
NamedTuple
Wraps a concurrent Future as a Future class with a name.
- block_until_complete(timeout: float) ska_ser_skallop.connectors.remoting.tangobridge.control.Outcome [source]
Block any further execution until future has completed.
- Parameters
timeout (float) – [description]
- Returns
The outcome as a combination of the result and any exceptions raised within Future
- cancel() bool [source]
Cancel the future if possible.
Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed.
- Returns
True if the Future was successfully cancelled
- cancelled()[source]
Return True if the future was cancelled.
- Returns
True if the future was cancelled
- done() bool [source]
Return True if the future was cancelled or finished executing.
- Returns
Return True if the future was cancelled or finished executing
- future: concurrent.futures._base.Future[Any]
Alias for field number 0
- poll_finished_with_exception(ignore_cancelled=True) Union[None, BaseException] [source]
Check if the future has finished but ended with an exception.
- Parameters
ignore_cancelled (bool) – Whether cancelled errors should be ignored rather than treated as an exception, defaults to True (meaning cancelled errors will be ignored)
- Returns
None if task has not finished yet or finished but no exception was raised otherwise returns the actual exception object.
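The polling behaviour described here maps onto the standard `concurrent.futures.Future` API. The free function below is a sketch under the assumption that a cancelled future is reported as a `CancelledError` instance when `ignore_cancelled` is False; it is not the NamedFuture method itself.

```python
from concurrent.futures import CancelledError, Future
from typing import Optional


def poll_finished_with_exception(
    future: Future, ignore_cancelled: bool = True
) -> Optional[BaseException]:
    """Non-blocking check for an exception on a finished future (sketch)."""
    if not future.done():
        return None  # still pending or running
    if future.cancelled():
        return None if ignore_cancelled else CancelledError()
    # Done and not cancelled, so exception() returns immediately.
    return future.exception()


# Four futures put by hand into the four states of interest.
ok, failed, cancelled, pending = Future(), Future(), Future(), Future()
ok.set_result("fine")
failed.set_exception(ValueError("boom"))
cancelled.cancel()
```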
- class Outcome(result: Any, cancelled: Union[None, concurrent.futures._base.CancelledError], exception: Union[None, Exception])[source]
Bases:
NamedTuple
Bundles a future result with any exceptions raised from it.
- result: Any
Alias for field number 0
- T
- sphinx-autodoc can’t document typevars properly right now,
see https://github.com/agronholm/sphinx-autodoc-typehints/issues/39
A generic type for elements of an iterable.
- Type
alias of TypeVar(‘T’)
- class Tasks[source]
Bases:
ska_ser_skallop.connectors.remoting.tangobridge.control.BaseControllerTasks
A subclass of BaseControllerTasks as a set of asyncio tasks.
- add(task: _asyncio.Task)[source]
Add a new task to be monitored.
- Parameters
task (asyncio.Task) – the asyncio Task to be monitored
- async cancel_pending()[source]
Cancel any pending asyncio tasks.
This will cause the task to raise a cancelled Error and finish.
- cancelled_tasks: List[_asyncio.Task] = []
- done_tasks: List[_asyncio.Task] = []
- failed_tasks: List[BaseException] = []
- submitted_tasks: List[_asyncio.Task] = []
- cancel_future(future: concurrent.futures._base.Future, timeout=1)[source]
Attempt to cancel a future task in case it is still pending.
If the future is already running, this will block until it has completed.
- Parameters
future (Future) – The future that needs to be canceled
timeout (int, optional) – The maximum amount of time to wait for future to finish, defaults to 1
.factories module
Provide Factories for creating implementation of tangobridge components.
- class AbstractFactory[source]
Bases:
object
Abstract Factory for generating remote tangobridge components.
- class TBridgeFactory[source]
Bases:
ska_ser_skallop.connectors.remoting.tangobridge.factories.AbstractFactory
Implementation of Abstract factory for providing tangobridge components.
- property authenticated_user: ska_ser_skallop.connectors.remoting.tangobridge.authentication.AuthenticatedUser
Return the authenticated user.
If this is the first time, a new authentication process will run; otherwise the existing authenticated user will be returned.
- Returns
the authenticated user
- property env
Get the host environment.
- Returns
the host environment
- generate_async_queue() asyncio.queues.Queue [source]
Generate an asyncio queue that is used by the controller asyncio thread.
An asyncio queue can be used as a message bus between tasks running in the asyncio thread and the main thread.
Note this requires an existing controller instance and will generate an AssertionError if the controller was not generated.
- Returns
asyncio queue that is used by the controller asyncio thread.
- Raises
AssertionError – when method called before a controller was initiated.
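To make the message-bus idea concrete, the sketch below passes one message from the main thread to a coroutine on an asyncio thread through an `asyncio.Queue`. It uses only the standard library; the loop thread and the helper coroutines are stand-ins, not the factory or controller. The queue is created inside the loop thread as a cautious choice, since older Python versions bind a queue to an event loop at construction time.

```python
import asyncio
import threading

# Stand-in for the controller's asyncio thread.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, name="asyncio", daemon=True)
thread.start()


async def make_queue() -> asyncio.Queue:
    # Created on the loop thread so the queue belongs to that loop.
    return asyncio.Queue()


queue = asyncio.run_coroutine_threadsafe(make_queue(), loop).result(timeout=5)


async def consumer() -> str:
    # Runs on the asyncio thread and waits for a message.
    return await queue.get()


pending = asyncio.run_coroutine_threadsafe(consumer(), loop)
# asyncio queues are not thread safe, so the main thread hands the put
# operation to the loop instead of calling queue.put_nowait directly.
asyncio.run_coroutine_threadsafe(queue.put("hello"), loop).result(timeout=5)
received = pending.result(timeout=5)

loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
```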
- get_controller() ska_ser_skallop.connectors.remoting.tangobridge.control.Controller [source]
Return the Controller object to use for managing asyncio Tasks.
If the Controller instance does not exist then an instance of the asyncio thread will be created together with the initialisation process.
- Returns
[description]
- get_graphql_client(*args, **kwargs) ska_ser_skallop.connectors.remoting.tangobridge.factories.TangoGQLClient [source]
Construct a graphql client object.
- Parameters
args – additional positional arguments for the graphql client
kwargs – additional keyword arguments for the graphql client
- Returns
the graphql client object
- get_new_authenticated_user() ska_ser_skallop.connectors.remoting.tangobridge.authentication.AuthenticatedUser [source]
Generate an authentication process using the given env variables.
- Returns
The authentication result as an authenticated user data object.
- get_tango_gql_rest_url() str [source]
Return the url for calling the tango gql.
- Returns
The url for calling the tango gql.
- get_tango_gql_service_url()[source]
Return the url for calling the tango gql service.
- Returns
the url for calling the tango gql service.
- get_tango_gql_ws_url() str [source]
Return the url for calling the tango gql websocket.
- Returns
the url for calling the tango gql websocket.
- get_websockets()[source]
Return the websocket module to use for websocket connections.
- Returns
the websocket module to use for websocket connections.
- settings = Settings(service_name='taranta', tangogql='graphiql')
.parsing module
Parse gql rest based results.
- exception ParseError[source]
Bases:
Exception
Signal an error occurred in parsing the data returned from gql query.
- class Parser(data: Dict)[source]
Bases:
object
Object used for parsing input data in a piecewise and recursive fashion.
The parse() method results in the object traversing in a chain-like fashion within the dictionary structure until the data being pointed to is of type string (meaning it has reached the end of the path). For example:
    data = {
        'this': {'is': {'a': {'very': {'long': {'chain': 'This is the end of the chain'}}}}}
    }
    parser = Parser(data)
    assert_that(
        parser.parse('this').parse('is').parse('a').parse('very').parse('long').parse('chain').value
    ).is_equal_to('This is the end of the chain')
Each parse call accepts an optional default: if the given key does not exist, the default object is used. If no default was given, a ParseError is raised.
- parse(key: str, default=None) ska_ser_skallop.connectors.remoting.tangobridge.parsing.Parser [source]
Parse the data by traversing to the next item returned from the key.
If the result is of type string then the value is set to that result, otherwise the data is updated to point to the returned result for parsing further.
- Parameters
key (str) – The key to lookup the value in the dictionary
default (Any) – The default value to use if the key does not exist; if none is given then a missing key will result in a parse error, defaults to None
- Raises
ParseError – When the given key does not exist and no default value was given.
- Returns
Itself so as to allow for chaining a parse execution
- property value_as_singleton: Any
Return the first item in the parsed result (assumes result is a list).
This is useful in cases where it is known beforehand that the result will always be a list with only one element in it.
- Returns
The inner value from the list
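The chaining behaviour of the class can be imitated in a few lines. `SketchParser` and `SketchParseError` below are hypothetical stand-ins written for illustration. Unlike the description above, which stops at string values, this sketch treats any non-dictionary result as the end of the path so that list results can be shown with `value_as_singleton`.

```python
from typing import Any, Dict


class SketchParseError(Exception):
    """Raised when a key is missing and no default was supplied."""


class SketchParser:
    """Walk a nested dictionary one key at a time (illustrative only)."""

    def __init__(self, data: Dict[str, Any]) -> None:
        self._data = data
        self.value: Any = None

    def parse(self, key: str, default: Any = None) -> "SketchParser":
        if key in self._data:
            result = self._data[key]
        elif default is not None:
            result = default
        else:
            raise SketchParseError(f"key '{key}' not found")
        if isinstance(result, dict):
            self._data = result  # keep traversing
        else:
            self.value = result  # reached a leaf
        return self  # returning self is what makes chaining possible

    @property
    def value_as_singleton(self) -> Any:
        # Assumes the parsed value is a one-element list.
        return self.value[0]


data = {"device": {"attributes": [{"name": "state"}], "name": "sys/tg_test/1"}}
name = SketchParser(data).parse("device").parse("name").value
first = SketchParser(data).parse("device").parse("attributes").value_as_singleton
fallback = SketchParser(data).parse("device").parse("alias", default="none").value
```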
- parse(data: Dict, key: str, default=None) ska_ser_skallop.connectors.remoting.tangobridge.parsing.Parser [source]
Parse a given input dictionary.
E.g.:
    data = {
        'this': {'is': {'a': {'very': {'long': {'chain': 'This is the end of the chain'}}}}}
    }
    assert_that(
        parse(data, 'this').parse('is').parse('a').parse('very').parse('long').parse('chain').value
    ).is_equal_to('This is the end of the chain')
- Parameters
data (Dict) – The input data
key (str) – The key pointing to the first item for traversing the parsing.
default (Any, optional) – The default value to use if the key does not exist, defaults to None
- Returns
A Parser object that can be traversed further or read to get the actual value from
.queries module
Defines tango gql queries to use on rest based calls to the tango gql service.
- class GQLQuery(query: str, variables: Optional[Dict[Any, Any]])[source]
Bases:
NamedTuple
Bundles a gql query into a query (str) and its variables as a NamedTuple.
- variables: Optional[Dict[Any, Any]]
Alias for field number 1
- command(device: str, command_name: str, argin: Optional[Any] = None) ska_ser_skallop.connectors.remoting.tangobridge.queries.GQLQuery [source]
Return a gql command for commanding a device with given input arguments.
- fetch_attributes(device: str) ska_ser_skallop.connectors.remoting.tangobridge.queries.GQLQuery [source]
Return a gql query for fetching all the attributes for a given device.
- Parameters
device (str) – The tango device name
- Returns
The gql query for fetching all the attributes for a given device.
- fetch_commands(device: str) ska_ser_skallop.connectors.remoting.tangobridge.queries.GQLQuery [source]
Return a gql query for fetching all the commands from a given device.
- Parameters
device (str) – The tango device name
- Returns
The gql query for fetching all the commands from a given device.
- get_device_state(device_name: str)[source]
Return a message to get state for a particular device.
- Parameters
device_name (str) – The tango device name
- Returns
The message to get state for a particular device.
- load_all_attributes(device_name: str)[source]
Return a message to load all attributes for a given device.
- Parameters
device_name (str) – The tango device name
- Returns
The message to load all attributes for a given device.
- read_attribute(device: str, attr_name: str) ska_ser_skallop.connectors.remoting.tangobridge.queries.GQLQuery [source]
Return a gql query for reading the value from a given attribute from a given device.
- read_attributes_from_multiple_devices(device_list: List[str], attr: Union[str, List[str]]) ska_ser_skallop.connectors.remoting.tangobridge.queries.GQLQuery [source]
Return a gql query for reading attribute/s from a given list of devices.
If the given attribute is in the form of a list then the attributes and devices are paired (zipped) as respective reads for each device.
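The pairing rule can be illustrated independently of any gql query text. The helper `pair_devices_with_attributes` is a hypothetical name used only for this sketch, and the device and attribute names are made up.

```python
from typing import List, Tuple, Union


def pair_devices_with_attributes(
    device_list: List[str], attr: Union[str, List[str]]
) -> List[Tuple[str, str]]:
    """Return (device, attribute) pairs to read (hypothetical helper)."""
    if isinstance(attr, str):
        # One attribute name: read it from every device.
        return [(device, attr) for device in device_list]
    # A list of names: zip pairs the i-th device with the i-th attribute.
    return list(zip(device_list, attr))


same = pair_devices_with_attributes(["dev/a/1", "dev/b/1"], "state")
zipped = pair_devices_with_attributes(["dev/a/1", "dev/b/1"], ["state", "obsState"])
```

Note that `zip` stops at the shorter input, so in this sketch a mismatch in list lengths silently drops the surplus entries; whether the real function does the same is not stated here.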
- rest_info()[source]
Return a message to query health status of tango gql.
- Returns
The message to query health status of tango gql.
- subscribe_device(device: str, attribute: str) str [source]
Return a subscription message for subscribing to a tango device attribute.
- write_attribute(device: str, attribute: str, value: Any)[source]
Return a gql query for writing a given attribute value to a device.
- Parameters
device – The tango device name
attribute – The device attribute
value – The value to be written to the device attribute
- Returns
The gql query for setting an attribute
.restcontrol module
Manages rest type calls to a tango gql interface.
- exception RestCallError[source]
Bases:
Exception
Indicates an error during a rest based call to tango gql.
- class RestController(factory: ska_ser_skallop.connectors.remoting.tangobridge.factories.TBridgeFactory = <ska_ser_skallop.connectors.remoting.tangobridge.factories.TBridgeFactory object>)[source]
Bases:
object
Monitors and controls the tango gql connection for rest based calls.
- call_graphql(query: str, variables: Optional[Dict[Any, Any]] = None, operation_name: Optional[str] = None, **kwargs) Any [source]
Call a graph gql based query to the tango gql service.
If the tango gql is not available at the time of the call, the program will block for twice the allocated monitoring poll period before raising a RestCallError. In other words, it allows the monitoring loop to check two more times whether the service becomes available before raising an exception.
- Parameters
query (str) – The tango gql query as an encoded string
variables (Union[Dict[Any, Any], None], optional) – Any graph gql variables to associate with the query, defaults to None
operation_name (Union[str, None], optional) – The gql operation to use, defaults to None
kwargs – Any additional keyword arguments to pass on to the tangogql connector
- Raises
RestCallError – If call did not succeed within allocated waiting period
ClientResponseError – If service responded with an unhandled exception
- Returns
The result of the call
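The waiting rule described for call_graphql (block for up to twice the polling period, then fail) can be sketched with a `threading.Event` that a monitoring loop would set. The names `wait_then_call` and `SketchRestCallError` are assumptions for illustration; the real controller's internals are not shown in this reference.

```python
import threading


class SketchRestCallError(Exception):
    """Stand-in for an error raised when the service stays unavailable."""


def wait_then_call(healthy: threading.Event, call, polling_period: float = 5):
    """Block for up to two polling periods before giving up (sketch)."""
    # Event.wait returns False if the timeout expired without the flag being set.
    if not healthy.wait(timeout=2 * polling_period):
        raise SketchRestCallError("tango gql not healthy within waiting period")
    return call()


healthy = threading.Event()
# Simulate the monitoring loop marking the service healthy shortly afterwards.
threading.Timer(0.05, healthy.set).start()
result = wait_then_call(healthy, lambda: {"data": "ok"}, polling_period=0.5)

never_healthy = threading.Event()
try:
    wait_then_call(never_healthy, lambda: None, polling_period=0.01)
    raised = False
except SketchRestCallError:
    raised = True
```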
- monitor_polling_period = 5
- exception RestHealthError[source]
Bases:
TimeoutError
Indicates a timeout waiting for tango gql rest interface to be healthy.
- class RestStatus[source]
Bases:
NamedTuple
Bundle rest status as a monitoring and tangogql Event Tuple.
- monitoring = <threading.Event object>
- tangogql_healthy = <threading.Event object>
.subscribing module
Module for handling and initiating subscriptions to device attributes on application layer.
- class AbstractCallBackWrapper(callback: Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]])[source]
Bases:
object
Abstraction of a callback wrapper object.
This allows for different kinds of callback implementations initiated by a handler referring to them only in a generic way.
- abstract run_callback(event: ska_ser_skallop.subscribing.base.EventDataInt)[source]
Run a predetermined callback on the given event.
- Parameters
event (base.EventDataInt) – [description]
- class BufferedCallBackWrapper(buffer_size: int)[source]
Bases:
ska_ser_skallop.connectors.remoting.tangobridge.subscribing.AbstractCallBackWrapper
Wraps callbacks as a buffer that will be populated whenever an event has occurred.
- get_events() List[ska_ser_skallop.subscribing.base.EventDataInt] [source]
Retrieve any events currently generated and placed on the queue.
- Returns
a list of events placed on the queue.
- run_callback(event: ska_ser_skallop.subscribing.base.EventDataInt)[source]
Run a predetermined callback on the given event.
- Parameters
event (base.EventDataInt) – [description]
- class CallBackWrapper(callback: Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None])[source]
Bases:
ska_ser_skallop.connectors.remoting.tangobridge.subscribing.AbstractCallBackWrapper
Wraps callbacks as functions to be called on a given event.
- run_callback(event: ska_ser_skallop.subscribing.base.EventDataInt)[source]
Run a predetermined callback on the given event.
- Parameters
event (base.EventDataInt) – [description]
- class DeviceAttribute(device: str, attr: str)[source]
Bases:
NamedTuple
Bundles device name and attribute.
- class DeviceAttributeEventsProducer[source]
Bases:
object
Produce events to subscribers for a particular subscription.
- add_callback(callback: Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]]) int [source]
Add a new callback/subscriber for a given subscription.
- Parameters
callback (Union[base.Subscriber, int, Callable[[base.EventDataInt], None]]) – the callback to use.
- Returns
the subscription id to identify the callback subscription with
- get_events(subscription_id: int) List[ska_ser_skallop.subscribing.base.EventDataInt] [source]
Get events for a particular subscription that have been buffered.
Note this method assumes the subscription id identifies a subscription for which a buffer size has been given and which thus generated a BufferedCallBackWrapper (see BufferedCallBackWrapper).
- Parameters
subscription_id (int) – [description]
- Raises
WrongSubscription – [description]
- Returns
[description]
- remove_callback(subscription_id: int) None [source]
Remove a callback subscription.
- Parameters
subscription_id (int) – [description]
- run_callbacks(event: ska_ser_skallop.subscribing.base.EventDataInt)[source]
Call all subscribers (callbacks) that have subscribed for events.
- Parameters
event (base.EventDataInt) – The event to be handled by the call back methods
- class DeviceAttributeSubscriber(ws_controller: ska_ser_skallop.connectors.remoting.tangobridge.wscontrol.WSController, polling_rate: float = 15)[source]
Bases:
ska_ser_skallop.connectors.remoting.tangobridge.base.Subscriber
Manage subscriptions to tango gql for device attribute events.
- add_subscription(device_name: str, attribute: str, callback: ska_ser_skallop.connectors.remoting.tangobridge.subscribing.DeviceSubscriptionCallback) int [source]
Create a new subscription for which a callback must be called when an event occurs.
- get_events(device_name: str, attribute: str, subscription_id: int) List[ska_ser_skallop.subscribing.base.EventDataInt] [source]
Receive events for a particular “buffered” subscription that stored them in a buffer.
- Parameters
- Raises
WrongSubscription – When the particular subscription is not a buffered type
- Returns
The list of events generated up till now.
- push_event(event: Dict)[source]
Receive events from the selector coming from subscriptions to the websocket.
- Parameters
event (Dict) – [description]
- remove_subscription(device_name: str, attribute: str, subscription_id: int)[source]
Remove a subscription as identified by its id.
Note, even though the id is enough to locate and remove the subscription, the device name and attribute are needed so as to allow for “piggy backing” subscriptions of the same type as a single subscription to the tango gql service. Removing a subscription may thus not necessarily lead to a subscription to the tangogql service being cancelled, but it will result in the particular callback not being called anymore.
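The “piggy backing” idea amounts to sharing one upstream subscription per device attribute among several callbacks and cancelling it only when the last callback is removed. The class `SharedSubscriptions` below is a hypothetical sketch of that bookkeeping; the `upstream` set merely records which keys would be subscribed on the tango gql service.

```python
from typing import Callable, Dict, Set, Tuple

Key = Tuple[str, str]  # (device_name, attribute)


class SharedSubscriptions:
    """Share one upstream subscription among many callbacks (sketch)."""

    def __init__(self) -> None:
        self._callbacks: Dict[Key, Dict[int, Callable]] = {}
        self._next_id = 0
        self.upstream: Set[Key] = set()  # keys currently subscribed upstream

    def add(self, device_name: str, attribute: str, callback: Callable) -> int:
        key = (device_name, attribute)
        if key not in self._callbacks:
            self._callbacks[key] = {}
            self.upstream.add(key)  # first subscriber: subscribe upstream
        self._next_id += 1
        self._callbacks[key][self._next_id] = callback
        return self._next_id

    def remove(self, device_name: str, attribute: str, subscription_id: int) -> None:
        key = (device_name, attribute)
        self._callbacks[key].pop(subscription_id)
        if not self._callbacks[key]:
            # Last subscriber gone: only now cancel the upstream subscription.
            del self._callbacks[key]
            self.upstream.discard(key)


subs = SharedSubscriptions()
first = subs.add("dev/a/1", "state", print)
second = subs.add("dev/a/1", "state", print)
subs.remove("dev/a/1", "state", first)
still_subscribed = ("dev/a/1", "state") in subs.upstream
subs.remove("dev/a/1", "state", second)
now_subscribed = ("dev/a/1", "state") in subs.upstream
```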
- DeviceSubscriptionCallback
A subscribe argument and object indicating the action to do after a subscribe event occurs.
If a Subscriber is given, the action would be that of calling ‘push_event’ on the object. If an integer is given, the action would be that of populating a buffer up to the int value size. If a callable (function) is given, the function will be called with the EventDataInt as argument.
alias of Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]]
- class ParseResult(key: DeviceAttribute, data: base.EventDataInt)[source]
Bases:
NamedTuple
Bundling of parsed event data as key and data.
key = DeviceAttribute and data = base.EventDataInt.
- data: ska_ser_skallop.subscribing.base.EventDataInt
Alias for field number 1
- key: ska_ser_skallop.connectors.remoting.tangobridge.subscribing.DeviceAttribute
Alias for field number 0
- class SubscriberCallBackWrapper(subscriber: ska_ser_skallop.subscribing.base.Subscriber)[source]
Bases:
ska_ser_skallop.connectors.remoting.tangobridge.subscribing.AbstractCallBackWrapper
Wraps callbacks as a subscriber object called when event occurred.
- run_callback(event: ska_ser_skallop.subscribing.base.EventDataInt)[source]
Run a predetermined callback on the given event.
- Parameters
event (base.EventDataInt) – [description]
- class SubscriptionHealth(device_name: str, attribute: str)[source]
Bases:
object
Object holding subscription health in a separate bundle.
- update_health(allowed_elapsed_time: int = 5)[source]
Check if subscription received any acknowledgements/keep alive messages within time.
If no messages occurred within the allowed_elapsed_time the health state is set to stale.
- Parameters
allowed_elapsed_time – The time period for which an acknowledgement from subscription should occur, defaults to 5
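A staleness check of this kind can be sketched with a monotonic clock. `SketchSubscriptionHealth`, its `acknowledge` hook and its `stale` flag are hypothetical names chosen for illustration; the real class may track health differently.

```python
import time


class SketchSubscriptionHealth:
    """Track when a subscription was last heard from (illustrative only)."""

    def __init__(self) -> None:
        self.last_ack = time.monotonic()
        self.stale = False

    def acknowledge(self) -> None:
        # Hypothetical hook called for each keep-alive message.
        self.last_ack = time.monotonic()

    def update_health(self, allowed_elapsed_time: float = 5) -> None:
        elapsed = time.monotonic() - self.last_ack
        self.stale = elapsed > allowed_elapsed_time


health = SketchSubscriptionHealth()
health.update_health(allowed_elapsed_time=5)
fresh_is_stale = health.stale
health.last_ack -= 10  # pretend the last keep-alive arrived 10 s ago
health.update_health(allowed_elapsed_time=5)
old_is_stale = health.stale
```

A monotonic clock is used so that adjustments to the system time cannot make a healthy subscription appear stale.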
- class SubscriptionManager(ws_controller: ska_ser_skallop.connectors.remoting.tangobridge.wscontrol.WSController, polling_rate: float = 15)[source]
Bases:
ska_ser_skallop.connectors.remoting.tangobridge.base.Subscriber
Manages the health of subscriptions by listening in on events being produced from them.
- exception WrongSubscription[source]
Bases:
Exception
Exception when a wrong subscription has been returned to a waiting client.
- create_call_back_wrapper(callback: Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]]) ska_ser_skallop.connectors.remoting.tangobridge.subscribing.AbstractCallBackWrapper [source]
Create a callback wrapper based on the type of callback to be used.
- Parameters
callback (Union[base.Subscriber, int, Callable[[base.EventDataInt], None]]) – the callback to use
- Returns
The callback wrapper
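Choosing a wrapper by callback type can be sketched as a simple dispatch on the argument: an object with push_event, an integer buffer size, or a plain callable. Everything below (`SketchSubscriber`, `make_handler`) is a hypothetical stand-in. In particular, using `deque(maxlen=...)`, which drops the oldest events once full, is an assumption about what a buffer "up to the int value size" means.

```python
from collections import deque
from typing import Any, Callable, List, Union


class SketchSubscriber:
    """Stand-in for a subscriber object exposing push_event."""

    def __init__(self) -> None:
        self.received: List[Any] = []

    def push_event(self, event: Any) -> None:
        self.received.append(event)


def make_handler(callback: Union[SketchSubscriber, int, Callable[[Any], None]]):
    """Return (handler, buffer); buffer is None unless an int was given."""
    if isinstance(callback, int):
        # Assumption: once full, the oldest events are dropped.
        buffer: deque = deque(maxlen=callback)
        return buffer.append, buffer
    if hasattr(callback, "push_event"):
        return callback.push_event, None
    return callback, None


subscriber = SketchSubscriber()
called: List[Any] = []
handlers = [make_handler(subscriber), make_handler(2), make_handler(called.append)]
for event in ("e1", "e2", "e3"):
    for handler, _ in handlers:
        handler(event)
buffered = list(handlers[1][1])
```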
.tangobridge module
Implements a bridge connection between a tango gql service and a client.
- class PollingBasedTangoBridge(*args: Any, **kwargs: Any)[source]
Bases:
ska_ser_skallop.connectors.remoting.tangobridge.tangobridge.TangoBridge
Type of TangoBridge class that does not make use of a websocket.
- add_subscription(device_name: str, attribute: str, callback: Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]]) int [source]
Create a new subscription on the tangogql websocket service.
- Parameters
- Raises
NotImplementedError – _description_
- get_events(device_name: str, attribute: str, subscription_id: int) List[ska_ser_skallop.subscribing.base.EventDataInt] [source]
Get a list of current events generated on a particular subscription.
- Parameters
- Return type
List[base.EventDataInt]
- Raises
NotImplementedError – _description_
- reload_ws_connection() None [source]
Reconnect to the websocket service.
- Raises
NotImplementedError – _description_
- remove_subscription(device_name: str, attribute: str, subscription_id: int) None [source]
Remove a given subscription from the tangogql websocket service.
- Parameters
- Raises
NotImplementedError – _description_
- property tango_subscriptions_healthy: bool
Indicate whether the tango gql websocket service is still available.
- Returns
Whether the tango gql websocket service is still available.
- Return type
- tear_down_ws_connection() None [source]
Tear down all the running threads related to the ws connection.
- Raises
NotImplementedError – _description_
- wait_for_tango_subscriptions_healthy(timeout=5) None [source]
Block until the tangogql websocket is healthy.
- Parameters
timeout – How long to wait until a timeout is thrown, defaults to 5
- Raises
NotImplementedError – _description_
- class TangoBridge(*args: Any, **kwargs: Any)[source]
Bases:
object
Class that realizes a connection to a tango gql interface.
The class provides a client with a REST interface to call gql queries on as well as creating and maintaining websocket subscriptions to tango event producers.
Note this class is a singleton and gets created and initialised only once.
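One common way to get "created and initialised only once" behaviour in Python is to override `__new__` and guard `__init__`. The sketch below shows that pattern under the assumption that it resembles what the class does; the source does not say how the singleton is realised, and `SingletonBridgeSketch` is a hypothetical name.

```python
from typing import Any, Optional


class SingletonBridgeSketch:
    """Hypothetical class; only illustrates the create-once behaviour."""

    _instance: Optional["SingletonBridgeSketch"] = None

    def __new__(cls, *args: Any, **kwargs: Any) -> "SingletonBridgeSketch":
        # Create the instance on first use and return the same one afterwards.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialised = False
        return cls._instance

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # __init__ runs on every call, so guard against re-initialising.
        if self._initialised:
            return
        self.init_calls = 1
        self._initialised = True


first = SingletonBridgeSketch("ignored-argument")
second = SingletonBridgeSketch()
```

Both names refer to the same object, so state set up by the first call is what later callers see.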
- add_subscription(device_name: str, attribute: str, callback: Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]]) int [source]
Create a new subscription on the tangogql websocket service.
- call_graphql(query: str, variables: Optional[Dict[Any, Any]] = None, operation_name: Optional[str] = None, **kwargs) Any [source]
Call a gql structured rest query on a tango gql service.
- Parameters
- Returns
The result of the query
- Return type
Any
- get_events(device_name: str, attribute: str, subscription_id: int) List[ska_ser_skallop.subscribing.base.EventDataInt] [source]
Get a list of current events generated on a particular subscription.
- Parameters
- Returns
A list of events generated from the subscription
- Return type
List[base.EventDataInt]
- monitor_polling_period = 0.2
- remove_subscription(device_name: str, attribute: str, subscription_id: int)[source]
Remove a given subscription from the tangogql websocket service.
- settings = Settings(service_name='taranta', tangogql='graphiql')
- property tango_gql_healthy: bool
Indicate whether the tango gql rest service is still available.
- Returns
Whether the tango gql rest service is still available
- Return type
- property tango_subscriptions_healthy: bool
Indicate whether the tango gql websocket service is still available.
- Returns
Whether the tango gql websocket service is still available.
- Return type
- tear_down_rest_connection()[source]
Tear down all the running threads related to the rest connection.
- property url: str
Return the url used for connecting to a tangogql service.
- Returns
The url used for connecting to a tangogql service
- Return type
- get_tango_bridge(factory: ska_ser_skallop.connectors.remoting.tangobridge.factories.TBridgeFactory = <ska_ser_skallop.connectors.remoting.tangobridge.factories.TBridgeFactory object>)[source]
Generate a tango bridge object.
Note: if the environment variable USE_ONLY_POLLING is set, a TangoBridge that does not use a websocket will be created.
- Parameters
factory – a TBridgeFactory object
- Returns
a TangoBridge object
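The environment-based selection might look roughly like the following. The classes `BridgeSketch` and `PollingBridgeSketch` and the function `get_bridge_sketch` are hypothetical stand-ins, and treating any value of USE_ONLY_POLLING as "set" is an assumption.

```python
import os
from typing import Mapping


class BridgeSketch:
    """Hypothetical stand-in for a websocket-capable bridge."""


class PollingBridgeSketch(BridgeSketch):
    """Hypothetical stand-in for a bridge that only polls."""


def get_bridge_sketch(environ: Mapping[str, str] = os.environ) -> BridgeSketch:
    # Assumption: the mere presence of the variable selects the polling bridge.
    if "USE_ONLY_POLLING" in environ:
        return PollingBridgeSketch()
    return BridgeSketch()


default_bridge = get_bridge_sketch({})
polling_bridge = get_bridge_sketch({"USE_ONLY_POLLING": "1"})
```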
.wscontrol module
Facilitates a tangogql websocket connection to a client.
- class BufferedSubscriber[source]
Bases:
ska_ser_skallop.connectors.remoting.tangobridge.base.Subscriber
Subscriber that places received events in a buffer for later retrieval.
Getting results from the buffer happens asynchronously.
- class MessageContext(outbox: asyncio.queues.Queue, inbox: asyncio.queues.Queue, subscribers: List[ska_ser_skallop.connectors.remoting.tangobridge.base.Subscriber] = [])[source]
Bases:
NamedTuple
Bundles a message inbox, outbox and subscribers list into a single object.
- inbox: asyncio.queues.Queue
Alias for field number 1
- outbox: asyncio.queues.Queue
Alias for field number 0
- subscribers: List[ska_ser_skallop.connectors.remoting.tangobridge.base.Subscriber]
Alias for field number 2
- class Selector(predicate: Callable[[Any], bool], name='')[source]
Bases:
ska_ser_skallop.connectors.remoting.tangobridge.base.Subscriber
Subscriber that funnels/filters incoming events to downstream subscribers.
- bind(loop)[source]
Bind this object to an asyncio loop for asynchronous waiting.
- Parameters
loop ([type]) – the loop belonging to an asynchronous thread
- async listen(stop: threading.Event)[source]
Listen asynchronously for events produced and push if selected.
Will stop listening when stop event is set.
- Parameters
stop (Event) – The stop event, which signals the task to stop listening.
- push_event(event: Any)[source]
Receive and handle the subscriber push event.
- Parameters
event (Any) – An event that the producer is required to push to subscribers
- subscribe(subscriber: ska_ser_skallop.connectors.remoting.tangobridge.base.Subscriber)[source]
Let subscribers subscribe to this object as a producer of selected events.
- Parameters
subscriber (Subscriber) – the subscriber to be called when event is selected
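The filtering role of a selector can be illustrated with a simplified, synchronous sketch. It leaves out the asyncio `bind`/`listen` machinery entirely, and `SelectorSketch`, `CollectorSketch`, and the dictionary-shaped events are hypothetical.

```python
from typing import Any, Callable, List


class CollectorSketch:
    """Hypothetical downstream subscriber that records what it receives."""

    def __init__(self) -> None:
        self.events: List[Any] = []

    def push_event(self, event: Any) -> None:
        self.events.append(event)


class SelectorSketch:
    """Forwards an event to its subscribers only when the predicate accepts it."""

    def __init__(self, predicate: Callable[[Any], bool], name: str = "") -> None:
        self._predicate = predicate
        self._subscribers: List[CollectorSketch] = []
        self.name = name

    def subscribe(self, subscriber: CollectorSketch) -> None:
        self._subscribers.append(subscriber)

    def push_event(self, event: Any) -> None:
        # Events rejected by the predicate are silently ignored.
        if self._predicate(event):
            for subscriber in self._subscribers:
                subscriber.push_event(event)


collector = CollectorSketch()
selector = SelectorSketch(lambda event: event.get("type") == "data", name="data-only")
selector.subscribe(collector)
selector.push_event({"type": "ka"})
selector.push_event({"type": "data", "value": 1})
```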
- class WSController(factory: ska_ser_skallop.connectors.remoting.tangobridge.factories.TBridgeFactory = <ska_ser_skallop.connectors.remoting.tangobridge.factories.TBridgeFactory object>)[source]
Bases:
ska_ser_skallop.connectors.remoting.tangobridge.base.MessagePusher
Monitors and controls a websocket connection.
- add_selector(selector: ska_ser_skallop.connectors.remoting.tangobridge.wscontrol.Selector)[source]
Add a selector (filter) to listen for incoming subscribed events.
The selector will push events to downstream subscribers when certain kinds of events (as defined by the selector’s predicate function) have been received from the websocket.
- Parameters
selector (Selector) – the selector to add
- add_subscription(device: str, attribute: str) int [source]
Add a new subscription to the websocket based on events from a device attribute.
Note the websocket will only produce a new subscription if there does not already exist a subscription for the same device and attribute, otherwise it will just “piggyback” on an existing subscription.
- Parameters
- Returns
The subscription id to use when a subscription needs to be removed (remove_subscription()).
- listen_to_websocket_health(subscriber: ska_ser_skallop.connectors.remoting.tangobridge.base.WSHealthSubscriber)[source]
Add a ws health subscriber that will receive ws health change events.
- Parameters
subscriber – The object to be called when an event occurs
- monitor_polling_period = 5
- push_message(item: Any)[source]
Push a new message to be sent by the websocket being controlled.
- Parameters
item (Any) – The message to be sent
- async push_message_routine(item: Any)[source]
Send an asynchronous message on the websocket (must be called from the asyncio thread).
- Parameters
item – The item to send
- Raises
TimeoutError – when a message could not be sent because a faulty websocket remained faulty for longer than 10 seconds
- remove_subscription(device: str, attribute: str) Union[None, int] [source]
Remove a subscription as identified by the given id.
Note a subscription will only be removed virtually if other subscriptions still exist to the same device and attribute. If no subscriptions to the same device and attribute remain, then the actual subscription will be removed.
- Parameters
- Returns
None if subscriptions still remain to the given device and attribute; otherwise the “base” subscription id on which the subscriptions have been “piggybacked”.
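The “piggyback” behaviour of add_subscription and remove_subscription amounts to reference counting per device and attribute. A minimal sketch follows; `PiggybackRegistrySketch` is a hypothetical name, and returning the shared base id from `add_subscription` is an assumption made for the example.

```python
from typing import Dict, Optional, Tuple

Key = Tuple[str, str]


class PiggybackRegistrySketch:
    """Hypothetical registry counting subscriptions per (device, attribute)."""

    def __init__(self) -> None:
        self._base_ids: Dict[Key, int] = {}
        self._counts: Dict[Key, int] = {}
        self._next_id = 0

    def add_subscription(self, device: str, attribute: str) -> int:
        key = (device, attribute)
        if key not in self._base_ids:
            # First subscriber: this is where a real subscription would be made.
            self._base_ids[key] = self._next_id
            self._next_id += 1
            self._counts[key] = 0
        self._counts[key] += 1
        return self._base_ids[key]

    def remove_subscription(self, device: str, attribute: str) -> Optional[int]:
        key = (device, attribute)
        self._counts[key] -= 1
        if self._counts[key] > 0:
            # Others still piggyback on the base subscription: virtual removal.
            return None
        del self._counts[key]
        return self._base_ids.pop(key)


registry = PiggybackRegistrySketch()
first_id = registry.add_subscription("sys/tg_test/1", "double_scalar")
second_id = registry.add_subscription("sys/tg_test/1", "double_scalar")
virtual_removal = registry.remove_subscription("sys/tg_test/1", "double_scalar")
actual_removal = registry.remove_subscription("sys/tg_test/1", "double_scalar")
```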
- tear_down_ws()[source]
Tear down monitoring threads related to the websocket and close the ws connection.
- class WSHealthSelector[source]
Bases:
ska_ser_skallop.connectors.remoting.tangobridge.wscontrol.Selector
Specific Selector that looks at only WS health type of events.
- class Websocket(factory: ska_ser_skallop.connectors.remoting.tangobridge.factories.TBridgeFactory)[source]
Bases:
object
Manages a websocket connection and wraps the websocket api provided by a factory.
- async block_until_healthy(timeout=60)[source]
Asynchronously wait until a websocket is healthy.
- Parameters
timeout – how long to block before giving up
- Raises
TimeoutError – When unhealthy longer than given timeout
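Waiting on a health flag with a timeout can be sketched with `asyncio.wait_for`. The function name `block_until_healthy_sketch` and the use of an `asyncio.Event` as the health flag are assumptions for illustration; the source does not describe the internal mechanism.

```python
import asyncio


async def block_until_healthy_sketch(healthy: asyncio.Event, timeout: float = 60) -> None:
    try:
        await asyncio.wait_for(healthy.wait(), timeout)
    except asyncio.TimeoutError:
        # Re-raise as the builtin so callers see the same type on all versions.
        raise TimeoutError(f"websocket unhealthy for longer than {timeout} s") from None


async def demo() -> tuple:
    healthy = asyncio.Event()
    try:
        await block_until_healthy_sketch(healthy, timeout=0.01)
        timed_out = False
    except TimeoutError:
        timed_out = True
    healthy.set()  # the websocket recovers
    await block_until_healthy_sketch(healthy, timeout=0.01)
    return timed_out, healthy.is_set()


timed_out, recovered = asyncio.run(demo())
```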
- async connect()[source]
Asynchronously connects to a remote websocket service provider.
If the connection is not available the task will retry to connect every second until either a connection is successful or the websocket is closed (close()).
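The retry-until-connected-or-closed loop can be sketched as below. `connect_with_retry`, `flaky_open`, and the choice of `OSError` as the retryable failure are hypothetical; the retry period is shortened so the example finishes quickly.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


async def connect_with_retry(
    open_connection: Callable[[], Awaitable[Any]],
    closed: asyncio.Event,
    retry_period: float = 1.0,
) -> Optional[Any]:
    # Keep trying until a connection succeeds or the websocket is closed.
    while not closed.is_set():
        try:
            return await open_connection()
        except OSError:
            await asyncio.sleep(retry_period)
    return None


attempts: List[int] = []


async def flaky_open() -> str:
    # Hypothetical opener that fails twice before succeeding.
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionRefusedError("service not up yet")
    return "connected"


async def main() -> Optional[Any]:
    closed = asyncio.Event()
    return await connect_with_retry(flaky_open, closed, retry_period=0.01)


result = asyncio.run(main())
```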
- get_health_selector() ska_ser_skallop.connectors.remoting.tangobridge.wscontrol.Selector [source]
Create a health selector that will generate events related to websocket health.
- Returns
the created selector object
- get_messages() AsyncGenerator[Any, None] [source]
Get incoming websocket messages asynchronously as json decoded objects.
For example:
async for message in ws.get_messages():
    handle_message(message)
- Yield
The json decoded message from the websocket
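An asynchronous generator that yields JSON-decoded messages might be structured as follows. `decoded_messages` and `fake_frames` are hypothetical, as are the message shapes; the real method reads from the websocket rather than from an in-memory source.

```python
import asyncio
import json
from typing import Any, AsyncGenerator, AsyncIterable, List


async def decoded_messages(raw_frames: AsyncIterable[str]) -> AsyncGenerator[Any, None]:
    # Decode each incoming text frame and hand it to the consumer.
    async for frame in raw_frames:
        yield json.loads(frame)


async def fake_frames() -> AsyncGenerator[str, None]:
    # Stand-in for frames arriving over a websocket.
    for frame in ('{"type": "ka"}', '{"type": "data", "value": 3}'):
        yield frame


async def collect() -> List[Any]:
    return [message async for message in decoded_messages(fake_frames())]


messages = asyncio.run(collect())
```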
- healthy() bool [source]
Report the current observed health of websocket.
- Returns
Returns True if health is ok
- async monitor_ws(timeout=1)[source]
Asynchronously ping the web socket continuously until ws is closed (close()).
Note this method assumes a health selector has already been defined as per get_health_selector().
- Parameters
timeout (int, optional) – How long to wait for a ping result to be received before deeming it as faulty, defaults to 1.
- async ping(timeout=1)[source]
Asynchronously ping the web socket.
- Parameters
timeout (int, optional) – How long to wait for a ping result to be received before deeming it as faulty, defaults to 1.
- async send(message: Union[str, bytes, Iterable[Any], AsyncIterable[Any]])[source]
Send an asynchronous message over the websocket.
- wait_until_healthy(timeout=1)[source]
Wait for a given period until a thread has observed the health of ws to be ok.
- Parameters
timeout (int, optional) – The time to wait, defaults to 1
- Raises
TimeoutError – when the wait for a websocket exceeds the given timeout
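For the blocking, thread-side variant, a `threading.Event` is one plausible way to wait for another thread to report health. This sketch assumes that mechanism; `wait_until_healthy_sketch` is a hypothetical name and a timer stands in for the monitoring thread.

```python
import threading


def wait_until_healthy_sketch(healthy: threading.Event, timeout: float = 1) -> None:
    # Event.wait returns False when the timeout elapses before the event is set.
    if not healthy.wait(timeout):
        raise TimeoutError(f"websocket not healthy within {timeout} s")


healthy = threading.Event()
try:
    wait_until_healthy_sketch(healthy, timeout=0.01)
    timed_out = False
except TimeoutError:
    timed_out = True

# Another thread (here a timer) later reports the websocket as healthy.
threading.Timer(0.05, healthy.set).start()
wait_until_healthy_sketch(healthy, timeout=2)
became_healthy = healthy.is_set()
```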