]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journler: make readahead/prefetch smarter
authorSage Weil <sage.weil@dreamhost.com>
Fri, 25 Mar 2011 16:30:16 +0000 (09:30 -0700)
committerSage Weil <sage.weil@dreamhost.com>
Fri, 25 Mar 2011 16:30:16 +0000 (09:30 -0700)
Always try to prefetch N segments ahead of the current read position.  The
old implementation would read a bunch of data, process it all, then read
a bunch more.  This was suboptimal on a couple different levels.

Also, make an internal _is_readable() _not_ do the prefetch step; only do
that for external callers.

Fixes: #929
Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/osdc/Journaler.cc
src/osdc/Journaler.h

index d4a315c5b3bb798c90d34e1260fca1ce217c9489..f0ea07683418394fbf44ea5ec4c0ce7f5a428a4a 100644 (file)
@@ -50,7 +50,10 @@ void Journaler::set_layout(ceph_file_layout *l)
 
   // prefetch intelligently.
   // (watch out, this is big if you use big objects or weird striping)
-  fetch_len = layout.fl_stripe_count * layout.fl_object_size * g_conf.journaler_prefetch_periods;
+  uint64_t periods = g_conf.journaler_prefetch_periods;
+  if (periods < 2)
+    periods = 2;  // we need at least 2 periods to make progress.
+  fetch_len = layout.fl_stripe_count * layout.fl_object_size * periods;
   prefetch_from = fetch_len / 2;
 }
 
@@ -634,7 +637,10 @@ class Journaler::C_RetryRead : public Context {
   Journaler *ls;
 public:
   C_RetryRead(Journaler *l) : ls(l) {}
-  void finish(int r) { ls->is_readable(); }  // this'll kickstart.
+  void finish(int r) {
+    // kickstart.
+    ls->_prefetch();
+  }  
 };
 
 void Journaler::_finish_read(int r)
@@ -660,7 +666,7 @@ void Journaler::_finish_read(int r)
           << ", read pointers " << read_pos << "/" << received_pos << "/" << requested_pos
           << dendl;
   
-  if (is_readable()) { // NOTE: this check may read more
+  if (_is_readable()) {
     // readable!
     dout(10) << "_finish_read now readable" << dendl;
     if (on_readable) {
@@ -685,7 +691,6 @@ void Journaler::_finish_read(int r)
     }
   }
   
-  // prefetch?
   _prefetch();
 }
 
@@ -699,14 +704,8 @@ void Journaler::_issue_read(int64_t len)
   // make sure we're fully flushed
   _do_flush();
 
-  if (_is_reading()) {
-    dout(10) << "_issue_read " << len << " waiting, already reading " 
-            << received_pos << "~" << (requested_pos-received_pos) << dendl;
-    return;
-  } 
-  assert(requested_pos == received_pos);
-
   // stuck at ack_pos?
+  //  (this is needed if we are reading the tail of a journal we are also writing to)
   assert(requested_pos <= ack_pos);
   if (requested_pos == ack_pos) {
     dout(10) << "_issue_read requested_pos = ack_pos = " << ack_pos << ", waiting" << dendl;
@@ -737,14 +736,31 @@ void Journaler::_issue_read(int64_t len)
 
 void Journaler::_prefetch()
 {
-  // prefetch?
-  uint64_t left = requested_pos - read_pos;
-  if (left <= prefetch_from &&      // should read more,
-      !_is_reading() &&             // and not reading anything right now
-      write_pos > requested_pos) {  // there's something more to read...
-    dout(10) << "_prefetch only " << left << " < " << prefetch_from
-            << ", prefetching " << dendl;
-    _issue_read(fetch_len);
+  // prefetch
+  uint64_t pf;
+  if (temp_fetch_len) {
+    dout(10) << "_prefetch temp_fetch_len " << temp_fetch_len << dendl;
+    pf = temp_fetch_len;
+    temp_fetch_len = 0;
+  } else {
+    pf = fetch_len;
+  }
+
+  uint64_t raw_target = read_pos + pf;
+
+  // only read full log segments
+  uint64_t period = get_layout_period();
+  uint64_t target = raw_target - (raw_target % period);
+
+  // don't read past the log tail
+  if (target > write_pos)
+    target = write_pos;
+
+  if (requested_pos < target) {
+    uint64_t len = target - requested_pos;
+    dout(10) << "_prefetch " << pf << " requested_pos " << requested_pos << " < target " << target
+            << " (" << raw_target << "), prefetching " << len << dendl;
+    _issue_read(len);
   }
 }
 
@@ -752,10 +768,11 @@ void Journaler::_prefetch()
  *  return true if next entry is ready.
  *  kickstart read as necessary.
  */
-bool Journaler::is_readable() 
+bool Journaler::_is_readable()
 {
   // anything to read?
-  if (read_pos == write_pos) return false;
+  if (read_pos == write_pos)
+    return false;
 
   // have enough for entry size?
   uint32_t s = 0;
@@ -777,23 +794,28 @@ bool Journaler::is_readable()
       junk_tail_pos = write_pos; // note old tail
     write_pos = flush_pos = ack_pos = safe_pos = read_pos;
     assert(write_buf.length() == 0);
-
+    
     // truncate?
     // FIXME: how much?
-    
     return false;
-  } 
-
-  // start reading some more?
-  if (!_is_reading()) {
-    if (s)
-      fetch_len = MAX(fetch_len, (sizeof(s)+s-read_buf.length()));
-    _issue_read(fetch_len);
   }
 
+  uint64_t need = (sizeof(s)+s-read_buf.length());
+  if (need > fetch_len) {
+    dout(10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
+            << " for len " << s << " entry" << dendl;
+    temp_fetch_len = need;
+  }
   return false;
 }
 
+bool Journaler::is_readable() 
+{
+  bool r =_is_readable();
+  _prefetch();
+  return r;
+}
+
 bool Journaler::truncate_tail_junk(Context *c)
 {
   if(readonly)
@@ -852,7 +874,7 @@ bool Journaler::try_read_entry(bufferlist& bl)
 void Journaler::wait_for_readable(Context *onreadable)
 {
   dout(10) << "wait_for_readable at " << read_pos << " onreadable " << onreadable << dendl;
-  assert(!is_readable());
+  assert(!_is_readable());
   assert(on_readable == 0);
   on_readable = onreadable;
 }
@@ -876,7 +898,7 @@ public:
 void Journaler::trim()
 {
   assert(!readonly);
-  uint64_t period = layout.fl_stripe_count * layout.fl_object_size;
+  uint64_t period = get_layout_period();
 
   uint64_t trim_to = last_committed.expire_pos;
   trim_to -= trim_to % period;
index bf02e44a3d6f6b6b0a6159a4231aae274d2ceda6..d751e44b0c26bb48c1a868fb0b266e8a65921bec 100644 (file)
@@ -188,6 +188,7 @@ private:
   bufferlist reading_buf; // what i'm reading into
 
   uint64_t fetch_len;     // how much to read at a time
+  uint64_t temp_fetch_len;
   uint64_t prefetch_from; // how far from end do we read next chunk
 
   int64_t junk_tail_pos; // for truncate
@@ -198,9 +199,6 @@ private:
   // for wait_for_readable()
   Context    *on_readable;
 
-  bool _is_reading() {
-    return requested_pos > received_pos;
-  }
   void _finish_read(int r);     // we just read some (read completion callback)
   void _issue_read(int64_t len);  // read some more
   void _prefetch();             // maybe read ahead
@@ -236,7 +234,7 @@ public:
     state(STATE_UNDEF), error(0),
     write_pos(0), flush_pos(0), ack_pos(0), safe_pos(0),
     read_pos(0), requested_pos(0), received_pos(0),
-    fetch_len(0), prefetch_from(0),
+    fetch_len(0), temp_fetch_len(0), prefetch_from(0),
     junk_tail_pos(0),
     read_bl(0), on_read_finish(0), on_readable(0),
     expire_pos(0), trimming_pos(0), trimmed_pos(0) 
@@ -314,6 +312,8 @@ public:
     read_pos = requested_pos = received_pos = p;
     read_buf.clear();
   }
+
+  bool _is_readable();
   bool is_readable();
   bool try_read_entry(bufferlist& bl);
   void wait_for_readable(Context *onfinish);