// vim: ts=8 sw=2 smarttab
#include "librbd/journal/Replay.h"
+#include "common/WorkQueue.h"
#include "librbd/AioCompletion.h"
#include "librbd/AioImageRequest.h"
#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
+#include "librbd/Operations.h"
+#include "librbd/Utils.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
namespace librbd {
namespace journal {
+namespace {
+
+static NoOpProgressContext no_op_progress_callback;
+
+} // anonymous namespace
+
+
template <typename I>
Replay<I>::Replay(I &image_ctx)
: m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock"), m_flush_ctx(nullptr),
template <typename I>
Replay<I>::~Replay() {
- assert(m_aio_completions.empty());
+ assert(m_op_contexts.empty() && m_aio_completions.empty());
}
template <typename I>
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
+ on_finish = util::create_async_context_callback(
+ m_image_ctx, on_finish);
{
Mutex::Locker locker(m_lock);
assert(m_flush_ctx == nullptr);
m_flush_ctx = on_finish;
- if (!m_aio_completions.empty()) {
+ if (!m_op_contexts.empty() || !m_aio_completions.empty()) {
return;
}
}
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap create event" << dendl;
+
+ Context *on_finish = create_op_context_callback(on_safe);
+ m_image_ctx.operations->snap_create(event.snap_name.c_str(), on_finish);
}
template <typename I>
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap remove event" << dendl;
+
+ Context *on_finish = create_op_context_callback(on_safe);
+ m_image_ctx.operations->snap_remove(event.snap_name.c_str(), on_finish);
}
template <typename I>
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap rename event" << dendl;
+
+ Context *on_finish = create_op_context_callback(on_safe);
+ m_image_ctx.operations->snap_rename(event.snap_id, event.snap_name.c_str(),
+ on_finish);
}
template <typename I>
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap protect event" << dendl;
+
+ Context *on_finish = create_op_context_callback(on_safe);
+ m_image_ctx.operations->snap_protect(event.snap_name.c_str(), on_finish);
}
template <typename I>
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap unprotect event"
<< dendl;
+
+ Context *on_finish = create_op_context_callback(on_safe);
+ m_image_ctx.operations->snap_unprotect(event.snap_name.c_str(), on_finish);
}
template <typename I>
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap rollback start event"
<< dendl;
+
+ Context *on_finish = create_op_context_callback(on_safe);
+ m_image_ctx.operations->snap_rollback(event.snap_name.c_str(),
+ no_op_progress_callback, on_finish);
}
template <typename I>
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Rename event" << dendl;
+
+ Context *on_finish = create_op_context_callback(on_safe);
+ m_image_ctx.operations->rename(event.image_name.c_str(), on_finish);
}
template <typename I>
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Resize start event" << dendl;
+
+ Context *on_finish = create_op_context_callback(on_safe);
+ m_image_ctx.operations->resize(event.size, no_op_progress_callback,
+ on_finish);
}
template <typename I>
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Flatten start event" << dendl;
+
+ Context *on_finish = create_op_context_callback(on_safe);
+ m_image_ctx.operations->flatten(no_op_progress_callback, on_finish);
}
template <typename I>
on_safe->complete(0);
}
+template <typename I>
+Context *Replay<I>::create_op_context_callback(Context *on_safe) {
+ C_OpOnFinish *on_finish;
+ {
+ on_finish = new C_OpOnFinish(this);
+ m_op_contexts[on_finish] = on_safe;
+ }
+ return on_finish;
+}
+
+template <typename I>
+void Replay<I>::handle_op_context_callback(Context *on_op_finish, int r) {
+ Context *on_safe = nullptr;
+ Context *on_flush = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ auto it = m_op_contexts.find(on_op_finish);
+ assert(it != m_op_contexts.end());
+
+ if (m_ret_val == 0 && r < 0) {
+ m_ret_val = r;
+ }
+
+ on_safe = it->second;
+ m_op_contexts.erase(it);
+ if (m_op_contexts.empty() && m_aio_completions.empty()) {
+ on_flush = m_flush_ctx;
+ }
+ }
+
+ on_safe->complete(r);
+ if (on_flush != nullptr) {
+ on_flush->complete(m_ret_val);
+ }
+}
+
template <typename I>
AioCompletion *Replay<I>::create_aio_completion(Context *on_safe) {
Mutex::Locker locker(m_lock);
template <typename I>
void Replay<I>::handle_aio_completion(AioCompletion *aio_comp) {
- Context *on_finish = nullptr;
+ int r;
+ Context *on_safe = nullptr;
+ Context *on_flush = nullptr;
{
Mutex::Locker locker(m_lock);
-
AioCompletions::iterator it = m_aio_completions.find(aio_comp);
assert(it != m_aio_completions.end());
- int r = aio_comp->get_return_value();
+ r = aio_comp->get_return_value();
+ if (m_ret_val == 0 && r < 0) {
+ m_ret_val = r;
+ }
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": "
<< "aio_comp=" << aio_comp << ", "
<< "r=" << r << dendl;
- Context *on_safe = it->second;
- on_safe->complete(r);
-
- if (r < 0 && m_ret_val == 0) {
- m_ret_val = r;
- }
-
+ on_safe = it->second;
m_aio_completions.erase(it);
- if (m_aio_completions.empty()) {
- on_finish = m_flush_ctx;
+ if (m_op_contexts.empty() && m_aio_completions.empty()) {
+ on_flush = m_flush_ctx;
}
}
- if (on_finish != nullptr) {
- on_finish->complete(m_ret_val);
+ on_safe->complete(r);
+ if (on_flush != nullptr) {
+ on_flush->complete(m_ret_val);
}
}
#include "include/int_types.h"
#include "include/buffer_fwd.h"
+#include "include/Context.h"
+#include "include/unordered_map.h"
#include "include/rbd/librbd.hpp"
#include "common/Mutex.h"
#include "librbd/journal/Entries.h"
#include <boost/variant.hpp>
#include <map>
-class Context;
-
namespace librbd {
class AioCompletion;
return new Replay(image_ctx);
}
+ Replay(ImageCtxT &image_ctx);
~Replay();
int process(bufferlist::iterator it, Context *on_safe = NULL);
void flush(Context *on_finish);
private:
+ typedef ceph::unordered_map<Context *, Context *> OpContexts;
typedef std::map<AioCompletion*,Context*> AioCompletions;
+ struct C_OpOnFinish : public Context {
+ Replay *replay;
+ C_OpOnFinish(Replay *replay) : replay(replay) {
+ }
+ virtual void finish(int r) override {
+ replay->handle_op_context_callback(this, r);
+ }
+ };
+
struct EventVisitor : public boost::static_visitor<void> {
Replay *replay;
Context *on_safe;
Mutex m_lock;
+ OpContexts m_op_contexts;
AioCompletions m_aio_completions;
Context *m_flush_ctx;
int m_ret_val;
- Replay(ImageCtxT &image_ctx);
-
void handle_event(const AioDiscardEvent &event, Context *on_safe);
void handle_event(const AioWriteEvent &event, Context *on_safe);
void handle_event(const AioFlushEvent &event, Context *on_safe);
void handle_event(const FlattenEvent &event, Context *on_safe);
void handle_event(const UnknownEvent &event, Context *on_safe);
+ Context *create_op_context_callback(Context *on_safe);
+ void handle_op_context_callback(Context *on_op_finish, int r);
+
AioCompletion *create_aio_completion(Context *on_safe);
void handle_aio_completion(AioCompletion *aio_comp);