]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: delay commit of op start event
authorJason Dillaman <dillaman@redhat.com>
Wed, 24 Feb 2016 21:35:58 +0000 (16:35 -0500)
committerJason Dillaman <dillaman@redhat.com>
Tue, 1 Mar 2016 12:38:06 +0000 (07:38 -0500)
If the start event is flagged as committed before the op is
actually executed, librbd won't be able to replay the event
should a crash occur.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/Journal.cc
src/librbd/Journal.h

index c2d7f02b37fed6b05cf5c50bf13e9d9c18a5a23b..58a76d1ab04c8b37bb1000453aac04bbbecd78cf 100644 (file)
@@ -407,10 +407,14 @@ void Journal<I>::append_op_event(uint64_t op_tid,
 
     // TODO: use allocated tag_id
     future = m_journaler->append(0, bl);
+
+    // delay committing op event to ensure consistent replay
+    assert(m_op_futures.count(op_tid) == 0);
+    m_op_futures[op_tid] = future;
   }
 
   on_safe = create_async_context_callback(m_image_ctx, on_safe);
-  future.flush(new C_OpEventSafe(this, op_tid, future, on_safe));
+  future.flush(on_safe);
 
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 10) << this << " " << __func__ << ": "
@@ -429,16 +433,24 @@ void Journal<I>::commit_op_event(uint64_t op_tid, int r) {
   bufferlist bl;
   ::encode(event_entry, bl);
 
-  Future future;
+  Future op_start_future;
+  Future op_finish_future;
   {
     Mutex::Locker locker(m_lock);
     assert(m_state == STATE_READY);
 
+    // ready to commit op event
+    auto it = m_op_futures.find(op_tid);
+    assert(it != m_op_futures.end());
+    op_start_future = it->second;
+    m_op_futures.erase(it);
+
     // TODO: use allocated tag_id
-    future = m_journaler->append(0, bl);
+    op_finish_future = m_journaler->append(0, bl);
   }
 
-  future.flush(new C_OpEventSafe(this, op_tid, future, nullptr));
+  op_finish_future.flush(new C_OpEventSafe(this, op_tid, op_start_future,
+                                           op_finish_future));
 }
 
 template <typename I>
@@ -866,8 +878,9 @@ void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
 }
 
 template <typename I>
-void Journal<I>::handle_op_event_safe(int r, uint64_t tid, const Future &future,
-                                      Context *on_safe) {
+void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
+                                      const Future &op_start_future,
+                                      const Future &op_finish_future) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
                  << "tid=" << tid << dendl;
@@ -878,10 +891,11 @@ void Journal<I>::handle_op_event_safe(int r, uint64_t tid, const Future &future,
     lderr(cct) << "failed to commit op event: "  << cpp_strerror(r) << dendl;
   }
 
-  m_journaler->committed(future);
-  if (on_safe != nullptr) {
-    on_safe->complete(r);
-  }
+  m_journaler->committed(op_start_future);
+  m_journaler->committed(op_finish_future);
+
+  // reduce the replay window after committing an op event
+  m_journaler->flush_commit_position(nullptr);
 }
 
 template <typename I>
index 44a7932877f7ce522359e4dedfc16953de66c19d..a9f376763f0f0049b786e3a62e485f1607cc7112 100644 (file)
@@ -8,7 +8,6 @@
 #include "include/atomic.h"
 #include "include/Context.h"
 #include "include/interval_set.h"
-#include "include/unordered_map.h"
 #include "include/rados/librados.hpp"
 #include "common/Mutex.h"
 #include "journal/Future.h"
@@ -18,6 +17,7 @@
 #include <iosfwd>
 #include <list>
 #include <string>
+#include <unordered_map>
 
 class Context;
 namespace journal {
@@ -173,7 +173,8 @@ private:
     }
   };
 
-  typedef ceph::unordered_map<uint64_t, Event> Events;
+  typedef std::unordered_map<uint64_t, Event> Events;
+  typedef std::unordered_map<uint64_t, Future> TidToFutures;
 
   struct C_IOEventSafe : public Context {
     Journal *journal;
@@ -191,16 +192,17 @@ private:
   struct C_OpEventSafe : public Context {
     Journal *journal;
     uint64_t tid;
-    Future future;
-    Context *on_safe;
+    Future op_start_future;
+    Future op_finish_future;
 
-    C_OpEventSafe(Journal *journal, uint64_t tid, const Future &future,
-                  Context *on_safe)
-      : journal(journal), tid(tid), future(future), on_safe(on_safe) {
+    C_OpEventSafe(Journal *journal, uint64_t tid, const Future &op_start_future,
+                  const Future &op_finish_future)
+      : journal(journal), tid(tid), op_start_future(op_start_future),
+        op_finish_future(op_finish_future) {
     }
 
     virtual void finish(int r) {
-      journal->handle_op_event_safe(r, tid, future, on_safe);
+      journal->handle_op_event_safe(r, tid, op_start_future, op_finish_future);
     }
   };
 
@@ -251,6 +253,7 @@ private:
   Events m_events;
 
   atomic_t m_op_tid;
+  TidToFutures m_op_futures;
 
   bool m_blocking_writes;
 
@@ -279,8 +282,8 @@ private:
   void handle_journal_destroyed(int r);
 
   void handle_io_event_safe(int r, uint64_t tid);
-  void handle_op_event_safe(int r, uint64_t tid, const Future &future,
-                            Context *on_safe);
+  void handle_op_event_safe(int r, uint64_t tid, const Future &op_start_future,
+                            const Future &op_finish_future);
 
   void stop_recording();