Skip to content

Commit 82aeacc

Browse files
authored
feat(stream): introduce allocator_api in JoinHashMap (#3020)
* feat(stream): introduce allocator_api in JoinHashMap Signed-off-by: TennyZhuang <[email protected]> * fix typo Signed-off-by: TennyZhuang <[email protected]> * cargo sort Signed-off-by: TennyZhuang <[email protected]>
1 parent 73d525b commit 82aeacc

File tree

11 files changed

+268
-14
lines changed

11 files changed

+268
-14
lines changed

Cargo.lock

Lines changed: 19 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ members = [
2929
"src/utils/memcomparable",
3030
"src/utils/pgwire",
3131
"src/utils/static-link",
32+
"src/utils/stats_alloc",
3233
"src/utils/value-encoding",
3334
"src/workspace-hack",
3435
]

src/common/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ hyper = "0.14"
1919
itertools = "0.10"
2020
lazy_static = "1"
2121
log = "0.4"
22-
lru = "0.7"
22+
lru = { git = "https://github.com/singularity-data/lru-rs.git", rev = "e0e9ddaf8e4a51c244a03676734437e68a336b30" }
2323
madsim = "=0.2.0-alpha.3"
2424
memcomparable = { path = "../utils/memcomparable" }
2525
more-asserts = "0.3"

src/common/src/collection/evictable.rs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,38 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::alloc::{Allocator, Global};
1516
use std::cmp::Eq;
1617
use std::hash::{BuildHasher, Hash};
1718
use std::ops::{Deref, DerefMut};
1819

1920
use lru::{DefaultHasher, LruCache};
2021

2122
/// A wrapper for [`LruCache`] which provides manual eviction.
22-
pub struct EvictableHashMap<K, V, S = DefaultHasher> {
23-
inner: LruCache<K, V, S>,
23+
pub struct EvictableHashMap<K, V, S = DefaultHasher, A: Clone + Allocator = Global> {
24+
inner: LruCache<K, V, S, A>,
2425

2526
/// Target capacity to keep when calling `evict_to_target_cap`.
2627
target_cap: usize,
2728
}
2829

30+
impl<K: Hash + Eq, V, A: Clone + Allocator> EvictableHashMap<K, V, DefaultHasher, A> {
31+
/// Create a [`EvictableHashMap`] with the given target capacity and allocator.
32+
pub fn new_in(target_cap: usize, alloc: A) -> Self {
33+
Self::with_hasher_in(target_cap, DefaultHasher::new(), alloc)
34+
}
35+
}
36+
37+
impl<K: Hash + Eq, V, S: BuildHasher, A: Clone + Allocator> EvictableHashMap<K, V, S, A> {
38+
/// Create a [`EvictableHashMap`] with the given target capacity, hasher and allocator.
39+
pub fn with_hasher_in(target_cap: usize, hasher: S, alloc: A) -> Self {
40+
Self {
41+
inner: LruCache::unbounded_with_hasher_in(hasher, alloc),
42+
target_cap,
43+
}
44+
}
45+
}
46+
2947
impl<K: Hash + Eq, V> EvictableHashMap<K, V> {
3048
/// Create a [`EvictableHashMap`] with the given target capacity.
3149
pub fn new(target_cap: usize) -> EvictableHashMap<K, V> {
@@ -41,7 +59,9 @@ impl<K: Hash + Eq, V, S: BuildHasher> EvictableHashMap<K, V, S> {
4159
target_cap,
4260
}
4361
}
62+
}
4463

64+
impl<K: Hash + Eq, V, S: BuildHasher, A: Clone + Allocator> EvictableHashMap<K, V, S, A> {
4565
pub fn target_cap(&self) -> usize {
4666
self.target_cap
4767
}
@@ -79,15 +99,15 @@ impl<K: Hash + Eq, V, S: BuildHasher> EvictableHashMap<K, V, S> {
7999
}
80100
}
81101

82-
impl<K, V, S> Deref for EvictableHashMap<K, V, S> {
83-
type Target = LruCache<K, V, S>;
102+
impl<K, V, S, A: Clone + Allocator> Deref for EvictableHashMap<K, V, S, A> {
103+
type Target = LruCache<K, V, S, A>;
84104

85105
fn deref(&self) -> &Self::Target {
86106
&self.inner
87107
}
88108
}
89109

90-
impl<K, V, S> DerefMut for EvictableHashMap<K, V, S> {
110+
impl<K, V, S, A: Clone + Allocator> DerefMut for EvictableHashMap<K, V, S, A> {
91111
fn deref_mut(&mut self) -> &mut Self::Target {
92112
&mut self.inner
93113
}

src/common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#![feature(type_alias_impl_trait)]
3636
#![feature(test)]
3737
#![feature(trusted_len)]
38+
#![feature(allocator_api)]
3839

3940
#[macro_use]
4041
pub mod error;

src/stream/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ memcomparable = { path = "../utils/memcomparable" }
2828
num-traits = "0.2"
2929
parking_lot = "0.12"
3030
paste = "1"
31+
pin-project = "1"
3132
prometheus = { version = "0.13", features = ["process"] }
3233
prost = "0.10"
3334
rdkafka = { version = "0.28", features = ["cmake-build"] }
@@ -43,6 +44,7 @@ serde-value = "0.7"
4344
serde_json = "1"
4445
smallvec = "1"
4546
static_assertions = "1"
47+
stats_alloc = { path = "../utils/stats_alloc" }
4648
thiserror = "1"
4749
tokio = { version = "=0.2.0-alpha.3", package = "madsim-tokio", features = [
4850
"rt",

src/stream/src/executor/managed_state/join/mod.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
mod join_entry_state;
16+
use std::alloc::Global;
1617
use std::ops::{Deref, DerefMut, Index};
1718
use std::sync::Arc;
1819

@@ -25,6 +26,7 @@ use risingwave_common::error::{ErrorCode, Result as RwResult};
2526
use risingwave_common::hash::{HashKey, PrecomputedBuildHasher};
2627
use risingwave_common::types::{DataType, Datum};
2728
use risingwave_storage::{Keyspace, StateStore};
29+
use stats_alloc::{SharedStatsAlloc, StatsAlloc};
2830

2931
/// This is a row with a match degree
3032
#[derive(Clone, Debug)]
@@ -116,9 +118,16 @@ type PkType = Row;
116118
pub type StateValueType = JoinRow;
117119
pub type HashValueType<S> = JoinEntryState<S>;
118120

121+
type JoinHashMapInner<K, S> =
122+
EvictableHashMap<K, HashValueType<S>, PrecomputedBuildHasher, SharedStatsAlloc<Global>>;
123+
119124
pub struct JoinHashMap<K: HashKey, S: StateStore> {
125+
/// Allocator
126+
alloc: SharedStatsAlloc<Global>,
120127
/// Store the join states.
121-
inner: EvictableHashMap<K, HashValueType<S>, PrecomputedBuildHasher>,
128+
// SAFETY: This is a self-referential data structure and the allocator is owned by the struct
129+
// itself. Use the field is safe iff the struct is constructed with [`moveit`](https://crates.io/crates/moveit)'s way.
130+
inner: JoinHashMapInner<K, S>,
122131
/// Data types of the columns
123132
data_types: Arc<[DataType]>,
124133
/// Data types of the columns
@@ -148,17 +157,29 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
148157
.iter()
149158
.map(|idx| data_types[*idx].clone())
150159
.collect_vec();
151-
160+
let alloc = StatsAlloc::new(Global).shared();
152161
Self {
153-
inner: EvictableHashMap::with_hasher(target_cap, PrecomputedBuildHasher),
162+
inner: EvictableHashMap::with_hasher_in(
163+
target_cap,
164+
PrecomputedBuildHasher,
165+
alloc.clone(),
166+
),
154167
data_types: data_types.into(),
155168
join_key_data_types: join_key_data_types.into(),
156169
pk_data_types: pk_data_types.into(),
157170
keyspace,
158171
current_epoch: 0,
172+
alloc,
159173
}
160174
}
161175

176+
#[allow(dead_code)]
177+
/// Report the bytes used by the join map.
178+
// FIXME: Currently, only memory used in the hash map itself is counted.
179+
pub fn bytes_in_use(&self) -> usize {
180+
self.alloc.bytes_in_use()
181+
}
182+
162183
pub fn update_epoch(&mut self, epoch: u64) {
163184
self.current_epoch = epoch;
164185
}
@@ -296,7 +317,7 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
296317
}
297318

298319
impl<K: HashKey, S: StateStore> Deref for JoinHashMap<K, S> {
299-
type Target = EvictableHashMap<K, HashValueType<S>, PrecomputedBuildHasher>;
320+
type Target = JoinHashMapInner<K, S>;
300321

301322
fn deref(&self) -> &Self::Target {
302323
&self.inner

src/stream/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#![feature(proc_macro_hygiene)]
4242
#![feature(stmt_expr_attributes)]
4343
#![feature(unzip_option)]
44+
#![feature(allocator_api)]
4445

4546
#[macro_use]
4647
extern crate log;

src/utils/stats_alloc/Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[package]
2+
name = "stats_alloc"
3+
version = "0.1.8"
4+
edition = "2021"
5+
description = "Allocator with statistics"
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
workspace-hack = { version = "0.1", path = "../../workspace-hack" }
10+
11+
[dev-dependencies]

0 commit comments

Comments
 (0)