From c0a8090688933f93d0f4d8c62a13c9649daabd84 Mon Sep 17 00:00:00 2001 From: davebshow <davebshow@gmail.com> Date: Wed, 29 Apr 2015 21:09:23 -0400 Subject: [PATCH] refactoring --- README.md | 54 +++++++++++++++++++++++------ aiogremlin/__init__.py | 1 - aiogremlin/abc.py | 2 +- aiogremlin/client.py | 66 +++++++++++++++++++----------------- aiogremlin/connection.py | 12 ++++++- aiogremlin/contextmanager.py | 34 ------------------- aiogremlin/protocol.py | 7 ++-- tests/tests.py | 47 ++++++++++++++++--------- 8 files changed, 124 insertions(+), 99 deletions(-) delete mode 100644 aiogremlin/contextmanager.py diff --git a/README.md b/README.md index 8f5db31..785e0eb 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # aiogremlin 0.0.1 [(gizmo grew up)](https://pypi.python.org/pypi/gizmo/0.1.12) -`aiogremlin` is a **Python 3** driver for the the [Tinkerpop 3 Gremlin Server](http://www.tinkerpop.com/docs/3.0.0.M7/#gremlin-server). This module is built on [Asyncio](https://docs.python.org/3/library/asyncio.html). By default it uses the [aiohttp](http://aiohttp.readthedocs.org/en/v0.15.3/index.html) socket client implementation, but it is easy to plug in a different implementation. `aiogremlin` is currently in **alpha** mode, but all major functionality has test coverage. +`aiogremlin` is a **Python 3** driver for the the [Tinkerpop 3 Gremlin Server](http://www.tinkerpop.com/docs/3.0.0.M7/#gremlin-server). This module is built on [Asyncio](https://docs.python.org/3/library/asyncio.html). By default it uses the [aiohttp](http://aiohttp.readthedocs.org/en/v0.15.3/index.html) websocket client , but it is easy to plug in a different implementation. `aiogremlin` is currently in **alpha** mode, but all major functionality has test coverage. ## Getting started @@ -17,25 +17,57 @@ Fire up the Gremlin Server: $ ./bin/gremlin-server.sh ``` -The `GremlinClient` communicates asynchronously with the Gremlin Server using websockets. The client uses a combination of [asyncio.coroutine](https://docs.python.org/3/library/asyncio-task.html#coroutines) and [asyncio.Task](https://docs.python.org/3/library/asyncio-task.html#task) run on Asyncio's pluggable event loop to achieve this communication. +The `GremlinClient` communicates asynchronously with the Gremlin Server using websockets. The majority of `GremlinClient` methods are an `asyncio.coroutine`, so you will also need to use `asyncio`: -The majority of `GremlinClient` methods are an `asyncio.coroutine`, so you will also need to use either `asyncio` or the `aiogremlin` [Task API](#task-api). The following examples use `asyncio` to demonstrate the use of the AsyncioGremlineClient. +```python +>>> import asyncio +>>> from aiogremlin import GremlinClient +``` + +The Gremlin Server responds with messages in chunks, `GremlinClient.submit` submits a script to the server, and returns a `GremlinResponse` object. This object provides two methods: `get` and `stream`. `get` collects all of the response messages and returns them as a Python list. `stream` returns an object of the type `GremlinResponseStream` that implements a method `read`. This allows you to read the response without loading all of the messages into memory. -The Gremlin Server sends responses in chunks, so `GremlinClient.submit` returns a list of gremlin response objects: ```python ->>> import asyncio >>> loop = asyncio.get_event_loop() >>> gc = GremlinClient('ws://localhost:8182/', loop=loop) ->>> submit = gc.submit("x + x", bindings={"x": 4}) ->>> result = loop.run_until_complete(submit) + +# Use get. +>>> @asyncio.coroutine +... def get(gc): +... resp = yield from gc.submit("x + x", bindings={"x": 4}) +... result = yield from resp.get() +... return result +>>> result = loop.run_until_complete(get(gc)) >>> result -[[8]] +[Message(status_code=200, data=[8], message={}, metadata='')] >>> resp = result[0] >>> resp.status_code 200 ->>> resp -[8] ->>> loop.run_until_complete(gc.close()) +>>> resp # Named tuple. +Message(status_code=200, data=[8], message={}, metadata='') +>>> loop.run_until_complete(gc.close()) # Explicitly close client!!! >>> loop.close() + +# Use stream. +>>> @asyncio.coroutine +... def stream(gc): +... resp = yield from gc.submit("x + x", bindings={"x": 1}) +... while True: +... result = yield from resp.stream.read() +... if result is None: +... break +... print(result) +>>> loop.run_until_complete(stream(gc)) +Message(status_code=200, data=[2], message={}, metadata='') +``` + +For convenience, `aiogremlin` also provides a method `execute`, which is equivalent to calling `yield from submit()` and then `yield from get()` in the same coroutine. + +```python +>>> loop = asyncio.get_event_loop() +>>> gc = GremlinClient('ws://localhost:8182/', loop=loop) +>>> execute = gc.execute("x + x", bindings={"x": 4}) +>>> result = loop.run_until_complete(execute) +>>> result +[Message(status_code=200, data=[8], message={}, metadata='')] ``` diff --git a/aiogremlin/__init__.py b/aiogremlin/__init__.py index 6e0fcad..a59a7b8 100644 --- a/aiogremlin/__init__.py +++ b/aiogremlin/__init__.py @@ -1,6 +1,5 @@ from .abc import AbstractFactory, AbstractConnection from .connection import WebsocketPool, AiohttpFactory from .client import GremlinClient -from .contextmanager import GremlinContext from .exceptions import RequestError, GremlinServerError, SocketClientError __version__ = "0.0.1" diff --git a/aiogremlin/abc.py b/aiogremlin/abc.py index c3ac714..4f4fc14 100644 --- a/aiogremlin/abc.py +++ b/aiogremlin/abc.py @@ -45,5 +45,5 @@ class AbstractConnection(metaclass=ABCMeta): pass @abstractmethod - def recv(self): + def receive(self): pass diff --git a/aiogremlin/client.py b/aiogremlin/client.py index a2c6025..9bcc20c 100644 --- a/aiogremlin/client.py +++ b/aiogremlin/client.py @@ -1,5 +1,5 @@ -""" -""" +"""Client for the Tinkerpop 3 Gremlin Server.""" + import asyncio import json import ssl @@ -10,7 +10,7 @@ from aiogremlin.log import client_logger from aiogremlin.protocol import gremlin_response_parser, GremlinWriter -class GremlinBase: +class GremlinClient: def __init__(self, uri='ws://localhost:8182/', loop=None, ssl=None, protocol=None, lang="gremlin-groovy", op="eval", @@ -37,8 +37,6 @@ class GremlinBase: poolsize=poolsize, timeout=timeout, loop=self._loop) self.factory = factory or self.pool.factory -class GremlinClient(GremlinBase): - @property def loop(self): return self._loop @@ -57,8 +55,8 @@ class GremlinClient(GremlinBase): return connection @asyncio.coroutine - def send(self, gremlin, connection=None, bindings=None, lang=None, - op=None, processor=None, binary=True): + def submit(self, gremlin, connection=None, bindings=None, lang=None, + op=None, processor=None, binary=True): """ """ lang = lang or self.lang @@ -77,49 +75,53 @@ class GremlinClient(GremlinBase): if connection is None: connection = yield from self.pool.connect(self.uri, loop=self.loop) writer = GremlinWriter(connection) - yield from writer.send(message, binary=binary) - return connection + connection = yield from writer.write(message, binary=binary) + return GremlinResponse(connection) @asyncio.coroutine - def submit(self, gremlin, bindings=None, lang=None, + def execute(self, gremlin, bindings=None, lang=None, op=None, processor=None, consumer=None, collect=True, **kwargs): """ """ lang = lang or self.lang op = op or self.op processor = processor or self.processor - connection = yield from self.send(gremlin, bindings=bindings, lang=lang, + resp = yield from self.submit(gremlin, bindings=bindings, lang=lang, op=op, processor=processor) - results = yield from self.run(connection, consumer=consumer, - collect=collect) - return results + return (yield from resp.get()) - def s(self, *args, **kwargs): - """ - """ - if not kwargs.get("loop", ""): - kwargs["loop"] = self.loop - return async(self.submit, *args, **kwargs) + +class GremlinResponse: + + def __init__(self, conn): + self._stream = GremlinResponseStream(conn) + + @property + def stream(self): + return self._stream @asyncio.coroutine - def recv(self, connection): - """ - """ - return (yield from gremlin_response_parser(connection)) + def get(self): + return (yield from self._run()) @asyncio.coroutine - def run(self, connection, consumer=None, collect=True): + def _run(self): """ """ results = [] while True: - message = yield from self.recv(connection) + message = yield from self.stream.read() if message is None: break - if consumer: - message = consumer(message) - if asyncio.iscoroutine(message): - message = yield from asyncio.async(message, loop=self.loop) - if message and collect: - results.append(message) + results.append(message) return results + + +class GremlinResponseStream: + + def __init__(self, conn): + self.conn = conn + + @asyncio.coroutine + def read(self): + return (yield from gremlin_response_parser(self.conn)) diff --git a/aiogremlin/connection.py b/aiogremlin/connection.py index 0b74947..6797d75 100644 --- a/aiogremlin/connection.py +++ b/aiogremlin/connection.py @@ -36,6 +36,10 @@ class WebsocketPool: def factory(self): return self._factory + @property + def closed(self): + return self._closed + @property def num_active_conns(self): return len(self.active_conns) @@ -113,6 +117,8 @@ class WebsocketPool: self.pool.put_nowait(socket) except asyncio.QueueFull: pass + # This should be - not working + # yield from socket.release() class BaseFactory(AbstractFactory): @@ -150,6 +156,10 @@ class BaseConnection(AbstractConnection): if self in self.pool.active_conns: self.pool.feed_pool(self) + @asyncio.coroutine + def receive(self): + return (yield from parse_gremlin_response(self)) + @asyncio.coroutine def release(self): try: @@ -196,7 +206,7 @@ class AiohttpConnection(BaseConnection): raise @asyncio.coroutine - def receive(self): + def _receive(self): """Implements a dispatcher using the aiohttp websocket protocol.""" while True: try: diff --git a/aiogremlin/contextmanager.py b/aiogremlin/contextmanager.py deleted file mode 100644 index b8ac97c..0000000 --- a/aiogremlin/contextmanager.py +++ /dev/null @@ -1,34 +0,0 @@ -from contextlib import contextmanager -from aiogremlin.client import GremlinBase, GremlinClient -from aiogremlin.connection import WebsocketPool - - -class GremlinContext(GremlinBase): - # Untested. - @property - def client(self): - return self._client() - - @property - def pool(self): - return self._pool() - - @contextmanager - def _client(self): - client = GremlinClient(uri=self.uri, loop=self._loop, ssl=self.ssl, - protocol=self.protocol, lang=self.lang, op=self.op, - processor=self.processor, pool=self.pool, factory=self.factory, - poolsize=self.poolsize, timeout=self.timeout) - try: - yield client - finally: - yield from client.close() - - @contextmanager - def _pool(self): - pool = WebsocketPool(uri=self.uri, loop=self._loop, - factory=self.factory, poolsize=self.poolsize, timeout=self.timeout) - try: - yield pool - finally: - yield from pool.close() diff --git a/aiogremlin/protocol.py b/aiogremlin/protocol.py index 0b8c641..c6afc07 100644 --- a/aiogremlin/protocol.py +++ b/aiogremlin/protocol.py @@ -13,7 +13,7 @@ Message = collections.namedtuple("Message", ["status_code", "data", "message", @asyncio.coroutine def gremlin_response_parser(connection): - message = yield from connection.receive() + message = yield from connection._receive() message = json.loads(message) message = Message(message["status"]["code"], message["result"]["data"], @@ -26,7 +26,7 @@ def gremlin_response_parser(connection): # Return None else: try: - if status_code < 500: + if message.status_code < 500: raise RequestError(message.status_code, message.message) else: raise GremlinServerError(message.status_code, message.message) @@ -40,10 +40,11 @@ class GremlinWriter: self._connection = connection @asyncio.coroutine - def send(self, message, binary=True, mime_type="application/json"): + def write(self, message, binary=True, mime_type="application/json"): if binary: message = self._set_message_header(message, mime_type) yield from self._connection.send(message, binary) + return self._connection @staticmethod def _set_message_header(message, mime_type): diff --git a/tests/tests.py b/tests/tests.py index f72fd87..8830efc 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -3,10 +3,9 @@ import asyncio import itertools -import websockets import unittest -from aiogremlin import (GremlinClient, RequestError, - GremlinServerError, SocketClientError, WebsocketPool, AiohttpFactory) +from aiogremlin import (GremlinClient, RequestError, GremlinServerError, + SocketClientError, WebsocketPool, AiohttpFactory) class GremlinClientTests(unittest.TestCase): @@ -32,35 +31,53 @@ class GremlinClientTests(unittest.TestCase): self.loop.run_until_complete(conn.close()) def test_sub(self): - sub = self.gc.submit("x + x", bindings={"x": 4}) - results = self.loop.run_until_complete(sub) + execute = self.gc.execute("x + x", bindings={"x": 4}) + results = self.loop.run_until_complete(execute) self.assertEqual(results[0].data[0], 8) - def test_recv(self): + def test_sub_waitfor(self): + sub1 = self.gc.execute("x + x", bindings={"x": 1}) + sub2 = self.gc.execute("x + x", bindings={"x": 2}) + sub3 = self.gc.execute("x + x", bindings={"x": 4}) + coro = asyncio.wait([asyncio.async(sub1, loop=self.loop), + asyncio.async(sub2, loop=self.loop), + asyncio.async(sub3, loop=self.loop)], loop=self.loop) + # Here I am looking for resource warnings. + results = self.loop.run_until_complete(coro) + self.assertIsNotNone(results) + + def test_resp_stream(self): @asyncio.coroutine - def recv_coro(): + def stream_coro(): results = [] - websocket = yield from self.gc.send("x + x", bindings={"x": 4}) + resp = yield from self.gc.submit("x + x", bindings={"x": 4}) while True: - f = yield from self.gc.recv(websocket) + f = yield from resp.stream.read() if f is None: break results.append(f) self.assertEqual(results[0].data[0], 8) + self.loop.run_until_complete(stream_coro()) - self.loop.run_until_complete(recv_coro()) + def test_resp_get(self): + @asyncio.coroutine + def get_coro(): + conn = yield from self.gc.submit("x + x", bindings={"x": 4}) + results = yield from conn.get() + self.assertEqual(results[0].data[0], 8) + self.loop.run_until_complete(get_coro()) - def test_submit_error(self): - sub = self.gc.submit("x + x g.asdfas", bindings={"x": 4}) + def test_execute_error(self): + execute = self.gc.execute("x + x g.asdfas", bindings={"x": 4}) try: - self.loop.run_until_complete(sub) + self.loop.run_until_complete(execute) error = False except: error = True self.assertTrue(error) -class ConnectionManagerTests(unittest.TestCase): +class WebsocketPoolTests(unittest.TestCase): def setUp(self): self.loop = asyncio.new_event_loop() @@ -148,8 +165,6 @@ class ConnectionManagerTests(unittest.TestCase): self.assertFalse(conn1.closed) self.assertIsNotNone(conn2.socket) self.assertFalse(conn2.closed) - # conn1.socket._closed = True - # conn2.socket._closed = True yield from conn1.socket.close() yield from conn2.socket.close() self.assertTrue(conn2.closed) -- GitLab