Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/kafka #538

Merged
merged 7 commits into from
Sep 5, 2024
Merged

Feat/kafka #538

merged 7 commits into from
Sep 5, 2024

Conversation

maurafortino
Copy link
Contributor

@maurafortino maurafortino commented Aug 27, 2024

What's Included:

  • added select case for ancla.RegistryV2 for NewSink - only for Kafka
  • added in basic configuration for a Kafka producer

TODO:

  • figure out what configurations will be needed for Kafka producer
  • edit Kafka send function
  • kafka headers and adding arrays correctly

@maurafortino maurafortino requested a review from denopink August 27, 2024 17:03
@maurafortino maurafortino self-assigned this Aug 27, 2024
@maurafortino maurafortino marked this pull request as draft August 27, 2024 17:06
@maurafortino maurafortino mentioned this pull request Aug 27, 2024
}

// Create a new Kafka producer
producer, err := sarama.NewSyncProducer(k.brokerAddr, k.config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to cache the producer and not recreate it for every incoming event

internal/sink/sink.go Outdated Show resolved Hide resolved
Comment on lines 75 to 80
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = c.DeliveryRetries //should we be using retryhint for this?

kafka.config = config
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = c.DeliveryRetries //should we be using retryhint for this?
kafka.config = config
kafka.config.Producer.Return.Successes = true
kafka.config.Producer.RequiredAcks = sarama.WaitForAll
kafka.config.Producer.Retry.Max = c.DeliveryRetries //should we be using retryhint for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we getting rid of NewConfig?

@maurafortino maurafortino marked this pull request as ready for review September 3, 2024 12:31
@maurafortino maurafortino merged commit dad38f9 into denopink/feat/rewrite Sep 5, 2024
8 of 13 checks passed
@maurafortino maurafortino deleted the feat/kafka branch September 5, 2024 15:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

2 participants