Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17338 ConsumerConfig should prevent using partition assignors with CONSUMER group protocol #16899

Open
wants to merge 30 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5f17f52
validate PARTITION_ASSIGNMENT_STRATEGY_CONFIG in consumer protocol
m1a2st Aug 16, 2024
38fa6c9
fix checkPartitionAssigmentStrategy logic problem
m1a2st Aug 20, 2024
59da76d
fix default value logic
m1a2st Aug 22, 2024
2ee47e6
refactor checkPartitionAssigmentStrategy method
m1a2st Aug 31, 2024
e8354a6
fix test
m1a2st Sep 1, 2024
ec363a1
change PARTITION_ASSIGNOR_DEFAULT_VALUE into immutable list
m1a2st Sep 2, 2024
9117089
refactor checkPartitionAssignmentStrategy method
m1a2st Sep 4, 2024
be90c63
remove unused test
m1a2st Sep 4, 2024
6c975f5
remove unused test
m1a2st Sep 4, 2024
8bcbdee
use spotless
m1a2st Sep 4, 2024
8846272
add check for consumer protocol
m1a2st Sep 5, 2024
0e77b0d
update test
m1a2st Sep 5, 2024
9edd23f
update the config check
m1a2st Sep 6, 2024
f705555
fix clearConfig config
m1a2st Sep 6, 2024
55108c5
fix test
m1a2st Sep 6, 2024
c805310
fix test
m1a2st Sep 6, 2024
a03d614
Merge branch 'trunk' into KAFKA-17338
m1a2st Oct 17, 2024
07a7a7e
revert print log issue code
m1a2st Oct 17, 2024
b81c90a
fix test error
m1a2st Oct 27, 2024
baa285e
fix test error
m1a2st Oct 27, 2024
c306364
fix test error
m1a2st Oct 27, 2024
0841459
addressed by comments
m1a2st Oct 31, 2024
76e5c85
addressed by comments
m1a2st Oct 31, 2024
15d3255
Merge branch 'trunk' into KAFKA-17338
m1a2st Nov 20, 2024
8fc8205
remove unused import
m1a2st Nov 20, 2024
fbf606b
Merge branch 'trunk' into KAFKA-17338
m1a2st Nov 23, 2024
c0bb015
addressed by comments
m1a2st Nov 23, 2024
abfb729
addressed by comments
m1a2st Nov 23, 2024
1d3dbc0
addressed by comment
m1a2st Nov 26, 2024
e038978
addressed by comment
m1a2st Nov 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -60,8 +60,12 @@ public class ConsumerConfig extends AbstractConfig {

// a list contains all the assignor names that only assign subscribed topics to consumer. Should be updated when new assignor added.
// This is to help optimize ConsumerCoordinator#performAssignment method
public static final List<String> ASSIGN_FROM_SUBSCRIBED_ASSIGNORS =
List.of(RANGE_ASSIGNOR_NAME, ROUNDROBIN_ASSIGNOR_NAME, STICKY_ASSIGNOR_NAME, COOPERATIVE_STICKY_ASSIGNOR_NAME);
public static final List<String> ASSIGN_FROM_SUBSCRIBED_ASSIGNORS = List.of(
RANGE_ASSIGNOR_NAME,
ROUNDROBIN_ASSIGNOR_NAME,
STICKY_ASSIGNOR_NAME,
COOPERATIVE_STICKY_ASSIGNOR_NAME
);

/*
* NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
Expand Down Expand Up @@ -375,6 +379,22 @@ public class ConsumerConfig extends AbstractConfig {

private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);

/**
* A list of configuration keys not supported for CLASSIC protocol.
*/
private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = Collections.singletonList(
GROUP_REMOTE_ASSIGNOR_CONFIG
);

/**
* A list of configuration keys not supported for CONSUMER protocol.
*/
private static final List<String> CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS = List.of(
PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
HEARTBEAT_INTERVAL_MS_CONFIG,
SESSION_TIMEOUT_MS_CONFIG
);

static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
Expand Down Expand Up @@ -408,7 +428,7 @@ public class ConsumerConfig extends AbstractConfig {
HEARTBEAT_INTERVAL_MS_DOC)
.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Type.LIST,
Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class),
List.of(RangeAssignor.class, CooperativeStickyAssignor.class),
new ConfigDef.NonNullValidator(),
Importance.MEDIUM,
PARTITION_ASSIGNMENT_STRATEGY_DOC)
Expand Down Expand Up @@ -666,7 +686,8 @@ protected Map<String, Object> postProcessParsedConfig(final Map<String, Object>
Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
maybeOverrideClientId(refinedConfigs);
maybeOverrideEnableAutoCommit(refinedConfigs);
checkGroupRemoteAssignor();
checkUnsupportedConfigs(GroupProtocol.CLASSIC, CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS);
checkUnsupportedConfigs(GroupProtocol.CONSUMER, CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS);
Comment on lines +689 to +690
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we simplify here and call checkUnsupportedConfigs once, with the groupProtocol read from the config? (seems clearer to have a single call that would encapsulate the logic to determine the unsupportedConfigs list to use based on the protocol used in the config, which is only one, seems a bit confusing how we check here for the 2 protocols). Makes sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer the current approach because it allows us to reuse this method in the future if new protocols are added, without having to add additional if-else statements to the original method. WDYT?

return refinedConfigs;
}

Expand Down Expand Up @@ -713,9 +734,19 @@ private void maybeOverrideEnableAutoCommit(Map<String, Object> configs) {
}
}

private void checkGroupRemoteAssignor() {
if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CLASSIC.name()) && getString(GROUP_REMOTE_ASSIGNOR_CONFIG) != null && !getString(GROUP_REMOTE_ASSIGNOR_CONFIG).isEmpty()) {
throw new ConfigException(GROUP_REMOTE_ASSIGNOR_CONFIG + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CLASSIC.name());
private void checkUnsupportedConfigs(GroupProtocol groupProtocol, List<String> unsupportedConfigs) {
if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(groupProtocol.name())) {
List<String> invalidConfigs = new ArrayList<>();
unsupportedConfigs.forEach(configName -> {
Object config = originals().get(configName);
if (config != null && !Utils.isBlank(config.toString())) {
invalidConfigs.add(configName);
}
});
if (!invalidConfigs.isEmpty()) {
throw new ConfigException(String.join(", ", invalidConfigs) +
" cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + groupProtocol.name());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,23 @@ public void testProtocolConfigValidation(String protocol, boolean isValid) {
assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
}
}

@Test
public void testUnsupportedConfigsWithConsumerGroupProtocol() {
testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "RoundRobinAssignor");
testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
}

private void testUnsupportedConfigsWithConsumerGroupProtocol(String configName, Object value) {
final Map<String, Object> configs = Map.of(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass,
ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name(),
configName, value
);
ConfigException exception = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
assertEquals(configName + " cannot be set when " +
ConsumerConfig.GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CONSUMER.name(), exception.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3064,7 +3064,10 @@ private ConsumerConfig newConsumerConfig(GroupProtocol groupProtocol,
configs.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, minBytes);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMs);
if (groupProtocol == GroupProtocol.CLASSIC) {
configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMs);
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
}
configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString());
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, fetchSize);
Expand All @@ -3073,7 +3076,6 @@ private ConsumerConfig newConsumerConfig(GroupProtocol groupProtocol,
configs.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
configs.put(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, retryBackoffMaxMs);
configs.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
configs.put(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, throwOnStableOffsetNotSupported);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
groupInstanceId.ifPresent(gi -> configs.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, gi));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1533,18 +1533,6 @@ public void testGroupRemoteAssignorUsedInConsumerProtocol() {
assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
}

@Test
public void testPartitionAssignmentStrategyUnusedInAsyncConsumer() {
final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup1");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "CooperativeStickyAssignor");
final ConsumerConfig config = new ConsumerConfig(props);
consumer = newConsumer(config);

assertTrue(config.unused().contains(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
}

@Test
public void testGroupIdNull() {
final Properties props = requiredConsumerConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package kafka.api

import kafka.utils.TestInfoUtils
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, GroupProtocol}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, PartitionInfo}
Expand Down Expand Up @@ -85,8 +85,10 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCoordinatorFailover(quorum: String, groupProtocol: String): Unit = {
val listener = new TestConsumerReassignmentListener()
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
}
// Use higher poll timeout to avoid consumer leaving the group due to timeout
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "15000")
val consumer = createConsumer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest {
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testMaxPollIntervalMs(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString)
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 2000.toString)
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 2000.toString)
}

val consumer = createConsumer()

Expand All @@ -87,8 +89,10 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest {
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testMaxPollIntervalMsDelayInRevocation(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000.toString)
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000.toString)
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000.toString)
}
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString)

val consumer = createConsumer()
Expand Down Expand Up @@ -129,8 +133,10 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest {
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testMaxPollIntervalMsDelayInAssignment(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000.toString)
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000.toString)
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000.toString)
}
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString)

val consumer = createConsumer()
Expand All @@ -154,7 +160,9 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest {
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString)
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
}

val consumer = createConsumer()
val listener = new TestConsumerReassignmentListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,10 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testUnsubscribeTopic(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
}
val consumer = createConsumer()

val listener = new TestConsumerReassignmentListener()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testPauseStateNotPreservedByRebalance(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
}
val consumer = createConsumer()

val producer = createProducer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,9 @@ private Map<String, Object> composeConfigs(ClusterInstance cluster, String group
configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to double check, even though we're removing this prop, I expect the test will still use the range assignor if under classic protocol just because range is the first assignor in the default value of the config. Correct?


if (GroupProtocol.CLASSIC.name.equalsIgnoreCase(groupProtocol)) {
configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
}
configs.putAll(customConfigs);
return configs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ private Consumer<byte[], byte[]> createConsumer(String group, GroupProtocol grou
consumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
// Increase timeouts to avoid having a rebalance during the test
consumerConfig.putIfAbsent(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
consumerConfig.putIfAbsent(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT));
if (groupProtocol == GroupProtocol.CLASSIC) {
consumerConfig.putIfAbsent(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT));
}

return new KafkaConsumer<>(consumerConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,6 @@ private Map<String, Object> composeConfigs(String groupId, String groupProtocol,
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());

configs.putAll(customConfigs);
return configs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,9 @@ private Map<String, Object> composeConfigs(String groupId, String groupProtocol,
configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we explicitly set it as you did on DeleteOffsetsConsumerGroupCommandIntegrationTest?

if (GroupProtocol.CLASSIC.name.equalsIgnoreCase(groupProtocol)) {
configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
}

configs.putAll(customConfigs);
return configs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,9 +800,11 @@ private Map<String, Object> composeConsumerConfigs(ClusterInstance cluster,
configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol.name);
configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we explicitly set it as you did on DeleteOffsetsConsumerGroupCommandIntegrationTest?

configs.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
configs.put(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 1000);
if (GroupProtocol.CLASSIC == groupProtocol) {
configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
}
return configs;
}

Expand Down