From 717af7caa9ec2cea321faf07053dd31908be33d6 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Mon, 24 Apr 2017 23:02:17 -0400 Subject: [PATCH] rbd-mirror: forward initial pool image deletions to instance Signed-off-by: Jason Dillaman --- .../rbd_mirror/test_mock_InstanceReplayer.cc | 8 +- src/tools/rbd_mirror/ImageReplayer.cc | 3 +- src/tools/rbd_mirror/ImageReplayer.h | 3 +- src/tools/rbd_mirror/InstanceReplayer.cc | 20 +-- src/tools/rbd_mirror/PoolReplayer.cc | 151 +++++++++--------- src/tools/rbd_mirror/PoolReplayer.h | 33 ++-- 6 files changed, 113 insertions(+), 105 deletions(-) diff --git a/src/test/rbd_mirror/test_mock_InstanceReplayer.cc b/src/test/rbd_mirror/test_mock_InstanceReplayer.cc index d406d9f57fa..53a9d0b6edd 100644 --- a/src/test/rbd_mirror/test_mock_InstanceReplayer.cc +++ b/src/test/rbd_mirror/test_mock_InstanceReplayer.cc @@ -72,8 +72,9 @@ struct ImageReplayer { MOCK_METHOD3(add_remote_image, void(const std::string &, const std::string &, librados::IoCtx &)); - MOCK_METHOD2(remove_remote_image, void(const std::string &, - const std::string &)); + MOCK_METHOD3(remove_remote_image, void(const std::string &, + const std::string &, + bool)); MOCK_METHOD0(remote_images_empty, bool()); MOCK_METHOD0(get_global_image_id, const std::string &()); MOCK_METHOD0(get_local_image_id, const std::string &()); @@ -174,7 +175,8 @@ TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) { C_SaferCond on_release; EXPECT_CALL(mock_image_replayer, - remove_remote_image("remote_mirror_uuid", "remote_image_id")); + remove_remote_image("remote_mirror_uuid", "remote_image_id", + false)); EXPECT_CALL(mock_image_replayer, remote_images_empty()) .WillOnce(Return(true)); EXPECT_CALL(mock_image_replayer, is_stopped()) diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 2ab66bbb9c1..f9a38c2fe3f 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -335,7 +335,8 @@ void ImageReplayer::add_remote_image(const std::string &mirror_uuid, template void ImageReplayer::remove_remote_image(const std::string &mirror_uuid, - const std::string &image_id) { + const std::string &image_id, + bool schedule_delete) { Mutex::Locker locker(m_lock); m_remote_images.erase({mirror_uuid, image_id}); } diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 8cb33eefed6..934df3e460a 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -109,7 +109,8 @@ public: const std::string &remote_image_id, librados::IoCtx &remote_io_ctx); void remove_remote_image(const std::string &remote_mirror_uuid, - const std::string &remote_image_id); + const std::string &remote_image_id, + bool schedule_delete); bool remote_images_empty() const; inline int64_t get_local_pool_id() const { diff --git a/src/tools/rbd_mirror/InstanceReplayer.cc b/src/tools/rbd_mirror/InstanceReplayer.cc index 74735d63444..d2426d0d0e4 100644 --- a/src/tools/rbd_mirror/InstanceReplayer.cc +++ b/src/tools/rbd_mirror/InstanceReplayer.cc @@ -156,13 +156,13 @@ void InstanceReplayer::acquire_image(const std::string &global_image_id, } auto image_replayer = it->second; + if (!peer_mirror_uuid.empty()) { + auto iter = m_peers.find(Peer(peer_mirror_uuid)); + assert(iter != m_peers.end()); + auto io_ctx = iter->io_ctx; - auto iter = m_peers.find(Peer(peer_mirror_uuid)); - assert(iter != m_peers.end()); - auto io_ctx = iter->io_ctx; - - image_replayer->add_remote_image(peer_mirror_uuid, peer_image_id, io_ctx); - + image_replayer->add_remote_image(peer_mirror_uuid, peer_image_id, io_ctx); + } start_image_replayer(image_replayer); m_threads->work_queue->queue(on_finish, 0); @@ -190,11 +190,13 @@ void InstanceReplayer::release_image(const std::string &global_image_id, } auto image_replayer = it->second; - - image_replayer->remove_remote_image(peer_mirror_uuid, peer_image_id); + if (!peer_mirror_uuid.empty()) { + image_replayer->remove_remote_image(peer_mirror_uuid, peer_image_id, + schedule_delete); + } if (!image_replayer->remote_images_empty()) { - dout(20) << global_image_id << ": still has remote images" << dendl; + dout(20) << global_image_id << ": still has peer images" << dendl; m_threads->work_queue->queue(on_finish, 0); return; } diff --git a/src/tools/rbd_mirror/PoolReplayer.cc b/src/tools/rbd_mirror/PoolReplayer.cc index dbede1ed0b5..0dbfeffe54d 100644 --- a/src/tools/rbd_mirror/PoolReplayer.cc +++ b/src/tools/rbd_mirror/PoolReplayer.cc @@ -21,7 +21,6 @@ #include "InstanceWatcher.h" #include "LeaderWatcher.h" #include "Threads.h" -#include "pool_watcher/RefreshImagesRequest.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rbd_mirror @@ -206,20 +205,6 @@ private: } // anonymous namespace -struct PoolReplayer::C_RefreshLocalImages : public Context { - PoolReplayer *pool_replayer; - Context *on_finish; - ImageIds image_ids; - - C_RefreshLocalImages(PoolReplayer *pool_replayer, Context *on_finish) - : pool_replayer(pool_replayer), on_finish(on_finish) { - } - - void finish(int r) override { - pool_replayer->handle_refresh_local_images(r, std::move(image_ids), on_finish); - } -}; - PoolReplayer::PoolReplayer(Threads *threads, std::shared_ptr image_deleter, ImageSyncThrottlerRef<> image_sync_throttler, @@ -232,7 +217,8 @@ PoolReplayer::PoolReplayer(Threads *threads, m_peer(peer), m_args(args), m_local_pool_id(local_pool_id), - m_pool_watcher_listener(this), + m_local_pool_watcher_listener(this, true), + m_remote_pool_watcher_listener(this, false), m_asok_hook(nullptr), m_pool_replayer_thread(this), m_leader_listener(this) @@ -261,7 +247,8 @@ PoolReplayer::~PoolReplayer() m_instance_replayer->shut_down(); } - assert(!m_pool_watcher); + assert(!m_local_pool_watcher); + assert(!m_remote_pool_watcher); } bool PoolReplayer::is_blacklisted() const { @@ -435,7 +422,8 @@ void PoolReplayer::run() } Mutex::Locker locker(m_lock); - if (m_pool_watcher && m_pool_watcher->is_blacklisted()) { + if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) || + (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) { m_blacklisted = true; m_stopping.set(1); break; @@ -553,42 +541,44 @@ void PoolReplayer::release_leader() void PoolReplayer::handle_update(const std::string &mirror_uuid, ImageIds &&added_image_ids, ImageIds &&removed_image_ids) { - assert(!mirror_uuid.empty()); if (m_stopping.read()) { return; } - dout(10) << dendl; + dout(10) << "mirror_uuid=" << mirror_uuid << ", " + << "added_count=" << added_image_ids.size() << ", " + << "removed_count=" << removed_image_ids.size() << dendl; Mutex::Locker locker(m_lock); if (!m_leader_watcher->is_leader()) { return; } - if (m_peer.uuid != mirror_uuid) { - m_instance_replayer->remove_peer(m_peer.uuid); - m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx); - m_peer.uuid = mirror_uuid; - } + if (m_initial_mirror_image_ids.find(mirror_uuid) == + m_initial_mirror_image_ids.end() && + m_initial_mirror_image_ids.size() < 2) { + m_initial_mirror_image_ids[mirror_uuid] = added_image_ids; + + if (m_initial_mirror_image_ids.size() == 2) { + dout(10) << "local and remote pools refreshed" << dendl; - // first callback will be a full directory -- so see if we need to remove - // any local images that no longer exist on the remote side - if (!m_init_image_ids.empty()) { - dout(20) << "scanning initial local image set" << dendl; - for (auto &image_id : added_image_ids) { - auto it = m_init_image_ids.find(image_id); - if (it != m_init_image_ids.end()) { - m_init_image_ids.erase(it); + // both local and remote initial pool listing received. derive + // removal notifications for the remote pool + auto &local_image_ids = m_initial_mirror_image_ids.begin()->second; + auto &remote_image_ids = m_initial_mirror_image_ids.rbegin()->second; + for (auto &local_image_id : local_image_ids) { + if (remote_image_ids.find(local_image_id) == remote_image_ids.end()) { + removed_image_ids.emplace(local_image_id.global_id, ""); + } } + local_image_ids.clear(); + remote_image_ids.clear(); } + } - // the remaining images in m_init_image_ids must be deleted - for (auto &image_id : m_init_image_ids) { - dout(20) << "scheduling the deletion of init image: " - << image_id.global_id << " (" << image_id.id << ")" << dendl; - m_image_deleter->schedule_image_delete(m_local_rados, m_local_pool_id, - image_id.global_id); - } - m_init_image_ids.clear(); + if (!mirror_uuid.empty() && m_peer.uuid != mirror_uuid) { + m_instance_replayer->remove_peer(m_peer.uuid); + m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx); + m_peer.uuid = mirror_uuid; } m_update_op_tracker.start_op(); @@ -599,19 +589,19 @@ void PoolReplayer::handle_update(const std::string &mirror_uuid, C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx); - for (auto &image_id : removed_image_ids) { + for (auto &image_id : added_image_ids) { // for now always send to myself (the leader) std::string &instance_id = m_instance_watcher->get_instance_id(); - m_instance_watcher->notify_image_release(instance_id, image_id.global_id, - mirror_uuid, image_id.id, true, + m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id, + mirror_uuid, image_id.id, gather_ctx->new_sub()); } - for (auto &image_id : added_image_ids) { + for (auto &image_id : removed_image_ids) { // for now always send to myself (the leader) std::string &instance_id = m_instance_watcher->get_instance_id(); - m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id, - mirror_uuid, image_id.id, + m_instance_watcher->notify_image_release(instance_id, image_id.global_id, + mirror_uuid, image_id.id, true, gather_ctx->new_sub()); } @@ -620,68 +610,73 @@ void PoolReplayer::handle_update(const std::string &mirror_uuid, void PoolReplayer::handle_post_acquire_leader(Context *on_finish) { dout(20) << dendl; - refresh_local_images(on_finish); + init_local_pool_watcher(on_finish); } void PoolReplayer::handle_pre_release_leader(Context *on_finish) { dout(20) << dendl; - shut_down_pool_watcher(on_finish); + shut_down_pool_watchers(on_finish); } -void PoolReplayer::refresh_local_images(Context *on_finish) { +void PoolReplayer::init_local_pool_watcher(Context *on_finish) { dout(20) << dendl; + Mutex::Locker locker(m_lock); + assert(!m_local_pool_watcher); + m_local_pool_watcher.reset(new PoolWatcher<>( + m_threads, m_local_io_ctx, m_local_pool_watcher_listener)); + m_initial_mirror_image_ids.clear(); + // ensure the initial set of local images is up-to-date // after acquiring the leader role - auto ctx = new C_RefreshLocalImages(this, on_finish); - auto req = pool_watcher::RefreshImagesRequest<>::create( - m_local_io_ctx, &ctx->image_ids, ctx); - req->send(); + auto ctx = new FunctionContext([this, on_finish](int r) { + handle_init_local_pool_watcher(r, on_finish); + }); + m_local_pool_watcher->init(create_async_context_callback( + m_threads->work_queue, ctx)); } -void PoolReplayer::handle_refresh_local_images(int r, ImageIds &&image_ids, - Context *on_finish) { +void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) { dout(20) << "r=" << r << dendl; - - { - Mutex::Locker locker(m_lock); - m_init_image_ids = std::move(image_ids); - } - if (r < 0) { derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl; on_finish->complete(r); return; } - init_pool_watcher(on_finish); + init_remote_pool_watcher(on_finish); } -void PoolReplayer::init_pool_watcher(Context *on_finish) { +void PoolReplayer::init_remote_pool_watcher(Context *on_finish) { dout(20) << dendl; Mutex::Locker locker(m_lock); - assert(!m_pool_watcher); - m_pool_watcher.reset(new PoolWatcher<>( - m_threads, m_remote_io_ctx, m_pool_watcher_listener)); - m_pool_watcher->init(create_async_context_callback( + assert(!m_remote_pool_watcher); + m_remote_pool_watcher.reset(new PoolWatcher<>( + m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener)); + m_remote_pool_watcher->init(create_async_context_callback( m_threads->work_queue, on_finish)); m_cond.Signal(); } -void PoolReplayer::shut_down_pool_watcher(Context *on_finish) { +void PoolReplayer::shut_down_pool_watchers(Context *on_finish) { dout(20) << dendl; { Mutex::Locker locker(m_lock); - if (m_pool_watcher) { + if (m_local_pool_watcher) { Context *ctx = new FunctionContext([this, on_finish](int r) { - handle_shut_down_pool_watcher(r, on_finish); - }); + handle_shut_down_pool_watchers(r, on_finish); + }); ctx = create_async_context_callback(m_threads->work_queue, ctx); - m_pool_watcher->shut_down(ctx); + auto gather_ctx = new C_Gather(g_ceph_context, ctx); + m_local_pool_watcher->shut_down(gather_ctx->new_sub()); + if (m_remote_pool_watcher) { + m_remote_pool_watcher->shut_down(gather_ctx->new_sub()); + } + gather_ctx->activate(); return; } } @@ -689,13 +684,17 @@ void PoolReplayer::shut_down_pool_watcher(Context *on_finish) { on_finish->complete(0); } -void PoolReplayer::handle_shut_down_pool_watcher(int r, Context *on_finish) { +void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) { dout(20) << "r=" << r << dendl; { Mutex::Locker locker(m_lock); - assert(m_pool_watcher); - m_pool_watcher.reset(); + assert(m_local_pool_watcher); + m_local_pool_watcher.reset(); + + if (m_remote_pool_watcher) { + m_remote_pool_watcher.reset(); + } } wait_for_update_ops(on_finish); } diff --git a/src/tools/rbd_mirror/PoolReplayer.h b/src/tools/rbd_mirror/PoolReplayer.h index 526ed536c0f..620b6db9a5e 100644 --- a/src/tools/rbd_mirror/PoolReplayer.h +++ b/src/tools/rbd_mirror/PoolReplayer.h @@ -63,21 +63,21 @@ public: private: struct PoolWatcherListener : public PoolWatcher<>::Listener { PoolReplayer *pool_replayer; + bool local; - PoolWatcherListener(PoolReplayer *pool_replayer) - : pool_replayer(pool_replayer) { + PoolWatcherListener(PoolReplayer *pool_replayer, bool local) + : pool_replayer(pool_replayer), local(local) { } void handle_update(const std::string &mirror_uuid, ImageIds &&added_image_ids, ImageIds &&removed_image_ids) override { - pool_replayer->handle_update(mirror_uuid, std::move(added_image_ids), - std::move(removed_image_ids)); + pool_replayer->handle_update((local ? "" : mirror_uuid), + std::move(added_image_ids), + std::move(removed_image_ids)); } }; - struct C_RefreshLocalImages; - void handle_update(const std::string &mirror_uuid, ImageIds &&added_image_ids, ImageIds &&removed_image_ids); @@ -89,13 +89,13 @@ private: void handle_post_acquire_leader(Context *on_finish); void handle_pre_release_leader(Context *on_finish); - void refresh_local_images(Context *on_finish); - void handle_refresh_local_images(int r, ImageIds &&image_ids, - Context *on_finish); + void init_local_pool_watcher(Context *on_finish); + void handle_init_local_pool_watcher(int r, Context *on_finish); + + void init_remote_pool_watcher(Context *on_finish); - void init_pool_watcher(Context *on_finish); - void shut_down_pool_watcher(Context *on_finish); - void handle_shut_down_pool_watcher(int r, Context *on_finish); + void shut_down_pool_watchers(Context *on_finish); + void handle_shut_down_pool_watchers(int r, Context *on_finish); void wait_for_update_ops(Context *on_finish); void handle_wait_for_update_ops(int r, Context *on_finish); @@ -119,15 +119,18 @@ private: int64_t m_local_pool_id = -1; - PoolWatcherListener m_pool_watcher_listener; - std::unique_ptr > m_pool_watcher; + PoolWatcherListener m_local_pool_watcher_listener; + std::unique_ptr > m_local_pool_watcher; + + PoolWatcherListener m_remote_pool_watcher_listener; + std::unique_ptr > m_remote_pool_watcher; std::unique_ptr> m_instance_replayer; std::string m_asok_hook_name; AdminSocketHook *m_asok_hook; - std::set m_init_image_ids; + std::map m_initial_mirror_image_ids; class PoolReplayerThread : public Thread { PoolReplayer *m_pool_replayer; -- 2.39.5