"""High-level API for SKA SDP configuration."""
import functools
import logging
import os
import sys
import threading
import warnings
from datetime import date
from socket import gethostname
from typing import Callable, Iterable, Optional, Union
from . import backend as backend_mod
from . import entity
from .backend import Backend, DbTransaction, Lease, TxnWrapper, Watcher
from .base_transaction import BaseTransaction
from .operations import (
ArbitraryOperations,
ComponentOperations,
ControllerOperations,
DeploymentOperations,
EntityOperations,
ExecutionBlockOperations,
FlowOperations,
ProcessingBlockOperations,
ScriptOperations,
SubarrayOperations,
)
LOG = logging.getLogger(__name__)
class Config:
    """Connection to SKA SDP configuration.

    Owns the database backend, lazily creates a client lease that is kept
    alive until the connection is closed, and hands out
    :class:`Transaction` objects and watchers. Can be used as a context
    manager, in which case the connection is closed on exit.
    """

    # pylint: disable=too-many-instance-attributes, too-many-arguments
    def __init__(
        self,
        backend=None,
        global_prefix: str = "",
        owner: dict | entity.Owner | None = None,
        component_name: str | None = None,
        wrapper: Optional[TxnWrapper] = None,
        owned_entity: tuple[str, str] | None = None,
        **cargs,
    ):
        """
        Connect to configuration using the given backend.

        :param backend: Backend to use. Defaults to environment or etcd3 if
            not set.
        :param global_prefix: Prefix to use within the database. Must be
            empty or start with ``/``.
        :param owner: Object used for identifying the process when claiming
            ownership. Defaults to the pid, hostname and command line of
            the current process.
        :param component_name: name of component; used to generate
            alive key in db (e.g. lmc-controller). *DEPRECATED*: use
            ``owned_entity=("component", component_name)`` instead.
        :param wrapper: optional callable used to wrap each
            :class:`Transaction` before it is handed to the caller.
        :param owned_entity: two-tuple with the type (Transaction attribute
            name) and full key of the entity for which convenience ownership
            management is provided.
        :param cargs: Backend client arguments
        """
        self._backend = self._determine_backend(backend, **cargs)

        # Owner dictionary
        if isinstance(owner, entity.Owner):
            self.owner = owner
        else:
            if owner is None:
                owner = {
                    "pid": os.getpid(),
                    "hostname": gethostname(),
                    "command": sys.argv,
                }
            self.owner = entity.Owner(**owner)

        # Global prefix
        assert global_prefix == "" or global_prefix[0] == "/"
        self._global_prefix = global_prefix

        # Lease associated with this client, kept alive on a separate thread
        # until the client is closed
        self._client_lease = None
        self._keepalive_thread: threading.Thread | None = None
        self._close_evt = threading.Event()

        # Identity of the entity receiving convenient ownership management
        if component_name and not owned_entity:
            warnings.warn(
                "component_name is deprecated, use owned_entity",
                DeprecationWarning,
                # Point the warning at the code constructing the Config
                stacklevel=2,
            )
            owned_entity = ("component", component_name)
        self._owned_entity = owned_entity

        # Transaction wrapper.
        self.wrapper: Optional[TxnWrapper] = wrapper

    @property
    def backend(self) -> Backend:
        """Get the backend database object."""
        return self._backend

    @staticmethod
    # pylint: disable=too-many-branches
    # Temp - Above pylint can be removed when we switch to one backend
    def _determine_backend(backend: str | None, **cargs) -> Backend:
        """
        Instantiate the requested backend.

        Client arguments that were not passed explicitly are read from the
        ``SDP_CONFIG_*`` environment variables.

        :param backend: backend name; if empty, ``SDP_CONFIG_BACKEND`` is
            consulted and ``etcd3`` is used as the final fallback
        :param cargs: backend client arguments
        :returns: backend instance
        :raises ValueError: if the backend name is not known
        """

        def from_env(arg: str, env_var: str, default=None, convert=None):
            # Explicitly passed client arguments always win over the
            # environment, and the environment is not even read for them.
            if arg in cargs:
                return
            value = os.getenv(env_var, default)
            cargs[arg] = convert(value) if convert else value

        # Determine backend
        if not backend:
            backend = os.getenv("SDP_CONFIG_BACKEND", "etcd3")

        # Instantiate backend, reading configuration from environment/dotenv
        if backend == "etcd3":
            from_env("host", "SDP_CONFIG_HOST", "127.0.0.1")
            from_env("port", "SDP_CONFIG_PORT", "2379", int)
            from_env("user", "SDP_CONFIG_USERNAME")
            from_env("password", "SDP_CONFIG_PASSWORD")
            return backend_mod.Etcd3Backend(**cargs)

        if backend == "etcd3revolution1":
            from_env("host", "SDP_CONFIG_HOST", "127.0.0.1")
            from_env("port", "SDP_CONFIG_PORT", "2379", int)
            from_env("protocol", "SDP_CONFIG_PROTOCOL", "http")
            from_env("cert", "SDP_CONFIG_CERT")
            from_env("username", "SDP_CONFIG_USERNAME")
            from_env("password", "SDP_CONFIG_PASSWORD")
            return backend_mod.Etcd3BackendRevolution1(**cargs)

        if backend == "memory":
            return backend_mod.MemoryBackend()

        raise ValueError(f"Unknown configuration backend {backend}!")

    def lease(self, ttl=10) -> Lease:
        """
        Generate a new lease.

        Once entered can be associated with keys,
        which will be kept alive until the end of the lease. At that
        point a daemon thread will be started automatically to refresh
        the lease periodically (default seems to be TTL/4).

        :param ttl: Time to live for lease
        :returns: lease object
        """
        return self._backend.lease(ttl)

    @property
    def client_lease(self) -> Lease:
        """Return the lease associated with the client.

        It will be kept alive until the client gets closed.
        """
        return self._ensure_client_lease()

    def _ensure_client_lease(self) -> Lease:
        """
        Return the client lease, creating and activating it on first use.

        A lease offering ``__enter__`` is entered; any other lease is
        refreshed from a dedicated ``lease_keepalive`` thread until
        :meth:`revoke_lease` is called.
        """
        # This always returns the same lease, even if it's dead.
        # Which means the dead lease is a fatal error.
        # Lease death can be checked by calling lease.alive().
        if self._client_lease is None:
            self._client_lease = self.lease()

            # pylint: disable=unnecessary-dunder-call
            if hasattr(self._client_lease, "__enter__"):
                # etcd3-py lease has an __enter__ - so this logic works!
                self._client_lease.__enter__()
            else:

                def keepalive():
                    lease = self._client_lease
                    while True:
                        # Read the TTL once so the liveness check and the
                        # wait below are based on the same value.
                        remaining = lease.remaining_ttl
                        if remaining < 0:
                            break
                        if self._close_evt.wait(remaining / 2):
                            # Close requested: give up the lease
                            LOG.debug("Revoking lease")
                            lease.revoke()
                            self._client_lease = None
                            return
                        lease.refresh()
                    LOG.fatal(
                        "keepalive failed to maintain lease - aborting thread!"
                    )

                self._keepalive_thread = threading.Thread(
                    target=keepalive, name="lease_keepalive"
                )
                self._keepalive_thread.start()
                LOG.debug("keepalive started!")

        return self._client_lease

    def revoke_lease(self) -> None:
        """
        Revokes the lease internally held by this client, if any. Shouldn't
        normally be called by users, but is useful for tests.

        A lease maintained by the keepalive thread is revoked by that
        thread; a lease that was activated through ``__enter__`` is
        released through the matching ``__exit__``.
        """
        self._close_evt.set()
        if self._keepalive_thread:
            # The keepalive thread revokes the lease when the event is set
            self._keepalive_thread.join()
            self._keepalive_thread = None
        elif self._client_lease is not None and hasattr(
            self._client_lease, "__exit__"
        ):
            # Counterpart of the __enter__ call in _ensure_client_lease;
            # without it an entered lease would never be released.
            # pylint: disable-next=unnecessary-dunder-call
            self._client_lease.__exit__(None, None, None)
            self._client_lease = None
        self._close_evt.clear()

    def _wrap_txn(
        self, txn: DbTransaction, wrap: bool = True
    ) -> Union["Transaction", TxnWrapper]:
        """
        Utility function to wrap a low-level transaction.

        Wraps the low-level transaction in the Transaction class. If a custom
        wrapper is provided and further wrapping has been requested, it is
        used to wrap the transaction afterwards.

        :param txn: low-level transaction
        :param wrap: whether to use the custom wrapper, if present.
        :returns: wrapped transaction
        """
        transaction = Transaction(
            self.owner,
            self._owned_entity,
            self._ensure_client_lease,
            txn,
            self._global_prefix,
        )
        if self.wrapper is not None and wrap:
            transaction = self.wrapper(transaction)
        return transaction

    def _txn(
        self, max_retries: int = 64, wrap: bool = True
    ) -> Iterable[Union["Transaction", TxnWrapper]]:
        """
        Utility function to allow creating Transaction objects that are not
        wrapped with the custom wrapper provided by the user, and thus can be
        used internally by this class.

        :param max_retries: number of transaction retries, passed on to
            the backend
        :param wrap: whether to apply the custom wrapper, if present
        """
        for txn in self._backend.txn(max_retries=max_retries):
            yield self._wrap_txn(txn, wrap)

    def txn(
        self, max_retries: int = 64
    ) -> Iterable[Union["Transaction", TxnWrapper]]:
        """Create a :class:`Transaction` for atomic configuration query/change.

        As we do not use locks, transactions might have to be repeated in
        order to guarantee atomicity. Suggested usage is as follows:

        .. code-block:: python

            for txn in config.txn():
                # Use txn to read+write configuration
                # [Possibly call txn.loop()]

        As the `for` loop suggests, the code might get run multiple
        times even if not forced by calling
        :meth:`Transaction.loop`. Any writes using the transaction
        will be discarded if the transaction fails, but the
        application must make sure that the loop body has no other
        observable side effects.

        See also :ref:`Usage Guide <usage-guide>` for best practices
        for using transactions.

        :param max_retries: Number of transaction retries before a
            :class:`RuntimeError` gets raised.
        """
        yield from self._txn(max_retries, True)

    def watcher(self, timeout: Optional[float] = None) -> Iterable[Watcher]:
        """
        Create a new watcher.

        Useful for waiting for changes in the configuration. Calling
        :py:meth:`Etcd3Watcher.txn()` on the returned watchers will
        create :py:class:`Transaction` objects just like
        :py:meth:`txn()`.

        See also :ref:`Usage Guide <usage-guide>` for best practices
        for using watchers.

        :param timeout: Timeout for waiting. Watcher will loop after this time.
        """
        yield from self._backend.watcher(timeout, self._wrap_txn)

    def set_alive(self) -> None:
        """
        Set the keep-alive key.

        :raises TypeError: if no owned entity was given at creation time
        """
        if self._owned_entity is None:
            raise TypeError(
                "No entity provided at creation time, cannot set it alive"
            )
        # Internal transaction: bypass the user-provided wrapper
        for txn in self._txn(wrap=False):
            txn.self.take_ownership_if_not_alive()

    def is_alive(self) -> bool:
        """
        Is the connection alive in the sense that the keep-alive key exists?

        :returns: whether it is
        :raises TypeError: if no owned entity was given at creation time
        """
        if self._owned_entity is None:
            raise TypeError(
                "No entity provided at creation time, cannot check if alive"
            )
        alive = False
        # Internal transaction: bypass the user-provided wrapper
        for txn in self._txn(wrap=False):
            alive = txn.self.is_alive()
        return alive

    def close(self) -> None:
        """Close the client connection."""
        try:
            self.revoke_lease()
        finally:
            # Release the backend even if giving up the lease failed
            self._backend.close()

    # Can declare Self as return type from Python 3.11.
    def __enter__(self):
        """Scope the client connection."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Scope the client connection."""
        self.close()
        # Never swallow exceptions raised inside the ``with`` body
        return False
class SdpConfigDeprecationWarning(DeprecationWarning):
    """Used to mark an sdp-config-db method deprecated.

    Being a dedicated subclass of :class:`DeprecationWarning`, it can be
    targeted by warning filters separately from other deprecations.
    """
def deprecated(subobject_name):
    """
    Marks a method as deprecated in favour of using the Transaction sub-objects

    :param subobject_name: name of the Transaction attribute that offers
        the replacement method; only used in the warning message
    :returns: decorator emitting a :class:`SdpConfigDeprecationWarning` on
        every call before delegating to the decorated method
    """

    def _inner_deprecated(method):
        @functools.wraps(method)
        def with_deprecation_message(*args, **kwargs):
            warnings.warn(
                f"{method.__name__} is deprecated, use the corresponding "
                f"method in Transaction.{subobject_name} instead",
                SdpConfigDeprecationWarning,
                # Attribute the warning to the code calling the deprecated
                # method rather than to this wrapper.
                stacklevel=2,
            )
            return method(*args, **kwargs)

        return with_deprecation_message

    return _inner_deprecated
# pylint: disable-next=too-many-instance-attributes,too-many-public-methods
class Transaction:
    """High-level configuration queries and updates to execute atomically.

    Entity-specific operations live on the sub-objects (``component``,
    ``processing_block``, ...). The remaining methods are deprecated
    shortcuts that forward to those sub-objects.
    """

    # pylint: disable-next=too-many-arguments
    def __init__(
        self,
        owner: entity.Owner,
        owned_entity: tuple[str, str] | None,
        lease_getter: Callable[[], Lease],
        txn: DbTransaction,
        global_prefix: str,
    ):
        """Instantiate transaction.

        :param owner: identity of this process, used when claiming ownership
        :param owned_entity: optional (Transaction attribute name, key) pair
            of the entity exposed through :attr:`self`
        :param lease_getter: callable returning the client lease
        :param txn: low-level database transaction
        :param global_prefix: prefix to use within the database
        """
        self._raw_txn = txn
        base_txn = BaseTransaction(owner, lease_getter, txn, global_prefix)

        self.component: ComponentOperations = ComponentOperations(base_txn)
        """Operations over SDP components"""
        self.controller: ControllerOperations = ControllerOperations(base_txn)
        """Operations over the LMC Controller"""
        self.deployment: DeploymentOperations = DeploymentOperations(base_txn)
        """Operations over Deployments"""
        self.execution_block: ExecutionBlockOperations = (
            ExecutionBlockOperations(base_txn)
        )
        """Operations over Execution Blocks"""
        self.processing_block: ProcessingBlockOperations = (
            ProcessingBlockOperations(base_txn)
        )
        """Operations over Processing Blocks"""
        self.script: ScriptOperations = ScriptOperations(base_txn)
        """Operations over Scripts"""
        self.subarray: SubarrayOperations = SubarrayOperations(base_txn)
        """Operations over Subarrays"""
        self.flow: FlowOperations = FlowOperations(base_txn)
        """Operations over Flows"""

        known_roots = {
            ComponentOperations.PREFIX,
            ControllerOperations.PATH,
            DeploymentOperations.PREFIX,
            ExecutionBlockOperations.PREFIX,
            ProcessingBlockOperations.PREFIX,
            ScriptOperations.PREFIX,
            SubarrayOperations.PREFIX,
            FlowOperations.PREFIX,
        }
        self.arbitrary: ArbitraryOperations = ArbitraryOperations(
            base_txn, known_roots
        )
        """Operations over arbitrary paths"""

        # Special case for entity of interest
        self._self: EntityOperations | None = None
        if owned_entity:
            attr_name, key = owned_entity
            self._self = getattr(self, attr_name).index_by_key_parts(key)

    @property
    def raw(self) -> DbTransaction:
        """Return transaction object for accessing database directly."""
        return self._raw_txn

    @property
    def self(self) -> EntityOperations | None:
        """Fast access to entity identified when creating the parent Config"""
        return self._self

    def loop(
        self, wait: bool = False, timeout: Optional[float] = None
    ) -> None:
        """Repeat transaction regardless of whether commit succeeds.

        :param wait: If transaction succeeded, wait for any read
            values to change before repeating it.
        :param timeout: Maximum time to wait, in seconds
        """
        warnings.warn(
            "Usage of txn.loop() is deprecated, "
            "only the old etcd3_revolution backend supports this still. "
            "Use a watcher instead",
            DeprecationWarning,
            # Point the warning at the caller of loop()
            stacklevel=2,
        )
        return self._raw_txn.loop(wait, timeout)

    @deprecated("component")
    def create_is_alive(self, key: str) -> str:
        """
        Create an "is_alive" entry.

        :param key: "is alive" key in database
            e.g. "lmc-controller/owner"
        :returns: the full path of the entry
        """
        return self.component.arbitrary_create_is_alive(key)

    @deprecated("component")
    def is_alive(self, key: str) -> bool:
        """
        Check if the "is alive" key still exists.

        :param key: "is alive" key in database
            e.g. "lmc-controller/owner"
        :returns: True if it does
        """
        return self.component.arbitrary_check_is_alive(key)

    @deprecated("processing_block")
    def list_processing_blocks(self, prefix: str = "") -> list[str]:
        """Query processing block IDs from the configuration.

        :param prefix: If given, only search for processing block IDs
            with the given prefix
        :returns: Processing block ids, in lexicographical order
        """
        return self.processing_block.list_keys(key_prefix=prefix)

    @staticmethod
    def _new_block_id(
        generator: str,
        prefix: str,
        description: str,
        list_blocks: Callable[[str], list[str]],
    ) -> str:
        """
        Find the first unused block ID for today.

        IDs have the form ``<prefix>-<generator>-<YYYYMMDD>-<index>``, with
        a five-digit, zero-padded index.

        :param generator: name of the generator
        :param prefix: block kind prefix (e.g. ``pb`` or ``eb``)
        :param description: block kind, only used in the error message
        :param list_blocks: callable returning existing IDs starting with
            the prefix it is given
        :returns: block ID that is not in use yet
        :raises RuntimeError: if all IDs for today are taken
        """
        # Find existing blocks with same prefix
        id_prefix = f"{prefix}-{generator}-{date.today():%Y%m%d}"
        existing_ids = set(list_blocks(id_prefix))

        # Choose ID that doesn't exist
        max_blocks = 100000
        for index in range(max_blocks):
            block_id = f"{id_prefix}-{index:05}"
            if block_id not in existing_ids:
                return block_id

        # Only reached when every candidate is taken. Never hand out an
        # ID that already exists.
        raise RuntimeError(
            f"Exceeded daily number of {description} blocks!"
        )

    def new_processing_block_id(self, generator: str) -> str:
        """Generate a new processing block ID that is not yet in use.

        :param generator: Name of the generator
        :returns: Processing block ID
        """
        # Query the sub-object directly: going through the deprecated
        # list_processing_blocks() would emit a deprecation warning.
        return self._new_block_id(
            generator,
            "pb",
            "processing",
            lambda prefix: self.processing_block.list_keys(key_prefix=prefix),
        )

    @deprecated("processing_block")
    def get_processing_block(
        self, pb_id: str
    ) -> Optional[entity.ProcessingBlock]:
        """
        Look up processing block data.

        :param pb_id: Processing block ID to look up
        :returns: Processing block entity, or None if it doesn't exist
        """
        return self.processing_block.get(pb_id)

    @deprecated("processing_block")
    def create_processing_block(self, pblock: entity.ProcessingBlock) -> None:
        """
        Add a new :class:`ProcessingBlock` to the configuration.

        :param pblock: Processing block to create
        """
        assert isinstance(pblock, entity.ProcessingBlock)
        self.processing_block.create(pblock)

    @deprecated("processing_block")
    def update_processing_block(self, pblock: entity.ProcessingBlock) -> None:
        """
        Update a :class:`ProcessingBlock` in the configuration.

        :param pblock: Processing block to update
        """
        assert isinstance(pblock, entity.ProcessingBlock)
        self.processing_block.update(pblock)

    @deprecated("processing_block")
    def get_processing_block_owner(self, pb_id: str) -> Optional[dict]:
        """
        Look up the current processing block owner.

        :param pb_id: Processing block ID to look up
        :returns: Processing block owner data, or None if not claimed
        """
        return self.processing_block.ownership(pb_id).get()

    @deprecated("processing_block")
    def is_processing_block_owner(self, pb_id: str) -> bool:
        """
        Check whether this client is owner of the processing block.

        :param pb_id: Processing block ID to look up
        :returns: Whether processing block exists and is claimed
        """
        return self.processing_block.ownership(
            pb_id
        ).is_owned_by_this_process()

    @deprecated("processing_block")
    def take_processing_block(self, pb_id: str) -> None:
        """
        Take ownership of the processing block.

        :param pb_id: Processing block ID to take ownership of
        :raises: backend.ConfigCollision
        """
        self.processing_block.ownership(pb_id).take()

    @deprecated("processing_block")
    def get_processing_block_state(self, pb_id: str) -> Optional[dict]:
        """
        Get the current processing block state.

        :param pb_id: Processing block ID
        :returns: Processing block state, or None if not present
        """
        return self.processing_block.state(pb_id).get()

    @deprecated("processing_block")
    def create_processing_block_state(self, pb_id: str, state: dict) -> None:
        """
        Create processing block state.

        :param pb_id: Processing block ID
        :param state: Processing block state to create
        """
        self.processing_block.state(pb_id).create(state)

    @deprecated("processing_block")
    def update_processing_block_state(self, pb_id: str, state: dict) -> None:
        """
        Update processing block state.

        :param pb_id: Processing block ID
        :param state: Processing block state to update
        """
        self.processing_block.state(pb_id).update(state)

    @deprecated("processing_block")
    def delete_processing_block(
        self, pb_id: str, recurse: bool = True
    ) -> None:
        """
        Delete a processing block (pb)

        :param pb_id: Processing block ID
        :param recurse: if True, run recursive query and delete all
            includes deleting /state and /owner of pb if exists
        """
        self.processing_block.delete(pb_id, recurse=recurse)

    @deprecated("deployment")
    def get_deployment(self, deploy_id: str) -> Optional[entity.Deployment]:
        """
        Retrieve details about a cluster configuration change.

        :param deploy_id: Name of the deployment
        :returns: Deployment details
        """
        return self.deployment.get(deploy_id)

    @deprecated("deployment")
    def list_deployments(self, prefix: str = "") -> list[str]:
        """
        List all current deployments.

        :param prefix: if given, only list deployment IDs with this prefix
        :returns: Deployment IDs
        """
        return self.deployment.list_keys(key_prefix=prefix)

    @deprecated("deployment")
    def create_deployment(self, dpl: entity.Deployment) -> None:
        """
        Request a change to cluster configuration.

        :param dpl: Deployment to add to database
        """
        # Add to database
        assert isinstance(dpl, entity.Deployment)
        self.deployment.create(dpl)

    @deprecated("deployment")
    def delete_deployment(self, dpl: entity.Deployment) -> None:
        """
        Undo a change to cluster configuration.

        :param dpl: Deployment to remove
        """
        self.deployment.delete(dpl.dpl_id, recurse=True)

    @deprecated("deployment")
    def get_deployment_state(self, deploy_id: str) -> Optional[dict]:
        """
        Get the current Deployment state.

        :param deploy_id: Deployment ID
        :returns: Deployment state, or None if not present
        """
        return self.deployment.state(deploy_id).get()

    @deprecated("deployment")
    def create_deployment_state(self, deploy_id: str, state: dict) -> None:
        """
        Create Deployment state.

        :param deploy_id: Deployment ID
        :param state: Deployment state to create
        """
        return self.deployment.state(deploy_id).create(state)

    @deprecated("deployment")
    def update_deployment_state(self, deploy_id: str, state: dict) -> None:
        """
        Update Deployment state.

        :param deploy_id: Deployment ID
        :param state: Deployment state to update
        """
        return self.deployment.state(deploy_id).update(state)

    @deprecated("execution_block")
    def list_execution_blocks(self, prefix: str = "") -> list[str]:
        """Query execution block IDs from the configuration.

        :param prefix: if given, only search for execution block IDs
            with the given prefix
        :returns: execution block IDs, in lexicographical order
        """
        return self.execution_block.list_keys(eb_id_prefix=prefix)

    def new_execution_block_id(self, generator: str) -> str:
        """Generate a new execution block ID that is not yet in use.

        :param generator: Name of the generator
        :returns: execution block ID
        """
        # Query the sub-object directly: going through the deprecated
        # list_execution_blocks() would emit a deprecation warning.
        return self._new_block_id(
            generator,
            "eb",
            "execution",
            lambda prefix: self.execution_block.list_keys(eb_id_prefix=prefix),
        )

    @deprecated("execution_block")
    def get_execution_block(self, eb_id: str) -> dict:
        """
        Get execution block.

        :param eb_id: execution block ID
        :returns: execution block state
        """
        return self.execution_block(eb_id).get()

    @deprecated("execution_block")
    def create_execution_block(self, eb_id: str, state: dict) -> None:
        """
        Create execution block.

        :param eb_id: execution block ID
        :param state: execution block state
        """
        self.execution_block(eb_id).create(state)

    @deprecated("execution_block")
    def update_execution_block(self, eb_id: str, state: dict) -> None:
        """
        Update execution block.

        :param eb_id: execution block ID
        :param state: execution block state
        """
        self.execution_block(eb_id).update(state)

    @deprecated("execution_block")
    def delete_execution_block(self, eb_id: str, recurse: bool = True) -> None:
        """
        Delete an execution block (eb)

        :param eb_id: Execution block ID
        :param recurse: if True, run recursive query and delete all objects
        """
        self.execution_block(eb_id).delete(recurse=recurse)

    @deprecated("subarray")
    def list_subarrays(self, prefix: str = "") -> list[str]:
        """Query subarray IDs from the configuration.

        :param prefix: if given, only search for subarray IDs
            with the given prefix
        :returns: subarray IDs, in lexicographical order
        """
        # NOTE(review): the sibling list_* methods pass a ``*_prefix``
        # keyword; confirm ``subarray_id`` is treated as a prefix here.
        return self.subarray.list_keys(subarray_id=prefix)

    @deprecated("subarray")
    def get_subarray(self, subarray_id: str) -> Optional[dict]:
        """
        Get subarray state.

        :param subarray_id: subarray ID
        :returns: subarray state
        """
        return self.subarray(subarray_id).get()

    @deprecated("subarray")
    def create_subarray(self, subarray_id: str, state: dict) -> None:
        """
        Create subarray state.

        :param subarray_id: subarray ID
        :param state: subarray state
        """
        self.subarray(subarray_id).create(state)

    @deprecated("subarray")
    def update_subarray(self, subarray_id: str, state: dict) -> None:
        """
        Update subarray state.

        :param subarray_id: subarray ID
        :param state: subarray state
        """
        self.subarray(subarray_id).update(state)

    @deprecated("controller")
    def create_controller(self, state: dict) -> None:
        """
        Create controller state.

        :param state: controller state
        """
        self.controller.create(state)

    @deprecated("controller")
    def update_controller(self, state: dict) -> None:
        """
        Update controller state.

        :param state: controller state
        """
        self.controller.update(state)

    @deprecated("controller")
    def get_controller(self) -> Optional[dict]:
        """
        Get controller state.

        :returns: controller state
        """
        return self.controller.get()

    @deprecated("script")
    def create_script(
        self, kind: str, name: str, version: str, script: dict
    ) -> None:
        """
        Create processing script definition.

        :param kind: script kind
        :param name: script name
        :param version: script version
        :param script: script definition
        """
        key = entity.Script.Key(kind=kind, name=name, version=version)
        self.script.create(entity.Script(key=key, **script))

    @deprecated("script")
    def get_script(self, kind: str, name: str, version: str) -> Optional[dict]:
        """
        Get processing script definition.

        :param kind: script kind
        :param name: script name
        :param version: script version
        :returns: script definition, or None if the lookup returns None
        """
        key = entity.Script.Key(kind=kind, name=name, version=version)
        script = self.script.get(key)
        return None if script is None else script.model_dump()

    @deprecated("script")
    def list_scripts(
        self, kind: str = "", name: str = ""
    ) -> list[tuple[str, ...]]:
        """
        List processing script definitions.

        :param kind: script kind. Default empty
        :param name: script name. Default empty
        :returns: list of script keys, each flattened into a tuple of its
            field values
        """
        return [
            tuple(key.model_dump().values())
            for key in self.script.query_keys(kind=kind, name=name)
        ]

    @deprecated("script")
    def update_script(
        self, kind: str, name: str, version: str, script: dict
    ) -> None:
        """
        Update processing script definition.

        :param kind: script kind
        :param name: script name
        :param version: script version
        :param script: script definition
        """
        key = entity.Script.Key(kind=kind, name=name, version=version)
        self.script.update(entity.Script(key=key, **script))

    @deprecated("script")
    def delete_script(self, kind: str, name: str, version: str) -> None:
        """
        Delete processing script definition.

        :param kind: script kind
        :param name: script name
        :param version: script version
        """
        key = entity.Script.Key(kind=kind, name=name, version=version)
        self.script.delete(key)