} else {
_txc_finalize_kv(txc, txc->t);
txc->state = TransContext::STATE_KV_SUBMITTED;
- if (txc->osr->kv_submitted_waiters) {
- txc->osr->qcond.notify_all();
- }
int r = db->submit_transaction(txc->t);
assert(r == 0);
_txc_applied_kv(txc);
_txc_state_proc(&*p++);
} while (p != osr->q.end() &&
p->state == TransContext::STATE_IO_DONE);
+
+ if (osr->kv_submitted_waiters &&
+ osr->_is_all_kv_submitted()) {
+ osr->qcond.notify_all();
+ }
}
void BlueStore::_txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t)
txc->state = TransContext::STATE_KV_SUBMITTED;
if (txc->osr->kv_submitted_waiters) {
std::lock_guard<std::mutex> l(txc->osr->qlock);
- txc->osr->qcond.notify_all();
+ if (txc->osr->_is_all_kv_submitted()) {
+ txc->osr->qcond.notify_all();
+ }
}
}
if (num_aios) {
qcond.wait(l);
}
+ bool _is_all_kv_submitted() {
+ // caller must hold qlock
+ if (q.empty()) {
+ return true;
+ }
+ TransContext *txc = &q.back();
+ if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
+ return true;
+ }
+ return false;
+ }
+
void flush() override {
std::unique_lock<std::mutex> l(qlock);
while (true) {
- if (q.empty()) {
- return;
- }
- TransContext *txc = &q.back();
- if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
+ if (_is_all_kv_submitted()) {
return;
}
++kv_submitted_waiters;