Skip to content
Snippets Groups Projects
Unverified Commit 56897671 authored by Jeffrey Phillips Freeman's avatar Jeffrey Phillips Freeman 💥
Browse files

Merge: remote-tracking branch 'origin/optimistic'

parents f08f30dc 6450090f
No related branches found
No related tags found
No related merge requests found
# Goblin Changelog # Goblin Changelog
## v2.1.1 ## v2.2.0
* Added Immutable meta-property
* Added optimistic locking on-creation.
* Fixed incorrect hashable id handling on Janusgraph. * Fixed incorrect hashable id handling on Janusgraph.
* Updated to run on newer versions of gremlin-python, 3.4.3 is the latest compatible version. * Updated to run on newer versions of gremlin-python, 3.4.3 is the latest compatible version.
import abc import abc
import logging import logging
from gremlin_python.process.traversal import Cardinality from gremlin_python.process.traversal import Cardinality # type: ignore
from goblin import element, exception, manager from goblin import element, exception, manager
......
"""Goblin application class and class constructor""" """goblin application class and class constructor"""
import collections import collections
import importlib import importlib
import logging import logging
import aiogremlin import aiogremlin # type: ignore
from goblin import element, provider, session from goblin import element, provider, session
......
from aiogremlin import Cluster, DriverRemoteConnection, Graph from aiogremlin import Cluster, DriverRemoteConnection, Graph # type: ignore
from aiogremlin.driver.client import Client from aiogremlin.driver.client import Client # type: ignore
from aiogremlin.driver.connection import Connection from aiogremlin.driver.connection import Connection # type: ignore
from aiogremlin.driver.pool import ConnectionPool from aiogremlin.driver.pool import ConnectionPool # type: ignore
from aiogremlin.driver.server import GremlinServer from aiogremlin.driver.server import GremlinServer # type: ignore
from gremlin_python.driver.serializer import GraphSONMessageSerializer from gremlin_python.driver.serializer import GraphSONMessageSerializer # type: ignore
AsyncGraph = Graph AsyncGraph = Graph
...@@ -2,13 +2,22 @@ ...@@ -2,13 +2,22 @@
import logging import logging
import inflection import inflection # type: ignore
from gremlin_python.process.traversal import Cardinality from gremlin_python.process.traversal import Cardinality # type: ignore
from enum import Enum
from goblin import abc, exception, mapper, properties from goblin import abc, exception, mapper, properties
#from goblin.element import Property
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ImmutableMode(Enum):
OFF = 0
SIMPLE = 1
class LockingMode(Enum):
OFF = 0
OPTIMISTIC_LOCKING = 1
class ElementMeta(type): class ElementMeta(type):
""" """
...@@ -50,6 +59,8 @@ class ElementMeta(type): ...@@ -50,6 +59,8 @@ class ElementMeta(type):
new_namespace[k] = v new_namespace[k] = v
new_namespace['__mapping__'] = mapper.create_mapping(namespace, props) new_namespace['__mapping__'] = mapper.create_mapping(namespace, props)
new_namespace['__properties__'] = props new_namespace['__properties__'] = props
new_namespace['__immutable__'] = namespace.get('__immutable__', ImmutableMode.OFF)
new_namespace['__locking__'] = namespace.get('__locking__', LockingMode.OFF)
result = type.__new__(cls, name, bases, new_namespace) result = type.__new__(cls, name, bases, new_namespace)
return result return result
...@@ -67,6 +78,7 @@ class Element(metaclass=ElementMeta): ...@@ -67,6 +78,7 @@ class Element(metaclass=ElementMeta):
setattr(self, key, value) setattr(self, key, value)
id = properties.IdProperty(properties.Generic) id = properties.IdProperty(properties.Generic)
dirty = properties.Property(properties.String)
class VertexPropertyDescriptor: class VertexPropertyDescriptor:
......
"""Helper functions and class to map between OGM Elements <-> DB Elements""" """Helper functions and class to map between OGM Elements <-> DB Elements"""
from typing import Any, Dict
import functools import functools
import logging import logging
...@@ -87,15 +88,15 @@ def map_vertex_to_ogm(result, props, element, *, mapping=None): ...@@ -87,15 +88,15 @@ def map_vertex_to_ogm(result, props, element, *, mapping=None):
return element return element
# temp hack # TODO: temp hack
def get_hashable_id(val): def get_hashable_id(val: Dict[str, Any]) -> Any:
# Use the value "as-is" by default.
if isinstance(val, dict) and "@type" in val and "@value" in val: if isinstance(val, dict) and "@type" in val and "@value" in val:
if val["@type"] == "janusgraph:RelationIdentifier": if val["@type"] == "janusgraph:RelationIdentifier":
val = val["@value"].get("value", val["@value"]["relationId"]) val = val["@value"].get("value", val["@value"]["relationId"])
return val return val
def map_vertex_property_to_ogm(result, element, *, mapping=None): def map_vertex_property_to_ogm(result, element, *, mapping=None):
"""Map a vertex returned by DB to OGM vertex""" """Map a vertex returned by DB to OGM vertex"""
for (val, metaprops) in result: for (val, metaprops) in result:
......
"""Classes to handle properties and data type definitions""" """Classes to handle properties and data type definitions"""
import logging import logging
from typing import Any
from gremlin_python.statics import long from gremlin_python.statics import long # type: ignore
from goblin import abc, exception from goblin import abc, exception
...@@ -207,12 +208,12 @@ class Float(abc.DataType): ...@@ -207,12 +208,12 @@ class Float(abc.DataType):
class Boolean(abc.DataType): class Boolean(abc.DataType):
"""Simple boolean datatype""" """Simple boolean datatype"""
def validate(self, val): def validate(self, val: Any):
try: try:
val = bool(val) val = bool(val)
except ValueError: except ValueError:
raise exception.ValidationError( raise exception.ValidationError(
"Not a valid boolean: {val}".format(val)) from e "Not a valid boolean: {val}".format(val)) from e # type: ignore
return val return val
def to_db(self, val=None): def to_db(self, val=None):
......
from typing import Dict, Any
class Provider: class Provider:
"""Superclass for provider plugins""" """Superclass for provider plugins"""
DEFAULT_OP_ARGS = {} DEFAULT_OP_ARGS: Dict[Any, Any] = {}
@classmethod @classmethod
def get_default_op_args(cls, processor): def get_default_op_args(cls, processor):
......
...@@ -4,17 +4,19 @@ import asyncio ...@@ -4,17 +4,19 @@ import asyncio
import collections import collections
import logging import logging
import weakref import weakref
import uuid
from typing import Callable, Awaitable, Any, Optional
import aiogremlin import aiogremlin # type: ignore
from aiogremlin.driver.protocol import Message from aiogremlin.driver.protocol import Message # type: ignore
from aiogremlin.driver.resultset import ResultSet from aiogremlin.driver.resultset import ResultSet # type: ignore
from aiogremlin.process.graph_traversal import __ from gremlin_python.process.graph_traversal import __, GraphTraversal # type: ignore
from gremlin_python.driver.remote_connection import RemoteTraversal from gremlin_python.driver.remote_connection import RemoteTraversal # type: ignore
from gremlin_python.process.traversal import Binding, Cardinality, Traverser from gremlin_python.process.traversal import Binding, Cardinality, Traverser # type: ignore
from gremlin_python.structure.graph import Edge, Vertex from gremlin_python.structure.graph import Edge, Vertex # type: ignore
from goblin import exception, mapper from goblin import exception, mapper
from goblin.element import GenericEdge, GenericVertex, VertexProperty from goblin.element import GenericEdge, GenericVertex, VertexProperty, ImmutableMode, LockingMode
from goblin.manager import VertexPropertyManager from goblin.manager import VertexPropertyManager
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -228,14 +230,53 @@ class Session: ...@@ -228,14 +230,53 @@ class Session:
for elem in elements: for elem in elements:
self._pending.append(elem) self._pending.append(elem)
async def flush(self): async def flush(
self,
conflicts_query: Optional[GraphTraversal] = None
) -> None:
""" """
Issue creation/update queries to database for all elements in the Issue creation/update queries to database for all elements in the
session pending queue. session pending queue.
""" """
while self._pending: transaction_id = str(uuid.uuid4())
elem = self._pending.popleft() processed = []
await self.save(elem) try:
while self._pending:
elem = self._pending.popleft()
actual_id = self.__dirty_element(elem, id=transaction_id)
if actual_id:
processed.append(await self.save(elem))
else:
await self.save(elem)
if not processed: return
if not conflicts_query:
await self.__commit_transaction(transaction_id)
else:
await (self.
g.
E().
has('dirty', transaction_id).
aggregate('x').
fold().
V().
has('dirty', transaction_id).
aggregate('x').
choose(
conflicts_query,
__.
select('x').
unfold().
properties('dirty').
drop()).
iterate()) # type: ignore
await self.__rollback_transaction(transaction_id)
except Exception as e:
await self.__rollback_transaction(transaction_id)
raise e
for elem in processed:
elem.dirty = None
async def remove_vertex(self, vertex): async def remove_vertex(self, vertex):
""" """
...@@ -347,6 +388,22 @@ class Session: ...@@ -347,6 +388,22 @@ class Session:
eid = Binding('eid', edge.id) eid = Binding('eid', edge.id)
return await self.g.E(eid).next() return await self.g.E(eid).next()
def __dirty_element(self, elem, id = str(uuid.uuid4())):
if elem.__locking__ and elem.__locking__ == LockingMode.OPTIMISTIC_LOCKING:
if not elem.dirty:
elem.dirty = id
return id
else:
return elem.dirty
return None
async def __commit_transaction(self, id):
if id: await self._g.E().has('dirty',id).aggregate('x').fold().V().has('dirty',id).aggregate('x').select('x').unfold().properties('dirty').drop().iterate()
async def __rollback_transaction(self, id):
print("id of: %s" % id)
if id: await self._g.E().has('dirty',id).aggregate('x').fold().V().has('dirty',id).aggregate('x').select('x').unfold().drop().iterate()
async def _update_vertex(self, vertex): async def _update_vertex(self, vertex):
""" """
Update a vertex, generally to change/remove property values. Update a vertex, generally to change/remove property values.
...@@ -388,15 +445,35 @@ class Session: ...@@ -388,15 +445,35 @@ class Session:
elem = element.__mapping__.mapper_func(elem, props, element) elem = element.__mapping__.mapper_func(elem, props, element)
return elem return elem
async def __handle_create_func(self, elem, create_func):
transaction_id = elem.dirty
if not transaction_id:
transaction_id = self.__dirty_element(elem)
if transaction_id:
result = None
try:
result = await create_func(elem)
await self.__commit_transaction(transaction_id)
result.dirty = None
except Exception as e:
await self.__rollback_transaction(transaction_id)
raise e
return result
return await create_func(elem)
async def _save_element(self, elem, check_func, create_func, update_func): async def _save_element(self, elem, check_func, create_func, update_func):
if hasattr(elem, 'id'): if hasattr(elem, 'id'):
exists = await check_func(elem) exists = await check_func(elem)
if not exists: if not exists:
result = await create_func(elem) result = await self.__handle_create_func(elem, create_func)
else: else:
if elem.__immutable__ and elem.__immutable__ != ImmutableMode.OFF: raise AttributeError("Trying to update an immutable element: %s" % elem)
result = await update_func(elem) result = await update_func(elem)
else: else:
result = await create_func(elem) result = await self.__handle_create_func(elem, create_func)
return result return result
async def _add_vertex(self, vertex): async def _add_vertex(self, vertex):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment