From: Matthew N. Heler Date: Wed, 22 Oct 2025 01:35:14 +0000 (-0500) Subject: rgw/restore: add configurable wait timeout for cloud restore read-through X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cbe2f6f613e493555953c3354fe22f48b79dbbb5;p=ceph.git rgw/restore: add configurable wait timeout for cloud restore read-through Previously GET requests for cloud-tiered objects always failed immediately with ERR_REQUEST_TIMEOUT. Now they wait up to rgw_read_through_timeout_ms for the restore to complete using a waiter registry that maps objects to waiting requests. Signed-off-by: Matthew N. Heler --- diff --git a/doc/radosgw/cloud-restore.rst b/doc/radosgw/cloud-restore.rst index 252d6ef13a3..266904c55a4 100644 --- a/doc/radosgw/cloud-restore.rst +++ b/doc/radosgw/cloud-restore.rst @@ -227,8 +227,10 @@ Example 3: This will restore the object ``doc3.rtf`` for ``read_through_restore_days`` days. -.. note:: The above CLI command may time out if object restoration takes too long. - You can verify the restore status before reissuing the command. +The ``rgw_read_through_timeout_ms`` configuration option controls how long the +``GET`` request will wait for restore completion before returning a timeout error. +The default is 10000 milliseconds (10 seconds). Setting this to 0 disables waiting, +requiring clients to poll for completion by retrying the ``GET`` request. Verifying the Restoration Status diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index ad428b017b6..be86b3b3e08 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -518,6 +518,21 @@ options: services: - rgw with_legacy: true +- name: rgw_read_through_timeout_ms + type: int + level: advanced + desc: Maximum time in milliseconds for read-through GET requests to wait for cloud + object restore completion + long_desc: When a GET request is made for a cloud-tiered object that must be restored, + the request will wait up to this many milliseconds for the restore to complete. + If the restore completes within this time, the GET request succeeds and returns the + object data. If the time is exceeded, the request fails with ERR_REQUEST_TIMEOUT. + Set to 0 to fail immediately without waiting. + default: 10000 + min: 0 + services: + - rgw + with_legacy: true - name: rgw_restore_max_objs type: int level: advanced diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 9c7fe7432ab..07d62d9d862 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -67,6 +67,7 @@ set(librgw_common_srcs rgw_lc.cc rgw_lc_s3.cc rgw_restore.cc + rgw_restore_waiter.cc rgw_metadata.cc rgw_multi.cc rgw_multi_del.cc diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index b5fd4bb8ddf..6b2b61ba0f7 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -69,6 +69,7 @@ #include "rgw_bucket_sync.h" #include "rgw_bucket_logging.h" #include "rgw_restore.h" +#include "rgw_restore_waiter.h" #include "services/svc_zone.h" #include "services/svc_quota.h" @@ -994,12 +995,111 @@ void handle_replication_status_header( * `1` : restore is already in progress * `2` : already restored */ +static int wait_for_restore_completion(req_state* s, const DoutPrefixProvider *dpp, + int64_t timeout_ms, + std::shared_ptr waiter = nullptr, + std::shared_ptr registry = nullptr, + optional_yield y = null_yield) +{ + if (timeout_ms <= 0 || (!waiter && !registry)) { + ldpp_dout(dpp, 5) << "restore is still in progress, please check restore status and retry" << dendl; + s->err.message = "restore is still in progress"; + return -ERR_REQUEST_TIMEOUT; + } + + // If waiter not provided, register one now (for RestoreAlreadyInProgress case) + std::unique_ptr guard; + if (!waiter) { + waiter = registry->register_waiter( + s->bucket->get_key(), + s->object->get_key() + ); + if (!waiter) { + ldpp_dout(dpp, 5) << "restore waiter unavailable, returning timeout" << dendl; + s->err.message = "restore is still in progress"; + return -ERR_REQUEST_TIMEOUT; + } + guard = std::make_unique( + registry, + waiter + ); + } + + const auto start_time = ceph::real_clock::now(); + constexpr int64_t poll_interval_ms = 200; // Poll RADOS every 200ms for cross-instance restores + int64_t remaining_ms = timeout_ms; + + auto elapsed_ms = [start_time]() { + return std::chrono::duration_cast( + ceph::real_clock::now() - start_time).count(); + }; + + while (remaining_ms > 0) { + // Try waiting on condition variable for notification + const int64_t wait_time_ms = (poll_interval_ms < remaining_ms) ? poll_interval_ms : remaining_ms; + + const bool notified = waiter->wait_for(std::chrono::milliseconds(wait_time_ms), y); + + if (notified) { + // Got notification from restore processor + if (waiter->failed.load(std::memory_order_acquire)) { + const int result = waiter->result.load(std::memory_order_acquire); + ldpp_dout(dpp, 0) << "Restore failed after " << elapsed_ms() << "ms (notified)" << dendl; + s->err.message = "restore operation failed"; + return result < 0 ? result : -EIO; + } + ldpp_dout(dpp, 10) << "Restore completed successfully in " << elapsed_ms() << "ms (notified)" << dendl; + return static_cast(rgw::sal::RGWRestoreStatus::CloudRestored); + } + + // No notification - poll RADOS to check status (for cross-instance restores) + // Invalidate cache to force fresh read from RADOS + s->object->invalidate(); + int ret = s->object->get_obj_attrs(y, dpp); + if (ret < 0) { + ldpp_dout(dpp, 5) << "Failed to read object attrs during restore wait: " << ret << dendl; + // Continue waiting - transient error + remaining_ms = timeout_ms - elapsed_ms(); + continue; + } + + const rgw::sal::Attrs& attrs = s->object->get_attrs(); + auto attr_iter = attrs.find(RGW_ATTR_RESTORE_STATUS); + if (attr_iter != attrs.end()) { + rgw::sal::RGWRestoreStatus restore_status; + auto iter = attr_iter->second.cbegin(); + decode(restore_status, iter); + + if (restore_status == rgw::sal::RGWRestoreStatus::CloudRestored) { + ldpp_dout(dpp, 10) << "Restore completed successfully in " << elapsed_ms() << "ms (polled)" << dendl; + return static_cast(rgw::sal::RGWRestoreStatus::CloudRestored); + } + if (restore_status == rgw::sal::RGWRestoreStatus::RestoreFailed) { + ldpp_dout(dpp, 0) << "Restore failed after " << elapsed_ms() << "ms (polled)" << dendl; + s->err.message = "restore operation failed"; + return -EIO; + } + // else RestoreAlreadyInProgress - continue waiting + } + + // Update remaining time based on actual elapsed time + remaining_ms = timeout_ms - elapsed_ms(); + } + + // Timeout reached + ldpp_dout(dpp, 5) << "Restore timeout after " << elapsed_ms() << "ms, still in progress" << dendl; + s->err.message = "restore is still in progress"; + return -ERR_REQUEST_TIMEOUT; +} + int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, rgw::sal::Attrs& attrs, bool sync_cloudtiered, std::optional days, bool read_through, optional_yield y) { int op_ret = 0; ldpp_dout(dpp, 20) << "reached handle cloud tier " << dendl; + rgw::restore::Restore* restore_handle = driver ? driver->get_rgwrestore() : nullptr; + auto waiter_registry = restore_handle ? restore_handle->get_waiter_registry() : nullptr; auto attr_iter = attrs.find(RGW_ATTR_MANIFEST); if (attr_iter == attrs.end()) { if (!read_through) { @@ -1042,11 +1142,11 @@ int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal:: } if (restore_status == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) { if (read_through) { - op_ret = -ERR_REQUEST_TIMEOUT; - ldpp_dout(dpp, 5) << "restore is still in progress, please check restore status and retry" << dendl; - s->err.message = "restore is still in progress"; - return op_ret; - } else { + // For glacier tier, fail immediately as restores can take hours/days + int64_t timeout_ms = (tier_config.tier_placement.tier_type == "cloud-s3-glacier") + ? 0 : s->cct->_conf.get_val("rgw_read_through_timeout_ms"); + return wait_for_restore_completion(s, dpp, timeout_ms, nullptr, waiter_registry, y); + } else { // for restore-op, corresponds to RESTORE_ALREADY_IN_PROGRESS return static_cast(rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress); } @@ -1098,6 +1198,26 @@ int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal:: } } + // For read-through, register waiter BEFORE initiating restore + std::shared_ptr waiter; + std::unique_ptr guard; + + if (read_through && waiter_registry) { + // For glacier tier, fail immediately as restores can take hours/days + int64_t timeout_ms = (tier->get_tier_type() == "cloud-s3-glacier") + ? 0 : s->cct->_conf.get_val("rgw_read_through_timeout_ms"); + if (timeout_ms > 0) { + waiter = waiter_registry->register_waiter( + s->bucket->get_key(), + s->object->get_key() + ); + guard = std::make_unique( + waiter_registry, + waiter + ); + } + } + op_ret = driver->get_rgwrestore()->restore_obj_from_cloud(s->bucket.get(), s->object.get(), tier.get(), days, dpp, y); @@ -1108,13 +1228,12 @@ int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal:: } ldpp_dout(dpp, 20) << "Restore of object " << s->object->get_key() << " initiated" << dendl; - /* Even if restore is complete the first read through request will return - * but actually downloaded object asyncronously. - */ - if (read_through) { //read-through - op_ret = -ERR_REQUEST_TIMEOUT; - ldpp_dout(dpp, 5) << "restore is still in progress, please check restore status and retry" << dendl; - s->err.message = "restore is still in progress"; + + if (read_through) { + // For glacier tier, fail immediately as restores can take hours/days + int64_t timeout_ms = (tier->get_tier_type() == "cloud-s3-glacier") + ? 0 : s->cct->_conf.get_val("rgw_read_through_timeout_ms"); + return wait_for_restore_completion(s, dpp, timeout_ms, waiter, waiter_registry, y); } return op_ret; } @@ -2614,6 +2733,19 @@ void RGWGetObj::execute(optional_yield y) <<". Failing with " << op_ret << dendl; goto done_err; } + // If restore completed (via wait), invalidate cache and reload attrs + if (op_ret == static_cast(rgw::sal::RGWRestoreStatus::CloudRestored)) { + // Invalidate cached state to force fresh read from RADOS with updated manifest + s->object->invalidate(); + + op_ret = s->object->get_obj_attrs(y, this); + if (op_ret < 0) { + ldpp_dout(this, 0) << "ERROR: failed to reload attrs after restore" << dendl; + goto done_err; + } + attrs = s->object->get_attrs(); + s->obj_size = s->object->get_size(); + } } attr_iter = attrs.find(RGW_ATTR_USER_MANIFEST); diff --git a/src/rgw/rgw_restore.cc b/src/rgw/rgw_restore.cc index 9f4bd97d688..f00ddbd8d6e 100644 --- a/src/rgw/rgw_restore.cc +++ b/src/rgw/rgw_restore.cc @@ -27,6 +27,7 @@ #include "rgw_common.h" #include "rgw_bucket.h" #include "rgw_restore.h" +#include "rgw_restore_waiter.h" #include "rgw_zone.h" #include "rgw_string.h" #include "rgw_multi.h" @@ -152,6 +153,11 @@ int Restore::initialize(CephContext *_cct, rgw::sal::Driver* _driver) { cct = _cct; driver = _driver; + // Initialize waiter registry + if (!waiter_registry) { + waiter_registry = std::make_shared(); + } + ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": initializing Restore handle" << dendl; /* max_objs indicates the number of shards or objects * used to store Restore Entries */ @@ -188,6 +194,10 @@ void Restore::finalize() { sal_restore.reset(nullptr); obj_names.clear(); + if (waiter_registry) { + waiter_registry->shutdown(); + waiter_registry.reset(); + } ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": finalize Restore handle" << dendl; } @@ -234,6 +244,14 @@ void Restore::stop_processor() worker.reset(nullptr); } +void Restore::wake_worker() +{ + if (worker) { + std::lock_guard lock(worker->lock); + worker->cond.notify_one(); + } +} + unsigned Restore::get_subsys() const { return dout_subsys; @@ -552,6 +570,18 @@ done: ldpp_dout(this, -1) << __PRETTY_FUNCTION__ << ": Restore of entry:'" << entry << "' failed" << ret << dendl; entry.status = rgw::sal::RGWRestoreStatus::RestoreFailed; } + + // Notify any waiting GET requests + if (waiter_registry && !in_progress) { + bool success = (ret >= 0); + waiter_registry->notify_completion( + entry.bucket, + entry.obj_key, + success, + ret + ); + } + return ret; } @@ -728,7 +758,14 @@ int Restore::restore_obj_from_cloud(rgw::sal::Bucket* pbucket, return ret; } - ldpp_dout(this, 10) << __PRETTY_FUNCTION__ << ": Restore of object " << pobj->get_key() << " is in progress." << dendl; + // For cloud-s3 tier (not glacier), wake the restore worker immediately + // to start the download instead of waiting for the next periodic cycle + if (tier && tier->get_tier_type() == "cloud-s3") { + ldpp_dout(this, 10) << __PRETTY_FUNCTION__ << ": Waking restore worker for immediate processing (cloud-s3 tier)" << dendl; + wake_worker(); + } + + ldpp_dout(this, 10) << __PRETTY_FUNCTION__ << ": Restore of object " << pobj->get_key() << " is in progress." << dendl; if (notify) { auto& attrs = pobj->get_attrs(); @@ -747,6 +784,7 @@ int Restore::restore_obj_from_cloud(rgw::sal::Bucket* pbucket, } } + return ret; } diff --git a/src/rgw/rgw_restore.h b/src/rgw/rgw_restore.h index 981eb49360c..c759fd8abf1 100644 --- a/src/rgw/rgw_restore.h +++ b/src/rgw/rgw_restore.h @@ -20,6 +20,7 @@ #include "cls/rgw/cls_rgw_types.h" #include "rgw_sal.h" #include "rgw_notify.h" +#include "rgw_restore_waiter.h" #include #include @@ -73,6 +74,7 @@ class Restore : public DoutPrefixProvider { int max_objs{0}; std::vector obj_names; std::atomic down_flag = { false }; + std::shared_ptr waiter_registry; class RestoreWorker : public Thread { @@ -95,6 +97,7 @@ class Restore : public DoutPrefixProvider { void *entry() override; void stop(); + friend class Restore; friend class RGWRados; }; // RestoreWorker @@ -116,6 +119,8 @@ public: bool going_down(); void start_processor(); void stop_processor(); + void wake_worker(); + std::shared_ptr get_waiter_registry() const { return waiter_registry; } CephContext *get_cct() const override { return cct; } rgw::sal::Restore* get_restore() const { return sal_restore.get(); } diff --git a/src/rgw/rgw_restore_waiter.cc b/src/rgw/rgw_restore_waiter.cc new file mode 100644 index 00000000000..58ec4af22fd --- /dev/null +++ b/src/rgw/rgw_restore_waiter.cc @@ -0,0 +1,226 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "rgw_restore_waiter.h" +#include +#include + +namespace rgw::restore { + +void RestoreWaiter::reset() { + completed.store(false, std::memory_order_relaxed); + failed.store(false, std::memory_order_relaxed); + result.store(0, std::memory_order_relaxed); + cached_key.clear(); +} + +bool RestoreWaiter::wait_for(std::chrono::milliseconds timeout, optional_yield y) { + if (completed.load(std::memory_order_acquire)) { + return true; + } + + if (y) { + auto& yield = y.get_yield_context(); + auto timer = std::make_shared(yield.get_executor()); + + { + std::lock_guard lock(timer_mtx); + active_timer = timer; + } + + timer->expires_after(timeout); + + boost::system::error_code ec; + timer->async_wait(yield[ec]); + + { + std::lock_guard lock(timer_mtx); + active_timer.reset(); + } + + // timer cancellation is used to wake async waiters on completion/shutdown + if (ec && ec != boost::asio::error::operation_aborted) { + return false; + } + + return completed.load(std::memory_order_acquire); + } + + std::unique_lock lock(mtx); + return cv.wait_for(lock, timeout, + [this] { return completed.load(std::memory_order_acquire); }); +} + +void RestoreWaiter::complete(bool success, int result_code) { + failed.store(!success, std::memory_order_release); + result.store(result_code, std::memory_order_release); + completed.store(true, std::memory_order_release); + + cv.notify_all(); + + std::shared_ptr timer; + { + std::lock_guard lock(timer_mtx); + timer = active_timer.lock(); + } + if (timer) { + timer->cancel(); + } +} + +// RestoreWaiterPool implementation +std::shared_ptr RestoreWaiterPool::acquire(std::weak_ptr owner) { + std::unique_lock lock(pool_mtx); + + // Periodic eviction of old waiters + evict_old_waiters(); + + if (!free_list.empty()) { + auto waiter = std::move(free_list.back()); + free_list.pop_back(); + lock.unlock(); + + // Reset state + waiter->reset(); + + return std::shared_ptr(waiter.release(), + [owner](RestoreWaiter* w) { + if (auto reg = owner.lock()) { + reg->release_waiter(w); + } else { + delete w; + } + }); + } + + lock.unlock(); + auto waiter = std::make_unique(); + return std::shared_ptr(waiter.release(), + [owner](RestoreWaiter* w) { + if (auto reg = owner.lock()) { + reg->release_waiter(w); + } else { + delete w; + } + }); +} + +void RestoreWaiterPool::release(RestoreWaiter* waiter) { + std::lock_guard lock(pool_mtx); + if (free_list.size() < MAX_POOL_SIZE) { + waiter->last_used = ceph::coarse_real_clock::now(); + free_list.emplace_back(waiter); + } else { + delete waiter; + } +} + +void RestoreWaiterPool::evict_old_waiters() { + // Assumes pool_mtx is already held + if (free_list.empty()) { + return; + } + + const auto now = ceph::coarse_real_clock::now(); + const auto eviction_threshold = now - EVICTION_TIME; + + free_list.erase( + std::remove_if(free_list.begin(), free_list.end(), + [eviction_threshold](const std::unique_ptr& w) { + return w->last_used < eviction_threshold; + }), + free_list.end() + ); +} + +// RestoreWaiterRegistry implementation +std::string RestoreWaiterRegistry::make_key(const rgw_bucket& bucket, const rgw_obj_key& obj_key) { + std::string key; + key.reserve(bucket.name.size() + obj_key.name.size() + obj_key.instance.size() + 10); + key = bucket.get_key(); + key += ':'; + key += obj_key.name; + if (!obj_key.instance.empty()) { + key += ':'; + key += obj_key.instance; + } + return key; +} + +void RestoreWaiterRegistry::release_waiter(RestoreWaiter* waiter) { + waiter_pool.release(waiter); +} + +std::shared_ptr RestoreWaiterRegistry::register_waiter(const rgw_bucket& bucket, + const rgw_obj_key& obj_key) { + if (shutting_down.load(std::memory_order_acquire)) { + return nullptr; + } + + auto self = shared_from_this(); + auto waiter = waiter_pool.acquire(self); + waiter->cached_key = make_key(bucket, obj_key); + + std::unique_lock lock(registry_mtx); + if (shutting_down.load(std::memory_order_relaxed)) { + return nullptr; + } + auto& vec = waiters[waiter->cached_key]; + vec.push_back(waiter); + + return waiter; +} + +void RestoreWaiterRegistry::unregister_waiter(std::shared_ptr waiter) { + std::unique_lock lock(registry_mtx); + + auto it = waiters.find(waiter->cached_key); + if (it != waiters.end()) { + auto& vec = it->second; + vec.erase(std::remove(vec.begin(), vec.end(), waiter), vec.end()); + if (vec.empty()) { + waiters.erase(it); + } + } +} + +void RestoreWaiterRegistry::notify_completion(const rgw_bucket& bucket, + const rgw_obj_key& obj_key, + bool success, + int result) { + std::string key = make_key(bucket, obj_key); + std::vector> to_notify; + + { + std::unique_lock lock(registry_mtx); + auto it = waiters.find(key); + if (it != waiters.end()) { + to_notify = std::move(it->second); + waiters.erase(it); + } + } + + // Notify outside the lock - lock-free atomic updates + for (auto& waiter : to_notify) { + waiter->complete(success, result); + } +} + +void RestoreWaiterRegistry::shutdown() { + shutting_down.store(true, std::memory_order_release); + std::vector> to_notify; + + { + std::unique_lock lock(registry_mtx); + for (auto& [_, vec] : waiters) { + to_notify.insert(to_notify.end(), vec.begin(), vec.end()); + } + waiters.clear(); + } + + for (auto& waiter : to_notify) { + waiter->complete(false, -ECANCELED); + } +} + +} // namespace rgw::restore diff --git a/src/rgw/rgw_restore_waiter.h b/src/rgw/rgw_restore_waiter.h new file mode 100644 index 00000000000..fde9ebc7e7a --- /dev/null +++ b/src/rgw/rgw_restore_waiter.h @@ -0,0 +1,117 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "rgw_common.h" +#include "rgw_sal.h" + +namespace rgw::restore { + +class RestoreWaiterRegistry; + +// Represents a single GET request waiting for restore completion +struct RestoreWaiter { + // the blocking wait uses std::condition_variable::wait_for(), which uses the + // std::chrono::steady_clock. use that clock for async waits as well + using Clock = std::chrono::steady_clock; + using Timer = boost::asio::basic_waitable_timer; + + std::mutex mtx; + std::condition_variable cv; + std::mutex timer_mtx; + std::weak_ptr active_timer; + std::atomic completed{false}; + std::atomic failed{false}; + std::atomic result{0}; // Error codes fit in int16_t + std::string cached_key; // Cached registry key to avoid recomputation in unregister + ceph::coarse_real_time last_used; // Timestamp for pool eviction + + // Wait for completion for up to 'timeout'. Uses cv for blocking callers and + // a timer for coroutine callers so we don't block the frontend coroutine. + bool wait_for(std::chrono::milliseconds timeout, optional_yield y); + // Mark completion and wake any waiting callers. + void complete(bool success, int result_code); + // Reset state before reuse. + void reset(); +}; + +// Object pool for RestoreWaiter to avoid repeated allocations +class RestoreWaiterPool { +private: + friend class RestoreWaiterRegistry; + std::mutex pool_mtx; + std::vector> free_list; + static constexpr size_t MAX_POOL_SIZE = 256; + static constexpr std::chrono::seconds EVICTION_TIME{300}; // 5 minutes + +public: + std::shared_ptr acquire(std::weak_ptr owner); + +private: + void release(RestoreWaiter* waiter); + void evict_old_waiters(); +}; + +// Registry mapping object keys to waiting GET requests +class RestoreWaiterRegistry : public std::enable_shared_from_this { +private: + mutable std::shared_mutex registry_mtx; + std::unordered_map>> waiters; + RestoreWaiterPool waiter_pool; + std::atomic shutting_down{false}; + friend class RestoreWaiterPool; + + static std::string make_key(const rgw_bucket& bucket, const rgw_obj_key& obj_key); + void release_waiter(RestoreWaiter* waiter); + +public: + // Register a waiter for an object restore + std::shared_ptr register_waiter(const rgw_bucket& bucket, + const rgw_obj_key& obj_key); + + // Unregister a waiter (called on timeout or completion) + void unregister_waiter(std::shared_ptr waiter); + + // Signal all waiters for an object (called by restore worker) + void notify_completion(const rgw_bucket& bucket, + const rgw_obj_key& obj_key, + bool success, + int result); + + // Cancel all waiters and prevent new registrations + void shutdown(); +}; + +// RAII guard for automatic waiter unregistration +struct WaiterGuard { + std::shared_ptr registry; + std::shared_ptr waiter; + + WaiterGuard(std::shared_ptr reg, std::shared_ptr w) + : registry(std::move(reg)), waiter(std::move(w)) {} + + ~WaiterGuard() { + if (registry && waiter) { + registry->unregister_waiter(waiter); + } + } + + WaiterGuard(const WaiterGuard&) = delete; + WaiterGuard& operator=(const WaiterGuard&) = delete; +}; + +} // namespace rgw::restore