[AURON #2316] Support native scan for Paimon DSv2 (BatchScanExec) COW tables #2313

Open
zhuxiangyi wants to merge 2 commits into apache:master from zhuxiangyi:feat/paimon-dsv2-native-scan

@zhuxiangyi

Which issue does this PR close?

N/A — extends the Paimon native scan support to DSv2 catalogs. Happy to file a tracking issue if preferred.

Rationale for this change

PaimonConvertProvider only matches HiveTableScanExec today, so Paimon tables registered through a DSv2 catalog (e.g. spark.sql.catalog.paimon = org.apache.paimon.spark.SparkCatalog, accessed as paimon.db.t) fall back to Spark execution even when the underlying layout is a raw-convertible COW Parquet/ORC dataset that Auron can read natively.

This PR adds a parallel DSv2 path that reuses the existing native file scan infrastructure (the same protobufs used by NativeIcebergTableScanExec / native FileSourceScan) so DSv2 Paimon scans get the same native acceleration as the Hive-registered path.

What changes are included in this PR?

  • PaimonScanSupport — entry point that:
    • detects Paimon DSv2 scans via class-name reflection (org.apache.paimon.spark.PaimonBaseScan / PaimonInputPartition), so the module does not pull a hard Maven dependency on a particular paimon-spark-* artifact at compile time;
    • resolves the underlying FileStoreTable, verifies COW + supported file format (Parquet/ORC) + supported column types;
    • collects DataSplits from input partitions and rejects splits where !rawConvertible() or deletion files are present (MOR/MOW with positional / equality deletes);
    • reconstructs per-split partition InternalRow via RowDataToObjectArrayConverter so partition columns land in Auron's (fileSchema ++ partitionSchema) layout.
  • NativePaimonV2TableScanExec — leaf native exec built on the existing NativeRDD / NativeSupports machinery. Mirrors NativeIcebergTableScanExec:
    • splits files by filesMaxPartitionBytes (both Parquet and ORC are splittable here);
    • dispatches to pb.ParquetScanExecNode / pb.OrcScanExecNode;
    • exposes numPartitions / numFiles driver metrics and forwards bytes_scanned / output_rows to TaskMetrics.inputMetrics;
    • returns EmptyNativeRDD when there are no partitions to read.
  • PaimonConvertProvider — adds a BatchScanExec branch dispatching to the new path. Reuses the existing spark.auron.enable.paimon.scan config so there is no new feature flag.

The Hive-registered Paimon path (NativePaimonTableScanExec) is untouched.
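
The class-name detection described for PaimonScanSupport can be pictured with a small sketch. This is not the PR's code: `ReflectiveDetection`, `isInstanceOfByName`, and the `Demo` stand-in types are hypothetical names used only for illustration. The idea is to walk the runtime class hierarchy and compare fully qualified names, so the check compiles without the target class on the classpath.

```scala
// Hypothetical sketch: detect a type by name without a compile-time dependency.
object ReflectiveDetection {

  /** True if the runtime class of `obj`, one of its superclasses, or one of
    * its (transitively) implemented interfaces has the given fully qualified name. */
  def isInstanceOfByName(obj: AnyRef, className: String): Boolean = {
    def matches(c: Class[_]): Boolean =
      c != null && (
        c.getName == className ||
          matches(c.getSuperclass) ||
          c.getInterfaces.exists(i => matches(i)))
    obj != null && matches(obj.getClass)
  }
}

// Stand-ins for a scan hierarchy; the PR targets
// org.apache.paimon.spark.PaimonBaseScan / PaimonInputPartition instead.
object Demo {
  trait BaseScan
  class ConcreteScan extends BaseScan
}
```

A caller would pass the scan object and the target name as a string; anything that does not match is left on the regular Spark path.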

Are there any user-facing changes?

Yes — purely additive: when spark.auron.enable.paimon.scan=true (default), Paimon DSv2 catalog reads on COW Parquet/ORC tables that pass the safety checks above will now be executed natively. Tables that are MOR/MOW or use deletion files continue to fall back to Spark execution (logged at DEBUG with the reason).

No new configuration knobs are introduced.
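
For reference, the settings named in this description can be collected as below. This is only a sketch: `PaimonScanConf` and its helpers are hypothetical, the `.warehouse` key is an assumption based on the usual Paimon catalog setup, and the session extensions that the test suite wires are left out because their fully qualified names are not given here.

```scala
// Hypothetical helper that gathers the configuration keys quoted in this PR.
object PaimonScanConf {
  val catalogKey = "spark.sql.catalog.paimon"
  val nativeScanKey = "spark.auron.enable.paimon.scan"

  /** Settings for a DSv2 Paimon catalog, with the native scan switched on or off. */
  def settings(warehouse: String, nativeScan: Boolean): Seq[(String, String)] = Seq(
    catalogKey -> "org.apache.paimon.spark.SparkCatalog",
    s"$catalogKey.warehouse" -> warehouse, // assumption: warehouse location key
    nativeScanKey -> nativeScan.toString)

  /** Renders the settings as spark-submit style `--conf key=value` arguments. */
  def asSubmitArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```

Passing `nativeScan = false` corresponds to the fallback case: the same flag that already gates the Hive-registered path also disables the DSv2 path.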

How was this patch tested?

New integration suite AuronPaimonV2IntegrationSuite (extends SharedSparkSession, wires AuronSparkSessionExtension + PaimonSparkSessionExtensions, uses a temp-dir warehouse via the paimon DSv2 catalog) covers:

  • simple COW Parquet select — correctness + NativePaimonV2TableScan in executed plan;
  • column projection;
  • partitioned COW table with predicate (asserts native scan still applied);
  • ORC COW table (file.format=orc);
  • empty table (asserts EmptyNativeRDD path returns no rows);
  • driver metrics (numPartitions / numFiles) actually posted via SparkListenerDriverAccumUpdates;
  • spark.auron.enable.paimon.scan=false falls back to Spark;
  • MOR primary-key table falls back to Spark (rawConvertible check).
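
The fallback exercised by the last two cases comes down to a per-split safety check. A minimal sketch, assuming that a single unsafe split sends the whole scan back to Spark; `SplitInfo` is a simplified, hypothetical stand-in for Paimon's DataSplit, and `SplitCheck` is not a name from the PR:

```scala
// Hypothetical, simplified view of a split: only the fields the check needs.
final case class SplitInfo(rawConvertible: Boolean, deletionFileCount: Int, files: Seq[String])

object SplitCheck {

  /** Right(files) when every split can be read as plain data files;
    * Left(reason) as soon as one split needs merge or delete handling. */
  def nativeReadableFiles(splits: Seq[SplitInfo]): Either[String, Seq[String]] =
    splits.zipWithIndex.collectFirst {
      case (s, i) if !s.rawConvertible => s"split $i is not raw-convertible"
      case (s, i) if s.deletionFileCount > 0 => s"split $i carries deletion files"
    } match {
      case Some(reason) => Left(reason) // caller logs the reason and keeps the Spark plan
      case None => Right(splits.flatMap(_.files))
    }
}
```

Returning the reason alongside the decision keeps the DEBUG log message close to the check that produced it.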

The existing Paimon CI workflow (.github/workflows/paimon.yml, matrix paimon-1.2 / spark-3.5 / scala-2.12) picks up the new suite automatically; paimon-spark-${shortSparkVersion}:${paimonVersion} is added as a test-scope dependency (uses the shaded paimon-spark-3.5:1.2.0 bundle published on Maven Central).

mvn test-compile -pl thirdparty/auron-paimon -am -Pscala-2.12 -Ppaimon-1.2 -Pspark-3.5 -Prelease builds cleanly with scalastyle passing.

Auron currently only matches Paimon tables exposed via HiveTableScanExec.
Paimon tables registered through DSv2 catalogs (e.g. `paimon.db.t`) are
planned as BatchScanExec and were skipped by the native converter.

This change adds a parallel DSv2 path:
- PaimonScanSupport: reflection-based detection of PaimonBaseScan /
  PaimonInputPartition so the integration does not require a hard Maven
  dependency on paimon-spark across Spark versions. Collects DataSplits,
  rejects MOR/MOW splits (rawConvertible=false or non-empty deletion
  files), and builds per-split partition values via
  RowDataToObjectArrayConverter.
- NativePaimonV2TableScanExec: leaf native exec mirroring the Iceberg
  pattern; supports Parquet/ORC, computes split sizes from
  filesMaxPartitionBytes, exposes numPartitions/numFiles driver metrics,
  and returns EmptyNativeRDD for empty inputs.
- PaimonConvertProvider: dispatches BatchScanExec to the new path,
  reusing spark.auron.enable.paimon.scan.
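
As an illustration of the size-based splitting mentioned for NativePaimonV2TableScanExec, the sketch below cuts one file into byte ranges no larger than a cap. `FileRange` and `FileSplitter` are hypothetical names, and the cap is used directly; how the PR derives the effective split size from filesMaxPartitionBytes, and how ranges are then packed into partitions, is not shown here.

```scala
// Hypothetical byte range of a data file: [start, start + length).
final case class FileRange(path: String, start: Long, length: Long)

object FileSplitter {

  /** Cuts a file of `fileSize` bytes into consecutive ranges of at most
    * `maxSplitBytes` bytes each. An empty file yields no ranges. */
  def split(path: String, fileSize: Long, maxSplitBytes: Long): Seq[FileRange] = {
    require(maxSplitBytes > 0, "maxSplitBytes must be positive")
    (0L until fileSize by maxSplitBytes).map { offset =>
      FileRange(path, offset, math.min(maxSplitBytes, fileSize - offset))
    }
  }
}
```

For a 250-byte file and a 100-byte cap this gives ranges of 100, 100, and 50 bytes. Splitting this way relies on the reader being able to start at an arbitrary offset, which is why it matters that both Parquet and ORC are treated as splittable.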

Integration tests (AuronPaimonV2IntegrationSuite) cover:
  simple COW select, projection, partitioned + predicate, ORC format,
  empty table, driver metrics propagation, disable-flag fallback, and
  MOR primary-key table fallback.

Copilot AI left a comment

Contributor

Pull request overview

This PR extends Auron’s native scan acceleration for Apache Paimon from the existing Hive-registered table path to DSv2 catalogs by detecting Paimon DSv2 BatchScanExec scans, building a native file-scan plan (Parquet/ORC COW only), and executing via a new native leaf scan exec. It also adds an integration suite covering DSv2 behavior.

Changes:

  • Add PaimonScanSupport to detect/plan DSv2 Paimon scans (COW + Parquet/ORC + supported types) and extract DataSplit/data files + partition rows.
  • Introduce NativePaimonV2TableScanExec to execute planned DSv2 scans using existing native file scan protobuf nodes and metrics wiring.
  • Extend PaimonConvertProvider to convert DSv2 BatchScanExec to the new native exec, and add DSv2 integration tests + test dependencies.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.

File | Description
--- | ---
thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/BaseAuronPaimonSuite.scala | Adds shared Spark/Paimon DSv2 catalog test session configuration and warehouse cleanup.
thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala | Adds integration coverage for DSv2 Paimon native scans (projection/partitioning/ORC/metrics/fallbacks).
thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala | Adds DSv2 BatchScanExec conversion path using PaimonScanSupport and NativePaimonV2TableScanExec.
thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePaimonV2TableScanExec.scala | Implements the native DSv2 Paimon table scan exec using the existing native file scan infrastructure.
thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala | Adds DSv2 planning support: reflection-based Paimon scan detection, split extraction, and file/partition reconstruction.
thirdparty/auron-paimon/pom.xml | Adds Paimon Spark bundle + Spark test-jars needed for the new DSv2 integration tests.

Comment on lines +74 to +78
  // Project the read schema onto the (fileSchema ++ partitionSchema) layout used by the native scan.
  private lazy val projection: Seq[Integer] = {
    val combined = StructType(fileSchema.fields ++ partitionSchema.fields)
    plan.readSchema.fields.map(f => Integer.valueOf(combined.fieldIndex(f.name)))
  }

Comment on lines +87 to +92
  test("paimon v2 native scan handles empty table") {
    withTable("paimon.db.t_empty") {
      sql("create table paimon.db.t_empty (id int, v string) using paimon")
      val df = sql("select * from paimon.db.t_empty")
      checkAnswer(df, Seq.empty)
    }

Comment on lines +94 to +99
    val partitions = inputPartitions(exec)
    if (partitions.isEmpty) {
      logDebug("Paimon scan planned with empty input partitions.")
      return Some(
        PaimonScanPlan(table, Seq.empty, fileFormat, readSchema, fileSchema, partitionSchema))
    }

@merrily01 left a comment

Member

Thanks for the contribution, @zhuxiangyi. Could you create an issue to track this and avoid the CI check failure?

@zhuxiangyi changed the title from "Support native scan for Paimon DSv2 (BatchScanExec) COW tables" to "[AURON #2316] Support native scan for Paimon DSv2 (BatchScanExec) COW tables" on Jun 8, 2026
@zhuxiangyi

Author

@merrily01 Filed tracking issue #2316 and renamed the PR title to [AURON #2316] … to satisfy the title check. The new pr-title-check run is in action_required state since this PR is from a fork — could you help approve it? Thanks!

- NativePaimonV2TableScanExec: resolve projection field index via
  SQLConf.caseSensitiveAnalysis (mirrors NativeIcebergTableScanExec)
  and derive projection from output attributes, fixing case-insensitive
  analysis edge case.
- PaimonScanSupport.inputPartitions: distinguish "no partitions" from
  "failed to obtain partitions"; planning failure now falls back to
  Spark instead of silently returning a zero-row scan.
- AuronPaimonV2IntegrationSuite: empty-table test now asserts the
  native scan is applied so regressions to Spark fallback are caught.
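
The distinction drawn in the second item can be sketched as a three-way decision on an optional partition list. The `ScanDecision` types and `PlanDecision.decide` below are hypothetical names for illustration, not the PR's actual classes:

```scala
// Hypothetical outcome of planning a DSv2 Paimon scan.
sealed trait ScanDecision
case object FallBackToSpark extends ScanDecision
case object EmptyNativeScan extends ScanDecision
final case class NativeScan(partitionCount: Int) extends ScanDecision

object PlanDecision {

  /** None means the partitions could not be obtained; Some(Seq.empty) means
    * the table really has nothing to read. The two must not be conflated. */
  def decide[P](partitions: Option[Seq[P]]): ScanDecision = partitions match {
    case None => FallBackToSpark                  // planning failure: let Spark run the scan
    case Some(ps) if ps.isEmpty => EmptyNativeScan // genuinely empty: zero rows is correct
    case Some(ps) => NativeScan(ps.size)
  }
}
```

Encoding failure as `None` rather than as an empty sequence is what prevents a lookup error from turning into a scan that quietly returns no rows.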
@zhuxiangyi

Author

Pushed a follow-up commit addressing the three Copilot review comments:

  1. NativePaimonV2TableScanExec — projection field-index lookup now respects SQLConf.caseSensitiveAnalysis (mirrors NativeIcebergTableScanExec) and is derived from output attributes, so case-insensitive analysis no longer breaks the projection.
  2. PaimonScanSupport.inputPartitions — return type changed to Option[Seq[InputPartition]] so plan can distinguish a genuinely empty table (Some(Seq.empty) -> empty native scan) from a planning failure (None -> fall back to Spark). No more silent zero-row scans on failure.
  3. AuronPaimonV2IntegrationSuite — empty-table test now asserts NativePaimonV2TableScan is in the executed plan.
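
To illustrate the first item, here is a minimal sketch of resolving projection indices against a combined (file columns followed by partition columns) layout with a configurable case-sensitivity flag. It works on plain column names rather than Spark's StructType or attributes, and `Projection.fieldIndices` is a hypothetical helper, not the code in the PR:

```scala
// Hypothetical sketch: map output column names to positions in the scan layout.
object Projection {

  /** For each output column, returns its index in `combined`, comparing names
    * exactly or ignoring case depending on `caseSensitive`. */
  def fieldIndices(combined: Seq[String], output: Seq[String], caseSensitive: Boolean): Seq[Int] = {
    val sameName: (String, String) => Boolean =
      if (caseSensitive) (a, b) => a == b
      else (a, b) => a.equalsIgnoreCase(b)
    output.map { name =>
      val idx = combined.indexWhere(c => sameName(c, name))
      require(idx >= 0, s"column $name not found in scan layout")
      idx
    }
  }
}
```

With the layout `id, v, dt` and output columns `DT, id`, a case-insensitive lookup resolves to indices 2 and 0, while an exact-match lookup fails on `DT`. That failure mode is the edge case the original name-based lookup ran into under case-insensitive analysis.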

@zhuxiangyi requested a review from merrily01 on June 8, 2026, 14:49
@merrily01

Member

Thank you, the CI has been re-triggered.
