-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
feat: Add taskworker usage guide content #14900
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 4 commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
8c8a9b9
feat: Add taskworker usage guide content
markstory b846d7f
Apply suggestions from code review
markstory dc51aae
Fix incorrect method usage.
markstory db6d94b
Add processing deadline details to docs
markstory cebdf09
feat: Add taskworker self-hosted docs (#14913)
markstory 3adce90
Update develop-docs/backend/application-domains/tasks/index.mdx
markstory 3ec792e
Remove duplicate content.
markstory 38e6e12
Fix running taskbroker instructions
markstory File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
286 changes: 286 additions & 0 deletions
286
develop-docs/backend/application-domains/tasks/index.mdx
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,286 @@ | ||
| --- | ||
| title: Asynchronous Tasks | ||
| sidebar_order: 70 | ||
| --- | ||
|
|
||
| Sentry includes a framework for scheduling and executing tasks that are executed | ||
| asynchronously in a background worker process. The task framework within Sentry | ||
| is inspired by [Celery](https://docs.celeryproject.org) as it was originally | ||
| built with celery. | ||
|
|
||
| # Defining Tasks | ||
|
|
||
| Sentry tasks are configured with the `instrumented_task` decorator that | ||
| includes features like automatic tracing and metric collection, and multi-region | ||
| silo enforcement. | ||
|
|
||
| ```python | ||
| from sentry.tasks.base import instrumented_task | ||
| from sentry.taskworker.namespaces import issues_tasks | ||
| from sentry.taskworker.retry import Retry | ||
|
|
||
| @instrumented_task( | ||
| name="sentry.widgets.tasks.do_work", | ||
| namespace=issues_tasks, | ||
| retry=Retry(times=3, on=(ConnectionError,)), | ||
| processing_deadline_duration=60 | ||
| ) | ||
| def do_work(organization_id: int, issue_id: int, **kwargs) -> None: | ||
| ... | ||
| ``` | ||
|
|
||
| When defining tasks there are some constraints: | ||
|
|
||
| - All tasks _must_ have names. The name of task is serialized into an | ||
| 'activation' message that is persisted in Kafka. Task names must be stable | ||
| between deployments to avoid lost tasks. | ||
|
|
||
| - Task parameters _must_ be JSON serializable. You cannot pass arbitrary python | ||
| objects through task parameters. | ||
|
|
||
| - Tasks _must_ define a 'processing deadline'. After a task's processing | ||
| deadline has elapsed, it will be killed by the worker runtime. Tasks that are | ||
| killed for execution duration are not automatically retried. | ||
|
|
||
| - All tasks must be assigned to a 'namespace'. A namespace is a group of related | ||
| tasks that are operated together and share a backlog. | ||
|
|
||
| - The return value of a task is not stored and ignored by workers. | ||
|
|
||
| - The module containing a task _must_ be added to `TASKWORKER_IMPORTS` in | ||
| `src/sentry/conf/server.py` | ||
|
|
||
| ## Scheduling Tasks | ||
|
|
||
| With our task defined we can schedule a task (also called an "activation"): | ||
|
|
||
| ```python | ||
| from sentry.widgets.tasks import do_work | ||
|
|
||
| # Can call the task synchronously like a normal function | ||
| do_work(organization_id=org.id, issue_id=issue.id) | ||
|
|
||
| # Use .delay() to schedule a task to run in the future as soon as possible | ||
| do_work.delay(organization_id=org.id, issue_id=issue.id) | ||
|
|
||
| # Use .apply_async() when you need to define headers, countdown, or expires | ||
| # for your task. Here we schedule a task to run in 5 minutes (300 seconds) | ||
| do_work.apply_async( | ||
| kwargs={"organization_id": org.id, "issue_id": issue.id}, | ||
| countdown=300 | ||
| ) | ||
| ``` | ||
|
|
||
| When tasks are executed, the parameter payload is deserialized, and the task | ||
| function is called. Tasks are successfully completed if they don't raise an | ||
| error. If an error is raised from a task, or the task's deadline expires, the | ||
| task is considered a failure and needs to be retried, put into a dead-letter | ||
| queue or dropped depending on the task and failure. | ||
|
|
||
| ## Retries | ||
|
|
||
| When defining tasks, you can define a retry policy with the `retry` parameter. | ||
| When a worker executes an activation with a retry policy, any non-successful | ||
| outcome will result in the retry policy being evaluated. If the task has retries | ||
| remaining, and the captured error is a retriable error, the worker sends | ||
| a status of retry to the worker's broke. The taskbroker will take care of marking | ||
| the current activation as complete and producing a new activation to be | ||
| processed later. | ||
|
|
||
| If a task does not define a retry policy the retry policy of the task namespace | ||
| is inherited. | ||
|
|
||
| ```python | ||
| @instrumented_task( | ||
| name="sentry.issues.tasks.deliver_issue_webhook", | ||
| namespace=issues_tasks, | ||
| retry=Retry(times=3, times_exceeded=LastAction.Deadletter), | ||
| ) | ||
| def deliver_issue_webhook(organization_id: int, group_id: int) -> None: | ||
| ... | ||
| ``` | ||
|
|
||
| ### Conditional Retries | ||
|
|
||
| Retries can be conditional based on the exception type: | ||
|
|
||
| ```python | ||
| retry=Retry(on=(IntegrationError,), times=3, times_exceeded=LastAction.Discard) | ||
| ``` | ||
|
|
||
| ### Retry delays | ||
|
|
||
| By default retries will be executed as soon as they are consumed. If a task | ||
| needs to stagger retries, it can use a delayed retry. | ||
|
|
||
| ```python | ||
| @instrumented_task( | ||
| name="sentry.integrations.fetch_commits", | ||
| namespace=issues_tasks, | ||
| retry=Retry(times=3, on=(IntegrationError, ), delay=30) | ||
| ) | ||
| def fetch_commits(repository_id: int) -> None: | ||
| ... | ||
| ``` | ||
|
|
||
| With the above configuration, each retry will be processed at least 30 seconds | ||
| after the previous attempt. The delay between retries could be longer than 30 | ||
| seconds, but won’t be shorter. | ||
|
|
||
| ## Processing Deadlines | ||
|
|
||
| Every task has a 'processing deadline' which is the maximum expected runtime for a task. If | ||
| a task does not define a processing deadline, it will inherit the deadline | ||
| defined on the task's namespace or use the default of **10 seconds**. Task | ||
| deadlines are intended to be generous and are intended to prevent workers being | ||
| saturated by tasks running for unbounded amounts of time. | ||
|
|
||
| ```python | ||
| @instrumented_task( | ||
| name="sentry.integrations.fetch_commits", | ||
| namespace=issues_tasks, | ||
| # Extended from the default 10 | ||
| processing_deadline_duration=60 | ||
| ) | ||
| def fetch_commits(repository_id: int) -> None: | ||
| ... | ||
| ``` | ||
|
|
||
| After this a task has run for the length of its processing deadline, it will be | ||
| interrupted by `SIGALRM` which raises a `ProcessingDeadlineExceeded` error which | ||
| will interrupt your task’s logic. | ||
|
|
||
| ### Resolving deadline issues | ||
|
|
||
| In most scenarios the simplest solution is to extend the deadline for a task. | ||
| This is the recommended solution until you get above 20min of duration. After | ||
| this duration the chances of your task being terminated by a deploy increase | ||
| quickly. Instead of extending the deadline further, you should rethink your | ||
| logic and partition the workload into smaller batches, or individual jobs that | ||
| can be processed independently. Instead of mapping all projects in a single | ||
| task, spawn multiple tasks. | ||
|
|
||
| ## Expiration deadlines | ||
|
|
||
| A task's expiration time defines a point in time after which a task is | ||
| considered expired and should not be executed. This mechanism allows tasks to be | ||
| skipped if they are stale and their results are no longer relevant. | ||
|
|
||
| ```python | ||
| @instrumented_tasks( | ||
| name="sentry.issues.tasks.deliver_issue_webhook", | ||
| namespace=issues_tasks, | ||
| expires=timedelta("5 minutes"), | ||
| ) | ||
| def deliver_issue_webhook(organization_id: int, group_id: int): | ||
| ... | ||
| ``` | ||
|
|
||
| Expiration times can be expressed as `timedelta` objects or a number of seconds. | ||
| Tasks that are past their expiration will not be sent to workers. Instead they | ||
| will be discarded or dead-lettered depending on task configuration. | ||
|
|
||
| ## Future schedules | ||
|
|
||
| Tasks can be scheduled to be run up to an hour in the future with the | ||
| `countdown` parameter. | ||
|
|
||
| ```jsx | ||
| deliver_issue_webhook.apply_async(countdown=timedelta(minutes=10)) | ||
| ``` | ||
|
|
||
| Countdown tasks will be processed and retained by taskbroker until their | ||
| countdown has elapsed. Once the countdown delay has elapsed the task will be | ||
| made available for workers. | ||
|
|
||
| ## Idempotency (at_most_once) | ||
|
|
||
| Tasks are processed with at-least-once guarantees. A task may be attempted | ||
| multiple times if processing deadlines are exceeded. To prevent multiple | ||
| executions, tasks can enable `at_most_once` which enables at-most-once | ||
| execution. | ||
|
|
||
| ```python | ||
| @instrumented_task( | ||
| name="sentry.issues.tasks.deliver_issue_webhook", | ||
| namespace=issues_tasks, | ||
| at_most_once=True, | ||
| ) | ||
| def deliver_issue_webhook(organization_id: int, group_id: int) -> None: | ||
| ... | ||
|
|
||
| ``` | ||
|
|
||
| If an idempotent task exceeds a processing deadline, it will *not* be retried. | ||
|
|
||
| # Testing Tasks | ||
|
|
||
| Tasks can be tested with a few different approaches. The first is with the | ||
| `self.tasks()` or `TaskRunner` context manager. When these context managers are | ||
| entered, tasks will be executed *synchronously* which allows you to validate the | ||
| side-effects of your tasks and validate that parameters to your task are JSON | ||
| compatible: | ||
|
|
||
| ```python | ||
| def test_action_with_tasks(self): | ||
| with self.tasks(): | ||
| self.client.get("/organizations/slug/do-thing/") | ||
| # can make assertions on side-effects of tasks spawned by the endpoint. | ||
| ``` | ||
|
|
||
| Tasks can also be tested with `mock.patch` : | ||
|
|
||
| ```python | ||
| @patch("sentry.hybridcloud.tasks.deliver_webhooks.drain_mailbox") | ||
| def test_schedule_task(self, mock_deliver: MagicMock) -> None: | ||
| # Do work to trigger the task | ||
| # Assert that the task was scheduled | ||
| mock_deliver.delay.assert_called_with(webhook_one.id) | ||
| ``` | ||
|
|
||
| <Alert type="warning"> | ||
| Mocking tasks will not validate that parameters are JSON compatible, nor will it catch TypeErrors from signature mismatches. | ||
| </Alert> | ||
|
|
||
| # Task namespaces | ||
|
|
||
| Task namespaces are created as code, and configuration are linked to the | ||
| namespace when it is declared. | ||
|
|
||
| ```python | ||
| # in sentry.taskworker.namespaces | ||
| from sentry.taskworker.config import taskregistry | ||
| from sentry.taskworker.retry import LastAction, Retry | ||
|
|
||
| issues_tasks = taskregistry.create_namespace( | ||
| "issues", | ||
| retry=Retry(times=3, times_exceeded=LastAction.Discard) | ||
| ) | ||
|
|
||
| # register tasks within a namespace | ||
| @instrumented_task(name="tasks.do_work", namespace=issues_tasks) | ||
| def do_work(**kwargs): | ||
| ... | ||
| ``` | ||
|
|
||
| Namespaces can define default behaviour for `retry` , `processing_deadline` and | ||
| `expires` for the tasks they contain. Without explicit routing, any namespace | ||
| will be run in our `default` worker pools. If your task namespace will be | ||
| high-throughput (more than 1500 tasks per second) consider provisioning | ||
| a dedicated pool for your tasks. | ||
|
|
||
| # Periodically Scheduled Tasks | ||
|
|
||
| Task can also be set to spawn on a periodic schedule. To accomplish this, simply | ||
| update the `TASKWORKER_SCHEDULE` configuration found in | ||
| `src/sentry/conf/server.py` with the appropriate namespace, task, and schedule. | ||
| Taskworker supports `timedelta` and `crontab` schedule types: | ||
|
|
||
| ```python | ||
| TASKWORKER_REGION_SCHEDULES: ScheduleConfigMap = { | ||
| "send-beacon": { | ||
| "task": "selfhosted:sentry.tasks.send_beacon", | ||
| "schedule": task_crontab("0", "*/1", "*", "*", "*"), | ||
| }, | ||
| } | ||
| ``` | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.