#include "librbd/internal.h"
#include "librbd/journal/Replay.h"
#include "ImageReplayer.h"
+#include "ImageSync.h"
#include "Threads.h"
+#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
+#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/OpenLocalImageRequest.h"
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
namespace mirror {
using librbd::util::create_context_callback;
+using namespace rbd::mirror::image_replayer;
namespace {
}
};
-class BootstrapThread : public Thread {
-public:
- explicit BootstrapThread(ImageReplayer *replayer,
- const ImageReplayer::BootstrapParams ¶ms,
- Context *on_finish)
- : replayer(replayer), params(params), on_finish(on_finish) {}
-
- virtual ~BootstrapThread() {}
-
-protected:
- void *entry()
- {
- int r = replayer->bootstrap(params);
- on_finish->complete(r);
- delete this;
- return NULL;
- }
-
-private:
- ImageReplayer *replayer;
- ImageReplayer::BootstrapParams params;
- Context *on_finish;
-};
-
class ImageReplayerAdminSocketCommand {
public:
virtual ~ImageReplayerAdminSocketCommand() {}
ImageReplayer::~ImageReplayer()
{
- m_threads->work_queue->drain();
-
assert(m_local_image_ctx == nullptr);
assert(m_local_replay == nullptr);
assert(m_remote_journaler == nullptr);
return;
}
+ if (bootstrap_params != nullptr && !bootstrap_params->empty()) {
+ r = m_local->pool_lookup(bootstrap_params->local_pool_name.c_str());
+ if (r < 0) {
+ derr << "error finding local pool " << bootstrap_params->local_pool_name
+ << ": " << cpp_strerror(r) << dendl;
+ on_start_fail_start(r);
+ return;
+ }
+ m_local_pool_id = r;
+ }
+
+ r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
+ if (r < 0) {
+ derr << "error opening ioctx for local pool " << m_local_pool_id
+ << ": " << cpp_strerror(r) << dendl;
+ on_start_fail_start(r);
+ return;
+ }
+
CephContext *cct = static_cast<CephContext *>(m_local->cct());
double commit_interval = cct->_conf->rbd_journal_commit_age;
on_start_fail_start(-EINVAL);
return;
}
- librbd::journal::MirrorPeerClientMeta &cm =
- boost::get<librbd::journal::MirrorPeerClientMeta>(client_data.client_meta);
- m_local_image_id = cm.image_id;
- // TODO: snap name should be transient
- if (cm.sync_points.empty()) {
- derr << "sync points not found" << dendl;
- on_start_fail_start(-ENOENT);
- return;
- }
- m_snap_name = cm.sync_points.front().snap_name;
+ // TODO: unsafe cast
+ m_client_meta =
+ boost::get<librbd::journal::MirrorPeerClientMeta>(client_data.client_meta);
+ m_local_image_id = m_client_meta.image_id;
dout(20) << "client found, pool_id=" << m_local_pool_id << ", image_id="
- << m_local_image_id << ", snap_name=" << m_snap_name << dendl;
+ << m_local_image_id << dendl;
if (!bootstrap_params.empty()) {
dout(0) << "ignoring bootsrap params: client already registered" << dendl;
}
- on_start_bootstrap_finish(0);
+ on_start_remote_journaler_init_start();
return;
}
}
dout(20) << "client not found" << dendl;
-
- on_start_bootstrap_start(bootstrap_params);
+ bootstrap(bootstrap_params);
}
-void ImageReplayer::on_start_bootstrap_start(const BootstrapParams ¶ms)
-{
- dout(20) << "enter" << dendl;
+void ImageReplayer::bootstrap(const BootstrapParams &bootstrap_params) {
+ int r;
+ BootstrapParams params;
- FunctionContext *ctx = new FunctionContext(
- [this](int r) {
- on_start_bootstrap_finish(r);
- });
+ if (!bootstrap_params.empty()) {
+ dout(20) << "using external bootstrap params" << dendl;
+ params = bootstrap_params;
+ } else {
+ r = get_bootstrap_params(¶ms);
+ if (r < 0) {
+ derr << "error obtaining bootstrap parameters: "
+ << cpp_strerror(r) << dendl;
+ on_start_fail_start(r);
+ return;
+ }
+ }
- BootstrapThread *thread = new BootstrapThread(this, params, ctx);
+ dout(20) << "bootstrap params: "
+ << "local_pool_name=" << params.local_pool_name << ", "
+ << "local_image_name=" << params.local_image_name << dendl;
- thread->create("bootstrap");
- thread->detach();
- // TODO: As the bootstrap might take long time it needs some control
- // to get current status, interrupt, etc...
+ // TODO: add a new bootstrap state and support canceling
+ Context *ctx = create_context_callback<
+ ImageReplayer, &ImageReplayer::handle_bootstrap>(this);
+ BootstrapRequest<> *request = BootstrapRequest<>::create(
+ m_local_ioctx, m_remote_ioctx, &m_local_image_ctx,
+ params.local_image_name, m_remote_image_id, m_threads->work_queue,
+ m_threads->timer, &m_threads->timer_lock, m_client_id, m_remote_journaler,
+ &m_client_meta, ctx);
+ request->send();
}
-void ImageReplayer::on_start_bootstrap_finish(int r)
-{
+void ImageReplayer::handle_bootstrap(int r) {
dout(20) << "r=" << r << dendl;
if (r < 0) {
void ImageReplayer::on_start_remote_journaler_init_start()
{
+ if (on_start_interrupted()) {
+ return;
+ }
+
dout(20) << "enter" << dendl;
FunctionContext *ctx = new FunctionContext(
return;
}
- r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
- if (r < 0) {
- derr << "error opening ioctx for local pool " << m_local_pool_id
- << ": " << cpp_strerror(r) << dendl;
- on_start_fail_start(r);
- return;
- }
on_start_local_image_open_start();
}
void ImageReplayer::on_start_local_image_open_start()
{
dout(20) << "enter" << dendl;
-
- m_local_image_ctx = new librbd::ImageCtx("", m_local_image_id, NULL,
- m_local_ioctx, false);
-
- FunctionContext *ctx = new FunctionContext(
- [this](int r) {
- on_start_local_image_open_finish(r);
- });
- m_local_image_ctx->state->open(ctx);
-}
-
-void ImageReplayer::on_start_local_image_open_finish(int r)
-{
- dout(20) << "r=" << r << dendl;
-
- if (r < 0) {
- derr << "error opening local image " << m_local_image_id
- << ": " << cpp_strerror(r) << dendl;
-
- FunctionContext *ctx = new FunctionContext(
- [this, r](int r1) {
- assert(r1 == 0);
- delete m_local_image_ctx;
- m_local_image_ctx = nullptr;
- on_start_fail_start(r);
- });
-
- m_threads->work_queue->queue(ctx, 0);
- return;
- }
- if (on_start_interrupted()) {
+ if (m_local_image_ctx != nullptr) {
+ // already opened during bootstrap
+ on_start_wait_for_local_journal_ready_start();
return;
}
- on_start_local_image_lock_start();
+ // open and lock the local image
+ Context *ctx = create_context_callback<
+ ImageReplayer, &ImageReplayer::on_start_local_image_open_finish>(this);
+ OpenLocalImageRequest<> *request = OpenLocalImageRequest<>::create(
+ m_local_ioctx, &m_local_image_ctx, "", m_local_image_id,
+ m_threads->work_queue, ctx);
+ request->send();
}
-void ImageReplayer::on_start_local_image_lock_start()
-{
- dout(20) << "enter" << dendl;
-
- RWLock::WLocker owner_locker(m_local_image_ctx->owner_lock);
- FunctionContext *ctx = new FunctionContext(
- [this](int r) {
- on_start_local_image_lock_finish(r);
- });
- m_local_image_ctx->exclusive_lock->request_lock(ctx);
-}
-
-void ImageReplayer::on_start_local_image_lock_finish(int r)
+void ImageReplayer::on_start_local_image_open_finish(int r)
{
dout(20) << "r=" << r << dendl;
if (r < 0) {
- derr << "error to lock exclusively local image " << m_local_image_id
+ derr << "error opening local image " << m_local_image_id
<< ": " << cpp_strerror(r) << dendl;
on_start_fail_start(r);
return;
}
- if (m_local_image_ctx->journal == nullptr) {
- on_start_fail_start(-EINVAL);
- return;
- }
if (on_start_interrupted()) {
return;
}
{
dout(20) << "enter" << dendl;
- FunctionContext *ctx = new FunctionContext(
- [this](int r) {
- on_stop_local_image_close_finish(r);
- });
-
- m_local_image_ctx->state->close(ctx);
+ // close and delete the image (from outside the image's thread context)
+ Context *ctx = create_context_callback<
+ ImageReplayer, &ImageReplayer::on_stop_local_image_close_finish>(this);
+ CloseImageRequest<> *request = CloseImageRequest<>::create(
+ &m_local_image_ctx, m_threads->work_queue, ctx);
+ request->send();
}
void ImageReplayer::on_stop_local_image_close_finish(int r)
derr << "error closing local image: " << cpp_strerror(r) << dendl;
}
- on_stop_local_image_delete_start();
-}
-
-void ImageReplayer::on_stop_local_image_delete_start()
-{
- FunctionContext *ctx = new FunctionContext(
- [this](int r) {
- delete m_local_image_ctx;
- m_local_image_ctx = nullptr;
- on_stop_local_image_delete_finish(r);
- });
-
- m_threads->work_queue->queue(ctx, 0);
-}
-
-void ImageReplayer::on_stop_local_image_delete_finish(int r)
-{
- assert(r == 0);
-
m_local_ioctx.close();
m_remote_journaler->stop_replay();
m_remote_journaler->committed(*replay_entry);
}
-int ImageReplayer::register_client()
-{
- // TODO allocate snap as part of sync process
- m_snap_name = ".rbd-mirror." + m_client_id;
-
- dout(20) << "mirror_uuid=" << m_client_id << ", "
- << "image_id=" << m_local_image_id << ", "
- << "snap_name=" << m_snap_name << dendl;
-
- bufferlist client_data;
- ::encode(librbd::journal::ClientData{librbd::journal::MirrorPeerClientMeta{
- m_local_image_id, {{m_snap_name, boost::none}}}}, client_data);
- int r = m_remote_journaler->register_client(client_data);
- if (r < 0) {
- derr << "error registering client: " << cpp_strerror(r) << dendl;
- return r;
- }
-
- return 0;
-}
-
-int ImageReplayer::get_bootrstap_params(BootstrapParams *params)
+int ImageReplayer::get_bootstrap_params(BootstrapParams *params)
{
int r = librbd::cls_client::dir_get_name(&m_remote_ioctx, RBD_DIRECTORY,
m_remote_image_id,
return 0;
}
-int ImageReplayer::bootstrap(const BootstrapParams &bootstrap_params)
-{
- // Register client and sync images
-
- dout(20) << "enter" << dendl;
-
- int r;
- BootstrapParams params;
-
- if (!bootstrap_params.empty()) {
- dout(20) << "using external bootstrap params" << dendl;
- params = bootstrap_params;
- } else {
- r = get_bootrstap_params(¶ms);
- if (r < 0) {
- derr << "error obtaining bootstrap parameters: "
- << cpp_strerror(r) << dendl;
- return r;
- }
- }
-
- dout(20) << "bootstrap params: local_pool_name=" << params.local_pool_name
- << ", local_image_name=" << params.local_image_name << dendl;
-
- r = create_local_image(params);
- if (r < 0) {
- derr << "error creating local image " << params.local_image_name
- << " in pool " << params.local_pool_name << ": " << cpp_strerror(r)
- << dendl;
- return r;
- }
-
- r = register_client();
- if (r < 0) {
- derr << "error registering journal client: " << cpp_strerror(r) << dendl;
- return r;
- }
-
- r = copy();
- if (r < 0) {
- derr << "error copying data to local image: " << cpp_strerror(r) << dendl;
- return r;
- }
-
- dout(20) << "succeeded" << dendl;
-
- return 0;
-}
-
-int ImageReplayer::create_local_image(const BootstrapParams &bootstrap_params)
-{
- dout(20) << "enter" << dendl;
-
- librbd::ImageCtx *image_ctx = new librbd::ImageCtx("", m_remote_image_id, nullptr,
- m_remote_ioctx, true);
- int r = image_ctx->state->open();
- if (r < 0) {
- derr << "error opening remote image " << m_remote_image_id
- << ": " << cpp_strerror(r) << dendl;
- return r;
- }
-
- uint64_t size = image_ctx->size;
- uint64_t features = image_ctx->features;
- int order = image_ctx->order;
- uint64_t stripe_unit = image_ctx->stripe_unit;
- uint64_t stripe_count = image_ctx->stripe_count;
-
- image_ctx->state->close();
-
- r = m_local->pool_lookup(bootstrap_params.local_pool_name.c_str());
- if (r < 0) {
- derr << "error finding local pool " << bootstrap_params.local_pool_name
- << ": " << cpp_strerror(r) << dendl;
- return r;
- }
- m_local_pool_id = r;
-
- librados::IoCtx ioctx;
- r = m_local->ioctx_create2(m_local_pool_id, ioctx);
- if (r < 0) {
- derr << "error opening ioctx for local pool " << m_local_pool_id
- << ": " << cpp_strerror(r) << dendl;
- return r;
- }
-
- r = librbd::create(ioctx, bootstrap_params.local_image_name.c_str(), size,
- false, features, &order, stripe_unit, stripe_count);
- if (r < 0) {
- derr << "error creating local image " << m_local_image_id
- << ": " << cpp_strerror(r) << dendl;
- return r;
- }
-
- r = get_image_id(ioctx, bootstrap_params.local_image_name, &m_local_image_id);
- if (r < 0) {
- derr << "error resolving ID for local image " << m_local_image_id
- << ": " << cpp_strerror(r) << dendl;
- return r;
- }
-
- dout(20) << "created, image_id=" << m_local_image_id << dendl;
-
- return 0;
-}
-
-int ImageReplayer::get_image_id(librados::IoCtx &ioctx,
- const std::string &image_name,
- std::string *image_id)
-{
- librbd::ImageCtx *image_ctx = new librbd::ImageCtx(image_name, "", NULL,
- ioctx, true);
- int r = image_ctx->state->open();
- if (r < 0) {
- derr << "error opening remote image " << image_name
- << ": " << cpp_strerror(r) << dendl;
- delete image_ctx;
- return r;
- }
-
- *image_id = image_ctx->id;
- image_ctx->state->close();
- return 0;
-}
-
-int ImageReplayer::copy()
-{
- dout(20) << m_remote_pool_id << "/" << m_remote_image_id << "->"
- << m_local_pool_id << "/" << m_local_image_id << dendl;
-
- librados::IoCtx local_ioctx;
- librbd::ImageCtx *remote_image_ctx, *local_image_ctx;
- librbd::NoOpProgressContext prog_ctx;
- int r;
-
- remote_image_ctx = new librbd::ImageCtx("", m_remote_image_id, nullptr,
- m_remote_ioctx, false);
- r = remote_image_ctx->state->open();
- if (r < 0) {
- derr << "error opening remote image " << m_remote_image_id
- << ": " << cpp_strerror(r) << dendl;
- delete remote_image_ctx;
- return r;
- }
-
- dout(20) << "creating temporary snapshot " << m_snap_name << dendl;
-
- // TODO: use internal snapshots
- r = remote_image_ctx->operations->snap_create(m_snap_name.c_str());
- if (r == -EEXIST) {
- // Probably left after a previous unsuccessful bootsrapt.
- dout(0) << "removing stale snapshot " << m_snap_name << " of remote image "
- << m_remote_image_id << dendl;
- (void)remote_image_ctx->operations->snap_remove(m_snap_name.c_str());
- r = remote_image_ctx->operations->snap_create(m_snap_name.c_str());
- }
- if (r < 0) {
- derr << "error creating snapshot " << m_snap_name << " of remote image "
- << m_remote_image_id << ": " << cpp_strerror(r) << dendl;
- goto cleanup;
- }
-
- remote_image_ctx->state->close();
- remote_image_ctx = new librbd::ImageCtx("", m_remote_image_id,
- m_snap_name.c_str(), m_remote_ioctx,
- true);
- r = remote_image_ctx->state->open();
- if (r < 0) {
- derr << "error opening snapshot " << m_snap_name << " of remote image "
- << m_remote_image_id << ": " << cpp_strerror(r) << dendl;
- delete remote_image_ctx;
- remote_image_ctx = nullptr;
- goto cleanup;
- }
-
- r = m_local->ioctx_create2(m_local_pool_id, local_ioctx);
- if (r < 0) {
- derr << "error opening ioctx for local pool " << m_local_pool_id
- << ": " << cpp_strerror(r) << dendl;
- goto cleanup;
- }
-
- local_image_ctx = new librbd::ImageCtx("", m_local_image_id, nullptr,
- local_ioctx, false);
- r = local_image_ctx->state->open();
- if (r < 0) {
- derr << "error opening local image " << m_local_image_id
- << ": " << cpp_strerror(r) << dendl;
- delete local_image_ctx;
- local_image_ctx = nullptr;
- goto cleanup;
- }
-
- dout(20) << "copying" << dendl;
-
- // TODO: show copy progress in image replay status
- r = librbd::copy(remote_image_ctx, local_image_ctx, prog_ctx);
- if (r < 0) {
- derr << "error copying snapshot " << m_snap_name << " of remote image "
- << m_remote_image_id << " to local image " << m_local_image_id
- << ": " << cpp_strerror(r) << dendl;
- }
-
- local_image_ctx->state->close();
- local_image_ctx = nullptr;
-
- remote_image_ctx->state->close();
- remote_image_ctx = nullptr;
-
- dout(20) << "done" << dendl;
-
-cleanup:
- if (local_image_ctx) {
- local_image_ctx->state->close();
- }
- if (remote_image_ctx) {
- remote_image_ctx->state->close();
- }
- remote_image_ctx = new librbd::ImageCtx("", m_remote_image_id, nullptr,
- m_remote_ioctx, false);
- int r1 = remote_image_ctx->state->open();
- if (r1 < 0) {
- derr << "error opening remote image " << m_remote_image_id
- << ": " << cpp_strerror(r1) << dendl;
- delete remote_image_ctx;
- } else {
- dout(20) << "removing temporary snapshot " << m_snap_name << dendl;
- r1 = remote_image_ctx->operations->snap_remove(m_snap_name.c_str());
- if (r1 < 0) {
- derr << "error removing snapshot " << m_snap_name << " of remote image "
- << m_remote_image_id << ": " << cpp_strerror(r1) << dendl;
- }
- remote_image_ctx->state->close();
- }
-
- return r;
-}
-
void ImageReplayer::shut_down_journal_replay(bool cancel_ops)
{
C_SaferCond cond;
#include "common/WorkQueue.h"
#include "include/rados/librados.hpp"
#include "cls/journal/cls_journal_types.h"
+#include "librbd/journal/Types.h"
#include "types.h"
namespace journal {
}
};
-public:
ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
const std::string &client_id, int64_t local_pool_id,
int64_t remote_pool_id, const std::string &remote_image_id);
void stop(Context *on_finish = nullptr);
int flush();
- int bootstrap(const BootstrapParams &bootstrap_params);
-
virtual void handle_replay_ready();
virtual void handle_replay_process_ready(int r);
virtual void handle_replay_complete(int r);
/**
* @verbatim
* (error)
- * <uninitialized> <------------------- FAIL
- * | ^
- * v |
- * <starting> |
- * | |
- * v (error) |
- * GET_REGISTERED_CLIENT_STATUS -------->|
- * | |
- * v (error)|
- * BOOTSTRAP (skip if not needed) ------>|
- * | |
- * v (error) |
- * REMOTE_JOURNALER_INIT --------------->|
- * | |
- * v (error) |
- * LOCAL_IMAGE_OPEN -------------------->|
- * | |
- * v (error) |
- * LOCAL_IMAGE_LOCK -------------------->|
- * | |
- * v (error) |
- * WAIT_FOR_LOCAL_JOURNAL_READY -------->/
+ * <uninitialized> <------------------------ FAIL
+ * | ^
+ * v *
+ * <starting> *
+ * | *
+ * v (error) *
+ * GET_REGISTERED_CLIENT_STATUS * * * * * * * *
+ * | *
+ * | (sync required) *
+ * |\-----\ *
+ * | | *
+ * | v *
+ * | BOOTSTRAP_IMAGE * * * * * * * * * * *
+ * | | *
+ * | v *
+ * |/-----/ *
+ * | *
+ * v (no sync required) (error) *
+ * REMOTE_JOURNALER_INIT * * * * * * * * * * *
+ * | *
+ * v (error) *
+ * LOCAL_IMAGE_OPEN (skip if not *
+ * | needed *
+ * v (error) *
+ * WAIT_FOR_LOCAL_JOURNAL_READY * * * * * * * *
* |
* v
* <replaying>
* LOCAL_IMAGE_CLOSE
* |
* v
- * LOCAL_IMAGE_DELETE
- * |
- * v
* <stopped>
*
* @endverbatim
virtual void on_start_get_registered_client_status_finish(int r,
const std::set<cls::journal::Client> ®istered_clients,
const BootstrapParams &bootstrap_params);
- virtual void on_start_bootstrap_start(const BootstrapParams ¶ms);
- virtual void on_start_bootstrap_finish(int r);
+
+ void bootstrap(const BootstrapParams ¶ms);
+ void handle_bootstrap(int r);
+
virtual void on_start_remote_journaler_init_start();
virtual void on_start_remote_journaler_init_finish(int r);
virtual void on_start_local_image_open_start();
virtual void on_start_local_image_open_finish(int r);
- virtual void on_start_local_image_lock_start();
- virtual void on_start_local_image_lock_finish(int r);
virtual void on_start_wait_for_local_journal_ready_start();
virtual void on_start_wait_for_local_journal_ready_finish(int r);
virtual void on_start_fail_start(int r);
virtual void on_stop_journal_replay_shut_down_finish(int r);
virtual void on_stop_local_image_close_start();
virtual void on_stop_local_image_close_finish(int r);
- virtual void on_stop_local_image_delete_start();
- virtual void on_stop_local_image_delete_finish(int r);
void close_local_image(Context *on_finish); // for tests
m_state == STATE_STOPPED; }
bool is_running_() const { return !is_stopped_() && m_state != STATE_STOPPING; }
- int get_bootrstap_params(BootstrapParams *params);
- int register_client();
- int create_local_image(const BootstrapParams &bootstrap_params);
- int get_image_id(librados::IoCtx &ioctx, const std::string &image_name,
- std::string *image_id);
- int copy();
+ int get_bootstrap_params(BootstrapParams *params);
void shut_down_journal_replay(bool cancel_ops);
friend std::ostream &operator<<(std::ostream &os,
const ImageReplayer &replayer);
-private:
+
Threads *m_threads;
RadosRef m_local, m_remote;
std::string m_client_id;
int64_t m_remote_pool_id, m_local_pool_id;
std::string m_remote_image_id, m_local_image_id;
- std::string m_snap_name;
- ContextWQ *m_work_queue;
Mutex m_lock;
State m_state;
std::string m_local_pool_name, m_remote_pool_name;
::journal::ReplayHandler *m_replay_handler;
Context *m_on_finish;
ImageReplayerAdminSocketHook *m_asok_hook;
+
+ librbd::journal::MirrorPeerClientMeta m_client_meta;
};
} // namespace mirror