From 7bab9f7c68cf7ecb816d13683ef6243dc501dec5 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 12 Feb 2015 15:29:24 -0800 Subject: [PATCH] rgw: fix watch initialization and reinit on error We can't just call rados->finalize_watch() and rados->init_watch() from the watcher, as these calls modify the watcher itself. Also, we can't just enable the cache after successful watch, because there's more than one watcher. Only enable the cache if all watchers are set. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_rados.cc | 87 ++++++++++++++++++++++++++++++++++++-------- src/rgw/rgw_rados.h | 11 ++++-- 2 files changed, 80 insertions(+), 18 deletions(-) diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 7851e28d11520..585bc567efeda 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -1210,9 +1210,51 @@ int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, time_t s return 0; } +int RGWRados::watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx) { + int r = control_pool_ctx.watch2(oid, watch_handle, ctx); + if (r < 0) + return r; + return 0; +} + +int RGWRados::unwatch(uint64_t watch_handle) +{ + int r = control_pool_ctx.unwatch2(watch_handle); + if (r < 0) { + return r; + } + return 0; +} + +void RGWRados::add_watcher(int i) +{ + ldout(cct, 20) << "add_watcher() i=" << i << dendl; + Mutex::Locker l(watchers_lock); + watchers_set.insert(i); + if (watchers_set.size() == (size_t)num_watchers) { + ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl; + set_cache_enabled(true); + } +} + +void RGWRados::remove_watcher(int i) +{ + ldout(cct, 20) << "remove_watcher() i=" << i << dendl; + Mutex::Locker l(watchers_lock); + size_t orig_size = watchers_set.size(); + watchers_set.erase(i); + if (orig_size == (size_t)num_watchers && + watchers_set.size() < orig_size) { /* actually removed */ + ldout(cct, 2) << "removed watcher, disabling cache" << dendl; + set_cache_enabled(false); + } +} + class RGWWatcher : public librados::WatchCtx2 { RGWRados *rados; + int index; string oid; + uint64_t watch_handle; class C_ReinitWatch : public Context { RGWWatcher *watcher; @@ -1223,7 +1265,7 @@ class RGWWatcher : public librados::WatchCtx2 { } }; public: - RGWWatcher(RGWRados *r, const string& o) : rados(r), oid(o) {} + RGWWatcher(RGWRados *r, int i, const string& o) : rados(r), index(i), oid(o), watch_handle(0) {} void handle_notify(uint64_t notify_id, uint64_t cookie, uint64_t notifier_id, @@ -1241,17 +1283,39 @@ public: void handle_error(uint64_t cookie, int err) { lderr(rados->ctx()) << "RGWWatcher::handle_error cookie " << cookie << " err " << cpp_strerror(err) << dendl; - rados->set_cache_enabled(false); + rados->remove_watcher(index); rados->schedule_context(new C_ReinitWatch(this)); } void reinit() { - rados->finalize_watch(); - int ret = rados->init_watch(); + int ret = unregister_watch(); if (ret < 0) { - ldout(rados->ctx(), 0) << "ERROR: init_watch() returned ret=" << ret << ", cache is disabled" << dendl; + ldout(rados->ctx(), 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl; return; } + ret = register_watch(); + if (ret < 0) { + ldout(rados->ctx(), 0) << "ERROR: register_watch() returned ret=" << ret << dendl; + return; + } + } + + int unregister_watch() { + int r = rados->unwatch(watch_handle); + if (r < 0) { + return r; + } + rados->remove_watcher(index); + return 0; + } + + int register_watch() { + int r = rados->watch(oid, &watch_handle, this); + if (r < 0) { + return r; + } + rados->add_watcher(index); + return 0; } }; @@ -1519,18 +1583,12 @@ int RGWRados::initialize() void RGWRados::finalize_watch() { for (int i = 0; i < num_watchers; i++) { - string& notify_oid = notify_oids[i]; - if (notify_oid.empty()) - continue; - uint64_t watch_handle = watch_handles[i]; - control_pool_ctx.unwatch2(watch_handle); - RGWWatcher *watcher = watchers[i]; + watcher->unregister_watch(); delete watcher; } delete[] notify_oids; - delete[] watch_handles; delete[] watchers; } @@ -1645,7 +1703,6 @@ int RGWRados::init_watch() notify_oids = new string[num_watchers]; watchers = new RGWWatcher *[num_watchers]; - watch_handles = new uint64_t[num_watchers]; for (int i=0; i < num_watchers; i++) { string& notify_oid = notify_oids[i]; @@ -1659,10 +1716,10 @@ int RGWRados::init_watch() if (r < 0 && r != -EEXIST) return r; - RGWWatcher *watcher = new RGWWatcher(this, notify_oid); + RGWWatcher *watcher = new RGWWatcher(this, i, notify_oid); watchers[i] = watcher; - r = control_pool_ctx.watch2(notify_oid, &watch_handles[i], watcher); + r = watcher->register_watch(); if (r < 0) return r; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 76f8019946fff..082e85bb3c082 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1189,6 +1189,7 @@ class RGWRados void get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map *result); Mutex lock; + Mutex watchers_lock; SafeTimer *timer; class C_Tick : public Context { @@ -1206,7 +1207,7 @@ class RGWRados int num_watchers; RGWWatcher **watchers; - uint64_t *watch_handles; + std::set watchers_set; librados::IoCtx root_pool_ctx; // .rgw librados::IoCtx control_pool_ctx; // .rgw.control bool watch_initialized; @@ -1248,9 +1249,9 @@ protected: Finisher *finisher; public: - RGWRados() : lock("rados_timer_lock"), timer(NULL), + RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL), gc(NULL), use_gc_thread(false), quota_threads(false), - num_watchers(0), watchers(NULL), watch_handles(NULL), + num_watchers(0), watchers(NULL), watch_initialized(false), bucket_id_lock("rados_bucket_id"), bucket_index_max_shards(0), @@ -1895,6 +1896,10 @@ public: virtual int update_containers_stats(map& m); virtual int append_async(rgw_obj& obj, size_t size, bufferlist& bl); + int watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx); + int unwatch(uint64_t watch_handle); + void add_watcher(int i); + void remove_watcher(int i); virtual bool need_watch_notify() { return false; } virtual int init_watch(); virtual void finalize_watch(); -- 2.39.5