driver_remote_side_effects.py 3.36 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# from gremlin_python.driver import request
# from gremlin_python.process import traversal
#
#
#
# NOTE: this class (like the imports above it) is entirely commented out, so
# nothing below is executed or defined when the module is imported. The
# ``NOTE(review)`` lines flag points to check before re-enabling the code.
#
# class AsyncRemoteTraversalSideEffects(traversal.TraversalSideEffects):
#     # Coroutine-based side-effects handle: issues 'keys', 'gather' and
#     # 'close' traversal requests through ``client.submit`` and caches the
#     # gathered results per key in ``self._side_effects``.
#     def __init__(self, side_effect, client):
#         self._side_effect = side_effect
#         self._client = client
#         self._keys = set()
#         self._side_effects = {}
#         self._closed = False
#
#     # Rejects slice keys with TypeError; any other key is delegated to get().
#     async def __getitem__(self, key):
#         if isinstance(key, slice):
#             raise TypeError(
#                 'AsyncRemoteTraversalSideEffects does not support slicing')
#         return await self.get(key)
#
#     async def keys(self):
#         """Get side effect keys associated with Traversal"""
#         # While open, the key set is refreshed with a 'keys' request on
#         # every call; once closed, the last known key set is returned.
#         if not self._closed:
#             message = request.RequestMessage(
#                 'traversal', 'keys',
#                 {'sideEffect': self._side_effect,
#                 'aliases': self._client.aliases})
#             result_set = await self._client.submit(message)
#             results = await result_set.all()
#             self._keys = set(results)
#         return self._keys
#
#     async def get(self, key):
#         """Get side effects associated with a specific key"""
#         # NOTE(review): the cache check is by truthiness, so a cached falsy
#         # result (e.g. an empty list) is gathered again while open and is
#         # reported as None once closed -- confirm this is intended.
#         if not self._side_effects.get(key):
#             if not self._closed:
#                 results = await self._get(key)
#                 self._side_effects[key] = results
#                 self._keys.add(key)
#             else:
#                 return None
#         return self._side_effects[key]
#
#     # Sends the 'gather' request for one key and aggregates the response.
#     async def _get(self, key):
#         message = request.RequestMessage(
#             'traversal', 'gather',
#             {'sideEffect': self._side_effect, 'sideEffectKey': key,
#              'aliases': self._client.aliases})
#         result_set = await self._client.submit(message)
#         return await self._aggregate_results(result_set)
#
#     async def close(self):
#         """Release side effects"""
#         # NOTE(review): 'aliases' is wrapped as {'g': ...} here but passed
#         # directly in keys() and _get() -- confirm which form is expected.
#         if not self._closed:
#             message = request.RequestMessage(
#                 'traversal', 'close',
#                 {'sideEffect': self._side_effect,
#                  'aliases': {'g': self._client.aliases}})
#             result_set = await self._client.submit(message)
#         self._closed = True
#         # NOTE(review): when the handle was already closed, result_set is
#         # never assigned, so this return would raise UnboundLocalError.
#         return await result_set.one()
#
#     # Folds the messages of a result set into a single container chosen by
#     # ``result_set.aggregate_to``; returns [] when no message is received.
#     async def _aggregate_results(self, result_set):
#         aggregates = {'list': [], 'set': set(), 'map': {}, 'bulkset': {},
#                       'none': None}
#         results = None
#         async for msg in result_set:
#             if results is None:
#                 aggregate_to = result_set.aggregate_to
#                 results = aggregates.get(aggregate_to, [])
#             # the block above picks the result container on the first
#             # message (unknown aggregate types fall back to a list);
#             # if the container is None ('none'), the item itself is the result
#             if results is None:
#                 results = msg
#             # a dict is either a 'map' (merged with update) or a bulkset
#             # (object -> bulk count)
#             elif isinstance(results, dict):
#                 if aggregate_to == "map":
#                     results.update(msg)
#                 else:
#                     results[msg.object] = msg.bulk
#             elif isinstance(results, set):
#                 results.update(msg)
#             # anything else is a list: append the message as one item
#             else:
#                 results.append(msg)
#         if results is None:
#             results = []
#         return results