
Commit cd1d723

error handling update (#389)
- Added an *sdk_error_callback* option to listen for unexpected exceptions caught by the SDK
- New metrics: *sdk_exceptions_count* and *events_successfully_sent*
- Non-retryable event batches are now reported as a *log_event_failed* exception
- Stricter typing on StatsigUser to only allow JSON-serializable values
- Copy more options into the logging copy
- Unit tests for the new telemetry logger functionality
1 parent 708d001 commit cd1d723

10 files changed: 153 additions (+) and 59 deletions (−)
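
A minimal sketch of how the new option might be wired up from application code, assuming the SDK's usual top-level entry points (statsig.initialize and StatsigOptions); the callback body and the secret key are placeholders:

    from statsig import statsig, StatsigOptions

    def on_sdk_error(tag: str, exception: Exception):
        # Called for unexpected exceptions caught by the SDK's error boundary,
        # e.g. tag "statsig::log_event_failed" when an event batch is dropped.
        print(f"Statsig SDK error [{tag}]: {exception}")

    options = StatsigOptions(sdk_error_callback=on_sdk_error)
    statsig.initialize("server-secret-key", options)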

statsig/globals.py

Lines changed: 2 additions & 0 deletions
@@ -18,4 +18,6 @@ def init_logger(options: StatsigOptions):
     logger.set_log_level(options.output_logger_level)
     if options.observability_client is not None:
         logger.set_ob_client(options.observability_client)
+    if options.sdk_error_callback is not None:
+        logger.set_sdk_error_callback(options.sdk_error_callback)
     logger.init()

statsig/http_worker.py

Lines changed: 6 additions & 17 deletions
@@ -2,19 +2,18 @@
 import json
 import time
 from concurrent.futures.thread import ThreadPoolExecutor
-from dataclasses import dataclass
 from decimal import Decimal
 from io import BytesIO
-from typing import Callable, Tuple, Optional, Any, Dict, Union
+from typing import Callable, Tuple, Optional, Any, Dict
 
 import ijson
 import requests
-from requests.structures import CaseInsensitiveDict
 
 from . import globals
 from .diagnostics import Diagnostics, Marker
 from .evaluation_details import DataSource
 from .interface_network import IStatsigNetworkWorker, NetworkProtocol, NetworkEndpoint
+from .request_result import RequestResult
 from .sdk_configs import _SDK_Configs
 from .statsig_context import InitContext
 from .statsig_error_boundary import _StatsigErrorBoundary
@@ -23,16 +22,6 @@
 REQUEST_TIMEOUT = 20
 
 
-@dataclass
-class RequestResult:
-    data: Optional[Dict[str, Any]]
-    success: bool
-    status_code: Optional[int]
-    text: Optional[str] = None
-    headers: Optional[Union[CaseInsensitiveDict, Dict[str, str]]] = None
-    error: Optional[Exception] = None
-
-
 class HttpWorker(IStatsigNetworkWorker):
     _raise_on_error = False
     __RETRY_CODES = [408, 500, 502, 503, 504, 522, 524, 599]
@@ -114,7 +103,7 @@ def get_id_list(self, on_complete, url, headers, log_on_exception=False):
             return on_complete(resp)
         return on_complete(None)
 
-    def log_events(self, payload, headers=None, log_on_exception=False, retry=0):
+    def log_events(self, payload, headers=None, log_on_exception=False, retry=0) -> RequestResult:
         disable_compression = _SDK_Configs.on("stop_log_event_compression")
         additional_headers = {
             'STATSIG-RETRY': str(retry),
@@ -127,9 +116,9 @@ def log_events(self, payload, headers=None, log_on_exception=False, retry=0):
             headers=additional_headers,
             payload=payload, log_on_exception=log_on_exception, init_timeout=None, zipped=not disable_compression,
             tag="log_event")
-        if response is None or response.status_code in self.__RETRY_CODES:
-            return payload
-        return None
+        if response.status_code in self.__RETRY_CODES:
+            response.retryable = True
+        return response
 
     def shutdown(self) -> None:
         self._executor.shutdown(wait=False)

statsig/request_result.py

Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
+from dataclasses import dataclass
+from typing import Optional, Any, Dict, Union
+
+from requests.structures import CaseInsensitiveDict
+
+
+@dataclass
+class RequestResult:
+    data: Optional[Dict[str, Any]]
+    success: bool
+    status_code: Optional[int]
+    text: Optional[str] = None
+    headers: Optional[Union[CaseInsensitiveDict, Dict[str, str]]] = None
+    error: Optional[Exception] = None
+    retryable: bool = False
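
Callers of log_events now always receive a RequestResult rather than the raw payload or None. A rough sketch of reading the new fields; the classify helper is illustrative, not part of the SDK:

    from statsig.request_result import RequestResult

    def classify(result: RequestResult) -> str:
        # Mirrors the logger worker's branching: sent, requeue for retry, or drop.
        if result.success:
            return "sent"
        return "requeue" if result.retryable else "drop"

    res = RequestResult(data=None, success=False, status_code=503, retryable=True)
    print(classify(res))  # "requeue" - 503 is one of the worker's retry codes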

statsig/statsig_error_boundary.py

Lines changed: 1 addition & 0 deletions
@@ -106,6 +106,7 @@ def log_exception(
             return
         self._seen.add(name)
 
+        globals.logger.log_sdk_exception(name, exception)
         self._executor.submit(
             self._post_exception,
             name,

statsig/statsig_logger_worker.py

Lines changed: 22 additions & 18 deletions
@@ -118,24 +118,30 @@ def _send_and_reset_dropped_events_count(self):
     def _flush_to_server(self, batched_events: BatchEventLogs):
         if self._local_mode:
             return
-        res = self._net.log_events(batched_events.payload, retry=batched_events.retries,
-                                   log_on_exception=True, headers=batched_events.headers)
-        if res is not None:
-            if batched_events.retries >= 10:
-                message = (
-                    f"Failed to post {batched_events.event_count} logs after 10 retries, dropping the request"
-                )
-                self._error_boundary.log_exception(
-                    "statsig::log_event_failed",
-                    Exception(message),
-                    {"eventCount": batched_events.event_count, "error": message},
-                    bypass_dedupe=True
-                )
-                globals.logger.warning(message)
-                return
+        result = self._net.log_events(batched_events.payload, retry=batched_events.retries,
+                                      log_on_exception=True, headers=batched_events.headers)
 
-            self._failure_backoff()
+        if result.success:
+            globals.logger.increment("events_successfully_sent_count", batched_events.event_count)
+            self._success_backoff()
+            return
 
+        if batched_events.retries >= 10 or not result.retryable:
+            message = (
+                f"Failed to post {batched_events.event_count} logs. The request was either not retryable or failed after 10 retries, and has been dropped."
+            )
+            self._error_boundary.log_exception(
+                "statsig::log_event_failed",
+                Exception(message),
+                {"eventCount": batched_events.event_count, "error": message},
+                bypass_dedupe=True
+            )
+            globals.logger.warning(message)
+            return
+
+        self._failure_backoff()
+
+        if result.retryable:
             self.event_batch_processor.add_to_batched_events_queue(
                 BatchEventLogs(
                     batched_events.payload,
@@ -144,8 +150,6 @@ def _flush_to_server(self, batched_events: BatchEventLogs):
                     batched_events.retries + 1,
                 )
             )
-        else:
-            self._success_backoff()
 
     def _get_curr_interval(self):
         with self.lock:
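
Condensed, the reworked flush now decides between three outcomes instead of retrying everything. A hedged summary of the branching above; the constants and wording come from this diff, the helper itself is only illustrative:

    MAX_RETRIES = 10  # the worker drops a batch after this many attempts

    def next_action(success: bool, retryable: bool, retries: int) -> str:
        if success:
            return "count events_successfully_sent_count and ease off the backoff"
        if retries >= MAX_RETRIES or not retryable:
            return "report statsig::log_event_failed and drop the batch"
        return "back off and requeue the batch with retries + 1"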

statsig/statsig_network.py

Lines changed: 3 additions & 3 deletions
@@ -4,7 +4,7 @@
 
 from . import globals
 from .diagnostics import Diagnostics
-from .http_worker import HttpWorker
+from .http_worker import HttpWorker, RequestResult
 from .interface_network import (
     IStreamingFallback,
     IStreamingListeners,
@@ -221,10 +221,10 @@ def get_id_list(self, on_complete: Any, url, headers, log_on_exception=False):
             return
         self.http_worker.get_id_list(on_complete, url, headers, log_on_exception)
 
-    def log_events(self, payload, headers=None, log_on_exception=False, retry=0):
+    def log_events(self, payload, headers=None, log_on_exception=False, retry=0) -> RequestResult:
         if self.options.local_mode:
             globals.logger.warning("Local mode is enabled. Not logging events.")
-            return None
+            return RequestResult(data=None, status_code=None, success=False, error=None)
         return self.log_event_worker.log_events(
             payload, headers=headers, log_on_exception=log_on_exception, retry=retry
         )

statsig/statsig_options.py

Lines changed: 15 additions & 1 deletion
@@ -110,6 +110,7 @@ def __init__(
             output_logger_level: Optional[LogLevel] = LogLevel.WARNING,
             overall_init_timeout: Optional[float] = None,
             observability_client: Optional[ObservabilityClient] = None,
+            sdk_error_callback: Optional[Callable[[str, Exception], None]] = None,
     ):
         self.data_store = data_store
         self._environment: Union[None, dict] = None
@@ -157,6 +158,7 @@ def __init__(
         self.output_logger_level = output_logger_level
         self.overall_init_timeout = overall_init_timeout
        self.observability_client = observability_client
+        self.sdk_error_callback = sdk_error_callback
         self._logging_copy: Dict[str, Any] = {}
         self._set_logging_copy()
         self._attributes_changed = False
@@ -203,7 +205,7 @@ def _set_logging_copy(self):
             logging_copy["init_timeout"] = self.init_timeout
         if self.timeout:
             logging_copy["timeout"] = self.timeout
-        if (self.rulesets_sync_interval) != DEFAULT_RULESET_SYNC_INTERVAL:
+        if self.rulesets_sync_interval != DEFAULT_RULESET_SYNC_INTERVAL:
             logging_copy["rulesets_sync_interval"] = self.rulesets_sync_interval
         if self.idlists_sync_interval != DEFAULT_IDLIST_SYNC_INTERVAL:
             logging_copy["idlists_sync_interval"] = self.idlists_sync_interval
@@ -219,6 +221,8 @@ def _set_logging_copy(self):
             logging_copy["logging_interval"] = self.logging_interval
         if self.disable_diagnostics:
             logging_copy["disable_diagnostics"] = self.disable_diagnostics
+        if self.disable_all_logging:
+            logging_copy["disable_all_logging"] = self.disable_all_logging
         if self.event_queue_size != DEFAULT_EVENT_QUEUE_SIZE:
             logging_copy["event_queue_size"] = self.event_queue_size
         if self.retry_queue_size != DEFAULT_RETRY_QUEUE_SIZE:
@@ -227,5 +231,15 @@ def _set_logging_copy(self):
             logging_copy["overall_init_timeout"] = self.overall_init_timeout
         if self.observability_client is not None:
             logging_copy["observability_client"] = "SET"
+        if self.fallback_to_statsig_api:
+            logging_copy["fallback_to_statsig_api"] = self.fallback_to_statsig_api
+        if self.out_of_sync_threshold_in_s:
+            logging_copy["out_of_sync_threshold_in_s"] = self.out_of_sync_threshold_in_s
+        if self.initialize_sources:
+            logging_copy["initialize_sources"] = "SET"
+        if self.config_sync_sources:
+            logging_copy["config_sync_sources"] = "SET"
+        if self.sdk_error_callback:
+            logging_copy["sdk_error_callback"] = "SET"
         self._logging_copy = logging_copy
         self._attributes_changed = False

statsig/statsig_telemetry_logger.py

Lines changed: 37 additions & 15 deletions
@@ -1,6 +1,6 @@
 import functools
 import time
-from typing import Optional, Dict, Any
+from typing import Optional, Dict, Any, Callable
 
 from .initialize_details import InitializeDetails
 from .interface_observability_client import ObservabilityClient
@@ -49,17 +49,22 @@ def __init_subclass__(cls, **kwargs):
 
 
 class StatsigTelemetryLogger(AutoTryCatch):
-    def __init__(self, logger=None, ob_client: Optional[ObservabilityClient] = None):
+    def __init__(self, logger=None, ob_client: Optional[ObservabilityClient] = None,
+                 sdk_error_callback: Optional[Callable[[str, Exception], None]] = None):
         self.high_cardinality_tags = {"lcut", "prev_lcut"}
         self.logger = logger or OutputLogger(TELEMETRY_PREFIX)
         self.ob_client = ob_client or NoopObservabilityClient()
+        self.sdk_error_callback = sdk_error_callback
 
     def set_logger(self, output_logger):
        self.logger = output_logger
 
     def set_ob_client(self, ob_client):
         self.ob_client = ob_client
 
+    def set_sdk_error_callback(self, sdk_error_callback):
+        self.sdk_error_callback = sdk_error_callback
+
     def init(self):
         self.ob_client.init()
 
@@ -84,6 +89,17 @@ def exception(self, msg, *args, **kwargs):
     def log_process(self, process, msg):
         self.logger.log_process(process, msg)
 
+    def increment(self, metric_name: str, value: int = 1, tags: Optional[Dict[str, Any]] = None):
+        self.ob_client.increment(f'{TELEMETRY_PREFIX}.{metric_name}', value,
+                                 self.filter_high_cardinality_tags(tags or {}))
+
+    def gauge(self, metric_name: str, value: float, tags: Optional[Dict[str, Any]] = None):
+        self.ob_client.gauge(f'{TELEMETRY_PREFIX}.{metric_name}', value, self.filter_high_cardinality_tags(tags or {}))
+
+    def distribution(self, metric_name: str, value: float, tags: Optional[Dict[str, Any]] = None):
+        self.ob_client.distribution(f'{TELEMETRY_PREFIX}.{metric_name}', value,
+                                    self.filter_high_cardinality_tags(tags or {}))
+
     def log_post_init(self, options: StatsigOptions, init_details: InitializeDetails):
         if options.local_mode:
             if init_details.init_success:
@@ -93,11 +109,11 @@ def log_post_init(self, options: StatsigOptions, init_details: InitializeDetails
             self.logger.error("Statsig SDK instance failed to initialize in local mode.")
             return
 
-        self.ob_client.distribution(f"{TELEMETRY_PREFIX}.initialization", init_details.duration,
-                                    self.filter_high_cardinality_tags({"source": init_details.source,
-                                                                       "store_populated": init_details.store_populated,
-                                                                       "init_success": init_details.init_success,
-                                                                       "init_source_api": init_details.init_source_api}))
+        self.distribution("initialization", init_details.duration,
+                          self.filter_high_cardinality_tags({"source": init_details.source,
+                                                             "store_populated": init_details.store_populated,
+                                                             "init_success": init_details.init_success,
+                                                             "init_source_api": init_details.init_source_api}))
 
         if init_details.init_success:
             if init_details.store_populated:
@@ -120,20 +136,26 @@ def log_config_sync_update(self, initialized: bool, has_update: bool, lcut: int,
             return  # do not log for initialize
         if not has_update:
             self.log_process("Config Sync", "No update")
-            self.ob_client.increment(f"{TELEMETRY_PREFIX}.config_no_update", 1, {"source": source, "source_api": api})
+            self.increment("config_no_update", 1, {"source": source, "source_api": api})
             return
 
         lcut_diff = abs(lcut - int(time.time() * 1000))
         if lcut_diff > 0:
-            self.ob_client.distribution(f"{TELEMETRY_PREFIX}.config_propagation_diff", lcut_diff,
-                                        self.filter_high_cardinality_tags({
-                                            "source": source,
-                                            "source_api": api,
-                                            "lcut": lcut,
-                                            "prev_lcut": prev_lcut
-                                        }))
+            self.distribution("config_propagation_diff", lcut_diff,
+                              self.filter_high_cardinality_tags({
+                                  "source": source,
+                                  "source_api": api,
+                                  "lcut": lcut,
+                                  "prev_lcut": prev_lcut
+                              }))
         self.log_process("Config Sync", f"Received updated configs from {lcut}")
 
+    def log_sdk_exception(self, tag: str, exception: Exception):
+        if self.sdk_error_callback is not None:
+            self.sdk_error_callback(tag, exception)
+
+        self.increment("sdk_exceptions_count")
+
     def filter_high_cardinality_tags(self, tags: Dict[str, Any]) -> Dict[str, Any]:
         return {tag: value for tag, value in tags.items()
                 if tag not in self.high_cardinality_tags or self.ob_client.should_enable_high_cardinality_for_this_tag(
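
With the new increment/gauge/distribution wrappers, a custom observability client now also receives *sdk_exceptions_count* and *events_successfully_sent_count*, prefixed and filtered like the existing metrics. A sketch of a client that simply prints what it receives; the method names follow the calls in this diff, but the exact base-class interface is an assumption and a real client may need additional hooks:

    from statsig.interface_observability_client import ObservabilityClient

    class PrintingObservabilityClient(ObservabilityClient):
        def init(self):
            pass

        def increment(self, metric_name, value=1, tags=None):
            print("count", metric_name, value, tags)

        def gauge(self, metric_name, value, tags=None):
            print("gauge", metric_name, value, tags)

        def distribution(self, metric_name, value, tags=None):
            print("distribution", metric_name, value, tags)

        def should_enable_high_cardinality_for_this_tag(self, tag):
            # lcut / prev_lcut tags are dropped unless this returns True
            return False

Passed in via StatsigOptions(observability_client=PrintingObservabilityClient()), a client like this would see, for example, a counter increment for every successfully flushed event batch.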

statsig/statsig_user.py

Lines changed: 8 additions & 5 deletions
@@ -1,10 +1,13 @@
 from dataclasses import dataclass
-from typing import Optional
+from typing import Optional, Union, List, Dict
 
 from .statsig_environment_tier import StatsigEnvironmentTier
 from .statsig_errors import StatsigValueError
 from .utils import str_or_none, to_raw_dict_or_none, djb2_hash_for_dict
 
+JSONPrimitive = Union[str, int, float, bool, None]
+JSONValue = Union[JSONPrimitive, List['JSONValue'], Dict[str, 'JSONValue']]
+
 
 @dataclass
 class StatsigUser:
@@ -21,10 +24,10 @@ class StatsigUser:
     country: Optional[str] = None
     locale: Optional[str] = None
     app_version: Optional[str] = None
-    custom: Optional[dict] = None
-    private_attributes: Optional[dict] = None
-    custom_ids: Optional[dict] = None
-    _statsig_environment: Optional[dict] = None
+    custom: Optional[Dict[str, JSONValue]] = None
+    private_attributes: Optional[Dict[str, JSONValue]] = None
+    custom_ids: Optional[Dict[str, str]] = None
+    _statsig_environment: Optional[Dict[str, JSONValue]] = None
 
     def __post_init__(self):
         # ensure there is a user id or at least a custom ID, empty dict
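
With the stricter annotations, custom, private_attributes, and _statsig_environment are typed as dicts of JSON-serializable values, and custom_ids as string-to-string. These are type hints rather than runtime checks, so enforcement mostly comes from static type checkers. A small example of a user object that satisfies them:

    from statsig import StatsigUser

    user = StatsigUser(
        user_id="user-123",
        custom={"plan": "pro", "seats": 5, "beta": True, "teams": ["eng", "ml"]},
        custom_ids={"org_id": "org-42"},
        # a value like datetime.now() in `custom` would now be flagged by a type checker
    )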
