From: Kotresh HR Date: Sat, 21 Feb 2026 13:51:02 +0000 (+0530) Subject: tools/cephfs_mirror: Fix assert while opening handles X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=95e9bb8d9a79ff1d58abaf683063837cc379cca1;p=ceph-ci.git tools/cephfs_mirror: Fix assert while opening handles Issue: When the crawler or a datasync thread encountered an error, it's possible that the crawler gets notified by a datasync thread and bails out resulting in the unregister of the particular dir_root. The other datasync threads might still hold the same syncm object and tries to open the handles during which the following assert is hit. ceph_assert(it != m_registered.end()); Cause: This happens because the in_flight counter in syncm object was tracking if it's processing the actual job from the data queue. Fix: Make in_flight counter in syncm object to track the active syncm object i.e, inrement as soon as the datasync thread get a reference to it and decrement when it goes out of reference. Fixes: https://tracker.ceph.com/issues/73452 Signed-off-by: Kotresh HR --- diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index 04759b1372a..ca9933c31b8 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -1341,7 +1341,6 @@ bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) { out_entry = std::move(m_sync_dataq.front()); m_sync_dataq.pop(); - m_in_flight++; dout(10) << ": snapshot data replayer dataq popped" << " syncm=" << this << " epath=" << out_entry.epath << dendl; return true; @@ -1357,7 +1356,7 @@ bool PeerReplayer::SyncMechanism::has_pending_work() const { * - Notify the crawler as it waits after the error for pending jobs to finish. */ if (m_crawl_error) { - // If in_flight > 0, those threads will take care of dequeue/notify, you just consume next job + // If m_in_flight > 0, those threads will take care of dequeue/notify, you just consume next job if (m_in_flight > 0) return false; else @@ -1367,6 +1366,7 @@ bool PeerReplayer::SyncMechanism::has_pending_work() const { // No more work if datasync failed or everything is done if (m_datasync_error || job_done) return false; + return true; } @@ -2236,10 +2236,11 @@ void PeerReplayer::remove_syncm(const std::shared_ptr PeerReplayer::pick_next_syncm() const { +std::shared_ptr PeerReplayer::pick_next_syncm_and_mark() { // caller holds lock for (auto& syncm : syncm_q) { if (syncm->has_pending_work()) { + syncm->inc_in_flight(); return syncm; } } @@ -2271,7 +2272,7 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { { std::unique_lock lock(smq_lock); dout(20) << ": snapshot data replayer waiting for syncm to process" << dendl; - smq_cv.wait(lock, [this, &syncm] { syncm = pick_next_syncm(); return syncm != nullptr; }); + smq_cv.wait(lock, [this, &syncm] { syncm = pick_next_syncm_and_mark(); return syncm != nullptr; }); // syncm is gauranteed to be non-null because of the predicate used in above wait. dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl; } @@ -2307,7 +2308,7 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { if (r < 0) { dout(5) << ": should_sync_entry failed, cannot proceed sync: " << cpp_strerror(r) << " dir_root=" << syncm->get_m_dir_root() << " epath=" << entry.epath << dendl; - syncm->set_datasync_error_and_dec_in_flight(r); + syncm->set_datasync_error(r); break; } } @@ -2321,13 +2322,11 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { if (r < 0) { dout(5) << ": remote_file_op failed, cannot proceed sync: " << cpp_strerror(r) << " dir_root=" << syncm->get_m_dir_root() << " epath=" << entry.epath << dendl; - syncm->set_datasync_error_and_dec_in_flight(r); + syncm->set_datasync_error(r); break; } } dout(10) << ": done for epath=" << entry.epath << " syncm=" << syncm << dendl; - - syncm->dec_in_flight(); } // Close fds @@ -2340,12 +2339,12 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { { std::unique_lock smq_l1(smq_lock); std::unique_lock sdq_l1(syncm->get_sdq_lock()); - const bool no_in_flight_syncm_jobs = syncm->get_in_flight_unlocked() == 0; + const bool last_in_flight_syncm = syncm->get_in_flight_unlocked() == 1; const bool crawl_finished = syncm->get_crawl_finished_unlocked(); const bool sync_error = syncm->get_datasync_error_unlocked() || syncm->get_crawl_error_unlocked(); - if (!syncm_q.empty() && no_in_flight_syncm_jobs && (crawl_finished || sync_error)) { + if (!syncm_q.empty() && last_in_flight_syncm && (crawl_finished || sync_error)) { if (sync_error && !is_syncm_active(syncm)){ dout(20) << ": syncm object=" << syncm << " already dequeued" << dendl; } else { @@ -2361,7 +2360,7 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { } } } - + syncm->dec_in_flight(); //lock again to satify m_cond locker.lock(); } diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index 38282c2c261..d6f185fa25b 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -211,12 +211,6 @@ private: bool get_crawl_finished_unlocked() { return m_crawl_finished; } - void set_datasync_error_and_dec_in_flight(int err) { - std::unique_lock lock(sdq_lock); - m_datasync_error = true; - m_datasync_errno = err; - --m_in_flight; - } void set_datasync_error(int err) { std::unique_lock lock(sdq_lock); m_datasync_error = true; @@ -236,6 +230,10 @@ private: bool get_crawl_error_unlocked() { return m_crawl_error; } + void inc_in_flight() { + std::unique_lock lock(sdq_lock); + ++m_in_flight; + } void dec_in_flight() { std::unique_lock lock(sdq_lock); --m_in_flight; @@ -490,7 +488,7 @@ private: void run_datasync(SnapshotDataSyncThread *data_replayer); void remove_syncm(const std::shared_ptr& syncm_obj); bool is_syncm_active(const std::shared_ptr& syncm_obj); - std::shared_ptr pick_next_syncm() const; + std::shared_ptr pick_next_syncm_and_mark(); boost::optional pick_directory(); int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);