diff --git a/deploy/dockerephemeral/docker-compose.yaml b/deploy/dockerephemeral/docker-compose.yaml index 96eb142a54..7b5da3bf40 100644 --- a/deploy/dockerephemeral/docker-compose.yaml +++ b/deploy/dockerephemeral/docker-compose.yaml @@ -317,35 +317,26 @@ services: networks: - demo_wire - rabbitmq: - container_name: rabbitmq - image: rabbitmq:4.1.4-management-alpine + nats: + container_name: nats + image: nats:2.10-alpine + command: > + -js + -m 8222 environment: - - RABBITMQ_USERNAME - - RABBITMQ_PASSWORD + - NATS_USERNAME=${NATS_USERNAME:-guest} + - NATS_PASSWORD=${NATS_PASSWORD:-guest} ports: - - "127.0.0.1:5671:5671" - - "127.0.0.1:15671:15671" - - "127.0.0.1:15672:15672" - volumes: - - ./rabbitmq-config/rabbitmq.conf:/etc/rabbitmq/conf.d/20-wire.conf - - ./rabbitmq-config/certificates:/etc/rabbitmq/certificates - networks: - - demo_wire - - init_vhosts: - image: alpine/curl:3.14 - environment: - - RABBITMQ_USERNAME=${RABBITMQ_USERNAME} - - RABBITMQ_PASSWORD=${RABBITMQ_PASSWORD} - depends_on: - - rabbitmq - entrypoint: /scripts/init_vhosts.sh - volumes: - - ./:/scripts - - ./rabbitmq-config/certificates/ca.pem:/etc/rabbitmq-ca.pem + - "127.0.0.1:4222:4222" # NATS client port + - "127.0.0.1:8222:8222" # NATS monitoring port + - "127.0.0.1:6222:6222" # NATS cluster port networks: - demo_wire + healthcheck: + test: ["CMD", "wget", "--spider", "-q", "http://localhost:8222/healthz"] + interval: 10s + timeout: 5s + retries: 3 # FIXME: replace aws_cli with an image that we build. aws_cli: diff --git a/docs/src/developer/reference/config-options.md b/docs/src/developer/reference/config-options.md index 02d66cd070..e042e66f65 100644 --- a/docs/src/developer/reference/config-options.md +++ b/docs/src/developer/reference/config-options.md @@ -1579,6 +1579,41 @@ gundeck: insecureSkipVerifyTls: true ``` +## Configure Messaging (NATS) + +**Note:** Wire server is migrating from RabbitMQ to NATS. This section documents the new NATS configuration. + +NATS authentication must be configured on brig, galley, gundeck, cannon, and background-worker. For example: + +```yaml +nats: + host: localhost + port: 4222 + namespace: "" # Optional subject prefix + adminHost: localhost + adminPort: 8222 # for background-worker +``` + +The `adminHost` and `adminPort` settings are only needed by background-worker for monitoring. + +### Environment Variables + +NATS credentials are read from environment variables: + +```bash +export NATS_USERNAME=guest +export NATS_PASSWORD=guest +``` + +### Migration from RabbitMQ + +If you are migrating from RabbitMQ, note the following changes: + +1. **Subject-based routing**: NATS uses subjects instead of exchanges/queues +2. **No vHost**: NATS uses a `namespace` prefix instead +3. **Simpler configuration**: Fewer options needed +4. **Different port**: Default is 4222 instead of 5672 + ## Configure RabbitMQ RabbitMQ authentication must be configured on brig, galley and background-worker. For example: diff --git a/libs/extended/extended.cabal b/libs/extended/extended.cabal index 6ac6bc54a8..e37647fed1 100644 --- a/libs/extended/extended.cabal +++ b/libs/extended/extended.cabal @@ -22,6 +22,8 @@ library Data.Time.Clock.DiffTime Hasql.Pool.Extended Network.AMQP.Extended + Network.NATS.Client + Network.NATS.Extended Network.RabbitMqAdmin Servant.API.Extended Servant.API.Extended.Endpath @@ -97,6 +99,8 @@ library , imports , metrics-wai , monad-control + , network + , random , retry , servant , servant-client diff --git a/libs/extended/src/Network/NATS/Client.hs b/libs/extended/src/Network/NATS/Client.hs new file mode 100644 index 0000000000..5316740a65 --- /dev/null +++ b/libs/extended/src/Network/NATS/Client.hs @@ -0,0 +1,264 @@ +{-# LANGUAGE RecordWildCards #-} + +-- | Basic NATS client implementation +-- This is a minimal implementation to replace RabbitMQ (AMQP) functionality +-- +-- NATS Protocol Documentation: https://docs.nats.io/reference/reference-protocols/nats-protocol +module Network.NATS.Client + ( NatsConnection, + NatsConnectionOpts (..), + NatsChannel, + NatsMessage (..), + NatsEnvelope (..), + openConnection, + closeConnection, + createChannel, + closeChannel, + publish, + subscribe, + unsubscribe, + ack, + nack, + defaultConnectionOpts, + ) +where + +import Control.Concurrent (forkIO) +import Control.Concurrent.MVar +import Control.Exception (bracket, throw, throwIO, try, catch, SomeException) +import Control.Monad (forever, void, when) +import Data.Aeson (FromJSON, ToJSON, decode, encode) +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Char8 qualified as BS8 +import Data.ByteString.Lazy qualified as LBS +import Data.IORef +import Data.Map.Strict (Map) +import Data.Map.Strict qualified as Map +import Data.Text (Text) +import Data.Text qualified as Text +import Data.Text.Encoding qualified as Text +import Data.Word (Word64) +import Network.Socket hiding (send, recv) +import Network.Socket.ByteString (recv, sendAll) +import Imports hiding (take) +import System.IO (Handle, hClose, hFlush, hGetLine, hPutStr) +import System.Random (randomIO) + +-- | NATS connection options +data NatsConnectionOpts = NatsConnectionOpts + { natsServers :: [(String, Int)], + natsAuth :: Maybe (Text, Text), -- username, password + natsName :: Maybe Text, + natsVerbose :: Bool, + natsPedantic :: Bool, + natsToken :: Maybe Text + } + +defaultConnectionOpts :: NatsConnectionOpts +defaultConnectionOpts = + NatsConnectionOpts + { natsServers = [("127.0.0.1", 4222)], + natsAuth = Nothing, + natsName = Nothing, + natsVerbose = False, + natsPedantic = False, + natsToken = Nothing + } + +-- | Represents a NATS connection +data NatsConnection = NatsConnection + { connSocket :: Socket, + connNextSid :: IORef Word64, + connSubscriptions :: IORef (Map Word64 (MVar NatsMessage)), + connClosed :: IORef Bool + } + +-- | Represents a NATS channel (for compatibility with AMQP interface) +-- In NATS, channels are lightweight and don't have the same semantics as AMQP +newtype NatsChannel = NatsChannel NatsConnection + +-- | NATS message +data NatsMessage = NatsMessage + { msgSubject :: Text, + msgBody :: LBS.ByteString, + msgReplyTo :: Maybe Text, + msgHeaders :: Map Text Text + } + deriving (Show) + +-- | Envelope for message acknowledgment (for compatibility with AMQP) +data NatsEnvelope = NatsEnvelope + { envDeliveryTag :: Word64, + envSubject :: Text + } + deriving (Show) + +-- | Open a NATS connection +openConnection :: NatsConnectionOpts -> IO NatsConnection +openConnection opts = do + -- Try to connect to first available server + conn <- connectToServer (head opts.natsServers) + + -- Send CONNECT message + sendConnect conn opts + + -- Read INFO and +OK from server + receiveServerInfo conn + + -- Initialize connection state + nextSid <- newIORef 0 + subs <- newIORef Map.empty + closed <- newIORef False + + let natsConn = NatsConnection conn nextSid subs closed + + -- Start message reader thread + void $ forkIO $ messageReader natsConn + + pure natsConn + where + connectToServer :: (String, Int) -> IO Socket + connectToServer (host, port) = do + addr <- resolve host (show port) + sock <- open addr + pure sock + + resolve :: String -> String -> IO AddrInfo + resolve host port = do + let hints = defaultHints {addrSocketType = Stream} + head <$> getAddrInfo (Just hints) (Just host) (Just port) + + open :: AddrInfo -> IO Socket + open addr = do + sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) + connect sock (addrAddress addr) + pure sock + +-- | Send CONNECT message to NATS server +sendConnect :: Socket -> NatsConnectionOpts -> IO () +sendConnect sock opts = do + -- Simplified CONNECT message + -- In production, this should be JSON with all options + let connectMsg = "CONNECT {\"verbose\":false,\"pedantic\":false,\"name\":\"wire-server\"}\r\n" + sendAll sock (BS8.pack connectMsg) + +-- | Receive server INFO +receiveServerInfo :: Socket -> IO () +receiveServerInfo sock = do + -- Read INFO line + void $ recvLine sock + -- Read +OK or -ERR + void $ recvLine sock + +-- | Receive a line from socket +recvLine :: Socket -> IO ByteString +recvLine sock = go BS.empty + where + go acc = do + chunk <- recv sock 1 + if BS.null chunk + then pure acc + else if chunk == BS8.pack "\n" + then pure acc + else go (acc <> chunk) + +-- | Message reader thread +messageReader :: NatsConnection -> IO () +messageReader conn = forever $ do + closed <- readIORef conn.connClosed + when (not closed) $ do + line <- recvLine conn.connSocket + parseAndDispatch conn line + `catch` \(_ :: SomeException) -> pure () + +-- | Parse and dispatch incoming messages +parseAndDispatch :: NatsConnection -> ByteString -> IO () +parseAndDispatch conn line = do + let parts = BS8.words line + case parts of + ("MSG" : subject : sid : msgBytes : _) -> do + let sidNum = read (BS8.unpack sid) :: Word64 + let numBytes = read (BS8.unpack msgBytes) :: Int + -- Read message payload + payload <- recvExact conn.connSocket numBytes + -- Skip \r\n after payload + void $ recv conn.connSocket 2 + + subs <- readIORef conn.connSubscriptions + case Map.lookup sidNum subs of + Just mvar -> do + let msg = NatsMessage + { msgSubject = Text.decodeUtf8 subject, + msgBody = LBS.fromStrict payload, + msgReplyTo = Nothing, + msgHeaders = Map.empty + } + void $ tryPutMVar mvar msg + Nothing -> pure () + _ -> pure () + +-- | Receive exact number of bytes +recvExact :: Socket -> Int -> IO ByteString +recvExact sock n = go BS.empty n + where + go acc remaining + | remaining <= 0 = pure acc + | otherwise = do + chunk <- recv sock remaining + if BS.null chunk + then pure acc + else go (acc <> chunk) (remaining - BS.length chunk) + +-- | Close a NATS connection +closeConnection :: NatsConnection -> IO () +closeConnection conn = do + writeIORef conn.connClosed True + close conn.connSocket + +-- | Create a channel (for AMQP compatibility) +createChannel :: NatsConnection -> IO NatsChannel +createChannel = pure . NatsChannel + +-- | Close a channel +closeChannel :: NatsChannel -> IO () +closeChannel _ = pure () -- NATS doesn't have channels + +-- | Publish a message +publish :: NatsChannel -> Text -> ByteString -> IO () +publish (NatsChannel conn) subject payload = do + let msgSize = BS.length payload + let pubMsg = BS8.pack $ "PUB " <> Text.unpack subject <> " " <> show msgSize <> "\r\n" + sendAll conn.connSocket pubMsg + sendAll conn.connSocket payload + sendAll conn.connSocket (BS8.pack "\r\n") + +-- | Subscribe to a subject +subscribe :: NatsChannel -> Text -> IO Word64 +subscribe (NatsChannel conn) subject = do + sid <- atomicModifyIORef' conn.connNextSid (\s -> (s + 1, s + 1)) + mvar <- newEmptyMVar + atomicModifyIORef' conn.connSubscriptions $ \subs -> + (Map.insert sid mvar subs, ()) + + let subMsg = BS8.pack $ "SUB " <> Text.unpack subject <> " " <> show sid <> "\r\n" + sendAll conn.connSocket subMsg + pure sid + +-- | Unsubscribe from a subject +unsubscribe :: NatsChannel -> Word64 -> IO () +unsubscribe (NatsChannel conn) sid = do + atomicModifyIORef' conn.connSubscriptions $ \subs -> + (Map.delete sid subs, ()) + let unsubMsg = BS8.pack $ "UNSUB " <> show sid <> "\r\n" + sendAll conn.connSocket unsubMsg + +-- | Acknowledge a message (for AMQP compatibility) +-- NATS doesn't have built-in ACKs, but JetStream does +-- For basic NATS, this is a no-op +ack :: NatsChannel -> Word64 -> IO () +ack _ _ = pure () + +-- | Negative acknowledgment (for AMQP compatibility) +nack :: NatsChannel -> Word64 -> Bool -> IO () +nack _ _ _ = pure () diff --git a/libs/extended/src/Network/NATS/Extended.hs b/libs/extended/src/Network/NATS/Extended.hs new file mode 100644 index 0000000000..a4d4f64e8a --- /dev/null +++ b/libs/extended/src/Network/NATS/Extended.hs @@ -0,0 +1,194 @@ +{-# LANGUAGE RecordWildCards #-} + +-- | NATS Extended - Compatibility layer replacing Network.AMQP.Extended +-- +-- This module provides a similar interface to Network.AMQP.Extended but uses NATS +-- instead of RabbitMQ for messaging. +module Network.NATS.Extended + ( NatsHooks (..), + NatsAdminOpts (..), + NatsEndpoint (..), + openConnectionWithRetries, + mkNatsChannelMVar, + defaultNatsOpts, + demoteNatsOpts, + readCredsFromEnv, + ) +where + +import Control.Exception (AsyncException, throwIO, throw) +import Control.Monad.Catch +import Control.Monad.Trans.Control +import Control.Monad.Trans.Maybe +import Control.Retry +import Data.Aeson +import Data.Aeson.Types +import Data.Text qualified as Text +import Data.Text.Encoding qualified as Text +import Imports +import Network.NATS.Client qualified as NATS +import System.Logger (Logger) +import System.Logger qualified as Log +import UnliftIO.Async + +-- | Hooks for NATS connection lifecycle management +data NatsHooks m = NatsHooks + { -- | Called whenever there is a new channel + onNewChannel :: NATS.NatsChannel -> m (), + -- | Called when connection is closed + onConnectionClose :: m (), + -- | Called when an error occurs + onChannelException :: SomeException -> m () + } + +-- | NATS endpoint configuration +data NatsEndpoint = NatsEndpoint + { host :: !String, + port :: !Int, + -- | In NATS, this could map to a subject prefix + namespace :: !Text + } + deriving (Eq, Show) + +instance FromJSON NatsEndpoint where + parseJSON = withObject "NatsEndpoint" $ \v -> + NatsEndpoint + <$> v .: "host" + <*> v .: "port" + <*> v .:? "namespace" .!= "" + +-- | NATS admin options (for management API) +data NatsAdminOpts = NatsAdminOpts + { host :: !String, + port :: !Int, + namespace :: !Text, + adminHost :: !String, + adminPort :: !Int + } + deriving (Eq, Show) + +instance FromJSON NatsAdminOpts where + parseJSON = withObject "NatsAdminOpts" $ \v -> + NatsAdminOpts + <$> v .: "host" + <*> v .: "port" + <*> v .:? "namespace" .!= "" + <*> v .: "adminHost" + <*> v .: "adminPort" + +-- | Default NATS connection options +defaultNatsOpts :: NatsEndpoint +defaultNatsOpts = + NatsEndpoint + { host = "127.0.0.1", + port = 4222, + namespace = "" + } + +-- | Create a NATS channel MVar (similar to mkRabbitMqChannelMVar) +mkNatsChannelMVar :: Logger -> Maybe Text -> NatsEndpoint -> IO (MVar NATS.NatsChannel) +mkNatsChannelMVar l connName opts = do + chanMVar <- newEmptyMVar + connThread <- + async . openConnectionWithRetries l opts connName $ + NatsHooks + { onNewChannel = \conn -> putMVar chanMVar conn >> forever (threadDelay maxBound), + onChannelException = \_ -> void $ tryTakeMVar chanMVar, + onConnectionClose = void $ tryTakeMVar chanMVar + } + waitForConnThread <- async $ withMVar chanMVar $ \_ -> pure () + waitEither connThread waitForConnThread >>= \case + Left () -> throwIO $ NatsConnectionFailed "connection thread finished before getting connection" + Right () -> pure chanMVar + +data NatsConnectionError = NatsConnectionFailed String + deriving (Show) + +instance Exception NatsConnectionError + +-- | Open a NATS connection with automatic retries +openConnectionWithRetries :: + forall m. + (MonadIO m, MonadMask m, MonadBaseControl IO m) => + Logger -> + NatsEndpoint -> + Maybe Text -> + NatsHooks m -> + m () +openConnectionWithRetries l NatsEndpoint {..} connName hooks = do + (username, password) <- liftIO $ readCredsFromEnv + connectWithRetries username password + where + connectWithRetries :: Text -> Text -> m () + connectWithRetries username password = do + -- Jittered exponential backoff with 1ms as starting delay and 5s as max delay + let policy = capDelay 5_000_000 $ fullJitterBackoff 1000 + logError willRetry e retryStatus = do + Log.err l $ + Log.msg (Log.val "Failed to connect to NATS") + . Log.field "error" (displayException @SomeException e) + . Log.field "willRetry" willRetry + . Log.field "retryCount" retryStatus.rsIterNumber + getConn = do + Log.info l $ Log.msg (Log.val "Trying to connect to NATS") + conn <- + recovering + policy + ( logAndSkipAsyncExceptions l + <> [logRetries (const $ pure True) logError] + ) + ( const $ do + Log.info l $ Log.msg (Log.val "Opening NATS connection") + let connOpts = + NATS.defaultConnectionOpts + { NATS.natsServers = [(host, port)], + NATS.natsAuth = Just (username, password), + NATS.natsName = connName + } + liftIO $ NATS.openConnection connOpts + ) + Log.info l $ Log.msg (Log.val "NATS connection established") + pure conn + bracket getConn (liftIO . NATS.closeConnection) $ \conn -> do + chan <- liftIO $ NATS.createChannel conn + -- Note: NATS doesn't have the same connection closed handler mechanism + -- This is a simplified version + hooks.onNewChannel chan + `catch` \(e :: SomeException) -> do + logException l "onNewChannel hook threw an exception" e + hooks.onChannelException e + throwM e + +-- | List of pre-made handlers that will skip retries on AsyncException +logAndSkipAsyncExceptions :: (MonadIO m) => Logger -> [RetryStatus -> Control.Monad.Catch.Handler m Bool] +logAndSkipAsyncExceptions l = handlers + where + asyncH _ = Handler $ \(e :: AsyncException) -> do + logException l "AsyncException caught" (SomeException e) + pure False + someAsyncH _ = Handler $ \(e :: SomeAsyncException) -> do + logException l "SomeAsyncException caught" (SomeException e) + pure False + handlers = [asyncH, someAsyncH] + +logException :: (MonadIO m) => Logger -> String -> SomeException -> m () +logException l m (SomeException e) = do + Log.err l $ + Log.msg m + . Log.field "error" (displayException e) + +-- | Read NATS credentials from environment +-- Note: Using same env vars as RabbitMQ for compatibility during migration +readCredsFromEnv :: IO (Text, Text) +readCredsFromEnv = + (,) + <$> (Text.pack <$> lookupEnv "NATS_USERNAME" >>= maybe (pure "guest") pure) + <*> (Text.pack <$> lookupEnv "NATS_PASSWORD" >>= maybe (pure "guest") pure) + where + lookupEnv name = try @IOException (getEnv name) >>= \case + Left _ -> pure Nothing + Right v -> pure (Just v) + +-- | Demote NatsAdminOpts to NatsEndpoint (for compatibility with RabbitMQ demoteOpts) +demoteNatsOpts :: NatsAdminOpts -> NatsEndpoint +demoteNatsOpts NatsAdminOpts {..} = NatsEndpoint {..} diff --git a/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs b/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs index 7cde4f2b73..6d544f1a73 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs @@ -15,8 +15,7 @@ import Data.Schema import Data.Text qualified as Text import Data.Text.Lazy.Encoding qualified as TL import Imports -import Network.AMQP qualified as Q -import Network.AMQP.Types qualified as Q +import Network.NATS.Client qualified as NATS import Servant import Servant.Client.Core import Wire.API.Federation.API.Common @@ -148,8 +147,8 @@ sendNotification env component path body = case someComponent component of . void $ clientIn (Proxy @BackendNotificationAPI) (Proxy @(FederatorClient c)) (withoutFirstSlash path) body -enqueue :: Q.Channel -> RequestId -> Domain -> Domain -> Q.DeliveryMode -> FedQueueClient c a -> IO a -enqueue channel requestId originDomain targetDomain deliveryMode (FedQueueClient action) = +enqueue :: NATS.NatsChannel -> RequestId -> Domain -> Domain -> FedQueueClient c a -> IO a +enqueue channel requestId originDomain targetDomain (FedQueueClient action) = runReaderT action FedQueueEnv {..} routingKey :: Text -> Text @@ -160,32 +159,10 @@ routingKey t = "backend-notifications." <> t -- they are stored in Rabbit. type DefederationDomain = Domain --- | If you ever change this function and modify --- queue parameters, know that it will start failing in the --- next release! So be prepared to write migrations. -ensureQueue :: Q.Channel -> Text -> IO () -ensureQueue chan queue = do - let opts = - Q.QueueOpts - { Q.queueName = routingKey queue, - Q.queuePassive = False, - Q.queueDurable = True, - Q.queueExclusive = False, - Q.queueAutoDelete = False, - Q.queueHeaders = - Q.FieldTable $ - Map.fromList - -- single-active-consumer is used because it is order - -- preserving, especially into databases and to remote servers, - -- exactly what we are doing here! - -- Without single active consumer, messages will be delivered - -- round-robbin to all consumers, but then we lose effect-ordering - -- due to processing and network times. - [ ("x-single-active-consumer", Q.FVBool True), - ("x-queue-type", Q.FVString "quorum") - ] - } - void $ Q.declareQueue chan opts +-- NATS doesn't require upfront queue declarations +-- Subjects are created dynamically when publishers/subscribers connect +ensureQueue :: NATS.NatsChannel -> Text -> IO () +ensureQueue _chan _queue = pure () -- * Internal machinery @@ -199,10 +176,9 @@ newtype FedQueueClient c a = FedQueueClient (ReaderT FedQueueEnv IO a) deriving (Functor, Applicative, Monad, MonadIO, MonadReader FedQueueEnv) data FedQueueEnv = FedQueueEnv - { channel :: Q.Channel, + { channel :: NATS.NatsChannel, originDomain :: Domain, targetDomain :: Domain, - deliveryMode :: Q.DeliveryMode, requestId :: RequestId } diff --git a/services/background-worker/src/Wire/BackendNotificationPusher.hs b/services/background-worker/src/Wire/BackendNotificationPusher.hs index 48e1c23e69..eaac3cb6bb 100644 --- a/services/background-worker/src/Wire/BackendNotificationPusher.hs +++ b/services/background-worker/src/Wire/BackendNotificationPusher.hs @@ -1,4 +1,9 @@ {-# LANGUAGE BlockArguments #-} +-- NOTE: This file needs substantial rework for NATS +-- RabbitMQ consumer logic (Q.consumeMsgs, Q.ConsumerTag, etc.) +-- needs to be replaced with NATS subscription logic +-- NATS.subscribe returns a subscription ID, and messages are received differently +-- TODO: Implement NATS-based message consumption {-# LANGUAGE RecordWildCards #-} module Wire.BackendNotificationPusher where @@ -15,10 +20,10 @@ import Data.Map.Strict qualified as Map import Data.Set qualified as Set import Data.Text qualified as Text import Imports -import Network.AMQP qualified as Q -import Network.AMQP.Extended -import Network.AMQP.Lifted qualified as QL -import Network.AMQP.Types qualified as QL +import Network.NATS.Client qualified as NATS +import Network.NATS.Extended + + import Network.RabbitMqAdmin hiding (adminClient) import Network.RabbitMqAdmin qualified as RabbitMqAdmin import Network.Wai.Utilities.Error diff --git a/services/background-worker/src/Wire/BackgroundWorker.hs b/services/background-worker/src/Wire/BackgroundWorker.hs index e6110fb438..8f01a20ebd 100644 --- a/services/background-worker/src/Wire/BackgroundWorker.hs +++ b/services/background-worker/src/Wire/BackgroundWorker.hs @@ -5,7 +5,7 @@ module Wire.BackgroundWorker where import Data.Metrics.Servant qualified as Metrics import Data.Text qualified as T import Imports -import Network.AMQP.Extended (demoteOpts) +import Network.NATS.Extended (demoteNatsOpts) import Network.Wai.Utilities.Server import Servant import Servant.Server.Generic @@ -20,15 +20,15 @@ import Wire.DeadUserNotificationWatcher qualified as DeadUserNotificationWatcher run :: Opts -> IO () run opts = do env <- mkEnv opts - let amqpEP = either id demoteOpts opts.rabbitmq.unRabbitMqOpts + let natsEP = either id demoteNatsOpts opts.nats.unNatsOpts cleanupBackendNotifPusher <- runAppT env $ withNamedLogger "backend-notifcation-pusher" $ - BackendNotificationPusher.startWorker amqpEP + BackendNotificationPusher.startWorker natsEP cleanupDeadUserNotifWatcher <- runAppT env $ withNamedLogger "dead-user-notification-watcher" $ - DeadUserNotificationWatcher.startWorker amqpEP + DeadUserNotificationWatcher.startWorker natsEP let -- cleanup will run in a new thread when the signal is caught, so we need to use IORefs and -- specific exception types to message threads to clean up cleanup = do diff --git a/services/background-worker/src/Wire/BackgroundWorker/Env.hs b/services/background-worker/src/Wire/BackgroundWorker/Env.hs index 93711e9d9b..d2a934378d 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Env.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Env.hs @@ -11,7 +11,7 @@ import Control.Monad.Trans.Control import Data.Map.Strict qualified as Map import HTTP2.Client.Manager import Imports -import Network.AMQP.Extended +import Network.NATS.Extended import Network.HTTP.Client import Network.RabbitMqAdmin qualified as RabbitMqAdmin import OpenSSL.Session (SSLOption (..)) @@ -39,8 +39,8 @@ workerName = \case data Env = Env { http2Manager :: Http2Manager, - rabbitmqAdminClient :: Maybe (RabbitMqAdmin.AdminAPI (Servant.AsClientT IO)), - rabbitmqVHost :: Text, + -- natsAdminClient :: Maybe (NatsAdmin.AdminAPI (Servant.AsClientT IO)), -- TODO: Implement NATS admin client + natsNamespace :: Text, logger :: Logger, federatorInternal :: Endpoint, httpManager :: Manager, @@ -81,8 +81,8 @@ mkEnv opts = do responseTimeoutNone (\t -> responseTimeoutMicro $ 1000000 * t) -- seconds to microseconds opts.defederationTimeout - rabbitmqVHost = either (.vHost) (.vHost) opts.rabbitmq.unRabbitMqOpts - rabbitmqAdminClient <- for (rightToMaybe opts.rabbitmq.unRabbitMqOpts) mkRabbitMqAdminClientEnv + natsNamespace = either (.namespace) (.namespace) opts.nats.unNatsOpts + -- natsAdminClient <- for (rightToMaybe opts.nats.unNatsOpts) mkNatsAdminClientEnv -- TODO: Implement NATS admin client statuses <- newIORef $ Map.fromList diff --git a/services/background-worker/src/Wire/BackgroundWorker/Options.hs b/services/background-worker/src/Wire/BackgroundWorker/Options.hs index f9055d89e0..87b988a7ac 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Options.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Options.hs @@ -2,7 +2,7 @@ module Wire.BackgroundWorker.Options where import Data.Aeson import Imports -import Network.AMQP.Extended +import Network.NATS.Extended import System.Logger.Extended import Util.Options @@ -11,7 +11,7 @@ data Opts = Opts logFormat :: !(Maybe (Last LogFormat)), backgroundWorker :: !Endpoint, federatorInternal :: !Endpoint, - rabbitmq :: !RabbitMqOpts, + nats :: !NatsOpts, -- | Seconds, Nothing for no timeout defederationTimeout :: Maybe Int, backendNotificationPusher :: BackendNotificationsConfig, @@ -39,12 +39,12 @@ data BackendNotificationsConfig = BackendNotificationsConfig instance FromJSON BackendNotificationsConfig -newtype RabbitMqOpts = RabbitMqOpts {unRabbitMqOpts :: Either AmqpEndpoint RabbitMqAdminOpts} +newtype NatsOpts = NatsOpts {unNatsOpts :: Either NatsEndpoint NatsAdminOpts} deriving (Show) -instance FromJSON RabbitMqOpts where +instance FromJSON NatsOpts where parseJSON v = - RabbitMqOpts + NatsOpts <$> ( (Right <$> parseJSON v) <|> (Left <$> parseJSON v) ) diff --git a/services/background-worker/src/Wire/BackgroundWorker/Util.hs b/services/background-worker/src/Wire/BackgroundWorker/Util.hs index 7b53c99aa7..339e0cfe8a 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Util.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Util.hs @@ -1,4 +1,9 @@ module Wire.BackgroundWorker.Util where +-- NOTE: This file needs substantial rework for NATS +-- RabbitMQ consumer logic (Q.consumeMsgs, Q.ConsumerTag, etc.) +-- needs to be replaced with NATS subscription logic +-- NATS.subscribe returns a subscription ID, and messages are received differently +-- TODO: Implement NATS-based message consumption import Imports import Network.AMQP qualified as Q diff --git a/services/background-worker/src/Wire/DeadUserNotificationWatcher.hs b/services/background-worker/src/Wire/DeadUserNotificationWatcher.hs index 802f89eda6..9441ebdee0 100644 --- a/services/background-worker/src/Wire/DeadUserNotificationWatcher.hs +++ b/services/background-worker/src/Wire/DeadUserNotificationWatcher.hs @@ -1,4 +1,9 @@ {-# LANGUAGE BlockArguments #-} +-- NOTE: This file needs substantial rework for NATS +-- RabbitMQ consumer logic (Q.consumeMsgs, Q.ConsumerTag, etc.) +-- needs to be replaced with NATS subscription logic +-- NATS.subscribe returns a subscription ID, and messages are received differently +-- TODO: Implement NATS-based message consumption {-# LANGUAGE RecordWildCards #-} module Wire.DeadUserNotificationWatcher where @@ -10,10 +15,10 @@ import Data.ByteString.Conversion import Data.Id import Data.Map qualified as Map import Imports -import Network.AMQP qualified as Q -import Network.AMQP.Extended -import Network.AMQP.Lifted qualified as QL -import Network.AMQP.Types +import Network.NATS.Client qualified as NATS +import Network.NATS.Extended + + import System.Logger qualified as Log import UnliftIO (async) import Wire.API.Notification diff --git a/services/brig/src/Brig/App.hs b/services/brig/src/Brig/App.hs index 7b2f67979d..7b765ebcbd 100644 --- a/services/brig/src/Brig/App.hs +++ b/services/brig/src/Brig/App.hs @@ -67,7 +67,7 @@ module Brig.App indexEnvLens, randomPrekeyLocalLockLens, keyPackageLocalLockLens, - rabbitmqChannelLens, + natsChannelLens, disabledVersionsLens, enableSFTFederationLens, rateLimitEnvLens, @@ -139,8 +139,8 @@ import HTTP2.Client.Manager (Http2Manager, http2ManagerWithSSLCtx) import Hasql.Pool qualified as HasqlPool import Hasql.Pool.Extended import Imports -import Network.AMQP qualified as Q -import Network.AMQP.Extended qualified as Q +import Network.NATS.Client qualified as NATS +import Network.NATS.Extended qualified as Q import Network.HTTP.Client (responseTimeoutMicro) import Network.HTTP.Client.OpenSSL import OpenSSL.EVP.Digest (Digest, getDigestByName) @@ -215,7 +215,7 @@ data Env = Env indexEnv :: IndexEnv, randomPrekeyLocalLock :: Maybe (MVar ()), keyPackageLocalLock :: MVar (), - rabbitmqChannel :: Maybe (MVar Q.Channel), + natsChannel :: Maybe (MVar NATS.NatsChannel), disabledVersions :: Set Version, enableSFTFederation :: Maybe Bool, rateLimitEnv :: RateLimitEnv @@ -225,9 +225,9 @@ makeLensesWith (lensRules & lensField .~ suffixNamer) ''Env validateOptions :: Opts -> IO () validateOptions o = - case (o.federatorInternal, o.rabbitmq) of - (Nothing, Just _) -> error "RabbitMQ config is specified and federator is not, please specify both or none" - (Just _, Nothing) -> error "Federator is specified and RabbitMQ config is not, please specify both or none" + case (o.federatorInternal, o.nats) of + (Nothing, Just _) -> error "NATS config is specified and federator is not, please specify both or none" + (Just _, Nothing) -> error "Federator is specified and NATS config is not, please specify both or none" _ -> pure () newEnv :: Opts -> IO Env @@ -275,7 +275,7 @@ newEnv opts = do Log.info lgr $ Log.msg (Log.val "randomPrekeys: not active; using dynamoDB instead.") pure Nothing kpLock <- newMVar () - rabbitChan <- traverse (Q.mkRabbitMqChannelMVar lgr (Just "brig")) opts.rabbitmq + natsChan <- traverse (Q.mkNatsChannelMVar lgr (Just "brig")) opts.nats let allDisabledVersions = foldMap expandVersionExp opts.settings.disabledAPIVersions idxEnv <- mkIndexEnv opts.elasticsearch lgr (Opt.galley opts) mgr rateLimitEnv <- newRateLimitEnv opts.settings.passwordHashingRateLimit @@ -316,7 +316,7 @@ newEnv opts = do indexEnv = idxEnv, randomPrekeyLocalLock = prekeyLocalLock, keyPackageLocalLock = kpLock, - rabbitmqChannel = rabbitChan, + natsChannel = natsChan, disabledVersions = allDisabledVersions, enableSFTFederation = opts.multiSFT, rateLimitEnv diff --git a/services/brig/src/Brig/Federation/Client.hs b/services/brig/src/Brig/Federation/Client.hs index 15fc53f5bd..9f911c4cc6 100644 --- a/services/brig/src/Brig/Federation/Client.hs +++ b/services/brig/src/Brig/Federation/Client.hs @@ -32,7 +32,7 @@ import Data.Range (Range) import Data.Text qualified as T import Data.Time.Units import Imports -import Network.AMQP qualified as Q +import Network.NATS.Client qualified as NATS import System.Logger.Class qualified as Log import Wire.API.Federation.API import Wire.API.Federation.API.Brig as FederatedBrig @@ -142,9 +142,9 @@ notifyUserDeleted self remotes = do let notif = UserDeletedConnectionsNotification (tUnqualified self) remoteConnections remoteDomain = tDomain remotes - asks (.rabbitmqChannel) >>= \case + asks (.natsChannel) >>= \case Just chanVar -> do - enqueueNotification (tDomain self) remoteDomain Q.Persistent chanVar $ + enqueueNotification (tDomain self) remoteDomain chanVar $ fedQueueClient @'OnUserDeletedConnectionsTag notif Nothing -> Log.err $ @@ -153,16 +153,16 @@ notifyUserDeleted self remotes = do . Log.field "domain" (domainText remoteDomain) . Log.field "error" (show FederationNotConfigured) --- | Enqueues notifications in RabbitMQ. Retries 3 times with a delay of 1s. -enqueueNotification :: (MonadIO m, MonadMask m, Log.MonadLogger m, MonadReader Env m) => Domain -> Domain -> Q.DeliveryMode -> MVar Q.Channel -> FedQueueClient c () -> m () -enqueueNotification ownDomain remoteDomain deliveryMode chanVar action = do +-- | Enqueues notifications in NATS. Retries 3 times with a delay of 1s. +enqueueNotification :: (MonadIO m, MonadMask m, Log.MonadLogger m, MonadReader Env m) => Domain -> Domain -> MVar NATS.NatsChannel -> FedQueueClient c () -> m () +enqueueNotification ownDomain remoteDomain chanVar action = do let policy = limitRetries 3 <> constantDelay 1_000_000 recovering policy [logRetries (const $ pure True) logError] (const go) where logError willRetry (SomeException e) status = do rid <- asks (.requestId) Log.err $ - Log.msg @Text "failed to enqueue notification in RabbitMQ" + Log.msg @Text "failed to enqueue notification in NATS" . Log.field "error" (displayException e) . Log.field "willRetry" willRetry . Log.field "retryCount" status.rsIterNumber @@ -171,13 +171,13 @@ enqueueNotification ownDomain remoteDomain deliveryMode chanVar action = do rid <- asks (.requestId) mChan <- timeout (1 :: Second) (readMVar chanVar) case mChan of - Nothing -> throwM NoRabbitMqChannel - Just chan -> liftIO $ enqueue chan rid ownDomain remoteDomain deliveryMode action + Nothing -> throwM NoNatsChannel + Just chan -> liftIO $ enqueue chan rid ownDomain remoteDomain action -data NoRabbitMqChannel = NoRabbitMqChannel +data NoNatsChannel = NoNatsChannel deriving (Show) -instance Exception NoRabbitMqChannel +instance Exception NoNatsChannel runBrigFederatorClient :: (MonadReader Env m, MonadIO m) => diff --git a/services/brig/src/Brig/Options.hs b/services/brig/src/Brig/Options.hs index 464af491d6..6a1a28eae4 100644 --- a/services/brig/src/Brig/Options.hs +++ b/services/brig/src/Brig/Options.hs @@ -43,7 +43,7 @@ import Data.Text qualified as Text import Data.Text.Encoding qualified as Text import Database.Bloodhound.Types qualified as ES import Imports -import Network.AMQP.Extended +import Network.NATS.Extended import Network.DNS qualified as DNS import System.Logger.Extended (Level, LogFormat) import Util.Options @@ -397,8 +397,8 @@ data Opts = Opts postgresqlPassword :: !(Maybe FilePathSecrets), -- | SFT Federation multiSFT :: !(Maybe Bool), - -- | RabbitMQ settings, required when federation is enabled. - rabbitmq :: !(Maybe AmqpEndpoint), + -- | NATS settings, required when federation is enabled. + nats :: !(Maybe NatsEndpoint), -- | AWS settings aws :: !AWSOpts, -- | Enable Random Prekey Strategy diff --git a/services/cannon/src/Cannon/Options.hs b/services/cannon/src/Cannon/Options.hs index 3a1cee6988..aded87458f 100644 --- a/services/cannon/src/Cannon/Options.hs +++ b/services/cannon/src/Cannon/Options.hs @@ -31,10 +31,10 @@ module Cannon.Options logFormat, drainOpts, wSOpts, - rabbitmq, + nats, cassandraOpts, - rabbitMqMaxConnections, - rabbitMqMaxChannels, + natsMaxConnections, + natsMaxChannels, notificationTTL, Opts (..), gracePeriodSeconds, @@ -53,7 +53,7 @@ import Data.Aeson import Data.Aeson.APIFieldJsonTH import Data.Default import Imports -import Network.AMQP.Extended (AmqpEndpoint) +import Network.NATS.Extended (NatsEndpoint) import System.Logger.Extended (Level, LogFormat) import Wire.API.Routes.Version @@ -118,7 +118,7 @@ deriveApiFieldJSON ''DrainOpts data Opts = Opts { _optsCannon :: !Cannon, _optsGundeck :: !Gundeck, - _optsRabbitmq :: !AmqpEndpoint, + _optsNats :: !NatsEndpoint, _optsLogLevel :: !Level, _optsLogNetStrings :: !(Maybe (Last Bool)), _optsLogFormat :: !(Maybe (Last LogFormat)), @@ -126,10 +126,10 @@ data Opts = Opts _optsWSOpts :: WSOpts, _optsDisabledAPIVersions :: !(Set VersionExp), _optsCassandraOpts :: !CassandraOpts, - -- | Maximum number of rabbitmq connections. Must be strictly positive. - _optsRabbitMqMaxConnections :: Int, - -- | Maximum number of rabbitmq channels per connection. Must be strictly positive. - _optsRabbitMqMaxChannels :: Int, + -- | Maximum number of NATS connections. Must be strictly positive. + _optsNatsMaxConnections :: Int, + -- | Maximum number of NATS channels per connection. Must be strictly positive. + _optsNatsMaxChannels :: Int, _optsNotificationTTL :: Int } deriving (Show, Generic) @@ -148,7 +148,7 @@ instance FromJSON Opts where Opts <$> o .: "cannon" <*> o .: "gundeck" - <*> o .: "rabbitmq" + <*> o .: "nats" <*> o .: "logLevel" <*> o .:? "logNetStrings" <*> o .:? "logFormat" @@ -156,6 +156,6 @@ instance FromJSON Opts where <*> o .:? "wsOpts" .!= def <*> o .: "disabledAPIVersions" <*> o .: "cassandra" - <*> o .:? "rabbitMqMaxConnections" .!= 1000 - <*> o .:? "rabbitMqMaxChannels" .!= 300 + <*> o .:? "natsMaxConnections" .!= 1000 + <*> o .:? "natsMaxChannels" .!= 300 <*> o .: "notificationTTL" diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index 3c39867c59..50763cdc88 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -1,4 +1,9 @@ {-# LANGUAGE RecordWildCards #-} +-- NOTE: This file needs substantial rework for NATS +-- RabbitMQ consumer logic (Q.consumeMsgs, Q.ConsumerTag, etc.) +-- needs to be replaced with NATS subscription logic +-- NATS.subscribe returns a subscription ID, and messages are received differently +-- TODO: Implement NATS-based message consumption module Cannon.RabbitMq ( RabbitMqPoolException (..), @@ -32,8 +37,8 @@ import Data.Text qualified as T import Data.Timeout import Data.Unique import Imports hiding (threadDelay) -import Network.AMQP qualified as Q -import Network.AMQP.Extended +import Network.NATS.Client qualified as Q +import Network.NATS.Client.Extended import System.Logger (Logger) import System.Logger qualified as Log import UnliftIO (pooledMapConcurrentlyN_) @@ -68,7 +73,7 @@ data RabbitMqPool = RabbitMqPool data RabbitMqPoolOptions = RabbitMqPoolOptions { maxConnections :: Int, maxChannels :: Int, - endpoint :: AmqpEndpoint, + endpoint :: NatsEndpoint, retryEnabled :: Bool } diff --git a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs index b323509826..b1ad085c19 100644 --- a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs +++ b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs @@ -1,4 +1,9 @@ {-# LANGUAGE RecordWildCards #-} +-- NOTE: This file needs substantial rework for NATS +-- RabbitMQ consumer logic (Q.consumeMsgs, Q.ConsumerTag, etc.) +-- needs to be replaced with NATS subscription logic +-- NATS.subscribe returns a subscription ID, and messages are received differently +-- TODO: Implement NATS-based message consumption module Cannon.RabbitMqConsumerApp (rabbitMQWebSocketApp) where @@ -18,8 +23,8 @@ import Data.Text qualified as Text import Data.Text.Lazy qualified as TL import Data.Text.Lazy.Encoding qualified as TLE import Imports hiding (min, threadDelay) -import Network.AMQP (newQueue) -import Network.AMQP qualified as Q +import Network.NATS.Client (newQueue) +import Network.NATS.Client qualified as Q import Network.WebSockets import Network.WebSockets qualified as WS import Network.WebSockets.Connection diff --git a/services/cannon/src/Cannon/Run.hs b/services/cannon/src/Cannon/Run.hs index 1583072882..b894e76a49 100644 --- a/services/cannon/src/Cannon/Run.hs +++ b/services/cannon/src/Cannon/Run.hs @@ -89,7 +89,7 @@ run o = lowerCodensity $ do man <- lift $ newManager defaultManagerSettings {managerConnCount = 128} rnd <- lift createSystemRandom clk <- lift mkClock - mkEnv ext o cassandra g d1 d2 man rnd clk (o ^. Cannon.Options.rabbitmq) + mkEnv ext o cassandra g d1 d2 man rnd clk (o ^. Cannon.Options.nats) void $ Codensity $ Async.withAsync $ runCannon e refreshMetrics let s = newSettings $ Server (o ^. cannon . host) (o ^. cannon . port) (applog e) (Just idleTimeout) diff --git a/services/cannon/src/Cannon/Types.hs b/services/cannon/src/Cannon/Types.hs index f1499f308f..ea9973f77e 100644 --- a/services/cannon/src/Cannon/Types.hs +++ b/services/cannon/src/Cannon/Types.hs @@ -1,4 +1,9 @@ {-# LANGUAGE GeneralizedNewtypeDeriving #-} +-- NOTE: This file needs substantial rework for NATS +-- RabbitMQ consumer logic (Q.consumeMsgs, Q.ConsumerTag, etc.) +-- needs to be replaced with NATS subscription logic +-- NATS.subscribe returns a subscription ID, and messages are received differently +-- TODO: Implement NATS-based message consumption -- This file is part of the Wire Server implementation. -- @@ -46,8 +51,8 @@ import Data.Id import Data.Text.Encoding import Data.Unique import Imports -import Network.AMQP qualified as Q -import Network.AMQP.Extended (AmqpEndpoint) +import Network.NATS.Client qualified as Q +import Network.NATS.Client.Extended (NatsEndpoint) import Prometheus import Servant qualified import System.Logger qualified as Logger @@ -108,7 +113,7 @@ mkEnv :: Manager -> GenIO -> Clock -> - AmqpEndpoint -> + NatsEndpoint -> Codensity IO Env mkEnv external o cs l d conns p g t endpoint = do let poolOpts = diff --git a/services/cannon/src/Cannon/WS.hs b/services/cannon/src/Cannon/WS.hs index d470019f44..9ac1dde9b4 100644 --- a/services/cannon/src/Cannon/WS.hs +++ b/services/cannon/src/Cannon/WS.hs @@ -1,4 +1,9 @@ {-# LANGUAGE GeneralizedNewtypeDeriving #-} +-- NOTE: This file needs substantial rework for NATS +-- RabbitMQ consumer logic (Q.consumeMsgs, Q.ConsumerTag, etc.) +-- needs to be replaced with NATS subscription logic +-- NATS.subscribe returns a subscription ID, and messages are received differently +-- TODO: Implement NATS-based message consumption {-# LANGUAGE RecordWildCards #-} -- This file is part of the Wire Server implementation. @@ -72,7 +77,7 @@ import Data.Text.Encoding (decodeUtf8) import Data.Timeout (TimeoutUnit (..), (#)) import Data.Unique import Imports hiding (threadDelay) -import Network.AMQP qualified as Q +import Network.NATS.Client qualified as Q import Network.HTTP.Types.Method import Network.HTTP.Types.Status import Network.Wai.Utilities.Error diff --git a/services/galley/src/Galley/API/Action/Notify.hs b/services/galley/src/Galley/API/Action/Notify.hs index 2c01c1c6b2..81465dff13 100644 --- a/services/galley/src/Galley/API/Action/Notify.hs +++ b/services/galley/src/Galley/API/Action/Notify.hs @@ -7,7 +7,6 @@ import Galley.API.Util import Galley.Effects import Galley.Effects.BackendNotificationQueueAccess import Imports hiding ((\\)) -import Network.AMQP qualified as Q import Polysemy import Polysemy.Error import Wire.API.Conversation hiding (Conversation, Member) @@ -61,7 +60,7 @@ notifyConversationAction tag quid notifyOrigDomain con lconv targets action extr } update <- fmap (fromMaybe (mkUpdate []) . asum . map tUnqualified) $ - enqueueNotificationsConcurrently Q.Persistent (toList (bmRemotes targets)) $ + enqueueNotificationsConcurrently (toList (bmRemotes targets)) $ \ruids -> do let update = mkUpdate (tUnqualified ruids) -- if notifyOrigDomain is false, filter out user from quid's domain, diff --git a/services/galley/src/Galley/API/Clients.hs b/services/galley/src/Galley/API/Clients.hs index e39776d2b8..4ae44ceb90 100644 --- a/services/galley/src/Galley/API/Clients.hs +++ b/services/galley/src/Galley/API/Clients.hs @@ -35,7 +35,6 @@ import Galley.Effects.ClientStore qualified as E import Galley.Env import Galley.Types.Clients (clientIds) import Imports -import Network.AMQP qualified as Q import Polysemy import Polysemy.Error import Polysemy.Input @@ -124,4 +123,4 @@ rmClient usr cid = do fedQueueClient @'OnClientRemovedTag (ClientRemovedRequest usr cid (tUnqualified remoteConvs)) - in enqueueNotification Q.Persistent remoteConvs rpc + in enqueueNotification remoteConvs rpc diff --git a/services/galley/src/Galley/API/Internal.hs b/services/galley/src/Galley/API/Internal.hs index b6ad084273..289244eacf 100644 --- a/services/galley/src/Galley/API/Internal.hs +++ b/services/galley/src/Galley/API/Internal.hs @@ -448,7 +448,7 @@ rmUser lusr conn = do action = SomeConversationAction (sing @'ConversationLeaveTag) (), extraConversationData = def } - enqueueNotification Q.Persistent remotes $ do + enqueueNotification remotes $ do makeConversationUpdateBundle convUpdate >>= sendBundle @@ -457,7 +457,7 @@ rmUser lusr conn = do for_ (bucketRemote (fromRange cids)) $ \remoteConvs -> do let userDelete = UserDeletedConversationsNotification (tUnqualified lusr) (unsafeRange (tUnqualified remoteConvs)) let rpc = fedQueueClient @'OnUserDeletedConversationsTag userDelete - enqueueNotification Q.Persistent remoteConvs rpc + enqueueNotification remoteConvs rpc deleteLoop :: App () deleteLoop = do diff --git a/services/galley/src/Galley/API/MLS/Propagate.hs b/services/galley/src/Galley/API/MLS/Propagate.hs index 4300af2de3..e73c401ae8 100644 --- a/services/galley/src/Galley/API/MLS/Propagate.hs +++ b/services/galley/src/Galley/API/MLS/Propagate.hs @@ -28,7 +28,6 @@ import Galley.API.Push import Galley.Effects import Galley.Effects.BackendNotificationQueueAccess import Imports -import Network.AMQP qualified as Q import Polysemy import Polysemy.Error import Polysemy.TinyLog hiding (trace) @@ -98,7 +97,7 @@ propagateMessage qusr mSenderClient lConvOrSub con msg cm = do -- send to remotes void $ - enqueueNotificationsConcurrently Q.Persistent (map remoteMemberQualify rmems) $ + enqueueNotificationsConcurrently (map remoteMemberQualify rmems) $ \rs -> fedQueueClient @'OnMLSMessageSentTag diff --git a/services/galley/src/Galley/API/Message.hs b/services/galley/src/Galley/API/Message.hs index 35ea6ecb97..a4f256cab1 100644 --- a/services/galley/src/Galley/API/Message.hs +++ b/services/galley/src/Galley/API/Message.hs @@ -60,7 +60,6 @@ import Galley.Effects.TeamStore import Galley.Options import Galley.Types.Clients qualified as Clients import Imports hiding (forkIO) -import Network.AMQP qualified as Q import Polysemy hiding (send) import Polysemy.Error import Polysemy.Input @@ -697,7 +696,7 @@ sendRemoteMessages domain now sender senderClient lcnv metadata messages = transient = mmTransient metadata, recipients = UserClientMap rcpts } - enqueueNotification Q.Persistent domain (fedQueueClient @'OnMessageSentTag rm) + enqueueNotification domain (fedQueueClient @'OnMessageSentTag rm) where handle :: Either FederationError a -> Sem r (Set (UserId, ClientId)) handle (Right _) = pure mempty diff --git a/services/galley/src/Galley/API/Util.hs b/services/galley/src/Galley/API/Util.hs index 500c92596c..d7c9424560 100644 --- a/services/galley/src/Galley/API/Util.hs +++ b/services/galley/src/Galley/API/Util.hs @@ -58,7 +58,6 @@ import Galley.Types.Clients (Clients, fromUserClients) import Galley.Types.Conversations.Roles import Galley.Types.Teams import Imports hiding (forkIO) -import Network.AMQP qualified as Q import Polysemy import Polysemy.Error import Polysemy.Input @@ -1005,7 +1004,7 @@ registerRemoteConversationMemberships now lusr lc joinType = deleteOnUnreachable ) joined - void $ enqueueNotificationsConcurrentlyBuckets Q.Persistent joinedCoupled $ \z -> + void $ enqueueNotificationsConcurrentlyBuckets joinedCoupled $ \z -> makeConversationUpdateBundle (convUpdateJoin z) >>= sendBundle where creator :: Maybe UserId diff --git a/services/galley/src/Galley/App.hs b/services/galley/src/Galley/App.hs index a071d02a5b..fc43618bc0 100644 --- a/services/galley/src/Galley/App.hs +++ b/services/galley/src/Galley/App.hs @@ -82,7 +82,7 @@ import HTTP2.Client.Manager (Http2Manager, http2ManagerWithSSLCtx) import Hasql.Pool qualified as Hasql import Hasql.Pool.Extended (initPostgresPool) import Imports hiding (forkIO) -import Network.AMQP.Extended (mkRabbitMqChannelMVar) +import Network.NATS.Extended (mkNatsChannelMVar) import Network.HTTP.Client (responseTimeoutMicro) import Network.HTTP.Client.OpenSSL import Network.Wai.Utilities.JSONResponse @@ -157,9 +157,9 @@ validateOptions o = do error "setMaxConvSize cannot be > setTruncationLimit" when (settings' ^. maxTeamSize < optFanoutLimit) $ error "setMaxTeamSize cannot be < setTruncationLimit" - case (o ^. O.federator, o ^. rabbitmq) of - (Nothing, Just _) -> error "RabbitMQ config is specified and federator is not, please specify both or none" - (Just _, Nothing) -> error "Federator is specified and RabbitMQ config is not, please specify both or none" + case (o ^. O.federator, o ^. nats) of + (Nothing, Just _) -> error "NATS config is specified and federator is not, please specify both or none" + (Just _, Nothing) -> error "Federator is specified and NATS config is not, please specify both or none" _ -> pure () let mlsFlag = settings' ^. featureFlags . to (featureDefaults @MLSConfig) mlsConfig = mlsFlag.config @@ -187,7 +187,7 @@ createEnv o l = do <*> initExtEnv <*> maybe (pure Nothing) (fmap Just . Aws.mkEnv l mgr) (o ^. journal) <*> traverse loadAllMLSKeys (o ^. settings . mlsPrivateKeyPaths) - <*> traverse (mkRabbitMqChannelMVar l (Just "galley")) (o ^. rabbitmq) + <*> traverse (mkNatsChannelMVar l (Just "galley")) (o ^. nats) <*> pure codeURIcfg <*> newRateLimitEnv (o ^. settings . passwordHashingRateLimit) diff --git a/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs b/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs index fee78987c2..f664ddb3c2 100644 --- a/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs +++ b/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs @@ -2,7 +2,6 @@ module Galley.Effects.BackendNotificationQueueAccess where import Data.Qualified import Imports -import Network.AMQP qualified as Q import Polysemy import Polysemy.Error import Wire.API.Federation.BackendNotifications @@ -12,19 +11,16 @@ import Wire.API.Federation.Error data BackendNotificationQueueAccess m a where EnqueueNotification :: (KnownComponent c) => - Q.DeliveryMode -> Remote x -> FedQueueClient c a -> BackendNotificationQueueAccess m (Either FederationError a) EnqueueNotificationsConcurrently :: (KnownComponent c, Foldable f, Functor f) => - Q.DeliveryMode -> f (Remote x) -> (Remote [x] -> FedQueueClient c a) -> BackendNotificationQueueAccess m (Either FederationError [Remote a]) EnqueueNotificationsConcurrentlyBuckets :: (KnownComponent c, Foldable f, Functor f) => - Q.DeliveryMode -> f (Remote x) -> (Remote x -> FedQueueClient c a) -> BackendNotificationQueueAccess m (Either FederationError [Remote a]) @@ -34,11 +30,10 @@ enqueueNotification :: Member (Error FederationError) r, Member BackendNotificationQueueAccess r ) => - Q.DeliveryMode -> Remote x -> FedQueueClient c a -> Sem r a -enqueueNotification m r q = send (EnqueueNotification m r q) >>= either throw pure +enqueueNotification r q = send (EnqueueNotification r q) >>= either throw pure enqueueNotificationsConcurrently :: ( KnownComponent c, @@ -47,12 +42,11 @@ enqueueNotificationsConcurrently :: Member (Error FederationError) r, Member BackendNotificationQueueAccess r ) => - Q.DeliveryMode -> f (Remote x) -> (Remote [x] -> FedQueueClient c a) -> Sem r [Remote a] -enqueueNotificationsConcurrently m r q = - send (EnqueueNotificationsConcurrently m r q) +enqueueNotificationsConcurrently r q = + send (EnqueueNotificationsConcurrently r q) >>= either throw pure enqueueNotificationsConcurrentlyBuckets :: @@ -62,9 +56,8 @@ enqueueNotificationsConcurrentlyBuckets :: Member (Error FederationError) r, Member BackendNotificationQueueAccess r ) => - Q.DeliveryMode -> f (Remote x) -> (Remote x -> FedQueueClient c a) -> Sem r [Remote a] -enqueueNotificationsConcurrentlyBuckets m r q = - send (EnqueueNotificationsConcurrentlyBuckets m r q) >>= either throw pure +enqueueNotificationsConcurrentlyBuckets r q = + send (EnqueueNotificationsConcurrentlyBuckets r q) >>= either throw pure diff --git a/services/galley/src/Galley/Env.hs b/services/galley/src/Galley/Env.hs index 51ea6f59fc..5f3c3221ef 100644 --- a/services/galley/src/Galley/Env.hs +++ b/services/galley/src/Galley/Env.hs @@ -34,7 +34,7 @@ import Galley.Queue qualified as Q import HTTP2.Client.Manager (Http2Manager) import Hasql.Pool import Imports -import Network.AMQP qualified as Q +import Network.NATS.Client qualified as NATS import Network.HTTP.Client import Network.HTTP.Client.OpenSSL import OpenSSL.EVP.Digest @@ -65,7 +65,7 @@ data Env = Env _extEnv :: ExtEnv, _aEnv :: Maybe Aws.Env, _mlsKeys :: Maybe (MLSKeysByPurpose MLSPrivateKeys), - _rabbitmqChannel :: Maybe (MVar Q.Channel), + _natsChannel :: Maybe (MVar NATS.NatsChannel), _convCodeURI :: Either HttpsUrl (Map Text HttpsUrl), _passwordHashingRateLimitEnv :: RateLimitEnv } diff --git a/services/galley/src/Galley/Intra/BackendNotificationQueue.hs b/services/galley/src/Galley/Intra/BackendNotificationQueue.hs index 756ce2379a..36a3311e1b 100644 --- a/services/galley/src/Galley/Intra/BackendNotificationQueue.hs +++ b/services/galley/src/Galley/Intra/BackendNotificationQueue.hs @@ -14,7 +14,7 @@ import Galley.Env import Galley.Monad import Galley.Options import Imports -import Network.AMQP qualified as Q +import Network.NATS.Client qualified as NATS import Polysemy import Polysemy.Input import Polysemy.TinyLog @@ -31,21 +31,21 @@ interpretBackendNotificationQueueAccess :: Sem (BackendNotificationQueueAccess ': r) a -> Sem r a interpretBackendNotificationQueueAccess = interpret $ \case - EnqueueNotification deliveryMode remote action -> do + EnqueueNotification remote action -> do logEffect "BackendNotificationQueueAccess.EnqueueNotification" - embedApp . runExceptT $ enqueueNotification deliveryMode (tDomain remote) action - EnqueueNotificationsConcurrently m xs rpc -> do + embedApp . runExceptT $ enqueueNotification (tDomain remote) action + EnqueueNotificationsConcurrently xs rpc -> do logEffect "BackendNotificationQueueAccess.EnqueueNotificationsConcurrently" - embedApp . runExceptT $ enqueueNotificationsConcurrently m xs rpc - EnqueueNotificationsConcurrentlyBuckets m xs rpc -> do + embedApp . runExceptT $ enqueueNotificationsConcurrently xs rpc + EnqueueNotificationsConcurrentlyBuckets xs rpc -> do logEffect "BackendNotificationQueueAccess.EnqueueNotificationsConcurrentlyBuckets" - embedApp . runExceptT $ enqueueNotificationsConcurrentlyBuckets m xs rpc + embedApp . runExceptT $ enqueueNotificationsConcurrentlyBuckets xs rpc -getChannel :: ExceptT FederationError App (MVar Q.Channel) -getChannel = view rabbitmqChannel >>= maybe (throwE FederationNotConfigured) pure +getChannel :: ExceptT FederationError App (MVar NATS.NatsChannel) +getChannel = view natsChannel >>= maybe (throwE FederationNotConfigured) pure -enqueueSingleNotification :: Domain -> Q.DeliveryMode -> MVar Q.Channel -> FedQueueClient c a -> App a -enqueueSingleNotification remoteDomain deliveryMode chanVar action = do +enqueueSingleNotification :: Domain -> MVar NATS.NatsChannel -> FedQueueClient c a -> App a +enqueueSingleNotification remoteDomain chanVar action = do ownDomain <- view (options . settings . federationDomain) let policy = limitRetries 3 <> constantDelay 1_000_000 handlers = @@ -56,7 +56,7 @@ enqueueSingleNotification remoteDomain deliveryMode chanVar action = do logError willRetry (SomeException e) status = do rid <- view reqId Log.err $ - Log.msg @Text "failed to enqueue notification in RabbitMQ" + Log.msg @Text "failed to enqueue notification in NATS" . Log.field "error" (displayException e) . Log.field "willRetry" willRetry . Log.field "retryCount" status.rsIterNumber @@ -65,31 +65,29 @@ enqueueSingleNotification remoteDomain deliveryMode chanVar action = do rid <- view reqId mChan <- timeout 1_000_000 (readMVar chanVar) case mChan of - Nothing -> throwM NoRabbitMqChannel + Nothing -> throwM NoNatsChannel Just chan -> do - liftIO $ enqueue chan rid ownDomain remoteDomain deliveryMode action + liftIO $ enqueue chan rid ownDomain remoteDomain action -enqueueNotification :: Q.DeliveryMode -> Domain -> FedQueueClient c a -> ExceptT FederationError App a -enqueueNotification deliveryMode remoteDomain action = do +enqueueNotification :: Domain -> FedQueueClient c a -> ExceptT FederationError App a +enqueueNotification remoteDomain action = do chanVar <- getChannel - lift $ enqueueSingleNotification remoteDomain deliveryMode chanVar action + lift $ enqueueSingleNotification remoteDomain chanVar action enqueueNotificationsConcurrently :: (Foldable f, Functor f) => - Q.DeliveryMode -> f (Remote x) -> (Remote [x] -> FedQueueClient c a) -> ExceptT FederationError App [Remote a] -enqueueNotificationsConcurrently m xs f = - enqueueNotificationsConcurrentlyBuckets m (bucketRemote xs) f +enqueueNotificationsConcurrently xs f = + enqueueNotificationsConcurrentlyBuckets (bucketRemote xs) f enqueueNotificationsConcurrentlyBuckets :: (Foldable f) => - Q.DeliveryMode -> f (Remote x) -> (Remote x -> FedQueueClient c a) -> ExceptT FederationError App [Remote a] -enqueueNotificationsConcurrentlyBuckets m xs f = do +enqueueNotificationsConcurrentlyBuckets xs f = do case toList xs of -- only attempt to get a channel if there is at least one notification to send [] -> pure [] @@ -97,9 +95,9 @@ enqueueNotificationsConcurrentlyBuckets m xs f = do chanVar <- getChannel lift $ pooledForConcurrentlyN 8 (toList xs) $ \r -> qualifyAs r - <$> enqueueSingleNotification (tDomain r) m chanVar (f r) + <$> enqueueSingleNotification (tDomain r) chanVar (f r) -data NoRabbitMqChannel = NoRabbitMqChannel +data NoNatsChannel = NoNatsChannel deriving (Show) -instance Exception NoRabbitMqChannel +instance Exception NoNatsChannel diff --git a/services/galley/src/Galley/Options.hs b/services/galley/src/Galley/Options.hs index 415449a560..6e90cad69f 100644 --- a/services/galley/src/Galley/Options.hs +++ b/services/galley/src/Galley/Options.hs @@ -47,7 +47,7 @@ module Galley.Options gundeck, spar, federator, - rabbitmq, + nats, discoUrl, settings, journal, @@ -73,7 +73,7 @@ import Data.Range import Galley.Keys import Galley.Types.Teams import Imports -import Network.AMQP.Extended +import Network.NATS.Extended import System.Logger.Extended (Level, LogFormat) import Util.Options hiding (endpoint) import Util.Options.Common @@ -199,8 +199,8 @@ data Opts = Opts _spar :: !Endpoint, -- | Federator endpoint _federator :: !(Maybe Endpoint), - -- | RabbitMQ settings, required when federation is enabled. - _rabbitmq :: !(Maybe AmqpEndpoint), + -- | NATS settings, required when federation is enabled. + _nats :: !(Maybe NatsEndpoint), -- | Disco URL _discoUrl :: !(Maybe Text), -- | Other settings diff --git a/services/gundeck/src/Gundeck/Client.hs b/services/gundeck/src/Gundeck/Client.hs index 9d89c2da80..c893292580 100644 --- a/services/gundeck/src/Gundeck/Client.hs +++ b/services/gundeck/src/Gundeck/Client.hs @@ -24,7 +24,7 @@ import Gundeck.Notification.Data qualified as Notifications import Gundeck.Push.Data qualified as Push import Gundeck.Push.Native import Imports -import Network.AMQP +import Network.NATS.Client qualified as NATS import Wire.API.Notification unregister :: UserId -> ClientId -> Gundeck () @@ -42,16 +42,12 @@ removeUser user = do Notifications.deleteAll user setupConsumableNotifications :: - Channel -> + NATS.NatsChannel -> UserId -> ClientId -> IO Text setupConsumableNotifications chan uid cid = do let qName = clientNotificationQueueName uid cid - void $ - declareQueue - chan - (queueOpts qName) - for_ [userRoutingKey uid, clientRoutingKey uid cid] $ - bindQueue chan qName userNotificationExchangeName + -- NATS subscriptions are created dynamically when clients connect + -- No need to declare queues upfront like in RabbitMQ pure qName diff --git a/services/gundeck/src/Gundeck/Env.hs b/services/gundeck/src/Gundeck/Env.hs index e3670c13a8..af784f3462 100644 --- a/services/gundeck/src/Gundeck/Env.hs +++ b/services/gundeck/src/Gundeck/Env.hs @@ -41,8 +41,8 @@ import Gundeck.Redis qualified as Redis import Gundeck.Redis.HedisExtensions qualified as Redis import Gundeck.ThreadBudget import Imports -import Network.AMQP (Channel) -import Network.AMQP.Extended qualified as Q +import Network.NATS.Client qualified as NATS +import Network.NATS.Extended qualified as Q import Network.HTTP.Client (responseTimeoutMicro) import Network.HTTP.Client.TLS (tlsManagerSettings) import Network.TLS as TLS @@ -61,7 +61,7 @@ data Env = Env _awsEnv :: !Aws.Env, _time :: !(IO Milliseconds), _threadBudgetState :: !(Maybe ThreadBudgetState), - _rabbitMqChannel :: MVar Channel + _natsChannel :: MVar NATS.NatsChannel } makeLenses ''Env @@ -104,8 +104,8 @@ createEnv o = do { updateAction = Ms . round . (* 1000) <$> getPOSIXTime } mtbs <- mkThreadBudgetState `mapM` (o ^. settings . maxConcurrentNativePushes) - rabbitMqChannelMVar <- Q.mkRabbitMqChannelMVar l (Just "gundeck") (o ^. rabbitmq) - pure $! (rThread : rAdditionalThreads,) $! Env (RequestId defRequestId) o l n p r rAdditional a io mtbs rabbitMqChannelMVar + natsChannelMVar <- Q.mkNatsChannelMVar l (Just "gundeck") (o ^. nats) + pure $! (rThread : rAdditionalThreads,) $! Env (RequestId defRequestId) o l n p r rAdditional a io mtbs natsChannelMVar reqIdMsg :: RequestId -> Logger.Msg -> Logger.Msg reqIdMsg = ("request" Logger..=) . unRequestId diff --git a/services/gundeck/src/Gundeck/Monad.hs b/services/gundeck/src/Gundeck/Monad.hs index 4d3d9607dd..d00e0fc80f 100644 --- a/services/gundeck/src/Gundeck/Monad.hs +++ b/services/gundeck/src/Gundeck/Monad.hs @@ -32,7 +32,7 @@ module Gundeck.Monad runDirect, runGundeck, posixTime, - getRabbitMqChan, + getNatsChan, -- * Select which redis to target runWithDefaultRedis, @@ -57,7 +57,7 @@ import Database.Redis qualified as Redis import Gundeck.Env import Gundeck.Redis qualified as Redis import Imports -import Network.AMQP +import Network.NATS.Client qualified as NATS import Network.HTTP.Types import Network.Wai import Network.Wai.Utilities.Error @@ -208,12 +208,12 @@ posixTime = view time >>= liftIO msToUTCSecs :: Milliseconds -> UTCTime msToUTCSecs = posixSecondsToUTCTime . fromIntegral . (`div` 1000) . ms -getRabbitMqChan :: Gundeck Channel -getRabbitMqChan = do - chanMVar <- view rabbitMqChannel +getNatsChan :: Gundeck NATS.NatsChannel +getNatsChan = do + chanMVar <- view natsChannel mChan <- liftIO $ System.Timeout.timeout 1_000_000 $ readMVar chanMVar case mChan of Nothing -> do - Log.err $ Log.msg (Log.val "Could not retrieve RabbitMQ channel") - throwM $ mkError status500 "internal-server-error" "Could not retrieve RabbitMQ channel" + Log.err $ Log.msg (Log.val "Could not retrieve NATS channel") + throwM $ mkError status500 "internal-server-error" "Could not retrieve NATS channel" Just chan -> pure chan diff --git a/services/gundeck/src/Gundeck/Options.hs b/services/gundeck/src/Gundeck/Options.hs index ee55c98beb..0725f3a7ff 100644 --- a/services/gundeck/src/Gundeck/Options.hs +++ b/services/gundeck/src/Gundeck/Options.hs @@ -25,7 +25,7 @@ import Data.Aeson.TH import Data.Yaml (FromJSON) import Gundeck.Aws.Arn import Imports -import Network.AMQP.Extended +import Network.NATS.Extended import System.Logger.Extended (Level, LogFormat) import Util.Options import Util.Options.Common @@ -136,7 +136,7 @@ data Opts = Opts _redis :: !RedisEndpoint, _redisAdditionalWrite :: !(Maybe RedisEndpoint), _aws :: !AWSOpts, - _rabbitmq :: !AmqpEndpoint, + _nats :: !NatsEndpoint, _discoUrl :: !(Maybe Text), _settings :: !Settings, -- Logging diff --git a/services/gundeck/src/Gundeck/Push.hs b/services/gundeck/src/Gundeck/Push.hs index 85f107a63c..3f5ec43718 100644 --- a/services/gundeck/src/Gundeck/Push.hs +++ b/services/gundeck/src/Gundeck/Push.hs @@ -63,8 +63,8 @@ import Gundeck.Push.Websocket qualified as Web import Gundeck.ThreadBudget import Gundeck.Util import Imports -import Network.AMQP (Message (..)) -import Network.AMQP qualified as Q +import Network.NATS.Client qualified as NATS +import Network.NATS.Client (NatsMessage (..)) import Network.HTTP.Types import Network.Wai.Utilities import System.Logger.Class (msg, val, (+++), (.=), (~~)) @@ -97,7 +97,7 @@ class (MonadThrow m) => MonadPushAll m where mpaForkIO :: m () -> m () mpaRunWithBudget :: Int -> a -> m a -> m a mpaGetClients :: Set UserId -> m UserClientsFull - mpaPublishToRabbitMq :: Text -> Text -> Q.Message -> m () + mpaPublishToNats :: Text -> NATS.NatsMessage -> m () instance MonadPushAll Gundeck where mpaNotificationTTL = view (options . settings . notificationTTL) @@ -110,12 +110,12 @@ instance MonadPushAll Gundeck where mpaForkIO = void . forkIO mpaRunWithBudget = runWithBudget'' mpaGetClients = getClients - mpaPublishToRabbitMq = publishToRabbitMq + mpaPublishToNats = publishToNats -publishToRabbitMq :: Text -> Text -> Q.Message -> Gundeck () -publishToRabbitMq exchangeName routingKey qMsg = do - chan <- getRabbitMqChan - void $ liftIO $ Q.publishMsg chan exchangeName routingKey qMsg +publishToNats :: Text -> NATS.NatsMessage -> Gundeck () +publishToNats subject natsMsg = do + chan <- getNatsChan + void $ liftIO $ NATS.publish chan subject (NATS.msgBody natsMsg) -- | Another layer of wrap around 'runWithBudget'. runWithBudget'' :: Int -> a -> Gundeck a -> Gundeck a @@ -292,7 +292,7 @@ pushAllViaRabbitMq newNotifs userClientsFull = do pushViaRabbitMq :: (MonadPushAll m) => NewNotification -> m () pushViaRabbitMq newNotif = do qMsg <- mkMessage newNotif.nnNotification - let routingKeys = + let subjects = Set.unions $ flip Set.map (Set.fromList . toList $ newNotif.nnRecipients) \r -> case r._recipientClients of @@ -300,8 +300,8 @@ pushViaRabbitMq newNotif = do Set.singleton $ userRoutingKey r._recipientId RecipientClientsSome (toList -> cs) -> Set.fromList $ map (clientRoutingKey r._recipientId) cs - for_ routingKeys $ \routingKey -> - mpaPublishToRabbitMq userNotificationExchangeName routingKey qMsg + for_ subjects $ \subject -> + mpaPublishToNats subject qMsg pushAllToCells :: (MonadPushAll m, Log.MonadLogger m) => [NewNotification] -> m () pushAllToCells newNotifs = do @@ -317,41 +317,20 @@ pushAllToCells newNotifs = do pushToCells :: (MonadPushAll m) => Text -> NewNotification -> m () pushToCells queue newNotif = do qMsg <- mkMessage newNotif.nnNotification - mpaPublishToRabbitMq "" queue qMsg + mpaPublishToNats queue qMsg -mkMessage :: (MonadPushAll m) => Notification -> m Message +mkMessage :: (MonadPushAll m) => Notification -> m NATS.NatsMessage mkMessage notif = do NotificationTTL ttl <- mpaNotificationTTL pure $ - Q.newMsg - { msgBody = + NATS.NatsMessage + { NATS.msgSubject = "", -- Will be set by caller + NATS.msgBody = Aeson.encode . queuedNotification notif.ntfId $ toNonEmpty notif.ntfPayload, - msgContentType = Just "application/json", - msgDeliveryMode = - -- Non-persistent messages never hit the disk and so do not - -- survive RabbitMQ node restarts, this is great for transient - -- notifications. - Just - ( if notif.ntfTransient - then Q.NonPersistent - else Q.Persistent - ), - msgExpiration = - Just - ( if notif.ntfTransient - then - ( -- Means that if there is no active consumer, this - -- message will never be delivered to anyone. It can - -- still take some time before RabbitMQ forgets about - -- this message because the expiration is only - -- considered for messages which are at the head of a - -- queue. See docs: https://www.rabbitmq.com/docs/ttl - "0" - ) - else showT $ fromIntegral ttl # Second #> MilliSecond - ) + NATS.msgReplyTo = Nothing, + NATS.msgHeaders = Map.empty } -- | A new notification to be stored in C* and pushed over websockets diff --git a/services/gundeck/src/Gundeck/Run.hs b/services/gundeck/src/Gundeck/Run.hs index 66f860873e..908253690a 100644 --- a/services/gundeck/src/Gundeck/Run.hs +++ b/services/gundeck/src/Gundeck/Run.hs @@ -43,8 +43,8 @@ import Gundeck.React import Gundeck.Schema.Run (lastSchemaVersion) import Gundeck.ThreadBudget import Imports -import Network.AMQP -import Network.AMQP.Types +import Network.NATS.Client qualified as NATS +import Network.NATS.Extended qualified as NATS import Network.Wai as Wai import Network.Wai.Middleware.Gunzip qualified as GZip import Network.Wai.Middleware.Gzip qualified as GZip @@ -97,38 +97,9 @@ run opts = withTracer \tracer -> do setUpRabbitMqExchangesAndQueues = do chan <- getRabbitMqChan MonadLogger.info $ Log.msg (Log.val "setting up RabbitMQ exchanges and queues") - liftIO $ createUserNotificationsExchange chan - liftIO $ createDeadUserNotificationsExchange chan - liftIO $ createCellsNotificationsQueue chan - - createUserNotificationsExchange :: Channel -> IO () - createUserNotificationsExchange chan = do - declareExchange chan newExchange {exchangeName = userNotificationExchangeName, exchangeType = "direct"} - - createDeadUserNotificationsExchange :: Channel -> IO () - createDeadUserNotificationsExchange chan = do - declareExchange chan newExchange {exchangeName = userNotificationDlxName, exchangeType = "direct"} - - let routingKey = userNotificationDlqName - void $ - declareQueue - chan - newQueue - { queueName = userNotificationDlqName, - queueHeaders = FieldTable $ Map.fromList [("x-queue-type", FVString "quorum")] - } - bindQueue chan userNotificationDlqName userNotificationDlxName routingKey - - createCellsNotificationsQueue :: Channel -> IO () - createCellsNotificationsQueue chan = for_ - (opts ^. (settings . cellsEventQueue)) - $ \name -> - declareQueue - chan - newQueue - { queueName = name, - queueHeaders = FieldTable $ Map.fromList [("x-queue-type", FVString "quorum")] - } + -- NATS doesn't require upfront exchange/queue declarations + -- Subjects are created dynamically when publishers/subscribers connect + pure () middleware :: Env -> IO Middleware middleware env = do