From: Casey Bodley Date: Thu, 27 Mar 2025 00:45:24 +0000 (-0400) Subject: rgw: send concurrent watch/unwatch operations X-Git-Tag: testing/wip-vshankar-testing-20250411.090237-debug~22^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=f863b7dcad1ff2731790337780cdaf80377a6e71;p=ceph-ci.git rgw: send concurrent watch/unwatch operations accelerate startup and shutdown of radosgw and radosgw-admin by sending concurrent watch/unwatch operations over the rgw_num_control_oid=8 objects Signed-off-by: Casey Bodley --- diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 2b81985c9d2..ac945fdc520 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -1133,7 +1133,23 @@ options: default: 8 services: - rgw + see_also: + - rgw_cache_enabled + - rgw_max_control_aio with_legacy: true +- name: rgw_max_control_aio + type: int + level: advanced + desc: Maximum number of concurrent operations over control objects. + long_desc: When metadata caching is enabled, a watch operation is sent to each + control object on startup, with a corresponding unwatch on shutdown. To + accelerate startup/shutdown, allow several concurrent operations to be sent + at once. + default: 8 + services: + - rgw + see_also: + - rgw_num_control_oids - name: rgw_verify_ssl type: bool level: advanced diff --git a/src/rgw/services/svc_notify.cc b/src/rgw/services/svc_notify.cc index 4474e80c4bf..6999aaa6a13 100644 --- a/src/rgw/services/svc_notify.cc +++ b/src/rgw/services/svc_notify.cc @@ -3,7 +3,9 @@ #include "include/random.h" #include "include/Context.h" +#include "common/async/spawn_throttle.h" #include "common/errno.h" +#include "common/error_code.h" #include "rgw_cache.h" #include "svc_notify.h" @@ -33,9 +35,7 @@ class RGWWatcher : public DoutPrefixProvider , public librados::WatchCtx2 { int index; rgw_rados_ref obj; uint64_t watch_handle; - int register_ret{0}; bool unregister_done{false}; - librados::AioCompletion *register_completion{nullptr}; uint64_t retries = 0; class C_ReinitWatch : public Context { @@ -99,7 +99,7 @@ public: abort(); } if(!unregister_done) { - int ret = unregister_watch(); + int ret = unregister_watch(null_yield); if (ret < 0) { ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl; if (-2 == ret) { @@ -111,7 +111,7 @@ public: } } } - int ret = register_watch(); + int ret = register_watch(null_yield); if (ret < 0) { ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl; ++retries; @@ -120,8 +120,8 @@ public: } } - int unregister_watch() { - int r = svc->unwatch(obj, watch_handle); + int unregister_watch(optional_yield y) { + int r = svc->unwatch(this, obj, watch_handle, y); unregister_done = true; if (r < 0) { return r; @@ -130,41 +130,8 @@ public: return 0; } - int register_watch_async() { - if (register_completion) { - register_completion->release(); - register_completion = nullptr; - } - register_completion = librados::Rados::aio_create_completion(nullptr, nullptr); - register_ret = obj.aio_watch(register_completion, &watch_handle, this); - if (register_ret < 0) { - register_completion->release(); - return register_ret; - } - return 0; - } - - int register_watch_finish() { - if (register_ret < 0) { - return register_ret; - } - if (!register_completion) { - return -EINVAL; - } - register_completion->wait_for_complete(); - int r = register_completion->get_return_value(); - register_completion->release(); - register_completion = nullptr; - if (r < 0) { - return r; - } - svc->add_watcher(index); - unregister_done = false; - return 0; - } - - int register_watch() { - int r = obj.watch(&watch_handle, this); + int register_watch(optional_yield y) { + int r = obj.watch(this, &watch_handle, this, y); if (r < 0) { return r; } @@ -211,8 +178,9 @@ int RGWSI_Notify::init_watch(const DoutPrefixProvider *dpp, optional_yield y) if (num_watchers <= 0) num_watchers = 1; - int error = 0; - + const size_t max_aio = cct->_conf.get_val("rgw_max_control_aio"); + auto throttle = ceph::async::spawn_throttle{ + y, max_aio, ceph::async::cancel_on_error::all}; watchers.reserve(num_watchers); for (int i=0; i < num_watchers; i++) { @@ -231,36 +199,34 @@ int RGWSI_Notify::init_watch(const DoutPrefixProvider *dpp, optional_yield y) ldpp_dout(dpp, 0) << "ERROR: notify_obj.open() returned r=" << r << dendl; return r; } - - librados::ObjectWriteOperation op; - op.create(false); - - r = notify_obj.operate(dpp, std::move(op), y); - if (r < 0 && r != -EEXIST) { - ldpp_dout(dpp, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl; - return r; - } - auto& watcher = watchers.emplace_back(cct, this, i, std::move(notify_obj)); - r = watcher.register_watch_async(); - if (r < 0) { - ldpp_dout(dpp, 0) << "WARNING: register_watch_aio() returned " << r << dendl; - error = r; - continue; - } - } - - for (auto& watcher : watchers) { - int r = watcher.register_watch_finish(); - if (r < 0) { - ldpp_dout(dpp, 0) << "WARNING: async watch returned " << r << dendl; - error = r; + try { + throttle.spawn([dpp, &watcher] (boost::asio::yield_context yield) { + // create the object if it doesn't exist + librados::ObjectWriteOperation op; + op.create(false); + + int r = watcher.get_obj().operate(dpp, std::move(op), yield); + if (r < 0 && r != -EEXIST) { + ldpp_dout(dpp, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl; + throw boost::system::system_error(ceph::to_error_code(r)); + } + + r = watcher.register_watch(yield); + if (r < 0) { + throw boost::system::system_error(ceph::to_error_code(r)); + } + }); + } catch (const boost::system::system_error& e) { + return ceph::from_error_code(e.code()); } - } + } // for num_watchers - if (error < 0) { - return error; + try { + throttle.wait(); + } catch (const boost::system::system_error& e) { + return ceph::from_error_code(e.code()); } return 0; @@ -268,10 +234,19 @@ int RGWSI_Notify::init_watch(const DoutPrefixProvider *dpp, optional_yield y) void RGWSI_Notify::finalize_watch() { + const size_t max_aio = cct->_conf.get_val("rgw_max_control_aio"); + auto throttle = ceph::async::spawn_throttle{ + null_yield, max_aio, ceph::async::cancel_on_error::all}; for (int i = 0; i < num_watchers; i++) { - if (watchers_set.find(i) != watchers_set.end()) - watchers[i].unregister_watch(); + if (!watchers_set.contains(i)) { + continue; + } + throttle.spawn([&watcher = watchers[i]] (boost::asio::yield_context yield) { + std::ignore = watcher.unregister_watch(yield); + }); } + throttle.wait(); + watchers.clear(); } @@ -325,9 +300,10 @@ void RGWSI_Notify::shutdown() finalized = true; } -int RGWSI_Notify::unwatch(rgw_rados_ref& obj, uint64_t watch_handle) +int RGWSI_Notify::unwatch(const DoutPrefixProvider* dpp, rgw_rados_ref& obj, + uint64_t handle, optional_yield y) { - int r = obj.unwatch(watch_handle); + int r = obj.unwatch(dpp, handle, y); if (r < 0) { ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl; return r; diff --git a/src/rgw/services/svc_notify.h b/src/rgw/services/svc_notify.h index eb271dc47da..a13219c684b 100644 --- a/src/rgw/services/svc_notify.h +++ b/src/rgw/services/svc_notify.h @@ -66,7 +66,8 @@ private: int do_start(optional_yield, const DoutPrefixProvider *dpp) override; void shutdown() override; - int unwatch(rgw_rados_ref& obj, uint64_t watch_handle); + int unwatch(const DoutPrefixProvider* dpp, rgw_rados_ref& obj, + uint64_t handle, optional_yield y); void add_watcher(int i); void remove_watcher(int i);