From c5a8b780edeb9b62a601f0718459837f3d66e63d Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Thu, 26 May 2016 23:26:20 -0400 Subject: [PATCH] rbd-mirror: replayer should only handle a single pool The pool watcher now only needs to scan a single pool for its associated replayer since a peer is inherently tied to a single pool. Fixes: http://tracker.ceph.com/issues/16045 Signed-off-by: Jason Dillaman --- src/test/rbd_mirror/test_PoolWatcher.cc | 112 +------ src/tools/rbd_mirror/Mirror.cc | 11 +- src/tools/rbd_mirror/PoolWatcher.cc | 152 ++++----- src/tools/rbd_mirror/PoolWatcher.h | 26 +- src/tools/rbd_mirror/Replayer.cc | 402 +++++++++--------------- src/tools/rbd_mirror/Replayer.h | 40 +-- 6 files changed, 258 insertions(+), 485 deletions(-) diff --git a/src/test/rbd_mirror/test_PoolWatcher.cc b/src/test/rbd_mirror/test_PoolWatcher.cc index fd739f5bd8bd6..2a2708ed686c3 100644 --- a/src/test/rbd_mirror/test_PoolWatcher.cc +++ b/src/test/rbd_mirror/test_PoolWatcher.cc @@ -42,7 +42,6 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"), { m_cluster = std::make_shared(); EXPECT_EQ("", connect_cluster_pp(*m_cluster)); - m_pool_watcher.reset(new PoolWatcher(m_cluster, 30, m_lock, m_cond)); } ~TestPoolWatcher() { @@ -59,9 +58,12 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"), int64_t pool_id = m_cluster->pool_lookup(pool_name.c_str()); ASSERT_GE(pool_id, 0); m_pools.insert(pool_name); + + librados::IoCtx ioctx; + ASSERT_EQ(0, m_cluster->ioctx_create2(pool_id, ioctx)); + + m_pool_watcher.reset(new PoolWatcher(ioctx, 30, m_lock, m_cond)); if (enable_mirroring) { - librados::IoCtx ioctx; - ASSERT_EQ(0, m_cluster->ioctx_create2(pool_id, ioctx)); ASSERT_EQ(0, librbd::mirror_mode_set(ioctx, RBD_MIRROR_MODE_POOL)); std::string uuid; ASSERT_EQ(0, librbd::mirror_peer_add(ioctx, &uuid, @@ -73,50 +75,6 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"), } } - void delete_pool(const string &name, const peer_t &peer) { - int64_t pool_id = m_cluster->pool_lookup(name.c_str()); - ASSERT_GE(pool_id, 0); - m_pools.erase(name); - ASSERT_EQ(0, m_cluster->pool_delete(name.c_str())); - m_mirrored_images.erase(pool_id); - } - - void create_cache_pool(const string &base_pool, string *cache_pool_name) { - bufferlist inbl; - *cache_pool_name = get_temp_pool_name("test-rbd-mirror-"); - ASSERT_EQ(0, m_cluster->pool_create(cache_pool_name->c_str())); - - ASSERT_EQ(0, m_cluster->mon_command( - "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool + - "\", \"tierpool\": \"" + *cache_pool_name + - "\", \"force_nonempty\": \"--force-nonempty\" }", - inbl, NULL, NULL)); - ASSERT_EQ(0, m_cluster->mon_command( - "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool + - "\", \"overlaypool\": \"" + *cache_pool_name + "\"}", - inbl, NULL, NULL)); - ASSERT_EQ(0, m_cluster->mon_command( - "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + *cache_pool_name + - "\", \"mode\": \"writeback\"}", - inbl, NULL, NULL)); - m_cluster->wait_for_latest_osdmap(); - } - - void remove_cache_pool(const string &base_pool, const string &cache_pool) { - bufferlist inbl; - // tear down tiers - ASSERT_EQ(0, m_cluster->mon_command( - "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool + - "\"}", - inbl, NULL, NULL)); - ASSERT_EQ(0, m_cluster->mon_command( - "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool + - "\", \"tierpool\": \"" + cache_pool + "\"}", - inbl, NULL, NULL)); - m_cluster->wait_for_latest_osdmap(); - m_cluster->pool_delete(cache_pool.c_str()); - } - string get_image_id(librados::IoCtx *ioctx, const string &image_name) { string obj = librbd::util::id_obj_name(image_name); string id; @@ -148,7 +106,7 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"), sizeof(mirror_image_info))); image.close(); - m_mirrored_images[ioctx.get_id()].insert(PoolWatcher::ImageIds( + m_mirrored_images.insert(PoolWatcher::ImageId( get_image_id(&ioctx, name), name, mirror_image_info.global_id)); } if (image_name != nullptr) @@ -193,7 +151,7 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"), sizeof(mirror_image_info))); image.close(); - m_mirrored_images[cioctx.get_id()].insert(PoolWatcher::ImageIds( + m_mirrored_images.insert(PoolWatcher::ImageId( get_image_id(&cioctx, name), name, mirror_image_info.global_id)); } if (image_name != nullptr) @@ -212,23 +170,23 @@ TestPoolWatcher() : m_lock("TestPoolWatcherLock"), unique_ptr m_pool_watcher; set m_pools; - PoolWatcher::PoolImageIds m_mirrored_images; + PoolWatcher::ImageIds m_mirrored_images; uint64_t m_image_number; uint64_t m_snap_number; }; -TEST_F(TestPoolWatcher, NoPools) { +TEST_F(TestPoolWatcher, EmptyPool) { + string uuid1 = "00000000-0000-0000-0000-000000000001"; + peer_t site1(uuid1, "site1", "mirror1"); + create_pool(true, site1); check_images(); } TEST_F(TestPoolWatcher, ReplicatedPools) { string uuid1 = "00000000-0000-0000-0000-000000000001"; - string uuid2 = "20000000-2222-2222-2222-000000000002"; peer_t site1(uuid1, "site1", "mirror1"); - peer_t site2(uuid2, "site2", "mirror2"); string first_pool, local_pool, last_pool; - check_images(); create_pool(true, site1, &first_pool); check_images(); create_image(first_pool); @@ -242,50 +200,4 @@ TEST_F(TestPoolWatcher, ReplicatedPools) { check_images(); create_image(first_pool, false); check_images(); - - create_pool(false, peer_t(), &local_pool); - check_images(); - create_image(local_pool, false); - check_images(); - clone_image(first_pool, parent_image2, local_pool, false); - check_images(); - create_pool(true, site2); - check_images(); - - create_pool(true, site2, &last_pool); - check_images(); - clone_image(first_pool, parent_image2, last_pool); - check_images(); - create_image(last_pool); - check_images(); - delete_pool(last_pool, site2); - check_images(); - delete_pool(first_pool, site1); - check_images(); -} - -TEST_F(TestPoolWatcher, CachePools) { - peer_t site1("11111111-1111-1111-1111-111111111111", "site1", "mirror1"); - string base1, base2, cache1, cache2; - create_pool(true, site1, &base1); - check_images(); - - create_cache_pool(base1, &cache1); - BOOST_SCOPE_EXIT( base1, cache1, this_ ) { - this_->remove_cache_pool(base1, cache1); - } BOOST_SCOPE_EXIT_END; - check_images(); - create_image(base1); - check_images(); - create_image(base1, false); - check_images(); - - create_pool(false, peer_t(), &base2); - create_cache_pool(base2, &cache2); - BOOST_SCOPE_EXIT( base2, cache2, this_ ) { - this_->remove_cache_pool(base2, cache2); - } BOOST_SCOPE_EXIT_END; - check_images(); - create_image(base2, false); - check_images(); } diff --git a/src/tools/rbd_mirror/Mirror.cc b/src/tools/rbd_mirror/Mirror.cc index 21dac318bf064..71e64edf8d001 100644 --- a/src/tools/rbd_mirror/Mirror.cc +++ b/src/tools/rbd_mirror/Mirror.cc @@ -345,18 +345,16 @@ void Mirror::update_replayers(const PoolPeers &pool_peers) // remove stale replayers before creating new replayers for (auto it = m_replayers.begin(); it != m_replayers.end();) { - auto next_it(it); - ++next_it; - auto &peer = it->first.second; auto pool_peer_it = pool_peers.find(it->first.first); if (pool_peer_it == pool_peers.end() || pool_peer_it->second.find(peer) == pool_peer_it->second.end()) { dout(20) << "removing replayer for " << peer << dendl; // TODO: make async - m_replayers.erase(it); + it = m_replayers.erase(it); + } else { + ++it; } - it = next_it; } for (auto &kv : pool_peers) { @@ -365,7 +363,8 @@ void Mirror::update_replayers(const PoolPeers &pool_peers) if (m_replayers.find(pool_peer) == m_replayers.end()) { dout(20) << "starting replayer for " << peer << dendl; unique_ptr replayer(new Replayer(m_threads, m_image_deleter, - m_local, peer, m_args)); + m_local, kv.first, peer, + m_args)); // TODO: make async, and retry connecting within replayer int r = replayer->init(); if (r < 0) { diff --git a/src/tools/rbd_mirror/PoolWatcher.cc b/src/tools/rbd_mirror/PoolWatcher.cc index 21ccc2df7aa92..184e947fd4cbe 100644 --- a/src/tools/rbd_mirror/PoolWatcher.cc +++ b/src/tools/rbd_mirror/PoolWatcher.cc @@ -29,15 +29,15 @@ using librbd::cls_client::mirror_image_list; namespace rbd { namespace mirror { -PoolWatcher::PoolWatcher(RadosRef cluster, double interval_seconds, +PoolWatcher::PoolWatcher(librados::IoCtx &remote_io_ctx, + double interval_seconds, Mutex &lock, Cond &cond) : m_lock(lock), m_refresh_cond(cond), - m_stopping(false), - m_cluster(cluster), m_timer(g_ceph_context, m_lock, false), m_interval(interval_seconds) { + m_remote_io_ctx.dup(remote_io_ctx); m_timer.init(); } @@ -48,7 +48,7 @@ PoolWatcher::~PoolWatcher() m_timer.shutdown(); } -const PoolWatcher::PoolImageIds& PoolWatcher::get_images() const +const PoolWatcher::ImageIds& PoolWatcher::get_images() const { assert(m_lock.is_locked()); return m_images; @@ -56,106 +56,74 @@ const PoolWatcher::PoolImageIds& PoolWatcher::get_images() const void PoolWatcher::refresh_images(bool reschedule) { + ImageIds image_ids; + refresh(&image_ids); + + Mutex::Locker l(m_lock); + m_images = std::move(image_ids); + + if (!m_stopping && reschedule) { + FunctionContext *ctx = new FunctionContext( + boost::bind(&PoolWatcher::refresh_images, this, true)); + m_timer.add_event_after(m_interval, ctx); + } + m_refresh_cond.Signal(); + // TODO: perhaps use a workqueue instead, once we get notifications + // about new/removed mirrored images +} + +void PoolWatcher::refresh(ImageIds *image_ids) { dout(20) << "enter" << dendl; - PoolImageIds images; - list > pools; - int r = m_cluster->pool_list2(pools); + + std::string pool_name = m_remote_io_ctx.get_pool_name(); + rbd_mirror_mode_t mirror_mode; + int r = librbd::mirror_mode_get(m_remote_io_ctx, &mirror_mode); if (r < 0) { - derr << "error listing pools: " << cpp_strerror(r) << dendl; + derr << "could not tell whether mirroring was enabled for " + << pool_name << ": " << cpp_strerror(r) << dendl; + return; + } + if (mirror_mode == RBD_MIRROR_MODE_DISABLED) { + dout(20) << "pool " << pool_name << " has mirroring disabled" << dendl; return; } - for (auto kv : pools) { - int64_t pool_id = kv.first; - string pool_name = kv.second; - int64_t base_tier; - r = m_cluster->pool_get_base_tier(pool_id, &base_tier); - if (r == -ENOENT) { - dout(10) << "pool " << pool_name << " no longer exists" << dendl; - continue; - } else if (r < 0) { - derr << "Error retrieving base tier for pool " << pool_name << dendl; - continue; - } - if (pool_id != base_tier) { - // pool is a cache; skip it - continue; - } - - IoCtx ioctx; - r = m_cluster->ioctx_create2(pool_id, ioctx); - if (r == -ENOENT) { - dout(10) << "pool " << pool_name << " no longer exists" << dendl; - continue; - } else if (r < 0) { - derr << "Error accessing pool " << pool_name << cpp_strerror(r) << dendl; - continue; - } + std::map images_map; + r = librbd::list_images_v2(m_remote_io_ctx, images_map); + if (r < 0) { + derr << "error retrieving image names from pool " << pool_name << ": " + << cpp_strerror(r) << dendl; + } - rbd_mirror_mode_t mirror_mode; - r = librbd::mirror_mode_get(ioctx, &mirror_mode); - if (r < 0) { - derr << "could not tell whether mirroring was enabled for " << pool_name - << " : " << cpp_strerror(r) << dendl; - continue; - } - if (mirror_mode == RBD_MIRROR_MODE_DISABLED) { - dout(20) << "pool " << pool_name << " has mirroring disabled" << dendl; - continue; - } + std::map image_id_to_name; + for (const auto& img_pair : images_map) { + image_id_to_name.insert(std::make_pair(img_pair.second, img_pair.first)); + } - std::map images_map; - r = librbd::list_images_v2(ioctx, images_map); + std::string last_read = ""; + int max_read = 1024; + do { + std::map mirror_images; + r = mirror_image_list(&m_remote_io_ctx, last_read, max_read, + &mirror_images); if (r < 0) { - derr << "error retrieving image names from pool " << pool_name << ": " + derr << "error listing mirrored image directory: " << cpp_strerror(r) << dendl; + continue; } - - std::map image_id_to_name; - for (const auto& img_pair : images_map) { - image_id_to_name.insert(std::make_pair(img_pair.second, img_pair.first)); - } - - std::set image_ids; - std::string last_read = ""; - int max_read = 1024; - do { - std::map mirror_images; - r = mirror_image_list(&ioctx, last_read, max_read, &mirror_images); - if (r < 0) { - derr << "error listing mirrored image directory: " - << cpp_strerror(r) << dendl; - continue; - } - for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) { - boost::optional image_name(boost::none); - auto it2 = image_id_to_name.find(it->first); - if (it2 != image_id_to_name.end()) { - image_name = it2->second; - } - image_ids.insert(ImageIds(it->first, image_name, it->second)); - } - if (!mirror_images.empty()) { - last_read = mirror_images.rbegin()->first; + for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) { + boost::optional image_name(boost::none); + auto it2 = image_id_to_name.find(it->first); + if (it2 != image_id_to_name.end()) { + image_name = it2->second; } - r = mirror_images.size(); - } while (r == max_read); - - if (!image_ids.empty()) { - images[pool_id] = std::move(image_ids); + image_ids->insert(ImageId(it->first, image_name, it->second)); } - } - - Mutex::Locker l(m_lock); - m_images = std::move(images); - if (!m_stopping && reschedule) { - FunctionContext *ctx = new FunctionContext( - boost::bind(&PoolWatcher::refresh_images, this, true)); - m_timer.add_event_after(m_interval, ctx); - } - m_refresh_cond.Signal(); - // TODO: perhaps use a workqueue instead, once we get notifications - // about new/removed mirrored images + if (!mirror_images.empty()) { + last_read = mirror_images.rbegin()->first; + } + r = mirror_images.size(); + } while (r == max_read); } } // namespace mirror diff --git a/src/tools/rbd_mirror/PoolWatcher.h b/src/tools/rbd_mirror/PoolWatcher.h index 9eeb1015bbeb5..9983f9d56edfd 100644 --- a/src/tools/rbd_mirror/PoolWatcher.h +++ b/src/tools/rbd_mirror/PoolWatcher.h @@ -24,45 +24,47 @@ namespace mirror { */ class PoolWatcher { public: - struct ImageIds { + struct ImageId { std::string id; boost::optional name; std::string global_id; - ImageIds(const std::string &id, - const boost::optional &name = boost::none, - const std::string &global_id = "") + ImageId(const std::string &id, + const boost::optional &name = boost::none, + const std::string &global_id = "") : id(id), name(name), global_id(global_id) { } - inline bool operator==(const ImageIds &rhs) const { + inline bool operator==(const ImageId &rhs) const { return (id == rhs.id && name == rhs.name && global_id == rhs.global_id); } - inline bool operator<(const ImageIds &rhs) const { + inline bool operator<(const ImageId &rhs) const { return id < rhs.id; } }; - typedef std::map > PoolImageIds; + typedef std::set ImageIds; - PoolWatcher(RadosRef cluster, double interval_seconds, + PoolWatcher(librados::IoCtx &remote_io_ctx, double interval_seconds, Mutex &lock, Cond &cond); ~PoolWatcher(); PoolWatcher(const PoolWatcher&) = delete; PoolWatcher& operator=(const PoolWatcher&) = delete; - const PoolImageIds& get_images() const; + const ImageIds& get_images() const; void refresh_images(bool reschedule=true); private: + librados::IoCtx m_remote_io_ctx; Mutex &m_lock; Cond &m_refresh_cond; - bool m_stopping; - RadosRef m_cluster; + bool m_stopping = false; SafeTimer m_timer; double m_interval; - PoolImageIds m_images; + ImageIds m_images; + + void refresh(ImageIds *image_ids); }; } // namespace mirror diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc index 8dd3a0dfa7f64..f3d4b03975fa7 100644 --- a/src/tools/rbd_mirror/Replayer.cc +++ b/src/tools/rbd_mirror/Replayer.cc @@ -229,8 +229,8 @@ private: }; Replayer::Replayer(Threads *threads, std::shared_ptr image_deleter, - RadosRef local_cluster, const peer_t &peer, - const std::vector &args) : + RadosRef local_cluster, int64_t local_pool_id, + const peer_t &peer, const std::vector &args) : m_threads(threads), m_image_deleter(image_deleter), m_lock(stringify("rbd::mirror::Replayer ") + stringify(peer)), @@ -238,6 +238,7 @@ Replayer::Replayer(Threads *threads, std::shared_ptr image_deleter m_args(args), m_local(local_cluster), m_remote(new librados::Rados), + m_local_pool_id(local_pool_id), m_asok_hook(nullptr), m_replayer_thread(this) { @@ -263,6 +264,13 @@ int Replayer::init() { dout(20) << "replaying for " << m_peer << dendl; + int r = m_local->ioctx_create2(m_local_pool_id, m_local_io_ctx); + if (r < 0) { + derr << "error accessing local pool " << m_local_pool_id << ": " + << cpp_strerror(r) << dendl; + return r; + } + // NOTE: manually bootstrap a CephContext here instead of via // the librados API to avoid mixing global singletons between // the librados shared library and the daemon @@ -279,7 +287,7 @@ int Replayer::init() cct->_conf->cluster = m_peer.cluster_name; // librados::Rados::conf_read_file - int r = cct->_conf->parse_config_files(nullptr, nullptr, 0); + r = cct->_conf->parse_config_files(nullptr, nullptr, 0); if (r < 0) { derr << "could not read ceph conf for " << m_peer << ": " << cpp_strerror(r) << dendl; @@ -326,13 +334,22 @@ int Replayer::init() return r; } + r = m_remote->ioctx_create(m_local_io_ctx.get_pool_name().c_str(), + m_remote_io_ctx); + if (r < 0) { + derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name() + << ": " << cpp_strerror(r) << dendl; + return r; + } + m_remote_pool_id = m_remote_io_ctx.get_id(); + dout(20) << "connected to " << m_peer << dendl; // Bootstrap existing mirroring images init_local_mirroring_images(); // TODO: make interval configurable - m_pool_watcher.reset(new PoolWatcher(m_remote, 30, m_lock, m_cond)); + m_pool_watcher.reset(new PoolWatcher(m_remote_io_ctx, 30, m_lock, m_cond)); m_pool_watcher->refresh_images(); m_replayer_thread.create("replayer"); @@ -341,101 +358,49 @@ int Replayer::init() } void Replayer::init_local_mirroring_images() { - list > pools; - int r = m_local->pool_list2(pools); + rbd_mirror_mode_t mirror_mode; + int r = librbd::mirror_mode_get(m_local_io_ctx, &mirror_mode); if (r < 0) { - derr << "error listing pools: " << cpp_strerror(r) << dendl; + derr << "could not tell whether mirroring was enabled for " + << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; + return; + } + if (mirror_mode == RBD_MIRROR_MODE_DISABLED) { + dout(20) << "pool " << m_local_io_ctx.get_pool_name() << " " + << "has mirroring disabled" << dendl; return; } - for (auto kv : pools) { - int64_t pool_id = kv.first; - string pool_name = kv.second; - int64_t base_tier; - r = m_local->pool_get_base_tier(pool_id, &base_tier); - if (r == -ENOENT) { - dout(10) << "pool " << pool_name << " no longer exists" << dendl; - continue; - } else if (r < 0) { - derr << "Error retrieving base tier for pool " << pool_name << dendl; - continue; - } - if (pool_id != base_tier) { - // pool is a cache; skip it - continue; - } - - librados::IoCtx ioctx; - r = m_local->ioctx_create2(pool_id, ioctx); - if (r == -ENOENT) { - dout(10) << "pool " << pool_name << " no longer exists" << dendl; - continue; - } else if (r < 0) { - derr << "Error accessing pool " << pool_name << cpp_strerror(r) << dendl; - continue; - } + std::set images; - rbd_mirror_mode_t mirror_mode; - r = librbd::mirror_mode_get(ioctx, &mirror_mode); + std::string last_read = ""; + int max_read = 1024; + do { + std::map mirror_images; + r = librbd::cls_client::mirror_image_list(&m_local_io_ctx, last_read, + max_read, &mirror_images); if (r < 0) { - derr << "could not tell whether mirroring was enabled for " << pool_name - << " : " << cpp_strerror(r) << dendl; - continue; - } - if (mirror_mode == RBD_MIRROR_MODE_DISABLED) { - dout(20) << "pool " << pool_name << " has mirroring disabled" << dendl; - continue; - } - - librados::IoCtx remote_ioctx; - r = m_remote->ioctx_create(ioctx.get_pool_name().c_str(), remote_ioctx); - if (r < 0 && r != -ENOENT) { - dout(10) << "Error connecting to remote pool " << ioctx.get_pool_name() - << ": " << cpp_strerror(r) << dendl; + derr << "error listing mirrored image directory: " + << cpp_strerror(r) << dendl; continue; - } else if (r == -ENOENT) { - // remote pool does not exist anymore, we are going to add the images - // with local pool id - pool_id = ioctx.get_id(); - } - else { - pool_id = remote_ioctx.get_id(); } - - std::set images; - - std::string last_read = ""; - int max_read = 1024; - do { - std::map mirror_images; - r = librbd::cls_client::mirror_image_list(&ioctx, last_read, max_read, - &mirror_images); + for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) { + std::string image_name; + r = dir_get_name(&m_local_io_ctx, RBD_DIRECTORY, it->first, &image_name); if (r < 0) { - derr << "error listing mirrored image directory: " - << cpp_strerror(r) << dendl; + derr << "error retrieving local image name: " << cpp_strerror(r) + << dendl; continue; } - for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) { - std::string image_name; - r = dir_get_name(&ioctx, RBD_DIRECTORY, it->first, &image_name); - if (r < 0) { - derr << "error retrieving local image name: " << cpp_strerror(r) - << dendl; - continue; - } - images.insert(InitImageInfo(it->second, ioctx.get_id(), it->first, - image_name)); - } - if (!mirror_images.empty()) { - last_read = mirror_images.rbegin()->first; - } - r = mirror_images.size(); - } while (r == max_read); - - if (!images.empty()) { - m_init_images[pool_id] = std::move(images); + images.insert(InitImageInfo(it->second, it->first, image_name)); } - } + if (!mirror_images.empty()) { + last_read = mirror_images.rbegin()->first; + } + r = mirror_images.size(); + } while (r == max_read); + + m_init_images = std::move(images); } void Replayer::run() @@ -452,11 +417,11 @@ void Replayer::run() m_image_deleter.reset(); - PoolImageIds empty_sources; + ImageIds empty_sources; while (true) { Mutex::Locker l(m_lock); set_sources(empty_sources); - if (m_images.empty()) { + if (m_image_replayers.empty()) { break; } m_cond.WaitInterval(g_ceph_context, m_lock, seconds(1)); @@ -475,12 +440,9 @@ void Replayer::print_status(Formatter *f, stringstream *ss) f->open_array_section("image_replayers"); }; - for (auto it = m_images.begin(); it != m_images.end(); it++) { - auto &pool_images = it->second; - for (auto i = pool_images.begin(); i != pool_images.end(); i++) { - auto &image_replayer = i->second; - image_replayer->print_status(f, ss); - } + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->print_status(f, ss); } if (f) { @@ -502,12 +464,9 @@ void Replayer::start() m_manual_stop = false; - for (auto it = m_images.begin(); it != m_images.end(); it++) { - auto &pool_images = it->second; - for (auto i = pool_images.begin(); i != pool_images.end(); i++) { - auto &image_replayer = i->second; - image_replayer->start(nullptr, nullptr, true); - } + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->start(nullptr, nullptr, true); } } @@ -523,12 +482,9 @@ void Replayer::stop() m_manual_stop = true; - for (auto it = m_images.begin(); it != m_images.end(); it++) { - auto &pool_images = it->second; - for (auto i = pool_images.begin(); i != pool_images.end(); i++) { - auto &image_replayer = i->second; - image_replayer->stop(nullptr, true); - } + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->stop(nullptr, true); } } @@ -544,12 +500,9 @@ void Replayer::restart() m_manual_stop = false; - for (auto it = m_images.begin(); it != m_images.end(); it++) { - auto &pool_images = it->second; - for (auto i = pool_images.begin(); i != pool_images.end(); i++) { - auto &image_replayer = i->second; - image_replayer->restart(); - } + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->restart(); } } @@ -563,184 +516,122 @@ void Replayer::flush() return; } - for (auto it = m_images.begin(); it != m_images.end(); it++) { - auto &pool_images = it->second; - for (auto i = pool_images.begin(); i != pool_images.end(); i++) { - auto &image_replayer = i->second; - image_replayer->flush(); - } + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->flush(); } } -void Replayer::set_sources(const PoolImageIds &pool_image_ids) +void Replayer::set_sources(const ImageIds &image_ids) { dout(20) << "enter" << dendl; assert(m_lock.is_locked()); if (!m_init_images.empty()) { - dout(20) << "m_init_images has images!" << dendl; - for (auto it = m_init_images.begin(); it != m_init_images.end(); ++it) { - int64_t pool_id = it->first; - std::set& images = it->second; - auto remote_pool_it = pool_image_ids.find(pool_id); - if (remote_pool_it != pool_image_ids.end()) { - const std::set& remote_images = remote_pool_it->second; - for (const auto& remote_image : remote_images) { - auto image = images.find(InitImageInfo(remote_image.global_id)); - if (image != images.end()) { - images.erase(image); - } - } + dout(20) << "scanning initial local image set" << dendl; + for (auto &remote_image : image_ids) { + auto it = m_init_images.find(InitImageInfo(remote_image.global_id)); + if (it != m_init_images.end()) { + m_init_images.erase(it); } } + // the remaining images in m_init_images must be deleted - for (auto it = m_init_images.begin(); it != m_init_images.end(); ++it) { - for (const auto& image : it->second) { - dout(20) << "scheduling the deletion of init image: " - << image.name << dendl; - m_image_deleter->schedule_image_delete(image.pool_id, image.id, - image.name, image.global_id); - } + for (auto &image : m_init_images) { + dout(20) << "scheduling the deletion of init image: " + << image.name << dendl; + m_image_deleter->schedule_image_delete(m_local_pool_id, image.id, + image.name, image.global_id); } m_init_images.clear(); - } else { - dout(20) << "m_init_images is empty!" << dendl; } - for (auto it = m_images.begin(); it != m_images.end();) { - int64_t pool_id = it->first; - auto &pool_images = it->second; - - // pool has no mirrored images - if (pool_image_ids.find(pool_id) == pool_image_ids.end()) { - dout(20) << "pool " << pool_id << " has no mirrored images" << dendl; - for (auto images_it = pool_images.begin(); - images_it != pool_images.end();) { - if (images_it->second->is_running()) { - dout(20) << "stop image replayer for " - << images_it->second->get_global_image_id() << dendl; - } - if (stop_image_replayer(images_it->second)) { - images_it = pool_images.erase(images_it); - } else { - ++images_it; - } + // shut down replayers for non-mirrored images + bool existing_image_replayers = !m_image_replayers.empty(); + for (auto image_it = m_image_replayers.begin(); + image_it != m_image_replayers.end();) { + if (image_ids.find(ImageId(image_it->first)) == image_ids.end()) { + if (image_it->second->is_running()) { + dout(20) << "stop image replayer for " + << image_it->second->get_global_image_id() << dendl; } - if (pool_images.empty()) { - mirror_image_status_shut_down(pool_id); - it = m_images.erase(it); - } else { - ++it; - } - continue; - } - - // shut down replayers for non-mirrored images - for (auto images_it = pool_images.begin(); - images_it != pool_images.end();) { - auto &image_ids = pool_image_ids.at(pool_id); - if (image_ids.find(ImageIds(images_it->first)) == image_ids.end()) { - if (images_it->second->is_running()) { - dout(20) << "stop image replayer for " - << images_it->second->get_global_image_id() << dendl; - } - if (stop_image_replayer(images_it->second)) { - images_it = pool_images.erase(images_it); - } else { - ++images_it; - } - } else { - ++images_it; + if (stop_image_replayer(image_it->second)) { + image_it = m_image_replayers.erase(image_it); + continue; } } - ++it; + ++image_it; } - // (re)start new image replayers - for (const auto &kv : pool_image_ids) { - int64_t pool_id = kv.first; - - // TODO: clean up once remote peer -> image replayer refactored - librados::IoCtx remote_ioctx; - int r = m_remote->ioctx_create2(pool_id, remote_ioctx); - if (r < 0) { - derr << "failed to lookup remote pool " << pool_id << ": " - << cpp_strerror(r) << dendl; - continue; + if (image_ids.empty()) { + if (existing_image_replayers && m_image_replayers.empty()) { + mirror_image_status_shut_down(); } + return; + } - librados::IoCtx local_ioctx; - r = m_local->ioctx_create(remote_ioctx.get_pool_name().c_str(), local_ioctx); - if (r < 0) { - derr << "failed to lookup local pool " << remote_ioctx.get_pool_name() - << ": " << cpp_strerror(r) << dendl; - continue; - } + std::string local_mirror_uuid; + int r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx, + &local_mirror_uuid); + if (r < 0) { + derr << "failed to retrieve local mirror uuid from pool " + << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; + return; + } - std::string local_mirror_uuid; - r = librbd::cls_client::mirror_uuid_get(&local_ioctx, &local_mirror_uuid); - if (r < 0) { - derr << "failed to retrieve local mirror uuid from pool " - << local_ioctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; - continue; - } + std::string remote_mirror_uuid; + r = librbd::cls_client::mirror_uuid_get(&m_remote_io_ctx, + &remote_mirror_uuid); + if (r < 0) { + derr << "failed to retrieve remote mirror uuid from pool " + << m_remote_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; + return; + } - std::string remote_mirror_uuid; - r = librbd::cls_client::mirror_uuid_get(&remote_ioctx, &remote_mirror_uuid); + if (m_image_replayers.empty()) { + // create entry for pool if it doesn't exist + r = mirror_image_status_init(); if (r < 0) { - derr << "failed to retrieve remote mirror uuid from pool " - << remote_ioctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; - continue; + return; } + } - // create entry for pool if it doesn't exist - auto &pool_replayers = m_images[pool_id]; - - if (pool_replayers.empty()) { - r = mirror_image_status_init(pool_id, local_ioctx); - if (r < 0) { - continue; - } + for (auto &image_id : image_ids) { + auto it = m_image_replayers.find(image_id.id); + if (it == m_image_replayers.end()) { + unique_ptr > image_replayer(new ImageReplayer<>( + m_threads, m_local, m_remote, local_mirror_uuid, remote_mirror_uuid, + m_local_pool_id, m_remote_pool_id, image_id.id, image_id.global_id)); + it = m_image_replayers.insert( + std::make_pair(image_id.id, std::move(image_replayer))).first; } - - for (const auto &image_id : kv.second) { - auto it = pool_replayers.find(image_id.id); - if (it == pool_replayers.end()) { - unique_ptr > image_replayer(new ImageReplayer<>( - m_threads, m_local, m_remote, local_mirror_uuid, remote_mirror_uuid, - local_ioctx.get_id(), pool_id, image_id.id, image_id.global_id)); - it = pool_replayers.insert( - std::make_pair(image_id.id, std::move(image_replayer))).first; - } - if (!it->second->is_running()) { - dout(20) << "starting image replayer for " - << it->second->get_global_image_id() << dendl; - } - start_image_replayer(it->second, image_id.name); + if (!it->second->is_running()) { + dout(20) << "starting image replayer for " + << it->second->get_global_image_id() << dendl; } + start_image_replayer(it->second, image_id.name); } } -int Replayer::mirror_image_status_init(int64_t pool_id, - librados::IoCtx& ioctx) { - assert(m_status_watchers.find(pool_id) == m_status_watchers.end()); - - uint64_t instance_id = librados::Rados(ioctx).get_instance_id(); +int Replayer::mirror_image_status_init() { + assert(!m_status_watcher); - dout(20) << "pool_id=" << pool_id << ", instance_id=" << instance_id << dendl; + uint64_t instance_id = librados::Rados(m_local_io_ctx).get_instance_id(); + dout(20) << "pool_id=" << m_local_pool_id << ", " + << "instance_id=" << instance_id << dendl; librados::ObjectWriteOperation op; librbd::cls_client::mirror_image_status_remove_down(&op); - int r = ioctx.operate(RBD_MIRRORING, &op); + int r = m_local_io_ctx.operate(RBD_MIRRORING, &op); if (r < 0) { derr << "error initializing " << RBD_MIRRORING << "object: " << cpp_strerror(r) << dendl; return r; } - unique_ptr - watch_ctx(new MirrorStatusWatchCtx(ioctx, m_threads->work_queue)); + unique_ptr watch_ctx( + new MirrorStatusWatchCtx(m_local_io_ctx, m_threads->work_queue)); r = watch_ctx->register_watch(); if (r < 0) { @@ -749,22 +640,19 @@ int Replayer::mirror_image_status_init(int64_t pool_id, return r; } - m_status_watchers.insert(std::make_pair(pool_id, std::move(watch_ctx))); - + m_status_watcher = std::move(watch_ctx); return 0; } -void Replayer::mirror_image_status_shut_down(int64_t pool_id) { - auto watcher_it = m_status_watchers.find(pool_id); - assert(watcher_it != m_status_watchers.end()); +void Replayer::mirror_image_status_shut_down() { + assert(m_status_watcher); - int r = watcher_it->second->unregister_watch(); + int r = m_status_watcher->unregister_watch(); if (r < 0) { - derr << "error unregistering watcher for " << watcher_it->second->get_oid() + derr << "error unregistering watcher for " << m_status_watcher->get_oid() << " object: " << cpp_strerror(r) << dendl; } - - m_status_watchers.erase(watcher_it); + m_status_watcher.reset(); } void Replayer::start_image_replayer(unique_ptr > &image_replayer, diff --git a/src/tools/rbd_mirror/Replayer.h b/src/tools/rbd_mirror/Replayer.h index ff01fca5269c2..44b49e763f756 100644 --- a/src/tools/rbd_mirror/Replayer.h +++ b/src/tools/rbd_mirror/Replayer.h @@ -34,7 +34,7 @@ class MirrorStatusWatchCtx; class Replayer { public: Replayer(Threads *threads, std::shared_ptr image_deleter, - RadosRef local_cluster, const peer_t &peer, + RadosRef local_cluster, int64_t local_pool_id, const peer_t &peer, const std::vector &args); ~Replayer(); Replayer(const Replayer&) = delete; @@ -50,18 +50,18 @@ public: void flush(); private: + typedef PoolWatcher::ImageId ImageId; typedef PoolWatcher::ImageIds ImageIds; - typedef PoolWatcher::PoolImageIds PoolImageIds; void init_local_mirroring_images(); - void set_sources(const PoolImageIds &pool_image_ids); + void set_sources(const ImageIds &image_ids); void start_image_replayer(unique_ptr > &image_replayer, const boost::optional& image_name); bool stop_image_replayer(unique_ptr > &image_replayer); - int mirror_image_status_init(int64_t pool_id, librados::IoCtx& ioctx); - void mirror_image_status_shut_down(int64_t pool_id); + int mirror_image_status_init(); + void mirror_image_status_shut_down(); Threads *m_threads; std::shared_ptr m_image_deleter; @@ -72,35 +72,39 @@ private: peer_t m_peer; std::vector m_args; - RadosRef m_local, m_remote; + RadosRef m_local; + RadosRef m_remote; + + librados::IoCtx m_local_io_ctx; + librados::IoCtx m_remote_io_ctx; + + int64_t m_local_pool_id = -1; + int64_t m_remote_pool_id = -1; + std::unique_ptr m_pool_watcher; - // index by pool so it's easy to tell what is affected - // when a pool's configuration changes - std::map > > > m_images; - std::map > m_status_watchers; + std::map > > m_image_replayers; + std::unique_ptr m_status_watcher; + ReplayerAdminSocketHook *m_asok_hook; struct InitImageInfo { std::string global_id; - int64_t pool_id; std::string id; std::string name; - InitImageInfo(const std::string& global_id, int64_t pool_id = 0, - const std::string &id = "", const std::string &name = "") - : global_id(global_id), pool_id(pool_id), id(id), name(name) { + InitImageInfo(const std::string& global_id, const std::string &id = "", + const std::string &name = "") + : global_id(global_id), id(id), name(name) { } inline bool operator==(const InitImageInfo &rhs) const { - return (global_id == rhs.global_id && pool_id == rhs.pool_id && - id == rhs.id && name == rhs.name); + return (global_id == rhs.global_id && id == rhs.id && name == rhs.name); } inline bool operator<(const InitImageInfo &rhs) const { return global_id < rhs.global_id; } }; - std::map > m_init_images; + std::set m_init_images; class ReplayerThread : public Thread { Replayer *m_replayer; -- 2.39.5