#include "include/random.h"
#include "include/Context.h"
+#include "common/async/spawn_throttle.h"
#include "common/errno.h"
+#include "common/error_code.h"
#include "rgw_cache.h"
#include "svc_notify.h"
int index;
rgw_rados_ref obj;
uint64_t watch_handle;
- int register_ret{0};
bool unregister_done{false};
- librados::AioCompletion *register_completion{nullptr};
uint64_t retries = 0;
class C_ReinitWatch : public Context {
abort();
}
if(!unregister_done) {
- int ret = unregister_watch();
+ int ret = unregister_watch(null_yield);
if (ret < 0) {
ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
if (-2 == ret) {
}
}
}
- int ret = register_watch();
+ int ret = register_watch(null_yield);
if (ret < 0) {
ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
++retries;
}
}
- int unregister_watch() {
- int r = svc->unwatch(obj, watch_handle);
+ int unregister_watch(optional_yield y) {
+ int r = svc->unwatch(this, obj, watch_handle, y);
unregister_done = true;
if (r < 0) {
return r;
return 0;
}
- int register_watch_async() {
- if (register_completion) {
- register_completion->release();
- register_completion = nullptr;
- }
- register_completion = librados::Rados::aio_create_completion(nullptr, nullptr);
- register_ret = obj.aio_watch(register_completion, &watch_handle, this);
- if (register_ret < 0) {
- register_completion->release();
- return register_ret;
- }
- return 0;
- }
-
- int register_watch_finish() {
- if (register_ret < 0) {
- return register_ret;
- }
- if (!register_completion) {
- return -EINVAL;
- }
- register_completion->wait_for_complete();
- int r = register_completion->get_return_value();
- register_completion->release();
- register_completion = nullptr;
- if (r < 0) {
- return r;
- }
- svc->add_watcher(index);
- unregister_done = false;
- return 0;
- }
-
- int register_watch() {
- int r = obj.watch(&watch_handle, this);
+ int register_watch(optional_yield y) {
+ int r = obj.watch(this, &watch_handle, this, y);
if (r < 0) {
return r;
}
if (num_watchers <= 0)
num_watchers = 1;
- int error = 0;
-
+ const size_t max_aio = cct->_conf.get_val<int64_t>("rgw_max_control_aio");
+ auto throttle = ceph::async::spawn_throttle{
+ y, max_aio, ceph::async::cancel_on_error::all};
watchers.reserve(num_watchers);
for (int i=0; i < num_watchers; i++) {
ldpp_dout(dpp, 0) << "ERROR: notify_obj.open() returned r=" << r << dendl;
return r;
}
-
- librados::ObjectWriteOperation op;
- op.create(false);
-
- r = notify_obj.operate(dpp, std::move(op), y);
- if (r < 0 && r != -EEXIST) {
- ldpp_dout(dpp, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl;
- return r;
- }
-
auto& watcher = watchers.emplace_back(cct, this, i, std::move(notify_obj));
- r = watcher.register_watch_async();
- if (r < 0) {
- ldpp_dout(dpp, 0) << "WARNING: register_watch_aio() returned " << r << dendl;
- error = r;
- continue;
- }
- }
-
- for (auto& watcher : watchers) {
- int r = watcher.register_watch_finish();
- if (r < 0) {
- ldpp_dout(dpp, 0) << "WARNING: async watch returned " << r << dendl;
- error = r;
+ try {
+ throttle.spawn([dpp, &watcher] (boost::asio::yield_context yield) {
+ // create the object if it doesn't exist
+ librados::ObjectWriteOperation op;
+ op.create(false);
+
+ int r = watcher.get_obj().operate(dpp, std::move(op), yield);
+ if (r < 0 && r != -EEXIST) {
+ ldpp_dout(dpp, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl;
+ throw boost::system::system_error(ceph::to_error_code(r));
+ }
+
+ r = watcher.register_watch(yield);
+ if (r < 0) {
+ throw boost::system::system_error(ceph::to_error_code(r));
+ }
+ });
+ } catch (const boost::system::system_error& e) {
+ return ceph::from_error_code(e.code());
}
- }
+ } // for num_watchers
- if (error < 0) {
- return error;
+ try {
+ throttle.wait();
+ } catch (const boost::system::system_error& e) {
+ return ceph::from_error_code(e.code());
}
return 0;
void RGWSI_Notify::finalize_watch()
{
+ const size_t max_aio = cct->_conf.get_val<int64_t>("rgw_max_control_aio");
+ auto throttle = ceph::async::spawn_throttle{
+ null_yield, max_aio, ceph::async::cancel_on_error::all};
for (int i = 0; i < num_watchers; i++) {
- if (watchers_set.find(i) != watchers_set.end())
- watchers[i].unregister_watch();
+ if (!watchers_set.contains(i)) {
+ continue;
+ }
+ throttle.spawn([&watcher = watchers[i]] (boost::asio::yield_context yield) {
+ std::ignore = watcher.unregister_watch(yield);
+ });
}
+ throttle.wait();
+
watchers.clear();
}
finalized = true;
}
-int RGWSI_Notify::unwatch(rgw_rados_ref& obj, uint64_t watch_handle)
+int RGWSI_Notify::unwatch(const DoutPrefixProvider* dpp, rgw_rados_ref& obj,
+ uint64_t handle, optional_yield y)
{
- int r = obj.unwatch(watch_handle);
+ int r = obj.unwatch(dpp, handle, y);
if (r < 0) {
ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
return r;