diff --git a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java index 42bc82b653..6fdce83224 100644 --- a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java +++ b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java @@ -16,7 +16,6 @@ import org.zalando.nakadi.domain.UnprocessableEventPolicy; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; -import org.zalando.nakadi.exceptions.runtime.RebalanceConflictException; import org.zalando.nakadi.repository.kafka.KafkaRecordDeserializer; import org.zalando.nakadi.service.AuthorizationValidator; import org.zalando.nakadi.service.ConsumptionKpiCollector; @@ -29,7 +28,6 @@ import org.zalando.nakadi.service.FeatureToggleService; import org.zalando.nakadi.service.publishing.EventPublisher; import org.zalando.nakadi.service.subscription.autocommit.AutocommitSupport; -import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.model.Session; import org.zalando.nakadi.service.subscription.state.CleanupState; import org.zalando.nakadi.service.subscription.state.DummyState; @@ -42,7 +40,6 @@ import java.io.Closeable; import java.io.IOException; -import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Optional; @@ -50,7 +47,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; public class StreamingContext implements SubscriptionStreamer { @@ -67,7 +63,6 @@ public class StreamingContext implements SubscriptionStreamer { private final EventStreamChecks eventStreamChecks; private final ScheduledExecutorService timer; private final BlockingQueue taskQueue = new LinkedBlockingQueue<>(); - private final BiFunction, Partition[], Partition[]> rebalancer; private final CursorConverter cursorConverter; private final Subscription subscription; private final MetricRegistry metricRegistry; @@ -102,7 +97,6 @@ private StreamingContext(final Builder builder) { this.out = builder.out; this.parameters = builder.parameters; this.session = builder.session; - this.rebalancer = builder.rebalancer; this.timer = builder.timer; this.zkClient = builder.zkClient; this.kafkaPollTimeout = builder.kafkaPollTimeout; @@ -363,16 +357,7 @@ private void rebalance() { if (null != sessionListSubscription) { // This call is needed to renew subscription for session list changes. sessionListSubscription.getData(); - zkClient.updateTopology(topology -> { - try { - return rebalancer.apply( - zkClient.listSessions(), - topology.getPartitions()); - } catch (final RebalanceConflictException e) { - LOG.warn("failed to rebalance partitions: {}", e.getMessage(), e); - return new Partition[0]; - } - }); + zkClient.rebalanceSessions(); } } @@ -437,7 +422,6 @@ public static final class Builder { private Session session; private ScheduledExecutorService timer; private ZkSubscriptionClient zkClient; - private BiFunction, Partition[], Partition[]> rebalancer; private long kafkaPollTimeout; private CursorTokenService cursorTokenService; private ObjectMapper objectMapper; @@ -505,11 +489,6 @@ public Builder setZkClient(final ZkSubscriptionClient zkClient) { return this; } - public Builder setRebalancer(final BiFunction, Partition[], Partition[]> rebalancer) { - this.rebalancer = rebalancer; - return this; - } - public Builder setKafkaPollTimeout(final long kafkaPollTimeout) { this.kafkaPollTimeout = kafkaPollTimeout; return this; diff --git a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java index 3060b18563..e4864d79ac 100644 --- a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java +++ b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java @@ -115,7 +115,6 @@ public SubscriptionStreamer build( .setSession(session) .setTimer(executorService) .setZkClient(zkClient) - .setRebalancer(new SubscriptionRebalancer()) .setKafkaPollTimeout(kafkaPollTimeout) .setCursorTokenService(cursorTokenService) .setObjectMapper(objectMapper) diff --git a/api-consumption/src/test/java/org/zalando/nakadi/service/subscription/StreamingContextTest.java b/api-consumption/src/test/java/org/zalando/nakadi/service/subscription/StreamingContextTest.java index 325c82d7bd..c37ddd3e10 100644 --- a/api-consumption/src/test/java/org/zalando/nakadi/service/subscription/StreamingContextTest.java +++ b/api-consumption/src/test/java/org/zalando/nakadi/service/subscription/StreamingContextTest.java @@ -87,7 +87,6 @@ public OutputStream getOutputStream() { .setSubscription(new Subscription()) .setTimer(null) .setZkClient(zkClient) - .setRebalancer(null) .setKafkaPollTimeout(0) .setCursorTokenService(null) .setObjectMapper(null) diff --git a/app/src/main/java/org/zalando/nakadi/service/job/DlqRedriveEventTypeAttachmentJob.java b/app/src/main/java/org/zalando/nakadi/service/job/DlqRedriveEventTypeAttachmentJob.java index 427bd713a2..9ba17c75b3 100644 --- a/app/src/main/java/org/zalando/nakadi/service/job/DlqRedriveEventTypeAttachmentJob.java +++ b/app/src/main/java/org/zalando/nakadi/service/job/DlqRedriveEventTypeAttachmentJob.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Service @@ -147,33 +146,31 @@ private void addDlqPartitionsToSubscription( final Subscription subscription, final List unassignedDlqPartitions) throws Exception { - final ZkSubscriptionClient client = subscriptionClientFactory.createClient(subscription); + try (ZkSubscriptionClient client = subscriptionClientFactory.createClient(subscription)) { - // idempotent, does not overwrite existing offsets - client.createOffsetZNodes( - unassignedDlqPartitions.stream() - .map(p -> p.getPartition()) - .map(p -> new Cursor(p, Cursor.BEFORE_OLDEST_OFFSET)) - .map(c -> cursorConverter.convert(dlqRedriveEventTypeName, c)) - .map(nc -> cursorConverter.convertToNoToken(nc)) - .collect(Collectors.toList())); + // idempotent, does not overwrite existing offsets + client.createOffsetZNodes( + unassignedDlqPartitions.stream() + .map(p -> p.getPartition()) + .map(p -> new Cursor(p, Cursor.BEFORE_OLDEST_OFFSET)) + .map(c -> cursorConverter.convert(dlqRedriveEventTypeName, c)) + .map(nc -> cursorConverter.convertToNoToken(nc)) + .collect(Collectors.toList())); - final boolean[] hasNewPartitions = {false}; - client.updateTopology(topology -> { + final boolean[] hasNewPartitions = {false}; + client.updateTopology(topology -> { final var newPartitions = selectMissingPartitions(topology.getPartitions(), unassignedDlqPartitions); if (!hasNewPartitions[0] && newPartitions.length != 0) { hasNewPartitions[0] = true; } return newPartitions; - }); - - // shortcut to enable new partitions for streaming, otherwise they will stay unassigned. - // the better way is to rebalance them. - if (hasNewPartitions[0]) { - client.closeSubscriptionStreams( - () -> LOG.info("Subscription `{}` streams were closed due to addition of " + - "Nakadi DLQ Event Type", subscription.getId()), - TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout())); + }); + + if (hasNewPartitions[0]) { + LOG.info("Rebalancing `{}` subscription's sessions due to addition of " + + "Nakadi DLQ Event Type", subscription.getId()); + client.rebalanceSessions(); + } } } diff --git a/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java b/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java index 79e7a7fa0b..7ae1afd246 100644 --- a/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java +++ b/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java @@ -13,6 +13,7 @@ import org.slf4j.LoggerFactory; import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; +import org.zalando.nakadi.exceptions.runtime.RebalanceConflictException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; import org.zalando.nakadi.exceptions.runtime.ZookeeperException; import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; @@ -142,6 +143,21 @@ public void updateTopology(final Function partitioner) .withExceptionsThatForceRetry(KeeperException.BadVersionException.class)); } + @Override + public void rebalanceSessions() { + updateTopology(topology -> { + final var rebalancer = new SubscriptionRebalancer(); + try { + return rebalancer.apply( + listSessions(), + topology.getPartitions()); + } catch (final RebalanceConflictException e) { + LOG.warn("failed to rebalance partitions: {}", e.getMessage(), e); + return new Partition[0]; + } + }); + } + @Override public Topology getTopology() throws NakadiRuntimeException, SubscriptionNotInitializedException { diff --git a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionRebalancer.java b/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/SubscriptionRebalancer.java similarity index 99% rename from api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionRebalancer.java rename to core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/SubscriptionRebalancer.java index a25fa7b94b..3e4bc10e67 100644 --- a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionRebalancer.java +++ b/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/SubscriptionRebalancer.java @@ -1,4 +1,4 @@ -package org.zalando.nakadi.service.subscription; +package org.zalando.nakadi.service.subscription.zk; import com.google.common.collect.Lists; import org.zalando.nakadi.domain.EventTypePartition; diff --git a/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java b/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java index ba80503dab..97e5df60b4 100644 --- a/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java +++ b/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java @@ -71,6 +71,11 @@ Collection listSessions() boolean isActiveSession(String streamId) throws ServiceTemporarilyUnavailableException; + /** + * Re-assigns topology's partitions among sessions and updates the topology with new assignment. + */ + void rebalanceSessions(); + /** * Returns subscription {@link Topology} object from Zookeeper * diff --git a/api-consumption/src/test/java/org/zalando/nakadi/service/subscription/SubscriptionRebalancerTest.java b/core-common/src/test/java/org/zalando/nakadi/service/subscription/zk/SubscriptionRebalancerTest.java similarity index 99% rename from api-consumption/src/test/java/org/zalando/nakadi/service/subscription/SubscriptionRebalancerTest.java rename to core-common/src/test/java/org/zalando/nakadi/service/subscription/zk/SubscriptionRebalancerTest.java index e7901f96a2..c0dfb9f066 100644 --- a/api-consumption/src/test/java/org/zalando/nakadi/service/subscription/SubscriptionRebalancerTest.java +++ b/core-common/src/test/java/org/zalando/nakadi/service/subscription/zk/SubscriptionRebalancerTest.java @@ -1,4 +1,4 @@ -package org.zalando.nakadi.service.subscription; +package org.zalando.nakadi.service.subscription.zk; import com.google.common.collect.ImmutableList; import org.junit.Test;