cluster.py 6.26 KB
Newer Older
davebshow's avatar
davebshow committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import asyncio
import collections
import configparser
import importlib
import ssl

try:
    import ujson as json
except ImportError:
    import json

import yaml

from aiogremlin import exception
from aiogremlin import driver
from aiogremlin.gremlin_python.driver import serializer


def my_import(name):
    """
    Import and return the object addressed by an absolute dotted path.

    :param str name: Dotted path of the form ``package.module.attribute``
    :returns: The attribute named by the last path component, looked up on
        the imported module
    :raises aiogremlin.exception.ConfigError: If ``name`` contains no dot,
        or if the module part of the path cannot be imported
    :raises AttributeError: If the module imports fine but does not define
        the requested attribute
    """
    module_name, dot, class_name = name.rpartition('.')
    if not dot:
        raise exception.ConfigError("not a valid absolute python path to a class: {}".format(name))
    try:
        module = importlib.import_module(module_name)
    except ImportError as err:
        # Chain explicitly so the underlying import failure is reported as
        # the direct cause of the configuration error.
        raise exception.ConfigError(
                "Error processing cluster configuration: could not import {}".format(name)) from err
    return getattr(module, class_name)


class Cluster:
    """
    A cluster of Gremlin Server hosts. This object provides the main high
    level interface used by the :py:mod:`aiogremlin` module.

    :param asyncio.BaseEventLoop loop: Event loop passed on to the hosts and
        clients created by this cluster
    :param dict aliases: Optional default aliases used by :py:meth:`connect`
        when the caller does not supply any
    :param config: Keyword overrides applied on top of ``DEFAULT_CONFIG``
    """

    DEFAULT_CONFIG = {
        'scheme': 'ws',
        'hosts': ['localhost'],
        'port': 8182,
        'ssl_certfile': '',
        'ssl_keyfile': '',
        'ssl_password': '',
        'username': '',
        'password': '',
        'response_timeout': None,
        'max_conns': 4,
        'min_conns': 1,
        'max_times_acquired': 16,
        'max_inflight': 64,
        'message_serializer': 'aiogremlin.gremlin_python.driver.serializer.GraphSONMessageSerializer',
        'provider': 'aiogremlin.driver.provider.TinkerGraph'
    }

    def __init__(self, loop, aliases=None, **config):
        self._loop = loop
        # Work on a (shallow) copy so instance overrides never replace keys
        # of the class-level defaults.
        default_config = dict(self.DEFAULT_CONFIG)
        default_config.update(config)
        self._config = self._process_config_imports(default_config)
        self._hosts = collections.deque()
        self._closed = False
        if aliases is None:
            aliases = {}
        self._aliases = aliases

    @classmethod
    async def open(cls, loop, *, aliases=None, configfile=None, **config):
        """
        **coroutine** Open a cluster, connecting to all available hosts as
        specified in configuration.

        Values read from ``configfile`` are applied after the keyword
        arguments and therefore take precedence over them.

        :param asyncio.BaseEventLoop loop:
        :param dict aliases: Optional default aliases for clients
        :param str configfile: Optional configuration file in .json or
            .yml format
        :param config: Keyword overrides for the default configuration
        :returns: The opened :py:class:`Cluster`
        """
        cluster = cls(loop, aliases=aliases, **config)
        if configfile:
            cluster.config_from_file(configfile)
        await cluster.establish_hosts()
        return cluster

    @property
    def hosts(self):
        """
        Readonly property.

        :returns: `collections.deque` of the hosts currently in rotation
        """
        return self._hosts

    @property
    def config(self):
        """
        Readonly property.

        :returns: `dict` containing the cluster configuration
        """
        return self._config

    async def get_connection(self):
        """
        **coroutine** Get connection from next available host in a round robin
        fashion.

        :returns: :py:class:`Connection<aiogremlin.connection.Connection>`
        :raises IndexError: If no hosts are available even after trying to
            establish them
        """
        if not self._hosts:
            await self.establish_hosts()
        # Peek and rotate *before* awaiting instead of popping the host and
        # re-appending it afterwards: the host stays in the deque for the
        # whole call, so it is not dropped from the rotation when
        # ``host.get_connection()`` raises, and concurrent callers never
        # observe a spuriously empty deque (which would re-establish hosts).
        host = self._hosts[0]
        self._hosts.rotate(-1)
        return await host.get_connection()

    async def establish_hosts(self):
        """
        **coroutine** Connect to all hosts as specified in configuration.

        One server is opened per entry in ``config['hosts']`` and appended
        to the rotation; the full configuration is forwarded as keyword
        arguments to ``driver.GremlinServer.open``.
        """
        scheme = self._config['scheme']
        port = self._config['port']
        for hostname in self._config['hosts']:
            url = '{}://{}:{}/gremlin'.format(scheme, hostname, port)
            server = await driver.GremlinServer.open(
                url, self._loop, **self._config)
            self._hosts.append(server)

    def config_from_file(self, filename):
        """
        Load configuration from file, choosing the parser by file name.

        :param str filename: Path to the configuration file.
        :raises aiogremlin.exception.ConfigurationError: If the file name
            does not end in a recognised YAML or JSON suffix
        """
        if filename.endswith(('yml', 'yaml')):
            self.config_from_yaml(filename)
        elif filename.endswith('.json'):
            self.config_from_json(filename)
        else:
            # NOTE(review): ``my_import`` raises ``exception.ConfigError``
            # while this raises ``exception.ConfigurationError`` -- confirm
            # both names are defined and the split is intentional.
            raise exception.ConfigurationError('Unknown config file format')

    def config_from_yaml(self, filename):
        """
        Load configuration from YAML file.

        :param str filename: Path to the configuration file.
        """
        with open(filename, 'r') as f:
            # ``yaml.load`` without an explicit Loader can construct
            # arbitrary objects on old PyYAML releases and is rejected
            # outright by PyYAML >= 6. ``safe_load`` is used instead, so
            # only plain YAML data is accepted.
            config = yaml.safe_load(f)
        # An empty YAML document parses to ``None``; treat it as no overrides.
        config = self._process_config_imports(config or {})
        self._config.update(config)

    def config_from_json(self, filename):
        """
        Load configuration from JSON file.

        :param str filename: Path to the configuration file.
        """
        with open(filename, 'r') as f:
            config = json.load(f)
        config = self._process_config_imports(config)
        self._config.update(config)

    def _process_config_imports(self, config):
        """
        Resolve dotted-path strings for ``message_serializer`` and
        ``provider`` into the objects they name.

        The mapping is modified in place; values that are not strings are
        left untouched.

        :param dict config: Configuration mapping to process
        :returns: The same ``config`` mapping
        """
        for key in ('message_serializer', 'provider'):
            value = config.get(key)
            if isinstance(value, str):
                config[key] = my_import(value)
        return config

    def config_from_module(self, module):
        """
        Load configuration from the attributes of a module.

        Public attributes whose lower-cased name is a key of
        ``DEFAULT_CONFIG`` are copied into the configuration.

        :param module: A module object, or the dotted name of a module to
            import
        """
        if isinstance(module, str):
            module = importlib.import_module(module)
        config = {
            item.lower(): getattr(module, item)
            for item in dir(module)
            if not item.startswith('_') and item.lower() in self.DEFAULT_CONFIG
        }
        config = self._process_config_imports(config)
        self._config.update(config)

    async def connect(self, aliases=None):
        """
        **coroutine** Get a connected client. Main API method.

        :param dict aliases: Optional aliases; falls back to the aliases the
            cluster was created with when empty or omitted
        :returns: A connected instance of `Client<aiogremlin.client.Client>`
        """
        aliases = aliases or self._aliases
        if not self._hosts:
            await self.establish_hosts()
        # Disabled sketch of session support, kept for reference:
        # if session:
        #     host = self._hosts.popleft()
        #     client = client.SessionedClient(host, self._loop, session,
        #                                     aliases=aliases)
        #     self._hosts.append(host)
        # else:
        client = driver.Client(self, self._loop, aliases=aliases)
        return client

    async def close(self):
        """**coroutine** Close cluster and all connected hosts."""
        waiters = []
        while self._hosts:
            host = self._hosts.popleft()
            waiters.append(host.close())
        if waiters:
            # ``asyncio.gather`` no longer accepts ``loop=`` (removed in
            # Python 3.10); inside a coroutine it uses the running loop.
            await asyncio.gather(*waiters)
        self._closed = True