diff --git a/goblin/query.py b/goblin/query.py index 1b0fdc1e9bfe8f68108742a1fad8a3af60c07236..03c45378eb9e290165a04b8503122d7fb5999ae3 100644 --- a/goblin/query.py +++ b/goblin/query.py @@ -1,4 +1,5 @@ """Query API and helpers""" +import collections import logging from goblin import mapper @@ -17,25 +18,26 @@ class AsyncQueryResponseIter: def __init__(self, async_iter, query): self._async_iter = async_iter self._query = query + self._queue = collections.deque() async def __aiter__(self): return self async def __anext__(self): - msg = await self._async_iter.fetch_data() - if msg: - results = msg.data - processed_results = [] - for result in results: - current = self._query.session.current.get(result['id'], None) - if not current: - current = self._query._element_class() - element = self._query._mapper(result, current, - current.__mapping__) - processed_results.append(element) - return processed_results - else: - raise StopAsyncIteration + if not self._queue: + msg = await self._async_iter.fetch_data() + if msg: + results = msg.data + for result in results: + current = self._query.session.current.get(result['id'], None) + if not current: + current = self._query._element_class() + element = self._query._mapper(result, current, + current.__mapping__) + self._queue.append(element) + else: + raise StopAsyncIteration + return self._queue.popleft() class Query: diff --git a/tests/test_engine.py b/tests/test_engine.py index 42f40902b5d2a7c87c18e324a54923eae8611e0d..597d49b2514e4e6054d0bbd44060af687d3990fc 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -139,8 +139,8 @@ class TestEngine(unittest.TestCase): results = [] stream = await session.query(TestVertex).all() async for msg in stream: - results += msg - print(len(results)) + results.append(msg) + print(len(results)) self.assertEqual(len(session.current), 2) for result in results: self.assertIsInstance(result, Vertex)