# Source code for ska_sdp_dal.processor

"""Provides facilities for registering processing components

Each :py:class:`Processor` registers a number of processing functions
in the Plasma store (see :py:func:`.common.make_call_schema`). In
particular, Plasma tensor and table objects can be used as input and
output parameters; see :py:func:`.common.make_tensor_input_par`,
:py:func:`.common.make_tensor_output_par`,
:py:func:`.common.make_table_input_par` and
:py:func:`.common.make_table_output_par`.

These processing functions can then be called from other processes
using a :py:class:`.caller.Caller` instance connected to the same
Plasma store.
"""

import abc
import logging
from typing import Any, List, Tuple

import pyarrow
import pyarrow.plasma as plasma

from . import common, connection

logger = logging.getLogger(__name__)


class BasicProcessor(metaclass=abc.ABCMeta):
    """Low-level processor interface.

    **Deprecated**: Use :py:class:`Processor` instead.

    Should be sub-classed for implementing a concrete processor. When
    :py:meth:`process` is called, the processor will wait for a matching
    function call to appear in the Plasma store and input parameters to
    become available. Once that's the case, :py:meth:`_process_call` will
    be called with the function name and a :py:class:`pyarrow.RecordBatch`
    containing the parameters to the call.

    Simple parameters can then be retrieved using :py:meth:`parameter`,
    and input tensors using :py:meth:`tensor_parameter` /
    :py:meth:`tensor_parameters`. Output parameters can be set using
    :py:meth:`output_tensor`.

    :param procs: Processing functions accepted by this processor
    :param plasma_path: Socket path of the Apache Plasma store
    :param prefix: Prefix to use for namespace
    :param name: Name to use for registering the processor in the
       store. Defaults to the class name.
    """

    def __init__(
        self,
        procs: List[pyarrow.Schema],
        plasma_path: str,
        prefix: bytes = b"",
        name: str = None,
    ):
        # Create connection
        self._conn = connection.Connection(plasma_path)

        # Reserve a namespace.
        # NOTE(review): ``prefix`` is accepted and documented but is not
        # used anywhere in this constructor - the actual prefix is the one
        # returned by ``reserve_namespace``. Confirm whether it should be
        # passed on.
        if name is None:
            name = self.__class__.__name__
        self._prefix, self._root = self._conn.reserve_namespace(name, procs)
        logger.info(
            "%s waiting for calls at prefix %s",
            name,
            common.object_id_hex(self._prefix),
        )

        # Calls that could not run yet because an input object was missing.
        # Maps the missing input's ObjectID to a list of
        # (call object ID, single-row call batch) pairs waiting for it.
        self._delayed = {}

    @property
    def prefix(self):
        """The Plasma prefix for calls to this processor."""
        return self._prefix

    def process(self, timeout=None, catch_exceptions=True):
        """Attempts to process a call.

        Blocks if no call is currently available.

        :param timeout: Maximum time to block, in seconds
        :param catch_exceptions: Whether exceptions thrown by
           :py:meth:`_process_call` should be caught and logged (the
           default).
        :return: ``False`` if timeout expired, otherwise ``True``
        """
        for oid, data_size, _ in self._conn.update_obj_table(timeout):
            # Ignore deletions
            if data_size < 0:
                continue

            # Has call prefix? Request and attempt to process
            if oid.binary().startswith(self._prefix):
                if not common.is_namespace_decl(oid):
                    self._read_calls(oid, catch_exceptions)

            # Delayed calls waiting for this input? Detach them from the
            # table *before* retrying: if a retried call gets delayed again
            # on this same object, _read_call then appends to a fresh list
            # instead of growing the list being iterated (which could loop
            # forever), and the re-delayed call is kept for later.
            waiting = self._delayed.pop(oid, None)
            if waiting is None:
                continue
            for call_oid, batch in waiting:
                in_obj_cols, out_obj_cols = self._find_in_out_pars(
                    batch.schema
                )
                self._read_call(
                    call_oid,
                    batch,
                    in_obj_cols,
                    out_obj_cols,
                    catch_exceptions,
                )

        # NOTE(review): this method always returns True; the documented
        # "False if timeout expired" case is not implemented here. Confirm
        # what update_obj_table yields on timeout.
        return True

    def _read_calls(self, oid: plasma.ObjectID, catch_exceptions: bool):
        """Read the calls stored in Plasma object ``oid`` and try each one.

        The object is read as an Arrow record batch stream whose schema
        metadata must name the processing function
        (``common.PROC_FUNC_META``). Every row of every batch is handed
        to :py:meth:`_read_call` as a separate single-row call.

        :param oid: Object ID of the call object
        :param catch_exceptions: Passed on to :py:meth:`_read_call`
        """
        # Receive object
        buf = self._conn.get_buffers([oid], 0)[0]
        if buf is None:
            logger.warning(
                "Failed to read calls from %s!", common.object_id_hex(oid)
            )
            return
        logger.debug(
            "Reading calls from %s (%d bytes)",
            common.object_id_hex(oid),
            len(buf),
        )

        # Read as a record batch stream
        batch_reader = pyarrow.RecordBatchStreamReader(
            pyarrow.BufferReader(buf)
        )
        metadata = batch_reader.schema.metadata
        if metadata is None or common.PROC_FUNC_META not in metadata:
            logger.warning(
                "Calls %s lack function metadata!", common.object_id_hex(oid)
            )
            return

        # Identify in+out parameters (same for all batches of the stream)
        in_obj_cols, out_obj_cols = self._find_in_out_pars(batch_reader.schema)

        # Go through batches, treating every row as one call
        num_calls = 0
        for batch in batch_reader:
            num_calls += batch.num_rows
            for row in range(batch.num_rows):
                self._read_call(
                    oid,
                    batch.slice(row, 1),
                    in_obj_cols,
                    out_obj_cols,
                    catch_exceptions,
                )

        logger.debug(
            "Received %d calls to %s in %s",
            num_calls,
            metadata[common.PROC_FUNC_META].decode("utf8"),
            common.object_id_hex(oid),
        )

    def _find_in_out_pars(
        self, schema: pyarrow.Schema
    ) -> Tuple[List[int], List[int]]:
        """Identify input and output parameters in a schema.

        :param schema: Call schema to inspect
        :return: Tuple of (input column indices, output column indices).
           Only columns of type ``common.OBJECT_ID_TYPE`` that carry
           parameter-kind metadata are considered.
        """
        in_obj_cols = []
        out_obj_cols = []
        for col in range(len(schema.names)):
            field = schema.field(col)

            # Check that field has ObjectID type and has parameter kind
            # metadata associated with it
            parkind = common.par_meta(field)
            if parkind is None or not field.type.equals(common.OBJECT_ID_TYPE):
                continue

            # Check the concrete parameter kind
            if parkind == common.PROC_PAR_META_IN:
                in_obj_cols.append(col)
            elif parkind == common.PROC_PAR_META_OUT:
                out_obj_cols.append(col)
        return in_obj_cols, out_obj_cols

    def _read_call(
        self,
        call_oid: plasma.ObjectID,
        batch: pyarrow.RecordBatch,
        in_obj_cols: List[int],
        out_obj_cols: List[int],
        catch_exceptions: bool,
    ):
        """Process a single call, or delay it until its inputs exist.

        The call is skipped if none of its output objects is missing from
        the store (i.e. it was handled already), and parked in
        ``self._delayed`` if one of its non-null input objects does not
        exist yet. Otherwise :py:meth:`_process_call` is invoked.

        :param call_oid: Object ID of the object the call was read from
        :param batch: Single-row record batch holding the call parameters
        :param in_obj_cols: Column indices of input object ID parameters
        :param out_obj_cols: Column indices of output object ID parameters
        :param catch_exceptions: Whether to log (rather than propagate)
           exceptions raised by :py:meth:`_process_call`
        """
        # Get function name
        proc_func = batch.schema.metadata[common.PROC_FUNC_META].decode("utf8")

        # Check whether some outputs are not satisfied yet (i.e. the
        # request has not been processed already).
        # NOTE(review): a call without any output parameters ends up with
        # an empty output_oids and is therefore never processed - confirm
        # this is intended.
        output_oids = []
        sealed_oids = []
        for col in out_obj_cols:
            oid = plasma.ObjectID(batch.column(col)[0].as_py())
            if self._conn.object_exists(oid):
                sealed_oids.append(oid)
            else:
                output_oids.append(oid)
        if not output_oids:
            logger.debug(
                "Outputs %s for %s already sealed, ignoring",
                sealed_oids,
                proc_func,
            )
            return

        # Check that inputs are present; null inputs are skipped
        for col in in_obj_cols:
            raw_oid = batch.column(col)[0].as_py()
            if raw_oid is None:
                continue
            oid = plasma.ObjectID(raw_oid)
            if not self._conn.object_exists(oid):
                logger.debug(
                    "Input %s for %s missing, delaying",
                    common.object_id_hex(oid),
                    proc_func,
                )
                # Retried by process() once this object shows up
                self._delayed.setdefault(oid, []).append((call_oid, batch))
                return

        # Process
        logger.debug(
            "Processing call to %s from %s",
            proc_func,
            common.object_id_hex(call_oid),
        )
        try:
            self._process_call(proc_func, batch)
        except Exception:
            if not catch_exceptions:
                raise
            logger.exception(
                "While processing call %s to %s", call_oid, proc_func
            )

    def parameter(
        self,
        batch: pyarrow.RecordBatch,
        name: str,
        typ: pyarrow.DataType = None,
        allow_null: bool = False,
    ) -> Any:
        """Extract parameter from first row of record batch

        :param batch: Record batch containing parameter
        :param name: Name of parameter to extract
        :param typ: Type to check (optional)
        :param allow_null: Value allowed to be null - will return None
        :return: The value converted to its Python interpretation
        :raises KeyError: If there is no (unique) column called ``name``
        :raises TypeError: If ``typ`` is given and does not match
        :raises ValueError: If the value is null and ``allow_null`` is unset
        """
        # Look up column name. get_field_index returns -1 for missing or
        # duplicated names, and indexing with -1 would silently read the
        # last column instead.
        col = batch.schema.get_field_index(name)
        if col < 0:
            raise KeyError(f"Call has no unique parameter {name}!")

        # Type check
        field_type = batch.schema.field(col).type
        if typ is not None and not field_type.equals(typ):
            raise TypeError(
                "Field {} has type {}, expected {}!".format(
                    name, field_type, typ
                )
            )

        # Get value, convert to Python interpretation
        val = batch[col][0].as_py()
        if val is None and not allow_null:
            raise ValueError("Field {} of type {} is null!".format(name, typ))
        return val

    def oid_parameter(
        self, batch: pyarrow.RecordBatch, name: str, allow_null: bool = False
    ) -> plasma.ObjectID:
        """Extract Object ID parameter from first row of record batch.

        :param batch: Record batch containing parameter
        :param name: Name of parameter to extract
        :param allow_null: Value allowed to be null - will return None
        :return: The object ID, or ``None`` if it is null and
           ``allow_null`` is set
        """
        oid = self.parameter(batch, name, common.OBJECT_ID_TYPE, allow_null)
        if oid is None:
            return None
        return plasma.ObjectID(oid)

    def tensor_parameters(
        self,
        batch: pyarrow.RecordBatch,
        tensor_specs: List[Tuple[str, pyarrow.DataType, List[str], bool]],
    ) -> List[pyarrow.Tensor]:
        """Read tensors referred to via object ID parameters.

        :param batch: Record batch containing parameters
        :param tensor_specs: Either list of strings or list of tuples of
           form (name, type, dimensionality, allow_null). If given, type
           and dimensionality will be checked. If allow_null is set, the
           object ID is allowed to be null, in which case None will get
           returned instead of a tensor.
        :return: One entry per spec, in the same order
        :raises ValueError: If a parameter is not marked as input, or its
           object vanished from the Plasma store
        """
        # Normalise specs into (name, (type, dims) checks, allow_null)
        specs = []
        for tensor_spec in tensor_specs:
            if isinstance(tensor_spec, tuple):
                allow_null = False if len(tensor_spec) < 4 else tensor_spec[3]
                specs.append((tensor_spec[0], tensor_spec[1:3], allow_null))
            else:
                specs.append((tensor_spec, (), False))

        # Collect object IDs
        oid_map = {}
        for name, _, allow_null in specs:
            # Get object ID, skip if None (it was null)
            oid = self.oid_parameter(batch, name, allow_null)
            if oid is None:
                continue
            oid_map[name] = oid

            # Make sure it was marked as an input parameter (because
            # of the previous check this virtually guarantees that the
            # call below will not block)
            if (
                common.par_meta(batch.schema.field(name))
                != common.PROC_PAR_META_IN
            ):
                raise ValueError(
                    "Tensor call parameter {} not marked as input!".format(
                        name
                    )
                )

        # Batch-request buffers
        bufs = self._conn.get_buffers(list(oid_map.values()), 100)
        buf_map = dict(zip(oid_map.keys(), bufs))

        # Read them all
        arrays = []
        for name, pars, _ in specs:
            # A null object ID (only accepted above when allow_null is set)
            if name not in buf_map:
                arrays.append(None)
                continue

            # Result not found in Plasma? Can happen on races when the
            # caller goes away, raise an error
            buf = buf_map[name]
            if buf is None:
                raise ValueError(
                    f"Object {oid_map[name]} for parameter {name} "
                    "vanished from Plasma store!"
                )

            # Get parameter
            arrays.append(common._tensor_from_buf(buf, name, *pars))
        return arrays

    def tensor_parameter(
        self,
        batch: pyarrow.RecordBatch,
        name: str,
        typ: pyarrow.DataType = None,
        dim_names: List[str] = None,
        allow_null: bool = False,
    ) -> pyarrow.Tensor:
        """Read tensor referred to via object ID parameter.

        :param batch: Record batch containing parameters
        :param name: Name of parameter to extract
        :param typ: Tensor value type to check (optional)
        :param dim_names: Tensor dimensionality to check (optional)
        :param allow_null: Object ID allowed to be null - will return None
        :return: The tensor (or ``None``, see ``allow_null``)
        """
        return self.tensor_parameters(
            batch, [(name, typ, dim_names, allow_null)]
        )[0]

    @abc.abstractmethod
    def _process_call(self, proc_func: str, batch: pyarrow.RecordBatch):
        """Handle a single call - to be implemented by sub-classes.

        Only invoked once the call's outputs are not yet all present and
        all of its non-null inputs exist in the store.

        :param proc_func: Name of the processing function being called
        :param batch: Single-row record batch with the call parameters
        """

    def output_tensor(self, batch, name, array, typ=None):
        """Write output tensor to storage

        Note that this is less efficient than constructing it in-place,
        which we should support at some point (TODO)

        :param batch: Record batch containing parameters
        :param name: Name of the output parameter to write
        :param array: Tensor as numpy array
        :param typ: Tensor value type
        :return: Whatever ``common._put_numpy`` returns
        :raises ValueError: If the parameter is not marked as output
        """
        # Get object ID
        out_oid = self.oid_parameter(batch, name)

        # Make sure it was marked as an output parameter
        if (
            common.par_meta(batch.schema.field(name))
            != common.PROC_PAR_META_OUT
        ):
            raise ValueError(
                "Tensor call parameter {} not marked as output!".format(name)
            )

        # Write tensor
        return common._put_numpy(
            self._conn._client, out_oid, array, typ, temporary=False
        )
class LogProcessor(BasicProcessor):
    """Processor that does nothing but log every call it receives.

    :param procs: Processing functions accepted by this processor
    :param plasma_path: Socket path of the Apache Plasma store
    :param prefix: Prefix to use for namespace
    :param name: Name to use for registering the processor in the
       store. Defaults to the class name.
    """

    def _process_call(self, proc_func: str, batch: pyarrow.RecordBatch):
        """Log ``proc_func`` together with every parameter of the call.

        Each parameter is rendered as ``name=value metadata``, where the
        metadata part is the field's metadata dict (empty if none).
        """
        schema = batch.schema
        fields = [schema.field(i) for i in range(len(schema.names))]
        descriptions = []
        for index, field in enumerate(fields):
            meta = "" if field.metadata is None else dict(field.metadata)
            descriptions.append(
                f"{field.name}={batch.column(index)[0]} {meta}"
            )
        logger.info("%s(%s)", proc_func, ",\n\t".join(descriptions))
class Processor(BasicProcessor):
    """A high-level processor interface.

    Should be subclassed for implementing a concrete processor, with
    methods for every processing function registered by the processor.

    When :py:meth:`BasicProcessor.process` is called, the processor will
    wait for a matching function call to appear in the Plasma store and
    input parameters to become available (see :py:class:`BasicProcessor`).
    All parameters are then automatically retrieved and unpacked into
    Python objects, and the method named after the processing function is
    called with them as keyword arguments.

    References to the Plasma store are represented using
    :py:class:`.connection.TensorRef` or :py:class:`.connection.TableRef`
    instances. These can be used for getting and setting input and output
    tensors and tables respectively.

    :param procs: Processing functions accepted by this processor
    :param plasma_path: Socket path of the Apache Plasma store
    :param prefix: Prefix to use for namespace.
    :param name: Name to use for registering the processor in the
       store. Defaults to the class name.
    """

    def __init__(
        self,
        procs: List[pyarrow.Schema],
        plasma_path: str,
        prefix: bytes = b"",
        name: str = None,
    ):
        # Make sure all procs are handled - checked before connecting
        for proc in procs:
            handler_name = common.call_name(proc)
            if not hasattr(self, handler_name):
                raise TypeError(
                    f"Processor {name} ({self.__class__.__name__}) is missing "
                    f"handler for {handler_name}!"
                )

        # Initialise
        super().__init__(procs, plasma_path, prefix, name)
        self._procs = {common.call_name(proc): proc for proc in procs}

    def _process_call(self, proc_func: str, batch: pyarrow.RecordBatch):
        """Handle an incoming call.

        Parameters are unpacked according to the schema registered for
        ``proc_func``: plain parameters are converted to Python values,
        null object IDs become ``None``, and tensor/table object IDs become
        :py:class:`.connection.TensorRef` /
        :py:class:`.connection.TableRef` instances. Input references are
        fetched in one batch before the handler method is called.

        :param proc_func: Name of the processing function being called
        :param batch: Record batch containing the call parameters
        :raises ValueError: If ``proc_func`` is not registered
        """
        # Get schema
        schema = self._procs.get(proc_func)
        if schema is None:
            raise ValueError("Call to unknown function: {}!".format(proc_func))

        # Gather parameters
        params = {}
        in_refs = []
        for field in [schema.field(i) for i in range(len(schema.names))]:
            # Not a reference? Just read as-is
            par_meta = common.par_meta(field)
            if par_meta is None:
                params[field.name] = self.parameter(
                    batch, field.name, field.type, field.nullable
                )
                continue
            is_input = par_meta == common.PROC_PAR_META_IN

            # Get OID - pass None if not set
            oid = self.oid_parameter(batch, field.name, field.nullable)
            if oid is None:
                params[field.name] = None
                continue

            # A tensor? Wrap with element type + dimension names
            tensor_typ = common.par_tensor_elem_type(field)
            if tensor_typ is not None:
                dim_names = common.par_tensor_dim_names(field)
                ref = connection.TensorRef(
                    self._conn, oid, tensor_typ, dim_names, is_input
                )
            else:
                # A table? Wrap with the *table's* schema (not the call
                # schema)
                table_schema = common.par_table_schema(field)
                if table_schema is None:
                    # NOTE(review): object ID parameters that are neither
                    # tensors nor tables are not passed to the handler at
                    # all - confirm this is intended.
                    continue
                ref = connection.TableRef(
                    self._conn, oid, table_schema, is_input
                )

            params[field.name] = ref
            if is_input:
                in_refs.append(ref)

        # Batch-retrieve input refs (tensors & tables)
        if in_refs:
            self._conn.get_ref_buffers(in_refs, 0)

        # Call, passing parameters as keywords
        getattr(self, proc_func)(**params)