Skip to content

feat: Kafka FUP Phase I and Phase II #718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,19 @@ dependencies = [
"opentelemetry-api>=1.27.0",
"opentelemetry-semantic-conventions>=0.48b0",
"typing_extensions>=4.12.2",
"pyyaml>=6.0.2",
]

[project.entry-points."instana"]
string = "instana:load"

[project.optional-dependencies]
dev = [
"pytest",
"pytest-cov",
"pytest-mock",
"pre-commit>=3.0.0",
"ruff"
"pytest",
"pytest-cov",
"pytest-mock",
"pre-commit>=3.0.0",
"ruff",
]

[project.urls]
Expand Down
42 changes: 30 additions & 12 deletions src/instana/agent/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from instana.options import StandardOptions
from instana.util import to_json
from instana.util.runtime import get_py_source
from instana.util.span_utils import get_operation_specifier
from instana.util.span_utils import get_operation_specifiers
from instana.version import VERSION


Expand Down Expand Up @@ -351,13 +351,18 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
Filters given span list using ignore-endpoint variable and returns the list of filtered spans.
"""
filtered_spans = []
endpoint = ""
for span in spans:
if (hasattr(span, "n") or hasattr(span, "name")) and hasattr(span, "data"):
service = span.n
operation_specifier = get_operation_specifier(service)
endpoint = span.data[service][operation_specifier]
if isinstance(endpoint, str) and self.__is_service_or_endpoint_ignored(
service, endpoint
operation_specifier_key, service_specifier_key = (
get_operation_specifiers(service)
)
if service == "kafka":
endpoint = span.data[service][service_specifier_key]
method = span.data[service][operation_specifier_key]
if isinstance(method, str) and self.__is_endpoint_ignored(
service, method, endpoint
):
continue
else:
Expand All @@ -366,15 +371,28 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
filtered_spans.append(span)
return filtered_spans

def __is_service_or_endpoint_ignored(
self, service: str, endpoint: str = ""
def __is_endpoint_ignored(
self,
service: str,
method: str = "",
endpoint: str = "",
) -> bool:
"""Check if the given service and endpoint combination should be ignored."""

return (
service.lower() in self.options.ignore_endpoints
or f"{service.lower()}.{endpoint.lower()}" in self.options.ignore_endpoints
)
service = service.lower()
method = method.lower()
endpoint = endpoint.lower()
filter_rules = [
f"{service}.{method}", # service.method
f"{service}.*", # service.*
]

if service == "kafka" and endpoint:
filter_rules += [
f"{service}.{method}.{endpoint}", # service.method.endpoint
f"{service}.*.{endpoint}", # service.*.endpoint
f"{service}.{method}.*", # service.method.*
]
return any(rule in self.options.ignore_endpoints for rule in filter_rules)

def handle_agent_tasks(self, task: Dict[str, Any]) -> None:
"""
Expand Down
13 changes: 12 additions & 1 deletion src/instana/instrumentation/kafka/confluent_kafka_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ def trace_kafka_produce(

tracer, parent_span, _ = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None
is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
"kafka",
"produce",
args[0],
)

with tracer.start_as_current_span(
"kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER
Expand All @@ -73,15 +78,21 @@ def trace_kafka_produce(
# dictionary. To maintain compatibility with the headers for the
# Kafka Python library, we will use a list of tuples.
headers = args[6] if len(args) > 6 else kwargs.get("headers", [])
suppression_header = {"x_instana_l_s": "0" if is_suppressed else "1"}
headers.append(suppression_header)

tracer.inject(
span.context,
Format.KAFKA_HEADERS,
headers,
disable_w3c_trace_context=True,
)

try:
headers.remove(suppression_header)

if tracer.exporter.options.kafka_trace_correlation:
kwargs["headers"] = headers
try:
res = wrapped(*args, **kwargs)
except Exception as exc:
span.record_exception(exc)
Expand Down
14 changes: 12 additions & 2 deletions src/instana/instrumentation/kafka/kafka_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ def trace_kafka_send(

tracer, parent_span, _ = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None

is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
"kafka",
"send",
args[0],
)
with tracer.start_as_current_span(
"kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER
) as span:
Expand All @@ -39,15 +43,21 @@ def trace_kafka_send(

# context propagation
headers = kwargs.get("headers", [])
suppression_header = {"x_instana_l_s": "0" if is_suppressed else "1"}
headers.append(suppression_header)

tracer.inject(
span.context,
Format.KAFKA_HEADERS,
headers,
disable_w3c_trace_context=True,
)

try:
headers.remove(suppression_header)

if tracer.exporter.options.kafka_trace_correlation:
kwargs["headers"] = headers
try:
res = wrapped(*args, **kwargs)
except Exception as exc:
span.record_exception(exc)
Expand Down
112 changes: 81 additions & 31 deletions src/instana/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
from typing import Any, Dict

from instana.log import logger
from instana.util.config import parse_ignored_endpoints
from instana.util.config import (
parse_ignored_endpoints,
parse_ignored_endpoints_from_yaml,
)
from instana.util.runtime import determine_service_name
from instana.configurator import config

Expand All @@ -34,31 +37,9 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
self.extra_http_headers = None
self.allow_exit_as_root = False
self.ignore_endpoints = []
self.kafka_trace_correlation = True

if "INSTANA_DEBUG" in os.environ:
self.log_level = logging.DEBUG
self.debug = True

if "INSTANA_EXTRA_HTTP_HEADERS" in os.environ:
self.extra_http_headers = (
str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";")
)

if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
self.ignore_endpoints = parse_ignored_endpoints(
os.environ["INSTANA_IGNORE_ENDPOINTS"]
)
else:
if (
isinstance(config.get("tracing"), dict)
and "ignore_endpoints" in config["tracing"]
):
self.ignore_endpoints = parse_ignored_endpoints(
config["tracing"]["ignore_endpoints"],
)

if os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None) == "1":
self.allow_exit_as_root = True
self.set_trace_configurations()

# Defaults
self.secrets_matcher = "contains-ignore-case"
Expand All @@ -79,6 +60,56 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:

self.__dict__.update(kwds)

def set_trace_configurations(self) -> None:
    """
    Set tracing configurations from the environment variables and the
    in-code ``config`` object.

    Attributes that may be updated: ``log_level``, ``debug``,
    ``extra_http_headers``, ``allow_exit_as_root``, ``ignore_endpoints`` and
    ``kafka_trace_correlation``. An attribute is left untouched when none of
    its sources is present.
    @return: None
    """
    # Look the in-code tracing section up once; anything that is not a
    # mapping is treated as "no in-code tracing configuration".
    tracing_config = config.get("tracing")
    if not isinstance(tracing_config, dict):
        tracing_config = {}

    if "INSTANA_DEBUG" in os.environ:
        self.log_level = logging.DEBUG
        self.debug = True

    if "INSTANA_EXTRA_HTTP_HEADERS" in os.environ:
        # Semicolon-separated list, normalised to lower case.
        self.extra_http_headers = (
            str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";")
        )

    if "1" in [
        os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None),  # deprecated
        os.environ.get("INSTANA_ALLOW_ROOT_EXIT_SPAN", None),
    ]:
        self.allow_exit_as_root = True

    # The priority is as follows:
    # environment variables > in-code configuration
    # > agent config (configuration.yaml) > default value
    if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
        self.ignore_endpoints = parse_ignored_endpoints(
            os.environ["INSTANA_IGNORE_ENDPOINTS"]
        )
    elif "INSTANA_IGNORE_ENDPOINTS_PATH" in os.environ:
        self.ignore_endpoints = parse_ignored_endpoints_from_yaml(
            os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"]
        )
    elif "ignore_endpoints" in tracing_config:
        self.ignore_endpoints = parse_ignored_endpoints(
            tracing_config["ignore_endpoints"],
        )

    if "INSTANA_KAFKA_TRACE_CORRELATION" in os.environ:
        # Only the literal "true" (any casing) enables correlation.
        self.kafka_trace_correlation = (
            os.environ["INSTANA_KAFKA_TRACE_CORRELATION"].lower() == "true"
        )
    elif "kafka" in tracing_config:
        kafka_config = tracing_config["kafka"]
        # Guard against a present-but-empty or non-mapping `kafka` entry
        # (e.g. None), which would otherwise raise AttributeError while the
        # options object is being constructed; the current value is kept.
        if isinstance(kafka_config, dict):
            self.kafka_trace_correlation = kafka_config.get(
                "trace_correlation", True
            )


class StandardOptions(BaseOptions):
"""The options class used when running directly on a host/node with an Instana agent"""
Expand Down Expand Up @@ -124,12 +155,30 @@ def set_tracing(self, tracing: Dict[str, Any]) -> None:
@param tracing: tracing configuration dictionary
@return: None
"""
if (
"ignore-endpoints" in tracing
and "INSTANA_IGNORE_ENDPOINTS" not in os.environ
and "tracing" not in config
):
if "ignore-endpoints" in tracing and not self.ignore_endpoints:
self.ignore_endpoints = parse_ignored_endpoints(tracing["ignore-endpoints"])

if "kafka" in tracing:
if (
"INSTANA_KAFKA_TRACE_CORRELATION" not in os.environ
and not (
isinstance(config.get("tracing"), dict)
and "kafka" in config["tracing"]
)
and "trace-correlation" in tracing["kafka"]
):
self.kafka_trace_correlation = (
str(tracing["kafka"].get("trace-correlation", True)) == "true"
)

if (
"header-format" in tracing["kafka"]
and tracing["kafka"]["header-format"] == "binary"
):
logger.warning(
"Binary header format for Kafka is deprecated. Please use string header format."
)

if "extra-http-headers" in tracing:
self.extra_http_headers = tracing["extra-http-headers"]

Expand All @@ -141,13 +190,14 @@ def set_from(self, res_data: Dict[str, Any]) -> None:
"""
if not res_data or not isinstance(res_data, dict):
logger.debug(f"options.set_from: Wrong data type - {type(res_data)}")
return
return

if "secrets" in res_data:
self.set_secrets(res_data["secrets"])

if "tracing" in res_data:
self.set_tracing(res_data["tracing"])

else:
if "extraHeaders" in res_data:
self.set_extra_headers(res_data["extraHeaders"])
Expand Down
37 changes: 17 additions & 20 deletions src/instana/propagators/kafka_propagator.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ def extract(
disable_w3c_trace_context=disable_w3c_trace_context,
)

except Exception:
logger.debug("kafka_propagator extract error:", exc_info=True)
except Exception as e:
logger.debug(f"kafka_propagator extract error: {e}", exc_info=True)

# Assisted by watsonx Code Assistant
def inject(
Expand All @@ -98,15 +98,12 @@ def inject(
span_id = span_context.span_id
dictionary_carrier = self.extract_carrier_headers(carrier)

suppression_level = 1
if dictionary_carrier:
# Suppression `level` made in the child context or in the parent context
# has priority over any non-suppressed `level` setting
child_level = int(
self.extract_instana_headers(dictionary_carrier)[2] or "1"
)
span_context.level = min(child_level, span_context.level)

serializable_level = str(span_context.level)
suppression_level = int(self.extract_instana_headers(dictionary_carrier)[2])
span_context.level = min(suppression_level, span_context.level)

def inject_key_value(carrier, key, value):
if isinstance(carrier, list):
Expand All @@ -122,18 +119,18 @@ def inject_key_value(carrier, key, value):
inject_key_value(
carrier,
self.KAFKA_HEADER_KEY_L_S,
serializable_level.encode("utf-8"),
)
inject_key_value(
carrier,
self.KAFKA_HEADER_KEY_T,
hex_id_limited(trace_id).encode("utf-8"),
str(suppression_level).encode("utf-8"),
)
inject_key_value(
carrier,
self.KAFKA_HEADER_KEY_S,
format_span_id(span_id).encode("utf-8"),
)

if suppression_level == 1:
inject_key_value(
carrier,
self.KAFKA_HEADER_KEY_T,
hex_id_limited(trace_id).encode("utf-8"),
)
inject_key_value(
carrier,
self.KAFKA_HEADER_KEY_S,
format_span_id(span_id).encode("utf-8"),
)
except Exception:
logger.debug("KafkaPropagator - inject error:", exc_info=True)
Loading
Loading