From aa7dbd8261a7cf68bb55fc84d992a686bdf36266 Mon Sep 17 00:00:00 2001 From: davebshow <davebshow@gmail.com> Date: Fri, 1 May 2015 19:29:02 -0400 Subject: [PATCH] added pool init and client constructor, 0.0.2 release --- README.md | 12 ++++++++--- aiogremlin/__init__.py | 4 ++-- aiogremlin/abc.py | 4 ++++ aiogremlin/client.py | 24 ++++++++++++++++++++++ aiogremlin/connection.py | 12 ++++++++--- benchmarks/__init__.py | 0 benchmarks/benchmark.py | 44 ++++++++++++++++++++++++++++++++++++++++ changes.txt | 2 ++ setup.py | 2 +- tests/tests.py | 20 +++++++++++++++--- 10 files changed, 112 insertions(+), 12 deletions(-) create mode 100644 benchmarks/__init__.py create mode 100644 benchmarks/benchmark.py create mode 100644 changes.txt diff --git a/README.md b/README.md index 785e0eb..d3e03ba 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# aiogremlin 0.0.1 [(gizmo grew up)](https://pypi.python.org/pypi/gizmo/0.1.12) +# aiogremlin 0.0.2 [(gizmo grew up)](https://pypi.python.org/pypi/gizmo/0.1.12) `aiogremlin` is a **Python 3** driver for the the [Tinkerpop 3 Gremlin Server](http://www.tinkerpop.com/docs/3.0.0.M7/#gremlin-server). This module is built on [Asyncio](https://docs.python.org/3/library/asyncio.html). By default it uses the [aiohttp](http://aiohttp.readthedocs.org/en/v0.15.3/index.html) websocket client , but it is easy to plug in a different implementation. `aiogremlin` is currently in **alpha** mode, but all major functionality has test coverage. @@ -30,6 +30,8 @@ The Gremlin Server responds with messages in chunks, `GremlinClient.submit` subm ```python >>> loop = asyncio.get_event_loop() >>> gc = GremlinClient('ws://localhost:8182/', loop=loop) +# Or even better, use the constructor function. This will init pool connections. +>>> gc = create_client('ws://localhost:8182/', loop=loop) # Use get. >>> @asyncio.coroutine @@ -37,16 +39,17 @@ The Gremlin Server responds with messages in chunks, `GremlinClient.submit` subm ... resp = yield from gc.submit("x + x", bindings={"x": 4}) ... result = yield from resp.get() ... return result + >>> result = loop.run_until_complete(get(gc)) >>> result [Message(status_code=200, data=[8], message={}, metadata='')] + >>> resp = result[0] >>> resp.status_code 200 + >>> resp # Named tuple. Message(status_code=200, data=[8], message={}, metadata='') ->>> loop.run_until_complete(gc.close()) # Explicitly close client!!! ->>> loop.close() # Use stream. >>> @asyncio.coroutine @@ -59,6 +62,9 @@ Message(status_code=200, data=[8], message={}, metadata='') ... print(result) >>> loop.run_until_complete(stream(gc)) Message(status_code=200, data=[2], message={}, metadata='') + +>>> loop.run_until_complete(gc.close()) # Explicitly close client!!! +>>> loop.close() ``` For convenience, `aiogremlin` also provides a method `execute`, which is equivalent to calling `yield from submit()` and then `yield from get()` in the same coroutine. diff --git a/aiogremlin/__init__.py b/aiogremlin/__init__.py index a59a7b8..f4ca236 100644 --- a/aiogremlin/__init__.py +++ b/aiogremlin/__init__.py @@ -1,5 +1,5 @@ from .abc import AbstractFactory, AbstractConnection from .connection import WebsocketPool, AiohttpFactory -from .client import GremlinClient +from .client import GremlinClient, create_client from .exceptions import RequestError, GremlinServerError, SocketClientError -__version__ = "0.0.1" +__version__ = "0.0.2" diff --git a/aiogremlin/abc.py b/aiogremlin/abc.py index 4f4fc14..9cdea94 100644 --- a/aiogremlin/abc.py +++ b/aiogremlin/abc.py @@ -47,3 +47,7 @@ class AbstractConnection(metaclass=ABCMeta): @abstractmethod def receive(self): pass + + @abstractmethod + def _receive(self): + pass diff --git a/aiogremlin/client.py b/aiogremlin/client.py index 9bcc20c..938ecb8 100644 --- a/aiogremlin/client.py +++ b/aiogremlin/client.py @@ -10,6 +10,30 @@ from aiogremlin.log import client_logger from aiogremlin.protocol import gremlin_response_parser, GremlinWriter +@asyncio.coroutine +def create_client(uri='ws://localhost:8182/', loop=None, ssl=None, + protocol=None, lang="gremlin-groovy", op="eval", + processor="", pool=None, factory=None, poolsize=10, + timeout=None, **kwargs): + pool = WebsocketPool(uri, + factory=factory, + poolsize=poolsize, + timeout=timeout, + loop=loop) + + yield from pool.init_pool() + + return GremlinClient(uri=uri, + loop=loop, + ssl=ssl, + protocol=protocol, + lang=lang, + op=op, + processor=processor, + pool=pool, + factory=factory) + + class GremlinClient: def __init__(self, uri='ws://localhost:8182/', loop=None, ssl=None, diff --git a/aiogremlin/connection.py b/aiogremlin/connection.py index 6797d75..e2e8188 100644 --- a/aiogremlin/connection.py +++ b/aiogremlin/connection.py @@ -28,6 +28,13 @@ class WebsocketPool: if verbose: conn_logger.setLevel(INFO) + @asyncio.coroutine + def init_pool(self): + for i in range(self.poolsize): + conn = yield from self.factory.connect(self.uri, pool=self, + loop=self._loop) + self._put(conn) + @property def loop(self): return self._loop @@ -85,8 +92,7 @@ class WebsocketPool: if not self.pool.empty(): socket = self.pool.get_nowait() conn_logger.info("Reusing socket: {} at {}".format(socket, uri)) - elif (self.num_active_conns + self.num_connecting >= self.poolsize or - not self.poolsize): + elif self.num_active_conns + self.num_connecting >= self.poolsize: conn_logger.info("Waiting for socket...") socket = yield from asyncio.wait_for(self.pool.get(), self.timeout, loop=loop) @@ -229,7 +235,7 @@ class AiohttpConnection(BaseConnection): conn_logger.warn('Pong received') else: try: - if message.tp == aiohttp.MsgType.release: + if message.tp == aiohttp.MsgType.close: conn_logger.warn("Socket connection closed by server.") elif message.tp == aiohttp.MsgType.error: raise SocketClientError(self.socket.exception()) diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py new file mode 100644 index 0000000..9da80d7 --- /dev/null +++ b/benchmarks/benchmark.py @@ -0,0 +1,44 @@ +"""Simple benchmark based on aiohttp benchmark client.""" + +import asyncio + +from aiogremlin import GremlinClient + + +@asyncio.coroutine +def attack(loop): + + client = GremlinClient(loop=loop, poolsize=10) + execute = client.execute + + processed_count = 0 + + @asyncio.coroutine + def drop_bomb(): + nonlocal processed_count + try: + t1 = loop.time() + resp = yield from execute("1 + 1") + assert resp[0].status_code == 200, resp[0].status_code + t2 = loop.time() + processed_count += 1 + except Exception: + print("an exception occurred {}".format(resp[0].status_code)) + + bombers = [] + append = bombers.append + async = asyncio.async + for i in range(10000): + bomber = async(drop_bomb()) + append(bomber) + + t1 = loop.time() + yield from asyncio.gather(*bombers, loop=loop) + t2 = loop.time() + rps = processed_count / (t2 - t1) + print("Benchmark complete: {} rps".format(rps)) + + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(attack(loop)) diff --git a/changes.txt b/changes.txt new file mode 100644 index 0000000..c644f79 --- /dev/null +++ b/changes.txt @@ -0,0 +1,2 @@ +0.0.1 - 4/2015: Birth! +0.0.2 - 5/1/2015: Added an init_pool method and a create_client constructor. diff --git a/setup.py b/setup.py index 5afcd19..3507070 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup( name="aiogremlin", - version="0.0.1", + version="0.0.2", url="", license="MIT", author="davebshow", diff --git a/tests/tests.py b/tests/tests.py index 8830efc..bd5c7ca 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -5,9 +5,9 @@ import asyncio import itertools import unittest from aiogremlin import (GremlinClient, RequestError, GremlinServerError, - SocketClientError, WebsocketPool, AiohttpFactory) - + SocketClientError, WebsocketPool, AiohttpFactory, create_client) +# class GremlinClientTests(unittest.TestCase): def setUp(self): @@ -39,7 +39,7 @@ class GremlinClientTests(unittest.TestCase): sub1 = self.gc.execute("x + x", bindings={"x": 1}) sub2 = self.gc.execute("x + x", bindings={"x": 2}) sub3 = self.gc.execute("x + x", bindings={"x": 4}) - coro = asyncio.wait([asyncio.async(sub1, loop=self.loop), + coro = asyncio.gather(*[asyncio.async(sub1, loop=self.loop), asyncio.async(sub2, loop=self.loop), asyncio.async(sub3, loop=self.loop)], loop=self.loop) # Here I am looking for resource warnings. @@ -181,5 +181,19 @@ class WebsocketPoolTests(unittest.TestCase): self.loop.run_until_complete(conn()) +class CreateClientTests(unittest.TestCase): + + def test_pool_init(self): + @asyncio.coroutine + def go(loop): + gc = yield from create_client(poolsize=10, loop=loop) + self.assertEqual(gc.pool.pool.qsize(), 10) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) + loop.run_until_complete(go(loop)) + loop.close() + + if __name__ == "__main__": unittest.main() -- GitLab