]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
filestore: throttle op queue
authorSage Weil <sage@newdream.net>
Tue, 2 Feb 2010 21:29:56 +0000 (13:29 -0800)
committerSage Weil <sage@newdream.net>
Tue, 2 Feb 2010 22:12:44 +0000 (14:12 -0800)
src/config.cc
src/config.h
src/os/FileStore.cc
src/os/FileStore.h

index a726d9e18fe8ec63ea4dd0a67aad89a558f192d2..94660a169c88c81dacd4e1cbf3b3f5f4fc5ed969 100644 (file)
@@ -531,6 +531,8 @@ static struct config_option config_optionsp[] = {
        OPTION(filestore_sync_flush, 0, OPT_BOOL, false),
        OPTION(filestore_journal_parallel, 0, OPT_BOOL, false),
        OPTION(filestore_journal_writeahead, 0, OPT_BOOL, false),
+       OPTION(filestore_queue_max_ops, 0, OPT_INT, 500),
+       OPTION(filestore_queue_max_bytes, 0, OPT_INT, 100 << 20),
        OPTION(ebofs, 0, OPT_BOOL, false),
        OPTION(ebofs_cloneable, 0, OPT_BOOL, true),
        OPTION(ebofs_verify, 0, OPT_BOOL, false),
index 2570afacdd568f25b38f7eb5824b212b930794c8..874a36512d0fb60163853460255a735e3acb3aad 100644 (file)
@@ -342,6 +342,8 @@ struct md_config_t {
   bool filestore_sync_flush;
   bool filestore_journal_parallel;
   bool filestore_journal_writeahead;
+  int filestore_queue_max_ops;
+  int filestore_queue_max_bytes;
   
   // ebofs
   bool  ebofs;
index e96a56cbcaf0e3e4d0a2c6b241e52b5b4a8142b2..c1fe16e1e49cc381517a1753f818c6e993f4eaf9 100644 (file)
@@ -534,13 +534,35 @@ int FileStore::umount()
 void FileStore::queue_op(__u64 op_seq, list<Transaction*>& tls, Context *onreadable,
                         Context *oncommit)
 {
-  op_lock.Lock();
   Op *o = new Op;
-  dout(10) << "queue_op " << o << " " << op_seq << dendl;
+
+  __u64 bytes = 0, ops = 0;
+  for (list<Transaction*>::iterator p = tls.begin();
+       p != tls.end();
+       p++) {
+    bytes += (*p)->get_num_bytes();
+    ops += (*p)->get_num_ops();
+  }
+
+  op_lock.Lock();
+
+  while ((g_conf.filestore_queue_max_ops && op_queue_len >= (unsigned)g_conf.filestore_queue_max_ops) ||
+        (g_conf.filestore_queue_max_bytes && op_queue_bytes >= (unsigned)g_conf.filestore_queue_max_bytes)) {
+    dout(2) << "queue_op " << o << " throttle: "
+            << op_queue_len << " > " << g_conf.filestore_queue_max_ops << " ops || "
+            << op_queue_bytes << " > " << g_conf.filestore_queue_max_bytes << dendl;
+    op_throttle_cond.Wait(op_lock);
+  }
+  op_queue_len++;
+  op_queue_bytes += bytes;
+
+  dout(10) << "queue_op " << o << " seq " << op_seq << " " << bytes << " bytes" << dendl;
   o->op = op_seq;
   o->tls.swap(tls);
   o->onreadable = onreadable;
   o->oncommit = oncommit;
+  o->ops = ops;
+  o->bytes = bytes;
   op_queue.push_back(o);
   op_cond.Signal();
   op_lock.Unlock();
@@ -564,9 +586,14 @@ void FileStore::op_entry()
 
       op_finisher.queue(o->onreadable, r);
 
-      delete o;
-
       op_lock.Lock();
+
+      op_queue_len--;
+      op_queue_bytes -= o->bytes;
+      op_throttle_cond.Signal();
+      dout(10) << "op_entry finished " << o->bytes << " bytes, queue now "
+              << op_queue_len << " ops, " << op_queue_bytes << " bytes" << dendl;
+      delete o;
     }
     op_empty_cond.Signal();
     if (stop)
index fcb8d2bbb7166b067d40c6ee63e38b0a3ea9e766..a467450c424c7d4f29c3044bab2c02331039140d 100644 (file)
@@ -85,12 +85,14 @@ class FileStore : public JournalingObjectStore {
     __u64 op;
     list<Transaction*> tls;
     Context *onreadable, *oncommit;
+    __u64 ops, bytes;
   };
 
   Finisher op_finisher;
   Mutex op_lock;
-  Cond op_cond, op_empty_cond;
+  Cond op_cond, op_empty_cond, op_throttle_cond;
   list<Op*> op_queue;
+  __u64 op_queue_len, op_queue_bytes;
   void op_entry();
   struct OpThread : public Thread {
     FileStore *fs;
@@ -130,7 +132,7 @@ class FileStore : public JournalingObjectStore {
     collections(this), fake_collections(false),
     lock("FileStore::lock"),
     sync_epoch(0), stop(false), sync_thread(this),
-    op_lock("FileStore::op_lock"), op_thread(this),
+    op_lock("FileStore::op_lock"), op_queue_len(0), op_queue_bytes(0), op_thread(this),
     flusher_queue_len(0), flusher_thread(this) { }
 
   int mount();