Source code for ska_ser_skuid.snowflake

"""Snowflake ID generator.

Heavily adapted from https://github.com/10xHub/snowflakekit
https://en.wikipedia.org/wiki/Snowflake_ID
"""

import hashlib
import random
import threading
from enum import IntEnum
from socket import gethostname
from time import sleep, time

# Custom SKUID epoch, expressed as a Unix timestamp in milliseconds:
# 2026-01-07T00:00:00Z, the date SKAO announced First Fringes from Mid.
# Note that this differs from the Y2000 TAI epoch defined in ADR-39.
SKUID_OFFSET_MILLISECONDS = 1767744000000
SEP = "-"

# Snowflake bit allocation (total 63 bits), most significant field first:
# [42 bits timestamp][10 bits generator][11 bits random]
TIMESTAMP_BITS = 42  # About 139 years worth of milliseconds before overflow.
TIMESTAMP_UBOUND = 2**TIMESTAMP_BITS  # Exclusive upper bound of the timestamp field.
GENERATOR_BITS = 10  # 1024 possible generator IDs.
GENERATOR_UBOUND = 2**GENERATOR_BITS  # Exclusive upper bound of the generator field.
RANDOM_BITS = 11  # 2048 IDs per millisecond
RANDOM_UBOUND = 2**RANDOM_BITS  # Exclusive upper bound of the random field.

# Scan ID bit allocation (total 48 bits), most significant field first:
# [42 bits timestamp][5 bits sequence][1 bit generator]
SCAN_TIMESTAMP_BITS = 42
SCAN_GENERATOR_BITS = 1  # One bit: see TelescopeBit.
SCAN_GENERATOR_UBOUND = 2**SCAN_GENERATOR_BITS
SCAN_SEQUENCE_BITS = 5  # 32 sequence values per millisecond.
SCAN_SEQUENCE_UBOUND = 2**SCAN_SEQUENCE_BITS


class TelescopeBit(IntEnum):
    """Single-bit telescope selector stored in the lowest bit of a Scan ID."""

    LOW = 0
    MID = 1
def make_generator_id(value: bytes) -> int:
    """Hash arbitrary bytes down to a stable 10-bit generator ID.

    The input is hashed with BLAKE2b using a 2-byte digest; the digest is read
    as a big-endian integer and only its lowest ``GENERATOR_BITS`` bits are
    kept. The same input therefore always yields the same ID. The module's
    default generator ID is produced by passing the system hostname through
    this function.

    :param value: Bytes to hash, e.g. ``socket.getfqdn().encode()``.
    :returns: An integer in the range ``[0, 1024)`` (10 bits).

    Example::

        from ska_ser_skuid import make_generator_id, mint_skuid, EntityType

        GENERATOR_ID = make_generator_id(b'my-service-instance')  # e.g. 620
        skuid = mint_skuid(EntityType.SBD, generator_id=GENERATOR_ID)
    """
    digest = hashlib.blake2b(value, digest_size=2).digest()
    digest_as_int = int.from_bytes(digest, byteorder="big")
    # 16 bits come out of the hash; discard the top ones to fit the field.
    low_bits_mask = (1 << GENERATOR_BITS) - 1
    return digest_as_int & low_bits_mask
# Default generator ID: computed once, at import time, by hashing the
# hostname reported by socket.gethostname().
DEFAULT_GENERATOR_ID = make_generator_id(gethostname().encode())
def unix_epoch() -> int:
    """Return the current time as whole milliseconds since the Unix epoch.

    :return: ``time.time()`` scaled to milliseconds and truncated to an int.
    """
    seconds_since_epoch = time()
    return int(seconds_since_epoch * 1000)
def extract_generator_id(snowflake: int) -> int:
    """Return the generator-ID field of a Snowflake integer.

    The generator ID occupies the ``GENERATOR_BITS`` bits directly above the
    random suffix.

    :param snowflake: A 63-bit Snowflake ID.
    :returns: The generator ID component (0–1023).
    """
    generator_mask = (1 << GENERATOR_BITS) - 1
    # Shift the random suffix away, then keep only the generator field.
    return (snowflake >> RANDOM_BITS) & generator_mask
def extract_random_suffix(snowflake: int) -> int:
    """Return the random-suffix field of a Snowflake integer.

    The suffix is stored in the lowest ``RANDOM_BITS`` bits of the ID.

    :param snowflake: A 63-bit Snowflake ID.
    :returns: The random suffix component (0–2047).
    """
    return snowflake & ((1 << RANDOM_BITS) - 1)
def extract_timestamp_ms(snowflake: int) -> int:
    """Return the creation time encoded in a Snowflake integer.

    The stored timestamp field counts milliseconds from the SKUID epoch; this
    function adds ``SKUID_OFFSET_MILLISECONDS`` back so the result is an
    ordinary Unix timestamp in milliseconds.

    :param snowflake: A 63-bit Snowflake ID.
    :returns: Unix timestamp in milliseconds.
    """
    # The timestamp sits above both the generator and the random fields.
    lower_field_bits = GENERATOR_BITS + RANDOM_BITS
    timestamp_mask = (1 << TIMESTAMP_BITS) - 1
    skuid_ts = (snowflake >> lower_field_bits) & timestamp_mask
    return SKUID_OFFSET_MILLISECONDS + skuid_ts
class _TimestampGeneratorBase:
    """Shared timestamp state and time-bound checks for ID generators.

    Subclasses override :meth:`_on_new_millisecond` to reset whatever
    per-millisecond state they keep. The methods of this class do not acquire
    ``self.lock`` themselves; the lock is created here for subclasses to hold
    while they call them.
    """

    def __init__(self):
        # Unix time (ms) seen by the most recent timestamp read; -1 = none yet.
        self.last_unix_ts = -1
        self.lock = threading.Lock()

    def _on_new_millisecond(self):
        """Reset per-millisecond state."""

    def _get_skuid_timestamp(self) -> int:
        """Read the clock and return milliseconds since the SKUID epoch.

        Calls :meth:`_on_new_millisecond` when the clock has advanced past the
        previously recorded millisecond, then records the new reading.

        :returns: Milliseconds elapsed since ``SKUID_OFFSET_MILLISECONDS``,
            in the range ``[0, TIMESTAMP_UBOUND)``.
        :raises RuntimeError: If the clock reads earlier than the previous
            reading.
        :raises OverflowError: If the clock is before the SKUID epoch, or so
            far after it that the value does not fit in ``TIMESTAMP_BITS``.
        """
        unix_timestamp = unix_epoch()
        if unix_timestamp < self.last_unix_ts:
            raise RuntimeError("Clock moved backwards! Refusing to generate IDs.")
        if unix_timestamp > self.last_unix_ts:
            self._on_new_millisecond()
        self.last_unix_ts = unix_timestamp

        skuid_ts = unix_timestamp - SKUID_OFFSET_MILLISECONDS
        if skuid_ts < 0:
            # A negative value would be shifted into a negative ID whose
            # timestamp field cannot be decoded back to the original time.
            raise OverflowError(
                f"System clock is before the SKUID epoch; timestamp {skuid_ts} "
                f"cannot be represented in {TIMESTAMP_BITS} bits"
            )
        if skuid_ts >= TIMESTAMP_UBOUND:
            raise OverflowError(
                f"Timestamp is too large to represent in {TIMESTAMP_BITS} bits: {skuid_ts}"
            )
        return skuid_ts

    def _wait_for_next(self):
        """Wait until the next millisecond.

        Polls the clock, sleeping 1 ms between reads, until it is strictly
        later than the last recorded reading; then records the new reading and
        resets per-millisecond state.
        """
        while (timestamp := unix_epoch()) <= self.last_unix_ts:
            sleep(0.001)  # Sleep 1ms
        self.last_unix_ts = timestamp
        self._on_new_millisecond()
class SnowflakeGenerator(_TimestampGeneratorBase):
    """Produce unique 63-bit IDs using the Snowflake algorithm.

    Layout, most significant field first:
    ``[42 bits timestamp][10 bits generator][11 bits random]``.
    """

    def __init__(self):
        super().__init__()
        # Random suffixes already handed out during the current millisecond.
        self.used_suffixes: set[int] = set()

    def _on_new_millisecond(self):
        """Forget the suffixes issued during the previous millisecond."""
        self.used_suffixes.clear()

    def generate(
        self,
        *,
        generator_id: int = DEFAULT_GENERATOR_ID,
    ) -> int:
        """Generate a collision-safe 63-bit Snowflake ID.

        Thread-safe: suffix selection and the clock read happen while holding
        ``self.lock``. If all 2048 random suffixes have been used within the
        current millisecond, the call waits until the clock advances.

        :param generator_id: A 10-bit integer (0–1023) identifying this
            generator. Defaults to a hash of the system hostname.
        :returns: A 63-bit Snowflake ID.
        :raises OverflowError: If *generator_id* is outside ``[0, 1024)``.
        :raises RuntimeError: If the system clock moves backwards.
        """
        if generator_id not in range(0, GENERATOR_UBOUND):
            raise OverflowError(
                f"generator_id {generator_id} must fit into "
                f"{GENERATOR_BITS} bit integer: [0, {GENERATOR_UBOUND})"
            )

        with self.lock:
            # The suffix is drawn before the clock is read on purpose: drawing
            # may wait for the next millisecond, so the timestamp read
            # afterwards is the one this suffix is recorded against.
            suffix = self._get_random_bits()
            skuid_ts = self._get_skuid_timestamp()
            self.used_suffixes.add(suffix)

        # Assemble the 64 bit Snowflake ID
        # Layout: [1 sign bit][42 bits timestamp][10 bits generator][11 bits random]
        return (
            (skuid_ts << (GENERATOR_BITS + RANDOM_BITS))
            | (generator_id << RANDOM_BITS)
            | suffix
        )

    def _get_random_bits(self) -> int:
        """Draw a random suffix that is not in ``self.used_suffixes``.

        Keeps drawing until an unused value comes up. Whenever a draw collides
        and every possible suffix is already taken, waits for the next
        millisecond (which clears the used set) before drawing again.
        """
        while (candidate := random.getrandbits(RANDOM_BITS)) in self.used_suffixes:
            if len(self.used_suffixes) >= RANDOM_UBOUND:
                # Exhausted all random values in this millisecond
                self._wait_for_next()
        return candidate
class ScanIDGenerator(_TimestampGeneratorBase):
    """Produce unique 48-bit Scan IDs as opaque integers.

    Layout: [42 bits timestamp][5 bits sequence][1 bit generator]
    """

    def __init__(self):
        super().__init__()
        # Next sequence number to hand out within the current millisecond.
        self.sequence = 0

    def _on_new_millisecond(self):
        """Restart the sequence counter for the new millisecond."""
        self.sequence = 0

    def generate(self, telescope_bit: TelescopeBit = TelescopeBit.LOW) -> int:
        """Generate a collision-resistant 48-bit Scan ID.

        Thread-safe: the clock read and the sequence update happen while
        holding ``self.lock``. If all 32 sequence values have been used within
        the current millisecond, the call waits until the clock advances.

        :param telescope_bit: Single bit to avoid collisions between telescopes.
        :returns: A 48-bit Scan ID integer.
        :raises ValueError: If telescope_bit is not 1 or 0.
        :raises RuntimeError: If the system clock moves backwards.
        """
        bit = TelescopeBit(telescope_bit)

        with self.lock:
            skuid_ts = self._get_skuid_timestamp()
            if self.sequence >= SCAN_SEQUENCE_UBOUND:
                # Sequence space used up: move to the next millisecond and
                # read the timestamp again so it matches the reset counter.
                self._wait_for_next()
                skuid_ts = self._get_skuid_timestamp()
            seq = self.sequence
            self.sequence = seq + 1

        sequence_shift = SCAN_GENERATOR_BITS
        timestamp_shift = SCAN_SEQUENCE_BITS + SCAN_GENERATOR_BITS
        return (skuid_ts << timestamp_shift) | (seq << sequence_shift) | bit
# Module-level generator instances, created at import time. They are the
# default ``generator`` arguments of get_snowflake() and get_scan_id().
DEFAULT_SNOWFLAKE_GENERATOR_INSTANCE = SnowflakeGenerator()
DEFAULT_SCAN_ID_GENERATOR_INSTANCE = ScanIDGenerator()
def get_snowflake(
    generator_id: int = DEFAULT_GENERATOR_ID,
    generator: SnowflakeGenerator = DEFAULT_SNOWFLAKE_GENERATOR_INSTANCE,
) -> int:
    """Generate a Snowflake ID, by default from the module-level generator.

    Thin convenience wrapper around :meth:`SnowflakeGenerator.generate`.

    :param generator_id: A 10-bit integer (0–1023) identifying this generator.
    :param generator: The :class:`SnowflakeGenerator` instance to use.
    :returns: A 63-bit Snowflake ID.
    """
    snowflake_id = generator.generate(generator_id=generator_id)
    return snowflake_id
def get_scan_id(
    telescope_bit: TelescopeBit,
    generator: ScanIDGenerator = DEFAULT_SCAN_ID_GENERATOR_INSTANCE,
) -> int:
    """Generate a Scan ID, by default from the module-level generator.

    Thin convenience wrapper around :meth:`ScanIDGenerator.generate`.

    :param telescope_bit: Single bit to avoid collisions between telescopes.
    :param generator: The :class:`ScanIDGenerator` instance to use.
    :returns: A 48-bit Scan ID integer.

    Example::

        from ska_ser_skuid import LOW_SCAN, MID_SCAN, get_scan_id

        scan_id = get_scan_id(LOW_SCAN)
    """
    scan_id = generator.generate(telescope_bit=telescope_bit)
    return scan_id