Skip to content

Commit 5cce42a

Browse files
authored
feat(meta): add KvGetMany gRPC API; refactor: split chained awaits (#19244)
* feat(meta): add `KvGetMany` gRPC API for batch key lookups Add bidirectional streaming gRPC API for looking up multiple keys. The client streams `KvGetManyRequest` messages (one key per message), and the server responds with a stream of `StreamItem` results. Changes: - Add `KvGetManyRequest` proto message and `KvGetMany` RPC - Implement `MetaLeader::kv_get_many()` for streaming key lookups - Add leader check in `MetaNode::handle_kv_get_many()` - Add `MetaHandle::handle_kv_get_many()` cross-runtime bridge - Implement gRPC service handler with token check and metrics - Add `EstablishedClient::kv_get_many()` client method - Add integration tests for leader, follower, and no-quorum scenarios - Fix: #349 * refactor: split chained await statements for readability Split chained `.await?.try_collect().await?` and `.await?.map_ok(...).try_collect().await?` patterns into separate statements across the codebase for improved readability and debugging. Changes: - Split patterns in `schema_api.rs`, `sequence_api_impl.rs`, `database_util.rs` - Split patterns in `kv_pb_api/mod.rs`, `tag_api.rs` - Split patterns in `compact_all.rs`, `leveled_map_test.rs` - Split patterns in `db_scoped_seq_bounded_read_test.rs`, `sm_v003_test.rs` - Split patterns in `compact_with_db_test.rs`, `store.rs` - Split patterns in `udf_mgr.rs`, `stage.rs` - Split patterns in `kvapi_test_suite.rs` - Split patterns in query service tests: `io.rs`, `commit.rs`, `truncate.rs`, `navigate.rs`, `index_scan.rs`, `index_refresh.rs`
1 parent 5451e1a commit 5cce42a

File tree

30 files changed

+453
-275
lines changed

30 files changed

+453
-275
lines changed

src/common/storage/src/stage.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,9 @@ impl StageFilesInfo {
228228
pattern: Option<Regex>,
229229
max_files: usize,
230230
) -> Result<Vec<StageFileInfo>> {
231-
Self::list_files_stream_with_pattern(operator, path, pattern, Some(max_files))
232-
.await?
233-
.try_collect::<Vec<_>>()
234-
.await
231+
let strm =
232+
Self::list_files_stream_with_pattern(operator, path, pattern, Some(max_files)).await?;
233+
strm.try_collect::<Vec<_>>().await
235234
}
236235

237236
#[async_backtrace::framed]

src/meta/api/src/database_util.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,8 @@ pub(crate) async fn drop_database_meta(
154154
ObjectTagIdRef::new(taggable_object.clone(), 0),
155155
);
156156
let obj_tag_dir = DirName::new(obj_tag_prefix);
157-
let tag_entries: Vec<_> = kv_api
158-
.list_pb(ListOptions::unlimited(&obj_tag_dir))
159-
.await?
160-
.try_collect()
161-
.await?;
157+
let strm = kv_api.list_pb(ListOptions::unlimited(&obj_tag_dir)).await?;
158+
let tag_entries: Vec<_> = strm.try_collect().await?;
162159
for entry in tag_entries {
163160
let tag_id = entry.key.name().tag_id;
164161
// Delete object -> tag reference

src/meta/api/src/kv_pb_api/mod.rs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -863,28 +863,19 @@ mod tests {
863863
let dir = DirName::new(CatalogIdIdent::new(&tenant, 0));
864864

865865
// List all with no limit
866-
let res: Vec<_> = foo
867-
.list_pb_values(ListOptions::unlimited(&dir))
868-
.await?
869-
.try_collect()
870-
.await?;
866+
let strm = foo.list_pb_values(ListOptions::unlimited(&dir)).await?;
867+
let res: Vec<_> = strm.try_collect().await?;
871868
assert_eq!(res.len(), 5);
872869
assert_eq!(res[0].catalog_option, catalog_meta.catalog_option);
873870

874871
// List with limit 3
875-
let res: Vec<_> = foo
876-
.list_pb_values(ListOptions::limited(&dir, 3))
877-
.await?
878-
.try_collect()
879-
.await?;
872+
let strm = foo.list_pb_values(ListOptions::limited(&dir, 3)).await?;
873+
let res: Vec<_> = strm.try_collect().await?;
880874
assert_eq!(res.len(), 3);
881875

882876
// List with limit 0
883-
let res: Vec<_> = foo
884-
.list_pb_values(ListOptions::limited(&dir, 0))
885-
.await?
886-
.try_collect()
887-
.await?;
877+
let strm = foo.list_pb_values(ListOptions::limited(&dir, 0)).await?;
878+
let res: Vec<_> = strm.try_collect().await?;
888879
assert!(res.is_empty());
889880

890881
Ok(())

src/meta/api/src/schema_api.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -348,11 +348,8 @@ pub async fn construct_drop_table_txn_operations(
348348
ObjectTagIdRef::new(taggable_object.clone(), 0),
349349
);
350350
let obj_tag_dir = DirName::new(obj_tag_prefix);
351-
let tag_entries: Vec<_> = kv_api
352-
.list_pb(ListOptions::unlimited(&obj_tag_dir))
353-
.await?
354-
.try_collect()
355-
.await?;
351+
let strm = kv_api.list_pb(ListOptions::unlimited(&obj_tag_dir)).await?;
352+
let tag_entries: Vec<_> = strm.try_collect().await?;
356353
for entry in tag_entries {
357354
let tag_id = entry.key.name().tag_id;
358355
// Delete object -> tag reference

src/meta/api/src/sequence_api_impl.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SequenceApi for KV {
134134
) -> Result<Vec<(String, SequenceMeta)>, MetaError> {
135135
let dir_name = DirName::new(SequenceIdent::new(tenant, "dummy"));
136136

137-
let ident_metas = self
138-
.list_pb(ListOptions::unlimited(&dir_name))
139-
.await?
137+
let strm = self.list_pb(ListOptions::unlimited(&dir_name)).await?;
138+
let ident_metas = strm
140139
.map_ok(|itm| (itm.key, itm.seqv.data))
141140
.try_collect::<Vec<_>>()
142141
.await?;
@@ -145,11 +144,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SequenceApi for KV {
145144
.iter()
146145
.map(|(ident, _)| SequenceStorageIdent::new_from(ident.clone()));
147146

148-
let storage_values = self
149-
.get_pb_values(storage_idents)
150-
.await?
151-
.try_collect::<Vec<_>>()
152-
.await?;
147+
let strm = self.get_pb_values(storage_idents).await?;
148+
let storage_values = strm.try_collect::<Vec<_>>().await?;
153149

154150
let key_metas = ident_metas
155151
.into_iter()

src/meta/api/src/tag_api.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,8 @@ where
188188
}
189189

190190
// Transaction failed. Check if it's due to references or concurrent modification.
191-
let refs: Vec<String> = self
192-
.list_pb(ListOptions::unlimited(&refs_dir))
193-
.await?
191+
let strm = self.list_pb(ListOptions::unlimited(&refs_dir)).await?;
192+
let refs: Vec<String> = strm
194193
.map_ok(|entry| entry.key.name().object.to_string())
195194
.try_collect()
196195
.await?;
@@ -482,9 +481,8 @@ where
482481
TagIdObjectRefIdent::new_generic(tenant, TagIdObjectRef::prefix(tag_id)),
483482
2,
484483
);
485-
let refs = self
486-
.list_pb(ListOptions::unlimited(&refs_dir))
487-
.await?
484+
let strm = self.list_pb(ListOptions::unlimited(&refs_dir)).await?;
485+
let refs = strm
488486
.map_ok(|entry| {
489487
let object = entry.key.name().object.clone();
490488
let value_key = ObjectTagIdRefIdent::new_generic(

src/meta/client/src/established_client.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use databend_common_meta_types::protobuf::Empty;
3232
use databend_common_meta_types::protobuf::ExportedChunk;
3333
use databend_common_meta_types::protobuf::KeysCount;
3434
use databend_common_meta_types::protobuf::KeysLayoutRequest;
35+
use databend_common_meta_types::protobuf::KvGetManyRequest;
3536
use databend_common_meta_types::protobuf::KvListRequest;
3637
use databend_common_meta_types::protobuf::MemberListReply;
3738
use databend_common_meta_types::protobuf::MemberListRequest;
@@ -280,6 +281,14 @@ impl EstablishedClient {
280281
self.client.kv_list(request).await.update_client(self)
281282
}
282283

284+
#[async_backtrace::framed]
285+
pub async fn kv_get_many(
286+
&mut self,
287+
request: impl tonic::IntoStreamingRequest<Message = KvGetManyRequest>,
288+
) -> Result<Response<Streaming<StreamItem>>, Status> {
289+
self.client.kv_get_many(request).await.update_client(self)
290+
}
291+
283292
#[async_backtrace::framed]
284293
pub async fn export(
285294
&mut self,

src/meta/client/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,9 @@ use semver::Version;
170170
/// - 2026-01-12: since TODO
171171
/// 🖥 server: add `kv_list` gRPC API: in protobuf, with `limit`, return stream.
172172
///
173+
/// - 2026-01-13: since TODO
174+
/// 🖥 server: add `kv_get_many` gRPC API: in protobuf, receive stream, return stream.
175+
///
173176
/// Server feature set:
174177
/// ```yaml
175178
/// server_features:

src/meta/client/tests/it/grpc_server.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use databend_common_meta_types::protobuf::ExportedChunk;
2828
use databend_common_meta_types::protobuf::HandshakeResponse;
2929
use databend_common_meta_types::protobuf::KeysCount;
3030
use databend_common_meta_types::protobuf::KeysLayoutRequest;
31+
use databend_common_meta_types::protobuf::KvGetManyRequest;
3132
use databend_common_meta_types::protobuf::KvListRequest;
3233
use databend_common_meta_types::protobuf::MemberListReply;
3334
use databend_common_meta_types::protobuf::MemberListRequest;
@@ -97,6 +98,15 @@ impl MetaService for GrpcServiceForTestImpl {
9798
unimplemented!()
9899
}
99100

101+
type KvGetManyStream = BoxStream<StreamItem>;
102+
103+
async fn kv_get_many(
104+
&self,
105+
_request: Request<Streaming<KvGetManyRequest>>,
106+
) -> Result<Response<Self::KvGetManyStream>, Status> {
107+
unimplemented!()
108+
}
109+
100110
type ExportStream =
101111
Pin<Box<dyn Stream<Item = Result<ExportedChunk, tonic::Status>> + Send + 'static>>;
102112

src/meta/kvapi-test-suite/src/kvapi_test_suite.rs

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -616,12 +616,8 @@ impl TestSuite {
616616
}
617617

618618
// List all with no limit
619-
let res: Vec<_> = kv
620-
.list_kv(ListOptions::unlimited("__limit_test/"))
621-
.await?
622-
.map_ok(|item| item.key)
623-
.try_collect()
624-
.await?;
619+
let strm = kv.list_kv(ListOptions::unlimited("__limit_test/")).await?;
620+
let res: Vec<_> = strm.map_ok(|item| item.key).try_collect().await?;
625621
assert_eq!(res, vec![
626622
"__limit_test/0",
627623
"__limit_test/1",
@@ -631,34 +627,24 @@ impl TestSuite {
631627
]);
632628

633629
// List with limit 3
634-
let res: Vec<_> = kv
635-
.list_kv(ListOptions::limited("__limit_test/", 3))
636-
.await?
637-
.map_ok(|item| item.key)
638-
.try_collect()
639-
.await?;
630+
let strm = kv.list_kv(ListOptions::limited("__limit_test/", 3)).await?;
631+
let res: Vec<_> = strm.map_ok(|item| item.key).try_collect().await?;
640632
assert_eq!(res, vec![
641633
"__limit_test/0",
642634
"__limit_test/1",
643635
"__limit_test/2",
644636
]);
645637

646638
// List with limit 0
647-
let res: Vec<_> = kv
648-
.list_kv(ListOptions::limited("__limit_test/", 0))
649-
.await?
650-
.map_ok(|item| item.key)
651-
.try_collect()
652-
.await?;
639+
let strm = kv.list_kv(ListOptions::limited("__limit_test/", 0)).await?;
640+
let res: Vec<_> = strm.map_ok(|item| item.key).try_collect().await?;
653641
assert_eq!(res, Vec::<String>::new());
654642

655643
// List with limit larger than result set
656-
let res: Vec<_> = kv
644+
let strm = kv
657645
.list_kv(ListOptions::limited("__limit_test/", 100))
658-
.await?
659-
.map_ok(|item| item.key)
660-
.try_collect()
661646
.await?;
647+
let res: Vec<_> = strm.map_ok(|item| item.key).try_collect().await?;
662648
assert_eq!(res, vec![
663649
"__limit_test/0",
664650
"__limit_test/1",

0 commit comments

Comments
 (0)