From 0a9022d641b9748fcf9bfdedfffbc1c97639346b Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 25 May 2026 14:56:43 -0300 Subject: [PATCH 1/7] add commit info to partition exports table --- ...portReplicatedMergeTreePartitionManifest.h | 76 +++++++++ ...ortReplicatedMergeTreePartitionTaskEntry.h | 15 ++ src/Storages/IStorage.h | 25 ++- .../ExportPartitionManifestUpdatingTask.cpp | 158 +++++++++++++++++- .../ExportPartitionManifestUpdatingTask.h | 2 + .../MergeTree/ExportPartitionUtils.cpp | 57 ++++++- .../tests/gtest_export_partition_ordering.cpp | 6 +- .../DataLakes/IDataLakeMetadata.h | 3 +- .../DataLakes/Iceberg/IcebergMetadata.cpp | 50 ++++-- .../DataLakes/Iceberg/IcebergMetadata.h | 9 +- .../ObjectStorage/StorageObjectStorage.cpp | 13 +- .../ObjectStorage/StorageObjectStorage.h | 2 +- .../StorageObjectStorageCluster.cpp | 7 +- .../StorageObjectStorageCluster.h | 2 +- ...torageSystemReplicatedPartitionExports.cpp | 30 ++++ .../StorageSystemReplicatedPartitionExports.h | 13 ++ .../test.py | 15 ++ .../test.py | 16 ++ 18 files changed, 464 insertions(+), 35 deletions(-) diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index acfabc28ca61..dea68c833fa1 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -152,6 +152,82 @@ struct ExportReplicatedMergeTreePartitionProcessedPartEntry } }; +/// Per-task "commit info" record persisted at /commit_info. +/// +/// Written exactly once, atomically with the status -> COMPLETED transition +/// (see ExportPartitionUtils::commit). Captures the metadata-layer file paths +/// produced by the destination storage during commit so they can be surfaced in +/// system.replicated_partition_exports for debugging. +/// +/// All Iceberg fields are empty for non-Iceberg destinations. They may also be +/// empty for an Iceberg destination if the committing replica crashed between +/// writing the object-storage files and writing this znode; in that case the +/// task still transitions to COMPLETED via the recovery path but commit_info +/// remains absent. This is best-effort observability and acceptable. +struct ExportReplicatedMergeTreePartitionCommitInfoEntry +{ + /// Iceberg: path (in destination object storage) of the new vN.metadata.json + /// written by the commit. + String iceberg_metadata_file; + + /// Iceberg: path of the snap---.avro manifest list + /// referenced by the new snapshot. + String iceberg_manifest_list; + + /// Iceberg: path of the manifest entry file (*.avro) referenced by the + /// manifest list. + String iceberg_manifest_file; + + /// Plain object storage: path of the commit marker file written by + /// StorageObjectStorage::commitExportPartitionTransaction. Empty for Iceberg. + String commit_marker_file; + + bool empty() const + { + return iceberg_metadata_file.empty() + && iceberg_manifest_list.empty() + && iceberg_manifest_file.empty() + && commit_marker_file.empty(); + } + + std::string toJsonString() const + { + Poco::JSON::Object json; + json.set("iceberg_metadata_file", iceberg_metadata_file); + json.set("iceberg_manifest_list", iceberg_manifest_list); + json.set("iceberg_manifest_file", iceberg_manifest_file); + json.set("commit_marker_file", commit_marker_file); + + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + oss.exceptions(std::ios::failbit); + Poco::JSON::Stringifier::stringify(json, oss); + return oss.str(); + } + + static ExportReplicatedMergeTreePartitionCommitInfoEntry fromJsonString(const std::string & json_string) + { + ExportReplicatedMergeTreePartitionCommitInfoEntry entry; + if (json_string.empty()) + return entry; + + Poco::JSON::Parser parser; + auto json = parser.parse(json_string).extract(); + + if (json->has("iceberg_metadata_file")) + entry.iceberg_metadata_file = json->getValue("iceberg_metadata_file"); + if (json->has("iceberg_manifest_list")) + entry.iceberg_manifest_list = json->getValue("iceberg_manifest_list"); + + if (json->has("iceberg_manifest_file")) + entry.iceberg_manifest_file = json->getValue("iceberg_manifest_file"); + + if (json->has("commit_marker_file")) + entry.commit_marker_file = json->getValue("commit_marker_file"); + + return entry; + } +}; + struct ExportReplicatedMergeTreePartitionManifest { String transaction_id; diff --git a/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h b/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h index 8af873e0b89c..bf7b3bdfa6a4 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include "Core/QualifiedTableName.h" @@ -40,6 +41,20 @@ struct ExportReplicatedMergeTreePartitionTaskEntry /// An empty map means no replica has recorded an exception yet for this task. mutable std::map last_exception_per_replica; + /// In-memory mirror of /processed/ leaves in ZK, keyed by + /// part name. Each value is the list of destination file paths produced by the + /// per-part export (typically Parquet object-storage keys). Refreshed on every + /// poll() cycle and on status-change handler invocations; served verbatim to + /// system.replicated_partition_exports without any extra ZK read at query time. + /// An empty map means no part has finished exporting yet for this task. + mutable std::map> destination_file_paths_per_part; + + /// In-memory mirror of the /commit_info znode (written atomically + /// with the COMPLETED status transition; see ExportPartitionUtils::commit). + /// nullopt until commit_info is observed in ZK. Empty fields inside the struct + /// for non-Iceberg destinations. + mutable std::optional commit_info; + std::string getCompositeKey() const { const auto qualified_table_name = QualifiedTableName {manifest.destination_database, manifest.destination_table}; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 0a07b892cef8..35732dd6e277 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -511,7 +511,30 @@ It is currently only implemented in StorageObjectStorage. std::vector partition_values; }; - virtual void commitExportPartitionTransaction( + /// Paths produced by the destination storage during commit. Surfaced via + /// system.replicated_partition_exports for debugging. Empty for commits + /// that short-circuit on idempotency. + struct ExportPartitionCommitInfo + { + /// Iceberg destinations only. + String iceberg_metadata_file; + String iceberg_manifest_list; + String iceberg_manifest_file; + + /// Plain object storage destinations only: path of the commit marker file + /// written/observed by StorageObjectStorage::commitExportPartitionTransaction. + String commit_marker_file; + + bool empty() const + { + return iceberg_metadata_file.empty() + && iceberg_manifest_list.empty() + && iceberg_manifest_file.empty() + && commit_marker_file.empty(); + } + }; + + virtual ExportPartitionCommitInfo commitExportPartitionTransaction( const String & /* transaction_id */, const String & /* partition_id */, const Strings & /* exported_paths */, diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp index 106dff071188..197260aaf1b2 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp @@ -117,6 +117,107 @@ namespace return out; } + /// Fetch all per-part processed leaves under /processed and build a + /// fresh map keyed by part_name. Returns the destination file paths recorded for + /// each finished part. + /// + /// Same lenient semantics as readLastExceptionPerReplica: an empty result means + /// "nothing actionable" (transient ZK error, no children yet, or all leaves + /// concurrently removed) and callers MUST skip the assignment to preserve the + /// in-memory mirror across glitches. Safe because processed/ leaves are + /// written once on per-part success and never rewritten — the entire entry path + /// is wiped recursively at task cleanup, handled separately. + std::map> readDestinationFilePathsPerPart( + const zkutil::ZooKeeperPtr & zk, + const std::filesystem::path & entry_path, + const std::string & log_key, + const LoggerPtr & log) + { + std::map> out; + + const auto container_path = entry_path / "processed"; + + Strings children; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); + if (Coordination::Error::ZOK != zk->tryGetChildren(container_path, children)) + { + LOG_INFO(log, "ExportPartition Manifest Updating Task: failed to list processed leaves for {}, leaving in-memory copy untouched", log_key); + return out; + } + + if (children.empty()) + return out; + + std::vector paths; + paths.reserve(children.size()); + for (const auto & child : children) + paths.emplace_back(container_path / child); + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet, paths.size()); + auto responses = zk->tryGet(paths); + responses.waitForResponses(); + + for (size_t i = 0; i < paths.size(); ++i) + { + Coordination::GetResponse response; + try + { + response = responses[i]; + } + catch (...) + { + LOG_WARNING(log, "ExportPartition Manifest Updating Task: ZK error fetching processed leaf {} for {}, skipping", children[i], log_key); + continue; + } + + if (response.error != Coordination::Error::ZOK) + continue; + + try + { + auto entry = ExportReplicatedMergeTreePartitionProcessedPartEntry::fromJsonString(response.data); + out.emplace(std::move(entry.part_name), std::move(entry.paths_in_destination)); + } + catch (...) + { + LOG_WARNING(log, "ExportPartition Manifest Updating Task: malformed processed JSON for {} (leaf {}), ignoring", log_key, children[i]); + } + } + + return out; + } + + /// Read the optional /commit_info znode and return the parsed entry. + /// Returns nullopt when the znode is absent (task has not committed yet, peer + /// crashed before writing it, or transient ZK error). Callers should treat + /// nullopt as "leave the in-memory copy untouched". + std::optional readCommitInfo( + const zkutil::ZooKeeperPtr & zk, + const std::filesystem::path & entry_path, + const std::string & log_key, + const LoggerPtr & log) + { + const auto commit_info_path = entry_path / "commit_info"; + + std::string data; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); + if (!zk->tryGet(commit_info_path, data)) + return std::nullopt; + + try + { + return ExportReplicatedMergeTreePartitionCommitInfoEntry::fromJsonString(data); + } + catch (...) + { + LOG_WARNING(log, "ExportPartition Manifest Updating Task: malformed commit_info JSON for {}, ignoring", log_key); + return std::nullopt; + } + } + /* Remove expired entries and fix non-committed exports that have already exported all parts. @@ -331,6 +432,16 @@ std::vector ExportPartitionManifestUpdatingTask:: } info.exception_count = total_exception_count; + info.destination_file_paths_per_part = entry.destination_file_paths_per_part; + + if (entry.commit_info) + { + info.committed_metadata_file = entry.commit_info->iceberg_metadata_file; + info.committed_manifest_list = entry.commit_info->iceberg_manifest_list; + info.committed_manifest_file = entry.commit_info->iceberg_manifest_file; + info.committed_marker_file = entry.commit_info->commit_marker_file; + } + infos.emplace_back(std::move(info)); } @@ -392,6 +503,17 @@ void ExportPartitionManifestUpdatingTask::poll() auto last_exception_per_replica = readLastExceptionPerReplica( zk, fs::path(entry_path), key, storage.log.load()); + /// Mirror per-part destination file paths from /processed/. + /// Same lenient pattern: empty result = leave in-memory copy untouched. + auto destination_file_paths_per_part = readDestinationFilePathsPerPart( + zk, fs::path(entry_path), key, storage.log.load()); + + /// Mirror commit_info znode (present only after a successful commit). nullopt + /// means the znode does not exist yet (or transient ZK error); leave the + /// in-memory copy untouched in that case. + auto commit_info = readCommitInfo( + zk, fs::path(entry_path), key, storage.log.load()); + const auto local_entry = entries_by_key.find(key); /// If the zk entry has been replaced with export_merge_tree_partition_force_export, checking only for the export key is not enough @@ -407,6 +529,10 @@ void ExportPartitionManifestUpdatingTask::poll() { if (!last_exception_per_replica.empty()) local_entry->last_exception_per_replica = std::move(last_exception_per_replica); + if (!destination_file_paths_per_part.empty()) + local_entry->destination_file_paths_per_part = std::move(destination_file_paths_per_part); + if (commit_info) + local_entry->commit_info = std::move(commit_info); continue; } @@ -465,11 +591,15 @@ void ExportPartitionManifestUpdatingTask::poll() /// holding the cleanup lock (cleanup did not consume the entry). if (!last_exception_per_replica.empty()) local_entry->last_exception_per_replica = std::move(last_exception_per_replica); + if (!destination_file_paths_per_part.empty()) + local_entry->destination_file_paths_per_part = std::move(destination_file_paths_per_part); + if (commit_info) + local_entry->commit_info = std::move(commit_info); LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: already exists", key); continue; } - addTask(metadata, *status, std::move(last_exception_per_replica), key, entries_by_key); + addTask(metadata, *status, std::move(last_exception_per_replica), std::move(destination_file_paths_per_part), std::move(commit_info), key, entries_by_key); } /// Remove entries that were deleted by someone else @@ -484,6 +614,8 @@ void ExportPartitionManifestUpdatingTask::addTask( const ExportReplicatedMergeTreePartitionManifest & metadata, ExportReplicatedMergeTreePartitionTaskEntry::Status status, std::map last_exception_per_replica, + std::map> destination_file_paths_per_part, + std::optional commit_info, const std::string & key, auto & entries_by_key ) @@ -506,7 +638,13 @@ void ExportPartitionManifestUpdatingTask::addTask( } /// Insert or update entry. The multi_index container automatically maintains both indexes. - ExportReplicatedMergeTreePartitionTaskEntry entry {metadata, status, std::move(part_references), std::move(last_exception_per_replica)}; + ExportReplicatedMergeTreePartitionTaskEntry entry { + metadata, + status, + std::move(part_references), + std::move(last_exception_per_replica), + std::move(destination_file_paths_per_part), + std::move(commit_info)}; auto it = entries_by_key.find(key); if (it != entries_by_key.end()) entries_by_key.replace(it, entry); @@ -618,6 +756,22 @@ void ExportPartitionManifestUpdatingTask::handleStatusChanges() it->last_exception_per_replica = std::move(fetched); } + /// Refresh per-part destination paths and commit_info on the status flip too, + /// so the system table observes the COMPLETED state and the committed file + /// paths in the same poll cycle. + if (auto fetched = readDestinationFilePathsPerPart( + zk, fs::path(storage.zookeeper_path) / "exports" / key, key, storage.log.load()); + !fetched.empty()) + { + it->destination_file_paths_per_part = std::move(fetched); + } + + if (auto fetched_commit_info = readCommitInfo( + zk, fs::path(storage.zookeeper_path) / "exports" / key, key, storage.log.load())) + { + it->commit_info = std::move(fetched_commit_info); + } + /// If status changed to KILLED, cancel local export operations if (*new_status == ExportReplicatedMergeTreePartitionTaskEntry::Status::KILLED) { diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h index 32487f2dc68c..71e16c1848b0 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h @@ -34,6 +34,8 @@ class ExportPartitionManifestUpdatingTask const ExportReplicatedMergeTreePartitionManifest & metadata, ExportReplicatedMergeTreePartitionTaskEntry::Status status, std::map last_exception_per_replica, + std::map> destination_file_paths_per_part, + std::optional commit_info, const std::string & key, auto & entries_by_key ); diff --git a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp index 81df09c86523..54ad17247335 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.cpp +++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp @@ -188,7 +188,8 @@ namespace ExportPartitionUtils getPartitionValuesForIcebergCommit(source_storage, manifest.partition_id); } - destination_storage->commitExportPartitionTransaction(manifest.transaction_id, manifest.partition_id, exported_paths, iceberg_args, context); + const auto destination_commit_info = destination_storage->commitExportPartitionTransaction( + manifest.transaction_id, manifest.partition_id, exported_paths, iceberg_args, context); /// Failpoint to simulate a crash after the Iceberg commit succeeds but before /// ZooKeeper is updated to COMPLETED. Used by idempotency integration tests. @@ -201,9 +202,61 @@ namespace ExportPartitionUtils }); LOG_INFO(log, "ExportPartition: Committed export, mark as completed"); + + /// Mirror the destination commit paths into a dedicated znode so peers can + /// surface them via system.replicated_partition_exports. Wrapped in the same + /// `tryMulti` as the COMPLETED status flip whenever non-empty so the two + /// observations land together; if the destination call short-circuited on + /// idempotency (empty struct), we fall back to the original single trySet + /// and leave whatever commit_info a peer may have already written intact. + /// + /// Best-effort caveat: if this replica crashes between writing the Iceberg + /// files and reaching this point, the task still completes via the recovery + /// path but commit_info will be absent. Recovering commit_info from the + /// live Iceberg snapshot in that case is a possible future enhancement. + const std::string status_path = fs::path(entry_path) / "status"; + const std::string completed_name = String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::COMPLETED)).data(); + + if (!destination_commit_info.empty()) + { + ExportReplicatedMergeTreePartitionCommitInfoEntry commit_info_entry { + destination_commit_info.iceberg_metadata_file, + destination_commit_info.iceberg_manifest_list, + destination_commit_info.iceberg_manifest_file, + destination_commit_info.commit_marker_file}; + + const std::string commit_info_path = fs::path(entry_path) / "commit_info"; + + Coordination::Requests ops; + ops.emplace_back(zkutil::makeCreateRequest(commit_info_path, commit_info_entry.toJsonString(), zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeSetRequest(status_path, completed_name, -1)); + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); + + Coordination::Responses responses; + const auto rc = zk->tryMulti(ops, responses); + if (rc == Coordination::Error::ZOK) + { + LOG_INFO(log, "ExportPartition: Marked export as completed and persisted commit_info"); + return; + } + + /// ZNODEEXISTS on commit_info: a peer already wrote it; their record + /// stands. Fall through to set the status only. + if (rc == Coordination::Error::ZNODEEXISTS) + { + LOG_INFO(log, "ExportPartition: commit_info already present (peer wrote it first); marking status COMPLETED only"); + } + else + { + LOG_WARNING(log, "ExportPartition: Failed to persist commit_info atomically with COMPLETED (rc={}); falling back to status-only set", rc); + } + } + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperSet); - if (Coordination::Error::ZOK == zk->trySet(fs::path(entry_path) / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::COMPLETED)).data(), -1)) + if (Coordination::Error::ZOK == zk->trySet(status_path, completed_name, -1)) { LOG_INFO(log, "ExportPartition: Marked export as completed"); } diff --git a/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp b/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp index c4669a9bf55c..b3c750129988 100644 --- a/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp +++ b/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp @@ -43,9 +43,9 @@ TEST_F(ExportPartitionOrderingTest, IterationOrderMatchesCreateTime) manifest3.transaction_id = "tx3"; manifest3.create_time = base_time; // Oldest - ExportReplicatedMergeTreePartitionTaskEntry entry1{manifest1, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}}; - ExportReplicatedMergeTreePartitionTaskEntry entry2{manifest2, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}}; - ExportReplicatedMergeTreePartitionTaskEntry entry3{manifest3, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}}; + ExportReplicatedMergeTreePartitionTaskEntry entry1{manifest1, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}, {}, {}}; + ExportReplicatedMergeTreePartitionTaskEntry entry2{manifest2, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}, {}, {}}; + ExportReplicatedMergeTreePartitionTaskEntry entry3{manifest3, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}, {}, {}}; // Insert in reverse order by_key.insert(entry1); diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index de35e5d4c4e1..732b9a1f56e0 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -217,7 +218,7 @@ class IDataLakeMetadata : boost::noncopyable throwNotImplemented("import"); } - virtual void commitExportPartitionTransaction( + virtual IStorage::ExportPartitionCommitInfo commitExportPartitionTransaction( std::shared_ptr /* catalog */, const StorageID & /* table_id */, const String & /* transaction_id */, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 9d52ca68cc41..81dac159dd73 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -1554,7 +1554,7 @@ Poco::JSON::Object::Ptr lookupSchema(const Poco::JSON::Object::Ptr & meta, Int64 } -bool IcebergMetadata::commitImportPartitionTransactionImpl( +IStorage::ExportPartitionCommitInfo IcebergMetadata::commitImportPartitionTransactionImpl( FileNamesGenerator & filename_generator, Poco::JSON::Object::Ptr & metadata, Poco::JSON::Object::Ptr & partition_spec, @@ -1582,7 +1582,13 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( LOG_INFO(log, "Export transaction {} already committed, skipping re-commit", transaction_id); - return true; + /// Surface a sentinel so the caller treats this as a successful attempt (non-empty + /// commit info), persists a commit_info znode, and makes the situation visible in + /// system.replicated_partition_exports.committed_metadata_file. We do not know the + /// original committer's paths from here. + IStorage::ExportPartitionCommitInfo already_committed_info; + already_committed_info.iceberg_metadata_file = ""; + return already_committed_info; } CompressionMethod metadata_compression_method = persistent_components.metadata_compression_method; @@ -1770,7 +1776,7 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( { LOG_DEBUG(log, "Failed to write metadata {}, retrying", storage_metadata_name); cleanup(true); - return false; + return {}; } LOG_DEBUG(log, "Metadata file {} written", storage_metadata_name); @@ -1785,7 +1791,7 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot)) { cleanup(true); - return false; + return {}; } /// Catalog has accepted the commit - the new snapshot is now live and references @@ -1828,11 +1834,16 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( /// post-publish work (e.g. metadata-cache invalidation). Running cleanup() /// here would delete manifest files referenced by the published snapshot /// and corrupt it. Log and swallow - any transient state (stale cache) - /// is self-healing on subsequent reads. + /// is self-healing on subsequent reads. Surface the published paths anyway + /// so the partition export task can persist them in ZooKeeper. tryLogCurrentException(log, "Post-publish work failed after Iceberg snapshot was committed; " "skipping manifest cleanup to preserve published snapshot"); - return true; + IStorage::ExportPartitionCommitInfo published_info; + published_info.iceberg_metadata_file = storage_metadata_name; + published_info.iceberg_manifest_list = storage_manifest_list_name; + published_info.iceberg_manifest_file = storage_manifest_entry_name; + return published_info; } LOG_ERROR(log, "Failed to commit import partition transaction: {}", getCurrentExceptionMessage(false)); @@ -1840,10 +1851,18 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( throw; } - return true; + /// Record the storage paths of the files we just published so the partition + /// export task can persist them in ZooKeeper for observability. Only set here + /// (not on the retry / "already committed" paths) so the struct reflects + /// exactly what this attempt produced. + IStorage::ExportPartitionCommitInfo published_info; + published_info.iceberg_metadata_file = storage_metadata_name; + published_info.iceberg_manifest_list = storage_manifest_list_name; + published_info.iceberg_manifest_file = storage_manifest_entry_name; + return published_info; } -void IcebergMetadata::commitExportPartitionTransaction( +IStorage::ExportPartitionCommitInfo IcebergMetadata::commitExportPartitionTransaction( std::shared_ptr catalog, const StorageID & table_id, const String & transaction_id, @@ -1881,7 +1900,11 @@ void IcebergMetadata::commitExportPartitionTransaction( LOG_INFO(log, "Export transaction {} already committed, skipping re-commit", transaction_id); - return; + /// Empty commit info: paths produced by the original commit are not recoverable + /// here without re-reading the snapshot. ExportPartitionUtils::commit only + /// writes the commit_info znode when this struct is non-empty, so the original + /// committer's record (if any) is preserved. + return {}; } /// Fail fast if the table schema or partition spec changed between export-start and commit. @@ -1955,7 +1978,7 @@ void IcebergMetadata::commitExportPartitionTransaction( size_t attempt = 0; while (attempt < MAX_TRANSACTION_RETRIES) { - if (commitImportPartitionTransactionImpl( + auto commit_info = commitImportPartitionTransactionImpl( filename_generator, metadata, partition_spec, @@ -1975,10 +1998,9 @@ void IcebergMetadata::commitExportPartitionTransaction( table_id, configuration->getTypeName(), configuration->getNamespace(), - context)) - { - return; - } + context); + if (!commit_info.empty()) + return commit_info; ++attempt; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index cd54eb235035..9206a038163a 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -134,7 +134,7 @@ class IcebergMetadata : public IDataLakeMetadata /// data_file_paths contains the metadata-path for each exported data file (as recorded in /// ZooKeeper). For every path a co-located sidecar Avro file (same path, ".avro" extension) /// must exist in the object storage; it supplies record_count and file_size_in_bytes. - void commitExportPartitionTransaction( + IStorage::ExportPartitionCommitInfo commitExportPartitionTransaction( std::shared_ptr catalog, const StorageID & table_id, const String & transaction_id, @@ -208,7 +208,12 @@ class IcebergMetadata : public IDataLakeMetadata std::optional getPartitionKey(ContextPtr local_context, Iceberg::TableStateSnapshot actual_table_state_snapshot) const; KeyDescription getSortingKey(ContextPtr local_context, Iceberg::TableStateSnapshot actual_table_state_snapshot) const; - bool commitImportPartitionTransactionImpl( + /// Non-empty return value means the attempt succeeded (covers both the normal + /// publish path and the `isExportPartitionTransactionAlreadyCommitted` short-circuit). + /// An empty `ExportPartitionCommitInfo` means the caller must retry. The + /// short-circuit branch fills `iceberg_metadata_file` with a sentinel note since + /// the original committer's paths are not trivially recoverable from inside this call. + IStorage::ExportPartitionCommitInfo commitImportPartitionTransactionImpl( FileNamesGenerator & filename_generator, Poco::JSON::Object::Ptr & metadata, Poco::JSON::Object::Ptr & partition_spec, diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 56b4e52296b2..f8a0f6037cce 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -668,7 +668,7 @@ SinkToStoragePtr StorageObjectStorage::import( local_context); } -void StorageObjectStorage::commitExportPartitionTransaction( +IStorage::ExportPartitionCommitInfo StorageObjectStorage::commitExportPartitionTransaction( const String & transaction_id, const String & partition_id, const Strings & exported_paths, @@ -689,7 +689,7 @@ void StorageObjectStorage::commitExportPartitionTransaction( const auto partition_spec_id = iceberg_metadata->getValue(Iceberg::f_default_spec_id); configuration->lazyInitializeIfNeeded(object_storage, local_context); - configuration->getExternalMetadata()->commitExportPartitionTransaction( + return configuration->getExternalMetadata()->commitExportPartitionTransaction( catalog, storage_id, transaction_id, @@ -700,16 +700,20 @@ void StorageObjectStorage::commitExportPartitionTransaction( exported_paths, configuration, local_context); - return; } const String commit_object = configuration->getRawPath().path + "/commit_" + partition_id + "_" + transaction_id; + ExportPartitionCommitInfo result; + result.commit_marker_file = commit_object; + /// if file already exists, nothing to be done if (object_storage->exists(StoredObject(commit_object))) { LOG_DEBUG(getLogger("StorageObjectStorage"), "Commit file already exists, nothing to be done: {}", commit_object); - return; + /// Still surface the path: observability does not require we wrote it, + /// only that it is the committed marker for this transaction. + return result; } auto out = object_storage->writeObject(StoredObject(commit_object), WriteMode::Rewrite, /* attributes= */ {}, DBMS_DEFAULT_BUFFER_SIZE, local_context->getWriteSettings()); @@ -719,6 +723,7 @@ void StorageObjectStorage::commitExportPartitionTransaction( out->write("\n", 1); } out->finalize(); + return result; } void StorageObjectStorage::truncate( diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 8c04b71d9487..6ace629b7085 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -89,7 +89,7 @@ class StorageObjectStorage : public IStorage const std::optional & /* format_settings_ */, ContextPtr /* context */) override; - void commitExportPartitionTransaction( + ExportPartitionCommitInfo commitExportPartitionTransaction( const String & transaction_id, const String & partition_id, const Strings & exported_paths, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index e5131d06ae2e..6211237d220e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -1111,7 +1111,7 @@ SinkToStoragePtr StorageObjectStorageCluster::import( context); } -void StorageObjectStorageCluster::commitExportPartitionTransaction( +IStorage::ExportPartitionCommitInfo StorageObjectStorageCluster::commitExportPartitionTransaction( const String & transaction_id, const String & partition_id, const Strings & exported_paths, @@ -1120,16 +1120,15 @@ void StorageObjectStorageCluster::commitExportPartitionTransaction( { if (pure_storage) { - pure_storage->commitExportPartitionTransaction( + return pure_storage->commitExportPartitionTransaction( transaction_id, partition_id, exported_paths, iceberg_commit_export_partition_arguments, local_context ); - return; } - IStorageCluster::commitExportPartitionTransaction( + return IStorageCluster::commitExportPartitionTransaction( transaction_id, partition_id, exported_paths, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 52c7d5951855..f5fa4e7434bd 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -44,7 +44,7 @@ class StorageObjectStorageCluster : public IStorageCluster const std::optional & format_settings_, ContextPtr context) override; - void commitExportPartitionTransaction( + ExportPartitionCommitInfo commitExportPartitionTransaction( const String & transaction_id, const String & partition_id, const Strings & exported_paths, diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp index 9e8faab689d6..25fd130d0198 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -47,6 +48,16 @@ ColumnsDescription StorageSystemReplicatedPartitionExports::getColumnsDescriptio "Per-replica last exception entries. Each tuple records the most recent exception observed by that replica plus a best-effort within-replica count. Empty array if no replica has reported an exception for this task."}, {"exception_count", std::make_shared(), "Sum of per-replica exception counts. Each replica owns its own count, so the sum is exact w.r.t. the in-memory snapshot; within-replica updates remain best-effort and may under-count by one under concurrent failures."}, + {"destination_file_paths", std::make_shared(std::make_shared(), std::make_shared(std::make_shared())), + "Per-part destination file paths written to the destination object storage. Keyed by part name; values are the file paths produced by exporting that part. Mirrored from ZooKeeper on every poll while PENDING; partial during in-flight tasks."}, + {"committed_metadata_file", std::make_shared(), + "For Iceberg destinations: path of the new metadata JSON file written at commit time. Empty for non-Iceberg destinations and for tasks that have not committed yet. May also be empty if the committing replica crashed between writing the object-storage files and persisting commit_info. If the export was already committed by a previous run (detected via the transaction id stored in the snapshot summary), this column holds a human-readable note instead of a path since the original committer's paths are not trivially recoverable."}, + {"committed_manifest_list", std::make_shared(), + "For Iceberg destinations: path of the manifest list file (snap-*.avro) referenced by the new snapshot. Empty under the same conditions as committed_metadata_file."}, + {"committed_manifest_file", std::make_shared(), + "For Iceberg destinations: path of the manifest file referenced by committed_manifest_list. Empty under the same conditions as committed_metadata_file."}, + {"committed_marker_file", std::make_shared(), + "For plain object storage destinations: path of the per-transaction commit marker file written by the destination. Empty for Iceberg destinations and for tasks that have not committed yet."}, }; } @@ -152,6 +163,25 @@ void StorageSystemReplicatedPartitionExports::fillData(MutableColumns & res_colu per_replica.push_back(Tuple{ex.replica, ex.message, ex.part, ex.time, ex.count}); res_columns[i++]->insert(per_replica); res_columns[i++]->insert(info.exception_count); + + Map destination_paths_map; + destination_paths_map.reserve(info.destination_file_paths_per_part.size()); + for (const auto & [part_name, paths] : info.destination_file_paths_per_part) + { + Array paths_array; + paths_array.reserve(paths.size()); + for (const auto & path : paths) + paths_array.push_back(path); + destination_paths_map.emplace_back(Tuple{part_name, std::move(paths_array)}); + } + res_columns[i++]->insert(std::move(destination_paths_map)); + + res_columns[i++]->insert(info.committed_metadata_file); + res_columns[i++]->insert(info.committed_manifest_list); + + res_columns[i++]->insert(info.committed_manifest_file); + + res_columns[i++]->insert(info.committed_marker_file); } } } diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.h b/src/Storages/System/StorageSystemReplicatedPartitionExports.h index a8666374a7f0..244fee61ae4a 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.h +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.h @@ -29,6 +29,19 @@ struct ReplicatedPartitionExportInfo /// single replica the count is best-effort (concurrent failing writers may under- /// count by one), matching the documented column semantics. size_t exception_count = 0; + /// Per-part destination file paths, keyed by part name. Mirrors the + /// /processed//paths_in_destination data from ZooKeeper. + /// Empty until parts complete; partial during PENDING. + std::map> destination_file_paths_per_part; + /// Iceberg commit-time paths surfaced from /commit_info. + /// All empty for non-Iceberg destinations or before commit lands. + String committed_metadata_file; + String committed_manifest_list; + String committed_manifest_file; + /// Plain object storage commit marker file surfaced from + /// /commit_info. Empty for Iceberg destinations or before + /// commit lands. + String committed_marker_file; }; class StorageSystemReplicatedPartitionExports final : public IStorageSystemOneBlock diff --git a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py index 471d8d36fe73..916c5ccc307d 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py @@ -843,6 +843,21 @@ def test_post_publish_exception_preserves_snapshot(cluster): f"Unexpected data after post-publish exception recovery:\n{result}" ) + # The recovery path goes through the isExportPartitionTransactionAlreadyCommitted + # short-circuit on retry, which surfaces a sentinel note in committed_metadata_file + # (the original committer's manifest paths are not recoverable from inside the impl). + committed_metadata_file = node.query( + f""" + SELECT committed_metadata_file FROM system.replicated_partition_exports + WHERE source_table = '{mt_table}' + AND destination_table = '{iceberg_table}' + AND partition_id = '2020' + """ + ).strip() + assert committed_metadata_file == "", ( + f"Expected sentinel in committed_metadata_file for already-committed retry, got: {committed_metadata_file!r}" + ) + def test_export_task_timeout_kills_stuck_pending_task(cluster): """ diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py index 3161e3b67100..a784799bd155 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py @@ -693,6 +693,22 @@ def test_export_partition_file_already_exists_policy(cluster): # wait for the exports to finish wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED") + # plain object storage destinations surface the commit marker file path via + # system.replicated_partition_exports.committed_marker_file + committed_marker_file = node.query( + f""" + SELECT committed_marker_file FROM system.replicated_partition_exports + WHERE source_table = '{mt_table}' + AND destination_table = '{s3_table}' + AND partition_id = '2020' + """ + ).strip() + assert committed_marker_file.startswith(f"{s3_table}/commit_2020_"), \ + f"Expected committed_marker_file under {s3_table}/, got: {committed_marker_file!r}" + assert node.query( + f"SELECT count() FROM s3(s3_conn, filename='{committed_marker_file}', format=LineAsString)" + ) == '1\n', f"Commit marker file does not exist at {committed_marker_file!r}" + # try to export the partition node.query( f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS export_merge_tree_partition_force_export=1" From febfc21fefeb4512fa94f09e265908f5308e0ae5 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 25 May 2026 15:58:08 -0300 Subject: [PATCH 2/7] address coments --- docs/en/antalya/partition_export.md | 9 ++++++++ .../DataLakes/Iceberg/IcebergMetadata.cpp | 8 +++---- .../test.py | 22 +++++++++++++------ .../test_export_partition_iceberg.py | 13 +++++++++++ .../test_export_partition_iceberg_catalog.py | 8 +++++++ 5 files changed, 48 insertions(+), 12 deletions(-) diff --git a/docs/en/antalya/partition_export.md b/docs/en/antalya/partition_export.md index 5365ed90e353..612adb149d99 100644 --- a/docs/en/antalya/partition_export.md +++ b/docs/en/antalya/partition_export.md @@ -206,6 +206,15 @@ Status values include: - `last_exception_per_replica` is an `Array(Tuple(replica String, message String, part String, time DateTime, count UInt64))`. Each tuple is the most recent exception observed by a single replica plus a best-effort within-replica `count`. Replicas that have never reported an exception are omitted. - `exception_count` is the sum of every `count` in `last_exception_per_replica`. Each replica owns its own counter, so cross-replica updates do not race; the sum is exact w.r.t. the snapshot returned. Within a single replica concurrent failing writers may under-count by one. +### Commit info columns + +These columns surface paths produced by the destination storage during commit, so it is possible to inspect what was written without consulting the destination directly: + +- `committed_metadata_file` — for Iceberg destinations: path of the new `vN.metadata.json` written by the commit. Empty for non-Iceberg destinations and before the commit lands. If the commit was already finished by a previous run (detected via the transaction id stored in the snapshot summary), this column carries a human-readable sentinel string instead of a path because the original committer's paths are not recoverable from inside the impl. +- `committed_manifest_list` — for Iceberg destinations: path of the manifest list file (`snap-*.avro`) referenced by the new snapshot. Empty under the same conditions as `committed_metadata_file`. +- `committed_manifest_file` — for Iceberg destinations: path of the manifest file referenced by `committed_manifest_list`. Empty under the same conditions as `committed_metadata_file`. +- `committed_marker_file` — for plain object storage destinations: path of the per-transaction commit marker file written by the destination. Empty for Iceberg destinations and for tasks that have not committed yet. + To pick the latest exception across replicas: ```sql diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 81dac159dd73..839ba34b2123 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -1900,11 +1900,9 @@ IStorage::ExportPartitionCommitInfo IcebergMetadata::commitExportPartitionTransa LOG_INFO(log, "Export transaction {} already committed, skipping re-commit", transaction_id); - /// Empty commit info: paths produced by the original commit are not recoverable - /// here without re-reading the snapshot. ExportPartitionUtils::commit only - /// writes the commit_info znode when this struct is non-empty, so the original - /// committer's record (if any) is preserved. - return {}; + IStorage::ExportPartitionCommitInfo already_committed_info; + already_committed_info.iceberg_metadata_file = ""; + return already_committed_info; } /// Fail fast if the table schema or partition spec changed between export-start and commit. diff --git a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py index 916c5ccc307d..52de6b3fb436 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py @@ -814,8 +814,9 @@ def test_post_publish_exception_preserves_snapshot(cluster): post-publish region (after both the metadata file is written and `published = true` is set). With the fix in place: - the commit stays durable (snapshot is readable, manifests are intact); - - the export is marked COMPLETED because the idempotency check on retry - detects that the transaction is already committed and returns success; + - the export is marked COMPLETED because the outer `catch (...)` sees + `published == true` and returns the populated commit info with the real + paths produced by this attempt (no retry needed); - all exported rows are visible through the Iceberg table. """ node = cluster.instances["replica1"] @@ -843,9 +844,10 @@ def test_post_publish_exception_preserves_snapshot(cluster): f"Unexpected data after post-publish exception recovery:\n{result}" ) - # The recovery path goes through the isExportPartitionTransactionAlreadyCommitted - # short-circuit on retry, which surfaces a sentinel note in committed_metadata_file - # (the original committer's manifest paths are not recoverable from inside the impl). + # After a post-publish exception the catch handler with published==true returns + # the populated commit info (real metadata / manifest list / manifest file paths). + # ExportPartitionUtils::commit persists it to the commit_info znode, so the system + # table should show a real metadata path here, not the already-committed sentinel. committed_metadata_file = node.query( f""" SELECT committed_metadata_file FROM system.replicated_partition_exports @@ -854,8 +856,14 @@ def test_post_publish_exception_preserves_snapshot(cluster): AND partition_id = '2020' """ ).strip() - assert committed_metadata_file == "", ( - f"Expected sentinel in committed_metadata_file for already-committed retry, got: {committed_metadata_file!r}" + assert committed_metadata_file, ( + "committed_metadata_file should be populated after a successful post-publish-catch return" + ) + assert not committed_metadata_file.startswith("<"), ( + f"committed_metadata_file should be a real metadata path, got the already-committed sentinel: {committed_metadata_file!r}" + ) + assert committed_metadata_file.endswith(".metadata.json"), ( + f"Expected a *.metadata.json path in committed_metadata_file, got: {committed_metadata_file!r}" ) diff --git a/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py b/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py index d289639eac12..760dff9171f9 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py @@ -705,6 +705,19 @@ def test_idempotency_after_commit_crash(export_cluster): count = int(node.query(f"SELECT count() FROM {iceberg}").strip()) assert count == 3, f"Expected 3 rows (no duplicates), got {count}" + # The already-committed early-exit in commitExportPartitionTransaction surfaces + # a sentinel note in committed_metadata_file (the original committer's paths + # are not recoverable from inside the call). The sentinel makes the situation + # visible in system.replicated_partition_exports rather than leaving the + # commit_info columns empty. + committed_metadata_file = node.query( + f"SELECT committed_metadata_file FROM system.replicated_partition_exports " + f"WHERE source_table = '{source}' AND partition_id = '{pid}'" + ).strip() + assert committed_metadata_file == "", ( + f"Expected already-committed sentinel after idempotent retry, got: {committed_metadata_file!r}" + ) + def test_commit_attempts_budget_transitions_to_failed(export_cluster): """ diff --git a/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg_catalog.py b/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg_catalog.py index 9d0cc557dc18..c45b5a148b96 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg_catalog.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg_catalog.py @@ -367,6 +367,14 @@ def test_catalog_idempotent_retry(catalog_export_cluster): f"got {len(history)}" ) + committed_metadata_file = node.query( + f"SELECT committed_metadata_file FROM system.replicated_partition_exports " + f"WHERE source_table = '{source}' AND partition_id = '{pid}'" + ).strip() + assert committed_metadata_file == "", ( + f"Expected already-committed sentinel after idempotent retry, got: {committed_metadata_file!r}" + ) + # --------------------------------------------------------------------------- # Replicated catalog tests From 1dfdc10be660c2f511b6f8e202d86fe2aa9837a1 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 26 May 2026 15:20:55 -0300 Subject: [PATCH 3/7] fix test --- .../test.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py index a784799bd155..74f03033976d 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py @@ -703,10 +703,17 @@ def test_export_partition_file_already_exists_policy(cluster): AND partition_id = '2020' """ ).strip() - assert committed_marker_file.startswith(f"{s3_table}/commit_2020_"), \ + # `committed_marker_file` is the absolute key in the bucket (same convention as + # `destination_file_paths`); it may carry the s3_conn URL's in-bucket prefix on + # top of the table's `filename` argument, so use a "contains" check that does + # not depend on knowing that prefix. + assert f"{s3_table}/commit_2020_" in committed_marker_file, \ f"Expected committed_marker_file under {s3_table}/, got: {committed_marker_file!r}" + # Path relative to the `s3_conn` URL, derived from the absolute key without + # assuming a particular URL prefix. + marker_relative_path = committed_marker_file[committed_marker_file.index(f"{s3_table}/"):] assert node.query( - f"SELECT count() FROM s3(s3_conn, filename='{committed_marker_file}', format=LineAsString)" + f"SELECT count() FROM s3(s3_conn, filename='{marker_relative_path}', format=LineAsString)" ) == '1\n', f"Commit marker file does not exist at {committed_marker_file!r}" # try to export the partition From 70f5c15d9db747a0a8bc68843495939b33b2b413 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 27 May 2026 09:29:46 -0300 Subject: [PATCH 4/7] make code simpler --- .../MergeTree/ExportPartitionUtils.cpp | 59 ++++++------------- 1 file changed, 18 insertions(+), 41 deletions(-) diff --git a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp index 54ad17247335..c1552c9bcd96 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.cpp +++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp @@ -203,20 +203,12 @@ namespace ExportPartitionUtils LOG_INFO(log, "ExportPartition: Committed export, mark as completed"); - /// Mirror the destination commit paths into a dedicated znode so peers can - /// surface them via system.replicated_partition_exports. Wrapped in the same - /// `tryMulti` as the COMPLETED status flip whenever non-empty so the two - /// observations land together; if the destination call short-circuited on - /// idempotency (empty struct), we fall back to the original single trySet - /// and leave whatever commit_info a peer may have already written intact. - /// - /// Best-effort caveat: if this replica crashes between writing the Iceberg - /// files and reaching this point, the task still completes via the recovery - /// path but commit_info will be absent. Recovering commit_info from the - /// live Iceberg snapshot in that case is a possible future enhancement. const std::string status_path = fs::path(entry_path) / "status"; const std::string completed_name = String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::COMPLETED)).data(); + Coordination::Requests ops; + ops.emplace_back(zkutil::makeSetRequest(status_path, completed_name, -1)); + if (!destination_commit_info.empty()) { ExportReplicatedMergeTreePartitionCommitInfoEntry commit_info_entry { @@ -226,44 +218,29 @@ namespace ExportPartitionUtils destination_commit_info.commit_marker_file}; const std::string commit_info_path = fs::path(entry_path) / "commit_info"; - - Coordination::Requests ops; ops.emplace_back(zkutil::makeCreateRequest(commit_info_path, commit_info_entry.toJsonString(), zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeSetRequest(status_path, completed_name, -1)); - - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); - - Coordination::Responses responses; - const auto rc = zk->tryMulti(ops, responses); - if (rc == Coordination::Error::ZOK) - { - LOG_INFO(log, "ExportPartition: Marked export as completed and persisted commit_info"); - return; - } - - /// ZNODEEXISTS on commit_info: a peer already wrote it; their record - /// stands. Fall through to set the status only. - if (rc == Coordination::Error::ZNODEEXISTS) - { - LOG_INFO(log, "ExportPartition: commit_info already present (peer wrote it first); marking status COMPLETED only"); - } - else - { - LOG_WARNING(log, "ExportPartition: Failed to persist commit_info atomically with COMPLETED (rc={}); falling back to status-only set", rc); - } } ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperSet); - if (Coordination::Error::ZOK == zk->trySet(status_path, completed_name, -1)) + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); + + Coordination::Responses responses; + const auto rc = zk->tryMulti(ops, responses); + + if (rc == Coordination::Error::ZOK) { - LOG_INFO(log, "ExportPartition: Marked export as completed"); + LOG_INFO(log, "ExportPartition: Marked export as completed{}", + destination_commit_info.empty() ? "" : " and persisted commit_info"); + return; } - else + + if (rc == Coordination::Error::ZNODEEXISTS) { - throw Exception(ErrorCodes::NETWORK_ERROR, "ExportPartition: Failed to mark export as completed, will not try to fix it"); + LOG_INFO(log, "ExportPartition: commit_info already present (peer wrote it first); task already COMPLETED"); + return; } + + throw Exception(ErrorCodes::NETWORK_ERROR, "ExportPartition: Failed to mark export as completed (rc={}), will not try to fix it", rc); } bool handleCommitFailure( From 15a0608a8a8c233e277683e3dddd0eb2f1ee25cd Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 27 May 2026 11:38:47 -0300 Subject: [PATCH 5/7] ai docs fix --- docs/en/antalya/partition_export.md | 76 +++++++++++++++++------------ 1 file changed, 44 insertions(+), 32 deletions(-) diff --git a/docs/en/antalya/partition_export.md b/docs/en/antalya/partition_export.md index ed268b12bf4d..b4cd5f937008 100644 --- a/docs/en/antalya/partition_export.md +++ b/docs/en/antalya/partition_export.md @@ -158,41 +158,49 @@ Query id: 9efc271a-a501-44d1-834f-bc4d20156164 Row 1: ────── -source_database: default -source_table: replicated_source -destination_database: default -destination_table: replicated_destination -create_time: 2025-11-21 18:21:51 -partition_id: 2022 -transaction_id: 7397746091717128192 -source_replica: r1 -parts: ['2022_0_0_0','2022_1_1_0','2022_2_2_0'] -parts_count: 3 -parts_to_do: 0 -status: COMPLETED -exception_replica: -last_exception: -exception_part: -exception_count: 0 +source_database: default +source_table: replicated_source +destination_database: default +destination_table: s3_destination +create_time: 2025-11-21 18:21:51 +partition_id: 2022 +transaction_id: 7397746091717128192 +query_id: 3fa3c8d3-7d6b-4f8b-9aa2-2c1f1ad0a111 +source_replica: r1 +parts: ['2022_0_0_0','2022_1_1_0','2022_2_2_0'] +parts_count: 3 +parts_to_do: 0 +status: COMPLETED +last_exception_per_replica: [] +exception_count: 0 +destination_file_paths: {'2022_0_0_0':['s3://bucket/db/t/year=2022/2022_0_0_0_.parquet'],'2022_1_1_0':['s3://bucket/db/t/year=2022/2022_1_1_0_.parquet'],'2022_2_2_0':['s3://bucket/db/t/year=2022/2022_2_2_0_.parquet']} +committed_metadata_file: +committed_manifest_list: +committed_manifest_file: +committed_marker_file: s3://bucket/db/t/commit_2022_7397746091717128192 Row 2: ────── -source_database: default -source_table: replicated_source -destination_database: default -destination_table: replicated_destination -create_time: 2025-11-21 18:20:35 -partition_id: 2021 -transaction_id: 7397745772618674176 -source_replica: r1 -parts: ['2021_0_0_0'] -parts_count: 1 -parts_to_do: 0 -status: COMPLETED -exception_replica: -last_exception: -exception_part: -exception_count: 0 +source_database: default +source_table: replicated_source +destination_database: default +destination_table: iceberg_destination +create_time: 2025-11-21 18:20:35 +partition_id: 2021 +transaction_id: 7397745772618674176 +query_id: 1c8e0fd0-6a3a-4d6e-9bd6-bdf64adfe118 +source_replica: r2 +parts: ['2021_0_0_0'] +parts_count: 1 +parts_to_do: 0 +status: COMPLETED +last_exception_per_replica: [('r1','Code: 999. Coordination::Exception: Session expired','2021_0_0_0','2025-11-21 18:20:42',1)] +exception_count: 1 +destination_file_paths: {'2021_0_0_0':['s3://lake/db/t/data/year=2021/2021_0_0_0_.parquet']} +committed_metadata_file: s3://lake/db/t/metadata/v3.metadata.json +committed_manifest_list: s3://lake/db/t/metadata/snap-7397745772618674176-1-.avro +committed_manifest_file: s3://lake/db/t/metadata/-m0.avro +committed_marker_file: 2 rows in set. Elapsed: 0.019 sec. @@ -210,6 +218,10 @@ Status values include: - `last_exception_per_replica` is an `Array(Tuple(replica String, message String, part String, time DateTime, count UInt64))`. Each tuple is the most recent exception observed by a single replica plus a best-effort within-replica `count`. Replicas that have never reported an exception are omitted. - `exception_count` is the sum of every `count` in `last_exception_per_replica`. Each replica owns its own counter, so cross-replica updates do not race; the sum is exact w.r.t. the snapshot returned. Within a single replica concurrent failing writers may under-count by one. +### Per-part destination file paths + +- `destination_file_paths` is a `Map(String, Array(String))` keyed by source part name. Each value is the list of file paths written to the destination object storage when that part was exported (a single part can produce multiple files depending on `max_bytes` / `max_rows`). The map is rebuilt from ZooKeeper on every poll, so it grows as parts complete during `PENDING` and becomes the full picture once the task reaches `COMPLETED`. + ### Commit info columns These columns surface paths produced by the destination storage during commit, so it is possible to inspect what was written without consulting the destination directly: From 5da440f72c5e867cf9e5d23599c17fcc47849dd6 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 1 Jun 2026 12:25:29 -0300 Subject: [PATCH 6/7] fix build issue due to merge --- .../ExportPartitionManifestUpdatingTask.cpp | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp index e1136475154c..1b479e39fe5b 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp @@ -406,6 +406,8 @@ std::vector ExportPartitionManifestUpdatingTask:: ExportReplicatedMergeTreePartitionManifest manifest; ExportReplicatedMergeTreePartitionTaskEntry::Status status; std::map last_exception_per_replica; + std::map> destination_file_paths_per_part; + std::optional commit_info; }; std::vector snapshots; @@ -415,7 +417,12 @@ std::vector ExportPartitionManifestUpdatingTask:: snapshots.reserve(storage.export_merge_tree_partition_task_entries_by_key.size()); for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_key) - snapshots.push_back(EntrySnapshot{entry.manifest, entry.status, entry.last_exception_per_replica}); + snapshots.emplace_back( + entry.manifest, + entry.status, + std::move(entry.last_exception_per_replica), + std::move(entry.destination_file_paths_per_part), + std::move(entry.commit_info)); } std::vector infos; @@ -446,14 +453,14 @@ std::vector ExportPartitionManifestUpdatingTask:: } info.exception_count = total_exception_count; - info.destination_file_paths_per_part = entry.destination_file_paths_per_part; + info.destination_file_paths_per_part = std::move(snapshot.destination_file_paths_per_part); - if (entry.commit_info) + if (snapshot.commit_info) { - info.committed_metadata_file = entry.commit_info->iceberg_metadata_file; - info.committed_manifest_list = entry.commit_info->iceberg_manifest_list; - info.committed_manifest_file = entry.commit_info->iceberg_manifest_file; - info.committed_marker_file = entry.commit_info->commit_marker_file; + info.committed_metadata_file = snapshot.commit_info->iceberg_metadata_file; + info.committed_manifest_list = snapshot.commit_info->iceberg_manifest_list; + info.committed_manifest_file = snapshot.commit_info->iceberg_manifest_file; + info.committed_marker_file = snapshot.commit_info->commit_marker_file; } infos.emplace_back(std::move(info)); From 39319cb3875035f81f690431117bf89b8f5c07ec Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 1 Jun 2026 14:12:37 -0300 Subject: [PATCH 7/7] one more fix --- .../ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 6f202e565b14..258e75142b53 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -1812,9 +1812,9 @@ IStorage::ExportPartitionCommitInfo IcebergMetadata::commitImportPartitionTransa "Post-publish work failed after Iceberg snapshot was committed; " "skipping manifest cleanup to preserve published snapshot"); IStorage::ExportPartitionCommitInfo published_info; - published_info.iceberg_metadata_file = storage_metadata_name; + published_info.iceberg_metadata_file = resolver.resolve(metadata_info.path); published_info.iceberg_manifest_list = storage_manifest_list_name; - published_info.iceberg_manifest_file = storage_manifest_entry_name; + published_info.iceberg_manifest_file = storage_manifest_entry_path; return published_info; } @@ -1828,9 +1828,9 @@ IStorage::ExportPartitionCommitInfo IcebergMetadata::commitImportPartitionTransa /// (not on the retry / "already committed" paths) so the struct reflects /// exactly what this attempt produced. IStorage::ExportPartitionCommitInfo published_info; - published_info.iceberg_metadata_file = storage_metadata_name; + published_info.iceberg_metadata_file = resolver.resolve(metadata_info.path); published_info.iceberg_manifest_list = storage_manifest_list_name; - published_info.iceberg_manifest_file = storage_manifest_entry_name; + published_info.iceberg_manifest_file = storage_manifest_entry_path; return published_info; }