diff --git a/src/apex/__init__.py b/src/apex/__init__.py index c2721c915095b088e5ec3e37f8fa158c4360305c..79da91db8ad25e591535e0cfcb9d0d222cbf890b 100644 --- a/src/apex/__init__.py +++ b/src/apex/__init__.py @@ -7,7 +7,8 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -from .nonrepeating_buffer import NonrepeatingBuffer # noqa: F401 +from .buffers import NonrepeatingBuffer # noqa: F401 +from .buffers import ReconnectingPacketBuffer # noqa: F401 from .util import echo_colorized_error # noqa: F401 from .util import echo_colorized_frame # noqa: F401 from .util import echo_colorized_warning # noqa: F401 diff --git a/src/apex/aprs/__init__.py b/src/apex/aprs/__init__.py index 1120cdf51c711b1246fdf479c133018625d4e12f..39e8cc03e1a17571f368fe4baed27334aa226f85 100644 --- a/src/apex/aprs/__init__.py +++ b/src/apex/aprs/__init__.py @@ -24,7 +24,6 @@ import logging from .aprs import Aprs # noqa: F401 from .igate import IGate # noqa: F401 -from .igate import ReconnectingPacketBuffer # noqa: F401 __author__ = 'Jeffrey Phillips Freeman (WI2ARD)' __maintainer__ = 'Jeffrey Phillips Freeman (WI2ARD)' diff --git a/src/apex/aprs/igate.py b/src/apex/aprs/igate.py index 25bf96f5bb56d3ddc925831afc9db41eb4df956b..b826884db77307d90822027e5ce23c616a5b1c6b 100644 --- a/src/apex/aprs/igate.py +++ b/src/apex/aprs/igate.py @@ -11,9 +11,7 @@ from __future__ import print_function import logging import select import socket -import threading -import time -import cachetools + import requests from apex.aprs import constants as aprs_constants @@ -27,127 +25,6 @@ __copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' __credits__ = [] -class ReconnectingPacketBuffer(object): - - STARTING_WAIT_TIME = 2 - MAX_WAIT_TIME = 300 - WAIT_TIME_MULTIPLIER = 2 - MAX_INDEX = 1000000 - - def __init__(self, packet_layer): - self.packet_layer = packet_layer - self.to_packet_layer = cachetools.TTLCache(10, 30) - self.current_index = 0 - self.from_packet_layer = cachetools.TTLCache(10, 30) - self.connect_thread = None - self.lock = threading.Lock() - self.running = False - self.reconnect_wait_time = self.STARTING_WAIT_TIME - self.last_connect_attempt = None - self.connect_args = None - self.connect_kwargs = None - self.connected = False - - def __increment_wait_time(self): - self.reconnect_wait_time *= self.WAIT_TIME_MULTIPLIER - if self.reconnect_wait_time > self.MAX_WAIT_TIME: - self.reconnect_wait_time = self.MAX_WAIT_TIME - - def __reset_wait_time(self): - self.reconnect_wait_time = self.STARTING_WAIT_TIME - - def __run(self): - while self.running: - if not self.connected: - if not self.last_connect_attempt or time.time() - self.last_connect_attempt > self.reconnect_wait_time: - try: - self.last_connect_attempt = time.time() - self.packet_layer.connect(*self.connect_args, **self.connect_kwargs) - self.connected = True - except IOError: - try: - self.packet_layer.close() - except IOError: - pass - self.__increment_wait_time() - else: - time.sleep(1) - else: - io_occured = False - - # lets attempt to read in a packet - try: - read_packet = self.packet_layer.read() - self.__reset_wait_time() - if read_packet: - with self.lock: - self.from_packet_layer[str(aprs_util.hash_frame(read_packet))] = read_packet - io_occured = True - except IOError: - try: - self.packet_layer.close() - except IOError: - pass - self.connected = False - continue - - # lets try to write a packet, if any are waiting. - write_packet = None - with self.lock: - if self.to_packet_layer: - write_packet = self.to_packet_layer.popitem()[1] - if write_packet: - try: - self.packet_layer.write(write_packet) - io_occured = True - self.__reset_wait_time() - except IOError: - self.to_packet_layer[str(aprs_util.hash_frame(read_packet))] = write_packet - try: - self.packet_layer.close() - except IOError: - pass - self.connected = False - continue - - if not io_occured: - time.sleep(1) - try: - self.packet_layer.close() - except IOError: - pass - - def connect(self, *args, **kwargs): - with self.lock: - if self.connect_thread: - raise RuntimeError('already connected') - - self.running = True - self.connect_args = args - self.connect_kwargs = kwargs - self.connect_thread = threading.Thread(target=self.__run) - self.connect_thread.start() - - def close(self): - with self.lock: - if not self.connect_thread: - raise RuntimeError('not connected') - - self.running = False - self.connect_thread.join() - self.connect_thread = None - - def read(self): - with self.lock: - if self.from_packet_layer: - return self.from_packet_layer.popitem()[1] - return None - - def write(self, packet): - with self.lock: - self.to_packet_layer[str(aprs_util.hash_frame(packet))] = packet - - class IGate(object): """APRS Object.""" diff --git a/src/apex/buffers.py b/src/apex/buffers.py new file mode 100644 index 0000000000000000000000000000000000000000..2e009a3a143c6f24edebe35ed95aa0f05c43208c --- /dev/null +++ b/src/apex/buffers.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# These imports are for python3 compatibility inside python2 +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import threading +import time + +import cachetools + +from apex.aprs import util as aprs_util +from .util import echo_colorized_frame +from .util import echo_colorized_warning + + +class NonrepeatingBuffer(object): + def __init__(self, base_tnc, base_name, base_port=None, echo_packets=True, buffer_size=10000, buffer_time=30): + self.packet_cache = cachetools.TTLCache(buffer_size, buffer_time) + self.lock = threading.Lock() + self.base_tnc = base_tnc + self.base_port = base_port + self.base_name = base_name + self.echo_packets = echo_packets + + @property + def port(self): + return self.base_port + + @property + def name(self): + return self.base_name + + def connect(self, *args, **kwargs): + self.base_tnc.connect(*args, **kwargs) + + def close(self, *args, **kwargs): + self.base_tnc.close(*args, **kwargs) + + def write(self, frame, *args, **kwargs): + with self.lock: + frame_hash = str(aprs_util.hash_frame(frame)) + if frame_hash not in self.packet_cache: + self.packet_cache[frame_hash] = frame + if self.base_port: + self.base_tnc.write(frame, self.base_port) + else: + self.base_tnc.write(frame) + + if self.echo_packets: + echo_colorized_frame(frame, self.base_name, False) + + def read(self, *args, **kwargs): + with self.lock: + frame = self.base_tnc.read(*args, **kwargs) + if not frame: + return frame + frame_hash = str(aprs_util.hash_frame(frame)) + if frame_hash not in self.packet_cache: + self.packet_cache[frame_hash] = frame + if self.echo_packets: + echo_colorized_frame(frame, self.base_name, True) + return frame + else: + return None + + +class ReconnectingPacketBuffer(object): + + STARTING_WAIT_TIME = 2 + MAX_WAIT_TIME = 300 + WAIT_TIME_MULTIPLIER = 2 + MAX_INDEX = 1000000 + + def __init__(self, packet_layer): + self.packet_layer = packet_layer + self.to_packet_layer = cachetools.TTLCache(10, 30) + self.current_index = 0 + self.from_packet_layer = cachetools.TTLCache(10, 30) + self.connect_thread = None + self.lock = threading.Lock() + self.running = False + self.reconnect_wait_time = self.STARTING_WAIT_TIME + self.last_connect_attempt = None + self.connect_args = None + self.connect_kwargs = None + self.connected = False + + def __increment_wait_time(self): + self.reconnect_wait_time *= self.WAIT_TIME_MULTIPLIER + if self.reconnect_wait_time > self.MAX_WAIT_TIME: + self.reconnect_wait_time = self.MAX_WAIT_TIME + + def __reset_wait_time(self): + self.reconnect_wait_time = self.STARTING_WAIT_TIME + + def __run(self): + while self.running: + if not self.connected: + if not self.last_connect_attempt or time.time() - self.last_connect_attempt > self.reconnect_wait_time: + try: + self.last_connect_attempt = time.time() + self.packet_layer.connect(*self.connect_args, **self.connect_kwargs) + self.connected = True + except IOError: + echo_colorized_warning('Could not connect, will reattempt.') + try: + self.packet_layer.close() + except IOError: + pass + self.__increment_wait_time() + else: + time.sleep(1) + else: + io_occured = False + + # lets attempt to read in a packet + try: + read_packet = self.packet_layer.read() + self.__reset_wait_time() + if read_packet: + with self.lock: + self.from_packet_layer[str(aprs_util.hash_frame(read_packet))] = read_packet + io_occured = True + except IOError: + echo_colorized_warning('Read failed. Will disconnect and attempt to reconnect.') + try: + self.packet_layer.close() + except IOError: + pass + self.connected = False + continue + + # lets try to write a packet, if any are waiting. + write_packet = None + with self.lock: + if self.to_packet_layer: + write_packet = self.to_packet_layer.popitem()[1] + if write_packet: + try: + self.packet_layer.write(write_packet) + io_occured = True + self.__reset_wait_time() + except IOError: + echo_colorized_warning('Write failed. Will disconnect and attempt to reconnect.') + self.to_packet_layer[str(aprs_util.hash_frame(read_packet))] = write_packet + try: + self.packet_layer.close() + except IOError: + pass + self.connected = False + continue + + if not io_occured: + time.sleep(1) + try: + self.packet_layer.close() + except IOError: + pass + + def connect(self, *args, **kwargs): + with self.lock: + if self.connect_thread: + raise RuntimeError('already connected') + + self.running = True + self.connect_args = args + self.connect_kwargs = kwargs + self.connect_thread = threading.Thread(target=self.__run) + self.connect_thread.start() + + def close(self): + with self.lock: + if not self.connect_thread: + raise RuntimeError('not connected') + + self.running = False + self.connect_thread.join() + self.connect_thread = None + + def read(self): + with self.lock: + if self.from_packet_layer: + return self.from_packet_layer.popitem()[1] + return None + + def write(self, packet): + with self.lock: + self.to_packet_layer[str(aprs_util.hash_frame(packet))] = packet diff --git a/src/apex/cli.py b/src/apex/cli.py index 9bdc72aa6cb9f036a908f1ab48f63f7ccccb6be1..efb9e138e9eb71e49ecfdfabba25086682431f39 100644 --- a/src/apex/cli.py +++ b/src/apex/cli.py @@ -32,11 +32,12 @@ import traceback import click import apex.aprs +import apex.buffers from apex.kiss import constants as kissConstants from apex.plugin_loader import get_plugins from apex.plugin_loader import load_plugin -from .nonrepeating_buffer import NonrepeatingBuffer +from .buffers import NonrepeatingBuffer from .util import echo_colorized_error from .util import echo_colorized_warning @@ -105,11 +106,11 @@ def configure(configfile, verbose=False): if config.has_option(section, 'com_port') and config.has_option(section, 'baud'): com_port = config.get(section, 'com_port') baud = config.get(section, 'baud') - kiss_tnc = apex.aprs.ReconnectingPacketBuffer(apex.aprs.Aprs(apex.kiss.KissSerial(com_port=com_port, baud=baud))) + kiss_tnc = apex.buffers.ReconnectingPacketBuffer(apex.aprs.Aprs(apex.kiss.KissSerial(com_port=com_port, baud=baud))) elif config.has_option(section, 'tcp_host') and config.has_option(section, 'tcp_port'): tcp_host = config.get(section, 'tcp_host') tcp_port = config.get(section, 'tcp_port') - kiss_tnc = apex.aprs.ReconnectingPacketBuffer(apex.aprs.Aprs(apex.kiss.KissTcp(host=tcp_host, tcp_port=tcp_port))) + kiss_tnc = apex.buffers.ReconnectingPacketBuffer(apex.aprs.Aprs(apex.kiss.KissTcp(host=tcp_host, tcp_port=tcp_port))) else: echo_colorized_error("""Invalid configuration, must have both com_port and baud set or tcp_host and tcp_port set in TNC sections of configuration file""") @@ -153,7 +154,7 @@ def configure(configfile, verbose=False): aprsis_password = -1 aprsis_server = config.get('APRS-IS', 'server') aprsis_server_port = config.get('APRS-IS', 'server_port') - aprsis_base = apex.aprs.ReconnectingPacketBuffer(apex.aprs.IGate(aprsis_callsign, aprsis_password)) + aprsis_base = apex.buffers.ReconnectingPacketBuffer(apex.aprs.IGate(aprsis_callsign, aprsis_password)) aprsis = NonrepeatingBuffer(aprsis_base, 'APRS-IS') aprsis.connect(aprsis_server, int(aprsis_server_port)) diff --git a/src/apex/nonrepeating_buffer.py b/src/apex/nonrepeating_buffer.py deleted file mode 100644 index f40128841a64160313d603ca4470d6c4eb193731..0000000000000000000000000000000000000000 --- a/src/apex/nonrepeating_buffer.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -# These imports are for python3 compatibility inside python2 -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import threading -import cachetools - -import apex.aprs.util - - -class NonrepeatingBuffer(object): - def __init__(self, base_tnc, base_name, base_port=None, echo_packets=True, buffer_size=10000, buffer_time=30): - self.packet_cache = cachetools.TTLCache(buffer_size, buffer_time) - self.lock = threading.Lock() - self.base_tnc = base_tnc - self.base_port = base_port - self.base_name = base_name - self.echo_packets = echo_packets - - @property - def port(self): - return self.base_port - - @property - def name(self): - return self.base_name - - def connect(self, *args, **kwargs): - self.base_tnc.connect(*args, **kwargs) - - def close(self, *args, **kwargs): - self.base_tnc.close(*args, **kwargs) - - def write(self, frame, *args, **kwargs): - with self.lock: - frame_hash = str(apex.aprs.util.hash_frame(frame)) - if frame_hash not in self.packet_cache: - self.packet_cache[frame_hash] = frame - if self.base_port: - self.base_tnc.write(frame, self.base_port) - else: - self.base_tnc.write(frame) - - if self.echo_packets: - apex.echo_colorized_frame(frame, self.base_name, False) - - def read(self, *args, **kwargs): - with self.lock: - frame = self.base_tnc.read(*args, **kwargs) - if not frame: - return frame - frame_hash = str(apex.aprs.util.hash_frame(frame)) - if frame_hash not in self.packet_cache: - self.packet_cache[frame_hash] = frame - if self.echo_packets: - apex.echo_colorized_frame(frame, self.base_name, True) - return frame - else: - return None