Skip to content

Commit f9875f6

Browse files
feat: Add eviction based on rss memory
Signed-off-by: Stepan Bagritsevich <[email protected]>
1 parent 680722d commit f9875f6

File tree

4 files changed

+189
-63
lines changed

4 files changed

+189
-63
lines changed

src/server/dfly_main.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -814,7 +814,12 @@ Usage: dragonfly [FLAGS]
814814
// export MIMALLOC_VERBOSE=1 to see the options before the override.
815815
mi_option_enable(mi_option_show_errors);
816816
mi_option_set(mi_option_max_warnings, 0);
817+
817818
mi_option_enable(mi_option_purge_decommits);
819+
DCHECK(mi_option_get(mi_option_reset_decommits) == 1);
820+
821+
mi_option_set(mi_option_purge_delay, 0);
822+
DCHECK(!mi_option_get(mi_option_reset_delay));
818823

819824
fb2::SetDefaultStackResource(&fb2::std_malloc_resource, kFiberDefaultStackSize);
820825

src/server/engine_shard.cc

Lines changed: 92 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ extern "C" {
1919
#include "server/search/doc_index.h"
2020
#include "server/server_state.h"
2121
#include "server/tiered_storage.h"
22+
#include "server/tiering/common.h"
2223
#include "server/transaction.h"
2324
#include "util/fibers/proactor_base.h"
2425

2526
using namespace std;
27+
using namespace ::dfly::tiering::literals;
2628

2729
ABSL_FLAG(float, mem_defrag_threshold, 0.7,
2830
"Minimum percentage of used memory relative to maxmemory cap before running "
@@ -65,6 +67,9 @@ ABSL_FLAG(double, eviction_memory_budget_threshold, 0.1,
6567
"Eviction starts when the free memory (including RSS memory) drops below "
6668
"eviction_memory_budget_threshold * max_memory_limit.");
6769

70+
ABSL_FLAG(uint64_t, force_decommit_threshold, 8_MB,
71+
"The threshold of memory to force decommit when memory is under pressure.");
72+
6873
ABSL_DECLARE_FLAG(uint32_t, max_eviction_per_heartbeat);
6974

7075
namespace dfly {
@@ -216,45 +221,6 @@ size_t CalculateHowManyBytesToEvictOnShard(size_t global_memory_limit, size_t gl
216221
return shard_budget < shard_memory_threshold ? (shard_memory_threshold - shard_budget) : 0;
217222
}
218223

219-
/* Calculates the number of bytes to evict based on memory and rss memory usage. */
220-
size_t CalculateEvictionBytes() {
221-
const size_t shards_count = shard_set->size();
222-
const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);
223-
224-
const size_t shard_memory_budget_threshold =
225-
size_t(max_memory_limit * eviction_memory_budget_threshold) / shards_count;
226-
227-
const size_t global_used_memory = used_mem_current.load(memory_order_relaxed);
228-
229-
// Calculate how many bytes we need to evict on this shard
230-
size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory,
231-
shard_memory_budget_threshold);
232-
233-
// TODO: Eviction due to rss usage is not working well as it causes eviction
234-
// of to many keys untill we finally see decrease in rss. We need to improve
235-
// this logic before we enable it.
236-
/*
237-
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
238-
// If rss_oom_deny_ratio is set, we should evict depending on rss memory too
239-
if (rss_oom_deny_ratio > 0.0) {
240-
const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
241-
// We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
242-
memory const size_t shard_rss_memory_budget_threshold =
243-
size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;
244-
245-
// Calculate how much rss memory is used by all shards
246-
const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);
247-
248-
// Try to evict more bytes if we are close to the rss memory limit
249-
goal_bytes = std::max(
250-
goal_bytes, CalculateHowManyBytesToEvictOnShard(max_rss_memory, global_used_rss_memory,
251-
shard_rss_memory_budget_threshold));
252-
}
253-
*/
254-
255-
return goal_bytes;
256-
}
257-
258224
} // namespace
259225

260226
__thread EngineShard* EngineShard::shard_ = nullptr;
@@ -347,6 +313,18 @@ bool EngineShard::DefragTaskState::CheckRequired() {
347313
return false;
348314
}
349315

316+
std::optional<ShardMemUsage> shard_mem_usage;
317+
318+
if (GetFlag(FLAGS_enable_heartbeat_eviction)) {
319+
shard_mem_usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));
320+
const static double eviction_waste_threshold = 0.05;
321+
if (shard_mem_usage->wasted_mem >
322+
(uint64_t(shard_mem_usage->commited * eviction_waste_threshold))) {
323+
VLOG(1) << "memory issue found for memory " << shard_mem_usage.value();
324+
return true;
325+
}
326+
}
327+
350328
const std::size_t global_threshold = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold);
351329
if (global_threshold > rss_mem_current.load(memory_order_relaxed)) {
352330
return false;
@@ -361,11 +339,15 @@ bool EngineShard::DefragTaskState::CheckRequired() {
361339
}
362340
last_check_time = now;
363341

364-
ShardMemUsage usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));
342+
if (!shard_mem_usage) {
343+
shard_mem_usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));
344+
}
345+
346+
DCHECK(shard_mem_usage.has_value());
365347

366348
const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold);
367-
if (usage.wasted_mem > (uint64_t(usage.commited * waste_threshold))) {
368-
VLOG(1) << "memory issue found for memory " << usage;
349+
if (shard_mem_usage->wasted_mem > (uint64_t(shard_mem_usage->commited * waste_threshold))) {
350+
VLOG(1) << "memory issue found for memory " << shard_mem_usage.value();
369351
return true;
370352
}
371353

@@ -800,6 +782,7 @@ void EngineShard::RetireExpiredAndEvict() {
800782
DbContext db_cntx;
801783
db_cntx.time_now_ms = GetCurrentTimeMs();
802784

785+
size_t deleted_bytes = 0;
803786
size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0;
804787

805788
for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {
@@ -811,6 +794,7 @@ void EngineShard::RetireExpiredAndEvict() {
811794
if (expt->size() > 0) {
812795
DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target);
813796

797+
deleted_bytes += stats.deleted_bytes;
814798
eviction_goal -= std::min(eviction_goal, size_t(stats.deleted_bytes));
815799
counter_[TTL_TRAVERSE].IncBy(stats.traversed);
816800
counter_[TTL_DELETE].IncBy(stats.deleted);
@@ -832,9 +816,75 @@ void EngineShard::RetireExpiredAndEvict() {
832816
<< " bytes. Max eviction per heartbeat: "
833817
<< GetFlag(FLAGS_max_eviction_per_heartbeat);
834818

819+
deleted_bytes += evicted_bytes;
835820
eviction_goal -= std::min(eviction_goal, evicted_bytes);
836821
}
837822
}
823+
824+
auto& deleted_bytes_before_decommit = eviction_state_.deleted_bytes_before_decommit;
825+
eviction_state_.deleted_bytes_before_rss_update += deleted_bytes;
826+
deleted_bytes_before_decommit += deleted_bytes;
827+
828+
if (deleted_bytes_before_decommit >= absl::GetFlag(FLAGS_force_decommit_threshold)) {
829+
// Decommit with force
830+
deleted_bytes_before_decommit = 0;
831+
ServerState::tlocal()->DecommitMemory(ServerState::kAllMemory);
832+
}
833+
}
834+
835+
size_t EngineShard::CalculateEvictionBytes() {
836+
const size_t shards_count = shard_set->size();
837+
const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);
838+
839+
const size_t shard_memory_budget_threshold =
840+
size_t(max_memory_limit * eviction_memory_budget_threshold) / shards_count;
841+
842+
const size_t global_used_memory = used_mem_current.load(memory_order_relaxed);
843+
844+
// Calculate how many bytes we need to evict on this shard
845+
size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory,
846+
shard_memory_budget_threshold);
847+
848+
LOG_IF_EVERY_N(INFO, goal_bytes > 0, 10)
849+
<< "Memory goal bytes: " << goal_bytes << ", used memory: " << global_used_memory
850+
<< ", memory limit: " << max_memory_limit;
851+
852+
// If rss_oom_deny_ratio is set, we should evict depending on rss memory too
853+
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
854+
if (rss_oom_deny_ratio > 0.0) {
855+
const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
856+
/* We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
857+
* memory */
858+
const size_t shard_rss_memory_budget_threshold =
859+
size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;
860+
861+
// Calculate how much rss memory is used by all shards
862+
const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);
863+
864+
auto& global_rss_memory_at_prev_eviction = eviction_state_.global_rss_memory_at_prev_eviction;
865+
auto& deleted_bytes_before_rss_update = eviction_state_.deleted_bytes_before_rss_update;
866+
if (global_used_rss_memory < eviction_state_.global_rss_memory_at_prev_eviction) {
867+
deleted_bytes_before_rss_update -=
868+
std::min(deleted_bytes_before_rss_update,
869+
(global_rss_memory_at_prev_eviction - global_used_rss_memory) / shards_count);
870+
}
871+
872+
global_rss_memory_at_prev_eviction = global_used_rss_memory;
873+
874+
// Try to evict more bytes if we are close to the rss memory limit
875+
const size_t rss_goal_bytes = CalculateHowManyBytesToEvictOnShard(
876+
max_rss_memory, global_used_rss_memory - deleted_bytes_before_rss_update * shards_count,
877+
shard_rss_memory_budget_threshold);
878+
879+
LOG_IF_EVERY_N(INFO, goal_bytes > 0, 10)
880+
<< "Rss memory goal bytes: " << rss_goal_bytes
881+
<< ", rss used memory: " << global_used_rss_memory
882+
<< ", rss memory limit: " << max_rss_memory
883+
<< ", deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update;
884+
885+
goal_bytes = std::max(goal_bytes, rss_goal_bytes);
886+
}
887+
return goal_bytes;
838888
}
839889

840890
void EngineShard::CacheStats() {

src/server/engine_shard.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,12 @@ class EngineShard {
216216
void ResetScanState();
217217
};
218218

219+
struct EvictionTaskState {
220+
size_t deleted_bytes_before_decommit = 0;
221+
size_t deleted_bytes_before_rss_update = 0;
222+
size_t global_rss_memory_at_prev_eviction = 0;
223+
};
224+
219225
EngineShard(util::ProactorBase* pb, mi_heap_t* heap);
220226

221227
// blocks the calling fiber.
@@ -227,6 +233,9 @@ class EngineShard {
227233
void Heartbeat();
228234
void RetireExpiredAndEvict();
229235

236+
/* Calculates the number of bytes to evict based on memory and rss memory usage. */
237+
size_t CalculateEvictionBytes();
238+
230239
void CacheStats();
231240

232241
// We are running a task that checks whether we need to
@@ -267,6 +276,7 @@ class EngineShard {
267276
IntentLock shard_lock_;
268277

269278
uint32_t defrag_task_ = 0;
279+
EvictionTaskState eviction_state_; // Used on eviction fiber
270280
util::fb2::Fiber fiber_heartbeat_periodic_;
271281
util::fb2::Done fiber_heartbeat_periodic_done_;
272282

tests/dragonfly/memory_test.py

Lines changed: 82 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,15 @@
66
from .instance import DflyInstance, DflyInstanceFactory
77

88

9+
def extract_fragmentation_waste(memory_arena):
10+
"""
11+
Extracts the fragmentation waste from the memory arena info.
12+
"""
13+
match = re.search(r"fragmentation waste:\s*([0-9.]+)%", memory_arena)
14+
assert match.group(1) is not None
15+
return float(match.group(1))
16+
17+
918
@pytest.mark.slow
1019
@pytest.mark.opt_only
1120
@pytest.mark.parametrize(
@@ -176,49 +185,101 @@ async def test_eval_with_oom(df_factory: DflyInstanceFactory):
176185
assert rss_before_eval * 1.01 > info["used_memory_rss"]
177186

178187

179-
@pytest.mark.skip("rss eviction disabled")
180188
@pytest.mark.asyncio
181-
@dfly_args(
182-
{
183-
"proactor_threads": 1,
184-
"cache_mode": "true",
185-
"maxmemory": "5gb",
186-
"rss_oom_deny_ratio": 0.8,
187-
"max_eviction_per_heartbeat": 100,
188-
}
189+
@pytest.mark.parametrize(
190+
"proactor_threads_param, maxmemory_param",
191+
[(1, 512 * (1024**2)), (1, 6 * (1024**3)), (4, 6 * (1024**3))],
189192
)
190-
async def test_cache_eviction_with_rss_deny_oom(
191-
async_client: aioredis.Redis,
193+
async def test_cache_eviction_with_rss_deny_oom_simple_case(
194+
df_factory: DflyInstanceFactory,
195+
proactor_threads_param,
196+
maxmemory_param,
192197
):
193198
"""
194199
Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit
195200
"""
201+
df_server = df_factory.create(
202+
proactor_threads=proactor_threads_param,
203+
cache_mode="true",
204+
maxmemory=maxmemory_param,
205+
rss_oom_deny_ratio=0.8,
206+
)
207+
df_server.start()
208+
209+
async_client = df_server.client()
196210

197-
max_memory = 5 * 1024 * 1024 * 1024 # 5G
198-
rss_max_memory = int(max_memory * 0.8)
211+
max_memory = maxmemory_param
212+
rss_oom_deny_ratio = 0.8
213+
eviction_memory_budget_threshold = 0.1 # 10% of max_memory
199214

200-
data_fill_size = int(0.9 * rss_max_memory) # 95% of rss_max_memory
215+
data_fill_size = int((rss_oom_deny_ratio + 0.05) * max_memory) # 85% of max_memory
201216

202217
val_size = 1024 * 5 # 5 kb
203218
num_keys = data_fill_size // val_size
204219

205220
await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", val_size)
206-
# Test that used memory is less than 90% of max memory
221+
222+
# Test that used memory is less than 90% of max memory to not to start eviction based on used_memory
207223
memory_info = await async_client.info("memory")
208224
assert (
209225
memory_info["used_memory"] < max_memory * 0.9
210-
), "Used memory should be less than 90% of max memory."
226+
), "Used memory should be less than 90% of max memory to not to start eviction based on used_memory."
211227
assert (
212-
memory_info["used_memory_rss"] > rss_max_memory * 0.9
213-
), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio)."
228+
memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio
229+
), "Used RSS memory should be more than 80% of rss max memory (max_memory * rss_oom_deny_ratio) to start eviction based on rss memory usage."
214230

215-
# Get RSS memory after creating new connections
231+
# Track eviction stats
216232
memory_info = await async_client.info("memory")
217-
while memory_info["used_memory_rss"] > rss_max_memory * 0.9:
233+
prev_evicted_keys = 0
234+
evicted_keys_repeat_count = 0
235+
while True:
236+
# Wait for some time
218237
await asyncio.sleep(1)
238+
219239
memory_info = await async_client.info("memory")
220240
logging.info(
221-
f'Current rss: {memory_info["used_memory_rss"]}. rss eviction threshold: {rss_max_memory * 0.9}.'
241+
f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {max_memory * (rss_oom_deny_ratio - eviction_memory_budget_threshold)}.'
222242
)
243+
223244
stats_info = await async_client.info("stats")
224245
logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')
246+
247+
# Check if evicted keys are not increasing
248+
if prev_evicted_keys == stats_info["evicted_keys"]:
249+
evicted_keys_repeat_count += 1
250+
else:
251+
prev_evicted_keys = stats_info["evicted_keys"]
252+
evicted_keys_repeat_count = 1
253+
254+
if evicted_keys_repeat_count > 2:
255+
break
256+
257+
# Wait for some time
258+
await asyncio.sleep(2)
259+
260+
memory_arena = await async_client.execute_command("MEMORY", "ARENA")
261+
fragmentation_waste = extract_fragmentation_waste(memory_arena)
262+
logging.info(f"Memory fragmentation waste: {fragmentation_waste}")
263+
assert fragmentation_waste < 12.0, "Memory fragmentation waste should be less than 12%."
264+
265+
# Assert that no more keys are evicted
266+
memory_info = await async_client.info("memory")
267+
stats_info = await async_client.info("stats")
268+
269+
assert memory_info["used_memory"] > max_memory * (
270+
rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08
271+
), "We should not evict all items."
272+
assert memory_info["used_memory"] < max_memory * (
273+
rss_oom_deny_ratio - eviction_memory_budget_threshold
274+
), "Used memory should be smaller than threshold."
275+
assert memory_info["used_memory_rss"] > max_memory * (
276+
rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08
277+
), "We should not evict all items."
278+
279+
evicted_keys = stats_info["evicted_keys"]
280+
# We may evict slightly more than prev_evicted_keys due to gaps in RSS memory usage
281+
assert (
282+
evicted_keys > 0
283+
and evicted_keys >= prev_evicted_keys
284+
and evicted_keys <= prev_evicted_keys * 1.0015
285+
), "We should not evict more items."

0 commit comments

Comments
 (0)