<< ccap_string(in->caps_issued()) << dendl;
}
+ const md_config_t *conf = cct->_conf;
+ loff_t p = in->layout.fl_stripe_count * in->layout.fl_object_size;
+ f->readahead.set_trigger_requests(1);
+ f->readahead.set_min_readahead_size(conf->client_readahead_min);
+ uint64_t max_readahead = Readahead::NO_LIMIT;
+ if (conf->client_readahead_max_bytes) {
+ max_readahead = MIN(max_readahead, conf->client_readahead_max_bytes);
+ }
+ if (conf->client_readahead_max_periods) {
+ max_readahead = MIN(max_readahead, conf->client_readahead_max_periods * p);
+ }
+ f->readahead.set_max_readahead_size(max_readahead);
+ vector<uint64_t> alignments;
+ alignments.push_back(p);
+ alignments.push_back(in->layout.fl_stripe_unit);
+ f->readahead.set_alignments(alignments);
+
return f;
}
}
success:
- // adjust readahead state
- if (f->last_pos != start_pos) {
- f->nr_consec_read = f->consec_read_bytes = 0;
- } else {
- f->nr_consec_read++;
- }
- f->consec_read_bytes += bl->length();
- ldout(cct, 10) << "readahead nr_consec_read " << f->nr_consec_read
- << " for " << f->consec_read_bytes << " bytes"
- << " .. last_pos " << f->last_pos << " .. offset "
- << start_pos << dendl;
-
- f->last_pos = start_pos + bl->length();
if (movepos) {
// adjust fd pos
- f->pos = f->last_pos;
+ f->pos = start_pos + bl->length();
unlock_fh_pos(f);
}
return r < 0 ? r : bl->length();
}
+void Client::C_Readahead::finish(int r) {
+ lgeneric_subdout(client->cct, client, 20) << "client." << client->get_nodeid() << " " << "C_Readahead on " << f->inode << dendl;
+ client->put_cap_ref(f->inode, CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE);
+ f->readahead.dec_pending();
+}
+
int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl)
{
const md_config_t *conf = cct->_conf;
Inode *in = f->inode;
- bool readahead = true;
ldout(cct, 10) << "_read_async " << *in << " " << off << "~" << len << dendl;
return 0;
if (off + len > in->size) {
len = in->size - off;
- readahead = false;
- }
-
- ldout(cct, 10) << "readahead=" << readahead << " nr_consec=" << f->nr_consec_read
- << " max_byes=" << conf->client_readahead_max_bytes
- << " max_periods=" << conf->client_readahead_max_periods << dendl;
-
- // readahead?
- if (readahead &&
- f->nr_consec_read &&
- (conf->client_readahead_max_bytes ||
- conf->client_readahead_max_periods)) {
- loff_t l = f->consec_read_bytes * 2;
- if (conf->client_readahead_min)
- l = MAX(l, conf->client_readahead_min);
- if (conf->client_readahead_max_bytes)
- l = MIN(l, conf->client_readahead_max_bytes);
- loff_t p = in->layout.fl_stripe_count * in->layout.fl_object_size;
- if (conf->client_readahead_max_periods)
- l = MIN(l, conf->client_readahead_max_periods * p);
-
- if (l >= 2*p)
- // align large readahead with period
- l -= (off+l) % p;
- else {
- // align readahead with stripe unit if we cross su boundary
- int su = in->layout.fl_stripe_unit;
- if ((off+l)/su != off/su) l -= (off+l) % su;
- }
-
- // don't read past end of file
- if (off+l > in->size)
- l = in->size - off;
-
- loff_t min = MIN((loff_t)len, l/2);
-
- ldout(cct, 20) << "readahead " << f->nr_consec_read << " reads "
- << f->consec_read_bytes << " bytes ... readahead " << off << "~" << l
- << " min " << min
- << " (caller wants " << off << "~" << len << ")" << dendl;
- if (l > (loff_t)len) {
- if (objectcacher->file_is_cached(&in->oset, &in->layout, in->snapid, off, min))
- ldout(cct, 20) << "readahead already have min" << dendl;
- else {
- Context *onfinish = new C_Readahead(this, in);
- int r = objectcacher->file_read(&in->oset, &in->layout, in->snapid,
- off, l,
- NULL, 0, onfinish);
- if (r == 0) {
- ldout(cct, 20) << "readahead initiated, c " << onfinish << dendl;
- get_cap_ref(in, CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE);
- } else {
- ldout(cct, 20) << "readahead was no-op, already cached" << dendl;
- delete onfinish;
- }
- }
- }
}
+ ldout(cct, 10) << " max_byes=" << conf->client_readahead_max_bytes
+ << " max_periods=" << conf->client_readahead_max_periods << dendl;
+
// read (and possibly block)
int r, rvalue = 0;
Mutex flock("Client::_read_async flock");
// it was cached.
delete onfinish;
}
+
+ if(conf->client_readahead_max_bytes > 0) {
+ pair<uint64_t, uint64_t> readahead_extent = f->readahead.update(off, len, in->size);
+ if (readahead_extent.second > 0) {
+ ldout(cct, 20) << "readahead " << readahead_extent.first << "~" << readahead_extent.second
+ << " (caller wants " << off << "~" << len << ")" << dendl;
+ Context *onfinish2 = new C_Readahead(this, f);
+ f->readahead.inc_pending();
+ int r2 = objectcacher->file_read(&in->oset, &in->layout, in->snapid,
+ readahead_extent.first, readahead_extent.second,
+ NULL, 0, onfinish2);
+ if (r2 == 0) {
+ ldout(cct, 20) << "readahead initiated, c " << onfinish2 << dendl;
+ get_cap_ref(in, CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE);
+ } else {
+ f->readahead.dec_pending();
+ ldout(cct, 20) << "readahead was no-op, already cached" << dendl;
+ delete onfinish2;
+ }
+ }
+ }
+
return r;
}