From a3827de946c61008c85cf4cd318332fb2e0f6e70 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 16 Oct 2025 16:12:51 +0800 Subject: [PATCH 1/2] chore: remove `local_stats_alloc` Signed-off-by: Bugen Zhao --- Cargo.lock | 8 - Cargo.toml | 1 - docs/rustdoc/index.md | 1 - src/stream/Cargo.toml | 1 - src/stream/src/executor/join/hash_join.rs | 8 +- src/stream/src/executor/temporal_join.rs | 8 +- src/utils/local_stats_alloc/Cargo.toml | 17 -- src/utils/local_stats_alloc/src/lib.rs | 188 ---------------------- 8 files changed, 4 insertions(+), 228 deletions(-) delete mode 100644 src/utils/local_stats_alloc/Cargo.toml delete mode 100644 src/utils/local_stats_alloc/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 64f08baf876fe..c9b9189b6c856 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7635,13 +7635,6 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f5e54036fe321fd421e10d732f155734c4e4afd610dd556d9a82833ab3ee0bed" -[[package]] -name = "local_stats_alloc" -version = "2.7.0-alpha" -dependencies = [ - "workspace-hack", -] - [[package]] name = "lock_api" version = "0.4.10" @@ -12921,7 +12914,6 @@ dependencies = [ "iceberg", "itertools 0.14.0", "jsonbb", - "local_stats_alloc", "lru 0.7.6", "madsim-tokio", "madsim-tonic", diff --git a/Cargo.toml b/Cargo.toml index 206381c378064..0ce17a953656b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,7 +61,6 @@ members = [ "src/utils/delta_btree_map", "src/utils/futures_util", "src/utils/iter_util", - "src/utils/local_stats_alloc", "src/utils/openai_embedding_service", "src/utils/pgwire", "src/utils/resource_util", diff --git a/docs/rustdoc/index.md b/docs/rustdoc/index.md index 9978aa12cd65e..101a283648603 100644 --- a/docs/rustdoc/index.md +++ b/docs/rustdoc/index.md @@ -47,6 +47,5 @@ Common functionalities shared inside RisingWave. The crates under `src/utils` are several independent util crates which helps to simplify development. We plan to publish them to [crates.io](https://crates.io/) in future when they are more mature. -- [local_stats_alloc](local_stats_alloc/index.html) - [memcomparable](memcomparable/index.html) - [pgwire](pgwire/index.html) diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index c5c8f45b7774c..dec6b2cb781c3 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -33,7 +33,6 @@ hytra = "0.1.2" iceberg = { workspace = true } itertools = { workspace = true } jsonbb = { workspace = true } -local_stats_alloc = { path = "../utils/local_stats_alloc" } lru = { workspace = true } maplit = "1.0.2" memcomparable = "0.2" diff --git a/src/stream/src/executor/join/hash_join.rs b/src/stream/src/executor/join/hash_join.rs index c374fe05fc7f8..3fbeb745be76e 100644 --- a/src/stream/src/executor/join/hash_join.rs +++ b/src/stream/src/executor/join/hash_join.rs @@ -21,7 +21,6 @@ use futures::future::{join, try_join}; use futures::{StreamExt, pin_mut, stream}; use futures_async_stream::for_await; use join_row_set::JoinRowSet; -use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use risingwave_common::bitmap::Bitmap; use risingwave_common::hash::{HashKey, PrecomputedBuildHasher}; use risingwave_common::metrics::LabelGuardedIntCounter; @@ -102,8 +101,7 @@ impl DerefMut for HashValueWrapper { } } -type JoinHashMapInner = - ManagedLruCache, PrecomputedBuildHasher, SharedStatsAlloc>; +type JoinHashMapInner = ManagedLruCache, PrecomputedBuildHasher>; pub struct JoinHashMapMetrics { /// Basic information @@ -425,7 +423,6 @@ impl JoinHashMap { fragment_id: FragmentId, side: &'static str, ) -> Self { - let alloc = StatsAlloc::new(Global).shared(); // TODO: unify pk encoding with state table. let pk_data_types = state_pk_indices .iter() @@ -461,11 +458,10 @@ impl JoinHashMap { format!("hash join {}", side), ); - let cache = ManagedLruCache::unbounded_with_hasher_in( + let cache = ManagedLruCache::unbounded_with_hasher( watermark_sequence, metrics_info, PrecomputedBuildHasher, - alloc, ); Self { diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 3242754db852b..159c31c868704 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -20,7 +20,6 @@ use either::Either; use futures::TryStreamExt; use futures::stream::{self, PollNext}; use itertools::Itertools; -use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use lru::DefaultHasher; use risingwave_common::array::Op; use risingwave_common::bitmap::BitmapBuilder; @@ -109,7 +108,7 @@ struct TemporalSide { source: BatchTable, table_stream_key_indices: Vec, table_output_indices: Vec, - cache: ManagedLruCache>, + cache: ManagedLruCache, join_key_data_types: Vec, } @@ -613,8 +612,6 @@ impl, memo_table: Option>, ) -> Self { - let alloc = StatsAlloc::new(Global).shared(); - let metrics_info = MetricsInfo::new( metrics.clone(), table.table_id().table_id, @@ -622,11 +619,10 @@ impl { - bytes_in_use: AtomicUsize, - - inner: T, -} - -impl StatsAlloc { - pub fn new(inner: T) -> Self { - Self { - bytes_in_use: AtomicUsize::new(0), - inner, - } - } - - pub fn bytes_in_use(&self) -> usize { - self.bytes_in_use.load(atomic::Ordering::Relaxed) - } - - pub fn shared(self) -> SharedStatsAlloc { - SharedStatsAlloc(Arc::new(self)) - } -} - -unsafe impl Allocator for StatsAlloc -where - T: Allocator, -{ - #[inline(always)] - fn allocate( - &self, - layout: std::alloc::Layout, - ) -> Result, std::alloc::AllocError> { - self.bytes_in_use - .fetch_add(layout.size(), atomic::Ordering::Relaxed); - self.inner.allocate(layout) - } - - #[inline(always)] - unsafe fn deallocate(&self, ptr: std::ptr::NonNull, layout: std::alloc::Layout) { - unsafe { - self.bytes_in_use - .fetch_sub(layout.size(), atomic::Ordering::Relaxed); - self.inner.deallocate(ptr, layout) - } - } - - #[inline(always)] - fn allocate_zeroed( - &self, - layout: std::alloc::Layout, - ) -> Result, std::alloc::AllocError> { - self.bytes_in_use - .fetch_add(layout.size(), atomic::Ordering::Relaxed); - self.inner.allocate_zeroed(layout) - } - - #[inline(always)] - unsafe fn grow( - &self, - ptr: std::ptr::NonNull, - old_layout: std::alloc::Layout, - new_layout: std::alloc::Layout, - ) -> Result, std::alloc::AllocError> { - unsafe { - self.bytes_in_use - .fetch_add(new_layout.size(), atomic::Ordering::Relaxed); - self.bytes_in_use - .fetch_sub(old_layout.size(), atomic::Ordering::Relaxed); - self.inner.grow(ptr, old_layout, new_layout) - } - } - - #[inline(always)] - unsafe fn grow_zeroed( - &self, - ptr: std::ptr::NonNull, - old_layout: std::alloc::Layout, - new_layout: std::alloc::Layout, - ) -> Result, std::alloc::AllocError> { - unsafe { - self.bytes_in_use - .fetch_add(new_layout.size(), atomic::Ordering::Relaxed); - self.bytes_in_use - .fetch_sub(old_layout.size(), atomic::Ordering::Relaxed); - self.inner.grow_zeroed(ptr, old_layout, new_layout) - } - } - - #[inline(always)] - unsafe fn shrink( - &self, - ptr: std::ptr::NonNull, - old_layout: std::alloc::Layout, - new_layout: std::alloc::Layout, - ) -> Result, std::alloc::AllocError> { - unsafe { - self.bytes_in_use - .fetch_add(new_layout.size(), atomic::Ordering::Relaxed); - self.bytes_in_use - .fetch_sub(old_layout.size(), atomic::Ordering::Relaxed); - self.inner.shrink(ptr, old_layout, new_layout) - } - } -} - -pub struct SharedStatsAlloc(Arc>); - -impl Clone for SharedStatsAlloc { - fn clone(&self) -> Self { - SharedStatsAlloc(self.0.clone()) - } -} - -impl Deref for SharedStatsAlloc { - type Target = StatsAlloc; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -unsafe impl Allocator for SharedStatsAlloc { - fn allocate( - &self, - layout: std::alloc::Layout, - ) -> Result, std::alloc::AllocError> { - self.0.allocate(layout) - } - - unsafe fn deallocate(&self, ptr: std::ptr::NonNull, layout: std::alloc::Layout) { - unsafe { self.0.deallocate(ptr, layout) } - } - - fn allocate_zeroed( - &self, - layout: std::alloc::Layout, - ) -> Result, std::alloc::AllocError> { - self.0.allocate_zeroed(layout) - } - - unsafe fn grow( - &self, - ptr: std::ptr::NonNull, - old_layout: std::alloc::Layout, - new_layout: std::alloc::Layout, - ) -> Result, std::alloc::AllocError> { - unsafe { self.0.grow(ptr, old_layout, new_layout) } - } - - unsafe fn grow_zeroed( - &self, - ptr: std::ptr::NonNull, - old_layout: std::alloc::Layout, - new_layout: std::alloc::Layout, - ) -> Result, std::alloc::AllocError> { - unsafe { self.0.grow_zeroed(ptr, old_layout, new_layout) } - } - - unsafe fn shrink( - &self, - ptr: std::ptr::NonNull, - old_layout: std::alloc::Layout, - new_layout: std::alloc::Layout, - ) -> Result, std::alloc::AllocError> { - unsafe { self.0.shrink(ptr, old_layout, new_layout) } - } -} From f5b78af04402b697fe849018d0f4257fb0c1157d Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 16 Oct 2025 16:51:38 +0800 Subject: [PATCH 2/2] fix check Signed-off-by: Bugen Zhao --- src/stream/src/executor/join/hash_join.rs | 2 +- src/stream/src/executor/temporal_join.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/stream/src/executor/join/hash_join.rs b/src/stream/src/executor/join/hash_join.rs index 3fbeb745be76e..ba850a71b0374 100644 --- a/src/stream/src/executor/join/hash_join.rs +++ b/src/stream/src/executor/join/hash_join.rs @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -use std::alloc::Global; + use std::cmp::Ordering; use std::ops::{Bound, Deref, DerefMut, RangeBounds}; use std::sync::Arc; diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 159c31c868704..186b69bf9cf3a 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::alloc::Global; use std::collections::HashMap; use std::collections::hash_map::Entry;