]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc/Journaler: make header write_pos align to boundary of flushed entry
authorYan, Zheng <zyan@redhat.com>
Wed, 12 Apr 2017 08:00:18 +0000 (16:00 +0800)
committerNathan Cutler <ncutler@suse.com>
Wed, 14 Jun 2017 14:40:26 +0000 (16:40 +0200)
This can speed up the process that detects and drops partial written
entry in the log tail.

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
(cherry picked from commit 8ae2962b79903e217fda83cea4140af64b5d6883)

Conflicts:
src/osdc/Journaler.cc - 8d4f6b92cba is not being backported to jewel
src/osdc/Journaler.h - Journaler::Journaler initializer list is different in jewel, compared to master

src/osdc/Journaler.cc
src/osdc/Journaler.h

index c57616edca47e83fa4e4a04bfbafb66c5edd96f2..88208dcec0082b545e37cc1a66e7df6c21e304a4 100644 (file)
@@ -56,9 +56,10 @@ void Journaler::create(file_layout_t *l, stream_format_t const sf)
   journal_stream.set_format(sf);
   _set_layout(l);
 
-  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos =
-    read_pos = requested_pos = received_pos =
-    expire_pos = trimming_pos = trimmed_pos = layout.get_period();
+  prezeroing_pos = prezero_pos = write_pos = flush_pos =
+    safe_pos = read_pos = requested_pos = received_pos =
+    expire_pos = trimming_pos = trimmed_pos =
+    next_safe_pos = layout.get_period();
 
   ldout(cct, 1) << "created blank journal at inode 0x" << std::hex << ino
                << std::dec << ", format=" << stream_format << dendl;
@@ -219,7 +220,7 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
       finish->complete(-EINVAL);
       return;
     }
-    prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos
+    prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos
       = h.write_pos;
     expire_pos = h.expire_pos;
     trimmed_pos = trimming_pos = h.trimmed_pos;
@@ -280,7 +281,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl)
     return;
   }
 
-  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos
+  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos
     = h.write_pos;
   read_pos = requested_pos = received_pos = expire_pos = h.expire_pos;
   trimmed_pos = trimming_pos = h.trimmed_pos;
@@ -328,7 +329,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
   ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
          << " (header had " << write_pos << ")."
          << dendl;
-  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = new_end;
+  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = new_end;
   state = STATE_ACTIVE;
   onfinish->complete(r);
 }
@@ -355,7 +356,7 @@ void Journaler::_finish_probe_end(int r, uint64_t end)
 
   state = STATE_ACTIVE;
 
-  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = end;
+  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = end;
 
 out:
   // done.
@@ -491,7 +492,6 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
     return;
   }
 
-  assert(start >= safe_pos);
   assert(start < flush_pos);
 
   // calc latency?
@@ -501,12 +501,13 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
   }
 
   // adjust safe_pos
-  assert(pending_safe.count(start));
-  pending_safe.erase(start);
+  auto it = pending_safe.find(start);
+  assert(it != pending_safe.end());
+  pending_safe.erase(it);
   if (pending_safe.empty())
-    safe_pos = flush_pos;
+    safe_pos = next_safe_pos;
   else
-    safe_pos = *pending_safe.begin();
+    safe_pos = pending_safe.begin()->second;
 
   ldout(cct, 10) << "_finish_flush safe from " << start
                 << ", pending_safe " << pending_safe
@@ -593,6 +594,10 @@ uint64_t Journaler::append_entry(bufferlist& bl)
     ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro "
                   << write_obj << " flo " << flush_obj << ")" << dendl;
     _do_flush(write_buf.length() - write_off);
+    if (write_off) {
+      // current entry isn't being flushed, set next_safe_pos to the end of previous entry
+      next_safe_pos = write_pos - wrote;
+    }
   }
 
   return write_pos;
@@ -645,13 +650,14 @@ void Journaler::_do_flush(unsigned amount)
   SnapContext snapc;
 
   Context *onsafe = new C_Flush(this, flush_pos, now);  // on COMMIT
-  pending_safe.insert(flush_pos);
+  pending_safe[flush_pos] = next_safe_pos;
 
   bufferlist write_bl;
 
   // adjust pointers
   if (len == write_buf.length()) {
     write_bl.swap(write_buf);
+    next_safe_pos = write_pos;
   } else {
     write_buf.splice(0, len, &write_bl);
   }
@@ -978,7 +984,7 @@ void Journaler::_issue_read(uint64_t len)
     ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos
                   << ", waiting" << dendl;
     assert(write_pos > requested_pos);
-    if (flush_pos == safe_pos) {
+    if (pending_safe.empty()) {
       _flush(NULL);
     }
     assert(flush_pos > safe_pos);
@@ -1077,7 +1083,7 @@ bool Journaler::_is_readable()
       "adjusting write_pos to " << read_pos << dendl;
 
     // adjust write_pos
-    prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = read_pos;
+    prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
     assert(write_buf.length() == 0);
 
     // reset read state
index 58269462ea516499ebf5ea5804d2e44792e8ecb4..e96ebf3592f411ace3189657caa9c14b19ab32aa 100644 (file)
@@ -309,6 +309,11 @@ private:
   uint64_t flush_pos; ///< where we will flush. if
                      ///  write_pos>flush_pos, we're buffering writes.
   uint64_t safe_pos; ///< what has been committed safely to disk.
+
+  uint64_t next_safe_pos; /// start postion of the first entry that isn't
+                         /// being fully flushed. If we don't flush any
+                         // partial entry, it's equal to flush_pos.
+
   bufferlist write_buf; ///< write buffer.  flush_pos +
                        ///  write_buf.length() == write_pos.
 
@@ -317,7 +322,7 @@ private:
 
   bool waiting_for_zero;
   interval_set<uint64_t> pending_zero;  // non-contig bits we've zeroed
-  std::set<uint64_t> pending_safe;
+  std::map<uint64_t, uint64_t> pending_safe; // flush_pos -> safe_pos
   // when safe through given offset
   std::map<uint64_t, std::list<Context*> > waitfor_safe;
 
@@ -409,7 +414,8 @@ public:
     objecter(obj), filer(objecter, f), logger(l), logger_key_lat(lkey),
     timer(tim), delay_flush_event(0),
     state(STATE_UNDEF), error(0),
-    prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0), safe_pos(0),
+    prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0),
+    safe_pos(0), next_safe_pos(0),
     write_buf_throttle(cct, "write_buf_throttle", UINT_MAX - (UINT_MAX >> 3)),
     waiting_for_zero(false),
     read_pos(0), requested_pos(0), received_pos(0),
@@ -439,6 +445,7 @@ public:
     write_pos = 0;
     flush_pos = 0;
     safe_pos = 0;
+    next_safe_pos = 0;
     read_pos = 0;
     requested_pos = 0;
     received_pos = 0;
@@ -469,7 +476,7 @@ public:
   void set_writeable();
   void set_write_pos(int64_t p) {
     lock_guard l(lock);
-    prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p;
+    prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = p;
   }
   void set_read_pos(int64_t p) {
     lock_guard l(lock);