Skip to content

工业网关方案

工业网关软件架构

基于 Linux 的网关软件栈

┌─────────────────────────────────────────────────────┐
│                  应用层                               │
│  数据采集 / 协议转换 / 规则引擎 / 本地存储 / OTA      │
├─────────────────────────────────────────────────────┤
│                  中间件层                             │
│  MQTT 客户端 / OPC-UA / Modbus 库 / TLS / 数据库     │
├─────────────────────────────────────────────────────┤
│                  操作系统层                           │
│  Linux(Yocto / OpenWRT / Ubuntu Core)              │
├─────────────────────────────────────────────────────┤
│                  驱动层                               │
│  串口驱动 / CAN 驱动 / 以太网驱动 / 蜂窝驱动          │
└─────────────────────────────────────────────────────┘

开源网关框架

推荐使用 Node-RED 或 EdgeX Foundry 作为网关应用框架:

Node-RED(推荐,轻量级):
  - 可视化流程编程
  - 丰富的节点库(Modbus、MQTT、HTTP)
  - 适合中小型项目
  - 资源占用:约 100MB RAM

EdgeX Foundry(企业级):
  - 微服务架构
  - 支持多种南向协议
  - 适合大型工业项目
  - 资源占用:约 500MB RAM

Modbus 网关实现

Python Modbus 转 MQTT

python
#!/usr/bin/env python3
"""
工业网关:Modbus RTU → MQTT 协议转换
"""

import time
import json
import logging
import threading
from pymodbus.client import ModbusSerialClient
import paho.mqtt.client as mqtt

# Configure the root logger so that INFO-level records and above are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the gateway code in this file.
logger = logging.getLogger(__name__)

class ModbusMQTTGateway:
    """Protocol-converting gateway: Modbus RTU devices -> MQTT broker.

    The gateway polls the holding registers of every device listed in
    ``config['devices']``, publishes the scaled values to
    ``factory/<device name>/data`` and executes write-register commands
    received on ``gateway/<gateway_id>/cmd/#``.

    Args:
        config: Nested dict with the keys ``gateway_id``, ``modbus``,
            ``mqtt``, ``devices`` and, optionally, ``poll_interval``
            (seconds between poll rounds, default 5).
    """

    def __init__(self, config):
        self.config = config
        self.modbus_client = None
        self.mqtt_client = None
        self.running = False
        # The serial client is shared by the polling loop in run() and by
        # _on_mqtt_message(), which paho invokes from the background thread
        # started by loop_start(). Every bus transaction takes this lock so
        # requests are never interleaved on the wire.
        self._modbus_lock = threading.Lock()

    def connect_modbus(self):
        """Open the Modbus RTU serial connection.

        Returns:
            True if the serial client connected, False otherwise.
        """
        modbus_cfg = self.config['modbus']
        self.modbus_client = ModbusSerialClient(
            port=modbus_cfg['port'],
            baudrate=modbus_cfg['baudrate'],
            parity=modbus_cfg['parity'],
            stopbits=modbus_cfg['stopbits'],
            bytesize=8,
            timeout=1
        )

        if self.modbus_client.connect():
            logger.info("Modbus connected: %s", modbus_cfg['port'])
            return True

        logger.error("Modbus connection failed")
        return False

    def connect_mqtt(self):
        """Create the MQTT client, connect it and start its network loop.

        Raises:
            Whatever ``mqtt.Client.connect`` raises when the broker cannot
            be reached; the exception is not handled here.
        """
        mqtt_cfg = self.config['mqtt']

        # paho-mqtt >= 2.0 expects an explicit callback API version.
        # VERSION1 keeps the (client, userdata, flags, rc) on_connect
        # signature used below; older releases have no such enum.
        api_versions = getattr(mqtt, 'CallbackAPIVersion', None)
        if api_versions is not None:
            self.mqtt_client = mqtt.Client(
                callback_api_version=api_versions.VERSION1,
                client_id=mqtt_cfg['client_id']
            )
        else:
            self.mqtt_client = mqtt.Client(client_id=mqtt_cfg['client_id'])

        if mqtt_cfg.get('username'):
            self.mqtt_client.username_pw_set(
                mqtt_cfg['username'],
                mqtt_cfg.get('password')
            )

        # TLS configuration; the client certificate and key are optional so
        # that server-only authentication does not raise KeyError.
        tls_cfg = mqtt_cfg.get('tls')
        if tls_cfg:
            self.mqtt_client.tls_set(
                ca_certs=tls_cfg.get('ca_cert'),
                certfile=tls_cfg.get('client_cert'),
                keyfile=tls_cfg.get('client_key')
            )

        self.mqtt_client.on_connect = self._on_mqtt_connect
        self.mqtt_client.on_message = self._on_mqtt_message

        self.mqtt_client.connect(
            mqtt_cfg['host'],
            mqtt_cfg['port'],
            keepalive=60
        )
        self.mqtt_client.loop_start()

    def _on_mqtt_connect(self, client, userdata, flags, rc):
        """Subscribe to the command topics once the broker accepts us."""
        if rc == 0:
            logger.info("MQTT connected")
            # Subscribe to the control topics of this gateway.
            client.subscribe(f"gateway/{self.config['gateway_id']}/cmd/#")
        else:
            logger.error("MQTT connection failed: %s", rc)

    def _on_mqtt_message(self, client, userdata, msg):
        """Handle an MQTT control command.

        Only topics containing ``/cmd/write_register`` are acted upon. The
        JSON payload must provide ``slave_id``, ``address`` and ``value``;
        ``cmd_id`` is optional and echoed back in the response published to
        ``gateway/<gateway_id>/response``.
        """
        try:
            topic = msg.topic
            payload = json.loads(msg.payload.decode())

            if '/cmd/write_register' in topic:
                slave_id = payload['slave_id']
                address = payload['address']
                value = payload['value']

                # Serialise against the polling loop (see __init__).
                with self._modbus_lock:
                    result = self.modbus_client.write_register(
                        address, value, slave=slave_id
                    )

                # Publish the execution result.
                response = {
                    "cmd_id": payload.get('cmd_id'),
                    "result": "success" if not result.isError() else "failed"
                }
                self.mqtt_client.publish(
                    f"gateway/{self.config['gateway_id']}/response",
                    json.dumps(response)
                )
        except Exception as e:
            # Callback boundary: an escaping exception would surface inside
            # the MQTT client's loop, so log it and carry on.
            logger.error("Command processing error: %s", e)

    def read_and_publish(self, device_config):
        """Read one device's holding registers and publish them to MQTT.

        Args:
            device_config: Dict with ``name``, ``slave_id`` and an optional
                ``holding_registers`` list. Each register entry provides
                ``name``, ``address``, ``count`` and optionally ``scale``
                (default 1.0), ``offset`` (default 0.0) and ``decimals``
                (default 2).

        Nothing is published when no register could be read.
        """
        slave_id = device_config['slave_id']
        device_name = device_config['name']

        data = {}

        for reg_config in device_config.get('holding_registers', []):
            try:
                with self._modbus_lock:
                    result = self.modbus_client.read_holding_registers(
                        reg_config['address'],
                        reg_config['count'],
                        slave=slave_id
                    )

                if result.isError():
                    # Previously dropped silently; make failed reads visible.
                    logger.warning(
                        "Modbus read failed: device=%s register=%s response=%s",
                        device_name, reg_config.get('name'), result
                    )
                    continue

                # Only the first returned register is decoded, even when
                # ``count`` is greater than 1.
                raw_value = result.registers[0]
                # Apply the linear scaling: raw * scale + offset.
                value = (
                    raw_value * reg_config.get('scale', 1.0)
                    + reg_config.get('offset', 0.0)
                )
                data[reg_config['name']] = round(
                    value, reg_config.get('decimals', 2)
                )
            except Exception as e:
                logger.error("Read register error: %s", e)

        if data:
            payload = {
                "device": device_name,
                "timestamp": int(time.time()),
                "data": data
            }

            topic = f"factory/{device_name}/data"
            self.mqtt_client.publish(topic, json.dumps(payload), qos=1)
            logger.info("Published: %s -> %s", topic, payload)

    def stop(self):
        """Ask run() to exit after the current poll round and sleep."""
        self.running = False

    def _shutdown(self):
        """Best-effort release of the MQTT and Modbus connections."""
        if self.mqtt_client is not None:
            try:
                self.mqtt_client.disconnect()
                self.mqtt_client.loop_stop()
            except Exception as e:
                logger.warning("MQTT shutdown error: %s", e)
        if self.modbus_client is not None:
            try:
                self.modbus_client.close()
            except Exception as e:
                logger.warning("Modbus shutdown error: %s", e)

    def run(self):
        """Main loop: connect, then poll every device until stopped.

        Returns immediately when the Modbus connection cannot be opened.
        Exceptions (including KeyboardInterrupt) still propagate to the
        caller, but the connections are released first.
        """
        if not self.connect_modbus():
            return

        try:
            self.connect_mqtt()
            self.running = True

            while self.running:
                for device in self.config['devices']:
                    self.read_and_publish(device)

                time.sleep(self.config.get('poll_interval', 5))
        finally:
            self.running = False
            self._shutdown()

# Example configuration.
# NOTE(review): the broker host and credentials below look like placeholders.
# No "tls" section is configured here, so the connection is presumably
# unencrypted on port 1883; confirm before using outside a test setup.
config = {
    "gateway_id": "GW_FACTORY_001",
    "modbus": {
        "port": "/dev/ttyUSB0",
        "baudrate": 9600,
        "parity": "N",
        "stopbits": 1
    },
    "mqtt": {
        "host": "broker.emqx.io",
        "port": 1883,
        "client_id": "industrial_gateway_001",
        "username": "user",
        "password": "pass"
    },
    "poll_interval": 5,
    "devices": [
        {
            "name": "PLC_001",
            "slave_id": 1,
            "holding_registers": [
                {"name": "temperature", "address": 0, "count": 1, "scale": 0.1, "decimals": 1},
                {"name": "pressure", "address": 1, "count": 1, "scale": 0.01, "decimals": 2},
                {"name": "flow_rate", "address": 2, "count": 1, "scale": 1.0, "decimals": 0}
            ]
        }
    ]
}

gateway = ModbusMQTTGateway(config)

# run() blocks in the polling loop, so start it only when this file is
# executed as a script; importing the module must not open the serial port
# or prevent the code that follows from being defined.
if __name__ == "__main__":
    gateway.run()

边缘计算

本地规则引擎

python
class EdgeRuleEngine:
    """Edge-side rule engine: evaluate threshold rules locally.

    Rules are dicts with the keys ``device``, ``field``, ``condition``,
    ``threshold`` and ``message``. A triggered rule produces an alert that
    is published to the ``factory/alerts`` MQTT topic and logged.

    Args:
        mqtt_client: Object with a ``publish(topic, payload, qos=...)``
            method used to send alerts. When omitted, a module-level name
            ``mqtt_client`` is used if one exists; otherwise alerts are
            only logged.
        debounce_seconds: Minimum number of seconds between two alerts for
            the same device/field pair (default 300, i.e. 5 minutes).
    """

    # Supported rule conditions, each called as compare(value, threshold).
    _COMPARATORS = {
        'gt': lambda value, threshold: value > threshold,
        'ge': lambda value, threshold: value >= threshold,
        'lt': lambda value, threshold: value < threshold,
        'le': lambda value, threshold: value <= threshold,
        'eq': lambda value, threshold: value == threshold,
        'ne': lambda value, threshold: value != threshold,
    }

    def __init__(self, mqtt_client=None, debounce_seconds=300):
        self.rules = []
        # Maps "<device>_<field>" to the time.time() of the last alert sent.
        self.alert_history = {}
        self.mqtt_client = mqtt_client
        self.debounce_seconds = debounce_seconds

    def add_rule(self, rule):
        """Register a rule dict; warn if its condition is not supported."""
        if rule.get('condition') not in self._COMPARATORS:
            # Such a rule can never trigger; say so once instead of
            # ignoring it silently on every evaluation.
            logger.warning(
                "Rule has unsupported condition %r and will never trigger",
                rule.get('condition')
            )
        self.rules.append(rule)

    def evaluate(self, device_name, data):
        """Check every rule of ``device_name`` against ``data``.

        Args:
            device_name: Device the readings belong to.
            data: Mapping of field name to value. Rules whose field is
                missing from the mapping are skipped.
        """
        for rule in self.rules:
            if rule['device'] != device_name:
                continue

            field = rule['field']
            if field not in data:
                continue

            compare = self._COMPARATORS.get(rule['condition'])
            if compare is None:
                continue

            value = data[field]
            if compare(value, rule['threshold']):
                self._handle_alert(rule, device_name, field, value)

    def _handle_alert(self, rule, device, field, value):
        """Build and send an alert unless one was sent recently (debounce)."""
        alert_key = f"{device}_{field}"
        now = time.time()

        # Debounce: the same device/field alerts at most once per window.
        last_sent = self.alert_history.get(alert_key)
        if last_sent is not None and now - last_sent < self.debounce_seconds:
            return

        self.alert_history[alert_key] = now

        alert = {
            "device": device,
            "field": field,
            "value": value,
            "threshold": rule['threshold'],
            "condition": rule['condition'],
            "message": rule['message'],
            "timestamp": int(now)
        }

        self.send_alert(alert)

    def send_alert(self, alert):
        """Publish the alert to the MQTT alert topic and log it."""
        client = getattr(self, 'mqtt_client', None)
        if client is None:
            # Backward compatibility: earlier versions published through a
            # module-level ``mqtt_client``; use it when it is defined.
            client = globals().get('mqtt_client')

        if client is not None:
            client.publish("factory/alerts", json.dumps(alert), qos=2)
        else:
            logger.error("No MQTT client configured; alert not published")

        logger.warning(
            "ALERT: %s - %s.%s = %s",
            alert['message'], alert['device'], alert['field'], alert['value']
        )

# Register the alert rules for PLC_001: over-temperature and over-pressure.
engine = EdgeRuleEngine()
engine.add_rule(dict(
    device="PLC_001",
    field="temperature",
    condition="gt",
    threshold=85.0,
    message="设备温度过高",
))
engine.add_rule(dict(
    device="PLC_001",
    field="pressure",
    condition="gt",
    threshold=10.0,
    message="压力超限",
))

离线缓存与断点续传

python
import sqlite3
import json
import time

class OfflineCache:
    """Offline data cache backed by SQLite.

    Messages are stored locally (for example while the network is down)
    and can later be fetched, marked as uploaded and purged.

    Args:
        db_path: Path of the SQLite database file. The table is created on
            construction if it does not exist yet.
    """

    def __init__(self, db_path="/data/cache.db"):
        self.db_path = db_path
        self._init_db()

    def _run(self, sql, params=(), many=False, fetch=False):
        """Run one statement on a fresh connection that is always closed.

        The statement is committed on success and rolled back on error.
        Returns the fetched rows when ``fetch`` is true, otherwise None.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            # The connection context manager commits or rolls back; it does
            # not close the connection, hence the surrounding try/finally.
            with conn:
                if many:
                    cursor = conn.executemany(sql, params)
                else:
                    cursor = conn.execute(sql, params)
                return cursor.fetchall() if fetch else None
        finally:
            conn.close()

    def _init_db(self):
        """Create the cache table if it is missing."""
        self._run("""
            CREATE TABLE IF NOT EXISTS data_cache (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                topic TEXT NOT NULL,
                payload TEXT NOT NULL,
                timestamp INTEGER NOT NULL,
                uploaded INTEGER DEFAULT 0
            )
        """)

    def save(self, topic, payload):
        """Store ``payload`` (serialised as JSON) for ``topic``.

        The row is stamped with the current Unix time in seconds.
        """
        self._run(
            "INSERT INTO data_cache (topic, payload, timestamp) VALUES (?, ?, ?)",
            (topic, json.dumps(payload), int(time.time()))
        )

    def get_pending(self, limit=100):
        """Return up to ``limit`` rows not yet uploaded, oldest first.

        Returns:
            List of ``(id, topic, payload)`` tuples, where ``payload`` is
            the JSON text written by save().
        """
        return self._run(
            "SELECT id, topic, payload FROM data_cache WHERE uploaded=0 ORDER BY id LIMIT ?",
            (limit,),
            fetch=True
        )

    def mark_uploaded(self, ids):
        """Flag the rows with the given ids as uploaded.

        Args:
            ids: Any iterable of row ids (list, tuple, generator, ...).
                ``None`` or an empty iterable is a no-op.
        """
        if ids is None:
            return
        id_params = [(row_id,) for row_id in ids]
        if not id_params:
            return
        # One fixed, parameterised statement per id: no SQL text is built
        # from the input and the per-statement variable limit never applies.
        self._run(
            "UPDATE data_cache SET uploaded=1 WHERE id=?",
            id_params,
            many=True
        )

    def cleanup(self, days=7):
        """Delete uploaded rows older than ``days`` days.

        Rows that have not been uploaded are kept regardless of age.
        """
        cutoff = int(time.time()) - days * 86400
        self._run(
            "DELETE FROM data_cache WHERE uploaded=1 AND timestamp < ?",
            (cutoff,)
        )

褚成志的笔记