from ..interface import Contract, ContractNotRespected, describe_value
from ..syntax import (Combine, Word, W, alphas, alphanums, oneOf,
ParseException, ZeroOrMore, S, rvalue,
delimitedList, Optional)
from pyparsing import ParseFatalException
class Extension(Contract):
    """ Contract that defers to another contract registered under a name.

    The class-level ``registrar`` maps an identifier to the contract
    that does the actual checking. An instance stores the identifier
    plus the positional/keyword argument expressions; these are
    evaluated against the context each time the contract is checked.
    """

    # identifier -> registered contract object
    registrar = {}

    def __init__(self, identifier, where=None, args=tuple(), kwargs=None):
        """ Look up ``identifier`` in the registrar and remember the
        arguments; ``where`` is forwarded to ``Contract.__init__``. """
        assert identifier in Extension.registrar
        self.identifier = identifier
        self.contract = Extension.registrar[identifier]
        self.args = args
        self.kwargs = kwargs or {}
        Contract.__init__(self, where)

    def __str__(self):
        """ Render as ``identifier`` or ``identifier(a,b,k=v)``, with
        keyword arguments listed in sorted key order. """
        pieces = []
        if self.args:
            pieces += [str(a) for a in self.args]
        if self.kwargs:
            for key in sorted(self.kwargs):
                pieces.append("%s=%s" % (key, self.kwargs[key]))
        if not pieces:
            return self.identifier
        return self.identifier + "(" + ",".join(pieces) + ")"

    def __repr__(self):
        """ Short form when there are no arguments, long form otherwise. """
        if not (self.args or self.kwargs):
            return "Extension(%r)" % self.identifier
        return ("Extension(%r, args=%r, kwargs=%r)" %
                (self.identifier, self.args, self.kwargs))

    def check_contract(self, context, value, silent):
        """ Evaluate the stored arguments, publish them in ``context``
        under ``'args'`` / ``'kwargs'``, then delegate to the
        registered contract. """
        evaluated_args = []
        for a in self.args:
            evaluated_args.append(a.eval(context))
        # 'args' is stored before the keyword arguments are evaluated.
        context['args'] = tuple(evaluated_args)

        evaluated_kwargs = {}
        for k, v in self.kwargs.items():
            evaluated_kwargs[k] = v.eval(context)
        context['kwargs'] = evaluated_kwargs

        self.contract._check_contract(context, value, silent)

    @staticmethod
    def parse_action(s, loc, tokens):
        """ Parse action building an ``Extension`` from parsed tokens.

        ``tokens[0]`` is the identifier; an optional second token is
        the ``(args, kwargs)`` pair. Raises ``ParseException`` for an
        identifier that is not registered and ``ParseFatalException``
        when a registered ``CheckCallable`` cannot accept the
        arguments that were given.
        """
        identifier = tokens[0]
        if len(tokens) == 2:
            parsed_args, kwargs = tokens[1]
            args = tuple(parsed_args)
        else:
            args = tuple()
            kwargs = {}

        if identifier not in Extension.registrar:
            raise ParseException('Unknown extension contract %r' % identifier)

        registered = Extension.registrar[identifier]
        if isinstance(registered, CheckCallable):
            callable_thing = registered.callable
            # The checked value is always passed first; 'value' stands
            # in for it in the signature check.
            test_args = ('value',) + args
            from contracts.inspection import (
                check_callable_accepts_these_arguments, InvalidArgs)
            try:
                check_callable_accepts_these_arguments(callable_thing,
                                                       test_args, kwargs)
            except InvalidArgs as e:
                msg = ('The callable %s cannot accept these arguments '
                       % callable_thing
                       + 'args = %s, kwargs = %s ' % (test_args, kwargs)
                       + '%s' % e)
                raise ParseFatalException(msg)

        return Extension(identifier, W(s, loc), args, kwargs)

    # To stay picklable we do not save self.contract, which might
    # point to a lambda; it is looked up again from the registrar.
    def __getstate__(self):
        """ Pickle only the identifier and the arguments. """
        return dict(identifier=self.identifier,
                    args=self.args,
                    kwargs=self.kwargs)

    def __setstate__(self, d):
        """ Restore the saved fields and re-resolve the contract. """
        self.identifier = d['identifier']
        self.contract = Extension.registrar[self.identifier]
        self.args = d['args']
        self.kwargs = d['kwargs']
class CheckCallable(Contract):
    """ Contract backed by a plain callable.

    The callable is invoked as ``callable(value, *args, **kwargs)``,
    where ``args`` and ``kwargs`` are read from the checking context
    (empty when absent). Returning ``None`` or ``True`` means the value
    passes; returning ``False`` or raising ``ValueError`` /
    ``AssertionError`` means it fails.
    """

    def __init__(self, callable):  # @ReservedAssignment
        self.callable = callable
        Contract.__init__(self, where=None)

    def check_contract(self, context, value, silent):
        """ Run the callable on ``value``.

        Raises ``ContractNotRespected`` when the callable rejects the
        value, and ``ValueError`` when the callable returns something
        other than ``True``, ``False`` or ``None``.
        """
        allowed = (ValueError, AssertionError)
        args = context.get('args', tuple())
        kwargs = context.get('kwargs', {})
        try:
            result = self.callable(value, *args, **kwargs)
        except allowed as e:  # failed
            raise ContractNotRespected(self, str(e), value, context)

        if result in [None, True]:
            # passed
            return

        if result == False:  # noqa: E712 (equality is intentional)
            # NOTE(review): get_callable_name() already appends '()'
            # when the callable has a __name__, so the name is rendered
            # as 'f()()'. Message text left unchanged.
            msg = ('Value does not pass criteria of %s() (module: %s).' %
                   (get_callable_name(self.callable),
                    get_callable_module(self.callable)))
            raise ContractNotRespected(self, msg, value, context)

        # Report what the callable returned, not the value being checked.
        msg = ('I expect that %r returns either True, False, None; or '
               'raises a ValueError exception. Instead, I got %s.' %
               (self.callable, describe_value(result)))
        raise ValueError(msg)

    def __repr__(self):
        """ Note: this contract is not representable, but anyway it is
            only used by Extension, which serializes using the identifier. """
        return 'CheckCallable(%r)' % self.callable

    def __str__(self):
        """ Note: this contract is not representable, but anyway it is only
            used by Extension, which serializes using the identifier. """
        # Use the wrapped callable, not the builtin named ``callable``.
        return get_callable_name(self.callable)
def get_callable_name(c):
    """ Get a displayable name for the callable even if __name__
        is not available.

        Returns ``c.__name__ + '()'`` when that works, otherwise
        ``str(c)``. """
    try:
        return c.__name__ + '()'
    except Exception:
        # Best-effort fallback (missing or non-string __name__), but
        # let SystemExit / KeyboardInterrupt propagate.
        return str(c)
def get_callable_module(c):
    """ Return ``c.__module__``, or a placeholder string when the
        attribute cannot be read. """
    try:
        return c.__module__
    except Exception:
        # Best-effort fallback, but let SystemExit / KeyboardInterrupt
        # propagate instead of swallowing them.
        return '(No __module__ attr)'
def describe_callable(c):
    """ One-line description of a callable: its display name followed
        by the module it reports. """
    name = get_callable_name(c)
    module_part = ' module: %s' % get_callable_module(c)
    return name + module_part
class CheckCallableWithSelf(Contract):
    """ Contract backed by a callable that also receives ``self``.

    The callable is invoked as
    ``callable(context['self'], value, *args, **kwargs)``, so it can
    only be checked when the context contains a ``'self'`` entry.
    Returning ``None`` or ``True`` means the value passes; returning
    ``False`` or raising ``ValueError`` / ``AssertionError`` means it
    fails.
    """

    def __init__(self, callable):  # @ReservedAssignment
        self.callable = callable
        Contract.__init__(self, where=None)

    def check_contract(self, context, value, silent):
        """ Run the callable on ``(context['self'], value)``.

        Raises ``ContractNotRespected`` when ``'self'`` is missing from
        the context or the callable rejects the value, and
        ``ValueError`` when the callable returns something other than
        ``True``, ``False`` or ``None``.
        """
        args = context.get('args', tuple())
        kwargs = context.get('kwargs', {})
        if 'self' not in context:
            msg = ('You can only call this contract in the context of '
                   ' a function call to a regular method.')
            raise ContractNotRespected(self, msg, value, context)

        args = (context['self'], value) + args
        allowed = (ValueError, AssertionError)
        try:
            result = self.callable(*args, **kwargs)
        except allowed as e:  # failed
            raise ContractNotRespected(self, str(e), value, context)

        if result in [None, True]:
            # passed
            return

        if result == False:  # noqa: E712 (equality is intentional)
            msg = ('Value does not pass criteria of %s.' %
                   describe_callable(self.callable))
            raise ContractNotRespected(self, msg, value, context)

        # Report what the callable returned, not the value being checked.
        msg = ('I expect that %r returns either True, False, None; or '
               'raises a ValueError exception. Instead, I got %s.' %
               (self.callable, describe_value(result)))
        raise ValueError(msg)

    def __repr__(self):
        """ Note: this contract is not representable, but anyway it is only
            used by Extension, which serializes using the identifier. """
        return 'CheckCallableWithSelf(%r)' % self.callable

    def __str__(self):
        """ Note: this contract is not representable, but anyway it is only
            used by Extension, which serializes using the identifier. """
        # NOTE(review): get_callable_name() already appends '()' when
        # the callable has a __name__, so this renders 'function f()()'.
        # Text left unchanged.
        return 'function %s()' % get_callable_name(self.callable)
# Grammar fragments for the argument list of an extension contract.
# A word built from underscores and alphanumeric characters; used as the
# name of a keyword argument.
w = Word('_' + alphanums)
# A positional argument is an independent copy of the rvalue expression.
arg = rvalue.copy()
# A keyword argument: name, optional spaces, '=', optional spaces, rvalue.
# NOTE(review): S is presumably pyparsing's Suppress, so that '=' produces
# no token and tokens[1] below is the rvalue -- confirm in ..syntax.
kwarg = w + ZeroOrMore(' ') + S('=') + ZeroOrMore(' ') + rvalue
# Collapse each keyword argument into a one-entry dict {name: value};
# build_args_kwargs tells keyword from positional arguments by this type.
kwarg.setParseAction(lambda s, loc, tokens: {tokens[0]: tokens[1]})
def build_args_kwargs(s, loc, tokens):
    """ Parse action splitting tokens into ``(args, kwargs)``.

        Tokens that are dicts are merged, in order, into the keyword
        dictionary; every other token becomes a positional argument.
        ``s`` and ``loc`` are not used. """
    positional = []
    keywords = {}
    for token in tokens:
        if isinstance(token, dict):
            for key, val in token.items():
                keywords[key] = val
        else:
            positional.append(token)
    return (tuple(positional), keywords)
# An argument list is a delimited sequence of items; each item is tried
# as a keyword argument first and as a positional argument otherwise.
# NOTE(review): delimitedList is presumably pyparsing's, whose default
# delimiter is a comma -- confirm in ..syntax.
arglist = delimitedList(kwarg | arg)
# Fold the parsed items into a single (args, kwargs) pair.
arglist.setParseAction(build_args_kwargs)
# An identifier is one alphabetic character combined with a Word over
# '_' + alphanums, optionally followed by a parenthesised argument list.
identifier_expression = (Combine(oneOf(list(alphas)) + Word('_' + alphanums)) +
Optional(S('(') + arglist + S(')')))
# The contract expression is a copy of identifier_expression, so the parse
# action that builds an Extension is attached to the copy only.
identifier_contract = identifier_expression.copy().setParseAction(
Extension.parse_action)