From 8cb960e9760df587015c43a46ea550946cc86a12 Mon Sep 17 00:00:00 2001 From: Jos Collin Date: Thu, 23 Nov 2023 11:55:11 +0530 Subject: [PATCH] cephfs_mirror: use snapdiff api for incremental syncing Use snapdiff api to sync only the delta of files between two snapshots. Fixes: https://tracker.ceph.com/issues/61334 Signed-off-by: Jos Collin (cherry picked from commit 96c351c81cb60c34aef16fe1dc8dd2e70fc5acc6) --- src/tools/cephfs_mirror/PeerReplayer.cc | 193 ++++++++++++++++++++++-- src/tools/cephfs_mirror/PeerReplayer.h | 2 + 2 files changed, 185 insertions(+), 10 deletions(-) diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index 47a4fd0219f9..6b543dc369de 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -2,6 +2,7 @@ // vim: ts=8 sw=2 smarttab #include +#include #include #include #include @@ -63,6 +64,12 @@ std::string entry_path(const std::string &dir, const std::string &name) { return dir + "/" + name; } +std::string entry_diff_path(const std::string &dir, const std::string &name) { + if (dir == ".") + return name; + return dir + "/" + name; +} + std::map decode_snap_metadata(snap_metadata *snap_metadata, size_t nr_snap_metadata) { std::map metadata; @@ -1210,17 +1217,12 @@ void PeerReplayer::post_sync_close_handles(const FHandles &fh) { ceph_close(fh.p_mnt, fh.p_fd); } -int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot ¤t, - boost::optional prev) { +int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot ¤t) { dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl; - if (prev) { - dout(20) << ": incremental sync check from prev=" << prev << dendl; - } - FHandles fh; - int r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh); + int r = pre_sync_check_and_open_handles(dir_root, current, boost::none, &fh); if (r < 0) { - dout(5) << ": cannot proceeed with sync: " << cpp_strerror(r) << dendl; + dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl; return r; } @@ -1371,6 +1373,177 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu return r; } +int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot ¤t, + boost::optional prev) { + if (!prev) { + derr << ": invalid previous snapshot" << dendl; + return -ENODATA; + } + + dout(20) << ": incremental sync check from prev=" << prev << dendl; + + FHandles fh; + int r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh); + if (r < 0) { + dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl; + return r; + } + + BOOST_SCOPE_EXIT_ALL( (this)(&fh) ) { + post_sync_close_handles(fh); + }; + + // record that we are going to "dirty" the data under this directory root + auto snap_id_str{stringify(current.second)}; + r = ceph_setxattr(m_remote_mount, dir_root.c_str(), "ceph.mirror.dirty_snap_id", + snap_id_str.c_str(), snap_id_str.size(), 0); + if (r < 0) { + derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root + << ": " << cpp_strerror(r) << dendl; + return r; + } + + struct ceph_statx cstx; + r = ceph_fstatx(m_local_mount, fh.c_fd, &cstx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to stat snap=" << current.first << ": " << cpp_strerror(r) + << dendl; + return r; + } + + ceph_snapdiff_info sd_info; + ceph_snapdiff_entry_t sd_entry; + + //The queue of SyncEntry items (directories) to be synchronized. + //We follow a breadth first approach here based on the snapdiff output. + std::queue sync_queue; + + //start with initial/default entry + std::string epath = ".", npath = "", nabs_path = "", nname = ""; + sync_queue.emplace(SyncEntry(epath, cstx)); + + while (!sync_queue.empty()) { + if (should_backoff(dir_root, &r)) { + dout(0) << ": backing off r=" << r << dendl; + break; + } + r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh); + if (r < 0) { + dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl; + return r; + } + + dout(20) << ": " << sync_queue.size() << " entries in queue" << dendl; + const auto &queue_entry = sync_queue.front(); + epath = queue_entry.epath; + dout(20) << ": syncing entry, path=" << epath << dendl; + r = ceph_open_snapdiff(fh.p_mnt, dir_root.c_str(), epath.c_str(), + stringify((*prev).first).c_str(), current.first.c_str(), &sd_info); + if (r != 0) { + derr << ": failed to open snapdiff, r=" << r << dendl; + return r; + } + while (0 < (r = ceph_readdir_snapdiff(&sd_info, &sd_entry))) { + if (r < 0) { + derr << ": failed to read directory=" << epath << dendl; + ceph_close_snapdiff(&sd_info); + return r; + } + + //New entry found + nname = sd_entry.dir_entry.d_name; + if ("." == nname || ".." == nname) + continue; + // create path for the newly found entry + npath = entry_diff_path(epath, nname); + nabs_path = entry_diff_path(dir_root, npath); + + r = ceph_statx(sd_info.cmount, nabs_path.c_str(), &cstx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0) { + // can't stat, so it's a deleted entry. + if (DT_DIR == sd_entry.dir_entry.d_type) { // is a directory + r = cleanup_remote_dir(dir_root, npath, fh); + if (r < 0) { + derr << ": failed to remove directory=" << nabs_path << dendl; + break; + } + } + else { // is a file + r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), 0); + if (r < 0) { + break; + } + } + } else { + // stat success, update the existing entry + struct ceph_statx tstx; + int rstat_r = ceph_statx(m_remote_mount, nabs_path.c_str(), &tstx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (S_ISDIR(cstx.stx_mode)) { // is a directory + //cleanup if it's a file in the remotefs + if ((0 == rstat_r) && !S_ISDIR(tstx.stx_mode)) { + r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), 0); + if (r < 0) { + derr << ": Error in directory sync. Failed to remove file=" + << nabs_path << dendl; + break; + } + } + r = remote_mkdir(npath, cstx, fh); + if (r < 0) { + break; + } + // push it to sync_queue for later processing + sync_queue.emplace(SyncEntry(npath, cstx)); + } else { // is a file + bool need_data_sync = true; + bool need_attr_sync = true; + r = should_sync_entry(npath, cstx, fh, &need_data_sync, &need_attr_sync); + if (r < 0) { + break; + } + dout(5) << ": entry=" << npath << ", data_sync=" << need_data_sync + << ", attr_sync=" << need_attr_sync << dendl; + if (need_data_sync || need_attr_sync) { + //cleanup if it's a directory in the remotefs + if ((0 == rstat_r) && S_ISDIR(tstx.stx_mode)) { + r = cleanup_remote_dir(dir_root, npath, fh); + if (r < 0) { + derr << ": Error in file sync. Failed to remove remote directory=" + << nabs_path << dendl; + break; + } + } + r = remote_file_op(dir_root, npath, cstx, fh, need_data_sync, need_attr_sync); + if (r < 0) { + break; + } + } + } + } + } + if (0 == r) { + dout(10) << ": successfully synchronized the entry=" << epath << dendl; + } + + //Close the current open directory and take the next queue_entry, if success or failure. + r = ceph_close_snapdiff(&sd_info); + if (r != 0) { + derr << ": failed to close directory=" << epath << dendl; + } + sync_queue.pop(); + } + return r; +} + int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot ¤t, boost::optional prev) { dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl; @@ -1389,7 +1562,7 @@ int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot &curre if (r < 0) { dout(5) << ": missing \"ceph.mirror.dirty_snap_id\" xattr on remote -- using" << " incremental sync with remote scan" << dendl; - r = do_synchronize(dir_root, current, boost::none); + r = do_synchronize(dir_root, current); } else { size_t xlen = r; char *val = (char *)alloca(xlen+1); @@ -1410,7 +1583,7 @@ int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot &curre r = do_synchronize(dir_root, current, prev); } else { dout(5) << ": mismatch -- using incremental sync with remote scan" << dendl; - r = do_synchronize(dir_root, current, boost::none); + r = do_synchronize(dir_root, current); } } diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index 4d86dc43632d..35918fc6e49e 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -301,6 +301,8 @@ private: int do_synchronize(const std::string &dir_root, const Snapshot ¤t, boost::optional prev); + int do_synchronize(const std::string &dir_root, const Snapshot ¤t); + int synchronize(const std::string &dir_root, const Snapshot ¤t, boost::optional prev); int do_sync_snaps(const std::string &dir_root); -- 2.47.3