Skip to content

Commit

Permalink
add a service for kafka broker interactions
Browse files Browse the repository at this point in the history
  • Loading branch information
PapePathe committed Oct 15, 2023
1 parent 68e6ab7 commit 2337dda
Showing 1 changed file with 34 additions and 0 deletions.
34 changes: 34 additions & 0 deletions pkg/broker/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package broker

import (
"context"

"github.com/segmentio/kafka-go"
)

type KafkaPublisher struct {
autocreateTopics bool
writer *kafka.Writer
}

func NewPublisher(addr []string, autocreateTopics bool) *KafkaPublisher {
w := &kafka.Writer{
Addr: kafka.TCP(addr...),
Balancer: &kafka.LeastBytes{},
AllowAutoTopicCreation: autocreateTopics,
}

return &KafkaPublisher{writer: w}
}

func (kp KafkaPublisher) Publish(msg []kafka.Message) error {
if err := kp.writer.WriteMessages(context.Background(), msg...); err != nil {
return err
}

if err := kp.writer.Close(); err != nil {
return err
}

return nil
}

0 comments on commit 2337dda

Please sign in to comment.