From fc037e1ce6c36b50144474c503adba5f009df301 Mon Sep 17 00:00:00 2001 From: Greg Albrecht <gba@onbeep.com> Date: Sun, 18 Aug 2013 23:00:55 -0700 Subject: [PATCH] more cleanup and work, full aprs frame decoding seems to be working. --- kiss/__init__.py | 129 ++++++++++++++++++++++- kiss/constants.py | 4 +- kiss/kiss.py | 117 --------------------- kiss/util.py | 262 +++++++++++++++------------------------------- setup.py | 6 +- 5 files changed, 220 insertions(+), 298 deletions(-) delete mode 100644 kiss/kiss.py diff --git a/kiss/__init__.py b/kiss/__init__.py index 0a30e41..cf6e358 100644 --- a/kiss/__init__.py +++ b/kiss/__init__.py @@ -1,9 +1,136 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +"""KISS Module for Python""" + __author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>' __copyright__ = 'Copyright 2013 OnBeep, Inc.' __license__ = 'Apache License 2.0' -from .kiss import KISS +import logging + +import serial + +import kiss.constants +import kiss.util + + +class KISS(object): + + """KISS Object Class.""" + + logger = logging.getLogger(__name__) + logger.setLevel(kiss.constants.LOG_LEVEL) + console_handler = logging.StreamHandler() + console_handler.setLevel(kiss.constants.LOG_LEVEL) + formatter = logging.Formatter(kiss.constants.LOG_FORMAT) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + logger.propagate = False + + def __init__(self, port, speed): + self.port = port + self.speed = speed + self.serial_int = None # TODO Potentially very f*cking unsafe. + + def start(self): + """ + Initializes the KISS device and commits configuration. + """ + self.logger.debug('start()') + self.serial_int = serial.Serial(self.port, self.speed) + self.serial_int.timeout = kiss.constants.SERIAL_TIMEOUT + + # http://en.wikipedia.org/wiki/KISS_(TNC)#Command_Codes + kiss_config = {} # TODO Yes, this isn't complete. + for setting in ['TX_DELAY', 'PERSISTENCE', 'SLOT_TIME', 'TX_TAIL', + 'FULL_DUPLEX']: + if kiss_config.get(setting): + self.write_setting(kiss_config[setting]) + + def write_setting(self, setting): + """ + Writes KISS Command Codes to attached device. + + :param setting: KISS Command Code to write. + """ + return self.serial_int.write( + kiss.constants.FEND + + kiss.constants.FULL_DUPLEX + + kiss.util.escape_special_chars(setting) + + kiss.constants.FEND + ) + + def read(self, callback=None): + """ + Reads data from KISS device. + + :param callback: Callback to call with decoded data. + """ + read_buffer = '' + while 1: + read_data = self.serial_int.read(kiss.constants.READ_BYTES) + + waiting_data = self.serial_int.inWaiting() + + if waiting_data: + read_data = ''.join([ + read_data, self.serial_int.read(waiting_data)]) + + if read_data: + frames = [] + + split_data = read_data.split(kiss.constants.FEND) + len_fend = len(split_data) + self.logger.debug('len_fend=%s', len_fend) + + # No FEND in frame + if len_fend == 1: + read_buffer = ''.join([read_buffer, split_data[0]]) + # Single FEND in frame + elif len_fend == 2: + # Closing FEND found + if split_data[0]: + # Partial frame continued, otherwise drop + frames.append(''.join([read_buffer, split_data[0]])) + read_buffer = '' + # Opening FEND found + else: + frames.append(read_buffer) + read_buffer = split_data[1] + # At least one complete frame received + elif len_fend >= 3: + for i in range(0, len_fend - 1): + _str = ''.join([read_buffer, split_data[i]]) + if _str: + frames.append(_str) + read_buffer = '' + if split_data[len_fend - 1]: + read_buffer = split_data[len_fend - 1] + + # Loop through received frames + for frame in frames: + if len(frame) and ord(frame[0]) == 0: + txt = kiss.util.format_aprs_frame(frame[1:]) + if txt: + self.logger.info('txt=%s', txt) + if callback: + callback(txt) + + def write(self, data): + """ + Writes data to KISS device. + + :param data: Data to write. + """ + raw = kiss.util.txt2raw(data) + self.logger.debug('raw=%s', raw) + frame = ''.join([ + kiss.constants.FEND, + kiss.constants.DATA_FRAME, + kiss.util.raw2kiss(raw), + kiss.constants.FEND + ]) + self.logger.debug('frame=%s', frame) + self.serial_int.write(frame) diff --git a/kiss/constants.py b/kiss/constants.py index cc1520a..fef74b5 100644 --- a/kiss/constants.py +++ b/kiss/constants.py @@ -11,7 +11,7 @@ __license__ = 'Apache License 2.0' import logging -LOG_LEVEL = logging.DEBUG +LOG_LEVEL = logging.INFO LOG_FORMAT = ('%(asctime)s %(levelname)s %(name)s.%(funcName)s:%(lineno)d' ' - %(message)s') SERIAL_TIMEOUT = 0.01 @@ -41,4 +41,4 @@ SLOT_TIME = chr(0x03) TX_TAIL = chr(0x04) FULL_DUPLEX = chr(0x05) SET_HARDWARE = chr(0x06) -RETURN = chr(0xFF) \ No newline at end of file +RETURN = chr(0xFF) diff --git a/kiss/kiss.py b/kiss/kiss.py deleted file mode 100644 index fd59072..0000000 --- a/kiss/kiss.py +++ /dev/null @@ -1,117 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -"""KISS Module for Python""" - -__author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>' -__copyright__ = 'Copyright 2013 OnBeep, Inc.' -__license__ = 'Apache License 2.0' - - -import logging - -import serial - -import kiss -import constants -import util - - -class KISS(object): - - logger = logging.getLogger(__name__) - logger.setLevel(kiss.constants.LOG_LEVEL) - console_handler = logging.StreamHandler() - console_handler.setLevel(kiss.constants.LOG_LEVEL) - formatter = logging.Formatter(kiss.constants.LOG_FORMAT) - console_handler.setFormatter(formatter) - logger.addHandler(console_handler) - - def __init__(self, port, speed): - self.port = port - self.speed = speed - - def start(self): - self.logger.debug('start()') - self.serial_int = serial.Serial(self.port, self.speed) - self.serial_int.timeout = kiss.constants.SERIAL_TIMEOUT - - # http://en.wikipedia.org/wiki/KISS_(TNC)#Command_Codes - kiss_config = {} - for setting in ['TX_DELAY', 'PERSISTENCE', 'SLOT_TIME', 'TX_TAIL', - 'FULL_DUPLEX']: - if kiss_config.get(setting): - self.write_setting(kiss_config[setting]) - - def write_setting(self, setting): - """ - TODO Finish this method. - """ - return self.serial_int.write( - kiss.constants.FEND + - kiss.constants.FULL_DUPLEX + - kiss.util.escape_special_chars(setting) + - kiss.constants.FEND - ) - - def read(self, cb=None): - read_buffer = '' - - while 1: - read_data = self.serial_int.read(kiss.constants.READ_BYTES) - - waiting_data = self.serial_int.inWaiting() - - if waiting_data: - read_data = ''.join([ - read_data, self.serial_int.read(waiting_data)]) - - if read_data: - frames = [] - - split_data = read_data.split(kiss.constants.FEND) - len_fend = len(split_data) - self.logger.debug('len_fend=%s', len_fend) - - # No FEND in frame - if len_fend == 1: - read_buffer = ''.join([read_buffer, split_data[0]]) - # Single FEND in frame - elif len_fend == 2: - # Closing FEND found - if split_data[0]: - # Partial frame continued, otherwise drop - frames.append(''.join([read_buffer, split_data[0]])) - read_buffer = '' - # Opening FEND found - else: - frames.append(read_buffer) - read_buffer = split_data[1] - # At least one complete frame received - elif len_fend >= 3: - for i in range(0, len_fend - 1): - st = ''.join([read_buffer, split_data[i]]) - - if st: - frames.append(st) - read_buffer = '' - - if split_data[len_fend - 1]: - read_buffer = split_data[len_fend - 1] - - # Loop through received frames - for frame in frames: - if len(frame) and ord(frame[0]) == 0: - txt = kiss.util.raw2txt(frame[1:]) - if txt: - self.logger.info('txt=%s', txt) - if cb: - cb(txt) - - def write(self, data): - raw = kiss.util.txt2raw(data) - self.logger.debug('raw=%s', raw) - frame = kiss.constants.FEND + kiss.constants.DATA_FRAME + kiss.util.raw2kiss(raw) + kiss.constants.FEND - self.logger.debug('frame=%s', frame) - self.serial_int.write(frame) - diff --git a/kiss/util.py b/kiss/util.py index bd04f70..1254252 100644 --- a/kiss/util.py +++ b/kiss/util.py @@ -10,8 +10,7 @@ __license__ = 'Apache License 2.0' import logging -import kiss -import constants +import kiss.constants logger = logging.getLogger(__name__) @@ -21,6 +20,7 @@ console_handler.setLevel(kiss.constants.LOG_LEVEL) formatter = logging.Formatter(kiss.constants.LOG_FORMAT) console_handler.setFormatter(formatter) logger.addHandler(console_handler) +logger.propagate = False def escape_special_chars(raw_char): @@ -36,13 +36,13 @@ def escape_special_chars(raw_char): """ logger.debug('raw_char=%s', raw_char) kiss_char = raw_char.replace( - constants.FEND, - constants.FEND_TFEND + kiss.constants.FEND, + kiss.constants.FEND_TFEND ).replace( - constants.FESC, - constants.FESC_TFESC + kiss.constants.FESC, + kiss.constants.FESC_TFESC ) - logger.debug("kiss_char=%s" % kiss_char) + logger.debug('kiss_char=%s', kiss_char) return kiss_char @@ -85,123 +85,98 @@ def valid_callsign(callsign): return False +# TODO Add example raw frame. def extract_callsign(raw_frame): """ - Extracts callsign from raw frame. + Extracts callsign from a raw KISS frame. - Parameters: - raw_frame: guess... - """ - logger.debug('raw_frame=%s', raw_frame) - callsign = '' + Test & Example: - for i in range(0, 6): - ch = chr(ord(raw_frame[i]) >> 1) - if ch == ' ': - break - callsign = ''.join([callsign, ch]) + >>> raw_frame = '' + >>> a = extract_callsign(raw_frame) + >>> a + {'callsign': 'W2GMD', 'ssid': 10} - ssid = (ord(raw_frame[6]) >> 1) & 0x0f - - if callsign.isalnum(): - if ssid > 0: - callsign = '-'.join([callsign, str(ssid)]) - else: - callsign = '' + :param raw_frame: Raw KISS Frame to decode. + :returns: Dict of callsign and ssid. + :rtype: dict + """ + logger.debug('raw_frame=%s', raw_frame) + callsign = ''.join([chr(ord(x) >> 1) for x in raw_frame[:6]]).strip() + ssid = (ord(raw_frame[6]) >> 1) & 0x0f logger.debug('ssid=%s callsign=%s', ssid, callsign) - return callsign - - -def hdump(hstr): - logger.debug('hstr=%s', hstr) - - i = 0 - k = 0 - - word1 = '' - word2 = '' - - for pstr in hstr: - rstr = ord(pstr) - word1 += '%02X ' % rstr - - if rstr < 32 or rstr > 127: - word2 += '.' - else: - word2 += pstr - - i += 1 - - if i == 16: - logger.debug('%04X %s %s', k, word1, word2) - word1 = '' - word2 = '' - i = 0 - k += 16 - - if not i == 0: - logger.debug('%04X %-48s %s', k, word1, word2) - logger.debug('%04X %-48s %s', k, word1, word2) + return {'callsign': callsign, 'ssid': ssid} -def raw2txt(raw): - logger.debug('raw=%s', raw) - hdump(raw) - - # Is it too short? - if len(raw) < 16: - hdump(raw) - return '' - - raw1 = '' - - for i in range(0, len(raw)): - if ord(raw[i]) & 0x01: - break - - # Is address field length correct? - if not ((i + 1) % 7) == 0: - return '' - - n = (i + 1) / 7 - - # Less than 2 callsigns? - if n < 2 or n > 10: - return '' +def full_callsign(raw_frame): + """ + Extract raw frame and returns full callsign (call + ssid). - if (i + 1) % 7 == 0 and n >= 2 and ord(raw[i + 1]) & 0x03 == 0x03 and ord(raw[i + 2]) == 0xf0: - strinfo = raw[i + 3:] + :param raw_frame: Raw KISS Frame to extract callsign from. + :returns: Callsign[-SSID]. + :rtype: str + """ + extracted = extract_callsign(raw_frame) + if extracted['ssid'] > 0: + return '-'.join([extracted['callsign'], str(extracted['ssid'])]) + else: + return extracted['callsign'] - if len(strinfo): - strto = extract_callsign(raw) - if strto == '': - return '' +def extract_path(start, raw_frame): + full_path = [] - strfrom = extract_callsign(raw[7:]) + for i in range(2, start): + path = full_callsign(raw_frame[i * 7:]) + if path: + if ord(raw_frame[i * 7 + 6]) & 0x80: + full_path.append(''.join([path, '*'])) + else: + full_path.append(path) + return full_path - if strfrom == '' or valid_callsign(strfrom): - return '' - raw1 = '>'.join([strfrom, strto]) +def format_path(start, raw_frame): + return ','.join(extract_path(start, raw_frame)) - for i in range(2, n): - s = extract_callsign(raw[i * 7:]) - if s == '': - hdump(raw) - return '' +def decode_aprs_frame(raw_frame): + logger.debug('raw_frame=%s', raw_frame) + decoded_frame = {} - raw1 += ''.join([',', s]) + frame_len = len(raw_frame) + logger.debug('frame_len=%s', frame_len) + + if frame_len > 16: + for raw_slice in range(0, frame_len): + # Is address field length correct? + if ord(raw_frame[raw_slice]) & 0x01 and ((raw_slice + 1) % 7) == 0: + n = (raw_slice + 1) / 7 + # Less than 2 callsigns? + if n >= 2 and n < 10: + logger.debug('n=%s', n) + break + + if (ord(raw_frame[raw_slice + 1]) & 0x03 == 0x03 and + ord(raw_frame[raw_slice + 2]) == 0xf0): + decoded_frame['text'] = raw_frame[raw_slice + 3:] + decoded_frame['destination'] = full_callsign(raw_frame) + decoded_frame['source'] = full_callsign(raw_frame[7:]) + decoded_frame['path'] = format_path(n, raw_frame) - if ord(raw[i * 7 + 6]) & 0x80: - raw1 += '*' + return decoded_frame - raw1 += ''.join([':', strinfo]) - logger.debug('raw1=%s', raw1) - return raw1 +def format_aprs_frame(raw_frame): + logger.debug('raw_frame=%s', raw_frame) + decoded_frame = decode_aprs_frame(raw_frame) + formatted_frame = '>'.join([ + decoded_frame['source'], decoded_frame['destination']]) + formatted_frame = ','.join([formatted_frame, decoded_frame['path']]) + formatted_frame = ':'.join([formatted_frame, decoded_frame['text']]) + logger.debug('formatted_frame=%s', formatted_frame) + return formatted_frame def kk2(ctxt): @@ -260,7 +235,13 @@ def txt2raw(s): if len(w2[i]) > 1: r += kk2(w2[i]) - rr = r[:-1] + chr(ord(r[-1]) | 0x01) + kiss.constants.SLOT_TIME + chr(0xf0) + inf + rr = ''.join([ + r[:-1], + chr(ord(r[-1]) | 0x01), + kiss.constants.SLOT_TIME, + chr(0xf0), + inf + ]) return rr @@ -270,76 +251,7 @@ def raw2kiss(raw): Inspired by dixprs. """ - logger.debug(locals()) return raw.replace( - kiss.constants.FEND, ''.join([kiss.constants.FESC, kiss.constants.TFEND]) + kiss.constants.FEND, + ''.join([kiss.constants.FESC, kiss.constants.TFEND]) ).replace(kiss.constants.FEND, kiss.constants.FESC_TFESC) - - -def decode_aprs_frame(frame): - """ - Decodes an APRS frame into its constituent parts. - - Inspired by dixprs. - """ - decoded_frame = {} - if ':' in frame and '>' in frame: - split_frame = frame.split(':') - if len(split_frame) == 2: - call_signs, decoded_frame['text'] = split_frame - decoded_frame['source'], other_calls = call_signs.split('>') - decoded_frame['digis'] = other_calls.split(',') - decoded_frame['destination'] = decoded_frame['digis'][0] - else: - logger.error('More than two semi-colons: %s', split_frame) - logger.debug('decoded_frame=%s', decoded_frame) - return decoded_frame - - -def is_invalid_call(s): - w = s.split('-') - - if len(w) > 2: - return True - - if len(w[0]) < 1 or len(w[0]) > 6: - return True - - for p in w[0]: - if not (p.isalpha() or p.isdigit()): - return True - - if w[0].isalpha() or w[0].isdigit(): - return True - - if len(w) == 2: - try: - ssid = int(w[1]) - - if ssid < 0 or ssid > 15: - return True - - except ValueError: - return True - - return False - - -def is_direct(frm): - for p in frm[2]: - if p[1] <> 0: - return False - - return True - - -def get_hops(frm): - n = 0 - - for p in frm[2]: - if p[1] == 0: - break - - n += 1 - - return n diff --git a/setup.py b/setup.py index ffb929f..640d6e6 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ def read_readme(): setuptools.setup( name='kiss', - version='0.0.1', + version='0.0.2', description=('KISS is a protocol for communicating with a serial TNC ' 'device used for Amateur Radio.'), author='Greg Albrecht', @@ -31,8 +31,8 @@ setuptools.setup( long_description=('A Python implementation of the KISS Protocol for ' 'communicating with serial TNC devices for use with ' 'Amateur Radio.'), - license='Apache License 2.0', - copyright='Copyright 2013 OnBeep, Inc.', + license='See LICENSE.txt', + copyright='See COPYRIGHT.txt', url='https://github.com/ampledata/kiss', setup_requires=['nose'], tests_require=['coverage', 'nose'], -- GitLab