-
Notifications
You must be signed in to change notification settings - Fork 259
/
Copy pathDefaultMerger.cs
136 lines (126 loc) · 5.48 KB
/
DefaultMerger.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
using System.Collections.Concurrent;
using JT808.Protocol.Interfaces;
using JT808.Protocol.MessagePack;
namespace JT808.Protocol.Internal
{
/// <summary>
/// 默认分包合并实现
/// </summary>
public class DefaultMerger : IMerger, IDisposable
{
/// <summary>
/// 分包数据缓存
/// <para>key为sim卡号,value为字典,key为消息id,value为元组,结构为:(分包索引,分包元数据)</para>
/// </summary>
private readonly ConcurrentDictionary<string, ConcurrentDictionary<ushort, List<(ushort index, byte[] data)>>> splitPackageDictionary = new();
private readonly ConcurrentDictionary<string, DateTime> timeoutDictionary = new ConcurrentDictionary<string, DateTime>();
private readonly TimeSpan cleanInterval = TimeSpan.FromSeconds(60);
private readonly CancellationTokenSource cts = new CancellationTokenSource();
private bool disposed;
public DefaultMerger()
{
Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{
timeoutDictionary.ToList().ForEach(x =>
{
var key = x.Key;
var datetime = x.Value;
if (datetime < DateTime.Now && TryParseKey(key, out var phoneNumber, out var messageId) && splitPackageDictionary.TryGetValue(phoneNumber, out var value) && value.TryRemove(messageId, out var caches) && value.Count == 0 && splitPackageDictionary.TryRemove(phoneNumber, out _))
{
timeoutDictionary.TryRemove(key, out _);
}
});
await Task.Delay(cleanInterval);
}
}, cts.Token);
}
/// <inheritdoc/>
public bool TryMerge(JT808Header header, byte[] data, IJT808Config config, out JT808Bodies body)
{
body = null;
var timeoutKey = GenerateKey(header.TerminalPhoneNo, header.MsgId);
if (!CheckTimeout(timeoutKey)) return false;
var timeout = DateTime.Now.AddSeconds(config.AutoMergeTimeoutSecond);
if (timeoutDictionary.TryAdd(timeoutKey, timeout))
timeoutDictionary.TryUpdate(timeoutKey, timeout, timeout);
if (splitPackageDictionary.TryGetValue(header.TerminalPhoneNo, out var item) && item.TryGetValue(header.MsgId, out var packages))
{
packages.Add((header.PackageIndex, data));
if (packages.Count != header.PackgeCount)
{
return false;
}
item.TryRemove(header.MsgId, out _);
splitPackageDictionary.TryRemove(header.TerminalPhoneNo, out _);
var mateData = packages.OrderBy(x => x.index).SelectMany(x => x.data).Concat(data).ToArray();
byte[] buffer = JT808ArrayPool.Rent(mateData.Length);
try
{
var reader = new JT808MessagePackReader(mateData, (Enums.JT808Version)header.ProtocolVersion);
if (config.MsgIdFactory.TryGetValue(header.MsgId, out var value) && value is JT808Bodies instance)
{
body = instance.DeserializeExt<JT808Bodies>(ref reader, config);
header.MessageBodyProperty.IsMerged = true;
return true;
}
}
finally
{
JT808ArrayPool.Return(buffer);
}
}
else
{
splitPackageDictionary.AddOrUpdate(header.TerminalPhoneNo, new ConcurrentDictionary<ushort, List<(ushort, byte[])>>() { [header.MsgId] = new List<(ushort, byte[])> { (header.PackageIndex, data) } }, (_, value) =>
{
value.AddOrUpdate(header.MsgId, new List<(ushort, byte[])> { (header.PackageIndex, data) }, (_, item) =>
{
item.Add((header.PackageIndex, data));
return item;
});
return value;
});
}
return false;
}
private bool CheckTimeout(string key) => !timeoutDictionary.TryGetValue(key, out var dateTime) || dateTime >= DateTime.Now;
private const char keyJoiner = '-';
private const string keyJoinerNET7 = "-";
private string GenerateKey(string phoneNumber, ushort messageId) => string.Join(keyJoinerNET7, new[] { phoneNumber, messageId.ToString() });
private bool TryParseKey(string key, out string phoneNumber, out ushort messageId)
{
phoneNumber = null;
messageId = 0;
var tmp = key.Split(keyJoiner);
if (tmp.Length == 2 && ushort.TryParse(tmp[1], out messageId))
{
phoneNumber = tmp[0];
return true;
}
return false;
}
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
cts.Cancel();
cts.Dispose();
}
disposed = true;
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
~DefaultMerger()
{
Dispose(false);
}
}
}