本文主要是介绍ISO15765-2 CAN报文解析 python,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
网络传输层帧结构:
个人微信: wzdhh991
邮箱: wzd991@126.com
个人比较懒,CSDN更新缓慢,有相关问题可发邮箱或个人微信交流!
源码:
# -*- coding:utf-8 -*-
import queue
import sys
import os
import binascii
import struct
import re
import json
import traceback
import threading
from tool_base import *
from zlgcan import *
from PyQt6.QtCore import *
from queue import *
import time
import logging
uds_tp_wdbg = logging.getLogger('UdsTp')
# mode
(UDS_TP_MODE_TX_AND_RX, UDS_TP_MODE_RX) = range(2)
# rx msg
(MSG_ID_RX_PHY, MSG_ID_TX_PHY,MSG_ID_TX_APP, MSG_ID_RX_UDS_APP, MSG_ID_TX_END, MSG_ID_READ_INFO,MSG_ID_WRITE_ST) = range(7)
# data
(BAND_RATE_125K, BAND_RATE_250K, BAND_RATE_500K, BAND_RATE_1M) = (125, 250, 500, 1000)
(RX_MARK_PHY, RX_MARK_UDS_APP) = (1, 2)
MAX_RCV_NUM = 1
(NPCItype_SINGLE_FRAME, NPCItype_FIRST_FRAME, NPCItype_CONTINUE_FRAME, NPCItype_CTRL_FRAME) = range(0, 4)
(ST_IDLE, TX_ST_RXING) = range(0, 2)
(MSG_ID_SEND_UDS, MSG_ID_SEND_PHY) = range(2)
def get_uds_tp_msg_id_st(id):name = ('RX PHY', 'TX PHY', 'TX APP', 'RX APP')if id >= len(name):return 'unknon'else:return name[id]def get_n_pci(head):out_msg = {}try:pci_type = head[0] >> 4out_msg['N_PCItype'] = pci_typeif pci_type == NPCItype_SINGLE_FRAME: # singleframe sfout_msg['SF_DL'] = head[0] & 0xfif out_msg['SF_DL'] > 8 or out_msg['SF_DL'] == 0:out_msg['SF_DL'] = 7out_msg['data'] = binascii.hexlify(bytearray(head[1:out_msg['SF_DL'] + 1]))elif pci_type == NPCItype_FIRST_FRAME: # first frame ff'''0-6 无效 7:扩展地址或者可变地址8-fff数据长度'''out_msg['FF_DL'] = ((head[0] & 0xf) << 8) | head[1]out_msg['data'] = binascii.hexlify(bytearray(head[2:]))passelif pci_type == NPCItype_CONTINUE_FRAME: # consecutive frame cfout_msg['SN'] = head[0] & 0xfout_msg['data'] = binascii.hexlify(bytearray(head[1:]))passelif pci_type == NPCItype_CTRL_FRAME: # flow controlout_msg['data'] = ''out_msg['FS'] = head[0] & 0x0fif out_msg['FS'] == 0:out_msg['FS_name'] = 'continuetosend(CTS)' # continuetosendelif out_msg['FS'] == 1:out_msg['FS_name'] = 'wait (WT)'elif out_msg['FS'] == 2:out_msg['FS_name'] = 'overflow (OVFLW)'else:out_msg['FS_name'] = 'unknow'out_msg['BS'] = head[1]out_msg['STmin'] = head[2]if 127 >= out_msg['STmin'] >= 0:out_msg['STmin_name'] = 'separation time:%dms' % out_msg['STmin']elif 0xf1 <= out_msg['STmin'] <= 0xf9:out_msg['STmin_name'] = 'separation time:%dus' % out_msg['STmin']else:out_msg['STmin_name'] = 'unknow'except Exception as e:printf_except_info(e)return out_msgclass uds_tp(object):rx_signal_hd = [None, None]ch_run_mark = [0, 0]dev_info = {}can_hd = Nonedev_handle = Nonecan_handle = [None, None]read_thread = [None,None]run_mode = [UDS_TP_MODE_RX, UDS_TP_MODE_RX]msg_queue = (Queue(),Queue())run_st = [ST_IDLE, ST_IDLE]wait_sn = [1,1]decode_all_len = [0, 0]decode_msg = [bytearray(), bytearray()]can_id_tx = [0x700, 0x700]can_id_rx = [0x730, 0x730]"""docstring for uds_tp"""def __init__(self, dev_id):super(uds_tp, self).__init__()uds_tp_wdbg.info('device id:%d', dev_id)with open("./dev_info.json", "r") as fd:self.dev_info = json.load(fd)["USBCAN-II"]self.can_hd = ZCAN()self.dev_handle = self.can_hd.OpenDevice(self.dev_info["dev_type"], dev_id, 0)"""请求发送数据"""def set_tx_rx_id(self,cid:int, tx:int, rx:int):uds_tp_wdbg.info('tx:0x%x rx:0x%x'%(tx,rx))self.can_id_tx[cid] = txself.can_id_rx[cid] = rxdef send_phy_data(self, cid, can_id, data):self.msg_queue[cid].put((MSG_ID_SEND_PHY, can_id , data))#私用使用def _send_data_to_can(self, cid, can_id, data):#todo.try:msg = {}msg['cid'] = cidmsg['cmd'] = MSG_ID_TX_PHYmsg['canid'] = can_idmsg['time'] = time.time()msg['len'] = 8msg['data'] = dataself.rx_signal_hd[cid].emit(msg)msg_temp = ZCAN_Transmit_Data()msg_temp.transmit_type = 0msg_temp.frame.rtr = 0if can_id > 0x7fff:msg_temp.frame.eff = 1else:msg_temp.frame.eff = 0msg_temp.frame.can_dlc = 8msg_temp.frame.brs = 0msg_temp.frame.len = 8msg_temp.frame.can_id = can_idfor j in range(0, 8):msg_temp.frame.data[j] = int(data[j])uds_tp_wdbg.info( 'tx canid:0x%x data:%s'%(can_id, binascii.hexlify(data)))ret = self.can_hd.Transmit(self.can_handle[cid], msg_temp, 1)if ret == 1:return 0except Exception as e:printf_except_info(e)def send_uds_frame(self, cid, can_id, data):msg = {}msg['cid'] = cidmsg['cmd'] = MSG_ID_TX_APPmsg['canid'] = can_idmsg['time'] = time.time()msg['len'] = len(data)msg['data'] = dataself.rx_signal_hd[cid].emit(msg)if len(data) == 0:returnif self.run_mode[cid] == UDS_TP_MODE_TX_AND_RX:self.msg_queue[cid].put((MSG_ID_SEND_UDS, can_id, data))def start(self, mode=UDS_TP_MODE_TX_AND_RX,cid=0, band_rate='500K',can_id=(),rx_signal=None, rx_mark=0, dev_id=0):if self.ch_run_mark[cid] == 0:try:self.rx_signal_hd[cid] = rx_signalchn_cfg = ZCAN_CHANNEL_INIT_CONFIG()chn_cfg.can_type = ZCAN_TYPE_CANchn_cfg.config.can.mode = 0chn_cfg.config.can.timing0 = self.dev_info["chn_info"]["baudrate"][band_rate]["timing0"]chn_cfg.config.can.timing1 = self.dev_info["chn_info"]["baudrate"][band_rate]["timing1"]chn_cfg.config.can.acc_code = 0chn_cfg.config.can.acc_mask = 0xFFFFFFFFself.run_mode[cid] = modeif self.dev_handle == INVALID_DEVICE_HANDLE:return "OpenDevice!"self.can_handle[cid] = self.can_hd.InitCAN(self.dev_handle, cid, chn_cfg)if self.can_handle[cid] == INVALID_CHANNEL_HANDLE:return "初始化通道失败!"ret = self.can_hd.StartCAN(self.can_handle[cid])if ret != ZCAN_STATUS_OK:return "打开通道失败!"self.ch_run_mark[cid] = 1self.read_thread[cid] = threading.Thread(None, target=self.handle_press, args=(cid, self.rx_signal_hd[cid]))self.read_thread[cid].start()except Exception as e:printf_except_info(e)return "ok"else:return "err""""请求停止"""def stop(self, cid):if self.ch_run_mark[cid] == 1:self.ch_run_mark[cid] = 0self.read_thread[cid].join(0.1)self.can_hd.ResetCAN(self.can_handle[cid])self.can_hd.CloseDevice(self.can_handle[cid])passdef set_mode(self,cid, mode):self.run_mode[cid] = modedef get_mode(self,cid):return self.run_mode[cid]def _transmit_layer_decode(self, cid, can_id, data):if self.run_mode[cid] == UDS_TP_MODE_TX_AND_RX:if (can_id != self.can_id_rx[cid]) and (can_id != 0x7df):return Noneif self.run_mode[cid] == UDS_TP_MODE_RX:idmap = (self.can_id_rx[cid], 0x7df, self.can_id_tx[cid])if can_id not in idmap: # 物理寻址符合uds规则return Nonehead = get_n_pci(data)if head['N_PCItype'] == NPCItype_SINGLE_FRAME:#单帧if head['SF_DL'] > 0:if head['SF_DL'] > 7:head['SF_DL'] = 7msg = {}msg['cid'] = cidmsg['cmd'] = MSG_ID_RX_UDS_APPmsg['canid'] = can_idmsg['time'] = time.time()msg['len'] = head['SF_DL']msg['data'] = data[1:head['SF_DL'] + 1]self.rx_signal_hd[cid].emit(msg)#elif head['N_PCItype'] == NPCItype_FIRST_FRAME:#多帧,第一帧out = bytearray(8)out[0] = 0x30out[2] = 1if self.run_mode[cid] == UDS_TP_MODE_TX_AND_RX:self._send_data_to_can(cid, self.can_id_tx[cid], out)self.wait_sn[cid] = 1self.decode_all_len[cid] = head['FF_DL']uds_tp_wdbg.info('can id:0x%x all size:%d', can_id, self.decode_all_len[cid])self.decode_msg[cid] = bytearray(0)if self.decode_all_len[cid] > 6:self.decode_msg[cid].extend(data[2:])self.decode_all_len[cid] -= 6# self.run_st[cid] = TX_ST_RXINGelse:msg = {}msg['cid'] = cidmsg['cmd'] = MSG_ID_RX_UDS_APPmsg['canid'] = can_idmsg['time'] = time.time()msg['len'] = self.decode_all_len[cid]msg['data'] = data[2:2 + self.decode_all_len[cid]]self.rx_signal_hd[cid].emit(msg)self.decode_all_len[cid] = 0elif head['N_PCItype'] == NPCItype_CONTINUE_FRAME:#多帧连续帧uds_tp_wdbg.info('can id:0x%x all size:%d sn:%d', can_id, self.decode_all_len[cid], self.wait_sn[cid])if (head['SN'] == self.wait_sn[cid]):self.wait_sn[cid] += 1self.wait_sn[cid] %= 16if self.decode_all_len[cid] >= 7:self.decode_all_len[cid] -= 7self.decode_msg[cid].extend(data[1:])else:self.decode_msg[cid].extend(data[1:self.decode_all_len[cid] + 1])self.decode_all_len[cid] = 0if self.decode_all_len[cid] <= 0:self.run_st[cid] = ST_IDLEmsg = {}msg['cid'] = cidmsg['cmd'] = MSG_ID_RX_UDS_APPmsg['canid'] = can_idmsg['time'] = time.time()msg['len'] = len(self.decode_msg[cid])msg['data'] = self.decode_msg[cid]self.rx_signal_hd[cid].emit(msg)else:passelif head['N_PCItype'] == NPCItype_CTRL_FRAME:#流控帧passreturn headdef get_rec_data(self, cid):can_num = self.can_hd.GetReceiveNum(self.can_handle[cid], ZCAN_TYPE_CAN)if can_num == 0:return Noneread_cnt = MAX_RCV_NUM if can_num >= MAX_RCV_NUM else can_numcan_msgs, act_num = self.can_hd.Receive(self.can_handle[cid], read_cnt, MAX_RCV_NUM)if act_num == 0:return Nonemsg = {}msg['cid'] = cidmsg['cmd'] = MSG_ID_RX_PHYmsg['canid'] = can_msgs[0].frame.can_id & 0xfffffffmsg['time'] = time.time()msg['len'] = can_msgs[0].frame.can_dlcmsg['data'] = bytearray(can_msgs[0].frame.data)msg['pci'] = self._transmit_layer_decode(cid, msg['canid'], msg['data'])self.rx_signal_hd[cid].emit(msg)# can_id_map = [0x700,0x730,0x313, 0x701, 0x781]if (msg['canid'] == self.can_id_rx[cid]) or (self.can_id_tx[cid] == msg['canid']):uds_tp_wdbg.info('rx canid:0x%x data:%s'%(msg['canid'], binascii.hexlify(msg['data'])))return msgdef _wait_flow_ctrl(self, cid):now = time.time()while True:if time.time() - now > 0.5:return Nonemsg = self.get_rec_data(cid)if msg == None:continueif 'pci' not in msg.keys():continueelse:if msg['pci'] is not None:if 'N_PCItype' in msg['pci'].keys():if msg['pci']['N_PCItype'] == NPCItype_CTRL_FRAME:return msg['pci']def handle_press(self, cid, signal_hd):while True:if self.ch_run_mark[cid] == 0:returnrec_msg = self.get_rec_data(cid)if self.msg_queue[cid].qsize() > 0 and (self.run_st[cid] == ST_IDLE):msg = self.msg_queue[cid].get()if msg[0] == MSG_ID_SEND_UDS:can_id = msg[1]send_temp_data = msg[2]if len(send_temp_data) <= 7:tempx = bytearray(1)tempx[0] = len(send_temp_data)tempx.extend(send_temp_data)if len(send_temp_data) < 8:temp_size = 8 - len(tempx)for i in range(0, temp_size):tempx.append(0xaa)self._send_data_to_can(cid, can_id, tempx)else:all_size = len(send_temp_data)i = 6sn = 1tempx = bytearray(0)tempx.append(0x10 | (all_size >> 8))tempx.append(all_size & 0xff)tempx.extend(send_temp_data[0:6])self._send_data_to_can(cid, can_id, tempx)pci = self._wait_flow_ctrl(cid)if pci is None:#发送失败continuewhile (i < all_size):if all_size - i > 7:tempx = bytearray(0)tempx.append(0x20 + sn)tempx.extend(send_temp_data[i: i + 7])self._send_data_to_can(cid, can_id, tempx)i += 7else:tempx = bytearray(0)tempx.append(0x20 + sn)tempx.extend(send_temp_data[i:])if len(tempx) < 8:temp_size = 8 - len(tempx)for i in range(0, temp_size):tempx.append(0xaa)self._send_data_to_can(cid, can_id, tempx)i = all_sizesn += 1sn %= 16elif msg[0] == MSG_ID_SEND_PHY:self._send_data_to_can(cid, msg[1], msg[2])
这篇关于ISO15765-2 CAN报文解析 python的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!