std::unique_lock lock(sdq_lock);
dout(20) << ": snapshot data replayer waiting on m_sync_dataq, syncm=" << this << dendl;
sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_sync_crawl_finished;});
- dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this << dendl;
- if (m_sync_dataq.empty() && m_sync_crawl_finished)
+ dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this
+ << " crawl_finished=" << m_sync_crawl_finished << dendl;
+ if (m_sync_dataq.empty() && m_sync_crawl_finished) {
+ dout(20) << ": snapshot data replayer dataq_empty and crawl finished - waiting for other"
+ << " inflight processing threads to finish!!!" << dendl;
+ sdq_cv.wait(lock, [this]{ return m_in_flight == 0;});
return false; // no more work
+ }
out_entry = std::move(m_sync_dataq.front());
m_sync_dataq.pop();
+ m_in_flight++;
dout(10) << ": snapshot data replayer dataq popped" << " syncm=" << this
<< " epath=" << out_entry.epath << dendl;
return true;
SyncEntry entry;
while (syncm->pop_dataq_entry(entry)) {
//TODO Process entry
+ syncm->dec_in_flight();
}
// Dequeue syncm object after processing
{
- std::unique_lock lock(smq_lock);
- if (!syncm_q.empty() && syncm_q.front() == syncm) {
+ std::unique_lock smq_l1(smq_lock);
+ std::unique_lock sdq_l1(syncm->get_sdq_lock());
+ if (!syncm_q.empty() && syncm_q.front() == syncm
+ && syncm->get_in_flight_unlocked() == 0
+ && syncm->get_crawl_finished_unlocked() == true) {
dout(20) << ": Dequeue syncm object=" << syncm << dendl;
syncm_q.pop();
smq_cv.notify_all();
void push_dataq_entry(PeerReplayer::SyncEntry e);
bool pop_dataq_entry(PeerReplayer::SyncEntry &out);
void mark_crawl_finished();
+ bool get_crawl_finished_unlocked() {
+ return m_sync_crawl_finished;
+ }
+ void dec_in_flight() {
+ std::unique_lock lock(sdq_lock);
+ --m_in_flight;
+ /* If the crawler is done (m_sync_crawl_finished = true) and m_sync_dataq
+ * is empty, threads will block until other pending threads which are syncing
+ * the entries picked up from queue are completed. So make sure to wake them
+ * up when the processing is complete. This is to avoid the busy loop of jobless
+ * data sync threads.
+ */
+ if (m_in_flight == 0)
+ sdq_cv.notify_all();
+ }
+ int get_in_flight_unlocked() {
+ return m_in_flight;
+ }
+ ceph::mutex& get_sdq_lock() {
+ return sdq_lock;
+ }
int remote_mkdir(const std::string &epath, const struct ceph_statx &stx);
protected:
ceph::mutex sdq_lock;
ceph::condition_variable sdq_cv;
std::queue<PeerReplayer::SyncEntry> m_sync_dataq;
+ int m_in_flight = 0;
bool m_sync_crawl_finished = false;
};