]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: Notifier::notify API improvement 14072/head
authorMykola Golub <mgolub@mirantis.com>
Tue, 21 Mar 2017 21:20:27 +0000 (22:20 +0100)
committerMykola Golub <mgolub@mirantis.com>
Tue, 28 Mar 2017 14:07:29 +0000 (16:07 +0200)
Replace the out bufferlist with a response struct.

Signed-off-by: Mykola Golub <mgolub@mirantis.com>
src/librbd/WatchNotifyTypes.h
src/librbd/Watcher.cc
src/librbd/Watcher.h
src/librbd/image_watcher/NotifyLockOwner.cc
src/librbd/image_watcher/NotifyLockOwner.h
src/librbd/watcher/Notifier.cc
src/librbd/watcher/Notifier.h
src/librbd/watcher/Types.cc
src/librbd/watcher/Types.h
src/tools/rbd_mirror/LeaderWatcher.cc
src/tools/rbd_mirror/LeaderWatcher.h

index 6cd94b3959b04681f52f2a80898d2daa432b4f1e..74cf9799e8f5ed6441521e56936b8acd38b1db85 100644 (file)
@@ -22,6 +22,8 @@ namespace watch_notify {
 
 using librbd::watcher::ClientId;
 
+WRITE_CLASS_ENCODER(ClientId);
+
 struct AsyncRequestId {
   ClientId client_id;
   uint64_t request_id;
index d678ffba80914e8f1ffdb7ccad7c8bca75320fd8..2964918dc670570e2e7b61d902d53b03208767d4 100644 (file)
@@ -251,9 +251,10 @@ void Watcher::handle_rewatch(int r) {
   }
 }
 
-void Watcher::send_notify(bufferlist& payload, bufferlist *out_bl,
+void Watcher::send_notify(bufferlist& payload,
+                          watcher::NotifyResponse *response,
                           Context *on_finish) {
-  m_notifier.notify(payload, out_bl, on_finish);
+  m_notifier.notify(payload, response, on_finish);
 }
 
 void Watcher::WatchCtx::handle_notify(uint64_t notify_id,
index 7d50192e05743e59b02fc93ca43cd8bd6b19765b..01de690937d084d458f556634d89fde57c86cb6b 100644 (file)
@@ -16,6 +16,8 @@ class ContextWQ;
 
 namespace librbd {
 
+namespace watcher { struct NotifyResponse; }
+
 class Watcher {
 public:
   struct C_NotifyAck : public Context {
@@ -72,7 +74,8 @@ protected:
   watcher::Notifier m_notifier;
   WatchState m_watch_state;
 
-  void send_notify(bufferlist &payload, bufferlist *out_bl = nullptr,
+  void send_notify(bufferlist &payload,
+                   watcher::NotifyResponse *response = nullptr,
                    Context *on_finish = nullptr);
 
   virtual void handle_notify(uint64_t notify_id, uint64_t handle,
index 3ba11bba2bb52d267cea11f9fe9d4865bb986fee..f534291200906d6b7b15cec3494813b52612eb74 100644 (file)
@@ -37,7 +37,7 @@ void NotifyLockOwner::send_notify() {
   ldout(cct, 20) << dendl;
 
   assert(m_image_ctx.owner_lock.is_locked());
-  m_notifier.notify(m_bl, &m_out_bl, create_context_callback<
+  m_notifier.notify(m_bl, &m_notify_response, create_context_callback<
     NotifyLockOwner, &NotifyLockOwner::handle_notify>(this));
 }
 
@@ -52,30 +52,17 @@ void NotifyLockOwner::handle_notify(int r) {
     return;
   }
 
-  typedef std::map<std::pair<uint64_t, uint64_t>, bufferlist> responses_t;
-  responses_t responses;
-  if (m_out_bl.length() > 0) {
-    try {
-      bufferlist::iterator iter = m_out_bl.begin();
-      ::decode(responses, iter);
-    } catch (const buffer::error &err) {
-      lderr(cct) << ": failed to decode response" << dendl;
-      finish(-EINVAL);
-      return;
-    }
-  }
-
   bufferlist response;
   bool lock_owner_responded = false;
-  for (responses_t::iterator i = responses.begin(); i != responses.end(); ++i) {
-    if (i->second.length() > 0) {
+  for (auto &it : m_notify_response.acks) {
+    if (it.second.length() > 0) {
       if (lock_owner_responded) {
         lderr(cct) << ": duplicate lock owners detected" << dendl;
         finish(-EINVAL);
         return;
       }
       lock_owner_responded = true;
-      response.claim(i->second);
+      response.claim(it.second);
     }
   }
 
index 3ed5f39d9d8e8054761b965bac350a0bca68da4c..6249bc1284aae087f67badf71397c9a9358b74ab 100644 (file)
@@ -5,6 +5,7 @@
 #define CEPH_LIBRBD_IMAGE_WATCHER_NOTIFY_LOCK_OWNER_H
 
 #include "include/buffer.h"
+#include "librbd/watcher/Types.h"
 
 class Context;
 
@@ -34,7 +35,7 @@ private:
   watcher::Notifier &m_notifier;
 
   bufferlist m_bl;
-  bufferlist m_out_bl;
+  watcher::NotifyResponse m_notify_response;
   Context *m_on_finish;
 
   void send_notify();
index f36bee7603453db542ef50a39c32e92ee1fc68c8..b899de2232113ce159ba44d0326b30660387aa17 100644 (file)
@@ -5,6 +5,7 @@
 #include "common/WorkQueue.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/Utils.h"
+#include "librbd/watcher/Types.h"
 
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
@@ -15,6 +16,25 @@ namespace watcher {
 
 const uint64_t Notifier::NOTIFY_TIMEOUT = 5000;
 
+Notifier::C_AioNotify::C_AioNotify(Notifier *notifier, NotifyResponse *response,
+                                   Context *on_finish)
+  : notifier(notifier), response(response), on_finish(on_finish) {
+}
+
+void Notifier::C_AioNotify::finish(int r) {
+  if (response != nullptr) {
+    if (r == 0 || r == -ETIMEDOUT) {
+      try {
+        bufferlist::iterator it = out_bl.begin();
+        ::decode(*response, it);
+      } catch (const buffer::error &err) {
+        r = -EBADMSG;
+      }
+    }
+  }
+  notifier->handle_notify(r, on_finish);
+}
+
 Notifier::Notifier(ContextWQ *work_queue, IoCtx &ioctx, const std::string &oid)
   : m_work_queue(work_queue), m_ioctx(ioctx), m_oid(oid),
     m_aio_notify_lock(util::unique_lock_name(
@@ -37,7 +57,8 @@ void Notifier::flush(Context *on_finish) {
   m_aio_notify_flush_ctxs.push_back(on_finish);
 }
 
-void Notifier::notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish) {
+void Notifier::notify(bufferlist &bl, NotifyResponse *response,
+                      Context *on_finish) {
   {
     Mutex::Locker aio_notify_locker(m_aio_notify_lock);
     ++m_pending_aio_notifies;
@@ -46,9 +67,9 @@ void Notifier::notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish) {
                      << dendl;
   }
 
-  C_AioNotify *ctx = new C_AioNotify(this, on_finish);
+  C_AioNotify *ctx = new C_AioNotify(this, response, on_finish);
   librados::AioCompletion *comp = util::create_rados_callback(ctx);
-  int r = m_ioctx.aio_notify(m_oid, comp, bl, NOTIFY_TIMEOUT, out_bl);
+  int r = m_ioctx.aio_notify(m_oid, comp, bl, NOTIFY_TIMEOUT, &ctx->out_bl);
   assert(r == 0);
   comp->release();
 }
index b0173e3f212e21fd8e0c5a704d1fcc4c66009316..8b0ad37b4d3d08c4bfd4c3fe23322affd487f803 100644 (file)
@@ -16,6 +16,8 @@ namespace librbd {
 
 namespace watcher {
 
+struct NotifyResponse;
+
 class Notifier {
 public:
   static const uint64_t NOTIFY_TIMEOUT;
@@ -25,21 +27,21 @@ public:
   ~Notifier();
 
   void flush(Context *on_finish);
-  void notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish);
+  void notify(bufferlist &bl, NotifyResponse *response, Context *on_finish);
 
 private:
   typedef std::list<Context*> Contexts;
 
   struct C_AioNotify : public Context {
     Notifier *notifier;
+    NotifyResponse *response;
     Context *on_finish;
+    bufferlist out_bl;
+
+    C_AioNotify(Notifier *notifier, NotifyResponse *response,
+                Context *on_finish);
 
-    C_AioNotify(Notifier *notifier, Context *on_finish)
-      : notifier(notifier), on_finish(on_finish) {
-    }
-    void finish(int r) override {
-      notifier->handle_notify(r, on_finish);
-    }
+    void finish(int r) override;
   };
 
   ContextWQ *m_work_queue;
index e50cfdaf729f7dd056d82b2421165d72faa6c762..b0250f026d74f7b51a4093839845be5e57edecd8 100644 (file)
@@ -22,6 +22,18 @@ void ClientId::dump(Formatter *f) const {
   f->dump_unsigned("handle", handle);
 }
 
+WRITE_CLASS_ENCODER(ClientId);
+
+void NotifyResponse::encode(bufferlist& bl) const {
+  ::encode(acks, bl);
+  ::encode(timeouts, bl);
+}
+
+void NotifyResponse::decode(bufferlist::iterator& iter) {
+  ::decode(acks, iter);
+  ::decode(timeouts, iter);
+}
+
 } // namespace watcher
 } // namespace librbd
 
index 0c65e32fe1230fb2eb1a11900ca32ef64793bbbb..e7886f6cad5e77869939528ec7b4a49c8e91cbd6 100644 (file)
@@ -46,6 +46,14 @@ struct ClientId {
   }
 };
 
+struct NotifyResponse {
+  std::map<ClientId, bufferlist> acks;
+  std::vector<ClientId> timeouts;
+
+  void encode(bufferlist& bl) const;
+  void decode(bufferlist::iterator& it);
+};
+
 template <typename ImageCtxT>
 struct Traits {
   typedef librbd::Watcher Watcher;
@@ -58,5 +66,6 @@ std::ostream &operator<<(std::ostream &out,
                          const librbd::watcher::ClientId &client);
 
 WRITE_CLASS_ENCODER(librbd::watcher::ClientId);
+WRITE_CLASS_ENCODER(librbd::watcher::NotifyResponse);
 
 #endif // CEPH_LIBRBD_WATCHER_TYPES_H
index c23863dab44e4a61166619fd0581971c801f76ec..152ec3862c7ccebee0611f6f0c0428d8c5641dcc 100644 (file)
@@ -903,8 +903,8 @@ void LeaderWatcher<I>::notify_heartbeat() {
   bufferlist bl;
   ::encode(NotifyMessage{HeartbeatPayload{}}, bl);
 
-  m_heartbeat_ack_bl.clear();
-  send_notify(bl, &m_heartbeat_ack_bl, ctx);
+  m_heartbeat_response.acks.clear();
+  send_notify(bl, &m_heartbeat_response, ctx);
 }
 
 template <typename I>
@@ -930,31 +930,17 @@ void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
     return;
   }
 
-  try {
-    bufferlist::iterator iter = m_heartbeat_ack_bl.begin();
-    uint32_t num_acks;
-    ::decode(num_acks, iter);
-
-    dout(20) << num_acks << " acks received" << dendl;
-
-    for (uint32_t i = 0; i < num_acks; i++) {
-      uint64_t notifier_id;
-      uint64_t cookie;
-      bufferlist reply_bl;
-
-      ::decode(notifier_id, iter);
-      ::decode(cookie, iter);
-      ::decode(reply_bl, iter);
+  dout(20) << m_heartbeat_response.acks.size() << " acks received, "
+           << m_heartbeat_response.timeouts.size() << " timed out" << dendl;
 
-      if (notifier_id == m_notifier_id) {
-       continue;
-      }
-
-      std::string instance_id = stringify(notifier_id);
-      m_instances->notify(instance_id);
+  for (auto &it: m_heartbeat_response.acks) {
+    uint64_t notifier_id = it.first.gid;
+    if (notifier_id == m_notifier_id) {
+      continue;
     }
-  } catch (const buffer::error &err) {
-    derr << ": error decoding heartbeat acks: " << err.what() << dendl;
+
+    std::string instance_id = stringify(notifier_id);
+    m_instances->notify(instance_id);
   }
 
   schedule_timer_task("heartbeat", 1, true,
index c1fbd013452d7a6809ccd36ef1e3c03226879683..b3d05122c2fc0d4d8f58d0deb47ec72c8b042ff9 100644 (file)
@@ -10,8 +10,9 @@
 
 #include "common/AsyncOpTracker.h"
 #include "librbd/ManagedLock.h"
-#include "librbd/managed_lock/Types.h"
 #include "librbd/Watcher.h"
+#include "librbd/managed_lock/Types.h"
+#include "librbd/watcher/Types.h"
 #include "Instances.h"
 #include "MirrorStatusWatcher.h"
 #include "tools/rbd_mirror/leader_watcher/Types.h"
@@ -195,7 +196,7 @@ private:
   Context *m_timer_task = nullptr;
   C_TimerGate *m_timer_gate = nullptr;
 
-  bufferlist m_heartbeat_ack_bl;
+  librbd::watcher::NotifyResponse m_heartbeat_response;
 
   bool is_leader(Mutex &m_lock);