]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: drop partial entry and adjust write_pos when opening PurgeQueue 14447/head
authorYan, Zheng <zyan@redhat.com>
Wed, 12 Apr 2017 10:06:23 +0000 (18:06 +0800)
committerYan, Zheng <zyan@redhat.com>
Tue, 18 Apr 2017 02:20:09 +0000 (10:20 +0800)
At tail journal, there can be partial written entry. Before appending
new entries to the journal, we need to drop any partial written entry
and adjust write_pos. For mds log, partial written entry is detected
and dropped when replaying the journal.

For PurgeQueue journal, we don't replay the whole journal when MDS
starts. Before appending new entry to the journal, we need to drop
any partial written entry and adjust write_pos.

Previous patch makes the journal header write_pos align to boundary
of fully flushed entry. We can start finding partial written entry
from the journal header write_pos. It should be fast even when the
purge queue is very large.

Fixes: http://tracker.ceph.com/issues/19450
Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
src/mds/PurgeQueue.cc
src/mds/PurgeQueue.h

index 7bea41bfab3a0ad3d09afdabb77534ed1d1b8d79..2cf61938afcd73b4833e4dd1986ee04e2827a2bb 100644 (file)
@@ -139,17 +139,65 @@ void PurgeQueue::open(Context *completion)
     } else if (r == 0) {
       Mutex::Locker l(lock);
       dout(4) << "open complete" << dendl;
-      if (r == 0) {
-        journaler.set_writeable();
+
+      // Journaler only guarantees entries before head write_pos have been
+      // fully flushed. Before appending new entries, we need to find and
+      // drop any partial written entry.
+      if (journaler.last_committed.write_pos < journaler.get_write_pos()) {
+       dout(4) << "recovering write_pos" << dendl;
+       journaler.set_read_pos(journaler.last_committed.write_pos);
+       _recover(completion);
+       return;
       }
-      completion->complete(r);
+
+      journaler.set_writeable();
+      completion->complete(0);
     } else {
       derr << "Error " << r << " loading Journaler" << dendl;
-      on_error->complete(0);
+      on_error->complete(r);
     }
   }));
 }
 
+
+void PurgeQueue::_recover(Context *completion)
+{
+  assert(lock.is_locked_by_me());
+
+  // Journaler::is_readable() adjusts write_pos if partial entry is encountered
+  while (1) {
+    if (!journaler.is_readable() &&
+       !journaler.get_error() &&
+       journaler.get_read_pos() < journaler.get_write_pos()) {
+      journaler.wait_for_readable(new FunctionContext([this, completion](int r) {
+        Mutex::Locker l(lock);
+       _recover(completion);
+      }));
+      return;
+    }
+
+    if (journaler.get_error()) {
+      int r = journaler.get_error();
+      derr << "Error " << r << " recovering write_pos" << dendl;
+      on_error->complete(r);
+      return;
+    }
+
+    if (journaler.get_read_pos() == journaler.get_write_pos()) {
+      dout(4) << "write_pos recovered" << dendl;
+      // restore original read_pos
+      journaler.set_read_pos(journaler.last_committed.expire_pos);
+      journaler.set_writeable();
+      completion->complete(0);
+      return;
+    }
+
+    bufferlist bl;
+    bool readable = journaler.try_read_entry(bl);
+    assert(readable);  // we checked earlier
+  }
+}
+
 void PurgeQueue::create(Context *fin)
 {
   dout(4) << "creating" << dendl;
index 33b8c8617998a8c5cfc87489336cc5b254d38e04..b9699ddf3e3d43dc21c4085d6b015eea6575ec7a 100644 (file)
@@ -112,6 +112,9 @@ protected:
   // Has drain() ever been called on this instance?
   bool draining;
 
+  // recover the journal write_pos (drop any partial written entry)
+  void _recover(Context *completion);
+
   /**
    * @return true if we were in a position to try and consume something:
    *         does not mean we necessarily did.