]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
blk: add threaded discard support to kernel devices
authorMatt Vandermeulen <matt@reenigne.net>
Thu, 8 Feb 2024 17:54:37 +0000 (13:54 -0400)
committerYite Gu <yitegu0@gmail.com>
Wed, 7 Aug 2024 02:45:34 +0000 (10:45 +0800)
Signed-off-by: Matt Vandermeulen <matt@reenigne.net>
(cherry picked from commit d8815e18b7e2c75db0f55dd4497ea7285bd4ae25)

src/blk/kernel/KernelDevice.cc
src/blk/kernel/KernelDevice.h

index 754b44d32a69161251fcde417287bcce976aa21e..6ce5b3a929338e383f6df280926ed169748955e9 100644 (file)
@@ -65,12 +65,10 @@ KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, ai
     discard_callback(d_cb),
     discard_callback_priv(d_cbpriv),
     aio_stop(false),
-    discard_started(false),
-    discard_stop(false),
     aio_thread(this),
-    discard_thread(this),
     injecting_crash(0)
 {
+  cct->_conf.add_observer(this);
   fd_directs.resize(WRITE_LIFE_MAX, -1);
   fd_buffereds.resize(WRITE_LIFE_MAX, -1);
 
@@ -92,6 +90,11 @@ KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, ai
   }
 }
 
+KernelDevice::~KernelDevice()
+{
+  cct->_conf.remove_observer(this);
+}
+
 int KernelDevice::_lock()
 {
   // When the block changes, systemd-udevd will open the block,
@@ -131,6 +134,7 @@ int KernelDevice::open(const string& p)
 {
   path = p;
   int r = 0, i = 0;
+  uint64_t num_discard_threads = 0;
   dout(1) << __func__ << " path " << path << dendl;
 
   struct stat statbuf;
@@ -281,7 +285,9 @@ int KernelDevice::open(const string& p)
   if (r < 0) {
     goto out_fail;
   }
-  if (support_discard && cct->_conf->bdev_enable_discard && cct->_conf->bdev_async_discard) {
+
+  num_discard_threads = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");
+  if (support_discard && cct->_conf->bdev_enable_discard && num_discard_threads > 0) {
     _discard_start();
   }
 
@@ -330,7 +336,7 @@ void KernelDevice::close()
 {
   dout(1) << __func__ << dendl;
   _aio_stop();
-  if (discard_thread.is_started()) {
+  if (_discard_started()) {
     _discard_stop();
   }
   _pre_close();
@@ -532,28 +538,55 @@ void KernelDevice::_aio_stop()
 
 void KernelDevice::_discard_start()
 {
-    discard_thread.create("bstore_discard");
+  uint64_t num = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");
+  dout(10) << __func__ << " starting " << num << " threads" << dendl;
+
+  std::unique_lock l(discard_lock);
+
+  target_discard_threads = num;
+  discard_threads.reserve(num);
+  for(uint64_t i = 0; i < num; i++)
+  {
+    // All threads created with the same name
+    discard_threads.emplace_back(new DiscardThread(this, i));
+    discard_threads.back()->create("bstore_discard");
+  }
+
+  dout(10) << __func__ << " started " << num << " threads" << dendl;
 }
 
 void KernelDevice::_discard_stop()
 {
   dout(10) << __func__ << dendl;
+
+  // Signal threads to stop, then wait for them to join
   {
     std::unique_lock l(discard_lock);
-    while (!discard_started) {
+    while (discard_threads.empty()) {
       discard_cond.wait(l);
     }
-    discard_stop = true;
+
+    for(auto &t : discard_threads) {
+      t->stop = true;
+    }
+
     discard_cond.notify_all();
   }
-  discard_thread.join();
-  {
-    std::lock_guard l(discard_lock);
-    discard_stop = false;
-  }
+
+  // Threads are shared pointers and are cleaned up for us
+  for(auto &t : discard_threads)
+    t->join();
+  discard_threads.clear();
+
   dout(10) << __func__ << " stopped" << dendl;
 }
 
+bool KernelDevice::_discard_started()
+{
+  std::unique_lock l(discard_lock);
+  return !discard_threads.empty();
+}
+
 void KernelDevice::discard_drain()
 {
   dout(10) << __func__ << dendl;
@@ -567,7 +600,7 @@ static bool is_expected_ioerr(const int r)
 {
   // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
   return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC ||
-         r == -ENOLINK || r == -EREMOTEIO  || r == -EAGAIN || r == -EIO ||
+         r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
          r == -ENODATA || r == -EILSEQ || r == -ENOMEM ||
 #if defined(__linux__)
          r == -EREMCHG || r == -EBADE
@@ -698,44 +731,57 @@ void KernelDevice::_aio_thread()
   dout(10) << __func__ << " end" << dendl;
 }
 
-void KernelDevice::_discard_thread()
+void KernelDevice::_discard_thread(uint64_t tid)
 {
+  dout(10) << __func__ << " thread " << tid << " start" << dendl;
+
+  // Thread-local list of processing discards
+  interval_set<uint64_t> discard_processing;
+
   std::unique_lock l(discard_lock);
-  ceph_assert(!discard_started);
-  discard_started = true;
   discard_cond.notify_all();
+
+  // Keeps the shared pointer around until erased from the vector
+  // and until we leave this function
+  auto thr = discard_threads[tid];
+
   while (true) {
-    ceph_assert(discard_finishing.empty());
+    ceph_assert(discard_processing.empty());
     if (discard_queued.empty()) {
-      if (discard_stop)
+      if (thr->stop)
        break;
       dout(20) << __func__ << " sleep" << dendl;
       discard_cond.notify_all(); // for the thread trying to drain...
       discard_cond.wait(l);
       dout(20) << __func__ << " wake" << dendl;
     } else {
-      discard_finishing.swap(discard_queued);
+      // Swap the queued discards for a local list we'll process here
+      // without caring about thread fairness.  This allows the current
+      // thread to wait on the discard running while other threads pick
+      // up the next-in-queue, and do the same, ultimately issuing more
+      // discards in parallel, which is the goal.
+      discard_processing.swap(discard_queued);
       discard_running = true;
       l.unlock();
       dout(20) << __func__ << " finishing" << dendl;
-      for (auto p = discard_finishing.begin();p != discard_finishing.end(); ++p) {
-       _discard(p.get_start(), p.get_len());
+      for (auto p = discard_processing.begin(); p != discard_processing.end(); ++p) {
+        _discard(p.get_start(), p.get_len());
       }
 
-      discard_callback(discard_callback_priv, static_cast<void*>(&discard_finishing));
-      discard_finishing.clear();
+      discard_callback(discard_callback_priv, static_cast<void*>(&discard_processing));
+      discard_processing.clear();
       l.lock();
       discard_running = false;
     }
   }
-  dout(10) << __func__ << " finish" << dendl;
-  discard_started = false;
+
+  dout(10) << __func__ << " thread " << tid << " finish" << dendl;
 }
 
 int KernelDevice::_queue_discard(interval_set<uint64_t> &to_release)
 {
   // if bdev_async_discard enabled on the fly, discard_thread is not started here, fallback to sync discard
-  if (!discard_thread.is_started())
+  if (!_discard_started())
     return -1;
 
   if (to_release.empty())
@@ -743,7 +789,7 @@ int KernelDevice::_queue_discard(interval_set<uint64_t> &to_release)
 
   std::lock_guard l(discard_lock);
   discard_queued.insert(to_release);
-  discard_cond.notify_all();
+  discard_cond.notify_one();
   return 0;
 }
 
@@ -754,7 +800,7 @@ bool KernelDevice::try_discard(interval_set<uint64_t> &to_release, bool async)
   if (!support_discard || !cct->_conf->bdev_enable_discard)
     return false;
 
-  if (async && discard_thread.is_started()) {
+  if (async) {
     return 0 == _queue_discard(to_release);
   } else {
     for (auto p = to_release.begin(); p != to_release.end(); ++p) {
@@ -1447,3 +1493,51 @@ int KernelDevice::invalidate_cache(uint64_t off, uint64_t len)
   }
   return r;
 }
+
+const char** KernelDevice::get_tracked_conf_keys() const
+{
+  static const char* KEYS[] = {
+    "bdev_async_discard_threads",
+    NULL
+  };
+  return KEYS;
+}
+
+void KernelDevice::handle_conf_change(const ConfigProxy& conf,
+                            const std::set <std::string> &changed)
+{
+  if (changed.count("bdev_async_discard_threads")) {
+    std::unique_lock l(discard_lock);
+
+    uint64_t oldval = target_discard_threads;
+    uint64_t newval = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");
+
+    target_discard_threads = newval;
+
+    // Increase? Spawn now, it's quick
+    if (newval > oldval) {
+      dout(10) << __func__ << " starting " << (newval - oldval) << " additional discard threads" << dendl;
+      discard_threads.reserve(target_discard_threads);
+      for(uint64_t i = oldval; i < newval; i++)
+      {
+        // All threads created with the same name
+        discard_threads.emplace_back(new DiscardThread(this, i));
+        discard_threads.back()->create("bstore_discard");
+      }
+    } else {
+      // Decrease? Signal threads after telling them to stop
+      dout(10) << __func__ << " stopping " << (oldval - newval) << " existing discard threads" << dendl;
+
+      // Signal the last threads to quit, and stop tracking them
+      for(uint64_t i = oldval - 1; i >= newval; i--)
+      {
+        // Also detach the thread so we no longer need to join
+        discard_threads[i]->stop = true;
+        discard_threads[i]->detach();
+        discard_threads.erase(discard_threads.begin() + i);
+      }
+
+      discard_cond.notify_all();
+    }
+  }
+}
index e00e31f10b170b50cd71d22929d0b9667d338168..326a94339915f9a7e3e4f39601a40bfce8792eb0 100644 (file)
@@ -19,6 +19,7 @@
 
 #include "include/types.h"
 #include "include/interval_set.h"
+#include "common/config_obs.h"
 #include "common/Thread.h"
 #include "include/utime.h"
 
@@ -28,7 +29,8 @@
 
 #define RW_IO_MAX (INT_MAX & CEPH_PAGE_MASK)
 
-class KernelDevice : public BlockDevice {
+class KernelDevice : public BlockDevice,
+                     public md_config_obs_t {
 protected:
   std::string path;
 private:
@@ -50,14 +52,11 @@ private:
   aio_callback_t discard_callback;
   void *discard_callback_priv;
   bool aio_stop;
-  bool discard_started;
-  bool discard_stop;
 
   ceph::mutex discard_lock = ceph::make_mutex("KernelDevice::discard_lock");
   ceph::condition_variable discard_cond;
   bool discard_running = false;
   interval_set<uint64_t> discard_queued;
-  interval_set<uint64_t> discard_finishing;
 
   struct AioCompletionThread : public Thread {
     KernelDevice *bdev;
@@ -70,12 +69,16 @@ private:
 
   struct DiscardThread : public Thread {
     KernelDevice *bdev;
-    explicit DiscardThread(KernelDevice *b) : bdev(b) {}
+    const uint64_t id;
+    bool stop = false;
+    explicit DiscardThread(KernelDevice *b, uint64_t id) : bdev(b), id(id) {}
     void *entry() override {
-      bdev->_discard_thread();
+      bdev->_discard_thread(id);
       return NULL;
     }
-  } discard_thread;
+  };
+  std::vector<std::shared_ptr<DiscardThread>> discard_threads;
+  uint64_t target_discard_threads = 0;
 
   std::atomic_int injecting_crash;
 
@@ -83,7 +86,7 @@ private:
   virtual void  _pre_close() { }  // hook for child implementations
 
   void _aio_thread();
-  void _discard_thread();
+  void _discard_thread(uint64_t tid);
   int _queue_discard(interval_set<uint64_t> &to_release);
   bool try_discard(interval_set<uint64_t> &to_release, bool async = true) override;
 
@@ -92,6 +95,7 @@ private:
 
   void _discard_start();
   void _discard_stop();
+  bool _discard_started();
 
   void _aio_log_start(IOContext *ioc, uint64_t offset, uint64_t length);
   void _aio_log_finish(IOContext *ioc, uint64_t offset, uint64_t length);
@@ -116,6 +120,7 @@ private:
 
 public:
   KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv);
+  ~KernelDevice();
 
   void aio_submit(IOContext *ioc) override;
   void discard_drain() override;
@@ -151,6 +156,11 @@ public:
   int invalidate_cache(uint64_t off, uint64_t len) override;
   int open(const std::string& path) override;
   void close() override;
+
+  // config observer bits
+  const char** get_tracked_conf_keys() const override;
+  void handle_conf_change(const ConfigProxy& conf,
+                          const std::set <std::string> &changed) override;
 };
 
 #endif