class Journaler::C_Read : public Context {
Journaler *ls;
+ uint64_t offset;
public:
- C_Read(Journaler *l) : ls(l) {}
- void finish(int r) { ls->_finish_read(r); }
+ bufferlist bl;
+ C_Read(Journaler *l, uint64_t o) : ls(l), offset(o) {}
+ void finish(int r) {
+ ls->_finish_read(r, offset, bl);
+ }
};
class Journaler::C_RetryRead : public Context {
}
};
-void Journaler::_finish_read(int r)
+void Journaler::_finish_read(int r, uint64_t offset, bufferlist& bl)
{
if (r < 0) {
dout(0) << "_finish_read got error " << r << dendl;
}
assert(r>=0);
- dout(10) << "_finish_read got " << received_pos << "~" << reading_buf.length() << dendl;
- received_pos += reading_buf.length();
- read_buf.claim_append(reading_buf);
- assert(received_pos <= requested_pos);
- dout(10) << "_finish_read read_buf now " << read_pos << "~" << read_buf.length()
- << ", read pointers " << read_pos << "/" << received_pos << "/" << requested_pos
- << dendl;
-
- if (_is_readable()) {
+ dout(10) << "_finish_read got " << offset << "~" << bl.length() << dendl;
+ prefetch_buf[offset].swap(bl);
+
+ _assimilate_prefetch();
+ _prefetch();
+}
+
+void Journaler::_assimilate_prefetch()
+{
+ bool any = false;
+ while (!prefetch_buf.empty()) {
+ map<uint64_t,bufferlist>::iterator p = prefetch_buf.begin();
+ if (p->first != received_pos) {
+ uint64_t gap = p->first - received_pos;
+ dout(10) << "_assimilate_prefetch gap of " << gap << " from received_pos " << received_pos
+ << " to first prefetched buffer " << p->first << dendl;
+ break;
+ }
+
+ dout(10) << "_assimilate_prefetch " << p->first << "~" << p->second.length() << dendl;
+ received_pos += p->second.length();
+ read_buf.claim_append(p->second);
+ assert(received_pos <= requested_pos);
+ prefetch_buf.erase(p);
+ any = true;
+ }
+
+ if (any)
+ dout(10) << "_assimilate_prefetch read_buf now " << read_pos << "~" << read_buf.length()
+ << ", read pointers " << read_pos << "/" << received_pos << "/" << requested_pos
+ << dendl;
+
+ if (any && _is_readable()) {
// readable!
dout(10) << "_finish_read now readable" << dendl;
if (on_readable) {
delete f;
}
}
-
- _prefetch();
}
/* NOTE: this could be slightly smarter... we could allow
<< ", read pointers " << read_pos << "/" << received_pos << "/" << (requested_pos+len)
<< dendl;
+ C_Read *c = new C_Read(this, requested_pos);
filer.read(ino, &layout, CEPH_NOSNAP,
- requested_pos, len, &reading_buf, 0,
- new C_Read(this));
+ requested_pos, len, &c->bl, 0, c);
requested_pos += len;
}
uint64_t requested_pos; // what we've requested from OSD.
uint64_t received_pos; // what we've received from OSD.
bufferlist read_buf; // read buffer. unused_field + read_buf.length() == prefetch_pos.
- bufferlist reading_buf; // what i'm reading into
+
+ map<uint64_t,bufferlist> prefetch_buf;
uint64_t fetch_len; // how much to read at a time
uint64_t temp_fetch_len;
// for wait_for_readable()
Context *on_readable;
- void _finish_read(int r); // we just read some (read completion callback)
+ void _finish_read(int r, uint64_t offset, bufferlist &bl); // read completion callback
+ void _assimilate_prefetch();
void _issue_read(int64_t len); // read some more
void _prefetch(); // maybe read ahead
class C_Read;