Skip to content

Commit

Permalink
TrimNull
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Dec 24, 2023
1 parent c6f81d5 commit 78e12c8
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 71 deletions.
56 changes: 29 additions & 27 deletions NewLife.MQTT/Handlers/IMqttHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,27 @@ public interface IMqttHandler
/// <summary>处理消息</summary>
/// <param name="message">消息</param>
/// <returns></returns>
MqttMessage Process(MqttMessage message);
MqttMessage? Process(MqttMessage message);

/// <summary>发布消息</summary>
/// <param name="topic">主题</param>
/// <param name="data">消息数据</param>
/// <param name="qos">服务质量</param>
/// <returns></returns>
Task<MqttIdMessage> PublishAsync(String topic, Object data, QualityOfService qos = QualityOfService.AtMostOnce);
Task<MqttIdMessage?> PublishAsync(String topic, Object data, QualityOfService qos = QualityOfService.AtMostOnce);

/// <summary>发布消息</summary>
/// <param name="topic">主题</param>
/// <param name="data">消息数据</param>
/// <param name="qos">服务质量</param>
/// <param name="AllowExchange">允许消息交换</param>
/// <returns></returns>
Task<MqttIdMessage> PublishAsync(String topic, Object data, Boolean AllowExchange, QualityOfService qos = QualityOfService.AtMostOnce);
Task<MqttIdMessage?> PublishAsync(String topic, Object data, Boolean AllowExchange, QualityOfService qos = QualityOfService.AtMostOnce);

/// <summary>发布消息</summary>
/// <param name="message">消息</param>
/// <returns></returns>
Task<MqttIdMessage> PublishAsync(PublishMessage message);
Task<MqttIdMessage?> PublishAsync(PublishMessage message);

/// <summary>关闭连接。网络连接被关闭时触发</summary>
/// <param name="reason"></param>
Expand All @@ -47,28 +47,28 @@ public interface IMqttHandler
public class MqttHandler : IMqttHandler, ITracerFeature, ILogFeature
{
/// <summary>网络会话</summary>
public INetSession Session { get; set; }
public INetSession Session { get; set; } = null!;

/// <summary>消息交换机</summary>
public MqttExchange Exchange { get; set; }
public MqttExchange? Exchange { get; set; }

#region 接收消息
/// <summary>处理消息</summary>
/// <param name="message">消息</param>
/// <returns></returns>
public virtual MqttMessage Process(MqttMessage message)
public virtual MqttMessage? Process(MqttMessage message)
{
MqttMessage rs = null;
MqttMessage? rs = null;
rs = message.Type switch
{
MqttType.Connect => OnConnect(message as ConnectMessage),
MqttType.Publish => OnPublish(message as PublishMessage),
MqttType.PubRel => OnPublishRelease(message as PubRel),
MqttType.PubRec => OnPublishReceive(message as PubRec),
MqttType.Subscribe => OnSubscribe(message as SubscribeMessage),
MqttType.UnSubscribe => OnUnsubscribe(message as UnsubscribeMessage),
MqttType.PingReq => OnPing(message as PingRequest),
MqttType.Disconnect => OnDisconnect(message as DisconnectMessage),
MqttType.Connect => OnConnect((message as ConnectMessage)!),
MqttType.Publish => OnPublish((message as PublishMessage)!),
MqttType.PubRel => OnPublishRelease((message as PubRel)!),
MqttType.PubRec => OnPublishReceive((message as PubRec)!),
MqttType.Subscribe => OnSubscribe((message as SubscribeMessage)!),
MqttType.UnSubscribe => OnUnsubscribe((message as UnsubscribeMessage)!),
MqttType.PingReq => OnPing((message as PingRequest)!),
MqttType.Disconnect => OnDisconnect((message as DisconnectMessage)!),
_ => null,
};
return rs;
Expand All @@ -77,7 +77,7 @@ public virtual MqttMessage Process(MqttMessage message)
/// <summary>客户端连接时</summary>
/// <param name="message">消息</param>
/// <returns></returns>
protected virtual ConnAck OnConnect(ConnectMessage message)
protected virtual ConnAck? OnConnect(ConnectMessage message)
{
Exchange?.Add(Session.ID, this);

Expand All @@ -87,7 +87,7 @@ protected virtual ConnAck OnConnect(ConnectMessage message)
/// <summary>客户端断开时</summary>
/// <param name="message">消息</param>
/// <returns></returns>
protected virtual MqttMessage OnDisconnect(DisconnectMessage message)
protected virtual MqttMessage? OnDisconnect(DisconnectMessage message)
{
Exchange?.Remove(Session.ID);

Expand All @@ -97,12 +97,12 @@ protected virtual MqttMessage OnDisconnect(DisconnectMessage message)
/// <summary>收到心跳时</summary>
/// <param name="message">消息</param>
/// <returns></returns>
protected virtual PingResponse OnPing(PingRequest message) => new();
protected virtual PingResponse? OnPing(PingRequest message) => new();

/// <summary>收到发布消息时</summary>
/// <param name="message">消息</param>
/// <returns></returns>
protected virtual MqttIdMessage OnPublish(PublishMessage message)
protected virtual MqttIdMessage? OnPublish(PublishMessage message)
{
Exchange?.Publish(message);

Expand Down Expand Up @@ -169,10 +169,11 @@ protected virtual UnsubAck OnUnsubscribe(UnsubscribeMessage message)
/// <param name="data">消息数据</param>
/// <param name="qos">服务质量</param>
/// <returns></returns>
public async Task<MqttIdMessage> PublishAsync(String topic, Object data, QualityOfService qos = QualityOfService.AtMostOnce)
public async Task<MqttIdMessage?> PublishAsync(String topic, Object data, QualityOfService qos = QualityOfService.AtMostOnce)
{
var pk = data as Packet;
if (pk == null && data != null) pk = Serialize(data);
if (pk == null) throw new ArgumentNullException(nameof(data));

var message = new PublishMessage
{
Expand All @@ -190,10 +191,11 @@ public async Task<MqttIdMessage> PublishAsync(String topic, Object data, Quality
/// <param name="qos">服务质量</param>
/// <param name="AllowExchange">允许消息交换</param>
/// <returns></returns>
public async Task<MqttIdMessage> PublishAsync(String topic, Object data, Boolean AllowExchange, QualityOfService qos = QualityOfService.AtMostOnce)
public async Task<MqttIdMessage?> PublishAsync(String topic, Object data, Boolean AllowExchange, QualityOfService qos = QualityOfService.AtMostOnce)
{
var pk = data as Packet;
if (pk == null && data != null) pk = Serialize(data);
if (pk == null) throw new ArgumentNullException(nameof(data));

var message = new PublishMessage
{
Expand All @@ -208,7 +210,7 @@ public async Task<MqttIdMessage> PublishAsync(String topic, Object data, Boolean
/// <summary>发布消息</summary>
/// <param name="message">消息</param>
/// <returns></returns>
public async Task<MqttIdMessage> PublishAsync(PublishMessage message)
public async Task<MqttIdMessage?> PublishAsync(PublishMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

Expand All @@ -229,7 +231,7 @@ public async Task<MqttIdMessage> PublishAsync(PublishMessage message)
/// <param name="msg">消息</param>
/// <param name="waitForResponse">是否等待响应</param>
/// <returns></returns>
protected virtual async Task<MqttMessage> SendAsync(MqttMessage msg, Boolean waitForResponse = true)
protected virtual async Task<MqttMessage?> SendAsync(MqttMessage msg, Boolean waitForResponse = true)
{
if (msg is MqttIdMessage idm && idm.Id == 0 && (msg.Type != MqttType.Publish || msg.QoS > 0))
idm.Id = (UInt16)Interlocked.Increment(ref g_id);
Expand Down Expand Up @@ -291,14 +293,14 @@ protected virtual Packet Serialize(Object data)

#region 日志
/// <summary>链路追踪</summary>
public ITracer Tracer { get; set; }
public ITracer? Tracer { get; set; }

/// <summary>日志</summary>
public ILog Log { get; set; }
public ILog Log { get; set; } = null!;

/// <summary>写日志</summary>
/// <param name="format"></param>
/// <param name="args"></param>
public void WriteLog(String format, params Object[] args) => Log?.Info($"[MqttServer]{format}", args);
public void WriteLog(String format, params Object?[] args) => Log?.Info($"[MqttServer]{format}", args);
#endregion
}
12 changes: 6 additions & 6 deletions NewLife.MQTT/Messaging/ConnectMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,22 @@ public sealed class ConnectMessage : MqttMessage
public UInt16 KeepAliveInSeconds { get; set; }

/// <summary>用户名</summary>
public String Username { get; set; }
public String? Username { get; set; }

/// <summary>密码</summary>
public String Password { get; set; }
public String? Password { get; set; }

/// <summary>客户端标识。必填项</summary>
public String ClientId { get; set; }
public String? ClientId { get; set; }

/// <summary>遗嘱主题</summary>
public String WillTopicName { get; set; }
public String? WillTopicName { get; set; }

/// <summary>遗嘱消息</summary>
public Packet WillMessage { get; set; }
public Packet? WillMessage { get; set; }

/// <summary>属性集合。MQTT5.0</summary>
public IDictionary<Byte, UInt32> Properties { get; set; }
public IDictionary<Byte, UInt32>? Properties { get; set; }
#endregion

#region 构造
Expand Down
10 changes: 5 additions & 5 deletions NewLife.MQTT/Messaging/MqttMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,26 +186,26 @@ protected Byte[] ReadData(Stream stream)
/// <summary>写字符串</summary>
/// <param name="stream"></param>
/// <param name="value"></param>
protected void WriteString(Stream stream, String value) => WriteData(stream, value?.GetBytes());
protected void WriteString(Stream stream, String? value) => WriteData(stream, value?.GetBytes());

/// <summary>写字节数组</summary>
/// <param name="stream"></param>
/// <param name="buf"></param>
protected void WriteData(Stream stream, Byte[] buf)
protected void WriteData(Stream stream, Byte[]? buf)
{
var len = buf == null ? 0 : buf.Length;
stream.Write(((UInt16)len).GetBytes(false));
if (len > 0) stream.Write(buf);
if (len > 0 && buf != null) stream.Write(buf);
}

/// <summary>写字节数组</summary>
/// <param name="stream"></param>
/// <param name="pk"></param>
protected void WriteData(Stream stream, Packet pk)
protected void WriteData(Stream stream, Packet? pk)
{
var len = pk == null ? 0 : pk.Total;
stream.Write(((UInt16)len).GetBytes(false));
if (len > 0) pk.CopyTo(stream);
if (len > 0 && pk != null) pk.CopyTo(stream);
}
#endregion
}
2 changes: 1 addition & 1 deletion NewLife.MQTT/Messaging/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class Subscription : IEquatable<Subscription>
public QualityOfService QualityOfService { get; }

/// <summary>消息处理方法</summary>
public Action<PublishMessage> Callback { get; set; }
public Action<PublishMessage>? Callback { get; set; }
#endregion

#region 构造
Expand Down
Loading

0 comments on commit 78e12c8

Please sign in to comment.