]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: FileJournal clean-up
authorDavid Zafman <dzafman@redhat.com>
Wed, 18 Feb 2015 20:29:38 +0000 (12:29 -0800)
committerDavid Zafman <dzafman@redhat.com>
Thu, 25 Feb 2016 20:50:21 +0000 (12:50 -0800)
Move aio / directio adjustments to constructor
Indicate functions which only operate before journal is active
Make functions const when possible
Move push_back() of journalq to read_entry()
Change dump() to use do_read_entry() to minimize side effects

Signed-off-by: David Zafman <dzafman@redhat.com>
(cherry picked from commit 12fdf4ced0b2c17770c21204c8eccca8e4d0d2c9)

Conflicts:
src/os/FileJournal.cc (trivial)

src/os/FileJournal.cc
src/os/FileJournal.h

index 34082cdb5bdefefe46328af5548018129c4f4568..8b151f5dce67ab2914e99d82ff5e32452ff20e5d 100644 (file)
@@ -45,17 +45,6 @@ int FileJournal::_open(bool forwrite, bool create)
 {
   int flags, ret;
 
-  if (aio && !directio) {
-    derr << "FileJournal::_open: aio not supported without directio; disabling aio" << dendl;
-    aio = false;
-  }
-#ifndef HAVE_LIBAIO
-  if (aio) {
-    derr << "FileJournal::_open: libaio not compiled in; disabling aio" << dendl;
-    aio = false;
-  }
-#endif
-
   if (forwrite) {
     flags = O_RDWR;
     if (directio)
@@ -331,15 +320,17 @@ int FileJournal::_open_file(int64_t oldsize, blksize_t blksize,
   return 0;
 }
 
+// This can not be used on an active journal
 int FileJournal::check()
 {
   int ret;
 
+  assert(fd == -1);
   ret = _open(false, false);
   if (ret)
-    goto done;
+    return ret;
 
-  ret = read_header();
+  ret = read_header(&header);
   if (ret < 0)
     goto done;
 
@@ -354,8 +345,7 @@ int FileJournal::check()
   ret = 0;
 
  done:
-  VOID_TEMP_FAILURE_RETRY(::close(fd));
-  fd = -1;
+  close();
   return ret;
 }
 
@@ -386,7 +376,7 @@ int FileJournal::create()
   header.start = get_top();
   header.start_seq = 0;
 
-  print_header();
+  print_header(header);
 
   // static zeroed buffer for alignment padding
   delete [] zero_buf;
@@ -443,16 +433,20 @@ done:
   return ret;
 }
 
+// This can not be used on an active journal
 int FileJournal::peek_fsid(uuid_d& fsid)
 {
+  assert(fd == -1);
   int r = _open(false, false);
   if (r)
     return r;
-  r = read_header();
+  r = read_header(&header);
   if (r < 0)
-    return r;
+    goto out;
   fsid = header.fsid;
-  return 0;
+out:
+  close();
+  return r;
 }
 
 int FileJournal::open(uint64_t fs_op_seq)
@@ -470,7 +464,7 @@ int FileJournal::open(uint64_t fs_op_seq)
   write_pos = get_top();
 
   // read header?
-  err = read_header();
+  err = read_header(&header);
   if (err < 0)
     return err;
 
@@ -556,6 +550,11 @@ int FileJournal::open(uint64_t fs_op_seq)
   return 0;
 }
 
+void FileJournal::_close(int fd) const
+{
+  VOID_TEMP_FAILURE_RETRY(::close(fd));
+}
+
 void FileJournal::close()
 {
   dout(1) << "close " << fn << dendl;
@@ -567,25 +566,27 @@ void FileJournal::close()
   assert(writeq_empty());
   assert(!must_write_header);
   assert(fd >= 0);
-  VOID_TEMP_FAILURE_RETRY(::close(fd));
+  _close(fd);
   fd = -1;
 }
 
 
 int FileJournal::dump(ostream& out)
 {
-  int err = 0;
-
   dout(10) << "dump" << dendl;
-  err = _open(false, false);
+
+  assert(fd == -1);
+  int err = _open(false, false);
   if (err)
     return err;
 
-  err = read_header();
-  if (err < 0)
+  err = read_header(&header);
+  if (err < 0) {
+    close();
     return err;
+  }
 
-  read_pos = header.start;
+  off64_t next_pos = header.start;
 
   JSONFormatter f(true);
   f.open_object_section("journal");
@@ -604,12 +605,32 @@ int FileJournal::dump(ostream& out)
   f.close_section();
 
   f.open_array_section("entries");
-  uint64_t seq = 0;
+  uint64_t seq = header.start_seq;
   while (1) {
     bufferlist bl;
-    uint64_t pos = read_pos;
-    if (!read_entry(bl, seq)) {
-      dout(3) << "journal_replay: end of journal, done." << dendl;
+    off64_t pos = next_pos;
+
+    if (!pos) {
+      dout(2) << "_dump -- not readable" << dendl;
+      return false;
+    }
+    stringstream ss;
+    read_entry_result result = do_read_entry(
+      pos,
+      &next_pos,
+      &bl,
+      &seq,
+      &ss);
+    if (result != SUCCESS) {
+      if (seq < header.committed_up_to) {
+        dout(2) << "Unable to read past sequence " << seq
+           << " but header indicates the journal has committed up through "
+           << header.committed_up_to << ", journal is corrupt" << dendl;
+        err = EINVAL;
+      }
+      dout(25) << ss.str() << dendl;
+      dout(25) << "No further valid entries found, journal is most likely valid"
+         << dendl;
       break;
     }
 
@@ -636,7 +657,9 @@ int FileJournal::dump(ostream& out)
   f.close_section();
   f.flush(out);
   dout(10) << "dump finish" << dendl;
-  return 0;
+
+  close();
+  return err;
 }
 
 
@@ -653,24 +676,28 @@ void FileJournal::start_writer()
 
 void FileJournal::stop_writer()
 {
+  // Do nothing if writer already stopped or never started
+  if (!write_stop)
   {
-    Mutex::Locker l(write_lock);
-    Mutex::Locker p(writeq_lock);
-    write_stop = true;
-    writeq_cond.Signal();
-    // Doesn't hurt to signal commit_cond in case thread is waiting there
-    // and caller didn't use committed_thru() first.
-    commit_cond.Signal();
-  }
-  write_thread.join();
-  // write journal header now so that we have less to replay on remount
-  write_header_sync();
+    {
+      Mutex::Locker l(write_lock);
+      Mutex::Locker p(writeq_lock);
+      write_stop = true;
+      writeq_cond.Signal();
+      // Doesn't hurt to signal commit_cond in case thread is waiting there
+      // and caller didn't use committed_thru() first.
+      commit_cond.Signal();
+    }
+    write_thread.join();
+
+    // write journal header now so that we have less to replay on remount
+    write_header_sync();
+  }
 
 #ifdef HAVE_LIBAIO
   // stop aio completeion thread *after* writer thread has stopped
   // and has submitted all of its io
-  if (aio) {
+  if (aio && !aio_stop) {
     aio_lock.Lock();
     aio_stop = true;
     aio_cond.Signal();
@@ -683,7 +710,7 @@ void FileJournal::stop_writer()
 
 
 
-void FileJournal::print_header()
+void FileJournal::print_header(const header_t &header) const
 {
   dout(10) << "header: block_size " << header.block_size
           << " alignment " << header.alignment
@@ -693,7 +720,7 @@ void FileJournal::print_header()
   dout(10) << " write_pos " << write_pos << dendl;
 }
 
-int FileJournal::read_header()
+int FileJournal::read_header(header_t *hdr) const
 {
   dout(10) << "read_header" << dendl;
   bufferlist bl;
@@ -712,7 +739,7 @@ int FileJournal::read_header()
 
   try {
     bufferlist::iterator p = bl.begin();
-    ::decode(header, p);
+    ::decode(*hdr, p);
   }
   catch (buffer::error& e) {
     derr << "read_header error decoding journal header" << dendl;
@@ -727,12 +754,12 @@ int FileJournal::read_header()
    * remove this or else this (eventually old) code will clobber newer
    * code's flags.
    */
-  if (header.flags > 3) {
+  if (hdr->flags > 3) {
     derr << "read_header appears to have gibberish flags; assuming 0" << dendl;
-    header.flags = 0;
+    hdr->flags = 0;
   }
 
-  print_header();
+  print_header(*hdr);
 
   return 0;
 }
@@ -834,7 +861,7 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_
          put_throttle(1, peek_write().bl.length());
          pop_write();
        }  
-       print_header();
+       print_header(header);
       }
 
       return -ENOSPC;  // hrm, full on first op
@@ -1241,7 +1268,7 @@ void FileJournal::write_thread_entry()
          put_throttle(1, peek_write().bl.length());
          pop_write();
        }  
-       print_header();
+       print_header(header);
        r = 0;
       } else {
        dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl;
@@ -1666,7 +1693,7 @@ void FileJournal::committed_thru(uint64_t seq)
   }
 
   must_write_header = true;
-  print_header();
+  print_header(header);
 
   // committed but unjournaled items
   while (!writeq_empty() && peek_write().seq <= seq) {
@@ -1725,7 +1752,7 @@ void FileJournal::wrap_read_bl(
   int64_t olen,
   bufferlist* bl,
   off64_t *out_pos
-  )
+  ) const
 {
   while (olen > 0) {
     while (pos >= header.max_size)
@@ -1781,6 +1808,7 @@ bool FileJournal::read_entry(
     &seq,
     &ss);
   if (result == SUCCESS) {
+    journalq.push_back( pair<uint64_t,off64_t>(seq, pos));
     if (next_seq > seq) {
       return false;
     } else {
@@ -1818,7 +1846,7 @@ FileJournal::read_entry_result FileJournal::do_read_entry(
   bufferlist *bl,
   uint64_t *seq,
   ostream *ss,
-  entry_header_t *_h)
+  entry_header_t *_h) const
 {
   off64_t cur_pos = init_pos;
   bufferlist _bl;
@@ -1888,11 +1916,6 @@ FileJournal::read_entry_result FileJournal::do_read_entry(
   if (seq)
     *seq = h->seq;
 
-  // works around an apparent GCC 4.8(?) compiler bug about unaligned
-  // bind by reference to (packed) h->seq
-  journalq.push_back(
-    pair<uint64_t,off64_t>(static_cast<uint64_t>(h->seq),
-                          static_cast<off64_t>(init_pos)));
 
   if (next_pos)
     *next_pos = cur_pos;
index 430f773c83d828f98974d3efdd3268e066ba4b9f..0840f733fe9fd710e7b5f672d7c2ed0211c83628 100644 (file)
@@ -134,7 +134,7 @@ public:
       start = block_size;
     }
 
-    uint64_t get_fsid64() {
+    uint64_t get_fsid64() const {
       return *(uint64_t*)&fsid.uuid[0];
     }
 
@@ -294,10 +294,11 @@ private:
 
   int _open(bool wr, bool create=false);
   int _open_block_device();
+  void _close(int fd) const;
   void _check_disk_write_cache() const;
   int _open_file(int64_t oldsize, blksize_t blksize, bool create);
-  void print_header();
-  int read_header();
+  void print_header(const header_t &hdr) const;
+  int read_header(header_t *hdr) const;
   bufferptr prepare_header();
   void start_writer();
   void stop_writer();
@@ -325,7 +326,7 @@ private:
     int64_t len,      ///< [in] length to read
     bufferlist* bl,   ///< [out] result
     off64_t *out_pos  ///< [out] next position to read, will be wrapped
-    );
+    ) const;
 
   void do_discard(int64_t offset, int64_t end);
 
@@ -349,7 +350,7 @@ private:
     }
   } write_finish_thread;
 
-  off64_t get_top() {
+  off64_t get_top() const {
     return ROUND_UP_TO(sizeof(header), block_size);
   }
 
@@ -382,11 +383,24 @@ private:
     throttle_ops(g_ceph_context, "filestore_ops", g_conf->journal_queue_max_ops),
     throttle_bytes(g_ceph_context, "filestore_bytes", g_conf->journal_queue_max_bytes),
     write_lock("FileJournal::write_lock", false, true, false, g_ceph_context),
-    write_stop(false),
-    aio_stop(false),
+    write_stop(true),
+    aio_stop(true),
     write_thread(this),
-    write_finish_thread(this) { }
+    write_finish_thread(this) {
+
+      if (aio && !directio) {
+        derr << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl;
+        aio = false;
+      }
+#ifndef HAVE_LIBAIO
+      if (aio) {
+        derr << "FileJournal::_open_any: libaio not compiled in; disabling aio" << dendl;
+        aio = false;
+      }
+#endif
+  }
   ~FileJournal() {
+    assert(fd == -1);
     delete[] zero_buf;
   }
 
@@ -448,7 +462,7 @@ private:
     uint64_t *seq,        ///< [out] seq of successful read
     ostream *ss,          ///< [out] error output
     entry_header_t *h = 0 ///< [out] header
-    ); ///< @return result code
+    ) const; ///< @return result code
 
   bool read_entry(
     bufferlist &bl,