From b51834ba6c0d08542d6ba103c38720936b6035e4 Mon Sep 17 00:00:00 2001 From: sageweil Date: Mon, 20 Nov 2006 22:16:34 +0000 Subject: [PATCH] Journaler ignores fragmented entries at end of journal. does not truncate them, though! git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@962 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/osdc/Journaler.cc | 68 ++++++++++++++++++++++++++++++++---------- 1 file changed, 53 insertions(+), 15 deletions(-) diff --git a/ceph/osdc/Journaler.cc b/ceph/osdc/Journaler.cc index 7a3dbf0823d10..ab1f7813258df 100644 --- a/ceph/osdc/Journaler.cc +++ b/ceph/osdc/Journaler.cc @@ -225,10 +225,34 @@ void Journaler::_finish_flush(int r, off_t start) off_t Journaler::append_entry(bufferlist& bl, Context *onsync) { + size_t s = bl.length(); + + if (!g_conf.journaler_allow_split_entries) { + // will we span a stripe boundary? + int p = inode.layout.stripe_size; + if (write_pos / p != (write_pos + bl.length() + sizeof(s)) / p) { + // yes. + // move write_pos forward. + off_t owp = write_pos; + write_pos += p; + write_pos -= (write_pos % p); + + // pad with zeros. + bufferptr bp(write_pos - owp); + bp.zero(); + assert(bp.length() >= 4); + write_buf.push_back(bp); + + // now flush. + flush(); + + dout(12) << "append_entry skipped " << (write_pos-owp) << " bytes to " << write_pos << " to avoid spanning stripe boundary" << endl; + } + } + dout(10) << "append_entry len " << bl.length() << " to " << write_pos << "~" << (bl.length() + sizeof(size_t)) << endl; // append - size_t s = bl.length(); write_buf.append((char*)&s, sizeof(s)); write_buf.append(bl); write_pos += sizeof(s) + s; @@ -439,23 +463,37 @@ bool Journaler::is_readable() if (read_pos == write_pos) return false; // have enough for entry size? - size_t s; - if (read_buf.length() < sizeof(s)) { - if (!_is_reading()) - _issue_read(fetch_len); - return false; - } - read_buf.copy(0, sizeof(s), (char*)&s); + size_t s = 0; + if (read_buf.length() >= sizeof(s)) + read_buf.copy(0, sizeof(s), (char*)&s); + + // entry and payload? + if (read_buf.length() >= sizeof(s) && + read_buf.length() >= sizeof(s) + s) + return true; // yep, next entry is ready. + + // darn it! - // have entirely of next entry? - if (read_buf.length() < sizeof(s) + s) { - if (!_is_reading()) - _issue_read(MAX(fetch_len, sizeof(s)+s-read_buf.length())); + // partial fragment at the end? + if (received_pos == write_pos) { + dout(10) << "is_readable() detected partial entry at tail, adjusting write_pos to " << read_pos << endl; + write_pos = flush_pos = ack_pos = read_pos; + assert(write_buf.length() == 0); + + // truncate? + // FIXME: how much? + return false; + } + + // start reading some more? + if (!_is_reading()) { + if (s) + fetch_len = MAX(fetch_len, sizeof(s)+s-read_buf.length()); + _issue_read(fetch_len); } - - // next entry is ready! - return true; + + return false; } -- 2.39.5