init_headers(h);
set_layout(&h.layout);
+ stream_format = h.stream_format;
+ journal_stream.set_format(h.stream_format);
ldout(cct, 1) << "_finish_read_head " << h << ". probing for end of log (from " << write_pos << ")..." << dendl;
C_ProbeEnd *fin = new C_ProbeEnd(this);
}
+
uint64_t Journaler::append_entry(bufferlist& bl)
{
assert(!readonly);
}
}
- ldout(cct, 10) << "append_entry len " << bl.length() << " to " << write_pos << "~" << (bl.length() + sizeof(uint32_t)) << dendl;
// append
- ::encode(s, write_buf);
- write_buf.claim_append(bl);
- write_pos += sizeof(s) + s;
+ size_t wrote = journal_stream.write(bl, write_buf, write_pos);
+ ldout(cct, 10) << "append_entry len " << s << " to " << write_pos << "~" << wrote << dendl;
+ write_pos += wrote;
// flush previous object?
uint64_t su = get_layout_period();
}
}
+
/*
* _is_readable() - return true if next entry is ready.
*/
if (read_pos == write_pos)
return false;
- // have enough for entry size?
- uint32_t s = 0;
- bufferlist::iterator p = read_buf.begin();
- if (read_buf.length() >= sizeof(s))
- ::decode(s, p);
-
- // entry and payload?
- if (read_buf.length() >= sizeof(s) &&
- read_buf.length() >= sizeof(s) + s)
- return true; // yep, next entry is ready.
+ // Check if the retrieve bytestream has enough for an entry
+ uint64_t need;
+ if (journal_stream.readable(read_buf, need)) {
+ return true;
+ }
ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length()
- << ", but need " << s + sizeof(s)
- << " for next entry; fetch_len is " << fetch_len << dendl;
+ << ", but need " << need << " for next entry; fetch_len is " << fetch_len << dendl;
// partial fragment at the end?
if (received_pos == write_pos) {
return false;
}
- uint64_t need = sizeof(s) + s;
if (need > fetch_len) {
- temp_fetch_len = sizeof(s) + s;
- ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
- << " for len " << s << " entry" << dendl;
+ temp_fetch_len = need;
+ ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len << dendl;
}
ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl;
}
+
/* try_read_entry(bl)
* read entry into bl if it's ready.
* otherwise, do nothing. (well, we'll start fetching it for good measure.)
ldout(cct, 10) << "try_read_entry at " << read_pos << " not readable" << dendl;
return false;
}
-
- uint32_t s;
- {
- bufferlist::iterator p = read_buf.begin();
- ::decode(s, p);
- }
- assert(read_buf.length() >= sizeof(s) + s);
-
- ldout(cct, 10) << "try_read_entry at " << read_pos << " reading "
- << read_pos << "~" << (sizeof(s)+s) << " (have " << read_buf.length() << ")" << dendl;
- if (s == 0) {
- ldout(cct, 0) << "try_read_entry got 0 len entry at offset " << read_pos << dendl;
- error = -EINVAL;
- return false;
+ uint64_t start_ptr;
+ size_t consumed = journal_stream.read(read_buf, bl, start_ptr);
+ if (stream_format >= JOURNAL_FORMAT_RESILIENT) {
+ assert(start_ptr == read_pos);
}
- // do it
- assert(bl.length() == 0);
- read_buf.splice(0, sizeof(s));
- read_buf.splice(0, s, &bl);
- read_pos += sizeof(s) + s;
+ ldout(cct, 10) << "try_read_entry at " << read_pos << " read "
+ << read_pos << "~" << consumed << " (have " << read_buf.length() << ")" << dendl;
+
+ read_pos += consumed;
// prefetch?
_prefetch();
}
+/**
+ * Test whether the 'read_buf' byte stream has enough data to read
+ * an entry
+ *
+ * sets 'next_envelope_size' to the number of bytes needed to advance (enough
+ * to get the next header if header was unavailable, or enough to get the whole
+ * next entry if the header was available but the body wasn't).
+ */
+bool JournalStream::readable(bufferlist &read_buf, uint64_t &need)
+{
+ // have enough for entry size?
+ uint32_t entry_size = 0;
+ uint64_t start_ptr = 0;
+ uint64_t entry_sentinel = 0;
+ bufferlist::iterator p = read_buf.begin();
+
+ // Do we have enough data to decode an entry header?
+ if (format >= JOURNAL_FORMAT_RESILIENT) {
+ need = sizeof(entry_size) + sizeof(entry_sentinel);
+ } else {
+ need = sizeof(entry_size);
+ }
+ if (read_buf.length() >= sizeof(entry_size) + sizeof(entry_sentinel) + sizeof(start_ptr)) {
+ if (format >= JOURNAL_FORMAT_RESILIENT) {
+ ::decode(entry_sentinel, p);
+ }
+
+ ::decode(entry_size, p);
+ if (format >= JOURNAL_FORMAT_RESILIENT) {
+ if (entry_sentinel != sentinel) {
+ throw buffer::malformed_input("Invalid sentinel");
+ }
+ }
+ } else {
+ return false;
+ }
+
+ // Do we have enough data to decode an entry header and payload?
+ if (format >= JOURNAL_FORMAT_RESILIENT) {
+ need = sizeof(entry_size) + sizeof(entry_sentinel) + entry_size + sizeof(start_ptr);
+ } else {
+ need = sizeof(entry_size) + entry_size;
+ }
+ if (read_buf.length() >= need) {
+ return true; // No more bytes needed
+ }
+
+ return false;
+}
+
+
+/**
+ * Consume one entry from a journal byte stream 'from', splicing a
+ * serialized LogEvent blob into 'entry'.
+ *
+ * 'entry' must be initially empty. 'from' must contain sufficient
+ * valid data (i.e. readable is true).
+ *
+ * 'offset' will be set to the entry's start pointer, if the collection
+ * format provides it.
+ *
+ * Note that the number of bytes consumed is *not* equal to the
+ * length of the blob returned: the former includes envelope data
+ * while the latter is just the inner LogEvent serialized.
+ */
+size_t JournalStream::read(bufferlist &from, bufferlist &entry, uint64_t &start_ptr)
+{
+ assert(entry.length() == 0);
+
+ uint64_t entry_sentinel = 0;
+ uint32_t entry_size;
+ {
+ bufferlist::iterator p = from.begin();
+ if (format >= JOURNAL_FORMAT_RESILIENT) {
+ ::decode(entry_sentinel, p);
+ }
+ ::decode(entry_size, p);
+ p.advance(entry_size);
+ if (format >= JOURNAL_FORMAT_RESILIENT) {
+ ::decode(start_ptr, p);
+ } else {
+ start_ptr = 0;
+ }
+ }
+ if (format >= JOURNAL_FORMAT_RESILIENT) {
+ assert(entry_sentinel == sentinel);
+ }
+ assert(from.length() >= sizeof(entry_size) + sizeof(entry_sentinel) + entry_size + sizeof(start_ptr));
+ assert(entry_size != 0);
+
+ if (format >= JOURNAL_FORMAT_RESILIENT) {
+ from.splice(0, sizeof(entry_sentinel));
+ }
+ from.splice(0, sizeof(entry_size));
+ from.splice(0, entry_size, &entry);
+ if (format >= JOURNAL_FORMAT_RESILIENT) {
+ from.splice(0, sizeof(start_ptr));
+ }
+
+ if (format >= JOURNAL_FORMAT_RESILIENT) {
+ return (sizeof(entry_sentinel) + sizeof(entry_size) + entry_size + sizeof(start_ptr));
+ } else {
+ return (sizeof(entry_size) + entry_size);
+ }
+}
+
+
+/**
+ * Append one entry
+ */
+size_t JournalStream::write(bufferlist &entry, bufferlist &to, uint64_t const &start_ptr)
+{
+ uint32_t const entry_size = entry.length();
+ if (format >= JOURNAL_FORMAT_RESILIENT) {
+ ::encode(sentinel, to);
+ }
+ ::encode(entry_size, to);
+ to.claim_append(entry);
+ if (format >= JOURNAL_FORMAT_RESILIENT) {
+ ::encode(start_ptr, to);
+ }
+
+ if (format >= JOURNAL_FORMAT_RESILIENT) {
+ return sizeof(sentinel) + sizeof(entry_size) + entry_size + sizeof(start_ptr);
+ } else {
+ return sizeof(entry_size) + entry_size;
+ }
+}
+
// eof.
class Context;
class PerfCounters;
+typedef __u8 stream_format_t;
+#define JOURNAL_FORMAT_LEGACY 0
+#define JOURNAL_FORMAT_RESILIENT 1
+
+
+/**
+ * Represents a collection of entries serialized in a byte stream.
+ *
+ * Each entry consists of:
+ * - a blob (used by the next level up as a serialized LogEvent)
+ * - a uint64_t (used by the next level up as a pointer to the start of the entry
+ * in the collection bytestream)
+ */
+class JournalStream
+{
+ stream_format_t format;
+
+ public:
+ JournalStream() : format(JOURNAL_FORMAT_RESILIENT) {}
+ JournalStream(stream_format_t format_) : format(format_) {}
+
+ void set_format(stream_format_t format_) {format = format_;}
+
+ bool readable(bufferlist &bl, uint64_t &need);
+ size_t read(bufferlist &from, bufferlist &to, uint64_t &start_ptr);
+ size_t write(bufferlist &entry, bufferlist &to, uint64_t const &start_ptr);
+
+ // A magic number for the start of journal entries, so that we can
+ // identify them in damaged journals.
+ static const uint64_t sentinel = 0x3141592653589793;
+};
+
+
class Journaler {
public:
CephContext *cct;
uint64_t unused_field;
uint64_t write_pos;
string magic;
- ceph_file_layout layout;
+ ceph_file_layout layout; //< The mapping from byte stream offsets to RADOS objects
+ stream_format_t stream_format; //< The encoding of LogEvents within the journal byte stream
Header(const char *m="") :
trimmed_pos(0), expire_pos(0), unused_field(0), write_pos(0),
- magic(m) {
+ magic(m),
+ stream_format(JOURNAL_FORMAT_RESILIENT) {
memset(&layout, 0, sizeof(layout));
}
void encode(bufferlist &bl) const {
- __u8 struct_v = 1;
+ __u8 struct_v = 2;
::encode(struct_v, bl);
::encode(magic, bl);
::encode(trimmed_pos, bl);
::encode(unused_field, bl);
::encode(write_pos, bl);
::encode(layout, bl);
+ ::encode(stream_format, bl);
}
void decode(bufferlist::iterator &bl) {
__u8 struct_v;
::decode(unused_field, bl);
::decode(write_pos, bl);
::decode(layout, bl);
+ if (struct_v > 1) {
+ ::decode(stream_format, bl);
+ } else {
+ stream_format = JOURNAL_FORMAT_LEGACY;
+ }
}
void dump(Formatter *f) const {
static void generate_test_instances(list<Header*> &ls)
{
ls.push_back(new Header());
+
ls.push_back(new Header());
ls.back()->trimmed_pos = 1;
ls.back()->expire_pos = 2;
ls.back()->unused_field = 3;
ls.back()->write_pos = 4;
ls.back()->magic = "magique";
+
+ ls.push_back(new Header());
+ ls.back()->stream_format = JOURNAL_FORMAT_RESILIENT;
}
} last_written, last_committed;
WRITE_CLASS_ENCODER(Header)
int64_t pg_pool;
bool readonly;
ceph_file_layout layout;
+ uint32_t stream_format;
+ JournalStream journal_stream;
+
const char *magic;
Objecter *objecter;