]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: implement notify ack payloads
authorSage Weil <sage@redhat.com>
Thu, 21 Aug 2014 21:46:35 +0000 (14:46 -0700)
committerSage Weil <sage@redhat.com>
Thu, 4 Dec 2014 18:32:37 +0000 (10:32 -0800)
If the notified send back reply payloads, pass them back to the notifier.

Note that we have changed the on-wire behavior of the watch completion
message a bit: instead of sending the original notify payload back to the
notifier, we send the map of notified to replies.  Note that only users of
the new API will know what to do with the notify acknowledgement
information.  At the same time, we stop sending the original payload.
However, the old API users never saw that data; we were uselessly sending
it over the wire.

Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/Watch.cc
src/osd/Watch.h

index 5f3a7117a8609aee5a720429fe4dc3f358969aff..d0dea52df1ef490a99e3a25d0e4af674dc73124b 100644 (file)
@@ -3938,8 +3938,12 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          uint64_t watch_cookie = 0;
          ::decode(notify_id, bp);
          ::decode(watch_cookie, bp);
+         bufferlist reply_bl;
+         if (!bp.end()) {
+           ::decode(reply_bl, bp);
+         }
          tracepoint(osd, do_osd_op_pre_notify_ack, soid.oid.name.c_str(), soid.snap.val, notify_id, watch_cookie, "Y");
-         OpContext::NotifyAck ack(notify_id, watch_cookie);
+         OpContext::NotifyAck ack(notify_id, watch_cookie, reply_bl);
          ctx->notify_acks.push_back(ack);
        } catch (const buffer::error &e) {
          tracepoint(osd, do_osd_op_pre_notify_ack, soid.oid.name.c_str(), soid.snap.val, op.watch.cookie, 0, "N");
@@ -5431,7 +5435,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
       if (p->watch_cookie &&
          p->watch_cookie.get() != i->first.first) continue;
       dout(10) << "acking notify on watch " << i->first << dendl;
-      i->second->notify_ack(p->notify_id);
+      i->second->notify_ack(p->notify_id, p->reply_bl);
     }
   }
 }
index 3d57058cf8844096f2353bcfc83d9a01ab8d15f5..452661a058832d9610786dc415021694e855f3c5 100644 (file)
@@ -447,9 +447,12 @@ public:
     struct NotifyAck {
       boost::optional<uint64_t> watch_cookie;
       uint64_t notify_id;
+      bufferlist reply_bl;
       NotifyAck(uint64_t notify_id) : notify_id(notify_id) {}
-      NotifyAck(uint64_t notify_id, uint64_t cookie)
-       : watch_cookie(cookie), notify_id(notify_id) {}
+      NotifyAck(uint64_t notify_id, uint64_t cookie, bufferlist& rbl)
+       : watch_cookie(cookie), notify_id(notify_id) {
+       reply_bl.claim(rbl);
+      }
     };
     list<NotifyAck> notify_acks;
     
index 7137cd81de5b77cffe5973c65092ef25b1454ea7..4dfc8346f6616a46187bb75af3a17050139d9cfc 100644 (file)
@@ -152,7 +152,7 @@ void Notify::start_watcher(WatchRef watch)
   watchers.insert(watch);
 }
 
-void Notify::complete_watcher(WatchRef watch)
+void Notify::complete_watcher(WatchRef watch, bufferlist& reply_bl)
 {
   Mutex::Locker l(lock);
   dout(10) << "complete_watcher" << dendl;
@@ -161,6 +161,19 @@ void Notify::complete_watcher(WatchRef watch)
   assert(in_progress_watchers > 0);
   watchers.erase(watch);
   --in_progress_watchers;
+  notify_replies[watch->get_watcher_gid()].claim(reply_bl);
+  maybe_complete_notify();
+}
+
+void Notify::complete_watcher_remove(WatchRef watch)
+{
+  Mutex::Locker l(lock);
+  dout(10) << __func__ << dendl;
+  if (is_discarded())
+    return;
+  assert(in_progress_watchers > 0);
+  watchers.erase(watch);
+  --in_progress_watchers;
   maybe_complete_notify();
 }
 
@@ -170,8 +183,12 @@ void Notify::maybe_complete_notify()
           << in_progress_watchers
           << " in progress watchers " << dendl;
   if (!in_progress_watchers) {
+    bufferlist bl;
+    ::encode(notify_replies, bl);
+    bufferlist empty;
     MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id,
-                                        WATCH_NOTIFY, payload));
+                                        WATCH_NOTIFY, empty));
+    reply->set_data(bl);
     if (timed_out)
       reply->return_code = -ETIMEDOUT;
     osd->send_message_osd_client(reply, client.get());
@@ -383,7 +400,7 @@ void Watch::remove()
   for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
        i != in_progress_notifies.end();
        ++i) {
-    i->second->complete_watcher(self.lock());
+    i->second->complete_watcher_remove(self.lock());
   }
   discard_state();
 }
@@ -414,12 +431,12 @@ void Watch::send_notify(NotifyRef notif)
   osd->send_message_osd_client(notify_msg, conn.get());
 }
 
-void Watch::notify_ack(uint64_t notify_id)
+void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl)
 {
   dout(10) << "notify_ack" << dendl;
   map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.find(notify_id);
   if (i != in_progress_notifies.end()) {
-    i->second->complete_watcher(self.lock());
+    i->second->complete_watcher(self.lock(), reply_bl);
     in_progress_notifies.erase(i);
   }
 }
index 152ea951cd2a791a32778e572439c74c0c9654e5..00bcc638f6495bc48ffe9dbf4a13bafbf0e839af 100644 (file)
@@ -71,6 +71,8 @@ class Notify {
   CancelableContext *cb;
   Mutex lock;
 
+  /// gid -> reply_bl for everyone who acked the notify
+  map<uint64_t,bufferlist> notify_replies;
 
   /// true if this notify is being discarded
   bool is_discarded() {
@@ -130,6 +132,11 @@ public:
 
   /// Called once per NotifyAck
   void complete_watcher(
+    WatchRef watcher, ///< [in] watcher to complete
+    bufferlist& reply_bl ///< [in] reply buffer from the notified watcher
+    );
+  /// Called when a watcher unregisters or times out
+  void complete_watcher_remove(
     WatchRef watcher ///< [in] watcher to complete
     );
 
@@ -186,6 +193,10 @@ public:
   /// NOTE: must be called with pg lock held
   ~Watch();
 
+  uint64_t get_watcher_gid() const {
+    return entity.num();
+  }
+
   string gen_dbg_prefix();
   static WatchRef makeWatchRef(
     ReplicatedPG *pg, OSDService *osd,
@@ -239,7 +250,8 @@ public:
 
   /// Call when notify_ack received on notify_id
   void notify_ack(
-    uint64_t notify_id ///< [in] id of acked notify
+    uint64_t notify_id, ///< [in] id of acked notify
+    bufferlist& reply_bl ///< [in] notify reply buffer
     );
 };