Skip to content

Commit

Permalink
refactor: 优化mqtt设备接入
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Mar 7, 2024
1 parent 101aa5b commit f3a5fb2
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.community.network.mqtt.server.MqttPublishing;
import org.jetlinks.community.network.mqtt.server.MqttServer;
import org.jetlinks.community.network.utils.DeviceGatewayHelper;
import org.jetlinks.community.utils.SystemUtils;
Expand Down Expand Up @@ -112,27 +113,25 @@ private void doStart() {
conn.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
monitor.rejected();
}
return isStarted();
return true;
})
.publishOn(Schedulers.parallel())
//处理mqtt连接请求
.flatMap(this::handleConnection)
//处理认证结果
.flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
.flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()), Integer.MAX_VALUE)
.contextWrite(ReactiveLogger.start("network", mqttServer.getId()))
.flatMap(connection -> this
.handleConnection(connection)
.flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
.flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()))
.onErrorResume(err -> {
log.error(err.getMessage(), err);
return Mono.empty();
}),
Integer.MAX_VALUE)
.subscribe();

}

//处理连接,并进行认证
private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection connection) {
//内存不够了
if (SystemUtils.memoryIsOutOfWatermark()) {
//直接拒绝,响应SERVER_UNAVAILABLE,不再处理此连接
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
return Mono.empty();
}

return Mono
.justOrEmpty(connection.getAuth())
.flatMap(auth -> {
Expand Down Expand Up @@ -170,7 +169,7 @@ private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> han
//应答SERVER_UNAVAILABLE
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}))
.subscribeOn(Schedulers.parallel());
;
}

//处理认证结果
Expand All @@ -190,7 +189,7 @@ private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> hand
monitor.totalConnection(counter.sum());

sessionManager
.getSession(deviceId)
.getSession(deviceId, false)
.flatMap(_tmp -> {
//只有与创建的会话相同才移除(下线),因为有可能设置了keepOnline,
//或者设备通过其他方式注册了会话,这里断开连接不能影响到以上情况.
Expand Down Expand Up @@ -219,19 +218,21 @@ private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> hand
})
.defaultIfEmpty(newSession);
})
.flatMap(session -> Mono.fromCallable(() -> {
.mapNotNull(session->{
try {
return Tuples.of(connection.accept(), device, session.unwrap(MqttConnectionSession.class));
} catch (IllegalStateException ignore) {
//忽略错误,偶尔可能会出现网络异常,导致accept时,连接已经中断.还有其他更好的处理方式?
return null;
}
}))
})
.doOnNext(o -> {
//监控信息
monitor.connected();
monitor.totalConnection(counter.sum());
});
})
//会话empty说明注册会话失败?
.switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)));
} else {
//认证失败返回 0x04 BAD_USER_NAME_OR_PASSWORD
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
Expand All @@ -253,20 +254,16 @@ private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> hand
private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection,
DeviceOperator operator,
MqttConnectionSession session) {


return Flux
.usingWhen(Mono.just(connection),
MqttConnection::handleMessage,
MqttConnection::close)
//网关暂停或者已停止时,则不处理消息
.filter(pb -> isStarted())
.doOnNext(msg -> monitor.receivedMessage())
.publishOn(Schedulers.parallel())
//解码收到的mqtt报文
.flatMap(publishing -> this
.decodeAndHandleMessage(operator, session, publishing.getMessage(), connection)
//应答MQTT(QoS1,2的场景)
.doOnSuccess(s -> publishing.acknowledge())
.concatMap(publishing -> this
.decodeAndHandleMessage(operator, session, publishing, connection)
)
//合并遗言消息
.mergeWith(
Expand All @@ -282,30 +279,32 @@ private Mono<Void> decodeAndHandleMessage(DeviceOperator operator,
MqttConnectionSession session,
MqttMessage message,
MqttConnection connection) {
monitor.receivedMessage();

return operator
.getProtocol()
.flatMap(protocol -> protocol.getMessageCodec(getTransport()))
//解码
.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, message, registry)))
.cast(DeviceMessage.class)
.flatMap(msg -> {
.concatMap(msg -> {
//回填deviceId,有的场景协议包不能或者没有解析出deviceId,则直接使用连接对应的设备id进行填充.
if (!StringUtils.hasText(msg.getDeviceId())) {
msg.thingId(DeviceThingType.device, operator.getDeviceId());
}
return this
.handleMessage(operator, msg, connection);
return this.handleMessage(operator, msg, connection);
})
.doOnComplete(() -> {
if (message instanceof MqttPublishing) {
((MqttPublishing) message).acknowledge();
}
})
.doOnEach(ReactiveLogger.onError(err -> log.error("处理MQTT连接[{}]消息失败:{}", operator.getDeviceId(), message, err)))
.as(FluxTracer
.create(DeviceTracer.SpanName.decode(operator.getDeviceId()),
(span, msg) -> span.setAttribute(DeviceTracer.SpanKey.message, msg
.toJson()
.toJSONString())))
//发生错误不中断流
.onErrorResume((err) -> Mono.empty())
.then()
.subscribeOn(Schedulers.parallel());
.onErrorResume((err) -> {
log.error("handle mqtt message [{}] error:{}", operator.getDeviceId(), message, err);
return Mono.empty();
})
.then();
}

private Mono<DeviceMessage> handleMessage(DeviceOperator mainDevice,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.SimpleMqttMessage;
import org.jetlinks.core.server.mqtt.MqttAuth;
import org.jetlinks.core.utils.Reactors;
import reactor.core.publisher.*;

import javax.annotation.Nonnull;
Expand All @@ -41,7 +42,7 @@ class VertxMqttConnection implements MqttConnection {
private long keepAliveTimeoutMs;
@Getter
private long lastPingTime = System.currentTimeMillis();
private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true;
private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = false;
private static final MqttAuth emptyAuth = new MqttAuth() {
@Override
public String getUsername() {
Expand All @@ -53,19 +54,9 @@ public String getPassword() {
return "";
}
};
private final Sinks.Many<MqttPublishing> messageProcessor = Sinks
.many()
.multicast()
.onBackpressureBuffer(Integer.MAX_VALUE);

private final Sinks.Many<MqttSubscription> subscription = Sinks
.many()
.multicast()
.onBackpressureBuffer(Integer.MAX_VALUE);
private final Sinks.Many<MqttUnSubscription> unsubscription = Sinks
.many()
.multicast()
.onBackpressureBuffer(Integer.MAX_VALUE);
private final Sinks.Many<MqttPublishing> messageProcessor = Reactors.createMany(Integer.MAX_VALUE, false);
private final Sinks.Many<MqttSubscription> subscription = Reactors.createMany(Integer.MAX_VALUE, false);
private final Sinks.Many<MqttUnSubscription> unsubscription = Reactors.createMany(Integer.MAX_VALUE, false);


public VertxMqttConnection(MqttEndpoint endpoint) {
Expand Down Expand Up @@ -178,7 +169,7 @@ void init() {
publishing.acknowledge();
}
if (hasDownstream) {
this.messageProcessor.tryEmitNext(publishing);
this.messageProcessor.emitNext(publishing, Reactors.emitFailureHandler());
}
})
//QoS 1 PUBACK
Expand Down Expand Up @@ -211,7 +202,7 @@ void init() {
subscription.acknowledge();
}
if (hasDownstream) {
this.subscription.tryEmitNext(subscription);
this.subscription.emitNext(subscription, Reactors.emitFailureHandler());
}
})
.unsubscribeHandler(msg -> {
Expand All @@ -222,7 +213,7 @@ void init() {
unSubscription.acknowledge();
}
if (hasDownstream) {
this.unsubscription.tryEmitNext(unSubscription);
this.unsubscription.emitNext(unSubscription, Reactors.emitFailureHandler());
}
});
}
Expand Down

0 comments on commit f3a5fb2

Please sign in to comment.