From 84a692d7a96ebbb8ee10cd3de5752603bf236e32 Mon Sep 17 00:00:00 2001 From: davebshow <davebshow@gmail.com> Date: Sat, 2 May 2015 18:56:45 -0400 Subject: [PATCH] using ujson for serialization --- aiogremlin/__init__.py | 2 +- aiogremlin/client.py | 5 +- aiogremlin/connection.py | 52 +++++++++----------- aiogremlin/protocol.py | 5 +- benchmark.py | 101 +++++++++++++++++++++++++++++++++++++++ benchmarks/__init__.py | 0 benchmarks/benchmark.py | 44 ----------------- changes.txt | 1 + setup.py | 5 +- 9 files changed, 134 insertions(+), 81 deletions(-) create mode 100644 benchmark.py delete mode 100644 benchmarks/__init__.py delete mode 100644 benchmarks/benchmark.py diff --git a/aiogremlin/__init__.py b/aiogremlin/__init__.py index f4ca236..a66a05f 100644 --- a/aiogremlin/__init__.py +++ b/aiogremlin/__init__.py @@ -2,4 +2,4 @@ from .abc import AbstractFactory, AbstractConnection from .connection import WebsocketPool, AiohttpFactory from .client import GremlinClient, create_client from .exceptions import RequestError, GremlinServerError, SocketClientError -__version__ = "0.0.2" +__version__ = "0.0.3" diff --git a/aiogremlin/client.py b/aiogremlin/client.py index 938ecb8..a859690 100644 --- a/aiogremlin/client.py +++ b/aiogremlin/client.py @@ -1,10 +1,11 @@ """Client for the Tinkerpop 3 Gremlin Server.""" import asyncio -import json import ssl import uuid +import ujson + from aiogremlin.connection import WebsocketPool from aiogremlin.log import client_logger from aiogremlin.protocol import gremlin_response_parser, GremlinWriter @@ -86,7 +87,7 @@ class GremlinClient: lang = lang or self.lang op = op or self.op processor = processor or self.processor - message = json.dumps({ + message = ujson.dumps({ "requestId": str(uuid.uuid4()), "op": op, "processor": processor, diff --git a/aiogremlin/connection.py b/aiogremlin/connection.py index e2e8188..86f7331 100644 --- a/aiogremlin/connection.py +++ b/aiogremlin/connection.py @@ -139,12 +139,12 @@ class AiohttpFactory(BaseFactory): @classmethod @asyncio.coroutine def connect(cls, uri='ws://localhost:8182/', pool=None, protocols=(), - connector=None, autoclose=False, autoping=False, loop=None): + connector=None, autoclose=False, autoping=True, loop=None): if pool: loop = loop or pool.loop try: socket = yield from aiohttp.ws_connect(uri, protocols=protocols, - connector=connector, autoclose=autoclose, autoping=autoping, + connector=connector, autoclose=False, autoping=True, loop=loop) except aiohttp.WSServerHandshakeError as e: raise SocketClientError(e.message) @@ -214,33 +214,25 @@ class AiohttpConnection(BaseConnection): @asyncio.coroutine def _receive(self): """Implements a dispatcher using the aiohttp websocket protocol.""" - while True: + try: + message = yield from self.socket.receive() + except (asyncio.CancelledError, asyncio.TimeoutError): + yield from self.release() + raise + except RuntimeError: + yield from self.release() + raise + if message.tp == aiohttp.MsgType.binary: + return message.data.decode() + elif message.tp == aiohttp.MsgType.text: + return message.data.strip() + else: try: - message = yield from self.socket.receive() - except (asyncio.CancelledError, asyncio.TimeoutError): - yield from self.release() - raise - except RuntimeError: + if message.tp == aiohttp.MsgType.close: + conn_logger.warn("Socket connection closed by server.") + elif message.tp == aiohttp.MsgType.error: + raise SocketClientError(self.socket.exception()) + elif message.tp == aiohttp.MsgType.closed: + raise RuntimeError("Socket closed.") + finally: yield from self.release() - raise - if message.tp == aiohttp.MsgType.binary: - return message.data.decode() - elif message.tp == aiohttp.MsgType.text: - return message.data.strip() - elif message.tp == aiohttp.MsgType.ping: - conn_logger.warn("Ping received.") - ws.pong() - conn_logger.warn("Sent pong.") - elif message.tp == aiohttp.MsgType.pong: - conn_logger.warn('Pong received') - else: - try: - if message.tp == aiohttp.MsgType.close: - conn_logger.warn("Socket connection closed by server.") - elif message.tp == aiohttp.MsgType.error: - raise SocketClientError(self.socket.exception()) - elif message.tp == aiohttp.MsgType.closed: - raise RuntimeError("Socket closed.") - break - finally: - yield from self.release() diff --git a/aiogremlin/protocol.py b/aiogremlin/protocol.py index c6afc07..2a26135 100644 --- a/aiogremlin/protocol.py +++ b/aiogremlin/protocol.py @@ -2,7 +2,8 @@ import asyncio import collections -import json + +import ujson from aiogremlin.exceptions import RequestError, GremlinServerError @@ -14,7 +15,7 @@ Message = collections.namedtuple("Message", ["status_code", "data", "message", @asyncio.coroutine def gremlin_response_parser(connection): message = yield from connection._receive() - message = json.loads(message) + message = ujson.loads(message) message = Message(message["status"]["code"], message["result"]["data"], message["result"]["meta"], diff --git a/benchmark.py b/benchmark.py new file mode 100644 index 0000000..894f2fd --- /dev/null +++ b/benchmark.py @@ -0,0 +1,101 @@ +"""Simple benchmark based on aiohttp benchmark client. +https://github.com/KeepSafe/aiohttp/blob/master/benchmark/async.py +""" +import argparse +import asyncio +import collections +import random + +import aiogremlin + + +@asyncio.coroutine +def run(client, count, concurrency, loop): + processed_count = 0 + execute = client.execute + + @asyncio.coroutine + def do_bomb(): + nonlocal processed_count + for x in range(count): + try: + t1 = loop.time() + resp = yield from execute("1 + 1") + assert resp[0].status_code == 200, resp[0].status_code + assert resp[0].data[0] == 2, resp[0].data[0] + t2 = loop.time() + processed_count += 1 + except Exception: + continue + + bombers = [] + append = bombers.append + async = asyncio.async + for i in range(concurrency): + bomber = async(do_bomb(), loop=loop) + append(bomber) + + t1 = loop.time() + yield from asyncio.gather(*bombers, loop=loop) + t2 = loop.time() + rps = processed_count / (t2 - t1) + print("Benchmark complete: {} rps. {} messages in {}".format(rps, + processed_count, t2-t1)) + return rps + + +@asyncio.coroutine +def main(client, tests, count, concurrency, loop): + execute = client.execute + # warmup + for i in range(10000): + resp = yield from execute("1+1") + assert resp[0].status_code == 200, resp[0].status_code + print("Warmup successful!") + # Rest + yield from asyncio.sleep(30) + rps = yield from run(client, count, concurrency, loop) + for i in range(tests - 1): + # Take a breather between tests. + yield from asyncio.sleep(60) + rps = yield from run(client, count, concurrency, loop) + + +ARGS = argparse.ArgumentParser(description="Run benchmark.") +ARGS.add_argument( + '-t', '--tests', action="store", + nargs='?', type=int, default=5, + help='number of tests (default: `%(default)s`)') +ARGS.add_argument( + '-n', '--count', action="store", + nargs='?', type=int, default=10000, + help='message count (default: `%(default)s`)') +ARGS.add_argument( + '-c', '--concurrency', action="store", + nargs='?', type=int, default=10, + help='count of parallel requests (default: `%(default)s`)') +ARGS.add_argument( + '-p', '--poolsize', action="store", + nargs='?', type=int, default=256, + help='num connected websockets (default: `%(default)s`)') + + +if __name__ == "__main__": + args = ARGS.parse_args() + num_tests = args.tests + num_mssg = args.count + concurr = args.concurrency + poolsize = args.poolsize + loop = asyncio.get_event_loop() + client = loop.run_until_complete( + aiogremlin.create_client(loop=loop, poolsize=poolsize)) + try: + print( + "Runs: {}. Messages: {}. Concurrency: {}. Total mssg/run: {} ".format( + num_tests, num_mssg, concurr, num_mssg * concurr)) + main = main(client, num_tests, num_mssg, concurr, loop) + loop.run_until_complete(main) + finally: + loop.run_until_complete(client.close()) + loop.close() + print("CLOSED CLIENT AND LOOP") diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py deleted file mode 100644 index 9da80d7..0000000 --- a/benchmarks/benchmark.py +++ /dev/null @@ -1,44 +0,0 @@ -"""Simple benchmark based on aiohttp benchmark client.""" - -import asyncio - -from aiogremlin import GremlinClient - - -@asyncio.coroutine -def attack(loop): - - client = GremlinClient(loop=loop, poolsize=10) - execute = client.execute - - processed_count = 0 - - @asyncio.coroutine - def drop_bomb(): - nonlocal processed_count - try: - t1 = loop.time() - resp = yield from execute("1 + 1") - assert resp[0].status_code == 200, resp[0].status_code - t2 = loop.time() - processed_count += 1 - except Exception: - print("an exception occurred {}".format(resp[0].status_code)) - - bombers = [] - append = bombers.append - async = asyncio.async - for i in range(10000): - bomber = async(drop_bomb()) - append(bomber) - - t1 = loop.time() - yield from asyncio.gather(*bombers, loop=loop) - t2 = loop.time() - rps = processed_count / (t2 - t1) - print("Benchmark complete: {} rps".format(rps)) - - -if __name__ == "__main__": - loop = asyncio.get_event_loop() - loop.run_until_complete(attack(loop)) diff --git a/changes.txt b/changes.txt index c644f79..e134c84 100644 --- a/changes.txt +++ b/changes.txt @@ -1,2 +1,3 @@ 0.0.1 - 4/2015: Birth! 0.0.2 - 5/1/2015: Added an init_pool method and a create_client constructor. +0.0.3 - 5/2/2015: Using ujson for serialization. diff --git a/setup.py b/setup.py index 3507070..4244101 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup( name="aiogremlin", - version="0.0.2", + version="0.0.3", url="", license="MIT", author="davebshow", @@ -12,7 +12,8 @@ setup( long_description=open("README.txt").read(), packages=["aiogremlin", "tests"], install_requires=[ - "aiohttp==0.15.3" + "aiohttp==0.15.3", + "ujson==1.33" ], test_suite="tests", classifiers=[ -- GitLab