bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) {
std::unique_lock lock(sdq_lock);
dout(20) << ": snapshot data replayer waiting on m_sync_dataq, syncm=" << this << dendl;
- sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_crawl_finished || m_datasync_error;});
+ // Also wake on a crawl error so replayer threads are not left blocked after a failed crawl.
+ sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_crawl_finished || m_datasync_error || m_crawl_error;});
dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this
<< " crawl_finished=" << m_crawl_finished << dendl;
- if (m_datasync_error) {
+ // Treat a crawl error exactly like a data-sync error: stop handing out entries.
+ if (m_datasync_error || m_crawl_error) {
dout(20) << ": snapshot data replayer, datasync_error="<< m_datasync_error
- << " syncm=" << this << dendl;
+ << " crawl_error=" << m_crawl_error << " syncm=" << this << dendl;
return false;
}
if (m_sync_dataq.empty() && m_crawl_finished) {
+ // NOTE(review): the unique_lock below re-acquires sdq_lock; if this code were in the
+ // same scope as the lock taken at the top of pop_dataq_entry() it would self-deadlock.
+ // It looks like a separate hunk/function boundary in the full file -- please confirm.
std::unique_lock lock(sdq_lock);
const bool job_done =
m_sync_dataq.empty() && m_crawl_finished;
+
+ /* On crawl error, return true even if the queue is empty to
+ * - Dequeue the syncm object
+ * - Notify the crawler as it waits after the error for pending jobs to finish.
+ */
+ if (m_crawl_error) {
+ // If in_flight > 0 the still-running threads will perform the dequeue/notify when
+ // they finish, so this thread can simply stop (return false). Only the last thread
+ // (in_flight == 0) returns true so its caller runs the dequeue/notify path.
+ if (m_in_flight > 0)
+ return false;
+ else
+ return true;
+ }
+
// No more work if datasync failed or everything is done
if (m_datasync_error || job_done)
return false;
return true;
}
-void PeerReplayer::SyncMechanism::mark_crawl_finished() {
+// Mark the crawl as complete. ret is the crawl result: a negative errno additionally
+// flags the crawl as failed (m_crawl_error) so threads waiting on sdq_cv bail out
+// instead of expecting more queue entries.
+void PeerReplayer::SyncMechanism::mark_crawl_finished(int ret) {
std::unique_lock lock(sdq_lock);
m_crawl_finished = true;
+ if (ret < 0)
+ m_crawl_error = true;
sdq_cv.notify_all();
}
return r;
}
-void PeerReplayer::SnapDiffSync::finish_crawl() {
+// ret: crawl result (negative errno on failure); forwarded to mark_crawl_finished()
+// so a failed crawl also raises m_crawl_error for the data replayer threads.
+void PeerReplayer::SnapDiffSync::finish_crawl(int ret) {
dout(20) << dendl;
while (!m_sync_stack.empty()) {
}
// Crawl and entry operations are done syncing here. So mark crawl finished here
- mark_crawl_finished();
+ mark_crawl_finished(ret);
}
PeerReplayer::RemoteSync::RemoteSync(std::string_view dir_root,
return 0;
}
-void PeerReplayer::RemoteSync::finish_crawl() {
+// ret: crawl result (negative errno on failure); forwarded to mark_crawl_finished()
+// so a failed crawl also raises m_crawl_error for the data replayer threads.
+void PeerReplayer::RemoteSync::finish_crawl(int ret) {
dout(20) << dendl;
while (!m_sync_stack.empty()) {
}
// Crawl and entry operations are done syncing here. So mark stack finished here
- mark_crawl_finished();
+ mark_crawl_finished(ret);
}
int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &current,
}
}
- syncm->finish_crawl();
+ // Propagate the crawl result: a negative r raises m_crawl_error and unblocks the
+ // data replayer threads waiting in pop_dataq_entry().
+ syncm->finish_crawl(r);
dout(20) << " cur:" << fh.c_fd
<< " prev:" << fh.p_fd
dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl;
}
- // FHandles are not thread safe, so don't use FHandles from SyncMechanism, open them locally here.
+ /* - FHandles are not thread safe, so don't use FHandles from SyncMechanism, open them locally here.
+ * - On crawl error, don't open handles as other threads could have dequeued syncm and notified
+ * crawler resulting in unregister of dir_root.
+ */
int r = 0;
FHandles fh;
- bool handles_opened = true;
- r = pre_sync_check_and_open_handles(std::string(syncm->get_m_dir_root()),
- syncm->get_m_current(), syncm->get_m_prev(), &fh);
- if (r < 0) {
- dout(5) << ": open_handles failed, cannot proceed sync: " << cpp_strerror(r)
- << " dir_root=" << syncm->get_m_dir_root() << "syncm=" << syncm << dendl;
- syncm->set_datasync_error(r); //don't do continue, as it needs to go through dequeue/notify logic
- handles_opened = false;
+ bool handles_opened = false;
+ // NOTE(review): crawl_error is sampled once here (under sdq_lock inside the getter);
+ // a crawl failure landing just after this check is presumably caught by the
+ // *_unlocked checks further down -- confirm the ordering in the full file.
+ const bool crawl_error = syncm->get_crawl_error();
+ if (!crawl_error) {
+ r = pre_sync_check_and_open_handles(std::string(syncm->get_m_dir_root()),
+ syncm->get_m_current(), syncm->get_m_prev(), &fh);
+ if (r < 0) {
+ dout(5) << ": open_handles failed, cannot proceed sync: " << cpp_strerror(r)
+ << " dir_root=" << syncm->get_m_dir_root() << "syncm=" << syncm << dendl;
+ syncm->set_datasync_error(r); //don't do continue, as it needs to go through dequeue/notify logic
+ } else {
+ handles_opened = true;
+ }
}
// Wait on data sync queue for entries to process
const bool no_in_flight_syncm_jobs = syncm->get_in_flight_unlocked() == 0;
const bool crawl_finished = syncm->get_crawl_finished_unlocked();
const bool sync_error =
- syncm->get_datasync_error_unlocked();
+ // A crawl error counts as a sync error so the syncm object is still dequeued and
+ // the crawler (waiting for pending jobs after the error) gets notified.
+ syncm->get_datasync_error_unlocked() ||
+ syncm->get_crawl_error_unlocked();
if (!syncm_q.empty() && no_in_flight_syncm_jobs && (crawl_finished || sync_error)) {
dout(20) << ": Dequeue syncm object=" << syncm << dendl;
syncm->set_sync_finished_and_notify_unlocked(); // To wake up crawler thread waiting to take snapshot
const struct ceph_statx &stx, bool sync_check,
const std::function<int (uint64_t, struct cblock *)> &callback);
- virtual void finish_crawl() = 0;
+ // ret: crawl result (negative errno on failure), forwarded to mark_crawl_finished().
+ virtual void finish_crawl(int ret) = 0;
void push_dataq_entry(PeerReplayer::SyncEntry e);
bool pop_dataq_entry(PeerReplayer::SyncEntry &out);
bool has_pending_work() const;
- void mark_crawl_finished();
+ void mark_crawl_finished(int ret);
bool get_crawl_finished_unlocked() {
return m_crawl_finished;
}
std::unique_lock lock(sdq_lock);
return m_datasync_errno;
}
+ // Returns whether the crawl ended with an error; takes sdq_lock.
+ bool get_crawl_error() {
+ std::unique_lock lock(sdq_lock);
+ return m_crawl_error;
+ }
+ // Same as get_crawl_error(); caller must already hold sdq_lock.
+ bool get_crawl_error_unlocked() {
+ return m_crawl_error;
+ }
void dec_in_flight() {
std::unique_lock lock(sdq_lock);
--m_in_flight;
std::queue<PeerReplayer::SyncEntry> m_sync_dataq;
int m_in_flight = 0;
bool m_crawl_finished = false;
+ bool m_crawl_error = false; // set by mark_crawl_finished(ret < 0); guarded by sdq_lock
bool m_sync_done = false;
bool m_datasync_error = false;
int m_datasync_errno = 0;
const std::function<int (const std::string&)> &dirsync_func,
const std::function<int (const std::string&)> &purge_func);
- void finish_crawl();
+ // NOTE(review): consider declaring this 'override' since the base declares finish_crawl(int) virtual.
+ void finish_crawl(int ret);
};
class SnapDiffSync : public SyncMechanism {
const struct ceph_statx &stx, bool sync_check,
const std::function<int (uint64_t, struct cblock *)> &callback);
- void finish_crawl();
+ // NOTE(review): consider declaring this 'override' since the base declares finish_crawl(int) virtual.
+ void finish_crawl(int ret);
private:
int init_directory(const std::string &epath,