"""High-level API for SKA SDP configuration."""
from __future__ import annotations
import logging
import os
import sys
import threading
import warnings
from datetime import date
from socket import gethostname
from typing import Callable, Iterable, Optional, Union
from . import backend as backend_mod
from . import entity
from .backend import Backend, DbTransaction, Lease, TxnWrapper, Watcher
from .base_transaction import BaseTransaction
from .operations import (
AllocationOperations,
ArbitraryOperations,
ComponentOperations,
DependencyOperations,
DeploymentOperations,
EntityOperations,
ExecutionBlockOperations,
FlowOperations,
ProcessingBlockOperations,
RequestOperations,
ResourceOperations,
ScriptOperations,
SystemOperations,
)
# Logger shared by everything in this module.
LOG: logging.Logger = logging.getLogger(__name__)
# How many consecutive failures (querying the TTL or refreshing) the client
# lease keepalive tolerates before the lease is declared lost.
MAX_LEASE_REFRESH_RETRIES: int = 5
class Config:
    """Connection to SKA SDP configuration."""

    # pylint: disable=too-many-instance-attributes, too-many-arguments
    # pylint: disable=too-many-positional-arguments

    def __init__(
        self,
        backend=None,
        global_prefix: str = "",
        owner: dict | entity.Owner | None = None,
        component_name: str | None = None,
        wrapper: Optional[TxnWrapper] = None,
        owned_entity: tuple[str, str] | None = None,
        **cargs,
    ):
        """
        Connect to configuration using the given backend.

        :param backend: Backend to use. Defaults to the
            ``SDP_CONFIG_BACKEND`` environment variable, or ``etcd3`` if
            that is not set.
        :param global_prefix: Prefix to use within the database. Must be
            empty or start with ``/``.
        :param owner: Object used for identifying the process when claiming
            ownership. Either an :class:`entity.Owner`, a dict of keyword
            arguments for it, or ``None`` to describe the current process
            (pid, hostname and command line).
        :param component_name: name of component; used to generate
            alive key in db (e.g. lmc-controller). *DEPRECATED*: use
            ``owned_entity=("component", component_name)`` instead.
        :param wrapper: optional callable applied to every
            :class:`Transaction` created by :meth:`txn` (and by watchers,
            which receive :meth:`_wrap_txn`).
        :param owned_entity: two-tuple with the type (Transaction attribute
            name) and full key of the entity for which convenience ownership
            management is provided.
        :param cargs: Backend client arguments
        """
        self._backend = self._determine_backend(backend, **cargs)

        # Owner: used as-is if already an Owner, otherwise built from a dict
        # (defaulting to a description of the current process).
        if isinstance(owner, entity.Owner):
            self.owner = owner
        else:
            if owner is None:
                owner = {
                    "pid": os.getpid(),
                    "hostname": gethostname(),
                    "command": sys.argv,
                }
            self.owner = entity.Owner(**owner)

        # Global prefix: empty, or an absolute path
        assert global_prefix == "" or global_prefix[0] == "/"
        self._global_prefix = global_prefix

        # Lease associated with this client, kept alive on a separate thread
        # until the client is closed
        self._client_lease: Lease | None = None
        self._keepalive_thread: threading.Thread | None = None
        self._close_evt = threading.Event()

        # Identity of the entity receiving convenient ownership management
        if component_name and not owned_entity:
            warnings.warn(
                "component_name is deprecated, use owned_entity",
                DeprecationWarning,
                # Point the warning at the caller rather than this module.
                stacklevel=2,
            )
            owned_entity = ("component", component_name)
        self._owned_entity = owned_entity

        # Transaction wrapper (None means transactions are not wrapped).
        self.wrapper: Optional[TxnWrapper] = wrapper
        self._lease_lost = False

    @property
    def backend(self) -> Backend:
        """Get the backend database object."""
        return self._backend

    @staticmethod
    def _determine_backend(backend: str | None, **cargs) -> Backend:
        """Instantiate the backend named by ``backend``.

        :param backend: ``"etcd3"`` or ``"memory"``; when empty, the
            ``SDP_CONFIG_BACKEND`` environment variable is used, falling
            back to ``"etcd3"``.
        :param cargs: client arguments; for etcd3, missing ``host``,
            ``port``, ``user`` and ``password`` are read from the
            ``SDP_CONFIG_*`` environment variables. Ignored for the memory
            backend.
        :returns: backend instance
        :raises ValueError: if the backend name is unknown
        """
        if not backend:
            backend = os.getenv("SDP_CONFIG_BACKEND", "etcd3")

        if backend == "etcd3":
            if "host" not in cargs:
                cargs["host"] = os.getenv("SDP_CONFIG_HOST", "127.0.0.1")
            if "port" not in cargs:
                cargs["port"] = int(os.getenv("SDP_CONFIG_PORT", "2379"))
            if "user" not in cargs:
                cargs["user"] = os.getenv("SDP_CONFIG_USERNAME", None)
            if "password" not in cargs:
                cargs["password"] = os.getenv("SDP_CONFIG_PASSWORD", None)
            return backend_mod.Etcd3Backend(**cargs)
        if backend == "memory":
            return backend_mod.MemoryBackend()
        raise ValueError(f"Unknown configuration backend {backend}!")

    def lease(self, ttl=10) -> Lease:
        """
        Generate a new lease.

        Once entered, the lease can be associated with keys, which will be
        kept alive until the end of the lease. Entering it is expected to
        start a daemon thread that refreshes the lease periodically (the
        interval seems to be TTL/4; this is backend behaviour).

        :param ttl: Time to live for lease
        :returns: lease object
        """
        return self._backend.lease(ttl)

    @property
    def lease_is_lost(self) -> bool:
        """Return whether this Config object's lease has been lost."""
        return self._lease_lost

    @property
    def client_lease(self) -> Lease:
        """Return the lease associated with the client.

        It will be kept alive until the client gets closed.

        :raises LeaseLostError: if the keepalive thread gave up on the lease
        """
        return self._ensure_client_lease()

    def _ensure_client_lease(self) -> Lease:
        """Return the client lease, creating it and its keepalive thread.

        This always returns the same lease, even if it's dead, which means a
        dead lease is a fatal error. Lease death can be checked by calling
        lease.alive().

        :raises LeaseLostError: if the lease has been marked as lost
        """
        if self._lease_lost:
            raise LeaseLostError(
                f"Lease lost after {MAX_LEASE_REFRESH_RETRIES} retries."
            )
        if self._client_lease is None:
            self._client_lease = self.lease()
            # NOTE: the thread is not created with daemon=True, so a Config
            # that is never closed keeps this thread (and the process) alive.
            self._keepalive_thread = threading.Thread(
                target=self._keep_lease_alive,
                args=(self._client_lease,),
                name="lease_keepalive",
            )
            self._keepalive_thread.start()
            LOG.debug("keepalive started!")
        return self._client_lease

    def _keep_lease_alive(self, lease: Lease) -> None:
        """Refresh ``lease`` until the client is closed or the lease is lost.

        Runs on the keepalive thread. Each cycle reads the remaining TTL,
        waits half of it (returning early, and revoking the lease, if the
        close event is set), then refreshes the lease. Failures to read the
        TTL or to refresh count towards ``MAX_LEASE_REFRESH_RETRIES``; the
        counter is only reset by a successful refresh. Reaching the limit,
        or reading a negative TTL, marks the lease as lost.
        """
        max_retries = MAX_LEASE_REFRESH_RETRIES
        retries = 0
        while True:
            # Query the remaining time to live.
            try:
                ttl = lease.remaining_ttl
            # pylint: disable=broad-exception-caught
            except Exception as exc:
                retries += 1
                LOG.warning(
                    "Failed to query ttl, attempt %d/%d, error %s",
                    retries,
                    max_retries,
                    exc,
                )
                if retries >= max_retries:
                    self._mark_lease_as_lost()
                    return
                # wait on the event to avoid retrying too quickly
                if self._close_evt.wait(1.0):
                    self._remove_lease(lease)
                    return
                continue

            # A negative TTL means the lease has already expired.
            if ttl < 0:
                self._mark_lease_as_lost()
                return

            if self._close_evt.wait(ttl / 2):
                self._remove_lease(lease)
                return

            # if refreshing lease fails too often, mark as lost
            try:
                lease.refresh()
                retries = 0
            # pylint: disable=broad-exception-caught
            except Exception as exc:
                retries += 1
                LOG.warning(
                    "Failed to refresh lease, attempt %d/%d, error %s",
                    retries,
                    max_retries,
                    exc,
                )
                if retries >= max_retries:
                    self._mark_lease_as_lost()
                    return
                continue

    def _mark_lease_as_lost(self):
        """Mark lease as lost; later client-lease requests will raise."""
        LOG.error("Marking lease as lost.")
        self._lease_lost = True
        self._client_lease = None

    def _remove_lease(self, lease: Lease):
        """Revoke ``lease`` and forget it as the client lease.

        The client lease reference is cleared even if revoking fails, so a
        later :meth:`_ensure_client_lease` creates a fresh lease rather than
        returning the one that could not be revoked.
        """
        LOG.info("Revoking lease.")
        try:
            lease.revoke()
        # pylint: disable=broad-exception-caught
        except Exception as exc:
            # Called on the keepalive thread, where an uncaught exception
            # would only reach threading.excepthook; log it instead.
            LOG.warning("Failed to revoke lease, error %s", exc)
        finally:
            self._client_lease = None

    def revoke_lease(self) -> None:
        """
        Revokes the lease internally held by this client, if any. Shouldn't
        normally be called by users, but is useful for tests.
        """
        # Wake the keepalive thread, which revokes the lease and exits.
        self._close_evt.set()
        if self._keepalive_thread:
            self._keepalive_thread.join()
            self._keepalive_thread = None
        # Re-arm the event so a new keepalive thread can be started later.
        self._close_evt.clear()

    def _wrap_txn(
        self, txn: DbTransaction, wrap: bool = True
    ) -> Transaction | TxnWrapper:
        """
        Utility function to wrap a low-level transaction.

        Wraps the low-level transaction in the Transaction class. If a custom
        wrapper is provided and further wrapping has been requested, it is
        used to wrap the transaction afterwards.

        :param txn: low-level transaction
        :param wrap: whether to use the custom wrapper, if present.
        :returns: wrapped transaction
        """
        transaction = Transaction(
            self.owner,
            self._owned_entity,
            self._ensure_client_lease,
            txn,
            self._global_prefix,
        )
        if self.wrapper is not None and wrap:
            transaction = self.wrapper(transaction)
        return transaction

    def _txn(
        self, max_retries: int = 64, wrap: bool = True
    ) -> Iterable[Transaction | TxnWrapper]:
        """
        Yield transactions, optionally without the user's custom wrapper.

        Allows creating Transaction objects that are not wrapped with the
        custom wrapper provided by the user, and thus can be used internally
        by this class.

        :param max_retries: number of transaction retries allowed
        :param wrap: whether to apply the custom wrapper, if present
        """
        for txn in self._backend.txn(max_retries=max_retries):
            yield self._wrap_txn(txn, wrap)

    def txn(
        self, max_retries: int = 64
    ) -> Iterable[Union[Transaction, TxnWrapper]]:
        """Create a :class:`Transaction` for atomic configuration query/change.

        As we do not use locks, transactions might have to be repeated in
        order to guarantee atomicity. Suggested usage is as follows:

        .. code-block:: python

            for txn in config.txn():
                # Use txn to read+write configuration

        To react to configuration changes, create transactions from a
        watcher instead:

        .. code-block:: python

            for watcher in config.watcher(timeout=0.1):
                for txn in watcher.txn():
                    # Use txn to read+write configuration

        As the `for` loop suggests, the code might get run multiple
        times. Any writes using the transaction
        will be discarded if the transaction fails, but the
        application must make sure that the loop body has no other
        observable side effects.

        See also :ref:`Usage Guide <usage-guide>` for best practices
        for using transactions.

        :param max_retries: Number of transaction retries before a
            :class:`RuntimeError` gets raised.
        :returns: iterable of transactions, wrapped with the custom wrapper
            if one was given
        """
        yield from self._txn(max_retries, True)

    def watcher(self, timeout: Optional[float] = None) -> Iterable[Watcher]:
        """
        Create a new watcher.

        Useful for waiting for changes in the configuration. Calling
        :py:meth:`Etcd3Watcher.txn()` on the returned watchers will
        create :py:class:`Transaction` objects just like
        :py:meth:`txn()`.

        See also :ref:`Usage Guide <usage-guide>` for best practices
        for using watchers.

        :param timeout: Timeout for waiting. Watcher will loop after this time.
        """
        yield from self._backend.watcher(timeout, self._wrap_txn)

    def set_alive(self) -> None:
        """
        Set the keep-alive key of the owned entity.

        :raises TypeError: if no owned entity was given at creation time
        """
        if self._owned_entity is None:
            raise TypeError(
                "No entity provided at creation time, cannot set it alive"
            )
        for txn in self._txn(wrap=False):
            txn.self.take_ownership_if_not_alive()

    def is_alive(self) -> bool:
        """
        Is the connection alive in the sense that the keep-alive key exists?

        :returns: whether it is
        :raises TypeError: if no owned entity was given at creation time
        """
        if self._owned_entity is None:
            raise TypeError(
                "No entity provided at creation time, cannot check if alive"
            )
        alive = False
        for txn in self._txn(wrap=False):
            alive = txn.self.is_alive()
        return alive

    def close(self) -> None:
        """Close the client connection, revoking the client lease first."""
        LOG.info("Closing connection to Config DB client.")
        self.revoke_lease()
        self._backend.close()

    # Can declare Self as return type from Python 3.11.
    def __enter__(self):
        """Scope the client connection."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Scope the client connection; exceptions are not suppressed."""
        self.close()
        return False
# pylint: disable-next=too-many-instance-attributes,too-many-public-methods
class Transaction:
    """High-level configuration queries and updates to execute atomically."""

    # pylint: disable=too-many-arguments
    # pylint: disable=too-many-positional-arguments
    def __init__(
        self,
        owner: entity.Owner,
        owned_entity: tuple[str, str] | None,
        lease_getter: Callable[[], Lease],
        txn: DbTransaction,
        global_prefix: str,
    ):
        """Instantiate transaction.

        :param owner: owner identity, handed to :class:`BaseTransaction`
        :param owned_entity: optional (Transaction attribute name, key) pair;
            when given, :attr:`self` gives direct access to that entity
        :param lease_getter: callable returning a :class:`Lease`, handed to
            :class:`BaseTransaction`
        :param txn: low-level database transaction
        :param global_prefix: prefix used within the database
        """
        self._raw_txn = txn
        base_txn = BaseTransaction(owner, lease_getter, txn, global_prefix)

        self.component: ComponentOperations = ComponentOperations(base_txn)
        """Operations over SDP components"""
        self.deployment: DeploymentOperations = DeploymentOperations(base_txn)
        """Operations over Deployments"""
        self.execution_block: ExecutionBlockOperations = (
            ExecutionBlockOperations(base_txn)
        )
        """Operations over Execution Blocks"""
        self.processing_block: ProcessingBlockOperations = (
            ProcessingBlockOperations(base_txn)
        )
        """Operations over Processing Blocks"""
        self.script: ScriptOperations = ScriptOperations(base_txn)
        """Operations over Scripts"""
        self.flow: FlowOperations = FlowOperations(base_txn)
        """Operations over Flows"""
        self.system: SystemOperations = SystemOperations(base_txn)
        """Operations over system config"""
        self.resource: ResourceOperations = ResourceOperations(base_txn)
        """Operations over resources"""
        self.request: RequestOperations = RequestOperations(base_txn)
        """Operations over requests"""
        self.allocation: AllocationOperations = AllocationOperations(base_txn)
        """Operations over allocations"""
        self.dependency: DependencyOperations = DependencyOperations(base_txn)
        """Operations over Dependencies"""

        # Roots handled by the typed operations above; passed to
        # ArbitraryOperations alongside the base transaction.
        known_roots = {
            ComponentOperations.PREFIX,
            DeploymentOperations.PREFIX,
            ExecutionBlockOperations.PREFIX,
            ProcessingBlockOperations.PREFIX,
            ScriptOperations.PREFIX,
            DependencyOperations.PREFIX,
            FlowOperations.PREFIX,
            SystemOperations.PATH,
            ResourceOperations.PREFIX,
            RequestOperations.PREFIX,
            AllocationOperations.PREFIX,
        }
        self.arbitrary: ArbitraryOperations = ArbitraryOperations(
            base_txn, known_roots
        )
        """Operations over arbitrary paths"""

        # Special case for entity of interest
        self._self: EntityOperations | None = None
        if owned_entity:
            attr_name, key = owned_entity
            self._self = getattr(self, attr_name).index_by_key_parts(key)

    @property
    def raw(self) -> DbTransaction:
        """Return transaction object for accessing database directly."""
        return self._raw_txn

    @property
    def self(self) -> EntityOperations | None:
        """Fast access to entity identified when creating the parent Config"""
        return self._self

    @staticmethod
    def _new_block_id(
        generator: str,
        prefix: str,
        description: str,
        list_blocks: Callable[..., list[str]],
        list_arg: str,
        max_blocks: int = 100000,
    ) -> str:
        """Return the first unused ``<prefix>-<generator>-<YYYYMMDD>-<NNNNN>``.

        :param generator: Name of the generator
        :param prefix: block type prefix (e.g. ``pb`` or ``eb``)
        :param description: block kind, used in the error message
        :param list_blocks: callable listing existing block IDs; it receives
            the ID prefix as the keyword argument named by ``list_arg``
        :param list_arg: keyword name used to pass the prefix to list_blocks
        :param max_blocks: number of candidate indices tried per day
        :returns: a block ID not returned by ``list_blocks``
        :raises RuntimeError: if all ``max_blocks`` candidates are in use
        """
        id_prefix = f"{prefix}-{generator}-{date.today():%Y%m%d}"
        # A set gives O(1) membership tests while scanning candidates.
        existing_ids = set(list_blocks(**{list_arg: id_prefix}))
        for index in range(max_blocks):
            block_id = f"{id_prefix}-{index:05}"
            if block_id not in existing_ids:
                return block_id
        # Every candidate index for today is already taken.
        raise RuntimeError(f"Exceeded daily number of {description} blocks!")

    def new_processing_block_id(self, generator: str) -> str:
        """Generate a new processing block ID that is not yet in use.

        :param generator: Name of the generator
        :returns: Processing block ID
        :raises RuntimeError: if the daily number of IDs is exhausted
        """
        return self._new_block_id(
            generator,
            "pb",
            "processing",
            self.processing_block.list_keys,
            "key_prefix",
        )

    def new_execution_block_id(self, generator: str) -> str:
        """Generate a new execution block ID that is not yet in use.

        :param generator: Name of the generator
        :returns: execution block ID
        :raises RuntimeError: if the daily number of IDs is exhausted
        """
        return self._new_block_id(
            generator,
            "eb",
            "execution",
            self.execution_block.list_keys,
            "key_prefix",
        )
class LeaseLostError(RuntimeError):
    """Raised when the lease held by a Config client has been lost.

    Signals that the client lease can no longer be refreshed, so anything
    attached to it should be considered gone.
    """