Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 58 additions & 11 deletions lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ def initialize(config,
@filter_adapter = filter_adapter
@sender_adapter = sender_adapter
@cache = cache
@cache_max_size = config.unique_keys_cache_max_size
@max_bulk_size = config.unique_keys_bulk_size
@semaphore = Mutex.new
@keys_size = 0
end

def call
Expand All @@ -30,8 +30,9 @@ def track(feature_name, key)
@filter_adapter.add(feature_name, key)

add_or_update(feature_name, key)
@keys_size += 1

send_bulk_data if @cache.size >= @cache_max_size
send_bulk_data if @keys_size >= @max_bulk_size

true
rescue StandardError => e
Expand Down Expand Up @@ -70,27 +71,73 @@ def add_or_update(feature_name, key)
end
end

# Snapshots the current uniques cache and its tracked key count, then
# resets both. Returns [cache_copy, tracked_key_count].
#
# NOTE(review): not synchronized itself — callers are expected to hold
# @semaphore (send_bulk_data invokes it inside synchronize).
def clear_cache
  snapshot = @cache.clone
  tracked_keys = @keys_size

  @cache.clear
  @keys_size = 0

  [snapshot, tracked_keys]
end

# Flushes the accumulated unique keys to the sender adapter.
#
# Under @semaphore, snapshots and clears the cache via clear_cache. When the
# tracked key count fits in a single bulk (@max_bulk_size) the snapshot is
# posted as one payload; otherwise it is flattened into per-feature chunks
# (flatten_bulks) and regrouped into payloads no larger than @max_bulk_size
# (group_bulks_by_max_size) before posting.
#
# Any StandardError is logged and swallowed so a flush failure never
# propagates to the tracking caller.
#
# (Reconstructed: the diff view had fused the old and new bodies of this
# method; this is the post-change version.)
def send_bulk_data
  @semaphore.synchronize do
    return if @cache.empty?

    uniques, keys_size = clear_cache
    if keys_size <= @max_bulk_size
      @sender_adapter.record_uniques_key(uniques)
      return
    end

    bulks = flatten_bulks(uniques)
    bulks_to_post = group_bulks_by_max_size(bulks)
    @sender_adapter.record_uniques_key(bulks_to_post)
  end
rescue StandardError => e
  @config.log_found_exception(__method__.to_s, e)
end

def group_bulks_by_max_size(bulks)
current_size = 0
bulks_to_post = Concurrent::Hash.new
bulks.each do |bulk|
key, value = bulk.first
if (value.size + current_size) > @max_bulk_size
@sender_adapter.record_uniques_key(bulks_to_post)
bulks_to_post = Concurrent::Hash.new
current_size = 0
end
bulks_to_post[key] = value
current_size += value.size
end

bulks_to_post
end

# Expands the {feature => keys} cache snapshot into a flat array of
# single-entry hashes, splitting any oversized key set into sub-bulks
# via check_keys_and_split_to_bulks.
def flatten_bulks(uniques)
  uniques.each_key.flat_map do |feature|
    check_keys_and_split_to_bulks(uniques[feature], feature)
  end
end

# Wraps a feature's key set into one-entry hashes ({key => set}). A set
# larger than @max_bulk_size is first split into chunks of at most
# @max_bulk_size keys, yielding one hash per chunk.
def check_keys_and_split_to_bulks(value, key)
  return [{ key => value }] if value.size <= @max_bulk_size

  SplitIoClient::Utilities
    .split_bulk_to_send(value, @max_bulk_size)
    .map { |chunk| { key => chunk.to_set } }
end
end
end
end
Expand Down
10 changes: 5 additions & 5 deletions lib/splitclient-rb/split_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def initialize(opts = {})
@telemetry_service_url = opts[:telemetry_service_url] || SplitConfig.default_telemetry_service_url

@unique_keys_refresh_rate = SplitConfig.default_unique_keys_refresh_rate(@cache_adapter)
@unique_keys_cache_max_size = SplitConfig.default_unique_keys_cache_max_size
# @unique_keys_cache_max_size = SplitConfig.default_unique_keys_cache_max_size
@unique_keys_bulk_size = SplitConfig.default_unique_keys_bulk_size(@cache_adapter)

@counter_refresh_rate = SplitConfig.default_counter_refresh_rate(@cache_adapter)
Expand Down Expand Up @@ -292,7 +292,7 @@ def initialize(opts = {})
attr_accessor :on_demand_fetch_max_retries

attr_accessor :unique_keys_refresh_rate
attr_accessor :unique_keys_cache_max_size
#attr_accessor :unique_keys_cache_max_size
attr_accessor :unique_keys_bulk_size

attr_accessor :counter_refresh_rate
Expand Down Expand Up @@ -498,9 +498,9 @@ def self.default_unique_keys_refresh_rate(adapter)
900
end

def self.default_unique_keys_cache_max_size
30000
end
# def self.default_unique_keys_cache_max_size
# 30000
# end

def self.default_unique_keys_bulk_size(adapter)
return 2000 if adapter == :redis
Expand Down
16 changes: 6 additions & 10 deletions lib/splitclient-rb/utilitites.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,12 @@ def randomize_interval(interval)
interval * random_factor
end

# Splits +items+ into an array of Sets, each holding at most +bulk_size+
# elements (the final chunk may be smaller). Returns [] for empty input.
#
# @param items [Enumerable] the elements to split
# @param bulk_size [Integer] maximum number of elements per chunk
# @return [Array<Set>] chunks in the original enumeration order
#
# (Reconstructed: the diff view had fused the old hash-partitioning
# definition with this new chunking definition; this is the post-change
# version, using each_slice + map instead of a manual push loop.)
def split_bulk_to_send(items, bulk_size)
  items.to_a.each_slice(bulk_size).map(&:to_set)
end
end
end
75 changes: 63 additions & 12 deletions spec/engine/impressions/memory_unique_keys_tracker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
it 'track - full cache and send bulk' do
post_url = 'https://telemetry.split.io/api/v1/keys/ss'
body_expect = {
keys: [{ f: 'feature-test-0', ks: ['key_test-1', 'key_test-2'] }, { f: 'feature-test-1', ks: ['key_test-1'] }]
keys: [{ f: 'feature-test-0', ks: ['key_test-1', 'key_test-2'] }]
}.to_json

body_expect2 = {
keys: [{ f: 'feature-test-1', ks: ['key_test-1', 'key_test-2'] }]
}.to_json

stub_request(:post, post_url).with(body: body_expect).to_return(status: 200, body: '')
stub_request(:post, post_url).with(body: body_expect2).to_return(status: 200, body: '')

cache = Concurrent::Hash.new
config.unique_keys_cache_max_size = 2
config.unique_keys_bulk_size = 2
tracker = subject.new(config, filter_adapter, sender_adapter, cache)

Expand All @@ -36,36 +40,61 @@
expect(tracker.track("feature-test-#{i}", 'key_test-2')).to eq(true)
end

expect(a_request(:post, post_url).with(body: body_expect2)).to have_been_made
expect(a_request(:post, post_url).with(body: body_expect)).to have_been_made

cache.clear
end

it 'track - full cache and send 2 bulks' do
it 'track - full cache and send 4 bulks' do
post_url = 'https://telemetry.split.io/api/v1/keys/ss'
body_expect1 = {
keys: [{ f: 'feature-test-0', ks: ['key-1', 'key-2'] }, { f: 'feature-test-2', ks: ['key-1', 'key-2'] }]
keys: [{ f: 'feature-test-0', ks: ['key-1', 'key-2'] }]
}.to_json

body_expect2 = {
keys: [{ f: 'feature-test-1', ks: ['key-1', 'key-2'] }, { f: 'feature-test-3', ks: ['key-1'] }]
keys: [{ f: 'feature-test-0', ks: ['key-3'] }, { f: 'feature-test-1', ks: ['key-1'] }]
}.to_json

body_expect3 = {
keys: [{ f: 'feature-test-1', ks: ['key-2', 'key-3'] }]
}.to_json

body_expect4 = {
keys: [{ f: 'feature-test-2', ks: ['key-1', 'key-2'] }]
}.to_json

body_expect5 = {
keys: [{ f: 'feature-test-2', ks: ['key-3'] }, { f: 'feature-test-3', ks: ['key-1'] }]
}.to_json

body_expect6 = {
keys: [{ f: 'feature-test-3', ks: ['key-2', 'key-3'] }]
}.to_json

stub_request(:post, post_url).with(body: body_expect1).to_return(status: 200, body: '')
stub_request(:post, post_url).with(body: body_expect2).to_return(status: 200, body: '')
stub_request(:post, post_url).with(body: body_expect3).to_return(status: 200, body: '')
stub_request(:post, post_url).with(body: body_expect4).to_return(status: 200, body: '')
stub_request(:post, post_url).with(body: body_expect5).to_return(status: 200, body: '')
stub_request(:post, post_url).with(body: body_expect6).to_return(status: 200, body: '')

cache = Concurrent::Hash.new
config.unique_keys_cache_max_size = 4
config.unique_keys_bulk_size = 2
tracker = subject.new(config, filter_adapter, sender_adapter, cache)

4.times do |i|
expect(tracker.track("feature-test-#{i}", 'key-1')).to eq(true)
expect(tracker.track("feature-test-#{i}", 'key-2')).to eq(true)
expect(tracker.track("feature-test-#{i}", 'key-3')).to eq(true)
end

expect(a_request(:post, post_url).with(body: body_expect1)).to have_been_made
expect(a_request(:post, post_url).with(body: body_expect2)).to have_been_made
expect(a_request(:post, post_url).with(body: body_expect3)).to have_been_made
expect(a_request(:post, post_url).with(body: body_expect4)).to have_been_made
expect(a_request(:post, post_url).with(body: body_expect5)).to have_been_made
expect(a_request(:post, post_url).with(body: body_expect6)).to have_been_made

cache.clear
end
Expand All @@ -74,9 +103,8 @@
context 'with sender_adapter_test' do
let(:sender_adapter_test) { MemoryUniqueKeysSenderTest.new }

it 'track - should add elemets to cache' do
it 'track - should trigger send when bulk size reached and add elements to cache' do
cache = Concurrent::Hash.new
config.unique_keys_cache_max_size = 5
config.unique_keys_bulk_size = 5
tracker = subject.new(config, filter_adapter, sender_adapter_test, cache)

Expand All @@ -85,24 +113,26 @@
expect(tracker.track('feature_name_test', 'key_test-1')).to eq(true)
expect(tracker.track('feature_name_test', 'key_test-2')).to eq(true)
expect(tracker.track('other_test', 'key_test-2')).to eq(true)
expect(tracker.track('other_test', 'key_test-35')).to eq(true)

expect(cache.size).to eq(2)
expect(tracker.instance_variable_get(:@keys_size)).to eq(4)

expect(cache['feature_name_test'].include?('key_test')).to eq(true)
expect(cache['feature_name_test'].include?('key_test-1')).to eq(true)
expect(cache['feature_name_test'].include?('key_test-2')).to eq(true)
expect(cache['feature_name_test'].include?('key_test-35')).to eq(false)

expect(cache['other_test'].include?('key_test-2')).to eq(true)
expect(cache['other_test'].include?('key_test-35')).to eq(true)
expect(cache['other_test'].include?('key_test-1')).to eq(false)

expect(tracker.track('other_test', 'key_test-35')).to eq(true)
expect(cache.size).to eq(0)
expect(tracker.instance_variable_get(:@keys_size)).to eq(0)

cache.clear
end

it 'track - full cache and send bulk' do
cache = Concurrent::Hash.new
config.unique_keys_cache_max_size = 10
config.unique_keys_bulk_size = 5
tracker = subject.new(config, filter_adapter, sender_adapter_test, cache)

Expand All @@ -116,5 +146,26 @@

cache.clear
end

it 'track - split chunks if above limit' do
cache = Concurrent::Hash.new
config.unique_keys_bulk_size = 1000
tracker = subject.new(config, filter_adapter, sender_adapter_test, cache)

10.times { |i| expect(tracker.track("feature-test-#{i}", 'key_test')).to eq(true) }
5.times { |i| expect(tracker.track("feature-test-1", "key_test-#{i}")).to eq(true) }

tracker.instance_variable_set(:@max_bulk_size, 5)
tracker.send(:send_bulk_data)

result = sender_adapter_test.bulks
expect(result[0].size).to eq(1)
expect(result[1].size).to eq(1)
expect(result[1]["feature-test-1"].size).to eq(5)
expect(result[2].size).to eq(5)
expect(result[3].size).to eq(4)

cache.clear
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
key = "#{config.redis_namespace}.uniquekeys"

cache = Concurrent::Hash.new
config.unique_keys_cache_max_size = 20
config.unique_keys_bulk_size = 2
tracker = subject.new(config, filter_adapter, sender_adapter, cache)

Expand Down
33 changes: 5 additions & 28 deletions spec/splitclient/utilities_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,14 @@
end

it 'split bulk of data - split equally' do
hash = {}
items = Set['feature', 'feature-1', 'feature-2', 'feature-3', 'feature-4', 'feature-5', 'feature-6']

i = 1
while i <= 6
hash["mauro-#{i}"] = Set.new(['feature', 'feature-1'])
i += 1
end

result = SplitIoClient::Utilities.split_bulk_to_send(hash, 3)
result = SplitIoClient::Utilities.split_bulk_to_send(items, 3)

puts result
expect(result.size).to eq 3
expect(result[0].size).to eq 2
expect(result[1].size).to eq 2
expect(result[2].size).to eq 2
end

it 'split bulk of data - split in 4 bulks' do
hash = {}

i = 1
while i <= 6
hash["mauro-#{i}"] = 'feature-test'
i += 1
end

result = SplitIoClient::Utilities.split_bulk_to_send(hash, 4)

expect(result.size).to eq 4
expect(result[0].size).to eq 2
expect(result[1].size).to eq 2
expect(result[0].size).to eq 3
expect(result[1].size).to eq 3
expect(result[2].size).to eq 1
expect(result[3].size).to eq 1
end
end