@@ -8,9 +8,130 @@ import { emptyOrNullToUndefined } from "../util"
88import { sendEmailTemplateToUser } from "./kafkaConsumer/common/EmailTemplater/sendEmailTemplate"
99
/** Number of pending email deliveries fetched per query by the main send loop. */
const BATCH_SIZE = 100

// Retry policy for errored course stats emails. All durations are in milliseconds.

/** Minimum time between two consecutive retry checks (10 minutes). */
const RETRY_CHECK_INTERVAL_MS = 1000 * 60 * 10
/** Only errored deliveries created within this window are considered for retry (3 days). */
const RETRY_WINDOW_MS = 1000 * 60 * 60 * 24 * 3
/** Smallest backoff applied between retries of a delivery (10 minutes). */
const RETRY_BACKOFF_BASE_MS = 1000 * 60 * 10
/** Upper bound on the backoff between retries of a delivery (6 hours). */
const RETRY_BACKOFF_MAX_MS = 1000 * 60 * 60 * 6
/** Page size used while scanning errored deliveries for retry candidates. */
const RETRY_CANDIDATE_BATCH_SIZE = 500
/** Maximum number of deliveries re-queued by a single retry check. */
const RETRY_REQUEUE_LIMIT = 500
/** How long the cached list of course stats template ids stays valid (1 hour). */
const TEMPLATE_ID_CACHE_TTL_MS = 1000 * 60 * 60
1118
// Module-wide logger, tagged with the "background-emailer" service name.
const logger = sentryLogger({
  service: "background-emailer",
})
1320
/**
 * Computes the backoff, in milliseconds, to apply before retrying a delivery.
 *
 * Attempt counts are not tracked, so the backoff is derived from the
 * delivery's age (time elapsed since `createdAt`): it starts at
 * RETRY_BACKOFF_BASE_MS, doubles as the age grows, and never exceeds
 * RETRY_BACKOFF_MAX_MS.
 *
 * @param createdAt - creation time of the delivery
 * @param now - reference time the age is measured against
 * @returns the backoff in milliseconds
 */
const getRetryBackoffMs = (createdAt: Date, now: Date) => {
  const elapsedMs = now.getTime() - createdAt.getTime()
  // A creation time later than `now` is treated as age zero.
  const ageMs = Math.max(0, elapsedMs)

  // Number of doublings: grows by one each time (age / base + 1) crosses a
  // power of two; the "- 1" keeps the youngest deliveries at the base interval.
  const doublings =
    Math.floor(Math.log2(ageMs / RETRY_BACKOFF_BASE_MS + 1)) - 1
  const exponent = Math.max(0, doublings)

  const uncappedBackoffMs = RETRY_BACKOFF_BASE_MS * Math.pow(2, exponent)
  return Math.min(RETRY_BACKOFF_MAX_MS, uncappedBackoffMs)
}
30+
// In-memory cache of the email template ids used by course stats subscriptions,
// together with the timestamp (ms) passed in when the cache was last filled.
let cachedTemplateIds: string[] = []
let lastTemplateIdFetchAt = 0

/**
 * Returns the distinct email template ids referenced by course stats
 * subscriptions.
 *
 * The result is cached in module state and reused while it is non-empty and
 * younger than TEMPLATE_ID_CACHE_TTL_MS; an empty cache is always refetched.
 *
 * @param now - current time in milliseconds, used to check and stamp the cache
 * @returns the (possibly cached) list of template ids
 */
const getCourseStatsTemplateIds = async (now: number) => {
  const cacheAgeMs = now - lastTemplateIdFetchAt
  const cacheIsFresh =
    cachedTemplateIds.length > 0 && cacheAgeMs < TEMPLATE_ID_CACHE_TTL_MS

  if (!cacheIsFresh) {
    const subscriptions = await prisma.courseStatsSubscription.findMany({
      select: { email_template_id: true },
      distinct: ["email_template_id"],
    })

    // Keep only the truthy ids.
    const templateIds: string[] = []
    for (const { email_template_id } of subscriptions) {
      if (email_template_id) {
        templateIds.push(email_template_id)
      }
    }

    cachedTemplateIds = templateIds
    lastTemplateIdFetchAt = now
  }

  return cachedTemplateIds
}
53+
/**
 * Re-queues errored course stats email deliveries that are due for a retry.
 *
 * Candidates are deliveries with `error: true` and `sent: false`, created
 * within the last RETRY_WINDOW_MS, whose template is one of the course stats
 * templates. They are scanned page by page, oldest `updated_at` first. A
 * candidate is eligible once the time since its `updated_at` is at least the
 * age-based backoff from `getRetryBackoffMs`. Up to RETRY_REQUEUE_LIMIT
 * eligible deliveries have their `error` flag cleared, which makes them match
 * the `sent: false, error: false` query used by the main send loop.
 *
 * NOTE(review): the backoff check assumes `updated_at` is bumped whenever a
 * delivery attempt fails — confirm against the schema and `sendEmail`.
 *
 * @returns nothing; progress is reported through the logger
 */
const retryErroredCourseStatsEmails = async () => {
  const now = new Date()
  const retryWindowStart = new Date(now.getTime() - RETRY_WINDOW_MS)

  logger.info(
    `Retry check: course stats errors created after ${retryWindowStart.toISOString()} with exponential backoff`,
  )

  const templateIds = await getCourseStatsTemplateIds(now.getTime())

  if (templateIds.length === 0) {
    logger.info("Retry check: no course stats email templates found, skipping")
    return
  }

  logger.info(
    `Retry check: found ${templateIds.length} course stats email templates`,
  )

  let cursorId: string | undefined
  let totalCandidates = 0
  const eligibleIds: string[] = []

  while (eligibleIds.length < RETRY_REQUEUE_LIMIT) {
    const batch = await prisma.emailDelivery.findMany({
      where: {
        error: true,
        sent: false,
        created_at: { gte: retryWindowStart },
        email_template_id: { in: templateIds },
      },
      // Only the fields needed for the eligibility check are fetched.
      select: {
        id: true,
        created_at: true,
        updated_at: true,
      },
      // `updated_at` is not unique, so `id` is added as a tiebreaker: cursor
      // pagination needs a total, stable order or rows sharing a timestamp
      // can be skipped or returned twice across pages.
      orderBy: [{ updated_at: "asc" }, { id: "asc" }],
      take: RETRY_CANDIDATE_BATCH_SIZE,
      // After the first page, resume right after the last row already seen.
      ...(cursorId ? { cursor: { id: cursorId }, skip: 1 } : {}),
    })

    if (batch.length === 0) {
      break
    }

    totalCandidates += batch.length
    cursorId = batch[batch.length - 1]?.id

    for (const candidate of batch) {
      const backoffMs = getRetryBackoffMs(candidate.created_at, now)
      const elapsedSinceUpdateMs =
        now.getTime() - candidate.updated_at.getTime()
      if (elapsedSinceUpdateMs >= backoffMs) {
        eligibleIds.push(candidate.id)
        if (eligibleIds.length >= RETRY_REQUEUE_LIMIT) {
          break
        }
      }
    }

    // A short page means the scan reached the end; skip the extra empty query.
    if (batch.length < RETRY_CANDIDATE_BATCH_SIZE) {
      break
    }
  }

  if (eligibleIds.length === 0) {
    logger.info(
      `Retry check: no course stats emails eligible for retry (candidates scanned: ${totalCandidates})`,
    )
    return
  }

  // Clearing the error flag puts the deliveries back into the send queue.
  const { count } = await prisma.emailDelivery.updateMany({
    where: { id: { in: eligibleIds } },
    data: {
      error: { set: false },
    },
  })

  logger.info(
    `Re-queued ${count} errored course stats emails for retry (candidates scanned: ${totalCandidates}, capped at ${RETRY_REQUEUE_LIMIT})`,
  )
}
134+
14135const sendEmail = async ( emailDelivery : EmailDelivery ) => {
15136 const { user, email_template, organization } =
16137 ( await prisma . emailDelivery . findUnique ( {
@@ -83,7 +204,21 @@ const sendEmail = async (emailDelivery: EmailDelivery) => {
83204}
84205
85206const main = async ( ) => {
207+ let lastRetryCheckAt = 0
86208 while ( true ) {
209+ const now = Date . now ( )
210+ if ( now - lastRetryCheckAt >= RETRY_CHECK_INTERVAL_MS ) {
211+ try {
212+ await retryErroredCourseStatsEmails ( )
213+ } catch ( error : any ) {
214+ logger . error (
215+ new EmailTemplaterError ( "Retry check failed" , undefined , error ) ,
216+ )
217+ } finally {
218+ lastRetryCheckAt = Date . now ( )
219+ }
220+ }
221+
87222 const emailsToDeliver = await prisma . emailDelivery . findMany ( {
88223 where : { sent : false , error : false } ,
89224 take : BATCH_SIZE ,
0 commit comments