Skip to content

Commit

Permalink
Merge pull request #10 from JoshKCarroll/debug-close-timeout
Browse files Browse the repository at this point in the history
 Add support for Sink.Close() being called multiple times
  • Loading branch information
JoshuaC215 authored Apr 23, 2019
2 parents 1e46a6b + 97f5f84 commit 36ae99e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
7 changes: 7 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,13 @@ func (s *sinkTestSuite) TestSend() {
s.Equal(expectedMessages, receivedMessages)
}

func (s *sinkTestSuite) TestCloseTwice() {
err := s.sink.Close()
s.Nil(err)
err = s.sink.Close()
s.Nil(err)
}

func (s *sourceTestSuite) produce(values []string) {
for _, m := range values {
s.prod.Produce(&kafka.Message{
Expand Down
26 changes: 16 additions & 10 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ var (

var (
// how long to wait for messages to flush
flushTimeoutMS = 30 * 1000
flushTimeoutMS = 10 * 1000
)

// Sink encapsulates a kafka producer for Sending Msgs
type Sink struct {
prod *kafka.Producer
quitChan chan int
doneChan chan int
quitChan chan struct{}
doneChan chan struct{}
evtChan chan frizzle.Event
}

Expand All @@ -48,8 +48,8 @@ func InitSink(config *viper.Viper) (*Sink, error) {

s := &Sink{
prod: p,
quitChan: make(chan int),
doneChan: make(chan int),
quitChan: make(chan struct{}),
doneChan: make(chan struct{}),
evtChan: make(chan frizzle.Event),
}

Expand All @@ -62,11 +62,11 @@ func InitSink(config *viper.Viper) (*Sink, error) {
// message delivery is successful, any errors from broker, etc
func (s *Sink) deliveryReports() {
defer close(s.doneChan)
run := true
for run == true {
for {
select {
case <-s.quitChan:
run = false
s.quitChan = nil
return
case e := <-s.prod.Events():
switch ev := e.(type) {
case *kafka.Message:
Expand Down Expand Up @@ -107,8 +107,14 @@ func (s *Sink) Close() error {
if remaining := s.prod.Flush(flushTimeoutMS); remaining > 0 {
return fmt.Errorf("there are still %d messages which have not been delivered after %d milliseconds", remaining, flushTimeoutMS)
}
// tell deliveryReports() goroutine to finish
s.quitChan <- 1

// check if already closed, return if so
if s.quitChan == nil {
return nil
}

// tell deliveryReports() goroutine to finish if running
close(s.quitChan)
// wait for it to finish
<-s.doneChan
// stop event chan
Expand Down

0 comments on commit 36ae99e

Please sign in to comment.