Skip to content

工业协议转换

协议转换概述

工业现场存在大量不同协议的设备,协议转换网关负责将这些协议统一转换为标准的 IoT 协议(MQTT/HTTP)上传云端。

现场协议(南向):
  Modbus RTU / TCP
  OPC-UA
  PROFINET
  EtherNet/IP
  BACnet(楼宇自动化)
  DNP3(电力)
  IEC 61850(变电站)
  
云端协议(北向):
  MQTT
  HTTP/HTTPS
  AMQP
  CoAP

OPC-UA 集成

OPC-UA 服务器(网关侧)

python
# 使用 open62541 Python 绑定(opcua-asyncio)
import asyncio
from asyncua import Server, ua

async def setup_opcua_server():
    """设置 OPC-UA 服务器,暴露 PLC 数据"""
    server = Server()
    await server.init()
    
    server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
    server.set_server_name("Industrial Gateway OPC-UA Server")
    
    # 设置安全策略
    await server.set_security_policy([
        ua.SecurityPolicyType.NoSecurity,
        ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt
    ])
    
    # 创建命名空间
    uri = "http://quectel.com/gateway"
    idx = await server.register_namespace(uri)
    
    # 创建对象节点
    objects = server.nodes.objects
    factory = await objects.add_object(idx, "Factory")
    
    # 添加变量节点
    temperature = await factory.add_variable(idx, "Temperature", 0.0)
    await temperature.set_writable()
    
    pressure = await factory.add_variable(idx, "Pressure", 0.0)
    motor_status = await factory.add_variable(idx, "MotorStatus", False)
    
    async with server:
        print("OPC-UA Server started")
        
        while True:
            # 从 Modbus 读取数据并更新 OPC-UA 节点
            plc_data = read_plc_data()
            
            await temperature.write_value(plc_data['temperature'])
            await pressure.write_value(plc_data['pressure'])
            await motor_status.write_value(plc_data['motor_running'])
            
            await asyncio.sleep(1)

asyncio.run(setup_opcua_server())

OPC-UA 客户端(SCADA 侧)

python
# SCADA 系统通过 OPC-UA 读取网关数据
from asyncua import Client

async def read_from_gateway():
    async with Client("opc.tcp://192.168.1.100:4840/") as client:
        # 浏览节点
        root = client.nodes.root
        objects = client.nodes.objects
        
        # 读取温度
        temp_node = await client.nodes.root.get_child(
            ["0:Objects", "2:Factory", "2:Temperature"]
        )
        temperature = await temp_node.read_value()
        print(f"Temperature: {temperature}°C")
        
        # 订阅数据变化
        handler = SubHandler()
        sub = await client.create_subscription(500, handler)
        await sub.subscribe_data_change([temp_node])
        
        await asyncio.sleep(10)

class SubHandler:
    def datachange_notification(self, node, val, data):
        print(f"Data change: {node} = {val}")

PROFINET 集成

PROFINET 是西门子主导的工业以太网标准,广泛用于西门子 PLC 系统。

通过 Profinet IO 读取数据

python
# 使用 python-profinet 库(或通过 OPC-UA 桥接)
# 注意:PROFINET 通常需要专用硬件支持

# 更常见的方案:通过西门子 S7 协议访问
import snap7

def read_profinet_device_via_s7(plc_ip):
    """通过 S7 协议访问 PROFINET 设备数据"""
    client = snap7.client.Client()
    client.connect(plc_ip, 0, 1)
    
    # 读取 PROFINET IO 设备的数据
    # 数据通常映射到 PLC 的 I/O 区域
    input_data = client.read_area(snap7.types.Areas.PE, 0, 0, 10)  # 读取 10 字节输入
    output_data = client.read_area(snap7.types.Areas.PA, 0, 0, 10)  # 读取 10 字节输出
    
    return input_data, output_data

BACnet 楼宇自动化

BACnet 是楼宇自动化领域的标准协议,用于暖通空调(HVAC)、照明、门禁等系统。

python
# 使用 BAC0 库访问 BACnet 设备
import BAC0

def read_bacnet_device():
    """读取 BACnet 设备数据"""
    bacnet = BAC0.lite()
    
    # 读取模拟输入(温度传感器)
    # 格式:设备IP:端口 对象类型 对象实例 属性
    temperature = bacnet.read('192.168.1.200 analogInput 1 presentValue')
    print(f"Room Temperature: {temperature}°C")
    
    # 读取二进制输出(风机状态)
    fan_status = bacnet.read('192.168.1.200 binaryOutput 1 presentValue')
    print(f"Fan Status: {'ON' if fan_status else 'OFF'}")
    
    # 写入设定值(温度设定点)
    bacnet.write('192.168.1.200 analogValue 1 presentValue 22.5')
    
    bacnet.disconnect()

统一数据模型

设备信息模型(基于 IEC CIM)

json
{
  "device": {
    "id": "DEVICE_PLC_001",
    "name": "生产线1号PLC",
    "type": "PLC",
    "manufacturer": "Siemens",
    "model": "S7-1200",
    "location": {
      "plant": "上海工厂",
      "workshop": "1号车间",
      "line": "生产线1"
    },
    "protocol": "S7",
    "connection": {
      "type": "ethernet",
      "ip": "192.168.1.100",
      "port": 102
    }
  },
  "datapoints": [
    {
      "id": "DP_TEMP_001",
      "name": "设备温度",
      "type": "analog",
      "unit": "°C",
      "range": {"min": -20, "max": 150},
      "alarm": {"high": 85, "low": -10},
      "source": {"db": 1, "offset": 0, "datatype": "REAL"}
    },
    {
      "id": "DP_MOTOR_001",
      "name": "电机运行状态",
      "type": "digital",
      "source": {"db": 1, "byte": 8, "bit": 0}
    }
  ]
}

协议适配器模式

python
from abc import ABC, abstractmethod

class ProtocolAdapter(ABC):
    """协议适配器基类"""
    
    @abstractmethod
    def connect(self) -> bool:
        pass
    
    @abstractmethod
    def read_datapoint(self, datapoint_config) -> any:
        pass
    
    @abstractmethod
    def write_datapoint(self, datapoint_config, value) -> bool:
        pass
    
    @abstractmethod
    def disconnect(self):
        pass

class ModbusAdapter(ProtocolAdapter):
    """Modbus 协议适配器"""
    
    def connect(self):
        self.client = ModbusTcpClient(self.host, self.port)
        return self.client.connect()
    
    def read_datapoint(self, dp):
        result = self.client.read_holding_registers(
            dp['address'], 1, slave=dp['slave_id']
        )
        if not result.isError():
            return result.registers[0] * dp.get('scale', 1.0)
        return None

class S7Adapter(ProtocolAdapter):
    """西门子 S7 协议适配器"""
    
    def connect(self):
        self.client = snap7.client.Client()
        self.client.connect(self.ip, self.rack, self.slot)
        return self.client.get_connected()
    
    def read_datapoint(self, dp):
        data = self.client.db_read(dp['db'], dp['offset'], 4)
        return struct.unpack('>f', data)[0]

class ProtocolAdapterFactory:
    """协议适配器工厂"""
    
    @staticmethod
    def create(protocol, config):
        adapters = {
            'modbus_tcp': ModbusAdapter,
            'modbus_rtu': ModbusAdapter,
            's7': S7Adapter,
            'opcua': OPCUAAdapter,
            'bacnet': BACnetAdapter
        }
        
        adapter_class = adapters.get(protocol)
        if not adapter_class:
            raise ValueError(f"Unsupported protocol: {protocol}")
        
        return adapter_class(config)

褚成志的笔记