Commit 3ba1df03 authored by davebshow's avatar davebshow
Browse files

working on stream parser

parent ffdd9460
......@@ -125,7 +125,6 @@ class GremlinResponse:
self._loop = loop or asyncio.get_event_loop()
self._stream = GremlinResponseStream(conn, queue, loop=self._loop)
@property
def stream(self):
return self._stream
......@@ -164,7 +163,10 @@ class GremlinResponseStream:
try:
return message.result()
except aiohttp.streams.EofStream:
pass
self._conn.feed_pool()
except Exception:
self._conn.feed_pool()
raise
else:
message.cancel()
......
......@@ -12,28 +12,26 @@ Message = collections.namedtuple("Message", ["status_code", "data", "message",
"metadata"])
# REWRITE FOR StreamParser
def gremlin_response_parser(out, buf):
# import ipdb; ipdb.set_trace()
message = yield buf
message = ujson.loads(message)
message = Message(message["status"]["code"],
message["result"]["data"],
message["result"]["meta"],
message["status"]["message"])
if message.status_code == 200:
out.feed_data(message)
elif message.status_code == 299:
connection.feed_pool()
out.feed_eof()
else:
try:
if message.status_code < 500:
raise RequestError(message.status_code, message.message)
else:
raise GremlinServerError(message.status_code, message.message)
finally:
yield from connection.release()
while True:
message = yield
message = ujson.loads(message)
message = Message(message["status"]["code"],
message["result"]["data"],
message["result"]["meta"],
message["status"]["message"])
if message.status_code == 200:
out.feed_data(message)
elif message.status_code == 299:
out.feed_eof()
else:
try:
if message.status_code < 500:
raise RequestError(message.status_code, message.message)
else:
raise GremlinServerError(message.status_code, message.message)
finally:
yield from connection.release()
class GremlinWriter:
......
......@@ -20,9 +20,9 @@ def run(client, count, concurrency, loop):
def do_bomb():
nonlocal processed_count
while inqueue:
params, result = popleft()
mssg, result = popleft()
try:
resp = yield from execute("x + y", bindings=params)
resp = yield from execute(mssg)
assert resp[0].status_code == 200, resp[0].status_code
assert resp[0].data[0] == result, resp[0].data[0]
processed_count += 1
......@@ -32,9 +32,9 @@ def run(client, count, concurrency, loop):
for i in range(count):
rnd1 = random.randint(1, 9)
rnd2 = random.randint(1, 9)
params = {"x": rnd1, "y": rnd2}
mssg = "{} + {}".format(rnd1, rnd2)
result = rnd1 + rnd2
inqueue.append((params, result))
inqueue.append((mssg, result))
bombers = []
for i in range(concurrency):
......@@ -55,7 +55,7 @@ def main(client, tests, count, concurrency, warmups, loop):
execute = client.execute
# warmup
for x in range(warmups):
print("Warmup run {}:".format(x + 1))
print("Warmup run {}:".format(x))
yield from run(client, count, concurrency, loop)
print("Warmup successful!")
mps_list = []
......@@ -102,8 +102,8 @@ if __name__ == "__main__":
client = loop.run_until_complete(
aiogremlin.create_client(loop=loop, poolsize=poolsize))
try:
print("Runs: {}. Warmups: {}. Messages: {}. Concurrency: {}. Poolsize: {}".format(
num_tests, num_warmups, num_mssg, concurr, poolsize))
print("Runs: {}. Warmups: {}. Messages: {}. Concurrency: {}.".format(
num_tests, num_warmups, num_mssg, concurr))
main = main(client, num_tests, num_mssg, concurr, num_warmups, loop)
loop.run_until_complete(main)
finally:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment