mirror of
https://github.com/gunner47/GyverLamp.git
synced 2025-08-08 09:20:59 +03:00
Добавлены библиотеки для работы с MQTT
This commit is contained in:
12
libraries/async-mqtt-client/.editorconfig
Normal file
12
libraries/async-mqtt-client/.editorconfig
Normal file
@@ -0,0 +1,12 @@
|
||||
root = true
|
||||
|
||||
[*]
|
||||
end_of_line = lf
|
||||
insert_final_newline = true
|
||||
charset = utf-8
|
||||
indent_style = space
|
||||
indent_size = 2
|
||||
trim_trailing_whitespace = true
|
||||
|
||||
[keywords.txt]
|
||||
indent_style = tab
|
1
libraries/async-mqtt-client/.gitignore
vendored
Normal file
1
libraries/async-mqtt-client/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
/config.json
|
20
libraries/async-mqtt-client/.travis.yml
Normal file
20
libraries/async-mqtt-client/.travis.yml
Normal file
@@ -0,0 +1,20 @@
|
||||
language: python
|
||||
python:
|
||||
- "2.7"
|
||||
|
||||
cache:
|
||||
directories:
|
||||
- "~/.platformio"
|
||||
|
||||
env:
|
||||
- PLATFORMIO_CI_SRC=examples/FullyFeatured-ESP8266 PLATFORMIO_CI_EXTRA_ARGS="--board=esp01 --board=nodemcuv2"
|
||||
- PLATFORMIO_CI_SRC=examples/FullyFeatured-ESP32 PLATFORMIO_CI_EXTRA_ARGS="--board=lolin32"
|
||||
- CPPLINT=true
|
||||
|
||||
install:
|
||||
- pip install -U https://github.com/platformio/platformio-core/archive/develop.zip
|
||||
- pip install -U cpplint
|
||||
- platformio lib -g install file://.
|
||||
|
||||
script:
|
||||
- if [[ "$CPPLINT" ]]; then make cpplint; else platformio ci $PLATFORMIO_CI_EXTRA_ARGS; fi
|
21
libraries/async-mqtt-client/LICENSE
Normal file
21
libraries/async-mqtt-client/LICENSE
Normal file
@@ -0,0 +1,21 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2015 Marvin Roger
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
3
libraries/async-mqtt-client/Makefile
Normal file
3
libraries/async-mqtt-client/Makefile
Normal file
@@ -0,0 +1,3 @@
|
||||
cpplint:
|
||||
cpplint --repository=. --recursive --filter=-whitespace/line_length,-legal/copyright,-runtime/printf,-build/include,-build/namespace ./src
|
||||
.PHONY: cpplint
|
18
libraries/async-mqtt-client/README.md
Normal file
18
libraries/async-mqtt-client/README.md
Normal file
@@ -0,0 +1,18 @@
|
||||
Async MQTT client for ESP8266 and ESP32
|
||||
=============================
|
||||
|
||||
[](https://travis-ci.org/marvinroger/async-mqtt-client)
|
||||
|
||||
An Arduino for ESP8266 and ESP32 asynchronous [MQTT](http://mqtt.org/) client implementation, built on [me-no-dev/ESPAsyncTCP (ESP8266)](https://github.com/me-no-dev/ESPAsyncTCP) | [me-no-dev/AsyncTCP (ESP32)](https://github.com/me-no-dev/AsyncTCP) .
|
||||
## Features
|
||||
|
||||
* Compliant with the 3.1.1 version of the protocol
|
||||
* Fully asynchronous
|
||||
* Subscribe at QoS 0, 1 and 2
|
||||
* Publish at QoS 0, 1 and 2
|
||||
* SSL/TLS support
|
||||
* Available in the [PlatformIO registry](http://platformio.org/lib/show/346/AsyncMqttClient)
|
||||
|
||||
## Requirements, installation and usage
|
||||
|
||||
The project is documented in the [/docs folder](docs).
|
6
libraries/async-mqtt-client/async-mqtt-client.cppcheck
Normal file
6
libraries/async-mqtt-client/async-mqtt-client.cppcheck
Normal file
@@ -0,0 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="1">
|
||||
<includedir>
|
||||
<dir name="src/"/>
|
||||
</includedir>
|
||||
</project>
|
26
libraries/async-mqtt-client/docs/1.-Getting-started.md
Normal file
26
libraries/async-mqtt-client/docs/1.-Getting-started.md
Normal file
@@ -0,0 +1,26 @@
|
||||
# Getting started
|
||||
|
||||
To use AsyncMqttClient, you need:
|
||||
|
||||
* An ESP8266
|
||||
* The Arduino IDE for ESP8266 (version 2.2.0 minimum)
|
||||
* Basic knowledge of the Arduino environment (upload a sketch, import libraries, ...)
|
||||
|
||||
## Installing AsyncMqttClient
|
||||
|
||||
There are two ways to install AsyncMqttClient.
|
||||
|
||||
### 1a. For the Arduino IDE
|
||||
|
||||
1. Download the [corresponding release](https://github.com/marvinroger/async-mqtt-client/releases/latest)
|
||||
2. Load the `.zip` with **Sketch → Include Library → Add .ZIP Library**
|
||||
|
||||
AsyncMqttClient has 1 dependency: [ESPAsyncTCP](https://github.com/me-no-dev/ESPAsyncTCP). Download the [.zip](https://github.com/me-no-dev/ESPAsyncTCP/archive/master.zip) and install it with the same method as above.
|
||||
|
||||
## Fully-featured sketch
|
||||
|
||||
See [examples/FullyFeatured-ESP8266.ino](../examples/FullyFeatured-ESP8266/FullyFeatured-ESP8266.ino)
|
||||
|
||||
**<u>Very important:</u> As a rule of thumb, never use blocking functions in the callbacks (don't use `delay()` or `yield()`).** Otherwise, you may very probably experience unexpected behaviors.
|
||||
|
||||
You can go to the [API reference](2.-API-reference.md).
|
160
libraries/async-mqtt-client/docs/2.-API-reference.md
Normal file
160
libraries/async-mqtt-client/docs/2.-API-reference.md
Normal file
@@ -0,0 +1,160 @@
|
||||
# API reference
|
||||
|
||||
#### AsyncMqttClient()
|
||||
|
||||
Instantiate a new AsyncMqttClient object.
|
||||
|
||||
### Configuration
|
||||
|
||||
#### AsyncMqttClient& setKeepAlive(uint16_t `keepAlive`)
|
||||
|
||||
Set the keep alive. Defaults to 15 seconds.
|
||||
|
||||
* **`keepAlive`**: Keep alive in seconds
|
||||
|
||||
#### AsyncMqttClient& setClientId(const char\* `clientId`)
|
||||
|
||||
Set the client ID. Defaults to `esp8266<chip ID on 6 hex caracters>`.
|
||||
|
||||
* **`clientId`**: Client ID
|
||||
|
||||
#### AsyncMqttClient& setCleanSession(bool `cleanSession`)
|
||||
|
||||
Whether or not to set the CleanSession flag. Defaults to `true`.
|
||||
|
||||
* **`cleanSession`**: clean session wanted or not
|
||||
|
||||
#### AsyncMqttClient& setMaxTopicLength(uint16_t `maxTopicLength`)
|
||||
|
||||
Set the maximum allowed topic length to receive. If an MQTT packet is received
|
||||
with a topic longer than this maximum, the packet will be ignored. Defaults to `128`.
|
||||
|
||||
* **`maxTopicLength`**: Maximum allowed topic length to receive
|
||||
|
||||
#### AsyncMqttClient& setCredentials(const char\* `username`, const char\* `password` = nullptr)
|
||||
|
||||
Set the username/password. Defaults to non-auth.
|
||||
|
||||
* **`username`**: Username
|
||||
* **`password`**: Password
|
||||
|
||||
#### AsyncMqttClient& setWill(const char\* `topic`, uint8_t `qos`, bool `retain`, const char\* `payload` = nullptr, size_t `length` = 0)
|
||||
|
||||
Set the Last Will Testament. Defaults to none.
|
||||
|
||||
* **`topic`**: Topic of the LWT
|
||||
* **`qos`**: QoS of the LWT
|
||||
* **`retain`**: Retain flag of the LWT
|
||||
* **`payload`**: Payload of the LWT. If unset, the payload will be empty
|
||||
* **`length`**: Payload length. If unset or set to 0, the payload will be considered as a string and its size will be calculated using `strlen(payload)`
|
||||
|
||||
#### AsyncMqttClient& setServer(IPAddress `ip`, uint16_t `port`)
|
||||
|
||||
Set the server.
|
||||
|
||||
* **`ip`**: IP of the server
|
||||
* **`port`**: Port of the server
|
||||
|
||||
#### AsyncMqttClient& setServer(const char\* `host`, uint16_t `port`)
|
||||
|
||||
Set the server.
|
||||
|
||||
* **`host`**: Host of the server
|
||||
* **`port`**: Port of the server
|
||||
|
||||
#### AsyncMqttClient& setSecure(bool `secure`)
|
||||
|
||||
Whether or not to use SSL. Defaults to `false`.
|
||||
|
||||
* **`secure`**: SSL wanted or not.
|
||||
|
||||
#### AsyncMqttClient& addServerFingerprint(const uint8_t\* `fingerprint`)
|
||||
|
||||
Adds an acceptable server fingerprint (SHA1). This may be called multiple times to permit any one of the specified fingerprints. By default, if no fingerprint is added, any fingerprint is accepted.
|
||||
|
||||
* **`fingerprint`**: Fingerprint to add
|
||||
|
||||
### Events handlers
|
||||
|
||||
#### AsyncMqttClient& onConnect(AsyncMqttClientInternals::OnConnectUserCallback `callback`)
|
||||
|
||||
Add a connect event handler.
|
||||
|
||||
* **`callback`**: Function to call
|
||||
|
||||
#### AsyncMqttClient& onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback `callback`)
|
||||
|
||||
Add a disconnect event handler.
|
||||
|
||||
* **`callback`**: Function to call
|
||||
|
||||
#### AsyncMqttClient& onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback `callback`)
|
||||
|
||||
Add a subscribe acknowledged event handler.
|
||||
|
||||
* **`callback`**: Function to call
|
||||
|
||||
#### AsyncMqttClient& onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback `callback`)
|
||||
|
||||
Add an unsubscribe acknowledged event handler.
|
||||
|
||||
* **`callback`**: Function to call
|
||||
|
||||
#### AsyncMqttClient& onMessage(AsyncMqttClientInternals::OnMessageUserCallback `callback`)
|
||||
|
||||
Add a publish received event handler.
|
||||
|
||||
* **`callback`**: Function to call
|
||||
|
||||
#### AsyncMqttClient& onPublish(AsyncMqttClientInternals::OnPublishUserCallback `callback`)
|
||||
|
||||
Add a publish acknowledged event handler.
|
||||
|
||||
* **`callback`**: Function to call
|
||||
|
||||
### Operation functions
|
||||
|
||||
#### bool connected()
|
||||
|
||||
Return if the client is currently connected to the broker or not.
|
||||
|
||||
#### void connect()
|
||||
|
||||
Connect to the server.
|
||||
|
||||
#### void disconnect(bool `force` = false)
|
||||
|
||||
Disconnect from the server.
|
||||
|
||||
* **`force`**: Whether to force the disconnection. Defaults to `false` (clean disconnection).
|
||||
|
||||
#### uint16_t subscribe(const char\* `topic`, uint8_t `qos`)
|
||||
|
||||
Subscribe to the given topic at the given QoS.
|
||||
|
||||
Return the packet ID or 0 if failed.
|
||||
|
||||
* **`topic`**: Topic
|
||||
* **`qos`**: QoS
|
||||
|
||||
#### uint16_t unsubscribe(const char\* `topic`)
|
||||
|
||||
Unsubscribe from the given topic.
|
||||
|
||||
Return the packet ID or 0 if failed.
|
||||
|
||||
* **`topic`**: Topic
|
||||
|
||||
#### uint16_t publish(const char\* `topic`, uint8_t `qos`, bool `retain`, const char\* `payload` = nullptr, size_t `length` = 0, bool dup = false, uint16_t message_id = 0)
|
||||
|
||||
Publish a packet.
|
||||
|
||||
Return the packet ID (or 1 if QoS 0) or 0 if failed.
|
||||
|
||||
* **`topic`**: Topic
|
||||
* **`qos`**: QoS
|
||||
* **`retain`**: Retain flag
|
||||
* **`payload`**: Payload. If unset, the payload will be empty
|
||||
* **`length`**: Payload length. If unset or set to 0, the payload will be considered as a string and its size will be calculated using `strlen(payload)`
|
||||
* **`dup`**: Duplicate flag. If set or set to 1, the payload will be flagged as a duplicate
|
||||
* **`message_id`**: The message ID. If unset or set to 0, the message ID will be automtaically assigned. Use this with the DUP flag to identify which message is being duplicated
|
7
libraries/async-mqtt-client/docs/3.-Memory-management.md
Normal file
7
libraries/async-mqtt-client/docs/3.-Memory-management.md
Normal file
@@ -0,0 +1,7 @@
|
||||
# Memory management
|
||||
|
||||
AsyncMqttClient does not use an internal buffer, it uses the raw TCP buffer.
|
||||
|
||||
The max receive size is about 1460 bytes per call to your onMessage callback. But the amount of data you can receive is unlimited, as if you receive, say, a 300kB payload (such as an OTA payload), then your `onMessage` callback will be called about 200 times, with the according len, index and total parameters. Keep in mind the library will call your `onMessage` callbacks with the same topic buffer, so if you change the buffer on one call, the buffer will remain changed on subsequent calls.
|
||||
|
||||
You can send data as long as you stay below the available TCP window (which is about 3-4kB on the ESP8266). The data is indeed held in memory by the async TCP code until ACK is received. If the TCP window was sufficient to send your packet, the `publish` method will return a packet ID indicating the packet was sent. Otherwise, a `0` will be returned, and it's your responsability to resend the packet with `publish`.
|
@@ -0,0 +1,19 @@
|
||||
# Limitations and known issues
|
||||
|
||||
* When the CleanSession is set to `false`, the implementation is not spec compliant. The following is not honored:
|
||||
|
||||
> Must be kept in memory:
|
||||
* All messages in a QoS 1 or 2 flow, which are not confirmed by the broker
|
||||
* All received QoS 2 messages, which are not yet confirmed to the broker
|
||||
|
||||
This means retransmission is not honored in case of a failure.
|
||||
|
||||
* You cannot send payload larger that what can fit on RAM.
|
||||
|
||||
## SSL limitations
|
||||
|
||||
* SSL requires use of esp8266/Arduino 2.4.0, which is not yet released (platform = espressif8266_stage in PlatformIO).
|
||||
* SSL requires the build flag -DASYNC_TCP_SSL_ENABLED=1
|
||||
* SSL only supports fingerprints for server validation.
|
||||
* If you do not specify one or more acceptable server fingerprints, the SSL connection will be vulnerable to man-in-the-middle attacks.
|
||||
* Some server certificate signature algorithms do not work. SHA1, SHA224, SHA256, and MD5 are working. SHA384, and SHA512 will cause a crash.
|
3
libraries/async-mqtt-client/docs/5.-Troubleshooting.md
Normal file
3
libraries/async-mqtt-client/docs/5.-Troubleshooting.md
Normal file
@@ -0,0 +1,3 @@
|
||||
# Troubleshooting
|
||||
|
||||
To be completed when issues arise.
|
4
libraries/async-mqtt-client/docs/README.md
Normal file
4
libraries/async-mqtt-client/docs/README.md
Normal file
@@ -0,0 +1,4 @@
|
||||
AsyncMqttClient documentation
|
||||
=============================
|
||||
|
||||
See [index.md](index.md) to view it locally, or http://marvinroger.viewdocs.io/async-mqtt-client/ to view it online.
|
11
libraries/async-mqtt-client/docs/index.md
Normal file
11
libraries/async-mqtt-client/docs/index.md
Normal file
@@ -0,0 +1,11 @@
|
||||
Welcome to the AsyncMqttClient for ESP8266 docs.
|
||||
|
||||
**<p align="center">This documentation is only valid for the AsyncMqttClient version in this repo/directory</p>**
|
||||
|
||||
-----
|
||||
|
||||
#### 1. [Getting started](1.-Getting-started.md)
|
||||
#### 2. [API reference](2.-API-reference.md)
|
||||
#### 3. [Memory management](3.-Memory-management.md)
|
||||
#### 4. [Limitations and known issues](4.-Limitations-and-known-issues.md)
|
||||
#### 5. [Troubleshooting](5.-Troubleshooting.md)
|
@@ -0,0 +1,135 @@
|
||||
/*
|
||||
This example uses FreeRTOS softwaretimers as there is no built-in Ticker library
|
||||
*/
|
||||
|
||||
|
||||
#include <WiFi.h>
|
||||
extern "C" {
|
||||
#include "freertos/FreeRTOS.h"
|
||||
#include "freertos/timers.h"
|
||||
}
|
||||
#include <AsyncMqttClient.h>
|
||||
|
||||
#define WIFI_SSID "yourSSID"
|
||||
#define WIFI_PASSWORD "yourpass"
|
||||
|
||||
#define MQTT_HOST IPAddress(192, 168, 1, 10)
|
||||
#define MQTT_PORT 1883
|
||||
|
||||
AsyncMqttClient mqttClient;
|
||||
TimerHandle_t mqttReconnectTimer;
|
||||
TimerHandle_t wifiReconnectTimer;
|
||||
|
||||
void connectToWifi() {
|
||||
Serial.println("Connecting to Wi-Fi...");
|
||||
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
|
||||
}
|
||||
|
||||
void connectToMqtt() {
|
||||
Serial.println("Connecting to MQTT...");
|
||||
mqttClient.connect();
|
||||
}
|
||||
|
||||
void WiFiEvent(WiFiEvent_t event) {
|
||||
Serial.printf("[WiFi-event] event: %d\n", event);
|
||||
switch(event) {
|
||||
case SYSTEM_EVENT_STA_GOT_IP:
|
||||
Serial.println("WiFi connected");
|
||||
Serial.println("IP address: ");
|
||||
Serial.println(WiFi.localIP());
|
||||
connectToMqtt();
|
||||
break;
|
||||
case SYSTEM_EVENT_STA_DISCONNECTED:
|
||||
Serial.println("WiFi lost connection");
|
||||
xTimerStop(mqttReconnectTimer, 0); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi
|
||||
xTimerStart(wifiReconnectTimer, 0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void onMqttConnect(bool sessionPresent) {
|
||||
Serial.println("Connected to MQTT.");
|
||||
Serial.print("Session present: ");
|
||||
Serial.println(sessionPresent);
|
||||
uint16_t packetIdSub = mqttClient.subscribe("test/lol", 2);
|
||||
Serial.print("Subscribing at QoS 2, packetId: ");
|
||||
Serial.println(packetIdSub);
|
||||
mqttClient.publish("test/lol", 0, true, "test 1");
|
||||
Serial.println("Publishing at QoS 0");
|
||||
uint16_t packetIdPub1 = mqttClient.publish("test/lol", 1, true, "test 2");
|
||||
Serial.print("Publishing at QoS 1, packetId: ");
|
||||
Serial.println(packetIdPub1);
|
||||
uint16_t packetIdPub2 = mqttClient.publish("test/lol", 2, true, "test 3");
|
||||
Serial.print("Publishing at QoS 2, packetId: ");
|
||||
Serial.println(packetIdPub2);
|
||||
}
|
||||
|
||||
void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
|
||||
Serial.println("Disconnected from MQTT.");
|
||||
|
||||
if (WiFi.isConnected()) {
|
||||
xTimerStart(mqttReconnectTimer, 0);
|
||||
}
|
||||
}
|
||||
|
||||
void onMqttSubscribe(uint16_t packetId, uint8_t qos) {
|
||||
Serial.println("Subscribe acknowledged.");
|
||||
Serial.print(" packetId: ");
|
||||
Serial.println(packetId);
|
||||
Serial.print(" qos: ");
|
||||
Serial.println(qos);
|
||||
}
|
||||
|
||||
void onMqttUnsubscribe(uint16_t packetId) {
|
||||
Serial.println("Unsubscribe acknowledged.");
|
||||
Serial.print(" packetId: ");
|
||||
Serial.println(packetId);
|
||||
}
|
||||
|
||||
void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
|
||||
Serial.println("Publish received.");
|
||||
Serial.print(" topic: ");
|
||||
Serial.println(topic);
|
||||
Serial.print(" qos: ");
|
||||
Serial.println(properties.qos);
|
||||
Serial.print(" dup: ");
|
||||
Serial.println(properties.dup);
|
||||
Serial.print(" retain: ");
|
||||
Serial.println(properties.retain);
|
||||
Serial.print(" len: ");
|
||||
Serial.println(len);
|
||||
Serial.print(" index: ");
|
||||
Serial.println(index);
|
||||
Serial.print(" total: ");
|
||||
Serial.println(total);
|
||||
}
|
||||
|
||||
void onMqttPublish(uint16_t packetId) {
|
||||
Serial.println("Publish acknowledged.");
|
||||
Serial.print(" packetId: ");
|
||||
Serial.println(packetId);
|
||||
}
|
||||
|
||||
void setup() {
|
||||
Serial.begin(115200);
|
||||
Serial.println();
|
||||
Serial.println();
|
||||
|
||||
mqttReconnectTimer = xTimerCreate("mqttTimer", pdMS_TO_TICKS(2000), pdFALSE, (void*)0, reinterpret_cast<TimerCallbackFunction_t>(connectToMqtt));
|
||||
wifiReconnectTimer = xTimerCreate("wifiTimer", pdMS_TO_TICKS(2000), pdFALSE, (void*)0, reinterpret_cast<TimerCallbackFunction_t>(connectToWifi));
|
||||
|
||||
WiFi.onEvent(WiFiEvent);
|
||||
|
||||
mqttClient.onConnect(onMqttConnect);
|
||||
mqttClient.onDisconnect(onMqttDisconnect);
|
||||
mqttClient.onSubscribe(onMqttSubscribe);
|
||||
mqttClient.onUnsubscribe(onMqttUnsubscribe);
|
||||
mqttClient.onMessage(onMqttMessage);
|
||||
mqttClient.onPublish(onMqttPublish);
|
||||
mqttClient.setServer(MQTT_HOST, MQTT_PORT);
|
||||
|
||||
connectToWifi();
|
||||
}
|
||||
|
||||
void loop() {
|
||||
}
|
@@ -0,0 +1,122 @@
|
||||
#include <ESP8266WiFi.h>
|
||||
#include <Ticker.h>
|
||||
#include <AsyncMqttClient.h>
|
||||
|
||||
#define WIFI_SSID "My_Wi-Fi"
|
||||
#define WIFI_PASSWORD "my-awesome-password"
|
||||
|
||||
#define MQTT_HOST IPAddress(192, 168, 1, 10)
|
||||
#define MQTT_PORT 1883
|
||||
|
||||
AsyncMqttClient mqttClient;
|
||||
Ticker mqttReconnectTimer;
|
||||
|
||||
WiFiEventHandler wifiConnectHandler;
|
||||
WiFiEventHandler wifiDisconnectHandler;
|
||||
Ticker wifiReconnectTimer;
|
||||
|
||||
void connectToWifi() {
|
||||
Serial.println("Connecting to Wi-Fi...");
|
||||
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
|
||||
}
|
||||
|
||||
void onWifiConnect(const WiFiEventStationModeGotIP& event) {
|
||||
Serial.println("Connected to Wi-Fi.");
|
||||
connectToMqtt();
|
||||
}
|
||||
|
||||
void onWifiDisconnect(const WiFiEventStationModeDisconnected& event) {
|
||||
Serial.println("Disconnected from Wi-Fi.");
|
||||
mqttReconnectTimer.detach(); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi
|
||||
wifiReconnectTimer.once(2, connectToWifi);
|
||||
}
|
||||
|
||||
void connectToMqtt() {
|
||||
Serial.println("Connecting to MQTT...");
|
||||
mqttClient.connect();
|
||||
}
|
||||
|
||||
void onMqttConnect(bool sessionPresent) {
|
||||
Serial.println("Connected to MQTT.");
|
||||
Serial.print("Session present: ");
|
||||
Serial.println(sessionPresent);
|
||||
uint16_t packetIdSub = mqttClient.subscribe("test/lol", 2);
|
||||
Serial.print("Subscribing at QoS 2, packetId: ");
|
||||
Serial.println(packetIdSub);
|
||||
mqttClient.publish("test/lol", 0, true, "test 1");
|
||||
Serial.println("Publishing at QoS 0");
|
||||
uint16_t packetIdPub1 = mqttClient.publish("test/lol", 1, true, "test 2");
|
||||
Serial.print("Publishing at QoS 1, packetId: ");
|
||||
Serial.println(packetIdPub1);
|
||||
uint16_t packetIdPub2 = mqttClient.publish("test/lol", 2, true, "test 3");
|
||||
Serial.print("Publishing at QoS 2, packetId: ");
|
||||
Serial.println(packetIdPub2);
|
||||
}
|
||||
|
||||
void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
|
||||
Serial.println("Disconnected from MQTT.");
|
||||
|
||||
if (WiFi.isConnected()) {
|
||||
mqttReconnectTimer.once(2, connectToMqtt);
|
||||
}
|
||||
}
|
||||
|
||||
void onMqttSubscribe(uint16_t packetId, uint8_t qos) {
|
||||
Serial.println("Subscribe acknowledged.");
|
||||
Serial.print(" packetId: ");
|
||||
Serial.println(packetId);
|
||||
Serial.print(" qos: ");
|
||||
Serial.println(qos);
|
||||
}
|
||||
|
||||
void onMqttUnsubscribe(uint16_t packetId) {
|
||||
Serial.println("Unsubscribe acknowledged.");
|
||||
Serial.print(" packetId: ");
|
||||
Serial.println(packetId);
|
||||
}
|
||||
|
||||
void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
|
||||
Serial.println("Publish received.");
|
||||
Serial.print(" topic: ");
|
||||
Serial.println(topic);
|
||||
Serial.print(" qos: ");
|
||||
Serial.println(properties.qos);
|
||||
Serial.print(" dup: ");
|
||||
Serial.println(properties.dup);
|
||||
Serial.print(" retain: ");
|
||||
Serial.println(properties.retain);
|
||||
Serial.print(" len: ");
|
||||
Serial.println(len);
|
||||
Serial.print(" index: ");
|
||||
Serial.println(index);
|
||||
Serial.print(" total: ");
|
||||
Serial.println(total);
|
||||
}
|
||||
|
||||
void onMqttPublish(uint16_t packetId) {
|
||||
Serial.println("Publish acknowledged.");
|
||||
Serial.print(" packetId: ");
|
||||
Serial.println(packetId);
|
||||
}
|
||||
|
||||
void setup() {
|
||||
Serial.begin(115200);
|
||||
Serial.println();
|
||||
Serial.println();
|
||||
|
||||
wifiConnectHandler = WiFi.onStationModeGotIP(onWifiConnect);
|
||||
wifiDisconnectHandler = WiFi.onStationModeDisconnected(onWifiDisconnect);
|
||||
|
||||
mqttClient.onConnect(onMqttConnect);
|
||||
mqttClient.onDisconnect(onMqttDisconnect);
|
||||
mqttClient.onSubscribe(onMqttSubscribe);
|
||||
mqttClient.onUnsubscribe(onMqttUnsubscribe);
|
||||
mqttClient.onMessage(onMqttMessage);
|
||||
mqttClient.onPublish(onMqttPublish);
|
||||
mqttClient.setServer(MQTT_HOST, MQTT_PORT);
|
||||
|
||||
connectToWifi();
|
||||
}
|
||||
|
||||
void loop() {
|
||||
}
|
@@ -0,0 +1,24 @@
|
||||
#
|
||||
# Example PlatformIO configuration file for SSL and non-SSL builds.
|
||||
#
|
||||
# Before you will be able to build the SSL version of this project, you will
|
||||
# need to explicitly install the espressif8266_stage platform.
|
||||
#
|
||||
# To perform this installation, refer to step 1 of:
|
||||
# http://docs.platformio.org/en/latest/platforms/espressif8266.html#using-arduino-framework-with-staging-version
|
||||
|
||||
[platformio]
|
||||
env_default = ssl
|
||||
|
||||
[env:ssl]
|
||||
platform = espressif8266_stage
|
||||
framework = arduino
|
||||
board = esp01_1m
|
||||
build_flags = -DASYNC_TCP_SSL_ENABLED=1
|
||||
lib_deps = AsyncMqttClient
|
||||
|
||||
[env:nossl]
|
||||
platform = espressif8266
|
||||
framework = arduino
|
||||
board = esp01_1m
|
||||
lib_deps = AsyncMqttClient
|
@@ -0,0 +1,145 @@
|
||||
|
||||
// Example project which can be built with SSL enabled or disabled.
|
||||
// The espressif8266_stage platform must be installed.
|
||||
// Refer to platformio.ini for the build configuration and platform installation.
|
||||
|
||||
#include <Arduino.h>
|
||||
#include <ESP8266WiFi.h>
|
||||
#include <Ticker.h>
|
||||
#include <AsyncMqttClient.h>
|
||||
|
||||
#define WIFI_SSID "My_Wi-Fi"
|
||||
#define WIFI_PASSWORD "my-awesome-password"
|
||||
|
||||
#define MQTT_HOST IPAddress(192, 168, 1, 10)
|
||||
|
||||
#if ASYNC_TCP_SSL_ENABLED
|
||||
#define MQTT_SECURE true
|
||||
#define MQTT_SERVER_FINGERPRINT {0x7e, 0x36, 0x22, 0x01, 0xf9, 0x7e, 0x99, 0x2f, 0xc5, 0xdb, 0x3d, 0xbe, 0xac, 0x48, 0x67, 0x5b, 0x5d, 0x47, 0x94, 0xd2}
|
||||
#define MQTT_PORT 8883
|
||||
#else
|
||||
#define MQTT_PORT 1883
|
||||
#endif
|
||||
|
||||
AsyncMqttClient mqttClient;
|
||||
Ticker mqttReconnectTimer;
|
||||
|
||||
WiFiEventHandler wifiConnectHandler;
|
||||
WiFiEventHandler wifiDisconnectHandler;
|
||||
Ticker wifiReconnectTimer;
|
||||
|
||||
void connectToWifi() {
|
||||
Serial.println("Connecting to Wi-Fi...");
|
||||
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
|
||||
}
|
||||
|
||||
void connectToMqtt() {
|
||||
Serial.println("Connecting to MQTT...");
|
||||
mqttClient.connect();
|
||||
}
|
||||
|
||||
void onWifiConnect(const WiFiEventStationModeGotIP& event) {
|
||||
Serial.println("Connected to Wi-Fi.");
|
||||
connectToMqtt();
|
||||
}
|
||||
|
||||
void onWifiDisconnect(const WiFiEventStationModeDisconnected& event) {
|
||||
Serial.println("Disconnected from Wi-Fi.");
|
||||
mqttReconnectTimer.detach(); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi
|
||||
wifiReconnectTimer.once(2, connectToWifi);
|
||||
}
|
||||
|
||||
void onMqttConnect(bool sessionPresent) {
|
||||
Serial.println("Connected to MQTT.");
|
||||
Serial.print("Session present: ");
|
||||
Serial.println(sessionPresent);
|
||||
uint16_t packetIdSub = mqttClient.subscribe("test/lol", 2);
|
||||
Serial.print("Subscribing at QoS 2, packetId: ");
|
||||
Serial.println(packetIdSub);
|
||||
mqttClient.publish("test/lol", 0, true, "test 1");
|
||||
Serial.println("Publishing at QoS 0");
|
||||
uint16_t packetIdPub1 = mqttClient.publish("test/lol", 1, true, "test 2");
|
||||
Serial.print("Publishing at QoS 1, packetId: ");
|
||||
Serial.println(packetIdPub1);
|
||||
uint16_t packetIdPub2 = mqttClient.publish("test/lol", 2, true, "test 3");
|
||||
Serial.print("Publishing at QoS 2, packetId: ");
|
||||
Serial.println(packetIdPub2);
|
||||
}
|
||||
|
||||
void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
|
||||
Serial.println("Disconnected from MQTT.");
|
||||
|
||||
if (reason == AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT) {
|
||||
Serial.println("Bad server fingerprint.");
|
||||
}
|
||||
|
||||
if (WiFi.isConnected()) {
|
||||
mqttReconnectTimer.once(2, connectToMqtt);
|
||||
}
|
||||
}
|
||||
|
||||
void onMqttSubscribe(uint16_t packetId, uint8_t qos) {
|
||||
Serial.println("Subscribe acknowledged.");
|
||||
Serial.print(" packetId: ");
|
||||
Serial.println(packetId);
|
||||
Serial.print(" qos: ");
|
||||
Serial.println(qos);
|
||||
}
|
||||
|
||||
void onMqttUnsubscribe(uint16_t packetId) {
|
||||
Serial.println("Unsubscribe acknowledged.");
|
||||
Serial.print(" packetId: ");
|
||||
Serial.println(packetId);
|
||||
}
|
||||
|
||||
void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
|
||||
Serial.println("Publish received.");
|
||||
Serial.print(" topic: ");
|
||||
Serial.println(topic);
|
||||
Serial.print(" qos: ");
|
||||
Serial.println(properties.qos);
|
||||
Serial.print(" dup: ");
|
||||
Serial.println(properties.dup);
|
||||
Serial.print(" retain: ");
|
||||
Serial.println(properties.retain);
|
||||
Serial.print(" len: ");
|
||||
Serial.println(len);
|
||||
Serial.print(" index: ");
|
||||
Serial.println(index);
|
||||
Serial.print(" total: ");
|
||||
Serial.println(total);
|
||||
}
|
||||
|
||||
void onMqttPublish(uint16_t packetId) {
|
||||
Serial.println("Publish acknowledged.");
|
||||
Serial.print(" packetId: ");
|
||||
Serial.println(packetId);
|
||||
}
|
||||
|
||||
void setup() {
|
||||
Serial.begin(115200);
|
||||
Serial.println();
|
||||
Serial.println();
|
||||
|
||||
wifiConnectHandler = WiFi.onStationModeGotIP(onWifiConnect);
|
||||
wifiDisconnectHandler = WiFi.onStationModeDisconnected(onWifiDisconnect);
|
||||
|
||||
mqttClient.onConnect(onMqttConnect);
|
||||
mqttClient.onDisconnect(onMqttDisconnect);
|
||||
mqttClient.onSubscribe(onMqttSubscribe);
|
||||
mqttClient.onUnsubscribe(onMqttUnsubscribe);
|
||||
mqttClient.onMessage(onMqttMessage);
|
||||
mqttClient.onPublish(onMqttPublish);
|
||||
mqttClient.setServer(MQTT_HOST, MQTT_PORT);
|
||||
#if ASYNC_TCP_SSL_ENABLED
|
||||
mqttClient.setSecure(MQTT_SECURE);
|
||||
if (MQTT_SECURE) {
|
||||
mqttClient.addServerFingerprint((const uint8_t[])MQTT_SERVER_FINGERPRINT);
|
||||
}
|
||||
#endif
|
||||
|
||||
connectToWifi();
|
||||
}
|
||||
|
||||
void loop() {
|
||||
}
|
47
libraries/async-mqtt-client/keywords.txt
Normal file
47
libraries/async-mqtt-client/keywords.txt
Normal file
@@ -0,0 +1,47 @@
|
||||
#######################################
|
||||
# Datatypes (KEYWORD1)
|
||||
#######################################
|
||||
|
||||
AsyncMqttClient KEYWORD1
|
||||
AsyncMqttClientDisconnectReason KEYWORD1
|
||||
AsyncMqttClientMessageProperties KEYWORD1
|
||||
|
||||
#######################################
|
||||
# Methods and Functions (KEYWORD2)
|
||||
#######################################
|
||||
|
||||
setKeepAlive KEYWORD2
|
||||
setClientId KEYWORD2
|
||||
setCleanSession KEYWORD2
|
||||
setMaxTopicLength KEYWORD2
|
||||
setCredentials KEYWORD2
|
||||
setWill KEYWORD2
|
||||
setServer KEYWORD2
|
||||
setSecure KEYWORD2
|
||||
addServerFingerprint KEYWORD2
|
||||
|
||||
onConnect KEYWORD2
|
||||
onDisconnect KEYWORD2
|
||||
onSubscribe KEYWORD2
|
||||
onUnsubscribe KEYWORD2
|
||||
onMessage KEYWORD2
|
||||
onPublish KEYWORD2
|
||||
|
||||
connected KEYWORD2
|
||||
connect KEYWORD2
|
||||
disconnect KEYWORD2
|
||||
subscribe KEYWORD2
|
||||
unsubscribe KEYWORD2
|
||||
publish KEYWORD2
|
||||
|
||||
#######################################
|
||||
# Constants (LITERAL1)
|
||||
#######################################
|
||||
|
||||
TCP_DISCONNECTED LITERAL1
|
||||
|
||||
MQTT_UNACCEPTABLE_PROTOCOL_VERSION LITERAL1
|
||||
MQTT_IDENTIFIER_REJECTED LITERAL1
|
||||
MQTT_SERVER_UNAVAILABLE LITERAL1
|
||||
MQTT_MALFORMED_CREDENTIALS LITERAL1
|
||||
MQTT_NOT_AUTHORIZED LITERAL1
|
30
libraries/async-mqtt-client/library.json
Normal file
30
libraries/async-mqtt-client/library.json
Normal file
@@ -0,0 +1,30 @@
|
||||
{
|
||||
"name": "AsyncMqttClient",
|
||||
"keywords": "iot, home, automation, async, mqtt, client, esp8266",
|
||||
"description": "An Arduino for ESP8266 / ESP32 asynchronous MQTT client implementation",
|
||||
"authors":
|
||||
{
|
||||
"name": "Marvin ROGER",
|
||||
"url": "https://www.marvinroger.fr"
|
||||
},
|
||||
"repository":
|
||||
{
|
||||
"type": "git",
|
||||
"url": "https://github.com/marvinroger/async-mqtt-client.git"
|
||||
},
|
||||
"version": "0.8.2",
|
||||
"frameworks": "arduino",
|
||||
"platforms": ["espressif8266", "espressif32"],
|
||||
"dependencies": [
|
||||
{
|
||||
"name": "ESPAsyncTCP",
|
||||
"version": "^1.1.0",
|
||||
"platforms": "espressif8266"
|
||||
},
|
||||
{
|
||||
"name": "AsyncTCP",
|
||||
"version": "^1.0.0",
|
||||
"platforms": "espressif32"
|
||||
}
|
||||
]
|
||||
}
|
9
libraries/async-mqtt-client/library.properties
Normal file
9
libraries/async-mqtt-client/library.properties
Normal file
@@ -0,0 +1,9 @@
|
||||
name=AsyncMqttClient
|
||||
version=0.8.2
|
||||
author=Marvin ROGER
|
||||
maintainer=Marvin ROGER
|
||||
sentence=An Arduino for ESP8266 and ESP32 asynchronous MQTT client implementation
|
||||
paragraph=Like this project? Please star it on GitHub!
|
||||
category=Communication
|
||||
url=https://github.com/marvinroger/async-mqtt-client
|
||||
architectures=esp8266,esp32
|
@@ -0,0 +1,28 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import argparse
|
||||
import ssl
|
||||
import hashlib
|
||||
|
||||
parser = argparse.ArgumentParser(description='Compute SSL/TLS fingerprints.')
|
||||
parser.add_argument('--host', required=True)
|
||||
parser.add_argument('--port', default=8883)
|
||||
|
||||
args = parser.parse_args()
|
||||
print(args.host)
|
||||
|
||||
cert_pem = ssl.get_server_certificate((args.host, args.port))
|
||||
cert_der = ssl.PEM_cert_to_DER_cert(cert_pem)
|
||||
|
||||
md5 = hashlib.md5(cert_der).hexdigest()
|
||||
sha1 = hashlib.sha1(cert_der).hexdigest()
|
||||
sha256 = hashlib.sha256(cert_der).hexdigest()
|
||||
print("MD5: " + md5)
|
||||
print("SHA1: " + sha1)
|
||||
print("SHA256: " + sha256)
|
||||
|
||||
print("\nSHA1 as array initializer:")
|
||||
print("const uint8_t fingerprint[] = {0x" + ", 0x".join([sha1[i:i+2] for i in range(0, len(sha1), 2)]) + "};")
|
||||
|
||||
print("\nSHA1 as function call:")
|
||||
print("mqttClient.addServerFingerprint((const uint8_t[]){0x" + ", 0x".join([sha1[i:i+2] for i in range(0, len(sha1), 2)]) + "});")
|
877
libraries/async-mqtt-client/src/AsyncMqttClient.cpp
Normal file
877
libraries/async-mqtt-client/src/AsyncMqttClient.cpp
Normal file
@@ -0,0 +1,877 @@
|
||||
#include "AsyncMqttClient.hpp"
|
||||
|
||||
AsyncMqttClient::AsyncMqttClient()
|
||||
: _connected(false)
|
||||
, _connectPacketNotEnoughSpace(false)
|
||||
, _disconnectFlagged(false)
|
||||
, _tlsBadFingerprint(false)
|
||||
, _lastClientActivity(0)
|
||||
, _lastServerActivity(0)
|
||||
, _lastPingRequestTime(0)
|
||||
, _host(nullptr)
|
||||
, _useIp(false)
|
||||
#if ASYNC_TCP_SSL_ENABLED
|
||||
, _secure(false)
|
||||
#endif
|
||||
, _port(0)
|
||||
, _keepAlive(15)
|
||||
, _cleanSession(true)
|
||||
, _clientId(nullptr)
|
||||
, _username(nullptr)
|
||||
, _password(nullptr)
|
||||
, _willTopic(nullptr)
|
||||
, _willPayload(nullptr)
|
||||
, _willPayloadLength(0)
|
||||
, _willQos(0)
|
||||
, _willRetain(false)
|
||||
, _parsingInformation { .bufferState = AsyncMqttClientInternals::BufferState::NONE }
|
||||
, _currentParsedPacket(nullptr)
|
||||
, _remainingLengthBufferPosition(0)
|
||||
, _nextPacketId(1) {
|
||||
_client.onConnect([](void* obj, AsyncClient* c) { (static_cast<AsyncMqttClient*>(obj))->_onConnect(c); }, this);
|
||||
_client.onDisconnect([](void* obj, AsyncClient* c) { (static_cast<AsyncMqttClient*>(obj))->_onDisconnect(c); }, this);
|
||||
_client.onError([](void* obj, AsyncClient* c, int8_t error) { (static_cast<AsyncMqttClient*>(obj))->_onError(c, error); }, this);
|
||||
_client.onTimeout([](void* obj, AsyncClient* c, uint32_t time) { (static_cast<AsyncMqttClient*>(obj))->_onTimeout(c, time); }, this);
|
||||
_client.onAck([](void* obj, AsyncClient* c, size_t len, uint32_t time) { (static_cast<AsyncMqttClient*>(obj))->_onAck(c, len, time); }, this);
|
||||
_client.onData([](void* obj, AsyncClient* c, void* data, size_t len) { (static_cast<AsyncMqttClient*>(obj))->_onData(c, static_cast<char*>(data), len); }, this);
|
||||
_client.onPoll([](void* obj, AsyncClient* c) { (static_cast<AsyncMqttClient*>(obj))->_onPoll(c); }, this);
|
||||
|
||||
#ifdef ESP32
|
||||
sprintf(_generatedClientId, "esp32%06x", ESP.getEfuseMac());
|
||||
_xSemaphore = xSemaphoreCreateMutex();
|
||||
#elif defined(ESP8266)
|
||||
sprintf(_generatedClientId, "esp8266%06x", ESP.getChipId());
|
||||
#endif
|
||||
_clientId = _generatedClientId;
|
||||
|
||||
setMaxTopicLength(128);
|
||||
}
|
||||
|
||||
AsyncMqttClient::~AsyncMqttClient() {
|
||||
delete _currentParsedPacket;
|
||||
delete[] _parsingInformation.topicBuffer;
|
||||
#ifdef ESP32
|
||||
vSemaphoreDelete(_xSemaphore);
|
||||
#endif
|
||||
}
|
||||
|
||||
AsyncMqttClient& AsyncMqttClient::setKeepAlive(uint16_t keepAlive) {
|
||||
_keepAlive = keepAlive;
|
||||
return *this;
|
||||
}
|
||||
|
||||
AsyncMqttClient& AsyncMqttClient::setClientId(const char* clientId) {
|
||||
_clientId = clientId;
|
||||
return *this;
|
||||
}
|
||||
|
||||
AsyncMqttClient& AsyncMqttClient::setCleanSession(bool cleanSession) {
|
||||
_cleanSession = cleanSession;
|
||||
return *this;
|
||||
}
|
||||
|
||||
AsyncMqttClient& AsyncMqttClient::setMaxTopicLength(uint16_t maxTopicLength) {
|
||||
_parsingInformation.maxTopicLength = maxTopicLength;
|
||||
delete[] _parsingInformation.topicBuffer;
|
||||
_parsingInformation.topicBuffer = new char[maxTopicLength + 1];
|
||||
return *this;
|
||||
}
|
||||
|
||||
AsyncMqttClient& AsyncMqttClient::setCredentials(const char* username, const char* password) {
|
||||
_username = username;
|
||||
_password = password;
|
||||
return *this;
|
||||
}
|
||||
|
||||
AsyncMqttClient& AsyncMqttClient::setWill(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length) {
|
||||
_willTopic = topic;
|
||||
_willQos = qos;
|
||||
_willRetain = retain;
|
||||
_willPayload = payload;
|
||||
_willPayloadLength = length;
|
||||
return *this;
|
||||
}
|
||||
|
||||
AsyncMqttClient& AsyncMqttClient::setServer(IPAddress ip, uint16_t port) {
|
||||
_useIp = true;
|
||||
_ip = ip;
|
||||
_port = port;
|
||||
return *this;
|
||||
}
|
||||
|
||||
AsyncMqttClient& AsyncMqttClient::setServer(const char* host, uint16_t port) {
|
||||
_useIp = false;
|
||||
_host = host;
|
||||
_port = port;
|
||||
return *this;
|
||||
}
|
||||
|
||||
#if ASYNC_TCP_SSL_ENABLED
|
||||
AsyncMqttClient& AsyncMqttClient::setSecure(bool secure) {
|
||||
_secure = secure;
|
||||
return *this;
|
||||
}
|
||||
|
||||
AsyncMqttClient& AsyncMqttClient::addServerFingerprint(const uint8_t* fingerprint) {
|
||||
std::array<uint8_t, SHA1_SIZE> newFingerprint;
|
||||
memcpy(newFingerprint.data(), fingerprint, SHA1_SIZE);
|
||||
_secureServerFingerprints.push_back(newFingerprint);
|
||||
return *this;
|
||||
}
|
||||
#endif
|
||||
|
||||
AsyncMqttClient& AsyncMqttClient::onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback) {
|
||||
_onConnectUserCallbacks.push_back(callback);
|
||||
return *this;
|
||||
}
|
||||
|
||||
AsyncMqttClient& AsyncMqttClient::onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback) {
|
||||
_onDisconnectUserCallbacks.push_back(callback);
|
||||
return *this;
|
||||
}
|
||||
|
||||
AsyncMqttClient& AsyncMqttClient::onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback) {
|
||||
_onSubscribeUserCallbacks.push_back(callback);
|
||||
return *this;
|
||||
}
|
||||
|
||||
AsyncMqttClient& AsyncMqttClient::onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback) {
|
||||
_onUnsubscribeUserCallbacks.push_back(callback);
|
||||
return *this;
|
||||
}
|
||||
|
||||
AsyncMqttClient& AsyncMqttClient::onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback) {
|
||||
_onMessageUserCallbacks.push_back(callback);
|
||||
return *this;
|
||||
}
|
||||
|
||||
AsyncMqttClient& AsyncMqttClient::onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback) {
|
||||
_onPublishUserCallbacks.push_back(callback);
|
||||
return *this;
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_freeCurrentParsedPacket() {
|
||||
delete _currentParsedPacket;
|
||||
_currentParsedPacket = nullptr;
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_clear() {
|
||||
_lastPingRequestTime = 0;
|
||||
_connected = false;
|
||||
_disconnectFlagged = false;
|
||||
_connectPacketNotEnoughSpace = false;
|
||||
_tlsBadFingerprint = false;
|
||||
_freeCurrentParsedPacket();
|
||||
|
||||
_pendingPubRels.clear();
|
||||
_pendingPubRels.shrink_to_fit();
|
||||
|
||||
_toSendAcks.clear();
|
||||
_toSendAcks.shrink_to_fit();
|
||||
|
||||
_nextPacketId = 1;
|
||||
_parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE;
|
||||
}
|
||||
|
||||
/* TCP */
|
||||
void AsyncMqttClient::_onConnect(AsyncClient* client) {
|
||||
(void)client;
|
||||
|
||||
#if ASYNC_TCP_SSL_ENABLED
|
||||
if (_secure && _secureServerFingerprints.size() > 0) {
|
||||
SSL* clientSsl = _client.getSSL();
|
||||
|
||||
bool sslFoundFingerprint = false;
|
||||
for (std::array<uint8_t, SHA1_SIZE> fingerprint : _secureServerFingerprints) {
|
||||
if (ssl_match_fingerprint(clientSsl, fingerprint.data()) == SSL_OK) {
|
||||
sslFoundFingerprint = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!sslFoundFingerprint) {
|
||||
_tlsBadFingerprint = true;
|
||||
_client.close(true);
|
||||
return;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
char fixedHeader[5];
|
||||
fixedHeader[0] = AsyncMqttClientInternals::PacketType.CONNECT;
|
||||
fixedHeader[0] = fixedHeader[0] << 4;
|
||||
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.CONNECT_RESERVED;
|
||||
|
||||
uint16_t protocolNameLength = 4;
|
||||
char protocolNameLengthBytes[2];
|
||||
protocolNameLengthBytes[0] = protocolNameLength >> 8;
|
||||
protocolNameLengthBytes[1] = protocolNameLength & 0xFF;
|
||||
|
||||
char protocolLevel[1];
|
||||
protocolLevel[0] = 0x04;
|
||||
|
||||
char connectFlags[1];
|
||||
connectFlags[0] = 0;
|
||||
if (_cleanSession) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.CLEAN_SESSION;
|
||||
if (_username != nullptr) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.USERNAME;
|
||||
if (_password != nullptr) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.PASSWORD;
|
||||
if (_willTopic != nullptr) {
|
||||
connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL;
|
||||
if (_willRetain) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_RETAIN;
|
||||
switch (_willQos) {
|
||||
case 0:
|
||||
connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS0;
|
||||
break;
|
||||
case 1:
|
||||
connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS1;
|
||||
break;
|
||||
case 2:
|
||||
connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS2;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
char keepAliveBytes[2];
|
||||
keepAliveBytes[0] = _keepAlive >> 8;
|
||||
keepAliveBytes[1] = _keepAlive & 0xFF;
|
||||
|
||||
uint16_t clientIdLength = strlen(_clientId);
|
||||
char clientIdLengthBytes[2];
|
||||
clientIdLengthBytes[0] = clientIdLength >> 8;
|
||||
clientIdLengthBytes[1] = clientIdLength & 0xFF;
|
||||
|
||||
// Optional fields
|
||||
uint16_t willTopicLength = 0;
|
||||
char willTopicLengthBytes[2];
|
||||
uint16_t willPayloadLength = _willPayloadLength;
|
||||
char willPayloadLengthBytes[2];
|
||||
if (_willTopic != nullptr) {
|
||||
willTopicLength = strlen(_willTopic);
|
||||
willTopicLengthBytes[0] = willTopicLength >> 8;
|
||||
willTopicLengthBytes[1] = willTopicLength & 0xFF;
|
||||
|
||||
if (_willPayload != nullptr && willPayloadLength == 0) willPayloadLength = strlen(_willPayload);
|
||||
|
||||
willPayloadLengthBytes[0] = willPayloadLength >> 8;
|
||||
willPayloadLengthBytes[1] = willPayloadLength & 0xFF;
|
||||
}
|
||||
|
||||
uint16_t usernameLength = 0;
|
||||
char usernameLengthBytes[2];
|
||||
if (_username != nullptr) {
|
||||
usernameLength = strlen(_username);
|
||||
usernameLengthBytes[0] = usernameLength >> 8;
|
||||
usernameLengthBytes[1] = usernameLength & 0xFF;
|
||||
}
|
||||
|
||||
uint16_t passwordLength = 0;
|
||||
char passwordLengthBytes[2];
|
||||
if (_password != nullptr) {
|
||||
passwordLength = strlen(_password);
|
||||
passwordLengthBytes[0] = passwordLength >> 8;
|
||||
passwordLengthBytes[1] = passwordLength & 0xFF;
|
||||
}
|
||||
|
||||
uint32_t remainingLength = 2 + protocolNameLength + 1 + 1 + 2 + 2 + clientIdLength; // always present
|
||||
if (_willTopic != nullptr) remainingLength += 2 + willTopicLength + 2 + willPayloadLength;
|
||||
if (_username != nullptr) remainingLength += 2 + usernameLength;
|
||||
if (_password != nullptr) remainingLength += 2 + passwordLength;
|
||||
uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(remainingLength, fixedHeader + 1);
|
||||
|
||||
uint32_t neededSpace = 1 + remainingLengthLength;
|
||||
neededSpace += 2;
|
||||
neededSpace += protocolNameLength;
|
||||
neededSpace += 1;
|
||||
neededSpace += 1;
|
||||
neededSpace += 2;
|
||||
neededSpace += 2;
|
||||
neededSpace += clientIdLength;
|
||||
if (_willTopic != nullptr) {
|
||||
neededSpace += 2;
|
||||
neededSpace += willTopicLength;
|
||||
|
||||
neededSpace += 2;
|
||||
if (_willPayload != nullptr) neededSpace += willPayloadLength;
|
||||
}
|
||||
if (_username != nullptr) {
|
||||
neededSpace += 2;
|
||||
neededSpace += usernameLength;
|
||||
}
|
||||
if (_password != nullptr) {
|
||||
neededSpace += 2;
|
||||
neededSpace += passwordLength;
|
||||
}
|
||||
|
||||
SEMAPHORE_TAKE();
|
||||
if (_client.space() < neededSpace) {
|
||||
_connectPacketNotEnoughSpace = true;
|
||||
_client.close(true);
|
||||
SEMAPHORE_GIVE();
|
||||
return;
|
||||
}
|
||||
|
||||
_client.add(fixedHeader, 1 + remainingLengthLength);
|
||||
_client.add(protocolNameLengthBytes, 2);
|
||||
_client.add("MQTT", protocolNameLength);
|
||||
_client.add(protocolLevel, 1);
|
||||
_client.add(connectFlags, 1);
|
||||
_client.add(keepAliveBytes, 2);
|
||||
_client.add(clientIdLengthBytes, 2);
|
||||
_client.add(_clientId, clientIdLength);
|
||||
if (_willTopic != nullptr) {
|
||||
_client.add(willTopicLengthBytes, 2);
|
||||
_client.add(_willTopic, willTopicLength);
|
||||
|
||||
_client.add(willPayloadLengthBytes, 2);
|
||||
if (_willPayload != nullptr) _client.add(_willPayload, willPayloadLength);
|
||||
}
|
||||
if (_username != nullptr) {
|
||||
_client.add(usernameLengthBytes, 2);
|
||||
_client.add(_username, usernameLength);
|
||||
}
|
||||
if (_password != nullptr) {
|
||||
_client.add(passwordLengthBytes, 2);
|
||||
_client.add(_password, passwordLength);
|
||||
}
|
||||
_client.send();
|
||||
_lastClientActivity = millis();
|
||||
SEMAPHORE_GIVE();
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_onDisconnect(AsyncClient* client) {
|
||||
(void)client;
|
||||
if (!_disconnectFlagged) {
|
||||
AsyncMqttClientDisconnectReason reason;
|
||||
|
||||
if (_connectPacketNotEnoughSpace) {
|
||||
reason = AsyncMqttClientDisconnectReason::ESP8266_NOT_ENOUGH_SPACE;
|
||||
} else if (_tlsBadFingerprint) {
|
||||
reason = AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT;
|
||||
} else {
|
||||
reason = AsyncMqttClientDisconnectReason::TCP_DISCONNECTED;
|
||||
}
|
||||
for (auto callback : _onDisconnectUserCallbacks) callback(reason);
|
||||
}
|
||||
_clear();
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_onError(AsyncClient* client, int8_t error) {
|
||||
(void)client;
|
||||
(void)error;
|
||||
// _onDisconnect called anyway
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_onTimeout(AsyncClient* client, uint32_t time) {
|
||||
(void)client;
|
||||
(void)time;
|
||||
// disconnection will be handled by ping/pong management
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_onAck(AsyncClient* client, size_t len, uint32_t time) {
|
||||
(void)client;
|
||||
(void)len;
|
||||
(void)time;
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_onData(AsyncClient* client, char* data, size_t len) {
|
||||
(void)client;
|
||||
size_t currentBytePosition = 0;
|
||||
char currentByte;
|
||||
do {
|
||||
switch (_parsingInformation.bufferState) {
|
||||
case AsyncMqttClientInternals::BufferState::NONE:
|
||||
currentByte = data[currentBytePosition++];
|
||||
_parsingInformation.packetType = currentByte >> 4;
|
||||
_parsingInformation.packetFlags = (currentByte << 4) >> 4;
|
||||
_parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::REMAINING_LENGTH;
|
||||
_lastServerActivity = millis();
|
||||
switch (_parsingInformation.packetType) {
|
||||
case AsyncMqttClientInternals::PacketType.CONNACK:
|
||||
_currentParsedPacket = new AsyncMqttClientInternals::ConnAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onConnAck, this, std::placeholders::_1, std::placeholders::_2));
|
||||
break;
|
||||
case AsyncMqttClientInternals::PacketType.PINGRESP:
|
||||
_currentParsedPacket = new AsyncMqttClientInternals::PingRespPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPingResp, this));
|
||||
break;
|
||||
case AsyncMqttClientInternals::PacketType.SUBACK:
|
||||
_currentParsedPacket = new AsyncMqttClientInternals::SubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onSubAck, this, std::placeholders::_1, std::placeholders::_2));
|
||||
break;
|
||||
case AsyncMqttClientInternals::PacketType.UNSUBACK:
|
||||
_currentParsedPacket = new AsyncMqttClientInternals::UnsubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onUnsubAck, this, std::placeholders::_1));
|
||||
break;
|
||||
case AsyncMqttClientInternals::PacketType.PUBLISH:
|
||||
_currentParsedPacket = new AsyncMqttClientInternals::PublishPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6, std::placeholders::_7, std::placeholders::_8, std::placeholders::_9), std::bind(&AsyncMqttClient::_onPublish, this, std::placeholders::_1, std::placeholders::_2));
|
||||
break;
|
||||
case AsyncMqttClientInternals::PacketType.PUBREL:
|
||||
_currentParsedPacket = new AsyncMqttClientInternals::PubRelPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRel, this, std::placeholders::_1));
|
||||
break;
|
||||
case AsyncMqttClientInternals::PacketType.PUBACK:
|
||||
_currentParsedPacket = new AsyncMqttClientInternals::PubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubAck, this, std::placeholders::_1));
|
||||
break;
|
||||
case AsyncMqttClientInternals::PacketType.PUBREC:
|
||||
_currentParsedPacket = new AsyncMqttClientInternals::PubRecPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRec, this, std::placeholders::_1));
|
||||
break;
|
||||
case AsyncMqttClientInternals::PacketType.PUBCOMP:
|
||||
_currentParsedPacket = new AsyncMqttClientInternals::PubCompPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubComp, this, std::placeholders::_1));
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case AsyncMqttClientInternals::BufferState::REMAINING_LENGTH:
|
||||
currentByte = data[currentBytePosition++];
|
||||
_remainingLengthBuffer[_remainingLengthBufferPosition++] = currentByte;
|
||||
if (currentByte >> 7 == 0) {
|
||||
_parsingInformation.remainingLength = AsyncMqttClientInternals::Helpers::decodeRemainingLength(_remainingLengthBuffer);
|
||||
_remainingLengthBufferPosition = 0;
|
||||
if (_parsingInformation.remainingLength > 0) {
|
||||
_parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::VARIABLE_HEADER;
|
||||
} else {
|
||||
// PINGRESP is a special case where it has no variable header, so the packet ends right here
|
||||
_parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE;
|
||||
_onPingResp();
|
||||
}
|
||||
}
|
||||
break;
|
||||
case AsyncMqttClientInternals::BufferState::VARIABLE_HEADER:
|
||||
_currentParsedPacket->parseVariableHeader(data, len, ¤tBytePosition);
|
||||
break;
|
||||
case AsyncMqttClientInternals::BufferState::PAYLOAD:
|
||||
_currentParsedPacket->parsePayload(data, len, ¤tBytePosition);
|
||||
break;
|
||||
default:
|
||||
currentBytePosition = len;
|
||||
}
|
||||
} while (currentBytePosition != len);
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_onPoll(AsyncClient* client) {
|
||||
if (!_connected) return;
|
||||
|
||||
// if there is too much time the client has sent a ping request without a response, disconnect client to avoid half open connections
|
||||
if (_lastPingRequestTime != 0 && (millis() - _lastPingRequestTime) >= (_keepAlive * 1000 * 2)) {
|
||||
disconnect();
|
||||
return;
|
||||
// send ping to ensure the server will receive at least one message inside keepalive window
|
||||
} else if (_lastPingRequestTime == 0 && (millis() - _lastClientActivity) >= (_keepAlive * 1000 * 0.7)) {
|
||||
_sendPing();
|
||||
|
||||
// send ping to verify if the server is still there (ensure this is not a half connection)
|
||||
} else if (_connected && _lastPingRequestTime == 0 && (millis() - _lastServerActivity) >= (_keepAlive * 1000 * 0.7)) {
|
||||
_sendPing();
|
||||
}
|
||||
|
||||
// handle to send ack packets
|
||||
|
||||
_sendAcks();
|
||||
|
||||
// handle disconnect
|
||||
|
||||
if (_disconnectFlagged) {
|
||||
_sendDisconnect();
|
||||
}
|
||||
}
|
||||
|
||||
/* MQTT */
|
||||
void AsyncMqttClient::_onPingResp() {
|
||||
_freeCurrentParsedPacket();
|
||||
_lastPingRequestTime = 0;
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_onConnAck(bool sessionPresent, uint8_t connectReturnCode) {
|
||||
(void)sessionPresent;
|
||||
_freeCurrentParsedPacket();
|
||||
|
||||
if (connectReturnCode == 0) {
|
||||
_connected = true;
|
||||
for (auto callback : _onConnectUserCallbacks) callback(sessionPresent);
|
||||
} else {
|
||||
for (auto callback : _onDisconnectUserCallbacks) callback(static_cast<AsyncMqttClientDisconnectReason>(connectReturnCode));
|
||||
_disconnectFlagged = true;
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_onSubAck(uint16_t packetId, char status) {
|
||||
_freeCurrentParsedPacket();
|
||||
|
||||
for (auto callback : _onSubscribeUserCallbacks) callback(packetId, status);
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_onUnsubAck(uint16_t packetId) {
|
||||
_freeCurrentParsedPacket();
|
||||
|
||||
for (auto callback : _onUnsubscribeUserCallbacks) callback(packetId);
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_onMessage(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId) {
|
||||
bool notifyPublish = true;
|
||||
|
||||
if (qos == 2) {
|
||||
for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) {
|
||||
if (pendingPubRel.packetId == packetId) {
|
||||
notifyPublish = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (notifyPublish) {
|
||||
AsyncMqttClientMessageProperties properties;
|
||||
properties.qos = qos;
|
||||
properties.dup = dup;
|
||||
properties.retain = retain;
|
||||
|
||||
for (auto callback : _onMessageUserCallbacks) callback(topic, payload, properties, len, index, total);
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_onPublish(uint16_t packetId, uint8_t qos) {
|
||||
AsyncMqttClientInternals::PendingAck pendingAck;
|
||||
|
||||
if (qos == 1) {
|
||||
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBACK;
|
||||
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBACK_RESERVED;
|
||||
pendingAck.packetId = packetId;
|
||||
_toSendAcks.push_back(pendingAck);
|
||||
} else if (qos == 2) {
|
||||
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREC;
|
||||
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREC_RESERVED;
|
||||
pendingAck.packetId = packetId;
|
||||
_toSendAcks.push_back(pendingAck);
|
||||
|
||||
bool pubRelAwaiting = false;
|
||||
for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) {
|
||||
if (pendingPubRel.packetId == packetId) {
|
||||
pubRelAwaiting = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!pubRelAwaiting) {
|
||||
AsyncMqttClientInternals::PendingPubRel pendingPubRel;
|
||||
pendingPubRel.packetId = packetId;
|
||||
_pendingPubRels.push_back(pendingPubRel);
|
||||
}
|
||||
|
||||
_sendAcks();
|
||||
}
|
||||
|
||||
_freeCurrentParsedPacket();
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_onPubRel(uint16_t packetId) {
|
||||
_freeCurrentParsedPacket();
|
||||
|
||||
AsyncMqttClientInternals::PendingAck pendingAck;
|
||||
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBCOMP;
|
||||
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBCOMP_RESERVED;
|
||||
pendingAck.packetId = packetId;
|
||||
_toSendAcks.push_back(pendingAck);
|
||||
|
||||
for (size_t i = 0; i < _pendingPubRels.size(); i++) {
|
||||
if (_pendingPubRels[i].packetId == packetId) {
|
||||
_pendingPubRels.erase(_pendingPubRels.begin() + i);
|
||||
_pendingPubRels.shrink_to_fit();
|
||||
}
|
||||
}
|
||||
|
||||
_sendAcks();
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_onPubAck(uint16_t packetId) {
|
||||
_freeCurrentParsedPacket();
|
||||
|
||||
for (auto callback : _onPublishUserCallbacks) callback(packetId);
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_onPubRec(uint16_t packetId) {
|
||||
_freeCurrentParsedPacket();
|
||||
|
||||
AsyncMqttClientInternals::PendingAck pendingAck;
|
||||
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREL;
|
||||
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREL_RESERVED;
|
||||
pendingAck.packetId = packetId;
|
||||
_toSendAcks.push_back(pendingAck);
|
||||
|
||||
_sendAcks();
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_onPubComp(uint16_t packetId) {
|
||||
_freeCurrentParsedPacket();
|
||||
|
||||
for (auto callback : _onPublishUserCallbacks) callback(packetId);
|
||||
}
|
||||
|
||||
bool AsyncMqttClient::_sendPing() {
|
||||
char fixedHeader[2];
|
||||
fixedHeader[0] = AsyncMqttClientInternals::PacketType.PINGREQ;
|
||||
fixedHeader[0] = fixedHeader[0] << 4;
|
||||
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.PINGREQ_RESERVED;
|
||||
fixedHeader[1] = 0;
|
||||
|
||||
size_t neededSpace = 2;
|
||||
|
||||
SEMAPHORE_TAKE(false);
|
||||
if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return false; }
|
||||
|
||||
_client.add(fixedHeader, 2);
|
||||
_client.send();
|
||||
_lastClientActivity = millis();
|
||||
_lastPingRequestTime = millis();
|
||||
|
||||
SEMAPHORE_GIVE();
|
||||
return true;
|
||||
}
|
||||
|
||||
void AsyncMqttClient::_sendAcks() {
|
||||
uint8_t neededAckSpace = 2 + 2;
|
||||
|
||||
SEMAPHORE_TAKE();
|
||||
for (size_t i = 0; i < _toSendAcks.size(); i++) {
|
||||
if (_client.space() < neededAckSpace) break;
|
||||
|
||||
AsyncMqttClientInternals::PendingAck pendingAck = _toSendAcks[i];
|
||||
|
||||
char fixedHeader[2];
|
||||
fixedHeader[0] = pendingAck.packetType;
|
||||
fixedHeader[0] = fixedHeader[0] << 4;
|
||||
fixedHeader[0] = fixedHeader[0] | pendingAck.headerFlag;
|
||||
fixedHeader[1] = 2;
|
||||
|
||||
char packetIdBytes[2];
|
||||
packetIdBytes[0] = pendingAck.packetId >> 8;
|
||||
packetIdBytes[1] = pendingAck.packetId & 0xFF;
|
||||
|
||||
_client.add(fixedHeader, 2);
|
||||
_client.add(packetIdBytes, 2);
|
||||
_client.send();
|
||||
|
||||
_toSendAcks.erase(_toSendAcks.begin() + i);
|
||||
_toSendAcks.shrink_to_fit();
|
||||
|
||||
_lastClientActivity = millis();
|
||||
}
|
||||
SEMAPHORE_GIVE();
|
||||
}
|
||||
|
||||
bool AsyncMqttClient::_sendDisconnect() {
|
||||
if (!_connected) return true;
|
||||
|
||||
const uint8_t neededSpace = 2;
|
||||
|
||||
SEMAPHORE_TAKE(false);
|
||||
|
||||
if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return false; }
|
||||
|
||||
char fixedHeader[2];
|
||||
fixedHeader[0] = AsyncMqttClientInternals::PacketType.DISCONNECT;
|
||||
fixedHeader[0] = fixedHeader[0] << 4;
|
||||
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.DISCONNECT_RESERVED;
|
||||
fixedHeader[1] = 0;
|
||||
|
||||
_client.add(fixedHeader, 2);
|
||||
_client.send();
|
||||
_client.close(true);
|
||||
|
||||
_disconnectFlagged = false;
|
||||
|
||||
SEMAPHORE_GIVE();
|
||||
return true;
|
||||
}
|
||||
|
||||
uint16_t AsyncMqttClient::_getNextPacketId() {
|
||||
uint16_t nextPacketId = _nextPacketId;
|
||||
|
||||
if (_nextPacketId == 65535) _nextPacketId = 0; // 0 is forbidden
|
||||
_nextPacketId++;
|
||||
|
||||
return nextPacketId;
|
||||
}
|
||||
|
||||
bool AsyncMqttClient::connected() const {
|
||||
return _connected;
|
||||
}
|
||||
|
||||
void AsyncMqttClient::connect() {
|
||||
if (_connected) return;
|
||||
|
||||
#if ASYNC_TCP_SSL_ENABLED
|
||||
if (_useIp) {
|
||||
_client.connect(_ip, _port, _secure);
|
||||
} else {
|
||||
_client.connect(_host, _port, _secure);
|
||||
}
|
||||
#else
|
||||
if (_useIp) {
|
||||
_client.connect(_ip, _port);
|
||||
} else {
|
||||
_client.connect(_host, _port);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void AsyncMqttClient::disconnect(bool force) {
|
||||
if (!_connected) return;
|
||||
|
||||
if (force) {
|
||||
_client.close(true);
|
||||
} else {
|
||||
_disconnectFlagged = true;
|
||||
_sendDisconnect();
|
||||
}
|
||||
}
|
||||
|
||||
uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos) {
|
||||
if (!_connected) return 0;
|
||||
|
||||
char fixedHeader[5];
|
||||
fixedHeader[0] = AsyncMqttClientInternals::PacketType.SUBSCRIBE;
|
||||
fixedHeader[0] = fixedHeader[0] << 4;
|
||||
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.SUBSCRIBE_RESERVED;
|
||||
|
||||
uint16_t topicLength = strlen(topic);
|
||||
char topicLengthBytes[2];
|
||||
topicLengthBytes[0] = topicLength >> 8;
|
||||
topicLengthBytes[1] = topicLength & 0xFF;
|
||||
|
||||
char qosByte[1];
|
||||
qosByte[0] = qos;
|
||||
|
||||
uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(2 + 2 + topicLength + 1, fixedHeader + 1);
|
||||
|
||||
size_t neededSpace = 0;
|
||||
neededSpace += 1 + remainingLengthLength;
|
||||
neededSpace += 2;
|
||||
neededSpace += 2;
|
||||
neededSpace += topicLength;
|
||||
neededSpace += 1;
|
||||
|
||||
SEMAPHORE_TAKE(0);
|
||||
if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return 0; }
|
||||
|
||||
uint16_t packetId = _getNextPacketId();
|
||||
char packetIdBytes[2];
|
||||
packetIdBytes[0] = packetId >> 8;
|
||||
packetIdBytes[1] = packetId & 0xFF;
|
||||
|
||||
_client.add(fixedHeader, 1 + remainingLengthLength);
|
||||
_client.add(packetIdBytes, 2);
|
||||
_client.add(topicLengthBytes, 2);
|
||||
_client.add(topic, topicLength);
|
||||
_client.add(qosByte, 1);
|
||||
_client.send();
|
||||
_lastClientActivity = millis();
|
||||
|
||||
SEMAPHORE_GIVE();
|
||||
return packetId;
|
||||
}
|
||||
|
||||
uint16_t AsyncMqttClient::unsubscribe(const char* topic) {
|
||||
if (!_connected) return 0;
|
||||
|
||||
char fixedHeader[5];
|
||||
fixedHeader[0] = AsyncMqttClientInternals::PacketType.UNSUBSCRIBE;
|
||||
fixedHeader[0] = fixedHeader[0] << 4;
|
||||
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.UNSUBSCRIBE_RESERVED;
|
||||
|
||||
uint16_t topicLength = strlen(topic);
|
||||
char topicLengthBytes[2];
|
||||
topicLengthBytes[0] = topicLength >> 8;
|
||||
topicLengthBytes[1] = topicLength & 0xFF;
|
||||
|
||||
uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(2 + 2 + topicLength, fixedHeader + 1);
|
||||
|
||||
size_t neededSpace = 0;
|
||||
neededSpace += 1 + remainingLengthLength;
|
||||
neededSpace += 2;
|
||||
neededSpace += 2;
|
||||
neededSpace += topicLength;
|
||||
|
||||
SEMAPHORE_TAKE(0);
|
||||
if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return 0; }
|
||||
|
||||
uint16_t packetId = _getNextPacketId();
|
||||
char packetIdBytes[2];
|
||||
packetIdBytes[0] = packetId >> 8;
|
||||
packetIdBytes[1] = packetId & 0xFF;
|
||||
|
||||
_client.add(fixedHeader, 1 + remainingLengthLength);
|
||||
_client.add(packetIdBytes, 2);
|
||||
_client.add(topicLengthBytes, 2);
|
||||
_client.add(topic, topicLength);
|
||||
_client.send();
|
||||
_lastClientActivity = millis();
|
||||
|
||||
SEMAPHORE_GIVE();
|
||||
return packetId;
|
||||
}
|
||||
|
||||
uint16_t AsyncMqttClient::publish(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length, bool dup, uint16_t message_id) {
|
||||
if (!_connected) return 0;
|
||||
|
||||
char fixedHeader[5];
|
||||
fixedHeader[0] = AsyncMqttClientInternals::PacketType.PUBLISH;
|
||||
fixedHeader[0] = fixedHeader[0] << 4;
|
||||
if (dup) fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_DUP;
|
||||
if (retain) fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_RETAIN;
|
||||
switch (qos) {
|
||||
case 0:
|
||||
fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS0;
|
||||
break;
|
||||
case 1:
|
||||
fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS1;
|
||||
break;
|
||||
case 2:
|
||||
fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS2;
|
||||
break;
|
||||
}
|
||||
|
||||
uint16_t topicLength = strlen(topic);
|
||||
char topicLengthBytes[2];
|
||||
topicLengthBytes[0] = topicLength >> 8;
|
||||
topicLengthBytes[1] = topicLength & 0xFF;
|
||||
|
||||
uint32_t payloadLength = length;
|
||||
if (payload != nullptr && payloadLength == 0) payloadLength = strlen(payload);
|
||||
|
||||
uint32_t remainingLength = 2 + topicLength + payloadLength;
|
||||
if (qos != 0) remainingLength += 2;
|
||||
uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(remainingLength, fixedHeader + 1);
|
||||
|
||||
size_t neededSpace = 0;
|
||||
neededSpace += 1 + remainingLengthLength;
|
||||
neededSpace += 2;
|
||||
neededSpace += topicLength;
|
||||
if (qos != 0) neededSpace += 2;
|
||||
if (payload != nullptr) neededSpace += payloadLength;
|
||||
|
||||
SEMAPHORE_TAKE(0);
|
||||
if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return 0; }
|
||||
|
||||
uint16_t packetId = 0;
|
||||
char packetIdBytes[2];
|
||||
if (qos != 0) {
|
||||
if (dup && message_id > 0) {
|
||||
packetId = message_id;
|
||||
} else {
|
||||
packetId = _getNextPacketId();
|
||||
}
|
||||
|
||||
packetIdBytes[0] = packetId >> 8;
|
||||
packetIdBytes[1] = packetId & 0xFF;
|
||||
}
|
||||
|
||||
_client.add(fixedHeader, 1 + remainingLengthLength);
|
||||
_client.add(topicLengthBytes, 2);
|
||||
_client.add(topic, topicLength);
|
||||
if (qos != 0) _client.add(packetIdBytes, 2);
|
||||
if (payload != nullptr) _client.add(payload, payloadLength);
|
||||
_client.send();
|
||||
_lastClientActivity = millis();
|
||||
|
||||
SEMAPHORE_GIVE();
|
||||
if (qos != 0) {
|
||||
return packetId;
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
}
|
6
libraries/async-mqtt-client/src/AsyncMqttClient.h
Normal file
6
libraries/async-mqtt-client/src/AsyncMqttClient.h
Normal file
@@ -0,0 +1,6 @@
|
||||
#ifndef SRC_ASYNCMQTTCLIENT_H_
|
||||
#define SRC_ASYNCMQTTCLIENT_H_
|
||||
|
||||
#include "AsyncMqttClient.hpp"
|
||||
|
||||
#endif // SRC_ASYNCMQTTCLIENT_H_
|
166
libraries/async-mqtt-client/src/AsyncMqttClient.hpp
Normal file
166
libraries/async-mqtt-client/src/AsyncMqttClient.hpp
Normal file
@@ -0,0 +1,166 @@
|
||||
#pragma once
|
||||
|
||||
#include <functional>
|
||||
#include <vector>
|
||||
|
||||
#include "Arduino.h"
|
||||
|
||||
#ifdef ESP32
|
||||
#include <AsyncTCP.h>
|
||||
#include <freertos/semphr.h>
|
||||
#elif defined(ESP8266)
|
||||
#include <ESPAsyncTCP.h>
|
||||
#else
|
||||
#error Platform not supported
|
||||
#endif
|
||||
|
||||
#if ASYNC_TCP_SSL_ENABLED
|
||||
#include <tcp_axtls.h>
|
||||
#define SHA1_SIZE 20
|
||||
#endif
|
||||
|
||||
#include "AsyncMqttClient/Flags.hpp"
|
||||
#include "AsyncMqttClient/ParsingInformation.hpp"
|
||||
#include "AsyncMqttClient/MessageProperties.hpp"
|
||||
#include "AsyncMqttClient/Helpers.hpp"
|
||||
#include "AsyncMqttClient/Callbacks.hpp"
|
||||
#include "AsyncMqttClient/DisconnectReasons.hpp"
|
||||
#include "AsyncMqttClient/Storage.hpp"
|
||||
|
||||
#include "AsyncMqttClient/Packets/Packet.hpp"
|
||||
#include "AsyncMqttClient/Packets/ConnAckPacket.hpp"
|
||||
#include "AsyncMqttClient/Packets/PingRespPacket.hpp"
|
||||
#include "AsyncMqttClient/Packets/SubAckPacket.hpp"
|
||||
#include "AsyncMqttClient/Packets/UnsubAckPacket.hpp"
|
||||
#include "AsyncMqttClient/Packets/PublishPacket.hpp"
|
||||
#include "AsyncMqttClient/Packets/PubRelPacket.hpp"
|
||||
#include "AsyncMqttClient/Packets/PubAckPacket.hpp"
|
||||
#include "AsyncMqttClient/Packets/PubRecPacket.hpp"
|
||||
#include "AsyncMqttClient/Packets/PubCompPacket.hpp"
|
||||
|
||||
#if ESP32
|
||||
#define SEMAPHORE_TAKE(X) if (xSemaphoreTake(_xSemaphore, 1000 / portTICK_PERIOD_MS) != pdTRUE) { return X; } // Waits max 1000ms
|
||||
#define SEMAPHORE_GIVE() xSemaphoreGive(_xSemaphore);
|
||||
#elif defined(ESP8266)
|
||||
#define SEMAPHORE_TAKE(X) void()
|
||||
#define SEMAPHORE_GIVE() void()
|
||||
#endif
|
||||
|
||||
class AsyncMqttClient {
|
||||
public:
|
||||
AsyncMqttClient();
|
||||
~AsyncMqttClient();
|
||||
|
||||
AsyncMqttClient& setKeepAlive(uint16_t keepAlive);
|
||||
AsyncMqttClient& setClientId(const char* clientId);
|
||||
AsyncMqttClient& setCleanSession(bool cleanSession);
|
||||
AsyncMqttClient& setMaxTopicLength(uint16_t maxTopicLength);
|
||||
AsyncMqttClient& setCredentials(const char* username, const char* password = nullptr);
|
||||
AsyncMqttClient& setWill(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0);
|
||||
AsyncMqttClient& setServer(IPAddress ip, uint16_t port);
|
||||
AsyncMqttClient& setServer(const char* host, uint16_t port);
|
||||
#if ASYNC_TCP_SSL_ENABLED
|
||||
AsyncMqttClient& setSecure(bool secure);
|
||||
AsyncMqttClient& addServerFingerprint(const uint8_t* fingerprint);
|
||||
#endif
|
||||
|
||||
AsyncMqttClient& onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback);
|
||||
AsyncMqttClient& onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback);
|
||||
AsyncMqttClient& onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback);
|
||||
AsyncMqttClient& onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback);
|
||||
AsyncMqttClient& onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback);
|
||||
AsyncMqttClient& onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback);
|
||||
|
||||
bool connected() const;
|
||||
void connect();
|
||||
void disconnect(bool force = false);
|
||||
uint16_t subscribe(const char* topic, uint8_t qos);
|
||||
uint16_t unsubscribe(const char* topic);
|
||||
uint16_t publish(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0, bool dup = false, uint16_t message_id = 0);
|
||||
|
||||
private:
|
||||
AsyncClient _client;
|
||||
|
||||
bool _connected;
|
||||
bool _connectPacketNotEnoughSpace;
|
||||
bool _disconnectFlagged;
|
||||
bool _tlsBadFingerprint;
|
||||
uint32_t _lastClientActivity;
|
||||
uint32_t _lastServerActivity;
|
||||
uint32_t _lastPingRequestTime;
|
||||
|
||||
char _generatedClientId[13 + 1]; // esp8266abc123
|
||||
IPAddress _ip;
|
||||
const char* _host;
|
||||
bool _useIp;
|
||||
#if ASYNC_TCP_SSL_ENABLED
|
||||
bool _secure;
|
||||
#endif
|
||||
uint16_t _port;
|
||||
uint16_t _keepAlive;
|
||||
bool _cleanSession;
|
||||
const char* _clientId;
|
||||
const char* _username;
|
||||
const char* _password;
|
||||
const char* _willTopic;
|
||||
const char* _willPayload;
|
||||
uint16_t _willPayloadLength;
|
||||
uint8_t _willQos;
|
||||
bool _willRetain;
|
||||
|
||||
#if ASYNC_TCP_SSL_ENABLED
|
||||
std::vector<std::array<uint8_t, SHA1_SIZE>> _secureServerFingerprints;
|
||||
#endif
|
||||
|
||||
std::vector<AsyncMqttClientInternals::OnConnectUserCallback> _onConnectUserCallbacks;
|
||||
std::vector<AsyncMqttClientInternals::OnDisconnectUserCallback> _onDisconnectUserCallbacks;
|
||||
std::vector<AsyncMqttClientInternals::OnSubscribeUserCallback> _onSubscribeUserCallbacks;
|
||||
std::vector<AsyncMqttClientInternals::OnUnsubscribeUserCallback> _onUnsubscribeUserCallbacks;
|
||||
std::vector<AsyncMqttClientInternals::OnMessageUserCallback> _onMessageUserCallbacks;
|
||||
std::vector<AsyncMqttClientInternals::OnPublishUserCallback> _onPublishUserCallbacks;
|
||||
|
||||
AsyncMqttClientInternals::ParsingInformation _parsingInformation;
|
||||
AsyncMqttClientInternals::Packet* _currentParsedPacket;
|
||||
uint8_t _remainingLengthBufferPosition;
|
||||
char _remainingLengthBuffer[4];
|
||||
|
||||
uint16_t _nextPacketId;
|
||||
|
||||
std::vector<AsyncMqttClientInternals::PendingPubRel> _pendingPubRels;
|
||||
|
||||
std::vector<AsyncMqttClientInternals::PendingAck> _toSendAcks;
|
||||
|
||||
#ifdef ESP32
|
||||
SemaphoreHandle_t _xSemaphore = nullptr;
|
||||
#endif
|
||||
|
||||
void _clear();
|
||||
void _freeCurrentParsedPacket();
|
||||
|
||||
// TCP
|
||||
void _onConnect(AsyncClient* client);
|
||||
void _onDisconnect(AsyncClient* client);
|
||||
static void _onError(AsyncClient* client, int8_t error);
|
||||
void _onTimeout(AsyncClient* client, uint32_t time);
|
||||
static void _onAck(AsyncClient* client, size_t len, uint32_t time);
|
||||
void _onData(AsyncClient* client, char* data, size_t len);
|
||||
void _onPoll(AsyncClient* client);
|
||||
|
||||
// MQTT
|
||||
void _onPingResp();
|
||||
void _onConnAck(bool sessionPresent, uint8_t connectReturnCode);
|
||||
void _onSubAck(uint16_t packetId, char status);
|
||||
void _onUnsubAck(uint16_t packetId);
|
||||
void _onMessage(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId);
|
||||
void _onPublish(uint16_t packetId, uint8_t qos);
|
||||
void _onPubRel(uint16_t packetId);
|
||||
void _onPubAck(uint16_t packetId);
|
||||
void _onPubRec(uint16_t packetId);
|
||||
void _onPubComp(uint16_t packetId);
|
||||
|
||||
bool _sendPing();
|
||||
void _sendAcks();
|
||||
bool _sendDisconnect();
|
||||
|
||||
uint16_t _getNextPacketId();
|
||||
};
|
@@ -0,0 +1,28 @@
|
||||
#pragma once
|
||||
|
||||
#include <functional>
|
||||
|
||||
#include "DisconnectReasons.hpp"
|
||||
#include "MessageProperties.hpp"
|
||||
|
||||
namespace AsyncMqttClientInternals {
|
||||
// user callbacks
|
||||
typedef std::function<void(bool sessionPresent)> OnConnectUserCallback;
|
||||
typedef std::function<void(AsyncMqttClientDisconnectReason reason)> OnDisconnectUserCallback;
|
||||
typedef std::function<void(uint16_t packetId, uint8_t qos)> OnSubscribeUserCallback;
|
||||
typedef std::function<void(uint16_t packetId)> OnUnsubscribeUserCallback;
|
||||
typedef std::function<void(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total)> OnMessageUserCallback;
|
||||
typedef std::function<void(uint16_t packetId)> OnPublishUserCallback;
|
||||
|
||||
// internal callbacks
|
||||
typedef std::function<void(bool sessionPresent, uint8_t connectReturnCode)> OnConnAckInternalCallback;
|
||||
typedef std::function<void()> OnPingRespInternalCallback;
|
||||
typedef std::function<void(uint16_t packetId, char status)> OnSubAckInternalCallback;
|
||||
typedef std::function<void(uint16_t packetId)> OnUnsubAckInternalCallback;
|
||||
typedef std::function<void(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId)> OnMessageInternalCallback;
|
||||
typedef std::function<void(uint16_t packetId, uint8_t qos)> OnPublishInternalCallback;
|
||||
typedef std::function<void(uint16_t packetId)> OnPubRelInternalCallback;
|
||||
typedef std::function<void(uint16_t packetId)> OnPubAckInternalCallback;
|
||||
typedef std::function<void(uint16_t packetId)> OnPubRecInternalCallback;
|
||||
typedef std::function<void(uint16_t packetId)> OnPubCompInternalCallback;
|
||||
} // namespace AsyncMqttClientInternals
|
@@ -0,0 +1,15 @@
|
||||
#pragma once
|
||||
|
||||
enum class AsyncMqttClientDisconnectReason : int8_t {
|
||||
TCP_DISCONNECTED = 0,
|
||||
|
||||
MQTT_UNACCEPTABLE_PROTOCOL_VERSION = 1,
|
||||
MQTT_IDENTIFIER_REJECTED = 2,
|
||||
MQTT_SERVER_UNAVAILABLE = 3,
|
||||
MQTT_MALFORMED_CREDENTIALS = 4,
|
||||
MQTT_NOT_AUTHORIZED = 5,
|
||||
|
||||
ESP8266_NOT_ENOUGH_SPACE = 6,
|
||||
|
||||
TLS_BAD_FINGERPRINT = 7
|
||||
};
|
57
libraries/async-mqtt-client/src/AsyncMqttClient/Flags.hpp
Normal file
57
libraries/async-mqtt-client/src/AsyncMqttClient/Flags.hpp
Normal file
@@ -0,0 +1,57 @@
|
||||
#pragma once
|
||||
|
||||
namespace AsyncMqttClientInternals {
|
||||
constexpr struct {
|
||||
const uint8_t RESERVED = 0;
|
||||
const uint8_t CONNECT = 1;
|
||||
const uint8_t CONNACK = 2;
|
||||
const uint8_t PUBLISH = 3;
|
||||
const uint8_t PUBACK = 4;
|
||||
const uint8_t PUBREC = 5;
|
||||
const uint8_t PUBREL = 6;
|
||||
const uint8_t PUBCOMP = 7;
|
||||
const uint8_t SUBSCRIBE = 8;
|
||||
const uint8_t SUBACK = 9;
|
||||
const uint8_t UNSUBSCRIBE = 10;
|
||||
const uint8_t UNSUBACK = 11;
|
||||
const uint8_t PINGREQ = 12;
|
||||
const uint8_t PINGRESP = 13;
|
||||
const uint8_t DISCONNECT = 14;
|
||||
const uint8_t RESERVED2 = 1;
|
||||
} PacketType;
|
||||
|
||||
constexpr struct {
|
||||
const uint8_t CONNECT_RESERVED = 0x00;
|
||||
const uint8_t CONNACK_RESERVED = 0x00;
|
||||
const uint8_t PUBLISH_DUP = 0x08;
|
||||
const uint8_t PUBLISH_QOS0 = 0x00;
|
||||
const uint8_t PUBLISH_QOS1 = 0x02;
|
||||
const uint8_t PUBLISH_QOS2 = 0x04;
|
||||
const uint8_t PUBLISH_QOSRESERVED = 0x06;
|
||||
const uint8_t PUBLISH_RETAIN = 0x01;
|
||||
const uint8_t PUBACK_RESERVED = 0x00;
|
||||
const uint8_t PUBREC_RESERVED = 0x00;
|
||||
const uint8_t PUBREL_RESERVED = 0x02;
|
||||
const uint8_t PUBCOMP_RESERVED = 0x00;
|
||||
const uint8_t SUBSCRIBE_RESERVED = 0x02;
|
||||
const uint8_t SUBACK_RESERVED = 0x00;
|
||||
const uint8_t UNSUBSCRIBE_RESERVED = 0x02;
|
||||
const uint8_t UNSUBACK_RESERVED = 0x00;
|
||||
const uint8_t PINGREQ_RESERVED = 0x00;
|
||||
const uint8_t PINGRESP_RESERVED = 0x00;
|
||||
const uint8_t DISCONNECT_RESERVED = 0x00;
|
||||
const uint8_t RESERVED2_RESERVED = 0x00;
|
||||
} HeaderFlag;
|
||||
|
||||
constexpr struct {
|
||||
const uint8_t USERNAME = 0x80;
|
||||
const uint8_t PASSWORD = 0x40;
|
||||
const uint8_t WILL_RETAIN = 0x20;
|
||||
const uint8_t WILL_QOS0 = 0x00;
|
||||
const uint8_t WILL_QOS1 = 0x08;
|
||||
const uint8_t WILL_QOS2 = 0x10;
|
||||
const uint8_t WILL = 0x04;
|
||||
const uint8_t CLEAN_SESSION = 0x02;
|
||||
const uint8_t RESERVED = 0x00;
|
||||
} ConnectFlag;
|
||||
} // namespace AsyncMqttClientInternals
|
38
libraries/async-mqtt-client/src/AsyncMqttClient/Helpers.hpp
Normal file
38
libraries/async-mqtt-client/src/AsyncMqttClient/Helpers.hpp
Normal file
@@ -0,0 +1,38 @@
|
||||
#pragma once
|
||||
|
||||
namespace AsyncMqttClientInternals {
|
||||
class Helpers {
|
||||
public:
|
||||
static uint32_t decodeRemainingLength(char* bytes) {
|
||||
uint32_t multiplier = 1;
|
||||
uint32_t value = 0;
|
||||
uint8_t currentByte = 0;
|
||||
uint8_t encodedByte;
|
||||
do {
|
||||
encodedByte = bytes[currentByte++];
|
||||
value += (encodedByte & 127) * multiplier;
|
||||
multiplier *= 128;
|
||||
} while ((encodedByte & 128) != 0);
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
static uint8_t encodeRemainingLength(uint32_t remainingLength, char* destination) {
|
||||
uint8_t currentByte = 0;
|
||||
uint8_t bytesNeeded = 0;
|
||||
|
||||
do {
|
||||
uint8_t encodedByte = remainingLength % 128;
|
||||
remainingLength /= 128;
|
||||
if (remainingLength > 0) {
|
||||
encodedByte = encodedByte | 128;
|
||||
}
|
||||
|
||||
destination[currentByte++] = encodedByte;
|
||||
bytesNeeded++;
|
||||
} while (remainingLength > 0);
|
||||
|
||||
return bytesNeeded;
|
||||
}
|
||||
};
|
||||
} // namespace AsyncMqttClientInternals
|
@@ -0,0 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
struct AsyncMqttClientMessageProperties {
|
||||
uint8_t qos;
|
||||
bool dup;
|
||||
bool retain;
|
||||
};
|
@@ -0,0 +1,30 @@
|
||||
#include "ConnAckPacket.hpp"
|
||||
|
||||
using AsyncMqttClientInternals::ConnAckPacket;
|
||||
|
||||
ConnAckPacket::ConnAckPacket(ParsingInformation* parsingInformation, OnConnAckInternalCallback callback)
|
||||
: _parsingInformation(parsingInformation)
|
||||
, _callback(callback)
|
||||
, _bytePosition(0)
|
||||
, _sessionPresent(false)
|
||||
, _connectReturnCode(0) {
|
||||
}
|
||||
|
||||
ConnAckPacket::~ConnAckPacket() {
|
||||
}
|
||||
|
||||
void ConnAckPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
||||
char currentByte = data[(*currentBytePosition)++];
|
||||
if (_bytePosition++ == 0) {
|
||||
_sessionPresent = (currentByte << 7) >> 7;
|
||||
} else {
|
||||
_connectReturnCode = currentByte;
|
||||
_parsingInformation->bufferState = BufferState::NONE;
|
||||
_callback(_sessionPresent, _connectReturnCode);
|
||||
}
|
||||
}
|
||||
|
||||
void ConnAckPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
||||
(void)data;
|
||||
(void)currentBytePosition;
|
||||
}
|
@@ -0,0 +1,25 @@
|
||||
#pragma once
|
||||
|
||||
#include "Arduino.h"
|
||||
#include "Packet.hpp"
|
||||
#include "../ParsingInformation.hpp"
|
||||
#include "../Callbacks.hpp"
|
||||
|
||||
namespace AsyncMqttClientInternals {
|
||||
class ConnAckPacket : public Packet {
|
||||
public:
|
||||
explicit ConnAckPacket(ParsingInformation* parsingInformation, OnConnAckInternalCallback callback);
|
||||
~ConnAckPacket();
|
||||
|
||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
||||
|
||||
private:
|
||||
ParsingInformation* _parsingInformation;
|
||||
OnConnAckInternalCallback _callback;
|
||||
|
||||
uint8_t _bytePosition;
|
||||
bool _sessionPresent;
|
||||
uint8_t _connectReturnCode;
|
||||
};
|
||||
} // namespace AsyncMqttClientInternals
|
@@ -0,0 +1,11 @@
|
||||
#pragma once
|
||||
|
||||
namespace AsyncMqttClientInternals {
|
||||
class Packet {
|
||||
public:
|
||||
virtual ~Packet() {}
|
||||
|
||||
virtual void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) = 0;
|
||||
virtual void parsePayload(char* data, size_t len, size_t* currentBytePosition) = 0;
|
||||
};
|
||||
} // namespace AsyncMqttClientInternals
|
@@ -0,0 +1,21 @@
|
||||
#include "PingRespPacket.hpp"
|
||||
|
||||
using AsyncMqttClientInternals::PingRespPacket;
|
||||
|
||||
PingRespPacket::PingRespPacket(ParsingInformation* parsingInformation, OnPingRespInternalCallback callback)
|
||||
: _parsingInformation(parsingInformation)
|
||||
, _callback(callback) {
|
||||
}
|
||||
|
||||
PingRespPacket::~PingRespPacket() {
|
||||
}
|
||||
|
||||
void PingRespPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
||||
(void)data;
|
||||
(void)currentBytePosition;
|
||||
}
|
||||
|
||||
void PingRespPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
||||
(void)data;
|
||||
(void)currentBytePosition;
|
||||
}
|
@@ -0,0 +1,21 @@
|
||||
#pragma once
|
||||
|
||||
#include "Arduino.h"
|
||||
#include "Packet.hpp"
|
||||
#include "../ParsingInformation.hpp"
|
||||
#include "../Callbacks.hpp"
|
||||
|
||||
namespace AsyncMqttClientInternals {
|
||||
class PingRespPacket : public Packet {
|
||||
public:
|
||||
explicit PingRespPacket(ParsingInformation* parsingInformation, OnPingRespInternalCallback callback);
|
||||
~PingRespPacket();
|
||||
|
||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
||||
|
||||
private:
|
||||
ParsingInformation* _parsingInformation;
|
||||
OnPingRespInternalCallback _callback;
|
||||
};
|
||||
} // namespace AsyncMqttClientInternals
|
@@ -0,0 +1,30 @@
|
||||
#include "PubAckPacket.hpp"
|
||||
|
||||
using AsyncMqttClientInternals::PubAckPacket;
|
||||
|
||||
PubAckPacket::PubAckPacket(ParsingInformation* parsingInformation, OnPubAckInternalCallback callback)
|
||||
: _parsingInformation(parsingInformation)
|
||||
, _callback(callback)
|
||||
, _bytePosition(0)
|
||||
, _packetIdMsb(0)
|
||||
, _packetId(0) {
|
||||
}
|
||||
|
||||
PubAckPacket::~PubAckPacket() {
|
||||
}
|
||||
|
||||
void PubAckPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
||||
char currentByte = data[(*currentBytePosition)++];
|
||||
if (_bytePosition++ == 0) {
|
||||
_packetIdMsb = currentByte;
|
||||
} else {
|
||||
_packetId = currentByte | _packetIdMsb << 8;
|
||||
_parsingInformation->bufferState = BufferState::NONE;
|
||||
_callback(_packetId);
|
||||
}
|
||||
}
|
||||
|
||||
void PubAckPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
||||
(void)data;
|
||||
(void)currentBytePosition;
|
||||
}
|
@@ -0,0 +1,25 @@
|
||||
#pragma once
|
||||
|
||||
#include "Arduino.h"
|
||||
#include "Packet.hpp"
|
||||
#include "../ParsingInformation.hpp"
|
||||
#include "../Callbacks.hpp"
|
||||
|
||||
namespace AsyncMqttClientInternals {
|
||||
class PubAckPacket : public Packet {
|
||||
public:
|
||||
explicit PubAckPacket(ParsingInformation* parsingInformation, OnPubAckInternalCallback callback);
|
||||
~PubAckPacket();
|
||||
|
||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
||||
|
||||
private:
|
||||
ParsingInformation* _parsingInformation;
|
||||
OnPubAckInternalCallback _callback;
|
||||
|
||||
uint8_t _bytePosition;
|
||||
char _packetIdMsb;
|
||||
uint16_t _packetId;
|
||||
};
|
||||
} // namespace AsyncMqttClientInternals
|
@@ -0,0 +1,30 @@
|
||||
#include "PubCompPacket.hpp"
|
||||
|
||||
using AsyncMqttClientInternals::PubCompPacket;
|
||||
|
||||
PubCompPacket::PubCompPacket(ParsingInformation* parsingInformation, OnPubCompInternalCallback callback)
|
||||
: _parsingInformation(parsingInformation)
|
||||
, _callback(callback)
|
||||
, _bytePosition(0)
|
||||
, _packetIdMsb(0)
|
||||
, _packetId(0) {
|
||||
}
|
||||
|
||||
PubCompPacket::~PubCompPacket() {
|
||||
}
|
||||
|
||||
void PubCompPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
||||
char currentByte = data[(*currentBytePosition)++];
|
||||
if (_bytePosition++ == 0) {
|
||||
_packetIdMsb = currentByte;
|
||||
} else {
|
||||
_packetId = currentByte | _packetIdMsb << 8;
|
||||
_parsingInformation->bufferState = BufferState::NONE;
|
||||
_callback(_packetId);
|
||||
}
|
||||
}
|
||||
|
||||
void PubCompPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
||||
(void)data;
|
||||
(void)currentBytePosition;
|
||||
}
|
@@ -0,0 +1,25 @@
|
||||
#pragma once
|
||||
|
||||
#include "Arduino.h"
|
||||
#include "Packet.hpp"
|
||||
#include "../ParsingInformation.hpp"
|
||||
#include "../Callbacks.hpp"
|
||||
|
||||
namespace AsyncMqttClientInternals {
|
||||
class PubCompPacket : public Packet {
|
||||
public:
|
||||
explicit PubCompPacket(ParsingInformation* parsingInformation, OnPubCompInternalCallback callback);
|
||||
~PubCompPacket();
|
||||
|
||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
||||
|
||||
private:
|
||||
ParsingInformation* _parsingInformation;
|
||||
OnPubCompInternalCallback _callback;
|
||||
|
||||
uint8_t _bytePosition;
|
||||
char _packetIdMsb;
|
||||
uint16_t _packetId;
|
||||
};
|
||||
} // namespace AsyncMqttClientInternals
|
@@ -0,0 +1,30 @@
|
||||
#include "PubRecPacket.hpp"
|
||||
|
||||
using AsyncMqttClientInternals::PubRecPacket;
|
||||
|
||||
PubRecPacket::PubRecPacket(ParsingInformation* parsingInformation, OnPubRecInternalCallback callback)
|
||||
: _parsingInformation(parsingInformation)
|
||||
, _callback(callback)
|
||||
, _bytePosition(0)
|
||||
, _packetIdMsb(0)
|
||||
, _packetId(0) {
|
||||
}
|
||||
|
||||
PubRecPacket::~PubRecPacket() {
|
||||
}
|
||||
|
||||
void PubRecPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
||||
char currentByte = data[(*currentBytePosition)++];
|
||||
if (_bytePosition++ == 0) {
|
||||
_packetIdMsb = currentByte;
|
||||
} else {
|
||||
_packetId = currentByte | _packetIdMsb << 8;
|
||||
_parsingInformation->bufferState = BufferState::NONE;
|
||||
_callback(_packetId);
|
||||
}
|
||||
}
|
||||
|
||||
void PubRecPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
||||
(void)data;
|
||||
(void)currentBytePosition;
|
||||
}
|
@@ -0,0 +1,25 @@
|
||||
#pragma once
|
||||
|
||||
#include "Arduino.h"
|
||||
#include "Packet.hpp"
|
||||
#include "../ParsingInformation.hpp"
|
||||
#include "../Callbacks.hpp"
|
||||
|
||||
namespace AsyncMqttClientInternals {
|
||||
class PubRecPacket : public Packet {
|
||||
public:
|
||||
explicit PubRecPacket(ParsingInformation* parsingInformation, OnPubRecInternalCallback callback);
|
||||
~PubRecPacket();
|
||||
|
||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
||||
|
||||
private:
|
||||
ParsingInformation* _parsingInformation;
|
||||
OnPubRecInternalCallback _callback;
|
||||
|
||||
uint8_t _bytePosition;
|
||||
char _packetIdMsb;
|
||||
uint16_t _packetId;
|
||||
};
|
||||
} // namespace AsyncMqttClientInternals
|
@@ -0,0 +1,30 @@
|
||||
#include "PubRelPacket.hpp"
|
||||
|
||||
using AsyncMqttClientInternals::PubRelPacket;
|
||||
|
||||
PubRelPacket::PubRelPacket(ParsingInformation* parsingInformation, OnPubRelInternalCallback callback)
|
||||
: _parsingInformation(parsingInformation)
|
||||
, _callback(callback)
|
||||
, _bytePosition(0)
|
||||
, _packetIdMsb(0)
|
||||
, _packetId(0) {
|
||||
}
|
||||
|
||||
PubRelPacket::~PubRelPacket() {
|
||||
}
|
||||
|
||||
void PubRelPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
||||
char currentByte = data[(*currentBytePosition)++];
|
||||
if (_bytePosition++ == 0) {
|
||||
_packetIdMsb = currentByte;
|
||||
} else {
|
||||
_packetId = currentByte | _packetIdMsb << 8;
|
||||
_parsingInformation->bufferState = BufferState::NONE;
|
||||
_callback(_packetId);
|
||||
}
|
||||
}
|
||||
|
||||
void PubRelPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
||||
(void)data;
|
||||
(void)currentBytePosition;
|
||||
}
|
@@ -0,0 +1,25 @@
|
||||
#pragma once
|
||||
|
||||
#include "Arduino.h"
|
||||
#include "Packet.hpp"
|
||||
#include "../ParsingInformation.hpp"
|
||||
#include "../Callbacks.hpp"
|
||||
|
||||
namespace AsyncMqttClientInternals {
|
||||
class PubRelPacket : public Packet {
|
||||
public:
|
||||
explicit PubRelPacket(ParsingInformation* parsingInformation, OnPubRelInternalCallback callback);
|
||||
~PubRelPacket();
|
||||
|
||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
||||
|
||||
private:
|
||||
ParsingInformation* _parsingInformation;
|
||||
OnPubRelInternalCallback _callback;
|
||||
|
||||
uint8_t _bytePosition;
|
||||
char _packetIdMsb;
|
||||
uint16_t _packetId;
|
||||
};
|
||||
} // namespace AsyncMqttClientInternals
|
@@ -0,0 +1,91 @@
|
||||
#include "PublishPacket.hpp"
|
||||
|
||||
using AsyncMqttClientInternals::PublishPacket;
|
||||
|
||||
PublishPacket::PublishPacket(ParsingInformation* parsingInformation, OnMessageInternalCallback dataCallback, OnPublishInternalCallback completeCallback)
|
||||
: _parsingInformation(parsingInformation)
|
||||
, _dataCallback(dataCallback)
|
||||
, _completeCallback(completeCallback)
|
||||
, _dup(false)
|
||||
, _qos(0)
|
||||
, _retain(0)
|
||||
, _bytePosition(0)
|
||||
, _topicLengthMsb(0)
|
||||
, _topicLength(0)
|
||||
, _ignore(false)
|
||||
, _packetIdMsb(0)
|
||||
, _packetId(0)
|
||||
, _payloadLength(0)
|
||||
, _payloadBytesRead(0) {
|
||||
_dup = _parsingInformation->packetFlags & HeaderFlag.PUBLISH_DUP;
|
||||
_retain = _parsingInformation->packetFlags & HeaderFlag.PUBLISH_RETAIN;
|
||||
char qosMasked = _parsingInformation->packetFlags & 0x06;
|
||||
switch (qosMasked) {
|
||||
case HeaderFlag.PUBLISH_QOS0:
|
||||
_qos = 0;
|
||||
break;
|
||||
case HeaderFlag.PUBLISH_QOS1:
|
||||
_qos = 1;
|
||||
break;
|
||||
case HeaderFlag.PUBLISH_QOS2:
|
||||
_qos = 2;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
PublishPacket::~PublishPacket() {
|
||||
}
|
||||
|
||||
void PublishPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
||||
char currentByte = data[(*currentBytePosition)++];
|
||||
if (_bytePosition == 0) {
|
||||
_topicLengthMsb = currentByte;
|
||||
} else if (_bytePosition == 1) {
|
||||
_topicLength = currentByte | _topicLengthMsb << 8;
|
||||
if (_topicLength > _parsingInformation->maxTopicLength) {
|
||||
_ignore = true;
|
||||
} else {
|
||||
_parsingInformation->topicBuffer[_topicLength] = '\0';
|
||||
}
|
||||
} else if (_bytePosition >= 2 && _bytePosition < 2 + _topicLength) {
|
||||
// Starting from here, _ignore might be true
|
||||
if (!_ignore) _parsingInformation->topicBuffer[_bytePosition - 2] = currentByte;
|
||||
if (_bytePosition == 2 + _topicLength - 1 && _qos == 0) {
|
||||
_preparePayloadHandling(_parsingInformation->remainingLength - (_bytePosition + 1));
|
||||
return;
|
||||
}
|
||||
} else if (_bytePosition == 2 + _topicLength) {
|
||||
_packetIdMsb = currentByte;
|
||||
} else {
|
||||
_packetId = currentByte | _packetIdMsb << 8;
|
||||
_preparePayloadHandling(_parsingInformation->remainingLength - (_bytePosition + 1));
|
||||
}
|
||||
_bytePosition++;
|
||||
}
|
||||
|
||||
void PublishPacket::_preparePayloadHandling(uint32_t payloadLength) {
|
||||
_payloadLength = payloadLength;
|
||||
if (payloadLength == 0) {
|
||||
_parsingInformation->bufferState = BufferState::NONE;
|
||||
if (!_ignore) {
|
||||
_dataCallback(_parsingInformation->topicBuffer, nullptr, _qos, _dup, _retain, 0, 0, 0, _packetId);
|
||||
_completeCallback(_packetId, _qos);
|
||||
}
|
||||
} else {
|
||||
_parsingInformation->bufferState = BufferState::PAYLOAD;
|
||||
}
|
||||
}
|
||||
|
||||
void PublishPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
||||
size_t remainToRead = len - (*currentBytePosition);
|
||||
if (_payloadBytesRead + remainToRead > _payloadLength) remainToRead = _payloadLength - _payloadBytesRead;
|
||||
|
||||
if (!_ignore) _dataCallback(_parsingInformation->topicBuffer, data + (*currentBytePosition), _qos, _dup, _retain, remainToRead, _payloadBytesRead, _payloadLength, _packetId);
|
||||
_payloadBytesRead += remainToRead;
|
||||
(*currentBytePosition) += remainToRead;
|
||||
|
||||
if (_payloadBytesRead == _payloadLength) {
|
||||
_parsingInformation->bufferState = BufferState::NONE;
|
||||
if (!_ignore) _completeCallback(_packetId, _qos);
|
||||
}
|
||||
}
|
@@ -0,0 +1,38 @@
|
||||
#pragma once
|
||||
|
||||
#include "Arduino.h"
|
||||
#include "Packet.hpp"
|
||||
#include "../Flags.hpp"
|
||||
#include "../ParsingInformation.hpp"
|
||||
#include "../Callbacks.hpp"
|
||||
|
||||
namespace AsyncMqttClientInternals {
|
||||
class PublishPacket : public Packet {
|
||||
public:
|
||||
explicit PublishPacket(ParsingInformation* parsingInformation, OnMessageInternalCallback dataCallback, OnPublishInternalCallback completeCallback);
|
||||
~PublishPacket();
|
||||
|
||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
||||
|
||||
private:
|
||||
ParsingInformation* _parsingInformation;
|
||||
OnMessageInternalCallback _dataCallback;
|
||||
OnPublishInternalCallback _completeCallback;
|
||||
|
||||
void _preparePayloadHandling(uint32_t payloadLength);
|
||||
|
||||
bool _dup;
|
||||
uint8_t _qos;
|
||||
bool _retain;
|
||||
|
||||
uint8_t _bytePosition;
|
||||
char _topicLengthMsb;
|
||||
uint16_t _topicLength;
|
||||
bool _ignore;
|
||||
char _packetIdMsb;
|
||||
uint16_t _packetId;
|
||||
uint32_t _payloadLength;
|
||||
uint32_t _payloadBytesRead;
|
||||
};
|
||||
} // namespace AsyncMqttClientInternals
|
@@ -0,0 +1,46 @@
|
||||
#include "SubAckPacket.hpp"
|
||||
|
||||
using AsyncMqttClientInternals::SubAckPacket;
|
||||
|
||||
SubAckPacket::SubAckPacket(ParsingInformation* parsingInformation, OnSubAckInternalCallback callback)
|
||||
: _parsingInformation(parsingInformation)
|
||||
, _callback(callback)
|
||||
, _bytePosition(0)
|
||||
, _packetIdMsb(0)
|
||||
, _packetId(0) {
|
||||
}
|
||||
|
||||
SubAckPacket::~SubAckPacket() {
|
||||
}
|
||||
|
||||
void SubAckPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
||||
char currentByte = data[(*currentBytePosition)++];
|
||||
if (_bytePosition++ == 0) {
|
||||
_packetIdMsb = currentByte;
|
||||
} else {
|
||||
_packetId = currentByte | _packetIdMsb << 8;
|
||||
_parsingInformation->bufferState = BufferState::PAYLOAD;
|
||||
}
|
||||
}
|
||||
|
||||
void SubAckPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
||||
char status = data[(*currentBytePosition)++];
|
||||
|
||||
/* switch (status) {
|
||||
case 0:
|
||||
Serial.println("Success QoS 0");
|
||||
break;
|
||||
case 1:
|
||||
Serial.println("Success QoS 1");
|
||||
break;
|
||||
case 2:
|
||||
Serial.println("Success QoS 2");
|
||||
break;
|
||||
case 0x80:
|
||||
Serial.println("Failure");
|
||||
break;
|
||||
} */
|
||||
|
||||
_parsingInformation->bufferState = BufferState::NONE;
|
||||
_callback(_packetId, status);
|
||||
}
|
@@ -0,0 +1,25 @@
|
||||
#pragma once
|
||||
|
||||
#include "Arduino.h"
|
||||
#include "Packet.hpp"
|
||||
#include "../ParsingInformation.hpp"
|
||||
#include "../Callbacks.hpp"
|
||||
|
||||
namespace AsyncMqttClientInternals {
|
||||
class SubAckPacket : public Packet {
|
||||
public:
|
||||
explicit SubAckPacket(ParsingInformation* parsingInformation, OnSubAckInternalCallback callback);
|
||||
~SubAckPacket();
|
||||
|
||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
||||
|
||||
private:
|
||||
ParsingInformation* _parsingInformation;
|
||||
OnSubAckInternalCallback _callback;
|
||||
|
||||
uint8_t _bytePosition;
|
||||
char _packetIdMsb;
|
||||
uint16_t _packetId;
|
||||
};
|
||||
} // namespace AsyncMqttClientInternals
|
@@ -0,0 +1,30 @@
|
||||
#include "UnsubAckPacket.hpp"
|
||||
|
||||
using AsyncMqttClientInternals::UnsubAckPacket;
|
||||
|
||||
UnsubAckPacket::UnsubAckPacket(ParsingInformation* parsingInformation, OnUnsubAckInternalCallback callback)
|
||||
: _parsingInformation(parsingInformation)
|
||||
, _callback(callback)
|
||||
, _bytePosition(0)
|
||||
, _packetIdMsb(0)
|
||||
, _packetId(0) {
|
||||
}
|
||||
|
||||
UnsubAckPacket::~UnsubAckPacket() {
|
||||
}
|
||||
|
||||
void UnsubAckPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
||||
char currentByte = data[(*currentBytePosition)++];
|
||||
if (_bytePosition++ == 0) {
|
||||
_packetIdMsb = currentByte;
|
||||
} else {
|
||||
_packetId = currentByte | _packetIdMsb << 8;
|
||||
_parsingInformation->bufferState = BufferState::NONE;
|
||||
_callback(_packetId);
|
||||
}
|
||||
}
|
||||
|
||||
void UnsubAckPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
||||
(void)data;
|
||||
(void)currentBytePosition;
|
||||
}
|
@@ -0,0 +1,25 @@
|
||||
#pragma once
|
||||
|
||||
#include "Arduino.h"
|
||||
#include "Packet.hpp"
|
||||
#include "../ParsingInformation.hpp"
|
||||
#include "../Callbacks.hpp"
|
||||
|
||||
namespace AsyncMqttClientInternals {
|
||||
class UnsubAckPacket : public Packet {
|
||||
public:
|
||||
explicit UnsubAckPacket(ParsingInformation* parsingInformation, OnUnsubAckInternalCallback callback);
|
||||
~UnsubAckPacket();
|
||||
|
||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
||||
|
||||
private:
|
||||
ParsingInformation* _parsingInformation;
|
||||
OnUnsubAckInternalCallback _callback;
|
||||
|
||||
uint8_t _bytePosition;
|
||||
char _packetIdMsb;
|
||||
uint16_t _packetId;
|
||||
};
|
||||
} // namespace AsyncMqttClientInternals
|
@@ -0,0 +1,21 @@
|
||||
#pragma once
|
||||
|
||||
namespace AsyncMqttClientInternals {
|
||||
enum class BufferState : uint8_t {
|
||||
NONE = 0,
|
||||
REMAINING_LENGTH = 2,
|
||||
VARIABLE_HEADER = 3,
|
||||
PAYLOAD = 4
|
||||
};
|
||||
|
||||
struct ParsingInformation {
|
||||
BufferState bufferState;
|
||||
|
||||
uint16_t maxTopicLength;
|
||||
char* topicBuffer;
|
||||
|
||||
uint8_t packetType;
|
||||
uint16_t packetFlags;
|
||||
uint32_t remainingLength;
|
||||
};
|
||||
} // namespace AsyncMqttClientInternals
|
13
libraries/async-mqtt-client/src/AsyncMqttClient/Storage.hpp
Normal file
13
libraries/async-mqtt-client/src/AsyncMqttClient/Storage.hpp
Normal file
@@ -0,0 +1,13 @@
|
||||
#pragma once
|
||||
|
||||
namespace AsyncMqttClientInternals {
|
||||
struct PendingPubRel {
|
||||
uint16_t packetId;
|
||||
};
|
||||
|
||||
struct PendingAck {
|
||||
uint8_t packetType;
|
||||
uint8_t headerFlag;
|
||||
uint16_t packetId;
|
||||
};
|
||||
} // namespace AsyncMqttClientInternals
|
Reference in New Issue
Block a user