#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "journal/Journaler.h"
-#include "journal/ReplayEntry.h"
#include "journal/ReplayHandler.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
}
};
-template <typename I>
-struct C_ReplayCommitted : public Context {
- typedef typename librbd::journal::TypeTraits<I>::ReplayEntry ReplayEntry;
-
- ImageReplayer<I> *replayer;
- ReplayEntry replay_entry;
-
- C_ReplayCommitted(ImageReplayer<I> *replayer,
- ReplayEntry &&replay_entry)
- : replayer(replayer), replay_entry(std::move(replay_entry)) {
- }
- virtual void finish(int r) {
- replayer->handle_replay_committed(&replay_entry, r);
- }
-};
-
class ImageReplayerAdminSocketCommand {
public:
virtual ~ImageReplayerAdminSocketCommand() {}
m_local_replay(nullptr),
m_remote_journaler(nullptr),
m_replay_handler(nullptr),
- m_on_finish(nullptr),
m_asok_hook(nullptr)
{
}
assert(m_local_replay == nullptr);
assert(m_remote_journaler == nullptr);
assert(m_replay_handler == nullptr);
-
+ assert(m_on_start_finish == nullptr);
+ assert(m_on_stop_finish == nullptr);
delete m_asok_hook;
}
void ImageReplayer<I>::start(Context *on_finish,
const BootstrapParams *bootstrap_params)
{
- dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish
- << dendl;
+ assert(m_on_start_finish == nullptr);
+ assert(m_on_stop_finish == nullptr);
+ dout(20) << "on_finish=" << on_finish << dendl;
{
Mutex::Locker locker(m_lock);
assert(is_stopped_());
m_state = STATE_STARTING;
-
- assert(m_on_finish == nullptr);
- m_on_finish = on_finish;
+ m_on_start_finish = on_finish;
}
int r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx);
Context *on_finish(nullptr);
{
Mutex::Locker locker(m_lock);
- if (m_state == STATE_STOPPING) {
+ if (m_stop_requested) {
on_start_fail_start(-EINTR);
return;
}
assert(m_state == STATE_STARTING);
m_state = STATE_REPLAYING;
- std::swap(m_on_finish, on_finish);
+ std::swap(m_on_start_finish, on_finish);
}
dout(20) << "start succeeded" << dendl;
- if (on_finish) {
+ if (on_finish != nullptr) {
dout(20) << "on finish complete, r=" << r << dendl;
on_finish->complete(r);
}
m_local_ioctx.close();
m_remote_ioctx.close();
- Context *on_finish(nullptr);
-
+ Context *on_start_finish(nullptr);
+ Context *on_stop_finish(nullptr);
{
Mutex::Locker locker(m_lock);
- if (m_state == STATE_STOPPING) {
+ if (m_stop_requested) {
assert(r == -EINTR);
dout(20) << "start interrupted" << dendl;
m_state = STATE_STOPPED;
+ m_stop_requested = false;
} else {
assert(m_state == STATE_STARTING);
dout(20) << "start failed" << dendl;
m_state = STATE_UNINITIALIZED;
}
- std::swap(m_on_finish, on_finish);
+ std::swap(m_on_start_finish, on_start_finish);
+ std::swap(m_on_stop_finish, on_stop_finish);
}
- if (on_finish) {
- dout(20) << "on finish complete, r=" << r << dendl;
- on_finish->complete(r);
+ if (on_start_finish != nullptr) {
+ dout(20) << "on start finish complete, r=" << r << dendl;
+ on_start_finish->complete(r);
+ }
+ if (on_stop_finish != nullptr) {
+ dout(20) << "on stop finish complete, r=" << r << dendl;
+ on_stop_finish->complete(0);
}
}
bool ImageReplayer<I>::on_start_interrupted()
{
Mutex::Locker locker(m_lock);
-
- if (m_state == STATE_STARTING) {
+ assert(m_state == STATE_STARTING);
+ if (m_on_stop_finish == nullptr) {
return false;
}
- assert(m_state == STATE_STOPPING);
-
on_start_fail_start(-EINTR);
return true;
}
template <typename I>
void ImageReplayer<I>::stop(Context *on_finish)
{
- dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish
- << dendl;
+ dout(20) << "on_finish=" << on_finish << dendl;
- Mutex::Locker locker(m_lock);
- assert(is_running_());
-
- if (m_state == STATE_STARTING) {
- dout(20) << "interrupting start" << dendl;
-
- if (on_finish) {
- Context *on_start_finish = m_on_finish;
- FunctionContext *ctx = new FunctionContext(
- [this, on_start_finish, on_finish](int r) {
- if (on_start_finish) {
- on_start_finish->complete(r);
- }
- on_finish->complete(0);
- });
-
- m_on_finish = ctx;
- }
- } else if (m_state == STATE_FLUSHING_REPLAY) {
- dout(20) << "interrupting flush" << dendl;
-
- if (on_finish) {
- Context *on_flush_finish = m_on_finish;
- FunctionContext *ctx = new FunctionContext(
- [this, on_flush_finish, on_finish](int r) {
- if (on_flush_finish) {
- on_flush_finish->complete(r);
- }
- on_finish->complete(0);
- });
-
- m_on_finish = ctx;
+ bool shut_down_replay = false;
+ {
+ Mutex::Locker locker(m_lock);
+ assert(is_running_());
+
+ if (!is_stopped_()) {
+ if (m_state == STATE_STARTING) {
+ dout(20) << "interrupting start" << dendl;
+ } else {
+ dout(20) << "interrupting replay" << dendl;
+ shut_down_replay = true;
+ }
+
+ assert(m_on_stop_finish == nullptr);
+ std::swap(m_on_stop_finish, on_finish);
+ m_stop_requested = true;
}
- } else {
- assert(m_on_finish == nullptr);
- m_on_finish = on_finish;
+ }
+
+ if (shut_down_replay) {
on_stop_journal_replay_shut_down_start();
+ } else if (on_finish != nullptr) {
+ on_finish->complete(0);
}
- m_state = STATE_STOPPING;
}
template <typename I>
on_stop_journal_replay_shut_down_finish(r);
});
- m_local_replay->shut_down(false, ctx);
+ {
+ Mutex::Locker locker(m_lock);
+
+ // as we complete in-flight records, we might receive multiple stop requests
+ if (m_state != STATE_REPLAYING) {
+ return;
+ }
+ m_state = STATE_STOPPING;
+ m_local_replay->shut_down(false, ctx);
+ }
}
template <typename I>
void ImageReplayer<I>::on_stop_journal_replay_shut_down_finish(int r)
{
dout(20) << "r=" << r << dendl;
-
if (r < 0) {
derr << "error flushing journal replay: " << cpp_strerror(r) << dendl;
}
- m_local_image_ctx->journal->stop_external_replay();
- m_local_replay = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_STOPPING);
+ m_local_image_ctx->journal->stop_external_replay();
+ m_local_replay = nullptr;
+ }
on_stop_local_image_close_start();
}
assert(m_state == STATE_STOPPING);
m_state = STATE_STOPPED;
-
- std::swap(m_on_finish, on_finish);
+ m_stop_requested = false;
+ std::swap(m_on_stop_finish, on_finish);
}
dout(20) << "stop complete" << dendl;
- if (on_finish) {
+ if (on_finish != nullptr) {
dout(20) << "on finish complete, r=" << r << dendl;
on_finish->complete(r);
}
void ImageReplayer<I>::handle_replay_ready()
{
dout(20) << "enter" << dendl;
-
- ReplayEntry replay_entry;
- if (!m_remote_journaler->try_pop_front(&replay_entry)) {
+ if (on_replay_interrupted()) {
return;
}
- dout(20) << "processing entry tid=" << replay_entry.get_commit_tid() << dendl;
+ if (!m_remote_journaler->try_pop_front(&m_replay_entry)) {
+ return;
+ }
- bufferlist data = replay_entry.get_data();
- bufferlist::iterator it = data.begin();
- Context *on_ready = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_replay_process_ready>(this);
- Context *on_commit = new C_ReplayCommitted<I>(this, std::move(replay_entry));
- m_local_replay->process(&it, on_ready, on_commit);
+ // TODO
+ process_entry();
}
template <typename I>
{
dout(20) << "enter" << dendl;
- bool start_flush = false;
-
{
Mutex::Locker locker(m_lock);
-
- if (m_state == STATE_REPLAYING) {
- assert(m_on_finish == nullptr);
- m_on_finish = on_finish;
-
- m_state = STATE_FLUSHING_REPLAY;
-
- start_flush = true;
+ if (m_state == STATE_REPLAYING || m_state == STATE_REPLAYING) {
+ Context *ctx = new FunctionContext(
+ [on_finish](int r) {
+ if (on_finish != nullptr) {
+ on_finish->complete(r);
+ }
+ });
+ on_flush_local_replay_flush_start(ctx);
}
}
- if (start_flush) {
- on_flush_local_replay_flush_start();
- } else if (on_finish) {
+ if (on_finish) {
on_finish->complete(0);
}
}
template <typename I>
-void ImageReplayer<I>::on_flush_local_replay_flush_start()
+void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
{
dout(20) << "enter" << dendl;
-
FunctionContext *ctx = new FunctionContext(
- [this](int r) {
- on_flush_local_replay_flush_finish(r);
+ [this, on_flush](int r) {
+ on_flush_local_replay_flush_finish(on_flush, r);
});
+ assert(m_lock.is_locked());
+ assert(m_state == STATE_REPLAYING);
m_local_replay->flush(ctx);
}
template <typename I>
-void ImageReplayer<I>::on_flush_local_replay_flush_finish(int r)
+void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
+ int r)
{
dout(20) << "r=" << r << dendl;
-
if (r < 0) {
derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
- }
-
- if (on_flush_interrupted()) {
+ on_flush->complete(r);
return;
}
- on_flush_flush_commit_position_start(r);
+ on_flush_flush_commit_position_start(on_flush);
}
template <typename I>
-void ImageReplayer<I>::on_flush_flush_commit_position_start(int last_r)
+void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
{
-
FunctionContext *ctx = new FunctionContext(
- [this, last_r](int r) {
- on_flush_flush_commit_position_finish(last_r, r);
+ [this, on_flush](int r) {
+ on_flush_flush_commit_position_finish(on_flush, r);
});
m_remote_journaler->flush_commit_position(ctx);
}
template <typename I>
-void ImageReplayer<I>::on_flush_flush_commit_position_finish(int last_r, int r)
+void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
+ int r)
{
if (r < 0) {
derr << "error flushing remote journal commit position: "
<< cpp_strerror(r) << dendl;
- } else {
- r = last_r;
- }
-
- Context *on_finish(nullptr);
-
- {
- Mutex::Locker locker(m_lock);
- if (m_state == STATE_STOPPING) {
- r = -EINTR;
- } else {
- assert(m_state == STATE_FLUSHING_REPLAY);
-
- m_state = STATE_REPLAYING;
- }
- std::swap(m_on_finish, on_finish);
}
dout(20) << "flush complete, r=" << r << dendl;
-
- if (on_finish) {
- dout(20) << "on finish complete, r=" << r << dendl;
- on_finish->complete(r);
- }
+ on_flush->complete(r);
}
template <typename I>
-bool ImageReplayer<I>::on_flush_interrupted()
+bool ImageReplayer<I>::on_replay_interrupted()
{
- Context *on_finish(nullptr);
-
+ bool shut_down;
{
Mutex::Locker locker(m_lock);
-
- if (m_state == STATE_FLUSHING_REPLAY) {
- return false;
- }
-
- assert(m_state == STATE_STOPPING);
-
- std::swap(m_on_finish, on_finish);
+ shut_down = m_stop_requested;
}
- dout(20) << "flush interrupted" << dendl;
-
- if (on_finish) {
- int r = -EINTR;
- dout(20) << "on finish complete, r=" << r << dendl;
- on_finish->complete(r);
+ if (shut_down) {
+ on_stop_journal_replay_shut_down_start();
}
-
- return true;
+ return shut_down;
}
template <typename I>
}
template <typename I>
-void ImageReplayer<I>::handle_replay_process_ready(int r)
+void ImageReplayer<I>::handle_replay_complete(int r)
{
- // journal::Replay is ready for more events -- attempt to pop another
-
- dout(20) << "enter" << dendl;
-
+ dout(20) << "r=" << r << dendl;
if (r < 0) {
- derr << "error replaying journal entry: " << cpp_strerror(r)
- << dendl;
- // TODO: handle error
+ derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
}
- assert(r == 0);
- handle_replay_ready();
+ {
+ Mutex::Locker locker(m_lock);
+ m_stop_requested = true;
+ }
+ on_replay_interrupted();
}
template <typename I>
-void ImageReplayer<I>::handle_replay_complete(int r)
-{
- dout(20) "r=" << r << dendl;
+void ImageReplayer<I>::replay_flush() {
+ dout(20) << dendl;
- //m_remote_journaler->stop_replay();
+ // TODO
}
template <typename I>
-void ImageReplayer<I>::handle_replay_committed(ReplayEntry *replay_entry, int r)
-{
- dout(20) << "commit_tid=" << replay_entry->get_commit_tid() << ", r=" << r
+void ImageReplayer<I>::handle_replay_flush(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ // TODO
+}
+
+template <typename I>
+void ImageReplayer<I>::get_remote_tag() {
+ dout(20) << dendl;
+
+ // TODO
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_get_remote_tag(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ // TODO
+}
+
+template <typename I>
+void ImageReplayer<I>::allocate_local_tag() {
+ dout(20) << dendl;
+
+ // TODO
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_allocate_local_tag(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ // TODO
+}
+
+template <typename I>
+void ImageReplayer<I>::process_entry() {
+ dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
+ << dendl;
+
+ bufferlist data = m_replay_entry.get_data();
+ bufferlist::iterator it = data.begin();
+
+ Context *on_ready = create_context_callback<
+ ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
+ Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
+ m_local_replay->process(&it, on_ready, on_commit);
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_process_entry_ready(int r) {
+ dout(20) << dendl;
+ assert(r == 0);
+
+ // attempt to process the next event
+ handle_replay_ready();
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry& replay_entry,
+ int r) {
+ dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
<< dendl;
- m_remote_journaler->committed(*replay_entry);
+ if (r < 0) {
+ derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
+
+ handle_replay_complete(r);
+ return;
+ }
+
+ m_remote_journaler->committed(replay_entry);
}
template <typename I>
return "Starting";
case ImageReplayer<I>::STATE_REPLAYING:
return "Replaying";
- case ImageReplayer<I>::STATE_FLUSHING_REPLAY:
- return "FlushingReplay";
case ImageReplayer<I>::STATE_STOPPING:
return "Stopping";
case ImageReplayer<I>::STATE_STOPPED:
#include "common/WorkQueue.h"
#include "include/rados/librados.hpp"
#include "cls/journal/cls_journal_types.h"
+#include "journal/ReplayEntry.h"
#include "librbd/journal/Types.h"
#include "librbd/journal/TypeTraits.h"
#include "types.h"
STATE_UNINITIALIZED,
STATE_STARTING,
STATE_REPLAYING,
- STATE_FLUSHING_REPLAY,
STATE_STOPPING,
STATE_STOPPED,
};
};
ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
- const std::string &mirror_uuid, int64_t local_pool_id,
+ const std::string &mirror_uuid, int64_t local_pool_id,
int64_t remote_pool_id, const std::string &remote_image_id,
const std::string &global_image_id);
virtual ~ImageReplayer();
void print_status(Formatter *f, stringstream *ss);
virtual void handle_replay_ready();
- virtual void handle_replay_process_ready(int r);
virtual void handle_replay_complete(int r);
- virtual void handle_replay_committed(ReplayEntry* replay_entry, int r);
-
inline int64_t get_remote_pool_id() const {
return m_remote_pool_id;
}
/**
* @verbatim
* (error)
- * <uninitialized> <------------------------ FAIL
- * | ^
- * v *
- * <starting> *
- * | *
- * v (error) *
- * BOOTSTRAP_IMAGE * * * * * * * * * * * * * *
- * | *
- * v (error) *
- * INIT_REMOTE_JOURNALER * * * * * * * * * * *
- * | *
- * v (error) *
- * START_REPLAY * * * * * * * * * * * * * * * *
+ * <uninitialized> <------------------------------------ FAIL
+ * | ^
+ * v *
+ * <starting> *
+ * | *
+ * v (error) *
+ * BOOTSTRAP_IMAGE * * * * * * * * * * * * * * * * * * * *
+ * | *
+ * v (error) *
+ * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * *
+ * | *
+ * v (error) *
+ * START_REPLAY * * * * * * * * * * * * * * * * * * * * * *
+ * |
+ * | /--------------------------------------------\
+ * | | |
+ * v v (asok flush) |
+ * REPLAYING -------------> LOCAL_REPLAY_FLUSH |
+ * | \ | |
+ * | | v |
+ * | | FLUSH_COMMIT_POSITION |
+ * | | | |
+ * | | \--------------------/|
+ * | | |
+ * | | (entries available) |
+ * | \-----------> REPLAY_READY |
+ * | | |
+ * | | (skip if not |
+ * | v needed) (error)
+ * | REPLAY_FLUSH * * * * * * * * *
+ * | | | *
+ * | | (skip if not | *
+ * | v needed) (error) *
+ * | GET_REMOTE_TAG * * * * * * * *
+ * | | | *
+ * | | (skip if not | *
+ * | v needed) (error) *
+ * | ALLOCATE_LOCAL_TAG * * * * * *
+ * | | | *
+ * | v (error) *
+ * | PROCESS_ENTRY * * * * * * * * *
+ * | | | *
+ * | \---------------------/ *
+ * v *
+ * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * *
* |
- * | /-------------------------------------------\
- * | | |
- * v v |
- * <replaying> --------------> <flushing_replay> |
- * | | |
- * v v |
- * <stopping> LOCAL_REPLAY_FLUSH |
- * | | |
- * v v |
- * JOURNAL_REPLAY_SHUT_DOWN FLUSH_COMMIT_POSITION |
- * | | |
- * v \-------------------/
+ * v
+ * JOURNAL_REPLAY_SHUT_DOWN
+ * |
+ * v
* LOCAL_IMAGE_CLOSE
* |
* v
virtual void on_stop_local_image_close_start();
virtual void on_stop_local_image_close_finish(int r);
- virtual void on_flush_local_replay_flush_start();
- virtual void on_flush_local_replay_flush_finish(int r);
- virtual void on_flush_flush_commit_position_start(int last_r);
- virtual void on_flush_flush_commit_position_finish(int last_r, int r);
- virtual bool on_flush_interrupted();
+ virtual void on_flush_local_replay_flush_start(Context *on_flush);
+ virtual void on_flush_local_replay_flush_finish(Context *on_flush, int r);
+ virtual void on_flush_flush_commit_position_start(Context *on_flush);
+ virtual void on_flush_flush_commit_position_finish(Context *on_flush, int r);
+
+ bool on_replay_interrupted();
void close_local_image(Context *on_finish); // for tests
librbd::journal::Replay<ImageCtxT> *m_local_replay;
Journaler* m_remote_journaler;
::journal::ReplayHandler *m_replay_handler;
- Context *m_on_finish;
+
+ Context *m_on_start_finish = nullptr;
+ Context *m_on_stop_finish = nullptr;
+ bool m_stop_requested = false;
+
AdminSocketHook *m_asok_hook;
librbd::journal::MirrorPeerClientMeta m_client_meta;
+ ReplayEntry m_replay_entry;
+
+ struct C_ReplayCommitted : public Context {
+ ImageReplayer *replayer;
+ ReplayEntry replay_entry;
+
+ C_ReplayCommitted(ImageReplayer *replayer,
+ ReplayEntry &&replay_entry)
+ : replayer(replayer), replay_entry(std::move(replay_entry)) {
+ }
+ virtual void finish(int r) {
+ replayer->handle_process_entry_safe(replay_entry, r);
+ }
+ };
+
static std::string to_string(const State state);
State get_state_() const { return m_state; }
void handle_init_remote_journaler(int r);
void start_replay();
+
+ void replay_flush();
+ void handle_replay_flush(int r);
+
+ void get_remote_tag();
+ void handle_get_remote_tag(int r);
+
+ void allocate_local_tag();
+ void handle_allocate_local_tag(int r);
+
+ void process_entry();
+ void handle_process_entry_ready(int r);
+ void handle_process_entry_safe(const ReplayEntry& replay_entry, int r);
+
};
} // namespace mirror