智慧城市解决方案概览
智慧城市 IoT 架构
智慧城市是 NB-IoT 最重要的应用场景,移远 BC660K 等 NB-IoT 模组在智慧城市中有大量应用。
智慧城市 IoT 架构:
┌─────────────────────────────────────────────────────┐
│ 感知层(终端设备) │
│ 智慧路灯 / 智慧停车 / 环境监测 / 智能井盖 / 垃圾桶 │
│ NB-IoT / Cat.1 / LoRa 模组 │
└─────────────────────────────────────────────────────┘
↓ NB-IoT / Cat.1 网络
┌─────────────────────────────────────────────────────┐
│ 网络层 │
│ 运营商 NB-IoT 网络 / 4G/5G 网络 │
└─────────────────────────────────────────────────────┘
↓ MQTT / CoAP / HTTP
┌─────────────────────────────────────────────────────┐
│ 平台层 │
│ IoT 平台(设备管理 / 数据存储 / 规则引擎) │
└─────────────────────────────────────────────────────┘
↓ API
┌─────────────────────────────────────────────────────┐
│ 应用层 │
│ 城市管理平台 / GIS 地图 / 移动 App / 大屏展示 │
└─────────────────────────────────────────────────────┘智慧路灯
系统架构
路灯控制器(NB-IoT 模组)
├── 灯具控制(PWM 调光)
├── 电量计量(电流/电压/功率)
├── 故障检测(断路/短路)
└── 环境感知(光照/温度)
↓ NB-IoT
路灯管理平台
├── 单灯控制(远程开关/调光)
├── 分组控制(按区域/时间)
├── 能耗统计(节能分析)
└── 故障告警(自动派单)推荐方案
模组:BC660K-GL(NB-IoT)
- 超低功耗(路灯有市电,但控制器需要低功耗)
- 覆盖广(城市各角落)
- 成本低(大规模部署)
通信协议:
上行:CoAP / MQTT(状态上报)
下行:CoAP / MQTT(控制命令)
数据上报频率:
正常状态:每小时上报一次
故障状态:立即上报
控制响应:< 5秒路灯控制器代码
c
// QuecOpen 路灯控制器
#include "ql_api_common.h"
#include "ql_api_datacall.h"
#include "ql_api_coap.h"
#include "ql_api_adc.h"
#include "ql_api_gpio.h"
#define COAP_SERVER "coap://iot.example.com:5683"
#define DEVICE_ID "LAMP_001"
// 读取电量数据
typedef struct {
float voltage; // 电压(V)
float current; // 电流(A)
float power; // 功率(W)
float energy; // 累计电量(kWh)
int lamp_status; // 灯具状态(0=关,1=开)
int brightness; // 亮度(0-100%)
int fault_code; // 故障码(0=正常)
} LampData;
LampData read_lamp_data(void) {
LampData data = {0};
// 通过 RS485 读取电量计数据(Modbus RTU)
// ... Modbus 读取代码 ...
// 读取 GPIO 状态
data.lamp_status = Ql_GPIO_GetLevel(GPIO_LAMP_STATUS);
return data;
}
// 上报数据
void report_lamp_data(LampData *data) {
char payload[256];
snprintf(payload, sizeof(payload),
"{\"id\":\"%s\",\"v\":%.1f,\"i\":%.2f,\"p\":%.1f,"
"\"e\":%.3f,\"s\":%d,\"b\":%d,\"f\":%d}",
DEVICE_ID, data->voltage, data->current, data->power,
data->energy, data->lamp_status, data->brightness, data->fault_code);
// CoAP PUT 请求
Ql_coapPut(COAP_SERVER "/lamp/status", payload, strlen(payload));
}
// 处理控制命令
void handle_control_cmd(const char *cmd_json) {
// 解析 JSON 命令
// {"action":"set_brightness","value":80}
int brightness = parse_brightness(cmd_json);
if (brightness >= 0 && brightness <= 100) {
// PWM 调光
set_lamp_brightness(brightness);
QL_APP_LOG("LAMP", QL_LOG_LEVEL_INFO, "Brightness set to %d%%", brightness);
}
}智慧停车
地磁传感器方案
地磁传感器工作原理:
地球磁场在有车/无车时会发生变化
传感器检测磁场变化判断车位状态
优点:
- 安装简单(埋入地面)
- 无需外部电源(电池供电)
- 不受天气影响
缺点:
- 受地磁干扰(大型车辆经过)
- 需要定期标定停车传感器代码
python
# QuecPython 停车传感器
import utime
import machine
import dataCall
import request
import ujson
# 地磁传感器(通过 I2C 连接)
i2c = machine.I2C(machine.I2C.I2C1, machine.I2C.STANDARD_MODE)
SENSOR_ADDR = 0x0D # QMC5883L 地磁传感器地址
PARKING_ID = "PARKING_A_001"
SERVER_URL = "https://parking.example.com/api"
def read_magnetic_field():
"""读取磁场强度"""
# 读取 X/Y/Z 轴磁场数据
data = i2c.read(SENSOR_ADDR, 6)
x = (data[1] << 8) | data[0]
y = (data[3] << 8) | data[2]
z = (data[5] << 8) | data[4]
# 处理有符号数
if x > 32767: x -= 65536
if y > 32767: y -= 65536
if z > 32767: z -= 65536
return x, y, z
def detect_vehicle(baseline_x, baseline_y, baseline_z, threshold=200):
"""检测是否有车辆"""
x, y, z = read_magnetic_field()
# 计算与基准值的偏差
delta = abs(x - baseline_x) + abs(y - baseline_y) + abs(z - baseline_z)
return delta > threshold
def calibrate_baseline():
"""标定基准值(无车时执行)"""
samples = []
for _ in range(10):
x, y, z = read_magnetic_field()
samples.append((x, y, z))
utime.sleep_ms(100)
# 取平均值
baseline_x = sum(s[0] for s in samples) // len(samples)
baseline_y = sum(s[1] for s in samples) // len(samples)
baseline_z = sum(s[2] for s in samples) // len(samples)
return baseline_x, baseline_y, baseline_z
# 主程序
baseline = calibrate_baseline()
last_status = None
while True:
occupied = detect_vehicle(*baseline)
if occupied != last_status:
# 状态变化,立即上报
data = {
"parking_id": PARKING_ID,
"occupied": occupied,
"timestamp": utime.time()
}
try:
resp = request.post(
SERVER_URL + "/status",
json=data
)
print(f"Status updated: {'occupied' if occupied else 'free'}")
except Exception as e:
print(f"Upload failed: {e}")
last_status = occupied
utime.sleep(5)环境监测
空气质量监测站
监测参数:
PM2.5 / PM10(颗粒物)
CO2(二氧化碳)
NO2(二氧化氮)
O3(臭氧)
SO2(二氧化硫)
温度 / 湿度 / 气压
噪声
通信方案:
Cat.1(EC200U):数据量较大,需要较高速率
NB-IoT(BC660K):数据量小,低功耗
数据上报:
正常:每5分钟上报一次
超标:立即上报,触发告警环境监测数据格式
json
{
"station_id": "ENV_STATION_001",
"location": {
"lat": 31.2304,
"lng": 121.4737,
"address": "上海市浦东新区XX路XX号"
},
"timestamp": 1716192000,
"air_quality": {
"aqi": 85,
"pm25": 35.6,
"pm10": 68.2,
"co2": 420,
"no2": 45,
"o3": 80,
"so2": 12
},
"weather": {
"temperature": 25.6,
"humidity": 65,
"pressure": 1013.2,
"wind_speed": 3.5,
"wind_direction": 180
},
"noise": {
"leq": 65.2,
"lmax": 78.5
}
}