ESP32 运行MQTT客户端进行主题的发布和订阅
目录
ESP32 MQTT的库有很多,凌顺实验室(lingshunlab.com)这次主要使用AsyncMQTT_ESP32,以后有机会再更多的MQTT其他库的使用方法。
前提条件
- 树莓派部署本地的MQTT服务端,具体安装请查看以下连接:
https://lingshunlab.com/book/raspberry-pi/raspberry-pi-install-mosquitto-mqtt-server-and-test-mqtt - ESP32和树莓派在同一WIFI网络里面
效果实现
凌顺实验室(lingshunlab.com)在本示例展示了使用两个ESP32,分别实现发布MQTT的主题消息和订阅并输出MQTT的主题内容。当然,可能会问能不能一个ESP32同时又是发布者,又是订阅者?答案是可以的,因为作为客户端,都是对中间商做信息交换。
BOM
需要准备2个ESP32,
一个ESP32用于发布,
一个ESP32用于订阅。
库的安装
可以在Arduino IDE的库管理里搜索并安装:
点击菜单栏的「工具」---> 「库管理」,然后在搜索框中输入“AsyncMQTT_ESP32”,点击安装即可
下图在我本地已经安装好了:
又或者在Github中下载,并安装到Arduino的 "libraries"文件夹里
Github 地址:
https://github.com/khoih-prog/AsyncMQTT_ESP32
程序提点
1, 首先,需要加载AsyncMQTT_ESP32的库
#include <AsyncMQTT_ESP32.h>
2,配置MQTT的服务器信息,可以是IP或者域名的方式
//#define MQTT_HOST IPAddress(192, 168, 100, 100)
#define MQTT_HOST "broker.emqx.io" // Broker address
#define MQTT_PORT 1883
3,设置主题,发布需要主题,订阅也需要主题
const char *Topic = "lingshunlab/ESP32"; // 主题
4,创建MQTT客户端的实例
// 创建MQTT客户端的实例,名为mqttClient
AsyncMqttClient mqttClient;
5,认识mqttClient的可用的回调函数
当MQTT触发特定事件的时候,可以配置自定义的函数
mqttClient.onConnect(onMqttConnect); // 设置 当MQTT连接时的回调函数
mqttClient.onDisconnect(onMqttDisconnect); // 设置 当MQTT断开连接时的回调函数
mqttClient.onSubscribe(onMqttSubscribe); // 设置 当MQTT订阅主题时的回调函数
mqttClient.onUnsubscribe(onMqttUnsubscribe); // 设置 当MQTT取消订阅主题时的回调函数
mqttClient.onMessage(onMqttMessage); // 设置 当MQTT订阅主题时的回调函数
mqttClient.onPublish(onMqttPublish); // 设置 当取消MQTT订阅主题时的回调函数
mqttClient.setServer(MQTT_HOST, MQTT_PORT); // 设置 MQTT服务器信息
6, 连接MQTT服务器
mqttClient.setServer(MQTT_HOST, MQTT_PORT); //连接MQTT服务器
7,发布主题
通过以下代码,可以对配置好的主题发布消息
// 发布主题消息
uint16_t packetIdPub = mqttClient.publish(PubTopic, 2, true, "welcome to Lingshunlab.com");
Serial.print("Publisshing at QoS 2, packetId: ");
Serial.println(packetIdPub);
delay(2000);
8,订阅主题
通过以下代码,可以订阅配置好的主题
// 订阅MQTT主题,并QoS设置为2
uint16_t packetIdSub = mqttClient.subscribe(SubTopic, 2);
Serial.print("Subscribing at QoS 2, packetId: ");
Serial.println(packetIdSub);
9,当发生主题消息变化的时候的回调函数
mqttClient的回调函数有很多种,请仔细学习查看例子中其他的回调函数。在这里,特别说明一下onMessage的回调函数onMqttMessage(这个函数名称你可以自己定义,随喜),里面有不少参数,例如topic,payload等,其中payload即是消息的内容,可以通过输出显示。
void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties,
const size_t& len, const size_t& index, const size_t& total)
{
(void) payload;
Serial.println("=====On MQTT Message=====");
Serial.println("Publish received.");
Serial.print(" topic: ");
Serial.println(topic);
Serial.print(" qos: ");
Serial.println(properties.qos);
Serial.print(" dup: ");
Serial.println(properties.dup);
Serial.print(" retain: ");
Serial.println(properties.retain);
Serial.print(" len: ");
Serial.println(len);
Serial.print(" index: ");
Serial.println(index);
Serial.print(" total: ");
Serial.println(total);
Serial.print("payload: ");
Serial.println(payload); // 输出消息内容
}
10,请查看AsyncMQTT_ESP32的官方例子
可以学习到FreeRTOS的多线程如何应用。
程序代码
发布主题de完整代码
#include <WiFi.h>
// 配置 WIFI
#define WIFI_SSID "***your wifi***"
#define WIFI_PASSWORD "***your wifi password***"
// 加载AsyncMQTT_ESP32库
#include <AsyncMQTT_ESP32.h>
// 配置MQTT服务器地址和端口
#define MQTT_HOST IPAddress(192,168,100,100) // Broker IP
// #define MQTT_HOST "broker.emqx.io" // Broker address
#define MQTT_PORT 1883
const char *PubTopic = "lingshunlab/ESP32"; // 发布消息的主题
AsyncMqttClient mqttClient; // 创建 MQTT客户端实例
void onMqttConnect(bool sessionPresent) // 编写对应的回调函数
{
Serial.println("=====On MQTT Connect=====");
Serial.print("Connected to MQTT broker: ");
Serial.print(MQTT_HOST);
Serial.print(", port: ");
Serial.println(MQTT_PORT);
Serial.print("PubTopic: ");
Serial.println(PubTopic);
}
void onMqttDisconnect(AsyncMqttClientDisconnectReason reason)
{
(void) reason;
Serial.println("Disconnected from MQTT.");
}
void onMqttSubscribe(const uint16_t& packetId, const uint8_t& qos)
{
Serial.println("Subscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
Serial.print(" qos: ");
Serial.println(qos);
}
void onMqttUnsubscribe(const uint16_t& packetId)
{
Serial.println("Unsubscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
}
void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties,
const size_t& len, const size_t& index, const size_t& total)
{
(void) payload;
Serial.println("=====On MQTT Message=====");
Serial.println("Publish received.");
Serial.print(" topic: ");
Serial.println(topic);
Serial.print(" qos: ");
Serial.println(properties.qos);
Serial.print(" dup: ");
Serial.println(properties.dup);
Serial.print(" retain: ");
Serial.println(properties.retain);
Serial.print(" len: ");
Serial.println(len);
Serial.print(" index: ");
Serial.println(index);
Serial.print(" total: ");
Serial.println(total);
Serial.print("payload: ");
Serial.println(payload); // 输出消息内容
}
void onMqttPublish(const uint16_t& packetId)
{
Serial.println("Publish acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
}
void setup()
{
Serial.begin(115200);
while (!Serial && millis() < 5000);
delay(500);
// 连接WIFI
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
while (WiFi.waitForConnectResult() != WL_CONNECTED) {
Serial.println("Connection Failed! Rebooting...");
delay(5000);
ESP.restart(); // 重启esp32
}
delay(500);
mqttClient.onConnect(onMqttConnect); // 设置 当MQTT连接时的回调函数
mqttClient.onDisconnect(onMqttDisconnect); // 设置 当MQTT断开连接时的回调函数
mqttClient.onMessage(onMqttMessage); // 设置 当MQTT订阅主题时的回调函数
mqttClient.onPublish(onMqttPublish); // 设置 当取消MQTT订阅主题时的回调函数
mqttClient.setServer(MQTT_HOST, MQTT_PORT); // 设置 MQTT服务器信息
mqttClient.connect(); // 连接 MQTT
delay(500);
}
void loop()
{
// 发布主题消息
uint16_t packetIdPub = mqttClient.publish(PubTopic, 2, true, "welcome to Lingshunlab.com");
Serial.print("Publisshing at QoS 2, packetId: ");
Serial.println(packetIdPub);
delay(2000);
}
上传代码后,程序将会先连接WIFI,然后连接MQTT服务器,再之后每隔2秒发布一个对应主题的消息
订阅主题de完整代码
#include <WiFi.h>
// 配置 WIFI
#define WIFI_SSID "***your wifi***"
#define WIFI_PASSWORD "***your wifi password***"
// 加载 AsyncMQTT_ESP32 库
#include <AsyncMQTT_ESP32.h>
// 配置MQTT服务器地址和端口
#define MQTT_HOST IPAddress(192,168,1,55) // Broker IP
// #define MQTT_HOST "broker.emqx.io" // Broker address
#define MQTT_PORT 1883
const char *SubTopic = "lingshunlab/ESP32"; // 订阅的主题
AsyncMqttClient mqttClient;
void onMqttConnect(bool sessionPresent)
{
Serial.println("=====On MQTT Connect=====");
Serial.print("Connected to MQTT broker: ");
Serial.print(MQTT_HOST);
Serial.print(", port: ");
Serial.println(MQTT_PORT);
Serial.print("PubTopic: ");
Serial.println(SubTopic);
// 订阅MQTT主题,并QoS设置为2
uint16_t packetIdSub = mqttClient.subscribe(SubTopic, 2);
Serial.print("Subscribing at QoS 2, packetId: ");
Serial.println(packetIdSub);
}
void onMqttDisconnect(AsyncMqttClientDisconnectReason reason)
{
(void) reason;
Serial.println("Disconnected from MQTT.");
}
void onMqttSubscribe(const uint16_t& packetId, const uint8_t& qos)
{
Serial.println("=====On MQTT Subscribe=====");
Serial.println("Subscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
Serial.print(" qos: ");
Serial.println(qos);
}
void onMqttUnsubscribe(const uint16_t& packetId)
{
Serial.println("Unsubscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
}
void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties,
const size_t& len, const size_t& index, const size_t& total)
{
(void) payload;
Serial.println("=====On MQTT Message=====");
Serial.println("Publish received.");
Serial.print(" topic: ");
Serial.println(topic);
Serial.print(" qos: ");
Serial.println(properties.qos);
Serial.print(" dup: ");
Serial.println(properties.dup);
Serial.print(" retain: ");
Serial.println(properties.retain);
Serial.print(" len: ");
Serial.println(len);
Serial.print(" index: ");
Serial.println(index);
Serial.print(" total: ");
Serial.println(total);
Serial.print("payload: ");
Serial.println(payload);
}
void setup()
{
Serial.begin(115200);
while (!Serial && millis() < 5000);
delay(500);
// 连接WIFI
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
while (WiFi.waitForConnectResult() != WL_CONNECTED) {
Serial.println("Connection Failed! Rebooting...");
delay(5000);
ESP.restart();
}
delay(500);
mqttClient.onConnect(onMqttConnect); // 设置 当MQTT连接时的回调函数
mqttClient.onDisconnect(onMqttDisconnect); // 设置 当MQTT断开连接时的回调函数
mqttClient.onSubscribe(onMqttSubscribe); // 设置 当MQTT订阅主题时的回调函数
mqttClient.onUnsubscribe(onMqttUnsubscribe); // 设置 当取消MQTT订阅主题时的回调函数
mqttClient.onMessage(onMqttMessage); // 设置 当MQTT收到主题消息时的回调函数
mqttClient.setServer(MQTT_HOST, MQTT_PORT); // 设置 MQTT服务器信息
mqttClient.connect(); // 连接 MQTT
delay(500);
}
void loop()
{
}
上传代码后,程序将会先连接WIFI,然后连接MQTT服务器,当连接MQTT时,则会订阅主题,之后每隔2秒就会收到主题发布的消息