From c27df932299737d531d965b455dbb63ee3d55627 Mon Sep 17 00:00:00 2001
From: Sage Weil <sage@redhat.com>
Date: Tue, 11 Apr 2017 17:54:57 -0400
Subject: [PATCH] os/bluestore: restructure deferred writes

Explicitly aggregate deferred writes into a batch.  When we submit, take
the opportunity to coalesce contiguous writes.  Handle aio completion
independently from the original txcs.

Note that this paves the way for a few additional steps:

1- we could make deallocations cancel deferred writes.
2- we could drop the txc deferred states entirely and rely on the
explicit deferred write batch machinery instead... if we build an
alternative way to complete the SharedBlob writes and ensure the
lifecycle issues are dealt with.  (I'm not sure it would be worth it,
but it might be.)

Signed-off-by: Sage Weil <sage@redhat.com>
---
 src/os/bluestore/BlueStore.cc | 324 +++++++++++++++++++++++-----------
 src/os/bluestore/BlueStore.h  |  65 ++++---
 2 files changed, 264 insertions(+), 125 deletions(-)

diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc
index b08a7bcab8ed3..bf9a00b26a646 100644
--- a/src/os/bluestore/BlueStore.cc
+++ b/src/os/bluestore/BlueStore.cc
@@ -2887,8 +2887,110 @@ bool BlueStore::WriteContext::has_conflict(
   return false;
 }
 
- // =======================================================
-
+// =======================================================
+
+// DeferredBatch
+#undef dout_prefix
+#define dout_prefix *_dout << "bluestore.DeferredBatch(" << this << ") "
+
+void BlueStore::DeferredBatch::prepare_write(
+  CephContext *cct,
+  uint64_t seq, uint64_t offset, uint64_t length,
+  bufferlist::const_iterator& blp)
+{
+  _discard(cct, offset, length);
+  auto i = iomap.insert(make_pair(offset, deferred_io()));
+  assert(i.second);  // this should be a new insertion
+  i.first->second.seq = seq;
+  blp.copy(length, i.first->second.bl);
+  dout(20) << __func__ << " seq " << seq
+           << " 0x" << std::hex << offset << "~" << length
+           << " crc " << i.first->second.bl.crc32c(-1)
+           << std::dec << dendl;
+  seq_bytes[seq] += length;
+#ifdef DEBUG_DEFERRED
+  _audit(cct);
+#endif
+}
+
+void BlueStore::DeferredBatch::_discard(
+  CephContext *cct, uint64_t offset, uint64_t length)
+{
+  generic_dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
+                   << std::dec << dendl;
+  auto p = iomap.lower_bound(offset);
+  if (p != iomap.begin()) {
+    --p;
+    auto end = p->first + p->second.bl.length();
+    if (end > offset) {
+      bufferlist head;
+      head.substr_of(p->second.bl, 0, offset - p->first);
+      dout(20) << __func__ << " keep head " << p->second.seq
+               << " 0x" << std::hex << p->first << "~" << p->second.bl.length()
+               << " -> 0x" << head.length() << std::dec << dendl;
+      auto i = seq_bytes.find(p->second.seq);
+      if (end > offset + length) {
+        bufferlist tail;
+        tail.substr_of(p->second.bl, offset + length - p->first,
+                       end - (offset + length));
+        dout(20) << __func__ << " keep tail " << p->second.seq
+                 << " 0x" << std::hex << p->first << "~" << p->second.bl.length()
+                 << " -> 0x" << tail.length() << std::dec << dendl;
+        auto &n = iomap[offset + length];
+        n.bl.swap(tail);
+        n.seq = p->second.seq;
+        i->second -= length;
+      } else {
+        i->second -= end - offset;
+      }
+      p->second.bl.swap(head);
+    }
+    ++p;
+  }
+  while (p != iomap.end()) {
+    if (p->first >= offset + length) {
+      break;
+    }
+    auto i = seq_bytes.find(p->second.seq);
+    auto end = p->first + p->second.bl.length();
+    if (end > offset + length) {
+      unsigned drop_front = offset + length - p->first;
+      unsigned keep_tail = end - (offset + length);
+      dout(20) << __func__ << " truncate front " << p->second.seq
+               << " 0x" << std::hex << p->first << "~" << p->second.bl.length()
+               << " drop_front 0x" << drop_front << " keep_tail 0x" << keep_tail
+               << " to 0x" << (offset + length) << "~" << keep_tail
+               << std::dec << dendl;
+      auto &s = iomap[offset + length];
+      s.seq = p->second.seq;
+      s.bl.substr_of(p->second.bl, drop_front, keep_tail);
+      i->second -= drop_front;
+    } else {
+      dout(20) << __func__ << " drop " << p->second.seq
+               << " 0x" << std::hex << p->first << "~" << p->second.bl.length()
+               << std::dec << dendl;
+      i->second -= p->second.bl.length();
+    }
+    p = iomap.erase(p);
+  }
+}
+
+void BlueStore::DeferredBatch::_audit(CephContext *cct)
+{
+  map<uint64_t,int> sb;
+  for (auto p : seq_bytes) {
+    sb[p.first] = 0;  // make sure we have the same set of keys
+  }
+  uint64_t pos = 0;
+  for (auto& p : iomap) {
+    assert(p.first >= pos);
+    sb[p.second.seq] += p.second.bl.length();
+    pos = p.first + p.second.bl.length();
+  }
+  assert(sb == seq_bytes);
+}
+
+
 // Collection
 
 #undef dout_prefix
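The overwrite rule implemented by _discard() above is easiest to see in
isolation: a newly prepared write trims the head and tail of any
overlapping extent already buffered in the batch, so at most one buffer
ever covers a given byte.  Below is a minimal, self-contained sketch of
that rule; the names mirror the patch, but this is a simplified model
(std::string in place of bufferlist, seq accounting omitted), not the
committed code:

    // Simplified model of DeferredBatch::prepare_write()/_discard().
    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <string>

    std::map<uint64_t, std::string> iomap;  // offset -> buffered data

    void discard(uint64_t offset, uint64_t length) {
      auto p = iomap.lower_bound(offset);
      if (p != iomap.begin()) {
        --p;
        uint64_t end = p->first + p->second.size();
        if (end > offset) {
          // keep the head that precedes the new write
          std::string head = p->second.substr(0, offset - p->first);
          if (end > offset + length) {
            // old extent extends past the new write; keep its tail too
            iomap[offset + length] =
              p->second.substr(offset + length - p->first);
          }
          p->second = head;
        }
        ++p;
      }
      while (p != iomap.end() && p->first < offset + length) {
        uint64_t end = p->first + p->second.size();
        if (end > offset + length) {
          // truncate front: keep only the part past the new write
          iomap[offset + length] =
            p->second.substr(offset + length - p->first);
        }
        p = iomap.erase(p);  // drop the overlapped entry
      }
    }

    void prepare_write(uint64_t offset, const std::string& data) {
      discard(offset, data.size());
      iomap[offset] = data;
    }

    int main() {
      prepare_write(0, "aaaa");
      prepare_write(8, "bbbb");
      prepare_write(2, "cccccccc");  // clips the tail of the first extent
                                     // and the front of the second
      assert(iomap[0] == "aa" && iomap[2] == "cccccccc" && iomap[10] == "bb");
    }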
@@ -3165,7 +3267,11 @@ void *BlueStore::MempoolThread::entry()
 static void aio_cb(void *priv, void *priv2)
 {
   BlueStore *store = static_cast<BlueStore*>(priv);
-  store->_txc_aio_finish(priv2);
+  if ((unsigned long long)priv2 & 0x1ull) {
+    store->deferred_aio_finish((void*)((unsigned long long)priv2 & ~1ull));
+  } else {
+    store->txc_aio_finish(priv2);
+  }
 }
 
 BlueStore::BlueStore(CephContext *cct, const string& path)
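The tag bit tested above is set when the batch's IOContext is created
(see the DeferredBatch constructor in the BlueStore.h hunks below): the
OpSequencer pointer is stored with bit 0 set, which is safe because the
sequencer is at least word-aligned, so a real pointer never has that bit
set.  A tiny self-contained sketch of the trick, using hypothetical
stand-in types and uintptr_t where the patch uses unsigned long long:

    #include <cassert>
    #include <cstdint>

    struct Seq { int id; };  // stand-in for OpSequencer; alignof(Seq) >= 2

    void* tag(Seq* s)        { return (void*)((uintptr_t)s | 1u); }
    bool  is_tagged(void* p) { return ((uintptr_t)p & 1u) != 0; }
    Seq*  untag(void* p)     { return (Seq*)((uintptr_t)p & ~(uintptr_t)1); }

    int main() {
      Seq s{42};
      void* priv2 = tag(&s);
      assert(is_tagged(priv2));        // routed to deferred_aio_finish()
      assert(untag(priv2)->id == 42);  // the sequencer is recovered intact
      void* plain = &s;
      assert(!is_tagged(plain));       // routed to txc_aio_finish()
    }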
@@ -7416,11 +7522,6 @@ void BlueStore::_txc_state_proc(TransContext *txc)
       txc->state = TransContext::STATE_FINISHING;
       break;
 
-    case TransContext::STATE_DEFERRED_AIO_WAIT:
-      txc->log_state_latency(logger, l_bluestore_state_deferred_aio_wait_lat);
-      _deferred_finish(txc);
-      return;
-
     case TransContext::STATE_DEFERRED_CLEANUP:
       txc->log_state_latency(logger, l_bluestore_state_deferred_cleanup_lat);
       txc->state = TransContext::STATE_FINISHING;
@@ -7767,8 +7868,8 @@ void BlueStore::_osr_drain_preceding(TransContext *txc)
   {
     // submit anything pending
     std::lock_guard<std::mutex> l(deferred_lock);
-    if (!osr->deferred_pending.empty()) {
-      _deferred_try_submit(osr);
+    if (osr->deferred_pending) {
+      _deferred_submit(osr);
     }
   }
   {
@@ -7855,7 +7956,7 @@ void BlueStore::_kv_sync_thread()
       dout(20) << __func__ << " wake" << dendl;
     } else {
       deque<TransContext*> kv_submitting;
-      deque<TransContext*> deferred_done, deferred_stable;
+      deque<DeferredBatch*> deferred_done, deferred_stable;
       dout(20) << __func__ << " committing " << kv_queue.size()
                << " submitting " << kv_queue_unsubmitted.size()
                << " deferred done " << deferred_done_queue.size()
@@ -7889,7 +7990,7 @@
       if (num_aios) {
         force_flush = true;
       } else if (kv_committing.empty() && kv_submitting.empty() &&
-                 deferred_stable.empty()) {
+                 deferred_stable.empty()) {
         force_flush = true;  // there's nothing else to commit!
       } else if (deferred_aggressive) {
         force_flush = true;
@@ -7987,19 +8088,22 @@
       }
 
       // cleanup sync deferred keys
-      for (auto txc : deferred_stable) {
-        bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
-        if (!wt.released.empty()) {
-          // kraken replay compat only
-          txc->released = wt.released;
-          dout(10) << __func__ << " deferred txn has released " << txc->released
-                   << " (we just upgraded from kraken) on " << txc << dendl;
-          _txc_finalize_kv(txc, synct);
+      for (auto b : deferred_stable) {
+        for (auto& txc : b->txcs) {
+          bluestore_deferred_transaction_t& wt = *txc.deferred_txn;
+          if (!wt.released.empty()) {
+            // kraken replay compat only
+            txc.released = wt.released;
+            dout(10) << __func__ << " deferred txn has released "
+                     << txc.released
+                     << " (we just upgraded from kraken) on " << &txc << dendl;
+            _txc_finalize_kv(&txc, synct);
+          }
+          // cleanup the deferred
+          string key;
+          get_deferred_key(wt.seq, &key);
+          synct->rm_single_key(PREFIX_DEFERRED, key);
         }
-        // cleanup the deferred
-        string key;
-        get_deferred_key(wt.seq, &key);
-        synct->rm_single_key(PREFIX_DEFERRED, key);
       }
 
       // submit synct synchronously (block and wait for it to commit)
@@ -8035,10 +8139,14 @@
         _txc_state_proc(txc);
         kv_committing.pop_front();
       }
-      while (!deferred_stable.empty()) {
-        TransContext *txc = deferred_stable.front();
-        _txc_state_proc(txc);
-        deferred_stable.pop_front();
+      for (auto b : deferred_stable) {
+        auto p = b->txcs.begin();
+        while (p != b->txcs.end()) {
+          TransContext *txc = &*p;
+          p = b->txcs.erase(p);  // unlink here because
+          _txc_state_proc(txc);  // this may destroy txc
+        }
+        delete b;
       }
 
       if (!deferred_aggressive) {
@@ -8088,17 +8196,30 @@ bluestore_deferred_op_t *BlueStore::_get_deferred_op(
 
 void BlueStore::_deferred_queue(TransContext *txc)
 {
-  dout(20) << __func__ << " txc " << txc << " on " << txc->osr << dendl;
+  dout(20) << __func__ << " txc " << txc << " osr " << txc->osr << dendl;
   std::lock_guard<std::mutex> l(deferred_lock);
-  if (txc->osr->deferred_pending.empty() &&
-      txc->osr->deferred_running.empty()) {
+  if (!txc->osr->deferred_pending &&
+      !txc->osr->deferred_running) {
     deferred_queue.push_back(*txc->osr);
   }
-  txc->osr->deferred_pending.push_back(*txc);
+  if (!txc->osr->deferred_pending) {
+    txc->osr->deferred_pending = new DeferredBatch(cct, txc->osr.get());
+  }
   ++deferred_queue_size;
+  txc->osr->deferred_pending->txcs.push_back(*txc);
+  bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
+  for (auto opi = wt.ops.begin(); opi != wt.ops.end(); ++opi) {
+    const auto& op = *opi;
+    assert(op.op == bluestore_deferred_op_t::OP_WRITE);
+    bufferlist::const_iterator p = op.data.begin();
+    for (auto e : op.extents) {
+      txc->osr->deferred_pending->prepare_write(
+        cct, wt.seq, e.offset, e.length, p);
+    }
+  }
   if (deferred_aggressive &&
-      txc->osr->deferred_running.empty()) {
-    _deferred_try_submit(txc->osr.get());
+      !txc->osr->deferred_running) {
+    _deferred_submit(txc->osr.get());
   }
 }
@@ -8107,106 +8228,99 @@ void BlueStore::_deferred_try_submit()
   dout(20) << __func__ << " " << deferred_queue.size() << " osrs, "
            << deferred_queue_size << " txcs" << dendl;
   for (auto& osr : deferred_queue) {
-    if (osr.deferred_running.empty()) {
-      _deferred_try_submit(&osr);
+    if (!osr.deferred_running) {
+      _deferred_submit(&osr);
     }
   }
 }
 
-void BlueStore::_deferred_try_submit(OpSequencer *osr)
+void BlueStore::_deferred_submit(OpSequencer *osr)
 {
-  dout(10) << __func__ << " osr " << osr << " " << osr->deferred_pending.size()
-           << " pending " << dendl;
-  assert(!osr->deferred_pending.empty());
-  assert(osr->deferred_running.empty());
+  dout(10) << __func__ << " osr " << osr
+           << " " << osr->deferred_pending->iomap.size() << " ios pending "
+           << dendl;
+  assert(osr->deferred_pending);
+  assert(!osr->deferred_running);
 
-  deferred_queue_size -= osr->deferred_pending.size();
+  auto b = osr->deferred_pending;
+  deferred_queue_size -= b->seq_bytes.size();
   assert(deferred_queue_size >= 0);
-  osr->deferred_running.swap(osr->deferred_pending);
-  // attach all IO to the last in the batch
-  TransContext *last = &osr->deferred_running.back();
+  osr->deferred_running = osr->deferred_pending;
+  osr->deferred_pending = nullptr;
 
-  // reverse order
-  for (auto i = osr->deferred_running.rbegin();
-       i != osr->deferred_running.rend();
-       ++i) {
-    TransContext *txc = &*i;
-    bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
-    dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl;
-    txc->log_state_latency(logger, l_bluestore_state_deferred_queued_lat);
-    txc->state = TransContext::STATE_DEFERRED_AIO_WAIT;
-    for (auto opi = wt.ops.rbegin(); opi != wt.ops.rend(); ++opi) {
-      assert(opi->op == bluestore_deferred_op_t::OP_WRITE);
-      const auto& op = *opi;
-      uint64_t bl_offset = 0;
-      for (auto e : op.extents) {
-        interval_set<uint64_t> cur;
-        cur.insert(e.offset, e.length);
-        interval_set<uint64_t> overlap;
-        overlap.intersection_of(cur, osr->deferred_blocks);
-        cur.subtract(overlap);
-        dout(20) << __func__ << " txc " << txc << " " << e << std::hex
-                 << " overlap 0x" << overlap << " new 0x" << cur
-                 << " from bl_offset 0x" << bl_offset << std::dec << dendl;
-        for (auto j = cur.begin(); j != cur.end(); ++j) {
-          bufferlist bl;
-          bl.substr_of(op.data, bl_offset + j.get_start() - e.offset,
-                       j.get_len());
-          if (!g_conf->bluestore_debug_omit_block_device_write) {
-            logger->inc(l_bluestore_deferred_write_ops);
-            logger->inc(l_bluestore_deferred_write_bytes, bl.length());
-            int r = bdev->aio_write(j.get_start(), bl, &last->ioc, false);
-            assert(r == 0);
-          }
-          txc->osr->deferred_blocks.insert(j.get_start(), j.get_len());
+  uint64_t start = 0, pos = 0;
+  bufferlist bl;
+  auto i = b->iomap.begin();
+  while (true) {
+    if (i == b->iomap.end() || i->first != pos) {
+      if (bl.length()) {
+        dout(20) << __func__ << " write 0x" << std::hex
+                 << start << "~" << bl.length()
+                 << " crc " << bl.crc32c(-1) << std::dec << dendl;
+        if (!g_conf->bluestore_debug_omit_block_device_write) {
+          logger->inc(l_bluestore_deferred_write_ops);
+          logger->inc(l_bluestore_deferred_write_bytes, bl.length());
+          int r = bdev->aio_write(start, bl, &b->ioc, false);
+          assert(r == 0);
         }
-        bl_offset += e.length;
       }
+      if (i == b->iomap.end()) {
+        break;
+      }
+      start = 0;
+      pos = i->first;
+      bl.clear();
+    }
+    dout(20) << __func__ << " seq " << i->second.seq << " 0x"
+             << std::hex << pos << "~" << i->second.bl.length() << std::dec
+             << dendl;
+    if (!bl.length()) {
+      start = pos;
     }
+    pos += i->second.bl.length();
+    bl.claim_append(i->second.bl);
+    ++i;
   }
-  osr->deferred_txc = last;
-  dout(20) << __func__ << " osr " << osr << " deferred_blocks 0x" << std::hex
-           << osr->deferred_blocks << std::dec << dendl;
-  _txc_aio_submit(last);
+  bdev->aio_submit(&b->ioc);
 }
 
-int BlueStore::_deferred_finish(TransContext *txc)
+void BlueStore::_deferred_aio_finish(OpSequencer *osr)
 {
-  bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
-  dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl;
+  dout(10) << __func__ << " osr " << osr << dendl;
+  assert(osr->deferred_running);
+
+  DeferredBatch *b = osr->deferred_running;
+  {
+    std::lock_guard<std::mutex> l2(osr->qlock);
+    std::lock_guard<std::mutex> l(kv_lock);
+    for (auto& i : b->txcs) {
+      TransContext *txc = &i;
+      txc->state = TransContext::STATE_DEFERRED_CLEANUP;
+      txc->osr->qcond.notify_all();
+      throttle_deferred_bytes.put(txc->cost);
+    }
+    deferred_done_queue.emplace_back(b);
+  }
 
-  OpSequencer::deferred_queue_t finished;
   {
     std::lock_guard<std::mutex> l(deferred_lock);
-    assert(txc->osr->deferred_txc == txc);
-    txc->osr->deferred_blocks.clear();
-    finished.swap(txc->osr->deferred_running);
-    if (txc->osr->deferred_pending.empty()) {
-      auto q = deferred_queue.iterator_to(*txc->osr);
+    assert(osr->deferred_running == b);
+    osr->deferred_running = nullptr;
+    if (!osr->deferred_pending) {
+      auto q = deferred_queue.iterator_to(*osr);
       deferred_queue.erase(q);
     } else if (deferred_aggressive) {
-      _deferred_try_submit(txc->osr.get());
+      _deferred_submit(osr);
    }
  }
 
-  std::lock_guard<std::mutex> l2(txc->osr->qlock);
-  std::lock_guard<std::mutex> l(kv_lock);
-  for (auto& i : finished) {
-    TransContext *txc = &i;
-    txc->state = TransContext::STATE_DEFERRED_CLEANUP;
-    txc->osr->qcond.notify_all();
-    throttle_deferred_bytes.put(txc->cost);
-    deferred_done_queue.push_back(txc);
-  }
-  finished.clear();
-
   // in the normal case, do not bother waking up the kv thread; it will
   // catch us on the next commit anyway.
   if (deferred_aggressive) {
+    std::lock_guard<std::mutex> l(kv_lock);
     kv_cond.notify_one();
   }
-  return 0;
 }
 
 int BlueStore::_deferred_replay()
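The loop structure of _deferred_submit() above -- flush the current run
when the next entry is not contiguous, then start a new run -- is worth
seeing without the aio plumbing.  A standalone model using std types,
where submit_run() is a hypothetical stand-in for the bdev->aio_write()
call:

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>

    std::map<uint64_t, std::string> iomap = {
      {0, "aaaa"}, {4, "bbbb"},  // contiguous: coalesced into one write 0x0~8
      {16, "cccc"},              // gap at 8: second write 0x10~4
    };

    void submit_run(uint64_t start, const std::string& bl) {
      std::cout << "aio_write 0x" << std::hex << start
                << "~" << bl.size() << std::dec << "\n";
    }

    int main() {
      uint64_t start = 0, pos = 0;
      std::string bl;
      auto i = iomap.begin();
      while (true) {
        if (i == iomap.end() || i->first != pos) {
          if (!bl.empty()) {
            submit_run(start, bl);  // flush the finished contiguous run
          }
          if (i == iomap.end()) {
            break;
          }
          pos = i->first;           // begin a new run at the next extent
          bl.clear();
        }
        if (bl.empty()) {
          start = pos;
        }
        pos += i->second.size();    // extend the run
        bl += i->second;
        ++i;
      }
    }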
diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h
index f2a32b8b2df84..af4b519fc72e3 100644
--- a/src/os/bluestore/BlueStore.h
+++ b/src/os/bluestore/BlueStore.h
@@ -47,6 +47,7 @@ class FreelistManager;
 class BlueFS;
 
 //#define DEBUG_CACHE
+//#define DEBUG_DEFERRED
 
 enum {
   l_bluestore_first = 732430,
@@ -1377,8 +1378,7 @@ public:
     STATE_KV_QUEUED,     // queued for kv_sync_thread submission
     STATE_KV_SUBMITTED,  // submitted to kv; not yet synced
     STATE_KV_DONE,
-    STATE_DEFERRED_QUEUED,    // in deferred_queue
-    STATE_DEFERRED_AIO_WAIT,  // aio in flight, waiting for completion
+    STATE_DEFERRED_QUEUED,    // in deferred_queue (pending or running)
     STATE_DEFERRED_CLEANUP,   // remove deferred kv record
     STATE_DEFERRED_DONE,
     STATE_FINISHING,
@@ -1396,7 +1396,6 @@ public:
       case STATE_KV_SUBMITTED: return "kv_submitted";
       case STATE_KV_DONE: return "kv_done";
       case STATE_DEFERRED_QUEUED: return "deferred_queued";
-      case STATE_DEFERRED_AIO_WAIT: return "deferred_aio_wait";
       case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
       case STATE_DEFERRED_DONE: return "deferred_done";
       case STATE_FINISHING: return "finishing";
@@ -1415,7 +1414,6 @@ public:
       case l_bluestore_state_kv_committing_lat: return "kv_committing";
       case l_bluestore_state_kv_done_lat: return "kv_done";
       case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
-      case l_bluestore_state_deferred_aio_wait_lat: return "deferred_aio_wait";
       case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
       case l_bluestore_state_finishing_lat: return "finishing";
       case l_bluestore_state_done_lat: return "done";
@@ -1549,6 +1547,36 @@ public:
     }
   };
 
+  typedef boost::intrusive::list<
+    TransContext,
+    boost::intrusive::member_hook<
+      TransContext,
+      boost::intrusive::list_member_hook<>,
+      &TransContext::deferred_queue_item> > deferred_queue_t;
+
+  struct DeferredBatch {
+    struct deferred_io {
+      bufferlist bl;  ///< data
+      uint64_t seq;   ///< deferred transaction seq
+    };
+    map<uint64_t,deferred_io> iomap;  ///< map of ios in this batch
+    deferred_queue_t txcs;            ///< txcs in this batch
+    IOContext ioc;                    ///< our aios
+    /// bytes of pending io for each deferred seq (may be 0)
+    map<uint64_t,int> seq_bytes;
+
+    void _discard(CephContext *cct, uint64_t offset, uint64_t length);
+    void _audit(CephContext *cct);
+
+    DeferredBatch(CephContext *cct, OpSequencer *osr)
+      : ioc(cct, (void*)((unsigned long long)osr | 1ull)) {}
+
+    /// prepare a write
+    void prepare_write(CephContext *cct,
+                       uint64_t seq, uint64_t offset, uint64_t length,
+                       bufferlist::const_iterator& p);
+  };
+
   class OpSequencer : public Sequencer_impl {
   public:
     std::mutex qlock;
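deferred_queue_t above is a boost::intrusive list: the link node lives
inside the TransContext itself (the deferred_queue_item member hook), so
putting a txc on a batch costs no allocation, and erase() merely unlinks
the element without touching its lifetime.  That is what the
_kv_sync_thread hunk earlier relies on when it unlinks each txc before
calling _txc_state_proc(), which may destroy it.  A small standalone
example with a hypothetical Txc type:

    #include <boost/intrusive/list.hpp>
    #include <cassert>

    struct Txc {
      boost::intrusive::list_member_hook<> deferred_queue_item;
      int id = 0;
    };

    typedef boost::intrusive::list<
      Txc,
      boost::intrusive::member_hook<
        Txc,
        boost::intrusive::list_member_hook<>,
        &Txc::deferred_queue_item> > queue_t;

    int main() {
      Txc a, b;
      a.id = 1;
      b.id = 2;
      queue_t q;
      q.push_back(a);    // no allocation: links the hook embedded in 'a'
      q.push_back(b);
      auto p = q.begin();
      p = q.erase(p);    // unlink 'a'; the object itself is untouched
      assert(a.id == 1); // still valid after unlinking
      assert(p->id == 2 && q.size() == 1);
    }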
@@ -1561,19 +1589,11 @@ public:
         &TransContext::sequencer_item> > q_list_t;
     q_list_t q;  ///< transactions
 
-    typedef boost::intrusive::list<
-      TransContext,
-      boost::intrusive::member_hook<
-        TransContext,
-        boost::intrusive::list_member_hook<>,
-        &TransContext::deferred_queue_item> > deferred_queue_t;
-    deferred_queue_t deferred_pending;       ///< waiting
-    deferred_queue_t deferred_running;       ///< in flight ios
-    interval_set<uint64_t> deferred_blocks;  ///< blocks in flight
-    TransContext *deferred_txc;              ///< txc carrying this batch
-
     boost::intrusive::list_member_hook<> deferred_osr_queue_item;
 
+    DeferredBatch *deferred_running = nullptr;
+    DeferredBatch *deferred_pending = nullptr;
+
     Sequencer *parent;
     BlueStore *store;
@@ -1773,8 +1793,8 @@ private:
   deque<TransContext*> kv_queue;              ///< ready, already submitted
   deque<TransContext*> kv_queue_unsubmitted;  ///< ready, need submit by kv thread
   deque<TransContext*> kv_committing;         ///< currently syncing
-  deque<TransContext*> deferred_done_queue;    ///< deferred ios done
-  deque<TransContext*> deferred_stable_queue;  ///< deferred ios done + stable
+  deque<DeferredBatch*> deferred_done_queue;   ///< deferred ios done
+  deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
 
   PerfCounters *logger = nullptr;
@@ -1916,7 +1936,7 @@ private:
   void _txc_state_proc(TransContext *txc);
   void _txc_aio_submit(TransContext *txc);
 public:
-  void _txc_aio_finish(void *p) {
+  void txc_aio_finish(void *p) {
     _txc_state_proc(static_cast<TransContext*>(p));
   }
 private:
@@ -1947,14 +1967,19 @@ private:
   }
   bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
 
+public:
+  void deferred_aio_finish(void *priv) {
+    _deferred_aio_finish(static_cast<OpSequencer*>(priv));
+  }
+private:
   void _deferred_queue(TransContext *txc);
   void deferred_try_submit() {
     std::lock_guard<std::mutex> l(deferred_lock);
     _deferred_try_submit();
   }
   void _deferred_try_submit();
-  void _deferred_try_submit(OpSequencer *osr);
-  int _deferred_finish(TransContext *txc);
+  void _deferred_submit(OpSequencer *osr);
+  void _deferred_aio_finish(OpSequencer *osr);
   int _deferred_replay();
 
 public:
-- 
2.39.5
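Postscript on the done/stable distinction used throughout: a batch whose
aios have completed is only "done"; its writes may still sit in the
device cache.  Only after the next device flush in _kv_sync_thread does
it become "stable", at which point the PREFIX_DEFERRED replay records
are safe to delete.  A rough single-threaded model of that promotion,
with hypothetical names (the real loop runs under kv_lock and flushes
via the block device):

    #include <deque>
    #include <iostream>
    #include <string>

    struct Batch { std::string name; };

    std::deque<Batch*> deferred_done, deferred_stable;

    void flush_device() {
      // stand-in for the device flush; once it returns, every completed
      // deferred write is on stable media, so done batches become stable
      while (!deferred_done.empty()) {
        deferred_stable.push_back(deferred_done.front());
        deferred_done.pop_front();
      }
    }

    int main() {
      Batch b{"batch1"};
      deferred_done.push_back(&b);  // what _deferred_aio_finish() does
      flush_device();
      for (auto* p : deferred_stable) {
        std::cout << "rm deferred key for " << p->name << "\n";  // cleanup
      }
    }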