bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) {
std::unique_lock lock(sdq_lock);
dout(20) << ": snapshot data replayer waiting on m_sync_dataq, syncm=" << this << dendl;
- sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_crawl_finished;});
+ sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_crawl_finished || m_datasync_error;});
dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this
<< " crawl_finished=" << m_crawl_finished << dendl;
+ if (m_datasync_error) {
+ dout(20) << ": snapshot data replayer, datasync_error="<< m_datasync_error
+ << " syncm=" << this << dendl;
+ return false;
+ }
if (m_sync_dataq.empty() && m_crawl_finished) {
dout(20) << ": snapshot data replayer - finished processing syncm=" << this
<< " Proceed with next syncm job " << dendl;
bool PeerReplayer::SyncMechanism::has_pending_work() const {
std::unique_lock lock(sdq_lock);
- if (m_sync_dataq.empty() && m_crawl_finished)
+ const bool job_done =
+ m_sync_dataq.empty() && m_crawl_finished; // data queue is empty and the crawl is marked finished
+ // Report no pending work once a datasync error is flagged, or once the job is done
+ if (m_datasync_error || job_done)
return false;
return true;
}
sdq_cv.notify_all();
}
-void PeerReplayer::SyncMechanism::wait_for_sync() {
+// Blocks until sync is done or a datasync error is flagged; returns true on error, false otherwise
+bool PeerReplayer::SyncMechanism::wait_for_sync() {
std::unique_lock lock(sdq_lock);
dout(20) << ": Waiting for data sync to be done to take snapshot - dir_root=" << m_dir_root
<< " current=" << m_current << " prev=" << (m_prev ? stringify(m_prev) : "")
<< " syncm=" << this << dendl;
- sdq_cv.wait(lock, [this]{return m_sync_done;});
+ sdq_cv.wait(lock, [this]{return m_sync_done || m_datasync_error;}); // wake on completion or on a flagged datasync error
dout(20) << ": Woke up to take snapshot - dir_root=" << m_dir_root
<< " current=" << m_current << " prev=" << (m_prev ? stringify(m_prev) : "")
<< " syncm=" << this << dendl;
m_sync_done = false;
+ return m_datasync_error; // true means data sync failed
}
int PeerReplayer::SyncMechanism::get_changed_blocks(const std::string &epath,
// there is no need to close this fd manually.
ceph_close(fh.p_mnt, fh.p_fd);
- if (r == 0 ) { //Bail out without taking snap if r < 0
- syncm->wait_for_sync();
+ // Wait for datasync threads to finish syncing
+ bool datasync_err = syncm->wait_for_sync();
+ if (r == 0 && !datasync_err) {
// All good, take the snapshot
auto cur_snap_id_str{stringify(current.second)};
snap_metadata snap_meta[] = {{PRIMARY_SNAP_ID_KEY.c_str(), cur_snap_id_str.c_str()}};
derr << ": failed to snap remote directory dir_root=" << dir_root
<< ": " << cpp_strerror(r) << dendl;
}
+ } else if (datasync_err) {
+ r = syncm->get_datasync_errno();
+ derr << ": Datasync thread failed, bailing out without taking snap. dir_root="
+ << dir_root << ": " << cpp_strerror(r) << dendl;
+ } else {
+ derr << ": Crawler thread failed, bailing out without taking snap. dir_root="
+ << dir_root << ": " << cpp_strerror(r) << dendl;
}
return r;
// FHandles are not thread safe, so don't use FHandles from SyncMechanism, open them locally here.
int r = 0;
FHandles fh;
+ bool handles_opened = true;
r = pre_sync_check_and_open_handles(std::string(syncm->get_m_dir_root()),
syncm->get_m_current(), syncm->get_m_prev(), &fh);
if (r < 0) {
- //TODO - Handle this failure in better way ?
dout(5) << ": open_handles failed, cannot proceed sync: " << cpp_strerror(r)
<< " dir_root=" << syncm->get_m_dir_root() << "syncm=" << syncm << dendl;
- break;
+ syncm->set_datasync_error(r); //don't do continue, as it needs to go through dequeue/notify logic
+ handles_opened = false;
}
// Wait on data sync queue for entries to process
r = should_sync_entry(entry.epath, entry.stx, fh,
&need_data_sync, &need_attr_sync);
if (r < 0) {
- //TODO Handle bail out with taking remote snap
+ dout(5) << ": should_sync_entry failed, cannot proceed sync: " << cpp_strerror(r)
+ << " dir_root=" << syncm->get_m_dir_root() << " epath=" << entry.epath << dendl;
+ syncm->set_datasync_error_and_dec_in_flight(r);
+ break;
}
}
entry.epath, entry.stx, entry.sync_check, fh,
need_data_sync, need_attr_sync);
if (r < 0) {
- //TODO Handle bail out with taking remote snap
+ dout(5) << ": remote_file_op failed, cannot proceed sync: " << cpp_strerror(r)
+ << " dir_root=" << syncm->get_m_dir_root() << " epath=" << entry.epath << dendl;
+ syncm->set_datasync_error_and_dec_in_flight(r);
+ break;
}
}
dout(10) << ": done for epath=" << entry.epath << " syncm=" << syncm << dendl;
}
// Close fds
- ceph_close(m_local_mount, fh.c_fd);
- ceph_close(fh.p_mnt, fh.p_fd);
+ if (handles_opened) {
+ ceph_close(m_local_mount, fh.c_fd);
+ ceph_close(fh.p_mnt, fh.p_fd);
+ }
// Dequeue syncm object after processing
{
std::unique_lock smq_l1(smq_lock);
std::unique_lock sdq_l1(syncm->get_sdq_lock());
- if (!syncm_q.empty() &&
- syncm->get_in_flight_unlocked() == 0 &&
- syncm->get_crawl_finished_unlocked() == true) {
+ const bool no_in_flight_syncm_jobs = syncm->get_in_flight_unlocked() == 0;
+ const bool crawl_finished = syncm->get_crawl_finished_unlocked();
+ const bool sync_error =
+ syncm->get_datasync_error_unlocked();
+ if (!syncm_q.empty() && no_in_flight_syncm_jobs && (crawl_finished || sync_error)) {
dout(20) << ": Dequeue syncm object=" << syncm << dendl;
syncm->set_sync_finished_and_notify_unlocked(); // To wake up crawler thread waiting to take snapshot
if (syncm_q.front() == syncm) {