Source code for ska_sdp_config.config

"""High-level API for SKA SDP configuration."""

import functools
import logging
import os
import sys
import threading
import warnings
from datetime import date
from socket import gethostname
from typing import Callable, Iterable, Optional, Union

from . import backend as backend_mod
from . import entity
from .backend import Backend, DbTransaction, Lease, TxnWrapper, Watcher
from .base_transaction import BaseTransaction
from .operations import (
    ArbitraryOperations,
    ComponentOperations,
    ControllerOperations,
    DeploymentOperations,
    EntityOperations,
    ExecutionBlockOperations,
    FlowOperations,
    ProcessingBlockOperations,
    ScriptOperations,
    SubarrayOperations,
)

LOG = logging.getLogger(__name__)


[docs] class Config: """Connection to SKA SDP configuration.""" # pylint: disable=too-many-instance-attributes, too-many-arguments def __init__( self, backend=None, global_prefix: str = "", owner: dict | entity.Owner | None = None, component_name: str | None = None, wrapper: Optional[TxnWrapper] = None, owned_entity: tuple[str, str] | None = None, **cargs, ): """ Connect to configuration using the given backend. :param backend: Backend to use. Defaults to environment or etcd3 if not set. :param global_prefix: Prefix to use within the database :param owner: Object used for identifying the process when claiming ownership. :param component_name: name of component; used to generate alive key in db (e.g. lmc-controller). *DEPRECATED*: use ``owned_entity=("component", component_name)`` instead. :param owned_entity: two-tuple with the type (Transaction attribute name) and full key of the entity for which convenience ownership management is provided. :param cargs: Backend client arguments """ self._backend = self._determine_backend(backend, **cargs) # Owner dictionary if isinstance(owner, entity.Owner): self.owner = owner else: if owner is None: owner = { "pid": os.getpid(), "hostname": gethostname(), "command": sys.argv, } self.owner = entity.Owner(**owner) # Global prefix assert global_prefix == "" or global_prefix[0] == "/" self._global_prefix = global_prefix # Lease associated with this client, kept alive on a separate thread # until the client is closed self._client_lease = None self._keepalive_thread: threading.Thread | None = None self._close_evt = threading.Event() # Identity of the entity receiving convenient ownership management if component_name and not owned_entity: warnings.warn( "component_name is deprecated, use owned_entity", DeprecationWarning, ) owned_entity = ("component", component_name) self._owned_entity = owned_entity # Transaction wrapper. self.wrapper: TxnWrapper = wrapper @property def backend(self) -> Backend: """Get the backend database object.""" return self._backend @staticmethod # pylint: disable=too-many-branches # Temp - Above pylint can be removed when we switch to one backend def _determine_backend(backend: str, **cargs) -> Backend: # Determine backend if not backend: backend = os.getenv("SDP_CONFIG_BACKEND", "etcd3") # Instantiate backend, reading configuration from environment/dotenv if backend == "etcd3": if "host" not in cargs: cargs["host"] = os.getenv("SDP_CONFIG_HOST", "127.0.0.1") if "port" not in cargs: cargs["port"] = int(os.getenv("SDP_CONFIG_PORT", "2379")) if "user" not in cargs: cargs["user"] = os.getenv("SDP_CONFIG_USERNAME", None) if "password" not in cargs: cargs["password"] = os.getenv("SDP_CONFIG_PASSWORD", None) return backend_mod.Etcd3Backend(**cargs) if backend == "etcd3revolution1": if "host" not in cargs: cargs["host"] = os.getenv("SDP_CONFIG_HOST", "127.0.0.1") if "port" not in cargs: cargs["port"] = int(os.getenv("SDP_CONFIG_PORT", "2379")) if "protocol" not in cargs: cargs["protocol"] = os.getenv("SDP_CONFIG_PROTOCOL", "http") if "cert" not in cargs: cargs["cert"] = os.getenv("SDP_CONFIG_CERT", None) if "username" not in cargs: cargs["username"] = os.getenv("SDP_CONFIG_USERNAME", None) if "password" not in cargs: cargs["password"] = os.getenv("SDP_CONFIG_PASSWORD", None) return backend_mod.Etcd3BackendRevolution1(**cargs) if backend == "memory": return backend_mod.MemoryBackend() raise ValueError(f"Unknown configuration backend {backend}!")
[docs] def lease(self, ttl=10) -> Lease: """ Generate a new lease. Once entered can be associated with keys, which will be kept alive until the end of the lease. At that point a daemon thread will be started automatically to refresh the lease periodically (default seems to be TTL/4). :param ttl: Time to live for lease :returns: lease object """ return self._backend.lease(ttl)
@property def client_lease(self) -> Lease: """Return the lease associated with the client. It will be kept alive until the client gets closed. """ return self._ensure_client_lease() def _ensure_client_lease(self) -> Lease: # This always returns the same lease, even if it's dead. # Which means the dead lease is a fatal error. # Lease death can be checked by calling lease.alive(). if self._client_lease is None: self._client_lease = self.lease() # pylint: disable=unnecessary-dunder-call if hasattr(self._client_lease, "__enter__"): # etcd3-py lease has an __enter__ - so this logic works! self._client_lease.__enter__() else: def keepalive(): lease = self._client_lease while True: if lease.remaining_ttl < 0: break if self._close_evt.wait(lease.remaining_ttl / 2): LOG.debug("Revoking lease") lease.revoke() self._client_lease = None return lease.refresh() LOG.fatal( "keepalive failed to maintain lease - aborting thread!" ) self._keepalive_thread = threading.Thread( target=keepalive, name="lease_keepalive" ) self._keepalive_thread.start() LOG.debug("keepalive started!") return self._client_lease
[docs] def revoke_lease(self) -> None: """ Revokes the lease internally held by this client, if any. Shouldn't normally be called by users, but is useful for tests. """ self._close_evt.set() if self._keepalive_thread: self._keepalive_thread.join() self._keepalive_thread = None self._close_evt.clear()
def _wrap_txn( self, txn: DbTransaction, wrap: bool = True ) -> Union["Transaction", TxnWrapper]: """ Utility function to wrap a low-level transaction. Wraps the low-level transaction in the Transaction class. If a custom wrapper is provided and further wrapping has been requested, it is used to wrap the transaction afterwards. :param txn: low-level transaction :param wrap: whether to use the custom wrapper, if present. :returns: wrapped transaction """ transaction = Transaction( self.owner, self._owned_entity, self._ensure_client_lease, txn, self._global_prefix, ) if self.wrapper is not None and wrap: transaction = self.wrapper(transaction) return transaction def _txn( self, max_retries: int = 64, wrap: bool = True ) -> "Transaction" | TxnWrapper: """ Utility function to allow creating Transaction objects that are not wrapped with the custom wrapper provided by the user, and thus can be used internally by this class. """ for txn in self._backend.txn(max_retries=max_retries): yield self._wrap_txn(txn, wrap)
[docs] def txn( self, max_retries: int = 64 ) -> Iterable[Union["Transaction", TxnWrapper]]: """Create a :class:`Transaction` for atomic configuration query/change. As we do not use locks, transactions might have to be repeated in order to guarantee atomicity. Suggested usage is as follows: .. code-block:: python for txn in config.txn(): # Use txn to read+write configuration # [Possibly call txn.loop()] As the `for` loop suggests, the code might get run multiple times even if not forced by calling :meth:`Transaction.loop`. Any writes using the transaction will be discarded if the transaction fails, but the application must make sure that the loop body has no other observable side effects. See also :ref:`Usage Guide <usage-guide>` for best practices for using transactions. :param max_retries: Number of transaction retries before a :class:`RuntimeError` gets raised. """ yield from self._txn(max_retries, True)
[docs] def watcher(self, timeout: Optional[float] = None) -> Iterable[Watcher]: """ Create a new watcher. Useful for waiting for changes in the configuration. Calling :py:meth:`Etcd3Watcher.txn()` on the returned watchers will create :py:class:`Transaction` objects just like :py:meth:`txn()`. See also :ref:`Usage Guide <usage-guide>` for best practices for using watchers. :param timeout: Timeout for waiting. Watcher will loop after this time. """ yield from self._backend.watcher(timeout, self._wrap_txn)
[docs] def set_alive(self) -> None: """ Set the keep-alive key. """ if self._owned_entity is None: raise TypeError( "No entity provided at creation time, cannot set it alive" ) for txn in self._txn(wrap=False): txn.self.take_ownership_if_not_alive()
[docs] def is_alive(self) -> bool: """ Is the connection alive in the sense that the keep-alive key exists? :returns: whether it is """ if self._owned_entity is None: raise TypeError( "No entity provided at creation time, cannot check if alive" ) alive = False for txn in self._txn(wrap=False): alive = txn.self.is_alive() return alive
[docs] def close(self) -> None: """Close the client connection.""" self.revoke_lease() self._backend.close()
# Can declare Self as return type from Python 3.11. def __enter__(self): """Scope the client connection.""" return self def __exit__(self, exc_type, exc_val, exc_tb) -> bool: """Scope the client connection.""" self.close() return False
[docs] class SdpConfigDeprecationWarning(DeprecationWarning): """Used to mark an sdp-config-db method deprecated"""
[docs] def deprecated(subobject_name): """ Marks a method as deprecated in favour of using the Transaction sub-objects """ def _inner_deprecated(method): @functools.wraps(method) def with_deprecation_message(*args, **kwargs): warnings.warn( f"{method.__name__} is deprecated, use the corresponding " f"method in Transaction.{subobject_name} instead", SdpConfigDeprecationWarning, ) return method(*args, **kwargs) return with_deprecation_message return _inner_deprecated
# pylint: disable-next=too-many-instance-attributes,too-many-public-methods
[docs] class Transaction: """High-level configuration queries and updates to execute atomically.""" # pylint: disable-next=too-many-arguments def __init__( self, owner: entity.Owner, owned_entity: tuple[str, str] | None, lease_getter: Callable[[], Lease], txn: DbTransaction, global_prefix: str, ): """Instantiate transaction.""" self._raw_txn = txn base_txn = BaseTransaction(owner, lease_getter, txn, global_prefix) self.component: ComponentOperations = ComponentOperations(base_txn) """Operations over SDP components""" self.controller: ControllerOperations = ControllerOperations(base_txn) """Operations over the LMC Controller""" self.deployment: DeploymentOperations = DeploymentOperations(base_txn) """Operations over Deployments""" self.execution_block: ExecutionBlockOperations = ( ExecutionBlockOperations(base_txn) ) """Operations over Execution Blocks""" self.processing_block: ProcessingBlockOperations = ( ProcessingBlockOperations(base_txn) ) """Operations over Processing Blocks""" self.script: ScriptOperations = ScriptOperations(base_txn) """Operations over Scripts""" self.subarray: SubarrayOperations = SubarrayOperations(base_txn) """Operations over Subarrays""" self.flow: FlowOperations = FlowOperations(base_txn) """Operations over Flows""" known_roots = { ComponentOperations.PREFIX, ControllerOperations.PATH, DeploymentOperations.PREFIX, ExecutionBlockOperations.PREFIX, ProcessingBlockOperations.PREFIX, ScriptOperations.PREFIX, SubarrayOperations.PREFIX, FlowOperations.PREFIX, } self.arbitrary: ArbitraryOperations = ArbitraryOperations( base_txn, known_roots ) """Operations over arbitrary paths""" # Special case for entity of interest self._self: EntityOperations | None = None if owned_entity: attr_name, key = owned_entity self._self = getattr(self, attr_name).index_by_key_parts(key) @property def raw(self) -> DbTransaction: """Return transaction object for accessing database directly.""" return self._raw_txn @property def self(self) -> EntityOperations | None: """Fast access to entity identified when creating the parent Config""" return self._self
[docs] def loop( self, wait: bool = False, timeout: Optional[float] = None ) -> None: """Repeat transaction regardless of whether commit succeeds. :param wait: If transaction succeeded, wait for any read values to change before repeating it. :param timeout: Maximum time to wait, in seconds """ warnings.warn( "Usage of txn.loop() is deprecated, " "only the old etcd3_revolution backend supports this still. " "Use a watcher instead", DeprecationWarning, ) return self._raw_txn.loop(wait, timeout)
[docs] @deprecated("component") def create_is_alive(self, key: str) -> str: """ Create an "is_alive" entry. :param key: "is alive" key in database e.g. "lmc-controller/owner" :returns: the full path of the entry """ return self.component.arbitrary_create_is_alive(key)
[docs] @deprecated("component") def is_alive(self, key: str) -> bool: """ Check if the "is alive" key still exists. :param key: "is alive" key in database e.g. "lmc-controller/owner" :returns: True if it does """ return self.component.arbitrary_check_is_alive(key)
[docs] @deprecated("processing_block") def list_processing_blocks(self, prefix: str = "") -> list[str]: """Query processing block IDs from the configuration. :param prefix: If given, only search for processing block IDs with the given prefix :returns: Processing block ids, in lexicographical order """ return self.processing_block.list_keys(key_prefix=prefix)
@staticmethod def _new_block_id( generator: str, prefix: str, description: str, list_blocks: Callable[[str], list[str]], ) -> str: # Find existing blocks with same prefix id_prefix = f"{prefix}-{generator}-{date.today():%Y%m%d}" existing_ids = list_blocks(id_prefix) # Choose ID that doesn't exist block_id = index = None max_blocks = 100000 for index in range(max_blocks): block_id = f"{id_prefix}-{index:05}" if block_id not in existing_ids: break if index >= max_blocks: raise RuntimeError( f"Exceeded daily number of {description} blocks!" ) return block_id
[docs] def new_processing_block_id(self, generator: str) -> str: """Generate a new processing block ID that is not yet in use. :param generator: Name of the generator :returns: Processing block ID """ return self._new_block_id( generator, "pb", "processing", self.list_processing_blocks )
[docs] @deprecated("processing_block") def get_processing_block( self, pb_id: str ) -> Optional[entity.ProcessingBlock]: """ Look up processing block data. :param pb_id: Processing block ID to look up :returns: Processing block entity, or None if it doesn't exist """ return self.processing_block.get(pb_id)
[docs] @deprecated("processing_block") def create_processing_block(self, pblock: entity.ProcessingBlock) -> None: """ Add a new :class:`ProcessingBlock` to the configuration. :param pblock: Processing block to create """ assert isinstance(pblock, entity.ProcessingBlock) self.processing_block.create(pblock)
[docs] @deprecated("processing_block") def update_processing_block(self, pblock: entity.ProcessingBlock) -> None: """ Update a :class:`ProcessingBlock` in the configuration. :param pblock: Processing block to update """ assert isinstance(pblock, entity.ProcessingBlock) self.processing_block.update(pblock)
[docs] @deprecated("processing_block") def get_processing_block_owner(self, pb_id: str) -> Optional[dict]: """ Look up the current processing block owner. :param pb_id: Processing block ID to look up :returns: Processing block owner data, or None if not claimed """ return self.processing_block.ownership(pb_id).get()
[docs] @deprecated("processing_block") def is_processing_block_owner(self, pb_id: str) -> bool: """ Check whether this client is owner of the processing block. :param pb_id: Processing block ID to look up :returns: Whether processing block exists and is claimed """ return self.processing_block.ownership( pb_id ).is_owned_by_this_process()
[docs] @deprecated("processing_block") def take_processing_block(self, pb_id: str) -> None: """ Take ownership of the processing block. :param pb_id: Processing block ID to take ownership of :raises: backend.ConfigCollision """ self.processing_block.ownership(pb_id).take()
[docs] @deprecated("processing_block") def get_processing_block_state(self, pb_id: str) -> Optional[dict]: """ Get the current processing block state. :param pb_id: Processing block ID :returns: Processing block state, or None if not present """ return self.processing_block.state(pb_id).get()
[docs] @deprecated("processing_block") def create_processing_block_state(self, pb_id: str, state: dict) -> None: """ Create processing block state. :param pb_id: Processing block ID :param state: Processing block state to create """ self.processing_block.state(pb_id).create(state)
[docs] @deprecated("processing_block") def update_processing_block_state(self, pb_id: str, state: dict) -> None: """ Update processing block state. :param pb_id: Processing block ID :param state: Processing block state to update """ self.processing_block.state(pb_id).update(state)
[docs] @deprecated("processing_block") def delete_processing_block( self, pb_id: str, recurse: bool = True ) -> None: """ Delete a processing block (pb) :param pb_id: Processing block ID :param recurse: if True, run recursive query and delete all includes deleting /state and /owner of pb if exists """ self.processing_block.delete(pb_id, recurse=recurse)
[docs] @deprecated("deployments") def get_deployment(self, deploy_id: str) -> Optional[entity.Deployment]: """ Retrieve details about a cluster configuration change. :param deploy_id: Name of the deployment :returns: Deployment details """ return self.deployment.get(deploy_id)
[docs] @deprecated("deployment") def list_deployments(self, prefix: str = "") -> list[str]: """ List all current deployments. :returns: Deployment IDs """ return self.deployment.list_keys(key_prefix=prefix)
[docs] @deprecated("deployment") def create_deployment(self, dpl: entity.Deployment) -> None: """ Request a change to cluster configuration. :param dpl: Deployment to add to database """ # Add to database assert isinstance(dpl, entity.Deployment) self.deployment.create(dpl)
[docs] @deprecated("deployment") def delete_deployment(self, dpl: entity.Deployment) -> None: """ Undo a change to cluster configuration. :param dpl: Deployment to remove """ self.deployment.delete(dpl.dpl_id, recurse=True)
[docs] @deprecated("deployment") def get_deployment_state(self, deploy_id: str) -> Optional[dict]: """ Get the current Deployment state. :param deploy_id: Deployment ID :returns: Deployment state, or None if not present """ return self.deployment.state(deploy_id).get()
[docs] @deprecated("deployment") def create_deployment_state(self, deploy_id: str, state: dict) -> None: """ Create Deployment state. :param deploy_id: Deployment ID :param state: Deployment state to create """ return self.deployment.state(deploy_id).create(state)
[docs] @deprecated("deployment") def update_deployment_state(self, deploy_id: str, state: dict) -> None: """ Update Deployment state. :param deploy_id: Deployment ID :param state: Deployment state to update """ return self.deployment.state(deploy_id).update(state)
[docs] @deprecated("execution_block") def list_execution_blocks(self, prefix: str = "") -> list[str]: """Query execution block IDs from the configuration. :param prefix: if given, only search for execution block IDs with the given prefix :returns: execution block IDs, in lexicographical order """ return self.execution_block.list_keys(eb_id_prefix=prefix)
[docs] def new_execution_block_id(self, generator: str) -> str: """Generate a new execution block ID that is not yet in use. :param generator: Name of the generator :returns: execution block ID """ return self._new_block_id( generator, "eb", "execution", self.list_execution_blocks )
[docs] @deprecated("execution_block") def get_execution_block(self, eb_id: str) -> dict: """ Get execution block. :param eb_id: execution block ID :returns: execution block state """ return self.execution_block(eb_id).get()
[docs] @deprecated("execution_block") def create_execution_block(self, eb_id: str, state: dict) -> None: """ Create execution block. :param eb_id: execution block ID :param state: execution block state """ self.execution_block(eb_id).create(state)
[docs] @deprecated("execution_block") def update_execution_block(self, eb_id: str, state: dict) -> None: """ Update execution block. :param eb_id: execution block ID :param state: execution block state """ self.execution_block(eb_id).update(state)
[docs] @deprecated("execution_block") def delete_execution_block(self, eb_id: str, recurse: bool = True) -> None: """ Delete an execution block (eb) :param eb_id: Execution block ID :param recurse: if True, run recursive query and delete all objects """ self.execution_block(eb_id).delete(recurse=recurse)
[docs] @deprecated("subarray") def list_subarrays(self, prefix: str = "") -> list[str]: """Query subarray IDs from the configuration. :param prefix: if given, only search for subarray IDs with the given prefix :returns: subarray IDs, in lexicographical order """ return self.subarray.list_keys(subarray_id=prefix)
[docs] @deprecated("subarray") def get_subarray(self, subarray_id: str) -> Optional[dict]: """ Get subarray state. :param subarray_id: subarray ID :returns: subarray state """ return self.subarray(subarray_id).get()
[docs] @deprecated("subarray") def create_subarray(self, subarray_id: str, state: dict) -> None: """ Create subarray state. :param subarray_id: subarray ID :param state: subarray state """ self.subarray(subarray_id).create(state)
[docs] @deprecated("subarray") def update_subarray(self, subarray_id: str, state: dict) -> None: """ Update subarray state. :param subarray_id: subarray ID :param state: subarray state """ self.subarray(subarray_id).update(state)
[docs] @deprecated("controller") def create_controller(self, state: dict) -> None: """ Create controller state. :param state: controller state """ self.controller.create(state)
[docs] @deprecated("controller") def update_controller(self, state: dict) -> None: """ Update controller state. :param state: controller state """ self.controller.update(state)
[docs] @deprecated("controller") def get_controller(self) -> Optional[dict]: """ Get controller state. :returns: controller state """ return self.controller.get()
[docs] @deprecated("script") def create_script( self, kind: str, name: str, version: str, script: dict ) -> None: """ Create processing script definition. :param kind: script kind :param name: script name :param version: script version :param script: script definition """ key = entity.Script.Key(kind=kind, name=name, version=version) script = entity.Script(key=key, **script) self.script.create(script)
[docs] @deprecated("script") def get_script(self, kind: str, name: str, version: str) -> Optional[dict]: """ Get processing script definition. :param kind: script kind :param name: script name :param version: script version :returns: script definition """ key = entity.Script.Key(kind=kind, name=name, version=version) script = self.script.get(key) return None if script is None else script.model_dump()
[docs] @deprecated("script") def list_scripts(self, kind: str = "", name: str = "") -> list[tuple[str]]: """ List processing script definitions. :param kind: script kind. Default empty :param name: script name. Default empty :returns: list of script definitions """ return [ tuple(key.model_dump().values()) for key in self.script.query_keys(kind=kind, name=name) ]
[docs] @deprecated("script") def update_script( self, kind: str, name: str, version: str, script: dict ) -> None: """ Update processing script definition. :param kind: script kind :param name: script name :param version: script version :param script: script definition """ key = entity.Script.Key(kind=kind, name=name, version=version) script = entity.Script(key=key, **script) self.script.update(script)
[docs] @deprecated("script") def delete_script(self, kind: str, name: str, version: str) -> None: """ Delete processing script definition. :param kind: script kind :param name: script name :param version: script version """ key = entity.Script.Key(kind=kind, name=name, version=version) self.script.delete(key)