@@ -3182,6 +3182,35 @@ class KafkaApis(val requestChannel: RequestChannel,
31823182 val newReqMetadata : ShareRequestMetadata = new ShareRequestMetadata (Uuid .fromString(memberId), shareSessionEpoch)
31833183 var shareFetchContext : ShareFetchContext = null
31843184
3185+ // KIP-1222 enforces setting the maxBytes, minBytes, maxRecords, maxWaitMs
3186+ // values to 0, in case isRenewAck is true.
3187+ if (shareFetchRequest.version >= 2 && shareFetchRequest.data.isRenewAck) {
3188+ val reqData = shareFetchRequest.data
3189+ var errorMsg : String = " "
3190+ if (reqData.maxBytes != 0 ) {
3191+ errorMsg += " maxBytes must be set to 0, "
3192+ }
3193+
3194+ if (reqData.minBytes != 0 ) {
3195+ errorMsg += " minBytes must be set to 0, "
3196+ }
3197+
3198+ if (reqData.maxRecords != 0 ) {
3199+ errorMsg += " maxRecords must be set to 0, "
3200+ }
3201+
3202+ if (reqData.maxWaitMs != 0 ) {
3203+ errorMsg += " maxWaitMs must be set to 0, "
3204+ }
3205+
3206+ if (errorMsg != " " ) {
3207+ errorMsg += " if isRenewAck is true."
3208+ error(errorMsg)
3209+ requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse .DEFAULT_THROTTLE_TIME , Errors .INVALID_REQUEST .exception(errorMsg)))
3210+ return CompletableFuture .completedFuture[Unit ](())
3211+ }
3212+ }
3213+
31853214 try {
31863215 // Creating the shareFetchContext for Share Session Handling. if context creation fails, the request is failed directly here.
31873216 shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent, request.context.connectionId)
@@ -3234,18 +3263,24 @@ class KafkaApis(val requestChannel: RequestChannel,
32343263 authorizedTopics,
32353264 groupId,
32363265 memberId,
3266+ shareFetchRequest.version == 2 ,
3267+ shareFetchRequest.data.isRenewAck
32373268 )
32383269 }
32393270
32403271 // Handling the Fetch from the ShareFetchRequest.
32413272 // Variable to store the topic partition wise result of fetching.
3242- val fetchResult : CompletableFuture [Map [TopicIdPartition , ShareFetchResponseData .PartitionData ]] = handleFetchFromShareFetchRequest(
3243- request,
3244- shareSessionEpoch,
3245- erroneousAndValidPartitionData,
3246- sharePartitionManager,
3247- authorizedTopics
3248- )
3273+
3274+ // Here we are populating fetchResult conditionally because per the design of
3275+ // KIP-1222, if a ShareFetch request contains a RENEW ack type piggybacked then
3276+ // we must forego the record fetching as the amount of time spent in fetching
3277+ // might be more that the acquisition lock timeout which got RENEWed and as a result
3278+ // it'll timeout again, before the response reaches the share consumer.
3279+ val fetchResult : CompletableFuture [Map [TopicIdPartition , ShareFetchResponseData .PartitionData ]] =
3280+ if (shareFetchRequest.version() >= 2 && shareFetchRequest.data.isRenewAck)
3281+ CompletableFuture .completedFuture(mutable.Map .empty[TopicIdPartition , ShareFetchResponseData .PartitionData ])
3282+ else
3283+ handleFetchFromShareFetchRequest(request, shareSessionEpoch, erroneousAndValidPartitionData, sharePartitionManager, authorizedTopics)
32493284
32503285 def combineShareFetchAndShareAcknowledgeResponses (fetchResult : CompletableFuture [Map [TopicIdPartition , ShareFetchResponseData .PartitionData ]],
32513286 acknowledgeResult : CompletableFuture [Map [TopicIdPartition , ShareAcknowledgeResponseData .PartitionData ]],
@@ -3422,9 +3457,11 @@ class KafkaApis(val requestChannel: RequestChannel,
34223457 sharePartitionManagerInstance : SharePartitionManager ,
34233458 authorizedTopics : Set [String ],
34243459 groupId : String ,
3425- memberId : String ): CompletableFuture [Map [TopicIdPartition , ShareAcknowledgeResponseData .PartitionData ]] = {
3460+ memberId : String ,
3461+ supportsRenewAcknowledgements : Boolean ,
3462+ isRenewAck : Boolean ): CompletableFuture [Map [TopicIdPartition , ShareAcknowledgeResponseData .PartitionData ]] = {
34263463
3427- val erroneousTopicIdPartitions = validateAcknowledgementBatches(acknowledgementData, erroneous)
3464+ val erroneousTopicIdPartitions = validateAcknowledgementBatches(acknowledgementData, erroneous, supportsRenewAcknowledgements, isRenewAck )
34283465 erroneousTopicIdPartitions.foreach(tp => acknowledgementData.remove(tp))
34293466
34303467 val interested = mutable.Map [TopicIdPartition , util.List [ShareAcknowledgementBatch ]]()
@@ -3523,7 +3560,7 @@ class KafkaApis(val requestChannel: RequestChannel,
35233560
35243561 val erroneous = mutable.Map [TopicIdPartition , ShareAcknowledgeResponseData .PartitionData ]()
35253562 val acknowledgementDataFromRequest = getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest, topicIdNames, erroneous)
3526- handleAcknowledgements(acknowledgementDataFromRequest, erroneous, sharePartitionManager, authorizedTopics, groupId, memberId)
3563+ handleAcknowledgements(acknowledgementDataFromRequest, erroneous, sharePartitionManager, authorizedTopics, groupId, memberId, shareAcknowledgeRequest.version == 2 , shareAcknowledgeRequest.data.isRenewAck )
35273564 .handle[Unit ] {(result, exception) =>
35283565 if (exception != null ) {
35293566 requestHelper.sendMaybeThrottle(request, shareAcknowledgeRequest.getErrorResponse(AbstractResponse .DEFAULT_THROTTLE_TIME , exception))
@@ -3987,13 +4024,16 @@ class KafkaApis(val requestChannel: RequestChannel,
39874024
39884025 // Visible for Testing
39894026 def validateAcknowledgementBatches (acknowledgementDataFromRequest : mutable.Map [TopicIdPartition , util.List [ShareAcknowledgementBatch ]],
3990- erroneous : mutable.Map [TopicIdPartition , ShareAcknowledgeResponseData .PartitionData ]
4027+ erroneous : mutable.Map [TopicIdPartition , ShareAcknowledgeResponseData .PartitionData ],
4028+ supportsRenewAcknowledgements : Boolean ,
4029+ isRenewAck : Boolean
39914030 ): mutable.Set [TopicIdPartition ] = {
39924031 val erroneousTopicIdPartitions : mutable.Set [TopicIdPartition ] = mutable.Set .empty[TopicIdPartition ]
39934032
39944033 acknowledgementDataFromRequest.foreach { case (tp : TopicIdPartition , acknowledgeBatches : util.List [ShareAcknowledgementBatch ]) =>
39954034 var prevEndOffset = - 1L
39964035 var isErroneous = false
4036+ val maxAcknowledgeType = if (supportsRenewAcknowledgements) 4 else 3
39974037 acknowledgeBatches.forEach { batch =>
39984038 if (! isErroneous) {
39994039 if (batch.firstOffset > batch.lastOffset) {
@@ -4012,7 +4052,11 @@ class KafkaApis(val requestChannel: RequestChannel,
40124052 erroneous += tp -> ShareAcknowledgeResponse .partitionResponse(tp, Errors .INVALID_REQUEST )
40134053 erroneousTopicIdPartitions.add(tp)
40144054 isErroneous = true
4015- } else if (batch.acknowledgeTypes.stream().anyMatch(ackType => ackType < 0 || ackType > 3 )) {
4055+ } else if (batch.acknowledgeTypes.stream().anyMatch(ackType => ackType < 0 || ackType > maxAcknowledgeType)) {
4056+ erroneous += tp -> ShareAcknowledgeResponse .partitionResponse(tp, Errors .INVALID_REQUEST )
4057+ erroneousTopicIdPartitions.add(tp)
4058+ isErroneous = true
4059+ } else if (batch.acknowledgeTypes.stream().anyMatch(ackType => ackType == 4 ) && ! isRenewAck) {
40164060 erroneous += tp -> ShareAcknowledgeResponse .partitionResponse(tp, Errors .INVALID_REQUEST )
40174061 erroneousTopicIdPartitions.add(tp)
40184062 isErroneous = true
0 commit comments