From: Adam Crume Date: Tue, 16 Sep 2014 01:17:24 +0000 (-0700) Subject: client: Replace client readahead logic with Readahead X-Git-Tag: v0.88~61^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=95ee69989129750fddce6a3b5644238c4b88ed74;p=ceph.git client: Replace client readahead logic with Readahead Signed-off-by: Adam Crume --- diff --git a/src/client/Client.cc b/src/client/Client.cc index 667b700b004..bcc0176d790 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -5911,6 +5911,23 @@ Fh *Client::_create_fh(Inode *in, int flags, int cmode) << ccap_string(in->caps_issued()) << dendl; } + const md_config_t *conf = cct->_conf; + loff_t p = in->layout.fl_stripe_count * in->layout.fl_object_size; + f->readahead.set_trigger_requests(1); + f->readahead.set_min_readahead_size(conf->client_readahead_min); + uint64_t max_readahead = Readahead::NO_LIMIT; + if (conf->client_readahead_max_bytes) { + max_readahead = MIN(max_readahead, conf->client_readahead_max_bytes); + } + if (conf->client_readahead_max_periods) { + max_readahead = MIN(max_readahead, conf->client_readahead_max_periods * p); + } + f->readahead.set_max_readahead_size(max_readahead); + vector alignments; + alignments.push_back(p); + alignments.push_back(in->layout.fl_stripe_unit); + f->readahead.set_alignments(alignments); + return f; } @@ -6243,22 +6260,9 @@ retry: } success: - // adjust readahead state - if (f->last_pos != start_pos) { - f->nr_consec_read = f->consec_read_bytes = 0; - } else { - f->nr_consec_read++; - } - f->consec_read_bytes += bl->length(); - ldout(cct, 10) << "readahead nr_consec_read " << f->nr_consec_read - << " for " << f->consec_read_bytes << " bytes" - << " .. last_pos " << f->last_pos << " .. offset " - << start_pos << dendl; - - f->last_pos = start_pos + bl->length(); if (movepos) { // adjust fd pos - f->pos = f->last_pos; + f->pos = start_pos + bl->length(); unlock_fh_pos(f); } @@ -6287,11 +6291,16 @@ done: return r < 0 ? r : bl->length(); } +void Client::C_Readahead::finish(int r) { + lgeneric_subdout(client->cct, client, 20) << "client." << client->get_nodeid() << " " << "C_Readahead on " << f->inode << dendl; + client->put_cap_ref(f->inode, CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE); + f->readahead.dec_pending(); +} + int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl) { const md_config_t *conf = cct->_conf; Inode *in = f->inode; - bool readahead = true; ldout(cct, 10) << "_read_async " << *in << " " << off << "~" << len << dendl; @@ -6300,65 +6309,11 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl) return 0; if (off + len > in->size) { len = in->size - off; - readahead = false; - } - - ldout(cct, 10) << "readahead=" << readahead << " nr_consec=" << f->nr_consec_read - << " max_byes=" << conf->client_readahead_max_bytes - << " max_periods=" << conf->client_readahead_max_periods << dendl; - - // readahead? - if (readahead && - f->nr_consec_read && - (conf->client_readahead_max_bytes || - conf->client_readahead_max_periods)) { - loff_t l = f->consec_read_bytes * 2; - if (conf->client_readahead_min) - l = MAX(l, conf->client_readahead_min); - if (conf->client_readahead_max_bytes) - l = MIN(l, conf->client_readahead_max_bytes); - loff_t p = in->layout.fl_stripe_count * in->layout.fl_object_size; - if (conf->client_readahead_max_periods) - l = MIN(l, conf->client_readahead_max_periods * p); - - if (l >= 2*p) - // align large readahead with period - l -= (off+l) % p; - else { - // align readahead with stripe unit if we cross su boundary - int su = in->layout.fl_stripe_unit; - if ((off+l)/su != off/su) l -= (off+l) % su; - } - - // don't read past end of file - if (off+l > in->size) - l = in->size - off; - - loff_t min = MIN((loff_t)len, l/2); - - ldout(cct, 20) << "readahead " << f->nr_consec_read << " reads " - << f->consec_read_bytes << " bytes ... readahead " << off << "~" << l - << " min " << min - << " (caller wants " << off << "~" << len << ")" << dendl; - if (l > (loff_t)len) { - if (objectcacher->file_is_cached(&in->oset, &in->layout, in->snapid, off, min)) - ldout(cct, 20) << "readahead already have min" << dendl; - else { - Context *onfinish = new C_Readahead(this, in); - int r = objectcacher->file_read(&in->oset, &in->layout, in->snapid, - off, l, - NULL, 0, onfinish); - if (r == 0) { - ldout(cct, 20) << "readahead initiated, c " << onfinish << dendl; - get_cap_ref(in, CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE); - } else { - ldout(cct, 20) << "readahead was no-op, already cached" << dendl; - delete onfinish; - } - } - } } + ldout(cct, 10) << " max_byes=" << conf->client_readahead_max_bytes + << " max_periods=" << conf->client_readahead_max_periods << dendl; + // read (and possibly block) int r, rvalue = 0; Mutex flock("Client::_read_async flock"); @@ -6381,6 +6336,28 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl) // it was cached. delete onfinish; } + + if(conf->client_readahead_max_bytes > 0) { + pair readahead_extent = f->readahead.update(off, len, in->size); + if (readahead_extent.second > 0) { + ldout(cct, 20) << "readahead " << readahead_extent.first << "~" << readahead_extent.second + << " (caller wants " << off << "~" << len << ")" << dendl; + Context *onfinish2 = new C_Readahead(this, f); + f->readahead.inc_pending(); + int r2 = objectcacher->file_read(&in->oset, &in->layout, in->snapid, + readahead_extent.first, readahead_extent.second, + NULL, 0, onfinish2); + if (r2 == 0) { + ldout(cct, 20) << "readahead initiated, c " << onfinish2 << dendl; + get_cap_ref(in, CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE); + } else { + f->readahead.dec_pending(); + ldout(cct, 20) << "readahead was no-op, already cached" << dendl; + delete onfinish2; + } + } + } + return r; } diff --git a/src/client/Client.h b/src/client/Client.h index 7cb0e89130b..0cc653ea3b6 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -562,14 +562,11 @@ private: struct C_Readahead : public Context { Client *client; - Inode *inode; - C_Readahead(Client *c, Inode *i) + Fh *f; + C_Readahead(Client *c, Fh *f) : client(c), - inode(i) { } - void finish(int r) { - lsubdout(client->cct, client, 20) << "C_Readahead on " << inode << dendl; - client->put_cap_ref(inode, CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE); - } + f(f) { } + void finish(int r); }; int _read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl, bool *checkeof); diff --git a/src/client/Fh.h b/src/client/Fh.h index 235b244b401..a4050a78480 100644 --- a/src/client/Fh.h +++ b/src/client/Fh.h @@ -1,6 +1,7 @@ #ifndef CEPH_CLIENT_FH_H #define CEPH_CLIENT_FH_H +#include "common/Readahead.h" #include "include/types.h" struct Inode; @@ -18,13 +19,10 @@ struct Fh { bool pos_locked; // pos is currently in use list pos_waiters; // waiters for pos - // readahead state - loff_t last_pos; - loff_t consec_read_bytes; - int nr_consec_read; + Readahead readahead; Fh() : inode(0), pos(0), mds(0), mode(0), flags(0), pos_locked(false), - last_pos(0), consec_read_bytes(0), nr_consec_read(0) {} + readahead() {} };