// wake up all datasync threads waiting on syncm_q
{
std::unique_lock smq_l1(smq_lock);
+ for (auto& syncm : syncm_q) {
+ std::unique_lock sdq_l1(syncm->get_sdq_lock());
+ syncm->sdq_cv_notify_all_unlocked();
+ }
smq_cv.notify_all(); // wake up syncm_q wait
}
}
bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) {
+ std::unique_lock smq_lock(m_peer_replayer.get_smq_lock());
std::unique_lock lock(sdq_lock);
dout(20) << ": snapshot data replayer waiting on m_sync_dataq, syncm=" << this << dendl;
sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_crawl_finished || m_datasync_error || m_crawl_error;});
+ while (true) {
+ bool ready = sdq_cv.wait_for(lock, 2s, [this] {
+ return m_peer_replayer.is_stopping() ||
+ !m_sync_dataq.empty() ||
+ m_crawl_finished ||
+ m_datasync_error ||
+ m_crawl_error;
+ });
+
+ // check for shutdown/blocklist/cancel
+ int r = 0;
+ if (m_peer_replayer.should_backoff(m_dir_root, &r)) {
+ dout(0) << ": backing off, shutdown/blocklist/cancel r=" << r << dendl;
+ if (r == -ECANCELED) {
+ set_datasync_error_unlocked(r);
+ dout(5) << ": snapshot data replayer, mirroring cancelled for syncm=" << this << dendl;
+ } else { //shutdown/blocklist
+ m_peer_replayer.mark_all_syncms_to_backoff_unlocked(r);
+ }
+ return false;
+ }
+ // predicate true
+ if (ready)
+ break;
+ // otherwise timeout occured, nothing to do - loop again
+ }
dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this
<< " crawl_finished=" << m_crawl_finished << dendl;
if (m_datasync_error || m_crawl_error) {
smq_cv.notify_all();
}
+void PeerReplayer::mark_all_syncms_to_backoff_unlocked(int err) {
+ // caller holds the smq_lock and sdq_lock
+ ceph_assert(ceph_mutex_is_locked_by_me(smq_lock));
+ for (auto& syncm : syncm_q) {
+ ceph_assert(ceph_mutex_is_locked_by_me(syncm->get_sdq_lock()));
+ syncm->set_datasync_error_unlocked(err);
+ syncm->mark_backoff_unlocked();
+ }
+}
+
+void PeerReplayer::notify_all_syncms_to_backoff() {
+ // caller holds the smq_lock and sdq_lock
+ ceph_assert(ceph_mutex_is_locked_by_me(smq_lock));
+ if (get_active_datasync_threads() == 1) { //Last thread
+ for (auto& syncm : syncm_q) {
+ ceph_assert(ceph_mutex_is_locked_by_me(syncm->get_sdq_lock()));
+ // To wake up crawler thread
+ syncm->sdq_cv_notify_all_unlocked();
+ }
+ }
+ // To wake up other datasync threads to speed up exit
+ smq_cv.notify_all();
+}
+
void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
dout(10) << ": snapshot datasync replayer=" << data_replayer << dendl;
{
std::unique_lock smq_l1(smq_lock);
std::unique_lock sdq_l1(syncm->get_sdq_lock());
+ // backoff ?
+ const bool syncm_backoff = syncm->get_backoff_unlocked();
+ int syncm_errno = syncm->get_datasync_errno_unlocked();
+ if (syncm_backoff && syncm_errno != -ECANCELED) {
+ notify_all_syncms_to_backoff(); //shutdown/blocklist
+ dout(5) << ": exiting snapshot data replayer=" << data_replayer
+ << " as client is blocklisted or mirroring is shutting down, error=" << syncm_errno << dendl;
+ break; // exit
+ }
const bool last_in_flight_syncm = syncm->get_in_flight_unlocked() == 1;
const bool crawl_finished = syncm->get_crawl_finished_unlocked();
const bool sync_error =
void mark_backoff_unlocked() {
m_backoff = true;
}
+ bool get_backoff_unlocked() {
+ return m_backoff;
+ }
bool get_datasync_error_unlocked() {
return m_datasync_error;
}
std::unique_lock lock(sdq_lock);
return m_datasync_errno;
}
+ int get_datasync_errno_unlocked() {
+ return m_datasync_errno;
+ }
bool get_crawl_error() {
std::unique_lock lock(sdq_lock);
return m_crawl_error;
return m_active_datasync_threads.load(std::memory_order_relaxed);
}
void mark_and_notify_syncms_to_backoff(int err);
+ void mark_all_syncms_to_backoff_unlocked(int err);
+ void notify_all_syncms_to_backoff();
boost::optional<std::string> pick_directory();
int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);
// add syncm to syncm_q
void enqueue_syncm(const std::shared_ptr<SyncMechanism>& item);
+ ceph::mutex& get_smq_lock() {
+ return smq_lock;
+ }
};
} // namespace mirror