import logging
from typing import List, Mapping, Optional, Union

import numpy
import pyarrow
import pyarrow.plasma as plasma

from . import common, connection
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
class Store:
    """A storage namespace within a Plasma store.

    Used for holding shared data objects, such as tensors and tables,
    which can then be passed to processors. On construction the store
    reserves a namespace (object ID prefix) through its connection, and
    all object IDs it allocates are generated under that prefix.
    """

    def __init__(
        self,
        plasma_path: str,
        max_attempts: int = 10000,
        name: Optional[str] = None,
    ):
        """Connect to Plasma and reserve a namespace for this store.

        :param plasma_path: Path to the Plasma store, passed to
           :py:class:`connection.Connection`
        :param max_attempts: Maximum number of candidate object IDs to
           try when allocating a new one before giving up
        :param name: Name under which the namespace is reserved. Defaults
           to the name of this object's class.
        """
        # Connect to Plasma
        self._conn = connection.Connection(plasma_path)

        # Reserve namespace; default the name to the (possibly derived)
        # class name.
        if name is None:
            name = self.__class__.__name__
        self._prefix, self._root = self._conn.reserve_namespace(name, [])
        logger.info(
            "Store using prefix %s for objects",
            common.object_id_hex(self._prefix),
        )

        # Initialise the object ID generator for this namespace. The first
        # generated ID is discarded -- presumably because it is taken by
        # the namespace root object; TODO confirm against
        # common.objectid_generator / reserve_namespace.
        self._oid_generator = common.objectid_generator(self._prefix)
        next(self._oid_generator)
        self._max_attempts = max_attempts

    @property
    def conn(self) -> connection.Connection:
        """Connection to the Plasma store used by this store."""
        return self._conn

    def make_tensor_ref(
        self,
        oid: plasma.ObjectID,
        typ: Optional[pyarrow.DataType] = None,
        dim_names: Optional[List[str]] = None,
    ) -> connection.TensorRef:
        """Create a TensorRef object for an existing object in Plasma

        :param oid: Existing object ID
        :param typ: Element datatype. If `ComplexType`, will convert.
        :param dim_names: Dimension names
        :returns: Reference to tensor
        """
        return connection.TensorRef(self._conn, oid, typ, dim_names)

    def _allocate_oid(self) -> plasma.ObjectID:
        """Allocate a new free Object ID in Plasma

        Draws up to ``max_attempts`` candidate IDs from this store's
        generator and returns the first one not already present in Plasma.

        :returns: New unused OID
        :raises RuntimeError: If no unused ID was found within
           ``max_attempts`` tries
        """
        for _ in range(self._max_attempts):
            # Create a new OID
            oid = next(self._oid_generator)
            # Check whether it is free. Note this is not race-safe -
            # we *are* basically assuming that there's no two
            # processes using the same namespace for storage at the
            # same time. However this might help when recovering from
            # a crash.
            if not self._conn.object_exists(oid):
                return plasma.ObjectID(oid)
        raise RuntimeError(
            "Maximum number of retries reached while "
            "attempting to find unused object ID!"
        )

    def new_tensor_ref(
        self,
        typ: Optional[pyarrow.DataType] = None,
        dim_names: Optional[List[str]] = None,
    ) -> connection.TensorRef:
        """Allocate an Object ID for a new tensor in Plasma

        :param typ: Element datatype. If `ComplexType`, will convert.
        :param dim_names: Dimension names
        :returns: Reference to tensor
        :raises RuntimeError: If no unused object ID could be found
        """
        return self.make_tensor_ref(self._allocate_oid(), typ, dim_names)

    def put_new_tensor(
        self,
        arr: numpy.ndarray,
        typ: Optional[pyarrow.DataType] = None,
        dim_names: Optional[List[str]] = None,
    ) -> connection.TensorRef:
        """Allocate and create a new tensor in Plasma

        :param arr: Data as numpy array
        :param typ: Element datatype. If ComplexType, will convert.
        :param dim_names: Dimension names
        :returns: Reference to tensor
        """
        # Allocate in Plasma
        ref = self.new_tensor_ref(typ, dim_names)
        # Put, return
        ref.put(arr)
        return ref

    def new_table_ref(
        self, schema: Optional[pyarrow.Schema] = None
    ) -> connection.TableRef:
        """Allocate an Object ID for a new table in Plasma

        :param schema: Table schema
        :returns: Reference to table
        :raises RuntimeError: If no unused object ID could be found
        """
        return connection.TableRef(self._conn, self._allocate_oid(), schema)

    def put_new_table(
        self,
        table: Union[
            pyarrow.Table,
            Mapping[str, pyarrow.ChunkedArray],
            Mapping[str, pyarrow.Array],
            Mapping[str, list],
        ],
        schema: Optional[pyarrow.Schema] = None,
    ) -> connection.TableRef:
        """Allocate and create a new table in Plasma

        See :py:meth:`connection.TableRef.put()` for notes about
        possible parameters.

        :param table: Table data
        :param schema: Table schema
        :returns: Reference to table
        """
        # Allocate in Plasma
        ref = self.new_table_ref(schema)
        # Put, return
        ref.put(table)
        return ref