工业网关方案
工业网关软件架构
基于 Linux 的网关软件栈
┌─────────────────────────────────────────────────────┐
│ 应用层 │
│ 数据采集 / 协议转换 / 规则引擎 / 本地存储 / OTA │
├─────────────────────────────────────────────────────┤
│ 中间件层 │
│ MQTT 客户端 / OPC-UA / Modbus 库 / TLS / 数据库 │
├─────────────────────────────────────────────────────┤
│ 操作系统层 │
│ Linux(Yocto / OpenWRT / Ubuntu Core) │
├─────────────────────────────────────────────────────┤
│ 驱动层 │
│ 串口驱动 / CAN 驱动 / 以太网驱动 / 蜂窝驱动 │
└─────────────────────────────────────────────────────┘开源网关框架
推荐使用 Node-RED 或 EdgeX Foundry 作为网关应用框架:
Node-RED(推荐,轻量级):
- 可视化流程编程
- 丰富的节点库(Modbus、MQTT、HTTP)
- 适合中小型项目
- 资源占用:约 100MB RAM
EdgeX Foundry(企业级):
- 微服务架构
- 支持多种南向协议
- 适合大型工业项目
- 资源占用:约 500MB RAMModbus 网关实现
Python Modbus 转 MQTT
python
#!/usr/bin/env python3
"""
工业网关:Modbus RTU → MQTT 协议转换
"""
import time
import json
import logging
import threading
from pymodbus.client import ModbusSerialClient
import paho.mqtt.client as mqtt
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ModbusMQTTGateway:
def __init__(self, config):
self.config = config
self.modbus_client = None
self.mqtt_client = None
self.running = False
def connect_modbus(self):
"""连接 Modbus RTU 设备"""
self.modbus_client = ModbusSerialClient(
port=self.config['modbus']['port'],
baudrate=self.config['modbus']['baudrate'],
parity=self.config['modbus']['parity'],
stopbits=self.config['modbus']['stopbits'],
bytesize=8,
timeout=1
)
if self.modbus_client.connect():
logger.info(f"Modbus connected: {self.config['modbus']['port']}")
return True
else:
logger.error("Modbus connection failed")
return False
def connect_mqtt(self):
"""连接 MQTT 服务器"""
self.mqtt_client = mqtt.Client(
client_id=self.config['mqtt']['client_id']
)
if self.config['mqtt'].get('username'):
self.mqtt_client.username_pw_set(
self.config['mqtt']['username'],
self.config['mqtt']['password']
)
# TLS 配置
if self.config['mqtt'].get('tls'):
self.mqtt_client.tls_set(
ca_certs=self.config['mqtt']['tls']['ca_cert'],
certfile=self.config['mqtt']['tls']['client_cert'],
keyfile=self.config['mqtt']['tls']['client_key']
)
self.mqtt_client.on_connect = self._on_mqtt_connect
self.mqtt_client.on_message = self._on_mqtt_message
self.mqtt_client.connect(
self.config['mqtt']['host'],
self.config['mqtt']['port'],
keepalive=60
)
self.mqtt_client.loop_start()
def _on_mqtt_connect(self, client, userdata, flags, rc):
if rc == 0:
logger.info("MQTT connected")
# 订阅控制主题
client.subscribe(f"gateway/{self.config['gateway_id']}/cmd/#")
else:
logger.error(f"MQTT connection failed: {rc}")
def _on_mqtt_message(self, client, userdata, msg):
"""处理 MQTT 控制命令"""
try:
topic = msg.topic
payload = json.loads(msg.payload.decode())
# 解析写寄存器命令
if '/cmd/write_register' in topic:
slave_id = payload['slave_id']
address = payload['address']
value = payload['value']
result = self.modbus_client.write_register(
address, value, slave=slave_id
)
# 发布执行结果
response = {
"cmd_id": payload.get('cmd_id'),
"result": "success" if not result.isError() else "failed"
}
self.mqtt_client.publish(
f"gateway/{self.config['gateway_id']}/response",
json.dumps(response)
)
except Exception as e:
logger.error(f"Command processing error: {e}")
def read_and_publish(self, device_config):
"""读取 Modbus 数据并发布到 MQTT"""
slave_id = device_config['slave_id']
device_name = device_config['name']
data = {}
# 读取保持寄存器
for reg_config in device_config.get('holding_registers', []):
try:
result = self.modbus_client.read_holding_registers(
reg_config['address'],
reg_config['count'],
slave=slave_id
)
if not result.isError():
raw_value = result.registers[0]
# 应用缩放因子
value = raw_value * reg_config.get('scale', 1.0) + reg_config.get('offset', 0.0)
data[reg_config['name']] = round(value, reg_config.get('decimals', 2))
except Exception as e:
logger.error(f"Read register error: {e}")
if data:
payload = {
"device": device_name,
"timestamp": int(time.time()),
"data": data
}
topic = f"factory/{device_name}/data"
self.mqtt_client.publish(topic, json.dumps(payload), qos=1)
logger.info(f"Published: {topic} -> {payload}")
def run(self):
"""主运行循环"""
if not self.connect_modbus():
return
self.connect_mqtt()
self.running = True
while self.running:
for device in self.config['devices']:
self.read_and_publish(device)
time.sleep(self.config.get('poll_interval', 5))
# 配置示例
config = {
"gateway_id": "GW_FACTORY_001",
"modbus": {
"port": "/dev/ttyUSB0",
"baudrate": 9600,
"parity": "N",
"stopbits": 1
},
"mqtt": {
"host": "broker.emqx.io",
"port": 1883,
"client_id": "industrial_gateway_001",
"username": "user",
"password": "pass"
},
"poll_interval": 5,
"devices": [
{
"name": "PLC_001",
"slave_id": 1,
"holding_registers": [
{"name": "temperature", "address": 0, "count": 1, "scale": 0.1, "decimals": 1},
{"name": "pressure", "address": 1, "count": 1, "scale": 0.01, "decimals": 2},
{"name": "flow_rate", "address": 2, "count": 1, "scale": 1.0, "decimals": 0}
]
}
]
}
gateway = ModbusMQTTGateway(config)
gateway.run()边缘计算
本地规则引擎
python
class EdgeRuleEngine:
"""边缘侧规则引擎:本地处理,减少云端压力"""
def __init__(self):
self.rules = []
self.alert_history = {}
def add_rule(self, rule):
"""添加规则"""
self.rules.append(rule)
def evaluate(self, device_name, data):
"""评估规则"""
for rule in self.rules:
if rule['device'] != device_name:
continue
field = rule['field']
if field not in data:
continue
value = data[field]
condition = rule['condition']
threshold = rule['threshold']
triggered = False
if condition == 'gt' and value > threshold:
triggered = True
elif condition == 'lt' and value < threshold:
triggered = True
elif condition == 'eq' and value == threshold:
triggered = True
if triggered:
self._handle_alert(rule, device_name, field, value)
def _handle_alert(self, rule, device, field, value):
"""处理告警(防抖动)"""
alert_key = f"{device}_{field}"
now = time.time()
# 防抖动:同一告警 5 分钟内只触发一次
if alert_key in self.alert_history:
if now - self.alert_history[alert_key] < 300:
return
self.alert_history[alert_key] = now
alert = {
"device": device,
"field": field,
"value": value,
"threshold": rule['threshold'],
"condition": rule['condition'],
"message": rule['message'],
"timestamp": int(now)
}
# 发送告警(MQTT / 短信 / 钉钉)
self.send_alert(alert)
def send_alert(self, alert):
"""发送告警通知"""
# 发布到 MQTT 告警主题
mqtt_client.publish("factory/alerts", json.dumps(alert), qos=2)
logger.warning(f"ALERT: {alert['message']} - {alert['device']}.{alert['field']} = {alert['value']}")
# 配置规则
engine = EdgeRuleEngine()
engine.add_rule({
"device": "PLC_001",
"field": "temperature",
"condition": "gt",
"threshold": 85.0,
"message": "设备温度过高"
})
engine.add_rule({
"device": "PLC_001",
"field": "pressure",
"condition": "gt",
"threshold": 10.0,
"message": "压力超限"
})离线缓存与断点续传
python
import sqlite3
import json
import time
class OfflineCache:
"""离线数据缓存:网络断开时本地存储,恢复后自动上传"""
def __init__(self, db_path="/data/cache.db"):
self.db_path = db_path
self._init_db()
def _init_db(self):
conn = sqlite3.connect(self.db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS data_cache (
id INTEGER PRIMARY KEY AUTOINCREMENT,
topic TEXT NOT NULL,
payload TEXT NOT NULL,
timestamp INTEGER NOT NULL,
uploaded INTEGER DEFAULT 0
)
""")
conn.commit()
conn.close()
def save(self, topic, payload):
"""保存数据到本地缓存"""
conn = sqlite3.connect(self.db_path)
conn.execute(
"INSERT INTO data_cache (topic, payload, timestamp) VALUES (?, ?, ?)",
(topic, json.dumps(payload), int(time.time()))
)
conn.commit()
conn.close()
def get_pending(self, limit=100):
"""获取待上传的数据"""
conn = sqlite3.connect(self.db_path)
cursor = conn.execute(
"SELECT id, topic, payload FROM data_cache WHERE uploaded=0 ORDER BY id LIMIT ?",
(limit,)
)
rows = cursor.fetchall()
conn.close()
return rows
def mark_uploaded(self, ids):
"""标记已上传"""
conn = sqlite3.connect(self.db_path)
conn.execute(
f"UPDATE data_cache SET uploaded=1 WHERE id IN ({','.join('?'*len(ids))})",
ids
)
conn.commit()
conn.close()
def cleanup(self, days=7):
"""清理旧数据"""
cutoff = int(time.time()) - days * 86400
conn = sqlite3.connect(self.db_path)
conn.execute(
"DELETE FROM data_cache WHERE uploaded=1 AND timestamp < ?",
(cutoff,)
)
conn.commit()
conn.close()