]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: implement journal flush as asynchronous context execution
authorVenky Shankar <vshankar@redhat.com>
Fri, 14 Sep 2018 04:23:11 +0000 (00:23 -0400)
committerVenky Shankar <vshankar@redhat.com>
Thu, 15 Nov 2018 09:15:13 +0000 (14:45 +0530)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
(cherry picked from commit 75f42340db0ee7263b0708675b8cdfda3bc78059)

 Conflicts:
src/mds/MDSRank.cc
src/mds/MDSRank.h

src/mds/MDSRank.cc
src/mds/MDSRank.h

index be5d186bda3516028bb7fb34f27e8c763a3ca292..43df4af62c2724440bf679cd2542be17a0ac7d4a 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << "mds." << whoami << '.' << incarnation << ' '
 
+class C_Flush_Journal : public MDSInternalContext {
+public:
+  C_Flush_Journal(MDCache *mdcache, MDLog *mdlog, MDSRank *mds,
+                  std::ostream *ss, Context *on_finish)
+    : MDSInternalContext(mds),
+      mdcache(mdcache), mdlog(mdlog), ss(ss), on_finish(on_finish),
+      whoami(mds->whoami), incarnation(mds->incarnation) {
+  }
+
+  void send() {
+    assert(mds->mds_lock.is_locked());
+
+    dout(20) << __func__ << dendl;
+
+    if (mdcache->is_readonly()) {
+      dout(5) << __func__ << ": read-only FS" << dendl;
+      complete(-EROFS);
+      return;
+    }
+
+    if (!mds->is_active()) {
+      dout(5) << __func__ << ": MDS not active, no-op" << dendl;
+      complete(0);
+      return;
+    }
+
+    flush_mdlog();
+  }
+
+private:
+
+  void flush_mdlog() {
+    dout(20) << __func__ << dendl;
+
+    // I need to seal off the current segment, and then mark all
+    // previous segments for expiry
+    mdlog->start_new_segment();
+
+    Context *ctx = new FunctionContext([this](int r) {
+        handle_flush_mdlog(r);
+      });
+
+    // Flush initially so that all the segments older than our new one
+    // will be elegible for expiry
+    mdlog->flush();
+    mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx));
+  }
+
+  void handle_flush_mdlog(int r) {
+    dout(20) << __func__ << ": r=" << r << dendl;
+
+    if (r != 0) {
+      *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
+      complete(r);
+      return;
+    }
+
+    clear_mdlog();
+  }
+
+  void clear_mdlog() {
+    dout(20) << __func__ << dendl;
+
+    Context *ctx = new FunctionContext([this](int r) {
+        handle_clear_mdlog(r);
+      });
+
+    // Because we may not be the last wait_for_safe context on MDLog,
+    // and subsequent contexts might wake up in the middle of our
+    // later trim_all and interfere with expiry (by e.g. marking
+    // dirs/dentries dirty on previous log segments), we run a second
+    // wait_for_safe here. See #10368
+    mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx));
+  }
+
+  void handle_clear_mdlog(int r) {
+    dout(20) << __func__ << ": r=" << r << dendl;
+
+    if (r != 0) {
+      *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
+      complete(r);
+      return;
+    }
+
+    trim_mdlog();
+  }
+
+  void trim_mdlog() {
+    // Put all the old log segments into expiring or expired state
+    dout(5) << __func__ << ": beginning segment expiry" << dendl;
+
+    int ret = mdlog->trim_all();
+    if (ret != 0) {
+      *ss << "Error " << ret << " (" << cpp_strerror(ret) << ") while trimming log";
+      complete(ret);
+      return;
+    }
+
+    expire_segments();
+  }
+
+  void expire_segments() {
+    dout(20) << __func__ << dendl;
+
+    // Attach contexts to wait for all expiring segments to expire
+    MDSGatherBuilder *expiry_gather = new MDSGatherBuilder(g_ceph_context);
+
+    const auto &expiring_segments = mdlog->get_expiring_segments();
+    for (auto p : expiring_segments) {
+      p->wait_for_expiry(expiry_gather->new_sub());
+    }
+    dout(5) << __func__ << ": waiting for " << expiry_gather->num_subs_created()
+            << " segments to expire" << dendl;
+
+    if (!expiry_gather->has_subs()) {
+      trim_segments();
+      delete expiry_gather;
+      return;
+    }
+
+    Context *ctx = new FunctionContext([this](int r) {
+        handle_expire_segments(r);
+      });
+    expiry_gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
+    expiry_gather->activate();
+  }
+
+  void handle_expire_segments(int r) {
+    dout(20) << __func__ << ": r=" << r << dendl;
+
+    ceph_assert(r == 0); // MDLog is not allowed to raise errors via
+                         // wait_for_expiry
+    trim_segments();
+  }
+
+  void trim_segments() {
+    dout(20) << __func__ << dendl;
+
+    Context *ctx = new C_OnFinisher(new FunctionContext([this](int _) {
+          Mutex::Locker locker(mds->mds_lock);
+          trim_expired_segments();
+        }), mds->finisher);
+    ctx->complete(0);
+  }
+
+  void trim_expired_segments() {
+    dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now "
+            << std::hex << mdlog->get_journaler()->get_expire_pos() << "/"
+            << mdlog->get_journaler()->get_trimmed_pos() << dendl;
+
+    // Now everyone I'm interested in is expired
+    mdlog->trim_expired_segments();
+
+    dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now "
+            << std::hex << mdlog->get_journaler()->get_expire_pos() << "/"
+            << mdlog->get_journaler()->get_trimmed_pos() << dendl;
+
+    write_journal_head();
+  }
+
+  void write_journal_head() {
+    dout(20) << __func__ << dendl;
+
+    Context *ctx = new FunctionContext([this](int r) {
+        Mutex::Locker locker(mds->mds_lock);
+        handle_write_head(r);
+      });
+    // Flush the journal header so that readers will start from after
+    // the flushed region
+    mdlog->get_journaler()->write_head(ctx);
+  }
+
+  void handle_write_head(int r) {
+    if (r != 0) {
+      *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header";
+    } else {
+      dout(5) << __func__ << ": write_head complete, all done!" << dendl;
+    }
+
+    complete(r);
+  }
+
+  void finish(int r) override {
+    dout(20) << __func__ << ": r=" << r << dendl;
+    on_finish->complete(r);
+  }
+
+  MDCache *mdcache;
+  MDLog *mdlog;
+  std::ostream *ss;
+  Context *on_finish;
+
+  // so as to use dout
+  mds_rank_t whoami;
+  int incarnation;
+};
+
 MDSRank::MDSRank(
     mds_rank_t whoami_,
     Mutex &mds_lock_,
@@ -2321,142 +2518,28 @@ void MDSRank::command_flush_path(Formatter *f, std::string_view path)
   f->close_section(); // results
 }
 
-/**
- * Wrapper around _command_flush_journal that
- * handles serialization of result
- */
+// synchronous wrapper around "journal flush" asynchronous context
+// execution.
 void MDSRank::command_flush_journal(Formatter *f)
 {
-  assert(f != NULL);
+  ceph_assert(f != NULL);
 
+  C_SaferCond cond;
   std::stringstream ss;
-  const int r = _command_flush_journal(&ss);
-  f->open_object_section("result");
-  f->dump_string("message", ss.str());
-  f->dump_int("return_code", r);
-  f->close_section();
-}
 
-/**
- * Implementation of "flush journal" asok command.
- *
- * @param ss
- * Optionally populate with a human readable string describing the
- * reason for any unexpected return status.
- */
-int MDSRank::_command_flush_journal(std::stringstream *ss)
-{
-  assert(ss != NULL);
-
-  Mutex::Locker l(mds_lock);
-
-  if (mdcache->is_readonly()) {
-    dout(5) << __func__ << ": read-only FS" << dendl;
-    return -EROFS;
-  }
-
-  if (!is_active()) {
-    dout(5) << __func__ << ": MDS not active, no-op" << dendl;
-    return 0;
-  }
-
-  // I need to seal off the current segment, and then mark all previous segments
-  // for expiry
-  mdlog->start_new_segment();
-  int r = 0;
-
-  // Flush initially so that all the segments older than our new one
-  // will be elegible for expiry
   {
-    C_SaferCond mdlog_flushed;
-    mdlog->flush();
-    mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_flushed));
-    mds_lock.Unlock();
-    r = mdlog_flushed.wait();
-    mds_lock.Lock();
-    if (r != 0) {
-      *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
-      return r;
-    }
-  }
-
-  // Because we may not be the last wait_for_safe context on MDLog, and
-  // subsequent contexts might wake up in the middle of our later trim_all
-  // and interfere with expiry (by e.g. marking dirs/dentries dirty
-  // on previous log segments), we run a second wait_for_safe here.
-  // See #10368
-  {
-    C_SaferCond mdlog_cleared;
-    mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_cleared));
-    mds_lock.Unlock();
-    r = mdlog_cleared.wait();
-    mds_lock.Lock();
-    if (r != 0) {
-      *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
-      return r;
-    }
+    Mutex::Locker locker(mds_lock);
+    C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, this, &ss, &cond);
+    flush_journal->send();
   }
+  int r = cond.wait();
 
-  // Put all the old log segments into expiring or expired state
-  dout(5) << __func__ << ": beginning segment expiry" << dendl;
-  r = mdlog->trim_all();
-  if (r != 0) {
-    *ss << "Error " << r << " (" << cpp_strerror(r) << ") while trimming log";
-    return r;
-  }
-
-  // Attach contexts to wait for all expiring segments to expire
-  MDSGatherBuilder expiry_gather(g_ceph_context);
-
-  const std::set<LogSegment*> &expiring_segments = mdlog->get_expiring_segments();
-  for (std::set<LogSegment*>::const_iterator i = expiring_segments.begin();
-       i != expiring_segments.end(); ++i) {
-    (*i)->wait_for_expiry(expiry_gather.new_sub());
-  }
-  dout(5) << __func__ << ": waiting for " << expiry_gather.num_subs_created()
-          << " segments to expire" << dendl;
-
-  if (expiry_gather.has_subs()) {
-    C_SaferCond cond;
-    expiry_gather.set_finisher(new MDSInternalContextWrapper(this, &cond));
-    expiry_gather.activate();
-
-    // Drop mds_lock to allow progress until expiry is complete
-    mds_lock.Unlock();
-    int r = cond.wait();
-    mds_lock.Lock();
-
-    assert(r == 0);  // MDLog is not allowed to raise errors via wait_for_expiry
-  }
-
-  dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now " << std::hex <<
-    mdlog->get_journaler()->get_expire_pos() << "/" <<
-    mdlog->get_journaler()->get_trimmed_pos() << dendl;
-
-  // Now everyone I'm interested in is expired
-  mdlog->trim_expired_segments();
-
-  dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now " << std::hex <<
-    mdlog->get_journaler()->get_expire_pos() << "/" <<
-    mdlog->get_journaler()->get_trimmed_pos() << dendl;
-
-  // Flush the journal header so that readers will start from after the flushed region
-  C_SaferCond wrote_head;
-  mdlog->get_journaler()->write_head(&wrote_head);
-  mds_lock.Unlock();  // Drop lock to allow messenger dispatch progress
-  r = wrote_head.wait();
-  mds_lock.Lock();
-  if (r != 0) {
-      *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header";
-      return r;
-  }
-
-  dout(5) << __func__ << ": write_head complete, all done!" << dendl;
-
-  return 0;
+  f->open_object_section("result");
+  f->dump_string("message", ss.str());
+  f->dump_int("return_code", r);
+  f->close_section();
 }
 
-
 void MDSRank::command_get_subtrees(Formatter *f)
 {
   assert(f != NULL);
index 11e1e6daa66f2277ec8f067ee960417403936288..db0d6ccde0808ffb6b7d2294592e46a1acb5fc42 100644 (file)
@@ -132,6 +132,9 @@ class MDSRank {
     int incarnation;
 
   public:
+
+    friend class C_Flush_Journal;
+
     mds_rank_t get_nodeid() const { return whoami; }
     int64_t get_metadata_pool();
 
@@ -471,7 +474,6 @@ class MDSRank {
         std::ostream &ss,
         Formatter *f);
     int _command_export_dir(std::string_view path, mds_rank_t dest);
-    int _command_flush_journal(std::stringstream *ss);
     CDir *_command_dirfrag_get(
         const cmdmap_t &cmdmap,
         std::ostream &ss);