Skip to content

Commit

Permalink
feat: 优化topic解析
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Aug 29, 2024
1 parent 99e1569 commit 845206f
Showing 1 changed file with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.collector.ReportCollectorDataMessage;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.server.session.ChildrenDeviceSession;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -130,6 +132,26 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
});
//上报了新的物模型
createFastBuilder(MessageType.DERIVED_METADATA, "/metadata/derived");
//状态检查
createFastBuilder(MessageType.STATE_CHECK, "/message/state_check");
createFastBuilder(MessageType.STATE_CHECK_REPLY, "/message/state_check_reply");

//数采相关消息 since 2.2
createFastBuilder(MessageType.REPORT_COLLECTOR, ((message, stringBuilder) -> {
String addr = message.getHeaderOrElse(ReportCollectorDataMessage.ADDRESS, null);
stringBuilder.append("/message/collector/report");
if (StringUtils.hasText(addr)) {
if (!addr.startsWith("/")) {
stringBuilder.append('/');
}
stringBuilder.append(addr);
}
}));
createFastBuilder(MessageType.READ_COLLECTOR_DATA, "/message/collector/read");
createFastBuilder(MessageType.READ_COLLECTOR_DATA_REPLY, "/message/collector/read/reply");
createFastBuilder(MessageType.WRITE_COLLECTOR_DATA, "/message/collector/write");
createFastBuilder(MessageType.WRITE_COLLECTOR_DATA_REPLY, "/message/collector/write/reply");

}

private final DeviceRegistry registry;
Expand Down

0 comments on commit 845206f

Please sign in to comment.