Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
909912b
WPB-19712: Allow team admin to update the channels to user-group asso…
blackheaven Sep 18, 2025
b4af1e1
WPB-19712: Allow team admin to update the channels to user-group asso…
blackheaven Sep 18, 2025
05cb59e
job types
battermann Sep 30, 2025
2af71d8
job consumer scaffold
battermann Sep 30, 2025
d1b656b
implement basic handler for jobs
battermann Sep 30, 2025
0638cd3
clean up
battermann Sep 30, 2025
c7903c7
refactor: some clean ups
blackheaven Sep 30, 2025
0954e7f
feat: add jobs defiinitions, fix schema and validate settings
blackheaven Oct 1, 2025
42f8567
feat: add Effect and Interpreters
blackheaven Oct 1, 2025
9a54bf0
update cabal and nix dependencies
battermann Oct 2, 2025
f315a2a
moved and removed some code
battermann Oct 2, 2025
66e322f
background job runner
battermann Oct 2, 2025
9c0d7a9
split job publisher and runner
battermann Oct 2, 2025
aa19626
interpret job runner from background worker
battermann Oct 2, 2025
e53e152
config maps, helm charts, default values, docs
battermann Oct 2, 2025
49f443e
reduce invalid message sleep
battermann Oct 2, 2025
ed353a2
fix linting issue
battermann Oct 2, 2025
4b83b63
fix arbitrary instance of request id
battermann Oct 2, 2025
6053f5d
wip
battermann Oct 2, 2025
ac34b18
fix warnings in background worker tests
battermann Oct 2, 2025
ed0aac7
implement sync user group job
battermann Oct 2, 2025
a09bc11
fix: rebase
blackheaven Oct 2, 2025
4dcbb28
feat: call the jobs, run the jobs, add config options
blackheaven Oct 3, 2025
c0c61ef
fix: hlint
blackheaven Oct 3, 2025
c0f8a25
fix: nix files
blackheaven Oct 3, 2025
975475b
fix: config in yamls
blackheaven Oct 3, 2025
aca0036
refactor: move publish from wire-api to wire-subsystem
blackheaven Oct 6, 2025
86b8fc9
fix: k8s configs
blackheaven Oct 6, 2025
26c4936
fix: k8s configs
blackheaven Oct 6, 2025
cc46c4f
fix: k8s configs
blackheaven Oct 6, 2025
f3666c6
fix: k8s config
battermann Oct 6, 2025
3b719fe
refactor
battermann Oct 6, 2025
e9252ad
missing wiring of postgres config in background-worker
battermann Oct 6, 2025
fb9ce89
removed todos
battermann Oct 6, 2025
e967a41
configure brig cassandra client for background worker
battermann Oct 6, 2025
b040bda
fix testTemporaryQueuesAreDeletedAfterUse test
battermann Oct 6, 2025
583b878
wip: debugging bg jobs
battermann Oct 6, 2025
e702d0d
fix: backgroung timeout second translation
blackheaven Oct 6, 2025
cc07944
fix: integration tests background-worker postgresql params
blackheaven Oct 6, 2025
aad7bca
fix fetching the channels for a group
battermann Oct 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/2-features/WPB-19713
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement `channels` and `channelsCount` in `user-groups` endpoints.
16 changes: 15 additions & 1 deletion charts/background-worker/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
Note that background-worker depends on some provisioned storage, namely:
Note that background-worker depends on some provisioned storage/services, namely:

- rabbitmq
- postgresql
- cassandra (two clusters)

PostgreSQL configuration
- Set connection parameters under `config.postgresql` (libpq keywords: `host`, `port`, `user`, `dbname`, etc.).
- Provide the password via `secrets.pgPassword`; it is mounted at `/etc/wire/background-worker/secrets/pgPassword` and referenced from the configmap.

Cassandra configuration
- Background-worker connects to two Cassandra clusters:
- `config.cassandra` (keyspace: `gundeck`) for the dead user notification watcher.
- `config.cassandraBrig` (keyspace: `brig`) for the user store.
- TLS may be configured via either a reference (`tlsCaSecretRef`) or inline CA (`tlsCa`) for each cluster. Secrets mount under:
- `/etc/wire/background-worker/cassandra-gundeck`
- `/etc/wire/background-worker/cassandra-brig`

These are dealt with independently from this chart.
22 changes: 15 additions & 7 deletions charts/background-worker/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,26 @@
{{- (semverCompare ">= 1.24-0" (include "kubeVersion" .)) -}}
{{- end -}}

{{- define "useCassandraTLS" -}}
{{- define "useGundeckCassandraTLS" -}}
{{ or (hasKey .cassandra "tlsCa") (hasKey .cassandra "tlsCaSecretRef") }}
{{- end -}}

{{/* Return a Dict of TLS CA secret name and key
This is used to switch between provided secret (e.g. by cert-manager) and
created one (in case the CA is provided as PEM string.)
*/}}
{{- define "tlsSecretRef" -}}
{{- define "useBrigCassandraTLS" -}}
{{ or (hasKey .cassandraBrig "tlsCa") (hasKey .cassandraBrig "tlsCaSecretRef") }}
{{- end -}}

{{- define "gundeckTlsSecretRef" -}}
{{- if .cassandra.tlsCaSecretRef -}}
{{ .cassandra.tlsCaSecretRef | toYaml }}
{{- else }}
{{- dict "name" "background-worker-cassandra" "key" "ca.pem" | toYaml -}}
{{- dict "name" "background-worker-cassandra-gundeck" "key" "ca.pem" | toYaml -}}
{{- end -}}
{{- end -}}

{{- define "brigTlsSecretRef" -}}
{{- if .cassandraBrig.tlsCaSecretRef -}}
{{ .cassandraBrig.tlsCaSecretRef | toYaml }}
{{- else }}
{{- dict "name" "background-worker-cassandra-brig" "key" "ca.pem" | toYaml -}}
{{- end -}}
{{- end -}}
19 changes: 17 additions & 2 deletions charts/background-worker/templates/cassandra-secret.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{{/* Secret for the provided Cassandra TLS CA. */}}
{{/* Secrets for provided Cassandra TLS CAs */}}
{{- if not (empty .Values.config.cassandra.tlsCa) }}
apiVersion: v1
kind: Secret
metadata:
name: background-worker-cassandra
name: background-worker-cassandra-gundeck
labels:
app: background-worker
chart: {{ .Chart.Name }}-{{ .Chart.Version | replace "+" "_" }}
Expand All @@ -13,3 +13,18 @@ type: Opaque
data:
ca.pem: {{ .Values.config.cassandra.tlsCa | b64enc | quote }}
{{- end }}
{{- if not (empty .Values.config.cassandraBrig.tlsCa) }}
---
apiVersion: v1
kind: Secret
metadata:
name: background-worker-cassandra-brig
labels:
app: background-worker
chart: {{ .Chart.Name }}-{{ .Chart.Version | replace "+" "_" }}
release: "{{ .Release.Name }}"
heritage: "{{ .Release.Service }}"
type: Opaque
data:
ca.pem: {{ .Values.config.cassandraBrig.tlsCa | b64enc | quote }}
{{- end }}
22 changes: 20 additions & 2 deletions charts/background-worker/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,17 @@ data:
host: {{ .cassandra.host }}
port: 9042
keyspace: gundeck
{{- if eq (include "useCassandraTLS" .) "true" }}
tlsCa: /etc/wire/background-worker/cassandra/{{- (include "tlsSecretRef" . | fromYaml).key }}
{{- if eq (include "useGundeckCassandraTLS" .) "true" }}
tlsCa: /etc/wire/background-worker/cassandra-gundeck/{{- (include "gundeckTlsSecretRef" . | fromYaml).key }}
{{- end }}

cassandraBrig:
endpoint:
host: {{ .cassandraBrig.host }}
port: 9042
keyspace: brig
{{- if eq (include "useBrigCassandraTLS" .) "true" }}
tlsCa: /etc/wire/background-worker/cassandra-brig/{{- (include "brigTlsSecretRef" . | fromYaml).key }}
{{- end }}

{{- with .rabbitmq }}
Expand All @@ -48,4 +57,13 @@ data:

backendNotificationPusher:
{{toYaml .backendNotificationPusher | indent 6 }}
{{- with .backgroundJobs }}
backgroundJobs:
{{ toYaml . | indent 6 }}
{{- end }}
postgresql:
{{ toYaml .postgresql | indent 6 }}
{{- if hasKey $.Values.secrets "pgPassword" }}
postgresqlPassword: /etc/wire/background-worker/secrets/pgPassword
{{- end }}
{{- end }}
23 changes: 17 additions & 6 deletions charts/background-worker/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,15 @@ spec:
- name: "background-worker-secrets"
secret:
secretName: "background-worker"
{{- if eq (include "useCassandraTLS" .Values.config) "true" }}
- name: "background-worker-cassandra"
{{- if eq (include "useGundeckCassandraTLS" .Values.config) "true" }}
- name: "background-worker-cassandra-gundeck"
secret:
secretName: {{ (include "tlsSecretRef" .Values.config | fromYaml).name }}
secretName: {{ (include "gundeckTlsSecretRef" .Values.config | fromYaml).name }}
{{- end }}
{{- if eq (include "useBrigCassandraTLS" .Values.config) "true" }}
- name: "background-worker-cassandra-brig"
secret:
secretName: {{ (include "brigTlsSecretRef" .Values.config | fromYaml).name }}
{{- end }}
{{- if .Values.config.rabbitmq.tlsCaSecretRef }}
- name: "rabbitmq-ca"
Expand All @@ -58,11 +63,17 @@ spec:
{{- toYaml .Values.podSecurityContext | nindent 12 }}
{{- end }}
volumeMounts:
- name: "background-worker-secrets"
mountPath: "/etc/wire/background-worker/secrets"
- name: "background-worker-config"
mountPath: "/etc/wire/background-worker/conf"
{{- if eq (include "useCassandraTLS" .Values.config) "true" }}
- name: "background-worker-cassandra"
mountPath: "/etc/wire/background-worker/cassandra"
{{- if eq (include "useGundeckCassandraTLS" .Values.config) "true" }}
- name: "background-worker-cassandra-gundeck"
mountPath: "/etc/wire/background-worker/cassandra-gundeck"
{{- end }}
{{- if eq (include "useBrigCassandraTLS" .Values.config) "true" }}
- name: "background-worker-cassandra-brig"
mountPath: "/etc/wire/background-worker/cassandra-brig"
{{- end }}
{{- if .Values.config.rabbitmq.tlsCaSecretRef }}
- name: "rabbitmq-ca"
Expand Down
3 changes: 3 additions & 0 deletions charts/background-worker/templates/secret.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@ data:
{{- with .Values.secrets }}
rabbitmqUsername: {{ .rabbitmq.username | b64enc | quote }}
rabbitmqPassword: {{ .rabbitmq.password | b64enc | quote }}
{{- if .pgPassword }}
pgPassword: {{ .pgPassword | b64enc | quote }}
{{- end }}
{{- end }}
24 changes: 24 additions & 0 deletions charts/background-worker/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ config:
logLevel: Info
logFormat: StructuredJSON
enableFederation: false # keep in sync with brig, cargohold and galley charts' config.enableFederation as well as wire-server chart's tags.federation
# Postgres connection settings
#
# Values are described in https://www.postgresql.org/docs/17/libpq-connect.html#LIBPQ-PARAMKEYWORDS
# To set the password via a background-worker secret see `secrets.pgPassword`.
#
# Below is an example configuration used in CI tests.
postgresql:
host: postgresql # DNS name without protocol
port: "5432"
user: wire-server
dbname: wire-server
rabbitmq:
host: rabbitmq
port: 5672
Expand All @@ -29,15 +40,28 @@ config:
# tlsCaSecretRef:
# name: <secret-name>
# key: <ca-attribute>
# Cassandra clusters used by background-worker
cassandra:
host: aws-cassandra
cassandraBrig:
host: aws-cassandra

backendNotificationPusher:
pushBackoffMinWait: 10000 # in microseconds, so 10ms
pushBackoffMaxWait: 300000000 # microseconds, so 300s
remotesRefreshInterval: 300000000 # microseconds, so 300s

# Background jobs consumer configuration
backgroundJobs:
# Maximum number of in-flight jobs per process
concurrency: 8
# Per-attempt timeout in seconds
jobTimeout: 60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this something we want to define per job type? deleting users takes less time than scanning a huge table for inconsistencies.

i have little context, though. maybe i'm thinking ahead to far.

# Total attempts, including the first try
maxAttempts: 3

secrets: {}
# pgPassword: <postgres-password>

podSecurityContext:
allowPrivilegeEscalation: false
Expand Down
5 changes: 5 additions & 0 deletions charts/integration/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ data:
backgroundWorker:
host: backgroundWorker.{{ .Release.Namespace }}.svc.cluster.local
port: 8080
# Background jobs defaults for integration tests
backgroundJobs:
concurrency: 4
jobTimeout: 5
maxAttempts: 3

stern:
host: stern.{{ .Release.Namespace }}.svc.cluster.local
Expand Down
21 changes: 21 additions & 0 deletions docs/src/developer/reference/config-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -1669,3 +1669,24 @@ gundeck:
settings:
cellsEventQueue: "cells_events"
```
## Background worker: Background jobs

The background worker consumes jobs from RabbitMQ to process tasks asynchronously. The following configuration controls the consumer’s behavior:

Internal YAML file and Helm values (under `background-worker.config`):

```yaml
backgroundJobs:
# Maximum number of in-flight jobs per process
concurrency: 8
# Per-attempt timeout in seconds
jobTimeout: 60
# Total attempts including the first run
maxAttempts: 3
```

Notes:

- `concurrency` controls the AMQP prefetch and caps parallel handler execution per process.
- `jobTimeout` bounds each attempt; timed‑out attempts are retried until `maxAttempts` is reached.
- `maxAttempts` is total tries (first run plus retries). On final failure, the job is dropped (NACK requeue=false) and counted in metrics.
20 changes: 20 additions & 0 deletions hack/helm_vars/wire-server/values.yaml.gotmpl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ brig:
teamCreatorWelcome: https://teams.wire.com/login
teamMemberWelcome: https://wire.com/download
accountPages: https://account.wire.com
# Background-worker uses Brig's Cassandra keyspace.
cassandra:
host: {{ .Values.cassandraHost }}
replicaCount: 1
Expand Down Expand Up @@ -603,6 +604,11 @@ background-worker:
pushBackoffMinWait: 1000 # 1ms
pushBackoffMaxWait: 500000 # 0.5s
remotesRefreshInterval: 1000000 # 1s
backgroundJobs:
concurrency: 8
jobTimeout: 60
maxAttempts: 3
# Cassandra clusters used by background-worker
cassandra:
host: {{ .Values.cassandraHost }}
replicaCount: 1
Expand All @@ -611,6 +617,14 @@ background-worker:
name: "cassandra-jks-keystore"
key: "ca.crt"
{{- end }}
cassandraBrig:
host: {{ .Values.cassandraHost }}
replicaCount: 1
{{- if .Values.useK8ssandraSSL.enabled }}
tlsCaSecretRef:
name: "cassandra-jks-keystore"
key: "ca.crt"
{{- end }}
rabbitmq:
port: 5671
adminPort: 15671
Expand All @@ -619,10 +633,16 @@ background-worker:
tlsCaSecretRef:
name: "rabbitmq-certificate"
key: "ca.crt"
postgresql:
host: "postgresql"
port: "5432"
user: wire-server
dbname: wire-server
secrets:
rabbitmq:
username: {{ .Values.rabbitmqUsername }}
password: {{ .Values.rabbitmqPassword }}
pgPassword: "posty-the-gres"

integration:
ingress:
Expand Down
10 changes: 10 additions & 0 deletions integration/test/API/Brig.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,16 @@ getUserGroup user gid = do
req <- baseRequest user Brig Versioned $ joinHttpPath ["user-groups", gid]
submit "GET" req

getUserGroupWithChannels :: (MakesValue user) => user -> String -> App Response
getUserGroupWithChannels user gid = do
req <- baseRequest user Brig Versioned $ joinHttpPath ["user-groups", gid]
submit "GET" $ req & addQueryParams [("include_channels", "true")]

updateUserGroupChannels :: (MakesValue user) => user -> String -> [String] -> App Response
updateUserGroupChannels user gid convIds = do
req <- baseRequest user Brig Versioned $ joinHttpPath ["user-groups", gid, "channels"]
submit "PUT" $ req & addJSONObject ["channels" .= convIds]

data GetUserGroupsArgs = GetUserGroupsArgs
{ q :: Maybe String,
sortByKeys :: Maybe String,
Expand Down
7 changes: 4 additions & 3 deletions integration/test/Test/Events.hs
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,20 @@ testTemporaryQueuesAreDeletedAfterUse = do
aliceClientQueue = Queue {name = fromString aliceClientQueueName, vhost = fromString beResource.berVHost}
deadNotifsQueue = Queue {name = fromString "dead-user-notifications", vhost = fromString beResource.berVHost}
cellsEventsQueue = Queue {name = fromString "cells_events", vhost = fromString beResource.berVHost}
backgroundJobsQueue = Queue {name = fromString "background-jobs", vhost = fromString beResource.berVHost}

-- Wait for queue for the new client to be created
eventually $ do
queuesBeforeWS <- rabbitmqAdmin.listQueuesByVHost (fromString beResource.berVHost) (fromString "") True 100 1
queuesBeforeWS.items `shouldMatchSet` [deadNotifsQueue, cellsEventsQueue, aliceClientQueue]
queuesBeforeWS.items `shouldMatchSet` [deadNotifsQueue, cellsEventsQueue, aliceClientQueue, backgroundJobsQueue]

runCodensity (createEventsWebSocket alice Nothing) $ \ws -> do
handle <- randomHandle
putHandle bob handle >>= assertSuccess

queuesDuringWS <- rabbitmqAdmin.listQueuesByVHost (fromString beResource.berVHost) (fromString "") True 100 1
addJSONToFailureContext "queuesDuringWS" queuesDuringWS $ do
length queuesDuringWS.items `shouldMatchInt` 4
length queuesDuringWS.items `shouldMatchInt` 5

-- We cannot use 'assertEvent' here because there is a race between the temp
-- queue being created and rabbitmq fanning out the previous events.
Expand All @@ -183,7 +184,7 @@ testTemporaryQueuesAreDeletedAfterUse = do

eventually $ do
queuesAfterWS <- rabbitmqAdmin.listQueuesByVHost (fromString beResource.berVHost) (fromString "") True 100 1
queuesAfterWS.items `shouldMatchSet` [deadNotifsQueue, cellsEventsQueue, aliceClientQueue]
queuesAfterWS.items `shouldMatchSet` [deadNotifsQueue, cellsEventsQueue, aliceClientQueue, backgroundJobsQueue]

testSendMessageNoReturnToSenderWithConsumableNotificationsProteus :: (HasCallStack) => App ()
testSendMessageNoReturnToSenderWithConsumableNotificationsProteus = do
Expand Down
Loading