Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/haskell-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ on:
jobs:
linux:
name: Haskell-CI - Linux - ${{ matrix.compiler }}
runs-on: ubuntu-20.04
runs-on: ubuntu-24.04
timeout-minutes:
60
container:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/dist-newstyle/
/cabal.project.local
/cabal.project.freeze
/haddocks/
TAGS
.ghc.environment.*
.cabal-sandbox
Expand Down
9 changes: 9 additions & 0 deletions consumers/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
# consumers-2.3.3.2 (XXX-XX-XX)
* Log batch size limits when processing.
* Split off testing utilities into separate module.
* Ensure tables are cleaned up on test-teardown.
* Allow passing in PG variables through environment variables.
* Expose inlining information so it is possible to specialize consumers at
call-sites.
* Bump Ubuntu image used in CI.

# consumers-2.3.3.1 (2025-04-03)
* Do not prepare query that updates jobs in the monitor thread.

Expand Down
3 changes: 3 additions & 0 deletions consumers/consumers.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ test-suite consumers-test
monad-time,
mtl,
stm,
tasty,
tasty-hunit,
text,
time,
transformers,
Expand All @@ -114,3 +116,4 @@ test-suite consumers-test

type: exitcode-stdio-1.0
main-is: Test.hs
other-modules: Util
37 changes: 25 additions & 12 deletions consumers/src/Database/PostgreSQL/Consumers/Components.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ runConsumer
-- ^ The consumer.
-> ConnectionSourceM m
-> m (m ())
{-# INLINEABLE runConsumer #-}
runConsumer cc cs = runConsumerWithMaybeIdleSignal cc cs Nothing

runConsumerWithIdleSignal
Expand All @@ -67,6 +68,7 @@ runConsumerWithIdleSignal
-> ConnectionSourceM m
-> TMVar Bool
-> m (m ())
{-# INLINEABLE runConsumerWithIdleSignal #-}
runConsumerWithIdleSignal cc cs idleSignal = runConsumerWithMaybeIdleSignal cc cs (Just idleSignal)

-- | Run the consumer and also signal whenever the consumer is waiting for
Expand All @@ -85,12 +87,13 @@ runConsumerWithMaybeIdleSignal
-> ConnectionSourceM m
-> Maybe (TMVar Bool)
-> m (m ())
{-# INLINEABLE runConsumerWithMaybeIdleSignal #-}
runConsumerWithMaybeIdleSignal cc0 cs mIdleSignal
| ccMaxRunningJobs cc < 1 = do
logInfo_ "ccMaxRunningJobs < 1, not starting the consumer"
pure $ pure ()
| otherwise = do
semaphore <- newMVar ()
(triggerNotification, listenNotification) <- mkNotification
runningJobsInfo <- liftBase $ newTVarIO M.empty
runningJobs <- liftBase $ newTVarIO 0

Expand All @@ -102,15 +105,15 @@ runConsumerWithMaybeIdleSignal cc0 cs mIdleSignal

cid <- registerConsumer cc cs
localData ["consumer_id" .= show cid] $ do
listener <- spawnListener cc cs semaphore
listener <- spawnListener cc cs triggerNotification
monitor <- localDomain "monitor" $ spawnMonitor cc cs cid
dispatcher <-
localDomain "dispatcher" $
spawnDispatcher
cc
cs
cid
semaphore
listenNotification
runningJobsInfo
runningJobs
mIdleSignal
Expand Down Expand Up @@ -184,9 +187,10 @@ spawnListener
:: (MonadBaseControl IO m, MonadMask m)
=> ConsumerConfig m idx job
-> ConnectionSourceM m
-> MVar ()
-> TriggerNotification m
-> m ThreadId
spawnListener cc cs semaphore =
{-# INLINEABLE spawnListener #-}
spawnListener cc cs outbox =
forkP "listener" $
case ccNotificationChannel cc of
Just chan ->
Expand All @@ -204,8 +208,7 @@ spawnListener cc cs semaphore =
liftBase . threadDelay $ ccNotificationTimeout cc
signalDispatcher
where
signalDispatcher = do
liftBase $ tryPutMVar semaphore ()
signalDispatcher = triggerNotification outbox

noTs =
defaultTransactionSettings
Expand All @@ -228,6 +231,7 @@ spawnMonitor
-> ConnectionSourceM m
-> ConsumerID
-> m ThreadId
{-# INLINEABLE spawnMonitor #-}
spawnMonitor ConsumerConfig {..} cs cid = forkP "monitor" . forever $ do
runDBT cs ts $ do
now <- currentTime
Expand Down Expand Up @@ -309,14 +313,18 @@ spawnDispatcher
=> ConsumerConfig m idx job
-> ConnectionSourceM m
-> ConsumerID
-> MVar ()
-> ListenNotification m
-> TVar (M.Map ThreadId idx)
-> TVar Int
-> Maybe (TMVar Bool)
-> m ThreadId
spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs mIdleSignal =
{-# INLINEABLE spawnDispatcher #-}
spawnDispatcher ConsumerConfig {..} cs cid inbox runningJobsInfo runningJobs mIdleSignal =
forkP "dispatcher" . forever $ do
void $ takeMVar semaphore
listenNotification inbox
-- When awoken, we always start slow, processing only a single job in a
-- batch. Each time we can fill a batch completely with jobs, we grow the maximum
-- batch size.
someJobWasProcessed <- loop 1
if someJobWasProcessed
then setIdle False
Expand All @@ -336,7 +344,9 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs
logInfo "Processing batch" $
object
[ "batch_size" .= batchSize
, "limit" .= limit
]

-- Update runningJobs before forking so that we can adjust
-- maxBatchSize appropriately later. We also need to mask asynchronous
-- exceptions here as we rely on correct value of runningJobs to
Expand All @@ -349,9 +359,11 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs
. forkP "batch processor"
. (`finally` subtractJobs)
. restore
$ do
mapM startJob batch >>= mapM joinJob >>= updateJobs
$ mapM startJob batch >>= mapM joinJob >>= updateJobs

-- Induce some backpressure. If the number of running jobs by all batch
-- processors exceed the global limit, we wait. If it does not, start a
-- new iteration with a double the limit
when (batchSize == limit) $ do
maxBatchSize <- atomically $ do
jobs <- readTVar runningJobs
Expand Down Expand Up @@ -433,6 +445,7 @@ updateJobsQuery
-> [(idx, Result)]
-> UTCTime
-> SQL
{-# INLINEABLE updateJobsQuery #-}
updateJobsQuery jobsTable results now =
smconcat
[ "WITH removed AS ("
Expand Down
24 changes: 24 additions & 0 deletions consumers/src/Database/PostgreSQL/Consumers/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ module Database.PostgreSQL.Consumers.Utils
, forkP
, gforkP
, preparedSqlName
, TriggerNotification (triggerNotification)
, ListenNotification (listenNotification)
, mkNotification
) where

import Control.Concurrent.Lifted
Expand All @@ -14,6 +17,7 @@ import Control.Exception.Lifted qualified as E
import Control.Monad.Base
import Control.Monad.Catch
import Control.Monad.Trans.Control
import Data.Functor (void)
import Data.Maybe
import Data.Text qualified as T
import Database.PostgreSQL.PQTypes.Class
Expand All @@ -22,6 +26,7 @@ import Database.PostgreSQL.PQTypes.SQL.Raw
-- | Run an action 'm' that returns a finalizer and perform the returned
-- finalizer after the action 'action' completes.
finalize :: (MonadMask m, MonadBase IO m) => m (m ()) -> m a -> m a
{-# INLINEABLE finalize #-}
finalize m action = do
finalizer <- newEmptyMVar
flip finally (tryTakeMVar finalizer >>= fromMaybe (pure ())) $ do
Expand Down Expand Up @@ -49,13 +54,15 @@ instance Exception ThrownFrom

-- | Stop execution of a thread.
stopExecution :: MonadBase IO m => ThreadId -> m ()
{-# INLINEABLE stopExecution #-}
stopExecution = flip throwTo StopExecution

----------------------------------------

-- | Modified version of 'fork' that propagates thrown exceptions to the parent
-- thread.
forkP :: MonadBaseControl IO m => String -> m () -> m ThreadId
{-# INLINEABLE forkP #-}
forkP = forkImpl fork

-- | Modified version of 'TG.fork' that propagates thrown exceptions to the
Expand All @@ -66,6 +73,7 @@ gforkP
-> String
-> m ()
-> m (ThreadId, m (T.Result ()))
{-# INLINEABLE gforkP #-}
gforkP = forkImpl . TG.fork

----------------------------------------
Expand All @@ -76,6 +84,7 @@ forkImpl
-> String
-> m ()
-> m a
{-# INLINEABLE forkImpl #-}
forkImpl ffork tname m = E.mask $ \release -> do
parent <- myThreadId
ffork $
Expand All @@ -86,3 +95,18 @@ forkImpl ffork tname m = E.mask $ \release -> do

preparedSqlName :: T.Text -> RawSQL () -> QueryName
preparedSqlName baseName tableName = QueryName . T.take 63 $ baseName <> "$" <> unRawSQL tableName

----------------------------------------

newtype TriggerNotification m = TriggerNotification {triggerNotification :: m ()}

newtype ListenNotification m = ListenNotification {listenNotification :: m ()}

mkNotification :: MonadBaseControl IO m => m (TriggerNotification m, ListenNotification m)
{-# INLINEABLE mkNotification #-}
mkNotification = do
notificationRef <- newEmptyMVar
pure
( TriggerNotification . void $ tryPutMVar notificationRef ()
, ListenNotification $ takeMVar notificationRef
)
Loading