import binascii
import logging
import os
import sys
from typing import Any, Dict, Iterator, List, NewType, Optional
import numpy
import pyarrow
import pyarrow.plasma as plasma
logger = logging.getLogger(__name__)
#: Length of Plasma's object IDs
OBJECT_ID_SIZE = 20
#: Arrow type used for representing Plasma object IDs
OBJECT_ID_TYPE = pyarrow.binary(OBJECT_ID_SIZE)
#: Number of leading Object ID bytes used as the namespace prefix
NAMESPACE_ID_SIZE = 4
#: Metadata entry for namespace name
PROC_NAMESPACE_META = b"proc:namespace"
#: Metadata entry for namespace process ID
PROC_NAMESPACE_PID_META = b"proc:pid"
#: Metadata entry for namespace process arguments
PROC_NAMESPACE_ARGV_META = b"proc:argv"
#: First object in namespace is expected to be its declaration
NAMESPACE_DECL_SUFFIX = b"\0" * (OBJECT_ID_SIZE - NAMESPACE_ID_SIZE)
#: Schema used for declaring calls valid for a namespace
PROCS_SCHEMA = pyarrow.schema(
[
("name", pyarrow.string()),
("schema", pyarrow.binary()), # serialised pyarrow.Schema
]
)
# RPC-specific metadata added to schemas
PROC_FUNC_META = b"proc:func"
PROC_PAR_META = b"proc:par"
PROC_PAR_META_IN = b"in"
PROC_PAR_META_OUT = b"out"
PROC_TENSOR_TYPE_META = b"proc:tensor_type"
PROC_TENSOR_DIMS_META = b"proc:tensor_dims"
PROC_TABLE_SCHEMA_META = b"proc:table_schema"
def parse_hex_objectid(oid_str: str) -> bytes:
"""Parse an Object ID given as a hexadecimal string representation
    Note that this allows Object IDs to have fewer than 20 bytes,
i.e. partial Object IDs (prefixes) are parsed without error.
:param oid_str: String representation
:returns: Object ID as binary string
"""
# Validate
if len(oid_str) % 2 != 0:
raise ValueError("Object ID string must have even length!")
if len(oid_str) // 2 > OBJECT_ID_SIZE:
raise ValueError(
"Object ID can have {} bytes at maximum!".format(OBJECT_ID_SIZE)
)
    # Parse the (possibly partial) hexadecimal string
    return binascii.unhexlify(oid_str)
def object_id_hex(oid: plasma.ObjectID) -> str:
"""Convert Object ID into a hexadecimal string representation
:param oid: The Object ID to convert (as bytearray or ObjectID)
"""
if isinstance(oid, plasma.ObjectID):
oid = oid.binary()
return binascii.hexlify(oid).decode("ascii")
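# Illustrative sketch of the two helpers above (values made up): partial
# hexadecimal strings parse to ID prefixes, and object_id_hex inverts the
# conversion for both raw bytes and plasma.ObjectID instances.
#
#   parse_hex_objectid("cafe")    # -> b"\xca\xfe" (2-byte prefix)
#   object_id_hex(b"\xca\xfe")    # -> "cafe"
#   object_id_hex(plasma.ObjectID(parse_hex_objectid("ab" * 20)))
#                                 # -> "abab...ab" (full 20-byte ID)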
def objectid_generator(
prefix: bytes, size: int = OBJECT_ID_SIZE
) -> Iterator[bytes]:
"""Generate ObjectIDs with a given prefix.
:param prefix: Prefix as binary string
"""
# Pad prefix to length
oid = bytearray(prefix)
while len(oid) < size:
oid.append(0)
# Generate IDs
byte_count = size - len(prefix)
i = 0
while i < 2 ** (8 * byte_count):
for k in range(byte_count):
oid[-k - 1] = (i >> 8 * k) % 256
yield bytes(oid)
i += 1
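# Illustrative sketch (made-up prefix): the generator zero-pads the prefix
# to the requested size and then counts up through the suffix bytes,
# most significant byte first.
#
#   gen = objectid_generator(b"\xca\xfe", size=4)
#   next(gen)   # -> b"\xca\xfe\x00\x00"
#   next(gen)   # -> b"\xca\xfe\x00\x01"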
def _make_namespace_decl(
    name: Optional[str] = None, procs: List[pyarrow.Schema] = []
) -> pyarrow.RecordBatch:
"""Generate namespace declaration.
This gives the supported call schemas (if any) plus some
information about the owning process. Meant to allow users and
other processes to identify the purpose of this namespace.
:param name: Informative display name for namespace
:param procs: Call schemas supported (if any)
:returns: Namespace declaration as record batch
"""
# Add process identification to metadata
_metadata = {
PROC_NAMESPACE_META: str(name).encode(),
PROC_NAMESPACE_ARGV_META: str(sys.argv).encode(),
PROC_NAMESPACE_PID_META: str(os.getpid()).encode(),
}
    # Build record batch
return pyarrow.record_batch(
[
pyarrow.array(
[schema.metadata[PROC_FUNC_META] for schema in procs]
),
pyarrow.array(
[schema.serialize().to_pybytes() for schema in procs]
),
],
PROCS_SCHEMA.with_metadata(_metadata),
)
def is_namespace_decl(oid: plasma.ObjectID) -> bool:
"""Checks whether the given object ID declares a namespace."""
if isinstance(oid, plasma.ObjectID):
oid = oid.binary()
return oid.endswith(NAMESPACE_DECL_SUFFIX)
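# Illustrative sketch (made-up namespace prefix): the first ID generated
# for a NAMESPACE_ID_SIZE-byte prefix carries the all-zero suffix, so it is
# recognised as the namespace declaration.
#
#   ns = b"\xca\xfe\x00\x01"                        # 4-byte prefix
#   decl_oid = next(objectid_generator(ns))
#   is_namespace_decl(plasma.ObjectID(decl_oid))    # -> True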
def _numpy_cast_to_complex(arr: numpy.ndarray) -> numpy.ndarray:
"""Cast array with inner-most dimension of 2 into complex array.
:param arr: Array to convert
"""
    if arr.size != 0 and arr.shape[-1] != 2:
        raise ValueError(
            "To interpret as complex type, array must have "
            "inner dimension of 2!"
        )
# Determine output type
if arr.dtype == numpy.single:
dtype = numpy.csingle
elif arr.dtype == numpy.double:
dtype = numpy.cdouble
elif arr.dtype == numpy.longdouble:
dtype = numpy.clongdouble
else:
raise TypeError(
"Unsupported dtype for cast to complex: {}".format(arr.dtype)
)
    # Perform cast, which requires a C-contiguous array
    if not arr.flags["C_CONTIGUOUS"]:
        arr = numpy.ascontiguousarray(arr)
    out = numpy.frombuffer(memoryview(arr), dtype=dtype, count=arr.size // 2)
return out.reshape(arr.shape[:-1])
def _numpy_cast_from_complex(arr: numpy.ndarray) -> numpy.ndarray:
"""Cast complex array to real array with inner dimension of 2
:param arr: Array to convert
"""
# Determine output type
if arr.dtype == numpy.csingle:
dtype = numpy.single
elif arr.dtype == numpy.cdouble:
dtype = numpy.double
elif arr.dtype == numpy.clongdouble:
dtype = numpy.longdouble
else:
raise TypeError(
"Unsupported dtype for cast from complex: {}".format(arr.dtype)
)
# Perform cast, which requires a C-contiguous array
if not arr.flags["C_CONTIGUOUS"]:
arr = numpy.ascontiguousarray(arr)
out = numpy.frombuffer(memoryview(arr), dtype=dtype, count=arr.size * 2)
return out.reshape(arr.shape + (0 if arr.size == 0 else 2,))
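# Illustrative sketch: the two casts above are inverses. A complex64 array
# maps to a float32 array with a trailing dimension of 2 (real/imaginary)
# and back, sharing the underlying buffer where possible.
#
#   c = numpy.array([1 + 2j, 3 - 4j], dtype=numpy.csingle)
#   r = _numpy_cast_from_complex(c)          # shape (2, 2), dtype float32
#   (_numpy_cast_to_complex(r) == c).all()   # -> True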
class ComplexType:
"""
Pseudo-type to refer to complex values.
Use in place of arrow types.
"""
def __init__(self, real_type, complex_dtype, real_dtype):
self.real_type = real_type
self.complex_dtype = complex_dtype
self.real_dtype = real_dtype
    def to_pandas_dtype(self):
        """Return the numpy dtype corresponding to this complex type."""
        return self.complex_dtype
#: Single-precision complex type
complex64 = ComplexType(pyarrow.float32(), numpy.csingle, numpy.single)
#: Double-precision complex type
complex128 = ComplexType(pyarrow.float64(), numpy.cdouble, numpy.double)
def from_numpy_dtype(dtype: Any) -> pyarrow.DataType:
    """Convert a numpy dtype to an Arrow type, mapping complex dtypes
    to the ComplexType pseudo-types defined above.
    :param dtype: numpy dtype (or anything numpy.dtype accepts)
    """
dt = numpy.dtype(dtype)
if dt == numpy.csingle:
return complex64
if dt == numpy.cdouble:
return complex128
return pyarrow.from_numpy_dtype(dtype)
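# Illustrative sketch: complex dtypes map to the pseudo-types above, while
# all other dtypes fall through to pyarrow's own conversion.
#
#   from_numpy_dtype(numpy.csingle) is complex64   # -> True
#   from_numpy_dtype(numpy.float32)                # -> pyarrow.float32()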
def _put_numpy(
client: plasma.PlasmaClient,
oid: plasma.ObjectID,
array: numpy.ndarray,
    typ: Optional[pyarrow.DataType] = None,
    dim_names: Optional[List[str]] = None,
temporary: bool = True,
) -> pyarrow.Buffer:
"""Write a numpy array to storage as a tensor
Note that this is less efficient than constructing it
in-place, which we should support at some point (TODO)
:param client: Plasma client to use
:param oid: Object ID to use
:param array: Tensor as numpy array
:param typ: Tensor assumed value type
:param dim_names: Tensor dimension names
:param temporary: Can be deleted once returned buffer is discarded
:returns: Buffer in Plasma store used to write object
"""
# Check dimensionality
if dim_names is not None:
if len(array.shape) != len(dim_names):
            raise TypeError(
                f"Tensor has {len(array.shape)} dimensions, but "
                f"expected {len(dim_names)} ({','.join(dim_names)})!"
            )
# Attempt to convert to tensor of given type, if applicable
is_complex = isinstance(typ, ComplexType)
if is_complex:
array = _numpy_cast_from_complex(array)
if dim_names is not None:
dim_names = list(dim_names) + ["cpx"]
typ = typ.real_type
if typ is not None:
array = array.astype(typ.to_pandas_dtype(), copy=False)
tensor = pyarrow.Tensor.from_numpy(array)
if typ is not None:
if not tensor.type.equals(typ):
raise TypeError(
"Tensor has type {}, but expected {}!".format(tensor.type, typ)
)
# Determine size, reserve in Plasma and mmap in
tensor_size = pyarrow.ipc.get_tensor_size(tensor)
out_buf = client.create(oid, tensor_size)
# Write to fixed-size buffer in shared memory
writer = pyarrow.FixedSizeBufferWriter(out_buf)
pyarrow.ipc.write_tensor(tensor, writer)
# Seal + request deletion
client.seal(oid)
if temporary:
client.delete([oid])
return out_buf
def _tensor_from_buf(
    buf: pyarrow.Buffer,
    name: str,
    typ: Optional[pyarrow.DataType] = None,
    dim_names: Optional[List[str]] = None,
) -> numpy.ndarray:
    """Read a serialised tensor from a buffer into a numpy array.
    :param buf: Buffer containing the serialised tensor
    :param name: Field name, used in error messages
    :param typ: Expected element type, checked if given
    :param dim_names: Expected dimension names, checked if given
    """
# Deserialise the tensor
tensor = pyarrow.ipc.read_tensor(pyarrow.BufferReader(buf))
# Check type
is_complex = False
if isinstance(typ, ComplexType):
typ = typ.real_type
is_complex = True
    if typ is not None and not tensor.type.equals(typ):
        raise TypeError(
            "Field {} refers to tensor of type {}, expected {}!".format(
                name, tensor.type, typ
            )
        )
# Check dimensionality
if dim_names is not None:
ndim = tensor.ndim
if is_complex:
            if ndim < 1:
                raise TypeError(
                    "Field {} has complex type, but "
                    "zero dimensions!".format(name)
                )
ndim -= 1
        if ndim != len(dim_names):
            raise TypeError(
                "Field {} refers to tensor of dimensionality {}, "
                "expected {} ({})!".format(
                    name, ndim, len(dim_names), ",".join(dim_names)
                )
            )
# TODO: Check dimension names?
    # Convert to numpy
    array = tensor.to_numpy()
    if is_complex:
        array = _numpy_cast_to_complex(array)
    return array
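# Illustrative sketch exercising _put_numpy and _tensor_from_buf together.
# Assumes a Plasma store is already running; the socket path and Object ID
# below are made up.
#
#   client = plasma.connect("/tmp/plasma")
#   oid = plasma.ObjectID(parse_hex_objectid("ab" * 20))
#   _put_numpy(client, oid, numpy.arange(6.0).reshape(2, 3),
#              typ=pyarrow.float64(), dim_names=["row", "col"],
#              temporary=False)
#   [buf] = client.get_buffers([oid])
#   arr = _tensor_from_buf(buf, "example", typ=pyarrow.float64(),
#                          dim_names=["row", "col"])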
par_spec = NewType("par_spec", tuple)
def call_name(schema: pyarrow.Schema) -> str:
"""Get call name from call schema
:param schema: Call schema
"""
return schema.metadata[PROC_FUNC_META].decode()
def par_tensor_elem_type(field: pyarrow.Field) -> Optional[pyarrow.DataType]:
"""Get tensor element type parameter
:param field: Field to read metadata from
:return: Parameter element type, or `None` if not set
"""
if field.metadata is None or PROC_TENSOR_TYPE_META not in field.metadata:
return None
return from_numpy_dtype(field.metadata[PROC_TENSOR_TYPE_META])
def par_tensor_dim_names(field: pyarrow.Field) -> Optional[List[str]]:
    """Get tensor dimension names for a parameter
    :param field: Field to read metadata from
    :return: Dimension names, or `None` if not set
"""
if field.metadata is None or PROC_TENSOR_DIMS_META not in field.metadata:
return None
return field.metadata[PROC_TENSOR_DIMS_META].decode().split(",")
def par_table_schema(field: pyarrow.Field) -> Optional[pyarrow.Schema]:
    """Get table schema for a parameter
    :param field: Field to read metadata from
:return: Table schema, or `None` if not set
"""
if field.metadata is None or PROC_TABLE_SCHEMA_META not in field.metadata:
return None
# Deserialise
try:
schema_bytes = field.metadata[PROC_TABLE_SCHEMA_META]
reader = pyarrow.BufferReader(pyarrow.py_buffer(schema_bytes))
return pyarrow.ipc.RecordBatchStreamReader(reader).schema
except pyarrow.ArrowException as e:
logger.warning("Could not deserialise table schema: " + repr(e))
return None
def make_par(
    name: str,
    typ: pyarrow.DataType,
    nullable: bool = False,
    metadata: Dict[bytes, bytes] = {},
) -> par_spec:
"""
Create parameter declaration to pass to make_call_schema.
:param name: Parameter name
:param typ: Arrow data type
:param nullable: Allowed to be null?
:param metadata: Metadata dictionary to associate with field.
"""
return (str(name), typ, nullable, dict(metadata))
def make_oid_par(
name: str, nullable: bool = False, metadata: Dict[bytes, bytes] = {}
) -> par_spec:
"""
Create Object ID parameter to pass to make_call_schema.
:param name: Parameter name
:param nullable: Allowed to be null?
:param metadata: Metadata dictionary to associate with field.
"""
return make_par(name, OBJECT_ID_TYPE, nullable, metadata)
def make_oid_output_par(
name: str, nullable: bool = False, metadata: Dict[bytes, bytes] = {}
) -> par_spec:
"""Create Object ID parameter to pass to make_call_schema.
The call will be skipped if all outputs already exist in the
Plasma store.
:param name: Parameter name
:param nullable: Allowed to be null?
:param metadata: Metadata dictionary to associate with field.
"""
metadata = dict(metadata)
metadata[PROC_PAR_META] = PROC_PAR_META_OUT
return make_oid_par(name, nullable, metadata)
def make_tensor_output_par(
name: str,
elem_type: pyarrow.DataType,
dim_names: List[str],
nullable: bool = False,
) -> par_spec:
"""Create input tensor parameter to pass to make_call_schema.
Marking the parameter as input means that the call will be delayed
until a tensor with the given ID appears in the Plasma store.
:param name: Parameter name
:param elem_type: Tensor element type
:param dim_names: Dimension names
:param nullable: Allowed to be null?
"""
metadata = {
PROC_TENSOR_TYPE_META: numpy.dtype(
elem_type.to_pandas_dtype()
).name.encode(),
PROC_TENSOR_DIMS_META: ",".join(dim_names).encode(),
}
return make_oid_output_par(name, nullable, metadata)
def make_call_schema(
    func_name: str, pars: List[par_spec], metadata: Dict[bytes, bytes] = {}
) -> pyarrow.Schema:
"""Create schema for calls through the Plasma store.
:param func_name: Function name
:param pars: List of parameters
:param metadata: Metadata to associate with schema
"""
metadata = dict(metadata)
    metadata[PROC_FUNC_META] = func_name.encode()
return pyarrow.schema(pars, metadata=metadata)
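# Illustrative sketch: declaring a call "scale" (name and parameters made
# up) that takes a scalar factor and produces a tensor output.
#
#   schema = make_call_schema("scale", [
#       make_par("factor", pyarrow.float64()),
#       make_tensor_output_par("vis", pyarrow.float64(),
#                              ["time", "channel"]),
#   ])
#   call_name(schema)   # -> "scale"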
def schema_compatible(
    expected: pyarrow.Schema, actual: pyarrow.Schema
) -> List[str]:
"""Checks for compatibility between (call) schemas.
This means that all expected fields are there and have the same
types (including relevant metadata).
:param expected: Expected schema
:param actual: Schema to check
:return: Empty list if compatible, otherwise list of mismatches
"""
problems = []
for i in range(len(expected.names)):
        # Check that the expected field exists (get_field_index
        # returns -1 for missing names)
        field_exp = expected.field(i)
        if actual.get_field_index(field_exp.name) < 0:
            problems.append(f"field '{field_exp.name}' does not exist")
            continue
        field_act = actual.field(field_exp.name)
# Check type
if not field_exp.type.equals(field_act.type):
problems.append(
f"field '{field_exp.name}' has type "
f"'{field_act.type}' - expected '{field_exp.type}'"
)
# Check that it can be nulled
if field_exp.nullable and not field_act.nullable:
problems.append(f"field '{field_exp.name}' should be nullable")
# Check that parameter metadata matches
par_exp = (
None
if field_exp.metadata is None
else field_exp.metadata.get(PROC_PAR_META)
)
par_act = (
None
if field_act.metadata is None
else field_act.metadata.get(PROC_PAR_META)
)
        if par_exp != par_act:
            problems.append(
                f"field '{field_exp.name}' has kind "
                f"'{par_act}' - expected '{par_exp}'"
            )
return problems
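# Illustrative sketch (made-up schemas): a type mismatch shows up as an
# entry in the returned problem list; an empty list means compatible.
#
#   expected = make_call_schema(
#       "scale", [make_par("factor", pyarrow.float64())]
#   )
#   actual = make_call_schema(
#       "scale", [make_par("factor", pyarrow.int32())]
#   )
#   schema_compatible(expected, actual)
#   # -> ["field 'factor' has type 'int32' - expected 'double'"]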