]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: truncate journal on partial write
authorSage Weil <sage@newdream.net>
Tue, 14 Jul 2009 19:27:11 +0000 (12:27 -0700)
committerSage Weil <sage@newdream.net>
Tue, 14 Jul 2009 19:40:39 +0000 (12:40 -0700)
Otherwise, we start writing new entries, then restart, and end up
with garbage at the write pos.

src/TODO
src/mds/MDLog.cc
src/mds/MDLog.h
src/osdc/Journaler.cc
src/osdc/Journaler.h

index e7ca7821f35db8790a90223e39dc41bde11a796f..c8510f68852eb748948d4ef6254f9722cfdc990c 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -162,7 +162,6 @@ journaler
 - should we pad with zeros to avoid splitting individual entries?
   - make it a g_conf flag?
   - have to fix reader to skip over zeros (either <4 bytes for size, or zeroed sizes)
-- need to truncate at detected (valid) write_pos to clear out any other partial trailing writes
 
 
 mon
index fd777c77df77635b1dd318325426cdebbaac9e9b..c7682ff37a28501c5384778c89cbd4395de345e6 100644 (file)
@@ -466,6 +466,16 @@ public:
   }
 };
 
+struct C_MDL_ReplayTruncated : public Context {
+  MDLog *mdl;
+  C_MDL_ReplayTruncated(MDLog *l) : mdl(l) {}
+  void finish(int r) {
+    mdl->mds->mds_lock.Lock();
+    mdl->_replay_truncated();
+    mdl->mds->mds_lock.Unlock();
+  }
+};
+
 
 
 // i am a separate thread
@@ -543,22 +553,36 @@ void MDLog::_replay_thread()
   // done!
   if (r == 0) {
     assert(journaler->get_read_pos() == journaler->get_write_pos());
-    dout(10) << "_replay - complete, " << num_events << " events, new read/expire pos is " << new_expire_pos << dendl;
-    
+    dout(10) << "_replay - complete, " << num_events
+            << " events, new read/expire pos is " << new_expire_pos << dendl;
+
     // move read pointer _back_ to first subtree map we saw, for eventual trimming
     journaler->set_read_pos(new_expire_pos);
     journaler->set_expire_pos(new_expire_pos);
     logger->set(l_mdl_expos, new_expire_pos);
+
+    dout(10) << "_replay - truncating at " << journaler->get_write_pos() << dendl;
+    Context *c = new C_MDL_ReplayTruncated(this);
+    if (journaler->truncate_tail_junk(c)) {
+      delete c;
+      
+      dout(10) << "_replay_thread nothing to truncate, kicking waiters" << dendl;
+      finish_contexts(waitfor_replay, 0);  
+    }
+  } else {
+    dout(10) << "_replay_thread kicking waiters" << dendl;
+    finish_contexts(waitfor_replay, r);  
   }
 
-  // kick waiter(s)
-  list<Context*> ls;
-  ls.swap(waitfor_replay);
-  finish_contexts(ls, r);  
-  
   dout(10) << "_replay_thread finish" << dendl;
   mds->mds_lock.Unlock();
 }
 
 
+void MDLog::_replay_truncated()
+{
+  dout(10) << "_replay_truncated" << dendl;
+  finish_contexts(waitfor_replay, 0);  
+}
+
 
index 11707f58ea0e5993ac474675e66fc5a49b91e220..f7e43cacecd50ce03945f0604d3909295d54d41b 100644 (file)
@@ -60,8 +60,9 @@ using std::map;
 
 
 class MDLog {
- protected:
+public:
   MDS *mds;
+protected:
   int num_events; // in events
 
   int unflushed;
@@ -94,6 +95,8 @@ class MDLog {
 
   void _replay();         // old way
   void _replay_thread();  // new way
+  void _replay_truncated();
+  friend class C_MDL_ReplayTruncated;
 
 
   // -- segments --
index e9fec63f6e1153ac2db4049f912cbcf6afac69c2..c860a582a3723c1b58b23bbc1f2568bac86b9df5 100644 (file)
@@ -682,6 +682,8 @@ bool Journaler::is_readable()
   // partial fragment at the end?
   if (received_pos == write_pos) {
     dout(10) << "is_readable() detected partial entry at tail, adjusting write_pos to " << read_pos << dendl;
+    if (write_pos > read_pos)
+      junk_tail_pos = write_pos; // note old tail
     write_pos = flush_pos = ack_pos = safe_pos = read_pos;
     assert(write_buf.length() == 0);
 
@@ -701,6 +703,20 @@ bool Journaler::is_readable()
   return false;
 }
 
+bool Journaler::truncate_tail_junk(Context *c)
+{
+  if (!junk_tail_pos) {
+    dout(10) << "truncate_tail_junk -- no trailing junk" << dendl;
+    return true;
+  }
+
+  __s64 len = junk_tail_pos - write_pos;
+  dout(10) << "truncate_tail_junk " << write_pos << "~" << len << dendl;
+  SnapContext snapc;
+  filer.zero(ino, &layout, snapc, write_pos, len, g_clock.now(), 0, NULL, c);
+  return false;
+}
+
 
 /* try_read_entry(bl)
  *  read entry into bl if it's ready.
index 55ec312233c3f480d0f3d4387943b8b7cfdefc66..0fa129558ffb74cf8c986cb51c381ba7de3e3ace 100644 (file)
@@ -173,6 +173,8 @@ public:
   __s64 fetch_len;     // how much to read at a time
   __s64 prefetch_from; // how far from end do we read next chunk
 
+  __s64 junk_tail_pos; // for truncate
+
   // for read_entry() in-progress read
   bufferlist *read_bl;
   Context    *on_read_finish;
@@ -210,6 +212,7 @@ public:
     write_pos(0), flush_pos(0), ack_pos(0), safe_pos(0),
     read_pos(0), requested_pos(0), received_pos(0),
     fetch_len(0), prefetch_from(0),
+    junk_tail_pos(0),
     read_bl(0), on_read_finish(0), on_readable(0),
     expire_pos(0), trimming_pos(0), trimmed_pos(0) 
   {
@@ -260,6 +263,8 @@ public:
   void wait_for_readable(Context *onfinish);
   void read_entry(bufferlist* bl, Context *onfinish);
   
+  bool truncate_tail_junk(Context *fin);
+
   // trim
   void set_expire_pos(__s64 ep) { expire_pos = ep; }
   void trim();