Skip to content

Commit 33a752a

Browse files
committed
fix: Assign correct partition number to span
1 parent 72db2fe commit 33a752a

File tree

1 file changed

+9
-7
lines changed

1 file changed

+9
-7
lines changed

src/sentry/spans/consumers/process/factory.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ def create_with_partitions(
6464
committer = CommitOffsets(commit)
6565

6666
buffer = SpansBuffer(assigned_shards=[p.index for p in partitions])
67+
first_partition = next((p.index for p in partitions), 0)
6768

6869
# patch onto self just for testing
6970
flusher: ProcessingStrategy[FilteredPayload | int]
@@ -75,7 +76,7 @@ def create_with_partitions(
7576

7677
if self.num_processes != 1:
7778
run_task = run_task_with_multiprocessing(
78-
function=partial(process_batch, buffer),
79+
function=partial(process_batch, buffer, first_partition),
7980
next_step=flusher,
8081
max_batch_size=self.max_batch_size,
8182
max_batch_time=self.max_batch_time,
@@ -85,7 +86,7 @@ def create_with_partitions(
8586
)
8687
else:
8788
run_task = RunTask(
88-
function=partial(process_batch, buffer),
89+
function=partial(process_batch, buffer, first_partition),
8990
next_step=flusher,
9091
)
9192

@@ -119,7 +120,9 @@ def shutdown(self) -> None:
119120

120121

121122
def process_batch(
122-
buffer: SpansBuffer, values: Message[ValuesBatch[tuple[int, KafkaPayload]]]
123+
buffer: SpansBuffer,
124+
first_partition: int,
125+
values: Message[ValuesBatch[tuple[int, KafkaPayload]]],
123126
) -> int:
124127
min_timestamp = None
125128
spans = []
@@ -130,10 +133,9 @@ def process_batch(
130133

131134
val = rapidjson.loads(payload.value)
132135

133-
partition_id: int | None = None
134-
136+
partition_id: int = first_partition
135137
if len(value.committable) == 1:
136-
partition_id = value.committable[next(iter(value.committable))]
138+
partition_id = next(iter(value.committable)).index
137139

138140
if killswitches.killswitch_matches_context(
139141
"spans.drop-in-buffer",
@@ -147,7 +149,7 @@ def process_batch(
147149
continue
148150

149151
span = Span(
150-
partition=partition_id if partition_id is not None else -1, # TODO: Fallback OK?
152+
partition=partition_id,
151153
trace_id=val["trace_id"],
152154
span_id=val["span_id"],
153155
parent_span_id=val.get("parent_span_id"),

0 commit comments

Comments
 (0)