Core Components

class reip.Block(n_inputs=1, n_outputs=1, queue=100, blocking=False, print_summary=True, max_rate=None, min_interval=None, max_processed=None, graph=None, name=None, source_strategy=<built-in function all>, extra_kw=True, extra_meta=None, log_level=None, handlers=None, modifiers=None, input_modifiers=None, **kw)[source]

The base class for all blocks.

Parameters
  • queue (int) – the maximum queue length.

  • n_inputs (int, or None) – the number of sources to expect. If None, it will accept a variable number of sources.

  • n_outputs (int) – the number of sinks to expect.

    • If n_outputs=0, then there will be a single sink that outputs no buffer value, but does output metadata, e.g.

      ```python
      # block 1
      return {'just some metadata': True}
      # or
      return [], {'just some metadata': True}

      # block 2
      def process(self, meta):  # no buffer value
          assert meta['just some metadata'] == True
      ```

    • If n_outputs=None, then the block will have no sinks.

  • blocking (bool) – whether the block should wait for its sinks to have free space before processing more items.

  • max_rate (int) – the maximum number of iterations per second. If the block processes items any faster, it will throttle and sleep the required amount of time to meet that requirement. e.g. if max_rate=5, then an iteration will take at minimum 0.2s.

  • graph (reip.Graph, reip.Task, or False) – the graph instance to be added to.

  • name – the name of the block.
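The max_rate description above amounts to enforcing a minimum interval of 1 / max_rate seconds per iteration. The following is a minimal sketch of that arithmetic only, not reip's actual implementation; `throttle_delay` and `run_throttled` are hypothetical helper names introduced for illustration.

```python
import time


def throttle_delay(max_rate, elapsed):
    """Return how long to sleep so one iteration lasts at least 1 / max_rate seconds.

    Hypothetical helper for illustration; not part of reip.
    """
    if not max_rate:  # None (or 0) means no throttling
        return 0.0
    min_interval = 1.0 / max_rate
    return max(0.0, min_interval - elapsed)


def run_throttled(func, n_iterations, max_rate=None):
    """Call func() n_iterations times, sleeping as needed to respect max_rate."""
    for _ in range(n_iterations):
        start = time.monotonic()
        func()
        time.sleep(throttle_delay(max_rate, time.monotonic() - start))
```

With max_rate=5, an iteration whose work took 0.05s would sleep for roughly another 0.15s, and an iteration that already took longer than 0.2s would not sleep at all.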

__init__(n_inputs=1, n_outputs=1, queue=100, blocking=False, print_summary=True, max_rate=None, min_interval=None, max_processed=None, graph=None, name=None, source_strategy=<built-in function all>, extra_kw=True, extra_meta=None, log_level=None, handlers=None, modifiers=None, input_modifiers=None, **kw)[source]
__repr__()[source]

Return repr(self).

__call__(*others, index=0, **kw)[source]

Connect other blocks' sinks to this block's sources. If the blocks have multiple sinks, they will be passed as additional inputs.

ProcessBlock()(InputBlock())

to(*others, squeeze=True, **kw)[source]

Connect this block's sinks to other blocks' sources.

InputBlock().to(ProcessBlock())

__or__(other)[source]

Connect blocks using Unix pipe syntax.

InputBlock() | ProcessBlock()

__ror__(other)[source]

Connect blocks using Unix pipe syntax. See __or__()
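To picture how the call, to(), and pipe styles relate, the sketch below uses a stand-in class (`ToyBlock`, hypothetical, not reip.Block) that only records connections. It assumes that to() returns the downstream block so chains read left to right, and that the reflected operator handles a list of blocks on the left-hand side; reip's real implementation may differ on both points.

```python
class ToyBlock:
    """Stand-in for illustration only; this is not reip.Block."""

    def __init__(self, name):
        self.name = name
        self.sources = []  # upstream blocks feeding this one

    def __call__(self, *others):
        # block(a, b): connect the other blocks' outputs to this block's inputs
        self.sources.extend(others)
        return self

    def to(self, other):
        # a.to(b): connect this block's output to the other block's input
        other(self)
        return other  # assumed, so that a.to(b).to(c) chains left to right

    def __or__(self, other):
        # a | b is treated the same as a.to(b)
        return self.to(other)

    def __ror__(self, others):
        # Reflected form, e.g. [a, b] | c: a list does not implement "|",
        # so Python falls back to c.__ror__([a, b])
        return self(*others)


a, b, c = ToyBlock("a"), ToyBlock("b"), ToyBlock("c")
a | b | c  # same wiring as a.to(b).to(c)
```

In this toy model all three spellings end up calling the downstream block with its upstream blocks, which is why they can be used interchangeably.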

output_stream(**kw)[source]

Create a Stream iterator which will let you iterate over the outputs of a block.

init()[source]

Initialize the block.

process(*xs, meta=None)[source]

Process data.

finish()[source]

Cleanup.
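The three hooks above suggest a lifecycle of init, then repeated process calls, then finish. The sketch below imitates that order with a stand-in base class (`LifecycleBlock`, `Doubler`, and `run_block` are hypothetical names, not part of reip). It assumes that process returns a list of outputs plus a metadata dict, as in the n_outputs example, and that finish runs even when processing fails.

```python
class LifecycleBlock:
    """Stand-in base class for illustration only; this is not reip.Block."""

    def init(self):
        pass

    def process(self, *xs, meta=None):
        raise NotImplementedError

    def finish(self):
        pass


class Doubler(LifecycleBlock):
    """Hypothetical block that doubles each input value."""

    def init(self):
        self.count = 0  # set up state before the first item arrives

    def process(self, x, meta=None):
        self.count += 1
        return [x * 2], {**(meta or {}), "index": self.count}

    def finish(self):
        self.closed = True  # release resources here


def run_block(block, items):
    """Drive a block through init -> process (once per item) -> finish."""
    results = []
    block.init()
    try:
        for x in items:
            results.append(block.process(x, meta={}))
    finally:
        block.finish()  # assumed to run even if process raises
    return results
```

Keeping setup in init rather than in the constructor means state is created when the block actually starts running, which matters if a block is constructed in one place and executed in another.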

spawn(wait=True, _controlling=True, _ready_flag=None)[source]

Spawn the block's thread.

wait_until_ready()[source]

Wait until the block is initialized.

__weakref__

list of weak references to the object (if defined)

status()[source]

A one-line summary of the block's current state, e.g. Block_123 + 2 buffers (2.09 x/s), 9 total (1.47 x/s avg), sources=[1], sinks=[0]

class reip.Graph(*blocks, log_level='info', **kw)[source]

Represents a collection of Blocks.

__init__(*blocks, log_level='info', **kw)[source]
__repr__()[source]

Return repr(self).

run(duration=None, stats_interval=1, print_graph=True, **kw)[source]

Run a graph and all of its child blocks.

with reip.Graph() as g:
    BlockA().to(BlockB()).to(BlockC())

g.run(duration=10)

run_scope(wait=True, raise_exc=True)[source]

Run a graph and its child blocks while inside the context manager.

with reip.Graph() as g:
    BlockA().to(BlockB()).to(BlockC())

with g.run_scope():
    # gives you more flexibility in here
    g.wait(duration=5)
# equivalent to g.run(duration=5)

property ready

True if all child blocks are ready

property running

True if any child blocks are running

property terminated

True if any child blocks are terminated

property done

True if any child blocks are done

property error

True if any child blocks threw an error
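The wording of these properties distinguishes "all" from "any". As a rough illustration of that difference only, the sketch below models ready with all() and running and error with any(). The `ToyChild` and `ToyGroup` names are hypothetical, and this is not necessarily how reip.Graph computes its state.

```python
from dataclasses import dataclass


@dataclass
class ToyChild:
    """Hypothetical stand-in for a child block's state flags."""
    ready: bool = False
    running: bool = False
    error: bool = False


class ToyGroup:
    """Hypothetical container that aggregates its children's flags."""

    def __init__(self, *children):
        self.children = list(children)

    @property
    def ready(self):
        # True only when every child is ready
        return all(c.ready for c in self.children)

    @property
    def running(self):
        # True as soon as one child is running
        return any(c.running for c in self.children)

    @property
    def error(self):
        # True as soon as one child reports an error
        return any(c.error for c in self.children)
```

Under this model a group with one slow child is not ready until that child is, while a single failing child is enough to flag an error for the whole group.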

spawn(wait=True, _controlling=True, _ready_flag=None, **kw)[source]

Spawns the graph and its children.

join(close=True, terminate=False, raise_exc=None, **kw)[source]

Joins the graph and its children.

pause()[source]

Pause all blocks.

resume()[source]

Resume all blocks.

close()[source]

Close all blocks.

terminate()[source]

Terminate all blocks.

raise_exception()[source]

Raise any exceptions from blocks.

class reip.Task(*blocks, graph=None, **kw)[source]
__init__(*blocks, graph=None, **kw)[source]
__repr__()[source]

Return repr(self).

spawn(wait=True, _controlling=True, _ready_flag=None)[source]

Spawns the graph and its children.

join(*a, timeout=10, raise_exc=True, **kw)[source]

Joins the graph and its children.

property ready

True if all child blocks are ready

property running

True if any child blocks are running

property terminated

True if any child blocks are terminated

property done

True if any child blocks are done

property error

True if any child blocks threw an error

pause()[source]

Pause all blocks.

resume()[source]

Resume all blocks.

close()[source]

Close all blocks.

terminate()[source]

Terminate all blocks.