Skip to content

Commit

Permalink
Add support for Sink.Close() being called multiple times
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshuaC215 committed Apr 23, 2019
1 parent 0375b5b commit 97f5f84
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 15 deletions.
7 changes: 7 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,13 @@ func (s *sinkTestSuite) TestSend() {
s.Equal(expectedMessages, receivedMessages)
}

func (s *sinkTestSuite) TestCloseTwice() {
err := s.sink.Close()
s.Nil(err)
err = s.sink.Close()
s.Nil(err)
}

func (s *sourceTestSuite) produce(values []string) {
for _, m := range values {
s.prod.Produce(&kafka.Message{
Expand Down
27 changes: 15 additions & 12 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ var (
// Sink encapsulates a kafka producer for Sending Msgs
type Sink struct {
prod *kafka.Producer
quitChan chan int
doneChan chan int
quitChan chan struct{}
doneChan chan struct{}
evtChan chan frizzle.Event
}

Expand All @@ -48,8 +48,8 @@ func InitSink(config *viper.Viper) (*Sink, error) {

s := &Sink{
prod: p,
quitChan: make(chan int),
doneChan: make(chan int),
quitChan: make(chan struct{}),
doneChan: make(chan struct{}),
evtChan: make(chan frizzle.Event),
}

Expand All @@ -62,11 +62,11 @@ func InitSink(config *viper.Viper) (*Sink, error) {
// message delivery is successful, any errors from broker, etc
func (s *Sink) deliveryReports() {
defer close(s.doneChan)
run := true
for run == true {
for {
select {
case <-s.quitChan:
run = false
s.quitChan = nil
return
case e := <-s.prod.Events():
switch ev := e.(type) {
case *kafka.Message:
Expand Down Expand Up @@ -104,18 +104,21 @@ func (s *Sink) Send(m frizzle.Msg, topic string) error {
// Close the Sink after flushing any Msgs not fully sent
func (s *Sink) Close() error {
// Flush any messages still pending send
fmt.Println("flush producer")
if remaining := s.prod.Flush(flushTimeoutMS); remaining > 0 {
return fmt.Errorf("there are still %d messages which have not been delivered after %d milliseconds", remaining, flushTimeoutMS)
}
// tell deliveryReports() goroutine to finish
fmt.Println("close deliveryReports() loop")
s.quitChan <- 1

// check if already closed, return if so
if s.quitChan == nil {
return nil
}

// tell deliveryReports() goroutine to finish if running
close(s.quitChan)
// wait for it to finish
<-s.doneChan
// stop event chan
close(s.evtChan)
fmt.Println("close producer")
s.prod.Close()
return nil
}
3 changes: 0 additions & 3 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ func (s *Source) Ping() error {
// unAcked Msgs.
func (s *Source) Close() error {
// confirm that consume() goroutine finished
fmt.Println("begin source close")
select {
case <-s.doneChan:
case <-time.After(stopCloseTimeout):
Expand All @@ -191,9 +190,7 @@ func (s *Source) Close() error {
if s.unAcked.Count() > 0 {
return frizzle.ErrUnackedMsgsRemain
}
fmt.Println("close channels")
close(s.msgChan)
close(s.evtChan)
fmt.Println("close consumer")
return s.cons.Close()
}

0 comments on commit 97f5f84

Please sign in to comment.