Commit 90db2ba2 authored by davebshow's avatar davebshow
Browse files

gizmo reborn, needs a lot of work

parents
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.cache
nosetests.xml
coverage.xml
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
The MIT License (MIT)
Copyright (c) 2015 David Michael Brown
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# aiogremlin 0.0.1 [(gizmo grew up)](https://pypi.python.org/pypi/gizmo/0.1.12)
`aiogremlin` is a **Python 3** driver for the the [Tinkerpop 3 Gremlin Server](http://www.tinkerpop.com/docs/3.0.0.M7/#gremlin-server). This module is built on [Asyncio](https://docs.python.org/3/library/asyncio.html). By default it uses the [aiohttp](http://aiohttp.readthedocs.org/en/v0.15.3/index.html) socket client implementation, but it is easy to plug in a different implementation. `aiogremlin` is currently in **alpha** mode, but all major functionality has test coverage.
## Getting started
Since Python 3.4 is not the default version on many systems, it's nice to create a virtualenv that uses Python 3.4 by default. Then use pip to install `aiogremlin`. Using virtualenvwrapper on Ubuntu 14.04:
```bash
$ mkvirtualenv -p /usr/bin/python3.4 aiogremlin
$ pip install aiogremlin
```
Fire up the Gremlin Server:
```bash
$ ./bin/gremlin-server.sh
```
The `AsyncGremlinClient` communicates asynchronously with the Gremlin Server using websockets. The client uses a combination of [asyncio.coroutine](https://docs.python.org/3/library/asyncio-task.html#coroutines) and [asyncio.Task](https://docs.python.org/3/library/asyncio-task.html#task) run on Asyncio's pluggable event loop to achieve this communication.
The majority of ``AsyncGremlinClient`` methods are an `asyncio.coroutine`, so you will also need to use either `asyncio` or the `aiogremlin` [Task API](#task-api). The following examples use `asyncio` to demonstrate the use of the AsyncioGremlineClient.
The Gremlin Server sends responses in chunks, so `AsyncGremlinClient.submit` returns a list of gremlin response objects:
```python
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> gc = GremlinClient('ws://localhost:8182/', loop=loop)
>>> submit = gc.submit("x + x", bindings={"x": 4})
>>> result = loop.run_until_complete(submit)
>>> result
[[8]]
>>> resp = result[0]
>>> resp.status_code
200
>>> resp
[8]
>>> loop.run_until_complete(gc.close())
>>> loop.close()
```
====================================================
aiogremlin - Async Python 3 driver for TP3 Gremlin Server
====================================================
**alpha**
`Official Documentation`_
.. _Official Documentation:
from .abc import AbstractBaseFactory, AbstractBaseConnection
from .connection import WebsocketPool, AiohttpFactory
from .client import GremlinClient
from .contextmanager import GremlinContext
from .exceptions import RequestError, GremlinServerError, SocketClientError
from .tasks import async, Group, Chain, Chord
__version__ = "0.0.1"
import asyncio
from abc import ABCMeta, abstractmethod
class AbstractBaseFactory(metaclass=ABCMeta):
@classmethod
@abstractmethod
def connect(cls):
pass
@property
def factory(self):
return self
class AbstractBaseConnection(metaclass=ABCMeta):
def __init__(self, socket, pool=None):
self.socket = socket
self._pool = pool
def feed_pool(self):
if self.pool:
if self in self.pool.active_conns:
self.pool.feed_pool(self)
@asyncio.coroutine
def release(self):
yield from self.close()
if self in self.pool.active_conns:
self.pool.active_conns.discard(self)
@property
def pool(self):
return self._pool
@property
@abstractmethod
def closed(self):
pass
@abstractmethod
def close():
pass
@abstractmethod
def send(self):
pass
@abstractmethod
def recv(self):
pass
"""
"""
import asyncio
import json
import ssl
import uuid
from .connection import WebsocketPool
from .log import client_logger
from .protocol import gremlin_response_parser, GremlinWriter
from .response import GremlinResponse
from .tasks import async
class GremlinBase:
def __init__(self, uri='ws://localhost:8182/', loop=None, ssl=None,
protocol=None, lang="gremlin-groovy", op="eval",
processor="", pool=None, factory=None, poolsize=10,
timeout=None, **kwargs):
"""
"""
self.uri = uri
self.ssl = ssl
self.protocol = protocol
# if self.ssl:
# protocol = protocol or ssl.PROTOCOL_TLSv1
# ssl_context = ssl.SSLContext(protocol)
# ssl_context.load_verify_locations(ssl)
# ssl_context.verify_mode = ssl.CERT_REQUIRED
# self.ssl_context = ssl_context # This will go to conn pool... use TCPConnector?
self._loop = loop or asyncio.get_event_loop()
self.lang = lang or "gremlin-groovy"
self.op = op or "eval"
self.processor = processor or ""
self.poolsize = poolsize
self.timeout = timeout
self.pool = pool or WebsocketPool(uri, factory=factory,
poolsize=poolsize, timeout=timeout, loop=self._loop)
self.factory = factory or self.pool.factory
class GremlinClient(GremlinBase):
@property
def loop(self):
return self._loop
@asyncio.coroutine
def close(self):
yield from self.pool.close()
@asyncio.coroutine
def connect(self, **kwargs):
"""
"""
loop = kwargs.get("loop", "") or self.loop
connection = yield from self.factory.connect(self.uri, loop=loop,
**kwargs)
return connection
@asyncio.coroutine
def send(self, gremlin, connection=None, bindings=None, lang=None,
op=None, processor=None, binary=True):
"""
"""
lang = lang or self.lang
op = op or self.op
processor = processor or self.processor
message = json.dumps({
"requestId": str(uuid.uuid4()),
"op": op,
"processor": processor,
"args":{
"gremlin": gremlin,
"bindings": bindings,
"language": lang
}
})
if connection is None:
connection = yield from self.pool.connect(self.uri, loop=self.loop)
writer = GremlinWriter(connection)
yield from writer.send(message, binary=binary)
return connection
@asyncio.coroutine
def submit(self, gremlin, bindings=None, lang=None,
op=None, processor=None, consumer=None, collect=True, **kwargs):
"""
"""
lang = lang or self.lang
op = op or self.op
processor = processor or self.processor
connection = yield from self.send(gremlin, bindings=bindings, lang=lang,
op=op, processor=processor)
results = yield from self.run(connection, consumer=consumer,
collect=collect)
return results
def s(self, *args, **kwargs):
"""
"""
if not kwargs.get("loop", ""):
kwargs["loop"] = self.loop
return async(self.submit, *args, **kwargs)
@asyncio.coroutine
def recv(self, connection):
"""
"""
return (yield from gremlin_response_parser(connection))
@asyncio.coroutine
def run(self, connection, consumer=None, collect=True):
"""
"""
results = []
while True:
message = yield from self.recv(connection)
if message is None:
break
message = GremlinResponse(message)
if consumer:
message = consumer(message)
if asyncio.iscoroutine(message):
message = yield from asyncio.async(message, loop=self.loop)
if message and collect:
results.append(message)
return results
"""
"""
import asyncio
import aiohttp
from .abc import AbstractBaseFactory, AbstractBaseConnection
from .exceptions import SocketClientError
from .log import INFO, conn_logger
class WebsocketPool:
def __init__(self, uri='ws://localhost:8182/', factory=None, poolsize=10,
max_retries=10, timeout=None, loop=None, verbose=False):
"""
"""
self.uri = uri
self._factory = factory or AiohttpFactory
self.poolsize = poolsize
self.max_retries = max_retries
self.timeout = timeout
self._loop = loop or asyncio.get_event_loop()
self.pool = asyncio.Queue(maxsize=self.poolsize, loop=self._loop)
self.active_conns = set()
self.num_connecting = 0
self._closed = False
if verbose:
conn_logger.setLevel(INFO)
@property
def loop(self):
return self._loop
@property
def factory(self):
return self._factory
@property
def num_active_conns(self):
return len(self.active_conns)
def feed_pool(self, conn):
self.active_conns.discard(conn)
self._put(conn)
@asyncio.coroutine
def close(self):
if not self._closed:
if self.active_conns:
yield from self._close_active_conns()
yield from self._purge_pool()
self._closed = True
@asyncio.coroutine
def _close_active_conns(self):
tasks = [asyncio.async(conn.close(), loop=self.loop) for conn
in self.active_conns]
yield from asyncio.wait(tasks, loop=self.loop)
@asyncio.coroutine
def _purge_pool(self):
while True:
try:
conn = self.pool.get_nowait()
except asyncio.QueueEmpty:
break
else:
yield from conn.close()
@asyncio.coroutine
def connect(self, uri=None, loop=None, num_retries=None):
if num_retries is None:
num_retries = self.max_retries
uri = uri or self.uri
loop = loop or self.loop
if not self.pool.empty():
socket = self.pool.get_nowait()
conn_logger.info("Reusing socket: {} at {}".format(socket, uri))
elif (self.num_active_conns + self.num_connecting >= self.poolsize or
not self.poolsize):
conn_logger.info("Waiting for socket...")
socket = yield from asyncio.wait_for(self.pool.get(),
self.timeout, loop=loop)
conn_logger.info("Socket acquired: {} at {}".format(socket, uri))
else:
self.num_connecting += 1
try:
socket = yield from self.factory.connect(uri, pool=self,
loop=loop)
except:
raise
else:
conn_logger.info("New connection on socket: {} at {}".format(
socket, uri))
finally:
self.num_connecting -= 1
if not socket.closed:
self.active_conns.add(socket)
# Untested.
elif num_retries > 0:
socket = yield from self.connect(uri, loop, num_retries - 1)
else:
raise RuntimeError("Unable to connect, max retries exceeded.")
return socket
def _put(self, socket):
try:
self.pool.put_nowait(socket)
except asyncio.QueueFull:
pass
class AiohttpFactory(AbstractBaseFactory):
@classmethod
@asyncio.coroutine
def connect(cls, uri='ws://localhost:8182/', pool=None, protocols=(),
connector=None, autoclose=False, autoping=False, loop=None):
if pool:
loop = loop or pool.loop
try:
socket = yield from aiohttp.ws_connect(uri, protocols=protocols,
connector=connector, autoclose=autoclose, autoping=autoping,
loop=loop)
except aiohttp.WSServerHandshakeError as e:
raise SocketClientError(e.message)
return AiohttpConnection(socket, pool)
class AiohttpConnection(AbstractBaseConnection):
def __init__(self, socket, pool=None):
super().__init__(socket, pool=pool)
def __str__(self):
return "{} wrapping {}".format(repr(self), repr(self.socket))
@property
def closed(self):
return self.socket.closed
@asyncio.coroutine
def close(self):
yield from self.socket.close()
@asyncio.coroutine
def send(self, message, binary=True):
if binary:
method = self.socket.send_bytes
else:
method = self.socket.send_str
try:
method(message)
except RuntimeError:
# Socket closed.
yield from self.release()
raise
except TypeError:
# Bytes/string input error.
yield from self.release()
raise
@asyncio.coroutine
def recv(self):
while True:
try:
message = yield from self.socket.receive()
except (asyncio.CancelledError, asyncio.TimeoutError):
yield from self.release()
raise
except RuntimeError:
yield from self.release()
raise
if message.tp == aiohttp.MsgType.binary:
return message.data.decode()
elif message.tp == aiohttp.MsgType.text:
return message.data.strip()
elif message.tp == aiohttp.MsgType.ping:
conn_logger.warn("Ping received.")
ws.pong()
conn_logger.warn("Sent pong.")
elif message.tp == aiohttp.MsgType.pong:
conn_logger.warn('Pong received')
else:
try:
if message.tp == aiohttp.MsgType.release:
conn_logger.warn("Socket connection closed by server.")
elif message.tp == aiohttp.MsgType.error:
raise SocketClientError(self.socket.exception())
elif message.tp == aiohttp.MsgType.closed:
raise SocketClientError("Socket closed.")
break
finally:
yield from self.release()
from contextlib import contextmanager
from .client import GremlinBase, GremlinClient
from .connection import WebsocketPool
class GremlinContext(GremlinBase):
# Untested.
@property
def client(self):
return self._client()
@property
def pool(self):
return self._pool()
@contextmanager
def _client(self):
client = GremlinClient(uri=self.uri, loop=self._loop, ssl=self.ssl,
protocol=self.protocol, lang=self.lang, op=self.op,
processor=self.processor, pool=self.pool, factory=self.factory,
poolsize=self.poolsize, timeout=self.timeout)
try:
yield client
finally:
yield from client.close()
@contextmanager
def _pool(self):
pool = WebsocketPool(uri=self.uri, loop=self._loop,
factory=self.factory, poolsize=self.poolsize, timeout=self.timeout)
try:
yield pool
finally:
yield from pool.close()
"""
gizmo.exceptions
This module defines exceptions for the Gremlin Server.
"""
class SocketClientError(IOError): pass
class StatusException(IOError):
def __init__(self, value, result):
"""
Handle all exceptions returned from the Gremlin Server as per:
https://github.com/apache/incubator-tinkerpop/blob/ddd0b36bed9a2b1ce5b335b1753d881f0614a6c4/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/ResponseStatusCode.java
:param value: ResultCode:
:param value: message:
"""
self.value = value
self.response = {
498: ("MALFORMED_REQUEST",
("The request message was not properly formatted which " +
"means it could not be parsed at all or the 'op' code was " +
"not recognized such that Gremlin Server could properly " +
"route it for processing. Check the message format and " +
"retry the request")),
499: ("INVALID_REQUEST_ARGUMENTS",
("The request message was parseable, but the arguments " +
"supplied in the message were in conflict or incomplete. " +
"Check the message format and retry the request.")),
500: ("SERVER_ERROR",
("A general server error occurred that prevented the " +
"request from being processed.")),
596: ("TRAVERSAL_EVALUATION",
("The remote " +
"{@link org.apache.tinkerpop.gremlin.process.Traversal} " +
"submitted for processing evaluated in on the server with " +
"errors and could not be processed")),
597: ("SCRIPT_EVALUATION",
("The script submitted for processing evaluated in the " +
"{@code ScriptEngine} with errors and could not be " +
"processed.Check the script submitted for syntax errors " +
"or other problems and then resubmit.")),
598: ("TIMEOUT",
("The server exceeded one of the timeout settings for the " +
"request and could therefore only partially respond or " +
" not respond at all.")),
599: ("SERIALIZATION",
("The server was not capable of serializing an object that " +
"was returned from the script supplied on the request. " +
"Either transform the object into something Gremlin " +
"Server can process within the script or install mapper " +
"serialization classes to Gremlin Server."))
}
if result:
result = "\n\n{}".format(result)
self.message = 'Code [{}]: {}. {}.{}'.format(self.value,