diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index c4e193b89..6254c7c85 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -110,6 +110,7 @@ set(ICEBERG_SOURCES
     util/string_util.cc
     util/struct_like_set.cc
     util/temporal_util.cc
+    util/thread_pool_internal.cc
     util/timepoint.cc
     util/transform_util.cc
     util/truncate_util.cc
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index c2947f3fe..4fd213a78 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -132,6 +132,7 @@ iceberg_sources = files(
     'util/string_util.cc',
     'util/struct_like_set.cc',
     'util/temporal_util.cc',
+    'util/thread_pool_internal.cc',
     'util/timepoint.cc',
     'util/transform_util.cc',
     'util/truncate_util.cc',
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 1d80b29a5..2169669b1 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -132,6 +132,7 @@ add_iceberg_test(util_test
     retry_util_test.cc
     string_util_test.cc
     struct_like_set_test.cc
+    thread_pool_test.cc
     transform_util_test.cc
     truncate_util_test.cc
     url_encoder_test.cc
diff --git a/src/iceberg/test/expire_snapshots_test.cc b/src/iceberg/test/expire_snapshots_test.cc
index 4dcc72d6c..651386096 100644
--- a/src/iceberg/test/expire_snapshots_test.cc
+++ b/src/iceberg/test/expire_snapshots_test.cc
@@ -19,6 +19,7 @@
 
 #include "iceberg/update/expire_snapshots.h"
 
+#include <mutex>
 #include
 #include
 #include
@@ -244,10 +245,13 @@ TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
 }
 
 TEST_F(ExpireSnapshotsTest, FinalizeRequiresCommittedMetadata) {
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   // Apply first so apply_result_ is cached
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
@@ -262,11 +266,14 @@
 }
 
 TEST_F(ExpireSnapshotsTest, CleanupNoneSkipsDeletion) {
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
   update->CleanupLevel(CleanupLevel::kNone);
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
   EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
@@ -278,10 +285,13 @@
 }
 
 TEST_F(ExpireSnapshotsTest, FinalizeSkippedOnCommitError) {
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
   EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
@@ -294,11 +304,14 @@
 }
 
 TEST_F(ExpireSnapshotsTest, FinalizeSkipsWhenNothingExpired) {
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
   update->RetainLast(2);
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
   EXPECT_TRUE(result.snapshot_ids_to_remove.empty());
@@ -348,10 +361,13 @@ TEST_F(ExpireSnapshotsCleanupTest, IgnoresExpiredDeleteManifestReadFailures) {
                             kCurrentSequenceNumber, {});
   RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path,
@@ -386,10 +402,13 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredFiles) {
                             kCurrentSequenceNumber, {});
   RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path,
@@ -424,11 +443,14 @@ TEST_F(ExpireSnapshotsCleanupTest, MetadataOnlySkipsDataDeletion) {
                             kCurrentSequenceNumber, {});
   RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
   update->CleanupLevel(CleanupLevel::kMetadataOnly);
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path,
@@ -462,10 +484,13 @@ TEST_F(ExpireSnapshotsCleanupTest, RetainedDeleteManifestSkipsDataDeletion) {
                             kCurrentSequenceNumber, {current_delete_manifest});
   RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path,
@@ -487,10 +512,13 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredStats) {
       expired_manifest_list_path, current_manifest_list_path,
       {MakeStatisticsFile(kExpiredSnapshotId, expired_statistics_path)}, {});
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::Contains(expired_statistics_path));
@@ -513,10 +541,13 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedStats) {
        MakeStatisticsFile(kCurrentSnapshotId, reused_statistics_path)},
       {});
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path)));
@@ -538,10 +569,13 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredPartitionStats) {
       expired_manifest_list_path, current_manifest_list_path, {},
       {MakePartitionStatisticsFile(kExpiredSnapshotId, expired_statistics_path)});
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::Contains(expired_statistics_path));
@@ -564,10 +598,13 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedPartitionStats) {
       {MakePartitionStatisticsFile(kExpiredSnapshotId, reused_statistics_path),
        MakePartitionStatisticsFile(kCurrentSnapshotId, reused_statistics_path)});
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path)));
diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build
index e168d08bf..5d5faf005 100644
--- a/src/iceberg/test/meson.build
+++ b/src/iceberg/test/meson.build
@@ -96,6 +96,7 @@ iceberg_tests = {
     'roaring_position_bitmap_test.cc',
    'string_util_test.cc',
     'struct_like_set_test.cc',
+    'thread_pool_test.cc',
     'transform_util_test.cc',
     'truncate_util_test.cc',
     'url_encoder_test.cc',
diff --git a/src/iceberg/test/thread_pool_test.cc b/src/iceberg/test/thread_pool_test.cc
new file mode 100644
index 000000000..5398ab8b7
--- /dev/null
+++ b/src/iceberg/test/thread_pool_test.cc
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <atomic>
+#include <chrono>
+#include <future>
+#include <span>
+#include <stdexcept>
+#include <thread>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/util/thread_pool_internal.h"
+
+namespace iceberg {
+
+TEST(ThreadPoolTest, ZeroWorkersThrows) {
+  EXPECT_THROW(ThreadPool(0), std::invalid_argument);
+}
+
+TEST(ThreadPoolTest, SubmitRunsTask) {
+  ThreadPool pool(2);
+  std::atomic<int> n{0};
+  auto fut = pool.Submit([&] { n.fetch_add(1, std::memory_order_relaxed); });
+  fut.wait();
+  EXPECT_EQ(n.load(), 1);
+}
+
+TEST(ThreadPoolTest, RunAndWaitProcessesEveryItem) {
+  ThreadPool pool(4);
+  std::vector<int> items(100);
+  for (int i = 0; i < 100; ++i) items[i] = i;
+
+  std::atomic<int> sum{0};
+  pool.RunAndWait(std::span(items),
+                  [&](int v) { sum.fetch_add(v, std::memory_order_relaxed); });
+
+  // 0 + 1 + ... + 99 = 4950
+  EXPECT_EQ(sum.load(), 4950);
+}
+
+TEST(ThreadPoolTest, RunAndWaitEmptyIsNoOp) {
+  ThreadPool pool(2);
+  std::vector<int> items;
+  std::atomic<int> seen{0};
+  pool.RunAndWait(std::span(items),
+                  [&](int) { seen.fetch_add(1, std::memory_order_relaxed); });
+  EXPECT_EQ(seen.load(), 0);
+}
+
+TEST(ThreadPoolTest, RunAndWaitExecutesConcurrently) {
+  // Use a barrier-style check: each task increments an in-flight counter, sleeps
+  // briefly, decrements, and records the peak. With multiple workers the peak
+  // should exceed 1.
+  constexpr int kWorkers = 4;
+  constexpr int kItems = 8;
+  ThreadPool pool(kWorkers);
+
+  std::vector<int> items(kItems, 0);
+  std::atomic<int> in_flight{0};
+  std::atomic<int> peak{0};
+
+  pool.RunAndWait(std::span(items), [&](int) {
+    int now = in_flight.fetch_add(1, std::memory_order_acq_rel) + 1;
+    int prev = peak.load(std::memory_order_relaxed);
+    while (now > prev &&
+           !peak.compare_exchange_weak(prev, now, std::memory_order_relaxed)) {
+    }
+    std::this_thread::sleep_for(std::chrono::milliseconds(20));
+    in_flight.fetch_sub(1, std::memory_order_acq_rel);
+  });
+
+  EXPECT_GT(peak.load(), 1) << "expected concurrent execution across workers";
+}
+
+TEST(ThreadPoolTest, ExceptionInTaskDoesNotKillWorker) {
+  ThreadPool pool(1);
+  // packaged_task captures the exception into the future; the worker loop must
+  // continue and process the next submission.
+  auto bad = pool.Submit([] { throw std::runtime_error("boom"); });
+  EXPECT_THROW(bad.get(), std::runtime_error);
+
+  std::atomic<int> ok{0};
+  auto good = pool.Submit([&] { ok.store(1, std::memory_order_relaxed); });
+  good.wait();
+  EXPECT_EQ(ok.load(), 1);
+}
+
+TEST(ThreadPoolTest, DestructorJoinsAllPendingWork) {
+  std::atomic<int> done{0};
+  {
+    ThreadPool pool(2);
+    for (int i = 0; i < 16; ++i) {
+      // Discard futures: we rely on the destructor to drain queued work.
+      (void)pool.Submit([&] {
+        std::this_thread::sleep_for(std::chrono::milliseconds(5));
+        done.fetch_add(1, std::memory_order_relaxed);
+      });
+    }
+  }
+  EXPECT_EQ(done.load(), 16);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc
index ce65882c9..4e758105c 100644
--- a/src/iceberg/update/expire_snapshots.cc
+++ b/src/iceberg/update/expire_snapshots.cc
@@ -20,11 +20,13 @@
 #include "iceberg/update/expire_snapshots.h"
 
 #include
+#include <chrono>
 #include
 #include
 #include
 #include
 #include
+#include <thread>
 #include
 #include
@@ -40,6 +42,7 @@
 #include "iceberg/util/error_collector.h"
 #include "iceberg/util/macros.h"
 #include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/thread_pool_internal.h"
 
 namespace iceberg {
@@ -54,12 +57,28 @@ Result<std::unique_ptr<ManifestReader>> MakeManifestReader(
   return ManifestReader::Make(manifest, file_io, std::move(schema), std::move(spec));
 }
 
+/// \brief Cap on per-strategy worker concurrency.
+///
+/// Java's RemoveSnapshots takes ExecutorServices from the table operations layer.
+/// C++ has no shared executor today, so each strategy owns a private ThreadPool.
+/// Cap concurrency to avoid swamping FileIO with hundreds of in-flight requests
+/// on hosts with very high core counts.
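+///
+/// Worked example: with the cap at 8, a 32-core host gets min(8, 32) == 8
+/// workers, a 4-core host gets min(8, 4) == 4, and a host where
+/// hardware_concurrency() reports 0 (meaning "unknown") falls back to 2.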
+constexpr std::size_t kMaxParallelism = 8;
+
+std::size_t WorkerCount() {
+  std::size_t hw = std::thread::hardware_concurrency();
+  if (hw == 0) hw = 2;
+  return std::min(kMaxParallelism, hw);
+}
+
 /// \brief Abstract strategy for cleaning up files after snapshot expiration.
 class FileCleanupStrategy {
  public:
   FileCleanupStrategy(std::shared_ptr<FileIO> file_io,
                       std::function<void(const std::string&)> delete_func)
-      : file_io_(std::move(file_io)), delete_func_(std::move(delete_func)) {}
+      : file_io_(std::move(file_io)),
+        delete_func_(std::move(delete_func)),
+        pool_(WorkerCount()) {}
 
   virtual ~FileCleanupStrategy() = default;
@@ -75,24 +94,57 @@ class FileCleanupStrategy {
                        CleanupLevel level) = 0;
 
  protected:
-  /// \brief Delete a single file
+  /// Number of attempts for a single best-effort delete. Mirrors Java's
+  /// Tasks.foreach(...).retry(3).
+  static constexpr int kDeleteMaxAttempts = 3;
+
+  /// \brief Delete a file, suppressing errors (best-effort) with bounded retries.
+  ///
+  /// Uses the custom delete function if set, otherwise FileIO::DeleteFile. Retries
+  /// up to kDeleteMaxAttempts times on transient FileIO errors; stops immediately
+  /// when the underlying status is kNotFound (matching Java's stopRetryOn
+  /// NotFoundException). Custom delete callbacks are invoked exactly once; the
+  /// caller's retry policy is opaque to us.
   void DeleteFile(const std::string& path) {
-    try {
-      if (delete_func_) {
+    if (delete_func_) {
+      try {
         delete_func_(path);
-      } else {
-        std::ignore = file_io_->DeleteFile(path);
+      } catch (...) {
+        // Suppress all exceptions during file cleanup to match Java's
+        // suppressFailureWhenFinished behavior.
       }
-    } catch (...) {
-      /// TODO(shangxinli): add retry
+      return;
+    }
+
+    for (int attempt = 1; attempt <= kDeleteMaxAttempts; ++attempt) {
+      try {
+        auto status = file_io_->DeleteFile(path);
+        if (status.has_value()) return;
+        if (status.error().kind == ErrorKind::kNotFound) return;
+        if (attempt == kDeleteMaxAttempts) return;
+      } catch (...) {
+        if (attempt == kDeleteMaxAttempts) return;
+      }
+      // Linear backoff (10ms, 20ms). Tiny on purpose -- this is best-effort
+      // cleanup, not a critical write path; we just want to ride out a
+      // transient blip.
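+      // For example, a delete that fails transiently on attempts 1 and 2
+      // sleeps 10 ms, then 20 ms, before the third and final attempt.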
+      std::this_thread::sleep_for(std::chrono::milliseconds(10 * attempt));
     }
   }
 
-  /// TODO(shangxinli): Add bulk deletion
+  /// \brief Delete a batch of files in parallel via the strategy's worker pool.
+  ///
+  /// TODO(shangxinli): When FileIO grows a SupportsBulkOperations-style
+  /// `DeleteFiles(std::span<const std::string>)` API, prefer the bulk path here
+  /// (mirroring Java's FileCleanupStrategy.deleteFiles).
   void DeleteFiles(const std::unordered_set<std::string>& paths) {
-    for (const auto& path : paths) {
-      DeleteFile(path);
+    if (paths.empty()) return;
+    if (paths.size() == 1) {
+      DeleteFile(*paths.begin());
+      return;
     }
+    std::vector<std::string> as_vec(paths.begin(), paths.end());
+    pool_.RunAndWait(std::span(as_vec),
+                     [this](const std::string& p) { DeleteFile(p); });
   }
 
   bool HasAnyStatisticsFiles(const TableMetadata& metadata) const {
@@ -134,6 +186,13 @@ class FileCleanupStrategy {
   std::shared_ptr<FileIO> file_io_;
   std::function<void(const std::string&)> delete_func_;
+  // Worker pool for parallel best-effort deletion. Must be declared after the
+  // members its tasks touch -- the pool's worker threads run tasks submitted
+  // via Submit/RunAndWait, which capture `this` and dereference
+  // file_io_ / delete_func_, so those members must outlive the pool. Since
+  // destruction order is reverse declaration order, listing pool_ last ensures
+  // workers are joined before file_io_ and delete_func_ are destroyed.
+  ThreadPool pool_;
 };
 
 /// \brief File cleanup strategy that determines safe deletions via full reachability.
 ///
@@ -142,7 +201,7 @@ class FileCleanupStrategy {
 /// Walks all snapshots to compare reachable manifests from expired snapshots against
 /// still referenced by retained snapshots, then deletes orphaned manifests, data
 /// files, and manifest lists.
 ///
-/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support.
+/// TODO(shangxinli): Add multi-threaded manifest reading.
 class ReachableFileCleanup : public FileCleanupStrategy {
  public:
   using FileCleanupStrategy::FileCleanupStrategy;
diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h
index 7c1588aa5..c47e2488a 100644
--- a/src/iceberg/update/expire_snapshots.h
+++ b/src/iceberg/update/expire_snapshots.h
@@ -114,6 +114,9 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
   /// If this method is not called, unnecessary manifests and data files will still be
   /// deleted.
   ///
+  /// \note The supplied function may be invoked concurrently from worker threads
+  /// when bulk deletion is parallelized; implementations must be thread-safe.
+  ///
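+  /// For example, a minimal thread-safe collector (an illustrative sketch
+  /// mirroring this patch's tests, not required usage):
+  /// \code
+  ///   std::mutex mu;
+  ///   std::vector<std::string> deleted;
+  ///   update->DeleteWith([&](const std::string& path) {
+  ///     std::lock_guard lock(mu);
+  ///     deleted.push_back(path);
+  ///   });
+  /// \endcode
+  ///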
   /// \param delete_func A function that will be called to delete manifests and data files
   /// \return Reference to this for method chaining.
   ExpireSnapshots& DeleteWith(std::function<void(const std::string&)> delete_func);
diff --git a/src/iceberg/util/thread_pool_internal.cc b/src/iceberg/util/thread_pool_internal.cc
new file mode 100644
index 000000000..97356ef57
--- /dev/null
+++ b/src/iceberg/util/thread_pool_internal.cc
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/util/thread_pool_internal.h"
+
+#include <stdexcept>
+#include <utility>
+
+namespace iceberg {
+
+ThreadPool::ThreadPool(std::size_t num_workers) {
+  if (num_workers == 0) {
+    throw std::invalid_argument("ThreadPool num_workers must be > 0");
+  }
+  workers_.reserve(num_workers);
+  for (std::size_t i = 0; i < num_workers; ++i) {
+    workers_.emplace_back([this] { WorkerLoop(); });
+  }
+}
+
+ThreadPool::~ThreadPool() {
+  {
+    std::lock_guard lock(mu_);
+    stop_ = true;
+  }
+  cv_.notify_all();
+  for (auto& w : workers_) {
+    if (w.joinable()) w.join();
+  }
+}
+
+std::future<void> ThreadPool::Submit(std::function<void()> task) {
+  auto pkg = std::make_shared<std::packaged_task<void()>>(std::move(task));
+  std::future<void> fut = pkg->get_future();
+  {
+    std::unique_lock lock(mu_);
+    if (stop_) {
+      // Pool is shutting down. Run the task on the calling thread so the future
+      // becomes ready and callers don't deadlock waiting on it.
+      lock.unlock();
+      (*pkg)();
+      return fut;
+    }
+    queue_.emplace([pkg]() { (*pkg)(); });
+  }
+  cv_.notify_one();
+  return fut;
+}
+
+void ThreadPool::WorkerLoop() {
+  while (true) {
+    std::function<void()> task;
+    {
+      std::unique_lock lock(mu_);
+      cv_.wait(lock, [this] { return stop_ || !queue_.empty(); });
+      if (queue_.empty()) {
+        // Drain mode: only exit once all queued work has been pulled.
+        return;
+      }
+      task = std::move(queue_.front());
+      queue_.pop();
+    }
+    // packaged_task captures exceptions into the future, so this won't throw.
+    task();
+  }
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/util/thread_pool_internal.h b/src/iceberg/util/thread_pool_internal.h
new file mode 100644
index 000000000..6d84379c9
--- /dev/null
+++ b/src/iceberg/util/thread_pool_internal.h
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <condition_variable>
+#include <cstddef>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <span>
+#include <thread>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+
+namespace iceberg {
+
+/// \brief Fixed-size worker pool for fire-and-wait task batches.
+///
+/// Internal API: not part of the public iceberg surface. Used today by
+/// FileCleanupStrategy in update/expire_snapshots.cc to parallelize bulk deletes
+/// without spawning a fresh std::thread per call. Workers are created eagerly in
+/// the constructor and joined in the destructor.
+///
+/// All public methods are thread-safe.
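+///
+/// A minimal usage sketch (illustrative only, mirroring the unit tests):
+/// \code
+///   ThreadPool pool(4);
+///   std::vector<int> items = {1, 2, 3, 4};
+///   std::atomic<int> sum{0};
+///   pool.RunAndWait(std::span(items),
+///                   [&](int v) { sum.fetch_add(v, std::memory_order_relaxed); });
+///   // After RunAndWait returns, sum.load() == 10.
+/// \endcode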
+class ICEBERG_EXPORT ThreadPool {
+ public:
+  /// \brief Construct a pool with `num_workers` worker threads. Must be > 0.
+  explicit ThreadPool(std::size_t num_workers);
+
+  ~ThreadPool();
+
+  ThreadPool(const ThreadPool&) = delete;
+  ThreadPool& operator=(const ThreadPool&) = delete;
+  ThreadPool(ThreadPool&&) = delete;
+  ThreadPool& operator=(ThreadPool&&) = delete;
+
+  std::size_t num_workers() const { return workers_.size(); }
+
+  /// \brief Submit a task. The returned future becomes ready after `task` runs.
+  ///
+  /// If `task` throws, the exception is captured into the returned future
+  /// (standard std::packaged_task semantics) and the worker thread continues.
+  std::future<void> Submit(std::function<void()> task);
+
+  /// \brief Submit one task per item and block until every task has completed.
+  ///
+  /// Exceptions thrown by `work` are caught and discarded. Callers that need to
+  /// observe per-item failures should record them inside `work` (e.g. via an
+  /// atomic counter or a thread-safe collection).
+  ///
+  /// `items` and `work` must outlive this call. Both are captured by reference
+  /// into the queued tasks; this is safe because RunAndWait blocks until every
+  /// task has finished.
+  template <typename T, typename Fn>
+  void RunAndWait(std::span<T> items, Fn&& work) {
+    if (items.empty()) return;
+    std::vector<std::future<void>> futures;
+    futures.reserve(items.size());
+    for (const auto& item : items) {
+      futures.emplace_back(Submit([&item, &work]() {
+        try {
+          work(item);
+        } catch (...) {
+          // Best-effort: see the RunAndWait doc comment above.
+        }
+      }));
+    }
+    for (auto& f : futures) {
+      f.wait();
+    }
+  }
+
+ private:
+  void WorkerLoop();
+
+  std::vector<std::thread> workers_;
+  std::queue<std::function<void()>> queue_;
+  std::mutex mu_;
+  std::condition_variable cv_;
+  bool stop_ = false;
+};
+
+}  // namespace iceberg