Skip to content

Commit

Permalink
Cleaning up a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
dehort committed Apr 15, 2024
1 parent 67eb572 commit 84e505f
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 7 deletions.
5 changes: 0 additions & 5 deletions internal/common/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func Produce(producer *kafka.Producer, topic string, value interface{}, key stri

err = producer.Produce(msg, deliveryChan)
if err != nil {
fmt.Printf("Produce failed: %v\n", err)
return err
}

Expand All @@ -158,13 +157,9 @@ func Produce(producer *kafka.Producer, topic string, value interface{}, key stri
m := e.(*kafka.Message)

if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
return m.TopicPartition.Error
}

fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions internal/validator/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (this *handler) produceMessage(ctx context.Context, topic string, value int
if err := kafkaUtils.Produce(this.producer, topic, value, key, headers...); err != nil {
instrumentation.ProducerError(ctx, err, topic)

if ignoreKafkaError(err) {
if ignoreKafkaProduceError(err) {
return
}

Expand All @@ -261,7 +261,7 @@ func (this *handler) produceMessage(ctx context.Context, topic string, value int
}
}

func ignoreKafkaError(err error) bool {
func ignoreKafkaProduceError(err error) bool {

kafkaErr := err.(kafka.Error)

Expand Down

0 comments on commit 84e505f

Please sign in to comment.