-
Notifications
You must be signed in to change notification settings - Fork 43
feat: streaming support #248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7fb2b86
d52542a
739a856
237e448
c9c842b
8c26899
c92cd91
0ca130f
4bc41ee
84ce97e
4886cb7
019ddec
c85d9f1
7b8b8d6
623b9dd
0fdba09
0a0ab35
da360d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
#!/usr/bin/env ruby | ||
|
||
require 'unleash' | ||
require 'unleash/context' | ||
|
||
puts ">> START streaming.rb" | ||
|
||
@unleash = Unleash::Client.new( | ||
url: 'https://app.unleash-hosted.com/demo/api', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. demo doesn't have streaming enabled so this needs to be changed to enterprise URL |
||
custom_http_headers: { 'Authorization': 'demo-app:dev.9fc74dd72d2b88bea5253c04240b21a54841f08d9918046ed55a06b5' }, | ||
app_name: 'streaming-test', | ||
instance_id: 'local-streaming-cli', | ||
refresh_interval: 2, | ||
metrics_interval: 2, | ||
retry_limit: 2, | ||
experimental_mode: { type: 'streaming' }, | ||
timeout: 5, | ||
log_level: Logger::DEBUG | ||
) | ||
|
||
feature_name = "example-flag" | ||
unleash_context = Unleash::Context.new | ||
unleash_context.user_id = 123 | ||
|
||
puts "Waiting for client to initialize..." | ||
sleep 2 | ||
|
||
100.times do | ||
if @unleash.is_enabled?(feature_name, unleash_context) | ||
puts "> #{feature_name} is enabled" | ||
else | ||
puts "> #{feature_name} is not enabled" | ||
end | ||
sleep 1 | ||
puts "---" | ||
puts "" | ||
puts "" | ||
end | ||
feature_name = "foobar" | ||
if @unleash.is_enabled?(feature_name, unleash_context, true) | ||
puts "> #{feature_name} is enabled" | ||
else | ||
puts "> #{feature_name} is not enabled" | ||
end | ||
|
||
puts "> shutting down client..." | ||
|
||
@unleash.shutdown | ||
|
||
puts ">> END streaming.rb" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,8 +2,10 @@ | |
require 'unleash/toggle_fetcher' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel like there's a missing somewhere here and I think that's related to the difference between the toggle fetcher internals and LD's event source lib. The toggle fetching code splits the logic for fetching toggles and the concurrency responsibility into two classes, whereas with the event source lib it's a single entity. This means we can't treat them like interchangable ducks and I really think we should be able to do that. It feels very wrong to have a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the root cause is who's driving the new updates: scheduler or streaming client handler. I will play around with a different split of responsibilities (as I did in the Java SDK) on Monday but I'm not sure we can have a drop-in replacement here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You comment was spot on. Thank you for that. I managed to create streaming_client_executor and streaming_event_processor. streaming_client_executor can play a role of a fetcher_scheduled_executor (kept the same field name in attr jus tin case someone depends on it). Please let me know if you like it more now There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like it! |
||
require 'unleash/metrics_reporter' | ||
require 'unleash/scheduled_executor' | ||
require 'unleash/streaming_client_executor' | ||
require 'unleash/variant' | ||
require 'unleash/util/http' | ||
require 'unleash/util/event_source_wrapper' | ||
require 'logger' | ||
require 'time' | ||
|
||
|
@@ -21,7 +23,8 @@ def initialize(*opts) | |
Unleash.engine = YggdrasilEngine.new | ||
Unleash.engine.register_custom_strategies(Unleash.configuration.strategies.custom_strategies) | ||
|
||
Unleash.toggle_fetcher = Unleash::ToggleFetcher.new Unleash.engine | ||
Unleash.toggle_fetcher = Unleash::ToggleFetcher.new Unleash.engine unless Unleash.configuration.streaming_mode? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fetcher makes initial HTTP call on instance creation. We don't want that for how with streaming to avoid cross locks between streaming client and polling client |
||
|
||
if Unleash.configuration.disable_client | ||
Unleash.logger.warn "Unleash::Client is disabled! Will only return default (or bootstrapped if available) results!" | ||
Unleash.logger.warn "Unleash::Client is disabled! Metrics and MetricsReporter are also disabled!" | ||
|
@@ -30,7 +33,9 @@ def initialize(*opts) | |
end | ||
|
||
register | ||
start_toggle_fetcher | ||
|
||
initialize_client_mode | ||
|
||
start_metrics unless Unleash.configuration.disable_metrics | ||
end | ||
# rubocop:enable Metrics/AbcSize | ||
|
@@ -105,7 +110,7 @@ def shutdown | |
# quick shutdown: just kill running threads | ||
def shutdown! | ||
unless Unleash.configuration.disable_client | ||
self.fetcher_scheduled_executor.exit | ||
self.fetcher_scheduled_executor&.exit | ||
self.metrics_scheduled_executor.exit unless Unleash.configuration.disable_metrics | ||
end | ||
end | ||
|
@@ -140,6 +145,11 @@ def start_toggle_fetcher | |
end | ||
end | ||
|
||
def start_streaming_client | ||
self.fetcher_scheduled_executor = Unleash::StreamingClientExecutor.new('StreamingExecutor', Unleash.engine) | ||
self.fetcher_scheduled_executor.run | ||
end | ||
|
||
def start_metrics | ||
Unleash.reporter = Unleash::MetricsReporter.new | ||
self.metrics_scheduled_executor = Unleash::ScheduledExecutor.new( | ||
|
@@ -172,5 +182,13 @@ def disabled_variant | |
def first_fetch_is_eager | ||
Unleash.configuration.use_bootstrap? | ||
end | ||
|
||
def initialize_client_mode | ||
if Unleash.configuration.streaming_mode? | ||
start_streaming_client | ||
else | ||
start_toggle_fetcher | ||
end | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,8 @@ class Configuration | |
:log_level, | ||
:bootstrap_config, | ||
:strategies, | ||
:use_delta_api | ||
:use_delta_api, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should deprecate this property as we have a standardized experimental mode in Node SDK that is either {type: 'streaming'} or {type: 'polling', format: 'delta'} or {type: 'polling', format: 'full'} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think having a standard with interface between language and SDK is too important here, most of the SDKs provide interfaces that are comfortable in that language. I do agree that we should deprecate this and use your pattern though |
||
:experimental_mode | ||
attr_reader :connection_id | ||
|
||
def initialize(opts = {}) | ||
|
@@ -67,7 +68,9 @@ def fetch_toggles_uri | |
uri = nil | ||
## Personal feeling but Rubocop's suggestion here is too dense to be properly readable | ||
# rubocop:disable Style/ConditionalAssignment | ||
if self.use_delta_api | ||
if streaming_mode? | ||
uri = URI("#{self.url_stripped_of_slash}/client/streaming") | ||
elsif self.use_delta_api || polling_with_delta? | ||
uri = URI("#{self.url_stripped_of_slash}/client/delta") | ||
else | ||
uri = URI("#{self.url_stripped_of_slash}/client/features") | ||
|
@@ -93,6 +96,17 @@ def use_bootstrap? | |
self.bootstrap_config&.valid? | ||
end | ||
|
||
def streaming_mode? | ||
validate_streaming_support! if streaming_configured? | ||
streaming_configured? | ||
end | ||
|
||
def polling_with_delta? | ||
self.experimental_mode.is_a?(Hash) && | ||
self.experimental_mode[:type] == 'polling' && | ||
self.experimental_mode[:format] == 'delta' | ||
end | ||
|
||
private | ||
|
||
def set_defaults | ||
|
@@ -112,6 +126,7 @@ def set_defaults | |
self.bootstrap_config = nil | ||
self.strategies = Unleash::Strategies.new | ||
self.use_delta_api = false | ||
self.experimental_mode = nil | ||
|
||
self.custom_http_headers = {} | ||
@connection_id = SecureRandom.uuid | ||
|
@@ -149,5 +164,15 @@ def set_option(opt, val) | |
rescue NoMethodError | ||
raise ArgumentError, "unknown configuration parameter '#{val}'" | ||
end | ||
|
||
def streaming_configured? | ||
self.experimental_mode.is_a?(Hash) && self.experimental_mode[:type] == 'streaming' | ||
end | ||
|
||
def validate_streaming_support! | ||
return unless RUBY_ENGINE == 'jruby' | ||
|
||
raise "Streaming mode is not supported on JRuby. Please use polling mode instead or switch to MRI/CRuby." | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
require 'unleash/streaming_event_processor' | ||
require 'unleash/util/event_source_wrapper' | ||
|
||
module Unleash | ||
class StreamingClientExecutor | ||
attr_accessor :name, :event_source, :event_processor, :running | ||
|
||
def initialize(name, engine) | ||
self.name = name || 'StreamingClientExecutor' | ||
self.event_source = nil | ||
self.event_processor = Unleash::StreamingEventProcessor.new(engine) | ||
self.running = false | ||
end | ||
|
||
def run(&_block) | ||
start | ||
end | ||
|
||
def start | ||
return if self.running || Unleash.configuration.disable_client | ||
|
||
Unleash.logger.debug "Streaming client #{self.name} starting connection to: #{Unleash.configuration.fetch_toggles_uri}" | ||
|
||
self.event_source = create_event_source | ||
setup_event_handlers | ||
|
||
self.running = true | ||
Unleash.logger.debug "Streaming client #{self.name} connection established" | ||
end | ||
|
||
def stop | ||
return unless self.running | ||
|
||
Unleash.logger.debug "Streaming client #{self.name} stopping connection" | ||
self.running = false | ||
self.event_source&.close | ||
self.event_source = nil | ||
Unleash.logger.debug "Streaming client #{self.name} connection closed" | ||
end | ||
|
||
alias exit stop | ||
|
||
def running? | ||
self.running | ||
end | ||
|
||
private | ||
|
||
def create_event_source | ||
sse_client = Unleash::Util::EventSourceWrapper.client | ||
if sse_client.nil? | ||
raise "Streaming mode is configured but EventSource client is not available. " \ | ||
"Please install the 'ld-eventsource' gem or switch to polling mode." | ||
end | ||
|
||
headers = (Unleash.configuration.http_headers || {}).dup | ||
|
||
sse_client.new( | ||
Unleash.configuration.fetch_toggles_uri.to_s, | ||
headers: headers, | ||
read_timeout: 60, | ||
reconnect_time: 2, | ||
connect_timeout: 10, | ||
logger: Unleash.logger | ||
) | ||
end | ||
|
||
def setup_event_handlers | ||
self.event_source.on_event do |event| | ||
handle_event(event) | ||
end | ||
|
||
self.event_source.on_error do |error| | ||
Unleash.logger.warn "Streaming client #{self.name} error: #{error}" | ||
end | ||
end | ||
|
||
def handle_event(event) | ||
self.event_processor.process_event(event) | ||
rescue StandardError => e | ||
Unleash.logger.error "Streaming client #{self.name} threw exception #{e.class}: '#{e}'" | ||
Unleash.logger.debug "stacktrace: #{e.backtrace}" | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
require 'json' | ||
|
||
module Unleash | ||
class StreamingEventProcessor | ||
attr_accessor :toggle_engine, :mutex | ||
|
||
def initialize(toggle_engine) | ||
self.toggle_engine = toggle_engine | ||
self.mutex = Mutex.new | ||
end | ||
|
||
def process_event(event) | ||
case event.type.to_s | ||
when 'unleash-connected' | ||
Unleash.logger.debug "Streaming client connected" | ||
handle_connected_event(event) | ||
when 'unleash-updated' | ||
Unleash.logger.debug "Received streaming update" | ||
handle_updated_event(event) | ||
else | ||
Unleash.logger.debug "Received unknown event type: #{event.type}" | ||
end | ||
rescue StandardError => e | ||
Unleash.logger.error "Error handling streaming event threw exception #{e.class}: '#{e}'" | ||
Unleash.logger.debug "stacktrace: #{e.backtrace}" | ||
end | ||
|
||
def handle_delta_event(event_data) | ||
self.mutex.synchronize do | ||
self.toggle_engine.take_state(event_data) | ||
end | ||
end | ||
|
||
private | ||
|
||
def handle_connected_event(event) | ||
Unleash.logger.debug "Processing initial hydration data" | ||
handle_updated_event(event) | ||
end | ||
|
||
def handle_updated_event(event) | ||
handle_delta_event(event.data) | ||
|
||
# TODO: update backup file | ||
rescue JSON::ParserError => e | ||
Unleash.logger.error "Unable to parse JSON from streaming event data. Exception thrown #{e.class}: '#{e}'" | ||
Unleash.logger.debug "stacktrace: #{e.backtrace}" | ||
rescue StandardError => e | ||
Unleash.logger.error "Error processing delta update threw exception #{e.class}: '#{e}'" | ||
Unleash.logger.debug "stacktrace: #{e.backtrace}" | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
module Unleash | ||
module Util | ||
module EventSourceWrapper | ||
def self.client | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, I'm a bit tempted to fail hard here rather than fall back to polling. I'm kinda okay with errors that happen on startup and I think it's better to force folks to set their stuff up correctly. We can always relax it later but if we do this now, I doubt we can enforce it later There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
return nil if RUBY_ENGINE == 'jruby' | ||
|
||
begin | ||
require 'ld-eventsource' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. don't let this dependency spread throughout the code |
||
SSE::Client | ||
rescue LoadError => e | ||
Unleash.logger.error "Failed to load ld-eventsource: #{e.message}" | ||
nil | ||
end | ||
end | ||
end | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
re-using sdk-examples setup https://github.com/Unleash/unleash-sdk-examples/blob/main/Ruby/.env.example as heroku is deprecated