Commit 84a692d7 authored by davebshow's avatar davebshow
Browse files

using ujson for serialization

parent aa7dbd82
......@@ -2,4 +2,4 @@ from .abc import AbstractFactory, AbstractConnection
from .connection import WebsocketPool, AiohttpFactory
from .client import GremlinClient, create_client
from .exceptions import RequestError, GremlinServerError, SocketClientError
__version__ = "0.0.2"
__version__ = "0.0.3"
"""Client for the Tinkerpop 3 Gremlin Server."""
import asyncio
import json
import ssl
import uuid
import ujson
from aiogremlin.connection import WebsocketPool
from aiogremlin.log import client_logger
from aiogremlin.protocol import gremlin_response_parser, GremlinWriter
......@@ -86,7 +87,7 @@ class GremlinClient:
lang = lang or self.lang
op = op or self.op
processor = processor or self.processor
message = json.dumps({
message = ujson.dumps({
"requestId": str(uuid.uuid4()),
"op": op,
"processor": processor,
......
......@@ -139,12 +139,12 @@ class AiohttpFactory(BaseFactory):
@classmethod
@asyncio.coroutine
def connect(cls, uri='ws://localhost:8182/', pool=None, protocols=(),
connector=None, autoclose=False, autoping=False, loop=None):
connector=None, autoclose=False, autoping=True, loop=None):
if pool:
loop = loop or pool.loop
try:
socket = yield from aiohttp.ws_connect(uri, protocols=protocols,
connector=connector, autoclose=autoclose, autoping=autoping,
connector=connector, autoclose=False, autoping=True,
loop=loop)
except aiohttp.WSServerHandshakeError as e:
raise SocketClientError(e.message)
......@@ -214,33 +214,25 @@ class AiohttpConnection(BaseConnection):
@asyncio.coroutine
def _receive(self):
"""Implements a dispatcher using the aiohttp websocket protocol."""
while True:
try:
message = yield from self.socket.receive()
except (asyncio.CancelledError, asyncio.TimeoutError):
yield from self.release()
raise
except RuntimeError:
yield from self.release()
raise
if message.tp == aiohttp.MsgType.binary:
return message.data.decode()
elif message.tp == aiohttp.MsgType.text:
return message.data.strip()
else:
try:
message = yield from self.socket.receive()
except (asyncio.CancelledError, asyncio.TimeoutError):
yield from self.release()
raise
except RuntimeError:
if message.tp == aiohttp.MsgType.close:
conn_logger.warn("Socket connection closed by server.")
elif message.tp == aiohttp.MsgType.error:
raise SocketClientError(self.socket.exception())
elif message.tp == aiohttp.MsgType.closed:
raise RuntimeError("Socket closed.")
finally:
yield from self.release()
raise
if message.tp == aiohttp.MsgType.binary:
return message.data.decode()
elif message.tp == aiohttp.MsgType.text:
return message.data.strip()
elif message.tp == aiohttp.MsgType.ping:
conn_logger.warn("Ping received.")
ws.pong()
conn_logger.warn("Sent pong.")
elif message.tp == aiohttp.MsgType.pong:
conn_logger.warn('Pong received')
else:
try:
if message.tp == aiohttp.MsgType.close:
conn_logger.warn("Socket connection closed by server.")
elif message.tp == aiohttp.MsgType.error:
raise SocketClientError(self.socket.exception())
elif message.tp == aiohttp.MsgType.closed:
raise RuntimeError("Socket closed.")
break
finally:
yield from self.release()
......@@ -2,7 +2,8 @@
import asyncio
import collections
import json
import ujson
from aiogremlin.exceptions import RequestError, GremlinServerError
......@@ -14,7 +15,7 @@ Message = collections.namedtuple("Message", ["status_code", "data", "message",
@asyncio.coroutine
def gremlin_response_parser(connection):
message = yield from connection._receive()
message = json.loads(message)
message = ujson.loads(message)
message = Message(message["status"]["code"],
message["result"]["data"],
message["result"]["meta"],
......
"""Simple benchmark based on aiohttp benchmark client.
https://github.com/KeepSafe/aiohttp/blob/master/benchmark/async.py
"""
import argparse
import asyncio
import collections
import random
import aiogremlin
@asyncio.coroutine
def run(client, count, concurrency, loop):
processed_count = 0
execute = client.execute
@asyncio.coroutine
def do_bomb():
nonlocal processed_count
for x in range(count):
try:
t1 = loop.time()
resp = yield from execute("1 + 1")
assert resp[0].status_code == 200, resp[0].status_code
assert resp[0].data[0] == 2, resp[0].data[0]
t2 = loop.time()
processed_count += 1
except Exception:
continue
bombers = []
append = bombers.append
async = asyncio.async
for i in range(concurrency):
bomber = async(do_bomb(), loop=loop)
append(bomber)
t1 = loop.time()
yield from asyncio.gather(*bombers, loop=loop)
t2 = loop.time()
rps = processed_count / (t2 - t1)
print("Benchmark complete: {} rps. {} messages in {}".format(rps,
processed_count, t2-t1))
return rps
@asyncio.coroutine
def main(client, tests, count, concurrency, loop):
execute = client.execute
# warmup
for i in range(10000):
resp = yield from execute("1+1")
assert resp[0].status_code == 200, resp[0].status_code
print("Warmup successful!")
# Rest
yield from asyncio.sleep(30)
rps = yield from run(client, count, concurrency, loop)
for i in range(tests - 1):
# Take a breather between tests.
yield from asyncio.sleep(60)
rps = yield from run(client, count, concurrency, loop)
ARGS = argparse.ArgumentParser(description="Run benchmark.")
ARGS.add_argument(
'-t', '--tests', action="store",
nargs='?', type=int, default=5,
help='number of tests (default: `%(default)s`)')
ARGS.add_argument(
'-n', '--count', action="store",
nargs='?', type=int, default=10000,
help='message count (default: `%(default)s`)')
ARGS.add_argument(
'-c', '--concurrency', action="store",
nargs='?', type=int, default=10,
help='count of parallel requests (default: `%(default)s`)')
ARGS.add_argument(
'-p', '--poolsize', action="store",
nargs='?', type=int, default=256,
help='num connected websockets (default: `%(default)s`)')
if __name__ == "__main__":
args = ARGS.parse_args()
num_tests = args.tests
num_mssg = args.count
concurr = args.concurrency
poolsize = args.poolsize
loop = asyncio.get_event_loop()
client = loop.run_until_complete(
aiogremlin.create_client(loop=loop, poolsize=poolsize))
try:
print(
"Runs: {}. Messages: {}. Concurrency: {}. Total mssg/run: {} ".format(
num_tests, num_mssg, concurr, num_mssg * concurr))
main = main(client, num_tests, num_mssg, concurr, loop)
loop.run_until_complete(main)
finally:
loop.run_until_complete(client.close())
loop.close()
print("CLOSED CLIENT AND LOOP")
"""Simple benchmark based on aiohttp benchmark client."""
import asyncio
from aiogremlin import GremlinClient
@asyncio.coroutine
def attack(loop):
client = GremlinClient(loop=loop, poolsize=10)
execute = client.execute
processed_count = 0
@asyncio.coroutine
def drop_bomb():
nonlocal processed_count
try:
t1 = loop.time()
resp = yield from execute("1 + 1")
assert resp[0].status_code == 200, resp[0].status_code
t2 = loop.time()
processed_count += 1
except Exception:
print("an exception occurred {}".format(resp[0].status_code))
bombers = []
append = bombers.append
async = asyncio.async
for i in range(10000):
bomber = async(drop_bomb())
append(bomber)
t1 = loop.time()
yield from asyncio.gather(*bombers, loop=loop)
t2 = loop.time()
rps = processed_count / (t2 - t1)
print("Benchmark complete: {} rps".format(rps))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(attack(loop))
0.0.1 - 4/2015: Birth!
0.0.2 - 5/1/2015: Added an init_pool method and a create_client constructor.
0.0.3 - 5/2/2015: Using ujson for serialization.
......@@ -3,7 +3,7 @@ from setuptools import setup
setup(
name="aiogremlin",
version="0.0.2",
version="0.0.3",
url="",
license="MIT",
author="davebshow",
......@@ -12,7 +12,8 @@ setup(
long_description=open("README.txt").read(),
packages=["aiogremlin", "tests"],
install_requires=[
"aiohttp==0.15.3"
"aiohttp==0.15.3",
"ujson==1.33"
],
test_suite="tests",
classifiers=[
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment