diff --git a/CHANGELOG.md b/CHANGELOG.md index 45d340799897e3ff1ce545b7198220e33321c534..446ba288f5dfca65ded34fdcbd827528a8c0584c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # Goblin Changelog -## v2.1.1 +## v2.2.0 +* Added Immutable meta-property +* Added optimistic locking on-creation. * Fixed incorrect hashable id handling on Janusgraph. * Updated to run on newer versions of gremlin-python, 3.4.3 is the latest compatible version. diff --git a/goblin/abc.py b/goblin/abc.py index b7d144f9a14051dd413eb68eeb4f6739cb42c603..5661f3f3455291a525699397be23e68d1b4291a9 100644 --- a/goblin/abc.py +++ b/goblin/abc.py @@ -1,7 +1,7 @@ import abc import logging -from gremlin_python.process.traversal import Cardinality +from gremlin_python.process.traversal import Cardinality # type: ignore from goblin import element, exception, manager diff --git a/goblin/app.py b/goblin/app.py index 0b23bde57e278a53216f15df120376617fc8bae1..068700282f8745232d156c550a194c85a05a0f83 100644 --- a/goblin/app.py +++ b/goblin/app.py @@ -1,10 +1,10 @@ -"""Goblin application class and class constructor""" +"""goblin application class and class constructor""" import collections import importlib import logging -import aiogremlin +import aiogremlin # type: ignore from goblin import element, provider, session diff --git a/goblin/driver/__init__.py b/goblin/driver/__init__.py index 5fbfd4713c8c92a9107e7c22562a014f0e8f5a6a..fdbda646d2a0f91568026b465be36f6ae4150dbe 100644 --- a/goblin/driver/__init__.py +++ b/goblin/driver/__init__.py @@ -1,8 +1,8 @@ -from aiogremlin import Cluster, DriverRemoteConnection, Graph -from aiogremlin.driver.client import Client -from aiogremlin.driver.connection import Connection -from aiogremlin.driver.pool import ConnectionPool -from aiogremlin.driver.server import GremlinServer -from gremlin_python.driver.serializer import GraphSONMessageSerializer +from aiogremlin import Cluster, DriverRemoteConnection, Graph # type: ignore +from aiogremlin.driver.client import Client # type: ignore +from aiogremlin.driver.connection import Connection # type: ignore +from aiogremlin.driver.pool import ConnectionPool # type: ignore +from aiogremlin.driver.server import GremlinServer # type: ignore +from gremlin_python.driver.serializer import GraphSONMessageSerializer # type: ignore AsyncGraph = Graph diff --git a/goblin/element.py b/goblin/element.py index 883124d022dae88aab04fa2015eb6ebb04b2eca8..722bbf5d95561dda3e71888e06001a05ccd166db 100644 --- a/goblin/element.py +++ b/goblin/element.py @@ -2,13 +2,22 @@ import logging -import inflection -from gremlin_python.process.traversal import Cardinality +import inflection # type: ignore +from gremlin_python.process.traversal import Cardinality # type: ignore +from enum import Enum from goblin import abc, exception, mapper, properties +#from goblin.element import Property logger = logging.getLogger(__name__) +class ImmutableMode(Enum): + OFF = 0 + SIMPLE = 1 + +class LockingMode(Enum): + OFF = 0 + OPTIMISTIC_LOCKING = 1 class ElementMeta(type): """ @@ -50,6 +59,8 @@ class ElementMeta(type): new_namespace[k] = v new_namespace['__mapping__'] = mapper.create_mapping(namespace, props) new_namespace['__properties__'] = props + new_namespace['__immutable__'] = namespace.get('__immutable__', ImmutableMode.OFF) + new_namespace['__locking__'] = namespace.get('__locking__', LockingMode.OFF) result = type.__new__(cls, name, bases, new_namespace) return result @@ -67,6 +78,7 @@ class Element(metaclass=ElementMeta): setattr(self, key, value) id = properties.IdProperty(properties.Generic) + dirty = properties.Property(properties.String) class VertexPropertyDescriptor: diff --git a/goblin/mapper.py b/goblin/mapper.py index ed171016e7bba2e842d0a054c85255ef4d3ac6c8..15a3420a6d4b6be210bc0129daba46dee60ae772 100644 --- a/goblin/mapper.py +++ b/goblin/mapper.py @@ -1,5 +1,6 @@ """Helper functions and class to map between OGM Elements <-> DB Elements""" +from typing import Any, Dict import functools import logging @@ -87,15 +88,15 @@ def map_vertex_to_ogm(result, props, element, *, mapping=None): return element -# temp hack -def get_hashable_id(val): - # Use the value "as-is" by default. +# TODO: temp hack +def get_hashable_id(val: Dict[str, Any]) -> Any: if isinstance(val, dict) and "@type" in val and "@value" in val: if val["@type"] == "janusgraph:RelationIdentifier": val = val["@value"].get("value", val["@value"]["relationId"]) return val + def map_vertex_property_to_ogm(result, element, *, mapping=None): """Map a vertex returned by DB to OGM vertex""" for (val, metaprops) in result: diff --git a/goblin/properties.py b/goblin/properties.py index 19e18dcce2e7b60a839f83883b65d00d33dbc7f8..7ad4de8431f6067bbce3cabc17ef5247967b1590 100644 --- a/goblin/properties.py +++ b/goblin/properties.py @@ -1,8 +1,9 @@ """Classes to handle properties and data type definitions""" import logging +from typing import Any -from gremlin_python.statics import long +from gremlin_python.statics import long # type: ignore from goblin import abc, exception @@ -207,12 +208,12 @@ class Float(abc.DataType): class Boolean(abc.DataType): """Simple boolean datatype""" - def validate(self, val): + def validate(self, val: Any): try: val = bool(val) except ValueError: raise exception.ValidationError( - "Not a valid boolean: {val}".format(val)) from e + "Not a valid boolean: {val}".format(val)) from e # type: ignore return val def to_db(self, val=None): diff --git a/goblin/provider.py b/goblin/provider.py index d45cd6b7d6ed89e044c05c2cf0aa9f12c42abd27..47cd95873f5ff45af987cec8c943015bd9006c7d 100644 --- a/goblin/provider.py +++ b/goblin/provider.py @@ -1,6 +1,8 @@ +from typing import Dict, Any + class Provider: """Superclass for provider plugins""" - DEFAULT_OP_ARGS = {} + DEFAULT_OP_ARGS: Dict[Any, Any] = {} @classmethod def get_default_op_args(cls, processor): diff --git a/goblin/session.py b/goblin/session.py index 1fcf84b66b41650700f3112682ede9ed5a4b42c4..8df2083cf354f32bd6ebca429fabdf4a7b8034c3 100644 --- a/goblin/session.py +++ b/goblin/session.py @@ -4,17 +4,19 @@ import asyncio import collections import logging import weakref +import uuid +from typing import Callable, Awaitable, Any, Optional -import aiogremlin -from aiogremlin.driver.protocol import Message -from aiogremlin.driver.resultset import ResultSet -from aiogremlin.process.graph_traversal import __ -from gremlin_python.driver.remote_connection import RemoteTraversal -from gremlin_python.process.traversal import Binding, Cardinality, Traverser -from gremlin_python.structure.graph import Edge, Vertex +import aiogremlin # type: ignore +from aiogremlin.driver.protocol import Message # type: ignore +from aiogremlin.driver.resultset import ResultSet # type: ignore +from gremlin_python.process.graph_traversal import __, GraphTraversal # type: ignore +from gremlin_python.driver.remote_connection import RemoteTraversal # type: ignore +from gremlin_python.process.traversal import Binding, Cardinality, Traverser # type: ignore +from gremlin_python.structure.graph import Edge, Vertex # type: ignore from goblin import exception, mapper -from goblin.element import GenericEdge, GenericVertex, VertexProperty +from goblin.element import GenericEdge, GenericVertex, VertexProperty, ImmutableMode, LockingMode from goblin.manager import VertexPropertyManager logger = logging.getLogger(__name__) @@ -228,14 +230,53 @@ class Session: for elem in elements: self._pending.append(elem) - async def flush(self): + async def flush( + self, + conflicts_query: Optional[GraphTraversal] = None + ) -> None: """ Issue creation/update queries to database for all elements in the session pending queue. """ - while self._pending: - elem = self._pending.popleft() - await self.save(elem) + transaction_id = str(uuid.uuid4()) + processed = [] + try: + while self._pending: + elem = self._pending.popleft() + actual_id = self.__dirty_element(elem, id=transaction_id) + if actual_id: + processed.append(await self.save(elem)) + else: + await self.save(elem) + + if not processed: return + if not conflicts_query: + await self.__commit_transaction(transaction_id) + else: + await (self. + g. + E(). + has('dirty', transaction_id). + aggregate('x'). + fold(). + V(). + has('dirty', transaction_id). + aggregate('x'). + choose( + conflicts_query, + + __. + select('x'). + unfold(). + properties('dirty'). + drop()). + iterate()) # type: ignore + await self.__rollback_transaction(transaction_id) + except Exception as e: + await self.__rollback_transaction(transaction_id) + raise e + for elem in processed: + elem.dirty = None async def remove_vertex(self, vertex): """ @@ -347,6 +388,22 @@ class Session: eid = Binding('eid', edge.id) return await self.g.E(eid).next() + def __dirty_element(self, elem, id = str(uuid.uuid4())): + if elem.__locking__ and elem.__locking__ == LockingMode.OPTIMISTIC_LOCKING: + if not elem.dirty: + elem.dirty = id + return id + else: + return elem.dirty + return None + + async def __commit_transaction(self, id): + if id: await self._g.E().has('dirty',id).aggregate('x').fold().V().has('dirty',id).aggregate('x').select('x').unfold().properties('dirty').drop().iterate() + + async def __rollback_transaction(self, id): + print("id of: %s" % id) + if id: await self._g.E().has('dirty',id).aggregate('x').fold().V().has('dirty',id).aggregate('x').select('x').unfold().drop().iterate() + async def _update_vertex(self, vertex): """ Update a vertex, generally to change/remove property values. @@ -388,15 +445,35 @@ class Session: elem = element.__mapping__.mapper_func(elem, props, element) return elem + + async def __handle_create_func(self, elem, create_func): + transaction_id = elem.dirty + if not transaction_id: + transaction_id = self.__dirty_element(elem) + if transaction_id: + result = None + try: + result = await create_func(elem) + await self.__commit_transaction(transaction_id) + result.dirty = None + except Exception as e: + await self.__rollback_transaction(transaction_id) + raise e + return result + + return await create_func(elem) + + async def _save_element(self, elem, check_func, create_func, update_func): if hasattr(elem, 'id'): exists = await check_func(elem) if not exists: - result = await create_func(elem) + result = await self.__handle_create_func(elem, create_func) else: + if elem.__immutable__ and elem.__immutable__ != ImmutableMode.OFF: raise AttributeError("Trying to update an immutable element: %s" % elem) result = await update_func(elem) else: - result = await create_func(elem) + result = await self.__handle_create_func(elem, create_func) return result async def _add_vertex(self, vertex):