刚开始讨论技术方案的时候,还在担忧Spring能否支持MQTT的问题,事实证明我的担忧太多了,万能的Spring怎么能不支持MQTT呢? 在MQTT Support可以找到你想要的文档。这里只有代码 :)
工程Maven结构。logback.xml放在src/main/resources中,java代码放在src/main/java中。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.dynamax</groupId>
<artifactId>spring-mqtt-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-mqtt-demo</name>
<description>The MQTT demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
MqttPahoConfig.java
package io.dynamax.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.stream.CharacterStreamReadingMessageSource;
import org.springframework.messaging.MessageHandler;
@Configuration
@IntegrationComponentScan
public class MqttPahoConfig {
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs("tcp://192.168.118.130:1883");
return factory;
}
//发布者配置
@Bean
public IntegrationFlow mqttOutFlow() {
return IntegrationFlows
.from(CharacterStreamReadingMessageSource.stdin(), e -> e.poller(Pollers.fixedDelay(1000)))
.transform(p -> p)
.handle(mqttOutbound())
.get();
}
@Bean
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("siSampleTopic");
return messageHandler;
}
//订阅者配置
@Bean
public IntegrationFlow mqttFlow() {
return IntegrationFlows.from(mqttInbound()).transform(p -> p + ", received from MQTT").handle(logger())
.get();
}
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer",
mqttClientFactory(), "siSampleTopic");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
return adapter;
}
private LoggingHandler logger() {
LoggingHandler loggingHandler = new LoggingHandler("INFO");
loggingHandler.setLoggerName("siSample");
return loggingHandler;
}
}
SpringMqttDemoApplication.java
package io.dynamax;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringMqttDemoApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(SpringMqttDemoApplication.class);
public static void main(String[] args) {
LOGGER.info("\n========================================================="
+ "\n "
+ "\n Welcome to Spring Integration! "
+ "\n "
+ "\n For more information please visit: "
+ "\n https://spring.io/projects/spring-integration "
+ "\n "
+ "\n=========================================================");
LOGGER.info("\n========================================================="
+ "\n "
+ "\n This is the MQTT Sample - "
+ "\n "
+ "\n Please enter some text and press return. The entered "
+ "\n Message will be sent to the configured MQTT topic, "
+ "\n then again immediately retrieved from the Message "
+ "\n Broker and ultimately printed to the command line. "
+ "\n "
+ "\n=========================================================");
SpringApplication.run(SpringMqttDemoApplication.class, args);
}
}
logback.xml
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder
by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
<logger name="io.dynamax" level="INFO"/>
<logger name="siSample" level="INFO"/>
</configuration>
MQTT服务器为Mosquitto。运行程序,在控制台上输入,控制台也将输出。