bluestore_wal_transaction_t& wt = *txc->wal_txn;
dout(20) << __func__ << " txc " << " seq " << wt.seq << txc << dendl;
+ std::lock_guard<std::mutex> l2(txc->osr->qlock);
std::lock_guard<std::mutex> l(kv_lock);
txc->state = TransContext::STATE_WAL_CLEANUP;
+ txc->osr->qcond.notify_all();
wal_cleanup_queue.push_back(txc);
kv_cond.notify_one();
return 0;
(offset / block_size == (o->onode.size - 1) / block_size)) {
dout(20) << __func__ << " using cached tail" << dendl;
assert((offset & block_mask) == (o->onode.size & block_mask));
+ // wait for any related wal writes to commit
+ txc->osr->wait_for_wal_on_seq(o->tail_txc_seq);
uint64_t tail_off = offset % block_size;
if (tail_off >= o->tail_bl.length()) {
bufferlist t;
txc->oncommits.push_back(c);
return false;
}
+
+ /// If a txc with sequence @seq is still queued on this sequencer and
+ /// has a pending wal (write-ahead log) transaction, block until that
+ /// wal has been applied (txc->state >= STATE_WAL_CLEANUP).  Returns
+ /// immediately when no matching txc (or no wal on it) is found.
+ /// Caller must NOT already hold qlock.
+ void wait_for_wal_on_seq(uint64_t seq) {
+ std::unique_lock<std::mutex> l(qlock);
+ restart:
+ // Scan newest -> oldest; the early break below assumes q is kept in
+ // ascending seq order, so once we pass below @seq it cannot appear.
+ for (OpSequencer::q_list_t::reverse_iterator p = q.rbegin();
+ p != q.rend();
+ ++p) {
+ if (p->seq == seq) {
+ TransContext *txc = &(*p);
+ if (txc->wal_txn) {
+ while (txc->state < TransContext::STATE_WAL_CLEANUP) {
+ // qcond.wait() releases qlock while sleeping, so the queue —
+ // and txc itself — may be mutated or freed before we wake.
+ // Rescan from scratch rather than touch possibly-stale state.
+ txc->osr->qcond.wait(l);
+ goto restart; // txc may have gone away
+ }
+ }
+ break;
+ }
+ if (p->seq < seq)
+ break;
+ }
+ }
};
class WALWQ : public ThreadPool::WorkQueue<TransContext> {