From 1c90d750f27821eec22958a11d7bee9036547f3c Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Thu, 15 Aug 2024 23:14:16 +0200 Subject: [PATCH] fix(provider/connect): fix NPE on connector config deletion (#483) Fix: #483 --- .../change/KafkaConnectorChangeHandler.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler.java b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler.java index 3faf916a2..fe071fd3f 100644 --- a/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler.java +++ b/providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler.java @@ -6,10 +6,6 @@ */ package io.streamthoughts.jikkou.kafka.connect.change; -import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_CLASS_CONFIG; -import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_TASKS_MAX_CONFIG; -import static io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeComputer.DATA_CONNECTOR_CLASS; - import io.streamthoughts.jikkou.core.data.TypeConverter; import io.streamthoughts.jikkou.core.models.change.ResourceChange; import io.streamthoughts.jikkou.core.models.change.SpecificStateChange; @@ -26,6 +22,9 @@ import io.streamthoughts.jikkou.kafka.connect.api.data.ConnectorInfoResponse; import io.streamthoughts.jikkou.kafka.connect.api.data.ErrorResponse; import io.streamthoughts.jikkou.kafka.connect.models.KafkaConnectorState; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.VisibleForTesting; + import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,8 +34,10 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.VisibleForTesting; + +import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_CLASS_CONFIG; +import static io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants.CONNECTOR_TASKS_MAX_CONFIG; +import static io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeComputer.DATA_CONNECTOR_CLASS; public final class KafkaConnectorChangeHandler extends BaseChangeHandler { @@ -107,8 +108,9 @@ private Stream> deleteConnector(ResourceChange ch @NotNull private Stream> createOrUpdateConnectorConfig(ResourceChange change) { + final Map configAsMap = buildConnectorConfig(change); CompletableFuture future = CompletableFuture.supplyAsync(() -> - api.createOrUpdateConnector(change.getMetadata().getName(), buildConnectorConfig(change)) + api.createOrUpdateConnector(change.getMetadata().getName(), configAsMap) ); ChangeResponse response = toChangeResponse(change, future); @@ -140,10 +142,11 @@ static boolean isStateOnlyChange(ResourceChange change) { return getState(change).getOp() != Operation.NONE; } - private Map buildConnectorConfig(ResourceChange change) { + private Map buildConnectorConfig(final ResourceChange change) { Map configs = getConfig(change) - .stream() - .collect(Collectors.toMap(StateChange::getName, StateChange::getAfter)); + .stream() + .filter(state -> state.getOp() != Operation.DELETE && state.getAfter() != null) + .collect(Collectors.toMap(StateChange::getName, StateChange::getAfter)); Map config = new HashMap<>(); config.put(CONNECTOR_TASKS_MAX_CONFIG, getTasksMax(change).getAfter()); config.put(CONNECTOR_CLASS_CONFIG, getConnectorClass(change).getAfter());