Skip to content

Commit

Permalink
调整示例代码,同时测试客户端服务端
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Nov 13, 2023
1 parent 2eb3207 commit 8c2f734
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 46 deletions.
Binary file added Doc/Demo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion NewLife.MqttServer/NewLife.MqttServer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

<ItemGroup>
<PackageReference Include="NewLife.Core" Version="10.6.2023.1101" />
<PackageReference Include="NewLife.Stardust" Version="2.9.2023.1001" />
<PackageReference Include="NewLife.Stardust" Version="2.9.2023.1103" />
</ItemGroup>

<ItemGroup>
Expand Down
129 changes: 121 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@
![Nuget (with prereleases)](https://img.shields.io/nuget/vpre/newlife.mqtt?label=dev%20nuget&logo=nuget)

MQTT协议是物联网领域最流行的通信协议!
NewLife.MQTT包含了MQTT的完整实现,并实现了客户端MqttClient,以及服务端MqttServer
其中MqttServer仅实现基本网络框架,支持消息收发,完整的消息交换功能位于商用版IoT平台NewLife.IoT中
`NewLife.MQTT`包含了MQTT的完整实现,并实现了客户端`MqttClient`,以及服务端`MqttServer`
其中MqttServer仅实现基本网络框架,支持消息收发,完整的消息交换功能位于商用版IoT平台[FIoT](https://newlifex.com/iot/fiot)

## MQTT协议
最流行的物联网通信协议MQTT,包括客户端、服务端和Web管理平台。

提供订阅/发布模式,更为简约、轻量,易于使用,针对受限环境(带宽低、网络延迟高、网络通信不稳定),可以简单概括为物联网打造,官方总结特点如下:
1.使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2.对负载内容屏蔽的消息传输。
3.使用 TCP/IP 提供网络连接。
4.有三种消息发布服务质量:
1. 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2. 对负载内容屏蔽的消息传输。
3. 使用 TCP/IP 提供网络连接。
4. 有三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5.小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
6.使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
5. 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
6. 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

## MQTT 发布与订阅
发布时,指定消息Qos,broker保存的消息包含了Qos;
Expand All @@ -39,6 +39,119 @@ NewLife.MQTT包含了MQTT的完整实现,并实现了客户端MqttClient,以
订阅Qos=2,broker推送Qos=2消息,客户端先回PubRec,broker再次发送PubRel,客户端答复PubComp,消息才算消费完成;
发布Qos=2消息时,双重确认流程不需要等消费端在线,仅限于发布者与broker之间即可完成。

## 快速尝鲜
打开源码解决方案,把Test设为启动项目,启动即可。
默认先后启动TestServer和TestClient。
![Demo](Doc/demo.png)

## 服务端
Nuget引用`NewLife.MQTT`,使用以下代码启动服务端:
```csharp
var services = ObjectContainer.Current;
services.AddSingleton<ILog>(XTrace.Log);
services.AddTransient<IMqttHandler, MqttHandler>();
services.AddSingleton<MqttExchange, MqttExchange>();

var server = new MqttServer
{
Port = 1883,
ServiceProvider = services.BuildServiceProvider(),

Log = XTrace.Log,
SessionLog = XTrace.Log,
};
server.Start();
```
通过指定端口1883,默认处理器`MqttHandler`,默认交换机`MqttExchange`,启动服务端。

## 客户端
Nuget引用`NewLife.MQTT`,使用以下代码连接服务端:
```csharp
var client = new MqttClient
{
Log = XTrace.Log,
Server = "tcp://127.0.0.1:1883",
//UserName = "admin",
//Password = "admin",
ClientId = Guid.NewGuid() + "",
};

await client.ConnectAsync();

// 订阅“/test”主题
var rt = await client.SubscribeAsync("/test", (e) =>
{
XTrace.WriteLine("收到消息:" + "/test/# =>" + e.Topic + ":" + e.Payload.ToStr());
});

// 每2秒向“/test”主题发布一条消息
while (true)
{
try
{
var msg = "学无先后达者为师" + Rand.NextString(8);
await client.PublishAsync("/test", msg);
}
catch (Exception ex)
{
XTrace.WriteException(ex);
}
await Task.Delay(2000);
}
```
客户端连接服务端有几个要素:`服务端地址``用户名``密码``客户端标识`,然后通过`ConnectAsync`连接服务端。
客户端可以是消费者角色,通过`SubscribeAsync`订阅指定Topic。
客户端也可以是生产者角色,通过`PublishAsync`发布消息到指定Topic。

## 自定义服务端
需要在服务端处理客户端连接和消息交互逻辑时,就需要自定义服务端。例如IoT平台,在收到设备上报MQTT数据以后,直接接收落库,而不需要再次消费。
自定义处理器示例如下:
```csharp
private class MyHandler : MqttHandler
{
private readonly ILog _log;

public MyHandler(ILog log) => _log = log;

protected override ConnAck OnConnect(ConnectMessage message)
{
_log.Info("客户端[{0}]连接 user={1} pass={2} clientId={3}", Session.Remote.EndPoint, message.Username, message.Password, message.ClientId);

return base.OnConnect(message);
}

protected override MqttMessage OnDisconnect(DisconnectMessage message)
{
_log.Info("客户端[{0}]断开", Session.Remote);

return base.OnDisconnect(message);
}

protected override MqttIdMessage OnPublish(PublishMessage message)
{
_log.Info("客户端[{0}]发布[{1}:qos={2}]: {3}", Session.Remote, message.Topic, (Int32)message.QoS, message.Payload.ToStr());

return base.OnPublish(message);
}
}
```
稍微修改一下服务端注入处理器的代码即可使用自定义处理器:
```csharp
var services = ObjectContainer.Current;
services.AddSingleton<ILog>(XTrace.Log);
services.AddTransient<IMqttHandler, MyHandler>();
services.AddSingleton<MqttExchange, MqttExchange>();

var server = new MqttServer
{
Port = 1883,
ServiceProvider = services.BuildServiceProvider(),

Log = XTrace.Log,
SessionLog = XTrace.Log,
};
server.Start();
```

## 新生命项目矩阵
各项目默认支持net7.0/netstandard2.1/netstandard2.0/net4.61,旧版(2022.1225)支持net4.5/net4.0/net2.0
Expand Down
56 changes: 21 additions & 35 deletions Test/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using NewLife.MQTT;
using NewLife.MQTT.Handlers;
using NewLife.MQTT.Messaging;
using NewLife.Net;
using NewLife.Security;

namespace Test;
Expand All @@ -22,7 +21,8 @@ private static void Main(String[] args)

try
{
Test2();
TestServer();
TestClient();
//var mi = typeof(Program).GetMethod("Test" + idx, BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic);
//if (mi != null) mi.Invoke(null, null);
}
Expand Down Expand Up @@ -57,43 +57,24 @@ private static void Test1()

private static MqttServer _server;

private static async void Test2()
private static void TestServer()
{
var ioc = ObjectContainer.Current;
ioc.AddSingleton<ILog>(XTrace.Log);
ioc.AddTransient<IMqttHandler, MyHandler>();
var services = ObjectContainer.Current;
services.AddSingleton<ILog>(XTrace.Log);
services.AddTransient<IMqttHandler, MyHandler>();
services.AddSingleton<MqttExchange, MqttExchange>();

var server = new MqttServer
{
Port = 50001,
ServiceProvider = ioc.BuildServiceProvider(),
Port = 1883,
ServiceProvider = services.BuildServiceProvider(),

Log = XTrace.Log,
SessionLog = XTrace.Log,
};
//server.AddHandler(new MyHandler());
server.Start();

_server = server;

//var client = new MqttClient
//{
// Server = "tcp://127.0.0.1:1883",
// Log = XTrace.Log
//};

//await client.ConnectAsync();

//for (var i = 0; i < 10; i++)
//{
// var qos = (QualityOfService)(i % 3);

// await client.PublishAsync("test", new { name = "p" + i, value = Rand.Next() }, qos);

// await Task.Delay(1000);
//}

//await client.DisconnectAsync();
}

private class MyHandler : MqttHandler
Expand Down Expand Up @@ -129,9 +110,9 @@ protected override MqttIdMessage OnPublish(PublishMessage message)
/// <summary>
/// 测试完整发布订阅
/// </summary>
private static async void Test3()
private static async void TestClient()
{
_mc = new MqttClient
var client = new MqttClient
{
Log = XTrace.Log,
Server = "tcp://127.0.0.1:1883",
Expand All @@ -140,24 +121,29 @@ private static async void Test3()
ClientId = Guid.NewGuid() + "",
};

await _mc.ConnectAsync();
//订阅“/test”主题
var rt = await _mc.SubscribeAsync("/test", (e) =>
await client.ConnectAsync();

// 订阅“/test”主题
var rt = await client.SubscribeAsync("/test", (e) =>
{
XTrace.WriteLine("收到消息:" + "/test/# =>" + e.Topic + ":" + e.Payload.ToStr());
});

// 每2秒向“/test”主题发布一条消息
while (true)
{
//每2秒向“/test”主题发布一条消息
try
{
var msg = "学无先后达者为师" + Rand.NextString(8);
await _mc.PublishAsync("/test", msg);
await client.PublishAsync("/test", msg);
}
catch (Exception ex)
{
XTrace.WriteException(ex);
}
await Task.Delay(2000);
}

_mc = client;

Check warning on line 147 in Test/Program.cs

View workflow job for this annotation

GitHub Actions / build-publish

Unreachable code detected

Check warning on line 147 in Test/Program.cs

View workflow job for this annotation

GitHub Actions / build-publish

Unreachable code detected
}
}
4 changes: 2 additions & 2 deletions XUnitTestClient/XUnitTestClient.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.7.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="NewLife.Core" Version="10.6.2023.1101" />
<PackageReference Include="NewLife.UnitTest" Version="1.0.2023.905" />
<PackageReference Include="xunit" Version="2.6.0" />
<PackageReference Include="xunit" Version="2.6.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down

0 comments on commit 8c2f734

Please sign in to comment.