Source code for ska_sdp_dal.store

import logging
from typing import List, Mapping, Optional, Union

import numpy
import pyarrow
import pyarrow.plasma as plasma

from . import common, connection

logger = logging.getLogger(__name__)


class Store:
    """
    A storage namespace within a Plasma store

    Used for holding shared data objects, such as tensors and
    tables. These can be passed to processors.
    """

    def __init__(
        self,
        plasma_path: str,
        max_attempts: int = 10000,
        name: Optional[str] = None,
    ):
        """Connect to Plasma and reserve a namespace for this store

        :param plasma_path: Passed to :py:class:`connection.Connection`
            to establish the Plasma connection
        :param max_attempts: Maximum number of candidate object IDs
            tried per allocation before giving up
        :param name: Name used when reserving the namespace. Defaults
            to the name of the instance's class.
        """

        # Connect to Plasma
        self._conn = connection.Connection(plasma_path)

        # Reserve namespace
        if name is None:
            name = self.__class__.__name__
        self._prefix, self._root = self._conn.reserve_namespace(name, [])
        logger.info(
            "Store using prefix %s for objects",
            common.object_id_hex(self._prefix),
        )

        # Initialise. The first generated ID is deliberately discarded
        # (presumably it is already taken by the namespace root -
        # TODO confirm against common.objectid_generator).
        self._oid_generator = common.objectid_generator(self._prefix)
        next(self._oid_generator)
        self._max_attempts = max_attempts

    @property
    def conn(self) -> connection.Connection:
        """The connection this store uses to talk to Plasma"""
        return self._conn

    def make_tensor_ref(
        self,
        oid: plasma.ObjectID,
        typ: Optional[pyarrow.DataType] = None,
        dim_names: Optional[List[str]] = None,
    ) -> connection.TensorRef:
        """Create a TensorRef object for an existing object in Plasma

        :param oid: Existing object ID
        :param typ: Element datatype. If `ComplexType`, will convert.
        :param dim_names: Dimension names
        :returns: Reference to tensor
        """
        return connection.TensorRef(self._conn, oid, typ, dim_names)

    def _allocate_oid(self) -> plasma.ObjectID:
        """Allocate a new free Object ID in Plasma

        :returns: New unused OID
        :raises RuntimeError: If no unused ID was found within
            ``max_attempts`` candidates
        """

        for _ in range(self._max_attempts):

            # Create a new OID
            oid = next(self._oid_generator)

            # Check whether it is free. Note this is not race-safe -
            # we *are* basically assuming that there's no two
            # processes using the same namespace for storage at the
            # same time. However this might help when recovering from
            # a crash.
            if not self._conn.object_exists(oid):
                return plasma.ObjectID(oid)

        raise RuntimeError(
            "Maximum number of retries reached while "
            "attempting to find unused object ID!"
        )

    def new_tensor_ref(
        self,
        typ: Optional[pyarrow.DataType] = None,
        dim_names: Optional[List[str]] = None,
    ) -> connection.TensorRef:
        """Allocate an Object ID for a new tensor in Plasma

        :param typ: Element datatype. If `ComplexType`, will convert.
        :param dim_names: Dimension names
        :returns: Reference to tensor
        """
        return self.make_tensor_ref(self._allocate_oid(), typ, dim_names)

    def put_new_tensor(
        self,
        arr: numpy.ndarray,
        typ: Optional[pyarrow.DataType] = None,
        dim_names: Optional[List[str]] = None,
    ) -> connection.TensorRef:
        """Allocate and create a new tensor in Plasma

        :param arr: Data as numpy array
        :param typ: Element datatype. If ComplexType, will convert.
        :param dim_names: Dimension names
        :returns: Reference to tensor
        """

        # Allocate in Plasma
        ref = self.new_tensor_ref(typ, dim_names)

        # Put, return
        ref.put(arr)
        return ref

    def new_table_ref(
        self, schema: Optional[pyarrow.Schema] = None
    ) -> connection.TableRef:
        """Allocate an Object ID for a new table in Plasma

        :param schema: Table schema
        :returns: Reference to table
        """
        return connection.TableRef(self._conn, self._allocate_oid(), schema)

    def put_new_table(
        self,
        table: Union[
            pyarrow.Table,
            Mapping[str, pyarrow.ChunkedArray],
            Mapping[str, pyarrow.Array],
            Mapping[str, list],
        ],
        schema: Optional[pyarrow.Schema] = None,
    ) -> connection.TableRef:
        """Allocate and create a new table in Plasma

        See :py:meth:`connection.TableRef.put()` for notes about
        possible parameters.

        :param table: Table data
        :param schema: Table schema
        :returns: Reference to table
        """

        # Allocate in Plasma
        ref = self.new_table_ref(schema)

        # Put, return
        ref.put(table)
        return ref