@@ -11,6 +11,7 @@ import { Settings } from "./types.js";
1111import { groupmqLifecycleExceptionWrapper } from "./utils.js" ;
1212import { syncSearchContexts } from "./ee/syncSearchContexts.js" ;
1313import { captureEvent } from "./posthog.js" ;
14+ import { PromClient } from "./promClient.js" ;
1415
1516const LOG_TAG = 'connection-manager' ;
1617const logger = createLogger ( LOG_TAG ) ;
@@ -38,6 +39,7 @@ export class ConnectionManager {
3839 private db : PrismaClient ,
3940 private settings : Settings ,
4041 redis : Redis ,
42+ private promClient : PromClient ,
4143 ) {
4244 this . queue = new Queue < JobPayload > ( {
4345 redis,
@@ -137,6 +139,8 @@ export class ConnectionManager {
137139 } ,
138140 jobId : job . id ,
139141 } ) ;
142+
143+ this . promClient . pendingConnectionSyncJobs . inc ( { connection : job . connection . name } ) ;
140144 }
141145
142146 return jobs . map ( job => job . id ) ;
@@ -147,6 +151,9 @@ export class ConnectionManager {
147151 const logger = createJobLogger ( jobId ) ;
148152 logger . info ( `Running connection sync job ${ jobId } for connection ${ connectionName } (id: ${ job . data . connectionId } ) (attempt ${ job . attempts + 1 } / ${ job . maxAttempts } )` ) ;
149153
154+ this . promClient . pendingConnectionSyncJobs . dec ( { connection : connectionName } ) ;
155+ this . promClient . activeConnectionSyncJobs . inc ( { connection : connectionName } ) ;
156+
150157 // @note : We aren't actually doing anything with this atm.
151158 const abortController = new AbortController ( ) ;
152159
@@ -265,7 +272,7 @@ export class ConnectionManager {
265272 private onJobCompleted = async ( job : Job < JobPayload > ) =>
266273 groupmqLifecycleExceptionWrapper ( 'onJobCompleted' , logger , async ( ) => {
267274 const logger = createJobLogger ( job . id ) ;
268- const { connectionId, orgId } = job . data ;
275+ const { connectionId, connectionName , orgId } = job . data ;
269276
270277 await this . db . connectionSyncJob . update ( {
271278 where : {
@@ -301,6 +308,9 @@ export class ConnectionManager {
301308
302309 logger . info ( `Connection sync job ${ job . id } for connection ${ job . data . connectionName } (id: ${ job . data . connectionId } ) completed` ) ;
303310
311+ this . promClient . activeConnectionSyncJobs . dec ( { connection : connectionName } ) ;
312+ this . promClient . connectionSyncJobSuccessTotal . inc ( { connection : connectionName } ) ;
313+
304314 const result = job . returnvalue as JobResult ;
305315 captureEvent ( 'backend_connection_sync_job_completed' , {
306316 connectionId : connectionId ,
@@ -328,12 +338,17 @@ export class ConnectionManager {
328338 }
329339 } ) ;
330340
341+ this . promClient . activeConnectionSyncJobs . dec ( { connection : connection . name } ) ;
342+ this . promClient . connectionSyncJobFailTotal . inc ( { connection : connection . name } ) ;
343+
331344 logger . error ( `Failed job ${ job . id } for connection ${ connection . name } (id: ${ connection . id } ). Attempt ${ attempt } / ${ job . opts . attempts } . Failing job.` ) ;
332345 } else {
333346 const connection = await this . db . connection . findUniqueOrThrow ( {
334347 where : { id : job . data . connectionId } ,
335348 } ) ;
336349
350+ this . promClient . connectionSyncJobReattemptsTotal . inc ( { connection : connection . name } ) ;
351+
337352 logger . warn ( `Failed job ${ job . id } for connection ${ connection . name } (id: ${ connection . id } ). Attempt ${ attempt } / ${ job . opts . attempts } . Retrying.` ) ;
338353 }
339354
@@ -358,6 +373,9 @@ export class ConnectionManager {
358373 }
359374 } ) ;
360375
376+ this . promClient . activeConnectionSyncJobs . dec ( { connection : connection . name } ) ;
377+ this . promClient . connectionSyncJobFailTotal . inc ( { connection : connection . name } ) ;
378+
361379 logger . error ( `Job ${ jobId } stalled for connection ${ connection . name } (id: ${ connection . id } )` ) ;
362380
363381 captureEvent ( 'backend_connection_sync_job_failed' , {
0 commit comments