Skip to content

Commit 47b9dd2

Browse files
authored
refactor:  remove some deps on lru (#23489)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent adb504d commit 47b9dd2

File tree

9 files changed

+17
-52
lines changed

9 files changed

+17
-52
lines changed

Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "d
175175
] }
176176
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "d1b8cabf4f8a33090d018b539bfec6e7c623d7c4" }
177177
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "d1b8cabf4f8a33090d018b539bfec6e7c623d7c4" }
178-
178+
indexmap = { version = "2.12.0", features = ["serde"] }
179179
itertools = "0.14.0"
180180
jni = { version = "0.21.1", features = ["invocation"] }
181181
jsonbb = "0.1.4"

src/common/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ jiff = "0.1.15"
5656
jni = { workspace = true }
5757
jsonbb = { workspace = true }
5858
linkme = { workspace = true }
59-
lru = { workspace = true }
6059
memcomparable = { version = "0.2", features = ["decimal"] }
6160
num-integer = "0.1"
6261
num-traits = "0.2"
@@ -144,6 +143,7 @@ mach2 = "0.4"
144143
coarsetime = "0.1"
145144
criterion = { workspace = true }
146145
expect-test = "1"
146+
lru = { workspace = true }
147147
more-asserts = "0.3"
148148
pretty_assertions = "1"
149149
rand = { workspace = true }

src/connector/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ google-cloud-pubsub = { package = "gcloud-pubsub", version = "1" }
7070
iceberg = { workspace = true }
7171
iceberg-catalog-glue = { workspace = true }
7272
iceberg-catalog-rest = { workspace = true }
73-
indexmap = { version = "2.12.0", features = ["serde"] }
73+
indexmap = { workspace = true }
7474
itertools = { workspace = true }
7575
jni = { workspace = true }
7676
maplit = "1.0.2"

src/meta/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ futures-async-stream = { workspace = true }
3333
hex = "0.4"
3434
http = "1"
3535
iceberg = { workspace = true }
36-
indexmap = { version = "2.12.0", features = ["serde"] }
36+
indexmap = { workspace = true }
3737
itertools = { workspace = true }
3838
jsonbb = { workspace = true }
3939
maplit = "1.0.2"

src/stream/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ hytra = "0.1.2"
3434
iceberg = { workspace = true }
3535
itertools = { workspace = true }
3636
jsonbb = { workspace = true }
37-
lru = { workspace = true }
3837
maplit = "1.0.2"
3938
memcomparable = "0.2"
4039
multimap = "0.10"

src/stream/src/executor/temporal_join.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use either::Either;
1919
use futures::TryStreamExt;
2020
use futures::stream::{self, PollNext};
2121
use itertools::Itertools;
22-
use lru::DefaultHasher;
2322
use risingwave_common::array::Op;
2423
use risingwave_common::bitmap::BitmapBuilder;
2524
use risingwave_common::hash::{HashKey, NullBitmap};
@@ -107,7 +106,7 @@ struct TemporalSide<K: HashKey, S: StateStore> {
107106
source: BatchTable<S>,
108107
table_stream_key_indices: Vec<usize>,
109108
table_output_indices: Vec<usize>,
110-
cache: ManagedLruCache<K, JoinEntry, DefaultHasher>,
109+
cache: ManagedLruCache<K, JoinEntry>,
111110
join_key_data_types: Vec<DataType>,
112111
}
113112

@@ -618,11 +617,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: b
618617
"temporal join",
619618
);
620619

621-
let cache = ManagedLruCache::unbounded_with_hasher(
622-
watermark_sequence,
623-
metrics_info,
624-
DefaultHasher::default(),
625-
);
620+
let cache = ManagedLruCache::unbounded(watermark_sequence, metrics_info);
626621

627622
let metrics = metrics.new_temporal_join_metrics(table.table_id(), ctx.id, ctx.fragment_id);
628623

src/tests/simulation/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ expect-test = "1"
1616
fail = { version = "0.5" }
1717
futures = { version = "0.3", default-features = false, features = ["alloc"] }
1818
glob = "0.3"
19+
indexmap = { workspace = true }
1920
itertools = { workspace = true }
20-
lru = { workspace = true }
2121
madsim = { workspace = true }
2222
maplit = "1"
2323
paste = "1"

src/tests/simulation/src/client.rs

Lines changed: 9 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515
use std::time::Duration;
1616

17+
use indexmap::IndexMap;
1718
use itertools::Itertools;
18-
use lru::{Iter, LruCache};
1919
use risingwave_sqlparser::ast::Statement;
2020
use risingwave_sqlparser::parser::Parser;
2121
use sqllogictest::{DBOutput, DefaultColumnType};
@@ -33,33 +33,10 @@ pub struct RisingWave {
3333

3434
/// `SetStmts` stores and compacts all `SET` statements that have been executed in the client
3535
/// history.
36+
#[derive(Default)]
3637
pub struct SetStmts {
37-
stmts_cache: LruCache<String, String>,
38-
}
39-
40-
impl Default for SetStmts {
41-
fn default() -> Self {
42-
Self {
43-
stmts_cache: LruCache::unbounded(),
44-
}
45-
}
46-
}
47-
48-
struct SetStmtsIterator<'a, 'b>
49-
where
50-
'a: 'b,
51-
{
52-
_stmts: &'a SetStmts,
53-
stmts_iter: core::iter::Rev<Iter<'b, String, String>>,
54-
}
55-
56-
impl<'a> SetStmtsIterator<'a, '_> {
57-
fn new(stmts: &'a SetStmts) -> Self {
58-
Self {
59-
_stmts: stmts,
60-
stmts_iter: stmts.stmts_cache.iter().rev(),
61-
}
62-
}
38+
// variable name -> last set statement
39+
stmts: IndexMap<String, String>,
6340
}
6441

6542
impl SetStmts {
@@ -78,19 +55,14 @@ impl SetStmts {
7855
} => {
7956
let key = variable.real_value().to_lowercase();
8057
// store complete sql as value.
81-
self.stmts_cache.put(key, sql.to_owned());
58+
self.stmts.insert(key, sql.to_owned());
8259
}
8360
_ => unreachable!(),
8461
}
8562
}
86-
}
87-
88-
impl Iterator for SetStmtsIterator<'_, '_> {
89-
type Item = String;
9063

91-
fn next(&mut self) -> Option<Self::Item> {
92-
let (_, stmt) = self.stmts_iter.next()?;
93-
Some(stmt.clone())
64+
fn replay_iter(&self) -> impl Iterator<Item = &str> + '_ {
65+
self.stmts.values().map(|s| s.as_str())
9466
}
9567
}
9668

@@ -130,8 +102,8 @@ impl RisingWave {
130102
}
131103
});
132104
// replay all SET statements
133-
for stmt in SetStmtsIterator::new(set_stmts) {
134-
client.simple_query(&stmt).await?;
105+
for stmt in set_stmts.replay_iter() {
106+
client.simple_query(stmt).await?;
135107
}
136108
Ok((client, task))
137109
}

0 commit comments

Comments
 (0)