Source code for ska_ser_log_transactions.transactions

# -*- coding: utf-8 -*-

"""This module provides the transaction logging mechanism."""
import json
import logging
import os
from random import randint
from typing import Mapping, Optional, Text

from ska_ser_skuid.client import SkuidClient


[docs]class TransactionBase: """Transaction context handler. Provides: | - Transaction ID:: | * Re-use existing transaction ID, if available | * If no transaction ID, or empty or None, then generate a new ID | * context handler returns the transaction ID used | - Log messages on entry, exit, and exception .. code-block:: python def command(self, parameter_json): parameters = json.reads(parameter_json) with transaction('My Command', parameters) as transaction_id: # ... parameters['transaction_id'] = transaction_id device.further_command(json.dumps(pars)) # ... def command(self, parameter_json): parameters = json.reads(parameter_json) with transaction('My Command', parameters, transaction_id="123") as transaction_id: # ... parameters['transaction_id'] = transaction_id device.further_command(json.dumps(pars)) # ... def command(self, parameter_json): parameters = json.reads(parameter_json) parameters["txn_id_key"] = 123 with transaction('My Command', parameters, transaction_id_key="txn_id_key") as transaction_id: # ... parameters['transaction_id'] = transaction_id device.further_command(json.dumps(pars)) # ... | Log message formats: | On Entry: | Transaction[id]: Enter[name] with parameters [arguments] marker[marker] | On Exit: | Transaction[id]: Exit[name] marker[marker] | On exception: | Transaction[id]: Exception[name] marker[marker] | Stacktrace """
[docs] def __init__( self, name: str, params: dict = {}, transaction_id: str = "", transaction_id_key: str = "transaction_id", logger: Optional[logging.Logger] = None, # pylint: disable=E1101 ): """Create the transaction context handler. A new transaction ID is generated if none is passed in via `transaction_id` or in `params`. If there is a transaction ID in `params` and `transaction_id` is also passed in then the passed in `transaction_id` will take precedence. However, if both transaction IDs provided in `params` and `transaction_id` are deemed invalid (not a string or an empty string), then a new transaction ID will be generated. By default the key `transaction_id` will be used to get a transaction ID out of `params`. If a different key is required then `transaction_id_key` can be specified. :param name: A description for the context. This is usually the Tango device command. :type name: str :param params: The parameters will be logged and will be used to retrieve the transaction ID if `transaction_id` is not passed in, defaults to {} :type params: dict :param transaction_id: The transaction ID to be used for the context, defaults to "" :type transaction_id: str :param transaction_id_key: The key to use to get the transaction ID from params, defaults to "transaction_id" :type transaction_id_key: str :param logger: The logger to use for logging, by default None. If no logger is specified a new one named `ska.transaction` will be used. :type logger: Optional[logging.Logger], optional :raises TransactionParamsError: If the `params` passed is not valid. """ if not isinstance(params, Mapping): raise TransactionParamsError("params must be dict-like (Mapping)") if logger: self.logger = logger else: self.logger = logging.getLogger("ska.transaction") # pylint: disable=E1101 self._name = name self._params = params self._transaction_id_key = transaction_id_key self._transaction_id = self._get_id_from_params_or_generate_new_id(transaction_id) # Used to match enter and exit when multiple devices calls the same command # on a shared device simultaneously self._random_marker = str(randint(0, 99999)).zfill(5) if transaction_id and params.get(self._transaction_id_key): self.logger.info( f"Received 2 transaction IDs {transaction_id} and" f" {params.get(transaction_id_key)}, using {self._transaction_id}" )
[docs] def log_entry(self): """Log the entry message""" params_json = json.dumps(self._params) self.logger.info( f"Transaction[{self._transaction_id}]: Enter[{self._name}] " f"with parameters [{params_json}] " f"marker[{self._random_marker}]" )
[docs] def log_exit(self, exc_type): """Log the exit message and exception if it occurs :param exc_type: Exception type :type exc_type: exception_type """ if exc_type: self.logger.exception( f"Transaction[{self._transaction_id}]: Exception[{self._name}] " f"marker[{self._random_marker}]" ) self.logger.info(f"Transaction[{self._transaction_id}]: Exit[{self._name}] " f"marker[{self._random_marker}]") if exc_type: raise # pylint: disable=E0704
def _get_id_from_params_or_generate_new_id(self, transaction_id): """At first use the transaction_id passed or use the transaction_id_key to get the transaction ID from the parameters or generate a new one if it's not there. :param transaction_id: The transaction ID :type transaction_id: String :return: A transaction ID :rtype: String """ _transaction_id = transaction_id if transaction_id else self._params.get(self._transaction_id_key) if not self._is_valid_id(_transaction_id): _transaction_id = self._generate_new_id() self.logger.info(f"Generated transaction ID {_transaction_id}") return _transaction_id def _is_valid_id(self, transaction_id): """Check if the ID is valid :param transaction_id: The transaction ID :type transaction_id: String :return: Whether the ID is valid or not :rtype: boolean """ if isinstance(transaction_id, Text) and transaction_id.strip(): return True return False def _generate_new_id(self): """Use TransactionIdGenerator to generate a new transaction ID :return: The transaction ID :rtype: String """ id_generator = TransactionIdGenerator() return id_generator.next() # pylint: disable=E1102
[docs]class Transaction(TransactionBase): def __enter__(self): """Context handler entry :return: The transaction ID :rtype: String """ self.log_entry() return self._transaction_id def __exit__(self, exc_type, exc_val, exc_tb): """Context handler exit :param exc_type: The exception type :type exc_type: exception_type :param exc_val: The exception value :type exc_val: exception_value :param exc_tb: The exception traceback :type exc_tb: exception_traceback """ self.log_exit(exc_type)
[docs]class AsyncTransaction(TransactionBase): async def __aenter__(self): """Asynchronous context handler entry :return: The transaction ID :rtype: String """ self.log_entry() return self._transaction_id async def __aexit__(self, exc_type, exc_val, exc_tb): """Asynchronous context handler exit :param exc_type: The exception type :type exc_type: exception_type :param exc_val: The exception value :type exc_val: exception_value :param exc_tb: The exception traceback :type exc_tb: exception_traceback """ self.log_exit(exc_type)
[docs]class TransactionIdGenerator: """ TransactionIdGenerator retrieves a transaction id from skuid. Skuid may fetch the id from the service if the SKUID_URL is set or alternatively generate one. """
[docs] def __init__(self): if os.environ.get("SKUID_URL"): client = SkuidClient(os.environ["SKUID_URL"]) self._get_id = client.fetch_transaction_id else: self._get_id = SkuidClient.get_local_transaction_id
def next(self): return self._get_id()
[docs]class TransactionParamsError(TypeError): """Invalid data type for transaction parameters."""