object_extents);
}
- journaling = (m_image_ctx.journal != NULL);
+ journaling = (m_image_ctx.journal != NULL &&
+ !m_image_ctx.journal->is_journal_replaying());
}
assert(!m_image_ctx.image_watcher->is_lock_supported() ||
{
// journal the flush event
RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
- if (m_image_ctx.journal != NULL) {
+ if (m_image_ctx.journal != NULL &&
+ !m_image_ctx.journal->is_journal_replaying()) {
uint64_t journal_tid = m_image_ctx.journal->append_event(
m_aio_comp, journal::EventEntry(journal::AioFlushEvent()),
AioObjectRequests(), 0, 0, false);
#include "librbd/AioImageRequestWQ.h"
#include "librbd/AioObjectRequest.h"
#include "librbd/ImageCtx.h"
+#include "librbd/JournalReplay.h"
#include "librbd/JournalTypes.h"
#include "journal/Journaler.h"
#include "journal/ReplayEntry.h"
: m_image_ctx(image_ctx), m_journaler(NULL),
m_lock("Journal::m_lock"), m_state(STATE_UNINITIALIZED),
m_lock_listener(this), m_replay_handler(this), m_close_pending(false),
- m_event_tid(0), m_blocking_writes(false) {
+ m_event_tid(0), m_blocking_writes(false), m_journal_replay(NULL) {
ldout(m_image_ctx.cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
Journal::~Journal() {
assert(m_journaler == NULL);
+ assert(m_journal_replay == NULL);
m_image_ctx.image_watcher->unregister_listener(&m_lock_listener);
!image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP);
}
+bool Journal::is_journal_replaying() const {  // true only while m_state is STATE_REPLAYING
+ Mutex::Locker locker(m_lock);  // m_state is read under m_lock
+ return (m_state == STATE_REPLAYING);
+}
+
int Journal::create(librados::IoCtx &io_ctx, const std::string &image_id) {
CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
assert(m_lock.is_locked());
+ delete m_journal_replay;
+ m_journal_replay = NULL;
+
m_close_pending = false;
m_image_ctx.op_work_queue->queue(new C_DestroyJournaler(m_journaler), 0);
m_journaler = NULL;
return;
}
+ m_journal_replay = new JournalReplay(m_image_ctx);
+
transition_state(STATE_REPLAYING);
m_journaler->start_replay(&m_replay_handler);
}
}
m_lock.Unlock();
- // TODO process the payload
+ bufferlist data = replay_entry.get_data();
+ bufferlist::iterator it = data.begin();
+ int r = m_journal_replay->process(it);
m_lock.Lock();
+
+ if (r < 0) {
+ // TODO
+ }
}
}
return;
}
+ if (r == 0) {
+ r = m_journal_replay->flush();
+ }
+ delete m_journal_replay;
+ m_journal_replay = NULL;
+
if (r < 0) {
lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
class AioCompletion;
class AioObjectRequest;
class ImageCtx;
+class JournalReplay;
+
namespace journal {
class EventEntry;
}
static int remove(librados::IoCtx &io_ctx, const std::string &image_id);
bool is_journal_ready() const;
+ bool is_journal_replaying() const;
void open();
int close();
ImageCtx &m_image_ctx;
::journal::Journaler *m_journaler;
-
mutable Mutex m_lock;
Cond m_cond;
State m_state;
bool m_blocking_writes;
+ JournalReplay *m_journal_replay;
+
::journal::Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
void create_journaler();
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/JournalReplay.h"
+#include "librbd/AioCompletion.h"
+#include "librbd/AioImageRequest.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/internal.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::JournalReplay: "
+
+namespace librbd {
+
+JournalReplay::JournalReplay(ImageCtx &image_ctx)  // keeps a reference to image_ctx
+ : m_image_ctx(image_ctx), m_lock("JournalReplay::m_lock"), m_ret_val(0) {  // 0: no AIO error recorded yet
+}
+
+JournalReplay::~JournalReplay() {  // must not be destroyed while completions are still tracked
+ assert(m_aio_completions.empty());  // entries are erased in handle_aio_completion()
+}
+
+int JournalReplay::process(bufferlist::iterator it) {  // decode one event entry from `it` and dispatch it
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+
+ RWLock::RLocker owner_lock(m_image_ctx.owner_lock);  // read-held across decode and dispatch
+ journal::EventEntry event_entry;
+ try {
+ ::decode(event_entry, it);
+ } catch (const buffer::error &err) {
+ lderr(cct) << "failed to decode event entry: " << err.what() << dendl;
+ return -EINVAL;  // entry could not be decoded; nothing was dispatched
+ }
+
+ boost::apply_visitor(EventVisitor(this), event_entry.event);  // invokes the matching handle_event() overload
+ return 0;  // only decode failures are reported here; AIO results surface through flush()
+}
+
+int JournalReplay::flush() {  // block until no AIO completions remain tracked
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+
+ Mutex::Locker locker(m_lock);
+ while (!m_aio_completions.empty()) {  // re-check the set after every wakeup
+ m_cond.Wait(m_lock);  // signalled by handle_aio_completion()
+ }
+ return m_ret_val;  // 0, or the first negative AIO result that was recorded
+}
+
+void JournalReplay::handle_event(const journal::AioDiscardEvent &event) {  // re-issue a journaled discard
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": AIO discard event" << dendl;
+
+ AioCompletion *aio_comp = create_aio_completion();  // tracked in m_aio_completions until it completes
+ AioImageRequest::aio_discard(&m_image_ctx, aio_comp, event.offset,
+ event.length);  // offset/length taken verbatim from the event
+}
+
+void JournalReplay::handle_event(const journal::AioWriteEvent &event) {  // re-issue a journaled write
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": AIO write event" << dendl;
+
+ bufferlist data = event.data;  // local copy of the payload (event is const)
+ AioCompletion *aio_comp = create_aio_completion();  // tracked in m_aio_completions until it completes
+ AioImageRequest::aio_write(&m_image_ctx, aio_comp, event.offset, event.length,
+ data.c_str(), 0);  // NOTE(review): `data` dies at scope exit; assumes aio_write consumes the buffer before returning - confirm
+}
+
+void JournalReplay::handle_event(const journal::AioFlushEvent &event) {  // re-issue a journaled flush
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": AIO flush event" << dendl;
+
+ AioCompletion *aio_comp = create_aio_completion();  // tracked in m_aio_completions until it completes
+ AioImageRequest::aio_flush(&m_image_ctx, aio_comp);  // no fields of `event` are read
+}
+
+void JournalReplay::handle_event(const journal::UnknownEvent &event) {  // unknown events are logged and otherwise ignored
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": unknown event" << dendl;  // no AIO is issued
+}
+
+AioCompletion *JournalReplay::create_aio_completion() {  // create a completion and register it as outstanding
+ Mutex::Locker locker(m_lock);  // guards m_aio_completions
+ AioCompletion *aio_comp = aio_create_completion_internal(
+ this, &aio_completion_callback);  // `this` comes back as the callback's arg
+ m_aio_completions.insert(aio_comp);  // flush() waits until this set is empty
+ return aio_comp;
+}
+
+void JournalReplay::handle_aio_completion(AioCompletion *aio_comp) {  // record the result and stop tracking aio_comp
+ Mutex::Locker locker(m_lock);  // guards m_aio_completions and m_ret_val
+
+ AioCompletions::iterator it = m_aio_completions.find(aio_comp);
+ assert(it != m_aio_completions.end());  // must have been registered by create_aio_completion()
+
+ int r = aio_comp->get_return_value();
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": aio_comp=" << aio_comp << ", "
+ << "r=" << r << dendl;
+
+ if (r < 0 && m_ret_val == 0) {  // keep only the first failure
+ m_ret_val = r;
+ }
+
+ m_aio_completions.erase(it);
+ m_cond.Signal();  // wake a flush() waiter so it re-checks the set
+}
+
+void JournalReplay::aio_completion_callback(completion_t cb, void *arg) {  // callback registered by create_aio_completion()
+ JournalReplay *journal_replay = reinterpret_cast<JournalReplay *>(arg);  // arg is the JournalReplay passed at creation
+ AioCompletion *aio_comp = reinterpret_cast<AioCompletion *>(cb);  // cb is treated as the AioCompletion itself
+
+ journal_replay->handle_aio_completion(aio_comp);
+ aio_comp->release();  // released only after its result has been recorded
+}
+
+} // namespace librbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_REPLAY_H
+#define CEPH_LIBRBD_JOURNAL_REPLAY_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/rbd/librbd.hpp"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "librbd/JournalTypes.h"
+#include <boost/variant.hpp>
+#include <set>
+
+namespace librbd {
+
+class AioCompletion;
+class ImageCtx;
+
+class JournalReplay {  // decodes journal event entries and re-issues them as AIO requests on an image
+public:
+ JournalReplay(ImageCtx &image_ctx);  // NOTE(review): single-argument ctor is not explicit
+ ~JournalReplay();  // asserts that no AIO completions are still tracked
+
+ int process(bufferlist::iterator it);  // decode + dispatch one event; -EINVAL on decode failure, else 0
+ int flush();  // wait for outstanding AIO; returns 0 or the first negative AIO result
+
+private:
+ typedef std::set<AioCompletion *> AioCompletions;
+
+ struct EventVisitor : public boost::static_visitor<void> {  // forwards each variant alternative to handle_event()
+ JournalReplay *journal_replay;
+
+ EventVisitor(JournalReplay *_journal_replay)  // NOTE(review): single-argument ctor is not explicit
+ : journal_replay(_journal_replay) {
+ }
+
+ template <typename Event>
+ inline void operator()(const Event &event) const {
+ journal_replay->handle_event(event);  // overload resolution selects the per-event handler
+ }
+ };
+
+ ImageCtx &m_image_ctx;  // image the events are replayed against
+
+ Mutex m_lock;  // guards m_aio_completions and m_ret_val
+ Cond m_cond;  // signalled whenever a tracked completion finishes
+
+ AioCompletions m_aio_completions;  // completions issued but not yet finished
+ int m_ret_val;  // first negative AIO result seen, else 0
+
+ void handle_event(const journal::AioDiscardEvent &event);
+ void handle_event(const journal::AioWriteEvent &event);
+ void handle_event(const journal::AioFlushEvent &event);
+ void handle_event(const journal::UnknownEvent &event);  // logs only
+
+ AioCompletion *create_aio_completion();  // creates and registers a completion
+ void handle_aio_completion(AioCompletion *aio_comp);  // records result, unregisters, signals m_cond
+
+ static void aio_completion_callback(completion_t cb, void *arg);  // arg is the owning JournalReplay
+};
+
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_JOURNAL_REPLAY_H
librbd/ImageWatcher.cc \
librbd/internal.cc \
librbd/Journal.cc \
+ librbd/JournalReplay.cc \
librbd/LibrbdAdminSocketHook.cc \
librbd/LibrbdWriteback.cc \
librbd/ObjectMap.cc \
librbd/ImageWatcher.h \
librbd/internal.h \
librbd/Journal.h \
+ librbd/JournalReplay.h \
librbd/JournalTypes.h \
librbd/LibrbdAdminSocketHook.h \
librbd/LibrbdWriteback.h \