contrib/segmentio/kafka.go.v0: add DSM support #2625
Conversation
	datastreams.InjectToBase64Carrier(ctx, carrier)
}

func getProducerMsgSize(msg *kafka.Message) (size int64) {
There is an alternative private implementation here; I've kept the one used in the other integrations.
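As a rough sketch of what a size helper like this computes (using simplified stand-in types rather than the real kafka-go `Message`, so the snippet is self-contained), the message size is the sum of the key, value, and header bytes:

```go
package main

import "fmt"

// Header and Message mirror the shape of kafka-go's types for illustration only.
type Header struct {
	Key   string
	Value []byte
}

type Message struct {
	Key     []byte
	Value   []byte
	Headers []Header
}

// getProducerMsgSize sums the payload, key, and header bytes of a message,
// following the approach used by the other integrations.
func getProducerMsgSize(msg *Message) (size int64) {
	for _, header := range msg.Headers {
		size += int64(len(header.Key) + len(header.Value))
	}
	size += int64(len(msg.Value) + len(msg.Key))
	return size
}

func main() {
	msg := &Message{
		Key:     []byte("k"),
		Value:   []byte("hello"),
		Headers: []Header{{Key: "h", Value: []byte("v")}},
	}
	fmt.Println(getProducerMsgSize(msg)) // 1 + 5 + (1 + 1) = 8
}
```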
Hi @adrien-f, thanks for contributing this feature. I approved running the CI pipeline and there are some things to fix:
|
Many thanks @darccio! I've pushed the changes. Regarding the issue reported in the CI, it appears to be caused by the …
I believe I corrected the issue 👍 Happy weekend,
@@ -46,13 +48,17 @@ func WrapReader(c *kafka.Reader, opts ...Option) *Reader {
	if c.Config().Brokers != nil {
		wrapped.bootstrapServers = strings.Join(c.Config().Brokers, ",")
	}

	wrapped.groupID = c.Config().GroupID
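A minimal sketch of what the hunk above does, with stand-in types for kafka-go's `Reader` and `ReaderConfig` (names simplified for illustration): the wrapper copies the broker list and group ID straight out of the reader's own configuration:

```go
package main

import (
	"fmt"
	"strings"
)

// ReaderConfig and Reader stand in for the kafka-go types.
type ReaderConfig struct {
	Brokers []string
	GroupID string
}

type Reader struct{ cfg ReaderConfig }

func (r *Reader) Config() ReaderConfig { return r.cfg }

type WrappedReader struct {
	*Reader
	bootstrapServers string
	groupID          string
}

// WrapReader mirrors the hunk above: the settings DSM needs are taken from
// the reader's existing configuration rather than from extra options.
func WrapReader(c *Reader) *WrappedReader {
	wrapped := &WrappedReader{Reader: c}
	if c.Config().Brokers != nil {
		wrapped.bootstrapServers = strings.Join(c.Config().Brokers, ",")
	}
	wrapped.groupID = c.Config().GroupID
	return wrapped
}

func main() {
	r := &Reader{cfg: ReaderConfig{Brokers: []string{"localhost:9092"}, GroupID: "g1"}}
	w := WrapReader(r)
	fmt.Println(w.bootstrapServers, w.groupID) // localhost:9092 g1
}
```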
@adrien-f Comparing with the Shopify/sarama contrib, groupID comes from WithGroupID instead. I think it'll be better to be coherent with the other contrib, and avoid changing the default behaviour (working without a GroupID). Do you mind adding the option and removing this line?
For sure, I'm also looking at the confluent-kafka-go integration and they have a WithConfig:
dd-trace-go/contrib/confluentinc/confluent-kafka-go/kafka/option.go
Lines 114 to 116 in 4904f57
	if groupID, err := cg.Get("group.id", ""); err == nil {
		cfg.groupID = groupID.(string)
	}
Which is called through this:
dd-trace-go/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
Lines 43 to 44 in 4904f57
	opts = append(opts, WithConfig(conf))
	return WrapConsumer(c, opts...), nil
And I like this approach of enhancing the constructor transparently: users adopting DSM would only need to update and turn on the environment variable, instead of having to explicitly pass the groupID.
This might be too broad a change for this PR though; what do you think?
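For segmentio/kafka-go, a WithConfig option in the confluent-kafka-go style might look roughly like this. This is a hypothetical sketch, not the contrib's actual API; ReaderConfig stands in for kafka-go's type, and only the fields relevant to DSM are shown:

```go
package main

import (
	"fmt"
	"strings"
)

// ReaderConfig stands in for kafka.ReaderConfig; only the fields DSM cares about.
type ReaderConfig struct {
	Brokers []string
	GroupID string
}

type config struct {
	bootstrapServers string
	groupID          string
}

type Option func(*config)

// WithConfig is a hypothetical option, sketched after the confluent-kafka-go
// contrib: it copies the fields DSM needs straight from the reader's config,
// so callers never pass the group ID explicitly.
func WithConfig(rc ReaderConfig) Option {
	return func(cfg *config) {
		cfg.bootstrapServers = strings.Join(rc.Brokers, ",")
		cfg.groupID = rc.GroupID
	}
}

func main() {
	cfg := &config{}
	WithConfig(ReaderConfig{Brokers: []string{"b1:9092"}, GroupID: "my-group"})(cfg)
	fmt.Println(cfg.bootstrapServers, cfg.groupID) // b1:9092 my-group
}
```

The constructor would append this option itself when wrapping a reader, which is what makes the adoption transparent to users.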
@adrien-f If you want, I can contribute the change. It won't take me too much time. I'd rather avoid introducing a subtle breaking change.
No worries, I just wanted to be sure :) I've pushed the change!
@adrien-f I don't see any new commit.
@darccio @adrien-f the reason sarama has this group ID option is that the consumer group is not available otherwise (consumer groups go through a different API that we currently don't support, see #2133).
In general, the implementation we should be looking at for reference is the confluent-kafka one: https://github.com/DataDog/dd-trace-go/blob/main/contrib/confluentinc/confluent-kafka-go/kafka.v2/option.go#L114-L116 — the groupID is fetched from the kafka config actually used by the consumer.
LGTM
Thanks for the review
You are welcome. I'll get a second review from Ecosystems, @rarguelloF, but it won't be done until next week (Holy Week is close, so some Datadogs are already OOO).
// WithGroupID tags the produced data streams metrics with the given groupID (aka consumer group)
func WithGroupID(groupID string) Option {
	return func(cfg *config) {
		cfg.groupID = groupID
	}
}
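For context, the option above follows the standard Go functional-options pattern. A self-contained sketch of how it would be applied (with a minimal local `config` type standing in for the contrib's real one; the `"orders-consumer"` group name is an invented example):

```go
package main

import "fmt"

// config and Option are minimal stand-ins for the contrib's internal types.
type config struct{ groupID string }

type Option func(*config)

// WithGroupID, reproduced from the diff above against the stand-in config.
func WithGroupID(groupID string) Option {
	return func(cfg *config) {
		cfg.groupID = groupID
	}
}

func main() {
	cfg := &config{}
	// A wrapper like WrapReader would iterate over the supplied options.
	for _, opt := range []Option{WithGroupID("orders-consumer")} {
		opt(cfg)
	}
	fmt.Println(cfg.groupID) // orders-consumer
}
```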
I think this should be fetched from the actual consumer config, right? You can access it in the WrapReader function using kafka.Reader.Config().GroupID.
Hey :) This is what I had at first before changing it following #2625 (comment)
How would you see it?
I replied here: #2625 (comment)
Sorry for the confusion, @adrien-f! But TL;DR: the reason we have that option in Sarama is that we don't have a way to access the groupID there. In this contrib and the confluent-kafka one we can access it from the config, which would make the WithGroupID option unnecessary.
Thanks for the explanation! I've restored this behavior 👍
@adrien-f Please, merge
Head branch was pushed to by a user without write access
@darccio I used the rebase button/feature but it looks like it removed the auto-merge, sorry about that 🙏
Signed-off-by: Adrien Fillon <[email protected]>
/merge
🚂 MergeQueue: This merge request is not mergeable yet, because of pending checks/missing approvals. It will be added to the queue as soon as checks pass and/or it gets approvals.
🚂 MergeQueue: Pull request added to the queue. This build is going to start soon! (estimated merge in less than 0s)
/remove
This merge request build was cancelled. If you need support, contact us on Slack #devflow!
What does this PR do?
Add Data Streams Monitoring support to segmentio/kafka integration
Motivation
We currently use segmentio/kafka for our Go projects, and it was easier to update the integration than to change libraries.
Reviewer's Checklist
For Datadog employees:
@DataDog/security-design-and-guidance. Unsure? Have a question? Request a review!