Skip to content

Commit

Permalink
replace ring buffer with fixed-size channel (#69)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 authored Aug 18, 2023
1 parent 34b87ed commit a025891
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 302 deletions.
92 changes: 52 additions & 40 deletions channel.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package gomavlib

import (
"context"
"crypto/rand"
"io"

"github.com/bluenviron/gomavlib/v2/pkg/frame"
"github.com/bluenviron/gomavlib/v2/pkg/message"
"github.com/bluenviron/gomavlib/v2/pkg/ringbuffer"
)

const (
Expand All @@ -26,19 +26,26 @@ func randomByte() (byte, error) {
// For instance, a TCP client endpoint creates a single channel, while a TCP
// server endpoint creates a channel for each incoming connection.
type Channel struct {
e Endpoint
label string
rwc io.Closer
n *Node
frw *frame.ReadWriter
running bool
writeBuffer *ringbuffer.RingBuffer
n *Node
e Endpoint
label string
rwc io.Closer

ctx context.Context
ctxCancel func()
frw *frame.ReadWriter
running bool

// in
terminate chan struct{}
chWrite chan interface{}
}

func newChannel(n *Node, e Endpoint, label string, rwc io.ReadWriteCloser) (*Channel, error) {
func newChannel(
n *Node,
e Endpoint,
label string,
rwc io.ReadWriteCloser,
) (*Channel, error) {
linkID, err := randomByte()
if err != nil {
return nil, err
Expand All @@ -63,26 +70,23 @@ func newChannel(n *Node, e Endpoint, label string, rwc io.ReadWriteCloser) (*Cha
return nil, err
}

writeBuffer, err := ringbuffer.New(writeBufferSize)
if err != nil {
return nil, err
}
ctx, ctxCancel := context.WithCancel(context.Background())

return &Channel{
e: e,
label: label,
rwc: rwc,
n: n,
frw: frw,
writeBuffer: writeBuffer,
terminate: make(chan struct{}),
n: n,
e: e,
label: label,
rwc: rwc,
ctx: ctx,
ctxCancel: ctxCancel,
frw: frw,
chWrite: make(chan interface{}, writeBufferSize),
}, nil
}

func (ch *Channel) close() {
if ch.running {
close(ch.terminate)
} else {
ch.ctxCancel()
if !ch.running {
ch.rwc.Close()
}
}
Expand All @@ -99,24 +103,27 @@ func (ch *Channel) run() {
readerDone := make(chan struct{})
go ch.runReader(readerDone)

writerTerminate := make(chan struct{})
writerDone := make(chan struct{})
go ch.runWriter(writerDone)
go ch.runWriter(writerTerminate, writerDone)

select {
case <-readerDone:
ch.rwc.Close()

ch.writeBuffer.Close()
close(writerTerminate)
<-writerDone

case <-ch.terminate:
ch.writeBuffer.Close()
case <-ch.ctx.Done():
close(writerTerminate)
<-writerDone

ch.rwc.Close()
<-readerDone
}

ch.ctxCancel()

ch.n.pushEvent(&EventChannelClose{ch})
ch.n.closeChannel(ch)
}
Expand Down Expand Up @@ -148,21 +155,22 @@ func (ch *Channel) runReader(readerDone chan struct{}) {
}
}

func (ch *Channel) runWriter(writerDone chan struct{}) {
func (ch *Channel) runWriter(writerTerminate chan struct{}, writerDone chan struct{}) {
defer close(writerDone)

for {
what, ok := ch.writeBuffer.Pull()
if !ok {
return
}

switch wh := what.(type) {
case message.Message:
ch.frw.WriteMessage(wh) //nolint:errcheck
select {
case what := <-ch.chWrite:
switch wh := what.(type) {
case message.Message:
ch.frw.WriteMessage(wh) //nolint:errcheck

case frame.Frame:
ch.frw.WriteFrame(wh) //nolint:errcheck
}

case frame.Frame:
ch.frw.WriteFrame(wh) //nolint:errcheck
case <-writerTerminate:
return
}
}
}
Expand All @@ -178,5 +186,9 @@ func (ch *Channel) Endpoint() Endpoint {
}

func (ch *Channel) write(what interface{}) {
ch.writeBuffer.Push(what)
select {
case ch.chWrite <- what:
case <-ch.ctx.Done():
default: // buffer is full
}
}
38 changes: 0 additions & 38 deletions pkg/ringbuffer/event.go

This file was deleted.

77 changes: 0 additions & 77 deletions pkg/ringbuffer/ringbuffer.go

This file was deleted.

Loading

0 comments on commit a025891

Please sign in to comment.