Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/server/dfly_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,12 @@ Usage: dragonfly [FLAGS]
// export MIMALLOC_VERBOSE=1 to see the options before the override.
mi_option_enable(mi_option_show_errors);
mi_option_set(mi_option_max_warnings, 0);

mi_option_enable(mi_option_purge_decommits);
DCHECK(mi_option_get(mi_option_reset_decommits) == 1);

mi_option_set(mi_option_purge_delay, 0);
DCHECK(!mi_option_get(mi_option_reset_delay));

fb2::SetDefaultStackResource(&fb2::std_malloc_resource, kFiberDefaultStackSize);

Expand Down
50 changes: 35 additions & 15 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,28 +230,32 @@ size_t CalculateEvictionBytes() {
size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory,
shard_memory_budget_threshold);

// TODO: Eviction due to RSS usage is not working well, as it causes eviction
// of too many keys until we finally see a decrease in RSS. We need to improve
// this logic before we enable it.
/*
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
LOG_IF_EVERY_N(INFO, goal_bytes > 0, 50)
<< "Memory goal bytes: " << goal_bytes << ", used memory: " << global_used_memory
<< ", memory limit: " << max_memory_limit;

// If rss_oom_deny_ratio is set, we should evict depending on rss memory too
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
if (rss_oom_deny_ratio > 0.0) {
const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
// We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
memory const size_t shard_rss_memory_budget_threshold =
/* We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
* memory */
const size_t shard_rss_memory_budget_threshold =
size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;

// Calculate how much rss memory is used by all shards
const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);

// Try to evict more bytes if we are close to the rss memory limit
goal_bytes = std::max(
goal_bytes, CalculateHowManyBytesToEvictOnShard(max_rss_memory, global_used_rss_memory,
shard_rss_memory_budget_threshold));
}
*/
const size_t rss_goal_bytes = CalculateHowManyBytesToEvictOnShard(
max_rss_memory, global_used_rss_memory, shard_rss_memory_budget_threshold);

LOG_IF_EVERY_N(INFO, rss_goal_bytes > 0, 50) << "Rss memory goal bytes: " << rss_goal_bytes
<< ", rss used memory: " << global_used_rss_memory
<< ", rss memory limit: " << max_rss_memory;

goal_bytes = std::max(goal_bytes, rss_goal_bytes);
}
return goal_bytes;
}

Expand Down Expand Up @@ -359,6 +363,18 @@ bool EngineShard::DefragTaskState::CheckRequired() {
return false;
}

std::optional<ShardMemUsage> shard_mem_usage;

if (GetFlag(FLAGS_enable_heartbeat_eviction)) {
shard_mem_usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));
const static double eviction_waste_threshold = 0.05;
if (shard_mem_usage->wasted_mem >
(uint64_t(shard_mem_usage->commited * eviction_waste_threshold))) {
VLOG(1) << "memory issue found for memory " << shard_mem_usage.value();
return true;
}
}

const std::size_t global_threshold = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold);
if (global_threshold > rss_mem_current.load(memory_order_relaxed)) {
return false;
Expand All @@ -373,11 +389,15 @@ bool EngineShard::DefragTaskState::CheckRequired() {
}
last_check_time = now;

ShardMemUsage usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));
if (!shard_mem_usage) {
shard_mem_usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));
}

DCHECK(shard_mem_usage.has_value());

const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold);
if (usage.wasted_mem > (uint64_t(usage.commited * waste_threshold))) {
VLOG(1) << "memory issue found for memory " << usage;
if (shard_mem_usage->wasted_mem > (uint64_t(shard_mem_usage->commited * waste_threshold))) {
VLOG(1) << "memory issue found for memory " << shard_mem_usage.value();
return true;
}

Expand Down
220 changes: 199 additions & 21 deletions tests/dragonfly/memory_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@
from .instance import DflyInstance, DflyInstanceFactory


def extract_fragmentation_waste(memory_arena):
    """
    Extract the fragmentation waste percentage from `MEMORY ARENA` output.

    Args:
        memory_arena: string returned by the `MEMORY ARENA` command, expected
            to contain an entry like "fragmentation waste: 3.5%".

    Returns:
        The waste percentage as a float.

    Raises:
        AssertionError: if no "fragmentation waste" entry is present.
    """
    match = re.search(r"fragmentation waste:\s*([0-9.]+)%", memory_arena)
    # re.search returns None when the pattern is absent; the previous check
    # (`match.group(1) is not None`) would raise AttributeError on None instead
    # of failing with a readable assertion. If a match exists, group(1) is
    # always non-None, so asserting on the match object is the correct guard.
    assert match is not None, "no 'fragmentation waste' entry in MEMORY ARENA output"
    return float(match.group(1))


@pytest.mark.slow
@pytest.mark.opt_only
@pytest.mark.parametrize(
Expand Down Expand Up @@ -176,49 +185,218 @@ async def test_eval_with_oom(df_factory: DflyInstanceFactory):
assert rss_before_eval * 1.01 > info["used_memory_rss"]


@pytest.mark.skip("rss eviction disabled")
@pytest.mark.asyncio
@dfly_args(
{
"proactor_threads": 1,
"cache_mode": "true",
"maxmemory": "5gb",
"rss_oom_deny_ratio": 0.8,
"max_eviction_per_heartbeat": 100,
}
@pytest.mark.parametrize(
"proactor_threads_param, maxmemory_param",
[(1, 6 * (1024**3)), (4, 6 * (1024**3))],
)
async def test_cache_eviction_with_rss_deny_oom(
async_client: aioredis.Redis,
async def test_cache_eviction_with_rss_deny_oom_simple_case(
df_factory: DflyInstanceFactory,
proactor_threads_param,
maxmemory_param,
):
"""
Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit
"""
df_server = df_factory.create(
proactor_threads=proactor_threads_param,
cache_mode="true",
maxmemory=maxmemory_param,
rss_oom_deny_ratio=0.8,
)
df_server.start()

max_memory = 5 * 1024 * 1024 * 1024 # 5G
rss_max_memory = int(max_memory * 0.8)
async_client = df_server.client()

data_fill_size = int(0.9 * rss_max_memory)  # 90% of rss_max_memory
max_memory = maxmemory_param
rss_oom_deny_ratio = 0.8
eviction_memory_budget_threshold = 0.1 # 10% of max_memory

rss_eviction_threshold = max_memory * (rss_oom_deny_ratio - eviction_memory_budget_threshold)

data_fill_size = int((rss_oom_deny_ratio + 0.05) * max_memory) # 85% of max_memory

val_size = 1024 * 5 # 5 kb
num_keys = data_fill_size // val_size

await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", val_size)
# Test that used memory is less than 90% of max memory

# Verify that used memory stays below 90% of max memory, so that eviction is not triggered based on used_memory
memory_info = await async_client.info("memory")
assert (
memory_info["used_memory"] < max_memory * 0.9
), "Used memory should be less than 90% of max memory."
), "Used memory should be less than 90% of max memory to not to start eviction based on used_memory."
assert (
memory_info["used_memory_rss"] > rss_max_memory * 0.9
), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio)."
memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio
), "Used RSS memory should be more than 80% of rss max memory (max_memory * rss_oom_deny_ratio) to start eviction based on rss memory usage."

# Get RSS memory after creating new connections
memory_info = await async_client.info("memory")
while memory_info["used_memory_rss"] > rss_max_memory * 0.9:
prev_evicted_keys = 0
evicted_keys_repeat_count = 0
while True:
# Wait for some time
await asyncio.sleep(1)

memory_info = await async_client.info("memory")
logging.info(
f'Current rss: {memory_info["used_memory_rss"]}. rss eviction threshold: {rss_max_memory * 0.9}.'
f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {rss_eviction_threshold}.'
)

stats_info = await async_client.info("stats")
logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')

# Check if evicted keys are not increasing
if prev_evicted_keys == stats_info["evicted_keys"]:
evicted_keys_repeat_count += 1
else:
prev_evicted_keys = stats_info["evicted_keys"]
evicted_keys_repeat_count = 1

if evicted_keys_repeat_count > 2:
break

# Wait for some time
await asyncio.sleep(2)

memory_arena = await async_client.execute_command("MEMORY", "ARENA")
fragmentation_waste = extract_fragmentation_waste(memory_arena)
logging.info(f"Memory fragmentation waste: {fragmentation_waste}")
assert fragmentation_waste < 12.0, "Memory fragmentation waste should be less than 12%."

# Assert that no more keys are evicted
memory_info = await async_client.info("memory")
stats_info = await async_client.info("stats")

assert memory_info["used_memory"] > max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.18
), "We should not evict all items."
assert memory_info["used_memory"] < max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold
), "Used memory should be smaller than threshold."
assert memory_info["used_memory_rss"] > max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.18
), "We should not evict all items."

evicted_keys = stats_info["evicted_keys"]
# We may evict slightly more than prev_evicted_keys due to gaps in RSS memory usage
assert (
evicted_keys > 0
and evicted_keys >= prev_evicted_keys
and evicted_keys <= prev_evicted_keys * 1.0015
), "We should not evict more items."


@pytest.mark.asyncio
@pytest.mark.parametrize(
"proactor_threads_param, maxmemory_param",
[(1, 6 * (1024**3)), (4, 6 * (1024**3))],
)
async def test_cache_eviction_with_rss_deny_oom_two_waves(
df_factory: DflyInstanceFactory, proactor_threads_param, maxmemory_param
):
"""
Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit
It is similar to the test_cache_eviction_with_rss_deny_oom_simple_case but here we have two waves of data filling:
1. First wave fills the instance to 85% of max memory, which is above rss_oom_deny_ratio.
2. Then we wait for eviction to happen based on rss memory usage. After eviction we should have 70% of max memory used.
3. Second wave fills the instance to 90% of max memory, which is above rss_oom_deny_ratio.
4. Second time eviction should happen
"""
df_server = df_factory.create(
proactor_threads=proactor_threads_param,
cache_mode="true",
maxmemory=maxmemory_param,
rss_oom_deny_ratio=0.8,
)
df_server.start()

async_client = df_server.client()

max_memory = maxmemory_param
rss_oom_deny_ratio = 0.8
eviction_memory_budget_threshold = 0.1 # 10% of max_memory

rss_eviction_threshold = max_memory * (rss_oom_deny_ratio - eviction_memory_budget_threshold)

# first wave fills 85% of max memory
# second wave fills 20% of max memory
data_fill_size = [
int((rss_oom_deny_ratio + 0.05) * max_memory),
int((1 - rss_oom_deny_ratio) * max_memory),
]

val_size = 1024 * 5 # 5 kb

for i in range(2):
if i > 0:
await asyncio.sleep(2)

num_keys = data_fill_size[i] // val_size
logging.info(
f"Populating data for wave {i}. Data fill size: {data_fill_size[i]}. Number of keys: {num_keys}."
)
await async_client.execute_command("DEBUG", "POPULATE", num_keys, f"key{i}", val_size)

# Test that used memory is less than 90% of max memory to not to start eviction based on used_memory
memory_info = await async_client.info("memory")
assert (
memory_info["used_memory"] < max_memory * 0.9
), "Used memory should be less than 90% of max memory to not to start eviction based on used_memory."
assert (
memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio
), "Used RSS memory should be more than 80% of rss max memory (max_memory * rss_oom_deny_ratio) to start eviction based on rss memory usage."

memory_info = await async_client.info("memory")
prev_evicted_keys = 0
evicted_keys_repeat_count = 0
while True:
# Wait for some time
await asyncio.sleep(1)

memory_info = await async_client.info("memory")
logging.info(
f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {rss_eviction_threshold}.'
)

stats_info = await async_client.info("stats")
logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')

# Check if evicted keys are not increasing
if prev_evicted_keys == stats_info["evicted_keys"]:
evicted_keys_repeat_count += 1
else:
prev_evicted_keys = stats_info["evicted_keys"]
evicted_keys_repeat_count = 1

if evicted_keys_repeat_count > 2:
break

# Wait for some time
await asyncio.sleep(2)

memory_arena = await async_client.execute_command("MEMORY", "ARENA")
fragmentation_waste = extract_fragmentation_waste(memory_arena)
logging.info(f"Memory fragmentation waste: {fragmentation_waste}")
assert fragmentation_waste < 12.0, "Memory fragmentation waste should be less than 12%."

# Assert that no more keys are evicted
memory_info = await async_client.info("memory")
stats_info = await async_client.info("stats")

assert memory_info["used_memory"] > max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.18
), "We should not evict all items."
assert memory_info["used_memory"] < max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold
), "Used memory should be smaller than threshold."
assert memory_info["used_memory_rss"] > max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.18
), "We should not evict all items."

evicted_keys = stats_info["evicted_keys"]
# We may evict slightly more than prev_evicted_keys due to gaps in RSS memory usage
assert (
evicted_keys > 0
and evicted_keys >= prev_evicted_keys
and evicted_keys <= prev_evicted_keys * 1.0015
), "We should not evict more items."
Loading