Skip to content

Commit e370d95

Browse files
authored
Merge pull request #550 from splitio/rbs-sse
Updated SSE and synchronizer classes
2 parents b72babe + 207532c commit e370d95

File tree

18 files changed

+269
-138
lines changed

18 files changed

+269
-138
lines changed

lib/splitclient-rb/cache/fetchers/split_fetcher.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ def call
2525
def fetch_splits(fetch_options = { cache_control_headers: false, till: nil })
2626
@semaphore.synchronize do
2727
data = splits_since(@splits_repository.get_change_number, @rule_based_segments_repository.get_change_number, fetch_options)
28-
2928
SplitIoClient::Helpers::RepositoryHelper.update_feature_flag_repository(@splits_repository, data[:ff][:d], data[:ff][:t], @config)
3029
SplitIoClient::Helpers::RepositoryHelper.update_rule_based_segment_repository(@rule_based_segments_repository, data[:rbs][:d], data[:rbs][:t], @config)
3130
@splits_repository.set_segment_names(data[:segment_names])

lib/splitclient-rb/cache/repositories/rule_based_segments_repository.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ def clear
8181
end
8282

8383
def contains?(segment_names)
84+
return false if rule_based_segment_names.empty?
8485
return set(segment_names).subset?(rule_based_segment_names)
8586
end
8687

lib/splitclient-rb/engine/api/splits.rb

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,13 @@ def since(since, since_rbs, fetch_options = { cache_control_headers: false, till
5656

5757
def objects_with_segment_names(objects_json)
5858
parsed_objects = JSON.parse(objects_json, symbolize_names: true)
59-
6059
parsed_objects[:segment_names] =
6160
parsed_objects[:ff][:d].each_with_object(Set.new) do |split, splits|
62-
splits << Helpers::Util.segment_names_by_object(split)
61+
splits << Helpers::Util.segment_names_by_object(split, "IN_SEGMENT")
6362
end.flatten
6463
if not parsed_objects[:ff][:rbs].nil?
6564
parsed_objects[:segment_names].merge parsed_objects[:ff][:rbs].each_with_object(Set.new) do |rule_based_segment, rule_based_segments|
66-
rule_based_segments << Helpers::Util.segment_names_by_object(rule_based_segment)
65+
rule_based_segments << Helpers::Util.segment_names_by_object(rule_based_segment, "IN_SEGMENT")
6766
end.flatten
6867
end
6968
parsed_objects

lib/splitclient-rb/engine/synchronizer.rb

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ def initialize(
1515
)
1616
@splits_repository = repositories[:splits]
1717
@segments_repository = repositories[:segments]
18+
@rule_based_segments_repository = repositories[:rule_based_segments]
1819
@impressions_repository = repositories[:impressions]
1920
@events_repository = repositories[:events]
2021
@config = config
@@ -63,12 +64,12 @@ def stop_periodic_fetch
6364
@segment_fetcher.stop_segments_thread
6465
end
6566

66-
def fetch_splits(target_change_number)
67-
return if target_change_number <= @splits_repository.get_change_number.to_i
67+
def fetch_splits(target_change_number, rbs_target_change_number)
68+
return if check_exit_conditions(target_change_number, rbs_target_change_number)
6869

6970
fetch_options = { cache_control_headers: true, till: nil }
7071

71-
result = attempt_splits_sync(target_change_number,
72+
result = attempt_splits_sync(target_change_number, rbs_target_change_number,
7273
fetch_options,
7374
@config.on_demand_fetch_max_retries,
7475
@config.on_demand_fetch_retry_delay_seconds,
@@ -82,8 +83,13 @@ def fetch_splits(target_change_number)
8283
return
8384
end
8485

85-
fetch_options[:till] = target_change_number
86-
result = attempt_splits_sync(target_change_number,
86+
if target_change_number != 0
87+
fetch_options[:till] = target_change_number
88+
else
89+
fetch_options[:till] = rbs_target_change_number
90+
end
91+
92+
result = attempt_splits_sync(target_change_number, rbs_target_change_number,
8793
fetch_options,
8894
ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
8995
nil,
@@ -156,7 +162,7 @@ def attempt_segment_sync(name, target_cn, fetch_options, max_retries, retry_dela
156162
end
157163
end
158164

159-
def attempt_splits_sync(target_cn, fetch_options, max_retries, retry_delay_seconds, with_backoff)
165+
def attempt_splits_sync(target_cn, rbs_target_cn, fetch_options, max_retries, retry_delay_seconds, with_backoff)
160166
remaining_attempts = max_retries
161167
@splits_sync_backoff.reset
162168

@@ -165,7 +171,7 @@ def attempt_splits_sync(target_cn, fetch_options, max_retries, retry_delay_secon
165171

166172
result = @split_fetcher.fetch_splits(fetch_options)
167173

168-
return sync_result(true, remaining_attempts, result[:segment_names]) if target_cn <= @splits_repository.get_change_number
174+
return sync_result(true, remaining_attempts, result[:segment_names]) if check_exit_conditions(target_cn, rbs_target_cn)
169175
return sync_result(false, remaining_attempts, result[:segment_names]) if remaining_attempts <= 0
170176

171177
delay = with_backoff ? @splits_sync_backoff.interval : retry_delay_seconds
@@ -206,6 +212,16 @@ def sync_splits_and_segments
206212

207213
splits_result[:success] && @segment_fetcher.fetch_segments
208214
end
215+
216+
def check_exit_conditions(target_change_number, rbs_target_change_number)
217+
return true if rbs_target_change_number == 0 and target_change_number == 0
218+
219+
return target_change_number <= @splits_repository.get_change_number.to_i if rbs_target_change_number == 0
220+
221+
return rbs_target_change_number <= @rule_based_segments_repository.get_change_number.to_i if target_change_number == 0
222+
223+
return (target_change_number <= @splits_repository.get_change_number.to_i and rbs_target_change_number <= @rule_based_segments_repository.get_change_number.to_i)
224+
end
209225
end
210226
end
211227
end

lib/splitclient-rb/helpers/util.rb

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@
33
module SplitIoClient
44
module Helpers
55
class Util
6-
def self.segment_names_by_object(object)
6+
def self.segment_names_by_object(object, matcher_type)
77
object[:conditions].each_with_object(Set.new) do |condition, names|
88
condition[:matcherGroup][:matchers].each do |matcher|
9-
next if matcher[:userDefinedSegmentMatcherData].nil?
10-
9+
next if matcher[:userDefinedSegmentMatcherData].nil? or matcher[:matcherType] != matcher_type
1110
names << matcher[:userDefinedSegmentMatcherData][:segmentName]
1211
end
1312
end

lib/splitclient-rb/split_factory.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ def repositories
154154
segments: @segments_repository,
155155
impressions: @impressions_repository,
156156
events: @events_repository,
157+
rule_based_segments: @rule_based_segment_repository
157158
}
158159
end
159160

@@ -178,7 +179,7 @@ def build_telemetry_components
178179
end
179180

180181
def build_fetchers
181-
@split_fetcher = SplitFetcher.new(@splits_repository, @api_key, @config, @runtime_producer)
182+
@split_fetcher = SplitFetcher.new(@splits_repository, @rule_based_segments_repository, @api_key, @config, @runtime_producer)
182183
@segment_fetcher = SegmentFetcher.new(@segments_repository, @api_key, @config, @runtime_producer)
183184
end
184185

@@ -198,7 +199,7 @@ def build_synchronizer
198199

199200
def build_streaming_components
200201
@push_status_queue = Queue.new
201-
splits_worker = SSE::Workers::SplitsWorker.new(@synchronizer, @config, @splits_repository, @runtime_producer, @segment_fetcher)
202+
splits_worker = SSE::Workers::SplitsWorker.new(@synchronizer, @config, @splits_repository, @runtime_producer, @segment_fetcher, @rule_based_segment_repository)
202203
segments_worker = SSE::Workers::SegmentsWorker.new(@synchronizer, @config, @segments_repository)
203204
notification_manager_keeper = SSE::NotificationManagerKeeper.new(@config, @runtime_producer, @push_status_queue)
204205
notification_processor = SSE::NotificationProcessor.new(@config, splits_worker, segments_worker)
@@ -220,6 +221,7 @@ def build_repositories
220221
end
221222
@splits_repository = SplitsRepository.new(@config, @flag_sets_repository, @flag_sets_filter)
222223
@segments_repository = SegmentsRepository.new(@config)
224+
@rule_based_segment_repository = RuleBasedSegmentRepository.new(@config)
223225
@impressions_repository = ImpressionsRepository.new(@config)
224226
@events_repository = EventsRepository.new(@config, @api_key, @runtime_producer)
225227
end

lib/splitclient-rb/sse/event_source/event_types.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ class EventTypes
88
SPLIT_KILL = 'SPLIT_KILL'
99
SEGMENT_UPDATE = 'SEGMENT_UPDATE'
1010
CONTROL = 'CONTROL'
11+
RB_SEGMENT_UPDATE = 'RB_SEGMENT_UPDATE'
1112
end
1213
end
1314
end

lib/splitclient-rb/sse/notification_processor.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ def process(incoming_notification)
1313
case incoming_notification.data['type']
1414
when SSE::EventSource::EventTypes::SPLIT_UPDATE
1515
process_split_update(incoming_notification)
16+
when SSE::EventSource::EventTypes::RB_SEGMENT_UPDATE
17+
process_split_update(incoming_notification)
1618
when SSE::EventSource::EventTypes::SPLIT_KILL
1719
process_split_kill(incoming_notification)
1820
when SSE::EventSource::EventTypes::SEGMENT_UPDATE
@@ -25,7 +27,7 @@ def process(incoming_notification)
2527
private
2628

2729
def process_split_update(notification)
28-
@config.logger.debug("SPLIT UPDATE notification received: #{notification}") if @config.debug_enabled
30+
@config.logger.debug("#{notification.type} notification received: #{notification}") if @config.debug_enabled
2931
@splits_worker.add_to_queue(notification)
3032
end
3133

lib/splitclient-rb/sse/workers/splits_worker.rb

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@ module SplitIoClient
44
module SSE
55
module Workers
66
class SplitsWorker
7-
def initialize(synchronizer, config, feature_flags_repository, telemetry_runtime_producer, segment_fetcher)
7+
def initialize(synchronizer, config, feature_flags_repository, telemetry_runtime_producer, segment_fetcher, rule_based_segment_repository)
88
@synchronizer = synchronizer
99
@config = config
1010
@feature_flags_repository = feature_flags_repository
1111
@queue = Queue.new
1212
@running = Concurrent::AtomicBoolean.new(false)
1313
@telemetry_runtime_producer = telemetry_runtime_producer
1414
@segment_fetcher = segment_fetcher
15+
@rule_based_segment_repository = rule_based_segment_repository
1516
end
1617

1718
def start
@@ -54,7 +55,10 @@ def perform
5455
case notification.data['type']
5556
when SSE::EventSource::EventTypes::SPLIT_UPDATE
5657
success = update_feature_flag(notification)
57-
@synchronizer.fetch_splits(notification.data['changeNumber']) unless success
58+
@synchronizer.fetch_splits(notification.data['changeNumber'], 0) unless success
59+
when SSE::EventSource::EventTypes::RB_SEGMENT_UPDATE
60+
success = update_rule_based_segment(notification)
61+
@synchronizer.fetch_splits(0, notification.data['changeNumber']) unless success
5862
when SSE::EventSource::EventTypes::SPLIT_KILL
5963
kill_feature_flag(notification)
6064
end
@@ -64,12 +68,14 @@ def perform
6468
def update_feature_flag(notification)
6569
return true if @feature_flags_repository.get_change_number.to_i >= notification.data['changeNumber']
6670
return false unless !notification.data['d'].nil? && @feature_flags_repository.get_change_number == notification.data['pcn']
67-
68-
new_split = return_split_from_json(notification)
71+
new_split = return_object_from_json(notification)
6972
SplitIoClient::Helpers::RepositoryHelper.update_feature_flag_repository(@feature_flags_repository,
7073
[new_split],
7174
notification.data['changeNumber'], @config)
72-
fetch_segments_if_not_exists(new_split)
75+
fetch_segments_if_not_exists(Helpers::Util.segment_names_by_object(new_split, "IN_SEGMENT"), @feature_flags_repository)
76+
if fetch_rule_based_segments_if_not_exists(Helpers::Util.segment_names_by_object(new_split, "IN_RULE_BASED_SEGMENT"), notification.data['changeNumber'])
77+
return true
78+
end
7379

7480
@telemetry_runtime_producer.record_updates_from_sse(Telemetry::Domain::Constants::SPLITS)
7581

@@ -80,6 +86,26 @@ def update_feature_flag(notification)
8086
false
8187
end
8288

89+
def update_rule_based_segment(notification)
90+
return true if @rule_based_segment_repository.get_change_number.to_i >= notification.data['changeNumber']
91+
return false unless !notification.data['d'].nil? && @rule_based_segment_repository.get_change_number == notification.data['pcn']
92+
93+
new_rb_segment = return_object_from_json(notification)
94+
SplitIoClient::Helpers::RepositoryHelper.update_rule_based_segment_repository(@rule_based_segment_repository,
95+
[new_rb_segment],
96+
notification.data['changeNumber'], @config)
97+
fetch_segments_if_not_exists(Helpers::Util.segment_names_by_object(new_rb_segment, "IN_SEGMENT"), @rule_based_segment_repository)
98+
99+
# TODO: enable when telemetry spec is added
100+
# @telemetry_runtime_producer.record_updates_from_sse(Telemetry::Domain::Constants::SPLITS)
101+
102+
true
103+
rescue StandardError => e
104+
@config.logger.debug("Failed to update Split: #{e.inspect}") if @config.debug_enabled
105+
106+
false
107+
end
108+
83109
def kill_feature_flag(notification)
84110
return if @feature_flags_repository.get_change_number.to_i > notification.data['changeNumber']
85111

@@ -89,21 +115,30 @@ def kill_feature_flag(notification)
89115
notification.data['splitName'],
90116
notification.data['defaultTreatment']
91117
)
92-
@synchronizer.fetch_splits(notification.data['changeNumber'])
118+
@synchronizer.fetch_splits(notification.data['changeNumber'], 0)
93119
end
94120

95-
def return_split_from_json(notification)
96-
split_json = Helpers::DecryptionHelper.get_encoded_definition(notification.data['c'], notification.data['d'])
97-
JSON.parse(split_json, symbolize_names: true)
121+
def return_object_from_json(notification)
122+
object_json = Helpers::DecryptionHelper.get_encoded_definition(notification.data['c'], notification.data['d'])
123+
JSON.parse(object_json, symbolize_names: true)
98124
end
99125

100-
def fetch_segments_if_not_exists(feature_flag)
101-
segment_names = Helpers::Util.segment_names_by_feature_flag(feature_flag)
126+
def fetch_segments_if_not_exists(segment_names, object_repository)
127+
102128
return if segment_names.nil?
103129

104-
@feature_flags_repository.set_segment_names(segment_names)
130+
object_repository.set_segment_names(segment_names)
105131
@segment_fetcher.fetch_segments_if_not_exists(segment_names)
106132
end
133+
134+
def fetch_rule_based_segments_if_not_exists(segment_names, change_number)
135+
if segment_names.nil? or segment_names.empty? or @rule_based_segment_repository.contains?(segment_names.to_a)
136+
return false
137+
end
138+
@synchronizer.fetch_splits(0, change_number)
139+
140+
true
141+
end
107142
end
108143
end
109144
end

0 commit comments

Comments
 (0)