diff --git a/pyproject.toml b/pyproject.toml index 1f12cb6e..31c184b2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,7 @@ dependencies = [ "opentelemetry-api>=1.27.0", "opentelemetry-semantic-conventions>=0.48b0", "typing_extensions>=4.12.2", + "pyyaml>=6.0.2", ] [project.entry-points."instana"] @@ -59,11 +60,11 @@ string = "instana:load" [project.optional-dependencies] dev = [ - "pytest", - "pytest-cov", - "pytest-mock", - "pre-commit>=3.0.0", - "ruff" + "pytest", + "pytest-cov", + "pytest-mock", + "pre-commit>=3.0.0", + "ruff", ] [project.urls] diff --git a/src/instana/agent/host.py b/src/instana/agent/host.py index 8e03833b..ee0e1d79 100644 --- a/src/instana/agent/host.py +++ b/src/instana/agent/host.py @@ -22,7 +22,7 @@ from instana.options import StandardOptions from instana.util import to_json from instana.util.runtime import get_py_source -from instana.util.span_utils import get_operation_specifier +from instana.util.span_utils import get_operation_specifiers from instana.version import VERSION @@ -351,13 +351,18 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]: Filters given span list using ignore-endpoint variable and returns the list of filtered spans. """ filtered_spans = [] + endpoint = "" for span in spans: if (hasattr(span, "n") or hasattr(span, "name")) and hasattr(span, "data"): service = span.n - operation_specifier = get_operation_specifier(service) - endpoint = span.data[service][operation_specifier] - if isinstance(endpoint, str) and self.__is_service_or_endpoint_ignored( - service, endpoint + operation_specifier_key, service_specifier_key = ( + get_operation_specifiers(service) + ) + if service == "kafka": + endpoint = span.data[service][service_specifier_key] + method = span.data[service][operation_specifier_key] + if isinstance(method, str) and self.__is_endpoint_ignored( + service, method, endpoint ): continue else: @@ -366,15 +371,28 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]: filtered_spans.append(span) return filtered_spans - def __is_service_or_endpoint_ignored( - self, service: str, endpoint: str = "" + def __is_endpoint_ignored( + self, + service: str, + method: str = "", + endpoint: str = "", ) -> bool: """Check if the given service and endpoint combination should be ignored.""" - - return ( - service.lower() in self.options.ignore_endpoints - or f"{service.lower()}.{endpoint.lower()}" in self.options.ignore_endpoints - ) + service = service.lower() + method = method.lower() + endpoint = endpoint.lower() + filter_rules = [ + f"{service}.{method}", # service.method + f"{service}.*", # service.* + ] + + if service == "kafka" and endpoint: + filter_rules += [ + f"{service}.{method}.{endpoint}", # service.method.endpoint + f"{service}.*.{endpoint}", # service.*.endpoint + f"{service}.{method}.*", # service.method.* + ] + return any(rule in self.options.ignore_endpoints for rule in filter_rules) def handle_agent_tasks(self, task: Dict[str, Any]) -> None: """ diff --git a/src/instana/instrumentation/kafka/confluent_kafka_python.py b/src/instana/instrumentation/kafka/confluent_kafka_python.py index 9c5d1194..04b1164c 100644 --- a/src/instana/instrumentation/kafka/confluent_kafka_python.py +++ b/src/instana/instrumentation/kafka/confluent_kafka_python.py @@ -58,6 +58,11 @@ def trace_kafka_produce( tracer, parent_span, _ = get_tracer_tuple() parent_context = parent_span.get_span_context() if parent_span else None + is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored( + 
"kafka", + "produce", + args[0], + ) with tracer.start_as_current_span( "kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER @@ -73,6 +78,9 @@ def trace_kafka_produce( # dictionary. To maintain compatibility with the headers for the # Kafka Python library, we will use a list of tuples. headers = args[6] if len(args) > 6 else kwargs.get("headers", []) + suppression_header = {"x_instana_l_s": "0" if is_suppressed else "1"} + headers.append(suppression_header) + tracer.inject( span.context, Format.KAFKA_HEADERS, @@ -80,8 +88,11 @@ def trace_kafka_produce( disable_w3c_trace_context=True, ) - try: + headers.remove(suppression_header) + + if tracer.exporter.options.kafka_trace_correlation: kwargs["headers"] = headers + try: res = wrapped(*args, **kwargs) except Exception as exc: span.record_exception(exc) diff --git a/src/instana/instrumentation/kafka/kafka_python.py b/src/instana/instrumentation/kafka/kafka_python.py index ad26ec0e..278390f9 100644 --- a/src/instana/instrumentation/kafka/kafka_python.py +++ b/src/instana/instrumentation/kafka/kafka_python.py @@ -30,7 +30,11 @@ def trace_kafka_send( tracer, parent_span, _ = get_tracer_tuple() parent_context = parent_span.get_span_context() if parent_span else None - + is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored( + "kafka", + "send", + args[0], + ) with tracer.start_as_current_span( "kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER ) as span: @@ -39,6 +43,9 @@ def trace_kafka_send( # context propagation headers = kwargs.get("headers", []) + suppression_header = {"x_instana_l_s": "0" if is_suppressed else "1"} + headers.append(suppression_header) + tracer.inject( span.context, Format.KAFKA_HEADERS, @@ -46,8 +53,11 @@ def trace_kafka_send( disable_w3c_trace_context=True, ) - try: + headers.remove(suppression_header) + + if tracer.exporter.options.kafka_trace_correlation: kwargs["headers"] = headers + try: res = wrapped(*args, **kwargs) except Exception as exc: span.record_exception(exc) diff --git a/src/instana/options.py b/src/instana/options.py index dee797d7..da124020 100644 --- a/src/instana/options.py +++ b/src/instana/options.py @@ -19,7 +19,10 @@ from typing import Any, Dict from instana.log import logger -from instana.util.config import parse_ignored_endpoints +from instana.util.config import ( + parse_ignored_endpoints, + parse_ignored_endpoints_from_yaml, +) from instana.util.runtime import determine_service_name from instana.configurator import config @@ -34,31 +37,9 @@ def __init__(self, **kwds: Dict[str, Any]) -> None: self.extra_http_headers = None self.allow_exit_as_root = False self.ignore_endpoints = [] + self.kafka_trace_correlation = True - if "INSTANA_DEBUG" in os.environ: - self.log_level = logging.DEBUG - self.debug = True - - if "INSTANA_EXTRA_HTTP_HEADERS" in os.environ: - self.extra_http_headers = ( - str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";") - ) - - if "INSTANA_IGNORE_ENDPOINTS" in os.environ: - self.ignore_endpoints = parse_ignored_endpoints( - os.environ["INSTANA_IGNORE_ENDPOINTS"] - ) - else: - if ( - isinstance(config.get("tracing"), dict) - and "ignore_endpoints" in config["tracing"] - ): - self.ignore_endpoints = parse_ignored_endpoints( - config["tracing"]["ignore_endpoints"], - ) - - if os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None) == "1": - self.allow_exit_as_root = True + self.set_trace_configurations() # Defaults self.secrets_matcher = "contains-ignore-case" @@ -79,6 +60,56 @@ def __init__(self, **kwds: Dict[str, Any]) -> None: 
self.__dict__.update(kwds)

+    def set_trace_configurations(self) -> None:
+        """
+        Set tracing configurations from the environment variables and config file.
+        @return: None
+        """
+        # Values resolved here take priority, so set_tracing does not
+        # override them later with the agent configuration.
+        if "INSTANA_DEBUG" in os.environ:
+            self.log_level = logging.DEBUG
+            self.debug = True
+
+        if "INSTANA_EXTRA_HTTP_HEADERS" in os.environ:
+            self.extra_http_headers = (
+                str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";")
+            )
+
+        if "1" in [
+            os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None),  # deprecated
+            os.environ.get("INSTANA_ALLOW_ROOT_EXIT_SPAN", None),
+        ]:
+            self.allow_exit_as_root = True
+
+        # The priority is as follows:
+        # environment variables > in-code configuration >
+        # agent config (configuration.yaml) > default value
+        if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
+            self.ignore_endpoints = parse_ignored_endpoints(
+                os.environ["INSTANA_IGNORE_ENDPOINTS"]
+            )
+        elif "INSTANA_IGNORE_ENDPOINTS_PATH" in os.environ:
+            self.ignore_endpoints = parse_ignored_endpoints_from_yaml(
+                os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"]
+            )
+        elif (
+            isinstance(config.get("tracing"), dict)
+            and "ignore_endpoints" in config["tracing"]
+        ):
+            self.ignore_endpoints = parse_ignored_endpoints(
+                config["tracing"]["ignore_endpoints"],
+            )
+
+        if "INSTANA_KAFKA_TRACE_CORRELATION" in os.environ:
+            self.kafka_trace_correlation = (
+                os.environ["INSTANA_KAFKA_TRACE_CORRELATION"].lower() == "true"
+            )
+        elif isinstance(config.get("tracing"), dict) and "kafka" in config["tracing"]:
+            self.kafka_trace_correlation = config["tracing"]["kafka"].get(
+                "trace_correlation", True
+            )
+

 class StandardOptions(BaseOptions):
     """The options class used when running directly on a host/node with an Instana agent"""
@@ -124,12 +155,30 @@ def set_tracing(self, tracing: Dict[str, Any]) -> None:
         @param tracing: tracing configuration dictionary
         @return: None
         """
-        if (
-            "ignore-endpoints" in tracing
-            and "INSTANA_IGNORE_ENDPOINTS" not in os.environ
-            and "tracing" not in config
-        ):
+        if "ignore-endpoints" in tracing and not self.ignore_endpoints:
             self.ignore_endpoints = parse_ignored_endpoints(tracing["ignore-endpoints"])
+
+        if "kafka" in tracing:
+            if (
+                "INSTANA_KAFKA_TRACE_CORRELATION" not in os.environ
+                and not (
+                    isinstance(config.get("tracing"), dict)
+                    and "kafka" in config["tracing"]
+                )
+                and "trace-correlation" in tracing["kafka"]
+            ):
+                self.kafka_trace_correlation = (
+                    str(tracing["kafka"].get("trace-correlation", True)).lower() == "true"
+                )
+
+            if (
+                "header-format" in tracing["kafka"]
+                and tracing["kafka"]["header-format"] == "binary"
+            ):
+                logger.warning(
+                    "Binary header format for Kafka is deprecated. Please use string header format."
+ ) + if "extra-http-headers" in tracing: self.extra_http_headers = tracing["extra-http-headers"] @@ -141,13 +190,14 @@ def set_from(self, res_data: Dict[str, Any]) -> None: """ if not res_data or not isinstance(res_data, dict): logger.debug(f"options.set_from: Wrong data type - {type(res_data)}") - return + return if "secrets" in res_data: self.set_secrets(res_data["secrets"]) if "tracing" in res_data: self.set_tracing(res_data["tracing"]) + else: if "extraHeaders" in res_data: self.set_extra_headers(res_data["extraHeaders"]) diff --git a/src/instana/propagators/kafka_propagator.py b/src/instana/propagators/kafka_propagator.py index ad182b13..9ba27940 100644 --- a/src/instana/propagators/kafka_propagator.py +++ b/src/instana/propagators/kafka_propagator.py @@ -73,8 +73,8 @@ def extract( disable_w3c_trace_context=disable_w3c_trace_context, ) - except Exception: - logger.debug("kafka_propagator extract error:", exc_info=True) + except Exception as e: + logger.debug(f"kafka_propagator extract error: {e}", exc_info=True) # Assisted by watsonx Code Assistant def inject( @@ -98,15 +98,12 @@ def inject( span_id = span_context.span_id dictionary_carrier = self.extract_carrier_headers(carrier) + suppression_level = 1 if dictionary_carrier: # Suppression `level` made in the child context or in the parent context # has priority over any non-suppressed `level` setting - child_level = int( - self.extract_instana_headers(dictionary_carrier)[2] or "1" - ) - span_context.level = min(child_level, span_context.level) - - serializable_level = str(span_context.level) + suppression_level = int(self.extract_instana_headers(dictionary_carrier)[2]) + span_context.level = min(suppression_level, span_context.level) def inject_key_value(carrier, key, value): if isinstance(carrier, list): @@ -122,18 +119,18 @@ def inject_key_value(carrier, key, value): inject_key_value( carrier, self.KAFKA_HEADER_KEY_L_S, - serializable_level.encode("utf-8"), - ) - inject_key_value( - carrier, - self.KAFKA_HEADER_KEY_T, - hex_id_limited(trace_id).encode("utf-8"), + str(suppression_level).encode("utf-8"), ) - inject_key_value( - carrier, - self.KAFKA_HEADER_KEY_S, - format_span_id(span_id).encode("utf-8"), - ) - + if suppression_level == 1: + inject_key_value( + carrier, + self.KAFKA_HEADER_KEY_T, + hex_id_limited(trace_id).encode("utf-8"), + ) + inject_key_value( + carrier, + self.KAFKA_HEADER_KEY_S, + format_span_id(span_id).encode("utf-8"), + ) except Exception: logger.debug("KafkaPropagator - inject error:", exc_info=True) diff --git a/src/instana/util/config.py b/src/instana/util/config.py index c8f6d1f9..b53f8177 100644 --- a/src/instana/util/config.py +++ b/src/instana/util/config.py @@ -1,5 +1,11 @@ +# (c) Copyright IBM Corp. 2025 + +import itertools +import os from typing import Any, Dict, List, Union + from instana.log import logger +from instana.util.config_reader import ConfigReader def parse_service_pair(pair: str) -> List[str]: @@ -7,29 +13,29 @@ def parse_service_pair(pair: str) -> List[str]: Parses a pair string to prepare a list of ignored endpoints. 
@param pair: String format: - - "service1:endpoint1,endpoint2" or "service1:endpoint1" or "service1" - @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"] + - "service1:method1,method2" or "service1:method1" or "service1" + @return: List of strings in format ["service1.method1", "service1.method2", "service2.*"] """ pair_list = [] if ":" in pair: - service, endpoints = pair.split(":", 1) + service, methods = pair.split(":", 1) service = service.strip() - endpoint_list = [ep.strip() for ep in endpoints.split(",") if ep.strip()] + method_list = [ep.strip() for ep in methods.split(",") if ep.strip()] - for endpoint in endpoint_list: - pair_list.append(f"{service}.{endpoint}") + for method in method_list: + pair_list.append(f"{service}.{method}") else: - pair_list.append(pair) + pair_list.append(f"{pair}.*") return pair_list -def parse_ignored_endpoints_string(params: str) -> List[str]: +def parse_ignored_endpoints_string(params: Union[str, os.PathLike]) -> List[str]: """ Parses a string to prepare a list of ignored endpoints. @param params: String format: - - "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2" - @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"] + - "service1:method1,method2;service2:method3" or "service1;service2" + @return: List of strings in format ["service1.method1", "service1.method2", "service2.*"] """ ignore_endpoints = [] if params: @@ -46,18 +52,45 @@ def parse_ignored_endpoints_dict(params: Dict[str, Any]) -> List[str]: Parses a dictionary to prepare a list of ignored endpoints. @param params: Dict format: - - {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]} - @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"] + - {"service1": ["method1", "method2"], "service2": ["method3"]} + @return: List of strings in format ["service1.method1", "service1.method2", "service2.*"] """ ignore_endpoints = [] - for service, endpoints in params.items(): - if not endpoints: # filtering all service - ignore_endpoints.append(service.lower()) + for service, methods in params.items(): + if not methods: # filtering all service + ignore_endpoints.append(f"{service.lower()}.*") else: # filtering specific endpoints - for endpoint in endpoints: - ignore_endpoints.append(f"{service.lower()}.{endpoint.lower()}") + ignore_endpoints = parse_endpoints_of_service( + ignore_endpoints, service, methods + ) + + return ignore_endpoints + +def parse_endpoints_of_service( + ignore_endpoints: List[str], + service: str, + methods: Union[str, List[str]], +) -> List[str]: + """ + Parses endpoints of each service. + + @param ignore_endpoints: A list of rules for endpoints to be filtered. + @param service: The name of the service to be filtered. + @param methods: A list of specific endpoints of the service to be filtered. + """ + if service == "kafka" and isinstance(methods, list): + for rule in methods: + for method, endpoint in itertools.product( + rule["methods"], rule["endpoints"] + ): + ignore_endpoints.append( + f"{service.lower()}.{method.lower()}.{endpoint.lower()}" + ) + else: + for method in methods: + ignore_endpoints.append(f"{service.lower()}.{method.lower()}") return ignore_endpoints @@ -66,9 +99,9 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]: Parses input to prepare a list for ignored endpoints. 
@param params: Can be either:
-        - String: "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2"
-        - Dict: {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]}
-    @return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
+        - String: "service1:method1,method2;service2:method3" or "service1;service2"
+        - Dict: {"service1": ["method1", "method2"], "service2": ["method3"]}
+    @return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
     """
     try:
         if isinstance(params, str):
@@ -80,3 +113,28 @@
     except Exception as e:
         logger.debug("Error parsing ignored endpoints: %s", str(e))
         return []
+
+
+def parse_ignored_endpoints_from_yaml(file_path: str) -> List[str]:
+    """
+    Parses the configuration YAML file and prepares a list of ignored endpoints.
+
+    @param file_path: Path of the file as a string
+    @return: List of strings in format ["service1.method1", "service1.method2", "service2.*", "kafka.method.topic", "kafka.*.topic", "kafka.method.*"]
+    """
+    config_reader = ConfigReader(file_path)
+    ignore_endpoints_dict = None
+    if "tracing" in config_reader.data:
+        ignore_endpoints_dict = config_reader.data["tracing"].get("ignore-endpoints")
+    elif "com.instana.tracing" in config_reader.data:
+        logger.warning(
+            'Please use "tracing" instead of "com.instana.tracing" in the local configuration file.'
+        )
+        ignore_endpoints_dict = config_reader.data["com.instana.tracing"].get(
+            "ignore-endpoints"
+        )
+    if ignore_endpoints_dict:
+        ignored_endpoints = parse_ignored_endpoints(ignore_endpoints_dict)
+        return ignored_endpoints
+    else:
+        return []
diff --git a/src/instana/util/config_reader.py b/src/instana/util/config_reader.py
new file mode 100644
index 00000000..ddec31ec
--- /dev/null
+++ b/src/instana/util/config_reader.py
@@ -0,0 +1,23 @@
+# (c) Copyright IBM Corp. 2025
+
+from instana.log import logger
+import yaml
+
+
+class ConfigReader:
+    def __init__(self, file_path: str) -> None:
+        self.file_path = file_path
+        # Default to an empty dict so lookups like `"tracing" in data`
+        # stay safe when the file cannot be loaded.
+        self.data = {}
+        self.load_file()
+
+    def load_file(self) -> None:
+        """Loads and parses the YAML file."""
+        try:
+            with open(self.file_path, "r") as file:
+                self.data = yaml.safe_load(file)
+        except FileNotFoundError:
+            logger.error(f"Configuration file not found: {self.file_path}")
+        except yaml.YAMLError as e:
+            logger.error(f"Error parsing YAML file: {e}")
diff --git a/src/instana/util/span_utils.py b/src/instana/util/span_utils.py
index 34049759..2dda4759 100644
--- a/src/instana/util/span_utils.py
+++ b/src/instana/util/span_utils.py
@@ -1,13 +1,17 @@
 # (c) Copyright IBM Corp.
2025 -from typing import Optional +from typing import Tuple -def get_operation_specifier(span_name: str) -> Optional[str]: +def get_operation_specifiers(span_name: str) -> Tuple[str, str]: """Get the specific operation specifier for the given span.""" - operation_specifier = "" + operation_specifier_key = "" + service_specifier_key = "" if span_name == "redis": - operation_specifier = "command" + operation_specifier_key = "command" elif span_name == "dynamodb": - operation_specifier = "op" - return operation_specifier + operation_specifier_key = "op" + elif span_name == "kafka": + operation_specifier_key = "access" + service_specifier_key = "service" + return operation_specifier_key, service_specifier_key diff --git a/tests/agent/test_host.py b/tests/agent/test_host.py index 4ec2647d..93b89c0a 100644 --- a/tests/agent/test_host.py +++ b/tests/agent/test_host.py @@ -692,31 +692,21 @@ def test_diagnostics(self, caplog: pytest.LogCaptureFixture) -> None: assert "should_send_snapshot_data: True" in caplog.messages def test_is_service_or_endpoint_ignored(self) -> None: - self.agent.options.ignore_endpoints.append("service1") - self.agent.options.ignore_endpoints.append("service2.endpoint1") + self.agent.options.ignore_endpoints.append("service1.*") + self.agent.options.ignore_endpoints.append("service2.method1") # ignore all endpoints of service1 - assert self.agent._HostAgent__is_service_or_endpoint_ignored("service1") - assert self.agent._HostAgent__is_service_or_endpoint_ignored( - "service1", "endpoint1" - ) - assert self.agent._HostAgent__is_service_or_endpoint_ignored( - "service1", "endpoint2" - ) + assert self.agent._HostAgent__is_endpoint_ignored("service1") + assert self.agent._HostAgent__is_endpoint_ignored("service1", "method1") + assert self.agent._HostAgent__is_endpoint_ignored("service1", "method2") # case-insensitive - assert self.agent._HostAgent__is_service_or_endpoint_ignored("SERVICE1") - assert self.agent._HostAgent__is_service_or_endpoint_ignored( - "service1", "ENDPOINT1" - ) + assert self.agent._HostAgent__is_endpoint_ignored("SERVICE1") + assert self.agent._HostAgent__is_endpoint_ignored("service1", "METHOD1") # ignore only endpoint1 of service2 - assert self.agent._HostAgent__is_service_or_endpoint_ignored( - "service2", "endpoint1" - ) - assert not self.agent._HostAgent__is_service_or_endpoint_ignored( - "service2", "endpoint2" - ) + assert self.agent._HostAgent__is_endpoint_ignored("service2", "method1") + assert not self.agent._HostAgent__is_endpoint_ignored("service2", "method2") # don't ignore other services - assert not self.agent._HostAgent__is_service_or_endpoint_ignored("service3") + assert not self.agent._HostAgent__is_endpoint_ignored("service3") diff --git a/tests/clients/boto3/test_boto3_dynamodb.py b/tests/clients/boto3/test_boto3_dynamodb.py index bb427e64..55f09df6 100644 --- a/tests/clients/boto3/test_boto3_dynamodb.py +++ b/tests/clients/boto3/test_boto3_dynamodb.py @@ -94,7 +94,7 @@ def test_ignore_dynamodb(self) -> None: assert dynamodb_span not in filtered_spans def test_ignore_create_table(self) -> None: - os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb.createtable" + os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb:createtable" agent.options = StandardOptions() with tracer.start_as_current_span("test"): diff --git a/tests/clients/kafka/test_confluent_kafka.py b/tests/clients/kafka/test_confluent_kafka.py index 0b995e81..235e3937 100644 --- a/tests/clients/kafka/test_confluent_kafka.py +++ b/tests/clients/kafka/test_confluent_kafka.py @@ -1,5 
+1,7 @@ # (c) Copyright IBM Corp. 2025 +import os +import time from typing import Generator import pytest @@ -9,9 +11,14 @@ Producer, ) from confluent_kafka.admin import AdminClient, NewTopic +from mock import patch from opentelemetry.trace import SpanKind +from opentelemetry.trace.span import format_span_id +from instana.configurator import config +from instana.options import StandardOptions from instana.singletons import agent, tracer +from instana.util.config import parse_ignored_endpoints_from_yaml from tests.helpers import get_first_span_by_filter, testenv @@ -29,13 +36,28 @@ def _resource(self) -> Generator[None, None, None]: self.kafka_client = AdminClient(self.kafka_config) try: - topics = self.kafka_client.create_topics( # noqa: F841 + _ = self.kafka_client.create_topics( # noqa: F841 [ NewTopic( testenv["kafka_topic"], num_partitions=1, replication_factor=1, ), + NewTopic( + testenv["kafka_topic"] + "_1", + num_partitions=1, + replication_factor=1, + ), + NewTopic( + testenv["kafka_topic"] + "_2", + num_partitions=1, + replication_factor=1, + ), + NewTopic( + testenv["kafka_topic"] + "_3", + num_partitions=1, + replication_factor=1, + ), ] ) except KafkaException: @@ -43,12 +65,21 @@ def _resource(self) -> Generator[None, None, None]: # Kafka producer self.producer = Producer(self.kafka_config) + agent.options = StandardOptions() yield # teardown # Ensure that allow_exit_as_root has the default value""" agent.options.allow_exit_as_root = False # Close connections - self.kafka_client.delete_topics([testenv["kafka_topic"]]) + self.kafka_client.delete_topics( + [ + testenv["kafka_topic"], + testenv["kafka_topic"] + "_1", + testenv["kafka_topic"] + "_2", + testenv["kafka_topic"] + "_3", + ] + ) + time.sleep(3) def test_trace_confluent_kafka_produce(self) -> None: with tracer.start_as_current_span("test"): @@ -77,6 +108,7 @@ def test_trace_confluent_kafka_produce(self) -> None: assert kafka_span.data["kafka"]["access"] == "produce" def test_trace_confluent_kafka_consume(self) -> None: + agent.options.set_trace_configurations() # Produce some events self.producer.produce(testenv["kafka_topic"], value=b"raw_bytes1") self.producer.flush(timeout=30) @@ -112,11 +144,13 @@ def test_trace_confluent_kafka_consume(self) -> None: assert kafka_span.n == "kafka" assert kafka_span.k == SpanKind.SERVER + assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] assert kafka_span.data["kafka"]["access"] == "consume" def test_trace_confluent_kafka_poll(self) -> None: # Produce some events self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") + self.producer.produce(testenv["kafka_topic"], b"raw_bytes2") self.producer.flush() # Consume the events @@ -146,6 +180,7 @@ def test_trace_confluent_kafka_poll(self) -> None: assert kafka_span.n == "kafka" assert kafka_span.k == SpanKind.SERVER + assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] assert kafka_span.data["kafka"]["access"] == "poll" def test_trace_confluent_kafka_error(self) -> None: @@ -187,6 +222,151 @@ def test_trace_confluent_kafka_error(self) -> None: == "num_messages must be between 0 and 1000000 (1M)" ) + @patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": "kafka"}) + def test_ignore_confluent_kafka(self) -> None: + agent.options.set_trace_configurations() + with tracer.start_as_current_span("test"): + self.producer.produce(testenv["kafka_topic"], b"raw_bytes") + self.producer.flush(timeout=10) + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + filtered_spans = 
agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + @patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": "kafka:produce"}) + def test_ignore_confluent_kafka_producer(self) -> None: + agent.options.set_trace_configurations() + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") + self.producer.produce(testenv["kafka_topic"], b"raw_bytes2") + self.producer.flush() + + # Consume the events + consumer_config = self.kafka_config.copy() + consumer_config["group.id"] = "my-group" + consumer_config["auto.offset.reset"] = "earliest" + + consumer = Consumer(consumer_config) + consumer.subscribe([testenv["kafka_topic"]]) + consumer.consume(num_messages=2, timeout=60) + + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + @patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": "kafka:consume"}) + def test_ignore_confluent_kafka_consumer(self) -> None: + agent.options.set_trace_configurations() + # Produce some events + self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") + self.producer.produce(testenv["kafka_topic"], b"raw_bytes2") + self.producer.flush() + + with tracer.start_as_current_span("test-span"): + # Consume the events + consumer_config = self.kafka_config.copy() + consumer_config["group.id"] = "my-group" + consumer_config["auto.offset.reset"] = "earliest" + + consumer = Consumer(consumer_config) + consumer.subscribe([testenv["kafka_topic"]]) + consumer.consume(num_messages=2, timeout=60) + + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + @patch.dict( + os.environ, + { + "INSTANA_IGNORE_ENDPOINTS_PATH": "tests/util/test_configuration-1.yaml", + }, + ) + def test_ignore_confluent_specific_topic(self) -> None: + agent.options.set_trace_configurations() + self.kafka_client.create_topics( # noqa: F841 + [ + NewTopic( + testenv["kafka_topic"] + "_1", + num_partitions=1, + replication_factor=1, + ), + ] + ) + + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") + self.producer.produce(testenv["kafka_topic"] + "_1", b"raw_bytes1") + self.producer.flush() + + # Consume the events + consumer_config = self.kafka_config.copy() + consumer_config["group.id"] = "my-group" + consumer_config["auto.offset.reset"] = "earliest" + + consumer = Consumer(consumer_config) + consumer.subscribe([testenv["kafka_topic"], testenv["kafka_topic"] + "_1"]) + consumer.consume(num_messages=2, timeout=60) + + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 5 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 3 + + span_to_be_filtered = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["service"] == "span-topic", + ) + assert span_to_be_filtered not in filtered_spans + + self.kafka_client.delete_topics( + [ + testenv["kafka_topic"] + "_1", + ] + ) + + def test_ignore_confluent_specific_topic_with_config_file(self) -> None: + agent.options.ignore_endpoints = parse_ignored_endpoints_from_yaml( + "tests/util/test_configuration-1.yaml" + ) + + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") + self.producer.flush() + + # Consume 
the events + consumer_config = self.kafka_config.copy() + consumer_config["group.id"] = "my-group" + consumer_config["auto.offset.reset"] = "earliest" + + consumer = Consumer(consumer_config) + consumer.subscribe([testenv["kafka_topic"]]) + consumer.consume(num_messages=1, timeout=60) + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + def test_confluent_kafka_consumer_root_exit(self) -> None: agent.options.allow_exit_as_root = True @@ -255,8 +435,9 @@ def test_confluent_kafka_consumer_root_exit(self) -> None: ] ) - def test_confluent_kafka_poll_root_exit(self) -> None: + def test_confluent_kafka_poll_root_exit_with_trace_correlation(self) -> None: agent.options.allow_exit_as_root = True + agent.options.set_trace_configurations() # Produce some events self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") @@ -296,8 +477,9 @@ def test_confluent_kafka_poll_root_exit(self) -> None: assert producer_span.s == poll_span.p assert producer_span.s != poll_span.s - def test_confluent_kafka_poll_root_exit_error(self) -> None: + def test_confluent_kafka_poll_root_exit_without_trace_correlation(self) -> None: agent.options.allow_exit_as_root = True + agent.options.kafka_trace_correlation = False # Produce some events self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") @@ -311,17 +493,159 @@ def test_confluent_kafka_poll_root_exit_error(self) -> None: consumer = Consumer(consumer_config) consumer.subscribe([testenv["kafka_topic"]]) - msg = consumer.poll(timeout="wrong_value") # noqa: F841 + msg = consumer.poll(timeout=30) # noqa: F841 consumer.close() spans = self.recorder.queued_spans() assert len(spans) == 2 + producer_span = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "produce" + and span.data["kafka"]["service"] == "span-topic", + ) + poll_span = get_first_span_by_filter( spans, lambda span: span.n == "kafka" and span.data["kafka"]["access"] == "poll" and span.data["kafka"]["service"] == "span-topic", ) + + # Different traceId + assert producer_span.t != poll_span.t + assert producer_span.s != poll_span.p + assert producer_span.s != poll_span.s + + def test_confluent_kafka_poll_root_exit_error(self) -> None: + agent.options.allow_exit_as_root = True + agent.options.set_trace_configurations() + + # Produce some events + self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") + self.producer.flush() + + # Consume the events + consumer_config = self.kafka_config.copy() + consumer_config["group.id"] = "my-group" + consumer_config["auto.offset.reset"] = "earliest" + + consumer = Consumer(consumer_config) + consumer.subscribe([testenv["kafka_topic"]]) + + msg = consumer.poll(timeout="wrong_value") # noqa: F841 + + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + poll_span = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" and span.data["kafka"]["access"] == "poll", + ) assert poll_span.data["kafka"]["error"] == "must be real number, not str" + + @patch.dict(os.environ, {"INSTANA_ALLOW_ROOT_EXIT_SPAN": "1"}) + def test_confluent_kafka_downstream_suppression(self) -> None: + config["tracing"]["ignore_endpoints"] = { + "kafka": [ + {"methods": ["produce"], "endpoints": [f"{testenv['kafka_topic']}_1"]}, + { + "methods": ["consume"], + "endpoints": [f"{testenv['kafka_topic']}_2"], + }, + ] + } + agent.options.set_trace_configurations() + + 
self.kafka_client.create_topics( # noqa: F841 + [ + NewTopic( + testenv["kafka_topic"] + "_1", + num_partitions=1, + replication_factor=1, + ), + NewTopic( + testenv["kafka_topic"] + "_2", + num_partitions=1, + replication_factor=1, + ), + ] + ) + + self.producer.produce(testenv["kafka_topic"] + "_1", b"raw_bytes1") + self.producer.produce(testenv["kafka_topic"] + "_2", b"raw_bytes2") + self.producer.flush(timeout=10) + + # Consume the events + consumer_config = self.kafka_config.copy() + consumer_config["group.id"] = "my-group" + consumer_config["auto.offset.reset"] = "earliest" + + consumer = Consumer(consumer_config) + consumer.subscribe( + [ + testenv["kafka_topic"] + "_1", + testenv["kafka_topic"] + "_2", + ] + ) + + messages = consumer.consume(num_messages=2, timeout=60) # noqa: F841 + + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + producer_span_1 = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "produce" + and span.data["kafka"]["service"] == "span-topic_1", + ) + producer_span_2 = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "produce" + and span.data["kafka"]["service"] == "span-topic_2", + ) + consumer_span_1 = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "consume" + and span.data["kafka"]["service"] == "span-topic_1", + ) + consumer_span_2 = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "consume" + and span.data["kafka"]["service"] == "span-topic_2", + ) + + assert producer_span_1 + # consumer has been suppressed + assert not consumer_span_1 + + assert producer_span_2.t == consumer_span_2.t + assert producer_span_2.s == consumer_span_2.p + assert producer_span_2.s != consumer_span_2.s + + for message in messages: + if message.topic() == "span-topic_1": + assert message.headers() == [("x_instana_l_s", b"0")] + else: + assert message.headers() == [ + ("x_instana_l_s", b"1"), + ("x_instana_t", format_span_id(producer_span_2.t).encode("utf-8")), + ("x_instana_s", format_span_id(producer_span_2.s).encode("utf-8")), + ] + + self.kafka_client.delete_topics( + [ + testenv["kafka_topic"] + "_1", + testenv["kafka_topic"] + "_2", + ] + ) diff --git a/tests/clients/kafka/test_kafka_python.py b/tests/clients/kafka/test_kafka_python.py index 5999ef09..dd568583 100644 --- a/tests/clients/kafka/test_kafka_python.py +++ b/tests/clients/kafka/test_kafka_python.py @@ -1,14 +1,20 @@ # (c) Copyright IBM Corp. 
2025 +import os from typing import Generator import pytest from kafka import KafkaConsumer, KafkaProducer from kafka.admin import KafkaAdminClient, NewTopic from kafka.errors import TopicAlreadyExistsError +from mock import patch from opentelemetry.trace import SpanKind +from opentelemetry.trace.span import format_span_id +from instana.configurator import config +from instana.options import StandardOptions from instana.singletons import agent, tracer +from instana.util.config import parse_ignored_endpoints_from_yaml from tests.helpers import get_first_span_by_filter, testenv @@ -35,6 +41,21 @@ def _resource(self) -> Generator[None, None, None]: num_partitions=1, replication_factor=1, ), + NewTopic( + name=testenv["kafka_topic"] + "_1", + num_partitions=1, + replication_factor=1, + ), + NewTopic( + name=testenv["kafka_topic"] + "_2", + num_partitions=1, + replication_factor=1, + ), + NewTopic( + name=testenv["kafka_topic"] + "_3", + num_partitions=1, + replication_factor=1, + ), ] ) except TopicAlreadyExistsError: @@ -44,20 +65,28 @@ def _resource(self) -> Generator[None, None, None]: self.producer = KafkaProducer( bootstrap_servers=testenv["kafka_bootstrap_servers"] ) + agent.options = StandardOptions() yield # teardown # Ensure that allow_exit_as_root has the default value""" agent.options.allow_exit_as_root = False # Close connections self.producer.close() - self.kafka_client.delete_topics([testenv["kafka_topic"]]) + self.kafka_client.delete_topics( + [ + testenv["kafka_topic"], + testenv["kafka_topic"] + "_1", + testenv["kafka_topic"] + "_2", + testenv["kafka_topic"] + "_3", + ] + ) self.kafka_client.close() def test_trace_kafka_python_send(self) -> None: with tracer.start_as_current_span("test"): future = self.producer.send(testenv["kafka_topic"], b"raw_bytes") - record_metadata = future.get(timeout=10) # noqa: F841 + _ = future.get(timeout=10) # noqa: F841 spans = self.recorder.queued_spans() assert len(spans) == 2 @@ -203,6 +232,130 @@ def test_trace_kafka_python_error(self) -> None: assert kafka_span.data["kafka"]["access"] == "consume" assert kafka_span.data["kafka"]["error"] == "StopIteration()" + def consume_from_topic(self, topic_name: str) -> None: + consumer = KafkaConsumer( + topic_name, + bootstrap_servers=testenv["kafka_bootstrap_servers"], + auto_offset_reset="earliest", + enable_auto_commit=False, + consumer_timeout_ms=1000, + ) + with tracer.start_as_current_span("test"): + for msg in consumer: + if msg is None: + break + + consumer.close() + + @patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": "kafka"}) + def test_ignore_kafka(self) -> None: + agent.options.set_trace_configurations() + with tracer.start_as_current_span("test"): + self.producer.send(testenv["kafka_topic"], b"raw_bytes") + self.producer.flush() + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + @patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": "kafka:send"}) + def test_ignore_kafka_producer(self) -> None: + agent.options.set_trace_configurations() + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.send(testenv["kafka_topic"], b"raw_bytes1") + self.producer.send(testenv["kafka_topic"], b"raw_bytes2") + self.producer.flush() + + # Consume the events manually + # consume_from_topic not used due to to not create sdk span + consumer = KafkaConsumer( + testenv["kafka_topic"], + bootstrap_servers=testenv["kafka_bootstrap_servers"], + auto_offset_reset="earliest", + 
enable_auto_commit=False, + consumer_timeout_ms=1000, + ) + for msg in consumer: + if msg is None: + break + + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + @patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": "kafka:consume"}) + def test_ignore_kafka_consumer(self) -> None: + agent.options.set_trace_configurations() + # Produce some events + self.producer.send(testenv["kafka_topic"], b"raw_bytes1") + self.producer.send(testenv["kafka_topic"], b"raw_bytes2") + self.producer.flush() + + # Consume the events + self.consume_from_topic(testenv["kafka_topic"]) + + spans = self.recorder.queued_spans() + assert len(spans) == 4 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + + @patch.dict( + os.environ, + { + "INSTANA_IGNORE_ENDPOINTS_PATH": "tests/util/test_configuration-1.yaml", + }, + ) + def test_ignore_specific_topic(self) -> None: + agent.options.set_trace_configurations() + with tracer.start_as_current_span("test-span"): + # Produce some events + self.producer.send(testenv["kafka_topic"], b"raw_bytes1") + self.producer.send(testenv["kafka_topic"] + "_1", b"raw_bytes1") + self.producer.flush() + + # Consume the events + self.consume_from_topic(testenv["kafka_topic"]) + self.consume_from_topic(testenv["kafka_topic"] + "_1") + + spans = self.recorder.queued_spans() + assert len(spans) == 11 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 8 + + span_to_be_filtered = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["service"] == "span-topic", + ) + assert span_to_be_filtered not in filtered_spans + + def test_ignore_specific_topic_with_config_file(self) -> None: + agent.options.ignore_endpoints = parse_ignored_endpoints_from_yaml( + "tests/util/test_configuration-1.yaml" + ) + + # Produce some events + self.producer.send(testenv["kafka_topic"], b"raw_bytes1") + self.producer.flush() + + # Consume the events + self.consume_from_topic(testenv["kafka_topic"]) + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + def test_kafka_consumer_root_exit(self) -> None: agent.options.allow_exit_as_root = True @@ -242,29 +395,9 @@ def test_kafka_consumer_root_exit(self) -> None: assert producer_span.t == consumer_span.t - def test_kafka_poll_root_exit(self) -> None: + def test_kafka_poll_root_exit_with_trace_correlation(self) -> None: agent.options.allow_exit_as_root = True - self.kafka_client.create_topics( - [ - NewTopic( - name=testenv["kafka_topic"] + "_1", - num_partitions=1, - replication_factor=1, - ), - NewTopic( - name=testenv["kafka_topic"] + "_2", - num_partitions=1, - replication_factor=1, - ), - NewTopic( - name=testenv["kafka_topic"] + "_3", - num_partitions=1, - replication_factor=1, - ), - ] - ) - self.producer.send(testenv["kafka_topic"] + "_1", b"raw_bytes1") self.producer.send(testenv["kafka_topic"] + "_2", b"raw_bytes2") self.producer.send(testenv["kafka_topic"] + "_3", b"raw_bytes3") @@ -362,10 +495,221 @@ def test_kafka_poll_root_exit(self) -> None: assert producer_span_3.t == poll_span_3.t assert producer_span_3.s != poll_span_3.s - self.kafka_client.delete_topics( - [ - testenv["kafka_topic"] + "_1", - testenv["kafka_topic"] + "_2", - testenv["kafka_topic"] + "_3", + def test_kafka_poll_root_exit_without_trace_correlation(self) -> None: + 
agent.options.allow_exit_as_root = True + agent.options.kafka_trace_correlation = False + + self.producer.send(testenv["kafka_topic"] + "_1", b"raw_bytes1") + self.producer.send(testenv["kafka_topic"] + "_2", b"raw_bytes2") + self.producer.send(testenv["kafka_topic"] + "_3", b"raw_bytes3") + self.producer.flush() + + # Consume the events + consumer = KafkaConsumer( + bootstrap_servers=testenv["kafka_bootstrap_servers"], + auto_offset_reset="earliest", # consume earliest available messages + enable_auto_commit=False, # do not auto-commit offsets + consumer_timeout_ms=1000, + ) + topics = [ + testenv["kafka_topic"] + "_1", + testenv["kafka_topic"] + "_2", + testenv["kafka_topic"] + "_3", + ] + consumer.subscribe(topics) + + messages = consumer.poll(timeout_ms=1000) # noqa: F841 + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 6 + + producer_span_1 = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "send" + and span.data["kafka"]["service"] == "span-topic_1", + ) + producer_span_2 = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "send" + and span.data["kafka"]["service"] == "span-topic_2", + ) + producer_span_3 = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "send" + and span.data["kafka"]["service"] == "span-topic_3", + ) + + poll_span_1 = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "poll" + and span.data["kafka"]["service"] == "span-topic_1", + ) + poll_span_2 = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "poll" + and span.data["kafka"]["service"] == "span-topic_2", + ) + poll_span_3 = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "poll" + and span.data["kafka"]["service"] == "span-topic_3", + ) + + assert producer_span_1.n == "kafka" + assert producer_span_1.data["kafka"]["access"] == "send" + assert producer_span_1.data["kafka"]["service"] == "span-topic_1" + + assert producer_span_2.n == "kafka" + assert producer_span_2.data["kafka"]["access"] == "send" + assert producer_span_2.data["kafka"]["service"] == "span-topic_2" + + assert producer_span_3.n == "kafka" + assert producer_span_3.data["kafka"]["access"] == "send" + assert producer_span_3.data["kafka"]["service"] == "span-topic_3" + + assert poll_span_1.n == "kafka" + assert poll_span_1.data["kafka"]["access"] == "poll" + assert poll_span_1.data["kafka"]["service"] == "span-topic_1" + + assert poll_span_2.n == "kafka" + assert poll_span_2.data["kafka"]["access"] == "poll" + assert poll_span_2.data["kafka"]["service"] == "span-topic_2" + + assert poll_span_3.n == "kafka" + assert poll_span_3.data["kafka"]["access"] == "poll" + assert poll_span_3.data["kafka"]["service"] == "span-topic_3" + + # different trace id and span ids + assert producer_span_1.t != poll_span_1.t + assert producer_span_1.s != poll_span_1.s + + assert producer_span_2.t != poll_span_2.t + assert producer_span_2.s != poll_span_2.s + + assert producer_span_3.t != poll_span_3.t + assert producer_span_3.s != poll_span_3.s + + for topic_partition, partition_messages in messages.items(): + for message in partition_messages: + assert not message.headers + + @patch.dict(os.environ, {"INSTANA_ALLOW_ROOT_EXIT_SPAN": "1"}) + def test_kafka_downstream_suppression(self) -> None: + 
config["tracing"]["ignore_endpoints"] = { + "kafka": [ + {"methods": ["send"], "endpoints": [f"{testenv['kafka_topic']}_1"]}, + { + "methods": ["consume"], + "endpoints": [f"{testenv['kafka_topic']}_2"], + }, ] + } + agent.options.set_trace_configurations() + + self.producer.send(testenv["kafka_topic"] + "_1", b"raw_bytes1") + self.producer.send(testenv["kafka_topic"] + "_2", b"raw_bytes2") + self.producer.send(testenv["kafka_topic"] + "_3", b"raw_bytes3") + self.producer.flush() + + # Consume the events + consumer = KafkaConsumer( + bootstrap_servers=testenv["kafka_bootstrap_servers"], + auto_offset_reset="earliest", # consume earliest available messages + enable_auto_commit=False, # do not auto-commit offsets + consumer_timeout_ms=1000, ) + topics = [ + testenv["kafka_topic"] + "_1", + testenv["kafka_topic"] + "_2", + testenv["kafka_topic"] + "_3", + ] + consumer.subscribe(topics) + + messages = consumer.poll(timeout_ms=1000) # noqa: F841 + consumer.close() + + spans = self.recorder.queued_spans() + assert len(spans) == 5 + + producer_span_1 = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "send" + and span.data["kafka"]["service"] == "span-topic_1", + ) + producer_span_2 = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "send" + and span.data["kafka"]["service"] == "span-topic_2", + ) + producer_span_3 = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "send" + and span.data["kafka"]["service"] == "span-topic_3", + ) + + poll_span_2 = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "poll" + and span.data["kafka"]["service"] == "span-topic_2", + ) + poll_span_3 = get_first_span_by_filter( + spans, + lambda span: span.n == "kafka" + and span.data["kafka"]["access"] == "poll" + and span.data["kafka"]["service"] == "span-topic_3", + ) + + assert producer_span_1.n == "kafka" + assert producer_span_1.data["kafka"]["access"] == "send" + assert producer_span_1.data["kafka"]["service"] == "span-topic_1" + + assert producer_span_2.n == "kafka" + assert producer_span_2.data["kafka"]["access"] == "send" + assert producer_span_2.data["kafka"]["service"] == "span-topic_2" + + assert producer_span_3.n == "kafka" + assert producer_span_3.data["kafka"]["access"] == "send" + assert producer_span_3.data["kafka"]["service"] == "span-topic_3" + + assert poll_span_2.n == "kafka" + assert poll_span_2.data["kafka"]["access"] == "poll" + assert poll_span_2.data["kafka"]["service"] == "span-topic_2" + + assert poll_span_3.n == "kafka" + assert poll_span_3.data["kafka"]["access"] == "poll" + assert poll_span_3.data["kafka"]["service"] == "span-topic_3" + + # same trace id, different span ids + assert producer_span_2.t == poll_span_2.t + assert producer_span_2.s != poll_span_2.s + + assert producer_span_3.t == poll_span_3.t + assert producer_span_3.s != poll_span_3.s + + for topic_partition, partition_messages in messages.items(): + for message in partition_messages: + if message.topic == "span-topic_1": + assert message.headers == [("x_instana_l_s", b"0")] + elif message.topic == "span-topic_2": + assert message.headers == [ + ("x_instana_l_s", b"1"), + ( + "x_instana_t", + format_span_id(producer_span_2.t).encode("utf-8"), + ), + ( + "x_instana_s", + format_span_id(producer_span_2.s).encode("utf-8"), + ), + ] diff --git a/tests/propagators/test_kafka_propagator.py 
b/tests/propagators/test_kafka_propagator.py
new file mode 100644
index 00000000..bec987bf
--- /dev/null
+++ b/tests/propagators/test_kafka_propagator.py
@@ -0,0 +1,125 @@
+import logging
+from typing import Generator
+
+import pytest
+from mock import patch
+from opentelemetry.trace.span import format_span_id
+
+from instana.propagators.kafka_propagator import KafkaPropagator
+from instana.span_context import SpanContext
+
+
+class TestKafkaPropagator:
+    @pytest.fixture(autouse=True)
+    def _resources(self) -> Generator[None, None, None]:
+        self.kafka_prop = KafkaPropagator()
+        yield
+
+    def test_extract_carrier_headers_as_list_of_dicts(self) -> None:
+        carrier_as_a_list = [{"key": "value"}]
+        response = self.kafka_prop.extract_carrier_headers(carrier_as_a_list)
+
+        assert response == {"key": "value"}
+
+        carrier_as_a_list = [{"key": "value"}, {"key": "value2"}]
+        response = self.kafka_prop.extract_carrier_headers(carrier_as_a_list)
+
+        assert response == {"key": "value2"}
+
+    def test_extract_carrier_headers_as_list_of_tuples(self) -> None:
+        carrier_as_a_list = [("key", "value")]
+        response = self.kafka_prop.extract_carrier_headers(carrier_as_a_list)
+
+        assert response == {"key": "value"}
+
+        carrier_as_a_list = [("key", "value"), ("key", "value2")]
+        response = self.kafka_prop.extract_carrier_headers(carrier_as_a_list)
+
+        assert response == {"key": "value2"}
+
+    def test_extract_carrier_headers_as_dict(self) -> None:
+        carrier_as_a_dict = {"key": "value"}
+        response = self.kafka_prop.extract_carrier_headers(carrier_as_a_dict)
+
+        assert response == {"key": "value"}
+
+    def test_extract_carrier_headers_as_set(
+        self,
+        caplog: pytest.LogCaptureFixture,
+    ) -> None:
+        caplog.set_level(logging.DEBUG, logger="instana")
+        carrier_as_a_dict = {"key": "value"}
+        with patch.object(
+            KafkaPropagator,
+            "extract_headers_dict",
+            side_effect=Exception(),
+        ):
+            response = self.kafka_prop.extract_carrier_headers(carrier_as_a_dict)
+
+        assert not response
+        assert (
+            "kafka_propagator extract_headers_list: Couldn't convert - {'key': 'value'}"
+            in caplog.messages
+        )
+
+    def test_extract(self) -> None:
+        carrier_as_a_dict = {"key": "value"}
+        disable_w3c_trace_context = False
+        response = self.kafka_prop.extract(carrier_as_a_dict, disable_w3c_trace_context)
+        assert response
+
+    def test_extract_with_error(
+        self,
+        caplog: pytest.LogCaptureFixture,
+    ) -> None:
+        caplog.set_level(logging.DEBUG, logger="instana")
+        carrier_as_a_dict = {"key": "value"}
+        disable_w3c_trace_context = False
+        with patch.object(
+            KafkaPropagator,
+            "extract_carrier_headers",
+            side_effect=Exception("fake error"),
+        ):
+            response = self.kafka_prop.extract(
+                carrier_as_a_dict, disable_w3c_trace_context
+            )
+        assert not response
+        assert "kafka_propagator extract error: fake error" in caplog.messages
+
+    def test_inject_without_suppression(self, trace_id: int, span_id: int) -> None:
+        span_context = SpanContext(
+            span_id=span_id,
+            trace_id=trace_id,
+            is_remote=False,
+            level=1,
+            baggage={},
+            sampled=True,
+            synthetic=False,
+        )
+        trace_id = span_context.trace_id
+        span_id = span_context.span_id
+        carrier = {}
+
+        self.kafka_prop.inject(span_context, carrier)
+        assert carrier == {
+            "x_instana_l_s": b"1",
+            "x_instana_t": format_span_id(trace_id).encode("utf-8"),
+            "x_instana_s": format_span_id(span_id).encode("utf-8"),
+        }
+
+    def test_inject_with_suppression(self, trace_id: int, span_id: int) -> None:
+        span_context = SpanContext(
+            span_id=span_id,
+            trace_id=trace_id,
+            is_remote=False,
+            level=1,
+            baggage={},
+            sampled=True,
+ synthetic=False, + ) + trace_id = span_context.trace_id + span_id = span_context.span_id + carrier = {"x_instana_l_s": "0"} + + self.kafka_prop.inject(span_context, carrier) + assert carrier == {"x_instana_l_s": b"0"} diff --git a/tests/requirements-kafka.txt b/tests/requirements-kafka.txt index 2451489a..640d2ad9 100644 --- a/tests/requirements-kafka.txt +++ b/tests/requirements-kafka.txt @@ -1,5 +1,5 @@ -r requirements-minimal.txt mock>=2.0.0 +confluent-kafka>=2.0.0 kafka-python>=2.0.0; python_version < "3.12" kafka-python-ng>=2.0.0; python_version >= "3.12" -confluent-kafka>=2.0.0 \ No newline at end of file diff --git a/tests/test_options.py b/tests/test_options.py index 025ff092..a2130c38 100644 --- a/tests/test_options.py +++ b/tests/test_options.py @@ -1,7 +1,10 @@ +# (c) Copyright IBM Corp. 2025 + import logging import os from typing import Generator +from mock import patch import pytest from instana.configurator import config @@ -15,178 +18,270 @@ StandardOptions, ) -env_vars = [ - "INSTANA_DEBUG", - "INSTANA_EXTRA_HTTP_HEADERS", - "INSTANA_IGNORE_ENDPOINTS", - "INSTANA_SECRETS", - "INSTANA_AGENT_KEY", - "INSTANA_ENDPOINT_URL", - "INSTANA_DISABLE_CA_CHECK", - "INSTANA_ENDPOINT_PROXY", - "INSTANA_TIMEOUT", - "INSTANA_LOG_LEVEL", -] - - -def clean_env_vars(): - for env_var in env_vars: - if env_var in os.environ.keys(): - del os.environ[env_var] - class TestBaseOptions: @pytest.fixture(autouse=True) def _resource(self) -> Generator[None, None, None]: + self.base_options = None yield - clean_env_vars() if "tracing" in config.keys(): del config["tracing"] def test_base_options(self) -> None: if "INSTANA_DEBUG" in os.environ: del os.environ["INSTANA_DEBUG"] - test_base_options = BaseOptions() - - assert not test_base_options.debug - assert test_base_options.log_level == logging.WARN - assert not test_base_options.extra_http_headers - assert not test_base_options.allow_exit_as_root - assert not test_base_options.ignore_endpoints - assert test_base_options.secrets_matcher == "contains-ignore-case" - assert test_base_options.secrets_list == ["key", "pass", "secret"] - assert not test_base_options.secrets + self.base_options = BaseOptions() + + assert not self.base_options.debug + assert self.base_options.log_level == logging.WARN + assert not self.base_options.extra_http_headers + assert not self.base_options.allow_exit_as_root + assert not self.base_options.ignore_endpoints + assert self.base_options.kafka_trace_correlation + assert self.base_options.secrets_matcher == "contains-ignore-case" + assert self.base_options.secrets_list == ["key", "pass", "secret"] + assert not self.base_options.secrets def test_base_options_with_config(self) -> None: - config["tracing"]["ignore_endpoints"] = "service1;service3:endpoint1,endpoint2" - test_base_options = BaseOptions() - assert test_base_options.ignore_endpoints == [ - "service1", - "service3.endpoint1", - "service3.endpoint2", + config["tracing"] = { + "ignore_endpoints": "service1;service3:method1,method2", + "kafka": {"trace_correlation": True}, + } + self.base_options = BaseOptions() + assert self.base_options.ignore_endpoints == [ + "service1.*", + "service3.method1", + "service3.method2", ] - + assert self.base_options.kafka_trace_correlation + + @patch.dict( + os.environ, + { + "INSTANA_DEBUG": "true", + "INSTANA_EXTRA_HTTP_HEADERS": "SOMETHING;HERE", + "INSTANA_IGNORE_ENDPOINTS": "service1;service2:method1,method2", + "INSTANA_SECRETS": "secret1:username,password", + }, + ) def test_base_options_with_env_vars(self) -> None: - 
os.environ["INSTANA_DEBUG"] = "true" - os.environ["INSTANA_EXTRA_HTTP_HEADERS"] = "SOMETHING;HERE" - os.environ["INSTANA_IGNORE_ENDPOINTS"] = "service1;service2:endpoint1,endpoint2" - os.environ["INSTANA_SECRETS"] = "secret1:username,password" + self.base_options = BaseOptions() + assert self.base_options.log_level == logging.DEBUG + assert self.base_options.debug + + assert self.base_options.extra_http_headers == ["something", "here"] + + assert self.base_options.ignore_endpoints == [ + "service1.*", + "service2.method1", + "service2.method2", + ] + + assert self.base_options.secrets_matcher == "secret1" + assert self.base_options.secrets_list == ["username", "password"] + + @patch.dict( + os.environ, + {"INSTANA_IGNORE_ENDPOINTS_PATH": "tests/util/test_configuration-1.yaml"}, + ) + def test_base_options_with_endpoint_file(self) -> None: + self.base_options = BaseOptions() + assert self.base_options.ignore_endpoints == [ + "redis.get", + "redis.type", + "dynamodb.query", + "kafka.consume.span-topic", + "kafka.consume.topic1", + "kafka.consume.topic2", + "kafka.send.span-topic", + "kafka.send.topic1", + "kafka.send.topic2", + "kafka.consume.topic3", + "kafka.*.span-topic", + "kafka.*.topic4", + ] + del self.base_options + + @patch.dict( + os.environ, + { + "INSTANA_IGNORE_ENDPOINTS": "env_service1;env_service2:method1,method2", + "INSTANA_KAFKA_TRACE_CORRELATION": "false", + "INSTANA_IGNORE_ENDPOINTS_PATH": "tests/util/test_configuration-1.yaml", + }, + ) + def test_set_trace_configurations_by_env_variable(self) -> None: + # The priority is as follows: + # environment variables > in-code configuration > + # > agent config (configuration.yaml) > default value + config["tracing"]["ignore_endpoints"] = ( + "config_service1;config_service2:method1,method2" + ) + config["tracing"]["kafka"] = {"trace_correlation": True} + test_tracing = {"ignore-endpoints": "service1;service2:method1,method2"} - test_base_options = BaseOptions() - assert test_base_options.log_level == logging.DEBUG - assert test_base_options.debug + # Setting by env variable + self.base_options = StandardOptions() + self.base_options.set_tracing(test_tracing) + + assert self.base_options.ignore_endpoints == [ + "env_service1.*", + "env_service2.method1", + "env_service2.method2", + ] + assert not self.base_options.kafka_trace_correlation + + @patch.dict( + os.environ, + { + "INSTANA_KAFKA_TRACE_CORRELATION": "false", + "INSTANA_IGNORE_ENDPOINTS_PATH": "tests/util/test_configuration-1.yaml", + }, + ) + def test_set_trace_configurations_by_local_configuration_file(self) -> None: + config["tracing"]["ignore_endpoints"] = ( + "config_service1;config_service2:method1,method2" + ) + config["tracing"]["kafka"] = {"trace_correlation": True} + test_tracing = {"ignore-endpoints": "service1;service2:method1,method2"} + + self.base_options = StandardOptions() + self.base_options.set_tracing(test_tracing) + + assert self.base_options.ignore_endpoints == [ + "redis.get", + "redis.type", + "dynamodb.query", + "kafka.consume.span-topic", + "kafka.consume.topic1", + "kafka.consume.topic2", + "kafka.send.span-topic", + "kafka.send.topic1", + "kafka.send.topic2", + "kafka.consume.topic3", + "kafka.*.span-topic", + "kafka.*.topic4", + ] + + def test_set_trace_configurations_by_in_code_variable(self) -> None: + config["tracing"]["ignore_endpoints"] = ( + "config_service1;config_service2:method1,method2" + ) + config["tracing"]["kafka"] = {"trace_correlation": True} + test_tracing = {"ignore-endpoints": "service1;service2:method1,method2"} + + 
self.base_options = StandardOptions() + self.base_options.set_tracing(test_tracing) + + assert self.base_options.ignore_endpoints == [ + "config_service1.*", + "config_service2.method1", + "config_service2.method2", + ] + assert self.base_options.kafka_trace_correlation + + def test_set_trace_configurations_by_agent_configuration(self) -> None: + test_tracing = { + "ignore-endpoints": "service1;service2:method1,method2", + "trace-correlation": True, + } - assert test_base_options.extra_http_headers == ["something", "here"] + self.base_options = StandardOptions() + self.base_options.set_tracing(test_tracing) - assert test_base_options.ignore_endpoints == [ - "service1", - "service2.endpoint1", - "service2.endpoint2", + assert self.base_options.ignore_endpoints == [ + "service1.*", + "service2.method1", + "service2.method2", ] + assert self.base_options.kafka_trace_correlation - assert test_base_options.secrets_matcher == "secret1" - assert test_base_options.secrets_list == ["username", "password"] + def test_set_trace_configurations_by_default(self) -> None: + self.base_options = StandardOptions() + self.base_options.set_tracing({}) + + assert not self.base_options.ignore_endpoints + assert self.base_options.kafka_trace_correlation class TestStandardOptions: @pytest.fixture(autouse=True) def _resource(self) -> Generator[None, None, None]: + self.standard_options = None yield - clean_env_vars() if "tracing" in config.keys(): del config["tracing"] def test_standard_options(self) -> None: - test_standard_options = StandardOptions() + self.standard_options = StandardOptions() - assert test_standard_options.AGENT_DEFAULT_HOST == "localhost" - assert test_standard_options.AGENT_DEFAULT_PORT == 42699 + assert self.standard_options.AGENT_DEFAULT_HOST == "localhost" + assert self.standard_options.AGENT_DEFAULT_PORT == 42699 def test_set_secrets(self) -> None: - test_standard_options = StandardOptions() + self.standard_options = StandardOptions() test_secrets = {"matcher": "sample-match", "list": ["sample", "list"]} - test_standard_options.set_secrets(test_secrets) - assert test_standard_options.secrets_matcher == "sample-match" - assert test_standard_options.secrets_list == ["sample", "list"] + self.standard_options.set_secrets(test_secrets) + assert self.standard_options.secrets_matcher == "sample-match" + assert self.standard_options.secrets_list == ["sample", "list"] def test_set_extra_headers(self) -> None: - test_standard_options = StandardOptions() + self.standard_options = StandardOptions() test_headers = {"header1": "sample-match", "header2": ["sample", "list"]} - test_standard_options.set_extra_headers(test_headers) - assert test_standard_options.extra_http_headers == test_headers + self.standard_options.set_extra_headers(test_headers) + assert self.standard_options.extra_http_headers == test_headers - def test_set_tracing(self) -> None: - test_standard_options = StandardOptions() + def test_set_tracing( + self, + caplog: pytest.LogCaptureFixture, + ) -> None: + caplog.set_level(logging.DEBUG, logger="instana") + self.standard_options = StandardOptions() - test_tracing = {"ignore-endpoints": "service1;service2:endpoint1,endpoint2"} - test_standard_options.set_tracing(test_tracing) + test_tracing = { + "ignore-endpoints": "service1;service2:method1,method2", + "kafka": {"trace-correlation": "false", "header-format": "binary"}, + } + self.standard_options.set_tracing(test_tracing) - assert test_standard_options.ignore_endpoints == [ - "service1", - "service2.endpoint1", - "service2.endpoint2", + assert self.standard_options.ignore_endpoints == [ + "service1.*", + "service2.method1", + "service2.method2", ] - assert not test_standard_options.extra_http_headers - - def test_set_tracing_priority(self) -> None: - # Environment variables > In-code Configuration > Agent Configuration - # First test when all attributes given - os.environ["INSTANA_IGNORE_ENDPOINTS"] = ( - "env_service1;env_service2:endpoint1,endpoint2" - ) - config["tracing"]["ignore_endpoints"] = ( - "config_service1;config_service2:endpoint1,endpoint2" + assert not self.standard_options.kafka_trace_correlation + assert ( + "Binary header format for Kafka is deprecated. Please use string header format." + in caplog.messages ) - test_tracing = {"ignore-endpoints": "service1;service2:endpoint1,endpoint2"} - - test_standard_options = StandardOptions() - test_standard_options.set_tracing(test_tracing) - - assert test_standard_options.ignore_endpoints == [ - "env_service1", - "env_service2.endpoint1", - "env_service2.endpoint2", - ] - - # Second test when In-code configuration and Agent configuration given - - del os.environ["INSTANA_IGNORE_ENDPOINTS"] - - test_standard_options = StandardOptions() - test_standard_options.set_tracing(test_tracing) - - assert test_standard_options.ignore_endpoints == [ - "config_service1", - "config_service2.endpoint1", - "config_service2.endpoint2", - ] + assert not self.standard_options.extra_http_headers def test_set_from(self) -> None: - test_standard_options = StandardOptions() + self.standard_options = StandardOptions() test_res_data = { "secrets": {"matcher": "sample-match", "list": ["sample", "list"]}, - "tracing": {"ignore-endpoints": "service1;service2:endpoint1,endpoint2"}, + "tracing": {"ignore-endpoints": "service1;service2:method1,method2"}, } - test_standard_options.set_from(test_res_data) + self.standard_options.set_from(test_res_data) assert ( - test_standard_options.secrets_matcher == test_res_data["secrets"]["matcher"] + self.standard_options.secrets_matcher == test_res_data["secrets"]["matcher"] ) - assert test_standard_options.secrets_list == test_res_data["secrets"]["list"] - assert test_standard_options.ignore_endpoints == [ - "service1", - "service2.endpoint1", - "service2.endpoint2", + assert self.standard_options.secrets_list == test_res_data["secrets"]["list"] + assert self.standard_options.ignore_endpoints == [ + "service1.*", + "service2.method1", + "service2.method2", ] test_res_data = { "extraHeaders": {"header1": "sample-match", "header2": ["sample", "list"]}, } - test_standard_options.set_from(test_res_data) + self.standard_options.set_from(test_res_data) - assert test_standard_options.extra_http_headers == test_res_data["extraHeaders"] + assert self.standard_options.extra_http_headers == test_res_data["extraHeaders"] def test_set_from_bool( self, @@ -195,9 +290,9 @@ def test_set_from_bool( caplog.set_level(logging.DEBUG, logger="instana") caplog.clear() - test_standard_options = StandardOptions() + self.standard_options = StandardOptions() test_res_data = True - test_standard_options.set_from(test_res_data) + self.standard_options.set_from(test_res_data) assert len(caplog.messages) == 1 assert len(caplog.records) == 1 @@ -205,180 +300,196 @@ def test_set_from_bool( "options.set_from: Wrong data type - " in caplog.messages[0] ) - assert test_standard_options.secrets_list == ["key", "pass", "secret"] - assert test_standard_options.ignore_endpoints == [] - assert not test_standard_options.extra_http_headers + assert self.standard_options.secrets_list == ["key", "pass", "secret"] + assert self.standard_options.ignore_endpoints == [] + assert not self.standard_options.extra_http_headers class TestServerlessOptions: @pytest.fixture(autouse=True) def _resource(self) -> Generator[None, None, None]: + self.serverless_options = None yield - clean_env_vars() def test_serverless_options(self) -> None: - test_serverless_options = ServerlessOptions() - - assert not test_serverless_options.debug - assert test_serverless_options.log_level == logging.WARN - assert not test_serverless_options.extra_http_headers - assert not test_serverless_options.allow_exit_as_root - assert not test_serverless_options.ignore_endpoints - assert test_serverless_options.secrets_matcher == "contains-ignore-case" - assert test_serverless_options.secrets_list == ["key", "pass", "secret"] - assert not test_serverless_options.secrets - assert not test_serverless_options.agent_key - assert not test_serverless_options.endpoint_url - assert test_serverless_options.ssl_verify - assert not test_serverless_options.endpoint_proxy - assert test_serverless_options.timeout == 0.8 - + self.serverless_options = ServerlessOptions() + + assert not self.serverless_options.debug + assert self.serverless_options.log_level == logging.WARN + assert not self.serverless_options.extra_http_headers + assert not self.serverless_options.allow_exit_as_root + assert not self.serverless_options.ignore_endpoints + assert self.serverless_options.secrets_matcher == "contains-ignore-case" + assert self.serverless_options.secrets_list == ["key", "pass", "secret"] + assert not self.serverless_options.secrets + assert not self.serverless_options.agent_key + assert not self.serverless_options.endpoint_url + assert self.serverless_options.ssl_verify + assert not self.serverless_options.endpoint_proxy + assert self.serverless_options.timeout == 0.8 + + @patch.dict( + os.environ, + { + "INSTANA_AGENT_KEY": "key1", + "INSTANA_ENDPOINT_URL": "localhost", + "INSTANA_DISABLE_CA_CHECK": "true", + "INSTANA_ENDPOINT_PROXY": "proxy1", + "INSTANA_TIMEOUT": "3000", + "INSTANA_LOG_LEVEL": "info", + }, + ) def test_serverless_options_with_env_vars(self) -> None: - os.environ["INSTANA_AGENT_KEY"] = "key1" - os.environ["INSTANA_ENDPOINT_URL"] = "localhost" - os.environ["INSTANA_DISABLE_CA_CHECK"] = "true" - os.environ["INSTANA_ENDPOINT_PROXY"] = "proxy1" - os.environ["INSTANA_TIMEOUT"] = "3000" - os.environ["INSTANA_LOG_LEVEL"] = "info" - - test_serverless_options = ServerlessOptions() + self.serverless_options = ServerlessOptions() - assert test_serverless_options.agent_key == "key1" - assert test_serverless_options.endpoint_url == "localhost" - assert not test_serverless_options.ssl_verify - assert test_serverless_options.endpoint_proxy == {"https": "proxy1"} - assert test_serverless_options.timeout == 3 - assert test_serverless_options.log_level == logging.INFO + assert self.serverless_options.agent_key == "key1" + assert self.serverless_options.endpoint_url == "localhost" + assert not self.serverless_options.ssl_verify + assert self.serverless_options.endpoint_proxy == {"https": "proxy1"} + assert self.serverless_options.timeout == 3 + assert self.serverless_options.log_level == logging.INFO class TestAWSLambdaOptions: @pytest.fixture(autouse=True) def _resource(self) -> Generator[None, None, None]: + self.aws_lambda_options = None yield - clean_env_vars() def test_aws_lambda_options(self) -> None: - test_aws_lambda_options = AWSLambdaOptions() + self.aws_lambda_options =
AWSLambdaOptions() - assert not test_aws_lambda_options.agent_key - assert not test_aws_lambda_options.endpoint_url - assert test_aws_lambda_options.ssl_verify - assert not test_aws_lambda_options.endpoint_proxy - assert test_aws_lambda_options.timeout == 0.8 - assert test_aws_lambda_options.log_level == logging.WARN + assert not self.aws_lambda_options.agent_key + assert not self.aws_lambda_options.endpoint_url + assert self.aws_lambda_options.ssl_verify + assert not self.aws_lambda_options.endpoint_proxy + assert self.aws_lambda_options.timeout == 0.8 + assert self.aws_lambda_options.log_level == logging.WARN class TestAWSFargateOptions: @pytest.fixture(autouse=True) def _resource(self) -> Generator[None, None, None]: + self.aws_fargate_options = None yield - clean_env_vars() def test_aws_fargate_options(self) -> None: - test_aws_fargate_options = AWSFargateOptions() - - assert not test_aws_fargate_options.agent_key - assert not test_aws_fargate_options.endpoint_url - assert test_aws_fargate_options.ssl_verify - assert not test_aws_fargate_options.endpoint_proxy - assert test_aws_fargate_options.timeout == 0.8 - assert test_aws_fargate_options.log_level == logging.WARN - assert not test_aws_fargate_options.tags - assert not test_aws_fargate_options.zone - + self.aws_fargate_options = AWSFargateOptions() + + assert not self.aws_fargate_options.agent_key + assert not self.aws_fargate_options.endpoint_url + assert self.aws_fargate_options.ssl_verify + assert not self.aws_fargate_options.endpoint_proxy + assert self.aws_fargate_options.timeout == 0.8 + assert self.aws_fargate_options.log_level == logging.WARN + assert not self.aws_fargate_options.tags + assert not self.aws_fargate_options.zone + + @patch.dict( + os.environ, + { + "INSTANA_AGENT_KEY": "key1", + "INSTANA_ENDPOINT_URL": "localhost", + "INSTANA_DISABLE_CA_CHECK": "true", + "INSTANA_ENDPOINT_PROXY": "proxy1", + "INSTANA_TIMEOUT": "3000", + "INSTANA_LOG_LEVEL": "info", + "INSTANA_TAGS": "key1=value1,key2=value2", + "INSTANA_ZONE": "zone1", + }, + ) def test_aws_fargate_options_with_env_vars(self) -> None: - os.environ["INSTANA_AGENT_KEY"] = "key1" - os.environ["INSTANA_ENDPOINT_URL"] = "localhost" - os.environ["INSTANA_DISABLE_CA_CHECK"] = "true" - os.environ["INSTANA_ENDPOINT_PROXY"] = "proxy1" - os.environ["INSTANA_TIMEOUT"] = "3000" - os.environ["INSTANA_LOG_LEVEL"] = "info" - os.environ["INSTANA_TAGS"] = "key1=value1,key2=value2" - os.environ["INSTANA_ZONE"] = "zone1" - - test_aws_fargate_options = AWSFargateOptions() + self.aws_fargate_options = AWSFargateOptions() - assert test_aws_fargate_options.agent_key == "key1" - assert test_aws_fargate_options.endpoint_url == "localhost" - assert not test_aws_fargate_options.ssl_verify - assert test_aws_fargate_options.endpoint_proxy == {"https": "proxy1"} - assert test_aws_fargate_options.timeout == 3 - assert test_aws_fargate_options.log_level == logging.INFO + assert self.aws_fargate_options.agent_key == "key1" + assert self.aws_fargate_options.endpoint_url == "localhost" + assert not self.aws_fargate_options.ssl_verify + assert self.aws_fargate_options.endpoint_proxy == {"https": "proxy1"} + assert self.aws_fargate_options.timeout == 3 + assert self.aws_fargate_options.log_level == logging.INFO - assert test_aws_fargate_options.tags == {"key1": "value1", "key2": "value2"} - assert test_aws_fargate_options.zone == "zone1" + assert self.aws_fargate_options.tags == {"key1": "value1", "key2": "value2"} + assert self.aws_fargate_options.zone == "zone1" class TestEKSFargateOptions: 
@pytest.fixture(autouse=True) def _resource(self) -> Generator[None, None, None]: + self.eks_fargate_options = None yield - clean_env_vars() def test_eks_fargate_options(self) -> None: - test_eks_fargate_options = EKSFargateOptions() - - assert not test_eks_fargate_options.agent_key - assert not test_eks_fargate_options.endpoint_url - assert test_eks_fargate_options.ssl_verify - assert not test_eks_fargate_options.endpoint_proxy - assert test_eks_fargate_options.timeout == 0.8 - assert test_eks_fargate_options.log_level == logging.WARN - + self.eks_fargate_options = EKSFargateOptions() + + assert not self.eks_fargate_options.agent_key + assert not self.eks_fargate_options.endpoint_url + assert self.eks_fargate_options.ssl_verify + assert not self.eks_fargate_options.endpoint_proxy + assert self.eks_fargate_options.timeout == 0.8 + assert self.eks_fargate_options.log_level == logging.WARN + + @patch.dict( + os.environ, + { + "INSTANA_AGENT_KEY": "key1", + "INSTANA_ENDPOINT_URL": "localhost", + "INSTANA_DISABLE_CA_CHECK": "true", + "INSTANA_ENDPOINT_PROXY": "proxy1", + "INSTANA_TIMEOUT": "3000", + "INSTANA_LOG_LEVEL": "info", + }, + ) def test_eks_fargate_options_with_env_vars(self) -> None: - os.environ["INSTANA_AGENT_KEY"] = "key1" - os.environ["INSTANA_ENDPOINT_URL"] = "localhost" - os.environ["INSTANA_DISABLE_CA_CHECK"] = "true" - os.environ["INSTANA_ENDPOINT_PROXY"] = "proxy1" - os.environ["INSTANA_TIMEOUT"] = "3000" - os.environ["INSTANA_LOG_LEVEL"] = "info" - - test_eks_fargate_options = EKSFargateOptions() + self.eks_fargate_options = EKSFargateOptions() - assert test_eks_fargate_options.agent_key == "key1" - assert test_eks_fargate_options.endpoint_url == "localhost" - assert not test_eks_fargate_options.ssl_verify - assert test_eks_fargate_options.endpoint_proxy == {"https": "proxy1"} - assert test_eks_fargate_options.timeout == 3 - assert test_eks_fargate_options.log_level == logging.INFO + assert self.eks_fargate_options.agent_key == "key1" + assert self.eks_fargate_options.endpoint_url == "localhost" + assert not self.eks_fargate_options.ssl_verify + assert self.eks_fargate_options.endpoint_proxy == {"https": "proxy1"} + assert self.eks_fargate_options.timeout == 3 + assert self.eks_fargate_options.log_level == logging.INFO class TestGCROptions: @pytest.fixture(autouse=True) def _resource(self) -> Generator[None, None, None]: + self.gcr_options = None yield - clean_env_vars() def test_gcr_options(self) -> None: - test_gcr_options = GCROptions() - - assert not test_gcr_options.debug - assert test_gcr_options.log_level == logging.WARN - assert not test_gcr_options.extra_http_headers - assert not test_gcr_options.allow_exit_as_root - assert not test_gcr_options.ignore_endpoints - assert test_gcr_options.secrets_matcher == "contains-ignore-case" - assert test_gcr_options.secrets_list == ["key", "pass", "secret"] - assert not test_gcr_options.secrets - assert not test_gcr_options.agent_key - assert not test_gcr_options.endpoint_url - assert test_gcr_options.ssl_verify - assert not test_gcr_options.endpoint_proxy - assert test_gcr_options.timeout == 0.8 - + self.gcr_options = GCROptions() + + assert not self.gcr_options.debug + assert self.gcr_options.log_level == logging.WARN + assert not self.gcr_options.extra_http_headers + assert not self.gcr_options.allow_exit_as_root + assert not self.gcr_options.ignore_endpoints + assert self.gcr_options.secrets_matcher == "contains-ignore-case" + assert self.gcr_options.secrets_list == ["key", "pass", "secret"] + assert not 
self.gcr_options.secrets + assert not self.gcr_options.agent_key + assert not self.gcr_options.endpoint_url + assert self.gcr_options.ssl_verify + assert not self.gcr_options.endpoint_proxy + assert self.gcr_options.timeout == 0.8 + + @patch.dict( + os.environ, + { + "INSTANA_AGENT_KEY": "key1", + "INSTANA_ENDPOINT_URL": "localhost", + "INSTANA_DISABLE_CA_CHECK": "true", + "INSTANA_ENDPOINT_PROXY": "proxy1", + "INSTANA_TIMEOUT": "3000", + "INSTANA_LOG_LEVEL": "info", + }, + ) def test_gcr_options_with_env_vars(self) -> None: - os.environ["INSTANA_AGENT_KEY"] = "key1" - os.environ["INSTANA_ENDPOINT_URL"] = "localhost" - os.environ["INSTANA_DISABLE_CA_CHECK"] = "true" - os.environ["INSTANA_ENDPOINT_PROXY"] = "proxy1" - os.environ["INSTANA_TIMEOUT"] = "3000" - os.environ["INSTANA_LOG_LEVEL"] = "info" - - test_gcr_options = GCROptions() - - assert test_gcr_options.agent_key == "key1" - assert test_gcr_options.endpoint_url == "localhost" - assert not test_gcr_options.ssl_verify - assert test_gcr_options.endpoint_proxy == {"https": "proxy1"} - assert test_gcr_options.timeout == 3 - assert test_gcr_options.log_level == logging.INFO + self.gcr_options = GCROptions() + + assert self.gcr_options.agent_key == "key1" + assert self.gcr_options.endpoint_url == "localhost" + assert not self.gcr_options.ssl_verify + assert self.gcr_options.endpoint_proxy == {"https": "proxy1"} + assert self.gcr_options.timeout == 3 + assert self.gcr_options.log_level == logging.INFO diff --git a/tests/util/test_config.py b/tests/util/test_config.py index 891007e8..908035d7 100644 --- a/tests/util/test_config.py +++ b/tests/util/test_config.py @@ -1,8 +1,7 @@ -from typing import Generator - -import pytest +# (c) Copyright IBM Corp. 2025 from instana.util.config import ( + parse_endpoints_of_service, parse_ignored_endpoints, parse_ignored_endpoints_dict, parse_service_pair, @@ -10,33 +9,29 @@ class TestConfig: - @pytest.fixture(autouse=True) - def _resource(self) -> Generator[None, None, None]: - yield - def test_parse_service_pair(self) -> None: - test_string = "service1:endpoint1,endpoint2" + test_string = "service1:method1,method2" response = parse_service_pair(test_string) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] test_string = "service1;service2" response = parse_ignored_endpoints(test_string) - assert response == ["service1", "service2"] + assert response == ["service1.*", "service2.*"] test_string = "service1" response = parse_ignored_endpoints(test_string) - assert response == ["service1"] + assert response == ["service1.*"] test_string = ";" response = parse_ignored_endpoints(test_string) assert response == [] - test_string = "service1:endpoint1,endpoint2;;;service2:endpoint1;;" + test_string = "service1:method1,method2;;;service2:method1;;" response = parse_ignored_endpoints(test_string) assert response == [ - "service1.endpoint1", - "service1.endpoint2", - "service2.endpoint1", + "service1.method1", + "service1.method2", + "service2.method1", ] test_string = "" @@ -44,28 +39,28 @@ def test_parse_service_pair(self) -> None: assert response == [] def test_parse_ignored_endpoints_string(self) -> None: - test_string = "service1:endpoint1,endpoint2" + test_string = "service1:method1,method2" response = parse_service_pair(test_string) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] test_string = "service1;service2" response = 
parse_ignored_endpoints(test_string) - assert response == ["service1", "service2"] + assert response == ["service1.*", "service2.*"] test_string = "service1" response = parse_ignored_endpoints(test_string) - assert response == ["service1"] + assert response == ["service1.*"] test_string = ";" response = parse_ignored_endpoints(test_string) assert response == [] - test_string = "service1:endpoint1,endpoint2;;;service2:endpoint1;;" + test_string = "service1:method1,method2;;;service2:method1;;" response = parse_ignored_endpoints(test_string) assert response == [ - "service1.endpoint1", - "service1.endpoint2", - "service2.endpoint1", + "service1.method1", + "service1.method2", + "service2.method1", ] test_string = "" @@ -73,67 +68,92 @@ def test_parse_ignored_endpoints_string(self) -> None: assert response == [] def test_parse_ignored_endpoints_dict(self) -> None: - test_dict = {"service1": ["endpoint1", "endpoint2"]} + test_dict = {"service1": ["method1", "method2"]} response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] - test_dict = {"SERVICE1": ["ENDPOINT1", "ENDPOINT2"]} + test_dict = {"SERVICE1": ["method1", "method2"]} response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] test_dict = {"service1": [], "service2": []} response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1", "service2"] + assert response == ["service1.*", "service2.*"] test_dict = {"service1": []} response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1"] + assert response == ["service1.*"] test_dict = {} response = parse_ignored_endpoints_dict(test_dict) assert response == [] def test_parse_ignored_endpoints(self) -> None: - test_pair = "service1:endpoint1,endpoint2" + test_pair = "service1:method1,method2" response = parse_ignored_endpoints(test_pair) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] test_pair = "service1;service2" response = parse_ignored_endpoints(test_pair) - assert response == ["service1", "service2"] + assert response == ["service1.*", "service2.*"] test_pair = "service1" response = parse_ignored_endpoints(test_pair) - assert response == ["service1"] + assert response == ["service1.*"] test_pair = ";" response = parse_ignored_endpoints(test_pair) assert response == [] - test_pair = "service1:endpoint1,endpoint2;;;service2:endpoint1;;" + test_pair = "service1:method1,method2;;;service2:method1;;" response = parse_ignored_endpoints(test_pair) assert response == [ - "service1.endpoint1", - "service1.endpoint2", - "service2.endpoint1", + "service1.method1", + "service1.method2", + "service2.method1", ] test_pair = "" response = parse_ignored_endpoints(test_pair) assert response == [] - test_dict = {"service1": ["endpoint1", "endpoint2"]} + test_dict = {"service1": ["method1", "method2"]} response = parse_ignored_endpoints(test_dict) - assert response == ["service1.endpoint1", "service1.endpoint2"] + assert response == ["service1.method1", "service1.method2"] test_dict = {"service1": [], "service2": []} response = parse_ignored_endpoints(test_dict) - assert response == ["service1", "service2"] + assert response == ["service1.*", "service2.*"] test_dict = {"service1": []} response = parse_ignored_endpoints(test_dict) - assert 
response == ["service1"] + assert response == ["service1.*"] test_dict = {} response = parse_ignored_endpoints(test_dict) assert response == [] + + def test_parse_endpoints_of_service(self) -> None: + test_ignore_endpoints = { + "service1": ["method1", "method2"], + "service2": ["method3", "method4"], + "kafka": [ + { + "methods": ["method5", "method6"], + "endpoints": ["endpoint1", "endpoint2"], + } + ], + } + ignore_endpoints = [] + for service, methods in test_ignore_endpoints.items(): + ignore_endpoints.extend(parse_endpoints_of_service([], service, methods)) + assert ignore_endpoints == [ + "service1.method1", + "service1.method2", + "service2.method3", + "service2.method4", + "kafka.method5.endpoint1", + "kafka.method5.endpoint2", + "kafka.method6.endpoint1", + "kafka.method6.endpoint2", + ] diff --git a/tests/util/test_config_reader.py b/tests/util/test_config_reader.py new file mode 100644 index 00000000..0c9c3ede --- /dev/null +++ b/tests/util/test_config_reader.py @@ -0,0 +1,63 @@ +# (c) Copyright IBM Corp. 2025 + +import logging + +import pytest + +from instana.util.config import parse_ignored_endpoints_from_yaml + + +class TestConfigReader: + def test_load_configuration_with_tracing( + self, caplog: pytest.LogCaptureFixture + ) -> None: + caplog.set_level(logging.DEBUG, logger="instana") + + ignore_endpoints = parse_ignored_endpoints_from_yaml( + "tests/util/test_configuration-1.yaml" + ) + # test with tracing + assert ignore_endpoints == [ + "redis.get", + "redis.type", + "dynamodb.query", + "kafka.consume.span-topic", + "kafka.consume.topic1", + "kafka.consume.topic2", + "kafka.send.span-topic", + "kafka.send.topic1", + "kafka.send.topic2", + "kafka.consume.topic3", + "kafka.*.span-topic", + "kafka.*.topic4", + ] + + assert ( + 'Please use "tracing" instead of "com.instana.tracing" for local configuration file.' + not in caplog.messages + ) + + def test_load_configuration_legacy(self, caplog: pytest.LogCaptureFixture) -> None: + caplog.set_level(logging.DEBUG, logger="instana") + + ignore_endpoints = parse_ignored_endpoints_from_yaml( + "tests/util/test_configuration-2.yaml" + ) + assert ignore_endpoints == [ + "redis.get", + "redis.type", + "dynamodb.query", + "kafka.consume.span-topic", + "kafka.consume.topic1", + "kafka.consume.topic2", + "kafka.send.span-topic", + "kafka.send.topic1", + "kafka.send.topic2", + "kafka.consume.topic3", + "kafka.*.span-topic", + "kafka.*.topic4", + ] + assert ( + 'Please use "tracing" instead of "com.instana.tracing" for local configuration file.' + in caplog.messages + ) diff --git a/tests/util/test_configuration-1.yaml b/tests/util/test_configuration-1.yaml new file mode 100644 index 00000000..af890a35 --- /dev/null +++ b/tests/util/test_configuration-1.yaml @@ -0,0 +1,19 @@ +# (c) Copyright IBM Corp. 2025 + +# service-level configuration, aligning with in-code settings +tracing: + ignore-endpoints: + redis: + - get + - type + dynamodb: + - query + kafka: + - methods: ["consume", "send"] + endpoints: ["span-topic", "topic1", "topic2"] + - methods: ["consume"] + endpoints: ["topic3"] + - methods: ["*"] # Applied to all methods + endpoints: ["span-topic", "topic4"] + # - methods: ["consume", "send"] + # endpoints: ["*"] # Applied to all topics diff --git a/tests/util/test_configuration-2.yaml b/tests/util/test_configuration-2.yaml new file mode 100644 index 00000000..582202f0 --- /dev/null +++ b/tests/util/test_configuration-2.yaml @@ -0,0 +1,19 @@ +# (c) Copyright IBM Corp. 
2025 + +# service-level configuration, aligning with in-code settings +com.instana.tracing: + ignore-endpoints: + redis: + - get + - type + dynamodb: + - query + kafka: + - methods: ["consume", "send"] + endpoints: ["span-topic", "topic1", "topic2"] + - methods: ["consume"] + endpoints: ["topic3"] + - methods: ["*"] # Applied to all methods + endpoints: ["span-topic", "topic4"] + # - methods: ["consume", "send"] + # endpoints: ["*"] # Applied to all topics diff --git a/tests/util/test_span_utils.py b/tests/util/test_span_utils.py index 32f623c6..22f67653 100644 --- a/tests/util/test_span_utils.py +++ b/tests/util/test_span_utils.py @@ -1,15 +1,22 @@ -from typing import Optional +from typing import List, Optional import pytest -from instana.util.span_utils import get_operation_specifier +from instana.util.span_utils import get_operation_specifiers @pytest.mark.parametrize( "span_name, expected_result", - [("something", ""), ("redis", "command"), ("dynamodb", "op")], + [ + ("something", ["", ""]), + ("redis", ["command", ""]), + ("dynamodb", ["op", ""]), + ("kafka", ["access", "service"]), + ], ) -def test_get_operation_specifier( - span_name: str, expected_result: Optional[str] +def test_get_operation_specifiers( + span_name: str, + expected_result: Optional[List[str]], ) -> None: - response_redis = get_operation_specifier(span_name) - assert response_redis == expected_result + operation_specifier, service_specifier = get_operation_specifiers(span_name) + assert operation_specifier == expected_result[0] + assert service_specifier == expected_result[1]
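
Note on the matching semantics these tests exercise: the sketch below shows, in a minimal and self-contained form, how an ignore-endpoints specification such as "service1;service2:method1,method2" expands into "service.method" rules and how a span is checked against them. It is illustrative only and not part of the patch; expand_rules and is_ignored are hypothetical helpers that mirror what instana.util.config.parse_ignored_endpoints and HostAgent.__is_endpoint_ignored implement in the patched sources, assuming the rule shapes asserted throughout the tests ("service.*", "service.method", and the Kafka-only "service.method.topic" variants).

# Illustrative sketch -- not part of the patch. The helper names are
# hypothetical; the shipped logic lives in instana.util.config and
# instana.agent.host.
from typing import List


def expand_rules(spec: str) -> List[str]:
    """Expand "svc1;svc2:m1,m2" into ["svc1.*", "svc2.m1", "svc2.m2"]."""
    rules: List[str] = []
    for pair in spec.split(";"):
        if not pair:
            continue  # tolerate empty segments such as ";;"
        service, _, methods = pair.partition(":")
        if methods:
            rules += [f"{service.lower()}.{m.lower()}" for m in methods.split(",")]
        else:
            rules.append(f"{service.lower()}.*")  # bare service ignores every method
    return rules


def is_ignored(rules: List[str], service: str, method: str, endpoint: str = "") -> bool:
    """Match a span's service/method (and, for Kafka, topic) against the rules."""
    service, method, endpoint = service.lower(), method.lower(), endpoint.lower()
    candidates = [f"{service}.{method}", f"{service}.*"]
    if service == "kafka" and endpoint:
        # Kafka rules may carry a third, topic-level component.
        candidates += [
            f"{service}.{method}.{endpoint}",
            f"{service}.*.{endpoint}",
            f"{service}.{method}.*",
        ]
    return any(rule in rules for rule in candidates)


rules = expand_rules("service1;service2:method1,method2")
assert rules == ["service1.*", "service2.method1", "service2.method2"]
assert is_ignored(rules, "service1", "anything")
assert not is_ignored(rules, "service2", "method3")

# Topic-level Kafka rules, like those produced from the YAML fixtures above:
kafka_rules = ["kafka.consume.span-topic", "kafka.*.topic4"]
assert is_ignored(kafka_rules, "kafka", "consume", "span-topic")
assert is_ignored(kafka_rules, "kafka", "send", "topic4")
assert not is_ignored(kafka_rules, "kafka", "send", "span-topic")

Under these assumed semantics, a bare service name suppresses every method of that service, while the Kafka-specific three-part rules let a single topic be ignored for one method ("kafka.consume.span-topic"), for all methods ("kafka.*.topic4"), or a single method for all topics ("kafka.send.*"), which is exactly the shape of the expectations asserted in test_options.py and test_config_reader.py above.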