diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index cd196e3556..3ea0ea2096 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -13,7 +13,6 @@ use libdd_telemetry::worker::{ }; use libdd_trace_utils::{ send_with_retry::{SendWithRetryError, SendWithRetryResult}, - span::trace_utils::DroppedStats, trace_utils::SendDataResult, }; use std::{collections::HashMap, time::Duration}; @@ -165,8 +164,6 @@ pub struct SendPayloadTelemetry { errors_status_code: u64, bytes_sent: u64, chunks_sent: u64, - chunks_dropped_p0: u64, - chunks_dropped_by_trace_filter: u64, chunks_dropped_serialization_error: u64, chunks_dropped_send_failure: u64, responses_count_per_code: HashMap, @@ -195,18 +192,8 @@ impl SendPayloadTelemetry { /// * `value` - The result of sending traces with retry /// * `bytes_sent` - The number of bytes in the payload /// * `chunks` - The number of trace chunks in the payload - /// * `dropped_stats` - Trace dropped stats from `stats::process_traces_for_stats` - pub fn from_retry_result( - value: &SendWithRetryResult, - bytes_sent: u64, - chunks: u64, - dropped_stats: DroppedStats, - ) -> Self { - let mut telemetry = Self { - chunks_dropped_p0: dropped_stats.dropped_p0_traces as u64, - chunks_dropped_by_trace_filter: dropped_stats.dropped_by_trace_filter as u64, - ..Default::default() - }; + pub fn from_retry_result(value: &SendWithRetryResult, bytes_sent: u64, chunks: u64) -> Self { + let mut telemetry = Self::default(); match value { Ok((response, attempts)) => { telemetry.chunks_sent = chunks; @@ -286,18 +273,6 @@ impl TelemetryClient { self.worker .add_point(data.chunks_sent as f64, key, vec![])?; } - if data.chunks_dropped_p0 > 0 { - let key = self.metrics.get(metrics::MetricKind::ChunksDroppedP0); - self.worker - .add_point(data.chunks_dropped_p0 as f64, key, vec![])?; - } - if data.chunks_dropped_by_trace_filter > 0 { - let key = self - .metrics - .get(metrics::MetricKind::ChunksDroppedByTraceFilter); - self.worker - .add_point(data.chunks_dropped_by_trace_filter as f64, key, vec![])?; - } if data.chunks_dropped_serialization_error > 0 { let key = self .metrics @@ -322,6 +297,27 @@ impl TelemetryClient { Ok(()) } + /// Send dropped P0 trace counts to telemetry. + pub fn send_client_side_stats_drops( + &self, + dropped_p0_traces: usize, + dropped_by_trace_filter: usize, + ) -> Result<(), TelemetryError> { + if dropped_p0_traces > 0 { + let key = self.metrics.get(metrics::MetricKind::ChunksDroppedP0); + self.worker + .add_point(dropped_p0_traces as f64, key, vec![])?; + } + if dropped_by_trace_filter > 0 { + let key = self + .metrics + .get(metrics::MetricKind::ChunksDroppedByTraceFilter); + self.worker + .add_point(dropped_by_trace_filter as f64, key, vec![])?; + } + Ok(()) + } + /// Starts the client pub async fn start(&self) { _ = self @@ -645,23 +641,22 @@ mod tests { #[cfg_attr(miri, ignore)] #[test] - fn chunks_dropped_p0_test() { - let payload = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog","reason:p0_drop"\],"common":true,"type":"count"#).unwrap(); + fn send_client_side_stats_drops_test() { + let payload_p0 = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,3\.0\]\],"tags":\["src_library:libdatadog","reason:p0_drop"\],"common":true,"type":"count"#).unwrap(); + let payload_trace_filter = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,5\.0\]\],"tags":\["src_library:libdatadog","reason:trace_filters"\],"common":true,"type":"count"#).unwrap(); let shared_runtime = SharedRuntime::new().expect("Failed to create runtime"); let server = MockServer::start(); let mut telemetry_srv = server.mock(|when, then| { - when.method(POST).body_matches(payload); + when.method(POST) + .body_matches(payload_p0) + .body_matches(payload_trace_filter); then.status(200).body(""); }); - let data = SendPayloadTelemetry { - chunks_dropped_p0: 1, - ..Default::default() - }; let (client, handle) = get_test_client(&server.url("/"), &shared_runtime); shared_runtime .block_on(async { client.start().await; - let _ = client.send(&data); + client.send_client_side_stats_drops(3, 5).unwrap(); // Wait for send to be processed sleep(Duration::from_millis(100)).await; @@ -714,16 +709,7 @@ mod tests { .unwrap(), 3, )); - let telemetry = SendPayloadTelemetry::from_retry_result( - &result, - 4, - 5, - DroppedStats { - dropped_p0_traces: 0, - dropped_p0_spans: 0, - dropped_by_trace_filter: 0, - }, - ); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 4, 5); assert_eq!( telemetry, SendPayloadTelemetry { @@ -736,38 +722,6 @@ mod tests { ) } - #[test] - fn telemetry_from_ok_response_with_p0_drops_test() { - let result = Ok(( - http::Response::builder() - .status(http::StatusCode::OK) - .body(Bytes::new()) - .unwrap(), - 3, - )); - let telemetry = SendPayloadTelemetry::from_retry_result( - &result, - 4, - 5, - DroppedStats { - dropped_p0_traces: 10, - dropped_p0_spans: 0, - dropped_by_trace_filter: 0, - }, - ); - assert_eq!( - telemetry, - SendPayloadTelemetry { - bytes_sent: 4, - chunks_sent: 5, - requests_count: 3, - chunks_dropped_p0: 10, - responses_count_per_code: HashMap::from([(200, 1)]), - ..Default::default() - } - ) - } - #[test] fn telemetry_from_request_error_test() { let error_response = http::Response::builder() @@ -775,16 +729,7 @@ mod tests { .body(Bytes::new()) .unwrap(); let result = Err(SendWithRetryError::Http(error_response, 5)); - let telemetry = SendPayloadTelemetry::from_retry_result( - &result, - 1, - 2, - DroppedStats { - dropped_p0_traces: 0, - dropped_p0_spans: 0, - dropped_by_trace_filter: 0, - }, - ); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); assert_eq!( telemetry, SendPayloadTelemetry { @@ -803,16 +748,7 @@ mod tests { HttpError::Network(anyhow::anyhow!("connection refused")), 5, )); - let telemetry = SendPayloadTelemetry::from_retry_result( - &result, - 1, - 2, - DroppedStats { - dropped_p0_traces: 0, - dropped_p0_spans: 0, - dropped_by_trace_filter: 0, - }, - ); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); assert_eq!( telemetry, SendPayloadTelemetry { @@ -827,16 +763,7 @@ mod tests { #[test] fn telemetry_from_timeout_error_test() { let result = Err(SendWithRetryError::Timeout(5)); - let telemetry = SendPayloadTelemetry::from_retry_result( - &result, - 1, - 2, - DroppedStats { - dropped_p0_traces: 0, - dropped_p0_spans: 0, - dropped_by_trace_filter: 0, - }, - ); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); assert_eq!( telemetry, SendPayloadTelemetry { @@ -852,16 +779,7 @@ mod tests { #[test] fn telemetry_from_build_error_test() { let result = Err(SendWithRetryError::Build(5)); - let telemetry = SendPayloadTelemetry::from_retry_result( - &result, - 1, - 2, - DroppedStats { - dropped_p0_traces: 0, - dropped_p0_spans: 0, - dropped_by_trace_filter: 0, - }, - ); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); assert_eq!( telemetry, SendPayloadTelemetry { diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 0486a32678..a8acd1e336 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -9,7 +9,7 @@ mod trace_serializer; // Re-export the builder pub use builder::TraceExporterBuilder; -use libdd_trace_utils::{span::trace_utils::DroppedStats, trace_filter::TraceFilterer}; +use libdd_trace_utils::trace_filter::TraceFilterer; use self::agent_response::AgentResponse; use self::metrics::MetricsEmitter; @@ -577,8 +577,6 @@ impl Tra mp_payload: Vec, headers: HeaderMap, chunks: usize, - #[cfg_attr(not(feature = "telemetry"), allow(unused_variables))] - dropped_stats: DroppedStats, ) -> Result { let strategy = RetryStrategy::default(); let payload_len = mp_payload.len(); @@ -599,7 +597,6 @@ impl Tra &result, payload_len as u64, chunks as u64, - dropped_stats, )) { error!(?e, "Error sending telemetry"); } @@ -616,12 +613,14 @@ impl Tra // Process stats computation and drop non-sampled (p0) chunks. // This must run before the OTLP path so that unsampled spans are not exported. - let dropped_stats = stats::process_traces_for_stats( + stats::process_traces_for_stats( &mut traces, &mut header_tags, &self.client_side_stats.status, self.client_computed_top_level, &self.trace_filterer.load(), + #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] + self.telemetry.as_ref(), ); for chunk in &mut traces { @@ -674,7 +673,6 @@ impl Tra prepared.data, prepared.headers, prepared.chunk_count, - dropped_stats, ) .await; diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 93d668781b..6a394d3249 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -299,14 +299,19 @@ fn add_spans_to_stats( } /// Process traces for stats computation and update header tags accordingly. -/// Returns the number of P0 traces and spans that were dropped. +/// +/// If a telemetry client is provided and stats are enabled, dropped P0 counts +/// will be sent to telemetry. pub(crate) fn process_traces_for_stats( traces: &mut Vec>>, header_tags: &mut libdd_trace_utils::trace_utils::TracerHeaderTags, client_side_stats: &ArcSwap, client_computed_top_level: bool, trace_filterer: &TraceFilterer, -) -> libdd_trace_utils::span::trace_utils::DroppedStats { + #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] telemetry: Option< + &crate::telemetry::TelemetryClient, + >, +) { let status = client_side_stats.load(); if let StatsComputationStatus::Enabled { stats_concentrator, .. @@ -322,22 +327,24 @@ pub(crate) fn process_traces_for_stats( add_spans_to_stats(stats_concentrator, traces); // Once stats have been computed we can drop all chunks that are not going to be // sampled by the agent - let mut dropped_stats = libdd_trace_utils::span::trace_utils::drop_chunks(traces); - dropped_stats.dropped_by_trace_filter = dropped_by_trace_filter; + let dropped_p0_stats = libdd_trace_utils::span::trace_utils::drop_chunks(traces); // Update the headers to indicate that stats have been computed and forward dropped // traces counts header_tags.client_computed_top_level = true; header_tags.client_computed_stats = true; - header_tags.dropped_p0_traces = dropped_stats.dropped_p0_traces; - header_tags.dropped_p0_spans = dropped_stats.dropped_p0_spans; - - dropped_stats - } else { - libdd_trace_utils::span::trace_utils::DroppedStats { - dropped_p0_traces: 0, - dropped_p0_spans: 0, - dropped_by_trace_filter: 0, + header_tags.dropped_p0_traces = dropped_p0_stats.dropped_p0_traces; + header_tags.dropped_p0_spans = dropped_p0_stats.dropped_p0_spans; + + // Send dropped P0 stats directly to telemetry if available + #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] + if let Some(telemetry_client) = telemetry { + if let Err(e) = telemetry_client.send_client_side_stats_drops( + dropped_p0_stats.dropped_p0_traces, + dropped_by_trace_filter, + ) { + tracing::error!(?e, "Error sending dropped P0 stats to telemetry"); + } } } } diff --git a/libdd-trace-utils/src/span/trace_utils.rs b/libdd-trace-utils/src/span/trace_utils.rs index 54699a484e..60790aa3cb 100644 --- a/libdd-trace-utils/src/span/trace_utils.rs +++ b/libdd-trace-utils/src/span/trace_utils.rs @@ -125,10 +125,9 @@ pub fn is_partial_snapshot(span: &Span) -> bool { .is_some_and(|v| *v >= 0.0) } -pub struct DroppedStats { +pub struct DroppedP0Stats { pub dropped_p0_traces: usize, pub dropped_p0_spans: usize, - pub dropped_by_trace_filter: usize, } // Keys used for sampling @@ -146,7 +145,7 @@ const SAMPLING_ANALYTICS_RATE_KEY: &str = "_dd1.sr.eausr"; /// /// # Trace-level attributes /// Some attributes related to the whole trace are stored in the root span of the chunk. -pub fn drop_chunks(traces: &mut Vec>>) -> DroppedStats +pub fn drop_chunks(traces: &mut Vec>>) -> DroppedP0Stats where T: TraceData, { @@ -197,10 +196,9 @@ where true }); - DroppedStats { + DroppedP0Stats { dropped_p0_traces, dropped_p0_spans, - dropped_by_trace_filter: 0, } }