@@ -3,6 +3,11 @@ title: Asynchronous Workers
sidebar_order: 70
---

<Alert level="warning" title="Deprecated">
Celery workers are deprecated and will be removed as of the 25.10.x self-hosted
release.
</Alert>

Sentry comes with a built-in queue to process tasks in a more asynchronous fashion. For example, when an event comes in, instead of writing it to the database immediately, Sentry sends a job to the queue so that the request can be returned right away, and the background workers handle actually saving that data.

Sentry relies on the [Celery](https://docs.celeryproject.org/) library for managing workers.
@@ -75,7 +80,7 @@ There are a few important points:
because this is what registers the task by name. Thus every module
containing a task must be added to the `CELERY_IMPORTS` and `TASKWORKER_IMPORTS` settings in
`src/sentry/conf/server.py`.

We have a separate setting for the Taskbroker workers until we fully deprecate the Celery workers.

## Running a Worker
286 changes: 286 additions & 0 deletions develop-docs/backend/application-domains/tasks/index.mdx
@@ -0,0 +1,286 @@
---
title: Asynchronous Tasks
sidebar_order: 70
---

Sentry includes a framework for scheduling and executing tasks asynchronously
in a background worker process. The task framework within Sentry is inspired by
[Celery](https://docs.celeryproject.org), as Sentry was originally built with
Celery.

# Defining Tasks

Sentry tasks are configured with the `instrumented_task` decorator, which
provides features like automatic tracing, metric collection, and multi-region
silo enforcement.

```python
from sentry.tasks.base import instrumented_task
from sentry.taskworker.namespaces import issues_tasks
from sentry.taskworker.retry import Retry

@instrumented_task(
    name="sentry.widgets.tasks.do_work",
    namespace=issues_tasks,
    retry=Retry(times=3, on=(ConnectionError,)),
    processing_deadline_duration=60,
)
def do_work(organization_id: int, issue_id: int, **kwargs) -> None:
    ...
```

When defining tasks there are some constraints:

- All tasks _must_ have names. The name of a task is serialized into an
'activation' message that is persisted in Kafka. Task names must be stable
between deployments to avoid lost tasks.

- Task parameters _must_ be JSON serializable. You cannot pass arbitrary Python
objects through task parameters.

- Tasks _must_ define a 'processing deadline'. After a task's processing
deadline has elapsed, it will be killed by the worker runtime. Tasks that are
killed for exceeding their deadline are not automatically retried.

- All tasks must be assigned to a 'namespace'. A namespace is a group of related
tasks that are operated together and share a backlog.

- The return value of a task is ignored by workers and not stored.

- The module containing a task _must_ be added to `TASKWORKER_IMPORTS` in
`src/sentry/conf/server.py` (see the sketch below).
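
For illustration, registering the hypothetical `sentry.widgets.tasks` module from
the example above might look like the following sketch; the exact shape of the
`TASKWORKER_IMPORTS` setting in `src/sentry/conf/server.py` may differ:

```python
# src/sentry/conf/server.py (sketch; the actual setting shape may vary)
TASKWORKER_IMPORTS = (
    # ... existing task modules ...
    "sentry.widgets.tasks",  # module containing do_work from the example above
)
```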

## Scheduling Tasks

With our task defined, we can schedule a task (also called an "activation"):

```python
from sentry.widgets.tasks import do_work

# Can call the task synchronously like a normal function
do_work(organization_id=org.id, issue_id=issue.id)

# Use .delay() to schedule a task to run in the future as soon as possible
do_work.delay(organization_id=org.id, issue_id=issue.id)

# Use .apply_async() when you need to define headers, countdown, or expires
# for your task. Here we schedule a task to run in 5 minutes (300 seconds)
do_work.apply_async(
    kwargs={"organization_id": org.id, "issue_id": issue.id},
    countdown=300,
)
```

When tasks are executed, the parameter payload is deserialized and the task
function is called. A task completes successfully if it does not raise an
error. If an error is raised from a task, or the task's deadline expires, the
task is considered a failure and is retried, put into a dead-letter queue, or
dropped, depending on the task's configuration and the failure.

## Retries

When defining tasks, you can define a retry policy with the `retry` parameter.
When a worker executes an activation with a retry policy, any non-successful
outcome will result in the retry policy being evaluated. If the task has retries
remaining, and the captured error is a retriable error, the worker sends
a retry status to its broker. The taskbroker will take care of marking
the current activation as complete and producing a new activation to be
processed later.

If a task does not define a retry policy, it inherits the retry policy of its
task namespace.

```python
@instrumented_task(
    name="sentry.issues.tasks.deliver_issue_webhook",
    namespace=issues_tasks,
    retry=Retry(times=3, times_exceeded=LastAction.Deadletter),
)
def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
    ...
```

### Conditional Retries

Retries can be conditional based on the exception type:

```python
retry=Retry(on=(IntegrationError,), times=3, times_exceeded=LastAction.Discard)
```

### Retry delays

By default, retries are executed as soon as they are consumed. If a task
needs to stagger retries, it can use a delayed retry.

```python
@instrumented_task(
    name="sentry.integrations.fetch_commits",
    namespace=issues_tasks,
    retry=Retry(times=3, on=(IntegrationError,), delay=30),
)
def fetch_commits(repository_id: int) -> None:
    ...
```

With the above configuration, each retry will be processed at least 30 seconds
after the previous attempt. The delay between retries could be longer than 30
seconds, but won’t be shorter.

## Processing Deadlines

Every task has a 'processing deadline', which is the maximum expected runtime for a task. If
a task does not define a processing deadline, it will inherit the deadline
defined on the task's namespace or use the default of **10 seconds**. Task
deadlines are intended to be generous; their purpose is to prevent workers from
being saturated by tasks running for unbounded amounts of time.

```python
@instrumented_task(
    name="sentry.integrations.fetch_commits",
    namespace=issues_tasks,
    # Extended from the default of 10 seconds
    processing_deadline_duration=60,
)
def fetch_commits(repository_id: int) -> None:
    ...
```

After a task has run for the length of its processing deadline, it will be
interrupted by `SIGALRM`, which raises a `ProcessingDeadlineExceeded` error that
interrupts your task's logic.

### Resolving deadline issues

In most scenarios the simplest solution is to extend the deadline for a task.
This is the recommended solution until you get above 20 minutes of duration.
Beyond that, the chances of your task being terminated by a deploy increase
quickly. Instead of extending the deadline further, you should rethink your
logic and partition the workload into smaller batches or individual jobs that
can be processed independently. For example, instead of processing every project
in a single task, spawn multiple smaller tasks, as sketched below.
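
A rough sketch of that fan-out pattern (the task names, module, and parameters
here are hypothetical, not an existing Sentry API):

```python
from sentry.tasks.base import instrumented_task
from sentry.taskworker.namespaces import issues_tasks


@instrumented_task(
    name="sentry.examples.tasks.process_projects",
    namespace=issues_tasks,
)
def process_projects(project_ids: list[int], **kwargs) -> None:
    # Fan out: spawn one bounded task per project rather than looping over
    # every project inside a single long-running activation.
    for project_id in project_ids:
        process_one_project.delay(project_id=project_id)


@instrumented_task(
    name="sentry.examples.tasks.process_one_project",
    namespace=issues_tasks,
)
def process_one_project(project_id: int, **kwargs) -> None:
    ...  # small unit of work that fits comfortably within its deadline
```

Each spawned task gets its own processing deadline and can be retried
independently, so a single slow project no longer puts the whole batch at risk.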

## Expiration deadlines

A task's expiration time defines a point in time after which a task is
considered expired and should not be executed. This mechanism allows tasks to be
skipped if they are stale and their results are no longer relevant.

```python
@instrumented_task(
    name="sentry.issues.tasks.deliver_issue_webhook",
    namespace=issues_tasks,
    expires=timedelta(minutes=5),
)
def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
    ...
```

Expiration times can be expressed as `timedelta` objects or a number of seconds.
Tasks that are past their expiration will not be sent to workers. Instead they
will be discarded or dead-lettered depending on task configuration.

## Future schedules

Tasks can be scheduled to be run up to an hour in the future with the
`countdown` parameter.

```python
deliver_issue_webhook.apply_async(countdown=timedelta(minutes=10))
```

Countdown tasks will be accepted and retained by taskbroker until their
countdown has elapsed. Once the countdown delay has elapsed, the task will be
made available to workers.

## Idempotency (at_most_once)

Tasks are processed with at-least-once guarantees. A task may be attempted
multiple times if processing deadlines are exceeded. To prevent multiple
executions, a task can set `at_most_once=True` to opt into at-most-once
execution.

```python
@instrumented_task(
    name="sentry.issues.tasks.deliver_issue_webhook",
    namespace=issues_tasks,
    at_most_once=True,
)
def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
    ...
```

If an idempotent task exceeds a processing deadline, it will *not* be retried.

# Testing Tasks

Tasks can be tested with a few different approaches. The first is with the
`self.tasks()` or `TaskRunner` context managers. When these context managers are
entered, tasks are executed *synchronously*, which allows you to check the
side-effects of your tasks and validate that their parameters are JSON
compatible:

```python
def test_action_with_tasks(self):
    with self.tasks():
        self.client.get("/organizations/slug/do-thing/")
    # can make assertions on side-effects of tasks spawned by the endpoint
```
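
If a test does not have access to `self.tasks()`, the `TaskRunner` context
manager mentioned above can be used directly. A minimal sketch, assuming the
import path and reusing the hypothetical `do_work` task from earlier:

```python
from sentry.testutils.helpers.task_runner import TaskRunner  # assumed path
from sentry.widgets.tasks import do_work  # hypothetical task from the example above


def test_do_work_side_effects():
    with TaskRunner():
        # .delay() runs the task synchronously inside the context manager,
        # so JSON serialization of the parameters is still exercised.
        do_work.delay(organization_id=1, issue_id=2)
    # assert on the side-effects of do_work here
```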

Tasks can also be tested with `mock.patch`:

```python
@patch("sentry.hybridcloud.tasks.deliver_webhooks.drain_mailbox")
def test_schedule_task(self, mock_deliver: MagicMock) -> None:
    # Do work to trigger the task
    # Assert that the task was scheduled
    mock_deliver.delay.assert_called_with(webhook_one.id)
```

<Alert type="warning">
Mocking tasks will not validate that parameters are JSON compatible, nor will it catch TypeErrors from signature mismatches.
</Alert>

# Task namespaces

Task namespaces are created in code, and configuration is linked to the
namespace when it is declared.

```python
# in sentry.taskworker.namespaces
from sentry.taskworker.config import taskregistry
from sentry.taskworker.retry import LastAction, Retry

issues_tasks = taskregistry.create_namespace(
    "issues",
    retry=Retry(times=3, times_exceeded=LastAction.Discard),
)

# register tasks within a namespace
from sentry.tasks.base import instrumented_task

@instrumented_task(name="tasks.do_work", namespace=issues_tasks)
def do_work(**kwargs):
    ...
```

Namespaces can define default behaviour for `retry`, `processing_deadline`, and
`expires` for the tasks they contain. Without explicit routing, any namespace
will be run in our `default` worker pools. If your task namespace will be
high-throughput (more than 1500 tasks per second), consider provisioning
a dedicated pool for your tasks.
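
A namespace that sets all three defaults might look like the following sketch;
the keyword names for the deadline and expiration defaults are assumptions that
mirror the per-task options above:

```python
from datetime import timedelta

from sentry.taskworker.config import taskregistry
from sentry.taskworker.retry import LastAction, Retry

# Hypothetical namespace; the processing_deadline_duration and expires
# keyword names are assumed to mirror the task-level options.
widgets_tasks = taskregistry.create_namespace(
    "widgets",
    retry=Retry(times=3, times_exceeded=LastAction.Discard),
    processing_deadline_duration=30,
    expires=timedelta(minutes=10),
)
```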

# Periodically Scheduled Tasks

Tasks can also be set to spawn on a periodic schedule. To accomplish this,
update the `TASKWORKER_SCHEDULE` configuration found in
`src/sentry/conf/server.py` with the appropriate namespace, task, and schedule.
Taskworker supports `timedelta` and `crontab` schedule types:

```python
TASKWORKER_REGION_SCHEDULES: ScheduleConfigMap = {
    "send-beacon": {
        "task": "selfhosted:sentry.tasks.send_beacon",
        "schedule": task_crontab("0", "*/1", "*", "*", "*"),
    },
}
```
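
The prose above notes that `timedelta` schedules are also supported. A sketch of
an interval-based entry; whether a plain `timedelta` is passed directly as the
`schedule` value, and the `namespace:task_name` key format, are assumptions based
on the example above:

```python
from datetime import timedelta

TASKWORKER_REGION_SCHEDULES: ScheduleConfigMap = {
    # Hypothetical interval-based entry for the do_work task defined earlier;
    # assumes a timedelta is accepted directly as the schedule value.
    "do-widget-work": {
        "task": "issues:sentry.widgets.tasks.do_work",
        "schedule": timedelta(minutes=5),
    },
}
```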
44 changes: 44 additions & 0 deletions develop-docs/backend/application-domains/tasks/running-workers.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
---
title: Running workers
sidebar_order: 30
---

<Alert title="Note">
See [operating tasks](/self-hosted/tasks/) for how to operate workers and
brokers in self-hosted.
</Alert>

## Running a worker

Workers are responsible for fetching tasks, executing them and reporting
completion status back to a taskbroker. Before you can run a worker, you'll need
to [start a taskbroker](#running-taskbroker). There are a few options for
running workers:

```bash
# Run a worker in the foreground
sentry run taskworker

# Run a worker as part of the devserver
sentry devserver --workers

# Foreground the worker from a devservices mode like `ingest`
devservices foreground taskworker
```

## Running a task scheduler

The task-scheduler is responsible for reading the task schedules from
`settings.TASKWORKER_SCHEDULES` and spawning tasks as schedules become due.

```bash
sentry run taskworker-scheduler
```

## Running taskbroker

Taskbroker can be started as a background service with `devservices`:

```bash
devservices up --mode=taskbroker
```