]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cephfs-mirror: periodically drive mirror enable, disable and peer updates
authorVenky Shankar <vshankar@redhat.com>
Wed, 16 Sep 2020 07:23:05 +0000 (03:23 -0400)
committerVenky Shankar <vshankar@redhat.com>
Thu, 24 Sep 2020 12:18:11 +0000 (08:18 -0400)
... and switch using (name, id) tuple to identify a filesystem.

Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/common/options.cc
src/tools/cephfs_mirror/FSMirror.cc
src/tools/cephfs_mirror/FSMirror.h
src/tools/cephfs_mirror/Mirror.cc
src/tools/cephfs_mirror/Mirror.h

index 286d5bf7719dd6c8a155b5d0c80dbb66ce82a44a..e2ca57afbf145398134fb30be13c30b1a30d72fd 100644 (file)
@@ -8778,6 +8778,13 @@ std::vector<Option> get_cephfs_mirror_options() {
     .set_default("random")
     .set_description("policy for choosing directories to mirror snapshots")
     .set_long_description("policy used by cephfs-mirror daemon to choose directories for snapshot mirroring"),
+
+    Option("cephfs_mirror_mirror_action_update_interval", Option::TYPE_SECS, Option::LEVEL_ADVANCED)
+    .set_default(2)
+    .set_min(1)
+    .set_description("")
+    .set_long_description(""),
+
     });
 }
 
index e6563bdd1c47de2b746f1da7ef60ad2c580a643e..c865d943c5c1dcba9cbf33b172a7e507d6ec22e4 100644 (file)
@@ -50,12 +50,13 @@ private:
 
 class MirrorAdminSocketHook : public AdminSocketHook {
 public:
-  MirrorAdminSocketHook(CephContext *cct, std::string_view fs_name, FSMirror *fs_mirror)
+  MirrorAdminSocketHook(CephContext *cct, const Filesystem &filesystem, FSMirror *fs_mirror)
     : admin_socket(cct->get_admin_socket()) {
     int r;
     std::string cmd;
 
-    cmd = "fs mirror status " + std::string(fs_name);
+    // mirror status format is name@fscid
+    cmd = "fs mirror status " + stringify(filesystem.fs_name) + "@" + stringify(filesystem.fscid);
     r = admin_socket->register_command(
       cmd, this, "get filesystem mirror status");
     if (r == 0) {
@@ -83,14 +84,14 @@ private:
   Commands commands;
 };
 
-FSMirror::FSMirror(CephContext *cct, std::string_view fs_name, uint64_t pool_id,
+FSMirror::FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id,
                    std::vector<const char*> args, ContextWQ *work_queue)
-  : m_fs_name(fs_name),
+  : m_filesystem(filesystem),
     m_pool_id(pool_id),
     m_args(args),
     m_work_queue(work_queue),
     m_snap_listener(this),
-    m_asok_hook(new MirrorAdminSocketHook(cct, fs_name, this)) {
+    m_asok_hook(new MirrorAdminSocketHook(cct, filesystem, this)) {
 }
 
 FSMirror::~FSMirror() {
index d5584497a630f63dc6bb45b247862aabb02ee657..cef0e57d7d867d6b675eea4ab16fe14d044a23e0 100644 (file)
@@ -22,7 +22,7 @@ class MirrorAdminSocketHook;
 
 class FSMirror {
 public:
-  FSMirror(CephContext *cct, std::string_view fs_name, uint64_t pool_id,
+  FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id,
            std::vector<const char*> args, ContextWQ *work_queue);
   ~FSMirror();
 
@@ -90,7 +90,7 @@ private:
     }
   };
 
-  std::string m_fs_name;
+  Filesystem m_filesystem;
   uint64_t m_pool_id;
   std::vector<const char *> m_args;
   ContextWQ *m_work_queue;
index 9eb1ba371246e5e8ba6486877ec730e8a46b1bbe..c40349799c894a65657c7cff84679c45fcb1af00 100644 (file)
@@ -29,7 +29,6 @@ namespace {
 
 class SafeTimerSingleton : public SafeTimer {
 public:
-  SafeTimer *timer;
   ceph::mutex timer_lock = ceph::make_mutex("cephfs::mirror::timer_lock");
 
   explicit SafeTimerSingleton(CephContext *cct)
@@ -64,6 +63,97 @@ public:
 
 } // anonymous namespace
 
+struct Mirror::C_EnableMirroring : Context {
+  Mirror *mirror;
+  Filesystem filesystem;
+  uint64_t pool_id;
+
+  C_EnableMirroring(Mirror *mirror, const Filesystem &filesystem, uint64_t pool_id)
+    : mirror(mirror),
+      filesystem(filesystem),
+      pool_id(pool_id) {
+  }
+
+  void finish(int r) override {
+    enable_mirroring();
+  }
+
+  void enable_mirroring() {
+    Context *ctx = new C_CallbackAdapter<C_EnableMirroring,
+                                         &C_EnableMirroring::handle_enable_mirroring>(this);
+    mirror->enable_mirroring(filesystem, pool_id, ctx);
+  }
+
+  void handle_enable_mirroring(int r) {
+    mirror->handle_enable_mirroring(filesystem, r);
+    delete this;
+  }
+
+  // context needs to live post completion
+  void complete(int r) override {
+    finish(r);
+  }
+};
+
+struct Mirror::C_DisableMirroring : Context {
+  Mirror *mirror;
+  Filesystem filesystem;
+
+  C_DisableMirroring(Mirror *mirror, const Filesystem &filesystem)
+    : mirror(mirror),
+      filesystem(filesystem) {
+  }
+
+  void finish(int r) override {
+    disable_mirroring();
+  }
+
+  void disable_mirroring() {
+    Context *ctx = new C_CallbackAdapter<C_DisableMirroring,
+                                         &C_DisableMirroring::handle_disable_mirroring>(this);
+    mirror->disable_mirroring(filesystem, ctx);
+  }
+
+  void handle_disable_mirroring(int r) {
+    mirror->handle_disable_mirroring(filesystem, r);
+    delete this;
+  }
+
+  // context needs to live post completion
+  void complete(int r) override {
+    finish(r);
+  }
+};
+
+struct Mirror::C_PeerUpdate : Context {
+  Mirror *mirror;
+  Filesystem filesystem;
+  Peer peer;
+  bool remove = false;
+
+  C_PeerUpdate(Mirror *mirror, const Filesystem &filesystem,
+               const Peer &peer)
+    : mirror(mirror),
+      filesystem(filesystem),
+      peer(peer) {
+  }
+  C_PeerUpdate(Mirror *mirror, const Filesystem &filesystem,
+               const Peer &peer, bool remove)
+    : mirror(mirror),
+      filesystem(filesystem),
+      peer(peer),
+      remove(remove) {
+  }
+
+  void finish(int r) override {
+    if (remove) {
+      mirror->remove_peer(filesystem, peer);
+    } else {
+      mirror->add_peer(filesystem, peer);
+    }
+  }
+};
+
 Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args,
                 MonClient *monc, Messenger *msgr)
   : m_cct(cct),
@@ -76,7 +166,10 @@ Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args,
   auto safe_timer = &(cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
                         "cephfs::mirror::safe_timer", false, cct));
   m_work_queue = thread_pool->work_queue;
-  m_timer = safe_timer->timer;
+  m_timer = safe_timer;
+  m_timer_lock = &safe_timer->timer_lock;
+  std::scoped_lock timer_lock(*m_timer_lock);
+  schedule_mirror_update_task();
 }
 
 Mirror::~Mirror() {
@@ -122,7 +215,7 @@ void Mirror::shutdown() {
   dout(20) << dendl;
 
   std::unique_lock locker(m_lock);
-  if (m_fs_mirrors.empty()) {
+  if (m_mirror_actions.empty()) {
     return;
   }
 
@@ -138,112 +231,180 @@ void Mirror::handle_signal(int signum) {
   ::exit(0);
 }
 
-void Mirror::handle_mirroring_enabled(const std::string &fs_name, int r) {
-  dout(20) << ": fs_name=" << fs_name << ", r=" << r << dendl;
+void Mirror::handle_mirroring_enabled(const Filesystem &filesystem, int r) {
+  dout(20) << ": filesystem=" << filesystem << ", r=" << r << dendl;
 
   std::scoped_lock locker(m_lock);
+  auto &mirror_action = m_mirror_actions.at(filesystem);
+  ceph_assert(mirror_action.action_in_progress);
+
+  mirror_action.action_in_progress = false;
+  m_cond.notify_all();
   if (r < 0) {
-    derr << ": failed to initialize FSMirror for filesystem=" << fs_name
+    derr << ": failed to initialize FSMirror for filesystem=" << filesystem
          << ": " << cpp_strerror(r) << dendl;
-    if (!m_stopping) {
-      m_fs_mirrors.erase(fs_name);
-    }
-
     return;
   }
 
-  dout(10) << ": Initialized FSMirror for filesystem=" << fs_name << dendl;
+  dout(10) << ": Initialized FSMirror for filesystem=" << filesystem << dendl;
+}
+
+void Mirror::enable_mirroring(const Filesystem &filesystem, uint64_t local_pool_id,
+                              Context *on_finish) {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  auto &mirror_action = m_mirror_actions.at(filesystem);
+  ceph_assert(!mirror_action.fs_mirror);
+  ceph_assert(!mirror_action.action_in_progress);
+
+  dout(10) << ": starting FSMirror: filesystem=" << filesystem << dendl;
+
+  mirror_action.action_in_progress = true;
+  mirror_action.fs_mirror = std::make_unique<FSMirror>(m_cct, filesystem, local_pool_id,
+                                                       m_args, m_work_queue);
+  mirror_action.fs_mirror->init(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
 }
 
-void Mirror::mirroring_enabled(const std::string &fs_name, uint64_t local_pool_id) {
-  dout(10) << ": fs_name=" << fs_name << ", pool_id=" << local_pool_id << dendl;
+void Mirror::mirroring_enabled(const Filesystem &filesystem, uint64_t local_pool_id) {
+  dout(10) << ": filesystem=" << filesystem << ", pool_id=" << local_pool_id << dendl;
 
   std::scoped_lock locker(m_lock);
   if (m_stopping) {
     return;
   }
 
-  // TODO: handle consecutive overlapping enable/disable calls
-  ceph_assert(m_fs_mirrors.find(fs_name) == m_fs_mirrors.end());
-
-  dout(10) << ": starting FSMirror: fs_name=" << fs_name << dendl;
-  std::unique_ptr<FSMirror> fs_mirror(new FSMirror(m_cct, fs_name, local_pool_id,
-                                                   m_args, m_work_queue));
-  Context *on_finish = new LambdaContext([this, fs_name](int r) {
-                                           handle_mirroring_enabled(fs_name, r);
-                                         });
-  fs_mirror->init(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
-  m_fs_mirrors.emplace(fs_name, std::move(fs_mirror));
+  auto p = m_mirror_actions.emplace(filesystem, MirrorAction());
+  auto &mirror_action = p.first->second;
+  mirror_action.action_ctxs.push_back(new C_EnableMirroring(this, filesystem, local_pool_id));
 }
 
-void Mirror::mirroring_disabled(const std::string &fs_name) {
-  dout(10) << ": fs_name=" << fs_name << dendl;
+void Mirror::handle_disable_mirroring(const Filesystem &filesystem, int r) {
+  dout(10) << ": filesystem=" << filesystem << ", r=" << r << dendl;
 
   std::scoped_lock locker(m_lock);
-  if (!m_fs_mirrors.count(fs_name)) {
-    dout(5) << ": fs mirror not found -- init failure(?) for " << fs_name
-            << dendl;
-    return;
-  }
+  auto &mirror_action = m_mirror_actions.at(filesystem);
 
-  if (m_stopping) {
-    dout(5) << "shutting down" << dendl;
-    return;
+  if (!mirror_action.fs_mirror->is_failed()) {
+    ceph_assert(mirror_action.action_in_progress);
+    mirror_action.action_in_progress = false;
+    m_cond.notify_all();
   }
 
-  auto &fs_mirror = m_fs_mirrors.at(fs_name);
-  if (!fs_mirror->is_stopping()) {
-    Context *on_finish = new LambdaContext([this, fs_name](int r) {
-                                             handle_shutdown(fs_name, r);
-                                           });
-    fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
+  if (!m_stopping) {
+    mirror_action.fs_mirror.reset();
+    if (mirror_action.action_ctxs.empty()) {
+      dout(10) << ": no pending actions for filesystem=" << filesystem << dendl;
+      m_mirror_actions.erase(filesystem);
+    }
   }
 }
 
-void Mirror::peer_added(const std::string &fs_name, const Peer &peer) {
-  dout(20) << ": fs_name=" << fs_name << ", peer=" << peer << dendl;
+void Mirror::disable_mirroring(const Filesystem &filesystem, Context *on_finish) {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
-  std::scoped_lock locker(m_lock);
-  if (!m_fs_mirrors.count(fs_name)) {
-    dout(5) << ": fs mirror not found -- init failure(?) for " << fs_name
-            << dendl;
+  auto &mirror_action = m_mirror_actions.at(filesystem);
+  ceph_assert(mirror_action.fs_mirror);
+  ceph_assert(!mirror_action.action_in_progress);
+
+  if (mirror_action.fs_mirror->is_failed()) {
+    dout(10) << ": init failed for filesystem=" << filesystem << dendl;
+    m_work_queue->queue(on_finish, -EINVAL);
     return;
   }
 
+  mirror_action.action_in_progress = true;
+  mirror_action.fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
+}
+
+void Mirror::mirroring_disabled(const Filesystem &filesystem) {
+  dout(10) << ": filesystem=" << filesystem << dendl;
+
+  std::scoped_lock locker(m_lock);
   if (m_stopping) {
     dout(5) << "shutting down" << dendl;
     return;
   }
 
-  auto &fs_mirror = m_fs_mirrors.at(fs_name);
-  fs_mirror->add_peer(peer);
+  auto &mirror_action = m_mirror_actions.at(filesystem);
+  mirror_action.action_ctxs.push_back(new C_DisableMirroring(this, filesystem));
+}
+
+void Mirror::add_peer(const Filesystem &filesystem, const Peer &peer) {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  auto &mirror_action = m_mirror_actions.at(filesystem);
+  ceph_assert(mirror_action.fs_mirror);
+  ceph_assert(!mirror_action.action_in_progress);
+
+  mirror_action.fs_mirror->add_peer(peer);
 }
 
-void Mirror::peer_removed(const std::string &fs_name, const Peer &peer) {
-  dout(20) << ": fs_name=" << fs_name << ", peer=" << peer << dendl;
+void Mirror::peer_added(const Filesystem &filesystem, const Peer &peer) {
+  dout(20) << ": filesystem=" << filesystem << ", peer=" << peer << dendl;
 
   std::scoped_lock locker(m_lock);
-  if (!m_fs_mirrors.count(fs_name)) {
-    dout(5) << ": fs mirror not found -- init failure(?) for " << fs_name
-            << dendl;
+  if (m_stopping) {
+    dout(5) << "shutting down" << dendl;
     return;
   }
 
+  auto &mirror_action = m_mirror_actions.at(filesystem);
+  mirror_action.action_ctxs.push_back(new C_PeerUpdate(this, filesystem, peer));
+}
+
+void Mirror::remove_peer(const Filesystem &filesystem, const Peer &peer) {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  auto &mirror_action = m_mirror_actions.at(filesystem);
+  ceph_assert(mirror_action.fs_mirror);
+  ceph_assert(!mirror_action.action_in_progress);
+
+  mirror_action.fs_mirror->remove_peer(peer);
+}
+
+void Mirror::peer_removed(const Filesystem &filesystem, const Peer &peer) {
+  dout(20) << ": filesystem=" << filesystem << ", peer=" << peer << dendl;
+
+  std::scoped_lock locker(m_lock);
   if (m_stopping) {
     dout(5) << "shutting down" << dendl;
     return;
   }
 
-  auto &fs_mirror = m_fs_mirrors.at(fs_name);
-  fs_mirror->remove_peer(peer);
+  auto &mirror_action = m_mirror_actions.at(filesystem);
+  mirror_action.action_ctxs.push_back(new C_PeerUpdate(this, filesystem, peer, true));
 }
 
-void Mirror::handle_shutdown(const std::string &fs_name, int r) {
-  dout(10) << ": fs_name=" << fs_name << ", r=" << r << dendl;
+void Mirror::update_fs_mirrors() {
+  dout(20) << dendl;
 
-  std::scoped_lock locker(m_lock);
-  m_fs_mirrors.erase(fs_name);
-  m_cond.notify_all();
+  {
+    std::scoped_lock locker(m_lock);
+    for (auto &[filesystem, mirror_action] : m_mirror_actions) {
+      if (!mirror_action.action_ctxs.empty() && !mirror_action.action_in_progress) {
+        auto ctx = std::move(mirror_action.action_ctxs.front());
+        mirror_action.action_ctxs.pop_front();
+        ctx->complete(0);
+      }
+    }
+  }
+
+  schedule_mirror_update_task();
+}
+
+void Mirror::schedule_mirror_update_task() {
+  ceph_assert(m_timer_task == nullptr);
+  ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+
+  m_timer_task = new LambdaContext([this](int _) {
+                                     m_timer_task = nullptr;
+                                     update_fs_mirrors();
+                                   });
+  double after = g_ceph_context->_conf.get_val<std::chrono::seconds>
+    ("cephfs_mirror_mirror_action_update_interval").count();
+  dout(20) << ": scheduling fs mirror update (" << m_timer_task << ") after "
+           << after << " seconds" << dendl;
+  m_timer->add_event_after(after, m_timer_task);
 }
 
 void Mirror::run() {
@@ -256,20 +417,30 @@ void Mirror::run() {
   m_cluster_watcher->init();
   m_cond.wait(locker, [this]{return m_stopping;});
 
-  for (auto &[fs_name, fs_mirror] : m_fs_mirrors) {
-    dout(10) << ": shutting down mirror for fs_name=" << fs_name << dendl;
-    if (fs_mirror->is_stopping()) {
-      dout(10) << ": fs_name=" << fs_name << " is under shutdown" << dendl;
-      continue;
+  locker.unlock();
+  {
+    std::scoped_lock timer_lock(*m_timer_lock);
+    if (m_timer_task != nullptr) {
+      dout(10) << ": canceling timer task=" << m_timer_task << dendl;
+      m_timer->cancel_event(m_timer_task);
+      m_timer_task = nullptr;
+    }
+  }
+  locker.lock();
+
+  for (auto &[filesystem, mirror_action] : m_mirror_actions) {
+    dout(10) << ": trying to shutdown filesystem=" << filesystem << dendl;
+    // wait for in-progress action and shutdown
+    m_cond.wait(locker, [this, &mirror_action] {return !mirror_action.action_in_progress;});
+    if (mirror_action.fs_mirror &&
+        !mirror_action.fs_mirror->is_stopping() &&
+        !mirror_action.fs_mirror->is_failed()) {
+      C_SaferCond cond;
+      mirror_action.fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, &cond));
+      int r = cond.wait();
+      dout(10) << ": shutdown filesystem=" << filesystem << ", r=" << r << dendl;
     }
-
-    Context *on_finish = new LambdaContext([this, fs_name = fs_name](int r) {
-                                             handle_shutdown(fs_name, r);
-                            });
-    fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
   }
-
-  m_cond.wait(locker, [this] {return m_fs_mirrors.empty();});
 
   m_stopped = true;
   m_cond.notify_all();
index 3d1b3699589e1699655babbcf6fd3b8a969e8dee..f86b8bf0e1fdcd8f8127d1a1bf893fd4866d128d 100644 (file)
@@ -39,6 +39,10 @@ public:
 private:
   static constexpr std::string_view MIRRORING_MODULE = "mirroring";
 
+  struct C_EnableMirroring;
+  struct C_DisableMirroring;
+  struct C_PeerUpdate;
+
   struct ClusterListener : ClusterWatcher::Listener {
     Mirror *mirror;
 
@@ -47,22 +51,28 @@ private:
     }
 
     void handle_mirroring_enabled(const FilesystemSpec &spec) override {
-      mirror->mirroring_enabled(spec.fs_name, spec.pool_id);
+      mirror->mirroring_enabled(spec.filesystem, spec.pool_id);
     }
 
-    void handle_mirroring_disabled(const std::string &fs_name) override {
-      mirror->mirroring_disabled(fs_name);
+    void handle_mirroring_disabled(const Filesystem &filesystem) override {
+      mirror->mirroring_disabled(filesystem);
     }
 
-    void handle_peers_added(const std::string &fs_name, const Peer &peer) override {
-      mirror->peer_added(fs_name, peer);
+    void handle_peers_added(const Filesystem &filesystem, const Peer &peer) override {
+      mirror->peer_added(filesystem, peer);
     }
 
-    void handle_peers_removed(const std::string &fs_name, const Peer &peer) override {
-      mirror->peer_removed(fs_name, peer);
+    void handle_peers_removed(const Filesystem &filesystem, const Peer &peer) override {
+      mirror->peer_removed(filesystem, peer);
     }
   };
 
+  struct MirrorAction {
+    bool action_in_progress = false;
+    std::list<Context *> action_ctxs;
+    std::unique_ptr<FSMirror> fs_mirror;
+  };
+
   ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::Mirror");
   ceph::condition_variable m_cond;
 
@@ -74,22 +84,37 @@ private:
 
   ContextWQ *m_work_queue = nullptr;
   SafeTimer *m_timer = nullptr;
+  ceph::mutex *m_timer_lock = nullptr;
+  Context *m_timer_task = nullptr;
 
   bool m_stopping = false;
   bool m_stopped = false;
   std::unique_ptr<ClusterWatcher> m_cluster_watcher;
-  std::map<std::string, std::unique_ptr<FSMirror>> m_fs_mirrors;
+  std::map<Filesystem, MirrorAction> m_mirror_actions;
 
   int init_mon_client();
 
-  void handle_mirroring_enabled(const std::string &fs_name, int r);
-  void mirroring_enabled(const std::string &fs_name, uint64_t local_pool_id);
-  void mirroring_disabled(const std::string &fs_name);
+  // called via listener
+  void mirroring_enabled(const Filesystem &filesystem, uint64_t local_pool_id);
+  void mirroring_disabled(const Filesystem &filesystem);
+  void peer_added(const Filesystem &filesystem, const Peer &peer);
+  void peer_removed(const Filesystem &filesystem, const Peer &peer);
+
+  // mirror enable callback
+  void enable_mirroring(const Filesystem &filesystem, uint64_t local_pool_id,
+                        Context *on_finish);
+  void handle_enable_mirroring(const Filesystem &filesystem, int r);
+
+  // mirror disable callback
+  void disable_mirroring(const Filesystem &filesystem, Context *on_finish);
+  void handle_disable_mirroring(const Filesystem &filesystem, int r);
 
-  void peer_added(const std::string &fs_name, const Peer &peer);
-  void peer_removed(const std::string &fs_name, const Peer &peer);
+  // peer update callback
+  void add_peer(const Filesystem &filesystem, const Peer &peer);
+  void remove_peer(const Filesystem &filesystem, const Peer &peer);
 
-  void handle_shutdown(const std::string &fs_name, int r);
+  void schedule_mirror_update_task();
+  void update_fs_mirrors();
 };
 
 } // namespace mirror