昌浩MQTT全量消息接收项目-1016设备

This commit is contained in:
chuyongshuo 2025-03-07 17:09:01 +08:00
parent 999a4e7803
commit ce3162fb52
16 changed files with 1099 additions and 0 deletions

67
package.xml Normal file
View File

@ -0,0 +1,67 @@
<assembly>
<id>bin</id>
<!-- 最终打包成一个用于发布的zip文件 -->
<formats>
<format>tar.gz</format>
</formats>
<!-- Adds dependencies to zip package under lib directory -->
<dependencySets>
<dependencySet>
<!--
不使用项目的artifact第三方jar不要解压打包进zip文件的lib目录
-->
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>lib</outputDirectory>
<unpack>false</unpack>
</dependencySet>
</dependencySets>
<fileSets>
<!-- 把项目相关的说明文件打包进zip文件的根目录 -->
<fileSet>
<directory>${project.basedir}</directory>
<outputDirectory></outputDirectory>
<includes>
<include>README*</include>
<include>LICENSE*</include>
<include>NOTICE*</include>
</includes>
</fileSet>
<!-- 把项目的配置文件打包进zip文件的config目录 -->
<fileSet>
<directory>${project.basedir}/src/main/resources/</directory>
<outputDirectory>config</outputDirectory>
<!--<includes>
<include>*.*</include>
<include>*.properties</include>
</includes> -->
</fileSet>
<!-- 把项目脚本打包进zip文件的script目录 -->
<fileSet>
<directory>${project.basedir}/src/main/script/</directory>
<outputDirectory>script</outputDirectory>
<includes>
<include>*.bat</include>
<include>*.sh</include>
</includes>
</fileSet>
<!--将模板文件打包进tar包-->
<fileSet>
<directory>${project.basedir}/excel</directory>
<outputDirectory>excel</outputDirectory>
<includes>
<include>*.*</include>
</includes>
</fileSet>
<!-- 把项目自己编译出来的jar文件打包进zip文件的根目录 -->
<fileSet>
<directory>${project.build.directory}</directory>
<outputDirectory></outputDirectory>
<includes>
<include>*.jar</include>
</includes>
</fileSet>
</fileSets>
</assembly>

210
pom.xml Normal file
View File

@ -0,0 +1,210 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.goats</groupId>
<artifactId>changhaoshiye</artifactId>
<version>1.0-SNAPSHOT</version>
<description>昌昊加工工时采集/分析</description>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
<relativePath/>
</parent>
<dependencies>
<!--web启动项-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.0</version>
</dependency>
<!--测试使用-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 阿里数据库连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.11</version>
</dependency>
<!-- Mysql驱动包 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.2</version>
</dependency>
<!--mqtt客户端依赖-->
<dependency>
<groupId>org.dromara.mica-mqtt</groupId>
<artifactId>mica-mqtt-client-spring-boot-starter</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.7.1</version>
<scope>compile</scope>
</dependency>
<!-- &lt;!&ndash;JSON &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>com.alibaba</groupId>-->
<!-- <artifactId>fastjson</artifactId>-->
<!-- <version>2.0.12</version>-->
<!-- </dependency>-->
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 如果是通过parent方式继承spring-boot-starter-parent则不用此插件 -->
<!-- <plugin> <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-maven-plugin</artifactId> </plugin>-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<!-- The configuration of the plugin -->
<configuration>
<!-- Configuration of the archiver -->
<archive>
<!-- 生成的jar中不要包含pom.xml和pom.properties这两个文件 -->
<addMavenDescriptor>false</addMavenDescriptor>
<!-- Manifest specific configuration -->
<manifest>
<!-- 是否要把第三方jar放到manifest的classpath中 -->
<addClasspath>true</addClasspath>
<!-- 生成的manifest中classpath的前缀因为要把第三方jar放到lib目录下所以classpath的前 缀是lib/ -->
<classpathPrefix>lib/</classpathPrefix>
<!-- 应用的main class -->
<mainClass>com.goats.WebApplication</mainClass>
</manifest>
<manifestEntries>
<Class-Path>config</Class-Path>
</manifestEntries>
</archive>
<!-- 过滤掉不希望包含在jar中的文件 -->
<excludes>
<exclude>*.properties</exclude>
<exclude>*.yaml</exclude>
<exclude>*.config</exclude>
<exclude>config/**</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<id>copy-resources</id>
<phase>package</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/maven-archiver/resources</outputDirectory>
<resources>
<resource>
<directory>${basedir}/src/main/resources</directory>
<include>*.properties</include>
<include>*.config</include>
<filtering>true</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<!-- The configuration of maven-assembly-plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<!-- The configuration of the plugin -->
<configuration>
<!-- Specifies the configuration file of the assembly plugin -->
<descriptors>
<descriptor>${project.basedir}/package.xml</descriptor>
</descriptors>
<finalName>${project.artifactId}</finalName>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-maven-plugin</artifactId>
<version>1.3.2</version>
<configuration>
<verbose>true</verbose>
<overwrite>true</overwrite>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,18 @@
package com.goats;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@Slf4j
@MapperScan("com.goats.mapper")
@EnableScheduling //开启定时任务
public class WebApplication {
public static void main(String[] args) {
SpringApplication.run(WebApplication.class, args);
log.info("项目启动成功");
}
}

View File

@ -0,0 +1,10 @@
package com.goats.mapper;
import com.goats.pojo.MqttData;
import org.springframework.stereotype.Repository;
@Repository
public interface MqttDataMapper {
void insertMqttData(MqttData mqttData);
}

View File

@ -0,0 +1,40 @@
package com.goats.mqtt;
import org.dromara.mica.mqtt.core.client.MqttClientCreator;
import org.dromara.mica.mqtt.spring.client.event.MqttConnectedEvent;
import org.dromara.mica.mqtt.spring.client.event.MqttDisconnectEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
/**
* 示例客户端连接状态监听
*
* @author L.cm
*/
@Service
public class MqttClientConnectListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientConnectListener.class);
@Autowired
private MqttClientCreator mqttClientCreator;
@EventListener
public void onConnected(MqttConnectedEvent event) {
logger.info("MqttConnectedEvent:{}", event);
}
@EventListener
public void onDisconnect(MqttDisconnectEvent event) {
// 离线时更新重连时的密码适用于类似阿里云 mqtt clientId 连接带时间戳的方式
logger.info("MqttDisconnectEvent:{}", event);
// 在断线时更新 clientIdusernamepassword
mqttClientCreator.clientId("newClient" + System.currentTimeMillis())
.username("newUserName")
.password("newPassword");
}
}

View File

@ -0,0 +1,203 @@
package com.goats.mqtt;
import com.goats.mapper.MqttDataMapper;
import com.goats.pojo.DataPacket;
import com.goats.pojo.MqttData;
import net.sf.json.JSONObject;
import org.dromara.mica.mqtt.codec.MqttPublishMessage;
import org.dromara.mica.mqtt.core.client.IMqttClientGlobalMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class MqttClientGlobalMessageListener implements IMqttClientGlobalMessageListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientGlobalMessageListener.class);
@Autowired
private MqttDataMapper mqttDataMapper;
// 超时时间单位为毫秒
private static final long TIMEOUT = 20000; // 20秒
// 模拟接收到的数据包 Map使用设备ID和数据时间戳组合来唯一标识
//key 设备主键 value 三个包信息
private static ConcurrentHashMap<String, DataPacket> messageMap = new ConcurrentHashMap<>();
@Override
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
// 在这里处理收到的消息
try {
String str = new String(payload, StandardCharsets.UTF_8);
JSONObject jsonObject = new JSONObject(str);
logger.info("收到的消息订阅是{}", topic);
String deviceKey = jsonObject.optString("DeviceID"); //设备Id
String deviceDate = jsonObject.optString("Date"); //网关时间
Integer seq = jsonObject.optInt("Seq"); //包编号
if (topic.equals("v1/devices")&& deviceKey.equals("1016")) {
/**
* 1根据每条发送过来的消息存储的Map集合中选择合适的key 比如设备id 时间戳不同不能作为联合主键
* 2依次接收123存入每次接收到消息判断是否全部接收到3条消息如果已全部接收到执行下面逻辑插入数据库
* 3如果包顺序是按照老狗网关配置顺序不管按不按顺序一旦出现重复的SEQ消息那么直接丢弃上一个对象重新存储map集合
*
* 改造:
* 1超时处理只收到了 2 个包 3 个包丢失了缓存中的数据会一直存在
* 2重复包的处理逻辑 是丢失上个整个对象还是丢弃上次的包还是丢弃本次的包==== 丢弃本次发来的包
*/
//从map缓存对象中判断是否存在包信息
DataPacket existingPacket = messageMap.get(deviceKey);
//如果不存在包信息那么是第一条信息创建包对象
if (existingPacket == null) {
//添加包编号与JSON数据
existingPacket = new DataPacket(deviceKey, System.currentTimeMillis(), seq, str);
} else {
// 如果存在包信息检查是否重复
if (existingPacket.isSeqReceived(seq)) {
// 重复包丢弃当前包
System.out.println("重复包设备ID: " + deviceKey + ", 包编号: " + seq);
return;
} else {
// 添加新包
existingPacket.addPacket(seq, str);
}
}
//判断如何缓存中已存在三个包消息那么进行业务逻辑处理
if (existingPacket.isComplete()) {
//合并三个JSON串
String mergeJsonStrings = existingPacket.getMergedData();
logger.info("设备ID是->{},合并后JSON串->{}", deviceKey, mergeJsonStrings);
String DeviceID = jsonObject.optString("DeviceID"); //设备Id
messageMap.remove(DeviceID); // 数据保存后移除此数据包记录
logger.info("-------------------执行业务逻辑在下面-------------------------");
jsonObject = new JSONObject(mergeJsonStrings);
//数据添加到数据库
MqttData mqttData = new MqttData();
mqttData.setRid(String.valueOf(System.currentTimeMillis()));
mqttData.setId(jsonObject.optString("DeviceID"));
mqttData.setA001(jsonObject.optDouble("A001"));
mqttData.setA002(jsonObject.optDouble("A002"));
mqttData.setA003(jsonObject.optDouble("A003"));
mqttData.setA004(jsonObject.optInt("A004"));
mqttData.setA005(jsonObject.optInt("A005"));
mqttData.setA006(jsonObject.optInt("A006"));
mqttData.setA007(jsonObject.optInt("A007"));
mqttData.setA008(jsonObject.optInt("A008"));
mqttData.setA009(jsonObject.optInt("A009"));
mqttData.setA010(jsonObject.optInt("A010"));
mqttData.setA011(jsonObject.optInt("A011"));
mqttData.setA012(jsonObject.optInt("A012"));
mqttData.setA013(jsonObject.optInt("A013"));
mqttData.setA014(jsonObject.optInt("A014"));
mqttData.setA015(jsonObject.optInt("A015"));
mqttData.setA016(jsonObject.optInt("A016"));
mqttData.setA017(jsonObject.optInt("A017"));
mqttData.setA018(jsonObject.optDouble("A018"));
mqttData.setA019(jsonObject.optDouble("A019"));
mqttData.setA020(jsonObject.optDouble("A020"));
mqttData.setA021(jsonObject.optInt("A021"));
mqttData.setA022(jsonObject.optInt("A022"));
mqttData.setA023(jsonObject.optInt("A023"));
mqttData.setA024(jsonObject.optInt("A024"));
mqttData.setA025(jsonObject.optInt("A025"));
mqttData.setA026(jsonObject.optInt("A026"));
mqttData.setA027(jsonObject.optInt("A027"));
mqttData.setA028(jsonObject.optDouble("A028"));
mqttData.setA029(jsonObject.optDouble("A029"));
mqttData.setA030(jsonObject.optDouble("A030"));
mqttData.setA031(jsonObject.optInt("A031"));
mqttData.setA032(jsonObject.optInt("A032"));
mqttData.setA033(jsonObject.optInt("A033"));
mqttData.setA034(jsonObject.optInt("A034"));
mqttData.setA035(jsonObject.optInt("A035"));
mqttData.setA036(jsonObject.optInt("A036"));
mqttData.setA037(jsonObject.optInt("A037"));
mqttData.setA038(jsonObject.optInt("A038"));
mqttData.setA039(jsonObject.optString("A039"));
mqttData.setA040(jsonObject.optInt("A040"));
mqttData.setA041(jsonObject.optInt("A041"));
mqttData.setA042(jsonObject.optInt("A042"));
mqttData.setA043(jsonObject.optInt("A043"));
mqttData.setA044(jsonObject.optInt("A044"));
mqttData.setA045(jsonObject.optInt("A045"));
mqttData.setA046(jsonObject.optInt("A046"));
mqttData.setA047(jsonObject.optInt("A047"));
mqttData.setA048(jsonObject.optDouble("A048"));
mqttData.setA049(jsonObject.optDouble("A049"));
mqttData.setA050(jsonObject.optDouble("A050"));
mqttData.setA051(jsonObject.optDouble("A051"));
mqttData.setA052(jsonObject.optDouble("A052"));
mqttData.setA053(jsonObject.optDouble("A053"));
mqttData.setA054(jsonObject.optDouble("A054"));
mqttData.setA055(jsonObject.optDouble("A055"));
mqttData.setA056(jsonObject.optDouble("A056"));
mqttData.setA057(jsonObject.optDouble("A057"));
mqttData.setA058(jsonObject.optDouble("A058"));
mqttData.setA059(jsonObject.optDouble("A059"));
mqttData.setA060(jsonObject.optDouble("A060"));
mqttData.setA061(jsonObject.optDouble("A061"));
mqttData.setA062(jsonObject.optDouble("A062"));
mqttData.setA063(jsonObject.optDouble("A063"));
mqttData.setA064(jsonObject.optDouble("A064"));
mqttData.setA065(jsonObject.optDouble("A065"));
mqttData.setA066(jsonObject.optDouble("A066"));
mqttData.setA067(jsonObject.optDouble("A067"));
mqttData.setA068(jsonObject.optDouble("A068"));
mqttData.setA069(jsonObject.optDouble("A069"));
mqttData.setA070(jsonObject.optDouble("A070"));
mqttData.setA071(jsonObject.optDouble("A071"));
mqttData.setA072(jsonObject.optDouble("A072"));
mqttData.setA073(jsonObject.optDouble("A073"));
mqttData.setA074(jsonObject.optDouble("A074"));
mqttData.setA075(jsonObject.optDouble("A075"));
mqttData.setA076(jsonObject.optDouble("A076"));
mqttData.setA077(jsonObject.optDouble("A077"));
mqttData.setA078(jsonObject.optDouble("A078"));
mqttData.setA079(jsonObject.optDouble("A079"));
mqttData.setA080(jsonObject.optDouble("A080"));
mqttData.setA081(jsonObject.optInt("A081"));
mqttData.setA082(jsonObject.optInt("A082"));
mqttData.setA083(jsonObject.optInt("A083"));
mqttData.setA084(jsonObject.optInt("A084"));
mqttData.setA085(jsonObject.optInt("A085"));
mqttDataMapper.insertMqttData(mqttData);
} else {
logger.info("设备ID是->{},不满足三个包信息,进行PUT操作,保持对象信息是{}", deviceKey, existingPacket);
messageMap.put(deviceKey, existingPacket);
System.out.println("messageMap = " + messageMap);
return;
}
}else {
logger.info("次信息是1015设备不做处理{}",jsonObject);
}
} catch (Exception e) {
e.getMessage();
}
}
@Scheduled(fixedRate = 5000)
public void checkTimeout() {
long currentTime = System.currentTimeMillis();
logger.info("定时任务正在执行,{}",messageMap);
for (Iterator<Map.Entry<String, DataPacket>> it = messageMap.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<String, DataPacket> entry = it.next();
DataPacket packet = entry.getValue();
long timeDiff = currentTime - packet.getLastUpdateTime();
logger.info("定时任务正在执行设备ID: {}, 时间差: {} 毫秒", packet.getDeviceId(), timeDiff);
if (timeDiff > TIMEOUT) {
it.remove();
logger.info("定时任务正在执行超时移除设备ID: {}", packet.getDeviceId());
}
}
}
}

View File

@ -0,0 +1,32 @@
package com.goats.mqtt;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.spring.client.MqttClientSubscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
@Service
public class MqttClientSubscribeListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientSubscribeListener.class);
@MqttClientSubscribe("v1/devices")
public void subQos0(String topic, byte[] payload) {
logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
/*@MqttClientSubscribe(value = "/qos1/#", qos = MqttQoS.QOS1)
public void subQos1(String topic, byte[] payload) {
logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
@MqttClientSubscribe("/sys/${productKey}/${deviceName}/thing/sub/register")
public void thingSubRegister(String topic, byte[] payload) {
// 1.3.8 开始支持@MqttClientSubscribe 注解支持 ${} 变量替换会默认替换成 +
// 注意mica-mqtt 会先从 Spring boot 配置中替换参数 ${}如果存在配置会优先被替换
logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
*/
}

View File

@ -0,0 +1,111 @@
package com.goats.pojo;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.util.Arrays;
/**
* 缓存数据结构用于存储每个设备的数据包
*/
public class DataPacket {
private String deviceId; // 设备ID
private String[] dataParts = new String[3]; // 用来存储三部分数据
private boolean[] receivedSeqs = new boolean[3]; // 记录哪些包已经收到
private long lastUpdateTime; // 最后一次更新时间
public DataPacket(String deviceId, Long timestamp, int seq, String data) {
this.deviceId = deviceId;
addPacket(seq, data);
this.lastUpdateTime = System.currentTimeMillis();
}
// 添加数据包
public void addPacket(int seq, String data) {
if (seq >= 1 && seq <= 3) {
this.dataParts[seq - 1] = data;
this.receivedSeqs[seq - 1] = true;
this.lastUpdateTime = System.currentTimeMillis();
}
}
// 判断是否已经接收到所有数据包
public boolean isComplete() {
for (boolean received : receivedSeqs) {
if (!received) {
return false;
}
}
return true;
}
// 判断是否已经接收到某个包
public boolean isSeqReceived(int seq) {
return seq >= 1 && seq <= 3 && receivedSeqs[seq - 1];
}
/**
* 获取合并后的数据
* @return 合并后的 JSON 字符串
* @throws IOException 如果 JSON 解析失败
*/
public String getMergedData() throws IOException {
// 调用 mergeJsonStrings 方法合并 JSON 数据
return mergeJsonStrings(dataParts[0], dataParts[1], dataParts[2]);
}
// 获取最后一次更新时间
public long getLastUpdateTime() {
return lastUpdateTime;
}
/**
* 合并多个 JSON 字符串
* @param jsonStrings 多个 JSON 字符串
* @return 合并后的 JSON 字符串
* @throws IOException 如果 JSON 解析失败
*/
public static String mergeJsonStrings(String... jsonStrings) throws IOException {
// 创建 ObjectMapper 实例
ObjectMapper mapper = new ObjectMapper();
// 创建一个空的 ObjectNode 来存放合并后的内容
ObjectNode mergedNode = mapper.createObjectNode();
// 遍历传入的所有 JSON 字符串
for (String jsonString : jsonStrings) {
// 解析当前 JSON 字符串为 JsonNode
JsonNode jsonNode = mapper.readTree(jsonString);
// 合并字段
jsonNode.fieldNames().forEachRemaining(fieldName -> {
// 忽略重复的字段 seq, Date DeviceID只保留一个
if (!fieldName.equals("seq") && !fieldName.equals("Date")) {
mergedNode.set(fieldName, jsonNode.get(fieldName));
}
});
}
// 返回合并后的 JSON 字符串
return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(mergedNode);
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
@Override
public String toString() {
return "DataPacket{" +
"deviceId='" + deviceId + '\'' +
", dataParts=" + Arrays.toString(dataParts) +
", receivedSeqs=" + Arrays.toString(receivedSeqs) +
'}';
}
}

View File

@ -0,0 +1,102 @@
package com.goats.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* mqtt原始数据
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MqttData {
private String Rid;
private String id;
private Double A001;
private Double A002;
private Double A003;
private Integer A004;
private Integer A005;
private Integer A006;
private Integer A007;
private Integer A008;
private Integer A009;
private Integer A010;
private Integer A011;
private Integer A012;
private Integer A013;
private Integer A014;
private Integer A015;
private Integer A016;
private Integer A017;
private Double A018;
private Double A019;
private Double A020;
private Integer A021;
private Integer A022;
private Integer A023;
private Integer A024;
private Integer A025;
private Integer A026;
private Integer A027;
private Double A028;
private Double A029;
private Double A030;
private Integer A031;
private Integer A032;
private Integer A033;
private Integer A034;
private Integer A035;
private Integer A036;
private Integer A037;
private Integer A038;
private String A039;
private Integer A040;
private Integer A041;
private Integer A042;
private Integer A043;
private Integer A044;
private Integer A045;
private Integer A046;
private Integer A047;
private Double A048;
private Double A049;
private Double A050;
private Double A051;
private Double A052;
private Double A053;
private Double A054;
private Double A055;
private Double A056;
private Double A057;
private Double A058;
private Double A059;
private Double A060;
private Double A061;
private Double A062;
private Double A063;
private Double A064;
private Double A065;
private Double A066;
private Double A067;
private Double A068;
private Double A069;
private Double A070;
private Double A071;
private Double A072;
private Double A073;
private Double A074;
private Double A075;
private Double A076;
private Double A077;
private Double A078;
private Double A079;
private Double A080;
private Integer A081;
private Integer A082;
private Integer A083;
private Integer A084;
private Integer A085;
}

View File

@ -0,0 +1,53 @@
package com.goats.utils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
/**
* 合并json串方法
*/
public class MergeJson {
public static void main(String[] args) {
// 示例多个JSON字符串
String jsonString1 = "{ \"DeviceID\": \"1016\", \"seq\": \"12345\", \"Date\": \"2025-02-11\", \"A049\": 12121122.1, \"A050\": 12121122.11 }";
String jsonString2 = "{ \"DeviceID\": \"1016\", \"seq\": \"67890\", \"Date\": \"2025-02-12\", \"A051\": 12121122.11, \"A052\": 12121122.11 }";
String jsonString3 = "{ \"DeviceID\": \"1016\", \"seq\": \"98765\", \"Date\": \"2025-02-13\", \"A053\": 12121122.22, \"A054\": 12121122.33 }";
}
/**
* 合并多个json串
* @param jsonStrings
* @return
* @throws IOException
*/
public static String mergeJsonStrings2(String... jsonStrings) throws IOException {
// 创建 ObjectMapper 实例
ObjectMapper mapper = new ObjectMapper();
// 创建一个空的 ObjectNode 来存放合并后的内容
ObjectNode mergedNode = mapper.createObjectNode();
// 遍历传入的所有 JSON 字符串
for (String jsonString : jsonStrings) {
// 解析当前 JSON 字符串为 JsonNode
JsonNode jsonNode = mapper.readTree(jsonString);
// 合并字段
jsonNode.fieldNames().forEachRemaining(fieldName -> {
// 忽略重复的字段 seq, Date DeviceID只保留一个
if (!fieldName.equals("seq") && !fieldName.equals("Date")) {
mergedNode.set(fieldName, jsonNode.get(fieldName));
}
});
}
// 返回合并后的 JSON 字符串
return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(mergedNode);
}
}

View File

@ -0,0 +1,23 @@
################ 开发环境配置 ################
# 端口
server:
port: 9020
spring:
# 数据源配置
datasource:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://8.130.165.100:33060/goats_ch?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
username: root
password: goats@2023mysql
# 是否打开sql监控台 (生产环境请务必关闭此选项)
druid:
stat-view-servlet:
enabled: true
web-stat-filter:
enabled: true
filter:
config:
enabled: true

View File

@ -0,0 +1,21 @@
################ 生产环境配置 ################
# 端口
server:
port: 9020
spring:
# 数据源配置
datasource:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://192.168.15.20:3306/goats_chupdate?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: 123456
# 是否打开sql监控台 (生产环境请务必关闭此选项)
druid:
stat-view-servlet:
enabled: false
web-stat-filter:
enabled: false
filter:
config:
enabled: false

View File

@ -0,0 +1,42 @@
spring:
# 应用名称
application.name: changhaoshiye
profiles:
# 启动环境加载同时加载两个的写法common,prod 优先级左>右
# active: dev
# active: test
active: prod
mqtt:
client:
enabled: true # 是否开启客户端默认true
# ip: 127.0.0.1 # 连接的服务端 ip 默认127.0.0.1
ip: 192.168.15.20 # 连接的服务端 ip 默认127.0.0.1
port: 1883 # 端口默认1883
name: Java-Mqtt-Client # 名称默认Mica-Mqtt-Client
client-id: 000001 # 客户端Id非常重要一般为设备 sn不可重复
user-name: admin # 认证的用户名
password: public # 认证的密码
global-subscribe: # 全局订阅的 topic可被全局监听到保留 session 停机重启依然可以接受到消息。2.2.9开始支持)
timeout: 5 # 超时时间单位默认5秒
reconnect: true # 是否重连默认true
re-interval: 5000 # 重连时间,默认 5000 毫秒
version: mqtt_3_1_1 # mqtt 协议版本,可选 MQTT_3_1、mqtt_3_1_1、mqtt_5默认mqtt_3_1_1
read-buffer-size: 8KB # 接收数据的 buffer size默认8k
max-bytes-in-message: 10MB # 消息解析最大 bytes 长度默认10M
keep-alive-secs: 60 # keep-alive 时间,单位:秒
clean-session: true # mqtt clean session默认true
ssl:
enabled: false # 是否开启 ssl 认证2.1.0 开始支持双向认证
keystore-path: # 可选参数ssl 双向认证 keystore 目录,支持 classpath:/ 路径。
keystore-pass: # 可选参数ssl 双向认证 keystore 密码
truststore-path: # 可选参数ssl 双向认证 truststore 目录,支持 classpath:/ 路径。
truststore-pass: # 可选参数ssl 双向认证 truststore 密码
# mybatis配置
mybatis:
#标注待解析的mapper的xml文件位置
mapper-locations: classpath*:mapper/*.xml
#标注实体类位置
type-aliases-package: com.goats.pojo
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

View File

@ -0,0 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration xmlns="http://ch.qos.logback/xml/ns/logback"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://ch.qos.logback/xml/ns/logback https://raw.githubusercontent.com/enricopulatzo/logback-XSD/master/src/main/xsd/logback.xsd">
<!-- 定义日志打印的根目录,不同的项目之间一般只用改这个属性值就够了 -->
<property name="LOG_HOME" value="/home/goats/work/changhaoshiye/logs" />
<!-- 输出到控制台 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- 输出的格式 -->
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%-25d{yyyy-MM-dd HH:mm:ss.SSS} %green(%-5level) %boldBlue(-->) %msg%n</pattern>
<!-- <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%thread] --> <!-- %logger{50}: %msg%n</pattern> -->
</encoder>
</appender>
<!-- 文件输出 -->
<appender name="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 配置滚动的策略 -->
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<!-- 日志名称的格式LOG_HOME/年_月/年_月_日/年_月_日_时(索引).log -->
<fileNamePattern>${LOG_HOME}/%d{yyyy_MM}/%d{yyyy_MM_dd}/%d{yyyy_MM_dd_HH}(%i).log</fileNamePattern>
<!-- 单log文件最大大小 -->
<maxFileSize>1MB</maxFileSize>
<!-- 保存的最长时间:天数 -->
<maxHistory>3650</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%thread] --> : %msg%n</pattern>
</encoder>
</appender>
<!-- 相当于logger元素只是name值已经确定为root了 -->
<root level="INFO">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
</root>
<!-- 可以设置具体到某个包的日志打印规则 -->
<!-- 注意: level属性也可以直接写在logger上 <logger name="ws.log.logback.LogbackTest"
additivity="false" level="INFO"> <appender-ref ref="STDOUT" /> </logger> -->
<!-- <logger name="com.cr.utils.FC" additivity="false">
<level value="DEBUG" />
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
</logger> -->
</configuration>

View File

@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.goats.mapper.MqttDataMapper">
<insert id="insertMqttData">
insert into s_gateway(Rid, id, A001, A002, A003, A004, A005, A006, A007, A008, A009, A010,
A011, A012, A013, A014, A015, A016, A017, A018, A019, A020, A021, A022,
A023, A024, A025, A026, A027, A028, A029, A030, A031, A032, A033, A034,
A035, A036, A037, A038, A039, A040, A041, A042, A043, A044, A045, A046,
A047, A048, A049, A050, A051, A052, A053, A054, A055, A056, A057, A058,
A059, A060, A061, A062, A063, A064, A065, A066, A067, A068, A069, A070,
A071, A072, A073, A074, A075, A076, A077, A078, A079, A080, A081, A082,
A083, A084, A085)
values (#{Rid}, #{id}, #{A001}, #{A002}, #{A003}, #{A004}, #{A005}, #{A006}, #{A007},
#{A008}, #{A009}, #{A010}, #{A011}, #{A012}, #{A013}, #{A014}, #{A015}, #{A016},
#{A017}, #{A018}, #{A019}, #{A020}, #{A021}, #{A022}, #{A023}, #{A024}, #{A025},
#{A026}, #{A027}, #{A028}, #{A029}, #{A030}, #{A031}, #{A032}, #{A033}, #{A034},
#{A035}, #{A036}, #{A037}, #{A038}, #{A039}, #{A040}, #{A041}, #{A042}, #{A043},
#{A044}, #{A045}, #{A046}, #{A047}, #{A048}, #{A049}, #{A050}, #{A051}, #{A052},
#{A053}, #{A054}, #{A055}, #{A056}, #{A057}, #{A058}, #{A059}, #{A060}, #{A061},
#{A062}, #{A063}, #{A064}, #{A065}, #{A066}, #{A067}, #{A068}, #{A069}, #{A070},
#{A071}, #{A072}, #{A073}, #{A074}, #{A075}, #{A076}, #{A077}, #{A078}, #{A079},
#{A080}, #{A081}, #{A082}, #{A083}, #{A084}, #{A085});
</insert>
</mapper>

View File

@ -0,0 +1,88 @@
package com.goats;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ProcessTimeCalculator {
public static void main(String[] args) {
String filePath = "D:\\GOATS\\昌昊项目资料\\sql\\s_data_202502271336.txt";
List<ProductionRecord> records = new ArrayList<>();
List<ProcessTimeResult> results = new ArrayList<>();
// 读取数据
try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
String line;
boolean isHeader = true;
while ((line = br.readLine()) != null) {
if (isHeader) { // 跳过表头
isHeader = false;
continue;
}
String[] fields = line.split("\\|");
if (fields.length < 8) continue;
ProductionRecord record = new ProductionRecord();
record.rid = Long.parseLong(fields[1].trim());
record.id = Integer.parseInt(fields[2].trim());
record.a002CycleTime = Double.parseDouble(fields[4].trim().replace(",", "")); //循环时间
record.a003RunningTime = Double.parseDouble(fields[5].trim().replace(",", ""));//运行时间
record.a005Quantity = Integer.parseInt(fields[7].trim().replace(",", ""));//加工数量
records.add(record);
}
} catch (IOException e) {
e.printStackTrace();
}
// 计算加工工时
int previousQuantity = 0;
double previousRunningTime = 0;
for (ProductionRecord record : records) {
if (record.a005Quantity > previousQuantity) {
if (previousQuantity != 0) { // 忽略初始值
double cycleTime = record.a003RunningTime - previousRunningTime;
results.add(new ProcessTimeResult(
record.a005Quantity,
cycleTime,
record.a002CycleTime
));
}
previousQuantity = record.a005Quantity;
previousRunningTime = record.a003RunningTime;
}
}
// 打印结果
System.out.println("零件编号 | 加工工时 (分钟) | A002循环时间 (秒)");
for (ProcessTimeResult result : results) {
System.out.printf("%-8d | %-12.2f | %-15.2f%n",
result.partNumber,
result.processTime,
result.a002CycleTime
);
}
}
static class ProductionRecord {
long rid;
int id;
double a002CycleTime;
double a003RunningTime;
int a005Quantity;
}
static class ProcessTimeResult {
int partNumber;
double processTime;
double a002CycleTime;
ProcessTimeResult(int partNumber, double processTime, double a002CycleTime) {
this.partNumber = partNumber;
this.processTime = processTime;
this.a002CycleTime = a002CycleTime;
}
}
}