if (it != m_directories.end()) {
m_directories.erase(it);
}
- if (m_registered.find(_dir_path) == m_registered.end()) {
+
+ auto it1 = m_registered.find(_dir_path);
+ if (it1 == m_registered.end()) {
m_snap_sync_stats.erase(_dir_path);
+ } else {
+ it1->second.replayer->cancel();
}
m_cond.notify_all();
}
boost::optional<std::string> PeerReplayer::pick_directory() {
dout(20) << dendl;
+ auto now = clock::now();
+ auto retry_timo = g_ceph_context->_conf.get_val<uint64_t>(
+ "cephfs_mirror_retry_failed_directories_interval");
+
boost::optional<std::string> candidate;
for (auto &dir_path : m_directories) {
+ auto &sync_stat = m_snap_sync_stats.at(dir_path);
+ if (sync_stat.failed) {
+ std::chrono::duration<double> d = now - *sync_stat.last_failed;
+ if (d.count() < retry_timo) {
+ continue;
+ }
+ }
if (!m_registered.count(dir_path)) {
candidate = dir_path;
break;
#define NR_IOVECS 8 // # iovecs
#define IOVEC_SIZE (8 * 1024 * 1024) // buffer size for each iovec
-int PeerReplayer::remote_copy(const std::string &local_path,
+int PeerReplayer::remote_copy(const std::string &dir_path,
+ const std::string &local_path,
const std::string &remote_path,
const struct ceph_statx &stx) {
- dout(10) << ": local_path=" << local_path << ", remote_path=" << remote_path
- << dendl;
+ dout(10) << ": dir_path=" << dir_path << ", local_path=" << local_path
+ << ", remote_path=" << remote_path << dendl;
int l_fd;
int r_fd;
void *ptr;
}
while (true) {
+ if (should_backoff(dir_path, &r)) {
+ dout(0) << ": backing off r=" << r << dendl;
+ break;
+ }
+
for (int i = 0; i < NR_IOVECS; ++i) {
iov[i].iov_base = (char*)ptr + IOVEC_SIZE*i;
iov[i].iov_len = IOVEC_SIZE;
return r == 0 ? 0 : r;
}
-int PeerReplayer::remote_file_op(const std::string &local_path,
+int PeerReplayer::remote_file_op(const std::string &dir_path,
+ const std::string &local_path,
const std::string &remote_path,
const struct ceph_statx &stx) {
- dout(10) << ": local_path=" << local_path << ", remote_path=" << remote_path
- << dendl;
+ dout(10) << ": dir_path=" << dir_path << ", local_path=" << local_path
+ << ", remote_path=" << remote_path << dendl;
int r;
if (S_ISREG(stx.stx_mode)) {
- r = remote_copy(local_path, remote_path, stx);
+ r = remote_copy(dir_path, local_path, remote_path, stx);
if (r < 0) {
derr << ": failed to copy path=" << local_path << ": " << cpp_strerror(r)
<< dendl;
rm_stack.emplace(SyncEntry(dir_path, tdirp, tstx));
while (!rm_stack.empty()) {
+ if (should_backoff(dir_path, &r)) {
+ dout(0) << ": backing off r=" << r << dendl;
+ break;
+ }
+
dout(20) << ": " << rm_stack.size() << " entries in stack" << dendl;
std::string e_name;
auto &entry = rm_stack.top();
sync_stack.emplace(SyncEntry("/", tdirp, tstx));
while (!sync_stack.empty()) {
+ if (should_backoff(dir_path, &r)) {
+ dout(0) << ": backing off r=" << r << dendl;
+ break;
+ }
+
dout(20) << ": " << sync_stack.size() << " entries in stack" << dendl;
std::string e_name;
auto &entry = sync_stack.top();
} else {
auto l_path = entry_path(snap_path, entry.epath);
auto r_path = entry_path(dir_path, entry.epath);
- r = remote_file_op(l_path, r_path, entry.stx);
+ r = remote_file_op(dir_path, l_path, r_path, entry.stx);
if (r < 0) {
break;
}
derr << ": failed to sync snapshots for dir_path=" << dir_path << dendl;
}
locker.lock();
+ if (r < 0) {
+ _inc_failed_count(dir_path);
+ } else {
+ _reset_failed_count(dir_path);
+ }
}
void PeerReplayer::run(SnapshotReplayerThread *replayer) {
f->open_object_section("stats");
for (auto &[dir_path, sync_stat] : m_snap_sync_stats) {
f->open_object_section(dir_path);
- if (!sync_stat.current_syncing_snap) {
+ if (sync_stat.failed) {
+ f->dump_string("state", "failed");
+ } else if (!sync_stat.current_syncing_snap) {
f->dump_string("state", "idle");
} else {
f->dump_string("state", "syncing");
return 0;
}
+ void cancel() {
+ canceled = true;
+ }
+
+ bool is_canceled() const {
+ return canceled;
+ }
+
private:
PeerReplayer *m_peer_replayer;
+ bool canceled = false;
};
struct DirRegistry {
using time = ceph::coarse_mono_time;
struct SnapSyncStat {
+ uint64_t nr_failures = 0; // number of consecutive failures
+ boost::optional<time> last_failed; // lat failed timestamp
+ bool failed = false; // hit upper cap for consecutive failures
boost::optional<std::pair<uint64_t, std::string>> last_synced_snap;
boost::optional<std::pair<uint64_t, std::string>> current_syncing_snap;
uint64_t synced_snap_count = 0;
boost::optional<double> last_sync_duration;
};
+ void _inc_failed_count(const std::string &dir_path) {
+ auto max_failures = g_ceph_context->_conf.get_val<uint64_t>(
+ "cephfs_mirror_max_consecutive_failures_per_directory");
+ auto &sync_stat = m_snap_sync_stats.at(dir_path);
+ sync_stat.last_failed = clock::now();
+ if (++sync_stat.nr_failures >= max_failures) {
+ sync_stat.failed = true;
+ }
+ }
+ void _reset_failed_count(const std::string &dir_path) {
+ auto &sync_stat = m_snap_sync_stats.at(dir_path);
+ sync_stat.nr_failures = 0;
+ sync_stat.failed = false;
+ sync_stat.last_failed = boost::none;
+ }
+
void _set_last_synced_snap(const std::string &dir_path, uint64_t snap_id,
const std::string &snap_name) {
auto &sync_stat = m_snap_sync_stats.at(dir_path);
++sync_stat.synced_snap_count;
}
+ bool should_backoff(const std::string &dir_path, int *retval) {
+ if (m_fs_mirror->is_blocklisted()) {
+ *retval = -EBLOCKLISTED;
+ return true;
+ }
+
+ std::scoped_lock locker(m_lock);
+ if (is_stopping()) {
+ // ceph defines EBLOCKLISTED to ESHUTDOWN (108). so use
+ // EINPROGRESS to identify shutdown.
+ *retval = -EINPROGRESS;
+ return true;
+ }
+ auto &dr = m_registered.at(dir_path);
+ if (dr.replayer->is_canceled()) {
+ *retval = -ECANCELED;
+ return true;
+ }
+
+ *retval = 0;
+ return false;
+ }
+
typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers;
CephContext *m_cct;
int cleanup_remote_dir(const std::string &dir_path);
int remote_mkdir(const std::string &local_path, const std::string &remote_path,
const struct ceph_statx &stx);
- int remote_file_op(const std::string &local_path, const std::string &remote_path,
- const struct ceph_statx &stx);
- int remote_copy(const std::string &local_path,const std::string &remote_path,
+ int remote_file_op(const std::string &dir_path,
+ const std::string &local_path,
+ const std::string &remote_path, const struct ceph_statx &stx);
+ int remote_copy(const std::string &dir_path,
+ const std::string &local_path,
+ const std::string &remote_path,
const struct ceph_statx &local_stx);
};