diff --git a/bottlecap/LICENSE-3rdparty.yml b/bottlecap/LICENSE-3rdparty.yml index da344a202..468f4892a 100644 --- a/bottlecap/LICENSE-3rdparty.yml +++ b/bottlecap/LICENSE-3rdparty.yml @@ -40,7 +40,19 @@ third_party_libraries: license: 0BSD OR MIT OR Apache-2.0 licenses: - license: 0BSD - text: NOT FOUND + text: | + Copyright (C) Jonas Schievink + + Permission to use, copy, modify, and/or distribute this software for + any purpose with or without fee is hereby granted. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN + AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT + OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - license: MIT text: | Permission is hereby granted, free of charge, to any diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index b030b7303..e7835c1cf 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -361,6 +361,7 @@ impl Processor { header_tags, vec![traces], body_size, + self.inferrer.span_pointers.clone(), ); if let Err(e) = trace_agent_tx.send(send_data).await { diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index a916305f7..cfa540986 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -22,6 +22,7 @@ use crate::lifecycle::invocation::{ Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, }, }; +use crate::traces::span_pointers::SpanPointer; use crate::traces::{context::SpanContext, propagation::Propagator}; #[derive(Default)] @@ -39,6 +40,8 @@ pub struct SpanInferrer { generated_span_context: Option, // Tags generated from the trigger trigger_tags: Option>, + // Span pointers from S3 or DynamoDB streams + pub span_pointers: Option>, } impl SpanInferrer { @@ -52,6 +55,7 @@ impl SpanInferrer { carrier: None, generated_span_context: None, trigger_tags: None, + span_pointers: None, } } @@ -178,6 +182,7 @@ impl SpanInferrer { } else if S3Record::is_match(payload_value) { if let Some(t) = S3Record::new(payload_value.clone()) { t.enrich_span(&mut inferred_span, &self.service_mapping); + self.span_pointers = t.get_span_pointers(); trigger = Some(Box::new(t)); } diff --git a/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs b/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs index 43065cb0f..2172700b4 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs @@ -10,6 +10,7 @@ use crate::lifecycle::invocation::{ processor::MS_TO_NS, triggers::{ServiceNameResolver, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, }; +use crate::traces::span_pointers::{generate_span_pointer_hash, SpanPointer}; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] pub struct S3Event { @@ -133,6 +134,28 @@ impl ServiceNameResolver for S3Record { } } +impl S3Record { + pub fn get_span_pointers(&self) -> Option> { + let bucket_name = &self.s3.bucket.name; + let key = &self.s3.object.key; + // The AWS SDK sometimes wraps the S3 eTag in quotes, but sometimes doesn't. + let e_tag = self.s3.object.e_tag.trim_matches('"'); + + if bucket_name.is_empty() || key.is_empty() || e_tag.is_empty() { + debug!("Unable to create span pointer because bucket name, key, or etag is missing."); + return None; + } + + // https://github.com/DataDog/dd-span-pointer-rules/blob/main/AWS/S3/Object/README.md + let hash = generate_span_pointer_hash(&[bucket_name, key, e_tag]); + + Some(vec![SpanPointer { + hash, + kind: String::from("aws.s3.object"), + }]) + } +} + #[cfg(test)] #[allow(clippy::unwrap_used)] mod tests { @@ -274,4 +297,51 @@ mod tests { "generic-service" ); } + + #[test] + fn test_get_span_pointers() { + let event = S3Record { + event_source: String::from("aws:s3"), + event_time: Utc::now(), + event_name: String::from("ObjectCreated:Put"), + s3: S3Entity { + bucket: S3Bucket { + name: String::from("test-bucket"), + arn: String::from("arn:aws:s3:::test-bucket"), + }, + object: S3Object { + key: String::from("test/key"), + size: 1024, + e_tag: String::from("0123456789abcdef0123456789abcdef"), + }, + }, + }; // + + let span_pointers = event.get_span_pointers().expect("Should return Some(vec)"); + assert_eq!(span_pointers.len(), 1); + assert_eq!(span_pointers[0].kind, "aws.s3.object"); + assert_eq!(span_pointers[0].hash, "40df87dbfdf59f32253a2668c23e51b4"); + } + + #[test] + fn test_get_span_pointers_missing_fields() { + let event = S3Record { + event_source: String::from("aws:s3"), + event_time: Utc::now(), + event_name: String::from("ObjectCreated:Put"), + s3: S3Entity { + bucket: S3Bucket { + name: String::new(), // Empty bucket name + arn: String::from("arn"), + }, + object: S3Object { + key: String::from("key"), + size: 0, + e_tag: String::from("etag"), + }, + }, + }; + + assert!(event.get_span_pointers().is_none()); + } } diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs index 9c87051cf..96b2ee9ab 100644 --- a/bottlecap/src/traces/mod.rs +++ b/bottlecap/src/traces/mod.rs @@ -3,6 +3,7 @@ pub mod context; pub mod propagation; +pub mod span_pointers; pub mod stats_flusher; pub mod stats_processor; pub mod trace_agent; diff --git a/bottlecap/src/traces/span_pointers.rs b/bottlecap/src/traces/span_pointers.rs new file mode 100644 index 000000000..e238d2b54 --- /dev/null +++ b/bottlecap/src/traces/span_pointers.rs @@ -0,0 +1,257 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use datadog_trace_protobuf::pb::SpanLink; +use sha2::{Digest, Sha256}; +use std::collections::HashMap; + +const SPAN_POINTER_HASH_LENGTH: usize = 32; + +#[derive(Clone)] +pub struct SpanPointer { + pub hash: String, + pub kind: String, +} + +/// Returns the first 32 characters of the SHA-256 hash of the components joined by a '|'. +/// Used by span pointers to uniquely & deterministically identify an `S3` or `DynamoDB` stream. +/// +#[must_use] +pub fn generate_span_pointer_hash(components: &[&str]) -> String { + let mut hasher = Sha256::new(); + hasher.update(components.join("|").as_bytes()); + let result = hasher.finalize(); + hex::encode(result)[..SPAN_POINTER_HASH_LENGTH].to_string() +} + +pub fn attach_span_pointers_to_meta( + meta: &mut HashMap, + span_pointers: &Option>, +) { + let Some(span_pointers) = span_pointers.as_ref().filter(|sp| !sp.is_empty()) else { + return; + }; + + let new_span_links: Vec = span_pointers + .iter() + .map(|sp| { + SpanLink { + // We set all these fields as 0 or empty since they're unknown; the frontend + // uses `ptr.hash` instead to find the opposite link if it exists. + trace_id: 0, + span_id: 0, + trace_id_high: 0, + tracestate: String::new(), + flags: 0, + attributes: HashMap::from([ + ("link.kind".to_string(), "span-pointer".to_string()), + ("ptr.dir".to_string(), "u".to_string()), + ("ptr.hash".to_string(), sp.hash.clone()), + ("ptr.kind".to_string(), sp.kind.clone()), + ]), + } + }) + .collect(); + + let mut all_span_links = meta + .get("_dd.span_links") + .and_then(|existing| serde_json::from_str::>(existing).ok()) + .unwrap_or_default(); + + all_span_links.extend(new_span_links); + let _ = serde_json::to_string(&all_span_links) + .map(|json| meta.insert("_dd.span_links".to_string(), json)); +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use std::collections::HashMap; + + #[derive(Debug, Default)] + struct TestSpan { + pub meta: HashMap, + } + + struct SpanPointerTestCase { + test_name: &'static str, + existing_links: Option, + span_pointers: Option>, + expected_links: Option, + } + + #[test] + fn test_attach_span_pointers_to_span() { + let test_cases = vec![ + SpanPointerTestCase { + test_name: "adds span links to span", + existing_links: None, + span_pointers: Some(vec![ + SpanPointer { + hash: "hash1".to_string(), + kind: "test.kind1".to_string(), + }, + SpanPointer { + hash: "hash2".to_string(), + kind: "test.kind2".to_string(), + }, + ]), + expected_links: Some(json!([ + { + "attributes": { + "link.kind": "span-pointer", + "ptr.dir": "u", + "ptr.hash": "hash1", + "ptr.kind": "test.kind1" + }, + "span_id": 0, + "trace_id": 0, + "trace_id_high": 0, + "tracestate": "", + "flags": 0 + }, + { + "attributes": { + "link.kind": "span-pointer", + "ptr.dir": "u", + "ptr.hash": "hash2", + "ptr.kind": "test.kind2" + }, + "span_id": 0, + "trace_id": 0, + "trace_id_high": 0, + "tracestate": "", + "flags": 0 + } + ])), + }, + SpanPointerTestCase { + test_name: "handles empty span pointers", + existing_links: None, + span_pointers: Some(vec![]), + expected_links: None, + }, + SpanPointerTestCase { + test_name: "handles None span pointers", + existing_links: None, + span_pointers: None, + expected_links: None, + }, + SpanPointerTestCase { + test_name: "appends to existing span links", + existing_links: Some(json!([{ + "attributes": { + "link.kind": "span-pointer", + "ptr.dir": "d", + "ptr.hash": "hash1", + "ptr.kind": "test.kind1" + }, + "span_id": 0, + "trace_id": 0, + "trace_id_high": 0, + "tracestate": "", + "flags": 0 + }])), + span_pointers: Some(vec![SpanPointer { + hash: "hash2".to_string(), + kind: "test.kind2".to_string(), + }]), + expected_links: Some(json!([ + { + "attributes": { + "link.kind": "span-pointer", + "ptr.dir": "d", + "ptr.hash": "hash1", + "ptr.kind": "test.kind1" + }, + "span_id": 0, + "trace_id": 0, + "trace_id_high": 0, + "tracestate": "", + "flags": 0 + }, + { + "attributes": { + "link.kind": "span-pointer", + "ptr.dir": "u", + "ptr.hash": "hash2", + "ptr.kind": "test.kind2" + }, + "span_id": 0, + "trace_id": 0, + "trace_id_high": 0, + "tracestate": "", + "flags": 0 + } + ])), + }, + ]; + + for case in test_cases { + let mut test_span = TestSpan { + meta: HashMap::new(), + }; + + // Set up existing links if any + if let Some(links) = case.existing_links { + test_span + .meta + .insert("_dd.span_links".to_string(), links.to_string()); + } + + attach_span_pointers_to_meta(&mut test_span.meta, &case.span_pointers); + + match case.expected_links { + Some(expected) => { + let span_links = test_span.meta.get("_dd.span_links").unwrap_or_else(|| { + panic!( + "[{}] _dd.span_links should be present in span meta", + case.test_name + ) + }); + let actual_links: serde_json::Value = + serde_json::from_str(span_links).expect("Should be valid JSON"); + assert_eq!( + actual_links, expected, + "Failed test case: {}", + case.test_name + ); + } + None => { + assert!( + !test_span.meta.contains_key("_dd.span_links"), + "Failed test case: {}", + case.test_name + ); + } + } + } + } + + #[test] + fn test_generate_span_pointer_hash() { + let test_cases = vec![ + ( + "basic values", + vec!["some-bucket", "some-key.data", "ab12ef34"], + "e721375466d4116ab551213fdea08413", + ), + ( + "non-ascii key", + vec!["some-bucket", "some-key.你好", "ab12ef34"], + "d1333a04b9928ab462b5c6cadfa401f4", + ), + ( + "multipart-upload", + vec!["some-bucket", "some-key.data", "ab12ef34-5"], + "2b90dffc37ebc7bc610152c3dc72af9f", + ), + ]; + + for (name, components, expected_hash) in test_cases { + let actual_hash = generate_span_pointer_hash(&components); + assert_eq!(actual_hash, expected_hash, "Test case: {name}"); + } + } +} diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index c617def85..2a0023908 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -258,6 +258,7 @@ impl TraceAgent { tracer_header_tags, traces, body_size, + None, ); // send trace payload to our trace flusher diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 626a54654..8480e0cad 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::tags::provider; +use crate::traces::span_pointers::{attach_span_pointers_to_meta, SpanPointer}; use datadog_trace_obfuscation::obfuscation_config; use datadog_trace_protobuf::pb; use datadog_trace_utils::config_utils::trace_intake_url; @@ -32,10 +33,11 @@ pub struct ServerlessTraceProcessor { struct ChunkProcessor { obfuscation_config: Arc, tags_provider: Arc, + span_pointers: Option>, } impl TraceChunkProcessor for ChunkProcessor { - fn process(&mut self, chunk: &mut pb::TraceChunk, _index: usize) { + fn process(&mut self, chunk: &mut pb::TraceChunk, root_span_index: usize) { chunk .spans .retain(|span| !filter_span_from_lambda_library_or_runtime(span)); @@ -57,6 +59,10 @@ impl TraceChunkProcessor for ChunkProcessor { .insert("_dd.origin".to_string(), "lambda".to_string()); obfuscate_span(span, &self.obfuscation_config); } + + if let Some(span) = chunk.spans.get_mut(root_span_index) { + attach_span_pointers_to_meta(&mut span.meta, &self.span_pointers); + } } } @@ -116,6 +122,7 @@ pub trait TraceProcessor { header_tags: tracer_header_tags::TracerHeaderTags, traces: Vec>, body_size: usize, + span_pointers: Option>, ) -> SendData; } @@ -127,6 +134,7 @@ impl TraceProcessor for ServerlessTraceProcessor { header_tags: tracer_header_tags::TracerHeaderTags, traces: Vec>, body_size: usize, + span_pointers: Option>, ) -> SendData { let payload = trace_utils::collect_trace_chunks( V07(traces), @@ -134,6 +142,7 @@ impl TraceProcessor for ServerlessTraceProcessor { &mut ChunkProcessor { obfuscation_config: self.obfuscation_config.clone(), tags_provider: tags_provider.clone(), + span_pointers, }, true, ); @@ -265,8 +274,14 @@ mod tests { }; let config = create_test_config(); let tags_provider = create_tags_provider(config.clone()); - let tracer_payload = - trace_processor.process_traces(config, tags_provider.clone(), header_tags, traces, 100); + let tracer_payload = trace_processor.process_traces( + config, + tags_provider.clone(), + header_tags, + traces, + 100, + None, + ); let expected_tracer_payload = pb::TracerPayload { container_id: "33".to_string(),