From: Venky Shankar Date: Fri, 28 Feb 2025 04:57:46 +0000 (+0000) Subject: cephfs-mirror: integrate blockdiff API for regular file transfers X-Git-Tag: v19.2.3~44^2~1 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=486d742ea27a6bab7bb91b3d1af025f50f4b0d54;p=ceph.git cephfs-mirror: integrate blockdiff API for regular file transfers Fixes: http://tracker.ceph.com/issues/70225 Signed-off-by: Venky Shankar (cherry picked from commit 19802bab6f5f22614c3d30b1571dbbcbc675a0f8) --- diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index c52a42a05a35b..f6fc51768630d 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -650,8 +650,10 @@ int PeerReplayer::remote_mkdir(const std::string &epath, const struct ceph_statx #define NR_IOVECS 8 // # iovecs #define IOVEC_SIZE (8 * 1024 * 1024) // buffer size for each iovec int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string &epath, - const struct ceph_statx &stx, const FHandles &fh) { - dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << dendl; + const struct ceph_statx &stx, const FHandles &fh, + uint64_t num_blocks, struct cblock *b) { + dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", num_blocks=" + << num_blocks << dendl; int l_fd; int r_fd; void *ptr; @@ -666,7 +668,7 @@ int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string l_fd = r; r = ceph_openat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), - O_CREAT | O_TRUNC | O_WRONLY | O_NOFOLLOW, stx.stx_mode); + O_CREAT | O_WRONLY | O_NOFOLLOW, stx.stx_mode); if (r < 0) { derr << ": failed to create remote file path=" << epath << ": " << cpp_strerror(r) << dendl; @@ -681,43 +683,86 @@ int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string goto close_remote_fd; } - while (true) { - if (should_backoff(dir_root, &r)) { - dout(0) << ": backing off r=" << r << dendl; - break; - } + while (num_blocks > 0) { + auto offset = b->offset; + auto len = b->len; - for (int i = 0; i < NR_IOVECS; ++i) { - iov[i].iov_base = (char*)ptr + IOVEC_SIZE*i; - iov[i].iov_len = IOVEC_SIZE; - } + dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", block: [" + << offset << "~" << len << "]" << dendl; - r = ceph_preadv(m_local_mount, l_fd, iov, NR_IOVECS, -1); - if (r < 0) { - derr << ": failed to read local file path=" << epath << ": " - << cpp_strerror(r) << dendl; - break; - } - if (r == 0) { - break; - } + auto end_offset = offset + len; + dout(20) << ": start offset=" << offset << ", end offset=" << end_offset + << dendl; + while (offset < end_offset) { + if (should_backoff(dir_root, &r)) { + dout(0) << ": backing off r=" << r << dendl; + break; + } - int iovs = (int)(r / IOVEC_SIZE); - int t = r % IOVEC_SIZE; - if (t) { - iov[iovs].iov_len = t; - ++iovs; + auto cut_off = len; + if (cut_off > NR_IOVECS*IOVEC_SIZE) { + cut_off = NR_IOVECS*IOVEC_SIZE; + } + + int num_buffers = cut_off / IOVEC_SIZE; + if (cut_off % IOVEC_SIZE) { + ++num_buffers; + } + + dout(20) << ": num_buffers=" << num_buffers << dendl; + for (int i = 0; i < num_buffers; ++i) { + iov[i].iov_base = (char *)ptr + IOVEC_SIZE*i; + if (cut_off < IOVEC_SIZE) { + ceph_assert(i+1 == num_buffers); + iov[i].iov_len = cut_off; + } else { + iov[i].iov_len = IOVEC_SIZE; + cut_off -= IOVEC_SIZE; + } + } + + r = ceph_preadv(m_local_mount, l_fd, iov, num_buffers, offset); + if (r < 0) { + derr << ": failed to read local file path=" << epath << ": " + << cpp_strerror(r) << dendl; + break; + } + dout(10) << ": read: " << r << " bytes" << dendl; + if (r == 0) { + break; + } + + int iovs = (int)(r / IOVEC_SIZE); + int t = r % IOVEC_SIZE; + if (t) { + iov[iovs].iov_len = t; + ++iovs; + } + + dout(10) << ": writing to offset: " << offset << dendl; + r = ceph_pwritev(m_remote_mount, r_fd, iov, iovs, offset); + if (r < 0) { + derr << ": failed to write remote file path=" << epath << ": " + << cpp_strerror(r) << dendl; + break; + } + + offset += r; } - r = ceph_pwritev(m_remote_mount, r_fd, iov, iovs, -1); + --num_blocks; + ++b; + } + + if (num_blocks == 0 && r >= 0) { // handle blocklist case + dout(20) << ": truncating epath=" << epath << " to " << stx.stx_size << " bytes" + << dendl; + r = ceph_ftruncate(m_remote_mount, r_fd, stx.stx_size); if (r < 0) { - derr << ": failed to write remote file path=" << epath << ": " + derr << ": failed to truncate remote file path=" << epath << ": " << cpp_strerror(r) << dendl; - break; + goto freeptr; } - } - - if (r == 0) { r = ceph_fsync(m_remote_mount, r_fd, 0); if (r < 0) { derr << ": failed to sync data for file path=" << epath << ": " @@ -725,6 +770,7 @@ int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string } } +freeptr: free(ptr); close_remote_fd: @@ -744,18 +790,24 @@ close_local_fd: return r == 0 ? 0 : r; } -int PeerReplayer::remote_file_op(const std::string &dir_root, const std::string &epath, - const struct ceph_statx &stx, const FHandles &fh, - bool need_data_sync, bool need_attr_sync) { +int PeerReplayer::remote_file_op(SyncMechanism *syncm, const std::string &dir_root, + const std::string &epath, const struct ceph_statx &stx, + bool sync_check, const FHandles &fh, bool need_data_sync, bool need_attr_sync) { dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", need_data_sync=" << need_data_sync << ", need_attr_sync=" << need_attr_sync << dendl; int r; if (need_data_sync) { if (S_ISREG(stx.stx_mode)) { - r = copy_to_remote(dir_root, epath, stx, fh); + r = syncm->get_changed_blocks(epath, stx, sync_check, + [this, &dir_root, &epath, &stx, &fh](uint64_t num_blocks, struct cblock *b) { + int ret = copy_to_remote(dir_root, epath, stx, fh, num_blocks, b); + if (ret < 0) { + derr << ": failed to copy path=" << epath << ": " << ret << dendl; + } + return ret; + }); if (r < 0) { - derr << ": failed to copy path=" << epath << ": " << cpp_strerror(r) << dendl; return r; } if (m_perf_counters) { @@ -1231,6 +1283,23 @@ PeerReplayer::SyncMechanism::SyncMechanism(MountRef local, MountRef remote, FHan PeerReplayer::SyncMechanism::~SyncMechanism() { } +int PeerReplayer::SyncMechanism::get_changed_blocks(const std::string &epath, + const struct ceph_statx &stx, bool sync_check, + const std::function &callback) { + dout(20) << ": epath=" << epath << dendl; + + struct cblock b; + // extent covers the whole file + b.offset = 0; + b.len = stx.stx_size; + + struct ceph_file_blockdiff_changedblocks block; + block.num_blocks = 1; + block.b = &b; + + return callback(block.num_blocks, block.b); +} + PeerReplayer::SnapDiffSync::SnapDiffSync(std::string_view dir_root, MountRef local, MountRef remote, FHandles *fh, const Peer &peer, const Snapshot ¤t, boost::optional prev) @@ -1268,7 +1337,92 @@ int PeerReplayer::SnapDiffSync::init_sync() { return 0; } -int PeerReplayer::SnapDiffSync::get_entry(std::string *epath, struct ceph_statx *stx, +int PeerReplayer::SnapDiffSync::init_directory(const std::string &epath, + const struct ceph_statx &stx, bool pic, SyncEntry *se) { + dout(20) << ": epath=" << epath << dendl; + + int r; + if (pic) { + dout(20) << ": non snapdiff dir_root=" << m_dir_root << ", path=" << epath << dendl; + + ceph_dir_result *dirp; + r = opendirat(m_local, m_fh->c_fd, epath, AT_SYMLINK_NOFOLLOW, &dirp); + if (r < 0) { + derr << ": failed to open local directory=" << epath << ": " << r << dendl; + return r; + } + + *se = SyncEntry(epath, dirp, stx); + } else { + dout(20) << ": open_snapdiff for dir_root=" << m_dir_root << ", path=" << epath + << ", prev=" << (*m_prev).first << ", current=" << m_current.first << dendl; + + ceph_snapdiff_info info; + r = ceph_open_snapdiff(m_local, m_dir_root.c_str(), epath.c_str(), + stringify((*m_prev).first).c_str(), stringify(m_current.first).c_str(), &info); + if (r != 0) { + derr << ": failed to open snapdiff for " << m_dir_root << ", r=" << r << dendl; + return r; + } + + *se = SyncEntry(epath, info, stx); + } + + return 0; +} + +int PeerReplayer::SnapDiffSync::next_entry(SyncEntry &entry, std::string *e_name, + snapid_t *snapid) { + int r; + if (!entry.sync_is_snapdiff()) { + dout(20) << ": not snapdiff" << dendl; + struct dirent de; + r = ceph_readdirplus_r(m_local, entry.dirp, &de, NULL, 0, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL); + if (r < 0) { + derr << ": failed to read directory=" << entry.epath << ", r=" << r << dendl; + return r; + } + + if (r == 0) { + return 0; + } + + *e_name = de.d_name; + *snapid = m_current.second; + } else { + dout(20) << ": is snapdiff" << dendl; + ceph_snapdiff_entry_t sd_entry; + r = ceph_readdir_snapdiff(&(entry.info), &sd_entry); + if (r < 0) { + derr << ": failed to read directory=" << entry.epath << ", r=" << r << dendl; + return r; + } + + if (r == 0) { + return 0; + } + + *e_name = sd_entry.dir_entry.d_name; + *snapid = sd_entry.snapid; + } + + return 1; +} + +void PeerReplayer::SnapDiffSync::fini_directory(SyncEntry &entry) { + if (!entry.sync_is_snapdiff()) { + if (ceph_closedir(m_local, entry.dirp) < 0) { + derr << ": failed to close local directory=" << entry.epath << dendl; + } + } else { + if (ceph_close_snapdiff(&(entry.info)) < 0) { + derr << ": failed to close snapdiff for " << entry.epath << dendl; + } + } +} + +int PeerReplayer::SnapDiffSync::get_entry(std::string *epath, struct ceph_statx *stx, bool *sync_check, const std::function &dirsync_func, const std::function &purge_func) { dout(20) << ": sync stack size=" << m_sync_stack.size() << dendl; @@ -1276,42 +1430,27 @@ int PeerReplayer::SnapDiffSync::get_entry(std::string *epath, struct ceph_statx while (!m_sync_stack.empty()) { auto &entry = m_sync_stack.top(); dout(20) << ": top of stack path=" << entry.epath << dendl; - - if (!entry.is_directory()) { - *epath = entry.epath; - *stx = entry.stx; - m_sync_stack.pop(); - return 0; - } + ceph_assert(entry.is_directory()); int r; + snapid_t snapid; std::string e_name; - ceph_snapdiff_entry_t sd_entry; while (true) { - r = ceph_readdir_snapdiff(&(entry.info), &sd_entry); - if (r < 0) { - derr << ": failed to read directory=" << entry.epath << dendl; + e_name.clear(); + r = next_entry(entry, &e_name, &snapid); + if (r < 0 || r == 0) { break; } - if (r == 0) { - break; - } - - dout(20) << ": entry=" << sd_entry.dir_entry.d_name << ", snapid=" - << sd_entry.snapid << dendl; - auto d_name = std::string(sd_entry.dir_entry.d_name); - if (d_name != "." && d_name != "..") { - e_name = d_name; + dout(20) << ": entry=" << e_name << ", snapid=" << snapid << dendl; + if (e_name != "." && e_name != "..") { break; } } if (r == 0) { dout(10) << ": done for directory=" << entry.epath << dendl; - if (ceph_close_snapdiff(&(entry.info)) < 0) { - derr << ": failed to close snapdiff for " << entry.epath << dendl; - } + fini_directory(entry); m_sync_stack.pop(); continue; } @@ -1322,7 +1461,7 @@ int PeerReplayer::SnapDiffSync::get_entry(std::string *epath, struct ceph_statx auto _epath = entry_path(entry.epath, e_name); dout(20) << ": epath=" << _epath << dendl; - if (sd_entry.snapid == (*m_prev).second) { + if (snapid == (*m_prev).second) { dout(20) << ": epath=" << _epath << " is deleted in current snapshot " << dendl; // do not depend on d_type reported in struct dirent as the // delete and create could have been processed and a restart @@ -1333,8 +1472,7 @@ int PeerReplayer::SnapDiffSync::get_entry(std::string *epath, struct ceph_statx r = ceph_statxat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), &pstx, CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); if (r < 0 && r != -ENOENT) { - derr << ": failed to stat remote entry=" << _epath << ": " << cpp_strerror(r) - << dendl; + derr << ": failed to stat remote entry=" << _epath << ", r=" << r << dendl; return r; } if (r == 0) { @@ -1345,11 +1483,12 @@ int PeerReplayer::SnapDiffSync::get_entry(std::string *epath, struct ceph_statx } if (r < 0) { - derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl; + derr << ": failed to propagate missing dirs r=" << r << dendl; return r; } } + m_deleted[entry.epath].emplace(e_name); continue; } @@ -1359,29 +1498,32 @@ int PeerReplayer::SnapDiffSync::get_entry(std::string *epath, struct ceph_statx CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); if (r < 0) { - derr << ": failed to stat epath=" << epath << ": " << cpp_strerror(r) - << dendl; + derr << ": failed to stat epath=" << epath << ", r=" << r << dendl; return r; } + bool pic = entry.is_purged_or_itype_changed() || m_deleted[entry.epath].contains(e_name); if (S_ISDIR(estx.stx_mode)) { - dout(20) << ": open_snapdiff for dir_root=" << m_dir_root << ", path=" << _epath - << ", prev=" << (*m_prev).first << ", current=" << m_current.first << dendl; - - ceph_snapdiff_info info; - r = ceph_open_snapdiff(m_local, m_dir_root.c_str(), _epath.c_str(), - stringify((*m_prev).first).c_str(), stringify(m_current.first).c_str(), &info); - if (r != 0) { - derr << ": failed to open snapdiff for " << m_dir_root << ": r=" << r << dendl; + SyncEntry se; + r = init_directory(_epath, estx, pic, &se); + if (r < 0) { return r; } - m_sync_stack.emplace(SyncEntry(_epath, info, estx)); + if (pic) { + dout(10) << ": purge or itype change (including parent) found for entry=" + << se.epath << dendl; + se.set_purged_or_itype_changed(); + } + + m_sync_stack.emplace(se); } *epath = _epath; *stx = estx; + *sync_check = !pic; + dout(10) << ": sync_check=" << *sync_check << " for epath=" << *epath << dendl; return 0; } @@ -1389,6 +1531,58 @@ int PeerReplayer::SnapDiffSync::get_entry(std::string *epath, struct ceph_statx return 0; } +int PeerReplayer::SnapDiffSync::get_changed_blocks(const std::string &epath, + const struct ceph_statx &stx, bool sync_check, + const std::function &callback) { + dout(20) << ": dir_root=" << m_dir_root << ", epath=" << epath + << ", sync_check=" << sync_check << dendl; + + if (!sync_check) { + return SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback); + } + + ceph_file_blockdiff_info info; + int r = ceph_file_blockdiff_init(m_local, m_dir_root.c_str(), epath.c_str(), + (*m_prev).first.c_str(), m_current.first.c_str(), &info); + if (r != 0 && r != -ENOENT) { + derr << ": failed to init file blockdiff: r=" << r << dendl; + return r; + } + + if (r < 0) { + dout(20) << ": new file epath=" << epath << dendl; + return SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback); + } + + r = 1; + while (true) { + ceph_file_blockdiff_changedblocks blocks; + r = ceph_file_blockdiff(&info, &blocks); + if (r < 0) { + derr << " failed to get next changed block: ret:" << r << dendl; + break; + } + + int rr = r; + if (blocks.num_blocks) { + r = callback(blocks.num_blocks, blocks.b); + ceph_free_file_blockdiff_buffer(&blocks); + if (r < 0) { + derr << ": blockdiff callback returned error: r=" << r << dendl; + break; + } + } + + if (rr == 0) { + break; + } + // else fetch next changed blocks + } + + ceph_file_blockdiff_finish(&info); + return r; +} + void PeerReplayer::SnapDiffSync::finish_sync() { dout(20) << dendl; @@ -1438,7 +1632,7 @@ int PeerReplayer::RemoteSync::init_sync() { return 0; } -int PeerReplayer::RemoteSync::get_entry(std::string *epath, struct ceph_statx *stx, +int PeerReplayer::RemoteSync::get_entry(std::string *epath, struct ceph_statx *stx, bool *sync_check, const std::function &dirsync_func, const std::function &purge_func) { dout(20) << ": sync stack size=" << m_sync_stack.size() << dendl; @@ -1600,9 +1794,10 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu break; } + bool sync_check = true; std::string epath; struct ceph_statx stx; - r = syncm->get_entry(&epath, &stx, + r = syncm->get_entry(&epath, &stx, &sync_check, [this, &dir_root, &fh](const std::string &epath) { return propagate_deleted_entries(dir_root, epath, fh); }, @@ -1628,16 +1823,18 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu } else { bool need_data_sync = true; bool need_attr_sync = true; - r = should_sync_entry(epath, stx, fh, - &need_data_sync, &need_attr_sync); - if (r < 0) { - break; + if (sync_check) { + r = should_sync_entry(epath, stx, fh, + &need_data_sync, &need_attr_sync); + if (r < 0) { + break; + } } dout(5) << ": entry=" << epath << ", data_sync=" << need_data_sync << ", attr_sync=" << need_attr_sync << dendl; if (need_data_sync || need_attr_sync) { - r = remote_file_op(dir_root, epath, stx, fh, need_data_sync, need_attr_sync); + r = remote_file_op(syncm, dir_root, epath, stx, sync_check, fh, need_data_sync, need_attr_sync); if (r < 0) { break; } diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index b5199913649a3..d15d580e21c8e 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -109,6 +109,12 @@ private: // in the currently synced snapshot have been propagated to // the remote filesystem. bool remote_synced = false; + // includes parent dentry purge + bool purged_or_itype_changed = false; + bool is_snapdiff = false; + + SyncEntry() { + } SyncEntry(std::string_view path, const struct ceph_statx &stx) @@ -128,6 +134,7 @@ private: : epath(path), info(info), stx(stx) { + is_snapdiff = true; } bool is_directory() const { @@ -140,6 +147,17 @@ private: void set_remote_synced() { remote_synced = true; } + + bool is_purged_or_itype_changed() const { + return purged_or_itype_changed; + } + void set_purged_or_itype_changed() { + purged_or_itype_changed = true; + } + + bool sync_is_snapdiff() const { + return is_snapdiff; + } }; class SyncMechanism { @@ -151,10 +169,14 @@ private: virtual int init_sync() = 0; - virtual int get_entry(std::string *epath, struct ceph_statx *stx, + virtual int get_entry(std::string *epath, struct ceph_statx *stx, bool *sync_check, const std::function &dirsync_func, const std::function &purge_func) = 0; + virtual int get_changed_blocks(const std::string &epath, + const struct ceph_statx &stx, bool sync_check, + const std::function &callback); + virtual void finish_sync() = 0; protected: @@ -176,7 +198,7 @@ private: int init_sync() override; - int get_entry(std::string *epath, struct ceph_statx *stx, + int get_entry(std::string *epath, struct ceph_statx *stx, bool *sync_check, const std::function &dirsync_func, const std::function &purge_func); @@ -192,13 +214,24 @@ private: int init_sync() override; - int get_entry(std::string *epeth, struct ceph_statx *stx, + int get_entry(std::string *epeth, struct ceph_statx *stx, bool *sync_check, const std::function &dirsync_func, const std::function &purge_func); + int get_changed_blocks(const std::string &epath, + const struct ceph_statx &stx, bool sync_check, + const std::function &callback); + void finish_sync(); + private: + int init_directory(const std::string &epath, + const struct ceph_statx &stx, bool pic, SyncEntry *se); + int next_entry(SyncEntry &entry, std::string *e_name, snapid_t *snapid); + void fini_directory(SyncEntry &entry); + std::string m_dir_root; + std::map> m_deleted; }; // stats sent to service daemon @@ -388,10 +421,11 @@ private: int do_sync_snaps(const std::string &dir_root); int remote_mkdir(const std::string &epath, const struct ceph_statx &stx, const FHandles &fh); - int remote_file_op(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx, - const FHandles &fh, bool need_data_sync, bool need_attr_sync); + int remote_file_op(SyncMechanism *syncm, const std::string &dir_root, + const std::string &epath, const struct ceph_statx &stx, + bool sync_check, const FHandles &fh, bool need_data_sync, bool need_attr_sync); int copy_to_remote(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx, - const FHandles &fh); + const FHandles &fh, uint64_t num_blocks, struct cblock *b); int sync_perms(const std::string& path); };