diff --git a/.gitignore b/.gitignore index a74475a34668d830688fb6e2e562732d5c8cdb34..bdf05e0b7b3903ccf379f63376ef14fb81e09ff0 100644 --- a/.gitignore +++ b/.gitignore @@ -62,3 +62,7 @@ docs/_build .bootstrap .appveyor.token *.bak +**/__pycache__ + +apex.cfg + diff --git a/CONTRIBUTORS.rst b/CONTRIBUTORS.rst new file mode 100644 index 0000000000000000000000000000000000000000..bc388c1b9df339bb015f6a498f589b9b50fdc388 --- /dev/null +++ b/CONTRIBUTORS.rst @@ -0,0 +1,11 @@ +* Jeffrey Phillips Freeman WI2ARD - http://JeffreyFreeman.me +* Martin Murray KD8LVZ +* Paul McMillan - https://github.com/PaulMcMillan +* Russ Innes +* John Hogenmiller KB3DFZ - https://github.com/ytjohn +* Phil Gagnon N1HHG +* Ben Benesh - https://github.com/bbene +* Joe Goforth +* Rick Eason +* Jay Nugent +* Pete Loveall AE5PL diff --git a/LICENSE b/LICENSE index 27a8126117aa5ca5f44cb1501bf55590166bab2c..269928377d82556e30c10b002ad0f7a933cf1547 100644 --- a/LICENSE +++ b/LICENSE @@ -1,19 +1,13 @@ -Copyright (c) 2016, Jeffrey Phillips Freeman -All rights reserved. +Copyright 2016 Syncleus, Inc. -Redistribution and use in source and binary forms, with or without modification, are permitted provided that the -following conditions are met: +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at -1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -disclaimer. + http://www.apache.org/licenses/LICENSE-2.0 -2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following -disclaimer in the documentation and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF -THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/README.rst b/README.rst index 010d622c47c3b873fad6563a6c984d3608c60061..17e35e4868832aa9f96c07d0f4ea648d271b435e 100644 --- a/README.rst +++ b/README.rst @@ -75,7 +75,30 @@ Overview .. end-badges -APEX reference implementation +APEX is a next generation APRS based protocol. This repository represents the reference implementation and is a full features application for digipeating across multiple AX.25 KISS TNC devices using the full APEX stack. + +For more information on the project please check out [the project's home page](http://apexprotocol.com/). + +## Running the app + +Right now the setup.py has a few bugs in it. So you can either try to fix it, wait for us to fix it, or simply install +the prerequsites manually. The following is a list of the preequsites that need to be installed. + + pynmea2 >= 1.4.2 + pyserial >= 2.7 + requests >= 2.7.0 + cachetools >= 1.1.5 + +The application is written for python 3 specifically, it may not work with python 2. Once installed copy the +apex.cfg.example file over to apex.cfg in the same directory, then edit the file and replace it with your details. Next +just run the application with the following command. + + python ./apex.py + +There isnt much to the application right now, so thats all you should need to run it. Digipeating will occur +automatically and respond to the WIDEN-n paradigm as well as your own callsign. Cross-band repeating is enabled right +now but only by specifying the call sign directly. The application is still pre-release so more features and +configuration options should be added soon. * Free software: BSD license diff --git a/apex.cfg.example b/apex.cfg.example new file mode 100644 index 0000000000000000000000000000000000000000..ad0aecedc167d3c06e9ad58ca05857e6cfe52ff9 --- /dev/null +++ b/apex.cfg.example @@ -0,0 +1,46 @@ +[TNC KENWOOD] +com_port=/dev/ttyUSB1 +baud=9600 +parity=none +stop_bits=1 +byte_size=8 +port_count=1 +kiss_init=MODE_INIT_KENWOOD_D710 + +[TNC RPR] +com_port=/dev/ttyUSB0 +baud=38400 +parity=none +stop_bits=1 +byte_Size=8 +port_count=1 +kiss_init=MODE_INIT_W8DED + +[PORT KENWOOD-1] +identifier=WI2ARD-1 +net=2M1 +tnc_port=0 +beacon_path=WIDE1-1,WIDE2-2 +status_path=WIDE1-1,WIDE2-2 +beacon_text=!/:=i@;N.G& --PHG5790/G/D R-I-R H24 C30 +status_text=>Listening on 146.52Mhz http://JeffreyFreeman.me +id_text=WI2ARD/30M1 GATE/2M1 WI2ARD-1/2M1 WIDEN-n IGATE +id_path=WIDE1-1,WIDE2-2 + +[PORT RPR-1] +identifier=WI2ARD +net=30M1 +tnc_port=0 +beacon_path=WIDE1-1 +status_path=WIDE1-1 +beacon_text=!/:=i@;N.G& --PHG5210/G/D R-I-R H24 C1 +status_text=>Robust Packet Radio http://JeffreyFreeman.me +id_text=WI2ARD/30M1 GATE/2M1 WI2ARD-1/2M1 WIDEN-n IGATE +id_path=WIDE1-1 + +[APRS-IS] +callsign=WI2ARD +password=12345 +server=noam.aprs2.net +server_port=14580 + diff --git a/setup.py b/setup.py index 270bb8a1c38e7969802f9058af0392fd3a185b22..02de1063e49f3c9b9ccd997d1ea8ce9164494628 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ setup( re.compile('^.. start-badges.*^.. end-badges', re.M | re.S).sub('', read('README.rst')), re.sub(':[a-z]+:`~?(.*?)`', r'``\1``', read('CHANGELOG.rst')) ), - author='Jeffrey Phillips Freeman', + author='Jeffrey Phillips Freeman (WI2ARD)', author_email='freemo@gmail.com', url='https://github.com/syncleus/apex', packages=find_packages('src'), @@ -66,6 +66,10 @@ setup( ], install_requires=[ 'click', + 'pynmea2 >= 1.4.2', + 'pyserial >= 2.7', + 'requests >= 2.7.0', + 'cachetools >= 1.1.5' ], extras_require={ # eg: diff --git a/src/apex/apex.py b/src/apex/apex.py new file mode 100755 index 0000000000000000000000000000000000000000..0f98920c4462490342e84a7c7bb5a5616237b0ce --- /dev/null +++ b/src/apex/apex.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""This is the entry point for the application, just a sandbox right now.""" +import aprs.aprs_kiss + +__author__ = 'Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com>' +__license__ = 'Apache License, Version 2.0' +__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' + +import time +import signal +import sys +import kiss.constants +import aprs +import aprs.util +import threading +import configparser +import cachetools +import traceback +import pluginloader + +port_map = {} +config = configparser.ConfigParser() +config.read('apex.cfg') +for section in config.sections(): + if section.startswith("TNC "): + tnc_name = section.split(" ")[1] + if config.has_option(section, 'com_port') and config.has_option(section, 'baud'): + com_port = config.get(section, 'com_port') + baud = config.get(section, 'baud') + kiss_tnc = aprs.AprsKiss(com_port=com_port, baud=baud) + elif config.has_option(section, 'tcp_host') and config.has_option(section, 'tcp_port'): + tcp_host = config.get(section, 'tcp_host') + tcp_port = config.get(section, 'tcp_port') + kiss_tnc = aprs.AprsKiss(host=tcp_host, tcp_port=tcp_port) + else: + raise Exception("Must have either both com_port and baud set or tcp_host and tcp_port set in configuration file") + kiss_init_string = config.get(section,'kiss_init') + if kiss_init_string == 'MODE_INIT_W8DED': + kiss_tnc.start(kiss.constants.MODE_INIT_W8DED) + elif kiss_init_string == 'MODE_INIT_KENWOOD_D710': + kiss_tnc.start(kiss.constants.MODE_INIT_KENWOOD_D710) + elif kiss_init_string == 'NONE': + kiss_tnc.start() + else: + raise Exception("KISS init mode not specified") + for port in range(1, 1+int(config.get(section, 'port_count'))): + port_name = tnc_name + '-' + str(port) + port_section = 'PORT ' + port_name + port_identifier = config.get(port_section, 'identifier') + port_net = config.get(port_section, 'net') + tnc_port = int(config.get(port_section, 'tnc_port')) + port_map[port_name] = {'identifier':port_identifier, 'net':port_net, 'tnc':kiss_tnc, 'tnc_port':tnc_port} +aprsis_callsign = config.get('APRS-IS', 'callsign') +if config.has_option('APRS-IS', 'password'): + aprsis_password = config.get('APRS-IS', 'password') +else: + aprsis_password = -1 +aprsis_server = config.get('APRS-IS', 'server') +aprsis_server_port = config.get('APRS-IS', 'server_port') +aprsis = aprs.AprsInternetService(aprsis_callsign, aprsis_password) +aprsis.connect(aprsis_server, int(aprsis_server_port)) +packet_cache = cachetools.TTLCache(10000, 5) + +def sigint_handler(signal, frame): + for port in port_map.values(): + port['tnc'].close() + sys.exit(0) + +signal.signal(signal.SIGINT, sigint_handler) + +print("Press ctrl + c at any time to exit") + +#start the plugins +plugins = [] +plugin_loaders=pluginloader.getPlugins() +for plugin_loader in plugin_loaders: + loaded_plugin=pluginloader.loadPlugin(plugin_loader) + plugins.append(loaded_plugin) + threading.Thread(target=loaded_plugin.start, args=(config, port_map, packet_cache, aprsis)).start() + +while 1: + something_read = False + try: + for port_name in port_map.keys(): + port = port_map[port_name] + frame = port['tnc'].read() + if frame: + formatted_aprs = aprs.util.format_aprs_frame(frame) + print(port_name + " << " + formatted_aprs) + for plugin in plugins: + something_read = True + plugin.handle_packet(frame, port, port_name) + except Exception as ex: + # We want to keep this thread alive so long as the application runs. + traceback.print_exc(file=sys.stdout) + print("caught exception while reading packet: " + str(ex)) + + if something_read is False: + time.sleep(1) diff --git a/src/apex/aprs/__init__.py b/src/apex/aprs/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..d66c8e5567197625fc4a3325a0b9474ddd00df30 --- /dev/null +++ b/src/apex/aprs/__init__.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# APRS Python Module. + +""" +APRS Python Module. +~~~~ + + +:author: Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com> +:copyright: Copyright 2016 Syncleus, Inc. and contributors +:license: Apache License, Version 2.0 +:source: <https://github.com/syncleus/apex> + +""" + +import logging +from aprs.aprs_kiss import AprsKiss +from aprs.aprs_internet_service import AprsInternetService + +# Set default logging handler to avoid "No handler found" warnings. +try: # Python 2.7+ + from logging import NullHandler +except ImportError: + class NullHandler(logging.Handler): + """Default logging handler to avoid "No handler found" warnings.""" + def emit(self, record): + """Default logging handler to avoid "No handler found" warnings.""" + pass + +logging.getLogger(__name__).addHandler(NullHandler()) diff --git a/src/apex/aprs/aprs_internet_service.py b/src/apex/aprs/aprs_internet_service.py new file mode 100644 index 0000000000000000000000000000000000000000..ef90cf42c082c79303a016b63720b8f201b54b08 --- /dev/null +++ b/src/apex/aprs/aprs_internet_service.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""APRS Internet Service Class Definitions""" + +__author__ = 'Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com>' +__license__ = 'Apache License, Version 2.0' +__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' + +import logging +import socket +import requests +import aprs.constants +import time + + +class AprsInternetService(object): + + """APRS Object.""" + + logger = logging.getLogger(__name__) + logger.setLevel(aprs.constants.LOG_LEVEL) + console_handler = logging.StreamHandler() + console_handler.setLevel(aprs.constants.LOG_LEVEL) + console_handler.setFormatter(aprs.constants.LOG_FORMAT) + logger.addHandler(console_handler) + logger.propagate = False + + def __init__(self, user, password='-1', input_url=None): + self.user = user + self._url = input_url or aprs.constants.APRSIS_URL + self._auth = ' '.join( + ['user', user, 'pass', password, 'vers', 'APRS Python Module']) + self.aprsis_sock = None + + def connect(self, server=None, port=None, aprs_filter=None): + """ + Connects & logs in to APRS-IS. + + :param server: Optional alternative APRS-IS server. + :param port: Optional APRS-IS port. + :param filter: Optional filter to use. + :type server: str + :type port: int + :type filte: str + """ + server = server or aprs.constants.APRSIS_SERVER + port = port or aprs.constants.APRSIS_FILTER_PORT + aprs_filter = aprs_filter or '/'.join(['p', self.user]) + + self.full_auth = ' '.join([self._auth, 'filter', aprs_filter]) + + self.server = server + self.port = port + self.aprsis_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.aprsis_sock.connect((server, port)) + self.logger.info('Connected to server=%s port=%s', server, port) + self.logger.debug('Sending full_auth=%s', self.full_auth) + self.aprsis_sock.sendall((self.full_auth + '\n\r').encode('ascii')) + + def send(self, frame, headers=None, protocol='TCP'): + """ + Sends message to APRS-IS. + + :param message: Message to send to APRS-IS. + :param headers: Optional HTTP headers to post. + :param protocol: Protocol to use: One of TCP, HTTP or UDP. + :type message: str + :type headers: dict + + :return: True on success, False otherwise. + :rtype: bool + """ + self.logger.debug( + 'message=%s headers=%s protocol=%s', str(frame), headers, protocol) + + if 'TCP' in protocol: + self.logger.debug('sending message=%s', str(frame)) + # message = frame['source'].encode('ascii') + b">" + frame['destination'].encode('ascii') + aprs.util.format_path(frame['path']).encode('ascii') + b":" + frame['text'] + b'\n\r' + # TODO: simplify this + message = bytearray() + for frame_chr in aprs.util.format_aprs_frame(frame): + message.append(ord(frame_chr)) + message_sent = False + while not message_sent: + try: + self.aprsis_sock.sendall(message) + message_sent = True + except (ConnectionResetError, BrokenPipeError) as ex: + #connection reset wait a second then try again + time.sleep(1) + self.aprsis_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.aprsis_sock.connect((self.server, self.port)) + self.aprsis_sock.sendall((self.full_auth + '\n\r').encode('ascii')) + return True + elif 'HTTP' in protocol: + content = "\n".join([self._auth, aprs.util.format_aprs_frame(frame)]) + headers = headers or aprs.constants.APRSIS_HTTP_HEADERS + result = requests.post(self._url, data=content, headers=headers) + return 204 in result.status_code + elif 'UDP' in protocol: + content = "\n".join([self._auth, aprs.util.format_aprs_frame(frame)]) + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.sendto( + content, + (aprs.constants.APRSIS_SERVER, aprs.constants.APRSIS_RX_PORT) + ) + return True + + def receive(self, callback=None): + """ + Receives from APRS-IS. + + :param callback: Optional callback to deliver data to. + :type callback: func + """ + recvd_data = '' + + try: + while 1: + recv_data = self.aprsis_sock.recv(aprs.constants.RECV_BUFFER) + + if not recv_data: + break + + recvd_data += recv_data + + self.logger.debug('recv_data=%s', recv_data.strip()) + + if recvd_data.endswith('\r\n'): + lines = recvd_data.strip().split('\r\n') + recvd_data = '' + else: + lines = recvd_data.split('\r\n') + recvd_data = str(lines.pop(-1)) + + for line in lines: + if line.startswith('#'): + if 'logresp' in line: + self.logger.debug('logresp=%s', line) + else: + self.logger.debug('line=%s', line) + if callback: + callback(line) + + except socket.error as sock_err: + self.logger.error(sock_err) + raise \ No newline at end of file diff --git a/src/apex/aprs/aprs_kiss.py b/src/apex/aprs/aprs_kiss.py new file mode 100644 index 0000000000000000000000000000000000000000..446573571b841de7b58129f8c5dcbb7d179939f3 --- /dev/null +++ b/src/apex/aprs/aprs_kiss.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""APRS KISS Class Definitions""" + +__author__ = 'Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com>' +__license__ = 'Apache License, Version 2.0' +__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' + +import logging +import math + +import kiss + + +class AprsKiss(kiss.Kiss): + + """APRS interface for KISS serial devices.""" + + @staticmethod + def __decode_frame(raw_frame): + """ + Decodes a KISS-encoded APRS frame. + + :param raw_frame: KISS-encoded frame to decode. + :type raw_frame: str + + :return: APRS frame-as-dict. + :rtype: dict + """ + logging.debug('raw_frame=%s', raw_frame) + frame = {} + frame_len = len(raw_frame) + + if frame_len > 16: + for raw_slice in range(0, frame_len): + # Is address field length correct? + if raw_frame[raw_slice] & 0x01 and ((raw_slice + 1) % 7) == 0: + i = (raw_slice + 1) / 7 + # Less than 2 callsigns? + if 1 < i < 11: + if (raw_frame[raw_slice + 1] & 0x03 == 0x03 and raw_frame[raw_slice + 2] in [0xf0, 0xcf]): + frame['text'] = raw_frame[raw_slice + 3:] + frame['destination'] = AprsKiss.__identity_as_string(AprsKiss.__extract_callsign(raw_frame)) + frame['source'] = AprsKiss.__identity_as_string(AprsKiss.__extract_callsign(raw_frame[7:])) + frame['path'] = AprsKiss.__extract_path(math.floor(i), raw_frame) + return frame + + logging.debug('frame=%s', frame) + return frame + + @staticmethod + def __extract_path(start, raw_frame): + """Extracts path from raw APRS KISS frame. + + :param start: + :param raw_frame: Raw APRS frame from a KISS device. + + :return: Full path from APRS frame. + :rtype: list + """ + full_path = [] + + for i in range(2, start): + path = AprsKiss.__identity_as_string(AprsKiss.__extract_callsign(raw_frame[i * 7:])) + if path: + if raw_frame[i * 7 + 6] & 0x80: + full_path.append(''.join([path, '*'])) + else: + full_path.append(path) + + return full_path + + @staticmethod + def __extract_callsign(raw_frame): + """ + Extracts callsign from a raw KISS frame. + + :param raw_frame: Raw KISS Frame to decode. + :returns: Dict of callsign and ssid. + :rtype: dict + """ + callsign = ''.join([chr(x >> 1) for x in raw_frame[:6]]).strip() + ssid = ((raw_frame[6]) >> 1) & 0x0f + return {'callsign': callsign, 'ssid': ssid} + + @staticmethod + def __identity_as_string(identity): + """ + Returns a fully-formatted callsign (Callsign + SSID). + + :param identity: Callsign Dictionary {'callsign': '', 'ssid': n} + :type callsign: dict + :returns: Callsign[-SSID]. + :rtype: str + """ + if identity['ssid'] > 0: + return '-'.join([identity['callsign'], str(identity['ssid'])]) + else: + return identity['callsign'] + + @staticmethod + def __encode_frame(frame): + """ + Encodes an APRS frame-as-dict as a KISS frame. + + :param frame: APRS frame-as-dict to encode. + :type frame: dict + + :return: KISS-encoded APRS frame. + :rtype: list + """ + enc_frame = AprsKiss.__encode_callsign(AprsKiss.__parse_identity_string(frame['destination'])) + AprsKiss.__encode_callsign(AprsKiss.__parse_identity_string(frame['source'])) + for p in frame['path']: + enc_frame += AprsKiss.__encode_callsign(AprsKiss.__parse_identity_string(p)) + + return enc_frame[:-1] + [enc_frame[-1] | 0x01] + [kiss.constants.SLOT_TIME] + [0xf0] + frame['text'] + + @staticmethod + def __encode_callsign(callsign): + """ + Encodes a callsign-dict within a KISS frame. + + :param callsign: Callsign-dict to encode. + :type callsign: dict + + :return: KISS-encoded callsign. + :rtype: list + """ + call_sign = callsign['callsign'] + + enc_ssid = (callsign['ssid'] << 1) | 0x60 + + if '*' in call_sign: + call_sign = call_sign.replace('*', '') + enc_ssid |= 0x80 + + while len(call_sign) < 6: + call_sign = ''.join([call_sign, ' ']) + + encoded = [] + for p in call_sign: + encoded += [ord(p) << 1] + return encoded + [enc_ssid] + + @staticmethod + def __parse_identity_string(identity_string): + """ + Creates callsign-as-dict from callsign-as-string. + + :param identity_string: Callsign-as-string (with or without ssid). + :type raw_callsign: str + + :return: Callsign-as-dict. + :rtype: dict + """ + # If we are parsing a spent token then first lets get rid of the astresick suffix. + if identity_string.endswith('*'): + identity_string = identity_string[:-1] + + if '-' in identity_string: + call_sign, ssid = identity_string.split('-') + else: + call_sign = identity_string + ssid = 0 + return {'callsign': call_sign, 'ssid': int(ssid)} + + def write(self, frame, port=0): + """Writes APRS-encoded frame to KISS device. + + :param frame: APRS frame to write to KISS device. + :type frame: dict + """ + encoded_frame = AprsKiss.__encode_frame(frame) + super(AprsKiss, self).write(encoded_frame, port) + + def read(self): + """Reads APRS-encoded frame from KISS device. + """ + frame = super(AprsKiss, self).read() + if frame is not None and len(frame): + return AprsKiss.__decode_frame(frame) + else: + return None \ No newline at end of file diff --git a/src/apex/aprs/constants.py b/src/apex/aprs/constants.py new file mode 100644 index 0000000000000000000000000000000000000000..8628a466b4a491b8933255338712a3f49fa3623a --- /dev/null +++ b/src/apex/aprs/constants.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Constants for APRS Module. +""" + +__author__ = 'Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com>' +__license__ = 'Apache License, Version 2.0' +__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' + + +import logging + + +APRSIS_URL = 'http://srvr.aprs-is.net:8080' +APRSIS_HTTP_HEADERS = { + 'content-type': 'application/octet-stream', + 'accept': 'text/plain' +} +APRSIS_SERVER = 'rotate.aprs.net' +APRSIS_FILTER_PORT = 14580 +APRSIS_RX_PORT = 8080 + +RECV_BUFFER = 1024 + + +LOG_LEVEL = logging.INFO +LOG_FORMAT = logging.Formatter( + ('%(asctime)s %(levelname)s %(name)s.%(funcName)s:%(lineno)d - ' + '%(message)s')) + +GPS_WARM_UP = 5 diff --git a/src/apex/aprs/decimaldegrees.py b/src/apex/aprs/decimaldegrees.py new file mode 100644 index 0000000000000000000000000000000000000000..de7b1de3374c952a6890ac4c95b8b153a846e0c8 --- /dev/null +++ b/src/apex/aprs/decimaldegrees.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +PyDecimalDegrees - geographic coordinates conversion utility. + +Copyright (C) 2006-2013 by Mateusz Åoskot <mateusz@loskot.net> +Copyright (C) 2010-2013 by Evan Wheeler <ewheeler@unicef.org> + +This file is part of PyDecimalDegrees module. + +This software is provided 'as-is', without any express or implied warranty. +In no event will the authors be held liable for any damages arising from +the use of this software. + +Permission is granted to anyone to use this software for any purpose, +including commercial applications, and to alter it and redistribute it freely, +subject to the following restrictions: +1. The origin of this software must not be misrepresented; you must not + claim that you wrote the original software. If you use this software + in a product, an acknowledgment in the product documentation would be + appreciated but is not required. +2. Altered source versions must be plainly marked as such, and must not be + misrepresented as being the original software. +3. This notice may not be removed or altered from any source distribution. + +DESCRIPTION + +DecimalDegrees module provides functions to convert between +degrees/minutes/seconds and decimal degrees. + +Original source distribution: +http://mateusz.loskot.net/software/gis/pydecimaldegrees/ + +Inspired by Walter Mankowski's Geo::Coordinates::DecimalDegrees module +for Perl, originally located in CPAN Archives: +http://search.cpan.org/~waltman/Geo-Coordinates-DecimalDegrees-0.05/ + +doctest examples are based following coordinates: +DMS: 121 8' 6" +DM: 121 8.1' +DD: 121.135 + +To run doctest units just execut this module script as follows +(-v instructs Python to run script in verbose mode): + +$ python decimaldegrees.py [-v] + +""" + + +__revision__ = '$Revision: 1.1 $' + + +import decimal as libdecimal + +from decimal import Decimal as D + + +def decimal2dms(decimal_degrees): + """ Converts a floating point number of degrees to the equivalent + number of degrees, minutes, and seconds, which are returned + as a 3-element tuple of decimals. If 'decimal_degrees' is negative, + only degrees (1st element of returned tuple) will be negative, + minutes (2nd element) and seconds (3rd element) will always be positive. + + Example: + + >>> decimal2dms(121.135) + (Decimal('121'), Decimal('8'), Decimal('6.000')) + >>> decimal2dms(-121.135) + (Decimal('-121'), Decimal('8'), Decimal('6.000')) + + """ + + degrees = D(int(decimal_degrees)) + decimal_minutes = libdecimal.getcontext().multiply( + (D(str(decimal_degrees)) - degrees).copy_abs(), D(60)) + minutes = D(int(decimal_minutes)) + seconds = libdecimal.getcontext().multiply( + (decimal_minutes - minutes), D(60)) + return (degrees, minutes, seconds) + + +def decimal2dm(decimal_degrees): + """ + Converts a floating point number of degrees to the degress & minutes. + + Returns a 2-element tuple of decimals. + + If 'decimal_degrees' is negative, only degrees (1st element of returned + tuple) will be negative, minutes (2nd element) will always be positive. + + Example: + + >>> decimal2dm(121.135) + (Decimal('121'), Decimal('8.100')) + >>> decimal2dm(-121.135) + (Decimal('-121'), Decimal('8.100')) + + """ + degrees = D(int(decimal_degrees)) + + minutes = libdecimal.getcontext().multiply( + (D(str(decimal_degrees)) - degrees).copy_abs(), D(60)) + + return (degrees, minutes) + + +def dms2decimal(degrees, minutes, seconds): + """ Converts degrees, minutes, and seconds to the equivalent + number of decimal degrees. If parameter 'degrees' is negative, + then returned decimal-degrees will also be negative. + + NOTE: this method returns a decimal.Decimal + + Example: + + >>> dms2decimal(121, 8, 6) + Decimal('121.135') + >>> dms2decimal(-121, 8, 6) + Decimal('-121.135') + + """ + decimal = D(0) + degs = D(str(degrees)) + mins = libdecimal.getcontext().divide(D(str(minutes)), D(60)) + secs = libdecimal.getcontext().divide(D(str(seconds)), D(3600)) + + if degrees >= D(0): + decimal = degs + mins + secs + else: + decimal = degs - mins - secs + + return libdecimal.getcontext().normalize(decimal) + + +def dm2decimal(degrees, minutes): + """ Converts degrees and minutes to the equivalent number of decimal + degrees. If parameter 'degrees' is negative, then returned decimal-degrees + will also be negative. + + Example: + + >>> dm2decimal(121, 8.1) + Decimal('121.135') + >>> dm2decimal(-121, 8.1) + Decimal('-121.135') + + """ + return dms2decimal(degrees, minutes, 0) + + +def run_doctest(): # pragma: no cover + """Runs doctests for this module.""" + import doctest + import decimaldegrees # pylint: disable=W0406 + return doctest.testmod(decimaldegrees) + + +if __name__ == '__main__': + run_doctest() # pragma: no cover diff --git a/src/apex/aprs/util.py b/src/apex/aprs/util.py new file mode 100755 index 0000000000000000000000000000000000000000..2585b24d65916a2e497f8339a46e551c5c4ef233 --- /dev/null +++ b/src/apex/aprs/util.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""Utilities for the APRS Python Module.""" + +__author__ = 'Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com>' +__license__ = 'Apache License, Version 2.0' +__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' + + +import logging + +import aprs.constants +import aprs.decimaldegrees +import kiss.constants +import math + + +def dec2dm_lat(dec): + """Converts DecDeg to APRS Coord format. + See: http://ember2ash.com/lat.htm + + Source: http://stackoverflow.com/questions/2056750 + + Example: + >>> test_lat = 37.7418096 + >>> aprs_lat = dec2dm_lat(test_lat) + >>> aprs_lat + '3744.51N' + """ + dec_min = aprs.decimaldegrees.decimal2dm(dec) + + deg = dec_min[0] + abs_deg = abs(deg) + + if not deg == abs_deg: + suffix = 'S' + else: + suffix = 'N' + + return ''.join([str(abs_deg), "%.2f" % dec_min[1], suffix]) + + +def dec2dm_lng(dec): + """Converts DecDeg to APRS Coord format. + See: http://ember2ash.com/lat.htm + + Example: + >>> test_lng = -122.38833 + >>> aprs_lng = dec2dm_lng(test_lng) + >>> aprs_lng + '12223.30W' + """ + dec_min = aprs.decimaldegrees.decimal2dm(dec) + + deg = dec_min[0] + abs_deg = abs(deg) + + if not deg == abs_deg: + suffix = 'W' + else: + suffix = 'E' + + return ''.join([str(abs_deg), "%.2f" % dec_min[1], suffix]) + + +def decode_aprs_ascii_frame(ascii_frame): + """ + Breaks an ASCII APRS Frame down to it's constituent parts. + + :param frame: ASCII APRS Frame. + :type frame: str + + :returns: Dictionary of APRS Frame parts: source, destination, path, text. + :rtype: dict + """ + logging.debug('frame=%s', ascii_frame) + decoded_frame = {} + frame_so_far = '' + + for char in ascii_frame: + if '>' in char and 'source' not in decoded_frame: + decoded_frame['source'] = frame_so_far + frame_so_far = '' + elif ':' in char and 'path' not in decoded_frame: + decoded_frame['path'] = frame_so_far + frame_so_far = '' + else: + frame_so_far = ''.join([frame_so_far, char]) + + decoded_frame['text'] = frame_so_far + decoded_frame['destination'] = decoded_frame['path'].split(',')[0] + + return decoded_frame + +def format_path(path_list): + """ + Formats path from raw APRS KISS frame. + + :param path_list: List of path elements. + :type path_list: list + + :return: Formatted APRS path. + :rtype: str + """ + return ','.join(path_list) + + +def format_aprs_frame(frame): + """ + Formats APRS frame-as-dict into APRS frame-as-string. + + :param frame: APRS frame-as-dict + :type frame: dict + + :return: APRS frame-as-string. + :rtype: str + """ + formatted_frame = '>'.join([frame['source'], frame['destination']]) + if frame['path']: + formatted_frame = ','.join([formatted_frame, format_path(frame['path'])]) + formatted_frame += ':' + for frame_byte in frame['text']: + formatted_frame += chr(frame_byte) + return formatted_frame + +def valid_callsign(callsign): + """ + Validates callsign. + + :param callsign: Callsign to validate. + :type callsign: str + + :returns: True if valid, False otherwise. + :rtype: bool + """ + logging.debug('callsign=%s', callsign) + + if '-' in callsign: + if not callsign.count('-') == 1: + return False + else: + callsign, ssid = callsign.split('-') + else: + ssid = 0 + + logging.debug('callsign=%s ssid=%s', callsign, ssid) + + if (len(callsign) < 2 or len(callsign) > 6 or len(str(ssid)) < 1 or + len(str(ssid)) > 2): + return False + + for char in callsign: + if not (char.isalpha() or char.isdigit()): + return False + + if not str(ssid).isdigit(): + return False + + if int(ssid) < 0 or int(ssid) > 15: + return False + + return True + + +def run_doctest(): # pragma: no cover + """Runs doctests for this module.""" + import doctest + import aprs.util # pylint: disable=W0406,W0621 + return doctest.testmod(aprs.util) + + +def hash_frame(frame): + """ + Produces an integr value that acts as a hash for the frame + :param frame: A frame packet + :type frame: dict + :return: an integer representing the hash + """ + hash = 0 + index = 0 + frame_string_prefix = frame['source'] + ">" + frame['destination'] + ":" + for frame_chr in frame_string_prefix: + hash = ord(frame_chr)<<(8*(index%4)) ^ hash + index += 1 + for byte in frame['text']: + hash = byte<<(8*(index%4)) ^ hash + index += 1 + return hash + + +if __name__ == '__main__': + run_doctest() # pragma: no cover diff --git a/src/apex/kiss/__init__.py b/src/apex/kiss/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..7e9c3964dc46fa168ab383c1df5a5eeb42853b01 --- /dev/null +++ b/src/apex/kiss/__init__.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# KISS Python Module. + +""" +KISS Python Module. +~~~~ + + +:author: Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com> +:copyright: Copyright 2016 Syncleus, Inc. and contributors +:license: Apache License, Version 2.0 +:source: <https://github.com/syncleus/apex> + +""" + +__author__ = 'Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com>' +__license__ = 'Apache License, Version 2.0' +__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' + + +import logging +from .kiss import Kiss + + +# Set default logging handler to avoid "No handler found" warnings. +try: # Python 2.7+ + from logging import NullHandler +except ImportError: + class NullHandler(logging.Handler): + """Default logging handler to avoid "No handler found" warnings.""" + def emit(self, record): + """Default logging handler to avoid "No handler found" warnings.""" + pass + +logging.getLogger(__name__).addHandler(NullHandler()) diff --git a/src/apex/kiss/constants.py b/src/apex/kiss/constants.py new file mode 100644 index 0000000000000000000000000000000000000000..aa0b6aa10c3d5756d6af8f94d4f2e168d7b8f22e --- /dev/null +++ b/src/apex/kiss/constants.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""Constants for KISS Python Module.""" + +__author__ = 'Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com>' +__license__ = 'Apache License, Version 2.0' +__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' + + +import logging + + +LOG_LEVEL = logging.DEBUG +LOG_FORMAT = ('%(asctime)s %(levelname)s %(name)s.%(funcName)s:%(lineno)d' + ' - %(message)s') + +SERIAL_TIMEOUT = 0.01 +READ_BYTES = 1000 + +# KISS Special Characters +# http://en.wikipedia.org/wiki/KISS_(TNC)#Special_Characters +FEND = 0xC0 +FESC = 0xDB +TFEND = 0xDC +TFESC = 0xDD + +# "FEND is sent as FESC, TFEND" +FESC_TFEND = [FESC] + [TFEND] + +# "FESC is sent as FESC, TFESC" +FESC_TFESC = [FESC] + [TFESC] + +# KISS Command Codes +# http://en.wikipedia.org/wiki/KISS_(TNC)#Command_Codes +DATA_FRAME = 0x00 +TX_DELAY = 0x01 +PERSISTENCE = 0x02 +SLOT_TIME = 0x03 +TX_TAIL = 0x04 +FULL_DUPLEX = 0x05 +SET_HARDWARE = 0x06 +RETURN = 0xFF + +DEFAULT_KISS_CONFIG_VALUES = { + 'TX_DELAY': 40, + 'PERSISTENCE': 63, + 'SLOT_TIME': 20, + 'TX_TAIL': 30, + 'FULL_DUPLEX': 0, + } + +#This command will exit KISS mode +MODE_END = [192, 255, 192, 13] +# This will start kiss on a W8DED or LINK>.<NORD firmware +MODE_INIT_W8DED = [13, 27, 64, 75, 13] +MODE_INIT_LINKNORD = MODE_INIT_W8DED +#This will work for any Kenwood D710 +MODE_INIT_KENWOOD_D710 = [72,66,32,49,50,48,48,13,75,73,83,83,32,79,78,13,82,69,83,84,65,82,84,13] \ No newline at end of file diff --git a/src/apex/kiss/kiss.py b/src/apex/kiss/kiss.py new file mode 100644 index 0000000000000000000000000000000000000000..e2231731f5c256c7dd629dce5b69502d9bc8f868 --- /dev/null +++ b/src/apex/kiss/kiss.py @@ -0,0 +1,263 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""KISS Core Classes.""" + +__author__ = 'Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com>' +__license__ = 'Apache License, Version 2.0' +__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' + + +import logging +import serial +import socket +import kiss.constants +import kiss.util + + +class Kiss(object): + + """KISS Object Class.""" + + logger = logging.getLogger(__name__) + logger.setLevel(kiss.constants.LOG_LEVEL) + console_handler = logging.StreamHandler() + console_handler.setLevel(kiss.constants.LOG_LEVEL) + formatter = logging.Formatter(kiss.constants.LOG_FORMAT) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + logger.propagate = False + + frame_buffer = [] + + def __init__(self, com_port=None, baud=38400, parity=serial.PARITY_NONE, stop_bits=serial.STOPBITS_ONE, byte_size=serial.EIGHTBITS, host=None, tcp_port=8000, strip_df_start=True): + self.com_port = com_port + self.baud = baud + self.parity = parity + self.stop_bits = stop_bits + self.byte_size = byte_size + self.host = host + self.tcp_port = tcp_port + self.interface = None + self.interface_mode = None + self.strip_df_start = strip_df_start + self.exit_kiss = False + + if self.com_port is not None: + self.interface_mode = 'serial' + elif self.host is not None: + self.interface_mode = 'tcp' + if self.interface_mode is None: + raise Exception('Must set port/speed or host/tcp_port.') + + self.logger.info('Using interface_mode=%s', self.interface_mode) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if 'tcp' in self.interface_mode: + self.interface.shutdown() + elif self.interface and self.interface.isOpen(): + self.interface.close() + + def __del__(self): + if self.interface and self.interface.isOpen(): + self.interface.close() + + def __read_interface(self): + if 'tcp' in self.interface_mode: + return self.interface.recv(kiss.constants.READ_BYTES) + elif 'serial' in self.interface_mode: + read_data = self.interface.read(kiss.constants.READ_BYTES) + waiting_data = self.interface.inWaiting() + if waiting_data: + read_data += self.interface.read(waiting_data) + return read_data + + @staticmethod + def __strip_df_start(frame): + """ + Strips KISS DATA_FRAME start (0x00) and newline from frame. + + :param frame: APRS/AX.25 frame. + :type frame: str + :returns: APRS/AX.25 frame sans DATA_FRAME start (0x00). + :rtype: str + """ + while frame[0] is kiss.constants.DATA_FRAME: + del frame[0] + while chr(frame[0]).isspace(): + del frame[0] + while chr(frame[-1]).isspace(): + del frame[-1] + return frame + + @staticmethod + def __escape_special_codes(raw_code_bytes): + """ + Escape special codes, per KISS spec. + + "If the FEND or FESC codes appear in the data to be transferred, they + need to be escaped. The FEND code is then sent as FESC, TFEND and the + FESC is then sent as FESC, TFESC." + - http://en.wikipedia.org/wiki/KISS_(TNC)#Description + """ + encoded_bytes = [] + for raw_code_byte in raw_code_bytes: + if raw_code_byte is kiss.constants.FESC: + encoded_bytes += kiss.constants.FESC_TFESC + elif raw_code_byte is kiss.constants.FEND: + encoded_bytes += kiss.constants.FESC_TFEND + else: + encoded_bytes += [raw_code_byte]; + return encoded_bytes + + @staticmethod + def __command_byte_combine(port, command_code): + """ + Constructs the command byte for the tnc which includes the tnc port and command code. + :param port: integer from 0 to 127 indicating the TNC port + :type port: int + :param command_code: A command code constant, a value from 0 to 127 + :type command_code: int + :return: An integer combining the two values into a single byte + """ + if port > 127 or port < 0: + raise Exception("port out of range") + elif command_code > 127 or command_code < 0: + raise Exception("command_Code out of range") + return (port<<4) & command_code + + def start(self, mode_init=None, **kwargs): + """ + Initializes the KISS device and commits configuration. + + See http://en.wikipedia.org/wiki/KISS_(TNC)#Command_codes + for configuration names. + + :param **kwargs: name/value pairs to use as initial config values. + """ + self.logger.debug("kwargs=%s", kwargs) + + if 'tcp' in self.interface_mode: + address = (self.host, self.tcp_port) + self.interface = socket.create_connection(address) + elif 'serial' in self.interface_mode: + self.interface = serial.Serial(port=self.com_port, baudrate=self.baud, parity=self.parity, stopbits=self.stop_bits, bytesize=self.byte_size) + self.interface.timeout = kiss.constants.SERIAL_TIMEOUT + if mode_init is not None: + self.interface.write(mode_init) + self.exit_kiss = True + + # Previous verious defaulted to Xastir-friendly configs. Unfortunately + # those don't work with Bluetooth TNCs, so we're reverting to None. + if 'serial' in self.interface_mode and kwargs: + for name, value in kwargs.items(): + self.write_setting(name, value) + + # If no settings specified, default to config values similar + # to those that ship with Xastir. + #if not kwargs: + # kwargs = kiss.constants.DEFAULT_KISS_CONFIG_VALUES + + def close(self): + if self.exit_kiss is True: + self.interface.write(kiss.constants.MODE_END) + + + def write_setting(self, name, value): + """ + Writes KISS Command Codes to attached device. + + http://en.wikipedia.org/wiki/KISS_(TNC)#Command_Codes + + :param name: KISS Command Code Name as a string. + :param value: KISS Command Code Value to write. + """ + self.logger.debug('Configuring %s = %s', name, repr(value)) + + # Do the reasonable thing if a user passes an int + if isinstance(value, int): + value = chr(value) + + return self.interface.write( + kiss.constants.FEND + + getattr(kiss.constants, name.upper()) + + Kiss.__escape_special_codes(value) + + kiss.constants.FEND + ) + + def fill_buffer(self): + """ + Reads any pending data in the interface and stores it in the frame_buffer + """ + + new_frames = [] + read_buffer = [] + read_data = self.__read_interface() + while read_data is not None and len(read_data): + split_data = [[]] + for read_byte in read_data: + if read_byte is kiss.constants.FEND: + #split_data_index += 1 + split_data.append([]) + else: + split_data[-1].append(read_byte) + len_fend = len(split_data) + + # No FEND in frame + if len_fend == 1: + read_buffer += split_data[0] + # Single FEND in frame + elif len_fend == 2: + # Closing FEND found + if split_data[0]: + # Partial frame continued, otherwise drop + new_frames.append(read_buffer + split_data[0]) + read_buffer = [] + # Opening FEND found + else: + new_frames.append(read_buffer) + read_buffer = split_data[1] + # At least one complete frame received + elif len_fend >= 3: + for i in range(0, len_fend - 1): + read_buffer_tmp = read_buffer + split_data[i] + if len(read_buffer_tmp) is not 0: + new_frames.append(read_buffer_tmp) + read_buffer = [] + if split_data[len_fend - 1]: + read_buffer = split_data[len_fend - 1] + # Get anymore data that is waiting + read_data = self.__read_interface() + + for new_frame in new_frames: + if len(new_frame) and new_frame[0] == 0: + if self.strip_df_start: + new_frame = Kiss.__strip_df_start(new_frame) + self.frame_buffer.append(new_frame) + + def read(self): + if not len(self.frame_buffer): + self.fill_buffer() + + if len(self.frame_buffer): + return_frame = self.frame_buffer[0] + del self.frame_buffer[0] + return return_frame + else: + return None + + def write(self, frame_bytes, port=0): + """ + Writes frame to KISS interface. + + :param frame: Frame to write. + """ + kiss_packet = [kiss.constants.FEND] + [Kiss.__command_byte_combine(port, kiss.constants.DATA_FRAME)] + Kiss.__escape_special_codes(frame_bytes) + [kiss.constants.FEND] + + if 'tcp' in self.interface_mode: + return self.interface.send(bytearray(kiss_packet)) + elif 'serial' in self.interface_mode: + return self.interface.write(kiss_packet) diff --git a/src/apex/kiss/util.py b/src/apex/kiss/util.py new file mode 100644 index 0000000000000000000000000000000000000000..a5301d27c4a57461aaa7b7bd1be90921c371f4c0 --- /dev/null +++ b/src/apex/kiss/util.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""Utilities for the KISS Python Module.""" + +__author__ = 'Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com>' +__license__ = 'Apache License, Version 2.0' +__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' + + +import kiss.constants + +def extract_ui(frame): + """ + Extracts the UI component of an individual frame. + + :param frame: APRS/AX.25 frame. + :type frame: str + :returns: UI component of frame. + :rtype: str + """ + start_ui = frame.split( + ''.join([kiss.constants.FEND, kiss.constants.DATA_FRAME])) + end_ui = start_ui[0].split(''.join([kiss.constants.SLOT_TIME, chr(0xF0)])) + return ''.join([chr(ord(x) >> 1) for x in end_ui[0]]) + + diff --git a/src/apex/pluginloader.py b/src/apex/pluginloader.py new file mode 100644 index 0000000000000000000000000000000000000000..5025862c6e23eb9811b89caddc21d83f936c5377 --- /dev/null +++ b/src/apex/pluginloader.py @@ -0,0 +1,21 @@ +import importlib +import importlib.util +import importlib.machinery +import os + +PluginFolder = "./plugins" +MainModule = "__init__" + +def getPlugins(): + plugins = [] + possibleplugins = os.listdir(PluginFolder) + for i in possibleplugins: + location = os.path.join(PluginFolder, i) + if not os.path.isdir(location) or not MainModule + ".py" in os.listdir(location): + continue + + plugins.append(i) + return plugins + +def loadPlugin(plugin): + return importlib.import_module("plugins." + plugin) \ No newline at end of file diff --git a/src/apex/plugins/apexparadigm/__init__.py b/src/apex/plugins/apexparadigm/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..57ce6728459f122c121e4670a5af7d8a8810a6a5 --- /dev/null +++ b/src/apex/plugins/apexparadigm/__init__.py @@ -0,0 +1,260 @@ +import aprs.util +import copy +import re + +plugin = None + +def start(config, port_map, packet_cache, aprsis): + global plugin + plugin = ApexParadigmPlugin(config, port_map, packet_cache, aprsis) + plugin.run() + +def handle_packet(frame, recv_port, recv_port_name): + global plugin + plugin.handle_packet(frame, recv_port, recv_port_name) + +class ApexParadigmPlugin(object): + + BAND_PATH_REGEX = re.compile(r'(\d{1,4})M(\d{0,3})') + + def __init__(self, config, port_map, packet_cache, aprsis): + self.port_map = port_map + self.packet_cache = packet_cache + self.aprsis = aprsis + + def __passive_digipeat(self, frame, recv_port, recv_port_name): + # Can't digipeat anything when you are the source + for port in self.port_map.values(): + if frame['source'] == port['identifier']: + return + + # can't digipeat things we already digipeated. + for hop in frame['path']: + if hop.startswith('WI2ARD') and hop.endswith('*'): + return + + for hop_index in range(0,len(frame['path'])): + hop = frame['path'][hop_index] + if hop[-1] is not '*': + split_hop = hop.split('-') + node = split_hop[0].upper() + if len(split_hop) >= 2 and split_hop[1]: + ssid = int(split_hop[1]) + else: + ssid = 0 + + band_path = None + band_path_net = None + band_match = self.BAND_PATH_REGEX.match(node) + if band_match is not None: + band_path = band_match.group(1) + band_path_net = band_match.group(2) + + for port_name in self.port_map.keys(): + port = self.port_map[port_name] + split_port_identifier = port['identifier'].split('-') + port_callsign = split_port_identifier[0].upper() + if len(split_port_identifier) >= 2 and split_port_identifier[1]: + port_ssid = int(split_port_identifier[1]) + else: + port_ssid = 0 + + if band_path: + if band_path_net: + if node == port['net']: + frame['path'] = frame['path'][:hop_index] + [recv_port['identifier'] + '*'] + [hop + "*"] + frame['path'][hop_index+1:] + frame_hash = aprs.util.hash_frame(frame) + if not frame_hash in self.packet_cache.values(): + self.packet_cache[str(frame_hash)] = frame_hash + port['tnc'].write(frame, port['tnc_port']) + self.aprsis.send(frame) + print(port_name + " >> " + aprs.util.format_aprs_frame(frame)) + return + else: + if port['net'].startswith(node): + frame['path'] = frame['path'][:hop_index] + [recv_port['identifier'] + '*'] + [hop + "*"] + frame['path'][hop_index+1:] + frame_hash = aprs.util.hash_frame(frame) + if not frame_hash in self.packet_cache.values(): + self.packet_cache[str(frame_hash)] = frame_hash + port['tnc'].write(frame, port['tnc_port']) + self.aprsis.send(frame) + print(port_name + " >> " + aprs.util.format_aprs_frame(frame)) + return + if node == port_callsign and ssid == port_ssid: + if ssid is 0: + frame['path'][hop_index] = port_callsign + '*' + else: + frame['path'][hop_index] = port['identifier'] + '*' + frame_hash = aprs.util.hash_frame(frame) + if not frame_hash in self.packet_cache.values(): + self.packet_cache[str(frame_hash)] = frame_hash + port['tnc'].write(frame, port['tnc_port']) + self.aprsis.send(frame) + print(port_name + " >> " + aprs.util.format_aprs_frame(frame)) + return + elif node == "GATE" and port['net'].startswith("2M"): + frame['path'] = frame['path'][:hop_index] + [recv_port['identifier'] + '*'] + [node + "*"] + frame['path'][hop_index+1:] + frame_hash = aprs.util.hash_frame(frame) + if not frame_hash in self.packet_cache.values(): + self.packet_cache[str(frame_hash)] = frame_hash + port['tnc'].write(frame, port['tnc_port']) + self.aprsis.send(frame) + print(port_name + " >> " + aprs.util.format_aprs_frame(frame)) + return + if node.startswith('WIDE') and ssid > 1: + frame['path'] = frame['path'][:hop_index] + [recv_port['identifier'] + '*'] + [node + "-" + str(ssid-1)] + frame['path'][hop_index+1:] + frame_hash = aprs.util.hash_frame(frame) + if not frame_hash in self.packet_cache.values(): + self.packet_cache[str(frame_hash)] = frame_hash + recv_port['tnc'].write(frame, recv_port['tnc_port']) + self.aprsis.send(frame) + print(recv_port_name + " >> " + aprs.util.format_aprs_frame(frame)) + return + elif node.startswith('WIDE') and ssid is 1: + frame['path'] = frame['path'][:hop_index] + [recv_port['identifier'] + '*'] + [node + "*"] + frame['path'][hop_index+1:] + frame_hash = aprs.util.hash_frame(frame) + if not frame_hash in self.packet_cache.values(): + self.packet_cache[str(frame_hash)] = frame_hash + recv_port['tnc'].write(frame, recv_port['tnc_port']) + self.aprsis.send(frame) + print(recv_port_name + " >> " + aprs.util.format_aprs_frame(frame)) + return + elif node.startswith('WIDE') and ssid is 0: + frame['path'][hop_index] = node + "*" + # no return + else: + #If we didnt digipeat it then we didn't modify the frame, send it to aprsis as-is + self.aprsis.send(frame) + return + + def __preemptive_digipeat(self, frame, recv_port, recv_port_name): + # Can't digipeat anything when you are the source + for port in self.port_map.values(): + if frame['source'] == port['identifier']: + return + + # can't digipeat things we already digipeated. + for hop in frame['path']: + if hop.startswith('WI2ARD') and hop.endswith('*'): + return + + selected_hop = {} + for hop_index in reversed(range(0, len(frame['path']))): + hop = frame['path'][hop_index] + # If this is the last node before a spent node, or a spent node itself, we are done + if hop[-1] == '*' or frame['path'][hop_index-1][-1] == '*': + break + split_hop = hop.split('-') + node = split_hop[0].upper() + if len(split_hop) >= 2 and split_hop[1]: + ssid = int(split_hop[1]) + else: + continue + + band_path = None + band_path_net = None + band_match = self.BAND_PATH_REGEX.match(node) + if band_match is not None: + band_path = band_match.group(1) + band_path_net = band_match.group(2) + + if not band_path: + continue; + + for port_name in self.port_map.keys(): + port = self.port_map[port_name] + if band_path_net and node == port['net']: + # only when a ssid is present should it be treated preemptively if it is a band path + if not selected_hop: + selected_hop['index'] = hop_index + selected_hop['hop'] = hop + selected_hop['node'] = node + selected_hop['ssid'] = ssid + selected_hop['port_name'] = port_name + selected_hop['port'] = port + selected_hop['band_path'] = band_path + selected_hop['band_path_net'] = band_path_net + elif ssid > selected_hop['ssid']: + selected_hop['index'] = hop_index + selected_hop['hop'] = hop + selected_hop['node'] = node + selected_hop['ssid'] = ssid + selected_hop['port_name'] = port_name + selected_hop['port'] = port + selected_hop['band_path'] = band_path + selected_hop['band_path_net'] = band_path_net + elif not band_path_net and port['net'].startswith(band_path): + # only when a ssid is present should it be treated preemptively if it is a band path + if not selected_hop: + selected_hop['index'] = hop_index + selected_hop['hop'] = hop + selected_hop['node'] = node + selected_hop['ssid'] = ssid + selected_hop['port_name'] = port_name + selected_hop['port'] = port + selected_hop['band_path'] = band_path + selected_hop['band_path_net'] = band_path_net + elif ssid > selected_hop['ssid']: + selected_hop['index'] = hop_index + selected_hop['hop'] = hop + selected_hop['node'] = node + selected_hop['ssid'] = ssid + selected_hop['port_name'] = port_name + selected_hop['port'] = port + selected_hop['band_path'] = band_path + selected_hop['band_path_net'] = band_path_net + for hop_index in reversed(range(0, len(frame['path']))): + hop = frame['path'][hop_index] + # If this is the last node before a spent node, or a spent node itself, we are done + if hop[-1] == '*' or frame['path'][hop_index-1][-1] == '*': + break + elif selected_hop and selected_hop['index'] <= hop_index: + break + + for port_name in self.port_map.keys(): + port = self.port_map[port_name] + + # since the callsign specifically was specified in the path after the band-path the callsign takes + # precedence + if port['identifier'] == hop: + selected_hop['index'] = hop_index + selected_hop['hop'] = hop + selected_hop['node'] = node + selected_hop['ssid'] = ssid + selected_hop['port_name'] = port_name + selected_hop['port'] = port + selected_hop['band_path'] = None + selected_hop['band_path_net'] = None + + if not selected_hop: + return + + #now lets digipeat this packet + new_path=[] + for hop_index in range(0, len(frame['path'])): + hop = frame['path'][hop_index] + if hop[-1] != '*': + if hop_index == selected_hop['index']: + if selected_hop['band_path'] is None: + new_path += [hop + "*"] + else: + new_path += [selected_hop['port']['identifier'] + "*"] + [hop + "*"] + elif hop_index > selected_hop['index']: + new_path += [hop] + else: + new_path += [hop] + frame['path'] = new_path + frame_hash = aprs.util.hash_frame(frame) + if not frame_hash in self.packet_cache.values(): + self.packet_cache[str(frame_hash)] = frame_hash + selected_hop['port']['tnc'].write(frame, selected_hop['port']['tnc_port']) + self.aprsis.send(frame) + print(selected_hop['port_name'] + " >> " + aprs.util.format_aprs_frame(frame)) + return + + def run(self): + return + + def handle_packet(self, frame, recv_port, recv_port_name): + self.__preemptive_digipeat(copy.deepcopy(frame), recv_port, recv_port_name) + self.__passive_digipeat(copy.deepcopy(frame), recv_port, recv_port_name) \ No newline at end of file diff --git a/src/apex/plugins/beacon/__init__.py b/src/apex/plugins/beacon/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..79f8074a7ac0f6ecccfbdcbc722606a7d60768d0 --- /dev/null +++ b/src/apex/plugins/beacon/__init__.py @@ -0,0 +1,42 @@ +import aprs.util +import time + +plugin = None + +def start(config, port_map, packet_cache, aprsis): + global plugin + plugin = BeaconPlugin(config, port_map, packet_cache, aprsis) + plugin.run() + +def handle_packet(frame, recv_port, recv_port_name): + return + +class BeaconPlugin( object ): + + def __init__(self, config, port_map, packet_cache, aprsis): + self.port_map = port_map + self.packet_cache = packet_cache + self.aprsis = aprsis + + for section in config.sections(): + if section.startswith("TNC "): + tnc_name = section.split(" ")[1] + for port_id in range(1, 1+int(config.get(section, 'port_count'))): + port_name = tnc_name + '-' + str(port_id) + port = port_map[port_name] + port_section = 'PORT ' + port_name + port['beacon_text'] = config.get(port_section, 'beacon_text') + port['beacon_path'] = config.get(port_section, 'beacon_path') + + def run(self): + while 1 : + for port_name in self.port_map.keys(): + port = self.port_map[port_name] + + beacon_frame = {'source':port['identifier'], 'destination': 'APRS', 'path':port['beacon_path'].split(','), 'text': list(port['beacon_text'].encode('ascii'))} + frame_hash = aprs.util.hash_frame(beacon_frame) + if not frame_hash in self.packet_cache.values(): + self.packet_cache[str(frame_hash)] = frame_hash + port['tnc'].write(beacon_frame, port['tnc_port']) + print(port_name + " >> " + aprs.util.format_aprs_frame(beacon_frame)) + time.sleep(600) \ No newline at end of file diff --git a/src/apex/plugins/id/__init__.py b/src/apex/plugins/id/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..14e10e688aa45bd9661ff33c732e83f34736f4f8 --- /dev/null +++ b/src/apex/plugins/id/__init__.py @@ -0,0 +1,43 @@ +import aprs.util +import time + +plugin = None + +def start(config, port_map, packet_cache, aprsis): + global plugin + plugin = IdPlugin(config, port_map, packet_cache, aprsis) + plugin.run() + +def handle_packet(frame, recv_port, recv_port_name): + return + +class IdPlugin(object): + + def __init__(self, config, port_map, packet_cache, aprsis): + self.port_map = port_map + self.packet_cache = packet_cache + self.aprsis = aprsis + + for section in config.sections(): + if section.startswith("TNC "): + tnc_name = section.split(" ")[1] + for port_id in range(1, 1+int(config.get(section, 'port_count'))): + port_name = tnc_name + '-' + str(port_id) + port = port_map[port_name] + port_section = 'PORT ' + port_name + port['id_text'] = config.get(port_section, 'id_text') + port['id_path'] = config.get(port_section, 'id_path') + + def run(self): + time.sleep(30) + while 1 : + for port_name in self.port_map.keys(): + port = self.port_map[port_name] + + id_frame = {'source':port['identifier'], 'destination': 'ID', 'path':port['id_path'].split(','), 'text': list(port['id_text'].encode('ascii'))} + frame_hash = aprs.util.hash_frame(id_frame) + if not frame_hash in self.packet_cache.values(): + self.packet_cache[str(frame_hash)] = frame_hash + port['tnc'].write(id_frame, port['tnc_port']) + print(port_name + " >> " + aprs.util.format_aprs_frame(id_frame)) + time.sleep(600) diff --git a/src/apex/plugins/status/__init__.py b/src/apex/plugins/status/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..39b99d7c5608aa40484b6df1832074b90ba25f59 --- /dev/null +++ b/src/apex/plugins/status/__init__.py @@ -0,0 +1,45 @@ +import aprs.util +import time + +plugin = None + +def start(config, port_map, packet_cache, aprsis): + global plugin + plugin = StatusPlugin(config, port_map, packet_cache, aprsis) + plugin.run() + +def handle_packet(frame, recv_port, recv_port_name): + return + +class StatusPlugin(object): + + def __init__(self, config, port_map, packet_cache, aprsis): + self.port_map = port_map + self.packet_cache = packet_cache + self.aprsis = aprsis + + for section in config.sections(): + if section.startswith("TNC "): + tnc_name = section.split(" ")[1] + kiss_tnc = None + for port_id in range(1, 1+int(config.get(section, 'port_count'))): + port_name = tnc_name + '-' + str(port_id) + port = port_map[port_name] + port_section = 'PORT ' + port_name + port['status_text'] = config.get(port_section, 'status_text') + port['status_path'] = config.get(port_section, 'status_path') + + + def run(self): + time.sleep(60) + while 1 : + for port_name in self.port_map.keys(): + port = self.port_map[port_name] + + status_frame = {'source':port['identifier'], 'destination': 'APRS', 'path':port['status_path'].split(','), 'text': list(port['status_text'].encode('ascii'))} + frame_hash = aprs.util.hash_frame(status_frame) + if not frame_hash in self.packet_cache.values(): + self.packet_cache[str(frame_hash)] = frame_hash + port['tnc'].write(status_frame, port['tnc_port']) + print(port_name + " >> " + aprs.util.format_aprs_frame(status_frame)) + time.sleep(600) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..4d41b251d5dd94996c042cbfe2e7bda2b82adb8c --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for APEX Python Module.""" diff --git a/tests/constants.py b/tests/constants.py new file mode 100644 index 0000000000000000000000000000000000000000..f317007446131e7daa2b068f0ae1a6df965c15bb --- /dev/null +++ b/tests/constants.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""Constants for APEX Module Tests.""" + +__author__ = 'Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com>' +__license__ = 'Apache License, Version 2.0' +__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' + + +TEST_FRAMES = 'tests/test_frames.log' diff --git a/tests/context.py b/tests/context.py new file mode 100644 index 0000000000000000000000000000000000000000..a1034f63056d884d6e628758bc2854ba4592dc88 --- /dev/null +++ b/tests/context.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""Context for tests for APRS Python Module.""" + +__author__ = 'Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com>' +__license__ = 'Apache License, Version 2.0' +__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' + + +import os +import sys + +sys.path.insert(0, os.path.abspath('..')) + +import aprs # pylint: disable=W0611 +import kiss diff --git a/tests/test_aprs.py b/tests/test_aprs.py new file mode 100644 index 0000000000000000000000000000000000000000..4716950004e55b64510593c3d2118b0f20693c3a --- /dev/null +++ b/tests/test_aprs.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""Tests for Python APRS-IS Bindings.""" +import aprs.aprs_internet_service + +__author__ = 'Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com>' +__license__ = 'Apache License, Version 2.0' +__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' + + +import random +import unittest +import logging +import logging.handlers + +import httpretty + +from .context import aprs + + +ALPHABET = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' +NUMBERS = '0123456789' +POSITIVE_NUMBERS = NUMBERS[1:] +ALPHANUM = ''.join([ALPHABET, NUMBERS]) + + +class APRSTest(unittest.TestCase): # pylint: disable=R0904 + """Tests for Python APRS-IS Bindings.""" + + logger = logging.getLogger(__name__) + logger.setLevel(aprs.constants.LOG_LEVEL) + console_handler = logging.StreamHandler() + console_handler.setLevel(aprs.constants.LOG_LEVEL) + formatter = logging.Formatter(aprs.constants.LOG_FORMAT) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + logger.propagate = False + + @classmethod + def random(cls, length=8, alphabet=ALPHANUM): + """ + Generates a random string for test cases. + + :param length: Length of string to generate. + :param alphabet: Alphabet to use to create string. + :type length: int + :type alphabet: str + """ + return ''.join(random.choice(alphabet) for _ in xrange(length)) + + def setUp(self): # pylint: disable=C0103 + self.fake_server = ''.join([ + 'http://localhost:', + self.random(4, POSITIVE_NUMBERS), + '/' + ]) + + self.fake_callsign = ''.join([ + self.random(1, 'KWN'), + self.random(1, NUMBERS), + self.random(3, ALPHABET), + '-', + self.random(1, POSITIVE_NUMBERS) + ]) + + self.real_server = 'http://localhost:14580' + self.real_callsign = '-'.join(['W2GMD', self.random(1, '123456789')]) + + self.logger.debug( + "fake_server=%s fake_callsign=%s", + self.fake_server, + self.fake_callsign + ) + + @httpretty.httprettified + def test_fake_good_auth(self): + """ + Tests authenticating against APRS-IS using a valid call+pass. + """ + httpretty.HTTPretty.register_uri( + httpretty.HTTPretty.POST, + self.fake_server, + status=204 + ) + + aprs_conn = aprs.aprs_internet_service.AprsInternetService( + user=self.fake_callsign, + input_url=self.fake_server + ) + aprs_conn.connect() + + msg = '>'.join([ + self.fake_callsign, + 'APRS,TCPIP*:=3745.00N/12227.00W-Simulated Location' + ]) + self.logger.debug(locals()) + + result = aprs_conn.send(msg) + + self.assertTrue(result) + + @httpretty.httprettified + def test_fake_bad_auth_http(self): + """ + Tests authenticating against APRS-IS using an invalid call+pass. + """ + httpretty.HTTPretty.register_uri( + httpretty.HTTPretty.POST, + self.fake_server, + status=401 + ) + + aprs_conn = aprs.aprs_internet_service.AprsInternetService( + user=self.fake_callsign, + input_url=self.fake_server + ) + aprs_conn.connect() + + msg = '>'.join([ + self.fake_callsign, + 'APRS,TCPIP*:=3745.00N/12227.00W-Simulated Location' + ]) + self.logger.debug(locals()) + + result = aprs_conn.send(msg, protocol='HTTP') + + self.assertFalse(result) + + @unittest.skip('Test only works with real server.') + def test_more(self): + """ + Tests APRS-IS binding against a real APRS-IS server. + """ + aprs_conn = aprs.aprs_internet_service.AprsInternetService( + user=self.real_callsign, + input_url=self.real_server + ) + aprs_conn.connect() + + msg = '>'.join([ + self.real_callsign, + 'APRS,TCPIP*:=3745.00N/12227.00W-Simulated Location' + ]) + self.logger.debug(locals()) + + result = aprs_conn.send(msg) + + self.assertFalse(result) diff --git a/tests/test_frames.log b/tests/test_frames.log new file mode 100644 index 0000000000000000000000000000000000000000..9a45d9737ae6f85a7af55d31ee225d8f3938fd6f --- /dev/null +++ b/tests/test_frames.log @@ -0,0 +1,2 @@ +‚ ¤°dh`®dŽšˆ@l®’ˆŠb@cð!3745.75NI12228.05W#W2GMD-6 Inner Sunset, SF iGate/Digipeater http://w2gmd.org +‚ ¤°dh`®dŽšˆ@l®’ˆŠb@cðT#939,10.9,4.5,57.0,1.0,18.0,00000000 diff --git a/tests/test_kiss_util.py b/tests/test_kiss_util.py new file mode 100644 index 0000000000000000000000000000000000000000..5dd72b54d08f198a4557d5b3223ba162079e174e --- /dev/null +++ b/tests/test_kiss_util.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""Tests for KISS Util Module.""" + +__author__ = 'Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com>' +__license__ = 'Apache License, Version 2.0' +__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' + + +import unittest + +from .context import kiss + +from . import constants + + +# pylint: disable=R0904,C0103 +class KISSUtilTestCase(unittest.TestCase): + + """Test class for KISS Python Module.""" + + def setUp(self): + """Setup.""" + self.test_frames = open(constants.TEST_FRAMES, 'r') + self.test_frame = self.test_frames.readlines()[0].strip() + + def tearDown(self): + """Teardown.""" + self.test_frames.close() + + def test_escape_special_codes_fend(self): + """ + Tests `kiss.util.escape_special_codes` util function. + """ + fend = kiss.util.escape_special_codes(kiss.constants.FEND) + self.assertEqual(fend, kiss.constants.FESC_TFEND) + + def test_escape_special_codes_fesc(self): + """ + Tests `kiss.util.escape_special_codes` util function. + """ + fesc = kiss.util.escape_special_codes(kiss.constants.FESC) + self.assertEqual(fesc, kiss.constants.FESC_TFESC) + + def test_extract_ui(self): + """ + Tests `kiss.util.extract_ui` util function. + """ + frame_ui = kiss.util.extract_ui(self.test_frame) + self.assertEqual('APRX240W2GMD 6WIDE1 1', frame_ui) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_util.py b/tests/test_util.py new file mode 100644 index 0000000000000000000000000000000000000000..a593fc14fa15cb8a69c2e2c02f09262abf646cf8 --- /dev/null +++ b/tests/test_util.py @@ -0,0 +1,372 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""Tests for Python APRS util methods.""" + +__author__ = 'Jeffrey Phillips Freeman WI2ARD <freemo@gmail.com>' +__license__ = 'Apache License, Version 2.0' +__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors' + + +import unittest +import logging +import logging.handlers + +from .context import aprs + +from . import constants + + +ALPHABET = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' +NUMBERS = '0123456789' +POSITIVE_NUMBERS = NUMBERS[1:] +ALPHANUM = ''.join([ALPHABET, NUMBERS]) + +VALID_CALLSIGNS = ['W2GMD', 'W2GMD-1', 'KF4MKT', 'KF4MKT-1', 'KF4LZA-15'] +INVALID_CALLSIGNS = ['xW2GMDx', 'W2GMD-16', 'W2GMD-A', 'W', 'W2GMD-1-0', + 'W*GMD', 'W2GMD-123'] + + +class APRSUtilTestCase(unittest.TestCase): # pylint: disable=R0904 + """Tests for Python APRS Utils.""" + + logger = logging.getLogger(__name__) + logger.setLevel(aprs.constants.LOG_LEVEL) + console_handler = logging.StreamHandler() + console_handler.setLevel(aprs.constants.LOG_LEVEL) + formatter = logging.Formatter(aprs.constants.LOG_FORMAT) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + logger.propagate = False + + def setUp(self): # pylint: disable=C0103 + """Setup.""" + self.test_frames = open(constants.TEST_FRAMES, 'r') + self.test_frame = self.test_frames.readlines()[0].strip() + + def tearDown(self): # pylint: disable=C0103 + """Teardown.""" + self.test_frames.close() + + def test_latitude_north(self): + """Test Decimal to APRS Latitude conversion. + + Spec per ftp://ftp.tapr.org/aprssig/aprsspec/spec/aprs101/APRS101.pdf + -- + Latitude is expressed as a fixed 8-character field, in degrees and + decimal minutes (to two decimal places), followed by the letter N for + north or S for south. Latitude degrees are in the range 00 to 90. + Latitude minutes are expressed as whole minutes and hundredths of a + minute, separated by a decimal point. + + For example: + + 4903.50N is 49 degrees 3 minutes 30 seconds north. + + In generic format examples, the latitude is shown as the 8-character + string ddmm.hhN (i.e. degrees, minutes and hundredths of a minute + north). + """ + test_lat = 37.7418096 + aprs_lat = aprs.util.dec2dm_lat(test_lat) + self.logger.debug('aprs_lat=%s', aprs_lat) + + lat_deg = int(aprs_lat.split('.')[0][:1]) + # lat_hsec = aprs_lat.split('.')[1] + + self.assertTrue(len(aprs_lat) == 8) + self.assertTrue(lat_deg >= 00) + self.assertTrue(lat_deg <= 90) + self.assertTrue(aprs_lat.endswith('N')) + + def test_latitude_south(self): + """Test Decimal to APRS Latitude conversion. + + Spec per ftp://ftp.tapr.org/aprssig/aprsspec/spec/aprs101/APRS101.pdf + -- + Latitude is expressed as a fixed 8-character field, in degrees and + decimal minutes (to two decimal places), followed by the letter N for + north or S for south. Latitude degrees are in the range 00 to 90. + Latitude minutes are expressed as whole minutes and hundredths of a + minute, separated by a decimal point. + + For example: + + 4903.50N is 49 degrees 3 minutes 30 seconds north. + + In generic format examples, the latitude is shown as the 8-character + string ddmm.hhN (i.e. degrees, minutes and hundredths of a minute + north). + """ + test_lat = -37.7418096 + aprs_lat = aprs.util.dec2dm_lat(test_lat) + self.logger.debug('aprs_lat=%s', aprs_lat) + + lat_deg = int(aprs_lat.split('.')[0][:1]) + # lat_hsec = aprs_lat.split('.')[1] + + self.assertTrue(len(aprs_lat) == 8) + self.assertTrue(lat_deg >= 00) + self.assertTrue(lat_deg <= 90) + self.assertTrue(aprs_lat.endswith('S')) + + def test_longitude_west(self): + """Test Decimal to APRS Longitude conversion. + + Spec per ftp://ftp.tapr.org/aprssig/aprsspec/spec/aprs101/APRS101.pdf + -- + Longitude is expressed as a fixed 9-character field, in degrees and + decimal minutes (to two decimal places), followed by the letter E for + east or W for west. + + Longitude degrees are in the range 000 to 180. Longitude minutes are + expressed as whole minutes and hundredths of a minute, separated by a + decimal point. + + For example: + + 07201.75W is 72 degrees 1 minute 45 seconds west. + + In generic format examples, the longitude is shown as the 9-character + string dddmm.hhW (i.e. degrees, minutes and hundredths of a minute + west). + """ + test_lng = -122.38833 + aprs_lng = aprs.util.dec2dm_lng(test_lng) + self.logger.debug('aprs_lng=%s', aprs_lng) + + lng_deg = int(aprs_lng.split('.')[0][:2]) + # lng_hsec = aprs_lng.split('.')[1] + + self.assertTrue(len(aprs_lng) == 9) + self.assertTrue(lng_deg >= 000) + self.assertTrue(lng_deg <= 180) + self.assertTrue(aprs_lng.endswith('W')) + + def test_longitude_east(self): + """Test Decimal to APRS Longitude conversion. + + Spec per ftp://ftp.tapr.org/aprssig/aprsspec/spec/aprs101/APRS101.pdf + -- + Longitude is expressed as a fixed 9-character field, in degrees and + decimal minutes (to two decimal places), followed by the letter E for + east or W for west. + + Longitude degrees are in the range 000 to 180. Longitude minutes are + expressed as whole minutes and hundredths of a minute, separated by a + decimal point. + + For example: + + 07201.75W is 72 degrees 1 minute 45 seconds west. + + In generic format examples, the longitude is shown as the 9-character + string dddmm.hhW (i.e. degrees, minutes and hundredths of a minute + west). + """ + test_lng = 122.38833 + aprs_lng = aprs.util.dec2dm_lng(test_lng) + self.logger.debug('aprs_lng=%s', aprs_lng) + + lng_deg = int(aprs_lng.split('.')[0][:2]) + # lng_hsec = aprs_lng.split('.')[1] + + self.assertTrue(len(aprs_lng) == 9) + self.assertTrue(lng_deg >= 000) + self.assertTrue(lng_deg <= 180) + self.assertTrue(aprs_lng.endswith('E')) + + def test_valid_callsign_valid(self): + """ + Tests valid callsigns using `aprs.util.valid_callsign()`. + """ + for i in VALID_CALLSIGNS: + self.assertTrue( + aprs.util.valid_callsign(i), "%s is a valid call" % i) + + def test_valid_callsign_invalid(self): + """ + Tests invalid callsigns using `aprs.util.valid_callsign()`. + """ + for i in INVALID_CALLSIGNS: + self.assertFalse( + aprs.util.valid_callsign(i), "%s is an invalid call" % i) + + def test_extract_callsign_source(self): + """ + Tests extracting the source callsign from a KISS-encoded APRS frame + using `aprs.util.extract_callsign()`. + """ + callsign = {'callsign': 'W2GMD', 'ssid': 6} + extracted_callsign = aprs.util.extract_callsign(self.test_frame[7:]) + self.assertEqual(callsign, extracted_callsign) + + def test_extract_callsign_dest(self): + """ + Tests extracting the destination callsign from a KISS-encoded APRS + frame using `aprs.util.extract_callsign()`. + """ + extracted_callsign = aprs.util.extract_callsign(self.test_frame) + self.assertEqual(extracted_callsign['callsign'], 'APRX24') + + def test_full_callsign_with_ssid(self): + """ + Tests creating a full callsign string from a callsign+ssid dict using + `aprs.util.full_callsign()`. + """ + callsign = { + 'callsign': 'W2GMD', + 'ssid': 1 + } + full_callsign = aprs.util.full_callsign(callsign) + self.assertEqual(full_callsign, 'W2GMD-1') + + def test_full_callsign_sans_ssid(self): + """ + Tests creating a full callsign string from a callsign dict using + `aprs.util.full_callsign()`. + """ + callsign = { + 'callsign': 'W2GMD', + 'ssid': 0 + } + full_callsign = aprs.util.full_callsign(callsign) + self.assertEqual(full_callsign, 'W2GMD') + + def test_format_aprs_frame(self): + """ + Tests formatting an APRS frame-as-string from an APRS frame-as-dict + using `aprs.util.format_aprs_frame()`. + """ + frame = { + 'source': 'W2GMD-1', + 'destination': 'OMG', + 'path': 'WIDE1-1', + 'text': 'test_format_aprs_frame' + } + formatted_frame = aprs.util.format_aprs_frame(frame) + self.assertEqual( + formatted_frame, + 'W2GMD-1>OMG,WIDE1-1:test_format_aprs_frame' + ) + + def test_decode_aprs_ascii_frame(self): + """ + Tests creating an APRS frame-as-dict from an APRS frame-as-string + using `aprs.util.decode_aprs_ascii_frame()`. + """ + ascii_frame = ( + 'W2GMD-9>APOTC1,WIDE1-1,WIDE2-1:!3745.94N/12228.05W>118/010/' + 'A=000269 38C=Temp http://w2gmd.org/ Twitter: @ampledata') + frame = aprs.util.decode_aprs_ascii_frame(ascii_frame) + self.assertEqual( + { + 'source': 'W2GMD-9', + 'destination': 'APOTC1', + 'path': 'APOTC1,WIDE1-1,WIDE2-1', + 'text': ('!3745.94N/12228.05W>118/010/A=000269 38C=Temp ' + 'http://w2gmd.org/ Twitter: @ampledata'), + }, + frame + ) + + def test_extract_path(self): + """ + Tests extracting the APRS path from a KISS-encoded frame + using `aprs.util.extract_path()`. + """ + extracted_path = aprs.util.extract_path(3, self.test_frame) + self.assertEqual('WIDE1-1', extracted_path[0]) + + def test_format_path(self): + """ + Tests formatting an APRS path from a KISS-encoded frame + using `aprs.util.format_path()`. + """ + extracted_path = aprs.util.format_path(3, self.test_frame) + self.assertEqual('WIDE1-1', extracted_path) + + def test_encode_frame(self): + """ + Tests KISS-encoding an APRS frame using + `aprs.util.encode_frame()`. + """ + frame = { + 'source': 'W2GMD-1', + 'destination': 'OMG', + 'path': 'WIDE1-1', + 'text': 'test_encode_frame' + } + encoded_frame = aprs.util.encode_frame(frame) + legit = ('\x9e\x9a\x8e@@@`\xaed\x8e\x9a\x88@b' + '\xae\x92\x88\x8ab@c\x03\xf0test_encode_frame') + self.assertEqual(legit, encoded_frame) + + def test_decode_frame_recorded(self): + """ + Tests decoding a KISS-encoded APRS frame using + `aprs.util.decode_frame()`. + """ + frame = { + 'path': 'WIDE1-1', + 'destination': 'APRX24', + 'source': 'W2GMD-6', + 'text': ('!3745.75NI12228.05W#W2GMD-6 Inner Sunset, ' + 'SF iGate/Digipeater http://w2gmd.org') + } + self.assertEqual(frame, aprs.util.decode_frame(self.test_frame)) + + def test_decode_frame(self): + """ + Tests decoding a KISS-encoded APRS frame using + `aprs.util.decode_frame()`. + """ + frame = { + 'source': 'W2GMD-1', + 'destination': 'OMG', + 'path': 'WIDE1-1,WIDE2-2', + 'text': 'test_encode_frame' + } + encoded_frame = aprs.util.encode_frame(frame) + decoded_frame = aprs.util.decode_frame(encoded_frame) + self.assertEqual(frame, decoded_frame) + + def test_create_callsign(self): + """ + Tests creating a callsign string from a callsign dict using + `aprs.util.create_callsign()`. + """ + full_callsign = 'W2GMD-1' + callsign = aprs.util.create_callsign(full_callsign) + self.assertEqual({'callsign': 'W2GMD', 'ssid': 1}, callsign) + + def test_full_callsign(self): + """ + Tests converting a callsign dict to a callsing string + (callsign-ssid) using `aprs.util.full_callsign()`. + """ + callsign = {'callsign': 'W2GMD', 'ssid': 1} + full_callsign = aprs.util.full_callsign(callsign) + self.assertEqual('W2GMD-1', full_callsign) + + def test_encode_callsign_digipeated(self): + """ + Tests encoding a digipeated callsign with + `aprs.util.encode_callsign()`. + """ + callsign = {'callsign': 'W2GMD*', 'ssid': 1} + encoded_callsign = aprs.util.encode_callsign(callsign) + self.assertEqual('\xaed\x8e\x9a\x88@\xe2', encoded_callsign) + + def test_encode_callsign(self): + """ + Tests encoding a non-digipeated callsign with + `aprs.util.encode_callsign()`. + """ + callsign = {'callsign': 'W2GMD', 'ssid': 1} + encoded_callsign = aprs.util.encode_callsign(callsign) + self.assertEqual('\xaed\x8e\x9a\x88@b', encoded_callsign) + + +if __name__ == '__main__': + unittest.main()