From: Patrick Donnelly Date: Wed, 28 Aug 2024 03:07:48 +0000 (-0400) Subject: mds: do not duplicate journaler write heads X-Git-Tag: testing/wip-rishabh-testing-20240930.143059~19^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=36315d481401016daf37f6d52bf7af0aab9d7670;p=ceph-ci.git mds: do not duplicate journaler write heads The MDS can unnecessarily write out the journaler head multiple times. Check the last_written Header to see if it's necessary to write and gather waiters. Signed-off-by: Patrick Donnelly --- diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index 99f86231847..0be568433ef 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -146,13 +146,63 @@ class C_MDL_WriteError : public MDSIOContextBase { }; +class C_MDL_WriteHead : public MDSIOContextBase { +public: + explicit C_MDL_WriteHead(MDLog* m) + : MDSIOContextBase(true) + , mdlog(m) + {} + void print(ostream& out) const override { + out << "mdlog_write_head"; + } +protected: + void finish(int r) override { + mdlog->finish_head_waiters(); + } + MDSRank *get_mds() override {return mdlog->mds;} + + MDLog *mdlog; +}; + +void MDLog::finish_head_waiters() +{ + ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock)); + + auto&& last_committed = journaler->get_last_committed(); + auto& expire_pos = last_committed.expire_pos; + + dout(20) << __func__ << " expire_pos=" << std::hex << expire_pos << dendl; + + { + auto last = waiting_for_expire.upper_bound(expire_pos); + for (auto it = waiting_for_expire.begin(); it != last; it++) { + finish_contexts(g_ceph_context, it->second); + } + waiting_for_expire.erase(waiting_for_expire.begin(), last); + } +} + void MDLog::write_head(MDSContext *c) { - Context *fin = NULL; - if (c != NULL) { - fin = new C_IO_Wrapper(mds, c); + ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock)); + + auto&& last_written = journaler->get_last_written(); + auto expire_pos = journaler->get_expire_pos(); + dout(10) << __func__ << " last_written=" << last_written << " current expire_pos=" << std::hex << expire_pos << dendl; + + if (last_written.expire_pos < expire_pos) { + if (c != NULL) { + dout(25) << __func__ << " queueing waiter " << c << dendl; + waiting_for_expire[expire_pos].push_back(c); + } + + auto* fin = new C_MDL_WriteHead(this); + journaler->write_head(fin); + } else { + if (c) { + c->complete(0); + } } - journaler->write_head(fin); } uint64_t MDLog::get_read_pos() const @@ -174,6 +224,8 @@ uint64_t MDLog::get_safe_pos() const void MDLog::create(MDSContext *c) { + ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock)); + dout(5) << "create empty log" << dendl; C_GatherBuilder gather(g_ceph_context); @@ -845,7 +897,6 @@ void MDLog::_trim_expired_segments(auto& locker, MDSContext* ctx) ceph_assert(locker.owns_lock()); // trim expired segments? - bool trimmed = false; uint64_t end = 0; for (auto it = segments.begin(); it != segments.end(); ++it) { auto& [seq, ls] = *it; @@ -881,7 +932,6 @@ void MDLog::_trim_expired_segments(auto& locker, MDSContext* ctx) } else { logger->set(l_mdl_expos, jexpire_pos); } - trimmed = true; } if (!expired_segments.count(ls)) { @@ -895,13 +945,7 @@ void MDLog::_trim_expired_segments(auto& locker, MDSContext* ctx) locker.unlock(); - if (trimmed) { - write_head(ctx); - } else { - if (ctx) { - ctx->complete(0); - } - } + write_head(ctx); } void MDLog::_expired(LogSegment *ls) diff --git a/src/mds/MDLog.h b/src/mds/MDLog.h index 45b6c35ade2..e2ab4e686cd 100644 --- a/src/mds/MDLog.h +++ b/src/mds/MDLog.h @@ -133,6 +133,8 @@ public: void kick_submitter(); void shutdown(); + void finish_head_waiters(); + void submit_entry(LogEvent *e, MDSLogContextBase* c = 0) { std::lock_guard l(submit_mutex); _submit_entry(e, c); @@ -320,5 +322,7 @@ private: // guarded by mds_lock std::condition_variable_any cond; std::atomic upkeep_log_trim_shutdown{false}; + + std::map> waiting_for_expire; // protected by mds_lock }; #endif