diff --git a/Doc/Demo.png b/Doc/Demo.png new file mode 100644 index 0000000..2c0c0b3 Binary files /dev/null and b/Doc/Demo.png differ diff --git a/NewLife.MqttServer/NewLife.MqttServer.csproj b/NewLife.MqttServer/NewLife.MqttServer.csproj index f38dfb5..78099b3 100644 --- a/NewLife.MqttServer/NewLife.MqttServer.csproj +++ b/NewLife.MqttServer/NewLife.MqttServer.csproj @@ -22,7 +22,7 @@ - + diff --git a/README.md b/README.md index 0a684d9..14dfec9 100644 --- a/README.md +++ b/README.md @@ -7,22 +7,22 @@ ![Nuget (with prereleases)](https://img.shields.io/nuget/vpre/newlife.mqtt?label=dev%20nuget&logo=nuget) MQTT协议是物联网领域最流行的通信协议! -NewLife.MQTT包含了MQTT的完整实现,并实现了客户端MqttClient,以及服务端MqttServer。 -其中MqttServer仅实现基本网络框架,支持消息收发,完整的消息交换功能位于商用版IoT平台NewLife.IoT中。 +`NewLife.MQTT`包含了MQTT的完整实现,并实现了客户端`MqttClient`,以及服务端`MqttServer`。 +其中MqttServer仅实现基本网络框架,支持消息收发,完整的消息交换功能位于商用版IoT平台[FIoT](https://newlifex.com/iot/fiot)中。 ## MQTT协议 最流行的物联网通信协议MQTT,包括客户端、服务端和Web管理平台。 提供订阅/发布模式,更为简约、轻量,易于使用,针对受限环境(带宽低、网络延迟高、网络通信不稳定),可以简单概括为物联网打造,官方总结特点如下: -1.使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。 -2.对负载内容屏蔽的消息传输。 -3.使用 TCP/IP 提供网络连接。 -4.有三种消息发布服务质量: +1. 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。 +2. 对负载内容屏蔽的消息传输。 +3. 使用 TCP/IP 提供网络连接。 +4. 有三种消息发布服务质量: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。 “至少一次”,确保消息到达,但消息重复可能会发生。 “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。 -5.小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。 -6.使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。 +5. 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。 +6. 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。 ## MQTT 发布与订阅 发布时,指定消息Qos,broker保存的消息包含了Qos; @@ -39,6 +39,119 @@ NewLife.MQTT包含了MQTT的完整实现,并实现了客户端MqttClient,以 订阅Qos=2,broker推送Qos=2消息,客户端先回PubRec,broker再次发送PubRel,客户端答复PubComp,消息才算消费完成; 发布Qos=2消息时,双重确认流程不需要等消费端在线,仅限于发布者与broker之间即可完成。 +## 快速尝鲜 +打开源码解决方案,把Test设为启动项目,启动即可。 +默认先后启动TestServer和TestClient。 +![Demo](Doc/demo.png) + +## 服务端 +Nuget引用`NewLife.MQTT`,使用以下代码启动服务端: +```csharp +var services = ObjectContainer.Current; +services.AddSingleton(XTrace.Log); +services.AddTransient(); +services.AddSingleton(); + +var server = new MqttServer +{ + Port = 1883, + ServiceProvider = services.BuildServiceProvider(), + + Log = XTrace.Log, + SessionLog = XTrace.Log, +}; +server.Start(); +``` +通过指定端口1883,默认处理器`MqttHandler`,默认交换机`MqttExchange`,启动服务端。 + +## 客户端 +Nuget引用`NewLife.MQTT`,使用以下代码连接服务端: +```csharp +var client = new MqttClient +{ + Log = XTrace.Log, + Server = "tcp://127.0.0.1:1883", + //UserName = "admin", + //Password = "admin", + ClientId = Guid.NewGuid() + "", +}; + +await client.ConnectAsync(); + +// 订阅“/test”主题 +var rt = await client.SubscribeAsync("/test", (e) => +{ + XTrace.WriteLine("收到消息:" + "/test/# =>" + e.Topic + ":" + e.Payload.ToStr()); +}); + +// 每2秒向“/test”主题发布一条消息 +while (true) +{ + try + { + var msg = "学无先后达者为师" + Rand.NextString(8); + await client.PublishAsync("/test", msg); + } + catch (Exception ex) + { + XTrace.WriteException(ex); + } + await Task.Delay(2000); +} +``` +客户端连接服务端有几个要素:`服务端地址`、`用户名`、`密码`、`客户端标识`,然后通过`ConnectAsync`连接服务端。 +客户端可以是消费者角色,通过`SubscribeAsync`订阅指定Topic。 +客户端也可以是生产者角色,通过`PublishAsync`发布消息到指定Topic。 + +## 自定义服务端 +需要在服务端处理客户端连接和消息交互逻辑时,就需要自定义服务端。例如IoT平台,在收到设备上报MQTT数据以后,直接接收落库,而不需要再次消费。 +自定义处理器示例如下: +```csharp +private class MyHandler : MqttHandler +{ + private readonly ILog _log; + + public MyHandler(ILog log) => _log = log; + + protected override ConnAck OnConnect(ConnectMessage message) + { + _log.Info("客户端[{0}]连接 user={1} pass={2} clientId={3}", Session.Remote.EndPoint, message.Username, message.Password, message.ClientId); + + return base.OnConnect(message); + } + + protected override MqttMessage OnDisconnect(DisconnectMessage message) + { + _log.Info("客户端[{0}]断开", Session.Remote); + + return base.OnDisconnect(message); + } + + protected override MqttIdMessage OnPublish(PublishMessage message) + { + _log.Info("客户端[{0}]发布[{1}:qos={2}]: {3}", Session.Remote, message.Topic, (Int32)message.QoS, message.Payload.ToStr()); + + return base.OnPublish(message); + } +} +``` +稍微修改一下服务端注入处理器的代码即可使用自定义处理器: +```csharp +var services = ObjectContainer.Current; +services.AddSingleton(XTrace.Log); +services.AddTransient(); +services.AddSingleton(); + +var server = new MqttServer +{ + Port = 1883, + ServiceProvider = services.BuildServiceProvider(), + + Log = XTrace.Log, + SessionLog = XTrace.Log, +}; +server.Start(); +``` ## 新生命项目矩阵 各项目默认支持net7.0/netstandard2.1/netstandard2.0/net4.61,旧版(2022.1225)支持net4.5/net4.0/net2.0 diff --git a/Test/Program.cs b/Test/Program.cs index bb4a4e1..a6e95ec 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -3,7 +3,6 @@ using NewLife.MQTT; using NewLife.MQTT.Handlers; using NewLife.MQTT.Messaging; -using NewLife.Net; using NewLife.Security; namespace Test; @@ -22,7 +21,8 @@ private static void Main(String[] args) try { - Test2(); + TestServer(); + TestClient(); //var mi = typeof(Program).GetMethod("Test" + idx, BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic); //if (mi != null) mi.Invoke(null, null); } @@ -57,43 +57,24 @@ private static void Test1() private static MqttServer _server; - private static async void Test2() + private static void TestServer() { - var ioc = ObjectContainer.Current; - ioc.AddSingleton(XTrace.Log); - ioc.AddTransient(); + var services = ObjectContainer.Current; + services.AddSingleton(XTrace.Log); + services.AddTransient(); + services.AddSingleton(); var server = new MqttServer { - Port = 50001, - ServiceProvider = ioc.BuildServiceProvider(), + Port = 1883, + ServiceProvider = services.BuildServiceProvider(), Log = XTrace.Log, SessionLog = XTrace.Log, }; - //server.AddHandler(new MyHandler()); server.Start(); _server = server; - - //var client = new MqttClient - //{ - // Server = "tcp://127.0.0.1:1883", - // Log = XTrace.Log - //}; - - //await client.ConnectAsync(); - - //for (var i = 0; i < 10; i++) - //{ - // var qos = (QualityOfService)(i % 3); - - // await client.PublishAsync("test", new { name = "p" + i, value = Rand.Next() }, qos); - - // await Task.Delay(1000); - //} - - //await client.DisconnectAsync(); } private class MyHandler : MqttHandler @@ -129,9 +110,9 @@ protected override MqttIdMessage OnPublish(PublishMessage message) /// /// 测试完整发布订阅 /// - private static async void Test3() + private static async void TestClient() { - _mc = new MqttClient + var client = new MqttClient { Log = XTrace.Log, Server = "tcp://127.0.0.1:1883", @@ -140,24 +121,29 @@ private static async void Test3() ClientId = Guid.NewGuid() + "", }; - await _mc.ConnectAsync(); - //订阅“/test”主题 - var rt = await _mc.SubscribeAsync("/test", (e) => + await client.ConnectAsync(); + + // 订阅“/test”主题 + var rt = await client.SubscribeAsync("/test", (e) => { XTrace.WriteLine("收到消息:" + "/test/# =>" + e.Topic + ":" + e.Payload.ToStr()); }); + + // 每2秒向“/test”主题发布一条消息 while (true) { - //每2秒向“/test”主题发布一条消息 try { var msg = "学无先后达者为师" + Rand.NextString(8); - await _mc.PublishAsync("/test", msg); + await client.PublishAsync("/test", msg); } catch (Exception ex) { + XTrace.WriteException(ex); } await Task.Delay(2000); } + + _mc = client; } } \ No newline at end of file diff --git a/XUnitTestClient/XUnitTestClient.csproj b/XUnitTestClient/XUnitTestClient.csproj index affb158..4acc34d 100644 --- a/XUnitTestClient/XUnitTestClient.csproj +++ b/XUnitTestClient/XUnitTestClient.csproj @@ -8,10 +8,10 @@ - + - + all runtime; build; native; contentfiles; analyzers; buildtransitive