Python MQTT
Python MQTT
魔力刘易斯MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。
安装
pip install paho-mqtt
使用
导入模块
import paho.mqtt.client as mqtt_client
from paho.mqtt.enums import CallbackAPIVersion
连接参数
broker = 'broker.emqx.io' # MQTT服务器地址
port = 1883 # MQTT服务器端口
topic = 'test/topic' # 主题
client_id = 'test-{random.randint(0, 1000)}' # 客户端ID
定义连接函数
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("已成功连接到MQTT服务器")
else:
print("连接失败,错误码:", rc)
# 创建客户端
client = mqtt_client.Client(
CallbackAPIVersion.VERSION2,
client_id=client_id,
protocol=mqtt_client.MQTTv5,
)
# 设置用户名和密码(如果需要)
client.username_pw_set('emqx', 'public')
# 设置加密证书(如果需要)
client.tls_set(ca_path)
# 回调函数
client.on_connect = on_connect # 连接成功回调
client.on_message = on_message # 消息接收回调
# client.on_disconnect = on_disconnect # 断开连接回调
# ... 其他回调函数
client.connect(broker, port)
return client
发布消息
def publish(client):
client.publish(topic, 'Hello World')
接收消息
定义消息接收回调函数
def on_message(client, userdata, message):
print(f"收到消息:{message.payload.decode()}")
并在连接函数的可选函数部分中设置
...
client.on_message = on_message # 消息接收回调
...
订阅主题
def subscribe(client: mqtt_client):
client.subscribe(topic)
接收消息后会自动调用消息接收回调函数
运行
if __name__ == '__main__':
client = connect_mqtt()
publish(client)
subscribe(client)
client.loop_forever()
高级配置
用户属性
用户属性(User Properties)其实是一种自定义属性,允许用户向 MQTT 消息添加自己的元数据,传输额外的自定义信息以扩充更多应用场景。
导入包
from paho.mqtt.properties import Properties # 导入 MQTT 属性
from paho.mqtt.packettypes import PacketTypes # 导入 MQTT 包类型
定义,添加属性
user_properties = Properties(PacketTypes.PUBLISH)
user_properties.UserProperty = [("username", username)]
使用属性
client.publish(topic, message, properties=properties)
持久会话
在 MQTT 中,我们通常将生命周期长于网络连接的会话称为 持久会话
常用于在网络连接短时间断开时,保留会话连接,保障消息的传递
在 MQTTv5 中,默认开启持久会话(仅首次连接为全新会话),且过期时间为永久
设定持久会话
mqtt_client.CleanStartOption = False
# 可选项:
# True - 全新会话
# False - 持久会话
# mqtt_client.MQTT_CLEAN_START_FIRST_ONLY - 默认 - 仅首次连接为全新会话
设置过期时间 - 属性
connect_properties = Properties(packetType=PacketTypes.CONNECT)
connect_properties.SessionExpiryInterval = 120 # 以秒为单位
在连接时配置
client.connect(broker, port,
clean_start=mqtt_client.CleanStartOption,
properties=connect_properties)
其他高级配置项可参考 MQTT 教程
示例 - MQTT 聊天室
import paho.mqtt.client as client # 导入 MQTT 客户端
from paho.mqtt.enums import CallbackAPIVersion # 导入回调 API 版本
from paho.mqtt.properties import Properties # 导入 MQTT 属性
from paho.mqtt.packettypes import PacketTypes # 导入 MQTT 包类型
from paho.mqtt.subscribeoptions import SubscribeOptions # 导入订阅选项
import os, json, uuid, threading
class MQTT:
def __init__(self, config, ca_path) -> None:
broker = config.get("broker")
port = config.get("port")
user = config.get("user")
passwd = config.get("passwd")
client_id = f"chat_{uuid.uuid4()}"
self.client = client.Client(
CallbackAPIVersion.VERSION2,
client_id=client_id,
protocol=client.MQTTv5,
)
self.client.username_pw_set(username=user, password=passwd)
self.client.tls_set(ca_certs=ca_path)
self.client.on_message = self.on_message
self.client.connect(broker, port)
def on_message(self, client, userdata, message: client.MQTTMessage):
properties = message.properties.json() if message.properties else {}
user_properties = properties.get("UserProperty", [])
username = next(
(value for key, value in user_properties if key == "username"), None
)
print(
f"{username}: {message.payload.decode()}"
if username
else message.payload.decode()
)
def subscribe(self, room):
return self.client.subscribe(
f"chat/room/{room}", options=SubscribeOptions(noLocal=True)
)
def publish(self, room, message, properties=None):
self.client.publish(f"chat/room/{room}", message, properties=properties)
def run(self):
self.client.loop_forever()
def disconnect(self):
self.client.disconnect()
if __name__ == "__main__":
config_path = os.path.join(
os.path.dirname(os.path.dirname(__file__)), "config.json"
)
if not os.path.exists(config_path):
print("缺少配置文件 config.json")
exit(1)
with open(config_path, "r") as f:
try:
config = json.load(f)
except json.JSONDecodeError:
print("配置文件 config.json 格式错误")
exit(1)
if any(config.get(key) is None for key in ("broker", "port", "user", "passwd")):
print("缺少 MQTT 服务器配置,请查阅文档!")
exit(1)
ca_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "ssl/emqx.pem")
if not os.path.exists(ca_path):
print("缺少证书文件 emqx.pem,请查阅文档!")
exit(1)
username = config.get("username")
if username is None:
username = input("初次使用,请输入用户名:")
config["username"] = username
with open(config_path, "w") as f:
json.dump(config, f)
else:
print(f"欢迎回来,{username}")
print("正在连接 MQTT 服务器...")
mqtt = MQTT(config, ca_path)
threading.Thread(target=mqtt.run).start() # 开启一个线程运行 MQTT 客户端
print("连接成功")
while True:
room = input("请输入房间号:")
if room.isdigit():
if int(room) < 0 or int(room) > 256:
print("房间号必须在 0 到 256 之间,请重新输入。")
else:
break
else:
print("房间号必须是数字,请重新输入。")
code = mqtt.subscribe(room)
print("加入房间成功,输入 exit 退出房间")
message = f"{username} 进入了房间"
mqtt.publish(room, message)
while True:
message = input()
if message == "exit":
mqtt.publish(room, f"{username} 离开了房间")
break
if message == "rename":
last_name = username
username = input("请输入新的用户名:")
config["username"] = username
with open(config_path, "w") as f:
json.dump(config, f)
print(f"{last_name} 更名为 {username}")
message = f"{last_name} 更名为 {username}"
mqtt.publish(room, message)
continue
user_properties = Properties(PacketTypes.PUBLISH)
user_properties.UserProperty = [("username", username)]
mqtt.publish(room, message, user_properties)
mqtt.disconnect()