import os
import re
import contextlib
from functools import wraps
import numpy as np
def always(*a, **kw):
return True
def never(*a, **kw):
return False
[docs]def resize_list(lst, length, value=None):
'''Resize a list to be a specific length.
Example:
>>> x = [1, 2]
>>> assert resize_list(x, 5) == [1, 2, None, None, None]
>>> assert resize_list(x, 5, 10) == [1, 2, 10, 10, 10]
>>> assert resize_list(x, 4, lambda: x[0]) == [1, 2, 1, 1] # some callable
'''
if length is None or length == -1 or len(lst) == length:
return lst
return (
lst + [
value() if callable(value) else value
for i in range(max(0, length - len(lst)))
]
)[:length]
[docs]def decorator(__func__=None, **kw):
'''A convenience wrapper that allows you to define a decorator with optional
initialization parameters.
Example:
>>> def my_decorator(func, **kw):
... @functools.wraps(func)
... def inner(*a, **kwi):
... return func(*a, **dict(kw, **kw))
... return inner
>>> @my_decorator
... def asdf(x=10, y=15):
... return x+y
>>> @my_decorator(y=20)
... def asdf2(x=10, y=15):
... return x+y
>>> assert asdf() == 25 and asdf2() == 30
'''
def decorated(func):
@wraps(func)
def inner(*a, **kwi):
return func(*a, **dict(kw, **kwi))
return inner
return decorated(__func__) if callable(__func__) else decorated
def partial(__partial_func__, *a, __name__=None, **kw):
@wraps(__partial_func__)
def inner(*ai, **kwi):
return __partial_func__(*a, *ai, **dict(kw, **kwi))
if __name__ is not None:
inner.__name__ = __name__
return inner
def create_partial(__partial_func__, *a, __name__=None, **kw):
return partial(partial, __partial_func__, *a, __name__=__name__ or __partial_func__.__name__, **kw)
def resolve_call(func, *a, **kw):
return func(*a, **kw) if callable(func) else func
def as_func(func):
return func if callable(func) else (lambda: func)
[docs]def ensure_dir(fname):
'''Make sure that the directory that this filename is in exists. Does nothing
if this file is in the current working directory.'''
parent = os.path.dirname(fname)
if parent: # ignore if parent is cwd
os.makedirs(parent, exist_ok=True)
return fname
def adjacent_file(file, *f):
return os.path.abspath(os.path.join(os.path.dirname(file), *f))
[docs]def fname(file):
'''Get file name. e.g. path/to/fileA.txt => fileA'''
return os.path.splitext(os.path.basename(file))[0]
[docs]def write(fname, *lines, mode='r'):
'''write to file.'''
d = os.path.dirname(fname)
if d and not os.path.exists(d):
os.makedirs(d)
with open(os.path.expanduser(fname), mode) as f:
f.write('\n'.join(map(str, lines)))
[docs]def as_list(x):
'''Convert or wrap value as a list.
Examples:
>>> assert as_list(5) == [5]
>>> assert as_list('asdf') == ['asdf']
>>> assert as_list((1, 2)) == [1, 2]
>>> assert as_list([1, 2]) == [1, 2]
'''
return (
x if isinstance(x, list)
else list(x) if isinstance(x, tuple)
else [x])
def is_iter(iterable):
return not hasattr(iterable,'__len__') and hasattr(iterable,'__iter__')
def as_iterlike(x, like=(list, tuple, set, np.ndarray)):
return x if isinstance(x, like) or is_iter(x) else [x]
[docs]def squeeze(x):
'''If the input is a one-element list or tuple, take the first element.
(removes an unnecessary container.)
Examples:
>>> assert squeeze([5]) == 5
>>> assert squeeze((1,)) == 1
>>> assert squeeze([1, 2]) == [1, 2]
>>> assert squeeze('asdf') == 'asdf'
>>> assert squeeze(('asdf',)) == 'asdf'
>>> assert squeeze((1, 2)) == (1, 2)
'''
return x[0] if isinstance(x, (list, tuple)) and len(x) == 1 else x
def notnone(x):
return x is not None
def filter_none(xs):
return [x for x in xs if x is not None]
def separate(xs, *conditions, keep_fails=True):
conditions = conditions or ((lambda x: x is not None),)
outs = [[] for i in range(len(conditions)+1)]
for x in xs:
for i, c in enumerate(conditions):
if c(x):
outs[i].append(x)
break
else:
outs[-1].append(x)
return outs
def flatten(X, args=(), call=False, **kw):
if isinstance(X, (list, tuple)):
yield from (x for xs in X for x in flatten(xs, call=call, args=args, **kw))
elif call and callable(X):
yield from flatten(X(*args, **kw), call=call, args=args, **kw)
else:
yield X
def mergedict(*dicts, args=(), call=True, **kw):
return _merge(flatten(dicts, args=args, call=call, **kw), call=call)
def _merge(flat, call=True):
out = {}
for d in flat:
if not d:
continue
if not call and callable(d):
return ([out] if out else []) + [d] + as_list(_merge(flat, call=call))
out.update(d)
return out
def matchmany(text, *patterns):
return {
k: v for m in (re.search(p, text) for p in patterns) if m
for k, v in m.groupdict().items()}
[docs]@contextlib.contextmanager
def multicontext(*items):
'''Use a variable set of context managers as one.'''
with contextlib.ExitStack() as stack:
yield [stack.enter_context(x) for x in items]
# @wraps(mergedict)
# def _mergedicts(dicts, *a, **kw):
# out = {}
# for d in flatten(dicts, *a, call=True, **kw):
# out.update(d)
# return out
# > 1d: 1d02h05m
# > 1hr: 1h03m04s
# > 10mins: 11m05s
# > 2mins: 2m05s
# : 90.065s
# : 2.065s
_TIME_BREAKS = [
(60 * 60 * 24 * 7, '{w:.0f}w:{d:.0f}d:{h:.0f}h'.format),
(60 * 60 * 24, '{d:.0f}d:{h:.0f}h:{m:.0f}m'.format),
(60 * 60, '{h:.0f}h:{m:.0f}m:{s:.0f}s'.format),
(60 * 10, '{m:.0f}m:{s:.0f}s'.format),
(60 * 1, '{m:.0f}m:{s:.1f}s'.format),
(0, '{s:.3f}s'.format),
]
def _factor_time(t):
wks, days = divmod(t, 60*60*24*7)
days, hrs = divmod(t, 60*60*24)
hrs, mins = divmod(hrs, 60*60)
mins, secs = divmod(mins, 60)
return {'w': wks, 'd': days, 'h': hrs, 'm': mins, 's': secs}
def human_time(secs):
suffix = ' ago' if secs < 0 else ''
secs = abs(secs)
return '({}{})'.format(next((
fmt(**_factor_time(secs))
for t, fmt in _TIME_BREAKS if secs >= t
)), suffix)
# class MpValueProp:
# def __init__(self, name):
# self.name = name
#
# def __get__(self, instance, owner=None):
# return getattr(instance, self.name).value
#
# def __set__(self, instance, value):
# getattr(instance, self.name).value = value