diff --git a/tests/tests.py b/tests/tests.py index 1c8f5e3a413629c2233037c51514d333b1050992..f9cf0959168b5cbda47632b4a1cef1087226e31e 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -8,253 +8,253 @@ import uuid from aiogremlin import (GremlinClient, RequestError, GremlinServerError, SocketClientError, WebSocketPool, AiohttpFactory, create_client, GremlinWriter, GremlinResponse, WebSocketSession) -# -# -# class GremlinClientTests(unittest.TestCase): -# -# def setUp(self): -# self.loop = asyncio.new_event_loop() -# asyncio.set_event_loop(None) -# self.gc = GremlinClient("ws://localhost:8182/", loop=self.loop) -# -# def tearDown(self): -# self.loop.run_until_complete(self.gc.close()) -# self.loop.close() -# -# def test_connection(self): -# @asyncio.coroutine -# def conn_coro(): -# conn = yield from self.gc._acquire() -# self.assertFalse(conn.closed) -# return conn -# conn = self.loop.run_until_complete(conn_coro()) -# # Clean up the resource. -# self.loop.run_until_complete(conn.close()) -# -# def test_sub(self): -# execute = self.gc.execute("x + x", bindings={"x": 4}) -# results = self.loop.run_until_complete(execute) -# self.assertEqual(results[0].data[0], 8) -# -# -# class GremlinClientPoolTests(unittest.TestCase): -# -# def setUp(self): -# self.loop = asyncio.new_event_loop() -# asyncio.set_event_loop(None) -# self.gc = GremlinClient("ws://localhost:8182/", -# factory=AiohttpFactory(), pool=WebSocketPool("ws://localhost:8182/", -# loop=self.loop), -# loop=self.loop) -# -# def tearDown(self): -# self.loop.run_until_complete(self.gc.close()) -# self.loop.close() -# -# def test_connection(self): -# @asyncio.coroutine -# def conn_coro(): -# conn = yield from self.gc._acquire() -# self.assertFalse(conn.closed) -# return conn -# conn = self.loop.run_until_complete(conn_coro()) -# # Clean up the resource. -# self.loop.run_until_complete(conn.close()) -# -# def test_sub(self): -# execute = self.gc.execute("x + x", bindings={"x": 4}) -# results = self.loop.run_until_complete(execute) -# self.assertEqual(results[0].data[0], 8) -# -# def test_sub_waitfor(self): -# sub1 = self.gc.execute("x + x", bindings={"x": 1}) -# sub2 = self.gc.execute("x + x", bindings={"x": 2}) -# sub3 = self.gc.execute("x + x", bindings={"x": 4}) -# coro = asyncio.gather(*[asyncio.async(sub1, loop=self.loop), -# asyncio.async(sub2, loop=self.loop), -# asyncio.async(sub3, loop=self.loop)], loop=self.loop) -# # Here I am looking for resource warnings. -# results = self.loop.run_until_complete(coro) -# self.assertIsNotNone(results) -# -# def test_resp_stream(self): -# @asyncio.coroutine -# def stream_coro(): -# results = [] -# resp = yield from self.gc.submit("x + x", bindings={"x": 4}) -# while True: -# f = yield from resp.stream.read() -# if f is None: -# break -# results.append(f) -# self.assertEqual(results[0].data[0], 8) -# self.loop.run_until_complete(stream_coro()) -# -# def test_resp_get(self): -# @asyncio.coroutine -# def get_coro(): -# conn = yield from self.gc.submit("x + x", bindings={"x": 4}) -# results = yield from conn.get() -# self.assertEqual(results[0].data[0], 8) -# self.loop.run_until_complete(get_coro()) -# -# def test_execute_error(self): -# execute = self.gc.execute("x + x g.asdfas", bindings={"x": 4}) -# try: -# self.loop.run_until_complete(execute) -# error = False -# except: -# error = True -# self.assertTrue(error) -# -# def test_session_gen(self): -# execute = self.gc.execute("x + x", processor="session", bindings={"x": 4}) -# results = self.loop.run_until_complete(execute) -# self.assertEqual(results[0].data[0], 8) -# -# def test_session(self): -# @asyncio.coroutine -# def stream_coro(): -# session = str(uuid.uuid4()) -# resp = yield from self.gc.submit("x + x", bindings={"x": 4}, -# session=session) -# while True: -# f = yield from resp.stream.read() -# if f is None: -# break -# self.assertEqual(resp.session, session) -# self.loop.run_until_complete(stream_coro()) -# -# -# class WebSocketPoolTests(unittest.TestCase): -# -# def setUp(self): -# self.loop = asyncio.new_event_loop() -# asyncio.set_event_loop(None) -# self.pool = WebSocketPool(poolsize=2, timeout=1, loop=self.loop, -# factory=AiohttpFactory()) -# -# def tearDown(self): -# self.loop.run_until_complete(self.pool.close()) -# self.loop.close() -# -# def test_connect(self): -# -# @asyncio.coroutine -# def conn(): -# conn = yield from self.pool.acquire() -# self.assertFalse(conn.closed) -# self.pool.release(conn) -# self.assertEqual(self.pool.num_active_conns, 0) -# -# self.loop.run_until_complete(conn()) -# -# def test_multi_connect(self): -# -# @asyncio.coroutine -# def conn(): -# conn1 = yield from self.pool.acquire() -# conn2 = yield from self.pool.acquire() -# self.assertFalse(conn1.closed) -# self.assertFalse(conn2.closed) -# self.pool.release(conn1) -# self.assertEqual(self.pool.num_active_conns, 1) -# self.pool.release(conn2) -# self.assertEqual(self.pool.num_active_conns, 0) -# -# self.loop.run_until_complete(conn()) -# -# def test_timeout(self): -# -# @asyncio.coroutine -# def conn(): -# conn1 = yield from self.pool.acquire() -# conn2 = yield from self.pool.acquire() -# try: -# conn3 = yield from self.pool.acquire() -# timeout = False -# except asyncio.TimeoutError: -# timeout = True -# self.assertTrue(timeout) -# -# self.loop.run_until_complete(conn()) -# -# def test_socket_reuse(self): -# -# @asyncio.coroutine -# def conn(): -# conn1 = yield from self.pool.acquire() -# conn2 = yield from self.pool.acquire() -# try: -# conn3 = yield from self.pool.acquire() -# timeout = False -# except asyncio.TimeoutError: -# timeout = True -# self.assertTrue(timeout) -# self.pool.release(conn2) -# conn3 = yield from self.pool.acquire() -# self.assertFalse(conn1.closed) -# self.assertFalse(conn3.closed) -# self.assertEqual(conn2, conn3) -# -# self.loop.run_until_complete(conn()) -# -# def test_socket_repare(self): -# -# @asyncio.coroutine -# def conn(): -# conn1 = yield from self.pool.acquire() -# conn2 = yield from self.pool.acquire() -# self.assertFalse(conn1.closed) -# self.assertFalse(conn2.closed) -# yield from conn1.close() -# yield from conn2.close() -# self.assertTrue(conn2.closed) -# self.assertTrue(conn2.closed) -# self.pool.release(conn1) -# self.pool.release(conn2) -# conn1 = yield from self.pool.acquire() -# conn2 = yield from self.pool.acquire() -# self.assertFalse(conn1.closed) -# self.assertFalse(conn2.closed) -# -# self.loop.run_until_complete(conn()) -# -# class ContextMngrTest(unittest.TestCase): -# -# def setUp(self): -# self.loop = asyncio.new_event_loop() -# asyncio.set_event_loop(None) -# self.pool = WebSocketPool(poolsize=1, loop=self.loop, -# factory=AiohttpFactory, max_retries=0) -# -# def tearDown(self): -# self.loop.run_until_complete(self.pool.close()) -# self.loop.close() -# -# def test_connection_manager(self): -# results = [] -# @asyncio.coroutine -# def go(): -# with (yield from self.pool) as conn: -# writer = GremlinWriter(conn) -# conn = writer.write("1 + 1") -# resp = GremlinResponse(conn, self.pool, loop=self.loop) -# while True: -# mssg = yield from resp.stream.read() -# if mssg is None: -# break -# results.append(mssg) -# conn = self.pool._pool.get_nowait() -# self.assertTrue(conn.closed) -# writer = GremlinWriter(conn) -# try: -# conn = yield from writer.write("1 + 1") -# error = False -# except RuntimeError: -# error = True -# self.assertTrue(error) -# self.loop.run_until_complete(go()) + + +class GremlinClientTests(unittest.TestCase): + + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) + self.gc = GremlinClient("ws://localhost:8182/", loop=self.loop) + + def tearDown(self): + self.loop.run_until_complete(self.gc.close()) + self.loop.close() + + def test_connection(self): + @asyncio.coroutine + def conn_coro(): + conn = yield from self.gc._acquire() + self.assertFalse(conn.closed) + return conn + conn = self.loop.run_until_complete(conn_coro()) + # Clean up the resource. + self.loop.run_until_complete(conn.close()) + + def test_sub(self): + execute = self.gc.execute("x + x", bindings={"x": 4}) + results = self.loop.run_until_complete(execute) + self.assertEqual(results[0].data[0], 8) + + +class GremlinClientPoolTests(unittest.TestCase): + + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) + self.gc = GremlinClient("ws://localhost:8182/", + factory=AiohttpFactory(), pool=WebSocketPool("ws://localhost:8182/", + loop=self.loop), + loop=self.loop) + + def tearDown(self): + self.loop.run_until_complete(self.gc.close()) + self.loop.close() + + def test_connection(self): + @asyncio.coroutine + def conn_coro(): + conn = yield from self.gc._acquire() + self.assertFalse(conn.closed) + return conn + conn = self.loop.run_until_complete(conn_coro()) + # Clean up the resource. + self.loop.run_until_complete(conn.close()) + + def test_sub(self): + execute = self.gc.execute("x + x", bindings={"x": 4}) + results = self.loop.run_until_complete(execute) + self.assertEqual(results[0].data[0], 8) + + def test_sub_waitfor(self): + sub1 = self.gc.execute("x + x", bindings={"x": 1}) + sub2 = self.gc.execute("x + x", bindings={"x": 2}) + sub3 = self.gc.execute("x + x", bindings={"x": 4}) + coro = asyncio.gather(*[asyncio.async(sub1, loop=self.loop), + asyncio.async(sub2, loop=self.loop), + asyncio.async(sub3, loop=self.loop)], loop=self.loop) + # Here I am looking for resource warnings. + results = self.loop.run_until_complete(coro) + self.assertIsNotNone(results) + + def test_resp_stream(self): + @asyncio.coroutine + def stream_coro(): + results = [] + resp = yield from self.gc.submit("x + x", bindings={"x": 4}) + while True: + f = yield from resp.stream.read() + if f is None: + break + results.append(f) + self.assertEqual(results[0].data[0], 8) + self.loop.run_until_complete(stream_coro()) + + def test_resp_get(self): + @asyncio.coroutine + def get_coro(): + conn = yield from self.gc.submit("x + x", bindings={"x": 4}) + results = yield from conn.get() + self.assertEqual(results[0].data[0], 8) + self.loop.run_until_complete(get_coro()) + + def test_execute_error(self): + execute = self.gc.execute("x + x g.asdfas", bindings={"x": 4}) + try: + self.loop.run_until_complete(execute) + error = False + except: + error = True + self.assertTrue(error) + + def test_session_gen(self): + execute = self.gc.execute("x + x", processor="session", bindings={"x": 4}) + results = self.loop.run_until_complete(execute) + self.assertEqual(results[0].data[0], 8) + + def test_session(self): + @asyncio.coroutine + def stream_coro(): + session = str(uuid.uuid4()) + resp = yield from self.gc.submit("x + x", bindings={"x": 4}, + session=session) + while True: + f = yield from resp.stream.read() + if f is None: + break + self.assertEqual(resp.session, session) + self.loop.run_until_complete(stream_coro()) + + +class WebSocketPoolTests(unittest.TestCase): + + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) + self.pool = WebSocketPool(poolsize=2, timeout=1, loop=self.loop, + factory=AiohttpFactory()) + + def tearDown(self): + self.loop.run_until_complete(self.pool.close()) + self.loop.close() + + def test_connect(self): + + @asyncio.coroutine + def conn(): + conn = yield from self.pool.acquire() + self.assertFalse(conn.closed) + self.pool.release(conn) + self.assertEqual(self.pool.num_active_conns, 0) + + self.loop.run_until_complete(conn()) + + def test_multi_connect(self): + + @asyncio.coroutine + def conn(): + conn1 = yield from self.pool.acquire() + conn2 = yield from self.pool.acquire() + self.assertFalse(conn1.closed) + self.assertFalse(conn2.closed) + self.pool.release(conn1) + self.assertEqual(self.pool.num_active_conns, 1) + self.pool.release(conn2) + self.assertEqual(self.pool.num_active_conns, 0) + + self.loop.run_until_complete(conn()) + + def test_timeout(self): + + @asyncio.coroutine + def conn(): + conn1 = yield from self.pool.acquire() + conn2 = yield from self.pool.acquire() + try: + conn3 = yield from self.pool.acquire() + timeout = False + except asyncio.TimeoutError: + timeout = True + self.assertTrue(timeout) + + self.loop.run_until_complete(conn()) + + def test_socket_reuse(self): + + @asyncio.coroutine + def conn(): + conn1 = yield from self.pool.acquire() + conn2 = yield from self.pool.acquire() + try: + conn3 = yield from self.pool.acquire() + timeout = False + except asyncio.TimeoutError: + timeout = True + self.assertTrue(timeout) + self.pool.release(conn2) + conn3 = yield from self.pool.acquire() + self.assertFalse(conn1.closed) + self.assertFalse(conn3.closed) + self.assertEqual(conn2, conn3) + + self.loop.run_until_complete(conn()) + + def test_socket_repare(self): + + @asyncio.coroutine + def conn(): + conn1 = yield from self.pool.acquire() + conn2 = yield from self.pool.acquire() + self.assertFalse(conn1.closed) + self.assertFalse(conn2.closed) + yield from conn1.close() + yield from conn2.close() + self.assertTrue(conn2.closed) + self.assertTrue(conn2.closed) + self.pool.release(conn1) + self.pool.release(conn2) + conn1 = yield from self.pool.acquire() + conn2 = yield from self.pool.acquire() + self.assertFalse(conn1.closed) + self.assertFalse(conn2.closed) + + self.loop.run_until_complete(conn()) + +class ContextMngrTest(unittest.TestCase): + + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) + self.pool = WebSocketPool(poolsize=1, loop=self.loop, + factory=AiohttpFactory, max_retries=0) + + def tearDown(self): + self.loop.run_until_complete(self.pool.close()) + self.loop.close() + + def test_connection_manager(self): + results = [] + @asyncio.coroutine + def go(): + with (yield from self.pool) as conn: + writer = GremlinWriter(conn) + conn = writer.write("1 + 1") + resp = GremlinResponse(conn, self.pool, loop=self.loop) + while True: + mssg = yield from resp.stream.read() + if mssg is None: + break + results.append(mssg) + conn = self.pool._pool.get_nowait() + self.assertTrue(conn.closed) + writer = GremlinWriter(conn) + try: + conn = yield from writer.write("1 + 1") + error = False + except RuntimeError: + error = True + self.assertTrue(error) + self.loop.run_until_complete(go()) class GremlinClientPoolSessionTests(unittest.TestCase):