Move tasks to rate-limited queue near rate-limit threshold #2941
Conversation
Code Review
This pull request introduces a mechanism to handle API rate limiting by moving tasks to a rate-limited queue when the number of remaining API calls drops below a threshold. This is achieved by adding a `check_rate_limit_remaining` method to `JobHandler` and calling it from the various handlers.
My review identifies a critical issue in the implementation: `apply_async` is used to re-queue tasks, which would duplicate the task rather than retry it. I've suggested using `celery_task.retry()`, which is the correct way to stop the current task and re-queue it for later execution.
```python
celery_task.apply_async(
    queue=CELERY_TASK_RATE_LIMITED_QUEUE,
    expires=RATE_LIMITED_QUEUE_EXPIRES_SECONDS,
)
```
Using `apply_async` here will duplicate the task when the rate limit is low: it schedules a new task without stopping the current one, which can lead to unexpected behavior and wasted resources.
To correctly re-queue the task for a later attempt, use `celery_task.retry()`. This method raises a `Retry` exception that the Celery worker catches, stopping the current execution and rescheduling the task with the specified options.
Suggested change:
```diff
-celery_task.apply_async(
+celery_task.retry(
     queue=CELERY_TASK_RATE_LIMITED_QUEUE,
     expires=RATE_LIMITED_QUEUE_EXPIRES_SECONDS,
 )
```
Build succeeded. ✔️ pre-commit SUCCESS in 1m 42s
```python
def run(self) -> TaskResults:
    self.check_rate_limit_remaining()
```
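For context, a sketch of what the check could look like on the handler side. Only `check_rate_limit_remaining` and the queue/expiry constant names come from the diff; the threshold, the `_get_remaining_api_calls` helper, and the `celery_task` attribute are assumptions about the handler's internals:

```python
CELERY_TASK_RATE_LIMITED_QUEUE = "rate-limited"  # assumed value
RATE_LIMITED_QUEUE_EXPIRES_SECONDS = 3600        # assumed: "up to an hour"
RATE_LIMIT_THRESHOLD = 50                        # assumed threshold


class JobHandler:
    def __init__(self, celery_task):
        self.celery_task = celery_task  # the bound Celery task running this handler

    def _get_remaining_api_calls(self) -> int:
        """Hypothetical helper: ask the forge how many API calls remain."""
        raise NotImplementedError

    def check_rate_limit_remaining(self) -> None:
        if self._get_remaining_api_calls() < RATE_LIMIT_THRESHOLD:
            # Raises Retry, so nothing after this call runs; the task is
            # re-queued onto the rate-limited queue and expires if still
            # unprocessed after RATE_LIMITED_QUEUE_EXPIRES_SECONDS.
            self.celery_task.retry(
                queue=CELERY_TASK_RATE_LIMITED_QUEUE,
                expires=RATE_LIMITED_QUEUE_EXPIRES_SECONDS,
            )
```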
Wouldn't it be better to call this automatically in the parent class (`JobHandler`)? Or in a subclass (`RateLimitAwareJobHandler`) if some handlers should be excluded?
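As an illustration of that suggestion, a template-method sketch; `run_internal` and the `TaskResults` stand-in are hypothetical names, not packit's actual ones:

```python
TaskResults = dict  # stand-in for packit's actual TaskResults type


class JobHandler:
    def run(self) -> TaskResults:
        # Every handler gets the rate-limit check for free...
        self.check_rate_limit_remaining()
        # ...and only implements the job-specific part.
        return self.run_internal()

    def run_internal(self) -> TaskResults:
        raise NotImplementedError

    def check_rate_limit_remaining(self) -> None:
        ...  # as sketched above
```

Alternatively, `JobHandler` stays unchanged and the check lives in a `RateLimitAwareJobHandler` subclass that only the affected handlers extend.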
OK, I didn't want to refactor the run methods too much, but I can do it.
lbarcziova left a comment
Have you checked whether Celery provides something more native for this kind of use case of scheduling a task for later? Could that be used instead of a separate queue? Or could you explain what the benefit of the separate queue would be?
The idea behind this code is not to delay the execution of a task. I just want to be sure that if any of these tasks are going to block, they will not stop the processing of our short-running tasks. I rely on the retry-after handling in ogr to do the waiting (if needed); this way the tasks will wait for exactly the amount of time they are supposed to, based on the feedback from the service. They will wait in a queue where they can rest for up to an hour (much longer than in our other queues), and they will not stop other tasks from running.
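To make the isolation concrete, here is a minimal sketch of declaring such a dedicated queue; the broker URL and the `short-running` queue name are assumptions about the deployment:

```python
from celery import Celery
from kombu import Queue

app = Celery("packit_worker", broker="redis://localhost:6379/0")

# Declaring the rate-limited queue alongside the regular one keeps the two
# task populations separate at the broker level.
app.conf.task_queues = (
    Queue("short-running"),
    Queue("rate-limited"),
)
```

A worker started with `celery -A packit_worker worker -Q rate-limited` then consumes only that queue, so a task waiting out a retry-after never ties up a slot meant for the short-running tasks.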
Thinking about it now, wouldn't this mess up the ordering? Say a task hits the rate limit and is placed in the queue, and in the meantime another task starts when the rate limit is no longer in effect: it will be executed sooner than the first one. This can add up, leaving "unlucky" tasks unprocessed for a long time, which could be very confusing for users (though perhaps they are equally confused by the current state 😅). And without any prioritization, a task could be unlucky enough to be put back in the queue every time, or multiple times in a row, and users may be tempted to retrigger it, so we should account for that as well.
I'm not sure. I expect that once a project is no longer rate-limited, tasks from the rate-limited queue will be executed as soon as possible, since they'll be among the oldest tasks waiting. The order can absolutely change somewhat, but I think it is better to reshuffle the order than to let tasks die.
You are probably right, let's see how it behaves in practice. 👍
@majamassarini ok I see now, thanks for the explanation! Let's see how it performs like this and adjust accordingly.
Force-pushed from 00456df to 4f11e36
Build succeeded. ✔️ pre-commit SUCCESS in 1m 44s
Related to packit/deployment#681