pool->_lock.Unlock();
}
+ Mutex &get_lock() {
+ return pool->_lock;
+ }
+
void lock() {
pool->lock();
}
OPTION(newstore_fsync_threads, OPT_INT, 16) // num threads calling fsync
OPTION(newstore_fsync_thread_timeout, OPT_INT, 30) // thread timeout value
OPTION(newstore_fsync_thread_suicide_timeout, OPT_INT, 120) // suicide timeout value
-OPTION(newstore_wal_threads, OPT_INT, 2)
+OPTION(newstore_wal_threads, OPT_INT, 4)
OPTION(newstore_wal_thread_timeout, OPT_INT, 30)
OPTION(newstore_wal_thread_suicide_timeout, OPT_INT, 120)
+OPTION(newstore_wal_max_ops, OPT_U64, 64)
+OPTION(newstore_wal_max_bytes, OPT_U64, 64*1024*1024)
OPTION(newstore_fid_prealloc, OPT_INT, 1024)
OPTION(newstore_nid_prealloc, OPT_INT, 1024)
OPTION(newstore_overlay_max_length, OPT_INT, 65536)
tls, &onreadable, &ondisk, &onreadable_sync);
int r;
+ // throttle wal work
+ wal_wq.throttle(g_conf->newstore_wal_max_ops,
+ g_conf->newstore_wal_max_bytes);
+
// set up the sequencer
OpSequencer *osr;
if (!posr)
private:
NewStore *store;
wal_osr_queue_t wal_queue;
+ uint64_t ops, bytes;
+ Cond throttle_cond;
public:
WALWQ(NewStore *s, time_t ti, time_t sti, ThreadPool *tp)
wal_queue.push_back(*i->osr);
}
i->osr->wal_q.push_back(*i);
+ ++ops;
+ bytes += i->wal_txn->get_bytes();
return true;
}
void _dequeue(TransContext *p) {
// requeue at the end to minimize contention
wal_queue.push_back(*i->osr);
}
+ --ops;
+ bytes -= i->wal_txn->get_bytes();
+ throttle_cond.Signal();
+
+ // preserve wal ordering for this sequencer by taking the lock
+ // while still holding the queue lock
+ i->osr->wal_apply_lock.Lock();
return i;
}
void _process(TransContext *i, ThreadPool::TPHandle &handle) {
- // preserve wal ordering for this sequencer
- Mutex::Locker l(i->osr->wal_apply_lock);
store->_apply_wal_transaction(i);
+ i->osr->wal_apply_lock.Unlock();
}
void _clear() {
assert(wal_queue.empty());
unlock();
drain();
}
+
+ void throttle(uint64_t max_ops, uint64_t max_bytes) {
+ Mutex& lock = get_lock();
+ Mutex::Locker l(lock);
+ while (ops > max_ops || bytes > max_bytes) {
+ throttle_cond.Wait(lock);
+ }
+ }
};
struct KVSyncThread : public Thread {
uint64_t seq;
list<wal_op_t> ops;
+ int64_t _bytes; ///< cached byte count
+
+ wal_transaction_t() : _bytes(-1) {}
+
+ uint64_t get_bytes() {
+ if (_bytes < 0) {
+ _bytes = 0;
+ for (list<wal_op_t>::iterator p = ops.begin(); p != ops.end(); ++p) {
+ _bytes += p->length;
+ }
+ }
+ return _bytes;
+ }
+
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& p);
void dump(Formatter *f) const;