From 91b0fe4d51cebb5667f3e58093ecc1346071f1f0 Mon Sep 17 00:00:00 2001
From: Matt Vandermeulen
Date: Thu, 8 Feb 2024 13:54:37 -0400
Subject: [PATCH] blk: add threaded discard support to kernel devices

Signed-off-by: Matt Vandermeulen
(cherry picked from commit d8815e18b7e2c75db0f55dd4497ea7285bd4ae25)

Note: handle_conf_change() differs from the picked commit.  The thread
count is taken from discard_threads.size(), the shrink path only runs
when the new count is strictly smaller, and the shrink loop counts down
with "i > newval" while popping from the back of the vector.  The
previous "i = oldval - 1; i >= newval; i--" loop used an unsigned index
and therefore never terminated (and indexed out of bounds) when the new
count was 0 or when both counts were 0.
---
 src/blk/kernel/KernelDevice.cc | 152 ++++++++++++++++++++++++++-------
 src/blk/kernel/KernelDevice.h  |  26 ++++--
 2 files changed, 141 insertions(+), 37 deletions(-)

diff --git a/src/blk/kernel/KernelDevice.cc b/src/blk/kernel/KernelDevice.cc
index 754b44d32a6..6ce5b3a9293 100644
--- a/src/blk/kernel/KernelDevice.cc
+++ b/src/blk/kernel/KernelDevice.cc
@@ -65,12 +65,10 @@ KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, ai
     discard_callback(d_cb),
     discard_callback_priv(d_cbpriv),
     aio_stop(false),
-    discard_started(false),
-    discard_stop(false),
     aio_thread(this),
-    discard_thread(this),
     injecting_crash(0)
 {
+  cct->_conf.add_observer(this);
   fd_directs.resize(WRITE_LIFE_MAX, -1);
   fd_buffereds.resize(WRITE_LIFE_MAX, -1);
 
@@ -92,6 +90,11 @@ KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, ai
   }
 }
 
+KernelDevice::~KernelDevice()
+{
+  cct->_conf.remove_observer(this);
+}
+
 int KernelDevice::_lock()
 {
   // When the block changes, systemd-udevd will open the block,
@@ -131,6 +134,7 @@ int KernelDevice::open(const string& p)
 {
   path = p;
   int r = 0, i = 0;
+  uint64_t num_discard_threads = 0;
   dout(1) << __func__ << " path " << path << dendl;
 
   struct stat statbuf;
@@ -281,7 +285,9 @@ int KernelDevice::open(const string& p)
   if (r < 0) {
     goto out_fail;
   }
-  if (support_discard && cct->_conf->bdev_enable_discard && cct->_conf->bdev_async_discard) {
+
+  num_discard_threads = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");
+  if (support_discard && cct->_conf->bdev_enable_discard && num_discard_threads > 0) {
     _discard_start();
   }
 
@@ -330,7 +336,7 @@ void KernelDevice::close()
 {
   dout(1) << __func__ << dendl;
   _aio_stop();
-  if (discard_thread.is_started()) {
+  if (_discard_started()) {
     _discard_stop();
   }
   _pre_close();
@@ -532,28 +538,55 @@ void KernelDevice::_aio_stop()
 
 void KernelDevice::_discard_start()
 {
-    discard_thread.create("bstore_discard");
+  uint64_t num = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");
+  dout(10) << __func__ << " starting " << num << " threads" << dendl;
+
+  std::unique_lock l(discard_lock);
+
+  target_discard_threads = num;
+  discard_threads.reserve(num);
+  for(uint64_t i = 0; i < num; i++)
+  {
+    // All threads created with the same name
+    discard_threads.emplace_back(new DiscardThread(this, i));
+    discard_threads.back()->create("bstore_discard");
+  }
+
+  dout(10) << __func__ << " started " << num << " threads" << dendl;
 }
 
 void KernelDevice::_discard_stop()
 {
   dout(10) << __func__ << dendl;
+
+  // Signal threads to stop, then wait for them to join
   {
     std::unique_lock l(discard_lock);
-    while (!discard_started) {
+    while (discard_threads.empty()) {
       discard_cond.wait(l);
     }
-    discard_stop = true;
+
+    for(auto &t : discard_threads) {
+      t->stop = true;
+    }
+
     discard_cond.notify_all();
   }
-  discard_thread.join();
-  {
-    std::lock_guard l(discard_lock);
-    discard_stop = false;
-  }
+
+  // Threads are shared pointers and are cleaned up for us
+  for(auto &t : discard_threads)
+    t->join();
+  discard_threads.clear();
+
   dout(10) << __func__ << " stopped" << dendl;
 }
 
+bool KernelDevice::_discard_started()
+{
+  std::unique_lock l(discard_lock);
+  return !discard_threads.empty();
+}
+
 void KernelDevice::discard_drain()
 {
   dout(10) << __func__ << dendl;
@@ -567,7 +600,7 @@ static bool is_expected_ioerr(const int r)
 {
   // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
   return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC ||
-          r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
+          r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
           r == -ENODATA || r == -EILSEQ || r == -ENOMEM ||
 #if defined(__linux__)
           r == -EREMCHG || r == -EBADE
@@ -698,44 +731,57 @@ void KernelDevice::_aio_thread()
   dout(10) << __func__ << " end" << dendl;
 }
 
-void KernelDevice::_discard_thread()
+void KernelDevice::_discard_thread(uint64_t tid)
 {
+  dout(10) << __func__ << " thread " << tid << " start" << dendl;
+
+  // Thread-local list of processing discards
+  interval_set<uint64_t> discard_processing;
+
   std::unique_lock l(discard_lock);
-  ceph_assert(!discard_started);
-  discard_started = true;
   discard_cond.notify_all();
+
+  // Keeps the shared pointer around until erased from the vector
+  // and until we leave this function
+  auto thr = discard_threads[tid];
+
   while (true) {
-    ceph_assert(discard_finishing.empty());
+    ceph_assert(discard_processing.empty());
     if (discard_queued.empty()) {
-      if (discard_stop)
+      if (thr->stop)
         break;
       dout(20) << __func__ << " sleep" << dendl;
       discard_cond.notify_all(); // for the thread trying to drain...
       discard_cond.wait(l);
       dout(20) << __func__ << " wake" << dendl;
     } else {
-      discard_finishing.swap(discard_queued);
+      // Swap the queued discards for a local list we'll process here
+      // without caring about thread fairness. This allows the current
+      // thread to wait on the discard running while other threads pick
+      // up the next-in-queue, and do the same, ultimately issuing more
+      // discards in parallel, which is the goal.
+      discard_processing.swap(discard_queued);
       discard_running = true;
       l.unlock();
       dout(20) << __func__ << " finishing" << dendl;
-      for (auto p = discard_finishing.begin();p != discard_finishing.end(); ++p) {
-        _discard(p.get_start(), p.get_len());
+      for (auto p = discard_processing.begin(); p != discard_processing.end(); ++p) {
+        _discard(p.get_start(), p.get_len());
       }
 
-      discard_callback(discard_callback_priv, static_cast<void*>(&discard_finishing));
-      discard_finishing.clear();
+      discard_callback(discard_callback_priv, static_cast<void*>(&discard_processing));
+      discard_processing.clear();
       l.lock();
       discard_running = false;
     }
   }
-  dout(10) << __func__ << " finish" << dendl;
-  discard_started = false;
+
+  dout(10) << __func__ << " thread " << tid << " finish" << dendl;
 }
 
 int KernelDevice::_queue_discard(interval_set<uint64_t> &to_release)
 {
   // if bdev_async_discard enabled on the fly, discard_thread is not started here, fallback to sync discard
-  if (!discard_thread.is_started())
+  if (!_discard_started())
     return -1;
 
   if (to_release.empty())
@@ -743,7 +789,7 @@ int KernelDevice::_queue_discard(interval_set<uint64_t> &to_release)
 
   std::lock_guard l(discard_lock);
   discard_queued.insert(to_release);
-  discard_cond.notify_all();
+  discard_cond.notify_one();
   return 0;
 }
 
@@ -754,7 +800,7 @@ bool KernelDevice::try_discard(interval_set<uint64_t> &to_release, bool async)
   if (!support_discard || !cct->_conf->bdev_enable_discard)
     return false;
 
-  if (async && discard_thread.is_started()) {
+  if (async) {
     return 0 == _queue_discard(to_release);
   } else {
     for (auto p = to_release.begin(); p != to_release.end(); ++p) {
@@ -1447,3 +1493,51 @@ int KernelDevice::invalidate_cache(uint64_t off, uint64_t len)
   }
   return r;
 }
+
+const char** KernelDevice::get_tracked_conf_keys() const
+{
+  static const char* KEYS[] = {
+    "bdev_async_discard_threads",
+    NULL
+  };
+  return KEYS;
+}
+
+void KernelDevice::handle_conf_change(const ConfigProxy& conf,
+                                      const std::set<std::string> &changed)
+{
+  if (changed.count("bdev_async_discard_threads")) {
+    std::unique_lock l(discard_lock);
+
+    uint64_t oldval = discard_threads.size();
+    uint64_t newval = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");
+
+    target_discard_threads = newval;
+
+    // Increase? Spawn now, it's quick
+    if (newval > oldval) {
+      dout(10) << __func__ << " starting " << (newval - oldval) << " additional discard threads" << dendl;
+      discard_threads.reserve(target_discard_threads);
+      for(uint64_t i = oldval; i < newval; i++)
+      {
+        // All threads created with the same name
+        discard_threads.emplace_back(new DiscardThread(this, i));
+        discard_threads.back()->create("bstore_discard");
+      }
+    } else if (newval < oldval) {
+      // Decrease? Signal threads after telling them to stop
+      dout(10) << __func__ << " stopping " << (oldval - newval) << " existing discard threads" << dendl;
+
+      // Drop threads from the back; "i > newval" cannot wrap when newval is 0
+      for(uint64_t i = oldval; i > newval; i--)
+      {
+        // Also detach the thread so we no longer need to join
+        discard_threads.back()->stop = true;
+        discard_threads.back()->detach();
+        discard_threads.pop_back();
+      }
+
+      discard_cond.notify_all();
+    }
+  }
+}
diff --git a/src/blk/kernel/KernelDevice.h b/src/blk/kernel/KernelDevice.h
index e00e31f10b1..326a9433991 100644
--- a/src/blk/kernel/KernelDevice.h
+++ b/src/blk/kernel/KernelDevice.h
@@ -19,6 +19,7 @@
 
 #include "include/types.h"
 #include "include/interval_set.h"
+#include "common/config_obs.h"
 #include "common/Thread.h"
 #include "include/utime.h"
 
@@ -28,7 +29,8 @@
 
 #define RW_IO_MAX (INT_MAX & CEPH_PAGE_MASK)
 
-class KernelDevice : public BlockDevice {
+class KernelDevice : public BlockDevice,
+                     public md_config_obs_t {
 protected:
   std::string path;
 private:
@@ -50,14 +52,11 @@ private:
   aio_callback_t discard_callback;
   void *discard_callback_priv;
   bool aio_stop;
-  bool discard_started;
-  bool discard_stop;
 
   ceph::mutex discard_lock = ceph::make_mutex("KernelDevice::discard_lock");
   ceph::condition_variable discard_cond;
   bool discard_running = false;
   interval_set<uint64_t> discard_queued;
-  interval_set<uint64_t> discard_finishing;
 
   struct AioCompletionThread : public Thread {
     KernelDevice *bdev;
@@ -70,12 +69,16 @@ private:
 
   struct DiscardThread : public Thread {
     KernelDevice *bdev;
-    explicit DiscardThread(KernelDevice *b) : bdev(b) {}
+    const uint64_t id;
+    bool stop = false;
+    explicit DiscardThread(KernelDevice *b, uint64_t id) : bdev(b), id(id) {}
     void *entry() override {
-      bdev->_discard_thread();
+      bdev->_discard_thread(id);
       return NULL;
     }
-  } discard_thread;
+  };
+  std::vector<std::shared_ptr<DiscardThread>> discard_threads;
+  uint64_t target_discard_threads = 0;
 
   std::atomic_int injecting_crash;
 
@@ -83,7 +86,7 @@ private:
   virtual void _pre_close() { }  // hook for child implementations
 
   void _aio_thread();
-  void _discard_thread();
+  void _discard_thread(uint64_t tid);
   int _queue_discard(interval_set<uint64_t> &to_release);
   bool try_discard(interval_set<uint64_t> &to_release, bool async = true) override;
 
@@ -92,6 +95,7 @@ private:
 
   void _discard_start();
   void _discard_stop();
+  bool _discard_started();
 
   void _aio_log_start(IOContext *ioc, uint64_t offset, uint64_t length);
   void _aio_log_finish(IOContext *ioc, uint64_t offset, uint64_t length);
@@ -116,6 +120,7 @@ private:
 
 public:
   KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv);
+  ~KernelDevice();
 
   void aio_submit(IOContext *ioc) override;
   void discard_drain() override;
@@ -151,6 +156,11 @@ public:
   int invalidate_cache(uint64_t off, uint64_t len) override;
   int open(const std::string& path) override;
   void close() override;
+
+  // config observer bits
+  const char** get_tracked_conf_keys() const override;
+  void handle_conf_change(const ConfigProxy& conf,
+                          const std::set<std::string> &changed) override;
 };
 
 #endif
-- 
2.39.5