Skip to content

Commit 0a66881

Browse files
committed
[kv] Support kv snapshot lease
1 parent 920809f commit 0a66881

File tree

71 files changed

+4162
-113
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+4162
-113
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.client.admin;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.client.metadata.AcquireKvSnapshotLeaseResult;
2122
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2223
import org.apache.fluss.client.metadata.KvSnapshots;
2324
import org.apache.fluss.client.metadata.LakeSnapshot;
@@ -68,6 +69,8 @@
6869

6970
import java.util.Collection;
7071
import java.util.List;
72+
import java.util.Map;
73+
import java.util.Set;
7174
import java.util.concurrent.CompletableFuture;
7275

7376
/**
@@ -408,6 +411,56 @@ CompletableFuture<Void> dropPartition(
408411
CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
409412
TableBucket bucket, long snapshotId);
410413

414+
/**
415+
* Acquires a lease for specific KV snapshots of the given tableBuckets asynchronously.
416+
*
417+
* <p>Once acquired, the specified KV snapshots will be protected from garbage collection for
418+
* the duration of the {@code leaseDuration}. The client must call {@link
419+
* #releaseKvSnapshotLease} to release the lock early when reading is finished.
420+
*
421+
* <p>If the lease expires (no renew received within duration), the server is free to delete the
422+
* snapshot files.
423+
*
424+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future:
425+
*
426+
* <ul>
427+
* <li>{@link TableNotExistException} if the table does not exist.
428+
* <li>{@link PartitionNotExistException} if the partition does not exist.
429+
* </ul>
430+
*
431+
* @param leaseId The unique ID for this lease session (usually a UUID generated by client).
432+
* @param snapshotIds The snapshots to lease, a map from TableBucket to kvSnapshotId.
433+
* @param leaseDuration The duration (in milliseconds) for which the snapshots should be kept.
434+
* @return The result of the acquire operation, containing any buckets that failed to be locked.
435+
*/
436+
CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
437+
String leaseId, Map<TableBucket, Long> snapshotIds, long leaseDuration);
438+
439+
/**
440+
* Releases the lease for specific tableBuckets asynchronously.
441+
*
442+
* <p>This is typically called when a client finishes reading a specific bucket (or a batch of
443+
* buckets) but is still reading others under the same leaseId.
444+
*
445+
* <p>If {@code bucketsToRelease} contains all buckets under this leaseId, the lease itself will
446+
* be removed.
447+
*
448+
* @param leaseId The lease id.
449+
* @param bucketsToRelease The specific tableBuckets to release.
450+
*/
451+
CompletableFuture<Void> releaseKvSnapshotLease(
452+
String leaseId, Set<TableBucket> bucketsToRelease);
453+
454+
/**
455+
* Drops the entire lease asynchronously.
456+
*
457+
* <p>All snapshots locked under this {@code leaseId} will be released immediately. This is
458+
* equivalent to calling {@link #releaseKvSnapshotLease} with all held buckets.
459+
*
460+
* @param leaseId The lease id to drop.
461+
*/
462+
CompletableFuture<Void> dropKvSnapshotLease(String leaseId);
463+
411464
/**
412465
* Get table lake snapshot info of the given table asynchronously.
413466
*

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.client.admin;
1919

20+
import org.apache.fluss.client.metadata.AcquireKvSnapshotLeaseResult;
2021
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2122
import org.apache.fluss.client.metadata.KvSnapshots;
2223
import org.apache.fluss.client.metadata.LakeSnapshot;
@@ -57,6 +58,7 @@
5758
import org.apache.fluss.rpc.messages.DescribeClusterConfigsRequest;
5859
import org.apache.fluss.rpc.messages.DropAclsRequest;
5960
import org.apache.fluss.rpc.messages.DropDatabaseRequest;
61+
import org.apache.fluss.rpc.messages.DropKvSnapshotLeaseRequest;
6062
import org.apache.fluss.rpc.messages.DropTableRequest;
6163
import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest;
6264
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
@@ -91,13 +93,16 @@
9193
import java.util.HashMap;
9294
import java.util.List;
9395
import java.util.Map;
96+
import java.util.Set;
9497
import java.util.concurrent.CompletableFuture;
9598

99+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAcquireKvSnapshotLeaseRequest;
96100
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
97101
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
98102
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
99103
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
100104
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec;
105+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeReleaseKvSnapshotLeaseRequest;
101106
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.toConfigEntries;
102107
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
103108
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
@@ -381,6 +386,33 @@ public CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
381386
.thenApply(ClientRpcMessageUtils::toKvSnapshotMetadata);
382387
}
383388

389+
@Override
390+
public CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
391+
String leaseId, Map<TableBucket, Long> snapshotIds, long leaseDuration) {
392+
if (snapshotIds.isEmpty()) {
393+
throw new IllegalArgumentException(
394+
"The snapshotIds to acquire kv snapshot lease is empty");
395+
}
396+
397+
return gateway.acquireKvSnapshotLease(
398+
makeAcquireKvSnapshotLeaseRequest(leaseId, snapshotIds, leaseDuration))
399+
.thenApply(ClientRpcMessageUtils::toAcquireKvSnapshotLeaseResult);
400+
}
401+
402+
@Override
403+
public CompletableFuture<Void> releaseKvSnapshotLease(
404+
String leaseId, Set<TableBucket> bucketsToRelease) {
405+
return gateway.releaseKvSnapshotLease(
406+
makeReleaseKvSnapshotLeaseRequest(leaseId, bucketsToRelease))
407+
.thenApply(r -> null);
408+
}
409+
410+
@Override
411+
public CompletableFuture<Void> dropKvSnapshotLease(String leaseId) {
412+
DropKvSnapshotLeaseRequest request = new DropKvSnapshotLeaseRequest().setLeaseId(leaseId);
413+
return gateway.dropKvSnapshotLease(request).thenApply(r -> null);
414+
}
415+
384416
@Override
385417
public CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath) {
386418
GetLatestLakeSnapshotRequest request = new GetLatestLakeSnapshotRequest();
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.metadata;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
22+
23+
import java.util.Map;
24+
import java.util.Set;
25+
26+
/**
27+
* A class to represent the result of acquire kv snapshot lease. It contains:
28+
*
29+
* <ul>
30+
* <li>A map of unavailable snapshots. Such as the specify snapshotId is not exist for this table
31+
* bucket.
32+
* </ul>
33+
*
34+
* @since 0.9
35+
*/
36+
@PublicEvolving
37+
public class AcquireKvSnapshotLeaseResult {
38+
private final Map<TableBucket, Long> unavailableSnapshots;
39+
40+
public AcquireKvSnapshotLeaseResult(Map<TableBucket, Long> unavailableSnapshots) {
41+
this.unavailableSnapshots = unavailableSnapshots;
42+
}
43+
44+
/**
45+
* Returns the set of buckets that could not be locked (e.g., snapshot ID doesn't exist or has
46+
* already been GC'ed).
47+
*/
48+
public Map<TableBucket, Long> getUnavailableSnapshots() {
49+
return unavailableSnapshots;
50+
}
51+
52+
public Set<TableBucket> getUnavailableTableBucketSet() {
53+
return unavailableSnapshots.keySet();
54+
}
55+
}

fluss-client/src/main/java/org/apache/fluss/client/metadata/KvSnapshots.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
package org.apache.fluss.client.metadata;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
2122

2223
import javax.annotation.Nullable;
2324

2425
import java.util.Map;
2526
import java.util.OptionalLong;
2627
import java.util.Set;
28+
import java.util.stream.Collectors;
2729

2830
/**
2931
* A class representing the kv snapshots of a table or a partition. It contains multiple snapshots
@@ -71,6 +73,12 @@ public Set<Integer> getBucketIds() {
7173
return snapshotIds.keySet();
7274
}
7375

76+
public Set<TableBucket> getTableBuckets() {
77+
return snapshotIds.keySet().stream()
78+
.map(bucketId -> new TableBucket(tableId, partitionId, bucketId))
79+
.collect(Collectors.toSet());
80+
}
81+
7482
/**
7583
* Get the latest snapshot id for this kv tablet (bucket), or empty if there are no snapshots.
7684
*/

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.client.admin.OffsetSpec;
2121
import org.apache.fluss.client.lookup.LookupBatch;
2222
import org.apache.fluss.client.lookup.PrefixLookupBatch;
23+
import org.apache.fluss.client.metadata.AcquireKvSnapshotLeaseResult;
2324
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2425
import org.apache.fluss.client.metadata.KvSnapshots;
2526
import org.apache.fluss.client.metadata.LakeSnapshot;
@@ -37,6 +38,8 @@
3738
import org.apache.fluss.metadata.TableBucket;
3839
import org.apache.fluss.metadata.TableChange;
3940
import org.apache.fluss.metadata.TablePath;
41+
import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseRequest;
42+
import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseResponse;
4043
import org.apache.fluss.rpc.messages.AlterTableRequest;
4144
import org.apache.fluss.rpc.messages.CreatePartitionRequest;
4245
import org.apache.fluss.rpc.messages.DropPartitionRequest;
@@ -50,10 +53,13 @@
5053
import org.apache.fluss.rpc.messages.MetadataRequest;
5154
import org.apache.fluss.rpc.messages.PbAddColumn;
5255
import org.apache.fluss.rpc.messages.PbAlterConfig;
56+
import org.apache.fluss.rpc.messages.PbBucket;
5357
import org.apache.fluss.rpc.messages.PbDescribeConfig;
5458
import org.apache.fluss.rpc.messages.PbDropColumn;
5559
import org.apache.fluss.rpc.messages.PbKeyValue;
5660
import org.apache.fluss.rpc.messages.PbKvSnapshot;
61+
import org.apache.fluss.rpc.messages.PbKvSnapshotLeaseForBucket;
62+
import org.apache.fluss.rpc.messages.PbKvSnapshotLeaseForTable;
5763
import org.apache.fluss.rpc.messages.PbLakeSnapshotForBucket;
5864
import org.apache.fluss.rpc.messages.PbLookupReqForBucket;
5965
import org.apache.fluss.rpc.messages.PbModifyColumn;
@@ -66,6 +72,7 @@
6672
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
6773
import org.apache.fluss.rpc.messages.ProduceLogRequest;
6874
import org.apache.fluss.rpc.messages.PutKvRequest;
75+
import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest;
6976
import org.apache.fluss.utils.json.DataTypeJsonSerde;
7077
import org.apache.fluss.utils.json.JsonSerdeUtils;
7178

@@ -370,6 +377,75 @@ public static AlterTableRequest makeAlterTableRequest(
370377
return request;
371378
}
372379

380+
public static AcquireKvSnapshotLeaseRequest makeAcquireKvSnapshotLeaseRequest(
381+
String leaseId, Map<TableBucket, Long> snapshotIds, long leaseDuration) {
382+
AcquireKvSnapshotLeaseRequest request = new AcquireKvSnapshotLeaseRequest();
383+
request.setLeaseId(leaseId).setLeaseDuration(leaseDuration);
384+
385+
Map<Long, List<PbKvSnapshotLeaseForBucket>> pbLeaseForTables = new HashMap<>();
386+
for (Map.Entry<TableBucket, Long> entry : snapshotIds.entrySet()) {
387+
TableBucket tableBucket = entry.getKey();
388+
Long snapshotId = entry.getValue();
389+
PbKvSnapshotLeaseForBucket pbLeaseForBucket =
390+
new PbKvSnapshotLeaseForBucket()
391+
.setBucketId(tableBucket.getBucket())
392+
.setSnapshotId(snapshotId);
393+
if (tableBucket.getPartitionId() != null) {
394+
pbLeaseForBucket.setPartitionId(tableBucket.getPartitionId());
395+
}
396+
pbLeaseForTables
397+
.computeIfAbsent(tableBucket.getTableId(), k -> new ArrayList<>())
398+
.add(pbLeaseForBucket);
399+
}
400+
401+
for (Map.Entry<Long, List<PbKvSnapshotLeaseForBucket>> entry :
402+
pbLeaseForTables.entrySet()) {
403+
request.addTableLeaseReq()
404+
.setTableId(entry.getKey())
405+
.addAllBucketsReqs(entry.getValue());
406+
}
407+
return request;
408+
}
409+
410+
public static AcquireKvSnapshotLeaseResult toAcquireKvSnapshotLeaseResult(
411+
AcquireKvSnapshotLeaseResponse response) {
412+
Map<TableBucket, Long> unavailableSnapshots = new HashMap<>();
413+
for (PbKvSnapshotLeaseForTable leaseForTable : response.getTablesLeaseResList()) {
414+
long tableId = leaseForTable.getTableId();
415+
for (PbKvSnapshotLeaseForBucket leaseForBucket : leaseForTable.getBucketsReqsList()) {
416+
TableBucket tableBucket =
417+
new TableBucket(
418+
tableId,
419+
leaseForBucket.hasPartitionId()
420+
? leaseForBucket.getPartitionId()
421+
: null,
422+
leaseForBucket.getBucketId());
423+
unavailableSnapshots.put(tableBucket, leaseForBucket.getSnapshotId());
424+
}
425+
}
426+
return new AcquireKvSnapshotLeaseResult(unavailableSnapshots);
427+
}
428+
429+
public static ReleaseKvSnapshotLeaseRequest makeReleaseKvSnapshotLeaseRequest(
430+
String leaseId, Set<TableBucket> bucketsToRelease) {
431+
ReleaseKvSnapshotLeaseRequest request = new ReleaseKvSnapshotLeaseRequest();
432+
request.setLeaseId(leaseId);
433+
434+
Map<Long, List<PbBucket>> pbLeasedTable = new HashMap<>();
435+
for (TableBucket tb : bucketsToRelease) {
436+
PbBucket pbBucket = new PbBucket().setBucketId(tb.getBucket());
437+
if (tb.getPartitionId() != null) {
438+
pbBucket.setPartitionId(tb.getPartitionId());
439+
}
440+
pbLeasedTable.computeIfAbsent(tb.getTableId(), k -> new ArrayList<>()).add(pbBucket);
441+
}
442+
443+
for (Map.Entry<Long, List<PbBucket>> entry : pbLeasedTable.entrySet()) {
444+
request.addReleaseTable().setTableId(entry.getKey()).addAllBuckets(entry.getValue());
445+
}
446+
return request;
447+
}
448+
373449
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
374450
return response.getPartitionsInfosList().stream()
375451
.map(

fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ public class TestingClientSchemaGetter extends ClientSchemaGetter {
3232
public TestingClientSchemaGetter(
3333
TablePath tablePath,
3434
SchemaInfo latestSchemaInfo,
35-
TestingMetadataUpdater metadataUpdater) {
35+
TestingMetadataUpdater metadataUpdater,
36+
Configuration conf) {
3637
super(
3738
tablePath,
3839
latestSchemaInfo,
3940
new FlussAdmin(
40-
RpcClient.create(
41-
new Configuration(), TestingClientMetricGroup.newInstance(), false),
41+
RpcClient.create(conf, TestingClientMetricGroup.newInstance(), false),
4242
metadataUpdater));
4343
}
4444

0 commit comments

Comments
 (0)