Skip to content

OBD 诊断方案

OBD 方案概述

OBD(On-Board Diagnostics)诊断设备通过车辆的 OBD-II 接口读取车辆数据,是车联网最低成本的接入方式。

OBD vs T-Box 对比

特性OBD 设备T-Box
安装方式即插即用(OBD-II 接口)专业安装(集成到车辆)
成本低(100-500元)高(500-2000元)
数据丰富度中(OBD 标准数据)高(CAN 总线全数据)
稳定性一般(可能被拔出)高(固定安装)
适用场景后装市场、车队管理前装市场、新车出厂

OBD 设备硬件设计

最小系统

OBD 设备组成:
  OBD-II 接口(16针)

  电源管理(12V → 3.3V/3.8V)

  主控 MCU(STM32 / ESP32)

  CAN 收发器(MCP2551 / TJA1050)

  蜂窝模组(EC200U-CN,Cat.1)

  GNSS 模组(L76K,可选)

  存储(Flash,用于离线缓存)

电源设计

汽车 OBD-II 接口供电:
  Pin 16:12V(常电,点火前后均有)
  
  注意事项:
  1. 汽车电源噪声大,需要充分滤波
  2. 防反接保护(P-MOS 或 肖特基二极管)
  3. 过压保护(汽车电源可能有 Load Dump 瞬态,最高 87V)
  4. 低功耗设计(车辆熄火后仍需工作,不能耗尽电瓶)

推荐电源方案:
  12V → TVS 保护 → 肖特基二极管 → 100μF 滤波
      → LM2596(DC-DC)→ 3.8V(模组供电)
      → LDO → 3.3V(MCU 供电)

OBD 数据采集

标准 OBD-II PID 读取

c
#include <stdio.h>
#include <stdint.h>
#include <string.h>

// CAN 发送函数(需要根据硬件实现)
extern int can_send(uint32_t id, uint8_t *data, uint8_t len);
extern int can_recv(uint32_t *id, uint8_t *data, uint8_t *len, int timeout_ms);

// 读取 OBD PID
int obd_read_pid(uint8_t service, uint8_t pid, uint8_t *response, int *resp_len) {
    // 构建请求帧
    uint8_t request[8] = {0x02, service, pid, 0x00, 0x00, 0x00, 0x00, 0x00};
    
    // 发送到 OBD 广播地址
    can_send(0x7DF, request, 8);
    
    // 等待响应(ECU 响应地址:0x7E8-0x7EF)
    uint32_t resp_id;
    uint8_t resp_data[8];
    uint8_t resp_dlc;
    
    if (can_recv(&resp_id, resp_data, &resp_dlc, 200) < 0) {
        return -1;  // 超时
    }
    
    // 验证响应
    if (resp_id < 0x7E8 || resp_id > 0x7EF) return -1;
    if (resp_data[1] != (service + 0x40)) return -1;  // 响应服务码
    if (resp_data[2] != pid) return -1;
    
    // 复制数据字节
    *resp_len = resp_data[0] - 2;  // 数据长度 = 总长度 - 2(服务码+PID)
    memcpy(response, &resp_data[3], *resp_len);
    
    return 0;
}

// 读取车速
float obd_get_speed(void) {
    uint8_t data[4];
    int len;
    
    if (obd_read_pid(0x01, 0x0D, data, &len) == 0) {
        return (float)data[0];  // km/h
    }
    return -1.0f;
}

// 读取发动机转速
float obd_get_rpm(void) {
    uint8_t data[4];
    int len;
    
    if (obd_read_pid(0x01, 0x0C, data, &len) == 0) {
        return ((data[0] << 8) | data[1]) / 4.0f;  // RPM
    }
    return -1.0f;
}

// 读取冷却液温度
float obd_get_coolant_temp(void) {
    uint8_t data[4];
    int len;
    
    if (obd_read_pid(0x01, 0x05, data, &len) == 0) {
        return (float)data[0] - 40.0f;  // °C
    }
    return -273.0f;
}

// 读取故障码(DTC)
int obd_get_dtc(char dtc_list[][8], int max_count) {
    uint8_t request[8] = {0x01, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
    can_send(0x7DF, request, 8);
    
    uint32_t resp_id;
    uint8_t resp_data[8];
    uint8_t resp_dlc;
    int count = 0;
    
    // 可能有多帧响应(多个故障码)
    while (can_recv(&resp_id, resp_data, &resp_dlc, 200) == 0 && count < max_count) {
        // 解析故障码
        for (int i = 2; i < resp_dlc - 1 && count < max_count; i += 2) {
            if (resp_data[i] == 0 && resp_data[i+1] == 0) break;
            
            // 故障码格式:P0XXX, C0XXX, B0XXX, U0XXX
            char prefix;
            switch ((resp_data[i] >> 6) & 0x03) {
                case 0: prefix = 'P'; break;
                case 1: prefix = 'C'; break;
                case 2: prefix = 'B'; break;
                case 3: prefix = 'U'; break;
            }
            
            snprintf(dtc_list[count], 8, "%c%04X", prefix,
                     ((resp_data[i] & 0x3F) << 8) | resp_data[i+1]);
            count++;
        }
    }
    
    return count;
}

车队管理平台

数据上报格式

json
// OBD 设备数据上报(每30秒)
{
  "device_id": "OBD_SN_123456",
  "timestamp": 1716192000,
  "location": {
    "lat": 31.2304,
    "lng": 121.4737,
    "speed": 60.5,
    "heading": 90
  },
  "obd": {
    "speed": 60,
    "rpm": 2000,
    "coolant_temp": 90,
    "fuel_level": 75,
    "throttle": 25,
    "engine_load": 40,
    "dtc_count": 0
  },
  "trip": {
    "distance": 12.5,
    "duration": 1200,
    "fuel_consumed": 1.2
  }
}

驾驶行为分析(UBI)

python
# 驾驶行为评分算法
class DrivingBehaviorAnalyzer:
    def __init__(self):
        self.score = 100
        self.events = []
    
    def analyze_acceleration(self, speed_data, time_interval=1.0):
        """分析急加速行为"""
        for i in range(1, len(speed_data)):
            acceleration = (speed_data[i] - speed_data[i-1]) / time_interval
            
            if acceleration > 3.0:  # 超过 3 m/s² 为急加速
                self.score -= 2
                self.events.append({
                    "type": "harsh_acceleration",
                    "value": acceleration,
                    "timestamp": i * time_interval
                })
    
    def analyze_braking(self, speed_data, time_interval=1.0):
        """分析急刹车行为"""
        for i in range(1, len(speed_data)):
            deceleration = (speed_data[i-1] - speed_data[i]) / time_interval
            
            if deceleration > 4.0:  # 超过 4 m/s² 为急刹车
                self.score -= 3
                self.events.append({
                    "type": "harsh_braking",
                    "value": deceleration,
                    "timestamp": i * time_interval
                })
    
    def analyze_speeding(self, speed_data, speed_limit=120):
        """分析超速行为"""
        speeding_duration = sum(1 for s in speed_data if s > speed_limit)
        
        if speeding_duration > 0:
            self.score -= min(20, speeding_duration // 10)
            self.events.append({
                "type": "speeding",
                "duration": speeding_duration,
                "max_speed": max(speed_data)
            })
    
    def get_score(self):
        return max(0, self.score)

褚成志的笔记