diff --git a/goblin/api.py b/goblin/api.py
index 6e4d72a0052800caf3fc0d118891db7b9a34279a..b3ac5f079e4430375ad62592a46d607bf2655896 100644
--- a/goblin/api.py
+++ b/goblin/api.py
@@ -21,13 +21,9 @@ async def create_engine(url,
     """Constructor function for :py:class:`Engine`. Connects to database
        and builds a dictionary of relevant vendor implmentation features"""
     features = {}
-    # Will use a pool here
-    pool = gremlin_python_driver.create_pool(url,
-                                             loop,
-                                             maxsize=maxsize,
-                                             force_close=force_close,
-                                             force_release=force_release)
-    async with pool.driver.get() as conn:
+    # Will use a driver here
+    driver = gremlin_python_driver.Driver(url, loop)
+    async with driver.get() as conn:
         # Propbably just use a parser to parse the whole feature list
         stream = conn.submit(
             'graph.features().graph().supportsComputer()')
@@ -50,7 +46,7 @@ async def create_engine(url,
         msg = await stream.fetch_data()
         features['threaded_transactions'] = msg.data[0]
 
-    return Engine(url, loop, pool=pool, **features)
+    return Engine(url, loop, driver=driver, **features)
 
 
 # Main API classes
@@ -59,16 +55,16 @@ class Engine:
     database connections. Used as a factory to create :py:class:`Session`
     objects. More config coming soon."""
 
-    def __init__(self, url, loop, *, pool=None, force_close=True, **features):
+    def __init__(self, url, loop, *, driver=None, force_close=True, **features):
         self._url = url
         self._loop = loop
         self._force_close = force_close
         self._features = features
         self._translator = gremlin_python.GroovyTranslator('g')
-        # This will be a pool
-        if pool is None:
-            pool = gremlin_python_driver.Pool(url, loop)
-        self._pool = pool
+        # This will be a driver
+        if driver is None:
+            driver = gremlin_python_driver.Driver(url, loop)
+        self._driver = driver
 
     @property
     def translator(self):
@@ -79,19 +75,19 @@ class Engine:
         return self._url
 
     @property
-    def pool(self):
-        return self._pool
+    def driver(self):
+        return self._driver
 
     def session(self, *, use_session=False):
         return Session(self, use_session=use_session)
 
     async def execute(self, query, *, bindings=None, session=None):
-        conn = await self.pool.acquire()
+        conn = await self.driver.recycle()
         return conn.submit(query, bindings=bindings)
 
     async def close(self):
-        await self.pool.close()
-        self._pool = None
+        await self.driver.close()
+        self._driver = None
 
 
 class Session:
diff --git a/goblin/gremlin_python_driver/__init__.py b/goblin/gremlin_python_driver/__init__.py
index e0283f0ee682ea821400f3cb5bbe9eb2c51b8493..9073a3f9291f78e909c3af1093670517e743b4ca 100644
--- a/goblin/gremlin_python_driver/__init__.py
+++ b/goblin/gremlin_python_driver/__init__.py
@@ -1,2 +1 @@
 from goblin.gremlin_python_driver.driver import Driver
-from goblin.gremlin_python_driver.pool import create_pool, Pool
diff --git a/goblin/gremlin_python_driver/driver.py b/goblin/gremlin_python_driver/driver.py
index dd341181cd84aea398a2caf6414c2fec48aa1fa3..bbf466ce652fc5922bd9da362f9bd22fe834591d 100644
--- a/goblin/gremlin_python_driver/driver.py
+++ b/goblin/gremlin_python_driver/driver.py
@@ -1,12 +1,16 @@
 """Simple Async driver for the TinkerPop3 Gremlin Server"""
+import asyncio
 import collections
 import json
+import logging
 import uuid
-
 import aiohttp
 
 
+logger = logging.getLogger(__name__)
+
+
 Message = collections.namedtuple(
     "Message",
     ["status_code", "data", "message", "metadata"])
 
@@ -20,25 +24,82 @@ class Driver:
         if not client_session:
             client_session = aiohttp.ClientSession(loop=self._loop)
         self._client_session = client_session
+        self._reclaimed = collections.deque()
+        self._open_connections = 0
+        self._inflight_messages = 0
+        self._connecting = 0
+        self._max_connections = 32
+        self._max_inflight_messages = 128
 
     @property
-    def conn(self):
-        return self._conn
+    def max_connections(self):
+        return self._max_connections
+
+    @property
+    def max_inflight_messages(self):
+        return self._max_inflight_messages
+
+    @property
+    def total_connections(self):
+        return self._connecting + self._open_connections
 
     def get(self):
         return AsyncDriverConnectionContextManager(self)
 
-    async def connect(self, *, force_close=True, force_release=False, pool=None):
-        ws = await self._client_session.ws_connect(self._url)
-        return Connection(ws, self._loop, force_close=force_close,
-                          force_release=force_release, pool=pool)
+    async def connect(self, *, force_close=True, recycle=False):
+        if self.total_connections < self._max_connections:
+            self._connecting += 1
+            try:
+                ws = await self._client_session.ws_connect(self._url)
+                self._open_connections += 1
+                return Connection(ws, self._loop, force_close=force_close,
+                                  recycle=recycle, driver=self)
+            finally:
+                self._connecting -= 1
+        else:
+            raise RuntimeError("Too many connections, try recycling")
+
+    async def recycle(self, *, force_close=False, recycle=True):
+        conn = None
+        while self._reclaimed:
+            conn = self._reclaimed.popleft()
+            if not conn.closed:
+                logger.info("Reusing connection: {}".format(conn))
+                break
+            self._open_connections -= 1
+            logger.debug(
+                "Discarded closed connection: {}".format(conn))
+            conn = None
+        if conn is None and self.total_connections < self.max_connections:
+            conn = await self.connect(force_close=force_close,
+                                      recycle=recycle)
+            logger.info("Acquired new connection: {}".format(conn))
+        return conn
+
+    async def reclaim(self, conn):
+        if self.total_connections <= self.max_connections:
+            if conn.closed:
+                # conn has been closed
+                logger.info(
+                    "Released closed connection: {}".format(conn))
+                self._open_connections -= 1
+                conn = None
+            else:
+                self._reclaimed.append(conn)
+        else:
+            if conn.driver is self:
+                # over capacity; close connections owned by this driver
+                await conn.close()
+                self._open_connections -= 1
 
     async def close(self):
-        if self._conn:
-            await self._conn.close()
-            self._conn = None
+        while self._reclaimed:
+            conn = self._reclaimed.popleft()
+            await conn.close()
         await self._client_session.close()
         self._client_session = None
+        self._closed = True
+        logger.debug("Driver {} has been closed".format(self))
 
 
 class AsyncDriverConnectionContextManager:
@@ -60,13 +121,18 @@ class AsyncResponseIter:
 
-    def __init__(self, ws, loop, conn):
+    def __init__(self, ws, loop, conn, username, password, processor, session):
         self._ws = ws
         self._loop = loop
         self._conn = conn
+        self._username = username
+        self._password = password
+        self._processor = processor
+        self._session = session
         self._force_close = self._conn.force_close
-        self._force_release = self._conn.force_release
+        self._recycle = self._conn.recycle
         self._closed = False
+        self._response_queue = asyncio.Queue(loop=loop)
 
     async def __aiter__(self):
         return self
@@ -85,20 +151,30 @@ async def fetch_data(self):
-        if self._closed:
-            return
+        if not self._response_queue.empty():
+            message = self._response_queue.get_nowait()
+        else:
+            asyncio.ensure_future(self._parse_data(), loop=self._loop)
+            message = await self._response_queue.get()
+        return message
+
+    async def _parse_data(self):
         data = await self._ws.receive()
+        # parse aiohttp response here
         message = json.loads(data.data.decode("utf-8"))
         message = Message(message["status"]["code"],
                           message["result"]["data"],
message["status"]["message"], message["result"]["meta"]) if message.status_code in [200, 206, 204]: + self._response_queue.put_nowait(message) if message.status_code != 206: await self._term() - return message + self._response_queue.put_nowait(None) elif message.status_code == 407: - pass # auth + self._authenticate(self._username, self._password, + self._processor, self._session) + message = await self.fetch_data() else: await self._term() raise RuntimeError("{0} {1}".format(message.status_code, @@ -108,18 +184,20 @@ class AsyncResponseIter: self._closed = True if self._force_close: await self.close() - elif self._force_release: - await self._conn.release() + elif self._recycle: + await self._conn.reclaim() class Connection: - def __init__(self, ws, loop, *, force_close=True, force_release=False, - pool=None): + def __init__(self, ws, loop, *, force_close=True, recycle=False, + driver=None, username=None, password=None): self._ws = ws self._loop = loop self._force_close = force_close - self._force_release = force_release - self._pool = pool + self._recycle = recycle + self._driver = driver + self._username = username + self._password = password self._closed = False @property @@ -131,16 +209,16 @@ class Connection: return self._force_close @property - def force_release(self): - return self._force_release + def recycle(self): + return self._recycle @property - def pool(self): - return self._pool + def driver(self): + return self._driver - async def release(self): - if self._pool: - await self._pool.release(self) + async def reclaim(self): + if self.driver: + await self.driver.reclaim(self) def submit(self, gremlin, @@ -164,11 +242,14 @@ class Connection: request_id) self._ws.send_bytes(message) - return AsyncResponseIter(self._ws, self._loop, self) + return AsyncResponseIter(self._ws, self._loop, self, self._username, + self._password, processor, session) async def close(self): await self._ws.close() self._closed = True + self.driver._open_connections -= 1 + self._driver = None def _prepare_message(self, gremlin, bindings, lang, aliases, op, processor, session, request_id): diff --git a/goblin/gremlin_python_driver/pool.py b/goblin/gremlin_python_driver/pool.py deleted file mode 100644 index bc7f76d6b4ce8650e76f6176f11fb61e20894f86..0000000000000000000000000000000000000000 --- a/goblin/gremlin_python_driver/pool.py +++ /dev/null @@ -1,155 +0,0 @@ -import collections -import logging - -from goblin.gremlin_python_driver import driver - - -logger = logging.getLogger(__name__) - - -def create_pool(url, - loop, - maxsize=256, - force_close=False, - force_release=True): - return Pool(url, loop, maxsize=maxsize, force_close=force_close, - force_release=force_release) - - -class Pool(object): - """ - Pool of :py:class:`goblin.gremlin_python_driver.Connection` objects. - :param str url: url for Gremlin Server. - :param float timeout: timeout for establishing connection (optional). - Values ``0`` or ``None`` mean no timeout - :param str username: Username for SASL auth - :param str password: Password for SASL auth - :param gremlinclient.graph.GraphDatabase graph: The graph instances - used to create connections - :param int maxsize: Maximum number of connections. 
-    :param loop: event loop
-    """
-    def __init__(self, url, loop, maxsize=256, force_close=False,
-                 force_release=True):
-        self._graph = url
-        self._loop = loop
-        self._maxsize = maxsize
-        self._force_close = force_close
-        self._force_release = force_release
-        self._pool = collections.deque()
-        self._acquired = set()
-        self._acquiring = 0
-        self._closed = False
-        self._driver = driver.Driver(url, loop)
-        self._conn = None
-
-    @property
-    def freesize(self):
-        """
-        Number of free connections
-
-        :returns: int
-        """
-        return len(self._pool)
-
-    @property
-    def size(self):
-        """
-        Total number of connections
-
-        :returns: int
-        """
-        return len(self._acquired) + self._acquiring + self.freesize
-
-    @property
-    def maxsize(self):
-        """
-        Maximum number of connections
-
-        :returns: in
-        """
-        return self._maxsize
-
-    @property
-    def driver(self):
-        """
-        Associated graph instance used for creating connections
-
-        :returns: :py:class:`gremlinclient.graph.GraphDatabase`
-        """
-        return self._driver
-
-    @property
-    def pool(self):
-        """
-        Object that stores unused connections
-
-        :returns: :py:class:`collections.deque`
-        """
-        return self._pool
-
-    @property
-    def closed(self):
-        """
-        Check if pool has been closed
-
-        :returns: bool
-        """
-        return self._closed or self._graph is None
-
-    def get(self):
-        return AsyncPoolConnectionContextManager(self)
-
-    async def acquire(self):
-        """
-        Acquire a connection from the Pool
-
-        :returns: Future -
-            :py:class:`asyncio.Future`, :py:class:`trollius.Future`, or
-            :py:class:`tornado.concurrent.Future`
-        """
-        if self._pool:
-            while self._pool:
-                conn = self._pool.popleft()
-                if not conn.closed:
-                    logger.debug("Reusing connection: {}".format(conn))
-                    self._acquired.add(conn)
-                    break
-                else:
-                    logger.debug(
-                        "Discarded closed connection: {}".format(conn))
-        elif self.size < self.maxsize:
-            self._acquiring += 1
-            conn = await self.driver.connect(force_close=self._force_close,
-                                             force_release=self._force_release, pool=self)
-            self._acquiring -= 1
-            self._acquired.add(conn)
-            logger.debug(
-                "Acquired new connection: {}".format(conn))
-
-        return conn
-
-    async def release(self, conn):
-        """
-        Release a connection back to the pool.
-        :param gremlinclient.connection.Connection: The connection to be
-            released
-        """
-        if self.size <= self.maxsize:
-            if conn.closed:
-                # conn has been closed
-                logger.info(
-                    "Released closed connection: {}".format(conn))
-                self._acquired.remove(conn)
-                conn = None
-            else:
-                self._pool.append(conn)
-                self._acquired.remove(conn)
-        else:
-            await conn.close()
-
-    async def close(self):
-        """
-        Close pool
-        """
-        while self.pool:
-            conn = self.pool.popleft()
-            await conn.close()
-        await self.driver.close()
-        self._driver = None
-        self._closed = True
-        logger.info(
-            "Connection pool {} has been closed".format(self))
diff --git a/goblin/mapper.py b/goblin/mapper.py
index 58f0c5eba7a44ab2a3205e31b26ea9be8c2e2fbc..da15b010b02b9d2af1a22bedc5f125d68ce19677 100644
--- a/goblin/mapper.py
+++ b/goblin/mapper.py
@@ -1,7 +1,11 @@
 """Helper functions and class to map between OGM Elements <-> DB Elements"""
+import logging
 import inflection
 
+logger = logging.getLogger(__name__)
+
+
 def props_generator(properties):
     for prop in properties:
         yield prop['ogm_name'], prop['db_name'], prop['data_type']
diff --git a/goblin/query.py b/goblin/query.py
index fcc363fcb3f35c3dafa132430066818afa90a91e..1b0fdc1e9bfe8f68108742a1fad8a3af60c07236 100644
--- a/goblin/query.py
+++ b/goblin/query.py
@@ -1,7 +1,11 @@
 """Query API and helpers"""
+import logging
 
 from goblin import mapper
 
+logger = logging.getLogger(__name__)
+
+
 def parse_traversal(traversal):
     script = traversal.translator.traversal_script
     bindings = traversal.bindings
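
Usage sketch, not part of the patch: a minimal example of the reworked Driver API introduced above, assuming the patch is applied and a Gremlin Server is reachable at ws://localhost:8182/gremlin (the URL and the submitted script are placeholders).

import asyncio

from goblin.gremlin_python_driver import Driver


async def main(loop):
    driver = Driver('ws://localhost:8182/gremlin', loop)
    # Context-manager path, mirroring create_engine(): the driver hands out
    # a Connection, submit() returns a stream, and fetch_data() yields
    # Message namedtuples (status_code, data, message, metadata).
    async with driver.get() as conn:
        stream = conn.submit('1 + 1')
        msg = await stream.fetch_data()
        print(msg.status_code, msg.data)
    # Connections obtained via driver.recycle() are reclaimed onto the
    # driver's deque once their stream terminates, instead of being
    # released back to a Pool as before.
    await driver.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))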
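A similar sketch at the Engine level; this assumes loop is the second positional argument of create_engine() and that its remaining keyword arguments keep their defaults, which is not shown in the hunk context. The traversal string is a placeholder.

import asyncio

from goblin.api import create_engine


async def go(loop):
    # create_engine() probes the server feature list over a temporary
    # connection, then returns an Engine backed by a Driver rather than a Pool.
    engine = await create_engine('ws://localhost:8182/gremlin', loop)
    # Engine.execute() now recycles a driver connection instead of acquiring
    # one from a pool.
    stream = await engine.execute('g.V().limit(1)')
    msg = await stream.fetch_data()
    print(msg.data)
    await engine.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))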