ESP32 运行MQTT客户端进行主题的发布和订阅

ESP32 MQTT的库有很多,凌顺实验室(lingshunlab.com)这次主要使用AsyncMQTT_ESP32,以后有机会再更多的MQTT其他库的使用方法。

前提条件

效果实现

凌顺实验室(lingshunlab.com)在本示例展示了使用两个ESP32,分别实现发布MQTT的主题消息和订阅并输出MQTT的主题内容。当然,可能会问能不能一个ESP32同时又是发布者,又是订阅者?答案是可以的,因为作为客户端,都是对中间商做信息交换。

BOM

需要准备2个ESP32,
一个ESP32用于发布,
一个ESP32用于订阅。

库的安装

可以在Arduino IDE的库管理里搜索并安装:
点击菜单栏的「工具」---> 「库管理」,然后在搜索框中输入“AsyncMQTT_ESP32”,点击安装即可

下图在我本地已经安装好了:

WX20230223-2148082x

又或者在Github中下载,并安装到Arduino的 "libraries"文件夹里

Github 地址:
https://github.com/khoih-prog/AsyncMQTT_ESP32

程序提点

1, 首先,需要加载AsyncMQTT_ESP32的库

#include <AsyncMQTT_ESP32.h>

2,配置MQTT的服务器信息,可以是IP或者域名的方式

//#define MQTT_HOST         IPAddress(192, 168, 100, 100)
#define MQTT_HOST         "broker.emqx.io"        // Broker address
#define MQTT_PORT         1883

3,设置主题,发布需要主题,订阅也需要主题

const char *Topic  = "lingshunlab/ESP32";               // 主题 

4,创建MQTT客户端的实例

// 创建MQTT客户端的实例,名为mqttClient
AsyncMqttClient mqttClient;

5,认识mqttClient的可用的回调函数

当MQTT触发特定事件的时候,可以配置自定义的函数

  mqttClient.onConnect(onMqttConnect);  // 设置 当MQTT连接时的回调函数
  mqttClient.onDisconnect(onMqttDisconnect); // 设置 当MQTT断开连接时的回调函数
  mqttClient.onSubscribe(onMqttSubscribe); // 设置 当MQTT订阅主题时的回调函数
  mqttClient.onUnsubscribe(onMqttUnsubscribe); // 设置 当MQTT取消订阅主题时的回调函数
  mqttClient.onMessage(onMqttMessage); // 设置 当MQTT订阅主题时的回调函数
  mqttClient.onPublish(onMqttPublish); // 设置 当取消MQTT订阅主题时的回调函数
  mqttClient.setServer(MQTT_HOST, MQTT_PORT); // 设置 MQTT服务器信息

6, 连接MQTT服务器

  mqttClient.setServer(MQTT_HOST, MQTT_PORT); //连接MQTT服务器

7,发布主题

通过以下代码,可以对配置好的主题发布消息

// 发布主题消息
  uint16_t packetIdPub = mqttClient.publish(PubTopic, 2, true, "welcome to Lingshunlab.com");
  Serial.print("Publisshing at QoS 2, packetId: ");
  Serial.println(packetIdPub);
  delay(2000);

8,订阅主题

通过以下代码,可以订阅配置好的主题

// 订阅MQTT主题,并QoS设置为2
  uint16_t packetIdSub = mqttClient.subscribe(SubTopic, 2);
  Serial.print("Subscribing at QoS 2, packetId: ");
  Serial.println(packetIdSub);

9,当发生主题消息变化的时候的回调函数

mqttClient的回调函数有很多种,请仔细学习查看例子中其他的回调函数。在这里,特别说明一下onMessage的回调函数onMqttMessage(这个函数名称你可以自己定义,随喜),里面有不少参数,例如topic,payload等,其中payload即是消息的内容,可以通过输出显示。

void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties,
                   const size_t& len, const size_t& index, const size_t& total)
{
  (void) payload;
  Serial.println("=====On MQTT Message=====");
  Serial.println("Publish received.");
  Serial.print("  topic: ");
  Serial.println(topic);
  Serial.print("  qos: ");
  Serial.println(properties.qos);
  Serial.print("  dup: ");
  Serial.println(properties.dup);
  Serial.print("  retain: ");
  Serial.println(properties.retain);
  Serial.print("  len: ");
  Serial.println(len);
  Serial.print("  index: ");
  Serial.println(index);
  Serial.print("  total: ");
  Serial.println(total);
  Serial.print("payload: ");
  Serial.println(payload);   // 输出消息内容
}

10,请查看AsyncMQTT_ESP32的官方例子

可以学习到FreeRTOS的多线程如何应用。

程序代码

发布主题de完整代码

#include <WiFi.h>

// 配置 WIFI 
#define WIFI_SSID         "***your wifi***"
#define WIFI_PASSWORD     "***your wifi password***"

// 加载AsyncMQTT_ESP32库
#include <AsyncMQTT_ESP32.h> 

// 配置MQTT服务器地址和端口
#define MQTT_HOST         IPAddress(192,168,100,100)    // Broker IP
// #define MQTT_HOST         "broker.emqx.io"        // Broker address
#define MQTT_PORT         1883

const char *PubTopic  = "lingshunlab/ESP32";               // 发布消息的主题

AsyncMqttClient mqttClient; // 创建 MQTT客户端实例

void onMqttConnect(bool sessionPresent)   // 编写对应的回调函数
{
  Serial.println("=====On MQTT Connect=====");
  Serial.print("Connected to MQTT broker: ");
  Serial.print(MQTT_HOST);
  Serial.print(", port: ");
  Serial.println(MQTT_PORT);
  Serial.print("PubTopic: ");
  Serial.println(PubTopic);
}

void onMqttDisconnect(AsyncMqttClientDisconnectReason reason)
{
  (void) reason;

  Serial.println("Disconnected from MQTT.");
}

void onMqttSubscribe(const uint16_t& packetId, const uint8_t& qos)
{
  Serial.println("Subscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
  Serial.print("  qos: ");
  Serial.println(qos);
}

void onMqttUnsubscribe(const uint16_t& packetId)
{
  Serial.println("Unsubscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
}

void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties,
                   const size_t& len, const size_t& index, const size_t& total)
{
  (void) payload;
  Serial.println("=====On MQTT Message=====");
  Serial.println("Publish received.");
  Serial.print("  topic: ");
  Serial.println(topic);
  Serial.print("  qos: ");
  Serial.println(properties.qos);
  Serial.print("  dup: ");
  Serial.println(properties.dup);
  Serial.print("  retain: ");
  Serial.println(properties.retain);
  Serial.print("  len: ");
  Serial.println(len);
  Serial.print("  index: ");
  Serial.println(index);
  Serial.print("  total: ");
  Serial.println(total);
  Serial.print("payload: ");
  Serial.println(payload);   // 输出消息内容
}

void onMqttPublish(const uint16_t& packetId)
{
  Serial.println("Publish acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
}

void setup()
{
  Serial.begin(115200);
  while (!Serial && millis() < 5000);
  delay(500);

  // 连接WIFI
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
  while (WiFi.waitForConnectResult() != WL_CONNECTED) {
        Serial.println("Connection Failed! Rebooting...");
        delay(5000);
        ESP.restart(); // 重启esp32
      }
  delay(500);

  mqttClient.onConnect(onMqttConnect);  // 设置 当MQTT连接时的回调函数
  mqttClient.onDisconnect(onMqttDisconnect); // 设置 当MQTT断开连接时的回调函数
  mqttClient.onMessage(onMqttMessage); // 设置 当MQTT订阅主题时的回调函数
  mqttClient.onPublish(onMqttPublish); // 设置 当取消MQTT订阅主题时的回调函数
  mqttClient.setServer(MQTT_HOST, MQTT_PORT); // 设置 MQTT服务器信息
  mqttClient.connect(); // 连接 MQTT

  delay(500);
}

void loop()
{
  // 发布主题消息
  uint16_t packetIdPub = mqttClient.publish(PubTopic, 2, true, "welcome to Lingshunlab.com");
  Serial.print("Publisshing at QoS 2, packetId: ");
  Serial.println(packetIdPub);
  delay(2000);
}

上传代码后,程序将会先连接WIFI,然后连接MQTT服务器,再之后每隔2秒发布一个对应主题的消息

WX20230223-2332482x

订阅主题de完整代码

#include <WiFi.h>

// 配置 WIFI 
#define WIFI_SSID         "***your wifi***"
#define WIFI_PASSWORD     "***your wifi password***"

// 加载 AsyncMQTT_ESP32 库
#include <AsyncMQTT_ESP32.h>

// 配置MQTT服务器地址和端口
#define MQTT_HOST         IPAddress(192,168,1,55)    // Broker IP
// #define MQTT_HOST         "broker.emqx.io"        // Broker address
#define MQTT_PORT         1883

const char *SubTopic  = "lingshunlab/ESP32";        // 订阅的主题

AsyncMqttClient mqttClient;

void onMqttConnect(bool sessionPresent)
{
  Serial.println("=====On MQTT Connect=====");
  Serial.print("Connected to MQTT broker: ");
  Serial.print(MQTT_HOST);
  Serial.print(", port: ");
  Serial.println(MQTT_PORT);
  Serial.print("PubTopic: ");
  Serial.println(SubTopic);

  // 订阅MQTT主题,并QoS设置为2
  uint16_t packetIdSub = mqttClient.subscribe(SubTopic, 2);
  Serial.print("Subscribing at QoS 2, packetId: ");
  Serial.println(packetIdSub);
}

void onMqttDisconnect(AsyncMqttClientDisconnectReason reason)
{
  (void) reason;

  Serial.println("Disconnected from MQTT.");
}

void onMqttSubscribe(const uint16_t& packetId, const uint8_t& qos)
{
  Serial.println("=====On MQTT Subscribe=====");
  Serial.println("Subscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
  Serial.print("  qos: ");
  Serial.println(qos);
}

void onMqttUnsubscribe(const uint16_t& packetId)
{
  Serial.println("Unsubscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
}

void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties,
                   const size_t& len, const size_t& index, const size_t& total)
{
  (void) payload;
  Serial.println("=====On MQTT Message=====");
  Serial.println("Publish received.");
  Serial.print("  topic: ");
  Serial.println(topic);
  Serial.print("  qos: ");
  Serial.println(properties.qos);
  Serial.print("  dup: ");
  Serial.println(properties.dup);
  Serial.print("  retain: ");
  Serial.println(properties.retain);
  Serial.print("  len: ");
  Serial.println(len);
  Serial.print("  index: ");
  Serial.println(index);
  Serial.print("  total: ");
  Serial.println(total);
  Serial.print("payload: ");
  Serial.println(payload);  
}

void setup()
{
  Serial.begin(115200);
  while (!Serial && millis() < 5000);
  delay(500);

  // 连接WIFI
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
  while (WiFi.waitForConnectResult() != WL_CONNECTED) {
        Serial.println("Connection Failed! Rebooting...");
        delay(5000);
        ESP.restart();
      }
  delay(500);

  mqttClient.onConnect(onMqttConnect); // 设置 当MQTT连接时的回调函数
  mqttClient.onDisconnect(onMqttDisconnect);  // 设置 当MQTT断开连接时的回调函数
  mqttClient.onSubscribe(onMqttSubscribe); // 设置 当MQTT订阅主题时的回调函数
  mqttClient.onUnsubscribe(onMqttUnsubscribe); // 设置 当取消MQTT订阅主题时的回调函数
  mqttClient.onMessage(onMqttMessage); // 设置 当MQTT收到主题消息时的回调函数
  mqttClient.setServer(MQTT_HOST, MQTT_PORT); // 设置 MQTT服务器信息
  mqttClient.connect(); // 连接 MQTT

  delay(500);
}

void loop()
{

}

上传代码后,程序将会先连接WIFI,然后连接MQTT服务器,当连接MQTT时,则会订阅主题,之后每隔2秒就会收到主题发布的消息

WX20230223-2341422x