diff --git a/NewLife.Remoting/Clients/ClientBase.cs b/NewLife.Remoting/Clients/ClientBase.cs index 94988fa..4fcb156 100644 --- a/NewLife.Remoting/Clients/ClientBase.cs +++ b/NewLife.Remoting/Clients/ClientBase.cs @@ -5,6 +5,7 @@ using NewLife.Caching; using NewLife.Log; using NewLife.Model; +using NewLife.Net; using NewLife.Reflection; using NewLife.Remoting.Models; using NewLife.Security; @@ -29,6 +30,10 @@ public abstract class ClientBase : DisposeBase, ICommandClient, IEventProvider, /// 服务提供者 public IServiceProvider? ServiceProvider { get; set; } + private IApiClient? _client; + /// Api客户端 + public IApiClient? Client => _client; + /// 是否已登录 public Boolean Logined { get; set; } @@ -123,6 +128,31 @@ protected virtual void OnInit() } PasswordProvider ??= GetService() ?? new SaltPasswordProvider { Algorithm = "md5" }; + + if (Client == null) + { + var urls = Setting?.Server; + if (urls.IsNullOrEmpty()) throw new ArgumentNullException(nameof(Setting), "未指定服务端地址"); + + _client = urls.StartsWithIgnoreCase("http", "https") ? CreateHttp(urls) : CreateRpc(urls); + } + } + + /// 创建Http客户端 + /// + /// + protected virtual ApiHttpClient CreateHttp(String urls) => new(urls) { Log = Log }; + + /// 创建RPC客户端 + /// + /// + protected virtual ApiClient CreateRpc(String urls) => new MyApiClient { Client = this, Servers = urls.Split(","), Log = Log }; + + class MyApiClient : ApiClient + { + public ClientBase Client { get; set; } = null!; + + protected override async Task OnLoginAsync(ISocketClient client, Boolean force) => await InvokeWithClientAsync(client, Client.Prefix + "Login", Client.BuildLoginRequest()); } /// 异步调用 @@ -130,7 +160,19 @@ protected virtual void OnInit() /// 参数 /// /// - public abstract Task OnInvokeAsync(String action, Object? args, CancellationToken cancellationToken); + public virtual async Task OnInvokeAsync(String action, Object? args, CancellationToken cancellationToken) + { + if (_client is ApiHttpClient http) + { + var method = System.Net.Http.HttpMethod.Post; + if (args == null || action.StartsWithIgnoreCase("Get") || action.ToLower().Contains("/get")) + method = System.Net.Http.HttpMethod.Get; + + return await http.InvokeAsync(method, action, args, null, cancellationToken); + } + + return await _client.InvokeAsync(action, args, cancellationToken); + } /// 远程调用拦截,支持重新登录 /// @@ -170,7 +212,10 @@ protected virtual void OnInit() /// 设置令牌。派生类可重定义逻辑 /// - protected virtual void SetToken(String? token) { } + protected virtual void SetToken(String? token) + { + if (_client != null) _client.Token = token; + } /// 获取相对于服务器的当前时间,避免两端时间差 /// @@ -439,18 +484,45 @@ protected virtual void StopTimer() _timerUpgrade = null; _eventTimer.TryDispose(); _eventTimer = null; + + _ws.TryDispose(); + _ws = null; } + private WsChannel? _ws; /// 定时心跳 /// /// - protected virtual Task OnPing(Object state) => Ping(); + protected virtual async Task OnPing(Object state) + { + DefaultSpan.Current = null; + using var span = Tracer?.NewSpan("DevicePing"); + try + { + var rs = await Ping(); + + if (_client is ApiHttpClient http) + { +#if NETCOREAPP + _ws ??= new WsChannelCore(this); +#else + _ws ??= new WsChannel(this); +#endif + if (_ws != null) await _ws.ValidWebSocket(http); + } + } + catch (Exception ex) + { + span?.SetError(ex, null); + Log?.Debug("{0}", ex); + } + } /// 收到命令 /// /// /// - protected async Task ReceiveCommand(CommandModel model, String source) + public async Task ReceiveCommand(CommandModel model, String source) { if (model == null) return; diff --git a/NewLife.Remoting/Clients/HttpClientBase.cs b/NewLife.Remoting/Clients/HttpClientBase.cs index f874a57..c9a74ef 100644 --- a/NewLife.Remoting/Clients/HttpClientBase.cs +++ b/NewLife.Remoting/Clients/HttpClientBase.cs @@ -86,36 +86,6 @@ protected override void SetToken(String? token) } #endregion - #region 登录 - /// 登录 - /// 登录信息 - /// - protected override async Task LoginAsync(ILoginRequest request) - { - // 登录前清空令牌,避免服务端使用上一次信息 - _client.Token = null; - - var rs = await base.LoginAsync(request); - - // 登录后设置用于用户认证的token - _client.Token = rs?.Token; - - return rs; - } - - /// 注销 - /// - protected override async Task LogoutAsync(String reason) - { - var rs = await base.LogoutAsync(reason); - - // 更新令牌 - _client.Token = rs?.Token; - - return rs; - } - #endregion - #region 心跳 /// 心跳后建立WebSocket长连接 /// diff --git a/NewLife.Remoting/Clients/RpcClientBase.cs b/NewLife.Remoting/Clients/RpcClientBase.cs index 08a7a08..35fe1b6 100644 --- a/NewLife.Remoting/Clients/RpcClientBase.cs +++ b/NewLife.Remoting/Clients/RpcClientBase.cs @@ -1,8 +1,5 @@ -using System.Diagnostics.CodeAnalysis; -using System.Net.Http; -using NewLife.Log; +using NewLife.Log; using NewLife.Net; -using NewLife.Remoting.Models; namespace NewLife.Remoting.Clients; @@ -60,62 +57,4 @@ protected override void SetToken(String? token) if (_client != null) _client.Token = token; } #endregion - - #region 登录 - /// 登录 - /// - public override async Task Login() - { - _client.Token = null; - - var rs = await base.Login(); - - _client.Token = rs?.Token; - - return rs; - } - - /// 登录 - /// 登录信息 - /// - protected override async Task LoginAsync(ILoginRequest request) - { - // 登录前清空令牌,避免服务端使用上一次信息 - _client.Token = null; - - var rs = await base.LoginAsync(request); - - // 登录后设置用于用户认证的token - _client.Token = rs?.Token; - - return rs; - } - - /// 注销 - /// - protected override async Task LogoutAsync(String reason) - { - var rs = await base.LogoutAsync(reason); - - // 更新令牌 - _client.Token = rs?.Token; - - return rs; - } - #endregion - - #region 心跳 - /// 心跳 - /// - public override async Task Ping() - { - var rs = await base.Ping(); - - // 令牌 - if (rs != null && rs.Token.IsNullOrEmpty()) - _client.Token = rs.Token; - - return rs; - } - #endregion } \ No newline at end of file diff --git a/NewLife.Remoting/Clients/WsChannel.cs b/NewLife.Remoting/Clients/WsChannel.cs new file mode 100644 index 0000000..2a35b72 --- /dev/null +++ b/NewLife.Remoting/Clients/WsChannel.cs @@ -0,0 +1,124 @@ +using NewLife.Log; +using NewLife.Net; +using NewLife.Remoting.Models; +using NewLife.Serialization; + +namespace NewLife.Remoting.Clients; + +/// WebSocket +class WsChannel : DisposeBase +{ + private readonly ClientBase _client; + + public WsChannel(ClientBase client) => _client = client; + + protected override void Dispose(Boolean disposing) + { + base.Dispose(disposing); + + StopWebSocket(); + } + + public virtual async Task ValidWebSocket(ApiHttpClient http) + { + var svc = http.Current; + if (svc == null) return; + + // 使用过滤器内部token,因为它有过期刷新机制 + var token = http.Token; + if (http.Filter is NewLife.Http.TokenHttpFilter thf) token = thf.Token?.AccessToken; + + var span = DefaultSpan.Current; + span?.AppendTag($"svc={svc.Address} Token=[{token?.Length}]"); + + if (token.IsNullOrEmpty()) return; + + if (_websocket != null && !_websocket.Disposed) + { + try + { + // 在websocket链路上定时发送心跳,避免长连接被断开 + await _websocket.SendTextAsync("Ping"); + } + catch (Exception ex) + { + span?.SetError(ex, null); + _client.WriteLog("{0}", ex); + } + } + + if (_websocket == null || _websocket.Disposed) + { + var url = svc.Address.ToString().Replace("http://", "ws://").Replace("https://", "wss://"); + var uri = new Uri(new Uri(url), "/Device/Notify"); + + using var span2 = _client.Tracer?.NewSpan("WebSocketConnect", uri + ""); + + var client = uri.CreateRemote() as WebSocketClient; + //todo 设置令牌 + //client.Options.SetRequestHeader("Authorization", "Bearer " + token); + + span?.AppendTag($"WebSocket.Connect {uri}"); + //await client.ConnectAsync(uri, default); + client.Open(); + + _websocket = client; + + _source = new CancellationTokenSource(); + _ = Task.Run(() => DoPull(client, _source.Token)); + } + } + + private WebSocketClient? _websocket; + private CancellationTokenSource? _source; + private async Task DoPull(WebSocketClient socket, CancellationToken cancellationToken) + { + DefaultSpan.Current = null; + try + { + var buf = new Byte[4 * 1024]; + while (!cancellationToken.IsCancellationRequested && !socket.Disposed) + { + var rs = await socket.ReceiveMessageAsync(cancellationToken); + var txt = rs.Payload.ToStr(); + await OnReceive(txt); + } + } + catch (Exception ex) + { + XTrace.WriteException(ex); + } + + if (!socket.Disposed) await socket.CloseAsync(1000, "finish", default); + } + + /// 收到服务端主动下发消息。默认转为CommandModel命令处理 + /// + /// + private async Task OnReceive(String message) + { + if (message.StartsWithIgnoreCase("Pong")) + { + } + else + { + var model = message.ToJsonEntity(); + if (model != null) await _client.ReceiveCommand(model, "WebSocket"); + } + } + + private void StopWebSocket() + { +#if NETCOREAPP + _source?.Cancel(); + try + { + if (_websocket != null && !_websocket.Disposed) + _websocket.CloseAsync(1000, "finish", default); + } + catch { } + + _websocket = null; +#endif + } +} \ No newline at end of file diff --git a/NewLife.Remoting/Clients/WsChannelCore.cs b/NewLife.Remoting/Clients/WsChannelCore.cs new file mode 100644 index 0000000..44ddaee --- /dev/null +++ b/NewLife.Remoting/Clients/WsChannelCore.cs @@ -0,0 +1,126 @@ +#if NETCOREAPP +using System.Net.WebSockets; +using NewLife.Log; +using NewLife.Remoting.Models; +using NewLife.Serialization; +using WebSocket = System.Net.WebSockets.WebSocket; +using WebSocketMessageType = System.Net.WebSockets.WebSocketMessageType; + +namespace NewLife.Remoting.Clients; + +/// WebSocket +class WsChannelCore : WsChannel +{ + private readonly ClientBase _client; + public WsChannelCore(ClientBase client) : base(client) => _client = client; + + protected override void Dispose(Boolean disposing) + { + base.Dispose(disposing); + + StopWebSocket(); + } + + public override async Task ValidWebSocket(ApiHttpClient http) + { + var svc = http.Current; + if (svc == null) return; + + // 使用过滤器内部token,因为它有过期刷新机制 + var token = http.Token; + if (http.Filter is NewLife.Http.TokenHttpFilter thf) token = thf.Token?.AccessToken; + + var span = DefaultSpan.Current; + span?.AppendTag($"svc={svc.Address} Token=[{token?.Length}]"); + + if (token.IsNullOrEmpty()) return; + + if (_websocket != null && _websocket.State == WebSocketState.Open) + { + try + { + // 在websocket链路上定时发送心跳,避免长连接被断开 + var str = "Ping"; + await _websocket.SendAsync(new ArraySegment(str.GetBytes()), WebSocketMessageType.Text, true, default); + } + catch (Exception ex) + { + span?.SetError(ex, null); + _client.WriteLog("{0}", ex); + } + } + + if (_websocket == null || _websocket.State != WebSocketState.Open) + { + var url = svc.Address.ToString().Replace("http://", "ws://").Replace("https://", "wss://"); + var uri = new Uri(new Uri(url), "/Device/Notify"); + + using var span2 = _client.Tracer?.NewSpan("WebSocketConnect", uri + ""); + + var client = new ClientWebSocket(); + client.Options.SetRequestHeader("Authorization", "Bearer " + token); + + span?.AppendTag($"WebSocket.Connect {uri}"); + await client.ConnectAsync(uri, default); + + _websocket = client; + + _source = new CancellationTokenSource(); + _ = Task.Run(() => DoPull(client, _source.Token)); + } + } + + private WebSocket? _websocket; + private CancellationTokenSource? _source; + private async Task DoPull(WebSocket socket, CancellationToken cancellationToken) + { + DefaultSpan.Current = null; + try + { + var buf = new Byte[4 * 1024]; + while (!cancellationToken.IsCancellationRequested && socket.State == WebSocketState.Open) + { + var data = await socket.ReceiveAsync(new ArraySegment(buf), cancellationToken); + var txt = buf.ToStr(null, 0, data.Count); + await OnReceive(txt); + } + } + catch (Exception ex) + { + XTrace.WriteException(ex); + } + + if (socket.State == WebSocketState.Open) await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "finish", default); + } + + /// 收到服务端主动下发消息。默认转为CommandModel命令处理 + /// + /// + private async Task OnReceive(String message) + { + if (message.StartsWithIgnoreCase("Pong")) + { + } + else + { + var model = message.ToJsonEntity(); + if (model != null) await _client.ReceiveCommand(model, "WebSocket"); + } + } + + private void StopWebSocket() + { +#if NETCOREAPP + _source?.Cancel(); + try + { + if (_websocket != null && _websocket.State == WebSocketState.Open) + _websocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "finish", default); + } + catch { } + + _websocket = null; +#endif + } +} +#endif \ No newline at end of file diff --git a/NewLife.Remoting/Clients/WsClientBase.cs b/NewLife.Remoting/Clients/WsClientBase.cs deleted file mode 100644 index 4fa9fd9..0000000 --- a/NewLife.Remoting/Clients/WsClientBase.cs +++ /dev/null @@ -1,114 +0,0 @@ -#if NETCOREAPP -using System.Diagnostics.CodeAnalysis; -using System.Net.WebSockets; -using NewLife.Log; -using NewLife.Remoting.Models; - -namespace NewLife.Remoting.Clients; - -/// Websocket版应用客户端基类 -public class WsClientBase : ClientBase -{ - #region 属性 - private WsClient _client = null!; - #endregion - - #region 构造 - /// 实例化 - public WsClientBase() : base() - { - _client = new MyApiClient - { - Client = this, - Log = XTrace.Log - }; - } - - /// 实例化 - /// - public WsClientBase(String urls) : this() - { - if (!urls.IsNullOrEmpty()) - _client.Servers = urls.Split(","); - } - #endregion - - #region 方法 - /// 异步调用 - /// 动作 - /// 参数 - /// - /// - [return: MaybeNull] - public override async Task OnInvokeAsync(String action, Object? args, CancellationToken cancellationToken) - { - return await _client.InvokeAsync(action, args, cancellationToken); - } - - class MyApiClient : WsClient - { - public ClientBase Client { get; set; } = null!; - - protected override async Task OnLoginAsync(WebSocket client, Boolean force) => await InvokeWithClientAsync(client, Client.Prefix + "/Login", Client.BuildLoginRequest()); - } - #endregion - - #region 登录 - /// 登录 - /// - public override async Task Login() - { - _client.Token = null; - - var rs = await base.Login(); - - _client.Token = rs?.Token; - - return rs; - } - - /// 登录 - /// 登录信息 - /// - protected override async Task LoginAsync(LoginRequest request) - { - // 登录前清空令牌,避免服务端使用上一次信息 - _client.Token = null; - - var rs = await base.LoginAsync(request); - - // 登录后设置用于用户认证的token - _client.Token = rs?.Token; - - return rs; - } - - /// 注销 - /// - protected override async Task LogoutAsync(String reason) - { - var rs = await base.LogoutAsync(reason); - - // 更新令牌 - _client.Token = rs?.Token; - - return rs; - } - #endregion - - #region 心跳 - /// 心跳 - /// - public override async Task Ping() - { - var rs = await base.Ping(); - - // 令牌 - if (rs != null && rs.Token.IsNullOrEmpty()) - _client.Token = rs.Token; - - return rs; - } - #endregion -} -#endif \ No newline at end of file diff --git a/NewLife.Remoting/NewLife.Remoting.csproj b/NewLife.Remoting/NewLife.Remoting.csproj index 25b0526..5f695c7 100644 --- a/NewLife.Remoting/NewLife.Remoting.csproj +++ b/NewLife.Remoting/NewLife.Remoting.csproj @@ -38,7 +38,8 @@ - + + diff --git a/NewLife.Remoting/WsClient.cs b/NewLife.Remoting/WsClient.cs index 056c44b..35990b3 100644 --- a/NewLife.Remoting/WsClient.cs +++ b/NewLife.Remoting/WsClient.cs @@ -1,5 +1,4 @@ -using System; -using System.Net.WebSockets; +using System.Net.WebSockets; using NewLife.Collections; using NewLife.Data; using NewLife.Log; diff --git a/Samples/IoTZero/Clients/HttpDevice.cs b/Samples/IoTZero/Clients/HttpDevice.cs index a5908df..ab556d5 100644 --- a/Samples/IoTZero/Clients/HttpDevice.cs +++ b/Samples/IoTZero/Clients/HttpDevice.cs @@ -9,7 +9,7 @@ namespace IoTEdge; /// Http协议设备 -public class HttpDevice : HttpClientBase +public class HttpDevice : ClientBase { #region 属性 /// 产品编码。从IoT管理平台获取 @@ -26,8 +26,6 @@ public HttpDevice(ClientSetting setting) : base(setting) _setting = setting; ProductKey = setting.ProductKey; - - AddServices(setting.Server); } #endregion