]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: asynchronously replay journal events
authorJason Dillaman <dillaman@redhat.com>
Tue, 22 Dec 2015 01:27:36 +0000 (20:27 -0500)
committerJason Dillaman <dillaman@redhat.com>
Fri, 15 Jan 2016 15:40:29 +0000 (10:40 -0500)
Properly track when AIO operations are safely committed to
disk before marking the journal events are committed.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/Journal.cc
src/librbd/Journal.h
src/librbd/journal/Replay.cc
src/librbd/journal/Replay.h

index 6b050556e7497a4b0f0c6c2fabde9dfa7f8e60fd..b30a34a5ef5d5d9b9fc1e7a88728d3a90fdc22d8 100644 (file)
@@ -27,23 +27,6 @@ namespace {
 
 const std::string CLIENT_DESCRIPTION = "master image";
 
-template <typename ImageCtxT>
-struct C_ReplayCommitted : public Context {
-  typedef journal::TypeTraits<ImageCtxT> TypeTraits;
-  typedef typename TypeTraits::Journaler Journaler;
-  typedef typename TypeTraits::ReplayEntry ReplayEntry;
-
-  Journaler *journaler;
-  ReplayEntry replay_entry;
-
-  C_ReplayCommitted(Journaler *journaler, ReplayEntry &&replay_entry) :
-    journaler(journaler), replay_entry(std::move(replay_entry)) {
-  }
-  virtual void finish(int r) {
-    journaler->committed(replay_entry);
-  }
-};
-
 } // anonymous namespace
 
 template <typename I>
@@ -569,78 +552,100 @@ void Journal<I>::handle_initialized(int r) {
 
 template <typename I>
 void Journal<I>::handle_replay_ready() {
+  ReplayEntry replay_entry;
+  {
+    Mutex::Locker locker(m_lock);
+    if (m_state != STATE_REPLAYING) {
+      return;
+    }
+
+    CephContext *cct = m_image_ctx.cct;
+    ldout(cct, 20) << this << " " << __func__ << dendl;
+    if (!m_journaler->try_pop_front(&replay_entry)) {
+      return;
+    }
+  }
+
+  bufferlist data = replay_entry.get_data();
+  bufferlist::iterator it = data.begin();
+  Context *on_ready = create_context_callback<
+      Journal<I>, &Journal<I>::handle_replay_process_ready>(this);
+  Context *on_commit = new C_ReplayProcessSafe(this, std::move(replay_entry));
+
+  m_journal_replay->process(&it, on_ready, on_commit);
+}
+
+template <typename I>
+void Journal<I>::handle_replay_complete(int r) {
   CephContext *cct = m_image_ctx.cct;
-  ldout(cct, 20) << this << " " << __func__ << dendl;
 
   Mutex::Locker locker(m_lock);
   if (m_state != STATE_REPLAYING) {
     return;
   }
 
-  while (true) {
-    ReplayEntry replay_entry;
-    if (!m_journaler->try_pop_front(&replay_entry)) {
-      return;
-    }
-
-    m_lock.Unlock();
-    bufferlist data = replay_entry.get_data();
-    bufferlist::iterator it = data.begin();
+  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+  m_journaler->stop_replay();
+  if (r < 0) {
+    transition_state(STATE_FLUSHING_RESTART, r);
 
-    Context *on_commit = new C_ReplayCommitted<I>(m_journaler,
-                                                  std::move(replay_entry));
-    int r = m_journal_replay->process(it, on_commit);
-    m_lock.Lock();
+    m_journal_replay->flush(create_context_callback<
+      Journal<I>, &Journal<I>::handle_flushing_restart>(this));
+  } else {
+    transition_state(STATE_FLUSHING_REPLAY, 0);
 
-    if (r < 0) {
-      lderr(cct) << "failed to replay journal entry: " << cpp_strerror(r)
-                 << dendl;
-      delete on_commit;
+    m_journal_replay->flush(create_context_callback<
+      Journal<I>, &Journal<I>::handle_flushing_replay>(this));
+  }
+}
 
-      m_journaler->stop_replay();
+template <typename I>
+void Journal<I>::handle_replay_process_ready(int r) {
+  // journal::Replay is ready for more events -- attempt to pop another
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << dendl;
 
-      transition_state(STATE_FLUSHING_RESTART, r);
-      m_journal_replay->flush(create_async_context_callback(
-        m_image_ctx, create_context_callback<
-          Journal<I>, &Journal<I>::handle_flushing_restart>(this)));
-      return;
-    }
-  }
+  assert(r == 0);
+  handle_replay_ready();
 }
 
 template <typename I>
-void Journal<I>::handle_replay_complete(int r) {
+void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
+  Mutex::Locker locker(m_lock);
+
   CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+  if (r < 0) {
+    lderr(cct) << "failed to commit journal event to disk: " << cpp_strerror(r)
+               << dendl;
 
-  {
-    Mutex::Locker locker(m_lock);
-    if (m_state != STATE_REPLAYING) {
-      return;
-    }
+    if (m_state == STATE_REPLAYING) {
+      // abort the replay if we have an error
+      m_journaler->stop_replay();
+      transition_state(STATE_FLUSHING_RESTART, r);
 
-    ldout(cct, 20) << this << " " << __func__ << dendl;
-    m_journaler->stop_replay();
-    if (r < 0) {
+      m_journal_replay->flush(create_context_callback<
+        Journal<I>, &Journal<I>::handle_flushing_restart>(this));
+      return;
+    } else if (m_state == STATE_FLUSHING_REPLAY) {
+      // end-of-replay flush in-progress -- we need to restart replay
       transition_state(STATE_FLUSHING_RESTART, r);
-      m_journal_replay->flush(create_async_context_callback(
-        m_image_ctx, create_context_callback<
-          Journal<I>, &Journal<I>::handle_flushing_restart>(this)));
       return;
     }
-
-    transition_state(STATE_FLUSHING_REPLAY, 0);
-    m_journal_replay->flush(create_async_context_callback(
-      m_image_ctx, create_context_callback<
-        Journal<I>, &Journal<I>::handle_flushing_replay>(this)));
+  } else {
+    // only commit the entry if written successfully
+    m_journaler->committed(replay_entry);
   }
 }
 
 template <typename I>
 void Journal<I>::handle_flushing_restart(int r) {
+  Mutex::Locker locker(m_lock);
+
   CephContext *cct = m_image_ctx.cct;
-  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+  ldout(cct, 20) << this << " " << __func__ << dendl;
 
-  Mutex::Locker locker(m_lock);
+  assert(r == 0);
   assert(m_state == STATE_FLUSHING_RESTART);
   if (m_close_pending) {
     destroy_journaler(r);
@@ -652,19 +657,19 @@ void Journal<I>::handle_flushing_restart(int r) {
 
 template <typename I>
 void Journal<I>::handle_flushing_replay(int r) {
+  Mutex::Locker locker(m_lock);
+
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
 
-  Mutex::Locker locker(m_lock);
-  assert(m_state == STATE_FLUSHING_REPLAY);
+  assert(r == 0);
+  assert(m_state == STATE_FLUSHING_REPLAY || m_state == STATE_FLUSHING_RESTART);
   if (m_close_pending) {
     destroy_journaler(r);
     return;
-  }
-
-  if (r < 0) {
-    lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
-    recreate_journaler(r);
+  } else if (m_state == STATE_FLUSHING_RESTART) {
+    // failed to replay one-or-more events -- restart
+    recreate_journaler(0);
     return;
   }
 
index 1740bc6a67616a317b00eb09146a83ce1eec16ce..eda2eb92eed6d5091c9180c7059c4c5e3840ecb8 100644 (file)
@@ -12,6 +12,7 @@
 #include "include/rados/librados.hpp"
 #include "common/Mutex.h"
 #include "journal/Future.h"
+#include "journal/ReplayEntry.h"
 #include "journal/ReplayHandler.h"
 #include <algorithm>
 #include <iosfwd>
@@ -21,7 +22,6 @@
 class Context;
 namespace journal {
 class Journaler;
-class ReplayEntry;
 }
 
 namespace librbd {
@@ -198,6 +198,18 @@ private:
     }
   };
 
+  struct C_ReplayProcessSafe : public Context {
+    Journal *journal;
+    ReplayEntry replay_entry;
+
+    C_ReplayProcessSafe(Journal *journal, ReplayEntry &&replay_entry) :
+      journal(journal), replay_entry(std::move(replay_entry)) {
+    }
+    virtual void finish(int r) {
+      journal->handle_replay_process_safe(replay_entry, r);
+    }
+  };
+
   struct ReplayHandler : public ::journal::ReplayHandler {
     Journal *journal;
     ReplayHandler(Journal *_journal) : journal(_journal) {
@@ -250,6 +262,8 @@ private:
 
   void handle_replay_ready();
   void handle_replay_complete(int r);
+  void handle_replay_process_ready(int r);
+  void handle_replay_process_safe(ReplayEntry replay_entry, int r);
 
   void handle_flushing_restart(int r);
   void handle_flushing_replay(int r);
index c2b5eabe074121d7c6bad6142e156d698f857120..95a412f519d7f128166380888fa2de1510ea7986 100644 (file)
@@ -2,6 +2,8 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "librbd/journal/Replay.h"
+#include "common/dout.h"
+#include "common/errno.h"
 #include "common/WorkQueue.h"
 #include "librbd/AioCompletion.h"
 #include "librbd/AioImageRequest.h"
@@ -19,38 +21,45 @@ namespace journal {
 
 namespace {
 
+static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
+static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);
+
 static NoOpProgressContext no_op_progress_callback;
 
 } // anonymous namespace
 
-
 template <typename I>
 Replay<I>::Replay(I &image_ctx)
-  : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock"), m_flush_ctx(nullptr),
-    m_ret_val(0) {
+  : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock") {
 }
 
 template <typename I>
 Replay<I>::~Replay() {
-  assert(m_op_contexts.empty() && m_aio_completions.empty());
+  assert(m_in_flight_aio == 0);
+  assert(m_aio_modify_unsafe_contexts.empty());
+  assert(m_aio_modify_safe_contexts.empty());
+  assert(m_op_contexts.empty());
 }
 
 template <typename I>
-int Replay<I>::process(bufferlist::iterator it, Context *on_safe) {
+void Replay<I>::process(bufferlist::iterator *it, Context *on_ready,
+                        Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
-  RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
   journal::EventEntry event_entry;
   try {
-    ::decode(event_entry, it);
+    ::decode(event_entry, *it);
   } catch (const buffer::error &err) {
     lderr(cct) << "failed to decode event entry: " << err.what() << dendl;
-    return -EINVAL;
+    on_ready->complete(-EINVAL);
+    return;
   }
 
-  boost::apply_visitor(EventVisitor(this, on_safe), event_entry.event);
-  return 0;
+  Mutex::Locker locker(m_lock);
+  RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
+  boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
+                       event_entry.event);
 }
 
 template <typename I>
@@ -65,108 +74,127 @@ void Replay<I>::flush(Context *on_finish) {
     assert(m_flush_ctx == nullptr);
     m_flush_ctx = on_finish;
 
-    if (!m_op_contexts.empty() || !m_aio_completions.empty()) {
+    if (m_in_flight_aio != 0) {
+      flush_aio();
+    }
+
+    if (!m_op_contexts.empty() || m_in_flight_aio != 0) {
       return;
     }
   }
-  on_finish->complete(m_ret_val);
+  on_finish->complete(0);
 }
 
 template <typename I>
 void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
-                             Context *on_safe) {
+                             Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": AIO discard event" << dendl;
 
-  AioCompletion *aio_comp = create_aio_completion(on_safe);
+  bool flush_required;
+  AioCompletion *aio_comp = create_aio_modify_completion(on_ready, on_safe,
+                                                         &flush_required);
   AioImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, event.offset,
                                   event.length);
+  if (flush_required) {
+    flush_aio();
+  }
 }
 
 template <typename I>
 void Replay<I>::handle_event(const journal::AioWriteEvent &event,
-                             Context *on_safe) {
+                             Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": AIO write event" << dendl;
 
   bufferlist data = event.data;
-  AioCompletion *aio_comp = create_aio_completion(on_safe);
+  bool flush_required;
+  AioCompletion *aio_comp = create_aio_modify_completion(on_ready, on_safe,
+                                                         &flush_required);
   AioImageRequest<I>::aio_write(&m_image_ctx, aio_comp, event.offset,
                                 event.length, data.c_str(), 0);
+  if (flush_required) {
+    flush_aio();
+  }
 }
 
 template <typename I>
 void Replay<I>::handle_event(const journal::AioFlushEvent &event,
-                            Context *on_safe) {
+                            Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": AIO flush event" << dendl;
 
-  AioCompletion *aio_comp = create_aio_completion(on_safe);
+  AioCompletion *aio_comp = create_aio_flush_completion(on_ready, on_safe);
   AioImageRequest<I>::aio_flush(&m_image_ctx, aio_comp);
 }
 
 template <typename I>
 void Replay<I>::handle_event(const journal::OpFinishEvent &event,
-                             Context *on_safe) {
+                             Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Op finish event" << dendl;
 }
 
 template <typename I>
 void Replay<I>::handle_event(const journal::SnapCreateEvent &event,
-                            Context *on_safe) {
+                            Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Snap create event" << dendl;
 
+  // TODO not-ready until state machine lets us know
   Context *on_finish = create_op_context_callback(on_safe);
   m_image_ctx.operations->snap_create(event.snap_name.c_str(), on_finish);
 }
 
 template <typename I>
 void Replay<I>::handle_event(const journal::SnapRemoveEvent &event,
-                            Context *on_safe) {
+                            Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Snap remove event" << dendl;
 
   Context *on_finish = create_op_context_callback(on_safe);
   m_image_ctx.operations->snap_remove(event.snap_name.c_str(), on_finish);
+  on_ready->complete(0);
 }
 
 template <typename I>
 void Replay<I>::handle_event(const journal::SnapRenameEvent &event,
-                            Context *on_safe) {
+                            Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Snap rename event" << dendl;
 
   Context *on_finish = create_op_context_callback(on_safe);
   m_image_ctx.operations->snap_rename(event.snap_id, event.snap_name.c_str(),
                                       on_finish);
+  on_ready->complete(0);
 }
 
 template <typename I>
 void Replay<I>::handle_event(const journal::SnapProtectEvent &event,
-                            Context *on_safe) {
+                            Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Snap protect event" << dendl;
 
   Context *on_finish = create_op_context_callback(on_safe);
   m_image_ctx.operations->snap_protect(event.snap_name.c_str(), on_finish);
+  on_ready->complete(0);
 }
 
 template <typename I>
 void Replay<I>::handle_event(const journal::SnapUnprotectEvent &event,
-                            Context *on_safe) {
+                            Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Snap unprotect event"
                  << dendl;
 
   Context *on_finish = create_op_context_callback(on_safe);
   m_image_ctx.operations->snap_unprotect(event.snap_name.c_str(), on_finish);
+  on_ready->complete(0);
 }
 
 template <typename I>
 void Replay<I>::handle_event(const journal::SnapRollbackEvent &event,
-                            Context *on_safe) {
+                            Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Snap rollback start event"
                  << dendl;
@@ -174,24 +202,27 @@ void Replay<I>::handle_event(const journal::SnapRollbackEvent &event,
   Context *on_finish = create_op_context_callback(on_safe);
   m_image_ctx.operations->snap_rollback(event.snap_name.c_str(),
                                         no_op_progress_callback, on_finish);
+  on_ready->complete(0);
 }
 
 template <typename I>
 void Replay<I>::handle_event(const journal::RenameEvent &event,
-                            Context *on_safe) {
+                            Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Rename event" << dendl;
 
   Context *on_finish = create_op_context_callback(on_safe);
   m_image_ctx.operations->rename(event.image_name.c_str(), on_finish);
+  on_ready->complete(0);
 }
 
 template <typename I>
 void Replay<I>::handle_event(const journal::ResizeEvent &event,
-                            Context *on_safe) {
+                            Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Resize start event" << dendl;
 
+  // TODO not-ready until state machine lets us know
   Context *on_finish = create_op_context_callback(on_safe);
   m_image_ctx.operations->resize(event.size, no_op_progress_callback,
                                  on_finish);
@@ -199,108 +230,186 @@ void Replay<I>::handle_event(const journal::ResizeEvent &event,
 
 template <typename I>
 void Replay<I>::handle_event(const journal::FlattenEvent &event,
-                            Context *on_safe) {
+                            Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": Flatten start event" << dendl;
 
   Context *on_finish = create_op_context_callback(on_safe);
   m_image_ctx.operations->flatten(no_op_progress_callback, on_finish);
+  on_ready->complete(0);
 }
 
 template <typename I>
 void Replay<I>::handle_event(const journal::UnknownEvent &event,
-                            Context *on_safe) {
+                            Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": unknown event" << dendl;
+  on_ready->complete(0);
   on_safe->complete(0);
 }
 
 template <typename I>
-Context *Replay<I>::create_op_context_callback(Context *on_safe) {
-  C_OpOnFinish *on_finish;
-  {
-    on_finish = new C_OpOnFinish(this);
-    m_op_contexts[on_finish] = on_safe;
+void Replay<I>::flush_aio() {
+  assert(m_lock.is_locked());
+
+  AioCompletion *aio_comp = create_aio_flush_completion(nullptr, nullptr);
+  AioImageRequest<I>::aio_flush(&m_image_ctx, aio_comp);
+}
+
+template <typename I>
+void Replay<I>::handle_aio_modify_complete(Context *on_safe, int r) {
+  Mutex::Locker locker(m_lock);
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": on_safe=" << on_safe << ", "
+                 << "r=" << r << dendl;
+
+  if (r < 0) {
+    lderr(cct) << "AIO modify op failed: " << cpp_strerror(r) << dendl;
+    on_safe->complete(r);
+    return;
   }
-  return on_finish;
+
+  // will be completed after next flush operation completes
+  m_aio_modify_safe_contexts.insert(on_safe);
 }
 
 template <typename I>
-void Replay<I>::handle_op_context_callback(Context *on_op_finish, int r) {
-  Context *on_safe = nullptr;
+void Replay<I>::handle_aio_flush_complete(Context *on_flush_safe,
+                                          Contexts &on_safe_ctxs, int r) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": " << "r=" << r << dendl;
+
+  if (r < 0) {
+    lderr(cct) << "AIO flush failed: " << cpp_strerror(r) << dendl;
+  }
+
+  Context *on_aio_ready = nullptr;
   Context *on_flush = nullptr;
   {
     Mutex::Locker locker(m_lock);
-    auto it = m_op_contexts.find(on_op_finish);
-    assert(it != m_op_contexts.end());
+    assert(m_in_flight_aio >= on_safe_ctxs.size());
+    m_in_flight_aio -= on_safe_ctxs.size();
 
-    if (m_ret_val == 0 && r < 0) {
-      m_ret_val = r;
+    std::swap(on_aio_ready, m_on_aio_ready);
+    if (m_op_contexts.empty() && m_in_flight_aio == 0) {
+      on_flush = m_flush_ctx;
     }
 
-    on_safe = it->second;
-    m_op_contexts.erase(it);
-    if (m_op_contexts.empty() && m_aio_completions.empty()) {
-      on_flush = m_flush_ctx;
+    // strip out previously failed on_safe contexts
+    for (auto it = on_safe_ctxs.begin(); it != on_safe_ctxs.end(); ) {
+      if (m_aio_modify_safe_contexts.erase(*it)) {
+        ++it;
+      } else {
+        it = on_safe_ctxs.erase(it);
+      }
     }
   }
 
-  on_safe->complete(r);
+  if (on_aio_ready != nullptr) {
+    ldout(cct, 10) << "resuming paused AIO" << dendl;
+    on_aio_ready->complete(0);
+  }
+  for (auto ctx : on_safe_ctxs) {
+    ctx->complete(r);
+  }
+  if (on_flush_safe != nullptr) {
+    on_flush_safe->complete(r);
+  }
   if (on_flush != nullptr) {
-    on_flush->complete(m_ret_val);
+    on_flush->complete(r);
   }
 }
 
 template <typename I>
-AioCompletion *Replay<I>::create_aio_completion(Context *on_safe) {
-  Mutex::Locker locker(m_lock);
-  AioCompletion *aio_comp = AioCompletion::create(this, aio_completion_callback,
-                                                  nullptr);
-  m_aio_completions.insert(std::pair<AioCompletion*,Context*>(
-                            aio_comp, on_safe));
-  return aio_comp;
+Context *Replay<I>::create_op_context_callback(Context *on_safe) {
+  assert(m_lock.is_locked());
+
+  C_OpOnFinish *on_finish;
+  {
+    on_finish = new C_OpOnFinish(this);
+    m_op_contexts[on_finish] = on_safe;
+  }
+  return on_finish;
 }
 
 template <typename I>
-void Replay<I>::handle_aio_completion(AioCompletion *aio_comp) {
-  int r;
+void Replay<I>::handle_op_context_callback(Context *on_op_finish, int r) {
   Context *on_safe = nullptr;
   Context *on_flush = nullptr;
   {
     Mutex::Locker locker(m_lock);
-    AioCompletions::iterator it = m_aio_completions.find(aio_comp);
-    assert(it != m_aio_completions.end());
-
-    r = aio_comp->get_return_value();
-    if (m_ret_val == 0 && r < 0) {
-      m_ret_val = r;
-    }
-
-    CephContext *cct = m_image_ctx.cct;
-    ldout(cct, 20) << this << " " << __func__ << ": "
-                   << "aio_comp=" << aio_comp << ", "
-                   << "r=" << r << dendl;
+    auto it = m_op_contexts.find(on_op_finish);
+    assert(it != m_op_contexts.end());
 
     on_safe = it->second;
-    m_aio_completions.erase(it);
-    if (m_op_contexts.empty() && m_aio_completions.empty()) {
+    m_op_contexts.erase(it);
+    if (m_op_contexts.empty() && m_in_flight_aio == 0) {
       on_flush = m_flush_ctx;
     }
   }
 
   on_safe->complete(r);
   if (on_flush != nullptr) {
-    on_flush->complete(m_ret_val);
+    on_flush->complete(0);
   }
 }
 
 template <typename I>
-void Replay<I>::aio_completion_callback(completion_t cb, void *arg) {
-  Replay *replay = reinterpret_cast<Replay *>(arg);
-  AioCompletion *aio_comp = reinterpret_cast<AioCompletion *>(cb);
+AioCompletion *Replay<I>::create_aio_modify_completion(Context *on_ready,
+                                                       Context *on_safe,
+                                                       bool *flush_required) {
+  CephContext *cct = m_image_ctx.cct;
+  assert(m_lock.is_locked());
+  assert(m_on_aio_ready == nullptr);
+
+  ++m_in_flight_aio;
+  m_aio_modify_unsafe_contexts.push_back(on_safe);
+
+  // FLUSH if we hit the low-water mark -- on_safe contexts are
+  // completed by flushes-only so that we don't move the journal
+  // commit position until safely on-disk
+
+  // when safe, the completion of the next flush will fire the on_safe
+  // callback
+  AioCompletion *aio_comp = AioCompletion::create<Context>(
+    new C_AioModifyComplete(this, on_safe));
+
+  *flush_required = (m_aio_modify_unsafe_contexts.size() ==
+                       IN_FLIGHT_IO_LOW_WATER_MARK);
+  if (*flush_required) {
+    ldout(cct, 10) << "hit AIO replay low-water mark: scheduling flush"
+                   << dendl;
+  }
 
-  replay->handle_aio_completion(aio_comp);
-  aio_comp->release();
+  // READY for more events if:
+  // * not at high-water mark for IO
+  // * in-flight ops are at a consistent point (snap create has IO flushed,
+  //   shrink has adjusted clip boundary, etc) -- should have already been
+  //   flagged not-ready
+  if (m_in_flight_aio == IN_FLIGHT_IO_HIGH_WATER_MARK) {
+    ldout(cct, 10) << "hit AIO replay high-water mark: pausing replay"
+                   << dendl;
+    m_on_aio_ready = on_ready;
+  } else {
+    on_ready->complete(0);
+  }
+
+  return aio_comp;
+}
+
+template <typename I>
+AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_ready,
+                                                      Context *on_safe) {
+  // associate all prior write/discard ops to this flush request
+  AioCompletion *aio_comp = AioCompletion::create<Context>(
+      new C_AioFlushComplete(this, on_safe,
+                             std::move(m_aio_modify_unsafe_contexts)));
+  m_aio_modify_unsafe_contexts.clear();
+
+  if (on_ready != nullptr) {
+    on_ready->complete(0);
+  }
+  return aio_comp;
 }
 
 } // namespace journal
index 5a8c559866d6fa87edb6a3a36b1fabf8990e2f16..c71157da04dc0e0d2d593f236846a3483b838ec9 100644 (file)
@@ -7,12 +7,13 @@
 #include "include/int_types.h"
 #include "include/buffer_fwd.h"
 #include "include/Context.h"
+#include "include/unordered_set.h"
 #include "include/unordered_map.h"
 #include "include/rbd/librbd.hpp"
 #include "common/Mutex.h"
 #include "librbd/journal/Entries.h"
 #include <boost/variant.hpp>
-#include <map>
+#include <list>
 
 namespace librbd {
 
@@ -31,12 +32,13 @@ public:
   Replay(ImageCtxT &image_ctx);
   ~Replay();
 
-  int process(bufferlist::iterator it, Context *on_safe = NULL);
+  void process(bufferlist::iterator *it, Context *on_ready, Context *on_safe);
   void flush(Context *on_finish);
 
 private:
+  typedef std::list<Context *> Contexts;
+  typedef ceph::unordered_set<Context *> ContextSet;
   typedef ceph::unordered_map<Context *, Context *> OpContexts;
-  typedef std::map<AioCompletion*,Context*> AioCompletions;
 
   struct C_OpOnFinish : public Context {
     Replay *replay;
@@ -47,17 +49,43 @@ private:
     }
   };
 
+  struct C_AioModifyComplete : public Context {
+    Replay *replay;
+    Context *on_safe;
+    C_AioModifyComplete(Replay *replay, Context *on_safe)
+      : replay(replay), on_safe(on_safe) {
+    }
+    virtual void finish(int r) {
+      replay->handle_aio_modify_complete(on_safe, r);
+    }
+  };
+
+  struct C_AioFlushComplete : public Context {
+    Replay *replay;
+    Context *on_flush_safe;
+    Contexts on_safe_ctxs;
+    C_AioFlushComplete(Replay *replay, Context *on_flush_safe,
+                       Contexts &&on_safe_ctxs)
+      : replay(replay), on_flush_safe(on_flush_safe),
+        on_safe_ctxs(on_safe_ctxs) {
+    }
+    virtual void finish(int r) {
+      replay->handle_aio_flush_complete(on_flush_safe, on_safe_ctxs, r);
+    }
+  };
+
   struct EventVisitor : public boost::static_visitor<void> {
     Replay *replay;
+    Context *on_ready;
     Context *on_safe;
 
-    EventVisitor(Replay *_replay, Context *_on_safe)
-      : replay(_replay), on_safe(_on_safe) {
+    EventVisitor(Replay *_replay, Context *_on_ready, Context *_on_safe)
+      : replay(_replay), on_ready(_on_ready), on_safe(_on_safe) {
     }
 
     template <typename Event>
     inline void operator()(const Event &event) const {
-      replay->handle_event(event, on_safe);
+      replay->handle_event(event, on_ready, on_safe);
     }
   };
 
@@ -65,33 +93,59 @@ private:
 
   Mutex m_lock;
 
+  uint64_t m_in_flight_aio = 0;
+  Contexts m_aio_modify_unsafe_contexts;
+  ContextSet m_aio_modify_safe_contexts;
+
   OpContexts m_op_contexts;
-  AioCompletions m_aio_completions;
-  Context *m_flush_ctx;
-  int m_ret_val;
-
-  void handle_event(const AioDiscardEvent &event, Context *on_safe);
-  void handle_event(const AioWriteEvent &event, Context *on_safe);
-  void handle_event(const AioFlushEvent &event, Context *on_safe);
-  void handle_event(const OpFinishEvent &event, Context *on_safe);
-  void handle_event(const SnapCreateEvent &event, Context *on_safe);
-  void handle_event(const SnapRemoveEvent &event, Context *on_safe);
-  void handle_event(const SnapRenameEvent &event, Context *on_safe);
-  void handle_event(const SnapProtectEvent &event, Context *on_safe);
-  void handle_event(const SnapUnprotectEvent &event, Context *on_safe);
-  void handle_event(const SnapRollbackEvent &event, Context *on_safe);
-  void handle_event(const RenameEvent &event, Context *on_safe);
-  void handle_event(const ResizeEvent &event, Context *on_safe);
-  void handle_event(const FlattenEvent &event, Context *on_safe);
-  void handle_event(const UnknownEvent &event, Context *on_safe);
+
+  Context *m_flush_ctx = nullptr;
+  Context *m_on_aio_ready = nullptr;
+
+  void handle_event(const AioDiscardEvent &event, Context *on_ready,
+                    Context *on_safe);
+  void handle_event(const AioWriteEvent &event, Context *on_ready,
+                    Context *on_safe);
+  void handle_event(const AioFlushEvent &event, Context *on_ready,
+                    Context *on_safe);
+  void handle_event(const OpFinishEvent &event, Context *on_ready,
+                    Context *on_safe);
+  void handle_event(const SnapCreateEvent &event, Context *on_ready,
+                    Context *on_safe);
+  void handle_event(const SnapRemoveEvent &event, Context *on_ready,
+                    Context *on_safe);
+  void handle_event(const SnapRenameEvent &event, Context *on_ready,
+                    Context *on_safe);
+  void handle_event(const SnapProtectEvent &event, Context *on_ready,
+                    Context *on_safe);
+  void handle_event(const SnapUnprotectEvent &event, Context *on_ready,
+                    Context *on_safe);
+  void handle_event(const SnapRollbackEvent &event, Context *on_ready,
+                    Context *on_safe);
+  void handle_event(const RenameEvent &event, Context *on_ready,
+                    Context *on_safe);
+  void handle_event(const ResizeEvent &event, Context *on_ready,
+                    Context *on_safe);
+  void handle_event(const FlattenEvent &event, Context *on_ready,
+                    Context *on_safe);
+  void handle_event(const UnknownEvent &event, Context *on_ready,
+                    Context *on_safe);
+
+  void flush_aio();
+  void handle_aio_modify_complete(Context *on_safe, int r);
+  void handle_aio_flush_complete(Context *on_flush_safe, Contexts &on_safe_ctxs,
+                                 int r);
 
   Context *create_op_context_callback(Context *on_safe);
   void handle_op_context_callback(Context *on_op_finish, int r);
 
-  AioCompletion *create_aio_completion(Context *on_safe);
+  AioCompletion *create_aio_modify_completion(Context *on_ready,
+                                              Context *on_safe,
+                                              bool *flush_required);
+  AioCompletion *create_aio_flush_completion(Context *on_ready,
+                                             Context *on_safe);
   void handle_aio_completion(AioCompletion *aio_comp);
 
-  static void aio_completion_callback(completion_t cb, void *arg);
 };
 
 } // namespace journal