From ebe666c2d63058c47f471330cb638f04fc0f8244 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Wed, 1 Jul 2026 17:48:20 +0200 Subject: [PATCH 1/4] feat(output): add basic opentelemetry output This output allows exporting file activity events as opentelemetry logs via otlp to be collected by compatible systems. TODO: * Add feature flag * Add configuration * Add integration tests * Add docs on how to use this output --- Cargo.lock | 442 ++++++++++++++++++++++++++++++++++++ fact/Cargo.toml | 3 + fact/src/event/mod.rs | 117 ++++++++++ fact/src/event/process.rs | 52 ++++- fact/src/output/mod.rs | 4 + fact/src/output/otel/mod.rs | 59 +++++ 6 files changed, 676 insertions(+), 1 deletion(-) create mode 100644 fact/src/output/otel/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 6930c0bd..91fdc4e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -414,6 +414,17 @@ dependencies = [ "thiserror", ] +[[package]] +name = "displaydoc" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac70aa55017e108007fbaf5aa0f54b021c98f92ff8af59d42eda9da96e3dd4f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "dtoa" version = "1.0.11" @@ -493,6 +504,9 @@ dependencies = [ "log", "native-tls", "openssl", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "prometheus-client", "prost", "prost-types", @@ -585,6 +599,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" +[[package]] +name = "form_urlencoded" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" +dependencies = [ + "percent-encoding", +] + [[package]] name = "futures-channel" version = "0.3.32" @@ -592,6 +615,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -600,6 +624,34 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.32" @@ -625,8 +677,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "slab", ] @@ -871,13 +926,16 @@ version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ + "base64", "bytes", "futures-channel", "futures-util", "http", "http-body", "hyper", + "ipnet", "libc", + "percent-encoding", "pin-project-lite", "socket2", "tokio", @@ -885,6 +943,109 @@ dependencies = [ "tracing", ] +[[package]] +name = "icu_collections" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2984d1cd16c883d7935b9e07e44071dca8d917fd52ecc02c04d5fa0b5a3f191c" +dependencies = [ + "displaydoc", + "potential_utf", + "utf8_iter", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92219b62b3e2b4d88ac5119f8904c10f8f61bf7e95b640d25ba3075e6cac2c29" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c56e5ee99d6e3d33bd91c5d85458b6005a22140021cc324cea84dd0e72cff3b4" +dependencies = [ + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da3be0ae77ea334f4da67c12f149704f19f81d1adf7c51cf482943e84a2bad38" + +[[package]] +name = "icu_properties" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bee3b67d0ea5c2cca5003417989af8996f8604e34fb9ddf96208a033901e70de" +dependencies = [ + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e2bbb201e0c04f7b4b3e14382af113e17ba4f63e2c9d2ee626b720cbce54a14" + +[[package]] +name = "icu_provider" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139c4cf31c8b5f33d7e199446eff9c1e02decfc2f0eec2c8d71f65befa45b421" +dependencies = [ + "displaydoc", + "icu_locale_core", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + +[[package]] +name = "idna" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb68373c0d6620ef8105e855e7745e18b0d00d3bdb07fb532e434244cdb9a714" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + [[package]] name = "indexmap" version = "2.14.0" @@ -895,6 +1056,12 @@ dependencies = [ "hashbrown 0.17.1", ] +[[package]] +name = "ipnet" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -983,6 +1150,12 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" +[[package]] +name = "litemap" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" + [[package]] name = "lock_api" version = "0.4.14" @@ -1139,6 +1312,76 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0142c63252a9e054e68a4c61a5778f7b14f576274d593f8ce883d191a099682" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5683015d09e2df236ef005b17f6f196f0d5f6313c4fa43a7b6a53b52776e4331" +dependencies = [ + "async-trait", + "bytes", + "http", + "opentelemetry", + "reqwest", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9966929966d17620d7c316c643ba62631826e10021409357772d5eea84f62c35" +dependencies = [ + "http", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest", + "thiserror", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56d658ba1faf63f7b9c492cfbe6e0ec365440a16132d3270c1065f7b33f1b638" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b59f80e1ac4d5ff7a2db8fb6c80badb7f0f3f858211fba08dd9aaec750894f9" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", + "percent-encoding", + "portable-atomic", + "rand 0.9.4", + "thiserror", +] + [[package]] name = "parking_lot" version = "0.12.5" @@ -1226,6 +1469,15 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "potential_utf" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564" +dependencies = [ + "zerovec", +] + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1501,6 +1753,37 @@ version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6f6ff9a378485b298a5286656da665ba74413d36db0979633275d2e708145d4" +[[package]] +name = "reqwest" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "219c5811de6525e5416c7d5d53bb656d3afdbc6c5af816e0802bcfa42dbdc1c3" +dependencies = [ + "base64", + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "rustc-hash" version = "2.1.3" @@ -1660,6 +1943,12 @@ dependencies = [ "lock_api", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" + [[package]] name = "strsim" version = "0.11.1" @@ -1682,6 +1971,20 @@ name = "sync_wrapper" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] + +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "tempfile" @@ -1716,6 +2019,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tinystr" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tokio" version = "1.52.3" @@ -1865,6 +2178,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" +dependencies = [ + "bitflags 2.13.0", + "bytes", + "futures-util", + "http", + "http-body", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", + "url", +] + [[package]] name = "tower-layer" version = "0.3.3" @@ -1926,6 +2257,24 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "url" +version = "2.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", +] + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -1986,6 +2335,16 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c62df1340f32221cb9c54d6a27b030e3dba64361d4a95bed55f9aacb44da291d" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.126" @@ -2081,6 +2440,12 @@ version = "0.57.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" +[[package]] +name = "writeable" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" + [[package]] name = "yaml-rust2" version = "0.11.0" @@ -2092,6 +2457,29 @@ dependencies = [ "hashlink", ] +[[package]] +name = "yoke" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "709fe23a0424b6a435d82152b1bd3fdfb0833487d5fa90d05d42762a9891fef5" +dependencies = [ + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "zerocopy" version = "0.8.52" @@ -2112,6 +2500,60 @@ dependencies = [ "syn", ] +[[package]] +name = "zerofrom" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ec05a11813ea801ff6d75110ad09cd0824ddba17dfe17128ea0d5f68e6c5272" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerotrie" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f9152d31db0792fa83f70fb2f83148effb5c1f5b8c7686c3459e361d9bc20bf" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zmij" version = "1.0.21" diff --git a/fact/Cargo.toml b/fact/Cargo.toml index 8bba7aff..3501995d 100644 --- a/fact/Cargo.toml +++ b/fact/Cargo.toml @@ -20,6 +20,9 @@ hyper-util = { workspace = true } libc = { workspace = true } log = { workspace = true } native-tls = { workspace = true } +opentelemetry = { version = "0.32.0" } +opentelemetry-otlp = { version = "0.32.0" } +opentelemetry_sdk = { version = "0.32.1", features = [ "logs"] } openssl = { workspace = true } tonic = { workspace = true } tokio = { workspace = true } diff --git a/fact/src/event/mod.rs b/fact/src/event/mod.rs index a91c5f00..c863a14b 100644 --- a/fact/src/event/mod.rs +++ b/fact/src/event/mod.rs @@ -1,12 +1,14 @@ #[cfg(all(test, feature = "bpf-test"))] use std::time::{SystemTime, UNIX_EPOCH}; use std::{ + collections::HashMap, ffi::{CStr, OsStr}, os::{raw::c_char, unix::ffi::OsStrExt}, path::{Path, PathBuf}, }; use globset::GlobSet; +use opentelemetry::logs::AnyValue; use serde::Serialize; use fact_ebpf::{ @@ -357,6 +359,17 @@ impl From for fact_api::FileActivity { } } +impl From for opentelemetry::logs::AnyValue { + fn from(value: Event) -> Self { + AnyValue::Map(Box::new(HashMap::from([ + ("file".into(), value.file.into()), + ("timestamp".into(), AnyValue::Int(value.timestamp as i64)), + ("process".into(), value.process.into()), + ("hostname".into(), value.hostname.into()), + ]))) + } +} + #[cfg(test)] impl PartialEq for Event { fn eq(&self, other: &Self) -> bool { @@ -444,6 +457,21 @@ impl FileData { Ok(file) } + + fn event_type(&self) -> &'static str { + match self { + FileData::Open(_) => "open", + FileData::Creation(_) => "creation", + FileData::MkDir(_) => "mkdir", + FileData::RmDir(_) => "rmdir", + FileData::Unlink(_) => "unlink", + FileData::Chmod(_) => "chmod", + FileData::Chown(_) => "chown", + FileData::Rename(_) => "rename", + FileData::SetXattr(_) => "setxattr", + FileData::RemoveXattr(_) => "removexattr", + } + } } impl From for fact_api::file_activity::File { @@ -494,6 +522,29 @@ impl From for fact_api::file_activity::File { } } +impl From for opentelemetry::logs::AnyValue { + fn from(value: FileData) -> Self { + let event_type = value.event_type(); + let AnyValue::Map(mut map) = (match value { + FileData::Open(data) + | FileData::Creation(data) + | FileData::MkDir(data) + | FileData::RmDir(data) + | FileData::Unlink(data) => AnyValue::from(data), + FileData::Chmod(data) => AnyValue::from(data), + FileData::Chown(data) => AnyValue::from(data), + FileData::Rename(data) => AnyValue::from(data), + FileData::SetXattr(data) | FileData::RemoveXattr(data) => AnyValue::from(data), + }) else { + unreachable!("event data did not serialize to map"); + }; + + map.insert("event_type".into(), event_type.into()); + + AnyValue::Map(map) + } +} + #[cfg(test)] impl PartialEq for FileData { fn eq(&self, other: &Self) -> bool { @@ -554,6 +605,21 @@ impl From for fact_api::FileActivityBase { } } +impl From for opentelemetry::logs::AnyValue { + fn from(value: BaseFileData) -> Self { + AnyValue::Map(Box::new(HashMap::from([ + ( + "filename".into(), + value.filename.to_string_lossy().to_string().into(), + ), + ( + "host_path".into(), + value.host_file.to_string_lossy().to_string().into(), + ), + ]))) + } +} + #[derive(Debug, Clone, Serialize)] pub struct ChmodFileData { inner: BaseFileData, @@ -576,6 +642,20 @@ impl From for fact_api::FilePermissionChange { } } +impl From for opentelemetry::logs::AnyValue { + fn from(value: ChmodFileData) -> Self { + let AnyValue::Map(mut map) = value.inner.into() else { + unreachable!("inner value did not serialize to map"); + }; + map.extend([ + ("new_mode".into(), value.new_mode.into()), + ("old_mode".into(), value.old_mode.into()), + ]); + + AnyValue::Map(map) + } +} + #[cfg(test)] impl PartialEq for ChmodFileData { fn eq(&self, other: &Self) -> bool { @@ -613,6 +693,22 @@ impl From for fact_api::FileOwnershipChange { } } +impl From for opentelemetry::logs::AnyValue { + fn from(value: ChownFileData) -> Self { + let AnyValue::Map(mut map) = value.inner.into() else { + unreachable!("inner value did not serialize to map"); + }; + map.extend([ + ("new_uid".into(), value.new_uid.into()), + ("old_uid".into(), value.old_uid.into()), + ("new_gid".into(), value.new_gid.into()), + ("old_gid".into(), value.old_gid.into()), + ]); + + AnyValue::Map(map) + } +} + #[derive(Debug, Clone, Serialize)] pub struct RenameFileData { new: BaseFileData, @@ -630,6 +726,16 @@ impl From for fact_api::FileRename { } } +impl From for opentelemetry::logs::AnyValue { + fn from(value: RenameFileData) -> Self { + let AnyValue::Map(mut map) = value.new.into() else { + unreachable!("new value did not serialize to map"); + }; + map.insert("old".into(), value.old.into()); + AnyValue::Map(map) + } +} + #[cfg(test)] impl PartialEq for RenameFileData { fn eq(&self, other: &Self) -> bool { @@ -653,6 +759,17 @@ impl From for fact_api::FileXattrChange { } } +impl From for opentelemetry::logs::AnyValue { + fn from(value: XattrFileData) -> Self { + let AnyValue::Map(mut map) = value.inner.into() else { + unreachable!("inner value did not serialize to map"); + }; + map.insert("xattr_name".into(), value.xattr_name.into()); + + AnyValue::Map(map) + } +} + #[cfg(test)] impl PartialEq for XattrFileData { fn eq(&self, other: &Self) -> bool { diff --git a/fact/src/event/process.rs b/fact/src/event/process.rs index f295ac66..048b9a44 100644 --- a/fact/src/event/process.rs +++ b/fact/src/event/process.rs @@ -1,6 +1,7 @@ -use std::{ffi::CStr, path::PathBuf}; +use std::{collections::HashMap, ffi::CStr, path::PathBuf}; use fact_ebpf::{lineage_t, process_t}; +use opentelemetry::logs::AnyValue; use serde::Serialize; use uuid::Uuid; @@ -38,6 +39,18 @@ impl From for fact_api::process_signal::LineageInfo { } } +impl From for opentelemetry::logs::AnyValue { + fn from(value: Lineage) -> Self { + AnyValue::Map(Box::new(HashMap::from([ + ("uid".into(), value.uid.into()), + ( + "exec_path".into(), + value.exe_path.to_string_lossy().to_string().into(), + ), + ]))) + } +} + #[derive(Debug, Clone, Default, Serialize)] pub struct Process { comm: String, @@ -220,6 +233,43 @@ impl From for fact_api::ProcessSignal { } } +impl From for opentelemetry::logs::AnyValue { + fn from(value: Process) -> Self { + let args = value + .args + .into_iter() + .map(AnyValue::from) + .collect::>(); + + let lineage = value + .lineage + .into_iter() + .map(AnyValue::from) + .collect::>(); + + let mut map = HashMap::from([ + ("comm".into(), value.comm.into()), + ("args".into(), AnyValue::ListAny(Box::new(args))), + ( + "exe_path".into(), + value.exe_path.to_string_lossy().to_string().into(), + ), + ("uid".into(), value.uid.into()), + ("gid".into(), value.gid.into()), + ("login_uid".into(), value.login_uid.into()), + ("username".into(), value.username.into()), + ("in_root_mount_ns".into(), value.in_root_mount_ns.into()), + ("lineage".into(), AnyValue::ListAny(Box::new(lineage))), + ]); + + if let Some(container_id) = value.container_id { + map.insert("container_id".into(), container_id.into()); + } + + AnyValue::Map(Box::new(map)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/fact/src/output/mod.rs b/fact/src/output/mod.rs index 756c8eec..492fc4be 100644 --- a/fact/src/output/mod.rs +++ b/fact/src/output/mod.rs @@ -10,6 +10,7 @@ use tokio::{ use crate::{config::GrpcConfig, event::Event, metrics::OutputMetrics}; mod grpc; +mod otel; mod stdout; /// Starts all the output tasks. @@ -33,6 +34,8 @@ pub fn start( config.clone(), ); + let otel_client = otel::Client::new(broad_rx.resubscribe()); + // JSON client will only start if explicitly enabled or no other // output is active at startup if !grpc_client.is_enabled() || stdout_enabled { @@ -45,6 +48,7 @@ pub fn start( } let mut grpc_handle = grpc_client.start(); + otel_client.start(); tokio::spawn(async move { debug!("Starting output component..."); diff --git a/fact/src/output/otel/mod.rs b/fact/src/output/otel/mod.rs new file mode 100644 index 00000000..de55f589 --- /dev/null +++ b/fact/src/output/otel/mod.rs @@ -0,0 +1,59 @@ +use std::sync::Arc; + +use log::warn; +use opentelemetry::logs::{AnyValue, LogRecord, Logger, LoggerProvider, Severity}; +use opentelemetry_otlp::{LogExporter, WithExportConfig}; +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::logs::SdkLoggerProvider; +use tokio::sync::broadcast::{self, error::RecvError}; + +use crate::event::Event; + +pub(super) struct Client { + rx: broadcast::Receiver>, +} + +impl Client { + pub(super) fn new(rx: broadcast::Receiver>) -> Self { + Client { rx } + } + + pub(super) fn start(mut self) { + tokio::spawn(async move { + let exporter_otlp = LogExporter::builder() + .with_http() + .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) + .with_endpoint("http://127.0.0.1:4318/v1/logs") + .build() + .expect("Failed to create log exporter"); + + let logger_provider = SdkLoggerProvider::builder() + .with_batch_exporter(exporter_otlp) + .with_resource(Resource::builder().with_service_name("fact").build()) + .build(); + let logger = logger_provider.logger("fact"); + + loop { + match self.rx.recv().await { + Ok(event) => { + let event = Arc::unwrap_or_clone(event).into(); + let mut record = logger.create_log_record(); + record.set_severity_number(Severity::Info); + if let AnyValue::Map(map) = event { + for (k, v) in *map { + record.add_attribute(k, v); + } + } + logger.emit(record); + } + Err(RecvError::Closed) => break, + Err(RecvError::Lagged(n)) => { + warn!("oTel stream lagged, dropped {n} events"); + } + } + } + + let _ = logger_provider.shutdown(); + }); + } +} From c2d0d32cd870c952fbda74d428169fa27bcc0fca Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Fri, 3 Jul 2026 14:56:14 +0200 Subject: [PATCH 2/4] feat(otel): add basic configuration --- fact/src/config/mod.rs | 65 ++++++- fact/src/config/reloader.rs | 21 +++ fact/src/config/tests.rs | 358 +++++++++++++++++++++++------------- fact/src/lib.rs | 1 + fact/src/metrics/mod.rs | 8 + fact/src/output/mod.rs | 45 +++-- fact/src/output/otel.rs | 123 +++++++++++++ fact/src/output/otel/mod.rs | 59 ------ 8 files changed, 472 insertions(+), 208 deletions(-) create mode 100644 fact/src/output/otel.rs delete mode 100644 fact/src/output/otel/mod.rs diff --git a/fact/src/config/mod.rs b/fact/src/config/mod.rs index ff829eb5..88daa519 100644 --- a/fact/src/config/mod.rs +++ b/fact/src/config/mod.rs @@ -34,6 +34,7 @@ fn yaml_to_duration_secs(v: &Yaml) -> Option { pub struct FactConfig { paths: Option>, pub grpc: GrpcConfig, + pub otel: OTelConfig, pub endpoint: EndpointConfig, pub bpf: BpfConfig, skip_pre_flight: Option, @@ -72,7 +73,7 @@ impl FactConfig { )?; // Once file configuration is handled, apply CLI arguments - static CLI_ARGS: LazyLock = LazyLock::new(|| FactCli::parse().to_config()); + static CLI_ARGS: LazyLock = LazyLock::new(|| FactCli::parse().into_config()); config.update(&CLI_ARGS); Ok(config) @@ -84,6 +85,7 @@ impl FactConfig { } self.grpc.update(&from.grpc); + self.otel.update(&from.otel); self.endpoint.update(&from.endpoint); self.bpf.update(&from.bpf); @@ -196,6 +198,10 @@ impl TryFrom> for FactConfig { let grpc = v.as_hash().unwrap(); config.grpc = GrpcConfig::try_from(grpc)?; } + "otel" if v.is_hash() => { + let otel = v.as_hash().unwrap(); + config.otel = OTelConfig::try_from(otel)?; + } "endpoint" if v.is_hash() => { let endpoint = v.as_hash().unwrap(); config.endpoint = EndpointConfig::try_from(endpoint)?; @@ -492,6 +498,48 @@ impl TryFrom<&yaml::Hash> for GrpcConfig { } } +#[derive(Debug, Default, PartialEq, Eq, Clone)] +pub struct OTelConfig { + endpoint: Option, +} + +impl OTelConfig { + fn update(&mut self, from: &OTelConfig) { + if let Some(endpoint) = from.endpoint.as_deref() { + self.endpoint = Some(endpoint.to_owned()); + } + } + + pub fn endpoint(&self) -> Option<&str> { + self.endpoint.as_deref() + } +} + +impl TryFrom<&yaml::Hash> for OTelConfig { + type Error = anyhow::Error; + + fn try_from(value: &yaml::Hash) -> Result { + let mut otel = OTelConfig::default(); + for (k, v) in value.iter() { + let Some(k) = k.as_str() else { + bail!("key is not string: {k:?}"); + }; + + match k { + "endpoint" => { + let Some(endpoint) = v.as_str() else { + bail!("otel.endpoint field has incorrect type: {v:?}") + }; + otel.endpoint = Some(endpoint.to_owned()); + } + name => bail!("Invalid field 'otel.{name}' with value: {v:?}"), + } + } + + Ok(otel) + } +} + #[derive(Debug, Default, PartialEq, Eq, Clone)] pub struct BpfConfig { ringbuf_size: Option, @@ -627,6 +675,10 @@ pub struct FactCli { #[arg(long, env = "FACT_GRPC_BACKOFF_RETRIES_MAX")] backoff_retries_max: Option, + /// OpenTelemetry endpoint to push logs into + #[arg(long, env = "FACT_OTEL_ENDPOINT")] + otel_endpoint: Option, + /// The port to bind for all exposed endpoints #[arg(long, short, env = "FACT_ENDPOINT_ADDRESS")] address: Option, @@ -711,12 +763,12 @@ pub struct FactCli { } impl FactCli { - fn to_config(&self) -> FactConfig { + fn into_config(self) -> FactConfig { FactConfig { - paths: self.paths.clone(), + paths: self.paths, grpc: GrpcConfig { - url: self.url.clone(), - certs: self.certs.clone(), + url: self.url, + certs: self.certs, backoff: BackoffConfig { initial: self.backoff_initial, max: self.backoff_max, @@ -725,6 +777,9 @@ impl FactCli { retries_max: self.backoff_retries_max, }, }, + otel: OTelConfig { + endpoint: self.otel_endpoint, + }, endpoint: EndpointConfig { address: self.address, expose_metrics: resolve_bool_arg(self.expose_metrics, self.no_expose_metrics), diff --git a/fact/src/config/reloader.rs b/fact/src/config/reloader.rs index 2ac14742..48baecb7 100644 --- a/fact/src/config/reloader.rs +++ b/fact/src/config/reloader.rs @@ -9,12 +9,15 @@ use tokio::{ time::interval, }; +use crate::config::OTelConfig; + use super::{CONFIG_FILES, EndpointConfig, FactConfig, GrpcConfig}; pub struct Reloader { config: FactConfig, endpoint: watch::Sender, grpc: watch::Sender, + otel: watch::Sender, paths: watch::Sender>, files: HashMap<&'static str, i64>, scan_interval: watch::Sender, @@ -71,6 +74,12 @@ impl Reloader { self.grpc.subscribe() } + /// Subscribe to get notifications when otel configuration is + /// changed. + pub fn otel(&self) -> watch::Receiver { + self.otel.subscribe() + } + /// Subscribe to get notifications when paths configuration is /// changed. pub fn paths(&self) -> watch::Receiver> { @@ -174,6 +183,16 @@ impl Reloader { } }); + self.otel.send_if_modified(|old| { + if *old != new.otel { + debug!("Sending new OTel configuration..."); + *old = new.otel.clone(); + true + } else { + false + } + }); + self.paths.send_if_modified(|old| { let new = new.paths(); if *old != new { @@ -238,6 +257,7 @@ impl From for Reloader { .collect(); let (endpoint, _) = watch::channel(config.endpoint.clone()); let (grpc, _) = watch::channel(config.grpc.clone()); + let (otel, _) = watch::channel(config.otel.clone()); let (paths, _) = watch::channel(config.paths().to_vec()); let (scan_interval, _) = watch::channel(config.scan_interval()); let (rate_limit, _) = watch::channel(config.rate_limit()); @@ -247,6 +267,7 @@ impl From for Reloader { config, endpoint, grpc, + otel, paths, scan_interval, rate_limit, diff --git a/fact/src/config/tests.rs b/fact/src/config/tests.rs index 01edbbbf..57f4d401 100644 --- a/fact/src/config/tests.rs +++ b/fact/src/config/tests.rs @@ -49,6 +49,144 @@ fn parsing() { ..Default::default() }, ), + ( + r#" + grpc: + backoff: + initial: 2 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(2)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + max: 30 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(30)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + jitter: false + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + jitter: Some(false), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + multiplier: 2 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(2.0), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + multiplier: 3.5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(3.5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + retries: 5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + retries_max: Some(5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + initial: 0.5 + max: 120 + jitter: false + multiplier: 2 + retries: 5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs_f64(0.5)), + max: Some(Duration::from_secs(120)), + jitter: Some(false), + multiplier: Some(2.0), + retries_max: Some(5), + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + otel: + endpoint: http://localhost:4317 + "#, + FactConfig { + otel: OTelConfig { + endpoint: Some("http://localhost:4317".into()), + }, + ..Default::default() + }, + ), ( r#" endpoint: @@ -255,136 +393,12 @@ fn parsing() { ..Default::default() }, ), - ( - r#" - grpc: - backoff: - initial: 2 - "#, - FactConfig { - grpc: GrpcConfig { - backoff: BackoffConfig { - initial: Some(Duration::from_secs(2)), - ..Default::default() - }, - ..Default::default() - }, - ..Default::default() - }, - ), - ( - r#" - grpc: - backoff: - max: 30 - "#, - FactConfig { - grpc: GrpcConfig { - backoff: BackoffConfig { - max: Some(Duration::from_secs(30)), - ..Default::default() - }, - ..Default::default() - }, - ..Default::default() - }, - ), - ( - r#" - grpc: - backoff: - jitter: false - "#, - FactConfig { - grpc: GrpcConfig { - backoff: BackoffConfig { - jitter: Some(false), - ..Default::default() - }, - ..Default::default() - }, - ..Default::default() - }, - ), - ( - r#" - grpc: - backoff: - multiplier: 2 - "#, - FactConfig { - grpc: GrpcConfig { - backoff: BackoffConfig { - multiplier: Some(2.0), - ..Default::default() - }, - ..Default::default() - }, - ..Default::default() - }, - ), - ( - r#" - grpc: - backoff: - multiplier: 3.5 - "#, - FactConfig { - grpc: GrpcConfig { - backoff: BackoffConfig { - multiplier: Some(3.5), - ..Default::default() - }, - ..Default::default() - }, - ..Default::default() - }, - ), - ( - r#" - grpc: - backoff: - retries: 5 - "#, - FactConfig { - grpc: GrpcConfig { - backoff: BackoffConfig { - retries_max: Some(5), - ..Default::default() - }, - ..Default::default() - }, - ..Default::default() - }, - ), - ( - r#" - grpc: - backoff: - initial: 0.5 - max: 120 - jitter: false - multiplier: 2 - retries: 5 - "#, - FactConfig { - grpc: GrpcConfig { - backoff: BackoffConfig { - initial: Some(Duration::from_secs_f64(0.5)), - max: Some(Duration::from_secs(120)), - jitter: Some(false), - multiplier: Some(2.0), - retries_max: Some(5), - }, - ..Default::default() - }, - ..Default::default() - }, - ), ( r#" paths: - /etc + otel: + endpoint: 'http://localhost:4317' grpc: url: 'https://svc.sensor.stackrox:9090' certs: /etc/stackrox/certs @@ -419,6 +433,9 @@ fn parsing() { retries_max: Some(5), }, }, + otel: OTelConfig { + endpoint: Some("http://localhost:4317".into()), + }, endpoint: EndpointConfig { address: Some(SocketAddr::from(([0, 0, 0, 0], 8080))), expose_metrics: Some(true), @@ -596,6 +613,26 @@ paths: "#, "Invalid field 'grpc.backoff.unknown' with value: Integer(4)", ), + ( + r#" + otel: 5 + "#, + "Invalid field 'otel' with value: Integer(5)", + ), + ( + r#" + otel: + something: true + "#, + "Invalid field 'otel.something' with value: Boolean(true)", + ), + ( + r#" + otel: + endpoint: false + "#, + "otel.endpoint field has incorrect type: Boolean(false)", + ), ( "endpoint: true", "Invalid field 'endpoint' with value: Boolean(true)", @@ -1148,6 +1185,37 @@ fn update() { ..Default::default() }, ), + ( + r#" + otel: + endpoint: 'http://localhost:4317' + "#, + FactConfig::default(), + FactConfig { + otel: OTelConfig { + endpoint: Some(String::from("http://localhost:4317")), + }, + ..Default::default() + }, + ), + ( + r#" + otel: + endpoint: 'http://localhost:4317' + "#, + FactConfig { + otel: OTelConfig { + endpoint: Some(String::from("http://localhost:1234")), + }, + ..Default::default() + }, + FactConfig { + otel: OTelConfig { + endpoint: Some(String::from("http://localhost:4317")), + }, + ..Default::default() + }, + ), ( r#" endpoint: @@ -1516,6 +1584,8 @@ fn update() { jitter: false multiplier: 3.0 retries: 5 + otel: + endpoint: 'http://localhost:4317' endpoint: address: 127.0.0.1:8080 expose_metrics: true @@ -1541,6 +1611,9 @@ fn update() { retries_max: Some(20), }, }, + otel: OTelConfig { + endpoint: Some(String::from("http://localhost:1234")), + }, endpoint: EndpointConfig { address: Some(SocketAddr::from(([0, 0, 0, 0], 9000))), expose_metrics: Some(false), @@ -1569,6 +1642,9 @@ fn update() { retries_max: Some(5), }, }, + otel: OTelConfig { + endpoint: Some(String::from("http://localhost:4317")), + }, endpoint: EndpointConfig { address: Some(SocketAddr::from(([127, 0, 0, 1], 8080))), expose_metrics: Some(true), @@ -1678,7 +1754,7 @@ impl Display for EnvVar { fn with_env_var(env: EnvVar) -> Result { let _guard = env.set(); - FactCli::try_parse_from(["fact"]).map(|cli| cli.to_config()) + FactCli::try_parse_from(["fact"]).map(|cli| cli.into_config()) } #[test] @@ -1866,6 +1942,18 @@ fn env_vars() { ..Default::default() }, ), + ( + EnvVar { + name: "FACT_OTEL_ENDPOINT", + value: "http://localhost:4317", + }, + FactConfig { + otel: OTelConfig { + endpoint: Some(String::from("http://localhost:4317")), + }, + ..Default::default() + }, + ), ( EnvVar { name: "FACT_ENDPOINT_ADDRESS", @@ -2011,6 +2099,22 @@ fn env_vars_override_yaml() { ..Default::default() }, ), + ( + EnvVar { + name: "FACT_OTEL_ENDPOINT", + value: "http://localhost:4317", + }, + r#" + otel: + endpoint: 'http://localhost:1234' + "#, + FactConfig { + otel: OTelConfig { + endpoint: Some(String::from("http://localhost:4317")), + }, + ..Default::default() + }, + ), ( EnvVar { name: "FACT_PATHS", diff --git a/fact/src/lib.rs b/fact/src/lib.rs index 8ec187f6..94fe1cc4 100644 --- a/fact/src/lib.rs +++ b/fact/src/lib.rs @@ -118,6 +118,7 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { running.subscribe(), exporter.metrics.output.clone(), reloader.grpc(), + reloader.otel(), reloader.config().json(), ); let mut host_scanner_handle = host_scanner.start(); diff --git a/fact/src/metrics/mod.rs b/fact/src/metrics/mod.rs index 98297329..af8cdebc 100644 --- a/fact/src/metrics/mod.rs +++ b/fact/src/metrics/mod.rs @@ -101,6 +101,7 @@ impl EventCounter { pub struct OutputMetrics { pub stdout: EventCounter, pub grpc: EventCounter, + pub otel: EventCounter, } impl OutputMetrics { @@ -116,16 +117,23 @@ impl OutputMetrics { "Events processed by the grpc output component", &labels, ); + let otel_counter = EventCounter::new( + "output_otel_events", + "Events processed by the otel output component", + &labels, + ); OutputMetrics { stdout: stdout_counter, grpc: grpc_counter, + otel: otel_counter, } } fn register(&self, reg: &mut Registry) { self.stdout.register(reg); self.grpc.register(reg); + self.otel.register(reg); } } diff --git a/fact/src/output/mod.rs b/fact/src/output/mod.rs index 492fc4be..734c04f0 100644 --- a/fact/src/output/mod.rs +++ b/fact/src/output/mod.rs @@ -1,13 +1,17 @@ use std::{borrow::BorrowMut, sync::Arc}; -use anyhow::bail; use log::{debug, warn}; use tokio::{ sync::{broadcast, mpsc, watch}, task::JoinHandle, }; -use crate::{config::GrpcConfig, event::Event, metrics::OutputMetrics}; +use crate::{ + config::{GrpcConfig, OTelConfig}, + event::Event, + flatten_task_result, + metrics::OutputMetrics, +}; mod grpc; mod otel; @@ -21,7 +25,8 @@ pub fn start( mut rx: mpsc::Receiver, running: watch::Receiver, metrics: OutputMetrics, - config: watch::Receiver, + grpc_config: watch::Receiver, + otel_config: watch::Receiver, stdout_enabled: bool, ) -> JoinHandle> { let (broad_tx, broad_rx) = broadcast::channel(100); @@ -31,14 +36,19 @@ pub fn start( broad_rx.resubscribe(), running.clone(), metrics.grpc.clone(), - config.clone(), + grpc_config, ); - let otel_client = otel::Client::new(broad_rx.resubscribe()); + let otel_client = otel::Client::new( + broad_rx.resubscribe(), + running.clone(), + metrics.otel.clone(), + otel_config, + ); // JSON client will only start if explicitly enabled or no other // output is active at startup - if !grpc_client.is_enabled() || stdout_enabled { + if (!grpc_client.is_enabled() && !otel_client.is_enabled()) || stdout_enabled { stdout::Client::new( broad_rx.resubscribe(), running.clone(), @@ -48,15 +58,17 @@ pub fn start( } let mut grpc_handle = grpc_client.start(); - otel_client.start(); + let mut otel_handle = otel_client.start(); tokio::spawn(async move { debug!("Starting output component..."); - loop { + let res = loop { tokio::select! { event = rx.recv() => { let Some(event) = event else { - break; + // Channel has been closed and no more messages + // are present. + break Ok(()); }; if let Err(e) = broad_tx.send(Arc::new(event)) { @@ -64,16 +76,15 @@ pub fn start( } } res = grpc_handle.borrow_mut() => { - match res { - Ok(Ok(_)) => break, - Ok(Err(e)) => bail!("gRPC worker errored out: {e:?}"), - Err(e) => bail!("gRPC task errored out: {e:?}"), - } + break flatten_task_result("gRPC", res); + } + res = otel_handle.borrow_mut() => { + break flatten_task_result("oTel", res); } - _ = run.changed() => if !*run.borrow() { break; } + _ = run.changed() => if !*run.borrow() { break Ok(()); } } - } + }; debug!("Stopping output component..."); - Ok(()) + res }) } diff --git a/fact/src/output/otel.rs b/fact/src/output/otel.rs new file mode 100644 index 00000000..6d1d4c31 --- /dev/null +++ b/fact/src/output/otel.rs @@ -0,0 +1,123 @@ +use std::sync::Arc; + +use anyhow::bail; +use log::{debug, info, warn}; +use opentelemetry::logs::{AnyValue, LogRecord, Logger, LoggerProvider, Severity}; +use opentelemetry_otlp::{LogExporter, WithExportConfig}; +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::logs::SdkLoggerProvider; +use tokio::{ + sync::{ + broadcast::{self, error::RecvError}, + watch, + }, + task::JoinHandle, +}; + +use crate::{config::OTelConfig, event::Event, metrics::EventCounter}; + +pub(super) struct Client { + rx: broadcast::Receiver>, + running: watch::Receiver, + config: watch::Receiver, + metrics: EventCounter, +} + +impl Client { + pub(super) fn new( + rx: broadcast::Receiver>, + running: watch::Receiver, + metrics: EventCounter, + config: watch::Receiver, + ) -> Self { + Client { + rx, + running, + config, + metrics, + } + } + + pub(super) fn start(mut self) -> JoinHandle> { + tokio::spawn(async move { + loop { + let res = if self.is_enabled() { + self.run().await + } else { + self.idle().await + }; + + match res { + Ok(true) => info!("Reloading oTel configuration..."), + Ok(false) => { + info!("Stopping oTel output..."); + break; + } + Err(e) => bail!("oTel error: {e:?}"), + } + } + Ok(()) + }) + } + + async fn run(&mut self) -> anyhow::Result { + let Some(endpoint) = self.config.borrow().endpoint().map(|e| e.to_string()) else { + bail!("Attempted to unwrap empty endpoint"); + }; + debug!("oTel: forwarding events to {endpoint}"); + let exporter_otlp = LogExporter::builder() + .with_http() + .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) + .with_endpoint(endpoint) + .build() + .expect("Failed to create log exporter"); + + let logger_provider = SdkLoggerProvider::builder() + .with_batch_exporter(exporter_otlp) + .with_resource(Resource::builder().with_service_name("fact").build()) + .build(); + let logger = logger_provider.logger("fact"); + + let res = loop { + tokio::select! { + event = self.rx.recv() => { + match event { + Ok(event) => { + self.metrics.added(); + let event = Arc::unwrap_or_clone(event).into(); + let mut record = logger.create_log_record(); + record.set_severity_number(Severity::Info); + if let AnyValue::Map(map) = event { + for (k, v) in *map { + record.add_attribute(k, v); + } + } + logger.emit(record); + } + Err(RecvError::Closed) => break Err(anyhow::anyhow!("oTel: event stream closed")), + Err(RecvError::Lagged(n)) => { + warn!("oTel stream lagged, dropped {n} events"); + self.metrics.dropped_n(n); + } + } + } + _ = self.config.changed() => break Ok(true), + _ = self.running.changed() => break Ok(*self.running.borrow()), + } + }; + + logger_provider.shutdown()?; + res + } + + pub(super) fn is_enabled(&self) -> bool { + self.config.borrow().endpoint().is_some() + } + + async fn idle(&mut self) -> anyhow::Result { + tokio::select! { + _ = self.config.changed() => Ok(true), + _ = self.running.changed() => Ok(*self.running.borrow()), + } + } +} diff --git a/fact/src/output/otel/mod.rs b/fact/src/output/otel/mod.rs deleted file mode 100644 index de55f589..00000000 --- a/fact/src/output/otel/mod.rs +++ /dev/null @@ -1,59 +0,0 @@ -use std::sync::Arc; - -use log::warn; -use opentelemetry::logs::{AnyValue, LogRecord, Logger, LoggerProvider, Severity}; -use opentelemetry_otlp::{LogExporter, WithExportConfig}; -use opentelemetry_sdk::Resource; -use opentelemetry_sdk::logs::SdkLoggerProvider; -use tokio::sync::broadcast::{self, error::RecvError}; - -use crate::event::Event; - -pub(super) struct Client { - rx: broadcast::Receiver>, -} - -impl Client { - pub(super) fn new(rx: broadcast::Receiver>) -> Self { - Client { rx } - } - - pub(super) fn start(mut self) { - tokio::spawn(async move { - let exporter_otlp = LogExporter::builder() - .with_http() - .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) - .with_endpoint("http://127.0.0.1:4318/v1/logs") - .build() - .expect("Failed to create log exporter"); - - let logger_provider = SdkLoggerProvider::builder() - .with_batch_exporter(exporter_otlp) - .with_resource(Resource::builder().with_service_name("fact").build()) - .build(); - let logger = logger_provider.logger("fact"); - - loop { - match self.rx.recv().await { - Ok(event) => { - let event = Arc::unwrap_or_clone(event).into(); - let mut record = logger.create_log_record(); - record.set_severity_number(Severity::Info); - if let AnyValue::Map(map) = event { - for (k, v) in *map { - record.add_attribute(k, v); - } - } - logger.emit(record); - } - Err(RecvError::Closed) => break, - Err(RecvError::Lagged(n)) => { - warn!("oTel stream lagged, dropped {n} events"); - } - } - } - - let _ = logger_provider.shutdown(); - }); - } -} From 3c2a9dd11b05b4e1057a21aef139126d72e16319 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Fri, 3 Jul 2026 16:32:45 +0200 Subject: [PATCH 3/4] feat(output): change resubscription method The new way for output components to resubscribe the broadcast channel distributing events is to send a oneshot sender back to the main task which will resubscribe from the broadcast sender and forward the receiver back. While this approach might seem overly complicated, it ensures there will be no lingering receivers laying around, which in turn allows for Arc::unwrap_or_clone to properly unwrap all messages when only one output is in use. --- fact/src/output/grpc.rs | 38 ++++++++++++++++++++------------------ fact/src/output/mod.rs | 23 ++++++++++++++--------- fact/src/output/otel.rs | 19 ++++++++++--------- fact/src/output/stdout.rs | 20 +++++++++----------- 4 files changed, 53 insertions(+), 47 deletions(-) diff --git a/fact/src/output/grpc.rs b/fact/src/output/grpc.rs index 969a15fd..af547ec1 100644 --- a/fact/src/output/grpc.rs +++ b/fact/src/output/grpc.rs @@ -9,7 +9,7 @@ use native_tls::{Certificate, Identity}; use openssl::{ec::EcKey, pkey::PKey}; use tokio::{ fs, - sync::{broadcast, watch}, + sync::{mpsc, oneshot, watch}, task::JoinHandle, time::sleep, }; @@ -21,8 +21,8 @@ use tonic::transport::Channel; use crate::{ config::{BackoffConfig, GrpcConfig}, - event::Event, metrics::EventCounter, + output::EventReceiver, }; struct Backoff { @@ -104,7 +104,7 @@ impl From<&BackoffConfig> for Backoff { } pub struct Client { - rx: broadcast::Receiver>, + subscriber: mpsc::Sender>, running: watch::Receiver, config: watch::Receiver, metrics: EventCounter, @@ -112,13 +112,13 @@ pub struct Client { impl Client { pub fn new( - rx: broadcast::Receiver>, + subscriber: mpsc::Sender>, running: watch::Receiver, metrics: EventCounter, config: watch::Receiver, ) -> Self { Client { - rx, + subscriber, running, config, metrics, @@ -230,19 +230,21 @@ impl Client { let mut client = FileActivityServiceClient::new(channel); let metrics = self.metrics.clone(); - let rx = - BroadcastStream::new(self.rx.resubscribe()).filter_map(move |event| match event { - Ok(event) => { - metrics.added(); - let event = Arc::unwrap_or_clone(event); - Some(event.into()) - } - Err(BroadcastStreamRecvError::Lagged(n)) => { - warn!("gRPC stream lagged, dropped {n} events"); - metrics.dropped_n(n); - None - } - }); + let (tx, rx) = oneshot::channel(); + self.subscriber.send(tx).await?; + let rx = rx.await?; + let rx = BroadcastStream::new(rx).filter_map(move |event| match event { + Ok(event) => { + metrics.added(); + let event = Arc::unwrap_or_clone(event); + Some(event.into()) + } + Err(BroadcastStreamRecvError::Lagged(n)) => { + warn!("gRPC stream lagged, dropped {n} events"); + metrics.dropped_n(n); + None + } + }); tokio::select! { res = client.communicate(rx) => { diff --git a/fact/src/output/mod.rs b/fact/src/output/mod.rs index 734c04f0..ecbc48d3 100644 --- a/fact/src/output/mod.rs +++ b/fact/src/output/mod.rs @@ -17,6 +17,8 @@ mod grpc; mod otel; mod stdout; +type EventReceiver = broadcast::Receiver>; + /// Starts all the output tasks. /// /// Each task is responsible for managing its lifetime, handling @@ -29,18 +31,19 @@ pub fn start( otel_config: watch::Receiver, stdout_enabled: bool, ) -> JoinHandle> { - let (broad_tx, broad_rx) = broadcast::channel(100); + let (broad_tx, _) = broadcast::channel(100); + let (subs_req, mut subs_rx) = mpsc::channel(10); let mut run = running.clone(); let grpc_client = grpc::Client::new( - broad_rx.resubscribe(), + subs_req.clone(), running.clone(), metrics.grpc.clone(), grpc_config, ); let otel_client = otel::Client::new( - broad_rx.resubscribe(), + subs_req.clone(), running.clone(), metrics.otel.clone(), otel_config, @@ -49,12 +52,7 @@ pub fn start( // JSON client will only start if explicitly enabled or no other // output is active at startup if (!grpc_client.is_enabled() && !otel_client.is_enabled()) || stdout_enabled { - stdout::Client::new( - broad_rx.resubscribe(), - running.clone(), - metrics.stdout.clone(), - ) - .start(); + stdout::Client::new(subs_req, running.clone(), metrics.stdout.clone()).start(); } let mut grpc_handle = grpc_client.start(); @@ -75,6 +73,13 @@ pub fn start( warn!("Failed to forward output event: {e}"); } } + req = subs_rx.recv() => { + let Some(req) = req else { continue; }; + let rx = broad_tx.subscribe(); + if let Err(e) = req.send(rx) { + break Err(anyhow::anyhow!("Failed to subscribe worker: {e:?}")); + } + } res = grpc_handle.borrow_mut() => { break flatten_task_result("gRPC", res); } diff --git a/fact/src/output/otel.rs b/fact/src/output/otel.rs index 6d1d4c31..91acfa56 100644 --- a/fact/src/output/otel.rs +++ b/fact/src/output/otel.rs @@ -7,17 +7,14 @@ use opentelemetry_otlp::{LogExporter, WithExportConfig}; use opentelemetry_sdk::Resource; use opentelemetry_sdk::logs::SdkLoggerProvider; use tokio::{ - sync::{ - broadcast::{self, error::RecvError}, - watch, - }, + sync::{broadcast::error::RecvError, mpsc, oneshot, watch}, task::JoinHandle, }; -use crate::{config::OTelConfig, event::Event, metrics::EventCounter}; +use crate::{config::OTelConfig, metrics::EventCounter, output::EventReceiver}; pub(super) struct Client { - rx: broadcast::Receiver>, + subscriber: mpsc::Sender>, running: watch::Receiver, config: watch::Receiver, metrics: EventCounter, @@ -25,13 +22,13 @@ pub(super) struct Client { impl Client { pub(super) fn new( - rx: broadcast::Receiver>, + subscriber: mpsc::Sender>, running: watch::Receiver, metrics: EventCounter, config: watch::Receiver, ) -> Self { Client { - rx, + subscriber, running, config, metrics, @@ -78,9 +75,13 @@ impl Client { .build(); let logger = logger_provider.logger("fact"); + let (tx, rx) = oneshot::channel(); + self.subscriber.send(tx).await?; + let mut rx = rx.await?; + let res = loop { tokio::select! { - event = self.rx.recv() => { + event = rx.recv() => { match event { Ok(event) => { self.metrics.added(); diff --git a/fact/src/output/stdout.rs b/fact/src/output/stdout.rs index debc57fd..7b23e82e 100644 --- a/fact/src/output/stdout.rs +++ b/fact/src/output/stdout.rs @@ -1,27 +1,22 @@ -use std::sync::Arc; - use log::{info, warn}; -use tokio::sync::{ - broadcast::{self, error::RecvError}, - watch, -}; +use tokio::sync::{broadcast::error::RecvError, mpsc, oneshot, watch}; -use crate::{event::Event, metrics::EventCounter}; +use crate::{metrics::EventCounter, output::EventReceiver}; pub struct Client { - rx: broadcast::Receiver>, + subscriber: mpsc::Sender>, running: watch::Receiver, metrics: EventCounter, } impl Client { pub fn new( - rx: broadcast::Receiver>, + subscriber: mpsc::Sender>, running: watch::Receiver, metrics: EventCounter, ) -> Self { Client { - rx, + subscriber, running, metrics, } @@ -29,9 +24,12 @@ impl Client { pub fn start(mut self) { tokio::spawn(async move { + let (tx, rx) = oneshot::channel(); + self.subscriber.send(tx).await.unwrap(); + let mut rx = rx.await.unwrap(); loop { tokio::select! { - event = self.rx.recv() => { + event = rx.recv() => { let event = match event { Ok(event) => event, Err(RecvError::Closed) => { From 3803d50b3cc80fb659a44289a31e73cf194a77fb Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Fri, 3 Jul 2026 17:15:38 +0200 Subject: [PATCH 4/4] cleanup(otel): put otel behind a feature This allows regular fact builds to keep using just the gRPC and stodout outputs, no additional dependencies. --- fact/Cargo.toml | 7 +++--- fact/src/event/mod.rs | 12 +++++++++- fact/src/event/process.rs | 7 +++++- fact/src/output/grpc.rs | 8 +++---- fact/src/output/mod.rs | 48 +++++++++++++++++++++++---------------- fact/src/output/otel.rs | 8 +++---- 6 files changed, 58 insertions(+), 32 deletions(-) diff --git a/fact/Cargo.toml b/fact/Cargo.toml index 3501995d..4e649315 100644 --- a/fact/Cargo.toml +++ b/fact/Cargo.toml @@ -20,9 +20,9 @@ hyper-util = { workspace = true } libc = { workspace = true } log = { workspace = true } native-tls = { workspace = true } -opentelemetry = { version = "0.32.0" } -opentelemetry-otlp = { version = "0.32.0" } -opentelemetry_sdk = { version = "0.32.1", features = [ "logs"] } +opentelemetry = { version = "0.32.0", optional = true } +opentelemetry-otlp = { version = "0.32.0", optional = true } +opentelemetry_sdk = { version = "0.32.1", features = [ "logs"], optional = true } openssl = { workspace = true } tonic = { workspace = true } tokio = { workspace = true } @@ -54,3 +54,4 @@ path = "src/main.rs" [features] bpf-test = [] +otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp"] diff --git a/fact/src/event/mod.rs b/fact/src/event/mod.rs index c863a14b..49a50229 100644 --- a/fact/src/event/mod.rs +++ b/fact/src/event/mod.rs @@ -1,13 +1,15 @@ +#[cfg(feature = "otel")] +use std::collections::HashMap; #[cfg(all(test, feature = "bpf-test"))] use std::time::{SystemTime, UNIX_EPOCH}; use std::{ - collections::HashMap, ffi::{CStr, OsStr}, os::{raw::c_char, unix::ffi::OsStrExt}, path::{Path, PathBuf}, }; use globset::GlobSet; +#[cfg(feature = "otel")] use opentelemetry::logs::AnyValue; use serde::Serialize; @@ -359,6 +361,7 @@ impl From for fact_api::FileActivity { } } +#[cfg(feature = "otel")] impl From for opentelemetry::logs::AnyValue { fn from(value: Event) -> Self { AnyValue::Map(Box::new(HashMap::from([ @@ -458,6 +461,7 @@ impl FileData { Ok(file) } + #[cfg(feature = "otel")] fn event_type(&self) -> &'static str { match self { FileData::Open(_) => "open", @@ -522,6 +526,7 @@ impl From for fact_api::file_activity::File { } } +#[cfg(feature = "otel")] impl From for opentelemetry::logs::AnyValue { fn from(value: FileData) -> Self { let event_type = value.event_type(); @@ -605,6 +610,7 @@ impl From for fact_api::FileActivityBase { } } +#[cfg(feature = "otel")] impl From for opentelemetry::logs::AnyValue { fn from(value: BaseFileData) -> Self { AnyValue::Map(Box::new(HashMap::from([ @@ -642,6 +648,7 @@ impl From for fact_api::FilePermissionChange { } } +#[cfg(feature = "otel")] impl From for opentelemetry::logs::AnyValue { fn from(value: ChmodFileData) -> Self { let AnyValue::Map(mut map) = value.inner.into() else { @@ -693,6 +700,7 @@ impl From for fact_api::FileOwnershipChange { } } +#[cfg(feature = "otel")] impl From for opentelemetry::logs::AnyValue { fn from(value: ChownFileData) -> Self { let AnyValue::Map(mut map) = value.inner.into() else { @@ -726,6 +734,7 @@ impl From for fact_api::FileRename { } } +#[cfg(feature = "otel")] impl From for opentelemetry::logs::AnyValue { fn from(value: RenameFileData) -> Self { let AnyValue::Map(mut map) = value.new.into() else { @@ -759,6 +768,7 @@ impl From for fact_api::FileXattrChange { } } +#[cfg(feature = "otel")] impl From for opentelemetry::logs::AnyValue { fn from(value: XattrFileData) -> Self { let AnyValue::Map(mut map) = value.inner.into() else { diff --git a/fact/src/event/process.rs b/fact/src/event/process.rs index 048b9a44..f18d4e42 100644 --- a/fact/src/event/process.rs +++ b/fact/src/event/process.rs @@ -1,6 +1,9 @@ -use std::{collections::HashMap, ffi::CStr, path::PathBuf}; +#[cfg(feature = "otel")] +use std::collections::HashMap; +use std::{ffi::CStr, path::PathBuf}; use fact_ebpf::{lineage_t, process_t}; +#[cfg(feature = "otel")] use opentelemetry::logs::AnyValue; use serde::Serialize; use uuid::Uuid; @@ -39,6 +42,7 @@ impl From for fact_api::process_signal::LineageInfo { } } +#[cfg(feature = "otel")] impl From for opentelemetry::logs::AnyValue { fn from(value: Lineage) -> Self { AnyValue::Map(Box::new(HashMap::from([ @@ -233,6 +237,7 @@ impl From for fact_api::ProcessSignal { } } +#[cfg(feature = "otel")] impl From for opentelemetry::logs::AnyValue { fn from(value: Process) -> Self { let args = value diff --git a/fact/src/output/grpc.rs b/fact/src/output/grpc.rs index af547ec1..b643b363 100644 --- a/fact/src/output/grpc.rs +++ b/fact/src/output/grpc.rs @@ -10,7 +10,7 @@ use openssl::{ec::EcKey, pkey::PKey}; use tokio::{ fs, sync::{mpsc, oneshot, watch}, - task::JoinHandle, + task::JoinSet, time::sleep, }; use tokio_stream::{ @@ -125,8 +125,8 @@ impl Client { } } - pub fn start(mut self) -> JoinHandle> { - tokio::spawn(async move { + pub fn start(mut self, set: &mut JoinSet>) { + set.spawn(async move { loop { let res = if self.is_enabled() { self.run().await @@ -144,7 +144,7 @@ impl Client { } } Ok(()) - }) + }); } async fn get_connector(&self) -> anyhow::Result>> { diff --git a/fact/src/output/mod.rs b/fact/src/output/mod.rs index ecbc48d3..373ff9e2 100644 --- a/fact/src/output/mod.rs +++ b/fact/src/output/mod.rs @@ -1,19 +1,19 @@ -use std::{borrow::BorrowMut, sync::Arc}; +use std::sync::Arc; use log::{debug, warn}; use tokio::{ sync::{broadcast, mpsc, watch}, - task::JoinHandle, + task::{JoinHandle, JoinSet}, }; use crate::{ config::{GrpcConfig, OTelConfig}, event::Event, - flatten_task_result, metrics::OutputMetrics, }; mod grpc; +#[cfg(feature = "otel")] mod otel; mod stdout; @@ -28,12 +28,14 @@ pub fn start( running: watch::Receiver, metrics: OutputMetrics, grpc_config: watch::Receiver, - otel_config: watch::Receiver, + #[allow(unused)] otel_config: watch::Receiver, stdout_enabled: bool, ) -> JoinHandle> { let (broad_tx, _) = broadcast::channel(100); let (subs_req, mut subs_rx) = mpsc::channel(10); let mut run = running.clone(); + let mut handles = JoinSet::new(); + let mut enabled_outputs = Vec::new(); let grpc_client = grpc::Client::new( subs_req.clone(), @@ -41,23 +43,27 @@ pub fn start( metrics.grpc.clone(), grpc_config, ); + enabled_outputs.push(grpc_client.is_enabled()); + grpc_client.start(&mut handles); - let otel_client = otel::Client::new( - subs_req.clone(), - running.clone(), - metrics.otel.clone(), - otel_config, - ); + #[cfg(feature = "otel")] + { + let otel_client = otel::Client::new( + subs_req.clone(), + running.clone(), + metrics.otel.clone(), + otel_config, + ); + enabled_outputs.push(otel_client.is_enabled()); + otel_client.start(&mut handles); + } // JSON client will only start if explicitly enabled or no other // output is active at startup - if (!grpc_client.is_enabled() && !otel_client.is_enabled()) || stdout_enabled { + if stdout_enabled || enabled_outputs.iter().all(|enabled| !enabled) { stdout::Client::new(subs_req, running.clone(), metrics.stdout.clone()).start(); } - let mut grpc_handle = grpc_client.start(); - let mut otel_handle = otel_client.start(); - tokio::spawn(async move { debug!("Starting output component..."); let res = loop { @@ -80,11 +86,15 @@ pub fn start( break Err(anyhow::anyhow!("Failed to subscribe worker: {e:?}")); } } - res = grpc_handle.borrow_mut() => { - break flatten_task_result("gRPC", res); - } - res = otel_handle.borrow_mut() => { - break flatten_task_result("oTel", res); + res = handles.join_next() => { + let Some(res) = res else { + unreachable!("output handles should always have a task"); + }; + match res { + Ok(Ok(_)) => break Ok(()), + Ok(Err(e)) => break Err(e), + Err(e) => break Err(e.into()), + } } _ = run.changed() => if !*run.borrow() { break Ok(()); } } diff --git a/fact/src/output/otel.rs b/fact/src/output/otel.rs index 91acfa56..8a33ab3b 100644 --- a/fact/src/output/otel.rs +++ b/fact/src/output/otel.rs @@ -8,7 +8,7 @@ use opentelemetry_sdk::Resource; use opentelemetry_sdk::logs::SdkLoggerProvider; use tokio::{ sync::{broadcast::error::RecvError, mpsc, oneshot, watch}, - task::JoinHandle, + task::JoinSet, }; use crate::{config::OTelConfig, metrics::EventCounter, output::EventReceiver}; @@ -35,8 +35,8 @@ impl Client { } } - pub(super) fn start(mut self) -> JoinHandle> { - tokio::spawn(async move { + pub(super) fn start(mut self, set: &mut JoinSet>) { + set.spawn(async move { loop { let res = if self.is_enabled() { self.run().await @@ -54,7 +54,7 @@ impl Client { } } Ok(()) - }) + }); } async fn run(&mut self) -> anyhow::Result {