journal_stream.set_format(sf);
_set_layout(l);
- prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos =
- read_pos = requested_pos = received_pos =
- expire_pos = trimming_pos = trimmed_pos = layout.get_period();
+ prezeroing_pos = prezero_pos = write_pos = flush_pos =
+ safe_pos = read_pos = requested_pos = received_pos =
+ expire_pos = trimming_pos = trimmed_pos =
+ next_safe_pos = layout.get_period();
ldout(cct, 1) << "created blank journal at inode 0x" << std::hex << ino
<< std::dec << ", format=" << stream_format << dendl;
finish->complete(-EINVAL);
return;
}
- prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos
+ prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos
= h.write_pos;
expire_pos = h.expire_pos;
trimmed_pos = trimming_pos = h.trimmed_pos;
return;
}
- prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos
+ prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos
= h.write_pos;
read_pos = requested_pos = received_pos = expire_pos = h.expire_pos;
trimmed_pos = trimming_pos = h.trimmed_pos;
ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
<< " (header had " << write_pos << ")."
<< dendl;
- prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = new_end;
+ prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = new_end;
state = STATE_ACTIVE;
onfinish->complete(r);
}
state = STATE_ACTIVE;
- prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = end;
+ prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = end;
out:
// done.
return;
}
- assert(start >= safe_pos);
assert(start < flush_pos);
// calc latency?
}
// adjust safe_pos
- assert(pending_safe.count(start));
- pending_safe.erase(start);
+ auto it = pending_safe.find(start);
+ assert(it != pending_safe.end());
+ pending_safe.erase(it);
if (pending_safe.empty())
- safe_pos = flush_pos;
+ safe_pos = next_safe_pos;
else
- safe_pos = *pending_safe.begin();
+ safe_pos = pending_safe.begin()->second;
ldout(cct, 10) << "_finish_flush safe from " << start
<< ", pending_safe " << pending_safe
ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro "
<< write_obj << " flo " << flush_obj << ")" << dendl;
_do_flush(write_buf.length() - write_off);
+ if (write_off) {
+ // current entry isn't being flushed, set next_safe_pos to the end of previous entry
+ next_safe_pos = write_pos - wrote;
+ }
}
return write_pos;
SnapContext snapc;
Context *onsafe = new C_Flush(this, flush_pos, now); // on COMMIT
- pending_safe.insert(flush_pos);
+ pending_safe[flush_pos] = next_safe_pos;
bufferlist write_bl;
// adjust pointers
if (len == write_buf.length()) {
write_bl.swap(write_buf);
+ next_safe_pos = write_pos;
} else {
write_buf.splice(0, len, &write_bl);
}
ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos
<< ", waiting" << dendl;
assert(write_pos > requested_pos);
- if (flush_pos == safe_pos) {
+ if (pending_safe.empty()) {
_flush(NULL);
}
assert(flush_pos > safe_pos);
"adjusting write_pos to " << read_pos << dendl;
// adjust write_pos
- prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = read_pos;
+ prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
assert(write_buf.length() == 0);
// reset read state
uint64_t flush_pos; ///< where we will flush. if
/// write_pos>flush_pos, we're buffering writes.
uint64_t safe_pos; ///< what has been committed safely to disk.
+
+ uint64_t next_safe_pos; /// start postion of the first entry that isn't
+ /// being fully flushed. If we don't flush any
+ // partial entry, it's equal to flush_pos.
+
bufferlist write_buf; ///< write buffer. flush_pos +
/// write_buf.length() == write_pos.
bool waiting_for_zero;
interval_set<uint64_t> pending_zero; // non-contig bits we've zeroed
- std::set<uint64_t> pending_safe;
+ std::map<uint64_t, uint64_t> pending_safe; // flush_pos -> safe_pos
// when safe through given offset
std::map<uint64_t, std::list<Context*> > waitfor_safe;
objecter(obj), filer(objecter, f), logger(l), logger_key_lat(lkey),
timer(tim), delay_flush_event(0),
state(STATE_UNDEF), error(0),
- prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0), safe_pos(0),
+ prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0),
+ safe_pos(0), next_safe_pos(0),
write_buf_throttle(cct, "write_buf_throttle", UINT_MAX - (UINT_MAX >> 3)),
waiting_for_zero(false),
read_pos(0), requested_pos(0), received_pos(0),
write_pos = 0;
flush_pos = 0;
safe_pos = 0;
+ next_safe_pos = 0;
read_pos = 0;
requested_pos = 0;
received_pos = 0;
void set_writeable();
void set_write_pos(int64_t p) {
lock_guard l(lock);
- prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p;
+ prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = p;
}
void set_read_pos(int64_t p) {
lock_guard l(lock);