Source code for reip.stores.producer

import reip
import multiprocessing as mp
from reip.stores import Store, PlasmaStore, QueueStore, Pointer, Counter, HAS_PYARROW


[docs]class Producer(reip.Sink): def __init__(self, size=100, delete_rate=10, task_id=reip.UNSET, skip_no_readers=True, **kw): self.size = size + 1 # need extra slot because head == tail means empty self.delete_rate = delete_rate self.task_id = task_id self.stores = {} self.head = Pointer(self.size) self.tail = Pointer(self.size) self.skip_no_readers = skip_no_readers # self._dropped = Counter() self.readers = [] super().__init__(**kw) def __str__(self): return '<{} \n head={}\n tail={}\n readers={}\n stores={}>'.format( self.__class__.__name__, self.head, self.tail, [str(p) for p in self.readers], [str(s) for s in self.stores.values()])
[docs] def spawn(self): for store in self.stores.values(): if hasattr(store, 'spawn'): store.spawn()
[docs] def join(self): for store in self.stores.values(): if hasattr(store, 'join'): store.join()
def __len__(self): return self.head.counter - min( (r.counter for r in self.readers), default=self.tail.counter)
[docs] def full(self): self._trim() return (self.head.counter - self.tail.counter) >= (self.size - 1)
# @property # def dropped(self): # return self._dropped.value # # @dropped.setter # def dropped(self, value): # self._dropped.value = value def _trim(self): '''Delete any stale items from the queue.''' if self.readers: # get the number of stale entries new_value = min( (r.counter for r in self.readers), default=self.tail.counter) to_delete = [ v % self.size for v in range(self.tail.counter, new_value)] # delete items if there's enough if len(to_delete) > self.size / self.delete_rate: # print("Trimming", len(to_delete)) for store in self.stores.values(): store.delete(to_delete) self.tail.counter = new_value def _put(self, buffer): '''Send data to stores.''' if self.skip_no_readers and not self.readers: return data, meta = buffer meta = meta or {} for store in self.stores.values(): store.put(data, meta, id=self.head.pos) self.head.counter += 1
[docs] def gen_source(self, task_id=reip.UNSET, throughput='small', **kw): '''Generate a source from this sink. Arguments: task_id (str): the identifier for the task. throughput (str): The size of data. This determines the serialization method to use. Should be: - 'small' for data < 1GB - 'medium' for data < 3GB - 'large' for data > 3GB **kwargs: arguments to pass to `store.Customer`. ''' # create the store if it doesn't exist already same_context = ( reip.UNSET.check(task_id) or reip.UNSET.check(self.task_id) or task_id == self.task_id) store_id = same_context, throughput if store_id not in self.stores: # use True/False as store keys if same_context: store = Store(self.size) else: if HAS_PYARROW and throughput == 'large': store = PlasmaStore(self.size) else: store = QueueStore(self.size) if HAS_PYARROW and throughput == 'medium': kw.setdefault('serializer', 'arrow') # convert pointers to shared pointers self.head = self.head.as_shared() self.tail = self.tail.as_shared() # self._dropped = self._dropped.as_shared() self.stores[store_id] = store # create a customer and a pointer. store = self.stores[store_id] self.readers.append(store.Pointer(self.size, self.tail.counter)) return store.Customer(self, len(self.readers) - 1, store_id=store_id, **kw)