OBD 诊断方案
OBD 方案概述
OBD(On-Board Diagnostics)诊断设备通过车辆的 OBD-II 接口读取车辆数据,是车联网最低成本的接入方式。
OBD vs T-Box 对比
| 特性 | OBD 设备 | T-Box |
|---|---|---|
| 安装方式 | 即插即用(OBD-II 接口) | 专业安装(集成到车辆) |
| 成本 | 低(100-500元) | 高(500-2000元) |
| 数据丰富度 | 中(OBD 标准数据) | 高(CAN 总线全数据) |
| 稳定性 | 一般(可能被拔出) | 高(固定安装) |
| 适用场景 | 后装市场、车队管理 | 前装市场、新车出厂 |
OBD 设备硬件设计
最小系统
OBD 设备组成:
OBD-II 接口(16针)
↓
电源管理(12V → 3.3V/3.8V)
↓
主控 MCU(STM32 / ESP32)
↓
CAN 收发器(MCP2551 / TJA1050)
↓
蜂窝模组(EC200U-CN,Cat.1)
↓
GNSS 模组(L76K,可选)
↓
存储(Flash,用于离线缓存)电源设计
汽车 OBD-II 接口供电:
Pin 16:12V(常电,点火前后均有)
注意事项:
1. 汽车电源噪声大,需要充分滤波
2. 防反接保护(P-MOS 或 肖特基二极管)
3. 过压保护(汽车电源可能有 Load Dump 瞬态,最高 87V)
4. 低功耗设计(车辆熄火后仍需工作,不能耗尽电瓶)
推荐电源方案:
12V → TVS 保护 → 肖特基二极管 → 100μF 滤波
→ LM2596(DC-DC)→ 3.8V(模组供电)
→ LDO → 3.3V(MCU 供电)OBD 数据采集
标准 OBD-II PID 读取
c
#include <stdio.h>
#include <stdint.h>
#include <string.h>
// CAN 发送函数(需要根据硬件实现)
extern int can_send(uint32_t id, uint8_t *data, uint8_t len);
extern int can_recv(uint32_t *id, uint8_t *data, uint8_t *len, int timeout_ms);
// 读取 OBD PID
int obd_read_pid(uint8_t service, uint8_t pid, uint8_t *response, int *resp_len) {
// 构建请求帧
uint8_t request[8] = {0x02, service, pid, 0x00, 0x00, 0x00, 0x00, 0x00};
// 发送到 OBD 广播地址
can_send(0x7DF, request, 8);
// 等待响应(ECU 响应地址:0x7E8-0x7EF)
uint32_t resp_id;
uint8_t resp_data[8];
uint8_t resp_dlc;
if (can_recv(&resp_id, resp_data, &resp_dlc, 200) < 0) {
return -1; // 超时
}
// 验证响应
if (resp_id < 0x7E8 || resp_id > 0x7EF) return -1;
if (resp_data[1] != (service + 0x40)) return -1; // 响应服务码
if (resp_data[2] != pid) return -1;
// 复制数据字节
*resp_len = resp_data[0] - 2; // 数据长度 = 总长度 - 2(服务码+PID)
memcpy(response, &resp_data[3], *resp_len);
return 0;
}
// 读取车速
float obd_get_speed(void) {
uint8_t data[4];
int len;
if (obd_read_pid(0x01, 0x0D, data, &len) == 0) {
return (float)data[0]; // km/h
}
return -1.0f;
}
// 读取发动机转速
float obd_get_rpm(void) {
uint8_t data[4];
int len;
if (obd_read_pid(0x01, 0x0C, data, &len) == 0) {
return ((data[0] << 8) | data[1]) / 4.0f; // RPM
}
return -1.0f;
}
// 读取冷却液温度
float obd_get_coolant_temp(void) {
uint8_t data[4];
int len;
if (obd_read_pid(0x01, 0x05, data, &len) == 0) {
return (float)data[0] - 40.0f; // °C
}
return -273.0f;
}
// 读取故障码(DTC)
int obd_get_dtc(char dtc_list[][8], int max_count) {
uint8_t request[8] = {0x01, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
can_send(0x7DF, request, 8);
uint32_t resp_id;
uint8_t resp_data[8];
uint8_t resp_dlc;
int count = 0;
// 可能有多帧响应(多个故障码)
while (can_recv(&resp_id, resp_data, &resp_dlc, 200) == 0 && count < max_count) {
// 解析故障码
for (int i = 2; i < resp_dlc - 1 && count < max_count; i += 2) {
if (resp_data[i] == 0 && resp_data[i+1] == 0) break;
// 故障码格式:P0XXX, C0XXX, B0XXX, U0XXX
char prefix;
switch ((resp_data[i] >> 6) & 0x03) {
case 0: prefix = 'P'; break;
case 1: prefix = 'C'; break;
case 2: prefix = 'B'; break;
case 3: prefix = 'U'; break;
}
snprintf(dtc_list[count], 8, "%c%04X", prefix,
((resp_data[i] & 0x3F) << 8) | resp_data[i+1]);
count++;
}
}
return count;
}车队管理平台
数据上报格式
json
// OBD 设备数据上报(每30秒)
{
"device_id": "OBD_SN_123456",
"timestamp": 1716192000,
"location": {
"lat": 31.2304,
"lng": 121.4737,
"speed": 60.5,
"heading": 90
},
"obd": {
"speed": 60,
"rpm": 2000,
"coolant_temp": 90,
"fuel_level": 75,
"throttle": 25,
"engine_load": 40,
"dtc_count": 0
},
"trip": {
"distance": 12.5,
"duration": 1200,
"fuel_consumed": 1.2
}
}驾驶行为分析(UBI)
python
# 驾驶行为评分算法
class DrivingBehaviorAnalyzer:
def __init__(self):
self.score = 100
self.events = []
def analyze_acceleration(self, speed_data, time_interval=1.0):
"""分析急加速行为"""
for i in range(1, len(speed_data)):
acceleration = (speed_data[i] - speed_data[i-1]) / time_interval
if acceleration > 3.0: # 超过 3 m/s² 为急加速
self.score -= 2
self.events.append({
"type": "harsh_acceleration",
"value": acceleration,
"timestamp": i * time_interval
})
def analyze_braking(self, speed_data, time_interval=1.0):
"""分析急刹车行为"""
for i in range(1, len(speed_data)):
deceleration = (speed_data[i-1] - speed_data[i]) / time_interval
if deceleration > 4.0: # 超过 4 m/s² 为急刹车
self.score -= 3
self.events.append({
"type": "harsh_braking",
"value": deceleration,
"timestamp": i * time_interval
})
def analyze_speeding(self, speed_data, speed_limit=120):
"""分析超速行为"""
speeding_duration = sum(1 for s in speed_data if s > speed_limit)
if speeding_duration > 0:
self.score -= min(20, speeding_duration // 10)
self.events.append({
"type": "speeding",
"duration": speeding_duration,
"max_speed": max(speed_data)
})
def get_score(self):
return max(0, self.score)