Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

{DO NOT MERGE] ccs jenkins fix #944

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,9 @@

<!-- storage -->
<suppress checks="CyclomaticComplexity"
files="(LogValidator|RemoteLogManagerConfig).java"/>
files="(LogValidator|RemoteLogManagerConfig|RemoteLogManager).java"/>
<suppress checks="NPathComplexity"
files="(LogValidator|RemoteIndexCache).java"/>
files="(LogValidator|RemoteLogManager|RemoteIndexCache).java"/>
<suppress checks="ParameterNumber"
files="(LogAppendInfo|RemoteLogManagerConfig).java"/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,7 @@ private void handleResponses(long now, List<ClientResponse> responses) {
call.fail(now, authException);
} else {
call.fail(now, new DisconnectException(String.format(
"Cancelled %s request with correlation id %s due to node %s being disconnected",
"Cancelled %s request with correlation id %d due to node %s being disconnected",
call.callName, correlationId, response.destination())));
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ private void appendWithOffset(long offset, boolean isControlRecord, long timesta
throw new IllegalArgumentException("Control records can only be appended to control batches");

if (lastOffset != null && offset <= lastOffset)
throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s " +
throw new IllegalArgumentException(String.format("Illegal offset %d following previous offset %d " +
"(Offsets must increase monotonically).", offset, lastOffset));

if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ static String handleOutput(final HttpURLConnection con) throws IOException {
errorResponseBody);

if (responseBody == null || responseBody.isEmpty())
throw new IOException(String.format("The token endpoint response was unexpectedly empty despite response code %s from %s and error message %s",
throw new IOException(String.format("The token endpoint response was unexpectedly empty despite response code %d from %s and error message %s",
responseCode, con.getURL(), formatErrorMessage(errorResponseBody)));

return responseBody;
Expand Down Expand Up @@ -337,7 +337,7 @@ static String parseAccessToken(String responseBody) throws IOException {
if (snippet.length() > MAX_RESPONSE_BODY_LENGTH) {
int actualLength = responseBody.length();
String s = responseBody.substring(0, MAX_RESPONSE_BODY_LENGTH);
snippet = String.format("%s (trimmed to first %s characters out of %s total)", s, MAX_RESPONSE_BODY_LENGTH, actualLength);
snippet = String.format("%s (trimmed to first %d characters out of %d total)", s, MAX_RESPONSE_BODY_LENGTH, actualLength);
}

throw new IOException(String.format("The token endpoint response did not contain an access_token value. Response: (%s)", snippet));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,8 @@ public boolean maybeExpediteRefresh(String keyId) {
// 1. Don't try to resolve the key as the large ID will sit in our cache
// 2. Report the issue in the logs but include only the first N characters
int actualLength = keyId.length();
String s = keyId.substring(0, MISSING_KEY_ID_MAX_KEY_LENGTH);
String snippet = String.format("%s (trimmed to first %s characters out of %s total)", s, MISSING_KEY_ID_MAX_KEY_LENGTH, actualLength);
String trimmedKeyId = keyId.substring(0, MISSING_KEY_ID_MAX_KEY_LENGTH);
String snippet = String.format("%s (trimmed to first %d characters out of %d total)", trimmedKeyId, MISSING_KEY_ID_MAX_KEY_LENGTH, actualLength);
log.warn("Key ID {} was too long to cache", snippet);
return false;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public SerializedJwt(String token) {
String[] splits = token.split("\\.");

if (splits.length != 3)
throw new ValidateException(String.format("Malformed JWT provided (%s); expected three sections (header, payload, and signature), but %s sections provided",
throw new ValidateException(String.format("Malformed JWT provided (%s); expected three sections (header, payload, and signature), but %d sections provided",
token, splits.length));

this.token = token.trim();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
Expand Down Expand Up @@ -493,6 +494,12 @@ public void testNoSerializerProvided() {
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
assertThrows(ConfigException.class, () -> new KafkaProducer(producerProps));

final Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");

// Invalid value null for configuration key.serializer: must be non-null.
assertThrows(ConfigException.class, () -> new KafkaProducer<String, String>(configs));
}

@Test
Expand Down Expand Up @@ -2399,4 +2406,23 @@ public KafkaProducer<T, T> newKafkaProducer() {
}
}

@Test
void testDeliveryTimeoutAndLingerMsConfig() {
final Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "testDeliveryTimeoutAndLingerMsConfig");
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 1000);
configs.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);

// delivery.timeout.ms should be equal to or larger than linger.ms + request.timeout.ms
assertThrows(KafkaException.class, () -> new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer()));

configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 1000);
configs.put(ProducerConfig.LINGER_MS_CONFIG, 999);
configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);

assertDoesNotThrow(() -> new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Arrays.asList;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class BuiltInPartitionerTest {
Expand Down Expand Up @@ -195,4 +197,10 @@ public void adaptivePartitionsTest() {
"Partition " + i + " was chosen " + frequencies[i] + " times");
}
}

@Test
void testStickyBatchSizeMoreThatZero() {
assertThrows(IllegalArgumentException.class, () -> new BuiltInPartitioner(logContext, TOPIC_A, 0));
assertDoesNotThrow(() -> new BuiltInPartitioner(logContext, TOPIC_A, 1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,6 @@ public void shutdownClusters() throws Exception {
for (String x : backup.connectors()) {
backup.deleteConnector(x);
}
deleteAllTopics(primary.kafka());
deleteAllTopics(backup.kafka());
} finally {
shuttingDown = true;
try {
Expand Down Expand Up @@ -1049,17 +1047,6 @@ protected static void waitForTopicCreated(EmbeddedConnectCluster cluster, String
}
}

/*
* delete all topics of the input kafka cluster
*/
private static void deleteAllTopics(EmbeddedKafkaCluster cluster) throws Exception {
try (final Admin adminClient = cluster.createAdminClient()) {
Set<String> topicsToBeDeleted = adminClient.listTopics().names().get();
log.debug("Deleting topics: {} ", topicsToBeDeleted);
adminClient.deleteTopics(topicsToBeDeleted).all().get();
}
}

/*
* retrieve the config value based on the input cluster, topic and config name
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ protected Map<ConnectorTaskId, Map<String, String>> buildTasksConfig(String conn

Map<ConnectorTaskId, Map<String, String>> configs = new HashMap<>();
for (ConnectorTaskId cti : configState.tasks(connector)) {
configs.put(cti, configState.taskConfig(cti));
configs.put(cti, configState.rawTaskConfig(cti));
}

return configs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ public int hashCode() {
public void tasksConfig(String connName, Callback<Map<ConnectorTaskId, Map<String, String>>> callback) {
Map<ConnectorTaskId, Map<String, String>> tasksConfig = buildTasksConfig(connName);
if (tasksConfig.isEmpty()) {
callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), tasksConfig);
callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
return;
}
callback.onCompletion(null, tasksConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2091,6 +2091,8 @@ public void testAccessors() throws Exception {
herder.connectorConfig(CONN1, connectorConfigCb);
FutureCallback<List<TaskInfo>> taskConfigsCb = new FutureCallback<>();
herder.taskConfigs(CONN1, taskConfigsCb);
FutureCallback<Map<ConnectorTaskId, Map<String, String>>> tasksConfigCb = new FutureCallback<>();
herder.tasksConfig(CONN1, tasksConfigCb);

herder.tick();
assertTrue(listConnectorsCb.isDone());
Expand All @@ -2107,6 +2109,11 @@ public void testAccessors() throws Exception {
new TaskInfo(TASK1, TASK_CONFIG),
new TaskInfo(TASK2, TASK_CONFIG)),
taskConfigsCb.get());
Map<ConnectorTaskId, Map<String, String>> tasksConfig = new HashMap<>();
tasksConfig.put(TASK0, TASK_CONFIG);
tasksConfig.put(TASK1, TASK_CONFIG);
tasksConfig.put(TASK2, TASK_CONFIG);
assertEquals(tasksConfig, tasksConfigCb.get());

// Config transformation should not occur when requesting connector or task info
verify(configTransformer, never()).transform(eq(CONN1), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ public void testAccessors() throws Exception {
Callback<ConnectorInfo> connectorInfoCb = PowerMock.createMock(Callback.class);
Callback<Map<String, String>> connectorConfigCb = PowerMock.createMock(Callback.class);
Callback<List<TaskInfo>> taskConfigsCb = PowerMock.createMock(Callback.class);
Callback<Map<ConnectorTaskId, Map<String, String>>> tasksConfigCb = PowerMock.createMock(Callback.class);

// Check accessors with empty worker
listConnectorsCb.onCompletion(null, Collections.EMPTY_SET);
Expand All @@ -775,6 +776,8 @@ public void testAccessors() throws Exception {
EasyMock.expectLastCall();
taskConfigsCb.onCompletion(EasyMock.<NotFoundException>anyObject(), EasyMock.isNull());
EasyMock.expectLastCall();
tasksConfigCb.onCompletion(EasyMock.<NotFoundException>anyObject(), EasyMock.isNull());
EasyMock.expectLastCall();

// Create connector
connector = PowerMock.createMock(BogusSourceConnector.class);
Expand All @@ -795,6 +798,10 @@ public void testAccessors() throws Exception {
taskConfigsCb.onCompletion(null, Arrays.asList(taskInfo));
EasyMock.expectLastCall();

Map<ConnectorTaskId, Map<String, String>> tasksConfig = Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0),
taskConfig(SourceSink.SOURCE));
tasksConfigCb.onCompletion(null, tasksConfig);
EasyMock.expectLastCall();

PowerMock.replayAll();

Expand All @@ -803,6 +810,7 @@ public void testAccessors() throws Exception {
herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb);
herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb);
herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb);

herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback);
Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS);
Expand All @@ -818,6 +826,7 @@ public void testAccessors() throws Exception {
herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb);
herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb);
herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb);

PowerMock.verifyAll();
}
Expand Down
Loading