]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: add atomic log rewrite on format change
authorJohn Spray <john.spray@inktank.com>
Tue, 6 May 2014 12:18:03 +0000 (13:18 +0100)
committerJohn Spray <john.spray@inktank.com>
Tue, 20 May 2014 13:07:50 +0000 (14:07 +0100)
Two main pieces to this:
 * A new JournalPointer object that stores two journal
   inodes so that we can do a double-buffered update,
   followed by an atomic swap.
 * An extended recovery process in MDLog that dereferences
   the JournalPointer and conditionally rewrites the
   journal to accomodate format updates.

The JournalPointer indirection should also be useful for
making cephfs-journal-tool do updates more safely.

Signed-off-by: John Spray <john.spray@inktank.com>
src/mds/MDLog.cc
src/mds/MDLog.h
src/mds/mdstypes.h
src/osdc/Journaler.h

index c224695671bf2e5d1da0dabbe815864f7ed5f78a..a5287bf3dbee65c79cc0eed302b8e67ff979add7 100644 (file)
@@ -136,9 +136,10 @@ void MDLog::create(Context *c)
 void MDLog::open(Context *c)
 {
   dout(5) << "open discovering log bounds" << dendl;
-  init_journaler();
-  journaler->recover(c);
 
+  recovery_thread.set_completion(c);
+  recovery_thread.create();
+  recovery_thread.detach();
   // either append() or replay() will follow.
 }
 
@@ -489,6 +490,315 @@ public:
 };
 
 
+/**
+ * Resolve the JournalPointer object to a journal file, and
+ * instantiate a Journaler object.  This may re-write the journal
+ * if the journal in RADOS appears to be in an old format.
+ *
+ * This is a separate thread because of the way it is initialized from inside
+ * the mds lock, which is also the global objecter lock -- rather than split
+ * it up into hard-to-read async operations linked up by contexts, 
+ *
+ * When this function completes, the `journaler` attribute will be set to
+ * a Journaler instance using the latest available serialization format.
+ */
+void MDLog::_recovery_thread(Context *completion)
+{
+  assert(journaler == NULL);
+
+  // First, read the pointer object.
+  // If the pointer object is not present, then create it with
+  // front = default ino and back = null
+  JournalPointer jp;
+  int const read_result = _read_pointer(&jp);
+  if (read_result == -ENOENT) {
+    inodeno_t const default_log_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
+    jp.front = default_log_ino;
+    int write_result = _write_pointer(jp);
+    // Nothing graceful we can do for this
+    assert(write_result >= 0);
+  } else if (read_result != 0) {
+    // No graceful way of handling this: give up and leave it for support
+    // to work out why RADOS preventing access.
+    assert(0);
+  }
+
+  // If the back pointer is non-null, that means that a journal
+  // rewrite failed part way through.  Erase the back journal
+  // to clean up.
+  if (jp.back) {
+    dout(1) << "Erasing journal " << jp.back << dendl;
+    C_SaferCond erase_waiter;
+    Journaler back(jp.back, mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC,
+        mds->objecter, logger, l_mdl_jlat, &mds->timer);
+
+    // Read all about this journal (header + extents)
+    mds->mds_lock.Lock();
+    C_SaferCond recover_wait;
+    back.recover(&recover_wait);
+    mds->mds_lock.Unlock();
+    int recovery_result = recover_wait.wait();
+
+    // Journaler.recover succeeds if no journal objects are present: an error
+    // means something worse like a corrupt header, which we can't handle here.
+    assert(recovery_result == 0);
+    // We could read journal, so we can erase it.
+    mds->mds_lock.Lock();
+    back.erase(&erase_waiter);
+    mds->mds_lock.Unlock();
+    int erase_result = erase_waiter.wait();
+
+    // If we are successful, or find no data, we can update the JournalPointer to
+    // reflect that the back journal is gone.
+    if (erase_result != 0 && erase_result != -ENOENT) {
+      derr << "Failed to erase journal " << jp.back << ": " << cpp_strerror(erase_result) << dendl;
+    } else {
+      dout(1) << "Successfully erased journal, updating journal pointer" << dendl;
+      jp.back = 0;
+      int write_result = _write_pointer(jp);
+      // Nothing graceful we can do for this
+      assert(write_result >= 0);
+    }
+  }
+
+  /* Read the header from the front journal */
+  Journaler *front_journal = new Journaler(jp.front, mds->mdsmap->get_metadata_pool(),
+      CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, &mds->timer);
+  C_SaferCond recover_wait;
+  mds->mds_lock.Lock();
+  front_journal->recover(&recover_wait);
+  mds->mds_lock.Unlock();
+  dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl;
+  int recovery_result = recover_wait.wait();
+  dout(4) << "Journal " << jp.front << " recovered." << dendl;
+
+  if (recovery_result != 0) {
+    derr << "Error recovering journal " << jp.front << ": " << cpp_strerror(recovery_result) << dendl;
+    completion->complete(recovery_result);
+    return;
+  }
+
+  /* Check whether the front journal format is acceptable or needs re-write */
+  if (front_journal->get_stream_format() >= JOURNAL_FORMAT_RESILIENT) {
+    /* Great, the journal is of current format and ready to rock, hook
+     * it into this->journaler and complete */
+    journaler = front_journal;
+    journaler->set_write_error_handler(new C_MDL_WriteError(this));
+    mds->mds_lock.Lock();
+    completion->complete(0);
+    mds->mds_lock.Unlock();
+  } else {
+    /* Hand off to reformat routine, which will ultimately set the
+     * completion when it has done its thing */
+    dout(1) << "Journal " << jp.front << " has old format "
+      << front_journal->get_stream_format() << ", it will now be updated" << dendl;
+
+    _reformat_journal(jp, front_journal, completion);
+  }
+}
+
+/**
+ * Blocking rewrite of the journal to a new file, followed by
+ * swap of journal pointer to point to the new one.
+ *
+ * We write the new journal to the 'back' journal from the JournalPointer,
+ * swapping pointers to make that one the front journal only when we have
+ * safely completed.
+ */
+void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journal, Context *completion)
+{
+  assert(!jp_in.is_null());
+  assert(completion != NULL);
+  assert(old_journal != NULL);
+
+  JournalPointer jp = jp_in;
+
+  /* Set JournalPointer.back to the location we will write the new journal */
+  inodeno_t primary_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
+  inodeno_t secondary_ino = MDS_INO_LOG_BACKUP_OFFSET + mds->get_nodeid();
+  jp.back = (jp.front == primary_ino ? secondary_ino : primary_ino);
+  int write_result = _write_pointer(jp);
+  assert(write_result == 0);
+
+  /* Create the new Journaler file */
+  Journaler *new_journal = new Journaler(jp.back, mds->mdsmap->get_metadata_pool(),
+      CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, &mds->timer);
+  dout(4) << "Writing new journal header " << jp.back << dendl;
+  ceph_file_layout new_layout = old_journal->get_layout();
+  new_journal->set_writeable();
+  new_journal->create(&new_layout);
+
+  /* Write the new journal header to RADOS */
+  C_SaferCond write_head_wait;
+  mds->mds_lock.Lock();
+  new_journal->write_head(&write_head_wait);
+  mds->mds_lock.Unlock();
+  write_head_wait.wait();
+
+  // Read in the old journal, and whenever we have readable events,
+  // write them to the new journal.
+  int r = 0;
+
+  // The logic in here borrowed from replay_thread expects mds_lock to be held,
+  // e.g. between checking readable and doing wait_for_readable so that journaler
+  // state doesn't change in between.
+  uint32_t events_transcribed = 0;
+  mds->mds_lock.Lock();
+  while (1) {
+    while (!old_journal->is_readable() &&
+          old_journal->get_read_pos() < old_journal->get_write_pos() &&
+          !old_journal->get_error()) {
+
+      // Issue a journal prefetch
+      C_SaferCond readable_waiter;
+      old_journal->wait_for_readable(&readable_waiter);
+
+      // Wait for a journal prefetch to complete
+      mds->mds_lock.Unlock();
+      readable_waiter.wait();
+      mds->mds_lock.Lock();
+    }
+    if (old_journal->get_error()) {
+      r = old_journal->get_error();
+      dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
+      break;
+    }
+
+    if (!old_journal->is_readable() &&
+       old_journal->get_read_pos() == old_journal->get_write_pos())
+      break;
+
+    // Read one serialized LogEvent
+    assert(old_journal->is_readable());
+    bufferlist bl;
+    bool r = old_journal->try_read_entry(bl);
+    if (!r && old_journal->get_error())
+      continue;
+    assert(r);
+
+    // Write (buffered, synchronous) one serialized LogEvent
+    events_transcribed += 1;
+    new_journal->append_entry(bl);
+
+    // Allow other I/O to advance, e.g. MDS beacons
+    mds->mds_lock.Unlock();
+    mds->mds_lock.Lock();
+  }
+  mds->mds_lock.Unlock();
+
+  dout(1) << "Transcribed " << events_transcribed << " events, flushing new journal" << dendl;
+  C_SaferCond flush_waiter;
+  mds->mds_lock.Lock();
+  new_journal->flush(&flush_waiter);
+  mds->mds_lock.Unlock();
+  flush_waiter.wait();
+
+  // If failed to rewrite journal, leave the part written journal
+  // as garbage to be cleaned up next startup.
+  assert(r == 0);
+
+  /* Now that the new journal is safe, we can flip the pointers */
+  inodeno_t const tmp = jp.front;
+  jp.front = jp.back;
+  jp.back = tmp;
+  write_result = _write_pointer(jp);
+  assert(write_result == 0);
+
+  /* Delete the old journal to free space */
+  dout(1) << "New journal flushed, erasing old journal" << dendl;
+  C_SaferCond erase_waiter;
+  mds->mds_lock.Lock();
+  old_journal->erase(&erase_waiter);
+  mds->mds_lock.Unlock();
+  int erase_result = erase_waiter.wait();
+  assert(erase_result == 0);
+  delete old_journal;
+
+  /* Update the pointer to reflect we're back in clean single journal state. */
+  jp.back = 0;
+  write_result = _write_pointer(jp);
+  assert(write_result == 0);
+
+  /* Reset the Journaler object to its default state */
+  dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl;
+  journaler = new_journal;
+  journaler->set_readonly();
+  journaler->set_write_error_handler(new C_MDL_WriteError(this));
+
+  /* Trigger completion */
+  mds->mds_lock.Lock();
+  completion->complete(0);
+  mds->mds_lock.Unlock();
+}
+
+/**
+ * Blocking read of JournalPointer for this MDS
+ */
+int MDLog::_read_pointer(JournalPointer *jp)
+{
+  assert(!mds->mds_lock.is_locked_by_me());
+  assert(jp);
+
+  inodeno_t const pointer_ino = MDS_INO_LOG_POINTER_OFFSET + mds->get_nodeid();
+  char buf[32];
+  snprintf(buf, sizeof(buf), "%llx.%08llx", (long long unsigned)pointer_ino, (long long unsigned)0);
+
+  // Blocking read of data
+  dout(4) << "Reading journal pointer '" << buf << "'" << dendl;
+  bufferlist data;
+  C_SaferCond waiter;
+  mds->mds_lock.Lock();
+  mds->objecter->read_full(object_t(buf), object_locator_t(mds->mdsmap->get_metadata_pool()),
+      CEPH_NOSNAP, &data, 0, &waiter);
+  mds->mds_lock.Unlock();
+  int r = waiter.wait();
+
+  // Construct JournalPointer result, null or decoded data
+  if (r == 0) {
+    bufferlist::iterator q = data.begin();
+    jp->decode(q);
+  } else {
+    dout(1) << "Journal pointer '" << buf << "' read failed: " << cpp_strerror(r) << dendl;
+  }
+  return r;
+}
+
+
+/**
+ * Blocking write of JournalPointer for this MDS
+ *
+ * @return objecter write op status code
+ */
+int MDLog::_write_pointer(JournalPointer const &ptr)
+{
+  assert(!mds->mds_lock.is_locked_by_me());
+  // It is not valid to persist a null pointer
+  assert(!ptr.is_null());
+
+  // Calculate object ID
+  inodeno_t const pointer_ino = MDS_INO_LOG_POINTER_OFFSET + mds->get_nodeid();
+  char buf[32];
+  snprintf(buf, sizeof(buf), "%llx.%08llx", (long long unsigned)pointer_ino, (long long unsigned)0);
+  dout(4) << "Writing pointer object '" << buf << "': 0x"
+    << std::hex << ptr.front << ":0x" << ptr.back << std::dec << dendl;
+
+  // Serialize JournalPointer object
+  bufferlist data;
+  ptr.encode(data);
+
+  // Write to RADOS and wait for durability
+  C_SaferCond waiter;
+  mds->mds_lock.Lock();
+  mds->objecter->write_full(object_t(buf), object_locator_t(mds->mdsmap->get_metadata_pool()),
+      SnapContext(), data, ceph_clock_now(g_ceph_context), 0, NULL, &waiter);
+  mds->mds_lock.Unlock();
+  int write_result = waiter.wait();
+  if (write_result < 0) {
+    derr << "Error writing pointer object '" << buf << "': " << cpp_strerror(write_result) << dendl;
+  }
+  return write_result;
+}
+
 
 // i am a separate thread
 void MDLog::_replay_thread()
index 6e8e980c94e94a62b991f8b1f8dd0ca03f6ad138..ffdd7061bd48527c54f9f600c56eb3fef2f54981 100644 (file)
@@ -48,6 +48,7 @@ enum {
 #include <list>
 
 class Journaler;
+class JournalPointer;
 class LogEvent;
 class MDS;
 class LogSegment;
@@ -97,6 +98,22 @@ protected:
   void _replay();         // old way
   void _replay_thread();  // new way
 
+  // Journal recovery/rewrite logic
+  class RecoveryThread : public Thread {
+    MDLog *log;
+    Context *completion;
+  public:
+    void set_completion(Context *c) {completion = c;}
+    RecoveryThread(MDLog *l) : log(l), completion(NULL) {}
+    void* entry() {
+      log->_recovery_thread(completion);
+      return 0;
+    }
+  } recovery_thread;
+  void _recovery_thread(Context *completion);
+  int _read_pointer(JournalPointer *jp);
+  int _write_pointer(JournalPointer const &ptr);
+  void _reformat_journal(JournalPointer const &jp, Journaler *old_journal, Context *completion);
 
   // -- segments --
   map<uint64_t,LogSegment*> segments;
@@ -154,6 +171,7 @@ public:
                  logger(0),
                  replay_thread(this),
                  already_replayed(false),
+                 recovery_thread(this),
                  expiring_events(0), expired_events(0),
                  cur_event(NULL) { }             
   ~MDLog();
index 6a08e9d20d5152e65e1276f6e16077b94bbfb852..a7e495c72038d0d4892e9ac125205c8b4ebf6e9e 100644 (file)
@@ -51,9 +51,13 @@ using namespace std;
 #define MDS_INO_CEPH              2
 
 #define MDS_INO_MDSDIR_OFFSET     (1*MAX_MDS)
-#define MDS_INO_LOG_OFFSET        (2*MAX_MDS)
 #define MDS_INO_STRAY_OFFSET      (6*MAX_MDS)
 
+// Locations for journal data
+#define MDS_INO_LOG_OFFSET        (2*MAX_MDS)
+#define MDS_INO_LOG_BACKUP_OFFSET (3*MAX_MDS)
+#define MDS_INO_LOG_POINTER_OFFSET    (4*MAX_MDS)
+
 #define MDS_INO_SYSTEM_BASE       ((6*MAX_MDS) + (MAX_MDS * NUM_STRAY))
 
 #define MDS_INO_STRAY(x,i)  (MDS_INO_STRAY_OFFSET+((((unsigned)(x))*NUM_STRAY)+((unsigned)(i))))
index 20b576ce8ffee9ad878748356135eafad4a3a7b2..91c7d138288b502e8c16e55ace06b857e8386008 100644 (file)
@@ -93,6 +93,37 @@ class JournalStream
 };
 
 
+// This always lives in the same location for a given MDS
+// instance, it tells the daemon where to look for the journal.
+class JournalPointer {
+  public:
+  // The currently active journal
+  inodeno_t front;
+  // The backup journal, if any (may be 0)
+  inodeno_t back;
+
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(front, bl);
+    ::encode(back, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator &bl) {
+    DECODE_START(1, bl);
+    ::decode(front, bl);
+    ::decode(back, bl);
+    DECODE_FINISH(bl);
+  }
+
+  JournalPointer() : front(0), back(0) {}
+
+  bool is_null() const {
+    return front == 0 && back == 0;
+  }
+};
+
+
 class Journaler {
 public:
   // this goes at the head of the log "file".
@@ -177,6 +208,10 @@ public:
   } last_written, last_committed;
   WRITE_CLASS_ENCODER(Header)
 
+  uint32_t get_stream_format() const {
+    return stream_format;
+  }
+
 private:
   // me
   CephContext *cct;
@@ -187,7 +222,6 @@ private:
   uint32_t stream_format;
   JournalStream journal_stream;
 
-
   const char *magic;
   Objecter *objecter;
   Filer filer;