Skip to content

Commit

Permalink
Merge pull request #79 from onerain88/throttle
Browse files Browse the repository at this point in the history
Throttle
  • Loading branch information
onerain88 authored Jun 28, 2020
2 parents 5ffe1de + f5728b2 commit 6db2980
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 5 deletions.
71 changes: 71 additions & 0 deletions Realtime/Realtime.Test/Throttle.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using NUnit.Framework;
using System;
using System.Threading.Tasks;
using LeanCloud;
using LeanCloud.Realtime;
using LeanCloud.Realtime.Internal.Protocol;

namespace Realtime.Test {
public class Throttle {
private LCIMClient c1;
private LCIMClient c2;

private LCIMConversation conversation;

[SetUp]
public async Task SetUp() {
LCLogger.LogDelegate += Utils.Print;
LCApplication.Initialize("ikGGdRE2YcVOemAaRbgp1xGJ-gzGzoHsz", "NUKmuRbdAhg1vrb2wexYo1jo", "https://ikggdre2.lc-cn-n1-shared.com");
c1 = new LCIMClient(Guid.NewGuid().ToString());
c2 = new LCIMClient(Guid.NewGuid().ToString());
await c1.Open();
await c2.Open();

conversation = await c1.CreateConversation(new string[] { Guid.NewGuid().ToString() });
}

[TearDown]
public async Task TearDown() {
await c1.Close();
await c2.Close();
LCLogger.LogDelegate -= Utils.Print;
}

[Test]
public void Equality() {
GenericCommand cmd1 = new GenericCommand {
Cmd = CommandType.Session,
Op = OpType.Open,
PeerId = "hello",
I = 1,
SessionMessage = new SessionCommand {
Code = 123
}
};
GenericCommand cmd2 = new GenericCommand {
Cmd = CommandType.Session,
Op = OpType.Open,
PeerId = "hello",
I = 2,
SessionMessage = new SessionCommand {
Code = 123
}
};
Assert.IsFalse(Equals(cmd1, cmd2));
cmd2.I = cmd1.I;
Assert.IsTrue(Equals(cmd1, cmd2));
}

[Test]
public async Task RemoveMemberTwice() {
Task t1 = conversation.RemoveMembers(new string[] { c2.Id }).ContinueWith(t => {
Assert.IsTrue(t.IsCompleted && !t.IsFaulted);
});
Task t2 = conversation.RemoveMembers(new string[] { c2.Id }).ContinueWith(t => {
Assert.IsTrue(t.IsCompleted && !t.IsFaulted);
});

await Task.WhenAll(t1, t2);
}
}
}
49 changes: 44 additions & 5 deletions Realtime/Realtime/Internal/Connection/LCConnection.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Text;
using System.Collections.Generic;
using System.Threading.Tasks;
Expand All @@ -12,6 +13,17 @@ namespace LeanCloud.Realtime.Internal.Connection {
/// 连接层,只与数据协议相关
/// </summary>
public class LCConnection {
// 请求/应答比对,即 I 相等
class RequestAndResponseComparer : IEqualityComparer<GenericCommand> {
public bool Equals(GenericCommand x, GenericCommand y) {
return true;
}

public int GetHashCode(GenericCommand obj) {
return obj.I;
}
}

/// <summary>
/// 连接状态
/// </summary>
Expand Down Expand Up @@ -59,7 +71,7 @@ enum State {
/// <summary>
/// 请求回调缓存
/// </summary>
private readonly Dictionary<int, TaskCompletionSource<GenericCommand>> responses;
private readonly Dictionary<GenericCommand, TaskCompletionSource<GenericCommand>> requestToResponses;

private int requestI = 1;

Expand All @@ -78,7 +90,8 @@ enum State {

internal LCConnection(string id) {
this.id = id;
responses = new Dictionary<int, TaskCompletionSource<GenericCommand>>();
requestToResponses = new Dictionary<GenericCommand, TaskCompletionSource<GenericCommand>>(new RequestAndResponseComparer());

heartBeat = new LCHeartBeat(this, OnDisconnect);
router = new LCRTMRouter();
ws = new LCWebSocketClient {
Expand Down Expand Up @@ -127,9 +140,24 @@ internal async Task ConnectInternal() {
/// <param name="request"></param>
/// <returns></returns>
internal async Task<GenericCommand> SendRequest(GenericCommand request) {
if (IsIdempotentCommand(request)) {
GenericCommand sendingReq = requestToResponses.Keys.FirstOrDefault(item => {
// TRICK 除了 I 其他字段相等
request.I = item.I;
return Equals(request, item);
});
if (sendingReq != null) {
LCLogger.Warn("duplicated request");
if (requestToResponses.TryGetValue(sendingReq, out TaskCompletionSource<GenericCommand> waitingTcs)) {
return await waitingTcs.Task;
}
LCLogger.Error($"error request: {request}");
}
}

TaskCompletionSource<GenericCommand> tcs = new TaskCompletionSource<GenericCommand>();
request.I = requestI++;
responses.Add(request.I, tcs);
requestToResponses.Add(request, tcs);
try {
await SendCommand(request);
} catch (Exception e) {
Expand Down Expand Up @@ -177,7 +205,7 @@ private void OnMessage(byte[] bytes) {
if (command.HasI) {
// 应答
int requestIndex = command.I;
if (responses.TryGetValue(requestIndex, out TaskCompletionSource<GenericCommand> tcs)) {
if (requestToResponses.TryGetValue(command, out TaskCompletionSource<GenericCommand> tcs)) {
if (command.HasErrorMessage) {
// 错误
ErrorCommand error = command.ErrorMessage;
Expand All @@ -189,7 +217,7 @@ private void OnMessage(byte[] bytes) {
} else {
tcs.TrySetResult(command);
}
responses.Remove(requestIndex);
requestToResponses.Remove(command);
} else {
LCLogger.Error($"No request for {requestIndex}");
}
Expand Down Expand Up @@ -303,5 +331,16 @@ internal void Pause() {
internal void Resume() {
_ = Reconnect();
}

private static bool IsIdempotentCommand(GenericCommand command) {
return !(
command.Cmd == CommandType.Direct ||
(command.Cmd == CommandType.Session && command.Op == OpType.Open) ||
(command.Cmd == CommandType.Conv &&
(command.Op == OpType.Start ||
command.Op == OpType.Update ||
command.Op == OpType.Members))
);
}
}
}

0 comments on commit 6db2980

Please sign in to comment.