OPTION(newstore_fsync_threads, OPT_INT, 16) // num threads calling fsync
OPTION(newstore_fsync_thread_timeout, OPT_INT, 30) // thread timeout value
OPTION(newstore_fsync_thread_suicide_timeout, OPT_INT, 120) // suicide timeout value
+OPTION(newstore_wal_threads, OPT_INT, 2) // num threads applying wal entries
+OPTION(newstore_wal_thread_timeout, OPT_INT, 30) // thread timeout value
+OPTION(newstore_wal_thread_suicide_timeout, OPT_INT, 120) // suicide timeout value
OPTION(newstore_fid_prealloc, OPT_INT, 1024)
OPTION(newstore_nid_prealloc, OPT_INT, 1024)
OPTION(newstore_overlay_max_length, OPT_INT, 65536)
nid_max(0),
wal_lock("NewStore::wal_lock"),
wal_seq(0),
+ wal_tp(cct,
+ "NewStore::wal_tp",
+ cct->_conf->newstore_wal_threads,
+ "newstore_wal_threads"),
+ wal_wq(this,
+ cct->_conf->newstore_wal_thread_timeout,
+ cct->_conf->newstore_wal_thread_suicide_timeout,
+ &wal_tp),
finisher(cct),
fsync_tp(cct,
"NewStore::fsync_tp",
finisher.start();
fsync_tp.start();
+ wal_tp.start();
kv_sync_thread.create();
mounted = true;
fsync_tp.stop();
dout(20) << __func__ << " stopping kv thread" << dendl;
_kv_stop();
+ dout(20) << __func__ << " draining wal_wq" << dendl;
+ wal_wq.drain();
+ dout(20) << __func__ << " stopping wal_tp" << dendl;
+ wal_tp.stop();
dout(20) << __func__ << " draining finisher" << dendl;
finisher.wait_for_empty();
dout(20) << __func__ << " stopping finisher" << dendl;
kv_cond.SignalOne();
}
-struct C_ApplyWAL : public Context {
- NewStore *store;
- NewStore::TransContext *txc;
- C_ApplyWAL(NewStore *s, NewStore::TransContext *t) : store(s), txc(t) {}
- void finish(int r) {
- store->_apply_wal_transaction(txc);
- }
-};
-
void NewStore::_txc_finish_kv(TransContext *txc)
{
dout(20) << __func__ << " txc " << txc << dendl;
dout(20) << __func__ << " starting wal apply" << dendl;
txc->state = TransContext::STATE_WAL_QUEUED;
txc->osr->qlock.Unlock();
- finisher.queue(new C_ApplyWAL(this, txc));
+ wal_wq.queue(txc);
} else {
txc->state = TransContext::STATE_FINISHING;
txc->osr->qlock.Unlock();
list<Context*> oncommits; ///< more commit completions
list<CollectionRef> removed_collections; ///< colls we removed
+ boost::intrusive::list_member_hook<> wal_queue_item;
wal_transaction_t *wal_txn; ///< wal transaction (if any)
unsigned num_fsyncs_completed;
}
};
-
class OpSequencer : public Sequencer_impl {
public:
Mutex qlock;
&TransContext::sequencer_item> > q_list_t;
q_list_t q; ///< transactions
+ typedef boost::intrusive::list<
+ TransContext,
+ boost::intrusive::member_hook<
+ TransContext,
+ boost::intrusive::list_member_hook<>,
+ &TransContext::wal_queue_item> > wal_queue_t;
+ wal_queue_t wal_q; ///< transactions
+
+ boost::intrusive::list_member_hook<> wal_osr_queue_item;
+
Sequencer *parent;
+ Mutex wal_apply_lock;
+
OpSequencer()
: qlock("NewStore::OpSequencer::qlock", false, false),
- parent(NULL) {
+ parent(NULL),
+ wal_apply_lock("NewStore::OpSequencer::wal_apply_lock") {
}
~OpSequencer() {
assert(q.empty());
}
};
+ class WALWQ : public ThreadPool::WorkQueue<TransContext> {
+ // We need to order WAL items within each Sequencer. To do that,
+ // queue each txc under osr, and queue the osr's here. When we
+ // dequeue a txc, requeue the osr if there are more pending, and
+ // do it at the end of the list so that the next thread does not
+ // get a conflicted txc. Hold an osr mutex while doing the wal to
+ // preserve the ordering.
+ public:
+ // Intrusive list of OpSequencers with at least one pending wal txc;
+ // each osr appears here at most once (hooked via wal_osr_queue_item).
+ typedef boost::intrusive::list<
+ OpSequencer,
+ boost::intrusive::member_hook<
+ OpSequencer,
+ boost::intrusive::list_member_hook<>,
+ &OpSequencer::wal_osr_queue_item> > wal_osr_queue_t;
+
+ private:
+ NewStore *store;
+ wal_osr_queue_t wal_queue; // round-robin queue of osr's with pending work
+
+ public:
+ WALWQ(NewStore *s, time_t ti, time_t sti, ThreadPool *tp)
+ : ThreadPool::WorkQueue<TransContext>("NewStore::WALWQ", ti, sti, tp),
+ store(s) {
+ }
+ // Empty iff no osr has a queued wal txc. Note this does not cover
+ // items already handed to worker threads; flush() uses drain() for that.
+ bool _empty() {
+ return wal_queue.empty();
+ }
+ // Queue txc under its osr; enroll the osr in wal_queue only when this
+ // is its first pending txc, preserving the at-most-once invariant.
+ bool _enqueue(TransContext *i) {
+ if (i->osr->wal_q.empty()) {
+ wal_queue.push_back(*i->osr);
+ }
+ i->osr->wal_q.push_back(*i);
+ return true;
+ }
+ void _dequeue(TransContext *p) {
+ assert(0 == "not needed, not implemented");
+ }
+ // Take the next txc from the front osr. If that osr still has pending
+ // txcs, requeue it at the tail so the next worker picks a different
+ // osr rather than blocking on this one's wal_apply_lock.
+ TransContext *_dequeue() {
+ if (wal_queue.empty())
+ return NULL;
+ OpSequencer *osr = &wal_queue.front();
+ TransContext *i = &osr->wal_q.front();
+ osr->wal_q.pop_front();
+ wal_queue.pop_front();
+ if (!osr->wal_q.empty()) {
+ // requeue at the end to minimize contention
+ wal_queue.push_back(*i->osr);
+ }
+ return i;
+ }
+ void _process(TransContext *i, ThreadPool::TPHandle &handle) {
+ // preserve wal ordering for this sequencer
+ Mutex::Locker l(i->osr->wal_apply_lock);
+ store->_apply_wal_transaction(i);
+ }
+ void _clear() {
+ assert(wal_queue.empty());
+ }
+
+ // Wait until every queued txc has been dequeued, then drain() to wait
+ // for in-flight worker threads to finish applying their txcs.
+ void flush() {
+ lock();
+ while (!wal_queue.empty()) {
+ _wait();
+ }
+ unlock();
+ drain();
+ }
+ };
+
struct KVSyncThread : public Thread {
NewStore *store;
KVSyncThread(NewStore *s) : store(s) {}
Mutex wal_lock;
atomic64_t wal_seq;
+ ThreadPool wal_tp;
+ WALWQ wal_wq;
Finisher finisher;
ThreadPool fsync_tp;