ESP8266通过mqtt保存数据到mysql
0.硬件和软件
ESP8266、DHT11、EMQX(企业版)、MYSQL(5.6)、服务器ubuntu2018
1.EMQX安装
下载地址
https://www.emqx.com/zh/downloads/enterprise
选择一个版本

选择版本复制下载链接

1.下载安装包
wget https://www.emqx.com/zh/downloads/enterprise/v4.4.17/emqx-ee-4.4.17-otp24.3.4.2-1-ubuntu20.04-amd64.deb
2.执行安装
sudo apt install ./emqx-ee-4.4.17-otp24.3.4.2-1-ubuntu20.04-amd64.deb
3.启动服务
emqx start
按步骤复制上面三个命令执行
安装成功后,开放以下端口:8883、1883、8083、8084、18083
然后访问EMQX后台:服务器IP:18083

由于企业版收费使用试用的license连接只能为10个

(1)安装MQTT库
打开arduino点击库管理,然后搜索pubsubclient

安装PubSubClient
(2)安装DHT11温湿度传感器库
搜索simpleDHT

安装
(3)安装WIFImanager库,这个库可以通过Web端配网
搜索WIFImanager

安装ESP32Wifimanager
下面代码每隔1秒通过MQTT上传温湿度数据,每隔200ms获取订阅的消息(用于远程控制)通过ticker库实现多任务处理,让上传数据和获取订阅同时执行,具体代码:
#include <ESP8266WiFi.h>
#include <PubSubClient.h>
#include <ArduinoJson.h>
#include <SimpleDHT.h>
#include <DNSServer.h>
#include <ESP8266WebServer.h>
#include <WiFiManager.h>
#include <Ticker.h>
// MQTT 信息
const char* mqttServer = "mqtt.xxxxx.com";//MQTT服务器地址
const int mqttPort = 1883; // 默认 MQTT 端口
const char* mqttPublishTopic = "wen"; // MQTT 发布主题
const char* mqttSubscribeTopic = "top"; // MQTT 订阅主题
// 温湿度传感器引脚
const int dhtPin = 2; // 假设温湿度传感器连接到 D4
WiFiClient espClient;
// 创建 MQTT 客户端
PubSubClient client(espClient);
// 创建 DHT 温湿度传感器对象
SimpleDHT11 dht11(dhtPin);
// 定义 Ticker 对象
Ticker publishDataTicker;
Ticker subscribeDataTicker;
// 定义定时器回调函数
void publishDataTask();
void subscribeDataTask();
void setup() {
// 初始化串口
Serial.begin(115200);
// Web配网
// 建立WiFiManager对象
WiFiManager wifiManager;
wifiManager.resetSettings();//清除上一次保存的WIFI账号和密码
wifiManager.autoConnect("Web配网的WIFI名称");
// 连接到 MQTT 服务器
client.setServer(mqttServer, mqttPort);
client.setCallback(callback);
while (!client.connected()) {
Serial.println("连接MQTT服务器中...");
if (client.connect("MQTT连接名称", mqttUser, mqttPassword)) {
Serial.println("连接MQTT成功");
} else {
Serial.print("连接失败: ");
Serial.print(client.state());
delay(2000);
}
}
// 订阅 MQTT 主题
client.subscribe(mqttSubscribeTopic);
// 启动定时器
publishDataTicker.attach_ms(1000, publishDataTask); // 每隔 1 秒执行一次 publishDataTask 函数
subscribeDataTicker.attach_ms(100, subscribeDataTask); // 每隔 100 毫秒执行一次 subscribeDataTask 函数
}
void loop() {
// 循环函数为空,所有任务通过定时器进行处理
}
// 定时器回调函数,发布温湿度数据到 MQTT
void publishDataTask() {
// 读取温湿度数据
byte temperature = 0;
byte humidity = 0;
int err = SimpleDHTErrSuccess;
if ((err = dht11.read(&temperature, &humidity, NULL)) != SimpleDHTErrSuccess) {
Serial.print("温湿度传感器异常:"); Serial.print(SimpleDHTErrCode(err));
Serial.print(","); Serial.println(SimpleDHTErrDuration(err)); delay(1000);
return;
}
// 将温湿度数据转换为json字符串
DynamicJsonDocument data(256);
data["wen"] = String(temperature);
data["shi"] = String(humidity);
char json_string[256];
serializeJson(data, sendjson);
// 发布温湿度数据到 MQTT
client.publish(mqttPublishTopic, sendjson, false);
}
// 定时器回调函数,检查 MQTT 订阅的消息
void subscribeDataTask() {
// 检查 MQTT 订阅的消息
client.loop();
}
// MQTT 消息回调函数
void callback(char* topic, byte* payload, unsigned int length) {
// 处理 MQTT 消息
// 将接收到的消息转换为字符串
String message;
for (int i = 0; i < length; i++) {
message += (char)payload[i];
}
// 输出接收到的消息
Serial.print("收到消息: ");
Serial.println(message);
}
烧录代码,复位后,手机连接esp配网wifi按照要求数据密码账号后等待esp连接到emqx
测试连接,访问mqtt消息调试工具MQTT X Web: 在线的 MQTT Websocket 客户端工具
https://mqttx.app/zh/web

点击新建连接,输入服务器地址点击连接

添加订阅接收数据

测试接收到的数据

3.保存到mysql数据库
添加数据库连接

点击创建,选择mysql

然后输入账号密码,点击确定


根据上面传上来的json数据添加sql模板
SELECT
timestamp as time_up,payload.wen as wen, payload.shi as shi
FROM
"wen"

添加响应动作,选择刚刚设置的sql数据库

添加模板
insert into tmp(time_up, wen, shi) values (FROM_UNIXTIME(${time_up}/1000), ${wen}, ${shi})
最后测试当客户端发送数据上来,数据被保存在sql数据库中

