db not clearing between sessions
For some reason the db is not clearing between different sessions. This apparently also causes the elements in the graphs to be recognized as generic goblin types instead of the classes I defined. This specific example will pass in the first time then fail in the second (hence the for loop of 2).
import asyncio
import goblin
from goblin import Goblin, Graph, element, properties
class Person(element.Vertex):
name = properties.Property(properties.String)
age = properties.Property(properties.Integer)
class Knows(element.Edge):
notes = properties.Property(properties.String, default='N/A')
async def test_data_clearing(event_loop):
goblin_app = await Goblin.open(
event_loop,
hosts=['localhost'],
port=8182,
provider=goblin.provider.JanusGraph,
)
session = await goblin_app.session()
# check if the db is empty
vertices = await session.g.V().toList()
assert len(vertices) == 0
# create some elements the graph:
leif = Person()
jon = Person()
works_with = Knows(leif, jon)
session.add(leif, jon, works_with)
await session.flush()
result = await session.g.E(works_with.id).next()
assert result is works_with
# check if type of the vertices/edges matches the classes or appear as generic goblin element:
g = Graph().traversal().withRemote(session)
vertices = await g.V().toList()
edges = await g.E().toList()
for vertex in vertices:
assert isinstance(vertex, Person)
for edge in edges:
assert isinstance(edge, Knows)
session.close()
await goblin_app.close()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
for i in range(2):
loop.run_until_complete(test_data_clearing(event_loop=loop))