<< write_obj << " flo " << flush_obj << ")" << dendl;
_do_flush(write_buf.length() - write_off);
- // if _do_flush() skips flushing some data, it does not update next_safe_pos.
+ // if _do_flush() skips flushing some data, it does do a best effort to
+ // update next_safe_pos.
if (write_buf.length() > 0 &&
write_buf.length() <= wrote) { // the unflushed data are within this entry
// set next_safe_pos to end of previous entry
next_safe_pos = write_pos;
} else {
write_buf.splice(0, len, &write_bl);
+ // Keys of waitfor_safe map are journal entry boundaries.
+ // Try finding a journal entry that we are actually flushing
+ // and set next_safe_pos to end of it. This is best effort.
+ // The one we found may not be the lastest flushing entry.
+ auto p = waitfor_safe.lower_bound(flush_pos + len);
+ if (p != waitfor_safe.end()) {
+ if (p->first > flush_pos + len && p != waitfor_safe.begin())
+ --p;
+ if (p->first <= flush_pos + len && p->first > next_safe_pos)
+ next_safe_pos = p->first;
+ }
}
filer.write(ino, &layout, snapc,
if (pending_safe.empty()) {
_flush(NULL);
}
- waitfor_safe[flush_pos].push_back(new C_RetryRead(this));
+
+ // Make sure keys of waitfor_safe map are journal entry boundaries.
+ // The key we used here is either next_safe_pos or old value of
+ // next_safe_pos. next_safe_pos is always set to journal entry
+ // boundary.
+ auto p = pending_safe.rbegin();
+ if (p != pending_safe.rend())
+ waitfor_safe[p->second].push_back(new C_RetryRead(this));
+ else
+ waitfor_safe[next_safe_pos].push_back(new C_RetryRead(this));
return;
}
// adjust write_pos
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
assert(write_buf.length() == 0);
+ assert(waitfor_safe.empty());
// reset read state
requested_pos = received_pos = read_pos;