// vim: ts=8 sw=2 smarttab
#include <stack>
+#include <queue>
#include <fcntl.h>
#include <algorithm>
#include <sys/time.h>
return dir + "/" + name;
}
+std::string entry_diff_path(const std::string &dir, const std::string &name) {
+ if (dir == ".")
+ return name;
+ return dir + "/" + name;
+}
+
std::map<std::string, std::string> decode_snap_metadata(snap_metadata *snap_metadata,
size_t nr_snap_metadata) {
std::map<std::string, std::string> metadata;
ceph_close(fh.p_mnt, fh.p_fd);
}
-int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot ¤t,
- boost::optional<Snapshot> prev) {
+int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot ¤t) {
dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
- if (prev) {
- dout(20) << ": incremental sync check from prev=" << prev << dendl;
- }
-
FHandles fh;
- int r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh);
+ int r = pre_sync_check_and_open_handles(dir_root, current, boost::none, &fh);
if (r < 0) {
- dout(5) << ": cannot proceeed with sync: " << cpp_strerror(r) << dendl;
+ dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl;
return r;
}
return r;
}
+int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot ¤t,
+ boost::optional<Snapshot> prev) {
+ if (!prev) {
+ derr << ": invalid previous snapshot" << dendl;
+ return -ENODATA;
+ }
+
+ dout(20) << ": incremental sync check from prev=" << prev << dendl;
+
+ FHandles fh;
+ int r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh);
+ if (r < 0) {
+ dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ BOOST_SCOPE_EXIT_ALL( (this)(&fh) ) {
+ post_sync_close_handles(fh);
+ };
+
+ // record that we are going to "dirty" the data under this directory root
+ auto snap_id_str{stringify(current.second)};
+ r = ceph_setxattr(m_remote_mount, dir_root.c_str(), "ceph.mirror.dirty_snap_id",
+ snap_id_str.c_str(), snap_id_str.size(), 0);
+ if (r < 0) {
+ derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root
+ << ": " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ struct ceph_statx cstx;
+ r = ceph_fstatx(m_local_mount, fh.c_fd, &cstx,
+ CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+ CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+ if (r < 0) {
+ derr << ": failed to stat snap=" << current.first << ": " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
+
+ ceph_snapdiff_info sd_info;
+ ceph_snapdiff_entry_t sd_entry;
+
+ //The queue of SyncEntry items (directories) to be synchronized.
+ //We follow a breadth first approach here based on the snapdiff output.
+ std::queue<SyncEntry> sync_queue;
+
+ //start with initial/default entry
+ std::string epath = ".", npath = "", nabs_path = "", nname = "";
+ sync_queue.emplace(SyncEntry(epath, cstx));
+
+ while (!sync_queue.empty()) {
+ if (should_backoff(dir_root, &r)) {
+ dout(0) << ": backing off r=" << r << dendl;
+ break;
+ }
+ r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh);
+ if (r < 0) {
+ dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ dout(20) << ": " << sync_queue.size() << " entries in queue" << dendl;
+ const auto &queue_entry = sync_queue.front();
+ epath = queue_entry.epath;
+ dout(20) << ": syncing entry, path=" << epath << dendl;
+ r = ceph_open_snapdiff(fh.p_mnt, dir_root.c_str(), epath.c_str(),
+ stringify((*prev).first).c_str(), current.first.c_str(), &sd_info);
+ if (r != 0) {
+ derr << ": failed to open snapdiff, r=" << r << dendl;
+ return r;
+ }
+ while (0 < (r = ceph_readdir_snapdiff(&sd_info, &sd_entry))) {
+ if (r < 0) {
+ derr << ": failed to read directory=" << epath << dendl;
+ ceph_close_snapdiff(&sd_info);
+ return r;
+ }
+
+ //New entry found
+ nname = sd_entry.dir_entry.d_name;
+ if ("." == nname || ".." == nname)
+ continue;
+ // create path for the newly found entry
+ npath = entry_diff_path(epath, nname);
+ nabs_path = entry_diff_path(dir_root, npath);
+
+ r = ceph_statx(sd_info.cmount, nabs_path.c_str(), &cstx,
+ CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+ CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+ if (r < 0) {
+ // can't stat, so it's a deleted entry.
+ if (DT_DIR == sd_entry.dir_entry.d_type) { // is a directory
+ r = cleanup_remote_dir(dir_root, npath, fh);
+ if (r < 0) {
+ derr << ": failed to remove directory=" << nabs_path << dendl;
+ break;
+ }
+ }
+ else { // is a file
+ r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), 0);
+ if (r < 0) {
+ break;
+ }
+ }
+ } else {
+ // stat success, update the existing entry
+ struct ceph_statx tstx;
+ int rstat_r = ceph_statx(m_remote_mount, nabs_path.c_str(), &tstx,
+ CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+ CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+ if (S_ISDIR(cstx.stx_mode)) { // is a directory
+ //cleanup if it's a file in the remotefs
+ if ((0 == rstat_r) && !S_ISDIR(tstx.stx_mode)) {
+ r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), 0);
+ if (r < 0) {
+ derr << ": Error in directory sync. Failed to remove file="
+ << nabs_path << dendl;
+ break;
+ }
+ }
+ r = remote_mkdir(npath, cstx, fh);
+ if (r < 0) {
+ break;
+ }
+ // push it to sync_queue for later processing
+ sync_queue.emplace(SyncEntry(npath, cstx));
+ } else { // is a file
+ bool need_data_sync = true;
+ bool need_attr_sync = true;
+ r = should_sync_entry(npath, cstx, fh, &need_data_sync, &need_attr_sync);
+ if (r < 0) {
+ break;
+ }
+ dout(5) << ": entry=" << npath << ", data_sync=" << need_data_sync
+ << ", attr_sync=" << need_attr_sync << dendl;
+ if (need_data_sync || need_attr_sync) {
+ //cleanup if it's a directory in the remotefs
+ if ((0 == rstat_r) && S_ISDIR(tstx.stx_mode)) {
+ r = cleanup_remote_dir(dir_root, npath, fh);
+ if (r < 0) {
+ derr << ": Error in file sync. Failed to remove remote directory="
+ << nabs_path << dendl;
+ break;
+ }
+ }
+ r = remote_file_op(dir_root, npath, cstx, fh, need_data_sync, need_attr_sync);
+ if (r < 0) {
+ break;
+ }
+ }
+ }
+ }
+ }
+ if (0 == r) {
+ dout(10) << ": successfully synchronized the entry=" << epath << dendl;
+ }
+
+ //Close the current open directory and take the next queue_entry, if success or failure.
+ r = ceph_close_snapdiff(&sd_info);
+ if (r != 0) {
+ derr << ": failed to close directory=" << epath << dendl;
+ }
+ sync_queue.pop();
+ }
+ return r;
+}
+
int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot ¤t,
boost::optional<Snapshot> prev) {
dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
if (r < 0) {
dout(5) << ": missing \"ceph.mirror.dirty_snap_id\" xattr on remote -- using"
<< " incremental sync with remote scan" << dendl;
- r = do_synchronize(dir_root, current, boost::none);
+ r = do_synchronize(dir_root, current);
} else {
size_t xlen = r;
char *val = (char *)alloca(xlen+1);
r = do_synchronize(dir_root, current, prev);
} else {
dout(5) << ": mismatch -- using incremental sync with remote scan" << dendl;
- r = do_synchronize(dir_root, current, boost::none);
+ r = do_synchronize(dir_root, current);
}
}