"""
Memory backend for SKA SDP configuration DB.
The main purpose of this is for use in testing.
In principle, it should behave in the same way as the etcd backend.
No attempt has been made to make it thread-safe, so it probably isn't.
"""
from typing import Callable, Iterable, Optional, cast
from .backend import (
Backend,
DbRevision,
DbTransaction,
Lease,
TxnWrapper,
Watcher,
)
from .common import (
ConfigCollision,
ConfigVanished,
_check_path,
_tag_depth,
_untag_depth,
depth_of_path,
)
def _op(
    path: str,
    value: str,
    to_check: Callable[[str], None],
    to_do: Callable[[str, str], None],
):
    """
    Run a guarded operation on the depth-tagged form of a path.

    The path is first handed to ``_check_path``; the key produced by
    ``_tag_depth`` is then passed to both callbacks.

    :param path: untagged path of the entry to operate on
    :param value: value passed to ``to_do`` together with the tagged key
    :param to_check: precondition callback, called with the tagged key
    :param to_do: action callback, called with the tagged key and value
    """
    _check_path(path)
    tagged_key = _tag_depth(path)
    # The precondition runs first: if it raises, the action is never reached.
    to_check(tagged_key)
    to_do(tagged_key, value)
class MemoryLease:
    """
    Stand-in lease object used by the in-memory backend.

    The TTL given at construction is stored and reported back as is;
    nothing in this class counts it down.
    """

    def __init__(self, ttl: float = 10.0):
        """
        Record the requested time to live.

        :param ttl: time to live
        """
        self._ttl = self._remaining_ttl = ttl

    @property
    def remaining_ttl(self) -> float:
        """Time to live left on the lease."""
        return self._remaining_ttl

    def refresh(self) -> None:
        """Refresh the lease; a no-op for the in-memory lease."""

    def revoke(self) -> None:
        """Revoke the lease; a no-op for the in-memory lease."""
class MemoryBackend(Backend):
    """
    In-memory backend implementation, principally for testing.

    Entries are kept in a plain dictionary keyed by the depth-tagged path
    produced by ``_tag_depth``.
    """

    # Class variable to store data. It is a class attribute that the methods
    # below only ever mutate in place, so every instance shares the same
    # dictionary.
    _data: dict[str, str] = {}

    def lease(self, ttl: float = 10) -> Lease:
        """
        Generate a dummy lease object.

        :param ttl: time to live
        :returns: dummy lease object
        """
        return cast(Lease, MemoryLease(ttl))

    def txn(self, max_retries: int = 64) -> Iterable["MemoryTransaction"]:
        """
        Create an in-memory "transaction".

        This is a generator that yields exactly one transaction object.

        :param max_retries: Maximum number of transaction loops (not used
            by this implementation)
        :returns: transaction object
        """
        yield MemoryTransaction(self)

    def watcher(
        self,
        timeout: Optional[float] = None,
        txn_wrapper: Optional[TxnWrapper] = None,
    ) -> Watcher:
        """
        Create an in-memory "watcher".

        :param timeout: timeout in seconds
        :param txn_wrapper: wrapper (factory) to return transaction
        :returns: MemoryWatcher object (mock of Etcd3Watcher)
        """
        return MemoryWatcher(self, timeout, txn_wrapper)

    def get(
        self, path: str, revision: Optional[DbRevision] = None
    ) -> tuple[str, DbRevision]:
        """
        Look up the value stored at a path.

        :param path: path of the entry to read
        :param revision: revision to report back; it has no influence on
            which value is read
        :returns: tuple of the stored value and a revision. The value is
            ``None`` when there is no entry at the path, despite the
            annotation. The revision is the one passed in, or
            ``DbRevision(0)`` when that is ``None`` or otherwise falsy.
        """
        return (
            self._data.get(_tag_depth(path)),
            revision or DbRevision(0),
        )

    def _put(self, path: str, value: str) -> None:
        """Store ``value`` under the (already tagged) key ``path``."""
        self._data[path] = value

    def _check_exists(self, path: str) -> None:
        """Raise ``ConfigVanished`` unless the tagged key is present."""
        if path not in self._data:
            raise ConfigVanished(path, f"{path} not in dictionary")

    def _check_not_exists(self, path: str) -> None:
        """Raise ``ConfigCollision`` if the tagged key is already present."""
        if path in self._data:
            raise ConfigCollision(path, f"path {path} already in dictionary")

    def create(
        self, path: str, value: str, lease: Optional[Lease] = None
    ) -> None:
        """
        Create a new entry.

        :param path: path of the entry to create
        :param value: value to store
        :param lease: not used by this implementation
        :raises ConfigCollision: if there already is an entry at the path
        """
        _op(path, value, self._check_not_exists, self._put)

    def update(self, path: str, value: str) -> None:
        """
        Overwrite the value of an existing entry.

        :param path: path of the entry to update
        :param value: new value to store
        :raises ConfigVanished: if there is no entry at the path
        """
        _op(path, value, self._check_exists, self._put)

    # pylint: disable=too-many-arguments
    # pylint: disable=too-many-positional-arguments

    def delete(
        self,
        path: str,
        must_exist: bool = True,
        recursive: bool = False,
        prefix: bool = False,
        max_depth: int = 16,
    ) -> None:
        """
        Delete the entry at a path, optionally with the entries below it.

        :param path: path of the entry to remove
        :param must_exist: if set, fail when there is no entry at exactly
            this path
        :param recursive: if set, remove every key that starts with the
            path tagged at any of ``max_depth`` consecutive depth levels,
            beginning with the depth of ``path`` itself
        :param prefix: NOTE(review): currently not read by this
            implementation, so it has no effect - confirm whether
            prefix deletion is expected here
        :param max_depth: number of depth levels scanned when ``recursive``
            is set
        :raises ConfigVanished: if ``must_exist`` is set and there is no
            entry at the path
        """
        _check_path(path)
        tag = _tag_depth(path)
        if must_exist:
            self._check_exists(tag)
        if recursive:
            depth = depth_of_path(path)
            level_tags = tuple(
                _tag_depth(path, depth=lvl)
                for lvl in range(depth, depth + max_depth)
            )
            # startswith() accepts a tuple of prefixes, so one pass over a
            # snapshot of the keys covers every level. The snapshot keeps
            # the dictionary from changing size while it is iterated.
            doomed = [key for key in self._data if key.startswith(level_tags)]
            for key in doomed:
                del self._data[key]
        else:
            # A missing key is fine here: must_exist was handled above.
            self._data.pop(tag, None)

    def list_keys(self, path: str, recurse: int = 0) -> list[str]:
        """
        Get a list of the keys at the given path.

        In common with the etcd backend, the structure is
        "flat" rather than a real hierarchy, even though it looks like one.

        :param path: prefix of keys to query
        :param recurse: maximum recursion level to query
        :returns: sorted list of keys, with the depth tag removed
        """
        depth = depth_of_path(path)
        level_tags = [
            _tag_depth(path, depth=lvl)
            for lvl in range(depth, depth + recurse + 1)
        ]
        return sorted(
            _untag_depth(key)
            for tag in level_tags
            for key in self._data
            if key.startswith(tag)
        )

    def close(self) -> None:
        """
        Close the resource. This does nothing.
        """

    def __repr__(self) -> str:
        """Return the string form of the underlying dictionary."""
        return str(self._data)
class MemoryTransaction(DbTransaction):
    """
    Transaction wrapper around the backend implementation.

    Transactions always succeed if they are valid, so there is no need
    to loop; however the iterator is supported for compatibility with
    the etcd backend.

    Every operation is delegated straight to ``self.backend``
    (presumably set by the ``DbTransaction`` constructor - it is not
    assigned in this class).
    """

    def __iter__(self):
        """
        Iterate over just this object.

        :returns: this object
        """
        yield self

    def commit(self) -> bool:
        """
        Commit the transaction. This does nothing.

        :returns: always ``True``
        """
        return True

    def reset(self, revision: Optional[DbRevision] = None) -> None:
        """
        Reset the transaction. This does nothing.

        :param revision: not used by this implementation
        """

    def get(self, path: str) -> str:
        """
        Read the value at a path from the backend.

        :param path: path of the entry to read
        :returns: the value part of the backend's result; the revision
            part is discarded
        """
        value, _ = self.backend.get(path)
        return value

    def create(
        self, path: str, value: str, lease: Optional[Lease] = None
    ) -> None:
        """
        Create a new entry through the backend.

        :param path: path of the entry to create
        :param value: value to store
        :param lease: lease to associate with the entry; forwarded to the
            backend rather than dropped
        """
        self.backend.create(path, value, lease=lease)

    def update(self, path: str, value: str) -> None:
        """
        Update an existing entry through the backend.

        :param path: path of the entry to update
        :param value: new value to store
        """
        self.backend.update(path, value)

    def delete(
        self,
        path: str,
        must_exist: bool = True,
        recursive: bool = False,
    ) -> None:
        """
        Delete an entry through the backend.

        :param path: path of the entry to remove
        :param must_exist: passed on to the backend's ``delete``
        :param recursive: passed on to the backend's ``delete``
        """
        self.backend.delete(path, must_exist=must_exist, recursive=recursive)

    def list_keys(self, path: str, recurse: int = 0) -> list[str]:
        """
        List keys through the backend.

        :param path: prefix of keys to query
        :param recurse: maximum recursion level to query
        :returns: list of keys as returned by the backend
        """
        return self.backend.list_keys(path, recurse=recurse)
class MemoryWatcher(Watcher):
    """
    Watcher wrapper around the backend implementation (Etcd3Watcher).
    """

    def __iter__(self) -> Iterable["MemoryWatcher"]:
        """
        Iterate over just this object.

        :returns: generator producing this watcher once
        """
        yield self

    def txn(self) -> Iterable[MemoryTransaction]:
        """
        Yield the wrapped MemoryTransaction object.

        A fresh ``MemoryTransaction`` on ``self.backend`` is iterated and
        each item is passed through ``self.get_txn`` before being yielded
        (``get_txn`` is presumably provided by ``Watcher`` - it is not
        defined in this class).

        It does not implement the commit check that is part of
        Etcd3Watcher.txn(), hence it acts as MemoryBackend.txn()
        """
        transaction = MemoryTransaction(self.backend)
        yield from (self.get_txn(item) for item in transaction)