MQTT简介和基本操作

协议简介

MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、易于实现。这些特点使它适用于受限环境。例如,但不仅限于此:

  • 网络代价昂贵,带宽低、不可靠。
  • 在嵌入设备中运行,处理器和内存资源有限。

该协议的特点有:

  • 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
  • 对负载内容屏蔽的消息传输。
  • 使用 TCP/IP 提供网络连接。
  • 有三种消息发布服务质量:
    • “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
    • “至少一次”,确保消息到达,但消息重复可能会发生。
    • “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
  • 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
  • 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

服务器

现在常用的MQTT服务器:

ServerQoS 0QoS 1QoS 2auth网关$SYSSSL动态主题主从websockets插件系统
2lemetry§
Apache ActiveMQ
Apache ActiveMQ Artemis
Bevywise IoT Platformrmrm
emitter§
emqttd
flespi
GnatMQ
HBMQTT
HiveMQ
IBM MessageSight§
JoramMQ
Mongoose?????????
moquette???rm
mosca????
mosquitto§
MQTT.js§
MqttWk?
RabbitMQ???
RSMB?
Software AG Universal Messagingrm
Solace§
SwiftMQ
Trafero Tstack
VerneMQ
WebSphere MQ???

其中,对中文支持友好的是emqttd,现在叫EMQ,或者EMQX。http://emqtt.com/

EMQ

简介

参考链接:http://emqtt.com/docs/v3/getstarted.html

安装

参考链接:http://emqtt.com/docs/v3/install.html

在开发环境中,建议使用docker版

《MQTT简介和基本操作》

基本操作

使用Java作为开发语言

创建项目

引入:org.eclipse.paho.client.mqttv3包

maven为例:

 <!--https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3-->
    <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.1.1</version>
    </dependency>

发送端

先创建Callback代码,

PushCallback.java

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/** * 发布消息的回调类 * <p> * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 * 在回调中,将它用来标识已经启动了该回调的哪个实例。 * 必须在回调类中实现三个方法: * <p> * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。 * <p> * public void connectionLost(Throwable cause)在断开连接时调用。 * <p> * public void deliveryComplete(MqttDeliveryToken token)) * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 * 由 MqttClient.connect 激活此回调。 */
public class PushCallback implements MqttCallback {
  @Override
  public void connectionLost(Throwable cause) {
    // 连接丢失后,一般在这里面进行重连
    System.out.println("连接断开,可以做重连");
  }

  @Override
  public void deliveryComplete(IMqttDeliveryToken token) {
    System.out.println("deliveryComplete---------" + token.isComplete());
  }

  @Override
  public void messageArrived(String topic, MqttMessage message) throws Exception {
    // subscribe后得到的消息会执行到这里面
    System.out.println("接收消息主题 : " + topic);
    System.out.println("接收消息Qos : " + message.getQos());
    System.out.println("接收消息内容 : " + new String(message.getPayload()));
  }

}

创建Server.java

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/** * Title:Server * Description: 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题 */
public class Server {

  public static final String HOST = "tcp://172.24.105.130:11883";
  public static final String TOPIC = "topic/test";
  private static final String clientid = "server1";

  private MqttClient client;
  private MqttTopic topic;
  private String userName = "admin";
  private String passWord = "password";

  private MqttMessage message;

  public Server() throws MqttException {
    // MemoryPersistence设置clientid的保存形式,默认为以内存保存
    client = new MqttClient(HOST, clientid, new MemoryPersistence());
    connect();
  }

  private void connect() {
    MqttConnectOptions options = new MqttConnectOptions();
    options.setCleanSession(false);
    options.setUserName(userName);
    options.setPassword(passWord.toCharArray());
    // 设置超时时间
    options.setConnectionTimeout(10);
    // 设置会话心跳时间
    options.setKeepAliveInterval(20);
    try {
      client.setCallback(new PushCallback());
      client.connect(options);
      topic = client.getTopic(TOPIC);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException,
    MqttException {
    MqttDeliveryToken token = topic.publish(message);
    token.waitForCompletion();
    System.out.println("message is published completely! "
      + token.isComplete());
  }

  public static void main(String[] args) throws Exception {
    Server server = new Server();
    server.message = new MqttMessage();
    Long startTime = System.currentTimeMillis();
    server.message.setQos(1);
    server.message.setRetained(true);
    for (int i = 0; i < 10; i++) {
      server.message.setPayload(("server message " + i).getBytes());
      server.publish(server.topic, server.message);
    }
    System.out.println(System.currentTimeMillis() - startTime);
    System.out.println(server.message.isRetained() + "------ratained状态");
  }

}

接收端

创建:Client.java


import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Client {
  public static final String HOST = "tcp://172.24.105.130:11883";
  public static final String TOPIC = "topic/test";
  private static final String clientid = "client";
  private MqttClient client;
  private MqttConnectOptions options;
  private String userName = "admin";
  private String passWord = "password";


  private void start(String topicName) {
    try {
      // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
      client = new MqttClient(HOST, clientid, new MemoryPersistence());
      // MQTT的连接设置
      options = new MqttConnectOptions();
      // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
      options.setCleanSession(true);
      // 设置连接的用户名
      options.setUserName(userName);
      // 设置连接的密码
      options.setPassword(passWord.toCharArray());
      // 设置超时时间 单位为秒
      options.setConnectionTimeout(10);
      // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
      options.setKeepAliveInterval(20);
      // 设置回调
      client.setCallback(new PushCallback());
      //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息

      client.connect(options);
      //订阅消息
      int[] Qos = {1};
      String[] topic1 = {topicName};
      client.subscribe(topic1, Qos);

    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args) throws MqttException {
    Client client = new Client();
    client.start(TOPIC);
  }
}

管理

emqx_dashboard提供了丰富的管理功能,查看服务运行状态,数据统计,包计数等。

从dashboard菜单中可以看到提供了丰富的数据查看功能

点赞