]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cephfs-mirror: current sync mechanism uses sync mechanism subclass'ing
authorJos Collin <jcollin@redhat.com>
Thu, 30 Jan 2025 12:02:30 +0000 (17:32 +0530)
committerVenky Shankar <vshankar@redhat.com>
Wed, 4 Jun 2025 14:52:56 +0000 (20:22 +0530)
Fixes: https://tracker.ceph.com/issues/69671
Signed-off-by: Jos Collin <jcollin@redhat.com>
Signed-off-by: Venky Shankar <vshankar@redhat.com>
(cherry picked from commit d9ac4315154a73c7bc84e94f55a9966c115adaca)

 Conflicts:
src/tools/cephfs_mirror/PeerReplayer.h

Minor conflict involcing boost::optional<>

src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index 357821484ccc1c74acc0f9d9cd32e71e4d00083e..c52a42a05a35b3389358e11252e7f5cc89098c72 100644 (file)
@@ -2,7 +2,6 @@
 // vim: ts=8 sw=2 smarttab
 
 #include <stack>
-#include <queue>
 #include <fcntl.h>
 #include <algorithm>
 #include <sys/time.h>
@@ -1218,187 +1217,343 @@ int PeerReplayer::sync_perms(const std::string& path) {
   return 0;
 }
 
-int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &current) {
-  dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
-  FHandles fh;
-  int r = pre_sync_check_and_open_handles(dir_root, current, boost::none, &fh);
+PeerReplayer::SyncMechanism::SyncMechanism(MountRef local, MountRef remote, FHandles *fh,
+                                           const Peer &peer, const Snapshot &current,
+                                           boost::optional<Snapshot> prev)
+    : m_local(local),
+      m_remote(remote),
+      m_fh(fh),
+      m_peer(peer),
+      m_current(current),
+      m_prev(prev) {
+  }
+
+PeerReplayer::SyncMechanism::~SyncMechanism() {
+}
+
+PeerReplayer::SnapDiffSync::SnapDiffSync(std::string_view dir_root, MountRef local, MountRef remote,
+                                         FHandles *fh, const Peer &peer, const Snapshot &current,
+                                         boost::optional<Snapshot> prev)
+  : SyncMechanism(local, remote, fh, peer, current, prev),
+    m_dir_root(dir_root) {
+}
+
+PeerReplayer::SnapDiffSync::~SnapDiffSync() {
+}
+
+int PeerReplayer::SnapDiffSync::init_sync() {
+  struct ceph_statx tstx;
+  int r = ceph_fstatx(m_local, m_fh->c_fd, &tstx,
+                      CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                      CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                      AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
   if (r < 0) {
-    dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl;
+    derr << ": failed to stat snap=" << m_current.first << ": " << cpp_strerror(r)
+         << dendl;
     return r;
   }
 
-  // record that we are going to "dirty" the data under this
-  // directory root
-  auto snap_id_str{stringify(current.second)};
-  r = ceph_fsetxattr(m_remote_mount, fh.r_fd_dir_root, "ceph.mirror.dirty_snap_id",
-                     snap_id_str.c_str(), snap_id_str.size(), 0);
-  if (r < 0) {
-    derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root
-         << ": " << cpp_strerror(r) << dendl;
-    ceph_close(m_local_mount, fh.c_fd);
-    ceph_close(fh.p_mnt, fh.p_fd);
+  dout(20) << ": open_snapdiff for dir_root=" << m_dir_root << ", path=., prev="
+           << (*m_prev).first << ", current=" << m_current.first << dendl;
+
+  ceph_snapdiff_info info;
+  r = ceph_open_snapdiff(m_local, m_dir_root.c_str(), ".",
+                         stringify((*m_prev).first).c_str(), stringify(m_current.first).c_str(), &info);
+  if (r != 0) {
+    derr << ": failed to open snapdiff for " << m_dir_root << ": r=" << r << dendl;
     return r;
   }
 
+  m_sync_stack.emplace(SyncEntry(".", info, tstx));
+  return 0;
+}
+
+int PeerReplayer::SnapDiffSync::get_entry(std::string *epath, struct ceph_statx *stx,
+                                          const std::function<int (const std::string&)> &dirsync_func,
+                                          const std::function<int (const std::string &)> &purge_func) {
+  dout(20) << ": sync stack size=" << m_sync_stack.size() << dendl;
+
+  while (!m_sync_stack.empty()) {
+    auto &entry = m_sync_stack.top();
+    dout(20) << ": top of stack path=" << entry.epath << dendl;
+
+    if (!entry.is_directory()) {
+      *epath = entry.epath;
+      *stx = entry.stx;
+      m_sync_stack.pop();
+      return 0;
+    }
+
+    int r;
+    std::string e_name;
+    ceph_snapdiff_entry_t sd_entry;
+    while (true) {
+      r = ceph_readdir_snapdiff(&(entry.info), &sd_entry);
+      if (r < 0) {
+        derr << ": failed to read directory=" << entry.epath << dendl;
+        break;
+      }
+      if (r == 0) {
+        break;
+      }
+
+      dout(20) << ": entry=" << sd_entry.dir_entry.d_name << ", snapid="
+               << sd_entry.snapid << dendl;
+
+      auto d_name = std::string(sd_entry.dir_entry.d_name);
+      if (d_name != "." && d_name != "..") {
+        e_name = d_name;
+        break;
+      }
+    }
+
+    if (r == 0) {
+      dout(10) << ": done for directory=" << entry.epath << dendl;
+      if (ceph_close_snapdiff(&(entry.info)) < 0) {
+        derr << ": failed to close snapdiff for " << entry.epath << dendl;
+      }
+      m_sync_stack.pop();
+      continue;
+    }
+
+    if (r < 0) {
+      return r;
+    }
+
+    auto _epath = entry_path(entry.epath, e_name);
+    dout(20) << ": epath=" << _epath << dendl;
+    if (sd_entry.snapid == (*m_prev).second) {
+      dout(20) << ": epath=" << _epath << " is deleted in current snapshot " << dendl;
+      // do not depend on d_type reported in struct dirent as the
+      // delete and create could have been processed and a restart
+      // of an interrupted sync would use the incorrect unlink API.
+      // N.B.: snapdiff returns the deleted entry before the newly
+      // created one.
+      struct ceph_statx pstx;
+      r = ceph_statxat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), &pstx,
+                       CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+      if (r < 0 && r != -ENOENT) {
+        derr << ": failed to stat remote entry=" << _epath << ": " << cpp_strerror(r)
+             << dendl;
+        return r;
+      }
+      if (r == 0) {
+        if (!S_ISDIR(pstx.stx_mode)) {
+          r = ceph_unlinkat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), 0);
+        } else {
+          r = purge_func(_epath);
+        }
+
+        if (r < 0) {
+          derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl;
+          return r;
+        }
+      }
+
+      continue;
+    }
+
+    struct ceph_statx estx;
+    r = ceph_statxat(m_local, m_fh->c_fd, _epath.c_str(), &estx,
+                     CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                     CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                     AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+    if (r < 0) {
+      derr << ": failed to stat epath=" << epath << ": " << cpp_strerror(r)
+           << dendl;
+      return r;
+    }
+
+    if (S_ISDIR(estx.stx_mode)) {
+      dout(20) << ": open_snapdiff for dir_root=" << m_dir_root << ", path=" << _epath
+               << ", prev=" << (*m_prev).first << ", current=" << m_current.first << dendl;
+
+      ceph_snapdiff_info info;
+      r = ceph_open_snapdiff(m_local, m_dir_root.c_str(), _epath.c_str(),
+                             stringify((*m_prev).first).c_str(), stringify(m_current.first).c_str(), &info);
+      if (r != 0) {
+        derr << ": failed to open snapdiff for " << m_dir_root << ": r=" << r << dendl;
+        return r;
+      }
+
+      m_sync_stack.emplace(SyncEntry(_epath, info, estx));
+    }
+
+    *epath = _epath;
+    *stx = estx;
+
+    return 0;
+  }
+
+  *epath = "";
+  return 0;
+}
+
+void PeerReplayer::SnapDiffSync::finish_sync() {
+  dout(20) << dendl;
+
+  while (!m_sync_stack.empty()) {
+    auto &entry = m_sync_stack.top();
+    if (entry.is_directory()) {
+      dout(20) << ": closing local directory=" << entry.epath << dendl;
+      if (ceph_close_snapdiff(&(entry.info)) < 0) {
+        derr << ": failed to close snapdiff directory=" << entry.epath << dendl;
+      }
+    }
+
+    m_sync_stack.pop();
+  }
+}
+
+PeerReplayer::RemoteSync::RemoteSync(MountRef local, MountRef remote, FHandles *fh,
+                                       const Peer &peer, const Snapshot &current,
+                                       boost::optional<Snapshot> prev)
+  : SyncMechanism(local, remote, fh, peer, current, prev) {
+}
+
+PeerReplayer::RemoteSync::~RemoteSync() {
+}
+
+int PeerReplayer::RemoteSync::init_sync() {
   struct ceph_statx tstx;
-  r = ceph_fstatx(m_local_mount, fh.c_fd, &tstx,
-                  CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
-                  CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
-                  AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+  int r = ceph_fstatx(m_local, m_fh->c_fd, &tstx,
+                      CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                      CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                      AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
   if (r < 0) {
-    derr << ": failed to stat snap=" << current.first << ": " << cpp_strerror(r)
+    derr << ": failed to stat snap=" << m_current.first << ": " << cpp_strerror(r)
          << dendl;
-    ceph_close(m_local_mount, fh.c_fd);
-    ceph_close(fh.p_mnt, fh.p_fd);
     return r;
   }
 
   ceph_dir_result *tdirp;
-  r = ceph_fdopendir(m_local_mount, fh.c_fd, &tdirp);
+  r = ceph_fdopendir(m_local, m_fh->c_fd, &tdirp);
   if (r < 0) {
-    derr << ": failed to open local snap=" << current.first << ": " << cpp_strerror(r)
+    derr << ": failed to open local snap=" << m_current.first << ": " << cpp_strerror(r)
          << dendl;
-    ceph_close(m_local_mount, fh.c_fd);
-    ceph_close(fh.p_mnt, fh.p_fd);
     return r;
   }
-  // starting from this point we shouldn't care about manual closing of fh.c_fd,
-  // it will be closed automatically when bound tdirp is closed.
 
-  std::stack<SyncEntry> sync_stack;
-  sync_stack.emplace(SyncEntry(".", tdirp, tstx));
-  while (!sync_stack.empty()) {
-    if (should_backoff(dir_root, &r)) {
-      dout(0) << ": backing off r=" << r << dendl;
-      break;
-    }
+  m_sync_stack.emplace(SyncEntry(".", tdirp, tstx));
+  return 0;
+}
 
-    dout(20) << ": " << sync_stack.size() << " entries in stack" << dendl;
-    std::string e_name;
-    auto &entry = sync_stack.top();
+int PeerReplayer::RemoteSync::get_entry(std::string *epath, struct ceph_statx *stx,
+                                        const std::function<int (const std::string&)> &dirsync_func,
+                                        const std::function<int (const std::string &)> &purge_func) {
+  dout(20) << ": sync stack size=" << m_sync_stack.size() << dendl;
+
+  while (!m_sync_stack.empty()) {
+    auto &entry = m_sync_stack.top();
     dout(20) << ": top of stack path=" << entry.epath << dendl;
-    if (entry.is_directory()) {
-      // entry is a directory -- propagate deletes for missing entries
-      // (and changed inode types) to the remote filesystem.
-      if (!entry.needs_remote_sync()) {
-        r = propagate_deleted_entries(dir_root, entry.epath, fh);
-        if (r < 0 && r != -ENOENT) {
-          derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl;
-          break;
-        }
-        entry.set_remote_synced();
-      }
 
-      struct ceph_statx stx;
-      struct dirent de;
-      while (true) {
-        r = ceph_readdirplus_r(m_local_mount, entry.dirp, &de, &stx,
-                               CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
-                               CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
-                               AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL);
-        if (r < 0) {
-          derr << ": failed to local read directory=" << entry.epath << dendl;
-          break;
-        }
-        if (r == 0) {
-          break;
-        }
+    if (!entry.is_directory()) {
+      *epath = entry.epath;
+      *stx = entry.stx;
+      m_sync_stack.pop();
+      return 0;
+    }
 
-        auto d_name = std::string(de.d_name);
-        if (d_name != "." && d_name != "..") {
-          e_name = d_name;
-          break;
-        }
+    // entry is a directory -- propagate deletes for missing entries
+    // (and changed inode types) to the remote filesystem.
+    if (!entry.needs_remote_sync()) {
+      int r = dirsync_func(entry.epath);
+      if (r < 0 && r != -ENOENT) {
+        derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl;
+        return r;
       }
+      entry.set_remote_synced();
+    }
 
+    int r;
+    std::string e_name;
+    while (true) {
+      struct dirent de;
+      r = ceph_readdirplus_r(m_local, entry.dirp, &de, NULL,
+                             CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                             CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                             AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL);
+      if (r < 0) {
+        derr << ": failed to local read directory=" << entry.epath << dendl;
+        break;
+      }
       if (r == 0) {
-        dout(10) << ": done for directory=" << entry.epath << dendl;
-        if (ceph_closedir(m_local_mount, entry.dirp) < 0) {
-          derr << ": failed to close local directory=" << entry.epath << dendl;
-        }
-        sync_stack.pop();
-        continue;
+        break;
       }
-      if (r < 0) {
+
+      auto d_name = std::string(de.d_name);
+      if (d_name != "." && d_name != "..") {
+        e_name = d_name;
         break;
       }
+    }
 
-      auto epath = entry_path(entry.epath, e_name);
-      if (S_ISDIR(stx.stx_mode)) {
-        r = remote_mkdir(epath, stx, fh);
-        if (r < 0) {
-          break;
-        }
-        ceph_dir_result *dirp;
-        r = opendirat(m_local_mount, fh.c_fd, epath, AT_SYMLINK_NOFOLLOW, &dirp);
-        if (r < 0) {
-          derr << ": failed to open local directory=" << epath << ": "
-               << cpp_strerror(r) << dendl;
-          break;
-        }
-        sync_stack.emplace(SyncEntry(epath, dirp, stx));
-      } else {
-        sync_stack.emplace(SyncEntry(epath, stx));
+    if (r == 0) {
+      dout(10) << ": done for directory=" << entry.epath << dendl;
+      if (ceph_closedir(m_local, entry.dirp) < 0) {
+        derr << ": failed to close local directory=" << entry.epath << dendl;
       }
-    } else {
-      bool need_data_sync = true;
-      bool need_attr_sync = true;
-      r = should_sync_entry(entry.epath, entry.stx, fh,
-                            &need_data_sync, &need_attr_sync);
+      m_sync_stack.pop();
+      continue;
+    }
+
+    if (r < 0) {
+      return r;
+    }
+
+    struct ceph_statx cstx;
+    auto _epath = entry_path(entry.epath, e_name);
+    r = ceph_statxat(m_local, m_fh->c_fd, _epath.c_str(), &cstx,
+                     CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                     CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                     AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+    if (r < 0) {
+      derr << ": failed to stat epath=" << _epath << ": " << cpp_strerror(r)
+           << dendl;
+      return r;
+    }
+
+    if (S_ISDIR(cstx.stx_mode)) {
+      ceph_dir_result *dirp;
+      r = opendirat(m_local, m_fh->c_fd, _epath, AT_SYMLINK_NOFOLLOW, &dirp);
       if (r < 0) {
+        derr << ": failed to open local directory=" << _epath << ": "
+             << cpp_strerror(r) << dendl;
         break;
       }
 
-      dout(5) << ": entry=" << entry.epath << ", data_sync=" << need_data_sync
-              << ", attr_sync=" << need_attr_sync << dendl;
-      if (need_data_sync || need_attr_sync) {
-        r = remote_file_op(dir_root, entry.epath, entry.stx, fh, need_data_sync,
-                           need_attr_sync);
-        if (r < 0) {
-          break;
-        }
-      }
-      dout(10) << ": done for epath=" << entry.epath << dendl;
-      sync_stack.pop();
+      m_sync_stack.emplace(SyncEntry(_epath, dirp, cstx));
     }
+
+    *epath = _epath;
+    *stx = cstx;
+
+    return 0;
   }
 
-  while (!sync_stack.empty()) {
-    auto &entry = sync_stack.top();
+  *epath = "";
+  return 0;
+}
+
+void PeerReplayer::RemoteSync::finish_sync() {
+  dout(20) << dendl;
+
+  while (!m_sync_stack.empty()) {
+    auto &entry = m_sync_stack.top();
     if (entry.is_directory()) {
       dout(20) << ": closing local directory=" << entry.epath << dendl;
-      if (ceph_closedir(m_local_mount, entry.dirp) < 0) {
+      if (ceph_closedir(m_local, entry.dirp) < 0) {
         derr << ": failed to close local directory=" << entry.epath << dendl;
       }
     }
 
-    sync_stack.pop();
+    m_sync_stack.pop();
   }
-
-  dout(20) << " cur:" << fh.c_fd
-           << " prev:" << fh.p_fd
-           << " ret = " << r
-           << dendl;
-
-  // @FHandles.r_fd_dir_root is closed in @unregister_directory since
-  // its used to acquire an exclusive lock on remote dir_root.
-
-  // c_fd has been used in ceph_fdopendir call so
-  // there is no need to close this fd manually.
-  ceph_close(fh.p_mnt, fh.p_fd);
-
-  return r;
 }
 
 int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &current,
                                  boost::optional<Snapshot> prev) {
-  if (!prev) {
-    derr << ": invalid previous snapshot" << dendl;
-    return -ENODATA;
-  }
-
-  dout(20) << ": incremental sync check from prev=" << prev << dendl;
-
+  dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
   FHandles fh;
   int r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh);
   if (r < 0) {
@@ -1406,11 +1561,11 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu
     return r;
   }
 
-
-  // record that we are going to "dirty" the data under this directory root
+  // record that we are going to "dirty" the data under this
+  // directory root
   auto snap_id_str{stringify(current.second)};
-  r = ceph_setxattr(m_remote_mount, dir_root.c_str(), "ceph.mirror.dirty_snap_id",
-                    snap_id_str.c_str(), snap_id_str.size(), 0);
+  r = ceph_fsetxattr(m_remote_mount, fh.r_fd_dir_root, "ceph.mirror.dirty_snap_id",
+                     snap_id_str.c_str(), snap_id_str.size(), 0);
   if (r < 0) {
     derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root
          << ": " << cpp_strerror(r) << dendl;
@@ -1419,145 +1574,82 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu
     return r;
   }
 
-  struct ceph_statx cstx;
-  r = ceph_fstatx(m_local_mount, fh.c_fd, &cstx,
-                  CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
-                  CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
-                  AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+  SyncMechanism *syncm;
+  if (fh.p_mnt == m_local_mount) {
+    syncm = new SnapDiffSync(dir_root, m_local_mount, m_remote_mount, &fh,
+                             m_peer, current, prev);
+  } else {
+    syncm = new RemoteSync(m_local_mount, m_remote_mount, &fh,
+                           m_peer, current, boost::none);
+  }
+
+  r = syncm->init_sync();
   if (r < 0) {
-    derr << ": failed to stat snap=" << current.first << ": " << cpp_strerror(r)
-         << dendl;
+    derr << ": failed to initialize sync mechanism" << dendl;
     ceph_close(m_local_mount, fh.c_fd);
     ceph_close(fh.p_mnt, fh.p_fd);
+    delete syncm;
     return r;
   }
 
-  ceph_snapdiff_info sd_info;
-  ceph_snapdiff_entry_t sd_entry;
-
-  //The queue of SyncEntry items (directories) to be synchronized.
-  //We follow a breadth first approach here based on the snapdiff output.
-  std::queue<SyncEntry> sync_queue;
-
-  //start with initial/default entry
-  std::string epath = ".", npath = "", nname = "";
-  sync_queue.emplace(SyncEntry(epath, cstx));
-
-  while (!sync_queue.empty()) {
+  // starting from this point we shouldn't care about manual closing of fh.c_fd,
+  // it will be closed automatically when bound tdirp is closed.
+  while (true) {
     if (should_backoff(dir_root, &r)) {
       dout(0) << ": backing off r=" << r << dendl;
       break;
     }
 
-    dout(20) << ": " << sync_queue.size() << " entries in queue" << dendl;
-    const auto &queue_entry = sync_queue.front();
-    epath = queue_entry.epath;
-    dout(20) << ": syncing entry, path=" << epath << dendl;
-    r = ceph_open_snapdiff(fh.p_mnt, dir_root.c_str(), epath.c_str(),
-                           stringify((*prev).first).c_str(), current.first.c_str(), &sd_info);
-    if (r != 0) {
-      derr << ": failed to open snapdiff, r=" << r << dendl;
-      ceph_close(m_local_mount, fh.c_fd);
-      ceph_close(fh.p_mnt, fh.p_fd);
-      return r;
+    std::string epath;
+    struct ceph_statx stx;
+    r = syncm->get_entry(&epath, &stx,
+                         [this, &dir_root, &fh](const std::string &epath) {
+                           return propagate_deleted_entries(dir_root, epath, fh);
+                         },
+                         [this, &dir_root, &fh](const std::string &epath) {
+                           return cleanup_remote_dir(dir_root, epath, fh);
+                         });
+    dout(20) << ": r=" << r << dendl;
+    if (r < 0) {
+      break;
     }
-    while (0 < (r = ceph_readdir_snapdiff(&sd_info, &sd_entry))) {
+
+    dout(20) << ": epath=" << epath << dendl;
+    if (epath == "") {
+      dout(10) << ": tree traversal done for dir_root=" << dir_root << dendl;
+      break;
+    }
+
+    if (S_ISDIR(stx.stx_mode)) {
+      r = remote_mkdir(epath, stx, fh);
       if (r < 0) {
-        derr << ": failed to read directory=" << epath << dendl;
-        ceph_close_snapdiff(&sd_info);
-        ceph_close(m_local_mount, fh.c_fd);
-        ceph_close(fh.p_mnt, fh.p_fd);
-        return r;
+        break;
       }
-
-      //New entry found
-      nname = sd_entry.dir_entry.d_name;
-      if ("." == nname || ".." == nname)
-        continue;
-      // create path for the newly found entry
-      npath = entry_path(epath, nname);
-      r = ceph_statxat(m_local_mount, fh.c_fd, npath.c_str(), &cstx,
-                       CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
-                       CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
-                       AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+    } else {
+      bool need_data_sync = true;
+      bool need_attr_sync = true;
+      r = should_sync_entry(epath, stx, fh,
+                            &need_data_sync, &need_attr_sync);
       if (r < 0) {
-        // can't stat, so it's a deleted entry.
-        if (DT_DIR == sd_entry.dir_entry.d_type) { // is a directory
-          r = cleanup_remote_dir(dir_root, npath, fh);
-          if (r < 0) {
-            derr << ": failed to remove directory=" << npath << dendl;
-            break;
-          }
-        }
-        else { // is a file
-          r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), 0);
-          if (r < 0) {
-            break;
-          }
-        }
-      } else {
-        // stat success, update the existing entry
-        struct ceph_statx tstx;
-        int rstat_r = ceph_statxat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), &tstx,
-                                   CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
-                                   CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
-                                   AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
-        if (S_ISDIR(cstx.stx_mode)) { // is a directory
-          //cleanup if it's a file in the remotefs
-          if ((0 == rstat_r) && !S_ISDIR(tstx.stx_mode)) {
-            r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), 0);
-            if (r < 0) {
-              derr << ": Error in directory sync. Failed to remove file="
-                   << npath << dendl;
-              break;
-            }
-          }
-          r = remote_mkdir(npath, cstx, fh);
-          if (r < 0) {
-            break;
-          }
-          // push it to sync_queue for later processing
-          sync_queue.emplace(SyncEntry(npath, cstx));
-        } else { // is a file
-          bool need_data_sync = true;
-          bool need_attr_sync = true;
-          r = should_sync_entry(npath, cstx, fh, &need_data_sync, &need_attr_sync);
-          if (r < 0) {
-            break;
-          }
-          dout(5) << ": entry=" << npath << ", data_sync=" << need_data_sync
-                  << ", attr_sync=" << need_attr_sync << dendl;
-          if (need_data_sync || need_attr_sync) {
-            //cleanup if it's a directory in the remotefs
-            if ((0 == rstat_r) && S_ISDIR(tstx.stx_mode)) {
-              r = cleanup_remote_dir(dir_root, npath, fh);
-              if (r < 0) {
-                derr << ": Error in file sync. Failed to remove remote directory="
-                     << npath << dendl;
-                break;
-              }
-            }
-            r = remote_file_op(dir_root, npath, cstx, fh, need_data_sync, need_attr_sync);
-            if (r < 0) {
-              break;
-            }
-          }
-        }
+        break;
       }
-    }
-    if (0 == r) {
-      dout(10) << ": successfully synchronized the entry=" << epath << dendl;
-    }
 
-    //Close the current open directory and take the next queue_entry, if success or failure.
-    r = ceph_close_snapdiff(&sd_info);
-    if (r != 0) {
-      derr << ": failed to close directory=" << epath << dendl;
+      dout(5) << ": entry=" << epath << ", data_sync=" << need_data_sync
+              << ", attr_sync=" << need_attr_sync << dendl;
+      if (need_data_sync || need_attr_sync) {
+        r = remote_file_op(dir_root, epath, stx, fh, need_data_sync, need_attr_sync);
+        if (r < 0) {
+          break;
+        }
+      }
+      dout(10) << ": done for epath=" << epath << dendl;
     }
-    sync_queue.pop();
   }
 
-  dout(20) << " current:" << fh.c_fd
+  syncm->finish_sync();
+  delete syncm;
+
+  dout(20) << " cur:" << fh.c_fd
            << " prev:" << fh.p_fd
            << " ret = " << r
            << dendl;
@@ -1565,8 +1657,10 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu
   // @FHandles.r_fd_dir_root is closed in @unregister_directory since
   // its used to acquire an exclusive lock on remote dir_root.
 
-  ceph_close(m_local_mount, fh.c_fd);
+  // c_fd has been used in ceph_fdopendir call so
+  // there is no need to close this fd manually.
   ceph_close(fh.p_mnt, fh.p_fd);
+
   return r;
 }
 
index 32c71301f00686fd78cf5f56a254481defe78aeb..b5199913649a365aac0c150bb2af403c8e411053 100644 (file)
@@ -10,6 +10,8 @@
 #include "ServiceDaemon.h"
 #include "Types.h"
 
+#include <stack>
+
 namespace cephfs {
 namespace mirror {
 
@@ -101,6 +103,7 @@ private:
   struct SyncEntry {
     std::string epath;
     ceph_dir_result *dirp; // valid for directories
+    ceph_snapdiff_info info;
     struct ceph_statx stx;
     // set by incremental sync _after_ ensuring missing entries
     // in the currently synced snapshot have been propagated to
@@ -119,6 +122,13 @@ private:
         dirp(dirp),
         stx(stx) {
     }
+    SyncEntry(std::string_view path,
+              const ceph_snapdiff_info &info,
+              const struct ceph_statx &stx)
+      : epath(path),
+        info(info),
+        stx(stx) {
+    }
 
     bool is_directory() const {
       return S_ISDIR(stx.stx_mode);
@@ -132,6 +142,65 @@ private:
     }
   };
 
+  class SyncMechanism {
+  public:
+    SyncMechanism(MountRef local, MountRef remote, FHandles *fh,
+                  const Peer &peer, /* keep dout happy */
+                  const Snapshot &current, boost::optional<Snapshot> prev);
+    virtual ~SyncMechanism() = 0;
+
+    virtual int init_sync() = 0;
+
+    virtual int get_entry(std::string *epath, struct ceph_statx *stx,
+                          const std::function<int (const std::string&)> &dirsync_func,
+                          const std::function<int (const std::string&)> &purge_func) = 0;
+
+    virtual void finish_sync() = 0;
+
+  protected:
+    MountRef m_local;
+    MountRef m_remote;
+    FHandles *m_fh;
+    Peer m_peer;
+    Snapshot m_current;
+    boost::optional<Snapshot> m_prev;
+    std::stack<PeerReplayer::SyncEntry> m_sync_stack;
+  };
+
+  class RemoteSync : public SyncMechanism {
+  public:
+    RemoteSync(MountRef local, MountRef remote, FHandles *fh,
+               const Peer &peer, /* keep dout happy */
+               const Snapshot &current, boost::optional<Snapshot> prev);
+    ~RemoteSync();
+
+    int init_sync() override;
+
+    int get_entry(std::string *epath, struct ceph_statx *stx,
+                  const std::function<int (const std::string&)> &dirsync_func,
+                  const std::function<int (const std::string&)> &purge_func);
+
+    void finish_sync();
+  };
+
+  class SnapDiffSync : public SyncMechanism {
+  public:
+    SnapDiffSync(std::string_view dir_root, MountRef local, MountRef remote,
+                 FHandles *fh, const Peer &peer, const Snapshot &current,
+                 boost::optional<Snapshot> prev);
+    ~SnapDiffSync();
+
+    int init_sync() override;
+
+    int get_entry(std::string *epeth, struct ceph_statx *stx,
+                  const std::function<int (const std::string&)> &dirsync_func,
+                  const std::function<int (const std::string&)> &purge_func);
+
+    void finish_sync();
+  private:
+    std::string m_dir_root;
+  };
+
   // stats sent to service daemon
   struct ServiceDaemonStats {
     uint64_t failed_dir_count = 0;
@@ -310,8 +379,9 @@ private:
 
   int do_synchronize(const std::string &dir_root, const Snapshot &current,
                      boost::optional<Snapshot> prev);
-
-  int do_synchronize(const std::string &dir_root, const Snapshot &current);
+  int do_synchronize(const std::string &dir_root, const Snapshot &current) {
+    return do_synchronize(dir_root, current, boost::none);
+  }
 
   int synchronize(const std::string &dir_root, const Snapshot &current,
                   boost::optional<Snapshot> prev);