Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 13 additions & 50 deletions packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts
Original file line number Diff line number Diff line change
@@ -1,73 +1,45 @@
import diagnostics_channel from "node:diagnostics_channel";
import { FaultInjectorClient } from "./fault-injector-client";
import {
createTestClient,
getDatabaseConfig,
getDatabaseConfigFromEnv,
getEnvConfig,
RedisConnectionConfig,
} from "./test-scenario.util";
import { createClient } from "../../..";
import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager";
import { before } from "mocha";
import { spy } from "sinon";
import assert from "node:assert";
import { TestCommandRunner } from "./test-command-runner";
import net from "node:net";

describe("Connection Handoff", () => {
const diagnosticsLog: DiagnosticsEvent[] = [];

const onMessageHandler = (message: unknown) => {
diagnosticsLog.push(message as DiagnosticsEvent);
};

let clientConfig: RedisConnectionConfig;
let client: ReturnType<typeof createClient<any, any, any, 3>>;
let client: ReturnType<typeof createClient<any, any, any, any>>;
let faultInjectorClient: FaultInjectorClient;
let connectSpy = spy(net, "createConnection");

before(() => {
const envConfig = getEnvConfig();
const redisConfig = getDatabaseConfigFromEnv(
envConfig.redisEndpointsConfigPath,
envConfig.redisEndpointsConfigPath
);

faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl);
clientConfig = getDatabaseConfig(redisConfig);
});

beforeEach(async () => {
diagnosticsLog.length = 0;
diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);

connectSpy.resetHistory();

client = createClient({
socket: {
host: clientConfig.host,
port: clientConfig.port,
...(clientConfig.tls === true ? { tls: true } : {}),
},
password: clientConfig.password,
username: clientConfig.username,
RESP: 3,
maintPushNotifications: "auto",
maintMovingEndpointType: "external-ip",
maintRelaxedCommandTimeout: 10000,
maintRelaxedSocketTimeout: 10000,
});
client = await createTestClient(clientConfig);

client.on("error", (err: Error) => {
throw new Error(`Client error: ${err.message}`);
});

await client.connect();
await client.flushAll();
});

afterEach(() => {
diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler);
client.destroy();
if (client && client.isOpen) {
client.destroy();
}
});

describe("New Connection Establishment", () => {
Expand All @@ -80,11 +52,8 @@ describe("Connection Handoff", () => {
clusterIndex: 0,
});

const lowTimeoutWaitPromise = faultInjectorClient.waitForAction(
lowTimeoutBindAndMigrateActionId,
);
await faultInjectorClient.waitForAction(lowTimeoutBindAndMigrateActionId);

await lowTimeoutWaitPromise;
assert.equal(connectSpy.callCount, 2);
});
});
Expand All @@ -108,19 +77,13 @@ describe("Connection Handoff", () => {
clusterIndex: 0,
});

const workloadPromise = faultInjectorClient.waitForAction(action_id);

const commandPromises =
await TestCommandRunner.fireCommandsUntilStopSignal(
client,
workloadPromise,
);
await faultInjectorClient.waitForAction(action_id);

const rejected = (
await Promise.all(commandPromises.commandPromises)
).filter((result) => result.status === "rejected");
const currentTime = Date.now().toString();
await client.set("key", currentTime);
const result = await client.get("key");

assert.ok(rejected.length === 0);
assert.strictEqual(result, currentTime);
});
});
});
16 changes: 1 addition & 15 deletions packages/client/lib/tests/test-scenario/fault-injector-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class FaultInjectorClient {
* @param action The action request to trigger
* @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
*/
public triggerAction<T = unknown>(action: ActionRequest): Promise<T> {
public triggerAction<T extends { action_id: string }>(action: ActionRequest): Promise<T> {
return this.#request<T>("POST", "/action", action);
}

Expand All @@ -60,20 +60,6 @@ export class FaultInjectorClient {
return this.#request<T>("GET", `/action/${actionId}`);
}

/**
* Executes an rladmin command.
* @param command The rladmin command to execute
* @param bdbId Optional database ID to target
* @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
*/
public executeRladminCommand<T = unknown>(
command: string,
bdbId?: string
): Promise<T> {
const cmd = bdbId ? `rladmin -b ${bdbId} ${command}` : `rladmin ${command}`;
return this.#request<T>("POST", "/rladmin", cmd);
}

/**
* Waits for an action to complete.
* @param actionId The ID of the action to wait for
Expand Down
137 changes: 90 additions & 47 deletions packages/client/lib/tests/test-scenario/push-notification.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import assert from "node:assert";
import diagnostics_channel from "node:diagnostics_channel";
import { FaultInjectorClient } from "./fault-injector-client";
import {
createTestClient,
getDatabaseConfig,
getDatabaseConfigFromEnv,
getEnvConfig,
Expand All @@ -12,14 +13,21 @@ import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager";
import { before } from "mocha";

describe("Push Notifications", () => {
const diagnosticsLog: DiagnosticsEvent[] = [];

const onMessageHandler = (message: unknown) => {
diagnosticsLog.push(message as DiagnosticsEvent);
const createNotificationMessageHandler = (
result: Record<DiagnosticsEvent["type"], number>,
notifications: Array<DiagnosticsEvent["type"]>
) => {
return (message: unknown) => {
if (notifications.includes((message as DiagnosticsEvent).type)) {
const event = message as DiagnosticsEvent;
result[event.type] = (result[event.type] ?? 0) + 1;
}
};
};

let onMessageHandler: ReturnType<typeof createNotificationMessageHandler>;
let clientConfig: RedisConnectionConfig;
let client: ReturnType<typeof createClient<any, any, any, 3>>;
let client: ReturnType<typeof createClient<any, any, any, any>>;
let faultInjectorClient: FaultInjectorClient;

before(() => {
Expand All @@ -33,62 +41,97 @@ describe("Push Notifications", () => {
});

beforeEach(async () => {
diagnosticsLog.length = 0;
diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);
client = await createTestClient(clientConfig);

client = createClient({
socket: {
host: clientConfig.host,
port: clientConfig.port,
...(clientConfig.tls === true ? { tls: true } : {}),
},
password: clientConfig.password,
username: clientConfig.username,
RESP: 3,
maintPushNotifications: "auto",
maintMovingEndpointType: "external-ip",
maintRelaxedCommandTimeout: 10000,
maintRelaxedSocketTimeout: 10000,
});

client.on("error", (err: Error) => {
throw new Error(`Client error: ${err.message}`);
});

await client.connect();
await client.flushAll();
});

afterEach(() => {
diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler);
client.destroy();
if (onMessageHandler!) {
diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler);
}

if (client && client.isOpen) {
client.destroy();
}
});

it("should receive MOVING, MIGRATING, and MIGRATED push notifications", async () => {
const { action_id: migrateActionId } =
await faultInjectorClient.triggerAction<{ action_id: string }>({
type: "migrate",
parameters: {
cluster_index: "0",
},
const notifications: Array<DiagnosticsEvent["type"]> = [
"MOVING",
"MIGRATING",
"MIGRATED",
];

const diagnosticsMap: Record<DiagnosticsEvent["type"], number> = {};

onMessageHandler = createNotificationMessageHandler(
diagnosticsMap,
notifications
);

diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);

const { action_id: bindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});

await faultInjectorClient.waitForAction(migrateActionId);
await faultInjectorClient.waitForAction(bindAndMigrateActionId);

const { action_id: bindActionId } =
await faultInjectorClient.triggerAction<{ action_id: string }>({
type: "bind",
assert.strictEqual(
diagnosticsMap.MOVING,
1,
"Should have received exactly one MOVING notification"
);
assert.strictEqual(
diagnosticsMap.MIGRATING,
1,
"Should have received exactly one MIGRATING notification"
);
assert.strictEqual(
diagnosticsMap.MIGRATED,
1,
"Should have received exactly one MIGRATED notification"
);
});

it("should receive FAILING_OVER and FAILED_OVER push notifications", async () => {
const notifications: Array<DiagnosticsEvent["type"]> = [
"FAILING_OVER",
"FAILED_OVER",
];

const diagnosticsMap: Record<DiagnosticsEvent["type"], number> = {};

onMessageHandler = createNotificationMessageHandler(
diagnosticsMap,
notifications
);

diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);

const { action_id: failoverActionId } =
await faultInjectorClient.triggerAction({
type: "failover",
parameters: {
cluster_index: "0",
bdb_id: `${clientConfig.bdbId}`,
bdb_id: clientConfig.bdbId.toString(),
cluster_index: 0,
},
});

await faultInjectorClient.waitForAction(bindActionId);
await faultInjectorClient.waitForAction(failoverActionId);

const pushNotificationLogs = diagnosticsLog.filter((log) => {
return ["MOVING", "MIGRATING", "MIGRATED"].includes(log?.type);
});

assert.strictEqual(pushNotificationLogs.length, 3);
assert.strictEqual(
diagnosticsMap.FAILING_OVER,
1,
"Should have received exactly one FAILING_OVER notification"
);
assert.strictEqual(
diagnosticsMap.FAILED_OVER,
1,
"Should have received exactly one FAILED_OVER notification"
);
});
});
Loading
Loading