namespace librbd {
+using util::create_async_context_callback;
+using util::create_context_callback;
+
namespace {
const std::string CLIENT_DESCRIPTION = "master image";
}
};
+template <typename ImageCtxT>
struct C_ReplayCommitted : public Context {
- ::journal::Journaler *journaler;
- ::journal::ReplayEntry replay_entry;
+ typedef journal::TypeTraits<ImageCtxT> TypeTraits;
+ typedef typename TypeTraits::Journaler Journaler;
+ typedef typename TypeTraits::ReplayEntry ReplayEntry;
+
+ Journaler *journaler;
+ ReplayEntry replay_entry;
- C_ReplayCommitted(::journal::Journaler *journaler,
- ::journal::ReplayEntry &&replay_entry) :
+ C_ReplayCommitted(Journaler *journaler, ReplayEntry &&replay_entry) :
journaler(journaler), replay_entry(std::move(replay_entry)) {
}
virtual void finish(int r) {
case Journal<I>::STATE_REPLAYING:
os << "Replaying";
break;
+ case Journal<I>::STATE_FLUSHING_RESTART:
+ os << "FlushingRestart";
+ break;
case Journal<I>::STATE_RESTARTING_REPLAY:
os << "RestartingReplay";
break;
+ case Journal<I>::STATE_FLUSHING_REPLAY:
+ os << "FlushingReplay";
+ break;
case Journal<I>::STATE_READY:
os << "Ready";
break;
pool_id = data_io_ctx.get_id();
}
- ::journal::Journaler journaler(io_ctx, image_id, "",
- cct->_conf->rbd_journal_commit_age);
+ Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age);
int r = journaler.create(order, splay_width, pool_id);
if (r < 0) {
CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
- ::journal::Journaler journaler(io_ctx, image_id, "",
- cct->_conf->rbd_journal_commit_age);
+ Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age);
bool journal_exists;
int r = journaler.exists(&journal_exists);
CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
- ::journal::Journaler journaler(io_ctx, image_id, "",
- cct->_conf->rbd_journal_commit_age);
+ Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age);
C_SaferCond cond;
journaler.init(&cond);
bufferlist bl;
::encode(event_entry, bl);
- ::journal::Future future;
+ Future future;
uint64_t tid;
{
Mutex::Locker locker(m_lock);
ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
<< "on_safe=" << on_safe << dendl;
- ::journal::Future future;
+ Future future;
{
Mutex::Locker event_locker(m_event_lock);
future = wait_event(m_lock, tid, on_safe);
}
template <typename I>
-::journal::Future Journal<I>::wait_event(Mutex &lock, uint64_t tid,
- Context *on_safe) {
+typename Journal<I>::Future Journal<I>::wait_event(Mutex &lock, uint64_t tid,
+ Context *on_safe) {
assert(m_event_lock.is_locked());
CephContext *cct = m_image_ctx.cct;
// journal entry already safe
ldout(cct, 20) << "journal entry already safe" << dendl;
m_image_ctx.op_work_queue->queue(on_safe, 0);
- return ::journal::Future();
+ return Future();
}
Event &event = it->second;
assert(m_journaler == NULL);
transition_state(STATE_INITIALIZING, 0);
- m_journaler = new ::journal::Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "",
- m_image_ctx.journal_commit_age);
- m_journaler->init(new C_InitJournal(this));
+ m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "",
+ m_image_ctx.journal_commit_age);
+ m_journaler->init(create_async_context_callback(
+ m_image_ctx, create_context_callback<
+ Journal<I>, &Journal<I>::handle_initialized>(this)));
}
template <typename I>
m_journal_replay = NULL;
transition_state(STATE_CLOSING, r);
- m_image_ctx.op_work_queue->queue(new C_DestroyJournaler(this), 0);
+ m_image_ctx.op_work_queue->queue(create_context_callback<
+ Journal<I>, &Journal<I>::handle_journal_destroyed>(this), 0);
}
template <typename I>
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
assert(m_lock.is_locked());
- assert(m_state == STATE_REPLAYING);
+ assert(m_state == STATE_FLUSHING_RESTART ||
+ m_state == STATE_FLUSHING_REPLAY);
delete m_journal_replay;
m_journal_replay = NULL;
transition_state(STATE_RESTARTING_REPLAY, r);
- m_image_ctx.op_work_queue->queue(new C_DestroyJournaler(this), 0);
+ m_image_ctx.op_work_queue->queue(create_context_callback<
+ Journal<I>, &Journal<I>::handle_journal_destroyed>(this), 0);
}
template <typename I>
}
while (true) {
- ::journal::ReplayEntry replay_entry;
+ ReplayEntry replay_entry;
if (!m_journaler->try_pop_front(&replay_entry)) {
return;
}
m_lock.Unlock();
bufferlist data = replay_entry.get_data();
bufferlist::iterator it = data.begin();
- int r = m_journal_replay->process(
- it, new C_ReplayCommitted(m_journaler, std::move(replay_entry)));
+
+ Context *on_commit = new C_ReplayCommitted<I>(m_journaler,
+ std::move(replay_entry));
+ int r = m_journal_replay->process(it, on_commit);
m_lock.Lock();
if (r < 0) {
lderr(cct) << "failed to replay journal entry: " << cpp_strerror(r)
<< dendl;
- m_journaler->stop_replay();
+ delete on_commit;
- if (m_close_pending) {
- destroy_journaler(r);
- return;
- }
+ m_journaler->stop_replay();
- recreate_journaler(r);
+ transition_state(STATE_FLUSHING_RESTART, r);
+ m_journal_replay->flush(create_async_context_callback(
+ m_image_ctx, create_context_callback<
+ Journal<I>, &Journal<I>::handle_flushing_restart>(this)));
+ return;
}
}
}
ldout(cct, 20) << this << " " << __func__ << dendl;
m_journaler->stop_replay();
-
- if (r == 0) {
- r = m_journal_replay->flush();
- }
-
if (r < 0) {
- lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
- recreate_journaler(r);
+ transition_state(STATE_FLUSHING_RESTART, r);
+ m_journal_replay->flush(create_async_context_callback(
+ m_image_ctx, create_context_callback<
+ Journal<I>, &Journal<I>::handle_flushing_restart>(this)));
return;
}
- delete m_journal_replay;
- m_journal_replay = NULL;
+ transition_state(STATE_FLUSHING_REPLAY, 0);
+ m_journal_replay->flush(create_async_context_callback(
+ m_image_ctx, create_context_callback<
+ Journal<I>, &Journal<I>::handle_flushing_replay>(this)));
+ }
+}
- if (m_close_pending) {
- destroy_journaler(0);
- return;
- }
+template <typename I>
+void Journal<I>::handle_flushing_restart(int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_FLUSHING_RESTART);
+ if (m_close_pending) {
+ destroy_journaler(r);
+ return;
+ }
+
+ recreate_journaler(r);
+}
- m_error_result = 0;
- m_journaler->start_append(m_image_ctx.journal_object_flush_interval,
- m_image_ctx.journal_object_flush_bytes,
- m_image_ctx.journal_object_flush_age);
- transition_state(STATE_READY, 0);
+template <typename I>
+void Journal<I>::handle_flushing_replay(int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_FLUSHING_REPLAY);
+ if (m_close_pending) {
+ destroy_journaler(r);
+ return;
}
+
+ if (r < 0) {
+ lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
+ recreate_journaler(r);
+ return;
+ }
+
+ delete m_journal_replay;
+ m_journal_replay = NULL;
+
+ m_error_result = 0;
+ m_journaler->start_append(m_image_ctx.journal_object_flush_interval,
+ m_image_ctx.journal_object_flush_bytes,
+ m_image_ctx.journal_object_flush_age);
+ transition_state(STATE_READY, 0);
}
template <typename I>
transition_state(STATE_STOPPING, 0);
m_journaler->stop_append(util::create_async_context_callback(
- m_image_ctx, new C_StopRecording(this)));
-}
-
-template <typename I>
-void Journal<I>::block_writes() {
- assert(m_lock.is_locked());
- if (!m_blocking_writes) {
- m_blocking_writes = true;
- m_image_ctx.aio_work_queue->block_writes();
- }
-}
-
-template <typename I>
-void Journal<I>::unblock_writes() {
- assert(m_lock.is_locked());
- if (m_blocking_writes) {
- m_blocking_writes = false;
- m_image_ctx.aio_work_queue->unblock_writes();
- }
+ m_image_ctx, create_context_callback<
+ Journal<I>, &Journal<I>::handle_recording_stopped>(this)));
}
template <typename I>
case STATE_UNINITIALIZED:
case STATE_INITIALIZING:
case STATE_REPLAYING:
+ case STATE_FLUSHING_RESTART:
case STATE_RESTARTING_REPLAY:
+ case STATE_FLUSHING_REPLAY:
case STATE_STOPPING:
case STATE_CLOSING:
break;
class Context;
namespace journal {
class Journaler;
+class ReplayEntry;
}
namespace librbd {
class ImageCtx;
namespace journal {
+
class EventEntry;
template <typename> class Replay;
-}
+
+template <typename ImageCtxT>
+struct TypeTraits {
+ typedef ::journal::Journaler Journaler;
+ typedef ::journal::Future Future;
+ typedef ::journal::ReplayEntry ReplayEntry;
+};
+
+} // namespace journal
+
template <typename ImageCtxT = ImageCtx>
class Journal {
* <start>
* |
* v
- * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> READY
- * | * . ^ * . |
- * | * . | * . |
- * | * . | (error) * . . . . |
- * | * . | * . |
- * | * . | v . v
- * | * . | RESTARTING . STOPPING
- * | * . | | . |
- * | * . | | . |
- * | * * * * * * . \-------------/ . |
- * | * (error) . . |
- * | * . . . . . . . . . . . . . |
- * | * . . |
- * | v v v |
- * | CLOSED <----- CLOSING <-------------------------/
+ * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> FLUSHING ---> READY
+ * | * . ^ * . * |
+ * | * . | * . * |
+ * | * . | (error) * . . . . . . . * |
+ * | * . | * . * |
+ * | * . | v . * |
+ * | * . | FLUSHING_RESTART . * |
+ * | * . | | . * |
+ * | * . | | . * |
+ * | * . | v . * v
+ * | * . | RESTARTING < * * * * * STOPPING
+ * | * . | | . |
+ * | * . | | . |
+ * | * * * * * * . \-------------/ . |
+ * | * (error) . . |
+ * | * . . . . . . . . . . . . . . . . |
+ * | * . . |
+ * | v v v |
+ * | CLOSED <----- CLOSING <---------------------------------------/
* | |
* | v
* \---> <finish>
STATE_UNINITIALIZED,
STATE_INITIALIZING,
STATE_REPLAYING,
+ STATE_FLUSHING_RESTART,
STATE_RESTARTING_REPLAY,
+ STATE_FLUSHING_REPLAY,
STATE_READY,
STATE_STOPPING,
STATE_CLOSING,
void wait_event(uint64_t tid, Context *on_safe);
private:
+ ImageCtxT &m_image_ctx;
+
+ // mock unit testing support
+ typedef journal::TypeTraits<ImageCtxT> TypeTraits;
+ typedef typename TypeTraits::Journaler Journaler;
+ typedef typename TypeTraits::Future Future;
+ typedef typename TypeTraits::ReplayEntry ReplayEntry;
+
typedef std::list<Context *> Contexts;
typedef interval_set<uint64_t> ExtentInterval;
struct Event {
- ::journal::Future future;
+ Future future;
AioCompletion *aio_comp;
AioObjectRequests aio_object_requests;
Contexts on_safe_contexts;
Event() : aio_comp(NULL) {
}
- Event(const ::journal::Future &_future, AioCompletion *_aio_comp,
+ Event(const Future &_future, AioCompletion *_aio_comp,
const AioObjectRequests &_requests, uint64_t offset, size_t length)
: future(_future), aio_comp(_aio_comp), aio_object_requests(_requests),
safe(false), ret_val(0) {
}
}
};
- typedef ceph::unordered_map<uint64_t, Event> Events;
-
- struct C_InitJournal : public Context {
- Journal *journal;
-
- C_InitJournal(Journal *_journal) : journal(_journal) {
- }
-
- virtual void finish(int r) {
- journal->handle_initialized(r);
- }
- };
-
- struct C_StopRecording : public Context {
- Journal *journal;
-
- C_StopRecording(Journal *_journal) : journal(_journal) {
- }
-
- virtual void finish(int r) {
- journal->handle_recording_stopped(r);
- }
- };
- struct C_DestroyJournaler : public Context {
- Journal *journal;
-
- C_DestroyJournaler(Journal *_journal) : journal(_journal) {
- }
-
- virtual void finish(int r) {
- journal->handle_journal_destroyed(r);
- }
- };
+ typedef ceph::unordered_map<uint64_t, Event> Events;
struct C_EventSafe : public Context {
Journal *journal;
}
};
- ImageCtxT &m_image_ctx;
-
- ::journal::Journaler *m_journaler;
+ Journaler *m_journaler;
mutable Mutex m_lock;
State m_state;
journal::Replay<ImageCtxT> *m_journal_replay;
- ::journal::Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
+ Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
void create_journaler();
void destroy_journaler(int r);
void handle_replay_ready();
void handle_replay_complete(int r);
+ void handle_flushing_restart(int r);
+ void handle_flushing_replay(int r);
+
void handle_recording_stopped(int r);
void handle_journal_destroyed(int r);
void stop_recording();
- void block_writes();
- void unblock_writes();
-
void transition_state(State state, int r);
bool is_steady_state() const;
template <typename I>
Replay<I>::Replay(I &image_ctx)
- : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock"), m_ret_val(0) {
+ : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock"), m_flush_ctx(nullptr),
+ m_ret_val(0) {
}
template <typename I>
}
template <typename I>
-int Replay<I>::flush() {
+void Replay<I>::flush(Context *on_finish) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
- Mutex::Locker locker(m_lock);
- while (!m_aio_completions.empty()) {
- m_cond.Wait(m_lock);
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_flush_ctx == nullptr);
+ m_flush_ctx = on_finish;
+
+ if (!m_aio_completions.empty()) {
+ return;
+ }
}
- return m_ret_val;
+ on_finish->complete(m_ret_val);
}
template <typename I>
template <typename I>
void Replay<I>::handle_aio_completion(AioCompletion *aio_comp) {
- Mutex::Locker locker(m_lock);
+ Context *on_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
- AioCompletions::iterator it = m_aio_completions.find(aio_comp);
- assert(it != m_aio_completions.end());
+ AioCompletions::iterator it = m_aio_completions.find(aio_comp);
+ assert(it != m_aio_completions.end());
- int r = aio_comp->get_return_value();
+ int r = aio_comp->get_return_value();
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << this << " " << __func__ << ": aio_comp=" << aio_comp << ", "
- << "r=" << r << dendl;
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": "
+ << "aio_comp=" << aio_comp << ", "
+ << "r=" << r << dendl;
+
+ Context *on_safe = it->second;
+ on_safe->complete(r);
- Context *on_safe = it->second;
- on_safe->complete(r);
+ if (r < 0 && m_ret_val == 0) {
+ m_ret_val = r;
+ }
- if (r < 0 && m_ret_val == 0) {
- m_ret_val = r;
+ m_aio_completions.erase(it);
+ if (m_aio_completions.empty()) {
+ on_finish = m_flush_ctx;
+ }
}
- m_aio_completions.erase(it);
- if (m_aio_completions.empty())
- m_cond.Signal();
+ if (on_finish != nullptr) {
+ on_finish->complete(m_ret_val);
+ }
}
template <typename I>