PLC 远程监控方案
PLC 远程监控概述
PLC(Programmable Logic Controller,可编程逻辑控制器)是工业自动化的核心设备。通过移远模组实现 PLC 远程监控,可以大幅降低运维成本。
主流 PLC 品牌与协议
| PLC 品牌 | 通信协议 | 接口 |
|---|---|---|
| 西门子 S7 系列 | S7 协议 / Modbus | 以太网 / RS485 |
| 三菱 FX/Q 系列 | MC 协议 / Modbus | 以太网 / RS232 |
| 欧姆龙 CP/CJ 系列 | FINS 协议 / Modbus | 以太网 / RS232 |
| 施耐德 M340/M580 | Modbus TCP / EtherNet/IP | 以太网 |
| AB(罗克韦尔) | EtherNet/IP / DF1 | 以太网 / RS232 |
| 汇川 H3U/AM600 | Modbus / 汇川协议 | RS485 / 以太网 |
西门子 S7 PLC 远程访问
S7 协议通信
python
# 使用 python-snap7 库访问西门子 S7 PLC
import snap7
import struct
import time
class S7PLCMonitor:
def __init__(self, ip, rack=0, slot=1):
self.client = snap7.client.Client()
self.client.connect(ip, rack, slot)
if self.client.get_connected():
print(f"Connected to S7 PLC: {ip}")
else:
raise Exception("Connection failed")
def read_db(self, db_number, start, size):
"""读取数据块(DB)"""
data = self.client.db_read(db_number, start, size)
return data
def read_temperature(self, db_number, offset):
"""读取温度值(REAL 类型,4字节)"""
data = self.read_db(db_number, offset, 4)
temperature = struct.unpack('>f', data)[0]
return temperature
def read_bool(self, db_number, byte_offset, bit_offset):
"""读取布尔值"""
data = self.read_db(db_number, byte_offset, 1)
return bool(data[0] & (1 << bit_offset))
def write_bool(self, db_number, byte_offset, bit_offset, value):
"""写入布尔值(远程控制)"""
data = self.read_db(db_number, byte_offset, 1)
if value:
data[0] |= (1 << bit_offset)
else:
data[0] &= ~(1 << bit_offset)
self.client.db_write(db_number, byte_offset, data)
def read_all_data(self):
"""读取所有监控数据"""
return {
"temperature_1": self.read_temperature(1, 0), # DB1.DBD0
"temperature_2": self.read_temperature(1, 4), # DB1.DBD4
"motor_running": self.read_bool(1, 8, 0), # DB1.DBX8.0
"alarm_active": self.read_bool(1, 8, 1), # DB1.DBX8.1
"production_count": struct.unpack('>i',
self.read_db(1, 10, 4))[0] # DB1.DBD10
}
# 使用示例
monitor = S7PLCMonitor("192.168.1.100")
while True:
data = monitor.read_all_data()
print(f"PLC Data: {data}")
# 发布到 MQTT
mqtt_client.publish("factory/plc/data", json.dumps(data))
time.sleep(5)Modbus 通用方案
多从站轮询
python
from pymodbus.client import ModbusTcpClient, ModbusSerialClient
import threading
import queue
class MultiSlaveModbusPoller:
"""多从站 Modbus 轮询器"""
def __init__(self, connection_type='tcp', **kwargs):
if connection_type == 'tcp':
self.client = ModbusTcpClient(
host=kwargs['host'],
port=kwargs.get('port', 502)
)
else:
self.client = ModbusSerialClient(
port=kwargs['port'],
baudrate=kwargs.get('baudrate', 9600)
)
self.devices = []
self.data_queue = queue.Queue()
self.lock = threading.Lock()
def add_device(self, slave_id, name, registers):
"""添加从站设备"""
self.devices.append({
'slave_id': slave_id,
'name': name,
'registers': registers
})
def poll_device(self, device):
"""轮询单个设备"""
data = {'device': device['name'], 'timestamp': int(time.time())}
with self.lock: # 串口是共享资源,需要加锁
for reg in device['registers']:
try:
if reg['type'] == 'holding':
result = self.client.read_holding_registers(
reg['address'], reg['count'],
slave=device['slave_id']
)
elif reg['type'] == 'input':
result = self.client.read_input_registers(
reg['address'], reg['count'],
slave=device['slave_id']
)
elif reg['type'] == 'coil':
result = self.client.read_coils(
reg['address'], reg['count'],
slave=device['slave_id']
)
if not result.isError():
if reg['type'] == 'coil':
value = result.bits[0]
else:
raw = result.registers[0]
value = raw * reg.get('scale', 1.0)
data[reg['name']] = round(value, 2)
except Exception as e:
data[reg['name']] = None
print(f"Error reading {device['name']}.{reg['name']}: {e}")
return data
def start_polling(self, interval=5):
"""开始轮询"""
self.client.connect()
while True:
for device in self.devices:
data = self.poll_device(device)
self.data_queue.put(data)
time.sleep(interval)
# 配置示例
poller = MultiSlaveModbusPoller('rtu', port='/dev/ttyUSB0', baudrate=9600)
# 添加变频器(从站 1)
poller.add_device(1, "inverter_1", [
{'name': 'output_freq', 'type': 'holding', 'address': 0x0001, 'count': 1, 'scale': 0.01},
{'name': 'output_current', 'type': 'holding', 'address': 0x0002, 'count': 1, 'scale': 0.1},
{'name': 'dc_voltage', 'type': 'holding', 'address': 0x0003, 'count': 1, 'scale': 0.1},
{'name': 'fault_code', 'type': 'holding', 'address': 0x0008, 'count': 1, 'scale': 1},
])
# 添加温控仪(从站 2)
poller.add_device(2, "temp_controller_1", [
{'name': 'pv_temp', 'type': 'holding', 'address': 0x0000, 'count': 1, 'scale': 0.1},
{'name': 'sv_temp', 'type': 'holding', 'address': 0x0001, 'count': 1, 'scale': 0.1},
{'name': 'output_percent', 'type': 'holding', 'address': 0x0002, 'count': 1, 'scale': 0.1},
])远程参数修改
安全的远程写入
python
class SecureRemoteControl:
"""安全的远程控制:带权限验证和操作日志"""
def __init__(self, modbus_client, mqtt_client):
self.modbus = modbus_client
self.mqtt = mqtt_client
self.operation_log = []
def handle_control_command(self, cmd):
"""处理远程控制命令"""
# 1. 验证命令签名
if not self.verify_signature(cmd):
self.log_operation(cmd, "REJECTED", "Invalid signature")
return False
# 2. 检查操作权限
if not self.check_permission(cmd['operator'], cmd['operation']):
self.log_operation(cmd, "REJECTED", "Permission denied")
return False
# 3. 参数范围检查
if not self.validate_params(cmd):
self.log_operation(cmd, "REJECTED", "Parameter out of range")
return False
# 4. 执行操作
try:
result = self.execute_command(cmd)
self.log_operation(cmd, "SUCCESS", str(result))
return True
except Exception as e:
self.log_operation(cmd, "FAILED", str(e))
return False
def execute_command(self, cmd):
"""执行 Modbus 写操作"""
if cmd['operation'] == 'write_register':
return self.modbus.write_register(
cmd['address'],
cmd['value'],
slave=cmd['slave_id']
)
elif cmd['operation'] == 'write_coil':
return self.modbus.write_coil(
cmd['address'],
cmd['value'],
slave=cmd['slave_id']
)
def validate_params(self, cmd):
"""参数范围验证"""
# 从配置文件加载参数限制
limits = self.get_param_limits(cmd['slave_id'], cmd['address'])
if limits:
value = cmd['value']
if value < limits['min'] or value > limits['max']:
return False
return True
def log_operation(self, cmd, status, message):
"""记录操作日志"""
log_entry = {
"timestamp": int(time.time()),
"operator": cmd.get('operator', 'unknown'),
"operation": cmd['operation'],
"slave_id": cmd['slave_id'],
"address": cmd['address'],
"value": cmd.get('value'),
"status": status,
"message": message
}
self.operation_log.append(log_entry)
# 发布到 MQTT 审计主题
self.mqtt.publish(
"factory/audit/operations",
json.dumps(log_entry),
qos=2 # 确保送达
)
print(f"[{status}] {log_entry}")