Skip to content

Commit 3da60d8

Browse files
committed
refactor to address comments
1 parent a0828cf commit 3da60d8

File tree

9 files changed

+261
-474
lines changed

9 files changed

+261
-474
lines changed

package.json

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,21 @@
2929
"readable-stream": "^4.5.2"
3030
},
3131
"devDependencies": {
32+
"@biomejs/biome": "2.3.1",
33+
"@jest/globals": "^29.7.0",
34+
"@types/jest": "^29.5.14",
35+
"@types/node": "^18.19.70",
3236
"@types/node-fetch": "^2.6.12",
3337
"@types/readable-stream": "^4.0.18",
34-
"webpack": "^5.97.1",
35-
"ts-loader": "^9.5.1",
38+
"@types/ws": "^8.18.1",
39+
"esbuild": "^0.25.9",
3640
"jest": "^29.7.0",
37-
"@jest/globals": "^29.7.0",
38-
"@types/jest": "^29.5.14",
39-
"ts-jest": "^29.3.4",
4041
"jest-environment-jsdom": "^29.7.0",
4142
"msw": "2.11.2",
42-
"@types/node": "^18.19.70",
43+
"ts-jest": "^29.3.4",
44+
"ts-loader": "^9.5.1",
4345
"typescript": "~5.7.2",
44-
"@biomejs/biome": "2.3.1",
45-
"esbuild": "^0.25.9"
46+
"webpack": "^5.97.1"
4647
},
4748
"browser": {
4849
"fs": false,

src/datastream/datastream-client.ts

Lines changed: 19 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,6 @@ import type { CacheProvider } from '../cache/types';
1010
import { LocalCache } from '../cache/local';
1111
import { RedisCacheProvider, type RedisOptions } from '../cache/redis';
1212

13-
/**
14-
* Redis cache configuration options for DataStream client
15-
*/
16-
export interface DataStreamRedisConfig {
17-
/** Redis connection URL (e.g., 'redis://localhost:6379') */
18-
url?: string;
19-
/** Redis host (default: 'localhost') */
20-
host?: string;
21-
/** Redis port (default: 6379) */
22-
port?: number;
23-
/** Redis password */
24-
password?: string;
25-
/** Redis database number (default: 0) */
26-
db?: number;
27-
/** Redis key prefix for all cache keys (default: 'schematic:') */
28-
keyPrefix?: string;
29-
}
30-
3113
/**
3214
* Options for configuring the DataStream client
3315
*/
@@ -41,7 +23,7 @@ export interface DataStreamClientOptions {
4123
/** Cache TTL in milliseconds (default: 5 minutes) */
4224
cacheTTL?: number;
4325
/** Redis configuration for all cache providers (if not provided, uses memory cache) */
44-
redisConfig?: DataStreamRedisConfig;
26+
redisConfig?: RedisOptions;
4527
/** Custom cache provider for company entities (overrides Redis config if provided) */
4628
companyCache?: CacheProvider<Schematic.RulesengineCompany>;
4729
/** Custom cache provider for user entities (overrides Redis config if provided) */
@@ -637,7 +619,7 @@ export class DataStreamClient extends EventEmitter {
637619
/**
638620
* handleMessage processes incoming datastream messages
639621
*/
640-
private async handleMessage(ctx: any, message: DataStreamResp): Promise<void> {
622+
private async handleMessage(message: DataStreamResp): Promise<void> {
641623
this.logger.debug(`Processing datastream message: EntityType=${message.entity_type}, MessageType=${message.message_type}`);
642624

643625
try {
@@ -754,17 +736,23 @@ export class DataStreamClient extends EventEmitter {
754736
return;
755737
}
756738

757-
const cacheKeys: string[] = [];
758-
for (const flag of flags) {
759-
if (flag?.key) {
760-
const cacheKey = this.flagCacheKey(flag.key);
761-
try {
739+
const results = await Promise.allSettled(
740+
flags
741+
.filter((flag) => flag?.key)
742+
.map(async (flag) => {
743+
const cacheKey = this.flagCacheKey(flag.key!);
762744
await this.flagsCacheProvider.set(cacheKey, flag);
763-
cacheKeys.push(cacheKey);
764-
} catch (error) {
765-
this.logger.warn(`Failed to cache flag: ${error}`);
766-
}
767-
}
745+
return cacheKey;
746+
}),
747+
);
748+
749+
const cacheKeys = results
750+
.filter((r): r is PromiseFulfilledResult<string> => r.status === 'fulfilled')
751+
.map((r) => r.value);
752+
753+
const failures = results.filter((r) => r.status === 'rejected');
754+
if (failures.length > 0) {
755+
this.logger.warn(`Failed to cache ${failures.length} flag(s)`);
768756
}
769757

770758
// Delete flags not in the response
@@ -845,7 +833,7 @@ export class DataStreamClient extends EventEmitter {
845833
/**
846834
* handleConnectionReady is called when the WebSocket connection is ready
847835
*/
848-
private async handleConnectionReady(ctx: any): Promise<void> {
836+
private async handleConnectionReady(): Promise<void> {
849837
this.logger.info('DataStream connection is ready');
850838

851839
// Request initial flag data

src/datastream/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
// Main datastream client for external use
2-
export {
2+
export {
33
DataStreamClient
44
} from './datastream-client';
55
export type {
66
DataStreamClientOptions,
7-
DataStreamRedisConfig,
87
} from './datastream-client';

src/datastream/websocket-client.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ describe('DatastreamWSClient', () => {
2525
let client: DatastreamWSClient;
2626
const mockLogger = new MockLogger();
2727

28-
const mockMessageHandler: MessageHandlerFunc = async (ctx: any, message: DataStreamResp) => {
28+
const mockMessageHandler: MessageHandlerFunc = async (message: DataStreamResp) => {
2929
console.log('Received message:', message);
3030
};
3131

32-
const mockConnectionReadyHandler: ConnectionReadyHandlerFunc = async (ctx: any) => {
32+
const mockConnectionReadyHandler: ConnectionReadyHandlerFunc = async () => {
3333
console.log('Connection ready');
3434
};
3535

@@ -177,6 +177,6 @@ describe('DatastreamWSClient', () => {
177177
logger: mockLogger,
178178
});
179179

180-
await expect(client.sendMessage({ test: 'message' })).rejects.toThrow('WebSocket connection is not available!');
180+
await expect(client.sendMessage({ data: { entity_type: 'rulesengine.Company' as any, keys: {} } })).rejects.toThrow('WebSocket connection is not available!');
181181
});
182182
});

src/datastream/websocket-client.ts

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
// The ws package is required and provides the WebSocket implementation
33

44
import { EventEmitter } from 'events';
5-
import { DataStreamResp } from './types';
5+
import { DataStreamResp, DataStreamBaseReq } from './types';
66
import { Logger } from '../logger';
7+
import type WebSocket from 'ws';
78

89
// Dynamic imports to avoid webpack issues
910
const createWebSocket = () => {
@@ -37,12 +38,12 @@ const MAX_RECONNECT_DELAY = 30 * 1000; // 30 seconds
3738
* MessageHandlerFunc is a function type for handling incoming datastream messages
3839
* Expects parsed DataStreamResp messages
3940
*/
40-
export type MessageHandlerFunc = (ctx: any, message: DataStreamResp) => Promise<void>;
41+
export type MessageHandlerFunc = (message: DataStreamResp) => Promise<void>;
4142

4243
/**
4344
* ConnectionReadyHandlerFunc is a function type for functions that need to be called before connection is considered ready
4445
*/
45-
export type ConnectionReadyHandlerFunc = (ctx: any) => Promise<void>;
46+
export type ConnectionReadyHandlerFunc = () => Promise<void>;
4647

4748
/**
4849
* ClientOptions contains configuration for the datastream client
@@ -74,7 +75,7 @@ export interface ClientOptions {
7475
* https://custom.example.com -> wss://custom.example.com/datastream
7576
* http://localhost:8080 -> ws://localhost:8080/datastream
7677
*/
77-
function convertAPIURLToWebSocketURL(apiURL: string): any {
78+
function convertAPIURLToWebSocketURL(apiURL: string): string {
7879
const URLClass = createURL();
7980
const parsedURL = new URLClass(apiURL);
8081

@@ -110,7 +111,7 @@ function convertAPIURLToWebSocketURL(apiURL: string): any {
110111
*/
111112
export class DatastreamWSClient extends EventEmitter {
112113
// Configuration
113-
private readonly url: any;
114+
private readonly url: string;
114115
private readonly headers: Record<string, string>;
115116
private readonly logger: Logger;
116117
private readonly messageHandler: MessageHandlerFunc;
@@ -120,7 +121,7 @@ export class DatastreamWSClient extends EventEmitter {
120121
private readonly maxReconnectDelay: number;
121122

122123
// Connection state
123-
private ws?: any;
124+
private ws?: WebSocket;
124125
private connected: boolean = false;
125126
private ready: boolean = false;
126127

@@ -131,9 +132,6 @@ export class DatastreamWSClient extends EventEmitter {
131132
private pongTimeout?: NodeJS.Timeout;
132133
private reconnectTimeout?: NodeJS.Timeout;
133134

134-
// Context
135-
private ctx: any = {};
136-
137135
constructor(options: ClientOptions) {
138136
super();
139137

@@ -153,8 +151,7 @@ export class DatastreamWSClient extends EventEmitter {
153151
if (options.url.startsWith('http://') || options.url.startsWith('https://')) {
154152
this.url = convertAPIURLToWebSocketURL(options.url);
155153
} else {
156-
const URLClass = createURL();
157-
this.url = new URLClass(options.url);
154+
this.url = options.url;
158155
}
159156

160157
// Create headers with API key
@@ -198,7 +195,7 @@ export class DatastreamWSClient extends EventEmitter {
198195
/**
199196
* SendMessage sends a message through the WebSocket connection
200197
*/
201-
public async sendMessage(message: any): Promise<void> {
198+
public async sendMessage(message: DataStreamBaseReq): Promise<void> {
202199
if (!this.isConnected() || !this.ws) {
203200
throw new Error('WebSocket connection is not available!');
204201
}
@@ -280,7 +277,7 @@ export class DatastreamWSClient extends EventEmitter {
280277
// Call connection ready handler if provided
281278
if (this.connectionReadyHandler) {
282279
try {
283-
await this.connectionReadyHandler(this.ctx);
280+
await this.connectionReadyHandler();
284281
this.logger.debug('Connection ready handler completed successfully');
285282
} catch (err) {
286283
this.logger.error(`Connection ready handler failed: ${err}`);
@@ -354,10 +351,10 @@ export class DatastreamWSClient extends EventEmitter {
354351
*/
355352
private connect(): Promise<any> {
356353
return new Promise((resolve, reject) => {
357-
this.logger.debug(`Connecting to WebSocket: ${this.url.toString()}`);
354+
this.logger.debug(`Connecting to WebSocket: ${this.url}`);
358355

359356
const WebSocketClass = createWebSocket();
360-
const ws = new WebSocketClass(this.url.toString(), {
357+
const ws = new WebSocketClass(this.url, {
361358
headers: this.headers,
362359
handshakeTimeout: 30000, // 30 seconds
363360
});
@@ -382,8 +379,8 @@ export class DatastreamWSClient extends EventEmitter {
382379
/**
383380
* setupWebSocketHandlers sets up message and error handlers for the WebSocket
384381
*/
385-
private setupWebSocketHandlers(ws: any): void {
386-
ws.on('message', (data: any) => {
382+
private setupWebSocketHandlers(ws: WebSocket): void {
383+
ws.on('message', (data: Buffer | Buffer[]) => {
387384
this.handleMessage(data);
388385
});
389386

@@ -403,7 +400,7 @@ export class DatastreamWSClient extends EventEmitter {
403400
/**
404401
* handleMessage processes incoming WebSocket messages
405402
*/
406-
private async handleMessage(data: any): Promise<void> {
403+
private async handleMessage(data: Buffer | Buffer[]): Promise<void> {
407404
try {
408405

409406
let messageStr: string;
@@ -412,7 +409,8 @@ export class DatastreamWSClient extends EventEmitter {
412409
} else if (Array.isArray(data)) {
413410
messageStr = Buffer.concat(data).toString();
414411
} else {
415-
messageStr = data.toString();
412+
// Fallback for other string-like types
413+
messageStr = String(data);
416414
}
417415

418416
// Parse the datastream message
@@ -427,7 +425,7 @@ export class DatastreamWSClient extends EventEmitter {
427425

428426
// Handle the parsed message using the provided handler
429427
try {
430-
await this.messageHandler(this.ctx, message);
428+
await this.messageHandler(message);
431429
} catch (err) {
432430
this.emit('error', new Error(`Message handler error: ${err}`));
433431
}

src/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ export { LocalCache } from "./cache/local";
33
export { RedisCacheProvider } from "./cache/redis";
44
export { SchematicClient } from "./wrapper";
55
export type { RedisOptions } from "./cache/redis";
6-
export type { DataStreamRedisConfig } from "./datastream";
76
export { SchematicEnvironment } from "./environments";
87
export { SchematicError, SchematicTimeoutError } from "./errors";
98
export { RulesEngineClient } from "./rules-engine";

src/wrapper.ts

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ import { type CacheProvider, LocalCache } from "./cache";
55
import { ConsoleLogger, Logger } from "./logger";
66
import { EventBuffer } from "./events";
77
import { offlineFetcher, provideFetcher } from "./core/fetcher/custom";
8-
import { DataStreamClient, type DataStreamClientOptions, type DataStreamRedisConfig } from "./datastream";
8+
import { DataStreamClient, type DataStreamClientOptions } from "./datastream";
9+
import type { RedisOptions } from "./cache/redis";
910

1011
/**
1112
* Configuration options for the SchematicClient
@@ -27,7 +28,7 @@ export interface SchematicOptions {
2728
/** Cache TTL in milliseconds (default: 5 minutes) */
2829
cacheTTL?: number;
2930
/** Redis configuration for DataStream caching */
30-
redisConfig?: DataStreamRedisConfig;
31+
redisConfig?: RedisOptions;
3132
/** Enable replicator mode for external data synchronization */
3233
replicatorMode?: boolean;
3334
/** Health check URL for replicator mode */
@@ -174,22 +175,17 @@ export class SchematicClient extends BaseClient {
174175
// Extract boolean value from DataStream response
175176
const flagValue = typeof resp === 'boolean' ? resp : resp?.value;
176177

177-
// Track the flag check event
178-
this.track({
179-
event: "flag_check",
180-
company: evalCtx.company,
181-
user: evalCtx.user,
182-
traits: {
183-
flag_key: key,
184-
value: flagValue,
185-
company_id: resp?.companyId,
186-
user_id: resp?.userId,
187-
flag_id: resp?.flagId,
188-
req_company: evalCtx.company,
189-
req_user: evalCtx.user,
190-
reason: resp?.reason,
191-
},
192-
});
178+
// Enqueue the flag check event
179+
this.enqueueEvent(api.EventType.FlagCheck, {
180+
flagKey: key,
181+
value: flagValue ?? false,
182+
reason: resp?.reason ?? "unknown",
183+
companyId: resp?.companyId,
184+
userId: resp?.userId,
185+
flagId: resp?.flagId,
186+
reqCompany: evalCtx.company,
187+
reqUser: evalCtx.user,
188+
} satisfies api.EventBodyFlagCheck);
193189

194190
return flagValue ?? this.getFlagDefault(key);
195191
} catch (err) {
@@ -433,8 +429,8 @@ export class SchematicClient extends BaseClient {
433429
}
434430

435431
private async enqueueEvent(
436-
eventType: "identify" | "track",
437-
body: api.EventBodyIdentify | api.EventBodyTrack
432+
eventType: api.EventType,
433+
body: api.EventBody
438434
): Promise<void> {
439435
try {
440436
this.eventBuffer.push({

0 commit comments

Comments
 (0)