# SegmentKey is an internal identifier used by the redis buffer that is also
# directly used as raw redis key. the format is
-# "span-buf:s:{project_id:trace_id}:span_id", and the type is bytes because our
+# "span-buf:s:{partition}:project_id:trace_id:span_id", and the type is bytes because our
# redis client is bytes.
#
# The segment ID in the Kafka protocol is only the span ID.

def _segment_key_to_span_id(segment_key: SegmentKey) -> bytes:
-    return parse_segment_key(segment_key)[2]
+    return parse_segment_key(segment_key)[3]


-def parse_segment_key(segment_key: SegmentKey) -> tuple[bytes, bytes, bytes]:
+def parse_segment_key(segment_key: SegmentKey) -> tuple[int, bytes, bytes, bytes]:
    segment_key_parts = segment_key.split(b":")
-    project_id = segment_key_parts[2][1:]
-    trace_id = segment_key_parts[3][:-1]
-    span_id = segment_key_parts[4]
+    partition = int(segment_key_parts[2][1:-1])
+    project_id = segment_key_parts[3]
+    trace_id = segment_key_parts[4]
+    span_id = segment_key_parts[5]

-    return project_id, trace_id, span_id
+    return partition, project_id, trace_id, span_id
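
To make the new key layout concrete, here is a minimal standalone sketch of how such a key splits apart (the key below is a hypothetical example, not one produced by a real buffer):

```python
# Hypothetical segment key for partition 2, project 1; the braces around the
# partition are a Redis Cluster hash tag (see _get_span_key below).
key = b"span-buf:s:{2}:1:ab1381bd58dc4fa99162eb9d2d446b81:cd2ac44521a94d1f"

parts = key.split(b":")
partition = int(parts[2][1:-1])  # b"{2}" -> 2
project_id = parts[3]            # b"1"
trace_id = parts[4]              # b"ab1381bd58dc4fa99162eb9d2d446b81"
span_id = parts[5]               # b"cd2ac44521a94d1f"

# _segment_key_to_span_id now picks index 3 of the 4-tuple: the span ID.
assert (partition, project_id, trace_id, span_id)[3] == b"cd2ac44521a94d1f"
```
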
def get_redis_client() -> RedisCluster[bytes] | StrictRedis[bytes]:
@@ -113,6 +114,7 @@ def get_redis_client() -> RedisCluster[bytes] | StrictRedis[bytes]:
# NamedTuples are faster to construct than dataclasses
class Span(NamedTuple):
+    partition: int
    trace_id: str
    span_id: str
    parent_span_id: str | None
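
The comment about construction speed is easy to check; a quick sketch with a hypothetical two-field type (not the real `Span`):

```python
import timeit
from dataclasses import dataclass
from typing import NamedTuple

class PointNT(NamedTuple):
    x: int
    y: int

@dataclass
class PointDC:
    x: int
    y: int

# tuple.__new__ does less work than a generated dataclass __init__, so the
# NamedTuple usually constructs measurably faster.
print("namedtuple:", timeit.timeit(lambda: PointNT(1, 2)))
print("dataclass: ", timeit.timeit(lambda: PointDC(1, 2)))
```
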
@@ -153,8 +155,8 @@ def client(self) -> RedisCluster[bytes] | StrictRedis[bytes]:
    def __reduce__(self):
        return (SpansBuffer, (self.assigned_shards,))

-    def _get_span_key(self, project_and_trace: str, span_id: str) -> bytes:
-        return f"span-buf:s:{{{project_and_trace}}}:{span_id}".encode("ascii")
+    def _get_span_key(self, partition: int, project_and_trace: str, span_id: str) -> bytes:
+        return f"span-buf:s:{{{partition}}}:{project_and_trace}:{span_id}".encode("ascii")
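
The triple braces in the f-string render as a literal `{...}` around the partition, which Redis Cluster treats as a hash tag: only the tagged substring is hashed for slot assignment, so all keys of one partition map to the same slot. A standalone sketch of the round trip (free function mirroring the method, hypothetical inputs):

```python
def get_span_key(partition: int, project_and_trace: str, span_id: str) -> bytes:
    # f"{{{partition}}}" renders as "{2}" for partition 2: a cluster hash tag.
    return f"span-buf:s:{{{partition}}}:{project_and_trace}:{span_id}".encode("ascii")

key = get_span_key(2, "1:ab1381bd58dc4fa99162eb9d2d446b81", "cd2ac44521a94d1f")
assert key == b"span-buf:s:{2}:1:ab1381bd58dc4fa99162eb9d2d446b81:cd2ac44521a94d1f"
```
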

    def process_spans(self, spans: Sequence[Span], now: int):
        """
@@ -176,8 +178,8 @@ def process_spans(self, spans: Sequence[Span], now: int):
        trees = self._group_by_parent(spans)

        with self.client.pipeline(transaction=False) as p:
-            for (project_and_trace, parent_span_id), subsegment in trees.items():
-                set_key = self._get_span_key(project_and_trace, parent_span_id)
+            for (partition, project_and_trace, parent_span_id), subsegment in trees.items():
+                set_key = self._get_span_key(partition, project_and_trace, parent_span_id)
                p.sadd(set_key, *[span.payload for span in subsegment])

            p.execute()
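
For context on the pipeline usage: with `transaction=False`, redis-py buffers commands client-side and flushes them in a single batch on `execute()`, without wrapping them in MULTI/EXEC. A minimal sketch, assuming a local Redis instance:

```python
from redis import StrictRedis

r = StrictRedis()  # assumed local instance, for illustration only
with r.pipeline(transaction=False) as p:
    p.sadd(b"span-buf:s:{2}:1:abc:def", b"payload-1", b"payload-2")
    p.sadd(b"span-buf:s:{2}:1:abc:123", b"payload-3")
    results = p.execute()  # one round trip; results arrive in command order
print(results)  # e.g. [2, 1]: number of members newly added per SADD
```
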
@@ -189,11 +191,12 @@ def process_spans(self, spans: Sequence[Span], now: int):
        add_buffer_sha = self._ensure_script()

        with self.client.pipeline(transaction=False) as p:
-            for (project_and_trace, parent_span_id), subsegment in trees.items():
+            for (partition, project_and_trace, parent_span_id), subsegment in trees.items():
                p.execute_command(
                    "EVALSHA",
                    add_buffer_sha,
                    1,
+                    partition,
                    project_and_trace,
                    len(subsegment),
                    parent_span_id,
@@ -203,7 +206,7 @@ def process_spans(self, spans: Sequence[Span], now: int):
                )

                is_root_span_count += sum(span.is_segment_span for span in subsegment)
-                result_meta.append((project_and_trace, parent_span_id))
+                result_meta.append((partition, project_and_trace, parent_span_id))

            results = p.execute()
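
For readers unfamiliar with raw `EVALSHA` calls: the `1` declares how many of the following arguments are keys, so `partition` arrives as `KEYS[1]` and everything after it as `ARGV`. Conveniently, a bare key like `"2"` hashes to the same cluster slot as the `{2}` hash tag, so the script presumably runs on the node that owns that partition's keys. A toy script illustrating the argument layout (assumes a local Redis):

```python
from redis import StrictRedis

r = StrictRedis()  # assumed local instance, for illustration only
sha = r.script_load("return {KEYS[1], ARGV[1], ARGV[2]}")
print(r.execute_command("EVALSHA", sha, 1, 2, "1:abc123", 5))
# -> [b'2', b'1:abc123', b'5']
```
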
@@ -213,14 +216,10 @@ def process_spans(self, spans: Sequence[Span], now: int):

        assert len(result_meta) == len(results)

-        for (project_and_trace, parent_span_id), result in zip(result_meta, results):
+        for (partition, project_and_trace, parent_span_id), result in zip(result_meta, results):
            redirect_depth, set_key, has_root_span = result

-            shard = self.assigned_shards[
-                int(project_and_trace.split(":")[1], 16) % len(self.assigned_shards)
-            ]
-            queue_key = self._get_queue_key(shard)
-
+            queue_key = self._get_queue_key(partition)
            min_redirect_depth = min(min_redirect_depth, redirect_depth)
            max_redirect_depth = max(max_redirect_depth, redirect_depth)
@@ -235,10 +234,11 @@ def process_spans(self, spans: Sequence[Span], now: int):
            zadd_items = queue_adds.setdefault(queue_key, {})
            zadd_items[set_key] = now + offset

-            subsegment_spans = trees[project_and_trace, parent_span_id]
+            subsegment_spans = trees[partition, project_and_trace, parent_span_id]
            delete_set = queue_deletes.setdefault(queue_key, set())
            delete_set.update(
-                self._get_span_key(project_and_trace, span.span_id) for span in subsegment_spans
+                self._get_span_key(partition, project_and_trace, span.span_id)
+                for span in subsegment_spans
            )
            delete_set.discard(set_key)
@@ -271,7 +271,7 @@ def _ensure_script(self):
    def _get_queue_key(self, shard: int) -> bytes:
        return f"span-buf:q:{shard}".encode("ascii")

-    def _group_by_parent(self, spans: Sequence[Span]) -> dict[tuple[str, str], list[Span]]:
+    def _group_by_parent(self, spans: Sequence[Span]) -> dict[tuple[int, str, str], list[Span]]:
        """
        Groups partial trees of spans by their top-most parent span ID in the
        provided list. The result is a dictionary where the keys identify a
@@ -282,7 +282,7 @@ def _group_by_parent(self, spans: Sequence[Span]) -> dict[tuple[str, str], list[
        :return: Dictionary of grouped spans. The key is a tuple of
-            the `project_and_trace`, and the `parent_span_id`.
+            the `partition`, the `project_and_trace`, and the `parent_span_id`.
        """
-        trees: dict[tuple[str, str], list[Span]] = {}
+        trees: dict[tuple[int, str, str], list[Span]] = {}
        redirects: dict[str, dict[str, str]] = {}

        for span in spans:
@@ -293,9 +293,9 @@ def _group_by_parent(self, spans: Sequence[Span]) -> dict[tuple[str, str], list[
            while redirect := trace_redirects.get(parent):
                parent = redirect

-            subsegment = trees.setdefault((project_and_trace, parent), [])
+            subsegment = trees.setdefault((span.partition, project_and_trace, parent), [])
            if parent != span.span_id:
-                subsegment.extend(trees.pop((project_and_trace, span.span_id), []))
+                subsegment.extend(trees.pop((span.partition, project_and_trace, span.span_id), []))
                trace_redirects[span.span_id] = parent
            subsegment.append(span)
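
A minimal standalone sketch of the grouping logic with a pared-down span type (the real `Span` has more fields): spans whose parent appears in the same batch are folded into the parent's subsegment, and the redirect map collapses chains like `c -> b -> a` so late-arriving ancestors absorb their descendants' groups.

```python
from typing import NamedTuple

class MiniSpan(NamedTuple):
    partition: int
    project_and_trace: str
    span_id: str
    parent_span_id: str | None

def group_by_parent(spans: list[MiniSpan]) -> dict[tuple[int, str, str], list[MiniSpan]]:
    trees: dict[tuple[int, str, str], list[MiniSpan]] = {}
    redirects: dict[str, str] = {}  # the real code keeps one map per trace
    for span in spans:
        parent = span.parent_span_id or span.span_id
        while redirect := redirects.get(parent):
            parent = redirect
        subsegment = trees.setdefault((span.partition, span.project_and_trace, parent), [])
        if parent != span.span_id:
            subsegment.extend(trees.pop((span.partition, span.project_and_trace, span.span_id), []))
            redirects[span.span_id] = parent
        subsegment.append(span)
    return trees

spans = [
    MiniSpan(2, "1:trace", "c", "b"),   # grouped under "b" at first...
    MiniSpan(2, "1:trace", "b", "a"),   # ...then folded into "a"'s group
    MiniSpan(2, "1:trace", "a", None),  # root span: parent is itself
]
assert list(group_by_parent(spans)) == [(2, "1:trace", "a")]
```
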
@@ -471,8 +471,12 @@ def done_flush_segments(self, segment_keys: dict[SegmentKey, FlushedSegment]):
            p.delete(hrs_key)
            p.unlink(segment_key)

-            project_id, trace_id, _ = parse_segment_key(segment_key)
-            redirect_map_key = b"span-buf:sr:{%s:%s}" % (project_id, trace_id)
+            partition, project_id, trace_id, _ = parse_segment_key(segment_key)
+            redirect_map_key = b"span-buf:sr:{%d}:%s:%s" % (
+                partition,
+                project_id,
+                trace_id,
+            )
            p.zrem(flushed_segment.queue_key, segment_key)

            for span_batch in itertools.batched(flushed_segment.spans, 100):
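
Two small notes on this hunk: `%`-formatting on bytes (including `%d`) is PEP 461, and the hash tag now wraps only the partition, matching the segment keys above. A quick check with hypothetical values:

```python
partition, project_id, trace_id = 2, b"1", b"ab1381bd58dc4fa99162eb9d2d446b81"
redirect_map_key = b"span-buf:sr:{%d}:%s:%s" % (partition, project_id, trace_id)
assert redirect_map_key == b"span-buf:sr:{2}:1:ab1381bd58dc4fa99162eb9d2d446b81"
```

As an aside, `itertools.batched` on the last line requires Python 3.12 or newer.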