#undef dout_prefix
#define dout_prefix *_dout << "mds." << whoami << '.' << incarnation << ' '
+class C_Flush_Journal : public MDSInternalContext {
+public:
+ C_Flush_Journal(MDCache *mdcache, MDLog *mdlog, MDSRank *mds,
+ std::ostream *ss, Context *on_finish)
+ : MDSInternalContext(mds),
+ mdcache(mdcache), mdlog(mdlog), ss(ss), on_finish(on_finish),
+ whoami(mds->whoami), incarnation(mds->incarnation) {
+ }
+
+ void send() {
+ assert(mds->mds_lock.is_locked());
+
+ dout(20) << __func__ << dendl;
+
+ if (mdcache->is_readonly()) {
+ dout(5) << __func__ << ": read-only FS" << dendl;
+ complete(-EROFS);
+ return;
+ }
+
+ if (!mds->is_active()) {
+ dout(5) << __func__ << ": MDS not active, no-op" << dendl;
+ complete(0);
+ return;
+ }
+
+ flush_mdlog();
+ }
+
+private:
+
+ void flush_mdlog() {
+ dout(20) << __func__ << dendl;
+
+ // I need to seal off the current segment, and then mark all
+ // previous segments for expiry
+ mdlog->start_new_segment();
+
+ Context *ctx = new FunctionContext([this](int r) {
+ handle_flush_mdlog(r);
+ });
+
+ // Flush initially so that all the segments older than our new one
+ // will be elegible for expiry
+ mdlog->flush();
+ mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx));
+ }
+
+ void handle_flush_mdlog(int r) {
+ dout(20) << __func__ << ": r=" << r << dendl;
+
+ if (r != 0) {
+ *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
+ complete(r);
+ return;
+ }
+
+ clear_mdlog();
+ }
+
+ void clear_mdlog() {
+ dout(20) << __func__ << dendl;
+
+ Context *ctx = new FunctionContext([this](int r) {
+ handle_clear_mdlog(r);
+ });
+
+ // Because we may not be the last wait_for_safe context on MDLog,
+ // and subsequent contexts might wake up in the middle of our
+ // later trim_all and interfere with expiry (by e.g. marking
+ // dirs/dentries dirty on previous log segments), we run a second
+ // wait_for_safe here. See #10368
+ mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx));
+ }
+
+ void handle_clear_mdlog(int r) {
+ dout(20) << __func__ << ": r=" << r << dendl;
+
+ if (r != 0) {
+ *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
+ complete(r);
+ return;
+ }
+
+ trim_mdlog();
+ }
+
+ void trim_mdlog() {
+ // Put all the old log segments into expiring or expired state
+ dout(5) << __func__ << ": beginning segment expiry" << dendl;
+
+ int ret = mdlog->trim_all();
+ if (ret != 0) {
+ *ss << "Error " << ret << " (" << cpp_strerror(ret) << ") while trimming log";
+ complete(ret);
+ return;
+ }
+
+ expire_segments();
+ }
+
+ void expire_segments() {
+ dout(20) << __func__ << dendl;
+
+ // Attach contexts to wait for all expiring segments to expire
+ MDSGatherBuilder *expiry_gather = new MDSGatherBuilder(g_ceph_context);
+
+ const auto &expiring_segments = mdlog->get_expiring_segments();
+ for (auto p : expiring_segments) {
+ p->wait_for_expiry(expiry_gather->new_sub());
+ }
+ dout(5) << __func__ << ": waiting for " << expiry_gather->num_subs_created()
+ << " segments to expire" << dendl;
+
+ if (!expiry_gather->has_subs()) {
+ trim_segments();
+ delete expiry_gather;
+ return;
+ }
+
+ Context *ctx = new FunctionContext([this](int r) {
+ handle_expire_segments(r);
+ });
+ expiry_gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
+ expiry_gather->activate();
+ }
+
+ void handle_expire_segments(int r) {
+ dout(20) << __func__ << ": r=" << r << dendl;
+
+ ceph_assert(r == 0); // MDLog is not allowed to raise errors via
+ // wait_for_expiry
+ trim_segments();
+ }
+
+ void trim_segments() {
+ dout(20) << __func__ << dendl;
+
+ Context *ctx = new C_OnFinisher(new FunctionContext([this](int _) {
+ Mutex::Locker locker(mds->mds_lock);
+ trim_expired_segments();
+ }), mds->finisher);
+ ctx->complete(0);
+ }
+
+ void trim_expired_segments() {
+ dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now "
+ << std::hex << mdlog->get_journaler()->get_expire_pos() << "/"
+ << mdlog->get_journaler()->get_trimmed_pos() << dendl;
+
+ // Now everyone I'm interested in is expired
+ mdlog->trim_expired_segments();
+
+ dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now "
+ << std::hex << mdlog->get_journaler()->get_expire_pos() << "/"
+ << mdlog->get_journaler()->get_trimmed_pos() << dendl;
+
+ write_journal_head();
+ }
+
+ void write_journal_head() {
+ dout(20) << __func__ << dendl;
+
+ Context *ctx = new FunctionContext([this](int r) {
+ Mutex::Locker locker(mds->mds_lock);
+ handle_write_head(r);
+ });
+ // Flush the journal header so that readers will start from after
+ // the flushed region
+ mdlog->get_journaler()->write_head(ctx);
+ }
+
+ void handle_write_head(int r) {
+ if (r != 0) {
+ *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header";
+ } else {
+ dout(5) << __func__ << ": write_head complete, all done!" << dendl;
+ }
+
+ complete(r);
+ }
+
+ void finish(int r) override {
+ dout(20) << __func__ << ": r=" << r << dendl;
+ on_finish->complete(r);
+ }
+
+ MDCache *mdcache;
+ MDLog *mdlog;
+ std::ostream *ss;
+ Context *on_finish;
+
+ // so as to use dout
+ mds_rank_t whoami;
+ int incarnation;
+};
+
MDSRank::MDSRank(
mds_rank_t whoami_,
Mutex &mds_lock_,
f->close_section(); // results
}
-/**
- * Wrapper around _command_flush_journal that
- * handles serialization of result
- */
+// synchronous wrapper around "journal flush" asynchronous context
+// execution.
void MDSRank::command_flush_journal(Formatter *f)
{
- assert(f != NULL);
+ ceph_assert(f != NULL);
+ C_SaferCond cond;
std::stringstream ss;
- const int r = _command_flush_journal(&ss);
- f->open_object_section("result");
- f->dump_string("message", ss.str());
- f->dump_int("return_code", r);
- f->close_section();
-}
-/**
- * Implementation of "flush journal" asok command.
- *
- * @param ss
- * Optionally populate with a human readable string describing the
- * reason for any unexpected return status.
- */
-int MDSRank::_command_flush_journal(std::stringstream *ss)
-{
- assert(ss != NULL);
-
- Mutex::Locker l(mds_lock);
-
- if (mdcache->is_readonly()) {
- dout(5) << __func__ << ": read-only FS" << dendl;
- return -EROFS;
- }
-
- if (!is_active()) {
- dout(5) << __func__ << ": MDS not active, no-op" << dendl;
- return 0;
- }
-
- // I need to seal off the current segment, and then mark all previous segments
- // for expiry
- mdlog->start_new_segment();
- int r = 0;
-
- // Flush initially so that all the segments older than our new one
- // will be elegible for expiry
{
- C_SaferCond mdlog_flushed;
- mdlog->flush();
- mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_flushed));
- mds_lock.Unlock();
- r = mdlog_flushed.wait();
- mds_lock.Lock();
- if (r != 0) {
- *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
- return r;
- }
- }
-
- // Because we may not be the last wait_for_safe context on MDLog, and
- // subsequent contexts might wake up in the middle of our later trim_all
- // and interfere with expiry (by e.g. marking dirs/dentries dirty
- // on previous log segments), we run a second wait_for_safe here.
- // See #10368
- {
- C_SaferCond mdlog_cleared;
- mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_cleared));
- mds_lock.Unlock();
- r = mdlog_cleared.wait();
- mds_lock.Lock();
- if (r != 0) {
- *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
- return r;
- }
+ Mutex::Locker locker(mds_lock);
+ C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, this, &ss, &cond);
+ flush_journal->send();
}
+ int r = cond.wait();
- // Put all the old log segments into expiring or expired state
- dout(5) << __func__ << ": beginning segment expiry" << dendl;
- r = mdlog->trim_all();
- if (r != 0) {
- *ss << "Error " << r << " (" << cpp_strerror(r) << ") while trimming log";
- return r;
- }
-
- // Attach contexts to wait for all expiring segments to expire
- MDSGatherBuilder expiry_gather(g_ceph_context);
-
- const std::set<LogSegment*> &expiring_segments = mdlog->get_expiring_segments();
- for (std::set<LogSegment*>::const_iterator i = expiring_segments.begin();
- i != expiring_segments.end(); ++i) {
- (*i)->wait_for_expiry(expiry_gather.new_sub());
- }
- dout(5) << __func__ << ": waiting for " << expiry_gather.num_subs_created()
- << " segments to expire" << dendl;
-
- if (expiry_gather.has_subs()) {
- C_SaferCond cond;
- expiry_gather.set_finisher(new MDSInternalContextWrapper(this, &cond));
- expiry_gather.activate();
-
- // Drop mds_lock to allow progress until expiry is complete
- mds_lock.Unlock();
- int r = cond.wait();
- mds_lock.Lock();
-
- assert(r == 0); // MDLog is not allowed to raise errors via wait_for_expiry
- }
-
- dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now " << std::hex <<
- mdlog->get_journaler()->get_expire_pos() << "/" <<
- mdlog->get_journaler()->get_trimmed_pos() << dendl;
-
- // Now everyone I'm interested in is expired
- mdlog->trim_expired_segments();
-
- dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now " << std::hex <<
- mdlog->get_journaler()->get_expire_pos() << "/" <<
- mdlog->get_journaler()->get_trimmed_pos() << dendl;
-
- // Flush the journal header so that readers will start from after the flushed region
- C_SaferCond wrote_head;
- mdlog->get_journaler()->write_head(&wrote_head);
- mds_lock.Unlock(); // Drop lock to allow messenger dispatch progress
- r = wrote_head.wait();
- mds_lock.Lock();
- if (r != 0) {
- *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header";
- return r;
- }
-
- dout(5) << __func__ << ": write_head complete, all done!" << dendl;
-
- return 0;
+ f->open_object_section("result");
+ f->dump_string("message", ss.str());
+ f->dump_int("return_code", r);
+ f->close_section();
}
-
void MDSRank::command_get_subtrees(Formatter *f)
{
assert(f != NULL);