From: John Spray Date: Tue, 25 Mar 2014 13:30:50 +0000 (+0000) Subject: osdc: Revise Journaler format X-Git-Tag: v0.82~48^2~45 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3fa825c9ed78b17060d17abafada6c19bed857c1;p=ceph.git osdc: Revise Journaler format * Separate journal encoding/envelope format code (JournalStream) from I/O code (Journaler) * Add new sentinel and start_ptr fields to prefix and suffix of log events. * Add journal encoding version to journal header Signed-off-by: John Spray --- diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index ba4ca8dc4b92..28b10977244a 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -226,6 +226,8 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) init_headers(h); set_layout(&h.layout); + stream_format = h.stream_format; + journal_stream.set_format(h.stream_format); ldout(cct, 1) << "_finish_read_head " << h << ". probing for end of log (from " << write_pos << ")..." << dendl; C_ProbeEnd *fin = new C_ProbeEnd(this); @@ -429,6 +431,7 @@ void Journaler::_finish_flush(int r, uint64_t start, utime_t stamp) } + uint64_t Journaler::append_entry(bufferlist& bl) { assert(!readonly); @@ -457,12 +460,11 @@ uint64_t Journaler::append_entry(bufferlist& bl) } } - ldout(cct, 10) << "append_entry len " << bl.length() << " to " << write_pos << "~" << (bl.length() + sizeof(uint32_t)) << dendl; // append - ::encode(s, write_buf); - write_buf.claim_append(bl); - write_pos += sizeof(s) + s; + size_t wrote = journal_stream.write(bl, write_buf, write_pos); + ldout(cct, 10) << "append_entry len " << s << " to " << write_pos << "~" << wrote << dendl; + write_pos += wrote; // flush previous object? uint64_t su = get_layout_period(); @@ -858,6 +860,7 @@ void Journaler::_prefetch() } } + /* * _is_readable() - return true if next entry is ready. */ @@ -867,20 +870,14 @@ bool Journaler::_is_readable() if (read_pos == write_pos) return false; - // have enough for entry size? 
- uint32_t s = 0; - bufferlist::iterator p = read_buf.begin(); - if (read_buf.length() >= sizeof(s)) - ::decode(s, p); - - // entry and payload? - if (read_buf.length() >= sizeof(s) && - read_buf.length() >= sizeof(s) + s) - return true; // yep, next entry is ready. + // Check if the retrieve bytestream has enough for an entry + uint64_t need; + if (journal_stream.readable(read_buf, need)) { + return true; + } ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length() - << ", but need " << s + sizeof(s) - << " for next entry; fetch_len is " << fetch_len << dendl; + << ", but need " << need << " for next entry; fetch_len is " << fetch_len << dendl; // partial fragment at the end? if (received_pos == write_pos) { @@ -899,11 +896,9 @@ bool Journaler::_is_readable() return false; } - uint64_t need = sizeof(s) + s; if (need > fetch_len) { - temp_fetch_len = sizeof(s) + s; - ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len - << " for len " << s << " entry" << dendl; + temp_fetch_len = need; + ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len << dendl; } ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl; @@ -921,6 +916,7 @@ bool Journaler::is_readable() } + /* try_read_entry(bl) * read entry into bl if it's ready. * otherwise, do nothing. (well, we'll start fetching it for good measure.) 
@@ -931,28 +927,17 @@ bool Journaler::try_read_entry(bufferlist& bl) ldout(cct, 10) << "try_read_entry at " << read_pos << " not readable" << dendl; return false; } - - uint32_t s; - { - bufferlist::iterator p = read_buf.begin(); - ::decode(s, p); - } - assert(read_buf.length() >= sizeof(s) + s); - - ldout(cct, 10) << "try_read_entry at " << read_pos << " reading " - << read_pos << "~" << (sizeof(s)+s) << " (have " << read_buf.length() << ")" << dendl; - if (s == 0) { - ldout(cct, 0) << "try_read_entry got 0 len entry at offset " << read_pos << dendl; - error = -EINVAL; - return false; + uint64_t start_ptr; + size_t consumed = journal_stream.read(read_buf, bl, start_ptr); + if (stream_format >= JOURNAL_FORMAT_RESILIENT) { + assert(start_ptr == read_pos); } - // do it - assert(bl.length() == 0); - read_buf.splice(0, sizeof(s)); - read_buf.splice(0, s, &bl); - read_pos += sizeof(s) + s; + ldout(cct, 10) << "try_read_entry at " << read_pos << " read " + << read_pos << "~" << consumed << " (have " << read_buf.length() << ")" << dendl; + + read_pos += consumed; // prefetch? _prefetch(); @@ -1061,4 +1046,133 @@ void Journaler::handle_write_error(int r) } +/** + * Test whether the 'read_buf' byte stream has enough data to read + * an entry. Throws buffer::malformed_input if the sentinel is wrong. + * + * Sets 'need' to the number of bytes needed to advance (enough + * to get the next header if header was unavailable, or enough to get the whole + * next entry if the header was available but the body wasn't). + */ +bool JournalStream::readable(bufferlist &read_buf, uint64_t &need) +{ + // have enough for entry size? + uint32_t entry_size = 0; + uint64_t start_ptr = 0; + uint64_t entry_sentinel = 0; + bufferlist::iterator p = read_buf.begin(); + + // Do we have enough data to decode this format's entry header? 
+ if (format >= JOURNAL_FORMAT_RESILIENT) { + need = sizeof(entry_size) + sizeof(entry_sentinel); + } else { + need = sizeof(entry_size); + } + if (read_buf.length() >= need) { + if (format >= JOURNAL_FORMAT_RESILIENT) { + ::decode(entry_sentinel, p); + } + + ::decode(entry_size, p); + if (format >= JOURNAL_FORMAT_RESILIENT) { + if (entry_sentinel != sentinel) { + throw buffer::malformed_input("Invalid sentinel"); + } + } + } else { + return false; + } + + // Do we have enough data to decode an entry header and payload? + if (format >= JOURNAL_FORMAT_RESILIENT) { + need = sizeof(entry_size) + sizeof(entry_sentinel) + entry_size + sizeof(start_ptr); + } else { + need = sizeof(entry_size) + entry_size; + } + if (read_buf.length() >= need) { + return true; // No more bytes needed + } + + return false; +} + + +/** + * Consume one entry from a journal byte stream 'from', splicing a + * serialized LogEvent blob into 'entry'. + * + * 'entry' must be initially empty. 'from' must contain sufficient + * valid data (i.e. readable is true). + * + * 'start_ptr' will be set to the entry's start pointer, if the collection + * format provides it (0 otherwise). + * + * Note that the number of bytes consumed is *not* equal to the + * length of the blob returned: the former includes envelope data + * while the latter is just the inner LogEvent serialized. 
+ */ +size_t JournalStream::read(bufferlist &from, bufferlist &entry, uint64_t &start_ptr) +{ + assert(entry.length() == 0); + + uint64_t entry_sentinel = 0; + uint32_t entry_size; + { + bufferlist::iterator p = from.begin(); + if (format >= JOURNAL_FORMAT_RESILIENT) { + ::decode(entry_sentinel, p); + } + ::decode(entry_size, p); + p.advance(entry_size); + if (format >= JOURNAL_FORMAT_RESILIENT) { + ::decode(start_ptr, p); + } else { + start_ptr = 0; + } + } + if (format >= JOURNAL_FORMAT_RESILIENT) { + assert(entry_sentinel == sentinel); + } + assert(from.length() >= sizeof(entry_size) + entry_size + (format >= JOURNAL_FORMAT_RESILIENT ? sizeof(entry_sentinel) + sizeof(start_ptr) : 0)); + assert(entry_size != 0); + + if (format >= JOURNAL_FORMAT_RESILIENT) { + from.splice(0, sizeof(entry_sentinel)); + } + from.splice(0, sizeof(entry_size)); + from.splice(0, entry_size, &entry); + if (format >= JOURNAL_FORMAT_RESILIENT) { + from.splice(0, sizeof(start_ptr)); + } + + if (format >= JOURNAL_FORMAT_RESILIENT) { + return (sizeof(entry_sentinel) + sizeof(entry_size) + entry_size + sizeof(start_ptr)); + } else { + return (sizeof(entry_size) + entry_size); + } +} + + +/** + * Append one entry to 'to', wrapped in the envelope for the current format; returns the number of bytes appended (envelope included) + */ +size_t JournalStream::write(bufferlist &entry, bufferlist &to, uint64_t const &start_ptr) +{ + uint32_t const entry_size = entry.length(); + if (format >= JOURNAL_FORMAT_RESILIENT) { + ::encode(sentinel, to); + } + ::encode(entry_size, to); + to.claim_append(entry); + if (format >= JOURNAL_FORMAT_RESILIENT) { + ::encode(start_ptr, to); + } + + if (format >= JOURNAL_FORMAT_RESILIENT) { + return sizeof(sentinel) + sizeof(entry_size) + entry_size + sizeof(start_ptr); + } else { + return sizeof(entry_size) + entry_size; + } +} + // eof. 
diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index e3a57135b322..a5f9c217fac6 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -60,6 +60,39 @@ class CephContext; class Context; class PerfCounters; +typedef __u8 stream_format_t; +#define JOURNAL_FORMAT_LEGACY 0 +#define JOURNAL_FORMAT_RESILIENT 1 + + +/** + * Represents a collection of entries serialized in a byte stream. + * + * Each entry consists of: + * - a blob (used by the next level up as a serialized LogEvent) + * - a uint64_t (used by the next level up as a pointer to the start of the entry + * in the collection bytestream) + */ +class JournalStream +{ + stream_format_t format; + + public: + JournalStream() : format(JOURNAL_FORMAT_RESILIENT) {} + JournalStream(stream_format_t format_) : format(format_) {} + + void set_format(stream_format_t format_) {format = format_;} + + bool readable(bufferlist &bl, uint64_t &need); + size_t read(bufferlist &from, bufferlist &to, uint64_t &start_ptr); + size_t write(bufferlist &entry, bufferlist &to, uint64_t const &start_ptr); + + // A magic number for the start of journal entries, so that we can + // identify them in damaged journals. 
+ static const uint64_t sentinel = 0x3141592653589793; +}; + + class Journaler { public: CephContext *cct; @@ -70,16 +103,18 @@ public: uint64_t unused_field; uint64_t write_pos; string magic; - ceph_file_layout layout; + ceph_file_layout layout; ///< The mapping from byte stream offsets to RADOS objects + stream_format_t stream_format; ///< The encoding of LogEvents within the journal byte stream Header(const char *m="") : trimmed_pos(0), expire_pos(0), unused_field(0), write_pos(0), - magic(m) { + magic(m), + stream_format(JOURNAL_FORMAT_RESILIENT) { memset(&layout, 0, sizeof(layout)); } void encode(bufferlist &bl) const { - __u8 struct_v = 1; + __u8 struct_v = 2; ::encode(struct_v, bl); ::encode(magic, bl); ::encode(trimmed_pos, bl); @@ -87,6 +122,7 @@ public: ::encode(unused_field, bl); ::encode(write_pos, bl); ::encode(layout, bl); + ::encode(stream_format, bl); } void decode(bufferlist::iterator &bl) { __u8 struct_v; @@ -97,6 +133,11 @@ public: ::decode(unused_field, bl); ::decode(write_pos, bl); ::decode(layout, bl); + if (struct_v > 1) { + ::decode(stream_format, bl); + } else { + stream_format = JOURNAL_FORMAT_LEGACY; + } } void dump(Formatter *f) const { @@ -123,12 +164,16 @@ public: static void generate_test_instances(list &ls) { ls.push_back(new Header()); + ls.push_back(new Header()); ls.back()->trimmed_pos = 1; ls.back()->expire_pos = 2; ls.back()->unused_field = 3; ls.back()->write_pos = 4; ls.back()->magic = "magique"; + + ls.push_back(new Header()); + ls.back()->stream_format = JOURNAL_FORMAT_RESILIENT; } } last_written, last_committed; WRITE_CLASS_ENCODER(Header) @@ -139,6 +184,9 @@ private: int64_t pg_pool; bool readonly; ceph_file_layout layout; + uint32_t stream_format; + JournalStream journal_stream; + const char *magic; Objecter *objecter;