【MQTT】paho.mqtt.c 库的“介绍、下载、交叉编译” 详解,以及编写MQTT客户端例子源码
😁博客主页😁:🚀https://blog.csdn.net/wkd_007🚀
🤑博客内容🤑:🍭嵌入式开发、Linux、C语言、C++、数据结构、音视频🍭
⏰发布时间⏰:2024-05-13 12:43:09
本文未经允许,不得转发!!!
目录
- 🎄一、概述
- 🎄二、paho.mqtt.c库介绍
- ✨2.1 异步的paho.mqtt.c库(Asynchronous MQTT client library for C)
- ✨2.2 同步的paho.mqtt.c库(MQTT Client library for C)
- ✨2.3 异步和同步库的区别(Asynchronous vs synchronous client applications)
- 🎄三、paho.mqtt.c 库的下载、交叉编译
- ✨3.1 openssl 的下载、交叉编译
- ✨3.2 paho.mqtt.c 交叉编译
- 🎄四、paho.mqtt.c库写一个MQTT客户端
- ✨4.1 订阅——MQTTAsync_subscribe.c
- ✨4.2 发布——MQTTAsync_publish.c
- 🎄五、总结
🎄一、概述
上篇文章 【MQTT】mosquitto 的 “下载、交叉编译、使用” 详细教程,手把手搭建一个MQTT Broker ,介绍了怎样搭建一个MQTT Broker(代理)。MQTT Broker(代理)的搭建很重要,一般很少自己开发一个MQTT Broker,而是搭建好一个开源的MQTT Broker(代理),然后其他的设备或机器都作为MQTT客户端,让MQTT Broker来转发消息。
关于MQTT协议的,作为编程人员更多的是开发MQTT的客户端,本文介绍的paho.mqtt.c就是开发MQTT客户端常用的开源库之一。paho.mqtt.c 是 Eclipse Paho 项目的一个开源库。
Eclipse Paho项目提供了以各种编程语言实现的MQTT和MQTT-SN的开源库(主要是客户端)。下面是这个项目的MQTT客户端开源库,想要了解更多Eclipse Paho项目的资料,可以到其官网:https://eclipse.dev/paho/
🎄二、paho.mqtt.c库介绍
paho.mqtt.c库编译后会生成四个动态库,:
- libpaho-mqtt3a.so:异步的paho.mqtt.c库
- libpaho-mqtt3as.so:异步的、使用了SSL的paho.mqtt.c库
- libpaho-mqtt3c.so:经典的、同步的paho.mqtt.c库
- libpaho-mqtt3cs.so:经典的、同步的、使用了SSL的paho.mqtt.c库
✨2.1 异步的paho.mqtt.c库(Asynchronous MQTT client library for C)
关于异步的paho.mqtt.c库可以参考其官网文章:Asynchronous MQTT client library for C
这里只摘要了使用方法,异步的paho.mqtt.c库有如下相似的使用框架:
- 1.创建一个客户端对象;
- 2.设置连接MQTT服务器的选项;
- 3.设置回调函数;
- 4.将客户端和服务器连接;
- 5.订阅客户端需要接收的所有话题;
- 6.重复以下操作直到结束:
- a.发布客户端需要的任意信息;
- b.处理所有接收到的信息;
- 7.断开客户端连接;
- 8.释放客户端使用的所有内存。
✨2.2 同步的paho.mqtt.c库(MQTT Client library for C)
关于异步的paho.mqtt.c库可以参考其官网文章:MQTT Client library for C
这里只摘要了使用方法,同步的paho.mqtt.c库有如下相似的使用框架:
- 1.创建一个客户端对象;
- 2.设置连接MQTT服务器的选项;
- 3.如果要使用多线程(异步模式)操作,则调用 MQTTClient_setCallbacks() 设置回调函数;
- 4.订阅客户端需要接收的任意话题;
- 5.重复以下操作直到结束:
- a.发布客户端需要的任意信息;
- b.处理所有接收到的信息;
- 6.断开客户端连接;
- 7.释放客户端使用的所有内存。
✨2.3 异步和同步库的区别(Asynchronous vs synchronous client applications)
下面是译文,原文地址:Asynchronous vs synchronous client applications
客户端库支持两种操作模式。这些模式被称为同步模式和异步模式。如果您的应用程序调用 MQTTClient_setCallbacks(),这将使客户端进入异步模式,否则它将以同步模式运行。
在同步模式下,客户端应用程序在单个线程上运行。消息是使用 MQTTClient_publish() 和 MQTTClient_publishMessage() 函数发布的。要确定QoS1或QoS2(请参阅Quality of service)消息已成功传递,应用程序必须调用 MQTTClient_waitForCompletion() 函数。Synchronous publication example 中展示了一个同步发布的示例。在同步模式下接收消息使用MQTTClient_receive()函数。客户端应用程序必须相对频繁地调用MQTTClient_ereceived()或MQTTClient_yield(),以便允许(执行)确认和保持与服务器的网络连接的 MQTT “ping” 操作。
在异步模式下,客户端应用程序在多个线程上运行。主程序像同步模式一样地调用客户端库中的函数来发布和订阅。然而,握手和保持网络连接的处理是在后台执行的。通过调用 MQTTClient_setCallbacks(),使用在库中注册的回调将状态和消息接收通知提供给客户端应用程序(请参阅 MQTTClient_messageArrived()、MQTTClient_connectionLost() 和MQTTClient_edeliveryComplete() )。然而,这个API不是线程安全的——在没有同步的情况下,不可能从多个线程调用它。您可以使用MQTTAsync API来实现这一点。
🎄三、paho.mqtt.c 库的下载、交叉编译
本文下载的是paho.mqtt.c-1.3.13.tar.gz,下载地址:https://github.com/eclipse/paho.mqtt.c/archive/refs/tags/v1.3.13.tar.gz
编译 paho.mqtt.c 库之前,需要先编译其依赖库:openssl。
✨3.1 openssl 的下载、交叉编译
本文下载的是 openssl-OpenSSL_1_1_1g.tar.gz,
下载地址:https://codeload.github.com/openssl/openssl/tar.gz/refs/tags/OpenSSL_1_1_1g
为什么使用这么旧的版本,因为这个我之前编译过,而且使用没问题。
编译步骤:
-
1、解压缩
tar zxf openssl-OpenSSL_1_1_1g.tar.gz
-
2、进入目录,并配置输出目录和交叉编译器, (linux-generic32表示是32位操作系统,个别文章加了这个选项就不用去掉 -m64,我这里行不通)
cd openssl-OpenSSL_1_1_1g/ ./config no-asm shared no-async --prefix=`pwd`/ssl_result --cross-compile-prefix=aarch64-mix210-linux-
-
3、执行下面命令,删除Makefile文件的 -m64,
sed -i 's/-m64//' Makefile
执行后,可以避免出现这个编译错误:aarch64-mix210-linux-gcc: error: unrecognized command line option '-m64'
-
4、编译、安装
make && make install
成功编译后,在openssl-OpenSSL_1_1_1g/目录会生成一个ssl_result目录,可以看到里面生成的库:
✨3.2 paho.mqtt.c 交叉编译
编译步骤:
-
1、解压缩,创建要安装目录paho.mqtt.c_result
tar zxf paho.mqtt.c-1.3.13.tar.gz mkdir paho.mqtt.c_result/bin -p mkdir paho.mqtt.c_result/include -p mkdir paho.mqtt.c_result/lib -p mkdir paho.mqtt.c_result/share/man/man1 -p
-
2、进入目录,交叉编译
cd paho.mqtt.c-1.3.13/ make CC=aarch64-mix210-linux-gcc CFLAGS:="-I `pwd`/../ssl_result/include" LDFLAGS:="-L `pwd`/../ssl_result/lib"
CFLAGS:=“-I `pwd`/…/ssl_result/include”:指定前面编译的 openssl 的头文件;
LDFLAGS:=“-L `pwd`/…/ssl_result/lib”:指定前面编译的 openssl 的库文件路径;
-
3、make install,安装编译结果
make install prefix=`pwd`/../paho.mqtt.c_result
prefix=`pwd`/…/paho.mqtt.c_result :指定安装目录路径;
编译完成后,会生成 目录,内容如下:
🎄四、paho.mqtt.c库写一个MQTT客户端
下面提供两个使用了 paho.mqtt.c库(libpaho-mqtt3a) MQTT客户端的例子。
运行下面代码前,要先搭建一个MQTT Broker,参考上篇文章:【MQTT】mosquitto 的 “下载、交叉编译、使用” 详细教程,手把手搭建一个MQTT Broker
✨4.1 订阅——MQTTAsync_subscribe.c
这是使用了 libpaho-mqtt3a.so 进行订阅消息的源码,源码路径在源码的这个路径:paho.mqtt.c-1.3.13/src/samples/MQTTAsync_subscribe.c,只更改了服务器地址。完整代码如下:
/******************************************************************************* * Copyright (c) 2012, 2022 IBM Corp., Ian Craggs * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v2.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * https://www.eclipse.org/legal/epl-2.0/ * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Ian Craggs - initial contribution *******************************************************************************/ #include #include #include #include "MQTTAsync.h" #if !defined(_WIN32) #include #else #include #endif #if defined(_WRS_KERNEL) #include #endif #define ADDRESS "192.168.3.227:1883" #define CLIENTID "ExampleClientSub" #define TOPIC "MQTT Examples" #define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000L int disc_finished = 0; int subscribed = 0; int finished = 0; void onConnect(void* context, MQTTAsync_successData* response); void onConnectFailure(void* context, MQTTAsync_failureData* response); void connlost(void *context, char *cause) { MQTTAsync client = (MQTTAsync)context; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; int rc; printf("\nConnection lost\n"); if (cause) printf(" cause: %s\n", cause); printf("Reconnecting\n"); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.onSuccess = onConnect; conn_opts.onFailure = onConnectFailure; if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start connect, return code %d\n", rc); finished = 1; } } int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message) { printf("Message arrived\n"); printf(" topic: %s\n", topicName); printf(" message: %.*s\n", message->payloadlen, (char*)message->payload); MQTTAsync_freeMessage(&message); MQTTAsync_free(topicName); return 1; } void onDisconnectFailure(void* context, MQTTAsync_failureData* response) { printf("Disconnect failed, rc %d\n", response->code); disc_finished = 1; } void onDisconnect(void* context, MQTTAsync_successData* response) { printf("Successful disconnection\n"); disc_finished = 1; } void onSubscribe(void* context, MQTTAsync_successData* response) { printf("Subscribe succeeded\n"); subscribed = 1; } void onSubscribeFailure(void* context, MQTTAsync_failureData* response) { printf("Subscribe failed, rc %d\n", response->code); finished = 1; } void onConnectFailure(void* context, MQTTAsync_failureData* response) { printf("Connect failed, rc %d\n", response->code); finished = 1; } void onConnect(void* context, MQTTAsync_successData* response) { MQTTAsync client = (MQTTAsync)context; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; int rc; printf("Successful connection\n"); printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n" "Press Q to quit\n\n", TOPIC, CLIENTID, QOS); opts.onSuccess = onSubscribe; opts.onFailure = onSubscribeFailure; opts.context = client; if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start subscribe, return code %d\n", rc); finished = 1; } } int main(int argc, char* argv[]) { MQTTAsync client; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; int rc; int ch; if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS) { printf("Failed to create client, return code %d\n", rc); rc = EXIT_FAILURE; goto exit; } if ((rc = MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, NULL)) != MQTTASYNC_SUCCESS) { printf("Failed to set callbacks, return code %d\n", rc); rc = EXIT_FAILURE; goto destroy_exit; } conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.onSuccess = onConnect; conn_opts.onFailure = onConnectFailure; conn_opts.context = client; if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start connect, return code %d\n", rc); rc = EXIT_FAILURE; goto destroy_exit; } while (!subscribed && !finished) #if defined(_WIN32) Sleep(100); #else usleep(10000L); #endif if (finished) goto exit; do { ch = getchar(); } while (ch!='Q' && ch != 'q'); disc_opts.onSuccess = onDisconnect; disc_opts.onFailure = onDisconnectFailure; if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start disconnect, return code %d\n", rc); rc = EXIT_FAILURE; goto destroy_exit; } while (!disc_finished) { #if defined(_WIN32) Sleep(100); #else usleep(10000L); #endif } destroy_exit: MQTTAsync_destroy(&client); exit: return rc; }
交叉编译:
aarch64-mix210-linux-gcc MQTTAsync_subscribe.c -I paho.mqtt.c_result/include/ -L paho.mqtt.c_result/lib/ -l paho-mqtt3a -o MQTTAsync_subscribe
在嵌入式板子运行:
✨4.2 发布——MQTTAsync_publish.c
这是使用了 libpaho-mqtt3a.so 进行发布消息的源码,源码路径在源码的这个路径:paho.mqtt.c-1.3.13/src/samples/MQTTAsync_publish.c,只更改了服务器地址。完整代码如下:
/******************************************************************************* * Copyright (c) 2012, 2023 IBM Corp., Ian Craggs * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v2.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * https://www.eclipse.org/legal/epl-2.0/ * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Ian Craggs - initial contribution *******************************************************************************/ #include #include #include #include "MQTTAsync.h" #if !defined(_WIN32) #include #else #include #endif #if defined(_WRS_KERNEL) #include #endif #define ADDRESS "192.168.3.227:1883" #define CLIENTID "ExampleClientPub" #define TOPIC "MQTT Examples" #define PAYLOAD "Hello World!" #define QOS 2 #define TIMEOUT 10000L int finished = 0; void connlost(void *context, char *cause) { MQTTAsync client = (MQTTAsync)context; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; int rc; printf("\nConnection lost\n"); if (cause) printf(" cause: %s\n", cause); printf("Reconnecting\n"); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start connect, return code %d\n", rc); finished = 1; } } void onDisconnectFailure(void* context, MQTTAsync_failureData* response) { printf("Disconnect failed\n"); finished = 1; } void onDisconnect(void* context, MQTTAsync_successData* response) { printf("Successful disconnection\n"); finished = 1; } void onSendFailure(void* context, MQTTAsync_failureData* response) { MQTTAsync client = (MQTTAsync)context; MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer; int rc; printf("Message send failed token %d error code %d\n", response->token, response->code); opts.onSuccess = onDisconnect; opts.onFailure = onDisconnectFailure; opts.context = client; if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start disconnect, return code %d\n", rc); exit(EXIT_FAILURE); } } void onSend(void* context, MQTTAsync_successData* response) { MQTTAsync client = (MQTTAsync)context; MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer; int rc; printf("Message with token value %d delivery confirmed\n", response->token); opts.onSuccess = onDisconnect; opts.onFailure = onDisconnectFailure; opts.context = client; if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start disconnect, return code %d\n", rc); exit(EXIT_FAILURE); } } void onConnectFailure(void* context, MQTTAsync_failureData* response) { printf("Connect failed, rc %d\n", response ? response->code : 0); finished = 1; } void onConnect(void* context, MQTTAsync_successData* response) { MQTTAsync client = (MQTTAsync)context; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer; int rc; printf("Successful connection\n"); opts.onSuccess = onSend; opts.onFailure = onSendFailure; opts.context = client; pubmsg.payload = PAYLOAD; pubmsg.payloadlen = (int)strlen(PAYLOAD); pubmsg.qos = QOS; pubmsg.retained = 0; if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start sendMessage, return code %d\n", rc); exit(EXIT_FAILURE); } } int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m) { /* not expecting any messages */ return 1; } int main(int argc, char* argv[]) { MQTTAsync client; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; int rc; if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS) { printf("Failed to create client object, return code %d\n", rc); exit(EXIT_FAILURE); } if ((rc = MQTTAsync_setCallbacks(client, client, connlost, messageArrived, NULL)) != MQTTASYNC_SUCCESS) { printf("Failed to set callback, return code %d\n", rc); exit(EXIT_FAILURE); } conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.onSuccess = onConnect; conn_opts.onFailure = onConnectFailure; conn_opts.context = client; if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start connect, return code %d\n", rc); exit(EXIT_FAILURE); } printf("Waiting for publication of %s\n" "on topic %s for client with ClientID: %s\n", PAYLOAD, TOPIC, CLIENTID); while (!finished) #if defined(_WIN32) Sleep(100); #else usleep(10000L); #endif MQTTAsync_destroy(&client); return rc; }
交叉编译:
aarch64-mix210-linux-gcc MQTTAsync_publish.c -I paho.mqtt.c_result/include/ -L paho.mqtt.c_result/lib/ -l paho-mqtt3a -o MQTTAsync_publish
在嵌入式板子的运行结果:
🎄五、总结
👉本文详细介绍了 paho.mqtt.c 库的“介绍、下载、交叉编译” ,以及怎么使用 aho.mqtt.c 库 编写MQTT客户端例子源码。
如果文章有帮助的话,点赞👍、收藏⭐,支持一波,谢谢 😁😁😁
-
-