From: Sage Weil
Date: Thu, 16 Apr 2015 23:30:31 +0000 (-0700)
Subject: os/newstore: throttle wal work
X-Git-Tag: v9.1.0~242^2~57
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=143d48570fc000e69c6afb2ff9857e819d2cd9b8;p=ceph.git

os/newstore: throttle wal work

Signed-off-by: Sage Weil
---

diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h
index cf78b2dc302..f0754de8e19 100644
--- a/src/common/WorkQueue.h
+++ b/src/common/WorkQueue.h
@@ -322,6 +322,10 @@ public:
       pool->_lock.Unlock();
     }
 
+    Mutex &get_lock() {
+      return pool->_lock;
+    }
+
     void lock() {
       pool->lock();
     }
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index d9198b92a71..bfe7d0df665 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -798,9 +798,11 @@ OPTION(newstore_sync_queue_transaction, OPT_BOOL, false) // perform write synch
 OPTION(newstore_fsync_threads, OPT_INT, 16) // num threads calling fsync
 OPTION(newstore_fsync_thread_timeout, OPT_INT, 30) // thread timeout value
 OPTION(newstore_fsync_thread_suicide_timeout, OPT_INT, 120) // suicide timeout value
-OPTION(newstore_wal_threads, OPT_INT, 2)
+OPTION(newstore_wal_threads, OPT_INT, 4)
 OPTION(newstore_wal_thread_timeout, OPT_INT, 30)
 OPTION(newstore_wal_thread_suicide_timeout, OPT_INT, 120)
+OPTION(newstore_wal_max_ops, OPT_U64, 64)
+OPTION(newstore_wal_max_bytes, OPT_U64, 64*1024*1024)
 OPTION(newstore_fid_prealloc, OPT_INT, 1024)
 OPTION(newstore_nid_prealloc, OPT_INT, 1024)
 OPTION(newstore_overlay_max_length, OPT_INT, 65536)
diff --git a/src/os/newstore/NewStore.cc b/src/os/newstore/NewStore.cc
index a3a873d8624..d4691f8b7f7 100644
--- a/src/os/newstore/NewStore.cc
+++ b/src/os/newstore/NewStore.cc
@@ -2419,6 +2419,10 @@ int NewStore::queue_transactions(
     tls, &onreadable, &ondisk, &onreadable_sync);
   int r;
 
+  // throttle wal work
+  wal_wq.throttle(g_conf->newstore_wal_max_ops,
+                  g_conf->newstore_wal_max_bytes);
+
   // set up the sequencer
   OpSequencer *osr;
   if (!posr)
diff --git a/src/os/newstore/NewStore.h b/src/os/newstore/NewStore.h
index d95452f3767..2563ba3d5bf 100644
--- a/src/os/newstore/NewStore.h
+++ b/src/os/newstore/NewStore.h
@@ -367,6 +367,8 @@ public:
   private:
     NewStore *store;
     wal_osr_queue_t wal_queue;
+    uint64_t ops, bytes;
+    Cond throttle_cond;
 
   public:
     WALWQ(NewStore *s, time_t ti, time_t sti, ThreadPool *tp)
@@ -381,6 +383,8 @@ public:
         wal_queue.push_back(*i->osr);
       }
       i->osr->wal_q.push_back(*i);
+      ++ops;
+      bytes += i->wal_txn->get_bytes();
       return true;
     }
     void _dequeue(TransContext *p) {
@@ -397,12 +401,18 @@ public:
         // requeue at the end to minimize contention
         wal_queue.push_back(*i->osr);
       }
+      --ops;
+      bytes -= i->wal_txn->get_bytes();
+      throttle_cond.Signal();
+
+      // preserve wal ordering for this sequencer by taking the lock
+      // while still holding the queue lock
+      i->osr->wal_apply_lock.Lock();
       return i;
     }
     void _process(TransContext *i, ThreadPool::TPHandle &handle) {
-      // preserve wal ordering for this sequencer
-      Mutex::Locker l(i->osr->wal_apply_lock);
       store->_apply_wal_transaction(i);
+      i->osr->wal_apply_lock.Unlock();
     }
     void _clear() {
       assert(wal_queue.empty());
@@ -416,6 +426,14 @@ public:
       unlock();
       drain();
     }
+
+    void throttle(uint64_t max_ops, uint64_t max_bytes) {
+      Mutex& lock = get_lock();
+      Mutex::Locker l(lock);
+      while (ops > max_ops || bytes > max_bytes) {
+        throttle_cond.Wait(lock);
+      }
+    }
   };
 
   struct KVSyncThread : public Thread {
diff --git a/src/os/newstore/newstore_types.h b/src/os/newstore/newstore_types.h
index 636ef65a96c..286fc773e67 100644
--- a/src/os/newstore/newstore_types.h
+++ b/src/os/newstore/newstore_types.h
@@ -165,6 +165,20 @@ struct wal_transaction_t {
   uint64_t seq;
   list ops;
 
+  int64_t _bytes; ///< cached byte count
+
+  wal_transaction_t() : _bytes(-1) {}
+
+  uint64_t get_bytes() {
+    if (_bytes < 0) {
+      _bytes = 0;
+      for (list::iterator p = ops.begin(); p != ops.end(); ++p) {
+        _bytes += p->length;
+      }
+    }
+    return _bytes;
+  }
+
   void encode(bufferlist& bl) const;
   void decode(bufferlist::iterator& p);
   void dump(Formatter *f) const;