dout(20) << __func__ << " committed " << kv_committing.size()
<< " in " << dur << dendl;
while (!kv_committing.empty()) {
- _txc_finish_kv(kv_committing.front());
+ TransContext *txc = kv_committing.front();
+ if (txc->state == TransContext::STATE_WAL_CLEANUP) {
+ txc->osr->qlock.Lock();
+ txc->state = TransContext::STATE_FINISHING;
+ txc->osr->qlock.Unlock();
+ _txc_finish_apply(txc);
+ } else if (txc->state == TransContext::STATE_KV_QUEUED) {
+ _txc_finish_kv(txc);
+ } else {
+ derr << __func__ << " unexpected txc state " << txc->get_state_name()
+ << dendl;
+ assert(0);
+ }
kv_committing.pop_front();
}
get_wal_key(wt.seq, &key);
KeyValueDB::Transaction cleanup = db->get_transaction();
cleanup->rmkey(PREFIX_WAL, key);
- db->submit_transaction_sync(cleanup);
txc->osr->qlock.Lock();
- txc->state = TransContext::STATE_FINISHING;
+ txc->state = TransContext::STATE_WAL_CLEANUP;
txc->osr->qlock.Unlock();
- _txc_finish_apply(txc);
+ Mutex::Locker l(kv_lock);
+ db->submit_transaction(cleanup);
+ kv_queue.push_back(txc);
+ kv_cond.SignalOne();
return 0;
}
STATE_KV_DONE,
STATE_WAL_QUEUED,
STATE_WAL_APPLYING,
+ STATE_WAL_CLEANUP, // remove wal kv record
STATE_WAL_DONE,
STATE_FINISHING,
STATE_DONE,
case STATE_KV_DONE: return "kv_done";
case STATE_WAL_QUEUED: return "wal_queued";
case STATE_WAL_APPLYING: return "wal_applying";
+ case STATE_WAL_CLEANUP: return "wal_cleanup";
case STATE_WAL_DONE: return "wal_done";
case STATE_FINISHING: return "finishing";
case STATE_DONE: return "done";