From 96c879901319bcb11df31de0092c34f8695b8da5 Mon Sep 17 00:00:00 2001 From: davebshow <davebshow@gmail.com> Date: Tue, 5 Jul 2016 13:32:33 -0400 Subject: [PATCH] inflight conn management is handled per connection --- goblin/gremlin_python_driver/driver.py | 39 +++++++++++++------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/goblin/gremlin_python_driver/driver.py b/goblin/gremlin_python_driver/driver.py index 8379d77..45e1bd2 100644 --- a/goblin/gremlin_python_driver/driver.py +++ b/goblin/gremlin_python_driver/driver.py @@ -27,10 +27,8 @@ class Driver: self._reclaimed = collections.deque() self._driver_condition = asyncio.Condition(loop=loop) self._open_connections = 0 - self._inflight_messages = 0 self._connecting = 0 self._max_connections = 4 - self._max_inflight_messages = 128 @property def driver_condition(self): @@ -40,24 +38,10 @@ class Driver: def max_connections(self): return self._max_connections - @property - def max_inflight_messages(self): - return self._max_inflight_messages - @property def total_connections(self): return self._connecting + self._open_connections - @property - def inflight_messages(self): - return self._inflight_messages - - def add_inflight(self): - self._inflight_messages += 1 - - def remove_inflight(self): - self._inflight_messages -= 1 - def get(self): return AsyncDriverConnectionContextManager(self) @@ -228,7 +212,7 @@ class AsyncResponseIter: async def term(self): async with self._conn.conn_condition: - self._conn.driver.remove_inflight() + self._conn.remove_inflight() self._conn.conn_condition.notify() self._closed = True if self._force_close: @@ -249,6 +233,16 @@ class Connection: self._password = password self._closed = False self._conn_condition = asyncio.Condition(loop=loop) + self._inflight_messages = 0 + self._max_inflight_messages = 32 + + @property + def inflight_messages(self): + return self._inflight_messages + + @property + def max_inflight_messages(self): + return self._max_inflight_messages @property def conn_condition(self): @@ -270,6 +264,12 @@ class Connection: def driver(self): return self._driver + def add_inflight(self): + self._inflight_messages += 1 + + def remove_inflight(self): + self._inflight_messages -= 1 + async def reclaim(self): if self.driver: await self.driver.reclaim(self) @@ -296,14 +296,13 @@ class Connection: request_id) async with self.conn_condition: while True: - if (self.driver.inflight_messages < - self.driver.max_inflight_messages): + if self.inflight_messages < self.max_inflight_messages: self._ws.send_bytes(message) return AsyncResponseIter(self._ws, self._loop, self, self._username, self._password, processor, session) else: - await self.driver.message_condition.wait() + await self.conn_condition.wait() async def close(self): async with self.conn_condition: -- GitLab