Commit 1377cf54 authored by Jeffrey Phillips Freeman's avatar Jeffrey Phillips Freeman 💥
Browse files

Fixed aiogoblin to work with latest version of tinkerpop.

parent 35286c4d
Pipeline #1653 failed with stage
in 60 minutes and 3 seconds
......@@ -3,8 +3,6 @@ from urllib.parse import urlparse
from aiogremlin.driver.cluster import Cluster
from gremlin_python.driver import serializer
from aiogremlin.remote.driver_remote_side_effects import (
AsyncRemoteTraversalSideEffects)
from gremlin_python.driver.remote_connection import RemoteTraversal
......@@ -91,9 +89,9 @@ class DriverRemoteConnection:
async def submit(self, bytecode):
"""Submit bytecode to the Gremlin Server"""
result_set = await self._client.submit(bytecode)
side_effects = AsyncRemoteTraversalSideEffects(result_set.request_id,
self._client)
return RemoteTraversal(result_set, side_effects)
results_all = await result_set.all()
results = results_all.result()
return RemoteTraversal(iter(results))
async def __aenter__(self):
return self
......
from gremlin_python.driver import request
from gremlin_python.process import traversal
class AsyncRemoteTraversalSideEffects(traversal.TraversalSideEffects):
def __init__(self, side_effect, client):
self._side_effect = side_effect
self._client = client
self._keys = set()
self._side_effects = {}
self._closed = False
async def __getitem__(self, key):
if isinstance(key, slice):
raise TypeError(
'AsyncRemoteTraversalSideEffects does not support slicing')
return await self.get(key)
async def keys(self):
"""Get side effect keys associated with Traversal"""
if not self._closed:
message = request.RequestMessage(
'traversal', 'keys',
{'sideEffect': self._side_effect,
'aliases': self._client.aliases})
result_set = await self._client.submit(message)
results = await result_set.all()
self._keys = set(results)
return self._keys
async def get(self, key):
"""Get side effects associated with a specific key"""
if not self._side_effects.get(key):
if not self._closed:
results = await self._get(key)
self._side_effects[key] = results
self._keys.add(key)
else:
return None
return self._side_effects[key]
async def _get(self, key):
message = request.RequestMessage(
'traversal', 'gather',
{'sideEffect': self._side_effect, 'sideEffectKey': key,
'aliases': self._client.aliases})
result_set = await self._client.submit(message)
return await self._aggregate_results(result_set)
async def close(self):
"""Release side effects"""
if not self._closed:
message = request.RequestMessage(
'traversal', 'close',
{'sideEffect': self._side_effect,
'aliases': {'g': self._client.aliases}})
result_set = await self._client.submit(message)
self._closed = True
return await result_set.one()
async def _aggregate_results(self, result_set):
aggregates = {'list': [], 'set': set(), 'map': {}, 'bulkset': {},
'none': None}
results = None
async for msg in result_set:
if results is None:
aggregate_to = result_set.aggregate_to
results = aggregates.get(aggregate_to, [])
# on first message, get the right result data structure
# if there is no update to a structure, then the item is the result
if results is None:
results = msg
# updating a map is different than a list or a set
elif isinstance(results, dict):
if aggregate_to == "map":
results.update(msg)
else:
results[msg.object] = msg.bulk
elif isinstance(results, set):
results.update(msg)
# flat add list to result list
else:
results.append(msg)
if results is None:
results = []
return results
# from gremlin_python.driver import request
# from gremlin_python.process import traversal
#
#
#
# class AsyncRemoteTraversalSideEffects(traversal.TraversalSideEffects):
# def __init__(self, side_effect, client):
# self._side_effect = side_effect
# self._client = client
# self._keys = set()
# self._side_effects = {}
# self._closed = False
#
# async def __getitem__(self, key):
# if isinstance(key, slice):
# raise TypeError(
# 'AsyncRemoteTraversalSideEffects does not support slicing')
# return await self.get(key)
#
# async def keys(self):
# """Get side effect keys associated with Traversal"""
# if not self._closed:
# message = request.RequestMessage(
# 'traversal', 'keys',
# {'sideEffect': self._side_effect,
# 'aliases': self._client.aliases})
# result_set = await self._client.submit(message)
# results = await result_set.all()
# self._keys = set(results)
# return self._keys
#
# async def get(self, key):
# """Get side effects associated with a specific key"""
# if not self._side_effects.get(key):
# if not self._closed:
# results = await self._get(key)
# self._side_effects[key] = results
# self._keys.add(key)
# else:
# return None
# return self._side_effects[key]
#
# async def _get(self, key):
# message = request.RequestMessage(
# 'traversal', 'gather',
# {'sideEffect': self._side_effect, 'sideEffectKey': key,
# 'aliases': self._client.aliases})
# result_set = await self._client.submit(message)
# return await self._aggregate_results(result_set)
#
# async def close(self):
# """Release side effects"""
# if not self._closed:
# message = request.RequestMessage(
# 'traversal', 'close',
# {'sideEffect': self._side_effect,
# 'aliases': {'g': self._client.aliases}})
# result_set = await self._client.submit(message)
# self._closed = True
# return await result_set.one()
#
# async def _aggregate_results(self, result_set):
# aggregates = {'list': [], 'set': set(), 'map': {}, 'bulkset': {},
# 'none': None}
# results = None
# async for msg in result_set:
# if results is None:
# aggregate_to = result_set.aggregate_to
# results = aggregates.get(aggregate_to, [])
# # on first message, get the right result data structure
# # if there is no update to a structure, then the item is the result
# if results is None:
# results = msg
# # updating a map is different than a list or a set
# elif isinstance(results, dict):
# if aggregate_to == "map":
# results.update(msg)
# else:
# results[msg.object] = msg.bulk
# elif isinstance(results, set):
# results.update(msg)
# # flat add list to result list
# else:
# results.append(msg)
# if results is None:
# results = []
# return results
[pytest]
asyncio_mode = auto
gremlinpython==3.4.3
gremlinpython==3.5.2
PyYAML==5.3
six==1.14.0
aenum==2.2.3
aiohttp==3.6.2
inflection==0.3.1
coverage==5.0.3
coverage==6.3.2
#for testing
pytest-asyncio>=0.10.0
pytest-asyncio>=0.18.3
pytest-cache>=1.0
pytest-cov>=2.8.1
pytest-pep8>=1.0.6
......
......@@ -32,11 +32,11 @@ setup(
'aiogremlin.remote'],
python_requires='>=3.5',
install_requires=[
'gremlinpython<=3.4.3',
'gremlinpython>=3.5.2',
'aenum>=1.4.5', # required gremlinpython dep
'aiohttp>=2.2.5',
'PyYAML>=3.12',
'six>=1.10.0', # required gremlinpython dep
'aiohttp>=3.6.2',
'PyYAML>=5.3',
'six>=1.14.0', # required gremlinpython dep
'inflection>=0.3.1'
],
test_suite='tests',
......
......@@ -23,12 +23,43 @@ from aiogremlin import driver
from aiogremlin.driver.provider import TinkerGraph
from gremlin_python.driver import serializer
from aiogremlin.remote.driver_remote_connection import DriverRemoteConnection
import threading
from time import sleep
# def pytest_generate_tests(metafunc):
# if 'cluster' in metafunc.fixturenames:
# metafunc.parametrize("cluster", ['c1', 'c2'], indirect=True)
# @pytest.fixture
# def event_loop():
# try:
# loop = asyncio.get_event_loop()
# except RuntimeError:
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
#
# # if not loop.is_running():
# # def run():
# # loop.run_forever()
# #
# # thread = threading.Thread(target=run)
# # thread.daemon = True
# # thread.start()
# #
# # assert loop.is_running()
# yield loop
# loop.stop()
# loop.close()
# def pytest_sessionfinish(session, exitstatus):
# loop = asyncio.get_event_loop()
# loop.stop()
# while loop.is_running():
# sleep(0.01)
# asyncio.get_event_loop().close()
def pytest_addoption(parser):
parser.addoption('--provider', default='tinkergraph',
......@@ -87,8 +118,15 @@ def gremlin_url(gremlin_host, gremlin_port):
@pytest.fixture
def connection(gremlin_url, event_loop, provider):
#assert event_loop.is_running()
try:
conn = event_loop.run_until_complete(
# conn = event_loop.run_until_complete(
# driver.Connection.open(
# gremlin_url, event_loop,
# message_serializer=serializer.GraphSONMessageSerializer,
# provider=provider
# ))
conn = event_loop.run(
driver.Connection.open(
gremlin_url, event_loop,
message_serializer=serializer.GraphSONMessageSerializer,
......@@ -141,7 +179,9 @@ def cluster_class(event_loop):
@pytest.fixture
def remote_connection(event_loop, gremlin_url):
try:
remote_conn = event_loop.run_until_complete(
# remote_conn = event_loop.run_until_complete(
# DriverRemoteConnection.open(gremlin_url, 'g'))
remote_conn = event_loop.run(
DriverRemoteConnection.open(gremlin_url, 'g'))
except OSError:
pytest.skip('Gremlin Server is not running')
......@@ -168,6 +208,7 @@ def run_around_tests(remote_connection, event_loop):
created3 = await g.addE("created").from_(person3).to(software1).property("weight", 1.0).property(T.id, 11).next()
created4 = await g.addE("created").from_(person4).to(software1).property("weight", 0.2).property(T.id, 12).next()
event_loop.run_until_complete(create_graph())
#event_loop.run_until_complete(create_graph())
event_loop.run(create_graph())
yield
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment