diff --git a/aiogremlin/__init__.py b/aiogremlin/__init__.py
index f4ca236c0788834382ee783c44aa81625c24effa..a66a05f02f0e9c3be43fe1b2f306700eb018dafc 100644
--- a/aiogremlin/__init__.py
+++ b/aiogremlin/__init__.py
@@ -2,4 +2,4 @@ from .abc import AbstractFactory, AbstractConnection
from .connection import WebsocketPool, AiohttpFactory
from .client import GremlinClient, create_client
from .exceptions import RequestError, GremlinServerError, SocketClientError
-__version__ = "0.0.2"
+__version__ = "0.0.3"
diff --git a/aiogremlin/client.py b/aiogremlin/client.py
index 938ecb8948d7249426e4d736f0ccfeaccc66640a..a8596906f5e55beddfd4a590928c741053b59bf8 100644
--- a/aiogremlin/client.py
+++ b/aiogremlin/client.py
@@ -1,10 +1,11 @@
"""Client for the Tinkerpop 3 Gremlin Server."""
import asyncio
-import json
import ssl
import uuid
+import ujson
+
from aiogremlin.connection import WebsocketPool
from aiogremlin.log import client_logger
from aiogremlin.protocol import gremlin_response_parser, GremlinWriter
@@ -86,7 +87,7 @@ class GremlinClient:
lang = lang or self.lang
op = op or self.op
processor = processor or self.processor
- message = json.dumps({
+ message = ujson.dumps({
"requestId": str(uuid.uuid4()),
"op": op,
"processor": processor,
diff --git a/aiogremlin/connection.py b/aiogremlin/connection.py
index e2e81883f84c2f3454581ad18fea275e516f5c53..86f7331d1d6b8e6b02764fb75b0e0740fd65a196 100644
--- a/aiogremlin/connection.py
+++ b/aiogremlin/connection.py
@@ -139,12 +139,12 @@ class AiohttpFactory(BaseFactory):
@classmethod
@asyncio.coroutine
def connect(cls, uri='ws://localhost:8182/', pool=None, protocols=(),
- connector=None, autoclose=False, autoping=False, loop=None):
+ connector=None, autoclose=False, autoping=True, loop=None):
if pool:
loop = loop or pool.loop
try:
socket = yield from aiohttp.ws_connect(uri, protocols=protocols,
- connector=connector, autoclose=autoclose, autoping=autoping,
+ connector=connector, autoclose=False, autoping=True,
loop=loop)
except aiohttp.WSServerHandshakeError as e:
raise SocketClientError(e.message)
@@ -214,33 +214,25 @@ class AiohttpConnection(BaseConnection):
@asyncio.coroutine
def _receive(self):
"""Implements a dispatcher using the aiohttp websocket protocol."""
- while True:
+ try:
+ message = yield from self.socket.receive()
+ except (asyncio.CancelledError, asyncio.TimeoutError):
+ yield from self.release()
+ raise
+ except RuntimeError:
+ yield from self.release()
+ raise
+ if message.tp == aiohttp.MsgType.binary:
+ return message.data.decode()
+ elif message.tp == aiohttp.MsgType.text:
+ return message.data.strip()
+ else:
try:
- message = yield from self.socket.receive()
- except (asyncio.CancelledError, asyncio.TimeoutError):
- yield from self.release()
- raise
- except RuntimeError:
+ if message.tp == aiohttp.MsgType.close:
+ conn_logger.warn("Socket connection closed by server.")
+ elif message.tp == aiohttp.MsgType.error:
+ raise SocketClientError(self.socket.exception())
+ elif message.tp == aiohttp.MsgType.closed:
+ raise RuntimeError("Socket closed.")
+ finally:
yield from self.release()
- raise
- if message.tp == aiohttp.MsgType.binary:
- return message.data.decode()
- elif message.tp == aiohttp.MsgType.text:
- return message.data.strip()
- elif message.tp == aiohttp.MsgType.ping:
- conn_logger.warn("Ping received.")
- ws.pong()
- conn_logger.warn("Sent pong.")
- elif message.tp == aiohttp.MsgType.pong:
- conn_logger.warn('Pong received')
- else:
- try:
- if message.tp == aiohttp.MsgType.close:
- conn_logger.warn("Socket connection closed by server.")
- elif message.tp == aiohttp.MsgType.error:
- raise SocketClientError(self.socket.exception())
- elif message.tp == aiohttp.MsgType.closed:
- raise RuntimeError("Socket closed.")
- break
- finally:
- yield from self.release()
diff --git a/aiogremlin/protocol.py b/aiogremlin/protocol.py
index c6afc077a59e19eed3b01f919dcb9ebbbc75ebe6..2a2613561e2e86421594f1174611cee12f825bbd 100644
--- a/aiogremlin/protocol.py
+++ b/aiogremlin/protocol.py
@@ -2,7 +2,8 @@
import asyncio
import collections
-import json
+
+import ujson
from aiogremlin.exceptions import RequestError, GremlinServerError
@@ -14,7 +15,7 @@ Message = collections.namedtuple("Message", ["status_code", "data", "message",
@asyncio.coroutine
def gremlin_response_parser(connection):
message = yield from connection._receive()
- message = json.loads(message)
+ message = ujson.loads(message)
message = Message(message["status"]["code"],
message["result"]["data"],
message["result"]["meta"],
diff --git a/benchmark.py b/benchmark.py
new file mode 100644
index 0000000000000000000000000000000000000000..894f2fd28f35747a830fca1c8eb75cb1a23cf280
--- /dev/null
+++ b/benchmark.py
@@ -0,0 +1,101 @@
+"""Simple benchmark based on aiohttp benchmark client.
+https://github.com/KeepSafe/aiohttp/blob/master/benchmark/async.py
+"""
+import argparse
+import asyncio
+import collections
+import random
+
+import aiogremlin
+
+
+@asyncio.coroutine
+def run(client, count, concurrency, loop):
+ processed_count = 0
+ execute = client.execute
+
+ @asyncio.coroutine
+ def do_bomb():
+ nonlocal processed_count
+ for x in range(count):
+ try:
+ t1 = loop.time()
+ resp = yield from execute("1 + 1")
+ assert resp[0].status_code == 200, resp[0].status_code
+ assert resp[0].data[0] == 2, resp[0].data[0]
+ t2 = loop.time()
+ processed_count += 1
+ except Exception:
+ continue
+
+ bombers = []
+ append = bombers.append
+ async = asyncio.async
+ for i in range(concurrency):
+ bomber = async(do_bomb(), loop=loop)
+ append(bomber)
+
+ t1 = loop.time()
+ yield from asyncio.gather(*bombers, loop=loop)
+ t2 = loop.time()
+ rps = processed_count / (t2 - t1)
+ print("Benchmark complete: {} rps. {} messages in {}".format(rps,
+ processed_count, t2-t1))
+ return rps
+
+
+@asyncio.coroutine
+def main(client, tests, count, concurrency, loop):
+ execute = client.execute
+ # warmup
+ for i in range(10000):
+ resp = yield from execute("1+1")
+ assert resp[0].status_code == 200, resp[0].status_code
+ print("Warmup successful!")
+ # Rest
+ yield from asyncio.sleep(30)
+ rps = yield from run(client, count, concurrency, loop)
+ for i in range(tests - 1):
+ # Take a breather between tests.
+ yield from asyncio.sleep(60)
+ rps = yield from run(client, count, concurrency, loop)
+
+
+ARGS = argparse.ArgumentParser(description="Run benchmark.")
+ARGS.add_argument(
+ '-t', '--tests', action="store",
+ nargs='?', type=int, default=5,
+ help='number of tests (default: `%(default)s`)')
+ARGS.add_argument(
+ '-n', '--count', action="store",
+ nargs='?', type=int, default=10000,
+ help='message count (default: `%(default)s`)')
+ARGS.add_argument(
+ '-c', '--concurrency', action="store",
+ nargs='?', type=int, default=10,
+ help='count of parallel requests (default: `%(default)s`)')
+ARGS.add_argument(
+ '-p', '--poolsize', action="store",
+ nargs='?', type=int, default=256,
+ help='num connected websockets (default: `%(default)s`)')
+
+
+if __name__ == "__main__":
+ args = ARGS.parse_args()
+ num_tests = args.tests
+ num_mssg = args.count
+ concurr = args.concurrency
+ poolsize = args.poolsize
+ loop = asyncio.get_event_loop()
+ client = loop.run_until_complete(
+ aiogremlin.create_client(loop=loop, poolsize=poolsize))
+ try:
+ print(
+ "Runs: {}. Messages: {}. Concurrency: {}. Total mssg/run: {} ".format(
+ num_tests, num_mssg, concurr, num_mssg * concurr))
+ main = main(client, num_tests, num_mssg, concurr, loop)
+ loop.run_until_complete(main)
+ finally:
+ loop.run_until_complete(client.close())
+ loop.close()
+ print("CLOSED CLIENT AND LOOP")
diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py
deleted file mode 100644
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000
diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py
deleted file mode 100644
index 9da80d7c094cd8a071f914bb691f211278061e19..0000000000000000000000000000000000000000
--- a/benchmarks/benchmark.py
+++ /dev/null
@@ -1,44 +0,0 @@
-"""Simple benchmark based on aiohttp benchmark client."""
-
-import asyncio
-
-from aiogremlin import GremlinClient
-
-
-@asyncio.coroutine
-def attack(loop):
-
- client = GremlinClient(loop=loop, poolsize=10)
- execute = client.execute
-
- processed_count = 0
-
- @asyncio.coroutine
- def drop_bomb():
- nonlocal processed_count
- try:
- t1 = loop.time()
- resp = yield from execute("1 + 1")
- assert resp[0].status_code == 200, resp[0].status_code
- t2 = loop.time()
- processed_count += 1
- except Exception:
- print("an exception occurred {}".format(resp[0].status_code))
-
- bombers = []
- append = bombers.append
- async = asyncio.async
- for i in range(10000):
- bomber = async(drop_bomb())
- append(bomber)
-
- t1 = loop.time()
- yield from asyncio.gather(*bombers, loop=loop)
- t2 = loop.time()
- rps = processed_count / (t2 - t1)
- print("Benchmark complete: {} rps".format(rps))
-
-
-if __name__ == "__main__":
- loop = asyncio.get_event_loop()
- loop.run_until_complete(attack(loop))
diff --git a/changes.txt b/changes.txt
index c644f7954483aeb16107095a674d4cc0bd3a9662..e134c84a70a455761e5f2b1c6b12c372beee47d8 100644
--- a/changes.txt
+++ b/changes.txt
@@ -1,2 +1,3 @@
0.0.1 - 4/2015: Birth!
0.0.2 - 5/1/2015: Added an init_pool method and a create_client constructor.
+0.0.3 - 5/2/2015: Using ujson for serialization.
diff --git a/setup.py b/setup.py
index 3507070e3a7739b36b684a7137d32f68d9d4a1f3..4244101250fd5644525abe738f48bbeb02d4e80f 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@ from setuptools import setup
setup(
name="aiogremlin",
- version="0.0.2",
+ version="0.0.3",
url="",
license="MIT",
author="davebshow",
@@ -12,7 +12,8 @@ setup(
long_description=open("README.txt").read(),
packages=["aiogremlin", "tests"],
install_requires=[
- "aiohttp==0.15.3"
+ "aiohttp==0.15.3",
+ "ujson==1.33"
],
test_suite="tests",
classifiers=[