Skip to content

Commit

Permalink
KAFKA-18399 Remove ZooKeeper from KafkaApis (5/N): ALTER_PARTITION_RE…
Browse files Browse the repository at this point in the history
…ASSIGNMENTS, LIST_PARTITION_REASSIGNMENTS (apache#18464)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
m1a2st authored Jan 12, 2025
1 parent 3cf2e45 commit 6a8ffe7
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 102 deletions.
88 changes: 0 additions & 88 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package kafka.server

import kafka.controller.ReplicaAssignment
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
Expand All @@ -37,7 +36,6 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE
import org.apache.kafka.common.internals.{FatalExitError, Topic}
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection}
import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse}
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
Expand Down Expand Up @@ -2409,92 +2407,6 @@ class KafkaApis(val requestChannel: RequestChannel,
response
}

def handleAlterPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
authHelper.authorizeClusterOperation(request, ALTER)
val alterPartitionReassignmentsRequest = request.body[AlterPartitionReassignmentsRequest]

def sendResponseCallback(result: Either[Map[TopicPartition, ApiError], ApiError]): Unit = {
val responseData = result match {
case Right(topLevelError) =>
new AlterPartitionReassignmentsResponseData().setErrorMessage(topLevelError.message).setErrorCode(topLevelError.error.code)

case Left(assignments) =>
val topicResponses = assignments.groupBy(_._1.topic).map {
case (topic, reassignmentsByTp) =>
val partitionResponses = reassignmentsByTp.map {
case (topicPartition, error) =>
new ReassignablePartitionResponse().setPartitionIndex(topicPartition.partition)
.setErrorCode(error.error.code).setErrorMessage(error.message)
}
new ReassignableTopicResponse().setName(topic).setPartitions(partitionResponses.toList.asJava)
}
new AlterPartitionReassignmentsResponseData().setResponses(topicResponses.toList.asJava)
}

requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new AlterPartitionReassignmentsResponse(responseData.setThrottleTimeMs(requestThrottleMs))
)
}

val reassignments = alterPartitionReassignmentsRequest.data.topics.asScala.flatMap {
reassignableTopic => reassignableTopic.partitions.asScala.map {
reassignablePartition =>
val tp = new TopicPartition(reassignableTopic.name, reassignablePartition.partitionIndex)
if (reassignablePartition.replicas == null)
tp -> None // revert call
else
tp -> Some(reassignablePartition.replicas.asScala.map(_.toInt))
}
}.toMap

zkSupport.controller.alterPartitionReassignments(reassignments, sendResponseCallback)
}

def handleListPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
authHelper.authorizeClusterOperation(request, DESCRIBE)
val listPartitionReassignmentsRequest = request.body[ListPartitionReassignmentsRequest]

def sendResponseCallback(result: Either[Map[TopicPartition, ReplicaAssignment], ApiError]): Unit = {
val responseData = result match {
case Right(error) => new ListPartitionReassignmentsResponseData().setErrorMessage(error.message).setErrorCode(error.error.code)

case Left(assignments) =>
val topicReassignments = assignments.groupBy(_._1.topic).map {
case (topic, reassignmentsByTp) =>
val partitionReassignments = reassignmentsByTp.map {
case (topicPartition, assignment) =>
new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment()
.setPartitionIndex(topicPartition.partition)
.setAddingReplicas(assignment.addingReplicas.toList.asJava.asInstanceOf[java.util.List[java.lang.Integer]])
.setRemovingReplicas(assignment.removingReplicas.toList.asJava.asInstanceOf[java.util.List[java.lang.Integer]])
.setReplicas(assignment.replicas.toList.asJava.asInstanceOf[java.util.List[java.lang.Integer]])
}.toList

new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName(topic)
.setPartitions(partitionReassignments.asJava)
}.toList

new ListPartitionReassignmentsResponseData().setTopics(topicReassignments.asJava)
}

requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new ListPartitionReassignmentsResponse(responseData.setThrottleTimeMs(requestThrottleMs))
)
}

val partitionsOpt = Option(listPartitionReassignmentsRequest.data.topics).map { topics =>
topics.iterator().asScala.flatMap { topic =>
topic.partitionIndexes.iterator().asScala.map { partitionIndex =>
new TopicPartition(topic.name(), partitionIndex)
}
}.toSet
}

zkSupport.controller.listPartitionReassignments(partitionsOpt, sendResponseCallback)
}

private def configsAuthorizationApiError(resource: ConfigResource): ApiError = {
val error = resource.`type` match {
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED
Expand Down
14 changes: 0 additions & 14 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10288,13 +10288,6 @@ class KafkaApisTest extends Logging {
response.data())
}

@Test
def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterPartitionReassignmentsRequest)
}

@Test
def testEmptyIncrementalAlterConfigsRequestWithKRaft(): Unit = {
val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort))
Expand Down Expand Up @@ -10381,13 +10374,6 @@ class KafkaApisTest extends Logging {
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleElectLeaders)
}

@Test
def testRaftShouldAlwaysForwardListPartitionReassignments(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleListPartitionReassignmentsRequest)
}

@Test
def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group")
Expand Down

0 comments on commit 6a8ffe7

Please sign in to comment.