git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
FileJournal: break writeq locking from queue_lock
author: Samuel Just <sam.just@inktank.com>
Mon, 15 Oct 2012 22:39:55 +0000 (15:39 -0700)
committer: Samuel Just <sam.just@inktank.com>
Tue, 30 Oct 2012 20:31:10 +0000 (13:31 -0700)
This keeps the relatively long process of queueing
finishers from blocking op submission.

In submit_entry, we no longer check whether the journal is full
before placing the write in the writeq; committed_thru should work
anyway, and we don't want to grab the required lock.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/os/FileJournal.cc
src/os/FileJournal.h

index 2ef43798c522e6cc8946254ed6734b6a39e8aff9..730dac6e8bbfeb78611ce1f070b1efc70819bfbf 100644 (file)
@@ -626,10 +626,10 @@ void FileJournal::stop_writer()
 #ifdef HAVE_LIBAIO
     Mutex::Locker q(aio_lock);
 #endif
-    Mutex::Locker p(queue_lock);
+    Mutex::Locker p(writeq_lock);
     write_stop = true;
     write_cond.Signal();
-    queue_cond.Signal();
+    writeq_cond.Signal();
 #ifdef HAVE_LIBAIO
     aio_cond.Signal();
     write_finish_cond.Signal();
@@ -829,22 +829,24 @@ void FileJournal::queue_completions_thru(uint64_t seq)
 {
   assert(queue_lock.is_locked());
   utime_t now = ceph_clock_now(g_ceph_context);
-  while (!completions.empty() &&
-        completions.front().seq <= seq) {
+  while (!completions_empty()) {
+    completion_item next = completion_peek_front();
+    if (next.seq > seq)
+      break;
+    completion_pop_front();
     utime_t lat = now;
-    lat -= completions.front().start;
+    lat -= next.start;
     dout(10) << "queue_completions_thru seq " << seq
-            << " queueing seq " << completions.front().seq
-            << " " << completions.front().finish
+            << " queueing seq " << next.seq
+            << " " << next.finish
             << " lat " << lat << dendl;
     if (logger) {
       logger->finc(l_os_j_lat, lat);
     }
-    if (completions.front().finish)
-      finisher->queue(completions.front().finish);
-    if (completions.front().tracked_op)
-      completions.front().tracked_op->mark_event("journaled_completion_queued");
-    completions.pop_front();
+    if (next.finish)
+      finisher->queue(next.finish);
+    if (next.tracked_op)
+      next.tracked_op->mark_event("journaled_completion_queued");
   }
   queue_cond.Signal();
 }
@@ -1084,7 +1086,7 @@ void FileJournal::flush()
   dout(5) << "waiting for completions to empty" << dendl;
   {
     Mutex::Locker l(queue_lock);
-    while (!completions.empty())
+    while (!completions_empty())
       queue_cond.Wait(queue_lock);
   }
   dout(5) << "flush waiting for finisher" << dendl;
@@ -1098,16 +1100,12 @@ void FileJournal::write_thread_entry()
   dout(10) << "write_thread_entry start" << dendl;
   while (1) {
     {
-      Mutex::Locker locker(queue_lock);
+      Mutex::Locker locker(writeq_lock);
       if (writeq.empty()) {
        if (write_stop)
          break;
        dout(20) << "write_thread_entry going to sleep" << dendl;
-       {
-         if (writeq.empty()) {
-           queue_cond.Wait(queue_lock);
-         }
-       }
+       writeq_cond.Wait(writeq_lock);
        dout(20) << "write_thread_entry woke up" << dendl;
        continue;
       }
@@ -1391,60 +1389,52 @@ void FileJournal::check_aio_completion()
 void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment,
                               Context *oncommit, TrackedOpRef osd_op)
 {
-  Mutex::Locker locker(queue_lock);  // ** lock **
-
   // dump on queue
   dout(5) << "submit_entry seq " << seq
-          << " len " << e.length()
-          << " (" << oncommit << ")" << dendl;
+         << " len " << e.length()
+         << " (" << oncommit << ")" << dendl;
   assert(e.length() > 0);
 
-  completions.push_back(
-    completion_item(
-      seq, oncommit, ceph_clock_now(g_ceph_context), osd_op));
-
-  if (full_state == FULL_NOTFULL) {
-    if (osd_op)
-      osd_op->mark_event("commit_queued_for_journal_write");
-    // queue and kick writer thread
-    dout(30) << "XXX throttle take " << e.length() << dendl;
-    throttle_ops.take(1);
-    throttle_bytes.take(e.length());
-
-    if (logger) {
-      logger->set(l_os_jq_max_ops, throttle_ops.get_max());
-      logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
-      logger->set(l_os_jq_ops, throttle_ops.get_current());
-      logger->set(l_os_jq_bytes, throttle_bytes.get_current());
-    }
+  dout(30) << "XXX throttle take " << e.length() << dendl;
+  throttle_ops.take(1);
+  throttle_bytes.take(e.length());
+  if (osd_op)
+    osd_op->mark_event("commit_queued_for_journal_write");
+  if (logger) {
+    logger->set(l_os_jq_max_ops, throttle_ops.get_max());
+    logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
+    logger->set(l_os_jq_ops, throttle_ops.get_current());
+    logger->set(l_os_jq_bytes, throttle_bytes.get_current());
+  }
 
+  {
+    Mutex::Locker l1(writeq_lock);  // ** lock **
+    Mutex::Locker l2(completions_lock);  // ** lock **
+    completions.push_back(
+      completion_item(
+       seq, oncommit, ceph_clock_now(g_ceph_context), osd_op));
     writeq.push_back(write_item(seq, e, alignment, osd_op));
-    queue_cond.Signal();
-  } else {
-    if (osd_op)
-      osd_op->mark_event("commit_blocked_by_journal_full");
-    // not journaling this.  restart writing no sooner than seq + 1.
-    dout(10) << " journal is/was full" << dendl;
+    writeq_cond.Signal();
   }
 }
 
 bool FileJournal::writeq_empty()
 {
-  Mutex::Locker locker(queue_lock);
+  Mutex::Locker locker(writeq_lock);
   return writeq.empty();
 }
 
 FileJournal::write_item &FileJournal::peek_write()
 {
   assert(write_lock.is_locked());
-  Mutex::Locker locker(queue_lock);
+  Mutex::Locker locker(writeq_lock);
   return writeq.front();
 }
 
 void FileJournal::pop_write()
 {
   assert(write_lock.is_locked());
-  Mutex::Locker locker(queue_lock);
+  Mutex::Locker locker(writeq_lock);
   writeq.pop_front();
 }
 
index 5b3e6ee9544ac9b1ffbac0f876fcb4f519f6c293..cd13330c468daa648377e6a3181c5155ed2bc4f1 100644 (file)
@@ -58,15 +58,36 @@ public:
     }
     write_item() : seq(0), alignment(0) {}
   };
+
   Mutex queue_lock;
   Cond queue_cond;
   uint64_t journaled_seq;
   bool plug_journal_completions;
+
+  Mutex writeq_lock;
+  Cond writeq_cond;
   deque<write_item> writeq;
-  deque<completion_item> completions;
   bool writeq_empty();
   write_item &peek_write();
   void pop_write();
+
+  Mutex completions_lock;
+  deque<completion_item> completions;
+  bool completions_empty() {
+    Mutex::Locker l(completions_lock);
+    return completions.empty();
+  }
+  completion_item completion_peek_front() {
+    Mutex::Locker l(completions_lock);
+    assert(!completions.empty());
+    return completions.front();
+  }
+  void completion_pop_front() {
+    Mutex::Locker l(completions_lock);
+    assert(!completions.empty());
+    completions.pop_front();
+  }
+
   void submit_entry(uint64_t seq, bufferlist& bl, int alignment,
                    Context *oncommit,
                    TrackedOpRef osd_op = TrackedOpRef());
@@ -295,6 +316,9 @@ private:
     queue_lock("FileJournal::queue_lock", false, true, false, g_ceph_context),
     journaled_seq(0),
     plug_journal_completions(false),
+    writeq_lock("FileJournal::writeq_lock", false, true, false, g_ceph_context),
+    completions_lock(
+      "FileJournal::completions_lock", false, true, false, g_ceph_context),
     fn(f),
     zero_buf(NULL),
     max_size(0), block_size(0),