centos7 mqtt服务mosquitto搭建记录
1、系统centos7.6,安装默认版本
yum install mosquitto
2、启动运行
systemctl start mosquitto
3、设置自启动
systemctl enable mosquitto
4、修改配置文件 vim /etc/mosquitto/mosquitto.conf
监听端口,默认为1883,需要修改删除前面 # 注释,改为自己的端口
listener 1883
是否启用密码,为true不需要,false需要,默认为false,如果不要用户名和密码可连接时,删除掉前面的 # 注释,后面添加true
allow_anonymous
需要密码,配置密码文件,删除password_file 前面的 # 注释,后面添加密码配置文件
password_file /etc/mosquitto/passwordFile
拷贝创建密码文件
cp /etc/mosquitto/pwfile.example /etc/mosquitto/passwordfile
5、创建用户和密码
mosquitto_passwd -c /etc/mosquitto/passwd username
其中username 需要换成自己的账户名,比如admin,回车执行后,会提示输入密码和输入确认密码,完成后重启服务生效
扩展,重启服务 systemctl restart mosquitto,停止服务systemctl stop mosquitto
到此mqtt服务器搭建完成
6、连接测试
下载MQTTX工具进行测试连接可发布消息,https://mqttx.app/zh/downloads
如果 填写名称、IP/域名,端口,账号密码,点击Connect 连接,连接成功左侧状态 会是绿色,失败会提示,连接成功发布消息 test为主题---topic,下面输入消息内容---message,点击发送
java springboot 连接测试
org.springframework.boot spring-boot-starter-integration org.springframework.integration spring-integration-mqtt
config
package com.szhz.util; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import javax.annotation.PostConstruct; import org.springframework.context.annotation.Configuration; @Configuration @Slf4j public class MqttPubConfig { private String hostUrl = "tcp://192.168.1.1:1883"; private String clientId = "mqtt123456_test1"; private String username = "admin"; private String password = "123456"; /** * 客户端对象 */ private MqttClient client; /** * 在bean初始化后连接到服务器 */ @PostConstruct public void init() { connect(); } /** * 客户端连接服务端 */ public void connect() { try { //创建MQTT客户端对象 client = new MqttClient(hostUrl, clientId, new MemoryPersistence()); //连接设置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 //设置为true表示每次连接服务器都是以新的身份 options.setCleanSession(true); //设置连接用户名 options.setUserName(username); //设置连接密码 options.setPassword(password.toCharArray()); //设置超时时间,单位为秒 options.setConnectionTimeout(100); //设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线 options.setKeepAliveInterval(20); //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 options.setWill("willTopic", (clientId + "与服务器断开连接").getBytes(), 0, false); //设置回调 client.setCallback(new MqttPubCallBack()); client.connect(options); } catch (MqttException e) { e.printStackTrace(); } } /** * 发送主题内容 * * @param topic 主题 */ public void publish( String topic, String message) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(1); mqttMessage.setRetained(false); mqttMessage.setPayload(message.getBytes()); //主题的目的地,用于发布/订阅信息 MqttTopic mqttTopic = client.getTopic(topic); //提供一种机制来跟踪消息的传递进度 //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度 MqttDeliveryToken token; try { //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态 //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。 token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); } } /** * 订阅主题 ,此方法默认的的Qos等级为:1 * * @param topic 主题 */ public void sub(String topic) throws MqttException { client.subscribe(topic); } public static void main(String[] args) throws MqttException { MqttPubConfig mqttPubConfig = new MqttPubConfig(); mqttPubConfig.connect(); mqttPubConfig.sub("test111"); mqttPubConfig.publish("test111", "我发的第一条消息!"); } }
MqttPubCallBack
package com.szhz.util; import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; @Configuration public class MqttPubCallBack implements MqttCallback{ @Value("${spring.mqtt.clientId}") private String clientId; /** * 与服务器断开的回调 */ @Override public void connectionLost(Throwable cause) { System.out.println(clientId+"与服务器断开连接"); } /** * 接受消息 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(String.format("接收消息主题 : %s",topic)); System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload()))); } /** * 消息发布成功的回调 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { IMqttAsyncClient client = token.getClient(); System.out.println(client.getClientId()+"发布消息成功!"); } }
SendController
package com.szhz.controller; //import com.szhz.util.MqttPubConfig; import com.szhz.entity.MqttEntity; import com.szhz.util.MqttPubConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; @Controller public class SendController { @Autowired private MqttPubConfig providerClient; @RequestMapping(value = "/pubMessage", method = RequestMethod.POST) @ResponseBody public String sendMessage(@RequestBody MqttEntity entity){ try { providerClient.publish(entity.getTopic(), entity.getMessage()); return "发送成功"; } catch (Exception e) { e.printStackTrace(); return "发送失败"; } } @RequestMapping("/subTopic") @ResponseBody public String subTopic(String topic){ try { providerClient.sub(topic); return "订阅成功"; } catch (Exception e) { e.printStackTrace(); return "订阅失败"; } } }
文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。