From d6f269db4d9c103f01f9181484b1c7ed8d1ed299 Mon Sep 17 00:00:00 2001 From: vianney Date: Wed, 17 Jun 2026 15:48:39 +0200 Subject: [PATCH 1/3] chore(stats): submit p0 telemetry in stats --- libdd-data-pipeline/src/telemetry/mod.rs | 10 ++++++++++ libdd-data-pipeline/src/trace_exporter/mod.rs | 2 ++ .../src/trace_exporter/stats.rs | 20 +++++++++++++++++-- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index cd196e3556..ce1252e2bc 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -322,6 +322,16 @@ impl TelemetryClient { Ok(()) } + /// Send dropped P0 trace counts to telemetry. + pub fn send_p0_drops(&self, dropped_p0_traces: u64) -> Result<(), TelemetryError> { + if dropped_p0_traces > 0 { + let key = self.metrics.get(metrics::MetricKind::ChunksDroppedP0); + self.worker + .add_point(dropped_p0_traces as f64, key, vec![])?; + } + Ok(()) + } + /// Starts the client pub async fn start(&self) { _ = self diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 0486a32678..129ee3704d 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -622,6 +622,8 @@ impl Tra &self.client_side_stats.status, self.client_computed_top_level, &self.trace_filterer.load(), + #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] + self.telemetry.as_ref(), ); for chunk in &mut traces { diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 93d668781b..9f14e8e866 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -300,13 +300,19 @@ fn add_spans_to_stats( /// Process traces for stats computation and update header tags accordingly. /// Returns the number of P0 traces and spans that were dropped. +/// +/// If a telemetry client is provided and stats are enabled, dropped P0 counts +/// will be sent to telemetry. pub(crate) fn process_traces_for_stats( traces: &mut Vec>>, header_tags: &mut libdd_trace_utils::trace_utils::TracerHeaderTags, client_side_stats: &ArcSwap, client_computed_top_level: bool, trace_filterer: &TraceFilterer, -) -> libdd_trace_utils::span::trace_utils::DroppedStats { + #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] telemetry: Option< + &crate::telemetry::TelemetryClient, + >, +) -> libdd_trace_utils::span::trace_utils::DroppedP0Stats { let status = client_side_stats.load(); if let StatsComputationStatus::Enabled { stats_concentrator, .. @@ -332,7 +338,17 @@ pub(crate) fn process_traces_for_stats( header_tags.dropped_p0_traces = dropped_stats.dropped_p0_traces; header_tags.dropped_p0_spans = dropped_stats.dropped_p0_spans; - dropped_stats + // Send dropped P0 stats directly to telemetry if available + #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] + if let Some(telemetry_client) = telemetry { + if let Err(e) = + telemetry_client.send_p0_drops(dropped_p0_stats.dropped_p0_traces as u64) + { + tracing::error!(?e, "Error sending dropped P0 stats to telemetry"); + } + } + + dropped_p0_stats } else { libdd_trace_utils::span::trace_utils::DroppedStats { dropped_p0_traces: 0, From facbbc47e274007a2e212c7dfb2b4f0abd3d99b2 Mon Sep 17 00:00:00 2001 From: vianney Date: Wed, 17 Jun 2026 16:46:39 +0200 Subject: [PATCH 2/3] chore(telemetry): remove drop p0 from send result telemetry --- libdd-data-pipeline/src/telemetry/mod.rs | 145 ++++-------------- libdd-data-pipeline/src/trace_exporter/mod.rs | 8 +- .../src/trace_exporter/stats.rs | 24 +-- libdd-trace-utils/src/span/trace_utils.rs | 8 +- 4 files changed, 40 insertions(+), 145 deletions(-) diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index ce1252e2bc..af761a08c1 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -13,7 +13,6 @@ use libdd_telemetry::worker::{ }; use libdd_trace_utils::{ send_with_retry::{SendWithRetryError, SendWithRetryResult}, - span::trace_utils::DroppedStats, trace_utils::SendDataResult, }; use std::{collections::HashMap, time::Duration}; @@ -165,8 +164,6 @@ pub struct SendPayloadTelemetry { errors_status_code: u64, bytes_sent: u64, chunks_sent: u64, - chunks_dropped_p0: u64, - chunks_dropped_by_trace_filter: u64, chunks_dropped_serialization_error: u64, chunks_dropped_send_failure: u64, responses_count_per_code: HashMap, @@ -195,18 +192,9 @@ impl SendPayloadTelemetry { /// * `value` - The result of sending traces with retry /// * `bytes_sent` - The number of bytes in the payload /// * `chunks` - The number of trace chunks in the payload - /// * `dropped_stats` - Trace dropped stats from `stats::process_traces_for_stats` - pub fn from_retry_result( - value: &SendWithRetryResult, - bytes_sent: u64, - chunks: u64, - dropped_stats: DroppedStats, - ) -> Self { - let mut telemetry = Self { - chunks_dropped_p0: dropped_stats.dropped_p0_traces as u64, - chunks_dropped_by_trace_filter: dropped_stats.dropped_by_trace_filter as u64, - ..Default::default() - }; + /// * `chunks_dropped_p0` - The number of P0 trace chunks dropped due to sampling + pub fn from_retry_result(value: &SendWithRetryResult, bytes_sent: u64, chunks: u64) -> Self { + let mut telemetry = Self::default(); match value { Ok((response, attempts)) => { telemetry.chunks_sent = chunks; @@ -286,18 +274,6 @@ impl TelemetryClient { self.worker .add_point(data.chunks_sent as f64, key, vec![])?; } - if data.chunks_dropped_p0 > 0 { - let key = self.metrics.get(metrics::MetricKind::ChunksDroppedP0); - self.worker - .add_point(data.chunks_dropped_p0 as f64, key, vec![])?; - } - if data.chunks_dropped_by_trace_filter > 0 { - let key = self - .metrics - .get(metrics::MetricKind::ChunksDroppedByTraceFilter); - self.worker - .add_point(data.chunks_dropped_by_trace_filter as f64, key, vec![])?; - } if data.chunks_dropped_serialization_error > 0 { let key = self .metrics @@ -323,12 +299,23 @@ impl TelemetryClient { } /// Send dropped P0 trace counts to telemetry. - pub fn send_p0_drops(&self, dropped_p0_traces: u64) -> Result<(), TelemetryError> { + pub fn send_client_side_stats_drops( + &self, + dropped_p0_traces: usize, + dropped_by_trace_filter: usize, + ) -> Result<(), TelemetryError> { if dropped_p0_traces > 0 { let key = self.metrics.get(metrics::MetricKind::ChunksDroppedP0); self.worker .add_point(dropped_p0_traces as f64, key, vec![])?; } + if dropped_by_trace_filter > 0 { + let key = self + .metrics + .get(metrics::MetricKind::ChunksDroppedByTraceFilter); + self.worker + .add_point(dropped_by_trace_filter as f64, key, vec![])?; + } Ok(()) } @@ -655,23 +642,22 @@ mod tests { #[cfg_attr(miri, ignore)] #[test] - fn chunks_dropped_p0_test() { - let payload = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog","reason:p0_drop"\],"common":true,"type":"count"#).unwrap(); + fn send_client_side_stats_drops_test() { + let payload_p0 = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,3\.0\]\],"tags":\["src_library:libdatadog","reason:p0_drop"\],"common":true,"type":"count"#).unwrap(); + let payload_trace_filter = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,5\.0\]\],"tags":\["src_library:libdatadog","reason:trace_filters"\],"common":true,"type":"count"#).unwrap(); let shared_runtime = SharedRuntime::new().expect("Failed to create runtime"); let server = MockServer::start(); let mut telemetry_srv = server.mock(|when, then| { - when.method(POST).body_matches(payload); + when.method(POST) + .body_matches(payload_p0) + .body_matches(payload_trace_filter); then.status(200).body(""); }); - let data = SendPayloadTelemetry { - chunks_dropped_p0: 1, - ..Default::default() - }; let (client, handle) = get_test_client(&server.url("/"), &shared_runtime); shared_runtime .block_on(async { client.start().await; - let _ = client.send(&data); + client.send_client_side_stats_drops(3, 5).unwrap(); // Wait for send to be processed sleep(Duration::from_millis(100)).await; @@ -724,16 +710,7 @@ mod tests { .unwrap(), 3, )); - let telemetry = SendPayloadTelemetry::from_retry_result( - &result, - 4, - 5, - DroppedStats { - dropped_p0_traces: 0, - dropped_p0_spans: 0, - dropped_by_trace_filter: 0, - }, - ); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 4, 5); assert_eq!( telemetry, SendPayloadTelemetry { @@ -746,38 +723,6 @@ mod tests { ) } - #[test] - fn telemetry_from_ok_response_with_p0_drops_test() { - let result = Ok(( - http::Response::builder() - .status(http::StatusCode::OK) - .body(Bytes::new()) - .unwrap(), - 3, - )); - let telemetry = SendPayloadTelemetry::from_retry_result( - &result, - 4, - 5, - DroppedStats { - dropped_p0_traces: 10, - dropped_p0_spans: 0, - dropped_by_trace_filter: 0, - }, - ); - assert_eq!( - telemetry, - SendPayloadTelemetry { - bytes_sent: 4, - chunks_sent: 5, - requests_count: 3, - chunks_dropped_p0: 10, - responses_count_per_code: HashMap::from([(200, 1)]), - ..Default::default() - } - ) - } - #[test] fn telemetry_from_request_error_test() { let error_response = http::Response::builder() @@ -785,16 +730,7 @@ mod tests { .body(Bytes::new()) .unwrap(); let result = Err(SendWithRetryError::Http(error_response, 5)); - let telemetry = SendPayloadTelemetry::from_retry_result( - &result, - 1, - 2, - DroppedStats { - dropped_p0_traces: 0, - dropped_p0_spans: 0, - dropped_by_trace_filter: 0, - }, - ); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); assert_eq!( telemetry, SendPayloadTelemetry { @@ -813,16 +749,7 @@ mod tests { HttpError::Network(anyhow::anyhow!("connection refused")), 5, )); - let telemetry = SendPayloadTelemetry::from_retry_result( - &result, - 1, - 2, - DroppedStats { - dropped_p0_traces: 0, - dropped_p0_spans: 0, - dropped_by_trace_filter: 0, - }, - ); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); assert_eq!( telemetry, SendPayloadTelemetry { @@ -837,16 +764,7 @@ mod tests { #[test] fn telemetry_from_timeout_error_test() { let result = Err(SendWithRetryError::Timeout(5)); - let telemetry = SendPayloadTelemetry::from_retry_result( - &result, - 1, - 2, - DroppedStats { - dropped_p0_traces: 0, - dropped_p0_spans: 0, - dropped_by_trace_filter: 0, - }, - ); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); assert_eq!( telemetry, SendPayloadTelemetry { @@ -862,16 +780,7 @@ mod tests { #[test] fn telemetry_from_build_error_test() { let result = Err(SendWithRetryError::Build(5)); - let telemetry = SendPayloadTelemetry::from_retry_result( - &result, - 1, - 2, - DroppedStats { - dropped_p0_traces: 0, - dropped_p0_spans: 0, - dropped_by_trace_filter: 0, - }, - ); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); assert_eq!( telemetry, SendPayloadTelemetry { diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 129ee3704d..a8acd1e336 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -9,7 +9,7 @@ mod trace_serializer; // Re-export the builder pub use builder::TraceExporterBuilder; -use libdd_trace_utils::{span::trace_utils::DroppedStats, trace_filter::TraceFilterer}; +use libdd_trace_utils::trace_filter::TraceFilterer; use self::agent_response::AgentResponse; use self::metrics::MetricsEmitter; @@ -577,8 +577,6 @@ impl Tra mp_payload: Vec, headers: HeaderMap, chunks: usize, - #[cfg_attr(not(feature = "telemetry"), allow(unused_variables))] - dropped_stats: DroppedStats, ) -> Result { let strategy = RetryStrategy::default(); let payload_len = mp_payload.len(); @@ -599,7 +597,6 @@ impl Tra &result, payload_len as u64, chunks as u64, - dropped_stats, )) { error!(?e, "Error sending telemetry"); } @@ -616,7 +613,7 @@ impl Tra // Process stats computation and drop non-sampled (p0) chunks. // This must run before the OTLP path so that unsampled spans are not exported. - let dropped_stats = stats::process_traces_for_stats( + stats::process_traces_for_stats( &mut traces, &mut header_tags, &self.client_side_stats.status, @@ -676,7 +673,6 @@ impl Tra prepared.data, prepared.headers, prepared.chunk_count, - dropped_stats, ) .await; diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 9f14e8e866..85c507f854 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -312,7 +312,7 @@ pub(crate) fn process_traces_for_stats( #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] telemetry: Option< &crate::telemetry::TelemetryClient, >, -) -> libdd_trace_utils::span::trace_utils::DroppedP0Stats { +) { let status = client_side_stats.load(); if let StatsComputationStatus::Enabled { stats_concentrator, .. @@ -328,33 +328,25 @@ pub(crate) fn process_traces_for_stats( add_spans_to_stats(stats_concentrator, traces); // Once stats have been computed we can drop all chunks that are not going to be // sampled by the agent - let mut dropped_stats = libdd_trace_utils::span::trace_utils::drop_chunks(traces); - dropped_stats.dropped_by_trace_filter = dropped_by_trace_filter; + let dropped_p0_stats = libdd_trace_utils::span::trace_utils::drop_chunks(traces); // Update the headers to indicate that stats have been computed and forward dropped // traces counts header_tags.client_computed_top_level = true; header_tags.client_computed_stats = true; - header_tags.dropped_p0_traces = dropped_stats.dropped_p0_traces; - header_tags.dropped_p0_spans = dropped_stats.dropped_p0_spans; + header_tags.dropped_p0_traces = dropped_p0_stats.dropped_p0_traces; + header_tags.dropped_p0_spans = dropped_p0_stats.dropped_p0_spans; // Send dropped P0 stats directly to telemetry if available #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] if let Some(telemetry_client) = telemetry { - if let Err(e) = - telemetry_client.send_p0_drops(dropped_p0_stats.dropped_p0_traces as u64) - { + if let Err(e) = telemetry_client.send_client_side_stats_drops( + dropped_p0_stats.dropped_p0_traces, + dropped_by_trace_filter, + ) { tracing::error!(?e, "Error sending dropped P0 stats to telemetry"); } } - - dropped_p0_stats - } else { - libdd_trace_utils::span::trace_utils::DroppedStats { - dropped_p0_traces: 0, - dropped_p0_spans: 0, - dropped_by_trace_filter: 0, - } } } diff --git a/libdd-trace-utils/src/span/trace_utils.rs b/libdd-trace-utils/src/span/trace_utils.rs index 54699a484e..60790aa3cb 100644 --- a/libdd-trace-utils/src/span/trace_utils.rs +++ b/libdd-trace-utils/src/span/trace_utils.rs @@ -125,10 +125,9 @@ pub fn is_partial_snapshot(span: &Span) -> bool { .is_some_and(|v| *v >= 0.0) } -pub struct DroppedStats { +pub struct DroppedP0Stats { pub dropped_p0_traces: usize, pub dropped_p0_spans: usize, - pub dropped_by_trace_filter: usize, } // Keys used for sampling @@ -146,7 +145,7 @@ const SAMPLING_ANALYTICS_RATE_KEY: &str = "_dd1.sr.eausr"; /// /// # Trace-level attributes /// Some attributes related to the whole trace are stored in the root span of the chunk. -pub fn drop_chunks(traces: &mut Vec>>) -> DroppedStats +pub fn drop_chunks(traces: &mut Vec>>) -> DroppedP0Stats where T: TraceData, { @@ -197,10 +196,9 @@ where true }); - DroppedStats { + DroppedP0Stats { dropped_p0_traces, dropped_p0_spans, - dropped_by_trace_filter: 0, } } From 19a8712fa1cfa01f4e470bef59a5edc11ce5f909 Mon Sep 17 00:00:00 2001 From: vianney Date: Thu, 18 Jun 2026 15:27:32 +0200 Subject: [PATCH 3/3] apply suggestions --- libdd-data-pipeline/src/telemetry/mod.rs | 1 - libdd-data-pipeline/src/trace_exporter/stats.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index af761a08c1..3ea0ea2096 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -192,7 +192,6 @@ impl SendPayloadTelemetry { /// * `value` - The result of sending traces with retry /// * `bytes_sent` - The number of bytes in the payload /// * `chunks` - The number of trace chunks in the payload - /// * `chunks_dropped_p0` - The number of P0 trace chunks dropped due to sampling pub fn from_retry_result(value: &SendWithRetryResult, bytes_sent: u64, chunks: u64) -> Self { let mut telemetry = Self::default(); match value { diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 85c507f854..6a394d3249 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -299,7 +299,6 @@ fn add_spans_to_stats( } /// Process traces for stats computation and update header tags accordingly. -/// Returns the number of P0 traces and spans that were dropped. /// /// If a telemetry client is provided and stats are enabled, dropped P0 counts /// will be sent to telemetry.