"""Provides facilities for registering processing components
Each :py:class:`Processor` registers a number of processing functions
in the Plasma store, see :py:func:`.common.make_call_schema`. In
particular, Plasma tensor and table objects can be used as input and
output parameters, see :py:func:`.common.make_tensor_input_par`,
:py:func:`.common.make_tensor_output_par`,
:py:func:`.common.make_table_input_par` and
:py:func:`.common.make_table_output_par`.
These processing functions can then be called from other processes
using a :py:class:`.caller.Caller` instance connected to the same
Plasma store.
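
Example (a minimal sketch; it assumes a Plasma store listening at
``/tmp/plasma`` and elides the exact arguments of the schema helpers
in :py:mod:`.common`)::

    from . import common, processor

    # Hypothetical call schema with one input and one output tensor,
    # built via common.make_call_schema and the *_par helpers above
    procs = [...]

    class MyProcessor(processor.Processor):
        def my_func(self, data, data_out):
            ...  # read the input ref, fill the output ref

    proc = MyProcessor(procs, "/tmp/plasma")
    while proc.process(timeout=1.0):
        pass  # serve calls until a timeout expires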
"""
import abc
import logging
from typing import Any, List, Optional, Tuple
import pyarrow
import pyarrow.plasma as plasma
from . import common, connection
logger = logging.getLogger(__name__)
class BasicProcessor(metaclass=abc.ABCMeta):
"""Low-level processor interface.
**Deprecated**: Use :py:class:`Processor` instead.
Should be sub-classed for implementing a concrete processor. When
:py:meth:`process` is called, the processor will wait for a
matching function call to appear in the Plasma store and input
parameters to become available. Once that's the case,
:py:meth:`_process_call` will be called with the function name and
a :py:class:`pyarrow.RecordBatch` containing the parameters to the
call.
    Simple parameters can then be retrieved using :py:meth:`parameter`,
    and input tensors using
    :py:meth:`tensor_parameter`/:py:meth:`tensor_parameters`. Output
    parameters can be set using :py:meth:`output_tensor`.
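
    Example (an illustrative sketch; the parameter names ``factor``,
    ``data`` and ``data_out`` are hypothetical)::

        class ScaleProcessor(BasicProcessor):
            def _process_call(self, proc_func, batch):
                factor = self.parameter(batch, "factor", pyarrow.float64())
                data = self.tensor_parameter(batch, "data")
                self.output_tensor(
                    batch, "data_out", data.to_numpy() * factor
                )
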
:param procs: Processing functions accepted by this processor
:param plasma_path: Socket path of the Apache Plasma store
:param prefix: Prefix to use for namespace
:param name: Name to use for registering the processor in the
store. Defaults to the class name.
"""
def __init__(
self,
procs: List[pyarrow.Schema],
plasma_path: str,
prefix: bytes = b"",
        name: Optional[str] = None,
):
# Create connection
self._conn = connection.Connection(plasma_path)
# Reserve a namespace
if name is None:
name = self.__class__.__name__
self._prefix, self._root = self._conn.reserve_namespace(name, procs)
logger.info(
"%s waiting for calls at prefix %s",
name,
common.object_id_hex(self._prefix),
)
# Initialise
self._delayed = {}
@property
def prefix(self):
"""The Plasma prefix for calls to this processor."""
return self._prefix
    def process(self, timeout=None, catch_exceptions=True):
"""Attempts to process a call.
Blocks if no call is currently available.
:param timeout: Maximum time to block, in seconds
:param catch_exceptions: Whether exceptions thrown by
:py:meth:`_process_call` should be caught and
logged (the default).
:return: ``False`` if timeout expired, otherwise ``True``
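
        Example::

            # Serve calls until a one-second timeout expires
            while proc.process(timeout=1.0):
                pass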
"""
for oid, data_size, _ in self._conn.update_obj_table(timeout):
# Ignore deletions
if data_size < 0:
continue
# Has call prefix? Request and attempt to process
if oid.binary().startswith(self._prefix):
if not common.is_namespace_decl(oid):
self._read_calls(oid, catch_exceptions)
# Delayed call waiting for this input?
if oid in self._delayed:
for call_oid, batch in self._delayed[oid]:
in_obj_cols, out_obj_cols = self._find_in_out_pars(
batch.schema
)
self._read_call(
call_oid,
batch,
in_obj_cols,
out_obj_cols,
catch_exceptions,
)
del self._delayed[oid]
return True
def _read_calls(self, oid: plasma.ObjectID, catch_exceptions: bool):
# Receive objects
object_buffers = self._conn.get_buffers([oid], 0)
if object_buffers[0] is None:
logger.warning(
f"Failed to read calls from {common.object_id_hex(oid)}!"
)
return
        logger.debug(
            f"Reading calls from {common.object_id_hex(oid)} "
            f"({len(object_buffers[0])} bytes)"
        )
# Read as a record batch
batch_reader = pyarrow.RecordBatchStreamReader(
pyarrow.BufferReader(object_buffers[0])
)
if (
batch_reader.schema.metadata is None
or common.PROC_FUNC_META not in batch_reader.schema.metadata
):
logger.warning(
f"Calls {common.object_id_hex(oid)} lack function metadata!"
)
return
# Identify in+out parameters
in_obj_cols, out_obj_cols = self._find_in_out_pars(batch_reader.schema)
# Get batches
num_calls = 0
for batch in batch_reader:
# Go through rows
num_calls += batch.num_rows
for row in range(batch.num_rows):
batch_row = batch.slice(row, 1)
self._read_call(
oid, batch_row, in_obj_cols, out_obj_cols, catch_exceptions
)
# Debug log
proc_func = batch_reader.schema.metadata[common.PROC_FUNC_META].decode(
"utf8"
)
logger.debug(
"Received {} calls to {} in {}".format(
num_calls, proc_func, common.object_id_hex(oid)
)
)
def _find_in_out_pars(
self, schema: pyarrow.Schema
) -> Tuple[List[int], List[int]]:
"""Identify input and output parameters in a schema."""
in_obj_cols = []
out_obj_cols = []
for col in range(len(schema.names)):
field = schema.field(col)
            # Check that the field has ObjectID type and parameter kind
            # metadata associated with it
parkind = common.par_meta(field)
if parkind is None or not field.type.equals(common.OBJECT_ID_TYPE):
continue
# Check the concrete parameter kind
if parkind == common.PROC_PAR_META_IN:
in_obj_cols.append(col)
elif parkind == common.PROC_PAR_META_OUT:
out_obj_cols.append(col)
return in_obj_cols, out_obj_cols
def _read_call(
self,
call_oid: plasma.ObjectID,
batch: pyarrow.RecordBatch,
in_obj_cols: List[int],
out_obj_cols: List[int],
catch_exceptions: bool,
):
# Get function name
proc_func = batch.schema.metadata[common.PROC_FUNC_META].decode("utf8")
# Check whether some outputs are not satisfied yet (i.e. the
# request has not been processed already)
output_oids = []
sealed_oids = []
for col in out_obj_cols:
oid = plasma.ObjectID(batch.column(col)[0].as_py())
if not self._conn.object_exists(oid):
output_oids.append(oid)
else:
sealed_oids.append(oid)
if not output_oids:
logger.debug(
"Outputs {} for {} already sealed, ignoring".format(
sealed_oids, proc_func
)
)
return
# Check that inputs are present
for col in in_obj_cols:
oid = batch.column(col)[0].as_py()
if oid is None:
continue
oid = plasma.ObjectID(oid)
if not self._conn.object_exists(oid):
logger.debug(
"Input {} for {} missing delaying".format(
common.object_id_hex(oid), proc_func
)
)
if oid in self._delayed:
self._delayed[oid].append((call_oid, batch))
else:
self._delayed[oid] = [(call_oid, batch)]
return
# Process
logger.debug(
"Processing call to {} from {}".format(
proc_func, common.object_id_hex(call_oid)
)
)
try:
self._process_call(proc_func, batch)
except Exception:
if catch_exceptions:
logger.exception(
"While processing call %s to %s", call_oid, proc_func
)
return
else:
raise
    def parameter(
self,
batch: pyarrow.RecordBatch,
name: str,
        typ: Optional[pyarrow.DataType] = None,
allow_null: bool = False,
) -> Any:
"""Extract parameter from first row of record batch
:param batch: Record batch containing parameter
:param name: Name of parameter to extract
:param typ: Type to check (optional)
:param allow_null: Value allowed to be null - will return None
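
        Example (``factor`` is a hypothetical parameter name)::

            factor = self.parameter(batch, "factor", pyarrow.float64())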
"""
        # Look up column name
        col = batch.schema.get_field_index(name)
        if col < 0:
            raise KeyError("Record batch has no field {}!".format(name))
        # Type-check if requested
if typ is not None and not batch.schema.field(col).type.equals(typ):
raise TypeError(
"Field {} has type {}, expected {}!".format(
name, batch.schema.field(col).type, typ
)
)
# Get value, convert to Python interpretation
val = batch[col][0].as_py()
if val is None:
if allow_null:
return val
raise ValueError("Field {} of type {} is null!".format(name, typ))
return val
    def oid_parameter(
self, batch: pyarrow.RecordBatch, name: str, allow_null: bool = False
) -> plasma.ObjectID:
"""Extract Object ID parameter from first row of record batch.
:param batch: Record batch containing parameter
:param name: Name of parameter to extract
:param allow_null: Value allowed to be null - will return None
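
        Example (``data`` is a hypothetical parameter name)::

            oid = self.oid_parameter(batch, "data", allow_null=True)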
"""
oid = self.parameter(batch, name, common.OBJECT_ID_TYPE, allow_null)
if oid is None:
return None
return plasma.ObjectID(oid)
    def tensor_parameters(
self,
batch: pyarrow.RecordBatch,
tensor_specs: List[Tuple[str, pyarrow.DataType, List[str], bool]],
) -> List[pyarrow.Tensor]:
"""Read tensors referred to via object ID parameters.
:param batch: Record batch containing parameters
:param tensor_specs: Either list of strings or list of tuples
of form (name, type, dimensionality, allow_null). If given,
type and dimensionality will be checked. If allow_null is set,
the object ID is allowed to be null, in which case None will
get returned instead of a tensor.
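
        Example (mixing a bare name with a full spec; the parameter
        names are hypothetical)::

            vis, weights = self.tensor_parameters(batch, [
                "vis",
                ("weights", pyarrow.float32(), ["time", "channel"], True),
            ])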
"""
# Collect object IDs
oid_map = {}
for tensor_spec in tensor_specs:
if isinstance(tensor_spec, tuple):
name = tensor_spec[0]
allow_null = False if len(tensor_spec) < 4 else tensor_spec[3]
else:
name = tensor_spec
allow_null = False
# Get object ID, skip if None (it was null)
oid = self.oid_parameter(batch, name, allow_null)
if oid is None:
continue
oid_map[name] = oid
# Make sure it was marked as an input parameter (because
# of the previous check this virtually guarantees that the
# call below will not block)
if (
common.par_meta(batch.schema.field(name))
!= common.PROC_PAR_META_IN
):
                raise ValueError(
                    f"Tensor call parameter {name} not marked as input!"
                )
# Batch-request buffers
oids = list(oid_map.values())
bufs = self._conn.get_buffers(oids, 100)
buf_map = {name: buf for name, buf in zip(oid_map.keys(), bufs)}
# Read them all
arrays = []
for tensor_spec in tensor_specs:
# Determine name and do null check
pars = []
            if not isinstance(tensor_spec, tuple):
                name = tensor_spec
            else:
                name = tensor_spec[0]
                pars = tensor_spec[1:3]
            # Null object ID? Can only happen when allow_null was set,
            # so return None for this parameter
            if name not in buf_map:
                arrays.append(None)
                continue
# Result not found in Plasma? Can happen on races when the
# caller goes away, raise an error
            if buf_map[name] is None:
                raise ValueError(
                    f"Object {oid_map[name]} for parameter {name} "
                    "vanished from Plasma store!"
                )
# Get parameter
arrays.append(common._tensor_from_buf(buf_map[name], name, *pars))
return arrays
    def tensor_parameter(
self,
batch: pyarrow.RecordBatch,
name: str,
        typ: Optional[pyarrow.DataType] = None,
        dim_names: Optional[List[str]] = None,
allow_null: bool = False,
) -> pyarrow.Tensor:
"""Read tensor referred to via object ID parameter.
:param batch: Record batch containing parameters
:param name: Name of parameter to extract
:param typ: Tensor value type to check (optional)
        :param dim_names: Tensor dimensionality to check (optional)
        :param allow_null: Value allowed to be null - will return None
"""
return self.tensor_parameters(
batch, [(name, typ, dim_names, allow_null)]
)[0]
    @abc.abstractmethod
    def _process_call(self, proc_func: str, batch: pyarrow.RecordBatch):
        """Handle a single call to ``proc_func``.

        :param proc_func: Name of the called processing function
        :param batch: Single-row record batch containing the call
            parameters
        """
    def output_tensor(self, batch, name, array, typ=None):
"""Write output tensor to storage
Note that this is less efficient than constructing it
in-place, which we should support at some point (TODO)
:param batch: Record batch containing parameters
:param name: Name of parameter to extract
:param arr: Tensor as numpy array
:param typ: Tensor value type
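
        Example (``data_out`` is a hypothetical output parameter;
        assumes ``numpy`` is imported)::

            self.output_tensor(batch, "data_out", numpy.zeros((16, 16)))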
"""
# Get object ID
out_oid = self.oid_parameter(batch, name)
# Make sure it was marked as an output parameter
if (
common.par_meta(batch.schema.field(name))
!= common.PROC_PAR_META_OUT
):
raise ValueError(
("Tensor call parameter {} not marked as output!").format(name)
)
# Write tensor
return common._put_numpy(
self._conn._client, out_oid, array, typ, temporary=False
)
class LogProcessor(BasicProcessor):
"""Simple processor that just logs all calls.
:param procs: Processing functions accepted by this processor
:param plasma_path: Socket path of the Apache Plasma store
:param prefix: Prefix to use for namespace
:param name: Name to use for registering the processor in the
store. Defaults to the class name.
"""
def _process_call(self, proc_func: str, batch: pyarrow.RecordBatch):
field_strs = []
for col in range(len(batch.schema.names)):
field = batch.schema.field(col)
metadata_str = ""
if field.metadata is not None:
metadata_str = dict(field.metadata)
field_strs.append(
"{}={} {}".format(
field.name, batch.column(col)[0], metadata_str
)
)
logger.info("%s(%s)", proc_func, ",\n\t".join(field_strs))
class Processor(BasicProcessor):
"""A high-level processor interface.
Should be subclassed for implementing a concrete processor, with
methods for every processing function registered by the
processor. When :py:meth:`BasicProcessor.process` is called, the
processor will wait for a matching function call to appear in the
Plasma store and input parameters to become available (see
:py:class:`BasicProcessor`).
All parameters are then automatically retrieved and unpacked into
Python objects. References to the Plasma store are represented
using :py:class:`.connection.TensorRef` or
:py:class:`.connection.TableRef` instances. These can be used for
getting and setting input and output tensors and tables
respectively.
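
    Example (a minimal sketch; the function name ``add_noise`` and its
    parameters are hypothetical, and the ref accessors are elided)::

        class NoiseProcessor(Processor):
            def add_noise(self, level, vis, vis_out):
                # "level" arrives as a plain Python value, "vis" as an
                # input TensorRef, "vis_out" as an output TensorRef
                ...
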
:param procs: Processing functions accepted by this processor
:param plasma_path: Socket path of the Apache Plasma store
:param prefix: Prefix to use for namespace.
:param name: Name to use for registering the processor in the
store. Defaults to the class name.
"""
def __init__(
self,
procs: List[pyarrow.Schema],
plasma_path: str,
prefix: bytes = b"",
        name: Optional[str] = None,
):
# Make sure all procs are handled
for proc in procs:
if not hasattr(self, common.call_name(proc)):
raise TypeError(
f"Processor {name} ({self.__class__.__name__}) is missing "
f"handler for {common.call_name(proc)}!"
)
# Initialise
        super().__init__(procs, plasma_path, prefix, name)
self._procs = {common.call_name(proc): proc for proc in procs}
def _process_call(self, proc_func: str, batch: pyarrow.RecordBatch):
"""Handle incoming call(s)"""
# Get schema
schema = self._procs.get(proc_func)
if schema is None:
            raise ValueError(
                "Call to unknown function: {}!".format(proc_func)
            )
# Gather parameters
params = {}
        in_refs = []  # input tensor and table references
for field in [schema.field(i) for i in range(len(schema.names))]:
# Is a reference?
par_meta = common.par_meta(field)
if par_meta is None:
# Just read as-is
params[field.name] = self.parameter(
batch, field.name, field.type, field.nullable
)
continue
is_input = par_meta == common.PROC_PAR_META_IN
# Get OID - skip if not set
oid = self.oid_parameter(batch, field.name, field.nullable)
if oid is None:
params[field.name] = None
continue
# A tensor?
tensor_typ = common.par_tensor_elem_type(field)
if tensor_typ is not None:
                # Get dimension names
                dim_names = common.par_tensor_dim_names(field)
# Add to specs to be retrieved as inputs - or allocate
# as output
ref = connection.TensorRef(
self._conn, oid, tensor_typ, dim_names, is_input
)
params[field.name] = ref
                if is_input:
                    in_refs.append(ref)
continue
# A table?
table_schema = common.par_table_schema(field)
if table_schema is not None:
# Again, either retrieve or allocate
                ref = connection.TableRef(
                    self._conn, oid, table_schema, is_input
                )
params[field.name] = ref
                if is_input:
                    in_refs.append(ref)
        # Batch-retrieve input refs (tensors & tables)
        if in_refs:
            self._conn.get_ref_buffers(in_refs, 0)
# Call, passing parameters as keywords
method = getattr(self, proc_func)
method(**params)