Take a global throttle when we submit ops and release it when they complete.
The first throttles cover the period from submit to commit, while the wal
ones also cover the asynchronous post-commit wal work. The configured limits
are additive, since the wal throttles cover both periods; this should make
them reasonably idiot-proof.
Signed-off-by: Sage Weil <sage@redhat.com>
OPTION(newstore_fail_eio, OPT_BOOL, true)
OPTION(newstore_sync_io, OPT_BOOL, false) // perform initial io synchronously
OPTION(newstore_sync_transaction, OPT_BOOL, false) // perform kv txn synchronously
+OPTION(newstore_sync_wal_apply, OPT_BOOL, true) // perform initial wal work synchronously (possibly in combination with aio so we only *queue* ios)
OPTION(newstore_fsync_threads, OPT_INT, 16) // num threads calling fsync
OPTION(newstore_fsync_thread_timeout, OPT_INT, 30) // thread timeout value
OPTION(newstore_fsync_thread_suicide_timeout, OPT_INT, 120) // suicide timeout value
OPTION(newstore_wal_threads, OPT_INT, 4)
OPTION(newstore_wal_thread_timeout, OPT_INT, 30)
OPTION(newstore_wal_thread_suicide_timeout, OPT_INT, 120)
-OPTION(newstore_wal_max_ops, OPT_U64, 64)
-OPTION(newstore_wal_max_bytes, OPT_U64, 64*1024*1024)
+OPTION(newstore_max_ops, OPT_U64, 512)
+OPTION(newstore_max_bytes, OPT_U64, 64*1024*1024)
+OPTION(newstore_wal_max_ops, OPT_U64, 512)
+OPTION(newstore_wal_max_bytes, OPT_U64, 128*1024*1024)
OPTION(newstore_fid_prealloc, OPT_INT, 1024)
OPTION(newstore_nid_prealloc, OPT_INT, 1024)
OPTION(newstore_overlay_max_length, OPT_INT, 65536)
fid_lock("NewStore::fid_lock"),
nid_lock("NewStore::nid_lock"),
nid_max(0),
+ throttle_ops(cct, "newstore_max_ops", cct->_conf->newstore_max_ops),
+ throttle_bytes(cct, "newstore_max_bytes", cct->_conf->newstore_max_bytes),
+ throttle_wal_ops(cct, "newstore_wal_max_ops",
+ cct->_conf->newstore_max_ops +
+ cct->_conf->newstore_wal_max_ops),
+ throttle_wal_bytes(cct, "newstore_wal_max_bytes",
+ cct->_conf->newstore_max_bytes +
+ cct->_conf->newstore_wal_max_bytes),
wal_lock("NewStore::wal_lock"),
wal_seq(0),
wal_tp(cct,
case TransContext::STATE_KV_DONE:
if (txc->wal_txn) {
txc->state = TransContext::STATE_WAL_QUEUED;
- wal_wq.queue(txc);
+ if (g_conf->newstore_sync_wal_apply) {
+ _wal_apply(txc);
+ } else {
+ wal_wq.queue(txc);
+ }
return;
}
txc->state = TransContext::STATE_FINISHING;
finisher.queue(txc->oncommits.front());
txc->oncommits.pop_front();
}
+
+ throttle_ops.put(txc->ops);
+ throttle_bytes.put(txc->bytes);
}
void NewStore::_txc_finish(TransContext *txc)
txc->removed_collections.pop_front();
}
+ throttle_wal_ops.put(txc->ops);
+ throttle_wal_bytes.put(txc->bytes);
+
OpSequencerRef osr = txc->osr;
osr->qlock.Lock();
txc->state = TransContext::STATE_DONE;
wal_transaction_t& wt = *txc->wal_txn;
dout(20) << __func__ << " txc " << " seq " << wt.seq << txc << dendl;
- wal_wq.release_throttle(txc);
-
string key;
get_wal_key(wt.seq, &key);
KeyValueDB::Transaction cleanup = db->get_transaction();
tls, &onreadable, &ondisk, &onreadable_sync);
int r;
- // throttle on wal work
- wal_wq.throttle(g_conf->newstore_wal_max_ops,
- g_conf->newstore_wal_max_bytes);
-
// set up the sequencer
OpSequencer *osr;
if (!posr)
for (list<Transaction*>::iterator p = tls.begin(); p != tls.end(); ++p) {
(*p)->set_osr(osr);
+ txc->ops += (*p)->get_num_ops();
+ txc->bytes += (*p)->get_num_bytes();
_txc_add_transaction(txc, *p);
}
r = _txc_finalize(osr, txc);
assert(r == 0);
+ throttle_ops.get(txc->ops);
+ throttle_bytes.get(txc->bytes);
+ throttle_wal_ops.get(txc->ops);
+ throttle_wal_bytes.get(txc->bytes);
+
// execute (start)
_txc_state_proc(txc);
return 0;
OpSequencerRef osr;
boost::intrusive::list_member_hook<> sequencer_item;
+ uint64_t ops, bytes;
+
list<fsync_item> fds; ///< these fds need to be synced
set<OnodeRef> onodes; ///< these onodes need to be updated/written
KeyValueDB::Transaction t; ///< then we will commit this
TransContext(OpSequencer *o)
: state(STATE_PREPARE),
osr(o),
+ ops(0),
+ bytes(0),
oncommit(NULL),
onreadable(NULL),
onreadable_sync(NULL),
private:
NewStore *store;
wal_osr_queue_t wal_queue;
- uint64_t ops, bytes;
- Cond throttle_cond;
public:
WALWQ(NewStore *s, time_t ti, time_t sti, ThreadPool *tp)
: ThreadPool::WorkQueue<TransContext>("NewStore::WALWQ", ti, sti, tp),
- store(s),
- ops(0),
- bytes(0) {
+ store(s) {
}
bool _empty() {
return wal_queue.empty();
wal_queue.push_back(*i->osr);
}
i->osr->wal_q.push_back(*i);
- ++ops;
- bytes += i->wal_txn->get_bytes();
return true;
}
void _dequeue(TransContext *p) {
unlock();
drain();
}
-
- void throttle(uint64_t max_ops, uint64_t max_bytes) {
- Mutex& lock = get_lock();
- Mutex::Locker l(lock);
- while (ops > max_ops || bytes > max_bytes) {
- throttle_cond.Wait(lock);
- }
- }
-
- void release_throttle(TransContext *txc) {
- lock();
- --ops;
- bytes -= txc->wal_txn->get_bytes();
- throttle_cond.Signal();
- unlock();
- }
};
struct KVSyncThread : public Thread {
uint64_t nid_last;
uint64_t nid_max;
+ Throttle throttle_ops, throttle_bytes; ///< submit to commit
+ Throttle throttle_wal_ops, throttle_wal_bytes; ///< submit to wal complete
+
Mutex wal_lock;
atomic64_t wal_seq;
ThreadPool wal_tp;