Source code for spy.decorators

from functools import partial, wraps, update_wrapper
import copy
import re

import clize.errors

from .core import _accepts_context, _call_fragment_body, collect, DROP, many as _many
from .objects import Context

__all__ = ['accumulate', 'callable', 'filter', 'many', 'format', 'regex', 'keywords',
           'focus', 'magnify', 'try_except']

decorators = []


def _get_lenses():
    global lenses, _get_lenses
    try:
        import lenses
    except ImportError:  # pragma: no cover
        lenses = None
    _get_lenses = lambda: lenses
    return lenses


def _get_formatter():
    global _get_formatter
    import string
    formatter = string.Formatter()
    _get_formatter = lambda: formatter
    return formatter


[docs]def decorator(*names, doc=None, takes_string=False, prep=None, dec_args=()): if prep is None: if len(dec_args) == 1: prep = lambda _, a: a elif len(dec_args) > 1: prep = lambda _, *a: a def wrapperer(_spy_decorator): @wraps(_spy_decorator) def wrapper(fn, dec_args=()): is_decorator = getattr(fn, '_spy_decorated', None) if is_decorator: xfn = fn elif _accepts_context(fn): xfn = partial(_call_fragment_body, fn) else: xfn = partial(_drop_context, fn) if prep: opaque = prep(fn, *dec_args) def wrapped(v, context=None): _spy_callable = fn # noqa: F841 _spy_value = v # noqa: F841 return _spy_decorator(xfn, v, context, opaque) else: def wrapped(v, context=None): _spy_callable = fn # noqa: F841 _spy_value = v # noqa: F841 return _spy_decorator(xfn, v, context) update_wrapper(wrapped, fn) wrapped._spy_decorated = True return wrapped if dec_args: orig_wrapper = wrapper def wrapper(*a): return partial(orig_wrapper, dec_args=a) wrapper.decorator_names = names wrapper.decorator_help = doc wrapper.takes_string = takes_string wrapper.dec_args = dec_args decorators.append(wrapper) return wrapper return wrapperer
def _drop_context(fn, v, context): return _call_fragment_body(fn, v)
[docs]@decorator('--accumulate', '-a', doc='Pass an iterator of yielded values to this fragment') def accumulate(fn, v, context): return fn(collect(context), context)
[docs]@decorator('--callable', '-c', doc='Call the result of this fragment') def callable(fn, v, context): result = fn(v, context) return result(v)
[docs]@decorator('--filter', '-f', doc='Treat this fragment as a predicate to filter data') def filter(fn, v, context): result = fn(v, context) return v if result else DROP
[docs]@decorator('--many', '-m', doc='Iterate over this fragment') def many(fn, v, context): result = fn(v, context) return _many(result)
@decorator('--format', '-i', doc='Interpolate argument as a format string', takes_string=True, prep=lambda _: _get_formatter()) def format(fn, v, context, formatter): env, x = fn(v, context) return formatter.vformat(x, v, env)
[docs]@decorator('--regex', '--regexp', '-R', doc='Match argument as a regexp', takes_string=True) def regex(fn, v, context): env, x = fn(v, context) return re.match(x, v)
def _kw_prep(fn): base = fn while hasattr(base, '__wrapped__'): base = base.__wrapped__ if not hasattr(base, '_spy_setenv'): raise ValueError("inappropriate function") return base._spy_setenv
[docs]@decorator('--keywords', '-k', doc='Execute with the input value as the scope', prep=_kw_prep) def keywords(fn, v, context, setenv): setenv(v) return fn(v, context)
def _convert_focus(s): lenses = _get_lenses() if lenses is not None and s.startswith('_'): context = Context() context['_'] = lenses.lens return eval(s, context, {}) if s.startswith('.'): return s[1:] if s[:1] in '0123456789-' and (len(s) == 1 or s[1:].isdigit()): return int(s) if ':' in s: if lenses is None: raise clize.errors.ArgumentError("slice focusing requires `lenses`") sbits = s.split(':') bits = [] for x in sbits: if x == '': bits.append(None) elif x.isdigit() or x[:1] == '-' and x[1:].isdigit(): bits.append(int(x)) else: break else: if len(bits) in (2,3): return lenses.lens[slice(*bits)].Each() return s _convert_focus.usage_name = 'ITEM' def _focus_prep(fn, focus): lenses = _get_lenses() if lenses is None: def apply(f, v): v_ = copy.copy(v) v_[focus] = f(v_[focus]) return v_ return apply if not isinstance(focus, lenses.UnboundLens): focus = lenses.lens[focus] return lambda f, v: focus.modify(f)(v)
[docs]@decorator('--focus', '-o', doc='Operate on an item of the input in-place', prep=_focus_prep, dec_args=[_convert_focus]) def focus(fn, v, context, f): fn = partial(fn, context=context) return f(fn, v)
def _magnify_prep(fn, focus): lenses = _get_lenses() if lenses is None: def apply(f, v): return f(v[focus]) return apply if not isinstance(focus, lenses.UnboundLens): focus = lenses.lens[focus] return lambda f, v: f(focus.get()(v))
[docs]@decorator('--magnify', '-O', doc='Operate on and return an item of the input', prep=_magnify_prep, dec_args=[_convert_focus]) def magnify(fn, v, context, f): fn = partial(fn, context=context) return f(fn, v)
[docs]@decorator('--try', '-t', doc='Filter out input that causes the fragment to raise an exception') def try_except(fn, v, context): try: return fn(v, context) except: pass return DROP