Skip to content
Snippets Groups Projects
Commit 1590132c authored by Greg Albrecht's avatar Greg Albrecht
Browse files

Complete rewrite.

parent 7c5496e0
No related branches found
No related tags found
No related merge requests found
File moved
A pure-Python KISS module partially based off the work on dixprs by HA5DI,
et al.
KISS is a protocol for communicating with a serial TNC device used for Amateur Radio.
A Python implementation of the KISS Protocol for communicating with serial TNC devices for use with Amateur Radio.
A pure-Python KISS module partially based off the work on dixprs by HA5DI, et al.
dixprs: https://sites.google.com/site/dixprs/
As this project still uses components derived from dixprs, portions fall
under License #2 in LICENSE.txt
Otherwise, non-derived portions are covered by License #1 in LICENSE.txt.
\ No newline at end of file
Otherwise, non-derived portions are covered by License #1 in LICENSE.txt.
description=(''),
long_description=(''),
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""KISS Module for Python"""
# KISS Python Module.
__author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>'
__copyright__ = 'Copyright 2013 OnBeep, Inc.'
__license__ = 'Apache License 2.0'
import logging
import serial
import kiss.constants
import kiss.util
class KISS(object):
"""KISS Object Class."""
"""
KISS Python Module.
~~~~
logger = logging.getLogger(__name__)
logger.setLevel(kiss.constants.LOG_LEVEL)
console_handler = logging.StreamHandler()
console_handler.setLevel(kiss.constants.LOG_LEVEL)
formatter = logging.Formatter(kiss.constants.LOG_FORMAT)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.propagate = False
def __init__(self, port, speed):
self.port = port
self.speed = speed
self.serial_int = None # TODO Potentially very f*cking unsafe.
:author: Greg Albrecht W2GMD <gba@onbeep.com>
:copyright: Copyright 2013 OnBeep, Inc.
:license: Apache 2.0, see LICENSE for details.
:source: <https://github.com/ampledata/kiss>
def start(self):
"""
Initializes the KISS device and commits configuration.
"""
self.logger.debug('start()')
self.serial_int = serial.Serial(self.port, self.speed)
self.serial_int.timeout = kiss.constants.SERIAL_TIMEOUT
"""
# http://en.wikipedia.org/wiki/KISS_(TNC)#Command_Codes
kiss_config = {} # TODO Yes, this isn't complete.
for setting in ['TX_DELAY', 'PERSISTENCE', 'SLOT_TIME', 'TX_TAIL',
'FULL_DUPLEX']:
if kiss_config.get(setting):
self.write_setting(kiss_config[setting])
def write_setting(self, setting):
"""
Writes KISS Command Codes to attached device.
:param setting: KISS Command Code to write.
"""
return self.serial_int.write(
kiss.constants.FEND +
kiss.constants.FULL_DUPLEX +
kiss.util.escape_special_chars(setting) +
kiss.constants.FEND
)
def read(self, callback=None):
"""
Reads data from KISS device.
:param callback: Callback to call with decoded data.
"""
read_buffer = ''
while 1:
read_data = self.serial_int.read(kiss.constants.READ_BYTES)
waiting_data = self.serial_int.inWaiting()
if waiting_data:
read_data = ''.join([
read_data, self.serial_int.read(waiting_data)])
__title__ = 'kiss'
__version__ = '0.0.3'
__build__ = '0x000003'
__author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>'
__license__ = 'Apache 2.0'
__copyright__ = 'Copyright 2013 OnBeep, Inc.'
if read_data:
frames = []
split_data = read_data.split(kiss.constants.FEND)
len_fend = len(split_data)
self.logger.debug('len_fend=%s', len_fend)
import logging
# No FEND in frame
if len_fend == 1:
read_buffer = ''.join([read_buffer, split_data[0]])
# Single FEND in frame
elif len_fend == 2:
# Closing FEND found
if split_data[0]:
# Partial frame continued, otherwise drop
frames.append(''.join([read_buffer, split_data[0]]))
read_buffer = ''
# Opening FEND found
else:
frames.append(read_buffer)
read_buffer = split_data[1]
# At least one complete frame received
elif len_fend >= 3:
for i in range(0, len_fend - 1):
_str = ''.join([read_buffer, split_data[i]])
if _str:
frames.append(_str)
read_buffer = ''
if split_data[len_fend - 1]:
read_buffer = split_data[len_fend - 1]
from .classes import KISS
# Loop through received frames
for frame in frames:
if len(frame) and ord(frame[0]) == 0:
txt = kiss.util.format_aprs_frame(frame[1:])
if txt:
self.logger.info('txt=%s', txt)
if callback:
callback(txt)
def write(self, data):
"""
Writes data to KISS device.
# Set default logging handler to avoid "No handler found" warnings.
try: # Python 2.7+
from logging import NullHandler
except ImportError:
class NullHandler(logging.Handler):
def emit(self, record):
pass
:param data: Data to write.
"""
raw = kiss.util.txt2raw(data)
self.logger.debug('raw=%s', raw)
frame = ''.join([
kiss.constants.FEND,
kiss.constants.DATA_FRAME,
kiss.util.raw2kiss(raw),
kiss.constants.FEND
])
self.logger.debug('frame=%s', frame)
self.serial_int.write(frame)
logging.getLogger(__name__).addHandler(NullHandler())
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""KISS Core Classes."""
__author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>'
__copyright__ = 'Copyright 2013 OnBeep, Inc.'
__license__ = 'Apache 2.0'
import logging
import serial
import kiss.constants
import kiss.util
class KISS(object):
"""KISS Object Class."""
logger = logging.getLogger(__name__)
logger.setLevel(kiss.constants.LOG_LEVEL)
console_handler = logging.StreamHandler()
console_handler.setLevel(kiss.constants.LOG_LEVEL)
formatter = logging.Formatter(kiss.constants.LOG_FORMAT)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.propagate = False
def __init__(self, port, speed):
self.port = port
self.speed = speed
self.serial_int = None # TODO Potentially very f*cking unsafe.
def start(self):
"""
Initializes the KISS device and commits configuration.
"""
self.logger.debug('start()')
self.serial_int = serial.Serial(self.port, self.speed)
self.serial_int.timeout = kiss.constants.SERIAL_TIMEOUT
# http://en.wikipedia.org/wiki/KISS_(TNC)#Command_Codes
kiss_config = {} # TODO Yes, this isn't complete.
for setting in ['TX_DELAY', 'PERSISTENCE', 'SLOT_TIME', 'TX_TAIL',
'FULL_DUPLEX']:
if kiss_config.get(setting):
self.write_setting(kiss_config[setting])
def write_setting(self, setting):
"""
Writes KISS Command Codes to attached device.
:param setting: KISS Command Code to write.
"""
return self.serial_int.write(
kiss.constants.FEND +
kiss.constants.FULL_DUPLEX +
kiss.util.escape_special_codes(setting) +
kiss.constants.FEND
)
def read(self, callback=None):
"""
Reads data from KISS device.
:param callback: Callback to call with decoded data.
"""
read_buffer = ''
while 1:
read_data = self.serial_int.read(kiss.constants.READ_BYTES)
waiting_data = self.serial_int.inWaiting()
if waiting_data:
read_data = ''.join([
read_data, self.serial_int.read(waiting_data)])
if read_data:
frames = []
split_data = read_data.split(kiss.constants.FEND)
len_fend = len(split_data)
self.logger.debug('len_fend=%s', len_fend)
# No FEND in frame
if len_fend == 1:
read_buffer = ''.join([read_buffer, split_data[0]])
# Single FEND in frame
elif len_fend == 2:
# Closing FEND found
if split_data[0]:
# Partial frame continued, otherwise drop
frames.append(''.join([read_buffer, split_data[0]]))
read_buffer = ''
# Opening FEND found
else:
frames.append(read_buffer)
read_buffer = split_data[1]
# At least one complete frame received
elif len_fend >= 3:
for i in range(0, len_fend - 1):
_str = ''.join([read_buffer, split_data[i]])
if _str:
frames.append(_str)
read_buffer = ''
if split_data[len_fend - 1]:
read_buffer = split_data[len_fend - 1]
# Loop through received frames
for frame in frames:
if len(frame) and ord(frame[0]) == 0:
with open('full_frame.log', 'a+') as full_frame:
full_frame.write(frame[1:] + '\n')
self.logger.info('frame=%s', frame)
if callback:
callback(frame)
def write(self, frame):
"""
Writes frame to KISS device.
:param frame: Frame to write.
"""
return self.serial_int.write(''.join([
kiss.constants.FEND,
kiss.constants.DATA_FRAME,
kiss.util.escape_special_codes(frame),
kiss.constants.FEND
]))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Constants for KISS Module."""
"""Constants for KISS Python Module."""
__author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>'
__copyright__ = 'Copyright 2013 OnBeep, Inc.'
__license__ = 'Apache License 2.0'
__license__ = 'Apache 2.0'
import logging
......@@ -14,6 +14,7 @@ import logging
LOG_LEVEL = logging.INFO
LOG_FORMAT = ('%(asctime)s %(levelname)s %(name)s.%(funcName)s:%(lineno)d'
' - %(message)s')
SERIAL_TIMEOUT = 0.01
READ_BYTES = 1000
......@@ -24,14 +25,12 @@ FESC = chr(0xDB)
TFEND = chr(0xDC)
TFESC = chr(0xDD)
# "FEND is sent as FESC, TFEND"
FEND_TFEND = ''.join([FEND, TFEND])
FESC_TFEND = ''.join([FESC, TFEND])
# "FESC is sent as FESC, TFESC"
FESC_TFESC = ''.join([FESC, TFESC])
# KISS Command Codes
# http://en.wikipedia.org/wiki/KISS_(TNC)#Command_Codes
DATA_FRAME = chr(0x00)
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Utilities for the Python KISS Module."""
"""Utilities for the KISS Python Module."""
__author__ = 'Greg Albrecht word2GMD <gba@onbeep.com>'
__author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>'
__copyright__ = 'Copyright 2013 OnBeep, Inc.'
__license__ = 'Apache License 2.0'
__license__ = 'Apache 2.0'
import logging
......@@ -23,235 +23,19 @@ logger.addHandler(console_handler)
logger.propagate = False
def escape_special_chars(raw_char):
def escape_special_codes(raw_codes):
"""
Escape special characters, per KISS spec.
Escape special codes, per KISS spec.
"If the FEND or FESC codes appear in the data to be transferred, they
need to be escaped. The FEND code is then sent as FESC, TFEND and the
FESC is then sent as FESC, TFESC."
- http://en.wikipedia.org/wiki/KISS_(TNC)#Description
Borrowed from dixprs.
"""
logger.debug('raw_char=%s', raw_char)
kiss_char = raw_char.replace(
kiss.constants.FEND,
kiss.constants.FEND_TFEND
).replace(
return raw_codes.replace(
kiss.constants.FESC,
kiss.constants.FESC_TFESC
)
logger.debug('kiss_char=%s', kiss_char)
return kiss_char
def valid_callsign(callsign):
"""
Validates callsign.
Parameters:
callsign: Callsign candidate.
Returns:
bool
"""
logger.debug('callsign=%s', callsign)
split_cs = callsign.split('-')
if len(split_cs) > 2:
return True
if len(split_cs[0]) < 1 or len(split_cs[0]) > 6:
return True
for p in split_cs[0]:
if not (p.isalpha() or p.isdigit()):
return True
if split_cs[0].isalpha() or split_cs[0].isdigit():
return True
if len(split_cs) == 2:
try:
ssid = int(split_cs[1])
if ssid < 0 or ssid > 15:
return True
except ValueError:
return True
return False
# TODO Add example raw frame.
def extract_callsign(raw_frame):
"""
Extracts callsign from a raw KISS frame.
Test & Example:
>>> raw_frame = ''
>>> a = extract_callsign(raw_frame)
>>> a
{'callsign': 'W2GMD', 'ssid': 10}
:param raw_frame: Raw KISS Frame to decode.
:returns: Dict of callsign and ssid.
:rtype: dict
"""
logger.debug('raw_frame=%s', raw_frame)
callsign = ''.join([chr(ord(x) >> 1) for x in raw_frame[:6]]).strip()
ssid = (ord(raw_frame[6]) >> 1) & 0x0f
logger.debug('ssid=%s callsign=%s', ssid, callsign)
return {'callsign': callsign, 'ssid': ssid}
def full_callsign(raw_frame):
"""
Extract raw frame and returns full callsign (call + ssid).
:param raw_frame: Raw KISS Frame to extract callsign from.
:returns: Callsign[-SSID].
:rtype: str
"""
extracted = extract_callsign(raw_frame)
if extracted['ssid'] > 0:
return '-'.join([extracted['callsign'], str(extracted['ssid'])])
else:
return extracted['callsign']
def extract_path(start, raw_frame):
full_path = []
for i in range(2, start):
path = full_callsign(raw_frame[i * 7:])
if path:
if ord(raw_frame[i * 7 + 6]) & 0x80:
full_path.append(''.join([path, '*']))
else:
full_path.append(path)
return full_path
def format_path(start, raw_frame):
return ','.join(extract_path(start, raw_frame))
def decode_aprs_frame(raw_frame):
logger.debug('raw_frame=%s', raw_frame)
decoded_frame = {}
frame_len = len(raw_frame)
logger.debug('frame_len=%s', frame_len)
if frame_len > 16:
for raw_slice in range(0, frame_len):
# Is address field length correct?
if ord(raw_frame[raw_slice]) & 0x01 and ((raw_slice + 1) % 7) == 0:
n = (raw_slice + 1) / 7
# Less than 2 callsigns?
if n >= 2 and n < 10:
logger.debug('n=%s', n)
break
if (ord(raw_frame[raw_slice + 1]) & 0x03 == 0x03 and
ord(raw_frame[raw_slice + 2]) == 0xf0):
decoded_frame['text'] = raw_frame[raw_slice + 3:]
decoded_frame['destination'] = full_callsign(raw_frame)
decoded_frame['source'] = full_callsign(raw_frame[7:])
decoded_frame['path'] = format_path(n, raw_frame)
return decoded_frame
def format_aprs_frame(raw_frame):
logger.debug('raw_frame=%s', raw_frame)
decoded_frame = decode_aprs_frame(raw_frame)
formatted_frame = '>'.join([
decoded_frame['source'], decoded_frame['destination']])
formatted_frame = ','.join([formatted_frame, decoded_frame['path']])
formatted_frame = ':'.join([formatted_frame, decoded_frame['text']])
logger.debug('formatted_frame=%s', formatted_frame)
return formatted_frame
def kk2(ctxt):
logger.debug(locals())
if ctxt[-1] == '*':
s = ctxt[:-1]
digi = True
else:
s = ctxt
digi = False
ssid = 0
w1 = s.split('-')
call = w1[0]
while len(call) < 6:
call += ' '
r = ''
for p in call:
r += chr(ord(p) << 1)
if not len(w1) == 1:
try:
ssid = int(w1[1])
except ValueError:
return ''
ct = (ssid << 1) | 0x60
if digi:
ct |= 0x80
return r + chr(ct)
def txt2raw(s):
logger.debug(locals())
ix = s.find(':')
if ix:
hdr = s[:ix]
inf = s[ix + 1:]
w1 = hdr.split('>')
call_from = w1[0]
w2 = w1[1].split(',')
call_to = w2[0]
r = kk2(call_to) + kk2(call_from)
for i in range(1, len(w2)):
if len(w2[i]) > 1:
r += kk2(w2[i])
rr = ''.join([
r[:-1],
chr(ord(r[-1]) | 0x01),
kiss.constants.SLOT_TIME,
chr(0xf0),
inf
])
return rr
def raw2kiss(raw):
"""
Escape special characters to make it binary transparent.
Inspired by dixprs.
"""
return raw.replace(
).replace(
kiss.constants.FEND,
''.join([kiss.constants.FESC, kiss.constants.TFEND])
).replace(kiss.constants.FEND, kiss.constants.FESC_TFESC)
kiss.constants.FESC_TFEND
)
......@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
"""
Setup for kiss.
Setup for the KISS Python Module.
Source:: https://github.com/ampledata/kiss
"""
......@@ -12,29 +12,38 @@ __copyright__ = 'Copyright 2013 Onbeep, Inc.'
__license__ = 'Apache License 2.0'
import setuptools
import os
import sys
import kiss
def read_readme():
"""Reads in README file for use in setuptools."""
with open('README.rst') as rmf:
rmf.read()
try:
from setuptools import setup
except ImportError:
from distutils.core import setup
setuptools.setup(
def publish():
if sys.argv[-1] == 'publish':
os.system('python setup.py sdist upload')
sys.exit()
publish()
setup(
name='kiss',
version='0.0.2',
description=('KISS is a protocol for communicating with a serial TNC '
'device used for Amateur Radio.'),
version=kiss.__version__,
description='KISS Python Module.',
long_description=open('README.rst').read(),
author='Greg Albrecht',
author_email='gba@onbeep.com',
long_description=('A Python implementation of the KISS Protocol for '
'communicating with serial TNC devices for use with '
'Amateur Radio.'),
license='See LICENSE.txt',
copyright='See COPYRIGHT.txt',
license=open('LICENSE').read(),
url='https://github.com/ampledata/kiss',
setup_requires=['nose'],
tests_require=['coverage', 'nose'],
install_requires=['pyserial']
install_requires=['pyserial'],
package_dir={'kiss': 'kiss'},
zip_safe=False
)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Constants for KISS Module Tests."""
__author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>'
__copyright__ = 'Copyright 2013 OnBeep, Inc.'
__license__ = 'Apache 2.0'
TEST_FRAMES = 'tests/test_frames.log'
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
sys.path.insert(0, os.path.abspath('..'))
import kiss
dh`d@lb@c!3745.75NI12228.05W#W2GMD-6 Inner Sunset, SF iGate/Digipeater http://w2gmd.org
dh`d@lb@cT#939,10.9,4.5,57.0,1.0,18.0,00000000
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Tests for KISS Module."""
__author__ = 'Greg Albrecht W2GMD <gba@onbeep.com>'
__copyright__ = 'Copyright 2013 OnBeep, Inc.'
__license__ = 'Apache 2.0'
import unittest
from .context import kiss
from . import constants
class KISSTestCase(unittest.TestCase):
def setUp(self):
"""Setup."""
self.test_frames = open(constants.TEST_FRAMES, 'r')
self.test_frame = self.test_frames.readlines()[0].strip()
def tearDown(self):
"""Teardown."""
self.test_frames.close()
def test_escape_special_codes_fend(self):
"""
Tests `kiss.util.escape_special_codes` util function.
"""
fend = kiss.util.escape_special_codes(kiss.constants.FEND)
self.assertEqual(fend, kiss.constants.FESC_TFEND)
def test_escape_special_codes_fesc(self):
"""
Tests `kiss.util.escape_special_codes` util function.
"""
fesc = kiss.util.escape_special_codes(kiss.constants.FESC)
self.assertEqual(fesc, kiss.constants.FESC_TFESC)
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment