From: Jason Dillaman Date: Wed, 22 Jul 2015 02:49:04 +0000 (-0400) Subject: librbd: add replay support for IO events X-Git-Tag: v10.0.1~52^2~14 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e34cd8005e17479a3e3ee4e91d601cafd90d04b1;p=ceph.git librbd: add replay support for IO events Signed-off-by: Jason Dillaman --- diff --git a/src/librbd/AioImageRequest.cc b/src/librbd/AioImageRequest.cc index daa99ecad9b3..f594f61f9cad 100644 --- a/src/librbd/AioImageRequest.cc +++ b/src/librbd/AioImageRequest.cc @@ -237,7 +237,8 @@ void AbstractAioImageWrite::send_request() { object_extents); } - journaling = (m_image_ctx.journal != NULL); + journaling = (m_image_ctx.journal != NULL && + !m_image_ctx.journal->is_journal_replaying()); } assert(!m_image_ctx.image_watcher->is_lock_supported() || @@ -417,7 +418,8 @@ void AioImageFlush::send_request() { { // journal the flush event RWLock::RLocker snap_locker(m_image_ctx.snap_lock); - if (m_image_ctx.journal != NULL) { + if (m_image_ctx.journal != NULL && + !m_image_ctx.journal->is_journal_replaying()) { uint64_t journal_tid = m_image_ctx.journal->append_event( m_aio_comp, journal::EventEntry(journal::AioFlushEvent()), AioObjectRequests(), 0, 0, false); diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index a6a1394082da..961835733048 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -6,6 +6,7 @@ #include "librbd/AioImageRequestWQ.h" #include "librbd/AioObjectRequest.h" #include "librbd/ImageCtx.h" +#include "librbd/JournalReplay.h" #include "librbd/JournalTypes.h" #include "journal/Journaler.h" #include "journal/ReplayEntry.h" @@ -37,7 +38,7 @@ Journal::Journal(ImageCtx &image_ctx) : m_image_ctx(image_ctx), m_journaler(NULL), m_lock("Journal::m_lock"), m_state(STATE_UNINITIALIZED), m_lock_listener(this), m_replay_handler(this), m_close_pending(false), - m_event_tid(0), m_blocking_writes(false) { + m_event_tid(0), m_blocking_writes(false), m_journal_replay(NULL) { 
ldout(m_image_ctx.cct, 5) << this << ": ictx=" << &m_image_ctx << dendl; @@ -49,6 +50,7 @@ Journal::Journal(ImageCtx &image_ctx) Journal::~Journal() { assert(m_journaler == NULL); + assert(m_journal_replay == NULL); m_image_ctx.image_watcher->unregister_listener(&m_lock_listener); @@ -62,6 +64,11 @@ bool Journal::is_journal_supported(ImageCtx &image_ctx) { !image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP); } +bool Journal::is_journal_replaying() const { + Mutex::Locker locker(m_lock); + return (m_state == STATE_REPLAYING); +} + int Journal::create(librados::IoCtx &io_ctx, const std::string &image_id) { CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct()); ldout(cct, 5) << __func__ << ": image=" << image_id << dendl; @@ -314,6 +321,9 @@ void Journal::destroy_journaler() { assert(m_lock.is_locked()); + delete m_journal_replay; + m_journal_replay = NULL; + m_close_pending = false; m_image_ctx.op_work_queue->queue(new C_DestroyJournaler(m_journaler), 0); m_journaler = NULL; @@ -352,6 +362,8 @@ void Journal::handle_initialized(int r) { return; } + m_journal_replay = new JournalReplay(m_image_ctx); + transition_state(STATE_REPLAYING); m_journaler->start_replay(&m_replay_handler); } @@ -378,8 +390,14 @@ void Journal::handle_replay_ready() { } m_lock.Unlock(); - // TODO process the payload + bufferlist data = replay_entry.get_data(); + bufferlist::iterator it = data.begin(); + int r = m_journal_replay->process(it); m_lock.Lock(); + + if (r < 0) { + // TODO + } } } @@ -392,6 +410,12 @@ void Journal::handle_replay_complete(int r) { return; } + if (r == 0) { + r = m_journal_replay->flush(); + } + delete m_journal_replay; + m_journal_replay = NULL; + if (r < 0) { lderr(cct) << this << " " << __func__ << ": r=" << r << dendl; diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index 4ac2bee97c62..e0e68a8ca98f 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -28,6 +28,8 @@ namespace librbd { class AioCompletion; class AioObjectRequest; class ImageCtx; 
+class JournalReplay; + namespace journal { class EventEntry; } @@ -44,6 +46,7 @@ public: static int remove(librados::IoCtx &io_ctx, const std::string &image_id); bool is_journal_ready() const; + bool is_journal_replaying() const; void open(); int close(); @@ -157,7 +160,6 @@ private: ImageCtx &m_image_ctx; ::journal::Journaler *m_journaler; - mutable Mutex m_lock; Cond m_cond; State m_state; @@ -172,6 +174,8 @@ private: bool m_blocking_writes; + JournalReplay *m_journal_replay; + ::journal::Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe); void create_journaler(); diff --git a/src/librbd/JournalReplay.cc b/src/librbd/JournalReplay.cc new file mode 100644 index 000000000000..7daf10c254fc --- /dev/null +++ b/src/librbd/JournalReplay.cc @@ -0,0 +1,120 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/JournalReplay.h" +#include "librbd/AioCompletion.h" +#include "librbd/AioImageRequest.h" +#include "librbd/ImageCtx.h" +#include "librbd/internal.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::JournalReplay: " + +namespace librbd { + +JournalReplay::JournalReplay(ImageCtx &image_ctx) + : m_image_ctx(image_ctx), m_lock("JournalReplay::m_lock"), m_ret_val(0) { +} + +JournalReplay::~JournalReplay() { + assert(m_aio_completions.empty()); +} + +int JournalReplay::process(bufferlist::iterator it) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << dendl; + + RWLock::RLocker owner_lock(m_image_ctx.owner_lock); + journal::EventEntry event_entry; + try { + ::decode(event_entry, it); + } catch (const buffer::error &err) { + lderr(cct) << "failed to decode event entry: " << err.what() << dendl; + return -EINVAL; + } + + boost::apply_visitor(EventVisitor(this), event_entry.event); + return 0; +} + +int JournalReplay::flush() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ 
<< dendl; + + Mutex::Locker locker(m_lock); + while (!m_aio_completions.empty()) { + m_cond.Wait(m_lock); + } + return m_ret_val; +} + +void JournalReplay::handle_event(const journal::AioDiscardEvent &event) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": AIO discard event" << dendl; + + AioCompletion *aio_comp = create_aio_completion(); + AioImageRequest::aio_discard(&m_image_ctx, aio_comp, event.offset, + event.length); +} + +void JournalReplay::handle_event(const journal::AioWriteEvent &event) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": AIO write event" << dendl; + + bufferlist data = event.data; + AioCompletion *aio_comp = create_aio_completion(); + AioImageRequest::aio_write(&m_image_ctx, aio_comp, event.offset, event.length, + data.c_str(), 0); +} + +void JournalReplay::handle_event(const journal::AioFlushEvent &event) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": AIO flush event" << dendl; + + AioCompletion *aio_comp = create_aio_completion(); + AioImageRequest::aio_flush(&m_image_ctx, aio_comp); +} + +void JournalReplay::handle_event(const journal::UnknownEvent &event) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": unknown event" << dendl; +} + +AioCompletion *JournalReplay::create_aio_completion() { + Mutex::Locker locker(m_lock); + AioCompletion *aio_comp = aio_create_completion_internal( + this, &aio_completion_callback); + m_aio_completions.insert(aio_comp); + return aio_comp; +} + +void JournalReplay::handle_aio_completion(AioCompletion *aio_comp) { + Mutex::Locker locker(m_lock); + + AioCompletions::iterator it = m_aio_completions.find(aio_comp); + assert(it != m_aio_completions.end()); + + int r = aio_comp->get_return_value(); + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": aio_comp=" << aio_comp << ", " + << "r=" << r << dendl; + + if 
(r < 0 && m_ret_val == 0) { + m_ret_val = r; + } + + m_aio_completions.erase(it); + m_cond.Signal(); +} + +void JournalReplay::aio_completion_callback(completion_t cb, void *arg) { + JournalReplay *journal_replay = reinterpret_cast<JournalReplay *>(arg); + AioCompletion *aio_comp = reinterpret_cast<AioCompletion *>(cb); + + journal_replay->handle_aio_completion(aio_comp); + aio_comp->release(); +} + +} // namespace librbd diff --git a/src/librbd/JournalReplay.h b/src/librbd/JournalReplay.h new file mode 100644 index 000000000000..7b857138872d --- /dev/null +++ b/src/librbd/JournalReplay.h @@ -0,0 +1,66 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_REPLAY_H +#define CEPH_LIBRBD_JOURNAL_REPLAY_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/rbd/librbd.hpp" +#include "common/Cond.h" +#include "common/Mutex.h" +#include "librbd/JournalTypes.h" +#include <boost/variant.hpp> +#include <set> + +namespace librbd { + +class AioCompletion; +class ImageCtx; + +class JournalReplay { +public: + JournalReplay(ImageCtx &image_ctx); + ~JournalReplay(); + + int process(bufferlist::iterator it); + int flush(); + +private: + typedef std::set<AioCompletion *> AioCompletions; + + struct EventVisitor : public boost::static_visitor<void> { + JournalReplay *journal_replay; + + EventVisitor(JournalReplay *_journal_replay) + : journal_replay(_journal_replay) { + } + + template <typename Event> + inline void operator()(const Event &event) const { + journal_replay->handle_event(event); + } + }; + + ImageCtx &m_image_ctx; + + Mutex m_lock; + Cond m_cond; + + AioCompletions m_aio_completions; + int m_ret_val; + + void handle_event(const journal::AioDiscardEvent &event); + void handle_event(const journal::AioWriteEvent &event); + void handle_event(const journal::AioFlushEvent &event); + void handle_event(const journal::UnknownEvent &event); + + AioCompletion *create_aio_completion(); + void handle_aio_completion(AioCompletion *aio_comp); + + static void 
aio_completion_callback(completion_t cb, void *arg); +}; + +} // namespace librbd + +#endif // CEPH_LIBRBD_JOURNAL_REPLAY_H diff --git a/src/librbd/Makefile.am b/src/librbd/Makefile.am index 863531f78638..72268a7ef9bc 100644 --- a/src/librbd/Makefile.am +++ b/src/librbd/Makefile.am @@ -24,6 +24,7 @@ librbd_internal_la_SOURCES = \ librbd/ImageWatcher.cc \ librbd/internal.cc \ librbd/Journal.cc \ + librbd/JournalReplay.cc \ librbd/LibrbdAdminSocketHook.cc \ librbd/LibrbdWriteback.cc \ librbd/ObjectMap.cc \ @@ -69,6 +70,7 @@ noinst_HEADERS += \ librbd/ImageWatcher.h \ librbd/internal.h \ librbd/Journal.h \ + librbd/JournalReplay.h \ librbd/JournalTypes.h \ librbd/LibrbdAdminSocketHook.h \ librbd/LibrbdWriteback.h \