Skip to content

feat: Add eviction based on rss memory #4991

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ ABSL_DECLARE_FLAG(float, mem_defrag_waste_threshold);
ABSL_DECLARE_FLAG(uint32_t, mem_defrag_check_sec_interval);
ABSL_DECLARE_FLAG(std::vector<std::string>, rename_command);
ABSL_DECLARE_FLAG(bool, lua_resp2_legacy_float);
ABSL_DECLARE_FLAG(bool, enable_heartbeat_eviction);
ABSL_DECLARE_FLAG(double, eviction_memory_budget_threshold);
ABSL_DECLARE_FLAG(std::vector<std::string>, command_alias);
ABSL_DECLARE_FLAG(bool, latency_tracking);
Expand Down Expand Up @@ -753,6 +754,8 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) {
absl::SetFlag(&FLAGS_mem_defrag_threshold, 0.0);
absl::SetFlag(&FLAGS_mem_defrag_check_sec_interval, 0);
absl::SetFlag(&FLAGS_mem_defrag_waste_threshold, 0.1);
// We need to disable heartbeat eviction because it enfluences the defragmentation
absl::SetFlag(&FLAGS_enable_heartbeat_eviction, false);

// Fill data into dragonfly and then check if we have
// any location in memory to defrag. See issue #448 for details about this.
Expand Down
145 changes: 99 additions & 46 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ extern "C" {
#include "server/search/doc_index.h"
#include "server/server_state.h"
#include "server/tiered_storage.h"
#include "server/tiering/common.h"
#include "server/transaction.h"
#include "util/fibers/proactor_base.h"

using namespace std;
using namespace ::dfly::tiering::literals;

ABSL_FLAG(float, mem_defrag_threshold, 0.7,
"Minimum percentage of used memory relative to maxmemory cap before running "
Expand Down Expand Up @@ -132,46 +134,6 @@ size_t CalculateHowManyBytesToEvictOnShard(size_t global_memory_limit, size_t gl
return shard_budget < shard_memory_threshold ? (shard_memory_threshold - shard_budget) : 0;
}

/* Calculates the number of bytes to evict based on memory and rss memory usage. */
size_t CalculateEvictionBytes() {
const size_t shards_count = shard_set->size();
const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);

size_t limit = max_memory_limit.load(memory_order_relaxed);
const size_t shard_memory_budget_threshold =
size_t(limit * eviction_memory_budget_threshold) / shards_count;

const size_t global_used_memory = used_mem_current.load(memory_order_relaxed);

// Calculate how many bytes we need to evict on this shard
size_t goal_bytes =
CalculateHowManyBytesToEvictOnShard(limit, global_used_memory, shard_memory_budget_threshold);

// TODO: Eviction due to rss usage is not working well as it causes eviction
// of to many keys untill we finally see decrease in rss. We need to improve
// this logic before we enable it.
/*
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
// If rss_oom_deny_ratio is set, we should evict depending on rss memory too
if (rss_oom_deny_ratio > 0.0) {
const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
// We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
memory const size_t shard_rss_memory_budget_threshold =
size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;

// Calculate how much rss memory is used by all shards
const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);

// Try to evict more bytes if we are close to the rss memory limit
goal_bytes = std::max(
goal_bytes, CalculateHowManyBytesToEvictOnShard(max_rss_memory, global_used_rss_memory,
shard_rss_memory_budget_threshold));
}
*/

return goal_bytes;
}

} // namespace

__thread EngineShard* EngineShard::shard_ = nullptr;
Expand Down Expand Up @@ -249,25 +211,55 @@ bool EngineShard::DefragTaskState::CheckRequired() {
return false;
}

const std::size_t global_threshold = limit * GetFlag(FLAGS_mem_defrag_threshold);
/*
If the eviction is enabled, we want to run the defrag task more frequently and more aggressively.
For global threshold rss we use the rss memory minus the eviction budget threshold and minus 3%
- For example if rss_deny_oom_ratio is 0.8 and eviction_memory_budget_threshold is 0.1,
we will start eviction when rss memory is above 0.8 - 0.1 = 0.7. And defragmentation
should still working if used rss memory is above 0.7 - 0.03 = 0.67.
For defrag interval we use the EvictionTaskState::kMemDefragCheckSecInterval
For waste threshold we use the EvictionTaskState::kEvictionWasteThreshold
*/
const bool is_eviction_enabled = GetFlag(FLAGS_enable_heartbeat_eviction);

const double mem_defrag_threshold_flag = GetFlag(FLAGS_mem_defrag_threshold);
const double defrag_threshold =
!is_eviction_enabled ? mem_defrag_threshold_flag
: std::min(mem_defrag_threshold_flag,
ServerState::tlocal()->rss_oom_deny_ratio -
GetFlag(FLAGS_eviction_memory_budget_threshold) -
EvictionTaskState::kDefragRssMemoryDelta);

const std::size_t global_threshold = limit * defrag_threshold;
if (global_threshold > rss_mem_current.load(memory_order_relaxed)) {
return false;
}

const auto now = time(nullptr);
const auto seconds_from_prev_check = now - last_check_time;
const auto mem_defrag_interval = GetFlag(FLAGS_mem_defrag_check_sec_interval);

const uint32_t check_sec_interval_flag = GetFlag(FLAGS_mem_defrag_check_sec_interval);
const auto mem_defrag_interval =
!is_eviction_enabled
? check_sec_interval_flag
: std::min(check_sec_interval_flag, EvictionTaskState::kDefragCheckSecInterval);

if (seconds_from_prev_check < mem_defrag_interval) {
return false;
}

last_check_time = now;

ShardMemUsage usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));
ShardMemUsage shard_mem_usage =
ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));

const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold);
if (usage.wasted_mem > (uint64_t(usage.commited * waste_threshold))) {
VLOG(1) << "memory issue found for memory " << usage;
const float waste_threshold_flag = GetFlag(FLAGS_mem_defrag_waste_threshold);
const double waste_threshold =
!is_eviction_enabled
? waste_threshold_flag
: std::min(waste_threshold_flag, EvictionTaskState::kDefragWasteThreshold);
if (shard_mem_usage.wasted_mem > (uint64_t(shard_mem_usage.commited * waste_threshold))) {
VLOG(1) << "memory issue found for memory " << shard_mem_usage;
return true;
}

Expand Down Expand Up @@ -696,6 +688,7 @@ void EngineShard::RetireExpiredAndEvict() {
DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();

size_t deleted_bytes = 0;
size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0;

for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {
Expand All @@ -707,6 +700,7 @@ void EngineShard::RetireExpiredAndEvict() {
if (!expt->Empty()) {
DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target);

deleted_bytes += stats.deleted_bytes;
eviction_goal -= std::min(eviction_goal, size_t(stats.deleted_bytes));
counter_[TTL_TRAVERSE].IncBy(stats.traversed);
counter_[TTL_DELETE].IncBy(stats.deleted);
Expand All @@ -728,9 +722,68 @@ void EngineShard::RetireExpiredAndEvict() {
<< " bytes. Max eviction per heartbeat: "
<< GetFlag(FLAGS_max_eviction_per_heartbeat);

deleted_bytes += evicted_bytes;
eviction_goal -= std::min(eviction_goal, evicted_bytes);
}
}

eviction_state_.deleted_bytes_before_rss_update += deleted_bytes;
}

size_t EngineShard::CalculateEvictionBytes() {
const size_t shards_count = shard_set->size();
const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);

size_t limit = max_memory_limit.load(memory_order_relaxed);
const size_t shard_memory_budget_threshold =
size_t(limit * eviction_memory_budget_threshold) / shards_count;

const size_t global_used_memory = used_mem_current.load(memory_order_relaxed);

// Calculate how many bytes we need to evict on this shard
size_t goal_bytes =
CalculateHowManyBytesToEvictOnShard(limit, global_used_memory, shard_memory_budget_threshold);

VLOG_IF_EVERY_N(1, goal_bytes > 0, 50)
<< "Memory goal bytes: " << goal_bytes << ", used memory: " << global_used_memory
<< ", memory limit: " << max_memory_limit;

// If rss_oom_deny_ratio is set, we should evict depending on rss memory too
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
if (rss_oom_deny_ratio > 0.0) {
const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
/* We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
* memory */
const size_t shard_rss_memory_budget_threshold =
size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;

// Calculate how much rss memory is used by all shards
const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);

auto& global_rss_memory_at_prev_eviction = eviction_state_.global_rss_memory_at_prev_eviction;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eviction_state_ is used to track how much memory has already been evicted but not yet freed, meaning it hasn’t reduced rss_memory_usage yet. Once we observe a decrease in rss_memory_usage, we also reduce the corresponding amount of evicted bytes in eviction_state_.

This is necessary because rss_memory_usage updates with a delay, which can cause the eviction process to remove more items than actually needed. As shown in the tests, this approach may over-evict by about 5%, and for larger datastores, this percentage tends to be even lower. In comparison, the basic approach (you can see it here #5218) can over-evict by up to 18%.

auto& deleted_bytes_before_rss_update = eviction_state_.deleted_bytes_before_rss_update;
if (global_used_rss_memory < eviction_state_.global_rss_memory_at_prev_eviction) {
deleted_bytes_before_rss_update -=
std::min(deleted_bytes_before_rss_update,
(global_rss_memory_at_prev_eviction - global_used_rss_memory) / shards_count);
}

global_rss_memory_at_prev_eviction = global_used_rss_memory;

// Try to evict more bytes if we are close to the rss memory limit
const size_t rss_goal_bytes = CalculateHowManyBytesToEvictOnShard(
max_rss_memory, global_used_rss_memory - deleted_bytes_before_rss_update * shards_count,
shard_rss_memory_budget_threshold);

VLOG_IF_EVERY_N(1, rss_goal_bytes > 0, 50)
<< "Rss memory goal bytes: " << rss_goal_bytes
<< ", rss used memory: " << global_used_rss_memory
<< ", rss memory limit: " << max_rss_memory
<< ", deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update;

goal_bytes = std::max(goal_bytes, rss_goal_bytes);
}
return goal_bytes;
}

void EngineShard::CacheStats() {
Expand Down
17 changes: 17 additions & 0 deletions src/server/engine_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,19 @@ class EngineShard {
void ResetScanState();
};

struct EvictionTaskState {
/* This constant are used to control the defragmentation task when eviction is enabled.
The task is run periodically to check whether we need to do memory defragmentation.
When eviction is enabled, we want to make defragment task run more frequently
and also we want to make waste threshold lower to allow more aggressive defragmentation. */
static constexpr uint32_t kDefragCheckSecInterval = 2;
static constexpr float kDefragWasteThreshold = 0.05;
static constexpr double kDefragRssMemoryDelta = 0.03;

size_t deleted_bytes_before_rss_update = 0;
size_t global_rss_memory_at_prev_eviction = 0;
};

EngineShard(util::ProactorBase* pb, mi_heap_t* heap);

// blocks the calling fiber.
Expand All @@ -235,6 +248,9 @@ class EngineShard {
void Heartbeat();
void RetireExpiredAndEvict();

/* Calculates the number of bytes to evict based on memory and rss memory usage. */
size_t CalculateEvictionBytes();

void CacheStats();

// We are running a task that checks whether we need to
Expand Down Expand Up @@ -274,6 +290,7 @@ class EngineShard {
IntentLock shard_lock_;

uint32_t defrag_task_ = 0;
EvictionTaskState eviction_state_; // Used on eviction fiber
util::fb2::Fiber fiber_heartbeat_periodic_;
util::fb2::Done fiber_heartbeat_periodic_done_;

Expand Down
Loading
Loading