diff --git a/README.md b/README.md index b37414aa227221f85cd53ab5b4fd3ebdfbdb6535..9da6f42540bf16571c976390b8c71337617a5eba 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # aiogremlin 0.0.6 [(gizmo grew up)](https://pypi.python.org/pypi/gizmo/0.1.12) -`aiogremlin` is a **Python 3** driver for the the [Tinkerpop 3 Gremlin Server](http://www.tinkerpop.com/docs/3.0.0.M7/#gremlin-server). This module is built on [Asyncio](https://docs.python.org/3/library/asyncio.html). By default it uses the [aiohttp](http://aiohttp.readthedocs.org/en/v0.15.3/index.html) websocket client , but it is easy to plug in a different implementation. `aiogremlin` is currently in **alpha** mode, but all major functionality has test coverage. +`aiogremlin` is a **Python 3** driver for the the [Tinkerpop 3 Gremlin Server](http://www.tinkerpop.com/docs/3.0.0.M7/#gremlin-server). This module is built on [Asyncio](https://docs.python.org/3/library/asyncio.html) and [aiohttp](http://aiohttp.readthedocs.org/en/v0.15.3/index.html) `aiogremlin` is currently in **alpha** mode, but all major functionality has test coverage. ## Getting started @@ -26,13 +26,15 @@ The `GremlinClient` communicates asynchronously with the Gremlin Server using we The Gremlin Server responds with messages in chunks, `GremlinClient.submit` submits a script to the server, and returns a `GremlinResponse` object. This object provides the methods: `get` and the property `stream`. `get` collects all of the response messages and returns them as a Python list. `stream` returns an object of the type `GremlinResponseStream` that implements a method `read`. This allows you to read the response without loading all of the messages into memory. +Note that the GremlinClient constructor and the create_client function take [keyword only arguments](https://www.python.org/dev/peps/pep-3102/) only! + ```python >>> loop = asyncio.get_event_loop() ->>> gc = GremlinClient('ws://localhost:8182/', loop=loop) +>>> gc = GremlinClient(url='ws://localhost:8182/', loop=loop) # Default url # Or even better, use the constructor function. This will init pool connections. # ***Must be done inside a coroutine*** ->>> gc = yield from create_client('ws://localhost:8182/', loop=loop) +>>> gc = yield from create_client(loop=loop) # Or outside of a coroutine using the loop to help >>> gc = loop.run_until_complete(create_client(loop=loop)) @@ -74,9 +76,30 @@ For convenience, `aiogremlin` also provides a method `execute`, which is equival ```python >>> loop = asyncio.get_event_loop() ->>> gc = GremlinClient('ws://localhost:8182/', loop=loop) +>>> gc = GremlinClient(loop=loop) >>> execute = gc.execute("x + x", bindings={"x": 4}) >>> result = loop.run_until_complete(execute) >>> result [Message(status_code=200, data=[8], message={}, metadata='')] +>>> loop.run_until_complete(gc.close()) # Explicitly close client!!! +>>> loop.close() +``` + +To avoid the explicit client close, `aiogremlin` provides a class called `ConnectionContextManager`. To create an instance of `ConnectionContextManager`, get a connection from a `WebSocketPool` instance as follows: + +```python +>>> loop = asyncio.get_event_loop() +# Note that url is a required positional param here! +# Pool does not open any connections until requested. +>>> pool = aiogremlin.WebSocketPool('ws://localhost:8182/') +>>> @asyncio.coroutine +... def go(pool, loop): +... with (yield from pool) as conn: +... gc = aiogremlin.GremlinClient(connection=conn, loop=loop) +... resp = yield from gc.execute("1 + 1") +... return resp +>>> result = loop.run_until_complete(go(pool, loop)) +>>> result +[Message(status_code=200, data=[2], message={}, metadata='')] +>>> loop.close() # Close loop, but client connections will be closed. ``` diff --git a/aiogremlin/__init__.py b/aiogremlin/__init__.py index 667336e1acc67a7858d3b2c34e8fa2f539c50f1a..3f6fa2fb9a299c959db5957cc6251e19391c759f 100644 --- a/aiogremlin/__init__.py +++ b/aiogremlin/__init__.py @@ -1,9 +1,6 @@ -from .abc import AbstractFactory, AbstractConnection -from .connection import (AiohttpFactory, BaseFactory, BaseConnection, - WebSocketSession) -from .client import (create_client, GremlinClient, GremlinResponse, - GremlinResponseStream) -from .exceptions import RequestError, GremlinServerError, SocketClientError -from .pool import WebSocketPool -from .protocol import GremlinWriter -__version__ = "0.0.6" +from .connection import * +from .client import * +from .exceptions import * +from .pool import * +from .protocol import * +__version__ = "0.0.7" diff --git a/aiogremlin/abc.py b/aiogremlin/abc.py deleted file mode 100644 index 79a9bb23c56695689b4b71916a6a532d53222a93..0000000000000000000000000000000000000000 --- a/aiogremlin/abc.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Abstract classes for creating pluggable websocket clients.""" - -from abc import ABCMeta, abstractmethod - - -class AbstractFactory(metaclass=ABCMeta): - - @abstractmethod - def ws_connect(cls): - pass - - -class AbstractConnection(metaclass=ABCMeta): - - @property - @abstractmethod - def closed(self): - pass - - @abstractmethod - def close(): - pass - - @abstractmethod - def _close(): - pass - - @abstractmethod - def send(self): - pass - - @abstractmethod - def receive(self): - pass diff --git a/aiogremlin/client.py b/aiogremlin/client.py index af4d270b693182a9c45f8e000e4c052b56f6a6a4..e970826f1e39872d2aa425ddbb5db5e8afdba958 100644 --- a/aiogremlin/client.py +++ b/aiogremlin/client.py @@ -5,30 +5,41 @@ import ssl import aiohttp -from aiogremlin.connection import AiohttpFactory +from aiogremlin.connection import (GremlinFactory, WebSocketSession, + GremlinClientWebSocketResponse) from aiogremlin.exceptions import RequestError from aiogremlin.log import logger, INFO from aiogremlin.pool import WebSocketPool from aiogremlin.protocol import gremlin_response_parser, GremlinWriter +__all__ = ("create_client", "GremlinClient", "GremlinResponse", + "GremlinResponseStream") + @asyncio.coroutine -def create_client(uri='ws://localhost:8182/', loop=None, ssl=None, +def create_client(*, url='ws://localhost:8182/', loop=None, protocol=None, lang="gremlin-groovy", op="eval", processor="", pool=None, factory=None, poolsize=10, - timeout=None, verbose=False, **kwargs): - pool = WebSocketPool(uri, - factory=factory, - poolsize=poolsize, - timeout=timeout, - loop=loop, - verbose=verbose) + timeout=None, verbose=False, fill_pool=True, connector=None): + + if factory is None: + factory = WebSocketSession(connector=connector, + ws_response_class=GremlinClientWebSocketResponse, + loop=loop) - yield from pool.fill_pool() + if pool is None: + pool = WebSocketPool(url, + factory=factory, + poolsize=poolsize, + timeout=timeout, + loop=loop, + verbose=verbose) - return GremlinClient(uri=uri, + if fill_pool: + yield from pool.fill_pool() + + return GremlinClient(url=url, loop=loop, - ssl=ssl, protocol=protocol, lang=lang, op=op, @@ -40,21 +51,16 @@ def create_client(uri='ws://localhost:8182/', loop=None, ssl=None, class GremlinClient: - def __init__(self, uri='ws://localhost:8182/', loop=None, ssl=None, + def __init__(self, url='ws://localhost:8182/', loop=None, protocol=None, lang="gremlin-groovy", op="eval", processor="", pool=None, factory=None, poolsize=10, - timeout=None, verbose=False, **kwargs): + timeout=None, verbose=False, connector=None, + connection=None): """ """ - self.uri = uri + self.url = url self.ssl = ssl self.protocol = protocol - # if self.ssl: - # protocol = protocol or ssl.PROTOCOL_TLSv1 - # ssl_context = ssl.SSLContext(protocol) - # ssl_context.load_verify_locations(ssl) - # ssl_context.verify_mode = ssl.CERT_REQUIRED - # self.ssl_context = ssl_context # This will go to conn pool... use TCPConnector? self._loop = loop or asyncio.get_event_loop() self.lang = lang or "gremlin-groovy" self.op = op or "eval" @@ -62,12 +68,18 @@ class GremlinClient: self.poolsize = poolsize self.timeout = timeout self._pool = pool - self._factory = factory or AiohttpFactory() - if self._pool is None: + self._connector = connector + self._factory = factory or GremlinFactory(connector=self._connector) + self._conn = connection + if self._pool is None and self._conn is None: self._connected = False self._conn = asyncio.async(self._connect(), loop=self._loop) - else: + elif pool is not None: self._connected = self._pool._connected + elif self._conn is not None and not connection._closed: + self._connected = True + else: + self._connected = False if verbose: logger.setLevel(INFO) @@ -86,16 +98,16 @@ class GremlinClient: self._connected = False @asyncio.coroutine - def _connect(self, **kwargs): + def _connect(self): """ """ - loop = kwargs.get("loop", "") or self._loop - connection = yield from self._factory.ws_connect(self.uri, loop=loop) + connection = yield from self._factory.ws_connect(self.url, + loop=self._loop) self._connected = True return connection @asyncio.coroutine - def _acquire(self, **kwargs): + def _acquire(self): if self._pool: conn = yield from self._pool.acquire() elif self._connected: @@ -111,26 +123,26 @@ class GremlinClient: return conn @asyncio.coroutine - def submit(self, gremlin, conn=None, bindings=None, lang=None, op=None, - processor=None, session=None, binary=True): + def submit(self, gremlin, *, connection=None, bindings=None, lang=None, + op=None, processor=None, session=None, binary=True): """ """ lang = lang or self.lang op = op or self.op processor = processor or self.processor - if conn is None: - conn = yield from self._acquire() - writer = GremlinWriter(conn) - conn = writer.write(gremlin, bindings=bindings, - lang=lang, op=op, processor=processor, session=session, - binary=binary) - return GremlinResponse(conn, + if connection is None: + connection = yield from self._acquire() + writer = GremlinWriter(connection) + connection = writer.write(gremlin, bindings=bindings, lang=lang, op=op, + processor=processor, session=session, + binary=binary) + return GremlinResponse(connection, pool=self._pool, session=session, loop=self._loop) @asyncio.coroutine - def execute(self, gremlin, bindings=None, lang=None, + def execute(self, gremlin, *, bindings=None, lang=None, op=None, processor=None, consumer=None, collect=True, **kwargs): """ """ @@ -138,13 +150,13 @@ class GremlinClient: op = op or self.op processor = processor or self.processor resp = yield from self.submit(gremlin, bindings=bindings, lang=lang, - op=op, processor=processor) + op=op, processor=processor) return (yield from resp.get()) class GremlinResponse: - def __init__(self, conn, pool=None, session=None, loop=None): + def __init__(self, conn, *, pool=None, session=None, loop=None): self._loop = loop or asyncio.get_event_loop() self._session = session self._stream = GremlinResponseStream(conn, pool=pool, loop=self._loop) diff --git a/aiogremlin/connection.py b/aiogremlin/connection.py index 019c3bdd1902e4d2e5ca0cb38b76c6fde67c25f4..30ce5eb132a703264676e485c1e3555a839c2b45 100644 --- a/aiogremlin/connection.py +++ b/aiogremlin/connection.py @@ -15,16 +15,33 @@ from aiohttp.websocket import (MSG_BINARY, MSG_TEXT, MSG_CLOSE, MSG_PING, from aiohttp.websocket_client import (MsgType, closedMessage, ClientWebSocketResponse) -from aiogremlin.abc import AbstractFactory, AbstractConnection from aiogremlin.exceptions import SocketClientError from aiogremlin.log import INFO, logger +__all__ = ('WebSocketSession', 'GremlinFactory', + 'GremlinClientWebSocketResponse') + + +# Basically cut and paste from aiohttp until merge/release of #374 +class WebSocketSession(ClientSession): + + def __init__(self, *, connector=None, loop=None, + cookies=None, headers=None, auth=None, + ws_response_class=None): + + super().__init__(connector=connector, loop=loop, + cookies=cookies, headers=headers, auth=auth) + + self._ws_response_class = ws_response_class -class WebSocketSession(AbstractFactory, ClientSession): @asyncio.coroutine - def ws_connect(self, url, protocols=(), timeout=10.0, connector=None, - response_class=None, autoclose=True, autoping=True, + def ws_connect(self, url, *, + protocols=(), + timeout=10.0, + autoclose=True, + autoping=True, + ws_response_class=None, loop=None): """Initiate websocket connection.""" @@ -41,7 +58,7 @@ class WebSocketSession(AbstractFactory, ClientSession): # send request resp = yield from self.request('get', url, headers=headers, - read_until_eof=False) + read_until_eof=False) # check handshake if resp.status != 101: @@ -73,10 +90,11 @@ class WebSocketSession(AbstractFactory, ClientSession): reader = resp.connection.reader.set_parser(WebSocketParser) writer = WebSocketWriter(resp.connection.writer, use_mask=True) - if response_class is None: - response_class = ClientWebSocketResponse + if ws_response_class is None: + ws_response_class = (self._ws_response_class or + ClientWebSocketResponse) - return response_class( + return ws_response_class( reader, writer, protocol, resp, timeout, autoclose, autoping, loop) def detach(self): @@ -86,8 +104,9 @@ class WebSocketSession(AbstractFactory, ClientSession): self._connector = None -def ws_connect(url, protocols=(), timeout=10.0, connector=None, - response_class=None, autoclose=True, autoping=True, +# Cut and paste from aiohttp until merge/release of #374 +def ws_connect(url, *, protocols=(), timeout=10.0, connector=None, + ws_response_class=None, autoclose=True, autoping=True, loop=None): if loop is None: asyncio.get_event_loop() @@ -95,13 +114,11 @@ def ws_connect(url, protocols=(), timeout=10.0, connector=None, connector = TCPConnector(loop=loop, force_close=True) ws_session = WebSocketSession(loop=loop, connector=connector) - try: resp = yield from ws_session.ws_connect(url, protocols=protocols, timeout=timeout, - connector=connector, - response_class=response_class, + ws_response_class=ws_response_class, autoclose=autoclose, autoping=autoping, loop=loop) @@ -111,51 +128,39 @@ def ws_connect(url, protocols=(), timeout=10.0, connector=None, ws_session.detach() - # Will drop 'pluggable sockets' implementation in favour of aiohttp default. -class BaseFactory(AbstractFactory): +class GremlinFactory: - @property - def factory(self): - return self - - -class AiohttpFactory(BaseFactory): + def __init__(self, connector=None): + self._connector = connector @asyncio.coroutine - def ws_connect(cls, uri='ws://localhost:8182/', protocols=(), + def ws_connect(self, url='ws://localhost:8182/', protocols=(), connector=None, autoclose=False, autoping=True, - response_class=None, loop=None): - if response_class is None: - response_class = GremlinClientWebSocketResponse - + ws_response_class=None, loop=None): + if connector is None: + connector = self._connector + if ws_response_class is None: + ws_response_class = GremlinClientWebSocketResponse try: return (yield from ws_connect( - uri, protocols=protocols, connector=connector, - response_class=response_class, autoclose=True, autoping=True, + url, protocols=protocols, connector=connector, + ws_response_class=ws_response_class, autoclose=True, autoping=True, loop=loop)) except WSServerHandshakeError as e: raise SocketClientError(e.message) -class BaseConnection(AbstractConnection): - - def __init__(self, loop=None): - self._loop = loop or asyncio.get_event_loop() - self._parser = StreamParser( - buf=DataQueue(loop=self._loop), loop=self._loop) - - @property - def parser(self): - return self._parser - - -class GremlinClientWebSocketResponse(BaseConnection, ClientWebSocketResponse): +class GremlinClientWebSocketResponse(ClientWebSocketResponse): def __init__(self, reader, writer, protocol, response, timeout, autoclose, autoping, loop): - BaseConnection.__init__(self, loop=loop) ClientWebSocketResponse.__init__(self, reader, writer, protocol, response, timeout, autoclose, autoping, loop) + self._parser = StreamParser(buf=DataQueue(loop=loop), loop=loop) + + @property + def parser(self): + return self._parser @property def closed(self): diff --git a/aiogremlin/contextmanager.py b/aiogremlin/contextmanager.py index 9e7e9a2f2168eb6f820c360ec5bc2df5e9fbed12..b5ecade34217b5924a051894e229b58e6da0a8a6 100644 --- a/aiogremlin/contextmanager.py +++ b/aiogremlin/contextmanager.py @@ -1,10 +1,9 @@ class ConnectionContextManager: - __slots__ = ("_conn", "_pool") + __slots__ = ("_conn") - def __init__(self, conn, pool): + def __init__(self, conn): self._conn = conn - self._pool = pool def __enter__(self): return self._conn @@ -15,4 +14,3 @@ class ConnectionContextManager: self._conn._close() finally: self._conn = None - self._pool = None diff --git a/aiogremlin/exceptions.py b/aiogremlin/exceptions.py index 00512f2dedbb621711c19238898b3c615fad9684..8b42d0b6c309964a115e1bf30de845c5749895bc 100644 --- a/aiogremlin/exceptions.py +++ b/aiogremlin/exceptions.py @@ -2,6 +2,9 @@ Gremlin Server exceptions. """ +__all__ = ("RequestError", "GremlinServerError", "SocketClientError") + + class SocketClientError(IOError): pass diff --git a/aiogremlin/pool.py b/aiogremlin/pool.py index 2f3157878b5f76b0f37287f626776f05d2cabe7d..117e0c481b0cbcac50ac2812ba58c1dfb5d68688 100644 --- a/aiogremlin/pool.py +++ b/aiogremlin/pool.py @@ -1,24 +1,22 @@ import asyncio -from aiogremlin.connection import (AiohttpFactory, +from aiogremlin.connection import (GremlinFactory, GremlinClientWebSocketResponse) from aiogremlin.contextmanager import ConnectionContextManager from aiogremlin.log import logger - -def create_pool(): - pass +__all__ = ("WebSocketPool",) class WebSocketPool: - def __init__(self, url='ws://localhost:8182/', factory=None, poolsize=10, + def __init__(self, url, *, factory=None, poolsize=10, connector=None, max_retries=10, timeout=None, loop=None, verbose=False, - response_class=None): + ws_response_class=None): """ """ self.url = url - self._factory = factory or AiohttpFactory + self._factory = factory or GremlinFactory(connector=connector) self.poolsize = poolsize self.max_retries = max_retries self.timeout = timeout @@ -27,7 +25,8 @@ class WebSocketPool: self._pool = asyncio.Queue(maxsize=self.poolsize, loop=self._loop) self.active_conns = set() self.num_connecting = 0 - self._response_class = response_class or GremlinClientWebSocketResponse + self._response_class = (ws_response_class or + GremlinClientWebSocketResponse) self._closed = False if verbose: logger.setLevel(INFO) @@ -37,8 +36,11 @@ class WebSocketPool: tasks = [] poolsize = self.poolsize for i in range(poolsize): - task = asyncio.async(self.factory.ws_connect(self.url, - response_class=self._response_class, loop=self._loop), loop=self._loop) + coro = self.factory.ws_connect( + self.url, + ws_response_class=self._response_class, + loop=self._loop) + task = asyncio.async(coro, loop=self._loop) tasks.append(task) for f in asyncio.as_completed(tasks, loop=self._loop): conn = yield from f @@ -78,18 +80,14 @@ class WebSocketPool: @asyncio.coroutine def _close_active_conns(self): tasks = [asyncio.async(conn.close(), loop=self.loop) for conn - in self.active_conns] + in self.active_conns] yield from asyncio.wait(tasks, loop=self.loop) @asyncio.coroutine def _purge_pool(self): - while True: - try: - conn = self._pool.get_nowait() - except asyncio.QueueEmpty: - break - else: - yield from conn.close() + while not self._pool.empty(): + conn = self._pool.get_nowait() + yield from conn.close() @asyncio.coroutine def acquire(self, url=None, loop=None, num_retries=None): @@ -105,18 +103,20 @@ class WebSocketPool: elif self.num_active_conns + self.num_connecting >= self.poolsize: logger.info("Waiting for socket...") socket = yield from asyncio.wait_for(self._pool.get(), - self.timeout, loop=loop) + self.timeout, loop=loop) logger.info("Socket acquired: {} at {}".format(socket, url)) else: self.num_connecting += 1 try: - socket = yield from self.factory.ws_connect(url, - response_class=self._response_class, loop=loop) + socket = yield from self.factory.ws_connect( + url, + ws_response_class=self._response_class, + loop=loop) finally: self.num_connecting -= 1 if not socket.closed: logger.info("New connection on socket: {} at {}".format( - socket, url)) + socket, url)) self.active_conns.add(socket) # Untested. elif num_retries > 0: @@ -144,4 +144,4 @@ class WebSocketPool: def __iter__(self): conn = yield from self.acquire() - return ConnectionContextManager(conn, self) + return ConnectionContextManager(conn) diff --git a/aiogremlin/protocol.py b/aiogremlin/protocol.py index 563f1402737b658bb5554f2a65cc601b00e42afd..ddfb11e9bff4de7adf3ae8d16e25017e2cf341f4 100644 --- a/aiogremlin/protocol.py +++ b/aiogremlin/protocol.py @@ -12,6 +12,8 @@ except ImportError: from aiogremlin.exceptions import RequestError, GremlinServerError from aiogremlin.log import logger +__all__ = ("GremlinWriter",) + Message = collections.namedtuple("Message", ["status_code", "data", "message", "metadata"]) @@ -53,8 +55,12 @@ class GremlinWriter: def write(self, gremlin, bindings=None, lang="gremlin-groovy", op="eval", processor="", session=None, binary=True, mime_type="application/json"): - message = self._prepare_message(gremlin, bindings=bindings, - lang=lang, op=op, processor=processor, session=session) + message = self._prepare_message(gremlin, + bindings=bindings, + lang=lang, + op=op, + processor=processor, + session=session) message = json.dumps(message) if binary: message = self._set_message_header(message, mime_type) @@ -71,8 +77,8 @@ class GremlinWriter: return b"".join([mime_len, mime_type, bytes(message, "utf-8")]) @staticmethod - def _prepare_message(gremlin, bindings=None, lang="gremlin-groovy", op="eval", - processor="", session=None): + def _prepare_message(gremlin, bindings=None, lang="gremlin-groovy", + op="eval", processor="", session=None): message = { "requestId": str(uuid.uuid4()), "op": op, diff --git a/benchmark.py b/benchmark.py index 2e80803e35e22493174580c619a9e18e691530b8..8310dd949092cba2c069c8af8be84536fecfaf0c 100644 --- a/benchmark.py +++ b/benchmark.py @@ -89,10 +89,6 @@ ARGS.add_argument( '-w', '--warmups', action="store", nargs='?', type=int, default=5, help='num warmups (default: `%(default)s`)') -ARGS.add_argument( - '-s', '--session', action="store", - nargs='?', type=str, default="false", - help='use session to establish connections (default: `%(default)s`)') if __name__ == "__main__": @@ -102,20 +98,16 @@ if __name__ == "__main__": concurr = args.concurrency poolsize = args.poolsize num_warmups = args.warmups - session = args.session loop = asyncio.get_event_loop() t1 = loop.time() - if session == "true": - factory = aiogremlin.WebSocketSession() - else: - factory = aiogremlin.AiohttpFactory() + factory = aiogremlin.GremlinFactory() client = loop.run_until_complete( aiogremlin.create_client(loop=loop, factory=factory, poolsize=poolsize)) t2 = loop.time() print("time to establish conns: {}".format(t2 - t1)) try: - print("Runs: {}. Warmups: {}. Messages: {}. Concurrency: {}. Poolsize: {}. Use Session: {}".format( - num_tests, num_warmups, num_mssg, concurr, poolsize, session)) + print("Runs: {}. Warmups: {}. Messages: {}. Concurrency: {}. Poolsize: {}.".format( + num_tests, num_warmups, num_mssg, concurr, poolsize)) main = main(client, num_tests, num_mssg, concurr, num_warmups, loop) loop.run_until_complete(main) finally: diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/benchmarks/client_bench.py b/benchmarks/client_bench.py deleted file mode 100644 index 7fb8843ce89fc2cd19ecd083b38d985dcb1a6168..0000000000000000000000000000000000000000 --- a/benchmarks/client_bench.py +++ /dev/null @@ -1,42 +0,0 @@ -import asyncio -import collections - -from aiogremlin import GremlinClient - - -@asyncio.coroutine -def attack(loop): - - client = GremlinClient(loop=loop, poolsize=1000) - execute = client.execute - - out_times = collections.deque() - processed_count = 0 - - @asyncio.coroutine - def do_bomb(): - nonlocal processed_count - try: - t1 = loop.time() - resp = yield from execute("1 + 1") - assert resp[0].status_code == 200, resp[0].status_code - t2 = loop.time() - out_times.append(t2 - t1) - processed_count += 1 - except Exception: - print("an exception occurred {}".format(resp[0].status_code)) - - bombers = [] - for i in range(10000): - bomber = asyncio.async(do_bomb()) - bombers.append(bomber) - - t1 = loop.time() - yield from asyncio.gather(*bombers, loop=loop) - t2 = loop.time() - rps = processed_count / (t2 - t1) - print("Benchmark complete: {} rps".format(rps)) - -if __name__ == "__main__": - loop = asyncio.get_event_loop() - loop.run_until_complete(attack(loop)) diff --git a/changes.txt b/changes.txt index 4f67059143f40430de765e6c669a0a09d3dee547..20e3974f74d84f019b53f17d06e2f21d4edaf6a3 100644 --- a/changes.txt +++ b/changes.txt @@ -3,3 +3,5 @@ 0.0.3 - 5/2/2015: Using ujson for serialization. 0.0.4 - 5/12/2015: Added support for sessions. 0.0.5 - 5/13/2015: Using EofStream terminator technique to prepare for 3.0.0.M9 +0.0.7 - 5/20/2015: Full integration with aiohttp. Added Session object for HTTP + connection pooling. Added a context manager. diff --git a/conn_bench.py b/conn_bench.py index d05cff0d8e03551ffe00476832c2bd3c3e477f8d..c099cdc29943fdf7f25d605b5e990b8e3a7c4711 100644 --- a/conn_bench.py +++ b/conn_bench.py @@ -46,7 +46,7 @@ if __name__ == "__main__": t1 = loop.time() loop.run_until_complete(asyncio.async(asyncio.gather(*tasks, loop=loop))) t2 = loop.time() - print("avg: time to establish conn: {}".format((t2 - t1) / (tests * 50))) + print("avg: time to establish conn: {}".format((t2 - t1) / (tests * 100))) m2 = loop.time() print("time to establish conns: {}".format((m2 - m1))) print("avg time to establish conns: {}".format((m2 - m1) / (tests * 100 * 50))) diff --git a/setup.py b/setup.py index 67899ee9d7e2c51af75d81e2a73df6c51864bc49..8e12c39656b926bdf5245d1bb3e56b67ee76ba73 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup( name="aiogremlin", - version="0.0.6", + version="0.0.7", url="", license="MIT", author="davebshow", diff --git a/tests/tests.py b/tests/tests.py index f9cf0959168b5cbda47632b4a1cef1087226e31e..4fb908f6ca309b23ea85a71ed4ad469dbf682edf 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -6,7 +6,7 @@ import itertools import unittest import uuid from aiogremlin import (GremlinClient, RequestError, GremlinServerError, - SocketClientError, WebSocketPool, AiohttpFactory, create_client, + SocketClientError, WebSocketPool, GremlinFactory, create_client, GremlinWriter, GremlinResponse, WebSocketSession) @@ -32,8 +32,11 @@ class GremlinClientTests(unittest.TestCase): self.loop.run_until_complete(conn.close()) def test_sub(self): - execute = self.gc.execute("x + x", bindings={"x": 4}) - results = self.loop.run_until_complete(execute) + @asyncio.coroutine + def go(): + resp = yield from self.gc.execute("x + x", bindings={"x": 4}) + return resp + results = self.loop.run_until_complete(go()) self.assertEqual(results[0].data[0], 8) @@ -42,8 +45,8 @@ class GremlinClientPoolTests(unittest.TestCase): def setUp(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(None) - self.gc = GremlinClient("ws://localhost:8182/", - factory=AiohttpFactory(), pool=WebSocketPool("ws://localhost:8182/", + self.gc = GremlinClient(url="ws://localhost:8182/", + factory=GremlinFactory(), pool=WebSocketPool("ws://localhost:8182/", loop=self.loop), loop=self.loop) @@ -131,8 +134,11 @@ class WebSocketPoolTests(unittest.TestCase): def setUp(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(None) - self.pool = WebSocketPool(poolsize=2, timeout=1, loop=self.loop, - factory=AiohttpFactory()) + self.pool = WebSocketPool("ws://localhost:8182/", + poolsize=2, + timeout=1, + loop=self.loop, + factory=GremlinFactory()) def tearDown(self): self.loop.run_until_complete(self.pool.close()) @@ -225,13 +231,28 @@ class ContextMngrTest(unittest.TestCase): def setUp(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(None) - self.pool = WebSocketPool(poolsize=1, loop=self.loop, - factory=AiohttpFactory, max_retries=0) + self.pool = WebSocketPool("ws://localhost:8182/", + poolsize=1, + loop=self.loop, + factory=GremlinFactory(), + max_retries=0) def tearDown(self): self.loop.run_until_complete(self.pool.close()) self.loop.close() + @asyncio.coroutine + def _check_closed(self): + conn = self.pool._pool.get_nowait() + self.assertTrue(conn.closed) + writer = GremlinWriter(conn) + try: + conn = yield from writer.write("1 + 1") + error = False + except RuntimeError: + error = True + self.assertTrue(error) + def test_connection_manager(self): results = [] @asyncio.coroutine @@ -239,23 +260,53 @@ class ContextMngrTest(unittest.TestCase): with (yield from self.pool) as conn: writer = GremlinWriter(conn) conn = writer.write("1 + 1") - resp = GremlinResponse(conn, self.pool, loop=self.loop) + resp = GremlinResponse(conn, pool=self.pool, loop=self.loop) while True: mssg = yield from resp.stream.read() if mssg is None: break results.append(mssg) - conn = self.pool._pool.get_nowait() - self.assertTrue(conn.closed) - writer = GremlinWriter(conn) - try: - conn = yield from writer.write("1 + 1") - error = False - except RuntimeError: - error = True - self.assertTrue(error) + # Test that connection was closed + yield from self._check_closed() + self.loop.run_until_complete(go()) + + def test_connection_manager_with_client(self): + @asyncio.coroutine + def go(): + with (yield from self.pool) as conn: + gc = GremlinClient(connection=conn, loop=self.loop) + resp = yield from gc.execute("1 + 1") + self.assertEqual(resp[0].data[0], 2) + self.pool.release(conn) + # Test that connection was closed + yield from self._check_closed() self.loop.run_until_complete(go()) + def test_connection_manager_error(self): + results = [] + @asyncio.coroutine + def go(): + with (yield from self.pool) as conn: + writer = GremlinWriter(conn) + conn = writer.write("1 + 1 sdalfj;sd") + resp = GremlinResponse(conn, pool=self.pool, loop=self.loop) + try: + while True: + mssg = yield from resp.stream.read() + if mssg is None: + break + results.append(mssg) + except: + self.pool.release(conn) + raise + try: + self.loop.run_until_complete(go()) + err = False + except: + err = True + self.assertTrue(err) + self.loop.run_until_complete(self._check_closed()) + class GremlinClientPoolSessionTests(unittest.TestCase): @@ -264,7 +315,7 @@ class GremlinClientPoolSessionTests(unittest.TestCase): asyncio.set_event_loop(None) self.gc = GremlinClient("ws://localhost:8182/", pool=WebSocketPool("ws://localhost:8182/", loop=self.loop, - factory=WebSocketSession(loop=self.loop)), + factory=WebSocketSession(loop=self.loop)), loop=self.loop) def tearDown(self):