diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c084b63796520..9d257c39d5922 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -77,7 +77,6 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPa import org.apache.kafka.storage.internals.log.AppendOrigin import org.apache.kafka.storage.log.metrics.BrokerTopicStats -import java.nio.ByteBuffer import java.time.Duration import java.util import java.util.concurrent.atomic.AtomicInteger @@ -2570,92 +2569,6 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleCreateTokenRequestZk(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) - - val createTokenRequest = request.body[CreateDelegationTokenRequest] - - // the callback for sending a create token response - def sendResponseCallback(createResult: CreateTokenResult): Unit = { - trace(s"Sending create token response for correlation id ${request.header.correlationId} " + - s"to client ${request.header.clientId}.") - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs, createResult.error, createResult.owner, - createResult.tokenRequester, createResult.issueTimestamp, createResult.expiryTimestamp, createResult.maxTimestamp, createResult.tokenId, - ByteBuffer.wrap(createResult.hmac))) - } - - val ownerPrincipalName = createTokenRequest.data.ownerPrincipalName - val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) { - request.context.principal - } else { - new KafkaPrincipal(createTokenRequest.data.ownerPrincipalType, ownerPrincipalName) - } - val requester = request.context.principal - val renewerList = createTokenRequest.data.renewers.asScala.toList.map(entry => - new KafkaPrincipal(entry.principalType, entry.principalName)) - - // DelegationToken changes only need to be executed on the controller during migration - if (!zkSupport.controller.isActive) { - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs, - Errors.NOT_CONTROLLER, owner, requester)) - } else { - tokenManager.createToken( - owner, - requester, - renewerList, - createTokenRequest.data.maxLifetimeMs, - sendResponseCallback) - } - } - - def handleRenewTokenRequest(request: RequestChannel.Request): Unit = { - if (!allowTokenRequests(request)) { - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new RenewDelegationTokenResponse( - new RenewDelegationTokenResponseData() - .setThrottleTimeMs(requestThrottleMs) - .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code) - .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp))) - } else { - forwardToController(request) - } - } - - def handleRenewTokenRequestZk(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) - - val renewTokenRequest = request.body[RenewDelegationTokenRequest] - - // the callback for sending a renew token response - def sendResponseCallback(error: Errors, expiryTimestamp: Long): Unit = { - trace("Sending renew token response for correlation id %d to client %s." - .format(request.header.correlationId, request.header.clientId)) - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new RenewDelegationTokenResponse( - new RenewDelegationTokenResponseData() - .setThrottleTimeMs(requestThrottleMs) - .setErrorCode(error.code) - .setExpiryTimestampMs(expiryTimestamp))) - } - // DelegationToken changes only need to be executed on the controller during migration - if (!zkSupport.controller.isActive) { - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new RenewDelegationTokenResponse( - new RenewDelegationTokenResponseData() - .setThrottleTimeMs(requestThrottleMs) - .setErrorCode(Errors.NOT_CONTROLLER.code))) - } else { - tokenManager.renewToken( - request.context.principal, - ByteBuffer.wrap(renewTokenRequest.data.hmac), - renewTokenRequest.data.renewPeriodMs, - sendResponseCallback - ) - } - } - def handleExpireTokenRequest(request: RequestChannel.Request): Unit = { if (!allowTokenRequests(request)) { requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => @@ -2669,36 +2582,16 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleExpireTokenRequestZk(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) - - val expireTokenRequest = request.body[ExpireDelegationTokenRequest] - - // the callback for sending a expire token response - def sendResponseCallback(error: Errors, expiryTimestamp: Long): Unit = { - trace("Sending expire token response for correlation id %d to client %s." - .format(request.header.correlationId, request.header.clientId)) - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new ExpireDelegationTokenResponse( - new ExpireDelegationTokenResponseData() - .setThrottleTimeMs(requestThrottleMs) - .setErrorCode(error.code) - .setExpiryTimestampMs(expiryTimestamp))) - } - // DelegationToken changes only need to be executed on the controller during migration - if (!zkSupport.controller.isActive) { + def handleRenewTokenRequest(request: RequestChannel.Request): Unit = { + if (!allowTokenRequests(request)) { requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new ExpireDelegationTokenResponse( - new ExpireDelegationTokenResponseData() + new RenewDelegationTokenResponse( + new RenewDelegationTokenResponseData() .setThrottleTimeMs(requestThrottleMs) - .setErrorCode(Errors.NOT_CONTROLLER.code))) + .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code) + .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp))) } else { - tokenManager.expireToken( - request.context.principal, - expireTokenRequest.hmac(), - expireTokenRequest.expiryTimePeriod(), - sendResponseCallback - ) + forwardToController(request) } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index d8532c6cd8b84..eb91e92d9fd58 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -10325,34 +10325,7 @@ class KafkaApisTest extends Logging { setResourceType(BROKER_LOGGER.id()))), response.data()) } - - @Test - // Test that in KRaft mode, a request that isn't forwarded gets the correct error message. - // We skip the pre-forward checks in handleCreateTokenRequest - def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = { - metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) - verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateTokenRequestZk) - } - - @Test - // Test that in KRaft mode, a request that isn't forwarded gets the correct error message. - // We skip the pre-forward checks in handleRenewTokenRequest - def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = { - metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) - verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleRenewTokenRequestZk) - } - - @Test - // Test that in KRaft mode, a request that isn't forwarded gets the correct error message. - // We skip the pre-forward checks in handleExpireTokenRequest - def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = { - metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) - verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleExpireTokenRequestZk) - } - + @Test def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)