-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathchannel.go
318 lines (269 loc) · 6.79 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
package main
import (
"net"
"runtime"
"strings"
"sync"
"time"
"github.com/bitly/go-simplejson"
)
// String constants used when packets are serialized to and parsed from JSON
// (see Packet.toJson and parsePacket).
const (
// JSON object keys, one per Packet field.
CMDKEYCHANIDCLIENT = "chidcli"
CMDKEYCHANIDSERVER = "chidsrv"
CMDKEYCOMMAND = "cmd"
CMDKEYDATA = "data"
CMDKEYREMIP = "remip"
CMDKEYREMPORT = "remport"
CMDKEYCONV = "conv"
CMDKEYMSGID = "msgid"
// Command names stored in Packet.command (written under CMDKEYCOMMAND).
CMDCONNSYN = "connect_syn"
CMDCONNACK = "connect_ack"
CMDCLOSEFIN = "close_fin"
// CMDCLOSEACK = "close_ack"
CMDSENDDATA = "send_data"
)
// Process-wide id counters. Each counter holds the next value to hand out and
// is guarded by its own mutex.
var (
	chanid0    = 10000
	chanidlock sync.Mutex

	msgid0    = uint64(10000)
	msgidlock sync.Mutex
)

// nextChanid returns the current channel id and advances the counter by one.
// The first call returns 10000.
func nextChanid() int {
	chanidlock.Lock()
	defer chanidlock.Unlock()

	chanid0++
	return chanid0 - 1
}

// nextMsgid returns the current message id and advances the counter by one.
// The first call returns 10000.
func nextMsgid() uint64 {
	msgidlock.Lock()
	defer msgidlock.Unlock()

	msgid0++
	return msgid0 - 1
}
// Channel holds the state of one tunnelled connection: its client and server
// channel ids, the local socket, the KCP session and close bookkeeping.
type Channel struct {
state int
conn net.Conn
// chidcli is the client-side channel id; chidsrv is the server-side one.
// Both are allocated with nextChanid by the constructors in this file or
// copied from a received Packet.
chidcli int
chidsrv int
// ip and port are copied into Packet.remoteip / Packet.remoteport by NewPacket.
ip string
port string
// conv is copied into every packet built for this channel and is the key
// used for ChannelPool.pool2.
conv uint32
toxid string // used on the server side only
kcp *KCP
udp_peer_addr net.Addr
// Close state.
client_socket_close bool
client_kcp_close bool
server_socket_close bool
server_kcp_close bool
// Connection timeout tracking, used by the client.
conn_begin_time time.Time
conn_try_times int
conn_ack_recved bool
// close_reasons accumulates the reasons passed to addCloseReason.
close_reasons []string
// close_stacks holds one call stack per successful ChannelPool.rmClient call.
close_stacks [][]uintptr
// rmctimes counts how many times ChannelPool.rmClient was called for this channel.
rmctimes int
}
// NewChannelClient builds a Channel around conn. It allocates a fresh client
// channel id, records the current time as the connection start, and
// initializes the close-reason and close-stack slices to empty (non-nil).
func NewChannelClient(conn net.Conn) *Channel {
	return &Channel{
		chidcli:         nextChanid(),
		conn:            conn,
		conn_begin_time: time.Now(),
		close_reasons:   make([]string, 0),
		close_stacks:    make([][]uintptr, 0),
	}
}
// NewChannelWithId builds a Channel whose client id is the supplied chanid and
// whose server id is freshly allocated with nextChanid.
func NewChannelWithId(chanid int) *Channel {
	return &Channel{
		chidsrv: nextChanid(),
		chidcli: chanid,
	}
}
// NewChannelFromPacket builds a Channel that copies the client id, server id
// and conv from pkt. No other field is populated.
func NewChannelFromPacket(pkt *Packet) *Channel {
	return &Channel{
		chidcli: pkt.chidcli,
		chidsrv: pkt.chidsrv,
		conv:    pkt.conv,
	}
}
// makeConnectSYNPacket returns a data-less CMDCONNSYN packet for this channel,
// stamped with the channel's conv.
func (this *Channel) makeConnectSYNPacket() *Packet {
	syn := NewPacket(this, CMDCONNSYN, "")
	syn.conv = this.conv
	return syn
}
// makeConnectACKPacket returns a data-less CMDCONNACK packet for this channel,
// stamped with the channel's conv.
func (this *Channel) makeConnectACKPacket() *Packet {
	ack := NewPacket(this, CMDCONNACK, "")
	ack.conv = this.conv
	return ack
}
// makeDataPacket returns a CMDSENDDATA packet carrying data for this channel,
// stamped with the channel's conv.
func (this *Channel) makeDataPacket(data string) *Packet {
	payload := NewPacket(this, CMDSENDDATA, data)
	payload.conv = this.conv
	return payload
}
// makeCloseFINPacket returns a data-less CMDCLOSEFIN packet for this channel,
// stamped with the channel's conv.
func (this *Channel) makeCloseFINPacket() *Packet {
	fin := NewPacket(this, CMDCLOSEFIN, "")
	fin.conv = this.conv
	return fin
}
/*
func (this *Channel) makeCloseACKPacket() *Packet {
pkt := NewPacket(this, CMDCLOSEACK, "")
pkt.conv = this.conv
return pkt
}
*/
// addCloseReason appends reason to the channel's close-reason list.
//
// A reason that is already present is logged as a possible loop but is still
// appended. Once more than five reasons have accumulated the channel ids and
// conv are logged and the function panics.
func (this *Channel) addCloseReason(reason string) {
	for _, seen := range this.close_reasons {
		if seen != reason {
			continue
		}
		// Log only the first duplicate; the list is reported as it was
		// before this reason is appended.
		info.Println("reason already exists, maybe loop,", reason, this.closeReason())
		break
	}

	this.close_reasons = append(this.close_reasons, reason)
	if len(this.close_reasons) <= 5 {
		return
	}

	info.Println(this.chidcli, this.chidsrv, this.conv)
	panic("wtf")
}
// closeReason returns every recorded close reason joined with commas, in the
// order they were added. It returns "" when no reason has been recorded.
func (this *Channel) closeReason() string {
return strings.Join(this.close_reasons, ",")
}
///////////
// ChannelPool indexes channels in two maps. The struct contains no lock.
type ChannelPool struct {
// pool is keyed by channel id: chidcli in putClient/rmClient, chidsrv in
// putServer/rmServer.
pool map[int]*Channel
// pool2 is keyed by the channel's conv.
pool2 map[uint32]*Channel
}
// NewChannelPool returns a ChannelPool with both index maps allocated and empty.
func NewChannelPool() *ChannelPool {
	return &ChannelPool{
		pool:  make(map[int]*Channel),
		pool2: make(map[uint32]*Channel),
	}
}
// putClient registers ch under its client channel id and, when its conv is
// non-zero, under that conv as well. It then fires a "chanact" event with
// delta 1 and the sizes of both maps.
func (this *ChannelPool) putClient(ch *Channel) {
	this.pool[ch.chidcli] = ch
	if conv := ch.conv; conv != 0 {
		this.pool2[conv] = ch
	}

	byID, byConv := len(this.pool), len(this.pool2)
	appevt.Trigger("chanact", 1, byID, byConv)
}
// putClientLacks registers ch in the conv index (pool2) only and fires a
// "chanact" event with delta 0 and the sizes of both maps.
//
// NOTE(review): the original looked ch.chidcli up in pool but its branch for
// a missing entry was empty, so pool was never touched here. That dead lookup
// has been removed without changing behavior. If the intent was to also insert
// ch into pool when it is missing, that insert still needs to be added —
// confirm against the callers.
func (this *ChannelPool) putClientLacks(ch *Channel) {
	this.pool2[ch.conv] = ch
	appevt.Trigger("chanact", 0, len(this.pool), len(this.pool2))
}
// putServer registers ch under its server channel id and under its conv, then
// fires a "chanact" event with delta 1 and the sizes of both maps.
func (this *ChannelPool) putServer(ch *Channel) {
	this.pool[ch.chidsrv] = ch
	this.pool2[ch.conv] = ch

	byID, byConv := len(this.pool), len(this.pool2)
	appevt.Trigger("chanact", 1, byID, byConv)
}
// dumpStacks logs one line per call frame described by pcs, which is expected
// to be a slice filled in by runtime.Callers. Each line holds the frame index,
// function name, file and line number.
//
// The PCs are resolved with runtime.CallersFrames rather than
// runtime.FuncForPC: the values stored by runtime.Callers are return addresses,
// and only CallersFrames adjusts them to the call site and expands inlined
// frames. It also avoids calling FileLine on the nil *Func that FuncForPC
// returns for an unknown PC.
func dumpStacks(pcs []uintptr) {
	if len(pcs) == 0 {
		// CallersFrames would still yield one zero Frame; print nothing instead.
		return
	}

	frames := runtime.CallersFrames(pcs)
	for idx := 0; ; idx++ {
		frame, more := frames.Next()
		info.Println(idx, frame.Function, frame.File, frame.Line)
		if !more {
			return
		}
	}
}
// rmClient removes ch from both indexes (pool by client channel id, pool2 by
// conv) and fires a "chanact" event with delta -1 and the sizes of both maps.
//
// Every call increments ch.rmctimes. On a successful removal the caller's
// stack is appended to ch.close_stacks. If ch is no longer in pool, the call
// is treated as a probable double removal: the stack of the last successful
// removal (if any) and the current stack are logged for comparison.
func (this *ChannelPool) rmClient(ch *Channel) {
	ch.rmctimes += 1
	haserr := false

	// Capture the current call stack once; both branches below need it.
	// skip=1 makes rmClient itself the first recorded frame.
	pcs := make([]uintptr, 16)
	pcn := runtime.Callers(1, pcs)
	stack := pcs[0:pcn]

	if _, ok := this.pool[ch.chidcli]; !ok {
		errl.Println("maybe already removed.", ch.chidcli, ch.chidsrv, ch.conv)
		// close_stacks is empty when the channel was never removed before
		// (or was built by a constructor that does not initialize it), so
		// guard the index instead of panicking on len-1.
		if n := len(ch.close_stacks); n > 0 {
			dumpStacks(ch.close_stacks[n-1])
		}
		info.Println("=======")
		dumpStacks(stack)
		// panic(ch.chidcli)
	} else {
		ch.close_stacks = append(ch.close_stacks, stack)
		delete(this.pool, ch.chidcli)
	}

	// delete is a no-op for a missing key, so no existence check is needed.
	delete(this.pool2, ch.conv)

	// NOTE(review): haserr is never set to true, so this escalation path is
	// currently unreachable. It looks like it was switched off together with
	// the commented-out panics; confirm before enabling or deleting it.
	if haserr {
		errl.Println("errinfo:", ch.rmctimes, len(this.pool), len(this.pool2))
		if ch.rmctimes > 2 {
			panic(ch.chidcli)
		}
	}

	appevt.Trigger("chanact", -1, len(this.pool), len(this.pool2))
}
// rmServer removes ch from pool (by server channel id) and from pool2 (by
// conv), then fires a "chanact" event with delta -1 and the sizes of both
// maps. Missing entries are ignored.
func (this *ChannelPool) rmServer(ch *Channel) {
	delete(this.pool, ch.chidsrv)
	delete(this.pool2, ch.conv)

	byID, byConv := len(this.pool), len(this.pool2)
	appevt.Trigger("chanact", -1, byID, byConv)
}
////////////
// Packet is one message tied to a channel. It is serialized to JSON by toJson
// and rebuilt from JSON by parsePacket; each field maps to one CMDKEY* key.
type Packet struct {
chidcli int // client channel id (CMDKEYCHANIDCLIENT)
chidsrv int // server channel id (CMDKEYCHANIDSERVER)
command string // one of the CMD* command names (CMDKEYCOMMAND)
data string // payload (CMDKEYDATA)
remoteip string // copied from Channel.ip by NewPacket (CMDKEYREMIP)
remoteport string // copied from Channel.port by NewPacket (CMDKEYREMPORT)
conv uint32 // channel conv; not set by NewPacket, callers assign it (CMDKEYCONV)
msgid uint64 // allocated with nextMsgid (CMDKEYMSGID)
}
// NewPacket builds a Packet for ch with the given command and data. The
// channel ids, ip and port are copied from ch and a fresh message id is
// allocated. conv is left at zero; callers set it themselves.
func NewPacket(ch *Channel, command string, data string) *Packet {
	pkt := new(Packet)
	pkt.chidcli = ch.chidcli
	pkt.chidsrv = ch.chidsrv
	pkt.command = command
	pkt.data = data
	pkt.remoteip = ch.ip
	pkt.remoteport = ch.port
	pkt.msgid = nextMsgid()
	return pkt
}
// NewBrokenPacket builds a Packet that carries only conv and a freshly
// allocated message id; every other field keeps its zero value.
func NewBrokenPacket(conv uint32) *Packet {
	return &Packet{
		conv:  conv,
		msgid: nextMsgid(),
	}
}
// isconnsyn reports whether the packet's command is CMDCONNSYN.
func (this *Packet) isconnsyn() bool {
return this.command == CMDCONNSYN
}
// isconnack reports whether the packet's command is CMDCONNACK.
func (this *Packet) isconnack() bool {
return this.command == CMDCONNACK
}
// isdata reports whether the packet's command is CMDSENDDATA.
func (this *Packet) isdata() bool {
return this.command == CMDSENDDATA
}
// toJson serializes the packet as a JSON object with one entry per field,
// keyed by the CMDKEY* constants. It returns nil if encoding fails.
func (this *Packet) toJson() []byte {
	fields := []struct {
		key string
		val interface{}
	}{
		{CMDKEYCHANIDCLIENT, this.chidcli},
		{CMDKEYCHANIDSERVER, this.chidsrv},
		{CMDKEYCOMMAND, this.command},
		{CMDKEYDATA, this.data},
		{CMDKEYREMIP, this.remoteip},
		{CMDKEYREMPORT, this.remoteport},
		{CMDKEYCONV, this.conv},
		{CMDKEYMSGID, this.msgid},
	}

	jso := simplejson.New()
	for _, f := range fields {
		jso.Set(f.key, f.val)
	}

	encoded, err := jso.Encode()
	if err != nil {
		return nil
	}
	return encoded
}
// parsePacket decodes buf as a JSON object and builds a Packet from the
// CMDKEY* entries. It returns nil when buf is not valid JSON.
//
// Fields are read with simplejson's Must* accessors and no explicit default;
// presumably a missing or mistyped entry yields the zero value — confirm
// against the simplejson documentation. conv is read as a uint64 and
// truncated to uint32.
func parsePacket(buf []byte) *Packet {
	jso, err := simplejson.NewJson(buf)
	if err != nil {
		return nil
	}

	return &Packet{
		chidcli:    jso.Get(CMDKEYCHANIDCLIENT).MustInt(),
		chidsrv:    jso.Get(CMDKEYCHANIDSERVER).MustInt(),
		command:    jso.Get(CMDKEYCOMMAND).MustString(),
		data:       jso.Get(CMDKEYDATA).MustString(),
		remoteip:   jso.Get(CMDKEYREMIP).MustString(),
		remoteport: jso.Get(CMDKEYREMPORT).MustString(),
		conv:       uint32(jso.Get(CMDKEYCONV).MustUint64()),
		msgid:      jso.Get(CMDKEYMSGID).MustUint64(),
	}
}