dout(1) << ": shutdown is already in progress - return"<< dendl;
return;
}
+
+ // wake up all datasync threads waiting on syncm_q
+ {
+ std::unique_lock smq_l1(smq_lock);
+ smq_cv.notify_all(); // wake up syncm_q wait
+ }
+
+ // Join data sync threads first
+ for (auto &replayer : m_data_replayers) {
+ replayer->join();
+ }
+ m_data_replayers.clear();
+
+ // Wake up the crawler threads' shutdown wait after the datasync threads have exited
{
std::scoped_lock lock(m_lock);
- m_cond.notify_all(); //wake up shutdown wait
+ m_cond.notify_all();
}
for (auto &replayer : m_replayers) {
replayer->join();
}
m_replayers.clear();
- for (auto &replayer : m_data_replayers) {
- replayer->join();
- }
- m_data_replayers.clear();
ceph_unmount(m_remote_mount);
ceph_release(m_remote_mount);
return nullptr;
}
+void PeerReplayer::mark_and_notify_syncms_to_backoff(int err) {
+ // Caller must hold smq_lock (asserted below). Records err as the datasync error and sets the backoff flag on every syncm in syncm_q.
+ ceph_assert(ceph_mutex_is_locked_by_me(smq_lock));
+ for (auto& syncm : syncm_q) {
+ std::unique_lock sdq_lock(syncm->get_sdq_lock());
+ syncm->set_datasync_error_unlocked(err);
+ syncm->mark_backoff_unlocked();
+ if (get_active_datasync_threads() == 1) { // only one active datasync thread remains
+ // Wake up a crawler thread whose dataq processing has not started yet
+ syncm->sdq_cv_notify_all_unlocked();
+ }
+ }
+ // Wake up the other datasync threads waiting on smq_cv so they exit sooner
+ smq_cv.notify_all();
+}
+
void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
dout(10) << ": snapshot datasync replayer=" << data_replayer << dendl;
- /* The entire snapshot is synced outside the lock. The m_lock and m_cond pair
- * is used for these threads along with crawler threads to work well with all
- * terminal conditions like shutdown.
+ /* m_stopping is atomic, so m_lock is no longer required to change or read
+ * the stopping state. The data sync thread therefore no longer needs to wait
+ * under m_lock to detect is_stopping().
*/
- std::unique_lock locker(m_lock);
while (true) {
- m_cond.wait_for(locker, 1s, [this]{return is_stopping();});
- if (is_stopping()) {
- dout(5) << ": exiting snapshot data replayer=" << data_replayer << dendl;
- break;
- }
- // do not check if client is blocklisted under lock
- locker.unlock();
- if (m_fs_mirror->is_blocklisted()) {
- dout(5) << ": exiting snapshot data replayer=" << data_replayer << " as client is blocklisted" << dendl;
- break;
- }
-
+ bool shutdown = false;
+ bool blocklist = false;
std::shared_ptr<SyncMechanism> syncm;
{
std::unique_lock lock(smq_lock);
dout(20) << ": snapshot data replayer waiting for syncm to process" << dendl;
- smq_cv.wait(lock, [this] { return !syncm_q.empty() && pick_next_syncm();});
+ while (true) {
+ bool ready = smq_cv.wait_for(lock, 2s, [this] { return is_stopping() || (!syncm_q.empty() && pick_next_syncm());});
+ // immediate shutdown - predicate is true
+ if (is_stopping()) {
+ dout(5) << ": exiting snapshot data replayer=" << data_replayer
+ << " as mirroring is shutting down" << dendl;
+ shutdown = true;
+ mark_and_notify_syncms_to_backoff(-EINPROGRESS);
+ break;
+ }
+ // work available - predicate is true
+ if (ready)
+ break;
+ // Timed wake up path - blocklist check
+ if (m_fs_mirror->is_blocklisted()) {
+ dout(5) << ": exiting snapshot data replayer=" << data_replayer
+ << " as client is blocklisted" << dendl;
+ blocklist = true;
+ mark_and_notify_syncms_to_backoff(-EBLOCKLISTED);
+ break;
+ }
+ // otherwise the timeout occurred, nothing to do - loop again
+ }
+
+ if (shutdown || blocklist) {
+ break; //exit
+ }
// syncm is guaranteed to be non-null because of the predicate used in the wait above.
syncm = pick_next_syncm();
dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl;
}
}
}
-
- //lock again to satify m_cond
- locker.lock();
- }
+ } // outer while
}
void PeerReplayer::peer_status(Formatter *f) {
m_datasync_error = true;
m_datasync_errno = err;
}
+ void set_datasync_error_unlocked(int err) { // takes no lock itself; NOTE(review): presumably the caller must hold the sdq lock - confirm
+ m_datasync_error = true; // flag that a datasync error has been recorded
+ m_datasync_errno = err; // keep the error code supplied by the caller
+ }
+ void mark_backoff_unlocked() { // sets the backoff flag without taking a lock; NOTE(review): presumably the caller must hold the sdq lock - confirm
+ m_backoff = true;
+ }
// Returns the datasync error flag (m_datasync_error). Takes no lock itself.
// NOTE(review): presumably the caller must hold the sdq lock - confirm.
bool get_datasync_error_unlocked() {
return m_datasync_error;
}
bool m_take_snapshot = false;
bool m_datasync_error = false;
int m_datasync_errno = 0;
+ bool m_backoff = false;
};
class RemoteSync : public SyncMechanism {
// Returns the current value of m_active_datasync_threads, read with relaxed
// memory ordering (no ordering is imposed on surrounding memory operations).
int get_active_datasync_threads() const {
return m_active_datasync_threads.load(std::memory_order_relaxed);
}
+ void mark_and_notify_syncms_to_backoff(int err);
boost::optional<std::string> pick_directory();
int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);