Skip to content

Commit 4940e14

Browse files
authored
Retry event buffer submissions (#31)
* Retry event buffer submissions * Retry event buffer submissions * test
1 parent 24c0ff3 commit 4940e14

File tree

2 files changed

+116
-7
lines changed

2 files changed

+116
-7
lines changed

src/events.test.ts

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,32 @@ import { Events } from "./api/resources/events/client/Client";
55
import { CreateEventRequestBody } from "./api";
66
import { Logger } from "./logger";
77

8+
process.env.NODE_ENV = "test";
9+
810
jest.useFakeTimers();
911

1012
describe("EventBuffer", () => {
1113
let mockEventsApi: jest.Mocked<Events>;
1214
let mockLogger: jest.Mocked<Logger>;
1315

1416
beforeEach(() => {
17+
const mockResponse = {
18+
data: {
19+
events: [],
20+
},
21+
params: {},
22+
};
23+
1524
mockEventsApi = {
16-
createEventBatch: jest.fn().mockResolvedValue(undefined),
25+
createEventBatch: jest.fn().mockResolvedValue(mockResponse),
1726
} as any;
1827

1928
mockLogger = {
2029
error: jest.fn(),
2130
log: jest.fn(),
31+
warn: jest.fn(),
32+
info: jest.fn(),
33+
debug: jest.fn(),
2234
} as any;
2335
});
2436

@@ -70,9 +82,12 @@ describe("EventBuffer", () => {
7082
// The rest of the tests remain unchanged as they don't directly test the maxSize behavior
7183
it("should log error if flushing fails", async () => {
7284
mockEventsApi.createEventBatch.mockRejectedValue(new Error("Flush error"));
85+
7386
const buffer = new EventBuffer(mockEventsApi, {
7487
logger: mockLogger,
7588
interval: 1000,
89+
maxRetries: 1,
90+
initialRetryDelay: 1,
7691
});
7792

7893
const event: CreateEventRequestBody = {
@@ -87,10 +102,14 @@ describe("EventBuffer", () => {
87102
await buffer.push(event);
88103
await buffer.push(event);
89104

90-
// Manually trigger flush
105+
// Since we're skipping delays in test environment,
106+
// we can just call flush directly
91107
await buffer.flush();
92108

93-
expect(mockLogger.error).toHaveBeenCalledWith("Failed to flush events", expect.any(Error));
109+
expect(mockLogger.error).toHaveBeenCalledWith(
110+
"Event batch submission failed after 1 retries:",
111+
expect.any(Error)
112+
);
94113
});
95114

96115
it("should stop accepting events after stop is called", async () => {
@@ -166,4 +185,45 @@ describe("EventBuffer", () => {
166185

167186
expect(mockEventsApi.createEventBatch).not.toHaveBeenCalled();
168187
});
188+
189+
it("should retry and succeed after a failure", async () => {
190+
const mockResponse = {
191+
data: {
192+
events: [],
193+
},
194+
params: {},
195+
};
196+
197+
// First call fails, second succeeds
198+
mockEventsApi.createEventBatch
199+
.mockRejectedValueOnce(new Error("Temporary failure"))
200+
.mockResolvedValueOnce(mockResponse);
201+
202+
const buffer = new EventBuffer(mockEventsApi, {
203+
logger: mockLogger,
204+
interval: 1000,
205+
maxRetries: 3,
206+
initialRetryDelay: 1,
207+
});
208+
209+
const event: CreateEventRequestBody = {
210+
body: {
211+
company: { id: "test-company" },
212+
event: "test-event",
213+
user: { id: "test-user" },
214+
},
215+
eventType: "track",
216+
sentAt: new Date(),
217+
};
218+
await buffer.push(event);
219+
220+
// Since we're skipping delays in test environment,
221+
// we can just call flush directly
222+
await buffer.flush();
223+
224+
// Verify that the createEventBatch was called twice (once failed, once succeeded)
225+
expect(mockEventsApi.createEventBatch).toHaveBeenCalledTimes(2);
226+
227+
expect(mockLogger.info).toHaveBeenCalledWith("Event batch submission succeeded after 1 retries");
228+
});
169229
});

src/events.ts

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@ import { ConsoleLogger, Logger } from "./logger";
44

55
const DEFAULT_FLUSH_INTERVAL = 1000; // 1 second
66
const DEFAULT_MAX_SIZE = 1000; // 1000 items
7+
const DEFAULT_MAX_RETRIES = 3;
8+
const DEFAULT_INITIAL_RETRY_DELAY = 1000; // 1 second in milliseconds
79

810
interface EventBufferOptions {
911
interval?: number;
1012
logger?: Logger;
1113
maxSize?: number;
1214
offline?: boolean;
15+
maxRetries?: number;
16+
initialRetryDelay?: number;
1317
}
1418

1519
class EventBuffer {
@@ -20,6 +24,8 @@ class EventBuffer {
2024
private logger: Logger;
2125
private maxSize: number;
2226
private offline: boolean;
27+
private maxRetries: number;
28+
private initialRetryDelay: number;
2329
private shutdown: boolean = false;
2430
private stopped: boolean = false;
2531

@@ -29,12 +35,16 @@ class EventBuffer {
2935
maxSize = DEFAULT_MAX_SIZE,
3036
interval = DEFAULT_FLUSH_INTERVAL,
3137
offline = false,
38+
maxRetries = DEFAULT_MAX_RETRIES,
39+
initialRetryDelay = DEFAULT_INITIAL_RETRY_DELAY,
3240
} = opts || {};
3341
this.eventsApi = eventsApi;
3442
this.interval = interval;
3543
this.logger = logger;
3644
this.maxSize = maxSize;
3745
this.offline = offline;
46+
this.maxRetries = maxRetries;
47+
this.initialRetryDelay = initialRetryDelay;
3848

3949
this.startPeriodicFlush();
4050
}
@@ -47,10 +57,49 @@ class EventBuffer {
4757
const events = [...this.events];
4858
this.events = [];
4959

50-
try {
51-
await this.eventsApi.createEventBatch({ events });
52-
} catch (err) {
53-
this.logger.error("Failed to flush events", err);
60+
// Initialize retry counter and success flag
61+
let retryCount = 0;
62+
let success = false;
63+
let lastError: any = null;
64+
65+
// Try with retries and exponential backoff
66+
while (retryCount <= this.maxRetries && !success) {
67+
try {
68+
if (retryCount > 0) {
69+
// Log retry attempt
70+
this.logger.info(`Retrying event batch submission (attempt ${retryCount} of ${this.maxRetries})`);
71+
}
72+
73+
// Attempt to send events
74+
await this.eventsApi.createEventBatch({ events });
75+
success = true;
76+
} catch (err) {
77+
lastError = err;
78+
retryCount++;
79+
80+
if (retryCount <= this.maxRetries) {
81+
// Calculate backoff with jitter
82+
const delay = this.initialRetryDelay * Math.pow(2, retryCount - 1);
83+
const jitter = Math.random() * 0.1 * delay; // 10% jitter
84+
const waitTime = delay + jitter;
85+
86+
this.logger.warn(
87+
`Event batch submission failed: ${err}. Retrying in ${(waitTime / 1000).toFixed(2)} seconds...`
88+
);
89+
90+
// Wait before retry
91+
if (process.env.NODE_ENV !== "test") {
92+
await new Promise((resolve) => setTimeout(resolve, waitTime));
93+
}
94+
}
95+
}
96+
}
97+
98+
// After all retries, if still not successful, log the error
99+
if (!success) {
100+
this.logger.error(`Event batch submission failed after ${this.maxRetries} retries:`, lastError);
101+
} else if (retryCount > 0) {
102+
this.logger.info(`Event batch submission succeeded after ${retryCount} retries`);
54103
}
55104
}
56105

0 commit comments

Comments
 (0)