]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ebofs: journal replay working; better journal validation on mount
authorSage Weil <sage@newdream.net>
Fri, 25 Jan 2008 08:25:27 +0000 (00:25 -0800)
committerSage Weil <sage@newdream.net>
Fri, 25 Jan 2008 08:25:27 +0000 (00:25 -0800)
src/ebofs/Ebofs.cc
src/ebofs/FileJournal.cc
src/ebofs/FileJournal.h
src/ebofs/types.h

index 1786bfcedef8cbf288dc75f51ea4b4265a74ba98..cbc74c11db1f3ac9e460f7a12e92d466e8321701 100644 (file)
@@ -48,15 +48,12 @@ char *nice_blocks(block_t b)
 
 int Ebofs::mount()
 {
-  ebofs_lock.Lock();
+  Mutex::Locker locker(ebofs_lock);
   assert(!mounted);
 
   // open dev
   int r = dev.open(&idle_kicker);
-  if (r < 0) {
-    ebofs_lock.Unlock();
-    return r;
-  }
+  if (r < 0) return r;
 
   dout(2) << "mounting " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << dendl;
 
@@ -121,15 +118,21 @@ int Ebofs::mount()
 
   allocator.release_limbo();
 
-  
-  // open journal?
+  // open journal
   if (journalfn) {
     journal = new FileJournal(this, journalfn, g_conf.ebofs_journal_dio);
-    if (journal->open() < 0) {
+    int err = journal->open();
+    if (err < 0) {
       dout(3) << "mount journal " << journalfn << " open failed" << dendl;
       delete journal;
       journal = 0;
+      if (err == -EINVAL) {
+       dout(0) << "mount journal appears corrupt/invalid, stopping" << dendl;
+       dev.close();
+       return -1;
+      }
     } else {
+      // replay journal
       dout(3) << "mount journal " << journalfn << " opened, replaying" << dendl;
       
       while (1) {
@@ -139,7 +142,7 @@ int Ebofs::mount()
          dout(3) << "mount replay: end of journal, done." << dendl;
          break;
        }
-
+       
        if (e < super_epoch) {
          dout(3) << "mount replay: skipping old entry in epoch " << e << " < " << super_epoch << dendl;
          continue;
@@ -156,7 +159,7 @@ int Ebofs::mount()
        t._decode(bl, off);
        _apply_transaction(t);
       }
-
+      
       // done reading, make writeable.
       journal->make_writeable();
     }
@@ -171,22 +174,18 @@ int Ebofs::mount()
          << dendl;
   mounted = true;
 
-
-  ebofs_lock.Unlock();
   return 0;
 }
 
 
 int Ebofs::mkfs()
 {
-  ebofs_lock.Lock();
+  Mutex::Locker locker(ebofs_lock);
   assert(!mounted);
 
   int r = dev.open();
-  if (r < 0) {
-    ebofs_lock.Unlock();
+  if (r < 0) 
     return r;
-  }
 
   block_t num_blocks = dev.get_num_blocks();
 
@@ -279,7 +278,6 @@ int Ebofs::mkfs()
   }
 
   dout(2) << "mkfs: " << dev.get_device_name() << " "  << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << dendl;
-  ebofs_lock.Unlock();
   return 0;
 }
 
index 73a1009bdbdb2cceec433cc6919ac8290762b017..fab49274682d33ae751bbdbb792a3d253e6861bc 100644 (file)
@@ -32,10 +32,10 @@ int FileJournal::_open(bool forwrite)
   int flags;
 
   if (forwrite) {
-    flags = O_RDONLY;
-  } else {
     flags = O_RDWR;
     if (directio) flags |= O_DIRECT;
+  } else {
+    flags = O_RDONLY;
   }
   
   if (fd >= 0) 
@@ -48,10 +48,12 @@ int FileJournal::_open(bool forwrite)
 
   // get size
   struct stat st;
-  ::fstat(fd, &st);
+  int r = ::fstat(fd, &st);
+  assert(r == 0);
   max_size = st.st_size;
   block_size = st.st_blksize;
-  dout(2) << "_open " << fn << " " << st.st_size << " bytes, block size " << block_size << dendl;
+  dout(2) << "_open " << fn << " fd " << fd 
+         << ": " << st.st_size << " bytes, block size " << block_size << dendl;
 
   return 0;
 }
@@ -69,11 +71,18 @@ int FileJournal::create()
   header.fsid = ebofs->get_fsid();
   header.max_size = max_size;
   header.block_size = block_size;
-  write_header();
-  
-  // writeable.
-  read_pos = 0;
-  write_pos = get_top();
+  if (directio)
+    header.alignment = block_size;
+  else
+    header.alignment = 16;  // at least stay word aligned on 64bit machines...
+  print_header();
+
+  buffer::ptr bp = prepare_header();
+  int r = ::pwrite(fd, bp.c_str(), bp.length(), 0);
+  if (r < 0) {
+    dout(0) << "create write header error " << errno << " " << strerror(errno) << dendl;
+    return -errno;
+  }
 
   ::close(fd);
   fd = -1;
@@ -96,16 +105,29 @@ int FileJournal::open()
   read_header();
   if (header.fsid != ebofs->get_fsid()) {
     dout(2) << "open journal fsid doesn't match, invalid (someone else's?) journal" << dendl;
+    err = -EINVAL;
   } 
-  else if (header.max_size > max_size) {
-    dout(2) << "open old header has mismatched max size, discarding" << dendl;
+  if (header.max_size > max_size) {
+    dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl;
+    err = -EINVAL;
   }
-  else if (header.block_size != block_size) {
-    dout(2) << "open old header has mismatched block size, discarding" << dendl;
+  if (header.block_size != block_size) {
+    dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl;
+    err = -EINVAL;
   }
-  else if (header.num > 0) {
-    // valid header.  
-    
+  if (header.alignment != block_size && directio) {
+    derr(0) << "open journal alignment " << header.alignment << " does not match block size " 
+           << block_size << " (required for direct_io journal mode)" << dendl;
+    err = -EINVAL;
+  }
+  if (err)
+    return err;
+
+  // looks like a valid header.
+  write_pos = 0;  // not writeable yet
+  read_pos = 0;
+
+  if (header.num > 0) {
     // pick an offset
     for (int i=0; i<header.num; i++) {
       if (header.epoch[i] == ebofs->get_super_epoch()) {
@@ -128,6 +150,15 @@ int FileJournal::open()
        break;
       }
     }
+
+    if (read_pos == 0) {
+      dout(0) << "no valid journal segments" << dendl;
+      return -EINVAL;
+    }
+
+  } else {
+    dout(0) << "journal was empty" << dendl;
+    read_pos = get_top();
   }
 
   return 0;
@@ -210,20 +241,10 @@ bufferptr FileJournal::prepare_header()
   return bp;
 }
 
-void FileJournal::write_header()
-{
-  buffer::ptr bp = prepare_header();
-  dout(10) << "write_header writing " << bp.length() << dendl;
-  print_header();
-
-  int r = ::pwrite(fd, bp.c_str(), bp.length(), 0);
-  if (r < 0) 
-    dout(0) << "write_header error " << errno << " " << strerror(errno) << dendl;
-}
 
 
 
-void FileJournal::check_for_wrap(epoch_t epoch, off_t pos, off_t size)
+void FileJournal::check_for_wrap(epoch_t epoch, off64_t pos, off64_t size)
 {
   // epoch boundary?
   dout(10) << "check_for_wrap epoch " << epoch << " last " << header.last_epoch() << " of " << header.num << dendl;
@@ -271,7 +292,7 @@ void FileJournal::check_for_wrap(epoch_t epoch, off_t pos, off_t size)
 void FileJournal::prepare_multi_write(bufferlist& bl)
 {
   // gather queued writes
-  off_t queue_pos = write_pos;
+  off64_t queue_pos = write_pos;
 
   int eleft = g_conf.ebofs_journal_max_write_entries;
   int bleft = g_conf.ebofs_journal_max_write_bytes;
@@ -280,7 +301,7 @@ void FileJournal::prepare_multi_write(bufferlist& bl)
     // grab next item
     epoch_t epoch = writeq.front().first;
     bufferlist &ebl = writeq.front().second;
-    off_t size = 2*sizeof(entry_header_t) + ebl.length();
+    off64_t size = 2*sizeof(entry_header_t) + ebl.length();
 
     if (bl.length() > 0 && bleft > 0 && bleft < size) break;
     
@@ -291,15 +312,13 @@ void FileJournal::prepare_multi_write(bufferlist& bl)
     
     // add to write buffer
     dout(15) << "prepare_multi_write will write " << queue_pos << " : " 
-            << ebl.length() 
-            << " epoch " << epoch
-            << dendl;
+            << ebl.length() << " epoch " << epoch << " -> " << size << dendl;
     
     // add it this entry
     entry_header_t h;
     h.epoch = epoch;
     h.len = ebl.length();
-    h.make_magic(write_pos, header.fsid);
+    h.make_magic(queue_pos, header.fsid);
     bl.append((const char*)&h, sizeof(h));
     bl.claim_append(ebl);
     bl.append((const char*)&h, sizeof(h));
@@ -311,6 +330,8 @@ void FileJournal::prepare_multi_write(bufferlist& bl)
     // pop from writeq
     writeq.pop_front();
     commitq.pop_front();
+
+    queue_pos += size;
     if (--eleft == 0) break;
     bleft -= size;
     if (bleft == 0) break;
@@ -323,15 +344,15 @@ bool FileJournal::prepare_single_dio_write(bufferlist& bl)
   epoch_t epoch = writeq.front().first;
   bufferlist &ebl = writeq.front().second;
     
-  off_t size = 2*sizeof(entry_header_t) + ebl.length();
-  size = DIV_ROUND_UP(size, 4096) * 4096;
+  off64_t size = 2*sizeof(entry_header_t) + ebl.length();
+  size = ROUND_UP_2(size, header.alignment);
   
   check_for_wrap(epoch, write_pos, size);
   if (full) return false;
 
   // build it
   dout(15) << "prepare_single_dio_write will write " << write_pos << " : " 
-          << size << " epoch " << epoch << dendl;
+          << ebl.length() << " epoch " << epoch << " -> " << size << dendl;
 
   bufferptr bp = buffer::create_page_aligned(size);
   entry_header_t *h = (entry_header_t*)bp.c_str();
@@ -377,23 +398,31 @@ void FileJournal::do_write(bufferlist& bl)
     ::pwrite(fd, hbp.c_str(), hbp.length(), 0);
   
   // entry
-  off_t pos = write_pos;
+  off64_t pos = write_pos;
+  ::lseek64(fd, write_pos, SEEK_SET);
   for (list<bufferptr>::const_iterator it = bl.buffers().begin();
        it != bl.buffers().end();
        it++) {
     if ((*it).length() == 0) continue;  // blank buffer.
-    ::pwrite(fd, (char*)(*it).c_str(), (*it).length(), pos);
+    int r = ::write(fd, (char*)(*it).c_str(), (*it).length());
+    if (r < 0)
+      derr(0) << "do_write failed with " << errno << " " << strerror(errno) 
+             << " with " << (void*)(*it).c_str() << " len " << (*it).length()
+             << dendl;
     pos += (*it).length();
   }
-  ::fdatasync(fd);
+  if (!directio)
+    ::fdatasync(fd);
       
   write_lock.Lock();    
 
   writing = false;
   if (memcmp(&old_header, &header, sizeof(header)) == 0) {
     write_pos += bl.length();
+    write_pos = ROUND_UP_2(write_pos, header.alignment);
     ebofs->queue_finishers(writingq);
   } else {
+    dout(10) << "do_write finished write but header changed?  not moving write_pos." << dendl;
     derr(0) << "do_write finished write but header changed?  not moving write_pos." << dendl;
     assert(writingq.empty());
   }
@@ -554,7 +583,8 @@ bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
 
   // header
   entry_header_t h;
-  ::pread(fd, &h, sizeof(h), read_pos);
+  ::lseek64(fd, read_pos, SEEK_SET);
+  ::read(fd, &h, sizeof(h));
   if (!h.check_magic(read_pos, header.fsid)) {
     dout(2) << "read_entry " << read_pos << " : bad header magic, end of journal" << dendl;
     return false;
@@ -570,7 +600,7 @@ bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
   if (!f.check_magic(read_pos, header.fsid) ||
       h.epoch != f.epoch ||
       h.len != f.len) {
-    dout(2) << "read_entry " << read_pos << " : bad footer magic, partially entry, end of journal" << dendl;
+    dout(2) << "read_entry " << read_pos << " : bad footer magic, partial entry, end of journal" << dendl;
     return false;
   }
 
@@ -585,6 +615,7 @@ bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
   epoch = h.epoch;
 
   read_pos += 2*sizeof(entry_header_t) + h.len;
+  read_pos = ROUND_UP_2(read_pos, header.alignment);
 
   return true;
 }
index a0a5cc40ad33a35616b91488cfd94d09a5dc87f9..03c3363e145a32a3b9482514d5f50467a0aaa4b9 100644 (file)
@@ -34,15 +34,16 @@ public:
    * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
    */
   struct header_t {
-    uint64_t fsid;
-    int num;
-    off_t wrap;
-    off_t max_size;
-    size_t block_size;
-    epoch_t epoch[4];
-    off_t offset[4];
+    __u64 fsid;
+    __s64 num;
+    __u32 block_size;
+    __u32 alignment;
+    __s64 max_size;
+    __s64 wrap;
+    __u32 epoch[4];
+    __s64 offset[4];
 
-    header_t() : fsid(0), num(0), wrap(0), max_size(0), block_size(0) {}
+    header_t() : fsid(0), num(0), block_size(0), alignment(0), max_size(0), wrap(0) {}
 
     void clear() {
       num = 0;
@@ -57,7 +58,7 @@ public:
        offset[i] = offset[i+1];
       }
     }
-    void push(epoch_t e, off_t o) {
+    void push(epoch_t e, off64_t o) {
       assert(num < 4);
       if (num > 2 && 
          epoch[num-1] == e &&
@@ -81,11 +82,11 @@ public:
     uint64_t magic1;
     uint64_t magic2;
     
-    void make_magic(off_t pos, uint64_t fsid) {
+    void make_magic(off64_t pos, uint64_t fsid) {
       magic1 = pos;
       magic2 = fsid ^ epoch ^ len;
     }
-    bool check_magic(off_t pos, uint64_t fsid) {
+    bool check_magic(off64_t pos, uint64_t fsid) {
       return
        magic1 == (uint64_t)pos &&
        magic2 == (fsid ^ epoch ^ len);
@@ -95,12 +96,12 @@ public:
 private:
   string fn;
 
-  off_t max_size;
+  off64_t max_size;
   size_t block_size;
   bool directio;
   bool full, writing, must_write_header;
-  off_t write_pos;      // byte where next entry written goes
-  off_t read_pos;       // 
+  off64_t write_pos;      // byte where next entry written goes
+  off64_t read_pos;       // 
 
   int fd;
 
@@ -120,12 +121,11 @@ private:
   void print_header();
   void read_header();
   bufferptr prepare_header();
-  void write_header();
   void start_writer();
   void stop_writer();
   void write_thread_entry();
 
-  void check_for_wrap(epoch_t epoch, off_t pos, off_t size);
+  void check_for_wrap(epoch_t epoch, off64_t pos, off64_t size);
   bool prepare_single_dio_write(bufferlist& bl);
   void prepare_multi_write(bufferlist& bl);
   void do_write(bufferlist& bl);
@@ -140,7 +140,7 @@ private:
     }
   } write_thread;
 
-  off_t get_top() {
+  off64_t get_top() {
     if (directio)
       return block_size;
     else
index d0698ab639565a117a0f2edcefad31480ab7f315..b5d9c1fce9c429dd218c414c74dd09a74b5445a3 100644 (file)
@@ -44,6 +44,7 @@ using namespace __gnu_cxx;
 #ifndef DIV_ROUND_UP
 # define DIV_ROUND_UP(n, d)  (((n) + (d) - 1) / (d))
 #endif
+#define ROUND_UP_2(n, d) (((n)+(d)-1) & ~((d)-1))
 
 // disk
 typedef uint64_t block_t;        // disk location/sector/block