#define NR_IOVECS 8 // # iovecs
#define IOVEC_SIZE (8 * 1024 * 1024) // buffer size for each iovec
int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string &epath,
- const struct ceph_statx &stx, const FHandles &fh) {
- dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << dendl;
+ const struct ceph_statx &stx, const FHandles &fh,
+ uint64_t num_blocks, struct cblock *b) {
+ dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", num_blocks="
+ << num_blocks << dendl;
int l_fd;
int r_fd;
void *ptr;
l_fd = r;
r = ceph_openat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(),
- O_CREAT | O_TRUNC | O_WRONLY | O_NOFOLLOW, stx.stx_mode);
+ O_CREAT | O_WRONLY | O_NOFOLLOW, stx.stx_mode);
if (r < 0) {
derr << ": failed to create remote file path=" << epath << ": "
<< cpp_strerror(r) << dendl;
goto close_remote_fd;
}
- while (true) {
- if (should_backoff(dir_root, &r)) {
- dout(0) << ": backing off r=" << r << dendl;
- break;
- }
+ while (num_blocks > 0) {
+ auto offset = b->offset;
+ auto len = b->len;
- for (int i = 0; i < NR_IOVECS; ++i) {
- iov[i].iov_base = (char*)ptr + IOVEC_SIZE*i;
- iov[i].iov_len = IOVEC_SIZE;
- }
+ dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", block: ["
+ << offset << "~" << len << "]" << dendl;
- r = ceph_preadv(m_local_mount, l_fd, iov, NR_IOVECS, -1);
- if (r < 0) {
- derr << ": failed to read local file path=" << epath << ": "
- << cpp_strerror(r) << dendl;
- break;
- }
- if (r == 0) {
- break;
- }
+ auto end_offset = offset + len;
+ dout(20) << ": start offset=" << offset << ", end offset=" << end_offset
+ << dendl;
+ while (offset < end_offset) {
+ if (should_backoff(dir_root, &r)) {
+ dout(0) << ": backing off r=" << r << dendl;
+ break;
+ }
- int iovs = (int)(r / IOVEC_SIZE);
- int t = r % IOVEC_SIZE;
- if (t) {
- iov[iovs].iov_len = t;
- ++iovs;
+ auto cut_off = len;
+ if (cut_off > NR_IOVECS*IOVEC_SIZE) {
+ cut_off = NR_IOVECS*IOVEC_SIZE;
+ }
+
+ int num_buffers = cut_off / IOVEC_SIZE;
+ if (cut_off % IOVEC_SIZE) {
+ ++num_buffers;
+ }
+
+ dout(20) << ": num_buffers=" << num_buffers << dendl;
+ for (int i = 0; i < num_buffers; ++i) {
+ iov[i].iov_base = (char *)ptr + IOVEC_SIZE*i;
+ if (cut_off < IOVEC_SIZE) {
+ ceph_assert(i+1 == num_buffers);
+ iov[i].iov_len = cut_off;
+ } else {
+ iov[i].iov_len = IOVEC_SIZE;
+ cut_off -= IOVEC_SIZE;
+ }
+ }
+
+ r = ceph_preadv(m_local_mount, l_fd, iov, num_buffers, offset);
+ if (r < 0) {
+ derr << ": failed to read local file path=" << epath << ": "
+ << cpp_strerror(r) << dendl;
+ break;
+ }
+ dout(10) << ": read: " << r << " bytes" << dendl;
+ if (r == 0) {
+ break;
+ }
+
+ int iovs = (int)(r / IOVEC_SIZE);
+ int t = r % IOVEC_SIZE;
+ if (t) {
+ iov[iovs].iov_len = t;
+ ++iovs;
+ }
+
+ dout(10) << ": writing to offset: " << offset << dendl;
+ r = ceph_pwritev(m_remote_mount, r_fd, iov, iovs, offset);
+ if (r < 0) {
+ derr << ": failed to write remote file path=" << epath << ": "
+ << cpp_strerror(r) << dendl;
+ break;
+ }
+
+ offset += r;
}
- r = ceph_pwritev(m_remote_mount, r_fd, iov, iovs, -1);
+ --num_blocks;
+ ++b;
+ }
+
+ if (num_blocks == 0 && r >= 0) { // handle blocklist case
+ dout(20) << ": truncating epath=" << epath << " to " << stx.stx_size << " bytes"
+ << dendl;
+ r = ceph_ftruncate(m_remote_mount, r_fd, stx.stx_size);
if (r < 0) {
- derr << ": failed to write remote file path=" << epath << ": "
+ derr << ": failed to truncate remote file path=" << epath << ": "
<< cpp_strerror(r) << dendl;
- break;
+ goto freeptr;
}
- }
-
- if (r == 0) {
r = ceph_fsync(m_remote_mount, r_fd, 0);
if (r < 0) {
derr << ": failed to sync data for file path=" << epath << ": "
}
}
+freeptr:
free(ptr);
close_remote_fd:
return r == 0 ? 0 : r;
}
-int PeerReplayer::remote_file_op(const std::string &dir_root, const std::string &epath,
- const struct ceph_statx &stx, const FHandles &fh,
- bool need_data_sync, bool need_attr_sync) {
+int PeerReplayer::remote_file_op(SyncMechanism *syncm, const std::string &dir_root,
+ const std::string &epath, const struct ceph_statx &stx,
+ bool sync_check, const FHandles &fh, bool need_data_sync, bool need_attr_sync) {
dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", need_data_sync=" << need_data_sync
<< ", need_attr_sync=" << need_attr_sync << dendl;
int r;
if (need_data_sync) {
if (S_ISREG(stx.stx_mode)) {
- r = copy_to_remote(dir_root, epath, stx, fh);
+ r = syncm->get_changed_blocks(epath, stx, sync_check,
+ [this, &dir_root, &epath, &stx, &fh](uint64_t num_blocks, struct cblock *b) {
+ int ret = copy_to_remote(dir_root, epath, stx, fh, num_blocks, b);
+ if (ret < 0) {
+ derr << ": failed to copy path=" << epath << ": " << ret << dendl;
+ }
+ return ret;
+ });
if (r < 0) {
- derr << ": failed to copy path=" << epath << ": " << cpp_strerror(r) << dendl;
return r;
}
if (m_perf_counters) {
PeerReplayer::SyncMechanism::~SyncMechanism() {
}
+int PeerReplayer::SyncMechanism::get_changed_blocks(const std::string &epath,
+ const struct ceph_statx &stx, bool sync_check,
+ const std::function<int (uint64_t, struct cblock *)> &callback) {
+ dout(20) << ": epath=" << epath << dendl;
+
+ struct cblock b;
+ // extent covers the whole file
+ b.offset = 0;
+ b.len = stx.stx_size;
+
+ struct ceph_file_blockdiff_changedblocks block;
+ block.num_blocks = 1;
+ block.b = &b;
+
+ return callback(block.num_blocks, block.b);
+}
+
PeerReplayer::SnapDiffSync::SnapDiffSync(std::string_view dir_root, MountRef local, MountRef remote,
FHandles *fh, const Peer &peer, const Snapshot ¤t,
boost::optional<Snapshot> prev)
return 0;
}
-int PeerReplayer::SnapDiffSync::get_entry(std::string *epath, struct ceph_statx *stx,
+int PeerReplayer::SnapDiffSync::init_directory(const std::string &epath,
+ const struct ceph_statx &stx, bool pic, SyncEntry *se) {
+ dout(20) << ": epath=" << epath << dendl;
+
+ int r;
+ if (pic) {
+ dout(20) << ": non snapdiff dir_root=" << m_dir_root << ", path=" << epath << dendl;
+
+ ceph_dir_result *dirp;
+ r = opendirat(m_local, m_fh->c_fd, epath, AT_SYMLINK_NOFOLLOW, &dirp);
+ if (r < 0) {
+ derr << ": failed to open local directory=" << epath << ": " << r << dendl;
+ return r;
+ }
+
+ *se = SyncEntry(epath, dirp, stx);
+ } else {
+ dout(20) << ": open_snapdiff for dir_root=" << m_dir_root << ", path=" << epath
+ << ", prev=" << (*m_prev).first << ", current=" << m_current.first << dendl;
+
+ ceph_snapdiff_info info;
+ r = ceph_open_snapdiff(m_local, m_dir_root.c_str(), epath.c_str(),
+ stringify((*m_prev).first).c_str(), stringify(m_current.first).c_str(), &info);
+ if (r != 0) {
+ derr << ": failed to open snapdiff for " << m_dir_root << ", r=" << r << dendl;
+ return r;
+ }
+
+ *se = SyncEntry(epath, info, stx);
+ }
+
+ return 0;
+}
+
+int PeerReplayer::SnapDiffSync::next_entry(SyncEntry &entry, std::string *e_name,
+ snapid_t *snapid) {
+ int r;
+ if (!entry.sync_is_snapdiff()) {
+ dout(20) << ": not snapdiff" << dendl;
+ struct dirent de;
+ r = ceph_readdirplus_r(m_local, entry.dirp, &de, NULL, 0,
+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL);
+ if (r < 0) {
+ derr << ": failed to read directory=" << entry.epath << ", r=" << r << dendl;
+ return r;
+ }
+
+ if (r == 0) {
+ return 0;
+ }
+
+ *e_name = de.d_name;
+ *snapid = m_current.second;
+ } else {
+ dout(20) << ": is snapdiff" << dendl;
+ ceph_snapdiff_entry_t sd_entry;
+ r = ceph_readdir_snapdiff(&(entry.info), &sd_entry);
+ if (r < 0) {
+ derr << ": failed to read directory=" << entry.epath << ", r=" << r << dendl;
+ return r;
+ }
+
+ if (r == 0) {
+ return 0;
+ }
+
+ *e_name = sd_entry.dir_entry.d_name;
+ *snapid = sd_entry.snapid;
+ }
+
+ return 1;
+}
+
+void PeerReplayer::SnapDiffSync::fini_directory(SyncEntry &entry) {
+ if (!entry.sync_is_snapdiff()) {
+ if (ceph_closedir(m_local, entry.dirp) < 0) {
+ derr << ": failed to close local directory=" << entry.epath << dendl;
+ }
+ } else {
+ if (ceph_close_snapdiff(&(entry.info)) < 0) {
+ derr << ": failed to close snapdiff for " << entry.epath << dendl;
+ }
+ }
+}
+
+int PeerReplayer::SnapDiffSync::get_entry(std::string *epath, struct ceph_statx *stx, bool *sync_check,
const std::function<int (const std::string&)> &dirsync_func,
const std::function<int (const std::string &)> &purge_func) {
dout(20) << ": sync stack size=" << m_sync_stack.size() << dendl;
while (!m_sync_stack.empty()) {
auto &entry = m_sync_stack.top();
dout(20) << ": top of stack path=" << entry.epath << dendl;
-
- if (!entry.is_directory()) {
- *epath = entry.epath;
- *stx = entry.stx;
- m_sync_stack.pop();
- return 0;
- }
+ ceph_assert(entry.is_directory());
int r;
+ snapid_t snapid;
std::string e_name;
- ceph_snapdiff_entry_t sd_entry;
while (true) {
- r = ceph_readdir_snapdiff(&(entry.info), &sd_entry);
- if (r < 0) {
- derr << ": failed to read directory=" << entry.epath << dendl;
+ e_name.clear();
+ r = next_entry(entry, &e_name, &snapid);
+ if (r < 0 || r == 0) {
break;
}
- if (r == 0) {
- break;
- }
-
- dout(20) << ": entry=" << sd_entry.dir_entry.d_name << ", snapid="
- << sd_entry.snapid << dendl;
- auto d_name = std::string(sd_entry.dir_entry.d_name);
- if (d_name != "." && d_name != "..") {
- e_name = d_name;
+ dout(20) << ": entry=" << e_name << ", snapid=" << snapid << dendl;
+ if (e_name != "." && e_name != "..") {
break;
}
}
if (r == 0) {
dout(10) << ": done for directory=" << entry.epath << dendl;
- if (ceph_close_snapdiff(&(entry.info)) < 0) {
- derr << ": failed to close snapdiff for " << entry.epath << dendl;
- }
+ fini_directory(entry);
m_sync_stack.pop();
continue;
}
auto _epath = entry_path(entry.epath, e_name);
dout(20) << ": epath=" << _epath << dendl;
- if (sd_entry.snapid == (*m_prev).second) {
+ if (snapid == (*m_prev).second) {
dout(20) << ": epath=" << _epath << " is deleted in current snapshot " << dendl;
// do not depend on d_type reported in struct dirent as the
// delete and create could have been processed and a restart
r = ceph_statxat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), &pstx,
CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
if (r < 0 && r != -ENOENT) {
- derr << ": failed to stat remote entry=" << _epath << ": " << cpp_strerror(r)
- << dendl;
+ derr << ": failed to stat remote entry=" << _epath << ", r=" << r << dendl;
return r;
}
if (r == 0) {
}
if (r < 0) {
- derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl;
+ derr << ": failed to propagate missing dirs r=" << r << dendl;
return r;
}
}
+ m_deleted[entry.epath].emplace(e_name);
continue;
}
CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
if (r < 0) {
- derr << ": failed to stat epath=" << epath << ": " << cpp_strerror(r)
- << dendl;
+ derr << ": failed to stat epath=" << epath << ", r=" << r << dendl;
return r;
}
+ bool pic = entry.is_purged_or_itype_changed() || m_deleted[entry.epath].contains(e_name);
if (S_ISDIR(estx.stx_mode)) {
- dout(20) << ": open_snapdiff for dir_root=" << m_dir_root << ", path=" << _epath
- << ", prev=" << (*m_prev).first << ", current=" << m_current.first << dendl;
-
- ceph_snapdiff_info info;
- r = ceph_open_snapdiff(m_local, m_dir_root.c_str(), _epath.c_str(),
- stringify((*m_prev).first).c_str(), stringify(m_current.first).c_str(), &info);
- if (r != 0) {
- derr << ": failed to open snapdiff for " << m_dir_root << ": r=" << r << dendl;
+ SyncEntry se;
+ r = init_directory(_epath, estx, pic, &se);
+ if (r < 0) {
return r;
}
- m_sync_stack.emplace(SyncEntry(_epath, info, estx));
+ if (pic) {
+ dout(10) << ": purge or itype change (including parent) found for entry="
+ << se.epath << dendl;
+ se.set_purged_or_itype_changed();
+ }
+
+ m_sync_stack.emplace(se);
}
*epath = _epath;
*stx = estx;
+ *sync_check = !pic;
+ dout(10) << ": sync_check=" << *sync_check << " for epath=" << *epath << dendl;
return 0;
}
return 0;
}
+int PeerReplayer::SnapDiffSync::get_changed_blocks(const std::string &epath,
+ const struct ceph_statx &stx, bool sync_check,
+ const std::function<int (uint64_t, struct cblock *)> &callback) {
+ dout(20) << ": dir_root=" << m_dir_root << ", epath=" << epath
+ << ", sync_check=" << sync_check << dendl;
+
+ if (!sync_check) {
+ return SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback);
+ }
+
+ ceph_file_blockdiff_info info;
+ int r = ceph_file_blockdiff_init(m_local, m_dir_root.c_str(), epath.c_str(),
+ (*m_prev).first.c_str(), m_current.first.c_str(), &info);
+ if (r != 0 && r != -ENOENT) {
+ derr << ": failed to init file blockdiff: r=" << r << dendl;
+ return r;
+ }
+
+ if (r < 0) {
+ dout(20) << ": new file epath=" << epath << dendl;
+ return SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback);
+ }
+
+ r = 1;
+ while (true) {
+ ceph_file_blockdiff_changedblocks blocks;
+ r = ceph_file_blockdiff(&info, &blocks);
+ if (r < 0) {
+ derr << " failed to get next changed block: ret:" << r << dendl;
+ break;
+ }
+
+ int rr = r;
+ if (blocks.num_blocks) {
+ r = callback(blocks.num_blocks, blocks.b);
+ ceph_free_file_blockdiff_buffer(&blocks);
+ if (r < 0) {
+ derr << ": blockdiff callback returned error: r=" << r << dendl;
+ break;
+ }
+ }
+
+ if (rr == 0) {
+ break;
+ }
+ // else fetch next changed blocks
+ }
+
+ ceph_file_blockdiff_finish(&info);
+ return r;
+}
+
void PeerReplayer::SnapDiffSync::finish_sync() {
dout(20) << dendl;
return 0;
}
-int PeerReplayer::RemoteSync::get_entry(std::string *epath, struct ceph_statx *stx,
+int PeerReplayer::RemoteSync::get_entry(std::string *epath, struct ceph_statx *stx, bool *sync_check,
const std::function<int (const std::string&)> &dirsync_func,
const std::function<int (const std::string &)> &purge_func) {
dout(20) << ": sync stack size=" << m_sync_stack.size() << dendl;
break;
}
+ bool sync_check = true;
std::string epath;
struct ceph_statx stx;
- r = syncm->get_entry(&epath, &stx,
+ r = syncm->get_entry(&epath, &stx, &sync_check,
[this, &dir_root, &fh](const std::string &epath) {
return propagate_deleted_entries(dir_root, epath, fh);
},
} else {
bool need_data_sync = true;
bool need_attr_sync = true;
- r = should_sync_entry(epath, stx, fh,
- &need_data_sync, &need_attr_sync);
- if (r < 0) {
- break;
+ if (sync_check) {
+ r = should_sync_entry(epath, stx, fh,
+ &need_data_sync, &need_attr_sync);
+ if (r < 0) {
+ break;
+ }
}
dout(5) << ": entry=" << epath << ", data_sync=" << need_data_sync
<< ", attr_sync=" << need_attr_sync << dendl;
if (need_data_sync || need_attr_sync) {
- r = remote_file_op(dir_root, epath, stx, fh, need_data_sync, need_attr_sync);
+ r = remote_file_op(syncm, dir_root, epath, stx, sync_check, fh, need_data_sync, need_attr_sync);
if (r < 0) {
break;
}
// in the currently synced snapshot have been propagated to
// the remote filesystem.
bool remote_synced = false;
+ // includes parent dentry purge
+ bool purged_or_itype_changed = false;
+ bool is_snapdiff = false;
+
+ SyncEntry() {
+ }
SyncEntry(std::string_view path,
const struct ceph_statx &stx)
: epath(path),
info(info),
stx(stx) {
+ is_snapdiff = true;
}
bool is_directory() const {
void set_remote_synced() {
remote_synced = true;
}
+
+ bool is_purged_or_itype_changed() const {
+ return purged_or_itype_changed;
+ }
+ void set_purged_or_itype_changed() {
+ purged_or_itype_changed = true;
+ }
+
+ bool sync_is_snapdiff() const {
+ return is_snapdiff;
+ }
};
class SyncMechanism {
virtual int init_sync() = 0;
- virtual int get_entry(std::string *epath, struct ceph_statx *stx,
+ virtual int get_entry(std::string *epath, struct ceph_statx *stx, bool *sync_check,
const std::function<int (const std::string&)> &dirsync_func,
const std::function<int (const std::string&)> &purge_func) = 0;
+ virtual int get_changed_blocks(const std::string &epath,
+ const struct ceph_statx &stx, bool sync_check,
+ const std::function<int (uint64_t, struct cblock *)> &callback);
+
virtual void finish_sync() = 0;
protected:
int init_sync() override;
- int get_entry(std::string *epath, struct ceph_statx *stx,
+ int get_entry(std::string *epath, struct ceph_statx *stx, bool *sync_check,
const std::function<int (const std::string&)> &dirsync_func,
const std::function<int (const std::string&)> &purge_func);
int init_sync() override;
- int get_entry(std::string *epeth, struct ceph_statx *stx,
+ int get_entry(std::string *epeth, struct ceph_statx *stx, bool *sync_check,
const std::function<int (const std::string&)> &dirsync_func,
const std::function<int (const std::string&)> &purge_func);
+ int get_changed_blocks(const std::string &epath,
+ const struct ceph_statx &stx, bool sync_check,
+ const std::function<int (uint64_t, struct cblock *)> &callback);
+
void finish_sync();
+
private:
+ int init_directory(const std::string &epath,
+ const struct ceph_statx &stx, bool pic, SyncEntry *se);
+ int next_entry(SyncEntry &entry, std::string *e_name, snapid_t *snapid);
+ void fini_directory(SyncEntry &entry);
+
std::string m_dir_root;
+ std::map<std::string, std::set<std::string>> m_deleted;
};
// stats sent to service daemon
int do_sync_snaps(const std::string &dir_root);
int remote_mkdir(const std::string &epath, const struct ceph_statx &stx, const FHandles &fh);
- int remote_file_op(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx,
- const FHandles &fh, bool need_data_sync, bool need_attr_sync);
+ int remote_file_op(SyncMechanism *syncm, const std::string &dir_root,
+ const std::string &epath, const struct ceph_statx &stx,
+ bool sync_check, const FHandles &fh, bool need_data_sync, bool need_attr_sync);
int copy_to_remote(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx,
- const FHandles &fh);
+ const FHandles &fh, uint64_t num_blocks, struct cblock *b);
int sync_perms(const std::string& path);
};