Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17338 ConsumerConfig should prevent using partition assignors with CONSUMER group protocol #16899

Open
wants to merge 28 commits into
base: trunk
Choose a base branch
from

Conversation

m1a2st
Copy link
Contributor

@m1a2st m1a2st commented Aug 16, 2024

Jira: https://issues.apache.org/jira/browse/KAFKA-17338
add a validator for consumer group protocol, when setting partition.assignment.strategy

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@m1a2st m1a2st changed the title KAFKA-17338 KAFKA-17338 ConsumerConfig should prevent using partition assignors with CONSUMER group protocol Aug 17, 2024
@m1a2st m1a2st marked this pull request as ready for review August 20, 2024 08:02
Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@m1a2st thanks for this patch

private void checkPartitionAssigmentStrategy() {
List<String> assignmentStrategies = getList(PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CONSUMER.name())) {
List<Class<? extends AbstractPartitionAssignor>> defaultValue = Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please create a default value of Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class)? That can keep the consistency in the futrue

List<String> assignmentStrategies = getList(PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CONSUMER.name())) {
List<Class<? extends AbstractPartitionAssignor>> defaultValue = Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class);
if (!new HashSet<>(assignmentStrategies).containsAll(defaultValue) || assignmentStrategies.size() != defaultValue.size()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should accept the config equal to the default. for example, the config having org.apache.kafka.clients.consumer.RangeAssignor and org.apache.kafka.clients.consumer.CooperativeStickyAssignor should work

@m1a2st
Copy link
Contributor Author

m1a2st commented Aug 22, 2024

@chia7712, Thanks for your comments, I have addressed all the comments, PTAL

.map(Class::getName).collect(Collectors.toSet());
if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CONSUMER.name())) {
if ((!new HashSet<>(assignmentStrategies).containsAll(defaultAssignmentStrategiesClassNames) &&
!new HashSet<>(assignmentStrategies).containsAll(PARTITION_ASSIGNOR_DEFAULT_VALUE)) ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about:

    @SuppressWarnings("unchecked")
    private void checkPartitionAssigmentStrategy() {
        List<Object> assignmentStrategies = (List<Object>) get(PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
        Set<String> defaultAssignmentStrategiesClassNames = PARTITION_ASSIGNOR_DEFAULT_VALUE.stream()
                .map(Class::getName).collect(Collectors.toSet());
        if (PARTITION_ASSIGNOR_DEFAULT_VALUE.size() != assignmentStrategies.size() ||
            !PARTITION_ASSIGNOR_DEFAULT_VALUE.stream().allMatch(clz -> assignmentStrategies.stream().anyMatch(obj -> clz.equals(obj) || clz.getName().equals(obj)))) {
            throw new ConfigException(PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CONSUMER.name());
        }
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is great, I will address it

@chia7712
Copy link
Contributor

chia7712 commented Sep 1, 2024

@m1a2st please fix the failed tests

@@ -384,7 +385,9 @@ public class ConsumerConfig extends AbstractConfig {
private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;

private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);

private static final List<Class<? extends AbstractPartitionAssignor>> PARTITION_ASSIGNOR_DEFAULT_VALUE =
Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make it immutable

final Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor,org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add following test cases?

    @Test
    public void testSettingDefaultValuePartitionAssigmentStrategyWithConsumerGroupProtocol() {
        testSettingDefaultValuePartitionAssigmentStrategyWithConsumerGroupProtocol("org.apache.kafka.clients.consumer.RangeAssignor,org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        testSettingDefaultValuePartitionAssigmentStrategyWithConsumerGroupProtocol("org.apache.kafka.clients.consumer.CooperativeStickyAssignor, org.apache.kafka.clients.consumer.RangeAssignor");
        testSettingDefaultValuePartitionAssigmentStrategyWithConsumerGroupProtocol(Arrays.asList("org.apache.kafka.clients.consumer.RangeAssignor", CooperativeStickyAssignor.class));
        testSettingDefaultValuePartitionAssigmentStrategyWithConsumerGroupProtocol(Arrays.asList(CooperativeStickyAssignor.class, "org.apache.kafka.clients.consumer.RangeAssignor"));
        testSettingDefaultValuePartitionAssigmentStrategyWithConsumerGroupProtocol(Arrays.asList(RangeAssignor.class, "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"));
        testSettingDefaultValuePartitionAssigmentStrategyWithConsumerGroupProtocol(Arrays.asList("org.apache.kafka.clients.consumer.CooperativeStickyAssignor", RangeAssignor.class));
        testSettingDefaultValuePartitionAssigmentStrategyWithConsumerGroupProtocol(Arrays.asList(CooperativeStickyAssignor.class, RangeAssignor.class));
        testSettingDefaultValuePartitionAssigmentStrategyWithConsumerGroupProtocol(Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class));
    }

    private void testSettingDefaultValuePartitionAssigmentStrategyWithConsumerGroupProtocol(Object value) {
        final Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
        configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, value);
        configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name());
        assertDoesNotThrow(() -> new ConsumerConfig(configs));
    }

Copy link
Collaborator

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @m1a2st!

I'd like to see if we can simplify this change. It seems by using originals(), we can clean up the checkPartitionAssignmentStrategy() method, which then allows us to revert the changes that introduced PARTITION_ASSIGNOR_DEFAULT_VALUE.

LMK if I'm missing something, though.

Thanks!

Comment on lines 734 to 740
@SuppressWarnings("unchecked")
private void checkPartitionAssigmentStrategy() {
List<Object> assignmentStrategies = (List<Object>) get(PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CONSUMER.name())) {
if (PARTITION_ASSIGNOR_DEFAULT_VALUE.size() != assignmentStrategies.size() ||
!PARTITION_ASSIGNOR_DEFAULT_VALUE.stream()
.allMatch(clz -> assignmentStrategies.stream()
.anyMatch(obj -> clz.equals(obj) || clz.getName().equals(obj)))
) {
throw new ConfigException(PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CONSUMER.name());
}
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the core check could be much simpler 🤔

I used the originals() method to check the values the user provided before any defaults were applied.

Suggested change
@SuppressWarnings("unchecked")
private void checkPartitionAssigmentStrategy() {
List<Object> assignmentStrategies = (List<Object>) get(PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CONSUMER.name())) {
if (PARTITION_ASSIGNOR_DEFAULT_VALUE.size() != assignmentStrategies.size() ||
!PARTITION_ASSIGNOR_DEFAULT_VALUE.stream()
.allMatch(clz -> assignmentStrategies.stream()
.anyMatch(obj -> clz.equals(obj) || clz.getName().equals(obj)))
) {
throw new ConfigException(PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CONSUMER.name());
}
}
}
private void checkPartitionAssignmentStrategy() {
if (!getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CONSUMER.name()))
return;
Object config = originals().get(PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
if (config != null && !Utils.isBlank(config.toString())) {
throw new ConfigException(PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CONSUMER.name());
}
}

Is there anything missing in the above?

Copy link
Contributor Author

@m1a2st m1a2st Sep 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @kirktrue, Thanks for your comments, I think we have two scenario to discuss this issue
If we want to block any input of partition.assignment.strategy, your method is more simplify and better, however if we allow user to input the default value of partition.assignment.strategy, we use the origin method is more easier to check.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say we want to throw if there is any value defined by the user for this property (it's truly deprecated with the new protocol), so @kirktrue 's suggestion of working with the originals seems effective and simpler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to @kirktrue @lianetm that checking the user-defined values makes sense to me.

For another, ConsumerConfig's constructor will log all configs, so it would be nice to set the config partition.assignment.strategy to null when the protocol is CONSUMER. This behavior is similar to maybeOverrideEnableAutoCommit which can avoid printing "weird" consumer configs in log file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you refer to the log for unused? I did think about that, but then checked and noticed that it only logs based on the originals, not the defaults , so seems we shouldn't need to override to avoid the log

Set<String> keys = new HashSet<>(originals.keySet());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you refer to the log for unused?

I think about logAll()

If we don't remove the default value of partition.assignment.strategy, it will print following log.

 [2024-09-04 19:45:40,533] INFO ConsumerConfig values: 
        ...
	group.protocol = CONSUMER
        ...
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]

That is not a big issue, but the output could misdirect users to assume "those" configs are valid.

Copy link
Contributor Author

@m1a2st m1a2st Sep 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It make sense to me. +1 to @chia7712, the config log should not misdirect user.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

totally agree it will be confusing on the logAll, +1 to remove

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@m1a2st I think this comment can be addressed in a follow-up (https://issues.apache.org/jira/browse/KAFKA-17821). Could you please revert the related changes? Sorry for complicating this PR.

@kirktrue kirktrue added consumer KIP-848 The Next Generation of the Consumer Rebalance Protocol ctr Consumer Threading Refactor (KIP-848) labels Sep 3, 2024
@SuppressWarnings("unchecked")
private void checkPartitionAssigmentStrategy() {
List<Object> assignmentStrategies = (List<Object>) get(PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CONSUMER.name())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it would help readability to encapsulate this in something along the lines of:

Suggested change
if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CONSUMER.name())) {
if (isConsumerProtocol()) {

And then a similar isClassicProtocol(). The trick is that we already have protocol checks in 2 places (this and checkGroupRemoteAssignor), and I expect we will need it again as we keep adding other consistency validations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good idea, I will extract these method.

@lianetm
Copy link
Contributor

lianetm commented Sep 4, 2024

High level comment, there are other properties in the exact same situation, basically properties that do not make sense with the new protocol, and we want to throw ConfigException if the user provides a value for them (we also want to remove the defaults as @chia7712 pointed out, to remove them from the log and avoid confusion).

Ok with me if we prefer to tackle them separately (I can file the jiras), but the implementation of the check seems the same, so wonder if we should consider it here, and instead of having a checkPartitionAssigmentStrategy (that we'll end replicating for each of these props), we have a single checkDeprecatedPropertyUsed(propId) (or a better name), doing the exact same check that we're considering here for the partition strategy. I expect we would need the same check for all these that are deprecated with the new protocol (still thinking if I'm missing another one):

  • heartbeat.interval.ms
  • session.timeout.ms
  • group.max.session.timeout.ms
  • group.mix.session.timeout.ms

Thoughts?

@m1a2st
Copy link
Contributor Author

m1a2st commented Sep 4, 2024

I think that I can address this check method in this PR, and remove these config default value when user config the consumer group protocol.

@chia7712
Copy link
Contributor

chia7712 commented Sep 4, 2024

heartbeat.interval.ms
session.timeout.ms
group.max.session.timeout.ms
group.mix.session.timeout.ms

+1 and thanks @m1a2st to address them in this PR :)

@chia7712
Copy link
Contributor

chia7712 commented Sep 5, 2024

group.max.session.timeout.ms
group.mix.session.timeout.ms

Those configs are used by server rather than client. The new coordinator supports CLASSIC protocol so we don't need to unset those configs I think.

@kirktrue
Copy link
Collaborator

kirktrue commented Oct 8, 2024

@m1a2st—I wanted to make sure that you weren't held up waiting for input from others. After the number of review comments gets to a certain size, it can be hard to tell where we stand. Let me know if you need us to review or respond to any outstanding questions. Thanks!

@m1a2st
Copy link
Contributor Author

m1a2st commented Oct 9, 2024

@kirktrue,
Thank you for your comments. I have a question regarding this:

  • Should we treat this as an issue and resolve it?
  • Or, is it not considered an issue, and setting it to zero is the appropriate solution?

@chia7712
Copy link
Contributor

@m1a2st could you please rebase code?

@github-actions github-actions bot added tools small Small PRs labels Oct 17, 2024
@chia7712
Copy link
Contributor

@m1a2st please check the failed tests

@github-actions github-actions bot added the core Kafka Broker label Oct 27, 2024
@github-actions github-actions bot removed the small Small PRs label Oct 27, 2024
Copy link
Collaborator

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes @m1a2st. Just a few more comments, mostly formatting.

Comment on lines 380 to 394
/**
* A list of configuration keys for CLASSIC protocol not supported. we should check the input string and clean up the
* default value.
*/
private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = Collections.singletonList(
GROUP_REMOTE_ASSIGNOR_CONFIG
);

/**
* A list of configuration keys for consumer protocol not supported. we should check the input string and clean up the
* default value.
*/
private static final List<String> CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS =
List.of(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, HEARTBEAT_INTERVAL_MS_CONFIG, SESSION_TIMEOUT_MS_CONFIG);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change this to a Map<GroupProtocol, Set<String>> GROUP_PROTOCOL_UNSUPPORTED_CONFIGS to make it a little more concise?

Collections.unmodifiableList(Arrays.asList(
RANGE_ASSIGNOR_NAME,
ROUNDROBIN_ASSIGNOR_NAME,
STICKY_ASSIGNOR_NAME,
COOPERATIVE_STICKY_ASSIGNOR_NAME
));
List.of(RANGE_ASSIGNOR_NAME, ROUNDROBIN_ASSIGNOR_NAME, STICKY_ASSIGNOR_NAME, COOPERATIVE_STICKY_ASSIGNOR_NAME);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we keep the configuration on each line like it is now? That allows the list to expand or contract without messing with line wrapping. But this is good:

-         Collections.unmodifiableList(Arrays.asList(
+         List.of(

@@ -83,8 +83,10 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCoordinatorFailover(quorum: String, groupProtocol: String): Unit = {
val listener = new TestConsumerReassignmentListener()
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
if (groupProtocol.contains("CONSUMER")) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we only set these for the new consumer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a mistake, fix it

* default value.
*/
private static final List<String> CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS =
List.of(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, HEARTBEAT_INTERVAL_MS_CONFIG, SESSION_TIMEOUT_MS_CONFIG);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lianetm earlier mentioned these configuration as being invalid for the new consumer:

  • heartbeat.interval.ms
  • session.timeout.ms
  • group.max.session.timeout.ms
  • group.mix.session.timeout.ms

It looks like we have the first two. Did we need to add the other two group session timeout configuration to that list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you are right, If this PR didn't need to deal with log problem, we should add group.max.session.timeout.ms and group.mix.session.timeout.ms to CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS list.

Comment on lines 400 to 401
"group.max.session.timeout.ms",
"group.mix.session.timeout.ms"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If use GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG it will have a circle dependency, thus I use string, Im not sure it a good way to do it, or we should check it at `GroupCoordinatorConfig.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well these configs are used by the broker only, not the client, so I would say we don't deal with them here and let the broker drive it?

@@ -83,8 +83,10 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCoordinatorFailover(quorum: String, groupProtocol: String): Unit = {
val listener = new TestConsumerReassignmentListener()
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
if (groupProtocol.contains("CONSUMER")) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a mistake, fix it

@kirktrue
Copy link
Collaborator

@m1a2st—just checking in on this PR. It seems like we're really close. Is there anything you need from @lianetm or myself? Thanks!

# Conflicts:
#	tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
#	tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
@m1a2st
Copy link
Contributor Author

m1a2st commented Nov 20, 2024

Hello @kirktrue, I think if this PR pass the CI, and won't have comments, we should merge this PR, WDYT?

Copy link
Contributor

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this patch @m1a2st, just some minor comments for consideration.


/**
* A list of configuration keys for CLASSIC protocol not supported. we should check the input string and clean up the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* A list of configuration keys for CLASSIC protocol not supported. we should check the input string and clean up the
* A list of configuration keys not supported for CLASSIC protocol.

I'm also suggesting we remove the second sentence about "clean up default values" because I cannot find that we clean up values for unsupported configs, I only see we throw ConfigException. Am I missing something?

);

/**
* A list of configuration keys for consumer protocol not supported. we should check the input string and clean up the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* A list of configuration keys for consumer protocol not supported. we should check the input string and clean up the
* A list of configuration keys not supported for CONSUMER protocol.

Comment on lines 400 to 401
"group.max.session.timeout.ms",
"group.mix.session.timeout.ms"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well these configs are used by the broker only, not the client, so I would say we don't deal with them here and let the broker drive it?

unsupportedConfigs.forEach(configName -> {
Object config = originals().get(configName);
if (config != null && !Utils.isBlank(config.toString())) {
throw new ConfigException(configName + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + groupProtocol.name());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be a better experience for the user if we gather all the unsupported configs used first, and then throw an error with all of them? I expect it will be helpful when users upgrade to the new protocol, and probably leave the existing config (with several unsupported).

Comment on lines 249 to 253
final Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name());
configs.put(configName, value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map.of?

configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name());
configs.put(configName, value);
ConfigException exception = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
assertTrue(exception.getMessage().contains(configName +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we assertEquals here ?

@@ -337,7 +335,6 @@ private Map<String, Object> composeConfigs(ClusterInstance cluster, String group
configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to double check, even though we're removing this prop, I expect the test will still use the range assignor if under classic protocol just because range is the first assignor in the default value of the config. Correct?

@@ -413,7 +434,7 @@ public class ConsumerConfig extends AbstractConfig {
HEARTBEAT_INTERVAL_MS_DOC)
.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Type.LIST,
Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class),
PARTITION_ASSIGNOR_DEFAULT_VALUE,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constant was introduced to be reused and that made sense, but seems it's only used here now right? If so I think keeping the default explicitly here as it was is more convenient, it just makes it easier to discover (I personally navigate to this definition a lot looking for defaults :))

# Conflicts:
#	clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@m1a2st
Copy link
Contributor Author

m1a2st commented Nov 23, 2024

Thanks for @lianetm review, addressed all comments

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
clients consumer core Kafka Broker ctr Consumer Threading Refactor (KIP-848) KIP-848 The Next Generation of the Consumer Rebalance Protocol tools
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants