# Source code for ska_sdp_dal.common

import binascii
import logging
import os
import sys
from typing import Any, Dict, Iterator, List, NewType, Optional

import numpy
import pyarrow
import pyarrow.plasma as plasma

#: Module-level logger
logger = logging.getLogger(__name__)

#: Length of Plasma's object IDs (in bytes)
OBJECT_ID_SIZE = 20
#: Arrow type used for representing Plasma object IDs (fixed-size binary)
OBJECT_ID_TYPE = pyarrow.binary(OBJECT_ID_SIZE)

#: Bytes of the Object ID we are going to use as namespace prefix
NAMESPACE_ID_SIZE = 4
#: Metadata entry for namespace name
PROC_NAMESPACE_META = b"proc:namespace"
#: Metadata entry for namespace process ID
PROC_NAMESPACE_PID_META = b"proc:pid"
#: Metadata entry for namespace process arguments
PROC_NAMESPACE_ARGV_META = b"proc:argv"

#: First object in namespace is expected to be its declaration: its ID is
#: the namespace prefix followed by this all-zero suffix
NAMESPACE_DECL_SUFFIX = b"\0" * (OBJECT_ID_SIZE - NAMESPACE_ID_SIZE)

#: Schema used for declaring calls valid for namespace (one row per call:
#: its name and its serialised call schema)
PROCS_SCHEMA = pyarrow.schema(
    [
        ("name", pyarrow.string()),
        ("schema", pyarrow.binary()),  # serialised pyarrow.Schema
    ]
)

# RPC-specific meta data added to schemas
#: Schema metadata key holding the call (function) name
PROC_FUNC_META = b"proc:func"
#: Field metadata key holding the parameter kind (in/out)
PROC_PAR_META = b"proc:par"
#: Parameter kind: input (call is delayed until the object exists)
PROC_PAR_META_IN = b"in"
#: Parameter kind: output (call is skipped if all outputs already exist)
PROC_PAR_META_OUT = b"out"
#: Field metadata key holding the tensor element dtype name
PROC_TENSOR_TYPE_META = b"proc:tensor_type"
#: Field metadata key holding comma-separated tensor dimension names
PROC_TENSOR_DIMS_META = b"proc:tensor_dims"
#: Field metadata key holding the serialised expected table schema
PROC_TABLE_SCHEMA_META = b"proc:table_schema"


def parse_hex_objectid(oid_str: str) -> bytes:
    """Parse an Object ID given as a hexadecimal string representation

    Note that this allows Object IDs to have fewer than 20 bytes, i.e.
    partial Object IDs (prefixes) are parsed without error.

    :param oid_str: String representation, two hex digits per byte
    :returns: Object ID as binary string
    :raises ValueError: If the string has odd length, encodes more than
       ``OBJECT_ID_SIZE`` bytes, or contains non-hexadecimal characters
    """
    # Validate
    if len(oid_str) % 2 != 0:
        raise ValueError("Object ID string must have even length!")
    if len(oid_str) // 2 > OBJECT_ID_SIZE:
        raise ValueError(
            "Object ID can have {} bytes at maximum!".format(OBJECT_ID_SIZE)
        )

    # Parse strictly: unlike int(..., 16) applied per digit pair, unhexlify
    # rejects signs, whitespace and other non-hex characters instead of
    # silently accepting them. binascii.Error subclasses ValueError.
    try:
        return binascii.unhexlify(oid_str)
    except ValueError as exc:
        raise ValueError(
            "Object ID string must only contain hexadecimal digits!"
        ) from exc
def object_id_hex(oid: plasma.ObjectID) -> str:
    """Render an Object ID as a lowercase hexadecimal string.

    :param oid: The Object ID to convert, either a ``plasma.ObjectID`` or
       its raw bytes (bytes / bytearray)
    :returns: Two hex digits per byte of the ID
    """
    raw = oid.binary() if isinstance(oid, plasma.ObjectID) else oid
    return binascii.hexlify(raw).decode("ascii")
def objectid_generator(
    prefix: bytes, size: int = OBJECT_ID_SIZE
) -> Iterator[bytes]:
    """Generate ObjectIDs with a given prefix.

    The bytes following the prefix enumerate all values in big-endian order,
    starting from all zeros. If the prefix already has ``size`` bytes, only
    the prefix itself is yielded.

    :param prefix: Prefix as binary string
    :param size: Total length of generated IDs in bytes
    :returns: Iterator over IDs of exactly ``size`` bytes
    :raises ValueError: If the prefix is longer than ``size`` (raised on
       first iteration, as this is a generator)
    """
    byte_count = size - len(prefix)
    if byte_count < 0:
        raise ValueError(
            "Prefix has {} bytes, but Object IDs only have {}!".format(
                len(prefix), size
            )
        )
    prefix = bytes(prefix)

    # The suffix is the big-endian encoding of a counter, so IDs are
    # produced in ascending byte order
    for i in range(2 ** (8 * byte_count)):
        yield prefix + i.to_bytes(byte_count, "big")
def _make_namespace_decl(
    name: Optional[str] = None, procs: Optional[List[pyarrow.Schema]] = None
) -> pyarrow.RecordBatch:
    """Generate namespace declaration.

    This gives the supported call schemas (if any) plus some information
    about the owning process. Meant to allow users and other processes to
    identify the purpose of this namespace.

    :param name: Informative display name for namespace (stored as
       ``str(name)``, so ``None`` is recorded as ``"None"``)
    :param procs: Call schemas supported (if any); each must carry the
       ``PROC_FUNC_META`` schema metadata entry
    :returns: Namespace declaration as record batch following
       ``PROCS_SCHEMA``, with one row per call schema
    """
    # None instead of a mutable [] default shared between calls
    if procs is None:
        procs = []

    # Add process identification to metadata
    _metadata = {
        PROC_NAMESPACE_META: str(name).encode(),
        PROC_NAMESPACE_ARGV_META: str(sys.argv).encode(),
        PROC_NAMESPACE_PID_META: str(os.getpid()).encode(),
    }

    # Build batch record: call names from each schema's metadata, plus the
    # serialised schema itself
    return pyarrow.record_batch(
        [
            pyarrow.array(
                [schema.metadata[PROC_FUNC_META] for schema in procs]
            ),
            pyarrow.array(
                [schema.serialize().to_pybytes() for schema in procs]
            ),
        ],
        PROCS_SCHEMA.with_metadata(_metadata),
    )
def is_namespace_decl(oid: plasma.ObjectID):
    """Checks whether the given object ID declares a namespace.

    Declaration IDs are those ending in ``NAMESPACE_DECL_SUFFIX``, i.e.
    all bytes after the namespace prefix are zero.

    :param oid: Object ID, as ``plasma.ObjectID`` or raw bytes
    :returns: ``True`` if ``oid`` is a namespace declaration ID
    """
    raw = oid.binary() if isinstance(oid, plasma.ObjectID) else oid
    return raw.endswith(NAMESPACE_DECL_SUFFIX)
def _numpy_cast_to_complex(arr: numpy.ndarray) -> numpy.ndarray: """Cast array with inner-most dimension of 2 into complex array. :param arr: Array to convert """ if arr.shape[-1] != 2 and not arr.size == 0: raise ValueError( "To interpret as complex type, array must have " + "inner dimension of 2!" ) # Determine output type if arr.dtype == numpy.single: dtype = numpy.csingle elif arr.dtype == numpy.double: dtype = numpy.cdouble elif arr.dtype == numpy.longdouble: dtype = numpy.clongdouble else: raise TypeError( "Unsupported dtype for cast to complex: {}".format(arr.dtype) ) # Perform cast out = numpy.frombuffer(memoryview(arr), dtype=dtype, count=arr.size // 2) return out.reshape(arr.shape[:-1]) def _numpy_cast_from_complex(arr: numpy.ndarray) -> numpy.ndarray: """Cast complex array to real array with inner dimension of 2 :param arr: Array to convert """ # Determine output type if arr.dtype == numpy.csingle: dtype = numpy.single elif arr.dtype == numpy.cdouble: dtype = numpy.double elif arr.dtype == numpy.clongdouble: dtype = numpy.longdouble else: raise TypeError( "Unsupported dtype for cast from complex: {}".format(arr.dtype) ) # Perform cast, which requires a C-contiguous array if not arr.flags["C_CONTIGUOUS"]: arr = numpy.ascontiguousarray(arr) out = numpy.frombuffer(memoryview(arr), dtype=dtype, count=arr.size * 2) return out.reshape(arr.shape + (0 if arr.size == 0 else 2,))
class ComplexType(object):
    """
    Pseudo-type to refer to complex values. Use in place of arrow types.

    Complex tensors are stored as real tensors of ``real_type`` with an
    additional inner-most dimension of 2 (real and imaginary part).
    """

    def __init__(self, real_type, complex_dtype, real_dtype):
        """
        :param real_type: Arrow type of the real/imaginary components
        :param complex_dtype: Numpy complex dtype of the values
        :param real_dtype: Numpy dtype of the components
        """
        self.real_type = real_type
        self.complex_dtype = complex_dtype
        self.real_dtype = real_dtype

    def to_pandas_dtype(self):
        """Return the numpy complex dtype (mirrors the pyarrow type API)."""
        return self.complex_dtype

    def __repr__(self):
        # The default object repr is useless in logs and error messages
        return (
            f"{type(self).__name__}(real_type={self.real_type!r}, "
            f"complex_dtype={self.complex_dtype!r}, "
            f"real_dtype={self.real_dtype!r})"
        )
#: Single-precision complex type (pairs of float32)
complex64 = ComplexType(
    real_type=pyarrow.float32(),
    complex_dtype=numpy.csingle,
    real_dtype=numpy.single,
)
#: Double-precision complex type (pairs of float64)
complex128 = ComplexType(
    real_type=pyarrow.float64(),
    complex_dtype=numpy.cdouble,
    real_dtype=numpy.double,
)
def from_numpy_dtype(dtype: Any) -> pyarrow.DataType:
    """Map a numpy dtype (or anything ``numpy.dtype`` accepts) to a type.

    Single- and double-precision complex dtypes map to the ``complex64`` /
    ``complex128`` pseudo-types; everything else is delegated to
    ``pyarrow.from_numpy_dtype``.

    :param dtype: Numpy dtype, dtype name or scalar type
    :returns: Arrow data type, or a ComplexType for complex dtypes
    """
    normalised = numpy.dtype(dtype)
    for complex_dtype, pseudo_type in (
        (numpy.csingle, complex64),
        (numpy.cdouble, complex128),
    ):
        if normalised == complex_dtype:
            return pseudo_type
    return pyarrow.from_numpy_dtype(dtype)
def _put_numpy(
    client: plasma.PlasmaClient,
    oid: plasma.ObjectID,
    array: numpy.ndarray,
    typ: pyarrow.DataType = None,
    dim_names: List[str] = None,
    temporary: bool = True,
) -> pyarrow.Buffer:
    """Write a numpy array to storage as a tensor

    Note that this is less efficient than constructing it in-place, which
    we should support at some point (TODO)

    If ``typ`` is a ComplexType, the array is stored as a real tensor with
    an extra inner-most dimension of 2.

    :param client: Plasma client to use
    :param oid: Object ID to use
    :param array: Tensor as numpy array
    :param typ: Tensor assumed value type (Arrow type or ComplexType); the
       array is converted with ``astype`` where possible
    :param dim_names: Tensor dimension names (only their count is checked)
    :param temporary: Can be deleted once returned buffer is discarded
    :returns: Buffer in Plasma store used to write object
    :raises TypeError: If dimensionality or value type do not match
    """
    # Check dimensionality
    if dim_names is not None:
        if len(array.shape) != len(dim_names):
            raise TypeError(
                f"Tensor has {len(array.shape)} dimensions, but "
                + f"expected {len(dim_names)} ({','.join(dim_names)})!"
            )

    # Attempt to convert to tensor of given type, if applicable
    if isinstance(typ, ComplexType):
        # Complex values become (real, imag) pairs of the real component
        # type. NOTE(review): dimension names are not stored with the
        # tensor, so no "cpx" dimension name is tracked here.
        array = _numpy_cast_from_complex(array)
        typ = typ.real_type
    if typ is not None:
        array = array.astype(typ.to_pandas_dtype(), copy=False)
    tensor = pyarrow.Tensor.from_numpy(array)
    if typ is not None:
        if not tensor.type.equals(typ):
            raise TypeError(
                "Tensor has type {}, but expected {}!".format(tensor.type, typ)
            )

    # Determine size, reserve in Plasma and mmap in
    tensor_size = pyarrow.ipc.get_tensor_size(tensor)
    out_buf = client.create(oid, tensor_size)

    # Write to fixed-size buffer in shared memory
    writer = pyarrow.FixedSizeBufferWriter(out_buf)
    pyarrow.ipc.write_tensor(tensor, writer)

    # Seal + request deletion
    client.seal(oid)
    if temporary:
        client.delete([oid])
    return out_buf


def _tensor_from_buf(
    buf: pyarrow.Buffer,
    name: str,
    typ: pyarrow.DataType = None,
    dim_names: List[str] = None,
) -> numpy.ndarray:
    """Deserialise a tensor from a buffer into a numpy array.

    :param buf: Buffer containing the IPC-serialised tensor
    :param name: Field name, used in error messages
    :param typ: Expected value type (Arrow type or ComplexType), if any
    :param dim_names: Expected dimension names, if any (only their count
       is checked)
    :returns: Tensor as numpy array (complex if ``typ`` is a ComplexType)
    :raises TypeError: If value type or dimensionality do not match
    """
    # Deserialise the tensor
    tensor = pyarrow.ipc.read_tensor(pyarrow.BufferReader(buf))

    # Check type; complex tensors are stored with their real component type
    is_complex = isinstance(typ, ComplexType)
    if is_complex:
        typ = typ.real_type
    if typ is not None and not tensor.type.equals(typ):
        raise TypeError(
            ("Field {} refers to tensor " + "of type {}, expected {}!").format(
                name, tensor.type, typ
            )
        )

    # Check dimensionality (complex tensors carry an extra inner dimension)
    if dim_names is not None:
        ndim = tensor.ndim
        if is_complex:
            if ndim < 1:
                raise TypeError(
                    (
                        "Field {} has complex type, but " + "zero dimensions!"
                    ).format(name)
                )
            ndim -= 1
        if ndim != len(dim_names):
            raise TypeError(
                (
                    "Field {} refers to tensor "
                    + "of dimensionality {}, expected {} ({})!"
                ).format(name, ndim, len(dim_names), ",".join(dim_names))
            )
        # TODO: Check dimension names?

    # Convert to numpy
    out = tensor.to_numpy()
    if is_complex:
        out = _numpy_cast_to_complex(out)
    return out


#: Parameter declaration as produced by make_par:
#: (name, type, nullable, metadata)
par_spec = NewType("par_spec", tuple)
def call_name(schema: pyarrow.Schema) -> str:
    """Return the function name a call schema was declared for.

    :param schema: Call schema
    :returns: Decoded value of the schema's ``PROC_FUNC_META`` entry
    """
    raw_name = schema.metadata[PROC_FUNC_META]
    return raw_name.decode()
def par_meta(field: pyarrow.Field) -> Optional[bytes]:
    """Get parameter kind metadata from schema field

    :param field: Field of a call schema
    :return: Parameter kind as stored in the metadata (e.g.
       ``PROC_PAR_META_IN`` / ``PROC_PAR_META_OUT``, which are bytes), or
       `None` if not set
    """
    metadata = field.metadata
    if metadata is None:
        return None
    return metadata.get(PROC_PAR_META)
def par_tensor_elem_type(field: pyarrow.Field) -> Optional[pyarrow.DataType]:
    """Look up the tensor element type declared for a parameter.

    :param field: Field to read metadata from
    :return: Element type (an Arrow type, or a ComplexType for complex
       dtypes), or `None` if not set
    """
    metadata = field.metadata
    if metadata is not None and PROC_TENSOR_TYPE_META in metadata:
        return from_numpy_dtype(metadata[PROC_TENSOR_TYPE_META])
    return None
def par_tensor_dim_names(field: pyarrow.Field) -> Optional[List[str]]:
    """Get tensor dimension names parameter

    :param field: Field to read metadata from
    :return: List of dimension names (empty for zero-dimensional tensors),
       or `None` if not set
    """
    if field.metadata is None or PROC_TENSOR_DIMS_META not in field.metadata:
        return None
    dims = field.metadata[PROC_TENSOR_DIMS_META].decode()
    # Names are stored as ",".join(dim_names), which is "" for a tensor
    # without dimensions; "".split(",") would wrongly yield [""]
    return dims.split(",") if dims else []
def par_table_schema(field: pyarrow.Field) -> Optional[pyarrow.Schema]:
    """Get table schema for a parameter

    :param field: Field to read metadata from
    :return: Table schema, or `None` if not set or if it cannot be
       deserialised (a warning is logged in that case)
    """
    if field.metadata is None or PROC_TABLE_SCHEMA_META not in field.metadata:
        return None

    # Deserialise the schema bytes stored in the field metadata
    schema_bytes = field.metadata[PROC_TABLE_SCHEMA_META]
    try:
        reader = pyarrow.BufferReader(pyarrow.py_buffer(schema_bytes))
        return pyarrow.ipc.RecordBatchStreamReader(reader).schema
    except pyarrow.ArrowException as e:
        logger.warning("Could not deserialise table schema: %r", e)
        return None
def make_par(
    name: str,
    typ: pyarrow.DataType,
    nullable: bool = False,
    metadata: Optional[Dict[bytes, bytes]] = None,
) -> par_spec:
    """
    Create parameter declaration to pass to make_call_schema.

    :param name: Parameter name
    :param typ: Arrow data type
    :param nullable: Allowed to be null?
    :param metadata: Metadata dictionary to associate with field (copied,
       so the caller's dictionary is never retained)
    :returns: ``(name, type, nullable, metadata)`` tuple
    """
    # None instead of a mutable {} default shared between calls
    return (str(name), typ, nullable, dict(metadata or {}))
def make_oid_par(
    name: str,
    nullable: bool = False,
    metadata: Optional[Dict[bytes, bytes]] = None,
) -> par_spec:
    """
    Create Object ID parameter to pass to make_call_schema.

    :param name: Parameter name
    :param nullable: Allowed to be null?
    :param metadata: Metadata dictionary to associate with field.
    :returns: Parameter declaration of type ``OBJECT_ID_TYPE``
    """
    # None instead of a mutable {} default shared between calls
    return make_par(
        name, OBJECT_ID_TYPE, nullable, {} if metadata is None else metadata
    )
def make_oid_input_par(
    name: str,
    nullable: bool = False,
    metadata: Optional[Dict[bytes, bytes]] = None,
) -> par_spec:
    """Create input Object ID parameter to pass to make_call_schema.

    Marking the parameter as input means that the call will be delayed until
    an object with the given ID appears in the Plasma store.

    :param name: Parameter name
    :param nullable: Allowed to be null?
    :param metadata: Metadata dictionary to associate with field.
    """
    # Copy so that neither the caller's dict is modified nor a mutable
    # default is shared between calls
    metadata = dict(metadata or {})
    metadata[PROC_PAR_META] = PROC_PAR_META_IN
    return make_oid_par(name, nullable, metadata)
def make_tensor_input_par(
    name: str,
    elem_type: pyarrow.DataType,
    dim_names: List[str],
    nullable: bool = False,
) -> par_spec:
    """Create input tensor parameter to pass to make_call_schema.

    Marking the parameter as input means that the call will be delayed until
    a tensor with the given ID appears in the Plasma store. Element type and
    dimension names are recorded in the field metadata.

    :param name: Parameter name
    :param elem_type: Tensor element type (Arrow type or ComplexType)
    :param dim_names: Dimension names
    :param nullable: Allowed to be null?
    """
    elem_dtype = numpy.dtype(elem_type.to_pandas_dtype())
    tensor_meta = {}
    tensor_meta[PROC_TENSOR_TYPE_META] = str(elem_dtype).encode()
    tensor_meta[PROC_TENSOR_DIMS_META] = ",".join(dim_names).encode()
    return make_oid_input_par(name, nullable, tensor_meta)
def make_table_input_par(
    name: str, table_schema: pyarrow.Schema, nullable: bool = False
) -> par_spec:
    """Create input table parameter to pass to make_call_schema.

    Marking the parameter as input means that the call will be delayed until
    a table with the given ID appears in the Plasma store. The expected
    table schema is stored, serialised, in the field metadata.

    :param name: Parameter name
    :param table_schema: Expected schema of the table
    :param nullable: Allowed to be null?
    """
    metadata = {
        PROC_TABLE_SCHEMA_META: table_schema.serialize().to_pybytes(),
    }
    return make_oid_input_par(name, nullable, metadata)
def make_oid_output_par(
    name: str,
    nullable: bool = False,
    metadata: Optional[Dict[bytes, bytes]] = None,
) -> par_spec:
    """Create output Object ID parameter to pass to make_call_schema.

    The call will be skipped if all outputs already exist in the Plasma
    store.

    :param name: Parameter name
    :param nullable: Allowed to be null?
    :param metadata: Metadata dictionary to associate with field.
    """
    # Copy so that neither the caller's dict is modified nor a mutable
    # default is shared between calls
    metadata = dict(metadata or {})
    metadata[PROC_PAR_META] = PROC_PAR_META_OUT
    return make_oid_par(name, nullable, metadata)
def make_tensor_output_par(
    name: str,
    elem_type: pyarrow.DataType,
    dim_names: List[str],
    nullable: bool = False,
) -> par_spec:
    """Create output tensor parameter to pass to make_call_schema.

    Marking the parameter as output means that the call will be skipped if
    all outputs already exist in the Plasma store. Element type and
    dimension names are recorded in the field metadata.

    :param name: Parameter name
    :param elem_type: Tensor element type (Arrow type or ComplexType)
    :param dim_names: Dimension names
    :param nullable: Allowed to be null?
    """
    metadata = {
        PROC_TENSOR_TYPE_META: numpy.dtype(
            elem_type.to_pandas_dtype()
        ).name.encode(),
        PROC_TENSOR_DIMS_META: ",".join(dim_names).encode(),
    }
    return make_oid_output_par(name, nullable, metadata)
def make_call_schema(
    func_name: str, pars: List[par_spec], metadata: Optional[Dict] = None
) -> pyarrow.Schema:
    """Create schema for calls through the Plasma store.

    :param func_name: Function name (stored under ``PROC_FUNC_META``)
    :param pars: List of parameters (as created by ``make_par`` & co.)
    :param metadata: Metadata to associate with schema
    :returns: Call schema
    """
    # Copy; None instead of a mutable {} default shared between calls
    metadata = dict(metadata or {})
    metadata[PROC_FUNC_META] = func_name
    return pyarrow.schema(pars, metadata=metadata)
def schema_compatible(
    expected: pyarrow.Schema, actual: pyarrow.Schema
) -> List[str]:
    """Checks for compatibility between (call) schemas.

    This means that all expected fields are there and have the same types
    (including relevant metadata). Note that the result is a list, which is
    empty (i.e. falsy) when the schemas are compatible.

    :param expected: Expected schema
    :param actual: Schema to check
    :return: Empty list if compatible, otherwise list of mismatches
    """
    problems = []
    for field_exp in expected:
        # Check that expected field exists. pyarrow's Schema.field(name)
        # raises KeyError for unknown (or duplicated) names rather than
        # returning None, so handle both outcomes.
        try:
            field_act = actual.field(field_exp.name)
        except KeyError:
            field_act = None
        if field_act is None:
            problems.append(f"field '{field_exp.name}' does not exist")
            continue

        # Check type
        if not field_exp.type.equals(field_act.type):
            problems.append(
                f"field '{field_exp.name}' has type "
                f"'{field_act.type}' - expected '{field_exp.type}'"
            )

        # Check that it can be nulled
        if field_exp.nullable and not field_act.nullable:
            problems.append(f"field '{field_exp.name}' should be nullable")
            continue

        # Check that parameter metadata matches
        par_exp = (
            None
            if field_exp.metadata is None
            else field_exp.metadata.get(PROC_PAR_META)
        )
        par_act = (
            None
            if field_act.metadata is None
            else field_act.metadata.get(PROC_PAR_META)
        )
        if par_exp != par_act:
            problems.append(
                f"field '{field_exp.name}' has kind "
                f"'{par_act}' - expected '{par_exp}'"
            )
    return problems