git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: journal replay flush should be async
author Jason Dillaman <dillaman@redhat.com>
Tue, 15 Dec 2015 18:20:48 +0000 (13:20 -0500)
committer Jason Dillaman <dillaman@redhat.com>
Fri, 15 Jan 2016 15:40:28 +0000 (10:40 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/Journal.cc
src/librbd/Journal.h
src/librbd/journal/Replay.cc
src/librbd/journal/Replay.h

diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc
index 44a37d0db6aa92ad74838f1f58214a2b5801406e..54d071c79ea50652950d5f565fd5aa54657b3f93 100644
@@ -22,6 +22,9 @@
 
 namespace librbd {
 
+using util::create_async_context_callback;
+using util::create_context_callback;
+
 namespace {
 
 const std::string CLIENT_DESCRIPTION = "master image";
@@ -47,12 +50,16 @@ struct SetOpRequestTid : public boost::static_visitor<void> {
   }
 };
 
+template <typename ImageCtxT>
 struct C_ReplayCommitted : public Context {
-  ::journal::Journaler *journaler;
-  ::journal::ReplayEntry replay_entry;
+  typedef journal::TypeTraits<ImageCtxT> TypeTraits;
+  typedef typename TypeTraits::Journaler Journaler;
+  typedef typename TypeTraits::ReplayEntry ReplayEntry;
+
+  Journaler *journaler;
+  ReplayEntry replay_entry;
 
-  C_ReplayCommitted(::journal::Journaler *journaler,
-                   ::journal::ReplayEntry &&replay_entry) :
+  C_ReplayCommitted(Journaler *journaler, ReplayEntry &&replay_entry) :
     journaler(journaler), replay_entry(std::move(replay_entry)) {
   }
   virtual void finish(int r) {
@@ -75,9 +82,15 @@ std::ostream &operator<<(std::ostream &os,
   case Journal<I>::STATE_REPLAYING:
     os << "Replaying";
     break;
+  case Journal<I>::STATE_FLUSHING_RESTART:
+    os << "FlushingRestart";
+    break;
   case Journal<I>::STATE_RESTARTING_REPLAY:
     os << "RestartingReplay";
     break;
+  case Journal<I>::STATE_FLUSHING_REPLAY:
+    os << "FlushingReplay";
+    break;
   case Journal<I>::STATE_READY:
     os << "Ready";
     break;
@@ -144,8 +157,7 @@ int Journal<I>::create(librados::IoCtx &io_ctx, const std::string &image_id,
     pool_id = data_io_ctx.get_id();
   }
 
-  ::journal::Journaler journaler(io_ctx, image_id, "",
-                                cct->_conf->rbd_journal_commit_age);
+  Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age);
 
   int r = journaler.create(order, splay_width, pool_id);
   if (r < 0) {
@@ -166,8 +178,7 @@ int Journal<I>::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
   ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
 
-  ::journal::Journaler journaler(io_ctx, image_id, "",
-                                cct->_conf->rbd_journal_commit_age);
+  Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age);
 
   bool journal_exists;
   int r = journaler.exists(&journal_exists);
@@ -202,8 +213,7 @@ int Journal<I>::reset(librados::IoCtx &io_ctx, const std::string &image_id) {
   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
   ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
 
-  ::journal::Journaler journaler(io_ctx, image_id, "",
-                                cct->_conf->rbd_journal_commit_age);
+  Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age);
 
   C_SaferCond cond;
   journaler.init(&cond);
@@ -308,7 +318,7 @@ uint64_t Journal<I>::append_io_event(AioCompletion *aio_comp,
   bufferlist bl;
   ::encode(event_entry, bl);
 
-  ::journal::Future future;
+  Future future;
   uint64_t tid;
   {
     Mutex::Locker locker(m_lock);
@@ -442,7 +452,7 @@ void Journal<I>::flush_event(uint64_t tid, Context *on_safe) {
   ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                  << "on_safe=" << on_safe << dendl;
 
-  ::journal::Future future;
+  Future future;
   {
     Mutex::Locker event_locker(m_event_lock);
     future = wait_event(m_lock, tid, on_safe);
@@ -464,8 +474,8 @@ void Journal<I>::wait_event(uint64_t tid, Context *on_safe) {
 }
 
 template <typename I>
-::journal::Future Journal<I>::wait_event(Mutex &lock, uint64_t tid,
-                                         Context *on_safe) {
+typename Journal<I>::Future Journal<I>::wait_event(Mutex &lock, uint64_t tid,
+                                                   Context *on_safe) {
   assert(m_event_lock.is_locked());
   CephContext *cct = m_image_ctx.cct;
 
@@ -474,7 +484,7 @@ template <typename I>
     // journal entry already safe
     ldout(cct, 20) << "journal entry already safe" << dendl;
     m_image_ctx.op_work_queue->queue(on_safe, 0);
-    return ::journal::Future();
+    return Future();
   }
 
   Event &event = it->second;
@@ -492,9 +502,11 @@ void Journal<I>::create_journaler() {
   assert(m_journaler == NULL);
 
   transition_state(STATE_INITIALIZING, 0);
-  m_journaler = new ::journal::Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "",
-                                         m_image_ctx.journal_commit_age);
-  m_journaler->init(new C_InitJournal(this));
+  m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "",
+                              m_image_ctx.journal_commit_age);
+  m_journaler->init(create_async_context_callback(
+    m_image_ctx, create_context_callback<
+      Journal<I>, &Journal<I>::handle_initialized>(this)));
 }
 
 template <typename I>
@@ -508,7 +520,8 @@ void Journal<I>::destroy_journaler(int r) {
   m_journal_replay = NULL;
 
   transition_state(STATE_CLOSING, r);
-  m_image_ctx.op_work_queue->queue(new C_DestroyJournaler(this), 0);
+  m_image_ctx.op_work_queue->queue(create_context_callback<
+    Journal<I>, &Journal<I>::handle_journal_destroyed>(this), 0);
 }
 
 template <typename I>
@@ -517,13 +530,15 @@ void Journal<I>::recreate_journaler(int r) {
   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
 
   assert(m_lock.is_locked());
-  assert(m_state == STATE_REPLAYING);
+  assert(m_state == STATE_FLUSHING_RESTART ||
+         m_state == STATE_FLUSHING_REPLAY);
 
   delete m_journal_replay;
   m_journal_replay = NULL;
 
   transition_state(STATE_RESTARTING_REPLAY, r);
-  m_image_ctx.op_work_queue->queue(new C_DestroyJournaler(this), 0);
+  m_image_ctx.op_work_queue->queue(create_context_callback<
+    Journal<I>, &Journal<I>::handle_journal_destroyed>(this), 0);
 }
 
 template <typename I>
@@ -572,7 +587,7 @@ void Journal<I>::handle_replay_ready() {
   }
 
   while (true) {
-    ::journal::ReplayEntry replay_entry;
+    ReplayEntry replay_entry;
     if (!m_journaler->try_pop_front(&replay_entry)) {
       return;
     }
@@ -580,21 +595,24 @@ void Journal<I>::handle_replay_ready() {
     m_lock.Unlock();
     bufferlist data = replay_entry.get_data();
     bufferlist::iterator it = data.begin();
-    int r = m_journal_replay->process(
-      it, new C_ReplayCommitted(m_journaler, std::move(replay_entry)));
+
+    Context *on_commit = new C_ReplayCommitted<I>(m_journaler,
+                                                  std::move(replay_entry));
+    int r = m_journal_replay->process(it, on_commit);
     m_lock.Lock();
 
     if (r < 0) {
       lderr(cct) << "failed to replay journal entry: " << cpp_strerror(r)
                  << dendl;
-      m_journaler->stop_replay();
+      delete on_commit;
 
-      if (m_close_pending) {
-        destroy_journaler(r);
-        return;
-      }
+      m_journaler->stop_replay();
 
-      recreate_journaler(r);
+      transition_state(STATE_FLUSHING_RESTART, r);
+      m_journal_replay->flush(create_async_context_callback(
+        m_image_ctx, create_context_callback<
+          Journal<I>, &Journal<I>::handle_flushing_restart>(this)));
+      return;
     }
   }
 }
@@ -611,31 +629,62 @@ void Journal<I>::handle_replay_complete(int r) {
 
     ldout(cct, 20) << this << " " << __func__ << dendl;
     m_journaler->stop_replay();
-
-    if (r == 0) {
-      r = m_journal_replay->flush();
-    }
-
     if (r < 0) {
-      lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
-      recreate_journaler(r);
+      transition_state(STATE_FLUSHING_RESTART, r);
+      m_journal_replay->flush(create_async_context_callback(
+        m_image_ctx, create_context_callback<
+          Journal<I>, &Journal<I>::handle_flushing_restart>(this)));
       return;
     }
 
-    delete m_journal_replay;
-    m_journal_replay = NULL;
+    transition_state(STATE_FLUSHING_REPLAY, 0);
+    m_journal_replay->flush(create_async_context_callback(
+      m_image_ctx, create_context_callback<
+        Journal<I>, &Journal<I>::handle_flushing_replay>(this)));
+  }
+}
 
-    if (m_close_pending) {
-      destroy_journaler(0);
-      return;
-    }
+template <typename I>
+void Journal<I>::handle_flushing_restart(int r) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+  assert(m_state == STATE_FLUSHING_RESTART);
+  if (m_close_pending) {
+    destroy_journaler(r);
+    return;
+  }
+
+  recreate_journaler(r);
+}
 
-    m_error_result = 0;
-    m_journaler->start_append(m_image_ctx.journal_object_flush_interval,
-                             m_image_ctx.journal_object_flush_bytes,
-                             m_image_ctx.journal_object_flush_age);
-    transition_state(STATE_READY, 0);
+template <typename I>
+void Journal<I>::handle_flushing_replay(int r) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+  assert(m_state == STATE_FLUSHING_REPLAY);
+  if (m_close_pending) {
+    destroy_journaler(r);
+    return;
   }
+
+  if (r < 0) {
+    lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
+    recreate_journaler(r);
+    return;
+  }
+
+  delete m_journal_replay;
+  m_journal_replay = NULL;
+
+  m_error_result = 0;
+  m_journaler->start_append(m_image_ctx.journal_object_flush_interval,
+                           m_image_ctx.journal_object_flush_bytes,
+                           m_image_ctx.journal_object_flush_age);
+  transition_state(STATE_READY, 0);
 }
 
 template <typename I>
@@ -730,25 +779,8 @@ void Journal<I>::stop_recording() {
   transition_state(STATE_STOPPING, 0);
 
   m_journaler->stop_append(util::create_async_context_callback(
-    m_image_ctx, new C_StopRecording(this)));
-}
-
-template <typename I>
-void Journal<I>::block_writes() {
-  assert(m_lock.is_locked());
-  if (!m_blocking_writes) {
-    m_blocking_writes = true;
-    m_image_ctx.aio_work_queue->block_writes();
-  }
-}
-
-template <typename I>
-void Journal<I>::unblock_writes() {
-  assert(m_lock.is_locked());
-  if (m_blocking_writes) {
-    m_blocking_writes = false;
-    m_image_ctx.aio_work_queue->unblock_writes();
-  }
+    m_image_ctx, create_context_callback<
+      Journal<I>, &Journal<I>::handle_recording_stopped>(this)));
 }
 
 template <typename I>
@@ -780,7 +812,9 @@ bool Journal<I>::is_steady_state() const {
   case STATE_UNINITIALIZED:
   case STATE_INITIALIZING:
   case STATE_REPLAYING:
+  case STATE_FLUSHING_RESTART:
   case STATE_RESTARTING_REPLAY:
+  case STATE_FLUSHING_REPLAY:
   case STATE_STOPPING:
   case STATE_CLOSING:
     break;
diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h
index 80c0babbd623c4dcaee237c0446fd4f7e67a3566..934b947372b7799b70dc38f8355687cd6056252c 100644
@@ -20,6 +20,7 @@
 class Context;
 namespace journal {
 class Journaler;
+class ReplayEntry;
 }
 
 namespace librbd {
@@ -29,9 +30,19 @@ class AioObjectRequest;
 class ImageCtx;
 
 namespace journal {
+
 class EventEntry;
 template <typename> class Replay;
-}
+
+template <typename ImageCtxT>
+struct TypeTraits {
+  typedef ::journal::Journaler Journaler;
+  typedef ::journal::Future Future;
+  typedef ::journal::ReplayEntry ReplayEntry;
+};
+
+} // namespace journal
+
 
 template <typename ImageCtxT = ImageCtx>
 class Journal {
@@ -42,21 +53,25 @@ public:
    * <start>
    *    |
    *    v
-   * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> READY
-   *    |                 *  .  ^             *  .            |
-   *    |                 *  .  |             *  .            |
-   *    |                 *  .  |    (error)  *  . . . .      |
-   *    |                 *  .  |             *        .      |
-   *    |                 *  .  |             v        .      v
-   *    |                 *  .  |         RESTARTING   .    STOPPING
-   *    |                 *  .  |             |        .      |
-   *    |                 *  .  |             |        .      |
-   *    |       * * * * * *  .  \-------------/        .      |
-   *    |       * (error)    .                         .      |
-   *    |       *            .   . . . . . . . . . . . .      |
-   *    |       *            .   .                            |
-   *    |       v            v   v                            |
-   *    |     CLOSED <----- CLOSING <-------------------------/
+   * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> FLUSHING ---> READY
+   *    |                 *  .  ^             *  .              *           |
+   *    |                 *  .  |             *  .              *           |
+   *    |                 *  .  |    (error)  *  . . . . . . .  *           |
+   *    |                 *  .  |             *              .  *           |
+   *    |                 *  .  |             v              .  *           |
+   *    |                 *  .  |         FLUSHING_RESTART   .  *           |
+   *    |                 *  .  |             |              .  *           |
+   *    |                 *  .  |             |              .  *           |
+   *    |                 *  .  |             v              .  *           v
+   *    |                 *  .  |         RESTARTING  < * * * * *       STOPPING
+   *    |                 *  .  |             |              .              |
+   *    |                 *  .  |             |              .              |
+   *    |       * * * * * *  .  \-------------/              .              |
+   *    |       * (error)    .                               .              |
+   *    |       *            .   . . . . . . . . . . . . . . .              |
+   *    |       *            .   .                                          |
+   *    |       v            v   v                                          |
+   *    |     CLOSED <----- CLOSING <---------------------------------------/
    *    |       |
    *    |       v
    *    \---> <finish>
@@ -67,7 +82,9 @@ public:
     STATE_UNINITIALIZED,
     STATE_INITIALIZING,
     STATE_REPLAYING,
+    STATE_FLUSHING_RESTART,
     STATE_RESTARTING_REPLAY,
+    STATE_FLUSHING_REPLAY,
     STATE_READY,
     STATE_STOPPING,
     STATE_CLOSING,
@@ -110,11 +127,19 @@ public:
   void wait_event(uint64_t tid, Context *on_safe);
 
 private:
+  ImageCtxT &m_image_ctx;
+
+  // mock unit testing support
+  typedef journal::TypeTraits<ImageCtxT> TypeTraits;
+  typedef typename TypeTraits::Journaler Journaler;
+  typedef typename TypeTraits::Future Future;
+  typedef typename TypeTraits::ReplayEntry ReplayEntry;
+
   typedef std::list<Context *> Contexts;
   typedef interval_set<uint64_t> ExtentInterval;
 
   struct Event {
-    ::journal::Future future;
+    Future future;
     AioCompletion *aio_comp;
     AioObjectRequests aio_object_requests;
     Contexts on_safe_contexts;
@@ -124,7 +149,7 @@ private:
 
     Event() : aio_comp(NULL) {
     }
-    Event(const ::journal::Future &_future, AioCompletion *_aio_comp,
+    Event(const Future &_future, AioCompletion *_aio_comp,
           const AioObjectRequests &_requests, uint64_t offset, size_t length)
       : future(_future), aio_comp(_aio_comp), aio_object_requests(_requests),
         safe(false), ret_val(0) {
@@ -133,40 +158,8 @@ private:
       }
     }
   };
-  typedef ceph::unordered_map<uint64_t, Event> Events;
-
-  struct C_InitJournal : public Context {
-    Journal *journal;
-
-    C_InitJournal(Journal *_journal) : journal(_journal) {
-    }
-
-    virtual void finish(int r) {
-      journal->handle_initialized(r);
-    }
-  };
-
-  struct C_StopRecording : public Context {
-    Journal *journal;
-
-    C_StopRecording(Journal *_journal) : journal(_journal) {
-    }
-
-    virtual void finish(int r) {
-      journal->handle_recording_stopped(r);
-    }
-  };
 
-  struct C_DestroyJournaler : public Context {
-    Journal *journal;
-
-    C_DestroyJournaler(Journal *_journal) : journal(_journal) {
-    }
-
-    virtual void finish(int r) {
-      journal->handle_journal_destroyed(r);
-    }
-  };
+  typedef ceph::unordered_map<uint64_t, Event> Events;
 
   struct C_EventSafe : public Context {
     Journal *journal;
@@ -201,9 +194,7 @@ private:
     }
   };
 
-  ImageCtxT &m_image_ctx;
-
-  ::journal::Journaler *m_journaler;
+  Journaler *m_journaler;
   mutable Mutex m_lock;
   State m_state;
 
@@ -221,7 +212,7 @@ private:
 
   journal::Replay<ImageCtxT> *m_journal_replay;
 
-  ::journal::Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
+  Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
 
   void create_journaler();
   void destroy_journaler(int r);
@@ -234,6 +225,9 @@ private:
   void handle_replay_ready();
   void handle_replay_complete(int r);
 
+  void handle_flushing_restart(int r);
+  void handle_flushing_replay(int r);
+
   void handle_recording_stopped(int r);
 
   void handle_journal_destroyed(int r);
@@ -242,9 +236,6 @@ private:
 
   void stop_recording();
 
-  void block_writes();
-  void unblock_writes();
-
   void transition_state(State state, int r);
 
   bool is_steady_state() const;
diff --git a/src/librbd/journal/Replay.cc b/src/librbd/journal/Replay.cc
index 01847f02bd9ff77abce595a0b3cbf31ee4be6cfa..23172179e18047c5a7d511e76f9f4ff9b4324876 100644
@@ -16,7 +16,8 @@ namespace journal {
 
 template <typename I>
 Replay<I>::Replay(I &image_ctx)
-  : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock"), m_ret_val(0) {
+  : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock"), m_flush_ctx(nullptr),
+    m_ret_val(0) {
 }
 
 template <typename I>
@@ -43,15 +44,20 @@ int Replay<I>::process(bufferlist::iterator it, Context *on_safe) {
 }
 
 template <typename I>
-int Replay<I>::flush() {
+void Replay<I>::flush(Context *on_finish) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
-  Mutex::Locker locker(m_lock);
-  while (!m_aio_completions.empty()) {
-    m_cond.Wait(m_lock);
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_flush_ctx == nullptr);
+    m_flush_ctx = on_finish;
+
+    if (!m_aio_completions.empty()) {
+      return;
+    }
   }
-  return m_ret_val;
+  on_finish->complete(m_ret_val);
 }
 
 template <typename I>
@@ -179,27 +185,36 @@ AioCompletion *Replay<I>::create_aio_completion(Context *on_safe) {
 
 template <typename I>
 void Replay<I>::handle_aio_completion(AioCompletion *aio_comp) {
-  Mutex::Locker locker(m_lock);
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
 
-  AioCompletions::iterator it = m_aio_completions.find(aio_comp);
-  assert(it != m_aio_completions.end());
+    AioCompletions::iterator it = m_aio_completions.find(aio_comp);
+    assert(it != m_aio_completions.end());
 
-  int r = aio_comp->get_return_value();
+    int r = aio_comp->get_return_value();
 
-  CephContext *cct = m_image_ctx.cct;
-  ldout(cct, 20) << this << " " << __func__ << ": aio_comp=" << aio_comp << ", "
-                 << "r=" << r << dendl;
+    CephContext *cct = m_image_ctx.cct;
+    ldout(cct, 20) << this << " " << __func__ << ": "
+                   << "aio_comp=" << aio_comp << ", "
+                   << "r=" << r << dendl;
+
+    Context *on_safe = it->second;
+    on_safe->complete(r);
 
-  Context *on_safe = it->second;
-  on_safe->complete(r);
+    if (r < 0 && m_ret_val == 0) {
+      m_ret_val = r;
+    }
 
-  if (r < 0 && m_ret_val == 0) {
-    m_ret_val = r;
+    m_aio_completions.erase(it);
+    if (m_aio_completions.empty()) {
+      on_finish = m_flush_ctx;
+    }
   }
 
-  m_aio_completions.erase(it);
-  if (m_aio_completions.empty())
-    m_cond.Signal();
+  if (on_finish != nullptr) {
+    on_finish->complete(m_ret_val);
+  }
 }
 
 template <typename I>
diff --git a/src/librbd/journal/Replay.h b/src/librbd/journal/Replay.h
index 64882844bb5d23a9cdb86ae06d8bdf44c7c5d331..3eeb452648ad4c3d7774e172c954f0eeaef8b547 100644
@@ -7,12 +7,13 @@
 #include "include/int_types.h"
 #include "include/buffer_fwd.h"
 #include "include/rbd/librbd.hpp"
-#include "common/Cond.h"
 #include "common/Mutex.h"
 #include "librbd/journal/Entries.h"
 #include <boost/variant.hpp>
 #include <map>
 
+class Context;
+
 namespace librbd {
 
 class AioCompletion;
@@ -30,7 +31,7 @@ public:
   ~Replay();
 
   int process(bufferlist::iterator it, Context *on_safe = NULL);
-  int flush();
+  void flush(Context *on_finish);
 
 private:
   typedef std::map<AioCompletion*,Context*> AioCompletions;
@@ -52,9 +53,9 @@ private:
   ImageCtxT &m_image_ctx;
 
   Mutex m_lock;
-  Cond m_cond;
 
   AioCompletions m_aio_completions;
+  Context *m_flush_ctx;
   int m_ret_val;
 
   Replay(ImageCtxT &image_ctx);