]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
cephfs_mirror: add --sync-latest-snapshot and --sync_from_snapshot options
authorJos Collin <jcollin@redhat.com>
Fri, 14 Feb 2025 09:43:31 +0000 (15:13 +0530)
committerJos Collin <jcollin@redhat.com>
Wed, 7 Jan 2026 09:36:40 +0000 (15:06 +0530)
Fixes: https://tracker.ceph.com/issues/69966
Fixes: https://tracker.ceph.com/issues/69187
Fixes: https://tracker.ceph.com/issues/69188
Signed-off-by: Jos Collin <jcollin@redhat.com>
src/pybind/mgr/mirroring/fs/snapshot_mirror.py
src/pybind/mgr/mirroring/module.py
src/tools/cephfs_mirror/FSMirror.cc
src/tools/cephfs_mirror/FSMirror.h
src/tools/cephfs_mirror/InstanceWatcher.cc
src/tools/cephfs_mirror/InstanceWatcher.h
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index c348ce82de1f46f3c6dcd8205e11cc3daaac50c0..3d4f88a6e0e7f33bdd2d5f67effe6f91cccacdee 100644 (file)
@@ -178,8 +178,16 @@ class FSPolicy:
 
     def process_updates(self):
         def acquire_message(dir_path):
+            lookup_info = self.policy.lookup(dir_path)
+            sync_latest_snapshot = False
+            sync_from_snapshot = None
+            if lookup_info:
+                sync_latest_snapshot = lookup_info['sync_latest_snapshot']
+                sync_from_snapshot = lookup_info['sync_from_snapshot']
             return json.dumps({'dir_path': dir_path,
-                               'mode': 'acquire'
+                               'mode': 'acquire',
+                               'sync_latest_snapshot': sync_latest_snapshot,
+                               'sync_from_snapshot': sync_from_snapshot
                                })
         def release_message(dir_path):
             return json.dumps({'dir_path': dir_path,
@@ -202,7 +210,9 @@ class FSPolicy:
                     # take care to not overwrite purge status
                     update_map[dir_path] = {'version': 1,
                                             'instance_id': lookup_info['instance_id'],
-                                            'last_shuffled': lookup_info['mapped_time']
+                                            'last_shuffled': lookup_info['mapped_time'],
+                                            'sync_latest_snapshot': lookup_info['sync_latest_snapshot'],
+                                            'sync_from_snapshot': lookup_info['sync_from_snapshot']
                     }
                     if lookup_info['purging']:
                         update_map[dir_path]['purging'] = 1
@@ -219,7 +229,7 @@ class FSPolicy:
                 self.notifier.notify(dir_path, message, self.handle_peer_ack)
             self.dir_paths.clear()
 
-    def add_dir(self, dir_path):
+    def add_dir(self, dir_path, sync_latest_snapshot, sync_from_snapshot):
         with self.lock:
             lookup_info = self.policy.lookup(dir_path)
             if lookup_info:
@@ -230,7 +240,9 @@ class FSPolicy:
             schedule = self.policy.add_dir(dir_path)
             if not schedule:
                 return
-            update_map = {dir_path: {'version': 1, 'instance_id': '', 'last_shuffled': 0.0}}
+            update_map = {dir_path: {'version': 1, 'instance_id': '', 'last_shuffled': 0.0,
+                                     'sync_latest_snapshot': sync_latest_snapshot,
+                                     'sync_from_snapshot': sync_from_snapshot}}
             updated = False
             def update_safe(updates, removals, r):
                 nonlocal updated
@@ -689,7 +701,7 @@ class FSSnapshotMirror:
             raise MirrorException(-errno.EINVAL, f'{dir_path} should be an absolute path')
         return os.path.normpath(dir_path)
 
-    def add_dir(self, filesystem, dir_path):
+    def add_dir(self, filesystem, dir_path, sync_latest_snapshot, sync_from_snapshot):
         try:
             with self.lock:
                 if not self.filesystem_exist(filesystem):
@@ -699,7 +711,7 @@ class FSSnapshotMirror:
                     raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                 dir_path = FSSnapshotMirror.norm_path(dir_path)
                 log.debug(f'path normalized to {dir_path}')
-                fspolicy.add_dir(dir_path)
+                fspolicy.add_dir(dir_path, sync_latest_snapshot, sync_from_snapshot)
                 return 0, json.dumps({}), ''
         except MirrorException as me:
             return me.args[0], '', me.args[1]
index 2ea3e1b067c445abb006612cbbd8305460138ab1..744a69ba809ff4ce7a74f497c667f2a70a3865cf 100644 (file)
@@ -77,9 +77,12 @@ class Module(MgrModule):
     @CLIWriteCommand('fs snapshot mirror add')
     def snapshot_mirror_add_dir(self,
                                 fs_name: str,
-                                path: str):
+                                path: str,
+                                sync_latest_snapshot: Optional[bool] = False,
+                                sync_from_snapshot: Optional[str] = None):
         """Add a directory for snapshot mirroring"""
-        return self.fs_snapshot_mirror.add_dir(fs_name, path)
+        return self.fs_snapshot_mirror.add_dir(fs_name, path, sync_latest_snapshot,
+                                               sync_from_snapshot)
 
     @CLIWriteCommand('fs snapshot mirror remove')
     def snapshot_mirror_remove_dir(self,
index 4158361e7b981cb4a89817a9fbf2ae228797b248..f2bd5a3c18d6adacd1be24ee9efc1c89ac7ab8e5 100644 (file)
@@ -371,8 +371,11 @@ void FSMirror::handle_shutdown_instance_watcher(int r) {
   }
 }
 
-void FSMirror::handle_acquire_directory(string_view dir_path) {
-  dout(5) << ": dir_path=" << dir_path << dendl;
+void FSMirror::handle_acquire_directory(string_view dir_path,
+                                        bool sync_latest_snapshot,
+                                        string_view sync_from_snapshot) {
+  dout(5) << ": dir_path=" << dir_path << ", sync_latest_snapshot=" << sync_latest_snapshot
+          << ", sync_from_snapshot=" << sync_from_snapshot << dendl;
 
   {
     std::scoped_lock locker(m_lock);
@@ -382,7 +385,7 @@ void FSMirror::handle_acquire_directory(string_view dir_path) {
 
     for (auto &[peer, peer_replayer] : m_peer_replayers) {
       dout(10) << ": peer=" << peer << dendl;
-      peer_replayer->add_directory(dir_path);
+      peer_replayer->add_directory(dir_path, sync_latest_snapshot, sync_from_snapshot);
     }
   }
   if (m_perf_counters) {
index 17f0f82164b0f5d0d1909e18b5c499fa8d0d991f..216206d8ed70fbd4f7bd493a5cf44bb9b72850ef 100644 (file)
@@ -117,8 +117,10 @@ private:
       : fs_mirror(fs_mirror) {
     }
 
-    void acquire_directory(std::string_view dir_path) override {
-      fs_mirror->handle_acquire_directory(dir_path);
+    void acquire_directory(std::string_view dir_path,
+                           bool sync_latest_snapshot = false,
+                           std::string_view sync_from_snapshot = "") override {
+      fs_mirror->handle_acquire_directory(dir_path, sync_latest_snapshot, sync_from_snapshot);
     }
 
     void release_directory(std::string_view dir_path) override {
@@ -192,7 +194,8 @@ private:
   void shutdown_instance_watcher();
   void handle_shutdown_instance_watcher(int r);
 
-  void handle_acquire_directory(std::string_view dir_path);
+  void handle_acquire_directory(std::string_view dir_path, bool sync_latest_snapshot,
+                                std::string_view sync_from_snapshot);
   void handle_release_directory(std::string_view dir_path);
 };
 
index 3ea3906404cc5bf8ab984af0946464d8f2e4b416..c4bdd8c13fa13ecf7f9cb7f508e0ff617838013e 100644 (file)
@@ -87,19 +87,29 @@ void InstanceWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
 
   std::string dir_path;
   std::string mode;
+  bool sync_latest_snapshot;
+  std::string sync_from_snapshot;
+
   try {
     JSONDecoder jd(bl);
     JSONDecoder::decode_json("dir_path", dir_path, &jd.parser, true);
     JSONDecoder::decode_json("mode", mode, &jd.parser, true);
+    JSONDecoder::decode_json("sync_latest_snapshot", sync_latest_snapshot, &jd.parser, true);
+    JSONDecoder::decode_json("sync_from_snapshot", sync_from_snapshot, &jd.parser, true);
   } catch (const JSONDecoder::err &e) {
     derr << ": failed to decode notify json: " << e.what() << dendl;
   }
 
   dout(20) << ": notifier_id=" << notifier_id << ", dir_path=" << dir_path
-           << ", mode=" << mode << dendl;
+           << ", mode=" << mode << ", sync_latest_snapshot=" << sync_latest_snapshot
+           << ", sync_from_snapshot=" << sync_from_snapshot << dendl;
 
   if (mode == "acquire") {
-    m_listener.acquire_directory(dir_path);
+    if (sync_from_snapshot == "null") {
+      m_listener.acquire_directory(dir_path, sync_latest_snapshot);
+    } else {
+      m_listener.acquire_directory(dir_path, sync_latest_snapshot, sync_from_snapshot);
+    }
   } else if (mode == "release") {
     m_listener.release_directory(dir_path);
   } else {
index 5a48085d28cf28436d824b432aea1ac25ba95b64..3b85bb7118d228cbe4836aafcb6c6030b17cbac9 100644 (file)
@@ -26,7 +26,9 @@ public:
     virtual ~Listener() {
     }
 
-    virtual void acquire_directory(std::string_view dir_path) = 0;
+    virtual void acquire_directory(std::string_view dir_path,
+                                   bool sync_latest_snapshot = false,
+                                   std::string_view sync_from_snapshot = "") = 0;
     virtual void release_directory(std::string_view dir_path) = 0;
   };
 
index 7ea41ec11acbf2068004c5191a653463f753075f..b9e4059d21fce4552481990c2a40edcf631e58f9 100644 (file)
@@ -174,11 +174,14 @@ PeerReplayer::PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
     m_local_cluster(local_cluster),
     m_filesystem(filesystem),
     m_peer(peer),
-    m_directories(directories.begin(), directories.end()),
     m_local_mount(mount),
     m_service_daemon(service_daemon),
     m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)),
     m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) {
+  std::for_each(directories.begin(), directories.end(), [this](const auto& dir_root) {
+    m_directories[dir_root] = SnapSyncFrom();
+  });
+
   // reset sync stats sent via service daemon
   m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
                                                  SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, (uint64_t)0);
@@ -228,10 +231,12 @@ PeerReplayer::~PeerReplayer() {
 }
 
 int PeerReplayer::init() {
-  dout(20) << ": initial dir list=[" << m_directories << "]" << dendl;
+  std::string str_list = ": initial dir list=[";
   for (auto &dir_root : m_directories) {
-    m_snap_sync_stats.emplace(dir_root, SnapSyncStat());
+    m_snap_sync_stats.emplace(dir_root.first, SnapSyncStat());
+    str_list += dir_root.first + " ";
   }
+  dout(20) << str_list <<  "]" << dendl;
 
   auto &remote_client = m_peer.remote.client_name;
   auto &remote_cluster = m_peer.remote.cluster_name;
@@ -322,11 +327,13 @@ void PeerReplayer::shutdown() {
   m_remote_cluster.reset();
 }
 
-void PeerReplayer::add_directory(string_view dir_root) {
-  dout(20) << ": dir_root=" << dir_root << dendl;
+void PeerReplayer::add_directory(string_view dir_root, bool sync_latest_snapshot,
+                                string_view sync_from_snapshot) {
+  dout(20) << ": dir_root=" << dir_root << ", sync_latest_snapshot=" << sync_latest_snapshot
+           << ", sync_from_snapshot=" << sync_from_snapshot << dendl;
 
   std::scoped_lock locker(m_lock);
-  m_directories.emplace_back(dir_root);
+  m_directories[std::string(dir_root)] = SnapSyncFrom(sync_latest_snapshot, sync_from_snapshot);
   m_snap_sync_stats.emplace(dir_root, SnapSyncStat());
   m_cond.notify_all();
 }
@@ -336,7 +343,7 @@ void PeerReplayer::remove_directory(string_view dir_root) {
   auto _dir_root = std::string(dir_root);
 
   std::scoped_lock locker(m_lock);
-  auto it = std::find(m_directories.begin(), m_directories.end(), _dir_root);
+  auto it = m_directories.find(_dir_root);
   if (it != m_directories.end()) {
     m_directories.erase(it);
   }
@@ -359,20 +366,19 @@ boost::optional<std::string> PeerReplayer::pick_directory() {
 
   boost::optional<std::string> candidate;
   for (auto &dir_root : m_directories) {
-    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root.first);
     if (sync_stat.failed) {
       std::chrono::duration<double> d = now - *sync_stat.last_failed;
       if (d.count() < retry_timo) {
         continue;
       }
     }
-    if (!m_registered.count(dir_root)) {
-      candidate = dir_root;
+    if (!m_registered.count(dir_root.first)) {
+      candidate = dir_root.first;
       break;
     }
   }
 
-  std::rotate(m_directories.begin(), m_directories.begin() + 1, m_directories.end());
   return candidate;
 }
 
@@ -401,7 +407,7 @@ void PeerReplayer::unregister_directory(const std::string &dir_root) {
 
   unlock_directory(it->first, it->second);
   m_registered.erase(it);
-  if (std::find(m_directories.begin(), m_directories.end(), dir_root) == m_directories.end()) {
+  if (m_directories.find(dir_root) == m_directories.end()) {
     m_snap_sync_stats.erase(dir_root);
   }
 }
@@ -480,6 +486,19 @@ void PeerReplayer::unlock_directory(const std::string &dir_root, const DirRegist
   dout(10) << ": dir_root=" << dir_root << " unlocked" << dendl;
 }
 
+int PeerReplayer::get_snap_id(const std::string &dir_root, const std::string& snap_name) {
+  snap_info info;
+  auto snap_dir = snapshot_dir_path(m_cct, dir_root);
+  auto snap_path = snapshot_path(snap_dir, snap_name);
+  int r = ceph_get_snap_info(m_local_mount, snap_path.c_str(), &info);
+  if (r < 0) {
+    derr << ": failed to fetch " << snap_name << ", snap info for snap_path=" << snap_path
+        << ": " << cpp_strerror(r) << dendl;
+    return r;
+  }
+  return info.id;
+}
+
 int PeerReplayer::build_snap_map(const std::string &dir_root,
                                  std::map<uint64_t, std::string> *snap_map, bool is_remote) {
   auto snap_dir = snapshot_dir_path(m_cct, dir_root);
@@ -500,6 +519,7 @@ int PeerReplayer::build_snap_map(const std::string &dir_root,
     return r;
   }
 
+  auto sync_stat = m_snap_sync_stats.at(dir_root);
   std::set<std::string> snaps;
   auto entry = ceph_readdir(mnt, dirp);
   while (entry != NULL) {
@@ -512,6 +532,17 @@ int PeerReplayer::build_snap_map(const std::string &dir_root,
     entry = ceph_readdir(mnt, dirp);
   }
 
+  uint64_t snap_id_in = 0;
+  std::string snap_name = m_directories.at(dir_root).sync_from_snapshot;
+  if (!is_remote && !snap_name.empty()) {
+    r = get_snap_id(dir_root, snap_name);
+    if (r < 0) {
+      derr << ": defaulting to first snapshot for syncing" << dendl;
+    } else {
+      snap_id_in = r;
+    }
+  }
+
   int rv = 0;
   for (auto &snap : snaps) {
     snap_info info;
@@ -551,7 +582,21 @@ int PeerReplayer::build_snap_map(const std::string &dir_root,
     if (rv != 0) {
       break;
     }
-    snap_map->emplace(snap_id, snap);
+    if (snap_id >= snap_id_in) {
+      snap_map->emplace(snap_id, snap);
+    }
+  }
+
+  if (!is_remote && m_directories.at(dir_root).sync_latest_snapshot) {
+    // Now we know the highest snap_id object emplaced.
+    // Reset the snap_map to the highest snap_id element.
+    if (!sync_stat.current_syncing_snap) { // skip if there's a sync in progress
+      auto it = snap_map->rbegin();
+      auto sid = it->first;
+      auto sname = it->second;
+      snap_map->clear();
+      snap_map->emplace(sid, sname);
+    }
   }
 
   r = ceph_closedir(mnt, dirp);
@@ -1956,10 +2001,12 @@ int PeerReplayer::do_sync_snaps(const std::string &dir_root) {
     }
   }
 
-  r = propagate_snap_deletes(dir_root, snaps_deleted);
-  if (r < 0) {
-    derr << ": failed to propgate deleted snapshots" << dendl;
-    return r;
+  if (!m_directories.at(dir_root).sync_latest_snapshot) {
+    r = propagate_snap_deletes(dir_root, snaps_deleted);
+    if (r < 0) {
+      derr << ": failed to propgate deleted snapshots" << dendl;
+      return r;
+    }
   }
 
   r = propagate_snap_renames(dir_root, snaps_renamed);
index c99d75abafcf799dfe61d95ae6743a4d6a1b4651..ffbc329540b1cb5e7042704410a3a3c3e6206bf5 100644 (file)
@@ -34,7 +34,8 @@ public:
   void shutdown();
 
   // add a directory to mirror queue
-  void add_directory(std::string_view dir_root);
+  void add_directory(std::string_view dir_root, bool sync_latest_snapshot,
+                     std::string_view sync_from_snapshot);
 
   // remove a directory from queue
   void remove_directory(std::string_view dir_root);
@@ -242,19 +243,42 @@ private:
   };
 
   struct SnapSyncStat {
-    uint64_t nr_failures = 0; // number of consecutive failures
+    uint64_t nr_failures; // number of consecutive failures
     boost::optional<monotime> last_failed; // lat failed timestamp
     boost::optional<std::string> last_failed_reason;
-    bool failed = false; // hit upper cap for consecutive failures
+    bool failed; // hit upper cap for consecutive failures
     boost::optional<std::pair<uint64_t, std::string>> last_synced_snap;
     boost::optional<std::pair<uint64_t, std::string>> current_syncing_snap;
-    uint64_t synced_snap_count = 0;
-    uint64_t deleted_snap_count = 0;
-    uint64_t renamed_snap_count = 0;
-    monotime last_synced = clock::zero();
+    uint64_t synced_snap_count;
+    uint64_t deleted_snap_count;
+    uint64_t renamed_snap_count;
+    monotime last_synced;
     boost::optional<double> last_sync_duration;
     boost::optional<uint64_t> last_sync_bytes; //last sync bytes for display in status
-    uint64_t sync_bytes = 0; //sync bytes counter, independently for each directory sync.
+    uint64_t sync_bytes; //sync bytes counter, independently for each directory sync.
+
+    SnapSyncStat() {
+      nr_failures = 0;
+      failed = false;
+      synced_snap_count = 0;
+      deleted_snap_count = 0;
+      renamed_snap_count = 0;
+      last_synced = clock::zero();
+      sync_bytes = 0;
+    }
+  };
+
+  struct SnapSyncFrom {
+    bool sync_latest_snapshot;
+    std::string sync_from_snapshot;
+    SnapSyncFrom() {
+      sync_latest_snapshot = false;
+      sync_from_snapshot = "";
+    }
+    SnapSyncFrom(bool sl, const std::string_view& sf) {
+      sync_latest_snapshot = sl;
+      sync_from_snapshot = sf;
+    }
   };
 
   void _inc_failed_count(const std::string &dir_root) {
@@ -365,7 +389,7 @@ private:
   Peer m_peer;
   // probably need to be encapsulated when supporting cancelations
   std::map<std::string, DirRegistry> m_registered;
-  std::vector<std::string> m_directories;
+  std::map<std::string, SnapSyncFrom> m_directories;
   std::map<std::string, SnapSyncStat> m_snap_sync_stats;
   MountRef m_local_mount;
   ServiceDaemon *m_service_daemon;
@@ -396,6 +420,7 @@ private:
   int build_snap_map(const std::string &dir_root, std::map<uint64_t, std::string> *snap_map,
                      bool is_remote=false);
 
+  int get_snap_id(const std::string &dir_root, const std::string& snap_name);
   int propagate_snap_deletes(const std::string &dir_root, const std::set<std::string> &snaps);
   int propagate_snap_renames(const std::string &dir_root,
                              const std::set<std::pair<std::string,std::string>> &snaps);