]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cephfs-mirror: spawn peer replayers on peer changes
authorVenky Shankar <vshankar@redhat.com>
Wed, 16 Sep 2020 04:23:00 +0000 (00:23 -0400)
committerVenky Shankar <vshankar@redhat.com>
Thu, 24 Sep 2020 12:18:11 +0000 (08:18 -0400)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/tools/cephfs_mirror/FSMirror.cc
src/tools/cephfs_mirror/FSMirror.h

index 68cb5789b31e8b96c37f3c68f725940fb8b62ec8..b2952cfac07c6192516ea3ff998944588cfbe260 100644 (file)
@@ -154,14 +154,15 @@ int FSMirror::connect(std::string_view client_name, std::string_view cluster_nam
   return 0;
 }
 
-void FSMirror::run() {
+void FSMirror::run(PeerReplayer *peer_replayer) {
   dout(20) << dendl;
 
   std::unique_lock locker(m_lock);
   while (true) {
     dout(20) << ": trying to pick from " << m_directories.size() << " directories" << dendl;
-    m_cond.wait(locker, [this]{return m_directories.size() || is_stopping();});
-    if (is_stopping()) {
+    m_cond.wait(locker, [this, peer_replayer]{return m_directories.size() || peer_replayer->is_stopping();});
+    if (peer_replayer->is_stopping()) {
+      dout(5) << ": exiting" << dendl;
       break;
     }
 
@@ -171,19 +172,35 @@ void FSMirror::run() {
   }
 }
 
-void FSMirror::init_replayers() {
-  std::scoped_lock locker(m_lock);
+void FSMirror::init_replayers(PeerReplayer *peer_replayer) {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
-  auto replayers = g_ceph_context->_conf.get_val<uint64_t>(
+  auto nr_replayers = g_ceph_context->_conf.get_val<uint64_t>(
     "cephfs_mirror_max_concurrent_directory_syncs");
-  dout(20) << ": spawning " << replayers << " snapshot replayer(s)" << dendl;
+  dout(20) << ": spawning " << nr_replayers << " snapshot replayer(s)" << dendl;
 
-  while (replayers-- > 0) {
-    std::unique_ptr<SnapshotReplayer> replayer(new SnapshotReplayer(this));
-    std::string name("replayer-" + stringify(replayers));
+  while (nr_replayers-- > 0) {
+    std::unique_ptr<SnapshotReplayerThread> replayer(
+      new SnapshotReplayerThread(this, peer_replayer));
+    std::string name("replayer-" + stringify(nr_replayers));
     replayer->create(name.c_str());
-    m_snapshot_replayers.push_back(std::move(replayer));
+    peer_replayer->replayers.push_back(std::move(replayer));
+  }
+}
+
+void FSMirror::shutdown_replayers(PeerReplayer *peer_replayer,
+                                  std::unique_lock<ceph::mutex> &locker) {
+  peer_replayer->stopping = true;
+  m_cond.notify_all();
+
+  locker.unlock();
+  // safe to iterate unlocked
+  for (auto &replayer : peer_replayer->replayers) {
+    replayer->join();
   }
+  locker.lock();
+
+  peer_replayer->replayers.clear();
 }
 
 void FSMirror::init(Context *on_finish) {
@@ -215,7 +232,7 @@ void FSMirror::shutdown(Context *on_finish) {
   dout(20) << dendl;
 
   {
-    std::scoped_lock locker(m_lock);
+    std::unique_lock locker(m_lock);
     m_stopping = true;
     m_cond.notify_all();
     if (m_on_init_finish != nullptr) {
@@ -232,9 +249,15 @@ void FSMirror::shutdown(Context *on_finish) {
     }
 
     m_on_shutdown_finish = on_finish;
+
+    for (auto &[peer, peer_replayer] : m_peer_replayers) {
+      dout(5) << ": shutting down replayer for peer=" << peer << dendl;
+      shutdown_replayers(&peer_replayer, locker);
+    }
+    m_peer_replayers.clear();
   }
 
-  wait_for_replayers();
+  shutdown_mirror_watcher();
 }
 
 void FSMirror::init_instance_watcher(Context *on_finish) {
@@ -305,17 +328,6 @@ void FSMirror::handle_init_mirror_watcher(int r) {
   shutdown_instance_watcher();
 }
 
-void FSMirror::wait_for_replayers() {
-  dout(20) << dendl;
-
-  for (auto &replayer : m_snapshot_replayers) {
-    replayer->join();
-  }
-
-  m_snapshot_replayers.clear();
-  shutdown_mirror_watcher();
-}
-
 void FSMirror::shutdown_mirror_watcher() {
   dout(20) << dendl;
 
@@ -382,22 +394,30 @@ void FSMirror::add_peer(const Peer &peer) {
   dout(10) << ": peer=" << peer << dendl;
 
   std::scoped_lock locker(m_lock);
-  m_peers.emplace(peer);
-  ceph_assert(m_peers.size() == 1); // support only a single peer
+  auto p = m_peer_replayers.emplace(peer, PeerReplayer());
+  ceph_assert(m_peer_replayers.size() == 1); // support only a single peer
+  if (p.second) {
+    init_replayers(&p.first->second);
+  }
 }
 
 void FSMirror::remove_peer(const Peer &peer) {
   dout(10) << ": peer=" << peer << dendl;
 
-  std::scoped_lock locker(m_lock);
-  m_peers.erase(peer);
+  std::unique_lock locker(m_lock);
+  auto it = m_peer_replayers.find(peer);
+  if (it != m_peer_replayers.end()) {
+    dout(5) << ": shutting down replayers for peer=" << peer << dendl;
+    shutdown_replayers(&it->second, locker);
+  }
+  m_peer_replayers.erase(it);
 }
 
 void FSMirror::mirror_status(Formatter *f) {
   std::scoped_lock locker(m_lock);
   f->open_object_section("status");
   f->open_object_section("peers");
-  for (auto &peer : m_peers) {
+  for ([[maybe_unused]] auto &[peer, peer_replayer] : m_peer_replayers) {
     peer.dump(f);
   }
   f->close_section(); // peers
index 28e5d127a6512df26603bfdee7be704d7d75fb58..993b291482b493924031df201fa804ef3849bbeb 100644 (file)
@@ -56,19 +56,32 @@ private:
     }
   };
 
-  class SnapshotReplayer : public Thread {
+  struct PeerReplayer;
+  class SnapshotReplayerThread : public Thread {
   public:
-    SnapshotReplayer(FSMirror *fs_mirror)
-      : m_fs_mirror(fs_mirror) {
+    SnapshotReplayerThread(FSMirror *fs_mirror, PeerReplayer *peer_replayer)
+      : m_fs_mirror(fs_mirror),
+        m_peer_replayer(peer_replayer) {
     }
 
     void *entry() override {
-      m_fs_mirror->run();
+      m_fs_mirror->run(m_peer_replayer);
       return 0;
     }
 
   private:
     FSMirror *m_fs_mirror;
+    PeerReplayer *m_peer_replayer;
+  };
+
+  typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers;
+  struct PeerReplayer {
+    SnapshotReplayers replayers;
+    bool stopping = false;
+
+    bool is_stopping() {
+      return stopping;
+    }
   };
 
   std::string m_fs_name;
@@ -79,9 +92,8 @@ private:
   ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::fs_mirror");
   ceph::condition_variable m_cond;
   SnapListener m_snap_listener;
-  std::set<Peer> m_peers;
   std::set<std::string, std::less<>> m_directories;
-  std::vector<std::unique_ptr<SnapshotReplayer>> m_snapshot_replayers;
+  std::map<Peer, PeerReplayer> m_peer_replayers;
 
   RadosRef m_cluster;
   std::string m_addrs;
@@ -91,14 +103,16 @@ private:
 
   int m_retval = 0;
   bool m_stopping = false;
+  bool m_init_failed = false;
   Context *m_on_init_finish = nullptr;
   Context *m_on_shutdown_finish = nullptr;
 
   MirrorAdminSocketHook *m_asok_hook = nullptr;
 
-  void run();
-  void init_replayers();
-  void wait_for_replayers();
+  void run(PeerReplayer *peer_replayer);
+  void init_replayers(PeerReplayer *peer_replayer);
+  void shutdown_replayers(PeerReplayer *peer_replayer,
+                          std::unique_lock<ceph::mutex> &locker);
 
   int connect(std::string_view cluster_name, std::string_view client_name,
               RadosRef *cluster);