kv_lock.Lock();
while (true) {
assert(kv_committing.empty());
- if (kv_queue.empty()) {
+ assert(wal_cleaning.empty());
+ if (kv_queue.empty() && wal_cleanup_queue.empty()) {
if (kv_stop)
break;
dout(20) << __func__ << " sleep" << dendl;
kv_cond.Wait(kv_lock);
dout(20) << __func__ << " wake" << dendl;
} else {
- dout(20) << __func__ << " committing " << kv_queue.size() << dendl;
+ dout(20) << __func__ << " committing " << kv_queue.size() << " cleaning " << wal_cleanup_queue.size() << dendl;
kv_committing.swap(kv_queue);
+ wal_cleaning.swap(wal_cleanup_queue);
utime_t start = ceph_clock_now(NULL);
kv_lock.Unlock();
- db->submit_transaction_sync(db->get_transaction());
+ KeyValueDB::Transaction txc_cleanup_sync = db->get_transaction();
+ //adding wal cleanup op
+ for (std::deque<TransContext *>::iterator it = wal_cleaning.begin();
+ it != wal_cleaning.end();
+ it++) {
+ wal_transaction_t& wt =*(*it)->wal_txn;
+ string key;
+ get_wal_key(wt.seq, &key);
+ txc_cleanup_sync->rmkey(PREFIX_WAL, key);
+ }
+
+ db->submit_transaction_sync(txc_cleanup_sync);
utime_t finish = ceph_clock_now(NULL);
utime_t dur = finish - start;
- dout(20) << __func__ << " committed " << kv_committing.size()
+ dout(20) << __func__ << " committed " << kv_committing.size() << "cleaned " << wal_cleaning.size()
<< " in " << dur << dendl;
while (!kv_committing.empty()) {
TransContext *txc = kv_committing.front();
_txc_state_proc(txc);
kv_committing.pop_front();
}
+ while (!wal_cleaning.empty()) {
+ TransContext *txc = wal_cleaning.front();
+ _txc_state_proc(txc);
+ wal_cleaning.pop_front();
+ }
// this is as good a place as any ...
_reap_collections();
wal_transaction_t& wt = *txc->wal_txn;
dout(20) << __func__ << " txc " << " seq " << wt.seq << txc << dendl;
- string key;
- get_wal_key(wt.seq, &key);
- KeyValueDB::Transaction cleanup = db->get_transaction();
- cleanup->rmkey(PREFIX_WAL, key);
-
- txc->state = TransContext::STATE_WAL_CLEANUP;
-
Mutex::Locker l(kv_lock);
- db->submit_transaction(cleanup);
- kv_queue.push_back(txc);
+ txc->state = TransContext::STATE_WAL_CLEANUP;
+ wal_cleanup_queue.push_back(txc);
kv_cond.SignalOne();
return 0;
}