Skip to content

RTOS 轻量模组产品概览

RTOS 模组定位

RTOS(Real-Time Operating System)轻量模组是移远产品线中成本最低、功耗最小的智能模组,适合简单的数据采集和上报场景。

与 Linux/Android 模组的区别

特性RTOS 模组Linux 模组Android 模组
操作系统FreeRTOS/ThreadXLinuxAndroid
启动时间< 3s10-30s30-60s
RAM256KB-64MB256MB+1GB+
功耗极低
开发语言C / QuecPythonC/C++/PythonJava/C++
适用场景简单IoT复杂IoT工业平板/POS

主力产品

EC600M-CN(Cat.1 + QuecPython)

参数规格
蜂窝LTE Cat.1 bis
芯片展锐 UIS8910DM
Flash4MB(用户可用约 2MB)
RAM1MB(用户可用约 512KB)
OSRTOS(ThreadX)
开发框架QuecOpen C / QuecPython
接口UART / SPI / I2C / GPIO / ADC / PWM
工作温度-35°C ~ +75°C

EC800M-CN(超小尺寸)

参数规格
封装LCC(16.0×18.0mm,超小)
蜂窝LTE Cat.1 bis
特点超小尺寸,适合空间受限设备

BC660K-GL(NB-IoT QuecOpen)

参数规格
蜂窝NB-IoT 全球版
芯片高通 MDM9205
开发框架QuecOpen C
PSM 功耗2.5μA
特点超低功耗,全球 NB-IoT

QuecOpen 开发框架

QuecOpen 是移远为 RTOS 模组提供的 C 语言开发框架,提供丰富的 API:

API 分类

网络 API:
  Ql_iotInit()          # 初始化 IoT 平台连接
  Ql_socketOpen()       # 打开 TCP/UDP socket
  Ql_socketSend()       # 发送数据
  Ql_socketRecv()       # 接收数据
  Ql_httpGet()          # HTTP GET 请求
  Ql_httpPost()         # HTTP POST 请求
  Ql_mqttConnect()      # MQTT 连接
  Ql_mqttPublish()      # MQTT 发布

外设 API:
  Ql_GPIO_Init()        # GPIO 初始化
  Ql_GPIO_SetLevel()    # 设置 GPIO 电平
  Ql_GPIO_GetLevel()    # 读取 GPIO 电平
  Ql_UART_Open()        # 打开串口
  Ql_UART_Write()       # 串口写数据
  Ql_I2C_Init()         # I2C 初始化
  Ql_I2C_Write()        # I2C 写数据
  Ql_ADC_Read()         # 读取 ADC 值

系统 API:
  Ql_Sleep()            # 延时(毫秒)
  Ql_GetLocalTime()     # 获取本地时间
  Ql_Reset()            # 系统复位
  Ql_PowerDown()        # 关机

QuecOpen 应用示例

c
// QuecOpen C 示例:温度监测上报
#include "ql_api_common.h"
#include "ql_api_datacall.h"
#include "ql_api_http.h"
#include "ql_api_adc.h"
#include "ql_log.h"

#define QL_APP_LOG_LEVEL QL_LOG_LEVEL_INFO
#define QL_APP_LOG_TAG "TEMP_MONITOR"

// 读取 NTC 温度传感器
float read_temperature(void) {
    int adc_value = 0;
    Ql_ADC_Read(QL_ADC_CHANNEL_1, &adc_value);
    
    // NTC 温度转换(简化公式)
    float voltage = adc_value * 3.3f / 4095.0f;
    float resistance = 10000.0f * voltage / (3.3f - voltage);
    float temperature = 1.0f / (log(resistance / 10000.0f) / 3950.0f + 1.0f / 298.15f) - 273.15f;
    
    return temperature;
}

// 上报数据到 HTTP 服务器
int upload_data(float temperature) {
    char url[] = "http://api.example.com/sensor/data";
    char body[128];
    snprintf(body, sizeof(body), 
             "{\"device_id\":\"EC600M_001\",\"temperature\":%.1f}", 
             temperature);
    
    ql_http_client_ctx_t *ctx = Ql_httpClientCreate();
    Ql_httpClientSetHeader(ctx, "Content-Type", "application/json");
    
    int ret = Ql_httpPost(ctx, url, body, strlen(body), NULL, 0, 10000);
    Ql_httpClientDestroy(ctx);
    
    return ret;
}

// 应用入口
void ql_app_main(void) {
    QL_APP_LOG(QL_APP_LOG_TAG, QL_LOG_LEVEL_INFO, "App started");
    
    // 等待网络连接
    ql_datacall_errcode_e ret;
    do {
        ret = Ql_iotDatacallStart(1, "cmnet", "", "");
        Ql_Sleep(1000);
    } while (ret != QL_DATACALL_SUCCESS);
    
    QL_APP_LOG(QL_APP_LOG_TAG, QL_LOG_LEVEL_INFO, "Network connected");
    
    // 主循环
    while (1) {
        float temp = read_temperature();
        QL_APP_LOG(QL_APP_LOG_TAG, QL_LOG_LEVEL_INFO, 
                   "Temperature: %.1f°C", temp);
        
        upload_data(temp);
        
        Ql_Sleep(60000);  // 每分钟上报一次
    }
}

QuecPython 开发

QuecPython 是移远将 MicroPython 移植到 RTOS 模组的开发框架,让开发者用 Python 开发 IoT 应用。

QuecPython 优势

优势:
  ✅ 开发效率高(Python 比 C 快 3-5 倍)
  ✅ 无需交叉编译(直接在模组上运行)
  ✅ 丰富的内置库(网络、文件、传感器)
  ✅ 交互式调试(REPL)
  ✅ 适合快速原型开发

劣势:
  ❌ 性能低于 C(约低 10-100 倍)
  ❌ 内存占用较大
  ❌ 不适合实时性要求高的场景

QuecPython 内置模块

python
# 网络模块
import dataCall      # 数据连接管理
import request       # HTTP 请求
import umqtt         # MQTT 客户端
import usocket       # Socket 编程

# 硬件模块
import machine       # GPIO / UART / SPI / I2C / ADC / PWM
import utime         # 时间和延时
import ubinascii     # 二进制/ASCII 转换

# 系统模块
import uos           # 文件系统操作
import ujson         # JSON 解析
import uhashlib      # 哈希算法
import ucryptolib    # 加密算法

# 移远扩展模块
import quecIot       # 移远 IoT 平台
import gnss          # GNSS 定位
import audio         # 音频播放
import sms           # 短信收发
import voiceCall     # 语音通话

QuecPython 完整示例

python
# 完整的 IoT 设备程序
import utime
import dataCall
import umqtt
import machine
import ujson
import uos

# 配置
DEVICE_ID = "QP_DEVICE_001"
MQTT_HOST = "broker.emqx.io"
MQTT_PORT = 1883
MQTT_TOPIC_PUB = f"quectel/{DEVICE_ID}/data"
MQTT_TOPIC_SUB = f"quectel/{DEVICE_ID}/cmd"

# 初始化 ADC
adc = machine.ADC()

# 初始化 GPIO(LED 指示灯)
led = machine.Pin(machine.Pin.GPIO1, machine.Pin.OUT, machine.Pin.PULL_DISABLE, 0)

class IoTDevice:
    def __init__(self):
        self.mqtt_client = None
        self.connected = False
    
    def connect_network(self):
        """连接蜂窝网络"""
        print("Connecting to network...")
        timeout = 30
        while timeout > 0:
            info = dataCall.getInfo(1, 0)
            if info[2][0] == 1:
                print(f"Network connected, IP: {info[2][2]}")
                return True
            utime.sleep(1)
            timeout -= 1
        return False
    
    def connect_mqtt(self):
        """连接 MQTT 服务器"""
        self.mqtt_client = umqtt.MQTTClient(
            client_id=DEVICE_ID,
            server=MQTT_HOST,
            port=MQTT_PORT,
            keepalive=60
        )
        
        # 设置消息回调
        self.mqtt_client.set_callback(self.on_message)
        
        try:
            self.mqtt_client.connect()
            self.mqtt_client.subscribe(MQTT_TOPIC_SUB)
            self.connected = True
            print("MQTT connected")
            return True
        except Exception as e:
            print(f"MQTT connect failed: {e}")
            return False
    
    def on_message(self, topic, msg):
        """处理接收到的 MQTT 消息"""
        topic_str = topic.decode()
        msg_str = msg.decode()
        print(f"Received: {topic_str} -> {msg_str}")
        
        try:
            cmd = ujson.loads(msg_str)
            if cmd.get("action") == "led_on":
                led.write(1)
            elif cmd.get("action") == "led_off":
                led.write(0)
            elif cmd.get("action") == "reboot":
                import Power
                Power.powerRestart()
        except Exception as e:
            print(f"Command parse error: {e}")
    
    def read_sensors(self):
        """读取传感器数据"""
        # 读取 ADC(模拟传感器)
        raw = adc.read(1)
        voltage = raw * 3.3 / 4095
        temperature = (voltage - 0.5) / 0.01
        
        return {
            "temperature": round(temperature, 1),
            "timestamp": utime.time()
        }
    
    def publish_data(self, data):
        """发布数据"""
        payload = ujson.dumps(data)
        try:
            self.mqtt_client.publish(MQTT_TOPIC_PUB, payload)
            led.write(1)
            utime.sleep_ms(100)
            led.write(0)
            return True
        except Exception as e:
            print(f"Publish failed: {e}")
            self.connected = False
            return False
    
    def run(self):
        """主运行循环"""
        if not self.connect_network():
            print("Network connection failed, restarting...")
            import Power
            Power.powerRestart()
        
        if not self.connect_mqtt():
            print("MQTT connection failed")
            return
        
        last_publish = 0
        
        while True:
            # 检查 MQTT 消息
            try:
                self.mqtt_client.check_msg()
            except Exception as e:
                print(f"MQTT error: {e}")
                self.connect_mqtt()
            
            # 每60秒发布一次数据
            now = utime.time()
            if now - last_publish >= 60:
                data = self.read_sensors()
                self.publish_data(data)
                last_publish = now
                print(f"Published: {data}")
            
            utime.sleep(1)

# 启动设备
device = IoTDevice()
device.run()

褚成志的笔记