From: Sage Weil Date: Thu, 9 Mar 2017 02:53:22 +0000 (-0500) Subject: os/bluestore: restructure deferred write queue X-Git-Tag: v12.0.1~12^2~24 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6db031be4d5094a1fc68ee1081b789dd97cc8237;p=ceph.git os/bluestore: restructure deferred write queue First, eliminate the work queue--it's useless. We are dispatching aio and should not block. And if a single thread isn't sufficient to do it, it probably means we should be parallelizing kv_sync_thread too (which is our only caller that matters). Repurpose the old osr-list -> txc-list-per-osr queue structure to manage the queuing. For any given osr, dispatch one batch of aios at a time, taking care to collapse any overwrites so that the latest write wins. Signed-off-by: Sage Weil --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 56c099d5ea83..27f8be5bd51b 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -1104,10 +1104,6 @@ OPTION(bluestore_fsck_on_umount_deep, OPT_BOOL, true) OPTION(bluestore_fsck_on_mkfs, OPT_BOOL, true) OPTION(bluestore_fsck_on_mkfs_deep, OPT_BOOL, false) OPTION(bluestore_sync_submit_transaction, OPT_BOOL, false) // submit kv txn in queueing thread (not kv_sync_thread) -OPTION(bluestore_sync_deferred_apply, OPT_BOOL, true) // perform initial wal work synchronously (possibly in combination with aio so we only *queue* ios) -OPTION(bluestore_deferred_threads, OPT_INT, 4) -OPTION(bluestore_deferred_thread_timeout, OPT_INT, 30) -OPTION(bluestore_deferred_thread_suicide_timeout, OPT_INT, 120) OPTION(bluestore_max_ops, OPT_U64, 512) OPTION(bluestore_max_bytes, OPT_U64, 64*1024*1024) OPTION(bluestore_deferred_max_ops, OPT_U64, 512) @@ -1126,7 +1122,6 @@ OPTION(bluestore_debug_prefragment_max, OPT_INT, 1048576) OPTION(bluestore_debug_inject_read_err, OPT_BOOL, false) OPTION(bluestore_debug_randomize_serial_transaction, OPT_INT, 0) OPTION(bluestore_debug_omit_block_device_write, OPT_BOOL, false) -OPTION(bluestore_inject_deferred_apply_delay, OPT_FLOAT, 0) OPTION(bluestore_shard_finishers, OPT_BOOL, false) OPTION(kstore_max_ops, OPT_U64, 512) diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc index 9d466954a24a..bf1d4cae0393 100644 --- a/src/os/bluestore/BlueStore.cc +++ b/src/os/bluestore/BlueStore.cc @@ -3135,22 +3135,12 @@ BlueStore::BlueStore(CephContext *cct, const string& path) throttle_deferred_bytes(cct, "bluestore_deferred_max_bytes", cct->_conf->bluestore_max_bytes + cct->_conf->bluestore_deferred_max_bytes), - deferred_tp(cct, - "BlueStore::deferred_tp", - "tp_deferred", - cct->_conf->bluestore_sync_deferred_apply ? 0 : cct->_conf->bluestore_deferred_threads, - "bluestore_deferred_threads"), - deferred_wq(this, - cct->_conf->bluestore_deferred_thread_timeout, - cct->_conf->bluestore_deferred_thread_suicide_timeout, - &deferred_tp), m_finisher_num(1), kv_sync_thread(this), kv_stop(false), logger(NULL), debug_read_error_lock("BlueStore::debug_read_error_lock"), csum_type(Checksummer::CSUM_CRC32C), - sync_deferred_apply(cct->_conf->bluestore_sync_deferred_apply), mempool_thread(this) { _init_logger(); @@ -3191,15 +3181,6 @@ BlueStore::BlueStore(CephContext *cct, throttle_deferred_bytes(cct, "bluestore_deferred_max_bytes", cct->_conf->bluestore_max_bytes + cct->_conf->bluestore_deferred_max_bytes), - deferred_tp(cct, - "BlueStore::deferred_tp", - "tp_deferred", - cct->_conf->bluestore_sync_deferred_apply ? 0 : cct->_conf->bluestore_deferred_threads, - "bluestore_deferred_threads"), - deferred_wq(this, - cct->_conf->bluestore_deferred_thread_timeout, - cct->_conf->bluestore_deferred_thread_suicide_timeout, - &deferred_tp), m_finisher_num(1), kv_sync_thread(this), kv_stop(false), @@ -3208,7 +3189,6 @@ BlueStore::BlueStore(CephContext *cct, csum_type(Checksummer::CSUM_CRC32C), min_alloc_size(_min_alloc_size), min_alloc_size_order(ctz(_min_alloc_size)), - sync_deferred_apply(cct->_conf->bluestore_sync_deferred_apply), mempool_thread(this) { _init_logger(); @@ -3345,8 +3325,6 @@ void BlueStore::_init_logger() "Average kv_done state latency"); b.add_time_avg(l_bluestore_state_deferred_queued_lat, "state_deferred_queued_lat", "Average deferred_queued state latency"); - b.add_time_avg(l_bluestore_state_deferred_applying_lat, "state_deferred_applying_lat", - "Average deferred_applying state latency"); b.add_time_avg(l_bluestore_state_deferred_aio_wait_lat, "state_deferred_aio_wait_lat", "Average aio_wait state latency"); b.add_time_avg(l_bluestore_state_deferred_cleanup_lat, "state_deferred_cleanup_lat", @@ -4769,7 +4747,6 @@ int BlueStore::mount() for (auto f : finishers) { f->start(); } - deferred_tp.start(); kv_sync_thread.create("bstore_kv_sync"); r = _deferred_replay(); @@ -4786,8 +4763,6 @@ int BlueStore::mount() out_stop: _kv_stop(); - deferred_wq.drain(); - deferred_tp.stop(); for (auto f : finishers) { f->wait_for_empty(); f->stop(); @@ -4822,10 +4797,6 @@ int BlueStore::umount() dout(20) << __func__ << " stopping kv thread" << dendl; _kv_stop(); - dout(20) << __func__ << " draining deferred_wq" << dendl; - deferred_wq.drain(); - dout(20) << __func__ << " stopping deferred_tp" << dendl; - deferred_tp.stop(); for (auto f : finishers) { dout(20) << __func__ << " draining finisher" << dendl; f->wait_for_empty(); @@ -7152,25 +7123,12 @@ void BlueStore::_txc_state_proc(TransContext *txc) txc->log_state_latency(logger, l_bluestore_state_kv_done_lat); if (txc->deferred_txn) { txc->state = TransContext::STATE_DEFERRED_QUEUED; - if (sync_deferred_apply) { - _deferred_apply(txc); - } else { - deferred_wq.queue(txc); - } + _deferred_queue(txc); return; } txc->state = TransContext::STATE_FINISHING; break; - case TransContext::STATE_DEFERRED_APPLYING: - txc->log_state_latency(logger, l_bluestore_state_deferred_applying_lat); - if (txc->ioc.has_pending_aios()) { - txc->state = TransContext::STATE_DEFERRED_AIO_WAIT; - _txc_aio_submit(txc); - return; - } - // ** fall-thru ** - case TransContext::STATE_DEFERRED_AIO_WAIT: txc->log_state_latency(logger, l_bluestore_state_deferred_aio_wait_lat); _deferred_finish(txc); @@ -7691,7 +7649,8 @@ void BlueStore::_kv_sync_thread() dout(10) << __func__ << " finish" << dendl; } -bluestore_deferred_op_t *BlueStore::_get_deferred_op(TransContext *txc, OnodeRef o) +bluestore_deferred_op_t *BlueStore::_get_deferred_op( + TransContext *txc, OnodeRef o) { if (!txc->deferred_txn) { txc->deferred_txn = new bluestore_deferred_transaction_t; @@ -7700,46 +7659,111 @@ bluestore_deferred_op_t *BlueStore::_get_deferred_op(TransContext *txc, OnodeRef return &txc->deferred_txn->ops.back(); } -int BlueStore::_deferred_apply(TransContext *txc) +void BlueStore::_deferred_queue(TransContext *txc) { - bluestore_deferred_transaction_t& wt = *txc->deferred_txn; - dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl; - txc->log_state_latency(logger, l_bluestore_state_deferred_queued_lat); - txc->state = TransContext::STATE_DEFERRED_APPLYING; - - if (cct->_conf->bluestore_inject_deferred_apply_delay) { - dout(20) << __func__ << " bluestore_inject_deferred_apply_delay " - << cct->_conf->bluestore_inject_deferred_apply_delay - << dendl; - utime_t t; - t.set_from_double(cct->_conf->bluestore_inject_deferred_apply_delay); - t.sleep(); - dout(20) << __func__ << " finished sleep" << dendl; + dout(20) << __func__ << " txc " << txc << " on " << txc->osr << dendl; + std::lock_guard l(deferred_lock); + if (txc->osr->deferred_pending.empty() && + txc->osr->deferred_running.empty()) { + deferred_queue.push_back(*txc->osr); + } + txc->osr->deferred_pending.push_back(*txc); + if (txc->osr->deferred_running.empty()) { + _deferred_try_submit(txc->osr.get()); } +} - assert(txc->ioc.pending_aios.empty()); - for (list::iterator p = wt.ops.begin(); - p != wt.ops.end(); - ++p) { - int r = _do_deferred_op(txc, *p); - assert(r == 0); +void BlueStore::_deferred_try_submit() +{ + dout(20) << __func__ << " " << deferred_queue.size() << " osrs" << dendl; + for (auto& osr : deferred_queue) { + if (osr.deferred_running.empty()) { + _deferred_try_submit(&osr); + } } +} - _txc_state_proc(txc); - return 0; +void BlueStore::_deferred_try_submit(OpSequencer *osr) +{ + dout(10) << __func__ << " osr " << osr << " " << osr->deferred_pending.size() + << " pending " << dendl; + assert(osr->deferred_running.empty()); + osr->deferred_pending.swap(osr->deferred_running); + + // attach all IO to the last in the batch + TransContext *last = &osr->deferred_running.back(); + + // reverse order + for (auto i = osr->deferred_running.rbegin(); + i != osr->deferred_running.rend(); + ++i) { + TransContext *txc = &*i; + bluestore_deferred_transaction_t& wt = *txc->deferred_txn; + dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl; + txc->log_state_latency(logger, l_bluestore_state_deferred_queued_lat); + txc->state = TransContext::STATE_DEFERRED_AIO_WAIT; + for (auto opi = wt.ops.rbegin(); opi != wt.ops.rend(); ++opi) { + const auto& op = *opi; + uint64_t bl_offset = 0; + for (auto e : op.extents) { + interval_set cur; + cur.insert(e.offset, e.length); + interval_set overlap; + overlap.intersection_of(cur, osr->deferred_blocks); + cur.subtract(overlap); + dout(20) << __func__ << " txc " << txc << " " << e << std::hex + << " overlap 0x" << overlap << " new 0x" << cur + << " from bl_offset 0x" << bl_offset << std::dec << dendl; + for (auto j = cur.begin(); j != cur.end(); ++j) { + bufferlist bl; + bl.substr_of(op.data, bl_offset + j.get_start() - e.offset, + j.get_len()); + if (!g_conf->bluestore_debug_omit_block_device_write) { + int r = bdev->aio_write(j.get_start(), bl, &last->ioc, false); + assert(r == 0); + } + txc->osr->deferred_blocks.insert(j.get_start(), j.get_len()); + } + bl_offset += e.length; + } + } + } + osr->deferred_txc = last; + dout(20) << " osr " << osr << " deferred_blocks 0x" << std::hex + << osr->deferred_blocks << std::dec << dendl; + _txc_aio_submit(last); } int BlueStore::_deferred_finish(TransContext *txc) { bluestore_deferred_transaction_t& wt = *txc->deferred_txn; - dout(20) << __func__ << " txc " << " seq " << wt.seq << txc << dendl; + dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl; + + OpSequencer::deferred_queue_t finished; + { + std::lock_guard l(deferred_lock); + assert(txc->osr->deferred_txc == txc); + txc->osr->deferred_blocks.clear(); + finished.swap(txc->osr->deferred_running); + if (!txc->osr->deferred_pending.empty()) { + _deferred_try_submit(txc->osr.get()); + } else { + auto q = deferred_queue.iterator_to(*txc->osr); + deferred_queue.erase(q); + } + } + std::lock_guard l2(txc->osr->qlock); std::lock_guard l(kv_lock); - txc->state = TransContext::STATE_DEFERRED_CLEANUP; - txc->osr->qcond.notify_all(); - throttle_deferred_ops.put(txc->ops); - throttle_deferred_bytes.put(txc->bytes); - deferred_cleanup_queue.push_back(txc); + for (auto& i : finished) { + TransContext *txc = &i; + txc->state = TransContext::STATE_DEFERRED_CLEANUP; + txc->osr->qcond.notify_all(); + throttle_deferred_ops.put(txc->ops); + throttle_deferred_bytes.put(txc->bytes); + deferred_cleanup_queue.push_back(txc); + } + finished.clear(); kv_cond.notify_one(); return 0; } @@ -7760,6 +7784,7 @@ int BlueStore::_do_deferred_op(TransContext *txc, bluestore_deferred_op_t& wo) int r = bdev->aio_write(e.offset, bl, &txc->ioc, false); assert(r == 0); } + txc->osr->deferred_blocks.insert(e.offset, e.length); } } break; @@ -8488,7 +8513,7 @@ void BlueStore::_do_write_small( if (!g_conf->bluestore_debug_omit_block_device_write) { if (b_len <= prefer_deferred_size) { - dout(20) << __func__ << " defering small 0x" << std::hex + dout(20) << __func__ << " deferring small 0x" << std::hex << b_len << std::dec << " unused write via deferred" << dendl; bluestore_deferred_op_t *op = _get_deferred_op(txc, o); op->op = bluestore_deferred_op_t::OP_WRITE; @@ -8836,7 +8861,7 @@ int BlueStore::_do_alloc_write( // queue io if (!g_conf->bluestore_debug_omit_block_device_write) { if (l->length() <= prefer_deferred_size) { - dout(20) << __func__ << " defering small 0x" << std::hex + dout(20) << __func__ << " deferring small 0x" << std::hex << l->length() << std::dec << " write via deferred" << dendl; bluestore_deferred_op_t *op = _get_deferred_op(txc, o); op->op = bluestore_deferred_op_t::OP_WRITE; diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h index 35d889dcb4d1..5c028f136e05 100644 --- a/src/os/bluestore/BlueStore.h +++ b/src/os/bluestore/BlueStore.h @@ -57,7 +57,6 @@ enum { l_bluestore_state_kv_committing_lat, l_bluestore_state_kv_done_lat, l_bluestore_state_deferred_queued_lat, - l_bluestore_state_deferred_applying_lat, l_bluestore_state_deferred_aio_wait_lat, l_bluestore_state_deferred_cleanup_lat, l_bluestore_state_finishing_lat, @@ -1337,9 +1336,8 @@ public: STATE_KV_QUEUED, // queued for kv_sync_thread submission STATE_KV_SUBMITTED, // submitted to kv; not yet synced STATE_KV_DONE, - STATE_DEFERRED_QUEUED, - STATE_DEFERRED_APPLYING, - STATE_DEFERRED_AIO_WAIT, + STATE_DEFERRED_QUEUED, // in deferred_queue + STATE_DEFERRED_AIO_WAIT, // aio in flight, waiting for completion STATE_DEFERRED_CLEANUP, // remove deferred kv record STATE_DEFERRED_DONE, STATE_FINISHING, @@ -1357,7 +1355,6 @@ public: case STATE_KV_SUBMITTED: return "kv_submitted"; case STATE_KV_DONE: return "kv_done"; case STATE_DEFERRED_QUEUED: return "deferred_queued"; - case STATE_DEFERRED_APPLYING: return "deferred_applying"; case STATE_DEFERRED_AIO_WAIT: return "deferred_aio_wait"; case STATE_DEFERRED_CLEANUP: return "deferred_cleanup"; case STATE_DEFERRED_DONE: return "deferred_done"; @@ -1377,7 +1374,6 @@ public: case l_bluestore_state_kv_committing_lat: return "kv_committing"; case l_bluestore_state_kv_done_lat: return "kv_done"; case l_bluestore_state_deferred_queued_lat: return "deferred_queued"; - case l_bluestore_state_deferred_applying_lat: return "deferred_applying"; case l_bluestore_state_deferred_aio_wait_lat: return "deferred_aio_wait"; case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup"; case l_bluestore_state_finishing_lat: return "finishing"; @@ -1530,15 +1526,16 @@ public: TransContext, boost::intrusive::list_member_hook<>, &TransContext::deferred_queue_item> > deferred_queue_t; - deferred_queue_t deferred_q; ///< transactions + deferred_queue_t deferred_pending; ///< waiting + deferred_queue_t deferred_running; ///< in flight ios + interval_set deferred_blocks; ///< blocks in flight + TransContext *deferred_txc; ///< txc carrying this batch boost::intrusive::list_member_hook<> deferred_osr_queue_item; Sequencer *parent; BlueStore *store; - std::mutex deferred_apply_mutex; - uint64_t last_seq = 0; std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io @@ -1612,73 +1609,12 @@ public: } }; - class DeferredWQ : public ThreadPool::WorkQueue { - // We need to order DEFERRED items within each Sequencer. To do that, - // queue each txc under osr, and queue the osr's here. When we - // dequeue an txc, requeue the osr if there are more pending, and - // do it at the end of the list so that the next thread does not - // get a conflicted txc. Hold an osr mutex while doing the deferred to - // preserve the ordering. - public: - typedef boost::intrusive::list< + typedef boost::intrusive::list< + OpSequencer, + boost::intrusive::member_hook< OpSequencer, - boost::intrusive::member_hook< - OpSequencer, - boost::intrusive::list_member_hook<>, - &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t; - - private: - BlueStore *store; - deferred_osr_queue_t deferred_queue; - - public: - DeferredWQ(BlueStore *s, time_t ti, time_t sti, ThreadPool *tp) - : ThreadPool::WorkQueue("BlueStore::DeferredWQ", ti, sti, - tp), - store(s) { - } - bool _empty() { - return deferred_queue.empty(); - } - bool _enqueue(TransContext *i) { - if (i->osr->deferred_q.empty()) { - deferred_queue.push_back(*i->osr); - } - i->osr->deferred_q.push_back(*i); - return true; - } - void _dequeue(TransContext *p) { - assert(0 == "not needed, not implemented"); - } - TransContext *_dequeue() { - if (deferred_queue.empty()) - return NULL; - OpSequencer *osr = &deferred_queue.front(); - TransContext *i = &osr->deferred_q.front(); - osr->deferred_q.pop_front(); - deferred_queue.pop_front(); - if (!osr->deferred_q.empty()) { - // requeue at the end to minimize contention - deferred_queue.push_back(*i->osr); - } - - // preserve deferred ordering for this sequencer by taking the lock - // while still holding the queue lock - i->osr->deferred_apply_mutex.lock(); - return i; - } - void _process(TransContext *i, ThreadPool::TPHandle &) override { - store->_deferred_apply(i); - i->osr->deferred_apply_mutex.unlock(); - } - void _clear() { - assert(deferred_queue.empty()); - } - - void flush() { - drain(); - } - }; + boost::intrusive::list_member_hook<>, + &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t; struct KVSyncThread : public Thread { BlueStore *store; @@ -1748,8 +1684,7 @@ private: std::mutex deferred_lock; std::atomic deferred_seq = {0}; - ThreadPool deferred_tp; - DeferredWQ deferred_wq; + deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending int m_finisher_num; vector finishers; @@ -1784,8 +1719,6 @@ private: uint64_t max_alloc_size = 0; ///< maximum allocation unit (power of 2) - bool sync_deferred_apply; ///< see config option bluestore_sync_deferred_apply - std::atomic comp_mode = {Compressor::COMP_NONE}; ///< compression mode CompressorRef compressor; std::atomic comp_min_blob_size = {0}; @@ -1919,7 +1852,9 @@ private: } bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o); - int _deferred_apply(TransContext *txc); + void _deferred_queue(TransContext *txc); + void _deferred_try_submit(); + void _deferred_try_submit(OpSequencer *osr); int _deferred_finish(TransContext *txc); int _do_deferred_op(TransContext *txc, bluestore_deferred_op_t& wo); int _deferred_replay(); diff --git a/src/test/objectstore/store_test.cc b/src/test/objectstore/store_test.cc index 481ee5ec2db6..1905971e1472 100644 --- a/src/test/objectstore/store_test.cc +++ b/src/test/objectstore/store_test.cc @@ -1776,6 +1776,61 @@ TEST_P(StoreTest, ManySmallWrite) { } } +TEST_P(StoreTest, MultiSmallWriteSameBlock) { + ObjectStore::Sequencer osr("test"); + int r; + coll_t cid; + ghobject_t a(hobject_t(sobject_t("Object 1", CEPH_NOSNAP))); + { + ObjectStore::Transaction t; + t.create_collection(cid, 0); + cerr << "Creating collection " << cid << std::endl; + r = apply_transaction(store, &osr, std::move(t)); + ASSERT_EQ(r, 0); + } + bufferlist bl; + bl.append("short"); + C_SaferCond c, d; + // touch same block in both same transaction, tls, and pipelined txns + { + ObjectStore::Transaction t, u; + t.write(cid, a, 0, 5, bl, 0); + t.write(cid, a, 5, 5, bl, 0); + t.write(cid, a, 4094, 5, bl, 0); + t.write(cid, a, 9000, 5, bl, 0); + u.write(cid, a, 10, 5, bl, 0); + u.write(cid, a, 7000, 5, bl, 0); + vector v = {t, u}; + store->queue_transactions(&osr, v, nullptr, &c); + } + { + ObjectStore::Transaction t, u; + t.write(cid, a, 40, 5, bl, 0); + t.write(cid, a, 45, 5, bl, 0); + t.write(cid, a, 4094, 5, bl, 0); + t.write(cid, a, 6000, 5, bl, 0); + u.write(cid, a, 610, 5, bl, 0); + u.write(cid, a, 11000, 5, bl, 0); + vector v = {t, u}; + store->queue_transactions(&osr, v, nullptr, &d); + } + c.wait(); + d.wait(); + { + bufferlist bl2; + r = store->read(cid, a, 0, 16000, bl2); + ASSERT_GE(r, 0); + } + { + ObjectStore::Transaction t; + t.remove(cid, a); + t.remove_collection(cid); + cerr << "Cleaning" << std::endl; + r = apply_transaction(store, &osr, std::move(t)); + ASSERT_EQ(r, 0); + } +} + TEST_P(StoreTest, SmallSkipFront) { ObjectStore::Sequencer osr("test"); int r;