Python MQTT

MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。

安装

pip install paho-mqtt

使用

导入模块

import paho.mqtt.client as mqtt_client
from paho.mqtt.enums import CallbackAPIVersion

连接参数

broker = 'broker.emqx.io'                       # MQTT服务器地址
port = 1883                                     # MQTT服务器端口
topic = 'test/topic'                            # 主题
client_id = 'test-{random.randint(0, 1000)}'    # 客户端ID

定义连接函数

def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("已成功连接到MQTT服务器")
        else:
            print("连接失败,错误码:", rc)
    # 创建客户端
    client = mqtt_client.Client(
                CallbackAPIVersion.VERSION2,
                client_id=client_id,
                protocol=mqtt_client.MQTTv5,
            )
    # 设置用户名和密码(如果需要)
    client.username_pw_set('emqx', 'public')
    # 设置加密证书(如果需要)
    client.tls_set(ca_path)

    # 回调函数
    client.on_connect = on_connect  # 连接成功回调
    client.on_message = on_message    # 消息接收回调
    # client.on_disconnect = on_disconnect  # 断开连接回调
    # ... 其他回调函数

    client.connect(broker, port)
    return client

发布消息

def publish(client):
    client.publish(topic, 'Hello World')

接收消息

定义消息接收回调函数

def on_message(client, userdata, message):
    print(f"收到消息:{message.payload.decode()}")

并在连接函数的可选函数部分中设置

...
client.on_message = on_message  # 消息接收回调
...

订阅主题

def subscribe(client: mqtt_client):
    client.subscribe(topic)

接收消息后会自动调用消息接收回调函数

运行

if __name__ == '__main__':
    client = connect_mqtt()
    publish(client)
    subscribe(client)
    client.loop_forever()

高级配置

用户属性

用户属性(User Properties)其实是一种自定义属性,允许用户向 MQTT 消息添加自己的元数据,传输额外的自定义信息以扩充更多应用场景。

导入包

from paho.mqtt.properties import Properties # 导入 MQTT 属性
from paho.mqtt.packettypes import PacketTypes # 导入 MQTT 包类型

定义,添加属性

user_properties = Properties(PacketTypes.PUBLISH)
user_properties.UserProperty = [("username", username)]

使用属性

client.publish(topic, message, properties=properties)

持久会话

在 MQTT 中,我们通常将生命周期长于网络连接的会话称为 持久会话

常用于在网络连接短时间断开时,保留会话连接,保障消息的传递

在 MQTTv5 中,默认开启持久会话(仅首次连接为全新会话),且过期时间为永久

设定持久会话

mqtt_client.CleanStartOption = False
# 可选项:
#   True - 全新会话
#   False - 持久会话
#   mqtt_client.MQTT_CLEAN_START_FIRST_ONLY - 默认 - 仅首次连接为全新会话

设置过期时间 - 属性

connect_properties = Properties(packetType=PacketTypes.CONNECT)
connect_properties.SessionExpiryInterval = 120  # 以秒为单位

在连接时配置

client.connect(broker, port,
clean_start=mqtt_client.CleanStartOption,
properties=connect_properties)

其他高级配置项可参考 MQTT 教程

示例 - MQTT 聊天室

import paho.mqtt.client as client  # 导入 MQTT 客户端
from paho.mqtt.enums import CallbackAPIVersion  # 导入回调 API 版本
from paho.mqtt.properties import Properties  # 导入 MQTT 属性
from paho.mqtt.packettypes import PacketTypes  # 导入 MQTT 包类型
from paho.mqtt.subscribeoptions import SubscribeOptions  # 导入订阅选项
import os, json, uuid, threading


class MQTT:
    def __init__(self, config, ca_path) -> None:
        broker = config.get("broker")
        port = config.get("port")

        user = config.get("user")
        passwd = config.get("passwd")

        client_id = f"chat_{uuid.uuid4()}"

        self.client = client.Client(
            CallbackAPIVersion.VERSION2,
            client_id=client_id,
            protocol=client.MQTTv5,
        )

        self.client.username_pw_set(username=user, password=passwd)

        self.client.tls_set(ca_certs=ca_path)

        self.client.on_message = self.on_message

        self.client.connect(broker, port)

    def on_message(self, client, userdata, message: client.MQTTMessage):
        properties = message.properties.json() if message.properties else {}
        user_properties = properties.get("UserProperty", [])
        username = next(
            (value for key, value in user_properties if key == "username"), None
        )
        print(
            f"{username}: {message.payload.decode()}"
            if username
            else message.payload.decode()
        )

    def subscribe(self, room):
        return self.client.subscribe(
            f"chat/room/{room}", options=SubscribeOptions(noLocal=True)
        )

    def publish(self, room, message, properties=None):
        self.client.publish(f"chat/room/{room}", message, properties=properties)

    def run(self):
        self.client.loop_forever()

    def disconnect(self):
        self.client.disconnect()


if __name__ == "__main__":
    config_path = os.path.join(
        os.path.dirname(os.path.dirname(__file__)), "config.json"
    )
    if not os.path.exists(config_path):
        print("缺少配置文件 config.json")
        exit(1)

    with open(config_path, "r") as f:
        try:
            config = json.load(f)
        except json.JSONDecodeError:
            print("配置文件 config.json 格式错误")
            exit(1)

    if any(config.get(key) is None for key in ("broker", "port", "user", "passwd")):
        print("缺少 MQTT 服务器配置,请查阅文档!")
        exit(1)

    ca_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "ssl/emqx.pem")

    if not os.path.exists(ca_path):
        print("缺少证书文件 emqx.pem,请查阅文档!")
        exit(1)

    username = config.get("username")
    if username is None:
        username = input("初次使用,请输入用户名:")
        config["username"] = username
        with open(config_path, "w") as f:
            json.dump(config, f)

    else:
        print(f"欢迎回来,{username}")

    print("正在连接 MQTT 服务器...")

    mqtt = MQTT(config, ca_path)
    threading.Thread(target=mqtt.run).start()  # 开启一个线程运行 MQTT 客户端

    print("连接成功")

    while True:
        room = input("请输入房间号:")
        if room.isdigit():
            if int(room) < 0 or int(room) > 256:
                print("房间号必须在 0 到 256 之间,请重新输入。")
            else:
                break
        else:
            print("房间号必须是数字,请重新输入。")

    code = mqtt.subscribe(room)

    print("加入房间成功,输入 exit 退出房间")

    message = f"{username} 进入了房间"
    mqtt.publish(room, message)

    while True:
        message = input()

        if message == "exit":
            mqtt.publish(room, f"{username} 离开了房间")
            break

        if message == "rename":
            last_name = username
            username = input("请输入新的用户名:")
            config["username"] = username
            with open(config_path, "w") as f:
                json.dump(config, f)
            print(f"{last_name} 更名为 {username}")
            message = f"{last_name} 更名为 {username}"
            mqtt.publish(room, message)
            continue

        user_properties = Properties(PacketTypes.PUBLISH)
        user_properties.UserProperty = [("username", username)]

        mqtt.publish(room, message, user_properties)

    mqtt.disconnect()