]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
filejournal: fix journal full handling
authorSage Weil <sage@newdream.net>
Thu, 1 Apr 2010 18:39:16 +0000 (11:39 -0700)
committerSage Weil <sage@newdream.net>
Thu, 1 Apr 2010 18:39:16 +0000 (11:39 -0700)
We would block on journal full, but then try to continue where we left off,
which broke thoroughly.  Add return codes, and wait at the proper times:

 - if journal if full on first event, wait. otherwise, write what we have
   so far.
 - wait in write_thread_entry(), not check_for_full().

Also fix up 'room' calculation.

src/os/FileJournal.cc
src/os/FileJournal.h

index 0fb49f0812572a1cdb9fda101514d92ae4f8e724..c69463ada433fab56222de3260d51cb9cb156687 100644 (file)
@@ -321,14 +321,20 @@ bufferptr FileJournal::prepare_header()
 
 
 
-bool FileJournal::check_for_full(__u64 seq, off64_t pos, off64_t size)
+int FileJournal::check_for_full(__u64 seq, off64_t pos, off64_t size)
 {
   // already full?
   if (full_commit_seq || full_restart_seq)
-    return false;
+    return -ENOSPC;
 
- retry:
-  off64_t room = (header.max_size - pos) + (header.start - get_top());
+  // take 1 byte off so that we only get pos == header.start on EMPTY, never on FULL.
+  off64_t room;
+  if (pos >= header.start)
+    room = (header.max_size - pos) + (header.start - get_top()) - 1;
+  else
+    room = header.start - pos - 1;
+  dout(10) << "room " << room << " max_size " << max_size << " pos " << pos << " header.start " << header.start
+          << " top " << get_top() << dendl;
 
   if (do_sync_cond) {
     if (room < (header.max_size >> 1) &&
@@ -340,7 +346,7 @@ bool FileJournal::check_for_full(__u64 seq, off64_t pos, off64_t size)
     dout(10) << "check_for_full at " << pos << " : " << size << " < " << room << dendl;
     if (pos + size > header.max_size)
       must_write_header = true;
-    return true;
+    return 0;
   }
 
   // full
@@ -349,13 +355,10 @@ bool FileJournal::check_for_full(__u64 seq, off64_t pos, off64_t size)
          << " (max_size " << header.max_size << " start " << header.start << ")"
          << dendl;
 
-  // wait?
-  if (wait_on_full) {
-    dout(1) << "check_for_full waiting for a commit" << dendl;
-    commit_cond.Wait(write_lock);
-    goto retry;
-  }  
+  if (wait_on_full)
+    return -ENOSPC;
 
+  // throw out what we have so far
   full_commit_seq = seq;
   full_restart_seq = seq+1;
   while (!writeq.empty()) {
@@ -368,11 +371,10 @@ bool FileJournal::check_for_full(__u64 seq, off64_t pos, off64_t size)
     writeq.pop_front();
   }  
   print_header();
-  return false;
-
+  return -ENOSPC;
 }
 
-void FileJournal::prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& orig_bytes)
+int FileJournal::prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& orig_bytes)
 {
   // gather queued writes
   off64_t queue_pos = write_pos;
@@ -381,14 +383,16 @@ void FileJournal::prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& or
   unsigned bmax = g_conf.journal_max_write_bytes;
 
   if (full_commit_seq || full_restart_seq)
-    return;
+    return -ENOSPC;
   
   while (!writeq.empty()) {
-    bool r = prepare_single_write(bl, queue_pos, orig_bytes);
-    if (!r)
-      break;
-
-    orig_ops++;
+    int r = prepare_single_write(bl, queue_pos, orig_ops, orig_bytes);
+    if (r == -ENOSPC) {
+      if (orig_ops)
+       break;         // commit what we have
+      dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl;
+      return -ENOSPC;  // hrm, full on first op
+    }
 
     if (eleft) {
       if (--eleft == 0) {
@@ -405,9 +409,10 @@ void FileJournal::prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& or
   }
 
   //assert(write_pos + bl.length() == queue_pos);
+  return 0;
 }
 
-bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64& orig_size)
+int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64& orig_ops, __u64& orig_bytes)
 {
   // grab next item
   __u64 seq = writeq.front().seq;
@@ -415,13 +420,15 @@ bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64
   off64_t base_size = 2*sizeof(entry_header_t) + ebl.length();
   off64_t size = ROUND_UP_TO(base_size, header.alignment);
 
-  if (!check_for_full(seq, queue_pos, size))
-    return false;
+  int r = check_for_full(seq, queue_pos, size);
+  if (r < 0)
+    return r;   // ENOSPC or EAGAIN
 
-  orig_size += ebl.length();
+  orig_bytes += ebl.length();
+  orig_ops++;
 
   // add to write buffer
-  dout(15) << "prepare_single_write will write " << queue_pos << " : seq " << seq
+  dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq
           << " len " << ebl.length() << " -> " << size
           << dendl;
     
@@ -454,7 +461,7 @@ bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64
   if (queue_pos > header.max_size)
     queue_pos = queue_pos + get_top() - header.max_size;
 
-  return true;
+  return 0;
 }
 
 void FileJournal::write_bl(off64_t& pos, bufferlist& bl)
@@ -616,7 +623,14 @@ void FileJournal::write_thread_entry()
     __u64 orig_bytes = 0;
 
     bufferlist bl;
-    prepare_multi_write(bl, orig_ops, orig_bytes);
+    int r = prepare_multi_write(bl, orig_ops, orig_bytes);
+    if (r == -ENOSPC) {
+      dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl;
+      commit_cond.Wait(write_lock);
+      dout(20) << "write_thread_entry woke up" << dendl;
+      continue;
+    }
+    assert(r == 0);
     do_write(bl);
     
     __u64 new_ops = throttle_ops.put(orig_ops);
index 1674a02a90635cf2e92ff57a15da94a77a32fb5a..d96c4d68f355e5508cd3c99509494d46ce65a30c 100644 (file)
@@ -120,9 +120,9 @@ private:
   void stop_writer();
   void write_thread_entry();
 
-  bool check_for_full(__u64 seq, off64_t pos, off64_t size);
-  void prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& orig_bytse);
-  bool prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64& orig_bytes);
+  int check_for_full(__u64 seq, off64_t pos, off64_t size);
+  int prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& orig_bytee);
+  int prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64& orig_ops, __u64& orig_bytes);
   void do_write(bufferlist& bl);
 
   void write_bl(off64_t& pos, bufferlist& bl);