Skip to content

Commit

Permalink
in case of reconnection, make sure that EventClose is sent before Eve…
Browse files Browse the repository at this point in the history
…ntOpen (#82)
  • Loading branch information
aler9 authored Sep 21, 2023
1 parent 4ff9532 commit f81129b
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 12 deletions.
5 changes: 5 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type Channel struct {

// in
chWrite chan interface{}

// out
done chan struct{}
}

func newChannel(
Expand Down Expand Up @@ -79,6 +82,7 @@ func newChannel(
ctxCancel: ctxCancel,
frw: frw,
chWrite: make(chan interface{}, writeBufferSize),
done: make(chan struct{}),
}, nil
}

Expand All @@ -96,6 +100,7 @@ func (ch *Channel) start() {
}

func (ch *Channel) run() {
defer close(ch.done)
defer ch.n.wg.Done()

readerDone := make(chan struct{})
Expand Down
29 changes: 19 additions & 10 deletions channel_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,41 @@ func newChannelProvider(n *Node, eca endpointChannelProvider) (*channelProvider,
}, nil
}

func (ca *channelProvider) close() {
ca.eca.close()
func (cp *channelProvider) close() {
cp.eca.close()
}

func (ca *channelProvider) start() {
ca.n.wg.Add(1)
go ca.run()
func (cp *channelProvider) start() {
cp.n.wg.Add(1)
go cp.run()
}

func (ca *channelProvider) run() {
defer ca.n.wg.Done()
func (cp *channelProvider) run() {
defer cp.n.wg.Done()

for {
label, rwc, err := ca.eca.provide()
label, rwc, err := cp.eca.provide()
if err != nil {
if err != errTerminated {
panic("errTerminated is the only error allowed here")
}
break
}

ch, err := newChannel(ca.n, ca.eca, label, rwc)
ch, err := newChannel(cp.n, cp.eca, label, rwc)
if err != nil {
panic(fmt.Errorf("newChannel unexpected error: %s", err))
}

ca.n.newChannel(ch)
cp.n.newChannel(ch)

if cp.eca.oneChannelAtAtime() {
// wait the channel to emit EventChannelClose
// before creating another channel
select {
case <-ch.done:
case <-cp.n.terminate:
}
}
}
}
1 change: 1 addition & 0 deletions endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ type endpointChannelSingle interface {
type endpointChannelProvider interface {
Endpoint
close()
oneChannelAtAtime() bool
provide() (string, io.ReadWriteCloser, error)
}
4 changes: 4 additions & 0 deletions endpoint_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func (t *endpointClient) close() {
t.reconnector.Close()
}

func (t *endpointClient) oneChannelAtAtime() bool {
return true
}

func (t *endpointClient) provide() (string, io.ReadWriteCloser, error) {
conn, ok := t.reconnector.Reconnect()
if !ok {
Expand Down
2 changes: 2 additions & 0 deletions endpoint_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ func TestEndpointClientIdleTimeout(t *testing.T) {
t.Errorf("should not happen")
}

<-node.Events()

<-reconnected
})
}
Expand Down
4 changes: 4 additions & 0 deletions endpoint_serial.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func (t *endpointSerial) close() {
t.reconnector.Close()
}

func (t *endpointSerial) oneChannelAtAtime() bool {
return true
}

func (t *endpointSerial) provide() (string, io.ReadWriteCloser, error) {
conn, ok := t.reconnector.Reconnect()
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions endpoint_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func (t *endpointServer) close() {
t.listener.Close()
}

func (t *endpointServer) oneChannelAtAtime() bool {
return false
}

func (t *endpointServer) provide() (string, io.ReadWriteCloser, error) {
nconn, err := t.listener.Accept()
// wait termination, do not report errors
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconnector/reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (c *connWithContext) Write(p []byte) (int, error) {
return n, err
}

// Reconnector allocws to perform automatic reconnections.
// Reconnector allows to perform automatic reconnections.
type Reconnector struct {
connect ConnectFunc

Expand All @@ -86,7 +86,7 @@ func (a *Reconnector) Close() {
a.ctxCancel()
}

// Reconnect returns the next working connection.
// Reconnect returns the next connection.
func (a *Reconnector) Reconnect() (io.ReadWriteCloser, bool) {
if a.curConn != nil {
select {
Expand Down

0 comments on commit f81129b

Please sign in to comment.