-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Matcher v2 init #541
Matcher v2 init #541
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,10 +52,9 @@ type Kafka struct { | |
} | ||
|
||
func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { | ||
var sink Sink | ||
switch l := listener.(type) { | ||
case *ancla.RegistryV1: | ||
sink = &WebhookV1{ | ||
sink := &WebhookV1{ | ||
id: l.GetId(), | ||
deliveryInterval: c.DeliveryInterval, | ||
deliveryRetries: c.DeliveryRetries, | ||
|
@@ -68,31 +67,32 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { | |
kafka := &Kafka{ | ||
id: l.Registration.CanonicalName, | ||
brokerAddr: k.BootstrapServers, | ||
topic: "test", | ||
topic: "quickstart-events", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is based off the kafka docs for testing locally... i imagine we will be including topic as a part of the kafka webhook struct that users will be sending us? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yup |
||
logger: logger, | ||
} | ||
|
||
config := sarama.NewConfig() | ||
//TODO: this is basic set up for now - will need to add more options to config | ||
//once we know what we are allowing users to send | ||
kafka.config.Producer.Return.Successes = true | ||
kafka.config.Producer.RequiredAcks = sarama.WaitForAll | ||
kafka.config.Producer.Retry.Max = c.DeliveryRetries //should we be using retryhint for this? | ||
|
||
config.Producer.Return.Successes = true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added back in the default config because it kept failing due to certain fields not being set. i'm setting these fields this way based on a tutorial I found: https://youtu.be/j6bqJKxb2w0?si=-htt-I0kgF8RqHMy&t=1384 |
||
config.Producer.RequiredAcks = sarama.WaitForAll | ||
config.Producer.Retry.Max = c.DeliveryRetries //should we be using retryhint for this? | ||
kafka.config = config | ||
// Create a new Kafka producer | ||
producer, err := sarama.NewSyncProducer(kafka.brokerAddr, config) | ||
if err != nil { | ||
kafka.logger.Error("Could not create Kafka producer", zap.Error(err)) | ||
return nil | ||
} | ||
defer producer.Close() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
kafka.producer = producer | ||
sink = append(sink, kafka) | ||
} | ||
|
||
return sink | ||
default: | ||
return nil | ||
} | ||
return sink | ||
} | ||
|
||
func (v1 *WebhookV1) Update(l ancla.Register) (err error) { | ||
|
@@ -346,6 +346,7 @@ func (k *Kafka) send(secret string, acceptType string, msg *wrp.Message) error { | |
|
||
// Send the message to Kafka | ||
partition, offset, err := k.producer.SendMessage(kafkaMsg) | ||
defer k.producer.Close() | ||
if err != nil { | ||
k.logger.Error("Failed to send message to Kafka", zap.Error(err)) | ||
return err | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't believe MatcherV2 is going to need getUrls - in my refactored branch i move the urls to the matcher to get rid of this function which is also called here:
caduceus/internal/sink/sinkSender.go
Line 423 in dad38f9