// prefetch intelligently.
// (watch out, this is big if you use big objects or weird striping)
- fetch_len = layout.fl_stripe_count * layout.fl_object_size * g_conf.journaler_prefetch_periods;
+ uint64_t periods = g_conf.journaler_prefetch_periods;
+ if (periods < 2)
+ periods = 2; // we need at least 2 periods to make progress.
+ fetch_len = layout.fl_stripe_count * layout.fl_object_size * periods;
prefetch_from = fetch_len / 2;
}
Journaler *ls;
public:
C_RetryRead(Journaler *l) : ls(l) {}
- void finish(int r) { ls->is_readable(); } // this'll kickstart.
+ void finish(int r) {
+ // kickstart.
+ ls->_prefetch();
+ }
};
void Journaler::_finish_read(int r)
<< ", read pointers " << read_pos << "/" << received_pos << "/" << requested_pos
<< dendl;
- if (is_readable()) { // NOTE: this check may read more
+ if (_is_readable()) {
// readable!
dout(10) << "_finish_read now readable" << dendl;
if (on_readable) {
}
}
- // prefetch?
_prefetch();
}
// make sure we're fully flushed
_do_flush();
- if (_is_reading()) {
- dout(10) << "_issue_read " << len << " waiting, already reading "
- << received_pos << "~" << (requested_pos-received_pos) << dendl;
- return;
- }
- assert(requested_pos == received_pos);
-
// stuck at ack_pos?
+ // (this is needed if we are reading the tail of a journal we are also writing to)
assert(requested_pos <= ack_pos);
if (requested_pos == ack_pos) {
dout(10) << "_issue_read requested_pos = ack_pos = " << ack_pos << ", waiting" << dendl;
void Journaler::_prefetch()
{
- // prefetch?
- uint64_t left = requested_pos - read_pos;
- if (left <= prefetch_from && // should read more,
- !_is_reading() && // and not reading anything right now
- write_pos > requested_pos) { // there's something more to read...
- dout(10) << "_prefetch only " << left << " < " << prefetch_from
- << ", prefetching " << dendl;
- _issue_read(fetch_len);
+ // prefetch
+ uint64_t pf;
+ if (temp_fetch_len) {
+ dout(10) << "_prefetch temp_fetch_len " << temp_fetch_len << dendl;
+ pf = temp_fetch_len;
+ temp_fetch_len = 0;
+ } else {
+ pf = fetch_len;
+ }
+
+ uint64_t raw_target = read_pos + pf;
+
+ // only read full log segments
+ uint64_t period = get_layout_period();
+ uint64_t target = raw_target - (raw_target % period);
+
+ // don't read past the log tail
+ if (target > write_pos)
+ target = write_pos;
+
+ if (requested_pos < target) {
+ uint64_t len = target - requested_pos;
+ dout(10) << "_prefetch " << pf << " requested_pos " << requested_pos << " < target " << target
+ << " (" << raw_target << "), prefetching " << len << dendl;
+ _issue_read(len);
}
}
* return true if next entry is ready.
* kickstart read as necessary.
*/
-bool Journaler::is_readable()
+bool Journaler::_is_readable()
{
// anything to read?
- if (read_pos == write_pos) return false;
+ if (read_pos == write_pos)
+ return false;
// have enough for entry size?
uint32_t s = 0;
junk_tail_pos = write_pos; // note old tail
write_pos = flush_pos = ack_pos = safe_pos = read_pos;
assert(write_buf.length() == 0);
-
+
// truncate?
// FIXME: how much?
-
return false;
- }
-
- // start reading some more?
- if (!_is_reading()) {
- if (s)
- fetch_len = MAX(fetch_len, (sizeof(s)+s-read_buf.length()));
- _issue_read(fetch_len);
}
+ uint64_t need = (sizeof(s)+s-read_buf.length());
+ if (need > fetch_len) {
+ dout(10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
+ << " for len " << s << " entry" << dendl;
+ temp_fetch_len = need;
+ }
return false;
}
+bool Journaler::is_readable()
+{
+ bool r =_is_readable();
+ _prefetch();
+ return r;
+}
+
bool Journaler::truncate_tail_junk(Context *c)
{
if(readonly)
void Journaler::wait_for_readable(Context *onreadable)
{
dout(10) << "wait_for_readable at " << read_pos << " onreadable " << onreadable << dendl;
- assert(!is_readable());
+ assert(!_is_readable());
assert(on_readable == 0);
on_readable = onreadable;
}
void Journaler::trim()
{
assert(!readonly);
- uint64_t period = layout.fl_stripe_count * layout.fl_object_size;
+ uint64_t period = get_layout_period();
uint64_t trim_to = last_committed.expire_pos;
trim_to -= trim_to % period;
bufferlist reading_buf; // what i'm reading into
uint64_t fetch_len; // how much to read at a time
+ uint64_t temp_fetch_len;
uint64_t prefetch_from; // how far from end do we read next chunk
int64_t junk_tail_pos; // for truncate
// for wait_for_readable()
Context *on_readable;
- bool _is_reading() {
- return requested_pos > received_pos;
- }
void _finish_read(int r); // we just read some (read completion callback)
void _issue_read(int64_t len); // read some more
void _prefetch(); // maybe read ahead
state(STATE_UNDEF), error(0),
write_pos(0), flush_pos(0), ack_pos(0), safe_pos(0),
read_pos(0), requested_pos(0), received_pos(0),
- fetch_len(0), prefetch_from(0),
+ fetch_len(0), temp_fetch_len(0), prefetch_from(0),
junk_tail_pos(0),
read_bl(0), on_read_finish(0), on_readable(0),
expire_pos(0), trimming_pos(0), trimmed_pos(0)
read_pos = requested_pos = received_pos = p;
read_buf.clear();
}
+
+ bool _is_readable();
bool is_readable();
bool try_read_entry(bufferlist& bl);
void wait_for_readable(Context *onfinish);