]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: separate MDLog::safe_pos from journaler 4204/head
authorJohn Spray <john.spray@redhat.com>
Fri, 27 Mar 2015 10:30:46 +0000 (10:30 +0000)
committerJohn Spray <john.spray@redhat.com>
Fri, 27 Mar 2015 22:17:55 +0000 (22:17 +0000)
...and update it via wait_For_flush completions, so
that its updates are ordered with respect to the
callbacks that happen after a log event is persisted.

Fixes: #10368
Signed-off-by: John Spray <john.spray@redhat.com>
src/mds/MDLog.cc
src/mds/MDLog.h

index 746c1fe4e92ee2af78135c18877517013c222cfa..1870b486a2beb7af29ea7b3abd1ce9abcd44f9b2 100644 (file)
@@ -315,6 +315,32 @@ void MDLog::_submit_entry(LogEvent *le, MDSInternalContextBase *c)
   }
 }
 
+/**
+ * Invoked on the flush after each entry submitted
+ */
+class C_MDL_Flushed : public MDSIOContextBase {
+  protected:
+  MDLog *mdlog;
+  MDS *get_mds() {return mdlog->mds;}
+  uint64_t flushed_to;
+  MDSInternalContextBase *wrapped;
+
+  void finish(int r) {
+    if (wrapped) {
+      wrapped->complete(r);
+    }
+
+    mdlog->submit_mutex.Lock();
+    assert(mdlog->safe_pos <= flushed_to);
+    mdlog->safe_pos = flushed_to;
+    mdlog->submit_mutex.Unlock();
+  }
+
+  public:
+  C_MDL_Flushed(MDLog *m, uint64_t ft, MDSInternalContextBase *w)
+    : mdlog(m), flushed_to(ft), wrapped(w) {}
+};
+
 void MDLog::_submit_thread()
 {
   dout(10) << "_submit_thread start" << dendl;
@@ -355,11 +381,12 @@ void MDLog::_submit_thread()
              << " : " << *le << dendl;
 
       // journal it.
-      journaler->append_entry(bl);  // bl is destroyed.
-      ls->end = journaler->get_write_pos();
+      const uint64_t new_write_pos = journaler->append_entry(bl);  // bl is destroyed.
+      ls->end = new_write_pos;
+
+      journaler->wait_for_flush(new C_MDL_Flushed(
+            this, new_write_pos, data.fin));
 
-      if (data.fin)
-       journaler->wait_for_flush(new C_IO_Wrapper(mds, data.fin));
       if (data.flush)
        journaler->flush();
 
@@ -368,8 +395,8 @@ void MDLog::_submit_thread()
 
       delete le;
     } else {
-      if (data.fin)
-       journaler->wait_for_flush(new C_IO_Wrapper(mds, data.fin));
+      journaler->wait_for_flush(new C_MDL_Flushed(
+            this, journaler->get_write_pos(), data.fin));
       if (data.flush)
        journaler->flush();
     }
@@ -541,7 +568,7 @@ void MDLog::trim(int m)
     ++p;
     
     if (pending_events.count(ls->seq) ||
-       ls->end > journaler->get_write_safe_pos()) {
+       ls->end > safe_pos) {
       dout(5) << "trim segment " << ls->seq << "/" << ls->offset << ", not fully flushed yet, safe "
              << journaler->get_write_safe_pos() << " < end " << ls->end << dendl;
       break;
@@ -597,7 +624,6 @@ int MDLog::trim_all()
            << "/" << expiring_segments.size()
            << "/" << expired_segments.size() << dendl;
 
-  uint64_t safe_pos = journaler->get_write_safe_pos();
   uint64_t last_seq = 0;
   if (!segments.empty())
     last_seq = get_last_segment_seq();
@@ -1244,6 +1270,8 @@ void MDLog::_replay_thread()
     logger->set(l_mdl_expos, journaler->get_expire_pos());
   }
 
+  safe_pos = journaler->get_write_safe_pos();
+
   dout(10) << "_replay_thread kicking waiters" << dendl;
   mds->mds_lock.Lock();
   finish_contexts(g_ceph_context, waitfor_replay, r);  
index 7b21ff258e663ddb1b62b16474cb4215b88b1032..bc0cf413e0f246dc45ff1aeaf0bd001f34002abc 100644 (file)
@@ -74,6 +74,11 @@ protected:
 
   bool stopping;
 
+  // Log position which is persistent *and* for which
+  // submit_entry wait_for_safe callbacks have already
+  // been called.
+  uint64_t safe_pos;
+
   inodeno_t ino;
   Journaler *journaler;
 
@@ -184,6 +189,7 @@ public:
                  unflushed(0),
                  capped(false),
                  stopping(false),
+                 safe_pos(0),
                  journaler(0),
                  logger(0),
                  replay_thread(this),
@@ -290,6 +296,7 @@ private:
   void _trim_expired_segments();
 
   friend class C_MaybeExpiredSegment;
+  friend class C_MDL_Flushed;
 
 public:
   void trim_expired_segments();