From: John Spray Date: Tue, 6 May 2014 12:18:03 +0000 (+0100) Subject: mds: add atomic log rewrite on format change X-Git-Tag: v0.82~48^2~22 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a4ae168ae61589fbd8c85c7ea970ea6bcd0903e3;p=ceph.git mds: add atomic log rewrite on format change Two main pieces to this: * A new JournalPointer object that stores two journal inodes so that we can do a double-buffered update, followed by an atomic swap. * An extended recovery process in MDLog that dereferences the JournalPointer and conditionally rewrites the journal to accomodate format updates. The JournalPointer indirection should also be useful for making cephfs-journal-tool do updates more safely. Signed-off-by: John Spray --- diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index c224695671b..a5287bf3dbe 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -136,9 +136,10 @@ void MDLog::create(Context *c) void MDLog::open(Context *c) { dout(5) << "open discovering log bounds" << dendl; - init_journaler(); - journaler->recover(c); + recovery_thread.set_completion(c); + recovery_thread.create(); + recovery_thread.detach(); // either append() or replay() will follow. } @@ -489,6 +490,315 @@ public: }; +/** + * Resolve the JournalPointer object to a journal file, and + * instantiate a Journaler object. This may re-write the journal + * if the journal in RADOS appears to be in an old format. + * + * This is a separate thread because of the way it is initialized from inside + * the mds lock, which is also the global objecter lock -- rather than split + * it up into hard-to-read async operations linked up by contexts, + * + * When this function completes, the `journaler` attribute will be set to + * a Journaler instance using the latest available serialization format. + */ +void MDLog::_recovery_thread(Context *completion) +{ + assert(journaler == NULL); + + // First, read the pointer object. + // If the pointer object is not present, then create it with + // front = default ino and back = null + JournalPointer jp; + int const read_result = _read_pointer(&jp); + if (read_result == -ENOENT) { + inodeno_t const default_log_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid(); + jp.front = default_log_ino; + int write_result = _write_pointer(jp); + // Nothing graceful we can do for this + assert(write_result >= 0); + } else if (read_result != 0) { + // No graceful way of handling this: give up and leave it for support + // to work out why RADOS preventing access. + assert(0); + } + + // If the back pointer is non-null, that means that a journal + // rewrite failed part way through. Erase the back journal + // to clean up. + if (jp.back) { + dout(1) << "Erasing journal " << jp.back << dendl; + C_SaferCond erase_waiter; + Journaler back(jp.back, mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, + mds->objecter, logger, l_mdl_jlat, &mds->timer); + + // Read all about this journal (header + extents) + mds->mds_lock.Lock(); + C_SaferCond recover_wait; + back.recover(&recover_wait); + mds->mds_lock.Unlock(); + int recovery_result = recover_wait.wait(); + + // Journaler.recover succeeds if no journal objects are present: an error + // means something worse like a corrupt header, which we can't handle here. + assert(recovery_result == 0); + // We could read journal, so we can erase it. + mds->mds_lock.Lock(); + back.erase(&erase_waiter); + mds->mds_lock.Unlock(); + int erase_result = erase_waiter.wait(); + + // If we are successful, or find no data, we can update the JournalPointer to + // reflect that the back journal is gone. + if (erase_result != 0 && erase_result != -ENOENT) { + derr << "Failed to erase journal " << jp.back << ": " << cpp_strerror(erase_result) << dendl; + } else { + dout(1) << "Successfully erased journal, updating journal pointer" << dendl; + jp.back = 0; + int write_result = _write_pointer(jp); + // Nothing graceful we can do for this + assert(write_result >= 0); + } + } + + /* Read the header from the front journal */ + Journaler *front_journal = new Journaler(jp.front, mds->mdsmap->get_metadata_pool(), + CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, &mds->timer); + C_SaferCond recover_wait; + mds->mds_lock.Lock(); + front_journal->recover(&recover_wait); + mds->mds_lock.Unlock(); + dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl; + int recovery_result = recover_wait.wait(); + dout(4) << "Journal " << jp.front << " recovered." << dendl; + + if (recovery_result != 0) { + derr << "Error recovering journal " << jp.front << ": " << cpp_strerror(recovery_result) << dendl; + completion->complete(recovery_result); + return; + } + + /* Check whether the front journal format is acceptable or needs re-write */ + if (front_journal->get_stream_format() >= JOURNAL_FORMAT_RESILIENT) { + /* Great, the journal is of current format and ready to rock, hook + * it into this->journaler and complete */ + journaler = front_journal; + journaler->set_write_error_handler(new C_MDL_WriteError(this)); + mds->mds_lock.Lock(); + completion->complete(0); + mds->mds_lock.Unlock(); + } else { + /* Hand off to reformat routine, which will ultimately set the + * completion when it has done its thing */ + dout(1) << "Journal " << jp.front << " has old format " + << front_journal->get_stream_format() << ", it will now be updated" << dendl; + + _reformat_journal(jp, front_journal, completion); + } +} + +/** + * Blocking rewrite of the journal to a new file, followed by + * swap of journal pointer to point to the new one. + * + * We write the new journal to the 'back' journal from the JournalPointer, + * swapping pointers to make that one the front journal only when we have + * safely completed. + */ +void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journal, Context *completion) +{ + assert(!jp_in.is_null()); + assert(completion != NULL); + assert(old_journal != NULL); + + JournalPointer jp = jp_in; + + /* Set JournalPointer.back to the location we will write the new journal */ + inodeno_t primary_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid(); + inodeno_t secondary_ino = MDS_INO_LOG_BACKUP_OFFSET + mds->get_nodeid(); + jp.back = (jp.front == primary_ino ? secondary_ino : primary_ino); + int write_result = _write_pointer(jp); + assert(write_result == 0); + + /* Create the new Journaler file */ + Journaler *new_journal = new Journaler(jp.back, mds->mdsmap->get_metadata_pool(), + CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, &mds->timer); + dout(4) << "Writing new journal header " << jp.back << dendl; + ceph_file_layout new_layout = old_journal->get_layout(); + new_journal->set_writeable(); + new_journal->create(&new_layout); + + /* Write the new journal header to RADOS */ + C_SaferCond write_head_wait; + mds->mds_lock.Lock(); + new_journal->write_head(&write_head_wait); + mds->mds_lock.Unlock(); + write_head_wait.wait(); + + // Read in the old journal, and whenever we have readable events, + // write them to the new journal. + int r = 0; + + // The logic in here borrowed from replay_thread expects mds_lock to be held, + // e.g. between checking readable and doing wait_for_readable so that journaler + // state doesn't change in between. + uint32_t events_transcribed = 0; + mds->mds_lock.Lock(); + while (1) { + while (!old_journal->is_readable() && + old_journal->get_read_pos() < old_journal->get_write_pos() && + !old_journal->get_error()) { + + // Issue a journal prefetch + C_SaferCond readable_waiter; + old_journal->wait_for_readable(&readable_waiter); + + // Wait for a journal prefetch to complete + mds->mds_lock.Unlock(); + readable_waiter.wait(); + mds->mds_lock.Lock(); + } + if (old_journal->get_error()) { + r = old_journal->get_error(); + dout(0) << "_replay journaler got error " << r << ", aborting" << dendl; + break; + } + + if (!old_journal->is_readable() && + old_journal->get_read_pos() == old_journal->get_write_pos()) + break; + + // Read one serialized LogEvent + assert(old_journal->is_readable()); + bufferlist bl; + bool r = old_journal->try_read_entry(bl); + if (!r && old_journal->get_error()) + continue; + assert(r); + + // Write (buffered, synchronous) one serialized LogEvent + events_transcribed += 1; + new_journal->append_entry(bl); + + // Allow other I/O to advance, e.g. MDS beacons + mds->mds_lock.Unlock(); + mds->mds_lock.Lock(); + } + mds->mds_lock.Unlock(); + + dout(1) << "Transcribed " << events_transcribed << " events, flushing new journal" << dendl; + C_SaferCond flush_waiter; + mds->mds_lock.Lock(); + new_journal->flush(&flush_waiter); + mds->mds_lock.Unlock(); + flush_waiter.wait(); + + // If failed to rewrite journal, leave the part written journal + // as garbage to be cleaned up next startup. + assert(r == 0); + + /* Now that the new journal is safe, we can flip the pointers */ + inodeno_t const tmp = jp.front; + jp.front = jp.back; + jp.back = tmp; + write_result = _write_pointer(jp); + assert(write_result == 0); + + /* Delete the old journal to free space */ + dout(1) << "New journal flushed, erasing old journal" << dendl; + C_SaferCond erase_waiter; + mds->mds_lock.Lock(); + old_journal->erase(&erase_waiter); + mds->mds_lock.Unlock(); + int erase_result = erase_waiter.wait(); + assert(erase_result == 0); + delete old_journal; + + /* Update the pointer to reflect we're back in clean single journal state. */ + jp.back = 0; + write_result = _write_pointer(jp); + assert(write_result == 0); + + /* Reset the Journaler object to its default state */ + dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl; + journaler = new_journal; + journaler->set_readonly(); + journaler->set_write_error_handler(new C_MDL_WriteError(this)); + + /* Trigger completion */ + mds->mds_lock.Lock(); + completion->complete(0); + mds->mds_lock.Unlock(); +} + +/** + * Blocking read of JournalPointer for this MDS + */ +int MDLog::_read_pointer(JournalPointer *jp) +{ + assert(!mds->mds_lock.is_locked_by_me()); + assert(jp); + + inodeno_t const pointer_ino = MDS_INO_LOG_POINTER_OFFSET + mds->get_nodeid(); + char buf[32]; + snprintf(buf, sizeof(buf), "%llx.%08llx", (long long unsigned)pointer_ino, (long long unsigned)0); + + // Blocking read of data + dout(4) << "Reading journal pointer '" << buf << "'" << dendl; + bufferlist data; + C_SaferCond waiter; + mds->mds_lock.Lock(); + mds->objecter->read_full(object_t(buf), object_locator_t(mds->mdsmap->get_metadata_pool()), + CEPH_NOSNAP, &data, 0, &waiter); + mds->mds_lock.Unlock(); + int r = waiter.wait(); + + // Construct JournalPointer result, null or decoded data + if (r == 0) { + bufferlist::iterator q = data.begin(); + jp->decode(q); + } else { + dout(1) << "Journal pointer '" << buf << "' read failed: " << cpp_strerror(r) << dendl; + } + return r; +} + + +/** + * Blocking write of JournalPointer for this MDS + * + * @return objecter write op status code + */ +int MDLog::_write_pointer(JournalPointer const &ptr) +{ + assert(!mds->mds_lock.is_locked_by_me()); + // It is not valid to persist a null pointer + assert(!ptr.is_null()); + + // Calculate object ID + inodeno_t const pointer_ino = MDS_INO_LOG_POINTER_OFFSET + mds->get_nodeid(); + char buf[32]; + snprintf(buf, sizeof(buf), "%llx.%08llx", (long long unsigned)pointer_ino, (long long unsigned)0); + dout(4) << "Writing pointer object '" << buf << "': 0x" + << std::hex << ptr.front << ":0x" << ptr.back << std::dec << dendl; + + // Serialize JournalPointer object + bufferlist data; + ptr.encode(data); + + // Write to RADOS and wait for durability + C_SaferCond waiter; + mds->mds_lock.Lock(); + mds->objecter->write_full(object_t(buf), object_locator_t(mds->mdsmap->get_metadata_pool()), + SnapContext(), data, ceph_clock_now(g_ceph_context), 0, NULL, &waiter); + mds->mds_lock.Unlock(); + int write_result = waiter.wait(); + if (write_result < 0) { + derr << "Error writing pointer object '" << buf << "': " << cpp_strerror(write_result) << dendl; + } + return write_result; +} + // i am a separate thread void MDLog::_replay_thread() diff --git a/src/mds/MDLog.h b/src/mds/MDLog.h index 6e8e980c94e..ffdd7061bd4 100644 --- a/src/mds/MDLog.h +++ b/src/mds/MDLog.h @@ -48,6 +48,7 @@ enum { #include class Journaler; +class JournalPointer; class LogEvent; class MDS; class LogSegment; @@ -97,6 +98,22 @@ protected: void _replay(); // old way void _replay_thread(); // new way + // Journal recovery/rewrite logic + class RecoveryThread : public Thread { + MDLog *log; + Context *completion; + public: + void set_completion(Context *c) {completion = c;} + RecoveryThread(MDLog *l) : log(l), completion(NULL) {} + void* entry() { + log->_recovery_thread(completion); + return 0; + } + } recovery_thread; + void _recovery_thread(Context *completion); + int _read_pointer(JournalPointer *jp); + int _write_pointer(JournalPointer const &ptr); + void _reformat_journal(JournalPointer const &jp, Journaler *old_journal, Context *completion); // -- segments -- map segments; @@ -154,6 +171,7 @@ public: logger(0), replay_thread(this), already_replayed(false), + recovery_thread(this), expiring_events(0), expired_events(0), cur_event(NULL) { } ~MDLog(); diff --git a/src/mds/mdstypes.h b/src/mds/mdstypes.h index 6a08e9d20d5..a7e495c7203 100644 --- a/src/mds/mdstypes.h +++ b/src/mds/mdstypes.h @@ -51,9 +51,13 @@ using namespace std; #define MDS_INO_CEPH 2 #define MDS_INO_MDSDIR_OFFSET (1*MAX_MDS) -#define MDS_INO_LOG_OFFSET (2*MAX_MDS) #define MDS_INO_STRAY_OFFSET (6*MAX_MDS) +// Locations for journal data +#define MDS_INO_LOG_OFFSET (2*MAX_MDS) +#define MDS_INO_LOG_BACKUP_OFFSET (3*MAX_MDS) +#define MDS_INO_LOG_POINTER_OFFSET (4*MAX_MDS) + #define MDS_INO_SYSTEM_BASE ((6*MAX_MDS) + (MAX_MDS * NUM_STRAY)) #define MDS_INO_STRAY(x,i) (MDS_INO_STRAY_OFFSET+((((unsigned)(x))*NUM_STRAY)+((unsigned)(i)))) diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index 20b576ce8ff..91c7d138288 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -93,6 +93,37 @@ class JournalStream }; +// This always lives in the same location for a given MDS +// instance, it tells the daemon where to look for the journal. +class JournalPointer { + public: + // The currently active journal + inodeno_t front; + // The backup journal, if any (may be 0) + inodeno_t back; + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + ::encode(front, bl); + ::encode(back, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator &bl) { + DECODE_START(1, bl); + ::decode(front, bl); + ::decode(back, bl); + DECODE_FINISH(bl); + } + + JournalPointer() : front(0), back(0) {} + + bool is_null() const { + return front == 0 && back == 0; + } +}; + + class Journaler { public: // this goes at the head of the log "file". @@ -177,6 +208,10 @@ public: } last_written, last_committed; WRITE_CLASS_ENCODER(Header) + uint32_t get_stream_format() const { + return stream_format; + } + private: // me CephContext *cct; @@ -187,7 +222,6 @@ private: uint32_t stream_format; JournalStream journal_stream; - const char *magic; Objecter *objecter; Filer filer;