]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc: Revise Journaler format
authorJohn Spray <john.spray@inktank.com>
Tue, 25 Mar 2014 13:30:50 +0000 (13:30 +0000)
committerJohn Spray <john.spray@inktank.com>
Sun, 18 May 2014 10:21:28 +0000 (11:21 +0100)
* Separate journal encoding/envelope format
  code (JournalStream) from I/O code (Journaler)
* Add new sentinel and start_ptr fields to
  prefix and suffix of log events.
* Add journal encoding version to journal header

Signed-off-by: John Spray <john.spray@inktank.com>
src/osdc/Journaler.cc
src/osdc/Journaler.h

index ba4ca8dc4b92738181922258882799031d7cc651..28b10977244a85ad835820df62e0cbd30aae81df 100644 (file)
@@ -226,6 +226,8 @@ void Journaler::_finish_read_head(int r, bufferlist& bl)
 
   init_headers(h);
   set_layout(&h.layout);
+  stream_format = h.stream_format;
+  journal_stream.set_format(h.stream_format);
 
   ldout(cct, 1) << "_finish_read_head " << h << ".  probing for end of log (from " << write_pos << ")..." << dendl;
   C_ProbeEnd *fin = new C_ProbeEnd(this);
@@ -429,6 +431,7 @@ void Journaler::_finish_flush(int r, uint64_t start, utime_t stamp)
 }
 
 
+
 uint64_t Journaler::append_entry(bufferlist& bl)
 {
   assert(!readonly);
@@ -457,12 +460,11 @@ uint64_t Journaler::append_entry(bufferlist& bl)
     }
   }
        
-  ldout(cct, 10) << "append_entry len " << bl.length() << " to " << write_pos << "~" << (bl.length() + sizeof(uint32_t)) << dendl;
   
   // append
-  ::encode(s, write_buf);
-  write_buf.claim_append(bl);
-  write_pos += sizeof(s) + s;
+  size_t wrote = journal_stream.write(bl, write_buf, write_pos);
+  ldout(cct, 10) << "append_entry len " << s << " to " << write_pos << "~" << wrote << dendl;
+  write_pos += wrote;
 
   // flush previous object?
   uint64_t su = get_layout_period();
@@ -858,6 +860,7 @@ void Journaler::_prefetch()
   }
 }
 
+
 /*
  * _is_readable() - return true if next entry is ready.
  */
@@ -867,20 +870,14 @@ bool Journaler::_is_readable()
   if (read_pos == write_pos)
     return false;
 
-  // have enough for entry size?
-  uint32_t s = 0;
-  bufferlist::iterator p = read_buf.begin();
-  if (read_buf.length() >= sizeof(s))
-    ::decode(s, p);
-
-  // entry and payload?
-  if (read_buf.length() >= sizeof(s) &&
-      read_buf.length() >= sizeof(s) + s) 
-    return true;  // yep, next entry is ready.
+  // Check if the retrieve bytestream has enough for an entry
+  uint64_t need;
+  if (journal_stream.readable(read_buf, need)) {
+    return true;
+  }
 
   ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length()
-                 << ", but need " << s + sizeof(s)
-                 << " for next entry; fetch_len is " << fetch_len << dendl;
+                 << ", but need " << need << " for next entry; fetch_len is " << fetch_len << dendl;
 
   // partial fragment at the end?
   if (received_pos == write_pos) {
@@ -899,11 +896,9 @@ bool Journaler::_is_readable()
     return false;
   }
 
-  uint64_t need = sizeof(s) + s;
   if (need > fetch_len) {
-    temp_fetch_len = sizeof(s) + s;
-    ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
-            << " for len " << s << " entry" << dendl;
+    temp_fetch_len = need;
+    ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len << dendl;
   }
 
   ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl;
@@ -921,6 +916,7 @@ bool Journaler::is_readable()
 }
 
 
+
 /* try_read_entry(bl)
  *  read entry into bl if it's ready.
  *  otherwise, do nothing.  (well, we'll start fetching it for good measure.)
@@ -931,28 +927,17 @@ bool Journaler::try_read_entry(bufferlist& bl)
     ldout(cct, 10) << "try_read_entry at " << read_pos << " not readable" << dendl;
     return false;
   }
-  
-  uint32_t s;
-  {
-    bufferlist::iterator p = read_buf.begin();
-    ::decode(s, p);
-  }
-  assert(read_buf.length() >= sizeof(s) + s);
-  
-  ldout(cct, 10) << "try_read_entry at " << read_pos << " reading " 
-          << read_pos << "~" << (sizeof(s)+s) << " (have " << read_buf.length() << ")" << dendl;
 
-  if (s == 0) {
-    ldout(cct, 0) << "try_read_entry got 0 len entry at offset " << read_pos << dendl;
-    error = -EINVAL;
-    return false;
+  uint64_t start_ptr;
+  size_t consumed = journal_stream.read(read_buf, bl, start_ptr);
+  if (stream_format >= JOURNAL_FORMAT_RESILIENT) {
+    assert(start_ptr == read_pos);
   }
 
-  // do it
-  assert(bl.length() == 0);
-  read_buf.splice(0, sizeof(s));
-  read_buf.splice(0, s, &bl);
-  read_pos += sizeof(s) + s;
+  ldout(cct, 10) << "try_read_entry at " << read_pos << " read " 
+          << read_pos << "~" << consumed << " (have " << read_buf.length() << ")" << dendl;
+
+  read_pos += consumed;
 
   // prefetch?
   _prefetch();
@@ -1061,4 +1046,133 @@ void Journaler::handle_write_error(int r)
 }
 
 
+/**
+ * Test whether the 'read_buf' byte stream has enough data to read
+ * an entry
+ *
+ * sets 'next_envelope_size' to the number of bytes needed to advance (enough
+ * to get the next header if header was unavailable, or enough to get the whole
+ * next entry if the header was available but the body wasn't).
+ */
+bool JournalStream::readable(bufferlist &read_buf, uint64_t &need)
+{
+  // have enough for entry size?
+  uint32_t entry_size = 0;
+  uint64_t start_ptr = 0;
+  uint64_t entry_sentinel = 0;
+  bufferlist::iterator p = read_buf.begin();
+
+  // Do we have enough data to decode an entry header?
+  if (format >= JOURNAL_FORMAT_RESILIENT) {
+    need = sizeof(entry_size) + sizeof(entry_sentinel);
+  } else {
+    need = sizeof(entry_size);
+  }
+  if (read_buf.length() >= sizeof(entry_size) + sizeof(entry_sentinel) + sizeof(start_ptr)) {
+    if (format >= JOURNAL_FORMAT_RESILIENT) {
+      ::decode(entry_sentinel, p);
+    }
+
+    ::decode(entry_size, p);
+    if (format >= JOURNAL_FORMAT_RESILIENT) {
+      if (entry_sentinel != sentinel) {
+        throw buffer::malformed_input("Invalid sentinel"); 
+      }
+    }
+  } else {
+    return false;
+  }
+
+  // Do we have enough data to decode an entry header and payload?
+  if (format >= JOURNAL_FORMAT_RESILIENT) {
+    need = sizeof(entry_size) + sizeof(entry_sentinel) + entry_size + sizeof(start_ptr);
+  } else {
+    need = sizeof(entry_size) + entry_size;
+  }
+  if (read_buf.length() >= need) {
+    return true;  // No more bytes needed
+  }
+
+  return false;
+}
+
+
+/**
+ * Consume one entry from a journal byte stream 'from', splicing a
+ * serialized LogEvent blob into 'entry'.
+ *
+ * 'entry' must be initially empty.  'from' must contain sufficient
+ * valid data (i.e. readable is true).
+ *
+ * 'offset' will be set to the entry's start pointer, if the collection
+ * format provides it.
+ *
+ * Note that the number of bytes consumed is *not* equal to the
+ * length of the blob returned: the former includes envelope data
+ * while the latter is just the inner LogEvent serialized.
+ */
+size_t JournalStream::read(bufferlist &from, bufferlist &entry, uint64_t &start_ptr)
+{
+  assert(entry.length() == 0);
+
+  uint64_t entry_sentinel = 0;
+  uint32_t entry_size;
+  {
+    bufferlist::iterator p = from.begin();
+    if (format >= JOURNAL_FORMAT_RESILIENT) {
+      ::decode(entry_sentinel, p);
+    }
+    ::decode(entry_size, p);
+    p.advance(entry_size);
+    if (format >= JOURNAL_FORMAT_RESILIENT) {
+      ::decode(start_ptr, p);
+    } else {
+      start_ptr = 0;
+    }
+  }
+  if (format >= JOURNAL_FORMAT_RESILIENT) {
+    assert(entry_sentinel == sentinel);
+  }
+  assert(from.length() >= sizeof(entry_size) + sizeof(entry_sentinel) + entry_size + sizeof(start_ptr));
+  assert(entry_size != 0);
+  
+  if (format >= JOURNAL_FORMAT_RESILIENT) {
+    from.splice(0, sizeof(entry_sentinel));
+  }
+  from.splice(0, sizeof(entry_size));
+  from.splice(0, entry_size, &entry);
+  if (format >= JOURNAL_FORMAT_RESILIENT) {
+    from.splice(0, sizeof(start_ptr));
+  }
+
+  if (format >= JOURNAL_FORMAT_RESILIENT) {
+    return (sizeof(entry_sentinel) + sizeof(entry_size) + entry_size + sizeof(start_ptr));
+  } else {
+    return (sizeof(entry_size) + entry_size);
+  }
+}
+
+
+/**
+ * Append one entry
+ */
+size_t JournalStream::write(bufferlist &entry, bufferlist &to, uint64_t const &start_ptr)
+{
+  uint32_t const entry_size = entry.length();
+  if (format >= JOURNAL_FORMAT_RESILIENT) {
+    ::encode(sentinel, to);
+  }
+  ::encode(entry_size, to);
+  to.claim_append(entry);
+  if (format >= JOURNAL_FORMAT_RESILIENT) {
+    ::encode(start_ptr, to);
+  }
+
+  if (format >= JOURNAL_FORMAT_RESILIENT) {
+    return sizeof(sentinel) + sizeof(entry_size) + entry_size + sizeof(start_ptr);
+  } else {
+    return sizeof(entry_size) + entry_size;
+  }
+}
+
 // eof.
index e3a57135b322ecfbbe07aef7a346f34f0725905a..a5f9c217fac69c8f0cd51b1657d1a4f957401db5 100644 (file)
@@ -60,6 +60,39 @@ class CephContext;
 class Context;
 class PerfCounters;
 
+typedef __u8 stream_format_t;
+#define JOURNAL_FORMAT_LEGACY 0
+#define JOURNAL_FORMAT_RESILIENT 1
+
+
+/**
+ * Represents a collection of entries serialized in a byte stream.
+ *
+ * Each entry consists of:
+ *  - a blob (used by the next level up as a serialized LogEvent)
+ *  - a uint64_t (used by the next level up as a pointer to the start of the entry
+ *    in the collection bytestream)
+ */
+class JournalStream
+{
+  stream_format_t format;
+
+  public:
+  JournalStream() : format(JOURNAL_FORMAT_RESILIENT) {}
+  JournalStream(stream_format_t format_) : format(format_) {}
+
+  void set_format(stream_format_t format_) {format = format_;}
+
+  bool readable(bufferlist &bl, uint64_t &need);
+  size_t read(bufferlist &from, bufferlist &to, uint64_t &start_ptr);
+  size_t write(bufferlist &entry, bufferlist &to, uint64_t const &start_ptr);
+
+  // A magic number for the start of journal entries, so that we can
+  // identify them in damaged journals.
+  static const uint64_t sentinel = 0x3141592653589793;
+};
+
+
 class Journaler {
 public:
   CephContext *cct;
@@ -70,16 +103,18 @@ public:
     uint64_t unused_field;
     uint64_t write_pos;
     string magic;
-    ceph_file_layout layout;
+    ceph_file_layout layout; //< The mapping from byte stream offsets to RADOS objects
+    stream_format_t stream_format; //< The encoding of LogEvents within the journal byte stream
 
     Header(const char *m="") :
       trimmed_pos(0), expire_pos(0), unused_field(0), write_pos(0),
-      magic(m) {
+      magic(m),
+      stream_format(JOURNAL_FORMAT_RESILIENT) {
       memset(&layout, 0, sizeof(layout));
     }
 
     void encode(bufferlist &bl) const {
-      __u8 struct_v = 1;
+      __u8 struct_v = 2;
       ::encode(struct_v, bl);
       ::encode(magic, bl);
       ::encode(trimmed_pos, bl);
@@ -87,6 +122,7 @@ public:
       ::encode(unused_field, bl);
       ::encode(write_pos, bl);
       ::encode(layout, bl);
+      ::encode(stream_format, bl);
     }
     void decode(bufferlist::iterator &bl) {
       __u8 struct_v;
@@ -97,6 +133,11 @@ public:
       ::decode(unused_field, bl);
       ::decode(write_pos, bl);
       ::decode(layout, bl);
+      if (struct_v > 1) {
+        ::decode(stream_format, bl);
+      } else {
+        stream_format = JOURNAL_FORMAT_LEGACY;
+      }
     }
 
     void dump(Formatter *f) const {
@@ -123,12 +164,16 @@ public:
     static void generate_test_instances(list<Header*> &ls)
     {
       ls.push_back(new Header());
+
       ls.push_back(new Header());
       ls.back()->trimmed_pos = 1;
       ls.back()->expire_pos = 2;
       ls.back()->unused_field = 3;
       ls.back()->write_pos = 4;
       ls.back()->magic = "magique";
+
+      ls.push_back(new Header());
+      ls.back()->stream_format = JOURNAL_FORMAT_RESILIENT;
     }
   } last_written, last_committed;
   WRITE_CLASS_ENCODER(Header)
@@ -139,6 +184,9 @@ private:
   int64_t pg_pool;
   bool readonly;
   ceph_file_layout layout;
+  uint32_t stream_format;
+  JournalStream journal_stream;
+
 
   const char *magic;
   Objecter *objecter;