From 5510fa9c7e1de7cfb2d941b9c6929c1fcf7ec13f Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Mon, 7 Aug 2023 19:01:51 +0200 Subject: [PATCH] add node.IdleTimeout and close idle connections after a timeout (https://github.com/bluenviron/mavp2p/issues/33) (#61) * replace pkg/udplistener with pion/transport/v2/udp * add node.IdleTimeout and close idle connections after a timeout (https://github.com/bluenviron/mavp2p/issues/33) --- endpoint_broadcast_test.go | 10 +- endpoint_client.go | 8 +- endpoint_client_test.go | 127 +++++++-- endpoint_custom_test.go | 10 +- endpoint_serial_test.go | 19 +- endpoint_server.go | 32 ++- endpoint_server_test.go | 75 ++++- go.mod | 3 +- go.sum | 48 +++- node.go | 10 +- node_heartbeat_test.go | 12 +- ...autoreconnector.go => auto_reconnector.go} | 13 +- ...ector_test.go => auto_reconnector_test.go} | 0 pkg/timednetconn/conn.go | 48 ++++ pkg/timednetconn/timednetconn.go | 39 --- pkg/udplistener/udplistener.go | 263 ------------------ pkg/udplistener/udplistener_test.go | 175 ------------ 17 files changed, 323 insertions(+), 569 deletions(-) rename pkg/autoreconnector/{autoreconnector.go => auto_reconnector.go} (86%) rename pkg/autoreconnector/{autoreconnector_test.go => auto_reconnector_test.go} (100%) create mode 100644 pkg/timednetconn/conn.go delete mode 100644 pkg/timednetconn/timednetconn.go delete mode 100644 pkg/udplistener/udplistener.go delete mode 100644 pkg/udplistener/udplistener_test.go diff --git a/endpoint_broadcast_test.go b/endpoint_broadcast_test.go index f9db3de1a..b489d88a0 100644 --- a/endpoint_broadcast_test.go +++ b/endpoint_broadcast_test.go @@ -8,7 +8,6 @@ import ( "github.com/bluenviron/gomavlib/v2/pkg/dialect" "github.com/bluenviron/gomavlib/v2/pkg/frame" - "github.com/bluenviron/gomavlib/v2/pkg/message" ) var _ endpointChannelSingle = (*endpointUDPBroadcast)(nil) @@ -27,13 +26,8 @@ func (rw *readWriterFromFuncs) Write(p []byte) (int, error) { } func TestEndpointBroadcast(t *testing.T) { - dial := &dialect.Dialect{ - Version: 3, - Messages: []message.Message{&MessageHeartbeat{}}, - } - node, err := NewNode(NodeConf{ - Dialect: dial, + Dialect: testDialect, OutVersion: V2, OutSystemID: 10, Endpoints: []EndpointConf{EndpointUDPBroadcast{"127.255.255.255:5602", ":5601"}}, @@ -51,7 +45,7 @@ func TestEndpointBroadcast(t *testing.T) { require.NoError(t, err) defer pc.Close() - dialectRW, err := dialect.NewReadWriter(dial) + dialectRW, err := dialect.NewReadWriter(testDialect) require.NoError(t, err) rw, err := frame.NewReadWriter(frame.ReadWriterConf{ diff --git a/endpoint_client.go b/endpoint_client.go index ad9acf212..cfbe0be9a 100644 --- a/endpoint_client.go +++ b/endpoint_client.go @@ -79,14 +79,20 @@ func initEndpointClient(node *Node, conf endpointClientConf) (Endpoint, error) { } return "tcp4" }() + timedContext, timedContextClose := context.WithTimeout(ctx, node.conf.ReadTimeout) nconn, err := (&net.Dialer{}).DialContext(timedContext, network, conf.getAddress()) timedContextClose() + if err != nil { return nil, err } - return timednetconn.New(node.conf.WriteTimeout, nconn), nil + return timednetconn.New( + node.conf.IdleTimeout, + node.conf.WriteTimeout, + nconn, + ), nil }, ), } diff --git a/endpoint_client_test.go b/endpoint_client_test.go index 9aaf71a05..bc92c4ab6 100644 --- a/endpoint_client_test.go +++ b/endpoint_client_test.go @@ -1,15 +1,16 @@ package gomavlib import ( + "io" "net" "testing" + "time" + "github.com/pion/transport/v2/udp" "github.com/stretchr/testify/require" "github.com/bluenviron/gomavlib/v2/pkg/dialect" "github.com/bluenviron/gomavlib/v2/pkg/frame" - "github.com/bluenviron/gomavlib/v2/pkg/message" - "github.com/bluenviron/gomavlib/v2/pkg/udplistener" ) var _ endpointChannelSingle = (*endpointClient)(nil) @@ -17,28 +18,20 @@ var _ endpointChannelSingle = (*endpointClient)(nil) func TestEndpointClient(t *testing.T) { for _, ca := range []string{"tcp", "udp"} { t.Run(ca, func(t *testing.T) { - dial := &dialect.Dialect{ - Version: 3, - Messages: []message.Message{&MessageHeartbeat{}}, - } - var ln net.Listener - var err error if ca == "tcp" { + var err error ln, err = net.Listen("tcp", "127.0.0.1:5601") + require.NoError(t, err) } else { - ln, err = udplistener.New("udp", "127.0.0.1:5601") - } - require.NoError(t, err) - defer ln.Close() - - connOpened := make(chan net.Conn) + addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:5601") + require.NoError(t, err) - go func() { - conn, err := ln.Accept() + ln, err = udp.Listen("udp", addr) require.NoError(t, err) - connOpened <- conn - }() + } + + defer ln.Close() var e EndpointConf if ca == "tcp" { @@ -48,7 +41,7 @@ func TestEndpointClient(t *testing.T) { } node, err := NewNode(NodeConf{ - Dialect: dial, + Dialect: testDialect, OutVersion: V2, OutSystemID: 10, Endpoints: []EndpointConf{e}, @@ -62,7 +55,6 @@ func TestEndpointClient(t *testing.T) { Channel: evt.(*EventChannelOpen).Channel, }, evt) - var conn net.Conn var rw *frame.ReadWriter for i := 0; i < 3; i++ { @@ -77,10 +69,11 @@ func TestEndpointClient(t *testing.T) { node.WriteMessageAll(msg) if i == 0 { - conn = <-connOpened + conn, err := ln.Accept() + require.NoError(t, err) defer conn.Close() - dialectRW, err := dialect.NewReadWriter(dial) + dialectRW, err := dialect.NewReadWriter(testDialect) require.NoError(t, err) rw, err = frame.NewReadWriter(frame.ReadWriterConf{ @@ -128,3 +121,93 @@ func TestEndpointClient(t *testing.T) { }) } } + +func TestEndpointClientIdleTimeout(t *testing.T) { + for _, ca := range []string{"tcp"} { + t.Run(ca, func(t *testing.T) { + var ln net.Listener + var err error + ln, err = net.Listen("tcp", "127.0.0.1:5603") + require.NoError(t, err) + + defer ln.Close() + + var e EndpointConf + if ca == "tcp" { + e = EndpointTCPClient{"127.0.0.1:5603"} + } else { + e = EndpointUDPClient{"127.0.0.1:5603"} + } + + node, err := NewNode(NodeConf{ + Dialect: testDialect, + OutVersion: V2, + OutSystemID: 10, + Endpoints: []EndpointConf{e}, + HeartbeatDisable: true, + IdleTimeout: 500 * time.Millisecond, + }) + require.NoError(t, err) + defer node.Close() + + evt := <-node.Events() + require.Equal(t, &EventChannelOpen{ + Channel: evt.(*EventChannelOpen).Channel, + }, evt) + + msg := &MessageHeartbeat{ + Type: 1, + Autopilot: 2, + BaseMode: 3, + CustomMode: 6, + SystemStatus: 4, + MavlinkVersion: 5, + } + node.WriteMessageAll(msg) + + conn, err := ln.Accept() + require.NoError(t, err) + + dialectRW, err := dialect.NewReadWriter(testDialect) + require.NoError(t, err) + + rw, err := frame.NewReadWriter(frame.ReadWriterConf{ + ReadWriter: conn, + DialectRW: dialectRW, + OutVersion: frame.V2, + OutSystemID: 11, + }) + require.NoError(t, err) + + fr, err := rw.Read() + require.NoError(t, err) + require.Equal(t, &frame.V2Frame{ + SequenceID: 0, + SystemID: 10, + ComponentID: 1, + Message: msg, + Checksum: fr.GetChecksum(), + }, fr) + + closed := make(chan struct{}) + + go func() { + _, err = rw.Read() + require.Equal(t, io.EOF, err) + conn.Close() + close(closed) + }() + + select { + case <-closed: + case <-time.After(1 * time.Second): + t.Errorf("should not happen") + } + + // the client reconnects to the server due to autoReconnector + conn, err = ln.Accept() + require.NoError(t, err) + conn.Close() + }) + } +} diff --git a/endpoint_custom_test.go b/endpoint_custom_test.go index ed5ab9a36..93c85fc75 100644 --- a/endpoint_custom_test.go +++ b/endpoint_custom_test.go @@ -10,7 +10,6 @@ import ( "github.com/bluenviron/gomavlib/v2/pkg/dialect" "github.com/bluenviron/gomavlib/v2/pkg/frame" - "github.com/bluenviron/gomavlib/v2/pkg/message" ) var _ endpointChannelSingle = (*endpointCustom)(nil) @@ -65,15 +64,10 @@ func (e *dummyEndpoint) Write(p []byte) (int, error) { } func TestEndpointCustom(t *testing.T) { - dial := &dialect.Dialect{ - Version: 3, - Messages: []message.Message{&MessageHeartbeat{}}, - } - de := newDummyEndpoint() node, err := NewNode(NodeConf{ - Dialect: dial, + Dialect: testDialect, OutVersion: V2, OutSystemID: 10, Endpoints: []EndpointConf{EndpointCustom{de}}, @@ -87,7 +81,7 @@ func TestEndpointCustom(t *testing.T) { Channel: evt.(*EventChannelOpen).Channel, }, evt) - dialectRW, err := dialect.NewReadWriter(dial) + dialectRW, err := dialect.NewReadWriter(testDialect) require.NoError(t, err) var buf bytes.Buffer diff --git a/endpoint_serial_test.go b/endpoint_serial_test.go index 3b8a91b20..09deafcb7 100644 --- a/endpoint_serial_test.go +++ b/endpoint_serial_test.go @@ -9,7 +9,6 @@ import ( "github.com/bluenviron/gomavlib/v2/pkg/dialect" "github.com/bluenviron/gomavlib/v2/pkg/frame" - "github.com/bluenviron/gomavlib/v2/pkg/message" ) var _ endpointChannelSingle = (*endpointSerial)(nil) @@ -22,13 +21,8 @@ func TestEndpointSerial(t *testing.T) { return de, nil } - dial := &dialect.Dialect{ - Version: 3, - Messages: []message.Message{&MessageHeartbeat{}}, - } - node, err := NewNode(NodeConf{ - Dialect: dial, + Dialect: testDialect, OutVersion: V2, OutSystemID: 10, Endpoints: []EndpointConf{EndpointSerial{ @@ -49,7 +43,7 @@ func TestEndpointSerial(t *testing.T) { de := <-endpointCreated - dialectRW, err := dialect.NewReadWriter(dial) + dialectRW, err := dialect.NewReadWriter(testDialect) require.NoError(t, err) var buf bytes.Buffer @@ -120,13 +114,8 @@ func TestEndpointSerialReconnect(t *testing.T) { return de, nil } - dial := &dialect.Dialect{ - Version: 3, - Messages: []message.Message{&MessageHeartbeat{}}, - } - node, err := NewNode(NodeConf{ - Dialect: dial, + Dialect: testDialect, OutVersion: V2, OutSystemID: 10, Endpoints: []EndpointConf{EndpointSerial{ @@ -147,7 +136,7 @@ func TestEndpointSerialReconnect(t *testing.T) { de := <-endpointCreated - dialectRW, err := dialect.NewReadWriter(dial) + dialectRW, err := dialect.NewReadWriter(testDialect) require.NoError(t, err) var buf bytes.Buffer diff --git a/endpoint_server.go b/endpoint_server.go index e17a6c7d2..834c0b73c 100644 --- a/endpoint_server.go +++ b/endpoint_server.go @@ -6,8 +6,9 @@ import ( "net" "time" + "github.com/pion/transport/v2/udp" + "github.com/bluenviron/gomavlib/v2/pkg/timednetconn" - "github.com/bluenviron/gomavlib/v2/pkg/udplistener" ) type endpointServerConf interface { @@ -53,6 +54,7 @@ type endpointServer struct { conf endpointServerConf listener net.Listener writeTimeout time.Duration + idleTimeout time.Duration // in terminate chan struct{} @@ -72,20 +74,29 @@ func initEndpointServer(node *Node, conf endpointServerConf) (Endpoint, error) { return nil, fmt.Errorf("invalid address") } - var listener net.Listener + var ln net.Listener if conf.isUDP() { - listener, err = udplistener.New("udp4", conf.getAddress()) + addr, err := net.ResolveUDPAddr("udp4", conf.getAddress()) + if err != nil { + return nil, err + } + + ln, err = udp.Listen("udp4", addr) + if err != nil { + return nil, err + } } else { - listener, err = net.Listen("tcp4", conf.getAddress()) - } - if err != nil { - return nil, err + ln, err = net.Listen("tcp4", conf.getAddress()) + if err != nil { + return nil, err + } } t := &endpointServer{ conf: conf, writeTimeout: node.conf.WriteTimeout, - listener: listener, + idleTimeout: node.conf.IdleTimeout, + listener: ln, terminate: make(chan struct{}), } return t, nil @@ -118,7 +129,10 @@ func (t *endpointServer) accept() (string, io.ReadWriteCloser, error) { return "tcp" }(), nconn.RemoteAddr()) - conn := timednetconn.New(t.writeTimeout, nconn) + conn := timednetconn.New( + t.idleTimeout, + t.writeTimeout, + nconn) return label, conn, nil } diff --git a/endpoint_server_test.go b/endpoint_server_test.go index 6bc2dc2af..ccbf4668f 100644 --- a/endpoint_server_test.go +++ b/endpoint_server_test.go @@ -3,12 +3,12 @@ package gomavlib import ( "net" "testing" + "time" "github.com/stretchr/testify/require" "github.com/bluenviron/gomavlib/v2/pkg/dialect" "github.com/bluenviron/gomavlib/v2/pkg/frame" - "github.com/bluenviron/gomavlib/v2/pkg/message" ) var _ endpointChannelAccepter = (*endpointServer)(nil) @@ -16,11 +16,6 @@ var _ endpointChannelAccepter = (*endpointServer)(nil) func TestEndpointServer(t *testing.T) { for _, ca := range []string{"tcp", "udp"} { t.Run(ca, func(t *testing.T) { - dial := &dialect.Dialect{ - Version: 3, - Messages: []message.Message{&MessageHeartbeat{}}, - } - var e EndpointConf if ca == "tcp" { e = EndpointTCPServer{"127.0.0.1:5601"} @@ -29,7 +24,7 @@ func TestEndpointServer(t *testing.T) { } node, err := NewNode(NodeConf{ - Dialect: dial, + Dialect: testDialect, OutVersion: V2, OutSystemID: 10, Endpoints: []EndpointConf{e}, @@ -42,7 +37,7 @@ func TestEndpointServer(t *testing.T) { require.NoError(t, err) defer conn.Close() - dialectRW, err := dialect.NewReadWriter(dial) + dialectRW, err := dialect.NewReadWriter(testDialect) require.NoError(t, err) rw, err := frame.NewReadWriter(frame.ReadWriterConf{ @@ -107,3 +102,67 @@ func TestEndpointServer(t *testing.T) { }) } } + +func TestEndpointServerIdleTimeout(t *testing.T) { + for _, ca := range []string{"tcp", "udp"} { + t.Run(ca, func(t *testing.T) { + var e EndpointConf + if ca == "tcp" { + e = EndpointTCPServer{"127.0.0.1:5601"} + } else { + e = EndpointUDPServer{"127.0.0.1:5601"} + } + + node, err := NewNode(NodeConf{ + Dialect: testDialect, + OutVersion: V2, + OutSystemID: 10, + Endpoints: []EndpointConf{e}, + HeartbeatDisable: true, + IdleTimeout: 500 * time.Millisecond, + }) + require.NoError(t, err) + defer node.Close() + + conn, err := net.Dial(ca, "127.0.0.1:5601") + require.NoError(t, err) + defer conn.Close() + + dialectRW, err := dialect.NewReadWriter(testDialect) + require.NoError(t, err) + + rw, err := frame.NewReadWriter(frame.ReadWriterConf{ + ReadWriter: conn, + DialectRW: dialectRW, + OutVersion: frame.V2, + OutSystemID: 11, + }) + require.NoError(t, err) + + msg := &MessageHeartbeat{ + Type: 1, + Autopilot: 2, + BaseMode: 3, + CustomMode: 6, + SystemStatus: 4, + MavlinkVersion: 5, + } + err = rw.WriteMessage(msg) + require.NoError(t, err) + + evt := <-node.Events() + ch := evt.(*EventChannelOpen).Channel + require.Equal(t, &EventChannelOpen{ + Channel: ch, + }, evt) + + // frame + <-node.Events() + + evt = <-node.Events() + require.Equal(t, &EventChannelClose{ + Channel: ch, + }, evt) + }) + } +} diff --git a/go.mod b/go.mod index 429fa83c4..05addecdf 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.18 require ( bou.ke/monkey v1.0.2 github.com/alecthomas/kong v0.8.0 + github.com/pion/transport/v2 v2.2.1 github.com/stretchr/testify v1.8.4 github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07 ) @@ -12,6 +13,6 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - golang.org/x/sys v0.1.0 // indirect + golang.org/x/sys v0.7.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index fadcf58f9..965c0b157 100644 --- a/go.sum +++ b/go.sum @@ -4,18 +4,62 @@ github.com/alecthomas/assert/v2 v2.1.0 h1:tbredtNcQnoSd3QBhQWI7QZ3XHOVkw1Moklp2o github.com/alecthomas/kong v0.8.0 h1:ryDCzutfIqJPnNn0omnrgHLbAggDQM2VWHikE1xqK7s= github.com/alecthomas/kong v0.8.0/go.mod h1:n1iCIO2xS46oE8ZfYCNDqdR0b0wZNrXAIAqro/2132U= github.com/alecthomas/repr v0.1.0 h1:ENn2e1+J3k09gyj2shc0dHr/yjaWSHRlrJ4DPMevDqE= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/transport/v2 v2.2.1 h1:7qYnCBlpgSJNYMbLCKuSY9KbQdBFoETvPNETv0y4N7c= +github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07 h1:UyzmZLoiDWMRywV4DUYb9Fbt8uiOSooupjTq10vpvnU= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= -golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= -golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/node.go b/node.go index 4a6a3db3d..697fa3744 100644 --- a/node.go +++ b/node.go @@ -81,8 +81,11 @@ type NodeConf struct { // It defaults to 10 seconds. ReadTimeout time.Duration // (optional) write timeout. - // It defaults to 5 seconds. + // It defaults to 10 seconds. WriteTimeout time.Duration + // (optional) timeout before closing idle connections. + // It defaults to 60 seconds. + IdleTimeout time.Duration } // Node is a high-level Mavlink encoder and decoder that works with endpoints. @@ -145,7 +148,10 @@ func NewNode(conf NodeConf) (*Node, error) { conf.ReadTimeout = 10 * time.Second } if conf.WriteTimeout == 0 { - conf.WriteTimeout = 5 * time.Second + conf.WriteTimeout = 10 * time.Second + } + if conf.IdleTimeout == 0 { + conf.IdleTimeout = 60 * time.Second } dialectRW, err := func() (*dialect.ReadWriter, error) { diff --git a/node_heartbeat_test.go b/node_heartbeat_test.go index bfdc1df6e..7f4779d03 100644 --- a/node_heartbeat_test.go +++ b/node_heartbeat_test.go @@ -5,19 +5,11 @@ import ( "time" "github.com/stretchr/testify/require" - - "github.com/bluenviron/gomavlib/v2/pkg/dialect" - "github.com/bluenviron/gomavlib/v2/pkg/message" ) func TestNodeHeartbeat(t *testing.T) { - dialect := &dialect.Dialect{ - Version: 3, - Messages: []message.Message{&MessageHeartbeat{}}, - } - node1, err := NewNode(NodeConf{ - Dialect: dialect, + Dialect: testDialect, OutVersion: V2, OutSystemID: 10, Endpoints: []EndpointConf{ @@ -29,7 +21,7 @@ func TestNodeHeartbeat(t *testing.T) { defer node1.Close() node2, err := NewNode(NodeConf{ - Dialect: dialect, + Dialect: testDialect, OutVersion: V2, OutSystemID: 11, Endpoints: []EndpointConf{ diff --git a/pkg/autoreconnector/autoreconnector.go b/pkg/autoreconnector/auto_reconnector.go similarity index 86% rename from pkg/autoreconnector/autoreconnector.go rename to pkg/autoreconnector/auto_reconnector.go index d61220eab..172cd5c40 100644 --- a/pkg/autoreconnector/autoreconnector.go +++ b/pkg/autoreconnector/auto_reconnector.go @@ -14,7 +14,7 @@ var ( errTerminated = errors.New("terminated") ) -type autoreconnector struct { +type autoReconnector struct { connect func(context.Context) (io.ReadWriteCloser, error) ctx context.Context @@ -29,14 +29,14 @@ func New( ) io.ReadWriteCloser { ctx, ctxCancel := context.WithCancel(context.Background()) - return &autoreconnector{ + return &autoReconnector{ connect: connect, ctx: ctx, ctxCancel: ctxCancel, } } -func (a *autoreconnector) Close() error { +func (a *autoReconnector) Close() error { a.ctxCancel() a.connMutex.Lock() @@ -50,7 +50,7 @@ func (a *autoreconnector) Close() error { return nil } -func (a *autoreconnector) getConnection(reset bool) (io.ReadWriteCloser, bool) { +func (a *autoReconnector) getConnection(reset bool) (io.ReadWriteCloser, bool) { a.connMutex.Lock() defer a.connMutex.Unlock() @@ -92,7 +92,7 @@ func (a *autoreconnector) getConnection(reset bool) (io.ReadWriteCloser, bool) { } } -func (a *autoreconnector) Read(p []byte) (int, error) { +func (a *autoReconnector) Read(p []byte) (int, error) { reset := false for { @@ -110,7 +110,7 @@ func (a *autoreconnector) Read(p []byte) (int, error) { } } -func (a *autoreconnector) Write(p []byte) (int, error) { +func (a *autoReconnector) Write(p []byte) (int, error) { reset := false for { @@ -120,6 +120,7 @@ func (a *autoreconnector) Write(p []byte) (int, error) { } n, err := curConn.Write(p) + if err == nil { return n, nil } diff --git a/pkg/autoreconnector/autoreconnector_test.go b/pkg/autoreconnector/auto_reconnector_test.go similarity index 100% rename from pkg/autoreconnector/autoreconnector_test.go rename to pkg/autoreconnector/auto_reconnector_test.go diff --git a/pkg/timednetconn/conn.go b/pkg/timednetconn/conn.go new file mode 100644 index 000000000..f25cf3514 --- /dev/null +++ b/pkg/timednetconn/conn.go @@ -0,0 +1,48 @@ +// Package timednetconn contains a net.Conn wrapper with deadlines. +package timednetconn + +import ( + "io" + "net" + "time" +) + +type conn struct { + readTimeout time.Duration + writeTimeout time.Duration + wrapped net.Conn +} + +// New returns a io.ReadWriteCloser that calls SetReadDeadline() before Read() +// and SetWriteDeadline() before Write(). +func New( + readTimeout time.Duration, + writeTimeout time.Duration, + wrapped net.Conn, +) io.ReadWriteCloser { + return &conn{ + readTimeout: readTimeout, + writeTimeout: writeTimeout, + wrapped: wrapped, + } +} + +func (c *conn) Close() error { + return c.wrapped.Close() +} + +func (c *conn) Read(buf []byte) (int, error) { + err := c.wrapped.SetReadDeadline(time.Now().Add(c.readTimeout)) + if err != nil { + return 0, err + } + return c.wrapped.Read(buf) +} + +func (c *conn) Write(buf []byte) (int, error) { + err := c.wrapped.SetWriteDeadline(time.Now().Add(c.writeTimeout)) + if err != nil { + return 0, err + } + return c.wrapped.Write(buf) +} diff --git a/pkg/timednetconn/timednetconn.go b/pkg/timednetconn/timednetconn.go deleted file mode 100644 index 3f3d06b3e..000000000 --- a/pkg/timednetconn/timednetconn.go +++ /dev/null @@ -1,39 +0,0 @@ -// Package timednetconn contains a net.Conn wrapper that calls SetWriteDeadline() before Write(). -package timednetconn - -import ( - "io" - "net" - "time" -) - -type timednetconn struct { - writeTimeout time.Duration - wrapped net.Conn -} - -// New returns a io.ReadWriteCloser that calls SetWriteDeadline() before Write(). -func New(writeTimeout time.Duration, wrapped net.Conn) io.ReadWriteCloser { - return &timednetconn{ - writeTimeout: writeTimeout, - wrapped: wrapped, - } -} - -func (c *timednetconn) Close() error { - return c.wrapped.Close() -} - -func (c *timednetconn) Read(buf []byte) (int, error) { - // do not call SetReadDeadline() - // since we don't want to disconnect in case of long pauses between messages - return c.wrapped.Read(buf) -} - -func (c *timednetconn) Write(buf []byte) (int, error) { - err := c.wrapped.SetWriteDeadline(time.Now().Add(c.writeTimeout)) - if err != nil { - return 0, err - } - return c.wrapped.Write(buf) -} diff --git a/pkg/udplistener/udplistener.go b/pkg/udplistener/udplistener.go deleted file mode 100644 index cc29b91eb..000000000 --- a/pkg/udplistener/udplistener.go +++ /dev/null @@ -1,263 +0,0 @@ -// Package udplistener provides a UDP-based Listener. -package udplistener - -import ( - "net" - "sync" - "time" -) - -// MTU is ~1500 -const bufferSize = 2048 - -// implements net.Error -type udpNetError struct { - str string - isTimeout bool -} - -func (e udpNetError) Error() string { - return e.str -} - -func (e udpNetError) Timeout() bool { - return e.isTimeout -} - -func (udpNetError) Temporary() bool { - return false -} - -var ( - errTimeout net.Error = udpNetError{"timeout", true} - errTerminated net.Error = udpNetError{"terminated", false} -) - -type connIndex struct { - IP [4]byte - Port int -} - -type conn struct { - listener *Listener - index connIndex - addr *net.UDPAddr - closed bool - readDeadline time.Time - writeDeadline time.Time - - // in - read chan []byte -} - -func newConn(listener *Listener, index connIndex, addr *net.UDPAddr) *conn { - return &conn{ - listener: listener, - index: index, - addr: addr, - read: make(chan []byte), - } -} - -// LocalAddr implements the net.Conn interface. -func (c *conn) LocalAddr() net.Addr { - // not implemented - return nil -} - -// RemoteAddr implements the net.Conn interface. -func (c *conn) RemoteAddr() net.Addr { - return c.addr -} - -// Close implements the net.Conn interface. -func (c *conn) Close() error { - c.listener.readMutex.Lock() - defer c.listener.readMutex.Unlock() - - if c.closed { - return nil - } - - c.closed = true - delete(c.listener.conns, c.index) - - // release anyone waiting on Read() - close(c.read) - - // close socket when both listener and connections are closed - if c.listener.closed && len(c.listener.conns) == 0 { - c.listener.pc.Close() - } - - return nil -} - -// Read implements the net.Conn interface. -func (c *conn) Read(byt []byte) (int, error) { - var buf []byte - var ok bool - - if !c.readDeadline.IsZero() { - select { - case <-time.After(time.Until(c.readDeadline)): - return 0, errTimeout - case buf, ok = <-c.read: - } - } else { - buf, ok = <-c.read - } - - if !ok { - return 0, errTerminated - } - - n := copy(byt, buf) - c.listener.readDone <- n - return n, nil -} - -// Write implements the net.Conn interface. -func (c *conn) Write(byt []byte) (int, error) { - c.listener.writeMutex.Lock() - defer c.listener.writeMutex.Unlock() - - if !c.writeDeadline.IsZero() { - err := c.listener.pc.SetWriteDeadline(c.writeDeadline) - if err != nil { - return 0, err - } - } - - return c.listener.pc.WriteTo(byt, c.addr) -} - -// SetDeadline implements the net.Conn interface. -func (c *conn) SetDeadline(time.Time) error { - // not implemented - return nil -} - -// SetReadDeadline implements the net.Conn interface. -func (c *conn) SetReadDeadline(t time.Time) error { - c.readDeadline = t - return nil -} - -// SetWriteDeadline implements the net.Conn interface. -func (c *conn) SetWriteDeadline(t time.Time) error { - c.writeDeadline = t - return nil -} - -// Listener is a UDP listener. -type Listener struct { - pc net.PacketConn - conns map[connIndex]*conn - readMutex sync.Mutex - writeMutex sync.Mutex - closed bool - - accept chan net.Conn - readDone chan int -} - -// New allocates a Listener. -func New(network, address string) (net.Listener, error) { - pc, err := net.ListenPacket(network, address) - if err != nil { - return nil, err - } - - l := &Listener{ - pc: pc, - conns: make(map[connIndex]*conn), - accept: make(chan net.Conn), - readDone: make(chan int), - } - - go l.runReader() - - return l, nil -} - -// Close implements the net.Listener interface. -func (l *Listener) Close() error { - l.readMutex.Lock() - defer l.readMutex.Unlock() - - if l.closed { - return nil - } - - l.closed = true - - // release anyone waiting on Accept() - close(l.accept) - - // close socket when both listener and connections are closed - if len(l.conns) == 0 { - l.pc.Close() - } - - return nil -} - -// Addr implements the net.Listener interface. -func (l *Listener) Addr() net.Addr { - return l.pc.LocalAddr() -} - -func (l *Listener) runReader() { - buf := make([]byte, bufferSize) - - for { - // read WITHOUT deadline. Long periods without packets are normal since - // we're not directly connected to someone. - n, addr, err := l.pc.ReadFrom(buf) - if err != nil { - break - } - - // use ip and port as connection index - uaddr := addr.(*net.UDPAddr) - connIndex := connIndex{} - connIndex.Port = uaddr.Port - copy(connIndex.IP[:], uaddr.IP) - - func() { - l.readMutex.Lock() - defer l.readMutex.Unlock() - - conn, preExisting := l.conns[connIndex] - - if preExisting || !l.closed { - if !preExisting { - conn = newConn(l, connIndex, uaddr) - l.conns[connIndex] = conn - l.accept <- conn - } - - start := 0 - for n > 0 { - // route buffer to connection - conn.read <- buf[start : start+n] - - // wait copy since buffer is shared - read := <-l.readDone - n -= read - start += read - } - } - }() - } -} - -// Accept implements the net.Listener interface. -func (l *Listener) Accept() (net.Conn, error) { - conn, ok := <-l.accept - if !ok { - return nil, errTerminated - } - return conn, nil -} diff --git a/pkg/udplistener/udplistener_test.go b/pkg/udplistener/udplistener_test.go deleted file mode 100644 index 5ea5632e0..000000000 --- a/pkg/udplistener/udplistener_test.go +++ /dev/null @@ -1,175 +0,0 @@ -package udplistener - -import ( - "bytes" - "net" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestNew(t *testing.T) { - testBuf1 := []byte("testing testing 1 2 3") - testBuf2 := []byte("second part") - - l, err := New("udp4", "127.0.0.1:18456") - require.NoError(t, err) - defer l.Close() - - var wg sync.WaitGroup - wg.Add(5) - - go func() { - defer wg.Done() - - for i := 0; i < 2; i++ { - conn, err := l.Accept() - require.NoError(t, err) - - go func() { - defer wg.Done() - defer conn.Close() - - buf := make([]byte, 1024) - n, err := conn.Read(buf) - require.NoError(t, err) - require.Equal(t, len(testBuf1), n) - require.Equal(t, testBuf1, buf[:n]) - - n, err = conn.Write(testBuf2) - require.NoError(t, err) - require.Equal(t, len(testBuf2), n) - }() - } - }() - - for i := 0; i < 2; i++ { - go func() { - defer wg.Done() - - conn, err := net.Dial("udp4", "127.0.0.1:18456") - require.NoError(t, err) - defer conn.Close() - - conn.SetWriteDeadline(time.Now().Add(500 * time.Millisecond)) - n, err := conn.Write(testBuf1) - require.NoError(t, err) - require.Equal(t, len(testBuf1), n) - - buf := make([]byte, 1024) - n, err = conn.Read(buf) - require.NoError(t, err) - require.Equal(t, len(testBuf2), n) - require.Equal(t, testBuf2, buf[:n]) - }() - } - - wg.Wait() -} - -func TestNewError(t *testing.T) { - l1, err := New("udp4", "127.0.0.1:18456") - require.NoError(t, err) - defer l1.Close() - - _, err = New("udp4", "127.0.0.1:18456") - require.EqualError(t, err, "listen udp4 127.0.0.1:18456: bind: address already in use") -} - -func TestSamePacketMultipleReads(t *testing.T) { - l, err := New("udp4", "127.0.0.1:18456") - require.NoError(t, err) - defer l.Close() - - var wg sync.WaitGroup - wg.Add(1) - - go func() { - defer wg.Done() - - conn, err := l.Accept() - require.NoError(t, err) - defer conn.Close() - - buf := make([]byte, 256) - - for i := 0; i < 4; i++ { - n, err := conn.Read(buf) - require.NoError(t, err) - require.Equal(t, 256, n) - } - }() - - conn, err := net.Dial("udp4", "127.0.0.1:18456") - require.NoError(t, err) - defer conn.Close() - - _, err = conn.Write(bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 1024/4)) - require.NoError(t, err) - - wg.Wait() -} - -func TestReadDeadline(t *testing.T) { - l, err := New("udp4", "127.0.0.1:18456") - require.NoError(t, err) - defer l.Close() - - var wg sync.WaitGroup - wg.Add(2) - var err1 error - var err2 error - - go func() { - defer wg.Done() - - conn, err := l.Accept() - require.NoError(t, err) - defer conn.Close() - - for i := 0; i < 2; i++ { - err = conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) - require.NoError(t, err) - - buf := make([]byte, 1024) - _, err := conn.Read(buf) - if err != nil { - // accept first Read() - if i == 0 { - err1 = err - return - } - // second Read() must fail with Timeout - if ne, ok := err.(net.Error); ok && ne.Timeout() { - return - } - err1 = err - return - } - } - }() - - go func() { - defer wg.Done() - - conn, err := net.Dial("udp4", "127.0.0.1:18456") - require.NoError(t, err) - defer conn.Close() - - _, err = conn.Write([]byte("a")) - require.NoError(t, err) - }() - - wg.Wait() - require.NoError(t, err1) - require.NoError(t, err2) -} - -func TestDoubleClose(t *testing.T) { - l, err := New("udp4", "127.0.0.1:18456") - require.NoError(t, err) - l.Close() - l.Close() -}