]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
* some edits
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 1 Jun 2007 21:17:34 +0000 (21:17 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 1 Jun 2007 21:17:34 +0000 (21:17 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1388 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/ebofs/FileJournal.cc
trunk/ceph/ebofs/FileJournal.h

index 5f38c7725ea8c6251a90b9a9d9c193002063bab1..87ea20199cd24625eed31811a0160f5b103efa2d 100644 (file)
@@ -57,119 +57,144 @@ void FileJournal::close()
 
 void FileJournal::start_writer()
 {
-  writer_stop = false;
+  write_stop = false;
   write_thread.create();
 }
 
 void FileJournal::stop_writer()
 {
-  writer_lock.Lock();
+  write_lock.Lock();
   {
-       writer_stop = true;
-       writer_cond.Signal();
+    write_stop = true;
+    write_cond.Signal();
   } 
-  writer_lock.Unlock();
+  write_lock.Unlock();
   write_thread.join();
 }
 
+
+void FileJournal::write_header()
+{
+  dout(10) << "write_header" << endl;
+  
+  ::lseek(fd, 0, SEEK_SET);
+  ::write(fd, &header, sizeof(header));
+}
+
+
 void FileJournal::write_thread_entry()
 {
   dout(10) << "write_thread_entry start" << endl;
-  writer_lock.Lock();
+  write_lock.Lock();
   
   while (!write_stop) {
-       if (writeq.empty()) {
-         // sleep
-         dout(20) << "write_thread_entry going to sleep" << endl;
-         write_cond.Wait(writer_lock);
-         dout(20) << "write_thread_entry woke up" << endl;
-         continue;
-       }
-
-       // do queued writes
-       while (!writeq.empty()) {
-         // grab next item
-         epoch_t e = writeq.front().first;
-         bufferlist bl;
-         bl.claim(writeq.front().second);
-         writeq.pop_front();
-         Context *oncommit = commitq.front();
-         commitq.pop_front();
-
-         dout(15) << "write_thread_entry writing " << bottom << " : " 
-                          << bl.length() 
-                          << " epoch " << e
-                          << endl;
-         
-         // write epoch, len, data.
-         ::fseek(fd, bottom, SEEK_SET);
-         ::write(fd, &e, sizeof(e));
-
-         uint32_t len = bl.length();
-         ::write(fd, &len, sizeof(len));
-
-         for (list<bufferptr>::const_iterator it = bl.buffers().begin();
-                  it != bl.buffers().end();
-                  it++) {
-               if ((*it).length() == 0) continue;  // blank buffer.
-               ::write(fd, (char*)(*it).c_str(), (*it).length() );
-         }
-
-         // move position pointer
-         bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length();
-
-         // do commit callback
-         if (oncommit) {
-               oncommit->finish(0);
-               delete oncommit;
-         }
-       }
+    if (writeq.empty()) {
+      // sleep
+      dout(20) << "write_thread_entry going to sleep" << endl;
+      write_cond.Wait(write_lock);
+      dout(20) << "write_thread_entry woke up" << endl;
+      continue;
+    }
+    
+    // do queued writes
+    while (!writeq.empty()) {
+      // grab next item
+      epoch_t e = writeq.front().first;
+      bufferlist bl;
+      bl.claim(writeq.front().second);
+      writeq.pop_front();
+      Context *oncommit = commitq.front();
+      commitq.pop_front();
+      
+      dout(15) << "write_thread_entry writing " << bottom << " : " 
+              << bl.length() 
+              << " epoch " << e
+              << endl;
+      
+      // write epoch, len, data.
+      ::fseek(fd, bottom, SEEK_SET);
+      ::write(fd, &e, sizeof(e));
+      
+      uint32_t len = bl.length();
+      ::write(fd, &len, sizeof(len));
+      
+      for (list<bufferptr>::const_iterator it = bl.buffers().begin();
+          it != bl.buffers().end();
+          it++) {
+       if ((*it).length() == 0) continue;  // blank buffer.
+       ::write(fd, (char*)(*it).c_str(), (*it).length() );
+      }
+      
+      // move position pointer
+      bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length();
+      
+      // do commit callback
+      if (oncommit) {
+       oncommit->finish(0);
+       delete oncommit;
+      }
+    }
   }
   
-  writer_lock.Unlock();
+  write_lock.Unlock();
   dout(10) << "write_thread_entry finish" << endl;
 }
 
 void FileJournal::submit_entry(bufferlist& e, Context *oncommit)
 {
   dout(10) << "submit_entry " << bottom << " : " << e.length()
-                  << " epoch " << ebofs->super_epoch
-                  << " " << oncommit << endl;
-
+          << " epoch " << ebofs->super_epoch
+          << " " << oncommit << endl;
+  
   // dump on queue
   writeq.push_back(pair<epoch_t,bufferlist>(ebofs->super_epoch, e));
   commitq.push_back(oncommit);
 
   // kick writer thread
-  writer_cond.Signal();
+  write_cond.Signal();
 }
 
 
 void FileJournal::commit_epoch_start()
 {
   dout(10) << "commit_epoch_start" << endl;
+
+  write_lock.Lock();
+  {
+    header.epoch2 = ebofs->super_epoch;
+    header.top2 = bottom;
+    write_header();
+  }
+  write_lock.Unlock();
 }
 
 void FileJournal::commit_epoch_finish()
 {
   dout(10) << "commit_epoch_finish" << endl;
-  
-  // flush any unwritten items in previous epoch
-  writer_lock.Lock();
+
+  write_lock.Lock();
   {
-       while (!writeq.empty() &&
-                  writeq.front().first < ebofs->super_epoch) {
-         dout(15) << " dropping uncommitted journal item from prior epoch" << endl;
-         writeq.pop_front();
-         Context *oncommit = commitq.front();
-         commitq.pop_front();
-
-         if (oncommit) {
-               oncommit->finish(0);
-               delete oncommit;
-         }
-       }
+    // update header
+    header.epoch1 = ebofs->super_epoch;
+    header.top1 = header.top2;
+    header.epoch2 = 0;
+    header.top2 = 0;
+    write_header();
+
+    // flush any unwritten items in previous epoch
+    while (!writeq.empty() &&
+          writeq.front().first < ebofs->super_epoch) {
+      dout(15) << " dropping uncommitted journal item from prior epoch" << endl;
+      writeq.pop_front();
+      Context *oncommit = commitq.front();
+      commitq.pop_front();
+         
+      if (oncommit) {
+       oncommit->finish(0);
+       delete oncommit;
+      }
+    }
   }
-  writer_lock.Unlock();
+  write_lock.Unlock();
   
 }
index 33333f7259fb3b5aeac317c1df156050c31a3b6a..6c34f24f339559a425df42d7036f8803060f9454 100644 (file)
 
 
 class FileJournal : public Journal {
+public:
+  struct header_t {
+    epoch_t epoch1;
+    off_t top1;
+    epoch_t epoch2;
+    off_t top2;
+  } header;
+
+private:
   string fn;
 
   off_t max_size;
@@ -31,10 +40,12 @@ class FileJournal : public Journal {
   int fd;
 
   list<pair<epoch_t,bufferlist> > writeq;  // currently journaling
-  map<off_t,Context*> commitq; // currently journaling
+  list<Context*> commitq; // currently journaling
   
   // write thread
-  bool writer_stop;
+  Mutex write_lock;
+  Cond write_cond;
+  bool write_stop;
 
   void write_header();
   void start_writer();
@@ -42,23 +53,23 @@ class FileJournal : public Journal {
   void write_thread_entry();
 
   class Writer : public Thread {
-       FileJournal *journal;
+    FileJournal *journal;
   public:
-       Writer(FileJournal *fj) : journal(fj) {}
-       void *entry() {
-         journal->write_thread();
-         return 0;
-       }
-  } writer_thread;
+    Writer(FileJournal *fj) : journal(fj) {}
+    void *entry() {
+      journal->write_thread();
+      return 0;
+    }
+  } write_thread;
 
  public:
   FileJournal(Ebofs *e, char *f, off_t sz) : 
-       Journal(e),
-       fn(f), max_size(sz),
-       top(0), bottom(0), committing_to(0),
-       fd(0),
-       writer_stop(false), writer_thread(this)
-       { }
+    Journal(e),
+    fn(f), max_size(sz),
+    top(0), bottom(0), committing_to(0),
+    fd(0),
+    write_stop(false), write_thread(this)
+  { }
   ~FileJournal() {}
 
   void create();