diff --git a/docs/en/antalya/partition_export.md b/docs/en/antalya/partition_export.md index 975915859482..b4cd5f937008 100644 --- a/docs/en/antalya/partition_export.md +++ b/docs/en/antalya/partition_export.md @@ -158,41 +158,49 @@ Query id: 9efc271a-a501-44d1-834f-bc4d20156164 Row 1: ────── -source_database: default -source_table: replicated_source -destination_database: default -destination_table: replicated_destination -create_time: 2025-11-21 18:21:51 -partition_id: 2022 -transaction_id: 7397746091717128192 -source_replica: r1 -parts: ['2022_0_0_0','2022_1_1_0','2022_2_2_0'] -parts_count: 3 -parts_to_do: 0 -status: COMPLETED -exception_replica: -last_exception: -exception_part: -exception_count: 0 +source_database: default +source_table: replicated_source +destination_database: default +destination_table: s3_destination +create_time: 2025-11-21 18:21:51 +partition_id: 2022 +transaction_id: 7397746091717128192 +query_id: 3fa3c8d3-7d6b-4f8b-9aa2-2c1f1ad0a111 +source_replica: r1 +parts: ['2022_0_0_0','2022_1_1_0','2022_2_2_0'] +parts_count: 3 +parts_to_do: 0 +status: COMPLETED +last_exception_per_replica: [] +exception_count: 0 +destination_file_paths: {'2022_0_0_0':['s3://bucket/db/t/year=2022/2022_0_0_0_.parquet'],'2022_1_1_0':['s3://bucket/db/t/year=2022/2022_1_1_0_.parquet'],'2022_2_2_0':['s3://bucket/db/t/year=2022/2022_2_2_0_.parquet']} +committed_metadata_file: +committed_manifest_list: +committed_manifest_file: +committed_marker_file: s3://bucket/db/t/commit_2022_7397746091717128192 Row 2: ────── -source_database: default -source_table: replicated_source -destination_database: default -destination_table: replicated_destination -create_time: 2025-11-21 18:20:35 -partition_id: 2021 -transaction_id: 7397745772618674176 -source_replica: r1 -parts: ['2021_0_0_0'] -parts_count: 1 -parts_to_do: 0 -status: COMPLETED -exception_replica: -last_exception: -exception_part: -exception_count: 0 +source_database: default +source_table: replicated_source +destination_database: default +destination_table: iceberg_destination +create_time: 2025-11-21 18:20:35 +partition_id: 2021 +transaction_id: 7397745772618674176 +query_id: 1c8e0fd0-6a3a-4d6e-9bd6-bdf64adfe118 +source_replica: r2 +parts: ['2021_0_0_0'] +parts_count: 1 +parts_to_do: 0 +status: COMPLETED +last_exception_per_replica: [('r1','Code: 999. Coordination::Exception: Session expired','2021_0_0_0','2025-11-21 18:20:42',1)] +exception_count: 1 +destination_file_paths: {'2021_0_0_0':['s3://lake/db/t/data/year=2021/2021_0_0_0_.parquet']} +committed_metadata_file: s3://lake/db/t/metadata/v3.metadata.json +committed_manifest_list: s3://lake/db/t/metadata/snap-7397745772618674176-1-.avro +committed_manifest_file: s3://lake/db/t/metadata/-m0.avro +committed_marker_file: 2 rows in set. Elapsed: 0.019 sec. @@ -205,6 +213,33 @@ Status values include: - `FAILED` - Export failed - `KILLED` - Export was cancelled +### Exception columns + +- `last_exception_per_replica` is an `Array(Tuple(replica String, message String, part String, time DateTime, count UInt64))`. Each tuple is the most recent exception observed by a single replica plus a best-effort within-replica `count`. Replicas that have never reported an exception are omitted. +- `exception_count` is the sum of every `count` in `last_exception_per_replica`. Each replica owns its own counter, so cross-replica updates do not race; the sum is exact w.r.t. the snapshot returned. Within a single replica concurrent failing writers may under-count by one. + +### Per-part destination file paths + +- `destination_file_paths` is a `Map(String, Array(String))` keyed by source part name. Each value is the list of file paths written to the destination object storage when that part was exported (a single part can produce multiple files depending on `max_bytes` / `max_rows`). The map is rebuilt from ZooKeeper on every poll, so it grows as parts complete during `PENDING` and becomes the full picture once the task reaches `COMPLETED`. + +### Commit info columns + +These columns surface paths produced by the destination storage during commit, so it is possible to inspect what was written without consulting the destination directly: + +- `committed_metadata_file` — for Iceberg destinations: path of the new `vN.metadata.json` written by the commit. Empty for non-Iceberg destinations and before the commit lands. If the commit was already finished by a previous run (detected via the transaction id stored in the snapshot summary), this column carries a human-readable sentinel string instead of a path because the original committer's paths are not recoverable from inside the impl. +- `committed_manifest_list` — for Iceberg destinations: path of the manifest list file (`snap-*.avro`) referenced by the new snapshot. Empty under the same conditions as `committed_metadata_file`. +- `committed_manifest_file` — for Iceberg destinations: path of the manifest file referenced by `committed_manifest_list`. Empty under the same conditions as `committed_metadata_file`. +- `committed_marker_file` — for plain object storage destinations: path of the per-transaction commit marker file written by the destination. Empty for Iceberg destinations and for tasks that have not committed yet. + +To pick the latest exception across replicas: + +```sql +SELECT + arraySort(x -> -x.time, last_exception_per_replica)[1] AS latest_exception +FROM system.replicated_partition_exports +WHERE source_table = 'rmt_table' AND destination_table = 's3_table'; +``` + ## Related Features - [ALTER TABLE EXPORT PART](/docs/en/engines/table-engines/mergetree-family/part_export.md) - Export individual parts (non-replicated) diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index acfabc28ca61..dea68c833fa1 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -152,6 +152,82 @@ struct ExportReplicatedMergeTreePartitionProcessedPartEntry } }; +/// Per-task "commit info" record persisted at /commit_info. +/// +/// Written exactly once, atomically with the status -> COMPLETED transition +/// (see ExportPartitionUtils::commit). Captures the metadata-layer file paths +/// produced by the destination storage during commit so they can be surfaced in +/// system.replicated_partition_exports for debugging. +/// +/// All Iceberg fields are empty for non-Iceberg destinations. They may also be +/// empty for an Iceberg destination if the committing replica crashed between +/// writing the object-storage files and writing this znode; in that case the +/// task still transitions to COMPLETED via the recovery path but commit_info +/// remains absent. This is best-effort observability and acceptable. +struct ExportReplicatedMergeTreePartitionCommitInfoEntry +{ + /// Iceberg: path (in destination object storage) of the new vN.metadata.json + /// written by the commit. + String iceberg_metadata_file; + + /// Iceberg: path of the snap---.avro manifest list + /// referenced by the new snapshot. + String iceberg_manifest_list; + + /// Iceberg: path of the manifest entry file (*.avro) referenced by the + /// manifest list. + String iceberg_manifest_file; + + /// Plain object storage: path of the commit marker file written by + /// StorageObjectStorage::commitExportPartitionTransaction. Empty for Iceberg. + String commit_marker_file; + + bool empty() const + { + return iceberg_metadata_file.empty() + && iceberg_manifest_list.empty() + && iceberg_manifest_file.empty() + && commit_marker_file.empty(); + } + + std::string toJsonString() const + { + Poco::JSON::Object json; + json.set("iceberg_metadata_file", iceberg_metadata_file); + json.set("iceberg_manifest_list", iceberg_manifest_list); + json.set("iceberg_manifest_file", iceberg_manifest_file); + json.set("commit_marker_file", commit_marker_file); + + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + oss.exceptions(std::ios::failbit); + Poco::JSON::Stringifier::stringify(json, oss); + return oss.str(); + } + + static ExportReplicatedMergeTreePartitionCommitInfoEntry fromJsonString(const std::string & json_string) + { + ExportReplicatedMergeTreePartitionCommitInfoEntry entry; + if (json_string.empty()) + return entry; + + Poco::JSON::Parser parser; + auto json = parser.parse(json_string).extract(); + + if (json->has("iceberg_metadata_file")) + entry.iceberg_metadata_file = json->getValue("iceberg_metadata_file"); + if (json->has("iceberg_manifest_list")) + entry.iceberg_manifest_list = json->getValue("iceberg_manifest_list"); + + if (json->has("iceberg_manifest_file")) + entry.iceberg_manifest_file = json->getValue("iceberg_manifest_file"); + + if (json->has("commit_marker_file")) + entry.commit_marker_file = json->getValue("commit_marker_file"); + + return entry; + } +}; + struct ExportReplicatedMergeTreePartitionManifest { String transaction_id; diff --git a/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h b/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h index 8af873e0b89c..bf7b3bdfa6a4 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include "Core/QualifiedTableName.h" @@ -40,6 +41,20 @@ struct ExportReplicatedMergeTreePartitionTaskEntry /// An empty map means no replica has recorded an exception yet for this task. mutable std::map last_exception_per_replica; + /// In-memory mirror of /processed/ leaves in ZK, keyed by + /// part name. Each value is the list of destination file paths produced by the + /// per-part export (typically Parquet object-storage keys). Refreshed on every + /// poll() cycle and on status-change handler invocations; served verbatim to + /// system.replicated_partition_exports without any extra ZK read at query time. + /// An empty map means no part has finished exporting yet for this task. + mutable std::map> destination_file_paths_per_part; + + /// In-memory mirror of the /commit_info znode (written atomically + /// with the COMPLETED status transition; see ExportPartitionUtils::commit). + /// nullopt until commit_info is observed in ZK. Empty fields inside the struct + /// for non-Iceberg destinations. + mutable std::optional commit_info; + std::string getCompositeKey() const { const auto qualified_table_name = QualifiedTableName {manifest.destination_database, manifest.destination_table}; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 0a07b892cef8..35732dd6e277 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -511,7 +511,30 @@ It is currently only implemented in StorageObjectStorage. std::vector partition_values; }; - virtual void commitExportPartitionTransaction( + /// Paths produced by the destination storage during commit. Surfaced via + /// system.replicated_partition_exports for debugging. Empty for commits + /// that short-circuit on idempotency. + struct ExportPartitionCommitInfo + { + /// Iceberg destinations only. + String iceberg_metadata_file; + String iceberg_manifest_list; + String iceberg_manifest_file; + + /// Plain object storage destinations only: path of the commit marker file + /// written/observed by StorageObjectStorage::commitExportPartitionTransaction. + String commit_marker_file; + + bool empty() const + { + return iceberg_metadata_file.empty() + && iceberg_manifest_list.empty() + && iceberg_manifest_file.empty() + && commit_marker_file.empty(); + } + }; + + virtual ExportPartitionCommitInfo commitExportPartitionTransaction( const String & /* transaction_id */, const String & /* partition_id */, const Strings & /* exported_paths */, diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp index b45f3667be87..1b479e39fe5b 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp @@ -127,6 +127,107 @@ namespace return out; } + /// Fetch all per-part processed leaves under /processed and build a + /// fresh map keyed by part_name. Returns the destination file paths recorded for + /// each finished part. + /// + /// Same lenient semantics as readLastExceptionPerReplica: an empty result means + /// "nothing actionable" (transient ZK error, no children yet, or all leaves + /// concurrently removed) and callers MUST skip the assignment to preserve the + /// in-memory mirror across glitches. Safe because processed/ leaves are + /// written once on per-part success and never rewritten — the entire entry path + /// is wiped recursively at task cleanup, handled separately. + std::map> readDestinationFilePathsPerPart( + const zkutil::ZooKeeperPtr & zk, + const std::filesystem::path & entry_path, + const std::string & log_key, + const LoggerPtr & log) + { + std::map> out; + + const auto container_path = entry_path / "processed"; + + Strings children; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); + if (Coordination::Error::ZOK != zk->tryGetChildren(container_path, children)) + { + LOG_INFO(log, "ExportPartition Manifest Updating Task: failed to list processed leaves for {}, leaving in-memory copy untouched", log_key); + return out; + } + + if (children.empty()) + return out; + + std::vector paths; + paths.reserve(children.size()); + for (const auto & child : children) + paths.emplace_back(container_path / child); + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet, paths.size()); + auto responses = zk->tryGet(paths); + responses.waitForResponses(); + + for (size_t i = 0; i < paths.size(); ++i) + { + Coordination::GetResponse response; + try + { + response = responses[i]; + } + catch (...) + { + LOG_WARNING(log, "ExportPartition Manifest Updating Task: ZK error fetching processed leaf {} for {}, skipping", children[i], log_key); + continue; + } + + if (response.error != Coordination::Error::ZOK) + continue; + + try + { + auto entry = ExportReplicatedMergeTreePartitionProcessedPartEntry::fromJsonString(response.data); + out.emplace(std::move(entry.part_name), std::move(entry.paths_in_destination)); + } + catch (...) + { + LOG_WARNING(log, "ExportPartition Manifest Updating Task: malformed processed JSON for {} (leaf {}), ignoring", log_key, children[i]); + } + } + + return out; + } + + /// Read the optional /commit_info znode and return the parsed entry. + /// Returns nullopt when the znode is absent (task has not committed yet, peer + /// crashed before writing it, or transient ZK error). Callers should treat + /// nullopt as "leave the in-memory copy untouched". + std::optional readCommitInfo( + const zkutil::ZooKeeperPtr & zk, + const std::filesystem::path & entry_path, + const std::string & log_key, + const LoggerPtr & log) + { + const auto commit_info_path = entry_path / "commit_info"; + + std::string data; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); + if (!zk->tryGet(commit_info_path, data)) + return std::nullopt; + + try + { + return ExportReplicatedMergeTreePartitionCommitInfoEntry::fromJsonString(data); + } + catch (...) + { + LOG_WARNING(log, "ExportPartition Manifest Updating Task: malformed commit_info JSON for {}, ignoring", log_key); + return std::nullopt; + } + } + /* Remove expired entries and fix non-committed exports that have already exported all parts. @@ -305,6 +406,8 @@ std::vector ExportPartitionManifestUpdatingTask:: ExportReplicatedMergeTreePartitionManifest manifest; ExportReplicatedMergeTreePartitionTaskEntry::Status status; std::map last_exception_per_replica; + std::map> destination_file_paths_per_part; + std::optional commit_info; }; std::vector snapshots; @@ -314,7 +417,12 @@ std::vector ExportPartitionManifestUpdatingTask:: snapshots.reserve(storage.export_merge_tree_partition_task_entries_by_key.size()); for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_key) - snapshots.push_back(EntrySnapshot{entry.manifest, entry.status, entry.last_exception_per_replica}); + snapshots.emplace_back( + entry.manifest, + entry.status, + std::move(entry.last_exception_per_replica), + std::move(entry.destination_file_paths_per_part), + std::move(entry.commit_info)); } std::vector infos; @@ -345,6 +453,16 @@ std::vector ExportPartitionManifestUpdatingTask:: } info.exception_count = total_exception_count; + info.destination_file_paths_per_part = std::move(snapshot.destination_file_paths_per_part); + + if (snapshot.commit_info) + { + info.committed_metadata_file = snapshot.commit_info->iceberg_metadata_file; + info.committed_manifest_list = snapshot.commit_info->iceberg_manifest_list; + info.committed_manifest_file = snapshot.commit_info->iceberg_manifest_file; + info.committed_marker_file = snapshot.commit_info->commit_marker_file; + } + infos.emplace_back(std::move(info)); } @@ -420,6 +538,17 @@ void ExportPartitionManifestUpdatingTask::poll() /// in-memory copy stays intact. auto last_exception_per_replica = readLastExceptionPerReplica( zk, fs::path(entry_path), key, storage.log.load()); + + /// Mirror per-part destination file paths from /processed/. + /// Same lenient pattern: empty result = leave in-memory copy untouched. + auto destination_file_paths_per_part = readDestinationFilePathsPerPart( + zk, fs::path(entry_path), key, storage.log.load()); + + /// Mirror commit_info znode (present only after a successful commit). nullopt + /// means the znode does not exist yet (or transient ZK error); leave the + /// in-memory copy untouched in that case. + auto commit_info = readCommitInfo( + zk, fs::path(entry_path), key, storage.log.load()); const auto local_entry = entries_by_key.find(key); @@ -495,11 +624,15 @@ void ExportPartitionManifestUpdatingTask::poll() /// holding the cleanup lock (cleanup did not consume the entry). if (!last_exception_per_replica.empty()) local_entry->last_exception_per_replica = std::move(last_exception_per_replica); + if (!destination_file_paths_per_part.empty()) + local_entry->destination_file_paths_per_part = std::move(destination_file_paths_per_part); + if (commit_info) + local_entry->commit_info = std::move(commit_info); LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: already exists", key); continue; } - addTask(metadata, *status, std::move(last_exception_per_replica), key, entries_by_key); + addTask(metadata, *status, std::move(last_exception_per_replica), std::move(destination_file_paths_per_part), std::move(commit_info), key, entries_by_key); } /// Remove entries that were deleted by someone else @@ -567,6 +700,8 @@ void ExportPartitionManifestUpdatingTask::addTask( const ExportReplicatedMergeTreePartitionManifest & metadata, ExportReplicatedMergeTreePartitionTaskEntry::Status status, std::map last_exception_per_replica, + std::map> destination_file_paths_per_part, + std::optional commit_info, const std::string & key, auto & entries_by_key ) @@ -589,7 +724,13 @@ void ExportPartitionManifestUpdatingTask::addTask( } /// Insert or update entry. The multi_index container automatically maintains both indexes. - ExportReplicatedMergeTreePartitionTaskEntry entry {metadata, status, std::move(part_references), std::move(last_exception_per_replica)}; + ExportReplicatedMergeTreePartitionTaskEntry entry { + metadata, + status, + std::move(part_references), + std::move(last_exception_per_replica), + std::move(destination_file_paths_per_part), + std::move(commit_info)}; auto it = entries_by_key.find(key); if (it != entries_by_key.end()) entries_by_key.replace(it, entry); @@ -701,6 +842,22 @@ void ExportPartitionManifestUpdatingTask::handleStatusChanges() it->last_exception_per_replica = std::move(fetched); } + /// Refresh per-part destination paths and commit_info on the status flip too, + /// so the system table observes the COMPLETED state and the committed file + /// paths in the same poll cycle. + if (auto fetched = readDestinationFilePathsPerPart( + zk, fs::path(storage.zookeeper_path) / "exports" / key, key, storage.log.load()); + !fetched.empty()) + { + it->destination_file_paths_per_part = std::move(fetched); + } + + if (auto fetched_commit_info = readCommitInfo( + zk, fs::path(storage.zookeeper_path) / "exports" / key, key, storage.log.load())) + { + it->commit_info = std::move(fetched_commit_info); + } + /// If status changed to KILLED, cancel local export operations if (*new_status == ExportReplicatedMergeTreePartitionTaskEntry::Status::KILLED) { diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h index 32487f2dc68c..71e16c1848b0 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h @@ -34,6 +34,8 @@ class ExportPartitionManifestUpdatingTask const ExportReplicatedMergeTreePartitionManifest & metadata, ExportReplicatedMergeTreePartitionTaskEntry::Status status, std::map last_exception_per_replica, + std::map> destination_file_paths_per_part, + std::optional commit_info, const std::string & key, auto & entries_by_key ); diff --git a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp index 4a447c8de3ab..0cd4b086d871 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.cpp +++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp @@ -208,7 +208,8 @@ namespace ExportPartitionUtils getPartitionValuesForIcebergCommit(source_storage, manifest.partition_id); } - destination_storage->commitExportPartitionTransaction(manifest.transaction_id, manifest.partition_id, exported_paths, iceberg_args, context); + const auto destination_commit_info = destination_storage->commitExportPartitionTransaction( + manifest.transaction_id, manifest.partition_id, exported_paths, iceberg_args, context); /// Failpoint to simulate a crash after the Iceberg commit succeeds but before /// ZooKeeper is updated to COMPLETED. Used by idempotency integration tests. @@ -221,16 +222,45 @@ namespace ExportPartitionUtils }); LOG_INFO(log, "ExportPartition: Committed export, mark as completed"); + + const std::string status_path = fs::path(entry_path) / "status"; + const std::string completed_name = String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::COMPLETED)).data(); + + Coordination::Requests ops; + ops.emplace_back(zkutil::makeSetRequest(status_path, completed_name, -1)); + + if (!destination_commit_info.empty()) + { + ExportReplicatedMergeTreePartitionCommitInfoEntry commit_info_entry { + destination_commit_info.iceberg_metadata_file, + destination_commit_info.iceberg_manifest_list, + destination_commit_info.iceberg_manifest_file, + destination_commit_info.commit_marker_file}; + + const std::string commit_info_path = fs::path(entry_path) / "commit_info"; + ops.emplace_back(zkutil::makeCreateRequest(commit_info_path, commit_info_entry.toJsonString(), zkutil::CreateMode::Persistent)); + } + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperSet); - if (Coordination::Error::ZOK == zk->trySet(fs::path(entry_path) / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::COMPLETED)).data(), -1)) + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); + + Coordination::Responses responses; + const auto rc = zk->tryMulti(ops, responses); + + if (rc == Coordination::Error::ZOK) { - LOG_INFO(log, "ExportPartition: Marked export as completed"); + LOG_INFO(log, "ExportPartition: Marked export as completed{}", + destination_commit_info.empty() ? "" : " and persisted commit_info"); + return; } - else + + if (rc == Coordination::Error::ZNODEEXISTS) { - throw Exception(ErrorCodes::NETWORK_ERROR, "ExportPartition: Failed to mark export as completed, will not try to fix it"); + LOG_INFO(log, "ExportPartition: commit_info already present (peer wrote it first); task already COMPLETED"); + return; } + + throw Exception(ErrorCodes::NETWORK_ERROR, "ExportPartition: Failed to mark export as completed (rc={}), will not try to fix it", rc); } bool handleCommitFailure( diff --git a/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp b/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp index c4669a9bf55c..b3c750129988 100644 --- a/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp +++ b/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp @@ -43,9 +43,9 @@ TEST_F(ExportPartitionOrderingTest, IterationOrderMatchesCreateTime) manifest3.transaction_id = "tx3"; manifest3.create_time = base_time; // Oldest - ExportReplicatedMergeTreePartitionTaskEntry entry1{manifest1, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}}; - ExportReplicatedMergeTreePartitionTaskEntry entry2{manifest2, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}}; - ExportReplicatedMergeTreePartitionTaskEntry entry3{manifest3, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}}; + ExportReplicatedMergeTreePartitionTaskEntry entry1{manifest1, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}, {}, {}}; + ExportReplicatedMergeTreePartitionTaskEntry entry2{manifest2, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}, {}, {}}; + ExportReplicatedMergeTreePartitionTaskEntry entry3{manifest3, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}, {}, {}}; // Insert in reverse order by_key.insert(entry1); diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 92152ce75c2e..d22012f55c67 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -217,7 +218,7 @@ class IDataLakeMetadata : boost::noncopyable throwNotImplemented("import"); } - virtual void commitExportPartitionTransaction( + virtual IStorage::ExportPartitionCommitInfo commitExportPartitionTransaction( std::shared_ptr /* catalog */, const StorageID & /* table_id */, const String & /* transaction_id */, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 7e1dbdec3a43..258e75142b53 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -1521,7 +1521,7 @@ Poco::JSON::Object::Ptr lookupSchema(const Poco::JSON::Object::Ptr & meta, Int64 } -bool IcebergMetadata::commitImportPartitionTransactionImpl( +IStorage::ExportPartitionCommitInfo IcebergMetadata::commitImportPartitionTransactionImpl( FileNamesGenerator & filename_generator, Poco::JSON::Object::Ptr & metadata, Poco::JSON::Object::Ptr & partition_spec, @@ -1547,7 +1547,13 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( LOG_INFO(log, "Export transaction {} already committed, skipping re-commit", transaction_id); - return true; + /// Surface a sentinel so the caller treats this as a successful attempt (non-empty + /// commit info), persists a commit_info znode, and makes the situation visible in + /// system.replicated_partition_exports.committed_metadata_file. We do not know the + /// original committer's paths from here. + IStorage::ExportPartitionCommitInfo already_committed_info; + already_committed_info.iceberg_metadata_file = ""; + return already_committed_info; } CompressionMethod metadata_compression_method = persistent_components.metadata_compression_method; @@ -1744,7 +1750,7 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( { LOG_DEBUG(log, "Failed to write metadata {}, retrying", metadata_info.path); cleanup(true); - return false; + return {}; } LOG_DEBUG(log, "Metadata file {} written", metadata_info.path); @@ -1757,7 +1763,7 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot)) { cleanup(true); - return false; + return {}; } /// Catalog has accepted the commit - the new snapshot is now live and references @@ -1800,11 +1806,16 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( /// post-publish work (e.g. metadata-cache invalidation). Running cleanup() /// here would delete manifest files referenced by the published snapshot /// and corrupt it. Log and swallow - any transient state (stale cache) - /// is self-healing on subsequent reads. + /// is self-healing on subsequent reads. Surface the published paths anyway + /// so the partition export task can persist them in ZooKeeper. tryLogCurrentException(log, "Post-publish work failed after Iceberg snapshot was committed; " "skipping manifest cleanup to preserve published snapshot"); - return true; + IStorage::ExportPartitionCommitInfo published_info; + published_info.iceberg_metadata_file = resolver.resolve(metadata_info.path); + published_info.iceberg_manifest_list = storage_manifest_list_name; + published_info.iceberg_manifest_file = storage_manifest_entry_path; + return published_info; } LOG_ERROR(log, "Failed to commit import partition transaction: {}", getCurrentExceptionMessage(false)); @@ -1812,10 +1823,18 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( throw; } - return true; + /// Record the storage paths of the files we just published so the partition + /// export task can persist them in ZooKeeper for observability. Only set here + /// (not on the retry / "already committed" paths) so the struct reflects + /// exactly what this attempt produced. + IStorage::ExportPartitionCommitInfo published_info; + published_info.iceberg_metadata_file = resolver.resolve(metadata_info.path); + published_info.iceberg_manifest_list = storage_manifest_list_name; + published_info.iceberg_manifest_file = storage_manifest_entry_path; + return published_info; } -void IcebergMetadata::commitExportPartitionTransaction( +IStorage::ExportPartitionCommitInfo IcebergMetadata::commitExportPartitionTransaction( std::shared_ptr catalog, const StorageID & table_id, const String & transaction_id, @@ -1858,7 +1877,9 @@ void IcebergMetadata::commitExportPartitionTransaction( LOG_INFO(log, "Export transaction {} already committed, skipping re-commit", transaction_id); - return; + IStorage::ExportPartitionCommitInfo already_committed_info; + already_committed_info.iceberg_metadata_file = ""; + return already_committed_info; } /// Fail fast if the table schema or partition spec changed between export-start and commit. @@ -1917,7 +1938,7 @@ void IcebergMetadata::commitExportPartitionTransaction( size_t attempt = 0; while (attempt < MAX_TRANSACTION_RETRIES) { - if (commitImportPartitionTransactionImpl( + auto commit_info = commitImportPartitionTransactionImpl( filename_generator, metadata, partition_spec, @@ -1935,10 +1956,9 @@ void IcebergMetadata::commitExportPartitionTransaction( total_chunks_size, catalog, table_id, - context)) - { - return; - } + context); + if (!commit_info.empty()) + return commit_info; ++attempt; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index 085e7586440a..d55bfb3168a9 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -134,7 +134,7 @@ class IcebergMetadata : public IDataLakeMetadata /// data_file_paths contains the metadata-path for each exported data file (as recorded in /// ZooKeeper). For every path a co-located sidecar Avro file (same path, ".avro" extension) /// must exist in the object storage; it supplies record_count and file_size_in_bytes. - void commitExportPartitionTransaction( + IStorage::ExportPartitionCommitInfo commitExportPartitionTransaction( std::shared_ptr catalog, const StorageID & table_id, const String & transaction_id, @@ -207,7 +207,12 @@ class IcebergMetadata : public IDataLakeMetadata std::optional getPartitionKey(ContextPtr local_context, Iceberg::TableStateSnapshot actual_table_state_snapshot) const; KeyDescription getSortingKey(ContextPtr local_context, Iceberg::TableStateSnapshot actual_table_state_snapshot) const; - bool commitImportPartitionTransactionImpl( + /// Non-empty return value means the attempt succeeded (covers both the normal + /// publish path and the `isExportPartitionTransactionAlreadyCommitted` short-circuit). + /// An empty `ExportPartitionCommitInfo` means the caller must retry. The + /// short-circuit branch fills `iceberg_metadata_file` with a sentinel note since + /// the original committer's paths are not trivially recoverable from inside this call. + IStorage::ExportPartitionCommitInfo commitImportPartitionTransactionImpl( FileNamesGenerator & filename_generator, Poco::JSON::Object::Ptr & metadata, Poco::JSON::Object::Ptr & partition_spec, diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index c08db678c46c..980b2a636d1b 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -668,7 +668,7 @@ SinkToStoragePtr StorageObjectStorage::import( local_context); } -void StorageObjectStorage::commitExportPartitionTransaction( +IStorage::ExportPartitionCommitInfo StorageObjectStorage::commitExportPartitionTransaction( const String & transaction_id, const String & partition_id, const Strings & exported_paths, @@ -689,7 +689,7 @@ void StorageObjectStorage::commitExportPartitionTransaction( const auto partition_spec_id = iceberg_metadata->getValue(Iceberg::f_default_spec_id); configuration->lazyInitializeIfNeeded(object_storage, local_context); - configuration->getExternalMetadata()->commitExportPartitionTransaction( + return configuration->getExternalMetadata()->commitExportPartitionTransaction( catalog, storage_id, transaction_id, @@ -699,16 +699,20 @@ void StorageObjectStorage::commitExportPartitionTransaction( std::make_shared(getInMemoryMetadataPtr()->getSampleBlock()), exported_paths, local_context); - return; } const String commit_object = configuration->getRawPath().path + "/commit_" + partition_id + "_" + transaction_id; + ExportPartitionCommitInfo result; + result.commit_marker_file = commit_object; + /// if file already exists, nothing to be done if (object_storage->exists(StoredObject(commit_object))) { LOG_DEBUG(getLogger("StorageObjectStorage"), "Commit file already exists, nothing to be done: {}", commit_object); - return; + /// Still surface the path: observability does not require we wrote it, + /// only that it is the committed marker for this transaction. + return result; } auto out = object_storage->writeObject(StoredObject(commit_object), WriteMode::Rewrite, /* attributes= */ {}, DBMS_DEFAULT_BUFFER_SIZE, local_context->getWriteSettings()); @@ -718,6 +722,7 @@ void StorageObjectStorage::commitExportPartitionTransaction( out->write("\n", 1); } out->finalize(); + return result; } void StorageObjectStorage::truncate( diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 8c04b71d9487..6ace629b7085 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -89,7 +89,7 @@ class StorageObjectStorage : public IStorage const std::optional & /* format_settings_ */, ContextPtr /* context */) override; - void commitExportPartitionTransaction( + ExportPartitionCommitInfo commitExportPartitionTransaction( const String & transaction_id, const String & partition_id, const Strings & exported_paths, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index e5131d06ae2e..6211237d220e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -1111,7 +1111,7 @@ SinkToStoragePtr StorageObjectStorageCluster::import( context); } -void StorageObjectStorageCluster::commitExportPartitionTransaction( +IStorage::ExportPartitionCommitInfo StorageObjectStorageCluster::commitExportPartitionTransaction( const String & transaction_id, const String & partition_id, const Strings & exported_paths, @@ -1120,16 +1120,15 @@ void StorageObjectStorageCluster::commitExportPartitionTransaction( { if (pure_storage) { - pure_storage->commitExportPartitionTransaction( + return pure_storage->commitExportPartitionTransaction( transaction_id, partition_id, exported_paths, iceberg_commit_export_partition_arguments, local_context ); - return; } - IStorageCluster::commitExportPartitionTransaction( + return IStorageCluster::commitExportPartitionTransaction( transaction_id, partition_id, exported_paths, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 52c7d5951855..f5fa4e7434bd 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -44,7 +44,7 @@ class StorageObjectStorageCluster : public IStorageCluster const std::optional & format_settings_, ContextPtr context) override; - void commitExportPartitionTransaction( + ExportPartitionCommitInfo commitExportPartitionTransaction( const String & transaction_id, const String & partition_id, const Strings & exported_paths, diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp index 9e8faab689d6..25fd130d0198 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -47,6 +48,16 @@ ColumnsDescription StorageSystemReplicatedPartitionExports::getColumnsDescriptio "Per-replica last exception entries. Each tuple records the most recent exception observed by that replica plus a best-effort within-replica count. Empty array if no replica has reported an exception for this task."}, {"exception_count", std::make_shared(), "Sum of per-replica exception counts. Each replica owns its own count, so the sum is exact w.r.t. the in-memory snapshot; within-replica updates remain best-effort and may under-count by one under concurrent failures."}, + {"destination_file_paths", std::make_shared(std::make_shared(), std::make_shared(std::make_shared())), + "Per-part destination file paths written to the destination object storage. Keyed by part name; values are the file paths produced by exporting that part. Mirrored from ZooKeeper on every poll while PENDING; partial during in-flight tasks."}, + {"committed_metadata_file", std::make_shared(), + "For Iceberg destinations: path of the new metadata JSON file written at commit time. Empty for non-Iceberg destinations and for tasks that have not committed yet. May also be empty if the committing replica crashed between writing the object-storage files and persisting commit_info. If the export was already committed by a previous run (detected via the transaction id stored in the snapshot summary), this column holds a human-readable note instead of a path since the original committer's paths are not trivially recoverable."}, + {"committed_manifest_list", std::make_shared(), + "For Iceberg destinations: path of the manifest list file (snap-*.avro) referenced by the new snapshot. Empty under the same conditions as committed_metadata_file."}, + {"committed_manifest_file", std::make_shared(), + "For Iceberg destinations: path of the manifest file referenced by committed_manifest_list. Empty under the same conditions as committed_metadata_file."}, + {"committed_marker_file", std::make_shared(), + "For plain object storage destinations: path of the per-transaction commit marker file written by the destination. Empty for Iceberg destinations and for tasks that have not committed yet."}, }; } @@ -152,6 +163,25 @@ void StorageSystemReplicatedPartitionExports::fillData(MutableColumns & res_colu per_replica.push_back(Tuple{ex.replica, ex.message, ex.part, ex.time, ex.count}); res_columns[i++]->insert(per_replica); res_columns[i++]->insert(info.exception_count); + + Map destination_paths_map; + destination_paths_map.reserve(info.destination_file_paths_per_part.size()); + for (const auto & [part_name, paths] : info.destination_file_paths_per_part) + { + Array paths_array; + paths_array.reserve(paths.size()); + for (const auto & path : paths) + paths_array.push_back(path); + destination_paths_map.emplace_back(Tuple{part_name, std::move(paths_array)}); + } + res_columns[i++]->insert(std::move(destination_paths_map)); + + res_columns[i++]->insert(info.committed_metadata_file); + res_columns[i++]->insert(info.committed_manifest_list); + + res_columns[i++]->insert(info.committed_manifest_file); + + res_columns[i++]->insert(info.committed_marker_file); } } } diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.h b/src/Storages/System/StorageSystemReplicatedPartitionExports.h index a8666374a7f0..244fee61ae4a 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.h +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.h @@ -29,6 +29,19 @@ struct ReplicatedPartitionExportInfo /// single replica the count is best-effort (concurrent failing writers may under- /// count by one), matching the documented column semantics. size_t exception_count = 0; + /// Per-part destination file paths, keyed by part name. Mirrors the + /// /processed//paths_in_destination data from ZooKeeper. + /// Empty until parts complete; partial during PENDING. + std::map> destination_file_paths_per_part; + /// Iceberg commit-time paths surfaced from /commit_info. + /// All empty for non-Iceberg destinations or before commit lands. + String committed_metadata_file; + String committed_manifest_list; + String committed_manifest_file; + /// Plain object storage commit marker file surfaced from + /// /commit_info. Empty for Iceberg destinations or before + /// commit lands. + String committed_marker_file; }; class StorageSystemReplicatedPartitionExports final : public IStorageSystemOneBlock diff --git a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py index 8b49589a3005..fddee4e1850a 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py @@ -830,8 +830,9 @@ def test_post_publish_exception_preserves_snapshot(cluster): post-publish region (after both the metadata file is written and `published = true` is set). With the fix in place: - the commit stays durable (snapshot is readable, manifests are intact); - - the export is marked COMPLETED because the idempotency check on retry - detects that the transaction is already committed and returns success; + - the export is marked COMPLETED because the outer `catch (...)` sees + `published == true` and returns the populated commit info with the real + paths produced by this attempt (no retry needed); - all exported rows are visible through the Iceberg table. """ node = cluster.instances["replica1"] @@ -862,6 +863,28 @@ def test_post_publish_exception_preserves_snapshot(cluster): f"Unexpected data after post-publish exception recovery:\n{result}" ) + # After a post-publish exception the catch handler with published==true returns + # the populated commit info (real metadata / manifest list / manifest file paths). + # ExportPartitionUtils::commit persists it to the commit_info znode, so the system + # table should show a real metadata path here, not the already-committed sentinel. + committed_metadata_file = node.query( + f""" + SELECT committed_metadata_file FROM system.replicated_partition_exports + WHERE source_table = '{mt_table}' + AND destination_table = '{iceberg_table}' + AND partition_id = '2020' + """ + ).strip() + assert committed_metadata_file, ( + "committed_metadata_file should be populated after a successful post-publish-catch return" + ) + assert not committed_metadata_file.startswith("<"), ( + f"committed_metadata_file should be a real metadata path, got the already-committed sentinel: {committed_metadata_file!r}" + ) + assert committed_metadata_file.endswith(".metadata.json"), ( + f"Expected a *.metadata.json path in committed_metadata_file, got: {committed_metadata_file!r}" + ) + def test_export_task_timeout_kills_stuck_pending_task(cluster): """ diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py index 3161e3b67100..74f03033976d 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py @@ -693,6 +693,29 @@ def test_export_partition_file_already_exists_policy(cluster): # wait for the exports to finish wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED") + # plain object storage destinations surface the commit marker file path via + # system.replicated_partition_exports.committed_marker_file + committed_marker_file = node.query( + f""" + SELECT committed_marker_file FROM system.replicated_partition_exports + WHERE source_table = '{mt_table}' + AND destination_table = '{s3_table}' + AND partition_id = '2020' + """ + ).strip() + # `committed_marker_file` is the absolute key in the bucket (same convention as + # `destination_file_paths`); it may carry the s3_conn URL's in-bucket prefix on + # top of the table's `filename` argument, so use a "contains" check that does + # not depend on knowing that prefix. + assert f"{s3_table}/commit_2020_" in committed_marker_file, \ + f"Expected committed_marker_file under {s3_table}/, got: {committed_marker_file!r}" + # Path relative to the `s3_conn` URL, derived from the absolute key without + # assuming a particular URL prefix. + marker_relative_path = committed_marker_file[committed_marker_file.index(f"{s3_table}/"):] + assert node.query( + f"SELECT count() FROM s3(s3_conn, filename='{marker_relative_path}', format=LineAsString)" + ) == '1\n', f"Commit marker file does not exist at {committed_marker_file!r}" + # try to export the partition node.query( f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS export_merge_tree_partition_force_export=1" diff --git a/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py b/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py index 5466fe543275..9e83d160d2f9 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py @@ -712,6 +712,19 @@ def test_idempotency_after_commit_crash(export_cluster): count = int(node.query(f"SELECT count() FROM {iceberg}").strip()) assert count == 3, f"Expected 3 rows (no duplicates), got {count}" + # The already-committed early-exit in commitExportPartitionTransaction surfaces + # a sentinel note in committed_metadata_file (the original committer's paths + # are not recoverable from inside the call). The sentinel makes the situation + # visible in system.replicated_partition_exports rather than leaving the + # commit_info columns empty. + committed_metadata_file = node.query( + f"SELECT committed_metadata_file FROM system.replicated_partition_exports " + f"WHERE source_table = '{source}' AND partition_id = '{pid}'" + ).strip() + assert committed_metadata_file == "", ( + f"Expected already-committed sentinel after idempotent retry, got: {committed_metadata_file!r}" + ) + def test_commit_attempts_budget_transitions_to_failed(export_cluster): """ diff --git a/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg_catalog.py b/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg_catalog.py index 8a216860d029..4b1aa1e7d22c 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg_catalog.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg_catalog.py @@ -367,6 +367,14 @@ def test_catalog_idempotent_retry(catalog_export_cluster): f"got {len(history)}" ) + committed_metadata_file = node.query( + f"SELECT committed_metadata_file FROM system.replicated_partition_exports " + f"WHERE source_table = '{source}' AND partition_id = '{pid}'" + ).strip() + assert committed_metadata_file == "", ( + f"Expected already-committed sentinel after idempotent retry, got: {committed_metadata_file!r}" + ) + # --------------------------------------------------------------------------- # Replicated catalog tests