}
};
+class BootstrapThread : public Thread {
+public:
+ explicit BootstrapThread(ImageReplayer *replayer,
+ const ImageReplayer::BootstrapParams ¶ms,
+ Context *on_finish)
+ : replayer(replayer), params(params), on_finish(on_finish) {}
+
+ virtual ~BootstrapThread() {}
+
+protected:
+ void *entry()
+ {
+ int r = replayer->bootstrap(params);
+ on_finish->complete(r);
+ delete this;
+ return NULL;
+ }
+
+private:
+ ImageReplayer *replayer;
+ ImageReplayer::BootstrapParams params;
+ Context *on_finish;
+};
+
class ImageReplayerAdminSocketCommand {
public:
virtual ~ImageReplayerAdminSocketCommand() {}
m_local_image_ctx(nullptr),
m_local_replay(nullptr),
m_remote_journaler(nullptr),
- m_replay_handler(nullptr)
+ m_replay_handler(nullptr),
+ m_on_finish(nullptr)
{
CephContext *cct = static_cast<CephContext *>(m_local->cct());
delete m_asok_hook;
}
-int ImageReplayer::start(const BootstrapParams *bootstrap_params)
+void ImageReplayer::start(Context *on_finish,
+ const BootstrapParams *bootstrap_params)
{
- // TODO: make async
-
- dout(20) << "enter" << dendl;
+ dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish
+ << dendl;
{
Mutex::Locker locker(m_lock);
- assert(m_state == STATE_UNINITIALIZED || m_state == STATE_STOPPED);
+ assert(is_stopped_());
m_state = STATE_STARTING;
- }
- std::string remote_journal_id = m_remote_image_id;
- std::string image_name = "";
- C_SaferCond cond, lock_ctx;
- double commit_interval;
- bool registered;
- int r = 0;
+ assert(m_on_finish == nullptr);
+ m_on_finish = on_finish;
+ }
- r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx);
+ int r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx);
if (r < 0) {
derr << "error opening ioctx for remote pool " << m_remote_pool_id
<< ": " << cpp_strerror(r) << dendl;
- return r;
+ on_start_fail_start(r);
+ return;
}
CephContext *cct = static_cast<CephContext *>(m_local->cct());
- commit_interval = cct->_conf->rbd_journal_commit_age;
- bool remote_journaler_initialized = false;
+ double commit_interval = cct->_conf->rbd_journal_commit_age;
m_remote_journaler = new ::journal::Journaler(m_threads->work_queue,
- m_threads->timer,
- &m_threads->timer_lock,
- m_remote_ioctx,
- remote_journal_id,
- m_client_id, commit_interval);
- r = get_registered_client_status(®istered);
+ m_threads->timer,
+ &m_threads->timer_lock,
+ m_remote_ioctx,
+ m_remote_image_id, m_client_id,
+ commit_interval);
+
+ on_start_get_registered_client_status_start(bootstrap_params);
+}
+
+void ImageReplayer::on_start_get_registered_client_status_start(
+ const BootstrapParams *bootstrap_params)
+{
+ dout(20) << "enter" << dendl;
+
+ struct Metadata {
+ uint64_t minimum_set;
+ uint64_t active_set;
+ std::set<cls::journal::Client> registered_clients;
+ BootstrapParams bootstrap_params;
+ } *m = new Metadata();
+
+ if (bootstrap_params) {
+ m->bootstrap_params = *bootstrap_params;
+ }
+
+ FunctionContext *ctx = new FunctionContext(
+ [this, m, bootstrap_params](int r) {
+ on_start_get_registered_client_status_finish(r, m->registered_clients,
+ m->bootstrap_params);
+ delete m;
+ });
+
+ m_remote_journaler->get_mutable_metadata(&m->minimum_set, &m->active_set,
+ &m->registered_clients, ctx);
+}
+
+void ImageReplayer::on_start_get_registered_client_status_finish(int r,
+ const std::set<cls::journal::Client> ®istered_clients,
+ const BootstrapParams &bootstrap_params)
+{
+ dout(20) << "r=" << r << dendl;
+
if (r < 0) {
derr << "error obtaining registered client status: "
<< cpp_strerror(r) << dendl;
- goto fail;
+ on_start_fail_start(r);
+ return;
+ }
+ if (on_start_interrupted()) {
+ return;
}
- if (registered) {
- if (bootstrap_params) {
- dout(0) << "ignoring bootsrap params: client already registered" << dendl;
- }
- } else {
- r = bootstrap(bootstrap_params);
- if (r < 0) {
- derr << "bootstrap failed: " << cpp_strerror(r) << dendl;
- goto fail;
+ for (auto c : registered_clients) {
+ if (c.id == m_client_id) {
+ librbd::journal::ClientData client_data;
+ bufferlist::iterator bl = c.data.begin();
+ try {
+ ::decode(client_data, bl);
+ } catch (const buffer::error &err) {
+ derr << "failed to decode client meta data: " << err.what() << dendl;
+ on_start_fail_start(-EINVAL);
+ return;
+ }
+ librbd::journal::MirrorPeerClientMeta &cm =
+ boost::get<librbd::journal::MirrorPeerClientMeta>(client_data.client_meta);
+ m_local_image_id = cm.image_id;
+
+ // TODO: snap name should be transient
+ if (cm.sync_points.empty()) {
+ derr << "sync points not found" << dendl;
+ on_start_fail_start(-ENOENT);
+ return;
+ }
+ m_snap_name = cm.sync_points.front().snap_name;
+
+ dout(20) << "client found, pool_id=" << m_local_pool_id << ", image_id="
+ << m_local_image_id << ", snap_name=" << m_snap_name << dendl;
+
+ if (!bootstrap_params.empty()) {
+ dout(0) << "ignoring bootsrap params: client already registered" << dendl;
+ }
+
+ on_start_bootstrap_finish(0);
+ return;
}
}
- m_remote_journaler->init(&cond);
- r = cond.wait();
+ dout(20) << "client not found" << dendl;
+
+ on_start_bootstrap_start(bootstrap_params);
+}
+
+void ImageReplayer::on_start_bootstrap_start(const BootstrapParams ¶ms)
+{
+ dout(20) << "enter" << dendl;
+
+ FunctionContext *ctx = new FunctionContext(
+ [this](int r) {
+ on_start_bootstrap_finish(r);
+ });
+
+ BootstrapThread *thread = new BootstrapThread(this, params, ctx);
+
+ thread->create("bootstrap");
+ thread->detach();
+ // TODO: As the bootstrap might take long time it needs some control
+ // to get current status, interrupt, etc...
+}
+
+void ImageReplayer::on_start_bootstrap_finish(int r)
+{
+ dout(20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ on_start_fail_start(r);
+ return;
+ }
+ if (on_start_interrupted()) {
+ return;
+ }
+
+ on_start_remote_journaler_init_start();
+}
+
+void ImageReplayer::on_start_remote_journaler_init_start()
+{
+ dout(20) << "enter" << dendl;
+
+ FunctionContext *ctx = new FunctionContext(
+ [this](int r) {
+ on_start_remote_journaler_init_finish(r);
+ });
+
+ m_remote_journaler->init(ctx);
+}
+
+void ImageReplayer::on_start_remote_journaler_init_finish(int r)
+{
+ dout(20) << "r=" << r << dendl;
+
if (r < 0) {
derr << "error initializing journal: " << cpp_strerror(r) << dendl;
- goto fail;
+ on_start_fail_start(r);
+ return;
+ }
+ if (on_start_interrupted()) {
+ return;
}
- remote_journaler_initialized = true;
r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
if (r < 0) {
derr << "error opening ioctx for local pool " << m_local_pool_id
<< ": " << cpp_strerror(r) << dendl;
- goto fail;
+ on_start_fail_start(r);
+ return;
}
+ on_start_local_image_open_start();
+}
+
+void ImageReplayer::on_start_local_image_open_start()
+{
+ dout(20) << "enter" << dendl;
+
m_local_image_ctx = new librbd::ImageCtx("", m_local_image_id, NULL,
m_local_ioctx, false);
- r = m_local_image_ctx->state->open();
+
+ FunctionContext *ctx = new FunctionContext(
+ [this](int r) {
+ on_start_local_image_open_finish(r);
+ });
+ m_local_image_ctx->state->open(ctx);
+}
+
+void ImageReplayer::on_start_local_image_open_finish(int r)
+{
+ dout(20) << "r=" << r << dendl;
+
if (r < 0) {
derr << "error opening local image " << m_local_image_id
<< ": " << cpp_strerror(r) << dendl;
- delete m_local_image_ctx;
- m_local_image_ctx = nullptr;
- goto fail;
- }
- {
- RWLock::WLocker owner_locker(m_local_image_ctx->owner_lock);
- m_local_image_ctx->exclusive_lock->request_lock(&lock_ctx);
+ FunctionContext *ctx = new FunctionContext(
+ [this, r](int r1) {
+ assert(r1 == 0);
+ delete m_local_image_ctx;
+ m_local_image_ctx = nullptr;
+ on_start_fail_start(r);
+ });
+
+ m_threads->work_queue->queue(ctx, 0);
+ return;
}
- r = lock_ctx.wait();
+ if (on_start_interrupted()) {
+ return;
+ }
+
+ on_start_local_image_lock_start();
+}
+
+void ImageReplayer::on_start_local_image_lock_start()
+{
+ dout(20) << "enter" << dendl;
+
+ RWLock::WLocker owner_locker(m_local_image_ctx->owner_lock);
+ FunctionContext *ctx = new FunctionContext(
+ [this](int r) {
+ on_start_local_image_lock_finish(r);
+ });
+ m_local_image_ctx->exclusive_lock->request_lock(ctx);
+}
+
+void ImageReplayer::on_start_local_image_lock_finish(int r)
+{
+ dout(20) << "r=" << r << dendl;
+
if (r < 0) {
derr << "error to lock exclusively local image " << m_local_image_id
<< ": " << cpp_strerror(r) << dendl;
- goto fail;
+ on_start_fail_start(r);
+ return;
}
-
if (m_local_image_ctx->journal == nullptr) {
- derr << "journaling is not enabled on local image " << m_local_image_id
- << ": " << cpp_strerror(r) << dendl;
- goto fail;
+ on_start_fail_start(-EINVAL);
+ return;
+ }
+ if (on_start_interrupted()) {
+ return;
+ }
+
+ on_start_wait_for_local_journal_ready_start();
+}
+
+void ImageReplayer::on_start_wait_for_local_journal_ready_start()
+{
+ dout(20) << "enter" << dendl;
+
+ FunctionContext *ctx = new FunctionContext(
+ [this](int r) {
+ on_start_wait_for_local_journal_ready_finish(r);
+ });
+ m_local_image_ctx->journal->wait_for_journal_ready(ctx);
+}
+
+void ImageReplayer::on_start_wait_for_local_journal_ready_finish(int r)
+{
+ dout(20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ derr << "error when waiting for local journal ready: " << cpp_strerror(r)
+ << dendl;
+ on_start_fail_start(r);
+ return;
+ }
+ if (on_start_interrupted()) {
+ return;
}
r = m_local_image_ctx->journal->start_external_replay(&m_local_replay);
if (r < 0) {
derr << "error starting external replay on local image "
<< m_local_image_id << ": " << cpp_strerror(r) << dendl;
- goto fail;
+ on_start_fail_start(r);
+ return;
}
m_replay_handler = new ReplayHandler(this);
dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
+ assert(r == 0);
+
+ Context *on_finish(nullptr);
+
{
Mutex::Locker locker(m_lock);
- assert(m_state == STATE_STARTING);
+ if (m_state == STATE_STOPPING) {
+ on_start_fail_start(-EINTR);
+ return;
+ }
+
+ assert(m_state == STATE_STARTING);
m_state = STATE_REPLAYING;
+
+ std::swap(m_on_finish, on_finish);
}
- return 0;
+ dout(20) << "start succeeded" << dendl;
+
+ if (on_finish) {
+ dout(20) << "on finish complete, r=" << r << dendl;
+ on_finish->complete(r);
+ }
+}
-fail:
- dout(20) << "fail, r=" << r << dendl;
+void ImageReplayer::on_start_fail_start(int r)
+{
+ dout(20) << "r=" << r << dendl;
+
+ FunctionContext *ctx = new FunctionContext(
+ [this, r](int r1) {
+ assert(r1 == 0);
+ on_start_fail_finish(r);
+ });
+
+ m_threads->work_queue->queue(ctx, 0);
+}
+
+void ImageReplayer::on_start_fail_finish(int r)
+{
+ dout(20) << "r=" << r << dendl;
if (m_remote_journaler) {
- if (remote_journaler_initialized) {
- m_remote_journaler->stop_replay();
+ if (m_remote_journaler->is_initialized()) {
m_remote_journaler->shut_down();
}
delete m_remote_journaler;
}
if (m_local_replay) {
- Mutex::Locker locker(m_lock);
shut_down_journal_replay(true);
m_local_image_ctx->journal->stop_external_replay();
m_local_replay = nullptr;
m_local_ioctx.close();
m_remote_ioctx.close();
+ Context *on_finish(nullptr);
+
{
Mutex::Locker locker(m_lock);
- assert(m_state == STATE_STARTING);
+ if (m_state == STATE_STOPPING) {
+ assert(r == -EINTR);
+ dout(20) << "start interrupted" << dendl;
+ m_state = STATE_STOPPED;
+ } else {
+ assert(m_state == STATE_STARTING);
+ dout(20) << "start failed" << dendl;
+ m_state = STATE_UNINITIALIZED;
+ }
+ std::swap(m_on_finish, on_finish);
+ }
- m_state = STATE_UNINITIALIZED;
+ if (on_finish) {
+ dout(20) << "on finish complete, r=" << r << dendl;
+ on_finish->complete(r);
}
+}
- return r;
+bool ImageReplayer::on_start_interrupted()
+{
+ Mutex::Locker locker(m_lock);
+
+ if (m_state == STATE_STARTING) {
+ return false;
+ }
+
+ assert(m_state == STATE_STOPPING);
+
+ on_start_fail_start(-EINTR);
+ return true;
}
-void ImageReplayer::stop()
+void ImageReplayer::stop(Context *on_finish)
{
- dout(20) << "enter" << dendl;
+ dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish
+ << dendl;
- {
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_REPLAYING);
+ Mutex::Locker locker(m_lock);
+ assert(is_running_());
+
+ if (m_state == STATE_STARTING) {
+ dout(20) << "interrupting start" << dendl;
- m_state = STATE_STOPPING;
+ if (on_finish) {
+ Context *on_start_finish = m_on_finish;
+ FunctionContext *ctx = new FunctionContext(
+ [this, on_start_finish, on_finish](int r) {
+ if (on_start_finish) {
+ on_start_finish->complete(r);
+ }
+ on_finish->complete(0);
+ });
+
+ m_on_finish = ctx;
+ }
+ } else {
+ assert(m_on_finish == nullptr);
+ m_on_finish = on_finish;
+ on_stop_journal_replay_shut_down_start();
}
+ m_state = STATE_STOPPING;
+}
- shut_down_journal_replay(false);
+void ImageReplayer::on_stop_journal_replay_shut_down_start()
+{
+ dout(20) << "enter" << dendl;
+
+ FunctionContext *ctx = new FunctionContext(
+ [this](int r) {
+ on_stop_journal_replay_shut_down_finish(r);
+ });
+
+ m_local_replay->shut_down(false, ctx);
+}
+
+void ImageReplayer::on_stop_journal_replay_shut_down_finish(int r)
+{
+ dout(20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ derr << "error flushing journal replay: " << cpp_strerror(r) << dendl;
+ }
m_local_image_ctx->journal->stop_external_replay();
m_local_replay = nullptr;
- m_local_image_ctx->state->close();
- m_local_image_ctx = nullptr;
+ on_stop_local_image_close_start();
+}
+
+void ImageReplayer::on_stop_local_image_close_start()
+{
+ dout(20) << "enter" << dendl;
+
+ FunctionContext *ctx = new FunctionContext(
+ [this](int r) {
+ on_stop_local_image_close_finish(r);
+ });
+
+ m_local_image_ctx->state->close(ctx);
+}
+
+void ImageReplayer::on_stop_local_image_close_finish(int r)
+{
+ dout(20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ derr << "error closing local image: " << cpp_strerror(r) << dendl;
+ }
+
+ on_stop_local_image_delete_start();
+}
+
+void ImageReplayer::on_stop_local_image_delete_start()
+{
+ FunctionContext *ctx = new FunctionContext(
+ [this](int r) {
+ delete m_local_image_ctx;
+ m_local_image_ctx = nullptr;
+ on_stop_local_image_delete_finish(r);
+ });
+
+ m_threads->work_queue->queue(ctx, 0);
+}
+
+void ImageReplayer::on_stop_local_image_delete_finish(int r)
+{
+ assert(r == 0);
m_local_ioctx.close();
m_remote_ioctx.close();
+ Context *on_finish(nullptr);
+
{
Mutex::Locker locker(m_lock);
assert(m_state == STATE_STOPPING);
m_state = STATE_STOPPED;
+
+ std::swap(m_on_finish, on_finish);
}
- dout(20) << "done" << dendl;
+ dout(20) << "stop complete" << dendl;
+
+ if (on_finish) {
+ dout(20) << "on finish complete, r=" << r << dendl;
+ on_finish->complete(r);
+ }
+}
+
+void ImageReplayer::close_local_image(Context *on_finish)
+{
+ m_local_image_ctx->state->close(on_finish);
+}
+
+void ImageReplayer::handle_replay_ready()
+{
+ dout(20) << "enter" << dendl;
+
+ ::journal::ReplayEntry replay_entry;
+ if (!m_remote_journaler->try_pop_front(&replay_entry)) {
+ return;
+ }
+
+ dout(20) << "processing entry tid=" << replay_entry.get_commit_tid() << dendl;
+
+ bufferlist data = replay_entry.get_data();
+ bufferlist::iterator it = data.begin();
+ Context *on_ready = create_context_callback<
+ ImageReplayer, &ImageReplayer::handle_replay_process_ready>(this);
+ Context *on_commit = new C_ReplayCommitted(this, std::move(replay_entry));
+ m_local_replay->process(&it, on_ready, on_commit);
}
int ImageReplayer::flush()
return r < 0 ? r : r1;
}
-void ImageReplayer::handle_replay_ready()
-{
- dout(20) << "enter" << dendl;
-
- ::journal::ReplayEntry replay_entry;
- if (!m_remote_journaler->try_pop_front(&replay_entry)) {
- return;
- }
-
- dout(20) << "processing entry tid=" << replay_entry.get_commit_tid() << dendl;
-
- bufferlist data = replay_entry.get_data();
- bufferlist::iterator it = data.begin();
- Context *on_ready = create_context_callback<
- ImageReplayer, &ImageReplayer::handle_replay_process_ready>(this);
- Context *on_commit = new C_ReplayCommitted(this, std::move(replay_entry));
- m_local_replay->process(&it, on_ready, on_commit);
-}
-
void ImageReplayer::handle_replay_process_ready(int r)
{
// journal::Replay is ready for more events -- attempt to pop another
m_remote_journaler->committed(*replay_entry);
}
-int ImageReplayer::get_registered_client_status(bool *registered)
-{
- dout(20) << "enter" << dendl;
-
- uint64_t minimum_set;
- uint64_t active_set;
- std::set<cls::journal::Client> registered_clients;
- C_SaferCond cond;
- m_remote_journaler->get_mutable_metadata(&minimum_set, &active_set,
- ®istered_clients, &cond);
- int r = cond.wait();
- if (r < 0) {
- derr << "error retrieving remote journal registered clients: "
- << cpp_strerror(r) << dendl;
- return r;
- }
-
- for (auto c : registered_clients) {
- if (c.id == m_client_id) {
- *registered = true;
- librbd::journal::ClientData client_data;
- bufferlist::iterator bl = c.data.begin();
- try {
- ::decode(client_data, bl);
- } catch (const buffer::error &err) {
- derr << "failed to decode client meta data: " << err.what() << dendl;
- return -EINVAL;
- }
- librbd::journal::MirrorPeerClientMeta &cm =
- boost::get<librbd::journal::MirrorPeerClientMeta>(client_data.client_meta);
- m_local_image_id = cm.image_id;
-
- // TODO: snap name should be transient
- if (cm.sync_points.empty()) {
- return -ENOENT;
- }
- m_snap_name = cm.sync_points.front().snap_name;
-
- dout(20) << "client found, pool_id=" << m_local_pool_id << ", image_id="
- << m_local_image_id << ", snap_name=" << m_snap_name << dendl;
- return 0;
- }
- }
-
- dout(20) << "client not found" << dendl;
-
- *registered = false;
- return 0;
-}
-
int ImageReplayer::register_client()
{
// TODO allocate snap as part of sync process
- std::string m_snap_name = ".rbd-mirror." + m_client_id;
+ m_snap_name = ".rbd-mirror." + m_client_id;
dout(20) << "mirror_uuid=" << m_client_id << ", "
<< "image_id=" << m_local_image_id << ", "
return 0;
}
-int ImageReplayer::bootstrap(const BootstrapParams *bootstrap_params)
+int ImageReplayer::bootstrap(const BootstrapParams &bootstrap_params)
{
// Register client and sync images
int r;
BootstrapParams params;
- if (bootstrap_params) {
+ if (!bootstrap_params.empty()) {
dout(20) << "using external bootstrap params" << dendl;
- params = *bootstrap_params;
+ params = bootstrap_params;
} else {
r = get_bootrstap_params(¶ms);
if (r < 0) {
- derr << "error obtaining bootrstap parameters: "
+ derr << "error obtaining bootstrap parameters: "
<< cpp_strerror(r) << dendl;
return r;
}
#include "common/Mutex.h"
#include "common/WorkQueue.h"
#include "include/rados/librados.hpp"
+#include "cls/journal/cls_journal_types.h"
#include "types.h"
-class ContextWQ;
-
namespace journal {
class Journaler;
const std::string local_image_name) :
local_pool_name(local_pool_name),
local_image_name(local_image_name) {}
+
+ bool empty() const {
+ return local_pool_name.empty() && local_image_name.empty();
+ }
};
public:
ImageReplayer(const ImageReplayer&) = delete;
ImageReplayer& operator=(const ImageReplayer&) = delete;
- State get_state() { return m_state; }
-
- int start(const BootstrapParams *bootstrap_params = nullptr);
- void stop();
+ State get_state() { Mutex::Locker l(m_lock); return get_state_(); }
+ bool is_stopped() { Mutex::Locker l(m_lock); return is_stopped_(); }
+ bool is_running() { Mutex::Locker l(m_lock); return is_running_(); }
+ void start(Context *on_finish = nullptr,
+ const BootstrapParams *bootstrap_params = nullptr);
+ void stop(Context *on_finish = nullptr);
int flush();
+ int bootstrap(const BootstrapParams &bootstrap_params);
+
virtual void handle_replay_ready();
virtual void handle_replay_process_ready(int r);
virtual void handle_replay_complete(int r);
virtual void handle_replay_committed(::journal::ReplayEntry* replay_entry, int r);
+protected:
+ /**
+ * @verbatim
+ * (error)
+ * <uninitialized> <------------------- FAIL
+ * | ^
+ * v |
+ * <starting> |
+ * | |
+ * v (error) |
+ * GET_REGISTERED_CLIENT_STATUS -------->|
+ * | |
+ * v (error)|
+ * BOOTSTRAP (skip if not needed) ------>|
+ * | |
+ * v (error) |
+ * REMOTE_JOURNALER_INIT --------------->|
+ * | |
+ * v (error) |
+ * LOCAL_IMAGE_OPEN -------------------->|
+ * | |
+ * v (error) |
+ * LOCAL_IMAGE_LOCK -------------------->|
+ * | |
+ * v (error) |
+ * WAIT_FOR_LOCAL_JOURNAL_READY -------->/
+ * |
+ * v
+ * <replaying>
+ * |
+ * v
+ * <stopping>
+ * |
+ * v
+ * JOURNAL_REPLAY_SHUT_DOWN
+ * |
+ * v
+ * LOCAL_IMAGE_CLOSE
+ * |
+ * v
+ * LOCAL_IMAGE_DELETE
+ * |
+ * v
+ * <stopped>
+ *
+ * @endverbatim
+ */
+
+ virtual void on_start_get_registered_client_status_start(
+ const BootstrapParams *bootstrap_params);
+ virtual void on_start_get_registered_client_status_finish(int r,
+ const std::set<cls::journal::Client> ®istered_clients,
+ const BootstrapParams &bootstrap_params);
+ virtual void on_start_bootstrap_start(const BootstrapParams ¶ms);
+ virtual void on_start_bootstrap_finish(int r);
+ virtual void on_start_remote_journaler_init_start();
+ virtual void on_start_remote_journaler_init_finish(int r);
+ virtual void on_start_local_image_open_start();
+ virtual void on_start_local_image_open_finish(int r);
+ virtual void on_start_local_image_lock_start();
+ virtual void on_start_local_image_lock_finish(int r);
+ virtual void on_start_wait_for_local_journal_ready_start();
+ virtual void on_start_wait_for_local_journal_ready_finish(int r);
+ virtual void on_start_fail_start(int r);
+ virtual void on_start_fail_finish(int r);
+ virtual bool on_start_interrupted();
+
+ virtual void on_stop_journal_replay_shut_down_start();
+ virtual void on_stop_journal_replay_shut_down_finish(int r);
+ virtual void on_stop_local_image_close_start();
+ virtual void on_stop_local_image_close_finish(int r);
+ virtual void on_stop_local_image_delete_start();
+ virtual void on_stop_local_image_delete_finish(int r);
+
+ void close_local_image(Context *on_finish); // for tests
+
private:
- int get_registered_client_status(bool *registered);
- int register_client();
+ State get_state_() const { return m_state; }
+ bool is_stopped_() const { return m_state == STATE_UNINITIALIZED ||
+ m_state == STATE_STOPPED; }
+ bool is_running_() const { return !is_stopped_() && m_state != STATE_STOPPING; }
+
int get_bootrstap_params(BootstrapParams *params);
- int bootstrap(const BootstrapParams *bootstrap_params);
+ int register_client();
int create_local_image(const BootstrapParams &bootstrap_params);
int get_image_id(librados::IoCtx &ioctx, const std::string &image_name,
std::string *image_id);
int64_t m_remote_pool_id, m_local_pool_id;
std::string m_remote_image_id, m_local_image_id;
std::string m_snap_name;
+ ContextWQ *m_work_queue;
Mutex m_lock;
State m_state;
std::string m_local_pool_name, m_remote_pool_name;
librbd::journal::Replay<librbd::ImageCtx> *m_local_replay;
::journal::Journaler *m_remote_journaler;
::journal::ReplayHandler *m_replay_handler;
+ Context *m_on_finish;
ImageReplayerAdminSocketHook *m_asok_hook;
};