'''
'''
import time
from contextlib import contextmanager
import weakref
import multiprocessing as mp
import remoteobj
import reip
from reip.util import text, iters
def default_graph():
return _ContextScope.default
def get_graph(graph=None):
return _ContextScope.default if graph is None else graph
def top_graph():
return _ContextScope.top
class _ContextScope:
top = None
default = None
all_instances = {}
class BaseContext: # (metaclass=_MetaContext)
_delay = 1e-3
_previous = False # the previous default instance
parent_id, task_id = None, None
def __init__(self, *blocks, name=None, graph=None):
if blocks and not name and isinstance(blocks[0], str):
name, blocks = blocks[0], blocks[1:]
self.blocks = list(blocks)
self.name = name or f'{self.__class__.__name__}_{id(self)}'
self.parent_id, self.task_id = BaseContext.register_instance(self, graph)
for b in self.blocks:
self.register_instance(b, self)
# global instance management
@classmethod
def _initialize_default_graph(cls):
_ContextScope.default = _ContextScope.top = Graph(name='_default_')
@classmethod
def get(cls, instance=None):
'''If `instance` is `None`, the default instance will be returned.
Otherwise `instance` is returned.'''
return _ContextScope.default if instance is None else instance
def add(self, block):
'''Add block to graph.'''
self.blocks.append(block)
def remove(self, block):
self.blocks.remove(block)
def clear(self):
self.blocks.clear()
@classmethod
def register_instance(cls, child=None, parent=None):
'''Add a member (Block, Task) to the graph in instance.
This is used inside of
Arguments:
child (reip.Graph, reip.Block): A graph, task, or block to add to
instance.
If `member is instance`, nothing will be added. In other words,
A graph cannot be added to itself.
parent (reip.Graph): A graph or task to be added to.
If instance is None, the default graph/task will be used.
If instance is False, nothing will be added.
Returns:
parent_id (str): the name of the child's parent block.
Can be a Graph or Task.
task_id (str or None): the name of the task that a child is attached to.
If the parent graph is not inside a task or is not a task itself,
it will return None.
NOTE: Returning string ids prevents Blocks from having a circular reference
to the entire graph.
Basically this should handle:
- adding a graph to a graph
- parent.name, parent.task_id
- adding a task to a graph
- raise if parent is task
- raise if graph.task_id is not None
- if `flatten_tasks`, then iterate through graph parents until you
find one on the main task
- parent.name, child.name
- adding a graph to a task
- parent.name, parent.task_id
- adding a block to a graph
- parent.name, parent.task_id
- adding a block to a task
- parent.name, parent.task_id
- do nothing if parent is False / None+unset default.
'''
_ContextScope.all_instances[child.name] = weakref.ref(child)
task_id = child.name if isinstance(child, reip.Task) else None
# user explicitly disabled graph for this child
if parent is False:
return None, task_id
parent = get_graph(parent) # get default if None
# there is no graph specified and no default graph
if parent is None:
return None, task_id
# trying to add to self
if parent is child:
return parent.parent_id, task_id or parent.task_id
if task_id: # disallow nested tasks
# parent_contexts = [parent] + parent.parent_contexts if flatten_tasks else [parent]
if isinstance(parent, reip.Task):
raise RuntimeError(
'Cannot add a task ({}) to another task ({}).'.format(
child.name, parent.name))
if parent.task_id:
raise RuntimeError(
'Cannot add a task ({}) to a graph ({}) in a different task ({}).'.format(
child.name, parent.name, parent.task_id))
# everything checks out.
parent.add(child)
return parent.name, task_id or parent.task_id
@classmethod
def get_object(cls, id, require=True):
'''Get an instance using its name. If the instance '''
obj = _ContextScope.all_instances.get(id) if id is not None else None
obj = obj() if obj is not None else None
if obj is None and require:
raise ValueError(f'Object {id} does not exist.')
return obj
@property
def parents(self):
obj = self
stack = []
while obj.parent_id:
obj = BaseContext.get_object(obj.parent_id)
if obj is None:
break
stack.append(obj)
return stack
def __enter__(self):
return self.as_default()
def __exit__(self, exc_type, exc_val, exc_tb):
self.restore_previous()
def as_default(self):
'''Set this graph as the default graph and track the previous default.'''
self._previous, _ContextScope.default = _ContextScope.default, self
return self
def restore_previous(self):
'''Restore the previously set graph.'''
if self._previous is not False:
_ContextScope.default, self._previous = self._previous, None
self._previous = False
return self
# def rename_child(self, old_name, new_name):
# pass
# import blessed
[docs]class Graph(BaseContext):
'''Represents a collection of Blocks.
'''
_delay = 1e-3
_main_delay = 1e-2#0.1
controlling = None
[docs] def __init__(self, *blocks, log_level='info', **kw):
super().__init__(*blocks, **kw)
self.log = reip.util.logging.getLogger(self, level=log_level)
self._except = remoteobj.LocalExcept()
[docs] def __repr__(self):
return '{}({}), {} children:{}\n'.format(
self.__class__.__name__, self.name, len(self.blocks),
''.join('\n' + text.indent(b) for b in self.blocks))
def __bool__(self):
return not (self.done or self.error)
@classmethod
def detached(cls, *blocks, **kw):
return cls(*blocks, graph=None, **kw)
# run graph
[docs] def run(self, duration=None, stats_interval=1, print_graph=True, **kw):
'''Run a graph and all of its child blocks.
.. code-block::
with reip.Graph() as g:
BlockA().to(BlockB()).to(BlockC())
g.run(duration=10)
'''
if print_graph:
print("\nStarting:", self, "\n")
with self.run_scope(**kw):
self.wait(duration, stats_interval=stats_interval)
[docs] @contextmanager
def run_scope(self, wait=True, raise_exc=True):
'''Run a graph and its child blocks while inside the context manager.
.. code-block:: python
with reip.Graph() as g:
BlockA().to(BlockB()).to(BlockC())
with g.run_scope():
# gives you more flexibility in here
g.wait(duration=5)
# equivalent to g.run(duration=5)
'''
self.log.debug(text.green('Starting'))
# controlling = False
try:
with self._except('spawn', raises=False):
try:
self.spawn(wait=wait)
# controlling = self.controlling
self.log.debug(text.green('Ready'))
except Exception as e:
self.log.error(text.red('Spawn Error'))
self.log.exception(e)
raise
with self._except(raises=False):
yield self
except KeyboardInterrupt as e:
self.log.warning(text.yellow('Interrupting'))
# reip.util.print_stack('Interrupted here')
self.log.exception(e)
self.terminate()
finally:
try:
self.join(raise_exc=raise_exc)
finally:
self.log.debug(text.green('Done'))
# if controlling:
# print(self.stats_summary())
# _delay = 1
def wait(self, duration=None, stats_interval=None):
t, t0 = time.time(), time.time()
for _ in iters.timed(iters.sleep_loop(self._delay), duration):
if self.done:
return True
if stats_interval and time.time() - t > stats_interval:
t = time.time()
status = self.status()
self.log.info("Status after %.3f sec:\n%s" % (time.time() - t0, status[status.find("\n")+1:]))
def _reset_state(self):
self._except.clear()
# state
@property
def ready(self):
'''True if all child blocks are ready'''
return all(b.ready for b in self.blocks)
@property
def running(self):
'''True if any child blocks are running'''
return any(b.running for b in self.blocks)
@property
def terminated(self):
'''True if any child blocks are ready'''
return all(b.terminated for b in self.blocks)
@property
def done(self):
'''True if any child blocks are done'''
return any(b.done for b in self.blocks)
@property
def error(self):
'''True if any child blocks threw an error'''
return bool(self._except.all()) or any(b.error for b in self.blocks)
# Block control
_ready_flag = None
[docs] def spawn(self, wait=True, _controlling=True, _ready_flag=None, **kw):
'''Spawns the graph and its children.'''
self.controlling = _controlling
if self.controlling:
self._ready_flag = _ready_flag = mp.Event()
self._spawn_tasks_first(_ready_flag=_ready_flag, **kw)
for block in self.blocks:
block.spawn(wait=False, _controlling=False, _ready_flag=_ready_flag, **kw)
if wait:
self.wait_until_ready()
if self.controlling:
self.raise_exception()
if self._ready_flag is not None:
_ready_flag.set()
def _spawn_tasks_first(self, **kw):
for b in self.blocks:
if isinstance(b, reip.Task):
b.spawn(wait=False, _controlling=False, **kw)
elif isinstance(b, reip.Graph):
b._spawn_tasks_first(wait=False, _controlling=False, **kw)
def wait_until_ready(self, stats_interval=5):
t = time.time()
while not self.ready and not self.done:
time.sleep(self._delay)
if stats_interval and time.time() - t > stats_interval:
t = time.time()
self.log.info(f'waiting for all blocks to initialize...\n{self}')
[docs] def join(self, close=True, terminate=False, raise_exc=None, **kw):
'''Joins the graph and its children.'''
if self._ready_flag is not None and not self._ready_flag.is_set():
self._ready_flag.set()
if close:
self.close()
if terminate:
self.terminate()
for block in self.blocks:
block.join(terminate=False, raise_exc=False, **kw)
if raise_exc is None and self.controlling:
raise_exc = True
if raise_exc:
self.raise_exception()
self.controlling = self._ready_flag = None
[docs] def pause(self):
'''Pause all blocks.'''
for block in self.blocks:
block.pause()
[docs] def resume(self):
'''Resume all blocks.'''
for block in self.blocks:
block.resume()
[docs] def close(self):
'''Close all blocks.'''
for block in self.blocks:
block.close()
[docs] def terminate(self):
'''Terminate all blocks.'''
for block in self.blocks:
block.terminate()
[docs] def raise_exception(self):
'''Raise any exceptions from blocks.'''
for block in self.blocks:
block.raise_exception()
self._except.raise_any()
def __export_state__(self):
return {
'blocks': [b.__export_state__() for b in self.blocks],
'_except': self._except,
}
def __import_state__(self, state):
if state:
for b, update in zip(self.blocks, state.pop('blocks', ())):
b.__import_state__(update)
def short_str(self):
return '[{}({})[{} children]]'.format(
self.__class__.__name__[0],
self.name, len(self.blocks))
def stats(self):
return {
'name': self.name,
'blocks': {b.name: b.stats() for b in self.blocks}
}
def summary(self):
return '\n'.join(s for s in (b.summary() for b in self.blocks) if s)
def status(self):
return text.b_(
f'[{self.name}]',
text.indent('\n'.join(
s for s in (b.status() for b in self.blocks) if s)), ""
)
def stats_summary(self):
return text.block_text(
f'[{self.name}]',
*(s for s in (b.stats_summary() for b in self.blocks) if s)
)
# create an initial default graph
# Graph.top = Graph.default = Graph()