MQTT(Message Queuing Telemetry Transport) 是一種基于客戶端服務(wù)端架構(gòu)的發(fā)布/訂閱模式的消息傳輸協(xié)議。它的設(shè)計(jì)思想是輕巧、開(kāi)放、 簡(jiǎn)單、規(guī)范,易于實(shí)現(xiàn)。這些特點(diǎn)使得它對(duì)很多場(chǎng)景來(lái)說(shuō)都是很好的選擇,特別是對(duì)于受限的環(huán)境如機(jī)器與機(jī)器的通信(M2M)以及物聯(lián)網(wǎng)環(huán)境(IoT)。
MQTT 最大的優(yōu)點(diǎn)在于,可以以極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實(shí)時(shí)可靠的消息服務(wù)。而作為一種低開(kāi)銷、低帶寬占用的即時(shí)通訊協(xié)議,也使其在物聯(lián)網(wǎng)、小型設(shè)備、移動(dòng)應(yīng)用等方面有較廣泛的應(yīng)用。在MQTT中分為客戶端和服務(wù)器端,客戶端(MQTT Client)可以發(fā)布和訂閱消息,而服務(wù)器端(MQTT Broker)則對(duì)多個(gè)客戶端發(fā)布或訂閱的各種消息做相應(yīng)的處理轉(zhuǎn)發(fā),如下圖所示:
在MQTT官網(wǎng)就有詳細(xì)的中文文檔說(shuō)明,因此在這里我們就不過(guò)多介紹MQTT協(xié)議本身了,有興趣的客戶可以參考資料:https://blog.mcxiaoke.com/mqtt/protocol/MQTT-3.1.1-CN.pdf。
英創(chuàng)公司在Linux主板上移植了Mosquitto開(kāi)源軟件,以支持基于MQTT的各種應(yīng)用。Mosquitto是一款輕量級(jí)的開(kāi)源軟件,適用于低功耗設(shè)備使用,實(shí)現(xiàn)了MQTT 協(xié)議versions 5.0,3.1.1和3.1,因此非常適合在嵌入式設(shè)備上使用。通過(guò)Mosquitto軟件,Linux主板可以作為MQTT的客戶端,對(duì)消息進(jìn)行發(fā)布和訂閱,也可以作為MQTT的服務(wù)器端,來(lái)處理客戶端發(fā)布和訂閱的消息。
當(dāng)英創(chuàng)Linux主板作為服務(wù)器端時(shí),可以通過(guò)命令mosquitto -c /etc/mosquitto/mosquitto.conf來(lái)啟動(dòng)服務(wù),此時(shí)客戶端的設(shè)備就都可以與服務(wù)器建立連接,進(jìn)行對(duì)消息的發(fā)布和訂閱。在/etc/mosquitto/mosquitto.conf文件中有MQTT服務(wù)器的一些配置選項(xiàng),用戶可以根據(jù)需求修改,具體的說(shuō)明可以參考https://mosquitto.org/man/mosquitto-conf-5.html。在英創(chuàng)Linux主板中,采用了systemd來(lái)管理服務(wù),在配置好后,systemd會(huì)在系統(tǒng)啟動(dòng)完成后,自動(dòng)啟動(dòng)mosquitto服務(wù),不需要用戶在手動(dòng)去執(zhí)行命令了。如果對(duì)啟動(dòng)服務(wù)有特殊的需求,可以通過(guò)systemd的標(biāo)準(zhǔn)命令來(lái)控制服務(wù)的啟動(dòng),常用的命令如下表:
命令 | 作用 |
systemctl disable mosquitto | 關(guān)閉開(kāi)機(jī)自啟動(dòng)服務(wù) |
systemctl enable mosquitto | 開(kāi)啟開(kāi)機(jī)自啟動(dòng)服務(wù) |
systemctl stop mosquitto | 停止服務(wù) |
systemctl start mosquitto | 啟動(dòng)服務(wù) |
systemctl restart mosquitto | 重啟服務(wù) |
當(dāng)英創(chuàng)Linux主板作為客戶端時(shí),需要先連接MQTT服務(wù)器,然后就可以發(fā)布或者訂閱消息了。對(duì)應(yīng)的功能可以通過(guò)Mosquitto軟件提供的現(xiàn)成工具來(lái)測(cè)試,mosquitto_pub工具用來(lái)發(fā)布消息而mosquitto_sub工具用于訂閱消息。下面我們?cè)诒镜販y(cè)試通過(guò)MQTT協(xié)議來(lái)發(fā)布和訂閱消息,主板中mosquitto服務(wù)是自動(dòng)啟動(dòng)的,所以連接到自身啟動(dòng)的MQTT服務(wù)器(Broker)就可以做本地測(cè)試,首先是訂閱主題為test的消息,通過(guò)mosquitto_sub工具實(shí)現(xiàn)就能實(shí)現(xiàn)這個(gè)功能,在默認(rèn)情況下mosquitto_sub工具會(huì)連接本地的MQTT服務(wù)器(Broker),也可以通過(guò)參數(shù)指定服務(wù)器地址,具體的用法介紹可以參考https://mosquitto.org/man/mosquitto_sub-1.html,測(cè)試命令如下圖:
然后我們通過(guò)mosquitto_pub工具來(lái)向test主題發(fā)布一個(gè)消息,內(nèi)容為”Hello World”。同樣默認(rèn)情況下mosquitto_pub工具會(huì)連接本地的MQTT服務(wù)器(Broker),也可以通過(guò)參數(shù)指定服務(wù)器地址,具體的用法介紹可以參考https://mosquitto.org/man/mosquitto_pub-1.html,測(cè)試命令如下圖:
此時(shí),通過(guò)mosquitto_sub工具訂閱消息的終端就會(huì)收到剛剛發(fā)布的消息:
另外也可以通過(guò)C程序調(diào)用Mosquitto軟件庫(kù)中提供的API來(lái)進(jìn)行消息的發(fā)布和訂閱。關(guān)于API的介紹,可以參考Mosquitto軟件的官方文檔,里面有詳細(xì)全面的介紹:https://mosquitto.org/api/files/mosquitto-h.html。英創(chuàng)公司基于這些API實(shí)現(xiàn)了兩個(gè)簡(jiǎn)單的基本例程,一個(gè)用于發(fā)布消息,一個(gè)用于訂閱消息,可以供用戶參考。
訂閱消息例程的主要代碼如下:
// 初始化mosquitto庫(kù)
ret = mosquitto_lib_init();
if(ret){
printf("Init lib error!\n");
return -1;
}
// 創(chuàng)建一個(gè)訂閱端實(shí)例,名稱為sub_test
mosq = mosquitto_new("sub_test", true, NULL);
if(mosq == NULL){
printf("New test error!\n");
mosquitto_lib_cleanup();
return -1;
}
// 設(shè)置回調(diào)函數(shù),在connect的回調(diào)函數(shù)中設(shè)置訂閱的主題(test)
mosquitto_connect_callback_set(mosq, my_connect_callback);
// 設(shè)置回調(diào)函數(shù),這里都是一些簡(jiǎn)單的打印信息
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
mosquitto_message_callback_set(mosq, my_message_callback);
// 連接至本地MQTT服務(wù)器
ret = mosquitto_connect(mosq, "localhost", 1883, KEEP_ALIVE);
if(ret){
printf("Connect server error!\n");
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return -1;
}
// 開(kāi)始通信:循環(huán)執(zhí)行、直到運(yùn)行標(biāo)志running被改變
while(running)
{
mosquitto_loop(mosq, -1, 1);
}
// 結(jié)束后的清理工作
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
發(fā)布消息例程的主要代碼如下:
//初始化libmosquitto庫(kù)
ret = mosquitto_lib_init();
if(ret){
printf("Init lib error!\n");
return -1;
}
//創(chuàng)建一個(gè)發(fā)布端實(shí)例,名稱為pub_test
mosq = mosquitto_new("pub_test", true, NULL);
if(mosq == NULL){
printf("New pub_test error!\n");
mosquitto_lib_cleanup();
return -1;
}
//設(shè)置回調(diào)函數(shù),這里都是簡(jiǎn)單的打印信息
mosquitto_connect_callback_set(mosq, my_connect_callback);
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
mosquitto_publish_callback_set(mosq, my_publish_callback);
// 連接至本地MQTT服務(wù)器
ret = mosquitto_connect(mosq, "localhost", 1883, KEEP_ALIVE);
if(ret){
printf("Connect server error!\n");
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return -1;
}
//mosquitto_loop_start作用是開(kāi)啟一個(gè)線程,在線程里不停的調(diào)用 mosquitto_loop() 來(lái)處理網(wǎng)絡(luò)信息
int loop = mosquitto_loop_start(mosq);
if(loop != MOSQ_ERR_SUCCESS)
{
printf("mosquitto loop error\n");
return 1;
}
while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL)
{
/* 發(fā)布消息,這里是將fget獲取到的內(nèi)容,作為test主題的消息發(fā)布出去 */
mosquitto_publish(mosq,NULL,"test",strlen(buff)+1,buff,0,0);
memset(buff,0,sizeof(buff));
}
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
我們運(yùn)行測(cè)試?yán)?,發(fā)送主題為test的消息,消息內(nèi)容為emtronix,如下圖:
同時(shí)在另一個(gè)終端中運(yùn)行訂閱消息的例程,訂閱主題為test的消息,當(dāng)發(fā)布了消息后,例程就會(huì)收到消息并打印出來(lái):
感興趣的客戶可以和英創(chuàng)公司的工程師聯(lián)系,索取相關(guān)的資料和例程。
成都英創(chuàng)信息技術(shù)有限公司 028-8618 0660