]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: the journal state machine is now asynchronous
authorJason Dillaman <dillaman@redhat.com>
Fri, 11 Dec 2015 20:09:38 +0000 (15:09 -0500)
committerJason Dillaman <dillaman@redhat.com>
Tue, 15 Dec 2015 01:31:31 +0000 (20:31 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/ImageCtx.cc
src/librbd/ImageCtx.h
src/librbd/Journal.cc
src/librbd/Journal.h
src/librbd/internal.cc

index 6cb3979f779368215b702a6f73fbe2e00ae805e5..4249ccbefb492124cd590de0b04a2a52a74a7a33 100644 (file)
@@ -1024,24 +1024,4 @@ struct C_InvalidateCache : public Context {
   Journal *ImageCtx::create_journal() {
     return new Journal(*this);
   }
-
-  void ImageCtx::open_journal() {
-    assert(journal == NULL);
-    journal = new Journal(*this);
-  }
-
-  int ImageCtx::close_journal(bool force) {
-    assert(journal != NULL);
-    int r = journal->close();
-    if (r < 0) {
-      lderr(cct) << "failed to flush journal: " << cpp_strerror(r) << dendl;
-      if (!force) {
-        return r;
-      }
-    }
-
-    delete journal;
-    journal = NULL;
-    return r;
-  }
 }
index 5845f6425e00db68442678d22d4ef93fe770cc27..977544a343d38d2463b24816b143a723f6a78294 100644 (file)
@@ -273,10 +273,7 @@ namespace librbd {
     void apply_metadata_confs();
 
     ObjectMap *create_object_map(uint64_t snap_id);
-
     Journal *create_journal();
-    void open_journal();            // TODO remove
-    int close_journal(bool force);  // TODO remove
 
     void clear_pending_completions();
   };
index 8b2c82646c0d46d0fc013857a9e0527b5a2057d1..903a4581a81d7df61c52aa06de21a477b3ec4847 100644 (file)
@@ -9,6 +9,7 @@
 #include "librbd/ImageCtx.h"
 #include "librbd/JournalReplay.h"
 #include "librbd/JournalTypes.h"
+#include "librbd/Utils.h"
 #include "journal/Journaler.h"
 #include "journal/ReplayEntry.h"
 #include "common/errno.h"
@@ -25,16 +26,6 @@ namespace {
 
 const std::string CLIENT_DESCRIPTION = "master image";
 
-struct C_DestroyJournaler : public Context {
-  ::journal::Journaler *journaler;
-
-  C_DestroyJournaler(::journal::Journaler *_journaler) : journaler(_journaler) {
-  }
-  virtual void finish(int r) {
-    delete journaler;
-  }
-};
-
 struct SetOpRequestTid : public boost::static_visitor<void> {
   uint64_t tid;
 
@@ -74,24 +65,18 @@ struct C_ReplayCommitted : public Context {
 Journal::Journal(ImageCtx &image_ctx)
   : m_image_ctx(image_ctx), m_journaler(NULL),
     m_lock("Journal::m_lock"), m_state(STATE_UNINITIALIZED),
-    m_replay_handler(this), m_close_pending(false),
+    m_error_result(0), m_replay_handler(this), m_close_pending(false),
     m_event_lock("Journal::m_event_lock"), m_event_tid(0),
     m_blocking_writes(false), m_journal_replay(NULL) {
 
   ldout(m_image_ctx.cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
-
-  Mutex::Locker locker(m_lock);
-  block_writes();
 }
 
 Journal::~Journal() {
-  m_image_ctx.op_work_queue->drain();
+  assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
   assert(m_journaler == NULL);
   assert(m_journal_replay == NULL);
   assert(m_wait_for_state_contexts.empty());
-
-  Mutex::Locker locker(m_lock);
-  unblock_writes();
 }
 
 bool Journal::is_journal_supported(ImageCtx &image_ctx) {
@@ -214,7 +199,7 @@ int Journal::reset(librados::IoCtx &io_ctx, const std::string &image_id) {
 
 bool Journal::is_journal_ready() const {
   Mutex::Locker locker(m_lock);
-  return (m_state == STATE_RECORDING);
+  return (m_state == STATE_READY);
 }
 
 bool Journal::is_journal_replaying() const {
@@ -223,76 +208,47 @@ bool Journal::is_journal_replaying() const {
 }
 
 void Journal::wait_for_journal_ready(Context *on_ready) {
-  Mutex::Locker locker(m_lock);
-  schedule_wait_for_ready(on_ready);
-}
+  on_ready = util::create_async_context_callback(m_image_ctx, on_ready);
 
-void Journal::wait_for_journal_ready() {
   Mutex::Locker locker(m_lock);
-  while (m_state != STATE_RECORDING) {
-    wait_for_state_transition();
+  if (m_state == STATE_READY) {
+    on_ready->complete(m_error_result);
+  } else {
+    wait_for_steady_state(on_ready);
   }
 }
 
-void Journal::open() {
-  Mutex::Locker locker(m_lock);
-  if (m_journaler != NULL) {
-    return;
-  }
-
+void Journal::open(Context *on_finish) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
-  create_journaler();
-}
 
-void Journal::open(Context *on_finish) {
-  open();
-  wait_for_journal_ready(on_finish);
+  on_finish = util::create_async_context_callback(m_image_ctx, on_finish);
+
+  Mutex::Locker locker(m_lock);
+  assert(m_state == STATE_UNINITIALIZED);
+  wait_for_steady_state(on_finish);
+  create_journaler();
 }
 
-int Journal::close() {
+void Journal::close(Context *on_finish) {
   CephContext *cct = m_image_ctx.cct;
-  ldout(cct, 20) << this << " " << __func__ << ": state=" << m_state << dendl;
+  ldout(cct, 20) << this << " " << __func__ << dendl;
+
+  on_finish = util::create_async_context_callback(m_image_ctx, on_finish);
 
   Mutex::Locker locker(m_lock);
-  if (m_state == STATE_UNINITIALIZED) {
-    return 0;
+  assert(m_state != STATE_UNINITIALIZED);
+  if (m_state == STATE_CLOSED) {
+    on_finish->complete(m_error_result);
+    return;
   }
 
-  int r;
-  bool done = false;
-  while (!done) {
-    switch (m_state) {
-    case STATE_UNINITIALIZED:
-      done = true;
-      break;
-    case STATE_INITIALIZING:
-    case STATE_REPLAYING:
-      m_close_pending = true;
-      wait_for_state_transition();
-      break;
-    case STATE_STOPPING_RECORDING:
-      wait_for_state_transition();
-      break;
-    case STATE_RECORDING:
-      r = stop_recording();
-      if (r < 0) {
-        return r;
-      }
-      done = true;
-      break;
-    default:
-      assert(false);
-    }
+  if (m_state == STATE_READY) {
+    stop_recording();
   }
 
-  destroy_journaler();
-  return 0;
-}
-
-void Journal::close(Context *on_finish) {
-  // TODO
-  assert(false);
+  m_close_pending = true;
+  wait_for_steady_state(on_finish);
 }
 
 uint64_t Journal::append_io_event(AioCompletion *aio_comp,
@@ -309,7 +265,7 @@ uint64_t Journal::append_io_event(AioCompletion *aio_comp,
   uint64_t tid;
   {
     Mutex::Locker locker(m_lock);
-    assert(m_state == STATE_RECORDING);
+    assert(m_state == STATE_READY);
 
     future = m_journaler->append("", bl);
 
@@ -391,7 +347,7 @@ uint64_t Journal::append_op_event(journal::EventEntry &event_entry) {
   uint64_t tid;
   {
     Mutex::Locker locker(m_lock);
-    assert(m_state == STATE_RECORDING);
+    assert(m_state == STATE_READY);
 
     Mutex::Locker event_locker(m_event_lock);
     tid = ++m_event_tid;
@@ -423,7 +379,7 @@ void Journal::commit_op_event(uint64_t tid, int r) {
 
   {
     Mutex::Locker locker(m_lock);
-    assert(m_state == STATE_RECORDING);
+    assert(m_state == STATE_READY);
 
     m_journaler->committed(m_journaler->append("", bl));
   }
@@ -477,35 +433,45 @@ void Journal::create_journaler() {
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
   assert(m_lock.is_locked());
-  assert(m_state == STATE_UNINITIALIZED);
+  assert(m_state == STATE_UNINITIALIZED || m_state == STATE_RESTARTING_REPLAY);
   assert(m_journaler == NULL);
 
-  m_close_pending = false;
+  transition_state(STATE_INITIALIZING, 0);
   m_journaler = new ::journal::Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "",
                                          m_image_ctx.journal_commit_age);
   m_journaler->init(new C_InitJournal(this));
-  transition_state(STATE_INITIALIZING);
 }
 
-void Journal::destroy_journaler() {
+void Journal::destroy_journaler(int r) {
   CephContext *cct = m_image_ctx.cct;
-  ldout(cct, 20) << this << " " << __func__ << dendl;
+  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
 
   assert(m_lock.is_locked());
 
   delete m_journal_replay;
   m_journal_replay = NULL;
 
-  m_close_pending = false;
-  m_image_ctx.op_work_queue->queue(new C_DestroyJournaler(m_journaler), 0);
-  m_journaler = NULL;
+  transition_state(STATE_CLOSING, r);
+  m_image_ctx.op_work_queue->queue(new C_DestroyJournaler(this), 0);
+}
+
+void Journal::recreate_journaler(int r) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+
+  assert(m_lock.is_locked());
+  assert(m_state == STATE_REPLAYING);
+
+  delete m_journal_replay;
+  m_journal_replay = NULL;
 
-  transition_state(STATE_UNINITIALIZED);
+  transition_state(STATE_RESTARTING_REPLAY, r);
+  m_image_ctx.op_work_queue->queue(new C_DestroyJournaler(this), 0);
 }
 
 void Journal::complete_event(Events::iterator it, int r) {
   assert(m_event_lock.is_locked());
-  assert(m_state == STATE_RECORDING);
+  assert(m_state == STATE_READY);
 
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": tid=" << it->first << " "
@@ -519,28 +485,20 @@ void Journal::complete_event(Events::iterator it, int r) {
 
 void Journal::handle_initialized(int r) {
   CephContext *cct = m_image_ctx.cct;
-  if (r < 0) {
-    lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
-    Mutex::Locker locker(m_lock);
-
-    // TODO: failed to open journal -- retry?
-    destroy_journaler();
-    create_journaler();
-    return;
-  }
+  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
 
-  ldout(cct, 20) << this << " " << __func__ << dendl;
   Mutex::Locker locker(m_lock);
-  if (m_close_pending) {
-    destroy_journaler();
+
+  if (r < 0) {
+    lderr(cct) << this << " " << __func__
+               << "failed to initialize journal: " << cpp_strerror(r)
+               << dendl;
+    destroy_journaler(r);
     return;
   }
 
-  ldout(cct, 20) << __func__ << ": Journaler" << *m_journaler << dendl;
-
+  transition_state(STATE_REPLAYING, 0);
   m_journal_replay = new JournalReplay(m_image_ctx);
-
-  transition_state(STATE_REPLAYING);
   m_journaler->start_replay(&m_replay_handler);
 }
 
@@ -554,12 +512,6 @@ void Journal::handle_replay_ready() {
   }
 
   while (true) {
-    if (m_close_pending) {
-      m_journaler->stop_replay();
-      destroy_journaler();
-      return;
-    }
-
     ::journal::ReplayEntry replay_entry;
     if (!m_journaler->try_pop_front(&replay_entry)) {
       return;
@@ -568,12 +520,21 @@ void Journal::handle_replay_ready() {
     m_lock.Unlock();
     bufferlist data = replay_entry.get_data();
     bufferlist::iterator it = data.begin();
-    int r = m_journal_replay->process(it, new C_ReplayCommitted(m_journaler,
-                                                               std::move(replay_entry)));
+    int r = m_journal_replay->process(
+      it, new C_ReplayCommitted(m_journaler, std::move(replay_entry)));
     m_lock.Lock();
 
     if (r < 0) {
-      // TODO
+      lderr(cct) << "failed to replay journal entry: " << cpp_strerror(r)
+                 << dendl;
+      m_journaler->stop_replay();
+
+      if (m_close_pending) {
+        destroy_journaler(r);
+        return;
+      }
+
+      recreate_journaler(r);
     }
   }
 }
@@ -587,36 +548,66 @@ void Journal::handle_replay_complete(int r) {
       return;
     }
 
+    ldout(cct, 20) << this << " " << __func__ << dendl;
+    m_journaler->stop_replay();
+
     if (r == 0) {
       r = m_journal_replay->flush();
     }
-    delete m_journal_replay;
-    m_journal_replay = NULL;
 
     if (r < 0) {
       lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
-
-      // TODO: failed to replay journal -- retry?
-      destroy_journaler();
-      create_journaler();
+      recreate_journaler(r);
       return;
     }
 
-    ldout(cct, 20) << this << " " << __func__ << dendl;
-    m_journaler->stop_replay();
+    delete m_journal_replay;
+    m_journal_replay = NULL;
 
     if (m_close_pending) {
-      destroy_journaler();
+      destroy_journaler(0);
       return;
     }
 
+    m_error_result = 0;
     m_journaler->start_append(m_image_ctx.journal_object_flush_interval,
                              m_image_ctx.journal_object_flush_bytes,
                              m_image_ctx.journal_object_flush_age);
-    transition_state(STATE_RECORDING);
+    transition_state(STATE_READY, 0);
+  }
+}
+
+void Journal::handle_recording_stopped(int r) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+  assert(m_state == STATE_STOPPING);
+
+  destroy_journaler(r);
+}
 
-    unblock_writes();
+void Journal::handle_journal_destroyed(int r) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+
+  if (r < 0) {
+    lderr(cct) << this << " " << __func__
+               << "error detected while closing journal: " << cpp_strerror(r)
+               << dendl;
+  }
+
+  Mutex::Locker locker(m_lock);
+  delete m_journaler;
+  m_journaler = nullptr;
+
+  assert(m_state == STATE_CLOSING || m_state == STATE_RESTARTING_REPLAY);
+  if (m_state == STATE_RESTARTING_REPLAY) {
+    create_journaler();
+    return;
   }
+
+  transition_state(STATE_CLOSED, r);
 }
 
 void Journal::handle_event_safe(int r, uint64_t tid) {
@@ -653,8 +644,6 @@ void Journal::handle_event_safe(int r, uint64_t tid) {
   } else {
     // send any waiting aio requests now that journal entry is safe
     RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
-    assert(m_image_ctx.exclusive_lock->is_lock_owner());
-
     for (AioObjectRequests::iterator it = aio_object_requests.begin();
          it != aio_object_requests.end(); ++it) {
       (*it)->send();
@@ -668,25 +657,15 @@ void Journal::handle_event_safe(int r, uint64_t tid) {
   }
 }
 
-int Journal::stop_recording() {
+void Journal::stop_recording() {
   assert(m_lock.is_locked());
   assert(m_journaler != NULL);
 
-  transition_state(STATE_STOPPING_RECORDING);
-
-  C_SaferCond cond;
-  m_lock.Unlock();
-  m_journaler->stop_append(&cond);
-  int r = cond.wait();
-  m_lock.Lock();
+  assert(m_state == STATE_READY);
+  transition_state(STATE_STOPPING, 0);
 
-  destroy_journaler();
-  if (r < 0) {
-    lderr(m_image_ctx.cct) << "failed to flush journal: " << cpp_strerror(r)
-                           << dendl;
-    return r;
-  }
-  return 0;
+  m_journaler->stop_append(util::create_async_context_callback(
+    m_image_ctx, new C_StopRecording(this)));
 }
 
 void Journal::block_writes() {
@@ -705,61 +684,49 @@ void Journal::unblock_writes() {
   }
 }
 
-void Journal::flush_journal() {
-  assert(m_lock.is_locked());
-
-  CephContext *cct = m_image_ctx.cct;
-  ldout(cct, 20) << this << " " << __func__ << dendl;
-
-  m_lock.Unlock();
-  C_SaferCond cond_ctx;
-  m_journaler->flush(&cond_ctx);
-  cond_ctx.wait();
-  m_lock.Lock();
-}
-
-void Journal::transition_state(State state) {
+void Journal::transition_state(State state, int r) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": new state=" << state << dendl;
   assert(m_lock.is_locked());
   m_state = state;
-  m_cond.Signal();
 
-  Contexts wait_for_state_contexts;
-  wait_for_state_contexts.swap(m_wait_for_state_contexts);
-  for (Contexts::iterator it = wait_for_state_contexts.begin();
-       it != wait_for_state_contexts.end(); ++it) {
-    (*it)->complete(0);
+  if (m_error_result == 0 && r < 0) {
+    m_error_result = r;
   }
-}
 
-void Journal::wait_for_state_transition() {
-  assert(m_lock.is_locked());
-  State state = m_state;
-  while (m_state == state) {
-    m_cond.Wait(m_lock);
+  if (is_steady_state()) {
+    Contexts wait_for_state_contexts(std::move(m_wait_for_state_contexts));
+    for (auto ctx : wait_for_state_contexts) {
+      ctx->complete(m_error_result);
+    }
   }
 }
 
-void Journal::schedule_wait_for_ready(Context *on_ready) {
-  CephContext *cct = m_image_ctx.cct;
-  ldout(cct, 20) << this << __func__ << ": on_ready=" << on_ready << dendl;
-
+bool Journal::is_steady_state() const {
   assert(m_lock.is_locked());
-  m_wait_for_state_contexts.push_back(new C_WaitForReady(this, on_ready));
+  switch (m_state) {
+  case STATE_READY:
+  case STATE_CLOSED:
+    return true;
+  case STATE_UNINITIALIZED:
+  case STATE_INITIALIZING:
+  case STATE_REPLAYING:
+  case STATE_RESTARTING_REPLAY:
+  case STATE_STOPPING:
+  case STATE_CLOSING:
+    break;
+  }
+  return false;
 }
 
-void Journal::handle_wait_for_ready(Context *on_ready) {
+void Journal::wait_for_steady_state(Context *on_state) {
   assert(m_lock.is_locked());
-  CephContext *cct = m_image_ctx.cct;
-  ldout(cct, 20) << this << " " << __func__ << ": on_ready=" << on_ready << ", "
-                 << "state=" << m_state << dendl;
+  assert(!is_steady_state());
 
-  if (m_state == STATE_RECORDING) {
-    m_image_ctx.op_work_queue->queue(on_ready, 0);
-  } else {
-    schedule_wait_for_ready(on_ready);
-  }
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": on_state=" << on_state
+                 << dendl;
+  m_wait_for_state_contexts.push_back(on_state);
 }
 
 std::ostream &operator<<(std::ostream &os, const Journal::State &state) {
@@ -773,11 +740,20 @@ std::ostream &operator<<(std::ostream &os, const Journal::State &state) {
   case Journal::STATE_REPLAYING:
     os << "Replaying";
     break;
-  case Journal::STATE_RECORDING:
-    os << "Recording";
+  case Journal::STATE_RESTARTING_REPLAY:
+    os << "RestartingReplay";
+    break;
+  case Journal::STATE_READY:
+    os << "Ready";
+    break;
+  case Journal::STATE_STOPPING:
+    os << "Stopping";
+    break;
+  case Journal::STATE_CLOSING:
+    os << "Closing";
     break;
-  case Journal::STATE_STOPPING_RECORDING:
-    os << "StoppingRecording";
+  case Journal::STATE_CLOSED:
+    os << "Closed";
     break;
   default:
     os << "Unknown (" << static_cast<uint32_t>(state) << ")";
index 190bf91397c22075dd1193f26710bb0f8ba4bdfd..0ea9ae428827a641a7a74c780f06058766d47665 100644 (file)
@@ -10,7 +10,6 @@
 #include "include/unordered_map.h"
 #include "include/rados/librados.hpp"
 #include "common/Mutex.h"
-#include "common/Cond.h"
 #include "journal/Future.h"
 #include "journal/ReplayHandler.h"
 #include <algorithm>
@@ -51,11 +50,8 @@ public:
   bool is_journal_replaying() const;
 
   void wait_for_journal_ready(Context *on_ready);
-  void wait_for_journal_ready();
 
-  void open();  // TODO remove
   void open(Context *on_finish);
-  int close();  // TODO remove
   void close(Context *on_finish);
 
   uint64_t append_io_event(AioCompletion *aio_comp,
@@ -77,12 +73,42 @@ private:
   typedef std::list<Context *> Contexts;
   typedef interval_set<uint64_t> ExtentInterval;
 
+  /**
+   * @verbatim
+   *
+   * <start>
+   *    |
+   *    v
+   * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> READY
+   *    |                 *  .  ^             *  .            |
+   *    |                 *  .  |             *  .            |
+   *    |                 *  .  |    (error)  *  . . . .      |
+   *    |                 *  .  |             *        .      |
+   *    |                 *  .  |             v        .      v
+   *    |                 *  .  |         RESTARTING   .    STOPPING
+   *    |                 *  .  |             |        .      |
+   *    |                 *  .  |             |        .      |
+   *    |       * * * * * *  .  \-------------/        .      |
+   *    |       * (error)    .                         .      |
+   *    |       *            .   . . . . . . . . . . . .      |
+   *    |       *            .   .                            |
+   *    |       v            v   v                            |
+   *    |     CLOSED <----- CLOSING <-------------------------/
+   *    |       |
+   *    |       v
+   *    \---> <finish>
+   *
+   * @endverbatim
+   */
   enum State {
     STATE_UNINITIALIZED,
     STATE_INITIALIZING,
     STATE_REPLAYING,
-    STATE_RECORDING,
-    STATE_STOPPING_RECORDING
+    STATE_RESTARTING_REPLAY,
+    STATE_READY,
+    STATE_STOPPING,
+    STATE_CLOSING,
+    STATE_CLOSED
   };
 
   struct Event {
@@ -118,29 +144,38 @@ private:
     }
   };
 
-  struct C_EventSafe : public Context {
+  struct C_StopRecording : public Context {
     Journal *journal;
-    uint64_t tid;
 
-    C_EventSafe(Journal *_journal, uint64_t _tid)
-      : journal(_journal), tid(_tid) {
+    C_StopRecording(Journal *_journal) : journal(_journal) {
     }
 
     virtual void finish(int r) {
-      journal->handle_event_safe(r, tid);
+      journal->handle_recording_stopped(r);
     }
   };
 
-  struct C_WaitForReady : public Context {
+  struct C_DestroyJournaler : public Context {
     Journal *journal;
-    Context *on_ready;
 
-    C_WaitForReady(Journal *_journal, Context *_on_ready)
-      : journal(_journal), on_ready(_on_ready) {
+    C_DestroyJournaler(Journal *_journal) : journal(_journal) {
     }
 
     virtual void finish(int r) {
-      journal->handle_wait_for_ready(on_ready);
+      journal->handle_journal_destroyed(r);
+    }
+  };
+
+  struct C_EventSafe : public Context {
+    Journal *journal;
+    uint64_t tid;
+
+    C_EventSafe(Journal *_journal, uint64_t _tid)
+      : journal(_journal), tid(_tid) {
+    }
+
+    virtual void finish(int r) {
+      journal->handle_event_safe(r, tid);
     }
   };
 
@@ -168,9 +203,9 @@ private:
 
   ::journal::Journaler *m_journaler;
   mutable Mutex m_lock;
-  Cond m_cond;
   State m_state;
 
+  int m_error_result;
   Contexts m_wait_for_state_contexts;
 
   ReplayHandler m_replay_handler;
@@ -187,7 +222,8 @@ private:
   ::journal::Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
 
   void create_journaler();
-  void destroy_journaler();
+  void destroy_journaler(int r);
+  void recreate_journaler(int r);
 
   void complete_event(Events::iterator it, int r);
 
@@ -196,18 +232,21 @@ private:
   void handle_replay_ready();
   void handle_replay_complete(int r);
 
+  void handle_recording_stopped(int r);
+
+  void handle_journal_destroyed(int r);
+
   void handle_event_safe(int r, uint64_t tid);
 
-  int stop_recording();
+  void stop_recording();
 
   void block_writes();
   void unblock_writes();
 
-  void flush_journal();
-  void transition_state(State state);
-  void wait_for_state_transition();
-  void schedule_wait_for_ready(Context *on_ready);
-  void handle_wait_for_ready(Context *on_ready);
+  void transition_state(State state, int r);
+
+  bool is_steady_state() const;
+  void wait_for_steady_state(Context *on_state);
 
   friend std::ostream &operator<<(std::ostream &os, const State &state);
 };
index 2ff9e5dfd2ee59786dbacc6eab2e020e7e142f3f..acf6d959bc84ca7e1d543406246dcbfa9fbab3b3 100644 (file)
@@ -2310,11 +2310,22 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
        return -EROFS;
       }
 
+      ictx->snap_lock.get_read();
       if (ictx->journal != NULL) {
-        ictx->journal->wait_for_journal_ready();
+        C_SaferCond journal_ctx;
+        ictx->journal->wait_for_journal_ready(&journal_ctx);
+
+        ictx->snap_lock.put_read();
+        r = journal_ctx.wait();
+        if (r < 0) {
+          lderr(cct) << "Failed to initialize journal: " << cpp_strerror(r)
+                     << dendl;
+          return r;
+        }
+
+        ictx->snap_lock.get_read();
       }
 
-      ictx->snap_lock.get_read();
       new_size = ictx->get_image_size(snap_id);
       ictx->snap_lock.put_read();
     }