From 85e88c2180f04ffe7e81e0756d24abbeb50a8a57 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Thu, 14 Aug 2025 12:26:31 -0700 Subject: [PATCH 1/3] Updated post SS keys in chunks --- .../engine/impressions/unique_keys_tracker.rb | 30 +++++++++-- lib/splitclient-rb/split_config.rb | 10 ++-- .../memory_unique_keys_tracker_spec.rb | 54 ++++++++++++++----- .../redis_unique_keys_tracker_spec.rb | 1 - 4 files changed, 73 insertions(+), 22 deletions(-) diff --git a/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb b/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb index 79fc5b5c..77639fd3 100644 --- a/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb +++ b/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb @@ -14,9 +14,9 @@ def initialize(config, @filter_adapter = filter_adapter @sender_adapter = sender_adapter @cache = cache - @cache_max_size = config.unique_keys_cache_max_size @max_bulk_size = config.unique_keys_bulk_size @semaphore = Mutex.new + @keys_size = 0 end def call @@ -30,8 +30,9 @@ def track(feature_name, key) @filter_adapter.add(feature_name, key) add_or_update(feature_name, key) + @keys_size += 1 - send_bulk_data if @cache.size >= @cache_max_size + send_bulk_data if @keys_size >= @max_bulk_size true rescue StandardError => e @@ -75,14 +76,19 @@ def send_bulk_data return if @cache.empty? uniques = @cache.clone + keys_size = @keys_size @cache.clear + @keys_size = 0 - if uniques.size <= @max_bulk_size + if keys_size <= @max_bulk_size @sender_adapter.record_uniques_key(uniques) return end - bulks = SplitIoClient::Utilities.split_bulk_to_send(uniques, uniques.size / @max_bulk_size) + bulks = [] + uniques.each do |unique| + bulks += check_keys_and_split_to_bulks(unique) + end bulks.each do |b| @sender_adapter.record_uniques_key(b) @@ -91,6 +97,22 @@ def send_bulk_data rescue StandardError => e @config.log_found_exception(__method__.to_s, e) end + + def check_keys_and_split_to_bulks(unique) + unique_updated = [] + unique.each do |_, value| + if value.size > @max_bulk_size + sub_bulks = SplitIoClient::Utilities.split_bulk_to_send(value, value.size / @max_bulk_size) + sub_bulks.each do |sub_bulk| + unique_updated.add({ key: sub_bulk }) + end + end + unique_updated.add({ key: value }) + end + return [unique] if unique_updated == {} + + unique_updated + end end end end diff --git a/lib/splitclient-rb/split_config.rb b/lib/splitclient-rb/split_config.rb index de7f3e4d..f1967c93 100644 --- a/lib/splitclient-rb/split_config.rb +++ b/lib/splitclient-rb/split_config.rb @@ -112,7 +112,7 @@ def initialize(opts = {}) @telemetry_service_url = opts[:telemetry_service_url] || SplitConfig.default_telemetry_service_url @unique_keys_refresh_rate = SplitConfig.default_unique_keys_refresh_rate(@cache_adapter) - @unique_keys_cache_max_size = SplitConfig.default_unique_keys_cache_max_size + # @unique_keys_cache_max_size = SplitConfig.default_unique_keys_cache_max_size @unique_keys_bulk_size = SplitConfig.default_unique_keys_bulk_size(@cache_adapter) @counter_refresh_rate = SplitConfig.default_counter_refresh_rate(@cache_adapter) @@ -292,7 +292,7 @@ def initialize(opts = {}) attr_accessor :on_demand_fetch_max_retries attr_accessor :unique_keys_refresh_rate - attr_accessor :unique_keys_cache_max_size + #attr_accessor :unique_keys_cache_max_size attr_accessor :unique_keys_bulk_size attr_accessor :counter_refresh_rate @@ -498,9 +498,9 @@ def self.default_unique_keys_refresh_rate(adapter) 900 end - def self.default_unique_keys_cache_max_size - 30000 - end +# def 
self.default_unique_keys_cache_max_size
+#      30000
+#    end
 
     def self.default_unique_keys_bulk_size(adapter)
       return 2000 if adapter == :redis
diff --git a/spec/engine/impressions/memory_unique_keys_tracker_spec.rb b/spec/engine/impressions/memory_unique_keys_tracker_spec.rb
index cdff51b6..597c6ff1 100644
--- a/spec/engine/impressions/memory_unique_keys_tracker_spec.rb
+++ b/spec/engine/impressions/memory_unique_keys_tracker_spec.rb
@@ -21,13 +21,17 @@
     it 'track - full cache and send bulk' do
       post_url = 'https://telemetry.split.io/api/v1/keys/ss'
       body_expect = {
-        keys: [{ f: 'feature-test-0', ks: ['key_test-1', 'key_test-2'] }, { f: 'feature-test-1', ks: ['key_test-1'] }]
+        keys: [{ f: 'feature-test-0', ks: ['key_test-1', 'key_test-2'] }]
+      }.to_json
+
+      body_expect2 = {
+        keys: [{ f: 'feature-test-1', ks: ['key_test-1', 'key_test-2'] }]
       }.to_json
 
       stub_request(:post, post_url).with(body: body_expect).to_return(status: 200, body: '')
+      stub_request(:post, post_url).with(body: body_expect2).to_return(status: 200, body: '')
 
       cache = Concurrent::Hash.new
-      config.unique_keys_cache_max_size = 2
       config.unique_keys_bulk_size = 2
       tracker = subject.new(config, filter_adapter, sender_adapter, cache)
 
@@ -36,36 +40,61 @@
       expect(tracker.track("feature-test-#{i}", 'key_test-2')).to eq(true)
     end
 
+      expect(a_request(:post, post_url).with(body: body_expect2)).to have_been_made
       expect(a_request(:post, post_url).with(body: body_expect)).to have_been_made
 
       cache.clear
     end
 
-    it 'track - full cache and send 2 bulks' do
+    it 'track - full cache and send 6 bulks' do
       post_url = 'https://telemetry.split.io/api/v1/keys/ss'
       body_expect1 = {
-        keys: [{ f: 'feature-test-0', ks: ['key-1', 'key-2'] }, { f: 'feature-test-2', ks: ['key-1', 'key-2'] }]
+        keys: [{ f: 'feature-test-0', ks: ['key-1', 'key-2'] }]
       }.to_json
 
       body_expect2 = {
-        keys: [{ f: 'feature-test-1', ks: ['key-1', 'key-2'] }, { f: 'feature-test-3', ks: ['key-1'] }]
+        keys: [{ f: 'feature-test-0', ks: ['key-3'] }, { f: 'feature-test-1', ks: ['key-1'] }]
+      }.to_json
+
+      body_expect3 = {
+        keys: [{ f: 'feature-test-1', ks: ['key-2', 'key-3'] }]
+      }.to_json
+
+      body_expect4 = {
+        keys: [{ f: 'feature-test-2', ks: ['key-1', 'key-2'] }]
+      }.to_json
+
+      body_expect5 = {
+        keys: [{ f: 'feature-test-2', ks: ['key-3'] }, { f: 'feature-test-3', ks: ['key-1'] }]
+      }.to_json
+
+      body_expect6 = {
+        keys: [{ f: 'feature-test-3', ks: ['key-2', 'key-3'] }]
       }.to_json
 
       stub_request(:post, post_url).with(body: body_expect1).to_return(status: 200, body: '')
       stub_request(:post, post_url).with(body: body_expect2).to_return(status: 200, body: '')
+      stub_request(:post, post_url).with(body: body_expect3).to_return(status: 200, body: '')
+      stub_request(:post, post_url).with(body: body_expect4).to_return(status: 200, body: '')
+      stub_request(:post, post_url).with(body: body_expect5).to_return(status: 200, body: '')
+      stub_request(:post, post_url).with(body: body_expect6).to_return(status: 200, body: '')
 
       cache = Concurrent::Hash.new
-      config.unique_keys_cache_max_size = 4
       config.unique_keys_bulk_size = 2
       tracker = subject.new(config, filter_adapter, sender_adapter, cache)
 
       4.times do |i|
         expect(tracker.track("feature-test-#{i}", 'key-1')).to eq(true)
         expect(tracker.track("feature-test-#{i}", 'key-2')).to eq(true)
+        expect(tracker.track("feature-test-#{i}", 'key-3')).to eq(true)
       end
 
       expect(a_request(:post, post_url).with(body: body_expect1)).to have_been_made
       expect(a_request(:post, post_url).with(body: body_expect2)).to have_been_made
+      expect(a_request(:post, post_url).with(body: 
body_expect3)).to have_been_made
+      expect(a_request(:post, post_url).with(body: body_expect4)).to have_been_made
+      expect(a_request(:post, post_url).with(body: body_expect5)).to have_been_made
+      expect(a_request(:post, post_url).with(body: body_expect6)).to have_been_made
 
       cache.clear
     end
@@ -74,9 +103,8 @@
   context 'with sender_adapter_test' do
     let(:sender_adapter_test) { MemoryUniqueKeysSenderTest.new }
 
-    it 'track - should add elemets to cache' do
+    it 'track - should trigger send when bulk size reached and add elements to cache' do
       cache = Concurrent::Hash.new
-      config.unique_keys_cache_max_size = 5
       config.unique_keys_bulk_size = 5
       tracker = subject.new(config, filter_adapter, sender_adapter_test, cache)
 
@@ -85,24 +113,26 @@
       expect(tracker.track('feature_name_test', 'key_test-1')).to eq(true)
       expect(tracker.track('feature_name_test', 'key_test-2')).to eq(true)
       expect(tracker.track('other_test', 'key_test-2')).to eq(true)
-      expect(tracker.track('other_test', 'key_test-35')).to eq(true)
 
-      expect(cache.size).to eq(2)
+      expect(tracker.instance_variable_get(:@keys_size)).to eq(4)
+
       expect(cache['feature_name_test'].include?('key_test')).to eq(true)
       expect(cache['feature_name_test'].include?('key_test-1')).to eq(true)
       expect(cache['feature_name_test'].include?('key_test-2')).to eq(true)
       expect(cache['feature_name_test'].include?('key_test-35')).to eq(false)
 
       expect(cache['other_test'].include?('key_test-2')).to eq(true)
-      expect(cache['other_test'].include?('key_test-35')).to eq(true)
       expect(cache['other_test'].include?('key_test-1')).to eq(false)
 
+      expect(tracker.track('other_test', 'key_test-35')).to eq(true)
+      expect(cache.size).to eq(0)
+      expect(tracker.instance_variable_get(:@keys_size)).to eq(0)
+
       cache.clear
     end
 
     it 'track - full cache and send bulk' do
       cache = Concurrent::Hash.new
-      config.unique_keys_cache_max_size = 10
       config.unique_keys_bulk_size = 5
       tracker = subject.new(config, filter_adapter, sender_adapter_test, cache)
 
diff --git a/spec/engine/impressions/redis_unique_keys_tracker_spec.rb b/spec/engine/impressions/redis_unique_keys_tracker_spec.rb
index 9ad42288..ac665803 100644
--- a/spec/engine/impressions/redis_unique_keys_tracker_spec.rb
+++ b/spec/engine/impressions/redis_unique_keys_tracker_spec.rb
@@ -26,7 +26,6 @@
       key = "#{config.redis_namespace}.uniquekeys"
 
       cache = Concurrent::Hash.new
-      config.unique_keys_cache_max_size = 20
       config.unique_keys_bulk_size = 2
       tracker = subject.new(config, filter_adapter, sender_adapter, cache)
 

From f7bc08de9f2f0b0e89f977787fed60edb912b19c Mon Sep 17 00:00:00 2001
From: Bilal Al-Shahwany
Date: Thu, 14 Aug 2025 12:32:42 -0700
Subject: [PATCH 2/3] polish

---
 lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb b/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb
index 77639fd3..fc0aae2c 100644
--- a/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb
+++ b/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb
@@ -106,10 +106,11 @@ def check_keys_and_split_to_bulks(unique)
             sub_bulks.each do |sub_bulk|
               unique_updated.add({ key: sub_bulk })
             end
+            break
+
           end
           unique_updated.add({ key: value })
         end
-        return [unique] if unique_updated == {}
 
         unique_updated
       end

From e3104bfa1a8587366c22232440fb0b5e3c75e4af Mon Sep 17 00:00:00 2001
From: Bilal Al-Shahwany
Date: Wed, 20 Aug 2025 14:36:53 -0700
Subject: [PATCH 3/3] Changed hash to array to allow multiple feature flag
 records

---
 
.../engine/impressions/unique_keys_tracker.rb | 70 +++++++++++++------ lib/splitclient-rb/utilitites.rb | 16 ++--- .../memory_unique_keys_tracker_spec.rb | 21 ++++++ spec/splitclient/utilities_spec.rb | 33 ++------- 4 files changed, 79 insertions(+), 61 deletions(-) diff --git a/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb b/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb index fc0aae2c..ea5dfd65 100644 --- a/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb +++ b/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb @@ -71,46 +71,70 @@ def add_or_update(feature_name, key) end end + def clear_cache + uniques = @cache.clone + keys_size = @keys_size + @cache.clear + @keys_size = 0 + + [uniques, keys_size] + end + def send_bulk_data @semaphore.synchronize do return if @cache.empty? - uniques = @cache.clone - keys_size = @keys_size - @cache.clear - @keys_size = 0 - + uniques, keys_size = clear_cache if keys_size <= @max_bulk_size @sender_adapter.record_uniques_key(uniques) return - end - - bulks = [] - uniques.each do |unique| - bulks += check_keys_and_split_to_bulks(unique) - end - bulks.each do |b| - @sender_adapter.record_uniques_key(b) end + bulks = flatten_bulks(uniques) + bulks_to_post = group_bulks_by_max_size(bulks) + @sender_adapter.record_uniques_key(bulks_to_post) end rescue StandardError => e @config.log_found_exception(__method__.to_s, e) end - def check_keys_and_split_to_bulks(unique) - unique_updated = [] - unique.each do |_, value| - if value.size > @max_bulk_size - sub_bulks = SplitIoClient::Utilities.split_bulk_to_send(value, value.size / @max_bulk_size) - sub_bulks.each do |sub_bulk| - unique_updated.add({ key: sub_bulk }) - end - break + def group_bulks_by_max_size(bulks) + current_size = 0 + bulks_to_post = Concurrent::Hash.new + bulks.each do |bulk| + key, value = bulk.first + if (value.size + current_size) > @max_bulk_size + @sender_adapter.record_uniques_key(bulks_to_post) + bulks_to_post = Concurrent::Hash.new + current_size = 0 + end + bulks_to_post[key] = value + current_size += value.size + end + + bulks_to_post + end + + def flatten_bulks(uniques) + bulks = [] + uniques.each_key do |unique_key| + bulks += check_keys_and_split_to_bulks(uniques[unique_key], unique_key) + end + bulks + end + + def check_keys_and_split_to_bulks(value, key) + unique_updated = [] + if value.size > @max_bulk_size + sub_bulks = SplitIoClient::Utilities.split_bulk_to_send(value, @max_bulk_size) + sub_bulks.each do |sub_bulk| + unique_updated << { key => sub_bulk.to_set } end - unique_updated.add({ key: value }) + return unique_updated + end + unique_updated << { key => value } unique_updated end diff --git a/lib/splitclient-rb/utilitites.rb b/lib/splitclient-rb/utilitites.rb index 03ca56cf..1bcea51e 100644 --- a/lib/splitclient-rb/utilitites.rb +++ b/lib/splitclient-rb/utilitites.rb @@ -38,16 +38,12 @@ def randomize_interval(interval) interval * random_factor end - def split_bulk_to_send(hash, divisions) - count = 0 - - hash.each_with_object([]) do |key_value, final| - final[count % divisions] ||= {} - final[count % divisions][key_value[0]] = key_value[1] - count += 1 - end - rescue StandardError - [] + def split_bulk_to_send(items, divisions) + to_return = [] + items.to_a.each_slice(divisions) {|bulk| + to_return.push(bulk.to_set) + } + to_return end end end diff --git a/spec/engine/impressions/memory_unique_keys_tracker_spec.rb b/spec/engine/impressions/memory_unique_keys_tracker_spec.rb index 597c6ff1..a6e6fa6e 100644 --- 
a/spec/engine/impressions/memory_unique_keys_tracker_spec.rb
+++ b/spec/engine/impressions/memory_unique_keys_tracker_spec.rb
@@ -146,5 +146,26 @@
       cache.clear
     end
+
+    it 'track - split chunks if above limit' do
+      cache = Concurrent::Hash.new
+      config.unique_keys_bulk_size = 1000
+      tracker = subject.new(config, filter_adapter, sender_adapter_test, cache)
+
+      10.times { |i| expect(tracker.track("feature-test-#{i}", 'key_test')).to eq(true) }
+      5.times { |i| expect(tracker.track("feature-test-1", "key_test-#{i}")).to eq(true) }
+
+      tracker.instance_variable_set(:@max_bulk_size, 5)
+      tracker.send(:send_bulk_data)
+
+      result = sender_adapter_test.bulks
+      expect(result[0].size).to eq(1)
+      expect(result[1].size).to eq(1)
+      expect(result[1]["feature-test-1"].size).to eq(5)
+      expect(result[2].size).to eq(5)
+      expect(result[3].size).to eq(4)
+
+      cache.clear
+    end
   end
 end
diff --git a/spec/splitclient/utilities_spec.rb b/spec/splitclient/utilities_spec.rb
index db1a53f4..631c01e0 100644
--- a/spec/splitclient/utilities_spec.rb
+++ b/spec/splitclient/utilities_spec.rb
@@ -19,37 +19,13 @@
   end
 
   it 'split bulk of data - split equally' do
-    hash = {}
+    items = Set['feature', 'feature-1', 'feature-2', 'feature-3', 'feature-4', 'feature-5', 'feature-6']
 
-    i = 1
-    while i <= 6
-      hash["mauro-#{i}"] = Set.new(['feature', 'feature-1'])
-      i += 1
-    end
-
-    result = SplitIoClient::Utilities.split_bulk_to_send(hash, 3)
+    result = SplitIoClient::Utilities.split_bulk_to_send(items, 3)
 
     expect(result.size).to eq 3
-    expect(result[0].size).to eq 2
-    expect(result[1].size).to eq 2
-    expect(result[2].size).to eq 2
-  end
-
-  it 'split bulk of data - split in 4 bulks' do
-    hash = {}
-
-    i = 1
-    while i <= 6
-      hash["mauro-#{i}"] = 'feature-test'
-      i += 1
-    end
-
-    result = SplitIoClient::Utilities.split_bulk_to_send(hash, 4)
-
-    expect(result.size).to eq 4
-    expect(result[0].size).to eq 2
-    expect(result[1].size).to eq 2
+    expect(result[0].size).to eq 3
+    expect(result[1].size).to eq 3
     expect(result[2].size).to eq 1
-    expect(result[3].size).to eq 1
   end
 end
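
Reviewer note (illustration only, not part of the patches): after PATCH 1 a
send is triggered by the running @keys_size counter reaching
unique_keys_bulk_size instead of by cache size, and after PATCH 3 the flush
path is: clone and clear the cache, flatten it into one-feature bulks
(chunking any feature whose key Set exceeds @max_bulk_size), then pack bulks
back together until the running key count would pass the limit. Below is a
minimal standalone sketch of that flow, assuming a plain Hash/Set stand-in for
the SDK cache and a fixed MAX_BULK_SIZE; the names and data are hypothetical,
only the algorithm mirrors the diff:

    require 'set'

    MAX_BULK_SIZE = 5 # stand-in for config.unique_keys_bulk_size

    # Mirror of Utilities.split_bulk_to_send after PATCH 3: the second
    # argument is a slice size, so a Set becomes sub-Sets of at most
    # that many keys.
    def split_bulk_to_send(items, size)
      items.to_a.each_slice(size).map(&:to_set)
    end

    # Mirror of flatten_bulks + check_keys_and_split_to_bulks: one
    # { feature => Set } hash per bulk, oversized Sets chunked first.
    def flatten_bulks(uniques)
      uniques.flat_map do |feature, keys|
        if keys.size > MAX_BULK_SIZE
          split_bulk_to_send(keys, MAX_BULK_SIZE).map { |sub| { feature => sub } }
        else
          [{ feature => keys }]
        end
      end
    end

    # Like group_bulks_by_max_size, but returns every packed group
    # instead of posting full groups through the sender as it goes.
    def group_bulks_by_max_size(bulks)
      groups = []
      current = {}
      size = 0
      bulks.each do |bulk|
        feature, keys = bulk.first
        # Start a new group when this bulk would push it past the limit.
        if size + keys.size > MAX_BULK_SIZE && !current.empty?
          groups << current
          current = {}
          size = 0
        end
        current[feature] = keys
        size += keys.size
      end
      groups << current unless current.empty?
      groups
    end

    cache = {
      'feature-a' => Set['k1', 'k2', 'k3', 'k4', 'k5', 'k6'], # oversized
      'feature-b' => Set['k1'],
      'feature-c' => Set['k1', 'k2']
    }
    group_bulks_by_max_size(flatten_bulks(cache)).each { |g| p g }
    # {"feature-a"=>#<Set: {"k1", "k2", "k3", "k4", "k5"}>}
    # {"feature-a"=>#<Set: {"k6"}>, "feature-b"=>#<Set: {"k1"}>, "feature-c"=>#<Set: {"k1", "k2"}>}

This is the same shape the reworked memory tracker spec asserts on: the
oversized feature fills one post by itself, and its leftover chunk is packed
together with the small features.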
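
A second note on the Utilities change: PATCH 3 silently repurposes
split_bulk_to_send's second parameter from "number of divisions" to "keys per
slice", which is why the reworked spec expects sizes 3, 3 and 1 for seven
items. A quick check of the new behavior (same each_slice idea as above; the
variable names here are illustrative):

    require 'set'

    items = Set['feature', 'feature-1', 'feature-2', 'feature-3',
                'feature-4', 'feature-5', 'feature-6']

    # each_slice(3) yields chunks of 3 items, so 7 items split as 3, 3, 1.
    slices = items.to_a.each_slice(3).map(&:to_set)
    p slices.map(&:size) # => [3, 3, 1]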