Skip to content
Snippets Groups Projects
Commit aa7dbd82 authored by davebshow's avatar davebshow
Browse files

added pool init and client constructor, 0.0.2 release

parent c0a80906
No related branches found
No related tags found
No related merge requests found
# aiogremlin 0.0.1 [(gizmo grew up)](https://pypi.python.org/pypi/gizmo/0.1.12)
# aiogremlin 0.0.2 [(gizmo grew up)](https://pypi.python.org/pypi/gizmo/0.1.12)
`aiogremlin` is a **Python 3** driver for the the [Tinkerpop 3 Gremlin Server](http://www.tinkerpop.com/docs/3.0.0.M7/#gremlin-server). This module is built on [Asyncio](https://docs.python.org/3/library/asyncio.html). By default it uses the [aiohttp](http://aiohttp.readthedocs.org/en/v0.15.3/index.html) websocket client , but it is easy to plug in a different implementation. `aiogremlin` is currently in **alpha** mode, but all major functionality has test coverage.
......@@ -30,6 +30,8 @@ The Gremlin Server responds with messages in chunks, `GremlinClient.submit` subm
```python
>>> loop = asyncio.get_event_loop()
>>> gc = GremlinClient('ws://localhost:8182/', loop=loop)
# Or even better, use the constructor function. This will init pool connections.
>>> gc = create_client('ws://localhost:8182/', loop=loop)
# Use get.
>>> @asyncio.coroutine
......@@ -37,16 +39,17 @@ The Gremlin Server responds with messages in chunks, `GremlinClient.submit` subm
... resp = yield from gc.submit("x + x", bindings={"x": 4})
... result = yield from resp.get()
... return result
>>> result = loop.run_until_complete(get(gc))
>>> result
[Message(status_code=200, data=[8], message={}, metadata='')]
>>> resp = result[0]
>>> resp.status_code
200
>>> resp # Named tuple.
Message(status_code=200, data=[8], message={}, metadata='')
>>> loop.run_until_complete(gc.close()) # Explicitly close client!!!
>>> loop.close()
# Use stream.
>>> @asyncio.coroutine
......@@ -59,6 +62,9 @@ Message(status_code=200, data=[8], message={}, metadata='')
... print(result)
>>> loop.run_until_complete(stream(gc))
Message(status_code=200, data=[2], message={}, metadata='')
>>> loop.run_until_complete(gc.close()) # Explicitly close client!!!
>>> loop.close()
```
For convenience, `aiogremlin` also provides a method `execute`, which is equivalent to calling `yield from submit()` and then `yield from get()` in the same coroutine.
......
from .abc import AbstractFactory, AbstractConnection
from .connection import WebsocketPool, AiohttpFactory
from .client import GremlinClient
from .client import GremlinClient, create_client
from .exceptions import RequestError, GremlinServerError, SocketClientError
__version__ = "0.0.1"
__version__ = "0.0.2"
......@@ -47,3 +47,7 @@ class AbstractConnection(metaclass=ABCMeta):
@abstractmethod
def receive(self):
pass
@abstractmethod
def _receive(self):
pass
......@@ -10,6 +10,30 @@ from aiogremlin.log import client_logger
from aiogremlin.protocol import gremlin_response_parser, GremlinWriter
@asyncio.coroutine
def create_client(uri='ws://localhost:8182/', loop=None, ssl=None,
protocol=None, lang="gremlin-groovy", op="eval",
processor="", pool=None, factory=None, poolsize=10,
timeout=None, **kwargs):
pool = WebsocketPool(uri,
factory=factory,
poolsize=poolsize,
timeout=timeout,
loop=loop)
yield from pool.init_pool()
return GremlinClient(uri=uri,
loop=loop,
ssl=ssl,
protocol=protocol,
lang=lang,
op=op,
processor=processor,
pool=pool,
factory=factory)
class GremlinClient:
def __init__(self, uri='ws://localhost:8182/', loop=None, ssl=None,
......
......@@ -28,6 +28,13 @@ class WebsocketPool:
if verbose:
conn_logger.setLevel(INFO)
@asyncio.coroutine
def init_pool(self):
for i in range(self.poolsize):
conn = yield from self.factory.connect(self.uri, pool=self,
loop=self._loop)
self._put(conn)
@property
def loop(self):
return self._loop
......@@ -85,8 +92,7 @@ class WebsocketPool:
if not self.pool.empty():
socket = self.pool.get_nowait()
conn_logger.info("Reusing socket: {} at {}".format(socket, uri))
elif (self.num_active_conns + self.num_connecting >= self.poolsize or
not self.poolsize):
elif self.num_active_conns + self.num_connecting >= self.poolsize:
conn_logger.info("Waiting for socket...")
socket = yield from asyncio.wait_for(self.pool.get(),
self.timeout, loop=loop)
......@@ -229,7 +235,7 @@ class AiohttpConnection(BaseConnection):
conn_logger.warn('Pong received')
else:
try:
if message.tp == aiohttp.MsgType.release:
if message.tp == aiohttp.MsgType.close:
conn_logger.warn("Socket connection closed by server.")
elif message.tp == aiohttp.MsgType.error:
raise SocketClientError(self.socket.exception())
......
"""Simple benchmark based on aiohttp benchmark client."""
import asyncio
from aiogremlin import GremlinClient
@asyncio.coroutine
def attack(loop):
client = GremlinClient(loop=loop, poolsize=10)
execute = client.execute
processed_count = 0
@asyncio.coroutine
def drop_bomb():
nonlocal processed_count
try:
t1 = loop.time()
resp = yield from execute("1 + 1")
assert resp[0].status_code == 200, resp[0].status_code
t2 = loop.time()
processed_count += 1
except Exception:
print("an exception occurred {}".format(resp[0].status_code))
bombers = []
append = bombers.append
async = asyncio.async
for i in range(10000):
bomber = async(drop_bomb())
append(bomber)
t1 = loop.time()
yield from asyncio.gather(*bombers, loop=loop)
t2 = loop.time()
rps = processed_count / (t2 - t1)
print("Benchmark complete: {} rps".format(rps))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(attack(loop))
0.0.1 - 4/2015: Birth!
0.0.2 - 5/1/2015: Added an init_pool method and a create_client constructor.
......@@ -3,7 +3,7 @@ from setuptools import setup
setup(
name="aiogremlin",
version="0.0.1",
version="0.0.2",
url="",
license="MIT",
author="davebshow",
......
......@@ -5,9 +5,9 @@ import asyncio
import itertools
import unittest
from aiogremlin import (GremlinClient, RequestError, GremlinServerError,
SocketClientError, WebsocketPool, AiohttpFactory)
SocketClientError, WebsocketPool, AiohttpFactory, create_client)
#
class GremlinClientTests(unittest.TestCase):
def setUp(self):
......@@ -39,7 +39,7 @@ class GremlinClientTests(unittest.TestCase):
sub1 = self.gc.execute("x + x", bindings={"x": 1})
sub2 = self.gc.execute("x + x", bindings={"x": 2})
sub3 = self.gc.execute("x + x", bindings={"x": 4})
coro = asyncio.wait([asyncio.async(sub1, loop=self.loop),
coro = asyncio.gather(*[asyncio.async(sub1, loop=self.loop),
asyncio.async(sub2, loop=self.loop),
asyncio.async(sub3, loop=self.loop)], loop=self.loop)
# Here I am looking for resource warnings.
......@@ -181,5 +181,19 @@ class WebsocketPoolTests(unittest.TestCase):
self.loop.run_until_complete(conn())
class CreateClientTests(unittest.TestCase):
def test_pool_init(self):
@asyncio.coroutine
def go(loop):
gc = yield from create_client(poolsize=10, loop=loop)
self.assertEqual(gc.pool.pool.qsize(), 10)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)
loop.run_until_complete(go(loop))
loop.close()
if __name__ == "__main__":
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment