Skip to content

Commit 65acd60

Browse files
feat: Add eviction based on rss memory usage (#5791)
This PR introduces eviction based on RSS memory usage. It uses the max_memory and eviction_memory_budget_threshold flags to decide when to evict items: eviction starts when free memory, measured against rss_mem_current, drops below max_memory_limit * eviction_memory_budget_threshold (that is, when rss_mem_current rises above max_memory_limit * (1 - eviction_memory_budget_threshold)). Another important step is the new eviction_state_ in EngineShard. This state tracks how many bytes have already been evicted but not yet freed, meaning they have not yet reduced the rss_mem_current counter. Once we observe a decrease in rss_mem_current, we reduce the tracked evicted bytes in eviction_state_ by the corresponding amount. This is necessary because rss_mem_current updates with a delay, which would otherwise cause the eviction process to remove more items than actually needed. The new `enable_heartbeat_rss_eviction` flag can be used to enable or disable RSS eviction. Closes #4011 Signed-off-by: Stepan Bagritsevich <[email protected]> Co-authored-by: mkaruza <[email protected]>
1 parent b4a5c87 commit 65acd60

File tree

5 files changed

+135
-88
lines changed

5 files changed

+135
-88
lines changed

src/server/engine_shard.cc

Lines changed: 70 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@ ABSL_FLAG(string, tiered_prefix, "",
5454

5555
ABSL_FLAG(bool, enable_heartbeat_eviction, true,
5656
"Enable eviction during heartbeat when memory is under pressure.");
57-
57+
ABSL_FLAG(bool, enable_heartbeat_rss_eviction, true,
58+
"Enable eviction during heartbeat when rss memory is under pressure. Evicition based "
59+
"on used_memory will still be enabled.");
5860
ABSL_FLAG(double, eviction_memory_budget_threshold, 0.1,
5961
"Eviction starts when the free memory (including RSS memory) drops below "
6062
"eviction_memory_budget_threshold * max_memory_limit.");
61-
6263
ABSL_FLAG(bool, background_heartbeat, false, "Whether to run heartbeat as a background fiber");
63-
6464
ABSL_DECLARE_FLAG(uint32_t, max_eviction_per_heartbeat);
6565

6666
namespace dfly {
@@ -116,46 +116,6 @@ size_t CalculateHowManyBytesToEvictOnShard(size_t global_memory_limit, size_t gl
116116
return shard_budget < shard_memory_threshold ? (shard_memory_threshold - shard_budget) : 0;
117117
}
118118

119-
/* Calculates the number of bytes to evict based on memory and rss memory usage. */
120-
size_t CalculateEvictionBytes() {
121-
const size_t shards_count = shard_set->size();
122-
const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);
123-
124-
size_t limit = max_memory_limit.load(memory_order_relaxed);
125-
const size_t shard_memory_budget_threshold =
126-
size_t(limit * eviction_memory_budget_threshold) / shards_count;
127-
128-
const size_t global_used_memory = used_mem_current.load(memory_order_relaxed);
129-
130-
// Calculate how many bytes we need to evict on this shard
131-
size_t goal_bytes =
132-
CalculateHowManyBytesToEvictOnShard(limit, global_used_memory, shard_memory_budget_threshold);
133-
134-
// TODO: Eviction due to rss usage is not working well as it causes eviction
135-
// of to many keys untill we finally see decrease in rss. We need to improve
136-
// this logic before we enable it.
137-
/*
138-
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
139-
// If rss_oom_deny_ratio is set, we should evict depending on rss memory too
140-
if (rss_oom_deny_ratio > 0.0) {
141-
const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
142-
// We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
143-
memory const size_t shard_rss_memory_budget_threshold =
144-
size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;
145-
146-
// Calculate how much rss memory is used by all shards
147-
const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);
148-
149-
// Try to evict more bytes if we are close to the rss memory limit
150-
goal_bytes = std::max(
151-
goal_bytes, CalculateHowManyBytesToEvictOnShard(max_rss_memory, global_used_rss_memory,
152-
shard_rss_memory_budget_threshold));
153-
}
154-
*/
155-
156-
return goal_bytes;
157-
}
158-
159119
} // namespace
160120

161121
__thread EngineShard* EngineShard::shard_ = nullptr;
@@ -265,10 +225,7 @@ bool EngineShard::DefragTaskState::CheckRequired() {
265225
if (res == 0) {
266226
// finished checking.
267227
last_check_time = time(nullptr);
268-
if (finfo.committed != finfo.committed_golden) {
269-
LOG_FIRST_N(ERROR, 100) << "committed memory computed incorrectly: " << finfo.committed
270-
<< " vs " << finfo.committed_golden;
271-
}
228+
272229
const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold);
273230
if (finfo.wasted > size_t(finfo.committed * waste_threshold)) {
274231
VLOG(1) << "memory fragmentation issue found: " << finfo.wasted << " " << finfo.committed;
@@ -425,6 +382,8 @@ void EngineShard::StartPeriodicHeartbeatFiber(util::ProactorBase* pb) {
425382
return;
426383
}
427384
auto heartbeat = [this]() { Heartbeat(); };
385+
386+
eviction_state_.rss_eviction_enabled_ = GetFlag(FLAGS_enable_heartbeat_rss_eviction);
428387
std::chrono::milliseconds period_ms(*cycle_ms);
429388

430389
fb2::Fiber::Opts fb_opts{.priority = absl::GetFlag(FLAGS_background_heartbeat)
@@ -702,6 +661,7 @@ void EngineShard::RetireExpiredAndEvict() {
702661
DbContext db_cntx;
703662
db_cntx.time_now_ms = GetCurrentTimeMs();
704663

664+
size_t deleted_bytes = 0;
705665
size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0;
706666

707667
for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {
@@ -713,6 +673,7 @@ void EngineShard::RetireExpiredAndEvict() {
713673
if (!expt->Empty()) {
714674
DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target);
715675

676+
deleted_bytes += stats.deleted_bytes;
716677
eviction_goal -= std::min(eviction_goal, size_t(stats.deleted_bytes));
717678
counter_[TTL_TRAVERSE].IncBy(stats.traversed);
718679
counter_[TTL_DELETE].IncBy(stats.deleted);
@@ -734,9 +695,71 @@ void EngineShard::RetireExpiredAndEvict() {
734695
<< " bytes. Max eviction per heartbeat: "
735696
<< GetFlag(FLAGS_max_eviction_per_heartbeat);
736697

698+
deleted_bytes += evicted_bytes;
737699
eviction_goal -= std::min(eviction_goal, evicted_bytes);
738700
}
739701
}
702+
703+
eviction_state_.deleted_bytes_before_rss_update += deleted_bytes;
704+
}
705+
706+
size_t EngineShard::CalculateEvictionBytes() {
707+
const size_t shards_count = shard_set->size();
708+
const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);
709+
710+
// Calculate threshold for both used_memory and rss_memory
711+
size_t limit = max_memory_limit.load(memory_order_relaxed);
712+
const size_t shard_memory_budget_threshold =
713+
size_t(limit * eviction_memory_budget_threshold) / shards_count;
714+
715+
const size_t global_used_memory = used_mem_current.load(memory_order_relaxed);
716+
717+
// Calculate how many bytes we need to evict on this shard
718+
size_t goal_bytes =
719+
CalculateHowManyBytesToEvictOnShard(limit, global_used_memory, shard_memory_budget_threshold);
720+
721+
VLOG_IF_EVERY_N(1, goal_bytes > 0, 50)
722+
<< "Used memory goal bytes: " << goal_bytes << ", used memory: " << global_used_memory
723+
<< ", memory limit: " << max_memory_limit;
724+
725+
// Re-read the `enable_heartbeat_rss_eviction` flag since it is dynamic, and reset
726+
// state if flag has changed.
727+
bool rss_eviction_enabled_flag = GetFlag(FLAGS_enable_heartbeat_rss_eviction);
728+
if (eviction_state_.rss_eviction_enabled_ != rss_eviction_enabled_flag) {
729+
eviction_state_.global_rss_memory_at_prev_eviction =
730+
eviction_state_.deleted_bytes_before_rss_update = 0;
731+
eviction_state_.rss_eviction_enabled_ = rss_eviction_enabled_flag;
732+
}
733+
if (eviction_state_.rss_eviction_enabled_) {
734+
// Calculate how much rss memory is used by all shards
735+
const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);
736+
auto& global_rss_memory_at_prev_eviction = eviction_state_.global_rss_memory_at_prev_eviction;
737+
auto& deleted_bytes_before_rss_update = eviction_state_.deleted_bytes_before_rss_update;
738+
if (global_used_rss_memory < eviction_state_.global_rss_memory_at_prev_eviction) {
739+
auto decrease_delete_bytes_before_rss_update =
740+
std::min(deleted_bytes_before_rss_update,
741+
(global_rss_memory_at_prev_eviction - global_used_rss_memory) / shards_count);
742+
VLOG_EVERY_N(1, 50) << "deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update
743+
<< " decrease_delete_bytes_before_rss_update: "
744+
<< decrease_delete_bytes_before_rss_update;
745+
deleted_bytes_before_rss_update -= decrease_delete_bytes_before_rss_update;
746+
}
747+
748+
global_rss_memory_at_prev_eviction = global_used_rss_memory;
749+
750+
// Try to evict more bytes if we are close to the rss memory limit
751+
const size_t rss_goal_bytes = CalculateHowManyBytesToEvictOnShard(
752+
limit, global_used_rss_memory - deleted_bytes_before_rss_update * shards_count,
753+
shard_memory_budget_threshold);
754+
755+
VLOG_IF_EVERY_N(1, rss_goal_bytes > 0, 50)
756+
<< "Rss memory goal bytes: " << rss_goal_bytes
757+
<< ", rss used memory: " << global_used_rss_memory << ", rss memory limit: " << limit
758+
<< ", deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update;
759+
760+
goal_bytes = std::max(goal_bytes, rss_goal_bytes);
761+
}
762+
return goal_bytes;
740763
}
741764

742765
void EngineShard::CacheStats() {

src/server/engine_shard.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,12 @@ class EngineShard {
225225
void ResetScanState();
226226
};
227227

228+
struct EvictionTaskState {
229+
bool rss_eviction_enabled_ = true;
230+
size_t deleted_bytes_before_rss_update = 0;
231+
size_t global_rss_memory_at_prev_eviction = 0;
232+
};
233+
228234
EngineShard(util::ProactorBase* pb, mi_heap_t* heap);
229235

230236
// blocks the calling fiber.
@@ -236,6 +242,9 @@ class EngineShard {
236242
void Heartbeat();
237243
void RetireExpiredAndEvict();
238244

245+
/* Calculates the number of bytes to evict based on memory and rss memory usage. */
246+
size_t CalculateEvictionBytes();
247+
239248
void CacheStats();
240249

241250
// We are running a task that checks whether we need to
@@ -275,6 +284,7 @@ class EngineShard {
275284
IntentLock shard_lock_;
276285

277286
uint32_t defrag_task_ = 0;
287+
EvictionTaskState eviction_state_; // Used on eviction fiber
278288
util::fb2::Fiber fiber_heartbeat_periodic_;
279289
util::fb2::Done fiber_heartbeat_periodic_done_;
280290

src/server/main_service.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -916,6 +916,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
916916
config_registry.Register("dbnum"); // equivalent to databases in redis.
917917
config_registry.Register("dir");
918918
config_registry.RegisterMutable("enable_heartbeat_eviction");
919+
config_registry.RegisterMutable("enable_heartbeat_rss_eviction");
919920
config_registry.RegisterMutable("masterauth");
920921
config_registry.RegisterMutable("masteruser");
921922
config_registry.RegisterMutable("max_eviction_per_heartbeat");

src/server/test_utils.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);
3232
ABSL_DECLARE_FLAG(uint32_t, num_shards);
3333
ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to run tests");
3434
ABSL_DECLARE_FLAG(uint32_t, acllog_max_len);
35+
ABSL_DECLARE_FLAG(bool, enable_heartbeat_rss_eviction);
36+
3537
namespace dfly {
3638

3739
namespace {
@@ -185,6 +187,8 @@ void BaseFamilyTest::SetUpTestSuite() {
185187

186188
absl::SetFlag(&FLAGS_rss_oom_deny_ratio, -1);
187189
absl::SetFlag(&FLAGS_dbfilename, "");
190+
// We don't want rss eviction
191+
absl::SetFlag(&FLAGS_enable_heartbeat_rss_eviction, false);
188192

189193
static bool init = true;
190194
if (exchange(init, false)) {

tests/dragonfly/memory_test.py

Lines changed: 50 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -186,52 +186,61 @@ async def test_eval_with_oom(df_factory: DflyInstanceFactory):
186186
assert rss_before_eval * 1.01 > info["used_memory_rss"]
187187

188188

189-
@pytest.mark.skip("rss eviction disabled")
190-
@pytest.mark.asyncio
191-
@dfly_args(
192-
{
193-
"proactor_threads": 1,
194-
"cache_mode": "true",
195-
"maxmemory": "5gb",
196-
"rss_oom_deny_ratio": 0.8,
197-
"max_eviction_per_heartbeat": 100,
198-
}
199-
)
200-
async def test_cache_eviction_with_rss_deny_oom(
201-
async_client: aioredis.Redis,
202-
):
203-
"""
204-
Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit
205-
"""
206-
207-
max_memory = 5 * 1024 * 1024 * 1024 # 5G
208-
rss_max_memory = int(max_memory * 0.8)
189+
@pytest.mark.parametrize("heartbeat_rss_eviction", [True, False])
190+
async def test_eviction_on_rss_treshold(df_factory: DflyInstanceFactory, heartbeat_rss_eviction):
191+
max_memory = 1024 * 1024**2 # 1024mb
192+
193+
df_server = df_factory.create(
194+
proactor_threads=3,
195+
cache_mode="yes",
196+
maxmemory=max_memory,
197+
enable_heartbeat_eviction="false",
198+
enable_heartbeat_rss_eviction=heartbeat_rss_eviction,
199+
)
200+
df_server.start()
201+
client = df_server.client()
209202

210-
data_fill_size = int(0.9 * rss_max_memory) # 95% of rss_max_memory
203+
data_fill_size = int(0.70 * max_memory) # 70% of max_memory
211204

212205
val_size = 1024 * 5 # 5 kb
213206
num_keys = data_fill_size // val_size
214207

215-
await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", val_size)
216-
# Test that used memory is less than 90% of max memory
217-
memory_info = await async_client.info("memory")
218-
assert (
219-
memory_info["used_memory"] < max_memory * 0.9
220-
), "Used memory should be less than 90% of max memory."
221-
assert (
222-
memory_info["used_memory_rss"] > rss_max_memory * 0.9
223-
), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio)."
224-
225-
# Get RSS memory after creating new connections
226-
memory_info = await async_client.info("memory")
227-
while memory_info["used_memory_rss"] > rss_max_memory * 0.9:
228-
await asyncio.sleep(1)
229-
memory_info = await async_client.info("memory")
230-
logging.info(
231-
f'Current rss: {memory_info["used_memory_rss"]}. rss eviction threshold: {rss_max_memory * 0.9}.'
232-
)
233-
stats_info = await async_client.info("stats")
234-
logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')
208+
await client.execute_command("DEBUG", "POPULATE", num_keys, "key", val_size)
209+
210+
# Create huge list which can be used with LRANGE to increase RSS memory only
211+
for name in ["list_1", "list_2"]:
212+
for i in range(1, 1000):
213+
rand_str = "".join(random.choices(string.ascii_letters, k=val_size))
214+
await client.execute_command(f"LPUSH {name} {rand_str}")
215+
216+
# Make them STICK so we don't evict them
217+
await client.execute_command(f"STICK list_1")
218+
await client.execute_command(f"STICK list_2")
219+
220+
await client.execute_command("CONFIG SET enable_heartbeat_eviction true")
221+
222+
memory_info_before = await client.info("memory")
223+
224+
# This will increase only RSS memory above the threshold
225+
p = client.pipeline()
226+
for _ in range(50):
227+
p.execute_command("LRANGE list_1 0 -1")
228+
p.execute_command("LRANGE list_2 0 -1")
229+
await p.execute()
230+
231+
# Wait for some time
232+
await asyncio.sleep(3)
233+
memory_info_after = await client.info("memory")
234+
stats_info_after = await client.info("stats")
235+
236+
if heartbeat_rss_eviction:
237+
# We should see used memory decrease and a non-zero number of evicted keys
238+
assert memory_info_after["used_memory"] < memory_info_before["used_memory"]
239+
assert stats_info_after["evicted_keys"]
240+
else:
241+
# If heartbeat rss eviction is disabled there should be no change
242+
assert memory_info_after["used_memory"] == memory_info_before["used_memory"]
243+
assert stats_info_after["evicted_keys"] == 0
235244

236245

237246
@pytest.mark.asyncio

0 commit comments

Comments
 (0)