--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/rbd_mirror/test_mock_fixture.h"
+#include "tools/rbd_mirror/ImageReplayer.h"
+#include "tools/rbd_mirror/ImageSyncThrottler.h"
+#include "tools/rbd_mirror/InstanceReplayer.h"
+#include "tools/rbd_mirror/Threads.h"
+
+namespace librbd {
+
+namespace {
+
+struct MockTestImageCtx : public MockImageCtx {
+ MockTestImageCtx(librbd::ImageCtx &image_ctx)
+ : librbd::MockImageCtx(image_ctx) {
+ }
+};
+
+} // anonymous namespace
+
+} // namespace librbd
+
+namespace rbd {
+namespace mirror {
+
+template <>
+struct Threads<librbd::MockTestImageCtx> {
+ Mutex &timer_lock;
+ SafeTimer *timer;
+ ContextWQ *work_queue;
+
+ Threads(Threads<librbd::ImageCtx> *threads)
+ : timer_lock(threads->timer_lock), timer(threads->timer),
+ work_queue(threads->work_queue) {
+ }
+};
+
+template<>
+struct ImageReplayer<librbd::MockTestImageCtx> {
+ static ImageReplayer* s_instance;
+ std::string global_image_id;
+
+ static ImageReplayer *create(
+ Threads<librbd::MockTestImageCtx> *threads,
+ std::shared_ptr<ImageDeleter> image_deleter,
+ ImageSyncThrottlerRef<librbd::MockTestImageCtx> image_sync_throttler,
+ RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
+ const std::string &global_image_id) {
+ assert(s_instance != nullptr);
+ s_instance->global_image_id = global_image_id;
+ return s_instance;
+ }
+
+ ImageReplayer() {
+ assert(s_instance == nullptr);
+ s_instance = this;
+ }
+
+ virtual ~ImageReplayer() {
+ assert(s_instance == this);
+ s_instance = nullptr;
+ }
+
+ MOCK_METHOD0(destroy, void());
+ MOCK_METHOD2(start, void(Context *, bool));
+ MOCK_METHOD2(stop, void(Context *, bool));
+ MOCK_METHOD0(restart, void());
+ MOCK_METHOD0(flush, void());
+ MOCK_METHOD2(print_status, void(Formatter *, stringstream *));
+ MOCK_METHOD1(set_remote_images, void(const PeerImages &));
+ MOCK_METHOD2(remove_remote_image, void(const std::string &,
+ const std::string &));
+ MOCK_METHOD0(remote_images_empty, bool());
+ MOCK_METHOD0(get_global_image_id, const std::string &());
+ MOCK_METHOD0(get_local_image_id, const std::string &());
+ MOCK_METHOD0(is_running, bool());
+ MOCK_METHOD0(is_stopped, bool());
+ MOCK_METHOD0(is_blacklisted, bool());
+};
+
+template<>
+struct ImageSyncThrottler<librbd::MockTestImageCtx> {
+ ImageSyncThrottler() {
+ }
+ virtual ~ImageSyncThrottler() {
+ }
+};
+
+ImageReplayer<librbd::MockTestImageCtx>* ImageReplayer<librbd::MockTestImageCtx>::s_instance = nullptr;
+
+} // namespace mirror
+} // namespace rbd
+
+// template definitions
+#include "tools/rbd_mirror/InstanceReplayer.cc"
+
+namespace rbd {
+namespace mirror {
+
+using ::testing::_;
+using ::testing::InSequence;
+using ::testing::Invoke;
+using ::testing::Return;
+using ::testing::ReturnRef;
+
+class TestMockInstanceReplayer : public TestMockFixture {
+public:
+ typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
+ typedef InstanceReplayer<librbd::MockTestImageCtx> MockInstanceReplayer;
+ typedef Threads<librbd::MockTestImageCtx> MockThreads;
+
+ void SetUp() override {
+ TestMockFixture::SetUp();
+
+ m_mock_threads = new MockThreads(m_threads);
+
+ m_image_deleter.reset(
+ new rbd::mirror::ImageDeleter(m_threads->work_queue, m_threads->timer,
+ &m_threads->timer_lock));
+ m_image_sync_throttler.reset(
+ new rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>());
+ }
+
+ void TearDown() override {
+ delete m_mock_threads;
+ TestMockFixture::TearDown();
+ }
+
+ MockThreads *m_mock_threads;
+ std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
+ std::shared_ptr<rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>>
+ m_image_sync_throttler;
+};
+
+TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) {
+ MockImageReplayer mock_image_replayer;
+ MockInstanceReplayer instance_replayer(
+ m_mock_threads, m_image_deleter, m_image_sync_throttler,
+ rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
+ "local_mirror_uuid", m_local_io_ctx.get_id());
+
+ std::string global_image_id("global_image_id");
+ rbd::mirror::instance_watcher::PeerImageIds peer_image_ids =
+ {{"remote_mirror_uuid", "remote_image_id"}};
+
+ EXPECT_CALL(mock_image_replayer, get_global_image_id())
+ .WillRepeatedly(ReturnRef(global_image_id));
+ EXPECT_CALL(mock_image_replayer, is_blacklisted())
+ .WillRepeatedly(Return(false));
+
+ InSequence seq;
+
+ instance_replayer.init();
+ instance_replayer.set_peers({{"remote_mirror_uuid", m_remote_io_ctx}});
+
+ // Acquire
+
+ C_SaferCond on_acquire;
+
+ EXPECT_CALL(mock_image_replayer, set_remote_images(_));
+ EXPECT_CALL(mock_image_replayer, is_stopped())
+ .WillOnce(Return(true));
+ EXPECT_CALL(mock_image_replayer, start(nullptr, false));
+
+ instance_replayer.acquire_image(global_image_id, peer_image_ids, &on_acquire);
+ ASSERT_EQ(0, on_acquire.wait());
+
+ // Release
+
+ C_SaferCond on_release;
+
+ EXPECT_CALL(mock_image_replayer,
+ remove_remote_image("remote_mirror_uuid", "remote_image_id"));
+ EXPECT_CALL(mock_image_replayer, remote_images_empty())
+ .WillOnce(Return(true));
+ EXPECT_CALL(mock_image_replayer, is_stopped())
+ .WillOnce(Return(false));
+ EXPECT_CALL(mock_image_replayer, is_running())
+ .WillOnce(Return(false));
+ EXPECT_CALL(mock_image_replayer, is_stopped())
+ .WillOnce(Return(false));
+ EXPECT_CALL(mock_image_replayer, is_running())
+ .WillOnce(Return(true));
+ EXPECT_CALL(mock_image_replayer, stop(_, false))
+ .WillOnce(CompleteContext(0));
+ EXPECT_CALL(mock_image_replayer, is_stopped())
+ .WillOnce(Return(true));
+ EXPECT_CALL(mock_image_replayer, destroy());
+
+ instance_replayer.release_image("global_image_id", peer_image_ids, false,
+ &on_release);
+ ASSERT_EQ(0, on_release.wait());
+
+ instance_replayer.shut_down();
+}
+
+} // namespace mirror
+} // namespace rbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/stringify.h"
+#include "common/Timer.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "librbd/Utils.h"
+#include "ImageReplayer.h"
+#include "InstanceReplayer.h"
+#include "Threads.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
+ << this << " " << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+
+using librbd::util::create_async_context_callback;
+using librbd::util::create_context_callback;
+
+template <typename I>
+InstanceReplayer<I>::InstanceReplayer(
+ Threads<I> *threads, std::shared_ptr<ImageDeleter> image_deleter,
+ ImageSyncThrottlerRef<I> image_sync_throttler, RadosRef local_rados,
+ const std::string &local_mirror_uuid, int64_t local_pool_id)
+ : m_threads(threads), m_image_deleter(image_deleter),
+ m_image_sync_throttler(image_sync_throttler), m_local_rados(local_rados),
+ m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
+ m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
+}
+
+template <typename I>
+InstanceReplayer<I>::~InstanceReplayer() {
+ assert(m_image_state_check_task == nullptr);
+ assert(m_async_op_tracker.empty());
+ assert(m_image_replayers.empty());
+}
+
+template <typename I>
+int InstanceReplayer<I>::init() {
+ C_SaferCond init_ctx;
+ init(&init_ctx);
+ return init_ctx.wait();
+}
+
+template <typename I>
+void InstanceReplayer<I>::init(Context *on_finish) {
+ dout(20) << dendl;
+
+ Context *ctx = new FunctionContext(
+ [this, on_finish] (int r) {
+ {
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ schedule_image_state_check_task();
+ }
+ on_finish->complete(0);
+ });
+
+ m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::shut_down() {
+ C_SaferCond shut_down_ctx;
+ shut_down(&shut_down_ctx);
+ int r = shut_down_ctx.wait();
+ assert(r == 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::shut_down(Context *on_finish) {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ assert(m_on_shut_down == nullptr);
+ m_on_shut_down = on_finish;
+
+ Context *ctx = new FunctionContext(
+ [this] (int r) {
+ cancel_image_state_check_task();
+ wait_for_ops();
+ });
+
+ m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::set_peers(const Peers &peers) {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ m_peers = peers;
+}
+
+template <typename I>
+void InstanceReplayer<I>::release_all(Context *on_finish) {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
+ for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
+ it = m_image_replayers.erase(it)) {
+ auto image_replayer = it->second;
+ auto ctx = gather_ctx->new_sub();
+ ctx = new FunctionContext(
+ [image_replayer, ctx] (int r) {
+ image_replayer->destroy();
+ ctx->complete(0);
+ });
+ stop_image_replayer(image_replayer, ctx);
+ }
+ gather_ctx->activate();
+}
+
+template <typename I>
+void InstanceReplayer<I>::acquire_image(
+ const std::string &global_image_id,
+ const instance_watcher::PeerImageIds &peers, Context *on_finish) {
+ dout(20) << "global_image_id=" << global_image_id << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ assert(m_on_shut_down == nullptr);
+
+ auto it = m_image_replayers.find(global_image_id);
+
+ if (it == m_image_replayers.end()) {
+ auto image_replayer = ImageReplayer<I>::create(
+ m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
+ m_local_mirror_uuid, m_local_pool_id, global_image_id);
+
+ dout(20) << global_image_id << ": creating replayer " << image_replayer
+ << dendl;
+
+ it = m_image_replayers.insert(std::make_pair(global_image_id,
+ image_replayer)).first;
+ }
+
+ auto image_replayer = it->second;
+
+ PeerImages remote_images;
+ for (auto &peer : peers) {
+ auto it = m_peers.find(Peer(peer.mirror_uuid));
+ assert(it != m_peers.end());
+ auto io_ctx = it->io_ctx;
+ remote_images.insert({peer.mirror_uuid, io_ctx, peer.image_id});
+ }
+
+ image_replayer->set_remote_images(remote_images);
+
+ start_image_replayer(image_replayer);
+
+ m_threads->work_queue->queue(on_finish, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::release_image(
+ const std::string &global_image_id,
+ const instance_watcher::PeerImageIds &peers, bool schedule_delete,
+ Context *on_finish) {
+ dout(20) << "global_image_id=" << global_image_id << ", "
+ << "schedule_delete=" << schedule_delete << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ assert(m_on_shut_down == nullptr);
+
+ auto it = m_image_replayers.find(global_image_id);
+
+ if (it == m_image_replayers.end()) {
+ dout(20) << global_image_id << ": not found" << dendl;
+ m_threads->work_queue->queue(on_finish, 0);
+ return;
+ }
+
+ auto image_replayer = it->second;
+
+ for (auto &peer : peers) {
+ image_replayer->remove_remote_image(peer.mirror_uuid, peer.image_id);
+ }
+
+ if (!image_replayer->remote_images_empty()) {
+ dout(20) << global_image_id << ": still has remote images" << dendl;
+ m_threads->work_queue->queue(on_finish, 0);
+ return;
+ }
+
+ m_image_replayers.erase(it);
+
+ on_finish = new FunctionContext(
+ [image_replayer, on_finish] (int r) {
+ image_replayer->destroy();
+ on_finish->complete(0);
+ });
+
+ if (schedule_delete) {
+ on_finish = new FunctionContext(
+ [this, image_replayer, on_finish] (int r) {
+ auto global_image_id = image_replayer->get_global_image_id();
+ auto local_image_id = image_replayer->get_local_image_id();
+ if (local_image_id.empty()) {
+ dout(20) << global_image_id << ": unknown local_image_id"
+ << " (image does not exist or primary), skipping delete"
+ << dendl;
+ } else {
+ m_image_deleter->schedule_image_delete(
+ m_local_rados, m_local_pool_id, local_image_id, global_image_id);
+ }
+ on_finish->complete(0);
+ });
+ }
+
+ stop_image_replayer(image_replayer, on_finish);
+}
+
+template <typename I>
+void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
+ dout(20) << dendl;
+
+ if (!f) {
+ return;
+ }
+
+ Mutex::Locker locker(m_lock);
+
+ f->open_array_section("image_replayers");
+ for (auto &kv : m_image_replayers) {
+ auto &image_replayer = kv.second;
+ image_replayer->print_status(f, ss);
+ }
+ f->close_section();
+}
+
+template <typename I>
+void InstanceReplayer<I>::start()
+{
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ m_manual_stop = false;
+
+ for (auto &kv : m_image_replayers) {
+ auto &image_replayer = kv.second;
+ image_replayer->start(nullptr, true);
+ }
+}
+
+template <typename I>
+void InstanceReplayer<I>::stop()
+{
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ m_manual_stop = true;
+
+ for (auto &kv : m_image_replayers) {
+ auto &image_replayer = kv.second;
+ image_replayer->stop(nullptr, true);
+ }
+}
+
+template <typename I>
+void InstanceReplayer<I>::restart()
+{
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ m_manual_stop = false;
+
+ for (auto &kv : m_image_replayers) {
+ auto &image_replayer = kv.second;
+ image_replayer->restart();
+ }
+}
+
+template <typename I>
+void InstanceReplayer<I>::flush()
+{
+ dout(20) << "enter" << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ for (auto &kv : m_image_replayers) {
+ auto &image_replayer = kv.second;
+ image_replayer->flush();
+ }
+}
+
+template <typename I>
+void InstanceReplayer<I>::start_image_replayer(
+ ImageReplayer<I> *image_replayer) {
+ assert(m_lock.is_locked());
+
+ std::string global_image_id = image_replayer->get_global_image_id();
+ dout(20) << "global_image_id=" << global_image_id << dendl;
+
+ if (!image_replayer->is_stopped()) {
+ return;
+ } else if (image_replayer->is_blacklisted()) {
+ derr << "blacklisted detected during image replay" << dendl;
+ return;
+ }
+
+ FunctionContext *ctx = new FunctionContext(
+ [this, global_image_id] (int r) {
+ dout(20) << "image deleter result: r=" << r << ", "
+ << "global_image_id=" << global_image_id << dendl;
+
+ Mutex::Locker locker(m_lock);
+ m_async_op_tracker.finish_op();
+
+ if (r == -ESTALE || r == -ECANCELED) {
+ return;
+ }
+
+ auto it = m_image_replayers.find(global_image_id);
+ if (it == m_image_replayers.end()) {
+ return;
+ }
+
+ auto image_replayer = it->second;
+ if (r >= 0) {
+ image_replayer->start(nullptr, false);
+ } else {
+ start_image_replayer(image_replayer);
+ }
+ });
+
+ m_async_op_tracker.start_op();
+ m_image_deleter->wait_for_scheduled_deletion(
+ m_local_pool_id, image_replayer->get_global_image_id(), ctx, false);
+}
+
+template <typename I>
+void InstanceReplayer<I>::start_image_replayers() {
+ dout(20) << dendl;
+
+ Context *ctx = new FunctionContext(
+ [this] (int r) {
+ Mutex::Locker locker(m_lock);
+ m_async_op_tracker.finish_op();
+ if (m_on_shut_down != nullptr) {
+ return;
+ }
+ for (auto &it : m_image_replayers) {
+ start_image_replayer(it.second);
+ }
+ });
+
+ m_async_op_tracker.start_op();
+ m_threads->work_queue->queue(ctx, 0);
+}
+
+
+template <typename I>
+void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
+ Context *on_finish) {
+ dout(20) << image_replayer << " global_image_id="
+ << image_replayer->get_global_image_id() << ", on_finish="
+ << on_finish << dendl;
+
+ if (image_replayer->is_stopped()) {
+ m_threads->work_queue->queue(on_finish, 0);
+ return;
+ }
+
+ m_async_op_tracker.start_op();
+ Context *ctx = create_async_context_callback(
+ m_threads->work_queue, new FunctionContext(
+ [this, image_replayer, on_finish] (int r) {
+ stop_image_replayer(image_replayer, on_finish);
+ m_async_op_tracker.finish_op();
+ }));
+
+ if (image_replayer->is_running()) {
+ image_replayer->stop(ctx, false);
+ } else {
+ int after = 1;
+ dout(20) << "scheduling image replayer " << image_replayer << " stop after "
+ << after << " sec (task " << ctx << ")" << dendl;
+ ctx = new FunctionContext(
+ [this, after, ctx] (int r) {
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ m_threads->timer->add_event_after(after, ctx);
+ });
+ m_threads->work_queue->queue(ctx, 0);
+ }
+}
+
+template <typename I>
+void InstanceReplayer<I>::wait_for_ops() {
+ dout(20) << dendl;
+
+ Context *ctx = create_context_callback<
+ InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
+
+ m_async_op_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void InstanceReplayer<I>::handle_wait_for_ops(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ assert(r == 0);
+
+ Mutex::Locker locker(m_lock);
+ stop_image_replayers();
+}
+
+template <typename I>
+void InstanceReplayer<I>::stop_image_replayers() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ Context *ctx = create_async_context_callback(
+ m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
+ &InstanceReplayer<I>::handle_stop_image_replayers>(this));
+
+ C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
+ for (auto &it : m_image_replayers) {
+ stop_image_replayer(it.second, gather_ctx->new_sub());
+ }
+ gather_ctx->activate();
+}
+
+template <typename I>
+void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ assert(r == 0);
+
+ Context *on_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+
+ for (auto &it : m_image_replayers) {
+ assert(it.second->is_stopped());
+ it.second->destroy();
+ }
+ m_image_replayers.clear();
+
+ assert(m_on_shut_down != nullptr);
+ std::swap(on_finish, m_on_shut_down);
+ }
+ on_finish->complete(r);
+}
+
+template <typename I>
+void InstanceReplayer<I>::cancel_image_state_check_task() {
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+
+ if (m_image_state_check_task == nullptr) {
+ return;
+ }
+
+ dout(20) << m_image_state_check_task << dendl;
+ bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
+ assert(canceled);
+ m_image_state_check_task = nullptr;
+}
+
+template <typename I>
+void InstanceReplayer<I>::schedule_image_state_check_task() {
+ assert(m_threads->timer_lock.is_locked());
+ assert(m_image_state_check_task == nullptr);
+
+ m_image_state_check_task = new FunctionContext(
+ [this](int r) {
+ assert(m_threads->timer_lock.is_locked());
+ m_image_state_check_task = nullptr;
+ schedule_image_state_check_task();
+ start_image_replayers();
+ });
+
+ int after =
+ max(1, g_ceph_context->_conf->rbd_mirror_image_state_check_interval);
+
+ dout(20) << "scheduling image state check after " << after << " sec (task "
+ << m_image_state_check_task << ")" << dendl;
+ m_threads->timer->add_event_after(after, m_image_state_check_task);
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_INSTANCE_REPLAYER_H
+#define RBD_MIRROR_INSTANCE_REPLAYER_H
+
+#include <map>
+#include <sstream>
+
+#include "common/AsyncOpTracker.h"
+#include "common/Formatter.h"
+#include "common/Mutex.h"
+#include "tools/rbd_mirror/instance_watcher/Types.h"
+#include "types.h"
+
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+
+class ImageDeleter;
+
+template <typename> class ImageReplayer;
+template <typename> struct Threads;
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class InstanceReplayer {
+public:
+ static InstanceReplayer* create(
+ Threads<ImageCtxT> *threads, std::shared_ptr<ImageDeleter> image_deleter,
+ ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler, RadosRef local_rados,
+ const std::string &local_mirror_uuid, int64_t local_pool_id) {
+ return new InstanceReplayer(threads, image_deleter, image_sync_throttler,
+ local_rados, local_mirror_uuid, local_pool_id);
+ }
+ void destroy() {
+ delete this;
+ }
+
+ InstanceReplayer(Threads<ImageCtxT> *threads,
+ std::shared_ptr<ImageDeleter> image_deleter,
+ ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
+ RadosRef local_rados, const std::string &local_mirror_uuid,
+ int64_t local_pool_id);
+ ~InstanceReplayer();
+
+ int init();
+ void shut_down();
+
+ void init(Context *on_finish);
+ void shut_down(Context *on_finish);
+
+ void set_peers(const Peers &peers);
+
+ void acquire_image(const std::string &global_image_id,
+ const instance_watcher::PeerImageIds &peers,
+ Context *on_finish);
+ void release_image(const std::string &global_image_id,
+ const instance_watcher::PeerImageIds &peers,
+ bool schedule_delete, Context *on_finish);
+ void release_all(Context *on_finish);
+
+ void print_status(Formatter *f, stringstream *ss);
+ void start();
+ void stop();
+ void restart();
+ void flush();
+
+private:
+ /**
+ * @verbatim
+ *
+ * <uninitialized> <-------------------\
+ * | (init) | (repeat for each
+ * v STOP_IMAGE_REPLAYER ---\ image replayer)
+ * SCHEDULE_IMAGE_STATE_CHECK_TASK ^ ^ |
+ * | | | |
+ * v (shut_down) | \---------/
+ * <initialized> -----------------> WAIT_FOR_OPS
+ *
+ * @endverbatim
+ */
+
+ Threads<ImageCtxT> *m_threads;
+ std::shared_ptr<ImageDeleter> m_image_deleter;
+ ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
+ RadosRef m_local_rados;
+ std::string m_local_mirror_uuid;
+ int64_t m_local_pool_id;
+
+ Mutex m_lock;
+ AsyncOpTracker m_async_op_tracker;
+ std::map<std::string, ImageReplayer<ImageCtxT> *> m_image_replayers;
+ Peers m_peers;
+ Context *m_image_state_check_task = nullptr;
+ Context *m_on_shut_down = nullptr;
+ bool m_manual_stop = false;
+
+ void wait_for_ops();
+ void handle_wait_for_ops(int r);
+
+ void start_image_replayer(ImageReplayer<ImageCtxT> *image_replayer);
+ void start_image_replayers();
+
+ void stop_image_replayer(ImageReplayer<ImageCtxT> *image_replayer,
+ Context *on_finish);
+
+ void stop_image_replayers();
+ void handle_stop_image_replayers(int r);
+
+ void schedule_image_state_check_task();
+ void cancel_image_state_check_task();
+};
+
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;
+
+#endif // RBD_MIRROR_INSTANCE_REPLAYER_H