diff --git a/README.md b/README.md index 785e0ebb64f0c215b9812871e5a049195bcecfe1..d3e03ba58e3010fa7d31b528ab875a2074573b44 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# aiogremlin 0.0.1 [(gizmo grew up)](https://pypi.python.org/pypi/gizmo/0.1.12) +# aiogremlin 0.0.2 [(gizmo grew up)](https://pypi.python.org/pypi/gizmo/0.1.12) `aiogremlin` is a **Python 3** driver for the the [Tinkerpop 3 Gremlin Server](http://www.tinkerpop.com/docs/3.0.0.M7/#gremlin-server). This module is built on [Asyncio](https://docs.python.org/3/library/asyncio.html). By default it uses the [aiohttp](http://aiohttp.readthedocs.org/en/v0.15.3/index.html) websocket client , but it is easy to plug in a different implementation. `aiogremlin` is currently in **alpha** mode, but all major functionality has test coverage. @@ -30,6 +30,8 @@ The Gremlin Server responds with messages in chunks, `GremlinClient.submit` subm ```python >>> loop = asyncio.get_event_loop() >>> gc = GremlinClient('ws://localhost:8182/', loop=loop) +# Or even better, use the constructor function. This will init pool connections. +>>> gc = create_client('ws://localhost:8182/', loop=loop) # Use get. >>> @asyncio.coroutine @@ -37,16 +39,17 @@ The Gremlin Server responds with messages in chunks, `GremlinClient.submit` subm ... resp = yield from gc.submit("x + x", bindings={"x": 4}) ... result = yield from resp.get() ... return result + >>> result = loop.run_until_complete(get(gc)) >>> result [Message(status_code=200, data=[8], message={}, metadata='')] + >>> resp = result[0] >>> resp.status_code 200 + >>> resp # Named tuple. Message(status_code=200, data=[8], message={}, metadata='') ->>> loop.run_until_complete(gc.close()) # Explicitly close client!!! ->>> loop.close() # Use stream. >>> @asyncio.coroutine @@ -59,6 +62,9 @@ Message(status_code=200, data=[8], message={}, metadata='') ... print(result) >>> loop.run_until_complete(stream(gc)) Message(status_code=200, data=[2], message={}, metadata='') + +>>> loop.run_until_complete(gc.close()) # Explicitly close client!!! +>>> loop.close() ``` For convenience, `aiogremlin` also provides a method `execute`, which is equivalent to calling `yield from submit()` and then `yield from get()` in the same coroutine. diff --git a/aiogremlin/__init__.py b/aiogremlin/__init__.py index a59a7b8a139ab0a971d06ef54f46af23e169c2f8..f4ca236c0788834382ee783c44aa81625c24effa 100644 --- a/aiogremlin/__init__.py +++ b/aiogremlin/__init__.py @@ -1,5 +1,5 @@ from .abc import AbstractFactory, AbstractConnection from .connection import WebsocketPool, AiohttpFactory -from .client import GremlinClient +from .client import GremlinClient, create_client from .exceptions import RequestError, GremlinServerError, SocketClientError -__version__ = "0.0.1" +__version__ = "0.0.2" diff --git a/aiogremlin/abc.py b/aiogremlin/abc.py index 4f4fc14a5a9823aea0c53a69b1b9b1b3ee4b54ea..9cdea94bba5b525de4e267e4b5b1df60fbac6f29 100644 --- a/aiogremlin/abc.py +++ b/aiogremlin/abc.py @@ -47,3 +47,7 @@ class AbstractConnection(metaclass=ABCMeta): @abstractmethod def receive(self): pass + + @abstractmethod + def _receive(self): + pass diff --git a/aiogremlin/client.py b/aiogremlin/client.py index 9bcc20c1707167d91d234a3571d8c52c2342c8c5..938ecb8948d7249426e4d736f0ccfeaccc66640a 100644 --- a/aiogremlin/client.py +++ b/aiogremlin/client.py @@ -10,6 +10,30 @@ from aiogremlin.log import client_logger from aiogremlin.protocol import gremlin_response_parser, GremlinWriter +@asyncio.coroutine +def create_client(uri='ws://localhost:8182/', loop=None, ssl=None, + protocol=None, lang="gremlin-groovy", op="eval", + processor="", pool=None, factory=None, poolsize=10, + timeout=None, **kwargs): + pool = WebsocketPool(uri, + factory=factory, + poolsize=poolsize, + timeout=timeout, + loop=loop) + + yield from pool.init_pool() + + return GremlinClient(uri=uri, + loop=loop, + ssl=ssl, + protocol=protocol, + lang=lang, + op=op, + processor=processor, + pool=pool, + factory=factory) + + class GremlinClient: def __init__(self, uri='ws://localhost:8182/', loop=None, ssl=None, diff --git a/aiogremlin/connection.py b/aiogremlin/connection.py index 6797d7587c85f7f3c0837c153b251c7ad8c4b001..e2e81883f84c2f3454581ad18fea275e516f5c53 100644 --- a/aiogremlin/connection.py +++ b/aiogremlin/connection.py @@ -28,6 +28,13 @@ class WebsocketPool: if verbose: conn_logger.setLevel(INFO) + @asyncio.coroutine + def init_pool(self): + for i in range(self.poolsize): + conn = yield from self.factory.connect(self.uri, pool=self, + loop=self._loop) + self._put(conn) + @property def loop(self): return self._loop @@ -85,8 +92,7 @@ class WebsocketPool: if not self.pool.empty(): socket = self.pool.get_nowait() conn_logger.info("Reusing socket: {} at {}".format(socket, uri)) - elif (self.num_active_conns + self.num_connecting >= self.poolsize or - not self.poolsize): + elif self.num_active_conns + self.num_connecting >= self.poolsize: conn_logger.info("Waiting for socket...") socket = yield from asyncio.wait_for(self.pool.get(), self.timeout, loop=loop) @@ -229,7 +235,7 @@ class AiohttpConnection(BaseConnection): conn_logger.warn('Pong received') else: try: - if message.tp == aiohttp.MsgType.release: + if message.tp == aiohttp.MsgType.close: conn_logger.warn("Socket connection closed by server.") elif message.tp == aiohttp.MsgType.error: raise SocketClientError(self.socket.exception()) diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py new file mode 100644 index 0000000000000000000000000000000000000000..9da80d7c094cd8a071f914bb691f211278061e19 --- /dev/null +++ b/benchmarks/benchmark.py @@ -0,0 +1,44 @@ +"""Simple benchmark based on aiohttp benchmark client.""" + +import asyncio + +from aiogremlin import GremlinClient + + +@asyncio.coroutine +def attack(loop): + + client = GremlinClient(loop=loop, poolsize=10) + execute = client.execute + + processed_count = 0 + + @asyncio.coroutine + def drop_bomb(): + nonlocal processed_count + try: + t1 = loop.time() + resp = yield from execute("1 + 1") + assert resp[0].status_code == 200, resp[0].status_code + t2 = loop.time() + processed_count += 1 + except Exception: + print("an exception occurred {}".format(resp[0].status_code)) + + bombers = [] + append = bombers.append + async = asyncio.async + for i in range(10000): + bomber = async(drop_bomb()) + append(bomber) + + t1 = loop.time() + yield from asyncio.gather(*bombers, loop=loop) + t2 = loop.time() + rps = processed_count / (t2 - t1) + print("Benchmark complete: {} rps".format(rps)) + + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(attack(loop)) diff --git a/changes.txt b/changes.txt new file mode 100644 index 0000000000000000000000000000000000000000..c644f7954483aeb16107095a674d4cc0bd3a9662 --- /dev/null +++ b/changes.txt @@ -0,0 +1,2 @@ +0.0.1 - 4/2015: Birth! +0.0.2 - 5/1/2015: Added an init_pool method and a create_client constructor. diff --git a/setup.py b/setup.py index 5afcd199360e79933edb1a124753919b5ed5a3ab..3507070e3a7739b36b684a7137d32f68d9d4a1f3 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup( name="aiogremlin", - version="0.0.1", + version="0.0.2", url="", license="MIT", author="davebshow", diff --git a/tests/tests.py b/tests/tests.py index 8830efcc9e160685133ee0a13d1a8ff1bf8607cf..bd5c7ca0a4c4dbf783aaef611107ee1b178de446 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -5,9 +5,9 @@ import asyncio import itertools import unittest from aiogremlin import (GremlinClient, RequestError, GremlinServerError, - SocketClientError, WebsocketPool, AiohttpFactory) - + SocketClientError, WebsocketPool, AiohttpFactory, create_client) +# class GremlinClientTests(unittest.TestCase): def setUp(self): @@ -39,7 +39,7 @@ class GremlinClientTests(unittest.TestCase): sub1 = self.gc.execute("x + x", bindings={"x": 1}) sub2 = self.gc.execute("x + x", bindings={"x": 2}) sub3 = self.gc.execute("x + x", bindings={"x": 4}) - coro = asyncio.wait([asyncio.async(sub1, loop=self.loop), + coro = asyncio.gather(*[asyncio.async(sub1, loop=self.loop), asyncio.async(sub2, loop=self.loop), asyncio.async(sub3, loop=self.loop)], loop=self.loop) # Here I am looking for resource warnings. @@ -181,5 +181,19 @@ class WebsocketPoolTests(unittest.TestCase): self.loop.run_until_complete(conn()) +class CreateClientTests(unittest.TestCase): + + def test_pool_init(self): + @asyncio.coroutine + def go(loop): + gc = yield from create_client(poolsize=10, loop=loop) + self.assertEqual(gc.pool.pool.qsize(), 10) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) + loop.run_until_complete(go(loop)) + loop.close() + + if __name__ == "__main__": unittest.main()