out_entry = std::move(m_sync_dataq.front());
m_sync_dataq.pop();
- m_in_flight++;
dout(10) << ": snapshot data replayer dataq popped" << " syncm=" << this
<< " epath=" << out_entry.epath << dendl;
return true;
* - Notify the crawler as it waits after the error for pending jobs to finish.
*/
if (m_crawl_error) {
- // If in_flight > 0, those threads will take care of dequeue/notify, you just consume next job
+ // If m_in_flight > 0, those threads will take care of dequeue/notify, you just consume next job
if (m_in_flight > 0)
return false;
else
// No more work if datasync failed or everything is done
if (m_datasync_error || job_done)
return false;
+
return true;
}
* large file, the rest of data sync threads could start consuming the next syncm job
* instead of being idle waiting for the last file to be synced from present syncm job.
*/
-std::shared_ptr<PeerReplayer::SyncMechanism> PeerReplayer::pick_next_syncm() const {
+std::shared_ptr<PeerReplayer::SyncMechanism> PeerReplayer::pick_next_syncm_and_mark() {
// caller holds lock
for (auto& syncm : syncm_q) {
if (syncm->has_pending_work()) {
+ syncm->inc_in_flight();
return syncm;
}
}
{
std::unique_lock lock(smq_lock);
dout(20) << ": snapshot data replayer waiting for syncm to process" << dendl;
- smq_cv.wait(lock, [this, &syncm] { syncm = pick_next_syncm(); return syncm != nullptr; });
+ smq_cv.wait(lock, [this, &syncm] { syncm = pick_next_syncm_and_mark(); return syncm != nullptr; });
// syncm is guaranteed to be non-null because of the predicate used in the above wait.
dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl;
}
if (r < 0) {
dout(5) << ": should_sync_entry failed, cannot proceed sync: " << cpp_strerror(r)
<< " dir_root=" << syncm->get_m_dir_root() << " epath=" << entry.epath << dendl;
- syncm->set_datasync_error_and_dec_in_flight(r);
+ syncm->set_datasync_error(r);
break;
}
}
if (r < 0) {
dout(5) << ": remote_file_op failed, cannot proceed sync: " << cpp_strerror(r)
<< " dir_root=" << syncm->get_m_dir_root() << " epath=" << entry.epath << dendl;
- syncm->set_datasync_error_and_dec_in_flight(r);
+ syncm->set_datasync_error(r);
break;
}
}
dout(10) << ": done for epath=" << entry.epath << " syncm=" << syncm << dendl;
-
- syncm->dec_in_flight();
}
// Close fds
{
std::unique_lock smq_l1(smq_lock);
std::unique_lock sdq_l1(syncm->get_sdq_lock());
- const bool no_in_flight_syncm_jobs = syncm->get_in_flight_unlocked() == 0;
+ const bool last_in_flight_syncm = syncm->get_in_flight_unlocked() == 1;
const bool crawl_finished = syncm->get_crawl_finished_unlocked();
const bool sync_error =
syncm->get_datasync_error_unlocked() ||
syncm->get_crawl_error_unlocked();
- if (!syncm_q.empty() && no_in_flight_syncm_jobs && (crawl_finished || sync_error)) {
+ if (!syncm_q.empty() && last_in_flight_syncm && (crawl_finished || sync_error)) {
if (sync_error && !is_syncm_active(syncm)){
dout(20) << ": syncm object=" << syncm << " already dequeued" << dendl;
} else {
}
}
}
-
+ syncm->dec_in_flight();
// lock again to satisfy m_cond
locker.lock();
}
// Reads the crawl-finished flag without taking sdq_lock; the "_unlocked"
// suffix means the caller is expected to already hold sdq_lock (as the
// fd-close path does when it checks this together with the error flags).
bool get_crawl_finished_unlocked() {
return m_crawl_finished;
}
- void set_datasync_error_and_dec_in_flight(int err) {
- std::unique_lock lock(sdq_lock);
- m_datasync_error = true;
- m_datasync_errno = err;
- --m_in_flight;
- }
void set_datasync_error(int err) {
std::unique_lock lock(sdq_lock);
m_datasync_error = true;
// Reads the crawl-error flag without taking sdq_lock; per the "_unlocked"
// convention the caller is expected to already hold sdq_lock.
bool get_crawl_error_unlocked() {
return m_crawl_error;
}
+ // Bumps the in-flight job counter under sdq_lock. Paired with
+ // dec_in_flight(), which the data-sync worker calls once it has
+ // finished (or failed) the job it took from this syncm.
+ void inc_in_flight() {
+ std::unique_lock lock(sdq_lock);
+ ++m_in_flight;
+ }
void dec_in_flight() {
std::unique_lock lock(sdq_lock);
--m_in_flight;
// Main loop of a snapshot data-sync worker thread.
void run_datasync(SnapshotDataSyncThread *data_replayer);
void remove_syncm(const std::shared_ptr<SyncMechanism>& syncm_obj);
bool is_syncm_active(const std::shared_ptr<SyncMechanism>& syncm_obj);
-std::shared_ptr<SyncMechanism> pick_next_syncm() const;
+// Selects the next syncm that has pending work and increments its
+// in-flight counter before returning it (it now mutates syncm state,
+// so it can no longer be const — hence the rename).
+std::shared_ptr<SyncMechanism> pick_next_syncm_and_mark();
boost::optional<std::string> pick_directory();
int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);