diff --git a/integration_test.go b/integration_test.go index 5898a53..2058567 100644 --- a/integration_test.go +++ b/integration_test.go @@ -219,6 +219,13 @@ func (s *sinkTestSuite) TestSend() { s.Equal(expectedMessages, receivedMessages) } +func (s *sinkTestSuite) TestCloseTwice() { + err := s.sink.Close() + s.Nil(err) + err = s.sink.Close() + s.Nil(err) +} + func (s *sourceTestSuite) produce(values []string) { for _, m := range values { s.prod.Produce(&kafka.Message{ diff --git a/sink.go b/sink.go index 0ac98da..6740ded 100644 --- a/sink.go +++ b/sink.go @@ -17,14 +17,14 @@ var ( var ( // how long to wait for messages to flush - flushTimeoutMS = 30 * 1000 + flushTimeoutMS = 10 * 1000 ) // Sink encapsulates a kafka producer for Sending Msgs type Sink struct { prod *kafka.Producer - quitChan chan int - doneChan chan int + quitChan chan struct{} + doneChan chan struct{} evtChan chan frizzle.Event } @@ -48,8 +48,8 @@ func InitSink(config *viper.Viper) (*Sink, error) { s := &Sink{ prod: p, - quitChan: make(chan int), - doneChan: make(chan int), + quitChan: make(chan struct{}), + doneChan: make(chan struct{}), evtChan: make(chan frizzle.Event), } @@ -62,11 +62,11 @@ func InitSink(config *viper.Viper) (*Sink, error) { // message delivery is successful, any errors from broker, etc func (s *Sink) deliveryReports() { defer close(s.doneChan) - run := true - for run == true { + for { select { case <-s.quitChan: - run = false + s.quitChan = nil + return case e := <-s.prod.Events(): switch ev := e.(type) { case *kafka.Message: @@ -107,8 +107,14 @@ func (s *Sink) Close() error { if remaining := s.prod.Flush(flushTimeoutMS); remaining > 0 { return fmt.Errorf("there are still %d messages which have not been delivered after %d milliseconds", remaining, flushTimeoutMS) } - // tell deliveryReports() goroutine to finish - s.quitChan <- 1 + + // check if already closed, return if so + if s.quitChan == nil { + return nil + } + + // tell deliveryReports() goroutine to finish if running + close(s.quitChan) // wait for it to finish <-s.doneChan // stop event chan