]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
Limit private discarded queue for threads to a small items count.
authorGabriel BenHanokh <gbenhano@redhat.com>
Tue, 9 Apr 2024 07:53:15 +0000 (07:53 +0000)
committerGabriel BenHanokh <gbenhano@redhat.com>
Tue, 9 Apr 2024 12:19:08 +0000 (12:19 +0000)
On fast-shutdown take over the main discarded queue copying it to the allocator and only wait for the threads to commit their small private discarded queues

Signed-off-by: Gabriel BenHanokh <gbenhano@redhat.com>
src/blk/BlockDevice.h
src/blk/kernel/KernelDevice.cc
src/blk/kernel/KernelDevice.h
src/os/bluestore/BlueStore.cc

index e46948ced34620dbb2a96e71713b08a8650fdf94..f1e32669b092d13c4c9127441fff98cbd145d8c2 100644 (file)
@@ -286,7 +286,7 @@ public:
   virtual int flush() = 0;
   virtual bool try_discard(interval_set<uint64_t> &to_release, bool async=true) { return false; }
   virtual void discard_drain() { return; }
-  virtual const interval_set<uint64_t>* get_discard_queued() { return nullptr;}
+  virtual void swap_discard_queued(interval_set<uint64_t>& other)  { other.clear(); }
   // for managing buffered readers/writers
   virtual int invalidate_cache(uint64_t off, uint64_t len) = 0;
   virtual int open(const std::string& path) = 0;
index 6337292f5dec25e95766e5db79b1e9cc9dcff8ad..f7980baddf2fad9740f9ecb7335abe61d81e3522 100644 (file)
@@ -591,7 +591,7 @@ void KernelDevice::discard_drain()
 {
   dout(10) << __func__ << dendl;
   std::unique_lock l(discard_lock);
-  while (!discard_queued.empty() || discard_running) {
+  while (!discard_queued.empty() || (discard_running > 0)) {
     discard_cond.wait(l);
   }
 }
@@ -731,6 +731,12 @@ void KernelDevice::_aio_thread()
   dout(10) << __func__ << " end" << dendl;
 }
 
+void KernelDevice::swap_discard_queued(interval_set<uint64_t>& other)
+{
+  std::unique_lock l(discard_lock);
+  discard_queued.swap(other);
+}
+
 void KernelDevice::_discard_thread(uint64_t tid)
 {
   dout(10) << __func__ << " thread " << tid << " start" << dendl;
@@ -755,13 +761,21 @@ void KernelDevice::_discard_thread(uint64_t tid)
       discard_cond.wait(l);
       dout(20) << __func__ << " wake" << dendl;
     } else {
-      // Swap the queued discards for a local list we'll process here
-      // without caring about thread fairness.  This allows the current
-      // thread to wait on the discard running while other threads pick
-      // up the next-in-queue, and do the same, ultimately issuing more
-      // discards in parallel, which is the goal.
-      discard_processing.swap(discard_queued);
-      discard_running = true;
+      // Limit local processing to MAX_LOCAL_DISCARD items.
+      // This will allow threads to work in parallel
+      //      instead of a single thread taking over the whole discard_queued.
+      // It will also allow threads to finish in a timely manner.
+      constexpr unsigned MAX_LOCAL_DISCARD = 10;
+      unsigned count = 0;
+      for (auto p = discard_queued.begin();
+          p != discard_queued.end() && count < MAX_LOCAL_DISCARD;
+          ++p, ++count) {
+       discard_processing.insert(p.get_start(), p.get_len());
+       discard_queued.erase(p);
+      }
+
+      // there are multiple active threads -> must use a counter instead of a flag
+      discard_running ++;
       l.unlock();
       dout(20) << __func__ << " finishing" << dendl;
       for (auto p = discard_processing.begin(); p != discard_processing.end(); ++p) {
@@ -771,7 +785,8 @@ void KernelDevice::_discard_thread(uint64_t tid)
       discard_callback(discard_callback_priv, static_cast<void*>(&discard_processing));
       discard_processing.clear();
       l.lock();
-      discard_running = false;
+      discard_running --;
+      ceph_assert(discard_running >= 0);
     }
   }
 
index 2b3f3943d77b89d1ff701a524fd15e4ba14e1d09..70962117403fec4019fef3dc47fccce8d4f81fca 100644 (file)
@@ -55,7 +55,7 @@ private:
 
   ceph::mutex discard_lock = ceph::make_mutex("KernelDevice::discard_lock");
   ceph::condition_variable discard_cond;
-  bool discard_running = false;
+  int discard_running = 0;
   interval_set<uint64_t> discard_queued;
 
   struct AioCompletionThread : public Thread {
@@ -124,7 +124,7 @@ public:
 
   void aio_submit(IOContext *ioc) override;
   void discard_drain() override;
-  const interval_set<uint64_t>* get_discard_queued() override { return &discard_queued;}
+  void swap_discard_queued(interval_set<uint64_t>& other) override;
   int collect_metadata(const std::string& prefix, std::map<std::string,std::string> *pm) const override;
   int get_devname(std::string *s) const override {
     if (devname.empty()) {
index 4570e4353e9873a7c8f8801e002a893cb26f20c1..5c8e8d8ec599b5b6383e9e92d150b8791f34df65 100644 (file)
@@ -7767,16 +7767,20 @@ void BlueStore::_close_db()
       bdev->discard_drain();
     }
 
-    auto discard_queued = bdev->get_discard_queued();
-    if (discard_queued && (discard_queued->num_intervals() > 0)) {
-      dout(10) << __func__ << "::discard_drain: size=" << discard_queued->size()
-              << " num_intervals=" << discard_queued->num_intervals() << dendl;
+    interval_set<uint64_t> discard_queued;
+    bdev->swap_discard_queued(discard_queued);
+    if (discard_queued.num_intervals() > 0) {
+      dout(10) << __func__ << "::discard_drain: size=" << discard_queued.size()
+              << " num_intervals=" << discard_queued.num_intervals() << dendl;
       // copy discard_queued to the allocator before storing it
-      for (auto p = discard_queued->begin(); p != discard_queued->end(); ++p) {
+      for (auto p = discard_queued.begin(); p != discard_queued.end(); ++p) {
        dout(20) << __func__ << "::discarded-extent=[" << p.get_start() << ", " << p.get_len() << "]" << dendl;
        alloc->init_add_free(p.get_start(), p.get_len());
       }
     }
+    // drain the items in the threads local discard_processing queues
+    // There are only a few items in those queues so it is fine to do so in fast shutdown
+    bdev->discard_drain();
     int ret = store_allocator(alloc);
     if (unlikely(ret != 0)) {
       derr << __func__ << "::NCB::store_allocator() failed (we will need to rebuild it on startup)" << dendl;