void PeerReplayer::enqueue_syncm(const std::shared_ptr<SyncMechanism>& item) {
dout(20) << ": Enqueue syncm object=" << item << dendl;
std::lock_guard lock(smq_lock);
- syncm_q.push(item);
+ syncm_q.push_back(item);
smq_cv.notify_all();
}
dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this
<< " crawl_finished=" << m_crawl_finished << dendl;
if (m_sync_dataq.empty() && m_crawl_finished) {
- dout(20) << ": snapshot data replayer dataq_empty and crawl finished - waiting for other"
- << " inflight processing threads to finish!!!" << dendl;
- sdq_cv.wait(lock, [this]{ return m_in_flight == 0;});
+ dout(20) << ": snapshot data replayer - finished processing syncm=" << this
+ << " Proceed with next syncm job " << dendl;
return false; // no more work
}
return true;
}
+bool PeerReplayer::SyncMechanism::has_pending_work() const {
+ std::unique_lock lock(sdq_lock);
+ if (m_sync_dataq.empty() && m_crawl_finished)
+ return false;
+ return true;
+}
+
void PeerReplayer::SyncMechanism::mark_crawl_finished() {
std::unique_lock lock(sdq_lock);
m_crawl_finished = true;
}
}
+void PeerReplayer::remove_syncm(const std::shared_ptr<PeerReplayer::SyncMechanism>& syncm_obj)
+{
+ // caller holds lock
+ auto it = std::find(syncm_q.begin(), syncm_q.end(), syncm_obj);
+ if (it != syncm_q.end()) {
+ syncm_q.erase(it);
+ }
+}
+
+/* The data sync threads should consume the next syncm job if the present syncm has no
+ * pending work. This can evidently happen if the last file being synced in the present
+ * syncm job is a large file. In this case, one data sync thread is busy syncing the
+ * large file, the rest of data sync threads could start consuming the next syncm job
+ * instead of being idle waiting for the last file to be synced from present syncm job.
+ */
+std::shared_ptr<PeerReplayer::SyncMechanism> PeerReplayer::pick_next_syncm() const {
+ // caller holds lock
+ for (auto& syncm : syncm_q) {
+ if (syncm->has_pending_work()) {
+ return syncm;
+ }
+ }
+ return nullptr;
+}
+
void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
dout(10) << ": snapshot datasync replayer=" << data_replayer << dendl;
{
std::unique_lock lock(smq_lock);
dout(20) << ": snapshot data replayer waiting for syncm to process" << dendl;
- smq_cv.wait(lock, [this]{return !syncm_q.empty();});
- syncm = syncm_q.front();
+ smq_cv.wait(lock, [this, &syncm] { syncm = pick_next_syncm(); return syncm != nullptr; });
+ // syncm is gauranteed to be non-null because of the predicate used in above wait.
dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl;
}
{
std::unique_lock smq_l1(smq_lock);
std::unique_lock sdq_l1(syncm->get_sdq_lock());
- if (!syncm_q.empty() && syncm_q.front() == syncm
- && syncm->get_in_flight_unlocked() == 0
- && syncm->get_crawl_finished_unlocked() == true) {
+ if (!syncm_q.empty() &&
+ syncm->get_in_flight_unlocked() == 0 &&
+ syncm->get_crawl_finished_unlocked() == true) {
dout(20) << ": Dequeue syncm object=" << syncm << dendl;
syncm->set_sync_finished_and_notify_unlocked(); // To wake up crawler thread waiting to take snapshot
- syncm_q.pop();
+ if (syncm_q.front() == syncm) {
+ syncm_q.pop_front();
+ } else { // if syncms in the middle finishes first
+ remove_syncm(syncm);
+ }
+ dout(20) << ": syncm_q after removal " << syncm_q << dendl;
smq_cv.notify_all();
}
}
void push_dataq_entry(PeerReplayer::SyncEntry e);
bool pop_dataq_entry(PeerReplayer::SyncEntry &out);
+ bool has_pending_work() const;
void mark_crawl_finished();
bool get_crawl_finished_unlocked() {
return m_crawl_finished;
void dec_in_flight() {
std::unique_lock lock(sdq_lock);
--m_in_flight;
- /* If the crawler is done (m_crawl_finished = true) and m_sync_dataq
- * is empty, threads will block until other pending threads which are syncing
- * the entries picked up from queue are completed. So make sure to wake them
- * up when the processing is complete. This is to avoid the busy loop of jobless
- * data sync threads.
- */
- if (m_in_flight == 0)
- sdq_cv.notify_all();
}
int get_in_flight_unlocked() {
return m_in_flight;
boost::optional<Snapshot> m_prev;
std::stack<PeerReplayer::SyncEntry> m_sync_stack;
- ceph::mutex sdq_lock;
+ mutable ceph::mutex sdq_lock;
ceph::condition_variable sdq_cv;
std::queue<PeerReplayer::SyncEntry> m_sync_dataq;
int m_in_flight = 0;
ceph::mutex smq_lock;
ceph::condition_variable smq_cv;
- std::queue<std::shared_ptr<SyncMechanism>> syncm_q;
+ std::deque<std::shared_ptr<SyncMechanism>> syncm_q;
ServiceDaemonStats m_service_daemon_stats;
void run(SnapshotReplayerThread *replayer);
void run_datasync(SnapshotDataSyncThread *data_replayer);
+ void remove_syncm(const std::shared_ptr<SyncMechanism>& syncm_obj);
+ std::shared_ptr<SyncMechanism> pick_next_syncm() const;
boost::optional<std::string> pick_directory();
int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);