]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: Replace client readahead logic with Readahead 2387/head
authorAdam Crume <adamcrume@gmail.com>
Tue, 16 Sep 2014 01:17:24 +0000 (18:17 -0700)
committerAdam Crume <adamcrume@gmail.com>
Wed, 17 Sep 2014 19:02:18 +0000 (12:02 -0700)
Signed-off-by: Adam Crume <adamcrume@gmail.com>
src/client/Client.cc
src/client/Client.h
src/client/Fh.h

index 667b700b004e61cb5959c8eb949aede54138a878..bcc0176d7906b2af92845ff888545ced8888c9ae 100644 (file)
@@ -5911,6 +5911,23 @@ Fh *Client::_create_fh(Inode *in, int flags, int cmode)
            << ccap_string(in->caps_issued()) << dendl;
   }
 
+  const md_config_t *conf = cct->_conf;
+  loff_t p = in->layout.fl_stripe_count * in->layout.fl_object_size;
+  f->readahead.set_trigger_requests(1);
+  f->readahead.set_min_readahead_size(conf->client_readahead_min);
+  uint64_t max_readahead = Readahead::NO_LIMIT;
+  if (conf->client_readahead_max_bytes) {
+    max_readahead = MIN(max_readahead, conf->client_readahead_max_bytes);
+  }
+  if (conf->client_readahead_max_periods) {
+    max_readahead = MIN(max_readahead, conf->client_readahead_max_periods * p);
+  }
+  f->readahead.set_max_readahead_size(max_readahead);
+  vector<uint64_t> alignments;
+  alignments.push_back(p);
+  alignments.push_back(in->layout.fl_stripe_unit);
+  f->readahead.set_alignments(alignments);
+
   return f;
 }
 
@@ -6243,22 +6260,9 @@ retry:
   }
 
 success:
-  // adjust readahead state
-  if (f->last_pos != start_pos) {
-    f->nr_consec_read = f->consec_read_bytes = 0;
-  } else {
-    f->nr_consec_read++;
-  }
-  f->consec_read_bytes += bl->length();
-  ldout(cct, 10) << "readahead nr_consec_read " << f->nr_consec_read
-          << " for " << f->consec_read_bytes << " bytes" 
-          << " .. last_pos " << f->last_pos << " .. offset "
-          << start_pos << dendl;
-
-  f->last_pos = start_pos + bl->length();
   if (movepos) {
     // adjust fd pos
-    f->pos = f->last_pos;
+    f->pos = start_pos + bl->length();
     unlock_fh_pos(f);
   }
 
@@ -6287,11 +6291,16 @@ done:
   return r < 0 ? r : bl->length();
 }
 
+void Client::C_Readahead::finish(int r) {
+  lgeneric_subdout(client->cct, client, 20) << "client." << client->get_nodeid() << " " << "C_Readahead on " << f->inode << dendl;
+  client->put_cap_ref(f->inode, CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE);
+  f->readahead.dec_pending();
+}
+
 int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl)
 {
   const md_config_t *conf = cct->_conf;
   Inode *in = f->inode;
-  bool readahead = true;
 
   ldout(cct, 10) << "_read_async " << *in << " " << off << "~" << len << dendl;
 
@@ -6300,65 +6309,11 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl)
     return 0;
   if (off + len > in->size) {
     len = in->size - off;    
-    readahead = false;
-  }
-
-  ldout(cct, 10) << "readahead=" << readahead << " nr_consec=" << f->nr_consec_read
-          << " max_byes=" << conf->client_readahead_max_bytes
-          << " max_periods=" << conf->client_readahead_max_periods << dendl;
-
-  // readahead?
-  if (readahead &&
-      f->nr_consec_read &&
-      (conf->client_readahead_max_bytes ||
-       conf->client_readahead_max_periods)) {
-    loff_t l = f->consec_read_bytes * 2;
-    if (conf->client_readahead_min)
-      l = MAX(l, conf->client_readahead_min);
-    if (conf->client_readahead_max_bytes)
-      l = MIN(l, conf->client_readahead_max_bytes);
-    loff_t p = in->layout.fl_stripe_count * in->layout.fl_object_size;
-    if (conf->client_readahead_max_periods)
-      l = MIN(l, conf->client_readahead_max_periods * p);
-
-    if (l >= 2*p)
-      // align large readahead with period
-      l -= (off+l) % p;
-    else {
-      // align readahead with stripe unit if we cross su boundary
-      int su = in->layout.fl_stripe_unit;
-      if ((off+l)/su != off/su) l -= (off+l) % su;
-    }
-    
-    // don't read past end of file
-    if (off+l > in->size)
-      l = in->size - off;
-    
-    loff_t min = MIN((loff_t)len, l/2);
-
-    ldout(cct, 20) << "readahead " << f->nr_consec_read << " reads " 
-            << f->consec_read_bytes << " bytes ... readahead " << off << "~" << l
-            << " min " << min
-            << " (caller wants " << off << "~" << len << ")" << dendl;
-    if (l > (loff_t)len) {
-      if (objectcacher->file_is_cached(&in->oset, &in->layout, in->snapid, off, min))
-       ldout(cct, 20) << "readahead already have min" << dendl;
-      else {
-       Context *onfinish = new C_Readahead(this, in);
-       int r = objectcacher->file_read(&in->oset, &in->layout, in->snapid,
-                                       off, l,
-                                       NULL, 0, onfinish);
-       if (r == 0) {
-         ldout(cct, 20) << "readahead initiated, c " << onfinish << dendl;
-         get_cap_ref(in, CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE);
-       } else {
-         ldout(cct, 20) << "readahead was no-op, already cached" << dendl;
-         delete onfinish;
-       }
-      }
-    }
   }
 
+  ldout(cct, 10) << " max_byes=" << conf->client_readahead_max_bytes
+                << " max_periods=" << conf->client_readahead_max_periods << dendl;
+
   // read (and possibly block)
   int r, rvalue = 0;
   Mutex flock("Client::_read_async flock");
@@ -6381,6 +6336,28 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl)
     // it was cached.
     delete onfinish;
   }
+
+  if(conf->client_readahead_max_bytes > 0) {
+    pair<uint64_t, uint64_t> readahead_extent = f->readahead.update(off, len, in->size);
+    if (readahead_extent.second > 0) {
+      ldout(cct, 20) << "readahead " << readahead_extent.first << "~" << readahead_extent.second
+                    << " (caller wants " << off << "~" << len << ")" << dendl;
+      Context *onfinish2 = new C_Readahead(this, f);
+      f->readahead.inc_pending();
+      int r2 = objectcacher->file_read(&in->oset, &in->layout, in->snapid,
+                                      readahead_extent.first, readahead_extent.second,
+                                      NULL, 0, onfinish2);
+      if (r2 == 0) {
+       ldout(cct, 20) << "readahead initiated, c " << onfinish2 << dendl;
+       get_cap_ref(in, CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE);
+      } else {
+       f->readahead.dec_pending();
+       ldout(cct, 20) << "readahead was no-op, already cached" << dendl;
+       delete onfinish2;
+      }
+    }
+  }
+
   return r;
 }
 
index 7cb0e89130b740310d08a69070ba5d9d58c82080..0cc653ea3b64da817dcdb046810c5b50924e5725 100644 (file)
@@ -562,14 +562,11 @@ private:
 
   struct C_Readahead : public Context {
     Client *client;
-    Inode *inode;
-    C_Readahead(Client *c, Inode *i)
+    Fh *f;
+    C_Readahead(Client *c, Fh *f)
       : client(c),
-       inode(i) { }
-    void finish(int r) {
-      lsubdout(client->cct, client, 20) << "C_Readahead on " << inode << dendl;
-      client->put_cap_ref(inode, CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE);
-    }
+       f(f) { }
+    void finish(int r);
   };
 
   int _read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl, bool *checkeof);
index 235b244b4010c1d222cc9f76b2cb47136a684606..a4050a78480cd882659ab97ac8229be0986f6cbc 100644 (file)
@@ -1,6 +1,7 @@
 #ifndef CEPH_CLIENT_FH_H
 #define CEPH_CLIENT_FH_H
 
+#include "common/Readahead.h"
 #include "include/types.h"
 
 struct Inode;
@@ -18,13 +19,10 @@ struct Fh {
   bool pos_locked;           // pos is currently in use
   list<Cond*> pos_waiters;   // waiters for pos
 
-  // readahead state
-  loff_t last_pos;
-  loff_t consec_read_bytes;
-  int nr_consec_read;
+  Readahead readahead;
 
   Fh() : inode(0), pos(0), mds(0), mode(0), flags(0), pos_locked(false),
-        last_pos(0), consec_read_bytes(0), nr_consec_read(0) {}
+      readahead() {}
 };