diff --git a/src/apex/aprs/aprs_kiss.py b/src/apex/aprs/aprs_kiss.py index 86a6d0aed7ade850ffdb0106d610cff88f732c47..67034bab73e3d351499f9e04c4ca21ae45bf0515 100644 --- a/src/apex/aprs/aprs_kiss.py +++ b/src/apex/aprs/aprs_kiss.py @@ -61,6 +61,25 @@ class AprsKiss(object): logging.debug('frame=%s', frame) return frame + @staticmethod + def __valid_frame(raw_frame): + logging.debug('raw_frame=%s', raw_frame) + frame = {} + frame_len = len(raw_frame) + + if frame_len > 16: + for raw_slice in range(0, frame_len - 2): + # Is address field length correct? + if raw_frame[raw_slice] & 0x01 and ((raw_slice + 1) % 7) == 0: + i = (raw_slice + 1) / 7 + # Less than 2 callsigns? + if 1 < i < 11: + if raw_frame[raw_slice + 1] & 0x03 is 0x03 and raw_frame[raw_slice + 2] in [0xf0, 0xcf]: + return True + + logging.debug('frame=%s', frame) + return False + @staticmethod def __extract_path(start, raw_frame): """Extracts path from raw APRS KISS frame. @@ -193,7 +212,8 @@ class AprsKiss(object): """ with self.lock: encoded_frame = AprsKiss.__encode_frame(frame) - self.data_stream.write(encoded_frame, *args, **kwargs) + if AprsKiss.__valid_frame(encoded_frame): + self.data_stream.write(encoded_frame, *args, **kwargs) def read(self, *args, **kwargs): """Reads APRS-encoded frame from KISS device. diff --git a/tests/test_aprs.py b/tests/test_aprs.py index 5928a5cef2b1ee7e3ef0e3ba655801f78718b953..298813de44c799320a559a3b39695879feb6769a 100644 --- a/tests/test_aprs.py +++ b/tests/test_aprs.py @@ -15,6 +15,8 @@ import six import apex.aprs.constants import apex.aprs.igate +from .kiss_mock import KissMock + if six.PY2: import httpretty @@ -69,6 +71,28 @@ ENCODED_FRAME_MULTIPATH = [158, 154, 142, 64, 64, 64, 96, 174, 100, 142, 154, 13 98, 174, 146, 136, 138, 100, 64, 101, 3, 240, 116, 101, 115, 116, 95, 101, 110, 99, 111, 100, 101, 95, 102, 114, 97, 109, 101] +DECODED_FRAME_KISS = { + 'source': 'W2GMD-1', + 'destination': 'OMG', + 'path': ['WIDE1-1', 'WIDE2-2'], + 'text': 'test_encode_frame' + } +ENCODED_FRAME_KISS = [192, 0, 158, 154, 142, 64, 64, 64, 96, 174, 100, 142, 154, 136, 64, 98, 174, 146, 136, 138, 98, + 64, 98, 174, 146, 136, 138, 100, 64, 101, 3, 240, 116, 101, 115, 116, 95, 101, 110, 99, 111, 100, + 101, 95, 102, 114, 97, 109, 101, 192] + +DECODED_FRAME_KISS_INVALID = { + 'source': 'KG6WTF', + 'destination': 'S7TSUV', + 'path': ['MTOSO-2', 'WIDE2*' 'qAR', 'KF6FIR-10'], + 'text': '`17El#X-/kg6wtf@gosselinfamily.com' + } +ENCODED_FRAME_KISS_INVALID = [192, 0, 166, 110, 168, 166, 170, 172, 96, 150, 142, 108, 174, 168, 140, 96, 154, 168, 158, + 166, 158, 64, 100, 174, 146, 136, 138, 100, 226, 130, 164, 224, 150, 140, 108, 140, 146, 164, + 117, 3, 240, 96, 49, 55, 69, 108, 35, 88, 45, 47, 107, 103, 54, 119, 116, 102, 64, 103, 111, + 115, 115, 101, 108, 105, 110, 102, 97, 109, 105, 108, 121, 46, 99, 111, 109, 192] + + class AprsTest(unittest.TestCase): # pylint: disable=R0904 """Tests for Python APRS-IS Bindings.""" @@ -77,6 +101,54 @@ class AprsTest(unittest.TestCase): # pylint: disable=R0904 self.fake_server = 'http://localhost:5567/' self.fake_callsign = 'KWN4YGH-5' + def test_read_frame_kiss(self): + kiss_mock = KissMock() + aprs_kiss = apex.aprs.AprsKiss(kiss_mock) + + kiss_mock.clear_interface() + kiss_mock.add_read_from_interface(ENCODED_FRAME_KISS) + translated_frame = None + iter_left = 1000 + while not translated_frame and iter_left > 0: + translated_frame = aprs_kiss.read() + iter_left -= 1 + + self.assertEqual(DECODED_FRAME_KISS, translated_frame) + + def test_write_frame_kiss(self): + kiss_mock = KissMock() + aprs_kiss = apex.aprs.AprsKiss(kiss_mock) + + kiss_mock.clear_interface() + aprs_kiss.write(DECODED_FRAME_KISS) + all_raw_frames = kiss_mock.get_sent_to_interface() + + self.assertEqual(ENCODED_FRAME_KISS, all_raw_frames[0]) + + def test_read_frame_kiss_invalid(self): + kiss_mock = KissMock() + aprs_kiss = apex.aprs.AprsKiss(kiss_mock) + + kiss_mock.clear_interface() + kiss_mock.add_read_from_interface(ENCODED_FRAME_KISS_INVALID) + translated_frame = None + iter_left = 1000 + while not translated_frame and iter_left > 0: + translated_frame = aprs_kiss.read() + iter_left -= 1 + + self.assertEqual(None, translated_frame) + + def test_write_frame_kiss_invalid(self): + kiss_mock = KissMock() + aprs_kiss = apex.aprs.AprsKiss(kiss_mock) + + kiss_mock.clear_interface() + aprs_kiss.write(DECODED_FRAME_KISS_INVALID) + all_raw_frames = kiss_mock.get_sent_to_interface() + + self.assertEqual(0, len(all_raw_frames)) + def test_encode_callsign(self): """ Tests encoding a callsign.