]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
tools/cephfs_mirror: Handle errors in crawler thread
authorKotresh HR <khiremat@redhat.com>
Sat, 21 Feb 2026 10:27:42 +0000 (15:57 +0530)
committerKotresh HR <khiremat@redhat.com>
Sun, 22 Feb 2026 18:56:35 +0000 (00:26 +0530)
Any error encountered in crawler threads should be
communicated to the data sync threads by marking the
crawl error in the corresponding syncm object. The
data sync threads would finish pending jobs, dequeue
the syncm object and notify crawler to bail out.

Fixes: https://tracker.ceph.com/issues/73452
Signed-off-by: Kotresh HR <khiremat@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index af11c281a2659250d1edb390d45e5a2231e9f361..5fc8fe4cefb33478a98a6eea63591dd35f5eb1f2 100644 (file)
@@ -1325,12 +1325,12 @@ void PeerReplayer::SyncMechanism::push_dataq_entry(SyncEntry e) {
 bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) {
   std::unique_lock lock(sdq_lock);
   dout(20) << ": snapshot data replayer waiting on m_sync_dataq, syncm=" << this << dendl;
-  sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_crawl_finished || m_datasync_error;});
+  sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_crawl_finished || m_datasync_error || m_crawl_error;});
   dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this
            << " crawl_finished=" << m_crawl_finished << dendl;
-  if (m_datasync_error) {
+  if (m_datasync_error || m_crawl_error) {
      dout(20) << ": snapshot data replayer, datasync_error="<< m_datasync_error
-              << " syncm=" << this << dendl;
+              << " crawl_error=" << m_crawl_error << " syncm=" << this << dendl;
      return false;
   }
   if (m_sync_dataq.empty() && m_crawl_finished) {
@@ -1351,15 +1351,30 @@ bool PeerReplayer::SyncMechanism::has_pending_work() const {
   std::unique_lock lock(sdq_lock);
   const bool job_done =
     m_sync_dataq.empty() && m_crawl_finished;
+
+  /* On crawl error, return true even if the queue is empty to
+   *   - Dequeue the syncm object
+   *   - Notify the crawler as it waits after the error for pending jobs to finish.
+   */
+  if (m_crawl_error) {
+    // If in_flight > 0, those threads will take care of dequeue/notify, you just consume next job
+    if (m_in_flight > 0)
+      return false;
+    else
+      return true;
+  }
+
   // No more work if datasync failed or everything is done
   if (m_datasync_error || job_done)
     return false;
   return true;
 }
 
-void PeerReplayer::SyncMechanism::mark_crawl_finished() {
+void PeerReplayer::SyncMechanism::mark_crawl_finished(int ret) {
   std::unique_lock lock(sdq_lock);
   m_crawl_finished = true;
+  if (ret < 0)
+    m_crawl_error = true;
   sdq_cv.notify_all();
 }
 
@@ -1685,7 +1700,7 @@ int PeerReplayer::SnapDiffSync::get_changed_blocks(const std::string &epath,
   return r;
 }
 
-void PeerReplayer::SnapDiffSync::finish_crawl() {
+void PeerReplayer::SnapDiffSync::finish_crawl(int ret) {
   dout(20) << dendl;
 
   while (!m_sync_stack.empty()) {
@@ -1701,7 +1716,7 @@ void PeerReplayer::SnapDiffSync::finish_crawl() {
   }
 
   // Crawl and entry operations are done syncing here. So mark crawl finished here
-  mark_crawl_finished();
+  mark_crawl_finished(ret);
 }
 
 PeerReplayer::RemoteSync::RemoteSync(std::string_view dir_root,
@@ -1838,7 +1853,7 @@ int PeerReplayer::RemoteSync::get_entry(std::string *epath, struct ceph_statx *s
   return 0;
 }
 
-void PeerReplayer::RemoteSync::finish_crawl() {
+void PeerReplayer::RemoteSync::finish_crawl(int ret) {
   dout(20) << dendl;
 
   while (!m_sync_stack.empty()) {
@@ -1854,7 +1869,7 @@ void PeerReplayer::RemoteSync::finish_crawl() {
   }
 
   // Crawl and entry operations are done syncing here. So mark stack finished here
-  mark_crawl_finished();
+  mark_crawl_finished(ret);
 }
 
 int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &current,
@@ -1930,7 +1945,7 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu
     }
   }
 
-  syncm->finish_crawl();
+  syncm->finish_crawl(r);
 
   dout(20) << " cur:" << fh.c_fd
            << " prev:" << fh.p_fd
@@ -2257,17 +2272,24 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
       dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl;
     }
 
-    // FHandles are not thread safe, so don't use FHandles from SyncMechanism, open them locally here.
+    /*  - FHandles are not thread safe, so don't use FHandles from SyncMechanism, open them locally here.
+     *  - On crawl error, don't open handles as other threads could have dequeued syncm and notified
+     *    crawler resulting in unregister of dir_root.
+     */
     int r = 0;
     FHandles fh;
-    bool handles_opened = true;
-    r = pre_sync_check_and_open_handles(std::string(syncm->get_m_dir_root()),
-                                       syncm->get_m_current(), syncm->get_m_prev(), &fh);
-    if (r < 0) {
-      dout(5) << ": open_handles failed, cannot proceed sync: " << cpp_strerror(r)
-              << " dir_root=" << syncm->get_m_dir_root() << "syncm=" << syncm << dendl;
-      syncm->set_datasync_error(r); //don't do continue, as it needs to go through dequeue/notify logic
-      handles_opened = false;
+    bool handles_opened = false;
+    const bool crawl_error = syncm->get_crawl_error();
+    if (!crawl_error) {
+      r = pre_sync_check_and_open_handles(std::string(syncm->get_m_dir_root()),
+                                          syncm->get_m_current(), syncm->get_m_prev(), &fh);
+      if (r < 0) {
+        dout(5) << ": open_handles failed, cannot proceed sync: " << cpp_strerror(r)
+                << " dir_root=" << syncm->get_m_dir_root() << "syncm=" << syncm << dendl;
+        syncm->set_datasync_error(r); //don't do continue, as it needs to go through dequeue/notify logic
+      } else {
+        handles_opened = true;
+      }
     }
 
     // Wait on data sync queue for entries to process
@@ -2317,7 +2339,8 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
       const bool no_in_flight_syncm_jobs = syncm->get_in_flight_unlocked() == 0;
       const bool crawl_finished = syncm->get_crawl_finished_unlocked();
       const bool sync_error =
-        syncm->get_datasync_error_unlocked();
+        syncm->get_datasync_error_unlocked() ||
+        syncm->get_crawl_error_unlocked();
       if (!syncm_q.empty() && no_in_flight_syncm_jobs && (crawl_finished || sync_error)) {
         dout(20) << ": Dequeue syncm object=" << syncm << dendl;
         syncm->set_sync_finished_and_notify_unlocked(); // To wake up crawler thread waiting to take snapshot
index 681638096339845f7d899e615b94198ec1ad9a72..f60dc06d647ec5577ad1df6f8e5fd669a929e8a1 100644 (file)
@@ -202,12 +202,12 @@ private:
                                    const struct ceph_statx &stx, bool sync_check,
                                    const std::function<int (uint64_t, struct cblock *)> &callback);
 
-    virtual void finish_crawl() = 0;
+    virtual void finish_crawl(int ret) = 0;
 
     void push_dataq_entry(PeerReplayer::SyncEntry e);
     bool pop_dataq_entry(PeerReplayer::SyncEntry &out);
     bool has_pending_work() const;
-    void mark_crawl_finished();
+    void mark_crawl_finished(int ret);
     bool get_crawl_finished_unlocked() {
       return m_crawl_finished;
     }
@@ -229,6 +229,13 @@ private:
       std::unique_lock lock(sdq_lock);
       return m_datasync_errno;
     }
+    bool get_crawl_error() {
+      std::unique_lock lock(sdq_lock);
+      return m_crawl_error;
+    }
+    bool get_crawl_error_unlocked() {
+      return m_crawl_error;
+    }
     void dec_in_flight() {
       std::unique_lock lock(sdq_lock);
       --m_in_flight;
@@ -272,6 +279,7 @@ private:
     std::queue<PeerReplayer::SyncEntry> m_sync_dataq;
     int m_in_flight = 0;
     bool m_crawl_finished = false;
+    bool m_crawl_error = false;
     bool m_sync_done = false;
     bool m_datasync_error = false;
     int m_datasync_errno = 0;
@@ -293,7 +301,7 @@ private:
                   const std::function<int (const std::string&)> &dirsync_func,
                   const std::function<int (const std::string&)> &purge_func);
 
-    void finish_crawl();
+    void finish_crawl(int ret);
   };
 
   class SnapDiffSync : public SyncMechanism {
@@ -313,7 +321,7 @@ private:
                            const struct ceph_statx &stx, bool sync_check,
                            const std::function<int (uint64_t, struct cblock *)> &callback);
 
-    void finish_crawl();
+    void finish_crawl(int ret);
 
   private:
     int init_directory(const std::string &epath,