]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cephfs-mirror: transfer snapshot diffs whenever possible
authorVenky Shankar <vshankar@redhat.com>
Wed, 17 Mar 2021 12:23:39 +0000 (08:23 -0400)
committerVenky Shankar <vshankar@redhat.com>
Tue, 25 May 2021 12:44:44 +0000 (08:44 -0400)
Use incremental transfer when the data on the remote file system
for a given directory root is belongs to the snapshot which can
be used for local comparison.

Files are chosen based on mtime changes.

Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index 90005795cae9ae501ed5b4ff1d0112ba41ed3a26..ee82efbb6e1149c63bab6b66331ccbb86d0cde85 100644 (file)
@@ -6,6 +6,7 @@
 #include <algorithm>
 #include <sys/time.h>
 #include <sys/file.h>
+#include <boost/scope_exit.hpp>
 
 #include "common/admin_socket.h"
 #include "common/ceph_context.h"
@@ -82,6 +83,20 @@ private:
   PeerReplayer *peer_replayer;
 };
 
+// helper to open a directory relative to a file descriptor
+int opendirat(MountRef mnt, int dirfd, const std::string &relpath, int flags,
+              ceph_dir_result **dirp) {
+  int r = ceph_openat(mnt, dirfd, relpath.c_str(), flags, 0);
+  if (r < 0) {
+    return r;
+  }
+
+  int fd = r;
+  r = ceph_fdopendir(mnt, fd, dirp);
+  ceph_close(mnt, fd);
+  return r;
+}
+
 } // anonymous namespace
 
 class PeerReplayerAdminSocketHook : public AdminSocketHook {
@@ -151,8 +166,8 @@ PeerReplayer::~PeerReplayer() {
 
 int PeerReplayer::init() {
   dout(20) << ": initial dir list=[" << m_directories << "]" << dendl;
-  for (auto &dir_path : m_directories) {
-    m_snap_sync_stats.emplace(dir_path, SnapSyncStat());
+  for (auto &dir_root : m_directories) {
+    m_snap_sync_stats.emplace(dir_root, SnapSyncStat());
   }
 
   auto &remote_client = m_peer.remote.client_name;
@@ -244,28 +259,28 @@ void PeerReplayer::shutdown() {
   m_remote_cluster.reset();
 }
 
-void PeerReplayer::add_directory(string_view dir_path) {
-  dout(20) << ": dir_path=" << dir_path << dendl;
+void PeerReplayer::add_directory(string_view dir_root) {
+  dout(20) << ": dir_root=" << dir_root << dendl;
 
   std::scoped_lock locker(m_lock);
-  m_directories.emplace_back(dir_path);
-  m_snap_sync_stats.emplace(dir_path, SnapSyncStat());
+  m_directories.emplace_back(dir_root);
+  m_snap_sync_stats.emplace(dir_root, SnapSyncStat());
   m_cond.notify_all();
 }
 
-void PeerReplayer::remove_directory(string_view dir_path) {
-  dout(20) << ": dir_path=" << dir_path << dendl;
-  auto _dir_path = std::string(dir_path);
+void PeerReplayer::remove_directory(string_view dir_root) {
+  dout(20) << ": dir_root=" << dir_root << dendl;
+  auto _dir_root = std::string(dir_root);
 
   std::scoped_lock locker(m_lock);
-  auto it = std::find(m_directories.begin(), m_directories.end(), _dir_path);
+  auto it = std::find(m_directories.begin(), m_directories.end(), _dir_root);
   if (it != m_directories.end()) {
     m_directories.erase(it);
   }
 
-  auto it1 = m_registered.find(_dir_path);
+  auto it1 = m_registered.find(_dir_root);
   if (it1 == m_registered.end()) {
-    m_snap_sync_stats.erase(_dir_path);
+    m_snap_sync_stats.erase(_dir_root);
   } else {
     it1->second.replayer->cancel();
   }
@@ -280,16 +295,16 @@ boost::optional<std::string> PeerReplayer::pick_directory() {
     "cephfs_mirror_retry_failed_directories_interval");
 
   boost::optional<std::string> candidate;
-  for (auto &dir_path : m_directories) {
-    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+  for (auto &dir_root : m_directories) {
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
     if (sync_stat.failed) {
       std::chrono::duration<double> d = now - *sync_stat.last_failed;
       if (d.count() < retry_timo) {
         continue;
       }
     }
-    if (!m_registered.count(dir_path)) {
-      candidate = dir_path;
+    if (!m_registered.count(dir_root)) {
+      candidate = dir_root;
       break;
     }
   }
@@ -298,59 +313,59 @@ boost::optional<std::string> PeerReplayer::pick_directory() {
   return candidate;
 }
 
-int PeerReplayer::register_directory(const std::string &dir_path,
+int PeerReplayer::register_directory(const std::string &dir_root,
                                      SnapshotReplayerThread *replayer) {
-  dout(20) << ": dir_path=" << dir_path << dendl;
-  ceph_assert(m_registered.find(dir_path) == m_registered.end());
+  dout(20) << ": dir_root=" << dir_root << dendl;
+  ceph_assert(m_registered.find(dir_root) == m_registered.end());
 
   DirRegistry registry;
-  int r = try_lock_directory(dir_path, replayer, &registry);
+  int r = try_lock_directory(dir_root, replayer, &registry);
   if (r < 0) {
     return r;
   }
 
-  dout(5) << ": dir_path=" << dir_path << " registered with replayer="
+  dout(5) << ": dir_root=" << dir_root << " registered with replayer="
           << replayer << dendl;
-  m_registered.emplace(dir_path, std::move(registry));
+  m_registered.emplace(dir_root, std::move(registry));
   return 0;
 }
 
-void PeerReplayer::unregister_directory(const std::string &dir_path) {
-  dout(20) << ": dir_path=" << dir_path << dendl;
+void PeerReplayer::unregister_directory(const std::string &dir_root) {
+  dout(20) << ": dir_root=" << dir_root << dendl;
 
-  auto it = m_registered.find(dir_path);
+  auto it = m_registered.find(dir_root);
   ceph_assert(it != m_registered.end());
 
   unlock_directory(it->first, it->second);
   m_registered.erase(it);
-  if (std::find(m_directories.begin(), m_directories.end(), dir_path) == m_directories.end()) {
-    m_snap_sync_stats.erase(dir_path);
+  if (std::find(m_directories.begin(), m_directories.end(), dir_root) == m_directories.end()) {
+    m_snap_sync_stats.erase(dir_root);
   }
 }
 
-int PeerReplayer::try_lock_directory(const std::string &dir_path,
+int PeerReplayer::try_lock_directory(const std::string &dir_root,
                                      SnapshotReplayerThread *replayer, DirRegistry *registry) {
-  dout(20) << ": dir_path=" << dir_path << dendl;
+  dout(20) << ": dir_root=" << dir_root << dendl;
 
-  int r = ceph_open(m_remote_mount, dir_path.c_str(), O_RDONLY | O_DIRECTORY, 0);
+  int r = ceph_open(m_remote_mount, dir_root.c_str(), O_RDONLY | O_DIRECTORY, 0);
   if (r < 0 && r != -ENOENT) {
-    derr << ": failed to open remote dir_path=" << dir_path << ": " << cpp_strerror(r)
+    derr << ": failed to open remote dir_root=" << dir_root << ": " << cpp_strerror(r)
          << dendl;
     return r;
   }
 
   if (r == -ENOENT) {
-    // we snap under dir_path, so mode does not matter much
-    r = ceph_mkdirs(m_remote_mount, dir_path.c_str(), 0755);
+    // we snap under dir_root, so mode does not matter much
+    r = ceph_mkdirs(m_remote_mount, dir_root.c_str(), 0755);
     if (r < 0) {
-      derr << ": failed to create remote directory=" << dir_path << ": " << cpp_strerror(r)
+      derr << ": failed to create remote directory=" << dir_root << ": " << cpp_strerror(r)
            << dendl;
       return r;
     }
 
-    r = ceph_open(m_remote_mount, dir_path.c_str(), O_RDONLY | O_DIRECTORY, 0);
+    r = ceph_open(m_remote_mount, dir_root.c_str(), O_RDONLY | O_DIRECTORY, 0);
     if (r < 0) {
-      derr << ": failed to open remote dir_path=" << dir_path << ": " << cpp_strerror(r)
+      derr << ": failed to open remote dir_root=" << dir_root << ": " << cpp_strerror(r)
            << dendl;
       return r;
     }
@@ -360,51 +375,51 @@ int PeerReplayer::try_lock_directory(const std::string &dir_path,
   r = ceph_flock(m_remote_mount, fd, LOCK_EX | LOCK_NB, (uint64_t)replayer->get_thread_id());
   if (r != 0) {
     if (r == -EWOULDBLOCK) {
-      dout(5) << ": dir_path=" << dir_path << " is locked by cephfs-mirror, "
+      dout(5) << ": dir_root=" << dir_root << " is locked by cephfs-mirror, "
               << "will retry again" << dendl;
     } else {
-      derr << ": failed to lock dir_path=" << dir_path << ": " << cpp_strerror(r)
+      derr << ": failed to lock dir_root=" << dir_root << ": " << cpp_strerror(r)
            << dendl;
     }
 
     if (ceph_close(m_remote_mount, fd) < 0) {
-      derr << ": failed to close (cleanup) remote dir_path=" << dir_path << ": "
+      derr << ": failed to close (cleanup) remote dir_root=" << dir_root << ": "
            << cpp_strerror(r) << dendl;
     }
     return r;
   }
 
-  dout(10) << ": dir_path=" << dir_path << " locked" << dendl;
+  dout(10) << ": dir_root=" << dir_root << " locked" << dendl;
 
   registry->fd = fd;
   registry->replayer = replayer;
   return 0;
 }
 
-void PeerReplayer::unlock_directory(const std::string &dir_path, const DirRegistry &registry) {
-  dout(20) << ": dir_path=" << dir_path << dendl;
+void PeerReplayer::unlock_directory(const std::string &dir_root, const DirRegistry &registry) {
+  dout(20) << ": dir_root=" << dir_root << dendl;
 
   int r = ceph_flock(m_remote_mount, registry.fd, LOCK_UN,
                      (uint64_t)registry.replayer->get_thread_id());
   if (r < 0) {
-    derr << ": failed to unlock remote dir_path=" << dir_path << ": " << cpp_strerror(r)
+    derr << ": failed to unlock remote dir_root=" << dir_root << ": " << cpp_strerror(r)
          << dendl;
     return;
   }
 
   r = ceph_close(m_remote_mount, registry.fd);
   if (r < 0) {
-    derr << ": failed to close remote dir_path=" << dir_path << ": " << cpp_strerror(r)
+    derr << ": failed to close remote dir_root=" << dir_root << ": " << cpp_strerror(r)
          << dendl;
   }
 
-  dout(10) << ": dir_path=" << dir_path << " unlocked" << dendl;
+  dout(10) << ": dir_root=" << dir_root << " unlocked" << dendl;
 }
 
-int PeerReplayer::build_snap_map(const std::string &dir_path,
+int PeerReplayer::build_snap_map(const std::string &dir_root,
                                  std::map<uint64_t, std::string> *snap_map, bool is_remote) {
-  auto snap_dir = snapshot_dir_path(m_cct, dir_path);
-  dout(20) << ": dir_path=" << dir_path << ", snap_dir=" << snap_dir
+  auto snap_dir = snapshot_dir_path(m_cct, dir_root);
+  dout(20) << ": dir_root=" << dir_root << ", snap_dir=" << snap_dir
            << ", is_remote=" << is_remote << dendl;
 
   auto lr_str = is_remote ? "remote" : "local";
@@ -484,73 +499,72 @@ int PeerReplayer::build_snap_map(const std::string &dir_path,
   return rv;
 }
 
-int PeerReplayer::propagate_snap_deletes(const std::string &dir_path,
+int PeerReplayer::propagate_snap_deletes(const std::string &dir_root,
                                          const std::set<std::string> &snaps) {
-  dout(5) << ": dir_path=" << dir_path << ", deleted snapshots=" << snaps << dendl;
+  dout(5) << ": dir_root=" << dir_root << ", deleted snapshots=" << snaps << dendl;
 
   for (auto &snap : snaps) {
-    dout(20) << ": deleting dir_path=" << dir_path << ", snapshot=" << snap
+    dout(20) << ": deleting dir_root=" << dir_root << ", snapshot=" << snap
              << dendl;
-    int r = ceph_rmsnap(m_remote_mount, dir_path.c_str(), snap.c_str());
+    int r = ceph_rmsnap(m_remote_mount, dir_root.c_str(), snap.c_str());
     if (r < 0) {
-      derr << ": failed to delete remote snap dir_path=" << dir_path
+      derr << ": failed to delete remote snap dir_root=" << dir_root
            << ", snapshot=" << snaps << ": " << cpp_strerror(r) << dendl;
       return r;
     }
-    inc_deleted_snap(dir_path);
+    inc_deleted_snap(dir_root);
   }
 
   return 0;
 }
 
 int PeerReplayer::propagate_snap_renames(
-    const std::string &dir_path,
+    const std::string &dir_root,
     const std::set<std::pair<std::string,std::string>> &snaps) {
-  dout(10) << ": dir_path=" << dir_path << ", renamed snapshots=" << snaps << dendl;
+  dout(10) << ": dir_root=" << dir_root << ", renamed snapshots=" << snaps << dendl;
 
   for (auto &snapp : snaps) {
-    auto from = snapshot_path(m_cct, dir_path, snapp.first);
-    auto to = snapshot_path(m_cct, dir_path, snapp.second);
-    dout(20) << ": renaming dir_path=" << dir_path << ", snapshot from="
+    auto from = snapshot_path(m_cct, dir_root, snapp.first);
+    auto to = snapshot_path(m_cct, dir_root, snapp.second);
+    dout(20) << ": renaming dir_root=" << dir_root << ", snapshot from="
              << from << ", to=" << to << dendl;
     int r = ceph_rename(m_remote_mount, from.c_str(), to.c_str());
     if (r < 0) {
-      derr << ": failed to rename remote snap dir_path=" << dir_path
+      derr << ": failed to rename remote snap dir_root=" << dir_root
            << ", snapshot from =" << from << ", to=" << to << ": "
            << cpp_strerror(r) << dendl;
       return r;
     }
-    inc_renamed_snap(dir_path);
+    inc_renamed_snap(dir_root);
   }
 
   return 0;
 }
 
-int PeerReplayer::remote_mkdir(const std::string &local_path,
-                               const std::string &remote_path,
-                               const struct ceph_statx &stx) {
-  dout(10) << ": local_path=" << local_path << ", remote_path=" << remote_path
-           << dendl;
+int PeerReplayer::remote_mkdir(const std::string &epath, const struct ceph_statx &stx,
+                               const FHandles &fh) {
+  dout(10) << ": remote epath=" << epath << dendl;
 
-  int r = ceph_mkdir(m_remote_mount, remote_path.c_str(), stx.stx_mode & ~S_IFDIR);
+  int r = ceph_mkdirat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_mode & ~S_IFDIR);
   if (r < 0 && r != -EEXIST) {
-    derr << ": failed to create remote directory=" << remote_path << ": " << cpp_strerror(r)
+    derr << ": failed to create remote directory=" << epath << ": " << cpp_strerror(r)
          << dendl;
     return r;
   }
 
-  r = ceph_lchown(m_remote_mount, remote_path.c_str(), stx.stx_uid, stx.stx_gid);
+  r = ceph_chownat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_uid, stx.stx_gid,
+                   AT_SYMLINK_NOFOLLOW);
   if (r < 0) {
-    derr << ": failed to chown remote directory=" << remote_path << ": " << cpp_strerror(r)
+    derr << ": failed to chown remote directory=" << epath << ": " << cpp_strerror(r)
          << dendl;
     return r;
   }
 
-  struct timeval times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec / 1000},
-                            {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec / 1000}};
-  r = ceph_lutimes(m_remote_mount, remote_path.c_str(), times);
+  struct timespec times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec},
+                             {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec}};
+  r = ceph_utimensat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), times, AT_SYMLINK_NOFOLLOW);
   if (r < 0) {
-    derr << ": failed to change [am]time on remote directory=" << remote_path << ": "
+    derr << ": failed to change [am]time on remote directory=" << epath << ": "
          << cpp_strerror(r) << dendl;
     return r;
   }
@@ -560,29 +574,26 @@ int PeerReplayer::remote_mkdir(const std::string &local_path,
 
 #define NR_IOVECS 8 // # iovecs
 #define IOVEC_SIZE (8 * 1024 * 1024) // buffer size for each iovec
-int PeerReplayer::remote_copy(const std::string &dir_path,
-                              const std::string &local_path,
-                              const std::string &remote_path,
-                              const struct ceph_statx &stx) {
-  dout(10) << ": dir_path=" << dir_path << ", local_path=" << local_path
-           << ", remote_path=" << remote_path << dendl;
+int PeerReplayer::copy_to_remote(const std::string &dir_root,  const std::string &epath,
+                                 const struct ceph_statx &stx, const FHandles &fh) {
+  dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << dendl;
   int l_fd;
   int r_fd;
   void *ptr;
   struct iovec iov[NR_IOVECS];
 
-  int r = ceph_open(m_local_mount, local_path.c_str(), O_RDONLY, 0);
+  int r = ceph_openat(m_local_mount, fh.c_fd, epath.c_str(), O_RDONLY | O_NOFOLLOW, 0);
   if (r < 0) {
-    derr << ": failed to open local file path=" << local_path << ": "
+    derr << ": failed to open local file path=" << epath << ": "
          << cpp_strerror(r) << dendl;
     return r;
   }
 
   l_fd = r;
-  r = ceph_open(m_remote_mount, remote_path.c_str(),
-                O_CREAT | O_TRUNC | O_WRONLY, stx.stx_mode);
+  r = ceph_openat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(),
+                  O_CREAT | O_TRUNC | O_WRONLY | O_NOFOLLOW, stx.stx_mode);
   if (r < 0) {
-    derr << ": failed to create remote file path=" << remote_path << ": "
+    derr << ": failed to create remote file path=" << epath << ": "
          << cpp_strerror(r) << dendl;
     goto close_local_fd;
   }
@@ -596,7 +607,7 @@ int PeerReplayer::remote_copy(const std::string &dir_path,
   }
 
   while (true) {
-    if (should_backoff(dir_path, &r)) {
+    if (should_backoff(dir_root, &r)) {
       dout(0) << ": backing off r=" << r << dendl;
       break;
     }
@@ -608,7 +619,7 @@ int PeerReplayer::remote_copy(const std::string &dir_path,
 
     r = ceph_preadv(m_local_mount, l_fd, iov, NR_IOVECS, -1);
     if (r < 0) {
-      derr << ": failed to read local file path=" << local_path << ": "
+      derr << ": failed to read local file path=" << epath << ": "
            << cpp_strerror(r) << dendl;
       break;
     }
@@ -625,7 +636,7 @@ int PeerReplayer::remote_copy(const std::string &dir_path,
 
     r = ceph_pwritev(m_remote_mount, r_fd, iov, iovs, -1);
     if (r < 0) {
-      derr << ": failed to write remote file path=" << remote_path << ": "
+      derr << ": failed to write remote file path=" << epath << ": "
            << cpp_strerror(r) << dendl;
       break;
     }
@@ -634,7 +645,7 @@ int PeerReplayer::remote_copy(const std::string &dir_path,
   if (r == 0) {
     r = ceph_fsync(m_remote_mount, r_fd, 0);
     if (r < 0) {
-      derr << ": failed to sync data for dir_path=" << remote_path << ": "
+      derr << ": failed to sync data for file path=" << epath << ": "
            << cpp_strerror(r) << dendl;
     }
   }
@@ -643,14 +654,14 @@ int PeerReplayer::remote_copy(const std::string &dir_path,
 
 close_remote_fd:
   if (ceph_close(m_remote_mount, r_fd) < 0) {
-    derr << ": failed to close remote fd path=" << remote_path << ": " << cpp_strerror(r)
+    derr << ": failed to close remote fd path=" << epath << ": " << cpp_strerror(r)
          << dendl;
     return -EINVAL;
   }
 
 close_local_fd:
   if (ceph_close(m_local_mount, l_fd) < 0) {
-    derr << ": failed to close local fd path=" << local_path << ": " << cpp_strerror(r)
+    derr << ": failed to close local fd path=" << epath << ": " << cpp_strerror(r)
          << dendl;
     return -EINVAL;
   }
@@ -658,91 +669,98 @@ close_local_fd:
   return r == 0 ? 0 : r;
 }
 
-int PeerReplayer::remote_file_op(const std::string &dir_path,
-                                 const std::string &local_path,
-                                 const std::string &remote_path,
-                                 const struct ceph_statx &stx) {
-  dout(10) << ": dir_path=" << dir_path << ", local_path=" << local_path
-           << ", remote_path=" << remote_path << dendl;
+int PeerReplayer::remote_file_op(const std::string &dir_root, const std::string &epath,
+                                 const struct ceph_statx &stx, const FHandles &fh,
+                                 bool need_data_sync, bool need_attr_sync) {
+  dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", need_data_sync=" << need_data_sync
+           << ", need_attr_sync=" << need_attr_sync << dendl;
 
   int r;
-  if (S_ISREG(stx.stx_mode)) {
-    r = remote_copy(dir_path, local_path, remote_path, stx);
-    if (r < 0) {
-      derr << ": failed to copy path=" << local_path << ": " << cpp_strerror(r)
-           << dendl;
-      return r;
+  if (need_data_sync) {
+    if (S_ISREG(stx.stx_mode)) {
+      r = copy_to_remote(dir_root, epath, stx, fh);
+      if (r < 0) {
+        derr << ": failed to copy path=" << epath << ": " << cpp_strerror(r) << dendl;
+        return r;
+      }
+    } else if (S_ISLNK(stx.stx_mode)) {
+      // free the remote link before relinking
+      r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), 0);
+      if (r < 0 && r != -ENOENT) {
+        derr << ": failed to remove remote symlink=" << epath << dendl;
+        return r;
+      }
+      char *target = (char *)alloca(stx.stx_size+1);
+      r = ceph_readlinkat(m_local_mount, fh.c_fd, epath.c_str(), target, stx.stx_size);
+      if (r < 0) {
+        derr << ": failed to readlink local path=" << epath << ": " << cpp_strerror(r)
+             << dendl;
+        return r;
+      }
+
+      target[stx.stx_size] = '\0';
+      r = ceph_symlinkat(m_remote_mount, target, fh.r_fd_dir_root, epath.c_str());
+      if (r < 0 && r != EEXIST) {
+        derr << ": failed to symlink remote path=" << epath << " to target=" << target
+             << ": " << cpp_strerror(r) << dendl;
+        return r;
+      }
+    } else {
+      dout(5) << ": skipping entry=" << epath << ": unsupported mode=" << stx.stx_mode
+              << dendl;
+      return 0;
     }
-  } else if (S_ISLNK(stx.stx_mode)) {
-    char *target = (char *)alloca(stx.stx_size+1);
-    r = ceph_readlink(m_local_mount, local_path.c_str(), target, stx.stx_size);
+  }
+
+  if (need_attr_sync) {
+    r = ceph_chownat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_uid, stx.stx_gid,
+                     AT_SYMLINK_NOFOLLOW);
     if (r < 0) {
-      derr << ": failed to readlink local path=" << local_path << ": " << cpp_strerror(r)
+      derr << ": failed to chown remote directory=" << epath << ": " << cpp_strerror(r)
            << dendl;
       return r;
     }
 
-    target[stx.stx_size] = '\0';
-    r = ceph_symlink(m_remote_mount, target, remote_path.c_str());
-    if (r < 0 && r != EEXIST) {
-      derr << ": failed to symlink remote path=" << remote_path << " to target=" << target
-           << ": " << cpp_strerror(r) << dendl;
+    struct timespec times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec},
+                               {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec}};
+    r = ceph_utimensat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), times, AT_SYMLINK_NOFOLLOW);
+    if (r < 0) {
+      derr << ": failed to change [am]time on remote directory=" << epath << ": "
+           << cpp_strerror(r) << dendl;
       return r;
     }
-  } else {
-    dout(5) << ": skipping entry=" << local_path << ": unsupported mode=" << stx.stx_mode
-            << dendl;
-    return 0;
-  }
-
-  r = ceph_lchown(m_remote_mount, remote_path.c_str(), stx.stx_uid, stx.stx_gid);
-  if (r < 0) {
-    derr << ": failed to chown remote directory=" << remote_path << ": "
-         << cpp_strerror(r) << dendl;
-    return r;
-  }
-
-  struct timeval times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec / 1000},
-                            {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec / 1000}};
-  r = ceph_lutimes(m_remote_mount, remote_path.c_str(), times);
-  if (r < 0) {
-    derr << ": failed to change [am]time on remote directory=" << remote_path << ": "
-         << cpp_strerror(r) << dendl;
-    return r;
   }
 
   return 0;
 }
 
-int PeerReplayer::cleanup_remote_dir(const std::string &dir_root, const std::string &path) {
-  dout(20) << ": dir_root=" << dir_root << ", path=" << path
+int PeerReplayer::cleanup_remote_dir(const std::string &dir_root,
+                                     const std::string &epath, const FHandles &fh) {
+  dout(20) << ": dir_root=" << dir_root << ", epath=" << epath
            << dendl;
 
-  std::stack<SyncEntry> rm_stack;
-  ceph_dir_result *tdirp;
-  auto dir_path = dir_root;
-  if (!path.empty()) {
-    dir_path = entry_path(dir_root, path);
-  }
-  int r = ceph_opendir(m_remote_mount, dir_path.c_str(), &tdirp);
+  struct ceph_statx tstx;
+  int r = ceph_statxat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), &tstx,
+                       CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                       CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                       AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
   if (r < 0) {
-    derr << ": failed to open remote directory=" << dir_path << ": "
+    derr << ": failed to stat remote directory=" << epath << ": "
          << cpp_strerror(r) << dendl;
     return r;
   }
 
-  struct ceph_statx tstx;
-  r = ceph_statx(m_remote_mount, dir_path.c_str(), &tstx,
-                 CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
-                 CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
-                 AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
+  ceph_dir_result *tdirp;
+  r = opendirat(m_remote_mount, fh.r_fd_dir_root, epath, AT_SYMLINK_NOFOLLOW,
+                &tdirp);
   if (r < 0) {
-    derr << ": failed to stat remote directory=" << dir_path << ": "
+    derr << ": failed to open remote directory=" << epath << ": "
          << cpp_strerror(r) << dendl;
     return r;
   }
 
-  rm_stack.emplace(SyncEntry(dir_path, tdirp, tstx));
+  std::stack<SyncEntry> rm_stack;
+  rm_stack.emplace(SyncEntry(epath, tdirp, tstx));
   while (!rm_stack.empty()) {
     if (should_backoff(dir_root, &r)) {
       dout(0) << ": backing off r=" << r << dendl;
@@ -775,13 +793,11 @@ int PeerReplayer::cleanup_remote_dir(const std::string &dir_root, const std::str
       }
 
       if (r == 0) {
-        if (rm_stack.size() > 1) {
-          r = ceph_rmdir(m_remote_mount, entry.epath.c_str());
-          if (r < 0) {
-            derr << ": failed to remove remote directory=" << entry.epath << ": "
-                 << cpp_strerror(r) << dendl;
-            break;
-          }
+        r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, entry.epath.c_str(), AT_REMOVEDIR);
+        if (r < 0) {
+          derr << ": failed to remove remote directory=" << entry.epath << ": "
+               << cpp_strerror(r) << dendl;
+          break;
         }
 
         dout(10) << ": done for remote directory=" << entry.epath << dendl;
@@ -798,7 +814,8 @@ int PeerReplayer::cleanup_remote_dir(const std::string &dir_root, const std::str
       auto epath = entry_path(entry.epath, e_name);
       if (S_ISDIR(stx.stx_mode)) {
         ceph_dir_result *dirp;
-        r = ceph_opendir(m_remote_mount, epath.c_str(), &dirp);
+        r = opendirat(m_remote_mount, fh.r_fd_dir_root, epath, AT_SYMLINK_NOFOLLOW,
+                      &dirp);
         if (r < 0) {
           derr << ": failed to open remote directory=" << epath << ": "
                << cpp_strerror(r) << dendl;
@@ -809,7 +826,7 @@ int PeerReplayer::cleanup_remote_dir(const std::string &dir_root, const std::str
         rm_stack.emplace(SyncEntry(epath, stx));
       }
     } else {
-      r = ceph_unlink(m_remote_mount, entry.epath.c_str());
+      r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, entry.epath.c_str(), 0);
       if (r < 0) {
         derr << ": failed to remove remote directory=" << entry.epath << ": "
              << cpp_strerror(r) << dendl;
@@ -835,34 +852,324 @@ int PeerReplayer::cleanup_remote_dir(const std::string &dir_root, const std::str
   return r;
 }
 
-int PeerReplayer::do_synchronize(const std::string &dir_path, const std::string &snap_name) {
-  dout(20) << ": dir_path=" << dir_path << ", snap_name=" << snap_name << dendl;
+int PeerReplayer::should_sync_entry(const std::string &epath, const struct ceph_statx &cstx,
+                                    const FHandles &fh, bool *need_data_sync, bool *need_attr_sync) {
+  dout(10) << ": epath=" << epath << dendl;
+
+  *need_data_sync = false;
+  *need_attr_sync = false;
+  struct ceph_statx pstx;
+  int r = ceph_statxat(fh.p_mnt, fh.p_fd, epath.c_str(), &pstx,
+                       CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                       CEPH_STATX_SIZE | CEPH_STATX_CTIME | CEPH_STATX_MTIME,
+                       AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
+  if (r < 0 && r != -ENOENT && r != -ENOTDIR) {
+    derr << ": failed to stat prev entry= " << epath << ": " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
 
-  auto snap_path = snapshot_path(m_cct, dir_path, snap_name);
-  std::stack<SyncEntry> sync_stack;
+  if (r < 0) {
+    // inode does not exist in prev snapshot or file type has changed
+    // (file was S_IFREG earlier, S_IFDIR now).
+    dout(5) << ": entry=" << epath << ", r=" << r << dendl;
+    *need_data_sync = true;
+    *need_attr_sync = true;
+    return 0;
+  }
 
-  ceph_dir_result *tdirp;
-  int r = ceph_opendir(m_local_mount, snap_path.c_str(), &tdirp);
+  dout(10) << ": local cur statx: mode=" << cstx.stx_mode << ", uid=" << cstx.stx_uid
+           << ", gid=" << cstx.stx_gid << ", size=" << cstx.stx_size << ", ctime="
+           << cstx.stx_ctime << ", mtime=" << cstx.stx_mtime << dendl;
+  dout(10) << ": local prev statx: mode=" << pstx.stx_mode << ", uid=" << pstx.stx_uid
+           << ", gid=" << pstx.stx_gid << ", size=" << pstx.stx_size << ", ctime="
+           << pstx.stx_ctime << ", mtime=" << pstx.stx_mtime << dendl;
+  if ((cstx.stx_mode & S_IFMT) != (pstx.stx_mode & S_IFMT)) {
+    dout(5) << ": entry=" << epath << " has mode mismatch" << dendl;
+    *need_data_sync = true;
+    *need_attr_sync = true;
+  } else {
+    *need_data_sync = (cstx.stx_size != pstx.stx_size) || (cstx.stx_mtime != pstx.stx_mtime);
+    *need_attr_sync = (cstx.stx_ctime != pstx.stx_ctime);
+  }
+
+  return 0;
+}
+
+int PeerReplayer::propagate_deleted_entries(const std::string &dir_root,
+                                            const std::string &epath, const FHandles &fh) {
+  dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << dendl;
+
+  ceph_dir_result *dirp;
+  int r = opendirat(fh.p_mnt, fh.p_fd, epath, AT_SYMLINK_NOFOLLOW, &dirp);
   if (r < 0) {
-    derr << ": failed to open local directory=" << snap_path << ": "
-         << cpp_strerror(r) << dendl;
+    if (r == -ELOOP) {
+      dout(5) << ": epath=" << epath << " is a symbolic link -- mode sync"
+              << " done when traversing parent" << dendl;
+      return 0;
+    }
+    if (r == -ENOTDIR) {
+      dout(5) << ": epath=" << epath << " is not a directory -- mode sync"
+              << " done when traversing parent" << dendl;
+      return 0;
+    }
+    if (r == -ENOENT) {
+      dout(5) << ": epath=" << epath << " missing in previous-snap/remote dir-root"
+              << dendl;
+    }
+    return r;
+  }
+
+  struct dirent *dire = (struct dirent *)alloca(512 * sizeof(struct dirent));
+  while (true) {
+    if (should_backoff(dir_root, &r)) {
+      dout(0) << ": backing off r=" << r << dendl;
+      break;
+    }
+
+    int len = ceph_getdents(fh.p_mnt, dirp, (char *)dire, 512);
+    if (len < 0) {
+      derr << ": failed to read directory entries: " << cpp_strerror(len) << dendl;
+      r = len;
+      // flip errno to signal that we got an err (possible the
+      // snapshot getting deleted in midst).
+      if (r == -ENOENT) {
+        r = -EINVAL;
+      }
+      break;
+    }
+    if (len == 0) {
+      dout(10) << ": reached EOD" << dendl;
+      break;
+    }
+    int nr = len / sizeof(struct dirent);
+    for (int i = 0; i < nr; ++i) {
+      if (should_backoff(dir_root, &r)) {
+        dout(0) << ": backing off r=" << r << dendl;
+        break;
+      }
+      std::string d_name = std::string(dire[i].d_name);
+      if (d_name == "." || d_name == "..") {
+        continue;
+      }
+
+      struct ceph_statx pstx;
+      auto dpath = entry_path(epath, d_name);
+      r = ceph_statxat(fh.p_mnt, fh.p_fd, dpath.c_str(), &pstx,
+                       CEPH_STATX_MODE, AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
+      if (r < 0) {
+        derr << ": failed to stat (prev) directory=" << dpath << ": "
+             << cpp_strerror(r) << dendl;
+        // flip errno to signal that we got an err (possible the
+        // snapshot getting deleted in midst).
+        if (r == -ENOENT) {
+          r = -EINVAL;
+        }
+        return r;
+      }
+
+      struct ceph_statx cstx;
+      r = ceph_statxat(m_local_mount, fh.c_fd, dpath.c_str(), &cstx,
+                       CEPH_STATX_MODE, AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
+      if (r < 0 && r != -ENOENT) {
+        derr << ": failed to stat local (cur) directory=" << dpath << ": "
+             << cpp_strerror(r) << dendl;
+        return r;
+      }
+
+      bool purge_remote = true;
+      if (r == 0) {
+        // directory entry present in both snapshots -- check inode
+        // type
+        if ((pstx.stx_mode & S_IFMT) == (cstx.stx_mode & S_IFMT)) {
+          dout(5) << ": mode matches for entry=" << d_name << dendl;
+          purge_remote = false;
+        } else {
+          dout(5) << ": mode mismatch for entry=" << d_name << dendl;
+        }
+      } else {
+        dout(5) << ": entry=" << d_name << " missing in current snapshot" << dendl;
+      }
+
+      if (purge_remote) {
+        dout(5) << ": purging remote entry=" << dpath << dendl;
+        if (S_ISDIR(pstx.stx_mode)) {
+          r = cleanup_remote_dir(dir_root, dpath, fh);
+        } else {
+          r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, dpath.c_str(), 0);
+        }
+
+        if (r < 0 && r != -ENOENT) {
+          derr << ": failed to cleanup remote entry=" << d_name << ": "
+               << cpp_strerror(r) << dendl;
+          return r;
+        }
+      }
+    }
+  }
+
+  ceph_closedir(fh.p_mnt, dirp);
+  return r;
+}
+
+int PeerReplayer::open_dir(MountRef mnt, const std::string &dir_path,
+                           boost::optional<uint64_t> snap_id) {
+  dout(20) << ": dir_path=" << dir_path << dendl;
+  if (snap_id) {
+    dout(20) << ": expected snapshot id=" << *snap_id << dendl;
+  }
+
+  int fd = ceph_open(mnt, dir_path.c_str(), O_DIRECTORY | O_RDONLY, 0);
+  if (fd < 0) {
+    derr << ": cannot open dir_path=" << dir_path << ": " << cpp_strerror(fd)
+         << dendl;
+    return fd;
+  }
+
+  if (!snap_id) {
+    return fd;
+  }
+
+  snap_info info;
+  int r = ceph_get_snap_info(mnt, dir_path.c_str(), &info);
+  if (r < 0) {
+    derr << ": failed to fetch snap_info for path=" << dir_path
+         << ": " << cpp_strerror(r) << dendl;
+    ceph_close(mnt, fd);
+    return r;
+  }
+
+  if (info.id != *snap_id) {
+    dout(5) << ": got mismatching snapshot id for path=" << dir_path << " (" << info.id
+            << " vs " << *snap_id << ") -- possible recreate" << dendl;
+    ceph_close(mnt, fd);
+    return -EINVAL;
+  }
+
+  return fd;
+}
+
+int PeerReplayer::pre_sync_check_and_open_handles(
+    const std::string &dir_root,
+    const Snapshot &current, boost::optional<Snapshot> prev,
+    FHandles *fh) {
+  dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
+  if (prev) {
+    dout(20) << ": prev=" << prev << dendl;
+  }
+
+  auto cur_snap_path = snapshot_path(m_cct, dir_root, current.first);
+  auto fd = open_dir(m_local_mount, cur_snap_path, current.second);
+  if (fd < 0) {
+    return fd;
+  }
+
+  // current snapshot file descriptor
+  fh->c_fd = fd;
+
+  MountRef mnt;
+  if (prev) {
+    mnt = m_local_mount;
+    auto prev_snap_path = snapshot_path(m_cct, dir_root, (*prev).first);
+    fd = open_dir(mnt, prev_snap_path, (*prev).second);
+  } else {
+    mnt = m_remote_mount;
+    fd = open_dir(mnt, dir_root, boost::none);
+  }
+
+  if (fd < 0) {
+    if (!prev || fd != -ENOENT) {
+      ceph_close(m_local_mount, fh->c_fd);
+      return fd;
+    }
+
+    // ENOENT of previous snap
+    dout(5) << ": previous snapshot=" << *prev << " missing" << dendl;
+    mnt = m_remote_mount;
+    fd = open_dir(mnt, dir_root, boost::none);
+    if (fd < 0) {
+      ceph_close(m_local_mount, fh->c_fd);
+      return fd;
+    }
+  }
+
+  // "previous" snapshot or dir_root file descriptor
+  fh->p_fd = fd;
+  fh->p_mnt = mnt;
+
+  {
+    std::scoped_lock locker(m_lock);
+    auto it = m_registered.find(dir_root);
+    ceph_assert(it != m_registered.end());
+    fh->r_fd_dir_root = it->second.fd;
+  }
+
+  dout(5) << ": using " << ((fh->p_mnt == m_local_mount) ? "local (previous) snapshot" : "remote dir_root")
+          << " for incremental transfer" << dendl;
+  return 0;
+}
+
+void PeerReplayer::post_sync_close_handles(const FHandles &fh) {
+  dout(20) << dendl;
+
+  // @FHandles.r_fd_dir_root is closed in @unregister_directory since
+  // its used to acquire an exclusive lock on remote dir_root.
+  ceph_close(m_local_mount, fh.c_fd);
+  ceph_close(fh.p_mnt, fh.p_fd);
+}
+
+int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &current,
+                                 boost::optional<Snapshot> prev) {
+  dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
+  if (prev) {
+    dout(20) << ": incremental sync check from prev=" << prev << dendl;
+  }
+
+  FHandles fh;
+  int r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh);
+  if (r < 0) {
+    dout(5) << ": cannot proceeed with sync: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  BOOST_SCOPE_EXIT_ALL( (this)(&fh) ) {
+    post_sync_close_handles(fh);
+  };
+
+  // record that we are going to "dirty" the data under this
+  // directory root
+  auto snap_id_str{stringify(current.second)};
+  r = ceph_fsetxattr(m_remote_mount, fh.r_fd_dir_root, "ceph.mirror.dirty_snap_id",
+                     snap_id_str.c_str(), snap_id_str.size(), 0);
+  if (r < 0) {
+    derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root
+         << ": " << cpp_strerror(r) << dendl;
     return r;
   }
 
   struct ceph_statx tstx;
-  r = ceph_statx(m_local_mount, snap_path.c_str(), &tstx,
-                 CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
-                 CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
-                 AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
+  r = ceph_fstatx(m_local_mount, fh.c_fd, &tstx,
+                  CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                  CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                  AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
   if (r < 0) {
-    derr << ": failed to stat local directory=" << snap_path << ": "
-         << cpp_strerror(r) << dendl;
+    derr << ": failed to stat snap=" << current.first << ": " << cpp_strerror(r)
+         << dendl;
     return r;
   }
 
-  sync_stack.emplace(SyncEntry("/", tdirp, tstx));
+  ceph_dir_result *tdirp;
+  r = ceph_fdopendir(m_local_mount, fh.c_fd, &tdirp);
+  if (r < 0) {
+    derr << ": failed to open local snap=" << current.first << ": " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
+
+  std::stack<SyncEntry> sync_stack;
+  sync_stack.emplace(SyncEntry(".", tdirp, tstx));
   while (!sync_stack.empty()) {
-    if (should_backoff(dir_path, &r)) {
+    if (should_backoff(dir_root, &r)) {
       dout(0) << ": backing off r=" << r << dendl;
       break;
     }
@@ -872,6 +1179,17 @@ int PeerReplayer::do_synchronize(const std::string &dir_path, const std::string
     auto &entry = sync_stack.top();
     dout(20) << ": top of stack path=" << entry.epath << dendl;
     if (entry.is_directory()) {
+      // entry is a directory -- propagate deletes for missing entries
+      // (and changed inode types) to the remote filesystem.
+      if (!entry.needs_remote_sync()) {
+        r = propagate_deleted_entries(dir_root, entry.epath, fh);
+        if (r < 0 && r != -ENOENT) {
+          derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl;
+          break;
+        }
+        entry.set_remote_synced();
+      }
+
       struct ceph_statx stx;
       struct dirent de;
       while (true) {
@@ -907,17 +1225,15 @@ int PeerReplayer::do_synchronize(const std::string &dir_path, const std::string
       }
 
       auto epath = entry_path(entry.epath, e_name);
-      auto l_path = entry_path(snap_path, epath);
-      auto r_path = entry_path(dir_path, epath);
       if (S_ISDIR(stx.stx_mode)) {
-        r = remote_mkdir(l_path, r_path, stx);
+        r = remote_mkdir(epath, stx, fh);
         if (r < 0) {
           break;
         }
         ceph_dir_result *dirp;
-        r = ceph_opendir(m_local_mount, l_path.c_str(), &dirp);
+        r = opendirat(m_local_mount, fh.c_fd, epath, AT_SYMLINK_NOFOLLOW, &dirp);
         if (r < 0) {
-          derr << ": failed to open local directory=" << l_path << ": "
+          derr << ": failed to open local directory=" << epath << ": "
                << cpp_strerror(r) << dendl;
           break;
         }
@@ -926,13 +1242,24 @@ int PeerReplayer::do_synchronize(const std::string &dir_path, const std::string
         sync_stack.emplace(SyncEntry(epath, stx));
       }
     } else {
-      auto l_path = entry_path(snap_path, entry.epath);
-      auto r_path = entry_path(dir_path, entry.epath);
-      r = remote_file_op(dir_path, l_path, r_path, entry.stx);
+      bool need_data_sync = true;
+      bool need_attr_sync = true;
+      r = should_sync_entry(entry.epath, entry.stx, fh,
+                            &need_data_sync, &need_attr_sync);
       if (r < 0) {
         break;
       }
-      dout(10) << ": done for file=" << entry.epath << dendl;
+
+      dout(5) << ": entry=" << entry.epath << ", data_sync=" << need_data_sync
+              << ", attr_sync=" << need_attr_sync << dendl;
+      if (need_data_sync || need_attr_sync) {
+        r = remote_file_op(dir_root, entry.epath, entry.stx, fh, need_data_sync,
+                           need_attr_sync);
+        if (r < 0) {
+          break;
+        }
+      }
+      dout(10) << ": done for epath=" << entry.epath << dendl;
       sync_stack.pop();
     }
   }
@@ -952,51 +1279,79 @@ int PeerReplayer::do_synchronize(const std::string &dir_path, const std::string
   return r;
 }
 
-int PeerReplayer::synchronize(const std::string &dir_path, uint64_t snap_id,
-                              const std::string &snap_name) {
-  dout(20) << ": dir_path=" << dir_path << ", snap_id=" << snap_id
-           << ", snap_name=" << snap_name << dendl;
+int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot &current,
+                              boost::optional<Snapshot> prev) {
+  dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
+  if (prev) {
+    dout(20) << ": prev=" << prev << dendl;
+  }
 
-  auto snap_path = snapshot_path(m_cct, dir_path, snap_name);
+  int r = ceph_getxattr(m_remote_mount, dir_root.c_str(), "ceph.mirror.dirty_snap_id", nullptr, 0);
+  if (r < 0 && r != -ENODATA) {
+    derr << ": failed to fetch primary_snap_id length from dir_root=" << dir_root
+         << ": " << cpp_strerror(r) << dendl;
+    return r;
+  }
 
-  int r = cleanup_remote_dir(dir_path);
+  // no xattr, can't determine which snap the data belongs to!
   if (r < 0) {
-    derr << ": failed to cleanup remote directory=" << dir_path << dendl;
-    return r;
+    dout(5) << ": missing \"ceph.mirror.dirty_snap_id\" xattr on remote -- using"
+            << " incremental sync with remote scan" << dendl;
+    r = do_synchronize(dir_root, current, boost::none);
+  } else {
+    size_t xlen = r;
+    char *val = (char *)alloca(xlen+1);
+    r = ceph_getxattr(m_remote_mount, dir_root.c_str(), "ceph.mirror.dirty_snap_id", (void*)val, xlen);
+    if (r < 0) {
+      derr << ": failed to fetch \"dirty_snap_id\" for dir_root: " << dir_root
+           << ": " << cpp_strerror(r) << dendl;
+      return r;
+    }
+
+    val[xlen] = '\0';
+    uint64_t dirty_snap_id = atoll(val);
+
+    dout(20) << ": dirty_snap_id: " << dirty_snap_id << " vs (" << current.second
+             << "," << (prev ? stringify((*prev).second) : "~") << ")" << dendl;
+    if (prev && (dirty_snap_id == (*prev).second || dirty_snap_id == current.second)) {
+      dout(5) << ": match -- using incremental sync with local scan" << dendl;
+      r = do_synchronize(dir_root, current, prev);
+    } else {
+      dout(5) << ": mismatch -- using incremental sync with remote scan" << dendl;
+      r = do_synchronize(dir_root, current, boost::none);
+    }
   }
 
-  r = do_synchronize(dir_path, snap_name);
+  // snap sync failed -- bail out!
   if (r < 0) {
-    derr << ": failed to synchronize dir_path=" << dir_path << ", snapshot="
-         << snap_path << dendl;
     return r;
   }
 
-  auto snap_id_str{stringify(snap_id)};
-  snap_metadata snap_meta[] = {{PRIMARY_SNAP_ID_KEY.c_str(), snap_id_str.c_str()}};
-  r = ceph_mksnap(m_remote_mount, dir_path.c_str(), snap_name.c_str(), 0755,
+  auto cur_snap_id_str{stringify(current.second)};
+  snap_metadata snap_meta[] = {{PRIMARY_SNAP_ID_KEY.c_str(), cur_snap_id_str.c_str()}};
+  r = ceph_mksnap(m_remote_mount, dir_root.c_str(), current.first.c_str(), 0755,
                   snap_meta, sizeof(snap_meta)/sizeof(snap_metadata));
   if (r < 0) {
-    derr << ": failed to snap remote directory dir_path=" << dir_path
+    derr << ": failed to snap remote directory dir_root=" << dir_root
          << ": " << cpp_strerror(r) << dendl;
   }
 
   return r;
 }
 
-int PeerReplayer::do_sync_snaps(const std::string &dir_path) {
-  dout(20) << ": dir_path=" << dir_path << dendl;
+int PeerReplayer::do_sync_snaps(const std::string &dir_root) {
+  dout(20) << ": dir_root=" << dir_root << dendl;
 
   std::map<uint64_t, std::string> local_snap_map;
   std::map<uint64_t, std::string> remote_snap_map;
 
-  int r = build_snap_map(dir_path, &local_snap_map);
+  int r = build_snap_map(dir_root, &local_snap_map);
   if (r < 0) {
     derr << ": failed to build local snap map" << dendl;
     return r;
   }
 
-  r = build_snap_map(dir_path, &remote_snap_map, true);
+  r = build_snap_map(dir_root, &remote_snap_map, true);
   if (r < 0) {
     derr << ": failed to build remote snap map" << dendl;
     return r;
@@ -1015,13 +1370,13 @@ int PeerReplayer::do_sync_snaps(const std::string &dir_path) {
     }
   }
 
-  r = propagate_snap_deletes(dir_path, snaps_deleted);
+  r = propagate_snap_deletes(dir_root, snaps_deleted);
   if (r < 0) {
     derr << ": failed to propgate deleted snapshots" << dendl;
     return r;
   }
 
-  r = propagate_snap_renames(dir_path, snaps_renamed);
+  r = propagate_snap_renames(dir_root, snaps_renamed);
   if (r < 0) {
     derr << ": failed to propgate renamed snapshots" << dendl;
     return r;
@@ -1029,10 +1384,12 @@ int PeerReplayer::do_sync_snaps(const std::string &dir_path) {
 
   // start mirroring snapshots from the last snap-id synchronized
   uint64_t last_snap_id = 0;
+  std::string last_snap_name;
   if (!remote_snap_map.empty()) {
     auto last = remote_snap_map.rbegin();
     last_snap_id = last->first;
-    set_last_synced_snap(dir_path, last_snap_id, last->second);
+    last_snap_name = last->second;
+    set_last_synced_snap(dir_root, last_snap_id, last_snap_name);
   }
 
   dout(5) << ": last snap-id transferred=" << last_snap_id << dendl;
@@ -1045,40 +1402,47 @@ int PeerReplayer::do_sync_snaps(const std::string &dir_path) {
   auto snaps_per_cycle = g_ceph_context->_conf.get_val<uint64_t>(
     "cephfs_mirror_max_snapshot_sync_per_cycle");
 
-  dout(10) << ": synzhronizing from snap-id=" << it->first << dendl;
+  dout(10) << ": synchronizing from snap-id=" << it->first << dendl;
   for (; it != local_snap_map.end(); ++it) {
-    set_current_syncing_snap(dir_path, it->first, it->second);
+    set_current_syncing_snap(dir_root, it->first, it->second);
     auto start = clock::now();
-    r = synchronize(dir_path, it->first, it->second);
+    boost::optional<Snapshot> prev = boost::none;
+    if (last_snap_id != 0) {
+      prev = std::make_pair(last_snap_name, last_snap_id);
+    }
+    r = synchronize(dir_root, std::make_pair(it->second, it->first), prev);
     if (r < 0) {
-      derr << ": failed to synchronize dir_path=" << dir_path
+      derr << ": failed to synchronize dir_root=" << dir_root
            << ", snapshot=" << it->second << dendl;
-      clear_current_syncing_snap(dir_path);
+      clear_current_syncing_snap(dir_root);
       return r;
     }
     std::chrono::duration<double> duration = clock::now() - start;
-    set_last_synced_stat(dir_path, it->first, it->second, duration.count());
+    set_last_synced_stat(dir_root, it->first, it->second, duration.count());
     if (--snaps_per_cycle == 0) {
       break;
     }
+
+    last_snap_name = it->second;
+    last_snap_id = it->first;
   }
 
   return 0;
 }
 
-void PeerReplayer::sync_snaps(const std::string &dir_path,
+void PeerReplayer::sync_snaps(const std::string &dir_root,
                               std::unique_lock<ceph::mutex> &locker) {
-  dout(20) << ": dir_path=" << dir_path << dendl;
+  dout(20) << ": dir_root=" << dir_root << dendl;
   locker.unlock();
-  int r = do_sync_snaps(dir_path);
+  int r = do_sync_snaps(dir_root);
   if (r < 0) {
-    derr << ": failed to sync snapshots for dir_path=" << dir_path << dendl;
+    derr << ": failed to sync snapshots for dir_root=" << dir_root << dendl;
   }
   locker.lock();
   if (r < 0) {
-    _inc_failed_count(dir_path);
+    _inc_failed_count(dir_root);
   } else {
-    _reset_failed_count(dir_path);
+    _reset_failed_count(dir_root);
   }
 }
 
@@ -1111,13 +1475,13 @@ void PeerReplayer::run(SnapshotReplayerThread *replayer) {
     std::chrono::duration<double> timo = now - last_directory_scan;
     if (timo.count() >= scan_interval && m_directories.size()) {
       dout(20) << ": trying to pick from " << m_directories.size() << " directories" << dendl;
-      auto dir_path = pick_directory();
-      if (dir_path) {
-        dout(5) << ": picked dir_path=" << *dir_path << dendl;
-        int r = register_directory(*dir_path, replayer);
+      auto dir_root = pick_directory();
+      if (dir_root) {
+        dout(5) << ": picked dir_root=" << *dir_root << dendl;
+        int r = register_directory(*dir_root, replayer);
         if (r == 0) {
-          sync_snaps(*dir_path, locker);
-          unregister_directory(*dir_path);
+          sync_snaps(*dir_root, locker);
+          unregister_directory(*dir_root);
         }
       }
 
@@ -1129,8 +1493,8 @@ void PeerReplayer::run(SnapshotReplayerThread *replayer) {
 void PeerReplayer::peer_status(Formatter *f) {
   std::scoped_lock locker(m_lock);
   f->open_object_section("stats");
-  for (auto &[dir_path, sync_stat] : m_snap_sync_stats) {
-    f->open_object_section(dir_path);
+  for (auto &[dir_root, sync_stat] : m_snap_sync_stats) {
+    f->open_object_section(dir_root);
     if (sync_stat.failed) {
       f->dump_string("state", "failed");
     } else if (!sync_stat.current_syncing_snap) {
@@ -1155,7 +1519,7 @@ void PeerReplayer::peer_status(Formatter *f) {
     f->dump_unsigned("snaps_synced", sync_stat.synced_snap_count);
     f->dump_unsigned("snaps_deleted", sync_stat.deleted_snap_count);
     f->dump_unsigned("snaps_renamed", sync_stat.renamed_snap_count);
-    f->close_section(); // dir_path
+    f->close_section(); // dir_root
   }
   f->close_section(); // stats
 }
index 95e14cafbe2a46dd01425410175c0d52117273df..ff78db836eda4abc059f2e80eeed0f83ef1a35be 100644 (file)
@@ -31,10 +31,10 @@ public:
   void shutdown();
 
   // add a directory to mirror queue
-  void add_directory(string_view dir_path);
+  void add_directory(string_view dir_root);
 
   // remove a directory from queue
-  void remove_directory(string_view dir_path);
+  void remove_directory(string_view dir_root);
 
   // admin socket helpers
   void peer_status(Formatter *f);
@@ -45,6 +45,30 @@ private:
   inline static const std::string SERVICE_DAEMON_FAILED_DIR_COUNT_KEY = "failure_count";
   inline static const std::string SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY = "recovery_count";
 
+  using Snapshot = std::pair<std::string, uint64_t>;
+
+  // file descriptor "triplet" for synchronizing a snapshot
+  // w/ an added MountRef for accessing "previous" snapshot.
+  struct FHandles {
+    // open file descriptor on the snap directory for snapshot
+    // currently being synchronized. Always use this fd with
+    // @m_local_mount.
+    int c_fd;
+
+    // open file descriptor on the "previous" snapshot or on
+    // dir_root on remote filesystem (based on if the snapshot
+    // can be used for incremental transfer). Always use this
+    // fd with p_mnt which either points to @m_local_mount (
+    // for local incremental comparison) or @m_remote_mount (
+    // for remote incremental comparison).
+    int p_fd;
+    MountRef p_mnt;
+
+    // open file descriptor on dir_root on remote filesystem.
+    // Always use this fd with @m_remote_mount.
+    int r_fd_dir_root;
+  };
+
   bool is_stopping() {
     return m_stopping;
   }
@@ -83,6 +107,10 @@ private:
     std::string epath;
     ceph_dir_result *dirp; // valid for directories
     struct ceph_statx stx;
+    // set by incremental sync _after_ ensuring missing entries
+    // in the currently synced snapshot have been propagated to
+    // the remote filesystem.
+    bool remote_synced = false;
 
     SyncEntry(std::string_view path,
               const struct ceph_statx &stx)
@@ -100,6 +128,13 @@ private:
     bool is_directory() const {
       return S_ISDIR(stx.stx_mode);
     }
+
+    bool needs_remote_sync() const {
+      return remote_synced;
+    }
+    void set_remote_synced() {
+      remote_synced = true;
+    }
   };
 
   using clock = ceph::coarse_mono_clock;
@@ -124,10 +159,10 @@ private:
     boost::optional<double> last_sync_duration;
   };
 
-  void _inc_failed_count(const std::string &dir_path) {
+  void _inc_failed_count(const std::string &dir_root) {
     auto max_failures = g_ceph_context->_conf.get_val<uint64_t>(
     "cephfs_mirror_max_consecutive_failures_per_directory");
-    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
     sync_stat.last_failed = clock::now();
     if (++sync_stat.nr_failures >= max_failures && !sync_stat.failed) {
       sync_stat.failed = true;
@@ -137,8 +172,8 @@ private:
                                                      m_service_daemon_stats.failed_dir_count);
     }
   }
-  void _reset_failed_count(const std::string &dir_path) {
-    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+  void _reset_failed_count(const std::string &dir_root) {
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
     if (sync_stat.failed) {
       ++m_service_daemon_stats.recovered_dir_count;
       m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
@@ -150,49 +185,49 @@ private:
     sync_stat.last_failed = boost::none;
   }
 
-  void _set_last_synced_snap(const std::string &dir_path, uint64_t snap_id,
+  void _set_last_synced_snap(const std::string &dir_root, uint64_t snap_id,
                             const std::string &snap_name) {
-    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
     sync_stat.last_synced_snap = std::make_pair(snap_id, snap_name);
     sync_stat.current_syncing_snap = boost::none;
   }
-  void set_last_synced_snap(const std::string &dir_path, uint64_t snap_id,
+  void set_last_synced_snap(const std::string &dir_root, uint64_t snap_id,
                             const std::string &snap_name) {
     std::scoped_lock locker(m_lock);
-    _set_last_synced_snap(dir_path, snap_id, snap_name);
+    _set_last_synced_snap(dir_root, snap_id, snap_name);
   }
-  void set_current_syncing_snap(const std::string &dir_path, uint64_t snap_id,
+  void set_current_syncing_snap(const std::string &dir_root, uint64_t snap_id,
                                 const std::string &snap_name) {
     std::scoped_lock locker(m_lock);
-    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
     sync_stat.current_syncing_snap = std::make_pair(snap_id, snap_name);
   }
-  void clear_current_syncing_snap(const std::string &dir_path) {
+  void clear_current_syncing_snap(const std::string &dir_root) {
     std::scoped_lock locker(m_lock);
-    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
     sync_stat.current_syncing_snap = boost::none;
   }
-  void inc_deleted_snap(const std::string &dir_path) {
+  void inc_deleted_snap(const std::string &dir_root) {
     std::scoped_lock locker(m_lock);
-    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
     ++sync_stat.deleted_snap_count;
   }
-  void inc_renamed_snap(const std::string &dir_path) {
+  void inc_renamed_snap(const std::string &dir_root) {
     std::scoped_lock locker(m_lock);
-    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
     ++sync_stat.renamed_snap_count;
   }
-  void set_last_synced_stat(const std::string &dir_path, uint64_t snap_id,
+  void set_last_synced_stat(const std::string &dir_root, uint64_t snap_id,
                             const std::string &snap_name, double duration) {
     std::scoped_lock locker(m_lock);
-    _set_last_synced_snap(dir_path, snap_id, snap_name);
-    auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    _set_last_synced_snap(dir_root, snap_id, snap_name);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
     sync_stat.last_synced = clock::now();
     sync_stat.last_sync_duration = duration;
     ++sync_stat.synced_snap_count;
   }
 
-  bool should_backoff(const std::string &dir_path, int *retval) {
+  bool should_backoff(const std::string &dir_root, int *retval) {
     if (m_fs_mirror->is_blocklisted()) {
       *retval = -EBLOCKLISTED;
       return true;
@@ -205,7 +240,7 @@ private:
       *retval = -EINPROGRESS;
       return true;
     }
-    auto &dr = m_registered.at(dir_path);
+    auto &dr = m_registered.at(dir_root);
     if (dr.replayer->is_canceled()) {
       *retval = -ECANCELED;
       return true;
@@ -242,32 +277,45 @@ private:
   void run(SnapshotReplayerThread *replayer);
 
   boost::optional<std::string> pick_directory();
-  int register_directory(const std::string &dir_path, SnapshotReplayerThread *replayer);
-  void unregister_directory(const std::string &dir_path);
-  int try_lock_directory(const std::string &dir_path, SnapshotReplayerThread *replayer,
+  int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);
+  void unregister_directory(const std::string &dir_root);
+  int try_lock_directory(const std::string &dir_root, SnapshotReplayerThread *replayer,
                          DirRegistry *registry);
-  void unlock_directory(const std::string &dir_path, const DirRegistry &registry);
-  void sync_snaps(const std::string &dir_path, std::unique_lock<ceph::mutex> &locker);
+  void unlock_directory(const std::string &dir_root, const DirRegistry &registry);
+  void sync_snaps(const std::string &dir_root, std::unique_lock<ceph::mutex> &locker);
+
 
-  int do_sync_snaps(const std::string &dir_path);
-  int build_snap_map(const std::string &dir_path, std::map<uint64_t, std::string> *snap_map,
+  int build_snap_map(const std::string &dir_root, std::map<uint64_t, std::string> *snap_map,
                      bool is_remote=false);
-  int propagate_snap_deletes(const std::string &dir_name, const std::set<std::string> &snaps);
-  int propagate_snap_renames(const std::string &dir_name,
+
+  int propagate_snap_deletes(const std::string &dir_root, const std::set<std::string> &snaps);
+  int propagate_snap_renames(const std::string &dir_root,
                              const std::set<std::pair<std::string,std::string>> &snaps);
-  int synchronize(const std::string &dir_path, uint64_t snap_id, const std::string &snap_name);
-  int do_synchronize(const std::string &path, const std::string &snap_name);
-
-  int cleanup_remote_dir(const std::string &dir_root, const std::string &path={});
-  int remote_mkdir(const std::string &local_path, const std::string &remote_path,
-                   const struct ceph_statx &stx);
-  int remote_file_op(const std::string &dir_path,
-                     const std::string &local_path,
-                     const std::string &remote_path, const struct ceph_statx &stx);
-  int remote_copy(const std::string &dir_path,
-                  const std::string &local_path,
-                  const std::string &remote_path,
-                  const struct ceph_statx &local_stx);
+  int propagate_deleted_entries(const std::string &dir_root, const std::string &epath,
+                                const FHandles &fh);
+  int cleanup_remote_dir(const std::string &dir_root, const std::string &epath,
+                         const FHandles &fh);
+
+  int should_sync_entry(const std::string &epath, const struct ceph_statx &cstx,
+                        const FHandles &fh, bool *need_data_sync, bool *need_attr_sync);
+
+  int open_dir(MountRef mnt, const std::string &dir_path, boost::optional<uint64_t> snap_id);
+  int pre_sync_check_and_open_handles(const std::string &dir_root, const Snapshot &current,
+                                      boost::optional<Snapshot> prev, FHandles *fh);
+  void post_sync_close_handles(const FHandles &fh);
+
+  int do_synchronize(const std::string &dir_root, const Snapshot &current,
+                     boost::optional<Snapshot> prev);
+
+  int synchronize(const std::string &dir_root, const Snapshot &current,
+                  boost::optional<Snapshot> prev);
+  int do_sync_snaps(const std::string &dir_root);
+
+  int remote_mkdir(const std::string &epath, const struct ceph_statx &stx, const FHandles &fh);
+  int remote_file_op(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx,
+                     const FHandles &fh, bool need_data_sync, bool need_attr_sync);
+  int copy_to_remote(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx,
+                     const FHandles &fh);
 };
 
 } // namespace mirror