diff --git a/aprs/__init__.py b/aprs/__init__.py index aaa6172944ff39dfbf93d3e9a356c7cdb3ff0278..94fd72b63d5ba86bdbb420fe1aef68413f5ae2f5 100644 --- a/aprs/__init__.py +++ b/aprs/__init__.py @@ -1,9 +1,11 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- -__author__ = 'Greg Albrecht W2GMD <gba@gregalbrecht.com>' -__copyright__ = 'Copyright 2013 Greg Albrecht' -__license__ = 'Creative Commons Attribution 3.0 Unported License' +__author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>' +__copyright__ = 'Copyright 2013 OnBeep, Inc.' +__license__ = 'Apache 2.0' -from .aprs import APRS +from .classes import APRS, APRSKISS + import util diff --git a/aprs/aprs.py b/aprs/aprs.py deleted file mode 100755 index 08630e8bc20f88e6f82424cf92f863fef42e1680..0000000000000000000000000000000000000000 --- a/aprs/aprs.py +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/env python - -__author__ = 'Greg Albrecht W2GMD <gba@gregalbrecht.com>' -__copyright__ = 'Copyright 2013 Greg Albrecht' -__license__ = 'Creative Commons Attribution 3.0 Unported License' - - -import logging -import logging.handlers - -import requests - - -class APRS(object): - - logger = logging.getLogger('aprs') - logger.addHandler(logging.StreamHandler()) - - def __init__(self, user, password=-1, input_url=None): - self._url = input_url or 'http://srvr.aprs-is.net:8080' - self._auth = "user %s pass %s" % (user, password) - - def send(self, message): - headers = { - 'content-type': 'application/octet-stream', - 'accept': 'text/plain' - } - - content = "\n".join([self._auth, message]) - - result = requests.post(self._url, data=content, headers=headers) - - return result.status_code == 204 diff --git a/aprs/classes.py b/aprs/classes.py new file mode 100755 index 0000000000000000000000000000000000000000..ae2eaa1190831e8a5001e5397a2a0941d053f707 --- /dev/null +++ b/aprs/classes.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +__author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>' +__copyright__ = 'Copyright 2013 OnBeep, Inc.' +__license__ = 'Apache 2.0' + + +import logging +import logging.handlers + +import requests + +import kiss + +import aprs.constants + + +class APRS(object): + + logger = logging.getLogger('aprs') + logger.addHandler(logging.StreamHandler()) + + def __init__(self, user, password='-1', input_url=None): + self._url = input_url or aprs.constants.APRSIS_URL + self._auth = ' '.join(['user', user, 'pass', password]) + + def send(self, message, headers=None): + headers = headers or aprs.constants.APRSIS_HTTP_HEADERS + content = "\n".join([self._auth, message]) + + result = requests.post(self._url, data=content, headers=headers) + + return result.status_code == 204 + + +class APRSKISS(kiss.KISS): + + def write(self, frame): + encoded_frame = aprs.util.encode_frame(frame) + super(APRSKISS, self).write(encoded_frame) diff --git a/aprs/constants.py b/aprs/constants.py new file mode 100644 index 0000000000000000000000000000000000000000..c5d7bbb40b8400c35839e09a7507fc26aace933c --- /dev/null +++ b/aprs/constants.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Constants for APRS Module. +""" + +__author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>' +__copyright__ = 'Copyright 2013 OnBeep, Inc.' +__license__ = 'Apache 2.0' + + +import logging + + +APRSIS_URL = 'http://srvr.aprs-is.net:8080' +APRSIS_HTTP_HEADERS = { + 'content-type': 'application/octet-stream', + 'accept': 'text/plain' +} + + +LOG_LEVEL = logging.DEBUG +LOG_FORMAT = ('%(asctime)s %(levelname)s %(name)s.%(funcName)s:%(lineno)d' + ' - %(message)s') \ No newline at end of file diff --git a/aprs/util.py b/aprs/util.py index b75ba6e7b8dc748cbfb8f34b7e14fbf63a576d08..9a95097d6a744cd8d0f939cb6513b1588a609385 100755 --- a/aprs/util.py +++ b/aprs/util.py @@ -1,11 +1,28 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- -__author__ = 'Greg Albrecht W2GMD <gba@gregalbrecht.com>' -__copyright__ = 'Copyright 2013 Greg Albrecht' -__license__ = 'Creative Commons Attribution 3.0 Unported License' +"""Utilities for the APRS Python Module.""" +__author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>' +__copyright__ = 'Copyright 2013 OnBeep, Inc.' +__license__ = 'Apache 2.0' -import decimaldegrees + +import logging + +import aprs.constants +import aprs.decimaldegrees +import kiss.constants + + +logger = logging.getLogger(__name__) +logger.setLevel(aprs.constants.LOG_LEVEL) +console_handler = logging.StreamHandler() +console_handler.setLevel(aprs.constants.LOG_LEVEL) +formatter = logging.Formatter(aprs.constants.LOG_FORMAT) +console_handler.setFormatter(formatter) +logger.addHandler(console_handler) +logger.propagate = False # http://stackoverflow.com/questions/2056750/lat-long-to-minutes-and-seconds @@ -19,7 +36,7 @@ def dec2dm_lat(dec): >>> aprs_lat '3744.51N' """ - dm = decimaldegrees.decimal2dm(dec) + dm = aprs.decimaldegrees.decimal2dm(dec) deg = dm[0] abs_deg = abs(deg) @@ -42,7 +59,7 @@ def dec2dm_lng(dec): >>> aprs_lng '12223.30W' """ - dm = decimaldegrees.decimal2dm(dec) + dm = aprs.decimaldegrees.decimal2dm(dec) deg = dm[0] abs_deg = abs(deg) @@ -55,10 +72,206 @@ def dec2dm_lng(dec): return ''.join([str(abs_deg), "%.2f" % dm[1], suffix]) +def decode_aprs_ascii_frame(ascii_frame): + """ + Breaks an ASCII APRS Frame down to it's constituent parts. + + Test & Example + ~~~~ + + >>> frame = 'W2GMD-9>APOTC1,WIDE1-1,WIDE2-1:!3745.94N/12228.05W>118/010/A=000269 38C=Temp http://w2gmd.org/ Twitter: @ampledata' + >>> decode_aprs_ascii_frame(frame) + {'source': 'W2GMD-9', 'destination': 'APOTC1', 'text': '!3745.94N/12228.05W>118/010/A=000269 38C=Temp http://w2gmd.org/ Twitter: @ampledata', 'path': 'APOTC1,WIDE1-1,WIDE2-1'} + + + :param frame: ASCII APRS Frame. + :type frame: str + + :returns: Dictionary of APRS Frame parts: source, destination, path, text. + :rtype: dict + """ + logger.debug('frame=%s', ascii_frame) + decoded_frame = {} + frame_so_far = '' + + for c in ascii_frame: + if '>' in c and not 'source' in decoded_frame: + decoded_frame['source'] = frame_so_far + frame_so_far = '' + elif ':' in c and not 'path' in decoded_frame: + decoded_frame['path'] = frame_so_far + frame_so_far = '' + else: + frame_so_far = ''.join([frame_so_far, c]) + + decoded_frame['text'] = frame_so_far + decoded_frame['destination'] = decoded_frame['path'].split(',')[0] + + return decoded_frame + + +def format_aprs_frame(frame): + formatted_frame = '>'.join([frame['source'], frame['destination']]) + formatted_frame = ','.join([formatted_frame, frame['path']]) + formatted_frame = ':'.join([formatted_frame, frame['text']]) + return formatted_frame + + +def create_callsign(raw_callsign): + if '-' in raw_callsign: + call_sign, ssid = raw_callsign.split('-') + else: + call_sign = raw_callsign + ssid = 0 + return {'callsign': call_sign, 'ssid': int(ssid)} + + +def full_callsign(callsign): + """ + Returns a fully-formatted callsign (Callsign + SSID). + + :param callsign: Callsign Dictionary {'callsign': '', 'ssid': n} + :type callsign: dict + :returns: Callsign[-SSID]. + :rtype: str + """ + if callsign['ssid'] > 0: + return '-'.join([callsign['callsign'], str(callsign['ssid'])]) + else: + return callsign['callsign'] + + +def valid_callsign(callsign): + """ + Validates callsign. + + :param callsign: Callsign to validate. + :type callsign: str + + :returns: True if valid, False otherwise. + :rtype: bool + """ + logger.debug('callsign=%s', callsign) + + if '-' in callsign: + if not callsign.count('-') == 1: + return False + else: + callsign, ssid = callsign.split('-') + else: + ssid = 0 + + logger.debug('callsign=%s ssid=%s', callsign, ssid) + + if len(callsign) < 2 or len(callsign) > 6: + return False + + if len(str(ssid)) < 1 or len(str(ssid)) > 2: + return False + + for c in callsign: + if not (c.isalpha() or c.isdigit()): + return False + + if not str(ssid).isdigit(): + return False + + if int(ssid) < 0 or int(ssid) > 15: + return False + + return True + + +def extract_callsign(raw_frame): + """ + Extracts callsign from a raw KISS frame. + + :param raw_frame: Raw KISS Frame to decode. + :returns: Dict of callsign and ssid. + :rtype: dict + """ + callsign = ''.join([chr(ord(x) >> 1) for x in raw_frame[:6]]).strip() + ssid = (ord(raw_frame[6]) >> 1) & 0x0f + return {'callsign': callsign, 'ssid': ssid} + + +def extract_path(start, raw_frame): + full_path = [] + + for i in range(2, start): + path = aprs.util.full_callsign(extract_callsign(raw_frame[i * 7:])) + if path: + if ord(raw_frame[i * 7 + 6]) & 0x80: + full_path.append(''.join([path, '*'])) + else: + full_path.append(path) + return full_path + + +def format_path(start, raw_frame): + return ','.join(extract_path(start, raw_frame)) + + +def encode_callsign(callsign): + call_sign = callsign['callsign'] + digi = False + + ct = (callsign['ssid'] << 1) | 0x60 + + if '*' in call_sign: + call_sign = call_sign.replace('*', '') + ct |= 0x80 + + while len(call_sign) < 6: + call_sign = ''.join([call_sign, ' ']) + + encoded = ''.join([chr(ord(p) << 1) for p in call_sign]) + return ''.join([encoded, chr(ct)]) + + +def encode_frame(frame): + enc_frame = ''.join([ + encode_callsign(create_callsign(frame['destination'])), + encode_callsign(create_callsign(frame['source'])), + ''.join([encode_callsign(create_callsign(p)) for p in frame['path'].split(',')]) + ]) + + return ''.join([ + enc_frame[:-1], + chr(ord(enc_frame[-1]) | 0x01), + kiss.constants.SLOT_TIME, + chr(0xf0), + frame['text'] + ]) + + +def decode_frame(raw_frame): + logging.debug('raw_frame=%s', raw_frame) + frame = {} + frame_len = len(raw_frame) + + if frame_len > 16: + for raw_slice in range(0, frame_len): + # Is address field length correct? + if ord(raw_frame[raw_slice]) & 0x01 and ((raw_slice + 1) % 7) == 0: + n = (raw_slice + 1) / 7 + # Less than 2 callsigns? + if 2 < n < 10: + if (ord(raw_frame[raw_slice + 1]) & 0x03 == 0x03 and + ord(raw_frame[raw_slice + 2]) == 0xf0): + frame['text'] = raw_frame[raw_slice + 3:] + frame['destination'] = full_callsign(extract_callsign(raw_frame)) + frame['source'] = full_callsign(extract_callsign(raw_frame[7:])) + frame['path'] = format_path(n, raw_frame) + + logging.debug('frame=%s', frame) + return frame + + def run_doctest(): import doctest - import util - return doctest.testmod(util) + import aprs.util + return doctest.testmod(aprs.util) if __name__ == '__main__': diff --git a/requirements.txt b/requirements.txt index 6794368938f6ea0e5822ae3f178271460c93d469..60144c744aacdfcbab488bd49c8ab215abc189f2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ clonedigger httpretty nose requests +pyserial \ No newline at end of file diff --git a/setup.py b/setup.py index 133b28a3c7ad79fb9b38bd38f45f0841c90b805e..dc0183110bcb14377836b86bd9c14fed3d0ee12e 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- __author__ = 'Greg Albrecht W2GMD <gba@gregalbrecht.com>' __copyright__ = 'Copyright 2013 Greg Albrecht' diff --git a/tests/constants.py b/tests/constants.py new file mode 100644 index 0000000000000000000000000000000000000000..6bae02376c640d800230930d6ddfdeccbfac4c93 --- /dev/null +++ b/tests/constants.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""Constants for APRS Module Tests.""" + +__author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>' +__copyright__ = 'Copyright 2013 OnBeep, Inc.' +__license__ = 'Apache 2.0' + + +TEST_FRAMES = 'tests/test_frames.log' diff --git a/tests/context.py b/tests/context.py index da83fba249190ce29fb2b0cf9b78eed93187362c..433ac011984baf41c0e210625e1da90465f0cffa 100644 --- a/tests/context.py +++ b/tests/context.py @@ -1,19 +1,10 @@ #!/usr/bin/env python -"""Test context. - -Based on http://kennethreitz.com/repository-structure-and-python.html -""" - -__author__ = 'Greg Albrecht W2GMD <gba@gregalbrecht.com>' -__copyright__ = 'Copyright 2013 Greg Albrecht' -__license__ = 'Creative Commons Attribution 3.0 Unported License' +# -*- coding: utf-8 -*- import os import sys - sys.path.insert(0, os.path.abspath('..')) - import aprs diff --git a/tests/test_aprs.py b/tests/test_aprs.py index 1337c61da25dbfba66612ae84909bc4acf5c6233..4e55f9d073e099b871471421ec0a80ddb4cfb801 100644 --- a/tests/test_aprs.py +++ b/tests/test_aprs.py @@ -1,8 +1,9 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- -__author__ = 'Greg Albrecht W2GMD <gba@gregalbrecht.com>' -__copyright__ = 'Copyright 2013 Greg Albrecht' -__license__ = 'Creative Commons Attribution 3.0 Unported License' +__author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>' +__copyright__ = 'Copyright 2013 OnBeep, Inc.' +__license__ = 'Apache 2.0' import random @@ -24,8 +25,14 @@ ALPHANUM = ''.join([ALPHABET, NUMBERS]) class APRSTest(unittest.TestCase): """Tests for Python APRS Bindings.""" - logger = logging.getLogger('aprs.tests') - logger.addHandler(logging.StreamHandler()) + logger = logging.getLogger(__name__) + logger.setLevel(aprs.constants.LOG_LEVEL) + console_handler = logging.StreamHandler() + console_handler.setLevel(aprs.constants.LOG_LEVEL) + formatter = logging.Formatter(aprs.constants.LOG_FORMAT) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + logger.propagate = False def random(self, length=8, alphabet=ALPHANUM): return ''.join(random.choice(alphabet) for _ in xrange(length)) diff --git a/tests/test_frames.log b/tests/test_frames.log new file mode 100644 index 0000000000000000000000000000000000000000..9a45d9737ae6f85a7af55d31ee225d8f3938fd6f --- /dev/null +++ b/tests/test_frames.log @@ -0,0 +1,2 @@ +‚ ¤°dh`®dŽšˆ@l®’ˆŠb@cð!3745.75NI12228.05W#W2GMD-6 Inner Sunset, SF iGate/Digipeater http://w2gmd.org +‚ ¤°dh`®dŽšˆ@l®’ˆŠb@cðT#939,10.9,4.5,57.0,1.0,18.0,00000000 diff --git a/tests/test_util.py b/tests/test_util.py index e01cf79e84a3d0b966317cabfd9fab3c778198f9..21f2a4a3e03194328a4fc2dfd386fa1876ec278a 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -1,8 +1,9 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- -__author__ = 'Greg Albrecht W2GMD <gba@gregalbrecht.com>' -__copyright__ = 'Copyright 2013 Greg Albrecht' -__license__ = 'Creative Commons Attribution 3.0 Unported License' +__author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>' +__copyright__ = 'Copyright 2013 OnBeep, Inc.' +__license__ = 'Apache 2.0' import unittest @@ -11,18 +12,39 @@ import logging.handlers from .context import aprs +from . import constants + ALPHABET = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' NUMBERS = '0123456789' POSITIVE_NUMBERS = NUMBERS[1:] ALPHANUM = ''.join([ALPHABET, NUMBERS]) +VALID_CALLSIGNS = ['W2GMD', 'W2GMD-1', 'KF4MKT', 'KF4MKT-1', 'KF4LZA-15'] +INVALID_CALLSIGNS = ['xW2GMDx', 'W2GMD-16', 'W2GMD-A', 'W', 'W2GMD-1-0', + 'W*GMD', 'W2GMD-123'] + -class APRSUtilTest(unittest.TestCase): +class APRSUtilTestCase(unittest.TestCase): """Tests for Python APRS Utils.""" - logger = logging.getLogger('aprs.util.tests') - logger.addHandler(logging.StreamHandler()) + logger = logging.getLogger(__name__) + logger.setLevel(aprs.constants.LOG_LEVEL) + console_handler = logging.StreamHandler() + console_handler.setLevel(aprs.constants.LOG_LEVEL) + formatter = logging.Formatter(aprs.constants.LOG_FORMAT) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + logger.propagate = False + + def setUp(self): + """Setup.""" + self.test_frames = open(constants.TEST_FRAMES, 'r') + self.test_frame = self.test_frames.readlines()[0].strip() + + def tearDown(self): + """Teardown.""" + self.test_frames.close() def test_latitude(self): """Test Decimal to APRS Latitude conversion. @@ -45,7 +67,7 @@ class APRSUtilTest(unittest.TestCase): """ test_lat = 37.7418096 aprs_lat = aprs.util.dec2dm_lat(test_lat) - self.logger.debug("aprs_lat=%s" % aprs_lat) + self.logger.debug('aprs_lat=%s', aprs_lat) lat_deg = int(aprs_lat.split('.')[0][:1]) lat_hsec = aprs_lat.split('.')[1] @@ -55,7 +77,6 @@ class APRSUtilTest(unittest.TestCase): self.assertTrue(lat_deg <= 90) self.assertTrue(aprs_lat.endswith('N')) - def test_longitude(self): """Test Decimal to APRS Longitude conversion. @@ -79,7 +100,7 @@ class APRSUtilTest(unittest.TestCase): """ test_lng = -122.38833 aprs_lng = aprs.util.dec2dm_lng(test_lng) - self.logger.debug("aprs_lng=%s" % aprs_lng) + self.logger.debug('aprs_lng=%s', aprs_lng) lng_deg = int(aprs_lng.split('.')[0][:2]) lng_hsec = aprs_lng.split('.')[1] @@ -89,6 +110,98 @@ class APRSUtilTest(unittest.TestCase): self.assertTrue(lng_deg <= 180) self.assertTrue(aprs_lng.endswith('W')) + def test_valid_callsign_valid(self): + for c in VALID_CALLSIGNS: + self.assertTrue(aprs.util.valid_callsign(c), "%s is a valid call" % c) + + def test_valid_callsign_invalid(self): + for c in INVALID_CALLSIGNS: + self.assertFalse(aprs.util.valid_callsign(c), "%s is an invalid call" % c) + + def test_extract_callsign_source(self): + callsign = {'callsign': 'W2GMD', 'ssid': 6} + extracted_callsign = aprs.util.extract_callsign(self.test_frame[7:]) + self.assertEqual(callsign, extracted_callsign) + + def test_extract_callsign_destination(self): + extracted_callsign = aprs.util.extract_callsign(self.test_frame) + self.assertEqual(extracted_callsign['callsign'], 'APRX24') + + def test_full_callsign_with_ssid(self): + callsign = { + 'callsign': 'W2GMD', + 'ssid': 1 + } + full_callsign = aprs.util.full_callsign(callsign) + self.assertEqual(full_callsign, 'W2GMD-1') + + def test_full_callsign_sans_ssid(self): + callsign = { + 'callsign': 'W2GMD', + 'ssid': 0 + } + full_callsign = aprs.util.full_callsign(callsign) + self.assertEqual(full_callsign, 'W2GMD') + + def test_format_aprs_frame(self): + frame = { + 'source': 'W2GMD-1', + 'destination': 'OMG', + 'path': 'WIDE1-1', + 'text': 'test_format_aprs_frame' + } + formatted_frame = aprs.util.format_aprs_frame(frame) + self.assertEqual( + formatted_frame, + 'W2GMD-1>OMG,WIDE1-1:test_format_aprs_frame' + ) + + def test_decode_aprs_ascii_frame(self): + ascii_frame = 'W2GMD-9>APOTC1,WIDE1-1,WIDE2-1:!3745.94N/12228.05W>118/010/A=000269 38C=Temp http://w2gmd.org/ Twitter: @ampledata' + frame = aprs.util.decode_aprs_ascii_frame(ascii_frame) + self.assertEqual( + frame, + {'source': 'W2GMD-9', 'destination': 'APOTC1', 'text': '!3745.94N/12228.05W>118/010/A=000269 38C=Temp http://w2gmd.org/ Twitter: @ampledata', 'path': 'APOTC1,WIDE1-1,WIDE2-1'} + ) + + def test_extract_path(self): + extracted_path = aprs.util.extract_path(3, self.test_frame) + self.assertEqual('WIDE1-1', extracted_path[0]) + + def test_format_path(self): + extracted_path = aprs.util.format_path(3, self.test_frame) + self.assertEqual('WIDE1-1', extracted_path) + + def test_encode_frame(self): + frame = { + 'source': 'W2GMD-1', + 'destination': 'OMG', + 'path': 'WIDE1-1', + 'text': 'test_encode_frame' + } + encoded_frame = aprs.util.encode_frame(frame) + legit = '\x9e\x9a\x8e@@@`\xaed\x8e\x9a\x88@b\xae\x92\x88\x8ab@c\x03\xf0test_encode_frame' + self.assertEqual(legit, encoded_frame) + + def test_decode_frame_recorded(self): + frame = { + 'text': '!3745.75NI12228.05W#W2GMD-6 Inner Sunset, SF iGate/Digipeater http://w2gmd.org', + 'path': 'WIDE1-1', + 'destination': 'APRX24', + 'source': 'W2GMD-6' + } + self.assertEqual(frame, aprs.util.decode_frame(self.test_frame)) + + def test_decode_frame(self): + frame = { + 'source': 'W2GMD-1', + 'destination': 'OMG', + 'path': 'WIDE1-1,WIDE2-2', + 'text': 'test_encode_frame' + } + encoded_frame = aprs.util.encode_frame(frame) + decoded_frame = aprs.util.decode_frame(encoded_frame) + self.assertEqual(frame, decoded_frame) if __name__ == '__main__': unittest.main()