diff --git a/src/canmatrix/canmatrix.py b/src/canmatrix/canmatrix.py index e556e467..09ca9f86 100644 --- a/src/canmatrix/canmatrix.py +++ b/src/canmatrix/canmatrix.py @@ -468,8 +468,8 @@ def phys2raw(self, value=None): # if not (0 <= value <= 10): if not (self.min <= value <= self.max): logger.warning( - "Value {} is not valid for {}. Min={} and Max={}".format( - value, self, self.min, self.max) + "Signal {}: Value {} is not valid for {}. Min={} and Max={}".format( + self.name, value, self, self.min, self.max) ) raw_value = (self.float_factory(value) - self.float_factory(self.offset)) / self.float_factory(self.factor) @@ -612,6 +612,10 @@ def unpack_bitstring(length, is_float, is_signed, bits): value, = struct.unpack(float_type, bytearray(int(''.join(b), 2) for b in grouper(bits, 8))) else: value = int(bits, 2) + # try: + # value = int(bits, 2) + # except Exception as e: + # xx = 1 if is_signed and bits[0] == '1': value -= (1 << len(bits)) @@ -823,18 +827,28 @@ class Endpoint(object): @attr.s(eq=False) class AutosarE2EProperties(object): profile = attr.ib(default=None) # type: str + data_id_mode = attr.ib(default=None) # type: str data_ids = attr.ib(default=None) # type: List[int] data_length = attr.ib(default=None) # type: int @attr.s(eq=False) class AutosarSecOCProperties(object): - auth_algorithm = attr.ib(default="") # type: str - payload_length = attr.ib(default=0) # type: int - auth_tx_length = attr.ib(default=0) # type: int - data_id = attr.ib(default=0) # type: int - freshness_length = attr.ib(default=0) # type: int - freshness_tx_length = attr.ib(default=0) # type: int + secured_i_pdu_name = attr.ib(default="unknown") # type: str + authentic_i_pdu_name = attr.ib(default="unknown") # type: str + auth_algorithm = attr.ib(default=None) # type: str|None + authentic_pdu_length = attr.ib(default=0) # type: int + secured_i_pdu_length = attr.ib(default=0) # type: int + auth_tx_length = attr.ib(default=None) # type: int|None + data_id = attr.ib(default=None) # type: int|None + freshness_value_id = attr.ib(default=None) # type: int|None + freshness_length = attr.ib(default=None) # type: int|None + freshness_tx_length = attr.ib(default=None) # type: int|None + use_as_cryptographic_i_pdu = attr.ib(default=False) # type: bool + message_link_length = attr.ib(default=None) # type: int|None + message_link_position = attr.ib(default=None) # type: int|None + key_id = attr.ib(default=None) # type: int|None + @attr.s(eq=False) class Pdu(object): @@ -845,6 +859,8 @@ class Pdu(object): Whereas a PDU is the same than a frame on CAN bus, at flexray a frame may consist of multiple PDUs (a bit like multiple signal layout for multiplexed can frames). This class is only used for flexray busses. + Note: since container-pdus are supported for arxml, this class is also used for arxml (but only + for sub-pdus of container-pdus). """ name = attr.ib(default="") # type: str @@ -856,6 +872,9 @@ class Pdu(object): signals = attr.ib(factory=list) # type: typing.MutableSequence[Signal] signalGroups = attr.ib(factory=list) # type: typing.MutableSequence[SignalGroup] cycle_time = attr.ib(default=0) # type: int + secOC_properties = attr.ib(default=None) # type: Optional[AutosarSecOCProperties] + # offset is used for arxml, sub-pdu inside a static-container-pdu + offset_bytes = attr.ib(default=0) # type: int def add_signal(self, signal): # type: (Signal) -> Signal @@ -1503,56 +1522,86 @@ def unpack(self, data: bytes, f"Received message 0x{msg_id:04X} with wrong data size: {rx_length} instead of {self.size}") if self.is_pdu_container: + # note: PDU-Container without header is possible for ARXML-Container-PDUs with NO-HEADER + # that mean this are not dynamic Container-PDUs rather than static ones. (each sub-pdu has + # a fixed offset in the container) + header_signals = [] header_id_signal = self.signal_by_name("Header_ID") header_dlc_signal = self.signal_by_name("Header_DLC") - if header_id_signal is None or header_dlc_signal is None: - raise DecodingContainerPdu( - 'Received message 0x{:08X} without Header_ID or ' - 'Header_DLC signal'.format(self.arbitration_id.id) - ) + + if header_id_signal is not None: + header_signals.append(header_id_signal) + _header_id_signal_size = header_id_signal.size + else: + _header_id_signal_size = 0 + if header_dlc_signal is not None: + header_signals.append(header_dlc_signal) + _header_dlc_signal_size = header_dlc_signal.size + else: + _header_dlc_signal_size = 0 # TODO: may be we need to check that ID/DLC signals are contiguous - header_size = header_id_signal.size + header_dlc_signal.size + if len(header_signals) > 0 and len(header_signals) != 2: + raise DecodingConatainerPdu( + 'Received message 0x{:08X} with incorrect Header-Defintiion. ' + 'Header_ID signal or Header_DLC is missing'.format(self.arbitration_id.id) + ) + header_size = _header_id_signal_size + _header_dlc_signal_size little, big = self.bytes_to_bitstrings(data) size = self.size * 8 return_dict = dict({"pdus": []}) # decode signal which are not in PDUs - signals = [s for s in self.signals if s not in [header_id_signal, header_dlc_signal]] + signals = [s for s in self.signals if s not in header_signals] if signals: unpacked = self.bitstring_to_signal_list(signals, big, little, size) for s, v in zip(signals, unpacked): return_dict[s.name] = DecodedSignal(v, s) # decode PDUs - offset = header_id_signal.start_bit - header_signals = [header_id_signal, header_dlc_signal] - while (offset + header_size) < size: - unpacked = self.bitstring_to_signal_list( - header_signals, - big[offset:offset + header_size], - little[size - offset - header_size:size - offset], - header_size - ) - offset += header_size - pdu_id = unpacked[0] - pdu_dlc = unpacked[1] - for s, v in zip(header_signals, unpacked): - if s.name not in return_dict: - return_dict[s.name] = [] - return_dict[s.name].append(DecodedSignal(v, s)) - pdu = self.pdu_by_id(pdu_id) + offset = header_id_signal.start_bit if header_id_signal is not None else 0 + no_header_next_pdu_idx = 0 + # decode as long as there is data left to decode (if there is a header), or as long as there are sub-pdus + # left to decode (in case of static-container without pdu-headers) + while (offset + header_size) < size and no_header_next_pdu_idx < len(self.pdus): + if len(header_signals) > 0: + unpacked = self.bitstring_to_signal_list( + header_signals, + big[offset:offset + header_size], + little[size - offset - header_size:size - offset], + header_size + ) + offset += header_size + pdu_id = unpacked[0] + pdu_dlc = unpacked[1] + for s, v in zip(header_signals, unpacked): + if s.name not in return_dict: + return_dict[s.name] = [] + return_dict[s.name].append(DecodedSignal(v, s)) + pdu = self.pdu_by_id(pdu_id) + else: + # if there is no pdu-header, then we have a static container-pdu + # we have to loop all sub-pdus and set the offset to the offset of the PDU + # note: order of processing sub-PDUs is not important, even if the sub-PDUs are not ordered + # by the pdu-offset (we just set the offset correct to the actual processed sub-PDU) + pdu = self.pdus[no_header_next_pdu_idx] + no_header_next_pdu_idx += 1 + pdu_dlc = pdu.size + offset = pdu.offset_bytes * 8 + decode_size_bits = pdu_dlc * 8 if pdu is None: return_dict['pdus'].append(None) else: unpacked = self.bitstring_to_signal_list( pdu.signals, - big[offset:offset + pdu_dlc * 8], - little[size - offset - pdu_dlc * 8:size - offset], - pdu_dlc * 8 + big[offset:offset + decode_size_bits], + little[size - offset - decode_size_bits:size - offset], + decode_size_bits ) pdu_dict = dict() for s, v in zip(pdu.signals, unpacked): pdu_dict[s.name] = DecodedSignal(v, s) return_dict["pdus"].append({pdu.name: pdu_dict}) - offset += (pdu_dlc * 8) + if len(header_signals) > 0: + # if there is a pdu-header, we have to set the offset to the start of the next pdu + offset += decode_size_bits return return_dict else: little, big = self.bytes_to_bitstrings(data) diff --git a/src/canmatrix/cli/convert.py b/src/canmatrix/cli/convert.py index c2b692f6..2472e44f 100644 --- a/src/canmatrix/cli/convert.py +++ b/src/canmatrix/cli/convert.py @@ -96,6 +96,7 @@ def get_formats(): @click.option('--arxmlFlexray/--no-arxmlFlexray', 'decode_flexray', default = False, help="EXPERIMENTAL: import basic flexray data from ARXML") @click.option('--arxmlEthernet/--no-arxmlEthernet', 'decode_ethernet', default = False, help="EXPERIMENTAL: import basic ethernet data from ARXML") @click.option('--preferred-languages', 'preferred_languages', default = "EN,DE", help="the preferred languages will be given priority when there are comments or descriptions available in multiple languages\ndefault EN,DE") +@click.option('--arxmlUpdate-bit-init_1/--no-arxmlUpdate-bit-init_1', 'update_bit_init_1', default=False, help="Init generated Update-Bits with init-value 1 (True: init_value=1, False: init_value=0") # dbc switches diff --git a/src/canmatrix/formats/arxml.py b/src/canmatrix/formats/arxml.py index 9f93e2e3..fbd6cd31 100644 --- a/src/canmatrix/formats/arxml.py +++ b/src/canmatrix/formats/arxml.py @@ -307,6 +307,46 @@ def selector(self, start_element, selector): return sorted(result_list, key=lambda element: element.sourceline) +class AutosarBasePlatformTypes(): + """handing/helper for autosar Platform Types + see autosar document-No 48 'Specification of Platform Types' """ + _encoding_VOID = 'VOID' + _encoding_BOOLEAN = 'BOOLEAN' + _encoding_IEEE753 = 'IEEE754' + _encoding_2C = '2C' + _encoding_None = 'NONE' + _base_types = { + '/AUTOSAR_Platform/BaseTypes/dtRef_const_VOID': ('dtRef_const_VOID', _encoding_VOID), + '/AUTOSAR_Platform/BaseTypes/dtRef_VOID': ('dtRef_VOID', _encoding_VOID), + '/AUTOSAR_Platform/BaseTypes/boolean': ('boolean', _encoding_BOOLEAN), + '/AUTOSAR_Platform/BaseTypes/float32': ('float32', _encoding_IEEE753), + '/AUTOSAR_Platform/BaseTypes/float64': ('float64', _encoding_IEEE753), + '/AUTOSAR_Platform/BaseTypes/sint8': ('sint8', _encoding_2C), + '/AUTOSAR_Platform/BaseTypes/sint16': ('sint16', _encoding_2C), + '/AUTOSAR_Platform/BaseTypes/sint32': ('sint32', _encoding_2C), + '/AUTOSAR_Platform/BaseTypes/uint8': ('uint8', _encoding_None), + '/AUTOSAR_Platform/BaseTypes/uint16': ('uint16', _encoding_None), + '/AUTOSAR_Platform/BaseTypes/uint32': ('uint32', _encoding_None), + } + + @classmethod + def _get_reference_of_element(cls, element: _Element|str): + if isinstance(element, _Element): + return element.text + else: + return element + + @classmethod + def datatype_by_ref(cls, basetype_ref: _Element|str): + _definition = cls._base_types.get(cls._get_reference_of_element(basetype_ref), None) + return _definition[0] if _definition is not None else None + + @classmethod + def encoding_by_ref(cls, basetype_ref: _Element|str): + _definition = cls._base_types.get(cls._get_reference_of_element(basetype_ref), None) + return _definition[1] if _definition is not None else None + + def create_sub_element(parent, element_name, text=None, dest=None): # type: (_Element, str, typing.Optional[str], typing.Optional[str]) -> _Element sn = lxml.etree.SubElement(parent, element_name) @@ -997,19 +1037,21 @@ def get_signalgrp_and_signals(sys_signal, sys_signal_array, frame, group_id, ea) transformer_ele = ea.follow_ref(sys_signal, "TRANSFORMER-REF") e2e_properties = None if transformer_ele is not None: - e2e_profile = ea.get_child(transformer_ele, "PROFILE-NAME").text + _tmp_ele = ea.get_child(transformer_ele, "PROFILE-NAME") + e2e_profile = _tmp_ele.text if _tmp_ele is not None else None + _tmp_ele = ea.get_child(transformer_ele, "DATA-ID-MODE") + e2e_data_id_mode = _tmp_ele.text if _tmp_ele is not None else None trans_isignal_propss_elem = ea.get_child(sys_signal, "TRANSFORMATION-I-SIGNAL-PROPSS") - data_id_elems = ea.get_children(trans_isignal_propss_elem, "DATA-ID") - if data_id_elems is not None: - e2e_data_ids = [int(x.text, 0) for x in data_id_elems] + _tmp_ele = ea.get_children(trans_isignal_propss_elem, "DATA-ID") + if _tmp_ele is not None: + e2e_data_ids = [int(x.text, 0) for x in _tmp_ele] - data_len_elem = ea.get_child(trans_isignal_propss_elem, "DATA-LENGTH") - if data_len_elem is not None: - e2e_data_length = int(data_len_elem.text, 0) + _tmp_ele = ea.get_child(trans_isignal_propss_elem, "DATA-LENGTH") + e2e_data_length = int(_tmp_ele.text, 0) if _tmp_ele is not None else None - e2e_properties = canmatrix.AutosarE2EProperties(e2e_profile, e2e_data_ids, e2e_data_length) + e2e_properties = canmatrix.AutosarE2EProperties(e2e_profile, e2e_data_id_mode, e2e_data_ids, e2e_data_length) frame.add_signal_group(ea.get_element_name(sys_signal), group_id, members, e2e_properties) @@ -1090,7 +1132,10 @@ def eval_type_of_signal(type_encoding, base_type, ea): is_float = False elif base_type is not None: is_float = False - type_name = ea.get_element_name(base_type) + if isinstance(base_type, _Element): + type_name = ea.get_element_name(base_type) + else: + type_name = base_type if type_name[0] == 'u': is_signed = False # unsigned else: @@ -1107,9 +1152,13 @@ def ar_byteorder_is_little(in_string): return False -def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset=0): +def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset=0, + generated_update_bits_init_to_1: bool = False): # type: (typing.Sequence[_Element], typing.Union[canmatrix.Frame, canmatrix.Pdu], Earxml, int, typing.Callable, int) -> None - """Add signals from xml to the Frame.""" + """Add signals from xml to the Frame. + ATTENTION: be careful if you plan to use bit_offset != 0. + This will result in non-valid signal definitions (start-bit) in relation to the PDU. + Maybe a possible refactoring in the future to remove this bit_offset if no further need is identified?""" group_id = 1 if signal_array is None: # Empty signalarray - nothing to do @@ -1133,12 +1182,16 @@ def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset isignal_array = ea.follow_all_ref(isignal, "I-SIGNAL-REF") get_signalgrp_and_signals(isignal, isignal_array, frame, group_id, ea) if ub_start_bit is not None: - ub_name = ea.get_element_name(isignal) + "_UB" + _ub_target_signal_name = ea.get_element_name(isignal) + ub_name = _ub_target_signal_name + "_UB" + _init_value = 1 if generated_update_bits_init_to_1 else 0 isignal_ub = canmatrix.Signal(ub_name, + comment=f"Update-Bit for Signal '{_ub_target_signal_name}'", start_bit=int(ub_start_bit.text, 0), size = 1, is_signed = False, - unit = "Unitless") + unit = "Unitless", + initial_value=_init_value) frame.add_signal(isignal_ub) group_id = group_id + 1 @@ -1157,15 +1210,35 @@ def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset except IndexError: pass + base_type_name = None base_type = ea.follow_ref(isignal, "BASE-TYPE-REF") # AR4 if base_type is None: a = ea.selector(isignal, ">SYSTEM-SIGNAL-REF>DATA-TYPE-REF>BASE-TYPE-REF") if len(a) > 0: base_type = a[0] try: - type_encoding = ea.get_child(base_type, "BASE-TYPE-ENCODING").text + if base_type is None: + # if not found, get/check if it's an autosar defined SwBaseType + # search in different locations + _search_for_base_type = [ea.get_child(isignal, "BASE-TYPE-REF")] + _tmp = ea.selector(isignal, ">SYSTEM-SIGNAL-REF>DATA-TYPE-REF>BASE-TYPE-REF") + if _tmp is not None: + _search_for_base_type.extend(_tmp) + _tmp = ea.selector(isignal, ">SYSTEM-SIGNAL-REF>BASE-TYPE-REF") + if _tmp is not None: + _search_for_base_type.extend(_tmp) + for _ele in _search_for_base_type: + if _ele is None: + continue + type_encoding = AutosarBasePlatformTypes.encoding_by_ref(_ele) + base_type_name = AutosarBasePlatformTypes.datatype_by_ref(_ele) + if type_encoding is not None: + break + else: + type_encoding = ea.get_child(base_type, "BASE-TYPE-ENCODING").text + base_type_name = ea.get_element_name(base_type) except AttributeError: - type_encoding = "None" + type_encoding = "NONE" signal_name = None # type: typing.Optional[str] signal_name_elem = ea.get_child(isignal, "LONG-NAME") if signal_name_elem is not None: @@ -1241,8 +1314,14 @@ def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset compu_method = ea.follow_ref(isignal, "COMPU-METHOD-REF") base_type = ea.follow_ref(isignal, "BASE-TYPE-REF") - encoding = ea.get_child(base_type, "BASE-TYPE-ENCODING") - if encoding is not None and encoding.text == "IEEE754": + if base_type is None: + # if not found, get/check if it's an autosar defined SwBaseType + encoding = AutosarBasePlatformTypes.encoding_by_ref(ea.get_child(isignal, "BASE-TYPE-REF")) + else: + encoding = ea.get_child(base_type, "BASE-TYPE-ENCODING") + if encoding is not None: + encoding = encoding.text + if encoding is not None and encoding == "IEEE754": is_float = True if compu_method is None: #logger.debug('No Compmethod found!! - try alternate scheme 1.') @@ -1256,7 +1335,7 @@ def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset logger.debug('No valid compu method found for this - check ARXML file!!') compu_method = None if compu_method is None: - logger.error('No valid compu method found for isignal/systemsignal {}/{} - check ARXML file!!' + logger.info('No valid compu method found for isignal/systemsignal {}/{} - check ARXML file!!' .format(ea.get_short_name(isignal), ea.get_short_name(system_signal))) ##################################################################################################### # no found compu-method fuzzy search in systemsignal: @@ -1277,8 +1356,11 @@ def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset if base_type is None: base_type = ea.follow_ref(datdefprops, "BASE-TYPE-REF") - - (is_signed, is_float) = eval_type_of_signal(type_encoding, base_type, ea) + if base_type is None and base_type_name is not None: + # if not found, maybe we already know the base_type_name? + (is_signed, is_float) = eval_type_of_signal(type_encoding, base_type_name, ea) + else: + (is_signed, is_float) = eval_type_of_signal(type_encoding, base_type, ea) unit_element = ea.follow_ref(isignal, "UNIT-REF") display_name = ea.get_child(unit_element, "DISPLAY-NAME") @@ -1359,11 +1441,18 @@ def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset if communication_direction[0].text == "IN": new_signal.add_receiver(ea.get_short_name(ecu)) - if base_type is not None: - temp = ea.get_child(base_type, "SHORT-NAME") - if temp is not None and "boolean" == temp.text: - new_signal.add_values(1, "true") - new_signal.add_values(0, "false") + # TODO KONNI passt das jetzt so, eigentlich sollte es so passen + # wenn keine Fehler mehr auftauchen, dann den kommentierten Teil löschen vor dem einchecken! + # _base_type_name = None + # if base_type is not None: + # _base_type_name = ea.get_child(base_type, "SHORT-NAME") + # _base_type_name = _base_type_name.text if _base_type_name is not None else None + # # if _base_type_name is None: + # # # if not found, get/check if it's an autosar defined SwBaseType + # # _base_type_name = AutosarBasePlatformTypes.datatype_by_ref(ea.get_child(isignal, "BASE-TYPE-REF")) + if base_type_name is not None and "boolean" == base_type_name: + new_signal.add_values(1, "true") + new_signal.add_values(0, "false") if initvalue is not None and initvalue.text is not None: initvalue.text = canmatrix.utils.guess_value(initvalue.text) @@ -1372,6 +1461,11 @@ def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset new_signal.initial_value = (float_factory(initvalue.text) * factor) + offset except decimal.InvalidOperation: logger.error("could not decode value {}".format(initvalue.text)) + else: + # no init-value found, check if existing initialized initial_value of Signal-object is between min/max, + # if not => INVALID! => set it to min to at least not generate an invalid Signal + if new_signal.min > new_signal.initial_value or new_signal.max < new_signal.initial_value: + new_signal.initial_value = new_signal.min for key, value in list(values.items()): new_signal.add_values(canmatrix.utils.decode_number(key, float_factory), value) @@ -1390,15 +1484,19 @@ def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset if ub_start_bit is not None: ub_name = name + "_UB" + _init_value = 1 if generated_update_bits_init_to_1 else 0 new_signal_ub = canmatrix.Signal(ub_name, + comment=f"Update-Bit for Signal '{name}'", start_bit = int(ub_start_bit.text, 0), size = 1, is_signed = False, - unit = "Unitless") + unit = "Unitless", + initial_value=_init_value) frame.add_signal(new_signal_ub) -def get_frame_from_multiplexed_ipdu(pdu, target_frame, multiplex_translation, ea, float_factory): +def get_frame_from_multiplexed_ipdu(pdu, target_frame, multiplex_translation, ea, float_factory, + generated_update_bits_init_to_1: bool): selector_byte_order = ea.get_child(pdu, "SELECTOR-FIELD-BYTE-ORDER") selector_len = ea.get_child(pdu, "SELECTOR-FIELD-LENGTH") selector_start = ea.get_child(pdu, "SELECTOR-FIELD-START-POSITION") @@ -1420,7 +1518,8 @@ def get_frame_from_multiplexed_ipdu(pdu, target_frame, multiplex_translation, ea if ipdu is not None: pdu_sig_mappings = ea.get_child(ipdu, "SIGNAL-TO-PDU-MAPPINGS") pdu_sig_mapping = ea.get_children(pdu_sig_mappings, "I-SIGNAL-TO-I-PDU-MAPPING") - get_signals(pdu_sig_mapping, target_frame, ea, None, float_factory) + get_signals(pdu_sig_mapping, target_frame, ea, None, float_factory, + generated_update_bits_init_to_1=generated_update_bits_init_to_1) multiplex_translation[ea.get_element_name(ipdu)] = ea.get_element_name(pdu) dynamic_part = ea.get_child(pdu, "DYNAMIC-PART") @@ -1439,7 +1538,8 @@ def get_frame_from_multiplexed_ipdu(pdu, target_frame, multiplex_translation, ea pdu_sig_mappings = ea.get_child(ipdu, "SIGNAL-TO-PDU-MAPPINGS") pdu_sig_mapping = ea.get_children(pdu_sig_mappings, "I-SIGNAL-TO-I-PDU-MAPPING") - get_signals(pdu_sig_mapping, target_frame, ea, selector_id.text, float_factory) + get_signals(pdu_sig_mapping, target_frame, ea, selector_id.text, float_factory, + generated_update_bits_init_to_1=generated_update_bits_init_to_1) def containters_are_little_endian(ea): @@ -1451,7 +1551,191 @@ def containters_are_little_endian(ea): return False -def get_frame_from_container_ipdu(pdu, target_frame, ea, float_factory, headers_are_littleendian): +# def _get_secOC_properties(ea, pdu) -> (canmatrix.AutosarSecOCProperties|None, canmatrix.PDU|None): +def _get_secOC_properties(ea, pdu, secured_pdu_trigger): + if pdu is None or 'SECURED-I-PDU' not in pdu.tag: + return None, None + + # info about cryptographic-pdu: + # in a cryptographic-i-pdu the authentic-pdu is NOT a payload of the secured-id-pdu, + # authentic-pdu and secured-i-pdu are two different pdus which are transmitted + # separately (in a container-pdu without header/fixed sub-pdu positions) + # a cryptographic-pdu may can contain a message-link where a some bytes (len and pos defined + # by MESSAGE-LINK-LENGTH/-POSITION) of the authentic-pdu are copied to the secured-i-pdu + + try: + secured_i_pdu_name = ea.get_element_name(pdu) + secured_i_pdu_length = int(ea.get_child(pdu, "LENGTH").text, 0) + + _tmp_ele = ea.get_child(pdu, "USE-AS-CRYPTOGRAPHIC-I-PDU") + use_as_cryptographic_i_pdu = _tmp_ele.text if _tmp_ele is not None \ + else "False" + use_as_cryptographic_i_pdu = True if use_as_cryptographic_i_pdu.lower() == 'true' else False + + # depending on the arxml-file secoc-related information can be found directly in the SECURE-I-PDU, + # or if not, they are defined in the AUTHENTICATION-PROPS-REF or FRESHNESS-PROPS-REF + secured_ipdu_secoc = ea.get_child(pdu, "SECURE-COMMUNICATION-PROPS") + auth_props = ea.selector(pdu, ">AUTHENTICATION-PROPS-REF") + auth_props = auth_props[0] if len(auth_props) == 1 else None + freshness_props = ea.selector(pdu, ">FRESHNESS-PROPS-REF") + freshness_props = freshness_props[0] if len(freshness_props) == 1 else None + payload_pdu_trigger = ea.selector(pdu, ">PAYLOAD-REF") + payload_pdu_trigger = payload_pdu_trigger[0] if len(payload_pdu_trigger) == 1 else None + secured_pdu_port = ea.selector(secured_pdu_trigger, ">I-PDU-PORT-REF") + secured_pdu_port = secured_pdu_port[0] if len(secured_pdu_port) == 1 else None + # payload_i_pdu_port = ea.selector(payload_pdu_trigger, ">I-PDU-PORT-REF") + # payload_i_pdu_port = payload_i_pdu_port[0] if len(payload_i_pdu_port) == 1 else None + + _tmp_ele = ea.get_child(secured_ipdu_secoc, "AUTH-ALGORITHM") + if _tmp_ele is None and auth_props is not None: + _tmp_ele = ea.get_child(auth_props, "AUTH-ALGORITHM") + auth_algorithm = _tmp_ele.text if _tmp_ele is not None else None + + _tmp_ele = ea.get_child(secured_ipdu_secoc, "AUTH-INFO-TX-LENGTH") + if _tmp_ele is None and auth_props is not None: + _tmp_ele = ea.get_child(auth_props, "AUTH-INFO-TX-LENGTH") + auth_tx_length = int(_tmp_ele.text, 0) if _tmp_ele is not None else None + + _tmp_ele = ea.get_child(secured_ipdu_secoc, "DATA-ID") + data_id = int(_tmp_ele.text, 0) if _tmp_ele is not None else None + + _tmp_ele = ea.get_child(secured_ipdu_secoc, "FRESHNESS-VALUE-ID") + freshness_value_id = int(_tmp_ele.text, 0) if _tmp_ele is not None else None + + _tmp_ele = ea.get_child(secured_ipdu_secoc, "FRESHNESS-VALUE-LENGTH") + if _tmp_ele is None and freshness_props is not None: + _tmp_ele = ea.get_child(freshness_props, "FRESHNESS-VALUE-LENGTH") + freshness_bit_length = int(_tmp_ele.text, 0) if _tmp_ele is not None else None + + _tmp_ele = ea.get_child(secured_ipdu_secoc, "FRESHNESS-VALUE-TX-LENGTH") + if _tmp_ele is None and freshness_props is not None: + _tmp_ele = ea.get_child(freshness_props, "FRESHNESS-VALUE-TX-LENGTH") + freshness_tx_length = int(_tmp_ele.text, 0) if _tmp_ele is not None else None + + _tmp_ele = ea.get_child(secured_ipdu_secoc, "MESSAGE-LINK-LENGTH") + message_link_length = int(_tmp_ele.text, 0) if _tmp_ele is not None else None + + _tmp_ele = ea.get_child(secured_ipdu_secoc, "MESSAGE-LINK-POSITION") + message_link_position = int(_tmp_ele.text, 0) if _tmp_ele is not None else None + + _tmp_ele = ea.get_child(secured_pdu_port, "KEY-ID") + key_id = int(_tmp_ele.text, 0) if _tmp_ele is not None else None + + except Exception as e: + logger.warning(f"Unable to get SecOC-Properties for {ea.get_element_name(pdu)}", exc_info=True) + return None, None + + authentic_pdu = ea.selector(payload_pdu_trigger, ">I-PDU-REF") + if not authentic_pdu: + logger.error("SecuredIPdu %r is missing Payload", ea.get_short_name(pdu)) + return None, None + if isinstance(authentic_pdu, list): + authentic_pdu = authentic_pdu[0] + + authentic_i_pdu_name = ea.get_element_name(authentic_pdu) + authentic_pdu_length = int(ea.get_child(authentic_pdu, "LENGTH").text, 0) + + secOC_properties = canmatrix.AutosarSecOCProperties(secured_i_pdu_name, + authentic_i_pdu_name, + auth_algorithm, + authentic_pdu_length, + secured_i_pdu_length, + auth_tx_length, + data_id, + freshness_value_id, + freshness_bit_length, + freshness_tx_length, + use_as_cryptographic_i_pdu, + message_link_length, + message_link_position, + key_id=key_id, + ) + return secOC_properties, authentic_pdu + + +def _add_autosar_secoc_signals_to_parent(parent): + """single method for Frame and Pdu + used to add SecOC related signals to pdu and frame, based on secOC_properties as stored in the object itself. + do NOTHING if no secOC_properties are present! + + Note: atm canmatrix is handling Autosar-ARXML Frame which only points to a signal-i-pdu NOT as a Frame containing + a PDU, it is handled/stored as Frame-Only. But container-i-pdus are handled/stored as Frame containing PDUs. + Therefore, this method (and secOC_properties) is needed for the Frame AND the PDU (even if this is only + part of PDUs according to Autosar). + + :param Frame|Pdu parent: Frame or Pdu to process + :return: None + """ + if not hasattr(parent, "secOC_properties") or parent.secOC_properties is None: + return + _secOC_properties = parent.secOC_properties + + # info: secOC_properties.freshness_tx_length, secOC_properties.auth_tx_length + # and secOC_properties.message_link_length are in bit and NOT byte!!! + + # structure/order of a secured-i-pdu: + # secured-i-pdu header: Optional, not implemented in canmatrix at the moment + # authentic-pdu (here the parent): use_as_cryptographic_i_pdu == False: mandatory, if True: not allowed + # Truncated Freshness + # Truncated Auth-Info (cmac) + # Message-Link: Optional, only used if use_as_cryptographic_i_pdu == True and message_link_length > 0 + + # get the correct start_bit position + if _secOC_properties.use_as_cryptographic_i_pdu == True: + _start_base_bit = 0 + else: + _start_base_bit = _secOC_properties.authentic_pdu_length * 8 + + # set shortcuts for different lengths + if _secOC_properties.freshness_tx_length and _secOC_properties.freshness_tx_length > 0: + _fv_tx_len = _secOC_properties.freshness_tx_length + else: + _fv_tx_len = 0 + if _secOC_properties.auth_tx_length and _secOC_properties.auth_tx_length > 0: + _auth_tx_len = _secOC_properties.auth_tx_length + else: + _auth_tx_len = 0 + if _secOC_properties.message_link_length and _secOC_properties.message_link_length > 0: + _msg_lnk_len = _secOC_properties.message_link_length + else: + _msg_lnk_len = 0 + + # add needed signals + if _fv_tx_len > 0: + freshness_name = f"{parent.name}_Freshness" + signal_freshness = canmatrix.Signal(freshness_name, + comment='Truncated Freshness-Value', + start_bit=_start_base_bit, + size=_fv_tx_len, + is_signed=False, + is_little_endian=False, + unit="Unitless") + parent.add_signal(signal_freshness) + + if _auth_tx_len > 0: + authinfo_name = f"{parent.name}_AuthInfo" + signal_authinfo = canmatrix.Signal(authinfo_name, + comment='Truncated Auth-Info', + start_bit=_start_base_bit + _fv_tx_len, + size=_auth_tx_len, + is_signed=False, + is_little_endian=False, + unit="Unitless") + parent.add_signal(signal_authinfo) + if _secOC_properties.use_as_cryptographic_i_pdu == True and _msg_lnk_len > 0: + message_link_name = f"{parent.name}_MsgLink" + signal_msglnk = canmatrix.Signal(message_link_name, + comment='Message-Link Value', + start_bit=_start_base_bit + _fv_tx_len + _auth_tx_len, + size=_msg_lnk_len, + is_signed=False, + is_little_endian=False, + unit="Unitless") + parent.add_signal(signal_msglnk) + + +def get_frame_from_container_ipdu(pdu, target_frame, ea, float_factory, headers_are_littleendian, + generated_update_bits_init_to_1: bool): target_frame.is_fd = True pdus = ea.follow_all_ref(pdu, "CONTAINED-PDU-TRIGGERING-REF") header_type = ea.get_child(pdu, "HEADER-TYPE").text @@ -1477,6 +1761,38 @@ def get_frame_from_container_ipdu(pdu, target_frame, ea, float_factory, headers_ ipdu = ea.follow_ref(cpdu, "I-PDU-REF") if ipdu in ipdus_refs: continue + try: + if header_type == "SHORT-HEADER": + header_id = ea.get_child(ipdu, "HEADER-ID-SHORT-HEADER").text + elif header_type == "LONG-HEADER": + header_id = ea.get_child(ipdu, "HEADER-ID-LONG-HEADER").text + else: + # none type + header_id = None + except AttributeError: + header_id = None + if header_id is not None: + header_id = int(header_id, 0) + + ipdu_name = ea.get_element_name(ipdu) + + # check/get secoc-stuff + _secoc_properties, authentic_pdu = _get_secOC_properties(ea, ipdu, cpdu) + if (_secoc_properties is not None and authentic_pdu is not None + and _secoc_properties.use_as_cryptographic_i_pdu == False): + # in case of a 'normal' secured-i-pdu, copy the authentic-pdu over the secured-i-pdu + # + # hint: copy the authentic/payload-pdu over the secured-i-pdu and adding signals to the pdu is + # in terms of SecOC-Specification incorrect (also practically/logically incorrect, + # because it is the wrong pdu-name) + # but as long as canmatrix does not support 'a pdu containing another pdu' this is the only acceptable + # workaround + ipdu = authentic_pdu + ipdu_name = ea.get_element_name(ipdu) # get it once more after copy authentic-pdu over + logger.info("found secured pdu '%s', dissolved to '%s'", _secoc_properties.secured_i_pdu_name, + ipdu_name) + + # this need to be done AFTER the SECURED-I-PDU handling, otherwise the timing-specifications are wrong! ipdus_refs.append(ipdu) timing_spec = ea.get_child(ipdu, "I-PDU-TIMING-SPECIFICATION") if timing_spec is None: @@ -1492,28 +1808,17 @@ def get_frame_from_container_ipdu(pdu, target_frame, ea, float_factory, headers_ value = ea.get_child(time_period, "VALUE") if value is not None: cycle_time = int(float_factory(value.text) * 1000) + # TODO maybe we need to refactor getting the offset for secured-i-pdus to get the offset BEFORE we + # copy the authentic/payload-PDU over the ipdu. need to be clarified. For the moment leave it as it was. + # until now we have not seen an non-cryptographic secured-i-pdu inside + # an static-container-PDU (=without header) (only in that veeeery specific situation it's important) try: - if header_type == "SHORT-HEADER": - header_id = ea.get_child(ipdu, "HEADER-ID-SHORT-HEADER").text - elif header_type == "LONG-HEADER": - header_id = ea.get_child(ipdu, "HEADER-ID-LONG-HEADER").text - else: - # none type - header_id = None - except AttributeError: - header_id = None - if header_id is not None: - header_id = int(header_id, 0) - - if ipdu is not None and 'SECURED-I-PDU' in ipdu.tag: - secured_i_pdu_name = ea.get_element_name(ipdu) - payload = ea.follow_ref(ipdu, "PAYLOAD-REF") - ipdu = ea.follow_ref(payload, "I-PDU-REF") - logger.info("found secured pdu '%s', dissolved to '%s'", secured_i_pdu_name, ea.get_element_name(ipdu)) - try: - offset = int(ea.get_child(ipdu, "OFFSET").text, 0) * 8 + offset_bytes = int(ea.get_child(ipdu, "OFFSET").text, 0) except: - offset = 0 + if header_id is None: + logger.error(f"PDU {ipdu_name} is a Container-sub-PDU with no header, but NO PDU offset was found! " + f"(will most likely result in wrong encoding/decoding!) - check ARXML file!!") + offset_bytes = 0 try: pdu_type = ipdu.attrib["DEST"] @@ -1523,14 +1828,30 @@ def get_frame_from_container_ipdu(pdu, target_frame, ea, float_factory, headers_ pdu_port_type = ea.get_child(cpdu, "I-PDU-PORT-REF").attrib["DEST"] except (AttributeError, KeyError): pdu_port_type = "" - ipdu_length = int(ea.get_child(ipdu, "LENGTH").text, 0) - ipdu_name = ea.get_element_name(ipdu) + if _secoc_properties is not None: + # use the correct length for SECURED-I-PDU + # WARNING: for SECURED-I-PDU (which is not a cryptographic-pdu) inside an CONTAINER-I-PDU the length + # of the PDU with the name of the authentic-pdu DOES NOT match + # the length of the PDU as in the arxml-file + # Reason: we need added the SecOC-related signals (truncated freshness and trunctated cmac) to the PDU + # as adding this to the frame is not possible at dynamic container-pdus + # this is the only acceptable/possible solution in canmatrix at the moment + ipdu_length = _secoc_properties.secured_i_pdu_length + else: + ipdu_length = int(ea.get_child(ipdu, "LENGTH").text, 0) ipdu_triggering_name = ea.get_element_name(cpdu) target_pdu = canmatrix.Pdu(name=ipdu_name, size=ipdu_length, id=header_id, triggering_name=ipdu_triggering_name, pdu_type=pdu_type, - port_type=pdu_port_type, cycle_time=cycle_time) + port_type=pdu_port_type, cycle_time=cycle_time, + secOC_properties=_secoc_properties, + offset_bytes=offset_bytes) + + # for a secured-id-pdu which is contained in a container-i-pdu it is NOT possible to add the SecOC-signals + # to the frame, so we HAVE to add it to the authentic-pdu (which is incorrect, see hint above). + _add_autosar_secoc_signals_to_parent(target_pdu) # add secoc-related signals to PDU pdu_sig_mapping = ea.get_children(ipdu, "I-SIGNAL-TO-I-PDU-MAPPING") - get_signals(pdu_sig_mapping, target_pdu, ea, None, float_factory, bit_offset=offset) + get_signals(pdu_sig_mapping, target_pdu, ea, None, float_factory, bit_offset=0, + generated_update_bits_init_to_1=generated_update_bits_init_to_1) target_frame.add_pdu(target_pdu) @@ -1572,23 +1893,54 @@ def store_frame_timings(target_frame, cyclic_timing, event_timing, minimum_delay target_frame.cycle_time = int(float_factory(value.text) * 1000) -def get_frame(frame_triggering, ea, multiplex_translation, float_factory, headers_are_littleendian): +def get_frame(frame_triggering, ea, multiplex_translation, float_factory, headers_are_littleendian, + generated_update_bits_init_to_1: bool): # type: (_Element, Earxml, dict, typing.Callable, bool) -> typing.Union[canmatrix.Frame, None] global frames_cache - arb_id = ea.get_child(frame_triggering, "IDENTIFIER") + def set_frame_trigger_attributes_to_frame(frame_triggering, _frame): + arbitration_id = int(ea.get_child(frame_triggering, "IDENTIFIER").text, 0) + address_mode = ea.get_child(frame_triggering, "CAN-ADDRESSING-MODE") + frame_rx_behaviour_elem = ea.get_child(frame_triggering, "CAN-FRAME-RX-BEHAVIOR") + frame_tx_behaviour_elem = ea.get_child(frame_triggering, "CAN-FRAME-TX-BEHAVIOR") + is_fd_elem = ea.get_child(frame_triggering, "CAN-FD-FRAME-SUPPORT") + if address_mode is not None and address_mode.text == 'EXTENDED': + _frame.arbitration_id = canmatrix.ArbitrationId(arbitration_id, extended=True) + else: + _frame.arbitration_id = canmatrix.ArbitrationId(arbitration_id, extended=False) + + if (frame_rx_behaviour_elem is not None and frame_rx_behaviour_elem.text == 'CAN-FD') or \ + (frame_tx_behaviour_elem is not None and frame_tx_behaviour_elem.text == 'CAN-FD') or \ + (is_fd_elem is not None and is_fd_elem.text.lower() == 'true'): + _frame.is_fd = True + else: + _frame.is_fd = False + frame_elem = ea.follow_ref(frame_triggering, "FRAME-REF") frame_trig_name_elem = ea.get_child(frame_triggering, "SHORT-NAME") logger.debug("processing Frame-Trigger: %s", frame_trig_name_elem.text) + arb_id = ea.get_child(frame_triggering, "IDENTIFIER") if arb_id is None: logger.info("found Frame-Trigger %s without arbitration id", frame_trig_name_elem.text) return None - arbitration_id = int(arb_id.text, 0) + + pdu_trigger = ea.selector(frame_triggering, ">I-PDU-TRIGGERING-REF") + if len(pdu_trigger) == 0: + pdu_trigger = ea.selector(frame_triggering, ">PDU-TRIGGERING-REF") + if len(pdu_trigger) == 1: + pdu_trigger = pdu_trigger[0] + else: + logger.debug(f"Frame-Trigger '{frame_trig_name_elem.text}' not a single PDU-Trigger found!") + pdu_trigger = None if frame_elem is not None: logger.debug("Frame: %s", ea.get_element_name(frame_elem)) if frame_elem in frames_cache: - return copy.deepcopy(frames_cache[frame_elem]) + # if we use some kind of cache for the frame, we need to take over the information which depends on + # the frame-triggering. otherwise we got wrong frames (e.g. wrong/duplicated IDs) + _f = copy.deepcopy(frames_cache[frame_elem]) + set_frame_trigger_attributes_to_frame(frame_triggering, _f) + return copy.deepcopy(_f) dlc_elem = ea.get_child(frame_elem, "FRAME-LENGTH") # pdu_mapping = ea.get_child(frame_elem, "PDU-TO-FRAME-MAPPING") # pdu = ea.follow_ref(pdu_mapping, "PDU-REF") # SIGNAL-I-PDU @@ -1597,62 +1949,26 @@ def get_frame(frame_triggering, ea, multiplex_translation, float_factory, header # pdu_name = ea.get_element_name(pdu) # target_pdu = canmatrix.Pdu(name=pdu_name) - secOC_properties = None - if pdu is not None and 'SECURED-I-PDU' in pdu.tag: - try: - payload_length = ea.get_child(pdu, "LENGTH").text - - secured_ipdu_SecoC = ea.get_child(pdu, "SECURE-COMMUNICATION-PROPS") - - auth_algorithm = ea.get_child(secured_ipdu_SecoC, "AUTH-ALGORITHM").text - auth_tx_length = ea.get_child(secured_ipdu_SecoC, "AUTH-INFO-TX-LENGTH").text - data_id = ea.get_child(secured_ipdu_SecoC, "DATA-ID").text - freshness_bit_length = ea.get_child(secured_ipdu_SecoC, "FRESHNESS-VALUE-LENGTH").text - freshness_tx_length = ea.get_child(secured_ipdu_SecoC, "FRESHNESS-VALUE-TX-LENGTH").text - - secOC_properties = canmatrix.AutosarSecOCProperties(auth_algorithm, - int(payload_length, 0), - int(auth_tx_length, 0), - int(data_id, 0), - int(freshness_bit_length, 0), - int(freshness_tx_length, 0) - ) - except Exception as e: - logger.warning(f"{e}") - - ipdu = ea.selector(pdu, ">PAYLOAD-REF>I-PDU-REF") - if not ipdu: - logger.error("SecuredIPdu %r is missing Payload", ea.get_short_name(pdu)) - return None - - pdu = ipdu[0] - - ipdu_length = ea.get_child(pdu, "LENGTH").text - - new_frame = canmatrix.Frame(ea.get_element_name(frame_elem), size=int(dlc_elem.text, 0), secOC_properties=secOC_properties) + _secoc_properties, authentic_pdu = _get_secOC_properties(ea, pdu, pdu_trigger) + if (_secoc_properties is not None and authentic_pdu is not None + and _secoc_properties.use_as_cryptographic_i_pdu == False): + # in case of an SECURE-I-PDU which is NOT a cryptographic-pdu: + # copy the authentic/payload pdu OVER the secure-i-pdu + # + # hint: copy the authentic/payload-pdu over the secured-i-pdu and adding signals to the frame is + # in terms of SecOC-Specification incorrect (also practically/logically incorrect, + # because it is the wrong pdu-name) + # but as long as canmatrix does not support 'a pdu containing another pdu' this is the only acceptable + # workaround + pdu = authentic_pdu + + new_frame = canmatrix.Frame(ea.get_element_name(frame_elem), size=int(dlc_elem.text, 0), + secOC_properties=_secoc_properties) + # adding the SecOC-signals to the Frame is technically/logically incorrect, but as long as canmatrix + # does not support "a pdu containing another pdu" this is the only acceptable workaround + _add_autosar_secoc_signals_to_parent(new_frame) # add secoc-realted signals to Frame # new_frame.add_pdu(target_pdu) - - if secOC_properties is not None: - if freshness_tx_length is not None and int(freshness_tx_length, 0) > 0: - freshness_name = f"{ea.get_element_name(frame_elem)}_Freshness" - signal_freshness = canmatrix.Signal(freshness_name, - start_bit = int(ipdu_length, 0) * 8 + int(freshness_tx_length, 0) - 16, - size = int(freshness_tx_length, 0), - is_signed = False, - is_little_endian = False, - unit = "Unitless") - new_frame.add_signal(signal_freshness) - - if auth_tx_length is not None and int(auth_tx_length, 0) > 0: - authinfo_name = f"{ea.get_element_name(frame_elem)}_AuthInfo" - signal_authinfo = canmatrix.Signal(authinfo_name, - start_bit = int(ipdu_length, 0) * 8 + int(freshness_tx_length, 0), - size = int(auth_tx_length, 0), - is_signed = False, - is_little_endian = False, - unit = "Unitless") - new_frame.add_signal(signal_authinfo) - + comment = ea.get_element_desc(frame_elem) if pdu is not None: new_frame.add_attribute("PduName", ea.get_short_name(pdu)) @@ -1663,14 +1979,13 @@ def get_frame(frame_triggering, ea, multiplex_translation, float_factory, header else: # without frameinfo take short-name of frametriggering and dlc = 8 logger.debug("Frame-Trigger %s has no FRAME-REF", frame_trig_name_elem.text) - pdu = ea.selector(frame_triggering, ">I-PDU-TRIGGERING-REF>I-PDU-REF") # AR4.2 - if len(pdu) == 0: - pdu = ea.selector(frame_triggering, ">PDU-TRIGGERING-REF>I-PDU-REF") + pdu = ea.selector(pdu_trigger, ">I-PDU-REF") # AR4.2 if len(pdu) > 0: pdu = pdu[0] else: pdu = None dlc_elem = ea.get_child(pdu, "LENGTH") + arbitration_id = int(arb_id.text, 0) new_frame = canmatrix.Frame(frame_trig_name_elem.text, arbitration_id=arbitration_id, size=int(int(dlc_elem.text, 0) / 8)) if pdu is not None: @@ -1686,21 +2001,7 @@ def get_frame(frame_triggering, ea, multiplex_translation, float_factory, header if new_frame.comment is None: new_frame.add_comment(ea.get_element_desc(pdu)) - address_mode = ea.get_child(frame_triggering, "CAN-ADDRESSING-MODE") - frame_rx_behaviour_elem = ea.get_child(frame_triggering, "CAN-FRAME-RX-BEHAVIOR") - frame_tx_behaviour_elem = ea.get_child(frame_triggering, "CAN-FRAME-TX-BEHAVIOR") - is_fd_elem = ea.get_child(frame_triggering, "CAN-FD-FRAME-SUPPORT") - if address_mode is not None and address_mode.text == 'EXTENDED': - new_frame.arbitration_id = canmatrix.ArbitrationId(arbitration_id, extended=True) - else: - new_frame.arbitration_id = canmatrix.ArbitrationId(arbitration_id, extended=False) - - if (frame_rx_behaviour_elem is not None and frame_rx_behaviour_elem.text == 'CAN-FD') or \ - (frame_tx_behaviour_elem is not None and frame_tx_behaviour_elem.text == 'CAN-FD') or \ - (is_fd_elem is not None and is_fd_elem.text.lower() == 'true'): - new_frame.is_fd = True - else: - new_frame.is_fd = False + set_frame_trigger_attributes_to_frame(frame_triggering, new_frame) timing_spec = ea.get_child(pdu, "I-PDU-TIMING-SPECIFICATION") # AR 3 if timing_spec is None: @@ -1722,25 +2023,40 @@ def get_frame(frame_triggering, ea, multiplex_translation, float_factory, header if pdu is not None: if "MULTIPLEXED-I-PDU" in pdu.tag: - get_frame_from_multiplexed_ipdu(pdu, new_frame, multiplex_translation, ea, float_factory) + get_frame_from_multiplexed_ipdu(pdu, new_frame, multiplex_translation, ea, float_factory, + generated_update_bits_init_to_1) elif pdu.tag == ea.ns + "CONTAINER-I-PDU": - get_frame_from_container_ipdu(pdu, new_frame, ea, float_factory, headers_are_littleendian) + get_frame_from_container_ipdu(pdu, new_frame, ea, float_factory, headers_are_littleendian, + generated_update_bits_init_to_1) else: pdu_sig_mapping = ea.selector(pdu, "//I-SIGNAL-TO-I-PDU-MAPPING") if pdu_sig_mapping: - get_signals(pdu_sig_mapping, new_frame, ea, None, float_factory) + get_signals(pdu_sig_mapping, new_frame, ea, None, float_factory, + generated_update_bits_init_to_1=generated_update_bits_init_to_1) else: # Seen some pdu_sig_mapping being [] and not None with some arxml 4.2 - update_frame_with_pdu_triggerings(new_frame, ea, frame_triggering, float_factory) + update_frame_with_pdu_triggerings(new_frame, ea, frame_triggering, float_factory, + generated_update_bits_init_to_1) else: # AR 4.2 update_frame_with_pdu_triggerings( - new_frame, ea, frame_triggering, float_factory) + new_frame, ea, frame_triggering, float_factory, generated_update_bits_init_to_1) if new_frame.is_pdu_container and new_frame.cycle_time == 0: + # TODO refactoring needed! + # TODO it IS absolutely okay, and also seen in real-world systems that the cycle-times of the container-pdu + # TODO and sub-pdus differ (differ from container to sub-pdu BUT also from sub-pdu to sub-pdu) + # TODO that is an KEY-FEATURE (!!!) of dynamic container-pdus + # TODO often the cycle-time of the container-pdu is 0 and the sub-pdus has different cycle-times + # TODO setting here the container-cycle-time to the shortest cycle-time of all sub-pdus is incorrect and + # TODO can lead into wrong behavior. + # TODO need to be refactored, for the moment at least changed the logger-output from error to info (because + # TODO it is NOT an error) and add the information to which cycle-time the frame is updated. + # TODO need to be clarified why this was added? was there any specific reason for this? cycle_times = {pdu.cycle_time for pdu in new_frame.pdus} if len(cycle_times) > 1: - logger.warning("%s is pdu-container(frame) with different cycle times (%s), frame cycle-time: %s", + logger.info("%s is pdu-container(frame) with different cycle times (%s), frame cycle-time: %s. " + "Set Frame-cycle-time to {min(cycle_times)}", new_frame.name, cycle_times, new_frame.cycle_time) new_frame.cycle_time = min(cycle_times) new_frame.fit_dlc() @@ -1750,7 +2066,7 @@ def get_frame(frame_triggering, ea, multiplex_translation, float_factory, header return copy.deepcopy(new_frame) -def update_frame_with_pdu_triggerings(frame, ea, frame_triggering, float_factory): +def update_frame_with_pdu_triggerings(frame, ea, frame_triggering, float_factory, generated_update_bits_init_to_1: bool): # type: (canmatrix.Frame, Earxml, _Element, typing.Callable) -> None """Update frame with signals from PDU Triggerings.""" pdu_trigs = ea.follow_all_ref(frame_triggering, "PDU-TRIGGERINGS-REF") @@ -1771,7 +2087,8 @@ def update_frame_with_pdu_triggerings(frame, ea, frame_triggering, float_factory ) # signal_to_pdu_map = get_children(signal_to_pdu_maps, "I-SIGNAL-TO-I-PDU-MAPPING", arDict, ns) - get_signals(signal_to_pdu_maps, frame, ea, None, float_factory) # todo BUG expects list, not item + get_signals(signal_to_pdu_maps, frame, ea, None, float_factory, + generated_update_bits_init_to_1=generated_update_bits_init_to_1) # todo BUG expects list, not item else: logger.debug("Frame %s (assuming AR4.2) no PDU-TRIGGERINGS found", frame.name) @@ -1851,7 +2168,7 @@ def extract_cm_from_ecuc(com_module, ea): return {"": db} -def decode_ethernet_helper(ea, float_factory): +def decode_ethernet_helper(ea, float_factory, generated_update_bits_init_to_1: bool): found_matrixes = {} nodes = {} # type: typing.Dict[_Element, canmatrix.Ecu] @@ -1990,14 +2307,15 @@ def decode_ethernet_helper(ea, float_factory): pdu_sig_mapping = ea.findall("I-SIGNAL-TO-I-PDU-MAPPING", ipdu) - get_signals(pdu_sig_mapping, target_frame, ea, None, float_factory) + get_signals(pdu_sig_mapping, target_frame, ea, None, float_factory, + generated_update_bits_init_to_1=generated_update_bits_init_to_1) # target_frame.update_receiver() # It will make transmitter and receiver worse db.add_frame(target_frame) return found_matrixes -def decode_flexray_helper(ea, float_factory): +def decode_flexray_helper(ea, float_factory, generated_update_bits_init_to_1: bool): found_matrixes = {} fcs = ea.findall('FLEXRAY-CLUSTER') frame_counter = 0 @@ -2037,12 +2355,13 @@ def decode_flexray_helper(ea, float_factory): triggering_name=ipdu_triggering_name, pdu_type=pdu_type, port_type=pdu_port_type) pdu_sig_mapping = ea.get_children(ipdu, "I-SIGNAL-TO-I-PDU-MAPPING") - get_signals(pdu_sig_mapping, target_pdu, ea, None, float_factory) + get_signals(pdu_sig_mapping, target_pdu, ea, None, float_factory, + generated_update_bits_init_to_1=generated_update_bits_init_to_1) frame.add_pdu(target_pdu) return found_matrixes -def decode_can_helper(ea, float_factory, ignore_cluster_info): +def decode_can_helper(ea, float_factory, ignore_cluster_info, generated_update_bits_init_to_1: bool): found_matrixes = {} if ignore_cluster_info is True: ccs = [lxml.etree.Element("ignoreClusterInfo")] # type: typing.Sequence[_Element] @@ -2096,7 +2415,8 @@ def decode_can_helper(ea, float_factory, ignore_cluster_info): multiplex_translation = {} # type: typing.Dict[str, str] for frameTrig in can_frame_trig: # type: _Element - frame = get_frame(frameTrig, ea, multiplex_translation, float_factory, headers_are_littleendian) + frame = get_frame(frameTrig, ea, multiplex_translation, float_factory, headers_are_littleendian, + generated_update_bits_init_to_1) if frame is not None: frame.is_j1939 = "J-1939" in cc.tag @@ -2187,6 +2507,8 @@ def load(file, **options): preferred_languages.append("FOR-ALL") logger.debug(f"preferred_languages: {preferred_languages}") + generated_update_bits_init_to_1: bool = options.get('update_bit_init_1', False) + result = {} logger.debug("Read arxml ...") @@ -2207,12 +2529,12 @@ def load(file, **options): logger.debug("%d I-SIGNAL-TO-I-PDU-MAPPING in arxml...", get_sig_ipdu_nb(ea)) if decode_ethernet: - result.update(decode_ethernet_helper(ea, float_factory)) + result.update(decode_ethernet_helper(ea, float_factory, generated_update_bits_init_to_1)) if decode_flexray: - result.update(decode_flexray_helper(ea, float_factory)) + result.update(decode_flexray_helper(ea, float_factory, generated_update_bits_init_to_1)) - result.update(decode_can_helper(ea, float_factory, ignore_cluster_info)) + result.update(decode_can_helper(ea, float_factory, ignore_cluster_info, generated_update_bits_init_to_1)) result = canmatrix.cancluster.CanCluster(result)