Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions server/src-lib/Hasura/Eventing/ScheduledTrigger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -123,48 +123,45 @@ runScheduledEventsGenerator logger pgpool getSC = do
forever $ do
sc <- getSC
-- get scheduled triggers from cache
let scheduledTriggers = Map.elems $ scScheduledTriggers sc
let scheduledTriggers = scScheduledTriggers sc

-- get scheduled trigger stats from db
-- get deprived scheduled triggers(those which have < 100 scheduled future events)
-- that need hydration
runExceptT
(Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadOnly) getScheduledTriggerStats) >>= \case
(Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadOnly) getDeprivedScheduledTriggersStats) >>= \case
Left err -> L.unLogger logger $
ScheduledTriggerInternalErr $ err500 Unexpected (T.pack $ show err)
Right scheduledTriggerStats -> do
Right deprivedScheduledTriggerStats -> do

-- join scheduled triggers with stats and produce @[(ScheduledTriggerInfo, ScheduledTriggerStats)]@
scheduledTriggersWithStats' <- mapM (withStats scheduledTriggerStats) scheduledTriggers
let scheduledTriggersWithStats = catMaybes scheduledTriggersWithStats'

-- filter out scheduled trigger which have more than 100 upcoming events already
let scheduledTriggersForHydration =
filter (\(_sti, stats) -> stsUpcomingEventsCount stats < 100) scheduledTriggersWithStats
-- join stats with scheduled triggers and produce @[(ScheduledTriggerInfo, ScheduledTriggerStats)]@
scheduledTriggersForHydrationWithStats' <- mapM (withST scheduledTriggers) deprivedScheduledTriggerStats
let scheduledTriggersForHydrationWithStats = catMaybes scheduledTriggersForHydrationWithStats'

-- insert scheduled events for scheduled triggers that need hydration
runExceptT
(Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) $
insertScheduledEventsFor scheduledTriggersForHydration) >>= \case
insertScheduledEventsFor scheduledTriggersForHydrationWithStats) >>= \case
Right _ -> pure ()
Left err ->
L.unLogger logger $ ScheduledTriggerInternalErr $ err500 Unexpected (T.pack $ show err)
threadDelay oneMinute
where
getScheduledTriggerStats = liftTx $ do
getDeprivedScheduledTriggersStats = liftTx $ do
map uncurryStats <$>
Q.listQE defaultTxErrorHandler
[Q.sql|
SELECT name, upcoming_events_count, max_scheduled_time
FROM hdb_catalog.hdb_scheduled_events_stats
WHERE upcoming_events_count < 100
|] () True
uncurryStats (n, count, maxTs) = ScheduledTriggerStats n count maxTs
withStats stStats sti = do
let mStats = find (\ScheduledTriggerStats{stsName} -> stsName == stiName sti) stStats
case mStats of
withST stis stat = do
case Map.lookup (stsName stat) stis of
Nothing -> do
L.unLogger logger $
ScheduledTriggerInternalErr $ err500 Unexpected "could not find scheduled trigger in stats"
ScheduledTriggerInternalErr $ err500 Unexpected "could not find scheduled trigger in the schema cache"
pure Nothing
Just stats -> pure $ Just (sti, stats)
Just sti -> pure $ Just (sti, stat)

insertScheduledEventsFor :: [(ScheduledTriggerInfo, ScheduledTriggerStats)] -> Q.TxE QErr ()
insertScheduledEventsFor scheduledTriggersWithStats = do
Expand Down