]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
tools/cephfs_mirror: Fix assert while opening handles
authorKotresh HR <khiremat@redhat.com>
Sat, 21 Feb 2026 13:51:02 +0000 (19:21 +0530)
committerKotresh HR <khiremat@redhat.com>
Sat, 21 Feb 2026 13:59:26 +0000 (19:29 +0530)
Issue:
When the crawler or a datasync thread encountered an error,
it's possible that the crawler gets notified by a datasync
thread and bails out resulting in the unregister of the
particular dir_root. The other datasync threads might
still hold the same syncm object and tries to open the
handles during which the following assert is hit.

ceph_assert(it != m_registered.end());

Cause:
This happens because the in_flight counter in syncm object
was tracking if it's processing the actual job from the data
queue.

Fix:
Make in_flight counter in syncm object to track the active
syncm object i.e, inrement as soon as the datasync thread
get a reference to it and decrement when it goes out of
reference.

Fixes: https://tracker.ceph.com/issues/73452
Signed-off-by: Kotresh HR <khiremat@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index 04759b1372a27faa632840d56ced11d850745a3f..ca9933c31b840e765c8fbc24033e17f533713720 100644 (file)
@@ -1341,7 +1341,6 @@ bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) {
 
   out_entry = std::move(m_sync_dataq.front());
   m_sync_dataq.pop();
-  m_in_flight++;
   dout(10) << ": snapshot data replayer dataq popped" << " syncm=" << this
           << " epath=" << out_entry.epath << dendl;
   return true;
@@ -1357,7 +1356,7 @@ bool PeerReplayer::SyncMechanism::has_pending_work() const {
    *   - Notify the crawler as it waits after the error for pending jobs to finish.
    */
   if (m_crawl_error) {
-    // If in_flight > 0, those threads will take care of dequeue/notify, you just consume next job
+    // If m_in_flight > 0, those threads will take care of dequeue/notify, you just consume next job
     if (m_in_flight > 0)
       return false;
     else
@@ -1367,6 +1366,7 @@ bool PeerReplayer::SyncMechanism::has_pending_work() const {
   // No more work if datasync failed or everything is done
   if (m_datasync_error || job_done)
     return false;
+
   return true;
 }
 
@@ -2236,10 +2236,11 @@ void PeerReplayer::remove_syncm(const std::shared_ptr<PeerReplayer::SyncMechanis
  * large file, the rest of data sync threads could start consuming the next syncm job
  * instead of being idle waiting for the last file to be synced from present syncm job.
  */
-std::shared_ptr<PeerReplayer::SyncMechanism> PeerReplayer::pick_next_syncm() const {
+std::shared_ptr<PeerReplayer::SyncMechanism> PeerReplayer::pick_next_syncm_and_mark() {
   // caller holds lock
   for (auto& syncm : syncm_q) {
     if (syncm->has_pending_work()) {
+      syncm->inc_in_flight();
       return syncm;
     }
   }
@@ -2271,7 +2272,7 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
     {
       std::unique_lock lock(smq_lock);
       dout(20) << ": snapshot data replayer waiting for syncm to process" << dendl;
-      smq_cv.wait(lock, [this, &syncm] { syncm = pick_next_syncm(); return syncm != nullptr; });
+      smq_cv.wait(lock, [this, &syncm] { syncm = pick_next_syncm_and_mark(); return syncm != nullptr; });
       // syncm is gauranteed to be non-null because of the predicate used in above wait.
       dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl;
     }
@@ -2307,7 +2308,7 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
         if (r < 0) {
           dout(5) << ": should_sync_entry failed, cannot proceed sync: " << cpp_strerror(r)
                   << " dir_root=" << syncm->get_m_dir_root() << " epath=" << entry.epath << dendl;
-         syncm->set_datasync_error_and_dec_in_flight(r);
+          syncm->set_datasync_error(r);
           break;
         }
       }
@@ -2321,13 +2322,11 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
         if (r < 0) {
           dout(5) << ": remote_file_op failed, cannot proceed sync: " << cpp_strerror(r)
                   << " dir_root=" << syncm->get_m_dir_root() << " epath=" << entry.epath << dendl;
-         syncm->set_datasync_error_and_dec_in_flight(r);
+          syncm->set_datasync_error(r);
           break;
         }
       }
       dout(10) << ": done for epath=" << entry.epath << " syncm=" << syncm << dendl;
-
-      syncm->dec_in_flight();
     }
 
     // Close fds
@@ -2340,12 +2339,12 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
     {
       std::unique_lock smq_l1(smq_lock);
       std::unique_lock sdq_l1(syncm->get_sdq_lock());
-      const bool no_in_flight_syncm_jobs = syncm->get_in_flight_unlocked() == 0;
+      const bool last_in_flight_syncm = syncm->get_in_flight_unlocked() == 1;
       const bool crawl_finished = syncm->get_crawl_finished_unlocked();
       const bool sync_error =
         syncm->get_datasync_error_unlocked() ||
         syncm->get_crawl_error_unlocked();
-      if (!syncm_q.empty() && no_in_flight_syncm_jobs && (crawl_finished || sync_error)) {
+      if (!syncm_q.empty() && last_in_flight_syncm && (crawl_finished || sync_error)) {
         if (sync_error && !is_syncm_active(syncm)){
           dout(20) << ": syncm object=" << syncm << " already dequeued" << dendl;
         } else {
@@ -2361,7 +2360,7 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
         }
       }
     }
-
+    syncm->dec_in_flight();
     //lock again to satify m_cond
     locker.lock();
   }
index 38282c2c261e01f5c4ebbd16893b51acebca06af..d6f185fa25b14e8415fc16d020942296d6694db4 100644 (file)
@@ -211,12 +211,6 @@ private:
     bool get_crawl_finished_unlocked() {
       return m_crawl_finished;
     }
-    void set_datasync_error_and_dec_in_flight(int err) {
-      std::unique_lock lock(sdq_lock);
-      m_datasync_error = true;
-      m_datasync_errno = err;
-      --m_in_flight;
-    }
     void set_datasync_error(int err) {
       std::unique_lock lock(sdq_lock);
       m_datasync_error = true;
@@ -236,6 +230,10 @@ private:
     bool get_crawl_error_unlocked() {
       return m_crawl_error;
     }
+    void inc_in_flight() {
+      std::unique_lock lock(sdq_lock);
+      ++m_in_flight;
+    }
     void dec_in_flight() {
       std::unique_lock lock(sdq_lock);
       --m_in_flight;
@@ -490,7 +488,7 @@ private:
   void run_datasync(SnapshotDataSyncThread *data_replayer);
   void remove_syncm(const std::shared_ptr<SyncMechanism>& syncm_obj);
   bool is_syncm_active(const std::shared_ptr<SyncMechanism>& syncm_obj);
-  std::shared_ptr<SyncMechanism> pick_next_syncm() const;
+  std::shared_ptr<SyncMechanism> pick_next_syncm_and_mark();
 
   boost::optional<std::string> pick_directory();
   int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);