diff --git a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs index dcddfa85b7518..78a908450385a 100644 --- a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs +++ b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs @@ -123,48 +123,45 @@ runScheduledEventsGenerator logger pgpool getSC = do forever $ do sc <- getSC -- get scheduled triggers from cache - let scheduledTriggers = Map.elems $ scScheduledTriggers sc + let scheduledTriggers = scScheduledTriggers sc - -- get scheduled trigger stats from db + -- get deprived scheduled triggers(those which have < 100 scheduled future events) + -- that need hydration runExceptT - (Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadOnly) getScheduledTriggerStats) >>= \case + (Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadOnly) getDeprivedScheduledTriggersStats) >>= \case Left err -> L.unLogger logger $ ScheduledTriggerInternalErr $ err500 Unexpected (T.pack $ show err) - Right scheduledTriggerStats -> do + Right deprivedScheduledTriggerStats -> do - -- join scheduled triggers with stats and produce @[(ScheduledTriggerInfo, ScheduledTriggerStats)]@ - scheduledTriggersWithStats' <- mapM (withStats scheduledTriggerStats) scheduledTriggers - let scheduledTriggersWithStats = catMaybes scheduledTriggersWithStats' - - -- filter out scheduled trigger which have more than 100 upcoming events already - let scheduledTriggersForHydration = - filter (\(_sti, stats) -> stsUpcomingEventsCount stats < 100) scheduledTriggersWithStats + -- join stats with scheduled triggers and produce @[(ScheduledTriggerInfo, ScheduledTriggerStats)]@ + scheduledTriggersForHydrationWithStats' <- mapM (withST scheduledTriggers) deprivedScheduledTriggerStats + let scheduledTriggersForHydrationWithStats = catMaybes scheduledTriggersForHydrationWithStats' -- insert scheduled events for scheduled triggers that need hydration runExceptT (Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) $ - insertScheduledEventsFor scheduledTriggersForHydration) >>= \case + insertScheduledEventsFor scheduledTriggersForHydrationWithStats) >>= \case Right _ -> pure () Left err -> L.unLogger logger $ ScheduledTriggerInternalErr $ err500 Unexpected (T.pack $ show err) threadDelay oneMinute where - getScheduledTriggerStats = liftTx $ do + getDeprivedScheduledTriggersStats = liftTx $ do map uncurryStats <$> Q.listQE defaultTxErrorHandler [Q.sql| SELECT name, upcoming_events_count, max_scheduled_time FROM hdb_catalog.hdb_scheduled_events_stats + WHERE upcoming_events_count < 100 |] () True uncurryStats (n, count, maxTs) = ScheduledTriggerStats n count maxTs - withStats stStats sti = do - let mStats = find (\ScheduledTriggerStats{stsName} -> stsName == stiName sti) stStats - case mStats of + withST stis stat = do + case Map.lookup (stsName stat) stis of Nothing -> do L.unLogger logger $ - ScheduledTriggerInternalErr $ err500 Unexpected "could not find scheduled trigger in stats" + ScheduledTriggerInternalErr $ err500 Unexpected "could not find scheduled trigger in the schema cache" pure Nothing - Just stats -> pure $ Just (sti, stats) + Just sti -> pure $ Just (sti, stat) insertScheduledEventsFor :: [(ScheduledTriggerInfo, ScheduledTriggerStats)] -> Q.TxE QErr () insertScheduledEventsFor scheduledTriggersWithStats = do