Commit ffdd9460 authored by davebshow's avatar davebshow
Browse files

working on new stream parsing

parent dd816a23
......@@ -4,6 +4,7 @@ import asyncio
import ssl
import uuid
import aiohttp
import ujson
from aiogremlin.connection import WebsocketPool
......@@ -102,8 +103,8 @@ class GremlinClient:
writer = GremlinWriter(connection)
connection = yield from writer.write(message, binary=binary)
queue = connection.parser.set_parser(gremlin_response_parser,
output=aiohttp.DataQueue())
return GremlinResponse(connection, queue)
output=aiohttp.DataQueue(loop=self._loop))
return GremlinResponse(connection, queue, loop=self._loop)
@asyncio.coroutine
def execute(self, gremlin, bindings=None, lang=None,
......@@ -120,8 +121,10 @@ class GremlinClient:
class GremlinResponse:
def __init__(self, conn, queue):
self._stream = GremlinResponseStream(conn, queue)
def __init__(self, conn, queue, loop=None):
self._loop = loop or asyncio.get_event_loop()
self._stream = GremlinResponseStream(conn, queue, loop=self._loop)
@property
def stream(self):
......@@ -146,9 +149,10 @@ class GremlinResponse:
class GremlinResponseStream:
def __init__(self, conn):
def __init__(self, conn, queue, loop=None):
self._conn = conn
self._queue = queue
self._loop = loop or asyncio.get_event_loop()
@asyncio.coroutine
def _read(self):
......@@ -157,10 +161,13 @@ class GremlinResponseStream:
[message, asyncio.async(self._conn._receive(), loop=self._loop)],
loop=self._loop, return_when=asyncio.FIRST_COMPLETED)
if message in done:
return message.result()
try:
return message.result()
except aiohttp.streams.EofStream:
pass
else:
message.cancel()
@asyncio.coroutine
def read(self):
return (yield from self._read)
return (yield from self._read())
......@@ -148,15 +148,17 @@ class AiohttpFactory(BaseFactory):
loop=loop)
except aiohttp.WSServerHandshakeError as e:
raise SocketClientError(e.message)
return AiohttpConnection(socket, pool)
return AiohttpConnection(socket, pool, loop=loop)
class BaseConnection(AbstractConnection):
def __init__(self, socket, pool=None):
def __init__(self, socket, pool=None, loop=None):
self.socket = socket
self._loop = loop or asyncio.get_event_loop()
self._pool = pool
self._parser = aiohttp.StreamParser()
self._parser = aiohttp.StreamParser(
buf=aiohttp.DataQueue(loop=self._loop), loop=self._loop)
@property
def parser(self):
......@@ -228,9 +230,9 @@ class AiohttpConnection(BaseConnection):
yield from self.release()
raise
if message.tp == aiohttp.MsgType.binary:
self._parser.feed_data(message.data.decode())
self.parser.feed_data(message.data.decode())
elif message.tp == aiohttp.MsgType.text:
self._parser.feed_data(message.data.strip())
self.parser.feed_data(message.data.strip())
else:
try:
if message.tp == aiohttp.MsgType.close:
......
......@@ -13,19 +13,19 @@ Message = collections.namedtuple("Message", ["status_code", "data", "message",
# REWRITE FOR StreamParser
@asyncio.coroutine
def gremlin_response_parser(connection):
message = yield from connection._receive()
def gremlin_response_parser(out, buf):
# import ipdb; ipdb.set_trace()
message = yield buf
message = ujson.loads(message)
message = Message(message["status"]["code"],
message["result"]["data"],
message["result"]["meta"],
message["status"]["message"])
if message.status_code == 200:
return message
out.feed_data(message)
elif message.status_code == 299:
connection.feed_pool()
# Return None
out.feed_eof()
else:
try:
if message.status_code < 500:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment