Commit 8bfbf574 authored by davebshow's avatar davebshow
Browse files

massive refactor, api changes, moving towards stable release with real docs

parent aa8e69f4
from .connection import *
from .response import *
from .client import *
from .exceptions import *
from .pool import *
from .connector import *
from .subprotocol import *
__version__ = "0.0.9"
__version__ = "0.0.10"
"""Client for the Tinkerpop 3 Gremlin Server."""
import asyncio
import ssl
import aiohttp
from aiogremlin.connection import (GremlinFactory,
GremlinClientWebSocketResponse)
from aiogremlin.response import GremlinClientWebSocketResponse
from aiogremlin.exceptions import RequestError
from aiogremlin.log import logger, INFO
from aiogremlin.pool import WebSocketPool
from aiogremlin.connector import GremlinConnector
from aiogremlin.subprotocol import gremlin_response_parser, GremlinWriter
__all__ = ("create_client", "GremlinClient", "GremlinResponse",
"GremlinResponseStream")
__all__ = ("GremlinClient", "GremlinClientSession")
@asyncio.coroutine
def create_client(*, url='ws://localhost:8182/', loop=None,
protocol=None, lang="gremlin-groovy", op="eval",
processor="", pool=None, factory=None, poolsize=10,
timeout=None, verbose=False, fill_pool=True, connector=None):
@asycnio.coroutine
def submit(gremlin, *,
url='ws://localhost:8182/',
bindings=None,
lang="gremlin-groovy",
op="eval",
processor="",
connector=None):
if factory is None:
factory = aiohttp.ClientSession(
connector=connector,
ws_response_class=GremlinClientWebSocketResponse,
loop=loop)
connector = aiohttp.TCPConnector(force_close=True)
if pool is None:
pool = WebSocketPool(url,
factory=factory,
poolsize=poolsize,
timeout=timeout,
loop=loop,
verbose=verbose)
client_session = aiohttp.ClientSession(
connector=connector, ws_response_class=GremlinClientWebSocketResponse)
if fill_pool:
yield from pool.fill_pool()
gremlin_client = GremlinClient(url=url, connector=client_session)
try:
resp = yield from gremlin_client.submit(
gremlin, bindings=bindings, lang=lang, op=op, processor=processor)
return GremlinClient(url=url,
loop=loop,
protocol=protocol,
lang=lang,
op=op,
processor=processor,
pool=pool,
factory=factory,
verbose=verbose)
return resp
finally:
gremlin_client.detach()
client_session.detach()
class SimpleGremlinClient:
def __init__(self, connection, *, loop=None, verbose=False):
"""This class is primarily designed to be used in the context
`manager"""
self._loop = loop or asyncio.get_event_loop()
self._connection = connection
if verbose:
logger.setLevel(INFO)
@asyncio.coroutine
def submit(self, gremlin, *, bindings=None, lang="gremlin-groovy",
op="eval", processor=""):
"""
"""
writer = GremlinWriter(self._connection)
connection = writer.write(gremlin, bindings=bindings, lang=lang, op=op,
processor=processor, session=session,
binary=binary)
return GremlinResponse(self._connection,
pool=self._pool,
session=session,
loop=self._loop)
class GremlinClient:
def __init__(self, url='ws://localhost:8182/', loop=None,
protocol=None, lang="gremlin-groovy", op="eval",
processor="", pool=None, factory=None, poolsize=10,
timeout=None, verbose=False, connector=None,
connection=None):
protocols=None, lang="gremlin-groovy", op="eval",
processor="", timeout=None, verbose=False,
session=None, connector=None):
"""
"""
# Maybe getter setter for some of these: url, session, lang, op
self.url = url
self.ssl = ssl
self.protocol = protocol
self._loop = loop or asyncio.get_event_loop()
self.lang = lang or "gremlin-groovy"
self.op = op or "eval"
self.processor = processor or ""
self.poolsize = poolsize
self.timeout = timeout
self._pool = pool
self._connector = connector
self._conn = connection
if pool is not None:
factory = pool._factory
self._connected = self._pool._connected
elif self._conn and not getattr(connection, "closed", True):
self._connected = True
else:
self._connected = False
self._conn = asyncio.async(self._connect(), loop=self._loop)
self._factory = factory or GremlinFactory(connector=self._connector,
loop=self._loop)
self._timeout = timeout
self._session = session
if verbose:
logger.setLevel(INFO)
if connector is None:
connector = GremlinConnector()
self._connector = connector
@property
def loop(self):
return self._loop
@property
def closed(self):
return self._closed or self._connector is None
@asyncio.coroutine
def close(self):
if self._closed:
return
self._closed = True
try:
if self._pool:
yield from self._pool.close()
elif self._connected:
yield from self._conn.close()
yield from self._connector.close()
finally:
self._connected = False
self._connector = None
@asyncio.coroutine
def _connect(self):
"""
"""
connection = yield from self._factory.ws_connect(self.url)
self._connected = True
return connection
def detach(self):
self._connector = None
@asyncio.coroutine
def _acquire(self):
if self._pool:
conn = yield from self._pool.acquire()
elif self._connected:
conn = self._conn
else:
try:
self._conn = yield from self._conn
except TypeError:
self._conn = yield from self._connect()
except Exception:
raise RuntimeError("Unable to acquire connection.")
conn = self._conn
return conn
@asyncio.coroutine
def submit(self, gremlin, *, connection=None, bindings=None, lang=None,
op=None, processor=None, session=None, binary=True):
def submit(self, gremlin, *, bindings=None, lang=None,
op=None, processor=None, binary=True):
"""
"""
lang = lang or self.lang
op = op or self.op
processor = processor or self.processor
if connection is None:
connection = yield from self._acquire()
writer = GremlinWriter(connection)
connection = writer.write(gremlin, bindings=bindings, lang=lang, op=op,
processor=processor, session=session,
binary=binary)
return GremlinResponse(connection,
pool=self._pool,
session=session,
loop=self._loop)
ws = self._connector.ws_connect(self.url, timeout=self._timeout)
writer = GremlinWriter(ws)
ws = writer.write(gremlin, bindings=bindings, lang=lang, op=op,
processor=processor, binary=binary,
session=self._session)
return GremlinResponse(ws, session=self._session, loop=self._loop)
@asyncio.coroutine
def execute(self, gremlin, *, bindings=None, lang=None,
op=None, processor=None, consumer=None, collect=True):
op=None, processor=None, binary=True):
"""
"""
lang = lang or self.lang
op = op or self.op
processor = processor or self.processor
resp = yield from self.submit(gremlin, bindings=bindings, lang=lang,
op=op, processor=processor)
op=op, processor=processor,
binary=binary)
return (yield from resp.get())
class GremlinClientSession(GremlinClient):
def __init__(self, url='ws://localhost:8182/', loop=None,
protocols=None, lang="gremlin-groovy", op="eval",
processor="", timeout=None, verbose=False,
session=None, connector=None):
super().__init__(url=url, loop=loop, protocols=protocols, lang=lang,
op=op, processor=processor, timeout=timeout,
verbose=verbose, connector=connector)
if session is None:
session = uuid4.uuid4()
self._session = session
def set_session(self):
pass
def change_session(self):
pass
class GremlinResponse:
def __init__(self, conn, *, pool=None, session=None, loop=None):
def __init__(self, ws, *, session=None, loop=None):
# Add timeout for read
self._loop = loop or asyncio.get_event_loop()
self._session = session
self._stream = GremlinResponseStream(conn, pool=pool, loop=self._loop)
self._stream = GremlinResponseStream(ws, oop=self._loop)
@property
def stream(self):
......@@ -188,25 +201,22 @@ class GremlinResponse:
class GremlinResponseStream:
def __init__(self, conn, pool=None, loop=None):
self._conn = conn
self._pool = pool
def __init__(self, ws, loop=None):
self._ws = ws
self._loop = loop or asyncio.get_event_loop()
data_stream = aiohttp.DataQueue(loop=self._loop)
self._stream = self._conn.parser.set_parser(gremlin_response_parser,
output=data_stream)
self._stream = self._ws.parser.set_parser(gremlin_response_parser,
output=data_stream)
@asyncio.coroutine
def read(self):
if self._stream.at_eof():
if self._pool:
self._pool.release(self._conn)
yield from self._ws.release()
message = None
else:
asyncio.async(self._conn.receive(), loop=self._loop)
asyncio.async(self._ws.receive(), loop=self._loop)
try:
message = yield from self._stream.read()
except RequestError:
if self._pool:
self._pool.release(self._conn)
yield from self._ws.release()
return message
import asyncio
from aiowebsocketclient import WebSocketConnector
from aiogremlin.response import GremlinClientWebSocketResponse
from aiogremlin.contextmanager import ConnectionContextManager
from aiogremlin.log import logger
__all__ = ("GremlinConnector",)
class GremlinConnector(WebSocketConnector):
def __init__(self, *args, **kwargs):
kwargs["ws_response_class"] = GremlinClientWebSocketResponse
super().__init__(*args, **kwargs)
@asyncio.coroutine
def create_client(self, *, url='ws://localhost:8182/', loop=None,
protocol=None, lang="gremlin-groovy", op="eval",
processor="", verbose=False):
return GremlinClient(url=url,
loop=loop,
protocol=protocol,
lang=lang,
op=op,
processor=processor,
connector=self
verbose=verbose)
def create_client_session(self, *, url='ws://localhost:8182/', loop=None,
protocol=None, lang="gremlin-groovy", op="eval",
processor="", connector=self, verbose=False):
return GremlinClientSession(url=url,
loop=loop,
protocol=protocol,
lang=lang,
op=op,
processor=processor,
connector=self
verbose=verbose)
# # Something like
# @contextmanager
# @asyncio.coroutine
# def connect(self, url, etc):
# pass
# aioredis style
def __enter__(self):
raise RuntimeError(
"'yield from' should be used as a context manager expression")
def __exit__(self, *args):
pass
def __iter__(self):
conn = yield from self.ws_connect(url='ws://localhost:8182/')
return ConnectionContextManager(client)
from aiogremlin.client import SimpleGremlinClient
class ConnectionContextManager:
__slots__ = ("_conn")
def __init__(self, conn):
self._conn = conn
self._client = SimpleGremlinClient(conn)
def __enter__(self):
if self._conn.closed:
raise RuntimeError("Connection closed unexpectedly.")
return self._conn
return self._client
def __exit__(self, exception_type, exception_value, traceback):
try:
......@@ -17,3 +21,4 @@ class ConnectionContextManager:
self._conn._close()
finally:
self._conn = None
self._client = None
import asyncio
from aiogremlin.connection import (GremlinFactory,
GremlinClientWebSocketResponse)
from aiogremlin.contextmanager import ConnectionContextManager
from aiogremlin.log import logger
__all__ = ("WebSocketPool",)
class WebSocketPool:
def __init__(self, url, *, factory=None, poolsize=10, connector=None,
max_retries=10, timeout=None, loop=None, verbose=False,
ws_response_class=None):
"""
"""
self.url = url
if ws_response_class is None:
ws_response_class = GremlinClientWebSocketResponse
self.poolsize = poolsize
self.max_retries = max_retries
self.timeout = timeout
self._connected = False
self._loop = loop or asyncio.get_event_loop()
self._factory = factory or GremlinFactory(connector=connector,
loop=self._loop)
self._pool = asyncio.Queue(maxsize=self.poolsize, loop=self._loop)
self.active_conns = set()
self.num_connecting = 0
self._closed = False
if verbose:
logger.setLevel(INFO)
@asyncio.coroutine
def fill_pool(self):
tasks = []
poolsize = self.poolsize
for i in range(poolsize):
coro = self.factory.ws_connect(self.url)
task = asyncio.async(coro, loop=self._loop)
tasks.append(task)
for f in asyncio.as_completed(tasks, loop=self._loop):
conn = yield from f
self._put(conn)
self._connected = True
@property
def loop(self):
return self._loop
@property
def factory(self):
return self._factory
@property
def closed(self):
return self._closed
@property
def num_active_conns(self):
return len(self.active_conns)
def release(self, conn):
if self._closed:
raise RuntimeError("WebsocketPool is closed.")
self.active_conns.discard(conn)
self._put(conn)
@asyncio.coroutine
def close(self):
try:
self._factory.close()
except AttributeError:
pass
if not self._closed:
if self.active_conns:
yield from self._close_active_conns()
yield from self._purge_pool()
self._closed = True
@asyncio.coroutine
def _close_active_conns(self):
tasks = [asyncio.async(conn.close(), loop=self.loop) for conn
in self.active_conns]
yield from asyncio.wait(tasks, loop=self.loop)
@asyncio.coroutine
def _purge_pool(self):
while not self._pool.empty():
conn = self._pool.get_nowait()
yield from conn.close()
@asyncio.coroutine
def acquire(self, url=None, loop=None, num_retries=None):
if self._closed:
raise RuntimeError("WebsocketPool is closed.")
if num_retries is None:
num_retries = self.max_retries
url = url or self.url
loop = loop or self.loop
if not self._pool.empty():
socket = self._pool.get_nowait()
logger.info("Reusing socket: {} at {}".format(socket, url))
elif self.num_active_conns + self.num_connecting >= self.poolsize:
logger.info("Waiting for socket...")
socket = yield from asyncio.wait_for(self._pool.get(),
self.timeout, loop=loop)
logger.info("Socket acquired: {} at {}".format(socket, url))
else:
self.num_connecting += 1
try:
socket = yield from self.factory.ws_connect(url)
finally:
self.num_connecting -= 1
if not socket.closed:
logger.info("New connection on socket: {} at {}".format(
socket, url))
self.active_conns.add(socket)
# Untested.
elif num_retries > 0:
logger.warning("Got bad socket, retry...")
socket = yield from self.acquire(url, loop, num_retries - 1)
else:
raise RuntimeError("Unable to connect, max retries exceeded.")
return socket
def _put(self, socket):
try:
self._pool.put_nowait(socket)
except asyncio.QueueFull:
pass
# This should be - not working
# yield from socket.release()
# aioredis style
def __enter__(self):
raise RuntimeError(
"'yield from' should be used as a context manager expression")
def __exit__(self, *args):
pass
def __iter__(self):
conn = yield from self.acquire()
return ConnectionContextManager(conn)
......@@ -7,11 +7,11 @@ import os
import aiohttp
from aiohttp.websocket_client import ClientWebSocketResponse
from aiowebsocketclient.connector import ClientWebSocketResponse
from aiogremlin.exceptions import SocketClientError
from aiogremlin.log import INFO, logger
__all__ = ('GremlinFactory', 'GremlinClientWebSocketResponse')
__all__ = ('GremlinClientWebSocketResponse')
class GremlinClientWebSocketResponse(ClientWebSocketResponse):
......@@ -29,9 +29,9 @@ class GremlinClientWebSocketResponse(ClientWebSocketResponse):
return self._parser
@asyncio.coroutine
def close(self, *, code=1000, message=b''):
def _close(self, *, code=1000, message=b''):
if not self._closed:
did_close = self._close()
did_close = self._do_close()
if did_close:
return True
while True:
......@@ -55,7 +55,7 @@ class GremlinClientWebSocketResponse(ClientWebSocketResponse):
else:
return False
def _close(self, code=1000, message=b''):
def _do_close(self, code=1000, message=b''):
self._closed = True
try:
self._writer.close(code, message)
......@@ -73,7 +73,7 @@ class GremlinClientWebSocketResponse(ClientWebSocketResponse):
self._response.close(force=True)
return True
def send(self, message, binary=True):
def send(self, message, *, binary=True):
if binary:
method = self.send_bytes
else:
......@@ -101,21 +101,3 @@ class GremlinClientWebSocketResponse(ClientWebSocketResponse):
raise msg.data
elif msg.tp == aiohttp.MsgType.closed:
pass
class GremlinFactory: