} else {
_txc_finalize_kv(txc, txc->t);
txc->state = TransContext::STATE_KV_SUBMITTED;
+ if (txc->osr->kv_submitted_waiters) {
+ txc->osr->qcond.notify_all();
+ }
int r = db->submit_transaction(txc->t);
assert(r == 0);
_txc_applied_kv(txc);
_txc_applied_kv(txc);
--txc->osr->kv_committing_serially;
txc->state = TransContext::STATE_KV_SUBMITTED;
+ if (txc->osr->kv_submitted_waiters) {
+ std::lock_guard<std::mutex> l(txc->osr->qlock);
+ txc->osr->qcond.notify_all();
+ }
}
for (auto txc : kv_committing) {
if (txc->had_ios) {
std::atomic_int kv_committing_serially = {0};
+ std::atomic_int kv_submitted_waiters = {0};
+
OpSequencer(CephContext* cct)
//set the qlock to PTHREAD_MUTEX_RECURSIVE mode
: Sequencer_impl(cct),
q.push_back(*txc);
}
- void flush() override {
+ void drain() {
std::unique_lock<std::mutex> l(qlock);
while (!q.empty())
qcond.wait(l);
}
- void drain() {
+ void flush() override {
std::unique_lock<std::mutex> l(qlock);
- while (!q.empty())
+ while (true) {
+ if (q.empty()) {
+ return;
+ }
+ TransContext *txc = &q.back();
+ if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
+ return;
+ }
+ ++kv_submitted_waiters;
qcond.wait(l);
+ --kv_submitted_waiters;
+ }
}
bool flush_commit(Context *c) override {