write_buf.claim_append(bl);
write_pos += sizeof(s) + s;
+ // flush previous object?
+ int su = ceph_file_layout_su(layout);
+ int write_off = write_pos % su;
+ int write_obj = write_pos / su;
+ int flush_obj = flush_pos / su;
+ if (write_obj != flush_obj) {
+ dout(10) << " flushing completed object(s) (su " << su << " wro " << write_obj << " flo " << flush_obj << ")" << dendl;
+ _do_flush(write_buf.length() - write_off);
+ }
+
return write_pos;
}
-void Journaler::_do_flush()
+void Journaler::_do_flush(unsigned amount)
{
if (write_pos == flush_pos) return;
assert(write_pos > flush_pos);
// flush
unsigned len = write_pos - flush_pos;
assert(len == write_buf.length());
+ if (amount && amount < len)
+ len = amount;
dout(10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;
// submit write for anything pending
Context *onsafe = new C_Flush(this, flush_pos, now, true); // on COMMIT
pending_safe.insert(flush_pos);
+ bufferlist write_bl;
+
+ // adjust pointers
+ if (len == write_buf.length()) {
+ write_bl.swap(write_buf);
+ } else {
+ write_buf.splice(0, len, &write_bl);
+ }
+ flush_pos += len;
+ assert(write_buf.length() == write_pos - flush_pos);
+
filer.write(ino, &layout, snapc,
- flush_pos, len, write_buf, g_clock.now(),
+ flush_pos, len, write_bl, g_clock.now(),
CEPH_OSD_FLAG_INCLOCK_FAIL,
onack, onsafe);
-
-
- // adjust pointers
- flush_pos = write_pos;
- write_buf.clear();
-
+
dout(10) << "_do_flush write pointers now at " << write_pos << "/" << flush_pos << "/" << ack_pos << "/" << safe_pos << dendl;
}