Skip to content

Commit 274da69

Browse files
committed
feat: add comprehensive debug logging for streaming responses
Added extensive debug logging throughout the response streaming pipeline to diagnose issues with missing input, output, and token usage data. Debug logging added to:

- process_input: log input type and value
- _process_chunk: log chunk details, text accumulation, and usage capture
- _process_complete_response: log final state before setting span attributes
- set_data_attributes: log what's being set as prompts and completions
- Stream initialization: log kwargs and processed input data

This will help identify where data is being lost in the streaming pipeline.
1 parent 935e51c commit 274da69

File tree

1 file changed

+32
-9
lines changed
  • packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1

1 file changed

+32
-9
lines changed

packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ def process_input(inp: ResponseInputParam) -> ResponseInputParam:
102102
Process input parameters for OpenAI Responses API.
103103
Ensures list inputs have proper type annotations for each item.
104104
"""
105+
logger.debug(f"process_input called with type: {type(inp)}, value: {inp[:100] if isinstance(inp, str) else inp}")
105106
if not isinstance(inp, list):
106107
return inp
107108
return [prepare_input_param(item) for item in inp]
@@ -366,6 +367,7 @@ def _process_chunk(self, chunk):
366367

367368
try:
368369
parsed_chunk = parse_response(chunk)
370+
logger.debug(f"ResponseStream chunk - id: {getattr(parsed_chunk, 'id', None)}, has_output: {hasattr(parsed_chunk, 'output')}, has_usage: {hasattr(parsed_chunk, 'usage')}")
369371

370372
# Update response_id if it becomes available
371373
if parsed_chunk.id and not self._traced_data.response_id:
@@ -382,6 +384,7 @@ def _process_chunk(self, chunk):
382384

383385
if hasattr(parsed_chunk, 'usage') and parsed_chunk.usage:
384386
self._traced_data.usage = parsed_chunk.usage
387+
logger.debug(f"ResponseStream got usage data: {parsed_chunk.usage}")
385388

386389
if hasattr(parsed_chunk, 'model') and parsed_chunk.model:
387390
self._traced_data.response_model = parsed_chunk.model
@@ -391,6 +394,7 @@ def _process_chunk(self, chunk):
391394
if self._traced_data.output_text is None:
392395
self._traced_data.output_text = ""
393396
self._traced_data.output_text += parsed_chunk.output_text
397+
logger.debug(f"ResponseStream accumulated text from output_text: {len(parsed_chunk.output_text)} chars, total: {len(self._traced_data.output_text)}")
394398
else:
395399
# Try to extract and accumulate text from output blocks
396400
try:
@@ -402,8 +406,9 @@ def _process_chunk(self, chunk):
402406
if self._traced_data.output_text is None:
403407
self._traced_data.output_text = ""
404408
self._traced_data.output_text += content_item.text
405-
except Exception:
406-
pass
409+
logger.debug(f"ResponseStream accumulated text from content: {len(content_item.text)} chars, total: {len(self._traced_data.output_text)}")
410+
except Exception as e:
411+
logger.debug(f"ResponseStream error extracting text: {e}")
407412

408413
# Update global dict with latest data
409414
if self._traced_data.response_id:
@@ -429,6 +434,7 @@ def _process_complete_response(self):
429434
Process the complete streaming response.
430435
Sets final span attributes, records metrics, and ends the span.
431436
"""
437+
logger.debug(f"ResponseStream _process_complete_response - input: {self._traced_data.input}, output_text: {self._traced_data.output_text[:100] if self._traced_data.output_text else None}, usage: {self._traced_data.usage}")
432438
if self._span and self._span.is_recording():
433439
set_data_attributes(self._traced_data, self._span)
434440

@@ -514,6 +520,8 @@ def set_data_attributes(traced_response: TracedData, span: Span):
514520
Set OpenTelemetry span attributes from traced response data.
515521
Includes model info, usage stats, prompts, and completions.
516522
"""
523+
logger.debug(f"set_data_attributes - input: {traced_response.input}, output_text: {traced_response.output_text[:100] if traced_response.output_text else None}")
524+
logger.debug(f"set_data_attributes - usage: {traced_response.usage}")
517525
_set_span_attribute(span, GEN_AI_SYSTEM, "openai")
518526
_set_span_attribute(span, GEN_AI_REQUEST_MODEL, traced_response.request_model)
519527
_set_span_attribute(span, GEN_AI_RESPONSE_ID, traced_response.response_id)
@@ -606,12 +614,14 @@ def set_data_attributes(traced_response: TracedData, span: Span):
606614
prompt_index += 1
607615

608616
if isinstance(traced_response.input, str):
617+
logger.debug(f"Setting prompt as string: {traced_response.input[:100]}")
609618
_set_span_attribute(
610619
span, f"{GEN_AI_PROMPT}.{prompt_index}.content", traced_response.input
611620
)
612621
_set_span_attribute(span, f"{GEN_AI_PROMPT}.{prompt_index}.role", "user")
613622
prompt_index += 1
614-
else:
623+
elif traced_response.input:
624+
logger.debug(f"Setting prompt as list with {len(traced_response.input) if traced_response.input else 0} items")
615625
for block in traced_response.input:
616626
block_dict = model_as_dict(block)
617627
if block_dict.get("type", "message") == "message":
@@ -675,12 +685,17 @@ def set_data_attributes(traced_response: TracedData, span: Span):
675685
)
676686
prompt_index += 1
677687
# TODO: handle other block types
688+
else:
689+
logger.debug(f"Input is neither string nor list: {type(traced_response.input)}")
678690

679691
_set_span_attribute(span, f"{GEN_AI_COMPLETION}.0.role", "assistant")
680692
if traced_response.output_text:
693+
logger.debug(f"Setting completion content: {traced_response.output_text[:100]}")
681694
_set_span_attribute(
682695
span, f"{GEN_AI_COMPLETION}.0.content", traced_response.output_text
683696
)
697+
else:
698+
logger.debug("No output_text to set as completion content")
684699
tool_call_index = 0
685700
if traced_response.output_blocks:
686701
for block in traced_response.output_blocks.values():
@@ -813,12 +828,16 @@ def responses_get_or_create_wrapper(
813828
if response_id and response_id in responses:
814829
existing_data = responses[response_id].model_dump()
815830

831+
input_data = process_input(
832+
kwargs.get("input", existing_data.get("input", []))
833+
)
834+
logger.debug(f"SyncResponseStream init - input_data: {input_data}")
835+
logger.debug(f"SyncResponseStream init - kwargs keys: {kwargs.keys()}")
836+
816837
traced_data = TracedData(
817838
start_time=time.time_ns(), # Use nanoseconds for TracedData
818839
response_id=response_id or "",
819-
input=process_input(
820-
kwargs.get("input", existing_data.get("input", []))
821-
),
840+
input=input_data,
822841
instructions=kwargs.get(
823842
"instructions", existing_data.get("instructions")
824843
),
@@ -972,12 +991,16 @@ async def async_responses_get_or_create_wrapper(
972991
if response_id and response_id in responses:
973992
existing_data = responses[response_id].model_dump()
974993

994+
input_data = process_input(
995+
kwargs.get("input", existing_data.get("input", []))
996+
)
997+
logger.debug(f"AsyncResponseStream init - input_data: {input_data}")
998+
logger.debug(f"AsyncResponseStream init - kwargs keys: {kwargs.keys()}")
999+
9751000
traced_data = TracedData(
9761001
start_time=time.time_ns(), # Use nanoseconds for TracedData
9771002
response_id=response_id or "",
978-
input=process_input(
979-
kwargs.get("input", existing_data.get("input", []))
980-
),
1003+
input=input_data,
9811004
instructions=kwargs.get(
9821005
"instructions", existing_data.get("instructions", "")
9831006
),

0 commit comments

Comments
 (0)