AKCORE-167: WriteState RPC complete implementation. #1274

Draft: wants to merge 2 commits into base: kip-932
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
@@ -265,6 +265,7 @@
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.server.group.share" />
</subpackage>
</subpackage>
</subpackage>
@@ -34,7 +34,7 @@ public static class Builder extends AbstractRequest.Builder<WriteShareGroupState
private final WriteShareGroupStateRequestData data;

public Builder(WriteShareGroupStateRequestData data) {
this(data, false);
this(data, true);
}

public Builder(WriteShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
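Note: the one-line change above flips the default for the single-argument constructor, which now delegates with enableUnstableLastVersion = true. A minimal sketch of what that means for a caller (the builderFor helper and its name are illustrative, not part of this PR):

import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.requests.WriteShareGroupStateRequest;

public class WriteStateBuilderExample {
    // Before this change, new Builder(data) meant Builder(data, false); after it,
    // the single-argument form is equivalent to the construction returned below.
    static WriteShareGroupStateRequest.Builder builderFor(WriteShareGroupStateRequestData data) {
        return new WriteShareGroupStateRequest.Builder(data, true); // same as new Builder(data)
    }
}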
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.group.share.{ShareCoordinator, ShareCoordinatorConfig, ShareCoordinatorMetrics, ShareCoordinatorRuntimeMetrics, ShareCoordinatorService}
import org.apache.kafka.coordinator.group.share.{ShareCoordinator, ShareCoordinatorConfig, ShareCoordinatorMetrics, ShareCoordinatorRuntimeMetrics, ShareCoordinatorService, ShareRecordSerde}
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde}
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
@@ -346,10 +346,8 @@ class BrokerServer(
() => metadataCache.getAliveBrokerNodes(config.interBrokerListenerName).asJava, Time.SYSTEM)
persister.configure(new PersisterConfig(persisterStateManager))

val serde = new RecordSerde

groupCoordinator = createGroupCoordinator(serde, persister)
shareCoordinator = createShareCoordinator(serde)
groupCoordinator = createGroupCoordinator(persister)
shareCoordinator = createShareCoordinator()

val producerIdManagerSupplier = () => ProducerIdManager.rpc(
config.brokerId,
@@ -580,7 +578,7 @@
}
}

private def createGroupCoordinator(serde: RecordSerde, persister: Persister): GroupCoordinator = { //todo smjn: pass persister to group coord
private def createGroupCoordinator(persister: Persister): GroupCoordinator = { //todo smjn: pass persister to group coord
// Create group coordinator, but don't start it until we've started replica manager.
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good
// to fix the underlying issue.
@@ -609,6 +607,7 @@
"group-coordinator-reaper",
new SystemTimer("group-coordinator")
)
val serde = new RecordSerde
val loader = new CoordinatorLoaderImpl[group.Record](
time,
replicaManager,
@@ -639,7 +638,7 @@
}
}

private def createShareCoordinator(serde: RecordSerde): ShareCoordinator = {
private def createShareCoordinator(): ShareCoordinator = {
if (!config.isShareGroupEnabled) {
return null
}
@@ -654,6 +653,7 @@
"share-coordinator-reaper",
new SystemTimer("share-coordinator")
)
val serde = new ShareRecordSerde
val loader = new CoordinatorLoaderImpl[group.Record](
time,
replicaManager,
8 changes: 8 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -265,6 +265,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.SHARE_GROUP_HEARTBEAT => handleShareGroupHeartbeat(request).exceptionally(handleError)
case ApiKeys.SHARE_GROUP_DESCRIBE => handleShareGroupDescribe(request).exceptionally(handleError)
case ApiKeys.SHARE_ACKNOWLEDGE => handleShareAcknowledgeRequest(request)
case ApiKeys.WRITE_SHARE_GROUP_STATE => handleShareGroupStateWrite(request)
case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
}
} catch {
@@ -4800,6 +4801,13 @@
}
}

private def handleShareGroupStateWrite(request: RequestChannel.Request): Unit = {
val writeShareRequest = request.body[WriteShareGroupStateRequest]
//todo smjn: add auth check for group, topic
val writeShareData = shareCoordinator.writeState(request.context, writeShareRequest.data).get()
requestHelper.sendMaybeThrottle(request, new WriteShareGroupStateResponse(writeShareData))
}

private def updateRecordConversionStats(request: RequestChannel.Request,
tp: TopicPartition,
conversionStats: RecordValidationStats): Unit = {
@@ -179,6 +179,17 @@ class BrokerMetadataPublisher(
s"coordinator with local changes in $deltaName", t)
}
//todo smjn: add code for share coord
try {
updateCoordinator(newImage,
delta,
Topic.SHARE_GROUP_STATE_TOPIC_NAME,
shareCoordinator.onElection,
(partitionIndex, leaderEpochOpt) => shareCoordinator.onResignation(partitionIndex, toOptionalInt(leaderEpochOpt))
)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating share " +
s"coordinator with local changes in $deltaName", t)
}
try {
// Notify the group coordinator about deleted topics.
val deletedTopicPartitions = new mutable.ArrayBuffer[TopicPartition]()
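Note: the try block added in this hunk mirrors the existing group-coordinator wiring: when the metadata delta changes leadership of a partition of the share-group state topic (Topic.SHARE_GROUP_STATE_TOPIC_NAME), the publisher invokes the election hooks this PR adds to ShareCoordinator. In isolation those calls look like the sketch below (partition index and epochs are illustrative):

import org.apache.kafka.coordinator.group.share.ShareCoordinator;

import java.util.OptionalInt;

public class ShareCoordinatorElectionExample {
    // Rough equivalent of what BrokerMetadataPublisher drives via updateCoordinator.
    static void simulateLeadershipChange(ShareCoordinator shareCoordinator) {
        shareCoordinator.onElection(0, 5);                    // became leader of partition 0 at leader epoch 5
        shareCoordinator.onResignation(0, OptionalInt.of(5)); // later resigned; epoch passed through when known
    }
}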
@@ -41,7 +41,10 @@
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.group.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.group.share.ShareGroupMember;
import org.apache.kafka.coordinator.group.share.ShareGroupOffset;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;

@@ -641,6 +644,29 @@ public static Record newShareMemberSubscriptionTombstoneRecord(
);
}

public static Record newShareSnapshotRecord(String groupId, Uuid topicId, int partitionId, ShareGroupOffset offsetData) {
return new Record(
new ApiMessageAndVersion(new ShareSnapshotKey()
.setGroupId(groupId)
.setTopicId(topicId)
.setPartition(partitionId),
(short) 0),
new ApiMessageAndVersion(new ShareSnapshotValue()
.setSnapshotEpoch(offsetData.snapshotEpoch)
.setStateEpoch(offsetData.stateEpoch)
.setLeaderEpoch(offsetData.leaderEpoch)
.setStartOffset(offsetData.startOffset)
.setStateBatches(offsetData.stateBatches.stream()
.map(batch -> new ShareSnapshotValue.StateBatch()
.setFirstOffset(batch.firstOffset())
.setLastOffset(batch.lastOffset())
.setDeliveryCount(batch.deliveryCount())
.setDeliveryState(batch.deliveryState()))
.collect(Collectors.toList())),
(short) 0)
);
}

private static List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> toTopicPartitions(
Map<Uuid, Set<Integer>> topicPartitions
) {
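Note: newShareSnapshotRecord above maps the group/topic/partition triple to a ShareSnapshotKey and the offset state (snapshot epoch, state epoch, leader epoch, start offset, state batches) to a ShareSnapshotValue, both wrapped as version-0 ApiMessageAndVersion. A hand-rolled equivalent of that key/value pair, built directly against the generated classes; all values are illustrative, field widths follow the generated schema, and ShareGroupOffset itself is not shown in this diff:

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.group.generated.ShareSnapshotValue;
import org.apache.kafka.server.common.ApiMessageAndVersion;

import java.util.Collections;

public class ShareSnapshotRecordExample {
    public static void main(String[] args) {
        Uuid topicId = Uuid.randomUuid();
        // Key: identifies which share-partition's state this snapshot belongs to.
        ApiMessageAndVersion key = new ApiMessageAndVersion(
            new ShareSnapshotKey()
                .setGroupId("share-group-1")
                .setTopicId(topicId)
                .setPartition(0),
            (short) 0);
        // Value: the persisted share-partition state, including in-flight state batches.
        ApiMessageAndVersion value = new ApiMessageAndVersion(
            new ShareSnapshotValue()
                .setSnapshotEpoch(0)
                .setStateEpoch(1)
                .setLeaderEpoch(5)
                .setStartOffset(100L)
                .setStateBatches(Collections.singletonList(
                    new ShareSnapshotValue.StateBatch()
                        .setFirstOffset(100L)
                        .setLastOffset(109L)
                        .setDeliveryCount((short) 1)
                        .setDeliveryState((byte) 0))),
            (short) 0);
        System.out.println(key + " -> " + value);
    }
}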
@@ -121,7 +121,7 @@ private void readMessage(ApiMessage message, ByteBuffer buffer, short version, S
}
}

private ApiMessage apiMessageKeyFor(short recordType) {
protected ApiMessage apiMessageKeyFor(short recordType) {
switch (recordType) {
case 0:
case 1:
@@ -147,7 +147,7 @@ private ApiMessage apiMessageKeyFor(short recordType) {
}
}

private ApiMessage apiMessageValueFor(short recordType) {
protected ApiMessage apiMessageValueFor(short recordType) {
switch (recordType) {
case 0:
case 1:
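Note: the private-to-protected change on apiMessageKeyFor/apiMessageValueFor is what lets the new ShareRecordSerde (wired into BrokerServer above but not included in this diff) reuse RecordSerde by subclassing. A hypothetical sketch of such a subclass; the record-type id used here is an assumption, not taken from this PR:

import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.group.RecordSerde;
import org.apache.kafka.coordinator.group.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.group.generated.ShareSnapshotValue;

public class ShareRecordSerdeSketch extends RecordSerde {
    // Assumed record-type id for share snapshot records; the real constant is not visible here.
    private static final short SHARE_SNAPSHOT_RECORD_TYPE = 100;

    @Override
    protected ApiMessage apiMessageKeyFor(short recordType) {
        if (recordType == SHARE_SNAPSHOT_RECORD_TYPE) {
            return new ShareSnapshotKey();
        }
        return super.apiMessageKeyFor(recordType); // fall back to the group coordinator record types
    }

    @Override
    protected ApiMessage apiMessageValueFor(short recordType) {
        if (recordType == SHARE_SNAPSHOT_RECORD_TYPE) {
            return new ShareSnapshotValue();
        }
        return super.apiMessageValueFor(recordType);
    }
}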
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.coordinator.group.share;

import org.apache.kafka.common.Uuid;

public class GroupTopicPartitionLeader {
public final String groupId;
public final Uuid topicId;
public final int partition;
public final int leaderEpoch;

public GroupTopicPartitionLeader(String groupId, Uuid topicId, int partition, int leaderEpoch) {
this.groupId = groupId;
this.topicId = topicId;
this.partition = partition;
this.leaderEpoch = leaderEpoch;
}
}
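Note: GroupTopicPartitionLeader is a plain immutable holder for the group/topic/partition coordinates plus the leader epoch; its call sites are not part of this diff. Constructing one is straightforward (values illustrative):

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.share.GroupTopicPartitionLeader;

public class GroupTopicPartitionLeaderExample {
    public static void main(String[] args) {
        GroupTopicPartitionLeader key =
            new GroupTopicPartitionLeader("share-group-1", Uuid.randomUuid(), 0, 5);
        System.out.println(key.groupId + ":" + key.topicId + ":" + key.partition + "@" + key.leaderEpoch);
    }
}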
@@ -18,8 +18,13 @@
package org.apache.kafka.coordinator.group.share;

import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.requests.RequestContext;

import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntSupplier;

@InterfaceStability.Evolving
@@ -51,4 +56,10 @@ public interface ShareCoordinator {
* Stop the share coordinator
*/
void shutdown();

CompletableFuture<WriteShareGroupStateResponseData> writeState(RequestContext context, WriteShareGroupStateRequestData request);

void onElection(int partitionIndex, int partitionLeaderEpoch);

void onResignation(int partitionIndex, OptionalInt partitionLeaderEpoch);
}
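Note: the interface now exposes an asynchronous write path alongside the election hooks used by BrokerMetadataPublisher. KafkaApis above simply blocks on the returned future; that call pattern, isolated, is sketched below (blocking shown for illustration only):

import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.coordinator.group.share.ShareCoordinator;

import java.util.concurrent.ExecutionException;

public class WriteStateCallExample {
    // Mirrors KafkaApis.handleShareGroupStateWrite: submit the write and wait for the combined result.
    static WriteShareGroupStateResponseData writeBlocking(
        ShareCoordinator coordinator,
        RequestContext context,
        WriteShareGroupStateRequestData request
    ) throws ExecutionException, InterruptedException {
        return coordinator.writeState(context, request).get();
    }
}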
@@ -17,7 +17,13 @@

package org.apache.kafka.coordinator.group.share;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -29,14 +35,23 @@
import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
import org.apache.kafka.server.group.share.ShareGroupHelper;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.util.timer.Timer;
import org.slf4j.Logger;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;

public class ShareCoordinatorService implements ShareCoordinator {
private final ShareCoordinatorConfig config;
@@ -208,4 +223,86 @@ public void shutdown() {
Utils.closeQuietly(shareCoordinatorMetrics, "share coordinator metrics");
log.info("Shutdown complete.");
}

@Override
public CompletableFuture<WriteShareGroupStateResponseData> writeState(RequestContext context, WriteShareGroupStateRequestData request) {
log.info("ShareCoordinatorService writeState request received - {}", request);
String groupId = request.groupId();
Map<Uuid, Map<Integer, CompletableFuture<WriteShareGroupStateResponseData>>> futureMap = new HashMap<>();

// The request received here could have multiple keys of structure group:topic:partition. However,
// the writeState method in ShareCoordinatorShard expects a single key in the request. Hence, we will
// be looping over the keys below and constructing new WriteShareGroupStateRequestData objects to pass
// onto the shard method.
request.topics().forEach(topicData -> {
Map<Integer, CompletableFuture<WriteShareGroupStateResponseData>> partitionFut =
futureMap.computeIfAbsent(topicData.topicId(), k -> new HashMap<>());
topicData.partitions().forEach(
partitionData -> partitionFut.put(partitionData.partition(), runtime.scheduleWriteOperation(
"write-share-group-state",
topicPartitionFor(ShareGroupHelper.coordinatorKey(groupId, topicData.topicId(), partitionData.partition())),
Duration.ofMillis(config.writeTimeoutMs),
coordinator -> coordinator.writeState(context, new WriteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicData.topicId())
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(partitionData.partition())
.setStartOffset(partitionData.startOffset())
.setLeaderEpoch(partitionData.leaderEpoch())
.setStateEpoch(partitionData.stateEpoch())
.setStateBatches(partitionData.stateBatches()))))))))
);
});


// Combine all futures into a single CompletableFuture<Void>
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futureMap.values().stream()
.flatMap(partMap -> partMap.values().stream()).toArray(CompletableFuture[]::new));

return combinedFuture.thenApply(v -> {
List<WriteShareGroupStateResponseData.WriteStateResult> writeStateResults = futureMap.keySet().stream()
.map(topicId -> {
List<WriteShareGroupStateResponseData.PartitionResult> partitionResults = futureMap.get(topicId).values().stream()
.map(future -> {
try {
WriteShareGroupStateResponseData partitionData = future.get();
// error check if the partitionData results contains only 1 row (corresponding to topicId)
return partitionData.results().get(0).partitions();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
})
.flatMap(List::stream)
.collect(Collectors.toList());

return new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId)
.setPartitions(partitionResults);
})
.collect(Collectors.toList());
return new WriteShareGroupStateResponseData()
.setResults(writeStateResults);
});
}

@Override
public void onElection(int partitionIndex, int partitionLeaderEpoch) {
runtime.scheduleLoadOperation(
new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionIndex),
partitionLeaderEpoch
);
}

@Override
public void onResignation(int partitionIndex, OptionalInt partitionLeaderEpoch) {
runtime.scheduleUnloadOperation(
new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionIndex),
partitionLeaderEpoch
);
}

private TopicPartition topicPartitionFor(String key) {
return new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionFor(key));
}
}
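Note: writeState above fans a multi-topic, multi-partition request out into one single-key write per group:topic:partition (each routed to a share-group state topic partition via topicPartitionFor), then folds the per-partition futures back into a single response. A hedged sketch of exercising that path against an already-constructed ShareCoordinator; group, topic, and offsets are illustrative:

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.coordinator.group.share.ShareCoordinator;

import java.util.Arrays;
import java.util.Collections;

public class WriteStateFanOutExample {
    static WriteShareGroupStateResponseData writeTwoPartitions(
        ShareCoordinator coordinator, RequestContext context, Uuid topicId) throws Exception {
        WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData()
            .setGroupId("share-group-1")
            .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
                .setTopicId(topicId)
                .setPartitions(Arrays.asList(
                    new WriteShareGroupStateRequestData.PartitionData()
                        .setPartition(0).setStartOffset(100L).setLeaderEpoch(5).setStateEpoch(1),
                    new WriteShareGroupStateRequestData.PartitionData()
                        .setPartition(1).setStartOffset(40L).setLeaderEpoch(5).setStateEpoch(1)))));

        // Two partitions mean two scheduleWriteOperation calls inside the service; the combined
        // future completes once both single-key writes have been applied.
        WriteShareGroupStateResponseData response = coordinator.writeState(context, request).get();
        response.results().forEach(result ->
            System.out.println(result.topicId() + " -> " + result.partitions()));
        return response;
    }
}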