]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
tools/cephfs_mirror: Add SnapDiff entries to dataq
authorKotresh HR <khiremat@redhat.com>
Wed, 14 Jan 2026 12:17:47 +0000 (17:47 +0530)
committerKotresh HR <khiremat@redhat.com>
Mon, 16 Feb 2026 19:14:14 +0000 (00:44 +0530)
Add SnapDiff entries to dataq and process the same
in datasync threads similar to RemoteSync entries.

Fixes: https://tracker.ceph.com/issues/73452
Signed-off-by: Kotresh HR <khiremat@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index 01327e163b5065492137633b33a03e486dd9f149..3c8bc1509c6f5bc1a27f6201b64dbe7092d527ba 100644 (file)
@@ -1508,87 +1508,96 @@ int PeerReplayer::SnapDiffSync::get_entry(std::string *epath, struct ceph_statx
 
       dout(20) << ": entry=" << e_name << ", snapid=" << snapid << dendl;
       if (e_name != "." && e_name != "..") {
-        break;
-      }
-    }
-
-    if (r == 0) {
-      dout(10) << ": done for directory=" << entry.epath << dendl;
-      fini_directory(entry);
-      m_sync_stack.pop();
-      continue;
-    }
+        auto _epath = entry_path(entry.epath, e_name);
+        dout(20) << ": epath=" << _epath << dendl;
+        if (snapid == (*m_prev).second) {
+          dout(20) << ": epath=" << _epath << " is deleted in current snapshot " << dendl;
+          // do not depend on d_type reported in struct dirent as the
+          // delete and create could have been processed and a restart
+          // of an interrupted sync would use the incorrect unlink API.
+          // N.B.: snapdiff returns the deleted entry before the newly
+          // created one.
+          struct ceph_statx pstx;
+          r = ceph_statxat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), &pstx,
+                           CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+          if (r < 0 && r != -ENOENT) {
+            derr << ": failed to stat remote entry=" << _epath << ", r=" << r << dendl;
+            return r;
+          }
+          if (r == 0) {
+            if (!S_ISDIR(pstx.stx_mode)) {
+              r = ceph_unlinkat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), 0);
+            } else {
+              r = purge_func(_epath);
+            }
 
-    if (r < 0) {
-      return r;
-    }
+            if (r < 0) {
+              derr << ": failed to propagate missing dirs r=" << r << dendl;
+              return r;
+            }
+          }
 
-    auto _epath = entry_path(entry.epath, e_name);
-    dout(20) << ": epath=" << _epath << dendl;
-    if (snapid == (*m_prev).second) {
-      dout(20) << ": epath=" << _epath << " is deleted in current snapshot " << dendl;
-      // do not depend on d_type reported in struct dirent as the
-      // delete and create could have been processed and a restart
-      // of an interrupted sync would use the incorrect unlink API.
-      // N.B.: snapdiff returns the deleted entry before the newly
-      // created one.
-      struct ceph_statx pstx;
-      r = ceph_statxat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), &pstx,
-                       CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
-      if (r < 0 && r != -ENOENT) {
-        derr << ": failed to stat remote entry=" << _epath << ", r=" << r << dendl;
-        return r;
-      }
-      if (r == 0) {
-        if (!S_ISDIR(pstx.stx_mode)) {
-          r = ceph_unlinkat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), 0);
-        } else {
-          r = purge_func(_epath);
+          m_deleted[entry.epath].emplace(e_name);
+          r = 1; //Continue with the outer loop
+          break;
         }
 
+        struct ceph_statx estx;
+        r = ceph_statxat(m_local, m_fh->c_fd, _epath.c_str(), &estx,
+                         CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                         CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                         AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
         if (r < 0) {
-          derr << ": failed to propagate missing dirs r=" << r << dendl;
+          derr << ": failed to stat epath=" << epath << ", r=" << r << dendl;
           return r;
         }
-      }
 
-      m_deleted[entry.epath].emplace(e_name);
-      continue;
-    }
+        bool pic = entry.is_purged_or_itype_changed() || m_deleted[entry.epath].contains(e_name);
+        if (S_ISDIR(estx.stx_mode)) {
+          SyncEntry se;
+          r = init_directory(_epath, estx, pic, &se);
+          if (r < 0) {
+            return r;
+          }
 
-    struct ceph_statx estx;
-    r = ceph_statxat(m_local, m_fh->c_fd, _epath.c_str(), &estx,
-                     CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
-                     CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
-                     AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
-    if (r < 0) {
-      derr << ": failed to stat epath=" << epath << ", r=" << r << dendl;
-      return r;
-    }
+          if (pic) {
+            dout(10) << ": purge or itype change (including parent) found for entry="
+                     << se.epath << dendl;
+            se.set_purged_or_itype_changed();
+          }
 
-    bool pic = entry.is_purged_or_itype_changed() || m_deleted[entry.epath].contains(e_name);
-    if (S_ISDIR(estx.stx_mode)) {
-      SyncEntry se;
-      r = init_directory(_epath, estx, pic, &se);
-      if (r < 0) {
-        return r;
+          m_sync_stack.emplace(se);
+          dout(20) << ": Added directory to stack =" << _epath << dendl;
+          r = remote_mkdir(_epath, estx);
+          if (r < 0) {
+            derr << ": mkdir failed on remote. epath=" << _epath << ": " << cpp_strerror(r)
+               << dendl;
+            return r;
+          }
+          //Fill epath to avoid caller treat this as failure and breaking the loop early.
+          *epath = _epath;
+          *stx = estx;
+          return r; // New directory added to stack
+        } else {
+          push_dataq_entry(SyncEntry(_epath, estx, !pic));
+          dout(10) << ": sync_check=" << *sync_check << " for epath=" << _epath << dendl;
+       }
       }
+    }
 
-      if (pic) {
-        dout(10) << ": purge or itype change (including parent) found for entry="
-                 << se.epath << dendl;
-        se.set_purged_or_itype_changed();
-      }
+    if (r == 1)
+      continue;
 
-      m_sync_stack.emplace(se);
+    if (r == 0) {
+      dout(10) << ": done for directory=" << entry.epath << dendl;
+      fini_directory(entry);
+      m_sync_stack.pop();
+      continue;
     }
 
-    *epath = _epath;
-    *stx = estx;
-    *sync_check = !pic;
-
-    dout(10) << ": sync_check=" << *sync_check << " for epath=" << *epath << dendl;
-    return 0;
+    if (r < 0) {
+      return r;
+    }
   }
 
   *epath = "";
@@ -2195,17 +2204,12 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
       break;
     }
 
-    //TODO move sync_check to SyncEntry to make it work with SnapDiffSync.
-    //Always true for Remotesync
-    bool sync_check = true;
-
     // Wait on data sync queue for entries to process
     SyncEntry entry;
     while (syncm->pop_dataq_entry(entry)) {
-      //TODO Fix process entry for SnapDiffSync
       bool need_data_sync = true;
       bool need_attr_sync = true;
-      if (sync_check) {
+      if (entry.sync_check) {
         r = should_sync_entry(entry.epath, entry.stx, fh,
                               &need_data_sync, &need_attr_sync);
         if (r < 0) {
@@ -2217,7 +2221,7 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
              << need_data_sync << " attr_sync=" << need_attr_sync << dendl;
       if (need_data_sync || need_attr_sync) {
         r = remote_file_op(syncm, std::string(syncm->get_m_dir_root()),
-                           entry.epath, entry.stx, sync_check, fh,
+                           entry.epath, entry.stx, entry.sync_check, fh,
                            need_data_sync, need_attr_sync);
         if (r < 0) {
           //TODO Handle bail out with taking remote snap
index 550944af36c854dc6253c1e1956007b5673e0ca0..ae83cf00410811de3ada11fabb114c445b87b5a7 100644 (file)
@@ -128,6 +128,7 @@ private:
     // includes parent dentry purge
     bool purged_or_itype_changed = false;
     bool is_snapdiff = false;
+    bool sync_check = true;
 
     SyncEntry() {
     }
@@ -137,6 +138,13 @@ private:
       : epath(path),
         stx(stx) {
     }
+    SyncEntry(std::string_view path,
+              const struct ceph_statx &stx,
+             bool sync_check)
+      : epath(path),
+        stx(stx),
+       sync_check(sync_check) {
+    }
     SyncEntry(std::string_view path,
               ceph_dir_result *dirp,
               const struct ceph_statx &stx)