const std::string CLIENT_DESCRIPTION = "master image";
-template <typename ImageCtxT>
-struct C_ReplayCommitted : public Context {
- typedef journal::TypeTraits<ImageCtxT> TypeTraits;
- typedef typename TypeTraits::Journaler Journaler;
- typedef typename TypeTraits::ReplayEntry ReplayEntry;
-
- Journaler *journaler;
- ReplayEntry replay_entry;
-
- C_ReplayCommitted(Journaler *journaler, ReplayEntry &&replay_entry) :
- journaler(journaler), replay_entry(std::move(replay_entry)) {
- }
- virtual void finish(int r) {
- journaler->committed(replay_entry);
- }
-};
-
} // anonymous namespace
template <typename I>
template <typename I>
void Journal<I>::handle_replay_ready() {
+ ReplayEntry replay_entry;
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_state != STATE_REPLAYING) {
+ return;
+ }
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+ if (!m_journaler->try_pop_front(&replay_entry)) {
+ return;
+ }
+ }
+
+ bufferlist data = replay_entry.get_data();
+ bufferlist::iterator it = data.begin();
+ Context *on_ready = create_context_callback<
+ Journal<I>, &Journal<I>::handle_replay_process_ready>(this);
+ Context *on_commit = new C_ReplayProcessSafe(this, std::move(replay_entry));
+
+ m_journal_replay->process(&it, on_ready, on_commit);
+}
+
+template <typename I>
+void Journal<I>::handle_replay_complete(int r) {
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << this << " " << __func__ << dendl;
Mutex::Locker locker(m_lock);
if (m_state != STATE_REPLAYING) {
return;
}
- while (true) {
- ReplayEntry replay_entry;
- if (!m_journaler->try_pop_front(&replay_entry)) {
- return;
- }
-
- m_lock.Unlock();
- bufferlist data = replay_entry.get_data();
- bufferlist::iterator it = data.begin();
+ ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+ m_journaler->stop_replay();
+ if (r < 0) {
+ transition_state(STATE_FLUSHING_RESTART, r);
- Context *on_commit = new C_ReplayCommitted<I>(m_journaler,
- std::move(replay_entry));
- int r = m_journal_replay->process(it, on_commit);
- m_lock.Lock();
+ m_journal_replay->flush(create_context_callback<
+ Journal<I>, &Journal<I>::handle_flushing_restart>(this));
+ } else {
+ transition_state(STATE_FLUSHING_REPLAY, 0);
- if (r < 0) {
- lderr(cct) << "failed to replay journal entry: " << cpp_strerror(r)
- << dendl;
- delete on_commit;
+ m_journal_replay->flush(create_context_callback<
+ Journal<I>, &Journal<I>::handle_flushing_replay>(this));
+ }
+}
- m_journaler->stop_replay();
+template <typename I>
+void Journal<I>::handle_replay_process_ready(int r) {
+ // journal::Replay is ready for more events -- attempt to pop another
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
- transition_state(STATE_FLUSHING_RESTART, r);
- m_journal_replay->flush(create_async_context_callback(
- m_image_ctx, create_context_callback<
- Journal<I>, &Journal<I>::handle_flushing_restart>(this)));
- return;
- }
- }
+ assert(r == 0);
+ handle_replay_ready();
}
template <typename I>
-void Journal<I>::handle_replay_complete(int r) {
+void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
+ Mutex::Locker locker(m_lock);
+
CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+ if (r < 0) {
+ lderr(cct) << "failed to commit journal event to disk: " << cpp_strerror(r)
+ << dendl;
- {
- Mutex::Locker locker(m_lock);
- if (m_state != STATE_REPLAYING) {
- return;
- }
+ if (m_state == STATE_REPLAYING) {
+ // abort the replay if we have an error
+ m_journaler->stop_replay();
+ transition_state(STATE_FLUSHING_RESTART, r);
- ldout(cct, 20) << this << " " << __func__ << dendl;
- m_journaler->stop_replay();
- if (r < 0) {
+ m_journal_replay->flush(create_context_callback<
+ Journal<I>, &Journal<I>::handle_flushing_restart>(this));
+ return;
+ } else if (m_state == STATE_FLUSHING_REPLAY) {
+ // end-of-replay flush in-progress -- we need to restart replay
transition_state(STATE_FLUSHING_RESTART, r);
- m_journal_replay->flush(create_async_context_callback(
- m_image_ctx, create_context_callback<
- Journal<I>, &Journal<I>::handle_flushing_restart>(this)));
return;
}
-
- transition_state(STATE_FLUSHING_REPLAY, 0);
- m_journal_replay->flush(create_async_context_callback(
- m_image_ctx, create_context_callback<
- Journal<I>, &Journal<I>::handle_flushing_replay>(this)));
+ } else {
+ // only commit the entry if written successfully
+ m_journaler->committed(replay_entry);
}
}
template <typename I>
void Journal<I>::handle_flushing_restart(int r) {
+ Mutex::Locker locker(m_lock);
+
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
- Mutex::Locker locker(m_lock);
+ assert(r == 0);
assert(m_state == STATE_FLUSHING_RESTART);
if (m_close_pending) {
destroy_journaler(r);
template <typename I>
void Journal<I>::handle_flushing_replay(int r) {
+ Mutex::Locker locker(m_lock);
+
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_FLUSHING_REPLAY);
+ assert(r == 0);
+ assert(m_state == STATE_FLUSHING_REPLAY || m_state == STATE_FLUSHING_RESTART);
if (m_close_pending) {
destroy_journaler(r);
return;
- }
-
- if (r < 0) {
- lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
- recreate_journaler(r);
+ } else if (m_state == STATE_FLUSHING_RESTART) {
+ // failed to replay one-or-more events -- restart
+ recreate_journaler(0);
return;
}
// vim: ts=8 sw=2 smarttab
#include "librbd/journal/Replay.h"
+#include "common/dout.h"
+#include "common/errno.h"
#include "common/WorkQueue.h"
#include "librbd/AioCompletion.h"
#include "librbd/AioImageRequest.h"
namespace {
+static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
+static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);
+
static NoOpProgressContext no_op_progress_callback;
} // anonymous namespace
-
template <typename I>
Replay<I>::Replay(I &image_ctx)
- : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock"), m_flush_ctx(nullptr),
- m_ret_val(0) {
+ : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock") {
}
template <typename I>
Replay<I>::~Replay() {
- assert(m_op_contexts.empty() && m_aio_completions.empty());
+ assert(m_in_flight_aio == 0);
+ assert(m_aio_modify_unsafe_contexts.empty());
+ assert(m_aio_modify_safe_contexts.empty());
+ assert(m_op_contexts.empty());
}
template <typename I>
-int Replay<I>::process(bufferlist::iterator it, Context *on_safe) {
+void Replay<I>::process(bufferlist::iterator *it, Context *on_ready,
+ Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
- RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
journal::EventEntry event_entry;
try {
- ::decode(event_entry, it);
+ ::decode(event_entry, *it);
} catch (const buffer::error &err) {
lderr(cct) << "failed to decode event entry: " << err.what() << dendl;
- return -EINVAL;
+ on_ready->complete(-EINVAL);
+ return;
}
- boost::apply_visitor(EventVisitor(this, on_safe), event_entry.event);
- return 0;
+ Mutex::Locker locker(m_lock);
+ RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
+ boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
+ event_entry.event);
}
template <typename I>
assert(m_flush_ctx == nullptr);
m_flush_ctx = on_finish;
- if (!m_op_contexts.empty() || !m_aio_completions.empty()) {
+ if (m_in_flight_aio != 0) {
+ flush_aio();
+ }
+
+ if (!m_op_contexts.empty() || m_in_flight_aio != 0) {
return;
}
}
- on_finish->complete(m_ret_val);
+ on_finish->complete(0);
}
template <typename I>
void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
- Context *on_safe) {
+ Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": AIO discard event" << dendl;
- AioCompletion *aio_comp = create_aio_completion(on_safe);
+ bool flush_required;
+ AioCompletion *aio_comp = create_aio_modify_completion(on_ready, on_safe,
+ &flush_required);
AioImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, event.offset,
event.length);
+ if (flush_required) {
+ flush_aio();
+ }
}
template <typename I>
void Replay<I>::handle_event(const journal::AioWriteEvent &event,
- Context *on_safe) {
+ Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": AIO write event" << dendl;
bufferlist data = event.data;
- AioCompletion *aio_comp = create_aio_completion(on_safe);
+ bool flush_required;
+ AioCompletion *aio_comp = create_aio_modify_completion(on_ready, on_safe,
+ &flush_required);
AioImageRequest<I>::aio_write(&m_image_ctx, aio_comp, event.offset,
event.length, data.c_str(), 0);
+ if (flush_required) {
+ flush_aio();
+ }
}
template <typename I>
void Replay<I>::handle_event(const journal::AioFlushEvent &event,
- Context *on_safe) {
+ Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": AIO flush event" << dendl;
- AioCompletion *aio_comp = create_aio_completion(on_safe);
+ AioCompletion *aio_comp = create_aio_flush_completion(on_ready, on_safe);
AioImageRequest<I>::aio_flush(&m_image_ctx, aio_comp);
}
template <typename I>
void Replay<I>::handle_event(const journal::OpFinishEvent &event,
- Context *on_safe) {
+ Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Op finish event" << dendl;
}
template <typename I>
void Replay<I>::handle_event(const journal::SnapCreateEvent &event,
- Context *on_safe) {
+ Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap create event" << dendl;
+ // TODO not-ready until state machine lets us know
Context *on_finish = create_op_context_callback(on_safe);
m_image_ctx.operations->snap_create(event.snap_name.c_str(), on_finish);
}
template <typename I>
void Replay<I>::handle_event(const journal::SnapRemoveEvent &event,
- Context *on_safe) {
+ Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap remove event" << dendl;
Context *on_finish = create_op_context_callback(on_safe);
m_image_ctx.operations->snap_remove(event.snap_name.c_str(), on_finish);
+ on_ready->complete(0);
}
template <typename I>
void Replay<I>::handle_event(const journal::SnapRenameEvent &event,
- Context *on_safe) {
+ Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap rename event" << dendl;
Context *on_finish = create_op_context_callback(on_safe);
m_image_ctx.operations->snap_rename(event.snap_id, event.snap_name.c_str(),
on_finish);
+ on_ready->complete(0);
}
template <typename I>
void Replay<I>::handle_event(const journal::SnapProtectEvent &event,
- Context *on_safe) {
+ Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap protect event" << dendl;
Context *on_finish = create_op_context_callback(on_safe);
m_image_ctx.operations->snap_protect(event.snap_name.c_str(), on_finish);
+ on_ready->complete(0);
}
template <typename I>
void Replay<I>::handle_event(const journal::SnapUnprotectEvent &event,
- Context *on_safe) {
+ Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap unprotect event"
<< dendl;
Context *on_finish = create_op_context_callback(on_safe);
m_image_ctx.operations->snap_unprotect(event.snap_name.c_str(), on_finish);
+ on_ready->complete(0);
}
template <typename I>
void Replay<I>::handle_event(const journal::SnapRollbackEvent &event,
- Context *on_safe) {
+ Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap rollback start event"
<< dendl;
Context *on_finish = create_op_context_callback(on_safe);
m_image_ctx.operations->snap_rollback(event.snap_name.c_str(),
no_op_progress_callback, on_finish);
+ on_ready->complete(0);
}
template <typename I>
void Replay<I>::handle_event(const journal::RenameEvent &event,
- Context *on_safe) {
+ Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Rename event" << dendl;
Context *on_finish = create_op_context_callback(on_safe);
m_image_ctx.operations->rename(event.image_name.c_str(), on_finish);
+ on_ready->complete(0);
}
template <typename I>
void Replay<I>::handle_event(const journal::ResizeEvent &event,
- Context *on_safe) {
+ Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Resize start event" << dendl;
+ // TODO not-ready until state machine lets us know
Context *on_finish = create_op_context_callback(on_safe);
m_image_ctx.operations->resize(event.size, no_op_progress_callback,
on_finish);
template <typename I>
void Replay<I>::handle_event(const journal::FlattenEvent &event,
- Context *on_safe) {
+ Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Flatten start event" << dendl;
Context *on_finish = create_op_context_callback(on_safe);
m_image_ctx.operations->flatten(no_op_progress_callback, on_finish);
+ on_ready->complete(0);
}
template <typename I>
void Replay<I>::handle_event(const journal::UnknownEvent &event,
- Context *on_safe) {
+ Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": unknown event" << dendl;
+ on_ready->complete(0);
on_safe->complete(0);
}
template <typename I>
-Context *Replay<I>::create_op_context_callback(Context *on_safe) {
- C_OpOnFinish *on_finish;
- {
- on_finish = new C_OpOnFinish(this);
- m_op_contexts[on_finish] = on_safe;
+void Replay<I>::flush_aio() {
+ assert(m_lock.is_locked());
+
+ AioCompletion *aio_comp = create_aio_flush_completion(nullptr, nullptr);
+ AioImageRequest<I>::aio_flush(&m_image_ctx, aio_comp);
+}
+
+template <typename I>
+void Replay<I>::handle_aio_modify_complete(Context *on_safe, int r) {
+ Mutex::Locker locker(m_lock);
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": on_safe=" << on_safe << ", "
+ << "r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(cct) << "AIO modify op failed: " << cpp_strerror(r) << dendl;
+ on_safe->complete(r);
+ return;
}
- return on_finish;
+
+ // will be completed after next flush operation completes
+ m_aio_modify_safe_contexts.insert(on_safe);
}
template <typename I>
-void Replay<I>::handle_op_context_callback(Context *on_op_finish, int r) {
- Context *on_safe = nullptr;
+void Replay<I>::handle_aio_flush_complete(Context *on_flush_safe,
+ Contexts &on_safe_ctxs, int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": " << "r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(cct) << "AIO flush failed: " << cpp_strerror(r) << dendl;
+ }
+
+ Context *on_aio_ready = nullptr;
Context *on_flush = nullptr;
{
Mutex::Locker locker(m_lock);
- auto it = m_op_contexts.find(on_op_finish);
- assert(it != m_op_contexts.end());
+ assert(m_in_flight_aio >= on_safe_ctxs.size());
+ m_in_flight_aio -= on_safe_ctxs.size();
- if (m_ret_val == 0 && r < 0) {
- m_ret_val = r;
+ std::swap(on_aio_ready, m_on_aio_ready);
+ if (m_op_contexts.empty() && m_in_flight_aio == 0) {
+ on_flush = m_flush_ctx;
}
- on_safe = it->second;
- m_op_contexts.erase(it);
- if (m_op_contexts.empty() && m_aio_completions.empty()) {
- on_flush = m_flush_ctx;
+ // strip out previously failed on_safe contexts
+ for (auto it = on_safe_ctxs.begin(); it != on_safe_ctxs.end(); ) {
+ if (m_aio_modify_safe_contexts.erase(*it)) {
+ ++it;
+ } else {
+ it = on_safe_ctxs.erase(it);
+ }
}
}
- on_safe->complete(r);
+ if (on_aio_ready != nullptr) {
+ ldout(cct, 10) << "resuming paused AIO" << dendl;
+ on_aio_ready->complete(0);
+ }
+ for (auto ctx : on_safe_ctxs) {
+ ctx->complete(r);
+ }
+ if (on_flush_safe != nullptr) {
+ on_flush_safe->complete(r);
+ }
if (on_flush != nullptr) {
- on_flush->complete(m_ret_val);
+ on_flush->complete(r);
}
}
template <typename I>
-AioCompletion *Replay<I>::create_aio_completion(Context *on_safe) {
- Mutex::Locker locker(m_lock);
- AioCompletion *aio_comp = AioCompletion::create(this, aio_completion_callback,
- nullptr);
- m_aio_completions.insert(std::pair<AioCompletion*,Context*>(
- aio_comp, on_safe));
- return aio_comp;
+Context *Replay<I>::create_op_context_callback(Context *on_safe) {
+ assert(m_lock.is_locked());
+
+ C_OpOnFinish *on_finish;
+ {
+ on_finish = new C_OpOnFinish(this);
+ m_op_contexts[on_finish] = on_safe;
+ }
+ return on_finish;
}
template <typename I>
-void Replay<I>::handle_aio_completion(AioCompletion *aio_comp) {
- int r;
+void Replay<I>::handle_op_context_callback(Context *on_op_finish, int r) {
Context *on_safe = nullptr;
Context *on_flush = nullptr;
{
Mutex::Locker locker(m_lock);
- AioCompletions::iterator it = m_aio_completions.find(aio_comp);
- assert(it != m_aio_completions.end());
-
- r = aio_comp->get_return_value();
- if (m_ret_val == 0 && r < 0) {
- m_ret_val = r;
- }
-
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << this << " " << __func__ << ": "
- << "aio_comp=" << aio_comp << ", "
- << "r=" << r << dendl;
+ auto it = m_op_contexts.find(on_op_finish);
+ assert(it != m_op_contexts.end());
on_safe = it->second;
- m_aio_completions.erase(it);
- if (m_op_contexts.empty() && m_aio_completions.empty()) {
+ m_op_contexts.erase(it);
+ if (m_op_contexts.empty() && m_in_flight_aio == 0) {
on_flush = m_flush_ctx;
}
}
on_safe->complete(r);
if (on_flush != nullptr) {
- on_flush->complete(m_ret_val);
+ on_flush->complete(0);
}
}
template <typename I>
-void Replay<I>::aio_completion_callback(completion_t cb, void *arg) {
- Replay *replay = reinterpret_cast<Replay *>(arg);
- AioCompletion *aio_comp = reinterpret_cast<AioCompletion *>(cb);
+AioCompletion *Replay<I>::create_aio_modify_completion(Context *on_ready,
+ Context *on_safe,
+ bool *flush_required) {
+ CephContext *cct = m_image_ctx.cct;
+ assert(m_lock.is_locked());
+ assert(m_on_aio_ready == nullptr);
+
+ ++m_in_flight_aio;
+ m_aio_modify_unsafe_contexts.push_back(on_safe);
+
+ // FLUSH if we hit the low-water mark -- on_safe contexts are
+ // completed by flushes-only so that we don't move the journal
+ // commit position until safely on-disk
+
+ // when safe, the completion of the next flush will fire the on_safe
+ // callback
+ AioCompletion *aio_comp = AioCompletion::create<Context>(
+ new C_AioModifyComplete(this, on_safe));
+
+ *flush_required = (m_aio_modify_unsafe_contexts.size() ==
+ IN_FLIGHT_IO_LOW_WATER_MARK);
+ if (*flush_required) {
+ ldout(cct, 10) << "hit AIO replay low-water mark: scheduling flush"
+ << dendl;
+ }
- replay->handle_aio_completion(aio_comp);
- aio_comp->release();
+ // READY for more events if:
+ // * not at high-water mark for IO
+ // * in-flight ops are at a consistent point (snap create has IO flushed,
+ // shrink has adjusted clip boundary, etc) -- should have already been
+ // flagged not-ready
+ if (m_in_flight_aio == IN_FLIGHT_IO_HIGH_WATER_MARK) {
+ ldout(cct, 10) << "hit AIO replay high-water mark: pausing replay"
+ << dendl;
+ m_on_aio_ready = on_ready;
+ } else {
+ on_ready->complete(0);
+ }
+
+ return aio_comp;
+}
+
+template <typename I>
+AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_ready,
+ Context *on_safe) {
+ // associate all prior write/discard ops to this flush request
+ AioCompletion *aio_comp = AioCompletion::create<Context>(
+ new C_AioFlushComplete(this, on_safe,
+ std::move(m_aio_modify_unsafe_contexts)));
+ m_aio_modify_unsafe_contexts.clear();
+
+ if (on_ready != nullptr) {
+ on_ready->complete(0);
+ }
+ return aio_comp;
}
} // namespace journal
#include "include/int_types.h"
#include "include/buffer_fwd.h"
#include "include/Context.h"
+#include "include/unordered_set.h"
#include "include/unordered_map.h"
#include "include/rbd/librbd.hpp"
#include "common/Mutex.h"
#include "librbd/journal/Entries.h"
#include <boost/variant.hpp>
-#include <map>
+#include <list>
namespace librbd {
Replay(ImageCtxT &image_ctx);
~Replay();
- int process(bufferlist::iterator it, Context *on_safe = NULL);
+ void process(bufferlist::iterator *it, Context *on_ready, Context *on_safe);
void flush(Context *on_finish);
private:
+ typedef std::list<Context *> Contexts;
+ typedef ceph::unordered_set<Context *> ContextSet;
typedef ceph::unordered_map<Context *, Context *> OpContexts;
- typedef std::map<AioCompletion*,Context*> AioCompletions;
struct C_OpOnFinish : public Context {
Replay *replay;
}
};
+ struct C_AioModifyComplete : public Context {
+ Replay *replay;
+ Context *on_safe;
+ C_AioModifyComplete(Replay *replay, Context *on_safe)
+ : replay(replay), on_safe(on_safe) {
+ }
+ virtual void finish(int r) {
+ replay->handle_aio_modify_complete(on_safe, r);
+ }
+ };
+
+ struct C_AioFlushComplete : public Context {
+ Replay *replay;
+ Context *on_flush_safe;
+ Contexts on_safe_ctxs;
+ C_AioFlushComplete(Replay *replay, Context *on_flush_safe,
+ Contexts &&on_safe_ctxs)
+ : replay(replay), on_flush_safe(on_flush_safe),
+ on_safe_ctxs(on_safe_ctxs) {
+ }
+ virtual void finish(int r) {
+ replay->handle_aio_flush_complete(on_flush_safe, on_safe_ctxs, r);
+ }
+ };
+
struct EventVisitor : public boost::static_visitor<void> {
Replay *replay;
+ Context *on_ready;
Context *on_safe;
- EventVisitor(Replay *_replay, Context *_on_safe)
- : replay(_replay), on_safe(_on_safe) {
+ EventVisitor(Replay *_replay, Context *_on_ready, Context *_on_safe)
+ : replay(_replay), on_ready(_on_ready), on_safe(_on_safe) {
}
template <typename Event>
inline void operator()(const Event &event) const {
- replay->handle_event(event, on_safe);
+ replay->handle_event(event, on_ready, on_safe);
}
};
Mutex m_lock;
+ uint64_t m_in_flight_aio = 0;
+ Contexts m_aio_modify_unsafe_contexts;
+ ContextSet m_aio_modify_safe_contexts;
+
OpContexts m_op_contexts;
- AioCompletions m_aio_completions;
- Context *m_flush_ctx;
- int m_ret_val;
-
- void handle_event(const AioDiscardEvent &event, Context *on_safe);
- void handle_event(const AioWriteEvent &event, Context *on_safe);
- void handle_event(const AioFlushEvent &event, Context *on_safe);
- void handle_event(const OpFinishEvent &event, Context *on_safe);
- void handle_event(const SnapCreateEvent &event, Context *on_safe);
- void handle_event(const SnapRemoveEvent &event, Context *on_safe);
- void handle_event(const SnapRenameEvent &event, Context *on_safe);
- void handle_event(const SnapProtectEvent &event, Context *on_safe);
- void handle_event(const SnapUnprotectEvent &event, Context *on_safe);
- void handle_event(const SnapRollbackEvent &event, Context *on_safe);
- void handle_event(const RenameEvent &event, Context *on_safe);
- void handle_event(const ResizeEvent &event, Context *on_safe);
- void handle_event(const FlattenEvent &event, Context *on_safe);
- void handle_event(const UnknownEvent &event, Context *on_safe);
+
+ Context *m_flush_ctx = nullptr;
+ Context *m_on_aio_ready = nullptr;
+
+ void handle_event(const AioDiscardEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const AioWriteEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const AioFlushEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const OpFinishEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const SnapCreateEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const SnapRemoveEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const SnapRenameEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const SnapProtectEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const SnapUnprotectEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const SnapRollbackEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const RenameEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const ResizeEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const FlattenEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const UnknownEvent &event, Context *on_ready,
+ Context *on_safe);
+
+ void flush_aio();
+ void handle_aio_modify_complete(Context *on_safe, int r);
+ void handle_aio_flush_complete(Context *on_flush_safe, Contexts &on_safe_ctxs,
+ int r);
Context *create_op_context_callback(Context *on_safe);
void handle_op_context_callback(Context *on_op_finish, int r);
- AioCompletion *create_aio_completion(Context *on_safe);
+ AioCompletion *create_aio_modify_completion(Context *on_ready,
+ Context *on_safe,
+ bool *flush_required);
+ AioCompletion *create_aio_flush_completion(Context *on_ready,
+ Context *on_safe);
void handle_aio_completion(AioCompletion *aio_comp);
- static void aio_completion_callback(completion_t cb, void *arg);
};
} // namespace journal