Source code for pprzlink.serial

#
# This file is part of PPRZLINK.
# 
# PPRZLINK is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PPRZLINK is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with PPRZLINK.  If not, see <https://www.gnu.org/licenses/>.
#

from __future__ import absolute_import, division, print_function

import threading
import serial
import logging
import time
import struct
from enum import Enum
from typing import List, Tuple, Dict, Any

from pprzlink.message import PprzMessage
from pprzlink.pprz_transport import PprzTransport


logger = logging.getLogger("PprzLink")


[docs]class SerialMessagesInterface(threading.Thread): def __init__(self, callback, verbose=False, device='/dev/ttyUSB0', baudrate=115200, msg_class='telemetry', interface_id=0): threading.Thread.__init__(self) self.callback = callback self.verbose = verbose self.msg_class = msg_class self.id = interface_id self.running = True try: self.ser = serial.Serial(device, baudrate, timeout=1.0) except serial.SerialException: logger.error("Error: unable to open serial port '%s'" % device) exit(0) self.trans = PprzTransport(msg_class) def stop(self): logger.info("End thread and close serial link") self.running = False self.ser.close() def shutdown(self): self.stop() def __del__(self): try: self.ser.close() except: pass
[docs] def send(self, msg, sender_id=None, receiver_id=0, component_id=0): """ Send a message over a serial link""" if sender_id is None: sender_id = self.id if isinstance(msg, PprzMessage): data = self.trans.pack_pprz_msg(sender_id, msg, receiver_id, component_id) self.ser.write(data) self.ser.flush()
[docs] def run(self): """Thread running function""" try: while self.running: # Parse incoming data c = self.ser.read(1) if len(c) == 1: if self.trans.parse_byte(c): try: (sender_id, receiver_id, component_id, msg) = self.trans.unpack() except ValueError as e: logger.warning("Ignoring unknown message, %s" % e) else: if self.verbose: # See the note on the same line in v1.0 logger.info("New incoming message '%s' from %i (%i) to %i" % (msg.name, sender_id, component_id, receiver_id)) # Callback function on new message if self.id == receiver_id: self.callback(sender_id, msg) except StopIteration: pass
class XbeeMessagesInterface(threading.Thread): XBEE_API_START = 0x7E GROUND_STATION_ADDR = 0x0100 DEFAULT_TIMEOUT = 0.1 class RxState(Enum): WAIT_START = 0 GET_LEN = 1 GET_FRAME_DATA = 2 class ATStatus(Enum): Ok = 0 Error = 1 InvalidCmd = 2 InvalidParam = 3 class APITypes(Enum): MODEM_STATUS = 0x8A AT_CMD = 0x08 AT_CMD_QU = 0x09 AT_CMD_RES = 0x88 REMOTE_AT_REQ = 0x17 REMOTE_AT_RES = 0x97 TX_64 = 0x00 TX_16 = 0x01 TX_STATUS = 0x89 RX_64 = 0x80 RX_16 = 0x81 RX_IO_64 = 0x82 RX_IO_16 = 0x83 @staticmethod def from16(nb: int): return nb.to_bytes(2, 'big') @staticmethod def to16(data: bytes): return int.from_bytes(data, 'big') def __init__(self, callback, id, port, baudrate=57600, verbose=False): threading.Thread.__init__(self) self.id = id if id == 0: self.id = self.GROUND_STATION_ADDR self.ser = serial.Serial(port, baudrate) self.rx_state = XbeeMessagesInterface.RxState.WAIT_START self.bytes_needed = 1 self.running = True self._frame_id = 0 self.responses = {} # type: Dict[int, Any] self.verbose = verbose self.callback = callback self.buffer = bytearray() def start(self) -> None: threading.Thread.start(self) self.at_cmd(b'MY', self.from16(self.id), resp=False) def stop(self): if self.running: self.running = False time.sleep(0.1) self.ser.close() def __del__(self): self.stop() @staticmethod def checksum(frame): return 0xFF - (sum(frame) & 0xFF) @property def frame_id(self): self._frame_id = (self._frame_id + 1) % 0xFF return self._frame_id def send(self, msg: PprzMessage, sender_id=None, receiver_id: int = 0, component_id=0, ack=False): """ msg: PprzMessage destination: integer. 0 is the ground station, 0xFF is the broadcast address Can also be > 0XFF (<0xFFFF) (but not for drones). receiver id will be set to 0 (like ground station). """ if not isinstance(msg, PprzMessage): raise TypeError("not a PprzMessage") if not (isinstance(receiver_id, int) and receiver_id < 0xFFFF): raise TypeError("destination invalid") if sender_id is None: msg_sender = self.id else: msg_sender = sender_id if self.id == self.GROUND_STATION_ADDR: msg_sender = 0 xbee_dest = receiver_id if receiver_id == 0xFF: # broadcast : xbee use 0xFFFF xbee_dest = 0xFFFF elif receiver_id == 0: # ground station xbee_dest = self.GROUND_STATION_ADDR elif receiver_id > 0xFF: receiver_id = 0 # for non-drone destinations, assume its ground class_comp = msg.class_id | (component_id << 4) header = struct.pack("<BBBB", msg_sender, receiver_id, class_comp, msg.msg_id) payload = header + msg.payload_to_binary() self.send_data(xbee_dest, payload, ack=ack) def rx_16_cb(self, source, rssi, options, data: bytes): sender = data[0] receiver = data[1] class_id = data[2] & 0x0F component_id = data[2] >> 4 msg_id = data[3] msg_payload = data[4:] if (receiver == self.id or # message addressed to me receiver == 0xFF or # broadcast message (receiver == 0 and self.id == self.GROUND_STATION_ADDR)): # message to ground station msg = PprzMessage(class_id, msg_id) msg.binary_to_payload(msg_payload) self.callback(sender, msg) def handle_rcv(self, frame): frame_type = frame[0] if frame_type == self.APITypes.MODEM_STATUS.value: pass # TODO elif int(frame_type) == self.APITypes.AT_CMD_RES.value: frame_id = frame[1] at_cmd = frame[2:4] status = self.ATStatus(frame[4]) data = frame[5:] self.responses[frame_id] = (status, data) elif frame_type == self.APITypes.RX_16.value: source_addr = self.to16(frame[1:3]) rssi = frame[3] options = frame[4] data = frame[5:] self.rx_16_cb(source_addr, rssi, options, data) elif frame_type == self.APITypes.TX_STATUS.value: frame_id = frame[1] status = frame[2] self.responses[frame_id] = status else: print("unknown frame type:", hex(frame_type)) def send_data(self, dest, data, ack=False, options=0): if ack: frame_id = self.frame_id else: frame_id = 0 frame = struct.pack(">BBHB", self.APITypes.TX_16.value, frame_id, dest, options) + data self.transmit_frame(frame) def get_response_timeout(self, frame_id, timeout=DEFAULT_TIMEOUT) -> Any: if frame_id == 0: return None start_time = time.time() while frame_id not in self.responses: if time.time() - start_time > timeout: raise TimeoutError() time.sleep(0.01) return self.responses.pop(frame_id) def at_cmd(self, at, value=None, resp=True): if resp: frame_id = self.frame_id else: frame_id = 0 cmd = struct.pack("<BB", 0x08, frame_id) + at if value is not None: cmd += value self.transmit_frame(cmd) return self.get_response_timeout(frame_id) def transmit_frame(self, frame): data = struct.pack(">BH", self.XBEE_API_START, len(frame)) + frame + struct.pack("<B", self.checksum(frame)) self.ser.write(data) # self.ser.flush() def run(self): while self.running: self.buffer.extend(self.ser.read()) if len(self.buffer) < self.bytes_needed: continue data = self.buffer[0:self.bytes_needed] self.buffer = self.buffer[self.bytes_needed:] if self.rx_state == self.RxState.WAIT_START: if data[0] == self.XBEE_API_START: self.rx_state = self.RxState.GET_LEN self.bytes_needed = 2 elif self.rx_state == self.RxState.GET_LEN: self.bytes_needed = ((data[0] << 8) + data[1]) + 1 # Add 1 byte for the checksum self.rx_state = self.RxState.GET_FRAME_DATA elif self.rx_state == self.RxState.GET_FRAME_DATA: frame = data[:-1] chk = data[-1] if self.checksum(frame) == chk: self.handle_rcv(frame) else: print("invalid chk: {} {}".format(sum(frame), chk)) self.rx_state = self.RxState.WAIT_START self.bytes_needed = 1 print("stopped") def test(): ''' run test program as a module to avoid namespace conflicts with serial module: python -p pprzlink.serial pprzlink should be installed in a python standard path or included to your PYTHONPATH ''' import time import argparse from pprzlink import messages_xml_map parser = argparse.ArgumentParser() parser.add_argument("-f", "--file", help="path to messages.xml file") parser.add_argument("-c", "--class", help="message class", dest='msg_class', default='telemetry') parser.add_argument("-d", "--device", help="device name", dest='dev', default='/dev/ttyUSB0') parser.add_argument("-b", "--baudrate", help="baudrate", dest='baud', default=115200, type=int) parser.add_argument("-id", "--ac_id", help="aircraft id (receiver)", dest='ac_id', default=42, type=int) parser.add_argument("--interface_id", help="interface id (sender)", dest='id', default=0, type=int) parser.add_argument("-t", "--transport", help="pprz or xbee", dest='transport', default="serial") args = parser.parse_args() messages_xml_map.parse_messages(args.file) if args.transport == "pprz": interface = SerialMessagesInterface(lambda s, m: print("new message from %i: %s" % (s, m)), device=args.dev, baudrate=args.baud, msg_class=args.msg_class, interface_id=args.id, verbose=True) print("Starting serial interface on %s at %i baud" % (args.dev, args.baud)) elif args.transport == "xbee": interface = XbeeMessagesInterface(lambda s, m: print("new message from %i: %s" % (s, m)), args.id, args.dev, args.baud, verbose=True) print("Starting xbee interface on %s at %i baud" % (args.dev, args.baud)) else: print("No such transport: ", args.transport) exit(1) try: interface.start() # give the thread some time to properly start time.sleep(0.1) # send some datalink messages to aicraft for test ac_id = args.ac_id print("sending ping") ping = PprzMessage('datalink', 'PING') interface.send(ping, 0) get_setting = PprzMessage('datalink', 'GET_SETTING') get_setting['index'] = 0 get_setting['ac_id'] = ac_id interface.send(get_setting, 0) # change setting with index 0 (usually the telemetry mode) set_setting = PprzMessage('datalink', 'SETTING') set_setting['index'] = 12 set_setting['ac_id'] = ac_id set_setting['value'] = 1 interface.send(set_setting, 0) # block = PprzMessage('datalink', 'BLOCK') # block['block_id'] = 3 # block['ac_id'] = ac_id # serial_interface.send(block, 0) while interface.is_alive(): interface.join(1) except (KeyboardInterrupt, SystemExit): print('Shutting down...') interface.stop() exit() if __name__ == '__main__': test()