m_fh(fh),
m_peer(peer),
m_current(current),
- m_prev(prev) {
+ m_prev(prev),
+ sdq_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::SyncMechanism" + stringify(peer.uuid))) {
}
PeerReplayer::SyncMechanism::~SyncMechanism() {
}
+void PeerReplayer::SyncMechanism::push_dataq_entry(SyncEntry e) {
+ dout(10) << ": snapshot data replayer dataq pushed" << " syncm=" << this
+ << " epath=" << e.epath << dendl;
+ std::unique_lock lock(sdq_lock);
+ m_sync_dataq.push(std::move(e));
+ sdq_cv.notify_all();
+}
+
+bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) {
+ std::unique_lock lock(sdq_lock);
+ dout(20) << ": snapshot data replayer waiting on m_sync_dataq, syncm=" << this << dendl;
+ sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_crawl_finished;});
+ dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this << dendl;
+ if (m_sync_dataq.empty() && m_crawl_finished)
+ return false; // no more work
+
+ out_entry = std::move(m_sync_dataq.front());
+ m_sync_dataq.pop();
+ dout(10) << ": snapshot data replayer dataq popped" << " syncm=" << this
+ << " epath=" << out_entry.epath << dendl;
+ return true;
+}
+
+void PeerReplayer::SyncMechanism::mark_crawl_finished() {
+ std::unique_lock lock(sdq_lock);
+ m_crawl_finished = true;
+ sdq_cv.notify_all();
+}
+
int PeerReplayer::SyncMechanism::get_changed_blocks(const std::string &epath,
const struct ceph_statx &stx, bool sync_check,
const std::function<int (uint64_t, struct cblock *)> &callback) {
// TODO pre_sync and open handles
- // TODO Wait and fetch files from syncm data queue and sync
+ // Wait on data sync queue for entries to process
+ SyncEntry entry;
+ while (syncm->pop_dataq_entry(entry)) {
+ //TODO Process entry
+ }
// Dequeue syncm object after processing
{
virtual void finish_sync() = 0;
+ void push_dataq_entry(PeerReplayer::SyncEntry e);
+ bool pop_dataq_entry(PeerReplayer::SyncEntry &out);
+ void mark_crawl_finished();
+
protected:
MountRef m_local;
MountRef m_remote;
Snapshot m_current;
boost::optional<Snapshot> m_prev;
std::stack<PeerReplayer::SyncEntry> m_sync_stack;
+
+ ceph::mutex sdq_lock;
+ ceph::condition_variable sdq_cv;
+ std::queue<PeerReplayer::SyncEntry> m_sync_dataq;
+ bool m_crawl_finished = false;
};
class RemoteSync : public SyncMechanism {