git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
filejournal: rewrite completion handling, fix ordering on full->notfull
author Sage Weil <sage@newdream.net>
Thu, 13 Jan 2011 21:14:24 +0000 (13:14 -0800)
committer Sage Weil <sage@newdream.net>
Thu, 13 Jan 2011 21:14:40 +0000 (13:14 -0800)
Rewrite the completion handling to be simpler and clearer, so that it is
easier to maintain a strict completion ordering invariant.

This also fixes an ordering bug: when restarting the journal, we initially
defer completions until we get a committed_thru from the previous commit, and
then do all of those completions.  That same logic also needs to apply to new
items submitted during that commit interval.  This was broken before, but the
simpler structure fixes it.  Fixes #666.

Tested-by: Jim Schutt <jaschut@sandia.gov>
Signed-off-by: Sage Weil <sage@newdream.net>
src/os/FileJournal.cc
src/os/FileJournal.h

index 7f33c84fa395a7068ef0f3a219ded1d50308ee70..54e0f0503905037d801507670fa8f0ab681ac6c6 100644 (file)
@@ -510,10 +510,6 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_
        // throw out what we have so far
        full_state = FULL_FULL;
        while (!writeq.empty()) {
-         if (writeq.front().fin) {
-           writing_seq.push_back(writeq.front().seq);
-           writing_fin.push_back(writeq.front().fin);
-         }
          dout(30) << "XXX throttle put " << writeq.front().bl.length() << dendl;
          throttle_ops.put(1);
          throttle_bytes.put(writeq.front().bl.length());
@@ -544,6 +540,36 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_
   return 0;
 }
 
+/*
+void FileJournal::queue_write_fin(uint64_t seq, Context *fin)
+{
+  writing_seq.push_back(seq);
+  if (!waiting_for_notfull.empty()) {
+    // make sure previously unjournaled stuff waiting for UNFULL triggers
+    // _before_ newly journaled stuff does
+    dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin
+            << " until after UNFULL" << dendl;
+    C_Gather *g = new C_Gather(writeq.front().fin);
+    writing_fin.push_back(g->new_sub());
+    waiting_for_notfull.push_back(g->new_sub());
+  } else {
+    writing_fin.push_back(writeq.front().fin);
+    dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl;
+  }
+}
+*/
+
+void FileJournal::queue_completions_thru(uint64_t seq)  // hand queued completions with seq <= `seq` to the finisher, front of the deque first
+{
+  while (!completions.empty() &&
+        completions.front().first <= seq) {  // only the front entry is examined; the loop ends at the first entry whose seq exceeds `seq`
+    dout(10) << "queue_completions_thru seq " << seq << " queueing seq " << completions.front().first
+            << " " << completions.front().second << dendl;
+    finisher->queue(completions.front().second);  // .second is the Context* stored alongside the seq
+    completions.pop_front();  // drop the entry once its callback has been queued
+  }
+}
+
 int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes)
 {
   // grab next item
@@ -593,23 +619,11 @@ int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64
     bl.push_back(bp);
   }
   bl.append((const char*)&h, sizeof(h));
-   
-  if (writeq.front().fin) {
-    writing_seq.push_back(seq);
-    if (!waiting_for_notfull.empty()) {
-      // make sure previously unjournaled stuff waiting for UNFULL triggers
-      // _before_ newly journaled stuff does
-      dout(10) << " will defer seq " << seq << " callback until after UNFULL" << dendl;
-      C_Gather *g = new C_Gather(writeq.front().fin);
-      writing_fin.push_back(g->new_sub());
-      waiting_for_notfull.push_back(g->new_sub());
-    } else
-      writing_fin.push_back(writeq.front().fin);
-  }
 
   // pop from writeq
   writeq.pop_front();
   journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos));
+  writing_seq = seq;
 
   queue_pos += size;
   if (queue_pos > header.max_size)
@@ -731,15 +745,21 @@ void FileJournal::do_write(bufferlist& bl)
   write_pos = pos;
   assert(write_pos % header.alignment == 0);
 
+  journaled_seq = writing_seq;
+
   // kick finisher?  
   //  only if we haven't filled up recently!
   if (full_state != FULL_NOTFULL) {
-    dout(10) << "do_write NOT queueing finisher seq " << writing_seq.front()
+    dout(10) << "do_write NOT queueing finisher seq " << journaled_seq
             << ", full_commit_seq|full_restart_seq" << dendl;
   } else {
-    dout(20) << "do_write queueing finishers " << writing_fin << dendl;
-    writing_seq.clear();
-    finisher->queue(writing_fin);
+    if (plug_journal_completions) {
+      dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq
+              << " due to completion plug" << dendl;
+    } else {
+      dout(20) << "do_write queueing finishers through seq " << journaled_seq << dendl;
+      queue_completions_thru(journaled_seq);
+    }
   }
 }
 
@@ -810,21 +830,20 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment, Conte
           << " len " << e.length()
           << " (" << oncommit << ")" << dendl;
 
+  if (oncommit)
+    completions.push_back(pair<uint64_t,Context*>(seq, oncommit));
+
   if (full_state == FULL_NOTFULL) {
     // queue and kick writer thread
     dout(30) << "XXX throttle take " << e.length() << dendl;
     throttle_ops.take(1);
     throttle_bytes.take(e.length());
 
-    writeq.push_back(write_item(seq, e, alignment, oncommit));
+    writeq.push_back(write_item(seq, e, alignment));
     write_cond.Signal();
   } else {
     // not journaling this.  restart writing no sooner than seq + 1.
     dout(10) << " journal is/was full" << dendl;
-    if (oncommit) {
-      writing_seq.push_back(seq);
-      writing_fin.push_back(oncommit);
-    }
   }
 }
 
@@ -843,8 +862,9 @@ void FileJournal::commit_start()
     break;
 
   case FULL_WAIT:
-    dout(1) << " FULL_WAIT -> FULL_NOTFULL.  journal now active." << dendl;
+    dout(1) << " FULL_WAIT -> FULL_NOTFULL.  journal now active, setting completion plug." << dendl;
     full_state = FULL_NOTFULL;
+    plug_journal_completions = true;
     break;
   }
 }
@@ -877,40 +897,20 @@ void FileJournal::committed_thru(uint64_t seq)
   }
   must_write_header = true;
   print_header();
-  
-  // recently were full, but aren't now.
-  if (!waiting_for_notfull.empty()) {
-    dout(10) << " finishing waiting_for_notfull items " << waiting_for_notfull << dendl;
-    finisher->queue(waiting_for_notfull);
-  }
 
-  // committed but writing
-  while (!writing_seq.empty() && writing_seq.front() <= seq) {
-    dout(15) << " finishing committed but writing|waiting seq " << writing_seq.front() << dendl;
-    finisher->queue(writing_fin.front());
-    writing_seq.pop_front();
-    writing_fin.pop_front();
+  // completions!
+  queue_completions_thru(seq);
+  if (plug_journal_completions) {
+    dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl;
+    plug_journal_completions = false;
+    queue_completions_thru(journaled_seq);
   }
 
-  if (full_state == FULL_WAIT) {
-    // will complete on _next_ commit
-    while (!writing_seq.empty()) {
-      dout(15) << " queuing seq " << writing_seq.front() << " " << writing_fin.front()
-              << " in waiting_for_notfull" << dendl;
-      waiting_for_notfull.push_back(writing_fin.front());
-      writing_seq.pop_front();
-      writing_fin.pop_front();
-    }
-  }
-  
   // committed but unjournaled items
   while (!writeq.empty() && writeq.front().seq <= seq) {
     dout(15) << " dropping committed but unwritten seq " << writeq.front().seq 
             << " len " << writeq.front().bl.length()
-            << " (" << writeq.front().fin << ")"
             << dendl;
-    if (writeq.front().fin)
-      finisher->queue(writeq.front().fin);
     dout(30) << "XXX throttle put " << writeq.front().bl.length() << dendl;
     throttle_ops.put(1);
     throttle_bytes.put(writeq.front().bl.length());
index 0959f7085fe7ffb0a1bc4c8f65de3b35df6616c5..6ffa3a30c0df7a67d239acb35fc01bc78a03d6fc 100644 (file)
@@ -98,22 +98,18 @@ private:
 
   // in journal
   deque<pair<uint64_t, off64_t> > journalq;  // track seq offsets, so we can trim later.
+  deque<pair<uint64_t,Context*> > completions;  // queued, writing, waiting for commit.
 
-  // currently being journaled and awaiting callback.
-  //  or, awaiting callback bc journal was full.
-  deque<uint64_t> writing_seq;
-  deque<Context*> writing_fin;
-
-  deque<Context*> waiting_for_notfull;
+  uint64_t writing_seq, journaled_seq;
+  bool plug_journal_completions;
 
   // waiting to be journaled
   struct write_item {
     uint64_t seq;
     bufferlist bl;
     int alignment;
-    Context *fin;
-    write_item(uint64_t s, bufferlist& b, int al, Context *f) :
-      seq(s), alignment(al), fin(f) { 
+    write_item(uint64_t s, bufferlist& b, int al) :
+      seq(s), alignment(al) { 
       bl.claim(b);
     }
   };
@@ -140,6 +136,8 @@ private:
   void stop_writer();
   void write_thread_entry();
 
+  void queue_completions_thru(uint64_t seq);
+
   int check_for_full(uint64_t seq, off64_t pos, off64_t size);
   int prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytee);
   int prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes);
@@ -173,6 +171,8 @@ private:
     last_committed_seq(0), 
     full_state(FULL_NOTFULL),
     fd(-1),
+    writing_seq(0), journaled_seq(0),
+    plug_journal_completions(false),
     write_lock("FileJournal::write_lock"),
     write_stop(false),
     write_thread(this) { }
@@ -198,7 +198,7 @@ private:
   void commit_start();
   void committed_thru(uint64_t seq);
   bool should_commit_now() {
-    return full_state != FULL_NOTFULL || !waiting_for_notfull.empty();
+    return full_state != FULL_NOTFULL;
   }
 
   void set_wait_on_full(bool b) { wait_on_full = b; }