]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cephfs_mirror: use snapdiff api for incremental syncing 58984/head
authorJos Collin <jcollin@redhat.com>
Thu, 23 Nov 2023 06:25:11 +0000 (11:55 +0530)
committerJos Collin <jcollin@redhat.com>
Thu, 1 Aug 2024 10:31:09 +0000 (16:01 +0530)
Use snapdiff api to sync only the delta of files between two snapshots.

Fixes: https://tracker.ceph.com/issues/61334
Signed-off-by: Jos Collin <jcollin@redhat.com>
(cherry picked from commit 96c351c81cb60c34aef16fe1dc8dd2e70fc5acc6)

src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index 47a4fd0219f9703fa431df0a3f406f85fe8f4476..6b543dc369de4a018e86e02e86757b8ca02ebab2 100644 (file)
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include <stack>
+#include <queue>
 #include <fcntl.h>
 #include <algorithm>
 #include <sys/time.h>
@@ -63,6 +64,12 @@ std::string entry_path(const std::string &dir, const std::string &name) {
   return dir + "/" + name;
 }
 
+std::string entry_diff_path(const std::string &dir, const std::string &name) {
+  if (dir == ".")
+    return name;
+  return dir + "/" + name;
+}
+
 std::map<std::string, std::string> decode_snap_metadata(snap_metadata *snap_metadata,
                                                         size_t nr_snap_metadata) {
   std::map<std::string, std::string> metadata;
@@ -1210,17 +1217,12 @@ void PeerReplayer::post_sync_close_handles(const FHandles &fh) {
   ceph_close(fh.p_mnt, fh.p_fd);
 }
 
-int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &current,
-                                 boost::optional<Snapshot> prev) {
+int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &current) {
   dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
-  if (prev) {
-    dout(20) << ": incremental sync check from prev=" << prev << dendl;
-  }
-
   FHandles fh;
-  int r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh);
+  int r = pre_sync_check_and_open_handles(dir_root, current, boost::none, &fh);
   if (r < 0) {
-    dout(5) << ": cannot proceeed with sync: " << cpp_strerror(r) << dendl;
+    dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl;
     return r;
   }
 
@@ -1371,6 +1373,177 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu
   return r;
 }
 
+int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &current,
+                                 boost::optional<Snapshot> prev) {
+  if (!prev) {
+    derr << ": invalid previous snapshot" << dendl;
+    return -ENODATA;
+  }
+
+  dout(20) << ": incremental sync check from prev=" << prev << dendl;
+
+  FHandles fh;
+  int r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh);
+  if (r < 0) {
+    dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  BOOST_SCOPE_EXIT_ALL( (this)(&fh) ) {
+    post_sync_close_handles(fh);
+  };
+
+  // record that we are going to "dirty" the data under this directory root
+  auto snap_id_str{stringify(current.second)};
+  r = ceph_setxattr(m_remote_mount, dir_root.c_str(), "ceph.mirror.dirty_snap_id",
+                    snap_id_str.c_str(), snap_id_str.size(), 0);
+  if (r < 0) {
+    derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root
+         << ": " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  struct ceph_statx cstx;
+  r = ceph_fstatx(m_local_mount, fh.c_fd, &cstx,
+                  CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                  CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                  AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+  if (r < 0) {
+    derr << ": failed to stat snap=" << current.first << ": " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
+
+  ceph_snapdiff_info sd_info;
+  ceph_snapdiff_entry_t sd_entry;
+
+  //The queue of SyncEntry items (directories) to be synchronized.
+  //We follow a breadth first approach here based on the snapdiff output.
+  std::queue<SyncEntry> sync_queue;
+
+  //start with initial/default entry
+  std::string epath = ".", npath = "", nabs_path = "", nname = "";
+  sync_queue.emplace(SyncEntry(epath, cstx));
+
+  while (!sync_queue.empty()) {
+    if (should_backoff(dir_root, &r)) {
+      dout(0) << ": backing off r=" << r << dendl;
+      break;
+    }
+    r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh);
+    if (r < 0) {
+      dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl;
+      return r;
+    }
+
+    dout(20) << ": " << sync_queue.size() << " entries in queue" << dendl;
+    const auto &queue_entry = sync_queue.front();
+    epath = queue_entry.epath;
+    dout(20) << ": syncing entry, path=" << epath << dendl;
+    r = ceph_open_snapdiff(fh.p_mnt, dir_root.c_str(), epath.c_str(),
+                           stringify((*prev).first).c_str(), current.first.c_str(), &sd_info);
+    if (r != 0) {
+      derr << ": failed to open snapdiff, r=" << r << dendl;
+      return r;
+    }
+    while (0 < (r = ceph_readdir_snapdiff(&sd_info, &sd_entry))) {
+      if (r < 0) {
+        derr << ": failed to read directory=" << epath << dendl;
+        ceph_close_snapdiff(&sd_info);
+        return r;
+      }
+
+      //New entry found
+      nname = sd_entry.dir_entry.d_name;
+      if ("." == nname || ".." == nname)
+        continue;
+      // create path for the newly found entry
+      npath = entry_diff_path(epath, nname);
+      nabs_path = entry_diff_path(dir_root, npath);
+
+      r = ceph_statx(sd_info.cmount, nabs_path.c_str(), &cstx,
+                     CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                     CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                     AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+      if (r < 0) {
+        // can't stat, so it's a deleted entry.
+        if (DT_DIR == sd_entry.dir_entry.d_type) { // is a directory
+          r = cleanup_remote_dir(dir_root, npath, fh);
+          if (r < 0) {
+            derr << ": failed to remove directory=" << nabs_path << dendl;
+            break;
+          }
+        }
+        else { // is a file
+          r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), 0);
+          if (r < 0) {
+            break;
+          }
+        }
+      } else {
+        // stat success, update the existing entry
+        struct ceph_statx tstx;
+        int rstat_r = ceph_statx(m_remote_mount, nabs_path.c_str(), &tstx,
+                                 CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                                 CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                                 AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+        if (S_ISDIR(cstx.stx_mode)) { // is a directory
+          //cleanup if it's a file in the remotefs
+          if ((0 == rstat_r) && !S_ISDIR(tstx.stx_mode)) {
+            r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), 0);
+            if (r < 0) {
+              derr << ": Error in directory sync. Failed to remove file="
+                   << nabs_path << dendl;
+              break;
+            }
+          }
+          r = remote_mkdir(npath, cstx, fh);
+          if (r < 0) {
+            break;
+          }
+          // push it to sync_queue for later processing
+          sync_queue.emplace(SyncEntry(npath, cstx));
+        } else { // is a file
+          bool need_data_sync = true;
+          bool need_attr_sync = true;
+          r = should_sync_entry(npath, cstx, fh, &need_data_sync, &need_attr_sync);
+          if (r < 0) {
+            break;
+          }
+          dout(5) << ": entry=" << npath << ", data_sync=" << need_data_sync
+                  << ", attr_sync=" << need_attr_sync << dendl;
+          if (need_data_sync || need_attr_sync) {
+            //cleanup if it's a directory in the remotefs
+            if ((0 == rstat_r) && S_ISDIR(tstx.stx_mode)) {
+              r = cleanup_remote_dir(dir_root, npath, fh);
+              if (r < 0) {
+                derr << ": Error in file sync. Failed to remove remote directory="
+                     << nabs_path << dendl;
+                break;
+              }
+            }
+            r = remote_file_op(dir_root, npath, cstx, fh, need_data_sync, need_attr_sync);
+            if (r < 0) {
+              break;
+            }
+          }
+        }
+      }
+    }
+    if (0 == r) {
+      dout(10) << ": successfully synchronized the entry=" << epath << dendl;
+    }
+
+    //Close the current open directory and take the next queue_entry, if success or failure.
+    r = ceph_close_snapdiff(&sd_info);
+    if (r != 0) {
+      derr << ": failed to close directory=" << epath << dendl;
+    }
+    sync_queue.pop();
+  }
+  return r;
+}
+
 int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot &current,
                               boost::optional<Snapshot> prev) {
   dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
@@ -1389,7 +1562,7 @@ int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot &curre
   if (r < 0) {
     dout(5) << ": missing \"ceph.mirror.dirty_snap_id\" xattr on remote -- using"
             << " incremental sync with remote scan" << dendl;
-    r = do_synchronize(dir_root, current, boost::none);
+    r = do_synchronize(dir_root, current);
   } else {
     size_t xlen = r;
     char *val = (char *)alloca(xlen+1);
@@ -1410,7 +1583,7 @@ int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot &curre
       r = do_synchronize(dir_root, current, prev);
     } else {
       dout(5) << ": mismatch -- using incremental sync with remote scan" << dendl;
-      r = do_synchronize(dir_root, current, boost::none);
+      r = do_synchronize(dir_root, current);
     }
   }
 
index 4d86dc43632dc9e353cb9f3d42324aa9125dc2cd..35918fc6e49ea18f19bf9b222d7820f96ce4481d 100644 (file)
@@ -301,6 +301,8 @@ private:
   int do_synchronize(const std::string &dir_root, const Snapshot &current,
                      boost::optional<Snapshot> prev);
 
+  int do_synchronize(const std::string &dir_root, const Snapshot &current);
+
   int synchronize(const std::string &dir_root, const Snapshot &current,
                   boost::optional<Snapshot> prev);
   int do_sync_snaps(const std::string &dir_root);