From 005752e7489b4324420b24950d1d241bd5e880c1 Mon Sep 17 00:00:00 2001 From: oneRain Date: Wed, 24 Jun 2020 17:45:27 +0800 Subject: [PATCH 1/4] =?UTF-8?q?chore:=20=E6=94=AF=E6=8C=81=20command=20?= =?UTF-8?q?=E8=8A=82=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Internal/Connection/LCConnection.cs | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/Realtime/Realtime/Internal/Connection/LCConnection.cs b/Realtime/Realtime/Internal/Connection/LCConnection.cs index 3b797aca..0c8de858 100644 --- a/Realtime/Realtime/Internal/Connection/LCConnection.cs +++ b/Realtime/Realtime/Internal/Connection/LCConnection.cs @@ -60,6 +60,7 @@ enum State { /// 请求回调缓存 /// private readonly Dictionary> responses; + private readonly List sendingRequests; private int requestI = 1; @@ -79,6 +80,7 @@ enum State { internal LCConnection(string id) { this.id = id; responses = new Dictionary>(); + sendingRequests = new List(); heartBeat = new LCHeartBeat(this, OnDisconnect); router = new LCRTMRouter(); ws = new LCWebSocketClient { @@ -127,6 +129,22 @@ internal async Task ConnectInternal() { /// /// internal async Task SendRequest(GenericCommand request) { + if (IsIdempotentCommand(request)) { + GenericCommand sendingReq = sendingRequests.Find(item => { + // TRICK 除了 I 其他字段相等 + request.I = item.I; + return Equals(request, item); + }); + if (sendingReq == null) { + sendingRequests.Add(request); + } else { + LCLogger.Warn("duplicated request"); + if (responses.TryGetValue(sendingReq.I, out TaskCompletionSource waitingTcs)) { + return await waitingTcs.Task; + } + } + } + TaskCompletionSource tcs = new TaskCompletionSource(); request.I = requestI++; responses.Add(request.I, tcs); @@ -187,6 +205,9 @@ private void OnMessage(byte[] bytes) { LCException exception = new LCException(code, detail); tcs.TrySetException(exception); } else { + sendingRequests.RemoveAll(item => { + return item.I == command.I; + }); tcs.TrySetResult(command); } responses.Remove(requestIndex); @@ -303,5 +324,16 @@ internal void Pause() { internal void Resume() { _ = Reconnect(); } + + private static bool IsIdempotentCommand(GenericCommand command) { + return !( + command.Cmd == CommandType.Direct || + (command.Cmd == CommandType.Session && command.Op == OpType.Open) || + (command.Cmd == CommandType.Conv && + (command.Op == OpType.Start || + command.Op == OpType.Update || + command.Op == OpType.Members)) + ); + } } } From 4a6cf1ea09088955b69a91dbd85b1f3a470d3019 Mon Sep 17 00:00:00 2001 From: oneRain Date: Sun, 28 Jun 2020 12:07:20 +0800 Subject: [PATCH 2/4] * Throttle.cs: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * LCConnection.cs: chore: 支持节流 --- Realtime/Realtime.Test/Throttle.cs | 71 +++++++++++++++++++ .../Internal/Connection/LCConnection.cs | 7 +- 2 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 Realtime/Realtime.Test/Throttle.cs diff --git a/Realtime/Realtime.Test/Throttle.cs b/Realtime/Realtime.Test/Throttle.cs new file mode 100644 index 00000000..f25c651c --- /dev/null +++ b/Realtime/Realtime.Test/Throttle.cs @@ -0,0 +1,71 @@ +using NUnit.Framework; +using System; +using System.Threading.Tasks; +using LeanCloud; +using LeanCloud.Realtime; +using LeanCloud.Realtime.Internal.Protocol; + +namespace Realtime.Test { + public class Throttle { + private LCIMClient c1; + private LCIMClient c2; + + private LCIMConversation conversation; + + [SetUp] + public async Task SetUp() { + LCLogger.LogDelegate += Utils.Print; + LCApplication.Initialize("ikGGdRE2YcVOemAaRbgp1xGJ-gzGzoHsz", "NUKmuRbdAhg1vrb2wexYo1jo", "https://ikggdre2.lc-cn-n1-shared.com"); + c1 = new LCIMClient(Guid.NewGuid().ToString()); + c2 = new LCIMClient(Guid.NewGuid().ToString()); + await c1.Open(); + await c2.Open(); + + conversation = await c1.CreateConversation(new string[] { Guid.NewGuid().ToString() }); + } + + [TearDown] + public async Task TearDown() { + await c1.Close(); + await c2.Close(); + LCLogger.LogDelegate -= Utils.Print; + } + + [Test] + public void Equality() { + GenericCommand cmd1 = new GenericCommand { + Cmd = CommandType.Session, + Op = OpType.Open, + PeerId = "hello", + I = 1, + SessionMessage = new SessionCommand { + Code = 123 + } + }; + GenericCommand cmd2 = new GenericCommand { + Cmd = CommandType.Session, + Op = OpType.Open, + PeerId = "hello", + I = 2, + SessionMessage = new SessionCommand { + Code = 123 + } + }; + Assert.IsFalse(Equals(cmd1, cmd2)); + cmd2.I = cmd1.I; + Assert.IsTrue(Equals(cmd1, cmd2)); + } + + [Test] + public async Task CreateSameConversation() { + Task t1 = conversation.RemoveMembers(new string[] { c2.Id }).ContinueWith(t => { + Assert.IsTrue(t.IsCompleted && !t.IsFaulted); + }); + Task t2 = conversation.RemoveMembers(new string[] { c2.Id }).ContinueWith(t => { + Assert.IsTrue(t.IsCompleted && !t.IsFaulted); + }); + + await Task.WhenAll(t1, t2); + } + } +} diff --git a/Realtime/Realtime/Internal/Connection/LCConnection.cs b/Realtime/Realtime/Internal/Connection/LCConnection.cs index 0c8de858..c4703fae 100644 --- a/Realtime/Realtime/Internal/Connection/LCConnection.cs +++ b/Realtime/Realtime/Internal/Connection/LCConnection.cs @@ -135,13 +135,14 @@ internal async Task SendRequest(GenericCommand request) { request.I = item.I; return Equals(request, item); }); - if (sendingReq == null) { - sendingRequests.Add(request); - } else { + if (sendingReq != null) { LCLogger.Warn("duplicated request"); if (responses.TryGetValue(sendingReq.I, out TaskCompletionSource waitingTcs)) { return await waitingTcs.Task; } + LCLogger.Error($"error request: {request}"); + } else { + sendingRequests.Add(request); } } From 5976f5dbb306d49b3e6a1fe890b42c4a15f9aa40 Mon Sep 17 00:00:00 2001 From: oneRain Date: Sun, 28 Jun 2020 12:30:29 +0800 Subject: [PATCH 3/4] =?UTF-8?q?chore:=20=E7=AE=80=E5=8C=96=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E7=9A=84=E8=AF=B7=E6=B1=82/=E5=BA=94=E7=AD=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Internal/Connection/LCConnection.cs | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/Realtime/Realtime/Internal/Connection/LCConnection.cs b/Realtime/Realtime/Internal/Connection/LCConnection.cs index c4703fae..1e7732a5 100644 --- a/Realtime/Realtime/Internal/Connection/LCConnection.cs +++ b/Realtime/Realtime/Internal/Connection/LCConnection.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Text; using System.Collections.Generic; using System.Threading.Tasks; @@ -12,6 +13,17 @@ namespace LeanCloud.Realtime.Internal.Connection { /// 连接层,只与数据协议相关 /// public class LCConnection { + // 请求/应答比对,即 I 相等 + class RequestAndResponseComparer : IEqualityComparer { + public bool Equals(GenericCommand x, GenericCommand y) { + return true; + } + + public int GetHashCode(GenericCommand obj) { + return obj.I; + } + } + /// /// 连接状态 /// @@ -59,8 +71,7 @@ enum State { /// /// 请求回调缓存 /// - private readonly Dictionary> responses; - private readonly List sendingRequests; + private readonly Dictionary> requestToResponses; private int requestI = 1; @@ -79,8 +90,8 @@ enum State { internal LCConnection(string id) { this.id = id; - responses = new Dictionary>(); - sendingRequests = new List(); + requestToResponses = new Dictionary>(new RequestAndResponseComparer()); + heartBeat = new LCHeartBeat(this, OnDisconnect); router = new LCRTMRouter(); ws = new LCWebSocketClient { @@ -130,25 +141,23 @@ internal async Task ConnectInternal() { /// internal async Task SendRequest(GenericCommand request) { if (IsIdempotentCommand(request)) { - GenericCommand sendingReq = sendingRequests.Find(item => { + GenericCommand sendingReq = requestToResponses.Keys.FirstOrDefault(item => { // TRICK 除了 I 其他字段相等 request.I = item.I; return Equals(request, item); }); if (sendingReq != null) { LCLogger.Warn("duplicated request"); - if (responses.TryGetValue(sendingReq.I, out TaskCompletionSource waitingTcs)) { + if (requestToResponses.TryGetValue(sendingReq, out TaskCompletionSource waitingTcs)) { return await waitingTcs.Task; } LCLogger.Error($"error request: {request}"); - } else { - sendingRequests.Add(request); } } TaskCompletionSource tcs = new TaskCompletionSource(); request.I = requestI++; - responses.Add(request.I, tcs); + requestToResponses.Add(request, tcs); try { await SendCommand(request); } catch (Exception e) { @@ -196,7 +205,7 @@ private void OnMessage(byte[] bytes) { if (command.HasI) { // 应答 int requestIndex = command.I; - if (responses.TryGetValue(requestIndex, out TaskCompletionSource tcs)) { + if (requestToResponses.TryGetValue(command, out TaskCompletionSource tcs)) { if (command.HasErrorMessage) { // 错误 ErrorCommand error = command.ErrorMessage; @@ -206,12 +215,9 @@ private void OnMessage(byte[] bytes) { LCException exception = new LCException(code, detail); tcs.TrySetException(exception); } else { - sendingRequests.RemoveAll(item => { - return item.I == command.I; - }); tcs.TrySetResult(command); } - responses.Remove(requestIndex); + requestToResponses.Remove(command); } else { LCLogger.Error($"No request for {requestIndex}"); } From f5728b203a5b1a1d33d468cda13bdd4a2336d1b7 Mon Sep 17 00:00:00 2001 From: oneRain Date: Sun, 28 Jun 2020 12:38:14 +0800 Subject: [PATCH 4/4] rename --- Realtime/Realtime.Test/Throttle.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Realtime/Realtime.Test/Throttle.cs b/Realtime/Realtime.Test/Throttle.cs index f25c651c..b0f5efaa 100644 --- a/Realtime/Realtime.Test/Throttle.cs +++ b/Realtime/Realtime.Test/Throttle.cs @@ -57,7 +57,7 @@ public void Equality() { } [Test] - public async Task CreateSameConversation() { + public async Task RemoveMemberTwice() { Task t1 = conversation.RemoveMembers(new string[] { c2.Id }).ContinueWith(t => { Assert.IsTrue(t.IsCompleted && !t.IsFaulted); });