diff --git a/goblin/app.py b/goblin/app.py index 2ba2426943841b249de82f5df8ddd81700b7456d..2e5866599904aa36fbe4505914a69baa6ba03403 100644 --- a/goblin/app.py +++ b/goblin/app.py @@ -1,4 +1,4 @@ -"""Main OGM API classes and constructors""" +"""Goblin application class and class constructor""" import collections import logging @@ -10,7 +10,6 @@ from goblin import session logger = logging.getLogger(__name__) -# Constructor API async def create_app(url, loop, **config): """Constructor function for :py:class:`Engine`. Connects to database and builds a dictionary of relevant vendor implmentation features""" @@ -20,23 +19,23 @@ async def create_app(url, loop, **config): stream = await conn.submit( 'graph.features().graph().supportsComputer()') msg = await stream.fetch_data() - features['computer'] = msg.data[0] + features['computer'] = msg stream = await conn.submit( 'graph.features().graph().supportsTransactions()') msg = await stream.fetch_data() - features['transactions'] = msg.data[0] + features['transactions'] = msg stream = await conn.submit( 'graph.features().graph().supportsPersistence()') msg = await stream.fetch_data() - features['persistence'] = msg.data[0] + features['persistence'] = msg stream = await conn.submit( 'graph.features().graph().supportsConcurrentAccess()') msg = await stream.fetch_data() - features['concurrent_access'] = msg.data[0] + features['concurrent_access'] = msg stream = await conn.submit( 'graph.features().graph().supportsThreadedTransactions()') msg = await stream.fetch_data() - features['threaded_transactions'] = msg.data[0] + features['threaded_transactions'] = msg return App(url, loop, features=features, **config) diff --git a/goblin/driver/connection.py b/goblin/driver/connection.py index 5bf400a01306abc727e4cd3c1320b73e6227ee27..2af1fd95b03d63567e3754a6dbfc78f32fe0b2e7 100644 --- a/goblin/driver/connection.py +++ b/goblin/driver/connection.py @@ -170,15 +170,17 @@ class Connection(AbstractConnection): message["result"]["meta"]) response_queue = self._response_queues[request_id] if message.status_code in [200, 206, 204]: - response_queue.put_nowait(message) + if message.data: + for result in message.data: + response_queue.put_nowait(result) if message.status_code == 206: self._loop.create_task(self.receive()) else: response_queue.put_nowait(None) del self._response_queues[request_id] elif message.status_code == 407: - self._authenticate(self._username, self._password, - self._processor, self._session) + await self._authenticate(self._username, self._password, + self._processor, self._session) self._loop.create_task(self.receive()) else: del self._response_queues[request_id] diff --git a/goblin/session.py b/goblin/session.py index 52f1ce8d3d034b12b88d9215ac516802e2f1c879..a7164229ab136188ad06f9c66b1513a102f19b89 100644 --- a/goblin/session.py +++ b/goblin/session.py @@ -79,26 +79,22 @@ class Session(connection.AbstractConnection): return traversal.TraversalResponse(response_queue) async def _receive(self, async_iter, response_queue): - async for msg in async_iter: - results = msg.data - if results: - for result in results: - current = self.current.get(result['id'], None) - if not current: - element_type = result['type'] - label = result['label'] - if element_type == 'vertex': - current = self.app.vertices.get(label, None) - else: - current = self.app.edges.get(label, None) - if not current: - # build generic element here - pass - else: - current = current() - element = current.__mapping__.mapper_func( - result, current) - response_queue.put_nowait(element) + async for result in async_iter: + current = self.current.get(result['id'], None) + if not current: + element_type = result['type'] + label = result['label'] + if element_type == 'vertex': + current = self.app.vertices.get(label, None) + else: + current = self.app.edges.get(label, None) + if not current: + # build generic element here + pass + else: + current = current() + element = current.__mapping__.mapper_func(result, current) + response_queue.put_nowait(element) response_queue.put_nowait(None) # Creation API @@ -151,10 +147,12 @@ class Session(connection.AbstractConnection): return result async def get_vertex(self, element): - return await self.traversal_factory.get_vertex_by_id(element).one_or_none() + return await self.traversal_factory.get_vertex_by_id( + element).one_or_none() async def get_edge(self, element): - return await self.traversal_factory.get_edge_by_id(element).one_or_none() + return await self.traversal_factory.get_edge_by_id( + element).one_or_none() # Transaction support def tx(self): @@ -177,8 +175,8 @@ class Session(connection.AbstractConnection): stream = await self.conn.submit( repr(traversal), bindings=traversal.bindings) msg = await stream.fetch_data() - if msg.data: - msg = element.__mapping__.mapper_func(msg.data[0], element) + if msg: + msg = element.__mapping__.mapper_func(msg, element) return msg async def _save_element(self, @@ -188,7 +186,7 @@ class Session(connection.AbstractConnection): update_func): if hasattr(element, 'id'): result = await check_func(element) - if not result.data: + if not result: traversal = create_func(element) else: traversal = update_func(element)