Commit c0a80906 authored by davebshow's avatar davebshow
Browse files

refactoring

parent d1340c45
# aiogremlin 0.0.1 [(gizmo grew up)](https://pypi.python.org/pypi/gizmo/0.1.12)
`aiogremlin` is a **Python 3** driver for the the [Tinkerpop 3 Gremlin Server](http://www.tinkerpop.com/docs/3.0.0.M7/#gremlin-server). This module is built on [Asyncio](https://docs.python.org/3/library/asyncio.html). By default it uses the [aiohttp](http://aiohttp.readthedocs.org/en/v0.15.3/index.html) socket client implementation, but it is easy to plug in a different implementation. `aiogremlin` is currently in **alpha** mode, but all major functionality has test coverage.
`aiogremlin` is a **Python 3** driver for the the [Tinkerpop 3 Gremlin Server](http://www.tinkerpop.com/docs/3.0.0.M7/#gremlin-server). This module is built on [Asyncio](https://docs.python.org/3/library/asyncio.html). By default it uses the [aiohttp](http://aiohttp.readthedocs.org/en/v0.15.3/index.html) websocket client , but it is easy to plug in a different implementation. `aiogremlin` is currently in **alpha** mode, but all major functionality has test coverage.
## Getting started
......@@ -17,25 +17,57 @@ Fire up the Gremlin Server:
$ ./bin/gremlin-server.sh
```
The `GremlinClient` communicates asynchronously with the Gremlin Server using websockets. The client uses a combination of [asyncio.coroutine](https://docs.python.org/3/library/asyncio-task.html#coroutines) and [asyncio.Task](https://docs.python.org/3/library/asyncio-task.html#task) run on Asyncio's pluggable event loop to achieve this communication.
The `GremlinClient` communicates asynchronously with the Gremlin Server using websockets. The majority of `GremlinClient` methods are an `asyncio.coroutine`, so you will also need to use `asyncio`:
The majority of `GremlinClient` methods are an `asyncio.coroutine`, so you will also need to use either `asyncio` or the `aiogremlin` [Task API](#task-api). The following examples use `asyncio` to demonstrate the use of the AsyncioGremlineClient.
```python
>>> import asyncio
>>> from aiogremlin import GremlinClient
```
The Gremlin Server responds with messages in chunks, `GremlinClient.submit` submits a script to the server, and returns a `GremlinResponse` object. This object provides two methods: `get` and `stream`. `get` collects all of the response messages and returns them as a Python list. `stream` returns an object of the type `GremlinResponseStream` that implements a method `read`. This allows you to read the response without loading all of the messages into memory.
The Gremlin Server sends responses in chunks, so `GremlinClient.submit` returns a list of gremlin response objects:
```python
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> gc = GremlinClient('ws://localhost:8182/', loop=loop)
>>> submit = gc.submit("x + x", bindings={"x": 4})
>>> result = loop.run_until_complete(submit)
# Use get.
>>> @asyncio.coroutine
... def get(gc):
... resp = yield from gc.submit("x + x", bindings={"x": 4})
... result = yield from resp.get()
... return result
>>> result = loop.run_until_complete(get(gc))
>>> result
[[8]]
[Message(status_code=200, data=[8], message={}, metadata='')]
>>> resp = result[0]
>>> resp.status_code
200
>>> resp
[8]
>>> loop.run_until_complete(gc.close())
>>> resp # Named tuple.
Message(status_code=200, data=[8], message={}, metadata='')
>>> loop.run_until_complete(gc.close()) # Explicitly close client!!!
>>> loop.close()
# Use stream.
>>> @asyncio.coroutine
... def stream(gc):
... resp = yield from gc.submit("x + x", bindings={"x": 1})
... while True:
... result = yield from resp.stream.read()
... if result is None:
... break
... print(result)
>>> loop.run_until_complete(stream(gc))
Message(status_code=200, data=[2], message={}, metadata='')
```
For convenience, `aiogremlin` also provides a method `execute`, which is equivalent to calling `yield from submit()` and then `yield from get()` in the same coroutine.
```python
>>> loop = asyncio.get_event_loop()
>>> gc = GremlinClient('ws://localhost:8182/', loop=loop)
>>> execute = gc.execute("x + x", bindings={"x": 4})
>>> result = loop.run_until_complete(execute)
>>> result
[Message(status_code=200, data=[8], message={}, metadata='')]
```
from .abc import AbstractFactory, AbstractConnection
from .connection import WebsocketPool, AiohttpFactory
from .client import GremlinClient
from .contextmanager import GremlinContext
from .exceptions import RequestError, GremlinServerError, SocketClientError
__version__ = "0.0.1"
......@@ -45,5 +45,5 @@ class AbstractConnection(metaclass=ABCMeta):
pass
@abstractmethod
def recv(self):
def receive(self):
pass
"""
"""
"""Client for the Tinkerpop 3 Gremlin Server."""
import asyncio
import json
import ssl
......@@ -10,7 +10,7 @@ from aiogremlin.log import client_logger
from aiogremlin.protocol import gremlin_response_parser, GremlinWriter
class GremlinBase:
class GremlinClient:
def __init__(self, uri='ws://localhost:8182/', loop=None, ssl=None,
protocol=None, lang="gremlin-groovy", op="eval",
......@@ -37,8 +37,6 @@ class GremlinBase:
poolsize=poolsize, timeout=timeout, loop=self._loop)
self.factory = factory or self.pool.factory
class GremlinClient(GremlinBase):
@property
def loop(self):
return self._loop
......@@ -57,8 +55,8 @@ class GremlinClient(GremlinBase):
return connection
@asyncio.coroutine
def send(self, gremlin, connection=None, bindings=None, lang=None,
op=None, processor=None, binary=True):
def submit(self, gremlin, connection=None, bindings=None, lang=None,
op=None, processor=None, binary=True):
"""
"""
lang = lang or self.lang
......@@ -77,49 +75,53 @@ class GremlinClient(GremlinBase):
if connection is None:
connection = yield from self.pool.connect(self.uri, loop=self.loop)
writer = GremlinWriter(connection)
yield from writer.send(message, binary=binary)
return connection
connection = yield from writer.write(message, binary=binary)
return GremlinResponse(connection)
@asyncio.coroutine
def submit(self, gremlin, bindings=None, lang=None,
def execute(self, gremlin, bindings=None, lang=None,
op=None, processor=None, consumer=None, collect=True, **kwargs):
"""
"""
lang = lang or self.lang
op = op or self.op
processor = processor or self.processor
connection = yield from self.send(gremlin, bindings=bindings, lang=lang,
resp = yield from self.submit(gremlin, bindings=bindings, lang=lang,
op=op, processor=processor)
results = yield from self.run(connection, consumer=consumer,
collect=collect)
return results
return (yield from resp.get())
def s(self, *args, **kwargs):
"""
"""
if not kwargs.get("loop", ""):
kwargs["loop"] = self.loop
return async(self.submit, *args, **kwargs)
class GremlinResponse:
def __init__(self, conn):
self._stream = GremlinResponseStream(conn)
@property
def stream(self):
return self._stream
@asyncio.coroutine
def recv(self, connection):
"""
"""
return (yield from gremlin_response_parser(connection))
def get(self):
return (yield from self._run())
@asyncio.coroutine
def run(self, connection, consumer=None, collect=True):
def _run(self):
"""
"""
results = []
while True:
message = yield from self.recv(connection)
message = yield from self.stream.read()
if message is None:
break
if consumer:
message = consumer(message)
if asyncio.iscoroutine(message):
message = yield from asyncio.async(message, loop=self.loop)
if message and collect:
results.append(message)
results.append(message)
return results
class GremlinResponseStream:
def __init__(self, conn):
self.conn = conn
@asyncio.coroutine
def read(self):
return (yield from gremlin_response_parser(self.conn))
......@@ -36,6 +36,10 @@ class WebsocketPool:
def factory(self):
return self._factory
@property
def closed(self):
return self._closed
@property
def num_active_conns(self):
return len(self.active_conns)
......@@ -113,6 +117,8 @@ class WebsocketPool:
self.pool.put_nowait(socket)
except asyncio.QueueFull:
pass
# This should be - not working
# yield from socket.release()
class BaseFactory(AbstractFactory):
......@@ -150,6 +156,10 @@ class BaseConnection(AbstractConnection):
if self in self.pool.active_conns:
self.pool.feed_pool(self)
@asyncio.coroutine
def receive(self):
return (yield from parse_gremlin_response(self))
@asyncio.coroutine
def release(self):
try:
......@@ -196,7 +206,7 @@ class AiohttpConnection(BaseConnection):
raise
@asyncio.coroutine
def receive(self):
def _receive(self):
"""Implements a dispatcher using the aiohttp websocket protocol."""
while True:
try:
......
from contextlib import contextmanager
from aiogremlin.client import GremlinBase, GremlinClient
from aiogremlin.connection import WebsocketPool
class GremlinContext(GremlinBase):
# Untested.
@property
def client(self):
return self._client()
@property
def pool(self):
return self._pool()
@contextmanager
def _client(self):
client = GremlinClient(uri=self.uri, loop=self._loop, ssl=self.ssl,
protocol=self.protocol, lang=self.lang, op=self.op,
processor=self.processor, pool=self.pool, factory=self.factory,
poolsize=self.poolsize, timeout=self.timeout)
try:
yield client
finally:
yield from client.close()
@contextmanager
def _pool(self):
pool = WebsocketPool(uri=self.uri, loop=self._loop,
factory=self.factory, poolsize=self.poolsize, timeout=self.timeout)
try:
yield pool
finally:
yield from pool.close()
......@@ -13,7 +13,7 @@ Message = collections.namedtuple("Message", ["status_code", "data", "message",
@asyncio.coroutine
def gremlin_response_parser(connection):
message = yield from connection.receive()
message = yield from connection._receive()
message = json.loads(message)
message = Message(message["status"]["code"],
message["result"]["data"],
......@@ -26,7 +26,7 @@ def gremlin_response_parser(connection):
# Return None
else:
try:
if status_code < 500:
if message.status_code < 500:
raise RequestError(message.status_code, message.message)
else:
raise GremlinServerError(message.status_code, message.message)
......@@ -40,10 +40,11 @@ class GremlinWriter:
self._connection = connection
@asyncio.coroutine
def send(self, message, binary=True, mime_type="application/json"):
def write(self, message, binary=True, mime_type="application/json"):
if binary:
message = self._set_message_header(message, mime_type)
yield from self._connection.send(message, binary)
return self._connection
@staticmethod
def _set_message_header(message, mime_type):
......
......@@ -3,10 +3,9 @@
import asyncio
import itertools
import websockets
import unittest
from aiogremlin import (GremlinClient, RequestError,
GremlinServerError, SocketClientError, WebsocketPool, AiohttpFactory)
from aiogremlin import (GremlinClient, RequestError, GremlinServerError,
SocketClientError, WebsocketPool, AiohttpFactory)
class GremlinClientTests(unittest.TestCase):
......@@ -32,35 +31,53 @@ class GremlinClientTests(unittest.TestCase):
self.loop.run_until_complete(conn.close())
def test_sub(self):
sub = self.gc.submit("x + x", bindings={"x": 4})
results = self.loop.run_until_complete(sub)
execute = self.gc.execute("x + x", bindings={"x": 4})
results = self.loop.run_until_complete(execute)
self.assertEqual(results[0].data[0], 8)
def test_recv(self):
def test_sub_waitfor(self):
sub1 = self.gc.execute("x + x", bindings={"x": 1})
sub2 = self.gc.execute("x + x", bindings={"x": 2})
sub3 = self.gc.execute("x + x", bindings={"x": 4})
coro = asyncio.wait([asyncio.async(sub1, loop=self.loop),
asyncio.async(sub2, loop=self.loop),
asyncio.async(sub3, loop=self.loop)], loop=self.loop)
# Here I am looking for resource warnings.
results = self.loop.run_until_complete(coro)
self.assertIsNotNone(results)
def test_resp_stream(self):
@asyncio.coroutine
def recv_coro():
def stream_coro():
results = []
websocket = yield from self.gc.send("x + x", bindings={"x": 4})
resp = yield from self.gc.submit("x + x", bindings={"x": 4})
while True:
f = yield from self.gc.recv(websocket)
f = yield from resp.stream.read()
if f is None:
break
results.append(f)
self.assertEqual(results[0].data[0], 8)
self.loop.run_until_complete(stream_coro())
self.loop.run_until_complete(recv_coro())
def test_resp_get(self):
@asyncio.coroutine
def get_coro():
conn = yield from self.gc.submit("x + x", bindings={"x": 4})
results = yield from conn.get()
self.assertEqual(results[0].data[0], 8)
self.loop.run_until_complete(get_coro())
def test_submit_error(self):
sub = self.gc.submit("x + x g.asdfas", bindings={"x": 4})
def test_execute_error(self):
execute = self.gc.execute("x + x g.asdfas", bindings={"x": 4})
try:
self.loop.run_until_complete(sub)
self.loop.run_until_complete(execute)
error = False
except:
error = True
self.assertTrue(error)
class ConnectionManagerTests(unittest.TestCase):
class WebsocketPoolTests(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
......@@ -148,8 +165,6 @@ class ConnectionManagerTests(unittest.TestCase):
self.assertFalse(conn1.closed)
self.assertIsNotNone(conn2.socket)
self.assertFalse(conn2.closed)
# conn1.socket._closed = True
# conn2.socket._closed = True
yield from conn1.socket.close()
yield from conn2.socket.close()
self.assertTrue(conn2.closed)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment