From 662b1b3ad6044230eac3fcf077d54725ee4cf907 Mon Sep 17 00:00:00 2001 From: Stepan Bagritsevich Date: Tue, 3 Jun 2025 08:22:50 +0200 Subject: [PATCH] feat: Add eviction based on rss memory Signed-off-by: Stepan Bagritsevich --- src/server/dfly_main.cc | 5 + src/server/engine_shard.cc | 50 +++++--- tests/dragonfly/memory_test.py | 220 +++++++++++++++++++++++++++++---- 3 files changed, 239 insertions(+), 36 deletions(-) diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc index 03031be1176a..0c75f273ebb8 100644 --- a/src/server/dfly_main.cc +++ b/src/server/dfly_main.cc @@ -823,7 +823,12 @@ Usage: dragonfly [FLAGS] // export MIMALLOC_VERBOSE=1 to see the options before the override. mi_option_enable(mi_option_show_errors); mi_option_set(mi_option_max_warnings, 0); + mi_option_enable(mi_option_purge_decommits); + DCHECK(mi_option_get(mi_option_reset_decommits) == 1); + + mi_option_set(mi_option_purge_delay, 0); + DCHECK(!mi_option_get(mi_option_reset_delay)); fb2::SetDefaultStackResource(&fb2::std_malloc_resource, kFiberDefaultStackSize); diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 98354cd96bae..b5d2c7cf66a2 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -230,28 +230,32 @@ size_t CalculateEvictionBytes() { size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory, shard_memory_budget_threshold); - // TODO: Eviction due to rss usage is not working well as it causes eviction - // of to many keys untill we finally see decrease in rss. We need to improve - // this logic before we enable it. - /* - const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio; + LOG_IF_EVERY_N(INFO, goal_bytes > 0, 50) + << "Memory goal bytes: " << goal_bytes << ", used memory: " << global_used_memory + << ", memory limit: " << max_memory_limit; + // If rss_oom_deny_ratio is set, we should evict depending on rss memory too + const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio; if (rss_oom_deny_ratio > 0.0) { const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit); - // We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss - memory const size_t shard_rss_memory_budget_threshold = + /* We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss + * memory */ + const size_t shard_rss_memory_budget_threshold = size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count; // Calculate how much rss memory is used by all shards const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed); // Try to evict more bytes if we are close to the rss memory limit - goal_bytes = std::max( - goal_bytes, CalculateHowManyBytesToEvictOnShard(max_rss_memory, global_used_rss_memory, - shard_rss_memory_budget_threshold)); - } - */ + const size_t rss_goal_bytes = CalculateHowManyBytesToEvictOnShard( + max_rss_memory, global_used_rss_memory, shard_rss_memory_budget_threshold); + + LOG_IF_EVERY_N(INFO, rss_goal_bytes > 0, 50) << "Rss memory goal bytes: " << rss_goal_bytes + << ", rss used memory: " << global_used_rss_memory + << ", rss memory limit: " << max_rss_memory; + goal_bytes = std::max(goal_bytes, rss_goal_bytes); + } return goal_bytes; } @@ -359,6 +363,18 @@ bool EngineShard::DefragTaskState::CheckRequired() { return false; } + std::optional shard_mem_usage; + + if (GetFlag(FLAGS_enable_heartbeat_eviction)) { + shard_mem_usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold)); + const static double eviction_waste_threshold = 0.05; + if (shard_mem_usage->wasted_mem > + (uint64_t(shard_mem_usage->commited * eviction_waste_threshold))) { + VLOG(1) << "memory issue found for memory " << shard_mem_usage.value(); + return true; + } + } + const std::size_t global_threshold = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold); if (global_threshold > rss_mem_current.load(memory_order_relaxed)) { return false; @@ -373,11 +389,15 @@ bool EngineShard::DefragTaskState::CheckRequired() { } last_check_time = now; - ShardMemUsage usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold)); + if (!shard_mem_usage) { + shard_mem_usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold)); + } + + DCHECK(shard_mem_usage.has_value()); const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold); - if (usage.wasted_mem > (uint64_t(usage.commited * waste_threshold))) { - VLOG(1) << "memory issue found for memory " << usage; + if (shard_mem_usage->wasted_mem > (uint64_t(shard_mem_usage->commited * waste_threshold))) { + VLOG(1) << "memory issue found for memory " << shard_mem_usage.value(); return true; } diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py index d871371dfb01..254cb6c5c3d2 100644 --- a/tests/dragonfly/memory_test.py +++ b/tests/dragonfly/memory_test.py @@ -6,6 +6,15 @@ from .instance import DflyInstance, DflyInstanceFactory +def extract_fragmentation_waste(memory_arena): + """ + Extracts the fragmentation waste from the memory arena info. + """ + match = re.search(r"fragmentation waste:\s*([0-9.]+)%", memory_arena) + assert match.group(1) is not None + return float(match.group(1)) + + @pytest.mark.slow @pytest.mark.opt_only @pytest.mark.parametrize( @@ -176,49 +185,218 @@ async def test_eval_with_oom(df_factory: DflyInstanceFactory): assert rss_before_eval * 1.01 > info["used_memory_rss"] -@pytest.mark.skip("rss eviction disabled") @pytest.mark.asyncio -@dfly_args( - { - "proactor_threads": 1, - "cache_mode": "true", - "maxmemory": "5gb", - "rss_oom_deny_ratio": 0.8, - "max_eviction_per_heartbeat": 100, - } +@pytest.mark.parametrize( + "proactor_threads_param, maxmemory_param", + [(1, 6 * (1024**3)), (4, 6 * (1024**3))], ) -async def test_cache_eviction_with_rss_deny_oom( - async_client: aioredis.Redis, +async def test_cache_eviction_with_rss_deny_oom_simple_case( + df_factory: DflyInstanceFactory, + proactor_threads_param, + maxmemory_param, ): """ Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit """ + df_server = df_factory.create( + proactor_threads=proactor_threads_param, + cache_mode="true", + maxmemory=maxmemory_param, + rss_oom_deny_ratio=0.8, + ) + df_server.start() - max_memory = 5 * 1024 * 1024 * 1024 # 5G - rss_max_memory = int(max_memory * 0.8) + async_client = df_server.client() - data_fill_size = int(0.9 * rss_max_memory) # 95% of rss_max_memory + max_memory = maxmemory_param + rss_oom_deny_ratio = 0.8 + eviction_memory_budget_threshold = 0.1 # 10% of max_memory + + rss_eviction_threshold = max_memory * (rss_oom_deny_ratio - eviction_memory_budget_threshold) + + data_fill_size = int((rss_oom_deny_ratio + 0.05) * max_memory) # 85% of max_memory val_size = 1024 * 5 # 5 kb num_keys = data_fill_size // val_size await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", val_size) - # Test that used memory is less than 90% of max memory + + # Test that used memory is less than 90% of max memory to not to start eviction based on used_memory memory_info = await async_client.info("memory") assert ( memory_info["used_memory"] < max_memory * 0.9 - ), "Used memory should be less than 90% of max memory." + ), "Used memory should be less than 90% of max memory to not to start eviction based on used_memory." assert ( - memory_info["used_memory_rss"] > rss_max_memory * 0.9 - ), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio)." + memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio + ), "Used RSS memory should be more than 80% of rss max memory (max_memory * rss_oom_deny_ratio) to start eviction based on rss memory usage." - # Get RSS memory after creating new connections memory_info = await async_client.info("memory") - while memory_info["used_memory_rss"] > rss_max_memory * 0.9: + prev_evicted_keys = 0 + evicted_keys_repeat_count = 0 + while True: + # Wait for some time await asyncio.sleep(1) + memory_info = await async_client.info("memory") logging.info( - f'Current rss: {memory_info["used_memory_rss"]}. rss eviction threshold: {rss_max_memory * 0.9}.' + f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {rss_eviction_threshold}.' ) + stats_info = await async_client.info("stats") logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.') + + # Check if evicted keys are not increasing + if prev_evicted_keys == stats_info["evicted_keys"]: + evicted_keys_repeat_count += 1 + else: + prev_evicted_keys = stats_info["evicted_keys"] + evicted_keys_repeat_count = 1 + + if evicted_keys_repeat_count > 2: + break + + # Wait for some time + await asyncio.sleep(2) + + memory_arena = await async_client.execute_command("MEMORY", "ARENA") + fragmentation_waste = extract_fragmentation_waste(memory_arena) + logging.info(f"Memory fragmentation waste: {fragmentation_waste}") + assert fragmentation_waste < 12.0, "Memory fragmentation waste should be less than 12%." + + # Assert that no more keys are evicted + memory_info = await async_client.info("memory") + stats_info = await async_client.info("stats") + + assert memory_info["used_memory"] > max_memory * ( + rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.18 + ), "We should not evict all items." + assert memory_info["used_memory"] < max_memory * ( + rss_oom_deny_ratio - eviction_memory_budget_threshold + ), "Used memory should be smaller than threshold." + assert memory_info["used_memory_rss"] > max_memory * ( + rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.18 + ), "We should not evict all items." + + evicted_keys = stats_info["evicted_keys"] + # We may evict slightly more than prev_evicted_keys due to gaps in RSS memory usage + assert ( + evicted_keys > 0 + and evicted_keys >= prev_evicted_keys + and evicted_keys <= prev_evicted_keys * 1.0015 + ), "We should not evict more items." + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "proactor_threads_param, maxmemory_param", + [(1, 6 * (1024**3)), (4, 6 * (1024**3))], +) +async def test_cache_eviction_with_rss_deny_oom_two_waves( + df_factory: DflyInstanceFactory, proactor_threads_param, maxmemory_param +): + """ + Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit + It is similar to the test_cache_eviction_with_rss_deny_oom_simple_case but here we have two waves of data filling: + 1. First wave fills the instance to 85% of max memory, which is above rss_oom_deny_ratio. + 2. Then we wait for eviction to happen based on rss memory usage. After eviction we should have 70% of max memory used. + 3. Second wave fills the instance to 90% of max memory, which is above rss_oom_deny_ratio. + 4. Second time eviction should happen + """ + df_server = df_factory.create( + proactor_threads=proactor_threads_param, + cache_mode="true", + maxmemory=maxmemory_param, + rss_oom_deny_ratio=0.8, + ) + df_server.start() + + async_client = df_server.client() + + max_memory = maxmemory_param + rss_oom_deny_ratio = 0.8 + eviction_memory_budget_threshold = 0.1 # 10% of max_memory + + rss_eviction_threshold = max_memory * (rss_oom_deny_ratio - eviction_memory_budget_threshold) + + # first wave fills 85% of max memory + # second wave fills 20% of max memory + data_fill_size = [ + int((rss_oom_deny_ratio + 0.05) * max_memory), + int((1 - rss_oom_deny_ratio) * max_memory), + ] + + val_size = 1024 * 5 # 5 kb + + for i in range(2): + if i > 0: + await asyncio.sleep(2) + + num_keys = data_fill_size[i] // val_size + logging.info( + f"Populating data for wave {i}. Data fill size: {data_fill_size[i]}. Number of keys: {num_keys}." + ) + await async_client.execute_command("DEBUG", "POPULATE", num_keys, f"key{i}", val_size) + + # Test that used memory is less than 90% of max memory to not to start eviction based on used_memory + memory_info = await async_client.info("memory") + assert ( + memory_info["used_memory"] < max_memory * 0.9 + ), "Used memory should be less than 90% of max memory to not to start eviction based on used_memory." + assert ( + memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio + ), "Used RSS memory should be more than 80% of rss max memory (max_memory * rss_oom_deny_ratio) to start eviction based on rss memory usage." + + memory_info = await async_client.info("memory") + prev_evicted_keys = 0 + evicted_keys_repeat_count = 0 + while True: + # Wait for some time + await asyncio.sleep(1) + + memory_info = await async_client.info("memory") + logging.info( + f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {rss_eviction_threshold}.' + ) + + stats_info = await async_client.info("stats") + logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.') + + # Check if evicted keys are not increasing + if prev_evicted_keys == stats_info["evicted_keys"]: + evicted_keys_repeat_count += 1 + else: + prev_evicted_keys = stats_info["evicted_keys"] + evicted_keys_repeat_count = 1 + + if evicted_keys_repeat_count > 2: + break + + # Wait for some time + await asyncio.sleep(2) + + memory_arena = await async_client.execute_command("MEMORY", "ARENA") + fragmentation_waste = extract_fragmentation_waste(memory_arena) + logging.info(f"Memory fragmentation waste: {fragmentation_waste}") + assert fragmentation_waste < 12.0, "Memory fragmentation waste should be less than 12%." + + # Assert that no more keys are evicted + memory_info = await async_client.info("memory") + stats_info = await async_client.info("stats") + + assert memory_info["used_memory"] > max_memory * ( + rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.18 + ), "We should not evict all items." + assert memory_info["used_memory"] < max_memory * ( + rss_oom_deny_ratio - eviction_memory_budget_threshold + ), "Used memory should be smaller than threshold." + assert memory_info["used_memory_rss"] > max_memory * ( + rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.18 + ), "We should not evict all items." + + evicted_keys = stats_info["evicted_keys"] + # We may evict slightly more than prev_evicted_keys due to gaps in RSS memory usage + assert ( + evicted_keys > 0 + and evicted_keys >= prev_evicted_keys + and evicted_keys <= prev_evicted_keys * 1.0015 + ), "We should not evict more items."