Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "d
] }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "d1b8cabf4f8a33090d018b539bfec6e7c623d7c4" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "d1b8cabf4f8a33090d018b539bfec6e7c623d7c4" }

indexmap = { version = "2.12.0", features = ["serde"] }
itertools = "0.14.0"
jni = { version = "0.21.1", features = ["invocation"] }
jsonbb = "0.1.4"
Expand Down
2 changes: 1 addition & 1 deletion src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ jiff = "0.1.15"
jni = { workspace = true }
jsonbb = { workspace = true }
linkme = { workspace = true }
lru = { workspace = true }
memcomparable = { version = "0.2", features = ["decimal"] }
num-integer = "0.1"
num-traits = "0.2"
Expand Down Expand Up @@ -144,6 +143,7 @@ mach2 = "0.4"
coarsetime = "0.1"
criterion = { workspace = true }
expect-test = "1"
lru = { workspace = true }
more-asserts = "0.3"
pretty_assertions = "1"
rand = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ google-cloud-pubsub = { package = "gcloud-pubsub", version = "1" }
iceberg = { workspace = true }
iceberg-catalog-glue = { workspace = true }
iceberg-catalog-rest = { workspace = true }
indexmap = { version = "2.12.0", features = ["serde"] }
indexmap = { workspace = true }
itertools = { workspace = true }
jni = { workspace = true }
maplit = "1.0.2"
Expand Down
2 changes: 1 addition & 1 deletion src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ futures-async-stream = { workspace = true }
hex = "0.4"
http = "1"
iceberg = { workspace = true }
indexmap = { version = "2.12.0", features = ["serde"] }
indexmap = { workspace = true }
itertools = { workspace = true }
jsonbb = { workspace = true }
maplit = "1.0.2"
Expand Down
1 change: 0 additions & 1 deletion src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ hytra = "0.1.2"
iceberg = { workspace = true }
itertools = { workspace = true }
jsonbb = { workspace = true }
lru = { workspace = true }
maplit = "1.0.2"
memcomparable = "0.2"
multimap = "0.10"
Expand Down
9 changes: 2 additions & 7 deletions src/stream/src/executor/temporal_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use either::Either;
use futures::TryStreamExt;
use futures::stream::{self, PollNext};
use itertools::Itertools;
use lru::DefaultHasher;
use risingwave_common::array::Op;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::hash::{HashKey, NullBitmap};
Expand Down Expand Up @@ -107,7 +106,7 @@ struct TemporalSide<K: HashKey, S: StateStore> {
source: BatchTable<S>,
table_stream_key_indices: Vec<usize>,
table_output_indices: Vec<usize>,
cache: ManagedLruCache<K, JoinEntry, DefaultHasher>,
cache: ManagedLruCache<K, JoinEntry>,
join_key_data_types: Vec<DataType>,
}

Expand Down Expand Up @@ -618,11 +617,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: b
"temporal join",
);

let cache = ManagedLruCache::unbounded_with_hasher(
watermark_sequence,
metrics_info,
DefaultHasher::default(),
);
let cache = ManagedLruCache::unbounded(watermark_sequence, metrics_info);

let metrics = metrics.new_temporal_join_metrics(table.table_id(), ctx.id, ctx.fragment_id);

Expand Down
2 changes: 1 addition & 1 deletion src/tests/simulation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ expect-test = "1"
fail = { version = "0.5" }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
glob = "0.3"
indexmap = { workspace = true }
itertools = { workspace = true }
lru = { workspace = true }
madsim = { workspace = true }
maplit = "1"
paste = "1"
Expand Down
46 changes: 9 additions & 37 deletions src/tests/simulation/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

use std::time::Duration;

use indexmap::IndexMap;
use itertools::Itertools;
use lru::{Iter, LruCache};
use risingwave_sqlparser::ast::Statement;
use risingwave_sqlparser::parser::Parser;
use sqllogictest::{DBOutput, DefaultColumnType};
Expand All @@ -33,33 +33,10 @@ pub struct RisingWave {

/// `SetStmts` stores and compacts all `SET` statements that have been executed in the client
/// history.
#[derive(Default)]
pub struct SetStmts {
stmts_cache: LruCache<String, String>,
}

impl Default for SetStmts {
fn default() -> Self {
Self {
stmts_cache: LruCache::unbounded(),
}
}
}

struct SetStmtsIterator<'a, 'b>
where
'a: 'b,
{
_stmts: &'a SetStmts,
stmts_iter: core::iter::Rev<Iter<'b, String, String>>,
}

impl<'a> SetStmtsIterator<'a, '_> {
fn new(stmts: &'a SetStmts) -> Self {
Self {
_stmts: stmts,
stmts_iter: stmts.stmts_cache.iter().rev(),
}
}
// variable name -> last set statement
stmts: IndexMap<String, String>,
}

impl SetStmts {
Expand All @@ -78,19 +55,14 @@ impl SetStmts {
} => {
let key = variable.real_value().to_lowercase();
// store complete sql as value.
self.stmts_cache.put(key, sql.to_owned());
self.stmts.insert(key, sql.to_owned());
}
_ => unreachable!(),
}
}
}

impl Iterator for SetStmtsIterator<'_, '_> {
type Item = String;

fn next(&mut self) -> Option<Self::Item> {
let (_, stmt) = self.stmts_iter.next()?;
Some(stmt.clone())
fn replay_iter(&self) -> impl Iterator<Item = &str> + '_ {
self.stmts.values().map(|s| s.as_str())
}
}

Expand Down Expand Up @@ -130,8 +102,8 @@ impl RisingWave {
}
});
// replay all SET statements
for stmt in SetStmtsIterator::new(set_stmts) {
client.simple_query(&stmt).await?;
for stmt in set_stmts.replay_iter() {
client.simple_query(stmt).await?;
}
Ok((client, task))
}
Expand Down
Loading