Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/sinks/vector/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ async fn healthcheck(

// Build the request manually so we can attach the same authorization
// header that `Service::call` attaches to `push_events`. Without this,
// a source configured with `require_token = true` would refuse this
// healthcheck even though the sink has valid credentials.
// a source that requires a token would refuse this healthcheck even
// though the sink has valid credentials.
let mut request = tonic::Request::new(proto::HealthCheckRequest {});
if let Some(auth) = service.auth() {
let bearer = auth.bearer_token().map_err(|message| {
Expand Down
97 changes: 31 additions & 66 deletions src/sources/util/jwt_auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ pub(crate) const AUTH_FIELD_NAME_TAG: &str = "auth_field_name";
/// Metric tag key for the auth field value.
pub(crate) const AUTH_FIELD_VALUE_TAG: &str = "auth_field_value";

/// JWT claim carrying the site's version at token-issue time (stamped by the
/// manager's auth-service — OBE-9896). Read for telemetry / per-version policy;
/// absent for older sites whose tokens predate the claim.
const SITE_VERSION_CLAIM: &str = "site_version";

/// Errors returned by [`Auth::authenticate`] (request-level).
#[derive(Debug, PartialEq)]
pub enum AuthError {
Expand Down Expand Up @@ -881,23 +886,6 @@ pub struct AuthConfig {
/// filtered out. When absent, no per-event filtering is applied.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub value_path: Option<AuthValuePath>,

/// When `true`, requests without an `authorization` header are rejected
/// with `Unauthenticated`. Defaults to `true` (secure by default).
///
/// Set to `false` to opt into the legacy fallback that accepts requests
/// without a token (useful during a staged migration where older agents
/// haven't been updated yet).
///
/// Applies to both `push_events` and `health_check` RPCs so a sink with
/// auth misconfigured fails its healthcheck rather than silently
/// bypassing token validation.
#[serde(default = "default_require_token")]
pub require_token: bool,
}

fn default_require_token() -> bool {
true
}

/// Replace serde's flattened-enum error with an actionable message naming the
Expand Down Expand Up @@ -972,7 +960,6 @@ impl AuthConfig {
jwks_validations,
membership_claim,
value_path,
require_token: self.require_token,
})))
}
}
Expand Down Expand Up @@ -1003,7 +990,6 @@ struct Inner {
jwks_validations: HashMap<Algorithm, Validation>,
membership_claim: Option<MembershipClaim>,
value_path: Option<CompiledValuePath>,
require_token: bool,
}

/// Per-request auth context returned by a successful [`Auth::authenticate`] call.
Expand Down Expand Up @@ -1146,25 +1132,21 @@ impl Auth {
///
/// # Returns
///
/// * `Ok(None)` — `authorization` was absent and `require_token` is `false`; request is
/// accepted without per-event filtering (legacy / migration mode).
/// * `Ok(Some(ctx))` — token is valid. Use [`AuthContext::is_authorized`] for per-event
/// membership checks against the extracted allowed-values list.
/// * `Err(AuthError::InvalidToken)` — `authorization` was absent but required, the token
/// is malformed/expired/bad-signature, wrong issuer/audience, unsupported algorithm,
/// or the membership claim is missing.
/// * `Err(AuthError::InvalidToken)` — `authorization` was absent (a token is always
/// required), or the token is malformed/expired/bad-signature, wrong issuer/audience,
/// unsupported algorithm, or the membership claim is missing.
pub async fn authenticate(
&self,
authorization: Option<&str>,
) -> Result<Option<AuthContext>, AuthError> {
let Some(auth_value) = authorization else {
if self.0.require_token {
return Err(AuthError::InvalidToken(
"authorization header is required",
));
}
debug!(message = "No authorization header; allowing request.");
return Ok(None);
// A token is always required (OBE-9898 removed the legacy
// `require_token = false` bypass).
return Err(AuthError::InvalidToken(
"authorization header is required",
));
};

let token = strip_bearer_prefix(auth_value)
Expand Down Expand Up @@ -1219,6 +1201,15 @@ impl Auth {
}
};

// Surface the site's version (the manager stamps `site_version` as a JWT
// claim — OBE-9896) for telemetry / future per-version policy. Absent for
// older sites whose tokens predate the claim.
if let Some(site_version) =
token_data.claims.get(SITE_VERSION_CLAIM).and_then(Value::as_str)
{
debug!(message = "Authenticated site push.", %site_version);
}

let allowed_values = inner.membership_claim
.as_ref()
.map(|c| c.extract(&token_data.claims))
Expand Down Expand Up @@ -1272,8 +1263,8 @@ mod tests {
};

// Construct a baseline `AuthConfig` from the given authority, using the
// permissive defaults the tests below want (no issuer/audience/value_path,
// `require_token = false`). Individual tests override fields as needed.
// permissive defaults the tests below want (no issuer/audience/value_path).
// Individual tests override fields as needed.
fn cfg_with(authority: Authority) -> AuthConfig {
AuthConfig {
authority,
Expand All @@ -1282,7 +1273,6 @@ mod tests {
membership_claim: Some(MembershipClaimConfig::Identity("site_ids".to_string())),
value_path: None,
algorithms: None,
require_token: false,
}
}

Expand Down Expand Up @@ -1398,15 +1388,6 @@ mod tests {

// ── Auth::authenticate ───────────────────────────────────────────────────

#[tokio::test]
async fn no_auth_header_allows_legacy_client_when_require_token_false() {
// The shared `build_auth` helper now matches production default
// (require_token = true), so explicitly opt out to test legacy mode.
let auth = build_auth_with_require_token(false).await;
let result = auth.authenticate(None).await;
assert!(matches!(result, Ok(None)));
}

#[tokio::test]
async fn valid_token_returns_allowed_values() {
let auth = build_auth(None, None).await;
Expand Down Expand Up @@ -1659,37 +1640,25 @@ mod tests {
assert!(auth.authenticate(Some(&bearer(&token))).await.is_ok());
}

// ── require_token enforcement ────────────────────────────────────────────

async fn build_auth_with_require_token(require: bool) -> Auth {
let mut cfg = cfg_with(inline_public_key());
cfg.require_token = require;
cfg.build().await.unwrap()
}

#[tokio::test]
async fn require_token_false_allows_missing_authorization() {
let auth = build_auth_with_require_token(false).await;
assert!(matches!(auth.authenticate(None).await, Ok(None)));
}
// ── token requirement (a token is always required) ───────────────────────

#[tokio::test]
async fn require_token_true_rejects_missing_authorization() {
let auth = build_auth_with_require_token(true).await;
async fn missing_authorization_is_rejected() {
let auth = build_auth(None, None).await;
let result = auth.authenticate(None).await;
assert!(matches!(result, Err(AuthError::InvalidToken(_))));
}

#[tokio::test]
async fn require_token_true_accepts_valid_token() {
let auth = build_auth_with_require_token(true).await;
async fn valid_token_is_accepted() {
let auth = build_auth(None, None).await;
let token = make_token(HashMap::new());
assert!(auth.authenticate(Some(&bearer(&token))).await.is_ok());
}

#[tokio::test]
async fn require_token_true_still_rejects_invalid_token() {
let auth = build_auth_with_require_token(true).await;
async fn invalid_token_is_rejected() {
let auth = build_auth(None, None).await;
let result = auth.authenticate(Some("Bearer not.a.jwt")).await;
assert!(matches!(result, Err(AuthError::InvalidToken(_))));
}
Expand Down Expand Up @@ -1729,7 +1698,6 @@ mod tests {
async fn build_no_membership_auth() -> Auth {
let mut cfg = cfg_with(inline_public_key());
cfg.membership_claim = None;
cfg.require_token = true;
cfg.build().await.unwrap()
}

Expand Down Expand Up @@ -2085,7 +2053,6 @@ pub_key.type = "inline"
pub_key.value = "pem"
membership_claim = "tenants"
issuer = "https://issuer.example.com/"
require_token = false
"#;
let cfg: AuthConfig = toml::from_str(toml).unwrap();
assert!(matches!(
Expand All @@ -2094,7 +2061,6 @@ require_token = false
));
assert_eq!(cfg.membership_claim, Some(MembershipClaimConfig::Identity("tenants".to_string())));
assert_eq!(cfg.issuer.as_deref(), Some("https://issuer.example.com/"));
assert!(!cfg.require_token);
}

// ── MembershipClaimConfig serde ──────────────────────────────────────────
Expand Down Expand Up @@ -2386,7 +2352,6 @@ pub_key.value = "pem"
membership_claim: Some(MembershipClaimConfig::Identity("site_ids".to_string())),
value_path: None,
algorithms: None,
require_token: false,
}
}

Expand Down
109 changes: 14 additions & 95 deletions src/sources/vector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ impl AuthBatchStats {
impl Service {
/// Run request-level JWT validation against an inbound gRPC request.
///
/// Shared between `push_events` and `health_check` so both RPCs honor the
/// same `require_token` enforcement and reject the same set of bad tokens.
/// Shared between `push_events` and `health_check` so both RPCs require a
/// valid token and reject the same set of bad tokens.
async fn validate_auth_header<T>(
&self,
request: &Request<T>,
Expand Down Expand Up @@ -255,7 +255,7 @@ impl proto::Service for Service {
request: Request<proto::HealthCheckRequest>,
) -> Result<Response<proto::HealthCheckResponse>, Status> {
// Apply the same JWT validation as push_events — same auth posture,
// including `require_token` enforcement when configured.
// so a misconfigured/unauthenticated sink fails its healthcheck.
self.validate_auth_header(&request).await?;

let message = proto::HealthCheckResponse {
Expand Down Expand Up @@ -973,24 +973,6 @@ value = "{token}""#
);
}

#[tokio::test]
async fn legacy_sink_without_auth_is_accepted() {
// Source has auth configured with require_token=false (legacy mode);
// sink sends no token → request allowed through.
let source_auth = format!(
r#"[auth]
pub_key.type = "inline"
pub_key.value = "{}"
membership_claim = "site_ids"
require_token = false"#,
TEST_PUBLIC_KEY.replace('\n', "\\n")
);
assert_eq!(
run_auth_pair(&source_auth, "").await,
BatchStatus::Delivered
);
}

#[tokio::test]
async fn invalid_token_is_rejected() {
let source_auth = format!(
Expand Down Expand Up @@ -1044,27 +1026,7 @@ value = "{token}""#
);
}

#[tokio::test]
async fn legacy_sink_with_value_path_configured_is_accepted() {
// Source has value_path with require_token=false (legacy mode); sink sends
// no token → per-event filtering is skipped entirely, all events pass through.
let source_auth = format!(
r#"[auth]
pub_key.type = "inline"
pub_key.value = "{}"
membership_claim = "site_ids"
require_token = false
[auth.value_path]
default = "tenant_id""#,
TEST_PUBLIC_KEY.replace('\n', "\\n")
);
assert_eq!(
run_auth_pair(&source_auth, "").await,
BatchStatus::Delivered
);
}

// ── require_token + healthcheck integration ──────────────────────────
// ── healthcheck integration ──────────────────────────────────────────

/// Build a source+sink pair and return the result of the sink's healthcheck.
async fn run_healthcheck_pair(
Expand Down Expand Up @@ -1093,50 +1055,9 @@ default = "tenant_id""#,
}

#[tokio::test]
async fn require_token_source_rejects_unauthenticated_push() {
// Source: require_token = true (explicit); sink: no auth → rejected.
let source_auth = format!(
r#"[auth]
pub_key.type = "inline"
pub_key.value = "{}"
membership_claim = "site_ids"
require_token = true"#,
TEST_PUBLIC_KEY.replace('\n', "\\n")
);
assert_eq!(
run_auth_pair(&source_auth, "").await,
BatchStatus::Rejected
);
}

#[tokio::test]
async fn require_token_source_accepts_authenticated_push() {
// Source: require_token = true (explicit); sink: valid token → delivered.
let token = make_token(HashMap::new());
let source_auth = format!(
r#"[auth]
pub_key.type = "inline"
pub_key.value = "{}"
membership_claim = "site_ids"
require_token = true"#,
TEST_PUBLIC_KEY.replace('\n', "\\n")
);
let sink_auth = format!(
r#"[auth]
[auth.token]
type = "inline"
value = "{token}""#
);
assert_eq!(
run_auth_pair(&source_auth, &sink_auth).await,
BatchStatus::Delivered
);
}

#[tokio::test]
async fn default_require_token_rejects_request_without_token() {
// Source TOML omits `require_token` — the default is `true`,
// so a sink with no token must be rejected.
async fn source_rejects_request_without_token() {
// Auth is configured; a token is always required, so a sink with no
// token is rejected.
let source_auth = format!(
r#"[auth]
pub_key.type = "inline"
Expand All @@ -1151,9 +1072,9 @@ membership_claim = "site_ids""#,
}

#[tokio::test]
async fn default_require_token_accepts_request_with_token() {
// Source TOML omits `require_token` — default `true`. Sink sends
// a valid token; request flows through.
async fn source_accepts_request_with_token() {
// Auth is configured; the sink sends a valid token, so the request
// flows through.
let token = make_token(HashMap::new());
let source_auth = format!(
r#"[auth]
Expand All @@ -1175,14 +1096,13 @@ value = "{token}""#
}

#[tokio::test]
async fn healthcheck_succeeds_when_sink_sends_token_to_require_token_source() {
async fn healthcheck_succeeds_when_sink_sends_token() {
let token = make_token(HashMap::new());
let source_auth = format!(
r#"[auth]
pub_key.type = "inline"
pub_key.value = "{}"
membership_claim = "site_ids"
require_token = true"#,
membership_claim = "site_ids""#,
TEST_PUBLIC_KEY.replace('\n', "\\n")
);
let sink_auth = format!(
Expand All @@ -1195,13 +1115,12 @@ value = "{token}""#
}

#[tokio::test]
async fn healthcheck_fails_when_sink_omits_token_to_require_token_source() {
async fn healthcheck_fails_when_sink_omits_token() {
let source_auth = format!(
r#"[auth]
pub_key.type = "inline"
pub_key.value = "{}"
membership_claim = "site_ids"
require_token = true"#,
membership_claim = "site_ids""#,
TEST_PUBLIC_KEY.replace('\n', "\\n")
);
assert!(run_healthcheck_pair(&source_auth, "").await.is_err());
Expand Down
Loading