From 3f7f5b07bb2d421cef1b1541136849fc3e6c18ee Mon Sep 17 00:00:00 2001 From: Agonnee <67187469+Agonnee@users.noreply.github.com> Date: Wed, 18 Oct 2023 15:02:52 -0700 Subject: [PATCH] Added deduplication for task partition assignment --- .../NamespaceAwareCoordinatorStreamStore.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java index f5b99d7b0d..e4c7aabf3c 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java @@ -19,8 +19,11 @@ package org.apache.samza.coordinator.metadatastore; import com.google.common.annotations.VisibleForTesting; + +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; @@ -32,6 +35,9 @@ */ public class NamespaceAwareCoordinatorStreamStore implements MetadataStore { + private static final String SET_TASK_PARTITION_ASSIGNMENT = "set-task-partition-assignment"; + private static final String KEY_BUCKET = ",\"keyBucket\":-1}"; + private static final String WITHOUT_KEY_BUCKET = "}"; private final MetadataStore metadataStore; private final String namespace; @@ -105,16 +111,23 @@ String getCoordinatorMessageKey(String key) { private Map readMessagesFromCoordinatorStore() { Map bootstrappedMessages = new HashMap<>(); Map coordinatorStreamMessages = metadataStore.all(); + List keyDeduplicationReference = new ArrayList<>(); coordinatorStreamMessages.forEach((coordinatorMessageKeyAsJson, value) -> { CoordinatorMessageKey coordinatorMessageKey = CoordinatorStreamStore.deserializeCoordinatorMessageKeyFromJson(coordinatorMessageKeyAsJson); if (Objects.equals(namespace, coordinatorMessageKey.getNamespace())) { if (value != null) { + if (namespace.equals(SET_TASK_PARTITION_ASSIGNMENT) && coordinatorMessageKey.getKey().contains(KEY_BUCKET)) { + keyDeduplicationReference.add(coordinatorMessageKey.getKey().replace(KEY_BUCKET, WITHOUT_KEY_BUCKET)); + } bootstrappedMessages.put(coordinatorMessageKey.getKey(), value); } else { bootstrappedMessages.remove(coordinatorMessageKey.getKey()); } } }); + for (String dedupKey: keyDeduplicationReference) { + bootstrappedMessages.remove(dedupKey); + } return bootstrappedMessages; }