From: Jason Dillaman Date: Thu, 26 May 2016 20:03:59 +0000 (-0400) Subject: rbd-mirror: group peers by local pools X-Git-Tag: v11.0.0~378^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=020f0282a0771c0d321732a62f53cee10a193715;p=ceph.git rbd-mirror: group peers by local pools The peer structure is automatically unique per pool due to its UUID, so grouping local pools by a single peer doesn't work. Signed-off-by: Jason Dillaman --- diff --git a/src/test/rbd_mirror/test_ClusterWatcher.cc b/src/test/rbd_mirror/test_ClusterWatcher.cc index 2d7d3f2717f..204064ce35b 100644 --- a/src/test/rbd_mirror/test_ClusterWatcher.cc +++ b/src/test/rbd_mirror/test_ClusterWatcher.cc @@ -60,7 +60,7 @@ public: uuid != nullptr ? uuid : &gen_uuid, peer.cluster_name, peer.client_name)); - m_peer_configs[peer].insert(pool_id); + m_pool_peers[pool_id].insert(peer); m_mirrored_pools.insert(pool_name); } if (name != nullptr) { @@ -71,11 +71,11 @@ public: void delete_pool(const string &name, const peer_t &peer) { int64_t pool_id = m_cluster->pool_lookup(name.c_str()); ASSERT_GE(pool_id, 0); - if (m_peer_configs.find(peer) != m_peer_configs.end()) { - m_peer_configs[peer].erase(pool_id); + if (m_pool_peers.find(pool_id) != m_pool_peers.end()) { + m_pool_peers[pool_id].erase(peer); m_mirrored_pools.erase(name); - if (m_peer_configs[peer].empty()) { - m_peer_configs.erase(peer); + if (m_pool_peers[pool_id].empty()) { + m_pool_peers.erase(pool_id); } } m_pools.erase(name); @@ -121,7 +121,7 @@ public: void check_peers() { m_cluster_watcher->refresh_pools(); Mutex::Locker l(m_lock); - ASSERT_EQ(m_peer_configs, m_cluster_watcher->get_peer_configs()); + ASSERT_EQ(m_pool_peers, m_cluster_watcher->get_pool_peers()); ASSERT_EQ(m_mirrored_pools, m_cluster_watcher->get_pool_names()); } @@ -131,7 +131,7 @@ public: set m_pools; set m_mirrored_pools; - map > m_peer_configs; + ClusterWatcher::PoolPeers m_pool_peers; }; TEST_F(TestClusterWatcher, NoPools) { diff --git a/src/tools/rbd_mirror/ClusterWatcher.cc b/src/tools/rbd_mirror/ClusterWatcher.cc index 4a69573138a..516e1e3deef 100644 --- a/src/tools/rbd_mirror/ClusterWatcher.cc +++ b/src/tools/rbd_mirror/ClusterWatcher.cc @@ -32,13 +32,13 @@ ClusterWatcher::ClusterWatcher(RadosRef cluster, Mutex &lock) : { } -const map >& ClusterWatcher::get_peer_configs() const +const ClusterWatcher::PoolPeers& ClusterWatcher::get_pool_peers() const { assert(m_lock.is_locked()); - return m_peer_configs; + return m_pool_peers; } -const std::set& ClusterWatcher::get_pool_names() const +const ClusterWatcher::PoolNames& ClusterWatcher::get_pool_names() const { assert(m_lock.is_locked()); return m_pool_names; @@ -47,19 +47,20 @@ const std::set& ClusterWatcher::get_pool_names() const void ClusterWatcher::refresh_pools() { dout(20) << "enter" << dendl; - map > peer_configs; - set pool_names; - read_configs(&peer_configs, &pool_names); + + PoolPeers pool_peers; + PoolNames pool_names; + read_pool_peers(&pool_peers, &pool_names); Mutex::Locker l(m_lock); - m_peer_configs = peer_configs; + m_pool_peers = pool_peers; m_pool_names = pool_names; // TODO: perhaps use a workqueue instead, once we get notifications // about config changes for existing pools } -void ClusterWatcher::read_configs(map > *peer_configs, - set *pool_names) +void ClusterWatcher::read_pool_peers(PoolPeers *pool_peers, + PoolNames *pool_names) { list > pools; int r = m_cluster->pool_list2(pools); @@ -115,12 +116,7 @@ void ClusterWatcher::read_configs(map > *peer_configs, continue; } - for (peer_t peer : configs) { - dout(20) << "pool " << pool_name << " has mirroring enabled for peer " - << peer << dendl; - (*peer_configs)[peer].insert(pool_id); - } - + pool_peers->insert({pool_id, Peers{configs.begin(), configs.end()}}); pool_names->insert(pool_name); } } diff --git a/src/tools/rbd_mirror/ClusterWatcher.h b/src/tools/rbd_mirror/ClusterWatcher.h index d7087484a12..b21e49a4574 100644 --- a/src/tools/rbd_mirror/ClusterWatcher.h +++ b/src/tools/rbd_mirror/ClusterWatcher.h @@ -23,23 +23,27 @@ namespace mirror { */ class ClusterWatcher { public: + typedef std::set Peers; + typedef std::map PoolPeers; + typedef std::set PoolNames; + ClusterWatcher(RadosRef cluster, Mutex &lock); ~ClusterWatcher() = default; ClusterWatcher(const ClusterWatcher&) = delete; ClusterWatcher& operator=(const ClusterWatcher&) = delete; + // Caller controls frequency of calls void refresh_pools(); - const std::map >& get_peer_configs() const; - const std::set& get_pool_names() const; + const PoolPeers& get_pool_peers() const; + const PoolNames& get_pool_names() const; private: - void read_configs(std::map > *peer_configs, - std::set *pool_names); - Mutex &m_lock; RadosRef m_cluster; - std::map > m_peer_configs; - std::set m_pool_names; + PoolPeers m_pool_peers; + PoolNames m_pool_names; + + void read_pool_peers(PoolPeers *pool_peers, PoolNames *pool_names); }; } // namespace mirror diff --git a/src/tools/rbd_mirror/Mirror.cc b/src/tools/rbd_mirror/Mirror.cc index 02dda7cdad0..21dac318bf0 100644 --- a/src/tools/rbd_mirror/Mirror.cc +++ b/src/tools/rbd_mirror/Mirror.cc @@ -230,7 +230,7 @@ void Mirror::run() m_local_cluster_watcher->refresh_pools(); Mutex::Locker l(m_lock); if (!m_manual_stop) { - update_replayers(m_local_cluster_watcher->get_peer_configs()); + update_replayers(m_local_cluster_watcher->get_pool_peers()); } // TODO: make interval configurable m_cond.WaitInterval(g_ceph_context, m_lock, seconds(30)); @@ -338,35 +338,41 @@ void Mirror::flush() } } -void Mirror::update_replayers(const map > &peer_configs) +void Mirror::update_replayers(const PoolPeers &pool_peers) { dout(20) << "enter" << dendl; assert(m_lock.is_locked()); // remove stale replayers before creating new replayers for (auto it = m_replayers.begin(); it != m_replayers.end();) { - peer_t peer = it->first; - if (peer_configs.find(peer) == peer_configs.end()) { + auto next_it(it); + ++next_it; + + auto &peer = it->first.second; + auto pool_peer_it = pool_peers.find(it->first.first); + if (pool_peer_it == pool_peers.end() || + pool_peer_it->second.find(peer) == pool_peer_it->second.end()) { dout(20) << "removing replayer for " << peer << dendl; // TODO: make async - m_replayers.erase(it++); - } else { - ++it; + m_replayers.erase(it); } + it = next_it; } - for (auto &kv : peer_configs) { - const peer_t &peer = kv.first; - if (m_replayers.find(peer) == m_replayers.end()) { - dout(20) << "starting replayer for " << peer << dendl; - unique_ptr replayer(new Replayer(m_threads, m_image_deleter, - m_local, peer, m_args)); - // TODO: make async, and retry connecting within replayer - int r = replayer->init(); - if (r < 0) { - continue; + for (auto &kv : pool_peers) { + for (auto &peer : kv.second) { + PoolPeer pool_peer(kv.first, peer); + if (m_replayers.find(pool_peer) == m_replayers.end()) { + dout(20) << "starting replayer for " << peer << dendl; + unique_ptr replayer(new Replayer(m_threads, m_image_deleter, + m_local, peer, m_args)); + // TODO: make async, and retry connecting within replayer + int r = replayer->init(); + if (r < 0) { + continue; + } + m_replayers.insert(std::make_pair(pool_peer, std::move(replayer))); } - m_replayers.insert(std::make_pair(peer, std::move(replayer))); } } } diff --git a/src/tools/rbd_mirror/Mirror.h b/src/tools/rbd_mirror/Mirror.h index 20efe0d94cc..88f06698530 100644 --- a/src/tools/rbd_mirror/Mirror.h +++ b/src/tools/rbd_mirror/Mirror.h @@ -47,8 +47,10 @@ public: void flush(); private: - void refresh_peers(const set &peers); - void update_replayers(const map > &peer_configs); + typedef ClusterWatcher::PoolPeers PoolPeers; + typedef std::pair PoolPeer; + + void update_replayers(const PoolPeers &pool_peers); CephContext *m_cct; std::vector m_args; @@ -59,8 +61,8 @@ private: // monitor local cluster for config changes in peers std::unique_ptr m_local_cluster_watcher; - std::map > m_replayers; std::shared_ptr m_image_deleter; + std::map > m_replayers; atomic_t m_stopping; bool m_manual_stop = false; MirrorAdminSocketHook *m_asok_hook;