]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
FileStore: replace op_queue_throttle with op_queue_reserve_throttle
authorSamuel Just <samuel.just@dreamhost.com>
Tue, 22 Mar 2011 21:52:15 +0000 (14:52 -0700)
committerSage Weil <sage.weil@dreamhost.com>
Wed, 23 Mar 2011 23:53:28 +0000 (16:53 -0700)
Previously, queue_op would call op_queue_throttle while holding the
journal_lock.  op_queue_throttle, however, can sleep.

We fix the problem by:
1) Factor build_op out of queue_op
2) op_queue_throttle is now op_queue_reserve_throttle and takes an op as
an argument.  op_queue_reserve_throttle can be called before the journal
lock is taken.  This also avoids the race between calling throttle and
incrementing op_queue_bytes and op_queue_len.
3) queue_op now takes the op generated using build_op as an argument.
4) _journaled_ahead no longer needs to call throttle as
queue_transactions has already reserved space.

Signed-off-by: Samuel Just <samuel.just@dreamhost.com>
src/os/FileStore.cc
src/os/FileStore.h

index bd3aa3517150140dd4d54998ff7938ec9b482374..c3a7b2e983da83b42912b94a4e2528625e7d0200 100644 (file)
@@ -1493,8 +1493,8 @@ void FileStore::stop_logger()
 
 /// -----------------------------
 
-void FileStore::queue_op(OpSequencer *osr, uint64_t op_seq, list<Transaction*>& tls,
-                        Context *onreadable, Context *onreadable_sync)
+FileStore::Op *FileStore::build_op(list<Transaction*>& tls,
+                                  Context *onreadable, Context *onreadable_sync)
 {
   uint64_t bytes = 0, ops = 0;
   for (list<Transaction*>::iterator p = tls.begin();
@@ -1504,55 +1504,57 @@ void FileStore::queue_op(OpSequencer *osr, uint64_t op_seq, list<Transaction*>&
     ops += (*p)->get_num_ops();
   }
 
-  // initialize next_finish on first op
-  if (next_finish == 0)
-    next_finish = op_seq;
-
-  // mark apply start _now_, because we need to drain the entire apply
-  // queue during commit in order to put the store in a consistent
-  // state.
-  _op_apply_start(op_seq);
-
   Op *o = new Op;
-  o->op = op_seq;
   o->tls.swap(tls);
   o->onreadable = onreadable;
   o->onreadable_sync = onreadable_sync;
   o->ops = ops;
   o->bytes = bytes;
+  return o;
+}
+
+
 
+void FileStore::queue_op(OpSequencer *osr, Op *o)
+{
+  assert(journal_lock.is_locked());
+  // initialize next_finish on first op
+  if (next_finish == 0)
+    next_finish = op_seq;
+
+  // mark apply start _now_, because we need to drain the entire apply
+  // queue during commit in order to put the store in a consistent
+  // state.
+  _op_apply_start(o->op);
   op_tp.lock();
 
   osr->queue(o);
-  _op_queue_throttle("queue_op");
-
-  op_queue_len++;
-  op_queue_bytes += bytes;
 
   if (logger) {
     logger->inc(l_os_ops);
-    logger->inc(l_os_bytes, bytes);
+    logger->inc(l_os_bytes, o->bytes);
     logger->set(l_os_oq_ops, op_queue_len);
     logger->set(l_os_oq_bytes, op_queue_bytes);
   }
 
   op_tp.unlock();
 
-  dout(5) << "queue_op " << o << " seq " << op_seq << " " << bytes << " bytes"
+  dout(5) << "queue_op " << o << " seq " << o->op << " " << o->bytes << " bytes"
           << "   (queue has " << op_queue_len << " ops and " << op_queue_bytes << " bytes)"
           << dendl;
   op_wq.queue(osr);
 }
 
-void FileStore::op_queue_throttle()
+void FileStore::op_queue_reserve_throttle(Op *o)
 {
   op_tp.lock();
-  _op_queue_throttle("op_queue_throttle");
+  _op_queue_reserve_throttle(o, "op_queue_reserve_throttle");
   op_tp.unlock();
 }
 
-void FileStore::_op_queue_throttle(const char *caller)
+void FileStore::_op_queue_reserve_throttle(Op *o, const char *caller)
 {
+  // Do not call while holding the journal lock!
   uint64_t max_ops = g_conf.filestore_queue_max_ops;
   uint64_t max_bytes = g_conf.filestore_queue_max_bytes;
 
@@ -1566,13 +1568,24 @@ void FileStore::_op_queue_throttle(const char *caller)
     logger->set(l_os_oq_max_bytes, max_bytes);
   }
 
-  while ((max_ops && op_queue_len >= max_ops) ||
-        (max_bytes && op_queue_bytes >= max_bytes)) {
+  while ((max_ops && (op_queue_len + 1) > max_ops) ||
+        (max_bytes && (op_queue_bytes + o->bytes) > max_bytes)) {
     dout(2) << caller << " waiting: "
-            << op_queue_len << " > " << max_ops << " ops || "
-            << op_queue_bytes << " > " << max_bytes << dendl;
+            << op_queue_len + 1 << " > " << max_ops << " ops || "
+            << op_queue_bytes + o->bytes << " > " << max_bytes << dendl;
     op_tp.wait(op_throttle_cond);
   }
+
+  op_queue_len++;
+  op_queue_bytes += o->bytes;
+}
+
+void FileStore::_op_queue_release_throttle(Op *o)
+{
+  // Called with op_tp lock!
+  op_queue_len--;
+  op_queue_bytes -= o->bytes;
+  op_throttle_cond.Signal();
 }
 
 void FileStore::_do_op(OpSequencer *osr)
@@ -1599,9 +1612,7 @@ void FileStore::_finish_op(OpSequencer *osr)
   osr->apply_lock.Unlock();  // locked in _do_op
 
   // called with tp lock held
-  op_queue_len--;
-  op_queue_bytes -= o->bytes;
-  op_throttle_cond.Signal();
+  _op_queue_release_throttle(o);
 
   if (logger) {
     logger->inc(l_os_readable_ops);
@@ -1644,16 +1655,13 @@ void FileStore::_finish_op(OpSequencer *osr)
 struct C_JournaledAhead : public Context {
   FileStore *fs;
   FileStore::OpSequencer *osr;
-  uint64_t op;
-  list<ObjectStore::Transaction*> tls;
-  Context *onreadable, *onreadable_sync;
+  FileStore::Op *o;
   Context *ondisk;
 
-  C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, uint64_t o, list<ObjectStore::Transaction*>& t,
-                  Context *onr, Context *ond, Context *onrsync) :
-    fs(f), osr(os), op(o), tls(t), onreadable(onr), onreadable_sync(onrsync), ondisk(ond) { }
+  C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, FileStore::Op *o, Context *ondisk):
+    fs(f), osr(os), o(o), ondisk(ondisk) { }
   void finish(int r) {
-    fs->_journaled_ahead(osr, op, tls, onreadable, ondisk, onreadable_sync);
+    fs->_journaled_ahead(osr, o, ondisk);
   }
 };
 
@@ -1687,40 +1695,29 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
     //logger->inc(l_os_in_bytes, 1); 
   }
 
-  if (journal && journal->is_writeable()) {
+  if (journal && journal->is_writeable() && !g_conf.filestore_journal_trailing) {
+    Op *o = build_op(tls, onreadable, onreadable_sync);
+    op_queue_reserve_throttle(o);
+    journal->throttle();
+    o->op = op_submit_start();
     if (g_conf.filestore_journal_parallel) {
-
-      // FIXME: these throttle blocks can build up many threads, and
-      // then let them all (too many!)  through when some space is
-      // available.
-
-      journal->throttle();   // make sure we're not ahead of the jouranl
-      op_queue_throttle();   // make sure the journal isn't getting ahead of our op queue.
-
-      uint64_t op = op_submit_start();
-      dout(5) << "queue_transactions (parallel) " << op << " " << tls << dendl;
+      dout(5) << "queue_transactions (parallel) " << o->op << " " << tls << dendl;
       
-      _op_journal_transactions(tls, op, ondisk);
+      _op_journal_transactions(tls, o->op, ondisk);
       
       // queue inside journal lock, to preserve ordering
-      queue_op(osr, op, tls, onreadable, onreadable_sync);
+      queue_op(osr, o);
+    } else if (g_conf.filestore_journal_writeahead) {
+      dout(5) << "queue_transactions (writeahead) " << o->op << " " << o->tls << dendl;
       
-      op_submit_finish(op);
-      return 0;
-    }
-    else if (g_conf.filestore_journal_writeahead) {
-      
-      journal->throttle();   // make sure we're not ahead of the journal
-      op_queue_throttle();   // make sure the journal isn't getting ahead of our op queue.
-
-      uint64_t op = op_submit_start();
-      dout(5) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
-      osr->queue_journal(op);
-      _op_journal_transactions(tls, op,
-                              new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync));
-      op_submit_finish(op);
-      return 0;
+      osr->queue_journal(o->op);
+
+      _op_journal_transactions(tls, o->op, new C_JournaledAhead(this, osr, o, ondisk));
+    } else {
+      assert(0);
     }
+    op_submit_finish(o->op);
+    return 0;
   }
 
   uint64_t op = op_submit_start();
@@ -1754,19 +1751,13 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
   return r;
 }
 
-void FileStore::_journaled_ahead(OpSequencer *osr, uint64_t op,
-                                list<Transaction*> &tls,
-                                Context *onreadable, Context *ondisk,
-                                Context *onreadable_sync)
+void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
 {
-  dout(5) << "_journaled_ahead " << op << " " << tls << dendl;
-
-  op_queue_throttle();
-
+  dout(5) << "_journaled_ahead " << o->op << " " << o->tls << dendl;
 
   // this should queue in order because the journal does it's completions in order.
   journal_lock.Lock();
-  queue_op(osr, op, tls, onreadable, onreadable_sync);
+  queue_op(osr, o);
   journal_lock.Unlock();
 
   osr->dequeue_journal();
index 20af722639e0e45dbcdd8d9326b788aa8ea0a4cc..4c961fa6f7b37929060b53ad75370392403f2e6c 100644 (file)
@@ -206,11 +206,13 @@ class FileStore : public JournalingObjectStore {
 
   void _do_op(OpSequencer *o);
   void _finish_op(OpSequencer *o);
-  void queue_op(OpSequencer *osr, uint64_t op, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync);
-  void op_queue_throttle();
-  void _op_queue_throttle(const char *caller = 0);
-  void _journaled_ahead(OpSequencer *osr, uint64_t op, list<Transaction*> &tls,
-                       Context *onreadable, Context *ondisk, Context *onreadable_sync);
+  Op *build_op(list<Transaction*>& tls,
+              Context *onreadable, Context *onreadable_sync);
+  void queue_op(OpSequencer *osr, Op *o);
+  void op_queue_reserve_throttle(Op *o);
+  void _op_queue_reserve_throttle(Op *o, const char *caller = 0);
+  void _op_queue_release_throttle(Op *o);
+  void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk);
   friend class C_JournaledAhead;
 
   // flusher thread