Source code for contracts.library.array

from ..interface import Contract, ContractNotRespected, describe_type
from ..pyparsing_utils import myOperatorPrecedence
from ..syntax import (add_contract, W, contract_expression, O, S, rvalue,
    simple_contract, ZeroOrMore, Literal, MatchFirst, opAssoc, FollowedBy, NotAny,
    Keyword, add_keyword, Word)
from .array_ops import (ArrayOR, ArrayAnd, DType, ArrayConstraint,
    ArrayORCustomString)
from .compositions import And, OR
from .suggester import create_suggester
from numpy import ndarray, dtype  # @UnusedImport
import numpy
from pyparsing import operatorPrecedence


[docs]class Array(Contract): def __init__(self, shape_contract=None, elements_contract=None, where=None): Contract.__init__(self, where) self.shape_contract = shape_contract self.elements_contract = elements_contract
[docs] def check_contract(self, context, value, silent): if not isinstance(value, ndarray): error = 'Expected an array, got a %s.' % describe_type(value) raise ContractNotRespected(contract=self, error=error, value=value, context=context) if self.shape_contract is not None: self.shape_contract._check_contract(context, value.shape, silent) if self.elements_contract is not None: self.elements_contract._check_contract(context, value, silent)
def __str__(self): s = 'array' if self.shape_contract is not None: s += '[%s]' % self.shape_contract if self.elements_contract is not None: s += '(%s)' % self.elements_contract return s def __repr__(self): s = 'Array(%r,%r)' % (self.shape_contract, self.elements_contract) return s
[docs] @staticmethod def parse_action(s, loc, tokens): where = W(s, loc) shape_contract = tokens.get('shape_contract', [None])[0] elements_contract = tokens.get('elements_contract', [None])[0] assert shape_contract is None or isinstance(shape_contract, ShapeContract) assert elements_contract is None or isinstance(elements_contract, Contract) return Array(shape_contract, elements_contract, where=where)
[docs]class ShapeContract(Contract): def __init__(self, dimensions, ellipsis=False, where=None): assert isinstance(dimensions, list) assert isinstance(ellipsis, bool) Contract.__init__(self, where) self.dimensions = dimensions self.ellipsis = ellipsis
[docs] def check_contract(self, context, value, silent): assert isinstance(value, tuple) # Guaranteed by construction expected = len(self.dimensions) ndim = len(value) if ndim < expected: # TODO: write clearer message error = 'Expected %d dimensions, got %d.' % (expected, ndim) raise ContractNotRespected(contract=self, error=error, value=value, context=context) if ndim > expected and not self.ellipsis: error = 'Expected %d dimensions, got %d.' % (expected, ndim) raise ContractNotRespected(contract=self, error=error, value=value, context=context) for i in range(expected): self.dimensions[i]._check_contract(context, value[i], silent)
def __str__(self): be_careful = self.ellipsis or len(self.dimensions) > 1 def rep(x): if be_careful and isinstance(x, (And, OR)): return "(%s)" % x else: return "%s" % x s = 'x'.join(rep(x) for x in self.dimensions) if self.ellipsis: s += 'x...' return s def __repr__(self): if self.ellipsis: s = 'ShapeContract(%r,%r)' % (self.dimensions, self.ellipsis) else: s = 'ShapeContract(%r)' % self.dimensions return s
[docs] @staticmethod def parse_action(s, loc, tokens): where = W(s, loc) # print "in tokens: " % tokens # workaround for some bugs ellipsis = False dimensions = [] for t in tokens: if t == '...': ellipsis = True else: assert isinstance(t, Contract), 'Wrong token %r' % t dimensions.append(t) return ShapeContract(dimensions, ellipsis, where=where)
[docs]class Shape(Contract): def __init__(self, length, contract, where=None): Contract.__init__(self, where) self.contract = contract self.length = length
[docs] def check_contract(self, context, value, silent): if not isinstance(value, ndarray): error = 'Expected an array, got %r.' % value.__class__.__name__ raise ContractNotRespected(contract=self, error=error, value=value, context=context) if isinstance(value, ndarray): value = value.shape if self.length is not None: self.length._check_contract(context, len(value), silent) if self.contract is not None: self.contract._check_contract(context, value, silent)
def __str__(self): s = 'shape' if self.length is not None: s += '[%s]' % self.length if self.contract is not None: s += '(%s)' % self.contract return s def __repr__(self): s = 'Shape(%r,%r)' % (self.length, self.contract) return s
[docs] @staticmethod def parse_action(s, loc, tokens): where = W(s, loc) assert 0 <= len(tokens) <= 2 length = tokens.get('length', [None])[0] contract = tokens.get('other', [None])[0] return Shape(length, contract, where)
array_constraints = [] for glyph in ArrayConstraint.constraints: if glyph == '!=': # special case: ! must be followed by = glyph_expression = Literal('!') - Literal('=') glyph_expression.setName('!=') else: glyph_expression = Literal(glyph) expr = glyph_expression('glyph') - rvalue('rvalue') expr.setParseAction(ArrayConstraint.parse_action) array_constraints.append(expr) np_uint_dtypes = "u1 uint8 uint16 uint32 uint64".split() np_int_dtypes = "i1 int8 int16 int32 int64".split() np_float_dtypes = "float32 float64".split() np_other_dtypes = ['bool'] atomic = np_uint_dtypes + np_int_dtypes + np_float_dtypes + np_other_dtypes # in numpy, int = int64, float = float64 # for us, int = int64|int32| ... dtype_checks = [] for x in atomic: d = numpy.dtype(x) expr = Keyword(x).setParseAction(DType.parse_action(d)) dtype_checks.append(expr)
[docs]def np_composite(custom_string, alternatives): alts = [DType(numpy.dtype(a), a) for a in alternatives] return ArrayORCustomString(custom_string=custom_string, clauses=alts)
[docs]def np_uint(s, loc, tokens): # @UnusedVariable return np_composite('uint', np_uint_dtypes)
[docs]def np_int(s, loc, tokens): # @UnusedVariable return np_composite('int', np_int_dtypes)
[docs]def np_float(s, loc, tokens): # @UnusedVariable return np_composite('float', np_float_dtypes)
dtype_checks.append(Keyword('int').setParseAction(np_int)) dtype_checks.append(Keyword('uint').setParseAction(np_uint)) dtype_checks.append(Keyword('float').setParseAction(np_float)) composite = ['int', 'uint', 'float'] ndarray_simple_contract = MatchFirst(dtype_checks + array_constraints) ndarray_simple_contract.setName('numpy element contract') suggester = create_suggester(get_options=lambda: atomic + composite) baseExpr = ndarray_simple_contract | suggester baseExpr.setName('numpy contract (with recovery)') op = myOperatorPrecedence # op = operatorPrecedence ndarray_composite_contract = op(baseExpr, [ (',', 2, opAssoc.LEFT, ArrayAnd.parse_action), # @UndefinedVariable ('|', 2, opAssoc.LEFT, ArrayOR.parse_action), # @UndefinedVariable ])
[docs]def my_delim_list2(what, delim): return (what + ZeroOrMore(S(delim) + FollowedBy(NotAny(ellipsis)) - what))
ellipsis = Literal('...') shape_suggester = create_suggester(get_options=lambda: ['...'], pattern=Word('.')) inside_inside1 = simple_contract | shape_suggester inside_inside2 = contract_expression | shape_suggester inside = (S('(') - inside_inside2 - S(')')) | inside_inside1 # XXX: ^ and use or_contract? shape_contract = my_delim_list2(inside, S('x')) + O(S('x') + ellipsis) shape_contract.setParseAction(ShapeContract.parse_action) shape_contract.setName('array shape contract') name = Keyword('array') | Keyword('ndarray') optional_shape = (S('[') - shape_contract - S(']'))('shape_contract') optional_elements = (S('(') - ndarray_composite_contract - S(')'))('elements_contract') array_contract = name + O(optional_shape) + O(optional_elements) array_contract.setParseAction(Array.parse_action) array_contract.setName('array() contract') add_contract(array_contract) add_keyword('array') add_keyword('ndarray') optional_length = (S('[') - contract_expression - S(']'))('length') optional_other = (S('(') - contract_expression - S(')'))('other') shape = Keyword('shape') + O(optional_length) + O(optional_other) shape.setName('shape() contract') shape.setParseAction(Shape.parse_action) add_contract(shape) add_keyword('shape')