Skip to content

Commit

Permalink
Trying to fix the memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
dehort committed Apr 11, 2024
1 parent 8a2079e commit b844d63
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion internal/common/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,26 @@ func Produce(producer *kafka.Producer, topic string, value interface{}, key stri
msg.Headers = headers
}

return producer.Produce(msg, nil)
deliveryChan := make(chan kafka.Event)
defer close(deliveryChan)

err = producer.Produce(msg, deliveryChan)

Check failure on line 150 in internal/common/kafka/kafka.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)

// FIXME: do we just ignore this err instance and check the delivery channel? This api is confusing...

// Reading the delivery channel here makes this a synchronous write (blocking)
e := <-deliveryChan
m := e.(*kafka.Message)

if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
return m.TopicPartition.Error
}

fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)

return nil
}

type KafkaMessagePredicate func(msg *kafka.Message) bool
Expand Down

0 comments on commit b844d63

Please sign in to comment.