// vim: ts=8 sw=2 smarttab
#include <stack>
-#include <queue>
#include <fcntl.h>
#include <algorithm>
#include <sys/time.h>
return 0;
}
-int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot ¤t) {
- dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
- FHandles fh;
- int r = pre_sync_check_and_open_handles(dir_root, current, boost::none, &fh);
+PeerReplayer::SyncMechanism::SyncMechanism(MountRef local, MountRef remote, FHandles *fh,
+ const Peer &peer, const Snapshot ¤t,
+ boost::optional<Snapshot> prev)
+ : m_local(local),
+ m_remote(remote),
+ m_fh(fh),
+ m_peer(peer),
+ m_current(current),
+ m_prev(prev) {
+ }
+
+PeerReplayer::SyncMechanism::~SyncMechanism() {
+}
+
+PeerReplayer::SnapDiffSync::SnapDiffSync(std::string_view dir_root, MountRef local, MountRef remote,
+ FHandles *fh, const Peer &peer, const Snapshot ¤t,
+ boost::optional<Snapshot> prev)
+ : SyncMechanism(local, remote, fh, peer, current, prev),
+ m_dir_root(dir_root) {
+}
+
+PeerReplayer::SnapDiffSync::~SnapDiffSync() {
+}
+
+int PeerReplayer::SnapDiffSync::init_sync() {
+ struct ceph_statx tstx;
+ int r = ceph_fstatx(m_local, m_fh->c_fd, &tstx,
+ CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+ CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
if (r < 0) {
- dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl;
+ derr << ": failed to stat snap=" << m_current.first << ": " << cpp_strerror(r)
+ << dendl;
return r;
}
- // record that we are going to "dirty" the data under this
- // directory root
- auto snap_id_str{stringify(current.second)};
- r = ceph_fsetxattr(m_remote_mount, fh.r_fd_dir_root, "ceph.mirror.dirty_snap_id",
- snap_id_str.c_str(), snap_id_str.size(), 0);
- if (r < 0) {
- derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root
- << ": " << cpp_strerror(r) << dendl;
- ceph_close(m_local_mount, fh.c_fd);
- ceph_close(fh.p_mnt, fh.p_fd);
+ dout(20) << ": open_snapdiff for dir_root=" << m_dir_root << ", path=., prev="
+ << (*m_prev).first << ", current=" << m_current.first << dendl;
+
+ ceph_snapdiff_info info;
+ r = ceph_open_snapdiff(m_local, m_dir_root.c_str(), ".",
+ stringify((*m_prev).first).c_str(), stringify(m_current.first).c_str(), &info);
+ if (r != 0) {
+ derr << ": failed to open snapdiff for " << m_dir_root << ": r=" << r << dendl;
return r;
}
+ m_sync_stack.emplace(SyncEntry(".", info, tstx));
+ return 0;
+}
+
+int PeerReplayer::SnapDiffSync::get_entry(std::string *epath, struct ceph_statx *stx,
+ const std::function<int (const std::string&)> &dirsync_func,
+ const std::function<int (const std::string &)> &purge_func) {
+ dout(20) << ": sync stack size=" << m_sync_stack.size() << dendl;
+
+ while (!m_sync_stack.empty()) {
+ auto &entry = m_sync_stack.top();
+ dout(20) << ": top of stack path=" << entry.epath << dendl;
+
+ if (!entry.is_directory()) {
+ *epath = entry.epath;
+ *stx = entry.stx;
+ m_sync_stack.pop();
+ return 0;
+ }
+
+ int r;
+ std::string e_name;
+ ceph_snapdiff_entry_t sd_entry;
+ while (true) {
+ r = ceph_readdir_snapdiff(&(entry.info), &sd_entry);
+ if (r < 0) {
+ derr << ": failed to read directory=" << entry.epath << dendl;
+ break;
+ }
+ if (r == 0) {
+ break;
+ }
+
+ dout(20) << ": entry=" << sd_entry.dir_entry.d_name << ", snapid="
+ << sd_entry.snapid << dendl;
+
+ auto d_name = std::string(sd_entry.dir_entry.d_name);
+ if (d_name != "." && d_name != "..") {
+ e_name = d_name;
+ break;
+ }
+ }
+
+ if (r == 0) {
+ dout(10) << ": done for directory=" << entry.epath << dendl;
+ if (ceph_close_snapdiff(&(entry.info)) < 0) {
+ derr << ": failed to close snapdiff for " << entry.epath << dendl;
+ }
+ m_sync_stack.pop();
+ continue;
+ }
+
+ if (r < 0) {
+ return r;
+ }
+
+ auto _epath = entry_path(entry.epath, e_name);
+ dout(20) << ": epath=" << _epath << dendl;
+ if (sd_entry.snapid == (*m_prev).second) {
+ dout(20) << ": epath=" << _epath << " is deleted in current snapshot " << dendl;
+ // do not depend on d_type reported in struct dirent as the
+ // delete and create could have been processed and a restart
+ // of an interrupted sync would use the incorrect unlink API.
+ // N.B.: snapdiff returns the deleted entry before the newly
+ // created one.
+ struct ceph_statx pstx;
+ r = ceph_statxat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), &pstx,
+ CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+ if (r < 0 && r != -ENOENT) {
+ derr << ": failed to stat remote entry=" << _epath << ": " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
+ if (r == 0) {
+ if (!S_ISDIR(pstx.stx_mode)) {
+ r = ceph_unlinkat(m_remote, m_fh->r_fd_dir_root, _epath.c_str(), 0);
+ } else {
+ r = purge_func(_epath);
+ }
+
+ if (r < 0) {
+ derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ }
+
+ continue;
+ }
+
+ struct ceph_statx estx;
+ r = ceph_statxat(m_local, m_fh->c_fd, _epath.c_str(), &estx,
+ CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+ CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+ if (r < 0) {
+ derr << ": failed to stat epath=" << epath << ": " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
+
+ if (S_ISDIR(estx.stx_mode)) {
+ dout(20) << ": open_snapdiff for dir_root=" << m_dir_root << ", path=" << _epath
+ << ", prev=" << (*m_prev).first << ", current=" << m_current.first << dendl;
+
+ ceph_snapdiff_info info;
+ r = ceph_open_snapdiff(m_local, m_dir_root.c_str(), _epath.c_str(),
+ stringify((*m_prev).first).c_str(), stringify(m_current.first).c_str(), &info);
+ if (r != 0) {
+ derr << ": failed to open snapdiff for " << m_dir_root << ": r=" << r << dendl;
+ return r;
+ }
+
+ m_sync_stack.emplace(SyncEntry(_epath, info, estx));
+ }
+
+ *epath = _epath;
+ *stx = estx;
+
+ return 0;
+ }
+
+ *epath = "";
+ return 0;
+}
+
+void PeerReplayer::SnapDiffSync::finish_sync() {
+ dout(20) << dendl;
+
+ while (!m_sync_stack.empty()) {
+ auto &entry = m_sync_stack.top();
+ if (entry.is_directory()) {
+ dout(20) << ": closing local directory=" << entry.epath << dendl;
+ if (ceph_close_snapdiff(&(entry.info)) < 0) {
+ derr << ": failed to close snapdiff directory=" << entry.epath << dendl;
+ }
+ }
+
+ m_sync_stack.pop();
+ }
+}
+
+PeerReplayer::RemoteSync::RemoteSync(MountRef local, MountRef remote, FHandles *fh,
+ const Peer &peer, const Snapshot ¤t,
+ boost::optional<Snapshot> prev)
+ : SyncMechanism(local, remote, fh, peer, current, prev) {
+}
+
+PeerReplayer::RemoteSync::~RemoteSync() {
+}
+
+int PeerReplayer::RemoteSync::init_sync() {
struct ceph_statx tstx;
- r = ceph_fstatx(m_local_mount, fh.c_fd, &tstx,
- CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
- CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
- AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+ int r = ceph_fstatx(m_local, m_fh->c_fd, &tstx,
+ CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+ CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
if (r < 0) {
- derr << ": failed to stat snap=" << current.first << ": " << cpp_strerror(r)
+ derr << ": failed to stat snap=" << m_current.first << ": " << cpp_strerror(r)
<< dendl;
- ceph_close(m_local_mount, fh.c_fd);
- ceph_close(fh.p_mnt, fh.p_fd);
return r;
}
ceph_dir_result *tdirp;
- r = ceph_fdopendir(m_local_mount, fh.c_fd, &tdirp);
+ r = ceph_fdopendir(m_local, m_fh->c_fd, &tdirp);
if (r < 0) {
- derr << ": failed to open local snap=" << current.first << ": " << cpp_strerror(r)
+ derr << ": failed to open local snap=" << m_current.first << ": " << cpp_strerror(r)
<< dendl;
- ceph_close(m_local_mount, fh.c_fd);
- ceph_close(fh.p_mnt, fh.p_fd);
return r;
}
- // starting from this point we shouldn't care about manual closing of fh.c_fd,
- // it will be closed automatically when bound tdirp is closed.
- std::stack<SyncEntry> sync_stack;
- sync_stack.emplace(SyncEntry(".", tdirp, tstx));
- while (!sync_stack.empty()) {
- if (should_backoff(dir_root, &r)) {
- dout(0) << ": backing off r=" << r << dendl;
- break;
- }
+ m_sync_stack.emplace(SyncEntry(".", tdirp, tstx));
+ return 0;
+}
- dout(20) << ": " << sync_stack.size() << " entries in stack" << dendl;
- std::string e_name;
- auto &entry = sync_stack.top();
+int PeerReplayer::RemoteSync::get_entry(std::string *epath, struct ceph_statx *stx,
+ const std::function<int (const std::string&)> &dirsync_func,
+ const std::function<int (const std::string &)> &purge_func) {
+ dout(20) << ": sync stack size=" << m_sync_stack.size() << dendl;
+
+ while (!m_sync_stack.empty()) {
+ auto &entry = m_sync_stack.top();
dout(20) << ": top of stack path=" << entry.epath << dendl;
- if (entry.is_directory()) {
- // entry is a directory -- propagate deletes for missing entries
- // (and changed inode types) to the remote filesystem.
- if (!entry.needs_remote_sync()) {
- r = propagate_deleted_entries(dir_root, entry.epath, fh);
- if (r < 0 && r != -ENOENT) {
- derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl;
- break;
- }
- entry.set_remote_synced();
- }
- struct ceph_statx stx;
- struct dirent de;
- while (true) {
- r = ceph_readdirplus_r(m_local_mount, entry.dirp, &de, &stx,
- CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
- CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
- AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL);
- if (r < 0) {
- derr << ": failed to local read directory=" << entry.epath << dendl;
- break;
- }
- if (r == 0) {
- break;
- }
+ if (!entry.is_directory()) {
+ *epath = entry.epath;
+ *stx = entry.stx;
+ m_sync_stack.pop();
+ return 0;
+ }
- auto d_name = std::string(de.d_name);
- if (d_name != "." && d_name != "..") {
- e_name = d_name;
- break;
- }
+ // entry is a directory -- propagate deletes for missing entries
+ // (and changed inode types) to the remote filesystem.
+ if (!entry.needs_remote_sync()) {
+ int r = dirsync_func(entry.epath);
+ if (r < 0 && r != -ENOENT) {
+ derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl;
+ return r;
}
+ entry.set_remote_synced();
+ }
+ int r;
+ std::string e_name;
+ while (true) {
+ struct dirent de;
+ r = ceph_readdirplus_r(m_local, entry.dirp, &de, NULL,
+ CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+ CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL);
+ if (r < 0) {
+ derr << ": failed to local read directory=" << entry.epath << dendl;
+ break;
+ }
if (r == 0) {
- dout(10) << ": done for directory=" << entry.epath << dendl;
- if (ceph_closedir(m_local_mount, entry.dirp) < 0) {
- derr << ": failed to close local directory=" << entry.epath << dendl;
- }
- sync_stack.pop();
- continue;
+ break;
}
- if (r < 0) {
+
+ auto d_name = std::string(de.d_name);
+ if (d_name != "." && d_name != "..") {
+ e_name = d_name;
break;
}
+ }
- auto epath = entry_path(entry.epath, e_name);
- if (S_ISDIR(stx.stx_mode)) {
- r = remote_mkdir(epath, stx, fh);
- if (r < 0) {
- break;
- }
- ceph_dir_result *dirp;
- r = opendirat(m_local_mount, fh.c_fd, epath, AT_SYMLINK_NOFOLLOW, &dirp);
- if (r < 0) {
- derr << ": failed to open local directory=" << epath << ": "
- << cpp_strerror(r) << dendl;
- break;
- }
- sync_stack.emplace(SyncEntry(epath, dirp, stx));
- } else {
- sync_stack.emplace(SyncEntry(epath, stx));
+ if (r == 0) {
+ dout(10) << ": done for directory=" << entry.epath << dendl;
+ if (ceph_closedir(m_local, entry.dirp) < 0) {
+ derr << ": failed to close local directory=" << entry.epath << dendl;
}
- } else {
- bool need_data_sync = true;
- bool need_attr_sync = true;
- r = should_sync_entry(entry.epath, entry.stx, fh,
- &need_data_sync, &need_attr_sync);
+ m_sync_stack.pop();
+ continue;
+ }
+
+ if (r < 0) {
+ return r;
+ }
+
+ struct ceph_statx cstx;
+ auto _epath = entry_path(entry.epath, e_name);
+ r = ceph_statxat(m_local, m_fh->c_fd, _epath.c_str(), &cstx,
+ CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+ CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+ if (r < 0) {
+ derr << ": failed to stat epath=" << _epath << ": " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
+
+ if (S_ISDIR(cstx.stx_mode)) {
+ ceph_dir_result *dirp;
+ r = opendirat(m_local, m_fh->c_fd, _epath, AT_SYMLINK_NOFOLLOW, &dirp);
if (r < 0) {
+ derr << ": failed to open local directory=" << _epath << ": "
+ << cpp_strerror(r) << dendl;
break;
}
- dout(5) << ": entry=" << entry.epath << ", data_sync=" << need_data_sync
- << ", attr_sync=" << need_attr_sync << dendl;
- if (need_data_sync || need_attr_sync) {
- r = remote_file_op(dir_root, entry.epath, entry.stx, fh, need_data_sync,
- need_attr_sync);
- if (r < 0) {
- break;
- }
- }
- dout(10) << ": done for epath=" << entry.epath << dendl;
- sync_stack.pop();
+ m_sync_stack.emplace(SyncEntry(_epath, dirp, cstx));
}
+
+ *epath = _epath;
+ *stx = cstx;
+
+ return 0;
}
- while (!sync_stack.empty()) {
- auto &entry = sync_stack.top();
+ *epath = "";
+ return 0;
+}
+
+void PeerReplayer::RemoteSync::finish_sync() {
+ dout(20) << dendl;
+
+ while (!m_sync_stack.empty()) {
+ auto &entry = m_sync_stack.top();
if (entry.is_directory()) {
dout(20) << ": closing local directory=" << entry.epath << dendl;
- if (ceph_closedir(m_local_mount, entry.dirp) < 0) {
+ if (ceph_closedir(m_local, entry.dirp) < 0) {
derr << ": failed to close local directory=" << entry.epath << dendl;
}
}
- sync_stack.pop();
+ m_sync_stack.pop();
}
-
- dout(20) << " cur:" << fh.c_fd
- << " prev:" << fh.p_fd
- << " ret = " << r
- << dendl;
-
- // @FHandles.r_fd_dir_root is closed in @unregister_directory since
- // its used to acquire an exclusive lock on remote dir_root.
-
- // c_fd has been used in ceph_fdopendir call so
- // there is no need to close this fd manually.
- ceph_close(fh.p_mnt, fh.p_fd);
-
- return r;
}
int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot ¤t,
boost::optional<Snapshot> prev) {
- if (!prev) {
- derr << ": invalid previous snapshot" << dendl;
- return -ENODATA;
- }
-
- dout(20) << ": incremental sync check from prev=" << prev << dendl;
-
+ dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
FHandles fh;
int r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh);
if (r < 0) {
return r;
}
-
- // record that we are going to "dirty" the data under this directory root
+ // record that we are going to "dirty" the data under this
+ // directory root
auto snap_id_str{stringify(current.second)};
- r = ceph_setxattr(m_remote_mount, dir_root.c_str(), "ceph.mirror.dirty_snap_id",
- snap_id_str.c_str(), snap_id_str.size(), 0);
+ r = ceph_fsetxattr(m_remote_mount, fh.r_fd_dir_root, "ceph.mirror.dirty_snap_id",
+ snap_id_str.c_str(), snap_id_str.size(), 0);
if (r < 0) {
derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root
<< ": " << cpp_strerror(r) << dendl;
return r;
}
- struct ceph_statx cstx;
- r = ceph_fstatx(m_local_mount, fh.c_fd, &cstx,
- CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
- CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
- AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+ SyncMechanism *syncm;
+ if (fh.p_mnt == m_local_mount) {
+ syncm = new SnapDiffSync(dir_root, m_local_mount, m_remote_mount, &fh,
+ m_peer, current, prev);
+ } else {
+ syncm = new RemoteSync(m_local_mount, m_remote_mount, &fh,
+ m_peer, current, boost::none);
+ }
+
+ r = syncm->init_sync();
if (r < 0) {
- derr << ": failed to stat snap=" << current.first << ": " << cpp_strerror(r)
- << dendl;
+ derr << ": failed to initialize sync mechanism" << dendl;
ceph_close(m_local_mount, fh.c_fd);
ceph_close(fh.p_mnt, fh.p_fd);
+ delete syncm;
return r;
}
- ceph_snapdiff_info sd_info;
- ceph_snapdiff_entry_t sd_entry;
-
- //The queue of SyncEntry items (directories) to be synchronized.
- //We follow a breadth first approach here based on the snapdiff output.
- std::queue<SyncEntry> sync_queue;
-
- //start with initial/default entry
- std::string epath = ".", npath = "", nname = "";
- sync_queue.emplace(SyncEntry(epath, cstx));
-
- while (!sync_queue.empty()) {
+ // starting from this point we shouldn't care about manual closing of fh.c_fd,
+ // it will be closed automatically when bound tdirp is closed.
+ while (true) {
if (should_backoff(dir_root, &r)) {
dout(0) << ": backing off r=" << r << dendl;
break;
}
- dout(20) << ": " << sync_queue.size() << " entries in queue" << dendl;
- const auto &queue_entry = sync_queue.front();
- epath = queue_entry.epath;
- dout(20) << ": syncing entry, path=" << epath << dendl;
- r = ceph_open_snapdiff(fh.p_mnt, dir_root.c_str(), epath.c_str(),
- stringify((*prev).first).c_str(), current.first.c_str(), &sd_info);
- if (r != 0) {
- derr << ": failed to open snapdiff, r=" << r << dendl;
- ceph_close(m_local_mount, fh.c_fd);
- ceph_close(fh.p_mnt, fh.p_fd);
- return r;
+ std::string epath;
+ struct ceph_statx stx;
+ r = syncm->get_entry(&epath, &stx,
+ [this, &dir_root, &fh](const std::string &epath) {
+ return propagate_deleted_entries(dir_root, epath, fh);
+ },
+ [this, &dir_root, &fh](const std::string &epath) {
+ return cleanup_remote_dir(dir_root, epath, fh);
+ });
+ dout(20) << ": r=" << r << dendl;
+ if (r < 0) {
+ break;
}
- while (0 < (r = ceph_readdir_snapdiff(&sd_info, &sd_entry))) {
+
+ dout(20) << ": epath=" << epath << dendl;
+ if (epath == "") {
+ dout(10) << ": tree traversal done for dir_root=" << dir_root << dendl;
+ break;
+ }
+
+ if (S_ISDIR(stx.stx_mode)) {
+ r = remote_mkdir(epath, stx, fh);
if (r < 0) {
- derr << ": failed to read directory=" << epath << dendl;
- ceph_close_snapdiff(&sd_info);
- ceph_close(m_local_mount, fh.c_fd);
- ceph_close(fh.p_mnt, fh.p_fd);
- return r;
+ break;
}
-
- //New entry found
- nname = sd_entry.dir_entry.d_name;
- if ("." == nname || ".." == nname)
- continue;
- // create path for the newly found entry
- npath = entry_path(epath, nname);
- r = ceph_statxat(m_local_mount, fh.c_fd, npath.c_str(), &cstx,
- CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
- CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
- AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+ } else {
+ bool need_data_sync = true;
+ bool need_attr_sync = true;
+ r = should_sync_entry(epath, stx, fh,
+ &need_data_sync, &need_attr_sync);
if (r < 0) {
- // can't stat, so it's a deleted entry.
- if (DT_DIR == sd_entry.dir_entry.d_type) { // is a directory
- r = cleanup_remote_dir(dir_root, npath, fh);
- if (r < 0) {
- derr << ": failed to remove directory=" << npath << dendl;
- break;
- }
- }
- else { // is a file
- r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), 0);
- if (r < 0) {
- break;
- }
- }
- } else {
- // stat success, update the existing entry
- struct ceph_statx tstx;
- int rstat_r = ceph_statxat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), &tstx,
- CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
- CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
- AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
- if (S_ISDIR(cstx.stx_mode)) { // is a directory
- //cleanup if it's a file in the remotefs
- if ((0 == rstat_r) && !S_ISDIR(tstx.stx_mode)) {
- r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), 0);
- if (r < 0) {
- derr << ": Error in directory sync. Failed to remove file="
- << npath << dendl;
- break;
- }
- }
- r = remote_mkdir(npath, cstx, fh);
- if (r < 0) {
- break;
- }
- // push it to sync_queue for later processing
- sync_queue.emplace(SyncEntry(npath, cstx));
- } else { // is a file
- bool need_data_sync = true;
- bool need_attr_sync = true;
- r = should_sync_entry(npath, cstx, fh, &need_data_sync, &need_attr_sync);
- if (r < 0) {
- break;
- }
- dout(5) << ": entry=" << npath << ", data_sync=" << need_data_sync
- << ", attr_sync=" << need_attr_sync << dendl;
- if (need_data_sync || need_attr_sync) {
- //cleanup if it's a directory in the remotefs
- if ((0 == rstat_r) && S_ISDIR(tstx.stx_mode)) {
- r = cleanup_remote_dir(dir_root, npath, fh);
- if (r < 0) {
- derr << ": Error in file sync. Failed to remove remote directory="
- << npath << dendl;
- break;
- }
- }
- r = remote_file_op(dir_root, npath, cstx, fh, need_data_sync, need_attr_sync);
- if (r < 0) {
- break;
- }
- }
- }
+ break;
}
- }
- if (0 == r) {
- dout(10) << ": successfully synchronized the entry=" << epath << dendl;
- }
- //Close the current open directory and take the next queue_entry, if success or failure.
- r = ceph_close_snapdiff(&sd_info);
- if (r != 0) {
- derr << ": failed to close directory=" << epath << dendl;
+ dout(5) << ": entry=" << epath << ", data_sync=" << need_data_sync
+ << ", attr_sync=" << need_attr_sync << dendl;
+ if (need_data_sync || need_attr_sync) {
+ r = remote_file_op(dir_root, epath, stx, fh, need_data_sync, need_attr_sync);
+ if (r < 0) {
+ break;
+ }
+ }
+ dout(10) << ": done for epath=" << epath << dendl;
}
- sync_queue.pop();
}
- dout(20) << " current:" << fh.c_fd
+ syncm->finish_sync();
+ delete syncm;
+
+ dout(20) << " cur:" << fh.c_fd
<< " prev:" << fh.p_fd
<< " ret = " << r
<< dendl;
// @FHandles.r_fd_dir_root is closed in @unregister_directory since
// its used to acquire an exclusive lock on remote dir_root.
- ceph_close(m_local_mount, fh.c_fd);
+ // c_fd has been used in ceph_fdopendir call so
+ // there is no need to close this fd manually.
ceph_close(fh.p_mnt, fh.p_fd);
+
return r;
}