]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: group peers by local pools
authorJason Dillaman <dillaman@redhat.com>
Thu, 26 May 2016 20:03:59 +0000 (16:03 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 27 May 2016 03:50:04 +0000 (23:50 -0400)
The peer structure is automatically unique per pool due to its
UUID, so grouping local pools by a single peer doesn't work.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/rbd_mirror/test_ClusterWatcher.cc
src/tools/rbd_mirror/ClusterWatcher.cc
src/tools/rbd_mirror/ClusterWatcher.h
src/tools/rbd_mirror/Mirror.cc
src/tools/rbd_mirror/Mirror.h

index 2d7d3f2717fc7495354c53ce1110011c92e0d5c6..204064ce35bee18eb28463a1727c3a3e2a2f42c7 100644 (file)
@@ -60,7 +60,7 @@ public:
                                            uuid != nullptr ? uuid : &gen_uuid,
                                           peer.cluster_name,
                                           peer.client_name));
-      m_peer_configs[peer].insert(pool_id);
+      m_pool_peers[pool_id].insert(peer);
       m_mirrored_pools.insert(pool_name);
     }
     if (name != nullptr) {
@@ -71,11 +71,11 @@ public:
   void delete_pool(const string &name, const peer_t &peer) {
     int64_t pool_id = m_cluster->pool_lookup(name.c_str());
     ASSERT_GE(pool_id, 0);
-    if (m_peer_configs.find(peer) != m_peer_configs.end()) {
-      m_peer_configs[peer].erase(pool_id);
+    if (m_pool_peers.find(pool_id) != m_pool_peers.end()) {
+      m_pool_peers[pool_id].erase(peer);
       m_mirrored_pools.erase(name);
-      if (m_peer_configs[peer].empty()) {
-       m_peer_configs.erase(peer);
+      if (m_pool_peers[pool_id].empty()) {
+       m_pool_peers.erase(pool_id);
       }
     }
     m_pools.erase(name);
@@ -121,7 +121,7 @@ public:
   void check_peers() {
     m_cluster_watcher->refresh_pools();
     Mutex::Locker l(m_lock);
-    ASSERT_EQ(m_peer_configs, m_cluster_watcher->get_peer_configs());
+    ASSERT_EQ(m_pool_peers, m_cluster_watcher->get_pool_peers());
     ASSERT_EQ(m_mirrored_pools, m_cluster_watcher->get_pool_names());
   }
 
@@ -131,7 +131,7 @@ public:
 
   set<string> m_pools;
   set<string> m_mirrored_pools;
-  map<peer_t, set<int64_t> > m_peer_configs;
+  ClusterWatcher::PoolPeers m_pool_peers;
 };
 
 TEST_F(TestClusterWatcher, NoPools) {
index 4a69573138a0d6de7b333f0c4d3bbef6d1105122..516e1e3deef91bde16774d421171551c635e4d38 100644 (file)
@@ -32,13 +32,13 @@ ClusterWatcher::ClusterWatcher(RadosRef cluster, Mutex &lock) :
 {
 }
 
-const map<peer_t, set<int64_t> >& ClusterWatcher::get_peer_configs() const
+const ClusterWatcher::PoolPeers& ClusterWatcher::get_pool_peers() const
 {
   assert(m_lock.is_locked());
-  return m_peer_configs;
+  return m_pool_peers;
 }
 
-const std::set<std::string>& ClusterWatcher::get_pool_names() const
+const ClusterWatcher::PoolNames& ClusterWatcher::get_pool_names() const
 {
   assert(m_lock.is_locked());
   return m_pool_names;
@@ -47,19 +47,20 @@ const std::set<std::string>& ClusterWatcher::get_pool_names() const
 void ClusterWatcher::refresh_pools()
 {
   dout(20) << "enter" << dendl;
-  map<peer_t, set<int64_t> > peer_configs;
-  set<string> pool_names;
-  read_configs(&peer_configs, &pool_names);
+
+  PoolPeers pool_peers;
+  PoolNames pool_names;
+  read_pool_peers(&pool_peers, &pool_names);
 
   Mutex::Locker l(m_lock);
-  m_peer_configs = peer_configs;
+  m_pool_peers = pool_peers;
   m_pool_names = pool_names;
   // TODO: perhaps use a workqueue instead, once we get notifications
   // about config changes for existing pools
 }
 
-void ClusterWatcher::read_configs(map<peer_t, set<int64_t> > *peer_configs,
-                                 set<string> *pool_names)
+void ClusterWatcher::read_pool_peers(PoolPeers *pool_peers,
+                                    PoolNames *pool_names)
 {
   list<pair<int64_t, string> > pools;
   int r = m_cluster->pool_list2(pools);
@@ -115,12 +116,7 @@ void ClusterWatcher::read_configs(map<peer_t, set<int64_t> > *peer_configs,
       continue;
     }
 
-    for (peer_t peer : configs) {
-      dout(20) << "pool " << pool_name << " has mirroring enabled for peer "
-              << peer << dendl;
-      (*peer_configs)[peer].insert(pool_id);
-    }
-
+    pool_peers->insert({pool_id, Peers{configs.begin(), configs.end()}});
     pool_names->insert(pool_name);
   }
 }
index d7087484a12eee99278922d374ea326e19a694d2..b21e49a45743931aa008db2355491352ea0883e3 100644 (file)
@@ -23,23 +23,27 @@ namespace mirror {
  */
 class ClusterWatcher {
 public:
+  typedef std::set<peer_t> Peers;
+  typedef std::map<int64_t, Peers>  PoolPeers;
+  typedef std::set<std::string> PoolNames;
+
   ClusterWatcher(RadosRef cluster, Mutex &lock);
   ~ClusterWatcher() = default;
   ClusterWatcher(const ClusterWatcher&) = delete;
   ClusterWatcher& operator=(const ClusterWatcher&) = delete;
+
   // Caller controls frequency of calls
   void refresh_pools();
-  const std::map<peer_t, std::set<int64_t> >& get_peer_configs() const;
-  const std::set<std::string>& get_pool_names() const;
+  const PoolPeers& get_pool_peers() const;
+  const PoolNames& get_pool_names() const;
 
 private:
-  void read_configs(std::map<peer_t, std::set<int64_t> > *peer_configs,
-                   std::set<std::string> *pool_names);
-
   Mutex &m_lock;
   RadosRef m_cluster;
-  std::map<peer_t, std::set<int64_t> > m_peer_configs;
-  std::set<std::string> m_pool_names;
+  PoolPeers m_pool_peers;
+  PoolNames m_pool_names;
+
+  void read_pool_peers(PoolPeers *pool_peers, PoolNames *pool_names);
 };
 
 } // namespace mirror
index 02dda7cdad065354356b82dc8cc59927e6b9abc5..21dac318bf064194c2b33877fed620e144f52e04 100644 (file)
@@ -230,7 +230,7 @@ void Mirror::run()
     m_local_cluster_watcher->refresh_pools();
     Mutex::Locker l(m_lock);
     if (!m_manual_stop) {
-      update_replayers(m_local_cluster_watcher->get_peer_configs());
+      update_replayers(m_local_cluster_watcher->get_pool_peers());
     }
     // TODO: make interval configurable
     m_cond.WaitInterval(g_ceph_context, m_lock, seconds(30));
@@ -338,35 +338,41 @@ void Mirror::flush()
   }
 }
 
-void Mirror::update_replayers(const map<peer_t, set<int64_t> > &peer_configs)
+void Mirror::update_replayers(const PoolPeers &pool_peers)
 {
   dout(20) << "enter" << dendl;
   assert(m_lock.is_locked());
 
   // remove stale replayers before creating new replayers
   for (auto it = m_replayers.begin(); it != m_replayers.end();) {
-    peer_t peer = it->first;
-    if (peer_configs.find(peer) == peer_configs.end()) {
+    auto next_it(it);
+    ++next_it;
+
+    auto &peer = it->first.second;
+    auto pool_peer_it = pool_peers.find(it->first.first);
+    if (pool_peer_it == pool_peers.end() ||
+        pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
       dout(20) << "removing replayer for " << peer << dendl;
       // TODO: make async
-      m_replayers.erase(it++);
-    } else {
-      ++it;
+      m_replayers.erase(it);
     }
+    it = next_it;
   }
 
-  for (auto &kv : peer_configs) {
-    const peer_t &peer = kv.first;
-    if (m_replayers.find(peer) == m_replayers.end()) {
-      dout(20) << "starting replayer for " << peer << dendl;
-      unique_ptr<Replayer> replayer(new Replayer(m_threads, m_image_deleter,
-                                                 m_local, peer, m_args));
-      // TODO: make async, and retry connecting within replayer
-      int r = replayer->init();
-      if (r < 0) {
-       continue;
+  for (auto &kv : pool_peers) {
+    for (auto &peer : kv.second) {
+      PoolPeer pool_peer(kv.first, peer);
+      if (m_replayers.find(pool_peer) == m_replayers.end()) {
+        dout(20) << "starting replayer for " << peer << dendl;
+        unique_ptr<Replayer> replayer(new Replayer(m_threads, m_image_deleter,
+                                                   m_local, peer, m_args));
+        // TODO: make async, and retry connecting within replayer
+        int r = replayer->init();
+        if (r < 0) {
+         continue;
+        }
+        m_replayers.insert(std::make_pair(pool_peer, std::move(replayer)));
       }
-      m_replayers.insert(std::make_pair(peer, std::move(replayer)));
     }
   }
 }
index 20efe0d94cc13e950e10570ad7065b1b3287a41f..88f0669853013d0f47a98e5379d233ff256e136f 100644 (file)
@@ -47,8 +47,10 @@ public:
   void flush();
 
 private:
-  void refresh_peers(const set<peer_t> &peers);
-  void update_replayers(const map<peer_t, set<int64_t> > &peer_configs);
+  typedef ClusterWatcher::PoolPeers PoolPeers;
+  typedef std::pair<int64_t, peer_t> PoolPeer;
+
+  void update_replayers(const PoolPeers &pool_peers);
 
   CephContext *m_cct;
   std::vector<const char*> m_args;
@@ -59,8 +61,8 @@ private:
 
   // monitor local cluster for config changes in peers
   std::unique_ptr<ClusterWatcher> m_local_cluster_watcher;
-  std::map<peer_t, std::unique_ptr<Replayer> > m_replayers;
   std::shared_ptr<ImageDeleter> m_image_deleter;
+  std::map<PoolPeer, std::unique_ptr<Replayer> > m_replayers;
   atomic_t m_stopping;
   bool m_manual_stop = false;
   MirrorAdminSocketHook *m_asok_hook;