]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: throttle journal
authorSage Weil <sage@newdream.net>
Tue, 2 Feb 2010 21:50:26 +0000 (13:50 -0800)
committerSage Weil <sage@newdream.net>
Tue, 2 Feb 2010 23:08:04 +0000 (15:08 -0800)
src/Makefile.am
src/common/Throttle.h [new file with mode: 0644]
src/config.cc
src/config.h
src/os/FileJournal.cc
src/os/FileJournal.h
src/os/FileStore.cc
src/os/Journal.h

index 214e0fedc0530398a8cd2479b9feccd88fe041ed..fb150ffc53ebaad3bbe67d3d721ad7539664ec7b 100644 (file)
@@ -469,6 +469,7 @@ noinst_HEADERS = \
         common/Semaphore.h\
        common/Spinlock.h\
         common/Thread.h\
+        common/Throttle.h\
         common/Timer.h\
         common/tls.h\
        common/WorkQueue.h\
diff --git a/src/common/Throttle.h b/src/common/Throttle.h
new file mode 100644 (file)
index 0000000..69e6aa3
--- /dev/null
@@ -0,0 +1,68 @@
+#ifndef _CEPH_THROTTLE_H
+#define _CEPH_THROTTLE_H
+
+#include "Mutex.h"
+#include "Cond.h"
+
+class Throttle {
+  __u64 count, want, max;
+  Mutex lock;
+  Cond cond;
+  
+public:
+  Throttle(__u64 m = 0) : count(0), max(m),
+                         lock("Throttle::lock") {}
+
+private:
+  void _reset_max(__u64 m) {
+    if (m) {
+      if (m < max)
+       cond.SignalAll();
+      max = m;
+    }
+  }
+  bool _wait(__u64 c) {
+    bool waited = false;
+    while (max && count + c > max) {
+      waited = true;
+      cond.Wait(lock);
+    }
+    return waited;
+  }
+
+public:
+  __u64 get_current() {
+    Mutex::Locker l(lock);
+    return count;
+  }
+
+  bool wait(__u64 m = 0) {
+    Mutex::Locker l(lock);
+    _reset_max(m);
+    return _wait(0);
+  }
+
+  __u64 take(__u64 c = 1) {
+    Mutex::Locker l(lock);
+    count += c;
+    return count;
+  }
+
+  bool get(__u64 c = 1, __u64 m = 0) {
+    Mutex::Locker l(lock);
+    _reset_max(m);
+    bool waited = _wait(c);
+    count += c;
+    return waited;
+  }
+
+  __u64 put(__u64 c = 1) {
+    Mutex::Locker l(lock);
+    cond.SignalAll();
+    count -= c;
+    return count;
+  }
+};
+
+
+#endif
index 94660a169c88c81dacd4e1cbf3b3f5f4fc5ed969..c49af9e2b86804c71ad2fa2597f04270a4c9b4da 100644 (file)
@@ -548,6 +548,8 @@ static struct config_option config_optionsp[] = {
        OPTION(journal_block_align, 0, OPT_BOOL, true),
        OPTION(journal_max_write_bytes, 0, OPT_INT, 0),
        OPTION(journal_max_write_entries, 0, OPT_INT, 100),
+       OPTION(journal_queue_max_ops, 0, OPT_INT, 500),
+       OPTION(journal_queue_max_bytes, 0, OPT_INT, 100 << 20),
        OPTION(bdev_lock, 0, OPT_BOOL, true),
        OPTION(bdev_iothreads, 0, OPT_INT, 1),         // number of ios to queue with kernel
        OPTION(bdev_idle_kick_after_ms, 0, OPT_INT, 100),  // ms
index 874a36512d0fb60163853460255a735e3acb3aad..7a3dc51441343b1031ef1ba34a2733368f6f54e7 100644 (file)
@@ -363,6 +363,8 @@ struct md_config_t {
   bool journal_block_align;
   int journal_max_write_bytes;
   int journal_max_write_entries;
+  int journal_queue_max_ops;
+  int journal_queue_max_bytes;
 
   // block device
   bool  bdev_lock;
index 73bc28d4e09a5c14940ec5443a90ed7201a4c973..6098be16f6a2f49154a4386fdb2754e06e5d1954 100644 (file)
@@ -356,6 +356,8 @@ bool FileJournal::check_for_full(__u64 seq, off64_t pos, off64_t size)
       writing_seq.push_back(writeq.front().seq);
       writing_fin.push_back(writeq.front().fin);
     }
+    throttle_ops.put(1);
+    throttle_bytes.put(writeq.front().bl.length());
     writeq.pop_front();
   }  
   print_header();
@@ -363,7 +365,7 @@ bool FileJournal::check_for_full(__u64 seq, off64_t pos, off64_t size)
 
 }
 
-void FileJournal::prepare_multi_write(bufferlist& bl)
+void FileJournal::prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& orig_bytes)
 {
   // gather queued writes
   off64_t queue_pos = write_pos;
@@ -375,10 +377,12 @@ void FileJournal::prepare_multi_write(bufferlist& bl)
     return;
   
   while (!writeq.empty()) {
-    bool r = prepare_single_write(bl, queue_pos);
+    bool r = prepare_single_write(bl, queue_pos, orig_bytes);
     if (!r)
       break;
 
+    orig_ops++;
+
     if (eleft) {
       if (--eleft == 0) {
        dout(20) << "prepare_multi_write hit max events per write " << g_conf.journal_max_write_entries << dendl;
@@ -396,7 +400,7 @@ void FileJournal::prepare_multi_write(bufferlist& bl)
   //assert(write_pos + bl.length() == queue_pos);
 }
 
-bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos)
+bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64& orig_size)
 {
   // grab next item
   __u64 seq = writeq.front().seq;
@@ -407,6 +411,8 @@ bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos)
   if (!check_for_full(seq, queue_pos, size))
     return false;
 
+  orig_size += ebl.length();
+
   // add to write buffer
   dout(15) << "prepare_single_write will write " << queue_pos << " : seq " << seq
           << " len " << ebl.length() << " -> " << size
@@ -598,9 +604,19 @@ void FileJournal::write_thread_entry()
       continue;
     }
     
+    __u64 orig_ops = 0;
+    __u64 orig_bytes = 0;
+
     bufferlist bl;
-    prepare_multi_write(bl);
+    prepare_multi_write(bl, orig_ops, orig_bytes);
     do_write(bl);
+    
+    __u64 new_ops = throttle_ops.put(orig_ops);
+    __u64 new_bytes = throttle_bytes.put(orig_bytes);
+    dout(10) << "write_thread throttle finished " << orig_ops << " ops and " 
+            << orig_bytes << " bytes, now "
+            << new_ops << " ops and " << new_bytes << " bytes"
+            << dendl;
   }
   write_empty_cond.Signal();
   write_lock.Unlock();
@@ -616,6 +632,9 @@ void FileJournal::submit_entry(__u64 seq, bufferlist& e, Context *oncommit)
   dout(10) << "submit_entry seq " << seq
           << " len " << e.length()
           << " (" << oncommit << ")" << dendl;
+
+  throttle_ops.take(1);
+  throttle_bytes.take(e.length());
   
   if (!full_commit_seq && full_restart_seq && 
       seq >= full_restart_seq) {
@@ -690,6 +709,8 @@ void FileJournal::committed_thru(__u64 seq)
             << dendl;
     if (writeq.front().fin)
       finisher->queue(writeq.front().fin);
+    throttle_ops.put(1);
+    throttle_bytes.put(writeq.front().bl.length());
     writeq.pop_front();  
   }
   
@@ -804,3 +825,11 @@ bool FileJournal::read_entry(bufferlist& bl, __u64& seq)
  
   return true;
 }
+
+void FileJournal::throttle()
+{
+  if (throttle_ops.wait(g_conf.journal_queue_max_ops))
+    dout(1) << "throttle: waited for ops" << dendl;
+  if (throttle_bytes.wait(g_conf.journal_queue_max_bytes))
+    dout(1) << "throttle: waited for bytes" << dendl;
+}
index b1bd6342e2ca71df996ba9446ad8f0c4de2dc50f..1674a02a90635cf2e92ff57a15da94a77a32fb5a 100644 (file)
@@ -23,6 +23,7 @@ using std::deque;
 #include "common/Cond.h"
 #include "common/Mutex.h"
 #include "common/Thread.h"
+#include "common/Throttle.h"
 
 class FileJournal : public Journal {
 public:
@@ -101,6 +102,9 @@ private:
   };
   deque<write_item> writeq;
   
+  // throttle
+  Throttle throttle_ops, throttle_bytes;
+
   // write thread
   Mutex write_lock;
   Cond write_cond, write_empty_cond;
@@ -117,9 +121,8 @@ private:
   void write_thread_entry();
 
   bool check_for_full(__u64 seq, off64_t pos, off64_t size);
-  void prepare_multi_write(bufferlist& bl);
-  bool prepare_single_write(bufferlist& bl, off64_t& queue_pos);
-  bool prepare_single_dio_write(bufferlist& bl, off64_t& queue_pos);
+  void prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& orig_bytse);
+  bool prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64& orig_bytes);
   void do_write(bufferlist& bl);
 
   void write_bl(off64_t& pos, bufferlist& bl);
@@ -163,6 +166,8 @@ private:
 
   void flush();
 
+  void throttle();
+
   bool is_writeable() {
     return read_pos == 0;
   }
index c1fe16e1e49cc381517a1753f818c6e993f4eaf9..221c4cee176791d48933e68ea79c034e45fff838 100644 (file)
@@ -633,6 +633,9 @@ int FileStore::queue_transactions(list<Transaction*> &tls,
 {
   if (journal && journal->is_writeable()) {
     if (g_conf.filestore_journal_parallel) {
+
+      journal->throttle();
+
       __u64 op = op_journal_start(0);
       dout(10) << "queue_transactions (parallel) " << op << " " << tls << dendl;
       
index 0584e66a2ba516fefc39b08bb0ddce26157f3666..4b2051303c71a9074b1f266f40b0599537a6815c 100644 (file)
@@ -38,6 +38,7 @@ public:
   virtual void close() = 0;
 
   virtual void flush() = 0;
+  virtual void throttle() = 0;
 
   void set_wait_on_full(bool b) { wait_on_full = b; }