diff --git a/src/apex/aprs/aprs.py b/src/apex/aprs/aprs.py index 64d01eb6b7222b4f9367d815cf42a5de8c281742..b54c780973f9b41ce1dad725ca80d7d01645f338 100644 --- a/src/apex/aprs/aprs.py +++ b/src/apex/aprs/aprs.py @@ -9,6 +9,7 @@ from __future__ import division from __future__ import print_function import logging +import threading import apex.kiss @@ -26,6 +27,7 @@ class Aprs(object): def __init__(self, data_stream): self.data_stream = data_stream + self.lock = threading.Lock() @staticmethod def __decode_frame(raw_frame): @@ -182,14 +184,16 @@ class Aprs(object): :param frame: APRS frame to write to KISS device. :type frame: dict """ - encoded_frame = Aprs.__encode_frame(frame) - self.data_stream.write(encoded_frame, port) + with self.lock: + encoded_frame = Aprs.__encode_frame(frame) + self.data_stream.write(encoded_frame, port) def read(self): """Reads APRS-encoded frame from KISS device. """ - frame = self.data_stream.read() - if frame is not None and len(frame): - return Aprs.__decode_frame(frame) - else: - return None + with self.lock: + frame = self.data_stream.read() + if frame is not None and len(frame): + return Aprs.__decode_frame(frame) + else: + return None diff --git a/src/apex/cli.py b/src/apex/cli.py index 349dd5961f95971623da727ca949bbc24ac80bc0..9000bbad88b1ca8852ac687ec4bb007ea4e1e269 100644 --- a/src/apex/cli.py +++ b/src/apex/cli.py @@ -49,6 +49,11 @@ __license__ = 'Apache License, Version 2.0' __copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' __credits__ = [] +port_map = {} +running = True +plugin_modules = [] +plugin_threads = [] + def find_config(config_paths, verbose): config_file = 'apex.conf' @@ -85,7 +90,6 @@ def find_config(config_paths, verbose): @click.option('-v', '--verbose', is_flag=True, help='Enables verbose mode.') def main(verbose, configfile): - port_map = {} config = find_config(configfile, verbose) if config is None: click.echo(click.style('Error: ', fg='red', bold=True, blink=True) + @@ -147,18 +151,10 @@ def main(verbose, configfile): aprsis = apex.aprs.AprsInternetService(aprsis_callsign, aprsis_password) aprsis.connect(aprsis_server, int(aprsis_server_port)) - def sigint_handler(signal, frame): - for port in port_map.values(): - port['tnc'].data_stream.close() - sys.exit(0) - - signal.signal(signal.SIGINT, sigint_handler) - click.echo("Press ctrl + c at any time to exit") packet_cache = cachetools.TTLCache(10000, 5) # start the plugins - plugins = [] try: plugin_loaders = get_plugins() if not len(plugin_loaders): @@ -168,15 +164,37 @@ def main(verbose, configfile): if verbose: click.echo('Plugin found at the following location: %s' % repr(plugin_loader)) loaded_plugin = load_plugin(plugin_loader) - plugins.append(loaded_plugin) - threading.Thread(target=loaded_plugin.start, args=(config, port_map, packet_cache, aprsis)).start() + plugin_modules.append(loaded_plugin) + new_thread = threading.Thread(target=loaded_plugin.start, args=(config, port_map, packet_cache, aprsis)) + new_thread.start() + plugin_threads.append(new_thread) except IOError: click.echo(click.style('Warning: ', fg='yellow') + click.style('plugin directory not found, will only display incoming messages.')) + def sigint_handler(signal, frame): + global running + global port_map + global plugin_modules + + running = False + + click.echo('SIGINT caught, exiting APEX...') + + for plugin_module in plugin_modules: + plugin_module.stop() + # Lets wait until all the plugins successfully end + for plugin_thread in plugin_threads: + plugin_thread.join() + for port in port_map.values(): + port['tnc'].data_stream.close() + sys.exit(0) + + signal.signal(signal.SIGINT, sigint_handler) + if verbose: click.echo('Starting packet processing...') - while 1: + while running: something_read = False try: for port_name in port_map.keys(): @@ -194,9 +212,9 @@ def main(verbose, configfile): formatted_aprs += frame['text'] click.echo(click.style(port_name + ' << ', fg='magenta') + formatted_aprs) - for plugin in plugins: + for plugin_module in plugin_modules: something_read = True - plugin.handle_packet(frame, port, port_name) + plugin_module.handle_packet(frame, port, port_name) except Exception as ex: # We want to keep this thread alive so long as the application runs. traceback.print_exc(file=sys.stdout) diff --git a/src/apex/kiss/kiss.py b/src/apex/kiss/kiss.py index 19cafb909686f154fb8a35e14970adb9a89beae2..f4a1d729abe7f29701b845a164189236d9cb6048 100644 --- a/src/apex/kiss/kiss.py +++ b/src/apex/kiss/kiss.py @@ -9,6 +9,7 @@ from __future__ import division from __future__ import print_function import logging +import threading from abc import ABCMeta from abc import abstractmethod from six import with_metaclass @@ -41,6 +42,7 @@ class Kiss(with_metaclass(ABCMeta, object)): def __init__(self, strip_df_start=True): self.strip_df_start = strip_df_start self.exit_kiss = False + self.lock = threading.Lock() @staticmethod def __strip_df_start(frame): @@ -193,15 +195,16 @@ class Kiss(with_metaclass(ABCMeta, object)): self.frame_buffer.append(new_frame) def read(self): - if not len(self.frame_buffer): - self.fill_buffer() - - if len(self.frame_buffer): - return_frame = self.frame_buffer[0] - del self.frame_buffer[0] - return return_frame - else: - return None + with self.lock: + if not len(self.frame_buffer): + self.fill_buffer() + + if len(self.frame_buffer): + return_frame = self.frame_buffer[0] + del self.frame_buffer[0] + return return_frame + else: + return None def write(self, frame_bytes, port=0): """ @@ -209,7 +212,8 @@ class Kiss(with_metaclass(ABCMeta, object)): :param frame: Frame to write. """ - kiss_packet = [kiss_constants.FEND] + [Kiss.__command_byte_combine(port, kiss_constants.DATA_FRAME)] + \ - Kiss.__escape_special_codes(frame_bytes) + [kiss_constants.FEND] + with self.lock: + kiss_packet = [kiss_constants.FEND] + [Kiss.__command_byte_combine(port, kiss_constants.DATA_FRAME)] + \ + Kiss.__escape_special_codes(frame_bytes) + [kiss_constants.FEND] - return self._write_interface(kiss_packet) + return self._write_interface(kiss_packet) diff --git a/src/apex/plugin_loader.py b/src/apex/plugin_loader.py index 5333105166c02fc0280635486d004a2fd66798f3..be8e7e8548f8134321727ce1d5826ee89547da76 100644 --- a/src/apex/plugin_loader.py +++ b/src/apex/plugin_loader.py @@ -4,7 +4,6 @@ from __future__ import division from __future__ import print_function import importlib -import os __author__ = 'Jeffrey Phillips Freeman (WI2ARD)' __maintainer__ = 'Jeffrey Phillips Freeman (WI2ARD)' @@ -16,14 +15,14 @@ __credits__ = [] PluginFolder = './plugins' MainModule = '__init__' -plugins = ['apex.plugins.apexparadigm', +plugin_modules = ['apex.plugins.apexparadigm', 'apex.plugins.beacon', 'apex.plugins.id', 'apex.plugins.status'] def get_plugins(): - return plugins + return plugin_modules def load_plugin(plugin): diff --git a/src/apex/plugins/apexparadigm/__init__.py b/src/apex/plugins/apexparadigm/__init__.py index 651130dcffd06de8e450e9a64034835218e9459a..e7481f322dc725e584bc95a471ada0e72c4aecfe 100644 --- a/src/apex/plugins/apexparadigm/__init__.py +++ b/src/apex/plugins/apexparadigm/__init__.py @@ -30,6 +30,10 @@ def handle_packet(frame, recv_port, recv_port_name): plugin.handle_packet(frame, recv_port, recv_port_name) +def stop(): + plugin.stop() + + class ApexParadigmPlugin(object): BAND_PATH_REGEX = re.compile(r'(\d{1,4})M(\d{0,3})') @@ -273,6 +277,9 @@ class ApexParadigmPlugin(object): click.echo(selected_hop['port_name'] + ' >> ' + apex.aprs.util.format_aprs_frame(frame)) return + def stop(self): + return + def run(self): return diff --git a/src/apex/plugins/beacon/__init__.py b/src/apex/plugins/beacon/__init__.py index ae1384c53bd43877a5ac09b0e9f8d82f41e3bd02..885e58a1c2627caba23ddb09fb02e2944aced6ce 100644 --- a/src/apex/plugins/beacon/__init__.py +++ b/src/apex/plugins/beacon/__init__.py @@ -29,12 +29,17 @@ def handle_packet(frame, recv_port, recv_port_name): return +def stop(): + plugin.stop() + + class BeaconPlugin(object): def __init__(self, config, port_map, packet_cache, aprsis): self.port_map = port_map self.packet_cache = packet_cache self.aprsis = aprsis + self.running = False for section in config.sections(): if section.startswith('TNC '): @@ -46,16 +51,31 @@ class BeaconPlugin(object): port['beacon_text'] = config.get(port_section, 'beacon_text') port['beacon_path'] = config.get(port_section, 'beacon_path') + def stop(self): + self.running = False + def run(self): - while 1: - for port_name in self.port_map.keys(): - port = self.port_map[port_name] - - beacon_frame = {'source': port['identifier'], 'destination': 'APRS', - 'path': port['beacon_path'].split(','), 'text': port['beacon_text']} - frame_hash = apex.aprs.util.hash_frame(beacon_frame) - if frame_hash not in self.packet_cache.values(): - self.packet_cache[str(frame_hash)] = frame_hash - port['tnc'].write(beacon_frame, port['tnc_port']) - click.echo(port_name + ' >> ' + apex.aprs.util.format_aprs_frame(beacon_frame)) - time.sleep(600) + self.running = True + + # Don't do anything in the first 30 seconds + last_trigger = time.time() + while self.running and time.time() - last_trigger < 30: + time.sleep(1) + + # run every 600 second + last_trigger = time.time() + while self.running: + if time.time() - last_trigger >= 600: + last_trigger = time.time() + for port_name in self.port_map.keys(): + port = self.port_map[port_name] + + beacon_frame = {'source': port['identifier'], 'destination': 'APRS', + 'path': port['beacon_path'].split(','), 'text': port['beacon_text']} + frame_hash = apex.aprs.util.hash_frame(beacon_frame) + if frame_hash not in self.packet_cache.values(): + self.packet_cache[str(frame_hash)] = frame_hash + port['tnc'].write(beacon_frame, port['tnc_port']) + click.echo(port_name + ' >> ' + apex.aprs.util.format_aprs_frame(beacon_frame)) + else: + time.sleep(1) diff --git a/src/apex/plugins/id/__init__.py b/src/apex/plugins/id/__init__.py index 1e6364e93367e87b06c10f0028d3826229dc010e..931227bc13be93043949b07ebfb71201da4243fa 100644 --- a/src/apex/plugins/id/__init__.py +++ b/src/apex/plugins/id/__init__.py @@ -29,12 +29,17 @@ def handle_packet(frame, recv_port, recv_port_name): return +def stop(): + plugin.stop() + + class IdPlugin(object): def __init__(self, config, port_map, packet_cache, aprsis): self.port_map = port_map self.packet_cache = packet_cache self.aprsis = aprsis + self.running = False for section in config.sections(): if section.startswith('TNC '): @@ -46,17 +51,31 @@ class IdPlugin(object): port['id_text'] = config.get(port_section, 'id_text') port['id_path'] = config.get(port_section, 'id_path') + def stop(self): + self.running = False + def run(self): - time.sleep(30) - while 1: - for port_name in self.port_map.keys(): - port = self.port_map[port_name] - - id_frame = {'source': port['identifier'], 'destination': 'ID', 'path': port['id_path'].split(','), - 'text': port['id_text']} - frame_hash = apex.aprs.util.hash_frame(id_frame) - if frame_hash not in self.packet_cache.values(): - self.packet_cache[str(frame_hash)] = frame_hash - port['tnc'].write(id_frame, port['tnc_port']) - click.echo(port_name + ' >> ' + apex.aprs.util.format_aprs_frame(id_frame)) - time.sleep(600) + self.running = True + + # Don't do anything in the first 30 seconds + last_trigger = time.time() + while self.running and time.time() - last_trigger < 30: + time.sleep(1) + + # run every 600 second + last_trigger = time.time() + while self.running: + if time.time() - last_trigger >= 600: + last_trigger = time.time() + for port_name in self.port_map.keys(): + port = self.port_map[port_name] + + id_frame = {'source': port['identifier'], 'destination': 'ID', 'path': port['id_path'].split(','), + 'text': port['id_text']} + frame_hash = apex.aprs.util.hash_frame(id_frame) + if frame_hash not in self.packet_cache.values(): + self.packet_cache[str(frame_hash)] = frame_hash + port['tnc'].write(id_frame, port['tnc_port']) + click.echo(port_name + ' >> ' + apex.aprs.util.format_aprs_frame(id_frame)) + else: + time.sleep(1) diff --git a/src/apex/plugins/status/__init__.py b/src/apex/plugins/status/__init__.py index eae2df712a5d06e57044e4eed5cb5daca3c491f3..bcfb1dd3b554ea5fcc9f359ac2bce24f84544cd9 100644 --- a/src/apex/plugins/status/__init__.py +++ b/src/apex/plugins/status/__init__.py @@ -28,12 +28,17 @@ def handle_packet(frame, recv_port, recv_port_name): return +def stop(): + plugin.stop() + + class StatusPlugin(object): def __init__(self, config, port_map, packet_cache, aprsis): self.port_map = port_map self.packet_cache = packet_cache self.aprsis = aprsis + self.running = False for section in config.sections(): if section.startswith('TNC '): @@ -45,20 +50,34 @@ class StatusPlugin(object): port['status_text'] = config.get(port_section, 'status_text') port['status_path'] = config.get(port_section, 'status_path') + def stop(self): + self.running = False + def run(self): - time.sleep(60) - while 1: - for port_name in self.port_map.keys(): - port = self.port_map[port_name] - - status_frame = { - 'source': port['identifier'], - 'destination': 'APRS', - 'path': port['status_path'].split(','), - 'text': port['status_text']} - frame_hash = apex.aprs.util.hash_frame(status_frame) - if frame_hash not in self.packet_cache.values(): - self.packet_cache[str(frame_hash)] = frame_hash - port['tnc'].write(status_frame, port['tnc_port']) - print(port_name + ' >> ' + apex.aprs.util.format_aprs_frame(status_frame)) - time.sleep(600) + self.running = True + + # Don't do anything in the first 60 seconds + last_trigger = time.time() + while self.running and time.time() - last_trigger < 60: + time.sleep(1) + + # run the id every 600 seconds + last_trigger = time.time() + while self.running: + if time.time() - last_trigger >= 600: + last_trigger = time.time() + for port_name in self.port_map.keys(): + port = self.port_map[port_name] + + status_frame = { + 'source': port['identifier'], + 'destination': 'APRS', + 'path': port['status_path'].split(','), + 'text': port['status_text']} + frame_hash = apex.aprs.util.hash_frame(status_frame) + if frame_hash not in self.packet_cache.values(): + self.packet_cache[str(frame_hash)] = frame_hash + port['tnc'].write(status_frame, port['tnc_port']) + print(port_name + ' >> ' + apex.aprs.util.format_aprs_frame(status_frame)) + else: + time.sleep(1)