Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packit_service/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@
}

# Default Celery queue for short-running tasks.
CELERY_TASK_DEFAULT_QUEUE = "short-running"
# Queue used to re-enqueue tasks when the remaining API rate limit
# towards the git forge is low.
CELERY_TASK_RATE_LIMITED_QUEUE = "rate-limited"
# Minimum number of remaining API requests; below this threshold tasks
# are moved to the rate-limited queue.
RATE_LIMIT_THRESHOLD = 200
# Jobs in rate-limited queue expire after 1 hour
RATE_LIMITED_QUEUE_EXPIRES_SECONDS = 3600

CELERY_DEFAULT_MAIN_TASK_NAME = "task.steve_jobs.process_message"

Expand Down
36 changes: 36 additions & 0 deletions packit_service/worker/handlers/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
from packit.constants import DATETIME_FORMAT

from packit_service.config import ServiceConfig
from packit_service.constants import (
CELERY_TASK_RATE_LIMITED_QUEUE,
RATE_LIMIT_THRESHOLD,
RATE_LIMITED_QUEUE_EXPIRES_SECONDS,
)
from packit_service.events.event import Event
from packit_service.events.event_data import EventData
from packit_service.models import (
Expand Down Expand Up @@ -457,6 +462,37 @@ def get_signature(cls, event: Event, job: Optional[JobConfig]) -> Signature:
def run(self) -> TaskResults:
raise NotImplementedError("This should have been implemented.")

def check_rate_limit_remaining(self) -> None:
    """
    Check the remaining API rate limit towards the service.

    To be used when running in a task context. If the remaining limit is
    below ``RATE_LIMIT_THRESHOLD``, re-queue the currently executing task
    to the rate-limited queue via Celery's retry mechanism.

    Raises:
        celery.exceptions.Retry: when the task is re-queued; the worker
            catches it, stops the current execution and reschedules.
    """
    from packit_service.celerizer import celery_app

    # Get the current executing task from the worker context.
    # This is needed because handlers can be created in different contexts:
    # - Regular handlers: created in task functions with celery_task=self
    # - Babysit handlers: created to generate signatures, then executed in new tasks
    # When check_rate_limit_remaining() is called during execution, we need the
    # actual task that's currently running, not the one that created the handler.
    celery_task = celery_app.current_worker_task
    if not celery_task:
        logger.warning("No current task found, skipping rate limit check.")
        return

    remaining = self.project.get_rate_limit_remaining() if self.project else None
    # `is not None` (not plain truthiness): a fully exhausted limit of 0
    # is the most critical case and must not be treated as "unavailable".
    if remaining is not None and remaining < RATE_LIMIT_THRESHOLD:
        logger.warning(
            f"Rate limit remaining is low: {remaining}, "
            "enqueuing task to the rate-limited queue."
        )
        # Use retry() rather than apply_async(): apply_async() would schedule
        # a duplicate task while the current one keeps running. retry() raises
        # a Retry exception that the worker catches, stopping the current
        # execution and rescheduling the task with the given options.
        celery_task.retry(
            queue=CELERY_TASK_RATE_LIMITED_QUEUE,
            expires=RATE_LIMITED_QUEUE_EXPIRES_SECONDS,
        )
    else:
        logger.info(f"Rate limit remaining is high or not available: {remaining}.")


class RetriableJobHandler(JobHandler):
def __init__(
Expand Down
3 changes: 3 additions & 0 deletions packit_service/worker/handlers/copr.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def get_checkers() -> tuple[type[Checker], ...]:
)

def run(self) -> TaskResults:
self.check_rate_limit_remaining()
Copy link
Member

@nforro nforro Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to call this automatically in the parent class (JobHandler)? Or a subclass (RateLimitAwareJobHandler) if some handlers should be excluded?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I didn't want to refactor the run methods too much, but I can do it.

# [XXX] For now cancel only when an environment variable is defined,
# should allow for less stressful testing and also optionally turning
# the cancelling on-and-off on the prod
Expand Down Expand Up @@ -173,6 +174,7 @@ def set_logs_url(self):
self.build.set_build_logs_url(copr_build_logs)

def run(self):
self.check_rate_limit_remaining()
run_start_time = datetime.now(timezone.utc)
if not self.build:
model = "SRPMBuildDB" if self.copr_event.chroot == COPR_SRPM_CHROOT else "CoprBuildDB"
Expand Down Expand Up @@ -299,6 +301,7 @@ def set_built_packages(self):
self.build.set_built_packages(built_packages)

def run(self):
self.check_rate_limit_remaining()
run_start_time = datetime.now(timezone.utc)
logger.info(
f"[CELERY_EXEC] CoprBuildEndHandler execution started for "
Expand Down
2 changes: 2 additions & 0 deletions packit_service/worker/handlers/open_scan_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def get_issues_added_url(
return f"{openscanhub_url}/task/{self.event.task_id}/log/added.{file_format}"

def run(self) -> TaskResults:
self.check_rate_limit_remaining()
self.check_scan_and_build()
external_links = {"OpenScanHub task": self.event.scan.url}
if self.event.status == openscanhub.task.Status.success:
Expand Down Expand Up @@ -208,6 +209,7 @@ def __init__(self, **kwargs):
self.event: openscanhub.task.Started = self.data.to_event()

def run(self) -> TaskResults:
self.check_rate_limit_remaining()
self.check_scan_and_build()

state = BaseCommitStatus.running
Expand Down
2 changes: 2 additions & 0 deletions packit_service/worker/handlers/testing_farm.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ def run_for_target(
failed[test_run.target] = result.get("details")

def run(self) -> TaskResults:
self.check_rate_limit_remaining()
# TODO: once we turn handlers into respective celery tasks, we should iterate
# here over *all* matching jobs and do them all, not just the first one
logger.debug(f"Test job config: {self.job_config}")
Expand Down Expand Up @@ -548,6 +549,7 @@ def db_project_event(self) -> Optional[ProjectEventModel]:
return self._db_project_event

def run(self) -> TaskResults:
self.check_rate_limit_remaining()
logger.debug(f"Testing farm {self.pipeline_id} result:\n{self.result}")

test_run_model = TFTTestRunTargetModel.get_by_pipeline_id(
Expand Down
Loading