工业协议转换
协议转换概述
工业现场存在大量不同协议的设备,协议转换网关负责将这些协议统一转换为标准的 IoT 协议(MQTT/HTTP)上传云端。
现场协议(南向):
Modbus RTU / TCP
OPC-UA
PROFINET
EtherNet/IP
BACnet(楼宇自动化)
DNP3(电力)
IEC 61850(变电站)
云端协议(北向):
MQTT
HTTP/HTTPS
AMQP
CoAPOPC-UA 集成
OPC-UA 服务器(网关侧)
python
# 使用 open62541 Python 绑定(opcua-asyncio)
import asyncio
from asyncua import Server, ua
async def setup_opcua_server():
"""设置 OPC-UA 服务器,暴露 PLC 数据"""
server = Server()
await server.init()
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
server.set_server_name("Industrial Gateway OPC-UA Server")
# 设置安全策略
await server.set_security_policy([
ua.SecurityPolicyType.NoSecurity,
ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt
])
# 创建命名空间
uri = "http://quectel.com/gateway"
idx = await server.register_namespace(uri)
# 创建对象节点
objects = server.nodes.objects
factory = await objects.add_object(idx, "Factory")
# 添加变量节点
temperature = await factory.add_variable(idx, "Temperature", 0.0)
await temperature.set_writable()
pressure = await factory.add_variable(idx, "Pressure", 0.0)
motor_status = await factory.add_variable(idx, "MotorStatus", False)
async with server:
print("OPC-UA Server started")
while True:
# 从 Modbus 读取数据并更新 OPC-UA 节点
plc_data = read_plc_data()
await temperature.write_value(plc_data['temperature'])
await pressure.write_value(plc_data['pressure'])
await motor_status.write_value(plc_data['motor_running'])
await asyncio.sleep(1)
asyncio.run(setup_opcua_server())OPC-UA 客户端(SCADA 侧)
python
# SCADA 系统通过 OPC-UA 读取网关数据
from asyncua import Client
async def read_from_gateway():
async with Client("opc.tcp://192.168.1.100:4840/") as client:
# 浏览节点
root = client.nodes.root
objects = client.nodes.objects
# 读取温度
temp_node = await client.nodes.root.get_child(
["0:Objects", "2:Factory", "2:Temperature"]
)
temperature = await temp_node.read_value()
print(f"Temperature: {temperature}°C")
# 订阅数据变化
handler = SubHandler()
sub = await client.create_subscription(500, handler)
await sub.subscribe_data_change([temp_node])
await asyncio.sleep(10)
class SubHandler:
def datachange_notification(self, node, val, data):
print(f"Data change: {node} = {val}")PROFINET 集成
PROFINET 是西门子主导的工业以太网标准,广泛用于西门子 PLC 系统。
通过 Profinet IO 读取数据
python
# 使用 python-profinet 库(或通过 OPC-UA 桥接)
# 注意:PROFINET 通常需要专用硬件支持
# 更常见的方案:通过西门子 S7 协议访问
import snap7
def read_profinet_device_via_s7(plc_ip):
"""通过 S7 协议访问 PROFINET 设备数据"""
client = snap7.client.Client()
client.connect(plc_ip, 0, 1)
# 读取 PROFINET IO 设备的数据
# 数据通常映射到 PLC 的 I/O 区域
input_data = client.read_area(snap7.types.Areas.PE, 0, 0, 10) # 读取 10 字节输入
output_data = client.read_area(snap7.types.Areas.PA, 0, 0, 10) # 读取 10 字节输出
return input_data, output_dataBACnet 楼宇自动化
BACnet 是楼宇自动化领域的标准协议,用于暖通空调(HVAC)、照明、门禁等系统。
python
# 使用 BAC0 库访问 BACnet 设备
import BAC0
def read_bacnet_device():
"""读取 BACnet 设备数据"""
bacnet = BAC0.lite()
# 读取模拟输入(温度传感器)
# 格式:设备IP:端口 对象类型 对象实例 属性
temperature = bacnet.read('192.168.1.200 analogInput 1 presentValue')
print(f"Room Temperature: {temperature}°C")
# 读取二进制输出(风机状态)
fan_status = bacnet.read('192.168.1.200 binaryOutput 1 presentValue')
print(f"Fan Status: {'ON' if fan_status else 'OFF'}")
# 写入设定值(温度设定点)
bacnet.write('192.168.1.200 analogValue 1 presentValue 22.5')
bacnet.disconnect()统一数据模型
设备信息模型(基于 IEC CIM)
json
{
"device": {
"id": "DEVICE_PLC_001",
"name": "生产线1号PLC",
"type": "PLC",
"manufacturer": "Siemens",
"model": "S7-1200",
"location": {
"plant": "上海工厂",
"workshop": "1号车间",
"line": "生产线1"
},
"protocol": "S7",
"connection": {
"type": "ethernet",
"ip": "192.168.1.100",
"port": 102
}
},
"datapoints": [
{
"id": "DP_TEMP_001",
"name": "设备温度",
"type": "analog",
"unit": "°C",
"range": {"min": -20, "max": 150},
"alarm": {"high": 85, "low": -10},
"source": {"db": 1, "offset": 0, "datatype": "REAL"}
},
{
"id": "DP_MOTOR_001",
"name": "电机运行状态",
"type": "digital",
"source": {"db": 1, "byte": 8, "bit": 0}
}
]
}协议适配器模式
python
from abc import ABC, abstractmethod
class ProtocolAdapter(ABC):
"""协议适配器基类"""
@abstractmethod
def connect(self) -> bool:
pass
@abstractmethod
def read_datapoint(self, datapoint_config) -> any:
pass
@abstractmethod
def write_datapoint(self, datapoint_config, value) -> bool:
pass
@abstractmethod
def disconnect(self):
pass
class ModbusAdapter(ProtocolAdapter):
"""Modbus 协议适配器"""
def connect(self):
self.client = ModbusTcpClient(self.host, self.port)
return self.client.connect()
def read_datapoint(self, dp):
result = self.client.read_holding_registers(
dp['address'], 1, slave=dp['slave_id']
)
if not result.isError():
return result.registers[0] * dp.get('scale', 1.0)
return None
class S7Adapter(ProtocolAdapter):
"""西门子 S7 协议适配器"""
def connect(self):
self.client = snap7.client.Client()
self.client.connect(self.ip, self.rack, self.slot)
return self.client.get_connected()
def read_datapoint(self, dp):
data = self.client.db_read(dp['db'], dp['offset'], 4)
return struct.unpack('>f', data)[0]
class ProtocolAdapterFactory:
"""协议适配器工厂"""
@staticmethod
def create(protocol, config):
adapters = {
'modbus_tcp': ModbusAdapter,
'modbus_rtu': ModbusAdapter,
's7': S7Adapter,
'opcua': OPCUAAdapter,
'bacnet': BACnetAdapter
}
adapter_class = adapters.get(protocol)
if not adapter_class:
raise ValueError(f"Unsupported protocol: {protocol}")
return adapter_class(config)