diff --git a/README.md b/README.md index b361a1be0c348418f364f1521e35b922f782c0a3..a5c19225b9bb81c05928483ea34b5a5b5af214b3 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# aiogremlin 0.0.2 [(gizmo grew up)](https://pypi.python.org/pypi/gizmo/0.1.12) +# aiogremlin 0.0.4 [(gizmo grew up)](https://pypi.python.org/pypi/gizmo/0.1.12) `aiogremlin` is a **Python 3** driver for the the [Tinkerpop 3 Gremlin Server](http://www.tinkerpop.com/docs/3.0.0.M7/#gremlin-server). This module is built on [Asyncio](https://docs.python.org/3/library/asyncio.html). By default it uses the [aiohttp](http://aiohttp.readthedocs.org/en/v0.15.3/index.html) websocket client , but it is easy to plug in a different implementation. `aiogremlin` is currently in **alpha** mode, but all major functionality has test coverage. diff --git a/aiogremlin/__init__.py b/aiogremlin/__init__.py index 123064e230d20d2bac48a14b064cac0ef6d0c1cc..e666a2bc0c31d612d6268c3e2bdf4b1580a84179 100644 --- a/aiogremlin/__init__.py +++ b/aiogremlin/__init__.py @@ -3,4 +3,4 @@ from .connection import (WebsocketPool, AiohttpFactory, BaseFactory, BaseConnection) from .client import GremlinClient, create_client from .exceptions import RequestError, GremlinServerError, SocketClientError -__version__ = "0.0.3" +__version__ = "0.0.4dev" diff --git a/aiogremlin/abc.py b/aiogremlin/abc.py index 9cdea94bba5b525de4e267e4b5b1df60fbac6f29..9400e07346e30a7b98638f14fc124af0cb8a77bc 100644 --- a/aiogremlin/abc.py +++ b/aiogremlin/abc.py @@ -44,10 +44,6 @@ class AbstractConnection(metaclass=ABCMeta): def send(self): pass - @abstractmethod - def receive(self): - pass - @abstractmethod def _receive(self): pass diff --git a/aiogremlin/client.py b/aiogremlin/client.py index a8596906f5e55beddfd4a590928c741053b59bf8..1025ff06dfd56bc037970d74abb6525d2ed03e29 100644 --- a/aiogremlin/client.py +++ b/aiogremlin/client.py @@ -4,10 +4,10 @@ import asyncio import ssl import uuid -import ujson +import aiohttp from aiogremlin.connection import WebsocketPool -from aiogremlin.log import client_logger +from aiogremlin.log import client_logger, INFO from aiogremlin.protocol import gremlin_response_parser, GremlinWriter @@ -15,12 +15,13 @@ from aiogremlin.protocol import gremlin_response_parser, GremlinWriter def create_client(uri='ws://localhost:8182/', loop=None, ssl=None, protocol=None, lang="gremlin-groovy", op="eval", processor="", pool=None, factory=None, poolsize=10, - timeout=None, **kwargs): + timeout=None, verbose=False, **kwargs): pool = WebsocketPool(uri, factory=factory, poolsize=poolsize, timeout=timeout, - loop=loop) + loop=loop, + verbose=verbose) yield from pool.init_pool() @@ -32,7 +33,8 @@ def create_client(uri='ws://localhost:8182/', loop=None, ssl=None, op=op, processor=processor, pool=pool, - factory=factory) + factory=factory, + verbose=verbose) class GremlinClient: @@ -40,7 +42,7 @@ class GremlinClient: def __init__(self, uri='ws://localhost:8182/', loop=None, ssl=None, protocol=None, lang="gremlin-groovy", op="eval", processor="", pool=None, factory=None, poolsize=10, - timeout=None, **kwargs): + timeout=None, verbose=True, **kwargs): """ """ self.uri = uri @@ -61,6 +63,8 @@ class GremlinClient: self.pool = pool or WebsocketPool(uri, factory=factory, poolsize=poolsize, timeout=timeout, loop=self._loop) self.factory = factory or self.pool.factory + if verbose: + client_logger.setLevel(INFO) @property def loop(self): @@ -87,7 +91,7 @@ class GremlinClient: lang = lang or self.lang op = op or self.op processor = processor or self.processor - message = ujson.dumps({ + message = { "requestId": str(uuid.uuid4()), "op": op, "processor": processor, @@ -96,12 +100,18 @@ class GremlinClient: "bindings": bindings, "language": lang } - }) + } + if processor == "session": + message["args"]["session"] = str(uuid.uuid4()) + client_logger.info( + "Session ID: {}".format(message["args"]["session"])) if connection is None: connection = yield from self.pool.connect(self.uri, loop=self.loop) writer = GremlinWriter(connection) connection = yield from writer.write(message, binary=binary) - return GremlinResponse(connection) + queue = connection.parser.set_parser(gremlin_response_parser, + output=aiohttp.DataQueue(loop=self._loop)) + return GremlinResponse(connection, queue, loop=self._loop) @asyncio.coroutine def execute(self, gremlin, bindings=None, lang=None, @@ -118,8 +128,9 @@ class GremlinClient: class GremlinResponse: - def __init__(self, conn): - self._stream = GremlinResponseStream(conn) + def __init__(self, conn, queue, loop=None): + self._loop = loop or asyncio.get_event_loop() + self._stream = GremlinResponseStream(conn, queue, loop=self._loop) @property def stream(self): @@ -135,7 +146,7 @@ class GremlinResponse: """ results = [] while True: - message = yield from self.stream.read() + message = yield from self._stream.read() if message is None: break results.append(message) @@ -144,9 +155,23 @@ class GremlinResponse: class GremlinResponseStream: - def __init__(self, conn): - self.conn = conn + def __init__(self, conn, queue, loop=None): + self._conn = conn + self._queue = queue + self._loop = loop or asyncio.get_event_loop() @asyncio.coroutine def read(self): - return (yield from gremlin_response_parser(self.conn)) + # For 3.0.0.M9 + # if self._queue.at_eof(): + # self._conn.feed_pool() + # message = None + # else: + # This will be different 3.0.0.M9 + yield from self._conn._receive() + if self._queue.is_eof(): + self._conn.feed_pool() + message = None + else: + message = yield from self._queue.read() + return message diff --git a/aiogremlin/connection.py b/aiogremlin/connection.py index 74f4794660e66bfea5522a827b26a2de048312d7..633a5028b6cb17fd4738031f7d1d6a5156409b98 100644 --- a/aiogremlin/connection.py +++ b/aiogremlin/connection.py @@ -102,17 +102,15 @@ class WebsocketPool: try: socket = yield from self.factory.connect(uri, pool=self, loop=loop) - except: - raise - else: - conn_logger.info("New connection on socket: {} at {}".format( - socket, uri)) finally: self.num_connecting -= 1 if not socket.closed: + conn_logger.info("New connection on socket: {} at {}".format( + socket, uri)) self.active_conns.add(socket) # Untested. elif num_retries > 0: + conn_logger.warning("Got bad socket, retry...") socket = yield from self.connect(uri, loop, num_retries - 1) else: raise RuntimeError("Unable to connect, max retries exceeded.") @@ -148,24 +146,27 @@ class AiohttpFactory(BaseFactory): loop=loop) except aiohttp.WSServerHandshakeError as e: raise SocketClientError(e.message) - return AiohttpConnection(socket, pool) + return AiohttpConnection(socket, pool, loop=loop) class BaseConnection(AbstractConnection): - def __init__(self, socket, pool=None): + def __init__(self, socket, pool=None, loop=None): self.socket = socket + self._loop = loop or asyncio.get_event_loop() self._pool = pool + self._parser = aiohttp.StreamParser( + buf=aiohttp.DataQueue(loop=self._loop), loop=self._loop) + + @property + def parser(self): + return self._parser def feed_pool(self): if self.pool: if self in self.pool.active_conns: self.pool.feed_pool(self) - @asyncio.coroutine - def receive(self): - return (yield from parse_gremlin_response(self)) - @asyncio.coroutine def release(self): try: @@ -223,9 +224,17 @@ class AiohttpConnection(BaseConnection): yield from self.release() raise if message.tp == aiohttp.MsgType.binary: - return message.data.decode() + try: + self.parser.feed_data(message.data.decode()) + except Exception: + self.release() + raise elif message.tp == aiohttp.MsgType.text: - return message.data.strip() + try: + self.parser.feed_data(message.data.strip()) + except Exception: + self.release() + raise else: try: if message.tp == aiohttp.MsgType.close: diff --git a/aiogremlin/protocol.py b/aiogremlin/protocol.py index 2a2613561e2e86421594f1174611cee12f825bbd..897acac0147d9d647ff8d832fb28e3071fd2f74f 100644 --- a/aiogremlin/protocol.py +++ b/aiogremlin/protocol.py @@ -12,27 +12,32 @@ Message = collections.namedtuple("Message", ["status_code", "data", "message", "metadata"]) -@asyncio.coroutine -def gremlin_response_parser(connection): - message = yield from connection._receive() - message = ujson.loads(message) - message = Message(message["status"]["code"], - message["result"]["data"], - message["result"]["meta"], - message["status"]["message"]) - if message.status_code == 200: - return message - elif message.status_code == 299: - connection.feed_pool() - # Return None - else: - try: +def gremlin_response_parser(out, buf): + while True: + message = yield + message = ujson.loads(message) + message = Message(message["status"]["code"], + message["result"]["data"], + message["result"]["meta"], + message["status"]["message"]) + if message.status_code == 200: + out.feed_data(message) + # For 3.0.0.M9 + # out.feed_eof() + # This will be terminated in 3.0.0.M9 + elif message.status_code == 299: + out.feed_eof() + # For 3.0.0.M9 + # elif message.status_code == 206: + # out.feed_data(message) + # elif message.status_code == 204: + # out.feed_data(message) + # out.feed_eof() + else: if message.status_code < 500: raise RequestError(message.status_code, message.message) else: raise GremlinServerError(message.status_code, message.message) - finally: - yield from connection.release() class GremlinWriter: @@ -42,6 +47,7 @@ class GremlinWriter: @asyncio.coroutine def write(self, message, binary=True, mime_type="application/json"): + message = ujson.dumps(message) if binary: message = self._set_message_header(message, mime_type) yield from self._connection.send(message, binary) diff --git a/benchmark.py b/benchmark.py index 4514f3e971694125ce46d2960358d65b869228f7..2e8ada786a170166732bc2153434dbd82b352cbf 100644 --- a/benchmark.py +++ b/benchmark.py @@ -27,7 +27,7 @@ def run(client, count, concurrency, loop): assert resp[0].data[0] == result, resp[0].data[0] processed_count += 1 except Exception: - continue + raise for i in range(count): rnd1 = random.randint(1, 9) @@ -79,11 +79,11 @@ ARGS.add_argument( help='message count (default: `%(default)s`)') ARGS.add_argument( '-c', '--concurrency', action="store", - nargs='?', type=int, default=256, + nargs='?', type=int, default=100, help='count of parallel requests (default: `%(default)s`)') ARGS.add_argument( '-p', '--poolsize', action="store", - nargs='?', type=int, default=256, + nargs='?', type=int, default=100, help='num connected websockets (default: `%(default)s`)') ARGS.add_argument( '-w', '--warmups', action="store", @@ -102,8 +102,8 @@ if __name__ == "__main__": client = loop.run_until_complete( aiogremlin.create_client(loop=loop, poolsize=poolsize)) try: - print("Runs: {}. Warmups: {}. Messages: {}. Concurrency: {}.".format( - num_tests, num_warmups, num_mssg, concurr)) + print("Runs: {}. Warmups: {}. Messages: {}. Concurrency: {}. Poolsize: {}".format( + num_tests, num_warmups, num_mssg, concurr, poolsize)) main = main(client, num_tests, num_mssg, concurr, num_warmups, loop) loop.run_until_complete(main) finally: diff --git a/changes.txt b/changes.txt index e134c84a70a455761e5f2b1c6b12c372beee47d8..2fbf42b17c05ee7a2fe8f01d31b6f6552f445aaa 100644 --- a/changes.txt +++ b/changes.txt @@ -1,3 +1,4 @@ 0.0.1 - 4/2015: Birth! 0.0.2 - 5/1/2015: Added an init_pool method and a create_client constructor. 0.0.3 - 5/2/2015: Using ujson for serialization. +0.0.4 - 5/12/2015: Added support for sessions. diff --git a/profile.py b/profiler.py similarity index 100% rename from profile.py rename to profiler.py diff --git a/setup.py b/setup.py index 4244101250fd5644525abe738f48bbeb02d4e80f..01a7bd91490adc083ae3f1e279a31244c97ff6df 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup( name="aiogremlin", - version="0.0.3", + version="0.0.4", url="", license="MIT", author="davebshow",