Skip to content

Commit

Permalink
KAFKA-18399 Remove ZooKeeper from KafkaApis (6/N): `handleCreateToken…
Browse files Browse the repository at this point in the history
…Request`, `handleRenewTokenRequestZk`, `handleExpireTokenRequestZk` (apache#18447)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
m1a2st authored Jan 12, 2025
1 parent 6a8ffe7 commit 33556ae
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 142 deletions.
121 changes: 7 additions & 114 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPa
import org.apache.kafka.storage.internals.log.AppendOrigin
import org.apache.kafka.storage.log.metrics.BrokerTopicStats

import java.nio.ByteBuffer
import java.time.Duration
import java.util
import java.util.concurrent.atomic.AtomicInteger
Expand Down Expand Up @@ -2570,92 +2569,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

def handleCreateTokenRequestZk(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))

val createTokenRequest = request.body[CreateDelegationTokenRequest]

// the callback for sending a create token response
def sendResponseCallback(createResult: CreateTokenResult): Unit = {
trace(s"Sending create token response for correlation id ${request.header.correlationId} " +
s"to client ${request.header.clientId}.")
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs, createResult.error, createResult.owner,
createResult.tokenRequester, createResult.issueTimestamp, createResult.expiryTimestamp, createResult.maxTimestamp, createResult.tokenId,
ByteBuffer.wrap(createResult.hmac)))
}

val ownerPrincipalName = createTokenRequest.data.ownerPrincipalName
val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
request.context.principal
} else {
new KafkaPrincipal(createTokenRequest.data.ownerPrincipalType, ownerPrincipalName)
}
val requester = request.context.principal
val renewerList = createTokenRequest.data.renewers.asScala.toList.map(entry =>
new KafkaPrincipal(entry.principalType, entry.principalName))

// DelegationToken changes only need to be executed on the controller during migration
if (!zkSupport.controller.isActive) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
Errors.NOT_CONTROLLER, owner, requester))
} else {
tokenManager.createToken(
owner,
requester,
renewerList,
createTokenRequest.data.maxLifetimeMs,
sendResponseCallback)
}
}

def handleRenewTokenRequest(request: RequestChannel.Request): Unit = {
if (!allowTokenRequests(request)) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new RenewDelegationTokenResponse(
new RenewDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
} else {
forwardToController(request)
}
}

def handleRenewTokenRequestZk(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))

val renewTokenRequest = request.body[RenewDelegationTokenRequest]

// the callback for sending a renew token response
def sendResponseCallback(error: Errors, expiryTimestamp: Long): Unit = {
trace("Sending renew token response for correlation id %d to client %s."
.format(request.header.correlationId, request.header.clientId))
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new RenewDelegationTokenResponse(
new RenewDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(error.code)
.setExpiryTimestampMs(expiryTimestamp)))
}
// DelegationToken changes only need to be executed on the controller during migration
if (!zkSupport.controller.isActive) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new RenewDelegationTokenResponse(
new RenewDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.NOT_CONTROLLER.code)))
} else {
tokenManager.renewToken(
request.context.principal,
ByteBuffer.wrap(renewTokenRequest.data.hmac),
renewTokenRequest.data.renewPeriodMs,
sendResponseCallback
)
}
}

def handleExpireTokenRequest(request: RequestChannel.Request): Unit = {
if (!allowTokenRequests(request)) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
Expand All @@ -2669,36 +2582,16 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

def handleExpireTokenRequestZk(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))

val expireTokenRequest = request.body[ExpireDelegationTokenRequest]

// the callback for sending a expire token response
def sendResponseCallback(error: Errors, expiryTimestamp: Long): Unit = {
trace("Sending expire token response for correlation id %d to client %s."
.format(request.header.correlationId, request.header.clientId))
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new ExpireDelegationTokenResponse(
new ExpireDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(error.code)
.setExpiryTimestampMs(expiryTimestamp)))
}
// DelegationToken changes only need to be executed on the controller during migration
if (!zkSupport.controller.isActive) {
def handleRenewTokenRequest(request: RequestChannel.Request): Unit = {
if (!allowTokenRequests(request)) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new ExpireDelegationTokenResponse(
new ExpireDelegationTokenResponseData()
new RenewDelegationTokenResponse(
new RenewDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.NOT_CONTROLLER.code)))
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
} else {
tokenManager.expireToken(
request.context.principal,
expireTokenRequest.hmac(),
expireTokenRequest.expiryTimePeriod(),
sendResponseCallback
)
forwardToController(request)
}
}

Expand Down
29 changes: 1 addition & 28 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10325,34 +10325,7 @@ class KafkaApisTest extends Logging {
setResourceType(BROKER_LOGGER.id()))),
response.data())
}

@Test
// Test that in KRaft mode, a request that isn't forwarded gets the correct error message.
// We skip the pre-forward checks in handleCreateTokenRequest
def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateTokenRequestZk)
}

@Test
// Test that in KRaft mode, a request that isn't forwarded gets the correct error message.
// We skip the pre-forward checks in handleRenewTokenRequest
def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleRenewTokenRequestZk)
}

@Test
// Test that in KRaft mode, a request that isn't forwarded gets the correct error message.
// We skip the pre-forward checks in handleExpireTokenRequest
def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleExpireTokenRequestZk)
}


@Test
def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
Expand Down

0 comments on commit 33556ae

Please sign in to comment.