Commit 183800e5 authored by davebshow's avatar davebshow
Browse files

sasl is up and running, sessions tests broken

parent ff2ea9b7
...@@ -34,7 +34,7 @@ class GremlinClient: ...@@ -34,7 +34,7 @@ class GremlinClient:
def __init__(self, *, url='http://localhost:8182/', loop=None, def __init__(self, *, url='http://localhost:8182/', loop=None,
lang="gremlin-groovy", op="eval", processor="", lang="gremlin-groovy", op="eval", processor="",
timeout=None, ws_connector=None, connector=None, timeout=None, ws_connector=None, client_session=None,
username="", password=""): username="", password=""):
self._lang = lang self._lang = lang
self._op = op self._op = op
...@@ -46,10 +46,6 @@ class GremlinClient: ...@@ -46,10 +46,6 @@ class GremlinClient:
self._timeout = timeout self._timeout = timeout
self._username = username self._username = username
self._password = password self._password = password
if connector is None:
connector = aiohttp.TCPConnector(verify_ssl=False, loop=self._loop)
client_session = aiohttp.ClientSession(connector=connector,
loop=self._loop)
if ws_connector is None: if ws_connector is None:
ws_connector = GremlinConnector(loop=self._loop, ws_connector = GremlinConnector(loop=self._loop,
client_session=client_session) client_session=client_session)
...@@ -211,10 +207,12 @@ class GremlinClientSession(GremlinClient): ...@@ -211,10 +207,12 @@ class GremlinClientSession(GremlinClient):
def __init__(self, *, url='http://localhost:8182/', loop=None, def __init__(self, *, url='http://localhost:8182/', loop=None,
lang="gremlin-groovy", op="eval", processor="session", lang="gremlin-groovy", op="eval", processor="session",
session=None, timeout=None, session=None, timeout=None, client_session=None,
ws_connector=None): ws_connector=None, username="", password=""):
super().__init__(url=url, lang=lang, op=op, processor=processor, super().__init__(url=url, lang=lang, op=op, processor=processor,
loop=loop, timeout=timeout, ws_connector=ws_connector) loop=loop, timeout=timeout, ws_connector=ws_connector,
client_session=client_session, username=username,
password=password)
if session is None: if session is None:
session = str(uuid.uuid4()) session = str(uuid.uuid4())
...@@ -287,7 +285,6 @@ class GremlinResponse: ...@@ -287,7 +285,6 @@ class GremlinResponse:
@asyncio.coroutine @asyncio.coroutine
def _run(self): def _run(self):
import ipdb; ipdb.set_trace()
results = [] results = []
while True: while True:
message = yield from self._stream.read() message = yield from self._stream.read()
...@@ -337,6 +334,8 @@ class GremlinResponseStream: ...@@ -337,6 +334,8 @@ class GremlinResponseStream:
writer = GremlinWriter(self._ws) writer = GremlinWriter(self._ws)
writer.write(op="authentication", username=self._username, writer.write(op="authentication", username=self._username,
password=self._password) password=self._password)
asyncio.Task(self._ws.receive(), loop=self._loop)
message = yield from self._stream.read()
except (RequestError, GremlinServerError): except (RequestError, GremlinServerError):
yield from self._ws.release() yield from self._ws.release()
raise raise
......
"""Implements the Gremlin Server subprotocol.""" """Implements the Gremlin Server subprotocol."""
import base64
import collections import collections
import uuid import uuid
...@@ -62,12 +63,13 @@ class GremlinWriter: ...@@ -62,12 +63,13 @@ class GremlinWriter:
session) session)
if op == "authentication": if op == "authentication":
message = self._authenticate(username, password, session, processor) message = self._authenticate(
username, password, session, processor)
message = json.dumps(message) message = json.dumps(message)
if binary: if binary:
message = self._set_message_header(message, mime_type) message = self._set_message_header(message, mime_type)
self.ws.send(message, binary=binary) self.ws.send(message, binary=binary)
print(message) # print(message)
return self.ws return self.ws
@staticmethod @staticmethod
...@@ -100,16 +102,17 @@ class GremlinWriter: ...@@ -100,16 +102,17 @@ class GremlinWriter:
message["args"].update({"session": session}) message["args"].update({"session": session})
return message return message
@staticmethod @staticmethod
def _authenticate(username, password, session, processor): def _authenticate(username, password, session, processor):
auth_bytes = "".join(["0", username, "0", password]) auth = b"".join([b"\x00", bytes(username, "utf-8"), b"\x00", bytes(password, "utf-8")])
print(auth_bytes) print("auth:",auth)
message = { message = {
"requestId": str(uuid.uuid4()), "requestId": str(uuid.uuid4()),
"op": "authentication", "op": "authentication",
"processor": processor, "processor": processor,
"args": { "args": {
"sasl": auth_bytes "sasl": base64.b64encode(auth)
} }
} }
if session is None: if session is None:
......
...@@ -4,9 +4,10 @@ ...@@ -4,9 +4,10 @@
import asyncio import asyncio
import unittest import unittest
import uuid import uuid
import aiohttp
from aiogremlin import (submit, GremlinConnector, GremlinClient, from aiogremlin import (submit, GremlinConnector, GremlinClient,
GremlinClientSession, GremlinServerError) GremlinClientSession, GremlinServerError,
GremlinClientWebSocketResponse)
class SubmitTest(unittest.TestCase): class SubmitTest(unittest.TestCase):
...@@ -22,220 +23,242 @@ class SubmitTest(unittest.TestCase): ...@@ -22,220 +23,242 @@ class SubmitTest(unittest.TestCase):
@asyncio.coroutine @asyncio.coroutine
def go(): def go():
resp = yield from submit("4 + 4", url='https://localhost:8182/', resp = yield from submit("x + x", url='https://localhost:8182/',
bindings={"x": 4}, loop=self.loop, bindings={"x": 4}, loop=self.loop,
username="stephen", password="password") username="stephen", password="password")
results = yield from resp.get() results = yield from resp.get()
return results return results
results = self.loop.run_until_complete(go())
self.assertEqual(results[0].data[0], 8)
def test_rebinding(self):
@asyncio.coroutine
def go1():
result = yield from submit("graph2.addVertex()",
url='https://localhost:8182/',
loop=self.loop, username="stephen",
password="password")
resp = yield from result.get()
try:
self.loop.run_until_complete(go1())
error = False
except GremlinServerError:
error = True
self.assertTrue(error)
@asyncio.coroutine
def go2():
result = yield from submit(
"graph2.addVertex()", url='https://localhost:8182/',
rebindings={"graph2": "graph"}, loop=self.loop,
username="stephen", password="password")
resp = yield from result.get()
self.assertEqual(len(resp), 1)
try:
self.loop.run_until_complete(go2())
except GremlinServerError:
print("RELEASE DOES NOT SUPPORT REBINDINGS")
class GremlinClientTest(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)
connector = aiohttp.TCPConnector(force_close=False, loop=self.loop,
verify_ssl=False)
client_session = aiohttp.ClientSession(
connector=connector, loop=self.loop,
ws_response_class=GremlinClientWebSocketResponse)
self.gc = GremlinClient(url="https://localhost:8182/", loop=self.loop,
username="stephen", password="password",
client_session=client_session)
def tearDown(self):
self.loop.run_until_complete(self.gc.close())
self.loop.close()
def test_connection(self):
@asyncio.coroutine
def go():
ws = yield from self.gc._connector.ws_connect(self.gc.url)
self.assertFalse(ws.closed)
yield from ws.close()
self.loop.run_until_complete(go())
def test_execute(self):
@asyncio.coroutine
def go():
resp = yield from self.gc.execute("x + x", bindings={"x": 4})
return resp
results = self.loop.run_until_complete(go()) results = self.loop.run_until_complete(go())
self.assertEqual(results[0].data[0], 8) self.assertEqual(results[0].data[0], 8)
# def test_rebinding(self): def test_sub_waitfor(self):
# sub1 = self.gc.execute("x + x", bindings={"x": 1})
# @asyncio.coroutine sub2 = self.gc.execute("x + x", bindings={"x": 2})
# def go1(): sub3 = self.gc.execute("x + x", bindings={"x": 4})
# result = yield from submit("graph2.addVertex()", loop=self.loop) coro = asyncio.gather(*[asyncio.async(sub1, loop=self.loop),
# resp = yield from result.get() asyncio.async(sub2, loop=self.loop),
# asyncio.async(sub3, loop=self.loop)],
# try: loop=self.loop)
# self.loop.run_until_complete(go1()) # Here I am looking for resource warnings.
# error = False results = self.loop.run_until_complete(coro)
# except GremlinServerError: self.assertIsNotNone(results)
# error = True
# self.assertTrue(error) def test_resp_stream(self):
# @asyncio.coroutine
# @asyncio.coroutine def stream_coro():
# def go2(): results = []
# result = yield from submit( resp = yield from self.gc.submit("x + x", bindings={"x": 4})
# "graph2.addVertex()", rebindings={"graph2": "graph"}, while True:
# loop=self.loop) f = yield from resp.stream.read()
# resp = yield from result.get() if f is None:
# self.assertEqual(len(resp), 1) break
# results.append(f)
# try: self.assertEqual(results[0].data[0], 8)
# self.loop.run_until_complete(go2()) self.loop.run_until_complete(stream_coro())
# except GremlinServerError:
# print("RELEASE DOES NOT SUPPORT REBINDINGS") def test_execute_error(self):
# execute = self.gc.execute("x + x g.asdfas", bindings={"x": 4})
# try:
# class GremlinClientTest(unittest.TestCase): self.loop.run_until_complete(execute)
# error = False
# def setUp(self): except:
# self.loop = asyncio.new_event_loop() error = True
# asyncio.set_event_loop(None) self.assertTrue(error)
# self.gc = GremlinClient(url="http://localhost:8182/", loop=self.loop)
# def test_rebinding(self):
# def tearDown(self): execute = self.gc.execute("graph2.addVertex()")
# self.loop.run_until_complete(self.gc.close()) try:
# self.loop.close() self.loop.run_until_complete(execute)
# error = False
# def test_connection(self): except GremlinServerError:
# error = True
# @asyncio.coroutine self.assertTrue(error)
# def go():
# ws = yield from self.gc._connector.ws_connect(self.gc.url) @asyncio.coroutine
# self.assertFalse(ws.closed) def go():
# yield from ws.close() result = yield from self.gc.execute(
# "graph2.addVertex()", rebindings={"graph2": "graph"})
# self.loop.run_until_complete(go()) self.assertEqual(len(result), 1)
#
# def test_execute(self): try:
# self.loop.run_until_complete(go())
# @asyncio.coroutine except GremlinServerError:
# def go(): print("RELEASE DOES NOT SUPPORT REBINDINGS")
# resp = yield from self.gc.execute("x + x", bindings={"x": 4})
# return resp
# class GremlinClientSessionTest(unittest.TestCase):
# results = self.loop.run_until_complete(go())
# self.assertEqual(results[0].data[0], 8) def setUp(self):
# self.loop = asyncio.new_event_loop()
# def test_sub_waitfor(self): asyncio.set_event_loop(None)
# sub1 = self.gc.execute("x + x", bindings={"x": 1}) connector = aiohttp.TCPConnector(force_close=False, loop=self.loop,
# sub2 = self.gc.execute("x + x", bindings={"x": 2}) verify_ssl=False)
# sub3 = self.gc.execute("x + x", bindings={"x": 4})
# coro = asyncio.gather(*[asyncio.async(sub1, loop=self.loop), client_session = aiohttp.ClientSession(
# asyncio.async(sub2, loop=self.loop), connector=connector, loop=self.loop,
# asyncio.async(sub3, loop=self.loop)], ws_response_class=GremlinClientWebSocketResponse)
# loop=self.loop)
# # Here I am looking for resource warnings. self.gc = GremlinClientSession(url="https://localhost:8182/",
# results = self.loop.run_until_complete(coro) loop=self.loop,
# self.assertIsNotNone(results) username="stephen", password="password",
# client_session=client_session)
# def test_resp_stream(self):
# @asyncio.coroutine self.script1 = """v=graph.addVertex('name', 'Dave')"""
# def stream_coro():
# results = [] self.script2 = "v.property('name')"
# resp = yield from self.gc.submit("x + x", bindings={"x": 4})
# while True: def tearDown(self):
# f = yield from resp.stream.read() self.loop.run_until_complete(self.gc.close())
# if f is None: self.loop.close()
# break
# results.append(f) def test_session(self):
# self.assertEqual(results[0].data[0], 8)
# self.loop.run_until_complete(stream_coro()) @asyncio.coroutine
# def go():
# def test_execute_error(self): yield from self.gc.execute(self.script1)
# execute = self.gc.execute("x + x g.asdfas", bindings={"x": 4}) results = yield from self.gc.execute(self.script2)
# try: return results
# self.loop.run_until_complete(execute)
# error = False results = self.loop.run_until_complete(go())
# except: self.assertEqual(results[0].data[0]['value'], 'Dave')
# error = True
# self.assertTrue(error) # def test_session_reset(self):
# #
# def test_rebinding(self): # @asyncio.coroutine
# execute = self.gc.execute("graph2.addVertex()") # def go():
# try: # yield from self.gc.execute(self.script1)
# self.loop.run_until_complete(execute) # self.gc.reset_session()
# error = False # results = yield from self.gc.execute(self.script2)
# except GremlinServerError: # return results
# error = True # try:
# self.assertTrue(error) # results = self.loop.run_until_complete(go())
# # error = False
# @asyncio.coroutine # except GremlinServerError:
# def go(): # error = True
# result = yield from self.gc.execute( # self.assertTrue(error)
# "graph2.addVertex()", rebindings={"graph2": "graph"}) #
# self.assertEqual(len(result), 1) # def test_session_manual_reset(self):
# #
# try: # @asyncio.coroutine
# self.loop.run_until_complete(go()) # def go():
# except GremlinServerError: # yield from self.gc.execute(self.script1)
# print("RELEASE DOES NOT SUPPORT REBINDINGS") # new_sess = str(uuid.uuid4())
# # sess = self.gc.reset_session(session=new_sess)
# # self.assertEqual(sess, new_sess)
# class GremlinClientSessionTest(unittest.TestCase): # self.assertEqual(self.gc.session, new_sess)
# # results = yield from self.gc.execute(self.script2)
# def setUp(self): # return results
# self.loop = asyncio.new_event_loop() # try:
# asyncio.set_event_loop(None) # results = self.loop.run_until_complete(go())
# self.gc = GremlinClientSession(url="http://localhost:8182/", # error = False
# loop=self.loop) # except GremlinServerError:
# self.script1 = """v = graph.addVertex('name', 'Dave')""" # error = True
# # self.assertTrue(error)
# self.script2 = "v.property('name')" #
# # def test_session_set(self):
# def tearDown(self): #
# self.loop.run_until_complete(self.gc.close()) # @asyncio.coroutine
# self.loop.close() # def go():
# # yield from self.gc.execute(self.script1)
# def test_session(self): # new_sess = str(uuid.uuid4())
# # self.gc.session = new_sess
# @asyncio.coroutine # self.assertEqual(self.gc.session, new_sess)
# def go(): # results = yield from self.gc.execute(self.script2)
# yield from self.gc.execute(self.script1) # return results
# results = yield from self.gc.execute(self.script2) # try:
# return results # results = self.loop.run_until_complete(go())
# # error = False
# results = self.loop.run_until_complete(go()) # except GremlinServerError:
# self.assertEqual(results[0].data[0]['value'], 'Dave') # error = True
# # self.assertTrue(error)
# def test_session_reset(self): #
# # def test_resp_session(self):
# @asyncio.coroutine #
# def go(): # @asyncio.coroutine
# yield from self.gc.execute(self.script1) # def go():
# self.gc.reset_session() # session = str(uuid.uuid4())
# results = yield from self.gc.execute(self.script2) # self.gc.session = session
# return results # resp = yield from self.gc.submit("x + x", bindings={"x": 4})
# try: # while True:
# results = self.loop.run_until_complete(go()) # f = yield from resp.stream.read()
# error = False # if f is None:
# except GremlinServerError: # break
# error = True # self.assertEqual(resp.session, session)
# self.assertTrue(error) #
# # self.loop.run_until_complete(go())
# def test_session_manual_reset(self): #
#
# @asyncio.coroutine
# def go():
# yield from self.gc.execute(self.script1)
# new_sess = str(uuid.uuid4())
# sess = self.gc.reset_session(session=new_sess)
# self.assertEqual(sess, new_sess)
# self.assertEqual(self.gc.session, new_sess)
# results = yield from self.gc.execute(self.script2)
# return results
# try:
# results = self.loop.run_until_complete(go())
# error = False
# except GremlinServerError:
# error = True
# self.assertTrue(error)
#
# def test_session_set(self):
#
# @asyncio.coroutine
# def go():
# yield from self.gc.execute(self.script1)
# new_sess = str(uuid.uuid4())
# self.gc.session = new_sess
# self.assertEqual(self.gc.session, new_sess)
# results = yield from self.gc.execute(self.script2)
# return results
# try:
# results = self.loop.run_until_complete(go())
# error = False
# except GremlinServerError:
# error = True
# self.assertTrue(error)
#
# def test_resp_session(self):
#
# @asyncio.coroutine
# def go():
# session = str(uuid.uuid4())
# self.gc.session = session
# resp = yield from self.gc.submit("x + x", bindings={"x": 4})
# while True:
# f = yield from resp.stream.read()
# if f is None:
# break
# self.assertEqual(resp.session, session)
#
# self.loop.run_until_complete(go())
#
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment