"""
Etcd3 backend for SKA SDP configuration DB, using client from
https://github.com/Revolution1/etcd3-py
"""
import logging
import os
import queue as queue_m
import socket
import threading
import time
from typing import Any, Callable, Iterable, Optional, cast
import etcd3_revolution1
import requests
from deprecated import deprecated
from .backend import (
Backend,
DbRevision,
DbTransaction,
Lease,
TxnWrapper,
Watcher,
)
from .common import (
ConfigCollision,
ConfigVanished,
_check_path,
_tag_depth,
_untag_depth,
)
# Module-wide logger
LOGGER = logging.getLogger(__name__)

# Set the level of the 'etcd3' logger (the imported client package).
# The level name is read from SDP_CONFIG_ETCD3_LOG_LEVEL, defaulting
# to "INFO" when the variable is not set.
logging.getLogger("etcd3").setLevel(
    os.environ.get("SDP_CONFIG_ETCD3_LOG_LEVEL", "INFO")
)
[docs]
class Etcd3BackendRevolution1(Backend):
"""
Highly consistent database backend store.
See https://github.com/etcd-io/etcd
All parameters will be passed on to :py:meth:`etcd3.Client`.
"""
def __init__(
    self, *args, max_retries: int = 15, retry_time: float = 0.1, **kw_args
):
    """Create the etcd3 client, retrying the connection if it fails.

    All positional and remaining keyword arguments are forwarded
    unchanged to ``etcd3_revolution1.Client``.

    :param max_retries: Number of times we retry any database interaction
    :param retry_time: Initial back-off time after a failed
        database interaction, in seconds. Will be increased by 50%
        for every failed attempt.
    """
    # The retry settings must be in place before the first
    # _retry_loop call below
    self._max_retries = max_retries
    self._retry_time = retry_time

    def connect():
        return etcd3_revolution1.Client(*args, **kw_args)

    self._client = self._retry_loop(connect)
def _retry_loop(self, code_to_try: Callable) -> Any:
    """Run ``code_to_try``, retrying on connection-style failures.

    Up to ``self._max_retries`` guarded attempts are made, sleeping
    between them with a delay that starts at ``self._retry_time`` and
    grows by 50% after every failure. Afterwards one last unguarded
    attempt is made, so its exception (if any) reaches the caller.

    Note that a retry *can* (rarely) mean that the effect of the code
    in question was executed multiple times.

    :param code_to_try: Zero-argument callable to execute
    :returns: Whatever ``code_to_try`` returns
    """
    delay = self._retry_time
    for attempt in range(self._max_retries):
        try:
            return code_to_try()
        except (
            requests.exceptions.ConnectionError,
            etcd3_revolution1.errors.go_etcd_rpctypes_error.ErrUnknownError,
        ) as ex:
            caught = ex
        except AttributeError as ex:
            # This gets raised when we fail to connect to etcd
            # when calling the etcd3.Client constructor, for some
            # reason. Any other attribute error is a genuine bug.
            if ex.name != "server_version_sem":
                raise
            caught = ex

        # Only reached after a retryable failure
        LOGGER.warning(
            "Caught %s, retry %d after %gs",
            repr(caught),
            attempt,
            delay,
        )
        time.sleep(delay)
        delay *= 1.5  # back off

    # Attempt one final time - without safety net
    return code_to_try()
[docs]
def lease(self, ttl: int = 10) -> Lease:
    """Generate a new lease.

    Once entered, the lease can be associated with keys, which will be
    kept alive until the end of the lease. Note that this involves
    starting a daemon thread that will refresh the lease periodically
    (default seems to be TTL/4).

    :param ttl: Time to live for lease
    :return: lease object
    """
    client_lease = self._client.Lease(ttl=ttl)
    return cast(Lease, client_lease)
[docs]
def txn(self, max_retries: int = 64) -> Iterable["Etcd3Transaction"]:
    """Create a new transaction.

    This uses an optimistic STM-style implementation, which cannot
    guarantee that a transaction runs through successfully. The
    function therefore returns an iterator that loops until the
    transaction succeeds:

    .. code-block:: python

        for txn in etcd3.txn():
            # ... transaction steps ...

    In most cases this will only execute one iteration. If you
    actually want to loop - for instance because you intend to wait
    for something to happen in the configuration - use
    :py:meth:`watcher()` instead.

    :param max_retries: Maximum number of transaction loops
    :returns: Transaction iterator
    """
    transaction = Etcd3Transaction(self, self._client, max_retries)
    yield from transaction
[docs]
def watcher(
    self,
    timeout: float = None,
    txn_wrapper: TxnWrapper = None,
) -> Iterable["Etcd3Watcher"]:
    """Create a new watcher.

    Useful for waiting for changes in the configuration. See
    :py:class:`Etcd3Watcher`.

    :param timeout: Timeout for waiting. Watcher will loop after this time.
    :param txn_wrapper: Function to wrap transactions returned by the
        wrapper.
    :returns: Watcher iterator
    """
    return Etcd3Watcher(
        backend=self,
        client=self._client,
        timeout=timeout,
        txn_wrapper=txn_wrapper,
    )
[docs]
def get(
    self, path: str, revision: Optional[DbRevision] = None
) -> tuple[str, DbRevision]:
    """
    Get value of a key.

    :param path: Path of key to query
    :param revision: Database revision for which to read key
    :returns: (value, revision). value is None if it doesn't exist
    """
    # Validate the path and translate it to the stored (tagged) form
    _check_path(path)
    key = _tag_depth(path)
    at_revision = revision.revision if revision is not None else None

    def query():
        return self._client.range(key, revision=at_revision)

    response = self._retry_loop(query)

    # The response carries no key/value list when the key is absent
    kvs = response.kvs
    value = None
    if kvs is not None:
        assert (
            len(kvs) == 1
        ), f"Requesting '{path}' yielded more than one match!"
        value = kvs[0].value.decode("utf-8")

    # Return value together with the revision of the response header
    return value, DbRevision(response.header.revision)
[docs]
def watch(
    self,
    path: str,
    prefix: bool = False,
    revision: Optional[DbRevision] = None,
    depth: Optional[int] = None,
):
    """Watch key or key range.

    Use a path ending with `'/'` in combination with `prefix` to
    watch all child keys.

    :param path: Path of key to query, or prefix of keys.
    :param prefix: Watch for keys with given prefix if set
    :param revision: Database revision from which to watch
    :param depth: tag depth
    :returns: `Etcd3Watch` object for watch request
    :raises: ValueError if a single key is requested with a trailing '/'
    """
    # A trailing slash only makes sense for prefix watches
    has_trailing_slash = bool(path) and path[-1] == "/"
    if has_trailing_slash and not prefix:
        raise ValueError("Path should not have a trailing '/'!")

    start_revision = revision.revision if revision is not None else None

    # The watch inherits the retry configuration of this backend
    return Etcd3Watch(
        backend=self,
        tagged_path=_tag_depth(path, depth),
        start_revision=start_revision,
        prefix=prefix,
        max_retries=self._max_retries,
        retry_time=self._retry_time,
    )
[docs]
def list_keys(
    self,
    path: str,
    recurse: int = 0,
    revision: Optional[DbRevision] = None,
) -> tuple[list[str], DbRevision]:
    """
    List keys under given path.

    :param path: Prefix of keys to query. Append '/' to list
        child paths.
    :param recurse: Maximum recursion level to query. If iterable,
        cover exactly the recursion levels specified.
    :param revision: Database revision for which to list
    :returns: (sorted key list, revision)
    """
    base_depth = path.count("/")
    at_revision = revision.revision if revision is not None else None

    # One range query per depth level, all bundled in one transaction
    txn = self._client.Txn()
    try:
        levels = iter(recurse)
    except TypeError:
        # A plain integer means "every level from 0 up to recurse"
        levels = range(recurse + 1)
    for level in levels:
        level_prefix = _tag_depth(path, level + base_depth)
        range_query = txn.range(
            level_prefix, prefix=True, keys_only=True, revision=at_revision
        )
        txn.success(range_query)
    response = self._retry_loop(txn.commit)

    # We do not return a mod revision here - this would not be
    # very useful anyway as we are not returning values
    db_revision = DbRevision(response.header.revision)
    if response.responses is None:
        return [], db_revision

    # Gather keys from every level that returned something, then sort
    found = []
    for level_response in response.responses:
        if level_response.response_range.kvs is None:
            continue
        for kv in level_response.response_range.kvs:
            found.append(_untag_depth(kv.key))
    found.sort()
    return found, db_revision
[docs]
def create(
    self, path: str, value: str, lease: etcd3_revolution1.Lease = None
) -> None:
    """Create a key and initialise it with the value.

    Fails if the key already exists. If a lease is given, the key will
    automatically get deleted once it expires.

    :param path: Path to create
    :param value: Value to set
    :param lease: Lease to associate
    :raises: ConfigCollision
    """
    _check_path(path)
    key = _tag_depth(path)
    lease_id = lease.ID if lease is not None else 0
    payload = str(value).encode("utf-8")

    # A key version of zero means the key does not exist yet - only
    # then is the put executed
    txn = self._client.Txn()
    txn.compare(txn.key(key).version == 0)
    txn.success(txn.put(key, payload, lease_id))

    outcome = self._retry_loop(txn.commit)
    if outcome.succeeded:
        return
    raise ConfigCollision(
        path, f"Cannot create {path}, as it already exists!"
    )
# pylint: disable=arguments-renamed
[docs]
def update(
    self, path: str, value: str, must_be_rev: Optional[DbRevision] = None
) -> None:
    """
    Update an existing key. Fails if the key does not exist.

    :param path: Path to update
    :param value: Value to set
    :param must_be_rev: Fail if found value does not match given
        revision (atomic update)
    :raises: ConfigVanished if the key does not exist, or - when
        ``must_be_rev`` is given - if it was modified after that revision
    :raises: ValueError if ``must_be_rev`` carries no revision
    """
    # Validate parameters
    _check_path(path)
    tagged_path = _tag_depth(path)
    value = str(value).encode("utf-8")

    # Put value if version is *not* zero (i.e. it exists)
    txn = self._client.Txn()
    txn.compare(txn.key(tagged_path).version != 0)
    if must_be_rev is not None:
        if must_be_rev.revision is None:
            raise ValueError("Did not pass a valid revision!")
        # Key must not have been modified past the given revision
        txn.compare(txn.key(tagged_path).mod < must_be_rev.revision + 1)
    txn.success(txn.put(tagged_path, value))

    response = self._retry_loop(txn.commit)
    if response.succeeded:
        return

    # The response does not say which comparison failed, so with a
    # revision constraint the message has to cover both causes
    if must_be_rev is None:
        reason = f"Cannot update {path}, as it does not exist!"
    else:
        reason = (
            f"Cannot update {path}, as it does not exist or was "
            f"modified after revision {must_be_rev.revision}!"
        )
    raise ConfigVanished(path, reason)
[docs]
def delete(
    self,
    path: str,
    must_exist: bool = True,
    recursive: bool = False,
    prefix: bool = False,
    max_depth: int = 16,
) -> None:
    # pylint: disable=too-many-arguments
    """
    Delete the given key or key range.

    :param path: Path (prefix) of keys to remove
    :param must_exist: Fail if path does not exist?
    :param recursive: Delete children keys at lower levels recursively
    :param prefix: Delete all keys at given level with prefix
    :param max_depth: Recursion limit
    :raises: ConfigVanished if the transaction did not succeed
    """
    root_key = _tag_depth(path)

    txn = self._client.Txn()
    if must_exist:
        # Non-zero version: the key is present
        txn.compare(txn.key(root_key).version != 0)
    txn.success(txn.delete(root_key, prefix=prefix))

    # If recursive, we also delete all paths at lower recursion
    # levels that have the path as a prefix
    if recursive:
        child_prefix = path if prefix else path + "/"
        base_depth = path.count("/")
        for level in range(base_depth + 1, base_depth + max_depth):
            txn.success(
                txn.delete(_tag_depth(child_prefix, level), prefix=True)
            )

    # Execute
    outcome = self._retry_loop(txn.commit)
    if outcome.succeeded:
        return
    raise ConfigVanished(
        path, f"Cannot delete {path}, as it does not exist!"
    )
[docs]
def close(self) -> None:
    """Close the client connection.

    Delegates to the ``close()`` method of the wrapped etcd3 client.
    """
    self._client.close()
[docs]
class Etcd3Watch: # pylint: disable=too-many-instance-attributes
"""Wrapper for etcd3 watch requests.
Entering the watcher using a `with` block yields a queue of `(key,
val, rev)` triples.
"""
def __init__(
    self,
    backend: Etcd3BackendRevolution1,
    tagged_path: str,
    start_revision: int,
    prefix: bool,
    max_retries: int = 20,
    retry_time: float = 0.1,
):  # pylint: disable=too-many-arguments
    """Initialise watcher.

    The underlying client watcher object is created here, but the
    watch thread is only started by :py:meth:`start`.

    :param backend: Backend instance
    :param tagged_path: Tagged path to watch
    :param start_revision: Yield events starting from this
        revision (replaying history if needed)
    :param prefix: Use prefix match instead of exact match
        for the path
    :param max_retries: Number of times we retry any database interaction
    :param retry_time: Initial back-off time after a failed
        database interaction, in seconds. Will be increased by 50%
        for every failed attempt.
    """
    # Thread state: nothing is running until start() is called
    self.queue = None
    self._thread = None
    self._stop_thread = True

    # Retry / back-off configuration
    self._max_retries = max_retries
    self._retry_time = retry_time

    # What to watch
    self._tagged_path = tagged_path
    self._start_revision = start_revision
    self._prefix = prefix

    # pylint: disable=protected-access
    self._watcher = backend._client.Watcher(
        tagged_path,
        start_revision=start_revision,
        prefix=prefix,
    )
[docs]
def start(self, queue: queue_m.Queue = None) -> None:
"""Activates the watcher, yielding a queue for updates."""
# Create queue if appropriate
if queue is None:
queue = queue_m.Queue()
self.queue = queue
# Start thread
self._stop_thread = False
self._thread = threading.Thread(target=self._run_thread)
self._thread.daemon = True
self._thread.start()
def _run_thread(self) -> None:
    """Thread watching for changes.

    Runs until ``self._stop_thread`` is set. Events are forwarded to
    ``self.queue`` by :py:meth:`_run_watcher_loop`. Errors that cannot
    be handled here are pushed to the queue as ``(None, exception,
    None)`` and then re-raised, which ends the thread.
    """
    # Yet another workaround: Replicate Watcher.run, but catch
    # various exceptions and either
    #
    # 1) Retry (with some delay) a dropped connection,
    # 2) catch if the server has compacted the revision we are
    #    trying to watch from, or
    # 3) push the exception to the queue to make the caller
    #    handle it
    # pylint: disable=protected-access,broad-except,too-many-lines
    # Configure back-off retries
    retry_time = self._retry_time
    retries = 0
    while not self._stop_thread:
        try:
            # Set up watcher
            LOGGER.info(
                "Starting watch at revision %s...", self._start_revision
            )
            self._watcher.start_revision = self._start_revision
            self._watcher.revision = None
            # Enter watcher - this is what actually requests the
            # watch from the etcd3 server
            with self._watcher:
                # Connection established: Reset retries
                retries = 0
                retry_time = self._retry_time
                # Main watch loop: Get events from watcher
                self._run_watcher_loop(self._watcher)
        except etcd3_revolution1.errors.Etcd3WatchCanceled as ex:
            # The etcd server tells us that the watcher was
            # cancelled. This typically happens because the
            # connection was reset (i.e. we need to re-establish
            # the watch) and our start_revision has been compacted
            # in the meantime. After all, in theory we could be
            # losing data here: If the connection reset lasted
            # long enough for some change to a watched key to
            # happen *and* be compacted (!), we would have no way
            # to learn about it any more.
            #
            # This (very low) risk of data loss is basically
            # inherent to compaction.
            # Stopping?
            if self._stop_thread:
                break
            # Re-raise any error that isn't about compaction
            LOGGER.warning("Watcher thread caught %s", repr(ex))
            if (
                "compact_revision" not in ex.resp
                or not ex.resp.compact_revision
            ):
                self.queue.put((None, ex, None))
                raise
            # If we compacted past the point of the last
            # update, move the start revision up to that point
            # to prevent the exact same exception from getting
            # thrown again
            if (
                self._start_revision is None
                or ex.resp.compact_revision > self._start_revision
            ):
                LOGGER.warning(
                    "Jumping to compaction revision %d",
                    ex.resp.compact_revision,
                )
                self._start_revision = ex.resp.compact_revision
            # Retry
            continue
        except etcd3_revolution1.errors.Etcd3StreamError as ex:
            # We get this typically when we are trying to close
            # the watcher (see below). Otherwise this is a genuine
            # error.
            # Stopping?
            if self._stop_thread:
                break
            self.queue.put((None, ex, None))
            raise
        except KeyError as ex:
            # This seems to happen quite regularly instead of a
            # connection error - etcd3-py tries to look up "None"
            # in some dictionary.
            LOGGER.warning(
                "Watcher thread caught %s - typically harmless, ignored",
                repr(ex),
            )
            continue
        except requests.exceptions.ConnectionError as ex:
            # The connection dropped for some reason. We generally
            # want to retry a couple times in this situation
            # before we give up on the database.
            # Out of retries? Push exception on queue
            if retries >= self._max_retries:
                self.queue.put((None, ex, None))
                raise
            # Otherwise retry after some time
            LOGGER.warning(
                "Watcher thread caught %s - retry %d (%g s)",
                repr(ex),
                retries,
                retry_time,
            )
            time.sleep(retry_time)
            retries += 1
            # Back off: wait 50% longer before the next attempt
            retry_time *= 1.5
            continue
        except Exception as ex:
            # Push exception on queue
            self.queue.put((None, ex, None))
            raise
def _run_watcher_loop(self, watcher) -> None:
    """Forward events from the watcher to the queue.

    Every event is pushed as a ``(key, value, revision)`` triple,
    where the value is ``None`` for anything but a PUT event.

    :param watcher: Iterable of watch events
    """
    for event in watcher:
        key = _untag_depth(event.key)
        is_put = event.type == etcd3_revolution1.EventType.PUT
        value = event.value.decode("utf-8") if is_put else None

        # Record that we have seen this revision, so
        # when we restart we will not ask for it again
        seen_revision = event.mod_revision
        self._start_revision = seen_revision + 1

        self.queue.put((key, value, DbRevision(seen_revision)))
# pylint: disable=too-many-branches
[docs]
def stop(self):
"""Deactivates the watcher."""
if self._thread is None:
return
# Temporary workaround for testing: Manually stop the
# watcher. This is exactly what the original call would do,
# just with way more safety and logging
# pylint: disable=protected-access,broad-except,too-many-lines
# self._watcher.stop()
LOGGER.debug("Stopping watcher %s", self._thread.name)
# Prevent re-tries by overwriting the watcher's client. The
# problem here is that the way Watcher.__iter__ operates, if
# self.watching is False after self.request_create() finishes,
# it will just silently re-set self.watching and re-try the
# connection. This is the only reliable way to break it out of
# that loop.
# pylint: disable=too-few-public-methods
# pylint: disable=missing-class-docstring
# pylint: disable=missing-function-docstring
class DummyClient:
def __init__(self, watcher):
self._watcher = watcher
def watch_create(self, *_args, **_kwargs):
self._watcher.watching = False
raise etcd3_revolution1.errors.Etcd3StreamError(
None, None, None
)
self._watcher.client = DummyClient(self._watcher)
# Try to repeat this a couple of times
for _ in range(20):
# Kill the response stream
resp = self._watcher._resp
self._watcher.watching = False
self._stop_thread = True
if resp is not None and not resp.raw.closed:
# First attempt to shut down socket
try:
sock = socket.fromfd(
resp.raw._fp.fileno(),
socket.AF_INET,
socket.SOCK_STREAM,
)
sock.shutdown(socket.SHUT_RDWR)
sock.close()
except Exception as exc:
print(exc)
LOGGER.debug("Exception in socket shutdown", exc_info=True)
# Finally join the thread - but with a timeout
if self._thread and self._thread.is_alive():
self._thread.join(0.1)
# Stop if there's no thread (any more)
if not self._thread or not self._thread.is_alive():
break
LOGGER.debug("re-trying closing watcher stream...")
time.sleep(0.1)
# Then attempt to close the raw request, request, and
# connection, if not reset
resp = self._watcher._resp
if resp is not None and not resp.raw.closed:
try:
resp.raw.close()
except Exception as exc:
print(exc)
LOGGER.debug("Exception in closing raw request", exc_info=True)
try:
resp.close()
except Exception as exc:
print(exc)
LOGGER.debug("Exception in closing request", exc_info=True)
try:
if hasattr(resp, "connection"):
resp.connection.close()
except Exception as exc:
print(exc)
LOGGER.debug("Exception in closing connection", exc_info=True)
# Final attempt to join the thread
if self._thread and self._thread.is_alive():
self._thread.join(0.1)
if self._thread and self._thread.is_alive():
LOGGER.warning("Watcher thread did not exit!")
else:
LOGGER.debug("Watcher thread stopped")
self.queue = None
def __enter__(self):
    """Use for scoping watcher to a block.

    Starts the watcher and returns the queue that updates are
    pushed to.
    """
    self.start()
    return self.queue
def __exit__(self, *args):
    """Use for scoping watcher to a block.

    Stops the watcher. The exception details passed in by the
    ``with`` statement are ignored.
    """
    self.stop()
[docs]
class Etcd3Transaction(DbTransaction):
"""A series of queries and updates to be executed atomically.
Use :py:meth:`Etcd3Backend.txn()` or :py:meth:`Etcd3Watcher.txn()`
to construct transactions.
"""
# pylint: disable=too-many-instance-attributes
# Ideas:
#
# Ranged deletes - in contrast to the main backend we cannot
# release a range of keys (especially recursively) in a
# transaction yet. The tricky bit is to make this consistent with
# the rest of the transaction machinery and properly
# atomic. Easiest solution might just be to simply use a bunch of
# list_key and single delete calls to get the same effect. Would
# be slightly inefficient, but would get the job done.
#
# Caching - especially when looping a transaction most of the
# queried data from the database might still be valid. So after a
# loop we could just migrate the known information to a _cache
# (plus any further information we got from watches). Then once
# the "new" database revision has been determined we might just
# send one cheap query to the database to figure out which bits of
# it are still current.
#
# Cheaper update/create checks - right now we query the old value
# of the key on every update/create call, even though we are only
# interested in whether or not the key exists. We could instead
# query this along the same lines as list_keys. Not entirely sure
# this is worthwhile though, given that it is quite typical to
# "get" a key before "updating" it anyway, and collisions on
# "create" should be quite rare.
def __init__(
    self,
    backend: Etcd3BackendRevolution1,
    client: etcd3_revolution1.Client,
    max_retries: int = 64,
):
    """Initialise transaction.

    :param backend: Backend used for reads and watches
    :param client: etcd3 client used to build the commit transaction
    :param max_retries: Maximum number of failed commits tolerated
        when iterating the transaction
    """
    super().__init__(backend)
    self._client = client

    # Retry accounting for the iteration loop
    self._max_retries = max_retries
    self._retries = 0

    # Read log and pending writes of the current attempt
    self._revision = None  # Revision baked in after first read
    self._get_queries = {}  # Query log
    self._list_queries = {}  # Query log
    self._updates = {}  # Delayed updates
    self._committed = False
    self._commit_callbacks = []

    # Looping / watching behaviour (see loop())
    self._loop = False
    self._watch = False
    self._watch_timeout = None
    self._got_timeout = False  # For test cases

    # Active watchers, all feeding one shared queue
    self._watchers = {}
    self._watch_queue = queue_m.Queue()
def _ensure_uncommitted(self) -> None:
if self._committed:
raise RuntimeError("Attempted to modify committed transaction!")
@property
def revision(self) -> int:
"""The last-committed database revision.
Only valid to call after the transaction has been comitted.
"""
if not self._committed:
raise RuntimeError(
"Revision is undefined on an uncommitted transaction!"
)
return self._revision.revision
[docs]
def get(self, path: str) -> str:
"""
Get value of a key.
:param path: Path of key to query
:returns: Key value. None if it doesn't exist.
"""
self._ensure_uncommitted()
# Check whether it was written as part of this transaction
if path in self._updates:
return self._updates[path][0]
# Check whether we already have the request response
if path in self._get_queries:
return self._get_queries[path][0]
# Perform get request
val, rev = self._get_queries[path] = self.backend.get(
path, revision=self._revision
)
# Set revision, if not already done so
if self._revision is None:
self._revision = rev
return val
[docs]
def list_keys(self, path: str, recurse: int = 0) -> list[str]:
    """
    List keys under given path.

    The result reflects uncommitted changes made in this transaction:
    keys created or updated are included, keys deleted are left out.

    :param path: Prefix of keys to query. Append '/' to list
        child paths.
    :param recurse: Children depths to include in search. If iterable,
        cover exactly the depths specified.
    :returns: sorted key list
    """
    self._ensure_uncommitted()
    path_depth = path.count("/")

    # Walk through depths, collecting known keys
    try:
        depth_iter = iter(recurse)
    except TypeError:
        depth_iter = range(recurse + 1)
    keys = []
    for depth in depth_iter:
        # We might have created or deleted an uncommitted key that
        # falls into the range. Entries in the update log are
        # (value, lease) pairs, where a value of None marks a delete.
        tagged_path = _tag_depth(path, path_depth + depth)
        added_keys = set()
        removed_keys = set()
        for key, (value, _lease) in self._updates.items():
            if not _tag_depth(key).startswith(tagged_path):
                continue
            if value is None:
                removed_keys.add(key)
            else:
                added_keys.add(key)

        # Check whether we need to perform the request
        query = (path, depth + path_depth)
        if query not in self._list_queries:
            self._list_queries[query] = self.backend.list_keys(
                path, recurse=(depth,), revision=self._revision
            )

        # Add to key set: database keys minus local deletes plus
        # local creates
        result, rev = self._list_queries[query]
        keys.extend((set(result) - removed_keys) | added_keys)

        # Bake in revision if not already done so
        if self._revision is None:
            self._revision = rev

    # Sort
    return sorted(keys)
[docs]
def create(
self, path: str, value: str, lease: etcd3_revolution1.Lease = None
) -> None:
"""Create a key and initialise it with the value.
Fails if the key already exists. If a lease is given, the key will
automatically get deleted once it expires.
:param path: Path to create
:param value: Value to set
:param lease: Lease to associate
:raises: ConfigCollision
"""
self._ensure_uncommitted()
# Attempt to get the value - mainly to check whether it exists
# and put it into the query log
result = self.get(path)
if result is not None:
raise ConfigCollision(
path, f"Cannot create {path}, as it already exists!"
)
# Add update request
self._updates[path] = (value, lease)
[docs]
def update(self, path: str, value: str) -> None:
"""
Update an existing key. Fails if the key does not exist.
:param path: Path to update
:param value: Value to set
:raises: ConfigVanished
"""
self._ensure_uncommitted()
# As with "create"
result = self.get(path)
if result is None:
raise ConfigVanished(
path, f"Cannot update {path}, as it does not exist!"
)
# Add update request
self._updates[path] = (value, None)
[docs]
def delete(
self, path: str, must_exist: bool = True, recursive: bool = False
) -> None:
"""
Delete the given key.
:param path: Path of key to remove
:param must_exist: Fail if path does not exist?
:param recursive: Delete children keys at lower levels recursively
(not used)
"""
if must_exist:
# As with "update"
result = self.get(path)
if result is None:
raise ConfigVanished(
path, f"Cannot delete {path}, it does not exist!"
)
# Add delete request
self._updates[path] = (None, None)
[docs]
def commit(self) -> bool:
"""
Commit the transaction to the database.
This can fail, in which case the transaction must get `reset`
and built again.
:returns: Whether the commit succeeded
"""
self._ensure_uncommitted()
# If we have made no updates, we don't need to verify the log
if not self._updates:
self._committed = True
return True
# Create transaction
txn = self._client.Txn()
# Verify get() calls from the query log
for path, (_, rev) in self._get_queries.items():
tagged_path = _tag_depth(path)
if rev.revision is None:
# Did not exist? Verify continued non-existance. Note
# that it is possible for the key to have been
# created, then deleted again in the meantime.
txn.compare(txn.key(tagged_path).version == 0)
else:
# Otherwise check add an assertion that the revision
# has not been changed. This actually guarantees that
# the key has not been touched since we read it.
txn.compare(txn.key(tagged_path).mod < rev.revision + 1)
# Verify list_keys() calls from the query log
for (path, depth), (result, rev) in self._list_queries.items():
tagged_path = _tag_depth(path, depth)
# Make sure that all returned keys still exist
for res_path in result:
tagged_res_path = _tag_depth(res_path)
txn.compare(txn.key(tagged_res_path).version > 0)
# Also check that no new keys have entered the range
# (by checking whether the request would contain any
# keys with a newer create revision than our request)
txn.compare(
txn.key(tagged_path, prefix=True).create
< self._revision.revision + 1
)
# Commit changes. Note that the dictionary guarantees that we
# only update any key at most once.
for path, (value, lease) in self._updates.items():
tagged_path = _tag_depth(path)
lease_id = None if lease is None else lease.ID
if value is None:
txn.success(txn.delete(tagged_path, value, lease_id))
else:
txn.success(txn.put(tagged_path, value, lease_id))
# Done
self._committed = True
# pylint: disable=protected-access
response = self.backend._retry_loop(txn.commit)
if response.succeeded:
for callback in self._commit_callbacks:
callback()
self._commit_callbacks = []
return response.succeeded
[docs]
def on_commit(self, callback: Callable[[], None]) -> None:
    """Register a callback to call when the transaction succeeds.

    A bit of a hack, but occasionally useful to add additional
    side-effects to a transaction that are guaranteed to not get
    duplicated.

    :param callback: Callback to call
    """
    self._commit_callbacks.append(callback)
[docs]
def reset(self, revision: Optional[DbRevision] = None) -> None:
"""Reset the transaction, so it can be restarted after commit()."""
if not self._committed:
raise RuntimeError("Called reset on an uncommitted transaction!")
# Reset
self._revision = revision
self._get_queries = {}
self._list_queries = {}
self._updates = {}
self._committed = False
self._loop = False
self._watch = False
self._watch_timeout = None
[docs]
@deprecated
def loop(self, watch: bool = False, watch_timeout: Optional[float] = None):
    """Repeat transaction execution, even if it succeeds.

    *Deprecated*: Use :py:class:`Etcd3Watcher` instead, or loop manually.

    :param watch: Once the transaction succeeds, block until one of
        the values read changes, then loop the transaction
    :param watch_timeout: timeout value. Only stored if ``watch``
        is set.
    """
    if self._loop:
        # If called multiple times, looping immediately takes precedence
        self._watch = self._watch and watch
    else:
        self._loop = True
        self._watch = watch
    if watch:
        self._watch_timeout = watch_timeout
def __iter__(self) -> "Etcd3Transaction":
    """Iterate transaction as requested by loop(), or until it succeeds.

    Yields the transaction itself, then tries to commit once the
    caller asks for the next iteration. A failed commit counts as a
    retry and the transaction is reset and yielded again. A
    successful commit ends the iteration, unless loop() was called.

    :raises: RuntimeError once more than ``max_retries`` commits
        have failed in a row
    """
    try:
        while self._retries <= self._max_retries:
            # Should build up a transaction
            yield self
            # Try to commit, count how many times we have tried
            if not self.commit():
                self._retries += 1
            else:
                self._retries = 0
                # No further loop?
                if not self._loop:
                    return
                # Use watches? Then wait for something to happen
                # before looping.
                if self._watch:
                    self._do_watch()
            # Repeat after reset otherwise
            self.reset()
    finally:
        # Warn if we are dropping uncommitted changes to the
        # database - this is an easy mistake to make:
        #
        #   for txn in cfg.txn():
        #      txn.put(...)
        #      # ...
        #      # We are done!
        #      return
        if self._updates and not self._committed:
            LOGGER.warning(
                "Transaction loop aborted - dropping updates to %s!",
                list(self._updates.keys()),
            )
        # Watchers are stopped however the iteration ends
        self._clear_watch()
    # Ran out of repeats? Fail
    raise RuntimeError(
        f"Transaction did not succeed after {self._max_retries} retries!"
    )
[docs]
@deprecated
def clear_watch(self) -> None:
    """Stop all currently active watchers.

    *Deprecated*: Use :py:class:`Etcd3Watcher` instead.
    """
    # Public, deprecated alias of the internal helper
    self._clear_watch()
def _clear_watch(self) -> None:
"""Stop all currently active watchers."""
# Remove watchers
for watcher in self._watchers.values():
watcher.stop()
self._watchers = {}
def _update_watchers(self) -> None:
    """Make the set of active watchers match the query log.

    Starts a prefix watcher for every list query and a single-key
    watcher for every get query not already covered by one of those
    prefixes. Watchers that no longer correspond to a query are
    stopped and removed.
    """
    # Watch any ranges we listed. Note that this will trigger also
    # on key updates, which get filtered when the events are read.
    list_prefixes = []
    wanted = set()
    for list_path, depth in self._list_queries:
        query = ("list", list_path, depth)
        wanted.add(query)

        # Remember tagged prefixes so we can check for key overlap later
        list_prefixes.append(_tag_depth(list_path, depth))

        # Start a watcher, if required
        if self._watchers.get(query) is None:
            self._watchers[query] = self.backend.watch(
                list_path, revision=self._revision, prefix=True, depth=depth
            )
            self._watchers[query].start(self._watch_queue)

    # Watch any individual key we read
    for get_path in self._get_queries:
        # Skip keys we are already watching as part of a range.
        # This is a fairly important optimisation, as it means that
        # listing keys followed by iterating over the values won't
        # create extra watches here!
        tagged = _tag_depth(get_path)
        if any(tagged.startswith(pre) for pre in list_prefixes):
            continue

        query = ("get", get_path)
        wanted.add(query)

        # Start individual watcher, if required
        if self._watchers.get(query) is None:
            self._watchers[query] = self.backend.watch(
                get_path, revision=self._revision
            )
            self._watchers[query].start(self._watch_queue)

    # Remove any watchers that we are not currently using. Note
    # that we only do this on the next watch() call, so watchers
    # will be kept alive through transaction failures *and*
    # non-waiting loops. So as long as the set of keys waited on
    # is relatively constant (and ideally forms ranges), we will
    # not generate much churn here.
    unused = [query for query in self._watchers if query not in wanted]
    for query in unused:
        self._watchers[query].stop()
        del self._watchers[query]
[docs]
def watch(self) -> DbRevision:
    """Wait for a change on one of the values read.

    *Deprecated*: Use :py:class:`Etcd3Watcher` instead.

    :returns: The revision at which a change was detected.
    """
    return self._do_watch()
def _do_watch(self) -> DbRevision:
    """Wait for a change on one of the values read.

    Blocks on the watch queue until a relevant update arrives, the
    watch timeout (if any) expires, or the wait is interrupted by
    :py:meth:`trigger_loop`. After the first relevant update the
    queue is drained without blocking.

    :returns: The revision at which a change was detected. This is
        the revision of the transaction if nothing changed.
    :raises: Any exception a watcher pushed onto the queue
    """
    # Make sure the watchers we have in place match what we read
    self._update_watchers()
    # Wait for updates from the watcher queue
    block = True
    revision = self._revision
    start_time = time.time()
    while True:
        # Determine timeout: whatever is left of the total budget
        timeout = None
        if self._watch_timeout is not None:
            timeout = max(
                0, start_time + self._watch_timeout - time.time()
            )
        # Wait for something to get pushed on the queue
        try:
            path, value, rev = self._watch_queue.get(block, timeout)
        except queue_m.Empty:
            # Only a timeout if we were still blocking - otherwise
            # we have just finished draining the queue
            self._got_timeout = block
            return revision
        # Exception? Re-raise
        if path is None and isinstance(value, Exception):
            raise value
        # Manual trigger?
        if path is None and value is None and rev is None:
            self._got_timeout = False
            break
        # Check that revision is newer (prevent duplicated updates)
        if rev.revision <= revision.revision:
            continue
        # Are we waiting on a value change of this one?
        if path not in self._get_queries:
            # Are we getting this because of one of the list queries?
            tagged_path = _tag_depth(path)
            found_match = False
            for (lpath, depth), (result, _) in self._list_queries.items():
                if tagged_path.startswith(_tag_depth(lpath, depth)):
                    # We should not notify for a value change,
                    # only if a key was added / removed. Good
                    # thing we can check that using the log.
                    if value is None or path not in result:
                        found_match = True
                        break
            # Otherwise this is either a misfire from an old
            # watcher, or a value update from a list watcher (see
            # above). Ignore.
            if not found_match:
                continue
        # Alright, we can stop waiting. However, we will attempt
        # to clear the queue before we do so, as we might get a
        # lot of updates in batch
        revision = rev
        block = False
    return revision
[docs]
def trigger_loop(self) -> None:
    """Manually trigger a loop.

    Effectively makes loop(True) behave like loop(False), looping
    immediately. This is useful for interrupting a blocking
    watch() from a different thread.
    """
    # An all-None triple is the magic "cancel" entry that
    # _do_watch() treats as a manual trigger.
    cancel_entry = (None, None, None)
    self._watch_queue.put(cancel_entry)
[docs]
class Etcd3Watcher(Watcher):
    """Watch for database changes by using nested transactions.

    Use as follows:

    .. code-block:: python

        for watcher in config.watcher():
            for txn in watcher.txn():
                # ... do something
            for txn in watcher.txn():
                # ... do something else

    At the end of a for loop iteration, the watcher will start
    watching all values read by transactions started through
    :py:meth:`txn`, and only repeat the execution of the loop body
    once one of these values has changed.
    """

    def __init__(
        self,
        backend: Etcd3BackendRevolution1,
        client: etcd3_revolution1.Client,
        timeout: Optional[float] = None,
        txn_wrapper: Optional[TxnWrapper] = None,
    ):
        """Initialise watcher.

        :param backend: Backend handed to the base class and to every
            transaction this watcher creates.
        :param client: etcd client used both for the internal "wait"
            transaction and for the nested transactions.
        :param timeout: Maximum time to wait per loop in seconds.
            If ``None``, will wait indefinitely.
        :param txn_wrapper: Passed through to the base class
            unchanged.
        """
        super().__init__(backend, timeout, txn_wrapper)
        # Transaction that accumulates the queries of all nested
        # transactions; it is the one that actually does the watching.
        self._wait_txn = Etcd3Transaction(backend, client)
        self._client = client

    def txn(
        self, max_retries: int = 64
    ) -> Iterable[Etcd3Transaction | TxnWrapper]:
        """Create nested transaction.

        The watcher loop will iterate when any value read by
        transactions created by this method has changed in the
        database.

        Note that these transactions otherwise behave exactly as
        normal transactions: As long as they are internally
        consistent, they will be committed. This means there are no
        consistency guarantees between transactions created from the
        same watcher, i.e. one transaction might read one value from
        the database while a later one reads another.

        :param max_retries: Maximum number of times the transaction will be
            tried before giving up.
        """
        # Make a new transaction.
        # pylint: disable=fixme
        # TODO: Would be more efficient if the different transactions
        # could share some sort of cache so they don't need to
        # re-query keys...
        txn = None
        for txn in Etcd3Transaction(self.backend, self._client, max_retries):
            yield self.get_txn(txn)

        # Extract read values from transaction. If the iterator never
        # produced a transaction there is nothing to take over (and the
        # loop variable would otherwise be unbound here).
        # pylint: disable=protected-access
        if txn is None or not txn._committed:
            return

        # Take over earliest revision used in a transaction, as we
        # want to know about any changes from that particular
        # point forward.
        wait_txn = self._wait_txn
        if (
            wait_txn._revision is None
            or wait_txn._revision.revision > txn._revision.revision
        ):
            wait_txn._revision = txn._revision
        wait_txn._get_queries.update(txn._get_queries)
        wait_txn._list_queries.update(txn._list_queries)

    def __iter__(self) -> "Etcd3Watcher":
        """
        Iterate forever, waiting after every iteration for something to
        change.

        Implemented as a generator that yields the watcher itself each
        time round. When the generator is closed, the watch held by the
        internal wait transaction is cleared.
        """
        try:
            while True:
                yield self

                # reset the alarm, since we don't want to use it
                # in the next iteration
                timeout = self.get_timeout()
                self._wake_up_at = None

                # pylint: disable=fixme, protected-access
                # TODO: Move those to this class!
                self._wait_txn.loop(True, timeout)
                self._wait_txn._do_watch()

                # Clear current queries, the next iteration of the
                # loop body collects them afresh via txn()
                self._wait_txn._get_queries = {}
                self._wait_txn._list_queries = {}
        finally:
            # pylint: disable=protected-access
            self._wait_txn._clear_watch()

    def trigger(self) -> None:
        """Manually triggers a loop.

        Can be called from a different thread to force a loop, even if
        the watcher is currently waiting.
        """
        # pylint: disable=protected-access
        self._wait_txn.trigger_loop()