PeerReplayer *m_peer_replayer;
};
+ class SnapshotDataSyncThreadGuard {
+ public:
+ explicit SnapshotDataSyncThreadGuard(PeerReplayer *peer_replayer)
+ : m_peer_replayer(peer_replayer) {
+ m_peer_replayer->m_active_datasync_threads.fetch_add(1, std::memory_order_relaxed);
+ }
+
+ ~SnapshotDataSyncThreadGuard() {
+ m_peer_replayer->m_active_datasync_threads.fetch_sub(1, std::memory_order_relaxed);
+ }
+
+ SnapshotDataSyncThreadGuard(const SnapshotDataSyncThreadGuard&) = delete;
+ SnapshotDataSyncThreadGuard& operator=(const SnapshotDataSyncThreadGuard&) = delete;
+
+ private:
+ PeerReplayer* m_peer_replayer;
+ };
+
class SnapshotDataSyncThread : public Thread {
public:
SnapshotDataSyncThread(PeerReplayer *peer_replayer)
}
void *entry() override {
+ SnapshotDataSyncThreadGuard guard(m_peer_replayer); //active thread counter
m_peer_replayer->run_datasync(this);
return 0;
}
SnapshotReplayers m_replayers;
SnapshotDataReplayers m_data_replayers;
+ std::atomic<int> m_active_datasync_threads{0};
ceph::mutex smq_lock;
ceph::condition_variable smq_cv;
void remove_syncm(const std::shared_ptr<SyncMechanism>& syncm_obj);
bool is_syncm_active(const std::shared_ptr<SyncMechanism>& syncm_obj);
std::shared_ptr<SyncMechanism> pick_next_syncm() const;
+ int get_active_datasync_threads() const {
+ return m_active_datasync_threads.load(std::memory_order_relaxed);
+ }
boost::optional<std::string> pick_directory();
int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);