From af3694aa6626a06d4c4e056b096b7d09e0985097 Mon Sep 17 00:00:00 2001 From: Saurabh Chauhan Date: Thu, 4 Jun 2026 14:14:11 +0530 Subject: [PATCH] feat(OBE-9898): always require a token, read site_version from claim The vector source's auth drops the require_token config knob: a valid bearer token is now always required, removing the legacy require_token=false bypass that accepted unauthenticated pushes. The site's version is read from the validated JWT's site_version claim (stamped by the manager auth-service, OBE-9896) and logged for telemetry / future per-version policy. - jwt_auth.rs: remove AuthConfig.require_token, default_require_token, Inner.require_token; authenticate() rejects a missing authorization header unconditionally; read SITE_VERSION_CLAIM from the decoded claims. - sources/vector/mod.rs: delete the legacy and now-redundant require_token tests, rename the kept ones to the unconditional behavior. - test_util/jwt_auth.rs: drop require_token from the shared build_auth helper. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/sinks/vector/config.rs | 4 +- src/sources/util/jwt_auth.rs | 97 ++++++++++--------------------- src/sources/vector/mod.rs | 109 +++++------------------------------ src/test_util/jwt_auth.rs | 1 - 4 files changed, 47 insertions(+), 164 deletions(-) diff --git a/src/sinks/vector/config.rs b/src/sinks/vector/config.rs index 447bce0ff..a71ed776f 100644 --- a/src/sinks/vector/config.rs +++ b/src/sinks/vector/config.rs @@ -189,8 +189,8 @@ async fn healthcheck( // Build the request manually so we can attach the same authorization // header that `Service::call` attaches to `push_events`. Without this, - // a source configured with `require_token = true` would refuse this - // healthcheck even though the sink has valid credentials. + // a source that requires a token would refuse this healthcheck even + // though the sink has valid credentials. let mut request = tonic::Request::new(proto::HealthCheckRequest {}); if let Some(auth) = service.auth() { let bearer = auth.bearer_token().map_err(|message| { diff --git a/src/sources/util/jwt_auth.rs b/src/sources/util/jwt_auth.rs index 786518f7f..e0c52b8bf 100644 --- a/src/sources/util/jwt_auth.rs +++ b/src/sources/util/jwt_auth.rs @@ -33,6 +33,11 @@ pub(crate) const AUTH_FIELD_NAME_TAG: &str = "auth_field_name"; /// Metric tag key for the auth field value. pub(crate) const AUTH_FIELD_VALUE_TAG: &str = "auth_field_value"; +/// JWT claim carrying the site's version at token-issue time (stamped by the +/// manager's auth-service — OBE-9896). Read for telemetry / per-version policy; +/// absent for older sites whose tokens predate the claim. +const SITE_VERSION_CLAIM: &str = "site_version"; + /// Errors returned by [`Auth::authenticate`] (request-level). #[derive(Debug, PartialEq)] pub enum AuthError { @@ -881,23 +886,6 @@ pub struct AuthConfig { /// filtered out. When absent, no per-event filtering is applied. #[serde(default, skip_serializing_if = "Option::is_none")] pub value_path: Option, - - /// When `true`, requests without an `authorization` header are rejected - /// with `Unauthenticated`. Defaults to `true` (secure by default). - /// - /// Set to `false` to opt into the legacy fallback that accepts requests - /// without a token (useful during a staged migration where older agents - /// haven't been updated yet). - /// - /// Applies to both `push_events` and `health_check` RPCs so a sink with - /// auth misconfigured fails its healthcheck rather than silently - /// bypassing token validation. - #[serde(default = "default_require_token")] - pub require_token: bool, -} - -fn default_require_token() -> bool { - true } /// Replace serde's flattened-enum error with an actionable message naming the @@ -972,7 +960,6 @@ impl AuthConfig { jwks_validations, membership_claim, value_path, - require_token: self.require_token, }))) } } @@ -1003,7 +990,6 @@ struct Inner { jwks_validations: HashMap, membership_claim: Option, value_path: Option, - require_token: bool, } /// Per-request auth context returned by a successful [`Auth::authenticate`] call. @@ -1146,25 +1132,21 @@ impl Auth { /// /// # Returns /// - /// * `Ok(None)` — `authorization` was absent and `require_token` is `false`; request is - /// accepted without per-event filtering (legacy / migration mode). /// * `Ok(Some(ctx))` — token is valid. Use [`AuthContext::is_authorized`] for per-event /// membership checks against the extracted allowed-values list. - /// * `Err(AuthError::InvalidToken)` — `authorization` was absent but required, the token - /// is malformed/expired/bad-signature, wrong issuer/audience, unsupported algorithm, - /// or the membership claim is missing. + /// * `Err(AuthError::InvalidToken)` — `authorization` was absent (a token is always + /// required), or the token is malformed/expired/bad-signature, wrong issuer/audience, + /// unsupported algorithm, or the membership claim is missing. pub async fn authenticate( &self, authorization: Option<&str>, ) -> Result, AuthError> { let Some(auth_value) = authorization else { - if self.0.require_token { - return Err(AuthError::InvalidToken( - "authorization header is required", - )); - } - debug!(message = "No authorization header; allowing request."); - return Ok(None); + // A token is always required (OBE-9898 removed the legacy + // `require_token = false` bypass). + return Err(AuthError::InvalidToken( + "authorization header is required", + )); }; let token = strip_bearer_prefix(auth_value) @@ -1219,6 +1201,15 @@ impl Auth { } }; + // Surface the site's version (the manager stamps `site_version` as a JWT + // claim — OBE-9896) for telemetry / future per-version policy. Absent for + // older sites whose tokens predate the claim. + if let Some(site_version) = + token_data.claims.get(SITE_VERSION_CLAIM).and_then(Value::as_str) + { + debug!(message = "Authenticated site push.", %site_version); + } + let allowed_values = inner.membership_claim .as_ref() .map(|c| c.extract(&token_data.claims)) @@ -1272,8 +1263,8 @@ mod tests { }; // Construct a baseline `AuthConfig` from the given authority, using the - // permissive defaults the tests below want (no issuer/audience/value_path, - // `require_token = false`). Individual tests override fields as needed. + // permissive defaults the tests below want (no issuer/audience/value_path). + // Individual tests override fields as needed. fn cfg_with(authority: Authority) -> AuthConfig { AuthConfig { authority, @@ -1282,7 +1273,6 @@ mod tests { membership_claim: Some(MembershipClaimConfig::Identity("site_ids".to_string())), value_path: None, algorithms: None, - require_token: false, } } @@ -1398,15 +1388,6 @@ mod tests { // ── Auth::authenticate ─────────────────────────────────────────────────── - #[tokio::test] - async fn no_auth_header_allows_legacy_client_when_require_token_false() { - // The shared `build_auth` helper now matches production default - // (require_token = true), so explicitly opt out to test legacy mode. - let auth = build_auth_with_require_token(false).await; - let result = auth.authenticate(None).await; - assert!(matches!(result, Ok(None))); - } - #[tokio::test] async fn valid_token_returns_allowed_values() { let auth = build_auth(None, None).await; @@ -1659,37 +1640,25 @@ mod tests { assert!(auth.authenticate(Some(&bearer(&token))).await.is_ok()); } - // ── require_token enforcement ──────────────────────────────────────────── - - async fn build_auth_with_require_token(require: bool) -> Auth { - let mut cfg = cfg_with(inline_public_key()); - cfg.require_token = require; - cfg.build().await.unwrap() - } - - #[tokio::test] - async fn require_token_false_allows_missing_authorization() { - let auth = build_auth_with_require_token(false).await; - assert!(matches!(auth.authenticate(None).await, Ok(None))); - } + // ── token requirement (a token is always required) ─────────────────────── #[tokio::test] - async fn require_token_true_rejects_missing_authorization() { - let auth = build_auth_with_require_token(true).await; + async fn missing_authorization_is_rejected() { + let auth = build_auth(None, None).await; let result = auth.authenticate(None).await; assert!(matches!(result, Err(AuthError::InvalidToken(_)))); } #[tokio::test] - async fn require_token_true_accepts_valid_token() { - let auth = build_auth_with_require_token(true).await; + async fn valid_token_is_accepted() { + let auth = build_auth(None, None).await; let token = make_token(HashMap::new()); assert!(auth.authenticate(Some(&bearer(&token))).await.is_ok()); } #[tokio::test] - async fn require_token_true_still_rejects_invalid_token() { - let auth = build_auth_with_require_token(true).await; + async fn invalid_token_is_rejected() { + let auth = build_auth(None, None).await; let result = auth.authenticate(Some("Bearer not.a.jwt")).await; assert!(matches!(result, Err(AuthError::InvalidToken(_)))); } @@ -1729,7 +1698,6 @@ mod tests { async fn build_no_membership_auth() -> Auth { let mut cfg = cfg_with(inline_public_key()); cfg.membership_claim = None; - cfg.require_token = true; cfg.build().await.unwrap() } @@ -2085,7 +2053,6 @@ pub_key.type = "inline" pub_key.value = "pem" membership_claim = "tenants" issuer = "https://issuer.example.com/" -require_token = false "#; let cfg: AuthConfig = toml::from_str(toml).unwrap(); assert!(matches!( @@ -2094,7 +2061,6 @@ require_token = false )); assert_eq!(cfg.membership_claim, Some(MembershipClaimConfig::Identity("tenants".to_string()))); assert_eq!(cfg.issuer.as_deref(), Some("https://issuer.example.com/")); - assert!(!cfg.require_token); } // ── MembershipClaimConfig serde ────────────────────────────────────────── @@ -2386,7 +2352,6 @@ pub_key.value = "pem" membership_claim: Some(MembershipClaimConfig::Identity("site_ids".to_string())), value_path: None, algorithms: None, - require_token: false, } } diff --git a/src/sources/vector/mod.rs b/src/sources/vector/mod.rs index b41222a54..b0a5080de 100644 --- a/src/sources/vector/mod.rs +++ b/src/sources/vector/mod.rs @@ -114,8 +114,8 @@ impl AuthBatchStats { impl Service { /// Run request-level JWT validation against an inbound gRPC request. /// - /// Shared between `push_events` and `health_check` so both RPCs honor the - /// same `require_token` enforcement and reject the same set of bad tokens. + /// Shared between `push_events` and `health_check` so both RPCs require a + /// valid token and reject the same set of bad tokens. async fn validate_auth_header( &self, request: &Request, @@ -255,7 +255,7 @@ impl proto::Service for Service { request: Request, ) -> Result, Status> { // Apply the same JWT validation as push_events — same auth posture, - // including `require_token` enforcement when configured. + // so a misconfigured/unauthenticated sink fails its healthcheck. self.validate_auth_header(&request).await?; let message = proto::HealthCheckResponse { @@ -973,24 +973,6 @@ value = "{token}""# ); } - #[tokio::test] - async fn legacy_sink_without_auth_is_accepted() { - // Source has auth configured with require_token=false (legacy mode); - // sink sends no token → request allowed through. - let source_auth = format!( - r#"[auth] -pub_key.type = "inline" -pub_key.value = "{}" -membership_claim = "site_ids" -require_token = false"#, - TEST_PUBLIC_KEY.replace('\n', "\\n") - ); - assert_eq!( - run_auth_pair(&source_auth, "").await, - BatchStatus::Delivered - ); - } - #[tokio::test] async fn invalid_token_is_rejected() { let source_auth = format!( @@ -1044,27 +1026,7 @@ value = "{token}""# ); } - #[tokio::test] - async fn legacy_sink_with_value_path_configured_is_accepted() { - // Source has value_path with require_token=false (legacy mode); sink sends - // no token → per-event filtering is skipped entirely, all events pass through. - let source_auth = format!( - r#"[auth] -pub_key.type = "inline" -pub_key.value = "{}" -membership_claim = "site_ids" -require_token = false -[auth.value_path] -default = "tenant_id""#, - TEST_PUBLIC_KEY.replace('\n', "\\n") - ); - assert_eq!( - run_auth_pair(&source_auth, "").await, - BatchStatus::Delivered - ); - } - - // ── require_token + healthcheck integration ────────────────────────── + // ── healthcheck integration ────────────────────────────────────────── /// Build a source+sink pair and return the result of the sink's healthcheck. async fn run_healthcheck_pair( @@ -1093,50 +1055,9 @@ default = "tenant_id""#, } #[tokio::test] - async fn require_token_source_rejects_unauthenticated_push() { - // Source: require_token = true (explicit); sink: no auth → rejected. - let source_auth = format!( - r#"[auth] -pub_key.type = "inline" -pub_key.value = "{}" -membership_claim = "site_ids" -require_token = true"#, - TEST_PUBLIC_KEY.replace('\n', "\\n") - ); - assert_eq!( - run_auth_pair(&source_auth, "").await, - BatchStatus::Rejected - ); - } - - #[tokio::test] - async fn require_token_source_accepts_authenticated_push() { - // Source: require_token = true (explicit); sink: valid token → delivered. - let token = make_token(HashMap::new()); - let source_auth = format!( - r#"[auth] -pub_key.type = "inline" -pub_key.value = "{}" -membership_claim = "site_ids" -require_token = true"#, - TEST_PUBLIC_KEY.replace('\n', "\\n") - ); - let sink_auth = format!( - r#"[auth] -[auth.token] -type = "inline" -value = "{token}""# - ); - assert_eq!( - run_auth_pair(&source_auth, &sink_auth).await, - BatchStatus::Delivered - ); - } - - #[tokio::test] - async fn default_require_token_rejects_request_without_token() { - // Source TOML omits `require_token` — the default is `true`, - // so a sink with no token must be rejected. + async fn source_rejects_request_without_token() { + // Auth is configured; a token is always required, so a sink with no + // token is rejected. let source_auth = format!( r#"[auth] pub_key.type = "inline" @@ -1151,9 +1072,9 @@ membership_claim = "site_ids""#, } #[tokio::test] - async fn default_require_token_accepts_request_with_token() { - // Source TOML omits `require_token` — default `true`. Sink sends - // a valid token; request flows through. + async fn source_accepts_request_with_token() { + // Auth is configured; the sink sends a valid token, so the request + // flows through. let token = make_token(HashMap::new()); let source_auth = format!( r#"[auth] @@ -1175,14 +1096,13 @@ value = "{token}""# } #[tokio::test] - async fn healthcheck_succeeds_when_sink_sends_token_to_require_token_source() { + async fn healthcheck_succeeds_when_sink_sends_token() { let token = make_token(HashMap::new()); let source_auth = format!( r#"[auth] pub_key.type = "inline" pub_key.value = "{}" -membership_claim = "site_ids" -require_token = true"#, +membership_claim = "site_ids""#, TEST_PUBLIC_KEY.replace('\n', "\\n") ); let sink_auth = format!( @@ -1195,13 +1115,12 @@ value = "{token}""# } #[tokio::test] - async fn healthcheck_fails_when_sink_omits_token_to_require_token_source() { + async fn healthcheck_fails_when_sink_omits_token() { let source_auth = format!( r#"[auth] pub_key.type = "inline" pub_key.value = "{}" -membership_claim = "site_ids" -require_token = true"#, +membership_claim = "site_ids""#, TEST_PUBLIC_KEY.replace('\n', "\\n") ); assert!(run_healthcheck_pair(&source_auth, "").await.is_err()); diff --git a/src/test_util/jwt_auth.rs b/src/test_util/jwt_auth.rs index bdf7bd864..ffbd5ec88 100644 --- a/src/test_util/jwt_auth.rs +++ b/src/test_util/jwt_auth.rs @@ -112,7 +112,6 @@ pub async fn build_auth(issuer: Option<&str>, audience: Option>) -> Au membership_claim: Some(MembershipClaimConfig::Identity("site_ids".to_string())), value_path: None, algorithms: None, - require_token: true, } .build() .await