diff --git a/Cargo.lock b/Cargo.lock index e54dc78b1c341..27aa563b0eb49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2214,6 +2214,14 @@ dependencies = [ "hashbrown 0.11.2", ] +[[package]] +name = "lru" +version = "0.7.6" +source = "git+https://github.com/singularity-data/lru-rs.git?rev=e0e9ddaf8e4a51c244a03676734437e68a336b30#e0e9ddaf8e4a51c244a03676734437e68a336b30" +dependencies = [ + "hashbrown 0.11.2", +] + [[package]] name = "lz4" version = "1.23.3" @@ -3693,7 +3701,7 @@ dependencies = [ "itertools", "lazy_static", "log", - "lru", + "lru 0.7.6 (git+https://github.com/singularity-data/lru-rs.git?rev=e0e9ddaf8e4a51c244a03676734437e68a336b30)", "madsim", "madsim-tokio", "madsim-tonic", @@ -3890,7 +3898,7 @@ dependencies = [ "itertools", "lazy_static", "log", - "lru", + "lru 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)", "madsim", "madsim-tokio", "madsim-tonic", @@ -4321,6 +4329,7 @@ dependencies = [ "num-traits", "parking_lot", "paste", + "pin-project", "prometheus", "prost", "rdkafka", @@ -4336,6 +4345,7 @@ dependencies = [ "serde_json", "smallvec", "static_assertions", + "stats_alloc", "thiserror", "tokio-stream", "tower", @@ -4742,6 +4752,13 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stats_alloc" +version = "0.1.8" +dependencies = [ + "workspace-hack", +] + [[package]] name = "stringprep" version = "0.1.2" diff --git a/Cargo.toml b/Cargo.toml index fe00522b52d64..c25194da75a32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ members = [ "src/utils/memcomparable", "src/utils/pgwire", "src/utils/static-link", + "src/utils/stats_alloc", "src/utils/value-encoding", "src/workspace-hack", ] diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 455c2143a7dbe..921d5075f2c50 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -19,7 +19,7 @@ hyper = "0.14" itertools = "0.10" lazy_static = "1" log = "0.4" -lru = "0.7" +lru = { git = "https://github.com/singularity-data/lru-rs.git", rev = "e0e9ddaf8e4a51c244a03676734437e68a336b30" } madsim = "=0.2.0-alpha.3" memcomparable = { path = "../utils/memcomparable" } more-asserts = "0.2" diff --git a/src/common/src/collection/evictable.rs b/src/common/src/collection/evictable.rs index a5fc6f590e7db..a8d20657dcf6b 100644 --- a/src/common/src/collection/evictable.rs +++ b/src/common/src/collection/evictable.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::alloc::{Allocator, Global}; use std::cmp::Eq; use std::hash::{BuildHasher, Hash}; use std::ops::{Deref, DerefMut}; @@ -19,13 +20,30 @@ use std::ops::{Deref, DerefMut}; use lru::{DefaultHasher, LruCache}; /// A wrapper for [`LruCache`] which provides manual eviction. -pub struct EvictableHashMap { - inner: LruCache, +pub struct EvictableHashMap { + inner: LruCache, /// Target capacity to keep when calling `evict_to_target_cap`. target_cap: usize, } +impl EvictableHashMap { + /// Create a [`EvictableHashMap`] with the given target capacity and allocator. + pub fn new_in(target_cap: usize, alloc: A) -> Self { + Self::with_hasher_in(target_cap, DefaultHasher::new(), alloc) + } +} + +impl EvictableHashMap { + /// Create a [`EvictableHashMap`] with the given target capacity, hasher and allocator. + pub fn with_hasher_in(target_cap: usize, hasher: S, alloc: A) -> Self { + Self { + inner: LruCache::unbounded_with_hasher_in(hasher, alloc), + target_cap, + } + } +} + impl EvictableHashMap { /// Create a [`EvictableHashMap`] with the given target capacity. pub fn new(target_cap: usize) -> EvictableHashMap { @@ -41,7 +59,9 @@ impl EvictableHashMap { target_cap, } } +} +impl EvictableHashMap { pub fn target_cap(&self) -> usize { self.target_cap } @@ -79,15 +99,15 @@ impl EvictableHashMap { } } -impl Deref for EvictableHashMap { - type Target = LruCache; +impl Deref for EvictableHashMap { + type Target = LruCache; fn deref(&self) -> &Self::Target { &self.inner } } -impl DerefMut for EvictableHashMap { +impl DerefMut for EvictableHashMap { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner } diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 9af1591c8979b..f04554363ef00 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -35,6 +35,7 @@ #![feature(type_alias_impl_trait)] #![feature(test)] #![feature(trusted_len)] +#![feature(allocator_api)] #[macro_use] pub mod error; diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index ca2496f5e616e..649b5e8d8db29 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -28,6 +28,7 @@ memcomparable = { path = "../utils/memcomparable" } num-traits = "0.2" parking_lot = "0.12" paste = "1" +pin-project = "1" prometheus = { version = "0.13", features = ["process"] } prost = "0.10" rdkafka = { version = "0.28", features = ["cmake-build"] } @@ -43,6 +44,7 @@ serde-value = "0.7" serde_json = "1" smallvec = "1" static_assertions = "1" +stats_alloc = { path = "../utils/stats_alloc" } thiserror = "1" tokio = { version = "=0.2.0-alpha.3", package = "madsim-tokio", features = [ "rt", diff --git a/src/stream/src/executor/managed_state/join/mod.rs b/src/stream/src/executor/managed_state/join/mod.rs index 8628fc21b6b02..bcfad938fa71b 100644 --- a/src/stream/src/executor/managed_state/join/mod.rs +++ b/src/stream/src/executor/managed_state/join/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod join_entry_state; +use std::alloc::Global; use std::ops::{Deref, DerefMut, Index}; use std::sync::Arc; @@ -25,6 +26,7 @@ use risingwave_common::error::{ErrorCode, Result as RwResult}; use risingwave_common::hash::{HashKey, PrecomputedBuildHasher}; use risingwave_common::types::{DataType, Datum}; use risingwave_storage::{Keyspace, StateStore}; +use stats_alloc::{SharedStatsAlloc, StatsAlloc}; /// This is a row with a match degree #[derive(Clone, Debug)] @@ -116,9 +118,16 @@ type PkType = Row; pub type StateValueType = JoinRow; pub type HashValueType = JoinEntryState; +type JoinHashMapInner = + EvictableHashMap, PrecomputedBuildHasher, SharedStatsAlloc>; + pub struct JoinHashMap { + /// Allocator + alloc: SharedStatsAlloc, /// Store the join states. - inner: EvictableHashMap, PrecomputedBuildHasher>, + // SAFETY: This is a self-referential data structure and the allocator is owned by the struct + // itself. Use the field is safe iff the struct is constructed with [`moveit`](https://crates.io/crates/moveit)'s way. + inner: JoinHashMapInner, /// Data types of the columns data_types: Arc<[DataType]>, /// Data types of the columns @@ -148,17 +157,29 @@ impl JoinHashMap { .iter() .map(|idx| data_types[*idx].clone()) .collect_vec(); - + let alloc = StatsAlloc::new(Global).shared(); Self { - inner: EvictableHashMap::with_hasher(target_cap, PrecomputedBuildHasher), + inner: EvictableHashMap::with_hasher_in( + target_cap, + PrecomputedBuildHasher, + alloc.clone(), + ), data_types: data_types.into(), join_key_data_types: join_key_data_types.into(), pk_data_types: pk_data_types.into(), keyspace, current_epoch: 0, + alloc, } } + #[allow(dead_code)] + /// Report the bytes used by the join map. + // FIXME: Currently, only memory used in the hash map itself is counted. + pub fn bytes_in_use(&self) -> usize { + self.alloc.bytes_in_use() + } + pub fn update_epoch(&mut self, epoch: u64) { self.current_epoch = epoch; } @@ -296,7 +317,7 @@ impl JoinHashMap { } impl Deref for JoinHashMap { - type Target = EvictableHashMap, PrecomputedBuildHasher>; + type Target = JoinHashMapInner; fn deref(&self) -> &Self::Target { &self.inner diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index 1b53d145e2895..45685eaf9d054 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -41,6 +41,7 @@ #![feature(proc_macro_hygiene)] #![feature(stmt_expr_attributes)] #![feature(unzip_option)] +#![feature(allocator_api)] #[macro_use] extern crate log; diff --git a/src/utils/stats_alloc/Cargo.toml b/src/utils/stats_alloc/Cargo.toml new file mode 100644 index 0000000000000..d111a74f64636 --- /dev/null +++ b/src/utils/stats_alloc/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "stats_alloc" +version = "0.1.8" +edition = "2021" +description = "Allocator with statistics" +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +workspace-hack = { version = "0.1", path = "../../workspace-hack" } + +[dev-dependencies] diff --git a/src/utils/stats_alloc/src/lib.rs b/src/utils/stats_alloc/src/lib.rs new file mode 100644 index 0000000000000..7c8e0b55e6c95 --- /dev/null +++ b/src/utils/stats_alloc/src/lib.rs @@ -0,0 +1,180 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![feature(allocator_api)] + +use std::alloc::Allocator; +use std::ops::Deref; +use std::sync::atomic::AtomicUsize; +use std::sync::{atomic, Arc}; + +pub struct StatsAlloc { + bytes_in_use: AtomicUsize, + + inner: T, +} + +impl StatsAlloc { + pub fn new(inner: T) -> Self { + Self { + bytes_in_use: AtomicUsize::new(0), + inner, + } + } + + pub fn bytes_in_use(&self) -> usize { + self.bytes_in_use.load(atomic::Ordering::Relaxed) + } + + pub fn shared(self) -> SharedStatsAlloc { + SharedStatsAlloc(Arc::new(self)) + } +} + +unsafe impl Allocator for StatsAlloc +where + T: Allocator, +{ + #[inline(always)] + fn allocate( + &self, + layout: std::alloc::Layout, + ) -> Result, std::alloc::AllocError> { + self.bytes_in_use + .fetch_add(layout.size(), atomic::Ordering::Relaxed); + self.inner.allocate(layout) + } + + #[inline(always)] + unsafe fn deallocate(&self, ptr: std::ptr::NonNull, layout: std::alloc::Layout) { + self.bytes_in_use + .fetch_sub(layout.size(), atomic::Ordering::Relaxed); + self.inner.deallocate(ptr, layout) + } + + #[inline(always)] + fn allocate_zeroed( + &self, + layout: std::alloc::Layout, + ) -> Result, std::alloc::AllocError> { + self.bytes_in_use + .fetch_add(layout.size(), atomic::Ordering::Relaxed); + self.inner.allocate_zeroed(layout) + } + + #[inline(always)] + unsafe fn grow( + &self, + ptr: std::ptr::NonNull, + old_layout: std::alloc::Layout, + new_layout: std::alloc::Layout, + ) -> Result, std::alloc::AllocError> { + self.bytes_in_use + .fetch_add(new_layout.size(), atomic::Ordering::Relaxed); + self.bytes_in_use + .fetch_sub(old_layout.size(), atomic::Ordering::Relaxed); + self.inner.grow(ptr, old_layout, new_layout) + } + + #[inline(always)] + unsafe fn grow_zeroed( + &self, + ptr: std::ptr::NonNull, + old_layout: std::alloc::Layout, + new_layout: std::alloc::Layout, + ) -> Result, std::alloc::AllocError> { + self.bytes_in_use + .fetch_add(new_layout.size(), atomic::Ordering::Relaxed); + self.bytes_in_use + .fetch_sub(old_layout.size(), atomic::Ordering::Relaxed); + self.inner.grow_zeroed(ptr, old_layout, new_layout) + } + + #[inline(always)] + unsafe fn shrink( + &self, + ptr: std::ptr::NonNull, + old_layout: std::alloc::Layout, + new_layout: std::alloc::Layout, + ) -> Result, std::alloc::AllocError> { + self.bytes_in_use + .fetch_add(new_layout.size(), atomic::Ordering::Relaxed); + self.bytes_in_use + .fetch_sub(old_layout.size(), atomic::Ordering::Relaxed); + self.inner.shrink(ptr, old_layout, new_layout) + } +} + +pub struct SharedStatsAlloc(Arc>); + +impl Clone for SharedStatsAlloc { + fn clone(&self) -> Self { + SharedStatsAlloc(self.0.clone()) + } +} + +impl Deref for SharedStatsAlloc { + type Target = StatsAlloc; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +unsafe impl Allocator for SharedStatsAlloc { + fn allocate( + &self, + layout: std::alloc::Layout, + ) -> Result, std::alloc::AllocError> { + self.0.allocate(layout) + } + + unsafe fn deallocate(&self, ptr: std::ptr::NonNull, layout: std::alloc::Layout) { + self.0.deallocate(ptr, layout) + } + + fn allocate_zeroed( + &self, + layout: std::alloc::Layout, + ) -> Result, std::alloc::AllocError> { + self.0.allocate_zeroed(layout) + } + + unsafe fn grow( + &self, + ptr: std::ptr::NonNull, + old_layout: std::alloc::Layout, + new_layout: std::alloc::Layout, + ) -> Result, std::alloc::AllocError> { + self.0.grow(ptr, old_layout, new_layout) + } + + unsafe fn grow_zeroed( + &self, + ptr: std::ptr::NonNull, + old_layout: std::alloc::Layout, + new_layout: std::alloc::Layout, + ) -> Result, std::alloc::AllocError> { + self.0.grow_zeroed(ptr, old_layout, new_layout) + } + + unsafe fn shrink( + &self, + ptr: std::ptr::NonNull, + old_layout: std::alloc::Layout, + new_layout: std::alloc::Layout, + ) -> Result, std::alloc::AllocError> { + self.0.shrink(ptr, old_layout, new_layout) + } +} diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index ce345bd24f42f..92e36ee8942ac 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -32,7 +32,7 @@ futures-io = { version = "0.3", features = ["std"] } futures-sink = { version = "0.3", features = ["alloc", "std"] } futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] } futures-util = { version = "0.3", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] } -hashbrown = { version = "0.11", features = ["ahash", "inline-more", "raw"] } +hashbrown = { version = "0.11", features = ["ahash", "inline-more", "nightly", "raw"] } hyper = { version = "0.14", features = ["client", "full", "h2", "http1", "http2", "runtime", "server", "socket2", "stream", "tcp"] } indexmap = { version = "1", default-features = false, features = ["std"] } isahc = { version = "1", default-features = false, features = ["encoding_rs", "mime", "text-decoding"] } @@ -86,7 +86,7 @@ futures-io = { version = "0.3", features = ["std"] } futures-sink = { version = "0.3", features = ["alloc", "std"] } futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] } futures-util = { version = "0.3", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] } -hashbrown = { version = "0.11", features = ["ahash", "inline-more", "raw"] } +hashbrown = { version = "0.11", features = ["ahash", "inline-more", "nightly", "raw"] } hyper = { version = "0.14", features = ["client", "full", "h2", "http1", "http2", "runtime", "server", "socket2", "stream", "tcp"] } indexmap = { version = "1", default-features = false, features = ["std"] } isahc = { version = "1", default-features = false, features = ["encoding_rs", "mime", "text-decoding"] }