From 7e1af1e616a4decdbb9bb634cd18b610cd0b5d4e Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 16 Apr 2015 15:01:20 -0700 Subject: [PATCH] os/newstore: use a threadpool for applying wal events Signed-off-by: Sage Weil --- src/common/config_opts.h | 3 ++ src/os/newstore/NewStore.cc | 24 +++++----- src/os/newstore/NewStore.h | 88 ++++++++++++++++++++++++++++++++++++- 3 files changed, 103 insertions(+), 12 deletions(-) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 2c3d8ee52a6e0..d9198b92a7170 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -798,6 +798,9 @@ OPTION(newstore_sync_queue_transaction, OPT_BOOL, false) // perform write synch OPTION(newstore_fsync_threads, OPT_INT, 16) // num threads calling fsync OPTION(newstore_fsync_thread_timeout, OPT_INT, 30) // thread timeout value OPTION(newstore_fsync_thread_suicide_timeout, OPT_INT, 120) // suicide timeout value +OPTION(newstore_wal_threads, OPT_INT, 2) +OPTION(newstore_wal_thread_timeout, OPT_INT, 30) +OPTION(newstore_wal_thread_suicide_timeout, OPT_INT, 120) OPTION(newstore_fid_prealloc, OPT_INT, 1024) OPTION(newstore_nid_prealloc, OPT_INT, 1024) OPTION(newstore_overlay_max_length, OPT_INT, 65536) diff --git a/src/os/newstore/NewStore.cc b/src/os/newstore/NewStore.cc index 750f6dd6782d6..53086f1f9359b 100644 --- a/src/os/newstore/NewStore.cc +++ b/src/os/newstore/NewStore.cc @@ -580,6 +580,14 @@ NewStore::NewStore(CephContext *cct, const string& path) nid_max(0), wal_lock("NewStore::wal_lock"), wal_seq(0), + wal_tp(cct, + "NewStore::wal_tp", + cct->_conf->newstore_wal_threads, + "newstore_wal_threads"), + wal_wq(this, + cct->_conf->newstore_wal_thread_timeout, + cct->_conf->newstore_wal_thread_suicide_timeout, + &wal_tp), finisher(cct), fsync_tp(cct, "NewStore::fsync_tp", @@ -953,6 +961,7 @@ int NewStore::mount() finisher.start(); fsync_tp.start(); + wal_tp.start(); kv_sync_thread.create(); mounted = true; @@ -981,6 +990,10 @@ int NewStore::umount() 
fsync_tp.stop(); dout(20) << __func__ << " stopping kv thread" << dendl; _kv_stop(); + dout(20) << __func__ << " draining wal_wq" << dendl; + wal_wq.drain(); + dout(20) << __func__ << " stopping wal_tp" << dendl; + wal_tp.stop(); dout(20) << __func__ << " draining finisher" << dendl; finisher.wait_for_empty(); dout(20) << __func__ << " stopping finisher" << dendl; @@ -2088,15 +2101,6 @@ void NewStore::_txc_submit_kv(TransContext *txc) kv_cond.SignalOne(); } -struct C_ApplyWAL : public Context { - NewStore *store; - NewStore::TransContext *txc; - C_ApplyWAL(NewStore *s, NewStore::TransContext *t) : store(s), txc(t) {} - void finish(int r) { - store->_apply_wal_transaction(txc); - } -}; - void NewStore::_txc_finish_kv(TransContext *txc) { dout(20) << __func__ << " txc " << txc << dendl; @@ -2125,7 +2129,7 @@ void NewStore::_txc_finish_kv(TransContext *txc) dout(20) << __func__ << " starting wal apply" << dendl; txc->state = TransContext::STATE_WAL_QUEUED; txc->osr->qlock.Unlock(); - finisher.queue(new C_ApplyWAL(this, txc)); + wal_wq.queue(txc); } else { txc->state = TransContext::STATE_FINISHING; txc->osr->qlock.Unlock(); diff --git a/src/os/newstore/NewStore.h b/src/os/newstore/NewStore.h index ba12cc6e3ecdc..d95452f37670c 100644 --- a/src/os/newstore/NewStore.h +++ b/src/os/newstore/NewStore.h @@ -190,6 +190,7 @@ public: list oncommits; ///< more commit completions list removed_collections; ///< colls we removed + boost::intrusive::list_member_hook<> wal_queue_item; wal_transaction_t *wal_txn; ///< wal transaction (if any) unsigned num_fsyncs_completed; @@ -237,7 +238,6 @@ public: } }; - class OpSequencer : public Sequencer_impl { public: Mutex qlock; @@ -250,11 +250,24 @@ public: &TransContext::sequencer_item> > q_list_t; q_list_t q; ///< transactions + typedef boost::intrusive::list< + TransContext, + boost::intrusive::member_hook< + TransContext, + boost::intrusive::list_member_hook<>, + &TransContext::wal_queue_item> > wal_queue_t; + wal_queue_t wal_q; ///< 
transactions + + boost::intrusive::list_member_hook<> wal_osr_queue_item; + Sequencer *parent; + Mutex wal_apply_lock; + OpSequencer() : qlock("NewStore::OpSequencer::qlock", false, false), - parent(NULL) { + parent(NULL), + wal_apply_lock("NewStore::OpSequencer::wal_apply_lock") { } ~OpSequencer() { assert(q.empty()); @@ -336,6 +349,75 @@ public: } }; + class WALWQ : public ThreadPool::WorkQueue<TransContext> { + // We need to order WAL items within each Sequencer. To do that, + // queue each txc under osr, and queue the osr's here. When we + // dequeue a txc, requeue the osr if there are more pending, and + // do it at the end of the list so that the next thread does not + // get a conflicted txc. Hold an osr mutex while doing the wal to + // preserve the ordering. + public: + typedef boost::intrusive::list< + OpSequencer, + boost::intrusive::member_hook< + OpSequencer, + boost::intrusive::list_member_hook<>, + &OpSequencer::wal_osr_queue_item> > wal_osr_queue_t; + + private: + NewStore *store; + wal_osr_queue_t wal_queue; + + public: + WALWQ(NewStore *s, time_t ti, time_t sti, ThreadPool *tp) + : ThreadPool::WorkQueue<TransContext>("NewStore::WALWQ", ti, sti, tp), + store(s) { + } + bool _empty() { + return wal_queue.empty(); + } + bool _enqueue(TransContext *i) { + if (i->osr->wal_q.empty()) { + wal_queue.push_back(*i->osr); + } + i->osr->wal_q.push_back(*i); + return true; + } + void _dequeue(TransContext *p) { + assert(0 == "not needed, not implemented"); + } + TransContext *_dequeue() { + if (wal_queue.empty()) + return NULL; + OpSequencer *osr = &wal_queue.front(); + TransContext *i = &osr->wal_q.front(); + osr->wal_q.pop_front(); + wal_queue.pop_front(); + if (!osr->wal_q.empty()) { + // requeue at the end to minimize contention + wal_queue.push_back(*i->osr); + } + return i; + } + void _process(TransContext *i, ThreadPool::TPHandle &handle) { + // preserve wal ordering for this sequencer + Mutex::Locker l(i->osr->wal_apply_lock); + store->_apply_wal_transaction(i); + } + void 
_clear() { + assert(wal_queue.empty()); + } + + void flush() { + lock(); + while (!wal_queue.empty()) { + _wait(); + } + unlock(); + drain(); + } + }; + struct KVSyncThread : public Thread { NewStore *store; KVSyncThread(NewStore *s) : store(s) {} @@ -371,6 +453,8 @@ private: Mutex wal_lock; atomic64_t wal_seq; + ThreadPool wal_tp; + WALWQ wal_wq; Finisher finisher; ThreadPool fsync_tp; -- 2.39.5