diff --git a/src/dtos/types.ts b/src/dtos/types.ts index 7573183c..1bb49a21 100644 --- a/src/dtos/types.ts +++ b/src/dtos/types.ts @@ -194,6 +194,17 @@ export interface ISplitCondition { conditionType: 'ROLLOUT' | 'WHITELIST' } +export interface IRBSegment { + name: string, + changeNumber: number, + status: 'ACTIVE' | 'ARCHIVED', + excluded: { + keys: string[], + segments: string[] + }, + conditions: ISplitCondition[], +} + export interface ISplit { name: string, changeNumber: number, diff --git a/src/storages/KeyBuilder.ts b/src/storages/KeyBuilder.ts index 2f5dc800..7e9b7e85 100644 --- a/src/storages/KeyBuilder.ts +++ b/src/storages/KeyBuilder.ts @@ -37,6 +37,18 @@ export class KeyBuilder { return `${this.prefix}.split.`; } + buildRBSegmentKey(splitName: string) { + return `${this.prefix}.rbsegment.${splitName}`; + } + + buildRBSegmentsTillKey() { + return `${this.prefix}.rbsegments.till`; + } + + buildRBSegmentKeyPrefix() { + return `${this.prefix}.rbsegment.`; + } + buildSegmentNameKey(segmentName: string) { return `${this.prefix}.segment.${segmentName}`; } diff --git a/src/storages/KeyBuilderSS.ts b/src/storages/KeyBuilderSS.ts index 6232d88a..cf8d2156 100644 --- a/src/storages/KeyBuilderSS.ts +++ b/src/storages/KeyBuilderSS.ts @@ -53,6 +53,10 @@ export class KeyBuilderSS extends KeyBuilder { return `${this.buildSplitKeyPrefix()}*`; } + searchPatternForRBSegmentKeys() { + return `${this.buildRBSegmentKeyPrefix()}*`; + } + /* Telemetry keys */ buildLatencyKey(method: Method, bucket: number) { diff --git a/src/storages/__tests__/RBSegmentsCacheAsync.spec.ts b/src/storages/__tests__/RBSegmentsCacheAsync.spec.ts new file mode 100644 index 00000000..8c2d6678 --- /dev/null +++ b/src/storages/__tests__/RBSegmentsCacheAsync.spec.ts @@ -0,0 +1,59 @@ +import { RBSegmentsCacheInRedis } from '../inRedis/RBSegmentsCacheInRedis'; +import { RBSegmentsCachePluggable } from '../pluggable/RBSegmentsCachePluggable'; +import { KeyBuilderSS } from '../KeyBuilderSS'; +import { rbSegment, rbSegmentWithInSegmentMatcher } from '../__tests__/testUtils'; +import { loggerMock } from '../../logger/__tests__/sdkLogger.mock'; +import { metadata } from './KeyBuilder.spec'; +import { RedisAdapter } from '../inRedis/RedisAdapter'; +import { wrapperMockFactory } from '../pluggable/__tests__/wrapper.mock'; + +const keys = new KeyBuilderSS('RBSEGMENT', metadata); + +const redisClient = new RedisAdapter(loggerMock); +const cacheInRedis = new RBSegmentsCacheInRedis(loggerMock, keys, redisClient); + +const storageWrapper = wrapperMockFactory(); +const cachePluggable = new RBSegmentsCachePluggable(loggerMock, keys, storageWrapper); + +describe.each([{ cache: cacheInRedis, wrapper: redisClient }, { cache: cachePluggable, wrapper: storageWrapper }])('Rule-based segments cache async (Redis & Pluggable)', ({ cache, wrapper }) => { + + afterAll(async () => { + await wrapper.del(keys.buildRBSegmentsTillKey()); + await wrapper.disconnect(); + }); + + test('update should add and remove segments correctly', async () => { + // Add segments + expect(await cache.update([rbSegment, rbSegmentWithInSegmentMatcher], [], 1)).toBe(true); + expect(await cache.get(rbSegment.name)).toEqual(rbSegment); + expect(await cache.get(rbSegmentWithInSegmentMatcher.name)).toEqual(rbSegmentWithInSegmentMatcher); + expect(await cache.getChangeNumber()).toBe(1); + + // Remove a segment + expect(await cache.update([], [rbSegment], 2)).toBe(true); + expect(await cache.get(rbSegment.name)).toBeNull(); + expect(await cache.get(rbSegmentWithInSegmentMatcher.name)).toEqual(rbSegmentWithInSegmentMatcher); + expect(await cache.getChangeNumber()).toBe(2); + + // Remove remaining segment + expect(await cache.update([], [rbSegmentWithInSegmentMatcher], 3)).toBe(true); + expect(await cache.get(rbSegment.name)).toBeNull(); + expect(await cache.get(rbSegmentWithInSegmentMatcher.name)).toBeNull(); + expect(await cache.getChangeNumber()).toBe(3); + + // No changes + expect(await cache.update([], [rbSegmentWithInSegmentMatcher], 4)).toBe(false); + expect(await cache.getChangeNumber()).toBe(4); + }); + + test('contains should check for segment existence correctly', async () => { + await cache.update([rbSegment, rbSegmentWithInSegmentMatcher], [], 1); + + expect(await cache.contains(new Set([rbSegment.name]))).toBe(true); + expect(await cache.contains(new Set([rbSegment.name, rbSegmentWithInSegmentMatcher.name]))).toBe(true); + expect(await cache.contains(new Set(['nonexistent']))).toBe(false); + expect(await cache.contains(new Set([rbSegment.name, 'nonexistent']))).toBe(false); + + await cache.update([], [rbSegment, rbSegmentWithInSegmentMatcher], 2); + }); +}); diff --git a/src/storages/__tests__/testUtils.ts b/src/storages/__tests__/testUtils.ts index fa38944f..a4009b1c 100644 --- a/src/storages/__tests__/testUtils.ts +++ b/src/storages/__tests__/testUtils.ts @@ -1,4 +1,4 @@ -import { ISplit } from '../../dtos/types'; +import { IRBSegment, ISplit } from '../../dtos/types'; import { IStorageSync, IStorageAsync, IImpressionsCacheSync, IEventsCacheSync } from '../types'; // Assert that instances created by storage factories have the expected interface @@ -45,3 +45,9 @@ export const featureFlagTwo: ISplit = { name: 'ff_two', sets: ['t','w','o'] }; export const featureFlagThree: ISplit = { name: 'ff_three', sets: ['t','h','r','e'] }; //@ts-ignore export const featureFlagWithoutFS: ISplit = { name: 'ff_four' }; + +// Rule-based segments +//@ts-ignore +export const rbSegment: IRBSegment = { name: 'rb_segment', conditions: [{ matcherGroup: { matchers: [{ matcherType: 'EQUAL_TO', unaryNumericMatcherData: { value: 10 } }] } }] }; +//@ts-ignore +export const rbSegmentWithInSegmentMatcher: IRBSegment = { name: 'rb_segment_with_in_segment_matcher', conditions: [{ matcherGroup: { matchers: [{ matcherType: 'IN_SEGMENT', userDefinedSegmentMatcherData: { segmentName: 'employees' } }] } }] }; diff --git a/src/storages/inRedis/RBSegmentsCacheInRedis.ts b/src/storages/inRedis/RBSegmentsCacheInRedis.ts new file mode 100644 index 00000000..dc36f64c --- /dev/null +++ b/src/storages/inRedis/RBSegmentsCacheInRedis.ts @@ -0,0 +1,79 @@ +import { isNaNNumber } from '../../utils/lang'; +import { IRBSegmentsCacheAsync } from '../types'; +import { ILogger } from '../../logger/types'; +import { IRBSegment } from '../../dtos/types'; +import { LOG_PREFIX } from './constants'; +import { setToArray } from '../../utils/lang/sets'; +import { RedisAdapter } from './RedisAdapter'; +import { KeyBuilderSS } from '../KeyBuilderSS'; + +export class RBSegmentsCacheInRedis implements IRBSegmentsCacheAsync { + + private readonly log: ILogger; + private readonly keys: KeyBuilderSS; + private readonly redis: RedisAdapter; + + constructor(log: ILogger, keys: KeyBuilderSS, redis: RedisAdapter) { + this.log = log; + this.keys = keys; + this.redis = redis; + } + + get(name: string): Promise { + return this.redis.get(this.keys.buildRBSegmentKey(name)) + .then(maybeRBSegment => maybeRBSegment && JSON.parse(maybeRBSegment)); + } + + private getNames(): Promise { + return this.redis.keys(this.keys.searchPatternForRBSegmentKeys()).then( + (listOfKeys) => listOfKeys.map(this.keys.extractKey) + ); + } + + contains(names: Set): Promise { + const namesArray = setToArray(names); + return this.getNames().then(namesInStorage => { + return namesArray.every(name => namesInStorage.includes(name)); + }); + } + + update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): Promise { + return Promise.all([ + this.setChangeNumber(changeNumber), + Promise.all(toAdd.map(toAdd => { + const key = this.keys.buildRBSegmentKey(toAdd.name); + const stringifiedNewRBSegment = JSON.stringify(toAdd); + return this.redis.set(key, stringifiedNewRBSegment).then(() => true); + })), + Promise.all(toRemove.map(toRemove => { + const key = this.keys.buildRBSegmentKey(toRemove.name); + return this.redis.del(key).then(status => status === 1); + })) + ]).then(([, added, removed]) => { + return added.some(result => result) || removed.some(result => result); + }); + } + + setChangeNumber(changeNumber: number) { + return this.redis.set(this.keys.buildRBSegmentsTillKey(), changeNumber + '').then( + status => status === 'OK' + ); + } + + getChangeNumber(): Promise { + return this.redis.get(this.keys.buildRBSegmentsTillKey()).then((value: string | null) => { + const i = parseInt(value as string, 10); + + return isNaNNumber(i) ? -1 : i; + }).catch((e) => { + this.log.error(LOG_PREFIX + 'Could not retrieve changeNumber from storage. Error: ' + e); + return -1; + }); + } + + // @TODO implement if required by DataLoader or producer mode + clear() { + return Promise.resolve(); + } + +} diff --git a/src/storages/pluggable/RBSegmentsCachePluggable.ts b/src/storages/pluggable/RBSegmentsCachePluggable.ts new file mode 100644 index 00000000..c1967f6d --- /dev/null +++ b/src/storages/pluggable/RBSegmentsCachePluggable.ts @@ -0,0 +1,76 @@ +import { isNaNNumber } from '../../utils/lang'; +import { KeyBuilder } from '../KeyBuilder'; +import { IPluggableStorageWrapper, IRBSegmentsCacheAsync } from '../types'; +import { ILogger } from '../../logger/types'; +import { IRBSegment } from '../../dtos/types'; +import { LOG_PREFIX } from './constants'; +import { setToArray } from '../../utils/lang/sets'; + +export class RBSegmentsCachePluggable implements IRBSegmentsCacheAsync { + + private readonly log: ILogger; + private readonly keys: KeyBuilder; + private readonly wrapper: IPluggableStorageWrapper; + + constructor(log: ILogger, keys: KeyBuilder, wrapper: IPluggableStorageWrapper) { + this.log = log; + this.keys = keys; + this.wrapper = wrapper; + } + + get(name: string): Promise { + return this.wrapper.get(this.keys.buildRBSegmentKey(name)) + .then(maybeRBSegment => maybeRBSegment && JSON.parse(maybeRBSegment)); + } + + private getNames(): Promise { + return this.wrapper.getKeysByPrefix(this.keys.buildRBSegmentKeyPrefix()).then( + (listOfKeys) => listOfKeys.map(this.keys.extractKey) + ); + } + + contains(names: Set): Promise { + const namesArray = setToArray(names); + return this.getNames().then(namesInStorage => { + return namesArray.every(name => namesInStorage.includes(name)); + }); + } + + update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): Promise { + return Promise.all([ + this.setChangeNumber(changeNumber), + Promise.all(toAdd.map(toAdd => { + const key = this.keys.buildRBSegmentKey(toAdd.name); + const stringifiedNewRBSegment = JSON.stringify(toAdd); + return this.wrapper.set(key, stringifiedNewRBSegment).then(() => true); + })), + Promise.all(toRemove.map(toRemove => { + const key = this.keys.buildRBSegmentKey(toRemove.name); + return this.wrapper.del(key); + })) + ]).then(([, added, removed]) => { + return added.some(result => result) || removed.some(result => result); + }); + } + + setChangeNumber(changeNumber: number) { + return this.wrapper.set(this.keys.buildRBSegmentsTillKey(), changeNumber + ''); + } + + getChangeNumber(): Promise { + return this.wrapper.get(this.keys.buildRBSegmentsTillKey()).then((value) => { + const i = parseInt(value as string, 10); + + return isNaNNumber(i) ? -1 : i; + }).catch((e) => { + this.log.error(LOG_PREFIX + 'Could not retrieve changeNumber from storage. Error: ' + e); + return -1; + }); + } + + // @TODO implement if required by DataLoader or producer mode + clear() { + return Promise.resolve(); + } + +} diff --git a/src/storages/types.ts b/src/storages/types.ts index 71962254..9a1741c9 100644 --- a/src/storages/types.ts +++ b/src/storages/types.ts @@ -1,5 +1,5 @@ import SplitIO from '../../types/splitio'; -import { MaybeThenable, ISplit, IMySegmentsResponse } from '../dtos/types'; +import { MaybeThenable, ISplit, IRBSegment, IMySegmentsResponse } from '../dtos/types'; import { MySegmentsData } from '../sync/polling/types'; import { EventDataType, HttpErrors, HttpLatencies, ImpressionDataType, LastSync, Method, MethodExceptions, MethodLatencies, MultiMethodExceptions, MultiMethodLatencies, MultiConfigs, OperationType, StoredEventWithMetadata, StoredImpressionWithMetadata, StreamingEvent, UniqueKeysPayloadCs, UniqueKeysPayloadSs, TelemetryUsageStatsPayload, UpdatesFromSSEEnum } from '../sync/submitters/types'; import { ISettings } from '../types'; @@ -225,6 +225,34 @@ export interface ISplitsCacheAsync extends ISplitsCacheBase { getNamesByFlagSets(flagSets: string[]): Promise[]> } +/** Rule-Based Segments cache */ + +export interface IRBSegmentsCacheBase { + update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): MaybeThenable, + get(name: string): MaybeThenable, + getChangeNumber(): MaybeThenable, + clear(): MaybeThenable, + contains(names: Set): MaybeThenable, +} + +export interface IRBSegmentsCacheSync extends IRBSegmentsCacheBase { + update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): boolean, + get(name: string): IRBSegment | null, + getChangeNumber(): number, + clear(): void, + contains(names: Set): boolean, + // Used only for smart pausing in client-side standalone. Returns true if the storage contains a RBSegment using segments or large segments matchers + usesSegments(): boolean, +} + +export interface IRBSegmentsCacheAsync extends IRBSegmentsCacheBase { + update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): Promise, + get(name: string): Promise, + getChangeNumber(): Promise, + clear(): Promise, + contains(names: Set): Promise, +} + /** Segments cache */ export interface ISegmentsCacheBase {