Commit dd816a23 authored by davebshow's avatar davebshow
Browse files

working on setting up stream parser to use Eof for protocol change: Terminating the terminator

parent 1bdda00a
......@@ -101,7 +101,9 @@ class GremlinClient:
connection = yield from self.pool.connect(self.uri, loop=self.loop)
writer = GremlinWriter(connection)
connection = yield from writer.write(message, binary=binary)
return GremlinResponse(connection)
queue = connection.parser.set_parser(gremlin_response_parser,
output=aiohttp.DataQueue())
return GremlinResponse(connection, queue)
@asyncio.coroutine
def execute(self, gremlin, bindings=None, lang=None,
......@@ -118,8 +120,8 @@ class GremlinClient:
class GremlinResponse:
def __init__(self, conn):
self._stream = GremlinResponseStream(conn)
def __init__(self, conn, queue):
self._stream = GremlinResponseStream(conn, queue)
@property
def stream(self):
......@@ -145,8 +147,20 @@ class GremlinResponse:
class GremlinResponseStream:
def __init__(self, conn):
self.conn = conn
self._conn = conn
self._queue = queue
@asyncio.coroutine
def _read(self):
message = asyncio.async(self._queue.read(), loop=self._loop)
done, pending = yield from asyncio.wait(
[message, asyncio.async(self._conn._receive(), loop=self._loop)],
loop=self._loop, return_when=asyncio.FIRST_COMPLETED)
if message in done:
return message.result()
else:
message.cancel()
@asyncio.coroutine
def read(self):
return (yield from gremlin_response_parser(self.conn))
return (yield from self._read)
......@@ -156,6 +156,11 @@ class BaseConnection(AbstractConnection):
def __init__(self, socket, pool=None):
self.socket = socket
self._pool = pool
self._parser = aiohttp.StreamParser()
@property
def parser(self):
return self._parser
def feed_pool(self):
if self.pool:
......@@ -223,9 +228,9 @@ class AiohttpConnection(BaseConnection):
yield from self.release()
raise
if message.tp == aiohttp.MsgType.binary:
return message.data.decode()
self._parser.feed_data(message.data.decode())
elif message.tp == aiohttp.MsgType.text:
return message.data.strip()
self._parser.feed_data(message.data.strip())
else:
try:
if message.tp == aiohttp.MsgType.close:
......
......@@ -12,6 +12,7 @@ Message = collections.namedtuple("Message", ["status_code", "data", "message",
"metadata"])
# REWRITE FOR StreamParser
@asyncio.coroutine
def gremlin_response_parser(connection):
message = yield from connection._receive()
......
......@@ -20,9 +20,9 @@ def run(client, count, concurrency, loop):
def do_bomb():
nonlocal processed_count
while inqueue:
mssg, result = popleft()
params, result = popleft()
try:
resp = yield from execute(mssg)
resp = yield from execute("x + y", bindings=params)
assert resp[0].status_code == 200, resp[0].status_code
assert resp[0].data[0] == result, resp[0].data[0]
processed_count += 1
......@@ -32,9 +32,9 @@ def run(client, count, concurrency, loop):
for i in range(count):
rnd1 = random.randint(1, 9)
rnd2 = random.randint(1, 9)
mssg = "{} + {}".format(rnd1, rnd2)
params = {"x": rnd1, "y": rnd2}
result = rnd1 + rnd2
inqueue.append((mssg, result))
inqueue.append((params, result))
bombers = []
for i in range(concurrency):
......@@ -55,7 +55,7 @@ def main(client, tests, count, concurrency, warmups, loop):
execute = client.execute
# warmup
for x in range(warmups):
print("Warmup run {}:".format(x))
print("Warmup run {}:".format(x + 1))
yield from run(client, count, concurrency, loop)
print("Warmup successful!")
mps_list = []
......@@ -79,7 +79,7 @@ ARGS.add_argument(
help='message count (default: `%(default)s`)')
ARGS.add_argument(
'-c', '--concurrency', action="store",
nargs='?', type=int, default=500,
nargs='?', type=int, default=256,
help='count of parallel requests (default: `%(default)s`)')
ARGS.add_argument(
'-p', '--poolsize', action="store",
......@@ -87,7 +87,7 @@ ARGS.add_argument(
help='num connected websockets (default: `%(default)s`)')
ARGS.add_argument(
'-w', '--warmups', action="store",
nargs='?', type=int, default=1,
nargs='?', type=int, default=5,
help='num warmups (default: `%(default)s`)')
......@@ -102,8 +102,8 @@ if __name__ == "__main__":
client = loop.run_until_complete(
aiogremlin.create_client(loop=loop, poolsize=poolsize))
try:
print("Runs: {}. Warmups: {}. Messages: {}. Concurrency: {}.".format(
num_tests, num_warmups, num_mssg, concurr))
print("Runs: {}. Warmups: {}. Messages: {}. Concurrency: {}. Poolsize: {}".format(
num_tests, num_warmups, num_mssg, concurr, poolsize))
main = main(client, num_tests, num_mssg, concurr, num_warmups, loop)
loop.run_until_complete(main)
finally:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment