Source code for pprzlink.message

#
# This file is part of PPRZLINK.
# 
# PPRZLINK is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PPRZLINK is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with PPRZLINK.  If not, see <https://www.gnu.org/licenses/>.
#

"""
Paparazzi message representation

"""

from __future__ import division, print_function
import sys
import json
import struct
import re
from pprzlink import messages_xml_map


[docs]class PprzMessageError(Exception): def __init__(self, message, inner_exception=None): self.message = message self.inner_exception = inner_exception self.exception_info = sys.exc_info() def __str__(self): return self.message
[docs]class PprzMessage(object): """base Paparazzi message class""" def __init__(self, class_name, msg, component_id=0): if isinstance(class_name, int): # class_name is an integer, find the name # TODO handle None case self._class_id = class_name self._class_name = messages_xml_map.get_class_name(self._class_id) else: self._class_name = class_name self._class_id = messages_xml_map.get_class_id(class_name) self._component_id = component_id if isinstance(msg, int): self._id = msg self._name = messages_xml_map.get_msg_name(self._class_name, msg) else: self._name = msg self._id = messages_xml_map.get_msg_id(self._class_name, msg) self._fieldnames = messages_xml_map.get_msg_fields(self._class_name, self._name) self._fieldtypes = messages_xml_map.get_msg_fieldtypes(self._class_name, self._id) self._fieldcoefs = messages_xml_map.get_msg_fieldcoefs(self._class_name, self._id) self._fieldvalues = [] # set empty values according to type for t in self._fieldtypes: if t == "char[]": self._fieldvalues.append('') elif '[' in t: self._fieldvalues.append([0]) else: self._fieldvalues.append(0) if messages_xml_map.message_dictionary_broadcast[self._name]=='forwarded': self.broadcasted = False else: self.broadcasted = True @property def name(self): """Get the message name.""" return self._name @property def msg_id(self): """Get the message id.""" return self._id @property def class_id(self): """Get the class id.""" return self._class_id @property def msg_class(self): """Get the message class.""" return self._class_name @property def fieldnames(self): """Get list of field names.""" return self._fieldnames @property def fieldvalues(self): """Get list of field values.""" return self._fieldvalues @property def fieldtypes(self): """Get list of field types.""" return self._fieldtypes @property def fieldcoefs(self): """Get list of field coefs.""" return self._fieldcoefs
[docs] def fieldbintypes(self, t): """Get type and length for binary format""" data_types = { 'float': ['f', 4], 'double': ['d', 8], 'uint8': ['B', 1], 'uint16': ['H', 2], 'uint32': ['L', 4], 'int8': ['b', 1], 'int16': ['h', 2], 'int32': ['l', 4], 'char': ['c', 1] } base_type = t.split('[')[0] return data_types[base_type]
[docs] def get_field(self, idx): """Get field value by index.""" return self._fieldvalues[idx]
def __getattr__(self, attr): # Try to dynamically return the field value for the given name for idx, f in enumerate(self.fieldnames): if f == attr: return self.fieldvalues[idx] raise AttributeError("No such attribute %s" % attr) def __getitem__(self, key): # Try to dynamically return the field value for the given name for idx, f in enumerate(self.fieldnames): if f == key: return self.fieldvalues[idx] raise AttributeError("Msg %s has no field of name %s" % (self.name, key)) def __setitem__(self, key, value): self.set_value_by_name(key, value) def set_values(self, values): #print("msg %s: %s" % (self.name, ", ".join(self.fieldnames))) if len(values) == len(self.fieldnames): self._fieldvalues = values else: raise PprzMessageError("Error: Msg %s has %d fields, tried to set %i values" % (self.name, len(self.fieldnames), len(values))) def set_value_by_name(self, name, value): # Try to set a value from its name for idx, f in enumerate(self.fieldnames): if f == name: self._fieldvalues[idx] = value return raise AttributeError("Msg %s has no field of name %s" % (self.name, name)) def __str__(self): ret = '%s.%s {' % (self.msg_class, self.name) for idx, f in enumerate(self.fieldnames): ret += '%s : %s, ' % (f, self.fieldvalues[idx]) ret = ret + '}' return ret def to_dict(self, payload_only=False): d = {} if not payload_only: d['msgname'] = self.name d['msgclass'] = self.msg_class for idx, f in enumerate(self.fieldnames): d[f] = self.fieldvalues[idx] return d def to_json(self, payload_only=False): return json.dumps(self.to_dict(payload_only))
[docs] def to_csv(self, payload_only=False): """ return message as CSV string for use with RAW_DATALINK msg_name;field1;field2; """ return str(self.name) + ';' + self.payload_to_ivy_string(sep=';')
def payload_to_ivy_string(self, sep=' '): fields = [] for idx, t in enumerate(self.fieldtypes): if "char[" in t: str_value ='' for c in self.fieldvalues[idx]: if isinstance(c, bytes): str_value += c.decode() else: str_value += c fields.append('"' + str_value + '"') elif '[' in t: fields.append(','.join([str(x) for x in self.fieldvalues[idx]])) else: fields.append(str(self.fieldvalues[idx])) ivy_str = sep.join(fields) return ivy_str
[docs] def ivy_string_to_payload(self, data): """ parse Ivy data string to PPRZ values header and message name should have been removed Basically parts/args in string are separated by space, but char array can also contain a space: ``|f,o,o, ,b,a,r|`` in old format or ``"foo bar"`` in new format """ # first split on array delimiters # then slip on spaces and remove empty stings values = [] for el in re.split('([|\"][^|\"]*[|\"])', data): if '|' not in el and '"' not in el: # split non-array strings further up for e in [d for d in el.split(' ') if d != '']: if ',' in e: # array but not a string values.append([x for x in e.split(',') if x != '']) else: # not an array values.append(e) else: # add string array (stripped) values.append(str.strip(el)) self.set_values(values)
def payload_to_binary(self): struct_string = "<" data = [] length = 0 r = re.compile('[\[\]]') for idx, t in enumerate(self.fieldtypes): bin_type = self.fieldbintypes(t) s = r.split(t) if len(s) > 1: # this is an array array_length = len(self.fieldvalues[idx]) if len(s[1]) == 0: # this is a variable length array struct_string += 'B' data.append(array_length) length += 1 struct_string += bin_type[0] * array_length length += bin_type[1] * array_length for x in self.fieldvalues[idx]: if bin_type[0]=='f' or bin_type[0]== 'd': data.append(float(x)) elif bin_type[0]== 'B' or bin_type[0]== 'H' or bin_type[0]== 'L' or bin_type[0]== 'b' or bin_type[0]== 'h' or bin_type[0]== 'l': data.append(int(x)) elif bin_type[0]== 'c': data.append(x.encode()) else: data.append(x) else: struct_string += bin_type[0] length += bin_type[1] # Assign the right type according to field description if bin_type[0]=='f' or bin_type[0]== 'd': data.append(float(self.fieldvalues[idx])) elif bin_type[0]== 'B' or bin_type[0]== 'H' or bin_type[0]== 'L' or bin_type[0]== 'b' or bin_type[0]== 'h' or bin_type[0]== 'l': data.append(int(self.fieldvalues[idx])) else: data.append(self.fieldvalues[idx]) msg = struct.pack(struct_string, *data) return msg def binary_to_payload(self, data): msg_offset = 0 values = [] r = re.compile('[\[\]]') for idx, t in enumerate(self.fieldtypes): bin_type = self.fieldbintypes(t) s = r.split(t) if len(s) > 1: # this is an array if len(s[1]) == 0: # this is a variable length array array_length = data[msg_offset] msg_offset += 1 else: array_length = int(s[1]) array_value = [] for count in range(0, array_length): array_value.append(struct.unpack('<' + bin_type[0], data[msg_offset:msg_offset + bin_type[1]])[0]) msg_offset = msg_offset + bin_type[1] values.append(array_value) else: value = struct.unpack('<' + bin_type[0], data[msg_offset:msg_offset + bin_type[1]])[0] msg_offset = msg_offset + bin_type[1] values.append(value) self.set_values(values)
def test(): import argparse parser = argparse.ArgumentParser() parser.add_argument("-f", "--file", help="path to messages.xml file") parser.add_argument("-c", "--class", help="message class", dest="msg_class", default="telemetry") args = parser.parse_args() messages_xml_map.parse_messages(args.file) messages = [PprzMessage(args.msg_class, n) for n in messages_xml_map.get_msgs(args.msg_class)] print("Listing %i messages in '%s' msg_class" % (len(messages), args.msg_class)) for msg in messages: print(msg) if __name__ == '__main__': test()