From fe09958ba2ea7bce1ff93d91ebde65825022261a Mon Sep 17 00:00:00 2001 From: Jos Collin Date: Thu, 30 Jan 2025 17:32:30 +0530 Subject: [PATCH] cephfs-mirror: current sync mechanism uses sync mechanism subclass'ing Fixes: https://tracker.ceph.com/issues/69671 Signed-off-by: Jos Collin Signed-off-by: Venky Shankar (cherry picked from commit d9ac4315154a73c7bc84e94f55a9966c115adaca) Conflicts: src/tools/cephfs_mirror/PeerReplayer.h Minor conflict involcing boost::optional<> --- src/tools/cephfs_mirror/PeerReplayer.cc | 608 ++++++++++++++---------- src/tools/cephfs_mirror/PeerReplayer.h | 74 ++- 2 files changed, 423 insertions(+), 259 deletions(-) diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index 357821484ccc1..c52a42a05a35b 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -2,7 +2,6 @@ // vim: ts=8 sw=2 smarttab #include -#include #include #include #include @@ -1218,187 +1217,343 @@ int PeerReplayer::sync_perms(const std::string& path) { return 0; } -int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot ¤t) { - dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl; - FHandles fh; - int r = pre_sync_check_and_open_handles(dir_root, current, boost::none, &fh); +PeerReplayer::SyncMechanism::SyncMechanism(MountRef local, MountRef remote, FHandles *fh, + const Peer &peer, const Snapshot ¤t, + boost::optional prev) + : m_local(local), + m_remote(remote), + m_fh(fh), + m_peer(peer), + m_current(current), + m_prev(prev) { + } + +PeerReplayer::SyncMechanism::~SyncMechanism() { +} + +PeerReplayer::SnapDiffSync::SnapDiffSync(std::string_view dir_root, MountRef local, MountRef remote, + FHandles *fh, const Peer &peer, const Snapshot ¤t, + boost::optional prev) + : SyncMechanism(local, remote, fh, peer, current, prev), + m_dir_root(dir_root) { +} + +PeerReplayer::SnapDiffSync::~SnapDiffSync() { +} + +int PeerReplayer::SnapDiffSync::init_sync() { + struct ceph_statx tstx; + int r = ceph_fstatx(m_local, m_fh->c_fd, &tstx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); if (r < 0) { - dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl; + derr << ": failed to stat snap=" << m_current.first << ": " << cpp_strerror(r) + << dendl; return r; } - // record that we are going to "dirty" the data under this - // directory root - auto snap_id_str{stringify(current.second)}; - r = ceph_fsetxattr(m_remote_mount, fh.r_fd_dir_root, "ceph.mirror.dirty_snap_id", - snap_id_str.c_str(), snap_id_str.size(), 0); - if (r < 0) { - derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root - << ": " << cpp_strerror(r) << dendl; - ceph_close(m_local_mount, fh.c_fd); - ceph_close(fh.p_mnt, fh.p_fd); + dout(20) << ": open_snapdiff for dir_root=" << m_dir_root << ", path=., prev=" + << (*m_prev).first << ", current=" << m_current.first << dendl; + + ceph_snapdiff_info info; + r = ceph_open_snapdiff(m_local, m_dir_root.c_str(), ".", + stringify((*m_prev).first).c_str(), stringify(m_current.first).c_str(), &info); + if (r != 0) { + derr << ": failed to open snapdiff for " << m_dir_root << ": r=" << r << dendl; return r; } + m_sync_stack.emplace(SyncEntry(".", info, tstx)); + return 0; +} + +int PeerReplayer::SnapDiffSync::get_entry(std::string *epath, struct ceph_statx *stx, + const std::function &dirsync_func, + const std::function &purge_func) { + dout(20) << ": sync stack size=" << m_sync_stack.size() << dendl; + + while (!m_sync_stack.empty()) { + auto &entry = m_sync_stack.top(); + dout(20) << ": top of stack path=" << entry.epath << dendl; + + if (!entry.is_directory()) { + *epath = entry.epath; + *stx = entry.stx; + m_sync_stack.pop(); + return 0; + } + + int r; + std::string e_name; + ceph_snapdiff_entry_t sd_entry; + while (true) { + r = ceph_readdir_snapdiff(&(entry.info), &sd_entry); + if (r < 0) { + derr << ": failed to read directory=" << entry.epath << dendl; + break; + } + if (r == 0) { + break; + } + + dout(20) << ": entry=" << sd_entry.dir_entry.d_name << ", snapid=" + << sd_entry.snapid << dendl; + + auto d_name = std::string(sd_entry.dir_entry.d_name); + if (d_name != "." && d_name != "..") { + e_name = d_name; + break; + } + } + + if (r == 0) { + dout(10) << ": done for directory=" << entry.epath << dendl; + if (ceph_close_snapdiff(&(entry.info)) < 0) { + derr << ": failed to close snapdiff for " << entry.epath << dendl; + } + m_sync_stack.pop(); + continue; + } + + if (r < 0) { + return r; + } + + auto _epath = entry_path(entry.epath, e_name); + dout(20) << ": epath=" << _epath << dendl; + if (sd_entry.snapid == (*m_prev).second) { + dout(20) << ": epath=" << _epath << " is deleted in current snapshot " << dendl; + // do not depend on d_type reported in struct dirent as the + // delete and create could have been processed and a restart + // of an interrupted sync would use the incorrect unlink API. + // N.B.: snapdiff returns the deleted entry before the newly + // created one. + struct ceph_statx pstx; + r = ceph_statxat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), &pstx, + CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0 && r != -ENOENT) { + derr << ": failed to stat remote entry=" << _epath << ": " << cpp_strerror(r) + << dendl; + return r; + } + if (r == 0) { + if (!S_ISDIR(pstx.stx_mode)) { + r = ceph_unlinkat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), 0); + } else { + r = purge_func(_epath); + } + + if (r < 0) { + derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl; + return r; + } + } + + continue; + } + + struct ceph_statx estx; + r = ceph_statxat(m_local, m_fh->c_fd, _epath.c_str(), &estx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to stat epath=" << epath << ": " << cpp_strerror(r) + << dendl; + return r; + } + + if (S_ISDIR(estx.stx_mode)) { + dout(20) << ": open_snapdiff for dir_root=" << m_dir_root << ", path=" << _epath + << ", prev=" << (*m_prev).first << ", current=" << m_current.first << dendl; + + ceph_snapdiff_info info; + r = ceph_open_snapdiff(m_local, m_dir_root.c_str(), _epath.c_str(), + stringify((*m_prev).first).c_str(), stringify(m_current.first).c_str(), &info); + if (r != 0) { + derr << ": failed to open snapdiff for " << m_dir_root << ": r=" << r << dendl; + return r; + } + + m_sync_stack.emplace(SyncEntry(_epath, info, estx)); + } + + *epath = _epath; + *stx = estx; + + return 0; + } + + *epath = ""; + return 0; +} + +void PeerReplayer::SnapDiffSync::finish_sync() { + dout(20) << dendl; + + while (!m_sync_stack.empty()) { + auto &entry = m_sync_stack.top(); + if (entry.is_directory()) { + dout(20) << ": closing local directory=" << entry.epath << dendl; + if (ceph_close_snapdiff(&(entry.info)) < 0) { + derr << ": failed to close snapdiff directory=" << entry.epath << dendl; + } + } + + m_sync_stack.pop(); + } +} + +PeerReplayer::RemoteSync::RemoteSync(MountRef local, MountRef remote, FHandles *fh, + const Peer &peer, const Snapshot ¤t, + boost::optional prev) + : SyncMechanism(local, remote, fh, peer, current, prev) { +} + +PeerReplayer::RemoteSync::~RemoteSync() { +} + +int PeerReplayer::RemoteSync::init_sync() { struct ceph_statx tstx; - r = ceph_fstatx(m_local_mount, fh.c_fd, &tstx, - CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | - CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, - AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + int r = ceph_fstatx(m_local, m_fh->c_fd, &tstx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); if (r < 0) { - derr << ": failed to stat snap=" << current.first << ": " << cpp_strerror(r) + derr << ": failed to stat snap=" << m_current.first << ": " << cpp_strerror(r) << dendl; - ceph_close(m_local_mount, fh.c_fd); - ceph_close(fh.p_mnt, fh.p_fd); return r; } ceph_dir_result *tdirp; - r = ceph_fdopendir(m_local_mount, fh.c_fd, &tdirp); + r = ceph_fdopendir(m_local, m_fh->c_fd, &tdirp); if (r < 0) { - derr << ": failed to open local snap=" << current.first << ": " << cpp_strerror(r) + derr << ": failed to open local snap=" << m_current.first << ": " << cpp_strerror(r) << dendl; - ceph_close(m_local_mount, fh.c_fd); - ceph_close(fh.p_mnt, fh.p_fd); return r; } - // starting from this point we shouldn't care about manual closing of fh.c_fd, - // it will be closed automatically when bound tdirp is closed. - std::stack sync_stack; - sync_stack.emplace(SyncEntry(".", tdirp, tstx)); - while (!sync_stack.empty()) { - if (should_backoff(dir_root, &r)) { - dout(0) << ": backing off r=" << r << dendl; - break; - } + m_sync_stack.emplace(SyncEntry(".", tdirp, tstx)); + return 0; +} - dout(20) << ": " << sync_stack.size() << " entries in stack" << dendl; - std::string e_name; - auto &entry = sync_stack.top(); +int PeerReplayer::RemoteSync::get_entry(std::string *epath, struct ceph_statx *stx, + const std::function &dirsync_func, + const std::function &purge_func) { + dout(20) << ": sync stack size=" << m_sync_stack.size() << dendl; + + while (!m_sync_stack.empty()) { + auto &entry = m_sync_stack.top(); dout(20) << ": top of stack path=" << entry.epath << dendl; - if (entry.is_directory()) { - // entry is a directory -- propagate deletes for missing entries - // (and changed inode types) to the remote filesystem. - if (!entry.needs_remote_sync()) { - r = propagate_deleted_entries(dir_root, entry.epath, fh); - if (r < 0 && r != -ENOENT) { - derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl; - break; - } - entry.set_remote_synced(); - } - struct ceph_statx stx; - struct dirent de; - while (true) { - r = ceph_readdirplus_r(m_local_mount, entry.dirp, &de, &stx, - CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | - CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, - AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL); - if (r < 0) { - derr << ": failed to local read directory=" << entry.epath << dendl; - break; - } - if (r == 0) { - break; - } + if (!entry.is_directory()) { + *epath = entry.epath; + *stx = entry.stx; + m_sync_stack.pop(); + return 0; + } - auto d_name = std::string(de.d_name); - if (d_name != "." && d_name != "..") { - e_name = d_name; - break; - } + // entry is a directory -- propagate deletes for missing entries + // (and changed inode types) to the remote filesystem. + if (!entry.needs_remote_sync()) { + int r = dirsync_func(entry.epath); + if (r < 0 && r != -ENOENT) { + derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl; + return r; } + entry.set_remote_synced(); + } + int r; + std::string e_name; + while (true) { + struct dirent de; + r = ceph_readdirplus_r(m_local, entry.dirp, &de, NULL, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL); + if (r < 0) { + derr << ": failed to local read directory=" << entry.epath << dendl; + break; + } if (r == 0) { - dout(10) << ": done for directory=" << entry.epath << dendl; - if (ceph_closedir(m_local_mount, entry.dirp) < 0) { - derr << ": failed to close local directory=" << entry.epath << dendl; - } - sync_stack.pop(); - continue; + break; } - if (r < 0) { + + auto d_name = std::string(de.d_name); + if (d_name != "." && d_name != "..") { + e_name = d_name; break; } + } - auto epath = entry_path(entry.epath, e_name); - if (S_ISDIR(stx.stx_mode)) { - r = remote_mkdir(epath, stx, fh); - if (r < 0) { - break; - } - ceph_dir_result *dirp; - r = opendirat(m_local_mount, fh.c_fd, epath, AT_SYMLINK_NOFOLLOW, &dirp); - if (r < 0) { - derr << ": failed to open local directory=" << epath << ": " - << cpp_strerror(r) << dendl; - break; - } - sync_stack.emplace(SyncEntry(epath, dirp, stx)); - } else { - sync_stack.emplace(SyncEntry(epath, stx)); + if (r == 0) { + dout(10) << ": done for directory=" << entry.epath << dendl; + if (ceph_closedir(m_local, entry.dirp) < 0) { + derr << ": failed to close local directory=" << entry.epath << dendl; } - } else { - bool need_data_sync = true; - bool need_attr_sync = true; - r = should_sync_entry(entry.epath, entry.stx, fh, - &need_data_sync, &need_attr_sync); + m_sync_stack.pop(); + continue; + } + + if (r < 0) { + return r; + } + + struct ceph_statx cstx; + auto _epath = entry_path(entry.epath, e_name); + r = ceph_statxat(m_local, m_fh->c_fd, _epath.c_str(), &cstx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to stat epath=" << _epath << ": " << cpp_strerror(r) + << dendl; + return r; + } + + if (S_ISDIR(cstx.stx_mode)) { + ceph_dir_result *dirp; + r = opendirat(m_local, m_fh->c_fd, _epath, AT_SYMLINK_NOFOLLOW, &dirp); if (r < 0) { + derr << ": failed to open local directory=" << _epath << ": " + << cpp_strerror(r) << dendl; break; } - dout(5) << ": entry=" << entry.epath << ", data_sync=" << need_data_sync - << ", attr_sync=" << need_attr_sync << dendl; - if (need_data_sync || need_attr_sync) { - r = remote_file_op(dir_root, entry.epath, entry.stx, fh, need_data_sync, - need_attr_sync); - if (r < 0) { - break; - } - } - dout(10) << ": done for epath=" << entry.epath << dendl; - sync_stack.pop(); + m_sync_stack.emplace(SyncEntry(_epath, dirp, cstx)); } + + *epath = _epath; + *stx = cstx; + + return 0; } - while (!sync_stack.empty()) { - auto &entry = sync_stack.top(); + *epath = ""; + return 0; +} + +void PeerReplayer::RemoteSync::finish_sync() { + dout(20) << dendl; + + while (!m_sync_stack.empty()) { + auto &entry = m_sync_stack.top(); if (entry.is_directory()) { dout(20) << ": closing local directory=" << entry.epath << dendl; - if (ceph_closedir(m_local_mount, entry.dirp) < 0) { + if (ceph_closedir(m_local, entry.dirp) < 0) { derr << ": failed to close local directory=" << entry.epath << dendl; } } - sync_stack.pop(); + m_sync_stack.pop(); } - - dout(20) << " cur:" << fh.c_fd - << " prev:" << fh.p_fd - << " ret = " << r - << dendl; - - // @FHandles.r_fd_dir_root is closed in @unregister_directory since - // its used to acquire an exclusive lock on remote dir_root. - - // c_fd has been used in ceph_fdopendir call so - // there is no need to close this fd manually. - ceph_close(fh.p_mnt, fh.p_fd); - - return r; } int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot ¤t, boost::optional prev) { - if (!prev) { - derr << ": invalid previous snapshot" << dendl; - return -ENODATA; - } - - dout(20) << ": incremental sync check from prev=" << prev << dendl; - + dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl; FHandles fh; int r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh); if (r < 0) { @@ -1406,11 +1561,11 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu return r; } - - // record that we are going to "dirty" the data under this directory root + // record that we are going to "dirty" the data under this + // directory root auto snap_id_str{stringify(current.second)}; - r = ceph_setxattr(m_remote_mount, dir_root.c_str(), "ceph.mirror.dirty_snap_id", - snap_id_str.c_str(), snap_id_str.size(), 0); + r = ceph_fsetxattr(m_remote_mount, fh.r_fd_dir_root, "ceph.mirror.dirty_snap_id", + snap_id_str.c_str(), snap_id_str.size(), 0); if (r < 0) { derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root << ": " << cpp_strerror(r) << dendl; @@ -1419,145 +1574,82 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu return r; } - struct ceph_statx cstx; - r = ceph_fstatx(m_local_mount, fh.c_fd, &cstx, - CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | - CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, - AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + SyncMechanism *syncm; + if (fh.p_mnt == m_local_mount) { + syncm = new SnapDiffSync(dir_root, m_local_mount, m_remote_mount, &fh, + m_peer, current, prev); + } else { + syncm = new RemoteSync(m_local_mount, m_remote_mount, &fh, + m_peer, current, boost::none); + } + + r = syncm->init_sync(); if (r < 0) { - derr << ": failed to stat snap=" << current.first << ": " << cpp_strerror(r) - << dendl; + derr << ": failed to initialize sync mechanism" << dendl; ceph_close(m_local_mount, fh.c_fd); ceph_close(fh.p_mnt, fh.p_fd); + delete syncm; return r; } - ceph_snapdiff_info sd_info; - ceph_snapdiff_entry_t sd_entry; - - //The queue of SyncEntry items (directories) to be synchronized. - //We follow a breadth first approach here based on the snapdiff output. - std::queue sync_queue; - - //start with initial/default entry - std::string epath = ".", npath = "", nname = ""; - sync_queue.emplace(SyncEntry(epath, cstx)); - - while (!sync_queue.empty()) { + // starting from this point we shouldn't care about manual closing of fh.c_fd, + // it will be closed automatically when bound tdirp is closed. + while (true) { if (should_backoff(dir_root, &r)) { dout(0) << ": backing off r=" << r << dendl; break; } - dout(20) << ": " << sync_queue.size() << " entries in queue" << dendl; - const auto &queue_entry = sync_queue.front(); - epath = queue_entry.epath; - dout(20) << ": syncing entry, path=" << epath << dendl; - r = ceph_open_snapdiff(fh.p_mnt, dir_root.c_str(), epath.c_str(), - stringify((*prev).first).c_str(), current.first.c_str(), &sd_info); - if (r != 0) { - derr << ": failed to open snapdiff, r=" << r << dendl; - ceph_close(m_local_mount, fh.c_fd); - ceph_close(fh.p_mnt, fh.p_fd); - return r; + std::string epath; + struct ceph_statx stx; + r = syncm->get_entry(&epath, &stx, + [this, &dir_root, &fh](const std::string &epath) { + return propagate_deleted_entries(dir_root, epath, fh); + }, + [this, &dir_root, &fh](const std::string &epath) { + return cleanup_remote_dir(dir_root, epath, fh); + }); + dout(20) << ": r=" << r << dendl; + if (r < 0) { + break; } - while (0 < (r = ceph_readdir_snapdiff(&sd_info, &sd_entry))) { + + dout(20) << ": epath=" << epath << dendl; + if (epath == "") { + dout(10) << ": tree traversal done for dir_root=" << dir_root << dendl; + break; + } + + if (S_ISDIR(stx.stx_mode)) { + r = remote_mkdir(epath, stx, fh); if (r < 0) { - derr << ": failed to read directory=" << epath << dendl; - ceph_close_snapdiff(&sd_info); - ceph_close(m_local_mount, fh.c_fd); - ceph_close(fh.p_mnt, fh.p_fd); - return r; + break; } - - //New entry found - nname = sd_entry.dir_entry.d_name; - if ("." == nname || ".." == nname) - continue; - // create path for the newly found entry - npath = entry_path(epath, nname); - r = ceph_statxat(m_local_mount, fh.c_fd, npath.c_str(), &cstx, - CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | - CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, - AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + } else { + bool need_data_sync = true; + bool need_attr_sync = true; + r = should_sync_entry(epath, stx, fh, + &need_data_sync, &need_attr_sync); if (r < 0) { - // can't stat, so it's a deleted entry. - if (DT_DIR == sd_entry.dir_entry.d_type) { // is a directory - r = cleanup_remote_dir(dir_root, npath, fh); - if (r < 0) { - derr << ": failed to remove directory=" << npath << dendl; - break; - } - } - else { // is a file - r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), 0); - if (r < 0) { - break; - } - } - } else { - // stat success, update the existing entry - struct ceph_statx tstx; - int rstat_r = ceph_statxat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), &tstx, - CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | - CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, - AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); - if (S_ISDIR(cstx.stx_mode)) { // is a directory - //cleanup if it's a file in the remotefs - if ((0 == rstat_r) && !S_ISDIR(tstx.stx_mode)) { - r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), 0); - if (r < 0) { - derr << ": Error in directory sync. Failed to remove file=" - << npath << dendl; - break; - } - } - r = remote_mkdir(npath, cstx, fh); - if (r < 0) { - break; - } - // push it to sync_queue for later processing - sync_queue.emplace(SyncEntry(npath, cstx)); - } else { // is a file - bool need_data_sync = true; - bool need_attr_sync = true; - r = should_sync_entry(npath, cstx, fh, &need_data_sync, &need_attr_sync); - if (r < 0) { - break; - } - dout(5) << ": entry=" << npath << ", data_sync=" << need_data_sync - << ", attr_sync=" << need_attr_sync << dendl; - if (need_data_sync || need_attr_sync) { - //cleanup if it's a directory in the remotefs - if ((0 == rstat_r) && S_ISDIR(tstx.stx_mode)) { - r = cleanup_remote_dir(dir_root, npath, fh); - if (r < 0) { - derr << ": Error in file sync. Failed to remove remote directory=" - << npath << dendl; - break; - } - } - r = remote_file_op(dir_root, npath, cstx, fh, need_data_sync, need_attr_sync); - if (r < 0) { - break; - } - } - } + break; } - } - if (0 == r) { - dout(10) << ": successfully synchronized the entry=" << epath << dendl; - } - //Close the current open directory and take the next queue_entry, if success or failure. - r = ceph_close_snapdiff(&sd_info); - if (r != 0) { - derr << ": failed to close directory=" << epath << dendl; + dout(5) << ": entry=" << epath << ", data_sync=" << need_data_sync + << ", attr_sync=" << need_attr_sync << dendl; + if (need_data_sync || need_attr_sync) { + r = remote_file_op(dir_root, epath, stx, fh, need_data_sync, need_attr_sync); + if (r < 0) { + break; + } + } + dout(10) << ": done for epath=" << epath << dendl; } - sync_queue.pop(); } - dout(20) << " current:" << fh.c_fd + syncm->finish_sync(); + delete syncm; + + dout(20) << " cur:" << fh.c_fd << " prev:" << fh.p_fd << " ret = " << r << dendl; @@ -1565,8 +1657,10 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu // @FHandles.r_fd_dir_root is closed in @unregister_directory since // its used to acquire an exclusive lock on remote dir_root. - ceph_close(m_local_mount, fh.c_fd); + // c_fd has been used in ceph_fdopendir call so + // there is no need to close this fd manually. ceph_close(fh.p_mnt, fh.p_fd); + return r; } diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index 32c71301f0068..b5199913649a3 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -10,6 +10,8 @@ #include "ServiceDaemon.h" #include "Types.h" +#include + namespace cephfs { namespace mirror { @@ -101,6 +103,7 @@ private: struct SyncEntry { std::string epath; ceph_dir_result *dirp; // valid for directories + ceph_snapdiff_info info; struct ceph_statx stx; // set by incremental sync _after_ ensuring missing entries // in the currently synced snapshot have been propagated to @@ -119,6 +122,13 @@ private: dirp(dirp), stx(stx) { } + SyncEntry(std::string_view path, + const ceph_snapdiff_info &info, + const struct ceph_statx &stx) + : epath(path), + info(info), + stx(stx) { + } bool is_directory() const { return S_ISDIR(stx.stx_mode); @@ -132,6 +142,65 @@ private: } }; + class SyncMechanism { + public: + SyncMechanism(MountRef local, MountRef remote, FHandles *fh, + const Peer &peer, /* keep dout happy */ + const Snapshot ¤t, boost::optional prev); + virtual ~SyncMechanism() = 0; + + virtual int init_sync() = 0; + + virtual int get_entry(std::string *epath, struct ceph_statx *stx, + const std::function &dirsync_func, + const std::function &purge_func) = 0; + + virtual void finish_sync() = 0; + + protected: + MountRef m_local; + MountRef m_remote; + FHandles *m_fh; + Peer m_peer; + Snapshot m_current; + boost::optional m_prev; + std::stack m_sync_stack; + }; + + class RemoteSync : public SyncMechanism { + public: + RemoteSync(MountRef local, MountRef remote, FHandles *fh, + const Peer &peer, /* keep dout happy */ + const Snapshot ¤t, boost::optional prev); + ~RemoteSync(); + + int init_sync() override; + + int get_entry(std::string *epath, struct ceph_statx *stx, + const std::function &dirsync_func, + const std::function &purge_func); + + void finish_sync(); + }; + + class SnapDiffSync : public SyncMechanism { + public: + SnapDiffSync(std::string_view dir_root, MountRef local, MountRef remote, + FHandles *fh, const Peer &peer, const Snapshot ¤t, + boost::optional prev); + ~SnapDiffSync(); + + int init_sync() override; + + int get_entry(std::string *epeth, struct ceph_statx *stx, + const std::function &dirsync_func, + const std::function &purge_func); + + void finish_sync(); + private: + std::string m_dir_root; + }; + // stats sent to service daemon struct ServiceDaemonStats { uint64_t failed_dir_count = 0; @@ -310,8 +379,9 @@ private: int do_synchronize(const std::string &dir_root, const Snapshot ¤t, boost::optional prev); - - int do_synchronize(const std::string &dir_root, const Snapshot ¤t); + int do_synchronize(const std::string &dir_root, const Snapshot ¤t) { + return do_synchronize(dir_root, current, boost::none); + } int synchronize(const std::string &dir_root, const Snapshot ¤t, boost::optional prev); -- 2.39.5