]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journaler: wait for ack and safe.. ack_pos flushed thing still wonky tho
authorSage Weil <sage@newdream.net>
Sat, 7 Jun 2008 14:06:31 +0000 (07:06 -0700)
committerSage Weil <sage@newdream.net>
Tue, 10 Jun 2008 13:45:45 +0000 (06:45 -0700)
src/include/ceph_fs.h
src/osdc/Journaler.cc
src/osdc/Journaler.h

index 88ec35d5d5f7cb394858937fc848a6787041fefd..04b4eda42f495829f112cc6c84180868a3d2a1f2 100644 (file)
@@ -781,7 +781,8 @@ enum {
        CEPH_OSD_OP_SAFE = 2,         /* want (or is) "safe" ack */
        CEPH_OSD_OP_RETRY = 4,        /* resend attempt */
        CEPH_OSD_OP_INCLOCK_FAIL = 8, /* fail on inclock collision */
-       CEPH_OSD_OP_BALANCE_READS = 16
+       CEPH_OSD_OP_BALANCE_READS = 16,
+       CEPH_OSD_OP_ACKNVRAM = 32,    /* ACK when stable in NVRAM, not RAM */
 };
 
 struct ceph_osd_peer_stat {
index 52b0250383f3a7d0f83b57202404c90d01fc1d0d..3d11a039fe2b3fdc06dfd31a11fb3ae2c9e38d8c 100644 (file)
@@ -29,7 +29,7 @@ void Journaler::reset()
 {
   dout(1) << "reset to blank journal" << dendl;
   state = STATE_ACTIVE;
-  write_pos = flush_pos = ack_pos =
+  write_pos = flush_pos = ack_pos = safe_pos =
     read_pos = requested_pos = received_pos =
     expire_pos = trimming_pos = trimmed_pos = ceph_file_layout_period(inode.layout);
 }
@@ -102,7 +102,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl)
   assert(bl.length() == sizeof(h));
   bl.copy(0, sizeof(h), (char*)&h);
 
-  write_pos = flush_pos = ack_pos = h.write_pos;
+  write_pos = flush_pos = ack_pos = safe_pos = h.write_pos;
   read_pos = requested_pos = received_pos = h.read_pos;
   expire_pos = h.expire_pos;
   trimmed_pos = trimming_pos = h.trimmed_pos;
@@ -133,7 +133,7 @@ void Journaler::_finish_probe_end(int r, __s64 end)
            << dendl;
   }
 
-  write_pos = flush_pos = ack_pos = end;
+  write_pos = flush_pos = ack_pos = safe_pos = end;
   
   // done.
   list<Context*> ls;
@@ -161,7 +161,7 @@ void Journaler::write_head(Context *oncommit)
   last_written.trimmed_pos = trimmed_pos;
   last_written.expire_pos = expire_pos;
   last_written.read_pos = read_pos;
-  last_written.write_pos = ack_pos; //write_pos;
+  last_written.write_pos = safe_pos;
   dout(10) << "write_head " << last_written << dendl;
   
   last_wrote_head = g_clock.now();
@@ -191,16 +191,17 @@ void Journaler::_finish_write_head(Header &wrote, Context *oncommit)
 class Journaler::C_Flush : public Context {
   Journaler *ls;
   __s64 start;
+  bool safe;
 public:
-  C_Flush(Journaler *l, __s64 s) : ls(l), start(s) {}
-  void finish(int r) { ls->_finish_flush(r, start); }
+  C_Flush(Journaler *l, __s64 s, bool sa) : ls(l), start(s), safe(sa) {}
+  void finish(int r) { ls->_finish_flush(r, start, safe); }
 };
 
-void Journaler::_finish_flush(int r, __s64 start)
+void Journaler::_finish_flush(int r, __s64 start, bool safe)
 {
   assert(r>=0);
 
-  assert(start >= ack_pos);
+  assert((!safe && start >= ack_pos) || (safe && start >= safe_pos));
   assert(start < flush_pos);
   assert(pending_flush.count(start));
 
@@ -208,27 +209,40 @@ void Journaler::_finish_flush(int r, __s64 start)
   if (logger) {
     utime_t lat = g_clock.now();
     lat -= pending_flush[start];
-    logger->favg("jlat", lat);
+    logger->favg(safe ? "jsafelat" : "jacklat", lat);
   }
 
-  pending_flush.erase(start);
-
   // adjust ack_pos
-  if (pending_flush.empty())
-    ack_pos = flush_pos;
-  else
+  if (!safe) {
     ack_pos = pending_flush.begin()->first;
+  } else {
+    pending_flush.erase(start);
+    if (pending_flush.empty())
+      safe_pos = flush_pos;
+    else
+      safe_pos = pending_flush.begin()->first;
+  }
 
-  dout(10) << "_finish_flush from " << start
+  dout(10) << "_finish_flush " << (safe ? "safe":"ack") << " from " << start
           << ", pending_flush now " << pending_flush 
-          << ", write positions now " << write_pos << "/" << flush_pos << "/" << ack_pos
+          << ", write positions now " << write_pos << "/" << flush_pos
+          << "/" << ack_pos << "/" << safe_pos
           << dendl;
 
   // kick waiters <= ack_pos
-  while (!waitfor_flush.empty()) {
-    if (waitfor_flush.begin()->first > ack_pos) break;
-    finish_contexts(waitfor_flush.begin()->second);
-    waitfor_flush.erase(waitfor_flush.begin());
+  if (!safe) {
+    while (!waitfor_ack.empty()) {
+      if (waitfor_ack.begin()->first > ack_pos) break;
+      finish_contexts(waitfor_ack.begin()->second);
+      waitfor_ack.erase(waitfor_ack.begin());
+    }
+  } else {
+    while (!waitfor_safe.empty()) {
+      if (waitfor_safe.begin()->first > safe_pos) break;
+      finish_contexts(waitfor_safe.begin()->second);
+      waitfor_safe.erase(waitfor_safe.begin());
+    }
+
   }
 }
 
@@ -300,9 +314,10 @@ void Journaler::_do_flush()
   
   // submit write for anything pending
   // flush _start_ pos to _finish_flush
-  filer.write(inode, flush_pos, len, write_buf, CEPH_OSD_OP_INCLOCK_FAIL,
-             g_conf.journaler_safe ? 0:new C_Flush(this, flush_pos),  // on ACK
-             g_conf.journaler_safe ?   new C_Flush(this, flush_pos):0); // on COMMIT
+  filer.write(inode, flush_pos, len, write_buf, 
+             CEPH_OSD_OP_INCLOCK_FAIL,
+             new C_Flush(this, flush_pos, false),  // on ACK
+             new C_Flush(this, flush_pos, true));  // on COMMIT
   pending_flush[flush_pos] = g_clock.now();
   
   // adjust pointers
@@ -314,22 +329,30 @@ void Journaler::_do_flush()
 
   
 
-void Journaler::flush(Context *onsync)
+void Journaler::flush(Context *onsync, Context *onsafe)
 {
   // all flushed and acked?
   if (write_pos == ack_pos) {
     assert(write_buf.length() == 0);
-    dout(10) << "flush nothing to flush, write pointers at " << write_pos << "/" << flush_pos << "/" << ack_pos << dendl;
+    dout(10) << "flush nothing to flush, write pointers at " 
+            << write_pos << "/" << flush_pos << "/" << ack_pos << "/" << safe_pos << dendl;
     if (onsync) {
       onsync->finish(0);
       delete onsync;
+      onsync = 0;
+    }
+    if (onsafe && write_pos == safe_pos) {
+      onsafe->finish(0);
+      delete onsafe;
+      onsafe = 0;
     }
     return;
   }
 
   if (write_pos == flush_pos) {
     assert(write_buf.length() == 0);
-    dout(10) << "flush nothing to flush, write pointers at " << write_pos << "/" << flush_pos << "/" << ack_pos << dendl;
+    dout(10) << "flush nothing to flush, write pointers at "
+            << write_pos << "/" << flush_pos << "/" << ack_pos << "/" << safe_pos << dendl;
   } else {
     if (1) {
       // maybe buffer
@@ -351,7 +374,9 @@ void Journaler::flush(Context *onsync)
 
   // queue waiter (at _new_ write_pos; will go when reached by ack_pos)
   if (onsync) 
-    waitfor_flush[write_pos].push_back(onsync);
+    waitfor_ack[write_pos].push_back(onsync);
+  if (onsafe) 
+    waitfor_safe[write_pos].push_back(onsafe);
 
   // write head?
   if (last_wrote_head.sec() + g_conf.journaler_write_head_interval < g_clock.now().sec()) {
@@ -455,7 +480,7 @@ void Journaler::_issue_read(__s64 len)
     if (flush_pos == ack_pos)
       flush();
     assert(flush_pos > ack_pos);
-    waitfor_flush[flush_pos].push_back(new C_RetryRead(this));
+    waitfor_ack[flush_pos].push_back(new C_RetryRead(this));
     return;
   }
 
@@ -547,7 +572,7 @@ bool Journaler::is_readable()
   // partial fragment at the end?
   if (received_pos == write_pos) {
     dout(10) << "is_readable() detected partial entry at tail, adjusting write_pos to " << read_pos << dendl;
-    write_pos = flush_pos = ack_pos = read_pos;
+    write_pos = flush_pos = ack_pos = safe_pos = read_pos;
     assert(write_buf.length() == 0);
 
     // truncate?
index 10520cc2edac8577c1ec300054b1c8b1b29f5ef4..7daed2043589a8ebd4908c231684dc985de86f3a 100644 (file)
@@ -137,13 +137,15 @@ public:
   __s64 write_pos;       // logical write position, where next entry will go
   __s64 flush_pos;       // where we will flush. if write_pos>flush_pos, we're buffering writes.
   __s64 ack_pos;         // what has been acked.
+  __s64 safe_pos;        // what has been committed safely to disk.
   bufferlist write_buf;  // write buffer.  flush_pos + write_buf.length() == write_pos.
 
   std::map<__s64, utime_t> pending_flush;  // start offsets and times for pending flushes
-  std::map<__s64, std::list<Context*> > waitfor_flush; // when flushed through given offset
+  std::map<__s64, std::list<Context*> > waitfor_ack;  // when flushed through given offset
+  std::map<__s64, std::list<Context*> > waitfor_safe; // when safe through given offset
 
   void _do_flush();
-  void _finish_flush(int r, __s64 start);
+  void _finish_flush(int r, __s64 start, bool safe);
   class C_Flush;
   friend class C_Flush;
 
@@ -189,7 +191,7 @@ public:
     inode(inode_), objecter(obj), filer(objecter), logger(l), 
     lock(lk), timer(*lk), delay_flush_event(0),
     state(STATE_UNDEF), error(0),
-    write_pos(0), flush_pos(0), ack_pos(0),
+    write_pos(0), flush_pos(0), ack_pos(0), safe_pos(0),
     read_pos(0), requested_pos(0), received_pos(0),
     fetch_len(fl), prefetch_from(pff),
     read_bl(0), on_read_finish(0), on_readable(0),
@@ -227,7 +229,7 @@ public:
 
   // write
   __s64 append_entry(bufferlist& bl, Context *onsync = 0);
-  void flush(Context *onsync = 0);
+  void flush(Context *onsync = 0, Context *onsafe = 0);
 
   // read
   void set_read_pos(__s64 p) {