From ff2ea9b773f93a19a5f4df8c6349a8ba8af3aa46 Mon Sep 17 00:00:00 2001 From: davebshow <davebshow@gmail.com> Date: Fri, 18 Sep 2015 15:33:02 -0400 Subject: [PATCH] working on new auth. broken. --- aiogremlin/client.py | 37 +++- aiogremlin/exceptions.py | 2 + aiogremlin/subprotocol.py | 45 ++++- requirements.txt | 1 - tests/tests.py | 415 +++++++++++++++++++------------------- 5 files changed, 271 insertions(+), 229 deletions(-) diff --git a/aiogremlin/client.py b/aiogremlin/client.py index 079403d..429c5c1 100644 --- a/aiogremlin/client.py +++ b/aiogremlin/client.py @@ -28,15 +28,14 @@ class GremlinClient: :param str processor: Gremlin Server processor argument. "" by default. :param float timeout: timeout for establishing connection (optional). Values ``0`` or ``None`` mean no timeout - :param connector: A class that implements the method ``ws_connect``. + :param ws_connector: A class that implements the method ``ws_connect``. Usually an instance of ``aiogremlin.connector.GremlinConnector`` """ def __init__(self, *, url='http://localhost:8182/', loop=None, lang="gremlin-groovy", op="eval", processor="", - timeout=None, ws_connector=None, connector=None): - """ - """ + timeout=None, ws_connector=None, connector=None, + username="", password=""): self._lang = lang self._op = op self._processor = processor @@ -45,6 +44,8 @@ class GremlinClient: self._session = None self._url = url self._timeout = timeout + self._username = username + self._password = password if connector is None: connector = aiohttp.TCPConnector(verify_ssl=False, loop=self._loop) client_session = aiohttp.ClientSession(connector=connector, @@ -146,12 +147,14 @@ class GremlinClient: writer = GremlinWriter(ws) - ws = writer.write(gremlin, bindings=bindings, lang=lang, + ws = writer.write(gremlin=gremlin, bindings=bindings, lang=lang, rebindings=rebindings, op=op, processor=processor, binary=binary, session=session) - return GremlinResponse(ws, session=session, loop=self._loop) + return GremlinResponse(ws, username=self._username, + password=self._password, session=session, + loop=self._loop) @asyncio.coroutine def execute(self, gremlin, *, bindings=None, lang=None, rebindings=None, @@ -252,10 +255,12 @@ class GremlinResponse: `asyncio.get_event_loop` is used for getting default event loop (optional) """ - def __init__(self, ws, *, session=None, loop=None): + def __init__(self, ws, *, session=None, loop=None, username="", + password=""): self._loop = loop or asyncio.get_event_loop() self._session = session - self._stream = GremlinResponseStream(ws, loop=self._loop) + self._stream = GremlinResponseStream(ws, username, password, + loop=self._loop) @property def stream(self): @@ -282,6 +287,7 @@ class GremlinResponse: @asyncio.coroutine def _run(self): + import ipdb; ipdb.set_trace() results = [] while True: message = yield from self._stream.read() @@ -302,8 +308,10 @@ class GremlinResponseStream: `asyncio.get_event_loop` is used for getting default event loop (optional) """ - def __init__(self, ws, loop=None): + def __init__(self, ws, username, password, loop=None): self._ws = ws + self._username = username + self._password = password self._loop = loop or asyncio.get_event_loop() data_stream = aiohttp.DataQueue(loop=self._loop) self._stream = self._ws.parser.set_parser(gremlin_response_parser, @@ -325,6 +333,10 @@ class GremlinResponseStream: asyncio.Task(self._ws.receive(), loop=self._loop) try: message = yield from self._stream.read() + if message.status_code == 407: + writer = GremlinWriter(self._ws) + writer.write(op="authentication", username=self._username, + password=self._password) except (RequestError, GremlinServerError): yield from self._ws.release() raise @@ -341,7 +353,9 @@ def submit(gremlin, *, processor="", timeout=None, session=None, - loop=None): + loop=None, + username="", + password=""): """ :ref:`coroutine<coroutine>` @@ -377,7 +391,8 @@ def submit(gremlin, *, ws_response_class=GremlinClientWebSocketResponse) gremlin_client = GremlinClient(url=url, loop=loop, - ws_connector=client_session) + ws_connector=client_session, + username=username, password=password) try: resp = yield from gremlin_client.submit( diff --git a/aiogremlin/exceptions.py b/aiogremlin/exceptions.py index 6f30935..890932a 100644 --- a/aiogremlin/exceptions.py +++ b/aiogremlin/exceptions.py @@ -10,6 +10,8 @@ class StatusException(IOError): """ self.value = value self.response = { + 401: ("UNAUTHORIZED", ("The request attempted to access resources " + + "that the requesting user did not have access to")), 498: ("MALFORMED_REQUEST", ("The request message was not properly formatted which " + "means it could not be parsed at all or the 'op' code " + diff --git a/aiogremlin/subprotocol.py b/aiogremlin/subprotocol.py index e4feedd..3bfb175 100644 --- a/aiogremlin/subprotocol.py +++ b/aiogremlin/subprotocol.py @@ -29,7 +29,7 @@ def gremlin_response_parser(out, buf): if message.status_code == 200: out.feed_data(message) out.feed_eof() - elif message.status_code == 206: + elif message.status_code == 206 or message.status_code == 407: out.feed_data(message) elif message.status_code == 204: out.feed_data(message) @@ -46,22 +46,28 @@ class GremlinWriter: def __init__(self, ws): self.ws = ws - def write(self, gremlin, bindings=None, lang="gremlin-groovy", + def write(self, *, gremlin="", bindings=None, lang="gremlin-groovy", rebindings=None, op="eval", processor="", session=None, - binary=True, mime_type="application/json"): + binary=True, mime_type="application/json", username="", + password=""): if rebindings is None: rebindings = {} - message = self._prepare_message(gremlin, - bindings, - lang, - rebindings, - op, - processor, - session) + if op == "eval": + message = self._prepare_message(gremlin, + bindings, + lang, + rebindings, + op, + processor, + session) + + if op == "authentication": + message = self._authenticate(username, password, session, processor) message = json.dumps(message) if binary: message = self._set_message_header(message, mime_type) self.ws.send(message, binary=binary) + print(message) return self.ws @staticmethod @@ -93,3 +99,22 @@ class GremlinWriter: else: message["args"].update({"session": session}) return message + + @staticmethod + def _authenticate(username, password, session, processor): + auth_bytes = "".join(["0", username, "0", password]) + print(auth_bytes) + message = { + "requestId": str(uuid.uuid4()), + "op": "authentication", + "processor": processor, + "args": { + "sasl": auth_bytes + } + } + if session is None: + if processor == "session": + raise RuntimeError("session processor requires a session id") + else: + message["args"].update({"session": session}) + return message diff --git a/requirements.txt b/requirements.txt index 7a8c54e..1d19e2f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ -aiogremlin==0.0.11 aiohttp==0.16.5 aiowebsocketclient==0.0.3 argparse==1.2.1 diff --git a/tests/tests.py b/tests/tests.py index a756068..fbff3c8 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -22,219 +22,220 @@ class SubmitTest(unittest.TestCase): @asyncio.coroutine def go(): - resp = yield from submit("4 + 4", bindings={"x": 4}, - loop=self.loop) + resp = yield from submit("4 + 4", url='https://localhost:8182/', + bindings={"x": 4}, loop=self.loop, + username="stephen", password="password") results = yield from resp.get() return results results = self.loop.run_until_complete(go()) self.assertEqual(results[0].data[0], 8) - def test_rebinding(self): - - @asyncio.coroutine - def go1(): - result = yield from submit("graph2.addVertex()", loop=self.loop) - resp = yield from result.get() - - try: - self.loop.run_until_complete(go1()) - error = False - except GremlinServerError: - error = True - self.assertTrue(error) - - @asyncio.coroutine - def go2(): - result = yield from submit( - "graph2.addVertex()", rebindings={"graph2": "graph"}, - loop=self.loop) - resp = yield from result.get() - self.assertEqual(len(resp), 1) - - try: - self.loop.run_until_complete(go2()) - except GremlinServerError: - print("RELEASE DOES NOT SUPPORT REBINDINGS") - - -class GremlinClientTest(unittest.TestCase): - - def setUp(self): - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(None) - self.gc = GremlinClient(url="ws://localhost:8182/", loop=self.loop) - - def tearDown(self): - self.loop.run_until_complete(self.gc.close()) - self.loop.close() - - def test_connection(self): - - @asyncio.coroutine - def go(): - ws = yield from self.gc._connector.ws_connect(self.gc.url) - self.assertFalse(ws.closed) - yield from ws.close() - - self.loop.run_until_complete(go()) - - def test_execute(self): - - @asyncio.coroutine - def go(): - resp = yield from self.gc.execute("x + x", bindings={"x": 4}) - return resp - - results = self.loop.run_until_complete(go()) - self.assertEqual(results[0].data[0], 8) - - def test_sub_waitfor(self): - sub1 = self.gc.execute("x + x", bindings={"x": 1}) - sub2 = self.gc.execute("x + x", bindings={"x": 2}) - sub3 = self.gc.execute("x + x", bindings={"x": 4}) - coro = asyncio.gather(*[asyncio.async(sub1, loop=self.loop), - asyncio.async(sub2, loop=self.loop), - asyncio.async(sub3, loop=self.loop)], - loop=self.loop) - # Here I am looking for resource warnings. - results = self.loop.run_until_complete(coro) - self.assertIsNotNone(results) - - def test_resp_stream(self): - @asyncio.coroutine - def stream_coro(): - results = [] - resp = yield from self.gc.submit("x + x", bindings={"x": 4}) - while True: - f = yield from resp.stream.read() - if f is None: - break - results.append(f) - self.assertEqual(results[0].data[0], 8) - self.loop.run_until_complete(stream_coro()) - - def test_execute_error(self): - execute = self.gc.execute("x + x g.asdfas", bindings={"x": 4}) - try: - self.loop.run_until_complete(execute) - error = False - except: - error = True - self.assertTrue(error) - - def test_rebinding(self): - execute = self.gc.execute("graph2.addVertex()") - try: - self.loop.run_until_complete(execute) - error = False - except GremlinServerError: - error = True - self.assertTrue(error) - - @asyncio.coroutine - def go(): - result = yield from self.gc.execute( - "graph2.addVertex()", rebindings={"graph2": "graph"}) - self.assertEqual(len(result), 1) - - try: - self.loop.run_until_complete(go()) - except GremlinServerError: - print("RELEASE DOES NOT SUPPORT REBINDINGS") - - -class GremlinClientSessionTest(unittest.TestCase): - - def setUp(self): - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(None) - self.gc = GremlinClientSession(url="ws://localhost:8182/", - loop=self.loop) - self.script1 = """v = graph.addVertex('name', 'Dave')""" - - self.script2 = "v.property('name')" - - def tearDown(self): - self.loop.run_until_complete(self.gc.close()) - self.loop.close() - - def test_session(self): - - @asyncio.coroutine - def go(): - yield from self.gc.execute(self.script1) - results = yield from self.gc.execute(self.script2) - return results - - results = self.loop.run_until_complete(go()) - self.assertEqual(results[0].data[0]['value'], 'Dave') - - def test_session_reset(self): - - @asyncio.coroutine - def go(): - yield from self.gc.execute(self.script1) - self.gc.reset_session() - results = yield from self.gc.execute(self.script2) - return results - try: - results = self.loop.run_until_complete(go()) - error = False - except GremlinServerError: - error = True - self.assertTrue(error) - - def test_session_manual_reset(self): - - @asyncio.coroutine - def go(): - yield from self.gc.execute(self.script1) - new_sess = str(uuid.uuid4()) - sess = self.gc.reset_session(session=new_sess) - self.assertEqual(sess, new_sess) - self.assertEqual(self.gc.session, new_sess) - results = yield from self.gc.execute(self.script2) - return results - try: - results = self.loop.run_until_complete(go()) - error = False - except GremlinServerError: - error = True - self.assertTrue(error) - - def test_session_set(self): - - @asyncio.coroutine - def go(): - yield from self.gc.execute(self.script1) - new_sess = str(uuid.uuid4()) - self.gc.session = new_sess - self.assertEqual(self.gc.session, new_sess) - results = yield from self.gc.execute(self.script2) - return results - try: - results = self.loop.run_until_complete(go()) - error = False - except GremlinServerError: - error = True - self.assertTrue(error) - - def test_resp_session(self): - - @asyncio.coroutine - def go(): - session = str(uuid.uuid4()) - self.gc.session = session - resp = yield from self.gc.submit("x + x", bindings={"x": 4}) - while True: - f = yield from resp.stream.read() - if f is None: - break - self.assertEqual(resp.session, session) - - self.loop.run_until_complete(go()) - +# def test_rebinding(self): +# +# @asyncio.coroutine +# def go1(): +# result = yield from submit("graph2.addVertex()", loop=self.loop) +# resp = yield from result.get() +# +# try: +# self.loop.run_until_complete(go1()) +# error = False +# except GremlinServerError: +# error = True +# self.assertTrue(error) +# +# @asyncio.coroutine +# def go2(): +# result = yield from submit( +# "graph2.addVertex()", rebindings={"graph2": "graph"}, +# loop=self.loop) +# resp = yield from result.get() +# self.assertEqual(len(resp), 1) +# +# try: +# self.loop.run_until_complete(go2()) +# except GremlinServerError: +# print("RELEASE DOES NOT SUPPORT REBINDINGS") +# +# +# class GremlinClientTest(unittest.TestCase): +# +# def setUp(self): +# self.loop = asyncio.new_event_loop() +# asyncio.set_event_loop(None) +# self.gc = GremlinClient(url="http://localhost:8182/", loop=self.loop) +# +# def tearDown(self): +# self.loop.run_until_complete(self.gc.close()) +# self.loop.close() +# +# def test_connection(self): +# +# @asyncio.coroutine +# def go(): +# ws = yield from self.gc._connector.ws_connect(self.gc.url) +# self.assertFalse(ws.closed) +# yield from ws.close() +# +# self.loop.run_until_complete(go()) +# +# def test_execute(self): +# +# @asyncio.coroutine +# def go(): +# resp = yield from self.gc.execute("x + x", bindings={"x": 4}) +# return resp +# +# results = self.loop.run_until_complete(go()) +# self.assertEqual(results[0].data[0], 8) +# +# def test_sub_waitfor(self): +# sub1 = self.gc.execute("x + x", bindings={"x": 1}) +# sub2 = self.gc.execute("x + x", bindings={"x": 2}) +# sub3 = self.gc.execute("x + x", bindings={"x": 4}) +# coro = asyncio.gather(*[asyncio.async(sub1, loop=self.loop), +# asyncio.async(sub2, loop=self.loop), +# asyncio.async(sub3, loop=self.loop)], +# loop=self.loop) +# # Here I am looking for resource warnings. +# results = self.loop.run_until_complete(coro) +# self.assertIsNotNone(results) +# +# def test_resp_stream(self): +# @asyncio.coroutine +# def stream_coro(): +# results = [] +# resp = yield from self.gc.submit("x + x", bindings={"x": 4}) +# while True: +# f = yield from resp.stream.read() +# if f is None: +# break +# results.append(f) +# self.assertEqual(results[0].data[0], 8) +# self.loop.run_until_complete(stream_coro()) +# +# def test_execute_error(self): +# execute = self.gc.execute("x + x g.asdfas", bindings={"x": 4}) +# try: +# self.loop.run_until_complete(execute) +# error = False +# except: +# error = True +# self.assertTrue(error) +# +# def test_rebinding(self): +# execute = self.gc.execute("graph2.addVertex()") +# try: +# self.loop.run_until_complete(execute) +# error = False +# except GremlinServerError: +# error = True +# self.assertTrue(error) +# +# @asyncio.coroutine +# def go(): +# result = yield from self.gc.execute( +# "graph2.addVertex()", rebindings={"graph2": "graph"}) +# self.assertEqual(len(result), 1) +# +# try: +# self.loop.run_until_complete(go()) +# except GremlinServerError: +# print("RELEASE DOES NOT SUPPORT REBINDINGS") +# +# +# class GremlinClientSessionTest(unittest.TestCase): +# +# def setUp(self): +# self.loop = asyncio.new_event_loop() +# asyncio.set_event_loop(None) +# self.gc = GremlinClientSession(url="http://localhost:8182/", +# loop=self.loop) +# self.script1 = """v = graph.addVertex('name', 'Dave')""" +# +# self.script2 = "v.property('name')" +# +# def tearDown(self): +# self.loop.run_until_complete(self.gc.close()) +# self.loop.close() +# +# def test_session(self): +# +# @asyncio.coroutine +# def go(): +# yield from self.gc.execute(self.script1) +# results = yield from self.gc.execute(self.script2) +# return results +# +# results = self.loop.run_until_complete(go()) +# self.assertEqual(results[0].data[0]['value'], 'Dave') +# +# def test_session_reset(self): +# +# @asyncio.coroutine +# def go(): +# yield from self.gc.execute(self.script1) +# self.gc.reset_session() +# results = yield from self.gc.execute(self.script2) +# return results +# try: +# results = self.loop.run_until_complete(go()) +# error = False +# except GremlinServerError: +# error = True +# self.assertTrue(error) +# +# def test_session_manual_reset(self): +# +# @asyncio.coroutine +# def go(): +# yield from self.gc.execute(self.script1) +# new_sess = str(uuid.uuid4()) +# sess = self.gc.reset_session(session=new_sess) +# self.assertEqual(sess, new_sess) +# self.assertEqual(self.gc.session, new_sess) +# results = yield from self.gc.execute(self.script2) +# return results +# try: +# results = self.loop.run_until_complete(go()) +# error = False +# except GremlinServerError: +# error = True +# self.assertTrue(error) +# +# def test_session_set(self): +# +# @asyncio.coroutine +# def go(): +# yield from self.gc.execute(self.script1) +# new_sess = str(uuid.uuid4()) +# self.gc.session = new_sess +# self.assertEqual(self.gc.session, new_sess) +# results = yield from self.gc.execute(self.script2) +# return results +# try: +# results = self.loop.run_until_complete(go()) +# error = False +# except GremlinServerError: +# error = True +# self.assertTrue(error) +# +# def test_resp_session(self): +# +# @asyncio.coroutine +# def go(): +# session = str(uuid.uuid4()) +# self.gc.session = session +# resp = yield from self.gc.submit("x + x", bindings={"x": 4}) +# while True: +# f = yield from resp.stream.read() +# if f is None: +# break +# self.assertEqual(resp.session, session) +# +# self.loop.run_until_complete(go()) +# if __name__ == "__main__": unittest.main() -- GitLab