]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: journal: mark entry committed after replay
authorMykola Golub <mgolub@mirantis.com>
Fri, 30 Oct 2015 10:41:15 +0000 (12:41 +0200)
committerMykola Golub <mgolub@mirantis.com>
Fri, 4 Dec 2015 11:18:30 +0000 (13:18 +0200)
After replying a journal entry we have to call committed(). Otherwise,
the entries remain with flag committed=false in JournalMetadata
m_pending_commit_tids forever, which prevents commit position update.

Signed-off-by: Mykola Golub <mgolub@mirantis.com>
src/librbd/Journal.cc
src/librbd/JournalReplay.cc
src/librbd/JournalReplay.h

index 1b71d0418dc7f21815198341879ec51526e4b0d6..1a229be996761f026601afd9c299aab9b82606fb 100644 (file)
@@ -55,6 +55,19 @@ struct SetOpRequestTid : public boost::static_visitor<void> {
   }
 };
 
+struct C_ReplayCommitted : public Context {
+  ::journal::Journaler *journaler;
+  ::journal::ReplayEntry replay_entry;
+
+  C_ReplayCommitted(::journal::Journaler *journaler,
+                   ::journal::ReplayEntry &&replay_entry) :
+    journaler(journaler), replay_entry(std::move(replay_entry)) {
+  }
+  virtual void finish(int r) {
+    journaler->committed(replay_entry);
+  }
+};
+
 } // anonymous namespace
 
 Journal::Journal(ImageCtx &image_ctx)
@@ -547,7 +560,8 @@ void Journal::handle_replay_ready() {
     m_lock.Unlock();
     bufferlist data = replay_entry.get_data();
     bufferlist::iterator it = data.begin();
-    int r = m_journal_replay->process(it);
+    int r = m_journal_replay->process(it, new C_ReplayCommitted(m_journaler,
+                                                               std::move(replay_entry)));
     m_lock.Lock();
 
     if (r < 0) {
index 8c162311d9af3f97e9c9b9786db7428b3f07097f..8a6bff2538628f1104dc4e106b84fa147ab06243 100644 (file)
@@ -21,7 +21,7 @@ JournalReplay::~JournalReplay() {
   assert(m_aio_completions.empty());
 }
 
-int JournalReplay::process(bufferlist::iterator it) {
+int JournalReplay::process(bufferlist::iterator it, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
@@ -34,7 +34,7 @@ int JournalReplay::process(bufferlist::iterator it) {
     return -EINVAL;
   }
 
-  boost::apply_visitor(EventVisitor(this), event_entry.event);
+  boost::apply_visitor(EventVisitor(this, on_safe), event_entry.event);
   return 0;
 }
 
@@ -49,95 +49,111 @@ int JournalReplay::flush() {
   return m_ret_val;
 }
 
-void JournalReplay::handle_event(const journal::AioDiscardEvent &event) {
+void JournalReplay::handle_event(const journal::AioDiscardEvent &event,
+                                Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": AIO discard event" << dendl;
 
-  AioCompletion *aio_comp = create_aio_completion();
+  AioCompletion *aio_comp = create_aio_completion(on_safe);
   AioImageRequest::aio_discard(&m_image_ctx, aio_comp, event.offset,
                                event.length);
 }
 
-void JournalReplay::handle_event(const journal::AioWriteEvent &event) {
+void JournalReplay::handle_event(const journal::AioWriteEvent &event,
+                                Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": AIO write event" << dendl;
 
   bufferlist data = event.data;
-  AioCompletion *aio_comp = create_aio_completion();
+  AioCompletion *aio_comp = create_aio_completion(on_safe);
   AioImageRequest::aio_write(&m_image_ctx, aio_comp, event.offset, event.length,
                              data.c_str(), 0);
 }
 
-void JournalReplay::handle_event(const journal::AioFlushEvent &event) {
+void JournalReplay::handle_event(const journal::AioFlushEvent &event,
+                                Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": AIO flush event" << dendl;
 
-  AioCompletion *aio_comp = create_aio_completion();
+  AioCompletion *aio_comp = create_aio_completion(on_safe);
   AioImageRequest::aio_flush(&m_image_ctx, aio_comp);
 }
 
-void JournalReplay::handle_event(const journal::OpFinishEvent &event) {
+  void JournalReplay::handle_event(const journal::OpFinishEvent &event,
+                                  Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Op finish event" << dendl;
 }
 
-void JournalReplay::handle_event(const journal::SnapCreateEvent &event) {
+void JournalReplay::handle_event(const journal::SnapCreateEvent &event,
+                                Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Snap create event" << dendl;
 }
 
-void JournalReplay::handle_event(const journal::SnapRemoveEvent &event) {
+void JournalReplay::handle_event(const journal::SnapRemoveEvent &event,
+                                Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Snap remove event" << dendl;
 }
 
-void JournalReplay::handle_event(const journal::SnapRenameEvent &event) {
+void JournalReplay::handle_event(const journal::SnapRenameEvent &event,
+                                Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Snap rename event" << dendl;
 }
 
-void JournalReplay::handle_event(const journal::SnapProtectEvent &event) {
+void JournalReplay::handle_event(const journal::SnapProtectEvent &event,
+                                Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Snap protect event" << dendl;
 }
 
-void JournalReplay::handle_event(const journal::SnapUnprotectEvent &event) {
+void JournalReplay::handle_event(const journal::SnapUnprotectEvent &event,
+                                Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Snap unprotect event"
                  << dendl;
 }
 
-void JournalReplay::handle_event(const journal::SnapRollbackEvent &event) {
+void JournalReplay::handle_event(const journal::SnapRollbackEvent &event,
+                                Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Snap rollback start event"
                  << dendl;
 }
 
-void JournalReplay::handle_event(const journal::RenameEvent &event) {
+void JournalReplay::handle_event(const journal::RenameEvent &event,
+                                Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Rename event" << dendl;
 }
 
-void JournalReplay::handle_event(const journal::ResizeEvent &event) {
+void JournalReplay::handle_event(const journal::ResizeEvent &event,
+                                Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Resize start event" << dendl;
 }
 
-void JournalReplay::handle_event(const journal::FlattenEvent &event) {
+void JournalReplay::handle_event(const journal::FlattenEvent &event,
+                                Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Flatten start event" << dendl;
 }
 
-void JournalReplay::handle_event(const journal::UnknownEvent &event) {
+void JournalReplay::handle_event(const journal::UnknownEvent &event,
+                                Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": unknown event" << dendl;
+  on_safe->complete(0);
 }
 
-AioCompletion *JournalReplay::create_aio_completion() {
+AioCompletion *JournalReplay::create_aio_completion(Context *on_safe) {
   Mutex::Locker locker(m_lock);
   AioCompletion *aio_comp = aio_create_completion_internal(
     this, &aio_completion_callback);
-  m_aio_completions.insert(aio_comp);
+  m_aio_completions.insert(std::pair<AioCompletion*,Context*>(
+                            aio_comp, on_safe));
   return aio_comp;
 }
 
@@ -153,6 +169,9 @@ void JournalReplay::handle_aio_completion(AioCompletion *aio_comp) {
   ldout(cct, 20) << this << " " << __func__ << ": aio_comp=" << aio_comp << ", "
                  << "r=" << r << dendl;
 
+  Context *on_safe = it->second;
+  on_safe->complete(r);
+
   if (r < 0 && m_ret_val == 0) {
     m_ret_val = r;
   }
index 62be2dfb36ef85038638b823d5b3c5b614777a77..b2e2a88916adb0c2a3e8517df1e0ffbdaeecfad9 100644 (file)
@@ -11,7 +11,7 @@
 #include "common/Mutex.h"
 #include "librbd/JournalTypes.h"
 #include <boost/variant.hpp>
-#include <set>
+#include <map>
 
 namespace librbd {
 
@@ -23,22 +23,23 @@ public:
   JournalReplay(ImageCtx &image_ctx);
   ~JournalReplay();
 
-  int process(bufferlist::iterator it);
+  int process(bufferlist::iterator it, Context *on_safe = NULL);
   int flush();
 
 private:
-  typedef std::set<AioCompletion *> AioCompletions;
+  typedef std::map<AioCompletion*,Context*> AioCompletions;
 
   struct EventVisitor : public boost::static_visitor<void> {
     JournalReplay *journal_replay;
+    Context *on_safe;
 
-    EventVisitor(JournalReplay *_journal_replay)
-      : journal_replay(_journal_replay) {
+    EventVisitor(JournalReplay *_journal_replay, Context *_on_safe)
+      : journal_replay(_journal_replay), on_safe(_on_safe) {
     }
 
     template <typename Event>
     inline void operator()(const Event &event) const {
-      journal_replay->handle_event(event);
+      journal_replay->handle_event(event, on_safe);
     }
   };
 
@@ -50,22 +51,22 @@ private:
   AioCompletions m_aio_completions;
   int m_ret_val;
 
-  void handle_event(const journal::AioDiscardEvent &event);
-  void handle_event(const journal::AioWriteEvent &event);
-  void handle_event(const journal::AioFlushEvent &event);
-  void handle_event(const journal::OpFinishEvent &event);
-  void handle_event(const journal::SnapCreateEvent &event);
-  void handle_event(const journal::SnapRemoveEvent &event);
-  void handle_event(const journal::SnapRenameEvent &event);
-  void handle_event(const journal::SnapProtectEvent &event);
-  void handle_event(const journal::SnapUnprotectEvent &event);
-  void handle_event(const journal::SnapRollbackEvent &event);
-  void handle_event(const journal::RenameEvent &event);
-  void handle_event(const journal::ResizeEvent &event);
-  void handle_event(const journal::FlattenEvent &event);
-  void handle_event(const journal::UnknownEvent &event);
-
-  AioCompletion *create_aio_completion();
+  void handle_event(const journal::AioDiscardEvent &event, Context *on_safe);
+  void handle_event(const journal::AioWriteEvent &event, Context *on_safe);
+  void handle_event(const journal::AioFlushEvent &event, Context *on_safe);
+  void handle_event(const journal::OpFinishEvent &event, Context *on_safe);
+  void handle_event(const journal::SnapCreateEvent &event, Context *on_safe);
+  void handle_event(const journal::SnapRemoveEvent &event, Context *on_safe);
+  void handle_event(const journal::SnapRenameEvent &event, Context *on_safe);
+  void handle_event(const journal::SnapProtectEvent &event, Context *on_safe);
+  void handle_event(const journal::SnapUnprotectEvent &event, Context *on_safe);
+  void handle_event(const journal::SnapRollbackEvent &event, Context *on_safe);
+  void handle_event(const journal::RenameEvent &event, Context *on_safe);
+  void handle_event(const journal::ResizeEvent &event, Context *on_safe);
+  void handle_event(const journal::FlattenEvent &event, Context *on_safe);
+  void handle_event(const journal::UnknownEvent &event, Context *on_safe);
+
+  AioCompletion *create_aio_completion(Context *on_safe);
   void handle_aio_completion(AioCompletion *aio_comp);
 
   static void aio_completion_callback(completion_t cb, void *arg);