Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
5190ab4
docs(otlp): design spec for OTLP HTTP/protobuf trace export
bm1549 Jun 12, 2026
946f116
docs(otlp): implementation plan for OTLP HTTP/protobuf trace export
bm1549 Jun 12, 2026
07c7296
feat(trace-protobuf): vendor + generate OTLP trace/collector prost types
bm1549 Jun 12, 2026
6f385ba
feat(trace-utils): add serde->prost OTLP converter
bm1549 Jun 12, 2026
a52e30b
refactor(trace-utils): clarify OTLP converter + add fallback/status t…
bm1549 Jun 12, 2026
4a4846a
feat(trace-utils): add encode_otlp_json/encode_otlp_protobuf
bm1549 Jun 12, 2026
54d8856
feat(data-pipeline): make OtlpProtocol public with FromStr
bm1549 Jun 12, 2026
772be3e
feat(data-pipeline): set OTLP content-type from protocol
bm1549 Jun 12, 2026
46d72a7
feat(data-pipeline): add TraceExporterBuilder::set_otlp_protocol
bm1549 Jun 12, 2026
8f3c38e
feat(data-pipeline): dispatch OTLP encoder by protocol + protobuf test
bm1549 Jun 12, 2026
2633427
refactor(data-pipeline): narrow otlp pub surface + exhaustive content…
bm1549 Jun 12, 2026
e493d8d
feat(data-pipeline-ffi): add ddog_trace_exporter_config_set_otlp_prot…
bm1549 Jun 12, 2026
4a81412
test(data-pipeline-ffi): cover set_otlp_protocol + clarify contract
bm1549 Jun 12, 2026
3091b57
fix(trace-protobuf): disable comments on vendored OTLP trace protos t…
bm1549 Jun 12, 2026
664f16f
docs(data-pipeline): clarify parse_u64/kind fallbacks and unreachable…
bm1549 Jun 12, 2026
9bc5cf2
chore: drop in-repo planning docs (moved to chonk)
bm1549 Jun 12, 2026
a8a305f
fix(data-pipeline): reject unsupported OTLP gRPC at build time
bm1549 Jun 12, 2026
58ba1b8
refactor(data-pipeline): mark OtlpProtocol non_exhaustive
bm1549 Jun 18, 2026
1cb1dfb
fix(data-pipeline-ffi): store parsed OtlpProtocol, drop silent re-parse
bm1549 Jun 18, 2026
b479255
test(trace-utils): extend OTLP parity test to trace_id, status, attri…
bm1549 Jun 18, 2026
55541eb
perf(trace-utils): build OTLP protobuf directly from native spans
bm1549 Jun 18, 2026
421cb6d
fix(trace-utils): explicit OTLP scope fields + clamp negative timestamps
bm1549 Jun 18, 2026
a790182
docs(data-pipeline-ffi): clarify OTLP setter is inert without endpoin…
bm1549 Jun 18, 2026
af79de7
feat(trace-utils): add OTLP/JSON serde serializer over prost types
bm1549 Jun 18, 2026
b21be02
refactor(trace-utils): prost OTLP types as single IR, delete json_types
bm1549 Jun 18, 2026
809914e
refactor(data-pipeline): OtlpWireProtocol encapsulates content-type +…
bm1549 Jun 18, 2026
855f4c1
fix(data-pipeline): reject OTLP gRPC only when an endpoint is configured
bm1549 Jun 19, 2026
5952434
refactor(otlp): crate-private OtlpWireProtocol/json_serializer, drop …
bm1549 Jun 19, 2026
21769ac
test(trace-utils): add OTLP encoder hot-path benchmarks
bm1549 Jun 19, 2026
f60fa27
perf(trace-utils): pre-size OTLP mapper Vecs, allocation-free JSON id…
bm1549 Jun 19, 2026
3431c1d
perf(trace-utils): serialize OTLP/JSON timestamps and ints from a sta…
bm1549 Jun 19, 2026
18f28c5
docs(trace-protobuf): retain OTLP proto doc comments; fence example a…
bm1549 Jun 19, 2026
00261ae
test(data-pipeline): skip live-build OTLP gRPC test under miri
bm1549 Jun 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

141 changes: 137 additions & 4 deletions libdd-data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use libdd_common_ffi::{
CharSlice,
{slice::AsBytes, slice::ByteSlice},
};
use libdd_data_pipeline::otlp::OtlpProtocol;
use libdd_data_pipeline::trace_exporter::{
TelemetryConfig, TelemetryInstrumentationSessions, TraceExporter as GenericTraceExporter,
TraceExporterInputFormat, TraceExporterOutputFormat,
Expand Down Expand Up @@ -83,6 +84,7 @@ pub struct TraceExporterConfig {
connection_timeout: Option<u64>,
shared_runtime: Option<Arc<SharedRuntime>>,
otlp_endpoint: Option<String>,
otlp_protocol: Option<OtlpProtocol>,
}

#[no_mangle]
Expand Down Expand Up @@ -498,12 +500,51 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_endpoint(
)
}

/// Sets the OTLP export protocol. Accepts the OTel-standard values `http/json` (default) or

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc should probably make it clear that this function is inert if an otlp endpoint isn't set.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in a790182

/// `http/protobuf`; `grpc` is rejected as not yet supported. The host language resolves the value
/// (e.g. from `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`).
///
/// Has no effect unless an OTLP endpoint is also configured via
/// `ddog_trace_exporter_config_set_otlp_endpoint`; without one, traces are sent to the
/// Datadog agent and this protocol selection is ignored.
///
/// Returns `None` on success, `ErrorCode::InvalidArgument` for a null config or an unaccepted
/// value, and `ErrorCode::InvalidInput` for a non-UTF-8 string.
#[no_mangle]
pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_protocol(
config: Option<&mut TraceExporterConfig>,
protocol: CharSlice,
) -> Option<Box<ExporterError>> {
catch_panic!(
if let Some(handle) = config {
let value = match sanitize_string(protocol) {
Ok(s) => s,
Err(e) => return Some(e),
};
// `FromStr` is the single source of truth for string -> OtlpProtocol. The OTLP trace
// exporter is HTTP-only, so we additionally reject `Grpc` here (it parses, but is
// unsupported) rather than storing a value the exporter would refuse at send time.
// The `_` arm also covers any future non_exhaustive variant.
match value.parse::<OtlpProtocol>() {
Ok(p @ (OtlpProtocol::HttpJson | OtlpProtocol::HttpProtobuf)) => {
handle.otlp_protocol = Some(p);
None
}
_ => gen_error!(ErrorCode::InvalidArgument),
}
} else {
gen_error!(ErrorCode::InvalidArgument)
},
gen_error!(ErrorCode::Panic)
)
}

/// Create a new TraceExporter instance.
///
/// When an OTLP endpoint is configured via `TraceExporterConfig`, the exporter sends traces in
/// OTLP HTTP/JSON to that endpoint instead of the Datadog agent. The same payload (e.g.
/// MessagePack) is passed to `ddog_trace_exporter_send`; the library decodes and converts to
/// OTLP when OTLP is enabled.
/// When an OTLP endpoint is configured via `TraceExporterConfig`, the exporter sends traces to
/// that endpoint in OTLP over HTTP — JSON or protobuf per the configured protocol — instead of
/// to the Datadog agent. The same payload (e.g. MessagePack) is passed to
/// `ddog_trace_exporter_send`; the library decodes and converts it to OTLP when OTLP is enabled.
///
/// # Arguments
///
Expand Down Expand Up @@ -565,6 +606,9 @@ pub unsafe extern "C" fn ddog_trace_exporter_new(

if let Some(ref url) = config.otlp_endpoint {
builder.set_otlp_endpoint(url);
if let Some(protocol) = config.otlp_protocol {
builder.set_otlp_protocol(protocol);
}
}

match builder.build() {
Expand Down Expand Up @@ -1283,6 +1327,95 @@ mod tests {
}
}

#[test]
fn config_otlp_protocol_test() {
unsafe {
// Null config → InvalidArgument
let error =
ddog_trace_exporter_config_set_otlp_protocol(None, CharSlice::from("http/json"));
assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument);
ddog_trace_exporter_error_free(error);

// "http/json" → success, stored
let mut config = Some(TraceExporterConfig::default());
let error = ddog_trace_exporter_config_set_otlp_protocol(
config.as_mut(),
CharSlice::from("http/json"),
);
assert_eq!(error, None);
assert_eq!(
config.as_ref().unwrap().otlp_protocol,
Some(OtlpProtocol::HttpJson)
);

// "http/protobuf" → success, stored
let mut config = Some(TraceExporterConfig::default());
let error = ddog_trace_exporter_config_set_otlp_protocol(
config.as_mut(),
CharSlice::from("http/protobuf"),
);
assert_eq!(error, None);
assert_eq!(
config.as_ref().unwrap().otlp_protocol,
Some(OtlpProtocol::HttpProtobuf)
);

// "grpc" → InvalidArgument
let mut config = Some(TraceExporterConfig::default());
let error = ddog_trace_exporter_config_set_otlp_protocol(
config.as_mut(),
CharSlice::from("grpc"),
);
assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument);
ddog_trace_exporter_error_free(error);

// Garbage value → InvalidArgument
let mut config = Some(TraceExporterConfig::default());
let error = ddog_trace_exporter_config_set_otlp_protocol(
config.as_mut(),
CharSlice::from("nonsense"),
);
assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument);
ddog_trace_exporter_error_free(error);

// Non-UTF-8 input → InvalidInput
let mut config = Some(TraceExporterConfig::default());
let invalid: [u8; 2] = [0x80u8, 0xFFu8];
let error = ddog_trace_exporter_config_set_otlp_protocol(
config.as_mut(),
CharSlice::from_bytes(&invalid),
);
assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidInput);
ddog_trace_exporter_error_free(error);
}
}

#[test]
fn set_otlp_protocol_stores_parsed_enum() {
use libdd_data_pipeline::otlp::OtlpProtocol;
let mut cfg = TraceExporterConfig::default();
let err = unsafe {
ddog_trace_exporter_config_set_otlp_protocol(
Some(&mut cfg),
CharSlice::from("http/protobuf"),
)
};
assert!(err.is_none());
assert_eq!(cfg.otlp_protocol, Some(OtlpProtocol::HttpProtobuf));
}

#[test]
fn set_otlp_protocol_rejects_grpc_and_unknown() {
let mut cfg = TraceExporterConfig::default();
for bad in ["grpc", "nonsense"] {
let err = unsafe {
ddog_trace_exporter_config_set_otlp_protocol(Some(&mut cfg), CharSlice::from(bad))
};
assert!(err.is_some(), "expected error for {bad}");
assert_eq!(cfg.otlp_protocol, None, "{bad} must not be stored");
}
}

#[cfg(all(feature = "catch_panic", panic = "unwind"))]
#[test]
fn catch_panic_test() {
Expand Down
1 change: 1 addition & 0 deletions libdd-data-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ libdd-trace-utils = { path = "../libdd-trace-utils", features = [
"test-utils",
] }
httpmock = "0.8.0-alpha.1"
prost = "0.14.1"
rand = "0.8.5"
tempfile = "3.3.0"
tokio = { version = "1.23", features = [
Expand Down
2 changes: 1 addition & 1 deletion libdd-data-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
pub mod agent_info;
mod health_metrics;
pub(crate) mod otlp;
pub mod otlp;
#[cfg(feature = "telemetry")]
pub(crate) mod telemetry;
#[cfg(not(target_arch = "wasm32"))]
Expand Down
119 changes: 110 additions & 9 deletions libdd-data-pipeline/src/otlp/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,77 @@
use http::HeaderMap;
use std::time::Duration;

/// OTLP trace export protocol. HTTP/JSON is currently supported.
/// OTLP trace export protocol.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like grpc should still be excluded here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kept Grpc on the user-facing OtlpProtocol (#[non_exhaustive], room for later), but it's excluded from the wire path - OtlpWireProtocol is Json/Protobuf only, and grpc errors at build when an endpoint is set. 809914e / 855f4c1.

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum OtlpProtocol {
#[non_exhaustive]
pub enum OtlpProtocol {
/// HTTP with JSON body (Content-Type: application/json). Default for HTTP.
#[default]
HttpJson,
/// HTTP with protobuf body. (Not supported yet)
#[allow(dead_code)]
/// HTTP with protobuf body (Content-Type: application/x-protobuf).
HttpProtobuf,
/// gRPC. (Not supported yet)
#[allow(dead_code)]
/// gRPC. Parsed by `FromStr` so callers get a clean error, but rejected at export time
/// (unsupported).
Grpc,
}

impl std::str::FromStr for OtlpProtocol {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"http/json" => Ok(OtlpProtocol::HttpJson),
"http/protobuf" => Ok(OtlpProtocol::HttpProtobuf),
"grpc" => Ok(OtlpProtocol::Grpc),
other => Err(format!("unknown OTLP protocol: {other}")),
}
}
}

/// The wire encoding actually used to send OTLP traces over HTTP. Internal, closed set: the
/// only encodings the exporter supports. The user-facing [`OtlpProtocol`] (which also carries the
/// unsupported `Grpc`) converts into this at the send boundary via `TryFrom`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OtlpWireProtocol {
Json,
Protobuf,
}

impl std::convert::TryFrom<OtlpProtocol> for OtlpWireProtocol {
type Error = OtlpProtocol;
/// Maps the user-facing protocol to a supported wire encoding. `Grpc` is unsupported and
/// returns `Err(Grpc)` so the caller surfaces a clean error instead of silently downgrading.
fn try_from(p: OtlpProtocol) -> Result<Self, Self::Error> {
match p {
OtlpProtocol::HttpJson => Ok(OtlpWireProtocol::Json),
OtlpProtocol::HttpProtobuf => Ok(OtlpWireProtocol::Protobuf),
other => Err(other),
}
}
}

impl OtlpWireProtocol {
/// The HTTP `Content-Type` for this encoding.
pub fn content_type(&self) -> http::HeaderValue {
match self {
OtlpWireProtocol::Json => libdd_common::header::APPLICATION_JSON,
OtlpWireProtocol::Protobuf => libdd_common::header::APPLICATION_PROTOBUF,
}
}

/// Encode the prost OTLP request to this wire format.
pub fn encode(
&self,
req: &libdd_trace_utils::otlp_encoder::ProtoExportTraceServiceRequest,
) -> Result<Vec<u8>, serde_json::Error> {
match self {
OtlpWireProtocol::Json => libdd_trace_utils::otlp_encoder::encode_otlp_json(req),
OtlpWireProtocol::Protobuf => {
Ok(libdd_trace_utils::otlp_encoder::encode_otlp_protobuf(req))
}
}
}
}

/// Default timeout for OTLP export requests.
pub const DEFAULT_OTLP_TIMEOUT: Duration = Duration::from_secs(10);

Expand All @@ -32,7 +89,51 @@ pub struct OtlpTraceConfig {
pub headers: HeaderMap,
/// Request timeout.
pub timeout: Duration,
/// Protocol (for future use; currently only HttpJson is supported).
#[allow(dead_code)]
pub(crate) protocol: OtlpProtocol,
/// OTLP export protocol (selects body encoding and content-type).
pub protocol: OtlpProtocol,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're matching protocol several times in this PR and it's kind of awkward because OtlpProtocol contains a currently unsupported variant. Why not keep the user facing OtlpProtocol as is, and internally have something like enum OtlpWireProtocol { Json, Protobuf }? It'll make the code much cleaner and when GRPC support is added you can get rid of OTLPWireProtocol. Or you could do:

  impl OtlpWireProtocol {                                                                                                                                                                                                                                                               
      fn content_type(&self) -> HeaderValue { /* 2 arms */ }                                                                                                                                                                                                                            
      fn encode(&self, req: &ExportTraceServiceRequest) -> Result<Vec<u8>> { /* 2 arms */ }                                                                                                                                                                                             
  } 

and encapsulate the correct behavior in the type.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in 809914e

}

#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
#[test]
fn protocol_from_str() {
assert_eq!(
OtlpProtocol::from_str("http/json").unwrap(),
OtlpProtocol::HttpJson
);
assert_eq!(
OtlpProtocol::from_str("http/protobuf").unwrap(),
OtlpProtocol::HttpProtobuf
);
assert_eq!(OtlpProtocol::from_str("grpc").unwrap(), OtlpProtocol::Grpc);
assert!(OtlpProtocol::from_str("nonsense").is_err());
}

#[test]
fn wire_protocol_from_user_protocol() {
use std::convert::TryFrom;
assert_eq!(
OtlpWireProtocol::try_from(OtlpProtocol::HttpJson).unwrap(),
OtlpWireProtocol::Json
);
assert_eq!(
OtlpWireProtocol::try_from(OtlpProtocol::HttpProtobuf).unwrap(),
OtlpWireProtocol::Protobuf
);
assert!(OtlpWireProtocol::try_from(OtlpProtocol::Grpc).is_err());
}

#[test]
fn wire_protocol_content_types() {
assert_eq!(
OtlpWireProtocol::Json.content_type(),
libdd_common::header::APPLICATION_JSON
);
assert_eq!(
OtlpWireProtocol::Protobuf.content_type(),
libdd_common::header::APPLICATION_PROTOBUF
);
}
}
Loading
Loading