add series hash in metrics ingestion, bounded streaming merge#1648
add series hash in metrics ingestion, bounded streaming merge#1648nikhilsinhaparseable wants to merge 2 commits into
Conversation
WalkthroughThis PR adds deterministic per-series hashing for OTEL metrics (FxHasher-based _series_hash) and refactors streaming query result merging to use a bounded Tokio mpsc channel with per-partition forwarder tasks and a merged ReceiverStream output. ChangesMetrics Series Hashing and Query Streaming Updates
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (2)
src/otel/metrics.rs (1)
89-91: 💤 Low valueConsider caching the known-fields set to avoid per-call allocation.
OTEL_METRICS_KNOWN_FIELD_LISTis a compile-time constant, but a newHashSetis built on everycompute_series_hashinvocation. Under high ingestion throughput (many data points per second), this repeated allocation adds overhead.♻️ Suggested optimization using `LazyLock`
+use std::sync::LazyLock; + +static KNOWN_FIELDS: LazyLock<std::collections::HashSet<&'static str>> = LazyLock::new(|| { + OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect() +}); + fn compute_series_hash(dp: &Map<String, Value>) -> u64 { - let known: std::collections::HashSet<&str> = - OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect(); let mut label_pairs: Vec<(&str, String)> = dp .iter() - .filter(|(k, _)| !known.contains(k.as_str())) + .filter(|(k, _)| !KNOWN_FIELDS.contains(k.as_str())) .map(|(k, v)| {🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/otel/metrics.rs` around lines 89 - 91, The function compute_series_hash repeatedly builds a HashSet from the compile-time OTEL_METRICS_KNOWN_FIELD_LIST on every call; replace that per-call allocation by creating a cached, lazily-initialized HashSet (e.g., via std::sync::OnceLock/OnceCell/LazyLock or once_cell::sync::Lazy) and use the static cached set inside compute_series_hash so the HashSet is constructed once and reused across invocations.src/query/mod.rs (1)
322-333: ⚡ Quick winSpawned task panics would be silently absorbed, potentially losing partition data.
The
JoinHandlereturned bytokio::spawnis dropped immediately, meaning if a task panics (e.g., due to an unexpected error in the stream), that partition's remaining data is silently lost with no indication to the consumer.Consider either:
- Storing
JoinHandles and awaiting them (complex, changes return semantics), or- Using
.abort_handle()and adding panic=abort logging, or- At minimum, wrapping the async block body with
catch_unwindto send an error through the channel before exiting.Option: Catch panics and convert to DataFusion errors
tokio::spawn( async move { + let result = std::panic::AssertUnwindSafe(async { let mut stream: SendableRecordBatchStream = Box::pin(wrapped); use futures::StreamExt; while let Some(batch) = stream.next().await { if tx.send(batch).await.is_err() { break; } } + }); + if let Err(e) = futures::FutureExt::catch_unwind(result).await { + let msg = e.downcast_ref::<&str>().map(|s| s.to_string()) + .or_else(|| e.downcast_ref::<String>().cloned()) + .unwrap_or_else(|| "partition task panicked".to_string()); + let _ = tx.send(Err(datafusion::error::DataFusionError::Execution(msg))).await; } } .instrument(span), );
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@src/otel/metrics.rs`:
- Around line 89-91: The function compute_series_hash repeatedly builds a
HashSet from the compile-time OTEL_METRICS_KNOWN_FIELD_LIST on every call;
replace that per-call allocation by creating a cached, lazily-initialized
HashSet (e.g., via std::sync::OnceLock/OnceCell/LazyLock or
once_cell::sync::Lazy) and use the static cached set inside compute_series_hash
so the HashSet is constructed once and reused across invocations.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: f5871794-e60a-465b-9cd2-470e7a2f13c7
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (3)
Cargo.tomlsrc/otel/metrics.rssrc/query/mod.rs
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/otel/metrics.rs (1)
89-91: 💤 Low valueConsider caching the known-fields HashSet to avoid per-call allocation.
OTEL_METRICS_KNOWN_FIELD_LISTis a compile-time constant, but theHashSetis rebuilt on everycompute_series_hashcall. For high-throughput ingestion this adds allocation overhead per data point.♻️ Proposed optimization using `std::sync::LazyLock`
+use std::sync::LazyLock; +use std::collections::HashSet; +static KNOWN_FIELDS: LazyLock<HashSet<&'static str>> = LazyLock::new(|| { + OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect() +}); + fn compute_series_hash(dp: &Map<String, Value>) -> u64 { - let known: std::collections::HashSet<&str> = - OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect(); let mut label_pairs: Vec<(&str, String)> = dp .iter() - .filter(|(k, _)| !known.contains(k.as_str())) + .filter(|(k, _)| !KNOWN_FIELDS.contains(k.as_str()))🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/otel/metrics.rs` around lines 89 - 91, compute_series_hash currently rebuilds a HashSet from OTEL_METRICS_KNOWN_FIELD_LIST on every call; change this to use a lazily-initialized static cached HashSet (e.g. std::sync::LazyLock or once_cell::sync::Lazy) so the set is constructed once and reused; update compute_series_hash to reference the static (e.g. KNOWN_FIELDS_SET) instead of creating a local HashSet, keeping the rest of the hashing logic unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@src/otel/metrics.rs`:
- Around line 89-91: compute_series_hash currently rebuilds a HashSet from
OTEL_METRICS_KNOWN_FIELD_LIST on every call; change this to use a
lazily-initialized static cached HashSet (e.g. std::sync::LazyLock or
once_cell::sync::Lazy) so the set is constructed once and reused; update
compute_series_hash to reference the static (e.g. KNOWN_FIELDS_SET) instead of
creating a local HashSet, keeping the rest of the hashing logic unchanged.
Summary by CodeRabbit
New Features
Refactor
Chores