Skip to content

Added the ability to post unique keys in batches #227

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/splitio/go-client/v6
go 1.18

require (
github.com/splitio/go-split-commons/v6 v6.1.0
github.com/splitio/go-split-commons/v6 v6.1.1-0.20250414232405-62906f35f3b0
github.com/splitio/go-toolkit/v5 v5.4.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.0.4 h1:FC82T+CHJ/Q/PdyLW++GeCO+Ol59Y4T7R4jbgjvktgc=
github.com/redis/go-redis/v9 v9.0.4/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
github.com/splitio/go-split-commons/v6 v6.1.0 h1:k3mwr12DF6gbEaV8XXU/tSAQlPkIEuzIgTEneYhGg2I=
github.com/splitio/go-split-commons/v6 v6.1.0/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY=
github.com/splitio/go-split-commons/v6 v6.1.1-0.20250414232405-62906f35f3b0 h1:Fvn5XPPQmsh1Zq0+3f9zq/bn2wTtKacdEudAf3JeL9o=
github.com/splitio/go-split-commons/v6 v6.1.1-0.20250414232405-62906f35f3b0/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY=
github.com/splitio/go-toolkit/v5 v5.4.0 h1:g5WFpRhQomnXCmvfsNOWV4s5AuUrWIZ+amM68G8NBKM=
github.com/splitio/go-toolkit/v5 v5.4.0/go.mod h1:xYhUvV1gga9/1029Wbp5pjnR6Cy8nvBpjw99wAbsMko=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
Expand Down
26 changes: 14 additions & 12 deletions splitio/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type sdkStorages struct {
runtimeTelemetry storage.TelemetryRuntimeProducer
evaluationTelemetry storage.TelemetryEvaluationProducer
impressionsCount storage.ImpressionsCountProducer
uniqueKeysStorage storage.UniqueKeysStorage
}

// SplitFactory struct is responsible for instantiating and storing instances of client and manager.
Expand Down Expand Up @@ -283,14 +284,15 @@ func setupInMemoryFactory(
advanced.StreamingEnabled = false
}

inMememoryFullQueue := make(chan string, 2) // Size 2: So that it's able to accept one event from each resource simultaneously.
inMememoryFullQueue := make(chan string, 3) // Size 3: So that it's able to accept one event from each resource simultaneously.

flagSetFilter := flagsets.NewFlagSetFilter(advanced.FlagSetsFilter)
splitsStorage := mutexmap.NewMMSplitStorage(flagSetFilter)
segmentsStorage := mutexmap.NewMMSegmentStorage()
telemetryStorage, err := inmemory.NewTelemetryStorage()
impressionsStorage := mutexqueue.NewMQImpressionsStorage(cfg.Advanced.ImpressionsQueueSize, inMememoryFullQueue, logger, telemetryStorage)
eventsStorage := mutexqueue.NewMQEventsStorage(cfg.Advanced.EventsQueueSize, inMememoryFullQueue, logger, telemetryStorage)
uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(advanced.UniqueKeysQueueSize, inMememoryFullQueue, logger)
if err != nil {
return nil, err
}
Expand All @@ -302,7 +304,7 @@ func setupInMemoryFactory(
SplitUpdater: split.NewSplitUpdater(splitsStorage, splitAPI.SplitFetcher, logger, telemetryStorage, dummyHC, flagSetFilter),
SegmentUpdater: segment.NewSegmentUpdater(splitsStorage, segmentsStorage, splitAPI.SegmentFetcher, logger, telemetryStorage, dummyHC),
EventRecorder: event.NewEventRecorderSingle(eventsStorage, splitAPI.EventRecorder, logger, metadata, telemetryStorage),
TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryStorage, splitAPI.TelemetryRecorder, splitsStorage, segmentsStorage, logger, metadata, telemetryStorage),
TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryStorage, splitAPI.TelemetryRecorder, splitsStorage, segmentsStorage, logger, metadata, telemetryStorage, uniqueKeysStorage),
}
splitTasks := synchronizer.SplitTasks{
SplitSyncTask: tasks.NewFetchSplitsTask(workers.SplitUpdater, cfg.TaskPeriods.SplitSync, logger),
Expand All @@ -320,13 +322,14 @@ func setupInMemoryFactory(
initTelemetry: telemetryStorage,
evaluationTelemetry: telemetryStorage,
runtimeTelemetry: telemetryStorage,
uniqueKeysStorage: uniqueKeysStorage,
}

if cfg.ImpressionsMode == "" {
cfg.ImpressionsMode = config.ImpressionsModeOptimized
}

impressionManager, err := impressions.BuildInMemoryManager(cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, storages.runtimeTelemetry, storages.impressionsConsumer)
impressionManager, err := impressions.BuildInMemoryManager(cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, storages.runtimeTelemetry, storages.impressionsConsumer, uniqueKeysStorage)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -394,11 +397,8 @@ func setupRedisFactory(apikey string, cfg *conf.SplitSdkConfig, logger logging.L
inMememoryFullQueue := make(chan string, 2) // Size 2: So that it's able to accept one event from each resource simultaneously.
impressionStorage := redis.NewImpressionStorage(redisClient, metadata, logger)

if len(cfg.Advanced.FlagSetsFilter) != 0 {
cfg.Advanced.FlagSetsFilter = []string{}
logger.Warning("FlagSets filter is not applicable for Consumer modes where the SDK does not keep rollout data in sync. FlagSet filter was discarded")
}
flagSetFilter := flagsets.NewFlagSetFilter([]string{})
advanced := conf.NormalizeRedisSDKConf(cfg.Advanced, logger)
flagSetFilter := flagsets.NewFlagSetFilter(advanced.FlagSetsFilter)

storages := sdkStorages{
splits: redis.NewSplitStorage(redisClient, logger, flagSetFilter),
Expand All @@ -410,17 +410,19 @@ func setupRedisFactory(apikey string, cfg *conf.SplitSdkConfig, logger logging.L
evaluationTelemetry: telemetryStorage,
impressionsCount: redis.NewImpressionsCountStorage(redisClient, logger),
runtimeTelemetry: runtimeTelemetry,
uniqueKeysStorage: mutexqueue.NewMQUniqueKeysStorage(advanced.UniqueKeysQueueSize, inMememoryFullQueue, logger),
}

splitTasks := synchronizer.SplitTasks{}
workers := synchronizer.Workers{}
advanced := config.AdvancedConfig{}
workers := synchronizer.Workers{
TelemetryRecorder: telemetry.NewSynchronizerRedis(telemetryStorage, logger, storages.uniqueKeysStorage),
}

if cfg.ImpressionsMode == "" {
cfg.ImpressionsMode = config.ImpressionsModeDebug
}

impressionManager, err := impressions.BuildRedisManager(cfg, logger, &splitTasks, storages.initTelemetry, storages.impressionsCount, storages.runtimeTelemetry)
impressionManager, err := impressions.BuildRedisManager(cfg, logger, &splitTasks, storages.initTelemetry, storages.impressionsCount, storages.runtimeTelemetry, storages.uniqueKeysStorage)
if err != nil {
return nil, err
}
Expand All @@ -444,7 +446,7 @@ func setupRedisFactory(apikey string, cfg *conf.SplitSdkConfig, logger logging.L
operationMode: conf.RedisConsumer,
storages: storages,
readinessSubscriptors: make(map[int]chan int),
telemetrySync: telemetry.NewSynchronizerRedis(telemetryStorage, logger),
telemetrySync: workers.TelemetryRecorder,
impressionManager: impressionManager,
syncManager: syncManager,
}
Expand Down
4 changes: 4 additions & 0 deletions splitio/conf/sdkconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type AdvancedConfig struct {
ImpressionsBulkSize int64
StreamingEnabled bool
FlagSetsFilter []string
UniqueKeysQueueSize int64
UniqueKeysBulkSize int64
}

// Default returns a config struct with all the default values
Expand Down Expand Up @@ -158,6 +160,8 @@ func Default() *SplitSdkConfig {
ImpressionsQueueSize: 10000,
ImpressionsBulkSize: 5000,
StreamingEnabled: true,
UniqueKeysQueueSize: 10000,
UniqueKeysBulkSize: 5000,
},
}
}
Expand Down
29 changes: 29 additions & 0 deletions splitio/conf/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,43 @@ import (

"github.com/splitio/go-split-commons/v6/conf"
"github.com/splitio/go-split-commons/v6/flagsets"
"github.com/splitio/go-toolkit/v5/logging"
)

const (
defaultUniqueKeysQueueSize = 2000
defaultUniqueKeysBulkSize = 1000
)

func NormalizeRedisSDKConf(sdkConfig AdvancedConfig, logger logging.LoggerInterface) conf.AdvancedConfig {
config, _ := NormalizeSDKConf(sdkConfig)

if sdkConfig.UniqueKeysQueueSize == 0 {
config.UniqueKeysQueueSize = defaultUniqueKeysQueueSize
}
if sdkConfig.UniqueKeysBulkSize == 0 {
config.UniqueKeysBulkSize = defaultUniqueKeysBulkSize
}
if len(sdkConfig.FlagSetsFilter) != 0 {
config.FlagSetsFilter = []string{}
logger.Warning("FlagSets filter is not applicable for Consumer modes where the SDK does not keep rollout data in sync. FlagSet filter was discarded")
}

return config
}

// NormalizeSDKConf compares against SDK Config to set defaults
func NormalizeSDKConf(sdkConfig AdvancedConfig) (conf.AdvancedConfig, []error) {
config := conf.GetDefaultAdvancedConfig()
if sdkConfig.HTTPTimeout > 0 {
config.HTTPTimeout = sdkConfig.HTTPTimeout
}
if sdkConfig.UniqueKeysQueueSize > 0 {
config.UniqueKeysQueueSize = sdkConfig.UniqueKeysQueueSize
}
if sdkConfig.UniqueKeysBulkSize > 0 {
config.UniqueKeysBulkSize = sdkConfig.UniqueKeysBulkSize
}
if sdkConfig.EventsBulkSize > 0 {
config.EventsBulkSize = sdkConfig.EventsBulkSize
}
Expand Down
12 changes: 7 additions & 5 deletions splitio/impressions/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,17 @@ func BuildInMemoryManager(
splitAPI *api.SplitAPI,
telemetryStorage storage.TelemetryRuntimeProducer,
impressionStorage storage.ImpressionStorageConsumer,
uniqueKeysStorage storage.UniqueKeysStorage,
) (provisional.ImpressionManager, error) {
listenerEnabled := cfg.Advanced.ImpressionListener != nil
impressionsCounter := strategy.NewImpressionsCounter()
filter := filter.NewBloomFilter(bfExpectedElemenets, bfFalsePositiveProbability)
uniqueKeysTracker := strategy.NewUniqueKeysTracker(filter)
uniqueKeysTracker := strategy.NewUniqueKeysTracker(filter, uniqueKeysStorage)

workers.ImpressionsCountRecorder = impressionscount.NewRecorderSingle(impressionsCounter, splitAPI.ImpressionRecorder, metadata, logger, telemetryStorage)

splitTasks.ImpressionsCountSyncTask = tasks.NewRecordImpressionsCountTask(workers.ImpressionsCountRecorder, logger, impressionsCountPeriodTaskInMemory)
splitTasks.UniqueKeysTask = tasks.NewRecordUniqueKeysTask(workers.TelemetryRecorder, uniqueKeysTracker, uniqueKeysPeriodTaskInMemory, logger)
splitTasks.UniqueKeysTask = tasks.NewRecordUniqueKeysTask(workers.TelemetryRecorder, uniqueKeysPeriodTaskInMemory, logger, advanced.UniqueKeysBulkSize)
splitTasks.CleanFilterTask = tasks.NewCleanFilterTask(filter, logger, bfCleaningPeriod)

noneStrategy := strategy.NewNoneImpl(impressionsCounter, uniqueKeysTracker, listenerEnabled)
Expand Down Expand Up @@ -85,18 +86,19 @@ func BuildRedisManager(
telemetryConfigStorage storage.TelemetryConfigProducer,
impressionsCountStorage storage.ImpressionsCountProducer,
telemetryRuntimeStorage storage.TelemetryRuntimeProducer,
uniqueKeysStorage storage.UniqueKeysStorage,
) (provisional.ImpressionManager, error) {
listenerEnabled := cfg.Advanced.ImpressionListener != nil

impressionsCounter := strategy.NewImpressionsCounter()
filter := filter.NewBloomFilter(bfExpectedElemenets, bfFalsePositiveProbability)
uniqueKeysTracker := strategy.NewUniqueKeysTracker(filter)
uniqueKeysTracker := strategy.NewUniqueKeysTracker(filter, uniqueKeysStorage)

telemetryRecorder := telemetry.NewSynchronizerRedis(telemetryConfigStorage, logger)
telemetryRecorder := telemetry.NewSynchronizerRedis(telemetryConfigStorage, logger, uniqueKeysStorage)
impressionsCountRecorder := impressionscount.NewRecorderRedis(impressionsCounter, impressionsCountStorage, logger)

splitTasks.ImpressionsCountSyncTask = tasks.NewRecordImpressionsCountTask(impressionsCountRecorder, logger, impressionsCountPeriodTaskRedis)
splitTasks.UniqueKeysTask = tasks.NewRecordUniqueKeysTask(telemetryRecorder, uniqueKeysTracker, uniqueKeysPeriodTaskRedis, logger)
splitTasks.UniqueKeysTask = tasks.NewRecordUniqueKeysTask(telemetryRecorder, uniqueKeysPeriodTaskRedis, logger, cfg.Advanced.UniqueKeysBulkSize)
splitTasks.CleanFilterTask = tasks.NewCleanFilterTask(filter, logger, bfCleaningPeriod)

noneStrategy := strategy.NewNoneImpl(impressionsCounter, uniqueKeysTracker, listenerEnabled)
Expand Down
12 changes: 8 additions & 4 deletions splitio/impressions/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ func TestBuildInMemoryWithNone(t *testing.T) {
splitAPI := api.NewSplitAPI("apikey", advanced, logger, metadata)
telemetryStorage, _ := inmemory.NewTelemetryStorage()
impressionsStorage := mutexqueue.NewMQImpressionsStorage(cfg.Advanced.ImpressionsQueueSize, make(chan string, 2), logger, telemetryStorage)
uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(cfg.Advanced.UniqueKeysQueueSize, make(chan string), logger)

impManager, err := BuildInMemoryManager(&cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, telemetryStorage, impressionsStorage)
impManager, err := BuildInMemoryManager(&cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, telemetryStorage, impressionsStorage, uniqueKeysStorage)
if err != nil {
t.Error("err should be nil. ", err.Error())
}
Expand Down Expand Up @@ -67,8 +68,9 @@ func TestBuildInMemoryWithDebug(t *testing.T) {
splitAPI := api.NewSplitAPI("apikey", advanced, logger, metadata)
telemetryStorage, _ := inmemory.NewTelemetryStorage()
impressionsStorage := mutexqueue.NewMQImpressionsStorage(cfg.Advanced.ImpressionsQueueSize, make(chan string, 2), logger, telemetryStorage)
uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(cfg.Advanced.UniqueKeysQueueSize, make(chan string), logger)

impManager, err := BuildInMemoryManager(&cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, telemetryStorage, impressionsStorage)
impManager, err := BuildInMemoryManager(&cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, telemetryStorage, impressionsStorage, uniqueKeysStorage)
if err != nil {
t.Error("err should be nil. ", err.Error())
}
Expand Down Expand Up @@ -107,8 +109,9 @@ func TestBuildInMemoryWithOptimized(t *testing.T) {
splitAPI := api.NewSplitAPI("apikey", advanced, logger, metadata)
telemetryStorage, _ := inmemory.NewTelemetryStorage()
impressionsStorage := mutexqueue.NewMQImpressionsStorage(cfg.Advanced.ImpressionsQueueSize, make(chan string, 2), logger, telemetryStorage)
uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(cfg.Advanced.UniqueKeysQueueSize, make(chan string), logger)

impManager, err := BuildInMemoryManager(&cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, telemetryStorage, impressionsStorage)
impManager, err := BuildInMemoryManager(&cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, telemetryStorage, impressionsStorage, uniqueKeysStorage)
if err != nil {
t.Error("err should be nil. ", err.Error())
}
Expand Down Expand Up @@ -143,8 +146,9 @@ func TestBuildRedisWithNone(t *testing.T) {
splitTasks := synchronizer.SplitTasks{}
runtimeTelemetry := mocks.MockTelemetryStorage{}
impressionCountStorage := mocks.MockImpressionsCountStorage{}
uniqueKeysStorage := mocks.MockUniqueKeysStorage{}

impManager, err := BuildRedisManager(&cfg, logger, &splitTasks, runtimeTelemetry, impressionCountStorage, runtimeTelemetry)
impManager, err := BuildRedisManager(&cfg, logger, &splitTasks, runtimeTelemetry, impressionCountStorage, runtimeTelemetry, uniqueKeysStorage)
if err != nil {
t.Error("err should be nil. ", err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion splitio/version.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package splitio

// Version contains a string with the split sdk version
const Version = "6.7.0"
const Version = "6.7.1.rc.1"