1、MQTT是什么
MQTT(Message Queuing Telemetry Transport)是一種輕量級的消息協議,設計用于在低帶寬、不穩定或高延遲的網絡環境中傳輸消息。MQTT最初由IBM開發,后來成為OASIS標準,廣泛用于物聯網(IoT)應用和其他需要高效消息傳遞的場景。
MQTT是輕量級基于代理的發布/訂閱的消息傳輸協議,它可以通過很少的代碼和帶寬和遠程設備連接。例如通過衛星和代理連接,通過撥號和醫療保健提供者連接,以及在一些自動化或小型設備上,而且由于小巧,省電,協議開銷小和能高效的向一和多個接收者傳遞信息,故同樣適用于移動應用設備上。
- 早在1999年,IBM的Andy Stanford-Clark博士以及Arcom公司ArlenNipper博士發明了MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)技術 。
- MQTT的話題是他們在談論開源物聯網平臺Pachube時提到的。Stanford-Clark認為Pachube很酷,其不足之處是不具備真正的推送功能。你需要不斷的進行輪詢才能得到即時數據。這正是MQTT能夠實現的,他提到了使用推送通信系統的石油管道檢測系統。
2、MQTT協議的關鍵概念和特性
以下是MQTT協議的一些關鍵概念和特性:
1. 發布/訂閱模型:
MQTT采用發布/訂閱模型,消息的發送者稱為發布者(Publisher),而消息的接收者稱為訂閱者(Subscriber)。發布者將消息發布到主題(Topic),而訂閱者可以選擇訂閱特定主題以接收相關消息。
2. 主題(Topic):
主題是MQTT中消息的分類標識,用于將消息發送到特定的目標。訂閱者可以通過訂閱特定主題來接收與該主題相關的消息。主題可以是層次結構的,例如:home/living-room/temperature
,其中home
是根主題,living-room
是home
的子主題,temperature
是living-room
的子主題。
3. 消息質量等級(QoS):
MQTT支持不同的消息質量等級,用于確保消息傳遞的可靠性。共有三個等級:
- QoS 0: 最多一次傳遞,消息可能會丟失或重復。
- QoS 1: 至少一次傳遞,確保消息至少被接收一次。
- QoS 2: 剛好一次傳遞,確保消息僅被接收一次。
4. 保留消息:
發布者可以發送保留消息,這是一個持久的消息,當有新訂閱者訂閱與保留消息相匹配的主題時,將立即發送該消息。這對于傳遞重要信息或者初始化狀態很有用。
5. 遺囑消息(Will Message):
連接到MQTT代理的客戶端可以指定一個遺囑消息。如果客戶端非正常斷開連接,代理將自動發布遺囑消息到預定的主題。這可用于通知其他客戶端某個設備的狀態變化。
6. 保持活動性(Keep Alive):
MQTT使用保持活動性機制來確保客戶端與代理之間的連接保持活動。客戶端會定期向代理發送保持活動性的消息,如果代理在指定的時間內未收到客戶端的消息,將關閉連接。
7. 連接過程:
MQTT的連接過程包括客戶端向代理發送連接請求、代理響應并確認連接、客戶端發送連接信息、代理確認連接信息。在這個過程中,客戶端和代理之間會協商使用的MQTT版本、連接的用戶名和密碼、保持活動性時間等。
8. 安全性:
MQTT本身并沒有內建的安全性機制,但可以通過TLS/SSL進行加密傳輸。此外,可以通過用戶名和密碼進行身份驗證,以及通過訪問控制列表(ACL)限制客戶端的訪問權限。
3、MQTT客戶端代碼示例
實現一個完整的MQTT協議是一個龐大而復雜的任務,因為MQTT涉及到多個方面,包括連接、發布/訂閱、消息質量等級等。下面是一個簡化的C++實現的示例,用于建立一個基本的MQTT客戶端,演示連接到MQTT代理服務去發布/訂閱消息。
該示例使用了 Eclipse Paho MQTT C++ 客戶端庫,該庫提供了MQTT協議的C++實現。有興趣的小獲取可以去官網了解一下。
首先,確保你已經安裝了 Paho MQTT C++ 客戶端庫。可以從Paho官網獲取該庫。
接下來,以下是一個簡單的C++示例:
#include <iostream>
#include <mqtt/async_client.h>
class MQTTCallbacks : public virtual mqtt::callback, public virtual mqtt::iaction_listener {
void connectionLost(const std::string& cause) override {
std::cout << "Connection lost: " << cause << std::endl;
}
void deliveryComplete(mqtt::delivery_token_ptr tok) override {
std::cout << "Message delivered" << std::endl;
}
void onSuccess(const mqtt::token& tok) override {
std::cout << "Action successful" << std::endl;
}
void onFailure(const mqtt::token& tok) override {
std::cout << "Action failed" << std::endl;
}
};
int main() {
const std::string brokerAddress = "tcp://localhost:1883";
const std::string clientId = "cpp_mqtt_client";
const std::string topic = "test/topic";
mqtt::async_client client(brokerAddress, clientId);
MQTTCallbacks callbacks;
client.set_callback(callbacks);
mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
try {
std::cout << "Connecting to the broker..." << std::endl;
client.connect(connOpts)->wait();
std::cout << "Connected!" << std::endl;
std::string payload = "Hello, MQTT!";
mqtt::message_ptr pubmsg = mqtt::make_message(topic, payload);
pubmsg->set_qos(1);
client.publish(pubmsg)->wait();
std::cout << "Message published" << std::endl;
client.subscribe(topic, 1)->wait();
while (true) {
// Continue to keep the program running
std::this_thread::sleep_for(std::chrono::seconds(1));
}
client.unsubscribe(topic)->wait();
client.disconnect()->wait();
} catch (const mqtt::exception& exc) {
std::cerr << "Error: " << exc.what() << std::endl;
return 1;
}
return 0;
}
該示例假定了本地運行的MQTT代理地址為 tcp://localhost:1883
,客戶端連接后發布了一條消息到主題 test/topic
,并訂閱了該主題。實際開發中我們需要修改 brokerAddress
和其他參數。
需要說明的是這只是一個簡單的示例,實際上MQTT的實現要更加復雜,需要處理連接丟失、重連、消息質量等級等情況。在實際應用中,建議使用現成的MQTT客戶端庫,以確保正確性和穩定性。
小結
MQTT在實際開發中需要學習的內容有很多,而且很多細節需要注意。希望正在學習MQTT協議的小伙伴通過本節內容對MQTT協議有個初步的認識。
題外話:學習技術是一個慢慢沉淀的過程。不積跬步無以至千里,技術同樣如此。希望小伙伴們在以后的技術路上堅持下去,并到達一片開闊的境地。