-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18013: Add AutoOffsetResetStrategy internal class #17858
Conversation
5fb0ba3
to
ab1a138
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made a pass
|
||
// This may be null if the task we are currently processing was apart of a named topology that was just removed. | ||
// TODO KAFKA-13713: keep the StreamThreads and TopologyMetadata view of named topologies in sync until final thread has acked | ||
if (offsetResetStrategy != null) { | ||
switch (offsetResetStrategy) { | ||
case EARLIEST: | ||
switch (offsetResetStrategy.name()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to switch to name()
here and do a string comparison? Can't we keep an enum comparison?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned in other comment, enum wont help much here. We will be having different instances DurationBasedStrategy class. I have the updated code to avoid string comparisons.
public class AutoOffsetResetStrategy { | ||
public static final String EARLIEST_STRATEGY_NAME = "earliest"; | ||
public static final String LATEST_STRATEGY_NAME = "latest"; | ||
public static final String NONE_STRATEGY_NAME = "none"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we still have an internal enum
for the different strategies, to avoid doing string comparisons?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Enum wont help us as we still need to parse strings like `by_duration:PnDTnHnMn. n>. I prefer to keep string names for now. I will update in next PR if required. I will update code avoid string comparison in SteamsThread class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea would be to have an enum plus an Optional<Duration>
which is only used when the enum value is "by-duration".
assertTrue(AutoOffsetResetStrategy.isValid("none")); | ||
assertFalse(AutoOffsetResetStrategy.isValid("invalid")); | ||
assertFalse(AutoOffsetResetStrategy.isValid("LATEST")); | ||
assertFalse(AutoOffsetResetStrategy.isValid("")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a test case for null
?
assertEquals(AutoOffsetResetStrategy.NONE, AutoOffsetResetStrategy.valueOf("none")); | ||
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.valueOf("invalid")); | ||
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.valueOf("LATEST")); | ||
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.valueOf("")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a test case for null
?
assertDoesNotThrow(() -> validator.ensureValid("test", "none")); | ||
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "invalid")); | ||
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "LATEST")); | ||
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null
?
|
||
public static final AutoOffsetResetStrategy EARLIEST = new AutoOffsetResetStrategy(EARLIEST_STRATEGY_NAME); | ||
public static final AutoOffsetResetStrategy LATEST = new AutoOffsetResetStrategy(LATEST_STRATEGY_NAME); | ||
public static final AutoOffsetResetStrategy NONE = new AutoOffsetResetStrategy(NONE_STRATEGY_NAME); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we use an enum
we don't need this boiler plate objects?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are not boilerplate objects. These are static instances to represent current strategies.
The main reason for converting enum into class is to allow us to add additional reset strategies with some properties.
For example, we will be adding new class like below to represent duration based reset strategy:
DurationBasedOffsetResetStrategy extends AutoOffsetResetStrategy {
DurationBasedOffsetResetStrategy(String name, String isoDuration) {
}
}
2d36c40
to
f4c60ea
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally, I would prefer enum
plus Optional<Duration>
instead of static final object, but guess it works either way.
f4c60ea
to
a71c5f0
Compare
I have moved the sting names to enum now. I will keep static instances as they easily fit into current code. |
78c7892
to
d336dfd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Just a few minor comments.
} else if (offsetResetStrategy == AutoOffsetResetStrategy.LATEST) { | ||
addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", "latest", loggedTopics); | ||
} else if (offsetResetStrategy == AutoOffsetResetStrategy.NONE) { | ||
if (AutoOffsetResetStrategy.EARLIEST == AutoOffsetResetStrategy.fromString(originalReset)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest making a variable to hold AutoOffsetResetStrategy.fromString(originalReset)
so this code is not repeated in both if conditions.
@@ -18,6 +18,10 @@ | |||
|
|||
import java.util.Locale; | |||
|
|||
/** | |||
* @deprecated Since 4.0. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest adding an indication about what to migrate to instead.
@AndrewJSchofield Thanks for the review. Updated the PR, |
@@ -19,7 +19,7 @@ | |||
import java.util.Locale; | |||
|
|||
/** | |||
* @deprecated Since 4.0. | |||
* @deprecated Since 4.0. Use {@link org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy instead. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Missing the closing '}'.
- Deprecates OffsetResetStrategy enum - Adds new internal class AutoOffsetResetStrategy - Replaces all OffsetResetStrategy enum usages with AutoOffsetResetStrategy - Deprecate old/Add new constructors to MockConsumer Reviewers: Andrew Schofield <[email protected]>, Matthias J. Sax <[email protected]>
- Deprecates OffsetResetStrategy enum - Adds new internal class AutoOffsetResetStrategy - Replaces all OffsetResetStrategy enum usages with AutoOffsetResetStrategy - Deprecate old/Add new constructors to MockConsumer Reviewers: Andrew Schofield <[email protected]>, Matthias J. Sax <[email protected]>
This is preparatory change for KIP-1106.
This PR:
There are no functionality changes. We will add duration based offset reset support in subsequent PRs.