diff --git a/services/calendar/pod-calendar/src/calendarController.ts b/services/calendar/pod-calendar/src/calendarController.ts index bdd5445349e..639b7a96341 100644 --- a/services/calendar/pod-calendar/src/calendarController.ts +++ b/services/calendar/pod-calendar/src/calendarController.ts @@ -27,6 +27,11 @@ import { getIntegrations } from './integrations' import { WorkspaceClient } from './workspaceClient' import { cleanUserByEmail } from './kvsUtils' +interface WorkspaceStateInfo { + shouldStart: boolean + needRecheck: boolean +} + export class CalendarController { protected static _instance: CalendarController @@ -72,10 +77,13 @@ export class CalendarController { if (ids.length === 0) return const limiter = new RateLimiter(config.InitLimit) const infos = await this.accountClient.getWorkspacesInfo(ids) + const outdatedWorkspaces = new Set() for (let index = 0; index < infos.length; index++) { const info = infos[index] const integrations = groups.get(info.uuid) ?? [] - if (await this.checkWorkspace(info, integrations)) { + const { shouldStart, needRecheck } = await this.checkWorkspace(info, integrations) + + if (shouldStart) { await limiter.add(async () => { try { this.ctx.info('start workspace', { workspace: info.uuid }) @@ -85,27 +93,105 @@ export class CalendarController { } }) } + + if (needRecheck) { + outdatedWorkspaces.add(info.uuid) + } + if (index % 10 === 0) { this.ctx.info('starting progress', { value: index + 1, total: infos.length }) } } await limiter.waitProcessing() this.ctx.info('Started all workspaces', { count: infos.length }) + + if (outdatedWorkspaces.size > 0) { + this.ctx.info('Found outdated workspaces for future recheck', { count: outdatedWorkspaces.size }) + // Schedule recheck for outdated workspaces + const outdatedGroups = new Map() + for (const workspaceId of outdatedWorkspaces) { + const integrations = groups.get(workspaceId) + if (integrations !== undefined) { + outdatedGroups.set(workspaceId, integrations) + } + } + void this.recheckOutdatedWorkspaces(outdatedGroups) + } } - private async checkWorkspace (info: WorkspaceInfoWithStatus, integrations: Integration[]): Promise { + private async checkWorkspace ( + info: WorkspaceInfoWithStatus, + integrations: Integration[] + ): Promise { if (isDeletingMode(info.mode)) { if (integrations !== undefined) { for (const int of integrations) { await this.accountClient.deleteIntegration(int) } } - return false + return { shouldStart: false, needRecheck: false } } if (!isActiveMode(info.mode)) { this.ctx.info('workspace is not active', { workspaceUuid: info.uuid }) - return false + return { shouldStart: false, needRecheck: false } + } + const lastVisit = (Date.now() - (info.lastVisit ?? 0)) / (3600 * 24 * 1000) // In days + + if (lastVisit > config.WorkspaceInactivityInterval) { + this.ctx.info('workspace is outdated, needs recheck', { + workspaceUuid: info.uuid, + lastVisitDays: lastVisit.toFixed(1) + }) + return { shouldStart: false, needRecheck: true } + } + return { shouldStart: true, needRecheck: false } + } + + // TODO: Subscribe to workspace queue istead of using setTimeout + async recheckOutdatedWorkspaces (outdatedGroups: Map): Promise { + try { + await new Promise((resolve) => { + setTimeout( + () => { + resolve() + }, + 10 * 60 * 1000 + ) // Wait 10 minutes + }) + + const ids = [...outdatedGroups.keys()] + const limiter = new RateLimiter(config.InitLimit) + const infos = await this.accountClient.getWorkspacesInfo(ids) + const stillOutdatedGroups = new Map() + + for (let index = 0; index < infos.length; index++) { + const info = infos[index] + const integrations = outdatedGroups.get(info.uuid) ?? [] + const { shouldStart, needRecheck } = await this.checkWorkspace(info, integrations) + + if (shouldStart) { + await limiter.add(async () => { + try { + this.ctx.info('restarting previously outdated workspace', { workspace: info.uuid }) + await WorkspaceClient.run(this.ctx, this.accountClient, info.uuid) + } catch (err) { + this.ctx.error('Failed to restart workspace', { workspace: info.uuid, error: err }) + } + }) + } else if (needRecheck) { + // Keep this workspace for future recheck + stillOutdatedGroups.set(info.uuid, integrations) + } + } + + await limiter.waitProcessing() + + if (stillOutdatedGroups.size > 0) { + this.ctx.info('Still outdated workspaces, scheduling next recheck', { count: stillOutdatedGroups.size }) + void this.recheckOutdatedWorkspaces(stillOutdatedGroups) + } + } catch (err: any) { + this.ctx.error('Failed to recheck outdated workspaces', { error: err }) } - return true } } diff --git a/services/calendar/pod-calendar/src/config.ts b/services/calendar/pod-calendar/src/config.ts index 8b0134c10e8..2f46eb281ca 100644 --- a/services/calendar/pod-calendar/src/config.ts +++ b/services/calendar/pod-calendar/src/config.ts @@ -23,6 +23,7 @@ interface Config { Credentials: string WATCH_URL: string InitLimit: number + WorkspaceInactivityInterval: number // Interval in days to stop workspace synchronization if not visited } const envMap: { [key in keyof Config]: string } = { @@ -34,7 +35,8 @@ const envMap: { [key in keyof Config]: string } = { Credentials: 'Credentials', WATCH_URL: 'WATCH_URL', InitLimit: 'INIT_LIMIT', - KvsUrl: 'KVS_URL' + KvsUrl: 'KVS_URL', + WorkspaceInactivityInterval: 'WORKSPACE_INACTIVITY_INTERVAL' } const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined) @@ -48,7 +50,8 @@ const config: Config = (() => { Credentials: process.env[envMap.Credentials], InitLimit: parseNumber(process.env[envMap.InitLimit]) ?? 50, WATCH_URL: process.env[envMap.WATCH_URL], - KvsUrl: process.env[envMap.KvsUrl] + KvsUrl: process.env[envMap.KvsUrl], + WorkspaceInactivityInterval: parseNumber(process.env[envMap.WorkspaceInactivityInterval] ?? '3') // In days } const missingEnv = (Object.keys(params) as Array)