4 changes: 2 additions & 2 deletions src/apify/storage_clients/_apify/_models.py
@@ -94,8 +94,8 @@ class CachedRequest(BaseModel):
Only internal structure.
"""

unique_key: str
"""Unique key of the request."""
id: str
"""Id of the request."""

was_already_handled: bool
"""Whether the request was already handled."""
78 changes: 44 additions & 34 deletions src/apify/storage_clients/_apify/_request_queue_shared_client.py
@@ -54,10 +54,10 @@ def __init__(
"""The Apify request queue client for API operations."""

self._queue_head = deque[str]()
"""A deque to store request unique keys in the queue head."""
"""A deque to store request ids in the queue head."""

self._requests_cache: LRUCache[str, CachedRequest] = LRUCache(maxsize=cache_size)
"""A cache to store request objects. Request unique key is used as the cache key."""
"""A cache to store request objects. Request id is used as the cache key."""

self._queue_has_locked_requests: bool | None = None
"""Whether the queue has requests locked by another client."""
@@ -101,12 +101,14 @@ async def add_batch_of_requests(
already_present_requests: list[ProcessedRequest] = []

for request in requests:
if self._requests_cache.get(request.unique_key):
request_id = unique_key_to_request_id(request.unique_key)
if self._requests_cache.get(request_id):
# We are not sure if it was already handled at this point, and it is not worth calling API for it.
# It could have been handled by another client in the meantime, so cached information about
# `request.was_already_handled` is not reliable.
already_present_requests.append(
ProcessedRequest(
id=request_id,
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=request.was_already_handled,
@@ -116,12 +118,13 @@
else:
# Add new request to the cache.
processed_request = ProcessedRequest(
id=request_id,
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=request.was_already_handled,
)
self._cache_request(
request.unique_key,
request_id,
processed_request,
)
new_requests.append(request)
@@ -131,7 +134,6 @@
requests_dict = [
request.model_dump(
by_alias=True,
exclude={'id'}, # Exclude ID fields from requests since the API doesn't accept them.
)
for request in new_requests
]
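
The request id used as the cache key here is computed client-side by `unique_key_to_request_id`, i.e. it is a deterministic function of the request's unique key, which is why it can be derived before (and without) any API call. A minimal sketch of one possible derivation, assuming a SHA-256 digest trimmed to a short alphanumeric id; the real helper is imported from crawlee and may differ in its details:

```python
# Illustrative sketch only; the actual helper is imported, not defined, in this module.
import base64
import hashlib


def unique_key_to_request_id(unique_key: str, *, length: int = 15) -> str:
    """Derive a deterministic, URL-safe request id from a unique key."""
    digest = hashlib.sha256(unique_key.encode('utf-8')).digest()
    encoded = base64.b64encode(digest).decode('ascii')
    # Keep only alphanumeric characters and truncate to the desired length.
    return ''.join(ch for ch in encoded if ch.isalnum())[:length]


# The same unique key always yields the same id, so cache keys stay stable across calls.
assert unique_key_to_request_id('https://example.com') == unique_key_to_request_id('https://example.com')
```
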
@@ -146,7 +148,8 @@

# Remove unprocessed requests from the cache
for unprocessed_request in api_response.unprocessed_requests:
self._requests_cache.pop(unprocessed_request.unique_key, None)
unprocessed_request_id = unique_key_to_request_id(unprocessed_request.unique_key)
self._requests_cache.pop(unprocessed_request_id, None)

else:
api_response = AddRequestsResponse.model_validate(
@@ -179,7 +182,10 @@ async def get_request(self, unique_key: str) -> Request | None:
Returns:
The request or None if not found.
"""
response = await self._api_client.get_request(unique_key_to_request_id(unique_key))
return await self._get_request_by_id(unique_key_to_request_id(unique_key))

async def _get_request_by_id(self, request_id: str) -> Request | None:

Contributor Author:

I am reluctant to follow a formatting rule that is not enforced automatically by a tool. We do not even have a coding style defined, so how does one know which rule to follow? The codebase is not compliant with the rule anyway; cherry-picking it randomly in some PRs is just annoying.

I will be happy to follow the rule as long as it is enforced by a tool.
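
A rule like this can be enforced without relying on an IDE, for example by a small AST check run in CI. Below is a minimal sketch of such a checker (a hypothetical standalone script, not part of this PR or of any existing linter) that flags public methods defined after private ones:

```python
# Hypothetical checker sketch: warn when a public method appears after a private one.
import ast
import sys
from pathlib import Path


def check_method_order(path: str) -> list[str]:
    """Return warnings for classes where public methods follow private ("_"-prefixed) ones."""
    tree = ast.parse(Path(path).read_text(encoding='utf-8'), filename=path)
    warnings = []
    for node in ast.walk(tree):
        if not isinstance(node, ast.ClassDef):
            continue
        seen_private = False
        for item in node.body:
            if not isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
                continue
            # Treat "_name" as private, but ignore dunders like __init__.
            is_private = item.name.startswith('_') and not item.name.startswith('__')
            if is_private:
                seen_private = True
            elif seen_private:
                warnings.append(
                    f'{path}:{item.lineno}: public method "{item.name}" '
                    f'defined after private methods in class "{node.name}"'
                )
    return warnings


if __name__ == '__main__':
    problems = [w for arg in sys.argv[1:] for w in check_method_order(arg)]
    print('\n'.join(problems))
    sys.exit(1 if problems else 0)
```
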

Contributor Author:

Btw, it is common for IDEs to offer customizable method sorting to your liking, for example in PyCharm:
(screenshot: PyCharm method-sorting settings)

Contributor:

Alright, not gonna force you. I'm not familiar with this functionality in VS Code, but anyway, you can't do it in GitHub - the point is that with proper ordering, the public interface is clearer and the whole class more readable. And what bothers me the most is the chaotic alternation between private and public methods.

response = await self._api_client.get_request(request_id)

if response is None:
return None
Expand All @@ -206,32 +212,32 @@ async def fetch_next_request(self) -> Request | None:
return None

# Get the next request ID from the queue head
next_unique_key = self._queue_head.popleft()
next_request_id = self._queue_head.popleft()

request = await self._get_or_hydrate_request(next_unique_key)
request = await self._get_or_hydrate_request(next_request_id)

# Handle potential inconsistency where request might not be in the main table yet
if request is None:
logger.debug(
'Cannot find a request from the beginning of queue, will be retried later',
extra={'nextRequestUniqueKey': next_unique_key},
extra={'nextRequestId': next_request_id},
)
return None

# If the request was already handled, skip it
if request.handled_at is not None:
logger.debug(
'Request fetched from the beginning of queue was already handled',
extra={'nextRequestUniqueKey': next_unique_key},
extra={'nextRequestId': next_request_id},
)
return None

# Use get request to ensure we have the full request object.
request = await self.get_request(request.unique_key)
request = await self._get_request_by_id(next_request_id)
if request is None:
logger.debug(
'Request fetched from the beginning of queue was not found in the RQ',
extra={'nextRequestUniqueKey': next_unique_key},
extra={'nextRequestId': next_request_id},
)
return None

@@ -248,15 +254,17 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
Returns:
Information about the queue operation. `None` if the given request was not in progress.
"""
request_id = unique_key_to_request_id(request.unique_key)
# Set the handled_at timestamp if not already set
if request.handled_at is None:
request.handled_at = datetime.now(tz=timezone.utc)

if cached_request := self._requests_cache[request.unique_key]:
if cached_request := self._requests_cache[request_id]:
cached_request.was_already_handled = request.was_already_handled
try:
# Update the request in the API
processed_request = await self._update_request(request)
processed_request.id = request_id
processed_request.unique_key = request.unique_key

# Update assumed handled count if this wasn't already handled
@@ -265,10 +273,9 @@
self.metadata.pending_request_count -= 1

# Update the cache with the handled request
cache_key = request.unique_key
self._cache_request(
cache_key,
processed_request,
cache_key=request_id,
processed_request=processed_request,
hydrated_request=request,
)
except Exception as exc:
@@ -352,17 +359,17 @@ async def _ensure_head_is_non_empty(self) -> None:
# Fetch requests from the API and populate the queue head
await self._list_head()

async def _get_or_hydrate_request(self, unique_key: str) -> Request | None:
async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
"""Get a request by unique key, either from cache or by fetching from API.

Args:
unique_key: Unique key of the request to get.
request_id: Id of the request to get.

Returns:
The request if found and valid, otherwise None.
"""
# First check if the request is in our cache
cached_entry = self._requests_cache.get(unique_key)
cached_entry = self._requests_cache.get(request_id)

if cached_entry and cached_entry.hydrated:
# If we have the request hydrated in cache, return it
@@ -371,25 +378,25 @@ async def _get_or_hydrate_request(self, unique_key: str) -> Request | None:
# If not in cache or not hydrated, fetch the request
try:
# Fetch the request data
request = await self.get_request(unique_key)
request = await self._get_request_by_id(request_id)

# If the request is not found, return None
if not request:
return None

# Update cache with hydrated request
cache_key = request.unique_key
self._cache_request(
cache_key,
ProcessedRequest(
cache_key=request_id,
processed_request=ProcessedRequest(
id=request_id,
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=request.handled_at is not None,
),
hydrated_request=request,
)
except Exception as exc:
logger.debug(f'Error fetching request {unique_key}: {exc!s}')
logger.debug(f'Error fetching request {request_id}: {exc!s}')
return None
else:
return request
@@ -438,8 +445,8 @@ async def _list_head(
logger.debug(f'Using cached queue head with {len(self._queue_head)} requests')
# Create a list of requests from the cached queue head
items = []
for unique_key in list(self._queue_head)[:limit]:
cached_request = self._requests_cache.get(unique_key)
for request_id in list(self._queue_head)[:limit]:
cached_request = self._requests_cache.get(request_id)
if cached_request and cached_request.hydrated:
items.append(cached_request.hydrated)

@@ -472,32 +479,35 @@

for request_data in response.get('items', []):
request = Request.model_validate(request_data)
request_id = request_data.get('id')

# Skip requests without ID or unique key
if not request.unique_key:
if not request.unique_key or not request_id:
logger.debug(
'Skipping request from queue head, missing unique key',
'Skipping request from queue head, missing unique key or id',
extra={
'unique_key': request.unique_key,
'id': request_id,
},
)
continue

# Cache the request
self._cache_request(
request.unique_key,
request_id,
ProcessedRequest(
id=request_id,
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=False,
),
hydrated_request=request,
)
self._queue_head.append(request.unique_key)
self._queue_head.append(request_id)

for leftover_unique_key in leftover_buffer:
for leftover_id in leftover_buffer:
# After adding new requests to the forefront, any existing leftover locked request is kept at the end.
self._queue_head.append(leftover_unique_key)
self._queue_head.append(leftover_id)
return RequestQueueHead.model_validate(response)
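
The leftover handling noted in the comment above boils down to a simple ordering rule: ids returned by the head listing keep their order, and any locally locked leftover ids end up at the back of the queue head. A tiny illustration (variable contents are illustrative, not the client's actual data):

```python
# Illustration only: fresh head ids first, leftover locked ids re-appended at the end.
from collections import deque

queue_head: deque[str] = deque()
leftover_buffer = ['locked-id']        # assumed: ids carried over from the previous head
for request_id in ['id-1', 'id-2']:    # assumed: ids returned by the API head listing
    queue_head.append(request_id)
for leftover_id in leftover_buffer:
    queue_head.append(leftover_id)

assert list(queue_head) == ['id-1', 'id-2', 'locked-id']
```
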

def _cache_request(
@@ -516,7 +526,7 @@ def _cache_request(
hydrated_request: The hydrated request object, if available.
"""
self._requests_cache[cache_key] = CachedRequest(
unique_key=processed_request.unique_key,
id=processed_request.id,
was_already_handled=processed_request.was_already_handled,
hydrated=hydrated_request,
lock_expires_at=None,