From 6ccb2969a5c87b9ee8d5841086ee6b63871b2638 Mon Sep 17 00:00:00 2001 From: "Mr.hulei" Date: Sat, 19 Jul 2025 00:14:39 +0800 Subject: [PATCH 1/2] MINOR:Extract public code --- .../org/apache/kafka/raft/DynamicVoter.java | 123 +++++++----------- 1 file changed, 46 insertions(+), 77 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java b/raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java index f95431a42c41e..e33926588fb7e 100644 --- a/raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java +++ b/raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java @@ -23,49 +23,25 @@ import java.net.InetSocketAddress; import java.util.Map; -import java.util.Objects; /** * The textual representation of a KIP-853 voter. - * + *

* Since this is used in command-line tools, format changes to the parsing logic require a KIP, * and should be backwards compatible. */ -public final class DynamicVoter { - private final Uuid directoryId; - private final int nodeId; - private final String host; - private final int port; - +public record DynamicVoter(Uuid directoryId, int nodeId, String host, int port) { /** * Create a DynamicVoter object by parsing an input string. * - * @param input The input string. - * - * @return The DynamicVoter object. - * - * @throws IllegalArgumentException If parsing fails. + * @param input The input string. + * @return The DynamicVoter object. + * @throws IllegalArgumentException If parsing fails. */ public static DynamicVoter parse(String input) { input = input.trim(); int atIndex = input.indexOf("@"); - if (atIndex < 0) { - throw new IllegalArgumentException("No @ found in dynamic voter string."); - } - if (atIndex == 0) { - throw new IllegalArgumentException("Invalid @ at beginning of dynamic voter string."); - } - String idString = input.substring(0, atIndex); - int nodeId; - try { - nodeId = Integer.parseInt(idString); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Failed to parse node id in dynamic voter string.", e); - } - if (nodeId < 0) { - throw new IllegalArgumentException("Invalid negative node id " + nodeId + - " in dynamic voter string."); - } + int nodeId = getNodeId(input, atIndex); input = input.substring(atIndex + 1); if (input.isEmpty()) { throw new IllegalArgumentException("No hostname found after node id."); @@ -92,6 +68,18 @@ public static DynamicVoter parse(String input) { } input = input.substring(1); int endColonIndex = input.indexOf(":"); + int port = getPort(input, endColonIndex); + String directoryIdString = input.substring(endColonIndex + 1); + Uuid directoryId; + try { + directoryId = Uuid.fromString(directoryIdString); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Failed to parse directory ID in dynamic voter string.", e); + } + return new DynamicVoter(directoryId, nodeId, host, port); + } + + private static int getPort(String input, int endColonIndex) { if (endColonIndex < 0) { throw new IllegalArgumentException("No colon following port could be found."); } @@ -105,50 +93,39 @@ public static DynamicVoter parse(String input) { if (port < 0 || port > 65535) { throw new IllegalArgumentException("Invalid port " + port + " in dynamic voter string."); } - String directoryIdString = input.substring(endColonIndex + 1); - Uuid directoryId; + return port; + } + + private static int getNodeId(String input, int atIndex) { + if (atIndex < 0) { + throw new IllegalArgumentException("No @ found in dynamic voter string."); + } + if (atIndex == 0) { + throw new IllegalArgumentException("Invalid @ at beginning of dynamic voter string."); + } + String idString = input.substring(0, atIndex); + int nodeId; try { - directoryId = Uuid.fromString(directoryIdString); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Failed to parse directory ID in dynamic voter string.", e); + nodeId = Integer.parseInt(idString); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to parse node id in dynamic voter string.", e); } - return new DynamicVoter(directoryId, nodeId, host, port); + if (nodeId < 0) { + throw new IllegalArgumentException("Invalid negative node id " + nodeId + + " in dynamic voter string."); + } + return nodeId; } /** * Create a new KIP-853 voter. * - * @param directoryId The directory ID. - * @param nodeId The voter ID. - * @param host The voter hostname or IP address. - * @param port The voter port. + * @param directoryId The directory ID. + * @param nodeId The voter ID. + * @param host The voter hostname or IP address. + * @param port The voter port. */ - public DynamicVoter( - Uuid directoryId, - int nodeId, - String host, - int port - ) { - this.directoryId = directoryId; - this.nodeId = nodeId; - this.host = host; - this.port = port; - } - - public Uuid directoryId() { - return directoryId; - } - - public int nodeId() { - return nodeId; - } - - public String host() { - return host; - } - - public int port() { - return port; + public DynamicVoter { } public VoterSet.VoterNode toVoterNode(String controllerListenerName) { @@ -166,17 +143,9 @@ public boolean equals(Object o) { if (o == null || (!(o.getClass().equals(DynamicVoter.class)))) return false; DynamicVoter other = (DynamicVoter) o; return directoryId.equals(other.directoryId) && - nodeId == other.nodeId && - host.equals(other.host) && - port == other.port; - } - - @Override - public int hashCode() { - return Objects.hash(directoryId, - nodeId, - host, - port); + nodeId == other.nodeId && + host.equals(other.host) && + port == other.port; } @Override From 464678a1011ac0268ae91c053ef212fd779cb882 Mon Sep 17 00:00:00 2001 From: "Mr.hulei" Date: Mon, 28 Jul 2025 00:15:46 +0800 Subject: [PATCH 2/2] MINOR: Extract public code --- .../apache/kafka/raft/UnattachedState.java | 60 ++-- .../KRaftControlRecordStateMachine.java | 129 ++++---- .../raft/internals/RemoveVoterHandler.java | 45 ++- .../raft/internals/ThresholdPurgatory.java | 23 +- .../raft/internals/UpdateVoterHandler.java | 275 +++++++++--------- 5 files changed, 271 insertions(+), 261 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java index 6b7e4b700f241..56609ff3103f5 100644 --- a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java +++ b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java @@ -30,9 +30,9 @@ /** * A replica is "unattached" when it doesn't know the leader or the leader's endpoint. - * + *

* Typically, a replica doesn't know the leader if the KRaft topic is undergoing an election cycle. - * + *

* It is also possible for a replica to be unattached if it doesn't know the leader's endpoint. * This typically happens when a replica starts up and the known leader id is not part of the local * voter set. In that case, during startup the replica transitions to unattached instead of @@ -52,14 +52,14 @@ public class UnattachedState implements EpochState { private final Logger log; public UnattachedState( - Time time, - int epoch, - OptionalInt leaderId, - Optional votedKey, - Set voters, - Optional highWatermark, - long electionTimeoutMs, - LogContext logContext + Time time, + int epoch, + OptionalInt leaderId, + Optional votedKey, + Set voters, + Optional highWatermark, + long electionTimeoutMs, + LogContext logContext ) { this.epoch = epoch; this.leaderId = leaderId; @@ -75,11 +75,8 @@ public UnattachedState( public ElectionState election() { if (leaderId.isPresent()) { return ElectionState.withElectedLeader(epoch, leaderId.getAsInt(), votedKey, voters); - } else if (votedKey.isPresent()) { - return ElectionState.withVotedCandidate(epoch, votedKey.get(), voters); - } else { - return ElectionState.withUnknownLeader(epoch, voters); - } + } else + return votedKey.map(replicaKey -> ElectionState.withVotedCandidate(epoch, replicaKey, voters)).orElseGet(() -> ElectionState.withUnknownLeader(epoch, voters)); } @Override @@ -123,30 +120,31 @@ public Optional highWatermark() { @Override public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote) { return unattachedOrProspectiveCanGrantVote( - leaderId, - votedKey, - epoch, - replicaKey, - isLogUpToDate, - isPreVote, - log + leaderId, + votedKey, + epoch, + replicaKey, + isLogUpToDate, + isPreVote, + log ); } @Override public String toString() { return String.format( - "UnattachedState(epoch=%d, leaderId=%s, votedKey=%s, voters=%s, " + - "electionTimeoutMs=%d, highWatermark=%s)", - epoch, - leaderId, - votedKey, - voters, - electionTimeoutMs, - highWatermark + "UnattachedState(epoch=%d, leaderId=%s, votedKey=%s, voters=%s, " + + "electionTimeoutMs=%d, highWatermark=%s)", + epoch, + leaderId, + votedKey, + voters, + electionTimeoutMs, + highWatermark ); } @Override - public void close() {} + public void close() { + } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index d9cd47a3a2790..2e81097c2074c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -40,10 +40,10 @@ /** * The KRaft state machine for tracking control records in the topic partition. - * + *

* This type keeps track of changes to the finalized kraft.version and the sets of voters between * the latest snapshot and the log end offset. - * + *

* There are two type of actors/threads accessing this type. One is the KRaft driver which indirectly call a lot of * the public methods. The other actors/threads are the callers of {@code RaftClient.createSnapshot} which * indirectly call {@code voterSetAtOffset} and {@code kraftVersionAtOffset} when freezing a snapshot. @@ -79,22 +79,22 @@ public final class KRaftControlRecordStateMachine { /** * Constructs an internal log listener * - * @param staticVoterSet the set of voter statically configured - * @param log the on disk topic partition - * @param serde the record decoder for data records - * @param bufferSupplier the supplier of byte buffers + * @param staticVoterSet the set of voter statically configured + * @param log the on disk topic partition + * @param serde the record decoder for data records + * @param bufferSupplier the supplier of byte buffers * @param maxBatchSizeBytes the maximum size of record batch - * @param logContext the log context + * @param logContext the log context */ public KRaftControlRecordStateMachine( - VoterSet staticVoterSet, - ReplicatedLog log, - RecordSerde serde, - BufferSupplier bufferSupplier, - int maxBatchSizeBytes, - LogContext logContext, - KafkaRaftMetrics kafkaRaftMetrics, - ExternalKRaftMetrics externalKRaftMetrics + VoterSet staticVoterSet, + ReplicatedLog log, + RecordSerde serde, + BufferSupplier bufferSupplier, + int maxBatchSizeBytes, + LogContext logContext, + KafkaRaftMetrics kafkaRaftMetrics, + ExternalKRaftMetrics externalKRaftMetrics ) { this.logContext = logContext; this.log = log; @@ -184,7 +184,7 @@ public OptionalLong lastVoterSetOffset() { public KRaftVersion lastKraftVersion() { synchronized (kraftVersionHistory) { return kraftVersionHistory.lastEntry().map(LogHistory.Entry::value). - orElse(KRaftVersion.KRAFT_VERSION_0); + orElse(KRaftVersion.KRAFT_VERSION_0); } } @@ -213,7 +213,7 @@ public KRaftVersion kraftVersionAtOffset(long offset) { synchronized (kraftVersionHistory) { return kraftVersionHistory.valueAtOrBefore(offset). - orElse(KRaftVersion.KRAFT_VERSION_0); + orElse(KRaftVersion.KRAFT_VERSION_0); } } @@ -221,12 +221,12 @@ private void checkOffsetIsValid(long offset) { long fixedNextOffset = nextOffset; if (offset >= fixedNextOffset) { throw new IllegalArgumentException( - String.format( - "Attempting the read a value at an offset (%d) which is greater than or " + - "equal to the largest known offset (%d)", - offset, - fixedNextOffset - 1 - ) + String.format( + "Attempting the read a value at an offset (%d) which is greater than or " + + "equal to the largest known offset (%d)", + offset, + fixedNextOffset - 1 + ) ); } } @@ -241,7 +241,7 @@ private void maybeLoadLog() { maxBatchSizeBytes, true, // Validate batch CRC logContext - ) + ) ) { while (iterator.hasNext()) { Batch batch = iterator.next(); @@ -271,13 +271,13 @@ private void maybeLoadSnapshot() { maxBatchSizeBytes, true, // Validate batch CRC logContext - ) + ) ) { logger.info( - "Loading snapshot ({}) since log start offset ({}) is greater than the internal listener's next offset ({})", - reader.snapshotId(), - log.startOffset(), - nextOffset + "Loading snapshot ({}) since log start offset ({}) is greater than the internal listener's next offset ({})", + reader.snapshotId(), + log.startOffset(), + nextOffset ); OptionalLong currentOffset = OptionalLong.of(reader.lastContainedLogOffset()); while (reader.hasNext()) { @@ -295,42 +295,65 @@ private void maybeLoadSnapshot() { } private void handleBatch(Batch batch, OptionalLong overrideOffset) { + // Determine the base offset for records in this batch. + // If overrideOffset is present, it takes precedence; otherwise, use the batch's base offset. + final long baseOffset = overrideOffset.orElse(batch.baseOffset()); int offsetDelta = 0; for (ControlRecord record : batch.controlRecords()) { - long currentOffset = overrideOffset.orElse(batch.baseOffset() + offsetDelta); + final long currentOffset = baseOffset + offsetDelta; + switch (record.type()) { case KRAFT_VOTERS: - VoterSet voters = VoterSet.fromVotersRecord((VotersRecord) record.message()); - kafkaRaftMetrics.updateNumVoters(voters.size()); - if (!staticVoterSet.isEmpty()) { - externalKRaftMetrics.setIgnoredStaticVoters(true); - } - logger.info("Latest set of voters is {} at offset {}", voters, currentOffset); - synchronized (voterSetHistory) { - voterSetHistory.addAt(currentOffset, voters); - } + handleKraftVotersRecord((VotersRecord) record.message(), currentOffset); break; case KRAFT_VERSION: - KRaftVersion kraftVersion = KRaftVersion.fromFeatureLevel( - ((KRaftVersionRecord) record.message()).kRaftVersion() - ); - logger.info( - "Latest {} is {} at offset {}", - KRaftVersion.FEATURE_NAME, - kraftVersion, - currentOffset - ); - synchronized (kraftVersionHistory) { - kraftVersionHistory.addAt(currentOffset, kraftVersion); - } + handleKraftVersionRecord((KRaftVersionRecord) record.message(), currentOffset); break; default: - // Skip the rest of the control records + // Log and skip any unhandled control record types. + // This makes it clear that other types are intentionally ignored. + logger.debug("Skipping unhandled control record type: {} at offset {}", record.type(), currentOffset); break; } - ++offsetDelta; + offsetDelta++; // Use post-increment for conciseness + } + } + + // Extracted method for handling KRAFT_VOTERS records to improve readability + private void handleKraftVotersRecord(VotersRecord votersRecord, long currentOffset) { + VoterSet voters = VoterSet.fromVotersRecord(votersRecord); + kafkaRaftMetrics.updateNumVoters(voters.size()); + + // Check if staticVoterSet is not empty and update metric if necessary. + // This condition should ideally be placed directly within the if statement. + if (!staticVoterSet.isEmpty()) { + externalKRaftMetrics.setIgnoredStaticVoters(true); + } + + logger.info("Latest set of voters is {} at offset {}", voters, currentOffset); + + // Synchronize access to voterSetHistory when adding a new entry. + synchronized (voterSetHistory) { + voterSetHistory.addAt(currentOffset, voters); + } + } + + // Extracted method for handling KRAFT_VERSION records to improve readability + private void handleKraftVersionRecord(KRaftVersionRecord kraftVersionRecord, long currentOffset) { + KRaftVersion kraftVersion = KRaftVersion.fromFeatureLevel(kraftVersionRecord.kRaftVersion()); + + logger.info( + "Latest {} is {} at offset {}", + KRaftVersion.FEATURE_NAME, + kraftVersion, + currentOffset + ); + + // Synchronize access to kraftVersionHistory when adding a new entry. + synchronized (kraftVersionHistory) { + kraftVersionHistory.addAt(currentOffset, kraftVersion); } } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java index 2dea86d593bfe..bf2734eb7a06e 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java @@ -156,33 +156,32 @@ public CompletableFuture handleRemoveVoterRequest( } public void highWatermarkUpdated(LeaderState leaderState) { - leaderState.removeVoterHandlerState().ifPresent(current -> { - leaderState.highWatermark().ifPresent(highWatermark -> { - if (highWatermark.offset() > current.lastOffset()) { - // VotersRecord with the removed voter was committed; complete the RPC - leaderState.resetRemoveVoterHandlerState(Errors.NONE, null, Optional.empty()); + leaderState.removeVoterHandlerState().ifPresent(current -> + leaderState.highWatermark().ifPresent(highWatermark -> { + if (highWatermark.offset() > current.lastOffset()) { + // VotersRecord with the removed voter was committed; complete the RPC + leaderState.resetRemoveVoterHandlerState(Errors.NONE, null, Optional.empty()); - // Resign if the leader is not part of the new committed voter set - VoterSet voters = partitionState.lastVoterSet(); - ReplicaKey localKey = localReplicaKey.orElseThrow( - () -> new IllegalStateException( - String.format( - "Leaders mush have an id and directory id %s", - localReplicaKey - ) + // Resign if the leader is not part of the new committed voter set + VoterSet voters = partitionState.lastVoterSet(); + ReplicaKey localKey = localReplicaKey.orElseThrow( + () -> new IllegalStateException( + String.format( + "Leaders mush have an id and directory id %s", + localReplicaKey ) + ) + ); + if (!voters.isVoter(localKey)) { + logger.info( + "Leader is not in the committed voter set {} resign from epoch {}", + voters.voterKeys(), + leaderState.epoch() ); - if (!voters.isVoter(localKey)) { - logger.info( - "Leader is not in the committed voter set {} resign from epoch {}", - voters.voterKeys(), - leaderState.epoch() - ); - leaderState.requestResign(); - } + leaderState.requestResign(); } - }); - }); + } + })); } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java b/raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java index eec3911bc3adf..2ab7ac1a180d3 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java @@ -71,24 +71,17 @@ public int numWaiting() { return thresholdMap.size(); } - private static class ThresholdKey> implements Comparable> { - private final long id; - private final T threshold; - - private ThresholdKey(long id, T threshold) { - this.id = id; - this.threshold = threshold; - } + private record ThresholdKey>(long id, T threshold) implements Comparable> { @Override - public int compareTo(ThresholdKey o) { - int res = this.threshold.compareTo(o.threshold); - if (res != 0) { - return res; - } else { - return Long.compare(this.id, o.id); + public int compareTo(ThresholdKey o) { + int res = this.threshold.compareTo(o.threshold); + if (res != 0) { + return res; + } else { + return Long.compare(this.id, o.id); + } } } - } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java index 6f98ff80f6917..cf130613aba0f 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java @@ -39,19 +39,19 @@ /** * This type implements the protocol for updating a voter from a KRaft partition. - * + *

* 1. Check that the leader has fenced the previous leader(s) by checking that the HWM is known, - * otherwise return the REQUEST_TIMED_OUT error. + * otherwise return the REQUEST_TIMED_OUT error. * 2. Check that the cluster supports kraft.version 1, otherwise return the UNSUPPORTED_VERSION error. * 3. Check that there are no uncommitted voter changes, otherwise return the REQUEST_TIMED_OUT error. * 4. Check that the updated voter still supports the currently finalized kraft.version, otherwise - * return the INVALID_REQUEST error. + * return the INVALID_REQUEST error. * 5. Check that the updated voter is still listening on the default listener. * 6. Update voter set with new voter configuration. - * a. If reconfiguration is supported, append the updated VotersRecord to the log. The KRaft internal listener will read this - * uncommitted record from the log and update the voter in the set of voters. - * b. If reconfiguration is not supported, update the in-memory information for the voter. This will get - * appended to the log when the cluster is upgraded to a kraft version that supports reconfiguration. + * a. If reconfiguration is supported, append the updated VotersRecord to the log. The KRaft internal listener will read this + * uncommitted record from the log and update the voter in the set of voters. + * b. If reconfiguration is not supported, update the in-memory information for the voter. This will get + * appended to the log when the cluster is upgraded to a kraft version that supports reconfiguration. * 7. Send the UpdateVoter successful response to the voter. */ public final class UpdateVoterHandler { @@ -61,10 +61,10 @@ public final class UpdateVoterHandler { private final Logger log; public UpdateVoterHandler( - OptionalInt localId, - KRaftControlRecordStateMachine partitionState, - ListenerName defaultListenerName, - LogContext logContext + OptionalInt localId, + KRaftControlRecordStateMachine partitionState, + ListenerName defaultListenerName, + LogContext logContext ) { this.localId = localId; this.partitionState = partitionState; @@ -73,25 +73,25 @@ public UpdateVoterHandler( } public CompletableFuture handleUpdateVoterRequest( - LeaderState leaderState, - ListenerName requestListenerName, - ReplicaKey voterKey, - Endpoints voterEndpoints, - UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions, - long currentTimeMs + LeaderState leaderState, + ListenerName requestListenerName, + ReplicaKey voterKey, + Endpoints voterEndpoints, + UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions, + long currentTimeMs ) { // Check if there are any pending voter change requests if (leaderState.isOperationPending(currentTimeMs)) { return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( - Errors.REQUEST_TIMED_OUT, - requestListenerName, - new LeaderAndEpoch( - localId, - leaderState.epoch() - ), - leaderState.leaderEndpoints() - ) + RaftUtil.updateVoterResponse( + Errors.REQUEST_TIMED_OUT, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) ); } @@ -99,15 +99,15 @@ public CompletableFuture handleUpdateVoterRequest( Optional highWatermark = leaderState.highWatermark().map(LogOffsetMetadata::offset); if (highWatermark.isEmpty()) { return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( - Errors.REQUEST_TIMED_OUT, - requestListenerName, - new LeaderAndEpoch( - localId, - leaderState.epoch() - ), - leaderState.leaderEndpoints() - ) + RaftUtil.updateVoterResponse( + Errors.REQUEST_TIMED_OUT, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) ); } @@ -132,15 +132,15 @@ public CompletableFuture handleUpdateVoterRequest( * updated kraft version has been written to the log */ return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( - Errors.REQUEST_TIMED_OUT, - requestListenerName, - new LeaderAndEpoch( - localId, - leaderState.epoch() - ), - leaderState.leaderEndpoints() - ) + RaftUtil.updateVoterResponse( + Errors.REQUEST_TIMED_OUT, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) ); } voters = inMemoryVoters.map(KRaftVersionUpgrade.Voters::voters); @@ -148,109 +148,106 @@ public CompletableFuture handleUpdateVoterRequest( if (voters.isEmpty()) { log.info("Unable to read the current voter set with kraft version {}", kraftVersion); return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( - Errors.REQUEST_TIMED_OUT, - requestListenerName, - new LeaderAndEpoch( - localId, - leaderState.epoch() - ), - leaderState.leaderEndpoints() - ) + RaftUtil.updateVoterResponse( + Errors.REQUEST_TIMED_OUT, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) ); } // Check that the supported version range is valid if (!validVersionRange(kraftVersion, supportedKraftVersions)) { return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( - Errors.INVALID_REQUEST, - requestListenerName, - new LeaderAndEpoch( - localId, - leaderState.epoch() - ), - leaderState.leaderEndpoints() - ) + RaftUtil.updateVoterResponse( + Errors.INVALID_REQUEST, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) ); } // Check that endpoints includes the default listener if (voterEndpoints.address(defaultListenerName).isEmpty()) { return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( - Errors.INVALID_REQUEST, - requestListenerName, - new LeaderAndEpoch( - localId, - leaderState.epoch() - ), - leaderState.leaderEndpoints() - ) + RaftUtil.updateVoterResponse( + Errors.INVALID_REQUEST, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) ); } // Update the voter Optional updatedVoters = updateVoters( - voters.get(), - kraftVersion, - VoterSet.VoterNode.of( - voterKey, - voterEndpoints, - new SupportedVersionRange( - supportedKraftVersions.minSupportedVersion(), - supportedKraftVersions.maxSupportedVersion() + voters.get(), + kraftVersion, + VoterSet.VoterNode.of( + voterKey, + voterEndpoints, + new SupportedVersionRange( + supportedKraftVersions.minSupportedVersion(), + supportedKraftVersions.maxSupportedVersion() + ) ) - ) ); - if (updatedVoters.isEmpty()) { - return CompletableFuture.completedFuture( + return updatedVoters.map(voterSet -> storeUpdatedVoters( + leaderState, + voterKey, + inMemoryVoters, + voterSet, + requestListenerName, + currentTimeMs + )).orElseGet(() -> CompletableFuture.completedFuture( RaftUtil.updateVoterResponse( - Errors.VOTER_NOT_FOUND, - requestListenerName, - new LeaderAndEpoch( - localId, - leaderState.epoch() - ), - leaderState.leaderEndpoints() + Errors.VOTER_NOT_FOUND, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() ) - ); - } + )); - return storeUpdatedVoters( - leaderState, - voterKey, - inMemoryVoters, - updatedVoters.get(), - requestListenerName, - currentTimeMs - ); } private boolean validVersionRange( - KRaftVersion finalizedVersion, - UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions + KRaftVersion finalizedVersion, + UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions ) { return supportedKraftVersions.minSupportedVersion() <= finalizedVersion.featureLevel() && - supportedKraftVersions.maxSupportedVersion() >= finalizedVersion.featureLevel(); + supportedKraftVersions.maxSupportedVersion() >= finalizedVersion.featureLevel(); } private Optional updateVoters( - VoterSet voters, - KRaftVersion kraftVersion, - VoterSet.VoterNode updatedVoter + VoterSet voters, + KRaftVersion kraftVersion, + VoterSet.VoterNode updatedVoter ) { return kraftVersion.isReconfigSupported() ? - voters.updateVoter(updatedVoter) : - voters.updateVoterIgnoringDirectoryId(updatedVoter); + voters.updateVoter(updatedVoter) : + voters.updateVoterIgnoringDirectoryId(updatedVoter); } private CompletableFuture storeUpdatedVoters( - LeaderState leaderState, - ReplicaKey voterKey, - Optional inMemoryVoters, - VoterSet newVoters, - ListenerName requestListenerName, - long currentTimeMs + LeaderState leaderState, + ReplicaKey voterKey, + Optional inMemoryVoters, + VoterSet newVoters, + ListenerName requestListenerName, + long currentTimeMs ) { if (inMemoryVoters.isEmpty()) { // Since the partition support reconfig then just write the update voter set directly to the log @@ -258,31 +255,31 @@ private CompletableFuture storeUpdatedVoters( } else { // Store the new voters set in the leader state since it cannot be written to the log var successful = leaderState.compareAndSetVolatileVoters( - inMemoryVoters.get(), - new KRaftVersionUpgrade.Voters(newVoters) + inMemoryVoters.get(), + new KRaftVersionUpgrade.Voters(newVoters) ); if (successful) { log.info( - "Updated in-memory voters from {} to {}", - inMemoryVoters.get().voters(), - newVoters + "Updated in-memory voters from {} to {}", + inMemoryVoters.get().voters(), + newVoters ); } else { log.info( - "Unable to update in-memory voters from {} to {}", - inMemoryVoters.get().voters(), - newVoters + "Unable to update in-memory voters from {} to {}", + inMemoryVoters.get().voters(), + newVoters ); return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( - Errors.REQUEST_TIMED_OUT, - requestListenerName, - new LeaderAndEpoch( - localId, - leaderState.epoch() - ), - leaderState.leaderEndpoints() - ) + RaftUtil.updateVoterResponse( + Errors.REQUEST_TIMED_OUT, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) ); } } @@ -291,15 +288,15 @@ private CompletableFuture storeUpdatedVoters( leaderState.updateCheckQuorumForFollowingVoter(voterKey, currentTimeMs); return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( - Errors.NONE, - requestListenerName, - new LeaderAndEpoch( - localId, - leaderState.epoch() - ), - leaderState.leaderEndpoints() - ) + RaftUtil.updateVoterResponse( + Errors.NONE, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) ); } }