]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/restore: add configurable wait timeout for cloud restore read-through
authorMatthew N. Heler <matthew.heler@hotmail.com>
Wed, 22 Oct 2025 01:35:14 +0000 (20:35 -0500)
committerMatthew N. Heler <matthew.heler@hotmail.com>
Wed, 18 Feb 2026 12:30:09 +0000 (06:30 -0600)
Previously, GET requests for cloud-tiered objects always failed immediately
with ERR_REQUEST_TIMEOUT. Now they wait up to rgw_read_through_timeout_ms
for the restore to complete, using a waiter registry that maps objects to
waiting requests.

Signed-off-by: Matthew N. Heler <matthew.heler@hotmail.com>
doc/radosgw/cloud-restore.rst
src/common/options/rgw.yaml.in
src/rgw/CMakeLists.txt
src/rgw/rgw_op.cc
src/rgw/rgw_restore.cc
src/rgw/rgw_restore.h
src/rgw/rgw_restore_waiter.cc [new file with mode: 0644]
src/rgw/rgw_restore_waiter.h [new file with mode: 0644]

index 252d6ef13a365b4241d2f5d182d557f978e9dada..266904c55a49f4e0a4b0761a93b5c7394ce32df4 100644 (file)
@@ -227,8 +227,10 @@ Example 3:
 
 This will restore the object ``doc3.rtf`` for ``read_through_restore_days`` days.
 
-.. note:: The above CLI command may time out if object restoration takes too long.
-          You can verify the restore status before reissuing the command.
+The ``rgw_read_through_timeout_ms`` configuration option controls how long the
+``GET`` request will wait for restore completion before returning a timeout error.
+The default is 10000 milliseconds (10 seconds). Setting this to 0 disables waiting,
+requiring clients to poll for completion by retrying the ``GET`` request.
 
 
 Verifying the Restoration Status
index ad428b017b6c6ac736f98ec058be1dc179cc546d..be86b3b3e084c2029a97f07f1b228f999951dc61 100644 (file)
@@ -518,6 +518,21 @@ options:
   services:
   - rgw
   with_legacy: true
+- name: rgw_read_through_timeout_ms
+  type: int
+  level: advanced
+  desc: Maximum time in milliseconds for read-through GET requests to wait for cloud
+    object restore completion
+  long_desc: When a GET request is made for a cloud-tiered object that must be restored,
+    the request will wait up to this many milliseconds for the restore to complete.
+    If the restore completes within this time, the GET request succeeds and returns the
+    object data. If the time is exceeded, the request fails with ERR_REQUEST_TIMEOUT.
+    Set to 0 to fail immediately without waiting.
+  default: 10000
+  min: 0
+  services:
+  - rgw
+  with_legacy: true
 - name: rgw_restore_max_objs
   type: int
   level: advanced
index 9c7fe7432abd8fedb05335fe7cb4b74d5cff02c5..07d62d9d862acdeca4e9aa1f27a90d292ea2b584 100644 (file)
@@ -67,6 +67,7 @@ set(librgw_common_srcs
   rgw_lc.cc
   rgw_lc_s3.cc
   rgw_restore.cc
+  rgw_restore_waiter.cc
   rgw_metadata.cc
   rgw_multi.cc
   rgw_multi_del.cc
index b5fd4bb8ddf9711572a2e9691dc842cb6dd89318..6b2b61ba0f757e59f9850631c7b4c7fe7c61f829 100644 (file)
@@ -69,6 +69,7 @@
 #include "rgw_bucket_sync.h"
 #include "rgw_bucket_logging.h"
 #include "rgw_restore.h"
+#include "rgw_restore_waiter.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_quota.h"
@@ -994,12 +995,111 @@ void handle_replication_status_header(
  *  `1`  :  restore is already in progress
  *  `2`  :  already restored
  */
+// Wait up to timeout_ms for an in-flight cloud restore of s->object to
+// complete before answering a read-through GET.
+//
+// Returns CloudRestored (positive) when completion is observed, a negative
+// error if the restore is reported failed, or -ERR_REQUEST_TIMEOUT when
+// timeout_ms elapses, waiting is disabled (timeout_ms <= 0), or no
+// waiter/registry is available.
+//
+// 'waiter' may already be registered by the caller (restore initiated within
+// this request); otherwise one is registered against 'registry' here and
+// released on every return path by the RAII WaiterGuard.
+static int wait_for_restore_completion(req_state* s, const DoutPrefixProvider *dpp,
+                                        int64_t timeout_ms,
+                                        std::shared_ptr<rgw::restore::RestoreWaiter> waiter = nullptr,
+                                        std::shared_ptr<rgw::restore::RestoreWaiterRegistry> registry = nullptr,
+                                        optional_yield y = null_yield)
+{
+  if (timeout_ms <= 0 || (!waiter && !registry)) {
+    ldpp_dout(dpp, 5) << "restore is still in progress, please check restore status and retry" << dendl;
+    s->err.message = "restore is still in progress";
+    return -ERR_REQUEST_TIMEOUT;
+  }
+
+  // If waiter not provided, register one now (for RestoreAlreadyInProgress case)
+  std::unique_ptr<rgw::restore::WaiterGuard> guard;
+  if (!waiter) {
+    waiter = registry->register_waiter(
+      s->bucket->get_key(),
+      s->object->get_key()
+    );
+    // register_waiter() returns null while the registry is shutting down.
+    if (!waiter) {
+      ldpp_dout(dpp, 5) << "restore waiter unavailable, returning timeout" << dendl;
+      s->err.message = "restore is still in progress";
+      return -ERR_REQUEST_TIMEOUT;
+    }
+    guard = std::make_unique<rgw::restore::WaiterGuard>(
+      registry,
+      waiter
+    );
+  }
+
+  const auto start_time = ceph::real_clock::now();
+  constexpr int64_t poll_interval_ms = 200;  // Poll RADOS every 200ms for cross-instance restores
+  int64_t remaining_ms = timeout_ms;
+
+  // Milliseconds elapsed since entering this function.
+  auto elapsed_ms = [start_time]() {
+    return std::chrono::duration_cast<std::chrono::milliseconds>(
+      ceph::real_clock::now() - start_time).count();
+  };
+
+  // Hybrid wait: sleep on the waiter (condition variable or asio timer) for a
+  // notification from this RGW's restore worker, and additionally poll the
+  // object's restore-status attr in RADOS so restores completed by another
+  // RGW instance are also observed.
+  while (remaining_ms > 0) {
+    // Try waiting on condition variable for notification
+    const int64_t wait_time_ms = (poll_interval_ms < remaining_ms) ? poll_interval_ms : remaining_ms;
+
+    const bool notified = waiter->wait_for(std::chrono::milliseconds(wait_time_ms), y);
+
+    if (notified) {
+      // Got notification from restore processor
+      if (waiter->failed.load(std::memory_order_acquire)) {
+        const int result = waiter->result.load(std::memory_order_acquire);
+        ldpp_dout(dpp, 0) << "Restore failed after " << elapsed_ms() << "ms (notified)" << dendl;
+        s->err.message = "restore operation failed";
+        return result < 0 ? result : -EIO;
+      }
+      ldpp_dout(dpp, 10) << "Restore completed successfully in " << elapsed_ms() << "ms (notified)" << dendl;
+      return static_cast<int>(rgw::sal::RGWRestoreStatus::CloudRestored);
+    }
+
+    // No notification - poll RADOS to check status (for cross-instance restores)
+    // Invalidate cache to force fresh read from RADOS
+    s->object->invalidate();
+    int ret = s->object->get_obj_attrs(y, dpp);
+    if (ret < 0) {
+      ldpp_dout(dpp, 5) << "Failed to read object attrs during restore wait: " << ret << dendl;
+      // Continue waiting - transient error
+      remaining_ms = timeout_ms - elapsed_ms();
+      continue;
+    }
+
+    const rgw::sal::Attrs& attrs = s->object->get_attrs();
+    auto attr_iter = attrs.find(RGW_ATTR_RESTORE_STATUS);
+    if (attr_iter != attrs.end()) {
+      rgw::sal::RGWRestoreStatus restore_status;
+      auto iter = attr_iter->second.cbegin();
+      // NOTE(review): decode() throws on malformed attr data — confirm a
+      // corrupt RGW_ATTR_RESTORE_STATUS cannot reach this path.
+      decode(restore_status, iter);
+
+      if (restore_status == rgw::sal::RGWRestoreStatus::CloudRestored) {
+        ldpp_dout(dpp, 10) << "Restore completed successfully in " << elapsed_ms() << "ms (polled)" << dendl;
+        return static_cast<int>(rgw::sal::RGWRestoreStatus::CloudRestored);
+      }
+      if (restore_status == rgw::sal::RGWRestoreStatus::RestoreFailed) {
+        // Polled failure has no per-waiter result code, so report generic -EIO.
+        ldpp_dout(dpp, 0) << "Restore failed after " << elapsed_ms() << "ms (polled)" << dendl;
+        s->err.message = "restore operation failed";
+        return -EIO;
+      }
+      // else RestoreAlreadyInProgress - continue waiting
+    }
+
+    // Update remaining time based on actual elapsed time
+    remaining_ms = timeout_ms - elapsed_ms();
+  }
+
+  // Timeout reached
+  ldpp_dout(dpp, 5) << "Restore timeout after " << elapsed_ms() << "ms, still in progress" << dendl;
+  s->err.message = "restore is still in progress";
+  return -ERR_REQUEST_TIMEOUT;
+}
+
 int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal::Driver* driver,
                          rgw::sal::Attrs& attrs, bool sync_cloudtiered, std::optional<uint64_t> days,
                          bool read_through, optional_yield y)
 {
   int op_ret = 0;
   ldpp_dout(dpp, 20) << "reached handle cloud tier " << dendl;
+  rgw::restore::Restore* restore_handle = driver ? driver->get_rgwrestore() : nullptr;
+  auto waiter_registry = restore_handle ? restore_handle->get_waiter_registry() : nullptr;
   auto attr_iter = attrs.find(RGW_ATTR_MANIFEST);
   if (attr_iter == attrs.end()) {
     if (!read_through) {
@@ -1042,11 +1142,11 @@ int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal::
     }
     if (restore_status == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) {
       if (read_through) {
-        op_ret = -ERR_REQUEST_TIMEOUT;
-        ldpp_dout(dpp, 5) << "restore is still in progress, please check restore status and retry" << dendl;
-        s->err.message = "restore is still in progress";
-        return op_ret;
-      } else { 
+        // For glacier tier, fail immediately as restores can take hours/days
+        int64_t timeout_ms = (tier_config.tier_placement.tier_type == "cloud-s3-glacier")
+          ? 0 : s->cct->_conf.get_val<int64_t>("rgw_read_through_timeout_ms");
+        return wait_for_restore_completion(s, dpp, timeout_ms, nullptr, waiter_registry, y);
+      } else {
                // for restore-op, corresponds to RESTORE_ALREADY_IN_PROGRESS
         return static_cast<int>(rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress);
       } 
@@ -1098,6 +1198,26 @@ int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal::
         }
       }
 
+      // For read-through, register waiter BEFORE initiating restore
+      std::shared_ptr<rgw::restore::RestoreWaiter> waiter;
+      std::unique_ptr<rgw::restore::WaiterGuard> guard;
+
+      if (read_through && waiter_registry) {
+        // For glacier tier, fail immediately as restores can take hours/days
+        int64_t timeout_ms = (tier->get_tier_type() == "cloud-s3-glacier")
+          ? 0 : s->cct->_conf.get_val<int64_t>("rgw_read_through_timeout_ms");
+        if (timeout_ms > 0) {
+          waiter = waiter_registry->register_waiter(
+            s->bucket->get_key(),
+            s->object->get_key()
+          );
+          guard = std::make_unique<rgw::restore::WaiterGuard>(
+            waiter_registry,
+            waiter
+          );
+        }
+      }
+
       op_ret = driver->get_rgwrestore()->restore_obj_from_cloud(s->bucket.get(),
                      s->object.get(), tier.get(), days, dpp, y);
 
@@ -1108,13 +1228,12 @@ int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal::
       }
 
       ldpp_dout(dpp, 20) << "Restore of object " << s->object->get_key() << " initiated" << dendl;
-      /*  Even if restore is complete the first read through request will return
-       *  but actually downloaded object asyncronously.
-       */
-      if (read_through) { //read-through
-        op_ret = -ERR_REQUEST_TIMEOUT;
-        ldpp_dout(dpp, 5) << "restore is still in progress, please check restore status and retry" << dendl;
-        s->err.message = "restore is still in progress";
+
+      if (read_through) {
+        // For glacier tier, fail immediately as restores can take hours/days
+        int64_t timeout_ms = (tier->get_tier_type() == "cloud-s3-glacier")
+          ? 0 : s->cct->_conf.get_val<int64_t>("rgw_read_through_timeout_ms");
+        return wait_for_restore_completion(s, dpp, timeout_ms, waiter, waiter_registry, y);
       }
       return op_ret;
     }
@@ -2614,6 +2733,19 @@ void RGWGetObj::execute(optional_yield y)
                        <<". Failing with " << op_ret << dendl;
       goto done_err;
     }
+    // If restore completed (via wait), invalidate cache and reload attrs
+    if (op_ret == static_cast<int>(rgw::sal::RGWRestoreStatus::CloudRestored)) {
+      // Invalidate cached state to force fresh read from RADOS with updated manifest
+      s->object->invalidate();
+
+      op_ret = s->object->get_obj_attrs(y, this);
+      if (op_ret < 0) {
+        ldpp_dout(this, 0) << "ERROR: failed to reload attrs after restore" << dendl;
+        goto done_err;
+      }
+      attrs = s->object->get_attrs();
+      s->obj_size = s->object->get_size();
+    }
   }
 
   attr_iter = attrs.find(RGW_ATTR_USER_MANIFEST);
index 9f4bd97d6882fc1a8fd5eaa07ae321373ddc1605..f00ddbd8d6e5665708b473a148a4c2878a8ab06e 100644 (file)
@@ -27,6 +27,7 @@
 #include "rgw_common.h"
 #include "rgw_bucket.h"
 #include "rgw_restore.h"
+#include "rgw_restore_waiter.h"
 #include "rgw_zone.h"
 #include "rgw_string.h"
 #include "rgw_multi.h"
@@ -152,6 +153,11 @@ int Restore::initialize(CephContext *_cct, rgw::sal::Driver* _driver) {
   cct = _cct;
   driver = _driver;
 
+  // Initialize waiter registry
+  if (!waiter_registry) {
+    waiter_registry = std::make_shared<RestoreWaiterRegistry>();
+  }
+
   ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": initializing Restore handle" << dendl;
   /* max_objs indicates the number of shards or objects
    * used to store Restore Entries */
@@ -188,6 +194,10 @@ void Restore::finalize()
 {
   sal_restore.reset(nullptr);
   obj_names.clear();
+  if (waiter_registry) {
+    waiter_registry->shutdown();
+    waiter_registry.reset();
+  }
   ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": finalize Restore handle" << dendl;
 }
 
@@ -234,6 +244,14 @@ void Restore::stop_processor()
   worker.reset(nullptr);
 }
 
+// Wake the RestoreWorker thread so a newly queued restore entry is processed
+// immediately instead of waiting for the next periodic wakeup. Notifying while
+// holding worker->lock pairs with the worker's condition wait so the wakeup
+// cannot be missed.
+void Restore::wake_worker()
+{
+  if (worker) {
+    std::lock_guard lock(worker->lock);
+    worker->cond.notify_one();
+  }
+}
+
 unsigned Restore::get_subsys() const
 {
   return dout_subsys;
@@ -552,6 +570,18 @@ done:
     ldpp_dout(this, -1) << __PRETTY_FUNCTION__ << ": Restore of entry:'" << entry << "' failed" << ret << dendl;         
     entry.status = rgw::sal::RGWRestoreStatus::RestoreFailed;
   }
+
+  // Notify any waiting GET requests
+  if (waiter_registry && !in_progress) {
+    bool success = (ret >= 0);
+    waiter_registry->notify_completion(
+      entry.bucket,
+      entry.obj_key,
+      success,
+      ret
+    );
+  }
+
   return ret;
 }
 
@@ -728,7 +758,14 @@ int Restore::restore_obj_from_cloud(rgw::sal::Bucket* pbucket,
     return ret;
   }
 
-  ldpp_dout(this, 10) << __PRETTY_FUNCTION__ << ": Restore of object " << pobj->get_key() << " is in progress." << dendl;  
+  // For cloud-s3 tier (not glacier), wake the restore worker immediately
+  // to start the download instead of waiting for the next periodic cycle
+  if (tier && tier->get_tier_type() == "cloud-s3") {
+    ldpp_dout(this, 10) << __PRETTY_FUNCTION__ << ": Waking restore worker for immediate processing (cloud-s3 tier)" << dendl;
+    wake_worker();
+  }
+
+  ldpp_dout(this, 10) << __PRETTY_FUNCTION__ << ": Restore of object " << pobj->get_key() << " is in progress." << dendl;
 
   if (notify) {
     auto& attrs = pobj->get_attrs();
@@ -747,6 +784,7 @@ int Restore::restore_obj_from_cloud(rgw::sal::Bucket* pbucket,
     }
   }
 
+
   return ret;
 }
 
index 981eb49360c5d01144ea2474ae22befb49558722..c759fd8abf1b29f4696b3b50d507182b80599307 100644 (file)
@@ -20,6 +20,7 @@
 #include "cls/rgw/cls_rgw_types.h"
 #include "rgw_sal.h"
 #include "rgw_notify.h"
+#include "rgw_restore_waiter.h"
 
 #include <atomic>
 #include <tuple>
@@ -73,6 +74,7 @@ class Restore : public DoutPrefixProvider {
   int max_objs{0};
   std::vector<std::string> obj_names;
   std::atomic<bool> down_flag = { false };
+  std::shared_ptr<RestoreWaiterRegistry> waiter_registry;
 
   class RestoreWorker : public Thread
   {
@@ -95,6 +97,7 @@ class Restore : public DoutPrefixProvider {
     void *entry() override;
     void stop();
 
+    friend class Restore;
     friend class RGWRados;
   }; // RestoreWorker
 
@@ -116,6 +119,8 @@ public:
   bool going_down();
   void start_processor();
   void stop_processor();
+  void wake_worker();
+  std::shared_ptr<RestoreWaiterRegistry> get_waiter_registry() const { return waiter_registry; }
 
   CephContext *get_cct() const override { return cct; }
   rgw::sal::Restore* get_restore() const { return sal_restore.get(); }
diff --git a/src/rgw/rgw_restore_waiter.cc b/src/rgw/rgw_restore_waiter.cc
new file mode 100644 (file)
index 0000000..58ec4af
--- /dev/null
@@ -0,0 +1,226 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "rgw_restore_waiter.h"
+#include <algorithm>
+#include <errno.h>
+
+namespace rgw::restore {
+
+// Return the waiter to a pristine state before reuse from the pool. Relaxed
+// ordering is sufficient: a pooled waiter is not yet visible to other threads.
+void RestoreWaiter::reset() {
+  completed.store(false, std::memory_order_relaxed);
+  failed.store(false, std::memory_order_relaxed);
+  result.store(0, std::memory_order_relaxed);
+  cached_key.clear();
+}
+
+// Wait until complete() is called or 'timeout' elapses. Returns true iff
+// completion was observed. With a yield context (beast frontend coroutine) an
+// asio timer is used so the coroutine suspends instead of blocking an OS
+// thread; otherwise a condition variable blocks the calling thread.
+bool RestoreWaiter::wait_for(std::chrono::milliseconds timeout, optional_yield y) {
+  // Fast path: already completed before we started waiting.
+  if (completed.load(std::memory_order_acquire)) {
+    return true;
+  }
+
+  if (y) {
+    auto& yield = y.get_yield_context();
+    auto timer = std::make_shared<Timer>(yield.get_executor());
+
+    // Publish the timer so complete() can cancel it to wake us early.
+    {
+      std::lock_guard lock(timer_mtx);
+      active_timer = timer;
+    }
+
+    timer->expires_after(timeout);
+
+    boost::system::error_code ec;
+    timer->async_wait(yield[ec]);
+
+    // Unpublish before returning; a late cancel() on a dead timer is avoided
+    // because complete() only acts on a successfully locked weak_ptr.
+    {
+      std::lock_guard lock(timer_mtx);
+      active_timer.reset();
+    }
+
+    // timer cancellation is used to wake async waiters on completion/shutdown
+    if (ec && ec != boost::asio::error::operation_aborted) {
+      return false;
+    }
+
+    // Distinguish "cancelled because completed" from a plain timeout.
+    return completed.load(std::memory_order_acquire);
+  }
+
+  // Blocking path: predicate re-checked under mtx on every wakeup.
+  std::unique_lock lock(mtx);
+  return cv.wait_for(lock, timeout,
+                     [this] { return completed.load(std::memory_order_acquire); });
+}
+
+// Publish the restore outcome and wake all waiters (blocking and coroutine).
+void RestoreWaiter::complete(bool success, int result_code) {
+  // Order matters: failed/result are published before 'completed' so a reader
+  // that observes completed==true (acquire) also sees the final outcome.
+  failed.store(!success, std::memory_order_release);
+  result.store(result_code, std::memory_order_release);
+  completed.store(true, std::memory_order_release);
+
+  // NOTE(review): 'completed' is set without holding 'mtx', so a blocking
+  // waiter can miss this notify in the window between its predicate check and
+  // entering the wait; it still observes completion on its next bounded
+  // wait — confirm all callers wait with a finite timeout.
+  cv.notify_all();
+
+  // Cancel any in-flight asio timer so a coroutine waiter wakes immediately;
+  // the cancellation surfaces as operation_aborted inside wait_for().
+  std::shared_ptr<Timer> timer;
+  {
+    std::lock_guard lock(timer_mtx);
+    timer = active_timer.lock();
+  }
+  if (timer) {
+    timer->cancel();
+  }
+}
+
+// RestoreWaiterPool implementation
+// Hand out a RestoreWaiter, reusing a pooled one when available. The returned
+// shared_ptr carries a custom deleter that routes the waiter back into the
+// owning registry's pool — or frees it outright if the registry is already
+// gone (weak_ptr no longer lockable).
+std::shared_ptr<RestoreWaiter> RestoreWaiterPool::acquire(std::weak_ptr<RestoreWaiterRegistry> owner) {
+  std::unique_lock lock(pool_mtx);
+
+  // Periodic eviction of old waiters
+  evict_old_waiters();
+
+  if (!free_list.empty()) {
+    auto waiter = std::move(free_list.back());
+    free_list.pop_back();
+    lock.unlock();
+
+    // Reset state
+    waiter->reset();
+
+    return std::shared_ptr<RestoreWaiter>(waiter.release(),
+      [owner](RestoreWaiter* w) {
+        if (auto reg = owner.lock()) {
+          reg->release_waiter(w);
+        } else {
+          delete w;
+        }
+      });
+  }
+
+  // Pool empty: allocate a fresh waiter outside the lock.
+  lock.unlock();
+  auto waiter = std::make_unique<RestoreWaiter>();
+  return std::shared_ptr<RestoreWaiter>(waiter.release(),
+    [owner](RestoreWaiter* w) {
+      if (auto reg = owner.lock()) {
+        reg->release_waiter(w);
+      } else {
+        delete w;
+      }
+    });
+}
+
+// Return a waiter to the free list for reuse, stamping it for later eviction;
+// waiters beyond MAX_POOL_SIZE are freed instead of pooled.
+void RestoreWaiterPool::release(RestoreWaiter* waiter) {
+  std::lock_guard lock(pool_mtx);
+  if (free_list.size() < MAX_POOL_SIZE) {
+    waiter->last_used = ceph::coarse_real_clock::now();
+    free_list.emplace_back(waiter);
+  } else {
+    delete waiter;
+  }
+}
+
+// Drop pooled waiters that have sat idle longer than EVICTION_TIME so the
+// pool shrinks after load spikes. Caller must hold pool_mtx.
+void RestoreWaiterPool::evict_old_waiters() {
+  // Assumes pool_mtx is already held
+  if (free_list.empty()) {
+    return;
+  }
+
+  const auto now = ceph::coarse_real_clock::now();
+  const auto eviction_threshold = now - EVICTION_TIME;
+
+  // Erase-remove idiom: unique_ptr elements delete their waiters on erase.
+  free_list.erase(
+    std::remove_if(free_list.begin(), free_list.end(),
+      [eviction_threshold](const std::unique_ptr<RestoreWaiter>& w) {
+        return w->last_used < eviction_threshold;
+      }),
+    free_list.end()
+  );
+}
+
+// RestoreWaiterRegistry implementation
+// Build the registry lookup key "<bucket-key>:<obj-name>[:<instance>]".
+// NOTE(review): fields are joined with ':' and bucket/object names may
+// themselves contain ':'; a collision would only group unrelated waiters
+// under one key (spurious wakeup, re-checked by pollers) — confirm acceptable.
+std::string RestoreWaiterRegistry::make_key(const rgw_bucket& bucket, const rgw_obj_key& obj_key) {
+  std::string key;
+  // +10 slack covers separators and the tenant prefix in bucket.get_key().
+  key.reserve(bucket.name.size() + obj_key.name.size() + obj_key.instance.size() + 10);
+  key = bucket.get_key();
+  key += ':';
+  key += obj_key.name;
+  if (!obj_key.instance.empty()) {
+    key += ':';
+    key += obj_key.instance;
+  }
+  return key;
+}
+
+// Called by the shared_ptr deleter installed in RestoreWaiterPool::acquire():
+// recycle the raw waiter into the pool once its last reference drops.
+void RestoreWaiterRegistry::release_waiter(RestoreWaiter* waiter) {
+  waiter_pool.release(waiter);
+}
+
+// Register a waiter for (bucket, obj_key). Returns nullptr if the registry is
+// shutting down; on the late-shutdown path the acquired waiter simply drops
+// back into the pool via its deleter.
+std::shared_ptr<RestoreWaiter> RestoreWaiterRegistry::register_waiter(const rgw_bucket& bucket,
+                                                                        const rgw_obj_key& obj_key) {
+  // Cheap unlocked pre-check.
+  if (shutting_down.load(std::memory_order_acquire)) {
+    return nullptr;
+  }
+
+  auto self = shared_from_this();
+  auto waiter = waiter_pool.acquire(self);
+  // Cache the key so unregister_waiter() doesn't have to rebuild it.
+  waiter->cached_key = make_key(bucket, obj_key);
+
+  // Re-check under the lock so we never insert after shutdown() drained the map.
+  std::unique_lock lock(registry_mtx);
+  if (shutting_down.load(std::memory_order_relaxed)) {
+    return nullptr;
+  }
+  auto& vec = waiters[waiter->cached_key];
+  vec.push_back(waiter);
+
+  return waiter;
+}
+
+// Remove a waiter from the registry (timeout/guard-destruction path). No-op
+// if notify_completion()/shutdown() already took it out of the map.
+void RestoreWaiterRegistry::unregister_waiter(std::shared_ptr<RestoreWaiter> waiter) {
+  std::unique_lock lock(registry_mtx);
+
+  auto it = waiters.find(waiter->cached_key);
+  if (it != waiters.end()) {
+    auto& vec = it->second;
+    vec.erase(std::remove(vec.begin(), vec.end(), waiter), vec.end());
+    // Drop the map entry once its last waiter is gone.
+    if (vec.empty()) {
+      waiters.erase(it);
+    }
+  }
+}
+
+// Called by the restore worker when an object's restore finishes: detach all
+// waiters registered for that object and wake them with the outcome.
+void RestoreWaiterRegistry::notify_completion(const rgw_bucket& bucket,
+                                               const rgw_obj_key& obj_key,
+                                               bool success,
+                                               int result) {
+  std::string key = make_key(bucket, obj_key);
+  std::vector<std::shared_ptr<RestoreWaiter>> to_notify;
+
+  // Move the waiter list out under registry_mtx so the map stays consistent.
+  {
+    std::unique_lock lock(registry_mtx);
+    auto it = waiters.find(key);
+    if (it != waiters.end()) {
+      to_notify = std::move(it->second);
+      waiters.erase(it);
+    }
+  }
+
+  // Complete the waiters outside registry_mtx; complete() touches only
+  // per-waiter state (atomics, cv, timer), never the registry map.
+  for (auto& waiter : to_notify) {
+    waiter->complete(success, result);
+  }
+}
+
+// Block new registrations, then drain the map and wake every outstanding
+// waiter with -ECANCELED so no GET request stays parked across teardown.
+void RestoreWaiterRegistry::shutdown() {
+  // Published first so register_waiter() starts refusing immediately.
+  shutting_down.store(true, std::memory_order_release);
+  std::vector<std::shared_ptr<RestoreWaiter>> to_notify;
+
+  // Snapshot and clear all waiters under the lock.
+  {
+    std::unique_lock lock(registry_mtx);
+    for (auto& [_, vec] : waiters) {
+      to_notify.insert(to_notify.end(), vec.begin(), vec.end());
+    }
+    waiters.clear();
+  }
+
+  // Wake everyone outside the lock; they observe failed=true, result=-ECANCELED.
+  for (auto& waiter : to_notify) {
+    waiter->complete(false, -ECANCELED);
+  }
+}
+
+} // namespace rgw::restore
diff --git a/src/rgw/rgw_restore_waiter.h b/src/rgw/rgw_restore_waiter.h
new file mode 100644 (file)
index 0000000..fde9ebc
--- /dev/null
@@ -0,0 +1,117 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <shared_mutex>
+#include <chrono>
+#include <boost/asio/basic_waitable_timer.hpp>
+#include <boost/asio/any_io_executor.hpp>
+#include <boost/asio/error.hpp>
+#include <boost/system/error_code.hpp>
+#include <condition_variable>
+#include <atomic>
+#include <unordered_map>
+#include <vector>
+#include <string>
+#include "rgw_common.h"
+#include "rgw_sal.h"
+
+namespace rgw::restore {
+
+class RestoreWaiterRegistry;
+
+// Represents a single GET request waiting for restore completion.
+// Pooled and reused via RestoreWaiterPool; reset() must restore all state.
+struct RestoreWaiter {
+  // the blocking wait uses std::condition_variable::wait_for(), which uses the
+  // std::chrono::steady_clock. use that clock for async waits as well
+  using Clock = std::chrono::steady_clock;
+  using Timer = boost::asio::basic_waitable_timer<Clock>;
+
+  std::mutex mtx;                    // protects the blocking cv wait
+  std::condition_variable cv;        // wakes blocking (non-coroutine) waiters
+  std::mutex timer_mtx;              // guards active_timer publication
+  std::weak_ptr<Timer> active_timer; // coroutine waiter's timer; complete() cancels it
+  std::atomic<bool> completed{false}; // set last by complete(); readable lock-free
+  std::atomic<bool> failed{false};    // valid once completed is observed true
+  std::atomic<int16_t> result{0};  // Error codes fit in int16_t
+  std::string cached_key;  // Cached registry key to avoid recomputation in unregister
+  ceph::coarse_real_time last_used;  // Timestamp for pool eviction
+
+  // Wait for completion for up to 'timeout'. Uses cv for blocking callers and
+  // a timer for coroutine callers so we don't block the frontend coroutine.
+  bool wait_for(std::chrono::milliseconds timeout, optional_yield y);
+  // Mark completion and wake any waiting callers.
+  void complete(bool success, int result_code);
+  // Reset state before reuse.
+  void reset();
+};
+
+// Object pool for RestoreWaiter to avoid repeated allocations under GET load.
+// Thread-safe; waiters idle longer than EVICTION_TIME are freed on acquire().
+class RestoreWaiterPool {
+private:
+  friend class RestoreWaiterRegistry;
+  std::mutex pool_mtx;                                // guards free_list
+  std::vector<std::unique_ptr<RestoreWaiter>> free_list;  // recycled waiters
+  static constexpr size_t MAX_POOL_SIZE = 256;        // cap on pooled waiters
+  static constexpr std::chrono::seconds EVICTION_TIME{300};  // 5 minutes
+
+public:
+  // Hand out a (possibly recycled) waiter whose deleter returns it to 'owner'.
+  std::shared_ptr<RestoreWaiter> acquire(std::weak_ptr<RestoreWaiterRegistry> owner);
+
+private:
+  // Recycle (or free, if full) a waiter; called from the registry's deleter.
+  void release(RestoreWaiter* waiter);
+  // Drop stale pooled waiters; requires pool_mtx held.
+  void evict_old_waiters();
+};
+
+// Registry mapping object keys ("bucket:obj[:instance]") to the GET requests
+// currently waiting on that object's restore. Owned as a shared_ptr by
+// rgw::restore::Restore; enable_shared_from_this lets pooled waiters hold a
+// weak back-reference for safe release after registry teardown.
+class RestoreWaiterRegistry : public std::enable_shared_from_this<RestoreWaiterRegistry> {
+private:
+  mutable std::shared_mutex registry_mtx;   // guards 'waiters'
+  std::unordered_map<std::string, std::vector<std::shared_ptr<RestoreWaiter>>> waiters;
+  RestoreWaiterPool waiter_pool;            // allocation cache for waiters
+  std::atomic<bool> shutting_down{false};   // blocks new registrations
+  friend class RestoreWaiterPool;
+
+  // Build the map key for (bucket, object).
+  static std::string make_key(const rgw_bucket& bucket, const rgw_obj_key& obj_key);
+  // Deleter hook: return a waiter to the pool.
+  void release_waiter(RestoreWaiter* waiter);
+
+public:
+  // Register a waiter for an object restore; nullptr during shutdown.
+  std::shared_ptr<RestoreWaiter> register_waiter(const rgw_bucket& bucket,
+                                                   const rgw_obj_key& obj_key);
+
+  // Unregister a waiter (called on timeout or completion)
+  void unregister_waiter(std::shared_ptr<RestoreWaiter> waiter);
+
+  // Signal all waiters for an object (called by restore worker)
+  void notify_completion(const rgw_bucket& bucket,
+                         const rgw_obj_key& obj_key,
+                         bool success,
+                         int result);
+
+  // Cancel all waiters (with -ECANCELED) and prevent new registrations.
+  void shutdown();
+};
+
+// RAII guard for automatic waiter unregistration: guarantees the waiter is
+// removed from the registry on every exit path (success, error, timeout).
+// Tolerates a null registry or waiter, so it can wrap a failed registration.
+struct WaiterGuard {
+  std::shared_ptr<RestoreWaiterRegistry> registry;
+  std::shared_ptr<RestoreWaiter> waiter;
+
+  WaiterGuard(std::shared_ptr<RestoreWaiterRegistry> reg, std::shared_ptr<RestoreWaiter> w)
+    : registry(std::move(reg)), waiter(std::move(w)) {}
+
+  ~WaiterGuard() {
+    if (registry && waiter) {
+      registry->unregister_waiter(waiter);
+    }
+  }
+
+  // Non-copyable: exactly one guard owns the unregistration duty.
+  WaiterGuard(const WaiterGuard&) = delete;
+  WaiterGuard& operator=(const WaiterGuard&) = delete;
+};
+
+} // namespace rgw::restore