From c65b56f4d3d20a1bb39c0e8c73a681bf3c8aba42 Mon Sep 17 00:00:00 2001 From: Leibale Date: Fri, 27 Jan 2023 09:35:35 -0500 Subject: [PATCH] fix #2329 - do not ignore errors in `legacyMode` --- packages/client/lib/client/index.spec.ts | 190 +++-------------------- packages/client/lib/client/index.ts | 19 ++- 2 files changed, 36 insertions(+), 173 deletions(-) diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index aadf823e572..7af7f35d4d8 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -2,20 +2,14 @@ import { strict as assert } from 'assert'; import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils'; import RedisClient, { RedisClientType } from '.'; import { RedisClientMultiCommandType } from './multi-command'; -import { RedisCommandRawReply, RedisModules, RedisFunctions, RedisScripts } from '../commands'; -import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, SocketClosedUnexpectedlyError, WatchError } from '../errors'; +import { RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisFunctions, RedisScripts } from '../commands'; +import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors'; import { defineScript } from '../lua-script'; import { spy } from 'sinon'; import { once } from 'events'; import { ClientKillFilters } from '../commands/CLIENT_KILL'; -import { ClusterSlotStates } from '../commands/CLUSTER_SETSLOT'; import { promisify } from 'util'; -// We need to use 'require', because it's not possible with Typescript to import -// function that are exported as 'module.exports = function`, without esModuleInterop -// set to true. -const calculateSlot = require('cluster-key-slot'); - export const SQUARE_SCRIPT = defineScript({ SCRIPT: 'return ARGV[1] * ARGV[1];', NUMBER_OF_KEYS: 0, @@ -171,6 +165,28 @@ describe('Client', () => { } }); + testUtils.testWithClient('client.sendCommand should reply with error', async client => { + await assert.rejects( + promisify(client.sendCommand).call(client, '1', '2') + ); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + legacyMode: true + } + }); + + testUtils.testWithClient('client.hGetAll should reply with error', async client => { + await assert.rejects( + promisify(client.hGetAll).call(client) + ); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + legacyMode: true + } + }); + testUtils.testWithClient('client.v4.sendCommand should return a promise', async client => { assert.equal( await client.v4.sendCommand(['PING']), @@ -347,19 +363,6 @@ describe('Client', () => { legacyMode: true } }); - - testUtils.testWithClient('pingInterval', async client => { - assert.deepEqual( - await once(client, 'ping-interval'), - ['PONG'] - ); - }, { - ...GLOBAL.SERVERS.OPEN, - clientOptions: { - legacyMode: true, - pingInterval: 1 - } - }); }); describe('events', () => { @@ -823,34 +826,7 @@ describe('Client', () => { } }, GLOBAL.SERVERS.OPEN); - testUtils.testWithClient('should be able to PING in PubSub mode', async client => { - await client.connect(); - - try { - await client.subscribe('channel', () => { - // noop - }); - - const [string, buffer, customString, customBuffer] = await Promise.all([ - client.ping(), - client.ping(client.commandOptions({ returnBuffers: true })), - client.ping('custom'), - client.ping(client.commandOptions({ returnBuffers: true }), 'custom') - ]); - - assert.equal(string, 'pong'); - assert.deepEqual(buffer, Buffer.from('pong')); - assert.equal(customString, 'custom'); - assert.deepEqual(customBuffer, Buffer.from('custom')); - } finally { - await client.disconnect(); - } - }, { - ...GLOBAL.SERVERS.OPEN, - disableClientSetup: true - }); - - testUtils.testWithClient('should be able to QUIT in PubSub mode', async client => { + testUtils.testWithClient('should be able to quit in PubSub mode', async client => { await client.subscribe('channel', () => { // noop }); @@ -859,122 +835,6 @@ describe('Client', () => { assert.equal(client.isOpen, false); }, GLOBAL.SERVERS.OPEN); - - testUtils.testWithClient('should reject GET in PubSub mode', async client => { - await client.connect(); - - try { - await client.subscribe('channel', () => { - // noop - }); - - await assert.rejects(client.get('key'), ErrorReply); - } finally { - await client.disconnect(); - } - }, { - ...GLOBAL.SERVERS.OPEN, - disableClientSetup: true - }); - - describe('shareded PubSub', () => { - testUtils.isVersionGreaterThanHook([7]); - - testUtils.testWithClient('should be able to receive messages', async publisher => { - const subscriber = publisher.duplicate(); - - await subscriber.connect(); - - try { - const listener = spy(); - await subscriber.sSubscribe('channel', listener); - - await Promise.all([ - waitTillBeenCalled(listener), - publisher.sPublish('channel', 'message') - ]); - - assert.ok(listener.calledOnceWithExactly('message', 'channel')); - - await subscriber.sUnsubscribe(); - - // should be able to send commands - await assert.doesNotReject(subscriber.ping()); - } finally { - await subscriber.disconnect(); - } - }, { - ...GLOBAL.SERVERS.OPEN - }); - - testUtils.testWithClient('should emit sharded-channel-moved event', async publisher => { - await publisher.clusterAddSlotsRange({ start: 0, end: 16383 }); - - const subscriber = publisher.duplicate(); - - await subscriber.connect(); - - try { - await subscriber.sSubscribe('channel', () => {}); - - await Promise.all([ - publisher.clusterSetSlot( - calculateSlot('channel'), - ClusterSlotStates.NODE, - await publisher.clusterMyId() - ), - once(subscriber, 'sharded-channel-moved') - ]); - - assert.equal( - await subscriber.ping(), - 'PONG' - ); - } finally { - await subscriber.disconnect(); - } - }, { - serverArguments: ['--cluster-enabled', 'yes'] - }); - }); - - testUtils.testWithClient('should handle errors in SUBSCRIBE', async publisher => { - const subscriber = publisher.duplicate(); - - await subscriber.connect(); - - try { - const listener1 = spy(); - await subscriber.subscribe('1', listener1); - - await publisher.aclSetUser('default', 'resetchannels'); - - - const listener2 = spy(); - await assert.rejects(subscriber.subscribe('2', listener2)); - - await Promise.all([ - waitTillBeenCalled(listener1), - publisher.aclSetUser('default', 'allchannels'), - publisher.publish('1', 'message'), - ]); - assert.ok(listener1.calledOnceWithExactly('message', '1')); - - await subscriber.subscribe('2', listener2); - - await Promise.all([ - waitTillBeenCalled(listener2), - publisher.publish('2', 'message'), - ]); - assert.ok(listener2.calledOnceWithExactly('message', '2')); - } finally { - await subscriber.disconnect(); - } - }, { - // this test change ACL rules, running in isolated server - serverArguments: [], - minimumDockerVersion: [6 ,2] // ACL PubSub rules were added in Redis 6.2 - }); }); testUtils.testWithClient('ConnectionTimeoutError', async client => { diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index ae5e2fe5e84..6af572edc6a 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -15,6 +15,7 @@ import { ClientClosedError, ClientOfflineError, DisconnectsClientError } from '. import { URL } from 'url'; import { TcpSocketConnectOpts } from 'net'; import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub'; +import { callbackify } from 'util'; export interface RedisClientOptions< M extends RedisModules = RedisModules, @@ -343,7 +344,9 @@ export default class RedisClient< (this as any).sendCommand = (...args: Array): void => { const result = this.#legacySendCommand(...args); if (result) { - result.promise.then(reply => result.callback(null, reply)); + result.promise + .then(reply => result.callback(null, reply)) + .catch(err => result.callback(err)); } }; @@ -380,18 +383,18 @@ export default class RedisClient< promise.catch(err => this.emit('error', err)); } - #defineLegacyCommand(this: any, name: string, command?: RedisCommand): void { - this.#v4[name] = this[name].bind(this); - this[name] = command && command.TRANSFORM_LEGACY_REPLY && command.transformReply ? + #defineLegacyCommand(name: string, command?: RedisCommand): void { + this.#v4[name] = (this as any)[name].bind(this); + (this as any)[name] = command && command.TRANSFORM_LEGACY_REPLY && command.transformReply ? (...args: Array) => { const result = this.#legacySendCommand(name, ...args); if (result) { - result.promise.then((reply: any) => { - result.callback(null, command.transformReply!(reply)); - }); + result.promise + .then(reply => result.callback(null, command.transformReply!(reply))) + .catch(err => result.callback(err)); } } : - (...args: Array) => this.sendCommand(name, ...args); + (...args: Array) => (this as any).sendCommand(name, ...args); } #pingTimer?: NodeJS.Timer;