transport.py 1.61 KB
Newer Older
davebshow's avatar
davebshow committed
1
2
import aiohttp

3
from gremlin_python.driver import transport
davebshow's avatar
davebshow committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24


class AiohttpTransport(transport.AbstractBaseTransport):

    def __init__(self, loop):
        self._loop = loop
        self._connected = False

    async def connect(self, url, *, ssl_context=None):
        await self.close()
        connector = aiohttp.TCPConnector(
            ssl_context=ssl_context, loop=self._loop)
        self._client_session = aiohttp.ClientSession(
            loop=self._loop, connector=connector)
        self._ws = await self._client_session.ws_connect(url)
        self._connected = True

    def write(self, message):
        self._ws.send_bytes(message)

    async def read(self):
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
        data = await self._ws.receive()
        if data.tp == aiohttp.WSMsgType.close:
            await self._transport.close()
            raise RuntimeError("Connection closed by server")
        elif data.tp == aiohttp.WSMsgType.error:
            # This won't raise properly, fix
            raise data.data
        elif data.tp == aiohttp.WSMsgType.closed:
            # Hmm
            raise RuntimeError("Connection closed by server")
        elif data.tp == aiohttp.WSMsgType.text:
            # Should return bytes
            data = data.data.strip().encode('utf-8')
        else:
            data = data.data
        return data
davebshow's avatar
davebshow committed
41
42
43
44
45
46
47
48
49
50
51

    async def close(self):
        if self._connected:
            if not self._ws.closed:
                await self._ws.close()
            if not self._client_session.closed:
                await self._client_session.close()

    @property
    def closed(self):
        return self._ws.closed or self._client_session.closed