Commit bbc0affd authored by davebshow's avatar davebshow
Browse files

added context manager, full integration with aiohttp, added http client session

parent f9666b9c
# aiogremlin 0.0.6 [(gizmo grew up)](https://pypi.python.org/pypi/gizmo/0.1.12)
`aiogremlin` is a **Python 3** driver for the the [Tinkerpop 3 Gremlin Server](http://www.tinkerpop.com/docs/3.0.0.M7/#gremlin-server). This module is built on [Asyncio](https://docs.python.org/3/library/asyncio.html). By default it uses the [aiohttp](http://aiohttp.readthedocs.org/en/v0.15.3/index.html) websocket client , but it is easy to plug in a different implementation. `aiogremlin` is currently in **alpha** mode, but all major functionality has test coverage.
`aiogremlin` is a **Python 3** driver for the the [Tinkerpop 3 Gremlin Server](http://www.tinkerpop.com/docs/3.0.0.M7/#gremlin-server). This module is built on [Asyncio](https://docs.python.org/3/library/asyncio.html) and [aiohttp](http://aiohttp.readthedocs.org/en/v0.15.3/index.html) `aiogremlin` is currently in **alpha** mode, but all major functionality has test coverage.
## Getting started
......@@ -26,13 +26,15 @@ The `GremlinClient` communicates asynchronously with the Gremlin Server using we
The Gremlin Server responds with messages in chunks, `GremlinClient.submit` submits a script to the server, and returns a `GremlinResponse` object. This object provides the methods: `get` and the property `stream`. `get` collects all of the response messages and returns them as a Python list. `stream` returns an object of the type `GremlinResponseStream` that implements a method `read`. This allows you to read the response without loading all of the messages into memory.
Note that the GremlinClient constructor and the create_client function take [keyword only arguments](https://www.python.org/dev/peps/pep-3102/) only!
```python
>>> loop = asyncio.get_event_loop()
>>> gc = GremlinClient('ws://localhost:8182/', loop=loop)
>>> gc = GremlinClient(url='ws://localhost:8182/', loop=loop) # Default url
# Or even better, use the constructor function. This will init pool connections.
# ***Must be done inside a coroutine***
>>> gc = yield from create_client('ws://localhost:8182/', loop=loop)
>>> gc = yield from create_client(loop=loop)
# Or outside of a coroutine using the loop to help
>>> gc = loop.run_until_complete(create_client(loop=loop))
......@@ -74,9 +76,30 @@ For convenience, `aiogremlin` also provides a method `execute`, which is equival
```python
>>> loop = asyncio.get_event_loop()
>>> gc = GremlinClient('ws://localhost:8182/', loop=loop)
>>> gc = GremlinClient(loop=loop)
>>> execute = gc.execute("x + x", bindings={"x": 4})
>>> result = loop.run_until_complete(execute)
>>> result
[Message(status_code=200, data=[8], message={}, metadata='')]
>>> loop.run_until_complete(gc.close()) # Explicitly close client!!!
>>> loop.close()
```
To avoid the explicit client close, `aiogremlin` provides a class called `ConnectionContextManager`. To create an instance of `ConnectionContextManager`, get a connection from a `WebSocketPool` instance as follows:
```python
>>> loop = asyncio.get_event_loop()
# Note that url is a required positional param here!
# Pool does not open any connections until requested.
>>> pool = aiogremlin.WebSocketPool('ws://localhost:8182/')
>>> @asyncio.coroutine
... def go(pool, loop):
... with (yield from pool) as conn:
... gc = aiogremlin.GremlinClient(connection=conn, loop=loop)
... resp = yield from gc.execute("1 + 1")
... return resp
>>> result = loop.run_until_complete(go(pool, loop))
>>> result
[Message(status_code=200, data=[2], message={}, metadata='')]
>>> loop.close() # Close loop, but client connections will be closed.
```
from .abc import AbstractFactory, AbstractConnection
from .connection import (AiohttpFactory, BaseFactory, BaseConnection,
WebSocketSession)
from .client import (create_client, GremlinClient, GremlinResponse,
GremlinResponseStream)
from .exceptions import RequestError, GremlinServerError, SocketClientError
from .pool import WebSocketPool
from .protocol import GremlinWriter
__version__ = "0.0.6"
from .connection import *
from .client import *
from .exceptions import *
from .pool import *
from .protocol import *
__version__ = "0.0.7"
"""Abstract classes for creating pluggable websocket clients."""
from abc import ABCMeta, abstractmethod
class AbstractFactory(metaclass=ABCMeta):
@abstractmethod
def ws_connect(cls):
pass
class AbstractConnection(metaclass=ABCMeta):
@property
@abstractmethod
def closed(self):
pass
@abstractmethod
def close():
pass
@abstractmethod
def _close():
pass
@abstractmethod
def send(self):
pass
@abstractmethod
def receive(self):
pass
......@@ -5,30 +5,41 @@ import ssl
import aiohttp
from aiogremlin.connection import AiohttpFactory
from aiogremlin.connection import (GremlinFactory, WebSocketSession,
GremlinClientWebSocketResponse)
from aiogremlin.exceptions import RequestError
from aiogremlin.log import logger, INFO
from aiogremlin.pool import WebSocketPool
from aiogremlin.protocol import gremlin_response_parser, GremlinWriter
__all__ = ("create_client", "GremlinClient", "GremlinResponse",
"GremlinResponseStream")
@asyncio.coroutine
def create_client(uri='ws://localhost:8182/', loop=None, ssl=None,
def create_client(*, url='ws://localhost:8182/', loop=None,
protocol=None, lang="gremlin-groovy", op="eval",
processor="", pool=None, factory=None, poolsize=10,
timeout=None, verbose=False, **kwargs):
pool = WebSocketPool(uri,
factory=factory,
poolsize=poolsize,
timeout=timeout,
loop=loop,
verbose=verbose)
timeout=None, verbose=False, fill_pool=True, connector=None):
if factory is None:
factory = WebSocketSession(connector=connector,
ws_response_class=GremlinClientWebSocketResponse,
loop=loop)
yield from pool.fill_pool()
if pool is None:
pool = WebSocketPool(url,
factory=factory,
poolsize=poolsize,
timeout=timeout,
loop=loop,
verbose=verbose)
return GremlinClient(uri=uri,
if fill_pool:
yield from pool.fill_pool()
return GremlinClient(url=url,
loop=loop,
ssl=ssl,
protocol=protocol,
lang=lang,
op=op,
......@@ -40,21 +51,16 @@ def create_client(uri='ws://localhost:8182/', loop=None, ssl=None,
class GremlinClient:
def __init__(self, uri='ws://localhost:8182/', loop=None, ssl=None,
def __init__(self, url='ws://localhost:8182/', loop=None,
protocol=None, lang="gremlin-groovy", op="eval",
processor="", pool=None, factory=None, poolsize=10,
timeout=None, verbose=False, **kwargs):
timeout=None, verbose=False, connector=None,
connection=None):
"""
"""
self.uri = uri
self.url = url
self.ssl = ssl
self.protocol = protocol
# if self.ssl:
# protocol = protocol or ssl.PROTOCOL_TLSv1
# ssl_context = ssl.SSLContext(protocol)
# ssl_context.load_verify_locations(ssl)
# ssl_context.verify_mode = ssl.CERT_REQUIRED
# self.ssl_context = ssl_context # This will go to conn pool... use TCPConnector?
self._loop = loop or asyncio.get_event_loop()
self.lang = lang or "gremlin-groovy"
self.op = op or "eval"
......@@ -62,12 +68,18 @@ class GremlinClient:
self.poolsize = poolsize
self.timeout = timeout
self._pool = pool
self._factory = factory or AiohttpFactory()
if self._pool is None:
self._connector = connector
self._factory = factory or GremlinFactory(connector=self._connector)
self._conn = connection
if self._pool is None and self._conn is None:
self._connected = False
self._conn = asyncio.async(self._connect(), loop=self._loop)
else:
elif pool is not None:
self._connected = self._pool._connected
elif self._conn is not None and not connection._closed:
self._connected = True
else:
self._connected = False
if verbose:
logger.setLevel(INFO)
......@@ -86,16 +98,16 @@ class GremlinClient:
self._connected = False
@asyncio.coroutine
def _connect(self, **kwargs):
def _connect(self):
"""
"""
loop = kwargs.get("loop", "") or self._loop
connection = yield from self._factory.ws_connect(self.uri, loop=loop)
connection = yield from self._factory.ws_connect(self.url,
loop=self._loop)
self._connected = True
return connection
@asyncio.coroutine
def _acquire(self, **kwargs):
def _acquire(self):
if self._pool:
conn = yield from self._pool.acquire()
elif self._connected:
......@@ -111,26 +123,26 @@ class GremlinClient:
return conn
@asyncio.coroutine
def submit(self, gremlin, conn=None, bindings=None, lang=None, op=None,
processor=None, session=None, binary=True):
def submit(self, gremlin, *, connection=None, bindings=None, lang=None,
op=None, processor=None, session=None, binary=True):
"""
"""
lang = lang or self.lang
op = op or self.op
processor = processor or self.processor
if conn is None:
conn = yield from self._acquire()
writer = GremlinWriter(conn)
conn = writer.write(gremlin, bindings=bindings,
lang=lang, op=op, processor=processor, session=session,
binary=binary)
return GremlinResponse(conn,
if connection is None:
connection = yield from self._acquire()
writer = GremlinWriter(connection)
connection = writer.write(gremlin, bindings=bindings, lang=lang, op=op,
processor=processor, session=session,
binary=binary)
return GremlinResponse(connection,
pool=self._pool,
session=session,
loop=self._loop)
@asyncio.coroutine
def execute(self, gremlin, bindings=None, lang=None,
def execute(self, gremlin, *, bindings=None, lang=None,
op=None, processor=None, consumer=None, collect=True, **kwargs):
"""
"""
......@@ -138,13 +150,13 @@ class GremlinClient:
op = op or self.op
processor = processor or self.processor
resp = yield from self.submit(gremlin, bindings=bindings, lang=lang,
op=op, processor=processor)
op=op, processor=processor)
return (yield from resp.get())
class GremlinResponse:
def __init__(self, conn, pool=None, session=None, loop=None):
def __init__(self, conn, *, pool=None, session=None, loop=None):
self._loop = loop or asyncio.get_event_loop()
self._session = session
self._stream = GremlinResponseStream(conn, pool=pool, loop=self._loop)
......
......@@ -15,16 +15,33 @@ from aiohttp.websocket import (MSG_BINARY, MSG_TEXT, MSG_CLOSE, MSG_PING,
from aiohttp.websocket_client import (MsgType, closedMessage,
ClientWebSocketResponse)
from aiogremlin.abc import AbstractFactory, AbstractConnection
from aiogremlin.exceptions import SocketClientError
from aiogremlin.log import INFO, logger
__all__ = ('WebSocketSession', 'GremlinFactory',
'GremlinClientWebSocketResponse')
# Basically cut and paste from aiohttp until merge/release of #374
class WebSocketSession(ClientSession):
def __init__(self, *, connector=None, loop=None,
cookies=None, headers=None, auth=None,
ws_response_class=None):
super().__init__(connector=connector, loop=loop,
cookies=cookies, headers=headers, auth=auth)
self._ws_response_class = ws_response_class
class WebSocketSession(AbstractFactory, ClientSession):
@asyncio.coroutine
def ws_connect(self, url, protocols=(), timeout=10.0, connector=None,
response_class=None, autoclose=True, autoping=True,
def ws_connect(self, url, *,
protocols=(),
timeout=10.0,
autoclose=True,
autoping=True,
ws_response_class=None,
loop=None):
"""Initiate websocket connection."""
......@@ -41,7 +58,7 @@ class WebSocketSession(AbstractFactory, ClientSession):
# send request
resp = yield from self.request('get', url, headers=headers,
read_until_eof=False)
read_until_eof=False)
# check handshake
if resp.status != 101:
......@@ -73,10 +90,11 @@ class WebSocketSession(AbstractFactory, ClientSession):
reader = resp.connection.reader.set_parser(WebSocketParser)
writer = WebSocketWriter(resp.connection.writer, use_mask=True)
if response_class is None:
response_class = ClientWebSocketResponse
if ws_response_class is None:
ws_response_class = (self._ws_response_class or
ClientWebSocketResponse)
return response_class(
return ws_response_class(
reader, writer, protocol, resp, timeout, autoclose, autoping, loop)
def detach(self):
......@@ -86,8 +104,9 @@ class WebSocketSession(AbstractFactory, ClientSession):
self._connector = None
def ws_connect(url, protocols=(), timeout=10.0, connector=None,
response_class=None, autoclose=True, autoping=True,
# Cut and paste from aiohttp until merge/release of #374
def ws_connect(url, *, protocols=(), timeout=10.0, connector=None,
ws_response_class=None, autoclose=True, autoping=True,
loop=None):
if loop is None:
asyncio.get_event_loop()
......@@ -95,13 +114,11 @@ def ws_connect(url, protocols=(), timeout=10.0, connector=None,
connector = TCPConnector(loop=loop, force_close=True)
ws_session = WebSocketSession(loop=loop, connector=connector)
try:
resp = yield from ws_session.ws_connect(url,
protocols=protocols,
timeout=timeout,
connector=connector,
response_class=response_class,
ws_response_class=ws_response_class,
autoclose=autoclose,
autoping=autoping,
loop=loop)
......@@ -111,51 +128,39 @@ def ws_connect(url, protocols=(), timeout=10.0, connector=None,
ws_session.detach()
# Will drop 'pluggable sockets' implementation in favour of aiohttp default.
class BaseFactory(AbstractFactory):
class GremlinFactory:
@property
def factory(self):
return self
class AiohttpFactory(BaseFactory):
def __init__(self, connector=None):
self._connector = connector
@asyncio.coroutine
def ws_connect(cls, uri='ws://localhost:8182/', protocols=(),
def ws_connect(self, url='ws://localhost:8182/', protocols=(),
connector=None, autoclose=False, autoping=True,
response_class=None, loop=None):
if response_class is None:
response_class = GremlinClientWebSocketResponse
ws_response_class=None, loop=None):
if connector is None:
connector = self._connector
if ws_response_class is None:
ws_response_class = GremlinClientWebSocketResponse
try:
return (yield from ws_connect(
uri, protocols=protocols, connector=connector,
response_class=response_class, autoclose=True, autoping=True,
url, protocols=protocols, connector=connector,
ws_response_class=ws_response_class, autoclose=True, autoping=True,
loop=loop))
except WSServerHandshakeError as e:
raise SocketClientError(e.message)
class BaseConnection(AbstractConnection):
def __init__(self, loop=None):
self._loop = loop or asyncio.get_event_loop()
self._parser = StreamParser(
buf=DataQueue(loop=self._loop), loop=self._loop)
@property
def parser(self):
return self._parser
class GremlinClientWebSocketResponse(BaseConnection, ClientWebSocketResponse):
class GremlinClientWebSocketResponse(ClientWebSocketResponse):
def __init__(self, reader, writer, protocol, response, timeout, autoclose,
autoping, loop):
BaseConnection.__init__(self, loop=loop)
ClientWebSocketResponse.__init__(self, reader, writer, protocol,
response, timeout, autoclose, autoping, loop)
self._parser = StreamParser(buf=DataQueue(loop=loop), loop=loop)
@property
def parser(self):
return self._parser
@property
def closed(self):
......
class ConnectionContextManager:
__slots__ = ("_conn", "_pool")
__slots__ = ("_conn")
def __init__(self, conn, pool):
def __init__(self, conn):
self._conn = conn
self._pool = pool
def __enter__(self):
return self._conn
......@@ -15,4 +14,3 @@ class ConnectionContextManager:
self._conn._close()
finally:
self._conn = None
self._pool = None
......@@ -2,6 +2,9 @@
Gremlin Server exceptions.
"""
__all__ = ("RequestError", "GremlinServerError", "SocketClientError")
class SocketClientError(IOError): pass
......
import asyncio
from aiogremlin.connection import (AiohttpFactory,
from aiogremlin.connection import (GremlinFactory,
GremlinClientWebSocketResponse)
from aiogremlin.contextmanager import ConnectionContextManager
from aiogremlin.log import logger
def create_pool():
pass
__all__ = ("WebSocketPool",)
class WebSocketPool:
def __init__(self, url='ws://localhost:8182/', factory=None, poolsize=10,
def __init__(self, url, *, factory=None, poolsize=10, connector=None,
max_retries=10, timeout=None, loop=None, verbose=False,
response_class=None):
ws_response_class=None):
"""
"""
self.url = url
self._factory = factory or AiohttpFactory
self._factory = factory or GremlinFactory(connector=connector)
self.poolsize = poolsize
self.max_retries = max_retries
self.timeout = timeout
......@@ -27,7 +25,8 @@ class WebSocketPool:
self._pool = asyncio.Queue(maxsize=self.poolsize, loop=self._loop)
self.active_conns = set()
self.num_connecting = 0
self._response_class = response_class or GremlinClientWebSocketResponse
self._response_class = (ws_response_class or
GremlinClientWebSocketResponse)
self._closed = False
if verbose:
logger.setLevel(INFO)
......@@ -37,8 +36,11 @@ class WebSocketPool:
tasks = []
poolsize = self.poolsize
for i in range(poolsize):
task = asyncio.async(self.factory.ws_connect(self.url,
response_class=self._response_class, loop=self._loop), loop=self._loop)
coro = self.factory.ws_connect(
self.url,
ws_response_class=self._response_class,
loop=self._loop)
task = asyncio.async(coro, loop=self._loop)
tasks.append(task)
for f in asyncio.as_completed(tasks, loop=self._loop):
conn = yield from f
......@@ -78,18 +80,14 @@ class WebSocketPool:
@asyncio.coroutine
def _close_active_conns(self):
tasks = [asyncio.async(conn.close(), loop=self.loop) for conn
in self.active_conns]
in self.active_conns]
yield from asyncio.wait(tasks, loop=self.loop)
@asyncio.coroutine
def _purge_pool(self):
while True:
try:
conn = self._pool.get_nowait()
except asyncio.QueueEmpty:
break
else:
yield from conn.close()
while not self._pool.empty():
conn = self._pool.get_nowait()
yield from conn.close()
@asyncio.coroutine
def acquire(self, url=None, loop=None, num_retries=None):
......@@ -105,18 +103,20 @@ class WebSocketPool:
elif self.num_active_conns + self.num_connecting >= self.poolsize:
logger.info("Waiting for socket...")
socket = yield from asyncio.wait_for(self._pool.get(),
self.timeout, loop=loop)
self.timeout, loop=loop)
logger.info("Socket acquired: {} at {}".format(socket, url))
else:
self.num_connecting += 1
try:
socket = yield from self.factory.ws_connect(url,
response_class=self._response_class, loop=loop)
socket = yield from self.factory.ws_connect(
url,
ws_response_class=self._response_class,
loop=loop)
finally:
self.num_connecting -= 1
if not socket.closed:
logger.info("New connection on socket: {} at {}".format(
socket, url))
socket, url))
self.active_conns.add(socket)
# Untested.
elif num_retries > 0:
......@@ -144,4 +144,4 @@ class WebSocketPool:
def __iter__(self):
conn = yield from self.acquire()
return ConnectionContextManager(conn, self)
return ConnectionContextManager(conn)
......@@ -12,6 +12,8 @@ except ImportError:
from aiogremlin.exceptions import RequestError, GremlinServerError
from aiogremlin.log import logger
__all__ = ("GremlinWriter",)
Message = collections.namedtuple("Message", ["status_code", "data", "message",
"metadata"])
......@@ -53,8 +55,12 @@ class GremlinWriter:
def write(self, gremlin, bindings=None, lang="gremlin-groovy", op="eval",
processor="", session=None, binary=True,
mime_type="application/json"):
message = self._prepare_message(gremlin, bindings=bindings,
lang=lang, op=op, processor=processor, session=session)
message = self._prepare_message(gremlin,
bindings=bindings,
lang=lang,
op=op,
processor=processor,
session=session)
message = json.dumps(message)
if binary:
message = self._set_message_header(message, mime_type)
......@@ -71,8 +77,8 @@ class GremlinWriter:
return b"".join([mime_len, mime_type, bytes(message, "utf-8")])
@staticmethod
def _prepare_message(gremlin, bindings=None, lang="gremlin-groovy", op="eval",
processor="", session=None):
def _prepare_message(gremlin, bindings=None, lang="gremlin-groovy",
op="eval", processor="", session=None):