]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cephfs_mirror: Add ErrorListener to maintain blocklisted/failed timestamp in FSMirror
authorJos Collin <jcollin@redhat.com>
Tue, 28 May 2024 14:57:55 +0000 (20:27 +0530)
committerJos Collin <jcollin@redhat.com>
Wed, 17 Jul 2024 08:15:05 +0000 (13:45 +0530)
Have FSMirror register a listener with InstanceWatcher/MirrorWatcher which would get invoked when the mirror daemon is blocklisted or failed.
Thus FSMirror can maintain the last blocklisted/failed timestamp and use that for restarting the mirror daemon.

Fixes: https://tracker.ceph.com/issues/64927
Fixes: https://tracker.ceph.com/issues/51964
Fixes: https://tracker.ceph.com/issues/63931
Fixes: https://tracker.ceph.com/issues/63089
Signed-off-by: Jos Collin <jcollin@redhat.com>
(cherry picked from commit 77ec7bfde7a349b0e06b34ecdf328996c7642d43)

src/tools/cephfs_mirror/FSMirror.cc
src/tools/cephfs_mirror/FSMirror.h
src/tools/cephfs_mirror/InstanceWatcher.cc
src/tools/cephfs_mirror/InstanceWatcher.h
src/tools/cephfs_mirror/MirrorWatcher.cc
src/tools/cephfs_mirror/MirrorWatcher.h
src/tools/cephfs_mirror/Watcher.h

index 3d5bf2d1c724244b19cf99f753cfeb27cce55307..ea1857b1eba86d9b0ba329ba14cc406012ee34bb 100644 (file)
@@ -114,6 +114,7 @@ FSMirror::FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool
     m_args(args),
     m_work_queue(work_queue),
     m_snap_listener(this),
+    m_ts_listener(this),
     m_asok_hook(new MirrorAdminSocketHook(cct, filesystem, this)) {
   m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
                                                (uint64_t)0);
@@ -270,7 +271,7 @@ void FSMirror::init_instance_watcher(Context *on_finish) {
 
   Context *ctx = new C_CallbackAdapter<
     FSMirror, &FSMirror::handle_init_instance_watcher>(this);
-  m_instance_watcher = InstanceWatcher::create(m_ioctx, m_snap_listener, m_work_queue);
+  m_instance_watcher = InstanceWatcher::create(m_ioctx, m_snap_listener, m_ts_listener, m_work_queue);
   m_instance_watcher->init(ctx);
 }
 
@@ -299,7 +300,7 @@ void FSMirror::init_mirror_watcher() {
   std::scoped_lock locker(m_lock);
   Context *ctx = new C_CallbackAdapter<
     FSMirror, &FSMirror::handle_init_mirror_watcher>(this);
-  m_mirror_watcher = MirrorWatcher::create(m_ioctx, this, m_work_queue);
+  m_mirror_watcher = MirrorWatcher::create(m_ioctx, this, m_ts_listener, m_work_queue);
   m_mirror_watcher->init(ctx);
 }
 
index b106fdff8b60d4e61b9ca47e993c189e67b2c8a3..70ebbd0f4b6d74e40891eca8a6c9eb51f1db8d56 100644 (file)
@@ -59,14 +59,12 @@ public:
 
   monotime get_failed_ts() {
     std::scoped_lock locker(m_lock);
-    if (m_instance_watcher) {
-      return m_instance_watcher->get_failed_ts();
-    }
-    if (m_mirror_watcher) {
-      return m_mirror_watcher->get_failed_ts();
-    }
+    return m_failed_ts;
+  }
 
-    return clock::now();
+  void set_failed_ts() {
+    std::scoped_lock locker(m_lock);
+    m_failed_ts = clock::now();
   }
 
   bool is_blocklisted() {
@@ -76,14 +74,12 @@ public:
 
   monotime get_blocklisted_ts() {
     std::scoped_lock locker(m_lock);
-    if (m_instance_watcher) {
-      return m_instance_watcher->get_blocklisted_ts();
-    }
-    if (m_mirror_watcher) {
-      return m_mirror_watcher->get_blocklisted_ts();
-    }
+    return m_blocklisted_ts;
+  }
 
-    return clock::now();
+  void set_blocklisted_ts() {
+    std::scoped_lock locker(m_lock);
+    m_blocklisted_ts = clock::now();
   }
 
   Peers get_peers() {
@@ -128,8 +124,24 @@ private:
     void release_directory(std::string_view dir_path) override {
       fs_mirror->handle_release_directory(dir_path);
     }
+
+  };
+
+  struct TimestampListener: public Watcher::ErrorListener {
+    FSMirror *fs_mirror;
+    TimestampListener(FSMirror *fs_mirror)
+      : fs_mirror(fs_mirror) {
+    }
+    void set_blocklisted_ts() {
+      fs_mirror->set_blocklisted_ts();
+    }
+    void set_failed_ts() {
+      fs_mirror->set_failed_ts();
+    }
   };
 
+  monotime m_blocklisted_ts;
+  monotime m_failed_ts;
   CephContext *m_cct;
   Filesystem m_filesystem;
   uint64_t m_pool_id;
@@ -139,6 +151,7 @@ private:
 
   ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::fs_mirror");
   SnapListener m_snap_listener;
+  TimestampListener m_ts_listener;
   std::set<std::string, std::less<>> m_directories;
   Peers m_all_peers;
   std::map<Peer, std::unique_ptr<PeerReplayer>> m_peer_replayers;
index fece936a94b4fefefbf3220c59f00db0938f3d3d..5b19d017287d9b047f2362796c6905324dcd1d4b 100644 (file)
@@ -31,10 +31,11 @@ std::string instance_oid(const std::string &instance_id) {
 } // anonymous namespace
 
 InstanceWatcher::InstanceWatcher(librados::IoCtx &ioctx,
-                                 Listener &listener, ContextWQ *work_queue)
+                                 Listener &listener, ErrorListener &elistener, ContextWQ *work_queue)
   : Watcher(ioctx, instance_oid(stringify(ioctx.get_instance_id())), work_queue),
     m_ioctx(ioctx),
     m_listener(listener),
+    m_elistener(elistener),
     m_work_queue(work_queue),
     m_lock(ceph::make_mutex("cephfs::mirror::instance_watcher")) {
 }
@@ -116,15 +117,15 @@ void InstanceWatcher::handle_rewatch_complete(int r) {
     dout(0) << ": client blocklisted" <<dendl;
     std::scoped_lock locker(m_lock);
     m_blocklisted = true;
-    m_blocklisted_ts = clock::now();
+    m_elistener.set_blocklisted_ts();
   } else if (r == -ENOENT) {
     derr << ": mirroring object deleted" << dendl;
     m_failed = true;
-    m_failed_ts = clock::now();
+    m_elistener.set_failed_ts();
   } else if (r < 0) {
     derr << ": rewatch error: " << cpp_strerror(r) << dendl;
     m_failed = true;
-    m_failed_ts = clock::now();
+    m_elistener.set_failed_ts();
   }
 }
 
index 55353f9aa62f607bc43bab0a69f0d20be617da23..d8a873adc17c176b90c908945649802ec0c28b20 100644 (file)
@@ -31,11 +31,11 @@ public:
   };
 
   static InstanceWatcher *create(librados::IoCtx &ioctx,
-                                 Listener &listener, ContextWQ *work_queue) {
-    return new InstanceWatcher(ioctx, listener, work_queue);
+                                 Listener &listener, ErrorListener &elistener, ContextWQ *work_queue) {
+    return new InstanceWatcher(ioctx, listener, elistener, work_queue);
   }
 
-  InstanceWatcher(librados::IoCtx &ioctx, Listener &listener, ContextWQ *work_queue);
+  InstanceWatcher(librados::IoCtx &ioctx, Listener &listener, ErrorListener &elistener, ContextWQ *work_queue);
   ~InstanceWatcher();
 
   void init(Context *on_finish);
@@ -50,24 +50,15 @@ public:
     return m_blocklisted;
   }
 
-  monotime get_blocklisted_ts() {
-    std::scoped_lock locker(m_lock);
-    return m_blocklisted_ts;
-  }
-
   bool is_failed() {
     std::scoped_lock locker(m_lock);
     return m_failed;
   }
 
-  monotime get_failed_ts() {
-    std::scoped_lock locker(m_lock);
-    return m_failed_ts;
-  }
-
 private:
   librados::IoCtx &m_ioctx;
   Listener &m_listener;
+  ErrorListener &m_elistener;
   ContextWQ *m_work_queue;
 
   ceph::mutex m_lock;
@@ -77,9 +68,6 @@ private:
   bool m_blocklisted = false;
   bool m_failed = false;
 
-  monotime m_blocklisted_ts;
-  monotime m_failed_ts;
-
   void create_instance();
   void handle_create_instance(int r);
 
index 55e106512d31eb7579f3d479d993dfb8471e01a5..e84ef90375a0f2b1d73cdbd40d5f116ec4fd98a9 100644 (file)
@@ -21,10 +21,11 @@ namespace cephfs {
 namespace mirror {
 
 MirrorWatcher::MirrorWatcher(librados::IoCtx &ioctx, FSMirror *fs_mirror,
-                             ContextWQ *work_queue)
+                             ErrorListener &elistener, ContextWQ *work_queue)
   : Watcher(ioctx, CEPHFS_MIRROR_OBJECT, work_queue),
     m_ioctx(ioctx),
     m_fs_mirror(fs_mirror),
+    m_elistener(elistener),
     m_work_queue(work_queue),
     m_lock(ceph::make_mutex("cephfs::mirror::mirror_watcher")),
     m_instance_id(stringify(m_ioctx.get_instance_id())) {
@@ -92,15 +93,15 @@ void MirrorWatcher::handle_rewatch_complete(int r) {
     dout(0) << ": client blocklisted" <<dendl;
     std::scoped_lock locker(m_lock);
     m_blocklisted = true;
-    m_blocklisted_ts = clock::now();
+    m_elistener.set_blocklisted_ts();
   } else if (r == -ENOENT) {
     derr << ": mirroring object deleted" << dendl;
     m_failed = true;
-    m_failed_ts = clock::now();
+    m_elistener.set_failed_ts();
   } else if (r < 0) {
     derr << ": rewatch error: " << cpp_strerror(r) << dendl;
     m_failed = true;
-    m_failed_ts = clock::now();
+    m_elistener.set_failed_ts();
   }
 }
 
index 37fe55ef0c5ba7ab47a44fc1a15fa62308ba0982..610db51b1c04bfc94f0eaa7bd7bdc12eb82ffed2 100644 (file)
@@ -28,11 +28,11 @@ class FSMirror;
 class MirrorWatcher : public Watcher {
 public:
   static MirrorWatcher *create(librados::IoCtx &ioctx, FSMirror *fs_mirror,
-                               ContextWQ *work_queue) {
-    return new MirrorWatcher(ioctx, fs_mirror, work_queue);
+                               ErrorListener &elistener, ContextWQ *work_queue) {
+    return new MirrorWatcher(ioctx, fs_mirror, elistener, work_queue);
   }
 
-  MirrorWatcher(librados::IoCtx &ioctx, FSMirror *fs_mirror,
+  MirrorWatcher(librados::IoCtx &ioctx, FSMirror *fs_mirror, ErrorListener &elistener,
                 ContextWQ *work_queue);
   ~MirrorWatcher();
 
@@ -48,24 +48,15 @@ public:
     return m_blocklisted;
   }
 
-  monotime get_blocklisted_ts() {
-    std::scoped_lock locker(m_lock);
-    return m_blocklisted_ts;
-  }
-
   bool is_failed() {
     std::scoped_lock locker(m_lock);
     return m_failed;
   }
 
-  monotime get_failed_ts() {
-    std::scoped_lock locker(m_lock);
-    return m_failed_ts;
-  }
-
 private:
   librados::IoCtx &m_ioctx;
   FSMirror *m_fs_mirror;
+  ErrorListener &m_elistener;
   ContextWQ *m_work_queue;
 
   ceph::mutex m_lock;
@@ -77,9 +68,6 @@ private:
   bool m_blocklisted = false;
   bool m_failed = false;
 
-  monotime m_blocklisted_ts;
-  monotime m_failed_ts;
-
   void register_watcher();
   void handle_register_watcher(int r);
 
index 9e7c54eebbb2c8bdae35914a11194d46b356b77e..a0c514011702e657e02fe5bd517bb32e07c52968 100644 (file)
@@ -28,6 +28,13 @@ public:
   void register_watch(Context *on_finish);
   void unregister_watch(Context *on_finish);
 
+  struct ErrorListener {
+    virtual ~ErrorListener() {
+    }
+    virtual void set_blocklisted_ts() = 0;
+    virtual void set_failed_ts() = 0;
+  };
+
 protected:
   std::string m_oid;