From: Mykola Golub Date: Fri, 7 Apr 2017 12:14:29 +0000 (+0200) Subject: rbd-mirror A/A: InstanceReplayer class to acquire and release images X-Git-Tag: v12.0.2~138^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7558df58106eb97c1c856debe437c4cb36ad2bb9;p=ceph.git rbd-mirror A/A: InstanceReplayer class to acquire and release images Signed-off-by: Mykola Golub --- diff --git a/src/test/rbd_mirror/CMakeLists.txt b/src/test/rbd_mirror/CMakeLists.txt index 88435ff7828..291b457bb60 100644 --- a/src/test/rbd_mirror/CMakeLists.txt +++ b/src/test/rbd_mirror/CMakeLists.txt @@ -19,6 +19,7 @@ add_executable(unittest_rbd_mirror test_mock_ImageReplayer.cc test_mock_ImageSync.cc test_mock_ImageSyncThrottler.cc + test_mock_InstanceReplayer.cc test_mock_InstanceWatcher.cc test_mock_LeaderWatcher.cc test_mock_PoolWatcher.cc diff --git a/src/test/rbd_mirror/test_mock_InstanceReplayer.cc b/src/test/rbd_mirror/test_mock_InstanceReplayer.cc new file mode 100644 index 00000000000..2fef0544773 --- /dev/null +++ b/src/test/rbd_mirror/test_mock_InstanceReplayer.cc @@ -0,0 +1,200 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/librbd/mock/MockImageCtx.h" +#include "test/rbd_mirror/test_mock_fixture.h" +#include "tools/rbd_mirror/ImageReplayer.h" +#include "tools/rbd_mirror/ImageSyncThrottler.h" +#include "tools/rbd_mirror/InstanceReplayer.h" +#include "tools/rbd_mirror/Threads.h" + +namespace librbd { + +namespace { + +struct MockTestImageCtx : public MockImageCtx { + MockTestImageCtx(librbd::ImageCtx &image_ctx) + : librbd::MockImageCtx(image_ctx) { + } +}; + +} // anonymous namespace + +} // namespace librbd + +namespace rbd { +namespace mirror { + +template <> +struct Threads { + Mutex &timer_lock; + SafeTimer *timer; + ContextWQ *work_queue; + + Threads(Threads *threads) + : timer_lock(threads->timer_lock), timer(threads->timer), + work_queue(threads->work_queue) { + } +}; + +template<> +struct ImageReplayer { + static ImageReplayer* s_instance; + std::string global_image_id; + + static ImageReplayer *create( + Threads *threads, + std::shared_ptr image_deleter, + ImageSyncThrottlerRef image_sync_throttler, + RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id, + const std::string &global_image_id) { + assert(s_instance != nullptr); + s_instance->global_image_id = global_image_id; + return s_instance; + } + + ImageReplayer() { + assert(s_instance == nullptr); + s_instance = this; + } + + virtual ~ImageReplayer() { + assert(s_instance == this); + s_instance = nullptr; + } + + MOCK_METHOD0(destroy, void()); + MOCK_METHOD2(start, void(Context *, bool)); + MOCK_METHOD2(stop, void(Context *, bool)); + MOCK_METHOD0(restart, void()); + MOCK_METHOD0(flush, void()); + MOCK_METHOD2(print_status, void(Formatter *, stringstream *)); + MOCK_METHOD1(set_remote_images, void(const PeerImages &)); + MOCK_METHOD2(remove_remote_image, void(const std::string &, + const std::string &)); + MOCK_METHOD0(remote_images_empty, bool()); + MOCK_METHOD0(get_global_image_id, const std::string &()); + MOCK_METHOD0(get_local_image_id, const std::string &()); + MOCK_METHOD0(is_running, bool()); + MOCK_METHOD0(is_stopped, bool()); + MOCK_METHOD0(is_blacklisted, bool()); +}; + +template<> +struct ImageSyncThrottler { + ImageSyncThrottler() { + } + virtual ~ImageSyncThrottler() { + } +}; + +ImageReplayer* ImageReplayer::s_instance = nullptr; + +} // namespace mirror +} // namespace rbd + +// template definitions +#include "tools/rbd_mirror/InstanceReplayer.cc" + +namespace rbd { +namespace mirror { + +using ::testing::_; +using ::testing::InSequence; +using ::testing::Invoke; +using ::testing::Return; +using ::testing::ReturnRef; + +class TestMockInstanceReplayer : public TestMockFixture { +public: + typedef ImageReplayer MockImageReplayer; + typedef InstanceReplayer MockInstanceReplayer; + typedef Threads MockThreads; + + void SetUp() override { + TestMockFixture::SetUp(); + + m_mock_threads = new MockThreads(m_threads); + + m_image_deleter.reset( + new rbd::mirror::ImageDeleter(m_threads->work_queue, m_threads->timer, + &m_threads->timer_lock)); + m_image_sync_throttler.reset( + new rbd::mirror::ImageSyncThrottler()); + } + + void TearDown() override { + delete m_mock_threads; + TestMockFixture::TearDown(); + } + + MockThreads *m_mock_threads; + std::shared_ptr m_image_deleter; + std::shared_ptr> + m_image_sync_throttler; +}; + +TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) { + MockImageReplayer mock_image_replayer; + MockInstanceReplayer instance_replayer( + m_mock_threads, m_image_deleter, m_image_sync_throttler, + rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)), + "local_mirror_uuid", m_local_io_ctx.get_id()); + + std::string global_image_id("global_image_id"); + rbd::mirror::instance_watcher::PeerImageIds peer_image_ids = + {{"remote_mirror_uuid", "remote_image_id"}}; + + EXPECT_CALL(mock_image_replayer, get_global_image_id()) + .WillRepeatedly(ReturnRef(global_image_id)); + EXPECT_CALL(mock_image_replayer, is_blacklisted()) + .WillRepeatedly(Return(false)); + + InSequence seq; + + instance_replayer.init(); + instance_replayer.set_peers({{"remote_mirror_uuid", m_remote_io_ctx}}); + + // Acquire + + C_SaferCond on_acquire; + + EXPECT_CALL(mock_image_replayer, set_remote_images(_)); + EXPECT_CALL(mock_image_replayer, is_stopped()) + .WillOnce(Return(true)); + EXPECT_CALL(mock_image_replayer, start(nullptr, false)); + + instance_replayer.acquire_image(global_image_id, peer_image_ids, &on_acquire); + ASSERT_EQ(0, on_acquire.wait()); + + // Release + + C_SaferCond on_release; + + EXPECT_CALL(mock_image_replayer, + remove_remote_image("remote_mirror_uuid", "remote_image_id")); + EXPECT_CALL(mock_image_replayer, remote_images_empty()) + .WillOnce(Return(true)); + EXPECT_CALL(mock_image_replayer, is_stopped()) + .WillOnce(Return(false)); + EXPECT_CALL(mock_image_replayer, is_running()) + .WillOnce(Return(false)); + EXPECT_CALL(mock_image_replayer, is_stopped()) + .WillOnce(Return(false)); + EXPECT_CALL(mock_image_replayer, is_running()) + .WillOnce(Return(true)); + EXPECT_CALL(mock_image_replayer, stop(_, false)) + .WillOnce(CompleteContext(0)); + EXPECT_CALL(mock_image_replayer, is_stopped()) + .WillOnce(Return(true)); + EXPECT_CALL(mock_image_replayer, destroy()); + + instance_replayer.release_image("global_image_id", peer_image_ids, false, + &on_release); + ASSERT_EQ(0, on_release.wait()); + + instance_replayer.shut_down(); +} + +} // namespace mirror +} // namespace rbd diff --git a/src/tools/rbd_mirror/CMakeLists.txt b/src/tools/rbd_mirror/CMakeLists.txt index 200fdee63e8..4f1acbcbd4f 100644 --- a/src/tools/rbd_mirror/CMakeLists.txt +++ b/src/tools/rbd_mirror/CMakeLists.txt @@ -1,4 +1,5 @@ add_library(rbd_mirror_types STATIC + instance_watcher/Types.cc leader_watcher/Types.cc) set(rbd_mirror_internal @@ -7,6 +8,7 @@ set(rbd_mirror_internal ImageReplayer.cc ImageSync.cc ImageSyncThrottler.cc + InstanceReplayer.cc InstanceWatcher.cc Instances.cc LeaderWatcher.cc diff --git a/src/tools/rbd_mirror/InstanceReplayer.cc b/src/tools/rbd_mirror/InstanceReplayer.cc new file mode 100644 index 00000000000..b1dd697309b --- /dev/null +++ b/src/tools/rbd_mirror/InstanceReplayer.cc @@ -0,0 +1,496 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/stringify.h" +#include "common/Timer.h" +#include "common/debug.h" +#include "common/errno.h" +#include "librbd/Utils.h" +#include "ImageReplayer.h" +#include "InstanceReplayer.h" +#include "Threads.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \ + << this << " " << __func__ << ": " + +namespace rbd { +namespace mirror { + +using librbd::util::create_async_context_callback; +using librbd::util::create_context_callback; + +template +InstanceReplayer::InstanceReplayer( + Threads *threads, std::shared_ptr image_deleter, + ImageSyncThrottlerRef image_sync_throttler, RadosRef local_rados, + const std::string &local_mirror_uuid, int64_t local_pool_id) + : m_threads(threads), m_image_deleter(image_deleter), + m_image_sync_throttler(image_sync_throttler), m_local_rados(local_rados), + m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id), + m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) { +} + +template +InstanceReplayer::~InstanceReplayer() { + assert(m_image_state_check_task == nullptr); + assert(m_async_op_tracker.empty()); + assert(m_image_replayers.empty()); +} + +template +int InstanceReplayer::init() { + C_SaferCond init_ctx; + init(&init_ctx); + return init_ctx.wait(); +} + +template +void InstanceReplayer::init(Context *on_finish) { + dout(20) << dendl; + + Context *ctx = new FunctionContext( + [this, on_finish] (int r) { + { + Mutex::Locker timer_locker(m_threads->timer_lock); + schedule_image_state_check_task(); + } + on_finish->complete(0); + }); + + m_threads->work_queue->queue(ctx, 0); +} + +template +void InstanceReplayer::shut_down() { + C_SaferCond shut_down_ctx; + shut_down(&shut_down_ctx); + int r = shut_down_ctx.wait(); + assert(r == 0); +} + +template +void InstanceReplayer::shut_down(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + assert(m_on_shut_down == nullptr); + m_on_shut_down = on_finish; + + Context *ctx = new FunctionContext( + [this] (int r) { + cancel_image_state_check_task(); + wait_for_ops(); + }); + + m_threads->work_queue->queue(ctx, 0); +} + +template +void InstanceReplayer::set_peers(const Peers &peers) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + m_peers = peers; +} + +template +void InstanceReplayer::release_all(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish); + for (auto it = m_image_replayers.begin(); it != m_image_replayers.end(); + it = m_image_replayers.erase(it)) { + auto image_replayer = it->second; + auto ctx = gather_ctx->new_sub(); + ctx = new FunctionContext( + [image_replayer, ctx] (int r) { + image_replayer->destroy(); + ctx->complete(0); + }); + stop_image_replayer(image_replayer, ctx); + } + gather_ctx->activate(); +} + +template +void InstanceReplayer::acquire_image( + const std::string &global_image_id, + const instance_watcher::PeerImageIds &peers, Context *on_finish) { + dout(20) << "global_image_id=" << global_image_id << dendl; + + Mutex::Locker locker(m_lock); + + assert(m_on_shut_down == nullptr); + + auto it = m_image_replayers.find(global_image_id); + + if (it == m_image_replayers.end()) { + auto image_replayer = ImageReplayer::create( + m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados, + m_local_mirror_uuid, m_local_pool_id, global_image_id); + + dout(20) << global_image_id << ": creating replayer " << image_replayer + << dendl; + + it = m_image_replayers.insert(std::make_pair(global_image_id, + image_replayer)).first; + } + + auto image_replayer = it->second; + + PeerImages remote_images; + for (auto &peer : peers) { + auto it = m_peers.find(Peer(peer.mirror_uuid)); + assert(it != m_peers.end()); + auto io_ctx = it->io_ctx; + remote_images.insert({peer.mirror_uuid, io_ctx, peer.image_id}); + } + + image_replayer->set_remote_images(remote_images); + + start_image_replayer(image_replayer); + + m_threads->work_queue->queue(on_finish, 0); +} + +template +void InstanceReplayer::release_image( + const std::string &global_image_id, + const instance_watcher::PeerImageIds &peers, bool schedule_delete, + Context *on_finish) { + dout(20) << "global_image_id=" << global_image_id << ", " + << "schedule_delete=" << schedule_delete << dendl; + + Mutex::Locker locker(m_lock); + + assert(m_on_shut_down == nullptr); + + auto it = m_image_replayers.find(global_image_id); + + if (it == m_image_replayers.end()) { + dout(20) << global_image_id << ": not found" << dendl; + m_threads->work_queue->queue(on_finish, 0); + return; + } + + auto image_replayer = it->second; + + for (auto &peer : peers) { + image_replayer->remove_remote_image(peer.mirror_uuid, peer.image_id); + } + + if (!image_replayer->remote_images_empty()) { + dout(20) << global_image_id << ": still has remote images" << dendl; + m_threads->work_queue->queue(on_finish, 0); + return; + } + + m_image_replayers.erase(it); + + on_finish = new FunctionContext( + [image_replayer, on_finish] (int r) { + image_replayer->destroy(); + on_finish->complete(0); + }); + + if (schedule_delete) { + on_finish = new FunctionContext( + [this, image_replayer, on_finish] (int r) { + auto global_image_id = image_replayer->get_global_image_id(); + auto local_image_id = image_replayer->get_local_image_id(); + if (local_image_id.empty()) { + dout(20) << global_image_id << ": unknown local_image_id" + << " (image does not exist or primary), skipping delete" + << dendl; + } else { + m_image_deleter->schedule_image_delete( + m_local_rados, m_local_pool_id, local_image_id, global_image_id); + } + on_finish->complete(0); + }); + } + + stop_image_replayer(image_replayer, on_finish); +} + +template +void InstanceReplayer::print_status(Formatter *f, stringstream *ss) { + dout(20) << dendl; + + if (!f) { + return; + } + + Mutex::Locker locker(m_lock); + + f->open_array_section("image_replayers"); + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->print_status(f, ss); + } + f->close_section(); +} + +template +void InstanceReplayer::start() +{ + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + m_manual_stop = false; + + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->start(nullptr, true); + } +} + +template +void InstanceReplayer::stop() +{ + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + m_manual_stop = true; + + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->stop(nullptr, true); + } +} + +template +void InstanceReplayer::restart() +{ + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + m_manual_stop = false; + + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->restart(); + } +} + +template +void InstanceReplayer::flush() +{ + dout(20) << "enter" << dendl; + + Mutex::Locker locker(m_lock); + + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->flush(); + } +} + +template +void InstanceReplayer::start_image_replayer( + ImageReplayer *image_replayer) { + assert(m_lock.is_locked()); + + std::string global_image_id = image_replayer->get_global_image_id(); + dout(20) << "global_image_id=" << global_image_id << dendl; + + if (!image_replayer->is_stopped()) { + return; + } else if (image_replayer->is_blacklisted()) { + derr << "blacklisted detected during image replay" << dendl; + return; + } + + FunctionContext *ctx = new FunctionContext( + [this, global_image_id] (int r) { + dout(20) << "image deleter result: r=" << r << ", " + << "global_image_id=" << global_image_id << dendl; + + Mutex::Locker locker(m_lock); + m_async_op_tracker.finish_op(); + + if (r == -ESTALE || r == -ECANCELED) { + return; + } + + auto it = m_image_replayers.find(global_image_id); + if (it == m_image_replayers.end()) { + return; + } + + auto image_replayer = it->second; + if (r >= 0) { + image_replayer->start(nullptr, false); + } else { + start_image_replayer(image_replayer); + } + }); + + m_async_op_tracker.start_op(); + m_image_deleter->wait_for_scheduled_deletion( + m_local_pool_id, image_replayer->get_global_image_id(), ctx, false); +} + +template +void InstanceReplayer::start_image_replayers() { + dout(20) << dendl; + + Context *ctx = new FunctionContext( + [this] (int r) { + Mutex::Locker locker(m_lock); + m_async_op_tracker.finish_op(); + if (m_on_shut_down != nullptr) { + return; + } + for (auto &it : m_image_replayers) { + start_image_replayer(it.second); + } + }); + + m_async_op_tracker.start_op(); + m_threads->work_queue->queue(ctx, 0); +} + + +template +void InstanceReplayer::stop_image_replayer(ImageReplayer *image_replayer, + Context *on_finish) { + dout(20) << image_replayer << " global_image_id=" + << image_replayer->get_global_image_id() << ", on_finish=" + << on_finish << dendl; + + if (image_replayer->is_stopped()) { + m_threads->work_queue->queue(on_finish, 0); + return; + } + + m_async_op_tracker.start_op(); + Context *ctx = create_async_context_callback( + m_threads->work_queue, new FunctionContext( + [this, image_replayer, on_finish] (int r) { + stop_image_replayer(image_replayer, on_finish); + m_async_op_tracker.finish_op(); + })); + + if (image_replayer->is_running()) { + image_replayer->stop(ctx, false); + } else { + int after = 1; + dout(20) << "scheduling image replayer " << image_replayer << " stop after " + << after << " sec (task " << ctx << ")" << dendl; + ctx = new FunctionContext( + [this, after, ctx] (int r) { + Mutex::Locker timer_locker(m_threads->timer_lock); + m_threads->timer->add_event_after(after, ctx); + }); + m_threads->work_queue->queue(ctx, 0); + } +} + +template +void InstanceReplayer::wait_for_ops() { + dout(20) << dendl; + + Context *ctx = create_context_callback< + InstanceReplayer, &InstanceReplayer::handle_wait_for_ops>(this); + + m_async_op_tracker.wait_for_ops(ctx); +} + +template +void InstanceReplayer::handle_wait_for_ops(int r) { + dout(20) << "r=" << r << dendl; + + assert(r == 0); + + Mutex::Locker locker(m_lock); + stop_image_replayers(); +} + +template +void InstanceReplayer::stop_image_replayers() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback, + &InstanceReplayer::handle_stop_image_replayers>(this)); + + C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx); + for (auto &it : m_image_replayers) { + stop_image_replayer(it.second, gather_ctx->new_sub()); + } + gather_ctx->activate(); +} + +template +void InstanceReplayer::handle_stop_image_replayers(int r) { + dout(20) << "r=" << r << dendl; + + assert(r == 0); + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + + for (auto &it : m_image_replayers) { + assert(it.second->is_stopped()); + it.second->destroy(); + } + m_image_replayers.clear(); + + assert(m_on_shut_down != nullptr); + std::swap(on_finish, m_on_shut_down); + } + on_finish->complete(r); +} + +template +void InstanceReplayer::cancel_image_state_check_task() { + Mutex::Locker timer_locker(m_threads->timer_lock); + + if (m_image_state_check_task == nullptr) { + return; + } + + dout(20) << m_image_state_check_task << dendl; + bool canceled = m_threads->timer->cancel_event(m_image_state_check_task); + assert(canceled); + m_image_state_check_task = nullptr; +} + +template +void InstanceReplayer::schedule_image_state_check_task() { + assert(m_threads->timer_lock.is_locked()); + assert(m_image_state_check_task == nullptr); + + m_image_state_check_task = new FunctionContext( + [this](int r) { + assert(m_threads->timer_lock.is_locked()); + m_image_state_check_task = nullptr; + schedule_image_state_check_task(); + start_image_replayers(); + }); + + int after = + max(1, g_ceph_context->_conf->rbd_mirror_image_state_check_interval); + + dout(20) << "scheduling image state check after " << after << " sec (task " + << m_image_state_check_task << ")" << dendl; + m_threads->timer->add_event_after(after, m_image_state_check_task); +} + +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::InstanceReplayer; diff --git a/src/tools/rbd_mirror/InstanceReplayer.h b/src/tools/rbd_mirror/InstanceReplayer.h new file mode 100644 index 00000000000..5d521ec2bb7 --- /dev/null +++ b/src/tools/rbd_mirror/InstanceReplayer.h @@ -0,0 +1,120 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RBD_MIRROR_INSTANCE_REPLAYER_H +#define RBD_MIRROR_INSTANCE_REPLAYER_H + +#include +#include + +#include "common/AsyncOpTracker.h" +#include "common/Formatter.h" +#include "common/Mutex.h" +#include "tools/rbd_mirror/instance_watcher/Types.h" +#include "types.h" + +namespace librbd { class ImageCtx; } + +namespace rbd { +namespace mirror { + +class ImageDeleter; + +template class ImageReplayer; +template struct Threads; + +template +class InstanceReplayer { +public: + static InstanceReplayer* create( + Threads *threads, std::shared_ptr image_deleter, + ImageSyncThrottlerRef image_sync_throttler, RadosRef local_rados, + const std::string &local_mirror_uuid, int64_t local_pool_id) { + return new InstanceReplayer(threads, image_deleter, image_sync_throttler, + local_rados, local_mirror_uuid, local_pool_id); + } + void destroy() { + delete this; + } + + InstanceReplayer(Threads *threads, + std::shared_ptr image_deleter, + ImageSyncThrottlerRef image_sync_throttler, + RadosRef local_rados, const std::string &local_mirror_uuid, + int64_t local_pool_id); + ~InstanceReplayer(); + + int init(); + void shut_down(); + + void init(Context *on_finish); + void shut_down(Context *on_finish); + + void set_peers(const Peers &peers); + + void acquire_image(const std::string &global_image_id, + const instance_watcher::PeerImageIds &peers, + Context *on_finish); + void release_image(const std::string &global_image_id, + const instance_watcher::PeerImageIds &peers, + bool schedule_delete, Context *on_finish); + void release_all(Context *on_finish); + + void print_status(Formatter *f, stringstream *ss); + void start(); + void stop(); + void restart(); + void flush(); + +private: + /** + * @verbatim + * + * <-------------------\ + * | (init) | (repeat for each + * v STOP_IMAGE_REPLAYER ---\ image replayer) + * SCHEDULE_IMAGE_STATE_CHECK_TASK ^ ^ | + * | | | | + * v (shut_down) | \---------/ + * -----------------> WAIT_FOR_OPS + * + * @endverbatim + */ + + Threads *m_threads; + std::shared_ptr m_image_deleter; + ImageSyncThrottlerRef m_image_sync_throttler; + RadosRef m_local_rados; + std::string m_local_mirror_uuid; + int64_t m_local_pool_id; + + Mutex m_lock; + AsyncOpTracker m_async_op_tracker; + std::map *> m_image_replayers; + Peers m_peers; + Context *m_image_state_check_task = nullptr; + Context *m_on_shut_down = nullptr; + bool m_manual_stop = false; + + void wait_for_ops(); + void handle_wait_for_ops(int r); + + void start_image_replayer(ImageReplayer *image_replayer); + void start_image_replayers(); + + void stop_image_replayer(ImageReplayer *image_replayer, + Context *on_finish); + + void stop_image_replayers(); + void handle_stop_image_replayers(int r); + + void schedule_image_state_check_task(); + void cancel_image_state_check_task(); +}; + +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::InstanceReplayer; + +#endif // RBD_MIRROR_INSTANCE_REPLAYER_H diff --git a/src/tools/rbd_mirror/instance_watcher/Types.cc b/src/tools/rbd_mirror/instance_watcher/Types.cc new file mode 100644 index 00000000000..1fe5316f225 --- /dev/null +++ b/src/tools/rbd_mirror/instance_watcher/Types.cc @@ -0,0 +1,28 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Types.h" +#include "common/Formatter.h" + +namespace rbd { +namespace mirror { +namespace instance_watcher { + +void PeerImageId::encode(bufferlist &bl) const { + ::encode(mirror_uuid, bl); + ::encode(image_id, bl); +} + +void PeerImageId::decode(bufferlist::iterator &iter) { + ::decode(mirror_uuid, iter); + ::decode(image_id, iter); +} + +void PeerImageId::dump(Formatter *f) const { + f->dump_string("mirror_uuid", mirror_uuid); + f->dump_string("image_id", image_id); +} + +} // namespace instance_watcher +} // namespace mirror +} // namespace rbd diff --git a/src/tools/rbd_mirror/instance_watcher/Types.h b/src/tools/rbd_mirror/instance_watcher/Types.h new file mode 100644 index 00000000000..f4e08e34be4 --- /dev/null +++ b/src/tools/rbd_mirror/instance_watcher/Types.h @@ -0,0 +1,46 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RBD_MIRROR_INSTANCE_WATCHER_TYPES_H +#define RBD_MIRROR_INSTANCE_WATCHER_TYPES_H + +#include +#include +#include + +#include "include/buffer_fwd.h" +#include "include/encoding.h" +#include "include/int_types.h" + +namespace ceph { class Formatter; } + +namespace rbd { +namespace mirror { +namespace instance_watcher { + +struct PeerImageId { + std::string mirror_uuid; + std::string image_id; + + inline bool operator<(const PeerImageId &rhs) const { + return mirror_uuid < rhs.mirror_uuid; + } + + inline bool operator==(const PeerImageId &rhs) const { + return (mirror_uuid == rhs.mirror_uuid && image_id == rhs.image_id); + } + + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& it); + void dump(Formatter *f) const; +}; + +WRITE_CLASS_ENCODER(PeerImageId); + +typedef std::set PeerImageIds; + +} // namespace instance_watcher +} // namespace mirror +} // namespace librbd + +#endif // RBD_MIRROR_INSTANCE_WATCHER_TYPES_H