原创

Mqtt注解式开发,轻便emqx消息服务器,发布订阅轻松搞定

MQTT啊EMQX啊这些概念性问题,博主不再复制粘贴了,需要的老铁直接去百度吧.
直接上测试老代码:

模拟服务端发


    /**
     * Creates the server-side publisher: builds the MQTT client and then
     * immediately tries to connect it.
     *
     * @throws MqttException if the {@code MqttClient} cannot be created
     */
    public PahoServer() throws MqttException {
        // Client persistence is kept in memory instead of on disk.
        MemoryPersistence inMemoryStore = new MemoryPersistence();
        client = new MqttClient(HOST, clientid, inMemoryStore);
        connect();
    }

    /**
     * Connects {@code client} to the broker and subscribes to {@code TOPIC2}
     * at QoS 0.
     * <p>
     * Exceptions raised while registering the callback, connecting or
     * subscribing are printed and otherwise swallowed.
     */
    private void connect() {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        // Timing: connection timeout and keep-alive interval.
        connectOptions.setConnectionTimeout(60);
        connectOptions.setKeepAliveInterval(30);
        // Session and credentials.
        connectOptions.setCleanSession(true);
        connectOptions.setUserName(userName);
        connectOptions.setPassword(passWord.toCharArray());
        try {
            client.setCallback(new PushCallback());
            // Remember the topic handle used for publishing.
            topic11 = client.getTopic(TOPIC);
            client.connect(connectOptions);
            client.subscribe(new String[] { TOPIC2 }, new int[] { 0 });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Publishes {@code message} on {@code topic}, logs the message being
     * sent, blocks until delivery has completed and then logs the
     * completion flag.
     *
     * @param topic   topic handle to publish on
     * @param message message to send
     * @throws MqttPersistenceException propagated from the publish call
     * @throws MqttException            propagated from publishing or from
     *                                  waiting for completion
     */
    public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException, MqttException {
        MqttDeliveryToken deliveryToken = topic.publish(message);
        // Log before waiting, exactly as the message is handed to the client.
        String sentLine = "这是服务端发送的消息:" + deliveryToken.getMessage();
        System.out.println(sentLine);
        deliveryToken.waitForCompletion();
        String doneLine = "message is published completely! " + deliveryToken.isComplete();
        System.out.println(doneLine);
    }

模拟客户端收

// HOST is the broker URI; clientid identifies this client to the broker (normally unique); client state is persisted in memory.
        client = new MqttClient(HOST, clientid, new MemoryPersistence());
        client.setManualAcks(false);
        // Text payloads are encoded/decoded explicitly as UTF-8 so the result
        // does not depend on the platform default charset.
        final java.nio.charset.Charset payloadCharset = java.nio.charset.StandardCharsets.UTF_8;
        // MQTT connection options.
        options = new MqttConnectOptions();
        // true: every connect starts a fresh session;
        // false: the broker keeps this client's session between connects.
        options.setCleanSession(true);
        // Credentials.
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        // Connection timeout, in seconds.
        options.setConnectionTimeout(60);
        // Keep-alive interval, in seconds. Keep-alive by itself does not
        // reconnect a dropped client; that is done in connectionLost below.
        options.setKeepAliveInterval(30);
        // Register the callback.
        client.setCallback(new MqttCallback() {
            /**
             * Retries once per second, starting a fresh NewPaho each time,
             * until one attempt succeeds or the thread is interrupted.
             */
            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("连接断开,正在重连");
                while (true) {
                    try {
                        Thread.sleep(1000);
                        System.out.println("正在重新连接...");
                        NewPaho newPaho = new NewPaho();
                        newPaho.start();
                        // No exception means the new client started: stop retrying.
                        break;
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and give up, so the
                        // thread can actually be stopped instead of looping forever.
                        Thread.currentThread().interrupt();
                        break;
                    } catch (Exception e) {
                        // Report why this attempt failed, then retry.
                        e.printStackTrace();
                    }
                }
            }

            /** Logs the completion flag of a delivered message. */
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                try {
                    token.waitForCompletion();
                } catch (MqttException e) {
                    e.printStackTrace();
                }
                System.out.println("deliveryComplete---------" + token.isComplete());
            }

            /**
             * Logs the incoming message and answers with a QoS 0,
             * non-retained message on "mqtt/abc" whose payload comes from resc().
             */
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                try {
                    str = message.toString();
                    System.out.println("接收消息主题 : " + topic);
                    System.out.println("接收消息Qos : " + message.getQos());
                    System.out.println("接收消息内容 : " + new String(message.getPayload(), payloadCharset));
                    MqttMessage message2 = new MqttMessage();
                    message2.setQos(0);
                    message2.setRetained(false);
                    message2.setPayload(resc().getBytes(payloadCharset));
                    client.publish("mqtt/abc", message2);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        MqttTopic topic = client.getTopic(TOPIC1);
        // Last-will message: lets other clients learn that this client dropped
        // off. It must be set on the options before connect().
        options.setWill(topic, "客户端掉线了啊".getBytes(payloadCharset), 0, false);
        client.connect(options);
        System.out.println("客户端连接成功......连接地址:"+client.getCurrentServerURI());
        // Subscribe to TOPIC1 at QoS 1.
        int[] Qos = { 1 };
        String[] topic1 = { TOPIC1 };
        client.subscribe(topic1, Qos);

从上述部分代码可以看出,每次都要频繁地setCallback,重复性的操作很多,所以决定将其封装.
封装代码
之后只需要在配置文件中配置连接地址就行了.然后在实际代码中应用如下

    /**
     * Handler subscribed to {@code terminal/dataUp/{tid}}: prints the
     * terminal id and the gunzipped payload.
     *
     * @param tid  value bound from the {@code tid} path placeholder
     *             (presumably the matching topic segment; confirm against the starter)
     * @param json message payload, passed to {@code ZipUtils.gunzip} before printing
     */
    @MqttSubscribe("terminal/dataUp/{tid}")
    public void dataUpSub(@PathValue("tid") Integer tid, @Payload String json) {
        final String banner = "dataUp" + tid;
        System.out.println(banner);
        System.out.println(ZipUtils.gunzip(json));
    }
    /**
     * Handler subscribed to {@code terminal/dataUpEvent/{tid}}: prints the
     * terminal id and the gunzipped payload.
     *
     * @param tid  value bound from the {@code tid} path placeholder
     *             (presumably the matching topic segment; confirm against the starter)
     * @param json message payload, passed to {@code ZipUtils.gunzip} before printing
     */
    @MqttSubscribe("terminal/dataUpEvent/{tid}")
    public void dataUpEventSub(@PathValue("tid") Integer tid, @Payload String json) {
        final String banner = "dataUpEvent" + tid;
        System.out.println(banner);
        System.out.println(ZipUtils.gunzip(json));
    }
    /**
     * Handler subscribed to {@code terminal/metadata/{tid}}: prints the
     * terminal id and the payload as received by this method.
     *
     * @param tid  value bound from the {@code tid} path placeholder
     *             (presumably the matching topic segment; confirm against the starter)
     * @param json payload after conversion by {@code Base64GunzipConverter}
     *             (presumably Base64-decoded and gunzipped; confirm against the converter)
     */
    @MqttSubscribe("terminal/metadata/{tid}")
    public void metadataSub(@PathValue("tid") Integer tid, @Payload(Base64GunzipConverter.class) String json) {
        final String banner = "metadata" + tid;
        System.out.println(banner);
        System.out.println(json);
    }

    /**
     * Handler subscribed to {@code terminal/common/+}: marks the event
     * carried by the payload as succeeded when its status is 0 and as failed
     * for any other status.
     *
     * @param commonEntity payload converted to a {@code CommonEntity}; its
     *                     event id and status are read here
     */
    @MqttSubscribe("terminal/common/+")
    public void commonSub(@Payload CommonEntity commonEntity) {
        System.out.println("common");
        final Integer eventId = commonEntity.getEventId();
        final Integer status = commonEntity.getStatus();
        // Any non-zero status is treated as a failure.
        if (status != 0) {
            mqttEventQueueService.commonEventFailStatus(eventId);
            return;
        }
        mqttEventQueueService.commonEventSucceeStatus(eventId);
    }

@MqttSubscribe("xxx")代表订阅xxx主题.
@Payload 代表可以直接将消息内容转为实体;如果不使用该注解,参数就无法接收消息内容,并且字符串类型的参数默认会被当作主题.
至于发布,在类中引入 private MqttPublisher mqttPublisher; 就行,调用 mqttPublisher.send("xxx",json); 就能直接发布.
是不是很方便.

{
  "groups": [
    {
      "name": "mqtt",
      "type": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties",
      "description": "MQTT配置.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties"
    },
    {
      "name": "mqtt.persistence",
      "type": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties$PersistenceProperties",
      "description": "MqttClientPersistence配置.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties",
      "sourceMethod": "getPersistence()"
    },
    {
      "name": "mqtt.will",
      "type": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties$WillProperties",
      "description": "遗愿相关配置.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties",
      "sourceMethod": "getWill()"
    }
  ],
  "properties": [
    {
      "name": "mqtt.automatic-reconnect",
      "type": "java.lang.Boolean",
      "description": "断开是否重新连接.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties",
      "defaultValue": true
    },
    {
      "name": "mqtt.clean-session",
      "type": "java.lang.Boolean",
      "description": "是否清除会话.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties",
      "defaultValue": true
    },
    {
      "name": "mqtt.client-id",
      "type": "java.lang.String",
      "description": "客户端ID, 默认为MqttAsyncClient.generateClientId().",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties"
    },
    {
      "name": "mqtt.connection-timeout",
      "type": "java.lang.Integer",
      "description": "连接超时时间.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties",
      "defaultValue": 30
    },
    {
      "name": "mqtt.disable",
      "type": "java.lang.Boolean",
      "description": "是否禁用.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties",
      "defaultValue": false
    },
    {
      "name": "mqtt.executor-service-timeout",
      "type": "java.lang.Integer",
      "description": "发送超时时间.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties",
      "defaultValue": 10
    },
    {
      "name": "mqtt.keep-alive-interval",
      "type": "java.lang.Integer",
      "description": "KeepAlive 周期.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties",
      "defaultValue": 30
    },
    {
      "name": "mqtt.max-reconnect-delay",
      "type": "java.lang.Integer",
      "description": "最大重连等待时间.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties",
      "defaultValue": 30
    },
    {
      "name": "mqtt.password",
      "type": "java.lang.String",
      "description": "密码.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties"
    },
    {
      "name": "mqtt.persistence.dir",
      "type": "java.lang.String",
      "description": "当 mqtt.persistence.type=file 时使用, 默认System.getProperty(\"user.dir\").",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties$PersistenceProperties",
      "defaultValue": "user.dir"
    },
    {
      "name": "mqtt.persistence.type",
      "type": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties$PersistenceType",
      "description": "类型.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties$PersistenceProperties",
      "defaultValue": "memory"
    },
    {
      "name": "mqtt.uri",
      "type": "java.lang.String[]",
      "description": "MQTT服务器地址, 可以配置多个.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties",
      "defaultValue": "tcp:\/\/127.0.0.1:1883"
    },
    {
      "name": "mqtt.username",
      "type": "java.lang.String",
      "description": "用户名.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties"
    },
    {
      "name": "mqtt.will.payload",
      "type": "java.lang.String",
      "description": "遗愿消息内容.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties$WillProperties",
      "defaultValue": "default will msg."
    },
    {
      "name": "mqtt.will.qos",
      "type": "java.lang.Integer",
      "description": "遗愿消息QOS.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties$WillProperties",
      "defaultValue": 0
    },
    {
      "name": "mqtt.will.retained",
      "type": "java.lang.Boolean",
      "description": "遗愿消息是否保留.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties$WillProperties",
      "defaultValue": false
    },
    {
      "name": "mqtt.will.topic",
      "type": "java.lang.String",
      "description": "遗愿主题.",
      "sourceType": "cn.wellsgroup.intelligent.building.cloud.device.manager.mqtt.impl.autoconfigure.MqttProperties$WillProperties"
    }
  ],
  "hints": [
    {
      "name": "mqtt.persistence.type",
      "values": [
        {
          "value": "memory",
          "description": "MemoryPersistence."
        },
        {
          "value": "file",
          "description": "MqttDefaultFilePersistence."
        }
      ]
    }
  ]
}

在项目resources/META-INF下面加入spring-configuration-metadata.json这个文件,将上面内容复制进去!sourceType记得改成自己项目包路径

<dependency>
    <groupId>com.github.tocrhz</groupId>
    <artifactId>mqtt-spring-boot-starter</artifactId>
    <version>1.0.0.RELEASE</version>
</dependency>

直接项目中引用该依赖就可以了.具体使用详情,可以进群或者加博主QQ咨询.

正文到此结束