From e025b22e46dae23e1a9d8af7e4f4ec1443dcb5fb Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Tue, 22 Nov 2016 13:47:37 -0500 Subject: [PATCH] rbd-mirror: utilize the mirroring watcher to receive update notifications Fixes: http://tracker.ceph.com/issues/15029 Signed-off-by: Jason Dillaman --- src/common/config_opts.h | 1 - src/librbd/MirroringWatcher.cc | 4 +- src/librbd/MirroringWatcher.h | 6 +- src/librbd/Watcher.cc | 5 + src/librbd/Watcher.h | 1 + src/test/librbd/test_MirroringWatcher.cc | 13 +- src/test/rbd_mirror/CMakeLists.txt | 9 +- src/test/rbd_mirror/mock/MockContextWQ.h | 18 + src/test/rbd_mirror/mock/MockSafeTimer.h | 16 + .../test_mock_RefreshImagesRequest.cc | 6 +- src/test/rbd_mirror/test_PoolWatcher.cc | 56 +- src/test/rbd_mirror/test_mock_PoolWatcher.cc | 807 ++++++++++++++++++ src/tools/rbd_mirror/ImageReplayer.cc | 4 +- src/tools/rbd_mirror/PoolWatcher.cc | 579 +++++++++++-- src/tools/rbd_mirror/PoolWatcher.h | 153 +++- src/tools/rbd_mirror/Replayer.cc | 228 +++-- src/tools/rbd_mirror/Replayer.h | 32 +- .../pool_watcher/RefreshImagesRequest.cc | 4 +- .../pool_watcher/RefreshImagesRequest.h | 4 +- src/tools/rbd_mirror/types.cc | 9 +- src/tools/rbd_mirror/types.h | 7 +- 21 files changed, 1752 insertions(+), 210 deletions(-) create mode 100644 src/test/rbd_mirror/mock/MockContextWQ.h create mode 100644 src/test/rbd_mirror/mock/MockSafeTimer.h create mode 100644 src/test/rbd_mirror/test_mock_PoolWatcher.cc diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 009eb2e2186..4593c7da69e 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -1397,7 +1397,6 @@ OPTION(rbd_mirror_sync_point_update_age, OPT_DOUBLE, 30) // number of seconds be OPTION(rbd_mirror_concurrent_image_syncs, OPT_U32, 5) // maximum number of image syncs in parallel OPTION(rbd_mirror_pool_replayers_refresh_interval, OPT_INT, 30) // interval to refresh peers in rbd-mirror daemon OPTION(rbd_mirror_delete_retry_interval, OPT_DOUBLE, 30) // interval to check and retry the failed requests in deleter -OPTION(rbd_mirror_image_directory_refresh_interval, OPT_INT, 30) // interval to refresh images in pool watcher OPTION(rbd_mirror_image_state_check_interval, OPT_INT, 30) // interval to get images from pool watcher and set sources in replayer OPTION(rbd_mirror_leader_heartbeat_interval, OPT_INT, 5) // interval (in seconds) between mirror leader heartbeats OPTION(rbd_mirror_leader_max_missed_heartbeats, OPT_INT, 2) // number of missed heartbeats for non-lock owner to attempt to acquire lock diff --git a/src/librbd/MirroringWatcher.cc b/src/librbd/MirroringWatcher.cc index b083baa5e58..753ca91f44b 100644 --- a/src/librbd/MirroringWatcher.cc +++ b/src/librbd/MirroringWatcher.cc @@ -114,7 +114,7 @@ bool MirroringWatcher::handle_payload(const ModeUpdatedPayload &payload, Context *on_notify_ack) { CephContext *cct = this->m_cct; ldout(cct, 20) << ": mode updated: " << payload.mirror_mode << dendl; - handle_mode_updated(payload.mirror_mode, on_notify_ack); + handle_mode_updated(payload.mirror_mode); return true; } @@ -124,7 +124,7 @@ bool MirroringWatcher::handle_payload(const ImageUpdatedPayload &payload, CephContext *cct = this->m_cct; ldout(cct, 20) << ": image state updated" << dendl; handle_image_updated(payload.mirror_image_state, payload.image_id, - payload.global_image_id, on_notify_ack); + payload.global_image_id); return true; } diff --git a/src/librbd/MirroringWatcher.h b/src/librbd/MirroringWatcher.h index 407223bf85c..ede06d8fe08 100644 --- a/src/librbd/MirroringWatcher.h +++ b/src/librbd/MirroringWatcher.h @@ -43,12 +43,10 @@ public: const std::string &global_image_id, Context *on_finish); - virtual void handle_mode_updated(cls::rbd::MirrorMode mirror_mode, - Context *on_ack) = 0; + virtual void handle_mode_updated(cls::rbd::MirrorMode mirror_mode) = 0; virtual void handle_image_updated(cls::rbd::MirrorImageState state, const std::string &image_id, - const std::string &global_image_id, - Context *on_ack) = 0; + const std::string &global_image_id) = 0; private: bool handle_payload(const mirroring_watcher::ModeUpdatedPayload &payload, diff --git a/src/librbd/Watcher.cc b/src/librbd/Watcher.cc index 20e1a88edfc..9d986a1a578 100644 --- a/src/librbd/Watcher.cc +++ b/src/librbd/Watcher.cc @@ -157,6 +157,11 @@ void Watcher::flush(Context *on_finish) { m_notifier.flush(on_finish); } +std::string Watcher::get_oid() const { + RWLock::RLocker locker(m_watch_lock); + return m_oid; +} + void Watcher::set_oid(const string& oid) { RWLock::WLocker l(m_watch_lock); assert(m_watch_state == WATCH_STATE_UNREGISTERED); diff --git a/src/librbd/Watcher.h b/src/librbd/Watcher.h index 3e353bdb645..099b007b8f6 100644 --- a/src/librbd/Watcher.h +++ b/src/librbd/Watcher.h @@ -28,6 +28,7 @@ public: void unregister_watch(Context *on_finish); void flush(Context *on_finish); + std::string get_oid() const; void set_oid(const string& oid); uint64_t get_watch_handle() const { diff --git a/src/test/librbd/test_MirroringWatcher.cc b/src/test/librbd/test_MirroringWatcher.cc index c1365a5e0f5..f29ff933fda 100644 --- a/src/test/librbd/test_MirroringWatcher.cc +++ b/src/test/librbd/test_MirroringWatcher.cc @@ -23,11 +23,10 @@ struct MockMirroringWatcher : public MirroringWatcher<> { : MirroringWatcher<>(image_ctx.md_ctx, image_ctx.op_work_queue) { } - MOCK_METHOD2(handle_mode_updated, void(cls::rbd::MirrorMode, Context*)); - MOCK_METHOD4(handle_image_updated, void(cls::rbd::MirrorImageState, + MOCK_METHOD1(handle_mode_updated, void(cls::rbd::MirrorMode)); + MOCK_METHOD3(handle_image_updated, void(cls::rbd::MirrorImageState, const std::string &, - const std::string &, - Context*)); + const std::string &)); }; } // anonymous namespace @@ -73,7 +72,7 @@ public: }; TEST_F(TestMirroringWatcher, ModeUpdated) { - EXPECT_CALL(*m_image_watcher, handle_mode_updated(cls::rbd::MIRROR_MODE_DISABLED, _)); + EXPECT_CALL(*m_image_watcher, handle_mode_updated(cls::rbd::MIRROR_MODE_DISABLED)); C_SaferCond ctx; MockMirroringWatcher::notify_mode_updated(m_ioctx, cls::rbd::MIRROR_MODE_DISABLED, &ctx); @@ -83,8 +82,8 @@ TEST_F(TestMirroringWatcher, ModeUpdated) { TEST_F(TestMirroringWatcher, ImageStatusUpdated) { EXPECT_CALL(*m_image_watcher, handle_image_updated(cls::rbd::MIRROR_IMAGE_STATE_ENABLED, - StrEq("image id"), StrEq("global image id"), - _)); + StrEq("image id"), + StrEq("global image id"))); C_SaferCond ctx; MockMirroringWatcher::notify_image_updated(m_ioctx, diff --git a/src/test/rbd_mirror/CMakeLists.txt b/src/test/rbd_mirror/CMakeLists.txt index 08ca7fa7d5c..88435ff7828 100644 --- a/src/test/rbd_mirror/CMakeLists.txt +++ b/src/test/rbd_mirror/CMakeLists.txt @@ -9,8 +9,8 @@ set(rbd_mirror_test_srcs test_LeaderWatcher.cc test_fixture.cc ) -add_library(rbd_mirror STATIC ${rbd_mirror_test_srcs}) -set_target_properties(rbd_mirror PROPERTIES COMPILE_FLAGS +add_library(rbd_mirror_test STATIC ${rbd_mirror_test_srcs}) +set_target_properties(rbd_mirror_test PROPERTIES COMPILE_FLAGS ${UNITTEST_CXX_FLAGS}) add_executable(unittest_rbd_mirror @@ -21,6 +21,7 @@ add_executable(unittest_rbd_mirror test_mock_ImageSyncThrottler.cc test_mock_InstanceWatcher.cc test_mock_LeaderWatcher.cc + test_mock_PoolWatcher.cc image_replayer/test_mock_BootstrapRequest.cc image_replayer/test_mock_CreateImageRequest.cc image_replayer/test_mock_EventPreprocessor.cc @@ -40,7 +41,7 @@ add_dependencies(unittest_rbd_mirror cls_lock cls_rbd) target_link_libraries(unittest_rbd_mirror - rbd_mirror + rbd_mirror_test rados_test_stub rbd_mirror_internal rbd_mirror_types @@ -65,7 +66,7 @@ add_executable(ceph_test_rbd_mirror set_target_properties(ceph_test_rbd_mirror PROPERTIES COMPILE_FLAGS ${UNITTEST_CXX_FLAGS}) target_link_libraries(ceph_test_rbd_mirror - rbd_mirror + rbd_mirror_test rbd_mirror_internal rbd_mirror_types rbd_api diff --git a/src/test/rbd_mirror/mock/MockContextWQ.h b/src/test/rbd_mirror/mock/MockContextWQ.h new file mode 100644 index 00000000000..1c0ee88f526 --- /dev/null +++ b/src/test/rbd_mirror/mock/MockContextWQ.h @@ -0,0 +1,18 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_MOCK_CONTEXT_WQ_H +#define CEPH_MOCK_CONTEXT_WQ_H + +#include + +struct Context; + +struct MockContextWQ { + void queue(Context *ctx) { + queue(ctx, 0); + } + MOCK_METHOD2(queue, void(Context *, int)); +}; + +#endif // CEPH_MOCK_CONTEXT_WQ_H diff --git a/src/test/rbd_mirror/mock/MockSafeTimer.h b/src/test/rbd_mirror/mock/MockSafeTimer.h new file mode 100644 index 00000000000..4926660b9ea --- /dev/null +++ b/src/test/rbd_mirror/mock/MockSafeTimer.h @@ -0,0 +1,16 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_MOCK_SAFE_TIMER_H +#define CEPH_MOCK_SAFE_TIMER_H + +#include + +struct Context; + +struct MockSafeTimer { + MOCK_METHOD2(add_event_after, void(double, Context*)); + MOCK_METHOD1(cancel_event, void(Context *)); +}; + +#endif // CEPH_MOCK_SAFE_TIMER_H diff --git a/src/test/rbd_mirror/pool_watcher/test_mock_RefreshImagesRequest.cc b/src/test/rbd_mirror/pool_watcher/test_mock_RefreshImagesRequest.cc index 4e29dba97f6..1e92670d8a5 100644 --- a/src/test/rbd_mirror/pool_watcher/test_mock_RefreshImagesRequest.cc +++ b/src/test/rbd_mirror/pool_watcher/test_mock_RefreshImagesRequest.cc @@ -81,8 +81,7 @@ TEST_F(TestMockPoolWatcherRefreshImagesRequest, Success) { req->send(); ASSERT_EQ(0, ctx.wait()); - ImageIds expected_image_ids = {{"global id", "local id", - boost::optional{"image name"}}}; + ImageIds expected_image_ids = {{"global id", "local id", "image name"}}; ASSERT_EQ(expected_image_ids, image_ids); } @@ -114,8 +113,7 @@ TEST_F(TestMockPoolWatcherRefreshImagesRequest, LargeDirectory) { req->send(); ASSERT_EQ(0, ctx.wait()); - expected_image_ids.insert({"global id", "local id", - boost::optional{"image name"}}); + expected_image_ids.insert({"global id", "local id", "image name"}); ASSERT_EQ(expected_image_ids, image_ids); } diff --git a/src/test/rbd_mirror/test_PoolWatcher.cc b/src/test/rbd_mirror/test_PoolWatcher.cc index bbc9bb123e0..fbdaab9a218 100644 --- a/src/test/rbd_mirror/test_PoolWatcher.cc +++ b/src/test/rbd_mirror/test_PoolWatcher.cc @@ -1,5 +1,6 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab + #include "include/rados/librados.hpp" #include "include/rbd/librbd.hpp" #include "include/stringify.h" @@ -17,6 +18,7 @@ #include "common/errno.h" #include "common/Mutex.h" #include "tools/rbd_mirror/PoolWatcher.h" +#include "tools/rbd_mirror/Threads.h" #include "tools/rbd_mirror/types.h" #include "test/librados/test.h" #include "gtest/gtest.h" @@ -42,20 +44,48 @@ void register_test_pool_watcher() { class TestPoolWatcher : public ::rbd::mirror::TestFixture { public: -TestPoolWatcher() : m_lock("TestPoolWatcherLock"), - m_image_number(0), m_snap_number(0) + TestPoolWatcher() + : m_lock("TestPoolWatcherLock"), m_pool_watcher_listener(this), + m_image_number(0), m_snap_number(0) { m_cluster = std::make_shared(); EXPECT_EQ("", connect_cluster_pp(*m_cluster)); } - ~TestPoolWatcher() override { + void TearDown() override { + if (m_pool_watcher) { + C_SaferCond ctx; + m_pool_watcher->shut_down(&ctx); + EXPECT_EQ(0, ctx.wait()); + } + m_cluster->wait_for_latest_osdmap(); for (auto& pool : m_pools) { EXPECT_EQ(0, m_cluster->pool_delete(pool.c_str())); } + + TestFixture::TearDown(); } + struct PoolWatcherListener : public PoolWatcher<>::Listener { + TestPoolWatcher *test; + Cond cond; + ImageIds image_ids; + + PoolWatcherListener(TestPoolWatcher *test) : test(test) { + } + + void handle_update(const ImageIds &added_image_ids, + const ImageIds &removed_image_ids) override { + Mutex::Locker locker(test->m_lock); + for (auto &image_id : removed_image_ids) { + image_ids.erase(image_id); + } + image_ids.insert(added_image_ids.begin(), added_image_ids.end()); + cond.Signal(); + } + }; + void create_pool(bool enable_mirroring, const peer_t &peer, string *name=nullptr) { string pool_name = get_temp_pool_name("test-rbd-mirror-"); ASSERT_EQ(0, m_cluster->pool_create(pool_name.c_str())); @@ -67,7 +97,9 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"), librados::IoCtx ioctx; ASSERT_EQ(0, m_cluster->ioctx_create2(pool_id, ioctx)); - m_pool_watcher.reset(new PoolWatcher(ioctx, 30, m_lock, m_cond)); + m_pool_watcher.reset(new PoolWatcher<>(m_threads, ioctx, + m_pool_watcher_listener)); + if (enable_mirroring) { ASSERT_EQ(0, librbd::api::Mirror<>::mode_set(ioctx, RBD_MIRROR_MODE_POOL)); @@ -79,6 +111,8 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"), if (name != nullptr) { *name = pool_name; } + + m_pool_watcher->init(); } string get_image_id(librados::IoCtx *ioctx, const string &image_name) { @@ -166,15 +200,21 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"), } void check_images() { - m_pool_watcher->refresh_images(false); Mutex::Locker l(m_lock); - ASSERT_EQ(m_mirrored_images, m_pool_watcher->get_images()); + while (m_mirrored_images != m_pool_watcher_listener.image_ids) { + if (m_pool_watcher_listener.cond.WaitInterval( + m_lock, utime_t(10, 0)) != 0) { + break; + } + } + + ASSERT_EQ(m_mirrored_images, m_pool_watcher_listener.image_ids); } Mutex m_lock; - Cond m_cond; RadosRef m_cluster; - unique_ptr m_pool_watcher; + PoolWatcherListener m_pool_watcher_listener; + unique_ptr > m_pool_watcher; set m_pools; ImageIds m_mirrored_images; diff --git a/src/test/rbd_mirror/test_mock_PoolWatcher.cc b/src/test/rbd_mirror/test_mock_PoolWatcher.cc new file mode 100644 index 00000000000..737aad9061f --- /dev/null +++ b/src/test/rbd_mirror/test_mock_PoolWatcher.cc @@ -0,0 +1,807 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/rbd_mirror/test_mock_fixture.h" +#include "test/librados_test_stub/MockTestMemIoCtxImpl.h" +#include "test/librados_test_stub/MockTestMemRadosClient.h" +#include "test/librbd/mock/MockImageCtx.h" +#include "test/rbd_mirror/mock/MockContextWQ.h" +#include "test/rbd_mirror/mock/MockSafeTimer.h" +#include "librbd/MirroringWatcher.h" +#include "tools/rbd_mirror/Threads.h" +#include "tools/rbd_mirror/PoolWatcher.h" +#include "tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h" +#include "include/stringify.h" + +namespace librbd { +namespace { + +struct MockTestImageCtx : public librbd::MockImageCtx { + MockTestImageCtx(librbd::ImageCtx &image_ctx) + : librbd::MockImageCtx(image_ctx) { + } +}; + +} // anonymous namespace + +struct MockMirroringWatcher { + static MockMirroringWatcher *s_instance; + static MockMirroringWatcher &get_instance() { + assert(s_instance != nullptr); + return *s_instance; + } + + MockMirroringWatcher() { + s_instance = this; + } + + MOCK_CONST_METHOD0(is_unregistered, bool()); + MOCK_METHOD1(register_watch, void(Context*)); + MOCK_METHOD1(unregister_watch, void(Context*)); + + MOCK_CONST_METHOD0(get_oid, std::string()); +}; + +template <> +struct MirroringWatcher { + static MirroringWatcher *s_instance; + + MirroringWatcher(librados::IoCtx &io_ctx, ::MockContextWQ *work_queue) { + s_instance = this; + } + virtual ~MirroringWatcher() { + } + + static MirroringWatcher &get_instance() { + assert(s_instance != nullptr); + return *s_instance; + } + + virtual void handle_rewatch_complete(int r) = 0; + + virtual void handle_mode_updated(cls::rbd::MirrorMode mirror_mode) = 0; + virtual void handle_image_updated(cls::rbd::MirrorImageState state, + const std::string &remote_image_id, + const std::string &global_image_id) = 0; + + bool is_unregistered() const { + return MockMirroringWatcher::get_instance().is_unregistered(); + } + void register_watch(Context *ctx) { + MockMirroringWatcher::get_instance().register_watch(ctx); + } + void unregister_watch(Context *ctx) { + MockMirroringWatcher::get_instance().unregister_watch(ctx); + } + std::string get_oid() const { + return MockMirroringWatcher::get_instance().get_oid(); + } +}; + +MockMirroringWatcher *MockMirroringWatcher::s_instance = nullptr; +MirroringWatcher *MirroringWatcher::s_instance = nullptr; + +} // namespace librbd + +namespace rbd { +namespace mirror { + +template <> +struct Threads { + MockSafeTimer *timer; + Mutex &timer_lock; + + MockContextWQ *work_queue; + + Threads(Threads *threads) + : timer(new MockSafeTimer()), + timer_lock(threads->timer_lock), + work_queue(new MockContextWQ()) { + } + ~Threads() { + delete timer; + delete work_queue; + } +}; + +namespace pool_watcher { + +template <> +struct RefreshImagesRequest { + ImageIds *image_ids = nullptr; + Context *on_finish = nullptr; + static RefreshImagesRequest *s_instance; + static RefreshImagesRequest *create(librados::IoCtx &io_ctx, + ImageIds *image_ids, + Context *on_finish) { + assert(s_instance != nullptr); + s_instance->image_ids = image_ids; + s_instance->on_finish = on_finish; + return s_instance; + } + + MOCK_METHOD0(send, void()); + + RefreshImagesRequest() { + s_instance = this; + } +}; + +RefreshImagesRequest *RefreshImagesRequest::s_instance = nullptr; + +} // namespace pool_watcher + +} // namespace mirror +} // namespace rbd + +// template definitions +#include "tools/rbd_mirror/PoolWatcher.cc" + +namespace rbd { +namespace mirror { + +using ::testing::_; +using ::testing::DoAll; +using ::testing::InSequence; +using ::testing::Invoke; +using ::testing::Return; +using ::testing::StrEq; +using ::testing::WithArg; +using ::testing::WithoutArgs; + +class TestMockPoolWatcher : public TestMockFixture { +public: + typedef PoolWatcher MockPoolWatcher; + typedef Threads MockThreads; + typedef pool_watcher::RefreshImagesRequest MockRefreshImagesRequest; + typedef librbd::MockMirroringWatcher MockMirroringWatcher; + typedef librbd::MirroringWatcher MirroringWatcher; + + struct MockListener : MockPoolWatcher::Listener { + TestMockPoolWatcher *test; + + MockListener(TestMockPoolWatcher *test) : test(test) { + } + + MOCK_METHOD2(handle_update, void(const ImageIds &, const ImageIds &)); + }; + + TestMockPoolWatcher() : m_lock("TestMockPoolWatcher::m_lock") { + } + + void expect_work_queue(MockThreads &mock_threads) { + EXPECT_CALL(*mock_threads.work_queue, queue(_, _)) + .WillRepeatedly(Invoke([this](Context *ctx, int r) { + m_threads->work_queue->queue(ctx, r); + })); + } + + void expect_mirroring_watcher_is_unregistered(MockMirroringWatcher &mock_mirroring_watcher, + bool unregistered) { + EXPECT_CALL(mock_mirroring_watcher, is_unregistered()) + .WillOnce(Return(unregistered)); + } + + void expect_mirroring_watcher_register(MockMirroringWatcher &mock_mirroring_watcher, + int r) { + EXPECT_CALL(mock_mirroring_watcher, register_watch(_)) + .WillOnce(CompleteContext(r)); + } + + void expect_mirroring_watcher_unregister(MockMirroringWatcher &mock_mirroring_watcher, + int r) { + EXPECT_CALL(mock_mirroring_watcher, unregister_watch(_)) + .WillOnce(CompleteContext(r)); + } + + void expect_refresh_images(MockRefreshImagesRequest &request, + const ImageIds &image_ids, int r) { + EXPECT_CALL(request, send()) + .WillOnce(Invoke([&request, image_ids, r]() { + *request.image_ids = image_ids; + request.on_finish->complete(r); + })); + } + + void expect_listener_handle_update(MockListener &mock_listener, + const ImageIds &added_image_ids, + const ImageIds &removed_image_ids) { + EXPECT_CALL(mock_listener, handle_update(added_image_ids, removed_image_ids)) + .WillOnce(WithoutArgs(Invoke([this]() { + Mutex::Locker locker(m_lock); + ++m_update_count; + m_cond.Signal(); + }))); + } + + void expect_dir_list(librados::IoCtx &io_ctx, + const std::string &id, const std::string &name, int r) { + bufferlist in_bl; + ::encode(id, in_bl); + + bufferlist out_bl; + ::encode(name, out_bl); + + EXPECT_CALL(get_mock_io_ctx(io_ctx), + exec(RBD_DIRECTORY, _, StrEq("rbd"), StrEq("dir_get_name"), + ContentsEqual(in_bl), _, _)) + .WillOnce(DoAll(WithArg<5>(Invoke([this, out_bl](bufferlist *bl) { + *bl = out_bl; + Mutex::Locker locker(m_lock); + ++m_get_name_count; + m_cond.Signal(); + })), + Return(r))); + } + + void expect_timer_add_event(MockThreads &mock_threads) { + EXPECT_CALL(*mock_threads.timer, add_event_after(_, _)) + .WillOnce(WithArg<1>(Invoke([](Context *ctx) { + ctx->complete(0); + }))); + } + + int when_shut_down(MockPoolWatcher &mock_pool_watcher) { + C_SaferCond ctx; + mock_pool_watcher.shut_down(&ctx); + return ctx.wait(); + } + + bool wait_for_update(uint32_t count) { + Mutex::Locker locker(m_lock); + while (m_update_count < count) { + if (m_cond.WaitInterval(m_lock, utime_t(10, 0)) != 0) { + break; + } + } + if (m_update_count < count) { + return false; + } + + m_update_count -= count; + return true; + } + + bool wait_for_get_name(uint32_t count) { + Mutex::Locker locker(m_lock); + while (m_get_name_count < count) { + if (m_cond.WaitInterval(m_lock, utime_t(10, 0)) != 0) { + break; + } + } + if (m_get_name_count < count) { + return false; + } + + m_get_name_count -= count; + return true; + } + + Mutex m_lock; + Cond m_cond; + uint32_t m_update_count = 0; + uint32_t m_get_name_count = 0; +}; + +TEST_F(TestMockPoolWatcher, EmptyPool) { + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + InSequence seq; + MockMirroringWatcher mock_mirroring_watcher; + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, 0); + + MockRefreshImagesRequest mock_refresh_images_request; + expect_refresh_images(mock_refresh_images_request, {}, 0); + + MockListener mock_listener(this); + expect_listener_handle_update(mock_listener, {}, {}); + + MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx, + mock_listener); + C_SaferCond ctx; + mock_pool_watcher.init(&ctx); + ASSERT_EQ(0, ctx.wait()); + + ASSERT_TRUE(wait_for_update(1)); + expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0); + ASSERT_EQ(0, when_shut_down(mock_pool_watcher)); +} + +TEST_F(TestMockPoolWatcher, NonEmptyPool) { + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + InSequence seq; + MockMirroringWatcher mock_mirroring_watcher; + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, 0); + + ImageIds image_ids{ + {"global id 1", "remote id 1", "image name 1"}, + {"global id 2", "remote id 2", "image name 2"}}; + MockRefreshImagesRequest mock_refresh_images_request; + expect_refresh_images(mock_refresh_images_request, image_ids, 0); + + MockListener mock_listener(this); + expect_listener_handle_update(mock_listener, image_ids, {}); + + MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx, + mock_listener); + C_SaferCond ctx; + mock_pool_watcher.init(&ctx); + ASSERT_EQ(0, ctx.wait()); + + ASSERT_TRUE(wait_for_update(1)); + expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0); + ASSERT_EQ(0, when_shut_down(mock_pool_watcher)); +} + +TEST_F(TestMockPoolWatcher, NotifyDuringRefresh) { + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + InSequence seq; + MockMirroringWatcher mock_mirroring_watcher; + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, 0); + + ImageIds image_ids{ + {"global id 1", "remote id 1", "image name 1"}, + {"global id 2", "remote id 2", "image name 2"}}; + MockRefreshImagesRequest mock_refresh_images_request; + bool refresh_sent = false; + EXPECT_CALL(mock_refresh_images_request, send()) + .WillOnce(Invoke([this, &mock_refresh_images_request, &image_ids, + &refresh_sent]() { + *mock_refresh_images_request.image_ids = image_ids; + + Mutex::Locker locker(m_lock); + refresh_sent = true; + m_cond.Signal(); + })); + + expect_dir_list(m_remote_io_ctx, "remote id 1a", "image name 1a", 0); + expect_dir_list(m_remote_io_ctx, "remote id 3", "image name 3", 0); + expect_dir_list(m_remote_io_ctx, "dummy", "", -ENOENT); + + MockListener mock_listener(this); + image_ids = { + {"global id 1", "remote id 1a", "image name 1a"}, + {"global id 3", "remote id 3", "image name 3"}}; + expect_listener_handle_update(mock_listener, image_ids, {}); + + MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx, + mock_listener); + mock_pool_watcher.init(nullptr); + + { + Mutex::Locker locker(m_lock); + while (!refresh_sent) { + m_cond.Wait(m_lock); + } + } + + MirroringWatcher::get_instance().handle_image_updated( + cls::rbd::MIRROR_IMAGE_STATE_DISABLING, "remote id 2", "global id 2"); + MirroringWatcher::get_instance().handle_image_updated( + cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id 1a", "global id 1"); + MirroringWatcher::get_instance().handle_image_updated( + cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id 3", "global id 3"); + MirroringWatcher::get_instance().handle_image_updated( + cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "dummy", "dummy"); + wait_for_get_name(3); + + mock_refresh_images_request.on_finish->complete(0); + ASSERT_TRUE(wait_for_update(1)); + + expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0); + ASSERT_EQ(0, when_shut_down(mock_pool_watcher)); +} + +TEST_F(TestMockPoolWatcher, Notify) { + MockThreads mock_threads(m_threads); + + InSequence seq; + MockMirroringWatcher mock_mirroring_watcher; + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, 0); + + ImageIds image_ids{ + {"global id 1", "remote id 1", "image name 1"}, + {"global id 2", "remote id 2", "image name 2"}}; + MockRefreshImagesRequest mock_refresh_images_request; + expect_refresh_images(mock_refresh_images_request, image_ids, 0); + EXPECT_CALL(*mock_threads.work_queue, queue(_, _)) + .WillOnce(Invoke([this](Context *ctx, int r) { + m_threads->work_queue->queue(ctx, r); + })); + + MockListener mock_listener(this); + expect_listener_handle_update(mock_listener, image_ids, {}); + + Context *notify_ctx = nullptr; + EXPECT_CALL(*mock_threads.work_queue, queue(_, _)) + .WillOnce(Invoke([this, ¬ify_ctx](Context *ctx, int r) { + Mutex::Locker locker(m_lock); + ASSERT_EQ(nullptr, notify_ctx); + notify_ctx = ctx; + m_cond.Signal(); + })); + expect_dir_list(m_remote_io_ctx, "remote id 1a", "image name 1a", 0); + expect_dir_list(m_remote_io_ctx, "remote id 3", "image name 3", 0); + expect_dir_list(m_remote_io_ctx, "dummy", "", -ENOENT); + expect_listener_handle_update( + mock_listener, + {{"global id 1", "remote id 1a", "image name 1a"}, + {"global id 3", "remote id 3", "image name 3"}}, + {{"global id 1", "remote id 1", "image name 1"}, + {"global id 2", "remote id 2", "image name 2"}}); + + MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx, + mock_listener); + C_SaferCond ctx; + mock_pool_watcher.init(&ctx); + ASSERT_EQ(0, ctx.wait()); + ASSERT_TRUE(wait_for_update(1)); + + C_SaferCond flush_ctx; + m_threads->work_queue->queue(&flush_ctx, 0); + ASSERT_EQ(0, flush_ctx.wait()); + + MirroringWatcher::get_instance().handle_image_updated( + cls::rbd::MIRROR_IMAGE_STATE_DISABLING, "remote id 2", "global id 2"); + MirroringWatcher::get_instance().handle_image_updated( + cls::rbd::MIRROR_IMAGE_STATE_DISABLED, "remote id 2", "global id 2"); + MirroringWatcher::get_instance().handle_image_updated( + cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id 1a", "global id 1"); + MirroringWatcher::get_instance().handle_image_updated( + cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id 3", "global id 3"); + MirroringWatcher::get_instance().handle_image_updated( + cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "dummy", "dummy"); + ASSERT_TRUE(wait_for_get_name(3)); + notify_ctx->complete(0); + + ASSERT_TRUE(wait_for_update(1)); + + expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0); + ASSERT_EQ(0, when_shut_down(mock_pool_watcher)); +} + +TEST_F(TestMockPoolWatcher, RegisterWatcherBlacklist) { + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + InSequence seq; + MockMirroringWatcher mock_mirroring_watcher; + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, -EBLACKLISTED); + + MockListener mock_listener(this); + MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx, + mock_listener); + C_SaferCond ctx; + mock_pool_watcher.init(&ctx); + ASSERT_EQ(-EBLACKLISTED, ctx.wait()); + ASSERT_TRUE(mock_pool_watcher.is_blacklisted()); + + expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0); + ASSERT_EQ(0, when_shut_down(mock_pool_watcher)); +} + +TEST_F(TestMockPoolWatcher, RegisterWatcherMissing) { + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + InSequence seq; + MockMirroringWatcher mock_mirroring_watcher; + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, -ENOENT); + expect_timer_add_event(mock_threads); + + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, 0); + + MockRefreshImagesRequest mock_refresh_images_request; + expect_refresh_images(mock_refresh_images_request, {}, 0); + + MockListener mock_listener(this); + expect_listener_handle_update(mock_listener, {}, {}); + + MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx, + mock_listener); + C_SaferCond ctx; + mock_pool_watcher.init(&ctx); + ASSERT_EQ(0, ctx.wait()); + + ASSERT_TRUE(wait_for_update(1)); + expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0); + ASSERT_EQ(0, when_shut_down(mock_pool_watcher)); +} + +TEST_F(TestMockPoolWatcher, RegisterWatcherError) { + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + InSequence seq; + MockMirroringWatcher mock_mirroring_watcher; + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, -EINVAL); + expect_timer_add_event(mock_threads); + + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, 0); + + MockRefreshImagesRequest mock_refresh_images_request; + expect_refresh_images(mock_refresh_images_request, {}, 0); + + MockListener mock_listener(this); + expect_listener_handle_update(mock_listener, {}, {}); + + MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx, + mock_listener); + C_SaferCond ctx; + mock_pool_watcher.init(&ctx); + ASSERT_EQ(0, ctx.wait()); + + ASSERT_TRUE(wait_for_update(1)); + expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0); + ASSERT_EQ(0, when_shut_down(mock_pool_watcher)); +} + +TEST_F(TestMockPoolWatcher, RefreshBlacklist) { + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + InSequence seq; + MockMirroringWatcher mock_mirroring_watcher; + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, 0); + + MockRefreshImagesRequest mock_refresh_images_request; + expect_refresh_images(mock_refresh_images_request, {}, -EBLACKLISTED); + + MockListener mock_listener(this); + MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx, + mock_listener); + C_SaferCond ctx; + mock_pool_watcher.init(&ctx); + ASSERT_EQ(-EBLACKLISTED, ctx.wait()); + ASSERT_TRUE(mock_pool_watcher.is_blacklisted()); + + expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0); + ASSERT_EQ(0, when_shut_down(mock_pool_watcher)); +} + +TEST_F(TestMockPoolWatcher, RefreshMissing) { + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + InSequence seq; + MockMirroringWatcher mock_mirroring_watcher; + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, 0); + + MockRefreshImagesRequest mock_refresh_images_request; + expect_refresh_images(mock_refresh_images_request, {}, -ENOENT); + + MockListener mock_listener(this); + expect_listener_handle_update(mock_listener, {}, {}); + + MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx, + mock_listener); + C_SaferCond ctx; + mock_pool_watcher.init(&ctx); + ASSERT_EQ(0, ctx.wait()); + + ASSERT_TRUE(wait_for_update(1)); + expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0); + ASSERT_EQ(0, when_shut_down(mock_pool_watcher)); +} + +TEST_F(TestMockPoolWatcher, RefreshError) { + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + InSequence seq; + MockMirroringWatcher mock_mirroring_watcher; + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, 0); + + MockRefreshImagesRequest mock_refresh_images_request; + expect_refresh_images(mock_refresh_images_request, {}, -EINVAL); + expect_timer_add_event(mock_threads); + + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, false); + expect_refresh_images(mock_refresh_images_request, {}, 0); + + MockListener mock_listener(this); + expect_listener_handle_update(mock_listener, {}, {}); + + MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx, + mock_listener); + C_SaferCond ctx; + mock_pool_watcher.init(&ctx); + ASSERT_EQ(0, ctx.wait()); + + ASSERT_TRUE(wait_for_update(1)); + expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0); + ASSERT_EQ(0, when_shut_down(mock_pool_watcher)); +} + +TEST_F(TestMockPoolWatcher, Rewatch) { + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + InSequence seq; + MockMirroringWatcher mock_mirroring_watcher; + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, 0); + + MockRefreshImagesRequest mock_refresh_images_request; + expect_refresh_images(mock_refresh_images_request, {}, 0); + + MockListener mock_listener(this); + expect_listener_handle_update(mock_listener, {}, {}); + + expect_timer_add_event(mock_threads); + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, false); + expect_refresh_images(mock_refresh_images_request, {{"global id", "image id", "name"}}, 0); + expect_listener_handle_update(mock_listener, {{"global id", "image id", "name"}}, {}); + + MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx, + mock_listener); + C_SaferCond ctx; + mock_pool_watcher.init(&ctx); + ASSERT_EQ(0, ctx.wait()); + ASSERT_TRUE(wait_for_update(1)); + + MirroringWatcher::get_instance().handle_rewatch_complete(0); + ASSERT_TRUE(wait_for_update(1)); + + expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0); + ASSERT_EQ(0, when_shut_down(mock_pool_watcher)); +} + +TEST_F(TestMockPoolWatcher, RewatchBlacklist) { + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + InSequence seq; + MockMirroringWatcher mock_mirroring_watcher; + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, 0); + + MockRefreshImagesRequest mock_refresh_images_request; + expect_refresh_images(mock_refresh_images_request, {}, 0); + + MockListener mock_listener(this); + expect_listener_handle_update(mock_listener, {}, {}); + + MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx, + mock_listener); + C_SaferCond ctx; + mock_pool_watcher.init(&ctx); + ASSERT_EQ(0, ctx.wait()); + ASSERT_TRUE(wait_for_update(1)); + + MirroringWatcher::get_instance().handle_rewatch_complete(-EBLACKLISTED); + ASSERT_TRUE(mock_pool_watcher.is_blacklisted()); + + expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0); + ASSERT_EQ(0, when_shut_down(mock_pool_watcher)); +} + +TEST_F(TestMockPoolWatcher, RewatchError) { + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + InSequence seq; + MockMirroringWatcher mock_mirroring_watcher; + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, 0); + + MockRefreshImagesRequest mock_refresh_images_request; + expect_refresh_images(mock_refresh_images_request, {}, 0); + + MockListener mock_listener(this); + expect_listener_handle_update(mock_listener, {}, {}); + + expect_timer_add_event(mock_threads); + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, false); + expect_refresh_images(mock_refresh_images_request, {{"global id", "image id", "name"}}, 0); + expect_listener_handle_update(mock_listener, {{"global id", "image id", "name"}}, {}); + + MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx, + mock_listener); + C_SaferCond ctx; + mock_pool_watcher.init(&ctx); + ASSERT_EQ(0, ctx.wait()); + ASSERT_TRUE(wait_for_update(1)); + + MirroringWatcher::get_instance().handle_rewatch_complete(-EINVAL); + ASSERT_TRUE(wait_for_update(1)); + + expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0); + ASSERT_EQ(0, when_shut_down(mock_pool_watcher)); +} + +TEST_F(TestMockPoolWatcher, GetImageNameBlacklist) { + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + InSequence seq; + MockMirroringWatcher mock_mirroring_watcher; + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, 0); + + MockRefreshImagesRequest mock_refresh_images_request; + expect_refresh_images(mock_refresh_images_request, {}, 0); + + MockListener mock_listener(this); + expect_listener_handle_update(mock_listener, {}, {}); + + expect_dir_list(m_remote_io_ctx, "remote id", "image name", -EBLACKLISTED); + + MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx, + mock_listener); + C_SaferCond ctx; + mock_pool_watcher.init(&ctx); + ASSERT_EQ(0, ctx.wait()); + ASSERT_TRUE(wait_for_update(1)); + + MirroringWatcher::get_instance().handle_image_updated( + cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id", "global id"); + ASSERT_TRUE(wait_for_get_name(1)); + while (true) { + if (mock_pool_watcher.is_blacklisted()) { + break; + } + usleep(1000); + } + + expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0); + ASSERT_EQ(0, when_shut_down(mock_pool_watcher)); +} + +TEST_F(TestMockPoolWatcher, GetImageNameError) { + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + InSequence seq; + MockMirroringWatcher mock_mirroring_watcher; + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true); + expect_mirroring_watcher_register(mock_mirroring_watcher, 0); + + MockRefreshImagesRequest mock_refresh_images_request; + expect_refresh_images(mock_refresh_images_request, {}, 0); + + MockListener mock_listener(this); + expect_listener_handle_update(mock_listener, {}, {}); + + expect_dir_list(m_remote_io_ctx, "remote id", "image name", -EINVAL); + expect_timer_add_event(mock_threads); + + expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, false); + expect_refresh_images(mock_refresh_images_request, {{"global id", "remote id", "name"}}, 0); + expect_listener_handle_update(mock_listener, {{"global id", "remote id", "name"}}, {}); + + MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx, + mock_listener); + C_SaferCond ctx; + mock_pool_watcher.init(&ctx); + ASSERT_EQ(0, ctx.wait()); + ASSERT_TRUE(wait_for_update(1)); + + MirroringWatcher::get_instance().handle_image_updated( + cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id", "global id"); + ASSERT_TRUE(wait_for_get_name(1)); + ASSERT_TRUE(wait_for_update(1)); + + expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0); + ASSERT_EQ(0, when_shut_down(mock_pool_watcher)); +} + +} // namespace mirror +} // namespace rbd diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 7de87f3d15c..0a58a6b72c6 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -295,8 +295,9 @@ ImageReplayer::ImageReplayer(Threads *threads, << ": " << cpp_strerror(r) << dendl; pool_name = stringify(m_local_pool_id); } - m_name = pool_name + "/" + m_global_image_id; + m_name = pool_name + "/" + m_global_image_id; + dout(20) << "registered asok hook: " << m_name << dendl; m_asok_hook = new ImageReplayerAdminSocketHook(g_ceph_context, m_name, this); } @@ -515,6 +516,7 @@ void ImageReplayer::handle_bootstrap(int r) { } } if (!m_asok_hook) { + dout(20) << "registered asok hook: " << m_name << dendl; m_asok_hook = new ImageReplayerAdminSocketHook(g_ceph_context, m_name, this); } diff --git a/src/tools/rbd_mirror/PoolWatcher.cc b/src/tools/rbd_mirror/PoolWatcher.cc index f654855faab..270180738e0 100644 --- a/src/tools/rbd_mirror/PoolWatcher.cc +++ b/src/tools/rbd_mirror/PoolWatcher.cc @@ -1,17 +1,21 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab -#include - +#include "tools/rbd_mirror/PoolWatcher.h" +#include "include/rbd_types.h" +#include "cls/rbd/cls_rbd_client.h" #include "common/debug.h" #include "common/errno.h" - -#include "cls/rbd/cls_rbd_client.h" -#include "include/rbd_types.h" +#include "common/Timer.h" +#include "librbd/ImageCtx.h" #include "librbd/internal.h" +#include "librbd/MirroringWatcher.h" +#include "librbd/Utils.h" #include "librbd/api/Image.h" #include "librbd/api/Mirror.h" -#include "PoolWatcher.h" +#include "tools/rbd_mirror/Threads.h" +#include "tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h" +#include #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rbd_mirror @@ -23,123 +27,526 @@ using std::list; using std::string; using std::unique_ptr; using std::vector; - -using librados::Rados; -using librados::IoCtx; -using librbd::cls_client::mirror_image_list; +using librbd::util::create_context_callback; +using librbd::util::create_rados_callback; namespace rbd { namespace mirror { -PoolWatcher::PoolWatcher(librados::IoCtx &remote_io_ctx, - double interval_seconds, - Mutex &lock, Cond &cond) : - m_lock(lock), - m_refresh_cond(cond), - m_timer(g_ceph_context, m_lock, false), - m_interval(interval_seconds) -{ - m_remote_io_ctx.dup(remote_io_ctx); - m_timer.init(); +template +class PoolWatcher::MirroringWatcher : public librbd::MirroringWatcher { +public: + using ContextWQ = typename std::decay< + typename std::remove_pointer< + decltype(Threads::work_queue)>::type>::type; + + MirroringWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue, + PoolWatcher *pool_watcher) + : librbd::MirroringWatcher(io_ctx, work_queue), + m_pool_watcher(pool_watcher) { + } + + void handle_rewatch_complete(int r) override { + m_pool_watcher->handle_rewatch_complete(r); + } + + void handle_mode_updated(cls::rbd::MirrorMode mirror_mode) override { + // do nothing + } + + void handle_image_updated(cls::rbd::MirrorImageState state, + const std::string &remote_image_id, + const std::string &global_image_id) override { + bool enabled = (state == cls::rbd::MIRROR_IMAGE_STATE_ENABLED); + m_pool_watcher->handle_image_updated(remote_image_id, global_image_id, + enabled); + } + +private: + PoolWatcher *m_pool_watcher; +}; + +template +PoolWatcher::PoolWatcher(Threads *threads, librados::IoCtx &remote_io_ctx, + Listener &listener) + : m_threads(threads), m_remote_io_ctx(remote_io_ctx), m_listener(listener), + m_lock(librbd::util::unique_lock_name("rbd::mirror::PoolWatcher", this)) { + m_mirroring_watcher = new MirroringWatcher(m_remote_io_ctx, + m_threads->work_queue, this); } -PoolWatcher::~PoolWatcher() -{ - Mutex::Locker l(m_lock); - m_stopping = true; - m_timer.shutdown(); +template +PoolWatcher::~PoolWatcher() { + delete m_mirroring_watcher; } -bool PoolWatcher::is_blacklisted() const { - assert(m_lock.is_locked()); +template +bool PoolWatcher::is_blacklisted() const { + Mutex::Locker locker(m_lock); return m_blacklisted; } -const ImageIds& PoolWatcher::get_images() const -{ - assert(m_lock.is_locked()); - return m_images; +template +void PoolWatcher::init(Context *on_finish) { + dout(5) << dendl; + + { + Mutex::Locker locker(m_lock); + m_on_init_finish = on_finish; + } + + // start async updates for mirror image directory + register_watcher(); +} + +template +void PoolWatcher::shut_down(Context *on_finish) { + dout(5) << dendl; + + { + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + + assert(!m_shutting_down); + m_shutting_down = true; + if (m_timer_ctx != nullptr) { + m_threads->timer->cancel_event(m_timer_ctx); + m_timer_ctx = nullptr; + } + } + + // in-progress unregister tracked as async op + unregister_watcher(); + + m_async_op_tracker.wait_for_ops(on_finish); } -void PoolWatcher::refresh_images(bool reschedule) -{ - ImageIds image_ids; - int r = refresh(&image_ids); +template +void PoolWatcher::register_watcher() { + { + Mutex::Locker locker(m_lock); + assert(m_image_ids_invalid); + assert(!m_refresh_in_progress); + m_refresh_in_progress = true; + } + + // if the watch registration is in-flight, let the watcher + // handle the transition -- only (re-)register if it's not registered + if (!m_mirroring_watcher->is_unregistered()) { + refresh_images(); + return; + } + + // first time registering or the watch failed + dout(5) << dendl; + m_async_op_tracker.start_op(); + + Context *ctx = create_context_callback< + PoolWatcher, &PoolWatcher::handle_register_watcher>(this); + m_mirroring_watcher->register_watch(ctx); +} + +template +void PoolWatcher::handle_register_watcher(int r) { + dout(5) << "r=" << r << dendl; + + { + Mutex::Locker locker(m_lock); + assert(m_image_ids_invalid); + assert(m_refresh_in_progress); + if (r < 0) { + m_refresh_in_progress = false; + } + } - Mutex::Locker l(m_lock); + Context *on_init_finish = nullptr; if (r >= 0) { - m_images = std::move(image_ids); + refresh_images(); } else if (r == -EBLACKLISTED) { - derr << "blacklisted during image refresh" << dendl; + dout(0) << "detected client is blacklisted" << dendl; + + Mutex::Locker locker(m_lock); m_blacklisted = true; + std::swap(on_init_finish, m_on_init_finish); + } else if (r == -ENOENT) { + dout(5) << "mirroring directory does not exist" << dendl; + schedule_refresh_images(30); + } else { + derr << "unexpected error registering mirroring directory watch: " + << cpp_strerror(r) << dendl; + schedule_refresh_images(10); + } + + m_async_op_tracker.finish_op(); + if (on_init_finish != nullptr) { + on_init_finish->complete(r); } +} - if (!m_stopping && reschedule) { - FunctionContext *ctx = new FunctionContext( - boost::bind(&PoolWatcher::refresh_images, this, true)); - m_timer.add_event_after(m_interval, ctx); +template +void PoolWatcher::unregister_watcher() { + dout(5) << dendl; + + m_async_op_tracker.start_op(); + Context *ctx = new FunctionContext([this](int r) { + dout(5) << "unregister_watcher: r=" << r << dendl; + if (r < 0) { + derr << "error unregistering watcher for " + << m_mirroring_watcher->get_oid() << " object: " << cpp_strerror(r) + << dendl; + } + m_async_op_tracker.finish_op(); + }); + + m_mirroring_watcher->unregister_watch(ctx); +} + +template +void PoolWatcher::refresh_images() { + dout(5) << dendl; + + { + Mutex::Locker locker(m_lock); + assert(m_image_ids_invalid); + assert(m_refresh_in_progress); + + // clear all pending notification events since we need to perform + // a full image list refresh + m_pending_added_image_ids.clear(); + m_pending_removed_image_ids.clear(); + if (!m_updated_images.empty()) { + auto it = m_updated_images.begin(); + it->invalid = true; + + // only have a single in-flight request -- remove the rest + ++it; + while (it != m_updated_images.end()) { + m_id_to_updated_images.erase({it->global_image_id, + it->remote_image_id}); + it = m_updated_images.erase(it); + } + } } - m_refresh_cond.Signal(); - // TODO: perhaps use a workqueue instead, once we get notifications - // about new/removed mirrored images + + m_async_op_tracker.start_op(); + m_refresh_image_ids.clear(); + Context *ctx = create_context_callback< + PoolWatcher, &PoolWatcher::handle_refresh_images>(this); + auto req = pool_watcher::RefreshImagesRequest::create(m_remote_io_ctx, + &m_refresh_image_ids, + ctx); + req->send(); } -int PoolWatcher::refresh(ImageIds *image_ids) { - dout(20) << "enter" << dendl; +template +void PoolWatcher::handle_refresh_images(int r) { + dout(5) << "r=" << r << dendl; + + bool retry_refresh = false; + Context *on_init_finish = nullptr; + { + Mutex::Locker locker(m_lock); + assert(m_image_ids_invalid); + assert(m_refresh_in_progress); + m_refresh_in_progress = false; + + if (r >= 0) { + m_image_ids_invalid = false; + m_pending_image_ids = m_refresh_image_ids; + std::swap(on_init_finish, m_on_init_finish); + schedule_listener(); + } else if (r == -EBLACKLISTED) { + dout(0) << "detected client is blacklisted during image refresh" << dendl; + + m_blacklisted = true; + std::swap(on_init_finish, m_on_init_finish); + } else if (r == -ENOENT) { + dout(5) << "mirroring directory not found" << dendl; + m_image_ids_invalid = false; + m_pending_image_ids.clear(); + std::swap(on_init_finish, m_on_init_finish); + r = 0; + schedule_listener(); + } else { + retry_refresh = true; + } + } + + if (retry_refresh) { + derr << "failed to retrieve mirroring directory: " << cpp_strerror(r) + << dendl; + schedule_refresh_images(10); + } - std::string pool_name = m_remote_io_ctx.get_pool_name(); - rbd_mirror_mode_t mirror_mode; - int r = librbd::api::Mirror<>::mode_get(m_remote_io_ctx, &mirror_mode); - if (r < 0) { - derr << "could not tell whether mirroring was enabled for " - << pool_name << ": " << cpp_strerror(r) << dendl; - return r; + m_async_op_tracker.finish_op(); + if (on_init_finish != nullptr) { + on_init_finish->complete(r); } - if (mirror_mode == RBD_MIRROR_MODE_DISABLED) { - dout(20) << "pool " << pool_name << " has mirroring disabled" << dendl; - return 0; +} + +template +void PoolWatcher::schedule_refresh_images(double interval) { + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + if (m_shutting_down || m_refresh_in_progress || m_timer_ctx != nullptr) { + return; } - std::map images_map; - r = librbd::api::Image<>::list_images(m_remote_io_ctx, &images_map); - if (r < 0) { - derr << "error retrieving image names from pool " << pool_name << ": " + m_image_ids_invalid = true; + m_timer_ctx = new FunctionContext([this](int r) { + processs_refresh_images(); + }); + m_threads->timer->add_event_after(interval, m_timer_ctx); +} + +template +void PoolWatcher::handle_rewatch_complete(int r) { + dout(5) << "r=" << r << dendl; + + if (r == -EBLACKLISTED) { + dout(0) << "detected client is blacklisted" << dendl; + + Mutex::Locker locker(m_lock); + m_blacklisted = true; + return; + } else if (r == -ENOENT) { + dout(5) << "mirroring directory deleted" << dendl; + } else if (r < 0) { + derr << "unexpected error re-registering mirroring directory watch: " << cpp_strerror(r) << dendl; - return r; } - std::map image_id_to_name; - for (const auto& img_pair : images_map) { - image_id_to_name.insert(std::make_pair(img_pair.second, img_pair.first)); + schedule_refresh_images(5); +} + +template +void PoolWatcher::handle_image_updated(const std::string &remote_image_id, + const std::string &global_image_id, + bool enabled) { + dout(10) << "remote_image_id=" << remote_image_id << ", " + << "global_image_id=" << global_image_id << ", " + << "enabled=" << enabled << dendl; + + Mutex::Locker locker(m_lock); + ImageId image_id(global_image_id, remote_image_id); + m_pending_added_image_ids.erase(image_id); + m_pending_removed_image_ids.erase(image_id); + + auto id = std::make_pair(global_image_id, remote_image_id); + auto id_it = m_id_to_updated_images.find(id); + if (id_it != m_id_to_updated_images.end()) { + id_it->second->enabled = enabled; + id_it->second->invalid = false; + } else if (enabled) { + // need to resolve the image name before notifying listener + auto it = m_updated_images.emplace(m_updated_images.end(), + global_image_id, remote_image_id); + m_id_to_updated_images[id] = it; + schedule_get_image_name(); + } else { + // delete image w/ if no resolve name in-flight + m_pending_removed_image_ids.insert(image_id); + schedule_listener(); } +} - std::string last_read = ""; - int max_read = 1024; - do { - std::map mirror_images; - r = mirror_image_list(&m_remote_io_ctx, last_read, max_read, - &mirror_images); - if (r < 0) { - derr << "error listing mirrored image directory: " - << cpp_strerror(r) << dendl; - return r; +template +void PoolWatcher::schedule_get_image_name() { + assert(m_lock.is_locked()); + if (m_shutting_down || m_blacklisted || m_updated_images.empty() || + m_get_name_in_progress) { + return; + } + m_get_name_in_progress = true; + + auto &updated_image = m_updated_images.front(); + dout(10) << "global_image_id=" << updated_image.global_image_id << ", " + << "remote_image_id=" << updated_image.remote_image_id << dendl; + + librados::ObjectReadOperation op; + librbd::cls_client::dir_get_name_start(&op, updated_image.remote_image_id); + + m_async_op_tracker.start_op(); + + m_out_bl.clear(); + librados::AioCompletion *aio_comp = create_rados_callback< + PoolWatcher, &PoolWatcher::handle_get_image_name>(this); + int r = m_remote_io_ctx.aio_operate(RBD_DIRECTORY, aio_comp, &op, &m_out_bl); + assert(r == 0); + aio_comp->release(); +} + +template +void PoolWatcher::handle_get_image_name(int r) { + dout(10) << "r=" << r << dendl; + + std::string name; + if (r == 0) { + bufferlist::iterator it = m_out_bl.begin(); + r = librbd::cls_client::dir_get_name_finish(&it, &name); + } + + bool image_ids_invalid = false; + { + Mutex::Locker locker(m_lock); + assert(!m_updated_images.empty()); + m_get_name_in_progress = false; + + auto updated_image = m_updated_images.front(); + m_updated_images.pop_front(); + m_id_to_updated_images.erase(std::make_pair(updated_image.global_image_id, + updated_image.remote_image_id)); + + if (r == 0) { + // since names are resolved in event order -- the current update is + // the latest state + ImageId image_id(updated_image.global_image_id, + updated_image.remote_image_id, name); + m_pending_added_image_ids.erase(image_id); + m_pending_removed_image_ids.erase(image_id); + if (!updated_image.invalid) { + if (updated_image.enabled) { + m_pending_added_image_ids.insert(image_id); + } else { + m_pending_removed_image_ids.insert(image_id); + } + schedule_listener(); + } + } else if (r == -EBLACKLISTED) { + dout(0) << "detected client is blacklisted" << dendl; + + m_blacklisted = true; + } else if (r == -ENOENT) { + dout(10) << "image removed after add notification" << dendl; + } else { + derr << "error resolving image name " << updated_image.remote_image_id + << " (" << updated_image.global_image_id << "): " << cpp_strerror(r) + << dendl; + image_ids_invalid = true; + } + + if (!image_ids_invalid) { + schedule_get_image_name(); } - for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) { - std::string image_name; - auto it2 = image_id_to_name.find(it->first); - if (it2 != image_id_to_name.end()) { - image_name = it2->second; + } + + if (image_ids_invalid) { + schedule_refresh_images(5); + } + m_async_op_tracker.finish_op(); +} + +template +void PoolWatcher::processs_refresh_images() { + assert(m_threads->timer_lock.is_locked()); + assert(m_timer_ctx != nullptr); + m_timer_ctx = nullptr; + + // execute outside of the timer's lock + m_async_op_tracker.start_op(); + Context *ctx = new FunctionContext([this](int r) { + register_watcher(); + m_async_op_tracker.finish_op(); + }); + m_threads->work_queue->queue(ctx, 0); +} + +template +void PoolWatcher::schedule_listener() { + assert(m_lock.is_locked()); + m_pending_updates = true; + if (m_shutting_down || m_image_ids_invalid || m_notify_listener_in_progress) { + return; + } + + dout(20) << dendl; + + m_async_op_tracker.start_op(); + Context *ctx = new FunctionContext([this](int r) { + notify_listener(); + m_async_op_tracker.finish_op(); + }); + + m_notify_listener_in_progress = true; + m_threads->work_queue->queue(ctx, 0); +} + +template +void PoolWatcher::notify_listener() { + dout(10) << dendl; + + ImageIds added_image_ids; + ImageIds removed_image_ids; + { + Mutex::Locker locker(m_lock); + assert(m_notify_listener_in_progress); + + // if the watch failed while we didn't own the lock, we are going + // to need to perform a full refresh + if (m_image_ids_invalid) { + m_notify_listener_in_progress = false; + return; + } + + // merge add/remove notifications into pending set (a given image + // can only be in one set or another) + for (auto it = m_pending_removed_image_ids.begin(); + it != m_pending_removed_image_ids.end(); ) { + if (std::find_if(m_updated_images.begin(), m_updated_images.end(), + [&it](const UpdatedImage &updated_image) { + return (it->id == updated_image.remote_image_id); + }) != m_updated_images.end()) { + // still resolving the name -- so keep it in the pending set + auto image_id_it = m_image_ids.find(*it); + if (image_id_it != m_image_ids.end()) { + m_pending_image_ids.insert(*image_id_it); + } + ++it; + continue; + } + + // merge the remove event into the pending set + m_pending_image_ids.erase(*it); + it = m_pending_removed_image_ids.erase(it); + } + + for (auto &image_id : m_pending_added_image_ids) { + dout(20) << "image_id=" << image_id << dendl; + m_pending_image_ids.erase(image_id); + m_pending_image_ids.insert(image_id); + } + m_pending_added_image_ids.clear(); + + // compute added/removed images + for (auto &image_id : m_image_ids) { + auto it = m_pending_image_ids.find(image_id); + if (it == m_pending_image_ids.end() || it->id != image_id.id) { + removed_image_ids.insert(image_id); } - image_ids->insert(ImageId(it->second, it->first, image_name)); } - if (!mirror_images.empty()) { - last_read = mirror_images.rbegin()->first; + for (auto &image_id : m_pending_image_ids) { + auto it = m_image_ids.find(image_id); + if (it == m_image_ids.end() || it->id != image_id.id) { + added_image_ids.insert(image_id); + } } - r = mirror_images.size(); - } while (r == max_read); - return 0; + m_pending_updates = false; + m_image_ids = m_pending_image_ids; + } + + m_listener.handle_update(added_image_ids, removed_image_ids); + + { + Mutex::Locker locker(m_lock); + m_notify_listener_in_progress = false; + if (m_pending_updates) { + schedule_listener(); + } + } } } // namespace mirror } // namespace rbd + +template class rbd::mirror::PoolWatcher; diff --git a/src/tools/rbd_mirror/PoolWatcher.h b/src/tools/rbd_mirror/PoolWatcher.h index 721106f761e..1cd2ede2d12 100644 --- a/src/tools/rbd_mirror/PoolWatcher.h +++ b/src/tools/rbd_mirror/PoolWatcher.h @@ -9,49 +9,178 @@ #include #include +#include "common/AsyncOpTracker.h" #include "common/ceph_context.h" #include "common/Mutex.h" -#include "common/Timer.h" #include "include/rados/librados.hpp" #include "types.h" +#include +#include +#include #include +#include "include/assert.h" + +namespace librbd { struct ImageCtx; } namespace rbd { namespace mirror { +template struct Threads; + /** * Keeps track of images that have mirroring enabled within all * pools. */ +template class PoolWatcher { public: - PoolWatcher(librados::IoCtx &remote_io_ctx, double interval_seconds, - Mutex &lock, Cond &cond); + struct Listener { + virtual ~Listener() { + } + + virtual void handle_update(const ImageIds &added_image_ids, + const ImageIds &removed_image_ids) = 0; + }; + + PoolWatcher(Threads *threads, librados::IoCtx &remote_io_ctx, + Listener &listener); ~PoolWatcher(); PoolWatcher(const PoolWatcher&) = delete; PoolWatcher& operator=(const PoolWatcher&) = delete; bool is_blacklisted() const; - const ImageIds& get_images() const; - void refresh_images(bool reschedule=true); + void init(Context *on_finish = nullptr); + void shut_down(Context *on_finish); private: + /** + * @verbatim + * + * + * | + * v + * INIT + * | + * v + * REGISTER_WATCHER + * | + * |/--------------------------------\ + * | | + * v | + * REFRESH_IMAGES | + * | | + * |/----------------------------\ | + * | | | + * v | | + * NOTIFY_LISTENER | | + * | | | + * v | | + * IDLE ---\ | | + * | | | | + * | |\---> IMAGE_UPDATED | | + * | | | | | + * | | v | | + * | | GET_IMAGE_NAME --/ | + * | | | + * | \----> WATCH_ERROR ---------/ + * v + * SHUT_DOWN + * | + * v + * UNREGISTER_WATCHER + * | + * v + * + * + * @endverbatim + */ + class MirroringWatcher; + + struct UpdatedImage { + std::string global_image_id; + std::string remote_image_id; + bool enabled = true; + bool invalid = false; + + UpdatedImage(const std::string &global_image_id, + const std::string &remote_image_id) + : global_image_id(global_image_id), remote_image_id(remote_image_id) { + } + }; + + typedef std::pair GlobalRemoteIds; + typedef std::list UpdatedImages; + typedef std::unordered_map > IdToUpdatedImages; + + struct StrictImageIdCompare { + bool operator()(const ImageId &lhs, const ImageId &rhs) const { + if (lhs.global_id != rhs.global_id) { + return lhs.global_id < rhs.global_id; + } + return lhs.id < rhs.id; + } + }; + Threads *m_threads; librados::IoCtx m_remote_io_ctx; - Mutex &m_lock; - Cond &m_refresh_cond; + Listener &m_listener; + + ImageIds m_refresh_image_ids; + bufferlist m_out_bl; + + mutable Mutex m_lock; + + Context *m_on_init_finish = nullptr; - bool m_stopping = false; + ImageIds m_image_ids; + + bool m_pending_updates = false; + bool m_notify_listener_in_progress = false; + ImageIds m_pending_image_ids; + ImageIds m_pending_added_image_ids; + ImageIds m_pending_removed_image_ids; + + MirroringWatcher *m_mirroring_watcher; + + Context *m_timer_ctx = nullptr; + + AsyncOpTracker m_async_op_tracker; bool m_blacklisted = false; - SafeTimer m_timer; - double m_interval; + bool m_shutting_down = false; + bool m_image_ids_invalid = true; + bool m_refresh_in_progress = false; + + UpdatedImages m_updated_images; + IdToUpdatedImages m_id_to_updated_images; + bool m_get_name_in_progress = false; + + void register_watcher(); + void handle_register_watcher(int r); + void unregister_watcher(); - ImageIds m_images; + void refresh_images(); + void handle_refresh_images(int r); + + void schedule_refresh_images(double interval); + void processs_refresh_images(); + + void handle_rewatch_complete(int r); + void handle_image_updated(const std::string &remote_image_id, + const std::string &global_image_id, + bool enabled); + + void schedule_get_image_name(); + void handle_get_image_name(int r); + + void schedule_listener(); + void notify_listener(); - int refresh(ImageIds *image_ids); }; } // namespace mirror } // namespace rbd +extern template class rbd::mirror::PoolWatcher; + #endif // CEPH_RBD_MIRROR_POOL_WATCHER_H diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc index 4e6af299cc2..558df3bec7b 100644 --- a/src/tools/rbd_mirror/Replayer.cc +++ b/src/tools/rbd_mirror/Replayer.cc @@ -218,6 +218,7 @@ Replayer::Replayer(Threads *threads, m_peer(peer), m_args(args), m_local_pool_id(local_pool_id), + m_pool_watcher_listener(this), m_asok_hook(nullptr), m_replayer_thread(this), m_leader_listener(this) @@ -242,6 +243,8 @@ Replayer::~Replayer() if (m_instance_watcher) { m_instance_watcher->shut_down(); } + + assert(!m_pool_watcher); } bool Replayer::is_blacklisted() const { @@ -290,6 +293,11 @@ int Replayer::init() dout(20) << "connected to " << m_peer << dendl; + r = init_local_mirroring_images(); + if (r < 0) { + return r; + } + m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx, &m_leader_listener)); r = m_leader_watcher->init(); @@ -306,14 +314,6 @@ int Replayer::init() return r; } - // Bootstrap existing mirroring images - init_local_mirroring_images(); - - m_pool_watcher.reset(new PoolWatcher(m_remote_io_ctx, - g_ceph_context->_conf->rbd_mirror_image_directory_refresh_interval, - m_lock, m_cond)); - m_pool_watcher->refresh_images(); - m_replayer_thread.create("replayer"); return 0; @@ -390,18 +390,20 @@ int Replayer::init_rados(const std::string &cluster_name, return 0; } -void Replayer::init_local_mirroring_images() { +int Replayer::init_local_mirroring_images() { + dout(20) << dendl; + rbd_mirror_mode_t mirror_mode; int r = librbd::api::Mirror<>::mode_get(m_local_io_ctx, &mirror_mode); if (r < 0) { derr << "could not tell whether mirroring was enabled for " << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; - return; + return r; } if (mirror_mode == RBD_MIRROR_MODE_DISABLED) { dout(20) << "pool " << m_local_io_ctx.get_pool_name() << " " << "has mirroring disabled" << dendl; - return; + return -ENOENT; } ImageIds image_ids; @@ -415,16 +417,22 @@ void Replayer::init_local_mirroring_images() { if (r < 0) { derr << "error listing mirrored image directory: " << cpp_strerror(r) << dendl; - continue; + return r; } for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) { std::string image_name; r = dir_get_name(&m_local_io_ctx, RBD_DIRECTORY, it->first, &image_name); - if (r < 0) { + if (r == -ENOENT) { + dout(20) << "orphaned mirror image: " << it->first << dendl; + continue; + } else if (r < 0) { derr << "error retrieving local image name: " << cpp_strerror(r) << dendl; - continue; + return r; } + + dout(20) << "local image: " << it->second << " (" << it->first << ")" + << dendl; image_ids.insert(ImageId(it->second, it->first, image_name)); } if (!mirror_images.empty()) { @@ -434,6 +442,7 @@ void Replayer::init_local_mirroring_images() { } while (r == max_read); m_init_image_ids = std::move(image_ids); + return 0; } void Replayer::run() @@ -441,7 +450,6 @@ void Replayer::run() dout(20) << "enter" << dendl; while (!m_stopping.read()) { - std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " + m_peer.cluster_name; if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) { @@ -453,29 +461,33 @@ void Replayer::run() } Mutex::Locker locker(m_lock); - if (m_pool_watcher->is_blacklisted()) { + if (m_pool_watcher && m_pool_watcher->is_blacklisted()) { m_blacklisted = true; m_stopping.set(1); - } else if (!m_manual_stop && m_leader_watcher->is_leader()) { - set_sources(m_pool_watcher->get_images()); + break; } - if (m_blacklisted) { - break; + for (auto image_it = m_image_replayers.begin(); + image_it != m_image_replayers.end(); ) { + if (image_it->second->remote_images_empty()) { + if (stop_image_replayer(image_it->second)) { + image_it = m_image_replayers.erase(image_it); + continue; + } + } else { + start_image_replayer(image_it->second); + } + ++image_it; } + m_cond.WaitInterval(m_lock, - utime_t(g_ceph_context->_conf - ->rbd_mirror_image_state_check_interval, 0)); + utime_t(g_ceph_context->_conf-> + rbd_mirror_image_state_check_interval, 0)); } - ImageIds empty_sources; - while (true) { - Mutex::Locker locker(m_lock); - set_sources(empty_sources); - if (m_image_replayers.empty()) { - break; - } - m_cond.WaitInterval(m_lock, seconds(1)); + Mutex::Locker locker(m_lock); + while (!m_image_replayers.empty()) { + stop_image_replayers(); } } @@ -601,17 +613,24 @@ void Replayer::release_leader() m_leader_watcher->release_leader(); } -void Replayer::set_sources(const ImageIds &image_ids) -{ - dout(20) << "enter" << dendl; +void Replayer::handle_update(const ImageIds &added_image_ids, + const ImageIds &removed_image_ids) { + if (m_stopping.read()) { + return; + } - assert(m_lock.is_locked()); + dout(10) << dendl; + Mutex::Locker locker(m_lock); + if (!m_leader_watcher->is_leader()) { + return; + } - if (!m_init_image_ids.empty() && !m_stopping.read() && - m_leader_watcher->is_leader()) { + // first callback will be a full directory -- so see if we need to remove + // any local images that no longer exist on the remote side + if (!m_init_image_ids.empty()) { dout(20) << "scanning initial local image set" << dendl; - for (auto &remote_image : image_ids) { - auto it = m_init_image_ids.find(ImageId(remote_image.global_id)); + for (auto &image_id : added_image_ids) { + auto it = m_init_image_ids.find(image_id); if (it != m_init_image_ids.end()) { m_init_image_ids.erase(it); } @@ -628,30 +647,46 @@ void Replayer::set_sources(const ImageIds &image_ids) } // shut down replayers for non-mirrored images - for (auto image_it = m_image_replayers.begin(); - image_it != m_image_replayers.end();) { - auto image_id_it = image_ids.find(ImageId(image_it->first)); - if (image_id_it == image_ids.end()) { + for (auto &image_id : removed_image_ids) { + auto image_it = m_image_replayers.find(image_id.global_id); + if (image_it != m_image_replayers.end()) { + assert(!m_remote_mirror_uuid.empty()); + image_it->second->remove_remote_image(m_remote_mirror_uuid, + image_id.id); + if (image_it->second->is_running()) { dout(20) << "stop image replayer for remote image " - << image_it->second->get_global_image_id() << dendl; + << image_id.id << " (" << image_id.global_id << ")" + << dendl; } - if (stop_image_replayer(image_it->second)) { - image_it = m_image_replayers.erase(image_it); - continue; + + if (image_it->second->remote_images_empty() && + stop_image_replayer(image_it->second)) { + // no additional remotes registered for this image + m_image_replayers.erase(image_it); } } - ++image_it; } - if (image_ids.empty()) { + // prune previously stopped image replayers + for (auto image_it = m_image_replayers.begin(); + image_it != m_image_replayers.end(); ) { + if (image_it->second->remote_images_empty() && + stop_image_replayer(image_it->second)) { + image_it = m_image_replayers.erase(image_it); + } else { + ++image_it; + } + } + + if (added_image_ids.empty()) { return; } std::string local_mirror_uuid; int r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx, &local_mirror_uuid); - if (r < 0) { + if (r < 0 || local_mirror_uuid.empty()) { derr << "failed to retrieve local mirror uuid from pool " << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; return; @@ -660,18 +695,24 @@ void Replayer::set_sources(const ImageIds &image_ids) std::string remote_mirror_uuid; r = librbd::cls_client::mirror_uuid_get(&m_remote_io_ctx, &remote_mirror_uuid); - if (r < 0) { + if (r < 0 || remote_mirror_uuid.empty()) { derr << "failed to retrieve remote mirror uuid from pool " << m_remote_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; return; } + m_remote_mirror_uuid = remote_mirror_uuid; - for (auto &image_id : image_ids) { + // start replayers for newly added remote image sources + for (auto &image_id : added_image_ids) { auto it = m_image_replayers.find(image_id.global_id); if (it == m_image_replayers.end()) { unique_ptr > image_replayer(new ImageReplayer<>( m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados, local_mirror_uuid, m_local_pool_id, image_id.global_id)); + if (m_manual_stop) { + image_replayer->stop(nullptr, true); + } + it = m_image_replayers.insert( std::make_pair(image_id.global_id, std::move(image_replayer))).first; } @@ -689,11 +730,7 @@ void Replayer::set_sources(const ImageIds &image_ids) void Replayer::start_image_replayer(unique_ptr > &image_replayer) { assert(m_lock.is_locked()); - - std::string global_image_id = image_replayer->get_global_image_id(); - dout(20) << "global_image_id=" << global_image_id << dendl; - - if (!image_replayer->is_stopped()) { + if (!image_replayer->is_stopped() || image_replayer->remote_images_empty()) { return; } else if (image_replayer->is_blacklisted()) { derr << "blacklisted detected during image replay" << dendl; @@ -702,6 +739,9 @@ void Replayer::start_image_replayer(unique_ptr > &image_replayer return; } + std::string global_image_id = image_replayer->get_global_image_id(); + dout(20) << "global_image_id=" << global_image_id << dendl; + FunctionContext *ctx = new FunctionContext( [this, global_image_id] (int r) { dout(20) << "image deleter result: r=" << r << ", " @@ -770,31 +810,36 @@ bool Replayer::stop_image_replayer(unique_ptr > &image_replayer) return false; } -void Replayer::handle_post_acquire_leader(Context *on_finish) { +void Replayer::stop_image_replayers() { dout(20) << dendl; - { - Mutex::Locker locker(m_lock); - m_cond.Signal(); + assert(m_lock.is_locked()); + for (auto image_it = m_image_replayers.begin(); + image_it != m_image_replayers.end();) { + if (stop_image_replayer(image_it->second)) { + image_it = m_image_replayers.erase(image_it); + continue; + } + ++image_it; } - - on_finish->complete(0); } -void Replayer::handle_pre_release_leader(Context *on_finish) { +void Replayer::stop_image_replayers(Context *on_finish) { dout(20) << dendl; { Mutex::Locker locker(m_lock); - set_sources(ImageIds()); + stop_image_replayers(); + if (!m_image_replayers.empty()) { + Context *ctx = new FunctionContext([this, on_finish](int r) { + assert(r == 0); + stop_image_replayers(on_finish); + }); + ctx = create_async_context_callback(m_threads->work_queue, ctx); + Mutex::Locker timer_locker(m_threads->timer_lock); - Context *task = create_async_context_callback( - m_threads->work_queue, new FunctionContext( - [this, on_finish](int r) { - handle_pre_release_leader(on_finish); - })); - m_threads->timer->add_event_after(1, task); + m_threads->timer->add_event_after(1, ctx); return; } } @@ -802,5 +847,48 @@ void Replayer::handle_pre_release_leader(Context *on_finish) { on_finish->complete(0); } +void Replayer::handle_post_acquire_leader(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + assert(!m_pool_watcher); + m_pool_watcher.reset(new PoolWatcher<>( + m_threads, m_remote_io_ctx, m_pool_watcher_listener)); + m_pool_watcher->init(create_async_context_callback( + m_threads->work_queue, on_finish)); + + m_cond.Signal(); +} + +void Replayer::handle_pre_release_leader(Context *on_finish) { + dout(20) << dendl; + shut_down_pool_watcher(on_finish); +} + +void Replayer::shut_down_pool_watcher(Context *on_finish) { + dout(20) << dendl; + + Context *ctx = new FunctionContext([this, on_finish](int r) { + handle_shut_down_pool_watcher(r, on_finish); + }); + ctx = create_async_context_callback(m_threads->work_queue, ctx); + + Mutex::Locker locker(m_lock); + assert(m_pool_watcher); + m_pool_watcher->shut_down(ctx); +} + +void Replayer::handle_shut_down_pool_watcher(int r, Context *on_finish) { + dout(20) << "r=" << r << dendl; + + { + Mutex::Locker locker(m_lock); + assert(m_pool_watcher); + m_pool_watcher.reset(); + } + + stop_image_replayers(on_finish); +} + } // namespace mirror } // namespace rbd diff --git a/src/tools/rbd_mirror/Replayer.h b/src/tools/rbd_mirror/Replayer.h index abcc35694af..aff667381a9 100644 --- a/src/tools/rbd_mirror/Replayer.h +++ b/src/tools/rbd_mirror/Replayer.h @@ -59,18 +59,38 @@ public: void release_leader(); private: - void init_local_mirroring_images(); - void set_sources(const ImageIds &image_ids); + struct PoolWatcherListener : public PoolWatcher<>::Listener { + Replayer *replayer; + + PoolWatcherListener(Replayer *replayer) : replayer(replayer) { + } + + void handle_update(const ImageIds &added_image_ids, + const ImageIds &removed_image_ids) override { + replayer->handle_update(added_image_ids, removed_image_ids); + } + }; + + int init_local_mirroring_images(); + + void handle_update(const ImageIds &added_image_ids, + const ImageIds &removed_image_ids); void start_image_replayer(unique_ptr > &image_replayer); bool stop_image_replayer(unique_ptr > &image_replayer); + void stop_image_replayers(); + void stop_image_replayers(Context *on_finish); - int init_rados(const std::string &cluster_name, const std::string &client_name, + int init_rados(const std::string &cluster_name, + const std::string &client_name, const std::string &description, RadosRef *rados_ref); void handle_post_acquire_leader(Context *on_finish); void handle_pre_release_leader(Context *on_finish); + void shut_down_pool_watcher(Context *on_finish); + void handle_shut_down_pool_watcher(int r, Context *on_finish); + Threads *m_threads; std::shared_ptr m_image_deleter; ImageSyncThrottlerRef<> m_image_sync_throttler; @@ -91,7 +111,11 @@ private: int64_t m_local_pool_id = -1; int64_t m_remote_pool_id = -1; - std::unique_ptr m_pool_watcher; + std::string m_remote_mirror_uuid; + + PoolWatcherListener m_pool_watcher_listener; + std::unique_ptr > m_pool_watcher; + std::map > > m_image_replayers; std::string m_asok_hook_name; diff --git a/src/tools/rbd_mirror/pool_watcher/RefreshImagesRequest.cc b/src/tools/rbd_mirror/pool_watcher/RefreshImagesRequest.cc index e134c13ace5..c387ecea789 100644 --- a/src/tools/rbd_mirror/pool_watcher/RefreshImagesRequest.cc +++ b/src/tools/rbd_mirror/pool_watcher/RefreshImagesRequest.cc @@ -52,7 +52,7 @@ void RefreshImagesRequest::handle_mirror_image_list(int r) { r = librbd::cls_client::mirror_image_list_finish(&it, &ids); } - if (r < 0) { + if (r < 0 && r != -ENOENT) { derr << "failed to list mirrored images: " << cpp_strerror(r) << dendl; finish(r); return; @@ -98,7 +98,7 @@ void RefreshImagesRequest::handle_dir_list(int r) { r = librbd::cls_client::dir_list_finish(&it, &name_to_ids); } - if (r < 0) { + if (r < 0 && r != -ENOENT) { derr << "failed to list images: " << cpp_strerror(r) << dendl; finish(r); return; diff --git a/src/tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h b/src/tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h index 38dd1f5d53b..79a327cd6c7 100644 --- a/src/tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h +++ b/src/tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h @@ -21,8 +21,8 @@ namespace pool_watcher { template class RefreshImagesRequest { public: - RefreshImagesRequest *create(librados::IoCtx &remote_io_ctx, - ImageIds *image_ids, Context *on_finish) { + static RefreshImagesRequest *create(librados::IoCtx &remote_io_ctx, + ImageIds *image_ids, Context *on_finish) { return new RefreshImagesRequest(remote_io_ctx, image_ids, on_finish); } diff --git a/src/tools/rbd_mirror/types.cc b/src/tools/rbd_mirror/types.cc index 9040f210c70..52bf9de20fc 100644 --- a/src/tools/rbd_mirror/types.cc +++ b/src/tools/rbd_mirror/types.cc @@ -6,8 +6,13 @@ namespace rbd { namespace mirror { -std::ostream& operator<<(std::ostream& lhs, const peer_t &peer) -{ +std::ostream &operator<<(std::ostream &os, const ImageId &image_id) { + return os << "global id=" << image_id.global_id << ", " + << "id=" << image_id.id << ", " + << "name=" << image_id.name; +} + +std::ostream& operator<<(std::ostream& lhs, const peer_t &peer) { return lhs << "uuid: " << peer.uuid << " cluster: " << peer.cluster_name << " client: " << peer.client_name; diff --git a/src/tools/rbd_mirror/types.h b/src/tools/rbd_mirror/types.h index 0a13049f581..617effd21cf 100644 --- a/src/tools/rbd_mirror/types.h +++ b/src/tools/rbd_mirror/types.h @@ -31,8 +31,11 @@ struct ImageId { explicit ImageId(const std::string &global_id) : global_id(global_id) { } + ImageId(const std::string &global_id, const std::string &id) + : global_id(global_id), id(id) { + } ImageId(const std::string &global_id, const std::string &id, - const boost::optional &name = boost::none) + const std::string &name) : global_id(global_id), id(id), name(name) { } @@ -44,6 +47,8 @@ struct ImageId { } }; +std::ostream &operator<<(std::ostream &, const ImageId &image_id); + typedef std::set ImageIds; struct peer_t { -- 2.39.5