diff --git a/aiogremlin/__init__.py b/aiogremlin/__init__.py index c51a9722aae330a51959b5549a3f92884744929d..1afdfad2038d3f633c6bc736db0021687b92d6e0 100644 --- a/aiogremlin/__init__.py +++ b/aiogremlin/__init__.py @@ -1,7 +1,7 @@ -from .connection import * +from .response import * from .client import * from .exceptions import * -from .pool import * +from .connector import * from .subprotocol import * -__version__ = "0.0.9" +__version__ = "0.0.10" diff --git a/aiogremlin/client.py b/aiogremlin/client.py index d460005f17a3e8fec1397989937d9154b5f86cd0..700573a8ba5e19c9c53b996f798b9c803337ee66 100644 --- a/aiogremlin/client.py +++ b/aiogremlin/client.py @@ -1,165 +1,178 @@ """Client for the Tinkerpop 3 Gremlin Server.""" import asyncio -import ssl import aiohttp -from aiogremlin.connection import (GremlinFactory, - GremlinClientWebSocketResponse) +from aiogremlin.response import GremlinClientWebSocketResponse from aiogremlin.exceptions import RequestError from aiogremlin.log import logger, INFO -from aiogremlin.pool import WebSocketPool +from aiogremlin.connector import GremlinConnector from aiogremlin.subprotocol import gremlin_response_parser, GremlinWriter -__all__ = ("create_client", "GremlinClient", "GremlinResponse", - "GremlinResponseStream") +__all__ = ("GremlinClient", "GremlinClientSession") -@asyncio.coroutine -def create_client(*, url='ws://localhost:8182/', loop=None, - protocol=None, lang="gremlin-groovy", op="eval", - processor="", pool=None, factory=None, poolsize=10, - timeout=None, verbose=False, fill_pool=True, connector=None): +@asycnio.coroutine +def submit(gremlin, *, + url='ws://localhost:8182/', + bindings=None, + lang="gremlin-groovy", + op="eval", + processor="", + connector=None): - if factory is None: - factory = aiohttp.ClientSession( - connector=connector, - ws_response_class=GremlinClientWebSocketResponse, - loop=loop) + connector = aiohttp.TCPConnector(force_close=True) - if pool is None: - pool = WebSocketPool(url, - factory=factory, - poolsize=poolsize, - timeout=timeout, - loop=loop, - verbose=verbose) + client_session = aiohttp.ClientSession( + connector=connector, ws_response_class=GremlinClientWebSocketResponse) - if fill_pool: - yield from pool.fill_pool() + gremlin_client = GremlinClient(url=url, connector=client_session) + try: + resp = yield from gremlin_client.submit( + gremlin, bindings=bindings, lang=lang, op=op, processor=processor) - return GremlinClient(url=url, - loop=loop, - protocol=protocol, - lang=lang, - op=op, - processor=processor, - pool=pool, - factory=factory, - verbose=verbose) + return resp + + finally: + gremlin_client.detach() + client_session.detach() + + +class SimpleGremlinClient: + + def __init__(self, connection, *, loop=None, verbose=False): + """This class is primarily designed to be used in the context + `manager""" + self._loop = loop or asyncio.get_event_loop() + self._connection = connection + if verbose: + logger.setLevel(INFO) + + @asyncio.coroutine + def submit(self, gremlin, *, bindings=None, lang="gremlin-groovy", + op="eval", processor=""): + """ + """ + writer = GremlinWriter(self._connection) + connection = writer.write(gremlin, bindings=bindings, lang=lang, op=op, + processor=processor, session=session, + binary=binary) + + return GremlinResponse(self._connection, + pool=self._pool, + session=session, + loop=self._loop) class GremlinClient: def __init__(self, url='ws://localhost:8182/', loop=None, - protocol=None, lang="gremlin-groovy", op="eval", - processor="", pool=None, factory=None, poolsize=10, - timeout=None, verbose=False, connector=None, - connection=None): + protocols=None, lang="gremlin-groovy", op="eval", + processor="", timeout=None, verbose=False, + session=None, connector=None): """ """ + # Maybe getter setter for some of these: url, session, lang, op self.url = url - self.ssl = ssl - self.protocol = protocol self._loop = loop or asyncio.get_event_loop() self.lang = lang or "gremlin-groovy" self.op = op or "eval" self.processor = processor or "" - self.poolsize = poolsize - self.timeout = timeout - self._pool = pool - self._connector = connector - self._conn = connection - if pool is not None: - factory = pool._factory - self._connected = self._pool._connected - elif self._conn and not getattr(connection, "closed", True): - self._connected = True - else: - self._connected = False - self._conn = asyncio.async(self._connect(), loop=self._loop) - self._factory = factory or GremlinFactory(connector=self._connector, - loop=self._loop) + self._timeout = timeout + self._session = session if verbose: logger.setLevel(INFO) + if connector is None: + connector = GremlinConnector() + self._connector = connector @property def loop(self): return self._loop + @property + def closed(self): + return self._closed or self._connector is None + @asyncio.coroutine def close(self): + + if self._closed: + return + + self._closed = True try: - if self._pool: - yield from self._pool.close() - elif self._connected: - yield from self._conn.close() + yield from self._connector.close() finally: - self._connected = False + self._connector = None - @asyncio.coroutine - def _connect(self): - """ - """ - connection = yield from self._factory.ws_connect(self.url) - self._connected = True - return connection + def detach(self): + self._connector = None @asyncio.coroutine - def _acquire(self): - if self._pool: - conn = yield from self._pool.acquire() - elif self._connected: - conn = self._conn - else: - try: - self._conn = yield from self._conn - except TypeError: - self._conn = yield from self._connect() - except Exception: - raise RuntimeError("Unable to acquire connection.") - conn = self._conn - return conn - - @asyncio.coroutine - def submit(self, gremlin, *, connection=None, bindings=None, lang=None, - op=None, processor=None, session=None, binary=True): + def submit(self, gremlin, *, bindings=None, lang=None, + op=None, processor=None, binary=True): """ """ lang = lang or self.lang op = op or self.op processor = processor or self.processor - if connection is None: - connection = yield from self._acquire() - writer = GremlinWriter(connection) - connection = writer.write(gremlin, bindings=bindings, lang=lang, op=op, - processor=processor, session=session, - binary=binary) - return GremlinResponse(connection, - pool=self._pool, - session=session, - loop=self._loop) + + ws = self._connector.ws_connect(self.url, timeout=self._timeout) + + writer = GremlinWriter(ws) + ws = writer.write(gremlin, bindings=bindings, lang=lang, op=op, + processor=processor, binary=binary, + session=self._session) + + return GremlinResponse(ws, session=self._session, loop=self._loop) @asyncio.coroutine def execute(self, gremlin, *, bindings=None, lang=None, - op=None, processor=None, consumer=None, collect=True): + op=None, processor=None, binary=True): """ """ lang = lang or self.lang op = op or self.op processor = processor or self.processor + resp = yield from self.submit(gremlin, bindings=bindings, lang=lang, - op=op, processor=processor) + op=op, processor=processor, + binary=binary) + return (yield from resp.get()) +class GremlinClientSession(GremlinClient): + + def __init__(self, url='ws://localhost:8182/', loop=None, + protocols=None, lang="gremlin-groovy", op="eval", + processor="", timeout=None, verbose=False, + session=None, connector=None): + + super().__init__(url=url, loop=loop, protocols=protocols, lang=lang, + op=op, processor=processor, timeout=timeout, + verbose=verbose, connector=connector) + + if session is None: + session = uuid4.uuid4() + self._session = session + + def set_session(self): + pass + + def change_session(self): + pass + + class GremlinResponse: - def __init__(self, conn, *, pool=None, session=None, loop=None): + def __init__(self, ws, *, session=None, loop=None): + # Add timeout for read self._loop = loop or asyncio.get_event_loop() self._session = session - self._stream = GremlinResponseStream(conn, pool=pool, loop=self._loop) + self._stream = GremlinResponseStream(ws, oop=self._loop) @property def stream(self): @@ -188,25 +201,22 @@ class GremlinResponse: class GremlinResponseStream: - def __init__(self, conn, pool=None, loop=None): - self._conn = conn - self._pool = pool + def __init__(self, ws, loop=None): + self._ws = ws self._loop = loop or asyncio.get_event_loop() data_stream = aiohttp.DataQueue(loop=self._loop) - self._stream = self._conn.parser.set_parser(gremlin_response_parser, - output=data_stream) + self._stream = self._ws.parser.set_parser(gremlin_response_parser, + output=data_stream) @asyncio.coroutine def read(self): if self._stream.at_eof(): - if self._pool: - self._pool.release(self._conn) + yield from self._ws.release() message = None else: - asyncio.async(self._conn.receive(), loop=self._loop) + asyncio.async(self._ws.receive(), loop=self._loop) try: message = yield from self._stream.read() except RequestError: - if self._pool: - self._pool.release(self._conn) + yield from self._ws.release() return message diff --git a/aiogremlin/connector.py b/aiogremlin/connector.py new file mode 100644 index 0000000000000000000000000000000000000000..f0c2eadcb2505c9d05352881f981dd70f0c2dadc --- /dev/null +++ b/aiogremlin/connector.py @@ -0,0 +1,61 @@ +import asyncio + +from aiowebsocketclient import WebSocketConnector + +from aiogremlin.response import GremlinClientWebSocketResponse +from aiogremlin.contextmanager import ConnectionContextManager +from aiogremlin.log import logger + +__all__ = ("GremlinConnector",) + + +class GremlinConnector(WebSocketConnector): + + def __init__(self, *args, **kwargs): + kwargs["ws_response_class"] = GremlinClientWebSocketResponse + super().__init__(*args, **kwargs) + + @asyncio.coroutine + def create_client(self, *, url='ws://localhost:8182/', loop=None, + protocol=None, lang="gremlin-groovy", op="eval", + processor="", verbose=False): + + return GremlinClient(url=url, + loop=loop, + protocol=protocol, + lang=lang, + op=op, + processor=processor, + connector=self + verbose=verbose) + + def create_client_session(self, *, url='ws://localhost:8182/', loop=None, + protocol=None, lang="gremlin-groovy", op="eval", + processor="", connector=self, verbose=False): + + return GremlinClientSession(url=url, + loop=loop, + protocol=protocol, + lang=lang, + op=op, + processor=processor, + connector=self + verbose=verbose) + + # # Something like + # @contextmanager + # @asyncio.coroutine + # def connect(self, url, etc): + # pass + + # aioredis style + def __enter__(self): + raise RuntimeError( + "'yield from' should be used as a context manager expression") + + def __exit__(self, *args): + pass + + def __iter__(self): + conn = yield from self.ws_connect(url='ws://localhost:8182/') + return ConnectionContextManager(client) diff --git a/aiogremlin/contextmanager.py b/aiogremlin/contextmanager.py index 206845cd9997b55e0a9af2ea126be9cafa7bd4f8..04cc3ed8cb21d8194ac9c697cfdb3b2783d91d04 100644 --- a/aiogremlin/contextmanager.py +++ b/aiogremlin/contextmanager.py @@ -1,14 +1,18 @@ +from aiogremlin.client import SimpleGremlinClient + + class ConnectionContextManager: __slots__ = ("_conn") def __init__(self, conn): self._conn = conn + self._client = SimpleGremlinClient(conn) def __enter__(self): if self._conn.closed: raise RuntimeError("Connection closed unexpectedly.") - return self._conn + return self._client def __exit__(self, exception_type, exception_value, traceback): try: @@ -17,3 +21,4 @@ class ConnectionContextManager: self._conn._close() finally: self._conn = None + self._client = None diff --git a/aiogremlin/pool.py b/aiogremlin/pool.py deleted file mode 100644 index 2ef8496cbf9f7d7489e5dd94515e00a450b28332..0000000000000000000000000000000000000000 --- a/aiogremlin/pool.py +++ /dev/null @@ -1,146 +0,0 @@ -import asyncio - -from aiogremlin.connection import (GremlinFactory, - GremlinClientWebSocketResponse) -from aiogremlin.contextmanager import ConnectionContextManager -from aiogremlin.log import logger - -__all__ = ("WebSocketPool",) - - -class WebSocketPool: - - def __init__(self, url, *, factory=None, poolsize=10, connector=None, - max_retries=10, timeout=None, loop=None, verbose=False, - ws_response_class=None): - """ - """ - self.url = url - if ws_response_class is None: - ws_response_class = GremlinClientWebSocketResponse - self.poolsize = poolsize - self.max_retries = max_retries - self.timeout = timeout - self._connected = False - self._loop = loop or asyncio.get_event_loop() - self._factory = factory or GremlinFactory(connector=connector, - loop=self._loop) - self._pool = asyncio.Queue(maxsize=self.poolsize, loop=self._loop) - self.active_conns = set() - self.num_connecting = 0 - self._closed = False - if verbose: - logger.setLevel(INFO) - - @asyncio.coroutine - def fill_pool(self): - tasks = [] - poolsize = self.poolsize - for i in range(poolsize): - coro = self.factory.ws_connect(self.url) - task = asyncio.async(coro, loop=self._loop) - tasks.append(task) - for f in asyncio.as_completed(tasks, loop=self._loop): - conn = yield from f - self._put(conn) - self._connected = True - - @property - def loop(self): - return self._loop - - @property - def factory(self): - return self._factory - - @property - def closed(self): - return self._closed - - @property - def num_active_conns(self): - return len(self.active_conns) - - def release(self, conn): - if self._closed: - raise RuntimeError("WebsocketPool is closed.") - self.active_conns.discard(conn) - self._put(conn) - - @asyncio.coroutine - def close(self): - try: - self._factory.close() - except AttributeError: - pass - if not self._closed: - if self.active_conns: - yield from self._close_active_conns() - yield from self._purge_pool() - self._closed = True - - @asyncio.coroutine - def _close_active_conns(self): - tasks = [asyncio.async(conn.close(), loop=self.loop) for conn - in self.active_conns] - yield from asyncio.wait(tasks, loop=self.loop) - - @asyncio.coroutine - def _purge_pool(self): - while not self._pool.empty(): - conn = self._pool.get_nowait() - yield from conn.close() - - @asyncio.coroutine - def acquire(self, url=None, loop=None, num_retries=None): - if self._closed: - raise RuntimeError("WebsocketPool is closed.") - if num_retries is None: - num_retries = self.max_retries - url = url or self.url - loop = loop or self.loop - if not self._pool.empty(): - socket = self._pool.get_nowait() - logger.info("Reusing socket: {} at {}".format(socket, url)) - elif self.num_active_conns + self.num_connecting >= self.poolsize: - logger.info("Waiting for socket...") - socket = yield from asyncio.wait_for(self._pool.get(), - self.timeout, loop=loop) - logger.info("Socket acquired: {} at {}".format(socket, url)) - else: - self.num_connecting += 1 - try: - socket = yield from self.factory.ws_connect(url) - finally: - self.num_connecting -= 1 - if not socket.closed: - logger.info("New connection on socket: {} at {}".format( - socket, url)) - self.active_conns.add(socket) - # Untested. - elif num_retries > 0: - logger.warning("Got bad socket, retry...") - socket = yield from self.acquire(url, loop, num_retries - 1) - else: - raise RuntimeError("Unable to connect, max retries exceeded.") - return socket - - def _put(self, socket): - try: - self._pool.put_nowait(socket) - except asyncio.QueueFull: - pass - # This should be - not working - # yield from socket.release() - - # aioredis style - def __enter__(self): - raise RuntimeError( - "'yield from' should be used as a context manager expression") - - def __exit__(self, *args): - pass - - def __iter__(self): - conn = yield from self.acquire() - return ConnectionContextManager(conn) diff --git a/aiogremlin/connection.py b/aiogremlin/response.py similarity index 75% rename from aiogremlin/connection.py rename to aiogremlin/response.py index 5fbb6bc3623ae9b421cb754c684ed19a4e414b51..c216f13387d13fd53971acceec0b57ba3522d6c4 100644 --- a/aiogremlin/connection.py +++ b/aiogremlin/response.py @@ -7,11 +7,11 @@ import os import aiohttp -from aiohttp.websocket_client import ClientWebSocketResponse +from aiowebsocketclient.connector import ClientWebSocketResponse from aiogremlin.exceptions import SocketClientError from aiogremlin.log import INFO, logger -__all__ = ('GremlinFactory', 'GremlinClientWebSocketResponse') +__all__ = ('GremlinClientWebSocketResponse') class GremlinClientWebSocketResponse(ClientWebSocketResponse): @@ -29,9 +29,9 @@ class GremlinClientWebSocketResponse(ClientWebSocketResponse): return self._parser @asyncio.coroutine - def close(self, *, code=1000, message=b''): + def _close(self, *, code=1000, message=b''): if not self._closed: - did_close = self._close() + did_close = self._do_close() if did_close: return True while True: @@ -55,7 +55,7 @@ class GremlinClientWebSocketResponse(ClientWebSocketResponse): else: return False - def _close(self, code=1000, message=b''): + def _do_close(self, code=1000, message=b''): self._closed = True try: self._writer.close(code, message) @@ -73,7 +73,7 @@ class GremlinClientWebSocketResponse(ClientWebSocketResponse): self._response.close(force=True) return True - def send(self, message, binary=True): + def send(self, message, *, binary=True): if binary: method = self.send_bytes else: @@ -101,21 +101,3 @@ class GremlinClientWebSocketResponse(ClientWebSocketResponse): raise msg.data elif msg.tp == aiohttp.MsgType.closed: pass - - -class GremlinFactory: - - def __init__(self, connector=None, loop=None): - self._connector = connector - self._loop = loop or asyncio.get_event_loop() - - @asyncio.coroutine - def ws_connect(self, url='ws://localhost:8182/', protocols=(), - autoclose=False, autoping=True): - try: - return (yield from aiohttp.ws_connect( - url, protocols=protocols, connector=self._connector, - ws_response_class=GremlinClientWebSocketResponse, - autoclose=True, autoping=True, loop=self._loop)) - except aiohttp.WSServerHandshakeError as e: - raise SocketClientError(e.message) diff --git a/aiogremlin/subprotocol.py b/aiogremlin/subprotocol.py index 559f6de476492fe940173f8e5f6c46695d899878..4025c4f0f54e3a81ee2a73b3e2bd4d388780fb0a 100644 --- a/aiogremlin/subprotocol.py +++ b/aiogremlin/subprotocol.py @@ -61,7 +61,7 @@ class GremlinWriter: message = json.dumps(message) if binary: message = self._set_message_header(message, mime_type) - self._connection.send(message, binary) + self._connection.send(message, binary=binary) return self._connection @staticmethod @@ -87,6 +87,7 @@ class GremlinWriter: } } if processor == "session": + # idk about this autogenerate here with new class session = session or str(uuid.uuid4()) message["args"]["session"] = session logger.info( diff --git a/setup.py b/setup.py index 2363354a4b5103d40586de35a0cf8f9973a1b07e..93ce9eed73be1bc11a52974e0d45f00c436d4f59 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup( name="aiogremlin", - version="0.0.9", + version="0.0.10", url="", license="MIT", author="davebshow", @@ -12,7 +12,8 @@ setup( long_description=open("README.txt").read(), packages=["aiogremlin", "tests"], install_requires=[ - "aiohttp==0.16.3" + "aiohttp==0.16.3", + "aiowebsocketclient==0.0.3" ], test_suite="tests", classifiers=[