Skip to content

updated models and recorder #548

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion splitio/engine/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ def eval_with_context(self, key, bucketing, feature_name, attrs, ctx):
'impression': {
'label': label,
'change_number': _change_number
}
},
'track': feature.trackImpressions
}

def _treatment_for_flag(self, flag, key, bucketing, attributes, ctx):
Expand Down
26 changes: 20 additions & 6 deletions splitio/engine/impressions/impressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class ImpressionsMode(Enum):
class Manager(object): # pylint:disable=too-few-public-methods
"""Impression manager."""

def __init__(self, strategy, telemetry_runtime_producer):
def __init__(self, strategy, none_strategy, telemetry_runtime_producer):
"""
Construct a manger to track and forward impressions to the queue.

Expand All @@ -23,19 +23,33 @@ def __init__(self, strategy, telemetry_runtime_producer):
"""

self._strategy = strategy
self._none_strategy = none_strategy
self._telemetry_runtime_producer = telemetry_runtime_producer

def process_impressions(self, impressions):
def process_impressions(self, impressions_decorated):
"""
Process impressions.

Impressions are analyzed to see if they've been seen before and counted.

:param impressions: List of impression objects with attributes
:type impressions: list[tuple[splitio.models.impression.Impression, dict]]
:param impressions_decorated: List of impression objects with attributes
:type impressions_decorated: list[tuple[splitio.models.impression.ImpressionDecorated, dict]]

:return: processed and deduped impressions.
:rtype: tuple(list[tuple[splitio.models.impression.Impression, dict]], list(int))
"""
for_log, for_listener, for_counter, for_unique_keys_tracker = self._strategy.process_impressions(impressions)
return for_log, len(impressions) - len(for_log), for_listener, for_counter, for_unique_keys_tracker
for_listener_all = []
for_log_all = []
for_counter_all = []
for_unique_keys_tracker_all = []
for impression_decorated, att in impressions_decorated:
if not impression_decorated.track:
for_log, for_listener, for_counter, for_unique_keys_tracker = self._none_strategy.process_impressions([(impression_decorated.Impression, att)])
else:
for_log, for_listener, for_counter, for_unique_keys_tracker = self._strategy.process_impressions([(impression_decorated.Impression, att)])
for_listener_all.extend(for_listener)
for_log_all.extend(for_log)
for_counter_all.extend(for_counter)
for_unique_keys_tracker_all.extend(for_unique_keys_tracker)

return for_log_all, len(impressions_decorated) - len(for_log_all), for_listener_all, for_counter_all, for_unique_keys_tracker_all
8 changes: 8 additions & 0 deletions splitio/models/impressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@
]
)

ImpressionDecorated = namedtuple(
'ImpressionDecorated',
[
'Impression',
'track'
]
)

# pre-python3.7 hack to make previous_time optional
Impression.__new__.__defaults__ = (None,)

Expand Down
22 changes: 17 additions & 5 deletions splitio/models/splits.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

SplitView = namedtuple(
'SplitView',
['name', 'traffic_type', 'killed', 'treatments', 'change_number', 'configs', 'default_treatment', 'sets']
['name', 'traffic_type', 'killed', 'treatments', 'change_number', 'configs', 'default_treatment', 'sets', 'trackImpressions']
)

_DEFAULT_CONDITIONS_TEMPLATE = {
Expand Down Expand Up @@ -73,7 +73,8 @@ def __init__( # pylint: disable=too-many-arguments
traffic_allocation=None,
traffic_allocation_seed=None,
configurations=None,
sets=None
sets=None,
trackImpressions=None
):
"""
Class constructor.
Expand All @@ -96,6 +97,8 @@ def __init__( # pylint: disable=too-many-arguments
:type traffic_allocation_seed: int
:pram sets: list of flag sets
:type sets: list
:pram trackImpressions: track impressions flag
:type trackImpressions: boolean
"""
self._name = name
self._seed = seed
Expand Down Expand Up @@ -125,6 +128,7 @@ def __init__( # pylint: disable=too-many-arguments

self._configurations = configurations
self._sets = set(sets) if sets is not None else set()
self._trackImpressions = trackImpressions if trackImpressions is not None else True

@property
def name(self):
Expand Down Expand Up @@ -186,6 +190,11 @@ def sets(self):
"""Return the flag sets of the split."""
return self._sets

@property
def trackImpressions(self):
"""Return trackImpressions of the split."""
return self._trackImpressions

def get_configurations_for(self, treatment):
"""Return the mapping of treatments to configurations."""
return self._configurations.get(treatment) if self._configurations else None
Expand Down Expand Up @@ -214,7 +223,8 @@ def to_json(self):
'algo': self.algo.value,
'conditions': [c.to_json() for c in self.conditions],
'configurations': self._configurations,
'sets': list(self._sets)
'sets': list(self._sets),
'trackImpressions': self._trackImpressions
}

def to_split_view(self):
Expand All @@ -232,7 +242,8 @@ def to_split_view(self):
self.change_number,
self._configurations if self._configurations is not None else {},
self._default_treatment,
list(self._sets) if self._sets is not None else []
list(self._sets) if self._sets is not None else [],
self._trackImpressions
)

def local_kill(self, default_treatment, change_number):
Expand Down Expand Up @@ -288,5 +299,6 @@ def from_raw(raw_split):
traffic_allocation=raw_split.get('trafficAllocation'),
traffic_allocation_seed=raw_split.get('trafficAllocationSeed'),
configurations=raw_split.get('configurations'),
sets=set(raw_split.get('sets')) if raw_split.get('sets') is not None else []
sets=set(raw_split.get('sets')) if raw_split.get('sets') is not None else [],
trackImpressions=raw_split.get('trackImpressions') if raw_split.get('trackImpressions') is not None else True
)
16 changes: 8 additions & 8 deletions splitio/recorder/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def __init__(self, impressions_manager, event_storage, impression_storage, telem
self._telemetry_evaluation_producer = telemetry_evaluation_producer
self._telemetry_runtime_producer = telemetry_runtime_producer

def record_treatment_stats(self, impressions, latency, operation, method_name):
def record_treatment_stats(self, impressions_decorated, latency, operation, method_name):
"""
Record stats for treatment evaluation.

Expand All @@ -165,7 +165,7 @@ def record_treatment_stats(self, impressions, latency, operation, method_name):
try:
if method_name is not None:
self._telemetry_evaluation_producer.record_latency(operation, latency)
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions)
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated)
if deduped > 0:
self._telemetry_runtime_producer.record_impression_stats(telemetry.CounterConstants.IMPRESSIONS_DEDUPED, deduped)
self._impression_storage.put(impressions)
Expand Down Expand Up @@ -210,7 +210,7 @@ def __init__(self, impressions_manager, event_storage, impression_storage, telem
self._telemetry_evaluation_producer = telemetry_evaluation_producer
self._telemetry_runtime_producer = telemetry_runtime_producer

async def record_treatment_stats(self, impressions, latency, operation, method_name):
async def record_treatment_stats(self, impressions_decorated, latency, operation, method_name):
"""
Record stats for treatment evaluation.

Expand All @@ -224,7 +224,7 @@ async def record_treatment_stats(self, impressions, latency, operation, method_n
try:
if method_name is not None:
await self._telemetry_evaluation_producer.record_latency(operation, latency)
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions)
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated)
if deduped > 0:
await self._telemetry_runtime_producer.record_impression_stats(telemetry.CounterConstants.IMPRESSIONS_DEDUPED, deduped)

Expand Down Expand Up @@ -277,7 +277,7 @@ def __init__(self, pipe, impressions_manager, event_storage,
self._data_sampling = data_sampling
self._telemetry_redis_storage = telemetry_redis_storage

def record_treatment_stats(self, impressions, latency, operation, method_name):
def record_treatment_stats(self, impressions_decorated, latency, operation, method_name):
"""
Record stats for treatment evaluation.

Expand All @@ -294,7 +294,7 @@ def record_treatment_stats(self, impressions, latency, operation, method_name):
if self._data_sampling < rnumber:
return

impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions)
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated)
if impressions:
pipe = self._make_pipe()
self._impression_storage.add_impressions_to_pipe(impressions, pipe)
Expand Down Expand Up @@ -367,7 +367,7 @@ def __init__(self, pipe, impressions_manager, event_storage,
self._data_sampling = data_sampling
self._telemetry_redis_storage = telemetry_redis_storage

async def record_treatment_stats(self, impressions, latency, operation, method_name):
async def record_treatment_stats(self, impressions_decorated, latency, operation, method_name):
"""
Record stats for treatment evaluation.

Expand All @@ -384,7 +384,7 @@ async def record_treatment_stats(self, impressions, latency, operation, method_n
if self._data_sampling < rnumber:
return

impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions)
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated)
if impressions:
pipe = self._make_pipe()
self._impression_storage.add_impressions_to_pipe(impressions, pipe)
Expand Down
1 change: 1 addition & 0 deletions tests/engine/test_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def test_evaluate_treatment_ok(self, mocker):
assert result['impression']['change_number'] == 123
assert result['impression']['label'] == 'some_label'
assert mocked_split.get_configurations_for.mock_calls == [mocker.call('on')]
assert result['track'] == mocked_split.trackImpressions


def test_evaluate_treatment_ok_no_config(self, mocker):
Expand Down
Loading