diff --git a/apps/api/src/controllers/live.controller.ts b/apps/api/src/controllers/live.controller.ts index 29931aa8..cd7afe91 100644 --- a/apps/api/src/controllers/live.controller.ts +++ b/apps/api/src/controllers/live.controller.ts @@ -8,17 +8,10 @@ import { transformMinimalEvent, } from '@openpanel/db'; import { setSuperJson } from '@openpanel/json'; -import { - psubscribeToPublishedEvent, - subscribeToPublishedEvent, -} from '@openpanel/redis'; +import { subscribeToPublishedEvent } from '@openpanel/redis'; import { getProjectAccess } from '@openpanel/trpc'; import { getOrganizationAccess } from '@openpanel/trpc/src/access'; -export function getLiveEventInfo(key: string) { - return key.split(':').slice(2) as [string, string]; -} - export function wsVisitors( socket: WebSocket, req: FastifyRequest<{ @@ -36,21 +29,8 @@ export function wsVisitors( } }); - const punsubscribe = psubscribeToPublishedEvent( - '__keyevent@0__:expired', - (key) => { - const [projectId] = getLiveEventInfo(key); - if (projectId && projectId === params.projectId) { - eventBuffer.getActiveVisitorCount(params.projectId).then((count) => { - socket.send(String(count)); - }); - } - }, - ); - socket.on('close', () => { unsubscribe(); - punsubscribe(); }); } diff --git a/packages/db/src/buffers/event-buffer.test.ts b/packages/db/src/buffers/event-buffer.test.ts index 95852bd2..50600c70 100644 --- a/packages/db/src/buffers/event-buffer.test.ts +++ b/packages/db/src/buffers/event-buffer.test.ts @@ -71,8 +71,9 @@ describe('EventBuffer', () => { // Get initial count const initialCount = await eventBuffer.getBufferSize(); - // Add event - await eventBuffer.add(event); + // Add event and flush (events are micro-batched) + eventBuffer.add(event); + await eventBuffer.flush(); // Buffer counter should increase by 1 const newCount = await eventBuffer.getBufferSize(); @@ -109,7 +110,8 @@ describe('EventBuffer', () => { // Add first screen_view const count1 = await eventBuffer.getBufferSize(); - await 
eventBuffer.add(view1); + eventBuffer.add(view1); + await eventBuffer.flush(); // Should be stored as "last" but NOT in queue yet const count2 = await eventBuffer.getBufferSize(); @@ -124,7 +126,8 @@ describe('EventBuffer', () => { expect(last1!.createdAt.toISOString()).toBe(view1.created_at); // Add second screen_view - await eventBuffer.add(view2); + eventBuffer.add(view2); + await eventBuffer.flush(); // Now view1 should be in buffer const count3 = await eventBuffer.getBufferSize(); @@ -138,7 +141,8 @@ describe('EventBuffer', () => { expect(last2!.createdAt.toISOString()).toBe(view2.created_at); // Add third screen_view - await eventBuffer.add(view3); + eventBuffer.add(view3); + await eventBuffer.flush(); // Now view2 should also be in buffer const count4 = await eventBuffer.getBufferSize(); @@ -174,14 +178,16 @@ describe('EventBuffer', () => { // Add screen_view const count1 = await eventBuffer.getBufferSize(); - await eventBuffer.add(view); + eventBuffer.add(view); + await eventBuffer.flush(); // Should be stored as "last", not in buffer yet const count2 = await eventBuffer.getBufferSize(); expect(count2).toBe(count1); // Add session_end - await eventBuffer.add(sessionEnd); + eventBuffer.add(sessionEnd); + await eventBuffer.flush(); // Both should now be in buffer (+2) const count3 = await eventBuffer.getBufferSize(); @@ -207,7 +213,8 @@ describe('EventBuffer', () => { } as any; const count1 = await eventBuffer.getBufferSize(); - await eventBuffer.add(sessionEnd); + eventBuffer.add(sessionEnd); + await eventBuffer.flush(); // Only session_end should be in buffer (+1) const count2 = await eventBuffer.getBufferSize(); @@ -224,7 +231,8 @@ describe('EventBuffer', () => { created_at: new Date().toISOString(), } as any; - await eventBuffer.add(view); + eventBuffer.add(view); + await eventBuffer.flush(); // Query by profileId const result = await eventBuffer.getLastScreenView({ @@ -248,7 +256,8 @@ describe('EventBuffer', () => { created_at: new Date().toISOString(), 
} as any; - await eventBuffer.add(view); + eventBuffer.add(view); + await eventBuffer.flush(); // Query by sessionId const result = await eventBuffer.getLastScreenView({ @@ -275,43 +284,47 @@ describe('EventBuffer', () => { expect(await eventBuffer.getBufferSize()).toBe(0); // Add regular event - await eventBuffer.add({ + eventBuffer.add({ project_id: 'p6', name: 'event1', created_at: new Date().toISOString(), } as any); + await eventBuffer.flush(); expect(await eventBuffer.getBufferSize()).toBe(1); // Add another regular event - await eventBuffer.add({ + eventBuffer.add({ project_id: 'p6', name: 'event2', created_at: new Date().toISOString(), } as any); + await eventBuffer.flush(); expect(await eventBuffer.getBufferSize()).toBe(2); // Add screen_view (not counted until flushed) - await eventBuffer.add({ + eventBuffer.add({ project_id: 'p6', profile_id: 'u6', session_id: 'session_6', name: 'screen_view', created_at: new Date().toISOString(), } as any); + await eventBuffer.flush(); // Still 2 (screen_view is pending) expect(await eventBuffer.getBufferSize()).toBe(2); // Add another screen_view (first one gets flushed) - await eventBuffer.add({ + eventBuffer.add({ project_id: 'p6', profile_id: 'u6', session_id: 'session_6', name: 'screen_view', created_at: new Date(Date.now() + 1000).toISOString(), } as any); + await eventBuffer.flush(); // Now 3 (2 regular + 1 flushed screen_view) expect(await eventBuffer.getBufferSize()).toBe(3); @@ -330,8 +343,9 @@ describe('EventBuffer', () => { created_at: new Date(Date.now() + 1000).toISOString(), } as any; - await eventBuffer.add(event1); - await eventBuffer.add(event2); + eventBuffer.add(event1); + eventBuffer.add(event2); + await eventBuffer.flush(); expect(await eventBuffer.getBufferSize()).toBe(2); @@ -361,12 +375,13 @@ describe('EventBuffer', () => { // Add 4 events for (let i = 0; i < 4; i++) { - await eb.add({ + eb.add({ project_id: 'p8', name: `event${i}`, created_at: new Date(Date.now() + i).toISOString(), } as any); 
} + await eb.flush(); const insertSpy = vi .spyOn(ch, 'insert') @@ -396,7 +411,8 @@ describe('EventBuffer', () => { created_at: new Date().toISOString(), } as any; - await eventBuffer.add(event); + eventBuffer.add(event); + await eventBuffer.flush(); const count = await eventBuffer.getActiveVisitorCount('p9'); expect(count).toBeGreaterThanOrEqual(1); @@ -439,10 +455,11 @@ describe('EventBuffer', () => { created_at: new Date(t0 + 2000).toISOString(), } as any; - await eventBuffer.add(view1a); - await eventBuffer.add(view2a); - await eventBuffer.add(view1b); // Flushes view1a - await eventBuffer.add(view2b); // Flushes view2a + eventBuffer.add(view1a); + eventBuffer.add(view2a); + eventBuffer.add(view1b); // Flushes view1a + eventBuffer.add(view2b); // Flushes view2a + await eventBuffer.flush(); // Should have 2 events in buffer (one from each session) expect(await eventBuffer.getBufferSize()).toBe(2); @@ -470,7 +487,8 @@ describe('EventBuffer', () => { } as any; const count1 = await eventBuffer.getBufferSize(); - await eventBuffer.add(view); + eventBuffer.add(view); + await eventBuffer.flush(); // Should go directly to buffer (no session_id) const count2 = await eventBuffer.getBufferSize(); @@ -498,8 +516,9 @@ describe('EventBuffer', () => { created_at: new Date(t0 + 1000).toISOString(), } as any; - await eventBuffer.add(view1); - await eventBuffer.add(view2); + eventBuffer.add(view1); + eventBuffer.add(view2); + await eventBuffer.flush(); // Both sessions should have their own "last" const lastSession1 = await eventBuffer.getLastScreenView({ diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index d305372a..15d29b5b 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -25,8 +25,21 @@ import { BaseBuffer } from './base-buffer'; * - Retrieve the last screen_view (don't modify it) * - Push both screen_view and session_end to buffer * 4. 
Flush: Simply process all events from the list buffer + * + * Optimizations: + * - Micro-batching: Events are buffered locally and flushed every 10ms by default (or as soon as 100 events are pending) to reduce Redis round-trips + * - Throttled publishes: PUBLISH is sent outside the multi pipeline, at most once per second by default, carrying only the latest flushed event + * - Simplified active visitor tracking: Only uses ZADD (removed redundant heartbeat SET) */ +// Pending event for local buffer +interface PendingEvent { + event: IClickhouseEvent; + eventJson: string; + eventWithTimestamp?: string; + type: 'regular' | 'screen_view' | 'session_end'; +} + export class EventBuffer extends BaseBuffer { // Configurable limits private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE @@ -36,6 +49,27 @@ export class EventBuffer extends BaseBuffer { ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) : 1000; + // Micro-batching configuration + private microBatchIntervalMs = process.env.EVENT_BUFFER_MICRO_BATCH_MS + ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_MS, 10) + : 10; // Flush every 10ms by default + private microBatchMaxSize = process.env.EVENT_BUFFER_MICRO_BATCH_SIZE + ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_SIZE, 10) + : 100; // Or when we hit 100 events + + // Local event buffer for micro-batching + private pendingEvents: PendingEvent[] = []; + private flushTimer: ReturnType | null = null; + private isFlushing = false; + + // Throttled publish configuration + private publishThrottleMs = process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS + ?
Number.parseInt(process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS, 10) + : 1000; // Publish at most once per second + private lastPublishTime = 0; + private pendingPublishEvent: IClickhouseEvent | null = null; + private publishTimer: ReturnType | null = null; + private activeVisitorsExpiration = 60 * 5; // 5 minutes // LIST - Stores all events ready to be flushed @@ -190,98 +224,228 @@ return added } bulkAdd(events: IClickhouseEvent[]) { - const redis = getRedisCache(); - const multi = redis.multi(); + // Add all events to local buffer - they will be flushed together for (const event of events) { - this.add(event, multi); + this.add(event); } - return multi.exec(); } /** - * Add an event into Redis buffer. + * Add an event into the local buffer for micro-batching. + * + * Events are buffered locally and flushed to Redis every microBatchIntervalMs + * or when microBatchMaxSize is reached. This dramatically reduces Redis round-trips. * * Logic: * - screen_view: Store as "last" for session, flush previous if exists * - session_end: Flush last screen_view + session_end * - Other events: Add directly to queue */ - async add(event: IClickhouseEvent, _multi?: ReturnType) { + add(event: IClickhouseEvent, _multi?: ReturnType) { + const eventJson = JSON.stringify(event); + + // Determine event type and prepare data + let type: PendingEvent['type'] = 'regular'; + let eventWithTimestamp: string | undefined; + + if (event.session_id && event.name === 'screen_view') { + type = 'screen_view'; + const timestamp = new Date(event.created_at || Date.now()).getTime(); + eventWithTimestamp = JSON.stringify({ + event: event, + ts: timestamp, + }); + } else if (event.session_id && event.name === 'session_end') { + type = 'session_end'; + } + + const pendingEvent: PendingEvent = { + event, + eventJson, + eventWithTimestamp, + type, + }; + + // If a multi was provided (legacy bulkAdd pattern), add directly without batching + if (_multi) { + this.addToMulti(_multi, pendingEvent); + return; + } + 
+ // Add to local buffer for micro-batching + this.pendingEvents.push(pendingEvent); + + // Check if we should flush immediately due to size + if (this.pendingEvents.length >= this.microBatchMaxSize) { + this.flushLocalBuffer(); + return; + } + + // Schedule flush if not already scheduled + if (!this.flushTimer) { + this.flushTimer = setTimeout(() => { + this.flushTimer = null; + this.flushLocalBuffer(); + }, this.microBatchIntervalMs); + } + } + + /** + * Add a single pending event to a multi pipeline. + * Used both for legacy _multi pattern and during batch flush. + */ + private addToMulti(multi: ReturnType, pending: PendingEvent) { + const { event, eventJson, eventWithTimestamp, type } = pending; + + if (type === 'screen_view' && event.session_id) { + const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); + const profileKey = event.profile_id + ? this.getLastScreenViewKeyByProfile(event.project_id, event.profile_id) + : ''; + + this.evalScript( + multi, + 'addScreenView', + this.addScreenViewScript, + 4, + sessionKey, + profileKey, + this.queueKey, + this.bufferCounterKey, + eventWithTimestamp!, + '3600', + ); + } else if (type === 'session_end' && event.session_id) { + const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); + const profileKey = event.profile_id + ? this.getLastScreenViewKeyByProfile(event.project_id, event.profile_id) + : ''; + + this.evalScript( + multi, + 'addSessionEnd', + this.addSessionEndScript, + 4, + sessionKey, + profileKey, + this.queueKey, + this.bufferCounterKey, + eventJson, + ); + } else { + // Regular events go directly to queue + multi.rpush(this.queueKey, eventJson).incr(this.bufferCounterKey); + } + + // Active visitor tracking (simplified - only ZADD, no redundant SET) + if (event.profile_id) { + this.incrementActiveVisitorCount( + multi, + event.project_id, + event.profile_id, + ); + } + } + + /** + * Force flush all pending events from local buffer to Redis immediately. 
+ * Useful for testing. Best-effort: it is a no-op while another flush is already in flight, and Redis errors are logged rather than thrown. + */ + public async flush() { + // Clear any pending timer + if (this.flushTimer) { + clearTimeout(this.flushTimer); + this.flushTimer = null; + } + await this.flushLocalBuffer(); + } + + /** + * Flush all pending events from local buffer to Redis in a single pipeline. + * This is the core optimization - batching many events into one round-trip. + */ + private async flushLocalBuffer() { + if (this.isFlushing || this.pendingEvents.length === 0) { + return; + } + + this.isFlushing = true; + + // Grab current pending events and clear buffer + const eventsToFlush = this.pendingEvents; + this.pendingEvents = []; + try { const redis = getRedisCache(); - const eventJson = JSON.stringify(event); - const multi = _multi || redis.multi(); - - if (event.session_id && event.name === 'screen_view') { - // Handle screen_view - const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); - const profileKey = event.profile_id - ? this.getLastScreenViewKeyByProfile( - event.project_id, - event.profile_id, - ) - : ''; - const timestamp = new Date(event.created_at || Date.now()).getTime(); - - // Combine event and timestamp into single JSON for atomic operations - const eventWithTimestamp = JSON.stringify({ - event: event, - ts: timestamp, - }); + const multi = redis.multi(); - this.evalScript( - multi, - 'addScreenView', - this.addScreenViewScript, - 4, - sessionKey, - profileKey, - this.queueKey, - this.bufferCounterKey, - eventWithTimestamp, - '3600', // 1 hour TTL - ); - } else if (event.session_id && event.name === 'session_end') { - // Handle session_end - const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); - const profileKey = event.profile_id - ?
this.getLastScreenViewKeyByProfile( - event.project_id, - event.profile_id, - ) - : ''; - - this.evalScript( - multi, - 'addSessionEnd', - this.addSessionEndScript, - 4, - sessionKey, - profileKey, - this.queueKey, - this.bufferCounterKey, - eventJson, - ); - } else { - // All other events go directly to queue - multi.rpush(this.queueKey, eventJson).incr(this.bufferCounterKey); + // Add all events to the pipeline + for (const pending of eventsToFlush) { + this.addToMulti(multi, pending); } - if (event.profile_id) { - this.incrementActiveVisitorCount( - multi, - event.project_id, - event.profile_id, - ); - } + await multi.exec(); - if (!_multi) { - await multi.exec(); + // Throttled publish - just signal that events were received + // Store the last event for publishing (we only need one to signal activity) + const lastEvent = eventsToFlush[eventsToFlush.length - 1]; + if (lastEvent) { + this.scheduleThrottledPublish(lastEvent.event); } - - await publishEvent('events', 'received', transformEvent(event)); } catch (error) { - this.logger.error('Failed to add event to Redis buffer', { error }); + this.logger.error('Failed to flush local buffer to Redis', { + error, + eventCount: eventsToFlush.length, + }); + } finally { + this.isFlushing = false; + } + } + + /** + * Throttled publish - publishes at most once per publishThrottleMs. + * Instead of publishing every event, we just signal that events were received. + * This reduces pub/sub load from 3000/s to 1/s. 
+ */ + private scheduleThrottledPublish(event: IClickhouseEvent) { + // Always keep the latest event + this.pendingPublishEvent = event; + + const now = Date.now(); + const timeSinceLastPublish = now - this.lastPublishTime; + + // If enough time has passed, publish immediately + if (timeSinceLastPublish >= this.publishThrottleMs) { + this.executeThrottledPublish(); + return; + } + + // Otherwise, schedule a publish if not already scheduled + if (!this.publishTimer) { + const delay = this.publishThrottleMs - timeSinceLastPublish; + this.publishTimer = setTimeout(() => { + this.publishTimer = null; + this.executeThrottledPublish(); + }, delay); + } + } + + /** + * Execute the throttled publish with the latest pending event. + */ + private executeThrottledPublish() { + if (!this.pendingPublishEvent) { + return; + } + + const event = this.pendingPublishEvent; + this.pendingPublishEvent = null; + this.lastPublishTime = Date.now(); + + // Fire-and-forget publish (no multi = returns Promise) + const result = publishEvent('events', 'received', transformEvent(event)); + if (result instanceof Promise) { + result.catch(() => {}); } } @@ -440,18 +604,22 @@ return added }); } - private async incrementActiveVisitorCount( + /** + * Track active visitors using ZADD only. + * + * Optimization: Removed redundant heartbeat SET key. + * The ZADD score (timestamp) already tracks when a visitor was last seen. + * We use ZRANGEBYSCORE in getActiveVisitorCount to filter active visitors. 
+ */ + private incrementActiveVisitorCount( multi: ReturnType, projectId: string, profileId: string, ) { - // Track active visitors and emit expiry events when inactive for TTL const now = Date.now(); const zsetKey = `live:visitors:${projectId}`; - const heartbeatKey = `live:visitor:${projectId}:${profileId}`; - return multi - .zadd(zsetKey, now, profileId) - .set(heartbeatKey, '1', 'EX', this.activeVisitorsExpiration); + // Only ZADD - the score is the timestamp, no need for separate heartbeat key + return multi.zadd(zsetKey, now, profileId); } public async getActiveVisitorCount(projectId: string): Promise {