void MDLog::open(Context *c)
{
dout(5) << "open discovering log bounds" << dendl;
- init_journaler();
- journaler->recover(c);
+ recovery_thread.set_completion(c);
+ recovery_thread.create();
+ recovery_thread.detach();
// either append() or replay() will follow.
}
};
+/**
+ * Resolve the JournalPointer object to a journal file, and
+ * instantiate a Journaler object. This may re-write the journal
+ * if the journal in RADOS appears to be in an old format.
+ *
+ * This is a separate thread because of the way it is initialized from inside
+ * the mds lock, which is also the global objecter lock -- rather than split
+ * it up into hard-to-read async operations linked up by contexts,
+ *
+ * When this function completes, the `journaler` attribute will be set to
+ * a Journaler instance using the latest available serialization format.
+ */
+void MDLog::_recovery_thread(Context *completion)
+{
+ assert(journaler == NULL);
+
+ // First, read the pointer object.
+ // If the pointer object is not present, then create it with
+ // front = default ino and back = null
+ JournalPointer jp;
+ int const read_result = _read_pointer(&jp);
+ if (read_result == -ENOENT) {
+ inodeno_t const default_log_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
+ jp.front = default_log_ino;
+ int write_result = _write_pointer(jp);
+ // Nothing graceful we can do for this
+ assert(write_result >= 0);
+ } else if (read_result != 0) {
+ // No graceful way of handling this: give up and leave it for support
+ // to work out why RADOS preventing access.
+ assert(0);
+ }
+
+ // If the back pointer is non-null, that means that a journal
+ // rewrite failed part way through. Erase the back journal
+ // to clean up.
+ if (jp.back) {
+ dout(1) << "Erasing journal " << jp.back << dendl;
+ C_SaferCond erase_waiter;
+ Journaler back(jp.back, mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC,
+ mds->objecter, logger, l_mdl_jlat, &mds->timer);
+
+ // Read all about this journal (header + extents)
+ mds->mds_lock.Lock();
+ C_SaferCond recover_wait;
+ back.recover(&recover_wait);
+ mds->mds_lock.Unlock();
+ int recovery_result = recover_wait.wait();
+
+ // Journaler.recover succeeds if no journal objects are present: an error
+ // means something worse like a corrupt header, which we can't handle here.
+ assert(recovery_result == 0);
+ // We could read journal, so we can erase it.
+ mds->mds_lock.Lock();
+ back.erase(&erase_waiter);
+ mds->mds_lock.Unlock();
+ int erase_result = erase_waiter.wait();
+
+ // If we are successful, or find no data, we can update the JournalPointer to
+ // reflect that the back journal is gone.
+ if (erase_result != 0 && erase_result != -ENOENT) {
+ derr << "Failed to erase journal " << jp.back << ": " << cpp_strerror(erase_result) << dendl;
+ } else {
+ dout(1) << "Successfully erased journal, updating journal pointer" << dendl;
+ jp.back = 0;
+ int write_result = _write_pointer(jp);
+ // Nothing graceful we can do for this
+ assert(write_result >= 0);
+ }
+ }
+
+ /* Read the header from the front journal */
+ Journaler *front_journal = new Journaler(jp.front, mds->mdsmap->get_metadata_pool(),
+ CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, &mds->timer);
+ C_SaferCond recover_wait;
+ mds->mds_lock.Lock();
+ front_journal->recover(&recover_wait);
+ mds->mds_lock.Unlock();
+ dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl;
+ int recovery_result = recover_wait.wait();
+ dout(4) << "Journal " << jp.front << " recovered." << dendl;
+
+ if (recovery_result != 0) {
+ derr << "Error recovering journal " << jp.front << ": " << cpp_strerror(recovery_result) << dendl;
+ completion->complete(recovery_result);
+ return;
+ }
+
+ /* Check whether the front journal format is acceptable or needs re-write */
+ if (front_journal->get_stream_format() >= JOURNAL_FORMAT_RESILIENT) {
+ /* Great, the journal is of current format and ready to rock, hook
+ * it into this->journaler and complete */
+ journaler = front_journal;
+ journaler->set_write_error_handler(new C_MDL_WriteError(this));
+ mds->mds_lock.Lock();
+ completion->complete(0);
+ mds->mds_lock.Unlock();
+ } else {
+ /* Hand off to reformat routine, which will ultimately set the
+ * completion when it has done its thing */
+ dout(1) << "Journal " << jp.front << " has old format "
+ << front_journal->get_stream_format() << ", it will now be updated" << dendl;
+
+ _reformat_journal(jp, front_journal, completion);
+ }
+}
+
+/**
+ * Blocking rewrite of the journal to a new file, followed by
+ * swap of journal pointer to point to the new one.
+ *
+ * We write the new journal to the 'back' journal from the JournalPointer,
+ * swapping pointers to make that one the front journal only when we have
+ * safely completed.
+ */
+void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journal, Context *completion)
+{
+ assert(!jp_in.is_null());
+ assert(completion != NULL);
+ assert(old_journal != NULL);
+
+ JournalPointer jp = jp_in;
+
+ /* Set JournalPointer.back to the location we will write the new journal */
+ inodeno_t primary_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
+ inodeno_t secondary_ino = MDS_INO_LOG_BACKUP_OFFSET + mds->get_nodeid();
+ jp.back = (jp.front == primary_ino ? secondary_ino : primary_ino);
+ int write_result = _write_pointer(jp);
+ assert(write_result == 0);
+
+ /* Create the new Journaler file */
+ Journaler *new_journal = new Journaler(jp.back, mds->mdsmap->get_metadata_pool(),
+ CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, &mds->timer);
+ dout(4) << "Writing new journal header " << jp.back << dendl;
+ ceph_file_layout new_layout = old_journal->get_layout();
+ new_journal->set_writeable();
+ new_journal->create(&new_layout);
+
+ /* Write the new journal header to RADOS */
+ C_SaferCond write_head_wait;
+ mds->mds_lock.Lock();
+ new_journal->write_head(&write_head_wait);
+ mds->mds_lock.Unlock();
+ write_head_wait.wait();
+
+ // Read in the old journal, and whenever we have readable events,
+ // write them to the new journal.
+ int r = 0;
+
+ // The logic in here borrowed from replay_thread expects mds_lock to be held,
+ // e.g. between checking readable and doing wait_for_readable so that journaler
+ // state doesn't change in between.
+ uint32_t events_transcribed = 0;
+ mds->mds_lock.Lock();
+ while (1) {
+ while (!old_journal->is_readable() &&
+ old_journal->get_read_pos() < old_journal->get_write_pos() &&
+ !old_journal->get_error()) {
+
+ // Issue a journal prefetch
+ C_SaferCond readable_waiter;
+ old_journal->wait_for_readable(&readable_waiter);
+
+ // Wait for a journal prefetch to complete
+ mds->mds_lock.Unlock();
+ readable_waiter.wait();
+ mds->mds_lock.Lock();
+ }
+ if (old_journal->get_error()) {
+ r = old_journal->get_error();
+ dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
+ break;
+ }
+
+ if (!old_journal->is_readable() &&
+ old_journal->get_read_pos() == old_journal->get_write_pos())
+ break;
+
+ // Read one serialized LogEvent
+ assert(old_journal->is_readable());
+ bufferlist bl;
+ bool r = old_journal->try_read_entry(bl);
+ if (!r && old_journal->get_error())
+ continue;
+ assert(r);
+
+ // Write (buffered, synchronous) one serialized LogEvent
+ events_transcribed += 1;
+ new_journal->append_entry(bl);
+
+ // Allow other I/O to advance, e.g. MDS beacons
+ mds->mds_lock.Unlock();
+ mds->mds_lock.Lock();
+ }
+ mds->mds_lock.Unlock();
+
+ dout(1) << "Transcribed " << events_transcribed << " events, flushing new journal" << dendl;
+ C_SaferCond flush_waiter;
+ mds->mds_lock.Lock();
+ new_journal->flush(&flush_waiter);
+ mds->mds_lock.Unlock();
+ flush_waiter.wait();
+
+ // If failed to rewrite journal, leave the part written journal
+ // as garbage to be cleaned up next startup.
+ assert(r == 0);
+
+ /* Now that the new journal is safe, we can flip the pointers */
+ inodeno_t const tmp = jp.front;
+ jp.front = jp.back;
+ jp.back = tmp;
+ write_result = _write_pointer(jp);
+ assert(write_result == 0);
+
+ /* Delete the old journal to free space */
+ dout(1) << "New journal flushed, erasing old journal" << dendl;
+ C_SaferCond erase_waiter;
+ mds->mds_lock.Lock();
+ old_journal->erase(&erase_waiter);
+ mds->mds_lock.Unlock();
+ int erase_result = erase_waiter.wait();
+ assert(erase_result == 0);
+ delete old_journal;
+
+ /* Update the pointer to reflect we're back in clean single journal state. */
+ jp.back = 0;
+ write_result = _write_pointer(jp);
+ assert(write_result == 0);
+
+ /* Reset the Journaler object to its default state */
+ dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl;
+ journaler = new_journal;
+ journaler->set_readonly();
+ journaler->set_write_error_handler(new C_MDL_WriteError(this));
+
+ /* Trigger completion */
+ mds->mds_lock.Lock();
+ completion->complete(0);
+ mds->mds_lock.Unlock();
+}
+
+/**
+ * Blocking read of JournalPointer for this MDS
+ */
+int MDLog::_read_pointer(JournalPointer *jp)
+{
+ assert(!mds->mds_lock.is_locked_by_me());
+ assert(jp);
+
+ inodeno_t const pointer_ino = MDS_INO_LOG_POINTER_OFFSET + mds->get_nodeid();
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%llx.%08llx", (long long unsigned)pointer_ino, (long long unsigned)0);
+
+ // Blocking read of data
+ dout(4) << "Reading journal pointer '" << buf << "'" << dendl;
+ bufferlist data;
+ C_SaferCond waiter;
+ mds->mds_lock.Lock();
+ mds->objecter->read_full(object_t(buf), object_locator_t(mds->mdsmap->get_metadata_pool()),
+ CEPH_NOSNAP, &data, 0, &waiter);
+ mds->mds_lock.Unlock();
+ int r = waiter.wait();
+
+ // Construct JournalPointer result, null or decoded data
+ if (r == 0) {
+ bufferlist::iterator q = data.begin();
+ jp->decode(q);
+ } else {
+ dout(1) << "Journal pointer '" << buf << "' read failed: " << cpp_strerror(r) << dendl;
+ }
+ return r;
+}
+
+
+/**
+ * Blocking write of JournalPointer for this MDS
+ *
+ * @return objecter write op status code
+ */
+int MDLog::_write_pointer(JournalPointer const &ptr)
+{
+ assert(!mds->mds_lock.is_locked_by_me());
+ // It is not valid to persist a null pointer
+ assert(!ptr.is_null());
+
+ // Calculate object ID
+ inodeno_t const pointer_ino = MDS_INO_LOG_POINTER_OFFSET + mds->get_nodeid();
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%llx.%08llx", (long long unsigned)pointer_ino, (long long unsigned)0);
+ dout(4) << "Writing pointer object '" << buf << "': 0x"
+ << std::hex << ptr.front << ":0x" << ptr.back << std::dec << dendl;
+
+ // Serialize JournalPointer object
+ bufferlist data;
+ ptr.encode(data);
+
+ // Write to RADOS and wait for durability
+ C_SaferCond waiter;
+ mds->mds_lock.Lock();
+ mds->objecter->write_full(object_t(buf), object_locator_t(mds->mdsmap->get_metadata_pool()),
+ SnapContext(), data, ceph_clock_now(g_ceph_context), 0, NULL, &waiter);
+ mds->mds_lock.Unlock();
+ int write_result = waiter.wait();
+ if (write_result < 0) {
+ derr << "Error writing pointer object '" << buf << "': " << cpp_strerror(write_result) << dendl;
+ }
+ return write_result;
+}
+
// i am a separate thread
void MDLog::_replay_thread()