]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: switch to new watch/notify API
authorSage Weil <sage@redhat.com>
Wed, 11 Feb 2015 00:20:12 +0000 (16:20 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 5 Mar 2015 20:29:23 +0000 (12:29 -0800)
Note that we don't really use it fully, yet.  The main semantic change
here is that we have to explicitly ack the notify.

However, still missing is re-registration of the watch if we see a failure,
and ignoring the cache if watch_check tells us the watch is stale.

Signed-off-by: Sage Weil <sage@redhat.com>
src/rgw/rgw_cache.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 4db884b41c73a164afa8082bb4690f6641333214..2446bf0729b9d84b071af1a7d6d5bb22557255e6 100644 (file)
@@ -201,7 +201,10 @@ class RGWCache  : public T
   }
 
   int distribute_cache(const string& normal_name, rgw_obj& obj, ObjectCacheInfo& obj_info, int op);
-  int watch_cb(int opcode, uint64_t ver, bufferlist& bl);
+  int watch_cb(uint64_t notify_id,
+              uint64_t cookie,
+              uint64_t notifier_id,
+              bufferlist& bl);
 public:
   RGWCache() {}
 
@@ -554,7 +557,10 @@ int RGWCache<T>::distribute_cache(const string& normal_name, rgw_obj& obj, Objec
 }
 
 template <class T>
-int RGWCache<T>::watch_cb(int opcode, uint64_t ver, bufferlist& bl)
+int RGWCache<T>::watch_cb(uint64_t notify_id,
+                         uint64_t cookie,
+                         uint64_t notifier_id,
+                         bufferlist& bl)
 {
   RGWCacheNotifyInfo info;
 
index 036204e4a9549dd902c7ff0f0ed0edc6f8073286..e9fc30a9d6e312caa51f7b1ca96fd32823f870ca 100644 (file)
@@ -1201,13 +1201,28 @@ int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, time_t s
   return 0;
 }
 
-class RGWWatcher : public librados::WatchCtx {
+class RGWWatcher : public librados::WatchCtx2 {
   RGWRados *rados;
+  string oid;
 public:
-  RGWWatcher(RGWRados *r) : rados(r) {}
-  void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) {
-    ldout(rados->ctx(), 10) << "RGWWatcher::notify() opcode=" << (int)opcode << " ver=" << ver << " bl.length()=" << bl.length() << dendl;
-    rados->watch_cb(opcode, ver, bl);
+  RGWWatcher(RGWRados *r, const string& o) : rados(r), oid(o) {}
+  void handle_notify(uint64_t notify_id,
+                    uint64_t cookie,
+                    uint64_t notifier_id,
+                    bufferlist& bl) {
+    ldout(rados->ctx(), 10) << "RGWWatcher::handle_notify() "
+                           << " notify_id " << notify_id
+                           << " cookie " << cookie
+                           << " notifier " << notifier_id
+                           << " bl.length()=" << bl.length() << dendl;
+    rados->watch_cb(notify_id, cookie, notifier_id, bl);
+
+    bufferlist reply_bl; // empty reply payload
+    rados->control_pool_ctx.notify_ack(oid, notify_id, cookie, reply_bl);
+  }
+  void handle_error(uint64_t cookie, int err) {
+    lderr(rados->ctx()) << "RGWWatcher::handle_error cookie " << cookie
+                       << " err " << cpp_strerror(err) << dendl;
   }
 };
 
@@ -1472,7 +1487,7 @@ void RGWRados::finalize_watch()
     if (notify_oid.empty())
       continue;
     uint64_t watch_handle = watch_handles[i];
-    control_pool_ctx.unwatch(notify_oid, watch_handle);
+    control_pool_ctx.unwatch2(watch_handle);
 
     RGWWatcher *watcher = watchers[i];
     delete watcher;
@@ -1604,10 +1619,10 @@ int RGWRados::init_watch()
     if (r < 0 && r != -EEXIST)
       return r;
 
-    RGWWatcher *watcher = new RGWWatcher(this);
+    RGWWatcher *watcher = new RGWWatcher(this, notify_oid);
     watchers[i] = watcher;
 
-    r = control_pool_ctx.watch(notify_oid, 0, &watch_handles[i], watcher);
+    r = control_pool_ctx.watch2(notify_oid, &watch_handles[i], watcher);
     if (r < 0)
       return r;
   }
@@ -7018,7 +7033,7 @@ int RGWRados::distribute(const string& key, bufferlist& bl)
   pick_control_oid(key, notify_oid);
 
   ldout(cct, 10) << "distributing notification oid=" << notify_oid << " bl.length()=" << bl.length() << dendl;
-  int r = control_pool_ctx.notify(notify_oid, 0, bl);
+  int r = control_pool_ctx.notify2(notify_oid, bl, 0, NULL);
   return r;
 }
 
index e6fe72b60d5f823b61032f6a13d2f0c481be5ce1..fa09f0617dd3acad07ec5b31101ce50735ec381d 100644 (file)
@@ -1208,6 +1208,8 @@ class RGWRados
   librados::IoCtx control_pool_ctx;   // .rgw.control
   bool watch_initialized;
 
+  friend class RGWWatcher;
+
   Mutex bucket_id_lock;
 
   // This field represents the number of bucket index object shards
@@ -1888,7 +1890,10 @@ public:
   virtual int init_watch();
   virtual void finalize_watch();
   virtual int distribute(const string& key, bufferlist& bl);
-  virtual int watch_cb(int opcode, uint64_t ver, bufferlist& bl) { return 0; }
+  virtual int watch_cb(uint64_t notify_id,
+                      uint64_t cookie,
+                      uint64_t notifier_id,
+                      bufferlist& bl) { return 0; }
   void pick_control_oid(const string& key, string& notify_oid);
 
   void set_atomic(void *ctx, rgw_obj& obj) {