Skip to content
Snippets Groups Projects
Commit 141379d6 authored by davebshow's avatar davebshow
Browse files

restructured project

parent 19ad2110
No related branches found
No related tags found
No related merge requests found
import logging
from goblin import abc
from goblin import mapper
from goblin import properties
logger = logging.getLogger(__name__)
class ElementMeta(type):
"""Metaclass for graph elements. Responsible for creating the
the :py:class:`mapper.Mapping` object and replacing user defined
:py:class:`property.Property` with
:py:class:`property.PropertyDescriptor`"""
def __new__(cls, name, bases, namespace, **kwds):
if bases:
namespace['__type__'] = bases[0].__name__.lower()
props = {}
new_namespace = {}
for k, v in namespace.items():
if isinstance(v, abc.BaseProperty):
props[k] = v
v = v.__descriptor__(k, v)
new_namespace[k] = v
new_namespace['__mapping__'] = mapper.create_mapping(namespace,
props)
logger.warning("Creating new Element class {}: {}".format(
name, new_namespace['__mapping__']))
result = type.__new__(cls, name, bases, new_namespace)
return result
class Element(metaclass=ElementMeta):
pass
class Vertex(Element):
"""Base class for user defined Vertex classes"""
pass
class Edge(Element):
"""Base class for user defined Edge classes"""
def __init__(self, source=None, target=None):
if source:
self._source = source
if target:
self._target = target
def getsource(self):
return self._source
def setsource(self, val):
assert isinstance(val, Vertex) or val is None
self._source = val
def delsource(self):
del self._source
source = property(getsource, setsource, delsource)
def gettarget(self):
return self._target
def settarget(self, val):
assert isinstance(val, Vertex) or val is None
self._target = val
def deltarget(self):
del self._target
target = property(gettarget, settarget, deltarget)
class VertexPropertyDescriptor:
"""Descriptor that validates user property input and gets/sets properties
as instance attributes."""
def __init__(self, name, vertex_property):
self._name = '_' + name
self._vertex_property = vertex_property.__class__
self._data_type = vertex_property.data_type
self._default = vertex_property.default
def __get__(self, obj, objtype):
if obj is None:
return self._vertex_property
default = self._default
if default:
default = self._data_type.validate(default)
default = self._vertex_property(self._default)
return getattr(obj, self._name, default)
def __set__(self, obj, val):
if isinstance(val, (list, tuple , set)):
vertex_property = []
for v in val:
v = self._data_type.validate(v)
vertex_property.append(
self._vertex_property(self._data_type, value=v))
else:
val = self._data_type.validate(val)
vertex_property = self._vertex_property(self._data_type, value=val)
setattr(obj, self._name, vertex_property)
class VertexProperty(Element, abc.BaseProperty):
__descriptor__ = VertexPropertyDescriptor
def __init__(self, data_type, *, value=None, default=None):
if isinstance(data_type, type):
data_type = data_type()
self._data_type = data_type
self._value = value
self._default = default
@property
def default(self):
self._default
@property
def data_type(self):
return self._data_type
@property
def value(self):
return self._value
def __repr__(self):
return '<{}(type={}, value={})'.format(self.__class__.__name__,
self._data_type, self.value)
"""Main OGM API classes and constructors"""
import collections
import logging
from goblin import gremlin_python
from goblin import driver
from goblin import session
logger = logging.getLogger(__name__)
# Constructor API
async def create_engine(url,
loop,
maxsize=256,
force_close=False,
force_release=True):
"""Constructor function for :py:class:`Engine`. Connects to database
and builds a dictionary of relevant vendor implmentation features"""
features = {}
# This will be some kind of manager client etc.
conn = await driver.GremlinServer.open(url, loop)
# Propbably just use a parser to parse the whole feature list
stream = await conn.submit(
'graph.features().graph().supportsComputer()')
msg = await stream.fetch_data()
features['computer'] = msg.data[0]
stream = await conn.submit(
'graph.features().graph().supportsTransactions()')
msg = await stream.fetch_data()
features['transactions'] = msg.data[0]
stream = await conn.submit(
'graph.features().graph().supportsPersistence()')
msg = await stream.fetch_data()
features['persistence'] = msg.data[0]
stream = await conn.submit(
'graph.features().graph().supportsConcurrentAccess()')
msg = await stream.fetch_data()
features['concurrent_access'] = msg.data[0]
stream = await conn.submit(
'graph.features().graph().supportsThreadedTransactions()')
msg = await stream.fetch_data()
features['threaded_transactions'] = msg.data[0]
return Engine(url, conn, loop, **features)
# Main API classes
class Engine(driver.AbstractConnection):
"""Class used to encapsulate database connection configuration and generate
database connections. Used as a factory to create :py:class:`Session`
objects. More config coming soon."""
def __init__(self, url, conn, loop, *, force_close=True, **features):
self._url = url
self._conn = conn
self._loop = loop
self._force_close = force_close
self._features = features
self._translator = gremlin_python.GroovyTranslator('g')
@property
def translator(self):
return self._translator
@property
def url(self):
return self._url
@property
def conn(self):
return self._conn
def session(self, *, use_session=False):
return session.Session(self, use_session=use_session)
async def submit(self, query, *, bindings=None, session=None):
return await self._conn.submit(query, bindings=bindings)
async def close(self):
await self.conn.close()
self._conn = None
import logging
from goblin import abc
from goblin import mapper
from goblin import properties
logger = logging.getLogger(__name__)
# Graph elements
class ElementMeta(type):
"""Metaclass for graph elements. Responsible for creating the
the :py:class:`mapper.Mapping` object and replacing user defined
:py:class:`property.Property` with
:py:class:`property.PropertyDescriptor`"""
def __new__(cls, name, bases, namespace, **kwds):
if bases:
namespace['__type__'] = bases[0].__name__.lower()
props = {}
new_namespace = {}
for k, v in namespace.items():
if isinstance(v, abc.BaseProperty):
props[k] = v
v = v.__descriptor__(k, v)
new_namespace[k] = v
new_namespace['__mapping__'] = mapper.create_mapping(namespace,
props)
logger.warning("Creating new Element class {}: {}".format(
name, new_namespace['__mapping__']))
result = type.__new__(cls, name, bases, new_namespace)
return result
class Element(metaclass=ElementMeta):
pass
......@@ -2,7 +2,6 @@
import logging
from goblin import abc
from goblin import mapper
logger = logging.getLogger(__name__)
......
"""Query API and helpers"""
import asyncio
import logging
from goblin import gremlin_python
from goblin import mapper
......@@ -35,47 +37,93 @@ class QueryResponse:
class Query:
"""Provides interface for user generated queries"""
def __init__(self, session, element_class):
def __init__(self, session, translator):
self._session = session
self._engine = session.engine
self._element_class = element_class
self._loop = self._session._loop
if element_class.__type__ == 'vertex':
self._traversal = self.session.traversal.g.V().hasLabel(
element_class.__mapping__.label)
self._mapper = mapper.map_vertex_to_ogm
elif element_class.__type__ == 'edge':
self._traversal = self.session.traversal.g.E().hasLabel(
element_class.__mapping__.label)
self._mapper = mapper.map_edge_to_ogm
else:
raise Exception("unknown element type")
self._traversal_source = gremlin_python.PythonGraphTraversalSource(
translator)
self._binding = 0
@property
def session(self):
return self._session
@property
def g(self):
return self.traversal_source
@property
def traversal_source(self):
return self._traversal_source
# Generative query methods...
def filter(self, **kwargs):
"""Add a filter to the query"""
raise NotImplementedError
# Methods that issue a query
async def all(self):
def traversal(self, element_class):
pass
# Methods that issue a traversal query to server
async def _all(self, traversal, element_class):
"""Get all results generated by query"""
async_iter = await self.session.execute_traversal(self._traversal)
async_iter = await self.session.execute_traversal(traversal)
response_queue = asyncio.Queue(loop=self._loop)
self._loop.create_task(self._receive(async_iter, response_queue))
self._loop.create_task(
self._receive(async_iter, response_queue, element_class))
return QueryResponse(response_queue)
async def _receive(self, async_iter, response_queue):
async def _receive(self, async_iter, response_queue, element_class):
async for msg in async_iter:
results = msg.data
for result in results:
current = self.session.current.get(result['id'], None)
if not current:
current = self._element_class()
element = self._mapper(result, current,
current.__mapping__)
current = element_class()
element = self._mapper(result, current, current.__mapping__)
response_queue.put_nowait(element)
response_queue.put_nowait(None)
# Common CRUD methods that generate traversals
def remove_vertex(self, element):
return self.g.V(element.id).drop()
def remove_edge(self, element):
return self.g.E(element.id).drop()
def get_vertex_by_id(self, element):
return self.g.V(element.id)
def get_edge_by_id(self, element):
return self.g.E(element.id)
def add_vertex(self, element):
props = mapper.map_props_to_db(element, element.__mapping__)
traversal = self.g.addV(element.__mapping__.label)
return self._add_properties(traversal, props)
def add_edge(self, element):
props = mapper.map_props_to_db(element, element.__mapping__)
traversal = self.g.V(element.source.id)
traversal = traversal.addE(element.__mapping__._label)
traversal = traversal.to(self.g.V(element.target.id))
return self._add_properties(traversal, props)
def update_vertex(self, element):
props = mapper.map_props_to_db(element, element.__mapping__)
traversal = self.g.V(element.id)
return self._add_properties(traversal, props)
def update_edge(self, element):
props = mapper.map_props_to_db(element, element.__mapping__)
traversal = self.g.E(element.id)
return self._add_properties(traversal, props)
def _add_properties(self, traversal, props):
for k, v in props:
if v:
traversal = traversal.property(
('k' + str(self._binding), k),
('v' + str(self._binding), v))
self._binding += 1
self._binding = 0
return traversal
......@@ -2,91 +2,13 @@
import collections
import logging
from goblin import abc
from goblin import gremlin_python
from goblin import driver
from goblin import mapper
from goblin import meta
from goblin import traversal
from goblin import query
logger = logging.getLogger(__name__)
# Constructor API
async def create_engine(url,
loop,
maxsize=256,
force_close=False,
force_release=True):
"""Constructor function for :py:class:`Engine`. Connects to database
and builds a dictionary of relevant vendor implmentation features"""
features = {}
# This will be some kind of manager client etc.
conn = await driver.GremlinServer.open(url, loop)
# Propbably just use a parser to parse the whole feature list
stream = await conn.submit(
'graph.features().graph().supportsComputer()')
msg = await stream.fetch_data()
features['computer'] = msg.data[0]
stream = await conn.submit(
'graph.features().graph().supportsTransactions()')
msg = await stream.fetch_data()
features['transactions'] = msg.data[0]
stream = await conn.submit(
'graph.features().graph().supportsPersistence()')
msg = await stream.fetch_data()
features['persistence'] = msg.data[0]
stream = await conn.submit(
'graph.features().graph().supportsConcurrentAccess()')
msg = await stream.fetch_data()
features['concurrent_access'] = msg.data[0]
stream = await conn.submit(
'graph.features().graph().supportsThreadedTransactions()')
msg = await stream.fetch_data()
features['threaded_transactions'] = msg.data[0]
return Engine(url, conn, loop, **features)
# Main API classes
class Engine(driver.AbstractConnection):
"""Class used to encapsulate database connection configuration and generate
database connections. Used as a factory to create :py:class:`Session`
objects. More config coming soon."""
def __init__(self, url, conn, loop, *, force_close=True, **features):
self._url = url
self._conn = conn
self._loop = loop
self._force_close = force_close
self._features = features
self._translator = gremlin_python.GroovyTranslator('g')
@property
def translator(self):
return self._translator
@property
def url(self):
return self._url
@property
def conn(self):
return self._conn
def session(self, *, use_session=False):
return Session(self, use_session=use_session)
async def submit(self, query, *, bindings=None, session=None):
return await self._conn.submit(query, bindings=bindings)
async def close(self):
await self.conn.close()
self._conn = None
class Session:
"""Provides the main API for interacting with the database. Does not
necessarily correpsond to a database session."""
......@@ -96,7 +18,7 @@ class Session:
self._loop = self._engine._loop
self._use_session = False
self._session = None
self._traversal = traversal.TraversalSource(self.engine.translator)
self._query = query.Query(self, self.engine.translator)
self._pending = collections.deque()
self._current = {}
......@@ -105,16 +27,13 @@ class Session:
return self._engine
@property
def traversal(self):
return self._traversal
def query(self):
return self._query
@property
def current(self):
return self._current
def query(self, element_class):
return query.Query(self, element_class)
def add(self, *elements):
for elem in elements:
self._pending.append(elem)
......@@ -135,9 +54,9 @@ class Session:
async def save_vertex(self, element):
result = await self._save_element(element,
self.traversal.get_vertex_by_id,
self.traversal.add_vertex,
self.traversal.update_vertex,
self.query.get_vertex_by_id,
self.query.add_vertex,
self.query.update_vertex,
mapper.map_vertex_to_ogm)
self.current[result.id] = result
return result
......@@ -146,9 +65,9 @@ class Session:
if not (hasattr(element, 'source') and hasattr(element, 'target')):
raise Exception("Edges require source/target vetices")
result = await self._save_element(element,
self.traversal.get_edge_by_id,
self.traversal.add_edge,
self.traversal.update_edge,
self.query.get_edge_by_id,
self.query.add_edge,
self.query.update_edge,
mapper.map_edge_to_ogm)
self.current[result.id] = result
return result
......@@ -174,12 +93,12 @@ class Session:
return mapper_func(result.data[0], element, element.__mapping__)
async def remove_vertex(self, element):
traversal = self.traversal.remove_vertex(element)
traversal = self.query.remove_vertex(element)
result = await self._remove_element(element, traversal)
return result
async def remove_edge(self, element):
traversal = self.traversal.remove_edge(element)
traversal = self.query.remove_edge(element)
result = await self._remove_element(element, traversal)
return result
......@@ -190,7 +109,7 @@ class Session:
return result
async def get_vertex(self, element):
traversal = self.traversal.get_vertex_by_id(element)
traversal = self.query.get_vertex_by_id(element)
stream = await self.execute_traversal(traversal)
result = await stream.fetch_data()
if result.data:
......@@ -199,7 +118,7 @@ class Session:
return vertex
async def get_edge(self, element):
traversal = self.traversal.get_edge_by_id(element)
traversal = self.query.get_edge_by_id(element)
stream = await self.execute_traversal(traversal)
result = await stream.fetch_data()
if result.data:
......@@ -208,6 +127,7 @@ class Session:
return vertex
async def execute_traversal(self, traversal):
# Move parsing to query
script, bindings = query.parse_traversal(traversal)
if self.engine._features['transactions'] and not self._use_session():
script = self._wrap_in_tx(script)
......@@ -229,103 +149,3 @@ class Session:
async def rollback(self):
raise NotImplementedError
class Vertex(meta.Element):
"""Base class for user defined Vertex classes"""
pass
class Edge(meta.Element):
"""Base class for user defined Edge classes"""
def __init__(self, source=None, target=None):
if source:
self._source = source
if target:
self._target = target
def getsource(self):
return self._source
def setsource(self, val):
assert isinstance(val, Vertex) or val is None
self._source = val
def delsource(self):
del self._source
source = property(getsource, setsource, delsource)
def gettarget(self):
return self._target
def settarget(self, val):
assert isinstance(val, Vertex) or val is None
self._target = val
def deltarget(self):
del self._target
target = property(gettarget, settarget, deltarget)
class VertexPropertyDescriptor:
"""Descriptor that validates user property input and gets/sets properties
as instance attributes."""
def __init__(self, name, vertex_property):
self._name = '_' + name
self._vertex_property = vertex_property.__class__
self._data_type = vertex_property.data_type
self._default = vertex_property.default
def __get__(self, obj, objtype):
if obj is None:
return self._vertex_property
default = self._default
if default:
default = self._data_type.validate(default)
default = self._vertex_property(self._default)
return getattr(obj, self._name, default)
def __set__(self, obj, val):
if isinstance(val, (list, tuple , set)):
vertex_property = []
for v in val:
v = self._data_type.validate(v)
vertex_property.append(
self._vertex_property(self._data_type, value=v))
else:
val = self._data_type.validate(val)
vertex_property = self._vertex_property(self._data_type, value=val)
setattr(obj, self._name, vertex_property)
class VertexProperty(meta.Element, abc.BaseProperty):
__descriptor__ = VertexPropertyDescriptor
def __init__(self, data_type, *, value=None, default=None):
if isinstance(data_type, type):
data_type = data_type()
self._data_type = data_type
self._value = value
self._default = default
@property
def default(self):
self._default
@property
def data_type(self):
return self._data_type
@property
def value(self):
return self._value
def __repr__(self):
return '<{}(type={}, value={})'.format(self.__class__.__name__,
self._data_type, self.value)
"""Class used to produce traversals"""
from goblin import gremlin_python
from goblin import mapper
class TraversalSource:
"""A wrapper for :py:class:gremlin_python.PythonGraphTraversalSource that
generates commonly used traversals"""
def __init__(self, translator):
self._traversal_source = gremlin_python.PythonGraphTraversalSource(
translator)
self._binding = 0
@property
def g(self):
return self.traversal_source
@property
def traversal_source(self):
return self._traversal_source
def remove_vertex(self, element):
return self.g.V(element.id).drop()
def remove_edge(self, element):
return self.g.E(element.id).drop()
def get_vertex_by_id(self, element):
return self.g.V(element.id)
def get_edge_by_id(self, element):
return self.g.E(element.id)
def add_vertex(self, element):
props = mapper.map_props_to_db(element, element.__mapping__)
traversal = self.g.addV(element.__mapping__.label)
return self._add_properties(traversal, props)
def add_edge(self, element):
props = mapper.map_props_to_db(element, element.__mapping__)
traversal = self.g.V(element.source.id)
traversal = traversal.addE(element.__mapping__._label)
traversal = traversal.to(self.g.V(element.target.id))
return self._add_properties(traversal, props)
def update_vertex(self, element):
props = mapper.map_props_to_db(element, element.__mapping__)
traversal = self.g.V(element.id)
return self._add_properties(traversal, props)
def update_edge(self, element):
props = mapper.map_props_to_db(element, element.__mapping__)
traversal = self.g.E(element.id)
return self._add_properties(traversal, props)
def _add_properties(self, traversal, props):
for k, v in props:
if v:
traversal = traversal.property(
('k' + str(self._binding), k),
('v' + str(self._binding), v))
self._binding += 1
self._binding = 0
return traversal
import asyncio
import unittest
from goblin.api import create_engine, Vertex, Edge
from goblin.engine import create_engine
from goblin.element import Vertex, Edge, VertexProperty
from goblin.properties import Property, String
......@@ -125,28 +126,28 @@ class TestEngine(unittest.TestCase):
self.loop.run_until_complete(go())
def test_query_all(self):
async def go():
engine = await create_engine("http://localhost:8182/", self.loop)
session = engine.session()
leif = TestVertex()
leif.name = 'leifur'
jon = TestVertex()
jon.name = 'jonathan'
session.add(leif, jon)
await session.flush()
results = []
stream = await session.query(TestVertex).all()
async for msg in stream:
results.append(msg)
print(len(results))
self.assertEqual(len(session.current), 2)
for result in results:
self.assertIsInstance(result, Vertex)
await engine.close()
self.loop.run_until_complete(go())
# def test_query_all(self):
#
# async def go():
# engine = await create_engine("http://localhost:8182/", self.loop)
# session = engine.session()
# leif = TestVertex()
# leif.name = 'leifur'
# jon = TestVertex()
# jon.name = 'jonathan'
# session.add(leif, jon)
# await session.flush()
# results = []
# stream = await session.query(TestVertex).all()
# async for msg in stream:
# results.append(msg)
# print(len(results))
# self.assertEqual(len(session.current), 2)
# for result in results:
# self.assertIsInstance(result, Vertex)
# await engine.close()
#
# # self.loop.run_until_complete(go())
def test_remove_vertex(self):
......
import asyncio
import unittest
from goblin.api import create_engine, Vertex, Edge, VertexProperty
from goblin.engine import create_engine
from goblin.element import Vertex, Edge, VertexProperty
from goblin.properties import Property, String
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment