Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • goblin-ogm/aiogremlin
  • grzn/aiogremlin
2 results
Show changes
Commits on Source (2)
......@@ -3,8 +3,6 @@ from urllib.parse import urlparse
from aiogremlin.driver.cluster import Cluster
from gremlin_python.driver import serializer
from aiogremlin.remote.driver_remote_side_effects import (
AsyncRemoteTraversalSideEffects)
from gremlin_python.driver.remote_connection import RemoteTraversal
......@@ -91,9 +89,9 @@ class DriverRemoteConnection:
async def submit(self, bytecode):
"""Submit bytecode to the Gremlin Server"""
result_set = await self._client.submit(bytecode)
side_effects = AsyncRemoteTraversalSideEffects(result_set.request_id,
self._client)
return RemoteTraversal(result_set, side_effects)
results_all = await result_set.all()
results = results_all.result()
return RemoteTraversal(iter(results))
async def __aenter__(self):
return self
......
from gremlin_python.driver import request
from gremlin_python.process import traversal
class AsyncRemoteTraversalSideEffects(traversal.TraversalSideEffects):
def __init__(self, side_effect, client):
self._side_effect = side_effect
self._client = client
self._keys = set()
self._side_effects = {}
self._closed = False
async def __getitem__(self, key):
if isinstance(key, slice):
raise TypeError(
'AsyncRemoteTraversalSideEffects does not support slicing')
return await self.get(key)
async def keys(self):
"""Get side effect keys associated with Traversal"""
if not self._closed:
message = request.RequestMessage(
'traversal', 'keys',
{'sideEffect': self._side_effect,
'aliases': self._client.aliases})
result_set = await self._client.submit(message)
results = await result_set.all()
self._keys = set(results)
return self._keys
async def get(self, key):
"""Get side effects associated with a specific key"""
if not self._side_effects.get(key):
if not self._closed:
results = await self._get(key)
self._side_effects[key] = results
self._keys.add(key)
else:
return None
return self._side_effects[key]
async def _get(self, key):
message = request.RequestMessage(
'traversal', 'gather',
{'sideEffect': self._side_effect, 'sideEffectKey': key,
'aliases': self._client.aliases})
result_set = await self._client.submit(message)
return await self._aggregate_results(result_set)
async def close(self):
"""Release side effects"""
if not self._closed:
message = request.RequestMessage(
'traversal', 'close',
{'sideEffect': self._side_effect,
'aliases': {'g': self._client.aliases}})
result_set = await self._client.submit(message)
self._closed = True
return await result_set.one()
async def _aggregate_results(self, result_set):
aggregates = {'list': [], 'set': set(), 'map': {}, 'bulkset': {},
'none': None}
results = None
async for msg in result_set:
if results is None:
aggregate_to = result_set.aggregate_to
results = aggregates.get(aggregate_to, [])
# on first message, get the right result data structure
# if there is no update to a structure, then the item is the result
if results is None:
results = msg
# updating a map is different than a list or a set
elif isinstance(results, dict):
if aggregate_to == "map":
results.update(msg)
else:
results[msg.object] = msg.bulk
elif isinstance(results, set):
results.update(msg)
# flat add list to result list
else:
results.append(msg)
if results is None:
results = []
return results
# from gremlin_python.driver import request
# from gremlin_python.process import traversal
#
#
#
# class AsyncRemoteTraversalSideEffects(traversal.TraversalSideEffects):
# def __init__(self, side_effect, client):
# self._side_effect = side_effect
# self._client = client
# self._keys = set()
# self._side_effects = {}
# self._closed = False
#
# async def __getitem__(self, key):
# if isinstance(key, slice):
# raise TypeError(
# 'AsyncRemoteTraversalSideEffects does not support slicing')
# return await self.get(key)
#
# async def keys(self):
# """Get side effect keys associated with Traversal"""
# if not self._closed:
# message = request.RequestMessage(
# 'traversal', 'keys',
# {'sideEffect': self._side_effect,
# 'aliases': self._client.aliases})
# result_set = await self._client.submit(message)
# results = await result_set.all()
# self._keys = set(results)
# return self._keys
#
# async def get(self, key):
# """Get side effects associated with a specific key"""
# if not self._side_effects.get(key):
# if not self._closed:
# results = await self._get(key)
# self._side_effects[key] = results
# self._keys.add(key)
# else:
# return None
# return self._side_effects[key]
#
# async def _get(self, key):
# message = request.RequestMessage(
# 'traversal', 'gather',
# {'sideEffect': self._side_effect, 'sideEffectKey': key,
# 'aliases': self._client.aliases})
# result_set = await self._client.submit(message)
# return await self._aggregate_results(result_set)
#
# async def close(self):
# """Release side effects"""
# if not self._closed:
# message = request.RequestMessage(
# 'traversal', 'close',
# {'sideEffect': self._side_effect,
# 'aliases': {'g': self._client.aliases}})
# result_set = await self._client.submit(message)
# self._closed = True
# return await result_set.one()
#
# async def _aggregate_results(self, result_set):
# aggregates = {'list': [], 'set': set(), 'map': {}, 'bulkset': {},
# 'none': None}
# results = None
# async for msg in result_set:
# if results is None:
# aggregate_to = result_set.aggregate_to
# results = aggregates.get(aggregate_to, [])
# # on first message, get the right result data structure
# # if there is no update to a structure, then the item is the result
# if results is None:
# results = msg
# # updating a map is different than a list or a set
# elif isinstance(results, dict):
# if aggregate_to == "map":
# results.update(msg)
# else:
# results[msg.object] = msg.bulk
# elif isinstance(results, set):
# results.update(msg)
# # flat add list to result list
# else:
# results.append(msg)
# if results is None:
# results = []
# return results
[pytest]
asyncio_mode = auto
gremlinpython==3.4.3
gremlinpython==3.5.2
PyYAML==5.3
six==1.14.0
aenum==2.2.3
aiohttp==3.6.2
inflection==0.3.1
coverage==5.0.3
coverage==6.3.2
#for testing
pytest-asyncio>=0.10.0
pytest-asyncio>=0.18.3
pytest-cache>=1.0
pytest-cov>=2.8.1
pytest-pep8>=1.0.6
......
......@@ -32,11 +32,11 @@ setup(
'aiogremlin.remote'],
python_requires='>=3.5',
install_requires=[
'gremlinpython<=3.4.3',
'gremlinpython>=3.5.2',
'aenum>=1.4.5', # required gremlinpython dep
'aiohttp>=2.2.5',
'PyYAML>=3.12',
'six>=1.10.0', # required gremlinpython dep
'aiohttp<4.0.0',
'PyYAML>=5.3',
'six>=1.14.0', # required gremlinpython dep
'inflection>=0.3.1'
],
test_suite='tests',
......
......@@ -23,12 +23,43 @@ from aiogremlin import driver
from aiogremlin.driver.provider import TinkerGraph
from gremlin_python.driver import serializer
from aiogremlin.remote.driver_remote_connection import DriverRemoteConnection
import threading
from time import sleep
# def pytest_generate_tests(metafunc):
# if 'cluster' in metafunc.fixturenames:
# metafunc.parametrize("cluster", ['c1', 'c2'], indirect=True)
# @pytest.fixture
# def event_loop():
# try:
# loop = asyncio.get_event_loop()
# except RuntimeError:
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
#
# # if not loop.is_running():
# # def run():
# # loop.run_forever()
# #
# # thread = threading.Thread(target=run)
# # thread.daemon = True
# # thread.start()
# #
# # assert loop.is_running()
# yield loop
# loop.stop()
# loop.close()
# def pytest_sessionfinish(session, exitstatus):
# loop = asyncio.get_event_loop()
# loop.stop()
# while loop.is_running():
# sleep(0.01)
# asyncio.get_event_loop().close()
def pytest_addoption(parser):
parser.addoption('--provider', default='tinkergraph',
......@@ -87,8 +118,15 @@ def gremlin_url(gremlin_host, gremlin_port):
@pytest.fixture
def connection(gremlin_url, event_loop, provider):
#assert event_loop.is_running()
try:
conn = event_loop.run_until_complete(
# conn = event_loop.run_until_complete(
# driver.Connection.open(
# gremlin_url, event_loop,
# message_serializer=serializer.GraphSONMessageSerializer,
# provider=provider
# ))
conn = event_loop.run(
driver.Connection.open(
gremlin_url, event_loop,
message_serializer=serializer.GraphSONMessageSerializer,
......@@ -141,7 +179,9 @@ def cluster_class(event_loop):
@pytest.fixture
def remote_connection(event_loop, gremlin_url):
try:
remote_conn = event_loop.run_until_complete(
# remote_conn = event_loop.run_until_complete(
# DriverRemoteConnection.open(gremlin_url, 'g'))
remote_conn = event_loop.run(
DriverRemoteConnection.open(gremlin_url, 'g'))
except OSError:
pytest.skip('Gremlin Server is not running')
......@@ -168,6 +208,7 @@ def run_around_tests(remote_connection, event_loop):
created3 = await g.addE("created").from_(person3).to(software1).property("weight", 1.0).property(T.id, 11).next()
created4 = await g.addE("created").from_(person4).to(software1).property("weight", 0.2).property(T.id, 12).next()
event_loop.run_until_complete(create_graph())
#event_loop.run_until_complete(create_graph())
event_loop.run(create_graph())
yield