#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "tools/rbd_mirror/ImageReplayer.h"
+#include "tools/rbd_mirror/ImageDeleter.h"
#include "tools/rbd_mirror/Threads.h"
#include <string>
rbd::mirror::RadosRef local(new librados::Rados());
rbd::mirror::RadosRef remote(new librados::Rados());
rbd::mirror::Threads *threads = nullptr;
+ std::shared_ptr<rbd::mirror::ImageDeleter> image_deleter;
C_SaferCond start_cond, stop_cond;
threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
local->cct()));
- replayer = new rbd::mirror::ImageReplayer<>(threads, local, remote, client_id,
+
+ image_deleter.reset(new rbd::mirror::ImageDeleter(local, threads->timer,
+ &threads->timer_lock));
+
+ replayer = new rbd::mirror::ImageReplayer<>(threads, image_deleter, local,
+ remote, client_id,
"remote mirror uuid",
local_pool_id, remote_pool_id,
remote_image_id,
#include "tools/rbd_mirror/types.h"
#include "tools/rbd_mirror/ImageReplayer.h"
#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/ImageDeleter.h"
#include "test/librados/test.h"
#include "gtest/gtest.h"
}
};
- TestImageReplayer() : m_watch_handle(0)
+ TestImageReplayer()
+ : m_local_cluster(new librados::Rados()), m_watch_handle(0)
{
- EXPECT_EQ("", connect_cluster_pp(m_local_cluster));
- EXPECT_EQ(0, m_local_cluster.conf_set("rbd_cache", "false"));
+ EXPECT_EQ("", connect_cluster_pp(*m_local_cluster.get()));
+ EXPECT_EQ(0, m_local_cluster->conf_set("rbd_cache", "false"));
m_local_pool_name = get_temp_pool_name();
- EXPECT_EQ(0, m_local_cluster.pool_create(m_local_pool_name.c_str()));
- EXPECT_EQ(0, m_local_cluster.ioctx_create(m_local_pool_name.c_str(),
+ EXPECT_EQ(0, m_local_cluster->pool_create(m_local_pool_name.c_str()));
+ EXPECT_EQ(0, m_local_cluster->ioctx_create(m_local_pool_name.c_str(),
m_local_ioctx));
EXPECT_EQ("", connect_cluster_pp(m_remote_cluster));
m_threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
m_local_ioctx.cct()));
+ m_image_deleter.reset(new rbd::mirror::ImageDeleter(m_local_cluster,
+ m_threads->timer,
+ &m_threads->timer_lock));
}
~TestImageReplayer()
delete m_threads;
EXPECT_EQ(0, m_remote_cluster.pool_delete(m_remote_pool_name.c_str()));
- EXPECT_EQ(0, m_local_cluster.pool_delete(m_local_pool_name.c_str()));
+ EXPECT_EQ(0, m_local_cluster->pool_delete(m_local_pool_name.c_str()));
}
template <typename ImageReplayerT = rbd::mirror::ImageReplayer<> >
void create_replayer() {
- m_replayer = new ImageReplayerT(m_threads,
+ m_replayer = new ImageReplayerT(m_threads, m_image_deleter,
rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
rbd::mirror::RadosRef(new librados::Rados(m_remote_ioctx)),
m_local_mirror_uuid, m_remote_mirror_uuid, m_local_ioctx.get_id(),
static int _image_number;
rbd::mirror::Threads *m_threads = nullptr;
- librados::Rados m_local_cluster, m_remote_cluster;
+ std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
+ std::shared_ptr<librados::Rados> m_local_cluster;
+ librados::Rados m_remote_cluster;
std::string m_local_mirror_uuid = "local mirror uuid";
std::string m_remote_mirror_uuid = "remote mirror uuid";
std::string m_local_pool_name, m_remote_pool_name;
using std::map;
using std::string;
using std::unique_ptr;
+using std::shared_ptr;
using std::vector;
namespace rbd {
Commands commands;
};
+template <typename I>
+struct ResyncListener : public librbd::journal::ResyncListener {
+ ImageReplayer<I> *img_replayer;
+
+ ResyncListener(ImageReplayer<I> *img_replayer)
+ : img_replayer(img_replayer) {
+ }
+
+ virtual void handle_resync() {
+ img_replayer->resync_image();
+ }
+};
+
} // anonymous namespace
template <typename I>
}
template <typename I>
-ImageReplayer<I>::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
+ImageReplayer<I>::ImageReplayer(Threads *threads,
+ shared_ptr<ImageDeleter> image_deleter,
+ RadosRef local, RadosRef remote,
const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid,
int64_t local_pool_id,
const std::string &remote_image_id,
const std::string &global_image_id) :
m_threads(threads),
+ m_image_deleter(image_deleter),
m_local(local),
m_remote(remote),
m_local_mirror_uuid(local_mirror_uuid),
m_name(stringify(remote_pool_id) + "/" + remote_image_id),
m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " +
remote_image_id),
- m_progress_cxt(this)
+ m_progress_cxt(this),
+ m_resync_listener(new ResyncListener<I>(this))
{
// Register asok commands using a temporary "remote_pool_name/global_image_id"
// name. When the image name becomes known on start the asok commands will be
assert(m_on_stop_finish == nullptr);
assert(m_bootstrap_request == nullptr);
assert(m_in_flight_status_updates == 0);
+
+ delete m_resync_listener;
delete m_asok_hook;
}
{
Mutex::Locker locker(m_lock);
+ m_local_image_ctx->journal->add_listener(
+ librbd::journal::ListenerType::RESYNC,
+ m_resync_listener);
+
+ bool do_resync = false;
+ r = m_local_image_ctx->journal->check_resync_requested(&do_resync);
+ if (r < 0) {
+ derr << "failed to check if a resync was requested" << dendl;
+ }
+
+ if (do_resync) {
+ Context *on_finish = m_on_start_finish;
+ FunctionContext *ctx = new FunctionContext([this, on_finish](int r) {
+ resync_image(on_finish);
+ });
+ m_on_start_finish = ctx;
+ }
+
std::string name = m_local_ioctx.get_pool_name() + "/" +
m_local_image_ctx->name;
if (m_name != name) {
on_finish->complete(r);
}
+ if (on_replay_interrupted()) {
+ return;
+ }
+
{
Mutex::Locker locker(m_lock);
m_replay_handler = new ReplayHandler<I>(this);
dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
}
- on_replay_interrupted();
}
template <typename I>
shut_down_replay = true;
}
- assert(m_on_stop_finish == nullptr);
- std::swap(m_on_stop_finish, on_finish);
- m_stop_requested = true;
- m_manual_stop = manual;
+ assert(m_on_stop_finish == nullptr);
+ std::swap(m_on_stop_finish, on_finish);
+ m_stop_requested = true;
+ m_manual_stop = manual;
}
}
}
return;
}
- m_remote_journaler->committed(replay_entry);
+ if (m_remote_journaler) {
+ m_remote_journaler->committed(replay_entry);
+ }
}
template <typename I>
ctx = new FunctionContext([this, ctx](int r) {
m_remote_journaler->shut_down(ctx);
});
+ if (m_stopping_for_resync) {
+ ctx = new FunctionContext([this, ctx](int r) {
+ m_remote_journaler->unregister_client(ctx);
+ });
+ }
}
if (m_local_replay != nullptr) {
ctx = new FunctionContext([this, ctx](int r) {
ctx->complete(0);
});
ctx = new FunctionContext([this, ctx](int r) {
+ m_local_image_ctx->journal->remove_listener(
+ librbd::journal::ListenerType::RESYNC, m_resync_listener);
m_local_replay->shut_down(true, ctx);
});
}
return;
}
+ if (m_stopping_for_resync) {
+ m_image_deleter->schedule_image_delete(m_local_pool_id,
+ m_local_image_id,
+ m_local_image_name,
+ m_global_image_id);
+ m_stopping_for_resync = false;
+
+ FunctionContext *ctx = new FunctionContext(
+ [this, r, on_start] (int r) {
+ handle_shut_down(r, on_start);
+ }
+ );
+ m_image_deleter->wait_for_scheduled_deletion(m_local_image_name,
+ ctx, false);
+ return;
+ }
+
std::swap(on_stop, m_on_stop_finish);
m_stop_requested = false;
assert(m_state == STATE_STOPPING);
return "Unknown(" + stringify(state) + ")";
}
+template <typename I>
+void ImageReplayer<I>::resync_image(Context *on_finish) {
+ dout(20) << dendl;
+
+ {
+ Mutex::Locker l(m_lock);
+ m_stopping_for_resync = true;
+ }
+
+ stop(on_finish);
+}
+
template <typename I>
std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
{
#include "librbd/ImageCtx.h"
#include "librbd/journal/Types.h"
#include "librbd/journal/TypeTraits.h"
+#include "ImageDeleter.h"
#include "ProgressContext.h"
#include "types.h"
#include <boost/optional.hpp>
}
};
- ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
- const std::string &local_mirror_uuid,
+ ImageReplayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
+ RadosRef local, RadosRef remote,
+ const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid, int64_t local_pool_id,
int64_t remote_pool_id, const std::string &remote_image_id,
const std::string &global_image_id);
State get_state() { Mutex::Locker l(m_lock); return get_state_(); }
bool is_stopped() { Mutex::Locker l(m_lock); return is_stopped_(); }
bool is_running() { Mutex::Locker l(m_lock); return is_running_(); }
+ bool is_replaying() { Mutex::Locker l(m_lock); return is_replaying_(); }
std::string get_name() { Mutex::Locker l(m_lock); return m_name; };
void set_state_description(int r, const std::string &desc);
void restart(Context *on_finish = nullptr);
void flush(Context *on_finish = nullptr);
+ void resync_image(Context *on_finish=nullptr);
+
void print_status(Formatter *f, stringstream *ss);
virtual void handle_replay_ready();
};
Threads *m_threads;
+ std::shared_ptr<ImageDeleter> m_image_deleter;
RadosRef m_local, m_remote;
std::string m_local_mirror_uuid;
std::string m_remote_mirror_uuid;
librbd::journal::Replay<ImageCtxT> *m_local_replay = nullptr;
Journaler* m_remote_journaler = nullptr;
::journal::ReplayHandler *m_replay_handler = nullptr;
+ librbd::journal::ResyncListener *m_resync_listener;
+ bool m_stopping_for_resync = false;
Context *m_on_start_finish = nullptr;
Context *m_on_stop_finish = nullptr;
bool is_running_() const {
return !is_stopped_() && m_state != STATE_STOPPING && !m_stop_requested;
}
+ bool is_replaying_() const {
+ return m_state == STATE_REPLAYING;
+ }
bool update_mirror_image_status(bool force, const OptionalState &state);
bool start_mirror_image_status_update(bool force, bool restarting);
auto it = m_image_replayers.find(image_id.id);
if (it == m_image_replayers.end()) {
unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>(
- m_threads, m_local, m_remote, local_mirror_uuid, remote_mirror_uuid,
- m_local_pool_id, m_remote_pool_id, image_id.id, image_id.global_id));
+ m_threads, m_image_deleter, m_local, m_remote, local_mirror_uuid,
+ remote_mirror_uuid, m_local_pool_id, m_remote_pool_id, image_id.id,
+ image_id.global_id));
it = m_image_replayers.insert(
std::make_pair(image_id.id, std::move(image_replayer))).first;
}