
add series hash in metrics ingestion, bounded streaming merge #1648

Open
nikhilsinhaparseable wants to merge 2 commits into parseablehq:main from nikhilsinhaparseable:streaming-fix

@nikhilsinhaparseable nikhilsinhaparseable commented May 20, 2026

Summary by CodeRabbit

  • New Features

    • Added deterministic per-series hashing for OTEL metrics to produce a stable series identifier.
  • Refactor

    • Improved merging of streaming query results from multiple partitions for more efficient result delivery.
  • Chores

    • Bumped dependency on rustc-hash (v2).

Review Change Stack

coderabbitai Bot commented May 20, 2026

Walkthrough

This PR adds deterministic per-series hashing for OTEL metrics (FxHasher-based _series_hash) and refactors streaming query result merging to use a bounded Tokio mpsc channel with per-partition forwarder tasks and a merged ReceiverStream output.
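To make the series-hash idea concrete, here is a minimal sketch of the steps the walkthrough describes: drop sample-level fields, sort the remaining label pairs, then hash the metric name plus labels. It is not the PR's code. The field list, the function names, and the separator bytes are assumptions for illustration, and a small FNV-1a routine stands in for FxHasher so the example needs only the standard library.

```rust
// Hypothetical list of sample-level fields that must not affect series identity.
// The real OTEL_METRICS_KNOWN_FIELD_LIST in the PR is not shown here.
const SAMPLE_FIELDS: &[&str] = &["metric_name", "value", "time_unix_nano", "_series_hash"];

// FNV-1a constants (64-bit). FNV-1a is a stand-in for FxHasher, chosen only
// because it is deterministic and easy to write without external crates.
const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

/// Fold `bytes` into an existing FNV-1a state.
fn fnv1a(mut state: u64, bytes: &[u8]) -> u64 {
    for b in bytes {
        state ^= u64::from(*b);
        state = state.wrapping_mul(FNV_PRIME);
    }
    state
}

/// Compute a deterministic identifier for one series.
/// `data_point` holds every flattened key/value pair of a sample.
fn series_hash(metric_name: &str, data_point: &[(&str, &str)]) -> u64 {
    // Keep only label fields, then sort so insertion order does not matter.
    let mut labels: Vec<(&str, &str)> = data_point
        .iter()
        .copied()
        .filter(|(k, _)| !SAMPLE_FIELDS.contains(k))
        .collect();
    labels.sort_unstable();

    let mut h = fnv1a(FNV_OFFSET, metric_name.as_bytes());
    for (k, v) in labels {
        // Separator bytes keep ("ab", "c") distinct from ("a", "bc").
        h = fnv1a(h, &[0xff]);
        h = fnv1a(h, k.as_bytes());
        h = fnv1a(h, &[0xfe]);
        h = fnv1a(h, v.as_bytes());
    }
    h
}
```

With this shape, two samples of the same series hash identically even if their `value` differs or their labels arrive in a different order, which is the property the PR's unit tests are described as covering.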

Changes

Metrics Series Hashing and Query Streaming Updates

| Layer / File(s) | Summary |
|---|---|
| OTEL Series Hash Implementation (Cargo.toml, src/otel/metrics.rs) | Adds rustc-hash v2 dependency. Implements compute_series_hash() using FxHasher to produce deterministic u64 identifiers by filtering known sample-level fields, sorting label pairs, and hashing metric name plus labels. Integrates hash computation into the resource/scope flattening pipeline, inserts _series_hash into flattened JSON output, updates OTEL_METRICS_KNOWN_FIELD_LIST to include _series_hash, and adds unit tests covering stability, insertion-order independence, sensitivity to label/metric-name changes, and invariance to sample-level field changes. |
| Query Result Streaming Refactor (src/query/mod.rs) | Replaces futures select_all-based partition stream merging with a bounded Tokio mpsc channel approach. Simplifies BoxedBatchStream type alias to SendableRecordBatchStream. Spawns one task per partition to forward RecordBatches into the channel, wraps the receiver with ReceiverStream and RecordBatchStreamAdapter for merged output, and updates imports to remove select_all and add ReceiverStream and tracing::Instrument. |
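The bounded-merge pattern summarized above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the PR's implementation: `std::sync::mpsc::sync_channel` and OS threads stand in for the bounded Tokio mpsc channel and spawned tasks, plain `u32` values stand in for RecordBatches, and `merge_partitions` is a hypothetical name.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Merge several partitions into one receiver through a bounded channel.
/// One forwarder thread is spawned per partition.
fn merge_partitions(partitions: Vec<Vec<u32>>, capacity: usize) -> Receiver<u32> {
    let (tx, rx) = sync_channel::<u32>(capacity);
    for partition in partitions {
        let tx = tx.clone();
        thread::spawn(move || {
            for item in partition {
                // `send` blocks while the buffer is full (backpressure) and
                // returns an error once the receiver is dropped, at which
                // point there is nothing left to forward to.
                if tx.send(item).is_err() {
                    break;
                }
            }
        });
    }
    // Drop the original sender so the channel closes as soon as every
    // forwarder has finished and dropped its clone.
    drop(tx);
    rx
}
```

The channel capacity bounds how many items can be buffered ahead of the consumer, which is the main difference from an unbounded merge: slow consumers make producers wait instead of letting memory grow. Items from different partitions may interleave in any order.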

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐇 I hash the metrics, neat and true,
With Fx's dance I bind each view.
Channels hum as batches flow,
Per-partition tasks make outputs grow.
A rabbit smiles at tidy rows.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Description check | ⚠️ Warning | No pull request description was provided by the author. The description is required by the repository template and should include goal, solutions, key changes, and testing/documentation checkboxes. | Add a comprehensive pull request description following the repository template, including the goal, rationale for changes, key implementation details, and confirm testing/documentation requirements. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title accurately describes the main changes: adding series hash for metrics ingestion and implementing bounded streaming merge. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |



@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (2)
src/otel/metrics.rs (1)

89-91: 💤 Low value

Consider caching the known-fields set to avoid per-call allocation.

OTEL_METRICS_KNOWN_FIELD_LIST is a compile-time constant, but a new HashSet is built on every compute_series_hash invocation. Under high ingestion throughput (many data points per second), this repeated allocation adds overhead.

♻️ Suggested optimization using `LazyLock`
+use std::sync::LazyLock;
+
+static KNOWN_FIELDS: LazyLock<std::collections::HashSet<&'static str>> = LazyLock::new(|| {
+    OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect()
+});
+
 fn compute_series_hash(dp: &Map<String, Value>) -> u64 {
-    let known: std::collections::HashSet<&str> =
-        OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect();
     let mut label_pairs: Vec<(&str, String)> = dp
         .iter()
-        .filter(|(k, _)| !known.contains(k.as_str()))
+        .filter(|(k, _)| !KNOWN_FIELDS.contains(k.as_str()))
         .map(|(k, v)| {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/otel/metrics.rs` around lines 89-91: the function compute_series_hash
repeatedly builds a HashSet from the compile-time OTEL_METRICS_KNOWN_FIELD_LIST
on every call; replace that per-call allocation by creating a cached,
lazily-initialized HashSet (e.g., via std::sync::LazyLock, std::sync::OnceLock,
or once_cell::sync::Lazy) and use the static cached set inside
compute_series_hash so the HashSet is constructed once and reused across
invocations.
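As a complement to the LazyLock diff, the same caching idea can be sketched with `std::sync::OnceLock`, one of the alternatives the prompt names. This is only an illustration: `KNOWN_FIELD_LIST`, `known_fields`, and `is_label` are hypothetical names, and the three listed fields are placeholders rather than the contents of OTEL_METRICS_KNOWN_FIELD_LIST.

```rust
use std::collections::HashSet;
use std::sync::OnceLock;

// Hypothetical stand-in for OTEL_METRICS_KNOWN_FIELD_LIST.
const KNOWN_FIELD_LIST: [&str; 3] = ["metric_name", "value", "_series_hash"];

/// Return the known-fields set, building it on first use only.
fn known_fields() -> &'static HashSet<&'static str> {
    static SET: OnceLock<HashSet<&'static str>> = OnceLock::new();
    // `get_or_init` runs the closure at most once, even under concurrency;
    // later calls return a reference to the same set.
    SET.get_or_init(|| KNOWN_FIELD_LIST.iter().copied().collect())
}

/// A key counts as a label when it is not a known sample-level field.
fn is_label(key: &str) -> bool {
    !known_fields().contains(key)
}
```

Keeping the static inside the accessor function hides the cache from the rest of the module, so callers such as a hash routine only ever see a `&'static HashSet`.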
src/query/mod.rs (1)

322-333: ⚡ Quick win

Spawned task panics would be silently absorbed, potentially losing partition data.

The JoinHandle returned by tokio::spawn is dropped immediately, meaning if a task panics (e.g., due to an unexpected error in the stream), that partition's remaining data is silently lost with no indication to the consumer.

Consider either:

  1. Storing JoinHandles and awaiting them (complex, changes return semantics), or
  2. Using .abort_handle() and adding panic=abort logging, or
  3. At minimum, wrapping the async block body with catch_unwind to send an error through the channel before exiting.
Option: Catch panics and convert to DataFusion errors
 tokio::spawn(
     async move {
+        let result = std::panic::AssertUnwindSafe(async {
             let mut stream: SendableRecordBatchStream = Box::pin(wrapped);
             use futures::StreamExt;
             while let Some(batch) = stream.next().await {
                 if tx.send(batch).await.is_err() {
                     break;
                 }
             }
+        });
+        if let Err(e) = futures::FutureExt::catch_unwind(result).await {
+            let msg = e.downcast_ref::<&str>().map(|s| s.to_string())
+                .or_else(|| e.downcast_ref::<String>().cloned())
+                .unwrap_or_else(|| "partition task panicked".to_string());
+            let _ = tx.send(Err(datafusion::error::DataFusionError::Execution(msg))).await;
+        }
     }
     .instrument(span),
 );
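The third option (catch the panic and surface it as an error on the channel) can be tried out in isolation. The sketch below is an assumption-laden analogue, not the reviewed code: an OS thread and `std::panic::catch_unwind` stand in for `tokio::spawn` and `futures::FutureExt::catch_unwind`, a `String` stands in for `DataFusionError::Execution`, and the negative-number panic is a made-up failure used only to trigger the path.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Forward one partition's items, converting a panic into an `Err` item
/// so the consumer sees the failure instead of a silently short stream.
fn forward_partition(items: Vec<i32>) -> Receiver<Result<i32, String>> {
    let (tx, rx) = sync_channel(4);
    thread::spawn(move || {
        let outcome = catch_unwind(AssertUnwindSafe(|| {
            for item in items {
                // Hypothetical failure: a negative item makes the task panic.
                if item < 0 {
                    panic!("negative item {}", item);
                }
                if tx.send(Ok(item)).is_err() {
                    return; // receiver dropped, stop forwarding
                }
            }
        }));
        if let Err(payload) = outcome {
            // Panic payloads are usually `&str` or `String`.
            let msg = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "partition task panicked".to_string());
            let _ = tx.send(Err(msg));
        }
    });
    rx
}
```

For the input `[1, 2, -1, 3]` the consumer receives two `Ok` items followed by one `Err` carrying the panic message, and then the channel closes. The panic message is still printed to stderr by the default panic hook.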
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@src/otel/metrics.rs`:
- Around line 89-91: The function compute_series_hash repeatedly builds a
HashSet from the compile-time OTEL_METRICS_KNOWN_FIELD_LIST on every call;
replace that per-call allocation by creating a cached, lazily-initialized
HashSet (e.g., via std::sync::LazyLock, std::sync::OnceLock, or
once_cell::sync::Lazy) and use the static cached set inside compute_series_hash
so the HashSet is constructed once and reused across invocations.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: f5871794-e60a-465b-9cd2-470e7a2f13c7

📥 Commits

Reviewing files that changed from the base of the PR and between 1452b4a and 6f0eb2b.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • Cargo.toml
  • src/otel/metrics.rs
  • src/query/mod.rs

coderabbitai Bot previously approved these changes May 20, 2026

@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (1)
src/otel/metrics.rs (1)

89-91: 💤 Low value

Consider caching the known-fields HashSet to avoid per-call allocation.

OTEL_METRICS_KNOWN_FIELD_LIST is a compile-time constant, but the HashSet is rebuilt on every compute_series_hash call. For high-throughput ingestion this adds allocation overhead per data point.

♻️ Proposed optimization using `std::sync::LazyLock`
+use std::sync::LazyLock;
+use std::collections::HashSet;

+static KNOWN_FIELDS: LazyLock<HashSet<&'static str>> = LazyLock::new(|| {
+    OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect()
+});
+
 fn compute_series_hash(dp: &Map<String, Value>) -> u64 {
-    let known: std::collections::HashSet<&str> =
-        OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect();
     let mut label_pairs: Vec<(&str, String)> = dp
         .iter()
-        .filter(|(k, _)| !known.contains(k.as_str()))
+        .filter(|(k, _)| !KNOWN_FIELDS.contains(k.as_str()))
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/otel/metrics.rs` around lines 89 - 91, compute_series_hash currently
rebuilds a HashSet from OTEL_METRICS_KNOWN_FIELD_LIST on every call; change this
to use a lazily-initialized static cached HashSet (e.g. std::sync::LazyLock or
once_cell::sync::Lazy) so the set is constructed once and reused; update
compute_series_hash to reference the static (e.g. KNOWN_FIELDS) instead of
creating a local HashSet, keeping the rest of the hashing logic unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@src/otel/metrics.rs`:
- Around line 89-91: compute_series_hash currently rebuilds a HashSet from
OTEL_METRICS_KNOWN_FIELD_LIST on every call; change this to use a
lazily-initialized static cached HashSet (e.g. std::sync::LazyLock or
once_cell::sync::Lazy) so the set is constructed once and reused; update
compute_series_hash to reference the static (e.g. KNOWN_FIELDS) instead of
creating a local HashSet, keeping the rest of the hashing logic unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: b498a4b0-daf0-4a9d-ad17-143d62d561e0

📥 Commits

Reviewing files that changed from the base of the PR and between 6f0eb2b and c41db80.

📒 Files selected for processing (1)
  • src/otel/metrics.rs
