os/newstore: use a threadpool for applying wal events
author     Sage Weil <sage@redhat.com>
           Thu, 16 Apr 2015 22:01:20 +0000 (15:01 -0700)
committer  Sage Weil <sage@redhat.com>
           Tue, 1 Sep 2015 17:39:38 +0000 (13:39 -0400)
Signed-off-by: Sage Weil <sage@redhat.com>
src/common/config_opts.h
src/os/newstore/NewStore.cc
src/os/newstore/NewStore.h

index 2c3d8ee52a6e0264bbc354d9e64cf3c6e7fddd9f..d9198b92a71702258da66ac47bca3cf282a6b5c4 100644 (file)
@@ -798,6 +798,9 @@ OPTION(newstore_sync_queue_transaction, OPT_BOOL, false)  // perform write synch
 OPTION(newstore_fsync_threads, OPT_INT, 16)  // num threads calling fsync
 OPTION(newstore_fsync_thread_timeout, OPT_INT, 30) // thread timeout value
 OPTION(newstore_fsync_thread_suicide_timeout, OPT_INT, 120) // suicide timeout value
+OPTION(newstore_wal_threads, OPT_INT, 2)
+OPTION(newstore_wal_thread_timeout, OPT_INT, 30)
+OPTION(newstore_wal_thread_suicide_timeout, OPT_INT, 120)
 OPTION(newstore_fid_prealloc, OPT_INT, 1024)
 OPTION(newstore_nid_prealloc, OPT_INT, 1024)
 OPTION(newstore_overlay_max_length, OPT_INT, 65536)
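
The three new options mirror the existing fsync-pool trio just above them: a worker count plus heartbeat and suicide timeouts. As a rough sketch — assuming the usual mapping of OPTION() names to ceph.conf keys, with spaces and underscores interchangeable — the WAL apply pool could be tuned like this:

    [osd]
        newstore wal threads = 4
        newstore wal thread timeout = 30
        newstore wal thread suicide timeout = 120
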
index 750f6dd6782d67680c8d8f0895f14fb3996c2911..53086f1f9359b81399c2b938547b55ea4ae9a096 100644 (file)
@@ -580,6 +580,14 @@ NewStore::NewStore(CephContext *cct, const string& path)
     nid_max(0),
     wal_lock("NewStore::wal_lock"),
     wal_seq(0),
+    wal_tp(cct,
+          "NewStore::wal_tp",
+          cct->_conf->newstore_wal_threads,
+          "newstore_wal_threads"),
+    wal_wq(this,
+            cct->_conf->newstore_wal_thread_timeout,
+            cct->_conf->newstore_wal_thread_suicide_timeout,
+            &wal_tp),
     finisher(cct),
     fsync_tp(cct,
             "NewStore::fsync_tp",
@@ -953,6 +961,7 @@ int NewStore::mount()
 
   finisher.start();
   fsync_tp.start();
+  wal_tp.start();
   kv_sync_thread.create();
 
   mounted = true;
@@ -981,6 +990,10 @@ int NewStore::umount()
   fsync_tp.stop();
   dout(20) << __func__ << " stopping kv thread" << dendl;
   _kv_stop();
+  dout(20) << __func__ << " draining wal_wq" << dendl;
+  wal_wq.drain();
+  dout(20) << __func__ << " stopping wal_tp" << dendl;
+  wal_tp.stop();
   dout(20) << __func__ << " draining finisher" << dendl;
   finisher.wait_for_empty();
   dout(20) << __func__ << " stopping finisher" << dendl;
@@ -2088,15 +2101,6 @@ void NewStore::_txc_submit_kv(TransContext *txc)
   kv_cond.SignalOne();
 }
 
-struct C_ApplyWAL : public Context {
-  NewStore *store;
-  NewStore::TransContext *txc;
-  C_ApplyWAL(NewStore *s, NewStore::TransContext *t) : store(s), txc(t) {}
-  void finish(int r) {
-    store->_apply_wal_transaction(txc);
-  }
-};
-
 void NewStore::_txc_finish_kv(TransContext *txc)
 {
   dout(20) << __func__ << " txc " << txc << dendl;
@@ -2125,7 +2129,7 @@ void NewStore::_txc_finish_kv(TransContext *txc)
     dout(20) << __func__ << " starting wal apply" << dendl;
     txc->state = TransContext::STATE_WAL_QUEUED;
     txc->osr->qlock.Unlock();
-    finisher.queue(new C_ApplyWAL(this, txc));
+    wal_wq.queue(txc);
   } else {
     txc->state = TransContext::STATE_FINISHING;
     txc->osr->qlock.Unlock();
index ba12cc6e3ecdc4f45761e0f1c69eec14659c53a6..d95452f37670c9fa10671c109c61bafeed8951bd 100644 (file)
@@ -190,6 +190,7 @@ public:
     list<Context*> oncommits;  ///< more commit completions
     list<CollectionRef> removed_collections; ///< colls we removed
 
+    boost::intrusive::list_member_hook<> wal_queue_item;
     wal_transaction_t *wal_txn; ///< wal transaction (if any)
     unsigned num_fsyncs_completed;
 
@@ -237,7 +238,6 @@ public:
     }
   };
 
-
   class OpSequencer : public Sequencer_impl {
   public:
     Mutex qlock;
@@ -250,11 +250,24 @@ public:
        &TransContext::sequencer_item> > q_list_t;
     q_list_t q;  ///< transactions
 
+    typedef boost::intrusive::list<
+      TransContext,
+      boost::intrusive::member_hook<
+       TransContext,
+       boost::intrusive::list_member_hook<>,
+       &TransContext::wal_queue_item> > wal_queue_t;
+    wal_queue_t wal_q; ///< transactions
+
+    boost::intrusive::list_member_hook<> wal_osr_queue_item;
+
     Sequencer *parent;
 
+    Mutex wal_apply_lock;
+
     OpSequencer()
       : qlock("NewStore::OpSequencer::qlock", false, false),
-       parent(NULL) {
+       parent(NULL),
+       wal_apply_lock("NewStore::OpSequencer::wal_apply_lock") {
     }
     ~OpSequencer() {
       assert(q.empty());
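
The wal_q / wal_queue_item / wal_osr_queue_item members above all use boost::intrusive lists: the linkage is embedded in the object itself, so a TransContext (or an OpSequencer) can sit on a queue with no per-node allocation. A minimal, self-contained sketch of the same member_hook pattern, with illustrative names that are not from the tree:

    #include <boost/intrusive/list.hpp>
    #include <cassert>

    struct Item {
      int id;
      // the hook is the list linkage, embedded in the object,
      // like TransContext::wal_queue_item above
      boost::intrusive::list_member_hook<> queue_item;
      explicit Item(int i) : id(i) {}
    };

    typedef boost::intrusive::list<
      Item,
      boost::intrusive::member_hook<
        Item,
        boost::intrusive::list_member_hook<>,
        &Item::queue_item> > item_queue_t;

    int main() {
      Item a(1), b(2);            // caller owns the objects; the list never copies
      item_queue_t q;
      q.push_back(a);
      q.push_back(b);
      assert(&q.front() == &a);   // the list links the original objects
      q.pop_front();
      q.pop_front();              // unlink before the items go out of scope
      return 0;
    }
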
@@ -336,6 +349,75 @@ public:
     }
   };
 
+  class WALWQ : public ThreadPool::WorkQueue<TransContext> {
+    // We need to order WAL items within each Sequencer.  To do that,
+    // queue each txc under osr, and queue the osrs here.  When we
+    // dequeue a txc, requeue the osr if there are more pending, and
+    // do it at the end of the list so that the next thread does not
+    // get a conflicted txc.  Hold an osr mutex while doing the wal to
+    // preserve the ordering.
+  public:
+    typedef boost::intrusive::list<
+      OpSequencer,
+      boost::intrusive::member_hook<
+       OpSequencer,
+       boost::intrusive::list_member_hook<>,
+       &OpSequencer::wal_osr_queue_item> > wal_osr_queue_t;
+
+  private:
+    NewStore *store;
+    wal_osr_queue_t wal_queue;
+
+  public:
+    WALWQ(NewStore *s, time_t ti, time_t sti, ThreadPool *tp)
+      : ThreadPool::WorkQueue<TransContext>("NewStore::WALWQ", ti, sti, tp),
+       store(s) {
+    }
+    bool _empty() {
+      return wal_queue.empty();
+    }
+    bool _enqueue(TransContext *i) {
+      if (i->osr->wal_q.empty()) {
+       wal_queue.push_back(*i->osr);
+      }
+      i->osr->wal_q.push_back(*i);
+      return true;
+    }
+    void _dequeue(TransContext *p) {
+      assert(0 == "not needed, not implemented");
+    }
+    TransContext *_dequeue() {
+      if (wal_queue.empty())
+       return NULL;
+      OpSequencer *osr = &wal_queue.front();
+      TransContext *i = &osr->wal_q.front();
+      osr->wal_q.pop_front();
+      wal_queue.pop_front();
+      if (!osr->wal_q.empty()) {
+       // requeue at the end to minimize contention
+       wal_queue.push_back(*i->osr);
+      }
+      return i;
+    }
+    void _process(TransContext *i, ThreadPool::TPHandle &handle) {
+      // preserve wal ordering for this sequencer
+      Mutex::Locker l(i->osr->wal_apply_lock);
+      store->_apply_wal_transaction(i);
+    }
+    void _clear() {
+      assert(wal_queue.empty());
+    }
+
+    void flush() {
+      lock();
+      while (!wal_queue.empty()) {
+       _wait();
+      }
+      unlock();
+      drain();
+    }
+  };
+
   struct KVSyncThread : public Thread {
     NewStore *store;
     KVSyncThread(NewStore *s) : store(s) {}
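
The scheme the WALWQ comment describes — a FIFO of sequencers, each carrying its own FIFO of transactions, with a sequencer pushed back to the tail while it still has pending items and a per-sequencer mutex held during the apply — can be illustrated without the Ceph ThreadPool/WorkQueue machinery. A simplified, self-contained sketch (std::thread and std::deque stand in for the thread pool and the intrusive lists; names are illustrative only):

    #include <cstdio>
    #include <deque>
    #include <mutex>
    #include <thread>
    #include <vector>

    struct Sequencer;

    struct Txc {
      Sequencer *osr;
      int id;
    };

    struct Sequencer {
      std::deque<Txc*> wal_q;     // per-sequencer FIFO, like OpSequencer::wal_q
      std::mutex wal_apply_lock;  // held while applying, like wal_apply_lock
    };

    struct WalQueue {
      std::mutex lock;
      std::deque<Sequencer*> wal_queue;  // sequencers with pending WAL items

      void enqueue(Txc *t) {
        std::lock_guard<std::mutex> l(lock);
        if (t->osr->wal_q.empty())
          wal_queue.push_back(t->osr);   // first pending item: queue the osr
        t->osr->wal_q.push_back(t);
      }

      Txc *dequeue() {
        std::lock_guard<std::mutex> l(lock);
        if (wal_queue.empty())
          return nullptr;
        Sequencer *osr = wal_queue.front();
        wal_queue.pop_front();
        Txc *t = osr->wal_q.front();
        osr->wal_q.pop_front();
        if (!osr->wal_q.empty())
          wal_queue.push_back(osr);      // requeue at the tail so the next
                                         // worker picks a different sequencer
        return t;
      }
    };

    int main() {
      Sequencer s1, s2;
      WalQueue q;
      Txc txcs[4] = { {&s1, 0}, {&s1, 1}, {&s2, 2}, {&s2, 3} };
      for (auto &t : txcs)
        q.enqueue(&t);

      auto worker = [&q]() {
        while (Txc *t = q.dequeue()) {
          // the per-sequencer mutex keeps applies for one sequencer from
          // overlapping, as _process() does with wal_apply_lock
          std::lock_guard<std::mutex> l(t->osr->wal_apply_lock);
          std::printf("apply txc %d\n", t->id);
        }
      };
      std::vector<std::thread> tp;
      for (int i = 0; i < 2; ++i)
        tp.emplace_back(worker);
      for (auto &th : tp)
        th.join();
      return 0;
    }
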
@@ -371,6 +453,8 @@ private:
 
   Mutex wal_lock;
   atomic64_t wal_seq;
+  ThreadPool wal_tp;
+  WALWQ wal_wq;
 
   Finisher finisher;
   ThreadPool fsync_tp;