git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journaler: fix prefetch to handle multiple in-flight reads
author: Sage Weil <sage.weil@dreamhost.com>
Fri, 25 Mar 2011 16:42:47 +0000 (09:42 -0700)
committer: Sage Weil <sage.weil@dreamhost.com>
Fri, 25 Mar 2011 16:42:47 +0000 (09:42 -0700)
If we issue multiple reads, they can come back in any order.

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/osdc/Journaler.cc
src/osdc/Journaler.h

index f0ea07683418394fbf44ea5ec4c0ce7f5a428a4a..c9f9d3d571461f7d0941cdd29509457fb43080ed 100644 (file)
@@ -628,9 +628,13 @@ void Journaler::flush(Context *onsync, Context *onsafe, bool add_ack_barrier)
 
 class Journaler::C_Read : public Context {
   Journaler *ls;
+  uint64_t offset;
 public:
-  C_Read(Journaler *l) : ls(l) {}
-  void finish(int r) { ls->_finish_read(r); }
+  bufferlist bl;
+  C_Read(Journaler *l, uint64_t o) : ls(l), offset(o) {}
+  void finish(int r) {
+    ls->_finish_read(r, offset, bl);
+  }
 };
 
 class Journaler::C_RetryRead : public Context {
@@ -643,7 +647,7 @@ public:
   }  
 };
 
-void Journaler::_finish_read(int r)
+void Journaler::_finish_read(int r, uint64_t offset, bufferlist& bl)
 {
   if (r < 0) {
     dout(0) << "_finish_read got error " << r << dendl;
@@ -658,15 +662,39 @@ void Journaler::_finish_read(int r)
   }
   assert(r>=0);
 
-  dout(10) << "_finish_read got " << received_pos << "~" << reading_buf.length() << dendl;
-  received_pos += reading_buf.length();
-  read_buf.claim_append(reading_buf);
-  assert(received_pos <= requested_pos);
-  dout(10) << "_finish_read read_buf now " << read_pos << "~" << read_buf.length() 
-          << ", read pointers " << read_pos << "/" << received_pos << "/" << requested_pos
-          << dendl;
-  
-  if (_is_readable()) {
+  dout(10) << "_finish_read got " << offset << "~" << bl.length() << dendl;
+  prefetch_buf[offset].swap(bl);
+
+  _assimilate_prefetch();
+  _prefetch();
+}
+
+void Journaler::_assimilate_prefetch()
+{
+  bool any = false;
+  while (!prefetch_buf.empty()) {
+    map<uint64_t,bufferlist>::iterator p = prefetch_buf.begin();
+    if (p->first != received_pos) {
+      uint64_t gap = p->first - received_pos;
+      dout(10) << "_assimilate_prefetch gap of " << gap << " from received_pos " << received_pos
+              << " to first prefetched buffer " << p->first << dendl;
+      break;
+    }
+
+    dout(10) << "_assimilate_prefetch " << p->first << "~" << p->second.length() << dendl;
+    received_pos += p->second.length();
+    read_buf.claim_append(p->second);
+    assert(received_pos <= requested_pos);
+    prefetch_buf.erase(p);
+    any = true;
+  }
+
+  if (any)
+    dout(10) << "_assimilate_prefetch read_buf now " << read_pos << "~" << read_buf.length() 
+            << ", read pointers " << read_pos << "/" << received_pos << "/" << requested_pos
+            << dendl;
+
+  if (any && _is_readable()) {
     // readable!
     dout(10) << "_finish_read now readable" << dendl;
     if (on_readable) {
@@ -690,8 +718,6 @@ void Journaler::_finish_read(int r)
       delete f;
     }
   }
-  
-  _prefetch();
 }
 
 /* NOTE: this could be slightly smarter... we could allow
@@ -728,9 +754,9 @@ void Journaler::_issue_read(int64_t len)
           << ", read pointers " << read_pos << "/" << received_pos << "/" << (requested_pos+len)
           << dendl;
   
+  C_Read *c = new C_Read(this, requested_pos);
   filer.read(ino, &layout, CEPH_NOSNAP,
-            requested_pos, len, &reading_buf, 0,
-            new C_Read(this));
+            requested_pos, len, &c->bl, 0, c);
   requested_pos += len;
 }
 
index d751e44b0c26bb48c1a868fb0b266e8a65921bec..29d94a4481c47f07fbc1a9fc0cd1a329561df74f 100644 (file)
@@ -185,7 +185,8 @@ private:
   uint64_t requested_pos; // what we've requested from OSD.
   uint64_t received_pos;  // what we've received from OSD.
   bufferlist read_buf; // read buffer.  unused_field + read_buf.length() == prefetch_pos.
-  bufferlist reading_buf; // what i'm reading into
+
+  map<uint64_t,bufferlist> prefetch_buf;
 
   uint64_t fetch_len;     // how much to read at a time
   uint64_t temp_fetch_len;
@@ -199,7 +200,8 @@ private:
   // for wait_for_readable()
   Context    *on_readable;
 
-  void _finish_read(int r);     // we just read some (read completion callback)
+  void _finish_read(int r, uint64_t offset, bufferlist &bl); // read completion callback
+  void _assimilate_prefetch();
   void _issue_read(int64_t len);  // read some more
   void _prefetch();             // maybe read ahead
   class C_Read;