From 2d2fc38df128feb7b35492bf5d8d47f6f594f6f3 Mon Sep 17 00:00:00 2001
From: Jeffrey Phillips Freeman <jeffrey.freeman@syncleus.com>
Date: Fri, 30 Sep 2016 04:59:15 -0400
Subject: [PATCH] Added warning messages to the reconnecting buffer.
Issue: #25
---
src/apex/__init__.py | 3 +-
src/apex/aprs/__init__.py | 1 -
src/apex/aprs/igate.py | 125 +--------------------
src/apex/buffers.py | 191 ++++++++++++++++++++++++++++++++
src/apex/cli.py | 9 +-
src/apex/nonrepeating_buffer.py | 63 -----------
6 files changed, 199 insertions(+), 193 deletions(-)
create mode 100644 src/apex/buffers.py
delete mode 100644 src/apex/nonrepeating_buffer.py
diff --git a/src/apex/__init__.py b/src/apex/__init__.py
index c2721c9..79da91d 100644
--- a/src/apex/__init__.py
+++ b/src/apex/__init__.py
@@ -7,7 +7,8 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
-from .nonrepeating_buffer import NonrepeatingBuffer # noqa: F401
+from .buffers import NonrepeatingBuffer # noqa: F401
+from .buffers import ReconnectingPacketBuffer # noqa: F401
from .util import echo_colorized_error # noqa: F401
from .util import echo_colorized_frame # noqa: F401
from .util import echo_colorized_warning # noqa: F401
diff --git a/src/apex/aprs/__init__.py b/src/apex/aprs/__init__.py
index 1120cdf..39e8cc0 100644
--- a/src/apex/aprs/__init__.py
+++ b/src/apex/aprs/__init__.py
@@ -24,7 +24,6 @@ import logging
from .aprs import Aprs # noqa: F401
from .igate import IGate # noqa: F401
-from .igate import ReconnectingPacketBuffer # noqa: F401
__author__ = 'Jeffrey Phillips Freeman (WI2ARD)'
__maintainer__ = 'Jeffrey Phillips Freeman (WI2ARD)'
diff --git a/src/apex/aprs/igate.py b/src/apex/aprs/igate.py
index 25bf96f..b826884 100644
--- a/src/apex/aprs/igate.py
+++ b/src/apex/aprs/igate.py
@@ -11,9 +11,7 @@ from __future__ import print_function
import logging
import select
import socket
-import threading
-import time
-import cachetools
+
import requests
from apex.aprs import constants as aprs_constants
@@ -27,127 +25,6 @@ __copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors'
__credits__ = []
-class ReconnectingPacketBuffer(object):
-
- STARTING_WAIT_TIME = 2
- MAX_WAIT_TIME = 300
- WAIT_TIME_MULTIPLIER = 2
- MAX_INDEX = 1000000
-
- def __init__(self, packet_layer):
- self.packet_layer = packet_layer
- self.to_packet_layer = cachetools.TTLCache(10, 30)
- self.current_index = 0
- self.from_packet_layer = cachetools.TTLCache(10, 30)
- self.connect_thread = None
- self.lock = threading.Lock()
- self.running = False
- self.reconnect_wait_time = self.STARTING_WAIT_TIME
- self.last_connect_attempt = None
- self.connect_args = None
- self.connect_kwargs = None
- self.connected = False
-
- def __increment_wait_time(self):
- self.reconnect_wait_time *= self.WAIT_TIME_MULTIPLIER
- if self.reconnect_wait_time > self.MAX_WAIT_TIME:
- self.reconnect_wait_time = self.MAX_WAIT_TIME
-
- def __reset_wait_time(self):
- self.reconnect_wait_time = self.STARTING_WAIT_TIME
-
- def __run(self):
- while self.running:
- if not self.connected:
- if not self.last_connect_attempt or time.time() - self.last_connect_attempt > self.reconnect_wait_time:
- try:
- self.last_connect_attempt = time.time()
- self.packet_layer.connect(*self.connect_args, **self.connect_kwargs)
- self.connected = True
- except IOError:
- try:
- self.packet_layer.close()
- except IOError:
- pass
- self.__increment_wait_time()
- else:
- time.sleep(1)
- else:
- io_occured = False
-
- # lets attempt to read in a packet
- try:
- read_packet = self.packet_layer.read()
- self.__reset_wait_time()
- if read_packet:
- with self.lock:
- self.from_packet_layer[str(aprs_util.hash_frame(read_packet))] = read_packet
- io_occured = True
- except IOError:
- try:
- self.packet_layer.close()
- except IOError:
- pass
- self.connected = False
- continue
-
- # lets try to write a packet, if any are waiting.
- write_packet = None
- with self.lock:
- if self.to_packet_layer:
- write_packet = self.to_packet_layer.popitem()[1]
- if write_packet:
- try:
- self.packet_layer.write(write_packet)
- io_occured = True
- self.__reset_wait_time()
- except IOError:
- self.to_packet_layer[str(aprs_util.hash_frame(read_packet))] = write_packet
- try:
- self.packet_layer.close()
- except IOError:
- pass
- self.connected = False
- continue
-
- if not io_occured:
- time.sleep(1)
- try:
- self.packet_layer.close()
- except IOError:
- pass
-
- def connect(self, *args, **kwargs):
- with self.lock:
- if self.connect_thread:
- raise RuntimeError('already connected')
-
- self.running = True
- self.connect_args = args
- self.connect_kwargs = kwargs
- self.connect_thread = threading.Thread(target=self.__run)
- self.connect_thread.start()
-
- def close(self):
- with self.lock:
- if not self.connect_thread:
- raise RuntimeError('not connected')
-
- self.running = False
- self.connect_thread.join()
- self.connect_thread = None
-
- def read(self):
- with self.lock:
- if self.from_packet_layer:
- return self.from_packet_layer.popitem()[1]
- return None
-
- def write(self, packet):
- with self.lock:
- self.to_packet_layer[str(aprs_util.hash_frame(packet))] = packet
-
-
class IGate(object):
"""APRS Object."""
diff --git a/src/apex/buffers.py b/src/apex/buffers.py
new file mode 100644
index 0000000..2e009a3
--- /dev/null
+++ b/src/apex/buffers.py
@@ -0,0 +1,191 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# These imports are for python3 compatibility inside python2
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import threading
+import time
+
+import cachetools
+
+from apex.aprs import util as aprs_util
+from .util import echo_colorized_frame
+from .util import echo_colorized_warning
+
+
+class NonrepeatingBuffer(object):
+ def __init__(self, base_tnc, base_name, base_port=None, echo_packets=True, buffer_size=10000, buffer_time=30):
+ self.packet_cache = cachetools.TTLCache(buffer_size, buffer_time)
+ self.lock = threading.Lock()
+ self.base_tnc = base_tnc
+ self.base_port = base_port
+ self.base_name = base_name
+ self.echo_packets = echo_packets
+
+ @property
+ def port(self):
+ return self.base_port
+
+ @property
+ def name(self):
+ return self.base_name
+
+ def connect(self, *args, **kwargs):
+ self.base_tnc.connect(*args, **kwargs)
+
+ def close(self, *args, **kwargs):
+ self.base_tnc.close(*args, **kwargs)
+
+ def write(self, frame, *args, **kwargs):
+ with self.lock:
+ frame_hash = str(aprs_util.hash_frame(frame))
+ if frame_hash not in self.packet_cache:
+ self.packet_cache[frame_hash] = frame
+ if self.base_port:
+ self.base_tnc.write(frame, self.base_port)
+ else:
+ self.base_tnc.write(frame)
+
+ if self.echo_packets:
+ echo_colorized_frame(frame, self.base_name, False)
+
+ def read(self, *args, **kwargs):
+ with self.lock:
+ frame = self.base_tnc.read(*args, **kwargs)
+ if not frame:
+ return frame
+ frame_hash = str(aprs_util.hash_frame(frame))
+ if frame_hash not in self.packet_cache:
+ self.packet_cache[frame_hash] = frame
+ if self.echo_packets:
+ echo_colorized_frame(frame, self.base_name, True)
+ return frame
+ else:
+ return None
+
+
+class ReconnectingPacketBuffer(object):
+
+ STARTING_WAIT_TIME = 2
+ MAX_WAIT_TIME = 300
+ WAIT_TIME_MULTIPLIER = 2
+ MAX_INDEX = 1000000
+
+ def __init__(self, packet_layer):
+ self.packet_layer = packet_layer
+ self.to_packet_layer = cachetools.TTLCache(10, 30)
+ self.current_index = 0
+ self.from_packet_layer = cachetools.TTLCache(10, 30)
+ self.connect_thread = None
+ self.lock = threading.Lock()
+ self.running = False
+ self.reconnect_wait_time = self.STARTING_WAIT_TIME
+ self.last_connect_attempt = None
+ self.connect_args = None
+ self.connect_kwargs = None
+ self.connected = False
+
+ def __increment_wait_time(self):
+ self.reconnect_wait_time *= self.WAIT_TIME_MULTIPLIER
+ if self.reconnect_wait_time > self.MAX_WAIT_TIME:
+ self.reconnect_wait_time = self.MAX_WAIT_TIME
+
+ def __reset_wait_time(self):
+ self.reconnect_wait_time = self.STARTING_WAIT_TIME
+
+ def __run(self):
+ while self.running:
+ if not self.connected:
+ if not self.last_connect_attempt or time.time() - self.last_connect_attempt > self.reconnect_wait_time:
+ try:
+ self.last_connect_attempt = time.time()
+ self.packet_layer.connect(*self.connect_args, **self.connect_kwargs)
+ self.connected = True
+ except IOError:
+ echo_colorized_warning('Could not connect, will reattempt.')
+ try:
+ self.packet_layer.close()
+ except IOError:
+ pass
+ self.__increment_wait_time()
+ else:
+ time.sleep(1)
+ else:
+ io_occured = False
+
+ # lets attempt to read in a packet
+ try:
+ read_packet = self.packet_layer.read()
+ self.__reset_wait_time()
+ if read_packet:
+ with self.lock:
+ self.from_packet_layer[str(aprs_util.hash_frame(read_packet))] = read_packet
+ io_occured = True
+ except IOError:
+ echo_colorized_warning('Read failed. Will disconnect and attempt to reconnect.')
+ try:
+ self.packet_layer.close()
+ except IOError:
+ pass
+ self.connected = False
+ continue
+
+ # lets try to write a packet, if any are waiting.
+ write_packet = None
+ with self.lock:
+ if self.to_packet_layer:
+ write_packet = self.to_packet_layer.popitem()[1]
+ if write_packet:
+ try:
+ self.packet_layer.write(write_packet)
+ io_occured = True
+ self.__reset_wait_time()
+ except IOError:
+ echo_colorized_warning('Write failed. Will disconnect and attempt to reconnect.')
+ self.to_packet_layer[str(aprs_util.hash_frame(read_packet))] = write_packet
+ try:
+ self.packet_layer.close()
+ except IOError:
+ pass
+ self.connected = False
+ continue
+
+ if not io_occured:
+ time.sleep(1)
+ try:
+ self.packet_layer.close()
+ except IOError:
+ pass
+
+ def connect(self, *args, **kwargs):
+ with self.lock:
+ if self.connect_thread:
+ raise RuntimeError('already connected')
+
+ self.running = True
+ self.connect_args = args
+ self.connect_kwargs = kwargs
+ self.connect_thread = threading.Thread(target=self.__run)
+ self.connect_thread.start()
+
+ def close(self):
+ with self.lock:
+ if not self.connect_thread:
+ raise RuntimeError('not connected')
+
+ self.running = False
+ self.connect_thread.join()
+ self.connect_thread = None
+
+ def read(self):
+ with self.lock:
+ if self.from_packet_layer:
+ return self.from_packet_layer.popitem()[1]
+ return None
+
+ def write(self, packet):
+ with self.lock:
+ self.to_packet_layer[str(aprs_util.hash_frame(packet))] = packet
diff --git a/src/apex/cli.py b/src/apex/cli.py
index 9bdc72a..efb9e13 100644
--- a/src/apex/cli.py
+++ b/src/apex/cli.py
@@ -32,11 +32,12 @@ import traceback
import click
import apex.aprs
+import apex.buffers
from apex.kiss import constants as kissConstants
from apex.plugin_loader import get_plugins
from apex.plugin_loader import load_plugin
-from .nonrepeating_buffer import NonrepeatingBuffer
+from .buffers import NonrepeatingBuffer
from .util import echo_colorized_error
from .util import echo_colorized_warning
@@ -105,11 +106,11 @@ def configure(configfile, verbose=False):
if config.has_option(section, 'com_port') and config.has_option(section, 'baud'):
com_port = config.get(section, 'com_port')
baud = config.get(section, 'baud')
- kiss_tnc = apex.aprs.ReconnectingPacketBuffer(apex.aprs.Aprs(apex.kiss.KissSerial(com_port=com_port, baud=baud)))
+ kiss_tnc = apex.buffers.ReconnectingPacketBuffer(apex.aprs.Aprs(apex.kiss.KissSerial(com_port=com_port, baud=baud)))
elif config.has_option(section, 'tcp_host') and config.has_option(section, 'tcp_port'):
tcp_host = config.get(section, 'tcp_host')
tcp_port = config.get(section, 'tcp_port')
- kiss_tnc = apex.aprs.ReconnectingPacketBuffer(apex.aprs.Aprs(apex.kiss.KissTcp(host=tcp_host, tcp_port=tcp_port)))
+ kiss_tnc = apex.buffers.ReconnectingPacketBuffer(apex.aprs.Aprs(apex.kiss.KissTcp(host=tcp_host, tcp_port=tcp_port)))
else:
echo_colorized_error("""Invalid configuration, must have both com_port and baud set or tcp_host and
tcp_port set in TNC sections of configuration file""")
@@ -153,7 +154,7 @@ def configure(configfile, verbose=False):
aprsis_password = -1
aprsis_server = config.get('APRS-IS', 'server')
aprsis_server_port = config.get('APRS-IS', 'server_port')
- aprsis_base = apex.aprs.ReconnectingPacketBuffer(apex.aprs.IGate(aprsis_callsign, aprsis_password))
+ aprsis_base = apex.buffers.ReconnectingPacketBuffer(apex.aprs.IGate(aprsis_callsign, aprsis_password))
aprsis = NonrepeatingBuffer(aprsis_base, 'APRS-IS')
aprsis.connect(aprsis_server, int(aprsis_server_port))
diff --git a/src/apex/nonrepeating_buffer.py b/src/apex/nonrepeating_buffer.py
deleted file mode 100644
index f401288..0000000
--- a/src/apex/nonrepeating_buffer.py
+++ /dev/null
@@ -1,63 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-# These imports are for python3 compatibility inside python2
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import threading
-import cachetools
-
-import apex.aprs.util
-
-
-class NonrepeatingBuffer(object):
- def __init__(self, base_tnc, base_name, base_port=None, echo_packets=True, buffer_size=10000, buffer_time=30):
- self.packet_cache = cachetools.TTLCache(buffer_size, buffer_time)
- self.lock = threading.Lock()
- self.base_tnc = base_tnc
- self.base_port = base_port
- self.base_name = base_name
- self.echo_packets = echo_packets
-
- @property
- def port(self):
- return self.base_port
-
- @property
- def name(self):
- return self.base_name
-
- def connect(self, *args, **kwargs):
- self.base_tnc.connect(*args, **kwargs)
-
- def close(self, *args, **kwargs):
- self.base_tnc.close(*args, **kwargs)
-
- def write(self, frame, *args, **kwargs):
- with self.lock:
- frame_hash = str(apex.aprs.util.hash_frame(frame))
- if frame_hash not in self.packet_cache:
- self.packet_cache[frame_hash] = frame
- if self.base_port:
- self.base_tnc.write(frame, self.base_port)
- else:
- self.base_tnc.write(frame)
-
- if self.echo_packets:
- apex.echo_colorized_frame(frame, self.base_name, False)
-
- def read(self, *args, **kwargs):
- with self.lock:
- frame = self.base_tnc.read(*args, **kwargs)
- if not frame:
- return frame
- frame_hash = str(apex.aprs.util.hash_frame(frame))
- if frame_hash not in self.packet_cache:
- self.packet_cache[frame_hash] = frame
- if self.echo_packets:
- apex.echo_colorized_frame(frame, self.base_name, True)
- return frame
- else:
- return None
--
GitLab