return 0;
}
+int RGWRados::aio_watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c) {
+ int r = control_pool_ctx.aio_watch2(oid, c, watch_handle, ctx, 0);
+ if (r < 0)
+ return r;
+ return 0;
+}
+
int RGWRados::unwatch(uint64_t watch_handle)
{
int r = control_pool_ctx.unwatch2(watch_handle);
int index;
string oid;
uint64_t watch_handle;
+ int register_ret{0};
+ librados::AioCompletion *register_completion{nullptr};
class C_ReinitWatch : public Context {
RGWWatcher *watcher;
return 0;
}
+ int register_watch_async() {
+ if (register_completion) {
+ register_completion->release();
+ register_completion = nullptr;
+ }
+ register_completion = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
+ register_ret = rados->aio_watch(oid, &watch_handle, this, register_completion);
+ if (register_ret < 0) {
+ register_completion->release();
+ return register_ret;
+ }
+ return 0;
+ }
+
+ int register_watch_finish() {
+ if (register_ret < 0) {
+ return register_ret;
+ }
+ if (!register_completion) {
+ return -EINVAL;
+ }
+ register_completion->wait_for_safe();
+ int r = register_completion->get_return_value();
+ register_completion->release();
+ register_completion = nullptr;
+ if (r < 0) {
+ return r;
+ }
+ rados->add_watcher(index);
+ return 0;
+ }
+
int register_watch() {
int r = rados->watch(oid, &watch_handle, this);
if (r < 0) {
notify_oids = new string[num_watchers];
watchers = new RGWWatcher *[num_watchers];
+ int error = 0;
+
for (int i=0; i < num_watchers; i++) {
string& notify_oid = notify_oids[i];
notify_oid = notify_oid_prefix;
RGWWatcher *watcher = new RGWWatcher(this, i, notify_oid);
watchers[i] = watcher;
- r = watcher->register_watch();
- if (r < 0)
- return r;
+ r = watcher->register_watch_async();
+ if (r < 0) {
+ ldout(cct, 0) << "WARNING: register_watch_aio() returned " << r << dendl;
+ error = r;
+ continue;
+ }
+ }
+
+ for (int i = 0; i < num_watchers; ++i) {
+ int r = watchers[i]->register_watch_finish();
+ if (r < 0) {
+ ldout(cct, 0) << "WARNING: async watch returned " << r << dendl;
+ error = r;
+ }
+ }
+
+ if (error < 0) {
+ return error;
}
watch_initialized = true;
int append_async(rgw_raw_obj& obj, size_t size, bufferlist& bl);
int watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx);
+ int aio_watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c);
int unwatch(uint64_t watch_handle);
void add_watcher(int i);
void remove_watcher(int i);