diff --git a/.gitignore b/.gitignore index 5ef4fd8..adacf41 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,5 @@ LemonGraph/data/d3.v3.min.js LemonGraph/data/d3.v4.min.js LemonGraph/data/svg-crowbar.js deps/js +.tox +.eggs diff --git a/LemonGraph/MatchLGQL.py b/LemonGraph/MatchLGQL.py index 4dcce3d..9b5267b 100755 --- a/LemonGraph/MatchLGQL.py +++ b/LemonGraph/MatchLGQL.py @@ -1,32 +1,33 @@ from __future__ import print_function + +import itertools import re -from six import iteritems import sys -import itertools - from collections import defaultdict, deque +from six import iteritems + SQ = '(?:\'(?:[^\'\\\\]|\\\\[\'\"\\\\])*\')' DQ = '(?:\"(?:[^\"\\\\]|\\\\[\'\"\\\\])*\")' -BW = '(?:(?:(?![0-9])\w)\w*)' +BW = '(?:(?:(?![0-9])\w)\w*)' # noqa STR = re.compile('(?:%s|%s)' % (DQ, SQ), re.UNICODE) WHITE = re.compile(r'\s+', re.UNICODE) -KEY = re.compile(r'(?:%s|%s|%s)' % (BW, SQ, DQ), re.IGNORECASE|re.UNICODE) +KEY = re.compile(r'(?:%s|%s|%s)' % (BW, SQ, DQ), re.IGNORECASE | re.UNICODE) DOT = re.compile(r'(?:\.)', re.UNICODE) -NULL = re.compile(r'(?:None|null)', re.IGNORECASE|re.UNICODE) -TRUE = re.compile(r'(?:true)', re.IGNORECASE|re.UNICODE) -FALSE = re.compile(r'(?:false)', re.IGNORECASE|re.UNICODE) +NULL = re.compile(r'(?:None|null)', re.IGNORECASE | re.UNICODE) +TRUE = re.compile(r'(?:true)', re.IGNORECASE | re.UNICODE) +FALSE = re.compile(r'(?:false)', re.IGNORECASE | re.UNICODE) TYPES = re.compile(r'(?:boolean|string|number|array|object)', re.UNICODE) OCT = re.compile(r'(?:-?0[0-7]+)', re.UNICODE) -HEX = re.compile(r'(?:-?0x[0-9a-f]+)', re.IGNORECASE|re.UNICODE) -NUM = re.compile(r'(?:[0-9.e+-]+)', re.IGNORECASE|re.UNICODE) -#REGEX = re.compile(r'(?:/((?:[^\/]|\\.)*)/([ilmsxu]*))', re.UNICODE) +HEX = re.compile(r'(?:-?0x[0-9a-f]+)', re.IGNORECASE | re.UNICODE) +NUM = re.compile(r'(?:[0-9.e+-]+)', re.IGNORECASE | re.UNICODE) +# REGEX = re.compile(r'(?:/((?:[^\/]|\\.)*)/([ilmsxu]*))', re.UNICODE) REGEX = re.compile(r'(?:/((?:[^/]|\\.)*)/([imsx]*))', re.UNICODE) LIST_BEGIN = re.compile(r'\[', re.UNICODE) LIST_END = re.compile(r'\]', re.UNICODE) COMMA = re.compile(r',[\s,]*', re.UNICODE) -OBJ_BEGIN = re.compile(r'([@]*)\b([NE])(?::(%s(?:,%s)*?))?\(' % (BW, BW), re.IGNORECASE|re.UNICODE) +OBJ_BEGIN = re.compile(r'([@]*)\b([NE])(?::(%s(?:,%s)*?))?\(' % (BW, BW), re.IGNORECASE | re.UNICODE) OBJ_END = re.compile(r'\)', re.UNICODE) LINK_UNIQ = re.compile(r'(?:<?->?)', re.UNICODE) CLEANER = re.compile(r'\\(.)', re.UNICODE) @@ -37,25 +38,25 @@ RANGE = (STR, OCT, NUM, HEX) OP_NEXT_BEGIN = { - ':' : (TYPES, LIST_BEGIN), - '!:' : (TYPES, LIST_BEGIN), - '=' : (STR, OCT, NUM, HEX, TRUE, FALSE, NULL, LIST_BEGIN), - '!=' : (STR, OCT, NUM, HEX, TRUE, FALSE, NULL, LIST_BEGIN), - '~' : (REGEX, LIST_BEGIN), - '!~' : (REGEX, LIST_BEGIN), - '<' : RANGE, - '<=' : RANGE, - '>' : RANGE, - '>=' : RANGE, + ':' : (TYPES, LIST_BEGIN), # noqa + '!:' : (TYPES, LIST_BEGIN), # noqa + '=' : (STR, OCT, NUM, HEX, TRUE, FALSE, NULL, LIST_BEGIN), # noqa + '!=' : (STR, OCT, NUM, HEX, TRUE, FALSE, NULL, LIST_BEGIN), # noqa + '~' : (REGEX, LIST_BEGIN), # noqa + '!~' : (REGEX, LIST_BEGIN), # noqa + '<' : RANGE, # noqa + '<=' : RANGE, # noqa + '>' : RANGE, # noqa + '>=' : RANGE, # noqa } OP_NEXT_END = { - ':' : (TYPES, LIST_END), - '!:' : (TYPES, LIST_END), - '=' : (STR, OCT, NUM, HEX, TRUE, FALSE, NULL, LIST_END), - '!=' : (STR, OCT, NUM, HEX, TRUE, FALSE, NULL, LIST_END), - '~' : (REGEX, LIST_END), - '!~' : (REGEX, LIST_END), + ':' : (TYPES, LIST_END), # noqa + '!:' : (TYPES, LIST_END), # noqa + '=' : (STR, OCT, NUM, HEX,
TRUE, FALSE, NULL, LIST_END), # noqa + '!=' : (STR, OCT, NUM, HEX, TRUE, FALSE, NULL, LIST_END), # noqa + '~' : (REGEX, LIST_END), # noqa + '!~' : (REGEX, LIST_END), # noqa } OTHERTYPE = { @@ -74,17 +75,17 @@ QUOTES = "\'\"" REVERSE = { - 'both' : 'both', - 'in' : 'out', - 'out' : 'in', + 'both': 'both', + 'in': 'out', + 'out': 'in', } MERGE = { - '=': lambda a, b: a.intersection(b), + '=': lambda a, b: a.intersection(b), '!=': lambda a, b: a.union(b), - '~': lambda a, b: a.intersection(b), + '~': lambda a, b: a.intersection(b), '!~': lambda a, b: a.union(b), - ':': lambda a, b: a.intersection(b), + ':': lambda a, b: a.intersection(b), '!:': lambda a, b: a.union(b), } @@ -95,42 +96,45 @@ '<=': lambda a, b: a <= b, } + # sigh - bools are a subclass of int def is_type(val, types): if isinstance(val, bool) and bool not in types: return False return isinstance(val, tuple(types)) + FILTER_OPS = ('!=', ':', '!:', '~', '!~') FILTER = { - '!=' : lambda d1, d2: set(v for v in d1 if v not in d2), - ':' : lambda d1, d2: set(v for v in d1 if is_type(v, d2)), - '!:' : lambda d1, d2: set(v for v in d1 if not is_type(v, d2)), - '~' : lambda d1, d2: set(v for v in d1 if _match_at_least_one(v, d2)), - '!~' : lambda d1, d2: set(v for v in d1 if not _match_at_least_one(v, d2)), + '!=' : lambda d1, d2: set(v for v in d1 if v not in d2), # noqa + ':' : lambda d1, d2: set(v for v in d1 if is_type(v, d2)), # noqa + '!:' : lambda d1, d2: set(v for v in d1 if not is_type(v, d2)), # noqa + '~' : lambda d1, d2: set(v for v in d1 if _match_at_least_one(v, d2)), # noqa + '!~' : lambda d1, d2: set(v for v in d1 if not _match_at_least_one(v, d2)), # noqa } # tests are (key-list, op, vals) TEST_EVAL = { - '=' : lambda val, vals: val in vals, - '!=' : lambda val, vals: val not in vals, - '~' : lambda val, vals: _match_at_least_one(val, vals), - '!~' : lambda val, vals: not _match_at_least_one(val, vals), + '=' : lambda val, vals: val in vals, # noqa + '!=' : lambda val, vals: val not in vals, # noqa + '~' : lambda val, vals: _match_at_least_one(val, vals), # noqa + '!~' : lambda val, vals: not _match_at_least_one(val, vals), # noqa # resolving the key already succeeded - that's all we need - 'exists' : lambda val, vals: True, + 'exists' : lambda val, vals: True, # noqa # range operators are guaranteed to have exactly one value - '>' : lambda val, vals: val > vals[0], - '>=' : lambda val, vals: val >= vals[0], - '<' : lambda val, vals: val < vals[0], - '<=' : lambda val, vals: val <= vals[0], + '>' : lambda val, vals: val > vals[0], # noqa + '>=' : lambda val, vals: val >= vals[0], # noqa + '<' : lambda val, vals: val < vals[0], # noqa + '<=' : lambda val, vals: val <= vals[0], # noqa # type operators - ':' : lambda val, vals: is_type(val, vals), - '!:' : lambda val, vals: not is_type(val, vals), + ':' : lambda val, vals: is_type(val, vals), # noqa + '!:' : lambda val, vals: not is_type(val, vals), # noqa } + def _clean_num(m, val, _): try: val = float(val) @@ -141,6 +145,7 @@ def _clean_num(m, val, _): raise ValueError(m.start()) return val + def _clean_regex(m, val, cache): flags = re.UNICODE for f in m.group(2): @@ -151,6 +156,7 @@ def _clean_regex(m, val, cache): ret = cache[(m.group(1), flags)] = re.compile(m.group(1), flags) return ret + TYPES_MAP = { 'boolean': bool, 'string': str, @@ -171,6 +177,7 @@ def _clean_regex(m, val, cache): TYPES: lambda m, val, _: TYPES_MAP[val], } + def _match_at_least_one(val, rgxs): try: for rgx in rgxs: @@ -180,6 +187,7 @@ def _match_at_least_one(val, rgxs): pass return False + class 
QueryCannotMatch(Exception): def __init__(self, query): self.query = query @@ -190,6 +198,7 @@ def __str__(self): def __repr__(self): return 'QueryCannotMatch(%s)' % repr(self.query) + class QuerySyntaxError(Exception): def __init__(self, query, pos, message): self.query = query @@ -229,9 +238,9 @@ def __init__(self, filter, cache=None): arrow = m.group(0) if len(arrow) == 2: - link, rlink = ('out','in') if arrow[-1] == '>' else ('in', 'out') + link, rlink = ('out', 'in') if arrow[-1] == '>' else ('in', 'out') else: - link = rlink = dir = 'both' + link = rlink = dir = 'both' # noqa info['next'] = link @@ -242,7 +251,7 @@ def __init__(self, filter, cache=None): if info['type'] == self.matches[-1]['type']: inferred = { 'type': OTHERTYPE[info['type']], - 'tests' : tuple([(('type',), 'exists', ())]), + 'tests': tuple([(('type',), 'exists', ())]), 'next': link, 'prev': rlink, 'keep': False, @@ -294,7 +303,7 @@ def add_filter(self, alias): if required: raise self.syntax_error('missing required alias: %s' % normalized) # minimum info obj - infos = ({ 'tests': deque() },) + infos = ({'tests': deque()},) self.required_filters.discard(normalized) pos = self.pos for info in infos: @@ -511,11 +520,11 @@ def munge_obj(self, info): info['rank2'] = 0 try: offset = 0 if info['type'] == 'N' else 1 - accel['type'] = tuple(d[(('type',), '=')]) + accel['type'] = tuple(d[(('type',), '=')]) info['rank'] = 4 + offset info['rank2'] = len(accel['type']) # value is only useful if type is there - accel['value'] = tuple(d[(('value',),'=')]) + accel['value'] = tuple(d[(('value',), '=')]) info['rank'] = 2 + offset info['rank2'] *= len(accel['value']) except KeyError: @@ -523,7 +532,7 @@ def munge_obj(self, info): # add id accelerator try: - accel['ID'] = tuple(d[(('ID',), '=')]) + accel['ID'] = tuple(d[(('ID',), '=')]) info['rank'] = 0 if info['type'] == 'E' else 1 info['rank2'] = len(accel['ID']) except KeyError: @@ -546,7 +555,7 @@ def seeds(self, txn, beforeID=None): rank = test['rank'] accel = test['accel'] funcs = (txn.nodes, txn.edges) - if rank in (0,1): + if rank in (0, 1): funcs = (txn.edge, txn.node) for ID in accel['ID']: try: @@ -554,7 +563,7 @@ def seeds(self, txn, beforeID=None): except TypeError: pass elif rank in (2,): - for t,v in itertools.product(accel['type'], accel['value']): + for t, v in itertools.product(accel['type'], accel['value']): seed = txn.node(type=t, value=v, query=True, beforeID=beforeID) if seed: yield seed @@ -564,7 +573,7 @@ def seeds(self, txn, beforeID=None): for seed in txn.edges(type=t, beforeID=beforeID): if seed.value in vals: yield seed - elif rank in (4,5): + elif rank in (4, 5): for t in accel['type']: for seed in funcs[rank % 2](type=t, beforeID=beforeID): yield seed @@ -575,7 +584,7 @@ def seeds(self, txn, beforeID=None): def dump(self, fh=sys.stdout): print('[', file=fh) for p in self.matches: - pre = dict( (key, val) for key, val in iteritems(p) if key != 'tests' ) + pre = dict((key, val) for key, val in iteritems(p) if key != 'tests') if p['tests']: print('\t%s:[' % pre, file=fh) for test in p['tests']: @@ -603,6 +612,7 @@ def is_valid(self, obj, idx=0, skip_fudged=False): # return False # return True + def eval_test(obj, test): target = obj try: @@ -612,6 +622,7 @@ def eval_test(obj, test): return False return TEST_EVAL[test[1]](target, test[2]) + class MatchCTX(object): link = (None, 'next', 'prev') @@ -621,7 +632,7 @@ def __init__(self, match): self.next = next self.match = match self.chain = deque() - self.uniq = tuple(i for i,x in enumerate(match.matches) if 
x['uniq']) + self.uniq = tuple(i for i, x in enumerate(match.matches) if x['uniq']) self.seen = deque() def push(self, target, delta): diff --git a/LemonGraph/__init__.py b/LemonGraph/__init__.py index 9e4ad0d..f4770c7 100644 --- a/LemonGraph/__init__.py +++ b/LemonGraph/__init__.py @@ -1,4 +1,5 @@ from __future__ import print_function + try: from ._lemongraph_cffi import ffi, lib except ImportError: @@ -14,20 +15,24 @@ except ImportError: import builtins as builtin -from collections import deque import itertools -from lazy import lazy import os -from six import iteritems, itervalues import sys +from collections import deque + +from lazy import lazy + +from six import iteritems, itervalues -from .callableiterable import CallableIterableMethod -from .hooks import Hooks -from .dirlist import dirlist -from .indexer import Indexer -from .MatchLGQL import QueryCannotMatch, QuerySyntaxError from . import algorithms from . import wire +from .callableiterable import CallableIterableMethod +from .hooks import Hooks + +from .dirlist import dirlist # noqa +# kept as package re-exports: collection.py, dbtool.py, and the server import these names from here +from .indexer import Indexer # noqa +from .MatchLGQL import QueryCannotMatch, QuerySyntaxError # noqa # these imports happen at the bottom ''' from .sset import SSet ''' @@ -38,6 +43,7 @@ from .sset import SSet ''' + # so I can easily create keys/items/values class # methods that behave as the caller expects def listify_py2(func): @@ -50,6 +56,7 @@ def listify(*args, **kwargs): return list(func(*args, **kwargs)) return listify + # todo: # think about splitting deletes off into its own btree so the log is truly append-only @@ -69,7 +76,7 @@ def merge_values(a, b): if isinstance(b, dict): if isinstance(a, dict): for k, v in iteritems(b): - a[k] = merge_values(a.get(k,None), v) + a[k] = merge_values(a.get(k, None), v) return a elif isinstance(b, (tuple, list)): # also merge and sort lists, but do not descend into them @@ -166,7 +173,7 @@ def delete(self): path = self.path if self.nosubdir: os.unlink(path) - os.unlink(path+"-lock") + os.unlink(path + "-lock") else: os.unlink(os.path.join(path, 'data.mdb')) os.unlink(os.path.join(path, 'lock.mdb')) @@ -306,12 +313,15 @@ class EndTransaction(Exception): def __init__(self, txn): self.txn = txn + class AbortTransaction(EndTransaction): pass + class CommitTransaction(EndTransaction): pass + class Transaction(GraphItem): reserved = () @@ -330,9 +340,9 @@ def __init__(self, graph, write=True, beforeID=None, _parent=None): # create a child transaction - not possible if DB_WRITEMAP was enabled (it is disabled) def transaction(self, write=None, beforeID=None): return Transaction(self.graph, - self.write if write is None else write, - self.b4ID(beforeID), - self._txn) + self.write if write is None else write, + self.b4ID(beforeID), + self._txn) def __enter__(self): self._txn = lib.graph_txn_begin(self.graph._graph, self._parent, self.txn_flags) @@ -497,7 +507,7 @@ def scan(self, start=1, stop=0): if start == self.beforeID: return try: - ret = self.entry(start, beforeID=start+1) + ret = self.entry(start, beforeID=start + 1) except IndexError: return yield ret @@ -543,7 +553,7 @@ def fifo(self, domain, map_values=False, serialize_value=None): def updates(self, **kwargs): """returns iterator for tuples of: node/edge before modification, node/edge after modification, the set of key names that changed, first and last txn ID to contribute to the delta, and the last logID processed""" changed = {} - batch=kwargs.pop('batch', 500) + batch = kwargs.pop('batch', 500) for target in
self.scan(**kwargs): ID = target.ID updated = [] @@ -553,7 +563,7 @@ def updates(self, **kwargs): else: updated.append((target, set(target))) # new edges cause changes in referenced nodes' properties - if isinstance(target,Edge): + if isinstance(target, Edge): updated.append((target.src, Node.reserved_src_updates)) updated.append((target.tgt, Node.reserved_tgt_updates)) @@ -578,6 +588,7 @@ def updates(self, **kwargs): def query(self, pattern, start=0, stop=0, cache=None, limit=0): q = Query((pattern,), cache={} if cache is None else cache) + def just_chain(): for _, chain in q.execute(self, start=start, stop=stop, limit=limit): yield chain @@ -680,14 +691,14 @@ def delete(self): self._lib_delete(self._txn, self._data) def __repr__(self): - pairs = ["%s: %s" % (repr(key),repr(value)) for (key, value) in self.iteritems()] + pairs = ["%s: %s" % (repr(key), repr(value)) for (key, value) in self.iteritems()] return '{' + ', '.join(pairs) + '}' def as_dict(self, trans=None, update=None, native=None): if trans is None: - ret = dict( (prop.key, prop.value) for prop in self.properties(native=native) ) + ret = dict((prop.key, prop.value) for prop in self.properties(native=native)) else: - ret = dict( (trans.get(prop.key, prop.key), prop.value) for prop in self.properties(native=native) ) + ret = dict((trans.get(prop.key, prop.key), prop.value) for prop in self.properties(native=native)) if update: ret.update(update) return ret @@ -730,19 +741,19 @@ class Node(NodeEdgeProperty): _lib_resolve = lib.graph_node_resolve _lib_updateID = lib.graph_node_updateID directions = { - None: 0, - 'in': lib.GRAPH_DIR_IN, - 'out': lib.GRAPH_DIR_OUT, + None: 0, + 'in': lib.GRAPH_DIR_IN, + 'out': lib.GRAPH_DIR_OUT, 'both': lib.GRAPH_DIR_BOTH, } code = 'N' pcode = 'n' discoverable = ('ID', 'type', 'value') reserved_src_updates = frozenset(('outbound_count', 'edge_count', 'neighbor_count', 'neighbor_types')) - reserved_tgt_updates = frozenset(('inbound_count', 'edge_count', 'neighbor_count', 'neighbor_types')) + reserved_tgt_updates = frozenset(('inbound_count', 'edge_count', 'neighbor_count', 'neighbor_types')) reserved_both = frozenset(('edges', 'edgeIDs', 'edge_count', 'neighbors', 'neighborIDs', 'neighbor_count', 'neighbor_types')) reserved_src_only = frozenset(('outbound', 'outboundIDs', 'outbound_count')) - reserved_tgt_only = frozenset(('inbound', 'inboundIDs', 'inbound_count')) + reserved_tgt_only = frozenset(('inbound', 'inboundIDs', 'inbound_count')) reserved_src = reserved_both | reserved_src_only reserved_tgt = reserved_both | reserved_tgt_only reserved_internal = frozenset(discoverable + ('typeID', 'valueID')) @@ -824,7 +835,7 @@ def neighbor_count(self): def neighbor_types(self): types = {} for node in self.neighbors: - types[node.type] = types.get(node.type,0) + 1 + types[node.type] = types.get(node.type, 0) + 1 return types @builtin.property @@ -944,7 +955,7 @@ class Property(NodeEdgeProperty): _lib_updateID = lib.graph_prop_updateID pcode = 'p' discoverable = ('ID', 'key', 'value', 'parentID') - reserved = frozenset(discoverable + ('parent', 'keyID', 'valueID')) + reserved = frozenset(discoverable + ('parent', 'keyID', 'valueID')) is_property = True # methods - note that we do not cache the key/value, as both: @@ -1034,6 +1045,7 @@ def __del__(self): next = __next__ + # constructor glue ConstructorsByRecType[lib.GRAPH_NODE] = Node ConstructorsByRecType[lib.GRAPH_EDGE] = Edge @@ -1088,8 +1100,8 @@ def update(txn, start): else: raise Exception("no handlers!") -from .kv import KV -from .query import Query 
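The updates() docstring above is the only statement of what that generator yields, so a short usage sketch may help; this is a hedged illustration, not part of the diff: the graph handle g is assumed to exist, and the positional layout of each yielded tuple is inferred from the docstring rather than confirmed by this patch.

    # sketch: consuming Transaction.updates(); tuple layout assumed from its docstring
    with g.transaction(write=False) as txn:
        for update in txn.updates(batch=500):
            before, after, changed_keys = update[0], update[1], update[2]
            print(before, after, sorted(changed_keys))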
-from .fifo import Fifo -from .serializer import Serializer -from .sset import SSet +from .fifo import Fifo # noqa +from .kv import KV # noqa +from .query import Query # noqa +from .serializer import Serializer # noqa +from .sset import SSet # noqa diff --git a/LemonGraph/algorithms.py b/LemonGraph/algorithms.py index b88dc9e..e034d3d 100644 --- a/LemonGraph/algorithms.py +++ b/LemonGraph/algorithms.py @@ -1,5 +1,6 @@ from collections import deque + # source/target must be LemonGraph.Node objects from the same graph # if cost_cb is supplied: # it must be a callable and will be passed the edge being traversed @@ -12,9 +13,9 @@ def shortest_path(source, target, directed=False, cost_field=None, cost_default= if cost_cb is None: if cost_field is None: - cost_cb = lambda edge: cost_default + cost_cb = lambda edge: cost_default # noqa else: - cost_cb = lambda edge: edge.get(cost_field, cost_default) + cost_cb = lambda edge: edge.get(cost_field, cost_default) # noqa helper = _shortest_path_helper_directed if directed else _shortest_path_helper_undirected @@ -44,6 +45,7 @@ def shortest_path(source, target, directed=False, cost_field=None, cost_default= seen.pop() seen.pop() + def _shortest_path_helper_directed(node, seen, cost_cb, cost): for edge in node.iterlinks(filterIDs=seen, dir='out'): delta = cost_cb(edge) @@ -52,6 +54,7 @@ def _shortest_path_helper_directed(node, seen, cost_cb, cost): raise ValueError(delta) yield edge, edge.tgt, cost1 + def _shortest_path_helper_undirected(node, seen, cost_cb, cost): for edge in node.iterlinks(filterIDs=seen, dir='out'): delta = cost_cb(edge) diff --git a/LemonGraph/callableiterable.py b/LemonGraph/callableiterable.py index b93f176..ba3111b 100644 --- a/LemonGraph/callableiterable.py +++ b/LemonGraph/callableiterable.py @@ -1,5 +1,7 @@ # decorator shenanigans to bless class methods such that they can be referenced # as an iterable property or called with parameters (also returns an iterable) + + class CallableIterable(object): def __init__(self, func, target): self.func = func @@ -11,6 +13,7 @@ def __iter__(self): def __call__(self, *args, **kwargs): return self.func(self.target, *args, **kwargs) + class CallableIterableMethod(object): def __init__(self, fget=None): self.fget = fget @@ -26,5 +29,3 @@ def __get__(self, obj, type=None): obj.__CallableIterableMethod_cache = {} fn = obj.__CallableIterableMethod_cache[self.fget.__name__] = CallableIterable(self.fget, obj) return fn - - diff --git a/LemonGraph/cffi_stubs.py b/LemonGraph/cffi_stubs.py index 7e284fc..11e1682 100644 --- a/LemonGraph/cffi_stubs.py +++ b/LemonGraph/cffi_stubs.py @@ -240,7 +240,7 @@ C_KEYWORDS = dict( sources=['deps/lmdb/libraries/liblmdb/mdb.c', 'deps/lmdb/libraries/liblmdb/midl.c', 'lib/lemongraph.c', 'lib/db.c'], - include_dirs=['lib','deps/lmdb/libraries/liblmdb'], + include_dirs=['lib', 'deps/lmdb/libraries/liblmdb'], libraries=['z'], ) diff --git a/LemonGraph/collection.py b/LemonGraph/collection.py index 8af4186..27491dd 100644 --- a/LemonGraph/collection.py +++ b/LemonGraph/collection.py @@ -1,4 +1,3 @@ -from collections import deque import datetime import errno import logging @@ -6,15 +5,18 @@ import re import resource import signal -from six import iteritems, iterkeys import sys -from time import sleep, time import uuid +from collections import deque +from time import sleep, time from lazy import lazy + from pysigset import suspended_signals -from . import Graph, Serializer, Hooks, dirlist, Indexer, Query +from six import iteritems, iterkeys + +from . 
import Graph, Hooks, Indexer, Query, Serializer, dirlist try: xrange # Python 2 @@ -24,28 +26,32 @@ log = logging.getLogger(__name__) log.addHandler(logging.NullHandler()) + def uuidgen(): return str(uuid.uuid1()) + def uuid_to_utc_ts(u): return (uuid.UUID('{%s}' % u).time - 0x01b21dd213814000) // 1e7 + def uuid_to_utc(u): return datetime.datetime.utcfromtimestamp(uuid_to_utc_ts(u)).strftime('%Y-%m-%dT%H:%M:%S.%fZ') + class CollectionHooks(Hooks): - def __init__(self, uuid, collection): - self.uuid = uuid + def __init__(self, uuid_id, collection): + self.uuid_id = uuid_id self.collection = collection def opened(self, g): - self.collection.sync(self.uuid, g) + self.collection.sync(self.uuid_id, g) def updated(self, g, nextID, updates): - self.collection.sync_qflush(self.uuid, g) + self.collection.sync_qflush(self.uuid_id, g) def deleted(self): - self.collection.remove(self.uuid) + self.collection.remove(self.uuid_id) class StatusIndexer(Indexer): @@ -64,6 +70,7 @@ def idx_users(self, obj): except (KeyError, AttributeError): return () + class StatusIndex(object): domain = 'status' null = Serializer.null() @@ -73,19 +80,19 @@ def __init__(self, ctx): self.indexer = StatusIndexer() self._indexes = {} - def update(self, uuid, old, new): + def update(self, uuid_id, old, new): oldkeys = self.indexer.index(old) newkeys = self.indexer.index(new) - uuid = uuid.encode() + uuid_id = uuid_id.encode() for name, crc in oldkeys.difference(newkeys): keys = self._index(name) try: - keys.remove(crc + uuid) + keys.remove(crc + uuid_id) except KeyError: pass for name, crc in newkeys.difference(oldkeys): keys = self._index(name) - keys.add(crc + uuid) + keys.add(crc + uuid_id) def _index(self, idx): try: @@ -102,10 +109,11 @@ def search(self, idx, value): except KeyError: return for key in index.iterpfx(pfx=crc): - uuid = key[crclen:].decode() - status = self.ctx.statusDB[uuid] + uuid_id = key[crclen:].decode() + status = self.ctx.statusDB[uuid_id] if check(status): - yield uuid, status + yield uuid_id, status + class Context(object): def __init__(self, collection, write=True): @@ -130,20 +138,20 @@ def _graphs(self, user, roles): if roles: seen = set() for role in roles: - for uuid, status in self.status_index.search('user_roles', '%s\0%s' % (user, role)): - if uuid not in seen: - seen.add(uuid) - yield uuid, status + for uuid_id, status in self.status_index.search('user_roles', '%s\0%s' % (user, role)): + if uuid_id not in seen: + seen.add(uuid_id) + yield uuid_id, status elif user: - for uuid, status in self.status_index.search('users', user): - yield uuid, status + for uuid_id, status in self.status_index.search('users', user): + yield uuid_id, status else: try: all = iteritems(self.statusDB) except KeyError: return - for uuid, status in all: - yield uuid, status + for uuid_id, status in all: + yield uuid_id, status def _filter_objs(self, gen, filters): filters = map(lambda pat: 'n(%s)' % pat, filters) @@ -157,12 +165,12 @@ def _filter_objs(self, gen, filters): def graphs(self, enabled=None, user=None, roles=None, created_before=None, created_after=None, filters=None): gen = self._graphs(user, None if user is None else roles) if created_before is not None: - gen = ((uuid, status) for uuid, status in gen if uuid_to_utc_ts(uuid) < created_before) + gen = ((uuid_id, status) for uuid_id, status in gen if uuid_to_utc_ts(uuid_id) < created_before) if created_after is not None: - gen = ((uuid, status) for uuid, status in gen if uuid_to_utc_ts(uuid) > created_after) + gen = ((uuid_id, status) for uuid_id, status in 
gen if uuid_to_utc_ts(uuid_id) > created_after) if enabled is not None: - gen = ((uuid, status) for uuid, status in gen if status['enabled'] is enabled) - gen = (self._status_enrich(status, uuid) for uuid, status in gen) + gen = ((uuid_id, status) for uuid_id, status in gen if status['enabled'] is enabled) + gen = (self._status_enrich(status, uuid_id) for uuid_id, status in gen) if filters: gen = self._filter_objs(gen, filters) return gen @@ -171,34 +179,34 @@ def graph(self, *args, **kwargs): kwargs['ctx'] = self return self._graph(*args, **kwargs) - def _status_enrich(self, status, uuid): - output = { 'graph': uuid, 'id': uuid } + def _status_enrich(self, status, uuid_id): + output = {'graph': uuid_id, 'id': uuid_id} try: - output['meta'] = self.metaDB[uuid] + output['meta'] = self.metaDB[uuid_id] except KeyError: output['meta'] = {} for field in ('size', 'nodes_count', 'edges_count'): output[field] = status[field] output['maxID'] = status['nextID'] - 1 - output['created'] = uuid_to_utc(uuid) + output['created'] = uuid_to_utc(uuid_id) return output - def status(self, uuid): + def status(self, uuid_id): try: - return self._status_enrich(self.statusDB[uuid], uuid) + return self._status_enrich(self.statusDB[uuid_id], uuid_id) except KeyError: pass - def sync(self, uuid, txn): - old_status, new_status = self._sync_status(uuid, txn) - self.status_index.update(uuid, old_status, new_status) - self.metaDB[uuid] = txn.as_dict() + def sync(self, uuid_id, txn): + old_status, new_status = self._sync_status(uuid_id, txn) + self.status_index.update(uuid_id, old_status, new_status) + self.metaDB[uuid_id] = txn.as_dict() @lazy def status_index(self): return StatusIndex(self) - def _sync_status(self, uuid, txn): + def _sync_status(self, uuid_id, txn): status = {'nextID': txn.nextID, 'size': txn.graph.size, 'nodes_count': txn.nodes_count(), @@ -222,25 +230,25 @@ def _sync_status(self, uuid, txn): user_roles = self.user_roles(txn, user) if user_roles: cache[user] = sorted(user_roles) - except: # fixme + except Exception: # fixme pass try: - old_status = self.statusDB[uuid] + old_status = self.statusDB[uuid_id] except KeyError: old_status = None - self.statusDB[uuid] = status + self.statusDB[uuid_id] = status return old_status, status - def remove(self, uuid): + def remove(self, uuid_id): try: - status = self.statusDB.pop(uuid) - self.status_index.update(uuid, status, None) + status = self.statusDB.pop(uuid_id) + self.status_index.update(uuid_id, status, None) except KeyError: pass try: - del self.metaDB[uuid] + del self.metaDB[uuid_id] except KeyError: pass @@ -312,14 +320,14 @@ def __init__(self, dir, graph_opts=None, create=True, rebuild=False, **kwargs): def _sync_uuids(self, uuids): with self.context(write=True) as ctx: - for uuid in uuids: + for uuid_id in uuids: try: -# with ctx.graph(uuid, readonly=True, create=False, hook=False) as g: - with ctx.graph(uuid, create=False, hook=False) as g: + # with ctx.graph(uuid, readonly=True, create=False, hook=False) as g: + with ctx.graph(uuid_id, create=False, hook=False) as g: with g.transaction(write=False) as txn: - ctx.sync(uuid, txn) + ctx.sync(uuid_id, txn) except IOError as e: - log.warning('error syncing graph %s: %s', uuid, str(e)) + log.warning('error syncing graph %s: %s', uuid_id, str(e)) self.db.sync(force=True) @@ -328,41 +336,41 @@ def _fs_dbs(self): for x in dirlist(self.dir): if len(x) != 39 or x[-3:] != '.db': continue - uuid = x[0:36] - if UUID.match(uuid): - yield uuid + uuid_id = x[0:36] + if UUID.match(uuid_id): + yield uuid_id - def 
sync(self, uuid, g): + def sync(self, uuid_id, g): with self.context(write=True) as ctx: with g.transaction(write=False) as txn: - ctx.sync(uuid, txn) + ctx.sync(uuid_id, txn) - def sync_qflush(self, uuid, g): + def sync_qflush(self, uuid_id, g): with self.context(write=True) as ctx: with g.transaction(write=False) as txn: try: - ctx.sync(uuid, txn) + ctx.sync(uuid_id, txn) finally: try: - if uuid in ctx.updatedDB_idx: + if uuid_id in ctx.updatedDB_idx: return except KeyError: pass - ctx.updatedDB.push(uuid) - ctx.updatedDB_idx[uuid] = time() + ctx.updatedDB.push(uuid_id) + ctx.updatedDB_idx[uuid_id] = time() - def remove(self, uuid): + def remove(self, uuid_id): with self.context(write=True) as ctx: - ctx.remove(uuid) + ctx.remove(uuid_id) - def drop(self, uuid): - path = self.graph_path(uuid) + def drop(self, uuid_id): + path = self.graph_path(uuid_id) for x in (path, '%s-lock' % path): try: os.unlink(x) - except: + except Exception: pass - self.remove(uuid) + self.remove(uuid_id) def __del__(self): self.close() @@ -373,18 +381,18 @@ def close(self): self.db = None # opens a graph - def graph(self, uuid=None, hook=True, ctx=None, user=None, roles=None, **kwargs): - if uuid is None and kwargs.get('create', False): - uuid = uuidgen() - for k,v in iteritems(self.graph_opts): + def graph(self, uuid_id=None, hook=True, ctx=None, user=None, roles=None, **kwargs): + if uuid_id is None and kwargs.get('create', False): + uuid_id = uuidgen() + for k, v in iteritems(self.graph_opts): if k not in kwargs: kwargs[k] = v if hook: - kwargs['hooks'] = CollectionHooks(uuid, self) + kwargs['hooks'] = CollectionHooks(uuid_id, self) try: - g = Graph(self.graph_path(uuid), **kwargs) - except: - self.remove(uuid) if ctx is None else ctx.remove(uuid) + g = Graph(self.graph_path(uuid_id), **kwargs) + except Exception: + self.remove(uuid_id) if ctx is None else ctx.remove(uuid_id) raise if user is not None: # new graph - do not check creds @@ -393,7 +401,7 @@ def graph(self, uuid=None, hook=True, ctx=None, user=None, roles=None, **kwargs) else: if not self.user_allowed(g, user, roles): g.close() - raise OSError(errno.EPERM, 'Permission denied', uuid) + raise OSError(errno.EPERM, 'Permission denied', uuid_id) return g def user_allowed(self, g, user, roles): @@ -414,19 +422,19 @@ def user_roles(self, txn, user): @lazy def _words(self): - return re.compile('\w+') + return re.compile('\w+') # noqa - def status(self, uuid): + def status(self, uuid_id): with self.context(write=False) as ctx: - return ctx.status(uuid) + return ctx.status(uuid_id) def graphs(self, enabled=None): with self.context(write=False) as ctx: for x in ctx.graphs(enabled=enabled): yield x - def graph_path(self, uuid): - return "%s%s%s.db" % (self.dir, os.path.sep, uuid) + def graph_path(self, uuid_id): + return "%s%s%s.db" % (self.dir, os.path.sep, uuid_id) def context(self, write=True): return Context(self, write=write) @@ -442,7 +450,7 @@ def daemon(self, poll=250, maxopen=1000): try: os.fstat(fd) pad += 1 - except: + except Exception: pass # check limits @@ -483,15 +491,15 @@ def daemon(self, poll=250, maxopen=1000): log.debug("syncing") with self.context(write=True) as ctx: uuids = ctx.updatedDB.pop(n=maxopen) - for uuid in uuids: - age = ctx.updatedDB_idx.pop(uuid) + for uuid_id in uuids: + age = ctx.updatedDB_idx.pop(uuid_id) try: - fd = os.open(self.graph_path(uuid), os.O_RDONLY) + fd = os.open(self.graph_path(uuid_id), os.O_RDONLY) todo.append(fd) except OSError as e: # may have been legitimately deleted already if e.errno != errno.ENOENT: 
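Since every uuid parameter on Collection above was renamed to uuid_id, a hedged sketch of the graph() opener's call pattern may help; the directory path, user, and node fields below are invented for illustration and are not taken from this diff.

    # sketch: opening a graph via Collection; path, user, and node values are assumptions
    from LemonGraph.collection import Collection
    coll = Collection('/tmp/lg-data')
    with coll.graph(create=True, user='alice') as g:  # uuid_id is generated when create=True
        with g.transaction(write=True) as txn:
            txn.node(type='host', value='example.org')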
- log.warning('error syncing graph %s: %s', uuid, str(e)) + log.warning('error syncing graph %s: %s', uuid_id, str(e)) count += len(uuids) backlog = len(ctx.updatedDB) for fd in todo: diff --git a/LemonGraph/dbtool.py b/LemonGraph/dbtool.py index 8d3c37a..81d8b08 100755 --- a/LemonGraph/dbtool.py +++ b/LemonGraph/dbtool.py @@ -1,13 +1,15 @@ from __future__ import print_function -from . import Graph, Serializer, QuerySyntaxError import os import re import readline -from six import iteritems import sys import time +from six import iteritems + +from . import Graph, QuerySyntaxError, Serializer + try: raw_input # Python 2 except NameError: @@ -28,19 +30,20 @@ To query, enter one or more query patterns, joined by semi-colons. To force streaming or ad-hoc mode, the line may be preceded by: - start,stop: + start,stop: or just: - start: + start: Examples: - 400,500:n() - n(type="foo")->n() - 1:e(type="bar") + 400,500:n() + n(type="foo")->n() + 1:e(type="bar") ''' RANGE1 = re.compile(r'^(\d+)(?:\s*,\s*(\d+))?$') RANGE2 = re.compile(r'^(\d+)(?:\s*,\s*(\d+))?\s*:\s*') + def parse_range(m, default=None): a = int(m.group(1)) try: @@ -49,6 +52,7 @@ def parse_range(m, default=None): b = 0 return a, b + def do_query(txn, query, start=0, stop=0, interactive=False): m = RANGE2.match(query) if m: @@ -65,7 +69,7 @@ def do_query(txn, query, start=0, stop=0, interactive=False): mode = 'dump' total = None elif 'g' == query: - print(dict( (k, v) for k,v in iteritems(txn) )) + print(dict((k, v) for k, v in iteritems(txn))) mode = 'graph properties' total = None elif len(queries) > 1: @@ -83,7 +87,7 @@ def do_query(txn, query, start=0, stop=0, interactive=False): else: raise tstop = time.time() - return total, tstop-tstart, mode + return total, tstop - tstart, mode def main(g): @@ -97,15 +101,15 @@ def main(g): if os.isatty(sys.stdin.fileno()): while True: if prompt is None: - prompt = '%s> ' % (repr((start, stop) if stop else (start,lastID)) if start else lastID) + prompt = '%s> ' % (repr((start, stop) if stop else (start, lastID)) if start else lastID) try: line = raw_input(prompt) while True: hlen = readline.get_current_history_length() - if hlen < 2 or readline.get_history_item(hlen-1) != readline.get_history_item(hlen-2): + if hlen < 2 or readline.get_history_item(hlen - 1) != readline.get_history_item(hlen - 2): break - readline.remove_history_item(hlen-1) + readline.remove_history_item(hlen - 1) line = line.strip() except KeyboardInterrupt: continue @@ -121,7 +125,7 @@ def main(g): prompt = None line = None - elif line in ('help','?'): + elif line in ('help', '?'): print(HELP) line = None @@ -158,6 +162,7 @@ def main(g): with g.transaction(write=False) as txn: do_query(txn, line) + if '__main__' == __name__: if len(sys.argv) != 2: print("Database file path required!") diff --git a/LemonGraph/dirlist.py b/LemonGraph/dirlist.py index 516273c..5e1c93c 100644 --- a/LemonGraph/dirlist.py +++ b/LemonGraph/dirlist.py @@ -1,5 +1,6 @@ from . import ffi, lib, wire + def dirlist(path): try: dirp = lib.opendir(wire.encode(path)) diff --git a/LemonGraph/fifo.py b/LemonGraph/fifo.py index 02aae14..4df4138 100644 --- a/LemonGraph/fifo.py +++ b/LemonGraph/fifo.py @@ -1,16 +1,17 @@ -from . import lib, ffi, lazy -from .serializer import Serializer - from collections import deque +from . 
import ffi, lazy, lib +from .serializer import Serializer + UNSPECIFIED = object() + class Fifo(object): def __init__(self, txn, domain, map_values=False, serialize_domain=Serializer(), serialize_value=Serializer()): self.txn = txn self._txn = txn._txn self.domain = domain - self.serialize_value = serialize_value + self.serialize_value = serialize_value enc = serialize_domain.encode(domain) flags = lib.LG_KV_MAP_DATA if map_values else 0 self._dlen = ffi.new('size_t *') @@ -96,7 +97,7 @@ def __len__(self): return 0 last = self.serialize_key.decode(ffi.buffer(last, self._dlen[0])[:]) for idx, _ in KVIterator(self): - return 1+last-idx + return 1 + last - idx raise Exception def __del__(self): @@ -104,10 +105,11 @@ def __del__(self): lib.kv_deref(self._kv) self._kv = None + class KVIterator(object): def __init__(self, kv): - self.serialize_key = kv.serialize_key - self.serialize_value = kv.serialize_value + self.serialize_key = kv.serialize_key + self.serialize_value = kv.serialize_value self._key = ffi.new('void **') self._data = ffi.new('void **') self._klen = ffi.new('size_t *') diff --git a/LemonGraph/httpd.py b/LemonGraph/httpd.py index c0f68a7..30a49d5 100644 --- a/LemonGraph/httpd.py +++ b/LemonGraph/httpd.py @@ -1,34 +1,38 @@ # stock -from collections import deque + import logging import multiprocessing import os import re import signal import socket -from six import itervalues -from six.moves.urllib_parse import urlsplit import sys import time import traceback import zlib +from collections import deque + +from lazy import lazy + +from pysigset import suspended_signals + +from six import itervalues +from six.moves.urllib_parse import urlsplit from . import lib, wire try: import ujson + def json_encode(x): return ujson.dumps(x, escape_forward_slashes=False, ensure_ascii=False) json_decode = ujson.loads except ImportError: import json + def json_encode(x): return json.dumps(x, separators=(',', ':'), ensure_ascii=False) - json_decode = lambda x: json.loads(wire.decode(x)) - -# pypi -from lazy import lazy -from pysigset import suspended_signals + json_decode = lambda x: json.loads(wire.decode(x)) # noqa log = logging.getLogger(__name__) log.addHandler(logging.NullHandler()) @@ -39,14 +43,17 @@ def json_encode(x): 'debug': log.debug, } + def _generator(): yield + generator = type(_generator()) iterator = type(iter('')) string_bin = type(b'') string_uni = type(u'') + class Disconnected(Exception): def __init__(self, why, level='warn'): self.why = why @@ -59,6 +66,7 @@ def __repr__(self): def __str__(self): return 'disconnected: %s' % self.why + class ErrorCompleted(Exception): pass @@ -115,13 +123,13 @@ def method(self, m): class Chunk(object): def __init__(self, bs): self.bs = bs = int(bs) - header ='%x\r\n' % bs + header = '%x\r\n' % bs self.hlen = hlen = len(header) self.ba = bytearray(hlen + bs + 2) self.mem = memoryview(self.ba) self.header = self.mem[0:hlen] - self.payload = self.mem[hlen:hlen+bs] - self.payload_plus = self.mem[hlen:hlen+bs+2] + self.payload = self.mem[hlen:hlen + bs] + self.payload_plus = self.mem[hlen:hlen + bs + 2] self.hoffset = None self.size = None @@ -142,20 +150,20 @@ def chunk(self): @property def body(self): - return self.mem[self.hlen : self.hlen + self.size] + return self.mem[self.hlen: self.hlen + self.size] def _wrap(self, size): # update header header = wire.encode('%x\r\n' % size) hlen = len(header) self.hoffset = self.hlen - hlen - self.header[self.hoffset : self.hoffset + hlen] = header + self.header[self.hoffset: self.hoffset + hlen] = header # add 
footer - self.payload_plus[size:size+2] = b'\r\n' + self.payload_plus[size:size + 2] = b'\r\n' if size == self.bs: return self.mem - return self.mem[self.hoffset : self.hoffset + self.size + hlen + 2] + return self.mem[self.hoffset: self.hoffset + self.size + hlen + 2] class Chunks(object): @@ -197,18 +205,19 @@ def chunkify(self, gen): # then carve off full blocks directly from src while soff + bs <= slen: - chunk.payload[0:bs] = src[soff:soff+bs] + chunk.payload[0:bs] = src[soff:soff + bs] yield chunk chunk = next(chunks) soff += bs # and stash the remainder pos = slen - soff - chunk.payload[0:pos] = src[soff:soff+pos] + chunk.payload[0:pos] = src[soff:soff + pos] if pos: yield chunk(pos) + # because every multiprocessing.Process().start() very helpfully # does a waitpid(WNOHANG) across all known children, and I want # to use os.wait() to catch exiting children @@ -289,7 +298,7 @@ def _halt(sig, frame): # raise Graceful(True) signal.signal(signal.SIGTERM, _halt) - signal.signal(signal.SIGINT, _halt) + signal.signal(signal.SIGINT, _halt) # signal.signal(signal.SIGHUP, _reload) procs = {} @@ -313,7 +322,7 @@ def spawn(label, target): pid, status = os.wait() if pid > 0: proc, label, target = procs.pop(pid) - if status is 0: + if status == 0: log.info("-%s(%d): exit: %d", label, pid, status) else: log.warning("-%s(%d): exit: %d", label, pid, status) @@ -333,7 +342,7 @@ def spawn(label, target): except OSError: break proc, label, target = procs.pop(pid) - if status is 0: + if status == 0: log.info("-%s(%d): exit: %d", label, pid, status) else: log.warning("-%s(%d): exit: %d", label, pid, status) @@ -363,7 +372,7 @@ def worker(self): try: req = Request(conn, timeout=self.timeout) # hmm - it may not make sense to allow pipelining by default - look for magic header - if 'HTTP/1.1' == req.version and ('x-please-pipeline' not in req.headers or self.maxreqs is 0): + if 'HTTP/1.1' == req.version and ('x-please-pipeline' not in req.headers or self.maxreqs == 0): res.headers.set('Connection', 'close') self.process(req, res) except HTTPError as e: @@ -377,11 +386,11 @@ def worker(self): raise Disconnected('not HTTP/1.1', level='debug') elif res.headers.contains('Connection', 'close'): raise Disconnected('handler closed', level='debug') - except Disconnected as e: + except Disconnected: pass except socket.timeout: log.warning('client %s:%d: timed out', *addr) - except Exception as e: + except Exception: info = sys.exc_info() log.error('Unhandled exception: %s', traceback.print_exception(*info)) sys.exit(1) @@ -393,12 +402,12 @@ def worker(self): conn.close() log.debug('client %s:%d: finished', *addr) except Graceful: -# if e.reload: -# log.info("*master(%d): re-exec!", os.getpid()) -# log.warn(repr(cmd)) - #if __package__ is not None: - #sys.argv[0] = '-m%s' % __loader__.name - #os.execl(sys.executable, sys.executable, *sys.argv) + # if e.reload: + # log.info("*master(%d): re-exec!", os.getpid()) + # log.warn(repr(cmd)) + # if __package__ is not None: + # sys.argv[0] = '-m%s' % __loader__.name + # os.execl(sys.executable, sys.executable, *sys.argv) pass def process(self, req, res): @@ -455,7 +464,7 @@ def _chunked(self, body): chunks = self.chunks.chunkify(body) for first in chunks: for chunk in chunks: - self.res.headers.set('Transfer-Encoding','chunked') + self.res.headers.set('Transfer-Encoding', 'chunked') self.res.begin(default=200) self.res.send(first.chunk) self.res.send(chunk.chunk) @@ -485,6 +494,7 @@ class Response(object): 503: 'Service Unavailable', 507: 'Insufficient Storage', } + def 
__init__(self, sock): self._code = None self.sock = sock @@ -551,9 +561,10 @@ def code(self, newcode): def json(self, doc): return json_encode(doc) + class Request(object): req = re.compile('^(' + '|'.join(HTTPMethods.all_methods) + ') (.+?)(?: (HTTP/[0-9.]+))?(\r?\n)$') - hsplit = re.compile(':\s*') + hsplit = re.compile(':\s*') # noqa def __init__(self, sock, timeout=10): self.sock = sock @@ -570,7 +581,7 @@ def __init__(self, sock, timeout=10): # returns generator for posted content @lazy def body(self): - if self.method not in ('POST','PUT'): + if self.method not in ('POST', 'PUT'): return tuple() # RFC 2616 says that if Transfer-Encoding is set to anything other than 'identity', than @@ -589,14 +600,14 @@ def body(self): except KeyError: content_length = 0 - if self.headers.contains('Expect','100-continue'): + if self.headers.contains('Expect', '100-continue'): self.sock.send('HTTP/1.1 100 Continue\r\n\r\n') log.debug('continued!') # fixme - I'm not super sure how Transfer-Encoding and Content-Encoding get used # in the wild - can you specify both chunked and gzip (or other) in TE? Does order matter? body = self._body_chunked() if chunked else self._body_raw(content_length) - if self.headers.contains('Content-Encoding','gzip'): + if self.headers.contains('Content-Encoding', 'gzip'): body = zcat(body) return body @@ -810,5 +821,6 @@ def httpd(**kwargs): service = Service(**kwargs) service.run() + if '__main__' == __name__: httpd() diff --git a/LemonGraph/indexer.py b/LemonGraph/indexer.py index af57951..744572e 100644 --- a/LemonGraph/indexer.py +++ b/LemonGraph/indexer.py @@ -1,8 +1,11 @@ -import msgpack -from zlib import crc32 from struct import pack +from zlib import crc32 + +import msgpack + from six import iteritems + class Indexer(object): def __init__(self): idx = {} @@ -32,11 +35,12 @@ def index(self, obj): def key(self, name, value, smash=True): hash(value) - return str(name), pack('=i',crc32(msgpack.packb(value))) + return str(name), pack('=i', crc32(msgpack.packb(value))) def prequery(self, index, value): key = self.key(index, value) method = self._idx[key[0]] + def check(obj): return value in method(obj) return tuple(key) + (check,) diff --git a/LemonGraph/kv.py b/LemonGraph/kv.py index 8bb5d79..09c7fd9 100644 --- a/LemonGraph/kv.py +++ b/LemonGraph/kv.py @@ -1,14 +1,15 @@ -from . import lib, ffi, wire, listify_py2 +from . 
import ffi, lib, listify_py2, wire from .serializer import Serializer + class KV(object): def __init__(self, txn, domain, map_data=False, map_keys=False, serialize_domain=Serializer(), serialize_key=Serializer(), serialize_value=Serializer()): self._kv = None self.txn = txn self._txn = txn._txn self.domain = domain - self.serialize_key = serialize_key - self.serialize_value = serialize_value + self.serialize_key = serialize_key + self.serialize_value = serialize_value enc = serialize_domain.encode(domain) flags = 0 if map_keys: @@ -88,10 +89,11 @@ def empty(self): return False return True + class KVIterator(object): def __init__(self, kv, handler, pfx=None): - self.serialize_key = kv.serialize_key - self.serialize_value = kv.serialize_value + self.serialize_key = kv.serialize_key + self.serialize_value = kv.serialize_value self.handler = handler self._key = ffi.new('void **') self._data = ffi.new('void **') diff --git a/LemonGraph/lock.py b/LemonGraph/lock.py index dc41327..1623791 100644 --- a/LemonGraph/lock.py +++ b/LemonGraph/lock.py @@ -1,5 +1,6 @@ -import os import fcntl +import os + class Lock(object): # byte-range file locking is tracked by process for an inode. @@ -33,7 +34,6 @@ def exclusive(self, *keys, **kwargs): kwargs['excl'] = True return self.lock(*keys, **kwargs) - def _ctx_cleanup(self): self.ctx = None diff --git a/LemonGraph/query.py b/LemonGraph/query.py index c0d8383..a5913c0 100644 --- a/LemonGraph/query.py +++ b/LemonGraph/query.py @@ -1,9 +1,12 @@ -from collections import deque, defaultdict + import itertools +from collections import defaultdict, deque + from six import iteritems, iterkeys -from . import Node, Edge -from .MatchLGQL import MatchLGQL, MatchCTX, QueryCannotMatch, eval_test +from . import Edge, Node +from .MatchLGQL import MatchCTX, MatchLGQL, QueryCannotMatch, eval_test + class Query(object): magic = { @@ -32,16 +35,15 @@ def __init__(self, patterns, cache=None): compiled = deque() for p_idx, p in enumerate(patterns): try: - c = self.cache[('p',p)] + c = self.cache[('p', p)] except KeyError: try: - c = self.cache[('p',p)] = MatchLGQL(p, cache=self.cache) + c = self.cache[('p', p)] = MatchLGQL(p, cache=self.cache) except QueryCannotMatch: - c = self.cache[('p',p)] = None + c = self.cache[('p', p)] = None compiled.append(c) self.compiled = self.cache[('c',) + patterns] = tuple(compiled) - def _gen_handlers(self): if self.handlers is not None: return @@ -52,7 +54,7 @@ def _gen_handlers(self): except KeyError: pass - setdict = lambda: defaultdict(set) + setdict = lambda: defaultdict(set) # noqa triggers = defaultdict(setdict) triggers['n'] = defaultdict(setdict) triggers['e'] = defaultdict(setdict) @@ -61,7 +63,7 @@ def _gen_handlers(self): if c is None: continue for idx, match in enumerate(c.matches): - target_type = match['type'] # N or E + target_type = match['type'] # N or E jump = self.magic[target_type] toc = (p_idx, idx) for test in match['tests']: @@ -78,10 +80,10 @@ def _gen_handlers(self): # squish down to regular dicts and frozensets for code in iterkeys(triggers): if code in 'ne': - triggers[code] = dict( (k0, dict( (test, frozenset(tocs)) for test, tocs in iteritems(d) )) for k0, d in iteritems(triggers[code]) ) + triggers[code] = dict((k0, dict((test, frozenset(tocs)) for test, tocs in iteritems(d))) for k0, d in iteritems(triggers[code])) else: - triggers[code] = dict( (test, frozenset(tocs)) for test, tocs in iteritems(triggers[code]) ) - triggers = dict( (k, v) for k, v in iteritems(triggers) if v ) + triggers[code] = dict((test, 
frozenset(tocs)) for test, tocs in iteritems(triggers[code])) + triggers = dict((k, v) for k, v in iteritems(triggers) if v) edge_funcs = deque() if 'E' in triggers: @@ -102,11 +104,13 @@ def _gen_handlers(self): if edge_funcs: edge_funcs = tuple(edge_funcs) + def _edge_handler(target): seen = set() for func in edge_funcs: for ret in func(target, seen): yield ret + def _edge_handler2(target): seen = set() h = deque() @@ -162,7 +166,7 @@ def _adhoc(self, txn, stop, limit): continue ctx = MatchCTX(c) p = self.patterns[p_idx] - for seed in c.seeds(txn, beforeID=(stop+1) if stop else None): + for seed in c.seeds(txn, beforeID=(stop + 1) if stop else None): yield (p, ctx.matches(seed, idx=c.best)) def validate(self, *chains): @@ -174,7 +178,7 @@ def validate(self, *chains): try: ctxs_by_len[len(c.keep)][p] = MatchCTX(c) except KeyError: - ctxs_by_len[len(c.keep)] = { p: MatchCTX(c) } + ctxs_by_len[len(c.keep)] = {p: MatchCTX(c)} for chain in chains: if not isinstance(chain, tuple): diff --git a/LemonGraph/serializer.py b/LemonGraph/serializer.py index b3c5327..649b194 100644 --- a/LemonGraph/serializer.py +++ b/LemonGraph/serializer.py @@ -1,7 +1,9 @@ -from . import ffi, lib, wire -import msgpack as messagepack + import collections -import sys + +import msgpack as messagepack + +from . import ffi, lib, wire try: xrange # Python 2 @@ -27,6 +29,7 @@ class memoryview_ish(str): # monkey patch that too def wrap_unpack_from(): func = struct.unpack_from + def unpack_from_wrapper(*args, **kwargs): if isinstance(args[1], bytearray): args = list(args) @@ -47,6 +50,7 @@ def unpack_from_wrapper(*args, **kwargs): def identity(x): return x + class Serializer(object): @staticmethod def str_encode(x): @@ -113,9 +117,10 @@ def uints(cls, count, decode_type=tuple, string=False): count = int(count) if count < 1: raise ValueError(count) - buffer = ffi.new('char[]', count*9) + buffer = ffi.new('char[]', count * 9) buffers = {} decoded = ffi.new('uint64_t[]', count) + def encode(n): if len(n) != count: raise ValueError(n) @@ -139,6 +144,7 @@ def _uints_string(count, decode_type=tuple): raise ValueError(count) buffer = ffi.new('char[511]') decoded = ffi.new('uint64_t[]', count) + def encode(n): if len(n) != count: raise ValueError(n) @@ -151,14 +157,14 @@ def encode(n): if size + strlen > 511: raise ValueError() - buffer[size:size+strlen] = string + buffer[size:size + strlen] = string size += strlen return ffi.buffer(buffer, size)[:] def decode(b): size = lib.unpack_uints(count, decoded, b[:]) - buf = ffi.buffer(buffer, size + decoded[count-1]) - ret = list(int(decoded[i]) for i in xrange(0, count-1)) + buf = ffi.buffer(buffer, size + decoded[count - 1]) + ret = list(int(decoded[i]) for i in xrange(0, count - 1)) ret.append(wire.decode(buf[size:])) return decode_type(ret) diff --git a/LemonGraph/server/__init__.py b/LemonGraph/server/__init__.py index afc9c3e..1a97cb5 100644 --- a/LemonGraph/server/__init__.py +++ b/LemonGraph/server/__init__.py @@ -1,51 +1,61 @@ from __future__ import print_function import atexit -from collections import deque, namedtuple -import dateutil.parser import errno import itertools -from lazy import lazy import logging -import msgpack import os -import pkg_resources import re -from six import iteritems, itervalues -from six.moves.urllib_parse import parse_qs import sys import tempfile import time import traceback +from collections import deque, namedtuple from uuid import uuid1 as uuidgen -from .. 
import Serializer, Node, Edge, QuerySyntaxError, merge_values +import dateutil.parser + +from lazy import lazy + +import msgpack + +import pkg_resources + + +from six import iteritems, itervalues +from six.moves.urllib_parse import parse_qs + +from .. import Edge, Node, QuerySyntaxError, Serializer, merge_values from ..collection import Collection, uuid_to_utc +from ..httpd import HTTPError, HTTPMethods, httpd, json_decode, json_encode from ..lock import Lock -from ..httpd import HTTPMethods, HTTPError, httpd, json_encode, json_decode log = logging.getLogger(__name__) log.addHandler(logging.NullHandler()) CACHE = {} -BLOCKSIZE=1048576 +BLOCKSIZE = 1048576 UUID = re.compile(r'^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$') INT = re.compile(r'^[1-9][0-9]*$') STRING = re.compile(r'^.+$') + def date_to_timestamp(s): dt = dateutil.parser.parse(s) return time.mktime(dt.utctimetuple()) + dt.microsecond // 1e6 + def js_dumps(obj, pretty=False): txt = json_encode(obj) if pretty: txt += '\n' return txt + def mp_dumps(obj, pretty=False): return msgpack.packb(obj, raw=True) + Streamer = namedtuple('Streamer', 'mime encode') streamJS = Streamer('application/json', js_dumps) streamMP = Streamer('application/x-msgpack', mp_dumps) @@ -55,6 +65,7 @@ def mp_dumps(obj, pretty=False): Edge_Reserved = Edge.reserved Node_Reserved = Node.reserved | frozenset(['depth']) + class SeedTracker(object): msgpack = Serializer.msgpack() uint = Serializer.uint() @@ -80,6 +91,7 @@ def seeds(self): return () return gen + class Handler(HTTPMethods): content_types = { 'application/json': json_decode, @@ -122,8 +134,8 @@ def content_type(self): def content_length(self): try: return int(str(self.req.headers['Content-Length'])) - except: - #if self.req.headers.contains('Transfer-Encoding', 'chunked'): + except Exception: + # if self.req.headers.contains('Transfer-Encoding', 'chunked'): pass @property @@ -136,7 +148,7 @@ def params(self): # grab latest param only def param(self, field, default=None): - return self.params.get(field,[default])[-1] + return self.params.get(field, [default])[-1] def input(self): try: @@ -170,8 +182,8 @@ def init(self, req, res): def format_edge(self, e): d = e.as_dict() - d['src'] = { "ID": e.src.ID, "type": e.src.type, "value": e.src.value } - d['tgt'] = { "ID": e.tgt.ID, "type": e.tgt.type, "value": e.tgt.value } + d['src'] = {"ID": e.src.ID, "type": e.src.type, "value": e.src.value} + d['tgt'] = {"ID": e.tgt.ID, "type": e.tgt.type, "value": e.tgt.value} del d['srcID'] del d['tgtID'] return d @@ -179,7 +191,7 @@ def format_edge(self, e): @property def creds(self): return { - 'user': self.param('user', None), + 'user': self.param('user', None), 'roles': self.params.get('role', None), } @@ -220,9 +232,11 @@ def tmp_graph(self, uuid): return fd, name, dbname, path msgpack = Serializer.msgpack() + def kv(self, txn): return txn.kv('lg.restobjs', serialize_value=self.msgpack) + def graphtxn(write=False, create=False, excl=False, on_success=None, on_failure=None): def decorator(func): def wrapper(self, _, uuid, *args, **kwargs): @@ -314,10 +328,10 @@ def do_chains(self, txn, chains, seed, create=False): raise HTTPError(409, 'Bad data - chain length must be odd') nodes = self.map_nodes(txn, chain[0::2], seed=seed, create=create) for i, n in enumerate(nodes): - chain[i*2] = n + chain[i * 2] = n for i, x in enumerate(chain): if i % 2: - chain[i] = self.do_edge(txn, x, src=chain[i-1], tgt=chain[i+1], seed=seed, create=create) + chain[i] = self.do_edge(txn, x, src=chain[i - 1], tgt=chain[i + 
1], seed=seed, create=create) def do_nodes(self, txn, nodes, seed=False, create=False): for n in nodes: @@ -334,11 +348,11 @@ def do_node(self, txn, node, seed=False, create=False): try: if create: del node['ID'] - param = { 'ID': node['ID'] } + param = {'ID': node['ID']} except KeyError: - param = { 'type': node['type'], 'value': node['value'] } + param = {'type': node['type'], 'value': node['value']} - props = dict( (k, v) for k, v in iteritems(node) if k not in Node_Reserved) + props = dict((k, v) for k, v in iteritems(node) if k not in Node_Reserved) if seed: props['seed'] = True elif not create: @@ -361,12 +375,12 @@ def do_edge(self, txn, edge, src=None, tgt=None, seed=False, create=False): try: if create: del edge['ID'] - param = { 'ID': edge['ID'] } + param = {'ID': edge['ID']} except KeyError: src = self._edge_node(txn, edge, src, seed=seed, create=create, is_src=True) tgt = self._edge_node(txn, edge, tgt, seed=seed, create=create, is_src=False) param = dict(type=edge['type'], value=edge.get('value', ''), src=src, tgt=tgt) - props = dict( (k, v) for k, v in iteritems(edge) if k not in Edge_Reserved) + props = dict((k, v) for k, v in iteritems(edge) if k not in Edge_Reserved) if seed: props['seed'] = seed elif not create: @@ -374,14 +388,14 @@ def do_edge(self, txn, edge, src=None, tgt=None, seed=False, create=False): try: e = txn.edge(**param) - except: + except Exception: raise HTTPError(409, "Bad edge: %s" % edge) # handle edge costs, allowed range: [0.0, 1.0] # silently drop bad values: # cost defaults to 1 when edge is created # once cost is assigned, value may not be increased, as - # depth recalculation could (would?) cover the entire graph + # depth recalculation could (would?) cover the entire graph try: cost = props['cost'] if cost < 0 or cost > 1 or cost >= e['cost']: @@ -397,7 +411,8 @@ def _create(self, g, txn, _, uuid): self.do_input(txn, uuid, create=True) self.res.code = 201 self.res.headers.set('Location', '/graph/' + uuid) - return self.dumps({ 'uuid': uuid, 'id': uuid }, pretty=True) + return self.dumps({'uuid': uuid, 'id': uuid}, pretty=True) + class _Streamy(object): def _stream_js(self, gen): @@ -441,6 +456,7 @@ def _dump_json(self, txn, uuid, nodes, edges): pass yield ']}\n' + class Graph_Root(_Input, _Streamy): path = ('graph',) @@ -461,7 +477,7 @@ def _query_graphs(self, graphs, queries): def __query_graphs(self, uuids, queries, qtoc): for uuid in uuids: try: -# with self.graph(uuid, readonly=True, create=False, hook=False) as g: + # with self.graph(uuid, readonly=True, create=False, hook=False) as g: with self.graph(uuid, create=False, hook=False) as g: with g.transaction(write=False) as txn: try: @@ -479,6 +495,7 @@ def post(self, _): uuid = str(uuidgen()) return self._create(None, uuid) + class Graph_UUID(_Input, _Streamy): path = ('graph', UUID,) inf = float('Inf') @@ -486,7 +503,7 @@ class Graph_UUID(_Input, _Streamy): def delete(self, _, uuid): with self.lock.exclusive(uuid) as locked: # opening the graph checks user/role perms -# with self.graph(uuid, readonly=True, create=False, locked=locked) as g: + # with self.graph(uuid, readonly=True, create=False, locked=locked) as g: with self.graph(uuid, create=False, locked=locked): self.collection.drop(uuid) @@ -512,12 +529,12 @@ def put(self, _, uuid): if len(data) == 0: break fh.write(data) - cleanup.popleft()() # fh.close() -# with self.collection.graph(dbname, readonly=True, hook=False, create=False) as g: + cleanup.popleft()() # fh.close() + # with self.collection.graph(dbname, readonly=True, 
@@ -397,7 +411,8 @@ def _create(self, g, txn, _, uuid):
         self.do_input(txn, uuid, create=True)
         self.res.code = 201
         self.res.headers.set('Location', '/graph/' + uuid)
-        return self.dumps({ 'uuid': uuid, 'id': uuid }, pretty=True)
+        return self.dumps({'uuid': uuid, 'id': uuid}, pretty=True)
+

 class _Streamy(object):
     def _stream_js(self, gen):
@@ -441,6 +456,7 @@ def _dump_json(self, txn, uuid, nodes, edges):
             pass
         yield ']}\n'

+
 class Graph_Root(_Input, _Streamy):
     path = ('graph',)

@@ -461,7 +477,7 @@ def _query_graphs(self, graphs, queries):
     def __query_graphs(self, uuids, queries, qtoc):
         for uuid in uuids:
             try:
-#                with self.graph(uuid, readonly=True, create=False, hook=False) as g:
+                # with self.graph(uuid, readonly=True, create=False, hook=False) as g:
                 with self.graph(uuid, create=False, hook=False) as g:
                     with g.transaction(write=False) as txn:
                         try:
@@ -479,6 +495,7 @@ def post(self, _):
         uuid = str(uuidgen())
         return self._create(None, uuid)

+
 class Graph_UUID(_Input, _Streamy):
     path = ('graph', UUID,)
     inf = float('Inf')
@@ -486,7 +503,7 @@ class Graph_UUID(_Input, _Streamy):
     def delete(self, _, uuid):
         with self.lock.exclusive(uuid) as locked:
             # opening the graph checks user/role perms
-#            with self.graph(uuid, readonly=True, create=False, locked=locked) as g:
+            # with self.graph(uuid, readonly=True, create=False, locked=locked) as g:
             with self.graph(uuid, create=False, locked=locked):
                 self.collection.drop(uuid)
@@ -512,12 +529,12 @@ def put(self, _, uuid):
                 if len(data) == 0:
                     break
                 fh.write(data)
-            cleanup.popleft()() # fh.close()
-#            with self.collection.graph(dbname, readonly=True, hook=False, create=False) as g:
+            cleanup.popleft()()  # fh.close()
+            # with self.collection.graph(dbname, readonly=True, hook=False, create=False) as g:
             with self.collection.graph(dbname, hook=False, create=False):
                 pass
             os.rename(path, target)
-            cleanup.pop() # remove os.unlink(path)
+            cleanup.pop()  # remove os.unlink(path)
             cleanup.append(lambda: os.unlink('%s-lock' % target))
         except Exception as e:
             os.unlink(name)
@@ -526,12 +543,12 @@ def put(self, _, uuid):
             for x in cleanup:
                 try:
                     x()
-                except:
+                except Exception:
                     pass

     def _snapshot(self, g, uuid):
         self.res.headers.set('Content-Type', 'application/octet-stream')
-        self.res.headers.set('Content-Disposition','attachment; filename="%s.db"' % uuid)
+        self.res.headers.set('Content-Disposition', 'attachment; filename="%s.db"' % uuid)
         for block in g.snapshot(bs=BLOCKSIZE):
             yield block
@@ -653,6 +670,7 @@ def post(self, _, uuid):
         for x in self._update(None, uuid):
             yield x

+
 class Graph_UUID_Status(Handler):
     path = ('graph', UUID, 'status')
@@ -669,6 +687,7 @@ def status(self, uuid):
             raise HTTPError(404, '%s status is not cached' % uuid)
         return status

+
 class Reset_UUID(_Input, Handler):
     path = ('reset', UUID,)
@@ -683,7 +702,7 @@ def put(self, _, uuid):
         os.close(fd)
         cleanup = [lambda: os.unlink(path)]
         try:
-#            with self.graph(uuid, readonly=True, locked=locked) as g1, self.collection.graph(dbname, create=True, hook=False) as g2:
+            # with self.graph(uuid, readonly=True, locked=locked) as g1, self.collection.graph(dbname, create=True, hook=False) as g2:
             with self.graph(uuid, locked=locked) as g1, self.collection.graph(dbname, create=True, hook=False) as g2:
                 with g1.transaction(write=False) as t1, g2.transaction(write=True) as t2:
                     # fixme
@@ -692,25 +711,25 @@ def put(self, _, uuid):
                     if keep is None:
                         keep = self.default_keep

-                    if keep.get('kv',False):
+                    if keep.get('kv', False):
                         self.clone_kv(t1, t2)
                     seeds = keep.get('seeds', None)
                     if seeds:
                         self.clone_seeds(uuid, t1, t2, seeds)
             target = g1.path
-            cleanup.pop()() # unlink(path-lock)
+            cleanup.pop()()  # unlink(path-lock)
             try:
                 # fixme
                 os.unlink('%s-lock' % target)
             except OSError:
                 pass
             os.rename(path, target)
-            cleanup.pop() # unlink(path)
+            cleanup.pop()  # unlink(path)

             # remove from index
             self.collection.remove(uuid)

             # open to re-index, bypass creds check, allow hooks to run
-#            with self.collection.graph(uuid, readonly=True):
+            # with self.collection.graph(uuid, readonly=True):
             with self.collection.graph(uuid):
                 pass
         except (IOError, OSError) as e:
@@ -723,7 +742,7 @@ def put(self, _, uuid):
             for x in cleanup:
                 try:
                     x()
-                except:
+                except Exception:
                     pass

     def clone_kv(self, src, dst):
@@ -743,6 +762,7 @@ def clone_seeds(self, uuid, src, dst, limit):
             if i is limit:
                 break

+
 class D3_UUID(_Streamy, Handler):
     path = ('d3', UUID)
@@ -763,12 +783,12 @@ def _dump_json(self, txn):
             n = next(nodes)
             nmap[n.ID] = nidx
             nidx += 1
-            yield self.dumps({ 'data': n.as_dict() })
+            yield self.dumps({'data': n.as_dict()})
             for n in nodes:
                 yield ','
                 nmap[n.ID] = nidx
                 nidx += 1
-                yield self.dumps({ 'data': n.as_dict() })
+                yield self.dumps({'data': n.as_dict()})
         except StopIteration:
             pass
         yield '],"edges":['
@@ -778,22 +798,26 @@ def _dump_json(self, txn):
             yield self.dumps({
                 'data': e.as_dict(),
                 'source': nmap[e.srcID],
-                'target': nmap[e.tgtID] })
+                'target': nmap[e.tgtID]
+            })
             for e in edges:
                 yield ','
                 yield self.dumps({
                     'data': e.as_dict(),
                     'source': nmap[e.srcID],
-                    'target': nmap[e.tgtID] })
+                    'target': nmap[e.tgtID]
+                })
         except StopIteration:
             pass
         yield ']}\n'

+
 def exec_wrapper(code, **gvars):
     lvars = {}
     exec(code, gvars, lvars)
     return lvars

+
 class Graph_Exec(_Input, _Streamy):
     path = ('graph', 'exec')
     eat_params = ('enabled', 'user', 'role', 'created_after', 'created_before', 'filter')
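
For orientation, the D3_UUID._dump_json() generator above streams a document whose edges reference node positions rather than node IDs: nmap records the order in which nodes were emitted. The output has roughly this shape, rendered here as the equivalent Python structure (IDs and values are hypothetical):

    d3 = {
        'nodes': [{'data': {'ID': 1, 'type': 'foo', 'value': 'bar'}},
                  {'data': {'ID': 2, 'type': 'foo', 'value': 'baz'}}],
        'edges': [{'data': {'ID': 3, 'type': 'rel', 'value': ''},
                   'source': 0,    # position of node ID 1 in 'nodes'
                   'target': 1}],  # position of node ID 2
    }
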
@@ -827,15 +851,17 @@ def post(self, _, __):
     def _txns_uuids(self, uuids):
         for uuid in uuids:
             try:
-#                with self.graph(uuid, readonly=True, create=False, hook=False) as g:
+                # with self.graph(uuid, readonly=True, create=False, hook=False) as g:
                 with self.graph(uuid, create=False, hook=False) as g:
                     with g.transaction(write=False) as txn:
                         yield txn, uuid
-            except:
+            except Exception:
                 pass

+
 class Graph_UUID_Exec(_Input, _Streamy):
     path = ('graph', UUID, 'exec')

+
     @graphtxn(write=False)
     def post(self, g, txn, _, uuid, __):
         if self.content_type != 'application/python':
@@ -859,6 +885,7 @@ def post(self, g, txn, _, uuid, __):
         except Exception as e:
             raise HTTPError(400, 'handler raised exception: %s' % e)

+
 class Graph_UUID_Meta(_Input):
     path = ('graph', UUID, 'meta')
@@ -870,12 +897,15 @@ def get(self, g, txn, _, uuid, __):
     def put(self, g, txn, _, uuid, __):
         self.do_meta(txn, self.input())

+
 class Graph_UUID_Seeds(Handler, _Streamy):
     path = ('graph', UUID, 'seeds')

+
     @graphtxn(write=False)
     def get(self, g, txn, _, uuid, __):
         return self.stream(SeedTracker(txn).seeds)

+
 class Graph_UUID_Node_ID(_Input):
     path = ('graph', UUID, 'node', INT)
@@ -884,7 +914,7 @@ def get(self, g, txn, _, uuid, __, ID):
         try:
             return self.dumps(txn.node(ID=int(ID)).as_dict())
         except TypeError:
-            raise HTTPError(404,"not a node")
+            raise HTTPError(404, "not a node")

     @graphtxn(write=True)
     def put(self, g, txn, _, uuid, __, ID):
@@ -893,6 +923,7 @@ def put(self, g, txn, _, uuid, __, ID):
         data['ID'] = ID
         self.do_node(txn, data)

+
 class Graph_UUID_Edge_ID(_Input):
     path = ('graph', UUID, 'edge', INT)
@@ -910,6 +941,7 @@ def put(self, g, txn, _, uuid, __, ID):
         data['ID'] = ID
         self.do_edge(txn, data)

+
 class KV_UUID(Handler):
     path = ('kv', UUID)
@@ -957,6 +989,7 @@ def delete(self, g, txn, _, uuid):
         for k in kv.iterkeys():
             del kv[k]

+
 class KV_UUID_Key(Handler):
     path = ('kv', UUID, STRING)
@@ -988,8 +1021,9 @@ def delete(self, g, txn, _, uuid, key):
         except KeyError:
             raise HTTPError(404, 'key not found')

+
 class Static(Handler):
-    res = re.compile('^[^/]+\.(js|css|html)')
+    res = re.compile('^[^/]+\.(js|css|html)')  # noqa
     mime = {
         '.html': 'text/html',
         '.css': 'text/css',
@@ -1004,7 +1038,7 @@ def get(self, _, resource):
             body = self.cache[resource]
         except KeyError:
             body = pkg_resources.resource_string(__name__, 'data/%s' % resource)
-            #self.cache[resource] = body
+            # self.cache[resource] = body

         extension = os.path.splitext(resource)[1]
         try:
@@ -1014,18 +1048,21 @@ def get(self, _, resource):
         self.res.headers.set('Content-Length', len(body))
         return body

+
 class View_UUID(Static):
     path = ('view', UUID)

     def get(self, _, uuid):
         return super(View_UUID, self).get(Static.path[0], self.param('style', 'd3v4') + '.html')

+
 class Favicon(Static):
     path = ('favicon.ico',)

     def get(self, _):
         return super(Favicon, self).get(Static.path[0], 'lemon.png')

+
 class Server(object):
     def __init__(self, collection_path=None, graph_opts=None, **kwargs):
         classes = (
@@ -1053,6 +1090,6 @@ def __init__(self, collection_path=None, graph_opts=None, **kwargs):
         global collection
         collection = None

-        handlers = tuple( H(collection_path=collection_path, graph_opts=graph_opts) for H in classes)
+        handlers = tuple(H(collection_path=collection_path, graph_opts=graph_opts) for H in classes)
         kwargs['handlers'] = handlers
         httpd(**kwargs)
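
The two exec endpoints above accept a request body with Content-Type application/python, run it through exec_wrapper(), and surface failures as HTTPError(400, 'handler raised exception: ...'). A hypothetical script body for POST /graph/{uuid}/exec follows; the handler name is suggested by that error message, but its exact signature is not shown in this diff:

    # posted verbatim as the request body; exec_wrapper() exec()s it and
    # returns its locals, so top-level definitions are what the server sees
    def handler(txn, uuid):
        # txn: the read-only transaction opened by @graphtxn(write=False)
        return {'graph': uuid}
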
diff --git a/LemonGraph/server/__main__.py b/LemonGraph/server/__main__.py
index 996a57b..198632e 100644
--- a/LemonGraph/server/__main__.py
+++ b/LemonGraph/server/__main__.py
@@ -1,13 +1,15 @@
 from __future__ import print_function

-from .. import Serializer, Adapters
-from ..collection import Collection
-from ..httpd import Graceful
-from . import Server
-import sys
 import logging
+import sys
+
 from getopt import GetoptError, gnu_getopt as getopt

+from . import Server
+from .. import Adapters, Serializer
+from ..collection import Collection
+from ..httpd import Graceful
+
+
 class LogHandler(logging.StreamHandler):
     def __init__(self, stream=sys.stdout):
         logging.StreamHandler.__init__(self, stream)
@@ -16,6 +18,7 @@ def __init__(self, stream=sys.stdout):
         formatter = logging.Formatter(fmt, fmt_date)
         self.setFormatter(formatter)

+
 def usage(msg=None, fh=sys.stderr):
     print('Usage: python -mLemonGraph.server [graphs-dir]', file=fh)
     print('', file=fh)
@@ -36,6 +39,7 @@ def usage(msg=None, fh=sys.stderr):
     print(msg, file=fh)
     sys.exit(1)

+
 def seed_depth0():
     while True:
         txn, entry = yield
@@ -43,6 +47,7 @@ def seed_depth0():
         if entry.is_property and entry.key == 'seed' and entry.value and entry.is_node_property:
             entry.parent['depth'] = 0

+
 def process_edge(txn, e, cost=1, inf=float('Inf')):
     # grab latest versions of endpoints
     src = txn.node(ID=e.srcID)
@@ -54,10 +59,12 @@ def process_edge(txn, e, cost=1, inf=float('Inf')):
     elif dt + cost < ds:
         src['depth'] = dt + cost

+
 def apply_cost(txn, prop):
     # cost value validity has already been enforced above
     process_edge(txn, prop.parent, cost=prop.value)

+
 def cascade_depth(txn, prop):
     # grab latest version of parent node
     node = txn.node(ID=prop.parentID)
@@ -70,6 +77,7 @@ def cascade_depth(txn, prop):
             pass
         n2['depth'] = mindepth

+
 def update_depth_cost():
     while True:
         txn, entry = yield
@@ -84,6 +92,7 @@ def update_depth_cost():
         if entry.is_edge_property:
             apply_cost(txn, entry)

+
 def main():
     levels = ('NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
     try:
@@ -141,7 +150,7 @@ def main():
         usage()

     all_logs = tuple("LemonGraph.%s" % x for x in ('httpd', 'collection', 'server'))
-    log_levels = dict( (k, default_level) for k in all_logs)
+    log_levels = dict((k, default_level) for k in all_logs)
     for token in logspec.split(','):
         try:
             target, level = token.split('=', 1)
@@ -162,11 +171,11 @@ def main():
         logger.addHandler(loghandler)
         logger.setLevel(getattr(logging, level.upper()))

-    graph_opts=dict(
+    graph_opts = dict(
         serialize_property_value=Serializer.msgpack(),
         adapters=Adapters(seed_depth0, update_depth_cost),
         nosync=nosync, nometasync=nometasync,
-        )
+    )

     # initialize the collection up front (which will re-create if missing)
     # before we turn on the web server
@@ -184,4 +193,5 @@ def _syncd():

     Server(collection_path=path, graph_opts=graph_opts, extra_procs={'syncd': _syncd}, host=ip, port=port, spawn=workers, timeout=timeout, buflen=buflen)

+
 main()
diff --git a/LemonGraph/snapshot.py b/LemonGraph/snapshot.py
index 0913794..675822d 100755
--- a/LemonGraph/snapshot.py
+++ b/LemonGraph/snapshot.py
@@ -1,11 +1,12 @@
 from __future__ import print_function
+
 import sys

 from . import Graph

 try:
     db = sys.argv[1]
-except:
+except Exception:
     print("Usage: python -mLemonGraph.snapshot path/to/src.db > dst.db", file=sys.stderr)
     sys.exit(1)
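
The depth machinery above is a streaming relaxation: seed_depth0() pins seed nodes at depth 0, and process_edge() lets depth flow across an edge in whichever direction shortens it, adding the edge's cost. A worked instance of the update rule, standalone, mirroring the two comparisons in process_edge:

    inf = float('Inf')
    ds, dt, cost = 0, inf, 1   # src is a seed; tgt not yet reached
    if ds + cost < dt:
        dt = ds + cost         # tgt['depth'] becomes 1
    elif dt + cost < ds:
        ds = dt + cost         # or relax in the other direction
    assert dt == 1
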
diff --git a/LemonGraph/sset.py b/LemonGraph/sset.py
index a1ad956..3570d6b 100644
--- a/LemonGraph/sset.py
+++ b/LemonGraph/sset.py
@@ -1,14 +1,15 @@
-from . import lib, ffi, wire
+from . import ffi, lib, wire
 from .serializer import Serializer

 UNSPECIFIED = object()

+
 class SSet(object):
     def __init__(self, txn, domain, map_values=False, serialize_domain=Serializer(), serialize_value=Serializer()):
         self.txn = txn
         self._txn = txn._txn
         self.domain = domain
-        self.serialize_value  = serialize_value
+        self.serialize_value = serialize_value
         enc = serialize_domain.encode(domain)
         flags = 0
         if map_values:
@@ -72,9 +73,10 @@ def empty(self):
             return False
         return True

+
 class SSetIterator(object):
     def __init__(self, kv, handler, pfx=None):
-        self.serialize_value  = kv.serialize_value
+        self.serialize_value = kv.serialize_value
         self._key = ffi.new('void **')
         self._data = ffi.new('void **')
         self._klen = ffi.new('size_t *')
diff --git a/LemonGraph/wire.py b/LemonGraph/wire.py
index 19fe26d..a0771df 100644
--- a/LemonGraph/wire.py
+++ b/LemonGraph/wire.py
@@ -11,7 +11,7 @@
 # but return binary if unicode decode failed
 def decode(data):
     try:
-        u = data.decode()
+        data.decode()
     except UnicodeDecodeError:
         return data
     return data.decode()
@@ -26,7 +26,7 @@ def encode(data):
         raise TypeError('Unsupported type %s' % type(data))

 else:
-    uni_types = unicode
+    uni_types = unicode  # noqa
     try:
         bin_types = (str, memoryview)
     except NameError:
diff --git a/bench.py b/bench.py
index bca9245..6e9eed2 100644
--- a/bench.py
+++ b/bench.py
@@ -1,13 +1,14 @@
 from __future__ import print_function

 import os
-from random import randint
 import sys
 import tempfile
+from random import randint
 from time import time

 import LemonGraph

+
 def log(msg, *params):
     times.append(time())
     fmt = "%.3lf\t+%.3lf\t" + msg
@@ -15,9 +16,11 @@ def log(msg, *params):
     print(fmt % args)
     sys.stdout.flush()

+
 def munge(t, n):
     return t + str(n % 5)

+
 mil = tuple(range(0, 1000000))
 pairs = set()
 while len(pairs) < 1000000:
@@ -31,7 +34,7 @@ def munge(t, n):
 try:
     g = LemonGraph.Graph(path, serialize_property_value=LemonGraph.Serializer.msgpack(), noreadahead=True, nosync=True)
     nuke.append(path + '-lock')
-    ret = LemonGraph.lib.graph_set_mapsize(g._graph, (1<<30) * 10)
+    ret = LemonGraph.lib.graph_set_mapsize(g._graph, (1 << 30) * 10)
     assert(0 == ret)

     times = [time()]
@@ -58,7 +61,7 @@ def munge(t, n):
         start = times[-1]
         for i, x_y in enumerate(pairs):
             x, y = x_y
-            e = txn.edge(type=munge('edge', x+y), value=i, src=nodes[x], tgt=nodes[y])
+            e = txn.edge(type=munge('edge', x + y), value=i, src=nodes[x], tgt=nodes[y])
         log("+1m edges")
     elapsed = times[-1] - start
     print("total edge insert time: %.3lf" % elapsed)
@@ -71,5 +74,5 @@ def munge(t, n):
 for p in nuke:
     try:
         os.unlink(p)
-    except:
+    except Exception:
         pass
diff --git a/lg_cffi_setup.py b/lg_cffi_setup.py
index 6cf0ac2..c61c810 100644
--- a/lg_cffi_setup.py
+++ b/lg_cffi_setup.py
@@ -1,6 +1,7 @@
 # adapted from https://caremad.io/2015/06/distributing-a-cffi-project-redux/ on 18 May 2016

 from distutils.command.build import build
+
 from setuptools.command.install import install

 SETUP_REQUIRES_ERROR = (
diff --git a/setup.py b/setup.py
index e55c1be..289fdc2 100644
--- a/setup.py
+++ b/setup.py
@@ -1,14 +1,15 @@
 from __future__ import print_function
+
 import os
 import platform
 import stat
 import subprocess
 import sys

-from setuptools import setup
-
 from lg_cffi_setup import keywords_with_side_effects

+from setuptools import setup
+

 def git_submodule_init():
     lmdb_src = os.path.sep.join(('deps', 'lmdb'))
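
The wire.decode() change above deserves a note: the first data.decode() call is only a probe (its result was never used, hence dropping the u = binding), and the function's contract is unchanged. Under Python 3, clean UTF-8 comes back as str and undecodable input comes back as the original bytes:

    from LemonGraph.wire import decode

    assert decode(b'text') == 'text'            # valid UTF-8 -> str
    assert decode(b'\xff\xfe') == b'\xff\xfe'   # not UTF-8 -> bytes, unchanged
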
diff --git a/test.py b/test.py
index bb55780..da27e59 100644
--- a/test.py
+++ b/test.py
@@ -2,10 +2,16 @@
 import tempfile
 import unittest

-from LemonGraph import Graph, Serializer, dirlist, Query
+from LemonGraph import Graph, Query, Serializer, dirlist
+
+
+def node(i):
+    return dict((k, Nodes[i][k]) for k in ('type', 'value'))
+
+
+def edge(i):
+    return dict((k, Edges[i][k]) for k in ('type', 'value', 'src', 'tgt'))

-node = lambda i: dict((k, Nodes[i][k]) for k in ('type', 'value'))
-edge = lambda i: dict((k, Edges[i][k]) for k in ('type', 'value', 'src', 'tgt'))

 Nodes = [
     {'type': 'foo', 'value': 'bar', 'properties': {'np1k': 'np1v'}},
@@ -226,6 +232,7 @@ def test_reset(self):

 class TestAlgorithms(unittest.TestCase):
     serializer = Serializer.msgpack()
     # each test_foo() method is wrapped w/ setup/teardown around it, so each test has a fresh graph
+
     def setUp(self):
         fd, path = tempfile.mkstemp()
         os.close(fd)
@@ -281,7 +288,7 @@ def test_sp2(self):
            e0b  e1b
              \  /
               n1b
-        '''
+        '''  # noqa
         n0 = txn.node(type='foo', value='0')
         n1a = txn.node(type='foo', value='1a')
         n1b = txn.node(type='foo', value='1b')
@@ -301,7 +308,6 @@ def test_sp2(self):
             # use default cost to make it find the upper path again
             self.assertPathEqual(n0.shortest_path(n2, cost_field='cost', cost_default=0.5), (n0, e0a, n1a, e1a, n2))

-
     def assertPathEqual(self, a, b):
         self.assertEqual(type(a), type(b))
         if a is not None:
@@ -352,11 +358,12 @@ def test_uints_string(self):

     def test_msgpack(self):
         s = Serializer.msgpack()
-        a = { 'foo': 'bar', u'foo\u2020': u'bar\u2020' }
+        a = {'foo': 'bar', u'foo\u2020': u'bar\u2020'}
         b = s.encode(a)
         c = s.decode(b)
         self.assertEqual(a, c)

+
 class TestDL(unittest.TestCase):
     def test_dirlist(self):
         dots = 0
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..ba01e7d
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,29 @@
+[tox]
+envlist = py36,flake8,test
+
+[testenv]
+
+[testenv:flake8]
+deps =
+    flake8
+    flake8-import-order>=0.9
+    pep8-naming
+    flake8-colors
+commands = flake8 .
+
+[testenv:test]
+commands = python test.py
+
+[flake8]
+ignore =
+    E501
+    W503
+    W504
+    N801  # Should be fixed (e.g.: class name 'Graph_UUID_Status' should use CapWords convention)
+    N802  # Should be fixed (e.g.: function name 'ByID' should be lowercase)
+    N803  # Should be fixed (e.g.: argument name 'ID' should be lowercase)
+    N805  # Should be fixed (e.g.: first argument of a method should be named 'self')
+    N806  # Should be fixed (e.g.: variable 'ID' in function should be lowercase)
+    N815  # Should be fixed (e.g.: variable '_lib_byID' in class scope should not be mixedCase)
+    N816  # Should be fixed (e.g.: variable 'streamJS' in global scope should not be mixedCase)
+format = ${cyan}%(path)s${reset}:${yellow_bold}%(row)d${reset}:${green_bold}%(col)d${reset}: ${red_bold}%(code)s${reset} %(text)s
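
Finally, the new tox.ini wires this cleanup into repeatable checks: envlist covers a py36 environment, a flake8 lint environment (with the import-order, naming, and color plugins), and a plain test-runner environment. Typical invocations, using standard tox options:

    tox            # run every environment in envlist
    tox -e flake8  # lint only
    tox -e test    # just python test.py
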