From: Kotresh HR Date: Wed, 14 Jan 2026 12:17:47 +0000 (+0530) Subject: tools/cephfs_mirror: Add SnapDiff entries to dataq X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cc1aeb3db20260f97bba4f3bce85e0abcfaf2b07;p=ceph-ci.git tools/cephfs_mirror: Add SnapDiff entries to dataq Add SnapDiff entries to dataq and process the same in datasync threads similar to RemoteSync entries. Fixes: https://tracker.ceph.com/issues/73452 Signed-off-by: Kotresh HR --- diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index 01327e163b5..3c8bc1509c6 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -1508,87 +1508,96 @@ int PeerReplayer::SnapDiffSync::get_entry(std::string *epath, struct ceph_statx dout(20) << ": entry=" << e_name << ", snapid=" << snapid << dendl; if (e_name != "." && e_name != "..") { - break; - } - } - - if (r == 0) { - dout(10) << ": done for directory=" << entry.epath << dendl; - fini_directory(entry); - m_sync_stack.pop(); - continue; - } + auto _epath = entry_path(entry.epath, e_name); + dout(20) << ": epath=" << _epath << dendl; + if (snapid == (*m_prev).second) { + dout(20) << ": epath=" << _epath << " is deleted in current snapshot " << dendl; + // do not depend on d_type reported in struct dirent as the + // delete and create could have been processed and a restart + // of an interrupted sync would use the incorrect unlink API. + // N.B.: snapdiff returns the deleted entry before the newly + // created one. + struct ceph_statx pstx; + r = ceph_statxat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), &pstx, + CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0 && r != -ENOENT) { + derr << ": failed to stat remote entry=" << _epath << ", r=" << r << dendl; + return r; + } + if (r == 0) { + if (!S_ISDIR(pstx.stx_mode)) { + r = ceph_unlinkat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), 0); + } else { + r = purge_func(_epath); + } - if (r < 0) { - return r; - } + if (r < 0) { + derr << ": failed to propagate missing dirs r=" << r << dendl; + return r; + } + } - auto _epath = entry_path(entry.epath, e_name); - dout(20) << ": epath=" << _epath << dendl; - if (snapid == (*m_prev).second) { - dout(20) << ": epath=" << _epath << " is deleted in current snapshot " << dendl; - // do not depend on d_type reported in struct dirent as the - // delete and create could have been processed and a restart - // of an interrupted sync would use the incorrect unlink API. - // N.B.: snapdiff returns the deleted entry before the newly - // created one. - struct ceph_statx pstx; - r = ceph_statxat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), &pstx, - CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); - if (r < 0 && r != -ENOENT) { - derr << ": failed to stat remote entry=" << _epath << ", r=" << r << dendl; - return r; - } - if (r == 0) { - if (!S_ISDIR(pstx.stx_mode)) { - r = ceph_unlinkat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), 0); - } else { - r = purge_func(_epath); + m_deleted[entry.epath].emplace(e_name); + r = 1; //Continue with the outer loop + break; } + struct ceph_statx estx; + r = ceph_statxat(m_local, m_fh->c_fd, _epath.c_str(), &estx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); if (r < 0) { - derr << ": failed to propagate missing dirs r=" << r << dendl; + derr << ": failed to stat epath=" << epath << ", r=" << r << dendl; return r; } - } - m_deleted[entry.epath].emplace(e_name); - continue; - } + bool pic = entry.is_purged_or_itype_changed() || m_deleted[entry.epath].contains(e_name); + if (S_ISDIR(estx.stx_mode)) { + SyncEntry se; + r = init_directory(_epath, estx, pic, &se); + if (r < 0) { + return r; + } - struct ceph_statx estx; - r = ceph_statxat(m_local, m_fh->c_fd, _epath.c_str(), &estx, - CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | - CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, - AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); - if (r < 0) { - derr << ": failed to stat epath=" << epath << ", r=" << r << dendl; - return r; - } + if (pic) { + dout(10) << ": purge or itype change (including parent) found for entry=" + << se.epath << dendl; + se.set_purged_or_itype_changed(); + } - bool pic = entry.is_purged_or_itype_changed() || m_deleted[entry.epath].contains(e_name); - if (S_ISDIR(estx.stx_mode)) { - SyncEntry se; - r = init_directory(_epath, estx, pic, &se); - if (r < 0) { - return r; + m_sync_stack.emplace(se); + dout(20) << ": Added directory to stack =" << _epath << dendl; + r = remote_mkdir(_epath, estx); + if (r < 0) { + derr << ": mkdir failed on remote. epath=" << _epath << ": " << cpp_strerror(r) + << dendl; + return r; + } + //Fill epath to avoid caller treat this as failure and breaking the loop early. + *epath = _epath; + *stx = estx; + return r; // New directory added to stack + } else { + push_dataq_entry(SyncEntry(_epath, estx, !pic)); + dout(10) << ": sync_check=" << *sync_check << " for epath=" << _epath << dendl; + } } + } - if (pic) { - dout(10) << ": purge or itype change (including parent) found for entry=" - << se.epath << dendl; - se.set_purged_or_itype_changed(); - } + if (r == 1) + continue; - m_sync_stack.emplace(se); + if (r == 0) { + dout(10) << ": done for directory=" << entry.epath << dendl; + fini_directory(entry); + m_sync_stack.pop(); + continue; } - *epath = _epath; - *stx = estx; - *sync_check = !pic; - - dout(10) << ": sync_check=" << *sync_check << " for epath=" << *epath << dendl; - return 0; + if (r < 0) { + return r; + } } *epath = ""; @@ -2195,17 +2204,12 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { break; } - //TODO move sync_check to SyncEntry to make it work with SnapDiffSync. - //Always true for Remotesync - bool sync_check = true; - // Wait on data sync queue for entries to process SyncEntry entry; while (syncm->pop_dataq_entry(entry)) { - //TODO Fix process entry for SnapDiffSync bool need_data_sync = true; bool need_attr_sync = true; - if (sync_check) { + if (entry.sync_check) { r = should_sync_entry(entry.epath, entry.stx, fh, &need_data_sync, &need_attr_sync); if (r < 0) { @@ -2217,7 +2221,7 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { << need_data_sync << " attr_sync=" << need_attr_sync << dendl; if (need_data_sync || need_attr_sync) { r = remote_file_op(syncm, std::string(syncm->get_m_dir_root()), - entry.epath, entry.stx, sync_check, fh, + entry.epath, entry.stx, entry.sync_check, fh, need_data_sync, need_attr_sync); if (r < 0) { //TODO Handle bail out with taking remote snap diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index 550944af36c..ae83cf00410 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -128,6 +128,7 @@ private: // includes parent dentry purge bool purged_or_itype_changed = false; bool is_snapdiff = false; + bool sync_check = true; SyncEntry() { } @@ -137,6 +138,13 @@ private: : epath(path), stx(stx) { } + SyncEntry(std::string_view path, + const struct ceph_statx &stx, + bool sync_check) + : epath(path), + stx(stx), + sync_check(sync_check) { + } SyncEntry(std::string_view path, ceph_dir_result *dirp, const struct ceph_statx &stx)