Source code for reip.interface

import time
import reip


def skip_strategy(source):
    '''Discard up to ``source.skip`` buffers, then return the next one.

    Each discarded buffer advances ``source._skip_id`` and ``source.skipped``.
    The discard count is kept on the source between calls, so the quota can
    be met over several calls when the source runs dry part-way through.

    Arguments:
        source: object exposing ``empty()``, ``next()``, ``_get()`` and the
            attributes ``skip``, ``_skip_id`` and ``skipped``.

    Returns:
        The buffer from ``source._get()`` once the skip quota is met and the
        source is not empty, otherwise None.
    '''
    # Throw buffers away until the quota is met or nothing is left.
    while not source.empty() and source._skip_id < source.skip:
        source.next()  # discard the buffer
        source._skip_id += 1
        source.skipped += 1

    # ``>=`` instead of ``==``: if ``source.skip`` is lowered while
    # ``_skip_id`` is already above it, an equality test can never match
    # again (the loop above no longer runs), and the source would stall.
    if source._skip_id >= source.skip and not source.empty():
        buffer = source._get()
        source._skip_id = 0  # might not process this buffer in multi_source block
        return buffer
    return None


def latest_strategy(source):
    '''Drop buffers until ``source.last()`` is true, then return ``source._get()``.

    Every dropped buffer is counted in ``source.skipped``.
    '''
    while True:
        if source.last():
            return source._get()
        # Not the last one yet: discard it and keep count.
        source.next()
        source.skipped += 1


def all_strategy(source):
    '''Return ``source._get()`` without discarding anything, or None if the source is empty.'''
    if source.empty():
        return None
    return source._get()


class Sink:
    '''An abstract interface representing a place to put data.

    Essentially, a put-only queue. Subclasses are expected to implement
    ``__len__``, ``full``, ``_put`` and ``gen_source``; the base versions
    raise NotImplementedError.

    Attributes:
        dropped: number of buffers discarded by ``put_nowait`` because the
            queue was full.
        _full_delay: seconds to sleep between ``full()`` polls in ``wait``.
    '''

    def __init__(self):
        self.dropped = 0
        self._full_delay = 1e-6

    def spawn(self):
        '''An optional method that allows a sink to perform initialization.'''
        pass

    def join(self):
        '''An optional method that allows a sink to perform cleanup.'''
        pass

    def __len__(self):
        '''How many elements are in the queue.'''
        raise NotImplementedError

    def full(self):
        '''Is the queue full?'''
        raise NotImplementedError

    def _put(self, buffer):
        '''The overrideable method to put items in the queue.'''
        raise NotImplementedError

    def wait(self, timeout=None):
        '''Wait until the queue has space for another element.

        Polls ``full()``, sleeping ``_full_delay`` seconds between checks.

        Arguments:
            timeout: maximum number of seconds to wait. A falsy value
                (None or 0) waits indefinitely.

        Returns:
            None, both when space became available and when the timeout
            expired.
        '''
        # Monotonic clock: elapsed time must not be affected by system
        # clock adjustments, which could end the wait early or never.
        t0 = time.monotonic()
        while self.full():
            time.sleep(self._full_delay)
            if timeout and time.monotonic() - t0 > timeout:
                return None  # QUESTION: TimeoutError instead?
        return None

    def put(self, buffer, block=True, timeout=None):
        '''Put an element in the sink.

        Arguments:
            buffer: the element to store.
            block: if true, first wait (up to ``timeout``) for free space.
            timeout: forwarded to ``wait`` when ``block`` is true.

        If the queue is still full after waiting, the element is dropped
        by ``put_nowait``.
        '''
        if block:
            self.wait(timeout)
        return self.put_nowait(buffer)

    def put_nowait(self, buffer):
        '''Put an element in the sink, dropping the value if the queue is full.'''
        if self.full():
            self.dropped += 1
        else:
            self._put(buffer)

    def gen_source(self, **kw):
        '''Abstract; must be overridden. Keyword arguments are subclass-defined.'''
        raise NotImplementedError
class Source:
    '''An abstract interface representing a place to get data.

    Essentially, a get-only queue. Subclasses are expected to implement
    ``__len__``, ``next`` and ``_get``; the base versions raise
    NotImplementedError.

    The retrieval strategy decides which buffer ``get_nowait`` returns when
    elements are available; it is looked up by name in ``strategies``.

    Attributes:
        skip: number of buffers the ``Skip`` strategy discards before
            returning one.
        skipped: running count of buffers discarded by the strategies.
        _skip_id: discard counter used by the ``Skip`` strategy.
        _empty_delay: seconds to sleep between ``empty()`` polls in ``wait``.
        _default: callable producing a fallback value when the queue has no
            elements, or None when no default was given.
    '''
    # TODO: make it easier for someone to register a new strategy
    #       e.g.:
    #       @Source.register
    #       def SkipStochastic(source): ...
    All = 'all'
    Latest = 'latest'
    Skip = 'skip'
    strategies = {
        All: all_strategy,
        Latest: latest_strategy,
        Skip: skip_strategy,
    }
    _strategy = All

    def __init__(self, strategy=All, skip=0, default=None):
        '''
        Arguments:
            strategy: key into ``strategies``; validated by the property setter.
            skip: stored as ``self.skip``.
            default: if not None, passed through ``reip.util.as_func`` and
                called by ``get_nowait`` when the queue has no elements
                (presumably as_func wraps plain values in a callable -
                TODO confirm).
        '''
        self.strategy = strategy
        self.skip = skip
        self._skip_id = 0
        self.skipped = 0
        self._empty_delay = 1e-6
        self._default = reip.util.as_func(default) if default is not None else default

    @property
    def strategy(self):
        '''Name of the active retrieval strategy.'''
        return self._strategy

    @strategy.setter
    def strategy(self, strategy):
        if strategy not in self.strategies:
            raise ValueError("Unknown strategy '{}'".format(strategy))
        self._strategy = strategy
        # Stored as a plain function on the instance, so it is called with
        # the source passed explicitly (see get_nowait).
        self._strategy_get = self.strategies[strategy]

    def __len__(self):
        '''How many elements are in the Source queue.'''
        raise NotImplementedError

    def empty(self):
        '''Is the Source queue empty?

        A source with a default value is never considered empty.
        '''
        return not len(self) and self._default is None

    def last(self):
        '''Returns True if there is only one element left.

        Also True when there are no elements but a default value is set.
        '''
        ln = len(self)
        return ln == 1 or (not ln and self._default is not None)

    def next(self):
        '''Abstract; must be overridden. The strategies call it to discard a buffer.'''
        raise NotImplementedError

    def _get(self):
        '''Abstract; must be overridden. The strategies call it to fetch a buffer.'''
        raise NotImplementedError

    def wait(self, timeout=None):
        '''Wait until an element is ready.

        Polls ``empty()``, sleeping ``_empty_delay`` seconds between checks.

        Arguments:
            timeout: maximum number of seconds to wait. A falsy value
                (None or 0) waits indefinitely.

        Returns:
            None, both when an element became ready and when the timeout
            expired.
        '''
        # Monotonic clock: elapsed time must not be affected by system
        # clock adjustments, which could end the wait early or never.
        t0 = time.monotonic()
        while self.empty():
            time.sleep(self._empty_delay)
            if timeout and time.monotonic() - t0 > timeout:
                return None  # QUESTION: TimeoutError instead?
        return None

    def get(self, block=True, timeout=None):
        '''Get the next element in the queue.

        Arguments:
            block: if true, first wait (up to ``timeout``) for an element.
            timeout: forwarded to ``wait`` when ``block`` is true.
        '''
        if block:
            self.wait(timeout)
        return self.get_nowait()

    def get_nowait(self):
        '''Get the next element in the queue, returning None if no value is available.

        When the queue has no elements and a default was given, the result
        of calling the default is returned instead of None.
        '''
        if len(self) == 0:
            if self._default is not None:
                return self._default()
            return None
        return self._strategy_get(self)