diff --git a/aiogremlin/remote/driver_remote_connection.py b/aiogremlin/remote/driver_remote_connection.py index 9859d7d69b08bb0b472b071c47320d426a2046fc..1a8d31676b2b588ace2ceef90c119b22751f8a5f 100644 --- a/aiogremlin/remote/driver_remote_connection.py +++ b/aiogremlin/remote/driver_remote_connection.py @@ -3,8 +3,6 @@ from urllib.parse import urlparse from aiogremlin.driver.cluster import Cluster from gremlin_python.driver import serializer -from aiogremlin.remote.driver_remote_side_effects import ( - AsyncRemoteTraversalSideEffects) from gremlin_python.driver.remote_connection import RemoteTraversal @@ -91,9 +89,9 @@ class DriverRemoteConnection: async def submit(self, bytecode): """Submit bytecode to the Gremlin Server""" result_set = await self._client.submit(bytecode) - side_effects = AsyncRemoteTraversalSideEffects(result_set.request_id, - self._client) - return RemoteTraversal(result_set, side_effects) + results_all = await result_set.all() + results = results_all.result() + return RemoteTraversal(iter(results)) async def __aenter__(self): return self diff --git a/aiogremlin/remote/driver_remote_side_effects.py b/aiogremlin/remote/driver_remote_side_effects.py index 22e59f1be9aef0a02fd7546c351cae33d225b66c..481960478798854aaa9042b1c12ce96887f7b11a 100644 --- a/aiogremlin/remote/driver_remote_side_effects.py +++ b/aiogremlin/remote/driver_remote_side_effects.py @@ -1,87 +1,87 @@ -from gremlin_python.driver import request -from gremlin_python.process import traversal - - - -class AsyncRemoteTraversalSideEffects(traversal.TraversalSideEffects): - def __init__(self, side_effect, client): - self._side_effect = side_effect - self._client = client - self._keys = set() - self._side_effects = {} - self._closed = False - - async def __getitem__(self, key): - if isinstance(key, slice): - raise TypeError( - 'AsyncRemoteTraversalSideEffects does not support slicing') - return await self.get(key) - - async def keys(self): - """Get side effect keys associated with Traversal""" - if not self._closed: - message = request.RequestMessage( - 'traversal', 'keys', - {'sideEffect': self._side_effect, - 'aliases': self._client.aliases}) - result_set = await self._client.submit(message) - results = await result_set.all() - self._keys = set(results) - return self._keys - - async def get(self, key): - """Get side effects associated with a specific key""" - if not self._side_effects.get(key): - if not self._closed: - results = await self._get(key) - self._side_effects[key] = results - self._keys.add(key) - else: - return None - return self._side_effects[key] - - async def _get(self, key): - message = request.RequestMessage( - 'traversal', 'gather', - {'sideEffect': self._side_effect, 'sideEffectKey': key, - 'aliases': self._client.aliases}) - result_set = await self._client.submit(message) - return await self._aggregate_results(result_set) - - async def close(self): - """Release side effects""" - if not self._closed: - message = request.RequestMessage( - 'traversal', 'close', - {'sideEffect': self._side_effect, - 'aliases': {'g': self._client.aliases}}) - result_set = await self._client.submit(message) - self._closed = True - return await result_set.one() - - async def _aggregate_results(self, result_set): - aggregates = {'list': [], 'set': set(), 'map': {}, 'bulkset': {}, - 'none': None} - results = None - async for msg in result_set: - if results is None: - aggregate_to = result_set.aggregate_to - results = aggregates.get(aggregate_to, []) - # on first message, get the right result data structure - # if there is no update to a structure, then the item is the result - if results is None: - results = msg - # updating a map is different than a list or a set - elif isinstance(results, dict): - if aggregate_to == "map": - results.update(msg) - else: - results[msg.object] = msg.bulk - elif isinstance(results, set): - results.update(msg) - # flat add list to result list - else: - results.append(msg) - if results is None: - results = [] - return results +# from gremlin_python.driver import request +# from gremlin_python.process import traversal +# +# +# +# class AsyncRemoteTraversalSideEffects(traversal.TraversalSideEffects): +# def __init__(self, side_effect, client): +# self._side_effect = side_effect +# self._client = client +# self._keys = set() +# self._side_effects = {} +# self._closed = False +# +# async def __getitem__(self, key): +# if isinstance(key, slice): +# raise TypeError( +# 'AsyncRemoteTraversalSideEffects does not support slicing') +# return await self.get(key) +# +# async def keys(self): +# """Get side effect keys associated with Traversal""" +# if not self._closed: +# message = request.RequestMessage( +# 'traversal', 'keys', +# {'sideEffect': self._side_effect, +# 'aliases': self._client.aliases}) +# result_set = await self._client.submit(message) +# results = await result_set.all() +# self._keys = set(results) +# return self._keys +# +# async def get(self, key): +# """Get side effects associated with a specific key""" +# if not self._side_effects.get(key): +# if not self._closed: +# results = await self._get(key) +# self._side_effects[key] = results +# self._keys.add(key) +# else: +# return None +# return self._side_effects[key] +# +# async def _get(self, key): +# message = request.RequestMessage( +# 'traversal', 'gather', +# {'sideEffect': self._side_effect, 'sideEffectKey': key, +# 'aliases': self._client.aliases}) +# result_set = await self._client.submit(message) +# return await self._aggregate_results(result_set) +# +# async def close(self): +# """Release side effects""" +# if not self._closed: +# message = request.RequestMessage( +# 'traversal', 'close', +# {'sideEffect': self._side_effect, +# 'aliases': {'g': self._client.aliases}}) +# result_set = await self._client.submit(message) +# self._closed = True +# return await result_set.one() +# +# async def _aggregate_results(self, result_set): +# aggregates = {'list': [], 'set': set(), 'map': {}, 'bulkset': {}, +# 'none': None} +# results = None +# async for msg in result_set: +# if results is None: +# aggregate_to = result_set.aggregate_to +# results = aggregates.get(aggregate_to, []) +# # on first message, get the right result data structure +# # if there is no update to a structure, then the item is the result +# if results is None: +# results = msg +# # updating a map is different than a list or a set +# elif isinstance(results, dict): +# if aggregate_to == "map": +# results.update(msg) +# else: +# results[msg.object] = msg.bulk +# elif isinstance(results, set): +# results.update(msg) +# # flat add list to result list +# else: +# results.append(msg) +# if results is None: +# results = [] +# return results diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000000000000000000000000000000000000..2f4c80e307508cb30bd6635716963f88f6796738 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +asyncio_mode = auto diff --git a/requirements.txt b/requirements.txt index 0e326fe7d1fa85cc967a991bae3dde96e8045297..5275e0a3f7e43bdfb74fcd1852efad41fe77c153 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,13 +1,13 @@ -gremlinpython==3.4.3 +gremlinpython==3.5.2 PyYAML==5.3 six==1.14.0 aenum==2.2.3 aiohttp==3.6.2 inflection==0.3.1 -coverage==5.0.3 +coverage==6.3.2 #for testing -pytest-asyncio>=0.10.0 +pytest-asyncio>=0.18.3 pytest-cache>=1.0 pytest-cov>=2.8.1 pytest-pep8>=1.0.6 diff --git a/setup.py b/setup.py index 8d892d52ca21e43d794795c628a55053250494fb..464ad55e456bf7e4ee97bdd51be960e0974d21a0 100644 --- a/setup.py +++ b/setup.py @@ -32,11 +32,11 @@ setup( 'aiogremlin.remote'], python_requires='>=3.5', install_requires=[ - 'gremlinpython<=3.4.3', + 'gremlinpython>=3.5.2', 'aenum>=1.4.5', # required gremlinpython dep - 'aiohttp>=2.2.5', - 'PyYAML>=3.12', - 'six>=1.10.0', # required gremlinpython dep + 'aiohttp>=3.6.2', + 'PyYAML>=5.3', + 'six>=1.14.0', # required gremlinpython dep 'inflection>=0.3.1' ], test_suite='tests', diff --git a/tests/conftest.py b/tests/conftest.py index 44d1240eef8a3ea3e7899d4b1df3f81f77c9581d..dd00e0a5075b0bcda530a5206e35885d918559b1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -23,12 +23,43 @@ from aiogremlin import driver from aiogremlin.driver.provider import TinkerGraph from gremlin_python.driver import serializer from aiogremlin.remote.driver_remote_connection import DriverRemoteConnection +import threading +from time import sleep # def pytest_generate_tests(metafunc): # if 'cluster' in metafunc.fixturenames: # metafunc.parametrize("cluster", ['c1', 'c2'], indirect=True) +# @pytest.fixture +# def event_loop(): +# try: +# loop = asyncio.get_event_loop() +# except RuntimeError: +# loop = asyncio.new_event_loop() +# asyncio.set_event_loop(loop) +# +# # if not loop.is_running(): +# # def run(): +# # loop.run_forever() +# # +# # thread = threading.Thread(target=run) +# # thread.daemon = True +# # thread.start() +# # +# # assert loop.is_running() +# yield loop +# loop.stop() +# loop.close() + + +# def pytest_sessionfinish(session, exitstatus): +# loop = asyncio.get_event_loop() +# loop.stop() +# while loop.is_running(): +# sleep(0.01) +# asyncio.get_event_loop().close() + def pytest_addoption(parser): parser.addoption('--provider', default='tinkergraph', @@ -87,8 +118,15 @@ def gremlin_url(gremlin_host, gremlin_port): @pytest.fixture def connection(gremlin_url, event_loop, provider): + #assert event_loop.is_running() try: - conn = event_loop.run_until_complete( + # conn = event_loop.run_until_complete( + # driver.Connection.open( + # gremlin_url, event_loop, + # message_serializer=serializer.GraphSONMessageSerializer, + # provider=provider + # )) + conn = event_loop.run( driver.Connection.open( gremlin_url, event_loop, message_serializer=serializer.GraphSONMessageSerializer, @@ -141,7 +179,9 @@ def cluster_class(event_loop): @pytest.fixture def remote_connection(event_loop, gremlin_url): try: - remote_conn = event_loop.run_until_complete( + # remote_conn = event_loop.run_until_complete( + # DriverRemoteConnection.open(gremlin_url, 'g')) + remote_conn = event_loop.run( DriverRemoteConnection.open(gremlin_url, 'g')) except OSError: pytest.skip('Gremlin Server is not running') @@ -168,6 +208,7 @@ def run_around_tests(remote_connection, event_loop): created3 = await g.addE("created").from_(person3).to(software1).property("weight", 1.0).property(T.id, 11).next() created4 = await g.addE("created").from_(person4).to(software1).property("weight", 0.2).property(T.id, 12).next() - event_loop.run_until_complete(create_graph()) + #event_loop.run_until_complete(create_graph()) + event_loop.run(create_graph()) yield