git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journaler: write out objects as we complete them
author: Sage Weil <sage@newdream.net>
Mon, 20 Apr 2009 23:56:54 +0000 (16:56 -0700)
committer: Sage Weil <sage@newdream.net>
Mon, 20 Apr 2009 23:56:54 +0000 (16:56 -0700)
No reason to delay the write (except maybe to debug log flush()
calls in the MDS).

src/osdc/Journaler.cc
src/osdc/Journaler.h

index 06503eddeb6e075fb140bf63a4afdabf267fe310..73a71457bfc0bb3e614f09b2a91e7add7f6d6568 100644 (file)
@@ -333,11 +333,21 @@ __s64 Journaler::append_entry(bufferlist& bl)
   write_buf.claim_append(bl);
   write_pos += sizeof(s) + s;
 
+  // flush previous object?
+  int su = ceph_file_layout_su(layout);
+  int write_off = write_pos % su;
+  int write_obj = write_pos / su;
+  int flush_obj = flush_pos / su;
+  if (write_obj != flush_obj) {
+    dout(10) << " flushing completed object(s) (su " << su << " wro " << write_obj << " flo " << flush_obj << ")" << dendl;
+    _do_flush(write_buf.length() - write_off);
+  }
+
   return write_pos;
 }
 
 
-void Journaler::_do_flush()
+void Journaler::_do_flush(unsigned amount)
 {
   if (write_pos == flush_pos) return;
   assert(write_pos > flush_pos);
@@ -345,6 +355,8 @@ void Journaler::_do_flush()
   // flush
   unsigned len = write_pos - flush_pos;
   assert(len == write_buf.length());
+  if (amount && amount < len)
+    len = amount;
   dout(10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;
   
   // submit write for anything pending
@@ -361,16 +373,22 @@ void Journaler::_do_flush()
   Context *onsafe = new C_Flush(this, flush_pos, now, true);  // on COMMIT
   pending_safe.insert(flush_pos);
 
+  bufferlist write_bl;
+
+  // adjust pointers
+  if (len == write_buf.length()) {
+    write_bl.swap(write_buf);
+  } else {
+    write_buf.splice(0, len, &write_bl);
+  }
+  flush_pos += len;
+  assert(write_buf.length() == write_pos - flush_pos);
+
   filer.write(ino, &layout, snapc,
-             flush_pos, len, write_buf, g_clock.now(),
+             flush_pos, len, write_bl, g_clock.now(),
              CEPH_OSD_FLAG_INCLOCK_FAIL,
              onack, onsafe);
-  
-  // adjust pointers
-  flush_pos = write_pos;
-  write_buf.clear();  
-  
+    
   dout(10) << "_do_flush write pointers now at " << write_pos << "/" << flush_pos << "/" << ack_pos << "/" << safe_pos << dendl;
 }
 
index 15fac7871a2dc45731ca38da97ed6b3cd123d7a9..dea0d25379fd2fac8819739bb444dbcabde1c972 100644 (file)
@@ -152,7 +152,7 @@ public:
   std::map<__s64, std::list<Context*> > waitfor_safe; // when safe through given offset
   std::set<__s64> ack_barrier;
 
-  void _do_flush();
+  void _do_flush(unsigned amount=0);
   void _finish_flush(int r, __s64 start, utime_t stamp, bool safe);
   class C_Flush;
   friend class C_Flush;