}
}
+/**
+ * Invoked on the flush after each entry submitted
+ */
+class C_MDL_Flushed : public MDSIOContextBase {
+ protected:
+ MDLog *mdlog;
+ MDS *get_mds() {return mdlog->mds;}
+ uint64_t flushed_to;
+ MDSInternalContextBase *wrapped;
+
+ void finish(int r) {
+ if (wrapped) {
+ wrapped->complete(r);
+ }
+
+ mdlog->submit_mutex.Lock();
+ assert(mdlog->safe_pos <= flushed_to);
+ mdlog->safe_pos = flushed_to;
+ mdlog->submit_mutex.Unlock();
+ }
+
+ public:
+ C_MDL_Flushed(MDLog *m, uint64_t ft, MDSInternalContextBase *w)
+ : mdlog(m), flushed_to(ft), wrapped(w) {}
+};
+
void MDLog::_submit_thread()
{
dout(10) << "_submit_thread start" << dendl;
<< " : " << *le << dendl;
// journal it.
- journaler->append_entry(bl); // bl is destroyed.
- ls->end = journaler->get_write_pos();
+ const uint64_t new_write_pos = journaler->append_entry(bl); // bl is destroyed.
+ ls->end = new_write_pos;
+
+ journaler->wait_for_flush(new C_MDL_Flushed(
+ this, new_write_pos, data.fin));
- if (data.fin)
- journaler->wait_for_flush(new C_IO_Wrapper(mds, data.fin));
if (data.flush)
journaler->flush();
delete le;
} else {
- if (data.fin)
- journaler->wait_for_flush(new C_IO_Wrapper(mds, data.fin));
+ journaler->wait_for_flush(new C_MDL_Flushed(
+ this, journaler->get_write_pos(), data.fin));
if (data.flush)
journaler->flush();
}
++p;
if (pending_events.count(ls->seq) ||
- ls->end > journaler->get_write_safe_pos()) {
+ ls->end > safe_pos) {
dout(5) << "trim segment " << ls->seq << "/" << ls->offset << ", not fully flushed yet, safe "
<< journaler->get_write_safe_pos() << " < end " << ls->end << dendl;
break;
<< "/" << expiring_segments.size()
<< "/" << expired_segments.size() << dendl;
- uint64_t safe_pos = journaler->get_write_safe_pos();
uint64_t last_seq = 0;
if (!segments.empty())
last_seq = get_last_segment_seq();
logger->set(l_mdl_expos, journaler->get_expire_pos());
}
+ safe_pos = journaler->get_write_safe_pos();
+
dout(10) << "_replay_thread kicking waiters" << dendl;
mds->mds_lock.Lock();
finish_contexts(g_ceph_context, waitfor_replay, r);