git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: send concurrent watch/unwatch operations 62538/head
author: Casey Bodley <cbodley@redhat.com>
Thu, 27 Mar 2025 00:45:24 +0000 (20:45 -0400)
committer: Casey Bodley <cbodley@redhat.com>
Thu, 27 Mar 2025 14:51:53 +0000 (10:51 -0400)
accelerate startup and shutdown of radosgw and radosgw-admin by sending
concurrent watch/unwatch operations over the rgw_num_control_oids (default 8) control objects

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/options/rgw.yaml.in
src/rgw/services/svc_notify.cc
src/rgw/services/svc_notify.h

index 2b81985c9d2fe969207a2cbf1787ce9689343430..ac945fdc52028b4b27dcc699276d7df5a8a09c09 100644 (file)
@@ -1133,7 +1133,23 @@ options:
   default: 8
   services:
   - rgw
+  see_also:
+  - rgw_cache_enabled
+  - rgw_max_control_aio
   with_legacy: true
+- name: rgw_max_control_aio
+  type: int
+  level: advanced
+  desc: Maximum number of concurrent operations over control objects.
+  long_desc: When metadata caching is enabled, a watch operation is sent to each
+    control object on startup, with a corresponding unwatch on shutdown. To
+    accelerate startup/shutdown, allow several concurrent operations to be sent
+    at once.
+  default: 8
+  services:
+  - rgw
+  see_also:
+  - rgw_num_control_oids
 - name: rgw_verify_ssl
   type: bool
   level: advanced
index 4474e80c4bf97c3c0c438a48bfbfabe91e6de01f..6999aaa6a13d226ffc1c675bc8a4dcad8629f660 100644 (file)
@@ -3,7 +3,9 @@
 
 #include "include/random.h"
 #include "include/Context.h"
+#include "common/async/spawn_throttle.h"
 #include "common/errno.h"
+#include "common/error_code.h"
 
 #include "rgw_cache.h"
 #include "svc_notify.h"
@@ -33,9 +35,7 @@ class RGWWatcher : public DoutPrefixProvider , public librados::WatchCtx2 {
   int index;
   rgw_rados_ref obj;
   uint64_t watch_handle;
-  int register_ret{0};
   bool unregister_done{false};
-  librados::AioCompletion *register_completion{nullptr};
   uint64_t retries = 0;
 
   class C_ReinitWatch : public Context {
@@ -99,7 +99,7 @@ public:
       abort();
     }
     if(!unregister_done) {
-      int ret = unregister_watch();
+      int ret = unregister_watch(null_yield);
       if (ret < 0) {
         ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
        if (-2 == ret) {
@@ -111,7 +111,7 @@ public:
        }
       }
     }
-    int ret = register_watch();
+    int ret = register_watch(null_yield);
     if (ret < 0) {
       ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
       ++retries;
@@ -120,8 +120,8 @@ public:
     }
   }
 
-  int unregister_watch() {
-    int r = svc->unwatch(obj, watch_handle);
+  int unregister_watch(optional_yield y) {
+    int r = svc->unwatch(this, obj, watch_handle, y);
     unregister_done = true;
     if (r < 0) {
       return r;
@@ -130,41 +130,8 @@ public:
     return 0;
   }
 
-  int register_watch_async() {
-    if (register_completion) {
-      register_completion->release();
-      register_completion = nullptr;
-    }
-    register_completion = librados::Rados::aio_create_completion(nullptr, nullptr);
-    register_ret = obj.aio_watch(register_completion, &watch_handle, this);
-    if (register_ret < 0) {
-      register_completion->release();
-      return register_ret;
-    }
-    return 0;
-  }
-
-  int register_watch_finish() {
-    if (register_ret < 0) {
-      return register_ret;
-    }
-    if (!register_completion) {
-      return -EINVAL;
-    }
-    register_completion->wait_for_complete();
-    int r = register_completion->get_return_value();
-    register_completion->release();
-    register_completion = nullptr;
-    if (r < 0) {
-      return r;
-    }
-    svc->add_watcher(index);
-    unregister_done = false;
-    return 0;
-  }
-
-  int register_watch() {
-    int r = obj.watch(&watch_handle, this);
+  int register_watch(optional_yield y) {
+    int r = obj.watch(this, &watch_handle, this, y);
     if (r < 0) {
       return r;
     }
@@ -211,8 +178,9 @@ int RGWSI_Notify::init_watch(const DoutPrefixProvider *dpp, optional_yield y)
   if (num_watchers <= 0)
     num_watchers = 1;
 
-  int error = 0;
-
+  const size_t max_aio = cct->_conf.get_val<int64_t>("rgw_max_control_aio");
+  auto throttle = ceph::async::spawn_throttle{
+      y, max_aio, ceph::async::cancel_on_error::all};
   watchers.reserve(num_watchers);
 
   for (int i=0; i < num_watchers; i++) {
@@ -231,36 +199,34 @@ int RGWSI_Notify::init_watch(const DoutPrefixProvider *dpp, optional_yield y)
       ldpp_dout(dpp, 0) << "ERROR: notify_obj.open() returned r=" << r << dendl;
       return r;
     }
-
-    librados::ObjectWriteOperation op;
-    op.create(false);
-
-    r = notify_obj.operate(dpp, std::move(op), y);
-    if (r < 0 && r != -EEXIST) {
-      ldpp_dout(dpp, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl;
-      return r;
-    }
-
     auto& watcher = watchers.emplace_back(cct, this, i, std::move(notify_obj));
 
-    r = watcher.register_watch_async();
-    if (r < 0) {
-      ldpp_dout(dpp, 0) << "WARNING: register_watch_aio() returned " << r << dendl;
-      error = r;
-      continue;
-    }
-  }
-
-  for (auto& watcher : watchers) {
-    int r = watcher.register_watch_finish();
-    if (r < 0) {
-      ldpp_dout(dpp, 0) << "WARNING: async watch returned " << r << dendl;
-      error = r;
+    try {
+      throttle.spawn([dpp, &watcher] (boost::asio::yield_context yield) {
+            // create the object if it doesn't exist
+            librados::ObjectWriteOperation op;
+            op.create(false);
+
+            int r = watcher.get_obj().operate(dpp, std::move(op), yield);
+            if (r < 0 && r != -EEXIST) {
+              ldpp_dout(dpp, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl;
+              throw boost::system::system_error(ceph::to_error_code(r));
+            }
+
+            r = watcher.register_watch(yield);
+            if (r < 0) {
+              throw boost::system::system_error(ceph::to_error_code(r));
+            }
+          });
+    } catch (const boost::system::system_error& e) {
+      return ceph::from_error_code(e.code());
     }
-  }
+  } // for num_watchers
 
-  if (error < 0) {
-    return error;
+  try {
+    throttle.wait();
+  } catch (const boost::system::system_error& e) {
+    return ceph::from_error_code(e.code());
   }
 
   return 0;
@@ -268,10 +234,19 @@ int RGWSI_Notify::init_watch(const DoutPrefixProvider *dpp, optional_yield y)
 
 void RGWSI_Notify::finalize_watch()
 {
+  const size_t max_aio = cct->_conf.get_val<int64_t>("rgw_max_control_aio");
+  auto throttle = ceph::async::spawn_throttle{
+      null_yield, max_aio, ceph::async::cancel_on_error::all};
   for (int i = 0; i < num_watchers; i++) {
-    if (watchers_set.find(i) != watchers_set.end())
-      watchers[i].unregister_watch();
+    if (!watchers_set.contains(i)) {
+      continue;
+    }
+    throttle.spawn([&watcher = watchers[i]] (boost::asio::yield_context yield) {
+          std::ignore = watcher.unregister_watch(yield);
+        });
   }
+  throttle.wait();
+
   watchers.clear();
 }
 
@@ -325,9 +300,10 @@ void RGWSI_Notify::shutdown()
   finalized = true;
 }
 
-int RGWSI_Notify::unwatch(rgw_rados_ref& obj, uint64_t watch_handle)
+int RGWSI_Notify::unwatch(const DoutPrefixProvider* dpp, rgw_rados_ref& obj,
+                          uint64_t handle, optional_yield y)
 {
-  int r = obj.unwatch(watch_handle);
+  int r = obj.unwatch(dpp, handle, y);
   if (r < 0) {
     ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
     return r;
index eb271dc47da13e50ed8b3a096e7192ee0194f5df..a13219c684b09186e37ff4b6d7cf7f9c11099768 100644 (file)
@@ -66,7 +66,8 @@ private:
   int do_start(optional_yield, const DoutPrefixProvider *dpp) override;
   void shutdown() override;
 
-  int unwatch(rgw_rados_ref& obj, uint64_t watch_handle);
+  int unwatch(const DoutPrefixProvider* dpp, rgw_rados_ref& obj,
+              uint64_t handle, optional_yield y);
   void add_watcher(int i);
   void remove_watcher(int i);