diff --git a/changelog.d/2-features/WPB-19713 b/changelog.d/2-features/WPB-19713 new file mode 100644 index 0000000000..cdbf1fd8a2 --- /dev/null +++ b/changelog.d/2-features/WPB-19713 @@ -0,0 +1 @@ +Implement `channels` and `channelsCount` in `user-groups` endpoints. diff --git a/charts/background-worker/README.md b/charts/background-worker/README.md index 55e379a4ed..6ccc8b582f 100644 --- a/charts/background-worker/README.md +++ b/charts/background-worker/README.md @@ -1,5 +1,19 @@ -Note that background-worker depends on some provisioned storage, namely: +Note that background-worker depends on some provisioned storage/services, namely: - rabbitmq +- postgresql +- cassandra (two clusters) + +PostgreSQL configuration +- Set connection parameters under `config.postgresql` (libpq keywords: `host`, `port`, `user`, `dbname`, etc.). +- Provide the password via `secrets.pgPassword`; it is mounted at `/etc/wire/background-worker/secrets/pgPassword` and referenced from the configmap. + +Cassandra configuration +- Background-worker connects to two Cassandra clusters: + - `config.cassandra` (keyspace: `gundeck`) for the dead user notification watcher. + - `config.cassandraBrig` (keyspace: `brig`) for the user store. +- TLS may be configured via either a reference (`tlsCaSecretRef`) or inline CA (`tlsCa`) for each cluster. Secrets mount under: + - `/etc/wire/background-worker/cassandra-gundeck` + - `/etc/wire/background-worker/cassandra-brig` These are dealt with independently from this chart. diff --git a/charts/background-worker/templates/_helpers.tpl b/charts/background-worker/templates/_helpers.tpl index 96bf8dd1b8..8c09767491 100644 --- a/charts/background-worker/templates/_helpers.tpl +++ b/charts/background-worker/templates/_helpers.tpl @@ -8,18 +8,26 @@ {{- (semverCompare ">= 1.24-0" (include "kubeVersion" .)) -}} {{- end -}} -{{- define "useCassandraTLS" -}} +{{- define "useGundeckCassandraTLS" -}} {{ or (hasKey .cassandra "tlsCa") (hasKey .cassandra "tlsCaSecretRef") }} {{- end -}} -{{/* Return a Dict of TLS CA secret name and key -This is used to switch between provided secret (e.g. by cert-manager) and -created one (in case the CA is provided as PEM string.) -*/}} -{{- define "tlsSecretRef" -}} +{{- define "useBrigCassandraTLS" -}} +{{ or (hasKey .cassandraBrig "tlsCa") (hasKey .cassandraBrig "tlsCaSecretRef") }} +{{- end -}} + +{{- define "gundeckTlsSecretRef" -}} {{- if .cassandra.tlsCaSecretRef -}} {{ .cassandra.tlsCaSecretRef | toYaml }} {{- else }} -{{- dict "name" "background-worker-cassandra" "key" "ca.pem" | toYaml -}} +{{- dict "name" "background-worker-cassandra-gundeck" "key" "ca.pem" | toYaml -}} +{{- end -}} +{{- end -}} + +{{- define "brigTlsSecretRef" -}} +{{- if .cassandraBrig.tlsCaSecretRef -}} +{{ .cassandraBrig.tlsCaSecretRef | toYaml }} +{{- else }} +{{- dict "name" "background-worker-cassandra-brig" "key" "ca.pem" | toYaml -}} {{- end -}} {{- end -}} diff --git a/charts/background-worker/templates/cassandra-secret.yaml b/charts/background-worker/templates/cassandra-secret.yaml index d5d9c61dfc..03018c0228 100644 --- a/charts/background-worker/templates/cassandra-secret.yaml +++ b/charts/background-worker/templates/cassandra-secret.yaml @@ -1,9 +1,9 @@ -{{/* Secret for the provided Cassandra TLS CA. 
*/}} +{{/* Secrets for provided Cassandra TLS CAs */}} {{- if not (empty .Values.config.cassandra.tlsCa) }} apiVersion: v1 kind: Secret metadata: - name: background-worker-cassandra + name: background-worker-cassandra-gundeck labels: app: background-worker chart: {{ .Chart.Name }}-{{ .Chart.Version | replace "+" "_" }} @@ -13,3 +13,18 @@ type: Opaque data: ca.pem: {{ .Values.config.cassandra.tlsCa | b64enc | quote }} {{- end }} +{{- if not (empty .Values.config.cassandraBrig.tlsCa) }} +--- +apiVersion: v1 +kind: Secret +metadata: + name: background-worker-cassandra-brig + labels: + app: background-worker + chart: {{ .Chart.Name }}-{{ .Chart.Version | replace "+" "_" }} + release: "{{ .Release.Name }}" + heritage: "{{ .Release.Service }}" +type: Opaque +data: + ca.pem: {{ .Values.config.cassandraBrig.tlsCa | b64enc | quote }} +{{- end }} diff --git a/charts/background-worker/templates/configmap.yaml b/charts/background-worker/templates/configmap.yaml index 1a0e37e609..0fc87fb351 100644 --- a/charts/background-worker/templates/configmap.yaml +++ b/charts/background-worker/templates/configmap.yaml @@ -26,8 +26,17 @@ data: host: {{ .cassandra.host }} port: 9042 keyspace: gundeck - {{- if eq (include "useCassandraTLS" .) "true" }} - tlsCa: /etc/wire/background-worker/cassandra/{{- (include "tlsSecretRef" . | fromYaml).key }} + {{- if eq (include "useGundeckCassandraTLS" .) "true" }} + tlsCa: /etc/wire/background-worker/cassandra-gundeck/{{- (include "gundeckTlsSecretRef" . | fromYaml).key }} + {{- end }} + + cassandraBrig: + endpoint: + host: {{ .cassandraBrig.host }} + port: 9042 + keyspace: brig + {{- if eq (include "useBrigCassandraTLS" .) "true" }} + tlsCa: /etc/wire/background-worker/cassandra-brig/{{- (include "brigTlsSecretRef" . | fromYaml).key }} {{- end }} {{- with .rabbitmq }} @@ -48,4 +57,13 @@ data: backendNotificationPusher: {{toYaml .backendNotificationPusher | indent 6 }} + {{- with .backgroundJobs }} + backgroundJobs: +{{ toYaml . 
| indent 6 }} + {{- end }} + postgresql: +{{ toYaml .postgresql | indent 6 }} + {{- if hasKey $.Values.secrets "pgPassword" }} + postgresqlPassword: /etc/wire/background-worker/secrets/pgPassword + {{- end }} {{- end }} diff --git a/charts/background-worker/templates/deployment.yaml b/charts/background-worker/templates/deployment.yaml index aeeab1ecc5..75c6bd6a65 100644 --- a/charts/background-worker/templates/deployment.yaml +++ b/charts/background-worker/templates/deployment.yaml @@ -39,10 +39,15 @@ spec: - name: "background-worker-secrets" secret: secretName: "background-worker" - {{- if eq (include "useCassandraTLS" .Values.config) "true" }} - - name: "background-worker-cassandra" + {{- if eq (include "useGundeckCassandraTLS" .Values.config) "true" }} + - name: "background-worker-cassandra-gundeck" secret: - secretName: {{ (include "tlsSecretRef" .Values.config | fromYaml).name }} + secretName: {{ (include "gundeckTlsSecretRef" .Values.config | fromYaml).name }} + {{- end }} + {{- if eq (include "useBrigCassandraTLS" .Values.config) "true" }} + - name: "background-worker-cassandra-brig" + secret: + secretName: {{ (include "brigTlsSecretRef" .Values.config | fromYaml).name }} {{- end }} {{- if .Values.config.rabbitmq.tlsCaSecretRef }} - name: "rabbitmq-ca" @@ -58,11 +63,17 @@ spec: {{- toYaml .Values.podSecurityContext | nindent 12 }} {{- end }} volumeMounts: + - name: "background-worker-secrets" + mountPath: "/etc/wire/background-worker/secrets" - name: "background-worker-config" mountPath: "/etc/wire/background-worker/conf" - {{- if eq (include "useCassandraTLS" .Values.config) "true" }} - - name: "background-worker-cassandra" - mountPath: "/etc/wire/background-worker/cassandra" + {{- if eq (include "useGundeckCassandraTLS" .Values.config) "true" }} + - name: "background-worker-cassandra-gundeck" + mountPath: "/etc/wire/background-worker/cassandra-gundeck" + {{- end }} + {{- if eq (include "useBrigCassandraTLS" .Values.config) "true" }} + - name: "background-worker-cassandra-brig" + mountPath: "/etc/wire/background-worker/cassandra-brig" {{- end }} {{- if .Values.config.rabbitmq.tlsCaSecretRef }} - name: "rabbitmq-ca" diff --git a/charts/background-worker/templates/secret.yaml b/charts/background-worker/templates/secret.yaml index 25a22ce67e..dfde355db9 100644 --- a/charts/background-worker/templates/secret.yaml +++ b/charts/background-worker/templates/secret.yaml @@ -15,4 +15,7 @@ data: {{- with .Values.secrets }} rabbitmqUsername: {{ .rabbitmq.username | b64enc | quote }} rabbitmqPassword: {{ .rabbitmq.password | b64enc | quote }} + {{- if .pgPassword }} + pgPassword: {{ .pgPassword | b64enc | quote }} + {{- end }} {{- end }} diff --git a/charts/background-worker/values.yaml b/charts/background-worker/values.yaml index a0117c9363..0abef0f43b 100644 --- a/charts/background-worker/values.yaml +++ b/charts/background-worker/values.yaml @@ -19,6 +19,17 @@ config: logLevel: Info logFormat: StructuredJSON enableFederation: false # keep in sync with brig, cargohold and galley charts' config.enableFederation as well as wire-server chart's tags.federation + # Postgres connection settings + # + # Values are described in https://www.postgresql.org/docs/17/libpq-connect.html#LIBPQ-PARAMKEYWORDS + # To set the password via a background-worker secret see `secrets.pgPassword`. + # + # Below is an example configuration used in CI tests. 
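+  # The password is deliberately not part of this block: set it via
+  # `secrets.pgPassword`, which is mounted at
+  # /etc/wire/background-worker/secrets/pgPassword (see the chart README).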
+ postgresql: + host: postgresql # DNS name without protocol + port: "5432" + user: wire-server + dbname: wire-server rabbitmq: host: rabbitmq port: 5672 @@ -29,15 +40,28 @@ config: # tlsCaSecretRef: # name: # key: + # Cassandra clusters used by background-worker cassandra: host: aws-cassandra + cassandraBrig: + host: aws-cassandra backendNotificationPusher: pushBackoffMinWait: 10000 # in microseconds, so 10ms pushBackoffMaxWait: 300000000 # microseconds, so 300s remotesRefreshInterval: 300000000 # microseconds, so 300s + # Background jobs consumer configuration + backgroundJobs: + # Maximum number of in-flight jobs per process + concurrency: 8 + # Per-attempt timeout in seconds + jobTimeout: 60 + # Total attempts, including the first try + maxAttempts: 3 + secrets: {} + # pgPassword: podSecurityContext: allowPrivilegeEscalation: false diff --git a/charts/integration/templates/configmap.yaml b/charts/integration/templates/configmap.yaml index 84aa623acb..5402a048f2 100644 --- a/charts/integration/templates/configmap.yaml +++ b/charts/integration/templates/configmap.yaml @@ -56,6 +56,11 @@ data: backgroundWorker: host: backgroundWorker.{{ .Release.Namespace }}.svc.cluster.local port: 8080 + # Background jobs defaults for integration tests + backgroundJobs: + concurrency: 4 + jobTimeout: 5 + maxAttempts: 3 stern: host: stern.{{ .Release.Namespace }}.svc.cluster.local diff --git a/docs/src/developer/reference/config-options.md b/docs/src/developer/reference/config-options.md index 02d66cd070..f8601b5415 100644 --- a/docs/src/developer/reference/config-options.md +++ b/docs/src/developer/reference/config-options.md @@ -1669,3 +1669,24 @@ gundeck: settings: cellsEventQueue: "cells_events" ``` +## Background worker: Background jobs + +The background worker consumes jobs from RabbitMQ to process tasks asynchronously. The following configuration controls the consumer’s behavior: + +Internal YAML file and Helm values (under `background-worker.config`): + +```yaml +backgroundJobs: + # Maximum number of in-flight jobs per process + concurrency: 8 + # Per-attempt timeout in seconds + jobTimeout: 60 + # Total attempts including the first run + maxAttempts: 3 +``` + +Notes: + +- `concurrency` controls the AMQP prefetch and caps parallel handler execution per process. +- `jobTimeout` bounds each attempt; timed‑out attempts are retried until `maxAttempts` is reached. +- `maxAttempts` is total tries (first run plus retries). On final failure, the job is dropped (NACK requeue=false) and counted in metrics. diff --git a/hack/helm_vars/wire-server/values.yaml.gotmpl b/hack/helm_vars/wire-server/values.yaml.gotmpl index 1216b96925..13eb08989a 100644 --- a/hack/helm_vars/wire-server/values.yaml.gotmpl +++ b/hack/helm_vars/wire-server/values.yaml.gotmpl @@ -58,6 +58,7 @@ brig: teamCreatorWelcome: https://teams.wire.com/login teamMemberWelcome: https://wire.com/download accountPages: https://account.wire.com + # Background-worker uses Brig's Cassandra keyspace. 
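+  # (the background-worker section below points its `cassandraBrig` cluster
+  # at the same host)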
cassandra: host: {{ .Values.cassandraHost }} replicaCount: 1 @@ -603,6 +604,11 @@ background-worker: pushBackoffMinWait: 1000 # 1ms pushBackoffMaxWait: 500000 # 0.5s remotesRefreshInterval: 1000000 # 1s + backgroundJobs: + concurrency: 8 + jobTimeout: 60 + maxAttempts: 3 + # Cassandra clusters used by background-worker cassandra: host: {{ .Values.cassandraHost }} replicaCount: 1 @@ -611,6 +617,14 @@ background-worker: name: "cassandra-jks-keystore" key: "ca.crt" {{- end }} + cassandraBrig: + host: {{ .Values.cassandraHost }} + replicaCount: 1 + {{- if .Values.useK8ssandraSSL.enabled }} + tlsCaSecretRef: + name: "cassandra-jks-keystore" + key: "ca.crt" + {{- end }} rabbitmq: port: 5671 adminPort: 15671 @@ -619,10 +633,16 @@ background-worker: tlsCaSecretRef: name: "rabbitmq-certificate" key: "ca.crt" + postgresql: + host: "postgresql" + port: "5432" + user: wire-server + dbname: wire-server secrets: rabbitmq: username: {{ .Values.rabbitmqUsername }} password: {{ .Values.rabbitmqPassword }} + pgPassword: "posty-the-gres" integration: ingress: diff --git a/integration/test/API/Brig.hs b/integration/test/API/Brig.hs index b7ef7bc465..9963e05709 100644 --- a/integration/test/API/Brig.hs +++ b/integration/test/API/Brig.hs @@ -1061,6 +1061,16 @@ getUserGroup user gid = do req <- baseRequest user Brig Versioned $ joinHttpPath ["user-groups", gid] submit "GET" req +getUserGroupWithChannels :: (MakesValue user) => user -> String -> App Response +getUserGroupWithChannels user gid = do + req <- baseRequest user Brig Versioned $ joinHttpPath ["user-groups", gid] + submit "GET" $ req & addQueryParams [("include_channels", "true")] + +updateUserGroupChannels :: (MakesValue user) => user -> String -> [String] -> App Response +updateUserGroupChannels user gid convIds = do + req <- baseRequest user Brig Versioned $ joinHttpPath ["user-groups", gid, "channels"] + submit "PUT" $ req & addJSONObject ["channels" .= convIds] + data GetUserGroupsArgs = GetUserGroupsArgs { q :: Maybe String, sortByKeys :: Maybe String, diff --git a/integration/test/Test/Events.hs b/integration/test/Test/Events.hs index 5eaabf5354..40782657bd 100644 --- a/integration/test/Test/Events.hs +++ b/integration/test/Test/Events.hs @@ -157,11 +157,12 @@ testTemporaryQueuesAreDeletedAfterUse = do aliceClientQueue = Queue {name = fromString aliceClientQueueName, vhost = fromString beResource.berVHost} deadNotifsQueue = Queue {name = fromString "dead-user-notifications", vhost = fromString beResource.berVHost} cellsEventsQueue = Queue {name = fromString "cells_events", vhost = fromString beResource.berVHost} + backgroundJobsQueue = Queue {name = fromString "background-jobs", vhost = fromString beResource.berVHost} -- Wait for queue for the new client to be created eventually $ do queuesBeforeWS <- rabbitmqAdmin.listQueuesByVHost (fromString beResource.berVHost) (fromString "") True 100 1 - queuesBeforeWS.items `shouldMatchSet` [deadNotifsQueue, cellsEventsQueue, aliceClientQueue] + queuesBeforeWS.items `shouldMatchSet` [deadNotifsQueue, cellsEventsQueue, aliceClientQueue, backgroundJobsQueue] runCodensity (createEventsWebSocket alice Nothing) $ \ws -> do handle <- randomHandle @@ -169,7 +170,7 @@ testTemporaryQueuesAreDeletedAfterUse = do queuesDuringWS <- rabbitmqAdmin.listQueuesByVHost (fromString beResource.berVHost) (fromString "") True 100 1 addJSONToFailureContext "queuesDuringWS" queuesDuringWS $ do - length queuesDuringWS.items `shouldMatchInt` 4 + length queuesDuringWS.items `shouldMatchInt` 5 -- We cannot use 'assertEvent' here 
because there is a race between the temp
      -- queue being created and rabbitmq fanning out the previous events.
@@ -183,7 +184,7 @@
 
   eventually $ do
     queuesAfterWS <- rabbitmqAdmin.listQueuesByVHost (fromString beResource.berVHost) (fromString "") True 100 1
-    queuesAfterWS.items `shouldMatchSet` [deadNotifsQueue, cellsEventsQueue, aliceClientQueue]
+    queuesAfterWS.items `shouldMatchSet` [deadNotifsQueue, cellsEventsQueue, aliceClientQueue, backgroundJobsQueue]
 
 testSendMessageNoReturnToSenderWithConsumableNotificationsProteus :: (HasCallStack) => App ()
 testSendMessageNoReturnToSenderWithConsumableNotificationsProteus = do
diff --git a/integration/test/Test/UserGroup.hs b/integration/test/Test/UserGroup.hs
index 34eaf5ccbd..340b8a65c6 100644
--- a/integration/test/Test/UserGroup.hs
+++ b/integration/test/Test/UserGroup.hs
@@ -373,3 +373,56 @@ testUserGroupMembersCount = do
     resp.status `shouldMatchInt` 200
     resp.json %. "page.0.membersCount" `shouldMatchInt` 2
     resp.json %. "total" `shouldMatchInt` 1
+
+testUserGroupUpdateChannelsSucceeds :: (HasCallStack) => App ()
+testUserGroupUpdateChannelsSucceeds = do
+  (alice, tid, [_bob]) <- createTeam OwnDomain 2
+
+  ug <-
+    createUserGroup alice (object ["name" .= "none", "members" .= (mempty :: [String])])
+      >>= getJSON 200
+  gid <- ug %. "id" & asString
+
+  convIds <-
+    replicateM 2
+      $ postConversation alice (defProteus {team = Just tid})
+      >>= getJSON 201
+      >>= objConvId
+  updateUserGroupChannels alice gid ((.id_) <$> convIds) >>= assertSuccess
+
+  bindResponse (getUserGroupWithChannels alice gid) $ \resp -> do
+    resp.status `shouldMatchInt` 200
+    actual <- resp.json %. "channels" >>= asList >>= traverse objQid
+    actual `shouldMatchSet` for convIds objQid
+
+testUserGroupUpdateChannelsNonAdmin :: (HasCallStack) => App ()
+testUserGroupUpdateChannelsNonAdmin = do
+  (alice, tid, [bob]) <- createTeam OwnDomain 2
+
+  ug <-
+    createUserGroup alice (object ["name" .= "none", "members" .= (mempty :: [String])])
+      >>= getJSON 200
+  gid <- ug %. "id" & asString
+
+  convId <-
+    postConversation alice (defProteus {team = Just tid})
+      >>= getJSON 201
+      >>= objConvId
+  updateUserGroupChannels bob gid [convId.id_] >>= assertLabel 404 "user-group-not-found"
+
+testUserGroupUpdateChannelsNonExisting :: (HasCallStack) => App ()
+testUserGroupUpdateChannelsNonExisting = do
+  (alice, tid, _) <- createTeam OwnDomain 1
+  (bob, _, _) <- createTeam OwnDomain 1
+
+  ug <-
+    createUserGroup alice (object ["name" .= "none", "members" .= (mempty :: [String])])
+      >>= getJSON 200
+  gid <- ug %. 
"id" & asString + + convId <- + postConversation alice (defProteus {team = Just tid}) + >>= getJSON 201 + >>= objConvId + updateUserGroupChannels bob gid [convId.id_] >>= assertLabel 404 "user-group-not-found" diff --git a/libs/polysemy-wire-zoo/polysemy-wire-zoo.cabal b/libs/polysemy-wire-zoo/polysemy-wire-zoo.cabal index 42d0ef800a..87dc102d65 100644 --- a/libs/polysemy-wire-zoo/polysemy-wire-zoo.cabal +++ b/libs/polysemy-wire-zoo/polysemy-wire-zoo.cabal @@ -33,6 +33,7 @@ library Wire.Sem.Paging.Cassandra Wire.Sem.Random Wire.Sem.Random.IO + Wire.Sem.Random.Null other-modules: Paths_polysemy_wire_zoo hs-source-dirs: src diff --git a/libs/polysemy-wire-zoo/src/Wire/Sem/Random.hs b/libs/polysemy-wire-zoo/src/Wire/Sem/Random.hs index 8cc1ef3386..20e6d02466 100644 --- a/libs/polysemy-wire-zoo/src/Wire/Sem/Random.hs +++ b/libs/polysemy-wire-zoo/src/Wire/Sem/Random.hs @@ -21,6 +21,7 @@ module Wire.Sem.Random ( Random (..), bytes, uuid, + newId, scimTokenId, liftRandom, nDigitNumber, @@ -28,7 +29,7 @@ module Wire.Sem.Random where import Crypto.Random.Types -import Data.Id (ScimTokenId) +import Data.Id (Id, ScimTokenId) import Data.UUID (UUID) import Imports import Polysemy @@ -36,6 +37,7 @@ import Polysemy data Random m a where Bytes :: Int -> Random m ByteString Uuid :: Random m UUID + NewId :: Random m (Id a) ScimTokenId :: Random m ScimTokenId LiftRandom :: (forall mr. (MonadRandom mr) => mr a) -> Random m a NDigitNumber :: Int -> Random m Integer diff --git a/libs/polysemy-wire-zoo/src/Wire/Sem/Random/IO.hs b/libs/polysemy-wire-zoo/src/Wire/Sem/Random/IO.hs index d073d267e8..53d75a904e 100644 --- a/libs/polysemy-wire-zoo/src/Wire/Sem/Random/IO.hs +++ b/libs/polysemy-wire-zoo/src/Wire/Sem/Random/IO.hs @@ -35,6 +35,7 @@ randomToIO :: randomToIO = interpret $ \case Bytes i -> embed $ randBytes i Uuid -> embed $ UUID.nextRandom + NewId -> embed $ randomId @IO ScimTokenId -> embed $ randomId @IO LiftRandom m -> embed @IO $ m NDigitNumber n -> embed $ randIntegerZeroToNMinusOne (10 ^ n) diff --git a/libs/polysemy-wire-zoo/src/Wire/Sem/Random/Null.hs b/libs/polysemy-wire-zoo/src/Wire/Sem/Random/Null.hs new file mode 100644 index 0000000000..7d19eff66c --- /dev/null +++ b/libs/polysemy-wire-zoo/src/Wire/Sem/Random/Null.hs @@ -0,0 +1,39 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2025 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . 
+ +module Wire.Sem.Random.Null + ( randomToNull, + ) +where + +import Crypto.Random +import Data.Id (Id (..)) +import qualified Data.UUID as UUID +import Imports +import Polysemy +import Wire.Sem.Random (Random (..)) + +randomToNull :: + Sem (Random ': r) a -> + Sem r a +randomToNull = interpret $ \case + Bytes i -> pure $ mconcat $ replicate i "0" + Uuid -> pure UUID.nil + NewId -> pure $ Id UUID.nil + ScimTokenId -> pure $ Id UUID.nil + LiftRandom m -> pure $ fst $ withDRG (drgNewSeed $ seedFromInteger 0) m + NDigitNumber n -> pure $ 10 ^ n diff --git a/libs/types-common/src/Data/Id.hs b/libs/types-common/src/Data/Id.hs index ebd24fc53b..ad7f4526ff 100644 --- a/libs/types-common/src/Data/Id.hs +++ b/libs/types-common/src/Data/Id.hs @@ -37,6 +37,7 @@ module Data.Id ServiceId, TeamId, ScimTokenId, + JobId, parseIdFromText, idToText, idObjectSchema, @@ -111,6 +112,7 @@ data IdTag | OAuthClient | OAuthRefreshToken | Challenge + | Job idTagName :: IdTag -> Text idTagName Asset = "Asset" @@ -125,6 +127,7 @@ idTagName ScimToken = "ScimToken" idTagName OAuthClient = "OAuthClient" idTagName OAuthRefreshToken = "OAuthRefreshToken" idTagName Challenge = "Challenge" +idTagName Job = "Job" class KnownIdTag (t :: IdTag) where idTagValue :: IdTag @@ -151,6 +154,8 @@ instance KnownIdTag 'OAuthClient where idTagValue = OAuthClient instance KnownIdTag 'OAuthRefreshToken where idTagValue = OAuthRefreshToken +instance KnownIdTag 'Job where idTagValue = Job + type AssetId = Id 'Asset type InvitationId = Id 'Invitation @@ -177,6 +182,8 @@ type OAuthRefreshTokenId = Id 'OAuthRefreshToken type ChallengeId = Id 'Challenge +type JobId = Id 'Job + -- Id ------------------------------------------------------------------------- data NoId = NoId deriving (Eq, Show, Generic) @@ -434,6 +441,9 @@ newtype RequestId = RequestId ToBytes ) +instance Arbitrary RequestId where + arbitrary = RequestId . UUID.toASCIIBytes <$> arbitrary @UUID + defRequestId :: (IsString s) => s defRequestId = "N/A" diff --git a/libs/wire-api/src/Wire/API/BackgroundJobs.hs b/libs/wire-api/src/Wire/API/BackgroundJobs.hs new file mode 100644 index 0000000000..a3d7474c14 --- /dev/null +++ b/libs/wire-api/src/Wire/API/BackgroundJobs.hs @@ -0,0 +1,165 @@ +{-# LANGUAGE StrictData #-} +{-# LANGUAGE TemplateHaskell #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2025 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . 
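+
+-- Types for asynchronous background jobs: the 'Job' envelope, the
+-- 'JobPayload' union of job types, and helpers for the RabbitMQ quorum
+-- queue they are published to.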
+
+module Wire.API.BackgroundJobs where
+
+import Control.Arrow ((&&&))
+import Control.Lens (makePrisms)
+import Data.Aeson qualified as Aeson
+import Data.Id
+import Data.Map.Strict qualified as Map
+import Data.OpenApi qualified as S
+import Data.Schema
+import Imports
+import Network.AMQP qualified as Q
+import Network.AMQP.Types qualified as QT
+import Wire.Arbitrary (Arbitrary (..), GenericUniform (..))
+
+data JobPayload
+  = JobSyncUserGroupAndChannel SyncUserGroupAndChannel
+  | JobSyncUserGroup SyncUserGroup
+  deriving stock (Eq, Show, Generic)
+  deriving (Arbitrary) via GenericUniform JobPayload
+
+jobPayloadLabel :: JobPayload -> Text
+jobPayloadLabel p = case jobPayloadTag p of
+  JobSyncUserGroupAndChannelTag -> "sync-user-group-and-channel"
+  JobSyncUserGroupTag -> "sync-user-group"
+
+data JobPayloadTag
+  = JobSyncUserGroupAndChannelTag
+  | JobSyncUserGroupTag
+  deriving stock (Eq, Ord, Bounded, Enum, Show, Generic)
+  deriving (Arbitrary) via GenericUniform JobPayloadTag
+
+instance ToSchema JobPayloadTag where
+  schema =
+    enum @Text "JobPayloadTag" $
+      mconcat
+        [ element "sync-user-group-and-channel" JobSyncUserGroupAndChannelTag,
+          element "sync-user-group" JobSyncUserGroupTag
+        ]
+
+jobPayloadTag :: JobPayload -> JobPayloadTag
+jobPayloadTag =
+  \case
+    JobSyncUserGroupAndChannel {} -> JobSyncUserGroupAndChannelTag
+    JobSyncUserGroup {} -> JobSyncUserGroupTag
+
+jobPayloadTagSchema :: ObjectSchema SwaggerDoc JobPayloadTag
+jobPayloadTagSchema = field "type" schema
+
+data SyncUserGroupAndChannel = SyncUserGroupAndChannel
+  { userGroupId :: UserGroupId,
+    convId :: ConvId,
+    actor :: UserId
+  }
+  deriving (Show, Eq, Generic)
+  deriving (Aeson.ToJSON, Aeson.FromJSON) via (Schema SyncUserGroupAndChannel)
+  deriving (Arbitrary) via GenericUniform SyncUserGroupAndChannel
+
+instance ToSchema SyncUserGroupAndChannel where
+  schema =
+    object "SyncUserGroupAndChannel" $
+      SyncUserGroupAndChannel
+        <$> (.userGroupId) .= field "userGroupId" schema
+        <*> (.convId) .= field "convId" schema
+        <*> (.actor) .= field "actor" schema
+
+data SyncUserGroup = SyncUserGroup
+  { userGroupId :: UserGroupId,
+    actor :: UserId
+  }
+  deriving (Show, Eq, Generic)
+  deriving (Aeson.ToJSON, Aeson.FromJSON) via (Schema SyncUserGroup)
+  deriving (Arbitrary) via GenericUniform SyncUserGroup
+
+instance ToSchema SyncUserGroup where
+  schema =
+    object "SyncUserGroup" $
+      SyncUserGroup
+        <$> (.userGroupId) .= field "userGroupId" schema
+        <*> (.actor) .= field "actor" schema
+
+makePrisms ''JobPayload
+
+jobPayloadObjectSchema :: ObjectSchema SwaggerDoc JobPayload
+jobPayloadObjectSchema =
+  snd
+    <$> (jobPayloadTag &&& id)
+      .= bind
+        (fst .= jobPayloadTagSchema)
+        (snd .= dispatch jobPayloadDataSchema)
+  where
+    jobPayloadDataSchema :: JobPayloadTag -> ObjectSchema SwaggerDoc JobPayload
+    jobPayloadDataSchema = \case
+      JobSyncUserGroupAndChannelTag -> tag _JobSyncUserGroupAndChannel (field "payload" schema)
+      JobSyncUserGroupTag -> tag _JobSyncUserGroup (field "payload" schema)
+
+instance ToSchema JobPayload where
+  schema = object "JobPayload" jobPayloadObjectSchema
+
+deriving via (Schema JobPayload) instance Aeson.FromJSON JobPayload
+
+deriving via (Schema JobPayload) instance Aeson.ToJSON JobPayload
+
+deriving via (Schema JobPayload) instance S.ToSchema JobPayload
+
+-- | Background job envelope. The payload is one of the job types in
+-- 'JobPayload', not a free-form JSON object.
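+--
+-- Illustrative (hand-written) JSON encoding, following the schemas above;
+-- the values are placeholders:
+--
+-- > {
+-- >   "id": "...job id (UUID)...",
+-- >   "requestId": "...",
+-- >   "payload": {
+-- >     "type": "sync-user-group",
+-- >     "payload": { "userGroupId": "...", "actor": "..." }
+-- >   }
+-- > }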
+data Job = Job
+  { jobId :: JobId,
+    requestId :: RequestId,
+    payload :: JobPayload
+  }
+  deriving stock (Eq, Show, Generic)
+  deriving (Arbitrary) via GenericUniform Job
+  deriving (Aeson.ToJSON, Aeson.FromJSON, S.ToSchema) via Schema Job
+
+instance ToSchema Job where
+  schema =
+    object "Job" $
+      Job
+        <$> jobId .= field "id" schema
+        <*> requestId .= field "requestId" schema
+        <*> payload .= field "payload" schema
+
+backgroundJobsRoutingKey :: Text
+backgroundJobsRoutingKey = backgroundJobsQueueName
+
+backgroundJobsQueueName :: Text
+backgroundJobsQueueName = "background-jobs"
+
+ensureBackgroundJobsQueue :: Q.Channel -> IO ()
+ensureBackgroundJobsQueue chan = do
+  let headers =
+        QT.FieldTable
+          ( Map.fromList
+              [ ("x-queue-type", QT.FVString "quorum")
+              ]
+          )
+      q =
+        Q.newQueue
+          { Q.queueName = backgroundJobsQueueName,
+            Q.queueDurable = True,
+            Q.queueAutoDelete = False,
+            Q.queueExclusive = False,
+            Q.queueHeaders = headers
+          }
+  void $ Q.declareQueue chan q
diff --git a/libs/wire-api/src/Wire/API/Error/Brig.hs b/libs/wire-api/src/Wire/API/Error/Brig.hs
index 856e5f9520..4c11d0840e 100644
--- a/libs/wire-api/src/Wire/API/Error/Brig.hs
+++ b/libs/wire-api/src/Wire/API/Error/Brig.hs
@@ -118,6 +118,7 @@ data BrigError
   | UserGroupNotFound
   | UserGroupNotATeamAdmin
   | UserGroupMemberIsNotInTheSameTeam
+  | UserGroupChannelNotFound
   | DuplicateEntry
   | MLSInvalidLeafNodeSignature
@@ -351,6 +352,8 @@ type instance MapError 'UserGroupNotFound = 'StaticError 404 "user-group-not-fou
 type instance MapError 'UserGroupNotATeamAdmin = 'StaticError 403 "user-group-write-forbidden" "Only team admins can create, update, or delete user groups."
+type instance MapError 'UserGroupChannelNotFound = 'StaticError 404 "user-group-channel-not-found" "The specified channel does not exist or does not belong to the team"
 type instance MapError 'UserGroupMemberIsNotInTheSameTeam = 'StaticError 400 "user-group-invalid" "Only team members of the same team can be added to a user group."
 type instance MapError 'DuplicateEntry = 'StaticError 409 "duplicate-entry" "Entry already exists"
diff --git a/libs/wire-api/src/Wire/API/Routes/Public/Brig.hs b/libs/wire-api/src/Wire/API/Routes/Public/Brig.hs
index b54f9db2a8..255572678b 100644
--- a/libs/wire-api/src/Wire/API/Routes/Public/Brig.hs
+++ b/libs/wire-api/src/Wire/API/Routes/Public/Brig.hs
@@ -314,7 +314,7 @@ type UserGroupAPI =
         )
     :<|> Named
            "get-user-group"
-           ( Summary "[STUB] (channels in response not implemented)"
+           ( Summary "Fetch a group accessible from the logged-in user"
             :> From 'V10
             :> ZLocalUser
             :> CanThrow 'UserGroupNotFound
@@ -331,7 +331,7 @@ type UserGroupAPI =
         )
     :<|> Named
           "get-user-groups"
-          ( Summary "[STUB] (channelsCount not implemented)"
+          ( Summary "Fetch groups accessible from the logged-in user"
            :> From 'V10
            :> ZLocalUser
            :> "user-groups"
@@ -342,6 +342,7 @@ type UserGroupAPI =
            :> QueryParam' '[Optional, Strict, LastSeenNameDesc] "last_seen_name" UserGroupName
            :> QueryParam' '[Optional, Strict, LastSeenCreatedAtDesc] "last_seen_created_at" UTCTimeMillis
            :> QueryParam' '[Optional, Strict, LastSeenIdDesc] "last_seen_id" UserGroupId
+           :> QueryFlag "include_channels"
            :> QueryFlag "include_member_count"
            :> Get '[JSON] UserGroupPage
         )
@@ -418,8 +419,11 @@ type UserGroupAPI =
         )
     :<|> Named
           "update-user-group-channels"
-          ( Summary "[STUB] Update user group channels. Replaces the channels with the given list."
+          ( Summary "Update user group channels. Replaces the channels with the given list."
:> From 'V12
+           :> CanThrow 'UserGroupNotFound
+           :> CanThrow 'UserGroupNotATeamAdmin
+           :> CanThrow 'UserGroupChannelNotFound
            :> ZLocalUser
            :> "user-groups"
            :> Capture "gid" UserGroupId
diff --git a/libs/wire-api/test/unit/Test/Wire/API/Roundtrip/Aeson.hs b/libs/wire-api/test/unit/Test/Wire/API/Roundtrip/Aeson.hs
index 57cf9e9fda..91f136c22b 100644
--- a/libs/wire-api/test/unit/Test/Wire/API/Roundtrip/Aeson.hs
+++ b/libs/wire-api/test/unit/Test/Wire/API/Roundtrip/Aeson.hs
@@ -27,6 +27,7 @@ import Test.Tasty qualified as T
 import Test.Tasty.QuickCheck (Arbitrary, counterexample, testProperty, (.&&.), (===))
 import Type.Reflection (typeRep)
 import Wire.API.Asset qualified as Asset
+import Wire.API.BackgroundJobs qualified as BackgroundJobs
 import Wire.API.Call.Config qualified as Call.Config
 import Wire.API.Connection qualified as Connection
 import Wire.API.Conversation qualified as Conversation
@@ -356,7 +357,8 @@ tests =
       testRoundTrip @TeamsIntra.TeamStatus,
       testRoundTrip @TeamsIntra.TeamStatusUpdate,
       testRoundTrip @TeamsIntra.TeamData,
-      testRoundTrip @TeamsIntra.TeamName
+      testRoundTrip @TeamsIntra.TeamName,
+      testRoundTrip @BackgroundJobs.Job
     ]
 
 testRoundTrip ::
@@ -365,7 +367,7 @@ testRoundTrip ::
   T.TestTree
 testRoundTrip = testProperty msg trip
   where
-    msg = show (typeRep @a)
+    msg = show (typeRep @a) <> " JSON roundtrip"
     trip (v :: a) =
      counterexample (show $ toJSON v) $
        Right v === (parseEither parseJSON . toJSON) v
diff --git a/libs/wire-api/wire-api.cabal b/libs/wire-api/wire-api.cabal
index 29004088e0..17e175b766 100644
--- a/libs/wire-api/wire-api.cabal
+++ b/libs/wire-api/wire-api.cabal
@@ -72,6 +72,7 @@ library
     Wire.API.App
     Wire.API.ApplyMods
     Wire.API.Asset
+    Wire.API.BackgroundJobs
     Wire.API.Bot
     Wire.API.Bot.Service
     Wire.API.Call.Config
diff --git a/libs/wire-subsystems/postgres-migrations/202509190851023-user_group_conversations_create_table.sql b/libs/wire-subsystems/postgres-migrations/202509190851023-user_group_conversations_create_table.sql
new file mode 100644
index 0000000000..f63e3745f0
--- /dev/null
+++ b/libs/wire-subsystems/postgres-migrations/202509190851023-user_group_conversations_create_table.sql
@@ -0,0 +1,8 @@
+CREATE TABLE public.user_group_channel (
+    user_group_id uuid NOT NULL,
+    conv_id uuid NOT NULL,
+    PRIMARY KEY (user_group_id, conv_id)
+);
+
+ALTER TABLE ONLY public.user_group_channel
+    ADD CONSTRAINT fk_user_group_channel FOREIGN KEY (user_group_id) REFERENCES public.user_group(id) ON DELETE CASCADE;
diff --git a/libs/wire-subsystems/src/Wire/BackgroundJobsPublisher.hs b/libs/wire-subsystems/src/Wire/BackgroundJobsPublisher.hs
new file mode 100644
index 0000000000..9c65214dfc
--- /dev/null
+++ b/libs/wire-subsystems/src/Wire/BackgroundJobsPublisher.hs
@@ -0,0 +1,12 @@
+{-# LANGUAGE TemplateHaskell #-}
+
+module Wire.BackgroundJobsPublisher where
+
+import Data.Id
+import Polysemy
+import Wire.API.BackgroundJobs (JobPayload)
+
+data BackgroundJobsPublisher m a where
+  PublishJob :: JobId -> JobPayload -> BackgroundJobsPublisher m ()
+
+makeSem ''BackgroundJobsPublisher
diff --git a/libs/wire-subsystems/src/Wire/BackgroundJobsPublisher/Null.hs b/libs/wire-subsystems/src/Wire/BackgroundJobsPublisher/Null.hs
new file mode 100644
index 0000000000..87cc32e530
--- /dev/null
+++ b/libs/wire-subsystems/src/Wire/BackgroundJobsPublisher/Null.hs
@@ -0,0 +1,9 @@
+module Wire.BackgroundJobsPublisher.Null where
+
+import Imports
+import Polysemy
+import Wire.BackgroundJobsPublisher (BackgroundJobsPublisher (..))
+
+interpretBackgroundJobsPublisherNoConfig :: InterpreterFor 
BackgroundJobsPublisher r +interpretBackgroundJobsPublisherNoConfig = interpret $ \case + PublishJob {} -> pure () diff --git a/libs/wire-subsystems/src/Wire/BackgroundJobsPublisher/RabbitMQ.hs b/libs/wire-subsystems/src/Wire/BackgroundJobsPublisher/RabbitMQ.hs new file mode 100644 index 0000000000..2ce9126ed9 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/BackgroundJobsPublisher/RabbitMQ.hs @@ -0,0 +1,65 @@ +module Wire.BackgroundJobsPublisher.RabbitMQ where + +import Data.Aeson qualified as Aeson +import Data.Id (JobId, RequestId (..), idToText) +import Data.Text.Encoding qualified as T +import Imports +import Network.AMQP qualified as Q +import Polysemy +import Polysemy.Input +import Wire.API.BackgroundJobs +import Wire.BackgroundJobsPublisher (BackgroundJobsPublisher (..)) +import Wire.BackgroundJobsPublisher.Null (interpretBackgroundJobsPublisherNoConfig) + +interpretBackgroundJobsPublisherRabbitMQOptional :: + ( Member (Embed IO) r + ) => + RequestId -> + Maybe (MVar Q.Channel) -> + InterpreterFor BackgroundJobsPublisher r +interpretBackgroundJobsPublisherRabbitMQOptional requestId = + \case + Nothing -> interpretBackgroundJobsPublisherNoConfig + Just channelRef -> + runInputSem (readMVar channelRef) + . interpretBackgroundJobsPublisherRabbitMQ requestId + . raiseUnder + +interpretBackgroundJobsPublisherRabbitMQ :: + ( Member (Embed IO) r, + Member (Input Q.Channel) r + ) => + RequestId -> + InterpreterFor BackgroundJobsPublisher r +interpretBackgroundJobsPublisherRabbitMQ requestId = + interpret $ \case + PublishJob jobId jobPayload -> do + channel <- input + publishJob requestId channel jobId jobPayload + +publishJob :: + ( Member (Embed IO) r + ) => + RequestId -> + Q.Channel -> + JobId -> + JobPayload -> + Sem r () +publishJob requestId channel jobId jobPayload = do + let job = + Job + { payload = jobPayload, + jobId = jobId, + requestId = requestId + } + msg = + Q.newMsg + { Q.msgBody = Aeson.encode job, + Q.msgContentType = Just "application/json", + Q.msgID = Just (idToText job.jobId), + Q.msgCorrelationID = Just $ T.decodeUtf8 job.requestId.unRequestId + } + + liftIO $ do + ensureBackgroundJobsQueue channel + void $ Q.publishMsg channel "" backgroundJobsRoutingKey msg diff --git a/libs/wire-subsystems/src/Wire/BackgroundJobsRunner.hs b/libs/wire-subsystems/src/Wire/BackgroundJobsRunner.hs new file mode 100644 index 0000000000..4515563ef1 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/BackgroundJobsRunner.hs @@ -0,0 +1,11 @@ +{-# LANGUAGE TemplateHaskell #-} + +module Wire.BackgroundJobsRunner where + +import Polysemy +import Wire.API.BackgroundJobs (Job) + +data BackgroundJobsRunner m a where + RunJob :: Job -> BackgroundJobsRunner m () + +makeSem ''BackgroundJobsRunner diff --git a/libs/wire-subsystems/src/Wire/BackgroundJobsRunner/Interpreter.hs b/libs/wire-subsystems/src/Wire/BackgroundJobsRunner/Interpreter.hs new file mode 100644 index 0000000000..340605f4be --- /dev/null +++ b/libs/wire-subsystems/src/Wire/BackgroundJobsRunner/Interpreter.hs @@ -0,0 +1,68 @@ +module Wire.BackgroundJobsRunner.Interpreter where + +import Data.Id +import Imports +import Polysemy +import Polysemy.Error +import Wire.API.BackgroundJobs +import Wire.BackgroundJobsPublisher +import Wire.BackgroundJobsRunner (BackgroundJobsRunner (..)) +import Wire.UserGroupStore (UserGroupStore, listUserGroupChannels) +import Wire.UserStore + +data BackgroundJobError = BackgroundJobUserTeamNotFound + deriving stock (Show, Eq) + +instance Exception BackgroundJobError + +interpretBackgroundJobsRunner :: + ( 
Member UserGroupStore r, + Member UserStore r, + Member (Error BackgroundJobError) r, + Member BackgroundJobsPublisher r, + Member (Embed IO) r + ) => + InterpreterFor BackgroundJobsRunner r +interpretBackgroundJobsRunner = interpret $ \case + RunJob job -> runJob job + +runJob :: + ( Member UserGroupStore r, + Member UserStore r, + Member (Error BackgroundJobError) r, + Member BackgroundJobsPublisher r, + Member (Embed IO) r + ) => + Job -> + Sem r () +runJob job = case job.payload of + JobSyncUserGroupAndChannel payload -> runSyncUserGroupAndChannel job.jobId job.requestId payload + JobSyncUserGroup payload -> runSyncUserGroup job.jobId job.requestId payload + +runSyncUserGroupAndChannel :: JobId -> RequestId -> SyncUserGroupAndChannel -> Sem r () +runSyncUserGroupAndChannel _jid _rid _job = do + pure () + +runSyncUserGroup :: + ( Member UserGroupStore r, + Member UserStore r, + Member (Error BackgroundJobError) r, + Member BackgroundJobsPublisher r, + Member (Embed IO) r + ) => + JobId -> + RequestId -> + SyncUserGroup -> + Sem r () +runSyncUserGroup _jid _rid job = do + void $ getUserTeam job.actor >>= note BackgroundJobUserTeamNotFound + channels <- listUserGroupChannels job.userGroupId + for_ channels $ \chan -> do + let syncUserGroupAndChannel = + SyncUserGroupAndChannel + { userGroupId = job.userGroupId, + convId = chan, + actor = job.actor + } + jobId <- liftIO randomId + publishJob jobId (JobSyncUserGroupAndChannel syncUserGroupAndChannel) diff --git a/libs/wire-subsystems/src/Wire/UserGroupStore.hs b/libs/wire-subsystems/src/Wire/UserGroupStore.hs index 8dad8471e7..6256e02672 100644 --- a/libs/wire-subsystems/src/Wire/UserGroupStore.hs +++ b/libs/wire-subsystems/src/Wire/UserGroupStore.hs @@ -37,5 +37,7 @@ data UserGroupStore m a where AddUser :: UserGroupId -> UserId -> UserGroupStore m () UpdateUsers :: UserGroupId -> Vector UserId -> UserGroupStore m () RemoveUser :: UserGroupId -> UserId -> UserGroupStore m () + UpdateUserGroupChannels :: UserGroupId -> Vector ConvId -> UserGroupStore m () + ListUserGroupChannels :: UserGroupId -> UserGroupStore m (Vector ConvId) makeSem ''UserGroupStore diff --git a/libs/wire-subsystems/src/Wire/UserGroupStore/Postgres.hs b/libs/wire-subsystems/src/Wire/UserGroupStore/Postgres.hs index ed216cffd6..48d49bdab7 100644 --- a/libs/wire-subsystems/src/Wire/UserGroupStore/Postgres.hs +++ b/libs/wire-subsystems/src/Wire/UserGroupStore/Postgres.hs @@ -29,7 +29,7 @@ import Polysemy.Error (Error, throw) import Polysemy.Input import Wire.API.Pagination import Wire.API.User.Profile -import Wire.API.UserGroup +import Wire.API.UserGroup hiding (UpdateUserGroupChannels) import Wire.API.UserGroup.Pagination import Wire.UserGroupStore (PaginationState (..), UserGroupPageRequest (..), UserGroupStore (..), toSortBy) @@ -53,6 +53,8 @@ interpretUserGroupStoreToPostgres = AddUser gid uid -> addUser gid uid UpdateUsers gid uids -> updateUsers gid uids RemoveUser gid uid -> removeUser gid uid + UpdateUserGroupChannels gid convIds -> updateUserGroupChannels gid convIds + ListUserGroupChannels gid -> listUserGroupChannels gid updateUsers :: (UserGroupStorePostgresEffectConstraints r) => UserGroupId -> Vector UserId -> Sem r () updateUsers gid uids = do @@ -408,6 +410,60 @@ removeUser = delete from user_group_member where user_group_id = ($1 :: uuid) and user_id = ($2 :: uuid) |] +updateUserGroupChannels :: + forall r. 
+ (UserGroupStorePostgresEffectConstraints r) => + UserGroupId -> + Vector ConvId -> + Sem r () +updateUserGroupChannels gid convIds = do + pool <- input + eitherErrorOrUnit <- liftIO $ use pool session + either throw pure eitherErrorOrUnit + where + session :: Session () + session = TxSessions.transaction TxSessions.Serializable TxSessions.Write $ do + Tx.statement (gid, convIds) deleteStatement + Tx.statement (gid, convIds) insertStatement + + deleteStatement :: Statement (UserGroupId, Vector ConvId) () + deleteStatement = + lmap + (bimap toUUID (fmap toUUID)) + $ [resultlessStatement| + delete from user_group_channel where user_group_id = ($1 :: uuid) and conv_id not in (SELECT unnest($2 :: uuid[])) + |] + + insertStatement :: Statement (UserGroupId, Vector ConvId) () + insertStatement = + lmap (bimap (fmap (.toUUID)) (fmap (.toUUID)) . uncurry toRelationTable) $ + [resultlessStatement| + insert into user_group_channel (user_group_id, conv_id) select * from unnest ($1 :: uuid[], $2 :: uuid[]) + on conflict (user_group_id, conv_id) do nothing + |] + +listUserGroupChannels :: + forall r. + (UserGroupStorePostgresEffectConstraints r) => + UserGroupId -> + Sem r (Vector ConvId) +listUserGroupChannels gid = do + pool <- input + eitherErrorOrUnit <- liftIO $ use pool session + either throw pure eitherErrorOrUnit + where + session :: Session (Vector ConvId) + session = statement gid selectStatement + + selectStatement :: Statement UserGroupId (Vector ConvId) + selectStatement = + dimap + toUUID + (fmap Id) + [vectorStatement| + select (conv_id :: uuid) from user_group_channel where user_group_id = ($1 :: uuid) + |] + crudUser :: forall r. (UserGroupStorePostgresEffectConstraints r) => diff --git a/libs/wire-subsystems/src/Wire/UserGroupSubsystem.hs b/libs/wire-subsystems/src/Wire/UserGroupSubsystem.hs index 20ef54fe13..2148827d5d 100644 --- a/libs/wire-subsystems/src/Wire/UserGroupSubsystem.hs +++ b/libs/wire-subsystems/src/Wire/UserGroupSubsystem.hs @@ -32,5 +32,7 @@ data UserGroupSubsystem m a where AddUsers :: UserId -> UserGroupId -> Vector UserId -> UserGroupSubsystem m () UpdateUsers :: UserId -> UserGroupId -> Vector UserId -> UserGroupSubsystem m () RemoveUser :: UserId -> UserGroupId -> UserId -> UserGroupSubsystem m () + UpdateChannels :: UserId -> UserGroupId -> Vector ConvId -> UserGroupSubsystem m () + ListChannels :: UserId -> UserGroupId -> UserGroupSubsystem m (Vector ConvId) makeSem ''UserGroupSubsystem diff --git a/libs/wire-subsystems/src/Wire/UserGroupSubsystem/Interpreter.hs b/libs/wire-subsystems/src/Wire/UserGroupSubsystem/Interpreter.hs index e0655b786d..9cae4bdba9 100644 --- a/libs/wire-subsystems/src/Wire/UserGroupSubsystem/Interpreter.hs +++ b/libs/wire-subsystems/src/Wire/UserGroupSubsystem/Interpreter.hs @@ -12,6 +12,7 @@ import Imports import Polysemy import Polysemy.Error import Polysemy.Input (Input, input) +import Wire.API.BackgroundJobs import Wire.API.Error import Wire.API.Error.Brig qualified as E import Wire.API.Pagination @@ -22,8 +23,11 @@ import Wire.API.User import Wire.API.UserEvent import Wire.API.UserGroup import Wire.API.UserGroup.Pagination +import Wire.BackgroundJobsPublisher import Wire.Error +import Wire.GalleyAPIAccess (GalleyAPIAccess, getTeamConv) import Wire.NotificationSubsystem +import Wire.Sem.Random qualified as Random import Wire.TeamSubsystem import Wire.UserGroupStore (PaginationState (..), UserGroupPageRequest (..)) import Wire.UserGroupStore qualified as Store @@ -31,12 +35,15 @@ import Wire.UserGroupSubsystem (UserGroupSubsystem 
(..)) import Wire.UserSubsystem (UserSubsystem, getLocalUserProfiles, getUserTeam) interpretUserGroupSubsystem :: - ( Member UserSubsystem r, + ( Member Random.Random r, + Member UserSubsystem r, Member (Error UserGroupSubsystemError) r, Member Store.UserGroupStore r, Member (Input (Local ())) r, Member NotificationSubsystem r, - Member TeamSubsystem r + Member TeamSubsystem r, + Member GalleyAPIAccess r, + Member BackgroundJobsPublisher r ) => InterpreterFor UserGroupSubsystem r interpretUserGroupSubsystem = interpret $ \case @@ -50,11 +57,14 @@ interpretUserGroupSubsystem = interpret $ \case AddUsers adder groupId addeeIds -> addUsers adder groupId addeeIds UpdateUsers updater groupId uids -> updateUsers updater groupId uids RemoveUser remover groupId removeeId -> removeUser remover groupId removeeId + UpdateChannels performer groupId channelIds -> updateChannels performer groupId channelIds + ListChannels performer groupId -> listChannels performer groupId data UserGroupSubsystemError = UserGroupNotATeamAdmin | UserGroupMemberIsNotInTheSameTeam | UserGroupNotFound + | UserGroupChannelNotFound deriving (Show, Eq) userGroupSubsystemErrorToHttpError :: UserGroupSubsystemError -> HttpError @@ -63,14 +73,17 @@ userGroupSubsystemErrorToHttpError = UserGroupNotATeamAdmin -> errorToWai @E.UserGroupNotATeamAdmin UserGroupMemberIsNotInTheSameTeam -> errorToWai @E.UserGroupMemberIsNotInTheSameTeam UserGroupNotFound -> errorToWai @E.UserGroupNotFound + UserGroupChannelNotFound -> errorToWai @E.UserGroupChannelNotFound createUserGroup :: - ( Member UserSubsystem r, + ( Member Random.Random r, + Member UserSubsystem r, Member (Error UserGroupSubsystemError) r, Member Store.UserGroupStore r, Member (Input (Local ())) r, Member NotificationSubsystem r, - Member TeamSubsystem r + Member TeamSubsystem r, + Member BackgroundJobsPublisher r ) => UserId -> NewUserGroup -> @@ -91,6 +104,7 @@ createUserGroup creator newGroup = do pushNotifications [ mkEvent creator (UserGroupCreated ug.id_) admins ] + triggerSyncUserGroup creator ug.id_ pure ug getTeamAsAdmin :: @@ -239,11 +253,13 @@ deleteGroup deleter groupId = throw UserGroupNotATeamAdmin addUser :: - ( Member UserSubsystem r, + ( Member Random.Random r, + Member UserSubsystem r, Member Store.UserGroupStore r, Member (Error UserGroupSubsystemError) r, Member NotificationSubsystem r, - Member TeamSubsystem r + Member TeamSubsystem r, + Member BackgroundJobsPublisher r ) => UserId -> UserGroupId -> @@ -259,13 +275,16 @@ addUser adder groupId addeeId = do pushNotifications [ mkEvent adder (UserGroupUpdated groupId) admins ] + triggerSyncUserGroup adder groupId addUsers :: - ( Member UserSubsystem r, + ( Member Random.Random r, + Member UserSubsystem r, Member Store.UserGroupStore r, Member (Error UserGroupSubsystemError) r, Member NotificationSubsystem r, - Member TeamSubsystem r + Member TeamSubsystem r, + Member BackgroundJobsPublisher r ) => UserId -> UserGroupId -> @@ -285,12 +304,16 @@ addUsers adder groupId addeeIds = do [ mkEvent adder (UserGroupUpdated groupId) admins ] + triggerSyncUserGroup adder groupId + updateUsers :: - ( Member UserSubsystem r, + ( Member Random.Random r, + Member UserSubsystem r, Member Store.UserGroupStore r, Member (Error UserGroupSubsystemError) r, Member NotificationSubsystem r, - Member TeamSubsystem r + Member TeamSubsystem r, + Member BackgroundJobsPublisher r ) => UserId -> UserGroupId -> @@ -306,13 +329,16 @@ updateUsers updater groupId uids = do pushNotifications [ mkEvent updater (UserGroupUpdated groupId) admins ] + 
triggerSyncUserGroup updater groupId removeUser :: - ( Member UserSubsystem r, + ( Member Random.Random r, + Member UserSubsystem r, Member Store.UserGroupStore r, Member (Error UserGroupSubsystemError) r, Member NotificationSubsystem r, - Member TeamSubsystem r + Member TeamSubsystem r, + Member BackgroundJobsPublisher r ) => UserId -> UserGroupId -> @@ -328,3 +354,52 @@ removeUser remover groupId removeeId = do pushNotifications [ mkEvent remover (UserGroupUpdated groupId) admins ] + triggerSyncUserGroup remover groupId + +updateChannels :: + ( Member Random.Random r, + Member UserSubsystem r, + Member Store.UserGroupStore r, + Member (Error UserGroupSubsystemError) r, + Member TeamSubsystem r, + Member GalleyAPIAccess r, + Member BackgroundJobsPublisher r + ) => + UserId -> + UserGroupId -> + Vector ConvId -> + Sem r () +updateChannels performer groupId channelIds = do + void $ getUserGroup performer groupId >>= note UserGroupNotFound + teamId <- getTeamAsAdmin performer >>= note UserGroupNotATeamAdmin + traverse_ (getTeamConv performer teamId >=> note UserGroupChannelNotFound) channelIds + Store.updateUserGroupChannels groupId channelIds + triggerSyncUserGroup performer groupId + +listChannels :: + ( Member UserSubsystem r, + Member Store.UserGroupStore r, + Member (Error UserGroupSubsystemError) r, + Member TeamSubsystem r + ) => + UserId -> + UserGroupId -> + Sem r (Vector ConvId) +listChannels performer groupId = do + void $ getUserGroup performer groupId >>= note UserGroupNotFound + void $ getUserTeam performer >>= note UserGroupNotATeamAdmin + Store.listUserGroupChannels groupId + +triggerSyncUserGroup :: + ( Member Random.Random r, + Member BackgroundJobsPublisher r + ) => + UserId -> + UserGroupId -> + Sem r () +triggerSyncUserGroup performer groupId = do + jobId <- Random.newId + publishJob + jobId + $ JobSyncUserGroup + SyncUserGroup {userGroupId = groupId, actor = performer} diff --git a/libs/wire-subsystems/test/unit/Wire/MockInterpreters/Random.hs b/libs/wire-subsystems/test/unit/Wire/MockInterpreters/Random.hs index 6352edfe9d..870d813b0e 100644 --- a/libs/wire-subsystems/test/unit/Wire/MockInterpreters/Random.hs +++ b/libs/wire-subsystems/test/unit/Wire/MockInterpreters/Random.hs @@ -14,6 +14,7 @@ randomToStatefulStdGen = interpret $ \case Bytes n -> do fromShort <$> withStatefulGen (genShortByteString n) Uuid -> withStatefulGen random + NewId -> Id <$> withStatefulGen random ScimTokenId -> Id <$> withStatefulGen random LiftRandom m -> do seedInt <- withStatefulGen (random @Int) diff --git a/libs/wire-subsystems/test/unit/Wire/MockInterpreters/UserGroupStore.hs b/libs/wire-subsystems/test/unit/Wire/MockInterpreters/UserGroupStore.hs index 8be5da9a0e..59c901df94 100644 --- a/libs/wire-subsystems/test/unit/Wire/MockInterpreters/UserGroupStore.hs +++ b/libs/wire-subsystems/test/unit/Wire/MockInterpreters/UserGroupStore.hs @@ -7,9 +7,11 @@ module Wire.MockInterpreters.UserGroupStore where import Control.Lens ((%~), _2) +import Data.Domain (Domain (Domain)) import Data.Id import Data.Json.Util import Data.Map qualified as Map +import Data.Qualified import Data.Text qualified as T import Data.Time.Clock import Data.Vector (Vector, fromList) @@ -21,7 +23,7 @@ import Polysemy.State import System.Random (StdGen, mkStdGen) import Wire.API.Pagination import Wire.API.User -import Wire.API.UserGroup +import Wire.API.UserGroup hiding (UpdateUserGroupChannels) import Wire.API.UserGroup.Pagination import Wire.MockInterpreters.Now import Wire.MockInterpreters.Random @@ -62,6 +64,8 @@ 
userGroupStoreTestInterpreter = AddUser gid uid -> addUserImpl gid uid UpdateUsers gid uids -> updateUsersImpl gid uids RemoveUser gid uid -> removeUserImpl gid uid + UpdateUserGroupChannels gid convIds -> updateUserGroupChannelsImpl gid convIds + ListUserGroupChannels gid -> listUserGroupChannelsImpl gid updateUsersImpl :: (UserGroupStoreInMemEffectConstraints r) => UserGroupId -> Vector UserId -> Sem r () updateUsersImpl gid uids = do @@ -179,6 +183,33 @@ removeUserImpl gid uid = do modifyUserGroupsGidOnly gid (Map.alter f) +updateUserGroupChannelsImpl :: + (UserGroupStoreInMemEffectConstraints r) => + UserGroupId -> + Vector ConvId -> + Sem r () +updateUserGroupChannelsImpl gid convIds = do + let f :: Maybe UserGroup -> Maybe UserGroup + f Nothing = Nothing + f (Just g) = + Just + ( g + { channels = Identity $ Just $ flip Qualified (Domain "") <$> convIds, + channelsCount = Just $ length convIds + } :: + UserGroup + ) + + modifyUserGroupsGidOnly gid (Map.alter f) + +listUserGroupChannelsImpl :: + (UserGroupStoreInMemEffectConstraints r) => + UserGroupId -> + Sem r (Vector ConvId) +listUserGroupChannelsImpl gid = + foldMap (fmap qUnqualified) . (runIdentity . (.channels) . snd <=< find ((== gid) . snd . fst) . Map.toList) + <$> get @(Map (TeamId, UserGroupId) UserGroup) + ---------------------------------------------------------------------- modifyUserGroupsGidOnly :: diff --git a/libs/wire-subsystems/test/unit/Wire/UserGroupSubsystem/InterpreterSpec.hs b/libs/wire-subsystems/test/unit/Wire/UserGroupSubsystem/InterpreterSpec.hs index a6d75423ba..155a309d57 100644 --- a/libs/wire-subsystems/test/unit/Wire/UserGroupSubsystem/InterpreterSpec.hs +++ b/libs/wire-subsystems/test/unit/Wire/UserGroupSubsystem/InterpreterSpec.hs @@ -38,9 +38,13 @@ import Wire.API.UserEvent import Wire.API.UserGroup import Wire.API.UserGroup.Pagination import Wire.Arbitrary +import Wire.BackgroundJobsPublisher qualified as BackgroundJobsPublisher +import Wire.BackgroundJobsPublisher.Null qualified as BackgroundJobsPublisher import Wire.GalleyAPIAccess import Wire.MockInterpreters as Mock import Wire.NotificationSubsystem +import Wire.Sem.Random qualified as Random +import Wire.Sem.Random.Null qualified as Random import Wire.TeamSubsystem import Wire.TeamSubsystem.GalleyAPI import Wire.UserGroupSubsystem @@ -56,8 +60,10 @@ type AllDependencies = `Append` '[ Input (Local ()), MockNow, NotificationSubsystem, + BackgroundJobsPublisher.BackgroundJobsPublisher, State [Push], - Error UserGroupSubsystemError + Error UserGroupSubsystemError, + Random.Random ] runDependenciesFailOnError :: (HasCallStack) => [User] -> Map TeamId [TeamMember] -> Sem AllDependencies (IO ()) -> IO () @@ -70,8 +76,10 @@ runDependencies :: Either UserGroupSubsystemError a runDependencies initialUsers initialTeams = run + . Random.randomToNull . runError . evalState mempty + . BackgroundJobsPublisher.interpretBackgroundJobsPublisherNoConfig . inMemoryNotificationSubsystemInterpreter . evalState defaultTime . runInputConst (toLocalUnsafe (Domain "example.com") ()) @@ -87,8 +95,10 @@ runDependenciesWithReturnState :: Either UserGroupSubsystemError ([Push], a) runDependenciesWithReturnState initialUsers initialTeams = run + . Random.randomToNull . runError . runState mempty + . BackgroundJobsPublisher.interpretBackgroundJobsPublisherNoConfig . inMemoryNotificationSubsystemInterpreter . evalState defaultTime . 
runInputConst (toLocalUnsafe (Domain "example.com") ())
diff --git a/libs/wire-subsystems/wire-subsystems.cabal b/libs/wire-subsystems/wire-subsystems.cabal
index ff514fc848..19d3a2baf0 100644
--- a/libs/wire-subsystems/wire-subsystems.cabal
+++ b/libs/wire-subsystems/wire-subsystems.cabal
@@ -182,6 +182,11 @@ library
     Wire.AuthenticationSubsystem.Interpreter
     Wire.AuthenticationSubsystem.ZAuth
     Wire.AWS
+    Wire.BackgroundJobsPublisher
+    Wire.BackgroundJobsPublisher.Null
+    Wire.BackgroundJobsPublisher.RabbitMQ
+    Wire.BackgroundJobsRunner
+    Wire.BackgroundJobsRunner.Interpreter
     Wire.BlockListStore
     Wire.BlockListStore.Cassandra
     Wire.BrigAPIAccess
diff --git a/postgres-schema.sql b/postgres-schema.sql
index c23af1aa97..70443f919e 100644
--- a/postgres-schema.sql
+++ b/postgres-schema.sql
@@ -92,9 +92,20 @@ CREATE TABLE public.user_group_member (
     user_id uuid NOT NULL
 );
 
-
 ALTER TABLE public.user_group_member OWNER TO "wire-server";
 
+--
+-- Name: user_group_channel; Type: TABLE; Schema: public; Owner: wire-server
+--
+
+CREATE TABLE public.user_group_channel (
+    user_group_id uuid NOT NULL,
+    conv_id uuid NOT NULL
+);
+
+
+ALTER TABLE public.user_group_channel OWNER TO "wire-server";
+
 --
 -- Name: collaborators collaborators_pkey; Type: CONSTRAINT; Schema: public; Owner: wire-server
 --
@@ -119,6 +130,14 @@ ALTER TABLE ONLY public.user_group_member
     ADD CONSTRAINT user_group_member_pkey PRIMARY KEY (user_group_id, user_id);
 
+--
+-- Name: user_group_channel user_group_channel_pkey; Type: CONSTRAINT; Schema: public; Owner: wire-server
+--
+
+ALTER TABLE ONLY public.user_group_channel
+    ADD CONSTRAINT user_group_channel_pkey PRIMARY KEY (user_group_id, conv_id);
+
+
 --
 -- Name: user_group user_group_pkey; Type: CONSTRAINT; Schema: public; Owner: wire-server
 --
@@ -149,6 +168,14 @@ ALTER TABLE ONLY public.user_group_member
     ADD CONSTRAINT fk_user_group FOREIGN KEY (user_group_id) REFERENCES public.user_group(id) ON DELETE CASCADE;
 
+--
+-- Name: user_group_channel fk_user_group; Type: FK CONSTRAINT; Schema: public; Owner: wire-server
+--
+
+ALTER TABLE ONLY public.user_group_channel
+    ADD CONSTRAINT fk_user_group_channel FOREIGN KEY (user_group_id) REFERENCES public.user_group(id) ON DELETE CASCADE;
+
+
 --
 -- Name: SCHEMA public; Type: ACL; Schema: -; Owner: wire-server
 --
diff --git a/services/background-worker/background-worker.cabal b/services/background-worker/background-worker.cabal
index 25079919e4..758881060e 100644
--- a/services/background-worker/background-worker.cabal
+++ b/services/background-worker/background-worker.cabal
@@ -16,6 +16,8 @@ library
     Wire.BackgroundWorker
     Wire.BackgroundWorker.Env
     Wire.BackgroundWorker.Health
+    Wire.BackgroundWorker.Jobs.Consumer
+    Wire.BackgroundWorker.Jobs.Registry
     Wire.BackgroundWorker.Options
     Wire.BackgroundWorker.Util
     Wire.DeadUserNotificationWatcher
@@ -35,14 +37,18 @@ library
     , bytestring-conversion
     , cassandra-util
     , containers
+    , data-timeout
     , exceptions
     , extended
+    , extra
+    , hasql-pool
     , HsOpenSSL
     , http-client
     , http2-manager
     , imports
     , metrics-wai
     , monad-control
+    , polysemy
     , prometheus-client
     , retry
     , servant-client
@@ -56,6 +62,7 @@ library
     , wai-utilities
     , wire-api
     , wire-api-federation
+    , wire-subsystems
 
   default-extensions:
    AllowAmbiguousTypes
diff --git a/services/background-worker/background-worker.integration.yaml 
diff --git a/services/background-worker/background-worker.integration.yaml b/services/background-worker/background-worker.integration.yaml
index 60dc23a926..8add630a58 100644
--- a/services/background-worker/background-worker.integration.yaml
+++ b/services/background-worker/background-worker.integration.yaml
@@ -14,6 +14,12 @@ cassandra:
     port: 9042
   keyspace: gundeck_test

+cassandraBrig:
+  endpoint:
+    host: 127.0.0.1
+    port: 9042
+  keyspace: brig_test
+
 rabbitmq:
   host: 127.0.0.1
   port: 5671
@@ -28,3 +34,16 @@ backendNotificationPusher:
   pushBackoffMinWait: 1000 # 1ms
   pushBackoffMaxWait: 1000000 # 1s
   remotesRefreshInterval: 10000 # 10ms
+
+# Background jobs consumer configuration for integration
+backgroundJobs:
+  concurrency: 4
+  jobTimeout: 5
+  maxAttempts: 3
+
+postgresql:
+  host: 127.0.0.1
+  port: "5432"
+  user: wire-server
+  dbname: backendA
+  password: posty-the-gres
diff --git a/services/background-worker/default.nix b/services/background-worker/default.nix
index 7ef4b6ab45..e3266a2416 100644
--- a/services/background-worker/default.nix
+++ b/services/background-worker/default.nix
@@ -11,10 +11,13 @@
 , cassandra-util
 , containers
 , data-default
+, data-timeout
 , exceptions
 , extended
+, extra
 , federator
 , gitignoreSource
+, hasql-pool
 , HsOpenSSL
 , hspec
 , http-client
@@ -25,6 +28,7 @@
 , lib
 , metrics-wai
 , monad-control
+, polysemy
 , prometheus-client
 , QuickCheck
 , retry
@@ -42,6 +46,7 @@
 , wai-utilities
 , wire-api
 , wire-api-federation
+, wire-subsystems
 }:
 mkDerivation {
   pname = "background-worker";
@@ -57,14 +62,18 @@ mkDerivation {
     bytestring-conversion
     cassandra-util
     containers
+    data-timeout
     exceptions
     extended
+    extra
+    hasql-pool
     HsOpenSSL
     http-client
     http2-manager
     imports
     metrics-wai
     monad-control
+    polysemy
     prometheus-client
     retry
     servant-client
@@ -78,6 +87,7 @@ mkDerivation {
     wai-utilities
     wire-api
     wire-api-federation
+    wire-subsystems
   ];
   executableHaskellDepends = [ HsOpenSSL imports types-common ];
   testHaskellDepends = [
diff --git a/services/background-worker/src/Wire/BackgroundWorker.hs b/services/background-worker/src/Wire/BackgroundWorker.hs
index e6110fb438..4fe9e2aefc 100644
--- a/services/background-worker/src/Wire/BackgroundWorker.hs
+++ b/services/background-worker/src/Wire/BackgroundWorker.hs
@@ -14,6 +14,7 @@ import Util.Options
 import Wire.BackendNotificationPusher qualified as BackendNotificationPusher
 import Wire.BackgroundWorker.Env
 import Wire.BackgroundWorker.Health qualified as Health
+import Wire.BackgroundWorker.Jobs.Consumer qualified as Jobs
 import Wire.BackgroundWorker.Options
 import Wire.DeadUserNotificationWatcher qualified as DeadUserNotificationWatcher
@@ -29,10 +30,14 @@ run opts = do
     runAppT env $
       withNamedLogger "dead-user-notification-watcher" $
         DeadUserNotificationWatcher.startWorker amqpEP
+  cleanupJobs <-
+    runAppT env $
+      withNamedLogger "background-job-consumer" $
+        Jobs.startWorker amqpEP
   let -- cleanup will run in a new thread when the signal is caught, so we need to use IORefs and
       -- specific exception types to message threads to clean up
       cleanup = do
-        concurrently_ cleanupDeadUserNotifWatcher cleanupBackendNotifPusher
+        concurrently_ cleanupDeadUserNotifWatcher (concurrently_ cleanupBackendNotifPusher cleanupJobs)
   let server = defaultServer (T.unpack $ opts.backgroundWorker.host) opts.backgroundWorker.port env.logger
   let settings = newSettings server
   -- Additional cleanup when shutting down via signals.
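Note on the `cleanup` binding above: nesting `concurrently_` works, but it reads awkwardly as more workers are added. Assuming `mapConcurrently_` from the same async API that provides `concurrently_` here, an equivalent, flatter drop-in sketch for the same binding:

      cleanup =
        mapConcurrently_
          id
          [cleanupDeadUserNotifWatcher, cleanupBackendNotifPusher, cleanupJobs]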
diff --git a/services/background-worker/src/Wire/BackgroundWorker/Env.hs b/services/background-worker/src/Wire/BackgroundWorker/Env.hs
index 93711e9d9b..753d0e25fa 100644
--- a/services/background-worker/src/Wire/BackgroundWorker/Env.hs
+++ b/services/background-worker/src/Wire/BackgroundWorker/Env.hs
@@ -10,7 +10,10 @@ import Control.Monad.Catch
 import Control.Monad.Trans.Control
 import Data.Map.Strict qualified as Map
 import HTTP2.Client.Manager
+import Hasql.Pool qualified as HasqlPool
+import Hasql.Pool.Extended (initPostgresPool)
 import Imports
+import Network.AMQP qualified as Q
 import Network.AMQP.Extended
 import Network.HTTP.Client
 import Network.RabbitMqAdmin qualified as RabbitMqAdmin
@@ -30,12 +33,14 @@ type IsWorking = Bool
 data Worker
   = BackendNotificationPusher
   | DeadUserNotificationWatcher
+  | BackgroundJobConsumer
   deriving (Eq, Ord)

 workerName :: Worker -> Text
 workerName = \case
   BackendNotificationPusher -> "backend-notification-pusher"
   DeadUserNotificationWatcher -> "dead-user-notification-watcher"
+  BackgroundJobConsumer -> "background-job-consumer"

 data Env = Env
   { http2Manager :: Http2Manager,
@@ -47,9 +52,13 @@ data Env = Env
     defederationTimeout :: ResponseTimeout,
     backendNotificationMetrics :: BackendNotificationMetrics,
     backendNotificationsConfig :: BackendNotificationsConfig,
+    backgroundJobsConfig :: BackgroundJobsConfig,
     workerRunningGauge :: Vector Text Gauge,
     statuses :: IORef (Map Worker IsWorking),
-    cassandra :: ClientState
+    gundeckCassandra :: ClientState,
+    brigCassandra :: ClientState,
+    hasqlPool :: HasqlPool.Pool,
+    backgroundJobsQueue :: MVar Q.Channel
   }

 data BackendNotificationMetrics = BackendNotificationMetrics
@@ -72,7 +81,8 @@ mkWorkerRunningGauge =
 mkEnv :: Opts -> IO Env
 mkEnv opts = do
   logger <- Log.mkLogger opts.logLevel Nothing opts.logFormat
-  cassandra <- defInitCassandra opts.cassandra logger
+  gundeckCassandra <- defInitCassandra opts.cassandra logger
+  brigCassandra <- defInitCassandra opts.cassandraBrig logger
   http2Manager <- initHttp2Manager
   httpManager <- newManager defaultManagerSettings
   let federatorInternal = opts.federatorInternal
@@ -86,11 +96,17 @@ mkEnv opts = do
   statuses <-
     newIORef $
       Map.fromList
-        [ (BackendNotificationPusher, False)
+        [ (BackendNotificationPusher, False),
+          (BackgroundJobConsumer, False)
         ]
   backendNotificationMetrics <- mkBackendNotificationMetrics
   let backendNotificationsConfig = opts.backendNotificationPusher
+      backgroundJobsConfig = opts.backgroundJobs
   workerRunningGauge <- mkWorkerRunningGauge
+  hasqlPool <- initPostgresPool opts.postgresql opts.postgresqlPassword
+  backgroundJobsQueue <-
+    mkRabbitMqChannelMVar logger (Just "background-worker") $
+      either id demoteOpts opts.rabbitmq.unRabbitMqOpts
   pure Env {..}

 initHttp2Manager :: IO Http2Manager
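Note on the new `Env` fields: `hasqlPool` hands out pooled Postgres connections per session, and `backgroundJobsQueue` is an MVar that the AMQP helper keeps pointed at the most recently (re)established channel. A minimal usage sketch (function name hypothetical; field names as defined above):

    import Hasql.Pool qualified as HasqlPool
    import Imports

    checkEnvHandles :: Env -> IO ()
    checkEnvHandles env = do
      -- acquire a pooled Postgres connection by running a trivial session
      pgResult <- HasqlPool.use env.hasqlPool (pure ())
      print pgResult -- Right () once a connection can be checked out
      -- readMVar blocks until the AMQP helper has (re)opened a channel
      _chan <- readMVar env.backgroundJobsQueue
      putStrLn "AMQP channel available"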
diff --git a/services/background-worker/src/Wire/BackgroundWorker/Jobs/Consumer.hs b/services/background-worker/src/Wire/BackgroundWorker/Jobs/Consumer.hs
new file mode 100644
index 0000000000..4ab007cca8
--- /dev/null
+++ b/services/background-worker/src/Wire/BackgroundWorker/Jobs/Consumer.hs
@@ -0,0 +1,121 @@
+{-# LANGUAGE RecordWildCards #-}
+
+module Wire.BackgroundWorker.Jobs.Consumer (startWorker, BackgroundJobsMetrics (..)) where
+
+import Control.Concurrent.Timeout qualified as Timeout
+import Control.Retry
+import Data.Aeson qualified as Aeson
+import Data.Range (Range (fromRange))
+import Data.Timeout
+import Imports
+import Network.AMQP qualified as Q
+import Network.AMQP.Extended
+import Network.AMQP.Lifted qualified as QL
+import Prometheus
+import System.Logger.Class qualified as Log
+import System.Time.Extra (duration)
+import UnliftIO
+import Wire.API.BackgroundJobs
+import Wire.BackgroundWorker.Env
+import Wire.BackgroundWorker.Jobs.Registry
+import Wire.BackgroundWorker.Options
+import Wire.BackgroundWorker.Util (CleanupAction)
+
+data BackgroundJobsMetrics = BackgroundJobsMetrics
+  { workersBusy :: Gauge,
+    concurrencyConfigured :: Gauge,
+    jobsReceived :: Vector Text Counter,
+    jobsStarted :: Vector Text Counter,
+    jobsSucceeded :: Vector Text Counter,
+    jobsFailed :: Vector Text Counter,
+    jobsInvalid :: Vector Text Counter,
+    jobsRedelivered :: Vector Text Counter,
+    jobDuration :: Vector Text Histogram
+  }
+
+mkMetrics :: IO BackgroundJobsMetrics
+mkMetrics = do
+  workersBusy <- register (gauge $ Info {metricName = "wire_background_jobs_workers_busy", metricHelp = "In-flight background jobs"})
+  concurrencyConfigured <- register (gauge $ Info {metricName = "wire_background_jobs_concurrency_configured", metricHelp = "Configured concurrency for this process"})
+  jobsReceived <- register (vector "job_type" $ counter $ Info "wire_background_jobs_received_total" "Jobs received")
+  jobsStarted <- register (vector "job_type" $ counter $ Info "wire_background_jobs_started_total" "Jobs started")
+  jobsSucceeded <- register (vector "job_type" $ counter $ Info "wire_background_jobs_succeeded_total" "Jobs succeeded")
+  jobsFailed <- register (vector "job_type" $ counter $ Info "wire_background_jobs_failed_total" "Jobs failed")
+  jobsInvalid <- register (vector "job_type" $ counter $ Info "wire_background_jobs_invalid_total" "Invalid jobs received")
+  jobsRedelivered <- register (vector "job_type" $ counter $ Info "wire_background_jobs_redelivered_total" "Jobs marked redelivered by broker")
+  jobDuration <- register (vector "job_type" $ histogram (Info "wire_background_jobs_duration_seconds" "Job duration seconds") defaultBuckets)
+  pure BackgroundJobsMetrics {..}
+
+startWorker :: AmqpEndpoint -> AppT IO CleanupAction
+startWorker rabbitmqOpts = do
+  env <- ask
+  let cfg = env.backgroundJobsConfig
+  metrics <- liftIO mkMetrics
+  markAsNotWorking BackgroundJobConsumer
+  void . async . liftIO $
+    openConnectionWithRetries env.logger rabbitmqOpts (Just "background-job-consumer") $
+      RabbitMqHooks
+        { onNewChannel = \chan -> do
+            -- declare queue and set prefetch to concurrency
+            ensureBackgroundJobsQueue chan
+            Q.qos chan 0 (fromIntegral $ fromRange cfg.concurrency) False
+            -- set gauges
+            setGauge metrics.concurrencyConfigured (fromIntegral $ fromRange cfg.concurrency)
+            -- start consuming with manual ack and keep the channel alive
+            void $ QL.consumeMsgs chan backgroundJobsQueueName Q.Ack (void . runAppT env . handleDelivery metrics cfg)
+            runAppT env $ markAsWorking BackgroundJobConsumer
+            forever $ threadDelay maxBound,
+          onChannelException = \_ -> do
+            -- mark not working; TODO: only log unexpected exceptions
+            runAppT env $ markAsNotWorking BackgroundJobConsumer,
+          onConnectionClose =
+            runAppT env $ do
+              markAsNotWorking BackgroundJobConsumer
+              Log.info $ Log.msg (Log.val "RabbitMQ connection closed for background job consumer")
+        }
+  pure $ runAppT env $ cleanup
+  where
+    cleanup :: AppT IO ()
+    cleanup = do
+      -- nothing to close explicitly; the AMQP helper closes channel/connection on shutdown
+      Log.info $ Log.msg (Log.val "Background job consumer cleanup")
+      markAsNotWorking BackgroundJobConsumer
+
+handleDelivery :: BackgroundJobsMetrics -> BackgroundJobsConfig -> (Q.Message, Q.Envelope) -> AppT IO ()
+handleDelivery metrics cfg (msg, env) = do
+  case Aeson.eitherDecode @Job (Q.msgBody msg) of
+    Left err -> do
+      withLabel metrics.jobsInvalid "invalid" incCounter
+      Log.err $ Log.msg (Log.val "Invalid background job JSON") . Log.field "error" err
+      Timeout.threadDelay (200 # MilliSecond) -- avoid tight redelivery loop
+      liftIO $ Q.rejectEnv env True
+    Right job -> do
+      let lbl = jobPayloadLabel job.payload
+      when (Q.envRedelivered env) $ withLabel metrics.jobsRedelivered lbl incCounter
+      withLabel metrics.jobsReceived lbl incCounter
+      UnliftIO.bracket_ (incGauge metrics.workersBusy) (decGauge metrics.workersBusy) $ do
+        outcome <- runAttempts lbl job
+        case outcome of
+          Right () -> do
+            withLabel metrics.jobsSucceeded lbl incCounter
+            liftIO $ Q.ackEnv env
+          Left e -> do
+            withLabel metrics.jobsFailed lbl incCounter
+            Log.err $ Log.msg (Log.val "Background job failed after retries") . Log.field "error" e
+            liftIO $ Q.rejectEnv env False
+  where
+    runAttempts :: Text -> Job -> AppT IO (Either Text ())
+    runAttempts lbl job = do
+      let retries = max 0 (fromRange cfg.maxAttempts - 1)
+          policy = limitRetries retries <> fullJitterBackoff 100000 -- 100ms base
+      retrying policy shouldRetry $ \_rs -> do
+        withLabel metrics.jobsStarted lbl incCounter
+        (dur, r) <-
+          duration $
+            fromMaybe (Left "job timeout")
+              <$> timeout (fromRange cfg.jobTimeout * 1000000) (dispatchJob job)
+        withLabel metrics.jobDuration lbl (`observe` dur)
+        pure r
+      where
+        shouldRetry _ (Right _) = pure False
+        shouldRetry _ (Left _) = pure True
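Note on `runAttempts` above: with `maxAttempts = 3` the policy allows the first run plus two retries, each preceded by a full-jitter sleep drawn uniformly from [0, 100ms * 2^n]. The retry package's `simulatePolicy` makes this easy to eyeball in isolation (standalone sketch; the printed delays are randomized):

    import Control.Retry

    -- Prints something like [(0,Just 73211),(1,Just 152003),(2,Nothing)]:
    -- two jittered delays in microseconds, then the policy gives up.
    main :: IO ()
    main = print =<< simulatePolicy 3 (limitRetries 2 <> fullJitterBackoff 100000)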
diff --git a/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs b/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs
new file mode 100644
index 0000000000..b3a8f931b9
--- /dev/null
+++ b/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs
@@ -0,0 +1,35 @@
+module Wire.BackgroundWorker.Jobs.Registry
+  ( dispatchJob,
+  )
+where
+
+import Data.Text qualified as T
+import Hasql.Pool (Pool, UsageError)
+import Imports
+import Polysemy
+import Polysemy.Error
+import Polysemy.Input
+import Wire.API.BackgroundJobs (Job (..))
+import Wire.BackgroundJobsPublisher.RabbitMQ (interpretBackgroundJobsPublisherRabbitMQ)
+import Wire.BackgroundJobsRunner (runJob)
+import Wire.BackgroundJobsRunner.Interpreter hiding (runJob)
+import Wire.BackgroundWorker.Env (AppT, Env (..))
+import Wire.UserGroupStore.Postgres (interpretUserGroupStoreToPostgres)
+import Wire.UserStore.Cassandra (interpretUserStoreCassandra)
+
+dispatchJob :: Job -> AppT IO (Either Text ())
+dispatchJob job = do
+  env <- ask @Env
+  liftIO $ runInterpreters env $ runJob job
+  where
+    runInterpreters env =
+      runM
+        . runError
+        . mapError @BackgroundJobError (T.pack . show)
+        . mapError @UsageError (T.pack . show)
+        . runInputConst @Pool env.hasqlPool
+        . interpretUserStoreCassandra env.brigCassandra
+        . interpretUserGroupStoreToPostgres
+        . runInputSem (readMVar env.backgroundJobsQueue)
+        . interpretBackgroundJobsPublisherRabbitMQ job.requestId
+        . interpretBackgroundJobsRunner
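Note on `runInterpreters` above: both `BackgroundJobError` and hasql's `UsageError` are funneled via `mapError` into the single `Error Text` that `runError` turns into the `Either Text ()` inspected by the consumer's retry loop. A minimal standalone illustration of that layering, with toy error types rather than the production stack:

    import Polysemy
    import Polysemy.Error

    newtype DbError = DbError String deriving (Show)

    newtype JobError = JobError String deriving (Show)

    -- Two differently-typed failures collapse into one Either String result,
    -- mirroring the mapError/runError layering in dispatchJob.
    demo :: Sem '[Error JobError, Error DbError, Error String, Embed IO] () -> IO (Either String ())
    demo =
      runM
        . runError @String
        . mapError @DbError show
        . mapError @JobError show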
diff --git a/services/background-worker/src/Wire/BackgroundWorker/Options.hs b/services/background-worker/src/Wire/BackgroundWorker/Options.hs
index f9055d89e0..14f5eb94b4 100644
--- a/services/background-worker/src/Wire/BackgroundWorker/Options.hs
+++ b/services/background-worker/src/Wire/BackgroundWorker/Options.hs
@@ -1,6 +1,8 @@
 module Wire.BackgroundWorker.Options where

 import Data.Aeson
+import Data.Range (Range)
+import GHC.Generics
 import Imports
 import Network.AMQP.Extended
 import System.Logger.Extended
@@ -15,11 +17,16 @@ data Opts = Opts
     -- | Seconds, Nothing for no timeout
     defederationTimeout :: Maybe Int,
     backendNotificationPusher :: BackendNotificationsConfig,
-    cassandra :: CassandraOpts
+    cassandra :: CassandraOpts,
+    cassandraBrig :: CassandraOpts,
+    backgroundJobs :: BackgroundJobsConfig,
+    -- | Postgresql settings, the key values must be in libpq format.
+    -- https://www.postgresql.org/docs/17/libpq-connect.html#LIBPQ-PARAMKEYWORDS
+    postgresql :: !(Map Text Text),
+    postgresqlPassword :: !(Maybe FilePathSecrets)
   }
   deriving (Show, Generic)
-
-instance FromJSON Opts
+  deriving (FromJSON) via Generically Opts

 data BackendNotificationsConfig = BackendNotificationsConfig
   { -- | Minimum amount of time (in microseconds) to wait before doing the first
@@ -36,8 +43,7 @@ data BackendNotificationsConfig = BackendNotificationsConfig
     remotesRefreshInterval :: Int
   }
   deriving (Show, Generic)
-
-instance FromJSON BackendNotificationsConfig
+  deriving (FromJSON) via Generically BackendNotificationsConfig

 newtype RabbitMqOpts = RabbitMqOpts {unRabbitMqOpts :: Either AmqpEndpoint RabbitMqAdminOpts}
   deriving (Show)
@@ -48,3 +54,14 @@ instance FromJSON RabbitMqOpts where
         <$> ( (Right <$> parseJSON v)
                 <|> (Left <$> parseJSON v)
             )
+
+data BackgroundJobsConfig = BackgroundJobsConfig
+  { -- | Maximum parallel jobs processed by this process
+    concurrency :: Range 1 1000 Int,
+    -- | Per-attempt timeout (seconds)
+    jobTimeout :: Range 1 1000 Int,
+    -- | Total attempts including first run
+    maxAttempts :: Range 1 1000 Int
+  }
+  deriving (Show, Generic)
+  deriving (FromJSON) via Generically BackgroundJobsConfig
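Note on `BackgroundJobsConfig` above: the `Generically`-derived `FromJSON` accepts exactly the three keys used in background-worker.integration.yaml, and, assuming types-common's bounds-checked `FromJSON` instance for `Range`, out-of-range values are rejected at parse time. A quick standalone check (sketch):

    {-# LANGUAGE OverloadedStrings #-}

    import Data.Aeson qualified as Aeson
    import Wire.BackgroundWorker.Options (BackgroundJobsConfig)

    main :: IO ()
    main = do
      -- parses: all three fields present and within 1..1000
      print (Aeson.eitherDecode "{\"concurrency\":4,\"jobTimeout\":5,\"maxAttempts\":3}" :: Either String BackgroundJobsConfig)
      -- should fail: concurrency below the lower bound of the Range
      print (Aeson.eitherDecode "{\"concurrency\":0,\"jobTimeout\":5,\"maxAttempts\":3}" :: Either String BackgroundJobsConfig)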
diff --git a/services/background-worker/src/Wire/DeadUserNotificationWatcher.hs b/services/background-worker/src/Wire/DeadUserNotificationWatcher.hs
index 802f89eda6..d4bb3c1d0a 100644
--- a/services/background-worker/src/Wire/DeadUserNotificationWatcher.hs
+++ b/services/background-worker/src/Wire/DeadUserNotificationWatcher.hs
@@ -34,7 +34,7 @@ startConsumer chan = do
   env <- ask
   markAsWorking DeadUserNotificationWatcher

-  cassandra <- asks (.cassandra)
+  cassandra <- asks (.gundeckCassandra)

   void . lift $
     Q.declareQueue
diff --git a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs
index 9e88cbe9f0..639f0e2126 100644
--- a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs
+++ b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs
@@ -322,7 +322,8 @@ spec = do
           ]
     logger <- Logger.new Logger.defSettings
     httpManager <- newManager defaultManagerSettings
-    let cassandra = undefined
+    let gundeckCassandra = undefined
+        brigCassandra = undefined
     let federatorInternal = Endpoint "localhost" 8097
         http2Manager = undefined
         statuses = undefined
@@ -330,6 +331,14 @@ spec = do
         rabbitmqVHost = "test-vhost"
         defederationTimeout = responseTimeoutNone
         backendNotificationsConfig = BackendNotificationsConfig 1000 500000 1000
+        backgroundJobsConfig =
+          BackgroundJobsConfig
+            { concurrency = toRange (Proxy @1),
+              jobTimeout = toRange (Proxy @100),
+              maxAttempts = toRange (Proxy @3)
+            }
+        hasqlPool = undefined
+        backgroundJobsQueue = undefined
     backendNotificationMetrics <- mkBackendNotificationMetrics
     workerRunningGauge <- mkWorkerRunningGauge
@@ -340,15 +349,24 @@ spec = do
     it "should retry fetching domains if a request fails" $ do
       mockAdmin <- newMockRabbitMqAdmin True ["backend-notifications.foo.example"]
       logger <- Logger.new Logger.defSettings
-      let cassandra = undefined
      httpManager <- newManager defaultManagerSettings
      let federatorInternal = Endpoint "localhost" 8097
+          gundeckCassandra = undefined
+          brigCassandra = undefined
          http2Manager = undefined
          statuses = undefined
          rabbitmqAdminClient = Just $ mockRabbitMqAdminClient mockAdmin
          rabbitmqVHost = "test-vhost"
          defederationTimeout = responseTimeoutNone
          backendNotificationsConfig = BackendNotificationsConfig 1000 500000 1000
+          backgroundJobsConfig =
+            BackgroundJobsConfig
+              { concurrency = toRange (Proxy @1),
+                jobTimeout = toRange (Proxy @100),
+                maxAttempts = toRange (Proxy @3)
+              }
+          hasqlPool = undefined
+          backgroundJobsQueue = undefined
      backendNotificationMetrics <- mkBackendNotificationMetrics
      workerRunningGauge <- mkWorkerRunningGauge
      domainsThread <- async $ runAppT Env {..} $ getRemoteDomains (fromJust rabbitmqAdminClient)
diff --git a/services/background-worker/test/Test/Wire/Util.hs b/services/background-worker/test/Test/Wire/Util.hs
index cb4eeef5f9..0fc43f503e 100644
--- a/services/background-worker/test/Test/Wire/Util.hs
+++ b/services/background-worker/test/Test/Wire/Util.hs
@@ -2,8 +2,10 @@

 module Test.Wire.Util where

+import Data.Proxy
+import Data.Range
 import Imports
-import Network.HTTP.Client
+import Network.HTTP.Client hiding (Proxy)
 import System.Logger.Class qualified as Logger
 import Util.Options (Endpoint (..))
 import Wire.BackgroundWorker.Env hiding (federatorInternal)
@@ -14,7 +16,8 @@ testEnv :: IO Env
 testEnv = do
   http2Manager <- initHttp2Manager
   logger <- Logger.new Logger.defSettings
-  let cassandra = undefined
+  let gundeckCassandra = undefined
+      brigCassandra = undefined
   statuses <- newIORef mempty
   backendNotificationMetrics <- mkBackendNotificationMetrics
   workerRunningGauge <- mkWorkerRunningGauge
@@ -24,6 +27,14 @@ testEnv = do
       rabbitmqVHost = undefined
      defederationTimeout = responseTimeoutNone
      backendNotificationsConfig = BackendNotificationsConfig 1000 500000 1000
+      backgroundJobsConfig =
+        BackgroundJobsConfig
+          { concurrency = toRange (Proxy @1),
+            jobTimeout = toRange (Proxy @100),
+            maxAttempts = toRange (Proxy @3)
+          }
+      hasqlPool = undefined
+      backgroundJobsQueue = undefined
   pure Env {..}

 runTestAppT :: AppT IO a -> Int -> IO a
diff --git a/services/brig/brig.cabal b/services/brig/brig.cabal
index 67449f7bb0..4d921d809b 100644
--- a/services/brig/brig.cabal
+++ b/services/brig/brig.cabal
@@ -218,7 +218,7 @@ library
    , amqp
    , async >=2.1
    , auto-update >=0.1
-    , base >=4 && <5
+    , base >=4 && <5
    , base-prelude
    , base16-bytestring >=0.1
    , base64-bytestring >=1.0
@@ -314,6 +314,7 @@ library
    , uri-bytestring >=0.2
    , utf8-string
    , uuid >=1.3.5
+    , vector >=0.13.2.0
    , wai >=3.0
    , wai-extra >=3.0
    , wai-middleware-gunzip >=0.0.2
diff --git a/services/brig/default.nix b/services/brig/default.nix
index 550ed4212b..5827352d74 100644
--- a/services/brig/default.nix
+++ b/services/brig/default.nix
@@ -266,6 +266,7 @@ mkDerivation {
     uri-bytestring
     utf8-string
     uuid
+    vector
     wai
     wai-extra
     wai-middleware-gunzip
diff --git a/services/brig/src/Brig/API/Public.hs b/services/brig/src/Brig/API/Public.hs
index 259ab37c89..9d620de83c 100644
--- a/services/brig/src/Brig/API/Public.hs
+++ b/services/brig/src/Brig/API/Public.hs
@@ -85,6 +85,7 @@ import Data.Qualified
 import Data.Range
 import Data.Schema ()
 import Data.Text.Encoding qualified as Text
+import Data.Vector qualified as Vector
 import Data.ZAuth.CryptoSign (CryptoSign)
 import Data.ZAuth.Token qualified as ZAuth
 import FileEmbedLzma
@@ -1676,7 +1677,20 @@ createUserGroup :: (_) => Local UserId -> NewUserGroup -> Handler r UserGroup
 createUserGroup lusr newUserGroup = lift . liftSem $ UserGroup.createGroup (tUnqualified lusr) newUserGroup

 getUserGroup :: (_) => Local UserId -> UserGroupId -> Bool -> Handler r (Maybe UserGroup)
-getUserGroup lusr ugid _ = lift . liftSem $ UserGroup.getGroup (tUnqualified lusr) ugid
+getUserGroup lusr ugid includeChannels =
+  lift . liftSem $ do
+    mUserGroup <- UserGroup.getGroup (tUnqualified lusr) ugid
+    if includeChannels
+      then forM mUserGroup $ \userGroup -> do
+        fetchedChannels <-
+          fmap (tUntagged . qualifyAs lusr)
+            <$> UserGroup.listChannels (tUnqualified lusr) userGroup.id_
+        pure
+          userGroup
+            { channels = Identity $ Just fetchedChannels,
+              channelsCount = Just $ Vector.length fetchedChannels
+            }
+      else pure mUserGroup

 getUserGroups ::
   (_) =>
@@ -1689,9 +1703,22 @@ getUserGroups ::
   Maybe UTCTimeMillis ->
   Maybe UserGroupId ->
   Bool ->
+  Bool ->
   Handler r UserGroupPage
-getUserGroups lusr q sortByKeys sortOrder pSize mLastName mLastCreatedAt mLastId includeMemberCount =
-  lift . liftSem $ UserGroup.getGroups (tUnqualified lusr) q sortByKeys sortOrder pSize mLastName mLastCreatedAt mLastId includeMemberCount
+getUserGroups lusr q sortByKeys sortOrder pSize mLastName mLastCreatedAt mLastId includeChannels includeMemberCount =
+  lift . liftSem $ do
+    userGroups <- UserGroup.getGroups (tUnqualified lusr) q sortByKeys sortOrder pSize mLastName mLastCreatedAt mLastId includeMemberCount
+    if includeChannels
+      then do
+        newPage <-
+          forM userGroups.page $ \userGroup -> do
+            fetchedChannels <- UserGroup.listChannels (tUnqualified lusr) userGroup.id_
+            pure
+              userGroup
+                { channelsCount = Just $ Vector.length fetchedChannels
+                }
+        pure userGroups {page = newPage}
+      else pure userGroups

 updateUserGroup :: (_) => Local UserId -> UserGroupId -> UserGroupUpdate -> (Handler r) ()
 updateUserGroup lusr gid gupd = lift . liftSem $ UserGroup.updateGroup (tUnqualified lusr) gid gupd
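Note on the two handlers above: the single-group endpoint fills both `channels` (qualified to the caller's local domain) and `channelsCount`, while the paginated endpoint fills only `channelsCount`, keeping page payloads small. If both paths ever need full enrichment, a shared helper could look like the following sketch (hypothetical name; same calls and record fields as the handlers above, so the same imports suffice):

    withChannelInfo ::
      (Member UserGroupSubsystem r) =>
      Local UserId ->
      UserGroup ->
      Sem r UserGroup
    withChannelInfo lusr userGroup = do
      fetchedChannels <-
        fmap (tUntagged . qualifyAs lusr)
          <$> UserGroup.listChannels (tUnqualified lusr) userGroup.id_
      pure
        userGroup
          { channels = Identity $ Just fetchedChannels,
            channelsCount = Just $ Vector.length fetchedChannels
          }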
@@ -1711,8 +1738,8 @@ removeUserFromGroup lusr gid mid = lift . liftSem $ UserGroup.removeUser (tUnqua
 updateUserGroupMembers :: (_) => Local UserId -> UserGroupId -> UpdateUserGroupMembers -> Handler r ()
 updateUserGroupMembers lusr gid gupd = lift . liftSem $ UserGroup.updateUsers (tUnqualified lusr) gid gupd.members

-updateUserGroupChannels :: Local UserId -> UserGroupId -> UpdateUserGroupChannels -> Handler r ()
-updateUserGroupChannels _ _ _ = pure ()
+updateUserGroupChannels :: (_) => Local UserId -> UserGroupId -> UpdateUserGroupChannels -> Handler r ()
+updateUserGroupChannels lusr gid upd = lift . liftSem $ UserGroup.updateChannels (tUnqualified lusr) gid upd.channels

 checkUserGroupNameAvailable :: Local UserId -> CheckUserGroupName -> Handler r UserGroupNameAvailability
 checkUserGroupNameAvailable _ _ = pure $ UserGroupNameAvailability True
diff --git a/services/brig/src/Brig/App.hs b/services/brig/src/Brig/App.hs
index 7b2f67979d..6b2411232f 100644
--- a/services/brig/src/Brig/App.hs
+++ b/services/brig/src/Brig/App.hs
@@ -71,6 +71,7 @@ module Brig.App
     disabledVersionsLens,
     enableSFTFederationLens,
     rateLimitEnvLens,
+    backgroundJobsQueueLens,
     initZAuth,
     initLogger,
     initPostgresPool,
@@ -218,7 +219,8 @@ data Env = Env
     rabbitmqChannel :: Maybe (MVar Q.Channel),
     disabledVersions :: Set Version,
     enableSFTFederation :: Maybe Bool,
-    rateLimitEnv :: RateLimitEnv
+    rateLimitEnv :: RateLimitEnv,
+    backgroundJobsQueue :: Maybe (MVar Q.Channel)
   }

 makeLensesWith (lensRules & lensField .~ suffixNamer) ''Env
@@ -280,6 +282,7 @@ newEnv opts = do
   idxEnv <- mkIndexEnv opts.elasticsearch lgr (Opt.galley opts) mgr
   rateLimitEnv <- newRateLimitEnv opts.settings.passwordHashingRateLimit
   hasqlPool <- initPostgresPool opts.postgresql opts.postgresqlPassword
+  backgroundJobsQueue <- traverse (Q.mkRabbitMqChannelMVar lgr (Just "brig")) opts.rabbitmq
   pure $!
     Env
       { cargohold = mkEndpoint $ opts.cargohold,
@@ -319,7 +322,8 @@ newEnv opts = do
         rabbitmqChannel = rabbitChan,
         disabledVersions = allDisabledVersions,
         enableSFTFederation = opts.multiSFT,
-        rateLimitEnv
+        rateLimitEnv,
+        backgroundJobsQueue
       }
   where
     emailConn _ (Opt.EmailAWS aws) = pure (Just aws, Nothing)
diff --git a/services/brig/src/Brig/CanonicalInterpreter.hs b/services/brig/src/Brig/CanonicalInterpreter.hs
index 6e968f75ae..0fb1acf4f7 100644
--- a/services/brig/src/Brig/CanonicalInterpreter.hs
+++ b/services/brig/src/Brig/CanonicalInterpreter.hs
@@ -45,6 +45,8 @@ import Wire.AppSubsystem.Interpreter
 import Wire.AuthenticationSubsystem
 import Wire.AuthenticationSubsystem.Config
 import Wire.AuthenticationSubsystem.Interpreter
+import Wire.BackgroundJobsPublisher (BackgroundJobsPublisher)
+import Wire.BackgroundJobsPublisher.RabbitMQ (interpretBackgroundJobsPublisherRabbitMQOptional)
 import Wire.BlockListStore
 import Wire.BlockListStore.Cassandra
 import Wire.DeleteQueue
@@ -153,6 +155,7 @@ type BrigLowerLevelEffects =
      DeleteQueue,
      Wire.Events.Events,
      NotificationSubsystem,
+     BackgroundJobsPublisher,
      RateLimit,
      UserGroupStore,
      Error AppSubsystemError,
@@ -363,6 +366,7 @@ runBrigToIO e (AppT ma) = do
        . mapError appSubsystemErrorToHttpError
        . interpretUserGroupStoreToPostgres
        . interpretRateLimit e.rateLimitEnv
+        . interpretBackgroundJobsPublisherRabbitMQOptional e.requestId e.backgroundJobsQueue
        . runNotificationSubsystemGundeck (defaultNotificationSubsystemConfig e.requestId)
        . runEvents
        . runDeleteQueue e.internalEvents