From 0375b5bff1e42d0295d337ded82ba0e737871702 Mon Sep 17 00:00:00 2001 From: Joshua Carroll Date: Mon, 22 Apr 2019 16:22:13 -0400 Subject: [PATCH 1/2] Print statements --- sink.go | 5 ++++- source.go | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/sink.go b/sink.go index 0ac98da..979676f 100644 --- a/sink.go +++ b/sink.go @@ -17,7 +17,7 @@ var ( var ( // how long to wait for messages to flush - flushTimeoutMS = 30 * 1000 + flushTimeoutMS = 10 * 1000 ) // Sink encapsulates a kafka producer for Sending Msgs @@ -104,15 +104,18 @@ func (s *Sink) Send(m frizzle.Msg, topic string) error { // Close the Sink after flushing any Msgs not fully sent func (s *Sink) Close() error { // Flush any messages still pending send + fmt.Println("flush producer") if remaining := s.prod.Flush(flushTimeoutMS); remaining > 0 { return fmt.Errorf("there are still %d messages which have not been delivered after %d milliseconds", remaining, flushTimeoutMS) } // tell deliveryReports() goroutine to finish + fmt.Println("close deliveryReports() loop") s.quitChan <- 1 // wait for it to finish <-s.doneChan // stop event chan close(s.evtChan) + fmt.Println("close producer") s.prod.Close() return nil } diff --git a/source.go b/source.go index a75cbef..62c31e3 100644 --- a/source.go +++ b/source.go @@ -182,6 +182,7 @@ func (s *Source) Ping() error { // unAcked Msgs. func (s *Source) Close() error { // confirm that consume() goroutine finished + fmt.Println("begin source close") select { case <-s.doneChan: case <-time.After(stopCloseTimeout): @@ -190,7 +191,9 @@ func (s *Source) Close() error { if s.unAcked.Count() > 0 { return frizzle.ErrUnackedMsgsRemain } + fmt.Println("close channels") close(s.msgChan) close(s.evtChan) + fmt.Println("close consumer") return s.cons.Close() } From 97f5f84406f6cc9e66580a6c189ae76c70371736 Mon Sep 17 00:00:00 2001 From: Joshua Carroll Date: Tue, 23 Apr 2019 17:23:56 -0400 Subject: [PATCH 2/2] Add support for Sink.Close() being called multiple times --- integration_test.go | 7 +++++++ sink.go | 27 +++++++++++++++------------ source.go | 3 --- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/integration_test.go b/integration_test.go index 5898a53..2058567 100644 --- a/integration_test.go +++ b/integration_test.go @@ -219,6 +219,13 @@ func (s *sinkTestSuite) TestSend() { s.Equal(expectedMessages, receivedMessages) } +func (s *sinkTestSuite) TestCloseTwice() { + err := s.sink.Close() + s.Nil(err) + err = s.sink.Close() + s.Nil(err) +} + func (s *sourceTestSuite) produce(values []string) { for _, m := range values { s.prod.Produce(&kafka.Message{ diff --git a/sink.go b/sink.go index 979676f..6740ded 100644 --- a/sink.go +++ b/sink.go @@ -23,8 +23,8 @@ var ( // Sink encapsulates a kafka producer for Sending Msgs type Sink struct { prod *kafka.Producer - quitChan chan int - doneChan chan int + quitChan chan struct{} + doneChan chan struct{} evtChan chan frizzle.Event } @@ -48,8 +48,8 @@ func InitSink(config *viper.Viper) (*Sink, error) { s := &Sink{ prod: p, - quitChan: make(chan int), - doneChan: make(chan int), + quitChan: make(chan struct{}), + doneChan: make(chan struct{}), evtChan: make(chan frizzle.Event), } @@ -62,11 +62,11 @@ func InitSink(config *viper.Viper) (*Sink, error) { // message delivery is successful, any errors from broker, etc func (s *Sink) deliveryReports() { defer close(s.doneChan) - run := true - for run == true { + for { select { case <-s.quitChan: - run = false + s.quitChan = nil + return case e := <-s.prod.Events(): switch ev := e.(type) { case *kafka.Message: @@ -104,18 +104,21 @@ func (s *Sink) Send(m frizzle.Msg, topic string) error { // Close the Sink after flushing any Msgs not fully sent func (s *Sink) Close() error { // Flush any messages still pending send - fmt.Println("flush producer") if remaining := s.prod.Flush(flushTimeoutMS); remaining > 0 { return fmt.Errorf("there are still %d messages which have not been delivered after %d milliseconds", remaining, flushTimeoutMS) } - // tell deliveryReports() goroutine to finish - fmt.Println("close deliveryReports() loop") - s.quitChan <- 1 + + // check if already closed, return if so + if s.quitChan == nil { + return nil + } + + // tell deliveryReports() goroutine to finish if running + close(s.quitChan) // wait for it to finish <-s.doneChan // stop event chan close(s.evtChan) - fmt.Println("close producer") s.prod.Close() return nil } diff --git a/source.go b/source.go index 62c31e3..a75cbef 100644 --- a/source.go +++ b/source.go @@ -182,7 +182,6 @@ func (s *Source) Ping() error { // unAcked Msgs. func (s *Source) Close() error { // confirm that consume() goroutine finished - fmt.Println("begin source close") select { case <-s.doneChan: case <-time.After(stopCloseTimeout): @@ -191,9 +190,7 @@ func (s *Source) Close() error { if s.unAcked.Count() > 0 { return frizzle.ErrUnackedMsgsRemain } - fmt.Println("close channels") close(s.msgChan) close(s.evtChan) - fmt.Println("close consumer") return s.cons.Close() }