Source code for reip.util.iters

import time
import itertools
import numpy as np


def run_loop(duration=None, rate=None, interval=None, n=None, delay=1e-5):
    loop_ = sleep_loop(delay) if delay else loop()
    return throttled(
        timed(limit(loop_, n), duration=duration),
        rate, interval, delay=delay or 0)

[docs]def loop(i=0): '''Infinite loop''' while True: yield i i += 1
def sleep_loop(delay=1e-5): for _ in loop(): yield _ time.sleep(delay)
[docs]def timed(it=None, duration=None, error=False): '''Run a loop for a predetermined amount of time.''' if isinstance(it, (float, int)): duration, it = it, loop() if it is None: it = loop() if not duration: yield from it return t0 = time.time() for x in it: if time.time() - t0 >= duration: if error: raise TimeoutError('Loop did not exit in under {} seconds.'.format(duration)) break yield x
[docs]def throttled(it=None, rate=None, interval=None, delay=1e-5, initial=None): '''Throttle a loop to take ''' if initial: time.sleep(initial) if it is None: it = loop() rate = 1. / interval if interval else rate if not rate: yield from it return t1 = time.time() for x in it: yield x t1, t0 = time.time(), t1 dt = max((1. / rate) - (t1 - t0 + 0.5 * delay), 0) if dt: time.sleep(dt) # wraps around for _ in because that may take a non-trivial amount of time # e.g. (time.sleep(1) for _ in loop()) t1 = time.time()
def thread_sleep(it, delay=1e-5): if not delay: yield from it return for _ in it: yield _ time.sleep(delay) def resample_iter(it, interval): if not interval: yield from it return t = time.time() for _ in it: if time.time() - t >= interval: t = time.time() yield _ def limit(it, n=None): if not n: yield from it return # yield only n elements for i, x in zip(range(n), it): yield x
[docs]def peakiter(it, n=1): '''Check the value first n items of an iterator without unloading them from the iterator queue.''' it = iter(it) items = [_ for _, i in zip(it, range(n))] return items, itertools.chain(items, it)
# numpy
[docs]def npgenarray(it, shape, **kw): '''Create a np.ndarray from a generator. Must specify at least the length of the generator or the entire shape of the final array.''' if isinstance(shape, int): (x0,), it = peakiter(it) shape = (shape,) + x0.shape X = np.empty(shape, **kw) for i, x in enumerate(it): X[i] = x return X
def npmaparray(func, X): return npgenarray((func(x) for x in X), len(X)) # if __name__ == '__main__': # for _ in timed(throttled(loop(), 1), 10): # ...