Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions lib/splitclient-rb/cache/fetchers/split_fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ def call
splits_thread
end

def fetch_splits(fetch_options = { cache_control_headers: false, till: nil, till_rbs: nil })
def fetch_splits(fetch_options = { cache_control_headers: false, till: nil })
@semaphore.synchronize do
data = splits_since(@splits_repository.get_change_number, @rule_based_segments_repository.get_change_number, fetch_options)

SplitIoClient::Helpers::RepositoryHelper.update_feature_flag_repository(@splits_repository, data[:ff][:d], data[:ff][:t], @config)
SplitIoClient::Helpers::RepositoryHelper.update_rule_based_segment_repository(@rule_based_segments_repository, data[:rbs][:d], data[:rbs][:t], @config)
@splits_repository.set_segment_names(data[:segment_names])
Expand Down Expand Up @@ -57,7 +56,7 @@ def splits_thread
end
end

def splits_since(since, since_rbs, fetch_options = { cache_control_headers: false, till: nil, till_rbs: nil })
def splits_since(since, since_rbs, fetch_options = { cache_control_headers: false, till: nil })
splits_api.since(since, since_rbs, fetch_options)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def clear
end

def contains?(segment_names)
return false if rule_based_segment_names.empty?
return set(segment_names).subset?(rule_based_segment_names)
end

Expand Down
8 changes: 3 additions & 5 deletions lib/splitclient-rb/engine/api/splits.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ def initialize(api_key, config, telemetry_runtime_producer)
@flag_sets_filter = @config.flag_sets_filter
end

def since(since, since_rbs, fetch_options = { cache_control_headers: false, till: nil, till_rbs: nil, sets: nil})
def since(since, since_rbs, fetch_options = { cache_control_headers: false, till: nil, sets: nil})
start = Time.now

params = { s: SplitIoClient::Spec::FeatureFlags::SPEC_VERSION, since: since, rbSince: since_rbs }
params[:sets] = @flag_sets_filter.join(",") unless @flag_sets_filter.empty?
params[:till] = fetch_options[:till] unless fetch_options[:till].nil?
params[:till_rbs] = fetch_options[:till_rbs] unless fetch_options[:till_rbs].nil?
@config.logger.debug("Fetching from splitChanges with #{params}: ")
response = get_api("#{@config.base_uri}/splitChanges", @api_key, params, fetch_options[:cache_control_headers])
if response.status == 414
Expand Down Expand Up @@ -57,14 +56,13 @@ def since(since, since_rbs, fetch_options = { cache_control_headers: false, till

def objects_with_segment_names(objects_json)
parsed_objects = JSON.parse(objects_json, symbolize_names: true)

parsed_objects[:segment_names] =
parsed_objects[:ff][:d].each_with_object(Set.new) do |split, splits|
splits << Helpers::Util.segment_names_by_object(split)
splits << Helpers::Util.segment_names_by_object(split, "IN_SEGMENT")
end.flatten
if not parsed_objects[:ff][:rbs].nil?
parsed_objects[:segment_names].merge parsed_objects[:ff][:rbs].each_with_object(Set.new) do |rule_based_segment, rule_based_segments|
rule_based_segments << Helpers::Util.segment_names_by_object(rule_based_segment)
rule_based_segments << Helpers::Util.segment_names_by_object(rule_based_segment, "IN_SEGMENT")
end.flatten
end
parsed_objects
Expand Down
30 changes: 23 additions & 7 deletions lib/splitclient-rb/engine/synchronizer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def initialize(
)
@splits_repository = repositories[:splits]
@segments_repository = repositories[:segments]
@rule_based_segments_repository = repositories[:rule_based_segments]
@impressions_repository = repositories[:impressions]
@events_repository = repositories[:events]
@config = config
Expand Down Expand Up @@ -63,12 +64,12 @@ def stop_periodic_fetch
@segment_fetcher.stop_segments_thread
end

def fetch_splits(target_change_number)
return if target_change_number <= @splits_repository.get_change_number.to_i
def fetch_splits(target_change_number, rbs_target_change_number)
return if check_exit_conditions(target_change_number, rbs_target_change_number)

fetch_options = { cache_control_headers: true, till: nil }

result = attempt_splits_sync(target_change_number,
result = attempt_splits_sync(target_change_number, rbs_target_change_number,
fetch_options,
@config.on_demand_fetch_max_retries,
@config.on_demand_fetch_retry_delay_seconds,
Expand All @@ -82,8 +83,13 @@ def fetch_splits(target_change_number)
return
end

fetch_options[:till] = target_change_number
result = attempt_splits_sync(target_change_number,
if target_change_number != 0
fetch_options[:till] = target_change_number
else
fetch_options[:till] = rbs_target_change_number
end

result = attempt_splits_sync(target_change_number, rbs_target_change_number,
fetch_options,
ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
nil,
Expand Down Expand Up @@ -156,7 +162,7 @@ def attempt_segment_sync(name, target_cn, fetch_options, max_retries, retry_dela
end
end

def attempt_splits_sync(target_cn, fetch_options, max_retries, retry_delay_seconds, with_backoff)
def attempt_splits_sync(target_cn, rbs_target_cn, fetch_options, max_retries, retry_delay_seconds, with_backoff)
remaining_attempts = max_retries
@splits_sync_backoff.reset

Expand All @@ -165,7 +171,7 @@ def attempt_splits_sync(target_cn, fetch_options, max_retries, retry_delay_secon

result = @split_fetcher.fetch_splits(fetch_options)

return sync_result(true, remaining_attempts, result[:segment_names]) if target_cn <= @splits_repository.get_change_number
return sync_result(true, remaining_attempts, result[:segment_names]) if check_exit_conditions(target_cn, rbs_target_cn)
return sync_result(false, remaining_attempts, result[:segment_names]) if remaining_attempts <= 0

delay = with_backoff ? @splits_sync_backoff.interval : retry_delay_seconds
Expand Down Expand Up @@ -206,6 +212,16 @@ def sync_splits_and_segments

splits_result[:success] && @segment_fetcher.fetch_segments
end

def check_exit_conditions(target_change_number, rbs_target_change_number)
return true if rbs_target_change_number == 0 and target_change_number == 0

return target_change_number <= @splits_repository.get_change_number.to_i if rbs_target_change_number == 0

return rbs_target_change_number <= @rule_based_segments_repository.get_change_number.to_i if target_change_number == 0

return (target_change_number <= @splits_repository.get_change_number.to_i and rbs_target_change_number <= @rule_based_segments_repository.get_change_number.to_i)
end
end
end
end
5 changes: 2 additions & 3 deletions lib/splitclient-rb/helpers/util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
module SplitIoClient
module Helpers
class Util
def self.segment_names_by_object(object)
def self.segment_names_by_object(object, matcher_type)
object[:conditions].each_with_object(Set.new) do |condition, names|
condition[:matcherGroup][:matchers].each do |matcher|
next if matcher[:userDefinedSegmentMatcherData].nil?

next if matcher[:userDefinedSegmentMatcherData].nil? or matcher[:matcherType] != matcher_type
names << matcher[:userDefinedSegmentMatcherData][:segmentName]
end
end
Expand Down
6 changes: 4 additions & 2 deletions lib/splitclient-rb/split_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def repositories
segments: @segments_repository,
impressions: @impressions_repository,
events: @events_repository,
rule_based_segments: @rule_based_segment_repository
}
end

Expand All @@ -178,7 +179,7 @@ def build_telemetry_components
end

def build_fetchers
@split_fetcher = SplitFetcher.new(@splits_repository, @api_key, @config, @runtime_producer)
@split_fetcher = SplitFetcher.new(@splits_repository, @rule_based_segments_repository, @api_key, @config, @runtime_producer)
@segment_fetcher = SegmentFetcher.new(@segments_repository, @api_key, @config, @runtime_producer)
end

Expand All @@ -198,7 +199,7 @@ def build_synchronizer

def build_streaming_components
@push_status_queue = Queue.new
splits_worker = SSE::Workers::SplitsWorker.new(@synchronizer, @config, @splits_repository, @runtime_producer, @segment_fetcher)
splits_worker = SSE::Workers::SplitsWorker.new(@synchronizer, @config, @splits_repository, @runtime_producer, @segment_fetcher, @rule_based_segment_repository)
segments_worker = SSE::Workers::SegmentsWorker.new(@synchronizer, @config, @segments_repository)
notification_manager_keeper = SSE::NotificationManagerKeeper.new(@config, @runtime_producer, @push_status_queue)
notification_processor = SSE::NotificationProcessor.new(@config, splits_worker, segments_worker)
Expand All @@ -220,6 +221,7 @@ def build_repositories
end
@splits_repository = SplitsRepository.new(@config, @flag_sets_repository, @flag_sets_filter)
@segments_repository = SegmentsRepository.new(@config)
@rule_based_segment_repository = RuleBasedSegmentRepository.new(@config)
@impressions_repository = ImpressionsRepository.new(@config)
@events_repository = EventsRepository.new(@config, @api_key, @runtime_producer)
end
Expand Down
1 change: 1 addition & 0 deletions lib/splitclient-rb/sse/event_source/event_types.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class EventTypes
SPLIT_KILL = 'SPLIT_KILL'
SEGMENT_UPDATE = 'SEGMENT_UPDATE'
CONTROL = 'CONTROL'
RB_SEGMENT_UPDATE = 'RB_SEGMENT_UPDATE'
end
end
end
Expand Down
4 changes: 3 additions & 1 deletion lib/splitclient-rb/sse/notification_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ def process(incoming_notification)
case incoming_notification.data['type']
when SSE::EventSource::EventTypes::SPLIT_UPDATE
process_split_update(incoming_notification)
when SSE::EventSource::EventTypes::RB_SEGMENT_UPDATE
process_split_update(incoming_notification)
when SSE::EventSource::EventTypes::SPLIT_KILL
process_split_kill(incoming_notification)
when SSE::EventSource::EventTypes::SEGMENT_UPDATE
Expand All @@ -25,7 +27,7 @@ def process(incoming_notification)
private

def process_split_update(notification)
@config.logger.debug("SPLIT UPDATE notification received: #{notification}") if @config.debug_enabled
@config.logger.debug("#{notification.type} notification received: #{notification}") if @config.debug_enabled
@splits_worker.add_to_queue(notification)
end

Expand Down
59 changes: 47 additions & 12 deletions lib/splitclient-rb/sse/workers/splits_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ module SplitIoClient
module SSE
module Workers
class SplitsWorker
def initialize(synchronizer, config, feature_flags_repository, telemetry_runtime_producer, segment_fetcher)
def initialize(synchronizer, config, feature_flags_repository, telemetry_runtime_producer, segment_fetcher, rule_based_segment_repository)
@synchronizer = synchronizer
@config = config
@feature_flags_repository = feature_flags_repository
@queue = Queue.new
@running = Concurrent::AtomicBoolean.new(false)
@telemetry_runtime_producer = telemetry_runtime_producer
@segment_fetcher = segment_fetcher
@rule_based_segment_repository = rule_based_segment_repository
end

def start
Expand Down Expand Up @@ -54,7 +55,10 @@ def perform
case notification.data['type']
when SSE::EventSource::EventTypes::SPLIT_UPDATE
success = update_feature_flag(notification)
@synchronizer.fetch_splits(notification.data['changeNumber']) unless success
@synchronizer.fetch_splits(notification.data['changeNumber'], 0) unless success
when SSE::EventSource::EventTypes::RB_SEGMENT_UPDATE
success = update_rule_based_segment(notification)
@synchronizer.fetch_splits(0, notification.data['changeNumber']) unless success
when SSE::EventSource::EventTypes::SPLIT_KILL
kill_feature_flag(notification)
end
Expand All @@ -64,12 +68,14 @@ def perform
def update_feature_flag(notification)
return true if @feature_flags_repository.get_change_number.to_i >= notification.data['changeNumber']
return false unless !notification.data['d'].nil? && @feature_flags_repository.get_change_number == notification.data['pcn']

new_split = return_split_from_json(notification)
new_split = return_object_from_json(notification)
SplitIoClient::Helpers::RepositoryHelper.update_feature_flag_repository(@feature_flags_repository,
[new_split],
notification.data['changeNumber'], @config)
fetch_segments_if_not_exists(new_split)
fetch_segments_if_not_exists(Helpers::Util.segment_names_by_object(new_split, "IN_SEGMENT"), @feature_flags_repository)
if fetch_rule_based_segments_if_not_exists(Helpers::Util.segment_names_by_object(new_split, "IN_RULE_BASED_SEGMENT"), notification.data['changeNumber'])
return true
end

@telemetry_runtime_producer.record_updates_from_sse(Telemetry::Domain::Constants::SPLITS)

Expand All @@ -80,6 +86,26 @@ def update_feature_flag(notification)
false
end

def update_rule_based_segment(notification)
return true if @rule_based_segment_repository.get_change_number.to_i >= notification.data['changeNumber']
return false unless !notification.data['d'].nil? && @rule_based_segment_repository.get_change_number == notification.data['pcn']

new_rb_segment = return_object_from_json(notification)
SplitIoClient::Helpers::RepositoryHelper.update_rule_based_segment_repository(@rule_based_segment_repository,
[new_rb_segment],
notification.data['changeNumber'], @config)
fetch_segments_if_not_exists(Helpers::Util.segment_names_by_object(new_rb_segment, "IN_SEGMENT"), @rule_based_segment_repository)

# TODO: enable when telemetry spec is added
# @telemetry_runtime_producer.record_updates_from_sse(Telemetry::Domain::Constants::SPLITS)

true
rescue StandardError => e
@config.logger.debug("Failed to update Split: #{e.inspect}") if @config.debug_enabled

false
end

def kill_feature_flag(notification)
return if @feature_flags_repository.get_change_number.to_i > notification.data['changeNumber']

Expand All @@ -89,21 +115,30 @@ def kill_feature_flag(notification)
notification.data['splitName'],
notification.data['defaultTreatment']
)
@synchronizer.fetch_splits(notification.data['changeNumber'])
@synchronizer.fetch_splits(notification.data['changeNumber'], 0)
end

def return_split_from_json(notification)
split_json = Helpers::DecryptionHelper.get_encoded_definition(notification.data['c'], notification.data['d'])
JSON.parse(split_json, symbolize_names: true)
def return_object_from_json(notification)
object_json = Helpers::DecryptionHelper.get_encoded_definition(notification.data['c'], notification.data['d'])
JSON.parse(object_json, symbolize_names: true)
end

def fetch_segments_if_not_exists(feature_flag)
segment_names = Helpers::Util.segment_names_by_feature_flag(feature_flag)
def fetch_segments_if_not_exists(segment_names, object_repository)

return if segment_names.nil?

@feature_flags_repository.set_segment_names(segment_names)
object_repository.set_segment_names(segment_names)
@segment_fetcher.fetch_segments_if_not_exists(segment_names)
end

def fetch_rule_based_segments_if_not_exists(segment_names, change_number)
if segment_names.nil? or segment_names.empty? or @rule_based_segment_repository.contains?(segment_names.to_a)
return false
end
@synchronizer.fetch_splits(0, change_number)

true
end
end
end
end
Expand Down
8 changes: 4 additions & 4 deletions spec/engine/api/splits_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
})
.to_return(status: 200, body: splits)

fetch_options = { cache_control_headers: false, till: nil, till_rbs: nil, sets: ['set_1','set_2'] }
fetch_options = { cache_control_headers: false, till: nil, sets: ['set_1','set_2'] }
returned_splits = splits_api.since(-1, -1, fetch_options)
expect(returned_splits[:segment_names]).to eq(Set.new(%w[demo employees]))

Expand All @@ -72,7 +72,7 @@
})
.to_return(status: 414, body: splits)

fetch_options = { cache_control_headers: false, till: nil, till_rbs: nil, sets: ['set_1','set_2'] }
fetch_options = { cache_control_headers: false, till: nil, sets: ['set_1','set_2'] }
captured = 0
begin
returned_splits = splits_api.since(-1, -1, fetch_options)
Expand Down Expand Up @@ -126,7 +126,7 @@
})
.to_return(status: 200, body: splits)

fetch_options = { cache_control_headers: false, till: 123_123, till_rbs: nil, sets: nil }
fetch_options = { cache_control_headers: false, till: 123_123, sets: nil }
returned_splits = splits_api.since(-1, -1, fetch_options)
expect(returned_splits[:segment_names]).to eq(Set.new(%w[demo employees]))

Expand All @@ -147,7 +147,7 @@
})
.to_return(status: 200, body: splits)

fetch_options = { cache_control_headers: true, till: nil, till_rbs: nil, sets: nil }
fetch_options = { cache_control_headers: true, till: nil, sets: nil }
returned_splits = splits_api.since(-1, -1, fetch_options)
expect(returned_splits[:segment_names]).to eq(Set.new(%w[demo employees]))

Expand Down
Loading