#include "librbd/ImageCtx.h"
#include "librbd/JournalReplay.h"
#include "librbd/JournalTypes.h"
+#include "librbd/Utils.h"
#include "journal/Journaler.h"
#include "journal/ReplayEntry.h"
#include "common/errno.h"
const std::string CLIENT_DESCRIPTION = "master image";
-struct C_DestroyJournaler : public Context {
- ::journal::Journaler *journaler;
-
- C_DestroyJournaler(::journal::Journaler *_journaler) : journaler(_journaler) {
- }
- virtual void finish(int r) {
- delete journaler;
- }
-};
-
struct SetOpRequestTid : public boost::static_visitor<void> {
uint64_t tid;
Journal::Journal(ImageCtx &image_ctx)
: m_image_ctx(image_ctx), m_journaler(NULL),
m_lock("Journal::m_lock"), m_state(STATE_UNINITIALIZED),
- m_replay_handler(this), m_close_pending(false),
+ m_error_result(0), m_replay_handler(this), m_close_pending(false),
m_event_lock("Journal::m_event_lock"), m_event_tid(0),
m_blocking_writes(false), m_journal_replay(NULL) {
ldout(m_image_ctx.cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
-
- Mutex::Locker locker(m_lock);
- block_writes();
}
Journal::~Journal() {
- m_image_ctx.op_work_queue->drain();
+ assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
assert(m_journaler == NULL);
assert(m_journal_replay == NULL);
assert(m_wait_for_state_contexts.empty());
-
- Mutex::Locker locker(m_lock);
- unblock_writes();
}
bool Journal::is_journal_supported(ImageCtx &image_ctx) {
bool Journal::is_journal_ready() const {
Mutex::Locker locker(m_lock);
- return (m_state == STATE_RECORDING);
+ return (m_state == STATE_READY);
}
bool Journal::is_journal_replaying() const {
}
void Journal::wait_for_journal_ready(Context *on_ready) {
- Mutex::Locker locker(m_lock);
- schedule_wait_for_ready(on_ready);
-}
+ on_ready = util::create_async_context_callback(m_image_ctx, on_ready);
-void Journal::wait_for_journal_ready() {
Mutex::Locker locker(m_lock);
- while (m_state != STATE_RECORDING) {
- wait_for_state_transition();
+ if (m_state == STATE_READY) {
+ on_ready->complete(m_error_result);
+ } else {
+ wait_for_steady_state(on_ready);
}
}
-void Journal::open() {
- Mutex::Locker locker(m_lock);
- if (m_journaler != NULL) {
- return;
- }
-
+void Journal::open(Context *on_finish) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
- create_journaler();
-}
-void Journal::open(Context *on_finish) {
- open();
- wait_for_journal_ready(on_finish);
+ on_finish = util::create_async_context_callback(m_image_ctx, on_finish);
+
+ Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_UNINITIALIZED);
+ wait_for_steady_state(on_finish);
+ create_journaler();
}
-int Journal::close() {
+void Journal::close(Context *on_finish) {
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << this << " " << __func__ << ": state=" << m_state << dendl;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+
+ on_finish = util::create_async_context_callback(m_image_ctx, on_finish);
Mutex::Locker locker(m_lock);
- if (m_state == STATE_UNINITIALIZED) {
- return 0;
+ assert(m_state != STATE_UNINITIALIZED);
+ if (m_state == STATE_CLOSED) {
+ on_finish->complete(m_error_result);
+ return;
}
- int r;
- bool done = false;
- while (!done) {
- switch (m_state) {
- case STATE_UNINITIALIZED:
- done = true;
- break;
- case STATE_INITIALIZING:
- case STATE_REPLAYING:
- m_close_pending = true;
- wait_for_state_transition();
- break;
- case STATE_STOPPING_RECORDING:
- wait_for_state_transition();
- break;
- case STATE_RECORDING:
- r = stop_recording();
- if (r < 0) {
- return r;
- }
- done = true;
- break;
- default:
- assert(false);
- }
+ if (m_state == STATE_READY) {
+ stop_recording();
}
- destroy_journaler();
- return 0;
-}
-
-void Journal::close(Context *on_finish) {
- // TODO
- assert(false);
+ m_close_pending = true;
+ wait_for_steady_state(on_finish);
}
uint64_t Journal::append_io_event(AioCompletion *aio_comp,
uint64_t tid;
{
Mutex::Locker locker(m_lock);
- assert(m_state == STATE_RECORDING);
+ assert(m_state == STATE_READY);
future = m_journaler->append("", bl);
uint64_t tid;
{
Mutex::Locker locker(m_lock);
- assert(m_state == STATE_RECORDING);
+ assert(m_state == STATE_READY);
Mutex::Locker event_locker(m_event_lock);
tid = ++m_event_tid;
{
Mutex::Locker locker(m_lock);
- assert(m_state == STATE_RECORDING);
+ assert(m_state == STATE_READY);
m_journaler->committed(m_journaler->append("", bl));
}
ldout(cct, 20) << this << " " << __func__ << dendl;
assert(m_lock.is_locked());
- assert(m_state == STATE_UNINITIALIZED);
+ assert(m_state == STATE_UNINITIALIZED || m_state == STATE_RESTARTING_REPLAY);
assert(m_journaler == NULL);
- m_close_pending = false;
+ transition_state(STATE_INITIALIZING, 0);
m_journaler = new ::journal::Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "",
m_image_ctx.journal_commit_age);
m_journaler->init(new C_InitJournal(this));
- transition_state(STATE_INITIALIZING);
}
-void Journal::destroy_journaler() {
+void Journal::destroy_journaler(int r) {
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << this << " " << __func__ << dendl;
+ ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
assert(m_lock.is_locked());
delete m_journal_replay;
m_journal_replay = NULL;
- m_close_pending = false;
- m_image_ctx.op_work_queue->queue(new C_DestroyJournaler(m_journaler), 0);
- m_journaler = NULL;
+ transition_state(STATE_CLOSING, r);
+ m_image_ctx.op_work_queue->queue(new C_DestroyJournaler(this), 0);
+}
+
+void Journal::recreate_journaler(int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+
+ assert(m_lock.is_locked());
+ assert(m_state == STATE_REPLAYING);
+
+ delete m_journal_replay;
+ m_journal_replay = NULL;
- transition_state(STATE_UNINITIALIZED);
+ transition_state(STATE_RESTARTING_REPLAY, r);
+ m_image_ctx.op_work_queue->queue(new C_DestroyJournaler(this), 0);
}
void Journal::complete_event(Events::iterator it, int r) {
assert(m_event_lock.is_locked());
- assert(m_state == STATE_RECORDING);
+ assert(m_state == STATE_READY);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": tid=" << it->first << " "
void Journal::handle_initialized(int r) {
CephContext *cct = m_image_ctx.cct;
- if (r < 0) {
- lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
- Mutex::Locker locker(m_lock);
-
- // TODO: failed to open journal -- retry?
- destroy_journaler();
- create_journaler();
- return;
- }
+ ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
- ldout(cct, 20) << this << " " << __func__ << dendl;
Mutex::Locker locker(m_lock);
- if (m_close_pending) {
- destroy_journaler();
+
+ if (r < 0) {
+ lderr(cct) << this << " " << __func__
+ << "failed to initialize journal: " << cpp_strerror(r)
+ << dendl;
+ destroy_journaler(r);
return;
}
- ldout(cct, 20) << __func__ << ": Journaler" << *m_journaler << dendl;
-
+ transition_state(STATE_REPLAYING, 0);
m_journal_replay = new JournalReplay(m_image_ctx);
-
- transition_state(STATE_REPLAYING);
m_journaler->start_replay(&m_replay_handler);
}
}
while (true) {
- if (m_close_pending) {
- m_journaler->stop_replay();
- destroy_journaler();
- return;
- }
-
::journal::ReplayEntry replay_entry;
if (!m_journaler->try_pop_front(&replay_entry)) {
return;
m_lock.Unlock();
bufferlist data = replay_entry.get_data();
bufferlist::iterator it = data.begin();
- int r = m_journal_replay->process(it, new C_ReplayCommitted(m_journaler,
- std::move(replay_entry)));
+ int r = m_journal_replay->process(
+ it, new C_ReplayCommitted(m_journaler, std::move(replay_entry)));
m_lock.Lock();
if (r < 0) {
- // TODO
+ lderr(cct) << "failed to replay journal entry: " << cpp_strerror(r)
+ << dendl;
+ m_journaler->stop_replay();
+
+ if (m_close_pending) {
+ destroy_journaler(r);
+ return;
+ }
+
+ recreate_journaler(r);
}
}
}
return;
}
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+ m_journaler->stop_replay();
+
if (r == 0) {
r = m_journal_replay->flush();
}
- delete m_journal_replay;
- m_journal_replay = NULL;
if (r < 0) {
lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
-
- // TODO: failed to replay journal -- retry?
- destroy_journaler();
- create_journaler();
+ recreate_journaler(r);
return;
}
- ldout(cct, 20) << this << " " << __func__ << dendl;
- m_journaler->stop_replay();
+ delete m_journal_replay;
+ m_journal_replay = NULL;
if (m_close_pending) {
- destroy_journaler();
+ destroy_journaler(0);
return;
}
+ m_error_result = 0;
m_journaler->start_append(m_image_ctx.journal_object_flush_interval,
m_image_ctx.journal_object_flush_bytes,
m_image_ctx.journal_object_flush_age);
- transition_state(STATE_RECORDING);
+ transition_state(STATE_READY, 0);
+ }
+}
+
+void Journal::handle_recording_stopped(int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_STOPPING);
+
+ destroy_journaler(r);
+}
- unblock_writes();
+void Journal::handle_journal_destroyed(int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(cct) << this << " " << __func__
+ << "error detected while closing journal: " << cpp_strerror(r)
+ << dendl;
+ }
+
+ Mutex::Locker locker(m_lock);
+ delete m_journaler;
+ m_journaler = nullptr;
+
+ assert(m_state == STATE_CLOSING || m_state == STATE_RESTARTING_REPLAY);
+ if (m_state == STATE_RESTARTING_REPLAY) {
+ create_journaler();
+ return;
}
+
+ transition_state(STATE_CLOSED, r);
}
void Journal::handle_event_safe(int r, uint64_t tid) {
} else {
// send any waiting aio requests now that journal entry is safe
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- assert(m_image_ctx.exclusive_lock->is_lock_owner());
-
for (AioObjectRequests::iterator it = aio_object_requests.begin();
it != aio_object_requests.end(); ++it) {
(*it)->send();
}
}
-int Journal::stop_recording() {
+void Journal::stop_recording() {
assert(m_lock.is_locked());
assert(m_journaler != NULL);
- transition_state(STATE_STOPPING_RECORDING);
-
- C_SaferCond cond;
- m_lock.Unlock();
- m_journaler->stop_append(&cond);
- int r = cond.wait();
- m_lock.Lock();
+ assert(m_state == STATE_READY);
+ transition_state(STATE_STOPPING, 0);
- destroy_journaler();
- if (r < 0) {
- lderr(m_image_ctx.cct) << "failed to flush journal: " << cpp_strerror(r)
- << dendl;
- return r;
- }
- return 0;
+ m_journaler->stop_append(util::create_async_context_callback(
+ m_image_ctx, new C_StopRecording(this)));
}
void Journal::block_writes() {
}
}
-void Journal::flush_journal() {
- assert(m_lock.is_locked());
-
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << this << " " << __func__ << dendl;
-
- m_lock.Unlock();
- C_SaferCond cond_ctx;
- m_journaler->flush(&cond_ctx);
- cond_ctx.wait();
- m_lock.Lock();
-}
-
-void Journal::transition_state(State state) {
+void Journal::transition_state(State state, int r) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": new state=" << state << dendl;
assert(m_lock.is_locked());
m_state = state;
- m_cond.Signal();
- Contexts wait_for_state_contexts;
- wait_for_state_contexts.swap(m_wait_for_state_contexts);
- for (Contexts::iterator it = wait_for_state_contexts.begin();
- it != wait_for_state_contexts.end(); ++it) {
- (*it)->complete(0);
+ if (m_error_result == 0 && r < 0) {
+ m_error_result = r;
}
-}
-void Journal::wait_for_state_transition() {
- assert(m_lock.is_locked());
- State state = m_state;
- while (m_state == state) {
- m_cond.Wait(m_lock);
+ if (is_steady_state()) {
+ Contexts wait_for_state_contexts(std::move(m_wait_for_state_contexts));
+ for (auto ctx : wait_for_state_contexts) {
+ ctx->complete(m_error_result);
+ }
}
}
-void Journal::schedule_wait_for_ready(Context *on_ready) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << this << __func__ << ": on_ready=" << on_ready << dendl;
-
+bool Journal::is_steady_state() const {
assert(m_lock.is_locked());
- m_wait_for_state_contexts.push_back(new C_WaitForReady(this, on_ready));
+ switch (m_state) {
+ case STATE_READY:
+ case STATE_CLOSED:
+ return true;
+ case STATE_UNINITIALIZED:
+ case STATE_INITIALIZING:
+ case STATE_REPLAYING:
+ case STATE_RESTARTING_REPLAY:
+ case STATE_STOPPING:
+ case STATE_CLOSING:
+ break;
+ }
+ return false;
}
-void Journal::handle_wait_for_ready(Context *on_ready) {
+void Journal::wait_for_steady_state(Context *on_state) {
assert(m_lock.is_locked());
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << this << " " << __func__ << ": on_ready=" << on_ready << ", "
- << "state=" << m_state << dendl;
+ assert(!is_steady_state());
- if (m_state == STATE_RECORDING) {
- m_image_ctx.op_work_queue->queue(on_ready, 0);
- } else {
- schedule_wait_for_ready(on_ready);
- }
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": on_state=" << on_state
+ << dendl;
+ m_wait_for_state_contexts.push_back(on_state);
}
std::ostream &operator<<(std::ostream &os, const Journal::State &state) {
case Journal::STATE_REPLAYING:
os << "Replaying";
break;
- case Journal::STATE_RECORDING:
- os << "Recording";
+ case Journal::STATE_RESTARTING_REPLAY:
+ os << "RestartingReplay";
+ break;
+ case Journal::STATE_READY:
+ os << "Ready";
+ break;
+ case Journal::STATE_STOPPING:
+ os << "Stopping";
+ break;
+ case Journal::STATE_CLOSING:
+ os << "Closing";
break;
- case Journal::STATE_STOPPING_RECORDING:
- os << "StoppingRecording";
+ case Journal::STATE_CLOSED:
+ os << "Closed";
break;
default:
os << "Unknown (" << static_cast<uint32_t>(state) << ")";
#include "include/unordered_map.h"
#include "include/rados/librados.hpp"
#include "common/Mutex.h"
-#include "common/Cond.h"
#include "journal/Future.h"
#include "journal/ReplayHandler.h"
#include <algorithm>
bool is_journal_replaying() const;
void wait_for_journal_ready(Context *on_ready);
- void wait_for_journal_ready();
- void open(); // TODO remove
void open(Context *on_finish);
- int close(); // TODO remove
void close(Context *on_finish);
uint64_t append_io_event(AioCompletion *aio_comp,
typedef std::list<Context *> Contexts;
typedef interval_set<uint64_t> ExtentInterval;
+ /**
+ * @verbatim
+ *
+ * <start>
+ * |
+ * v
+ * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> READY
+ * | * . ^ * . |
+ * | * . | * . |
+ * | * . | (error) * . . . . |
+ * | * . | * . |
+ * | * . | v . v
+ * | * . | RESTARTING . STOPPING
+ * | * . | | . |
+ * | * . | | . |
+ * | * * * * * * . \-------------/ . |
+ * | * (error) . . |
+ * | * . . . . . . . . . . . . . |
+ * | * . . |
+ * | v v v |
+ * | CLOSED <----- CLOSING <-------------------------/
+ * | |
+ * | v
+ * \---> <finish>
+ *
+ * @endverbatim
+ */
enum State {
STATE_UNINITIALIZED,
STATE_INITIALIZING,
STATE_REPLAYING,
- STATE_RECORDING,
- STATE_STOPPING_RECORDING
+ STATE_RESTARTING_REPLAY,
+ STATE_READY,
+ STATE_STOPPING,
+ STATE_CLOSING,
+ STATE_CLOSED
};
struct Event {
}
};
- struct C_EventSafe : public Context {
+ struct C_StopRecording : public Context {
Journal *journal;
- uint64_t tid;
- C_EventSafe(Journal *_journal, uint64_t _tid)
- : journal(_journal), tid(_tid) {
+ C_StopRecording(Journal *_journal) : journal(_journal) {
}
virtual void finish(int r) {
- journal->handle_event_safe(r, tid);
+ journal->handle_recording_stopped(r);
}
};
- struct C_WaitForReady : public Context {
+ struct C_DestroyJournaler : public Context {
Journal *journal;
- Context *on_ready;
- C_WaitForReady(Journal *_journal, Context *_on_ready)
- : journal(_journal), on_ready(_on_ready) {
+ C_DestroyJournaler(Journal *_journal) : journal(_journal) {
}
virtual void finish(int r) {
- journal->handle_wait_for_ready(on_ready);
+ journal->handle_journal_destroyed(r);
+ }
+ };
+
+ struct C_EventSafe : public Context {
+ Journal *journal;
+ uint64_t tid;
+
+ C_EventSafe(Journal *_journal, uint64_t _tid)
+ : journal(_journal), tid(_tid) {
+ }
+
+ virtual void finish(int r) {
+ journal->handle_event_safe(r, tid);
}
};
::journal::Journaler *m_journaler;
mutable Mutex m_lock;
- Cond m_cond;
State m_state;
+ int m_error_result;
Contexts m_wait_for_state_contexts;
ReplayHandler m_replay_handler;
::journal::Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
void create_journaler();
- void destroy_journaler();
+ void destroy_journaler(int r);
+ void recreate_journaler(int r);
void complete_event(Events::iterator it, int r);
void handle_replay_ready();
void handle_replay_complete(int r);
+ void handle_recording_stopped(int r);
+
+ void handle_journal_destroyed(int r);
+
void handle_event_safe(int r, uint64_t tid);
- int stop_recording();
+ void stop_recording();
void block_writes();
void unblock_writes();
- void flush_journal();
- void transition_state(State state);
- void wait_for_state_transition();
- void schedule_wait_for_ready(Context *on_ready);
- void handle_wait_for_ready(Context *on_ready);
+ void transition_state(State state, int r);
+
+ bool is_steady_state() const;
+ void wait_for_steady_state(Context *on_state);
friend std::ostream &operator<<(std::ostream &os, const State &state);
};