]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
FileStore: add op_throttle_lock
authorSamuel Just <sam.just@inktank.com>
Wed, 10 Oct 2012 16:44:32 +0000 (09:44 -0700)
committerSamuel Just <sam.just@inktank.com>
Tue, 30 Oct 2012 20:31:10 +0000 (13:31 -0700)
Avoid using op_tp lock for the op throttle.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/os/FileStore.cc
src/os/FileStore.h

index 4c713a01c519219ed11ce5d4c89c6ba8346bc13d..52894af7d6d0fbf41b2967d93b183f19a44f97ed 100644 (file)
@@ -729,7 +729,9 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha
   timer(g_ceph_context, sync_entry_timeo_lock),
   stop(false), sync_thread(this),
   default_osr("default"),
-  op_queue_len(0), op_queue_bytes(0), op_finisher(g_ceph_context),
+  op_queue_len(0), op_queue_bytes(0),
+  op_throttle_lock("FileStore::op_throttle_lock"),
+  op_finisher(g_ceph_context),
   op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads, "filestore_op_threads"),
   op_wq(this, g_conf->filestore_op_thread_timeout,
        g_conf->filestore_op_thread_suicide_timeout, &op_tp),
@@ -2238,13 +2240,6 @@ void FileStore::queue_op(OpSequencer *osr, Op *o)
 }
 
 void FileStore::op_queue_reserve_throttle(Op *o)
-{
-  op_tp.lock();
-  _op_queue_reserve_throttle(o, "op_queue_reserve_throttle");
-  op_tp.unlock();
-}
-
-void FileStore::_op_queue_reserve_throttle(Op *o, const char *caller)
 {
   // Do not call while holding the journal lock!
   uint64_t max_ops = m_filestore_queue_max_ops;
@@ -2258,28 +2253,32 @@ void FileStore::_op_queue_reserve_throttle(Op *o, const char *caller)
   logger->set(l_os_oq_max_ops, max_ops);
   logger->set(l_os_oq_max_bytes, max_bytes);
 
-  while ((max_ops && (op_queue_len + 1) > max_ops) ||
-        (max_bytes && op_queue_bytes      // let single large ops through!
-         && (op_queue_bytes + o->bytes) > max_bytes)) {
-    dout(2) << caller << " waiting: "
-            << op_queue_len + 1 << " > " << max_ops << " ops || "
-            << op_queue_bytes + o->bytes << " > " << max_bytes << dendl;
-    op_tp.wait(op_throttle_cond);
-  }
+  {
+    Mutex::Locker l(op_throttle_lock);
+    while ((max_ops && (op_queue_len + 1) > max_ops) ||
+           (max_bytes && op_queue_bytes      // let single large ops through!
+             && (op_queue_bytes + o->bytes) > max_bytes)) {
+      dout(2) << "waiting " << op_queue_len + 1 << " > " << max_ops << " ops || "
+             << op_queue_bytes + o->bytes << " > " << max_bytes << dendl;
+      op_throttle_cond.Wait(op_throttle_lock);
+    }
 
-  op_queue_len++;
-  op_queue_bytes += o->bytes;
+    op_queue_len++;
+    op_queue_bytes += o->bytes;
+  }
 
   logger->set(l_os_oq_ops, op_queue_len);
   logger->set(l_os_oq_bytes, op_queue_bytes);
 }
 
-void FileStore::_op_queue_release_throttle(Op *o)
+void FileStore::op_queue_release_throttle(Op *o)
 {
-  // Called with op_tp lock!
-  op_queue_len--;
-  op_queue_bytes -= o->bytes;
-  op_throttle_cond.Signal();
+  {
+    Mutex::Locker l(op_throttle_lock);
+    op_queue_len--;
+    op_queue_bytes -= o->bytes;
+    op_throttle_cond.Signal();
+  }
 
   logger->set(l_os_oq_ops, op_queue_len);
   logger->set(l_os_oq_bytes, op_queue_bytes);
@@ -2309,7 +2308,7 @@ void FileStore::_finish_op(OpSequencer *osr)
   osr->apply_lock.Unlock();  // locked in _do_op
 
   // called with tp lock held
-  _op_queue_release_throttle(o);
+  op_queue_release_throttle(o);
 
   utime_t lat = ceph_clock_now(g_ceph_context);
   lat -= o->start;
index a525f7f704eda694b47e7d38258181965ebf08ac..faa0cd04d52c78ebec69a0f8d9b315969fcfb170 100644 (file)
@@ -202,6 +202,7 @@ private:
   deque<OpSequencer*> op_queue;
   uint64_t op_queue_len, op_queue_bytes;
   Cond op_throttle_cond;
+  Mutex op_throttle_lock;
   Finisher op_finisher;
 
   ThreadPool op_tp;
@@ -245,8 +246,7 @@ private:
               TrackedOpRef osd_op);
   void queue_op(OpSequencer *osr, Op *o);
   void op_queue_reserve_throttle(Op *o);
-  void _op_queue_reserve_throttle(Op *o, const char *caller = 0);
-  void _op_queue_release_throttle(Op *o);
+  void op_queue_release_throttle(Op *o);
   void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk);
   friend class C_JournaledAhead;