]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journaler: ack barriers
authorSage Weil <sage@newdream.net>
Tue, 10 Jun 2008 13:40:30 +0000 (06:40 -0700)
committerSage Weil <sage@newdream.net>
Tue, 10 Jun 2008 13:45:45 +0000 (06:45 -0700)
src/osdc/Journaler.cc
src/osdc/Journaler.h

index 3d11a039fe2b3fdc06dfd31a11fb3ae2c9e38d8c..b43a6cd90e1c37f9f0c9b589a10a6ba8aad3d085 100644 (file)
@@ -191,40 +191,63 @@ void Journaler::_finish_write_head(Header &wrote, Context *oncommit)
 class Journaler::C_Flush : public Context {
   Journaler *ls;
   __s64 start;
+  utime_t stamp;
   bool safe;
 public:
-  C_Flush(Journaler *l, __s64 s, bool sa) : ls(l), start(s), safe(sa) {}
-  void finish(int r) { ls->_finish_flush(r, start, safe); }
+  C_Flush(Journaler *l, __s64 s, utime_t st, bool sa) : ls(l), start(s), stamp(st), safe(sa) {}
+  void finish(int r) { ls->_finish_flush(r, start, stamp, safe); }
 };
 
-void Journaler::_finish_flush(int r, __s64 start, bool safe)
+void Journaler::_finish_flush(int r, __s64 start, utime_t stamp, bool safe)
 {
   assert(r>=0);
 
   assert((!safe && start >= ack_pos) || (safe && start >= safe_pos));
   assert(start < flush_pos);
-  assert(pending_flush.count(start));
 
   // calc latency?
   if (logger) {
     utime_t lat = g_clock.now();
-    lat -= pending_flush[start];
+    lat -= stamp;
     logger->favg(safe ? "jsafelat" : "jacklat", lat);
   }
 
   // adjust ack_pos
   if (!safe) {
-    ack_pos = pending_flush.begin()->first;
+    assert(pending_ack.count(start));
+    pending_ack.erase(start);
+    if (pending_ack.empty())
+      ack_pos = flush_pos;
+    else
+      ack_pos = *pending_ack.begin();
+    if (!ack_barrier.empty() && *ack_barrier.begin() < ack_pos)
+      ack_pos = *ack_barrier.begin();
   } else {
-    pending_flush.erase(start);
-    if (pending_flush.empty())
+    assert(pending_safe.count(start));
+    pending_safe.erase(start);
+    if (pending_safe.empty())
       safe_pos = flush_pos;
     else
-      safe_pos = pending_flush.begin()->first;
+      safe_pos = *pending_safe.begin();
+    
+    if (ack_barrier.count(start)) {
+      ack_barrier.erase(start);
+      
+      if (ack_pos == start) {
+       if (pending_ack.empty())
+         ack_pos = flush_pos;
+       else
+         ack_pos = *pending_ack.begin();
+       if (!ack_barrier.empty() && *ack_barrier.begin() < ack_pos)
+         ack_pos = *ack_barrier.begin();
+      }
+    }
   }
 
   dout(10) << "_finish_flush " << (safe ? "safe":"ack") << " from " << start
-          << ", pending_flush now " << pending_flush 
+          << ", pending_ack " << pending_ack
+    //<< ", pending_safe " << pending_safe
+          << ", ack_barrier " << ack_barrier
           << ", write positions now " << write_pos << "/" << flush_pos
           << "/" << ack_pos << "/" << safe_pos
           << dendl;
@@ -314,11 +337,13 @@ void Journaler::_do_flush()
   
   // submit write for anything pending
   // flush _start_ pos to _finish_flush
+  utime_t now = g_clock.now();
   filer.write(inode, flush_pos, len, write_buf, 
              CEPH_OSD_OP_INCLOCK_FAIL,
-             new C_Flush(this, flush_pos, false),  // on ACK
-             new C_Flush(this, flush_pos, true));  // on COMMIT
-  pending_flush[flush_pos] = g_clock.now();
+             new C_Flush(this, flush_pos, now, false),  // on ACK
+             new C_Flush(this, flush_pos, now, true));  // on COMMIT
+  pending_ack.insert(flush_pos);
+  pending_safe.insert(flush_pos);
   
   // adjust pointers
   flush_pos = write_pos;
@@ -329,7 +354,7 @@ void Journaler::_do_flush()
 
   
 
-void Journaler::flush(Context *onsync, Context *onsafe)
+void Journaler::flush(Context *onsync, Context *onsafe, bool add_ack_barrier)
 {
   // all flushed and acked?
   if (write_pos == ack_pos) {
@@ -341,10 +366,14 @@ void Journaler::flush(Context *onsync, Context *onsafe)
       delete onsync;
       onsync = 0;
     }
-    if (onsafe && write_pos == safe_pos) {
-      onsafe->finish(0);
-      delete onsafe;
-      onsafe = 0;
+    if (onsafe) {
+      if (write_pos == safe_pos) {
+       onsafe->finish(0);
+       delete onsafe;
+       onsafe = 0;
+      } else {
+       waitfor_safe[write_pos].push_back(onsafe);
+      }
     }
     return;
   }
@@ -377,6 +406,8 @@ void Journaler::flush(Context *onsync, Context *onsafe)
     waitfor_ack[write_pos].push_back(onsync);
   if (onsafe) 
     waitfor_safe[write_pos].push_back(onsafe);
+  if (add_ack_barrier)
+    ack_barrier.insert(write_pos);
 
   // write head?
   if (last_wrote_head.sec() + g_conf.journaler_write_head_interval < g_clock.now().sec()) {
index 7daed2043589a8ebd4908c231684dc985de86f3a..410760300ae057a898e20d96c584e67568d33b9b 100644 (file)
@@ -140,12 +140,13 @@ public:
   __s64 safe_pos;        // what has been committed safely to disk.
   bufferlist write_buf;  // write buffer.  flush_pos + write_buf.length() == write_pos.
 
-  std::map<__s64, utime_t> pending_flush;  // start offsets and times for pending flushes
+  std::set<__s64> pending_ack, pending_safe;
   std::map<__s64, std::list<Context*> > waitfor_ack;  // when flushed through given offset
   std::map<__s64, std::list<Context*> > waitfor_safe; // when safe through given offset
+  std::set<__s64> ack_barrier;
 
   void _do_flush();
-  void _finish_flush(int r, __s64 start, bool safe);
+  void _finish_flush(int r, __s64 start, utime_t stamp, bool safe);
   class C_Flush;
   friend class C_Flush;
 
@@ -223,13 +224,14 @@ public:
 
   __s64 get_write_pos() const { return write_pos; }
   __s64 get_write_ack_pos() const { return ack_pos; }
+  __s64 get_write_safe_pos() const { return safe_pos; }
   __s64 get_read_pos() const { return read_pos; }
   __s64 get_expire_pos() const { return expire_pos; }
   __s64 get_trimmed_pos() const { return trimmed_pos; }
 
   // write
   __s64 append_entry(bufferlist& bl, Context *onsync = 0);
-  void flush(Context *onsync = 0, Context *onsafe = 0);
+  void flush(Context *onsync = 0, Context *onsafe = 0, bool add_ack_barrier=false);
 
   // read
   void set_read_pos(__s64 p) {