]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: add replay support for IO events
authorJason Dillaman <dillaman@redhat.com>
Wed, 22 Jul 2015 02:49:04 +0000 (22:49 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 13 Nov 2015 04:27:06 +0000 (23:27 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/AioImageRequest.cc
src/librbd/Journal.cc
src/librbd/Journal.h
src/librbd/JournalReplay.cc [new file with mode: 0644]
src/librbd/JournalReplay.h [new file with mode: 0644]
src/librbd/Makefile.am

index daa99ecad9b301f51299bc079021d9565821163d..f594f61f9cad0b649c297577d61d9eb32ae9c7f8 100644 (file)
@@ -237,7 +237,8 @@ void AbstractAioImageWrite::send_request() {
                                object_extents);
     }
 
-    journaling = (m_image_ctx.journal != NULL);
+    journaling = (m_image_ctx.journal != NULL &&
+                  !m_image_ctx.journal->is_journal_replaying());
   }
 
   assert(!m_image_ctx.image_watcher->is_lock_supported() ||
@@ -417,7 +418,8 @@ void AioImageFlush::send_request() {
   {
     // journal the flush event
     RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
-    if (m_image_ctx.journal != NULL) {
+    if (m_image_ctx.journal != NULL &&
+        !m_image_ctx.journal->is_journal_replaying()) {
       uint64_t journal_tid = m_image_ctx.journal->append_event(
         m_aio_comp, journal::EventEntry(journal::AioFlushEvent()),
         AioObjectRequests(), 0, 0, false);
index a6a1394082dababff85bb0bd2680e27765f4366d..9618357330482652b5b35c0ddb4c73de721dbe78 100644 (file)
@@ -6,6 +6,7 @@
 #include "librbd/AioImageRequestWQ.h"
 #include "librbd/AioObjectRequest.h"
 #include "librbd/ImageCtx.h"
+#include "librbd/JournalReplay.h"
 #include "librbd/JournalTypes.h"
 #include "journal/Journaler.h"
 #include "journal/ReplayEntry.h"
@@ -37,7 +38,7 @@ Journal::Journal(ImageCtx &image_ctx)
   : m_image_ctx(image_ctx), m_journaler(NULL),
     m_lock("Journal::m_lock"), m_state(STATE_UNINITIALIZED),
     m_lock_listener(this), m_replay_handler(this), m_close_pending(false),
-    m_event_tid(0), m_blocking_writes(false) {
+    m_event_tid(0), m_blocking_writes(false), m_journal_replay(NULL) {
 
   ldout(m_image_ctx.cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
 
@@ -49,6 +50,7 @@ Journal::Journal(ImageCtx &image_ctx)
 
 Journal::~Journal() {
   assert(m_journaler == NULL);
+  assert(m_journal_replay == NULL);
 
   m_image_ctx.image_watcher->unregister_listener(&m_lock_listener);
 
@@ -62,6 +64,11 @@ bool Journal::is_journal_supported(ImageCtx &image_ctx) {
           !image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP);
 }
 
+bool Journal::is_journal_replaying() const {
+  Mutex::Locker locker(m_lock);
+  return (m_state == STATE_REPLAYING);
+}
+
 int Journal::create(librados::IoCtx &io_ctx, const std::string &image_id) {
   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
   ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
@@ -314,6 +321,9 @@ void Journal::destroy_journaler() {
 
   assert(m_lock.is_locked());
 
+  delete m_journal_replay;
+  m_journal_replay = NULL;
+
   m_close_pending = false;
   m_image_ctx.op_work_queue->queue(new C_DestroyJournaler(m_journaler), 0);
   m_journaler = NULL;
@@ -352,6 +362,8 @@ void Journal::handle_initialized(int r) {
     return;
   }
 
+  m_journal_replay = new JournalReplay(m_image_ctx);
+
   transition_state(STATE_REPLAYING);
   m_journaler->start_replay(&m_replay_handler);
 }
@@ -378,8 +390,14 @@ void Journal::handle_replay_ready() {
     }
 
     m_lock.Unlock();
-    // TODO process the payload
+    bufferlist data = replay_entry.get_data();
+    bufferlist::iterator it = data.begin();
+    int r = m_journal_replay->process(it);
     m_lock.Lock();
+
+    if (r < 0) {
+      // TODO
+    }
   }
 }
 
@@ -392,6 +410,12 @@ void Journal::handle_replay_complete(int r) {
       return;
     }
 
+    if (r == 0) {
+      r = m_journal_replay->flush();
+    }
+    delete m_journal_replay;
+    m_journal_replay = NULL;
+
     if (r < 0) {
       lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
 
index 4ac2bee97c62370b2893f8b0357cdaac1c46b77c..e0e68a8ca98f26e4245cbec3ee5bd778605ae7b4 100644 (file)
@@ -28,6 +28,8 @@ namespace librbd {
 class AioCompletion;
 class AioObjectRequest;
 class ImageCtx;
+class JournalReplay;
+
 namespace journal {
 class EventEntry;
 }
@@ -44,6 +46,7 @@ public:
   static int remove(librados::IoCtx &io_ctx, const std::string &image_id);
 
   bool is_journal_ready() const;
+  bool is_journal_replaying() const;
 
   void open();
   int close();
@@ -157,7 +160,6 @@ private:
   ImageCtx &m_image_ctx;
 
   ::journal::Journaler *m_journaler;
-
   mutable Mutex m_lock;
   Cond m_cond;
   State m_state;
@@ -172,6 +174,8 @@ private:
 
   bool m_blocking_writes;
 
+  JournalReplay *m_journal_replay;
+
   ::journal::Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
 
   void create_journaler();
diff --git a/src/librbd/JournalReplay.cc b/src/librbd/JournalReplay.cc
new file mode 100644 (file)
index 0000000..7daf10c
--- /dev/null
@@ -0,0 +1,120 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/JournalReplay.h"
+#include "librbd/AioCompletion.h"
+#include "librbd/AioImageRequest.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/internal.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::JournalReplay: "
+
+namespace librbd {
+
+JournalReplay::JournalReplay(ImageCtx &image_ctx)
+  : m_image_ctx(image_ctx), m_lock("JournalReplay::m_lock"), m_ret_val(0) {
+}
+
+JournalReplay::~JournalReplay() {
+  assert(m_aio_completions.empty());
+}
+
+int JournalReplay::process(bufferlist::iterator it) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << dendl;
+
+  RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
+  journal::EventEntry event_entry;
+  try {
+    ::decode(event_entry, it);
+  } catch (const buffer::error &err) {
+    lderr(cct) << "failed to decode event entry: " << err.what() << dendl;
+    return -EINVAL;
+  }
+
+  boost::apply_visitor(EventVisitor(this), event_entry.event);
+  return 0;
+}
+
+int JournalReplay::flush() {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << dendl;
+
+  Mutex::Locker locker(m_lock);
+  while (!m_aio_completions.empty()) {
+    m_cond.Wait(m_lock);
+  }
+  return m_ret_val;
+}
+
+void JournalReplay::handle_event(const journal::AioDiscardEvent &event) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": AIO discard event" << dendl;
+
+  AioCompletion *aio_comp = create_aio_completion();
+  AioImageRequest::aio_discard(&m_image_ctx, aio_comp, event.offset,
+                               event.length);
+}
+
+void JournalReplay::handle_event(const journal::AioWriteEvent &event) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": AIO write event" << dendl;
+
+  bufferlist data = event.data;
+  AioCompletion *aio_comp = create_aio_completion();
+  AioImageRequest::aio_write(&m_image_ctx, aio_comp, event.offset, event.length,
+                             data.c_str(), 0);
+}
+
+void JournalReplay::handle_event(const journal::AioFlushEvent &event) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": AIO flush event" << dendl;
+
+  AioCompletion *aio_comp = create_aio_completion();
+  AioImageRequest::aio_flush(&m_image_ctx, aio_comp);
+}
+
+void JournalReplay::handle_event(const journal::UnknownEvent &event) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": unknown event" << dendl;
+}
+
+AioCompletion *JournalReplay::create_aio_completion() {
+  Mutex::Locker locker(m_lock);
+  AioCompletion *aio_comp = aio_create_completion_internal(
+    this, &aio_completion_callback);
+  m_aio_completions.insert(aio_comp);
+  return aio_comp;
+}
+
+void JournalReplay::handle_aio_completion(AioCompletion *aio_comp) {
+  Mutex::Locker locker(m_lock);
+
+  AioCompletions::iterator it = m_aio_completions.find(aio_comp);
+  assert(it != m_aio_completions.end());
+
+  int r = aio_comp->get_return_value();
+
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": aio_comp=" << aio_comp << ", "
+                 << "r=" << r << dendl;
+
+  if (r < 0 && m_ret_val == 0) {
+    m_ret_val = r;
+  }
+
+  m_aio_completions.erase(it);
+  m_cond.Signal();
+}
+
+void JournalReplay::aio_completion_callback(completion_t cb, void *arg) {
+  JournalReplay *journal_replay = reinterpret_cast<JournalReplay *>(arg);
+  AioCompletion *aio_comp = reinterpret_cast<AioCompletion *>(cb);
+
+  journal_replay->handle_aio_completion(aio_comp);
+  aio_comp->release();
+}
+
+} // namespace librbd
diff --git a/src/librbd/JournalReplay.h b/src/librbd/JournalReplay.h
new file mode 100644 (file)
index 0000000..7b85713
--- /dev/null
@@ -0,0 +1,66 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_REPLAY_H
+#define CEPH_LIBRBD_JOURNAL_REPLAY_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/rbd/librbd.hpp"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "librbd/JournalTypes.h"
+#include <boost/variant.hpp>
+#include <set>
+
+namespace librbd {
+
+class AioCompletion;
+class ImageCtx;
+
+class JournalReplay {
+public:
+  JournalReplay(ImageCtx &image_ctx);
+  ~JournalReplay();
+
+  int process(bufferlist::iterator it);
+  int flush();
+
+private:
+  typedef std::set<AioCompletion *> AioCompletions;
+
+  struct EventVisitor : public boost::static_visitor<void> {
+    JournalReplay *journal_replay;
+
+    EventVisitor(JournalReplay *_journal_replay)
+      : journal_replay(_journal_replay) {
+    }
+
+    template <typename Event>
+    inline void operator()(const Event &event) const {
+      journal_replay->handle_event(event);
+    }
+  };
+
+  ImageCtx &m_image_ctx;
+
+  Mutex m_lock;
+  Cond m_cond;
+
+  AioCompletions m_aio_completions;
+  int m_ret_val;
+
+  void handle_event(const journal::AioDiscardEvent &event);
+  void handle_event(const journal::AioWriteEvent &event);
+  void handle_event(const journal::AioFlushEvent &event);
+  void handle_event(const journal::UnknownEvent &event);
+
+  AioCompletion *create_aio_completion();
+  void handle_aio_completion(AioCompletion *aio_comp);
+
+  static void aio_completion_callback(completion_t cb, void *arg);
+};
+
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_JOURNAL_REPLAY_H
index 863531f786388586c745cb72c020537165aa7cc0..72268a7ef9bc74bf2cc13db523730b3e0df175ac 100644 (file)
@@ -24,6 +24,7 @@ librbd_internal_la_SOURCES = \
        librbd/ImageWatcher.cc \
        librbd/internal.cc \
        librbd/Journal.cc \
+       librbd/JournalReplay.cc \
        librbd/LibrbdAdminSocketHook.cc \
        librbd/LibrbdWriteback.cc \
        librbd/ObjectMap.cc \
@@ -69,6 +70,7 @@ noinst_HEADERS += \
        librbd/ImageWatcher.h \
        librbd/internal.h \
        librbd/Journal.h \
+       librbd/JournalReplay.h \
        librbd/JournalTypes.h \
        librbd/LibrbdAdminSocketHook.h \
        librbd/LibrbdWriteback.h \