From: John Spray Date: Fri, 27 Mar 2015 10:30:46 +0000 (+0000) Subject: mds: separate MDLog::safe_pos from journaler X-Git-Tag: v9.0.1~119^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b489f94a989ce54e25a24d2a463bdd6dc055de6a;p=ceph.git mds: separate MDLog::safe_pos from journaler ...and update it via wait_For_flush completions, so that its updates are ordered with respect to the callbacks that happen after a log event is persisted. Fixes: #10368 Signed-off-by: John Spray --- diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index 746c1fe4e92e..1870b486a2be 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -315,6 +315,32 @@ void MDLog::_submit_entry(LogEvent *le, MDSInternalContextBase *c) } } +/** + * Invoked on the flush after each entry submitted + */ +class C_MDL_Flushed : public MDSIOContextBase { + protected: + MDLog *mdlog; + MDS *get_mds() {return mdlog->mds;} + uint64_t flushed_to; + MDSInternalContextBase *wrapped; + + void finish(int r) { + if (wrapped) { + wrapped->complete(r); + } + + mdlog->submit_mutex.Lock(); + assert(mdlog->safe_pos <= flushed_to); + mdlog->safe_pos = flushed_to; + mdlog->submit_mutex.Unlock(); + } + + public: + C_MDL_Flushed(MDLog *m, uint64_t ft, MDSInternalContextBase *w) + : mdlog(m), flushed_to(ft), wrapped(w) {} +}; + void MDLog::_submit_thread() { dout(10) << "_submit_thread start" << dendl; @@ -355,11 +381,12 @@ void MDLog::_submit_thread() << " : " << *le << dendl; // journal it. - journaler->append_entry(bl); // bl is destroyed. - ls->end = journaler->get_write_pos(); + const uint64_t new_write_pos = journaler->append_entry(bl); // bl is destroyed. + ls->end = new_write_pos; + + journaler->wait_for_flush(new C_MDL_Flushed( + this, new_write_pos, data.fin)); - if (data.fin) - journaler->wait_for_flush(new C_IO_Wrapper(mds, data.fin)); if (data.flush) journaler->flush(); @@ -368,8 +395,8 @@ void MDLog::_submit_thread() delete le; } else { - if (data.fin) - journaler->wait_for_flush(new C_IO_Wrapper(mds, data.fin)); + journaler->wait_for_flush(new C_MDL_Flushed( + this, journaler->get_write_pos(), data.fin)); if (data.flush) journaler->flush(); } @@ -541,7 +568,7 @@ void MDLog::trim(int m) ++p; if (pending_events.count(ls->seq) || - ls->end > journaler->get_write_safe_pos()) { + ls->end > safe_pos) { dout(5) << "trim segment " << ls->seq << "/" << ls->offset << ", not fully flushed yet, safe " << journaler->get_write_safe_pos() << " < end " << ls->end << dendl; break; @@ -597,7 +624,6 @@ int MDLog::trim_all() << "/" << expiring_segments.size() << "/" << expired_segments.size() << dendl; - uint64_t safe_pos = journaler->get_write_safe_pos(); uint64_t last_seq = 0; if (!segments.empty()) last_seq = get_last_segment_seq(); @@ -1244,6 +1270,8 @@ void MDLog::_replay_thread() logger->set(l_mdl_expos, journaler->get_expire_pos()); } + safe_pos = journaler->get_write_safe_pos(); + dout(10) << "_replay_thread kicking waiters" << dendl; mds->mds_lock.Lock(); finish_contexts(g_ceph_context, waitfor_replay, r); diff --git a/src/mds/MDLog.h b/src/mds/MDLog.h index 7b21ff258e66..bc0cf413e0f2 100644 --- a/src/mds/MDLog.h +++ b/src/mds/MDLog.h @@ -74,6 +74,11 @@ protected: bool stopping; + // Log position which is persistent *and* for which + // submit_entry wait_for_safe callbacks have already + // been called. + uint64_t safe_pos; + inodeno_t ino; Journaler *journaler; @@ -184,6 +189,7 @@ public: unflushed(0), capped(false), stopping(false), + safe_pos(0), journaler(0), logger(0), replay_thread(this), @@ -290,6 +296,7 @@ private: void _trim_expired_segments(); friend class C_MaybeExpiredSegment; + friend class C_MDL_Flushed; public: void trim_expired_segments();