]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cephfs-mirror: switch to using PeerReplayer class
authorVenky Shankar <vshankar@redhat.com>
Wed, 30 Sep 2020 09:41:23 +0000 (05:41 -0400)
committerVenky Shankar <vshankar@redhat.com>
Tue, 12 Jan 2021 10:56:52 +0000 (05:56 -0500)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/tools/cephfs_mirror/FSMirror.cc
src/tools/cephfs_mirror/FSMirror.h

index 9bd054fe6f82acc063692e642be7726043c6fa4a..15aa73f831c3fcbd2f1daafe560835f6d96579ff 100644 (file)
@@ -11,7 +11,9 @@
 #include "include/stringify.h"
 #include "msg/Messenger.h"
 #include "FSMirror.h"
+#include "PeerReplayer.h"
 #include "aio_utils.h"
+#include "Utils.h"
 
 #include "common/Cond.h"
 
@@ -86,7 +88,8 @@ private:
 
 FSMirror::FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id,
                    std::vector<const char*> args, ContextWQ *work_queue)
-  : m_filesystem(filesystem),
+  : m_cct(cct),
+    m_filesystem(filesystem),
     m_pool_id(pool_id),
     m_args(args),
     m_work_queue(work_queue),
@@ -108,104 +111,20 @@ FSMirror::~FSMirror() {
   delete m_asok_hook;
 }
 
-int FSMirror::connect(std::string_view client_name, std::string_view cluster_name,
-                      RadosRef *cluster) {
-  dout(20) << ": connecting to cluster=" << cluster_name << ", client=" << client_name
-           << dendl;
-
-  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
-  if (client_name.empty() || !iparams.name.from_str(client_name)) {
-    derr << ": error initializing cluster handle for " << cluster_name << dendl;
-    return -EINVAL;
-  }
-
-  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
-                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
-  cct->_conf->cluster = cluster_name;
-
-  int r = cct->_conf.parse_config_files(nullptr, nullptr, 0);
-  if (r < 0) {
-    derr << ": could not read ceph conf: " << ": " << cpp_strerror(r) << dendl;
-    return r;
-  }
-
-  cct->_conf.parse_env(cct->get_module_type());
-
-  std::vector<const char*> args;
-  r = cct->_conf.parse_argv(args);
-  if (r < 0) {
-    derr << ": could not parse environment: " << cpp_strerror(r) << dendl;
-    cct->put();
-    return r;
-  }
-  cct->_conf.parse_env(cct->get_module_type());
-
-  cluster->reset(new librados::Rados());
-
-  r = (*cluster)->init_with_context(cct);
-  ceph_assert(r == 0);
-  cct->put();
-
-  r = (*cluster)->connect();
-  if (r < 0) {
-    derr << ": error connecting to " << cluster_name << ": " << cpp_strerror(r)
-         << dendl;
-    return r;
-  }
-
-  dout(10) << ": connected to cluster=" << cluster_name << " using client="
-           << client_name << dendl;
-
-  return 0;
-}
-
-void FSMirror::run(PeerReplayer *peer_replayer) {
-  dout(20) << dendl;
-
-  std::unique_lock locker(m_lock);
-  while (true) {
-    dout(20) << ": trying to pick from " << m_directories.size() << " directories" << dendl;
-    m_cond.wait(locker, [this, peer_replayer]{return m_directories.size() || peer_replayer->is_stopping();});
-    if (peer_replayer->is_stopping()) {
-      dout(5) << ": exiting" << dendl;
-      break;
-    }
-
-    locker.unlock();
-    ::sleep(1);
-    locker.lock();
-  }
-}
-
-void FSMirror::init_replayers(PeerReplayer *peer_replayer) {
+int FSMirror::init_replayer(PeerReplayer *peer_replayer) {
   ceph_assert(ceph_mutex_is_locked(m_lock));
-
-  auto nr_replayers = g_ceph_context->_conf.get_val<uint64_t>(
-    "cephfs_mirror_max_concurrent_directory_syncs");
-  dout(20) << ": spawning " << nr_replayers << " snapshot replayer(s)" << dendl;
-
-  while (nr_replayers-- > 0) {
-    std::unique_ptr<SnapshotReplayerThread> replayer(
-      new SnapshotReplayerThread(this, peer_replayer));
-    std::string name("replayer-" + stringify(nr_replayers));
-    replayer->create(name.c_str());
-    peer_replayer->replayers.push_back(std::move(replayer));
-  }
+  return peer_replayer->init();
 }
 
-void FSMirror::shutdown_replayers(PeerReplayer *peer_replayer,
-                                  std::unique_lock<ceph::mutex> &locker) {
-  peer_replayer->stopping = true;
-  m_cond.notify_all();
-
-  locker.unlock();
-  // safe to iterate unlocked
-  for (auto &replayer : peer_replayer->replayers) {
-    replayer->join();
-  }
-  locker.lock();
+void FSMirror::shutdown_replayer(PeerReplayer *peer_replayer) {
+  peer_replayer->shutdown();
+}
 
-  peer_replayer->replayers.clear();
+void FSMirror::cleanup() {
+  dout(20) << dendl;
+  ceph_unmount(m_mount);
+  m_ioctx.close();
+  m_cluster.reset();
 }
 
 void FSMirror::init(Context *on_finish) {
@@ -219,17 +138,26 @@ void FSMirror::init(Context *on_finish) {
     return;
   }
 
-  m_addrs = m_cluster->get_addrs();
-  dout(10) << ": rados addrs=" << m_addrs << dendl;
-
   r = m_cluster->ioctx_create2(m_pool_id, m_ioctx);
   if (r < 0) {
+    m_cluster.reset();
     derr << ": error accessing local pool (id=" << m_pool_id << "): "
          << cpp_strerror(r) << dendl;
     on_finish->complete(r);
     return;
   }
 
+  r = mount(m_cluster, m_filesystem, true, &m_mount);
+  if (r < 0) {
+    m_ioctx.close();
+    m_cluster.reset();
+    on_finish->complete(r);
+    return;
+  }
+
+  m_addrs = m_cluster->get_addrs();
+  dout(10) << ": rados addrs=" << m_addrs << dendl;
+
   init_instance_watcher(on_finish);
 }
 
@@ -237,12 +165,12 @@ void FSMirror::shutdown(Context *on_finish) {
   dout(20) << dendl;
 
   {
-    std::unique_lock locker(m_lock);
+    std::scoped_lock locker(m_lock);
     m_stopping = true;
-    m_cond.notify_all();
     if (m_on_init_finish != nullptr) {
       dout(10) << ": delaying shutdown -- init in progress" << dendl;
       m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) {
+                                                 cleanup();
                                                  if (r < 0) {
                                                    on_finish->complete(0);
                                                    return;
@@ -254,14 +182,13 @@ void FSMirror::shutdown(Context *on_finish) {
     }
 
     m_on_shutdown_finish = on_finish;
+  }
 
-    for (auto &[peer, peer_replayer] : m_peer_replayers) {
+  for (auto &[peer, peer_replayer] : m_peer_replayers) {
       dout(5) << ": shutting down replayer for peer=" << peer << dendl;
-      shutdown_replayers(&peer_replayer, locker);
-    }
-    m_peer_replayers.clear();
+      shutdown_replayer(peer_replayer.get());
   }
-
+  m_peer_replayers.clear();
   shutdown_mirror_watcher();
 }
 
@@ -382,7 +309,10 @@ void FSMirror::handle_acquire_directory(string_view dir_path) {
 
   std::scoped_lock locker(m_lock);
   m_directories.emplace(dir_path);
-  m_cond.notify_all();
+  for (auto &[peer, peer_replayer] : m_peer_replayers) {
+    dout(10) << ": peer=" << peer << dendl;
+    peer_replayer->add_directory(dir_path);
+  }
 }
 
 void FSMirror::handle_release_directory(string_view dir_path) {
@@ -392,6 +322,10 @@ void FSMirror::handle_release_directory(string_view dir_path) {
   auto it = m_directories.find(dir_path);
   if (it != m_directories.end()) {
     m_directories.erase(it);
+    for (auto &[peer, peer_replayer] : m_peer_replayers) {
+      dout(10) << ": peer=" << peer << dendl;
+      peer_replayer->remove_directory(dir_path);
+    }
   }
 }
 
@@ -400,23 +334,37 @@ void FSMirror::add_peer(const Peer &peer) {
 
   std::scoped_lock locker(m_lock);
   m_all_peers.emplace(peer);
-  auto p = m_peer_replayers.emplace(peer, PeerReplayer());
-  ceph_assert(m_peer_replayers.size() == 1); // support only a single peer
-  if (p.second) {
-    init_replayers(&p.first->second);
+  if (m_peer_replayers.find(peer) != m_peer_replayers.end()) {
+    return;
+  }
+
+  auto replayer = std::make_unique<PeerReplayer>(
+    m_cct, this, m_filesystem, peer, m_directories, m_mount);
+  int r = init_replayer(replayer.get());
+  if (r < 0) {
+    return;
   }
+  m_peer_replayers.emplace(peer, std::move(replayer));
+  ceph_assert(m_peer_replayers.size() == 1); // support only a single peer
 }
 
 void FSMirror::remove_peer(const Peer &peer) {
   dout(10) << ": peer=" << peer << dendl;
 
-  std::unique_lock locker(m_lock);
-  m_all_peers.erase(peer);
-  auto it = m_peer_replayers.find(peer);
-  if (it != m_peer_replayers.end()) {
+  std::unique_ptr<PeerReplayer> replayer;
+  {
+    std::scoped_lock locker(m_lock);
+    m_all_peers.erase(peer);
+    auto it = m_peer_replayers.find(peer);
+    if (it != m_peer_replayers.end()) {
+      replayer = std::move(it->second);
+      m_peer_replayers.erase(it);
+    }
+  }
+
+  if (replayer) {
     dout(5) << ": shutting down replayers for peer=" << peer << dendl;
-    shutdown_replayers(&it->second, locker);
-    m_peer_replayers.erase(it);
+    shutdown_replayer(replayer.get());
   }
 }
 
index 7605529ca0f77f628bcc3d989509da527aea2ba0..f7939c143b7908c83990645a441ff9e4cd3e0b44 100644 (file)
@@ -17,6 +17,7 @@ namespace cephfs {
 namespace mirror {
 
 class MirrorAdminSocketHook;
+class PeerReplayer;
 
 // handle mirroring for a filesystem to a set of peers
 
@@ -89,45 +90,17 @@ private:
     }
   };
 
-  struct PeerReplayer;
-  class SnapshotReplayerThread : public Thread {
-  public:
-    SnapshotReplayerThread(FSMirror *fs_mirror, PeerReplayer *peer_replayer)
-      : m_fs_mirror(fs_mirror),
-        m_peer_replayer(peer_replayer) {
-    }
-
-    void *entry() override {
-      m_fs_mirror->run(m_peer_replayer);
-      return 0;
-    }
-
-  private:
-    FSMirror *m_fs_mirror;
-    PeerReplayer *m_peer_replayer;
-  };
-
-  typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers;
-  struct PeerReplayer {
-    SnapshotReplayers replayers;
-    bool stopping = false;
-
-    bool is_stopping() {
-      return stopping;
-    }
-  };
-
+  CephContext *m_cct;
   Filesystem m_filesystem;
   uint64_t m_pool_id;
   std::vector<const char *> m_args;
   ContextWQ *m_work_queue;
 
   ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::fs_mirror");
-  ceph::condition_variable m_cond;
   SnapListener m_snap_listener;
   std::set<std::string, std::less<>> m_directories;
   Peers m_all_peers;
-  std::map<Peer, PeerReplayer> m_peer_replayers;
+  std::map<Peer, std::unique_ptr<PeerReplayer>> m_peer_replayers;
 
   RadosRef m_cluster;
   std::string m_addrs;
@@ -143,13 +116,11 @@ private:
 
   MirrorAdminSocketHook *m_asok_hook = nullptr;
 
-  void run(PeerReplayer *peer_replayer);
-  void init_replayers(PeerReplayer *peer_replayer);
-  void shutdown_replayers(PeerReplayer *peer_replayer,
-                          std::unique_lock<ceph::mutex> &locker);
+  MountRef m_mount;
 
-  int connect(std::string_view cluster_name, std::string_view client_name,
-              RadosRef *cluster);
+  int init_replayer(PeerReplayer *peer_replayer);
+  void shutdown_replayer(PeerReplayer *peer_replayer);
+  void cleanup();
 
   void init_instance_watcher(Context *on_finish);
   void handle_init_instance_watcher(int r);