Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions packages/client/lib/tests/test-scenario/fault-injector-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,45 @@ export class FaultInjectorClient {
throw new Error(`Timeout waiting for action ${actionId}`);
}

/**
* Triggers a migrate and bind action.
* @param bdbId The database ID to target
* @param clusterIndex The cluster index to migrate to
* @returns The action status
*/
public async migrateAndBindAction({
bdbId,
clusterIndex,
}: {
bdbId: string | number;
clusterIndex: string | number;
}) {
const bdbIdStr = String(bdbId);
const clusterIndexStr = String(clusterIndex);

return this.triggerAction<{ action_id: string }>({
type: "sequence_of_actions",
parameters: {
bdb_id: bdbIdStr,
actions: [
{
type: "migrate",
params: {
cluster_index: clusterIndexStr,
},
},
{
type: "bind",
params: {
bdb_id: bdbIdStr,
cluster_index: clusterIndexStr,
},
},
],
},
});
}

async #request<T>(
method: string,
path: string,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ describe("Push Notifications", () => {
username: clientConfig.username,
RESP: 3,
maintPushNotifications: "auto",
maintMovingEndpointType: "external-ip",
maintRelaxedCommandTimeout: 10000,
maintRelaxedSocketTimeout: 10000,
maintMovingEndpointType: "auto",
});

client.on("error", (err: Error) => {
Expand Down
104 changes: 104 additions & 0 deletions packages/client/lib/tests/test-scenario/test-command-runner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { randomUUID } from "node:crypto";
import { setTimeout } from "node:timers/promises";
import { createClient } from "../../..";

/**
* Options for the `fireCommandsUntilStopSignal` method
*/
type FireCommandsUntilStopSignalOptions = {
/**
* Number of commands to fire in each batch
*/
batchSize: number;
/**
* Timeout between batches in milliseconds
*/
timeoutMs: number;
/**
* Function that creates the commands to be executed
*/
createCommands: (
client: ReturnType<typeof createClient<any, any, any, any>>
) => Array<() => Promise<unknown>>;
};

export class TestCommandRunner {
constructor(
private client: ReturnType<typeof createClient<any, any, any, any>>
) {}

private defaultOptions: FireCommandsUntilStopSignalOptions = {
batchSize: 60,
timeoutMs: 10,
createCommands: (
client: ReturnType<typeof createClient<any, any, any, any>>
) => [
() => client.set(randomUUID(), Date.now()),
() => client.get(randomUUID()),
],
};

#toSettled<T>(p: Promise<T>) {
return p
.then((value) => ({ status: "fulfilled" as const, value, error: null }))
.catch((reason) => ({
status: "rejected" as const,
value: null,
error: reason,
}));
}

async #racePromises<S, T>({
timeout,
stopper,
}: {
timeout: Promise<S>;
stopper: Promise<T>;
}) {
return Promise.race([
this.#toSettled<S>(timeout).then((result) => ({
...result,
stop: false,
})),
this.#toSettled<T>(stopper).then((result) => ({ ...result, stop: true })),
]);
}

/**
* Fires commands until a stop signal is received.
* @param stopSignalPromise Promise that resolves when the command execution should stop
* @param options Options for the command execution
* @returns Promise that resolves when the stop signal is received
*/
async fireCommandsUntilStopSignal(
stopSignalPromise: Promise<unknown>,
options?: Partial<FireCommandsUntilStopSignalOptions>
) {
const executeOptions = {
...this.defaultOptions,
...options,
};

const commandPromises = [];

while (true) {
for (let i = 0; i < executeOptions.batchSize; i++) {
for (const command of executeOptions.createCommands(this.client)) {
commandPromises.push(this.#toSettled(command()));
}
}

const result = await this.#racePromises({
timeout: setTimeout(executeOptions.timeoutMs),
stopper: stopSignalPromise,
});

if (result.stop) {
return {
commandPromises,
stopResult: result,
};
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import { FaultInjectorClient } from "./fault-injector-client";
import {
getDatabaseConfig,
getDatabaseConfigFromEnv,
getEnvConfig,
RedisConnectionConfig,
} from "./test-scenario.util";
import { createClient } from "../../../dist";
import { before } from "mocha";
import { TestCommandRunner } from "./test-command-runner";
import assert from "node:assert";

describe("Timeout Handling During Notifications", () => {
let clientConfig: RedisConnectionConfig;
let client: ReturnType<typeof createClient<any, any, any, 3>>;
let faultInjectorClient: FaultInjectorClient;
let commandRunner: TestCommandRunner;

before(() => {
const envConfig = getEnvConfig();
const redisConfig = getDatabaseConfigFromEnv(
envConfig.redisEndpointsConfigPath
);

faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl);
clientConfig = getDatabaseConfig(redisConfig);
});

beforeEach(async () => {
client = createClient({
socket: {
host: clientConfig.host,
port: clientConfig.port,
...(clientConfig.tls === true ? { tls: true } : {}),
},
password: clientConfig.password,
username: clientConfig.username,
RESP: 3,
maintPushNotifications: "auto",
maintMovingEndpointType: "auto",
});

client.on("error", (err: Error) => {
throw new Error(`Client error: ${err.message}`);
});

commandRunner = new TestCommandRunner(client);

await client.connect();
});

afterEach(() => {
client.destroy();
});

it("should relax command timeout on MOVING, MIGRATING, and MIGRATED", async () => {
// PART 1
// Set very low timeout to trigger errors
client.options!.maintRelaxedCommandTimeout = 50;

const { action_id: lowTimeoutBindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});

const lowTimeoutWaitPromise = faultInjectorClient.waitForAction(
lowTimeoutBindAndMigrateActionId
);

const lowTimeoutCommandPromises =
await commandRunner.fireCommandsUntilStopSignal(lowTimeoutWaitPromise);

const lowTimeoutRejectedCommands = (
await Promise.all(lowTimeoutCommandPromises.commandPromises)
).filter((result) => result.status === "rejected");

assert.ok(lowTimeoutRejectedCommands.length > 0);
assert.strictEqual(
lowTimeoutRejectedCommands.filter((rejected) => {
return (
// TODO instanceof doesn't work for some reason
rejected.error.constructor.name ===
"CommandTimeoutDuringMaintananceError"
);
}).length,
lowTimeoutRejectedCommands.length
);

// PART 2
// Set high timeout to avoid errors
client.options!.maintRelaxedCommandTimeout = 10000;

const { action_id: highTimeoutBindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});

const highTimeoutWaitPromise = faultInjectorClient.waitForAction(
highTimeoutBindAndMigrateActionId
);

const highTimeoutCommandPromises =
await commandRunner.fireCommandsUntilStopSignal(highTimeoutWaitPromise);

const highTimeoutRejectedCommands = (
await Promise.all(highTimeoutCommandPromises.commandPromises)
).filter((result) => result.status === "rejected");

assert.strictEqual(highTimeoutRejectedCommands.length, 0);
});
});