Skip to content

PLC 远程监控方案

PLC 远程监控概述

PLC(Programmable Logic Controller,可编程逻辑控制器)是工业自动化的核心设备。通过移远模组实现 PLC 远程监控,可以大幅降低运维成本。

主流 PLC 品牌与协议

| PLC 品牌 | 通信协议 | 接口 |
| --- | --- | --- |
| 西门子 S7 系列 | S7 协议 / Modbus | 以太网 / RS485 |
| 三菱 FX/Q 系列 | MC 协议 / Modbus | 以太网 / RS232 |
| 欧姆龙 CP/CJ 系列 | FINS 协议 / Modbus | 以太网 / RS232 |
| 施耐德 M340/M580 | Modbus TCP / EtherNet/IP | 以太网 |
| AB(罗克韦尔) | EtherNet/IP / DF1 | 以太网 / RS232 |
| 汇川 H3U/AM600 | Modbus / 汇川协议 | RS485 / 以太网 |

西门子 S7 PLC 远程访问

S7 协议通信

python
# 使用 python-snap7 库访问西门子 S7 PLC
import json
import struct
import time

import snap7

class S7PLCMonitor:
    """Read and write values in the data blocks (DBs) of a Siemens S7 PLC.

    Thin wrapper around a ``snap7.client.Client``. The instance can be used
    as a context manager so the connection is released when the block exits.
    """

    def __init__(self, ip, rack=0, slot=1):
        """Connect to the PLC.

        Args:
            ip: IP address of the PLC.
            rack: Rack number passed to the snap7 client.
            slot: Slot number passed to the snap7 client.

        Raises:
            ConnectionError: If the client reports that it is not connected
                after the connect call. (``ConnectionError`` is a subclass of
                ``Exception``, so existing ``except Exception`` callers still
                work.)
        """
        self.client = snap7.client.Client()
        self.client.connect(ip, rack, slot)

        if not self.client.get_connected():
            raise ConnectionError("Connection failed")
        print(f"Connected to S7 PLC: {ip}")

    def __enter__(self):
        """Return the monitor itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Disconnect on leaving the ``with`` block; exceptions propagate."""
        self.close()

    def close(self):
        """Disconnect the underlying snap7 client."""
        self.client.disconnect()

    @staticmethod
    def _check_bit_offset(bit_offset):
        """Raise ValueError unless *bit_offset* addresses a bit of one byte."""
        if not 0 <= bit_offset <= 7:
            raise ValueError(f"bit_offset must be in 0..7, got {bit_offset!r}")

    def read_db(self, db_number, start, size):
        """Read *size* bytes from data block *db_number* starting at *start*.

        Returns whatever the client's ``db_read`` returns (a byte buffer).
        """
        return self.client.db_read(db_number, start, size)

    def read_temperature(self, db_number, offset):
        """Read a temperature stored as a 4-byte big-endian float (REAL)."""
        raw = self.read_db(db_number, offset, 4)
        return struct.unpack('>f', raw)[0]

    def read_bool(self, db_number, byte_offset, bit_offset):
        """Read one bit of the byte at *byte_offset* as a bool.

        Raises:
            ValueError: If *bit_offset* is outside 0..7. Without this check a
                too-large offset would silently read as False.
        """
        self._check_bit_offset(bit_offset)
        raw = self.read_db(db_number, byte_offset, 1)
        return bool(raw[0] & (1 << bit_offset))

    def write_bool(self, db_number, byte_offset, bit_offset, value):
        """Set or clear one bit in a data block (remote control).

        The containing byte is read, modified and written back, so the other
        seven bits keep the value they had at read time. This read-modify-write
        is not atomic with respect to the PLC program.

        Raises:
            ValueError: If *bit_offset* is outside 0..7.
        """
        self._check_bit_offset(bit_offset)
        # Copy into a bytearray so the buffer is mutable regardless of the
        # exact type the client returned.
        buf = bytearray(self.read_db(db_number, byte_offset, 1))
        mask = 1 << bit_offset
        if value:
            buf[0] |= mask
        else:
            buf[0] &= ~mask & 0xFF
        self.client.db_write(db_number, byte_offset, buf)

    def read_all_data(self):
        """Read every monitored value from DB1 and return them as a dict."""
        return {
            "temperature_1": self.read_temperature(1, 0),   # DB1.DBD0
            "temperature_2": self.read_temperature(1, 4),   # DB1.DBD4
            "motor_running": self.read_bool(1, 8, 0),       # DB1.DBX8.0
            "alarm_active": self.read_bool(1, 8, 1),        # DB1.DBX8.1
            # 4-byte big-endian signed integer at DB1.DBD10
            "production_count": struct.unpack('>i', self.read_db(1, 10, 4))[0],
        }

# Usage example
# NOTE: `mqtt_client` is not defined in this snippet. It must be an MQTT
# client object created (and connected) elsewhere that exposes
# publish(topic, payload).
monitor = S7PLCMonitor("192.168.1.100")

try:
    while True:
        data = monitor.read_all_data()
        print(f"PLC Data: {data}")

        # Publish to MQTT
        mqtt_client.publish("factory/plc/data", json.dumps(data))

        time.sleep(5)
finally:
    # Release the PLC connection however the loop ends (Ctrl-C, read error);
    # the exception itself still propagates.
    monitor.client.disconnect()

Modbus 通用方案

多从站轮询

python
from pymodbus.client import ModbusTcpClient, ModbusSerialClient
import threading
import queue

class MultiSlaveModbusPoller:
    """Poll several Modbus slaves that share one TCP or serial connection.

    Devices are registered with ``add_device``; ``start_polling`` then reads
    every device in turn and puts one result dict per device on
    ``data_queue``.
    """

    # Maps a register description's 'type' to the client method that reads it.
    _READ_METHODS = {
        'holding': 'read_holding_registers',
        'input': 'read_input_registers',
        'coil': 'read_coils',
    }

    def __init__(self, connection_type='tcp', **kwargs):
        """Create the Modbus client.

        Args:
            connection_type: ``'tcp'`` builds a ``ModbusTcpClient``; any other
                value builds a ``ModbusSerialClient``.
            **kwargs: For TCP: ``host`` (required) and ``port`` (default 502).
                For serial: ``port`` (required) and ``baudrate`` (default 9600).
        """
        if connection_type == 'tcp':
            self.client = ModbusTcpClient(
                host=kwargs['host'],
                port=kwargs.get('port', 502)
            )
        else:
            self.client = ModbusSerialClient(
                port=kwargs['port'],
                baudrate=kwargs.get('baudrate', 9600)
            )

        self.devices = []
        self.data_queue = queue.Queue()
        self.lock = threading.Lock()

    def add_device(self, slave_id, name, registers):
        """Register a slave device to be polled.

        Args:
            slave_id: Modbus slave address of the device.
            name: Label stored under ``'device'`` in each result dict.
            registers: List of dicts with keys ``name``, ``type`` (one of
                ``'holding'``, ``'input'``, ``'coil'``), ``address``, ``count``
                and optional ``scale`` (default 1.0).
        """
        self.devices.append({
            'slave_id': slave_id,
            'name': name,
            'registers': registers
        })

    def _read_value(self, slave_id, reg):
        """Read one register description and return its decoded value.

        Only the first register/bit of the response is used, even when
        ``reg['count']`` is greater than 1.

        Raises:
            ValueError: If ``reg['type']`` is not a supported register type.
            OSError: If the client returned an error response.
        """
        method_name = self._READ_METHODS.get(reg['type'])
        if method_name is None:
            # Without this check an unknown type would reuse the previous
            # register's response and report stale data under the wrong name.
            raise ValueError(f"Unsupported register type: {reg['type']!r}")

        result = getattr(self.client, method_name)(
            reg['address'], reg['count'], slave=slave_id
        )
        if result.isError():
            raise OSError(f"Modbus error response: {result}")

        if reg['type'] == 'coil':
            # Coils are on/off states; keep them boolean rather than rounding.
            return bool(result.bits[0])
        return round(result.registers[0] * reg.get('scale', 1.0), 2)

    def poll_device(self, device):
        """Read every configured register of one device.

        Returns:
            A dict with ``'device'`` (the device name), ``'timestamp'`` (Unix
            seconds) and one entry per register name. A register that could
            not be read, for any reason, is present with value ``None``.
        """
        data = {'device': device['name'], 'timestamp': int(time.time())}

        with self.lock:  # the connection is a shared resource, so serialize access
            for reg in device['registers']:
                try:
                    data[reg['name']] = self._read_value(device['slave_id'], reg)
                except Exception as e:
                    # Best effort per register: one failure must not stop the
                    # remaining registers from being read.
                    data[reg['name']] = None
                    print(f"Error reading {device['name']}.{reg['name']}: {e}")

        return data

    def start_polling(self, interval=5):
        """Connect and poll all devices forever, sleeping *interval* seconds
        between rounds. The client is closed when the loop is interrupted.
        """
        self.client.connect()

        try:
            while True:
                for device in self.devices:
                    self.data_queue.put(self.poll_device(device))

                time.sleep(interval)
        finally:
            self.client.close()

# Example configuration
poller = MultiSlaveModbusPoller(
    connection_type='rtu',
    port='/dev/ttyUSB0',
    baudrate=9600,
)

# Register the inverter (variable-frequency drive) as slave 1
poller.add_device(
    slave_id=1,
    name="inverter_1",
    registers=[
        dict(name='output_freq', type='holding', address=0x0001, count=1, scale=0.01),
        dict(name='output_current', type='holding', address=0x0002, count=1, scale=0.1),
        dict(name='dc_voltage', type='holding', address=0x0003, count=1, scale=0.1),
        dict(name='fault_code', type='holding', address=0x0008, count=1, scale=1),
    ],
)

# Register the temperature controller as slave 2
poller.add_device(
    slave_id=2,
    name="temp_controller_1",
    registers=[
        dict(name='pv_temp', type='holding', address=0x0000, count=1, scale=0.1),
        dict(name='sv_temp', type='holding', address=0x0001, count=1, scale=0.1),
        dict(name='output_percent', type='holding', address=0x0002, count=1, scale=0.1),
    ],
)

远程参数修改

安全的远程写入

python
class SecureRemoteControl:
    """Gate remote Modbus writes behind signature, permission and range checks,
    and record every attempt in an audit log.

    NOTE(review): this class calls ``self.verify_signature(cmd)``,
    ``self.check_permission(operator, operation)`` and
    ``self.get_param_limits(slave_id, address)``, none of which are defined
    here. They must be supplied (e.g. by a subclass) before the class is used.
    """

    # Fields every command must carry before it is checked any further.
    _REQUIRED_FIELDS = ('operator', 'operation', 'slave_id', 'address', 'value')
    # Operations execute_command knows how to perform.
    _SUPPORTED_OPERATIONS = ('write_register', 'write_coil')

    def __init__(self, modbus_client, mqtt_client):
        """Store the Modbus client used for writes and the MQTT client used
        for publishing audit entries; start with an empty in-memory log."""
        self.modbus = modbus_client
        self.mqtt = mqtt_client
        self.operation_log = []

    def handle_control_command(self, cmd):
        """Validate and execute one remote control command.

        Args:
            cmd: Mapping with keys ``operator``, ``operation``, ``slave_id``,
                ``address`` and ``value`` (plus whatever the signature check
                needs).

        Returns:
            True if the write was performed and did not report an error;
            False if the command was rejected or the write failed. Every
            outcome is recorded through ``log_operation``.
        """
        # 1. Verify the command signature
        if not self.verify_signature(cmd):
            self.log_operation(cmd, "REJECTED", "Invalid signature")
            return False

        # 2. Reject malformed commands instead of failing with KeyError later
        missing = [field for field in self._REQUIRED_FIELDS if field not in cmd]
        if missing:
            self.log_operation(
                cmd, "REJECTED", f"Missing fields: {', '.join(missing)}"
            )
            return False

        # 3. Check the operator's permission
        if not self.check_permission(cmd['operator'], cmd['operation']):
            self.log_operation(cmd, "REJECTED", "Permission denied")
            return False

        # 4. Refuse operations that cannot be executed, so they are never
        #    reported as successful
        if cmd['operation'] not in self._SUPPORTED_OPERATIONS:
            self.log_operation(cmd, "REJECTED", "Unsupported operation")
            return False

        # 5. Check the parameter range
        if not self.validate_params(cmd):
            self.log_operation(cmd, "REJECTED", "Parameter out of range")
            return False

        # 6. Execute. Only the write itself is inside the try block so that a
        #    logging problem cannot turn a successful write into "FAILED".
        try:
            result = self.execute_command(cmd)
        except Exception as e:
            self.log_operation(cmd, "FAILED", str(e))
            return False

        # A Modbus client may hand back an error response object rather than
        # raising; treat that as a failed write.
        is_error = getattr(result, 'isError', None)
        if callable(is_error) and is_error():
            self.log_operation(cmd, "FAILED", str(result))
            return False

        self.log_operation(cmd, "SUCCESS", str(result))
        return True

    def execute_command(self, cmd):
        """Perform the Modbus write described by *cmd* and return the client's
        response.

        Raises:
            ValueError: If ``cmd['operation']`` is not ``'write_register'`` or
                ``'write_coil'``.
        """
        operation = cmd['operation']
        if operation == 'write_register':
            return self.modbus.write_register(
                cmd['address'],
                cmd['value'],
                slave=cmd['slave_id']
            )
        if operation == 'write_coil':
            return self.modbus.write_coil(
                cmd['address'],
                cmd['value'],
                slave=cmd['slave_id']
            )
        raise ValueError(f"Unsupported operation: {operation!r}")

    def validate_params(self, cmd):
        """Return True if ``cmd['value']`` lies within the configured limits.

        Limits come from ``get_param_limits(slave_id, address)`` and are
        expected to provide ``'min'`` and ``'max'``. Values that cannot be
        ordered against the limits (wrong type, NaN) are rejected.

        NOTE(review): when no limits are configured for the address the value
        is accepted (fail-open). Confirm that this is intended.
        """
        limits = self.get_param_limits(cmd['slave_id'], cmd['address'])
        if not limits:
            return True
        try:
            return bool(limits['min'] <= cmd['value'] <= limits['max'])
        except TypeError:
            return False

    def log_operation(self, cmd, status, message):
        """Record one audit entry locally and publish it over MQTT.

        The entry is always appended to ``operation_log`` and printed.
        Publishing to the ``factory/audit/operations`` topic is best effort: a
        publish error is printed and does not propagate, so it cannot change
        the outcome reported for the command. Fields missing from *cmd* are
        logged as ``None`` (``'unknown'`` for the operator).
        """
        log_entry = {
            "timestamp": int(time.time()),
            "operator": cmd.get('operator', 'unknown'),
            "operation": cmd.get('operation'),
            "slave_id": cmd.get('slave_id'),
            "address": cmd.get('address'),
            "value": cmd.get('value'),
            "status": status,
            "message": message
        }

        self.operation_log.append(log_entry)

        # Publish to the MQTT audit topic
        try:
            self.mqtt.publish(
                "factory/audit/operations",
                json.dumps(log_entry),
                qos=2  # request exactly-once delivery
            )
        except Exception as e:
            print(f"Audit publish failed: {e}")

        print(f"[{status}] {log_entry}")

褚成志的笔记