Commit 1466f0f7 authored by davebshow's avatar davebshow
Browse files

fixed merge conflict in benchmark

parents 440d2d74 497b8508
# aiogremlin 0.0.2 [(gizmo grew up)](https://pypi.python.org/pypi/gizmo/0.1.12)
# aiogremlin 0.0.4 [(gizmo grew up)](https://pypi.python.org/pypi/gizmo/0.1.12)
`aiogremlin` is a **Python 3** driver for the the [Tinkerpop 3 Gremlin Server](http://www.tinkerpop.com/docs/3.0.0.M7/#gremlin-server). This module is built on [Asyncio](https://docs.python.org/3/library/asyncio.html). By default it uses the [aiohttp](http://aiohttp.readthedocs.org/en/v0.15.3/index.html) websocket client , but it is easy to plug in a different implementation. `aiogremlin` is currently in **alpha** mode, but all major functionality has test coverage.
......
......@@ -3,4 +3,4 @@ from .connection import (WebsocketPool, AiohttpFactory, BaseFactory,
BaseConnection)
from .client import GremlinClient, create_client
from .exceptions import RequestError, GremlinServerError, SocketClientError
__version__ = "0.0.3"
__version__ = "0.0.4dev"
......@@ -44,10 +44,6 @@ class AbstractConnection(metaclass=ABCMeta):
def send(self):
pass
@abstractmethod
def receive(self):
pass
@abstractmethod
def _receive(self):
pass
......@@ -4,10 +4,10 @@ import asyncio
import ssl
import uuid
import ujson
import aiohttp
from aiogremlin.connection import WebsocketPool
from aiogremlin.log import client_logger
from aiogremlin.log import client_logger, INFO
from aiogremlin.protocol import gremlin_response_parser, GremlinWriter
......@@ -15,12 +15,13 @@ from aiogremlin.protocol import gremlin_response_parser, GremlinWriter
def create_client(uri='ws://localhost:8182/', loop=None, ssl=None,
protocol=None, lang="gremlin-groovy", op="eval",
processor="", pool=None, factory=None, poolsize=10,
timeout=None, **kwargs):
timeout=None, verbose=False, **kwargs):
pool = WebsocketPool(uri,
factory=factory,
poolsize=poolsize,
timeout=timeout,
loop=loop)
loop=loop,
verbose=verbose)
yield from pool.init_pool()
......@@ -32,7 +33,8 @@ def create_client(uri='ws://localhost:8182/', loop=None, ssl=None,
op=op,
processor=processor,
pool=pool,
factory=factory)
factory=factory,
verbose=verbose)
class GremlinClient:
......@@ -40,7 +42,7 @@ class GremlinClient:
def __init__(self, uri='ws://localhost:8182/', loop=None, ssl=None,
protocol=None, lang="gremlin-groovy", op="eval",
processor="", pool=None, factory=None, poolsize=10,
timeout=None, **kwargs):
timeout=None, verbose=True, **kwargs):
"""
"""
self.uri = uri
......@@ -61,6 +63,8 @@ class GremlinClient:
self.pool = pool or WebsocketPool(uri, factory=factory,
poolsize=poolsize, timeout=timeout, loop=self._loop)
self.factory = factory or self.pool.factory
if verbose:
client_logger.setLevel(INFO)
@property
def loop(self):
......@@ -87,7 +91,7 @@ class GremlinClient:
lang = lang or self.lang
op = op or self.op
processor = processor or self.processor
message = ujson.dumps({
message = {
"requestId": str(uuid.uuid4()),
"op": op,
"processor": processor,
......@@ -96,12 +100,18 @@ class GremlinClient:
"bindings": bindings,
"language": lang
}
})
}
if processor == "session":
message["args"]["session"] = str(uuid.uuid4())
client_logger.info(
"Session ID: {}".format(message["args"]["session"]))
if connection is None:
connection = yield from self.pool.connect(self.uri, loop=self.loop)
writer = GremlinWriter(connection)
connection = yield from writer.write(message, binary=binary)
return GremlinResponse(connection)
queue = connection.parser.set_parser(gremlin_response_parser,
output=aiohttp.DataQueue(loop=self._loop))
return GremlinResponse(connection, queue, loop=self._loop)
@asyncio.coroutine
def execute(self, gremlin, bindings=None, lang=None,
......@@ -118,8 +128,9 @@ class GremlinClient:
class GremlinResponse:
def __init__(self, conn):
self._stream = GremlinResponseStream(conn)
def __init__(self, conn, queue, loop=None):
self._loop = loop or asyncio.get_event_loop()
self._stream = GremlinResponseStream(conn, queue, loop=self._loop)
@property
def stream(self):
......@@ -135,7 +146,7 @@ class GremlinResponse:
"""
results = []
while True:
message = yield from self.stream.read()
message = yield from self._stream.read()
if message is None:
break
results.append(message)
......@@ -144,9 +155,23 @@ class GremlinResponse:
class GremlinResponseStream:
def __init__(self, conn):
self.conn = conn
def __init__(self, conn, queue, loop=None):
self._conn = conn
self._queue = queue
self._loop = loop or asyncio.get_event_loop()
@asyncio.coroutine
def read(self):
return (yield from gremlin_response_parser(self.conn))
# For 3.0.0.M9
# if self._queue.at_eof():
# self._conn.feed_pool()
# message = None
# else:
# This will be different 3.0.0.M9
yield from self._conn._receive()
if self._queue.is_eof():
self._conn.feed_pool()
message = None
else:
message = yield from self._queue.read()
return message
......@@ -102,17 +102,15 @@ class WebsocketPool:
try:
socket = yield from self.factory.connect(uri, pool=self,
loop=loop)
except:
raise
else:
conn_logger.info("New connection on socket: {} at {}".format(
socket, uri))
finally:
self.num_connecting -= 1
if not socket.closed:
conn_logger.info("New connection on socket: {} at {}".format(
socket, uri))
self.active_conns.add(socket)
# Untested.
elif num_retries > 0:
conn_logger.warning("Got bad socket, retry...")
socket = yield from self.connect(uri, loop, num_retries - 1)
else:
raise RuntimeError("Unable to connect, max retries exceeded.")
......@@ -148,24 +146,27 @@ class AiohttpFactory(BaseFactory):
loop=loop)
except aiohttp.WSServerHandshakeError as e:
raise SocketClientError(e.message)
return AiohttpConnection(socket, pool)
return AiohttpConnection(socket, pool, loop=loop)
class BaseConnection(AbstractConnection):
def __init__(self, socket, pool=None):
def __init__(self, socket, pool=None, loop=None):
self.socket = socket
self._loop = loop or asyncio.get_event_loop()
self._pool = pool
self._parser = aiohttp.StreamParser(
buf=aiohttp.DataQueue(loop=self._loop), loop=self._loop)
@property
def parser(self):
return self._parser
def feed_pool(self):
if self.pool:
if self in self.pool.active_conns:
self.pool.feed_pool(self)
@asyncio.coroutine
def receive(self):
return (yield from parse_gremlin_response(self))
@asyncio.coroutine
def release(self):
try:
......@@ -223,9 +224,17 @@ class AiohttpConnection(BaseConnection):
yield from self.release()
raise
if message.tp == aiohttp.MsgType.binary:
return message.data.decode()
try:
self.parser.feed_data(message.data.decode())
except Exception:
self.release()
raise
elif message.tp == aiohttp.MsgType.text:
return message.data.strip()
try:
self.parser.feed_data(message.data.strip())
except Exception:
self.release()
raise
else:
try:
if message.tp == aiohttp.MsgType.close:
......
......@@ -12,27 +12,32 @@ Message = collections.namedtuple("Message", ["status_code", "data", "message",
"metadata"])
@asyncio.coroutine
def gremlin_response_parser(connection):
message = yield from connection._receive()
message = ujson.loads(message)
message = Message(message["status"]["code"],
message["result"]["data"],
message["result"]["meta"],
message["status"]["message"])
if message.status_code == 200:
return message
elif message.status_code == 299:
connection.feed_pool()
# Return None
else:
try:
def gremlin_response_parser(out, buf):
while True:
message = yield
message = ujson.loads(message)
message = Message(message["status"]["code"],
message["result"]["data"],
message["result"]["meta"],
message["status"]["message"])
if message.status_code == 200:
out.feed_data(message)
# For 3.0.0.M9
# out.feed_eof()
# This will be terminated in 3.0.0.M9
elif message.status_code == 299:
out.feed_eof()
# For 3.0.0.M9
# elif message.status_code == 206:
# out.feed_data(message)
# elif message.status_code == 204:
# out.feed_data(message)
# out.feed_eof()
else:
if message.status_code < 500:
raise RequestError(message.status_code, message.message)
else:
raise GremlinServerError(message.status_code, message.message)
finally:
yield from connection.release()
class GremlinWriter:
......@@ -42,6 +47,7 @@ class GremlinWriter:
@asyncio.coroutine
def write(self, message, binary=True, mime_type="application/json"):
message = ujson.dumps(message)
if binary:
message = self._set_message_header(message, mime_type)
yield from self._connection.send(message, binary)
......
......@@ -27,7 +27,7 @@ def run(client, count, concurrency, loop):
assert resp[0].data[0] == result, resp[0].data[0]
processed_count += 1
except Exception:
continue
raise
for i in range(count):
rnd1 = random.randint(1, 9)
......@@ -79,11 +79,11 @@ ARGS.add_argument(
help='message count (default: `%(default)s`)')
ARGS.add_argument(
'-c', '--concurrency', action="store",
nargs='?', type=int, default=256,
nargs='?', type=int, default=100,
help='count of parallel requests (default: `%(default)s`)')
ARGS.add_argument(
'-p', '--poolsize', action="store",
nargs='?', type=int, default=256,
nargs='?', type=int, default=100,
help='num connected websockets (default: `%(default)s`)')
ARGS.add_argument(
'-w', '--warmups', action="store",
......@@ -102,8 +102,8 @@ if __name__ == "__main__":
client = loop.run_until_complete(
aiogremlin.create_client(loop=loop, poolsize=poolsize))
try:
print("Runs: {}. Warmups: {}. Messages: {}. Concurrency: {}.".format(
num_tests, num_warmups, num_mssg, concurr))
print("Runs: {}. Warmups: {}. Messages: {}. Concurrency: {}. Poolsize: {}".format(
num_tests, num_warmups, num_mssg, concurr, poolsize))
main = main(client, num_tests, num_mssg, concurr, num_warmups, loop)
loop.run_until_complete(main)
finally:
......
0.0.1 - 4/2015: Birth!
0.0.2 - 5/1/2015: Added an init_pool method and a create_client constructor.
0.0.3 - 5/2/2015: Using ujson for serialization.
0.0.4 - 5/12/2015: Added support for sessions.
......@@ -3,7 +3,7 @@ from setuptools import setup
setup(
name="aiogremlin",
version="0.0.3",
version="0.0.4",
url="",
license="MIT",
author="davebshow",
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment