From: Sage Weil Date: Fri, 25 Mar 2011 16:42:47 +0000 (-0700) Subject: journaler: fix prefetch to handle multiple in-flight reads X-Git-Tag: v0.27~245^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c9d0edebd025bd43d07c7a2a128ae2a8b8539e09;p=ceph.git journaler: fix prefetch to handle multiple in-flight reads If we issue multiple reads, they can come back in any order. Signed-off-by: Sage Weil --- diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index f0ea07683418..c9f9d3d57146 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -628,9 +628,13 @@ void Journaler::flush(Context *onsync, Context *onsafe, bool add_ack_barrier) class Journaler::C_Read : public Context { Journaler *ls; + uint64_t offset; public: - C_Read(Journaler *l) : ls(l) {} - void finish(int r) { ls->_finish_read(r); } + bufferlist bl; + C_Read(Journaler *l, uint64_t o) : ls(l), offset(o) {} + void finish(int r) { + ls->_finish_read(r, offset, bl); + } }; class Journaler::C_RetryRead : public Context { @@ -643,7 +647,7 @@ public: } }; -void Journaler::_finish_read(int r) +void Journaler::_finish_read(int r, uint64_t offset, bufferlist& bl) { if (r < 0) { dout(0) << "_finish_read got error " << r << dendl; @@ -658,15 +662,39 @@ void Journaler::_finish_read(int r) } assert(r>=0); - dout(10) << "_finish_read got " << received_pos << "~" << reading_buf.length() << dendl; - received_pos += reading_buf.length(); - read_buf.claim_append(reading_buf); - assert(received_pos <= requested_pos); - dout(10) << "_finish_read read_buf now " << read_pos << "~" << read_buf.length() - << ", read pointers " << read_pos << "/" << received_pos << "/" << requested_pos - << dendl; - - if (_is_readable()) { + dout(10) << "_finish_read got " << offset << "~" << bl.length() << dendl; + prefetch_buf[offset].swap(bl); + + _assimilate_prefetch(); + _prefetch(); +} + +void Journaler::_assimilate_prefetch() +{ + bool any = false; + while (!prefetch_buf.empty()) { + map::iterator p = prefetch_buf.begin(); + if (p->first != received_pos) { + uint64_t gap = p->first - received_pos; + dout(10) << "_assimilate_prefetch gap of " << gap << " from received_pos " << received_pos + << " to first prefetched buffer " << p->first << dendl; + break; + } + + dout(10) << "_assimilate_prefetch " << p->first << "~" << p->second.length() << dendl; + received_pos += p->second.length(); + read_buf.claim_append(p->second); + assert(received_pos <= requested_pos); + prefetch_buf.erase(p); + any = true; + } + + if (any) + dout(10) << "_assimilate_prefetch read_buf now " << read_pos << "~" << read_buf.length() + << ", read pointers " << read_pos << "/" << received_pos << "/" << requested_pos + << dendl; + + if (any && _is_readable()) { // readable! dout(10) << "_finish_read now readable" << dendl; if (on_readable) { @@ -690,8 +718,6 @@ void Journaler::_finish_read(int r) delete f; } } - - _prefetch(); } /* NOTE: this could be slightly smarter... we could allow @@ -728,9 +754,9 @@ void Journaler::_issue_read(int64_t len) << ", read pointers " << read_pos << "/" << received_pos << "/" << (requested_pos+len) << dendl; + C_Read *c = new C_Read(this, requested_pos); filer.read(ino, &layout, CEPH_NOSNAP, - requested_pos, len, &reading_buf, 0, - new C_Read(this)); + requested_pos, len, &c->bl, 0, c); requested_pos += len; } diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index d751e44b0c26..29d94a4481c4 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -185,7 +185,8 @@ private: uint64_t requested_pos; // what we've requested from OSD. uint64_t received_pos; // what we've received from OSD. bufferlist read_buf; // read buffer. unused_field + read_buf.length() == prefetch_pos. - bufferlist reading_buf; // what i'm reading into + + map prefetch_buf; uint64_t fetch_len; // how much to read at a time uint64_t temp_fetch_len; @@ -199,7 +200,8 @@ private: // for wait_for_readable() Context *on_readable; - void _finish_read(int r); // we just read some (read completion callback) + void _finish_read(int r, uint64_t offset, bufferlist &bl); // read completion callback + void _assimilate_prefetch(); void _issue_read(int64_t len); // read some more void _prefetch(); // maybe read ahead class C_Read;