import time
import threading
import warnings
from contextlib import contextmanager
import remoteobj
import reip
from reip.stores import Producer
from reip.util import text, Meta
'''
'''
__all__ = ['Block']
class _BlockSinkView:
'''This is so that we can select a subview of a block's sinks. This is similar
in principal to numpy views. The alternative is to store the state on the original
object, but that would have unintended consequences if someone tried to create two
different views from the same block.
'''
def __init__(self, block, idx):
self._block = block
self._index = idx
@property
def item(self): # can be a single sink, or multiple i.e. block[0] or block[:2]
return self._block.sinks[self._index]
@property
def items(self): # always a list
return reip.util.as_list(self.item)
def __iter__(self): # iterate over sink
return iter(self.items)
def __getitem__(self, key): # get sub index
return self.items[key]
def to(self, *others, squeeze=True, **kw):
'''Connect this blocks sinks to other blocks' sources.
.. code-block:: python
InputBlock().to(ProcessBlock())
'''
outs = [other(self.items, **kw) for other in others]
return outs[0] if squeeze and len(outs) == 1 else outs
def output_stream(self, strategy='all', source_strategy=all, **kw):
'''Create a Stream iterator which will let you iterate over the
outputs of a block.'''
return reip.Stream([s.gen_source(strategy=strategy, **kw) for s in self.items], strategy=source_strategy, **kw)
[docs]class Block:
'''This is the base instance of a block.
Arguments:
queue (int): the maximum queue length.
n_inputs (int, or None): the number of sources to expect. If None,
it will accept a variable number of sources.
n_outputs (int): the number of sinks to expect.
- If `n_outputs=0`, then there will be a single sink, that outputs no
buffer value, but does output metadata.
e.g.
```python
# block 1
return {'just some metadata': True}
# or
return [], {'just some metadata': True}
# block 2
def process(self, meta): # no buffer value
assert meta['just some metadata'] == True
```
- If `n_outputs=None`, then the block will have no sinks.
blocking (bool): should the block wait for sinks to have a free space
before processing more items?
max_rate (int): the maximum number of iterations per second. If the block
processes items any faster, it will throttle and sleep the required
amount of time to meet that requirement.
e.g. if max_rate=5, then an iteration will take at minimum 0.2s.
graph (reip.Graph, reip.Task, or False): the graph instance to be added to.
name ()
'''
_thread = None
_stream = None
_delay = 1e-4
parent_id, task_id = None, None
started = ready = done = closed = _terminated = False
processed = 0
controlling = False
max_rate = None
[docs] def __init__(self, n_inputs=1, n_outputs=1, queue=100, blocking=False, print_summary=True,
max_rate=None, min_interval=None, max_processed=None, graph=None, name=None,
source_strategy=all, extra_kw=True, extra_meta=None, log_level=None,
handlers=None, modifiers=None, input_modifiers=None, **kw):
self._except = remoteobj.LocalExcept(raises=True)
self.name = name or f'{self.__class__.__name__}_{id(self)}'
self.parent_id, self.task_id = reip.Graph.register_instance(self, graph)
self._print_summary = print_summary
# sources and sinks
# by default, a block takes a variable number of sources.
# If a block has a fixed number of sources, you can specify that
# and it will throw an exception when spawning if the number of
# sources does not match.
self._block_sink_queue_length = queue # needed for set sink count
self.n_expected_sources = n_inputs
self.sources, self.sinks = [], []
self.set_block_source_count(n_inputs)
self.set_block_sink_count(n_outputs)
# used in Stream class. Can be all(), any() e.g. source_strategy=all
self._source_strategy = source_strategy
# handlers are wrapper functions that can do things like catch errors and restart when a block is closing.
self.handlers = reip.util.as_list(handlers or [])
# modifiers are functions that can be used to alter the output of the block before they are
# sent to a sink
self.modifiers = reip.util.as_list(modifiers or [])
self.input_modifiers = reip.util.as_list(input_modifiers or [])
if min_interval and max_rate:
warnings.warn((
'Both max_rate ({}) and min_interval ({}) are set, but are '
'mutually exclusive (max_rate=1/min_interval). min_interval will'
'be used.').format(max_rate, min_interval))
if max_rate:
self.max_rate = max_rate
if min_interval:
self.min_interval = min_interval
self.max_processed = max_processed
self._put_blocking = blocking
self._extra_meta = reip.util.as_list(extra_meta or [])
if self._extra_meta:
# this will flatten a list of dicts into a single dict. Since call=False,
# if there is a list of dictionaries and a function, any dictionaries
# before the function will be collapsed, and any after will be left.
# Calling again (without call=False), will evaluate fully and return
# the a flattened dict.
self._extra_meta = reip.util.mergedict(self._extra_meta, call=False)
if extra_kw:
self.extra_kw = kw
for key, value in kw.items():
setattr(self, key, value)
elif kw:
raise TypeError('{} received {} unexpected keyword arguments: {}.'.format(
self, len(kw), set(kw)))
# block timer
self._sw = reip.util.Stopwatch(self.name)
self.log = reip.util.logging.getLogger(self, level=log_level)
# signals
self._reset_state()
@property
def min_interval(self):
return self.max_rate
@min_interval.setter
def min_interval(self, value):
self.max_rate = 1. / value if value is not None else value
def set_block_source_count(self, n):
# NOTE: if n is -1, no resize will happen
self.sources = reip.util.resize_list(self.sources, n, None)
def set_block_sink_count(self, n):
# NOTE: if n is None, no resize will happen
new_sink = lambda: Producer(self._block_sink_queue_length, task_id=self.task_id)
self.sinks = reip.util.resize_list(self.sinks, n, new_sink)
def _reset_state(self):
# state
self.started = False
self.ready = False
self.done = False
self.closed = False
self._terminated = False
# stats
self.processed = 0
self.old_processed = 0
self.old_time = time.time()
self._sw.reset()
self._except.clear()
[docs] def __repr__(self):
return 'Block({}): ({}/{} in, {} out; {} processed) - {}'.format(
self.name, sum(s is not None for s in self.sources),
self.n_expected_sources,
len(self.sinks), self.processed,
self.block_state_name)
@property
def block_state_name(self):
return (
(type(self._exception).__name__
if self._exception is not None else 'error') if self.error else
('terminated' if self.terminated else 'done') if self.done else
('running' if self.running else 'paused') if self.ready else
'started' if self.started else '--' # ??
) # add "uptime 35.4s"
# Graph definition
[docs] def __call__(self, *others, index=0, **kw):
'''Connect other block sinks to this blocks sources.
If the blocks have multiple sinks, they will be passed as additional
inputs.
.. code-block:: python
ProcessBlock()(InputBlock())
'''
j = next(
(i for i, s in enumerate(self.sources) if s is None), len(self.sources)
) if index == -1 else index
for i, other in enumerate(others):
# permit argument to be a block
# permit argument to be a sink or a list of sinks
if isinstance(other, Block):
sinks = other.sinks
elif isinstance(other, _BlockSinkView):
sinks = other.items
else:
sinks = reip.util.as_list(other)
if sinks:
# make sure the list is long enough
self.set_block_source_count(j + len(sinks))
# create and add the source
for j, sink in enumerate(sinks, j):
self.sources[j] = sink.gen_source(
task_id=self.task_id, **kw)
j += 1 # need to increment cursor so we don't repeat last index
return self
[docs] def to(self, *others, squeeze=True, **kw):
'''Connect this blocks sinks to other blocks' sources.
.. code-block:: python
InputBlock().to(ProcessBlock())
'''
outs = [other(self, **kw) for other in others]
return outs[0] if squeeze and len(outs) == 1 else outs
[docs] def __or__(self, other):
'''Connect blocks using Unix pipe syntax.
.. code-block:: python
InputBlock() | ProcessBlock()
'''
return self.to(*reip.util.as_list(other))
[docs] def __ror__(self, other):
'''Connect blocks using Unix pipe syntax. See :meth:`__or__`'''
return self(*reip.util.as_list(other))
[docs] def output_stream(self, **kw):
'''Create a Stream iterator which will let you iterate over the
outputs of a block.'''
return reip.Stream.from_block(self, **kw)
def __getitem__(self, key):
return _BlockSinkView(self, key)
# User Interface
[docs] def init(self):
'''Initialize the block.'''
[docs] def process(self, *xs, meta=None):
'''Process data.'''
return xs, meta
[docs] def finish(self):
'''Cleanup.'''
# main process loop
# TODO common worker class
def run(self, duration=None, **kw):
with self.run_scope(**kw):
self.wait(duration)
@contextmanager
def run_scope(self, raise_exc=True):
try:
self.spawn()
yield self
except KeyboardInterrupt:
self.terminate()
finally:
self.join(raise_exc=raise_exc)
def wait(self, duration=None):
for _ in reip.util.iters.timed(reip.util.iters.sleep_loop(self._delay), duration):
if self.done or self.error:
return True
def _main(self, _ready_flag=None, duration=None):
'''The main thread target function. Handles uncaught exceptions and
generic Block context management.'''
try:
self.started = True
self.closed = False
self.log.debug(text.blue('Starting...'))
time.sleep(self._delay)
with self._sw(), self._except(raises=False):
# create a stream from sources with a custom control loop
self._stream = reip.Stream.from_block_sources(
self, name=self.name, _sw=self._sw,
strategy=self._source_strategy)
# self.__first_time = True
# this lets us wrap the block's run function with retry loops, error suppression and
# whatever else might be useful
run = reip.util.partial(self.__main_run, _ready_flag=_ready_flag, duration=duration)
for wrapper in self.handlers[::-1]:
run = reip.util.partial(wrapper, self, run)
run()
finally:
try:
# propagate stream signals to sinks e.g. CLOSE
if self._stream.signal is not None:
self._send_sink_signal(self._stream.signal)
finally:
self.done = True
self.started = False
self.closed = True
self.log.debug(text.green('Done.'))
def __main_run(self, _ready_flag=None, duration=None):
# # This is to return from a retry loop that doesn't want to close
# if not self.__first_time and self.closed:
# return
# self.__first_time = False
try:
with self._stream:
self._sw.tick() # offset time delay due to the reip initialization (e.g. plasma store warm-up)
# block initialization
with self._sw('init'): #, self._except('init')
self.init()
self.ready = True
self.log.debug(text.green('Ready.'))
if _ready_flag is not None:
with self._sw('wait'):
_ready_flag.wait()
# the loop
loop = reip.util.iters.throttled(
reip.util.iters.timed(reip.util.iters.loop(), duration),
self.max_rate, delay=self._delay)
self.old_time = time.time() # offset self.init() time delay for proper immediate speed calculation
for _ in self._sw.iter(loop, 'sleep'):
inputs = self._stream.get()
if inputs is None:
break
# process each input batch
with self._sw('process'): #, self._except('process')
buffers, meta = inputs
if self._extra_meta:
meta.maps += reip.util.flatten(self._extra_meta, call=True, meta=meta)
for func in self.input_modifiers:
buffers, meta = func(*buffers, meta=meta)
outputs = self.process(*buffers, meta=meta)
for func in self.modifiers:
outputs = func(outputs)
# This block of code needs to be here or else self.processed is not counting calls to self.process() function
# but buffers generated by current block and thus self.processed will be zero or inacurate in a number of cases:
# (i) black hole block that is only consuming data, (ii) data source block that has a buitin buffer bundling/grouping capabilities
# We can always add another self.generated counter if we need/want to
self.processed += 1
# limit the number of blocks
if self.max_processed and self.processed >= self.max_processed:
self.close(propagate=True)
# send each output batch to the sinks
with self._sw('sink'):
self.__send_to_sinks(outputs, meta)
except KeyboardInterrupt as e:
self.log.info(text.yellow('Interrupting'))
self.log.exception(e)
# reip.util.print_stack('Interrupted here')
except Exception as e:
self.log.exception(e)
raise
finally:
self.ready = False
# finish up and shut down
self.log.debug(text.yellow('Finishing...'))
with self._sw('finish'): # , self._except('finish', raises=False)
self.finish()
def __send_to_sinks(self, outputs, meta_in=None):
'''Send the outputs to the sink.'''
source_signals = [None]*len(self.sources)
# retry all sources
for outs in outputs if reip.util.is_iter(outputs) else (outputs,):
if outs == reip.RETRY:
source_signals = [reip.RETRY]*len(self.sources)
elif outs == reip.CLOSE:
self.close(propagate=True)
elif outs == reip.TERMINATE:
self.terminate(propagate=True)
# increment sources but don't have any outputs to send
elif outs is None:
pass
# self._stream.next()
# increment sources and send outputs
else:
# See self.__main_run()
# self.processed += 1
# # limit the number of blocks
# if self.max_processed and self.processed >= self.max_processed:
# self.close(propagate=True)
# detect signals meant for the source
if self.sources:
if outs is not None and any(any(t.check(o) for t in reip.SOURCE_TOKENS) for o in outs):
# check signal values
if len(outputs) > len(self.sources):
raise RuntimeError(
'Too many signals for sources in {}. Got {}, expected a maximum of {}.'.format(
self, len(outputs), len(self.sources)))
for i, o in enumerate(outs):
if o is not None:
source_signals[i] = o
continue
# convert outputs to a consistent format
outs, meta = prepare_output(outs, input_meta=meta_in)
# pass to sinks
for sink, out in zip(self.sinks, outs):
if sink is not None:
sink.put((out, meta), self._put_blocking)
# increment sources
# self._stream.next()
for src, sig in zip(self.sources, source_signals):
if sig is reip.RETRY:
pass
else:
src.next()
def _send_sink_signal(self, signal, block=True, meta=None):
'''Emit a signal to all sinks.'''
self.log.debug(text.yellow(text.l_('sending', signal)))
for sink in self.sinks:
if sink is not None:
sink.put((signal, meta or {}), block=block)
# Thread management
[docs] def spawn(self, wait=True, _controlling=True, _ready_flag=None):
'''Spawn the block thread'''
try:
self.controlling = _controlling
self.log.debug(text.blue('Spawning...'))
# print(self.summary())
self._check_source_connections()
# spawn any sinks that need it
for s in self.sinks:
if hasattr(s, 'spawn'):
s.spawn()
self._reset_state()
self.resume()
self._thread = remoteobj.util.thread(self._main, _ready_flag=_ready_flag, daemon_=True, raises_=False)
# threading.Thread(target=self._main, kwargs={'_ready_flag': _ready_flag}, daemon=True)
self._thread.start()
if wait:
self.wait_until_ready()
if self.controlling:
self.raise_exception()
finally:
# thread didn't start ??
if self._thread is None or not self._thread.is_alive():
self.done = True
def _check_source_connections(self):
'''Check if there are too many sources for this block.'''
# check for any empty sources
disconnected = [i for i, s in enumerate(self.sources) if s is None]
if disconnected:
raise RuntimeError(f"Sources {disconnected} in {self} not connected.")
# check for exact source count
if (self.n_expected_sources is not None and self.n_expected_sources != -1
and len(self.sources) != self.n_expected_sources):
raise RuntimeError(
f'Expected {self.n_expected_sources} sources '
f'in {self}. Found {len(self.sources)}.')
def remove_extra_sources(self, n=None):
n = n or self.n_expected_sources
if n is not None and n != -1:
self.sources = self.sources[:n]
[docs] def wait_until_ready(self):
'''Wait until the block is initialized.'''
while not self.ready and not self.error and not self.done:
time.sleep(self._delay)
def join(self, close=True, terminate=False, raise_exc=None, timeout=None):
# close stream
if close:
self.close()
if terminate:
self.terminate()
# join any sinks that need it
for s in self.sinks:
if hasattr(s, 'join'):
s.join()
# close thread
if self._thread is not None:
self._thread.join(timeout=timeout)
if (self.controlling if raise_exc is None else raise_exc):
self.raise_exception()
if self._print_summary: # print now or part of the stats will be lost if the block is used inside of Task
print(self.stats_summary())
def raise_exception(self):
self._except.raise_any()
def log_exception(self):
for e in self._except.all():
self.log.exception(e)
@property
def _exception(self):
return self._except.last
@property
def all_exceptions(self):
return self._except.all()
def __export_state__(self):
return {
'_sw': self._sw,
'started': self.started, 'ready': self.ready,
'done': self.done, #'error': self.error,
'terminated': self.terminated,
# '_stream.terminated': self._stream.terminated,
# '_stream.should_wait': self._stream.should_wait,
# '_stream.running': self._stream.running,
'_except': self._except,
}
def __import_state__(self, state):
for k, v in state.items():
try:
x, ks = self, k.lstrip('!').split('.')
for ki in ks[:-1]:
x = getattr(x, ki)
setattr(x, ks[-1], v)
except AttributeError:
raise AttributeError('Could not set attribute {} = {}'.format(k, v))
# State management
def pause(self):
if self._stream is not None:
self._stream.pause()
def resume(self):
if self._stream is not None:
self._stream.resume()
def close(self, propagate=False):
if self._stream is not None:
self._stream.close()
self.closed = True
if propagate:
self._send_sink_signal(reip.CLOSE)
def terminate(self, propagate=False):
if self._stream is not None:
self._stream.terminate()
if propagate:
self._send_sink_signal(reip.TERMINATE)
@property
def error(self):
return self._exception is not None
# XXX: this is temporary. idk how to elegantly handle this
@property
def running(self):
return self._stream.running if self._stream is not None else False
# @property
# def closed(self):
# return self._stream.should_wait if self._stream is not None else True
@property
def terminated(self):
return self._terminated or (self._stream.terminated if self._stream is not None else False)
@terminated.setter
def terminated(self, value):
self._terminated = value
# debug
def short_str(self):
return '[B({})[{}/{}]({})]'.format(
self.name, len(self.sources), len(self.sinks),
self.block_state_name)
def stats(self):
total_time = self._sw.stats().sum if '' in self._sw else 0
init_time = self._sw.stats("init").sum if 'init' in self._sw else 0
finish_time = self._sw.stats("finish").sum if 'finish' in self._sw else 0
return {
'name': self.name,
'total_time': total_time,
'processed': self.processed,
'speed': self.processed / (total_time - init_time - finish_time) if total_time else 0,
'dropped': [getattr(s, "dropped", None) for s in self.sinks],
'skipped': [getattr(s, "skipped", None) for s in self.sources],
'n_in_sources': [len(s) if s is not None else None for s in self.sources],
'n_in_sinks': [len(s) if s is not None else None for s in self.sinks],
'sw': self._sw,
'exception': self._exception,
'all_exceptions': self._except._groups,
}
def summary(self):
return text.block_text(
text.green(str(self)),
'Sources:',
text.indent(text.b_(*(f'- {s}' for s in self.sources)) or None, w=4)[2:],
'',
'Sinks:',
text.indent(text.b_(*(f'- {s}' for s in self.sinks)) or None, w=4)[2:],
ch=text.blue('*'), n=40,
)
[docs] def status(self):
'''
e.g. `Block_123 + 2 buffers (2.09 x/s), 9 total (1.47 x/s avg), sources=[1], sinks=[0]`
'''
n = self.processed
total_time = self._sw.elapsed()
init_time = self._sw.stats("init").sum if 'init' in self._sw else 0
speed_avg = n / (total_time - init_time)
n_new = n - self.old_processed
speed_now = n_new / (time.time() - self.old_time)
self.old_processed, self.old_time = n, time.time()
n_src = [len(s) for s in self.sources]
n_snk = [len(s) for s in self.sinks]
return f'{self.name}\t + {n_new:3} buffers ({speed_now:,.2f} x/s), {n:5} total ({speed_avg:,.2f} x/s avg), sources={n_src}, sinks={n_snk}'
def stats_summary(self):
stats = self.stats()
# return text.block_text(
return text.b_(
# block title
'\nStats for {summary_banner}',
# any exception, if one was raised.
text.red('({}) {}'.format(type(self._exception).__name__, self._exception))
if self._exception else None,
# basic stats
'Processed {processed} buffers in {total_time:.2f} sec. ({speed:.2f} x/s)',
'Dropped: {dropped} Skipped: {skipped} Left in Queue: in={n_in_sources} out={n_in_sinks}',
# timing info
# self._sw, ch=text.blue('*')).format(
self._sw).format(
summary_banner=text.red(self) if self.error else text.green(self),
**stats)
# def print_stats(self):
# print(self.stats_summary())
def prepare_output(outputs, input_meta=None, expected_length=None):
'''Take the inputs from block.process and prepare to be passed to block.sinks.'''
if not outputs:
return (), {}
bufs, meta = None, None
if isinstance(outputs, tuple):
if len(outputs) == 2:
bufs, meta = outputs
elif isinstance(outputs, (Meta, dict)):
meta = outputs
bufs = list(bufs) if bufs else []
if expected_length: # pad outputs with blank values
bufs.extend((reip.BLANK,) * max(0, expected_length - len(bufs)))
if input_meta: # merge meta as a new layer
if isinstance(meta, Meta): # only the user did not wish to override it
meta = Meta(meta, input_meta)
else:
meta = Meta(meta)
return bufs, meta or Meta()