From: Kotresh HR Date: Sat, 21 Feb 2026 10:27:42 +0000 (+0530) Subject: tools/cephfs_mirror: Handle errors in crawler thread X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b1c5ec04a1268f52f4de419ab0cb2dc67300fd53;p=ceph.git tools/cephfs_mirror: Handle errors in crawler thread Any error encountered in crawler threads should be communicated to the data sync threads by marking the crawl error in the corresponding syncm object. The data sync threads would finish pending jobs, dequeue the syncm object and notify crawler to bail out. Fixes: https://tracker.ceph.com/issues/73452 Signed-off-by: Kotresh HR --- diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index af11c281a26..5fc8fe4cefb 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -1325,12 +1325,12 @@ void PeerReplayer::SyncMechanism::push_dataq_entry(SyncEntry e) { bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) { std::unique_lock lock(sdq_lock); dout(20) << ": snapshot data replayer waiting on m_sync_dataq, syncm=" << this << dendl; - sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_crawl_finished || m_datasync_error;}); + sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_crawl_finished || m_datasync_error || m_crawl_error;}); dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this << " crawl_finished=" << m_crawl_finished << dendl; - if (m_datasync_error) { + if (m_datasync_error || m_crawl_error) { dout(20) << ": snapshot data replayer, datasync_error="<< m_datasync_error - << " syncm=" << this << dendl; + << " crawl_error=" << m_crawl_error << " syncm=" << this << dendl; return false; } if (m_sync_dataq.empty() && m_crawl_finished) { @@ -1351,15 +1351,30 @@ bool PeerReplayer::SyncMechanism::has_pending_work() const { std::unique_lock lock(sdq_lock); const bool job_done = m_sync_dataq.empty() && m_crawl_finished; + + /* On crawl error, return true even if the queue is empty to + * - Dequeue the syncm object + * - Notify the crawler as it waits after the error for pending jobs to finish. + */ + if (m_crawl_error) { + // If in_flight > 0, those threads will take care of dequeue/notify, you just consume next job + if (m_in_flight > 0) + return false; + else + return true; + } + // No more work if datasync failed or everything is done if (m_datasync_error || job_done) return false; return true; } -void PeerReplayer::SyncMechanism::mark_crawl_finished() { +void PeerReplayer::SyncMechanism::mark_crawl_finished(int ret) { std::unique_lock lock(sdq_lock); m_crawl_finished = true; + if (ret < 0) + m_crawl_error = true; sdq_cv.notify_all(); } @@ -1685,7 +1700,7 @@ int PeerReplayer::SnapDiffSync::get_changed_blocks(const std::string &epath, return r; } -void PeerReplayer::SnapDiffSync::finish_crawl() { +void PeerReplayer::SnapDiffSync::finish_crawl(int ret) { dout(20) << dendl; while (!m_sync_stack.empty()) { @@ -1701,7 +1716,7 @@ void PeerReplayer::SnapDiffSync::finish_crawl() { } // Crawl and entry operations are done syncing here. So mark crawl finished here - mark_crawl_finished(); + mark_crawl_finished(ret); } PeerReplayer::RemoteSync::RemoteSync(std::string_view dir_root, @@ -1838,7 +1853,7 @@ int PeerReplayer::RemoteSync::get_entry(std::string *epath, struct ceph_statx *s return 0; } -void PeerReplayer::RemoteSync::finish_crawl() { +void PeerReplayer::RemoteSync::finish_crawl(int ret) { dout(20) << dendl; while (!m_sync_stack.empty()) { @@ -1854,7 +1869,7 @@ void PeerReplayer::RemoteSync::finish_crawl() { } // Crawl and entry operations are done syncing here. So mark stack finished here - mark_crawl_finished(); + mark_crawl_finished(ret); } int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot ¤t, @@ -1930,7 +1945,7 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu } } - syncm->finish_crawl(); + syncm->finish_crawl(r); dout(20) << " cur:" << fh.c_fd << " prev:" << fh.p_fd @@ -2257,17 +2272,24 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl; } - // FHandles are not thread safe, so don't use FHandles from SyncMechanism, open them locally here. + /* - FHandles are not thread safe, so don't use FHandles from SyncMechanism, open them locally here. + * - On crawl error, don't open handles as other threads could have dequeued syncm and notified + * crawler resulting in unregister of dir_root. + */ int r = 0; FHandles fh; - bool handles_opened = true; - r = pre_sync_check_and_open_handles(std::string(syncm->get_m_dir_root()), - syncm->get_m_current(), syncm->get_m_prev(), &fh); - if (r < 0) { - dout(5) << ": open_handles failed, cannot proceed sync: " << cpp_strerror(r) - << " dir_root=" << syncm->get_m_dir_root() << "syncm=" << syncm << dendl; - syncm->set_datasync_error(r); //don't do continue, as it needs to go through dequeue/notify logic - handles_opened = false; + bool handles_opened = false; + const bool crawl_error = syncm->get_crawl_error(); + if (!crawl_error) { + r = pre_sync_check_and_open_handles(std::string(syncm->get_m_dir_root()), + syncm->get_m_current(), syncm->get_m_prev(), &fh); + if (r < 0) { + dout(5) << ": open_handles failed, cannot proceed sync: " << cpp_strerror(r) + << " dir_root=" << syncm->get_m_dir_root() << "syncm=" << syncm << dendl; + syncm->set_datasync_error(r); //don't do continue, as it needs to go through dequeue/notify logic + } else { + handles_opened = true; + } } // Wait on data sync queue for entries to process @@ -2317,7 +2339,8 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { const bool no_in_flight_syncm_jobs = syncm->get_in_flight_unlocked() == 0; const bool crawl_finished = syncm->get_crawl_finished_unlocked(); const bool sync_error = - syncm->get_datasync_error_unlocked(); + syncm->get_datasync_error_unlocked() || + syncm->get_crawl_error_unlocked(); if (!syncm_q.empty() && no_in_flight_syncm_jobs && (crawl_finished || sync_error)) { dout(20) << ": Dequeue syncm object=" << syncm << dendl; syncm->set_sync_finished_and_notify_unlocked(); // To wake up crawler thread waiting to take snapshot diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index 68163809633..f60dc06d647 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -202,12 +202,12 @@ private: const struct ceph_statx &stx, bool sync_check, const std::function &callback); - virtual void finish_crawl() = 0; + virtual void finish_crawl(int ret) = 0; void push_dataq_entry(PeerReplayer::SyncEntry e); bool pop_dataq_entry(PeerReplayer::SyncEntry &out); bool has_pending_work() const; - void mark_crawl_finished(); + void mark_crawl_finished(int ret); bool get_crawl_finished_unlocked() { return m_crawl_finished; } @@ -229,6 +229,13 @@ private: std::unique_lock lock(sdq_lock); return m_datasync_errno; } + bool get_crawl_error() { + std::unique_lock lock(sdq_lock); + return m_crawl_error; + } + bool get_crawl_error_unlocked() { + return m_crawl_error; + } void dec_in_flight() { std::unique_lock lock(sdq_lock); --m_in_flight; @@ -272,6 +279,7 @@ private: std::queue m_sync_dataq; int m_in_flight = 0; bool m_crawl_finished = false; + bool m_crawl_error = false; bool m_sync_done = false; bool m_datasync_error = false; int m_datasync_errno = 0; @@ -293,7 +301,7 @@ private: const std::function &dirsync_func, const std::function &purge_func); - void finish_crawl(); + void finish_crawl(int ret); }; class SnapDiffSync : public SyncMechanism { @@ -313,7 +321,7 @@ private: const struct ceph_statx &stx, bool sync_check, const std::function &callback); - void finish_crawl(); + void finish_crawl(int ret); private: int init_directory(const std::string &epath,