From 42f35c9d4dc1ad1c5b79f2c39666371a541ca6e0 Mon Sep 17 00:00:00 2001
From: davebshow <davebshow@gmail.com>
Date: Mon, 4 Jul 2016 22:24:32 -0400
Subject: [PATCH] updating connection pooling/driver approach, updated parsing
 a bit, using asyncio...hmmm

---
 goblin/api.py                            |  32 ++---
 goblin/gremlin_python_driver/__init__.py |   1 -
 goblin/gremlin_python_driver/driver.py   | 141 ++++++++++++++++-----
 goblin/gremlin_python_driver/pool.py     | 155 -----------------------
 goblin/mapper.py                         |   4 +
 goblin/query.py                          |   4 +
 6 files changed, 133 insertions(+), 204 deletions(-)
 delete mode 100644 goblin/gremlin_python_driver/pool.py

diff --git a/goblin/api.py b/goblin/api.py
index 6e4d72a..b3ac5f0 100644
--- a/goblin/api.py
+++ b/goblin/api.py
@@ -21,13 +21,9 @@ async def create_engine(url,
     """Constructor function for :py:class:`Engine`. Connects to database
        and builds a dictionary of relevant vendor implmentation features"""
     features = {}
-    # Will use a pool here
-    pool = gremlin_python_driver.create_pool(url,
-                                             loop,
-                                             maxsize=maxsize,
-                                             force_close=force_close,
-                                             force_release=force_release)
-    async with pool.driver.get() as conn:
+    # Will use a driver here
+    driver = gremlin_python_driver.Driver(url, loop)
+    async with driver.get() as conn:
         # Propbably just use a parser to parse the whole feature list
         stream = conn.submit(
             'graph.features().graph().supportsComputer()')
@@ -50,7 +46,7 @@ async def create_engine(url,
         msg = await stream.fetch_data()
         features['threaded_transactions'] = msg.data[0]
 
-    return Engine(url, loop, pool=pool, **features)
+    return Engine(url, loop, driver=driver, **features)
 
 
 # Main API classes
@@ -59,16 +55,16 @@ class Engine:
        database connections. Used as a factory to create :py:class:`Session`
        objects. More config coming soon."""
 
-    def __init__(self, url, loop, *, pool=None, force_close=True, **features):
+    def __init__(self, url, loop, *, driver=None, force_close=True, **features):
         self._url = url
         self._loop = loop
         self._force_close = force_close
         self._features = features
         self._translator = gremlin_python.GroovyTranslator('g')
-        # This will be a pool
-        if pool is None:
-            pool = gremlin_python_driver.Pool(url, loop)
-        self._pool = pool
+        # This will be a driver
+        if driver is None:
+            driver = gremlin_python_driver.Driver(url, loop)
+        self._driver = driver
 
     @property
     def translator(self):
@@ -79,19 +75,19 @@ class Engine:
         return self._url
 
     @property
-    def pool(self):
-        return self._pool
+    def driver(self):
+        return self._driver
 
     def session(self, *, use_session=False):
         return Session(self, use_session=use_session)
 
     async def execute(self, query, *, bindings=None, session=None):
-        conn = await self.pool.acquire()
+        conn = await self.driver.recycle()
         return conn.submit(query, bindings=bindings)
 
     async def close(self):
-        await self.pool.close()
-        self._pool = None
+        await self.driver.close()
+        self._driver = None
 
 
 class Session:
diff --git a/goblin/gremlin_python_driver/__init__.py b/goblin/gremlin_python_driver/__init__.py
index e0283f0..9073a3f 100644
--- a/goblin/gremlin_python_driver/__init__.py
+++ b/goblin/gremlin_python_driver/__init__.py
@@ -1,2 +1 @@
 from goblin.gremlin_python_driver.driver import Driver
-from goblin.gremlin_python_driver.pool import create_pool, Pool
diff --git a/goblin/gremlin_python_driver/driver.py b/goblin/gremlin_python_driver/driver.py
index dd34118..bbf466c 100644
--- a/goblin/gremlin_python_driver/driver.py
+++ b/goblin/gremlin_python_driver/driver.py
@@ -1,12 +1,16 @@
 """Simple Async driver for the TinkerPop3 Gremlin Server"""
+import asyncio
 import collections
 import json
+import logging
 import uuid
 
-
 import aiohttp
 
 
+logger = logging.getLogger(__name__)
+
+
 Message = collections.namedtuple(
     "Message",
     ["status_code", "data", "message", "metadata"])
@@ -20,25 +24,82 @@ class Driver:
         if not client_session:
             client_session = aiohttp.ClientSession(loop=self._loop)
         self._client_session = client_session
+        self._reclaimed = collections.deque()
+        self._open_connections = 0
+        self._inflight_messages = 0
+        self._connecting = 0
+        self._max_connections = 32
+        self._max_inflight_messages = 128
 
     @property
-    def conn(self):
-        return self._conn
+    def max_connections(self):
+        return self._max_connections
+
+    @property
+    def max_inflight_messages(self):
+        return self._max_inflight_messages
+
+    @property
+    def total_connections(self):
+        return self._connecting + self._open_connections
 
     def get(self):
         return AsyncDriverConnectionContextManager(self)
 
-    async def connect(self, *, force_close=True, force_release=False, pool=None):
-        ws = await self._client_session.ws_connect(self._url)
-        return Connection(ws, self._loop, force_close=force_close,
-                          force_release=force_release, pool=pool)
+    async def connect(self, *, force_close=True, recycle=False):
+        if self.total_connections <= self._max_connections:
+            self._connecting += 1
+            try:
+                ws = await self._client_session.ws_connect(self._url)
+                self._open_connections +=1
+                return Connection(ws, self._loop, force_close=force_close,
+                                  recycle=recycle, driver=self)
+            finally:
+                self._connecting -= 1
+        else:
+            raise RuntimeError("To many connections, try recycling")
+
+    async def recycle(self, *, force_close=False, recycle=True):
+        if self._reclaimed:
+            while self._reclaimed:
+                conn = self._reclaimed.popleft()
+                if not conn.closed:
+                    logger.info("Reusing connection: {}".format(conn))
+                    break
+                else:
+                    self._open_connections -= 1
+                    logger.debug(
+                        "Discarded closed connection: {}".format(conn))
+        elif self.total_connections < self.max_connections:
+            conn = await self.connect(force_close=force_close,
+                                      recycle=recycle)
+            logger.info("Acquired new connection: {}".format(conn))
+        return conn
+
+    async def reclaim(self, conn):
+        if self.total_connections <= self.max_connections:
+            if conn.closed:
+                # conn has been closed
+                logger.info(
+                    "Released closed connection: {}".format(conn))
+                self._open_connections -= 1
+                conn = None
+            else:
+                self._reclaimed.append(conn)
+        else:
+            if conn.driver is self:
+                # hmmm
+                await conn.close()
+                self._open_connections -= 1
 
     async def close(self):
-        if self._conn:
-            await self._conn.close()
-            self._conn = None
+        while self._reclaimed:
+            conn = self._reclaimed.popleft()
+            await conn.close()
         await self._client_session.close()
         self._client_session = None
+        self._closed = True
+        logger.debug("Driver {} has been closed".format(self))
 
 
 class AsyncDriverConnectionContextManager:
@@ -60,13 +121,18 @@ class AsyncDriverConnectionContextManager:
 
 class AsyncResponseIter:
 
-    def __init__(self, ws, loop, conn):
+    def __init__(self, ws, loop, conn, username, password, processor, session):
         self._ws = ws
         self._loop = loop
         self._conn = conn
+        self._username = username
+        self._password = password
+        self._processor = processor
+        self._session = session
         self._force_close = self._conn.force_close
-        self._force_release = self._conn.force_release
+        self._recycle = self._conn.recycle
         self._closed = False
+        self._response_queue = asyncio.Queue(loop=loop)
 
     async def __aiter__(self):
         return self
@@ -85,20 +151,30 @@ class AsyncResponseIter:
             self._conn = None
 
     async def fetch_data(self):
-        if self._closed:
-            return
+        if not self._response_queue.empty():
+            message = self._response_queue.get_nowait()
+        else:
+            asyncio.ensure_future(self._parse_data(), loop=self._loop)
+            message = await self._response_queue.get()
+        return message
+
+    async def _parse_data(self):
         data = await self._ws.receive()
+        # parse aiohttp response here
         message = json.loads(data.data.decode("utf-8"))
         message = Message(message["status"]["code"],
                           message["result"]["data"],
                           message["status"]["message"],
                           message["result"]["meta"])
         if message.status_code in [200, 206, 204]:
+            self._response_queue.put_nowait(message)
             if message.status_code != 206:
                 await self._term()
-            return message
+                self._response_queue.put_nowait(None)
         elif message.status_code == 407:
-            pass  # auth
+            self._authenticate(self._username, self._password,
+                               self._processor, self._session)
+            message = await self.fetch_data()
         else:
             await self._term()
             raise RuntimeError("{0} {1}".format(message.status_code,
@@ -108,18 +184,20 @@ class AsyncResponseIter:
         self._closed = True
         if self._force_close:
             await self.close()
-        elif self._force_release:
-            await self._conn.release()
+        elif self._recycle:
+            await self._conn.reclaim()
 
 class Connection:
 
-    def __init__(self, ws, loop, *, force_close=True, force_release=False,
-                 pool=None):
+    def __init__(self, ws, loop, *, force_close=True, recycle=False,
+                 driver=None, username=None, password=None):
         self._ws = ws
         self._loop = loop
         self._force_close = force_close
-        self._force_release = force_release
-        self._pool = pool
+        self._recycle = recycle
+        self._driver = driver
+        self._username = username
+        self._password = password
         self._closed = False
 
     @property
@@ -131,16 +209,16 @@ class Connection:
         return self._force_close
 
     @property
-    def force_release(self):
-        return self._force_release
+    def recycle(self):
+        return self._recycle
 
     @property
-    def pool(self):
-        return self._pool
+    def driver(self):
+        return self._driver
 
-    async def release(self):
-        if self._pool:
-            await self._pool.release(self)
+    async def reclaim(self):
+        if self.driver:
+            await self.driver.reclaim(self)
 
     def submit(self,
                gremlin,
@@ -164,11 +242,14 @@ class Connection:
                                         request_id)
 
         self._ws.send_bytes(message)
-        return AsyncResponseIter(self._ws, self._loop, self)
+        return AsyncResponseIter(self._ws, self._loop, self, self._username,
+                                 self._password, processor, session)
 
     async def close(self):
         await self._ws.close()
         self._closed = True
+        self.driver._open_connections -= 1
+        self._driver = None
 
     def _prepare_message(self, gremlin, bindings, lang, aliases, op, processor,
                          session, request_id):
diff --git a/goblin/gremlin_python_driver/pool.py b/goblin/gremlin_python_driver/pool.py
deleted file mode 100644
index bc7f76d..0000000
--- a/goblin/gremlin_python_driver/pool.py
+++ /dev/null
@@ -1,155 +0,0 @@
-import collections
-import logging
-
-from goblin.gremlin_python_driver import driver
-
-
-logger = logging.getLogger(__name__)
-
-
-def create_pool(url,
-                loop,
-                maxsize=256,
-                force_close=False,
-                force_release=True):
-    return Pool(url, loop, maxsize=maxsize, force_close=force_close,
-                force_release=force_release)
-
-
-class Pool(object):
-    """
-    Pool of :py:class:`goblin.gremlin_python_driver.Connection` objects.
-    :param str url: url for Gremlin Server.
-    :param float timeout: timeout for establishing connection (optional).
-        Values ``0`` or ``None`` mean no timeout
-    :param str username: Username for SASL auth
-    :param str password: Password for SASL auth
-    :param gremlinclient.graph.GraphDatabase graph: The graph instances
-        used to create connections
-    :param int maxsize: Maximum number of connections.
-    :param loop: event loop
-    """
-    def __init__(self, url, loop, maxsize=256, force_close=False,
-                 force_release=True):
-        self._graph = url
-        self._loop = loop
-        self._maxsize = maxsize
-        self._force_close = force_close
-        self._force_release = force_release
-        self._pool = collections.deque()
-        self._acquired = set()
-        self._acquiring = 0
-        self._closed = False
-        self._driver = driver.Driver(url, loop)
-        self._conn = None
-
-    @property
-    def freesize(self):
-        """
-        Number of free connections
-        :returns: int
-        """
-        return len(self._pool)
-
-    @property
-    def size(self):
-        """
-        Total number of connections
-        :returns: int
-        """
-        return len(self._acquired) + self._acquiring + self.freesize
-
-    @property
-    def maxsize(self):
-        """
-        Maximum number of connections
-        :returns: in
-        """
-        return self._maxsize
-
-    @property
-    def driver(self):
-        """
-        Associated graph instance used for creating connections
-        :returns: :py:class:`gremlinclient.graph.GraphDatabase`
-        """
-        return self._driver
-
-    @property
-    def pool(self):
-        """
-        Object that stores unused connections
-        :returns: :py:class:`collections.deque`
-        """
-        return self._pool
-
-    @property
-    def closed(self):
-        """
-        Check if pool has been closed
-        :returns: bool
-        """
-        return self._closed or self._graph is None
-
-    def get(self):
-        return AsyncPoolConnectionContextManager(self)
-
-    async def acquire(self):
-        """
-        Acquire a connection from the Pool
-        :returns: Future -
-            :py:class:`asyncio.Future`, :py:class:`trollius.Future`, or
-            :py:class:`tornado.concurrent.Future`
-        """
-        if self._pool:
-            while self._pool:
-                conn = self._pool.popleft()
-                if not conn.closed:
-                    logger.debug("Reusing connection: {}".format(conn))
-                    self._acquired.add(conn)
-                    break
-                else:
-                    logger.debug(
-                        "Discarded closed connection: {}".format(conn))
-        elif self.size < self.maxsize:
-            self._acquiring += 1
-            conn = await self.driver.connect(force_close=self._force_close,
-                force_release=self._force_release, pool=self)
-            self._acquiring -= 1
-            self._acquired.add(conn)
-            logger.debug(
-                "Acquired new connection: {}".format(conn))
-
-        return conn
-
-    async def release(self, conn):
-        """
-        Release a connection back to the pool.
-        :param gremlinclient.connection.Connection: The connection to be
-            released
-        """
-        if self.size <= self.maxsize:
-            if conn.closed:
-                # conn has been closed
-                logger.info(
-                    "Released closed connection: {}".format(conn))
-                self._acquired.remove(conn)
-                conn = None
-            else:
-                self._pool.append(conn)
-                self._acquired.remove(conn)
-        else:
-            await conn.close()
-
-    async def close(self):
-        """
-        Close pool
-        """
-        while self.pool:
-            conn = self.pool.popleft()
-            await conn.close()
-        await self.driver.close()
-        self._driver = None
-        self._closed = True
-        logger.info(
-            "Connection pool {} has been closed".format(self))
diff --git a/goblin/mapper.py b/goblin/mapper.py
index 58f0c5e..da15b01 100644
--- a/goblin/mapper.py
+++ b/goblin/mapper.py
@@ -1,7 +1,11 @@
 """Helper functions and class to map between OGM Elements <-> DB Elements"""
+import logging
 import inflection
 
 
+logger = logging.getLogger(__name__)
+
+
 def props_generator(properties):
     for prop in properties:
         yield prop['ogm_name'], prop['db_name'], prop['data_type']
diff --git a/goblin/query.py b/goblin/query.py
index fcc363f..1b0fdc1 100644
--- a/goblin/query.py
+++ b/goblin/query.py
@@ -1,7 +1,11 @@
 """Query API and helpers"""
+import logging
 from goblin import mapper
 
 
+logger = logging.getLogger(__name__)
+
+
 def parse_traversal(traversal):
     script = traversal.translator.traversal_script
     bindings = traversal.bindings
-- 
GitLab