]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: fix watch initialization and reinit on error
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 12 Feb 2015 23:29:24 +0000 (15:29 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 5 Mar 2015 20:29:24 +0000 (12:29 -0800)
We can't just call rados->finalize_watch() and rados->init_watch() from
the watcher, as these calls modify the watcher itself. Also, we can't just
enable the cache after successful watch, because there's more than one
watcher. Only enable the cache if all watchers are set.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 7851e28d11520af9179cf7e9952115c938577739..585bc567efeda030cd77a9917f1ff4ccaea64212 100644 (file)
@@ -1210,9 +1210,51 @@ int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, time_t s
   return 0;
 }
 
+int RGWRados::watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx) {
+  int r = control_pool_ctx.watch2(oid, watch_handle, ctx);
+  if (r < 0)
+    return r;
+  return 0;
+}
+
+int RGWRados::unwatch(uint64_t watch_handle)
+{
+  int r = control_pool_ctx.unwatch2(watch_handle);
+  if (r < 0) {
+    return r;
+  }
+  return 0;
+}
+
+void RGWRados::add_watcher(int i)
+{
+  ldout(cct, 20) << "add_watcher() i=" << i << dendl;
+  Mutex::Locker l(watchers_lock);
+  watchers_set.insert(i);
+  if (watchers_set.size() ==  (size_t)num_watchers) {
+    ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
+    set_cache_enabled(true);
+  }
+}
+
+void RGWRados::remove_watcher(int i)
+{
+  ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
+  Mutex::Locker l(watchers_lock);
+  size_t orig_size = watchers_set.size();
+  watchers_set.erase(i);
+  if (orig_size == (size_t)num_watchers &&
+      watchers_set.size() < orig_size) { /* actually removed */
+    ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
+    set_cache_enabled(false);
+  }
+}
+
 class RGWWatcher : public librados::WatchCtx2 {
   RGWRados *rados;
+  int index;
   string oid;
+  uint64_t watch_handle;
 
   class C_ReinitWatch : public Context {
     RGWWatcher *watcher;
@@ -1223,7 +1265,7 @@ class RGWWatcher : public librados::WatchCtx2 {
       }
   };
 public:
-  RGWWatcher(RGWRados *r, const string& o) : rados(r), oid(o) {}
+  RGWWatcher(RGWRados *r, int i, const string& o) : rados(r), index(i), oid(o), watch_handle(0) {}
   void handle_notify(uint64_t notify_id,
                     uint64_t cookie,
                     uint64_t notifier_id,
@@ -1241,17 +1283,39 @@ public:
   void handle_error(uint64_t cookie, int err) {
     lderr(rados->ctx()) << "RGWWatcher::handle_error cookie " << cookie
                        << " err " << cpp_strerror(err) << dendl;
-    rados->set_cache_enabled(false);
+    rados->remove_watcher(index);
     rados->schedule_context(new C_ReinitWatch(this));
   }
 
   void reinit() {
-    rados->finalize_watch();
-    int ret = rados->init_watch();
+    int ret = unregister_watch();
     if (ret < 0) {
-      ldout(rados->ctx(), 0) << "ERROR: init_watch() returned ret=" << ret << ", cache is disabled" << dendl;
+      ldout(rados->ctx(), 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
       return;
     }
+    ret = register_watch();
+    if (ret < 0) {
+      ldout(rados->ctx(), 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
+      return;
+    }
+  }
+
+  int unregister_watch() {
+    int r = rados->unwatch(watch_handle);
+    if (r < 0) {
+      return r;
+    }
+    rados->remove_watcher(index);
+    return 0;
+  }
+
+  int register_watch() {
+    int r = rados->watch(oid, &watch_handle, this);
+    if (r < 0) {
+      return r;
+    }
+    rados->add_watcher(index);
+    return 0;
   }
 };
 
@@ -1519,18 +1583,12 @@ int RGWRados::initialize()
 void RGWRados::finalize_watch()
 {
   for (int i = 0; i < num_watchers; i++) {
-    string& notify_oid = notify_oids[i];
-    if (notify_oid.empty())
-      continue;
-    uint64_t watch_handle = watch_handles[i];
-    control_pool_ctx.unwatch2(watch_handle);
-
     RGWWatcher *watcher = watchers[i];
+    watcher->unregister_watch();
     delete watcher;
   }
 
   delete[] notify_oids;
-  delete[] watch_handles;
   delete[] watchers;
 }
 
@@ -1645,7 +1703,6 @@ int RGWRados::init_watch()
 
   notify_oids = new string[num_watchers];
   watchers = new RGWWatcher *[num_watchers];
-  watch_handles = new uint64_t[num_watchers];
 
   for (int i=0; i < num_watchers; i++) {
     string& notify_oid = notify_oids[i];
@@ -1659,10 +1716,10 @@ int RGWRados::init_watch()
     if (r < 0 && r != -EEXIST)
       return r;
 
-    RGWWatcher *watcher = new RGWWatcher(this, notify_oid);
+    RGWWatcher *watcher = new RGWWatcher(this, i, notify_oid);
     watchers[i] = watcher;
 
-    r = control_pool_ctx.watch2(notify_oid, &watch_handles[i], watcher);
+    r = watcher->register_watch();
     if (r < 0)
       return r;
   }
index 76f8019946fff634489683f8bd02571bb099bd14..082e85bb3c082d8272ec413f36a130053a6a3bf7 100644 (file)
@@ -1189,6 +1189,7 @@ class RGWRados
   void get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result);
 
   Mutex lock;
+  Mutex watchers_lock;
   SafeTimer *timer;
 
   class C_Tick : public Context {
@@ -1206,7 +1207,7 @@ class RGWRados
 
   int num_watchers;
   RGWWatcher **watchers;
-  uint64_t *watch_handles;
+  std::set<int> watchers_set;
   librados::IoCtx root_pool_ctx;      // .rgw
   librados::IoCtx control_pool_ctx;   // .rgw.control
   bool watch_initialized;
@@ -1248,9 +1249,9 @@ protected:
   Finisher *finisher;
 
 public:
-  RGWRados() : lock("rados_timer_lock"), timer(NULL),
+  RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
                gc(NULL), use_gc_thread(false), quota_threads(false),
-               num_watchers(0), watchers(NULL), watch_handles(NULL),
+               num_watchers(0), watchers(NULL),
                watch_initialized(false),
                bucket_id_lock("rados_bucket_id"),
                bucket_index_max_shards(0),
@@ -1895,6 +1896,10 @@ public:
   virtual int update_containers_stats(map<string, RGWBucketEnt>& m);
   virtual int append_async(rgw_obj& obj, size_t size, bufferlist& bl);
 
+  int watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx);
+  int unwatch(uint64_t watch_handle);
+  void add_watcher(int i);
+  void remove_watcher(int i);
   virtual bool need_watch_notify() { return false; }
   virtual int init_watch();
   virtual void finalize_watch();