Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 67 additions & 32 deletions docs/en/antalya/partition_export.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,41 +158,49 @@ Query id: 9efc271a-a501-44d1-834f-bc4d20156164

Row 1:
──────
source_database: default
source_table: replicated_source
destination_database: default
destination_table: replicated_destination
create_time: 2025-11-21 18:21:51
partition_id: 2022
transaction_id: 7397746091717128192
source_replica: r1
parts: ['2022_0_0_0','2022_1_1_0','2022_2_2_0']
parts_count: 3
parts_to_do: 0
status: COMPLETED
exception_replica:
last_exception:
exception_part:
exception_count: 0
source_database: default
source_table: replicated_source
destination_database: default
destination_table: s3_destination
create_time: 2025-11-21 18:21:51
partition_id: 2022
transaction_id: 7397746091717128192
query_id: 3fa3c8d3-7d6b-4f8b-9aa2-2c1f1ad0a111
source_replica: r1
parts: ['2022_0_0_0','2022_1_1_0','2022_2_2_0']
parts_count: 3
parts_to_do: 0
status: COMPLETED
last_exception_per_replica: []
exception_count: 0
destination_file_paths: {'2022_0_0_0':['s3://bucket/db/t/year=2022/2022_0_0_0_<hash>.parquet'],'2022_1_1_0':['s3://bucket/db/t/year=2022/2022_1_1_0_<hash>.parquet'],'2022_2_2_0':['s3://bucket/db/t/year=2022/2022_2_2_0_<hash>.parquet']}
committed_metadata_file:
committed_manifest_list:
committed_manifest_file:
committed_marker_file: s3://bucket/db/t/commit_2022_7397746091717128192

Row 2:
──────
source_database: default
source_table: replicated_source
destination_database: default
destination_table: replicated_destination
create_time: 2025-11-21 18:20:35
partition_id: 2021
transaction_id: 7397745772618674176
source_replica: r1
parts: ['2021_0_0_0']
parts_count: 1
parts_to_do: 0
status: COMPLETED
exception_replica:
last_exception:
exception_part:
exception_count: 0
source_database: default
source_table: replicated_source
destination_database: default
destination_table: iceberg_destination
create_time: 2025-11-21 18:20:35
partition_id: 2021
transaction_id: 7397745772618674176
query_id: 1c8e0fd0-6a3a-4d6e-9bd6-bdf64adfe118
source_replica: r2
parts: ['2021_0_0_0']
parts_count: 1
parts_to_do: 0
status: COMPLETED
last_exception_per_replica: [('r1','Code: 999. Coordination::Exception: Session expired','2021_0_0_0','2025-11-21 18:20:42',1)]
exception_count: 1
destination_file_paths: {'2021_0_0_0':['s3://lake/db/t/data/year=2021/2021_0_0_0_<hash>.parquet']}
committed_metadata_file: s3://lake/db/t/metadata/v3.metadata.json
committed_manifest_list: s3://lake/db/t/metadata/snap-7397745772618674176-1-<uuid>.avro
committed_manifest_file: s3://lake/db/t/metadata/<uuid>-m0.avro
committed_marker_file:

2 rows in set. Elapsed: 0.019 sec.

Expand All @@ -205,6 +213,33 @@ Status values include:
- `FAILED` - Export failed
- `KILLED` - Export was cancelled

### Exception columns

- `last_exception_per_replica` is an `Array(Tuple(replica String, message String, part String, time DateTime, count UInt64))`. Each tuple is the most recent exception observed by a single replica plus a best-effort within-replica `count`. Replicas that have never reported an exception are omitted.
- `exception_count` is the sum of every `count` in `last_exception_per_replica`. Each replica owns its own counter, so cross-replica updates do not race; the sum is exact w.r.t. the snapshot returned. Within a single replica concurrent failing writers may under-count by one.

### Per-part destination file paths

- `destination_file_paths` is a `Map(String, Array(String))` keyed by source part name. Each value is the list of file paths written to the destination object storage when that part was exported (a single part can produce multiple files depending on `max_bytes` / `max_rows`). The map is rebuilt from ZooKeeper on every poll, so it grows as parts complete during `PENDING` and becomes the full picture once the task reaches `COMPLETED`.

### Commit info columns

These columns surface paths produced by the destination storage during commit, so it is possible to inspect what was written without consulting the destination directly:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Description for destination_file_paths column is missing.

- `committed_metadata_file` — for Iceberg destinations: path of the new `vN.metadata.json` written by the commit. Empty for non-Iceberg destinations and before the commit lands. If the commit was already finished by a previous run (detected via the transaction id stored in the snapshot summary), this column carries a human-readable sentinel string instead of a path because the original committer's paths are not recoverable from inside the impl.
- `committed_manifest_list` — for Iceberg destinations: path of the manifest list file (`snap-*.avro`) referenced by the new snapshot. Empty under the same conditions as `committed_metadata_file`.
- `committed_manifest_file` — for Iceberg destinations: path of the manifest file referenced by `committed_manifest_list`. Empty under the same conditions as `committed_metadata_file`.
- `committed_marker_file` — for plain object storage destinations: path of the per-transaction commit marker file written by the destination. Empty for Iceberg destinations and for tasks that have not committed yet.

To pick the latest exception across replicas:

```sql
SELECT
arraySort(x -> -x.time, last_exception_per_replica)[1] AS latest_exception
FROM system.replicated_partition_exports
WHERE source_table = 'rmt_table' AND destination_table = 's3_table';
```

## Related Features

- [ALTER TABLE EXPORT PART](/docs/en/engines/table-engines/mergetree-family/part_export.md) - Export individual parts (non-replicated)
Expand Down
76 changes: 76 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,82 @@ struct ExportReplicatedMergeTreePartitionProcessedPartEntry
}
};

/// Per-task "commit info" record persisted at <export-entry>/commit_info.
///
/// Written exactly once, atomically with the status -> COMPLETED transition
/// (see ExportPartitionUtils::commit). Captures the metadata-layer file paths
/// produced by the destination storage during commit so they can be surfaced in
/// system.replicated_partition_exports for debugging.
///
/// All Iceberg fields are empty for non-Iceberg destinations. They may also be
/// empty for an Iceberg destination if the committing replica crashed between
/// writing the object-storage files and writing this znode; in that case the
/// task still transitions to COMPLETED via the recovery path but commit_info
/// remains absent. This is best-effort observability and acceptable.
struct ExportReplicatedMergeTreePartitionCommitInfoEntry
{
/// Iceberg: path (in destination object storage) of the new vN.metadata.json
/// written by the commit.
String iceberg_metadata_file;

/// Iceberg: path of the snap-<id>-<format_version>-<uuid>.avro manifest list
/// referenced by the new snapshot.
String iceberg_manifest_list;

/// Iceberg: path of the manifest entry file (*.avro) referenced by the
/// manifest list.
String iceberg_manifest_file;

/// Plain object storage: path of the commit marker file written by
/// StorageObjectStorage::commitExportPartitionTransaction. Empty for Iceberg.
String commit_marker_file;

bool empty() const
{
return iceberg_metadata_file.empty()
&& iceberg_manifest_list.empty()
&& iceberg_manifest_file.empty()
&& commit_marker_file.empty();
}

std::string toJsonString() const
{
Poco::JSON::Object json;
json.set("iceberg_metadata_file", iceberg_metadata_file);
json.set("iceberg_manifest_list", iceberg_manifest_list);
json.set("iceberg_manifest_file", iceberg_manifest_file);
json.set("commit_marker_file", commit_marker_file);

std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}

static ExportReplicatedMergeTreePartitionCommitInfoEntry fromJsonString(const std::string & json_string)
{
ExportReplicatedMergeTreePartitionCommitInfoEntry entry;
if (json_string.empty())
return entry;

Poco::JSON::Parser parser;
auto json = parser.parse(json_string).extract<Poco::JSON::Object::Ptr>();

if (json->has("iceberg_metadata_file"))
entry.iceberg_metadata_file = json->getValue<String>("iceberg_metadata_file");
if (json->has("iceberg_manifest_list"))
entry.iceberg_manifest_list = json->getValue<String>("iceberg_manifest_list");

if (json->has("iceberg_manifest_file"))
entry.iceberg_manifest_file = json->getValue<String>("iceberg_manifest_file");

if (json->has("commit_marker_file"))
entry.commit_marker_file = json->getValue<String>("commit_marker_file");

return entry;
}
};

struct ExportReplicatedMergeTreePartitionManifest
{
String transaction_id;
Expand Down
15 changes: 15 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <map>
#include <optional>
#include <Storages/ExportReplicatedMergeTreePartitionManifest.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include "Core/QualifiedTableName.h"
Expand Down Expand Up @@ -40,6 +41,20 @@ struct ExportReplicatedMergeTreePartitionTaskEntry
/// An empty map means no replica has recorded an exception yet for this task.
mutable std::map<String, LastExceptionEntry> last_exception_per_replica;

/// In-memory mirror of <export-entry>/processed/<part> leaves in ZK, keyed by
/// part name. Each value is the list of destination file paths produced by the
/// per-part export (typically Parquet object-storage keys). Refreshed on every
/// poll() cycle and on status-change handler invocations; served verbatim to
/// system.replicated_partition_exports without any extra ZK read at query time.
/// An empty map means no part has finished exporting yet for this task.
mutable std::map<String, std::vector<String>> destination_file_paths_per_part;

/// In-memory mirror of the <export-entry>/commit_info znode (written atomically
/// with the COMPLETED status transition; see ExportPartitionUtils::commit).
/// nullopt until commit_info is observed in ZK. Empty fields inside the struct
/// for non-Iceberg destinations.
mutable std::optional<ExportReplicatedMergeTreePartitionCommitInfoEntry> commit_info;

std::string getCompositeKey() const
{
const auto qualified_table_name = QualifiedTableName {manifest.destination_database, manifest.destination_table};
Expand Down
25 changes: 24 additions & 1 deletion src/Storages/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,30 @@ It is currently only implemented in StorageObjectStorage.
std::vector<Field> partition_values;
};

virtual void commitExportPartitionTransaction(
/// Paths produced by the destination storage during commit. Surfaced via
/// system.replicated_partition_exports for debugging. Empty for commits
/// that short-circuit on idempotency.
struct ExportPartitionCommitInfo
{
/// Iceberg destinations only.
String iceberg_metadata_file;
String iceberg_manifest_list;
String iceberg_manifest_file;

/// Plain object storage destinations only: path of the commit marker file
/// written/observed by StorageObjectStorage::commitExportPartitionTransaction.
String commit_marker_file;

bool empty() const
{
return iceberg_metadata_file.empty()
&& iceberg_manifest_list.empty()
&& iceberg_manifest_file.empty()
&& commit_marker_file.empty();
}
};

virtual ExportPartitionCommitInfo commitExportPartitionTransaction(
const String & /* transaction_id */,
const String & /* partition_id */,
const Strings & /* exported_paths */,
Expand Down
Loading
Loading