off_t Journaler::append_entry(bufferlist& bl, Context *onsync)
{
+ size_t s = bl.length();
+
+ if (!g_conf.journaler_allow_split_entries) {
+ // will we span a stripe boundary?
+ int p = inode.layout.stripe_size;
+ if (write_pos / p != (write_pos + bl.length() + sizeof(s)) / p) {
+ // yes.
+ // move write_pos forward.
+ off_t owp = write_pos;
+ write_pos += p;
+ write_pos -= (write_pos % p);
+
+ // pad with zeros.
+ bufferptr bp(write_pos - owp);
+ bp.zero();
+ assert(bp.length() >= 4);
+ write_buf.push_back(bp);
+
+ // now flush.
+ flush();
+
+ dout(12) << "append_entry skipped " << (write_pos-owp) << " bytes to " << write_pos << " to avoid spanning stripe boundary" << endl;
+ }
+ }
+
dout(10) << "append_entry len " << bl.length() << " to " << write_pos << "~" << (bl.length() + sizeof(size_t)) << endl;
// append
- size_t s = bl.length();
write_buf.append((char*)&s, sizeof(s));
write_buf.append(bl);
write_pos += sizeof(s) + s;
if (read_pos == write_pos) return false;
// have enough for entry size?
- size_t s;
- if (read_buf.length() < sizeof(s)) {
- if (!_is_reading())
- _issue_read(fetch_len);
- return false;
- }
- read_buf.copy(0, sizeof(s), (char*)&s);
+ size_t s = 0;
+ if (read_buf.length() >= sizeof(s))
+ read_buf.copy(0, sizeof(s), (char*)&s);
+
+ // do we have both the size header and the full payload of the next entry?
+ if (read_buf.length() >= sizeof(s) &&
+ read_buf.length() >= sizeof(s) + s)
+ return true; // yep, next entry is ready.
+
+ // not ready: a complete entry is not buffered yet.
- // have entirely of next entry?
- if (read_buf.length() < sizeof(s) + s) {
- if (!_is_reading())
- _issue_read(MAX(fetch_len, sizeof(s)+s-read_buf.length()));
+ // partial fragment at the end?
+ if (received_pos == write_pos) {
+ dout(10) << "is_readable() detected partial entry at tail, adjusting write_pos to " << read_pos << endl;
+ write_pos = flush_pos = ack_pos = read_pos;
+ assert(write_buf.length() == 0);
+
+ // truncate?
+ // FIXME: how much?
+
return false;
+ }
+
+ // start reading some more?
+ if (!_is_reading()) {
+ if (s)
+ fetch_len = MAX(fetch_len, sizeof(s)+s-read_buf.length());
+ _issue_read(fetch_len);
}
-
- // next entry is ready!
- return true;
+
+ return false;
}