try {
_assimilate_prefetch();
- // Check the readable-ness of the buffer: do this head because it involves
- // decoding, and we would like to catch any decode errors here so that
- // external is_readable() callers don't have to.
- _is_readable();
} catch (const buffer::error &err) {
+ lderr(cct) << "_decode error from assimilate_prefetch" << dendl;
error = -EINVAL;
if (on_readable) {
C_OnFinisher *f = on_readable;
void Journaler::_assimilate_prefetch()
{
- bool was_readable = _is_readable();
+ bool was_readable = readable;
bool got_any = false;
while (!prefetch_buf.empty()) {
got_any = true;
}
- if (got_any)
+ if (got_any) {
ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~" << read_buf.length()
<< ", read pointers " << read_pos << "/" << received_pos << "/" << requested_pos
<< dendl;
- if ((got_any && !was_readable && _is_readable()) ||
- read_pos == write_pos) {
+ // Update readability (this will also hit any decode errors resulting
+ // from bad data)
+ readable = _is_readable();
+ }
+
+ if ((got_any && !was_readable && readable) || read_pos == write_pos) {
// readable!
ldout(cct, 10) << "_finish_read now readable (or at journal end)" << dendl;
if (on_readable) {
if (read_pos == write_pos)
return false;
- // Are we errored? Stop here to avoid risking
- // raising decode errors.
- if (error != 0) {
- return false;
- }
-
// Check if the retrieve bytestream has enough for an entry
uint64_t need;
if (journal_stream.readable(read_buf, &need)) {
{
Mutex::Locker l(lock);
- bool r = _is_readable();
+ if (error != 0) {
+ return false;
+ }
+
+ bool r = readable;
_prefetch();
return r;
}
/* try_read_entry(bl)
* read entry into bl if it's ready.
- * otherwise, do nothing. (well, we'll start fetching it for good measure.)
+ * otherwise, do nothing.
*/
bool Journaler::try_read_entry(bufferlist& bl)
{
Mutex::Locker l(lock);
- if (!_is_readable()) { // this may start a read.
+ if (!readable) {
ldout(cct, 10) << "try_read_entry at " << read_pos << " not readable" << dendl;
return false;
}
assert(start_ptr == read_pos);
}
} catch (const buffer::error &e) {
+ lderr(cct) << __func__ << ": decode error from journal_stream" << dendl;
error = -EINVAL;
return false;
}
<< read_pos << "~" << consumed << " (have " << read_buf.length() << ")" << dendl;
read_pos += consumed;
+ try {
+ // We were readable, we might not be any more
+ readable = _is_readable();
+ } catch (const buffer::error &e) {
+ lderr(cct) << __func__ << ": decode error from _is_readable" << dendl;
+ error = -EINVAL;
+ return false;
+ }
// prefetch?
_prefetch();
Mutex::Locker l(lock);
assert(on_readable == 0);
- if (!_is_readable()) {
+ if (!readable) {
ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable " << onreadable << dendl;
on_readable = wrap_finisher(onreadable);
} else {
uint64_t trimming_pos; // what we've requested to trim through
uint64_t trimmed_pos; // what has been trimmed
+ bool readable;
+
void _finish_trim(int r, uint64_t to);
class C_Trim;
friend class C_Trim;
read_pos(0), requested_pos(0), received_pos(0),
fetch_len(0), temp_fetch_len(0),
on_readable(0), on_write_error(NULL), called_write_error(false),
- expire_pos(0), trimming_pos(0), trimmed_pos(0)
+ expire_pos(0), trimming_pos(0), trimmed_pos(0), readable(false)
{
memset(&layout, 0, sizeof(layout));
}