const std::string &remote_mirror_uuid,
::journal::MockJournalerProxy *journaler,
librbd::journal::MirrorPeerClientMeta *client_meta,
- Context *on_finish) {
+ Context *on_finish,
+ rbd::mirror::ProgressContext *progress_ctx = nullptr) {
assert(s_instance != nullptr);
s_instance->on_finish = on_finish;
return s_instance;
journal::MockJournaler *journaler,
librbd::journal::MirrorPeerClientMeta *client_meta,
librbd::journal::MirrorPeerSyncPoint *sync_point,
- Context *on_finish) {
+ Context *on_finish,
+ rbd::mirror::ProgressContext *progress_ctx = nullptr) {
assert(s_instance != nullptr);
s_instance->on_finish = on_finish;
return s_instance;
tools/rbd_mirror/ImageSync.h \
tools/rbd_mirror/Mirror.h \
tools/rbd_mirror/PoolWatcher.h \
+ tools/rbd_mirror/ProgressContext.h \
tools/rbd_mirror/Replayer.h \
tools/rbd_mirror/Threads.h \
tools/rbd_mirror/types.h \
namespace mirror {
using librbd::util::create_context_callback;
+using librbd::util::create_rados_ack_callback;
using namespace rbd::mirror::image_replayer;
template <typename I>
replayer->handle_replay_ready();
}
virtual void handle_complete(int r) {
- replayer->handle_replay_complete(r);
+ std::stringstream ss;
+ if (r < 0) {
+ ss << "replay completed with error: " << cpp_strerror(r);
+ }
+ replayer->handle_replay_complete(r, ss.str());
}
};
} // anonymous namespace
+template <typename I>
+void ImageReplayer<I>::BootstrapProgressContext::update_progress(
+ const std::string &description, bool flush)
+{
+ const std::string desc = "bootstrapping, " + description;
+
+ FunctionContext *ctx = new FunctionContext(
+ [this, desc, flush](int r) {
+ replayer->set_state_description(0, desc);
+ if (flush) {
+ replayer->update_mirror_image_status();
+ }
+ });
+ replayer->m_threads->work_queue->queue(ctx, 0);
+}
+
template <typename I>
ImageReplayer<I>::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
const std::string &local_mirror_uuid,
m_name(stringify(remote_pool_id) + "/" + remote_image_id),
m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " +
remote_image_id),
- m_state(STATE_UNINITIALIZED),
- m_local_image_ctx(nullptr),
- m_local_replay(nullptr),
- m_remote_journaler(nullptr),
- m_replay_handler(nullptr),
- m_asok_hook(nullptr)
+ m_progress_cxt(this)
{
}
assert(m_replay_handler == nullptr);
assert(m_on_start_finish == nullptr);
assert(m_on_stop_finish == nullptr);
+
delete m_asok_hook;
}
+template <typename I>
+void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
+ dout(20) << r << " " << desc << dendl;
+
+ Mutex::Locker l(m_lock);
+ m_last_r = r;
+ m_state_desc = desc;
+}
+
template <typename I>
void ImageReplayer<I>::start(Context *on_finish,
const BootstrapParams *bootstrap_params)
assert(is_stopped_());
m_state = STATE_STARTING;
+ m_last_r = 0;
+ m_state_desc.clear();
m_on_start_finish = on_finish;
}
if (r < 0) {
derr << "error opening ioctx for remote pool " << m_remote_pool_id
<< ": " << cpp_strerror(r) << dendl;
- on_start_fail_start(r);
+ on_start_fail_start(r, "error opening remote pool");
return;
}
if (r < 0) {
derr << "error opening ioctx for local pool " << m_local_pool_id
<< ": " << cpp_strerror(r) << dendl;
- on_start_fail_start(r);
+ on_start_fail_start(r, "error opening local pool");
return;
}
+ reschedule_update_status_task(10);
+
CephContext *cct = static_cast<CephContext *>(m_local->cct());
double commit_interval = cct->_conf->rbd_journal_commit_age;
m_remote_journaler = new Journaler(m_threads->work_queue,
dout(20) << "bootstrap params: "
<< "local_image_name=" << m_local_image_name << dendl;
+ update_mirror_image_status();
+
// TODO: add a new bootstrap state and support canceling
Context *ctx = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
+
BootstrapRequest<I> *request = BootstrapRequest<I>::create(
m_local_ioctx, m_remote_ioctx, &m_local_image_ctx,
m_local_image_name, m_remote_image_id, m_global_image_id,
m_threads->work_queue, m_threads->timer, &m_threads->timer_lock,
m_local_mirror_uuid, m_remote_mirror_uuid, m_remote_journaler,
- &m_client_meta, ctx);
+ &m_client_meta, ctx, &m_progress_cxt);
request->send();
}
if (r == -EREMOTEIO) {
dout(5) << "remote image is non-primary or local image is primary" << dendl;
- on_start_fail_start(0);
+ on_start_fail_start(0, "remote image is non-primary or local image is primary");
return;
} else if (r < 0) {
- on_start_fail_start(r);
+ on_start_fail_start(r, "error bootstrapping replay");
return;
} else if (on_start_interrupted()) {
return;
m_asok_hook = new ImageReplayerAdminSocketHook<I>(cct, m_name, this);
}
+ update_mirror_image_status();
+
init_remote_journaler();
}
if (r < 0) {
derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
- on_start_fail_start(r);
+ on_start_fail_start(r, "error initializing remote journal");
return;
} else if (on_start_interrupted()) {
return;
if (r < 0) {
derr << "error starting external replay on local image "
<< m_local_image_id << ": " << cpp_strerror(r) << dendl;
- on_start_fail_start(r);
+ on_start_fail_start(r, "error starting replay on local image");
return;
}
{
Mutex::Locker locker(m_lock);
if (m_stop_requested) {
- on_start_fail_start(-EINTR);
+ on_start_fail_start(-EINTR, "start interrupted");
return;
}
assert(m_state == STATE_STARTING);
m_state = STATE_REPLAYING;
+ m_state_desc.clear();
std::swap(m_on_start_finish, on_finish);
}
+ update_mirror_image_status();
+ reschedule_update_status_task(30);
+
dout(20) << "start succeeded" << dendl;
if (on_finish != nullptr) {
dout(20) << "on finish complete, r=" << r << dendl;
}
template <typename I>
-void ImageReplayer<I>::on_start_fail_start(int r)
+void ImageReplayer<I>::on_start_fail_start(int r, const std::string &desc)
{
dout(20) << "r=" << r << dendl;
FunctionContext *ctx = new FunctionContext(
- [this, r](int r1) {
+ [this, r, desc](int r1) {
assert(r1 == 0);
+ set_state_description(r, desc);
on_start_fail_finish(r);
});
m_local_image_ctx = nullptr;
}
- m_local_ioctx.close();
- m_remote_ioctx.close();
-
- delete m_asok_hook;
- m_asok_hook = nullptr;
-
Context *on_start_finish(nullptr);
Context *on_stop_finish(nullptr);
{
} else {
assert(m_state == STATE_STARTING);
dout(20) << "start failed" << dendl;
- m_state = STATE_UNINITIALIZED;
+ m_state = (r < 0) ? STATE_UNINITIALIZED : STATE_STOPPED;
}
std::swap(m_on_start_finish, on_start_finish);
std::swap(m_on_stop_finish, on_stop_finish);
}
+ update_mirror_image_status(true);
+
+ m_local_ioctx.close();
+ m_remote_ioctx.close();
+
+ delete m_asok_hook;
+ m_asok_hook = nullptr;
+
if (on_start_finish != nullptr) {
dout(20) << "on start finish complete, r=" << r << dendl;
on_start_finish->complete(r);
m_state = STATE_STOPPING;
m_local_replay->shut_down(false, ctx);
}
+
+ update_mirror_image_status();
}
template <typename I>
derr << "error closing local image: " << cpp_strerror(r) << dendl;
}
+ update_mirror_image_status(true);
+
m_local_ioctx.close();
m_remote_journaler->stop_replay();
assert(m_state == STATE_STOPPING);
m_state = STATE_STOPPED;
+ m_state_desc.clear();
m_stop_requested = false;
std::swap(m_on_stop_finish, on_finish);
}
<< cpp_strerror(r) << dendl;
}
+ update_mirror_image_status();
+
dout(20) << "flush complete, r=" << r << dendl;
on_flush->complete(r);
}
}
template <typename I>
-void ImageReplayer<I>::handle_replay_complete(int r)
+void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
{
dout(20) << "r=" << r << dendl;
if (r < 0) {
derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
+ set_state_description(r, error_desc);
}
{
if (r < 0) {
derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
- handle_replay_complete(r);
+ handle_replay_complete(r, "replay flush encountered an error");
return;
}
if (r < 0) {
derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
<< cpp_strerror(r) << dendl;
- handle_replay_complete(r);
+ handle_replay_complete(r, "failed to retrieve remote tag");
return;
}
if (r < 0) {
derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
- handle_replay_complete(r);
+ handle_replay_complete(r, "failed to allocate journal tag");
return;
}
if (r < 0) {
derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
- handle_replay_complete(r);
+ handle_replay_complete(r, "failed to commit journal event");
return;
}
}
}
+template <typename I>
+void ImageReplayer<I>::update_mirror_image_status(bool final)
+{
+ dout(20) << dendl;
+
+ cls::rbd::MirrorImageStatus status;
+
+ {
+ Mutex::Locker locker(m_lock);
+
+ if (!final) {
+ if (m_update_status_comp) {
+ dout(20) << "already sending update" << dendl;
+ m_update_status_pending = true;
+ return;
+ }
+
+ Context *ctx = new FunctionContext(
+ [this](int r) {
+ if (r < 0) {
+ derr << "error updating mirror image status: " << cpp_strerror(r)
+ << dendl;
+ }
+ bool pending = false;
+ librados::AioCompletion *comp = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ std::swap(m_update_status_comp, comp);
+ std::swap(m_update_status_pending, pending);
+ }
+ if (comp) {
+ comp->release();
+ }
+ if (pending && r == 0) {
+ update_mirror_image_status();
+ }
+ });
+ m_update_status_comp = create_rados_ack_callback(ctx);
+ m_update_status_pending = false;
+ }
+
+ switch (m_state) {
+ case STATE_UNINITIALIZED:
+ if (m_last_r < 0) {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
+ status.description = m_state_desc;
+ } else {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
+ status.description = m_state_desc.empty() ? "not started yet" :
+ m_state_desc;
+ }
+ break;
+ case STATE_STARTING:
+ // TODO: a better way to detect syncing state.
+ if (!m_asok_hook) {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
+ status.description = m_state_desc.empty() ? "syncing" : m_state_desc;
+ } else {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
+ status.description = "starting replay";
+ }
+ break;
+ case STATE_REPLAYING:
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
+ status.description = get_replay_status_description();
+ break;
+ case STATE_STOPPING:
+ if (m_local_image_ctx) {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
+ status.description = "stopping replay";
+ break;
+ }
+ /* FALLTHROUGH */
+ case STATE_STOPPED:
+ if (m_last_r < 0) {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
+ status.description = m_state_desc;
+ } else {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
+ status.description = m_state_desc.empty() ? "stopped" : m_state_desc;
+ }
+ break;
+ default:
+ assert(!"invalid state");
+ }
+ }
+
+ dout(20) << "status=" << status << dendl;
+
+ librados::ObjectWriteOperation op;
+ librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
+
+ if (final) {
+ reschedule_update_status_task(-1);
+ m_local_ioctx.aio_flush();
+ librados::AioCompletion *comp = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ std::swap(m_update_status_comp, comp);
+ }
+ if (comp) {
+ comp->wait_for_complete();
+ }
+ int r = m_local_ioctx.operate(RBD_MIRRORING, &op);
+ if (r < 0) {
+ derr << "error updating mirror image status: " << cpp_strerror(r)
+ << dendl;
+ }
+ return;
+ }
+
+ int r = m_local_ioctx.aio_operate(RBD_MIRRORING, m_update_status_comp, &op);
+ assert(r == 0);
+
+ reschedule_update_status_task();
+}
+
+template <typename I>
+void ImageReplayer<I>::reschedule_update_status_task(int new_interval)
+{
+ Mutex::Locker locker(m_threads->timer_lock);
+
+ if (m_update_status_task) {
+ m_threads->timer->cancel_event(m_update_status_task);
+ m_update_status_task = nullptr;
+ }
+
+ if (new_interval > 0) {
+ m_update_status_interval = new_interval;
+ }
+
+ if (new_interval < 0) {
+ return;
+ }
+
+ m_update_status_task = new FunctionContext(
+ [this](int r) {
+ start_update_status_task();
+ });
+
+ m_threads->timer->add_event_after(m_update_status_interval,
+ m_update_status_task);
+}
+
+template <typename I>
+void ImageReplayer<I>::start_update_status_task()
+{
+ FunctionContext *ctx = new FunctionContext(
+ [this](int r) {
+ {
+ Mutex::Locker locker(m_threads->timer_lock);
+ m_update_status_task = nullptr;
+ }
+ update_mirror_image_status();
+ });
+ m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+std::string ImageReplayer<I>::get_replay_status_description() {
+ assert(m_lock.is_locked());
+ assert(m_state == STATE_REPLAYING);
+
+ std::stringstream ss;
+ ss << "replaying";
+
+ cls::journal::Client master;
+ int r = m_remote_journaler->get_cached_client(
+ librbd::Journal<>::IMAGE_CLIENT_ID, &master);
+ if (r == 0) {
+ ss << ", master_position=";
+ cls::journal::ObjectPositions &object_positions =
+ master.commit_position.object_positions;
+
+ if (object_positions.begin() != object_positions.end()) {
+ ss << *(object_positions.begin());
+ } else {
+ ss << "[]";
+ }
+ }
+
+ cls::journal::Client mirror;
+ r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &mirror);
+ if (r == 0) {
+ ss << ", mirror_position=";
+ cls::journal::ObjectPositions &object_positions =
+ mirror.commit_position.object_positions;
+
+ if (object_positions.begin() != object_positions.end()) {
+ ss << *(object_positions.begin());
+ } else {
+ ss << "[]";
+ }
+ }
+
+ return ss.str();
+}
+
template <typename I>
std::string ImageReplayer<I>::to_string(const State state) {
switch (state) {
#include "common/WorkQueue.h"
#include "include/rados/librados.hpp"
#include "cls/journal/cls_journal_types.h"
+#include "cls/rbd/cls_rbd_types.h"
#include "journal/ReplayEntry.h"
#include "librbd/journal/Types.h"
#include "librbd/journal/TypeTraits.h"
+#include "ProgressContext.h"
#include "types.h"
class AdminSocketHook;
bool is_running() { Mutex::Locker l(m_lock); return is_running_(); }
std::string get_name() { Mutex::Locker l(m_lock); return m_name; };
+ void set_state_description(int r, const std::string &desc);
void start(Context *on_finish = nullptr,
const BootstrapParams *bootstrap_params = nullptr);
void print_status(Formatter *f, stringstream *ss);
virtual void handle_replay_ready();
- virtual void handle_replay_complete(int r);
+ virtual void handle_replay_complete(int r, const std::string &error_desc);
inline int64_t get_remote_pool_id() const {
return m_remote_pool_id;
* @endverbatim
*/
- virtual void on_start_fail_start(int r);
+ virtual void on_start_fail_start(int r, const std::string &desc = "");
virtual void on_start_fail_finish(int r);
virtual bool on_start_interrupted();
private:
typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
+ class BootstrapProgressContext : public ProgressContext {
+ public:
+ BootstrapProgressContext(ImageReplayer<ImageCtxT> *replayer) :
+ replayer(replayer) {
+ }
+
+ virtual void update_progress(const std::string &description,
+ bool flush = true);
+ private:
+ ImageReplayer<ImageCtxT> *replayer;
+ };
+
Threads *m_threads;
RadosRef m_local, m_remote;
std::string m_local_mirror_uuid;
std::string m_local_image_name;
std::string m_name;
Mutex m_lock;
- State m_state;
+ State m_state = STATE_UNINITIALIZED;
+ int m_last_r = 0;
+ std::string m_state_desc;
+ BootstrapProgressContext m_progress_cxt;
librados::IoCtx m_local_ioctx, m_remote_ioctx;
- ImageCtxT *m_local_image_ctx;
- librbd::journal::Replay<ImageCtxT> *m_local_replay;
- Journaler* m_remote_journaler;
- ::journal::ReplayHandler *m_replay_handler;
+ ImageCtxT *m_local_image_ctx = nullptr;
+ librbd::journal::Replay<ImageCtxT> *m_local_replay = nullptr;
+ Journaler* m_remote_journaler = nullptr;
+ ::journal::ReplayHandler *m_replay_handler = nullptr;
Context *m_on_start_finish = nullptr;
Context *m_on_stop_finish = nullptr;
+ Context *m_update_status_task = nullptr;
+ int m_update_status_interval = 0;
+ librados::AioCompletion *m_update_status_comp = nullptr;
+ bool m_update_status_pending = false;
bool m_stop_requested = false;
- AdminSocketHook *m_asok_hook;
+ AdminSocketHook *m_asok_hook = nullptr;
librbd::journal::MirrorPeerClientMeta m_client_meta;
void shut_down_journal_replay(bool cancel_ops);
+ void update_mirror_image_status(bool final = false);
+ void reschedule_update_status_task(int new_interval = 0);
+ void start_update_status_task();
+
+ std::string get_replay_status_description();
+
void bootstrap();
void handle_bootstrap(int r);
// vim: ts=8 sw=2 smarttab
#include "ImageSync.h"
+#include "ProgressContext.h"
#include "common/errno.h"
#include "journal/Journaler.h"
#include "librbd/ImageCtx.h"
ImageSync<I>::ImageSync(I *local_image_ctx, I *remote_image_ctx,
SafeTimer *timer, Mutex *timer_lock,
const std::string &mirror_uuid, Journaler *journaler,
- MirrorPeerClientMeta *client_meta, Context *on_finish)
+ MirrorPeerClientMeta *client_meta, Context *on_finish,
+ ProgressContext *progress_ctx)
: m_local_image_ctx(local_image_ctx), m_remote_image_ctx(remote_image_ctx),
m_timer(timer), m_timer_lock(timer_lock), m_mirror_uuid(mirror_uuid),
m_journaler(journaler), m_client_meta(client_meta), m_on_finish(on_finish),
+ m_progress_ctx(progress_ctx),
m_lock(unique_lock_name("ImageSync::m_lock", this)) {
}
template <typename I>
void ImageSync<I>::send_prune_catch_up_sync_point() {
+ update_progress("PRUNE_CATCH_UP_SYNC_POINT");
+
if (m_client_meta->sync_points.size() <= 1) {
send_create_sync_point();
return;
template <typename I>
void ImageSync<I>::send_create_sync_point() {
+ update_progress("CREATE_SYNC_POINT");
+
// TODO: when support for disconnecting laggy clients is added,
// re-connect and create catch-up sync point
if (m_client_meta->sync_points.size() > 0) {
template <typename I>
void ImageSync<I>::send_copy_snapshots() {
+ update_progress("COPY_SNAPSHOTS");
+
CephContext *cct = m_local_image_ctx->cct;
ldout(cct, 20) << dendl;
return;
}
+ update_progress("COPY_IMAGE");
+
CephContext *cct = m_local_image_ctx->cct;
ldout(cct, 20) << dendl;
m_image_copy_request = ImageCopyRequest<I>::create(
m_local_image_ctx, m_remote_image_ctx, m_timer, m_timer_lock,
m_journaler, m_client_meta, &m_client_meta->sync_points.front(),
- ctx);
+ ctx, m_progress_ctx);
m_lock.Unlock();
m_image_copy_request->send();
return;
}
+ update_progress("COPY_OBJECT_MAP");
+
assert(m_local_image_ctx->object_map != nullptr);
assert(!m_client_meta->sync_points.empty());
CephContext *cct = m_local_image_ctx->cct;
ldout(cct, 20) << dendl;
+ update_progress("REFRESH_OBJECT_MAP");
+
Context *ctx = create_context_callback<
ImageSync<I>, &ImageSync<I>::handle_refresh_object_map>(this);
m_object_map = m_local_image_ctx->create_object_map(CEPH_NOSNAP);
CephContext *cct = m_local_image_ctx->cct;
ldout(cct, 20) << dendl;
+ update_progress("PRUNE_SYNC_POINTS");
+
Context *ctx = create_context_callback<
ImageSync<I>, &ImageSync<I>::handle_prune_sync_points>(this);
SyncPointPruneRequest<I> *request = SyncPointPruneRequest<I>::create(
delete this;
}
+template <typename I>
+void ImageSync<I>::update_progress(const std::string &description) {
+ dout(20) << ": " << description << dendl;
+
+ if (m_progress_ctx) {
+ m_progress_ctx->update_progress("IMAGE_SYNC/" + description);
+ }
+}
+
} // namespace mirror
} // namespace rbd
namespace rbd {
namespace mirror {
+class ProgressContext;
+
namespace image_sync { template <typename> class ImageCopyRequest; }
template <typename ImageCtxT = librbd::ImageCtx>
Mutex *timer_lock, const std::string &mirror_uuid,
Journaler *journaler,
MirrorPeerClientMeta *client_meta,
- Context *on_finish) {
+ Context *on_finish,
+ ProgressContext *progress_ctx = nullptr) {
return new ImageSync(local_image_ctx, remote_image_ctx, timer, timer_lock,
- mirror_uuid, journaler, client_meta, on_finish);
+ mirror_uuid, journaler, client_meta, on_finish,
+ progress_ctx);
}
ImageSync(ImageCtxT *local_image_ctx, ImageCtxT *remote_image_ctx,
SafeTimer *timer, Mutex *timer_lock, const std::string &mirror_uuid,
Journaler *journaler, MirrorPeerClientMeta *client_meta,
- Context *on_finish);
+ Context *on_finish, ProgressContext *progress_ctx = nullptr);
void start();
void cancel();
Journaler *m_journaler;
MirrorPeerClientMeta *m_client_meta;
Context *m_on_finish;
+ ProgressContext *m_progress_ctx;
SnapMap m_snap_map;
void handle_prune_sync_points(int r);
void finish(int r);
+
+ void update_progress(const std::string &description);
};
} // namespace mirror
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_PROGRESS_CONTEXT_H
+#define RBD_MIRROR_PROGRESS_CONTEXT_H
+
+namespace rbd {
+namespace mirror {
+
+class ProgressContext
+{
+public:
+ virtual ~ProgressContext() {}
+ virtual void update_progress(const std::string &description,
+ bool flush = true) = 0;
+};
+
+} // namespace mirror
+} // namespace rbd
+
+#endif // RBD_MIRROR_PROGRESS_CONTEXT_H
#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
+#include "librbd/ObjectWatcher.h"
#include "Replayer.h"
+#include "Threads.h"
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
Commands commands;
};
+class MirrorStatusWatchCtx {
+public:
+ MirrorStatusWatchCtx(librados::IoCtx &ioctx, ContextWQ *work_queue) {
+ m_ioctx.dup(ioctx);
+ m_watcher = new Watcher(m_ioctx, work_queue);
+ }
+
+ ~MirrorStatusWatchCtx() {
+ delete m_watcher;
+ }
+
+ int register_watch() {
+ C_SaferCond cond;
+ m_watcher->register_watch(&cond);
+ return cond.wait();
+ }
+
+ int unregister_watch() {
+ C_SaferCond cond;
+ m_watcher->unregister_watch(&cond);
+ return cond.wait();
+ }
+
+ std::string get_oid() const {
+ return m_watcher->get_oid();
+ }
+
+private:
+ class Watcher : public librbd::ObjectWatcher<> {
+ public:
+ Watcher(librados::IoCtx &ioctx, ContextWQ *work_queue) :
+ ObjectWatcher<>(ioctx, work_queue) {
+ }
+
+ virtual std::string get_oid() const {
+ return RBD_MIRRORING;
+ }
+
+ virtual void handle_notify(uint64_t notify_id, uint64_t handle,
+ bufferlist &bl) {
+ bufferlist out;
+ acknowledge_notify(notify_id, handle, out);
+ }
+ };
+
+ librados::IoCtx m_ioctx;
+ Watcher *m_watcher;
+};
+
Replayer::Replayer(Threads *threads, RadosRef local_cluster,
const peer_t &peer, const std::vector<const char*> &args) :
m_threads(threads),
}
}
if (pool_images.empty()) {
+ mirror_image_status_shut_down(pool_id);
it = m_images.erase(it);
} else {
++it;
// create entry for pool if it doesn't exist
auto &pool_replayers = m_images[pool_id];
+
+ if (pool_replayers.empty()) {
+ r = mirror_image_status_init(pool_id, local_ioctx);
+ if (r < 0) {
+ continue;
+ }
+ }
+
for (const auto &image_id : kv.second) {
auto it = pool_replayers.find(image_id.id);
if (it == pool_replayers.end()) {
}
}
+int Replayer::mirror_image_status_init(int64_t pool_id,
+ librados::IoCtx& ioctx) {
+ assert(m_status_watchers.find(pool_id) == m_status_watchers.end());
+
+ uint64_t instance_id = librados::Rados(ioctx).get_instance_id();
+
+ dout(20) << "pool_id=" << pool_id << ", instance_id=" << instance_id << dendl;
+
+ librados::ObjectWriteOperation op;
+ librbd::cls_client::mirror_image_status_remove_down(&op);
+ int r = ioctx.operate(RBD_MIRRORING, &op);
+ if (r < 0) {
+ derr << "error initializing " << RBD_MIRRORING << "object: "
+ << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ unique_ptr<MirrorStatusWatchCtx>
+ watch_ctx(new MirrorStatusWatchCtx(ioctx, m_threads->work_queue));
+
+ r = watch_ctx->register_watch();
+ if (r < 0) {
+ derr << "error registering watcher for " << watch_ctx->get_oid()
+ << " object: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ m_status_watchers.insert(std::make_pair(pool_id, std::move(watch_ctx)));
+
+ return 0;
+}
+
+void Replayer::mirror_image_status_shut_down(int64_t pool_id) {
+ auto watcher_it = m_status_watchers.find(pool_id);
+ assert(watcher_it != m_status_watchers.end());
+
+ int r = watcher_it->second->unregister_watch();
+ if (r < 0) {
+ derr << "error unregistering watcher for " << watcher_it->second->get_oid()
+ << " object: " << cpp_strerror(r) << dendl;
+ }
+
+ m_status_watchers.erase(watcher_it);
+}
+
void Replayer::start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
{
if (!image_replayer->is_stopped()) {
struct Threads;
class ReplayerAdminSocketHook;
+class MirrorStatusWatchCtx;
/**
* Controls mirroring for a single remote cluster.
void start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
bool stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
+ int mirror_image_status_init(int64_t pool_id, librados::IoCtx& ioctx);
+ void mirror_image_status_shut_down(int64_t pool_id);
+
Threads *m_threads;
Mutex m_lock;
Cond m_cond;
// when a pool's configuration changes
std::map<int64_t, std::map<std::string,
std::unique_ptr<ImageReplayer<> > > > m_images;
+ std::map<int64_t, std::unique_ptr<MirrorStatusWatchCtx> > m_status_watchers;
ReplayerAdminSocketHook *m_asok_hook;
class ReplayerThread : public Thread {
#include "librbd/Utils.h"
#include "librbd/journal/Types.h"
#include "tools/rbd_mirror/ImageSync.h"
+#include "tools/rbd_mirror/ProgressContext.h"
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
const std::string &remote_mirror_uuid,
Journaler *journaler,
MirrorPeerClientMeta *client_meta,
- Context *on_finish)
+ Context *on_finish,
+ rbd::mirror::ProgressContext *progress_ctx)
: m_local_io_ctx(local_io_ctx), m_remote_io_ctx(remote_io_ctx),
m_local_image_ctx(local_image_ctx), m_local_image_name(local_image_name),
m_remote_image_id(remote_image_id), m_global_image_id(global_image_id),
m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
m_local_mirror_uuid(local_mirror_uuid),
m_remote_mirror_uuid(remote_mirror_uuid), m_journaler(journaler),
- m_client_meta(client_meta), m_on_finish(on_finish) {
+ m_client_meta(client_meta), m_on_finish(on_finish),
+ m_progress_ctx(progress_ctx) {
}
template <typename I>
void BootstrapRequest<I>::get_local_image_id() {
dout(20) << dendl;
+ update_progress("GET_LOCAL_IMAGE_ID");
+
// attempt to cross-reference a local image by the global image id
librados::ObjectReadOperation op;
librbd::cls_client::mirror_image_get_image_id_start(&op, m_global_image_id);
void BootstrapRequest<I>::get_remote_tag_class() {
dout(20) << dendl;
+ update_progress("GET_REMOTE_TAG_CLASS");
+
Context *ctx = create_context_callback<
BootstrapRequest<I>, &BootstrapRequest<I>::handle_get_remote_tag_class>(
this);
void BootstrapRequest<I>::get_client() {
dout(20) << dendl;
+ update_progress("GET_CLIENT");
+
Context *ctx = create_context_callback<
BootstrapRequest<I>, &BootstrapRequest<I>::handle_get_client>(
this);
void BootstrapRequest<I>::register_client() {
dout(20) << dendl;
+ update_progress("REGISTER_CLIENT");
+
// record an place-holder record
librbd::journal::ClientData client_data{
librbd::journal::MirrorPeerClientMeta{m_local_image_id}};
void BootstrapRequest<I>::open_remote_image() {
dout(20) << dendl;
+ update_progress("OPEN_REMOTE_IMAGE");
+
m_remote_image_ctx = I::create("", m_remote_image_id, nullptr,
m_remote_io_ctx, false);
Context *ctx = create_context_callback<
void BootstrapRequest<I>::open_local_image() {
dout(20) << dendl;
+ update_progress("OPEN_LOCAL_IMAGE");
+
Context *ctx = create_context_callback<
BootstrapRequest<I>, &BootstrapRequest<I>::handle_open_local_image>(
this);
void BootstrapRequest<I>::remove_local_image() {
dout(20) << dendl;
+ update_progress("REMOVE_LOCAL_IMAGE");
+
// TODO
}
void BootstrapRequest<I>::create_local_image() {
dout(20) << dendl;
+ update_progress("CREATE_LOCAL_IMAGE");
+
// TODO: librbd should provide an AIO image creation method -- this is
// blocking so we execute in our worker thread
Context *ctx = create_context_callback<
template <typename I>
void BootstrapRequest<I>::update_client() {
+ dout(20) << dendl;
+
+ update_progress("UPDATE_CLIENT");
+
if (m_client_meta->image_id == (*m_local_image_ctx)->id) {
// already registered local image with remote journal
get_remote_tags();
template <typename I>
void BootstrapRequest<I>::get_remote_tags() {
+ dout(20) << dendl;
+
+ update_progress("GET_REMOTE_TAGS");
+
if (m_created_local_image) {
// optimization -- no need to compare remote tags if we just created
// the image locally
template <typename I>
void BootstrapRequest<I>::image_sync() {
+ dout(20) << dendl;
+
+ update_progress("IMAGE_SYNC");
+
if (m_client_meta->state == librbd::journal::MIRROR_PEER_STATE_REPLAYING) {
// clean replay state -- no image sync required
close_remote_image();
m_remote_image_ctx, m_timer,
m_timer_lock,
m_local_mirror_uuid, m_journaler,
- m_client_meta, ctx);
+ m_client_meta, ctx,
+ m_progress_ctx);
request->start();
}
void BootstrapRequest<I>::close_local_image() {
dout(20) << dendl;
+ update_progress("CLOSE_LOCAL_IMAGE");
+
Context *ctx = create_context_callback<
BootstrapRequest<I>, &BootstrapRequest<I>::handle_close_local_image>(
this);
void BootstrapRequest<I>::close_remote_image() {
dout(20) << dendl;
+ update_progress("CLOSE_REMOTE_IMAGE");
+
Context *ctx = create_context_callback<
BootstrapRequest<I>, &BootstrapRequest<I>::handle_close_remote_image>(
this);
return true;
}
+template <typename I>
+void BootstrapRequest<I>::update_progress(const std::string &description) {
+ dout(20) << ": " << description << dendl;
+
+ if (m_progress_ctx) {
+ m_progress_ctx->update_progress(description);
+ }
+}
+
} // namespace image_replayer
} // namespace mirror
} // namespace rbd
namespace rbd {
namespace mirror {
+
+class ProgressContext;
+
namespace image_replayer {
template <typename ImageCtxT = librbd::ImageCtx>
typedef librbd::journal::TypeTraits<ImageCtxT> TypeTraits;
typedef typename TypeTraits::Journaler Journaler;
typedef librbd::journal::MirrorPeerClientMeta MirrorPeerClientMeta;
+ typedef rbd::mirror::ProgressContext ProgressContext;
static BootstrapRequest* create(librados::IoCtx &local_io_ctx,
librados::IoCtx &remote_io_ctx,
const std::string &remote_mirror_uuid,
Journaler *journaler,
MirrorPeerClientMeta *client_meta,
- Context *on_finish) {
+ Context *on_finish,
+ ProgressContext *progress_ctx = nullptr) {
return new BootstrapRequest(local_io_ctx, remote_io_ctx, local_image_ctx,
local_image_name, remote_image_id,
global_image_id, work_queue, timer, timer_lock,
local_mirror_uuid, remote_mirror_uuid,
- journaler, client_meta, on_finish);
+ journaler, client_meta, on_finish,
+ progress_ctx);
}
BootstrapRequest(librados::IoCtx &local_io_ctx,
SafeTimer *timer, Mutex *timer_lock,
const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid, Journaler *journaler,
- MirrorPeerClientMeta *client_meta, Context *on_finish);
+ MirrorPeerClientMeta *client_meta, Context *on_finish,
+ ProgressContext *progress_ctx = nullptr);
~BootstrapRequest();
void send();
Journaler *m_journaler;
MirrorPeerClientMeta *m_client_meta;
Context *m_on_finish;
+ ProgressContext *m_progress_ctx;
Tags m_remote_tags;
cls::journal::Client m_client;
void finish(int r);
bool decode_client_meta();
+
+ void update_progress(const std::string &description);
};
} // namespace image_replayer
#include "ImageCopyRequest.h"
#include "ObjectCopyRequest.h"
+#include "include/stringify.h"
#include "common/errno.h"
#include "journal/Journaler.h"
#include "librbd/Utils.h"
+#include "tools/rbd_mirror/ProgressContext.h"
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
Journaler *journaler,
MirrorPeerClientMeta *client_meta,
MirrorPeerSyncPoint *sync_point,
- Context *on_finish)
+ Context *on_finish,
+ ProgressContext *progress_ctx)
: m_local_image_ctx(local_image_ctx), m_remote_image_ctx(remote_image_ctx),
m_timer(timer), m_timer_lock(timer_lock), m_journaler(journaler),
m_client_meta(client_meta), m_sync_point(sync_point),
- m_on_finish(on_finish),
+ m_on_finish(on_finish), m_progress_ctx(progress_ctx),
m_lock(unique_lock_name("ImageCopyRequest::m_lock", this)),
m_client_meta_copy(*client_meta) {
assert(!m_client_meta_copy.sync_points.empty());
return;
}
+ update_progress("UPDATE_MAX_OBJECT_COUNT");
+
CephContext *cct = m_local_image_ctx->cct;
ldout(cct, 20) << ": sync_object_count=" << max_objects << dendl;
dout(20) << ": start_object=" << m_object_no << ", "
<< "end_object=" << m_end_object_no << dendl;
+ update_progress("COPY_OBJECT");
+
bool complete;
{
Mutex::Locker locker(m_lock);
CephContext *cct = m_local_image_ctx->cct;
ldout(cct, 20) << ": r=" << r << dendl;
+ int percent;
bool complete;
{
Mutex::Locker locker(m_lock);
assert(m_current_ops > 0);
--m_current_ops;
+ percent = 100 * m_object_no / m_end_object_no;
if (r < 0) {
lderr(cct) << ": object copy failed: " << cpp_strerror(r) << dendl;
complete = (m_current_ops == 0);
}
+ update_progress("COPY_OBJECT " + stringify(percent) + "%", false);
+
if (complete) {
send_flush_sync_point();
}
return;
}
+ update_progress("FLUSH_SYNC_POINT");
+
m_client_meta_copy = *m_client_meta;
if (m_object_no > 0) {
m_sync_point->object_number = m_object_no - 1;
return 0;
}
+template <typename I>
+void ImageCopyRequest<I>::update_progress(const std::string &description,
+ bool flush) {
+ dout(20) << ": " << description << dendl;
+
+ if (m_progress_ctx) {
+ m_progress_ctx->update_progress("IMAGE_COPY/" + description, flush);
+ }
+}
+
} // namespace image_sync
} // namespace mirror
} // namespace rbd
namespace rbd {
namespace mirror {
+
+class ProgressContext;
+
namespace image_sync {
template <typename ImageCtxT = librbd::ImageCtx>
typedef typename TypeTraits::Journaler Journaler;
typedef librbd::journal::MirrorPeerSyncPoint MirrorPeerSyncPoint;
typedef librbd::journal::MirrorPeerClientMeta MirrorPeerClientMeta;
+ typedef rbd::mirror::ProgressContext ProgressContext;
static ImageCopyRequest* create(ImageCtxT *local_image_ctx,
ImageCtxT *remote_image_ctx,
Journaler *journaler,
MirrorPeerClientMeta *client_meta,
MirrorPeerSyncPoint *sync_point,
- Context *on_finish) {
+ Context *on_finish,
+ ProgressContext *progress_ctx = nullptr) {
return new ImageCopyRequest(local_image_ctx, remote_image_ctx, timer,
timer_lock, journaler, client_meta, sync_point,
- on_finish);
+ on_finish, progress_ctx);
}
ImageCopyRequest(ImageCtxT *local_image_ctx, ImageCtxT *remote_image_ctx,
SafeTimer *timer, Mutex *timer_lock, Journaler *journaler,
MirrorPeerClientMeta *client_meta,
- MirrorPeerSyncPoint *sync_point, Context *on_finish);
+ MirrorPeerSyncPoint *sync_point, Context *on_finish,
+ ProgressContext *progress_ctx = nullptr);
void send();
void cancel();
MirrorPeerClientMeta *m_client_meta;
MirrorPeerSyncPoint *m_sync_point;
Context *m_on_finish;
+ ProgressContext *m_progress_ctx;
SnapMap m_snap_map;
int compute_snap_map();
+ void update_progress(const std::string &description, bool flush = true);
};
} // namespace image_sync