diff --git a/develop-docs/backend/application-domains/asynchronous-workers.mdx b/develop-docs/backend/application-domains/asynchronous-workers.mdx
index 697d5350ce277..6029bdd4a1f58 100644
--- a/develop-docs/backend/application-domains/asynchronous-workers.mdx
+++ b/develop-docs/backend/application-domains/asynchronous-workers.mdx
@@ -3,6 +3,11 @@ title: Asynchronous Workers
 sidebar_order: 70
 ---
+
+Celery workers are deprecated and will be removed in the 25.10.x self-hosted
+release.
+
+
 Sentry comes with a built-in queue to process tasks in a more asynchronous
 fashion. For example when an event comes in instead of writing it to the
 database immediately, it sends a job to the queue so that the request can be
 returned right away, and the background workers handle actually saving that
 data. Sentry relies on the [Celery](https://docs.celeryproject.org/) library for
 managing workers.
@@ -75,7 +80,7 @@ There are a few important points:
   because this is what registers the task by name. Thus every module containing
   a task must be added to the `CELERY_IMPORTS` and `TASKWORKER_IMPORTS`
   settings in `src/sentry/conf/server.py`.
-
+  We have a separate setting for the Taskbroker workers until we fully deprecate the Celery workers.
 
 ## Running a Worker
diff --git a/develop-docs/backend/application-domains/tasks/index.mdx b/develop-docs/backend/application-domains/tasks/index.mdx
new file mode 100644
index 0000000000000..90c66822c4dfc
--- /dev/null
+++ b/develop-docs/backend/application-domains/tasks/index.mdx
@@ -0,0 +1,286 @@
+---
+title: Asynchronous Tasks
+sidebar_order: 70
+---
+
+Sentry includes a framework for scheduling tasks that are executed
+asynchronously in a background worker process. The task framework within Sentry
+is inspired by [Celery](https://docs.celeryproject.org), as it was originally
+built on Celery.
+
+# Defining Tasks
+
+Sentry tasks are defined with the `instrumented_task` decorator, which adds
+features like automatic tracing, metric collection, and multi-region silo
+enforcement.
+
+```python
+from sentry.tasks.base import instrumented_task
+from sentry.taskworker.namespaces import issues_tasks
+from sentry.taskworker.retry import Retry
+
+@instrumented_task(
+    name="sentry.widgets.tasks.do_work",
+    namespace=issues_tasks,
+    retry=Retry(times=3, on=(ConnectionError,)),
+    processing_deadline_duration=60
+)
+def do_work(organization_id: int, issue_id: int, **kwargs) -> None:
+    ...
+```
+
+When defining tasks there are some constraints:
+
+- All tasks _must_ have names. The name of the task is serialized into an
+  'activation' message that is persisted in Kafka. Task names must be stable
+  between deployments to avoid lost tasks.
+
+- Task parameters _must_ be JSON serializable. You cannot pass arbitrary python
+  objects through task parameters.
+
+- Tasks _must_ define a 'processing deadline'. After a task's processing
+  deadline has elapsed, it will be killed by the worker runtime. Tasks that are
+  killed for exceeding their processing deadline are not automatically retried.
+
+- All tasks must be assigned to a 'namespace'. A namespace is a group of related
+  tasks that are operated together and share a backlog.
+
+- The return value of a task is not stored and is ignored by workers.
+
+- The module containing a task _must_ be added to `TASKWORKER_IMPORTS` in
+  `src/sentry/conf/server.py` (see the sketch below).
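+
+As an illustrative sketch (the real setting contains many more entries, and the
+hypothetical `sentry.widgets.tasks` module comes from the example above),
+registering a task module looks roughly like this:
+
+```python
+# src/sentry/conf/server.py
+TASKWORKER_IMPORTS: tuple[str, ...] = (
+    # ... existing task modules ...
+    "sentry.widgets.tasks",
+)
+```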
+
+## Scheduling Tasks
+
+With our task defined we can schedule a task (also called an "activation"):
+
+```python
+from sentry.widgets.tasks import do_work
+
+# Can call the task synchronously like a normal function
+do_work(organization_id=org.id, issue_id=issue.id)
+
+# Use .delay() to schedule a task to run in the future as soon as possible
+do_work.delay(organization_id=org.id, issue_id=issue.id)
+
+# Use .apply_async() when you need to define headers, countdown, or expires
+# for your task. Here we schedule a task to run in 5 minutes (300 seconds)
+do_work.apply_async(
+    kwargs={"organization_id": org.id, "issue_id": issue.id},
+    countdown=300
+)
+```
+
+When tasks are executed, the parameter payload is deserialized, and the task
+function is called. Tasks are successfully completed if they don't raise an
+error. If an error is raised from a task, or the task's deadline expires, the
+task is considered a failure and is retried, put into a dead-letter queue, or
+dropped, depending on the task and the failure.
+
+## Retries
+
+When defining tasks, you can define a retry policy with the `retry` parameter.
+When a worker executes an activation with a retry policy, any non-successful
+outcome will result in the retry policy being evaluated. If the task has retries
+remaining and the captured error is a retriable error, the worker sends
+a status of retry to the worker's broker. The taskbroker will take care of
+marking the current activation as complete and producing a new activation to be
+processed later.
+
+If a task does not define a retry policy, the retry policy of the task namespace
+is inherited.
+
+```python
+@instrumented_task(
+    name="sentry.issues.tasks.deliver_issue_webhook",
+    namespace=issues_tasks,
+    retry=Retry(times=3, times_exceeded=LastAction.Deadletter),
+)
+def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
+    ...
+```
+
+### Conditional Retries
+
+Retries can be conditional based on the exception type:
+
+```python
+retry=Retry(on=(IntegrationError,), times=3, times_exceeded=LastAction.Discard)
+```
+
+### Retry delays
+
+By default, retries will be executed as soon as they are consumed. If a task
+needs to stagger retries, it can use a delayed retry.
+
+```python
+@instrumented_task(
+    name="sentry.integrations.fetch_commits",
+    namespace=issues_tasks,
+    retry=Retry(times=3, on=(IntegrationError,), delay=30)
+)
+def fetch_commits(repository_id: int) -> None:
+    ...
+```
+
+With the above configuration, each retry will be processed at least 30 seconds
+after the previous attempt. The delay between retries could be longer than 30
+seconds, but won't be shorter.
+
+## Processing Deadlines
+
+Every task has a 'processing deadline', which is the maximum expected runtime
+for the task. If a task does not define a processing deadline, it will inherit
+the deadline defined on the task's namespace or use the default of **10
+seconds**. Task deadlines are intended to be generous; they exist to prevent
+workers from being saturated by tasks running for unbounded amounts of time.
+
+```python
+@instrumented_task(
+    name="sentry.integrations.fetch_commits",
+    namespace=issues_tasks,
+    # Extended from the default 10
+    processing_deadline_duration=60
+)
+def fetch_commits(repository_id: int) -> None:
+    ...
+```
+
+After a task has run for the length of its processing deadline, it will be
+interrupted by `SIGALRM`, which raises a `ProcessingDeadlineExceeded` error that
+interrupts your task's logic.
+
+### Resolving deadline issues
+
+In most scenarios the simplest solution is to extend the deadline for a task.
+This is the recommended approach for deadlines up to around 20 minutes. Beyond
+that duration, the chances of your task being terminated by a deploy increase
+quickly. Instead of extending the deadline further, you should rethink your
+logic and partition the workload into smaller batches, or individual jobs that
+can be processed independently. For example, instead of mapping over all
+projects in a single task, spawn multiple tasks.
+
+## Expiration deadlines
+
+A task's expiration time defines a point in time after which a task is
+considered expired and should not be executed. This mechanism allows tasks to be
+skipped if they are stale and their results are no longer relevant.
+
+```python
+@instrumented_task(
+    name="sentry.issues.tasks.deliver_issue_webhook",
+    namespace=issues_tasks,
+    expires=timedelta(minutes=5),
+)
+def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
+    ...
+```
+
+Expiration times can be expressed as `timedelta` objects or a number of seconds.
+Tasks that are past their expiration will not be sent to workers. Instead they
+will be discarded or dead-lettered depending on task configuration.
+
+## Future schedules
+
+Tasks can be scheduled to be run up to an hour in the future with the
+`countdown` parameter.
+
+```python
+deliver_issue_webhook.apply_async(countdown=timedelta(minutes=10))
+```
+
+Countdown tasks will be retained by taskbroker until their countdown has
+elapsed. Once the countdown has elapsed, the task is made available to workers.
+
+## Idempotency (at_most_once)
+
+Tasks are processed with at-least-once guarantees. A task may be attempted
+multiple times if processing deadlines are exceeded. To prevent multiple
+executions, tasks can set `at_most_once`, which enables at-most-once execution.
+
+```python
+@instrumented_task(
+    name="sentry.issues.tasks.deliver_issue_webhook",
+    namespace=issues_tasks,
+    at_most_once=True,
+)
+def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
+    ...
+```
+
+If an idempotent task exceeds a processing deadline, it will *not* be retried.
+
+# Testing Tasks
+
+Tasks can be tested with a few different approaches. The first is with the
+`self.tasks()` or `TaskRunner` context manager. When these context managers are
+entered, tasks are executed *synchronously*, which allows you to validate the
+side effects of your tasks and validate that parameters to your tasks are JSON
+compatible:
+
+```python
+def test_action_with_tasks(self):
+    with self.tasks():
+        self.client.get("/organizations/slug/do-thing/")
+    # can make assertions on side-effects of tasks spawned by the endpoint.
+```
+
+Tasks can also be tested with `mock.patch`:
+
+```python
+@patch("sentry.hybridcloud.tasks.deliver_webhooks.drain_mailbox")
+def test_schedule_task(self, mock_deliver: MagicMock) -> None:
+    # Do work to trigger the task
+    # Assert that the task was scheduled
+    mock_deliver.delay.assert_called_with(webhook_one.id)
+```
+
+Mocking tasks will not validate that parameters are JSON compatible, nor will it
+catch `TypeError`s from signature mismatches.
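+
+For tests that are not class-based, the `TaskRunner` context manager mentioned
+above can be used directly. A minimal sketch (the import path and assertions are
+assumptions for illustration):
+
+```python
+from sentry.testutils.helpers.task_runner import TaskRunner
+
+def test_deliver_issue_webhook():
+    with TaskRunner():
+        # .delay() executes the task synchronously inside the context manager
+        deliver_issue_webhook.delay(organization_id=1, group_id=2)
+    # assert on the side effects of the task here
+```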
+
+# Task namespaces
+
+Task namespaces are created in code, and configuration is linked to the
+namespace when it is declared.
+
+```python
+# in sentry.taskworker.namespaces
+from sentry.taskworker.config import taskregistry
+from sentry.taskworker.retry import LastAction, Retry
+
+issues_tasks = taskregistry.create_namespace(
+    "issues",
+    retry=Retry(times=3, times_exceeded=LastAction.Discard)
+)
+
+# register tasks within a namespace
+@instrumented_task(name="tasks.do_work", namespace=issues_tasks)
+def do_work(**kwargs):
+    ...
+```
+
+Namespaces can define default behaviour for `retry`, `processing_deadline`, and
+`expires` for the tasks they contain. Without explicit routing, any namespace
+will be run in our `default` worker pools. If your task namespace will be
+high-throughput (more than 1500 tasks per second), consider provisioning
+a dedicated pool for your tasks.
+
+# Periodically Scheduled Tasks
+
+Tasks can also be spawned on a periodic schedule. To accomplish this, update the
+schedule configuration (`TASKWORKER_SCHEDULES` or `TASKWORKER_REGION_SCHEDULES`)
+found in `src/sentry/conf/server.py` with the appropriate namespace, task, and
+schedule. Taskworker supports `timedelta` and `crontab` schedule types:
+
+```python
+TASKWORKER_REGION_SCHEDULES: ScheduleConfigMap = {
+    "send-beacon": {
+        "task": "selfhosted:sentry.tasks.send_beacon",
+        "schedule": task_crontab("0", "*/1", "*", "*", "*"),
+    },
+}
+```
diff --git a/develop-docs/backend/application-domains/tasks/running-workers.mdx b/develop-docs/backend/application-domains/tasks/running-workers.mdx
new file mode 100644
index 0000000000000..4ee55e6f35003
--- /dev/null
+++ b/develop-docs/backend/application-domains/tasks/running-workers.mdx
@@ -0,0 +1,44 @@
+---
+title: Running workers
+sidebar_order: 30
+---
+
+See [operating tasks](/self-hosted/tasks/) for how to operate workers and
+brokers in self-hosted.
+
+## Running a worker
+
+Workers are responsible for fetching tasks, executing them, and reporting
+completion status back to a taskbroker. Before you can run a worker, you'll need
+to [start a taskbroker](#running-taskbroker). There are a few options for
+running workers:
+
+```bash
+# Run a worker in the foreground
+sentry run taskworker
+
+# Run a worker as part of the devserver
+sentry devserver --workers
+
+# Foreground the worker from a devservices mode like `ingest`
+devservices foreground taskworker
+```
+
+## Running a task scheduler
+
+The task-scheduler is responsible for reading the task schedules from
+`settings.TASKWORKER_SCHEDULES` and spawning tasks as schedules become due.
+
+```bash
+sentry run taskworker-scheduler
+```
+
+## Running taskbroker
+
+Taskbroker can be started as a background service with `devservices`:
+
+```bash
+devservices up --mode=taskbroker
+```
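+
+Putting these commands together, a typical local development flow looks roughly
+like this (run the worker and scheduler in separate terminals):
+
+```bash
+# Start taskbroker and its dependencies
+devservices up --mode=taskbroker
+
+# Start a worker in the foreground
+sentry run taskworker
+
+# Optionally start the scheduler so periodic tasks are spawned
+sentry run taskworker-scheduler
+```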
diff --git a/develop-docs/backend/application-domains/tasks/task-lifecycle.mdx b/develop-docs/backend/application-domains/tasks/task-lifecycle.mdx
new file mode 100644
index 0000000000000..c953435d72034
--- /dev/null
+++ b/develop-docs/backend/application-domains/tasks/task-lifecycle.mdx
@@ -0,0 +1,145 @@
+---
+title: Task Lifecycle
+sidebar_order: 20
+---
+
+Once a `TaskActivation` is consumed by a Taskbroker, the broker creates an
+`InflightActivation` that manages the state and lifecycle of an activation.
+
+```mermaid
+flowchart
+
+start -- add task --> pending
+start -- add task --> delay
+delay -- delay elapses --> pending
+pending -- worker is given task --> processing
+processing -- worker completes task --> complete
+processing -- worker fails task --> failure
+processing -- worker requests retry --> retry
+processing -- processing_timeout expires --> pending
+failure -- DLQ or discard --> complete
+retry -- new task append --> complete
+```
+
+### State descriptions
+
+- `pending` - The activation has been added to `InflightActivations` but not given to a worker.
+- `delay` - The activation has a `delay` attribute set. The activation will be stored as status=delay until the activation's `received_at + delay` has elapsed.
+- `processing` - The activation has been assigned to a worker and we're waiting on the worker.
+- `failure` - The activation failed in the worker. The task will not be retried and the consumer can DLQ/discard the message.
+- `retry` - The activation should be rescheduled on the next consumer loop.
+- `complete` - The activation has all required actions complete, and can be removed from the task store.
+
+## Advancing the state machine - Upkeep
+
+The `BrokerUpkeep` activity periodically advances the state machines of all inflight activations. Upkeep is composed of several steps:
+
+```mermaid
+sequenceDiagram
+
+participant Kafka
+participant BrokerUpkeep
+participant InflightActivations
+
+note over BrokerUpkeep: 1. handle tasks with state=retry
+BrokerUpkeep ->>+ InflightActivations : get tasks with state=retry
+InflightActivations -->>- BrokerUpkeep: task list
+BrokerUpkeep ->> BrokerUpkeep : update retry state
+BrokerUpkeep ->>+ Kafka : append tasks with updated retry state
+Kafka -->>- BrokerUpkeep : ok
+BrokerUpkeep ->> InflightActivations : mark tasks complete
+InflightActivations -->> BrokerUpkeep : ok
+
+note over BrokerUpkeep: 2. handle processing deadlines
+BrokerUpkeep ->>+ InflightActivations : find tasks past processing_deadline
+InflightActivations -->> InflightActivations : set state=failure|pending
+InflightActivations -->>- BrokerUpkeep : ok
+
+note over BrokerUpkeep: 3. handle expired
+BrokerUpkeep ->>+ InflightActivations : find tasks past their expiration
+InflightActivations -->> InflightActivations : set state=failure
+InflightActivations -->>- BrokerUpkeep : ok
+
+note over BrokerUpkeep: 4. handle delay_until
+BrokerUpkeep ->>+ InflightActivations : find tasks with delay_until < now
+InflightActivations -->> InflightActivations : set state=pending
+InflightActivations -->>- BrokerUpkeep : ok
+
+note over BrokerUpkeep: 5. handle state=failure tasks
+BrokerUpkeep ->>+ InflightActivations : get state=failure tasks
+InflightActivations -->>- BrokerUpkeep : task list
+BrokerUpkeep ->>+ Kafka : submit to DLQ or discard
+Kafka -->>- BrokerUpkeep : ok
+
+note over BrokerUpkeep: 6. remove completed tasks
+BrokerUpkeep ->>+ InflightActivations : mark tasks complete
+InflightActivations -->>- BrokerUpkeep : ok
+```
+
+## Task processing deadline
+
+When BrokerRpc provides a worker a task, the task's `processing_deadline` is set, and the `processing_attempts` counter is incremented. Processing deadlines are calculated from the current timestamp and the task's `processing_deadline_duration` configuration. After the processing deadline has elapsed, the Broker assumes the Worker has died, or that the TaskActivation had an abnormal execution time.
+
+```mermaid
+sequenceDiagram
+
+participant Worker
+participant BrokerRpc
+participant InflightActivation
+
+Worker ->> BrokerRpc : get_task()
+BrokerRpc ->>+ InflightActivation : get next task
+InflightActivation ->> InflightActivation : update task state & processing_deadline
+InflightActivation -->>- BrokerRpc : return task
+BrokerRpc -->> Worker : return task
+Worker -) Worker : Is hard killed by k8s or hard crashes
+note over Worker: Time passes
+BrokerRpc ->> BrokerRpc : upkeep loop (see BrokerConsumer flow)
+BrokerRpc ->>+ InflightActivation : get tasks past processing deadline
+InflightActivation -->>- BrokerRpc : task list
+BrokerRpc ->> BrokerRpc : check if there are retries remaining
+BrokerRpc ->>+ InflightActivation : update tasks
+InflightActivation -->>- BrokerRpc : ok
+```
+
+Workers need to enforce execution timeouts to prevent a `while True` poison-pill
+task from consuming all available workers. When a worker receives a task to
+process, it also receives a `processing_deadline`. The worker uses the
+`processing_deadline` as a hard timeout for task execution. If a task fails to
+complete by its `processing_deadline`, and the worker is still alive, the worker
+is expected to update the status of the task to `failure` or `retry`.
+
+When a processing deadline elapses and the broker did not get a confirmed
+outcome from a worker, the task's `status` is reset to `pending` and the
+current `processing_deadline` is cleared. When a task exceeds
+a `processing_deadline` no retries are consumed, as the failure could be an
+infrastructure issue.
+
+## Processing Attempts
+
+Because tasks contain arbitrary logic, they have theoretically unbounded
+execution time. We must also assume that workers will be restarted, crash, or
+otherwise be unable to complete all work they are given. To avoid brokers
+becoming blocked by slow-to-process messages or lost workers, inflight tasks
+have both a processing deadline (`processing_deadline`) and a maximum number of
+processing attempts. Once an activation has reached the broker's
+`max_processing_attempts`, the activation's status is set to `failure`. The
+activation will be discarded/deadlettered based on the activation's retry state.
+
+## Task completion & retries
+
+As tasks complete, `InflightActivation` state is updated and the consumer state
+machine will retry, deadletter, or discard tasks in a subsequent upkeep run.
+
+## Task Failures
+
+When a worker reports a task as `failure`, the `InflightActivation` state is
+updated, and the Consumer will deadletter/discard the task in a future upkeep
+run. Once the `failure` action has been taken, the task state is set to
+`complete` for garbage collection.
+
+## Data loss
+
+Data loss in the `InflightActivationStore` will result in tasks not being
+executed.
diff --git a/develop-docs/backend/application-domains/tasks/terminology-and-concepts.mdx b/develop-docs/backend/application-domains/tasks/terminology-and-concepts.mdx
new file mode 100644
index 0000000000000..923439ef2f45e
--- /dev/null
+++ b/develop-docs/backend/application-domains/tasks/terminology-and-concepts.mdx
@@ -0,0 +1,113 @@
+---
+title: Terminology and Concepts
+sidebar_order: 10
+---
+
+Tasks are functions that can be scheduled to run asynchronously in the future.
+Tasks are defined as functions and registered into namespaces:
+
+```python
+@instrumented_task(
+    name="sentry.issues.tasks.deliver_issue_webhook",
+    namespace=issues_tasks,
+    retry=Retry(times=3, times_exceeded=LastAction.Deadletter)
+)
+def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
+    ...
+```
+
+Once a task is defined it can be scheduled:
+
+```python
+from sentry.issues.tasks.webhooks import deliver_issue_webhook
+
+# Schedule a task to run in the future as soon as possible
+deliver_issue_webhook.delay(organization_id=org.id, group_id=group.id)
+```
+
+See [Defining Tasks](/backend/application-domains/tasks/#defining-tasks) for
+more information on defining tasks.
+
+# Task Namespaces
+
+Namespaces provide logical groupings of tasks by product domain or
+functionality. All task activations within a namespace are processed in order
+(normal partitioning races are still possible). There are no ordering guarantees
+between namespaces.
+
+Namespaces can define defaults for retries and processing deadlines. Namespaces
+provide logical separation between workloads in the application, and enable
+different regions to be scaled differently.
+
+```mermaid
+erDiagram
+
+TaskNamespace ||--o{ Task : "contains many"
+TaskNamespace }|--|| Topic : "assigned to one"
+```
+
+Each namespace is mapped to a Kafka topic. All activations for a namespace are
+enqueued to the same topic. While a namespace has a single topic, multiple
+namespaces can share a topic.
+
+These relations allow us to map all tasks to a single topic in local
+development:
+
+```mermaid
+flowchart LR
+
+namespace:issues --> kw
+namespace:ingest.errors --> kw[kafka-tasks]
+namespace:integrations --> kw
+namespace:uptime --> kw
+namespace:notifications --> kw
+namespace:replays --> kw
+namespace:ingest.profiling --> kw
+```
+
+And to map those same namespaces to multiple topics and Kafka clusters in SaaS:
+
+```mermaid
+flowchart LR
+
+namespace:ingest.errors --> ki[kafka-ingest]
+namespace:integrations --> kg[kafka-general]
+namespace:uptime --> kc[kafka-crons]
+namespace:issues --> kg
+namespace:replays --> kr[kafka-replays]
+namespace:ingest.profiling --> kp[kafka-profiling]
+```
+
+# System Components
+
+The task framework is composed of a few components:
+
+```mermaid
+flowchart
+
+Sentry -- produce activation --> k[(Kafka)]
+k -- consume activations --> b[Taskbroker]
+w[Sentry Taskworker] -- fetch activation --> b
+w -- execute task --> w
+w -- publish result --> b
+```
+
+Client applications produce TaskActivation messages (serialized as protobuf messages) to Kafka topics. Taskbroker instances consume Kafka messages and make activations available to workers via gRPC.
+
+# Terminology
+
+- `Task` A function that can be scheduled to run later. Tasks are executed by workers, and can be retried should they fail.
+- `TaskNamespace` A collection of related tasks that are operated together. Activations within a namespace will be ordered, but activations between namespaces have no ordering promises.
+- `TaskActivation` A deferred task invocation that can be serialized and executed later. Also referred to as an `activation`.
+- `Broker` Taskbrokers consume tasks from a Kafka topic and save them to `InflightActivations`. Brokers prepare tasks for Workers, and receive work completion updates from Workers.
+- `Worker` Workers fetch tasks from Brokers and handle executing tasks. As tasks are executed, workers report task outcomes back to a Broker.
+- `processing_pool` Used to describe a broker + worker group. Each region has multiple processing pools that handle different namespaces. Processing pools provide failure isolation boundaries from each other.
+- `processing_deadline` The number of seconds within which an activation result is expected. If a broker does not receive a result by the processing deadline, the broker will assume the worker has died and will give the activation to a new worker.
+
+### Broker specific terms
+
+- `InflightActivations` When `TaskActivations` are being processed, additional metadata is stored in the `InflightActivationStore` until execution is complete.
+- `max_pending_count` The maximum number of tasks that can be pending in the `InflightActivations` SQLite database.
+- `max_processing_deadline` The maximum duration that tasks can be in a `processing` state. If a task is not completed within this time, Brokers assume the worker has died and the task is made `pending` again.
+- `max_processing_attempts` The maximum number of times an activation can move from `pending` → `processing`.
diff --git a/develop-docs/development-infrastructure/devservices.mdx b/develop-docs/development-infrastructure/devservices.mdx
index 6487b51d515fc..4f52db9a82fa2 100644
--- a/develop-docs/development-infrastructure/devservices.mdx
+++ b/develop-docs/development-infrastructure/devservices.mdx
@@ -78,7 +78,8 @@ Common modes in sentry:
 - `migrations`: Bring up postgres and redis
 - `minimal`: Bring up minimal services for local development
 - `full`: Bring up all services (ingest, symbolicator, taskbroker, snuba, vroom, etc)
-- `taskbroker`: Bring up sentry dependencies, taskbroker, taskworker, and taskworker-scheduler
+- `taskbroker`: Bring up sentry dependencies and taskbroker
+- `taskworker`: Bring up sentry dependencies, taskbroker, taskworker, and taskworker-scheduler
 - `memcached`: Bring up sentry dependencies and memcached
 
 Modes for specific development scenarios:
diff --git a/develop-docs/self-hosted/tasks.mdx b/develop-docs/self-hosted/tasks.mdx
new file mode 100644
index 0000000000000..5f0d426f20a5e
--- /dev/null
+++ b/develop-docs/self-hosted/tasks.mdx
@@ -0,0 +1,76 @@
+---
+title: Tasks
+sidebar_order: 70
+---
+
+## System Architecture
+
+Sentry's task platform is designed to scale horizontally to enable
+high-throughput processing. The task platform is composed of a few components:
+
+```mermaid
+flowchart
+
+Sentry -- produce task activation --> k[(Kafka)]
+k -- consume messages --> Taskbroker
+Worker -- grpc GetTask --> Taskbroker
+Worker -- execute task --> Worker
+Worker -- grpc SetTaskStatus --> Taskbroker
+```
+
+Brokers and workers are paired together to create 'processing pools' for tasks.
+Brokers and workers can be scaled horizontally to increase parallelism.
+
+## Scaling workers
+
+By default, self-hosted installs come with a single broker & worker replica. You
+can increase processing capacity by adding more concurrency to the single worker
+(via the `--concurrency` option on the worker), or by adding additional worker
+and broker replicas. It is not recommended to go above 24 worker replicas per
+broker, as broker performance can degrade with higher worker counts.
+
+If your deployment requires additional processing capacity, you can add
+additional broker replicas and use CLI options to inform the workers of the
+broker addresses:
+
+```bash
+sentry run taskworker --rpc-host-list=sentry-broker-default-0:50051,sentry-broker-default-1:50051
+```
+
+Workers use client-side load balancing to distribute load across the brokers
+they have been assigned to.
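+
+As a sketch, a worker using both options might be started like this (the
+hostnames and concurrency value are illustrative):
+
+```bash
+# Eight concurrent task executions, balanced across two broker replicas
+sentry run taskworker \
+  --concurrency=8 \
+  --rpc-host-list=sentry-broker-default-0:50051,sentry-broker-default-1:50051
+```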
+
+## Running multiple brokers
+
+In higher-throughput installations, you may also want to isolate task workloads
+from each other to ensure timely processing of lower-volume tasks. For example,
+you could isolate ingestion-related tasks from other work:
+
+```mermaid
+flowchart
+
+Sentry -- produce tasks --> k[(Kafka)]
+k -- topic-taskworker-ingest --> tb-i[Taskbroker ingest]
+k -- topic-taskworker --> tb-d[Taskbroker default]
+tb-i --> w-i[ingest Worker]
+tb-d --> w-d[default Worker]
+```
+
+To achieve this work separation we need to make a few changes:
+
+1. Provision any additional topics. Topic names need to come from one of the
+   predefined topics in `src/sentry/conf/types/kafka_definition.py`.
+2. Deploy the additional broker replicas. You can use the
+   `TASKBROKER_KAFKA_TOPIC` environment variable to define the topic a
+   taskbroker consumes from.
+3. Deploy additional workers that use the new brokers in their `rpc-host-list`
+   CLI flag (see the sketch after this list).
+4. Find the list of namespaces you want to shift to the new topic. The list of
+   task namespaces can be found in the `sentry.taskworker.namespaces` module.
+5. Update the task routing option, defining the namespace -> topic mappings, e.g.
+   ```yaml
+   # in sentry/config.yml
+   taskworker.route.overrides:
+     "ingest.errors": "taskworker-ingest"
+     "ingest.transactions": "taskworker-ingest"
+   ```
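+
+As an illustrative sketch of steps 2 and 3 (the hostnames are assumptions, and
+how the broker process itself is launched depends on your deployment):
+
+```bash
+# In the environment of the ingest taskbroker replicas (step 2),
+# point the broker at the new topic:
+export TASKBROKER_KAFKA_TOPIC=taskworker-ingest
+
+# Start workers for the ingest pool (step 3), pointed at the new brokers:
+sentry run taskworker --rpc-host-list=sentry-broker-ingest-0:50051,sentry-broker-ingest-1:50051
+```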