From 5b5493db5b1aa4e61147a4bcafd173e25f93d832 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 31 Oct 2014 12:41:18 +0000 Subject: [PATCH] mds: implement "flush journal" asok Start a new segment and flush everything older to the backing store. Signed-off-by: John Spray --- src/mds/LogSegment.h | 8 +++ src/mds/MDLog.cc | 64 +++++++++++++++++++++++ src/mds/MDLog.h | 13 +++++ src/mds/MDS.cc | 119 ++++++++++++++++++++++++++++++++++++++++++- src/mds/MDS.h | 4 ++ 5 files changed, 207 insertions(+), 1 deletion(-) diff --git a/src/mds/LogSegment.h b/src/mds/LogSegment.h index 035cc81a72abc..6626afe73d3da 100644 --- a/src/mds/LogSegment.h +++ b/src/mds/LogSegment.h @@ -71,6 +71,14 @@ class LogSegment { // try to expire void try_to_expire(MDS *mds, MDSGatherBuilder &gather_bld, int op_prio); + std::list expiry_waiters; + + void wait_for_expiry(MDSInternalContextBase *c) + { + assert(c != NULL); + expiry_waiters.push_back(c); + } + // cons LogSegment(uint64_t _seq, loff_t off=-1) : seq(_seq), offset(off), end(off), num_events(0), diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index 60c84e4f941f4..456e97d551c76 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -562,6 +562,57 @@ void MDLog::trim(int m) } +/** + * Like ::trim, but instead of trimming to max_segments, trim all but the latest + * segment. 
+ */ +int MDLog::trim_all() +{ + submit_mutex.Lock(); + + dout(10) << __func__ << ": " + << segments.size() + << "/" << expiring_segments.size() + << "/" << expired_segments.size() << dendl; + + map::iterator p = segments.begin(); + for (; (p != segments.end()) && (p->second != peek_current_segment());) { + LogSegment *ls = p->second; + ++p; + + // Caller should have flushed journaler before calling this + if (pending_events.count(ls->seq)) { + dout(5) << __func__ << ": segment " << ls->seq << " has pending events" << dendl; + submit_mutex.Unlock(); + return -EAGAIN; + } + + if (expiring_segments.count(ls)) { + dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset + << ", " << ls->num_events << " events" << dendl; + } else if (expired_segments.count(ls)) { + dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset + << ", " << ls->num_events << " events" << dendl; + } else { + assert(expiring_segments.count(ls) == 0); + expiring_segments.insert(ls); + expiring_events += ls->num_events; + submit_mutex.Unlock(); + + uint64_t last_seq = ls->seq; + try_expire(ls, CEPH_MSG_PRIO_DEFAULT); + + submit_mutex.Lock(); + p = segments.lower_bound(last_seq + 1); + } + } + + _trim_expired_segments(); + + return 0; +} + + void MDLog::try_expire(LogSegment *ls, int op_prio) { MDSGatherBuilder gather_bld(g_ceph_context); @@ -632,6 +683,12 @@ void MDLog::_trim_expired_segments() journaler->write_head(0); } +void MDLog::trim_expired_segments() +{ + submit_mutex.Lock(); + _trim_expired_segments(); +} + void MDLog::_expired(LogSegment *ls) { assert(submit_mutex.is_locked_by_me()); @@ -646,6 +703,13 @@ void MDLog::_expired(LogSegment *ls) // expired. 
expired_segments.insert(ls); expired_events += ls->num_events; + + // Trigger all waiters + for (std::list::iterator i = ls->expiry_waiters.begin(); + i != ls->expiry_waiters.end(); ++i) { + (*i)->complete(0); + } + ls->expiry_waiters.clear(); logger->inc(l_mdl_evex, ls->num_events); logger->inc(l_mdl_segex); diff --git a/src/mds/MDLog.h b/src/mds/MDLog.h index 2d00ed21380a7..601148dc912da 100644 --- a/src/mds/MDLog.h +++ b/src/mds/MDLog.h @@ -146,6 +146,13 @@ protected: } submit_thread; friend class SubmitThread; +public: + const std::set &get_expiring_segments() const + { + return expiring_segments; + } +protected: + // -- subtreemaps -- friend class ESubtreeMap; friend class MDCache; @@ -294,7 +301,13 @@ private: void _trim_expired_segments(); public: + void trim_expired_segments(); void trim(int max=-1); + int trim_all(); + bool expiry_done() const + { + return expiring_segments.empty() && expired_segments.empty(); + }; private: void write_head(MDSInternalContextBase *onfinish); diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index a4d27945e2f98..6060014d106d0 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -217,7 +217,7 @@ public: bool MDS::asok_command(string command, cmdmap_t& cmdmap, string format, ostream& ss) { - dout(1) << "asok_command: " << command << dendl; + dout(1) << "asok_command: " << command << " (starting...)" << dendl; Formatter *f = new_formatter(format); if (!f) @@ -297,9 +297,14 @@ bool MDS::asok_command(string command, cmdmap_t& cmdmap, string format, string path; cmd_getval(g_ceph_context, cmdmap, "path", path); command_flush_path(f, path); + } else if (command == "flush journal") { + command_flush_journal(f); } f->flush(ss); delete f; + + dout(1) << "asok_command: " << command << " (complete)" << dendl; + return true; } @@ -327,6 +332,113 @@ void MDS::command_flush_path(Formatter *f, const string& path) f->close_section(); // results } +/** + * Wrapper around _command_flush_journal that + * handles serialization of result + */ +void 
MDS::command_flush_journal(Formatter *f) +{ + assert(f != NULL); + + std::stringstream ss; + const int r = _command_flush_journal(&ss); + f->open_object_section("result"); + f->dump_string("message", ss.str()); + f->dump_int("return_code", r); + f->close_section(); +} + +/** + * Implementation of "flush journal" asok command. + * + * @param ss + * Optionally populate with a human readable string describing the + * reason for any unexpected return status. + */ +int MDS::_command_flush_journal(std::stringstream *ss) +{ + assert(ss != NULL); + + Mutex::Locker l(mds_lock); + + // I need to seal off the current segment, and then mark all previous segments + // for expiry + mdlog->start_new_segment(); + int r = 0; + + // Flush initially so that all the segments older than our new one + // will be eligible for expiry + C_SaferCond mdlog_flushed; + mdlog->flush(); + mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_flushed)); + mds_lock.Unlock(); + r = mdlog_flushed.wait(); + mds_lock.Lock(); + if (r != 0) { + *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal"; + return r; + } + + // Put all the old log segments into expiring or expired state + dout(5) << __func__ << ": beginning segment expiry" << dendl; + r = mdlog->trim_all(); + if (r != 0) { + *ss << "Error " << r << " (" << cpp_strerror(r) << ") while trimming log"; + return r; + } + + // Attach contexts to wait for all expiring segments to expire + MDSGatherBuilder expiry_gather(g_ceph_context); + + std::list expired_ctxs; + const std::set &expiring_segments = mdlog->get_expiring_segments(); + for (std::set::const_iterator i = expiring_segments.begin(); + i != expiring_segments.end(); ++i) { + (*i)->wait_for_expiry(expiry_gather.new_sub()); + } + dout(5) << __func__ << ": waiting for " << expiry_gather.num_subs_created() + << " segments to expire" << dendl; + + if (expiry_gather.has_subs()) { + C_SaferCond cond; + expiry_gather.set_finisher(new MDSInternalContextWrapper(this, 
&cond)); + expiry_gather.activate(); + + // Drop mds_lock to allow progress until expiry is complete + mds_lock.Unlock(); + int r = cond.wait(); + mds_lock.Lock(); + + assert(r == 0); // MDLog is not allowed to raise errors via wait_for_expiry + } + + dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now " << std::hex << + mdlog->get_journaler()->get_expire_pos() << "/" << + mdlog->get_journaler()->get_trimmed_pos() << dendl; + + // Now everyone I'm interested in is expired + mdlog->trim_expired_segments(); + + dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now " << std::hex << + mdlog->get_journaler()->get_expire_pos() << "/" << + mdlog->get_journaler()->get_trimmed_pos() << dendl; + + // Flush the journal header so that readers will start from after the flushed region + C_SaferCond wrote_head; + mdlog->get_journaler()->write_head(&wrote_head); + mds_lock.Unlock(); // Drop lock to allow messenger dispatch progress + r = wrote_head.wait(); + mds_lock.Lock(); + if (r != 0) { + *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header"; + return r; + } + + dout(5) << __func__ << ": write_head complete, all done!" 
<< dendl; + + return 0; +} + void MDS::set_up_admin_socket() { int r; @@ -361,6 +473,11 @@ void MDS::set_up_admin_socket() asok_hook, "Enumerate connected CephFS clients"); assert(0 == r); + r = admin_socket->register_command("flush journal", + "flush journal", + asok_hook, + "Flush the journal to the backing store"); + assert(0 == r); } void MDS::clean_up_admin_socket() diff --git a/src/mds/MDS.h b/src/mds/MDS.h index 3925bbd508ac9..e74f0428743a0 100644 --- a/src/mds/MDS.h +++ b/src/mds/MDS.h @@ -378,6 +378,10 @@ private: void check_ops_in_flight(); // send off any slow ops to monitor void command_scrub_path(Formatter *f, const string& path); void command_flush_path(Formatter *f, const string& path); + void command_flush_journal(Formatter *f); + private: + int _command_flush_journal(std::stringstream *ss); + public: // config observer bits virtual const char** get_tracked_conf_keys() const; virtual void handle_conf_change(const struct md_config_t *conf, -- 2.39.5