Skip to content

Commit

Permalink
fix(provider/connect): fix NPE on connector config deletion (#483)
Browse files Browse the repository at this point in the history
Fix: #483
  • Loading branch information
fhussonnois committed Aug 15, 2024
1 parent 1738181 commit 1c90d75
Showing 1 changed file with 13 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
*/
package io.streamthoughts.jikkou.kafka.connect.change;

import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_CLASS_CONFIG;
import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_TASKS_MAX_CONFIG;
import static io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeComputer.DATA_CONNECTOR_CLASS;

import io.streamthoughts.jikkou.core.data.TypeConverter;
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
import io.streamthoughts.jikkou.core.models.change.SpecificStateChange;
Expand All @@ -26,6 +22,9 @@
import io.streamthoughts.jikkou.kafka.connect.api.data.ConnectorInfoResponse;
import io.streamthoughts.jikkou.kafka.connect.api.data.ErrorResponse;
import io.streamthoughts.jikkou.kafka.connect.models.KafkaConnectorState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.VisibleForTesting;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -35,8 +34,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.VisibleForTesting;

import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_CLASS_CONFIG;
import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_TASKS_MAX_CONFIG;
import static io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeComputer.DATA_CONNECTOR_CLASS;

public final class KafkaConnectorChangeHandler extends BaseChangeHandler<ResourceChange> {

Expand Down Expand Up @@ -107,8 +108,9 @@ private Stream<ChangeResponse<ResourceChange>> deleteConnector(ResourceChange ch

@NotNull
private Stream<ChangeResponse<ResourceChange>> createOrUpdateConnectorConfig(ResourceChange change) {
final Map<String, Object> configAsMap = buildConnectorConfig(change);
CompletableFuture<ConnectorInfoResponse> future = CompletableFuture.supplyAsync(() ->
api.createOrUpdateConnector(change.getMetadata().getName(), buildConnectorConfig(change))
api.createOrUpdateConnector(change.getMetadata().getName(), configAsMap)
);

ChangeResponse<ResourceChange> response = toChangeResponse(change, future);
Expand Down Expand Up @@ -140,10 +142,11 @@ static boolean isStateOnlyChange(ResourceChange change) {
return getState(change).getOp() != Operation.NONE;
}

private Map<String, Object> buildConnectorConfig(ResourceChange change) {
private Map<String, Object> buildConnectorConfig(final ResourceChange change) {
Map<String, Object> configs = getConfig(change)
.stream()
.collect(Collectors.toMap(StateChange::getName, StateChange::getAfter));
.stream()
.filter(state -> state.getOp() != Operation.DELETE && state.getAfter() != null)
.collect(Collectors.toMap(StateChange::getName, StateChange::getAfter));
Map<String, Object> config = new HashMap<>();
config.put(CONNECTOR_TASKS_MAX_CONFIG, getTasksMax(change).getAfter());
config.put(CONNECTOR_CLASS_CONFIG, getConnectorClass(change).getAfter());
Expand Down

0 comments on commit 1c90d75

Please sign in to comment.