Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Rebalance subscription's sessions on DLQ event type attachment to top…
Browse files Browse the repository at this point in the history
…ology (#1581)

Refactoring: move the rebalancing of a subscription's sessions into the `ZkSubscriptionClient`

---------

Co-authored-by: Aleksey Pak <[email protected]>
  • Loading branch information
1u0 and 1u0 authored Dec 19, 2023
1 parent 5dd37c6 commit 208c5c4
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.zalando.nakadi.domain.UnprocessableEventPolicy;
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.runtime.RebalanceConflictException;
import org.zalando.nakadi.repository.kafka.KafkaRecordDeserializer;
import org.zalando.nakadi.service.AuthorizationValidator;
import org.zalando.nakadi.service.ConsumptionKpiCollector;
Expand All @@ -29,7 +28,6 @@
import org.zalando.nakadi.service.FeatureToggleService;
import org.zalando.nakadi.service.publishing.EventPublisher;
import org.zalando.nakadi.service.subscription.autocommit.AutocommitSupport;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.model.Session;
import org.zalando.nakadi.service.subscription.state.CleanupState;
import org.zalando.nakadi.service.subscription.state.DummyState;
Expand All @@ -42,15 +40,13 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

public class StreamingContext implements SubscriptionStreamer {

Expand All @@ -67,7 +63,6 @@ public class StreamingContext implements SubscriptionStreamer {
private final EventStreamChecks eventStreamChecks;
private final ScheduledExecutorService timer;
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
private final BiFunction<Collection<Session>, Partition[], Partition[]> rebalancer;
private final CursorConverter cursorConverter;
private final Subscription subscription;
private final MetricRegistry metricRegistry;
Expand Down Expand Up @@ -102,7 +97,6 @@ private StreamingContext(final Builder builder) {
this.out = builder.out;
this.parameters = builder.parameters;
this.session = builder.session;
this.rebalancer = builder.rebalancer;
this.timer = builder.timer;
this.zkClient = builder.zkClient;
this.kafkaPollTimeout = builder.kafkaPollTimeout;
Expand Down Expand Up @@ -363,16 +357,7 @@ private void rebalance() {
if (null != sessionListSubscription) {
// This call is needed to renew subscription for session list changes.
sessionListSubscription.getData();
zkClient.updateTopology(topology -> {
try {
return rebalancer.apply(
zkClient.listSessions(),
topology.getPartitions());
} catch (final RebalanceConflictException e) {
LOG.warn("failed to rebalance partitions: {}", e.getMessage(), e);
return new Partition[0];
}
});
zkClient.rebalanceSessions();
}
}

Expand Down Expand Up @@ -437,7 +422,6 @@ public static final class Builder {
private Session session;
private ScheduledExecutorService timer;
private ZkSubscriptionClient zkClient;
private BiFunction<Collection<Session>, Partition[], Partition[]> rebalancer;
private long kafkaPollTimeout;
private CursorTokenService cursorTokenService;
private ObjectMapper objectMapper;
Expand Down Expand Up @@ -505,11 +489,6 @@ public Builder setZkClient(final ZkSubscriptionClient zkClient) {
return this;
}

public Builder setRebalancer(final BiFunction<Collection<Session>, Partition[], Partition[]> rebalancer) {
this.rebalancer = rebalancer;
return this;
}

public Builder setKafkaPollTimeout(final long kafkaPollTimeout) {
this.kafkaPollTimeout = kafkaPollTimeout;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ public SubscriptionStreamer build(
.setSession(session)
.setTimer(executorService)
.setZkClient(zkClient)
.setRebalancer(new SubscriptionRebalancer())
.setKafkaPollTimeout(kafkaPollTimeout)
.setCursorTokenService(cursorTokenService)
.setObjectMapper(objectMapper)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public OutputStream getOutputStream() {
.setSubscription(new Subscription())
.setTimer(null)
.setZkClient(zkClient)
.setRebalancer(null)
.setKafkaPollTimeout(0)
.setCursorTokenService(null)
.setObjectMapper(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Service
Expand Down Expand Up @@ -147,33 +146,31 @@ private void addDlqPartitionsToSubscription(
final Subscription subscription,
final List<Partition> unassignedDlqPartitions) throws Exception {

final ZkSubscriptionClient client = subscriptionClientFactory.createClient(subscription);
try (ZkSubscriptionClient client = subscriptionClientFactory.createClient(subscription)) {

// idempotent, does not overwrite existing offsets
client.createOffsetZNodes(
unassignedDlqPartitions.stream()
.map(p -> p.getPartition())
.map(p -> new Cursor(p, Cursor.BEFORE_OLDEST_OFFSET))
.map(c -> cursorConverter.convert(dlqRedriveEventTypeName, c))
.map(nc -> cursorConverter.convertToNoToken(nc))
.collect(Collectors.toList()));
// idempotent, does not overwrite existing offsets
client.createOffsetZNodes(
unassignedDlqPartitions.stream()
.map(p -> p.getPartition())
.map(p -> new Cursor(p, Cursor.BEFORE_OLDEST_OFFSET))
.map(c -> cursorConverter.convert(dlqRedriveEventTypeName, c))
.map(nc -> cursorConverter.convertToNoToken(nc))
.collect(Collectors.toList()));

final boolean[] hasNewPartitions = {false};
client.updateTopology(topology -> {
final boolean[] hasNewPartitions = {false};
client.updateTopology(topology -> {
final var newPartitions = selectMissingPartitions(topology.getPartitions(), unassignedDlqPartitions);
if (!hasNewPartitions[0] && newPartitions.length != 0) {
hasNewPartitions[0] = true;
}
return newPartitions;
});

// shortcut to enable new partitions for streaming, otherwise they will stay unassigned.
// the better way is to rebalance them.
if (hasNewPartitions[0]) {
client.closeSubscriptionStreams(
() -> LOG.info("Subscription `{}` streams were closed due to addition of " +
"Nakadi DLQ Event Type", subscription.getId()),
TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout()));
});

if (hasNewPartitions[0]) {
LOG.info("Rebalancing `{}` subscription's sessions due to addition of " +
"Nakadi DLQ Event Type", subscription.getId());
client.rebalanceSessions();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.slf4j.LoggerFactory;
import org.zalando.nakadi.domain.EventTypePartition;
import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.runtime.RebalanceConflictException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.exceptions.runtime.ZookeeperException;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
Expand Down Expand Up @@ -142,6 +143,21 @@ public void updateTopology(final Function<Topology, Partition[]> partitioner)
.withExceptionsThatForceRetry(KeeperException.BadVersionException.class));
}

/**
 * Recomputes how the topology's partitions are assigned to the currently listed sessions
 * and passes the result to {@code updateTopology}.
 *
 * If the rebalancer throws a {@code RebalanceConflictException}, the failure is logged as a
 * warning and an empty partition array is handed to {@code updateTopology} instead.
 */
@Override
public void rebalanceSessions() {
    updateTopology(currentTopology -> {
        final var partitionAssigner = new SubscriptionRebalancer();
        Partition[] changedPartitions;
        try {
            // Sessions are listed first, then the current partitions are read.
            changedPartitions = partitionAssigner.apply(listSessions(), currentTopology.getPartitions());
        } catch (final RebalanceConflictException conflict) {
            LOG.warn("failed to rebalance partitions: {}", conflict.getMessage(), conflict);
            changedPartitions = new Partition[0];
        }
        return changedPartitions;
    });
}

@Override
public Topology getTopology() throws NakadiRuntimeException,
SubscriptionNotInitializedException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.zalando.nakadi.service.subscription;
package org.zalando.nakadi.service.subscription.zk;

import com.google.common.collect.Lists;
import org.zalando.nakadi.domain.EventTypePartition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ Collection<Session> listSessions()

boolean isActiveSession(String streamId) throws ServiceTemporarilyUnavailableException;

/**
 * Re-assigns the topology's partitions among the subscription's sessions and updates the
 * {@link Topology} with the new assignment.
 */
void rebalanceSessions();

/**
* Returns subscription {@link Topology} object from Zookeeper
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.zalando.nakadi.service.subscription;
package org.zalando.nakadi.service.subscription.zk;

import com.google.common.collect.ImmutableList;
import org.junit.Test;
Expand Down

0 comments on commit 208c5c4

Please sign in to comment.