From 1cecdf8907c77af57a9b55e4c777ae9b0dbed23e Mon Sep 17 00:00:00 2001 From: Emiliano Sanchez Date: Wed, 5 Mar 2025 17:34:50 -0300 Subject: [PATCH 1/5] Handle RBSEGMENT_UPDATE notification --- src/logger/messages/warn.ts | 2 +- src/sync/polling/types.ts | 4 +- .../polling/updaters/segmentChangesUpdater.ts | 2 +- src/sync/streaming/SSEHandler/index.ts | 3 +- src/sync/streaming/SSEHandler/types.ts | 4 +- .../UpdateWorkers/SplitsUpdateWorker.ts | 159 ++++++++++-------- .../__tests__/SplitsUpdateWorker.spec.ts | 8 +- src/sync/streaming/constants.ts | 1 + src/sync/streaming/parseUtils.ts | 4 +- src/sync/streaming/pushManager.ts | 22 +-- src/sync/streaming/types.ts | 5 +- 11 files changed, 116 insertions(+), 98 deletions(-) diff --git a/src/logger/messages/warn.ts b/src/logger/messages/warn.ts index 52487f95..2b0b85ca 100644 --- a/src/logger/messages/warn.ts +++ b/src/logger/messages/warn.ts @@ -33,7 +33,7 @@ export const codesWarn: [number, string][] = codesError.concat([ [c.WARN_SDK_KEY, c.LOG_PREFIX_SETTINGS + ': You already have %s. We recommend keeping only one instance of the factory at all times (Singleton pattern) and reusing it throughout your application'], [c.STREAMING_PARSING_MEMBERSHIPS_UPDATE, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching Memberships due to an error processing %s notification: %s'], - [c.STREAMING_PARSING_SPLIT_UPDATE, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching SplitChanges due to an error processing SPLIT_UPDATE notification: %s'], + [c.STREAMING_PARSING_SPLIT_UPDATE, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching SplitChanges due to an error processing %s notification: %s'], [c.WARN_INVALID_FLAGSET, '%s: you passed %s, flag set must adhere to the regular expressions %s. This means a flag set must start with a letter or number, be in lowercase, alphanumeric and have a max length of 50 characters. %s was discarded.'], [c.WARN_LOWERCASE_FLAGSET, '%s: flag set %s should be all lowercase - converting string to lowercase.'], [c.WARN_FLAGSET_WITHOUT_FLAGS, '%s: you passed %s flag set that does not contain cached feature flag names. Please double check what flag sets are in use in the Split user interface.'], diff --git a/src/sync/polling/types.ts b/src/sync/polling/types.ts index c542fec9..4ff29c83 100644 --- a/src/sync/polling/types.ts +++ b/src/sync/polling/types.ts @@ -1,10 +1,10 @@ -import { ISplit } from '../../dtos/types'; +import { IRBSegment, ISplit } from '../../dtos/types'; import { IReadinessManager } from '../../readiness/types'; import { IStorageSync } from '../../storages/types'; import { MEMBERSHIPS_LS_UPDATE, MEMBERSHIPS_MS_UPDATE } from '../streaming/types'; import { ITask, ISyncTask } from '../types'; -export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }], boolean> { } +export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit | IRBSegment, changeNumber: number }], boolean> { } export interface ISegmentsSyncTask extends ISyncTask<[fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number], boolean> { } diff --git a/src/sync/polling/updaters/segmentChangesUpdater.ts b/src/sync/polling/updaters/segmentChangesUpdater.ts index c1009077..679c7f6e 100644 --- a/src/sync/polling/updaters/segmentChangesUpdater.ts +++ b/src/sync/polling/updaters/segmentChangesUpdater.ts @@ -51,7 +51,7 @@ export function segmentChangesUpdaterFactory( * Returned promise will not be rejected. * * @param fetchOnlyNew - if true, only fetch the segments that not exists, i.e., which `changeNumber` is equal to -1. - * This param is used by SplitUpdateWorker on server-side SDK, to fetch new registered segments on SPLIT_UPDATE notifications. + * This param is used by SplitUpdateWorker on server-side SDK, to fetch new registered segments on SPLIT_UPDATE or RBSEGMENT_UPDATE notifications. * @param segmentName - segment name to fetch. By passing `undefined` it fetches the list of segments registered at the storage * @param noCache - true to revalidate data to fetch on a SEGMENT_UPDATE notifications. * @param till - till target for the provided segmentName, for CDN bypass. diff --git a/src/sync/streaming/SSEHandler/index.ts b/src/sync/streaming/SSEHandler/index.ts index fbbe329c..6a8d99fc 100644 --- a/src/sync/streaming/SSEHandler/index.ts +++ b/src/sync/streaming/SSEHandler/index.ts @@ -1,6 +1,6 @@ import { errorParser, messageParser } from './NotificationParser'; import { notificationKeeperFactory } from './NotificationKeeper'; -import { PUSH_RETRYABLE_ERROR, PUSH_NONRETRYABLE_ERROR, OCCUPANCY, CONTROL, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE } from '../constants'; +import { PUSH_RETRYABLE_ERROR, PUSH_NONRETRYABLE_ERROR, OCCUPANCY, CONTROL, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, RBSEGMENT_UPDATE } from '../constants'; import { IPushEventEmitter } from '../types'; import { ISseEventHandler } from '../SSEClient/types'; import { INotificationError, INotificationMessage } from './types'; @@ -84,6 +84,7 @@ export function SSEHandlerFactory(log: ILogger, pushEmitter: IPushEventEmitter, case MEMBERSHIPS_MS_UPDATE: case MEMBERSHIPS_LS_UPDATE: case SPLIT_KILL: + case RBSEGMENT_UPDATE: pushEmitter.emit(parsedData.type, parsedData); break; diff --git a/src/sync/streaming/SSEHandler/types.ts b/src/sync/streaming/SSEHandler/types.ts index 192583c3..48e9f2c6 100644 --- a/src/sync/streaming/SSEHandler/types.ts +++ b/src/sync/streaming/SSEHandler/types.ts @@ -1,5 +1,5 @@ import { ControlType } from '../constants'; -import { SEGMENT_UPDATE, SPLIT_UPDATE, SPLIT_KILL, CONTROL, OCCUPANCY, MEMBERSHIPS_LS_UPDATE, MEMBERSHIPS_MS_UPDATE } from '../types'; +import { SEGMENT_UPDATE, SPLIT_UPDATE, SPLIT_KILL, CONTROL, OCCUPANCY, MEMBERSHIPS_LS_UPDATE, MEMBERSHIPS_MS_UPDATE, RBSEGMENT_UPDATE } from '../types'; export enum Compression { None = 0, @@ -42,7 +42,7 @@ export interface ISegmentUpdateData { } export interface ISplitUpdateData { - type: SPLIT_UPDATE, + type: SPLIT_UPDATE | RBSEGMENT_UPDATE, changeNumber: number, pcn?: number, d?: string, diff --git a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts index 580fe9cb..8e49474a 100644 --- a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts +++ b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts @@ -1,4 +1,5 @@ -import { ISplit } from '../../../dtos/types'; +import { IRBSegment, ISplit } from '../../../dtos/types'; +import { STREAMING_PARSING_SPLIT_UPDATE } from '../../../logger/constants'; import { ILogger } from '../../../logger/types'; import { SDK_SPLITS_ARRIVED } from '../../../readiness/constants'; import { ISplitsEventEmitter } from '../../../readiness/types'; @@ -7,6 +8,8 @@ import { ITelemetryTracker } from '../../../trackers/types'; import { Backoff } from '../../../utils/Backoff'; import { SPLITS } from '../../../utils/constants'; import { ISegmentsSyncTask, ISplitsSyncTask } from '../../polling/types'; +import { RBSEGMENT_UPDATE } from '../constants'; +import { parseFFUpdatePayload } from '../parseUtils'; import { ISplitKillData, ISplitUpdateData } from '../SSEHandler/types'; import { FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT, FETCH_BACKOFF_MAX_RETRIES } from './constants'; import { IUpdateWorker } from './types'; @@ -14,87 +17,111 @@ import { IUpdateWorker } from './types'; /** * SplitsUpdateWorker factory */ -export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, telemetryTracker: ITelemetryTracker, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker<[updateData: ISplitUpdateData, payload?: ISplit]> & { killSplit(event: ISplitKillData): void } { +export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, telemetryTracker: ITelemetryTracker, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker<[updateData: ISplitUpdateData]> & { killSplit(event: ISplitKillData): void } { - let maxChangeNumber = 0; - let handleNewEvent = false; - let isHandlingEvent: boolean; - let cdnBypass: boolean; - let payload: ISplit | undefined; - const backoff = new Backoff(__handleSplitUpdateCall, FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT); + function SplitsUpdateWorker() { + let maxChangeNumber = 0; + let handleNewEvent = false; + let isHandlingEvent: boolean; + let cdnBypass: boolean; + let payload: ISplit | IRBSegment | undefined; + const backoff = new Backoff(__handleSplitUpdateCall, FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT); - function __handleSplitUpdateCall() { - isHandlingEvent = true; - if (maxChangeNumber > splitsCache.getChangeNumber()) { - handleNewEvent = false; - const splitUpdateNotification = payload ? { payload, changeNumber: maxChangeNumber } : undefined; - // fetch splits revalidating data if cached - splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, splitUpdateNotification).then(() => { - if (!isHandlingEvent) return; // halt if `stop` has been called - if (handleNewEvent) { - __handleSplitUpdateCall(); - } else { - if (splitUpdateNotification) telemetryTracker.trackUpdatesFromSSE(SPLITS); - // fetch new registered segments for server-side API. Not retrying on error - if (segmentsSyncTask) segmentsSyncTask.execute(true); + function __handleSplitUpdateCall() { + isHandlingEvent = true; + if (maxChangeNumber > splitsCache.getChangeNumber()) { + handleNewEvent = false; + const splitUpdateNotification = payload ? { payload, changeNumber: maxChangeNumber } : undefined; + // fetch splits revalidating data if cached + splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, splitUpdateNotification).then(() => { + if (!isHandlingEvent) return; // halt if `stop` has been called + if (handleNewEvent) { + __handleSplitUpdateCall(); + } else { + if (splitUpdateNotification) telemetryTracker.trackUpdatesFromSSE(SPLITS); + // fetch new registered segments for server-side API. Not retrying on error + if (segmentsSyncTask) segmentsSyncTask.execute(true); - const attempts = backoff.attempts + 1; + const attempts = backoff.attempts + 1; - if (maxChangeNumber <= splitsCache.getChangeNumber()) { - log.debug(`Refresh completed${cdnBypass ? ' bypassing the CDN' : ''} in ${attempts} attempts.`); - isHandlingEvent = false; - return; - } + if (maxChangeNumber <= splitsCache.getChangeNumber()) { + log.debug(`Refresh completed${cdnBypass ? ' bypassing the CDN' : ''} in ${attempts} attempts.`); + isHandlingEvent = false; + return; + } - if (attempts < FETCH_BACKOFF_MAX_RETRIES) { - backoff.scheduleCall(); - return; - } + if (attempts < FETCH_BACKOFF_MAX_RETRIES) { + backoff.scheduleCall(); + return; + } - if (cdnBypass) { - log.debug(`No changes fetched after ${attempts} attempts with CDN bypassed.`); - isHandlingEvent = false; - } else { - backoff.reset(); - cdnBypass = true; - __handleSplitUpdateCall(); + if (cdnBypass) { + log.debug(`No changes fetched after ${attempts} attempts with CDN bypassed.`); + isHandlingEvent = false; + } else { + backoff.reset(); + cdnBypass = true; + __handleSplitUpdateCall(); + } } - } - }); - } else { - isHandlingEvent = false; + }); + } else { + isHandlingEvent = false; + } } - } - /** - * Invoked by NotificationProcessor on SPLIT_UPDATE event - * - * @param changeNumber - change number of the SPLIT_UPDATE notification - */ - function put({ changeNumber, pcn }: ISplitUpdateData, _payload?: ISplit) { - const currentChangeNumber = splitsCache.getChangeNumber(); + return { + /** + * Invoked by NotificationProcessor on SPLIT_UPDATE or RBSEGMENT_UPDATE event + * + * @param changeNumber - change number of the notification + */ + put({ changeNumber, pcn }: ISplitUpdateData, _payload?: ISplit | IRBSegment) { + const currentChangeNumber = splitsCache.getChangeNumber(); - if (changeNumber <= currentChangeNumber || changeNumber <= maxChangeNumber) return; + if (changeNumber <= currentChangeNumber || changeNumber <= maxChangeNumber) return; - maxChangeNumber = changeNumber; - handleNewEvent = true; - cdnBypass = false; - payload = undefined; + maxChangeNumber = changeNumber; + handleNewEvent = true; + cdnBypass = false; + payload = undefined; - if (_payload && currentChangeNumber === pcn) { - payload = _payload; - } + if (_payload && currentChangeNumber === pcn) { + payload = _payload; + } - if (backoff.timeoutID || !isHandlingEvent) __handleSplitUpdateCall(); - backoff.reset(); + if (backoff.timeoutID || !isHandlingEvent) __handleSplitUpdateCall(); + backoff.reset(); + }, + stop() { + isHandlingEvent = false; + backoff.reset(); + } + }; } + const ff = SplitsUpdateWorker(); + const rbs = SplitsUpdateWorker(); + return { - put, + put(parsedData) { + if (parsedData.d && parsedData.c !== undefined) { + try { + const payload = parseFFUpdatePayload(parsedData.c, parsedData.d); + if (payload) { + (parsedData.type === RBSEGMENT_UPDATE ? rbs : ff).put(parsedData, payload); + return; + } + } catch (e) { + log.warn(STREAMING_PARSING_SPLIT_UPDATE, [parsedData.type, e]); + } + } + (parsedData.type === RBSEGMENT_UPDATE ? rbs : ff).put(parsedData); + }, /** * Invoked by NotificationProcessor on SPLIT_KILL event * - * @param changeNumber - change number of the SPLIT_UPDATE notification + * @param changeNumber - change number of the notification * @param splitName - name of split to kill * @param defaultTreatment - default treatment value */ @@ -104,12 +131,12 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsEventEmitter.emit(SDK_SPLITS_ARRIVED, true); } // queues the SplitChanges fetch (only if changeNumber is newer) - put({ changeNumber } as ISplitUpdateData); + ff.put({ changeNumber } as ISplitUpdateData); }, stop() { - isHandlingEvent = false; - backoff.reset(); + ff.stop(); + rbs.stop(); } }; } diff --git a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts index 4de69ca0..735c7ee0 100644 --- a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts +++ b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts @@ -219,7 +219,7 @@ describe('SplitsUpdateWorker', () => { const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); const payload = notification.decoded; const changeNumber = payload.changeNumber; - splitUpdateWorker.put({ changeNumber, pcn }, payload); // queued + splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression }); // queued expect(splitsSyncTask.execute).toBeCalledTimes(1); expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, { changeNumber, payload }]); }); @@ -237,7 +237,7 @@ describe('SplitsUpdateWorker', () => { let splitsSyncTask = splitsSyncTaskMock(cache); let splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); - splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); + splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression }); expect(splitsSyncTask.execute).toBeCalledTimes(1); expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); splitsSyncTask.execute.mockClear(); @@ -250,7 +250,7 @@ describe('SplitsUpdateWorker', () => { splitsSyncTask = splitsSyncTaskMock(cache); splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); - splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); + splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression }); expect(splitsSyncTask.execute).toBeCalledTimes(1); expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); splitsSyncTask.execute.mockClear(); @@ -263,7 +263,7 @@ describe('SplitsUpdateWorker', () => { splitsSyncTask = splitsSyncTaskMock(cache); splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); - splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); + splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression }); expect(splitsSyncTask.execute).toBeCalledTimes(1); expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, { payload: notification.decoded, changeNumber }]); diff --git a/src/sync/streaming/constants.ts b/src/sync/streaming/constants.ts index ed958ee7..1aaa10e4 100644 --- a/src/sync/streaming/constants.ts +++ b/src/sync/streaming/constants.ts @@ -30,6 +30,7 @@ export const MEMBERSHIPS_LS_UPDATE = 'MEMBERSHIPS_LS_UPDATE'; export const SEGMENT_UPDATE = 'SEGMENT_UPDATE'; export const SPLIT_KILL = 'SPLIT_KILL'; export const SPLIT_UPDATE = 'SPLIT_UPDATE'; +export const RBSEGMENT_UPDATE = 'RBSEGMENT_UPDATE'; // Control-type push notifications, handled by NotificationKeeper export const CONTROL = 'CONTROL'; diff --git a/src/sync/streaming/parseUtils.ts b/src/sync/streaming/parseUtils.ts index 97fde935..a34f2dc9 100644 --- a/src/sync/streaming/parseUtils.ts +++ b/src/sync/streaming/parseUtils.ts @@ -2,7 +2,7 @@ import { algorithms } from '../../utils/decompress'; import { decodeFromBase64 } from '../../utils/base64'; import { hash } from '../../utils/murmur3/murmur3'; import { Compression, IMembershipMSUpdateData, KeyList } from './SSEHandler/types'; -import { ISplit } from '../../dtos/types'; +import { IRBSegment, ISplit } from '../../dtos/types'; const GZIP = 1; const ZLIB = 2; @@ -82,7 +82,7 @@ export function isInBitmap(bitmap: Uint8Array, hash64hex: string) { /** * Parse feature flags notifications for instant feature flag updates */ -export function parseFFUpdatePayload(compression: Compression, data: string): ISplit | undefined { +export function parseFFUpdatePayload(compression: Compression, data: string): ISplit | IRBSegment | undefined { return compression > 0 ? parseKeyList(data, compression, false) : JSON.parse(decodeFromBase64(data)); diff --git a/src/sync/streaming/pushManager.ts b/src/sync/streaming/pushManager.ts index c87f4945..8dade79f 100644 --- a/src/sync/streaming/pushManager.ts +++ b/src/sync/streaming/pushManager.ts @@ -11,10 +11,10 @@ import { authenticateFactory, hashUserKey } from './AuthClient'; import { forOwn } from '../../utils/lang'; import { SSEClient } from './SSEClient'; import { getMatching } from '../../utils/key'; -import { MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, SECONDS_BEFORE_EXPIRATION, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, PUSH_RETRYABLE_ERROR, PUSH_SUBSYSTEM_UP, ControlType } from './constants'; -import { STREAMING_FALLBACK, STREAMING_REFRESH_TOKEN, STREAMING_CONNECTING, STREAMING_DISABLED, ERROR_STREAMING_AUTH, STREAMING_DISCONNECTING, STREAMING_RECONNECT, STREAMING_PARSING_MEMBERSHIPS_UPDATE, STREAMING_PARSING_SPLIT_UPDATE } from '../../logger/constants'; +import { MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, SECONDS_BEFORE_EXPIRATION, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, RBSEGMENT_UPDATE, PUSH_RETRYABLE_ERROR, PUSH_SUBSYSTEM_UP, ControlType } from './constants'; +import { STREAMING_FALLBACK, STREAMING_REFRESH_TOKEN, STREAMING_CONNECTING, STREAMING_DISABLED, ERROR_STREAMING_AUTH, STREAMING_DISCONNECTING, STREAMING_RECONNECT, STREAMING_PARSING_MEMBERSHIPS_UPDATE } from '../../logger/constants'; import { IMembershipMSUpdateData, IMembershipLSUpdateData, KeyList, UpdateStrategy } from './SSEHandler/types'; -import { getDelay, isInBitmap, parseBitmap, parseFFUpdatePayload, parseKeyList } from './parseUtils'; +import { getDelay, isInBitmap, parseBitmap, parseKeyList } from './parseUtils'; import { Hash64, hash64 } from '../../utils/murmur3/murmur3_64'; import { IAuthTokenPushEnabled } from './AuthClient/types'; import { TOKEN_REFRESH, AUTH_REJECTION } from '../../utils/constants'; @@ -219,20 +219,8 @@ export function pushManagerFactory( /** Functions related to synchronization (Queues and Workers in the spec) */ pushEmitter.on(SPLIT_KILL, splitsUpdateWorker.killSplit); - pushEmitter.on(SPLIT_UPDATE, (parsedData) => { - if (parsedData.d && parsedData.c !== undefined) { - try { - const payload = parseFFUpdatePayload(parsedData.c, parsedData.d); - if (payload) { - splitsUpdateWorker.put(parsedData, payload); - return; - } - } catch (e) { - log.warn(STREAMING_PARSING_SPLIT_UPDATE, [e]); - } - } - splitsUpdateWorker.put(parsedData); - }); + pushEmitter.on(SPLIT_UPDATE, splitsUpdateWorker.put); + pushEmitter.on(RBSEGMENT_UPDATE, splitsUpdateWorker.put); function handleMySegmentsUpdate(parsedData: IMembershipMSUpdateData | IMembershipLSUpdateData) { switch (parsedData.u) { diff --git a/src/sync/streaming/types.ts b/src/sync/streaming/types.ts index ec80781e..7ad46349 100644 --- a/src/sync/streaming/types.ts +++ b/src/sync/streaming/types.ts @@ -16,18 +16,19 @@ export type MEMBERSHIPS_LS_UPDATE = 'MEMBERSHIPS_LS_UPDATE'; export type SEGMENT_UPDATE = 'SEGMENT_UPDATE'; export type SPLIT_KILL = 'SPLIT_KILL'; export type SPLIT_UPDATE = 'SPLIT_UPDATE'; +export type RBSEGMENT_UPDATE = 'RBSEGMENT_UPDATE'; // Control-type push notifications, handled by NotificationKeeper export type CONTROL = 'CONTROL'; export type OCCUPANCY = 'OCCUPANCY'; -export type IPushEvent = PUSH_SUBSYSTEM_UP | PUSH_SUBSYSTEM_DOWN | PUSH_NONRETRYABLE_ERROR | PUSH_RETRYABLE_ERROR | MEMBERSHIPS_MS_UPDATE | MEMBERSHIPS_LS_UPDATE | SEGMENT_UPDATE | SPLIT_UPDATE | SPLIT_KILL | ControlType.STREAMING_RESET +export type IPushEvent = PUSH_SUBSYSTEM_UP | PUSH_SUBSYSTEM_DOWN | PUSH_NONRETRYABLE_ERROR | PUSH_RETRYABLE_ERROR | MEMBERSHIPS_MS_UPDATE | MEMBERSHIPS_LS_UPDATE | SEGMENT_UPDATE | SPLIT_UPDATE | SPLIT_KILL | RBSEGMENT_UPDATE | ControlType.STREAMING_RESET type IParsedData = T extends MEMBERSHIPS_MS_UPDATE ? IMembershipMSUpdateData : T extends MEMBERSHIPS_LS_UPDATE ? IMembershipLSUpdateData : T extends SEGMENT_UPDATE ? ISegmentUpdateData : - T extends SPLIT_UPDATE ? ISplitUpdateData : + T extends SPLIT_UPDATE | RBSEGMENT_UPDATE ? ISplitUpdateData : T extends SPLIT_KILL ? ISplitKillData : INotificationData; /** From ebc547aabbd3a40ff41bf481a0d199d3bd7e7bec Mon Sep 17 00:00:00 2001 From: Emiliano Sanchez Date: Wed, 5 Mar 2025 17:41:22 -0300 Subject: [PATCH 2/5] Unit test --- .../mocks/message.RBSEGMENT_UPDATE.1457552620999.json | 4 ++++ src/sync/streaming/SSEHandler/__tests__/index.spec.ts | 7 ++++++- 2 files changed, 10 insertions(+), 1 deletion(-) create mode 100644 src/__tests__/mocks/message.RBSEGMENT_UPDATE.1457552620999.json diff --git a/src/__tests__/mocks/message.RBSEGMENT_UPDATE.1457552620999.json b/src/__tests__/mocks/message.RBSEGMENT_UPDATE.1457552620999.json new file mode 100644 index 00000000..a313d48f --- /dev/null +++ b/src/__tests__/mocks/message.RBSEGMENT_UPDATE.1457552620999.json @@ -0,0 +1,4 @@ +{ + "type": "message", + "data": "{\"id\":\"mc4i3NENoA:0:0\",\"clientId\":\"NDEzMTY5Mzg0MA==:MTM2ODE2NDMxNA==\",\"timestamp\":1457552621899,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_NDEzMjQ1MzA0Nw==_splits\",\"data\":\"{\\\"type\\\":\\\"RBSEGMENT_UPDATE\\\",\\\"changeNumber\\\":1457552620999}\"}" +} \ No newline at end of file diff --git a/src/sync/streaming/SSEHandler/__tests__/index.spec.ts b/src/sync/streaming/SSEHandler/__tests__/index.spec.ts index e85b22d8..5d54074b 100644 --- a/src/sync/streaming/SSEHandler/__tests__/index.spec.ts +++ b/src/sync/streaming/SSEHandler/__tests__/index.spec.ts @@ -1,10 +1,11 @@ // @ts-nocheck import { SSEHandlerFactory } from '..'; -import { PUSH_SUBSYSTEM_UP, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, PUSH_RETRYABLE_ERROR, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, ControlType } from '../../constants'; +import { PUSH_SUBSYSTEM_UP, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, PUSH_RETRYABLE_ERROR, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, RBSEGMENT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, ControlType } from '../../constants'; import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock'; // update messages import splitUpdateMessage from '../../../../__tests__/mocks/message.SPLIT_UPDATE.1457552620999.json'; +import rbsegmentUpdateMessage from '../../../../__tests__/mocks/message.RBSEGMENT_UPDATE.1457552620999.json'; import splitKillMessage from '../../../../__tests__/mocks/message.SPLIT_KILL.1457552650000.json'; import segmentUpdateMessage from '../../../../__tests__/mocks/message.SEGMENT_UPDATE.1457552640000.json'; @@ -144,6 +145,10 @@ test('`handlerMessage` for update notifications (NotificationProcessor) and stre sseHandler.handleMessage(splitUpdateMessage); expect(pushEmitter.emit).toHaveBeenLastCalledWith(SPLIT_UPDATE, ...expectedParams); // must emit SPLIT_UPDATE with the message change number + expectedParams = [{ type: 'RBSEGMENT_UPDATE', changeNumber: 1457552620999 }]; + sseHandler.handleMessage(rbsegmentUpdateMessage); + expect(pushEmitter.emit).toHaveBeenLastCalledWith(RBSEGMENT_UPDATE, ...expectedParams); // must emit RBSEGMENT_UPDATE with the message change number + expectedParams = [{ type: 'SPLIT_KILL', changeNumber: 1457552650000, splitName: 'whitelist', defaultTreatment: 'not_allowed' }]; sseHandler.handleMessage(splitKillMessage); expect(pushEmitter.emit).toHaveBeenLastCalledWith(SPLIT_KILL, ...expectedParams); // must emit SPLIT_KILL with the message change number, split name and default treatment From 598c0d400bef6b4332fe84eb2ff684944e65f0a3 Mon Sep 17 00:00:00 2001 From: Emiliano Sanchez Date: Wed, 5 Mar 2025 17:59:20 -0300 Subject: [PATCH 3/5] Update flag spec version --- src/utils/constants/index.ts | 2 +- src/utils/settingsValidation/__tests__/settings.mocks.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/utils/constants/index.ts b/src/utils/constants/index.ts index 24d3d31f..eab99bbb 100644 --- a/src/utils/constants/index.ts +++ b/src/utils/constants/index.ts @@ -104,7 +104,7 @@ export const DISABLED = 0; export const ENABLED = 1; export const PAUSED = 2; -export const FLAG_SPEC_VERSION = '1.2'; +export const FLAG_SPEC_VERSION = '1.3'; // Matcher types export const IN_SEGMENT = 'IN_SEGMENT'; diff --git a/src/utils/settingsValidation/__tests__/settings.mocks.ts b/src/utils/settingsValidation/__tests__/settings.mocks.ts index a2a3fb14..bc55891f 100644 --- a/src/utils/settingsValidation/__tests__/settings.mocks.ts +++ b/src/utils/settingsValidation/__tests__/settings.mocks.ts @@ -67,7 +67,7 @@ export const fullSettings: ISettings = { groupedFilters: { bySet: [], byName: [], byPrefix: [] }, }, enabled: true, - flagSpecVersion: '1.2' + flagSpecVersion: '1.3' }, version: 'jest', runtime: { From b6f5514b7db9a34005eb57c2704c4679101a116c Mon Sep 17 00:00:00 2001 From: Emiliano Sanchez Date: Thu, 6 Mar 2025 17:06:40 -0300 Subject: [PATCH 4/5] Rename notification type --- ....json => message.RB_SEGMENT_UPDATE.1457552620999.json} | 2 +- src/sync/polling/updaters/segmentChangesUpdater.ts | 2 +- src/sync/streaming/SSEHandler/__tests__/index.spec.ts | 8 ++++---- src/sync/streaming/SSEHandler/index.ts | 4 ++-- src/sync/streaming/SSEHandler/types.ts | 4 ++-- src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts | 8 ++++---- src/sync/streaming/constants.ts | 2 +- src/sync/streaming/pushManager.ts | 4 ++-- src/sync/streaming/types.ts | 6 +++--- 9 files changed, 20 insertions(+), 20 deletions(-) rename src/__tests__/mocks/{message.RBSEGMENT_UPDATE.1457552620999.json => message.RB_SEGMENT_UPDATE.1457552620999.json} (70%) diff --git a/src/__tests__/mocks/message.RBSEGMENT_UPDATE.1457552620999.json b/src/__tests__/mocks/message.RB_SEGMENT_UPDATE.1457552620999.json similarity index 70% rename from src/__tests__/mocks/message.RBSEGMENT_UPDATE.1457552620999.json rename to src/__tests__/mocks/message.RB_SEGMENT_UPDATE.1457552620999.json index a313d48f..bd994511 100644 --- a/src/__tests__/mocks/message.RBSEGMENT_UPDATE.1457552620999.json +++ b/src/__tests__/mocks/message.RB_SEGMENT_UPDATE.1457552620999.json @@ -1,4 +1,4 @@ { "type": "message", - "data": "{\"id\":\"mc4i3NENoA:0:0\",\"clientId\":\"NDEzMTY5Mzg0MA==:MTM2ODE2NDMxNA==\",\"timestamp\":1457552621899,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_NDEzMjQ1MzA0Nw==_splits\",\"data\":\"{\\\"type\\\":\\\"RBSEGMENT_UPDATE\\\",\\\"changeNumber\\\":1457552620999}\"}" + "data": "{\"id\":\"mc4i3NENoA:0:0\",\"clientId\":\"NDEzMTY5Mzg0MA==:MTM2ODE2NDMxNA==\",\"timestamp\":1457552621899,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_NDEzMjQ1MzA0Nw==_splits\",\"data\":\"{\\\"type\\\":\\\"RB_SEGMENT_UPDATE\\\",\\\"changeNumber\\\":1457552620999}\"}" } \ No newline at end of file diff --git a/src/sync/polling/updaters/segmentChangesUpdater.ts b/src/sync/polling/updaters/segmentChangesUpdater.ts index 679c7f6e..ab951b24 100644 --- a/src/sync/polling/updaters/segmentChangesUpdater.ts +++ b/src/sync/polling/updaters/segmentChangesUpdater.ts @@ -51,7 +51,7 @@ export function segmentChangesUpdaterFactory( * Returned promise will not be rejected. * * @param fetchOnlyNew - if true, only fetch the segments that not exists, i.e., which `changeNumber` is equal to -1. - * This param is used by SplitUpdateWorker on server-side SDK, to fetch new registered segments on SPLIT_UPDATE or RBSEGMENT_UPDATE notifications. + * This param is used by SplitUpdateWorker on server-side SDK, to fetch new registered segments on SPLIT_UPDATE or RB_SEGMENT_UPDATE notifications. * @param segmentName - segment name to fetch. By passing `undefined` it fetches the list of segments registered at the storage * @param noCache - true to revalidate data to fetch on a SEGMENT_UPDATE notifications. * @param till - till target for the provided segmentName, for CDN bypass. diff --git a/src/sync/streaming/SSEHandler/__tests__/index.spec.ts b/src/sync/streaming/SSEHandler/__tests__/index.spec.ts index 5d54074b..90bdc8cd 100644 --- a/src/sync/streaming/SSEHandler/__tests__/index.spec.ts +++ b/src/sync/streaming/SSEHandler/__tests__/index.spec.ts @@ -1,11 +1,11 @@ // @ts-nocheck import { SSEHandlerFactory } from '..'; -import { PUSH_SUBSYSTEM_UP, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, PUSH_RETRYABLE_ERROR, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, RBSEGMENT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, ControlType } from '../../constants'; +import { PUSH_SUBSYSTEM_UP, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, PUSH_RETRYABLE_ERROR, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, RB_SEGMENT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, ControlType } from '../../constants'; import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock'; // update messages import splitUpdateMessage from '../../../../__tests__/mocks/message.SPLIT_UPDATE.1457552620999.json'; -import rbsegmentUpdateMessage from '../../../../__tests__/mocks/message.RBSEGMENT_UPDATE.1457552620999.json'; +import rbsegmentUpdateMessage from '../../../../__tests__/mocks/message.RB_SEGMENT_UPDATE.1457552620999.json'; import splitKillMessage from '../../../../__tests__/mocks/message.SPLIT_KILL.1457552650000.json'; import segmentUpdateMessage from '../../../../__tests__/mocks/message.SEGMENT_UPDATE.1457552640000.json'; @@ -145,9 +145,9 @@ test('`handlerMessage` for update notifications (NotificationProcessor) and stre sseHandler.handleMessage(splitUpdateMessage); expect(pushEmitter.emit).toHaveBeenLastCalledWith(SPLIT_UPDATE, ...expectedParams); // must emit SPLIT_UPDATE with the message change number - expectedParams = [{ type: 'RBSEGMENT_UPDATE', changeNumber: 1457552620999 }]; + expectedParams = [{ type: 'RB_SEGMENT_UPDATE', changeNumber: 1457552620999 }]; sseHandler.handleMessage(rbsegmentUpdateMessage); - expect(pushEmitter.emit).toHaveBeenLastCalledWith(RBSEGMENT_UPDATE, ...expectedParams); // must emit RBSEGMENT_UPDATE with the message change number + expect(pushEmitter.emit).toHaveBeenLastCalledWith(RB_SEGMENT_UPDATE, ...expectedParams); // must emit RB_SEGMENT_UPDATE with the message change number expectedParams = [{ type: 'SPLIT_KILL', changeNumber: 1457552650000, splitName: 'whitelist', defaultTreatment: 'not_allowed' }]; sseHandler.handleMessage(splitKillMessage); diff --git a/src/sync/streaming/SSEHandler/index.ts b/src/sync/streaming/SSEHandler/index.ts index 6a8d99fc..f7a39c8b 100644 --- a/src/sync/streaming/SSEHandler/index.ts +++ b/src/sync/streaming/SSEHandler/index.ts @@ -1,6 +1,6 @@ import { errorParser, messageParser } from './NotificationParser'; import { notificationKeeperFactory } from './NotificationKeeper'; -import { PUSH_RETRYABLE_ERROR, PUSH_NONRETRYABLE_ERROR, OCCUPANCY, CONTROL, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, RBSEGMENT_UPDATE } from '../constants'; +import { PUSH_RETRYABLE_ERROR, PUSH_NONRETRYABLE_ERROR, OCCUPANCY, CONTROL, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, RB_SEGMENT_UPDATE } from '../constants'; import { IPushEventEmitter } from '../types'; import { ISseEventHandler } from '../SSEClient/types'; import { INotificationError, INotificationMessage } from './types'; @@ -84,7 +84,7 @@ export function SSEHandlerFactory(log: ILogger, pushEmitter: IPushEventEmitter, case MEMBERSHIPS_MS_UPDATE: case MEMBERSHIPS_LS_UPDATE: case SPLIT_KILL: - case RBSEGMENT_UPDATE: + case RB_SEGMENT_UPDATE: pushEmitter.emit(parsedData.type, parsedData); break; diff --git a/src/sync/streaming/SSEHandler/types.ts b/src/sync/streaming/SSEHandler/types.ts index 48e9f2c6..a39b8000 100644 --- a/src/sync/streaming/SSEHandler/types.ts +++ b/src/sync/streaming/SSEHandler/types.ts @@ -1,5 +1,5 @@ import { ControlType } from '../constants'; -import { SEGMENT_UPDATE, SPLIT_UPDATE, SPLIT_KILL, CONTROL, OCCUPANCY, MEMBERSHIPS_LS_UPDATE, MEMBERSHIPS_MS_UPDATE, RBSEGMENT_UPDATE } from '../types'; +import { SEGMENT_UPDATE, SPLIT_UPDATE, SPLIT_KILL, CONTROL, OCCUPANCY, MEMBERSHIPS_LS_UPDATE, MEMBERSHIPS_MS_UPDATE, RB_SEGMENT_UPDATE } from '../types'; export enum Compression { None = 0, @@ -42,7 +42,7 @@ export interface ISegmentUpdateData { } export interface ISplitUpdateData { - type: SPLIT_UPDATE | RBSEGMENT_UPDATE, + type: SPLIT_UPDATE | RB_SEGMENT_UPDATE, changeNumber: number, pcn?: number, d?: string, diff --git a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts index 8e49474a..8deeee4f 100644 --- a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts +++ b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts @@ -8,7 +8,7 @@ import { ITelemetryTracker } from '../../../trackers/types'; import { Backoff } from '../../../utils/Backoff'; import { SPLITS } from '../../../utils/constants'; import { ISegmentsSyncTask, ISplitsSyncTask } from '../../polling/types'; -import { RBSEGMENT_UPDATE } from '../constants'; +import { RB_SEGMENT_UPDATE } from '../constants'; import { parseFFUpdatePayload } from '../parseUtils'; import { ISplitKillData, ISplitUpdateData } from '../SSEHandler/types'; import { FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT, FETCH_BACKOFF_MAX_RETRIES } from './constants'; @@ -72,7 +72,7 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, return { /** - * Invoked by NotificationProcessor on SPLIT_UPDATE or RBSEGMENT_UPDATE event + * Invoked by NotificationProcessor on SPLIT_UPDATE or RB_SEGMENT_UPDATE event * * @param changeNumber - change number of the notification */ @@ -109,14 +109,14 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, try { const payload = parseFFUpdatePayload(parsedData.c, parsedData.d); if (payload) { - (parsedData.type === RBSEGMENT_UPDATE ? rbs : ff).put(parsedData, payload); + (parsedData.type === RB_SEGMENT_UPDATE ? rbs : ff).put(parsedData, payload); return; } } catch (e) { log.warn(STREAMING_PARSING_SPLIT_UPDATE, [parsedData.type, e]); } } - (parsedData.type === RBSEGMENT_UPDATE ? rbs : ff).put(parsedData); + (parsedData.type === RB_SEGMENT_UPDATE ? rbs : ff).put(parsedData); }, /** * Invoked by NotificationProcessor on SPLIT_KILL event diff --git a/src/sync/streaming/constants.ts b/src/sync/streaming/constants.ts index 1aaa10e4..dd230a61 100644 --- a/src/sync/streaming/constants.ts +++ b/src/sync/streaming/constants.ts @@ -30,7 +30,7 @@ export const MEMBERSHIPS_LS_UPDATE = 'MEMBERSHIPS_LS_UPDATE'; export const SEGMENT_UPDATE = 'SEGMENT_UPDATE'; export const SPLIT_KILL = 'SPLIT_KILL'; export const SPLIT_UPDATE = 'SPLIT_UPDATE'; -export const RBSEGMENT_UPDATE = 'RBSEGMENT_UPDATE'; +export const RB_SEGMENT_UPDATE = 'RB_SEGMENT_UPDATE'; // Control-type push notifications, handled by NotificationKeeper export const CONTROL = 'CONTROL'; diff --git a/src/sync/streaming/pushManager.ts b/src/sync/streaming/pushManager.ts index 8dade79f..902d6522 100644 --- a/src/sync/streaming/pushManager.ts +++ b/src/sync/streaming/pushManager.ts @@ -11,7 +11,7 @@ import { authenticateFactory, hashUserKey } from './AuthClient'; import { forOwn } from '../../utils/lang'; import { SSEClient } from './SSEClient'; import { getMatching } from '../../utils/key'; -import { MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, SECONDS_BEFORE_EXPIRATION, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, RBSEGMENT_UPDATE, PUSH_RETRYABLE_ERROR, PUSH_SUBSYSTEM_UP, ControlType } from './constants'; +import { MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, SECONDS_BEFORE_EXPIRATION, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, RB_SEGMENT_UPDATE, PUSH_RETRYABLE_ERROR, PUSH_SUBSYSTEM_UP, ControlType } from './constants'; import { STREAMING_FALLBACK, STREAMING_REFRESH_TOKEN, STREAMING_CONNECTING, STREAMING_DISABLED, ERROR_STREAMING_AUTH, STREAMING_DISCONNECTING, STREAMING_RECONNECT, STREAMING_PARSING_MEMBERSHIPS_UPDATE } from '../../logger/constants'; import { IMembershipMSUpdateData, IMembershipLSUpdateData, KeyList, UpdateStrategy } from './SSEHandler/types'; import { getDelay, isInBitmap, parseBitmap, parseKeyList } from './parseUtils'; @@ -220,7 +220,7 @@ export function pushManagerFactory( pushEmitter.on(SPLIT_KILL, splitsUpdateWorker.killSplit); pushEmitter.on(SPLIT_UPDATE, splitsUpdateWorker.put); - pushEmitter.on(RBSEGMENT_UPDATE, splitsUpdateWorker.put); + pushEmitter.on(RB_SEGMENT_UPDATE, splitsUpdateWorker.put); function handleMySegmentsUpdate(parsedData: IMembershipMSUpdateData | IMembershipLSUpdateData) { switch (parsedData.u) { diff --git a/src/sync/streaming/types.ts b/src/sync/streaming/types.ts index 7ad46349..fcf5048e 100644 --- a/src/sync/streaming/types.ts +++ b/src/sync/streaming/types.ts @@ -16,19 +16,19 @@ export type MEMBERSHIPS_LS_UPDATE = 'MEMBERSHIPS_LS_UPDATE'; export type SEGMENT_UPDATE = 'SEGMENT_UPDATE'; export type SPLIT_KILL = 'SPLIT_KILL'; export type SPLIT_UPDATE = 'SPLIT_UPDATE'; -export type RBSEGMENT_UPDATE = 'RBSEGMENT_UPDATE'; +export type RB_SEGMENT_UPDATE = 'RB_SEGMENT_UPDATE'; // Control-type push notifications, handled by NotificationKeeper export type CONTROL = 'CONTROL'; export type OCCUPANCY = 'OCCUPANCY'; -export type IPushEvent = PUSH_SUBSYSTEM_UP | PUSH_SUBSYSTEM_DOWN | PUSH_NONRETRYABLE_ERROR | PUSH_RETRYABLE_ERROR | MEMBERSHIPS_MS_UPDATE | MEMBERSHIPS_LS_UPDATE | SEGMENT_UPDATE | SPLIT_UPDATE | SPLIT_KILL | RBSEGMENT_UPDATE | ControlType.STREAMING_RESET +export type IPushEvent = PUSH_SUBSYSTEM_UP | PUSH_SUBSYSTEM_DOWN | PUSH_NONRETRYABLE_ERROR | PUSH_RETRYABLE_ERROR | MEMBERSHIPS_MS_UPDATE | MEMBERSHIPS_LS_UPDATE | SEGMENT_UPDATE | SPLIT_UPDATE | SPLIT_KILL | RB_SEGMENT_UPDATE | ControlType.STREAMING_RESET type IParsedData = T extends MEMBERSHIPS_MS_UPDATE ? IMembershipMSUpdateData : T extends MEMBERSHIPS_LS_UPDATE ? IMembershipLSUpdateData : T extends SEGMENT_UPDATE ? ISegmentUpdateData : - T extends SPLIT_UPDATE | RBSEGMENT_UPDATE ? ISplitUpdateData : + T extends SPLIT_UPDATE | RB_SEGMENT_UPDATE ? ISplitUpdateData : T extends SPLIT_KILL ? ISplitKillData : INotificationData; /** From 781d8ce0c7d3c57a168e3f6a398833340c00b42e Mon Sep 17 00:00:00 2001 From: Emiliano Sanchez Date: Thu, 6 Mar 2025 22:04:21 -0300 Subject: [PATCH 5/5] Refactor usesSegments method and SplitsUpdateWorker --- README.md | 1 + src/storages/__tests__/KeyBuilder.spec.ts | 12 +++++----- .../__tests__/RBSegmentsCacheSync.spec.ts | 4 ++-- .../inLocalStorage/RBSegmentsCacheInLocal.ts | 6 ----- .../inMemory/RBSegmentsCacheInMemory.ts | 2 +- .../UpdateWorkers/SplitsUpdateWorker.ts | 18 +++++++------- .../__tests__/SplitsUpdateWorker.spec.ts | 24 +++++++++---------- src/sync/streaming/pushManager.ts | 2 +- 8 files changed, 32 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index 133917cb..85f791cd 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ Split has built and maintains SDKs for: * .NET [Github](https://github.com/splitio/dotnet-client) [Docs](https://help.split.io/hc/en-us/articles/360020240172--NET-SDK) * Android [Github](https://github.com/splitio/android-client) [Docs](https://help.split.io/hc/en-us/articles/360020343291-Android-SDK) * Angular [Github](https://github.com/splitio/angular-sdk-plugin) [Docs](https://help.split.io/hc/en-us/articles/6495326064397-Angular-utilities) +* Elixir thin-client [Github](https://github.com/splitio/elixir-thin-client) [Docs](https://help.split.io/hc/en-us/articles/26988707417869-Elixir-Thin-Client-SDK) * Flutter [Github](https://github.com/splitio/flutter-sdk-plugin) [Docs](https://help.split.io/hc/en-us/articles/8096158017165-Flutter-plugin) * GO [Github](https://github.com/splitio/go-client) [Docs](https://help.split.io/hc/en-us/articles/360020093652-Go-SDK) * iOS [Github](https://github.com/splitio/ios-client) [Docs](https://help.split.io/hc/en-us/articles/360020401491-iOS-SDK) diff --git a/src/storages/__tests__/KeyBuilder.spec.ts b/src/storages/__tests__/KeyBuilder.spec.ts index 45af194c..6c3fe5ff 100644 --- a/src/storages/__tests__/KeyBuilder.spec.ts +++ b/src/storages/__tests__/KeyBuilder.spec.ts @@ -106,16 +106,16 @@ test('KEYS / latency and exception keys (telemetry)', () => { test('getStorageHash', () => { expect(getStorageHash({ core: { authorizationKey: '' }, - sync: { __splitFiltersValidation: { queryString: '&names=p1__split,p2__split' }, flagSpecVersion: '1.2' } - } as ISettings)).toBe('7ccd6b31'); + sync: { __splitFiltersValidation: { queryString: '&names=p1__split,p2__split' }, flagSpecVersion: '1.3' } + } as ISettings)).toBe('2ce5cc38'); expect(getStorageHash({ core: { authorizationKey: '' }, - sync: { __splitFiltersValidation: { queryString: '&names=p2__split,p3__split' }, flagSpecVersion: '1.2' } - } as ISettings)).toBe('2a25d0e1'); + sync: { __splitFiltersValidation: { queryString: '&names=p2__split,p3__split' }, flagSpecVersion: '1.3' } + } as ISettings)).toBe('e65079c6'); expect(getStorageHash({ core: { authorizationKey: '' }, - sync: { __splitFiltersValidation: { queryString: null }, flagSpecVersion: '1.2' } - } as ISettings)).toBe('db8943b4'); + sync: { __splitFiltersValidation: { queryString: null }, flagSpecVersion: '1.3' } + } as ISettings)).toBe('193e6f3f'); }); diff --git a/src/storages/__tests__/RBSegmentsCacheSync.spec.ts b/src/storages/__tests__/RBSegmentsCacheSync.spec.ts index ad47af3c..03579351 100644 --- a/src/storages/__tests__/RBSegmentsCacheSync.spec.ts +++ b/src/storages/__tests__/RBSegmentsCacheSync.spec.ts @@ -61,7 +61,7 @@ describe.each([cacheInMemory, cacheInLocal])('Rule-based segments cache sync (Me }); test('usesSegments should track segments usage correctly', () => { - expect(cache.usesSegments()).toBe(true); // Initially true when changeNumber is -1 + expect(cache.usesSegments()).toBe(false); // No rbSegments, so false cache.update([rbSegment], [], 1); // rbSegment doesn't have IN_SEGMENT matcher expect(cache.usesSegments()).toBe(false); @@ -70,6 +70,6 @@ describe.each([cacheInMemory, cacheInLocal])('Rule-based segments cache sync (Me expect(cache.usesSegments()).toBe(true); cache.clear(); - expect(cache.usesSegments()).toBe(true); // True after clear since changeNumber is -1 + expect(cache.usesSegments()).toBe(false); // False after clear since there are no rbSegments }); }); diff --git a/src/storages/inLocalStorage/RBSegmentsCacheInLocal.ts b/src/storages/inLocalStorage/RBSegmentsCacheInLocal.ts index 28c0d1ee..85f73a56 100644 --- a/src/storages/inLocalStorage/RBSegmentsCacheInLocal.ts +++ b/src/storages/inLocalStorage/RBSegmentsCacheInLocal.ts @@ -12,7 +12,6 @@ export class RBSegmentsCacheInLocal implements IRBSegmentsCacheSync { private readonly keys: KeyBuilderCS; private readonly log: ILogger; - private hasSync?: boolean; constructor(settings: ISettings, keys: KeyBuilderCS) { this.keys = keys; @@ -22,7 +21,6 @@ export class RBSegmentsCacheInLocal implements IRBSegmentsCacheSync { clear() { this.getNames().forEach(name => this.remove(name)); localStorage.removeItem(this.keys.buildRBSegmentsTillKey()); - this.hasSync = false; } update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): boolean { @@ -35,7 +33,6 @@ export class RBSegmentsCacheInLocal implements IRBSegmentsCacheSync { try { localStorage.setItem(this.keys.buildRBSegmentsTillKey(), changeNumber + ''); localStorage.setItem(this.keys.buildLastUpdatedKey(), Date.now() + ''); - this.hasSync = true; } catch (e) { this.log.error(LOG_PREFIX + e); } @@ -128,9 +125,6 @@ export class RBSegmentsCacheInLocal implements IRBSegmentsCacheSync { } usesSegments(): boolean { - // If cache hasn't been synchronized, assume we need segments - if (!this.hasSync) return true; - const storedCount = localStorage.getItem(this.keys.buildSplitsWithSegmentCountKey()); const splitsWithSegmentsCount = storedCount === null ? 0 : toNumber(storedCount); diff --git a/src/storages/inMemory/RBSegmentsCacheInMemory.ts b/src/storages/inMemory/RBSegmentsCacheInMemory.ts index 78debb86..568b0deb 100644 --- a/src/storages/inMemory/RBSegmentsCacheInMemory.ts +++ b/src/storages/inMemory/RBSegmentsCacheInMemory.ts @@ -62,7 +62,7 @@ export class RBSegmentsCacheInMemory implements IRBSegmentsCacheSync { } usesSegments(): boolean { - return this.getChangeNumber() === -1 || this.segmentsCount > 0; + return this.segmentsCount > 0; } } diff --git a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts index 8deeee4f..b151477c 100644 --- a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts +++ b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts @@ -3,7 +3,7 @@ import { STREAMING_PARSING_SPLIT_UPDATE } from '../../../logger/constants'; import { ILogger } from '../../../logger/types'; import { SDK_SPLITS_ARRIVED } from '../../../readiness/constants'; import { ISplitsEventEmitter } from '../../../readiness/types'; -import { ISplitsCacheSync } from '../../../storages/types'; +import { IRBSegmentsCacheSync, ISplitsCacheSync, IStorageSync } from '../../../storages/types'; import { ITelemetryTracker } from '../../../trackers/types'; import { Backoff } from '../../../utils/Backoff'; import { SPLITS } from '../../../utils/constants'; @@ -17,9 +17,9 @@ import { IUpdateWorker } from './types'; /** * SplitsUpdateWorker factory */ -export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, telemetryTracker: ITelemetryTracker, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker<[updateData: ISplitUpdateData]> & { killSplit(event: ISplitKillData): void } { +export function SplitsUpdateWorker(log: ILogger, storage: IStorageSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, telemetryTracker: ITelemetryTracker, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker<[updateData: ISplitUpdateData]> & { killSplit(event: ISplitKillData): void } { - function SplitsUpdateWorker() { + function SplitsUpdateWorker(cache: ISplitsCacheSync | IRBSegmentsCacheSync) { let maxChangeNumber = 0; let handleNewEvent = false; let isHandlingEvent: boolean; @@ -29,7 +29,7 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, function __handleSplitUpdateCall() { isHandlingEvent = true; - if (maxChangeNumber > splitsCache.getChangeNumber()) { + if (maxChangeNumber > cache.getChangeNumber()) { handleNewEvent = false; const splitUpdateNotification = payload ? { payload, changeNumber: maxChangeNumber } : undefined; // fetch splits revalidating data if cached @@ -44,7 +44,7 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, const attempts = backoff.attempts + 1; - if (maxChangeNumber <= splitsCache.getChangeNumber()) { + if (maxChangeNumber <= cache.getChangeNumber()) { log.debug(`Refresh completed${cdnBypass ? ' bypassing the CDN' : ''} in ${attempts} attempts.`); isHandlingEvent = false; return; @@ -77,7 +77,7 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, * @param changeNumber - change number of the notification */ put({ changeNumber, pcn }: ISplitUpdateData, _payload?: ISplit | IRBSegment) { - const currentChangeNumber = splitsCache.getChangeNumber(); + const currentChangeNumber = cache.getChangeNumber(); if (changeNumber <= currentChangeNumber || changeNumber <= maxChangeNumber) return; @@ -100,8 +100,8 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, }; } - const ff = SplitsUpdateWorker(); - const rbs = SplitsUpdateWorker(); + const ff = SplitsUpdateWorker(storage.splits); + const rbs = SplitsUpdateWorker(storage.rbSegments); return { put(parsedData) { @@ -126,7 +126,7 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, * @param defaultTreatment - default treatment value */ killSplit({ changeNumber, splitName, defaultTreatment }: ISplitKillData) { - if (splitsCache.killLocally(splitName, defaultTreatment, changeNumber)) { + if (storage.splits.killLocally(splitName, defaultTreatment, changeNumber)) { // trigger an SDK_UPDATE if Split was killed locally splitsEventEmitter.emit(SDK_SPLITS_ARRIVED, true); } diff --git a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts index 735c7ee0..bf3c294d 100644 --- a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts +++ b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts @@ -65,7 +65,7 @@ describe('SplitsUpdateWorker', () => { const splitsSyncTask = splitsSyncTaskMock(cache); Backoff.__TEST__BASE_MILLIS = 1; // retry immediately - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker); // assert calling `splitsSyncTask.execute` if `isExecuting` is false expect(splitsSyncTask.isExecuting()).toBe(false); @@ -104,7 +104,7 @@ describe('SplitsUpdateWorker', () => { Backoff.__TEST__BASE_MILLIS = 50; const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, [90, 90, 90]); - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker); // while fetch fails, should retry with backoff splitUpdateWorker.put({ changeNumber: 100 }); @@ -123,7 +123,7 @@ describe('SplitsUpdateWorker', () => { Backoff.__TEST__MAX_MILLIS = 60; // 60 millis instead of 1 min const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, [...Array(FETCH_BACKOFF_MAX_RETRIES).fill(90), 90, 100]); // 12 executions. Last one is valid - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber: 100 }); // queued @@ -148,7 +148,7 @@ describe('SplitsUpdateWorker', () => { Backoff.__TEST__MAX_MILLIS = 60; // 60 millis instead of 1 min const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, Array(FETCH_BACKOFF_MAX_RETRIES * 2).fill(90)); // 20 executions. No one is valid - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber: 100 }); // queued @@ -169,11 +169,11 @@ describe('SplitsUpdateWorker', () => { test('killSplit', async () => { // setup const cache = new SplitsCacheInMemory(); - cache.addSplit({ name: 'something'}); - cache.addSplit({ name: 'something else'}); + cache.addSplit({ name: 'something' }); + cache.addSplit({ name: 'something else' }); const splitsSyncTask = splitsSyncTaskMock(cache); - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, splitsEventEmitterMock, telemetryTracker); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, splitsEventEmitterMock, telemetryTracker); // assert killing split locally, emitting SDK_SPLITS_ARRIVED event, and synchronizing splits if changeNumber is new splitUpdateWorker.killSplit({ changeNumber: 100, splitName: 'something', defaultTreatment: 'off' }); // splitsCache.killLocally is synchronous @@ -200,7 +200,7 @@ describe('SplitsUpdateWorker', () => { const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, [95]); Backoff.__TEST__BASE_MILLIS = 1; - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber: 100 }); @@ -216,7 +216,7 @@ describe('SplitsUpdateWorker', () => { splitNotifications.forEach(notification => { const pcn = cache.getChangeNumber(); const splitsSyncTask = splitsSyncTaskMock(cache); - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker); const payload = notification.decoded; const changeNumber = payload.changeNumber; splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression }); // queued @@ -236,7 +236,7 @@ describe('SplitsUpdateWorker', () => { const notification = splitNotifications[0]; let splitsSyncTask = splitsSyncTaskMock(cache); - let splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); + let splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression }); expect(splitsSyncTask.execute).toBeCalledTimes(1); expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); @@ -249,7 +249,7 @@ describe('SplitsUpdateWorker', () => { cache.setChangeNumber(ccn); splitsSyncTask = splitsSyncTaskMock(cache); - splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); + splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression }); expect(splitsSyncTask.execute).toBeCalledTimes(1); expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); @@ -262,7 +262,7 @@ describe('SplitsUpdateWorker', () => { cache.setChangeNumber(ccn); splitsSyncTask = splitsSyncTaskMock(cache); - splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); + splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression }); expect(splitsSyncTask.execute).toBeCalledTimes(1); expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, { payload: notification.decoded, changeNumber }]); diff --git a/src/sync/streaming/pushManager.ts b/src/sync/streaming/pushManager.ts index 902d6522..9122c176 100644 --- a/src/sync/streaming/pushManager.ts +++ b/src/sync/streaming/pushManager.ts @@ -56,7 +56,7 @@ export function pushManagerFactory( // MySegmentsUpdateWorker (client-side) are initiated in `add` method const segmentsUpdateWorker = userKey ? undefined : SegmentsUpdateWorker(log, pollingManager.segmentsSyncTask as ISegmentsSyncTask, storage.segments); // For server-side we pass the segmentsSyncTask, used by SplitsUpdateWorker to fetch new segments - const splitsUpdateWorker = SplitsUpdateWorker(log, storage.splits, pollingManager.splitsSyncTask, readiness.splits, telemetryTracker, userKey ? undefined : pollingManager.segmentsSyncTask as ISegmentsSyncTask); + const splitsUpdateWorker = SplitsUpdateWorker(log, storage, pollingManager.splitsSyncTask, readiness.splits, telemetryTracker, userKey ? undefined : pollingManager.segmentsSyncTask as ISegmentsSyncTask); // [Only for client-side] map of hashes to user keys, to dispatch membership update events to the corresponding MySegmentsUpdateWorker const userKeyHashes: Record = {};