From f81129b5ad74e6f9a81794822b26e8263290b120 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Thu, 21 Sep 2023 23:55:44 +0200 Subject: [PATCH] in case of reconnection, make sure that EventClose is sent before EventOpen (#82) --- channel.go | 5 +++++ channel_provider.go | 29 +++++++++++++++++++---------- endpoint.go | 1 + endpoint_client.go | 4 ++++ endpoint_client_test.go | 2 ++ endpoint_serial.go | 4 ++++ endpoint_server.go | 4 ++++ pkg/reconnector/reconnector.go | 4 ++-- 8 files changed, 41 insertions(+), 12 deletions(-) diff --git a/channel.go b/channel.go index 6e9324a00..e55016b87 100644 --- a/channel.go +++ b/channel.go @@ -36,6 +36,9 @@ type Channel struct { // in chWrite chan interface{} + + // out + done chan struct{} } func newChannel( @@ -79,6 +82,7 @@ func newChannel( ctxCancel: ctxCancel, frw: frw, chWrite: make(chan interface{}, writeBufferSize), + done: make(chan struct{}), }, nil } @@ -96,6 +100,7 @@ func (ch *Channel) start() { } func (ch *Channel) run() { + defer close(ch.done) defer ch.n.wg.Done() readerDone := make(chan struct{}) diff --git a/channel_provider.go b/channel_provider.go index d08c893e1..72f7e2eb1 100644 --- a/channel_provider.go +++ b/channel_provider.go @@ -16,20 +16,20 @@ func newChannelProvider(n *Node, eca endpointChannelProvider) (*channelProvider, }, nil } -func (ca *channelProvider) close() { - ca.eca.close() +func (cp *channelProvider) close() { + cp.eca.close() } -func (ca *channelProvider) start() { - ca.n.wg.Add(1) - go ca.run() +func (cp *channelProvider) start() { + cp.n.wg.Add(1) + go cp.run() } -func (ca *channelProvider) run() { - defer ca.n.wg.Done() +func (cp *channelProvider) run() { + defer cp.n.wg.Done() for { - label, rwc, err := ca.eca.provide() + label, rwc, err := cp.eca.provide() if err != nil { if err != errTerminated { panic("errTerminated is the only error allowed here") @@ -37,11 +37,20 @@ func (ca *channelProvider) run() { break } - ch, err := newChannel(ca.n, ca.eca, label, rwc) + ch, err := newChannel(cp.n, cp.eca, label, rwc) if err != nil { panic(fmt.Errorf("newChannel unexpected error: %s", err)) } - ca.n.newChannel(ch) + cp.n.newChannel(ch) + + if cp.eca.oneChannelAtAtime() { + // wait the channel to emit EventChannelClose + // before creating another channel + select { + case <-ch.done: + case <-cp.n.terminate: + } + } } } diff --git a/endpoint.go b/endpoint.go index e16021a98..148f92e48 100644 --- a/endpoint.go +++ b/endpoint.go @@ -32,5 +32,6 @@ type endpointChannelSingle interface { type endpointChannelProvider interface { Endpoint close() + oneChannelAtAtime() bool provide() (string, io.ReadWriteCloser, error) } diff --git a/endpoint_client.go b/endpoint_client.go index e859f47e4..fe84b8711 100644 --- a/endpoint_client.go +++ b/endpoint_client.go @@ -109,6 +109,10 @@ func (t *endpointClient) close() { t.reconnector.Close() } +func (t *endpointClient) oneChannelAtAtime() bool { + return true +} + func (t *endpointClient) provide() (string, io.ReadWriteCloser, error) { conn, ok := t.reconnector.Reconnect() if !ok { diff --git a/endpoint_client_test.go b/endpoint_client_test.go index fa4e106f9..8c00bb263 100644 --- a/endpoint_client_test.go +++ b/endpoint_client_test.go @@ -231,6 +231,8 @@ func TestEndpointClientIdleTimeout(t *testing.T) { t.Errorf("should not happen") } + <-node.Events() + <-reconnected }) } diff --git a/endpoint_serial.go b/endpoint_serial.go index c6ea587ac..5b8648ee3 100644 --- a/endpoint_serial.go +++ b/endpoint_serial.go @@ -60,6 +60,10 @@ func (t *endpointSerial) close() { t.reconnector.Close() } +func (t *endpointSerial) oneChannelAtAtime() bool { + return true +} + func (t *endpointSerial) provide() (string, io.ReadWriteCloser, error) { conn, ok := t.reconnector.Reconnect() if !ok { diff --git a/endpoint_server.go b/endpoint_server.go index d39ac1de3..a4d750b8e 100644 --- a/endpoint_server.go +++ b/endpoint_server.go @@ -113,6 +113,10 @@ func (t *endpointServer) close() { t.listener.Close() } +func (t *endpointServer) oneChannelAtAtime() bool { + return false +} + func (t *endpointServer) provide() (string, io.ReadWriteCloser, error) { nconn, err := t.listener.Accept() // wait termination, do not report errors diff --git a/pkg/reconnector/reconnector.go b/pkg/reconnector/reconnector.go index 112da12dd..4b49fb882 100644 --- a/pkg/reconnector/reconnector.go +++ b/pkg/reconnector/reconnector.go @@ -61,7 +61,7 @@ func (c *connWithContext) Write(p []byte) (int, error) { return n, err } -// Reconnector allocws to perform automatic reconnections. +// Reconnector allows to perform automatic reconnections. type Reconnector struct { connect ConnectFunc @@ -86,7 +86,7 @@ func (a *Reconnector) Close() { a.ctxCancel() } -// Reconnect returns the next working connection. +// Reconnect returns the next connection. func (a *Reconnector) Reconnect() (io.ReadWriteCloser, bool) { if a.curConn != nil { select {