Skip to content

Commit 568a7b1

Browse files
authored
feat: Dynamic memory snapshots (#1715)
### Description - Add `Ratio` type to represent the maximum relative available memory of the system. - Allow to initialize the `Snapshotter.max_memory_size` with either `Ratio` (dynamic memory) or `ByteSize` (fixed memory) - When `Ratio` is used, `Snapshotter` will take into account the current available memory. (Previously, it would take into account only the initial available memory.) **Top level usage in Crawlers:** Fixed memory ``` BasicCrawler(configuration=Configuration(memory_mbytes=1024)) ``` Dynamic memory ``` BasicCrawler(configuration=Configuration(available_memory_ratio=0.5)) ``` ### Issues - Closes: #1704 ### Testing - Unit test ### Checklist - [ ] CI passed
1 parent 0d5796b commit 568a7b1

File tree

4 files changed

+146
-24
lines changed

4 files changed

+146
-24
lines changed

src/crawlee/_autoscaling/_types.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22

33
from dataclasses import dataclass, field
44
from datetime import datetime, timedelta, timezone
5-
from typing import TYPE_CHECKING
5+
from typing import TYPE_CHECKING, Annotated
6+
7+
from pydantic import Field
8+
from pydantic.dataclasses import dataclass as pydantic_dataclass
69

710
if TYPE_CHECKING:
811
from crawlee._utils.byte_size import ByteSize
912

10-
1113
SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD = 0.97
1214

1315

@@ -167,3 +169,10 @@ def is_overloaded(self) -> bool:
167169

168170

169171
Snapshot = MemorySnapshot | CpuSnapshot | EventLoopSnapshot | ClientSnapshot
172+
173+
174+
@pydantic_dataclass
175+
class Ratio:
176+
"""Represents ratio of memory."""
177+
178+
value: Annotated[float, Field(gt=0.0, le=1.0)]

src/crawlee/_autoscaling/snapshotter.py

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,18 @@
33
from __future__ import annotations
44

55
import bisect
6+
import functools
67
from datetime import datetime, timedelta, timezone
78
from logging import getLogger
89
from typing import TYPE_CHECKING, TypeVar, cast
910

1011
from crawlee import service_locator
11-
from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, EventLoopSnapshot, MemorySnapshot, Snapshot
12+
from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, EventLoopSnapshot, MemorySnapshot, Ratio, Snapshot
1213
from crawlee._utils.byte_size import ByteSize
1314
from crawlee._utils.context import ensure_context
1415
from crawlee._utils.docs import docs_group
1516
from crawlee._utils.recurring_task import RecurringTask
16-
from crawlee._utils.system import MemoryInfo, get_memory_info
17+
from crawlee._utils.system import MemoryInfo, MemoryUsageInfo, get_memory_info
1718
from crawlee.events._types import Event, EventSystemInfoData
1819

1920
if TYPE_CHECKING:
@@ -26,6 +27,12 @@
2627
T = TypeVar('T', bound=Snapshot)
2728

2829

30+
@functools.lru_cache
31+
def _warn_once(warning_message: str) -> None:
32+
"""Log a warning message only once."""
33+
logger.warning(warning_message)
34+
35+
2936
class SortedSnapshotList(list[T]):
3037
"""A list that maintains sorted order by `created_at` attribute for snapshot objects."""
3138

@@ -69,7 +76,7 @@ def __init__(
6976
max_used_memory_ratio: float,
7077
max_event_loop_delay: timedelta,
7178
max_client_errors: int,
72-
max_memory_size: ByteSize,
79+
max_memory_size: ByteSize | Ratio,
7380
) -> None:
7481
"""Initialize a new instance.
7582
@@ -85,7 +92,9 @@ def __init__(
8592
value, the event loop is considered overloaded.
8693
max_client_errors: Sets the maximum number of client errors (HTTP 429). When the number of client errors
8794
is higher than the provided number, the client is considered overloaded.
88-
max_memory_size: Sets the maximum amount of system memory to be used by the `AutoscaledPool`.
95+
max_memory_size: Sets the maximum amount of system memory to be used by the `AutoscaledPool`. When of type
96+
`ByteSize` then it is used as fixed memory size. When of type `Ratio` then it allows for dynamic memory
97+
scaling based on the available system memory.
8998
"""
9099
self._max_used_cpu_ratio = max_used_cpu_ratio
91100
self._max_used_memory_ratio = max_used_memory_ratio
@@ -121,7 +130,7 @@ def from_config(cls, config: Configuration | None = None) -> Snapshotter:
121130
max_memory_size = (
122131
ByteSize.from_mb(config.memory_mbytes)
123132
if config.memory_mbytes
124-
else ByteSize(int(get_memory_info().total_size.bytes * config.available_memory_ratio))
133+
else Ratio(value=config.available_memory_ratio)
125134
)
126135

127136
return cls(
@@ -280,23 +289,55 @@ def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
280289
Args:
281290
event_data: System info data from which memory usage is read.
282291
"""
292+
match event_data.memory_info, self._max_memory_size:
293+
case MemoryInfo() as memory_info, Ratio() as ratio:
294+
max_memory_size = memory_info.total_size * ratio.value
295+
system_wide_used_size = memory_info.system_wide_used_size
296+
system_wide_memory_size = memory_info.total_size
297+
298+
case MemoryUsageInfo(), Ratio() as ratio:
299+
# This is just hypothetical case, that will most likely not happen in practice.
300+
# `LocalEventManager` should always provide `MemoryInfo` in the event data.
301+
# When running on Apify, `self._max_memory_size` is always `ByteSize`, not `Ratio`.
302+
_warn_once(
303+
'It is recommended that a custom implementation of `LocalEventManager` emits `SYSTEM_INFO` events '
304+
'with `MemoryInfo` and not just `MemoryUsageInfo`.'
305+
)
306+
max_memory_size = get_memory_info().total_size * ratio.value
307+
system_wide_used_size = None
308+
system_wide_memory_size = None
309+
310+
case MemoryInfo() as memory_info, ByteSize() as byte_size:
311+
max_memory_size = byte_size
312+
system_wide_used_size = memory_info.system_wide_used_size
313+
system_wide_memory_size = memory_info.total_size
314+
315+
case MemoryUsageInfo(), ByteSize() as byte_size:
316+
max_memory_size = byte_size
317+
system_wide_used_size = None
318+
system_wide_memory_size = None
319+
320+
case _, _:
321+
raise NotImplementedError('Unsupported combination of memory info and max memory size types.')
322+
283323
snapshot = MemorySnapshot(
284324
current_size=event_data.memory_info.current_size,
285-
max_memory_size=self._max_memory_size,
325+
max_memory_size=max_memory_size,
286326
max_used_memory_ratio=self._max_used_memory_ratio,
287327
created_at=event_data.memory_info.created_at,
288-
system_wide_used_size=None,
289-
system_wide_memory_size=None,
328+
system_wide_used_size=system_wide_used_size,
329+
system_wide_memory_size=system_wide_memory_size,
290330
)
291331

292-
if isinstance(memory_info := event_data.memory_info, MemoryInfo):
293-
snapshot.system_wide_used_size = memory_info.system_wide_used_size
294-
snapshot.system_wide_memory_size = memory_info.total_size
295-
296332
snapshots = cast('list[Snapshot]', self._memory_snapshots)
297333
self._prune_snapshots(snapshots, snapshot.created_at)
298334
self._memory_snapshots.add(snapshot)
299-
self._evaluate_memory_load(event_data.memory_info.current_size, event_data.memory_info.created_at)
335+
336+
self._evaluate_memory_load(
337+
event_data.memory_info.current_size,
338+
event_data.memory_info.created_at,
339+
max_memory_size=max_memory_size,
340+
)
300341

301342
def _snapshot_event_loop(self) -> None:
302343
"""Capture a snapshot of the current event loop usage.
@@ -364,27 +405,30 @@ def _prune_snapshots(self, snapshots: list[Snapshot], now: datetime) -> None:
364405
else:
365406
snapshots.clear()
366407

367-
def _evaluate_memory_load(self, current_memory_usage_size: ByteSize, snapshot_timestamp: datetime) -> None:
408+
def _evaluate_memory_load(
409+
self, current_memory_usage_size: ByteSize, snapshot_timestamp: datetime, max_memory_size: ByteSize
410+
) -> None:
368411
"""Evaluate and logs critical memory load conditions based on the system information.
369412
370413
Args:
371414
current_memory_usage_size: The current memory usage.
372415
snapshot_timestamp: The time at which the memory snapshot was taken.
416+
max_memory_size: The maximum memory size to be used for evaluation.
373417
"""
374418
# Check if the warning has been logged recently to avoid spamming
375419
if snapshot_timestamp < self._timestamp_of_last_memory_warning + self._MEMORY_WARNING_COOLDOWN_PERIOD:
376420
return
377421

378-
threshold_memory_size = self._max_used_memory_ratio * self._max_memory_size
379-
buffer_memory_size = self._max_memory_size * (1 - self._max_used_memory_ratio) * self._RESERVE_MEMORY_RATIO
422+
threshold_memory_size = self._max_used_memory_ratio * max_memory_size
423+
buffer_memory_size = max_memory_size * (1 - self._max_used_memory_ratio) * self._RESERVE_MEMORY_RATIO
380424
overload_memory_threshold_size = threshold_memory_size + buffer_memory_size
381425

382426
# Log a warning if current memory usage exceeds the critical overload threshold
383427
if current_memory_usage_size > overload_memory_threshold_size:
384-
memory_usage_percentage = round((current_memory_usage_size.bytes / self._max_memory_size.bytes) * 100)
428+
memory_usage_percentage = round((current_memory_usage_size.bytes / max_memory_size.bytes) * 100)
385429
logger.warning(
386430
f'Memory is critically overloaded. Using {current_memory_usage_size} of '
387-
f'{self._max_memory_size} ({memory_usage_percentage}%). '
431+
f'{max_memory_size} ({memory_usage_percentage}%). '
388432
'Consider increasing available memory.'
389433
)
390434
self._timestamp_of_last_memory_warning = snapshot_timestamp

src/crawlee/configuration.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,11 +177,14 @@ class Configuration(BaseSettings):
177177
validation_alias=AliasChoices(
178178
'apify_available_memory_ratio',
179179
'crawlee_available_memory_ratio',
180-
)
180+
),
181+
gt=0.0,
182+
le=1.0,
181183
),
182184
] = 0.25
183185
"""The maximum proportion of system memory to use. If `memory_mbytes` is not provided, this ratio is used to
184-
calculate the maximum memory. This option is utilized by the `Snapshotter`."""
186+
calculate the maximum memory. This option is utilized by the `Snapshotter` and supports the dynamic system memory
187+
scaling."""
185188

186189
storage_dir: Annotated[
187190
str,

tests/unit/_autoscaling/test_snapshotter.py

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,23 @@
33
import asyncio
44
from datetime import datetime, timedelta, timezone
55
from logging import getLogger
6+
from math import floor
67
from typing import TYPE_CHECKING, cast
78
from unittest.mock import MagicMock
89

910
import pytest
1011

1112
from crawlee import service_locator
1213
from crawlee._autoscaling import Snapshotter
13-
from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, MemorySnapshot
14+
from crawlee._autoscaling._types import (
15+
SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD,
16+
ClientSnapshot,
17+
CpuSnapshot,
18+
MemorySnapshot,
19+
)
1420
from crawlee._autoscaling.snapshotter import SortedSnapshotList
1521
from crawlee._utils.byte_size import ByteSize
16-
from crawlee._utils.system import CpuInfo, MemoryInfo
22+
from crawlee._utils.system import CpuInfo, MemoryInfo, get_memory_info
1723
from crawlee.configuration import Configuration
1824
from crawlee.events import LocalEventManager
1925
from crawlee.events._types import Event, EventSystemInfoData
@@ -373,3 +379,63 @@ def test_sorted_snapshot_list_add_maintains_order() -> None:
373379
prev_time = sorted_list[i - 1].created_at
374380
curr_time = snapshot.created_at
375381
assert prev_time <= curr_time, f'Items at indices {i - 1} and {i} are not in chronological order'
382+
383+
384+
@pytest.mark.parametrize('dynamic_memory', [True, False])
385+
async def test_dynamic_memory(
386+
*,
387+
default_cpu_info: CpuInfo,
388+
event_manager: LocalEventManager,
389+
dynamic_memory: bool,
390+
) -> None:
391+
"""Test dynamic memory scaling scenario where the system-wide memory can change.
392+
393+
Create two memory snapshots. They have same memory usage, but different available memory.
394+
First snapshot is created with insufficient memory, so it is overloaded.
395+
Second snapshot is created with sufficient memory.
396+
397+
Based on the Snapshotter configuration, it will either take into account the increased available memory or not.
398+
"""
399+
_initial_memory_info = get_memory_info()
400+
ratio_just_below_system_wide_overload = 0.99 * SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD
401+
402+
memory_mbytes = 0 if dynamic_memory else floor(_initial_memory_info.total_size.to_mb())
403+
404+
service_locator.set_event_manager(event_manager)
405+
406+
async with Snapshotter.from_config(
407+
Configuration(memory_mbytes=memory_mbytes, available_memory_ratio=ratio_just_below_system_wide_overload)
408+
) as snapshotter:
409+
# Default state, memory usage exactly at the overload threshold -> overloaded, but not system-wide overloaded
410+
memory_infos = [
411+
# Overloaded sample
412+
MemoryInfo(
413+
total_size=_initial_memory_info.total_size,
414+
current_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload,
415+
system_wide_used_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload,
416+
),
417+
# Same as first sample, with twice as memory available in the system
418+
MemoryInfo(
419+
total_size=_initial_memory_info.total_size * 2, # Simulate increased total memory
420+
current_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload,
421+
system_wide_used_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload,
422+
),
423+
]
424+
425+
for memory_info in memory_infos:
426+
event_manager.emit(
427+
event=Event.SYSTEM_INFO,
428+
event_data=EventSystemInfoData(
429+
cpu_info=default_cpu_info,
430+
memory_info=memory_info,
431+
),
432+
)
433+
434+
await event_manager.wait_for_all_listeners_to_complete()
435+
436+
memory_samples = snapshotter.get_memory_sample()
437+
assert len(memory_samples) == 2
438+
# First sample will be overloaded.
439+
assert memory_samples[0].is_overloaded
440+
# Second sample can reflect the increased available memory based on the configuration used to create Snapshotter
441+
assert memory_samples[1].is_overloaded == (not dynamic_memory)

0 commit comments

Comments
 (0)