From 75f42340db0ee7263b0708675b8cdfda3bc78059 Mon Sep 17 00:00:00 2001 From: Venky Shankar Date: Fri, 14 Sep 2018 00:23:11 -0400 Subject: [PATCH] mds: implement journal flush as asynchronous context execution Signed-off-by: Venky Shankar --- src/mds/MDSRank.cc | 333 ++++++++++++++++++++++++++++----------------- src/mds/MDSRank.h | 4 +- 2 files changed, 211 insertions(+), 126 deletions(-) diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index 12897b9056f..becf3f4d3cf 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -41,6 +41,203 @@ #undef dout_prefix #define dout_prefix *_dout << "mds." << whoami << '.' << incarnation << ' ' +class C_Flush_Journal : public MDSInternalContext { +public: + C_Flush_Journal(MDCache *mdcache, MDLog *mdlog, MDSRank *mds, + std::ostream *ss, Context *on_finish) + : MDSInternalContext(mds), + mdcache(mdcache), mdlog(mdlog), ss(ss), on_finish(on_finish), + whoami(mds->whoami), incarnation(mds->incarnation) { + } + + void send() { + assert(mds->mds_lock.is_locked()); + + dout(20) << __func__ << dendl; + + if (mdcache->is_readonly()) { + dout(5) << __func__ << ": read-only FS" << dendl; + complete(-EROFS); + return; + } + + if (!mds->is_active()) { + dout(5) << __func__ << ": MDS not active, no-op" << dendl; + complete(0); + return; + } + + flush_mdlog(); + } + +private: + + void flush_mdlog() { + dout(20) << __func__ << dendl; + + // I need to seal off the current segment, and then mark all + // previous segments for expiry + mdlog->start_new_segment(); + + Context *ctx = new FunctionContext([this](int r) { + handle_flush_mdlog(r); + }); + + // Flush initially so that all the segments older than our new one + // will be elegible for expiry + mdlog->flush(); + mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx)); + } + + void handle_flush_mdlog(int r) { + dout(20) << __func__ << ": r=" << r << dendl; + + if (r != 0) { + *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal"; + complete(r); + return; + } + + clear_mdlog(); + } + + void clear_mdlog() { + dout(20) << __func__ << dendl; + + Context *ctx = new FunctionContext([this](int r) { + handle_clear_mdlog(r); + }); + + // Because we may not be the last wait_for_safe context on MDLog, + // and subsequent contexts might wake up in the middle of our + // later trim_all and interfere with expiry (by e.g. marking + // dirs/dentries dirty on previous log segments), we run a second + // wait_for_safe here. See #10368 + mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx)); + } + + void handle_clear_mdlog(int r) { + dout(20) << __func__ << ": r=" << r << dendl; + + if (r != 0) { + *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal"; + complete(r); + return; + } + + trim_mdlog(); + } + + void trim_mdlog() { + // Put all the old log segments into expiring or expired state + dout(5) << __func__ << ": beginning segment expiry" << dendl; + + int ret = mdlog->trim_all(); + if (ret != 0) { + *ss << "Error " << ret << " (" << cpp_strerror(ret) << ") while trimming log"; + complete(ret); + return; + } + + expire_segments(); + } + + void expire_segments() { + dout(20) << __func__ << dendl; + + // Attach contexts to wait for all expiring segments to expire + MDSGatherBuilder *expiry_gather = new MDSGatherBuilder(g_ceph_context); + + const auto &expiring_segments = mdlog->get_expiring_segments(); + for (auto p : expiring_segments) { + p->wait_for_expiry(expiry_gather->new_sub()); + } + dout(5) << __func__ << ": waiting for " << expiry_gather->num_subs_created() + << " segments to expire" << dendl; + + if (!expiry_gather->has_subs()) { + trim_segments(); + delete expiry_gather; + return; + } + + Context *ctx = new FunctionContext([this](int r) { + handle_expire_segments(r); + }); + expiry_gather->set_finisher(new MDSInternalContextWrapper(mds, ctx)); + expiry_gather->activate(); + } + + void handle_expire_segments(int r) { + dout(20) << __func__ << ": r=" << r << dendl; + + ceph_assert(r == 0); // MDLog is not allowed to raise errors via + // wait_for_expiry + trim_segments(); + } + + void trim_segments() { + dout(20) << __func__ << dendl; + + Context *ctx = new C_OnFinisher(new FunctionContext([this](int _) { + Mutex::Locker locker(mds->mds_lock); + trim_expired_segments(); + }), mds->finisher); + ctx->complete(0); + } + + void trim_expired_segments() { + dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now " + << std::hex << mdlog->get_journaler()->get_expire_pos() << "/" + << mdlog->get_journaler()->get_trimmed_pos() << dendl; + + // Now everyone I'm interested in is expired + mdlog->trim_expired_segments(); + + dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now " + << std::hex << mdlog->get_journaler()->get_expire_pos() << "/" + << mdlog->get_journaler()->get_trimmed_pos() << dendl; + + write_journal_head(); + } + + void write_journal_head() { + dout(20) << __func__ << dendl; + + Context *ctx = new FunctionContext([this](int r) { + Mutex::Locker locker(mds->mds_lock); + handle_write_head(r); + }); + // Flush the journal header so that readers will start from after + // the flushed region + mdlog->get_journaler()->write_head(ctx); + } + + void handle_write_head(int r) { + if (r != 0) { + *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header"; + } else { + dout(5) << __func__ << ": write_head complete, all done!" << dendl; + } + + complete(r); + } + + void finish(int r) override { + dout(20) << __func__ << ": r=" << r << dendl; + on_finish->complete(r); + } + + MDCache *mdcache; + MDLog *mdlog; + std::ostream *ss; + Context *on_finish; + + // so as to use dout + mds_rank_t whoami; + int incarnation; +}; + MDSRank::MDSRank( mds_rank_t whoami_, Mutex &mds_lock_, @@ -2285,140 +2482,26 @@ void MDSRank::command_flush_path(Formatter *f, std::string_view path) f->close_section(); // results } -/** - * Wrapper around _command_flush_journal that - * handles serialization of result - */ -void MDSRank::command_flush_journal(Formatter *f) -{ +// synchronous wrapper around "journal flush" asynchronous context +// execution. +void MDSRank::command_flush_journal(Formatter *f) { ceph_assert(f != NULL); + C_SaferCond cond; std::stringstream ss; - const int r = _command_flush_journal(ss); + { + Mutex::Locker locker(mds_lock); + C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, this, &ss, &cond); + flush_journal->send(); + } + int r = cond.wait(); + f->open_object_section("result"); f->dump_string("message", ss.str()); f->dump_int("return_code", r); f->close_section(); } -/** - * Implementation of "flush journal" asok command. - * - * @param ss - * Optionally populate with a human readable string describing the - * reason for any unexpected return status. - */ -int MDSRank::_command_flush_journal(std::ostream& ss) -{ - Mutex::Locker l(mds_lock); - - if (mdcache->is_readonly()) { - dout(5) << __func__ << ": read-only FS" << dendl; - return -EROFS; - } - - if (!is_active()) { - dout(5) << __func__ << ": MDS not active, no-op" << dendl; - return 0; - } - - // I need to seal off the current segment, and then mark all previous segments - // for expiry - mdlog->start_new_segment(); - int r = 0; - - // Flush initially so that all the segments older than our new one - // will be elegible for expiry - { - C_SaferCond mdlog_flushed; - mdlog->flush(); - mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_flushed)); - mds_lock.Unlock(); - r = mdlog_flushed.wait(); - mds_lock.Lock(); - if (r != 0) { - ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal"; - return r; - } - } - - // Because we may not be the last wait_for_safe context on MDLog, and - // subsequent contexts might wake up in the middle of our later trim_all - // and interfere with expiry (by e.g. marking dirs/dentries dirty - // on previous log segments), we run a second wait_for_safe here. - // See #10368 - { - C_SaferCond mdlog_cleared; - mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_cleared)); - mds_lock.Unlock(); - r = mdlog_cleared.wait(); - mds_lock.Lock(); - if (r != 0) { - ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal"; - return r; - } - } - - // Put all the old log segments into expiring or expired state - dout(5) << __func__ << ": beginning segment expiry" << dendl; - r = mdlog->trim_all(); - if (r != 0) { - ss << "Error " << r << " (" << cpp_strerror(r) << ") while trimming log"; - return r; - } - - // Attach contexts to wait for all expiring segments to expire - MDSGatherBuilder expiry_gather(g_ceph_context); - - const std::set &expiring_segments = mdlog->get_expiring_segments(); - for (std::set::const_iterator i = expiring_segments.begin(); - i != expiring_segments.end(); ++i) { - (*i)->wait_for_expiry(expiry_gather.new_sub()); - } - dout(5) << __func__ << ": waiting for " << expiry_gather.num_subs_created() - << " segments to expire" << dendl; - - if (expiry_gather.has_subs()) { - C_SaferCond cond; - expiry_gather.set_finisher(new MDSInternalContextWrapper(this, &cond)); - expiry_gather.activate(); - - // Drop mds_lock to allow progress until expiry is complete - mds_lock.Unlock(); - int r = cond.wait(); - mds_lock.Lock(); - - ceph_assert(r == 0); // MDLog is not allowed to raise errors via wait_for_expiry - } - - dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now " << std::hex << - mdlog->get_journaler()->get_expire_pos() << "/" << - mdlog->get_journaler()->get_trimmed_pos() << dendl; - - // Now everyone I'm interested in is expired - mdlog->trim_expired_segments(); - - dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now " << std::hex << - mdlog->get_journaler()->get_expire_pos() << "/" << - mdlog->get_journaler()->get_trimmed_pos() << dendl; - - // Flush the journal header so that readers will start from after the flushed region - C_SaferCond wrote_head; - mdlog->get_journaler()->write_head(&wrote_head); - mds_lock.Unlock(); // Drop lock to allow messenger dispatch progress - r = wrote_head.wait(); - mds_lock.Lock(); - if (r != 0) { - ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header"; - return r; - } - - dout(5) << __func__ << ": write_head complete, all done!" << dendl; - - return 0; -} - - void MDSRank::command_get_subtrees(Formatter *f) { ceph_assert(f != NULL); diff --git a/src/mds/MDSRank.h b/src/mds/MDSRank.h index 69a1fb9269f..b8bd7e6c1ed 100644 --- a/src/mds/MDSRank.h +++ b/src/mds/MDSRank.h @@ -134,6 +134,9 @@ class MDSRank { int incarnation; public: + + friend class C_Flush_Journal; + mds_rank_t get_nodeid() const { return whoami; } int64_t get_metadata_pool(); @@ -470,7 +473,6 @@ class MDSRank { std::ostream &ss, Formatter *f); int _command_export_dir(std::string_view path, mds_rank_t dest); - int _command_flush_journal(std::ostream& ss); CDir *_command_dirfrag_get( const cmdmap_t &cmdmap, std::ostream &ss); -- 2.39.5