diff --git a/Cargo.lock b/Cargo.lock index 9e73bce83..2fe32f13b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3456,11 +3456,13 @@ dependencies = [ name = "openshell-core" version = "0.0.0" dependencies = [ + "chrono", "ipnet", "miette", "prost", "prost-types", "protobuf-src", + "reqwest 0.12.28", "serde", "serde_json", "tempfile", @@ -3480,6 +3482,7 @@ dependencies = [ "openshell-core", "serde", "tar", + "temp-env", "tempfile", "tokio", "tokio-stream", @@ -3503,6 +3506,7 @@ dependencies = [ "prost-types", "serde", "serde_json", + "temp-env", "thiserror 2.0.18", "tokio", "tokio-stream", @@ -3556,6 +3560,7 @@ dependencies = [ "serde_json", "sha2 0.10.9", "tar", + "temp-env", "tokio", "tokio-stream", "tonic", diff --git a/README.md b/README.md index 574347642..d6ba6a31e 100644 --- a/README.md +++ b/README.md @@ -243,6 +243,16 @@ All implementation work is human-gated — agents propose plans, humans approve, OpenShell is built agent-first — your agent is your first collaborator. Before opening issues or submitting code, point your agent at the repo and let it use the skills in `.agents/skills/` to investigate, diagnose, and prototype. See [CONTRIBUTING.md](CONTRIBUTING.md) for the full agent skills table, contribution workflow, and development setup. +## Telemetry + +OpenShell collects anonymous telemetry to help improve the project for developers. This data is not used to track individual user behavior. It helps us understand aggregate usage of sandbox, provider, and policy workflows so we can prioritize product improvements and share usage trends with the community. + +Telemetry is enabled by default. Disable it by setting `OPENSHELL_TELEMETRY_ENABLED=false` (`0`, `no`, and `off` are also accepted, case-insensitively) on the gateway deployment. OpenShell propagates this deployment setting into sandbox supervisor environments so sandbox-side telemetry collection is disabled as well. 
+ +Telemetry events are limited to anonymous operational categories and counts, such as sandbox lifecycle outcomes, provider profile buckets, policy decision counts, and aggregate network activity and denial counts grouped by denial category. Each event also carries the OpenShell version and CPU architecture. OpenShell telemetry does not collect sandbox names or IDs, hostnames, file paths, binary paths, prompts, credentials, provider names, model names, or user content. + +Opting out applies only to telemetry emitted by OpenShell. Third-party services, model providers, inference endpoints, agents, or tools that you configure and use with OpenShell may have their own terms and privacy practices. + ## Notice and Disclaimer This software automatically retrieves, accesses or interacts with external materials. Those retrieved materials are not distributed with this software and are governed solely by separate terms, conditions and licenses. You are solely responsible for finding, reviewing and complying with all applicable terms, conditions, and licenses, and for verifying the security, integrity and suitability of any retrieved materials for your specific use case. This software is provided "AS IS", without warranty of any kind. The author makes no representations or warranties regarding any retrieved materials, and assumes no liability for any losses, damages, liabilities or legal consequences from your use or inability to use this software or any retrieved materials. Use this software and the retrieved materials at your own risk. 
diff --git a/crates/openshell-core/Cargo.toml b/crates/openshell-core/Cargo.toml index b03fb1494..78c87d54c 100644 --- a/crates/openshell-core/Cargo.toml +++ b/crates/openshell-core/Cargo.toml @@ -20,6 +20,8 @@ serde = { workspace = true } serde_json = { workspace = true } url = { workspace = true } ipnet = "2" +chrono = { version = "0.4", default-features = false, features = ["clock", "std"] } +reqwest = { workspace = true, features = ["blocking", "rustls-tls-webpki-roots"] } [features] ## Include test-only settings (dummy_bool, dummy_int) in the registry. diff --git a/crates/openshell-core/src/lib.rs b/crates/openshell-core/src/lib.rs index d0225c471..bc66adb66 100644 --- a/crates/openshell-core/src/lib.rs +++ b/crates/openshell-core/src/lib.rs @@ -24,6 +24,7 @@ pub mod progress; pub mod proto; pub mod sandbox_env; pub mod settings; +pub mod telemetry; pub mod time; pub use config::{ComputeDriverKind, Config, OidcConfig, TlsConfig}; diff --git a/crates/openshell-core/src/sandbox_env.rs b/crates/openshell-core/src/sandbox_env.rs index d345762ca..4287a4617 100644 --- a/crates/openshell-core/src/sandbox_env.rs +++ b/crates/openshell-core/src/sandbox_env.rs @@ -26,6 +26,9 @@ pub const LOG_LEVEL: &str = "OPENSHELL_LOG_LEVEL"; /// Shell command to run inside the sandbox. pub const SANDBOX_COMMAND: &str = "OPENSHELL_SANDBOX_COMMAND"; +/// Deployment-controlled telemetry toggle propagated to the sandbox supervisor. +pub const TELEMETRY_ENABLED: &str = "OPENSHELL_TELEMETRY_ENABLED"; + /// Path to the CA certificate for mTLS communication with the gateway. pub const TLS_CA: &str = "OPENSHELL_TLS_CA"; diff --git a/crates/openshell-core/src/telemetry.rs b/crates/openshell-core/src/telemetry.rs new file mode 100644 index 000000000..01a865544 --- /dev/null +++ b/crates/openshell-core/src/telemetry.rs @@ -0,0 +1,711 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! 
Best-effort anonymous telemetry emission helpers. + +use chrono::{SecondsFormat, Utc}; +use reqwest::blocking::Client; +use serde_json::{Value, json}; +use std::collections::BTreeMap; +use std::sync::{OnceLock, mpsc}; +use std::thread; +use std::time::Duration; + +const TELEMETRY_EVENT_QUEUE_CAPACITY: usize = 1024; +const MAX_TELEMETRY_INTEGER: u64 = 9_223_372_036_854_775_807; +const CLIENT_ID: &str = "415437562476676"; +const DEFAULT_ENDPOINT: &str = "https://events.telemetry.data-uat.nvidia.com/v1.1/events/json"; +const EVENT_SCHEMA_VERSION: &str = "4.0"; +const EVENT_PROTOCOL_VERSION: &str = "1.6"; +const EVENT_SYSTEM_VERSION: &str = "openshell-telemetry/1.0"; +const HTTP_TIMEOUT: Duration = Duration::from_secs(5); +const SOURCE: TelemetrySource = TelemetrySource::OpenShell; +static TELEMETRY_SENDER: OnceLock>> = OnceLock::new(); + +#[derive(Debug)] +struct TelemetryEvent { + endpoint: String, + name: &'static str, + event_ts: String, + event: Value, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum TelemetrySource { + OpenShell, +} + +impl TelemetrySource { + const fn as_str(self) -> &'static str { + match self { + Self::OpenShell => "openshell", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TelemetryOutcome { + Success, + Failure, +} + +impl TelemetryOutcome { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Success => "success", + Self::Failure => "failure", + } + } + + #[must_use] + pub const fn from_success(success: bool) -> Self { + if success { + Self::Success + } else { + Self::Failure + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LifecycleResource { + Sandbox, + SandboxPolicy, +} + +impl LifecycleResource { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Sandbox => "sandbox", + Self::SandboxPolicy => "sandbox_policy", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LifecycleOperation { + Create, + Delete, + 
Update, +} + +impl LifecycleOperation { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Create => "create", + Self::Delete => "delete", + Self::Update => "update", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PolicyDecisionOperation { + Approve, + Reject, + Undo, + ApproveAll, +} + +impl PolicyDecisionOperation { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Approve => "approve", + Self::Reject => "reject", + Self::Undo => "undo", + Self::ApproveAll => "approve_all", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SandboxTemplateSource { + Default, + Image, + Undefined, +} + +impl SandboxTemplateSource { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Default => "default", + Self::Image => "image", + Self::Undefined => "undefined", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TelemetryComputeDriver { + Docker, + Kubernetes, + Podman, + Vm, + Unknown, +} + +impl TelemetryComputeDriver { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Docker => "docker", + Self::Kubernetes => "kubernetes", + Self::Podman => "podman", + Self::Vm => "vm", + Self::Unknown => "unknown", + } + } + + #[must_use] + pub fn from_raw(raw: &str) -> Self { + match raw.trim().to_ascii_lowercase().as_str() { + "docker" => Self::Docker, + "k8s" | "kubernetes" => Self::Kubernetes, + "podman" => Self::Podman, + "vm" => Self::Vm, + _ => Self::Unknown, + } + } + + #[must_use] + pub const fn from_driver_kind(driver_kind: Option) -> Self { + match driver_kind { + Some(crate::ComputeDriverKind::Docker) => Self::Docker, + Some(crate::ComputeDriverKind::Kubernetes) => Self::Kubernetes, + Some(crate::ComputeDriverKind::Podman) => Self::Podman, + Some(crate::ComputeDriverKind::Vm) => Self::Vm, + None => Self::Unknown, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ProviderProfile 
{ + Anthropic, + Claude, + Codex, + Copilot, + Github, + Gitlab, + Nvidia, + Openai, + Opencode, + Outlook, + Custom, +} + +impl ProviderProfile { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Anthropic => "anthropic", + Self::Claude => "claude", + Self::Codex => "codex", + Self::Copilot => "copilot", + Self::Github => "github", + Self::Gitlab => "gitlab", + Self::Nvidia => "nvidia", + Self::Openai => "openai", + Self::Opencode => "opencode", + Self::Outlook => "outlook", + Self::Custom => "custom", + } + } + + #[must_use] + pub fn from_raw(raw: &str) -> Self { + match raw.trim().to_ascii_lowercase().as_str() { + "anthropic" => Self::Anthropic, + "claude" | "claude-code" => Self::Claude, + "codex" => Self::Codex, + "copilot" => Self::Copilot, + "github" | "gh" => Self::Github, + "gitlab" | "glab" => Self::Gitlab, + "nvidia" => Self::Nvidia, + "openai" => Self::Openai, + "opencode" => Self::Opencode, + "outlook" => Self::Outlook, + _ => Self::Custom, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum DenyGroup { + Bypass, + ConnectPolicy, + ForwardPolicy, + L7ParseRejection, + L7Policy, + PolicyStale, + Ssrf, + Unknown, +} + +impl DenyGroup { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Bypass => "bypass", + Self::ConnectPolicy => "connect_policy", + Self::ForwardPolicy => "forward_policy", + Self::L7ParseRejection => "l7_parse_rejection", + Self::L7Policy => "l7_policy", + Self::PolicyStale => "policy_stale", + Self::Ssrf => "ssrf", + Self::Unknown => "unknown", + } + } + + #[must_use] + pub fn from_raw(raw: &str) -> Self { + match raw { + "connect_policy" | "connect" | "l4_deny" => Self::ConnectPolicy, + "forward_policy" | "forward" => Self::ForwardPolicy, + "l7_policy" | "l7" | "l7_deny" | "forward-l7-deny" => Self::L7Policy, + "l7_parse_rejection" | "parse_rejection" => Self::L7ParseRejection, + "ssrf" => Self::Ssrf, + "bypass" => Self::Bypass, + 
"policy_stale" => Self::PolicyStale, + _ => Self::Unknown, + } + } +} + +pub fn enabled() -> bool { + telemetry_enabled_from(std::env::var("OPENSHELL_TELEMETRY_ENABLED").ok().as_deref()) +} + +pub fn enabled_env_value() -> &'static str { + enabled_env_value_from(std::env::var("OPENSHELL_TELEMETRY_ENABLED").ok().as_deref()) +} + +fn enabled_env_value_from(value: Option<&str>) -> &'static str { + if telemetry_enabled_from(value) { + "true" + } else { + "false" + } +} + +fn telemetry_enabled_from(value: Option<&str>) -> bool { + let value = value.unwrap_or("true"); + !matches!( + value.trim().to_ascii_lowercase().as_str(), + "0" | "false" | "no" | "off" + ) +} + +fn telemetry_endpoint() -> Option { + telemetry_endpoint_from( + std::env::var("OPENSHELL_TELEMETRY_ENDPOINT") + .ok() + .as_deref(), + ) +} + +fn telemetry_endpoint_from(endpoint: Option<&str>) -> Option { + let endpoint = endpoint.unwrap_or(DEFAULT_ENDPOINT); + let endpoint = endpoint.trim(); + if endpoint.is_empty() { + None + } else { + Some(endpoint.to_string()) + } +} + +fn timestamp() -> String { + Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true) +} + +fn client_version() -> &'static str { + crate::VERSION +} + +fn build_payload(name: &str, event: Value, event_ts: &str, sent_ts: &str) -> Value { + json!({ + "browserType": "undefined", + "clientId": CLIENT_ID, + "clientType": "Native", + "clientVariant": "Release", + "clientVer": client_version(), + "cpuArchitecture": std::env::consts::ARCH, + "deviceGdprBehOptIn": "None", + "deviceGdprFuncOptIn": "None", + "deviceGdprTechOptIn": "None", + "deviceId": "undefined", + "deviceMake": "undefined", + "deviceModel": "undefined", + "deviceOS": "undefined", + "deviceOSVersion": "undefined", + "deviceType": "undefined", + "eventProtocol": EVENT_PROTOCOL_VERSION, + "eventSchemaVer": EVENT_SCHEMA_VERSION, + "eventSysVer": EVENT_SYSTEM_VERSION, + "externalUserId": "undefined", + "gdprBehOptIn": "None", + "gdprFuncOptIn": "None", + "gdprTechOptIn": "None", + 
"idpId": "undefined", + "integrationId": "undefined", + "productName": "undefined", + "productVersion": "undefined", + "sentTs": sent_ts, + "sessionId": "undefined", + "userId": "undefined", + "events": [ + { + "ts": event_ts, + "parameters": event, + "name": name, + } + ], + }) +} + +fn telemetry_sender() -> Option<&'static mpsc::SyncSender> { + TELEMETRY_SENDER + .get_or_init(|| { + let (tx, rx) = mpsc::sync_channel(TELEMETRY_EVENT_QUEUE_CAPACITY); + thread::Builder::new() + .name("openshell-telemetry".to_string()) + .spawn(move || telemetry_worker(rx)) + .ok() + .map(|_| tx) + }) + .as_ref() +} + +fn telemetry_worker(rx: mpsc::Receiver) { + for event in rx { + let payload = build_payload(event.name, event.event, &event.event_ts, ×tamp()); + let _ = publish_payload(&event.endpoint, payload); + } +} + +fn publish_payload(endpoint: &str, payload: Value) -> Result<(), reqwest::Error> { + Client::builder() + .use_rustls_tls() + .tls_built_in_root_certs(true) + .timeout(HTTP_TIMEOUT) + .build()? + .post(endpoint) + .json(&payload) + .send()? 
+ .error_for_status()?; + Ok(()) +} + +fn try_enqueue_event(sender: &mpsc::SyncSender, event: TelemetryEvent) -> bool { + sender.try_send(event).is_ok() +} + +fn emit_event(name: &'static str, event: Value) { + if !enabled() { + return; + } + let Some(endpoint) = telemetry_endpoint() else { + return; + }; + let Some(sender) = telemetry_sender() else { + return; + }; + + let _ = try_enqueue_event( + sender, + TelemetryEvent { + endpoint, + name, + event_ts: timestamp(), + event, + }, + ); +} + +pub fn emit_lifecycle( + resource: LifecycleResource, + operation: LifecycleOperation, + outcome: TelemetryOutcome, +) { + emit_event( + "openshell_lifecycle_event", + json!({ + "nvidiaSource": SOURCE.as_str(), + "resource": resource.as_str(), + "operation": operation.as_str(), + "outcome": outcome.as_str(), + }), + ); +} + +pub fn emit_provider_lifecycle( + operation: LifecycleOperation, + outcome: TelemetryOutcome, + provider_profile: ProviderProfile, +) { + emit_event( + "openshell_provider_lifecycle_event", + json!({ + "nvidiaSource": SOURCE.as_str(), + "operation": operation.as_str(), + "outcome": outcome.as_str(), + "providerProfile": provider_profile.as_str(), + }), + ); +} + +pub fn emit_sandbox_create( + outcome: TelemetryOutcome, + requested_gpu: bool, + provider_count: u64, + has_custom_policy: bool, + template_source: SandboxTemplateSource, + compute_driver: TelemetryComputeDriver, +) { + if !valid_count(provider_count) { + return; + } + emit_event( + "openshell_sandbox_create_event", + json!({ + "nvidiaSource": SOURCE.as_str(), + "outcome": outcome.as_str(), + "requestedGpu": requested_gpu, + "providerCount": provider_count, + "hasCustomPolicy": has_custom_policy, + "templateSource": template_source.as_str(), + "computeDriver": compute_driver.as_str(), + }), + ); +} + +pub fn emit_policy_decision( + operation: PolicyDecisionOperation, + outcome: TelemetryOutcome, + rule_count: u64, +) { + if !valid_count(rule_count) { + return; + } + emit_event( + 
"openshell_policy_decision_event", + json!({ + "nvidiaSource": SOURCE.as_str(), + "operation": operation.as_str(), + "outcome": outcome.as_str(), + "ruleCount": rule_count, + }), + ); +} + +pub fn emit_sandbox_activity_summary( + network_activity_count: u64, + denied_action_count: u64, + denial_rate_pct: f64, + denials_by_group: I, +) where + I: IntoIterator, +{ + if !valid_count(network_activity_count) + || !valid_count(denied_action_count) + || !denial_rate_pct.is_finite() + || !(0.0..=100.0).contains(&denial_rate_pct) + { + return; + } + let Some(denials_by_group) = sanitize_denials_by_group(denials_by_group) else { + return; + }; + let rows: Vec = denials_by_group + .into_iter() + .map(|(group, count)| json!({ "denyGroup": group.as_str(), "deniedCount": count })) + .collect(); + emit_event( + "openshell_sandbox_activity_summary_event", + json!({ + "nvidiaSource": SOURCE.as_str(), + "networkActivityCount": network_activity_count, + "deniedActionCount": denied_action_count, + "denialRatePct": denial_rate_pct, + "denialsByGroup": rows, + }), + ); +} + +fn valid_count(value: u64) -> bool { + value <= MAX_TELEMETRY_INTEGER +} + +fn sanitize_denials_by_group(denials_by_group: I) -> Option> +where + I: IntoIterator, +{ + let mut sanitized = BTreeMap::::new(); + for (group, count) in denials_by_group { + if !valid_count(count) { + return None; + } + let next_count = sanitized + .get(&group) + .copied() + .unwrap_or(0) + .checked_add(count)?; + if !valid_count(next_count) { + return None; + } + sanitized.insert(group, next_count); + } + Some(sanitized) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn telemetry_enabled_defaults_true() { + assert!(telemetry_enabled_from(None)); + } + + #[test] + fn telemetry_enabled_honors_false_values() { + assert!(!telemetry_enabled_from(Some("off"))); + assert!(!telemetry_enabled_from(Some("false"))); + assert!(telemetry_enabled_from(Some("yes"))); + } + + #[test] + fn telemetry_enabled_env_value_is_normalized() { + 
assert_eq!(enabled_env_value_from(Some("false")), "false"); + assert_eq!(enabled_env_value_from(Some("0")), "false"); + assert_eq!(enabled_env_value_from(None), "true"); + assert_eq!(enabled_env_value_from(Some("yes")), "true"); + } + + #[test] + fn telemetry_endpoint_empty_disables_publish() { + assert_eq!(telemetry_endpoint_from(Some(" ")), None); + assert_eq!( + telemetry_endpoint_from(None), + Some(DEFAULT_ENDPOINT.to_string()) + ); + } + + #[test] + fn build_payload_matches_schema_metadata() { + let payload = build_payload( + "openshell_sandbox_create_event", + json!({ + "nvidiaSource": SOURCE.as_str(), + "outcome": "success", + "requestedGpu": false, + "providerCount": 1, + "hasCustomPolicy": true, + "templateSource": "default", + "computeDriver": "docker", + }), + "2026-05-18T00:00:00.000Z", + "2026-05-18T00:00:01.000Z", + ); + + assert_eq!(payload["clientId"], CLIENT_ID); + assert_eq!(payload["clientVer"], crate::VERSION); + assert_eq!(payload["eventSchemaVer"], EVENT_SCHEMA_VERSION); + assert_eq!(payload["deviceId"], "undefined"); + assert_eq!(payload["userId"], "undefined"); + assert_eq!( + payload["events"][0]["name"], + "openshell_sandbox_create_event" + ); + assert_eq!( + payload["events"][0]["parameters"]["nvidiaSource"], + SOURCE.as_str() + ); + assert_eq!(payload["events"][0]["ts"], "2026-05-18T00:00:00.000Z"); + assert_eq!(payload["sentTs"], "2026-05-18T00:00:01.000Z"); + } + + #[test] + fn compute_driver_values_are_sanitized() { + assert_eq!( + TelemetryComputeDriver::from_raw("docker").as_str(), + "docker" + ); + assert_eq!( + TelemetryComputeDriver::from_raw("k8s").as_str(), + "kubernetes" + ); + assert_eq!( + TelemetryComputeDriver::from_raw("KUBERNETES").as_str(), + "kubernetes" + ); + assert_eq!(TelemetryComputeDriver::from_raw("vm").as_str(), "vm"); + assert_eq!( + TelemetryComputeDriver::from_raw("podman").as_str(), + "podman" + ); + assert_eq!( + TelemetryComputeDriver::from_raw("private-driver").as_str(), + "unknown" + ); + } + + #[test] 
+ fn telemetry_enqueue_drops_when_queue_is_full() { + let (tx, _rx) = mpsc::sync_channel(1); + let event = || TelemetryEvent { + endpoint: "https://example.test/events".to_string(), + name: "openshell_lifecycle_event", + event_ts: "2026-05-18T00:00:00.000Z".to_string(), + event: json!({ + "nvidiaSource": SOURCE.as_str(), + "resource": "sandbox", + "operation": "create", + "outcome": "success", + }), + }; + + assert!(try_enqueue_event(&tx, event())); + assert!(!try_enqueue_event(&tx, event())); + } + + #[test] + fn telemetry_validation_maps_privacy_sensitive_strings_to_safe_buckets() { + assert_eq!( + ProviderProfile::from_raw("corp-llm-prod"), + ProviderProfile::Custom + ); + assert_eq!( + DenyGroup::from_raw("host=private.example"), + DenyGroup::Unknown + ); + } + + #[test] + fn telemetry_enums_serialize_to_expected_strings() { + assert_eq!(LifecycleResource::SandboxPolicy.as_str(), "sandbox_policy"); + assert_eq!(LifecycleOperation::Delete.as_str(), "delete"); + assert_eq!(PolicyDecisionOperation::ApproveAll.as_str(), "approve_all"); + assert_eq!(TelemetryOutcome::Failure.as_str(), "failure"); + assert_eq!(SandboxTemplateSource::Undefined.as_str(), "undefined"); + assert!(!valid_count(MAX_TELEMETRY_INTEGER + 1)); + } + + #[test] + fn activity_groups_are_sanitized_and_aggregated() { + let rows = sanitize_denials_by_group([ + (DenyGroup::from_raw("connect"), 1), + (DenyGroup::from_raw("connect_policy"), 2), + (DenyGroup::from_raw("host=private.example"), 3), + ]) + .expect("rows should sanitize"); + + assert_eq!(rows.get(&DenyGroup::ConnectPolicy), Some(&3)); + assert_eq!(rows.get(&DenyGroup::Unknown), Some(&3)); + } +} diff --git a/crates/openshell-driver-docker/Cargo.toml b/crates/openshell-driver-docker/Cargo.toml index e2c97532a..0cdc205ed 100644 --- a/crates/openshell-driver-docker/Cargo.toml +++ b/crates/openshell-driver-docker/Cargo.toml @@ -25,5 +25,8 @@ tar = "0.4" tempfile = "3" url = { workspace = true } +[dev-dependencies] +temp-env = "0.3" + [lints] 
workspace = true diff --git a/crates/openshell-driver-docker/src/lib.rs b/crates/openshell-driver-docker/src/lib.rs index 3a0772217..3c0dbd882 100644 --- a/crates/openshell-driver-docker/src/lib.rs +++ b/crates/openshell-driver-docker/src/lib.rs @@ -974,6 +974,10 @@ fn build_environment(sandbox: &DriverSandbox, config: &DockerDriverRuntimeConfig openshell_core::sandbox_env::SANDBOX_COMMAND.to_string(), SANDBOX_COMMAND.to_string(), ); + environment.insert( + openshell_core::sandbox_env::TELEMETRY_ENABLED.to_string(), + openshell_core::telemetry::enabled_env_value().to_string(), + ); // The root supervisor executes namespace helpers during bootstrap; keep // their search path driver-owned even when the template/spec set PATH. environment.insert("PATH".to_string(), SUPERVISOR_PATH.to_string()); diff --git a/crates/openshell-driver-docker/src/tests.rs b/crates/openshell-driver-docker/src/tests.rs index 2ac2da1ee..d1ca05be1 100644 --- a/crates/openshell-driver-docker/src/tests.rs +++ b/crates/openshell-driver-docker/src/tests.rs @@ -8,9 +8,11 @@ use openshell_core::proto::compute::v1::{ }; use std::fs; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::{LazyLock, Mutex}; use tempfile::TempDir; const TLS_MOUNT_DIR: &str = "/etc/openshell/tls/client"; +static ENV_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); fn test_sandbox() -> DriverSandbox { // Mirrors the gateway-supplied request: the public `Sandbox` API no @@ -420,6 +422,41 @@ fn build_environment_keeps_path_driver_controlled() { assert_eq!(path_entries[0], &expected_path); } +#[test] +fn build_environment_keeps_telemetry_toggle_driver_controlled() { + let _guard = ENV_LOCK.lock().unwrap(); + temp_env::with_vars( + [( + openshell_core::sandbox_env::TELEMETRY_ENABLED, + Some("false"), + )], + || { + let mut sandbox = test_sandbox(); + sandbox.spec.as_mut().unwrap().environment.insert( + openshell_core::sandbox_env::TELEMETRY_ENABLED.to_string(), + "true".to_string(), + ); + + let env = 
build_environment(&sandbox, &runtime_config()); + let telemetry_entries = env + .iter() + .filter(|entry| { + entry.starts_with(&format!( + "{}=", + openshell_core::sandbox_env::TELEMETRY_ENABLED + )) + }) + .collect::>(); + + assert_eq!(telemetry_entries.len(), 1); + assert_eq!( + telemetry_entries[0], + &format!("{}=false", openshell_core::sandbox_env::TELEMETRY_ENABLED) + ); + }, + ); +} + #[test] fn build_binds_uses_docker_tls_directory() { let binds = build_binds(&runtime_config()); diff --git a/crates/openshell-driver-kubernetes/Cargo.toml b/crates/openshell-driver-kubernetes/Cargo.toml index c222c9c31..885c64944 100644 --- a/crates/openshell-driver-kubernetes/Cargo.toml +++ b/crates/openshell-driver-kubernetes/Cargo.toml @@ -34,5 +34,8 @@ tracing-subscriber = { workspace = true } thiserror = { workspace = true } miette = { workspace = true } +[dev-dependencies] +temp-env = "0.3" + [lints] workspace = true diff --git a/crates/openshell-driver-kubernetes/src/driver.rs b/crates/openshell-driver-kubernetes/src/driver.rs index a624f787e..f8877e74c 100644 --- a/crates/openshell-driver-kubernetes/src/driver.rs +++ b/crates/openshell-driver-kubernetes/src/driver.rs @@ -1424,6 +1424,11 @@ fn apply_required_env( openshell_core::sandbox_env::SANDBOX_COMMAND, "sleep infinity", ); + upsert_env( + env, + openshell_core::sandbox_env::TELEMETRY_ENABLED, + openshell_core::telemetry::enabled_env_value(), + ); if !ssh_socket_path.is_empty() { upsert_env( env, @@ -1592,6 +1597,9 @@ mod tests { }; use prost_types::{Struct, Value, value::Kind}; + static ENV_LOCK: std::sync::LazyLock> = + std::sync::LazyLock::new(|| std::sync::Mutex::new(())); + #[test] fn kube_pulling_event_adds_image_progress_metadata() { let mut metadata = std::collections::HashMap::new(); @@ -2475,6 +2483,37 @@ mod tests { assert!(cr["spec"].get("logLevel").is_none()); } + #[test] + fn telemetry_toggle_propagates_from_driver_env_to_sandbox_pod() { + let _guard = ENV_LOCK.lock().unwrap(); + temp_env::with_vars( 
+ [( + openshell_core::sandbox_env::TELEMETRY_ENABLED, + Some("false"), + )], + || { + let spec = SandboxSpec { + environment: std::collections::HashMap::from([( + openshell_core::sandbox_env::TELEMETRY_ENABLED.to_string(), + "true".to_string(), + )]), + ..SandboxSpec::default() + }; + let cr = sandbox_to_k8s_spec(Some(&spec), &SandboxPodParams::default()); + let env = cr["spec"]["podTemplate"]["spec"]["containers"][0]["env"] + .as_array() + .unwrap(); + let telemetry_entries = env + .iter() + .filter(|entry| entry["name"] == openshell_core::sandbox_env::TELEMETRY_ENABLED) + .collect::>(); + + assert_eq!(telemetry_entries.len(), 1); + assert_eq!(telemetry_entries[0]["value"], serde_json::json!("false")); + }, + ); + } + #[test] fn node_selector_from_platform_config() { let template = SandboxTemplate { diff --git a/crates/openshell-driver-podman/src/container.rs b/crates/openshell-driver-podman/src/container.rs index c3f2c3282..d60ef6e67 100644 --- a/crates/openshell-driver-podman/src/container.rs +++ b/crates/openshell-driver-podman/src/container.rs @@ -280,6 +280,10 @@ fn build_env( openshell_core::sandbox_env::SANDBOX_COMMAND.into(), "sleep infinity".into(), ); + env.insert( + openshell_core::sandbox_env::TELEMETRY_ENABLED.into(), + openshell_core::telemetry::enabled_env_value().into(), + ); // 3. TLS client cert paths (when mTLS is enabled). 
These point to // the container-side mount paths where the cert files are @@ -636,6 +640,9 @@ fn parse_memory_to_bytes(quantity: &str) -> Option { mod tests { use super::*; + static ENV_LOCK: std::sync::LazyLock> = + std::sync::LazyLock::new(|| std::sync::Mutex::new(())); + #[test] fn parse_cpu_millicore() { assert_eq!(parse_cpu_to_microseconds("500m"), Some(50_000)); @@ -908,6 +915,41 @@ mod tests { ); } + #[test] + fn container_spec_telemetry_toggle_comes_from_driver_env() { + use openshell_core::proto::compute::v1::{DriverSandboxSpec, DriverSandboxTemplate}; + + let _guard = ENV_LOCK.lock().unwrap(); + temp_env::with_vars( + [( + openshell_core::sandbox_env::TELEMETRY_ENABLED, + Some("false"), + )], + || { + let mut sandbox = test_sandbox("test-id", "legit-name"); + sandbox.spec = Some(DriverSandboxSpec { + environment: std::collections::HashMap::from([( + openshell_core::sandbox_env::TELEMETRY_ENABLED.to_string(), + "true".to_string(), + )]), + template: Some(DriverSandboxTemplate::default()), + ..Default::default() + }); + + let spec = build_container_spec(&sandbox, &test_config()); + let env_map = spec["env"].as_object().expect("env should be an object"); + + assert_eq!( + env_map + .get(openshell_core::sandbox_env::TELEMETRY_ENABLED) + .and_then(|v| v.as_str()), + Some("false"), + "telemetry toggle must come from the deployment environment" + ); + }, + ); + } + #[test] fn container_spec_required_labels_cannot_be_overridden() { use openshell_core::proto::compute::v1::{DriverSandboxSpec, DriverSandboxTemplate}; diff --git a/crates/openshell-driver-vm/Cargo.toml b/crates/openshell-driver-vm/Cargo.toml index fb1964415..fd583465a 100644 --- a/crates/openshell-driver-vm/Cargo.toml +++ b/crates/openshell-driver-vm/Cargo.toml @@ -45,6 +45,9 @@ flate2 = "1" sha2 = "0.10" zstd = "0.13" +[dev-dependencies] +temp-env = "0.3" + # smol-rs/polling drives the BSD/macOS parent-death detection in # procguard via kqueue's EVFILT_PROC / NOTE_EXIT filter. 
We could use # it on Linux too (via epoll + pidfd) but sticking with diff --git a/crates/openshell-driver-vm/src/driver.rs b/crates/openshell-driver-vm/src/driver.rs index 52a9729f8..6c605cd2c 100644 --- a/crates/openshell-driver-vm/src/driver.rs +++ b/crates/openshell-driver-vm/src/driver.rs @@ -3494,6 +3494,10 @@ fn build_guest_environment( ])); } environment.extend(merged_environment(sandbox)); + environment.insert( + openshell_core::sandbox_env::TELEMETRY_ENABLED.to_string(), + openshell_core::telemetry::enabled_env_value().to_string(), + ); let mut pairs = environment.into_iter().collect::>(); pairs.sort_by(|left, right| left.0.cmp(&right.0)); @@ -4393,6 +4397,9 @@ mod tests { use std::time::{SystemTime, UNIX_EPOCH}; use tonic::Code; + static ENV_LOCK: std::sync::LazyLock> = + std::sync::LazyLock::new(|| std::sync::Mutex::new(())); + #[test] fn vm_pulling_layer_event_adds_progress_detail_metadata() { let mut event = platform_event( @@ -4963,6 +4970,52 @@ mod tests { ))); } + #[test] + fn build_guest_environment_uses_deployment_telemetry_toggle() { + let _guard = ENV_LOCK.lock().unwrap(); + temp_env::with_vars( + [( + openshell_core::sandbox_env::TELEMETRY_ENABLED, + Some("false"), + )], + || { + let config = VmDriverConfig { + openshell_endpoint: "http://127.0.0.1:8080".to_string(), + ..Default::default() + }; + let sandbox = Sandbox { + id: "sandbox-123".to_string(), + name: "sandbox-123".to_string(), + spec: Some(SandboxSpec { + environment: HashMap::from([( + openshell_core::sandbox_env::TELEMETRY_ENABLED.to_string(), + "true".to_string(), + )]), + ..Default::default() + }), + ..Default::default() + }; + + let env = build_guest_environment(&sandbox, &config, None); + let telemetry_entries = env + .iter() + .filter(|entry| { + entry.starts_with(&format!( + "{}=", + openshell_core::sandbox_env::TELEMETRY_ENABLED + )) + }) + .collect::>(); + + assert_eq!(telemetry_entries.len(), 1); + assert_eq!( + telemetry_entries[0], + &format!("{}=false", 
openshell_core::sandbox_env::TELEMETRY_ENABLED) + ); + }, + ); + } + #[test] fn build_guest_environment_uses_endpoint_override_for_tap() { let config = VmDriverConfig { diff --git a/crates/openshell-sandbox/src/activity_aggregator.rs b/crates/openshell-sandbox/src/activity_aggregator.rs new file mode 100644 index 000000000..a653fb7b2 --- /dev/null +++ b/crates/openshell-sandbox/src/activity_aggregator.rs @@ -0,0 +1,223 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Anonymous sandbox network activity counter aggregation. + +use std::collections::HashMap; +use std::future::Future; +use tokio::sync::mpsc; +use tracing::debug; + +pub const ACTIVITY_EVENT_QUEUE_CAPACITY: usize = 1024; +const ACTIVITY_FLUSH_QUEUE_CAPACITY: usize = 1; +pub const DEFAULT_ACTIVITY_FLUSH_INTERVAL_SECS: u64 = 10; + +#[derive(Debug, Clone)] +pub struct ActivityEvent { + pub denied: bool, + pub deny_group: &'static str, +} + +pub type ActivitySender = mpsc::Sender; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FlushableActivitySummary { + pub network_activity_count: u32, + pub denied_action_count: u32, + pub denials_by_group: Vec<(String, u32)>, +} + +pub struct ActivityAggregator { + rx: mpsc::Receiver, + network_activity_count: u32, + denied_action_count: u32, + denials_by_group: HashMap, + flush_interval_secs: u64, +} + +impl ActivityAggregator { + pub fn new(rx: mpsc::Receiver, flush_interval_secs: u64) -> Self { + Self { + rx, + network_activity_count: 0, + denied_action_count: 0, + denials_by_group: HashMap::new(), + flush_interval_secs, + } + } + + pub async fn run(mut self, flush_callback: F) + where + F: Fn(FlushableActivitySummary) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + { + let (flush_tx, mut flush_rx) = + mpsc::channel::(ACTIVITY_FLUSH_QUEUE_CAPACITY); + tokio::spawn(async move { + while let Some(summary) = flush_rx.recv().await { + 
flush_callback(summary).await; + } + }); + + let mut flush_interval = + tokio::time::interval(std::time::Duration::from_secs(self.flush_interval_secs)); + flush_interval.tick().await; + + loop { + tokio::select! { + event = self.rx.recv() => { + if let Some(event) = event { + self.ingest(event); + } else { + if let Some(summary) = self.drain() { + queue_flush_summary(&flush_tx, summary); + } + debug!("ActivityAggregator: channel closed, exiting"); + return; + } + } + _ = flush_interval.tick() => { + if let Some(summary) = self.drain() { + debug!( + count = summary.network_activity_count, + denied = summary.denied_action_count, + "ActivityAggregator: flushing anonymous activity summary" + ); + queue_flush_summary(&flush_tx, summary); + } + } + } + } + } + + fn ingest(&mut self, event: ActivityEvent) { + self.network_activity_count = self.network_activity_count.saturating_add(1); + if event.denied { + self.denied_action_count = self.denied_action_count.saturating_add(1); + let group = sanitize_deny_group(event.deny_group).to_string(); + let count = self.denials_by_group.entry(group).or_default(); + *count = count.saturating_add(1); + } + } + + fn drain(&mut self) -> Option { + if self.network_activity_count == 0 { + return None; + } + let mut denials_by_group: Vec<(String, u32)> = self.denials_by_group.drain().collect(); + denials_by_group.sort_by(|left, right| left.0.cmp(&right.0)); + let summary = FlushableActivitySummary { + network_activity_count: self.network_activity_count, + denied_action_count: self.denied_action_count, + denials_by_group, + }; + self.network_activity_count = 0; + self.denied_action_count = 0; + Some(summary) + } +} + +pub fn try_record_activity(tx: &ActivitySender, denied: bool, deny_group: &'static str) -> bool { + tx.try_send(ActivityEvent { denied, deny_group }).is_ok() +} + +pub fn activity_flush_interval_secs_from_env(value: Option<&str>) -> u64 { + value + .and_then(|value| value.parse::().ok()) + .filter(|value| *value > 0) + 
.unwrap_or(DEFAULT_ACTIVITY_FLUSH_INTERVAL_SECS) +} + +fn queue_flush_summary( + tx: &mpsc::Sender, + summary: FlushableActivitySummary, +) -> bool { + tx.try_send(summary).is_ok() +} + +pub fn sanitize_deny_group(raw: &str) -> &'static str { + match raw { + "connect_policy" | "connect" | "l4_deny" => "connect_policy", + "forward_policy" | "forward" => "forward_policy", + "l7_policy" | "l7" | "l7_deny" | "forward-l7-deny" => "l7_policy", + "l7_parse_rejection" | "parse_rejection" => "l7_parse_rejection", + "ssrf" => "ssrf", + "bypass" => "bypass", + "policy_stale" => "policy_stale", + _ => "unknown", + } +} + +#[cfg(test)] +fn denial_rate_pct(network_activity_count: u32, denied_action_count: u32) -> f64 { + if network_activity_count == 0 { + return 0.0; + } + ((f64::from(denied_action_count) / f64::from(network_activity_count)) * 100.0).clamp(0.0, 100.0) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn assert_float_eq(actual: f64, expected: f64) { + assert!((actual - expected).abs() <= f64::EPSILON); + } + + #[test] + fn deny_group_sanitization_uses_allowlist() { + assert_eq!(sanitize_deny_group("connect"), "connect_policy"); + assert_eq!(sanitize_deny_group("forward-l7-deny"), "l7_policy"); + assert_eq!(sanitize_deny_group("host=example.test/path"), "unknown"); + assert_eq!(sanitize_deny_group("acme.internal:443"), "unknown"); + assert_eq!( + sanitize_deny_group("binary=/usr/local/bin/private"), + "unknown" + ); + } + + #[test] + fn denial_rate_handles_zero_and_clamps() { + assert_float_eq(denial_rate_pct(0, 10), 0.0); + assert_float_eq(denial_rate_pct(4, 1), 25.0); + assert_float_eq(denial_rate_pct(4, 10), 100.0); + } + + #[test] + fn activity_send_drops_when_queue_is_full() { + let (tx, _rx) = mpsc::channel(1); + + assert!(try_record_activity(&tx, false, "unknown")); + assert!(!try_record_activity(&tx, true, "connect_policy")); + } + + #[test] + fn flush_summary_drops_when_queue_is_full() { + let (tx, _rx) = mpsc::channel(1); + let summary = 
FlushableActivitySummary { + network_activity_count: 1, + denied_action_count: 0, + denials_by_group: Vec::new(), + }; + + assert!(queue_flush_summary(&tx, summary.clone())); + assert!(!queue_flush_summary(&tx, summary)); + } + + #[test] + fn activity_flush_interval_uses_positive_values_only() { + assert_eq!( + activity_flush_interval_secs_from_env(None), + DEFAULT_ACTIVITY_FLUSH_INTERVAL_SECS + ); + assert_eq!( + activity_flush_interval_secs_from_env(Some("not-a-number")), + DEFAULT_ACTIVITY_FLUSH_INTERVAL_SECS + ); + assert_eq!( + activity_flush_interval_secs_from_env(Some("0")), + DEFAULT_ACTIVITY_FLUSH_INTERVAL_SECS + ); + assert_eq!(activity_flush_interval_secs_from_env(Some("5")), 5); + } +} diff --git a/crates/openshell-sandbox/src/bypass_monitor.rs b/crates/openshell-sandbox/src/bypass_monitor.rs index 9e37ef27c..df90362ec 100644 --- a/crates/openshell-sandbox/src/bypass_monitor.rs +++ b/crates/openshell-sandbox/src/bypass_monitor.rs @@ -16,6 +16,7 @@ //! the monitor logs a one-time warning and returns. The nftables reject rules //! still provide fast-fail UX — the monitor only adds diagnostic visibility. +use crate::activity_aggregator::{ActivitySender, try_record_activity}; use crate::denial_aggregator::DenialEvent; use openshell_ocsf::{ ActionId, ActivityId, ConfidenceId, DetectionFindingBuilder, DispositionId, Endpoint, @@ -118,6 +119,7 @@ pub fn spawn( namespace_name: String, entrypoint_pid: Arc, denial_tx: Option>, + activity_tx: Option, ) -> Option> { use std::io::BufRead; use std::process::{Command, Stdio}; @@ -277,6 +279,9 @@ pub fn spawn( l7_path: None, }); } + if let Some(ref tx) = activity_tx { + let _ = try_record_activity(tx, true, "bypass"); + } } // Clean up the dmesg child process. 
diff --git a/crates/openshell-sandbox/src/grpc_client.rs b/crates/openshell-sandbox/src/grpc_client.rs index 3fccb680f..a24e763f1 100644 --- a/crates/openshell-sandbox/src/grpc_client.rs +++ b/crates/openshell-sandbox/src/grpc_client.rs @@ -10,10 +10,10 @@ use std::time::Duration; use miette::{IntoDiagnostic, Result, WrapErr}; use openshell_core::proto::{ DenialSummary, GetDraftPolicyRequest, GetInferenceBundleRequest, GetInferenceBundleResponse, - GetSandboxConfigRequest, GetSandboxProviderEnvironmentRequest, PolicyChunk, PolicySource, - PolicyStatus, ReportPolicyStatusRequest, SandboxPolicy as ProtoSandboxPolicy, - SubmitPolicyAnalysisRequest, SubmitPolicyAnalysisResponse, UpdateConfigRequest, - inference_client::InferenceClient, open_shell_client::OpenShellClient, + GetSandboxConfigRequest, GetSandboxProviderEnvironmentRequest, NetworkActivitySummary, + PolicyChunk, PolicySource, PolicyStatus, ReportPolicyStatusRequest, + SandboxPolicy as ProtoSandboxPolicy, SubmitPolicyAnalysisRequest, SubmitPolicyAnalysisResponse, + UpdateConfigRequest, inference_client::InferenceClient, open_shell_client::OpenShellClient, }; use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Identity}; use tracing::debug; @@ -310,6 +310,7 @@ impl CachedOpenShellClient { sandbox_name: &str, summaries: Vec, proposed_chunks: Vec, + network_activity_summaries: Vec, analysis_mode: &str, ) -> Result { let response = self @@ -319,6 +320,7 @@ impl CachedOpenShellClient { name: sandbox_name.to_string(), summaries, proposed_chunks, + network_activity_summaries, analysis_mode: analysis_mode.to_string(), }) .await diff --git a/crates/openshell-sandbox/src/l7/graphql.rs b/crates/openshell-sandbox/src/l7/graphql.rs index 5d0746d01..2ff502d1c 100644 --- a/crates/openshell-sandbox/src/l7/graphql.rs +++ b/crates/openshell-sandbox/src/l7/graphql.rs @@ -801,6 +801,7 @@ network_policies: ancestors: Vec::new(), cmdline_paths: Vec::new(), secret_resolver: None, + activity_tx: None, }; let 
request_info = crate::l7::L7RequestInfo { action: req.action, diff --git a/crates/openshell-sandbox/src/l7/relay.rs b/crates/openshell-sandbox/src/l7/relay.rs index 6d271af21..9efa7ca9f 100644 --- a/crates/openshell-sandbox/src/l7/relay.rs +++ b/crates/openshell-sandbox/src/l7/relay.rs @@ -7,6 +7,7 @@ //! Parses each request within the tunnel, evaluates it against OPA policy, //! and either forwards or denies the request. +use crate::activity_aggregator::{ActivitySender, try_record_activity}; use crate::l7::provider::{L7Provider, RelayOutcome}; use crate::l7::rest::WebSocketExtensionMode; use crate::l7::{EnforcementMode, L7EndpointConfig, L7Protocol, L7RequestInfo}; @@ -37,6 +38,8 @@ pub struct L7EvalContext { pub cmdline_paths: Vec, /// Supervisor-only placeholder resolver for outbound headers. pub(crate) secret_resolver: Option>, + /// Anonymous activity counter channel. + pub(crate) activity_tx: Option, } #[derive(Default)] @@ -119,6 +122,7 @@ fn emit_parse_rejection(ctx: &L7EvalContext, detail: &str, engine_type: &str) { .status_detail(detail) .build(); ocsf_emit!(event); + emit_activity(ctx, true, "l7_parse_rejection"); } /// Run protocol-aware L7 inspection on a tunnel. @@ -448,6 +452,13 @@ fn emit_l7_request_log( )) .build(); ocsf_emit!(event); + emit_activity(ctx, decision_str == "deny", "l7_policy"); +} + +fn emit_activity(ctx: &L7EvalContext, denied: bool, deny_group: &'static str) { + if let Some(tx) = &ctx.activity_tx { + let _ = try_record_activity(tx, denied, deny_group); + } } /// Handle an upgraded connection (101 Switching Protocols). 
@@ -1371,6 +1382,7 @@ network_policies: ancestors: vec![], cmdline_paths: vec![], secret_resolver: None, + activity_tx: None, }; let request = L7RequestInfo { action: "WEBSOCKET_TEXT".into(), @@ -1426,6 +1438,7 @@ network_policies: ancestors: vec![], cmdline_paths: vec![], secret_resolver: None, + activity_tx: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -1530,6 +1543,7 @@ network_policies: ancestors: vec![], cmdline_paths: vec![], secret_resolver: resolver.map(Arc::new), + activity_tx: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -1647,6 +1661,7 @@ network_policies: ancestors: vec![], cmdline_paths: vec![], secret_resolver: resolver.map(Arc::new), + activity_tx: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -1817,6 +1832,7 @@ network_policies: ancestors: vec![], cmdline_paths: vec![], secret_resolver: None, + activity_tx: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); @@ -1904,6 +1920,7 @@ network_policies: ancestors: vec![], cmdline_paths: vec![], secret_resolver: None, + activity_tx: None, }; let (mut app, mut relay_client) = tokio::io::duplex(8192); diff --git a/crates/openshell-sandbox/src/l7/websocket.rs b/crates/openshell-sandbox/src/l7/websocket.rs index 2dc1b25c3..89a6e6c51 100644 --- a/crates/openshell-sandbox/src/l7/websocket.rs +++ b/crates/openshell-sandbox/src/l7/websocket.rs @@ -1270,6 +1270,7 @@ network_policies: ancestors: vec![], cmdline_paths: vec![], secret_resolver: None, + activity_tx: None, }; let (mut client_write, mut relay_read) = tokio::io::duplex(MAX_TEXT_MESSAGE_BYTES + 1024); let (mut relay_write, mut upstream_read) = tokio::io::duplex(MAX_TEXT_MESSAGE_BYTES + 1024); diff --git a/crates/openshell-sandbox/src/lib.rs b/crates/openshell-sandbox/src/lib.rs index ded56ce9e..be9392044 100644 --- a/crates/openshell-sandbox/src/lib.rs +++ b/crates/openshell-sandbox/src/lib.rs @@ -5,6 +5,7 @@ //! //! 
This crate provides process sandboxing and monitoring capabilities. +mod activity_aggregator; pub mod bypass_monitor; mod child_env; pub mod denial_aggregator; @@ -237,6 +238,23 @@ fn route_refresh_interval_secs() -> u64 { } } +type ActivityCollectionChannels = ( + Option, + Option>, + Option, +); + +fn activity_collection_channels(sandbox_id: Option<&str>) -> ActivityCollectionChannels { + if sandbox_id.is_some() && openshell_core::telemetry::enabled() { + let (tx, rx) = + tokio::sync::mpsc::channel(activity_aggregator::ACTIVITY_EVENT_QUEUE_CAPACITY); + let bypass_tx = tx.clone(); + (Some(tx), Some(rx), Some(bypass_tx)) + } else { + (None, None, None) + } +} + #[cfg(target_os = "linux")] static MANAGED_CHILDREN: LazyLock>> = LazyLock::new(|| Mutex::new(HashSet::new())); @@ -571,67 +589,80 @@ pub async fn run_sandbox( // the entrypoint process's /proc/net/tcp for identity binding. let entrypoint_pid = Arc::new(AtomicU32::new(0)); - let (_proxy, denial_rx, bypass_denial_tx) = if matches!(policy.network.mode, NetworkMode::Proxy) - { - let proxy_policy = policy.network.proxy.as_ref().ok_or_else(|| { - miette::miette!("Network mode is set to proxy but no proxy configuration was provided") - })?; - - let engine = opa_engine.clone().ok_or_else(|| { - miette::miette!("Proxy mode requires an OPA engine (--rego-policy and --rego-data)") - })?; + let (_proxy, denial_rx, bypass_denial_tx, activity_rx, bypass_activity_tx) = + if matches!(policy.network.mode, NetworkMode::Proxy) { + let proxy_policy = policy.network.proxy.as_ref().ok_or_else(|| { + miette::miette!( + "Network mode is set to proxy but no proxy configuration was provided" + ) + })?; - let cache = identity_cache.clone().ok_or_else(|| { - miette::miette!("Proxy mode requires an identity cache (OPA engine must be configured)") - })?; + let engine = opa_engine.clone().ok_or_else(|| { + miette::miette!("Proxy mode requires an OPA engine (--rego-policy and --rego-data)") + })?; - // If we have a network namespace, bind 
to the veth host IP so sandboxed - // processes can reach the proxy via TCP. - #[cfg(target_os = "linux")] - let bind_addr = netns.as_ref().map(|ns| { - let port = proxy_policy.http_addr.map_or(3128, |addr| addr.port()); - SocketAddr::new(ns.host_ip(), port) - }); + let cache = identity_cache.clone().ok_or_else(|| { + miette::miette!( + "Proxy mode requires an identity cache (OPA engine must be configured)" + ) + })?; - #[cfg(not(target_os = "linux"))] - let bind_addr: Option = None; + // If we have a network namespace, bind to the veth host IP so sandboxed + // processes can reach the proxy via TCP. + #[cfg(target_os = "linux")] + let bind_addr = netns.as_ref().map(|ns| { + let port = proxy_policy.http_addr.map_or(3128, |addr| addr.port()); + SocketAddr::new(ns.host_ip(), port) + }); - // Build inference context for local routing of intercepted inference calls. - let inference_ctx = build_inference_context( - sandbox_id.as_deref(), - openshell_endpoint_for_proxy.as_deref(), - inference_routes.as_deref(), - ) - .await?; + #[cfg(not(target_os = "linux"))] + let bind_addr: Option = None; - // Create denial aggregator channel if in gRPC mode (sandbox_id present). - // Clone the sender for the bypass monitor before passing to the proxy. - let (denial_tx, denial_rx, bypass_denial_tx) = if sandbox_id.is_some() { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let bypass_tx = tx.clone(); - (Some(tx), Some(rx), Some(bypass_tx)) + // Build inference context for local routing of intercepted inference calls. + let inference_ctx = build_inference_context( + sandbox_id.as_deref(), + openshell_endpoint_for_proxy.as_deref(), + inference_routes.as_deref(), + ) + .await?; + + // Create denial aggregator channel if in gRPC mode (sandbox_id present). + // Clone the sender for the bypass monitor before passing to the proxy. 
+ let (denial_tx, denial_rx, bypass_denial_tx) = if sandbox_id.is_some() { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let bypass_tx = tx.clone(); + (Some(tx), Some(rx), Some(bypass_tx)) + } else { + (None, None, None) + }; + let (activity_tx, activity_rx, bypass_activity_tx) = + activity_collection_channels(sandbox_id.as_deref()); + + let proxy_handle = ProxyHandle::start_with_bind_addr( + proxy_policy, + bind_addr, + engine, + cache, + entrypoint_pid.clone(), + tls_state, + inference_ctx, + Some(provider_credentials.clone()), + Some(policy_local_ctx.clone()), + denial_tx, + activity_tx, + ) + .await?; + ( + Some(proxy_handle), + denial_rx, + bypass_denial_tx, + activity_rx, + bypass_activity_tx, + ) } else { - (None, None, None) + (None, None, None, None, None) }; - let proxy_handle = ProxyHandle::start_with_bind_addr( - proxy_policy, - bind_addr, - engine, - cache, - entrypoint_pid.clone(), - tls_state, - inference_ctx, - Some(provider_credentials.clone()), - Some(policy_local_ctx.clone()), - denial_tx, - ) - .await?; - (Some(proxy_handle), denial_rx, bypass_denial_tx) - } else { - (None, None, None) - }; - // Spawn bypass detection monitor (Linux only, proxy mode only). // Reads /dev/kmsg for nftables log entries and emits structured // tracing events for direct connection attempts that bypass the proxy. @@ -641,12 +672,15 @@ pub async fn run_sandbox( ns.name().to_string(), entrypoint_pid.clone(), bypass_denial_tx, + bypass_activity_tx, ) }); // On non-Linux, bypass_denial_tx is unused (no /dev/kmsg). #[cfg(not(target_os = "linux"))] drop(bypass_denial_tx); + #[cfg(not(target_os = "linux"))] + drop(bypass_activity_tx); // Compute the proxy URL and netns fd for SSH sessions. 
// SSH shell processes need both to enforce network policy: @@ -996,6 +1030,32 @@ pub async fn run_sandbox( .await; }); } + if let Some(rx) = activity_rx { + let agg_name = sandbox_name_for_agg.clone().unwrap_or_else(|| id.clone()); + let agg_endpoint = endpoint.clone(); + let flush_interval_secs = activity_aggregator::activity_flush_interval_secs_from_env( + std::env::var("OPENSHELL_ACTIVITY_FLUSH_INTERVAL_SECS") + .ok() + .as_deref(), + ); + let aggregator = activity_aggregator::ActivityAggregator::new(rx, flush_interval_secs); + + tokio::spawn(async move { + aggregator + .run(move |summary| { + let endpoint = agg_endpoint.clone(); + let sandbox_name = agg_name.clone(); + async move { + if let Err(e) = + flush_activity_to_gateway(&endpoint, &sandbox_name, summary).await + { + warn!(error = %e, "Failed to flush activity summary to gateway"); + } + } + }) + .await; + }); + } } // Wait for process with optional timeout @@ -2278,12 +2338,45 @@ async fn flush_proposals_to_gateway( ); client - .submit_policy_analysis(sandbox_name, proto_summaries, proposals, "mechanistic") + .submit_policy_analysis( + sandbox_name, + proto_summaries, + proposals, + vec![], + "mechanistic", + ) .await?; Ok(()) } +async fn flush_activity_to_gateway( + endpoint: &str, + sandbox_name: &str, + summary: activity_aggregator::FlushableActivitySummary, +) -> Result<()> { + use crate::grpc_client::CachedOpenShellClient; + use openshell_core::proto::{DenialGroupCount, NetworkActivitySummary}; + + let client = CachedOpenShellClient::connect(endpoint).await?; + let summary = NetworkActivitySummary { + network_activity_count: summary.network_activity_count, + denied_action_count: summary.denied_action_count, + denials_by_group: summary + .denials_by_group + .into_iter() + .map(|(deny_group, denied_count)| DenialGroupCount { + deny_group, + denied_count, + }) + .collect(), + }; + client + .submit_policy_analysis(sandbox_name, vec![], vec![], vec![summary], "telemetry") + .await?; + Ok(()) +} + /// 
`reload_from_proto_with_pid()`. Reports load success/failure back to the /// server. On failure, the previous engine is untouched (LKG behavior). /// @@ -3028,6 +3121,32 @@ filesystem_policy: ); } + #[test] + fn telemetry_opt_out_disables_activity_collection_and_flush_channel() { + let _guard = ENV_LOCK.lock().unwrap(); + with_vars([("OPENSHELL_TELEMETRY_ENABLED", Some("false"))], || { + let (activity_tx, activity_rx, bypass_activity_tx) = + activity_collection_channels(Some("sb-1")); + + assert!(activity_tx.is_none()); + assert!(activity_rx.is_none()); + assert!(bypass_activity_tx.is_none()); + }); + } + + #[test] + fn telemetry_enabled_creates_activity_collection_and_flush_channel() { + let _guard = ENV_LOCK.lock().unwrap(); + with_vars([("OPENSHELL_TELEMETRY_ENABLED", Some("true"))], || { + let (activity_tx, activity_rx, bypass_activity_tx) = + activity_collection_channels(Some("sb-1")); + + assert!(activity_tx.is_some()); + assert!(activity_rx.is_some()); + assert!(bypass_activity_tx.is_some()); + }); + } + #[tokio::test] async fn route_cache_preserves_content_when_not_written() { use std::sync::Arc; diff --git a/crates/openshell-sandbox/src/policy_local.rs b/crates/openshell-sandbox/src/policy_local.rs index 657fd760f..4006028cf 100644 --- a/crates/openshell-sandbox/src/policy_local.rs +++ b/crates/openshell-sandbox/src/policy_local.rs @@ -494,7 +494,7 @@ async fn submit_proposal(ctx: &PolicyLocalContext, body: &[u8]) -> (u16, serde_j let audit_summaries: Vec = chunks.iter().map(summarize_chunk_for_audit).collect(); let response = match client - .submit_policy_analysis(sandbox_name, vec![], chunks, "agent_authored") + .submit_policy_analysis(sandbox_name, vec![], chunks, vec![], "agent_authored") .await { Ok(response) => response, diff --git a/crates/openshell-sandbox/src/proxy.rs b/crates/openshell-sandbox/src/proxy.rs index 81ed0322f..c5da5f6c3 100644 --- a/crates/openshell-sandbox/src/proxy.rs +++ b/crates/openshell-sandbox/src/proxy.rs @@ -3,6 +3,7 @@ //! 
HTTP CONNECT proxy with OPA policy evaluation and process-identity binding. +use crate::activity_aggregator::{ActivitySender, try_record_activity}; use crate::denial_aggregator::DenialEvent; use crate::identity::BinaryIdentityCache; use crate::l7::tls::ProxyTlsState; @@ -186,6 +187,7 @@ impl ProxyHandle { provider_credentials: Option, policy_local_ctx: Option>, denial_tx: Option>, + activity_tx: Option, ) -> Result { // Use override bind_addr, fall back to policy http_addr, then default // to loopback:3128. The default allows the proxy to function when no @@ -241,6 +243,7 @@ impl ProxyHandle { .as_ref() .and_then(ProviderCredentialState::resolver); let dtx = denial_tx.clone(); + let atx = activity_tx.clone(); tokio::spawn(async move { if let Err(err) = handle_tcp_connection( stream, @@ -253,6 +256,7 @@ impl ProxyHandle { gw, resolver, dtx, + atx, ) .await { @@ -298,6 +302,43 @@ impl Drop for ProxyHandle { } } +fn emit_activity(tx: &Option, denied: bool, deny_group: &'static str) { + if let Some(tx) = tx { + let _ = try_record_activity(tx, denied, deny_group); + } +} + +fn l7_inspection_active(l7_route: Option<&L7RouteSnapshot>) -> bool { + l7_route.is_some_and(|route| !route.configs.is_empty()) +} + +fn emit_connect_activity_if_l4_only( + tx: &Option, + l7_route: Option<&L7RouteSnapshot>, +) { + if !l7_inspection_active(l7_route) { + emit_activity(tx, false, "unknown"); + } +} + +fn emit_activity_simple(tx: Option<&ActivitySender>, denied: bool, deny_group: &'static str) { + if let Some(tx) = tx { + let _ = try_record_activity(tx, denied, deny_group); + } +} + +fn emit_forward_success_activity(tx: Option<&ActivitySender>, l7_activity_pending: bool) { + emit_activity_simple( + tx, + false, + if l7_activity_pending { + "l7_policy" + } else { + "unknown" + }, + ); +} + /// Emit a denial event to the aggregator channel (if configured). /// Used by `handle_tcp_connection` which owns `Option`. 
fn emit_denial( @@ -371,6 +412,7 @@ async fn handle_tcp_connection( trusted_host_gateway: Arc>, secret_resolver: Option>, denial_tx: Option>, + activity_tx: Option, ) -> Result<()> { let mut buf = vec![0u8; MAX_HEADER_BYTES]; let mut used = 0usize; @@ -417,6 +459,7 @@ async fn handle_tcp_connection( trusted_host_gateway, secret_resolver, denial_tx.as_ref(), + activity_tx.as_ref(), ) .await; } @@ -435,6 +478,7 @@ async fn handle_tcp_connection( ) .await?; if let InferenceOutcome::Denied { reason } = outcome { + emit_activity(&activity_tx, true, "forward_policy"); let event = NetworkActivityBuilder::new(crate::ocsf_ctx()) .activity(ActivityId::Open) .action(ActionId::Denied) @@ -539,6 +583,7 @@ async fn handle_tcp_connection( &deny_reason, "connect", ); + emit_activity(&activity_tx, true, "connect_policy"); respond( &mut client, &build_json_error_response( @@ -612,6 +657,7 @@ async fn handle_tcp_connection( &reason, "ssrf", ); + emit_activity(&activity_tx, true, "ssrf"); respond( &mut client, &build_json_error_response( @@ -667,6 +713,7 @@ async fn handle_tcp_connection( &reason, "ssrf", ); + emit_activity(&activity_tx, true, "ssrf"); respond( &mut client, &build_json_error_response( @@ -714,6 +761,7 @@ async fn handle_tcp_connection( &reason, "ssrf", ); + emit_activity(&activity_tx, true, "ssrf"); respond( &mut client, &build_json_error_response( @@ -764,6 +812,7 @@ async fn handle_tcp_connection( &reason, "ssrf", ); + emit_activity(&activity_tx, true, "ssrf"); respond( &mut client, &build_json_error_response( @@ -789,13 +838,11 @@ async fn handle_tcp_connection( // Check if endpoint has L7 config for protocol-aware inspection, and // retain the generation for HTTP passthrough keep-alive tunnels. 
let l7_route = query_l7_route_snapshot(&opa_engine, &decision, &host_lc, port); + let should_inspect_l7 = l7_inspection_active(l7_route.as_ref()); // Log the allowed CONNECT — use CONNECT_L7 when L7 inspection follows, // so log consumers can distinguish L4-only decisions from tunnel lifecycle events. - let connect_msg = if l7_route - .as_ref() - .is_some_and(|route| !route.configs.is_empty()) - { + let connect_msg = if should_inspect_l7 { "CONNECT_L7" } else { "CONNECT" @@ -818,6 +865,7 @@ async fn handle_tcp_connection( .build(); ocsf_emit!(event); } + emit_connect_activity_if_l4_only(&activity_tx, l7_route.as_ref()); // Determine effective TLS mode. Check the raw endpoint config for // `tls: skip` independently of L7 config (which requires `protocol`). @@ -845,6 +893,7 @@ async fn handle_tcp_connection( .map(|p| p.to_string_lossy().into_owned()) .collect(), secret_resolver: secret_resolver.clone(), + activity_tx: activity_tx.clone(), }; if effective_tls_skip { @@ -2699,6 +2748,7 @@ async fn handle_forward_proxy( trusted_host_gateway: Arc>, secret_resolver: Option>, denial_tx: Option<&mpsc::UnboundedSender>, + activity_tx: Option<&ActivitySender>, ) -> Result<()> { // 1. Parse the absolute-form URI. `path` is marked `mut` so that, when an // L7 config applies, the canonicalized form produced below replaces it @@ -2861,6 +2911,7 @@ async fn handle_forward_proxy( reason, "forward", ); + emit_activity_simple(activity_tx, true, "forward_policy"); respond( client, &build_json_error_response( @@ -2880,6 +2931,7 @@ async fn handle_forward_proxy( Ok(guard) => guard, Err(e) => { emit_l7_tunnel_close_after_policy_change(&host_lc, port, e); + emit_activity_simple(activity_tx, true, "policy_stale"); respond( client, &build_json_error_response( @@ -2923,7 +2975,9 @@ async fn handle_forward_proxy( .map(|p| p.to_string_lossy().into_owned()) .collect(), secret_resolver: secret_resolver.clone(), + activity_tx: activity_tx.cloned(), }; + let mut l7_activity_pending = false; // 4b. 
If the endpoint has L7 config, evaluate the request against // L7 policy. The forward proxy handles exactly one request per @@ -2941,6 +2995,7 @@ async fn handle_forward_proxy( route.generation, ), ); + emit_activity_simple(activity_tx, true, "policy_stale"); respond( client, &build_json_error_response( @@ -2957,6 +3012,7 @@ async fn handle_forward_proxy( Ok(engine) => engine, Err(e) => { emit_l7_tunnel_close_after_policy_change(&host_lc, port, e); + emit_activity_simple(activity_tx, true, "policy_stale"); respond( client, &build_json_error_response( @@ -3013,6 +3069,7 @@ async fn handle_forward_proxy( )) .build(); ocsf_emit!(event); + emit_activity_simple(activity_tx, true, "l7_parse_rejection"); respond( client, &build_json_error_response( @@ -3027,6 +3084,7 @@ async fn handle_forward_proxy( } }; let Some(l7_config) = select_l7_config_for_path(&route.configs, &path) else { + emit_activity_simple(activity_tx, true, "l7_policy"); respond( client, &build_json_error_response( @@ -3079,6 +3137,7 @@ async fn handle_forward_proxy( .message(format!("FORWARD_GRAPHQL_L7 request rejected: {e}")) .build(); ocsf_emit!(event); + emit_activity_simple(activity_tx, true, "l7_parse_rejection"); respond( client, &build_json_error_response( @@ -3186,6 +3245,7 @@ async fn handle_forward_proxy( || (!allowed && l7_config.config.enforcement == crate::l7::EnforcementMode::Enforce); if effectively_denied { + emit_activity_simple(activity_tx, true, "l7_policy"); emit_denial_simple( denial_tx, &host_lc, @@ -3207,6 +3267,7 @@ async fn handle_forward_proxy( .await?; return Ok(()); } + l7_activity_pending = true; forward_tunnel_engine = Some(tunnel_engine); } @@ -3267,6 +3328,7 @@ async fn handle_forward_proxy( &reason, "ssrf", ); + emit_activity_simple(activity_tx, true, "ssrf"); respond( client, &build_json_error_response( @@ -3323,6 +3385,7 @@ async fn handle_forward_proxy( &reason, "ssrf", ); + emit_activity_simple(activity_tx, true, "ssrf"); respond( client, &build_json_error_response( @@ 
-3374,6 +3437,7 @@ async fn handle_forward_proxy( &reason, "ssrf", ); + emit_activity_simple(activity_tx, true, "ssrf"); respond( client, &build_json_error_response( @@ -3428,6 +3492,7 @@ async fn handle_forward_proxy( &reason, "ssrf", ); + emit_activity_simple(activity_tx, true, "ssrf"); respond( client, &build_json_error_response( @@ -3445,6 +3510,7 @@ async fn handle_forward_proxy( if let Err(e) = forward_generation_guard.ensure_current() { emit_l7_tunnel_close_after_policy_change(&host_lc, port, e); + emit_activity_simple(activity_tx, true, "policy_stale"); respond( client, &build_json_error_response( @@ -3518,6 +3584,7 @@ async fn handle_forward_proxy( .build(); ocsf_emit!(event); } + emit_forward_success_activity(activity_tx, l7_activity_pending); // 9. Rewrite request and forward to upstream let rewritten = match rewrite_forward_request( @@ -3702,6 +3769,83 @@ mod tests { } } + #[test] + fn connect_activity_is_skipped_when_l7_will_count_the_request() { + let (tx, mut rx) = mpsc::channel(4); + let activity_tx = Some(tx); + let l7_route = L7RouteSnapshot { + configs: vec![L7ConfigSnapshot { + config: websocket_l7_config(crate::l7::L7Protocol::Rest, false), + }], + generation: 1, + }; + let l4_route = L7RouteSnapshot { + configs: Vec::new(), + generation: 1, + }; + + emit_connect_activity_if_l4_only(&activity_tx, Some(&l7_route)); + assert!( + rx.try_recv().is_err(), + "L7-inspected CONNECT should not emit an extra L4 activity event" + ); + + emit_connect_activity_if_l4_only(&activity_tx, Some(&l4_route)); + let event = rx.try_recv().expect("L4-only CONNECT should emit activity"); + assert!(!event.denied); + assert_eq!(event.deny_group, "unknown"); + + emit_connect_activity_if_l4_only(&activity_tx, None); + let event = rx + .try_recv() + .expect("CONNECT without an L7 route should emit activity"); + assert!(!event.denied); + assert_eq!(event.deny_group, "unknown"); + } + + #[test] + fn forward_l7_allowed_activity_is_deferred_until_after_ssrf() { + let (tx, mut 
rx) = mpsc::channel(4); + let activity_tx = Some(tx); + + let l7_activity_pending = true; + assert!( + rx.try_recv().is_err(), + "allowed L7 evaluation must not emit activity before SSRF succeeds" + ); + + emit_activity_simple(activity_tx.as_ref(), true, "ssrf"); + let event = rx + .try_recv() + .expect("SSRF denial should emit the request activity"); + assert!(event.denied); + assert_eq!(event.deny_group, "ssrf"); + assert!( + rx.try_recv().is_err(), + "SSRF-denied forward request must not also emit allowed L7 activity" + ); + + emit_forward_success_activity(activity_tx.as_ref(), l7_activity_pending); + let event = rx + .try_recv() + .expect("L7 activity should emit after SSRF succeeds"); + assert!(!event.denied); + assert_eq!(event.deny_group, "l7_policy"); + } + + #[test] + fn forward_success_activity_uses_unknown_without_l7() { + let (tx, mut rx) = mpsc::channel(4); + let activity_tx = Some(tx); + + emit_forward_success_activity(activity_tx.as_ref(), false); + let event = rx + .try_recv() + .expect("non-L7 forward success should emit activity"); + assert!(!event.denied); + assert_eq!(event.deny_group, "unknown"); + } + fn forward_test_guard() -> PolicyGenerationGuard { let policy = include_str!("../data/sandbox-policy.rego"); let policy_data = "network_policies: {}\n"; @@ -3828,6 +3972,7 @@ mod tests { ancestors: vec![], cmdline_paths: vec![], secret_resolver: None, + activity_tx: None, }; (config, tunnel_engine, ctx) } @@ -3993,6 +4138,7 @@ mod tests { ancestors: vec![], cmdline_paths: vec![], secret_resolver: resolver, + activity_tx: None, }; let query_params = std::collections::HashMap::new(); @@ -4033,6 +4179,7 @@ mod tests { ancestors: vec![], cmdline_paths: vec![], secret_resolver: None, + activity_tx: None, }; let query_params = std::collections::HashMap::new(); let config = websocket_l7_config(crate::l7::L7Protocol::Rest, false); diff --git a/crates/openshell-server/src/compute/mod.rs b/crates/openshell-server/src/compute/mod.rs index 
a69231dea..7beb7cf26 100644 --- a/crates/openshell-server/src/compute/mod.rs +++ b/crates/openshell-server/src/compute/mod.rs @@ -15,6 +15,7 @@ use crate::sandbox_watch::SandboxWatchBus; use crate::supervisor_session::SupervisorSessionRegistry; use crate::tracing_bus::TracingLogBus; use futures::{Stream, StreamExt}; +use openshell_core::ComputeDriverKind; use openshell_core::proto::compute::v1::{ CreateSandboxRequest, DeleteSandboxRequest, DriverCondition, DriverPlatformEvent, DriverResourceRequirements, DriverSandbox, DriverSandboxSpec, DriverSandboxStatus, @@ -219,6 +220,7 @@ impl ComputeDriver for RemoteComputeDriver { #[derive(Clone)] pub struct ComputeRuntime { driver: SharedComputeDriver, + driver_kind: Option, shutdown_cleanup: Option>, startup_resume: Option>, _driver_process: Option>, @@ -241,6 +243,7 @@ impl fmt::Debug for ComputeRuntime { impl ComputeRuntime { #[allow(clippy::too_many_arguments)] async fn from_driver( + driver_kind: ComputeDriverKind, driver: SharedComputeDriver, shutdown_cleanup: Option>, startup_resume: Option>, @@ -261,6 +264,7 @@ impl ComputeRuntime { .default_image; Ok(Self { driver, + driver_kind: Some(driver_kind), shutdown_cleanup, startup_resume, _driver_process: driver_process, @@ -304,6 +308,7 @@ impl ComputeRuntime { let startup_resume: Arc = driver.clone(); let driver: SharedComputeDriver = driver; Self::from_driver( + ComputeDriverKind::Docker, driver, Some(shutdown_cleanup), Some(startup_resume), @@ -332,6 +337,7 @@ impl ComputeRuntime { .map_err(|err| ComputeError::Message(err.to_string()))?; let driver: SharedComputeDriver = Arc::new(ComputeDriverService::new(driver)); Self::from_driver( + ComputeDriverKind::Kubernetes, driver, None, None, @@ -358,6 +364,7 @@ impl ComputeRuntime { ) -> Result { let driver: SharedComputeDriver = Arc::new(RemoteComputeDriver::new(channel)); Self::from_driver( + ComputeDriverKind::Vm, driver, None, None, @@ -386,6 +393,7 @@ impl ComputeRuntime { .map_err(|err| 
ComputeError::Message(err.to_string()))?; let driver: SharedComputeDriver = Arc::new(PodmanDriverService::new(driver)); Self::from_driver( + ComputeDriverKind::Podman, driver, None, None, @@ -406,6 +414,11 @@ impl ComputeRuntime { &self.default_image } + #[must_use] + pub fn driver_kind(&self) -> Option { + self.driver_kind + } + #[must_use] pub fn gateway_bind_addresses(&self) -> &[SocketAddr] { &self.gateway_bind_addresses @@ -1719,6 +1732,7 @@ impl ComputeDriver for NoopTestDriver { pub async fn new_test_runtime(store: Arc) -> ComputeRuntime { ComputeRuntime { driver: Arc::new(NoopTestDriver), + driver_kind: None, shutdown_cleanup: None, startup_resume: None, _driver_process: None, @@ -1886,6 +1900,7 @@ mod tests { let store = Arc::new(Store::connect("sqlite::memory:").await.unwrap()); ComputeRuntime { driver, + driver_kind: None, shutdown_cleanup: None, startup_resume, _driver_process: None, diff --git a/crates/openshell-server/src/grpc/policy.rs b/crates/openshell-server/src/grpc/policy.rs index 412febb96..4319e8c67 100644 --- a/crates/openshell-server/src/grpc/policy.rs +++ b/crates/openshell-server/src/grpc/policy.rs @@ -36,6 +36,9 @@ use openshell_core::proto::{ L7DenyRule, L7Rule, NetworkBinary, NetworkEndpoint, NetworkPolicyRule, Provider, Sandbox, SandboxPolicy as ProtoSandboxPolicy, }; +use openshell_core::telemetry::{ + LifecycleOperation, LifecycleResource, PolicyDecisionOperation, TelemetryOutcome, +}; use openshell_core::{ VERSION, settings::{self, SettingValueKind}, @@ -76,6 +79,58 @@ const GLOBAL_POLICY_SANDBOX_ID: &str = "__global__"; /// Maximum number of optimistic retry attempts for policy version conflicts. 
const MERGE_RETRY_LIMIT: usize = 5; +fn emit_sandbox_policy_update_success() { + openshell_core::telemetry::emit_lifecycle( + LifecycleResource::SandboxPolicy, + LifecycleOperation::Update, + TelemetryOutcome::Success, + ); +} + +fn emit_sandbox_policy_update_failure() { + openshell_core::telemetry::emit_lifecycle( + LifecycleResource::SandboxPolicy, + LifecycleOperation::Update, + TelemetryOutcome::Failure, + ); +} + +fn should_emit_config_update_policy_telemetry(sandbox_caller: bool) -> bool { + !sandbox_caller +} + +fn emit_config_update_policy_success(sandbox_caller: bool) { + if should_emit_config_update_policy_telemetry(sandbox_caller) { + emit_sandbox_policy_update_success(); + } +} + +fn should_emit_full_policy_update_telemetry(sandbox_caller: bool, next_version: i64) -> bool { + !sandbox_caller && next_version > 1 +} + +fn emit_full_policy_update_success(sandbox_caller: bool, next_version: i64) { + if should_emit_full_policy_update_telemetry(sandbox_caller, next_version) { + emit_sandbox_policy_update_success(); + } +} + +fn emit_policy_decision_success(operation: PolicyDecisionOperation, rule_count: u64) { + openshell_core::telemetry::emit_policy_decision( + operation, + TelemetryOutcome::Success, + rule_count, + ); +} + +fn emit_policy_decision_failure(operation: PolicyDecisionOperation, rule_count: u64) { + openshell_core::telemetry::emit_policy_decision( + operation, + TelemetryOutcome::Failure, + rule_count, + ); +} + fn emit_gateway_policy_audit_log( sandbox_id: &str, sandbox_name: &str, @@ -657,6 +712,21 @@ pub(super) async fn handle_get_sandbox_provider_environment( pub(super) async fn handle_update_config( state: &Arc, request: Request, +) -> Result, Status> { + let sandbox_caller = is_sandbox_caller(&request); + let update = request.get_ref(); + let should_emit_policy_failure = should_emit_config_update_policy_telemetry(sandbox_caller) + && (update.policy.is_some() || !update.merge_operations.is_empty()); + let result = 
handle_update_config_inner(state, request).await; + if result.is_err() && should_emit_policy_failure { + emit_sandbox_policy_update_failure(); + } + result +} + +async fn handle_update_config_inner( + state: &Arc, + request: Request, ) -> Result, Status> { let sandbox_caller = is_sandbox_caller(&request); let req = request.into_inner(); @@ -981,6 +1051,7 @@ pub(super) async fn handle_update_config( operation_count = merge_ops.len(), "UpdateConfig: merged incremental policy operations" ); + emit_config_update_policy_success(sandbox_caller); return Ok(Response::new(UpdateConfigResponse { version: u32::try_from(version).unwrap_or(0), @@ -1080,6 +1151,7 @@ pub(super) async fn handle_update_config( policy_hash = %hash, "UpdateConfig: new policy version persisted" ); + emit_full_policy_update_success(sandbox_caller, next_version); Ok(Response::new(UpdateConfigResponse { version: u32::try_from(next_version).unwrap_or(0), @@ -1359,6 +1431,22 @@ pub(super) async fn handle_submit_policy_analysis( .map_err(|e| Status::internal(format!("fetch sandbox failed: {e}")))? 
.ok_or_else(|| Status::not_found("sandbox not found"))?; let sandbox_id = sandbox.object_id().to_string(); + for summary in &req.network_activity_summaries { + state + .telemetry + .record_network_activity(&sandbox_id, summary); + } + if req.proposed_chunks.is_empty() + && req.summaries.is_empty() + && !req.network_activity_summaries.is_empty() + { + return Ok(Response::new(SubmitPolicyAnalysisResponse { + accepted_chunks: 0, + rejected_chunks: 0, + rejection_reasons: Vec::new(), + accepted_chunk_ids: Vec::new(), + })); + } let current_version = state .store @@ -1531,6 +1619,17 @@ pub(super) async fn handle_get_draft_policy( pub(super) async fn handle_approve_draft_chunk( state: &Arc, request: Request, +) -> Result, Status> { + let result = handle_approve_draft_chunk_inner(state, request).await; + if result.is_err() { + emit_policy_decision_failure(PolicyDecisionOperation::Approve, 1); + } + result +} + +async fn handle_approve_draft_chunk_inner( + state: &Arc, + request: Request, ) -> Result, Status> { let req = request.into_inner(); if req.name.is_empty() { @@ -1608,6 +1707,8 @@ pub(super) async fn handle_approve_draft_chunk( policy_hash = %hash, "ApproveDraftChunk: rule merged successfully" ); + emit_sandbox_policy_update_success(); + emit_policy_decision_success(PolicyDecisionOperation::Approve, 1); Ok(Response::new(ApproveDraftChunkResponse { policy_version: u32::try_from(version).unwrap_or(0), @@ -1618,6 +1719,17 @@ pub(super) async fn handle_approve_draft_chunk( pub(super) async fn handle_reject_draft_chunk( state: &Arc, request: Request, +) -> Result, Status> { + let result = handle_reject_draft_chunk_inner(state, request).await; + if result.is_err() { + emit_policy_decision_failure(PolicyDecisionOperation::Reject, 1); + } + result +} + +async fn handle_reject_draft_chunk_inner( + state: &Arc, + request: Request, ) -> Result, Status> { let req = request.into_inner(); if req.name.is_empty() { @@ -1677,6 +1789,7 @@ pub(super) async fn 
handle_reject_draft_chunk( version, &hash, ); + emit_sandbox_policy_update_success(); } let now_ms = current_time_ms(); @@ -1695,6 +1808,7 @@ pub(super) async fn handle_reject_draft_chunk( .map_err(|e| Status::internal(format!("update chunk status failed: {e}")))?; state.sandbox_watch_bus.notify(&sandbox_id); + emit_policy_decision_success(PolicyDecisionOperation::Reject, 1); Ok(Response::new(RejectDraftChunkResponse {})) } @@ -1702,6 +1816,17 @@ pub(super) async fn handle_reject_draft_chunk( pub(super) async fn handle_approve_all_draft_chunks( state: &Arc, request: Request, +) -> Result, Status> { + let result = handle_approve_all_draft_chunks_inner(state, request).await; + if result.is_err() { + emit_policy_decision_failure(PolicyDecisionOperation::ApproveAll, 0); + } + result +} + +async fn handle_approve_all_draft_chunks_inner( + state: &Arc, + request: Request, ) -> Result, Status> { let req = request.into_inner(); if req.name.is_empty() { @@ -1784,6 +1909,7 @@ pub(super) async fn handle_approve_all_draft_chunks( &last_hash, ); chunks_approved += 1; + emit_sandbox_policy_update_success(); } state.sandbox_watch_bus.notify(&sandbox_id); @@ -1806,6 +1932,10 @@ pub(super) async fn handle_approve_all_draft_chunks( policy_hash = %last_hash, "ApproveAllDraftChunks: bulk approval complete" ); + emit_policy_decision_success( + PolicyDecisionOperation::ApproveAll, + u64::from(chunks_approved), + ); Ok(Response::new(ApproveAllDraftChunksResponse { policy_version: u32::try_from(last_version).unwrap_or(0), @@ -1871,6 +2001,17 @@ pub(super) async fn handle_edit_draft_chunk( pub(super) async fn handle_undo_draft_chunk( state: &Arc, request: Request, +) -> Result, Status> { + let result = handle_undo_draft_chunk_inner(state, request).await; + if result.is_err() { + emit_policy_decision_failure(PolicyDecisionOperation::Undo, 1); + } + result +} + +async fn handle_undo_draft_chunk_inner( + state: &Arc, + request: Request, ) -> Result, Status> { let req = request.into_inner(); 
/// Sandbox-originated `UpdateConfig` calls are excluded from policy-update
/// telemetry; other callers are included.
#[test]
fn sandbox_caller_policy_sync_does_not_emit_policy_update_telemetry() {
    for (sandbox_caller, expected) in [(true, false), (false, true)] {
        assert_eq!(
            (
                sandbox_caller,
                should_emit_config_update_policy_telemetry(sandbox_caller)
            ),
            (sandbox_caller, expected)
        );
    }
}

/// Version 1 is not reported as an update, and sandbox callers are never
/// reported regardless of version.
#[test]
fn first_policy_revision_does_not_emit_policy_update_telemetry() {
    let cases = [
        ((false, 1), false),
        ((true, 2), false),
        ((false, 2), true),
    ];
    for ((sandbox_caller, next_version), expected) in cases {
        assert_eq!(
            (
                sandbox_caller,
                next_version,
                should_emit_full_policy_update_telemetry(sandbox_caller, next_version)
            ),
            (sandbox_caller, next_version, expected)
        );
    }
}
pub(super) async fn handle_create_provider( request: Request, ) -> Result, Status> { let req = request.into_inner(); - let provider = req - .provider - .ok_or_else(|| Status::invalid_argument("provider is required"))?; - let provider = create_provider_record(state.store.as_ref(), provider).await?; - - Ok(Response::new(ProviderResponse { - provider: Some(provider), - })) + let Some(provider) = req.provider else { + emit_provider_lifecycle( + "custom", + LifecycleOperation::Create, + TelemetryOutcome::Failure, + ); + return Err(Status::invalid_argument("provider is required")); + }; + let provider_type = provider.r#type.clone(); + let result = create_provider_record(state.store.as_ref(), provider).await; + match result { + Ok(provider) => { + emit_provider_lifecycle( + &provider.r#type, + LifecycleOperation::Create, + TelemetryOutcome::Success, + ); + Ok(Response::new(ProviderResponse { + provider: Some(provider), + })) + } + Err(err) => { + emit_provider_lifecycle( + &provider_type, + LifecycleOperation::Create, + TelemetryOutcome::Failure, + ); + Err(err) + } + } } pub(super) async fn handle_get_provider( @@ -1093,17 +1118,39 @@ pub(super) async fn handle_update_provider( request: Request, ) -> Result, Status> { let req = request.into_inner(); - let mut provider = req - .provider - .ok_or_else(|| Status::invalid_argument("provider is required"))?; + let Some(mut provider) = req.provider else { + emit_provider_lifecycle( + "custom", + LifecycleOperation::Update, + TelemetryOutcome::Failure, + ); + return Err(Status::invalid_argument("provider is required")); + }; + let provider_type = provider.r#type.clone(); provider .credential_expires_at_ms .extend(req.credential_expires_at_ms); - let provider = update_provider_record(state.store.as_ref(), provider).await?; - - Ok(Response::new(ProviderResponse { - provider: Some(provider), - })) + let result = update_provider_record(state.store.as_ref(), provider).await; + match result { + Ok(provider) => { + 
emit_provider_lifecycle( + &provider.r#type, + LifecycleOperation::Update, + TelemetryOutcome::Success, + ); + Ok(Response::new(ProviderResponse { + provider: Some(provider), + })) + } + Err(err) => { + emit_provider_lifecycle( + &provider_type, + LifecycleOperation::Update, + TelemetryOutcome::Failure, + ); + Err(err) + } + } } pub(super) async fn handle_get_provider_refresh_status( @@ -1442,9 +1489,69 @@ pub(super) async fn handle_delete_provider( request: Request, ) -> Result, Status> { let name = request.into_inner().name; - let deleted = delete_provider_record(state.store.as_ref(), &name).await?; + let provider_profile = provider_profile_for_name(state.store.as_ref(), &name).await; + let result = delete_provider_record(state.store.as_ref(), &name).await; + match result { + Ok(deleted) => { + let outcome = TelemetryOutcome::from_success(deleted); + emit_provider_profile_lifecycle( + provider_profile.unwrap_or(TelemetryProviderProfile::Custom), + LifecycleOperation::Delete, + outcome, + ); + Ok(Response::new(DeleteProviderResponse { deleted })) + } + Err(err) => { + emit_provider_profile_lifecycle( + provider_profile.unwrap_or(TelemetryProviderProfile::Custom), + LifecycleOperation::Delete, + TelemetryOutcome::Failure, + ); + Err(err) + } + } +} - Ok(Response::new(DeleteProviderResponse { deleted })) +fn emit_provider_lifecycle( + provider_type: &str, + operation: LifecycleOperation, + outcome: TelemetryOutcome, +) { + let provider_profile = telemetry_provider_profile(provider_type); + emit_provider_profile_lifecycle(provider_profile, operation, outcome); +} + +fn emit_provider_profile_lifecycle( + provider_profile: TelemetryProviderProfile, + operation: LifecycleOperation, + outcome: TelemetryOutcome, +) { + openshell_core::telemetry::emit_provider_lifecycle(operation, outcome, provider_profile); +} + +async fn provider_profile_for_name(store: &Store, name: &str) -> Option { + store + .get_message_by_name::(name) + .await + .ok() + .flatten() + .map(|provider| 
telemetry_provider_profile(&provider.r#type)) +} + +fn telemetry_provider_profile(provider_type: &str) -> TelemetryProviderProfile { + match normalize_provider_type(provider_type) { + Some("anthropic") => TelemetryProviderProfile::Anthropic, + Some("claude" | "claude-code") => TelemetryProviderProfile::Claude, + Some("codex") => TelemetryProviderProfile::Codex, + Some("copilot") => TelemetryProviderProfile::Copilot, + Some("github") => TelemetryProviderProfile::Github, + Some("gitlab") => TelemetryProviderProfile::Gitlab, + Some("nvidia") => TelemetryProviderProfile::Nvidia, + Some("openai") => TelemetryProviderProfile::Openai, + Some("opencode") => TelemetryProviderProfile::Opencode, + Some("outlook") => TelemetryProviderProfile::Outlook, + _ => TelemetryProviderProfile::Custom, + } } // --------------------------------------------------------------------------- @@ -1484,6 +1591,46 @@ mod tests { assert!(!is_valid_env_key("X;rm -rf /")); } + #[test] + fn telemetry_provider_profile_maps_unknown_to_custom() { + assert_eq!( + telemetry_provider_profile("CLAUDE"), + TelemetryProviderProfile::Claude + ); + assert_eq!( + telemetry_provider_profile("github"), + TelemetryProviderProfile::Github + ); + assert_eq!( + telemetry_provider_profile("gh"), + TelemetryProviderProfile::Github + ); + assert_eq!( + telemetry_provider_profile("glab"), + TelemetryProviderProfile::Gitlab + ); + assert_eq!( + telemetry_provider_profile("outlook"), + TelemetryProviderProfile::Outlook + ); + assert_eq!( + telemetry_provider_profile("generic"), + TelemetryProviderProfile::Custom + ); + assert_eq!( + telemetry_provider_profile("unknown-private"), + TelemetryProviderProfile::Custom + ); + assert_eq!( + telemetry_provider_profile("acme-internal"), + TelemetryProviderProfile::Custom + ); + assert_eq!( + telemetry_provider_profile("corp-llm-prod"), + TelemetryProviderProfile::Custom + ); + } + fn provider_with_values(name: &str, provider_type: &str) -> Provider { Provider { metadata: 
Some(openshell_core::proto::datamodel::v1::ObjectMeta { diff --git a/crates/openshell-server/src/grpc/sandbox.rs b/crates/openshell-server/src/grpc/sandbox.rs index 4978687ed..75d3978ab 100644 --- a/crates/openshell-server/src/grpc/sandbox.rs +++ b/crates/openshell-server/src/grpc/sandbox.rs @@ -23,6 +23,10 @@ use openshell_core::proto::{ TcpRelayTarget, WatchSandboxRequest, relay_open, tcp_forward_init, }; use openshell_core::proto::{Sandbox, SandboxPhase, SandboxTemplate, SshSession}; +use openshell_core::telemetry::{ + LifecycleOperation, LifecycleResource, SandboxTemplateSource, TelemetryComputeDriver, + TelemetryOutcome, +}; use openshell_core::{ObjectId, ObjectName}; use prost::Message; use std::net::IpAddr; @@ -56,6 +60,62 @@ const TCP_FORWARD_CHUNK_SIZE: usize = 64 * 1024; pub(super) async fn handle_create_sandbox( state: &Arc, request: Request, +) -> Result, Status> { + let create_request = request.get_ref().clone(); + let result = handle_create_sandbox_inner(state, request).await; + emit_sandbox_create_telemetry( + state, + &create_request, + TelemetryOutcome::from_success(result.is_ok()), + ); + result +} + +fn emit_sandbox_create_telemetry( + state: &Arc, + request: &CreateSandboxRequest, + outcome: TelemetryOutcome, +) { + let compute_driver = telemetry_compute_driver(state.compute.driver_kind()); + let Some(spec) = request.spec.as_ref() else { + openshell_core::telemetry::emit_sandbox_create( + outcome, + false, + 0, + false, + SandboxTemplateSource::Undefined, + compute_driver, + ); + return; + }; + let template_source = if spec + .template + .as_ref() + .is_some_and(|template| !template.image.trim().is_empty()) + { + SandboxTemplateSource::Image + } else { + SandboxTemplateSource::Default + }; + openshell_core::telemetry::emit_sandbox_create( + outcome, + spec.gpu, + spec.providers.len() as u64, + spec.policy.is_some(), + template_source, + compute_driver, + ); +} + +fn telemetry_compute_driver( + driver_kind: Option, +) -> TelemetryComputeDriver { 
+ TelemetryComputeDriver::from_driver_kind(driver_kind) +} + +async fn handle_create_sandbox_inner( + state: &Arc, + request: Request, ) -> Result, Status> { use crate::persistence::current_time_ms; @@ -392,13 +452,40 @@ pub(super) async fn handle_detach_sandbox_provider( pub(super) async fn handle_delete_sandbox( state: &Arc, request: Request, +) -> Result, Status> { + let result = handle_delete_sandbox_inner(state, request).await; + let outcome = match &result { + Ok(response) if response.get_ref().deleted => TelemetryOutcome::Success, + _ => TelemetryOutcome::Failure, + }; + openshell_core::telemetry::emit_lifecycle( + LifecycleResource::Sandbox, + LifecycleOperation::Delete, + outcome, + ); + result +} + +async fn handle_delete_sandbox_inner( + state: &Arc, + request: Request, ) -> Result, Status> { let name = request.into_inner().name; if name.is_empty() { return Err(Status::invalid_argument("name is required")); } + let sandbox_id = state + .store + .get_message_by_name::(&name) + .await + .ok() + .flatten() + .map(|sandbox| sandbox.object_id().to_string()); let deleted = state.compute.delete_sandbox(&name).await?; + if deleted && let Some(sandbox_id) = sandbox_id { + state.telemetry.end_sandbox_session(&sandbox_id); + } info!(sandbox_name = %name, "DeleteSandbox request completed successfully"); Ok(Response::new(DeleteSandboxResponse { deleted })) } @@ -1854,6 +1941,30 @@ mod tests { // ---- shell_escape ---- + #[test] + fn telemetry_compute_driver_uses_resolved_driver_kind() { + assert_eq!( + telemetry_compute_driver(Some(openshell_core::ComputeDriverKind::Docker)), + TelemetryComputeDriver::Docker + ); + assert_eq!( + telemetry_compute_driver(Some(openshell_core::ComputeDriverKind::Kubernetes)), + TelemetryComputeDriver::Kubernetes + ); + assert_eq!( + telemetry_compute_driver(Some(openshell_core::ComputeDriverKind::Podman)), + TelemetryComputeDriver::Podman + ); + assert_eq!( + telemetry_compute_driver(Some(openshell_core::ComputeDriverKind::Vm)), + 
TelemetryComputeDriver::Vm + ); + assert_eq!( + telemetry_compute_driver(None), + TelemetryComputeDriver::Unknown + ); + } + #[test] fn shell_escape_safe_chars_pass_through() { assert_eq!(shell_escape("ls").unwrap(), "ls"); diff --git a/crates/openshell-server/src/lib.rs b/crates/openshell-server/src/lib.rs index 220e45026..92c67ecac 100644 --- a/crates/openshell-server/src/lib.rs +++ b/crates/openshell-server/src/lib.rs @@ -37,6 +37,7 @@ mod sandbox_watch; mod service_routing; mod ssh_sessions; pub mod supervisor_session; +mod telemetry; mod tls; pub mod tracing_bus; mod ws_tunnel; @@ -84,6 +85,9 @@ pub struct ServerState { /// In-memory bus for server process logs. pub tracing_log_bus: TracingLogBus, + /// In-memory anonymous telemetry accounting for active sandbox sessions. + pub(crate) telemetry: telemetry::TelemetryState, + /// Active SSH tunnel connection counts per session token. pub ssh_connections_by_token: Mutex>, @@ -144,6 +148,7 @@ impl ServerState { sandbox_index, sandbox_watch_bus, tracing_log_bus, + telemetry: telemetry::TelemetryState::new(), ssh_connections_by_token: Mutex::new(HashMap::new()), ssh_connections_by_sandbox: Mutex::new(HashMap::new()), settings_mutex: tokio::sync::Mutex::new(()), diff --git a/crates/openshell-server/src/supervisor_session.rs b/crates/openshell-server/src/supervisor_session.rs index 91f40c289..3e3e753f9 100644 --- a/crates/openshell-server/src/supervisor_session.rs +++ b/crates/openshell-server/src/supervisor_session.rs @@ -630,6 +630,8 @@ pub async fn handle_connect_supervisor( error = %err, "supervisor session: failed to mark sandbox ready" ); + } else { + state.telemetry.sandbox_session_connected(&sandbox_id); } // Step 4: Spawn the session loop that reads inbound messages. 
@@ -650,6 +652,9 @@ pub async fn handle_connect_supervisor( .remove_if_current(&sandbox_id_clone, &session_id); if still_ours { info!(sandbox_id = %sandbox_id_clone, session_id = %session_id, "supervisor session: ended"); + state_clone + .telemetry + .sandbox_session_disconnected(&sandbox_id_clone); if let Err(err) = state_clone .compute .supervisor_session_disconnected(&sandbox_id_clone) diff --git a/crates/openshell-server/src/telemetry.rs b/crates/openshell-server/src/telemetry.rs new file mode 100644 index 000000000..7de21154e --- /dev/null +++ b/crates/openshell-server/src/telemetry.rs @@ -0,0 +1,104 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Anonymous sandbox activity telemetry forwarding. + +use openshell_core::proto::NetworkActivitySummary; +use openshell_core::telemetry::DenyGroup; +#[cfg(not(test))] +use std::collections::HashMap; + +#[derive(Debug, Default)] +pub struct TelemetryState; + +#[allow(clippy::unused_self)] +impl TelemetryState { + pub fn new() -> Self { + Self + } + + pub fn sandbox_session_connected(&self, _sandbox_id: &str) {} + + pub fn sandbox_session_disconnected(&self, _sandbox_id: &str) {} + + pub fn end_sandbox_session(&self, _sandbox_id: &str) {} + + pub fn record_network_activity(&self, sandbox_id: &str, summary: &NetworkActivitySummary) { + if sandbox_id.is_empty() || !openshell_core::telemetry::enabled() { + return; + } + #[cfg(not(test))] + emit_network_activity_summary(summary); + #[cfg(test)] + let _ = summary; + } +} + +#[allow(clippy::cast_precision_loss)] +fn calculate_denial_rate_pct(network_activity_count: u64, denied_action_count: u64) -> f64 { + if network_activity_count == 0 { + return 0.0; + } + ((denied_action_count as f64 / network_activity_count as f64) * 100.0).clamp(0.0, 100.0) +} + +fn sanitize_deny_group(raw: &str) -> DenyGroup { + DenyGroup::from_raw(raw) +} + +#[cfg(not(test))] +fn 
emit_network_activity_summary(summary: &NetworkActivitySummary) { + let mut denials_by_group = HashMap::::new(); + for group in &summary.denials_by_group { + let deny_group = sanitize_deny_group(&group.deny_group); + let entry = denials_by_group.entry(deny_group).or_default(); + *entry = entry.saturating_add(u64::from(group.denied_count)); + } + openshell_core::telemetry::emit_sandbox_activity_summary( + u64::from(summary.network_activity_count), + u64::from(summary.denied_action_count), + calculate_denial_rate_pct( + u64::from(summary.network_activity_count), + u64::from(summary.denied_action_count), + ), + denials_by_group, + ); +} + +#[cfg(test)] +mod tests { + use super::*; + + fn assert_float_eq(actual: f64, expected: f64) { + assert!((actual - expected).abs() <= f64::EPSILON); + } + + #[test] + fn denial_rate_handles_empty_and_clamps() { + assert_float_eq(calculate_denial_rate_pct(0, 1), 0.0); + assert_float_eq(calculate_denial_rate_pct(10, 2), 20.0); + assert_float_eq(calculate_denial_rate_pct(10, 15), 100.0); + } + + #[test] + fn deny_group_sanitization_drops_raw_values() { + assert_eq!(sanitize_deny_group("forward-l7-deny"), DenyGroup::L7Policy); + assert_eq!( + sanitize_deny_group("host=/secret.example"), + DenyGroup::Unknown + ); + assert_eq!(sanitize_deny_group("acme.internal:443"), DenyGroup::Unknown); + assert_eq!( + sanitize_deny_group("binary=/usr/local/bin/private"), + DenyGroup::Unknown + ); + } + + #[test] + fn session_lifecycle_hooks_are_noops() { + let telemetry = TelemetryState::new(); + telemetry.sandbox_session_connected("sb-1"); + telemetry.sandbox_session_disconnected("sb-1"); + telemetry.end_sandbox_session("sb-1"); + } +} diff --git a/proto/openshell.proto b/proto/openshell.proto index ca62646e3..937fa67e1 100644 --- a/proto/openshell.proto +++ b/proto/openshell.proto @@ -1426,6 +1426,25 @@ message DenialSummary { bool l7_inspection_active = 17; } +// Count of denied actions grouped only by sanitized telemetry category. 
// Count of denied actions grouped only by sanitized telemetry category.
message DenialGroupCount {
  // Sanitized denial category, e.g. "connect_policy", "l7_policy", "ssrf".
  // The gateway converts this string into its own deny-group enum before
  // forwarding; values it does not recognise are reported as "unknown".
  string deny_group = 1;
  // Number of denied actions in this category.
  uint32 denied_count = 2;
}

// Anonymous sandbox network activity counters. This intentionally excludes
// hosts, paths, binaries, raw deny reasons, sandbox IDs, and user content.
message NetworkActivitySummary {
  // Total observed network activities in the current window.
  // NOTE(review): the length of a "window" is not defined here — document it
  // where the summaries are produced.
  uint32 network_activity_count = 1;
  // Total denied actions in the current window. The gateway derives a denial
  // rate from this and network_activity_count and clamps it to [0, 100], so a
  // value larger than network_activity_count does not produce a rate above 100%.
  uint32 denied_action_count = 2;
  // Denied action counts grouped by sanitized category. Entries that map to
  // the same category are summed by the gateway.
  repeated DenialGroupCount denials_by_group = 3;
}