git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/newstore: throttle over entire write lifecycle
author Sage Weil <sage@redhat.com>
Thu, 23 Apr 2015 21:51:51 +0000 (14:51 -0700)
committer Sage Weil <sage@redhat.com>
Tue, 1 Sep 2015 17:39:40 +0000 (13:39 -0400)
Take a global throttle when we submit ops and release it when they complete.
The newstore_max_{ops,bytes} throttles cover the period from submit to
commit, while the newstore_wal_max_{ops,bytes} throttles also cover the
async post-commit wal work.  The configs are additive: because the wal
throttles cover both periods, their effective limit is newstore_max_* +
newstore_wal_max_*.  This should make them reasonably idiot-proof.

Signed-off-by: Sage Weil <sage@redhat.com>
src/common/config_opts.h
src/os/newstore/NewStore.cc
src/os/newstore/NewStore.h

index e9020757aba8fe4cdb47e95bf0f781d6866fa8a1..f39d6435fdb6516bed505ab7c85bc7c42f3345aa 100644 (file)
@@ -796,14 +796,17 @@ OPTION(newstore_backend, OPT_STR, "rocksdb")
 OPTION(newstore_fail_eio, OPT_BOOL, true)
 OPTION(newstore_sync_io, OPT_BOOL, false)  // perform initial io synchronously
 OPTION(newstore_sync_transaction, OPT_BOOL, false)  // perform kv txn synchronously
+OPTION(newstore_sync_wal_apply, OPT_BOOL, true)     // perform initial wal work synchronously (possibly in combination with aio so we only *queue* ios)
 OPTION(newstore_fsync_threads, OPT_INT, 16)  // num threads calling fsync
 OPTION(newstore_fsync_thread_timeout, OPT_INT, 30) // thread timeout value
 OPTION(newstore_fsync_thread_suicide_timeout, OPT_INT, 120) // suicide timeout value
 OPTION(newstore_wal_threads, OPT_INT, 4)
 OPTION(newstore_wal_thread_timeout, OPT_INT, 30)
 OPTION(newstore_wal_thread_suicide_timeout, OPT_INT, 120)
-OPTION(newstore_wal_max_ops, OPT_U64, 64)
-OPTION(newstore_wal_max_bytes, OPT_U64, 64*1024*1024)
+OPTION(newstore_max_ops, OPT_U64, 512)
+OPTION(newstore_max_bytes, OPT_U64, 64*1024*1024)
+OPTION(newstore_wal_max_ops, OPT_U64, 512)
+OPTION(newstore_wal_max_bytes, OPT_U64, 128*1024*1024)
 OPTION(newstore_fid_prealloc, OPT_INT, 1024)
 OPTION(newstore_nid_prealloc, OPT_INT, 1024)
 OPTION(newstore_overlay_max_length, OPT_INT, 65536)
index d501943b99310436f32e7c72f302a1fd6214302d..a9ece6da05a50b29d7a4f2105871ec61965d2274 100644 (file)
@@ -584,6 +584,14 @@ NewStore::NewStore(CephContext *cct, const string& path)
     fid_lock("NewStore::fid_lock"),
     nid_lock("NewStore::nid_lock"),
     nid_max(0),
+    throttle_ops(cct, "newstore_max_ops", cct->_conf->newstore_max_ops),
+    throttle_bytes(cct, "newstore_max_bytes", cct->_conf->newstore_max_bytes),
+    throttle_wal_ops(cct, "newstore_wal_max_ops",
+                    cct->_conf->newstore_max_ops +
+                    cct->_conf->newstore_wal_max_ops),
+    throttle_wal_bytes(cct, "newstore_wal_max_bytes",
+                      cct->_conf->newstore_max_bytes +
+                      cct->_conf->newstore_wal_max_bytes),
     wal_lock("NewStore::wal_lock"),
     wal_seq(0),
     wal_tp(cct,
@@ -2088,7 +2096,11 @@ void NewStore::_txc_state_proc(TransContext *txc)
     case TransContext::STATE_KV_DONE:
       if (txc->wal_txn) {
        txc->state = TransContext::STATE_WAL_QUEUED;
-       wal_wq.queue(txc);
+       if (g_conf->newstore_sync_wal_apply) {
+         _wal_apply(txc);
+       } else {
+         wal_wq.queue(txc);
+       }
        return;
       }
       txc->state = TransContext::STATE_FINISHING;
@@ -2253,6 +2265,9 @@ void NewStore::_txc_finish_kv(TransContext *txc)
     finisher.queue(txc->oncommits.front());
     txc->oncommits.pop_front();
   }
+
+  throttle_ops.put(txc->ops);
+  throttle_bytes.put(txc->bytes);
 }
 
 void NewStore::_txc_finish(TransContext *txc)
@@ -2280,6 +2295,9 @@ void NewStore::_txc_finish(TransContext *txc)
     txc->removed_collections.pop_front();
   }
 
+  throttle_wal_ops.put(txc->ops);
+  throttle_wal_bytes.put(txc->bytes);
+
   OpSequencerRef osr = txc->osr;
   osr->qlock.Lock();
   txc->state = TransContext::STATE_DONE;
@@ -2402,8 +2420,6 @@ int NewStore::_wal_finish(TransContext *txc)
   wal_transaction_t& wt = *txc->wal_txn;
   dout(20) << __func__ << " txc " << " seq " << wt.seq << txc << dendl;
 
-  wal_wq.release_throttle(txc);
-
   string key;
   get_wal_key(wt.seq, &key);
   KeyValueDB::Transaction cleanup = db->get_transaction();
@@ -2584,10 +2600,6 @@ int NewStore::queue_transactions(
     tls, &onreadable, &ondisk, &onreadable_sync);
   int r;
 
-  // throttle on wal work
-  wal_wq.throttle(g_conf->newstore_wal_max_ops,
-                 g_conf->newstore_wal_max_bytes);
-
   // set up the sequencer
   OpSequencer *osr;
   if (!posr)
@@ -2610,12 +2622,19 @@ int NewStore::queue_transactions(
 
   for (list<Transaction*>::iterator p = tls.begin(); p != tls.end(); ++p) {
     (*p)->set_osr(osr);
+    txc->ops += (*p)->get_num_ops();
+    txc->bytes += (*p)->get_num_bytes();
     _txc_add_transaction(txc, *p);
   }
 
   r = _txc_finalize(osr, txc);
   assert(r == 0);
 
+  throttle_ops.get(txc->ops);
+  throttle_bytes.get(txc->bytes);
+  throttle_wal_ops.get(txc->ops);
+  throttle_wal_bytes.get(txc->bytes);
+
   // execute (start)
   _txc_state_proc(txc);
   return 0;
index fbe1cf0423e7b35e15373474020ae01a8971d10d..9f97122045fac067b93c5f3c8836cab3508d289d 100644 (file)
@@ -185,6 +185,8 @@ public:
     OpSequencerRef osr;
     boost::intrusive::list_member_hook<> sequencer_item;
 
+    uint64_t ops, bytes;
+
     list<fsync_item> fds;     ///< these fds need to be synced
     set<OnodeRef> onodes;     ///< these onodes need to be updated/written
     KeyValueDB::Transaction t; ///< then we will commit this
@@ -210,6 +212,8 @@ public:
     TransContext(OpSequencer *o)
       : state(STATE_PREPARE),
        osr(o),
+       ops(0),
+       bytes(0),
        oncommit(NULL),
        onreadable(NULL),
        onreadable_sync(NULL),
@@ -376,15 +380,11 @@ public:
   private:
     NewStore *store;
     wal_osr_queue_t wal_queue;
-    uint64_t ops, bytes;
-    Cond throttle_cond;
 
   public:
     WALWQ(NewStore *s, time_t ti, time_t sti, ThreadPool *tp)
       : ThreadPool::WorkQueue<TransContext>("NewStore::WALWQ", ti, sti, tp),
-       store(s),
-       ops(0),
-       bytes(0) {
+       store(s) {
     }
     bool _empty() {
       return wal_queue.empty();
@@ -394,8 +394,6 @@ public:
        wal_queue.push_back(*i->osr);
       }
       i->osr->wal_q.push_back(*i);
-      ++ops;
-      bytes += i->wal_txn->get_bytes();
       return true;
     }
     void _dequeue(TransContext *p) {
@@ -434,22 +432,6 @@ public:
       unlock();
       drain();
     }
-
-    void throttle(uint64_t max_ops, uint64_t max_bytes) {
-      Mutex& lock = get_lock();
-      Mutex::Locker l(lock);
-      while (ops > max_ops || bytes > max_bytes) {
-       throttle_cond.Wait(lock);
-      }
-    }
-
-    void release_throttle(TransContext *txc) {
-      lock();
-      --ops;
-      bytes -= txc->wal_txn->get_bytes();
-      throttle_cond.Signal();
-      unlock();
-    }
   };
 
   struct KVSyncThread : public Thread {
@@ -495,6 +477,9 @@ private:
   uint64_t nid_last;
   uint64_t nid_max;
 
+  Throttle throttle_ops, throttle_bytes;          ///< submit to commit
+  Throttle throttle_wal_ops, throttle_wal_bytes;  ///< submit to wal complete
+
   Mutex wal_lock;
   atomic64_t wal_seq;
   ThreadPool wal_tp;