import os
import time
import numpy as np
import pyarrow as pa
import pyarrow.plasma as plasma
from .base import BaseStore
from .pointers import SharedPointer
from . import queue as q_
__all__ = ['PlasmaStore', 'ArrowQueue']
def get_plasma_path():
    '''Return the default plasma socket path.

    The path is ``<TMPDIR>/plasma``; when the ``TMPDIR`` environment variable
    is unset or empty, ``/tmp`` is used as the directory instead.
    '''
    tmp_root = os.getenv('TMPDIR')
    if not tmp_root:
        tmp_root = '/tmp'
    return os.path.join(tmp_root, 'plasma')
# def start_plasma(plasma_socket=None, plasma_store_memory=1e9):
# import subprocess
# command = [
# 'plasma_store',
# '-s', plasma_socket or get_plasma_path(),
# '-m', str(int(plasma_store_memory))]
# print('Starting plasma server...', command)
# proc = subprocess.Popen(command, stdout=None, stderr=None, shell=False)
# time.sleep(0.5)
# return proc
#
#
# def connect_to_plasma(plasma_socket=None, **kw):
# plasma_socket = plasma_socket or get_plasma_path()
# client = None #plasma.connect(plasma_dir)
# if client is None:
# # if getattr(connect_to_plasma, 'proc', None) is None:
# # connect_to_plasma.proc = start_plasma(plasma_socket, **kw)
#
# client = plasma.connect(plasma_socket)
# return client
def random_object_id():
    '''Build a ``plasma.ObjectID`` from 20 random bytes.'''
    raw_id = np.random.bytes(20)
    return plasma.ObjectID(raw_id)
def random_unique_object_id(existing):
    '''Draw a random object id that is not already in ``existing``.

    Arguments:
        existing (set): ids that are already taken. The newly drawn id is
            added to this set in place before it is returned.

    Returns:
        The new object id.
    '''
    candidate = random_object_id()
    # Redraw until the candidate does not collide with a known id.
    while candidate in existing:
        candidate = random_object_id()
    existing.add(candidate)
    return candidate
def n_random_unique_object_ids(client, size=10):
    '''Generate ``size`` random object ids that are distinct from each other
    and from everything ``client.list()`` currently reports.

    Arguments:
        client: plasma client; only its ``list()`` method is used.
        size (int): how many ids to generate.

    Returns:
        list: the generated object ids, in generation order.
    '''
    taken = set(client.list())
    fresh_ids = []
    for _ in range(size):
        # Each call also records the new id in ``taken``.
        fresh_ids.append(random_unique_object_id(taken))
    return fresh_ids
class PlasmaStore(BaseStore):
    '''Store that puts data in plasma store.

    Requires running a plasma store process.

    e.g.

    .. code-block:: bash

        plasma_store -s $TMPDIR/plasma -m 2000000000

    The store pre-generates ``size`` random object ids. ``put``, ``get`` and
    ``delete`` address objects by integer index into that list, not by raw
    plasma object id.

    Arguments:
        size (int): number of object ids (slots) to pre-generate.
        plasma_socket (str, optional): path of the plasma store socket.
            Defaults to ``get_plasma_path()``.
    '''
    Pointer = SharedPointer

    # Class-level default; set to True on the instance once the first ``get``
    # has reconnected the client.
    _refreshed = False

    def __init__(self, size, plasma_socket=None):
        self._plasma_socket_name = plasma_socket or get_plasma_path()
        self.client = plasma.connect(self._plasma_socket_name)

        # First write can be very slow on platform like Jetson Nano, so do a
        # throwaway put/get round trip up front and report how long it took.
        print("Connected to %s. Warming up..." % self._plasma_socket_name)
        t0 = time.time()
        ret = self.client.get(self.client.put("warm-up"))
        assert (ret == "warm-up")
        print("Warmed up in %.4f sec\n" % (time.time()- t0))

        self.ids = n_random_unique_object_ids(self.client, size)
        self.size = size

    def refresh(self):  # TODO: HOW TO CALL THIS WHEN FORKING ???
        '''Replace ``self.client`` with a new connection to the same socket.'''
        self.client = plasma.connect(self._plasma_socket_name)

    def __len__(self):
        '''Return the number of entries reported by ``client.list()``.

        NOTE(review): this counts whatever the plasma client lists, which is
        not restricted to ``self.ids`` - confirm that is intended.
        '''
        return len(self.client.list())

    def put(self, data, meta=None, id=None):
        '''Write ``data`` and ``meta`` under the object id at index ``id``.

        Arguments:
            data: payload, handed to ``save_both``.
            meta (dict, optional): metadata; an empty dict is used when falsy.
            id (int): index into ``self.ids``. The default ``None`` is not a
                valid list index, so an index must be supplied.

        Returns:
            The plasma object id returned by ``save_both``.
        '''
        return save_both(self.client, data, meta or {}, id=self.ids[id])

    def get(self, id):
        '''Read back the ``(data, meta)`` pair stored at index ``id``.

        The first call on an instance reconnects the client before reading.
        NOTE(review): presumably so that a forked process gets its own
        connection (see the TODO on ``refresh``) - confirm.
        '''
        if not self._refreshed:
            self.refresh()
            self._refreshed = True
        return load_both(self.client, self.ids[id])

    def delete(self, ids):
        '''Delete the objects stored at each index in ``ids``.'''
        self.client.delete([self.ids[id] for id in ids])
# Metadata key under which save_both records how the payload was encoded
# ("void", "string" or "array"); load_both pops it off again before returning
# the metadata.
TYPE = '__PLASMA_SERIALIZE_DTYPE__'
def save_both(client, data, meta=None, id=None):
    '''Store ``data`` together with ``meta`` as a single plasma object.

    How the payload is encoded depends on the type of ``data`` and is
    recorded in the stored metadata under the ``TYPE`` key:

    - ``None``: zero-byte object, tagged ``"void"``.
    - ``str``: UTF-8 bytes, tagged ``"string"``.
    - ``np.ndarray``: written as an arrow tensor, tagged ``"array"``.
    - anything else: ``pa.serialize`` buffer, no tag.

    Arguments:
        client: plasma client used for ``create`` and ``seal``.
        data: the payload to store.
        meta (dict, optional): metadata. It is copied, so the caller's dict
            is never modified.
        id (optional): object id to write to; a random id is generated when
            this is falsy.

    Returns:
        The object id the data was sealed under.
    '''
    header = dict(meta) if meta else {}  # private copy of the caller's dict

    payload = None  # raw bytes / arrow buffer to copy in verbatim
    tensor = None   # arrow tensor, set only for numpy arrays
    if data is None:
        header[TYPE] = "void"
        nbytes = 0
    elif isinstance(data, str):
        header[TYPE] = "string"
        payload = data.encode("utf-8")
        nbytes = len(payload)
    elif isinstance(data, np.ndarray):
        header[TYPE] = "array"
        tensor = pa.Tensor.from_numpy(data)
        nbytes = pa.ipc.get_tensor_size(tensor)
    else:
        payload = pa.serialize(data).to_buffer()
        nbytes = len(payload)

    packed_meta = pa.serialize(header).to_buffer().to_pybytes()

    object_id = id or random_object_id()
    target = client.create(object_id, nbytes, metadata=packed_meta)
    writer = pa.FixedSizeBufferWriter(target)
    if payload is not None:
        writer.write(payload)
    elif tensor is not None:
        writer.set_memcopy_threads(4)
        pa.ipc.write_tensor(tensor, writer)
    # The "void" case writes nothing; the object is sealed empty.
    client.seal(object_id)
    return object_id
def load_both(client, id):
    '''Load the ``(data, meta)`` pair stored under object id ``id``.

    The stored metadata is deserialized and its ``TYPE`` entry (if present)
    is removed and used to choose how the payload is decoded:

    - ``"void"``: data is ``None``.
    - ``"string"``: payload bytes decoded as UTF-8.
    - ``"array"``: payload read as an arrow tensor and converted to numpy.
    - otherwise: payload passed to ``pa.deserialize``.

    Arguments:
        client: plasma client; ``get_buffers`` is called with a 1 ms timeout.
        id: the plasma object id to read.

    Returns:
        tuple: ``(data, meta)`` with the ``TYPE`` key stripped from ``meta``.

    NOTE(review): what happens when the object is not available within the
    timeout is not handled here - confirm what ``get_buffers`` returns then.
    '''
    fetched = client.get_buffers([id], timeout_ms=1, with_meta=True)
    raw_meta, payload = fetched[0]

    info = pa.deserialize(raw_meta)
    kind = info.pop(TYPE, None)

    if kind == "void":
        return None, info
    if kind == "string":
        return payload.to_pybytes().decode("utf-8"), info
    if kind == "array":
        tensor = pa.ipc.read_tensor(pa.BufferReader(payload))
        return tensor.to_numpy(), info
    return pa.deserialize(payload), info
class pyarrow_serializer:
    '''Serializer object exposing ``loads``/``dumps`` backed by pyarrow.'''

    def loads(self, obj):
        '''Deserialize ``obj`` with ``pa.deserialize`` and return the result.'''
        value = pa.deserialize(obj)
        return value

    def dumps(self, obj):
        '''Serialize ``obj`` with ``pa.serialize`` and return it as a buffer.'''
        serialized = pa.serialize(obj)
        return serialized.to_buffer()
# Register a pyarrow_serializer instance with the queue module under the
# name 'arrow' (runs once, at import time).
q_.register_serializer(pyarrow_serializer(), 'arrow')
class ArrowQueue(q_.Queue):
    '''Queue whose ``serializer`` attribute is set to ``'arrow'``.

    NOTE(review): presumably ``q_.Queue`` looks its serializer up by this
    name among those passed to ``q_.register_serializer`` - confirm in the
    queue module.
    '''
    serializer = 'arrow'
#
#
# def test():
# print(get_plasma_path())
# client = plasma.connect(get_plasma_path())
# print(client.store_capacity())
# print(client.list())
# client.delete(client.list())
# print(client.list())
#
# print("Generating...")
# data = np.ones(1 * 10 ** 4, dtype=np.uint8)
# # data = "Hello"
# # data = None
# print("Done")
#
# # id = save_data(client, data)
# # data2 = load_data(client, id)
# # print(data2)
# # client.delete([id])
#
# meta = {"Foo": 10}
#
# # id2 = save_meta(client, meta)
# # meta2 = load_meta(client, id2)
# # print(meta2)
#
# id3 = save_both(client, data, meta)
# data3, meta3 = load_both(client, id3)
# print(data3, meta3)
#
#
# def test2():
# store = Store()
# for i in range(5):
# print('-'*10, i)
# data = np.ones(1 * 10 ** 4, dtype=np.uint8)
# meta = {"Foo": 10}
#
# id = store.put(data, meta)
# data2, meta2 = store.get(id)
# print(id, data2, meta2)
#
#
# if __name__ == '__main__':
# # test()
# test2()
# exit(0)
#
# client = plasma.connect(get_plasma_path())
# print(client.store_capacity())
# print(client.list())
# client.delete(client.list())
# print(client.list())
# client.evict(10000000000)
#
# object_id = plasma.ObjectID(20 * b"a")
# object_size = 1000
# print(object_id)
# if client.contains(object_id):
# client.delete([object_id])
#
# buffer = memoryview(client.create(object_id, object_size))
#
# for i in range(1000):
# buffer[i] = i % 128
# client.seal(object_id)
#
# print("Generating...")
# data = np.ones(10**9)
# print("Done")
# tensor = pa.Tensor.from_numpy(data)
#
# random_id = plasma.ObjectID(np.random.bytes(20))
# sz = pa.ipc.get_tensor_size(tensor)
# print("sz", sz)
# buf = client.create(random_id, sz)
#
# stream = pa.FixedSizeBufferWriter(buf)
# stream.set_memcopy_threads(6)
# a = time.time()
# print("Writing...")
# pa.ipc.write_tensor(tensor, stream)
# print("Writing took ", time.time() - a)
# client.seal(random_id)
#
# #################################################################
# client2 = plasma.connect(get_plasma_path())
#
# t0 = time.time()
# print("Reading...")
# [buf2] = client.get_buffers([random_id])
# reader = pa.BufferReader(buf2)
# tensor2 = pa.ipc.read_tensor(reader)
# array = tensor2.to_numpy()
# # array[10] = 0
# # array2 = np.copy(array)
# # array2 = copy.deepcopy(array)
# # array2[10] = 0
# print("Reading took ", time.time() - t0)
#
# object_id2 = plasma.ObjectID(20 * b"a")
# [buffer2] = client2.get_buffers([object_id2], timeout_ms=1)
# view2 = memoryview(buffer2)
# for i in range(1000):
# if buffer[i] != view2[i]:
# print("Mismatch", i)
#
# meta = {"shape": (10, 20), "timestamps": [0, 1, 2], "other": 1.2}
#
# auto_id = client.put(meta)
# print(auto_id)
# got = client.get(auto_id, timeout_ms=1)
# print(got)