m_replayers.push_back(std::move(replayer));
}
+ //TODO: Have a separate tuneable for data sync threads
+ nr_replayers = g_ceph_context->_conf.get_val<uint64_t>(
+ "cephfs_mirror_max_concurrent_directory_syncs");
+ dout(20) << ": spawning " << nr_replayers << " snapshot data replayer(s)" << dendl;
+ while (nr_replayers-- > 0) {
+ std::unique_ptr<SnapshotDataSyncThread> data_replayer(
+ new SnapshotDataSyncThread(this));
+ std::string name("d_replayer-" + stringify(nr_replayers));
+ data_replayer->create(name.c_str());
+ m_data_replayers.push_back(std::move(data_replayer));
+ }
return 0;
}
}
}
+void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
+ dout(10) << ": snapshot datasync replayer=" << data_replayer << dendl;
+
+ // TODO Do we need separate m_lock/m_cond for synchornization or can you use the same?
+
+ while (true) {
+ // TODO is_stopping and is_blocklisted
+
+ // TODO Wait and fetch syncm from SyncMechanism Queue
+
+ // TODO pre_sync and open handles
+
+ // TODO Wait and fetch files from syncm data queue and sync
+ }
+}
+
void PeerReplayer::peer_status(Formatter *f) {
std::scoped_lock locker(m_lock);
f->open_object_section("stats");
PeerReplayer *m_peer_replayer;
};
+ class SnapshotDataSyncThread : public Thread {
+ public:
+ SnapshotDataSyncThread(PeerReplayer *peer_replayer)
+ : m_peer_replayer(peer_replayer) {
+ }
+
+ void *entry() override {
+ m_peer_replayer->run_datasync(this);
+ return 0;
+ }
+
+ private:
+ PeerReplayer *m_peer_replayer;
+ };
+
struct DirRegistry {
int fd;
bool canceled = false;
}
typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers;
+ typedef std::vector<std::unique_ptr<SnapshotDataSyncThread>> SnapshotDataReplayers;
CephContext *m_cct;
FSMirror *m_fs_mirror;
bool m_stopping = false;
SnapshotReplayers m_replayers;
+ SnapshotDataReplayers m_data_replayers;
+
ServiceDaemonStats m_service_daemon_stats;
PerfCounters *m_perf_counters;
void run(SnapshotReplayerThread *replayer);
+ void run_datasync(SnapshotDataSyncThread *data_replayer);
boost::optional<std::string> pick_directory();
int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);