]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
osdc/ObjectCacher: avoid io blocking for bufferheads exceed limit
authorshua.lv <shua.lv@outlook.com>
Fri, 31 May 2024 13:32:02 +0000 (21:32 +0800)
committershua.lv <shua.lv@outlook.com>
Sat, 29 Mar 2025 05:45:52 +0000 (13:45 +0800)
Fixes: https://tracker.ceph.com/issues/62918
Signed-off-by: shua.lv <shua.lv@outlook.com>
qa/suites/rados/objectstore/backends/objectcacher-stress.yaml
qa/workunits/osdc/object_cacher_misc.sh [new file with mode: 0755]
src/osdc/ObjectCacher.cc
src/osdc/ObjectCacher.h
src/test/osdc/CMakeLists.txt
src/test/osdc/object_cacher_misc.cc [new file with mode: 0644]

index ae0f8f381b4a8a2164fda59aea2a3203f83416d7..bc31c2997d420efc870c5eabe77ee941c696fb47 100644 (file)
@@ -14,3 +14,4 @@ tasks:
     clients:
       all:
         - osdc/stress_objectcacher.sh
+        - osdc/object_cacher_misc.sh
diff --git a/qa/workunits/osdc/object_cacher_misc.sh b/qa/workunits/osdc/object_cacher_misc.sh
new file mode 100755 (executable)
index 0000000..c4c331c
--- /dev/null
@@ -0,0 +1,5 @@
+#!/bin/sh -ex
+
+ceph_test_objectcacher_misc --flush-test
+
+echo OK
index e850f9a7cb0f38df43899c504d0ad16518e19781..14287dd978e223260b8180fbde646e376f314eda 100644 (file)
@@ -1276,13 +1276,14 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
     finish_contexts(cct, ls, r);
 }
 
-void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
+void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount, int max_bhs)
 {
   ceph_assert(trace != nullptr);
   ceph_assert(ceph_mutex_is_locked(lock));
   ceph::real_time cutoff = ceph::real_clock::now();
 
-  ldout(cct, 10) << "flush " << amount << dendl;
+  ldout(cct, 10) << "flush " << amount
+                 << " bytes, max bufferheads " << max_bhs << dendl;
 
   /*
    * NOTE: we aren't actually pulling things off the LRU here, just
@@ -1291,22 +1292,24 @@ void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
    * lru_dirty.lru_get_next_expire() again.
    */
   int64_t left = amount;
-  while (amount == 0 || left > 0) {
+  int left_bhs = max_bhs;
+  while ((amount == 0 && max_bhs == 0) || left > 0 || left_bhs > 0) {
     BufferHead *bh = static_cast<BufferHead*>(
       bh_lru_dirty.lru_get_next_expire());
     if (!bh) break;
     if (bh->last_write > cutoff) break;
 
     if (scattered_write) {
-      bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
+      bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL,
+                           max_bhs > 0 ? &left_bhs : NULL);
     } else {
       left -= bh->length();
+      left_bhs--;
       bh_write(bh, *trace);
     }
   }
 }
 
-
 void ObjectCacher::trim()
 {
   ceph_assert(ceph_mutex_is_locked(lock));
@@ -1944,6 +1947,7 @@ int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
 void ObjectCacher::flusher_entry()
 {
   ldout(cct, 10) << "flusher start" << dendl;
+  int target_dirty_bh = target_dirty >> BUFFER_MEMORY_WEIGHT;
   std::unique_lock l{lock};
   while (!flusher_stop) {
     loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
@@ -1957,7 +1961,6 @@ void ObjectCacher::flusher_entry()
                   << target_dirty << " target, "
                   << max_dirty << " max)"
                   << dendl;
-    loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
 
     ZTracer::Trace trace;
     if (cct->_conf->osdc_blkin_trace_all) {
@@ -1965,12 +1968,20 @@ void ObjectCacher::flusher_entry()
       trace.event("start");
     }
 
+    loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
+    int actual_bhs = dirty_or_tx_bh.size() + get_stat_nr_dirty_waiters();
     if (actual > 0 && (uint64_t) actual > target_dirty) {
       // flush some dirty pages
       ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
                     << get_stat_dirty_waiting() << " dirty_waiting > target "
                     << target_dirty << ", flushing some dirty bhs" << dendl;
       flush(&trace, actual - target_dirty);
+    } else if (actual_bhs > target_dirty_bh) {
+      ldout(cct, 10) << "flusher " << dirty_or_tx_bh.size() << " dirty/tx bh + "
+                     << get_stat_nr_dirty_waiters() << " dirty_waiters > "
+                     << "target dirty bh " << target_dirty_bh
+                     << ", flushing some dirty bhs" << dendl;
+      flush(&trace, 0, actual_bhs - target_dirty_bh);
     } else {
       // check tail of lru for old dirty items
       ceph::real_time cutoff = ceph::real_clock::now();
index 68f796c32832c962fc1a5efb92f9f462e0562a71..75d573eb391fc843cd482fdc6358c41bca004b0a 100644 (file)
@@ -541,7 +541,7 @@ class ObjectCacher {
                            int64_t *amount, int *max_count);
 
   void trim();
-  void flush(ZTracer::Trace *trace, loff_t amount=0);
+  void flush(ZTracer::Trace *trace, loff_t amount=0, int max_bhs=0);
 
   /**
    * flush a range of buffers
index 297c2672c638786fbb0e00c3aba8c9fc3f830237..264eb1712ca30d2847e0319c6f9d0febd4c2c120 100644 (file)
@@ -11,3 +11,16 @@ target_link_libraries(ceph_test_objectcacher_stress
   )
 install(TARGETS ceph_test_objectcacher_stress
   DESTINATION ${CMAKE_INSTALL_BINDIR})
+
+add_executable(ceph_test_objectcacher_misc
+  object_cacher_misc.cc
+  MemWriteback.cc
+  )
+target_link_libraries(ceph_test_objectcacher_misc
+  osdc
+  global
+  ${EXTRALIBS}
+  ${CMAKE_DL_LIBS}
+  )
+install(TARGETS ceph_test_objectcacher_misc
+  DESTINATION ${CMAKE_INSTALL_BINDIR})
diff --git a/src/test/osdc/object_cacher_misc.cc b/src/test/osdc/object_cacher_misc.cc
new file mode 100644 (file)
index 0000000..3ebd98f
--- /dev/null
@@ -0,0 +1,151 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <cstdlib>
+#include <ctime>
+#include <sstream>
+#include <string>
+#include <vector>
+#include <boost/scoped_ptr.hpp>
+
+#include "common/ceph_argparse.h"
+#include "common/ceph_mutex.h"
+#include "common/common_init.h"
+#include "common/config.h"
+#include "common/snap_types.h"
+#include "global/global_init.h"
+#include "include/buffer.h"
+#include "include/Context.h"
+#include "include/stringify.h"
+#include "osdc/ObjectCacher.h"
+
+#include "FakeWriteback.h"
+#include "MemWriteback.h"
+
+#include <atomic>
+
+using namespace std;
+
+int flush_test()
+{
+  bool fail = false;
+  bool done = false;
+  uint64_t delay_ns = 0;
+  ceph::mutex lock = ceph::make_mutex("object_cacher_misc");
+  MemWriteback writeback(g_ceph_context, &lock, delay_ns);
+
+  int max_dirty_age = 1;
+  uint64_t max_cache = 1 << 20; // max cache size, 1MB
+  uint64_t max_dirty = 1 << 19; // max dirty, 512KB
+  uint64_t target_dirty = 1 << 18; // target dirty, 256KB
+
+  int bl_size = 1 << 12;
+  ceph::_page_shift = 16; // 64KB
+  int max_dirty_bhs = max_dirty / (1 << ceph::_page_shift); // 8
+
+  std::cout << "Test configuration:\n"
+      << setw(20) << "max_cache: " << max_cache << "\n"
+      << setw(20) << "max_dirty_age: " << max_dirty_age << "\n"
+      << setw(20) << "max_dirty: " << max_dirty << "\n"
+      << setw(20) << "ceph::_page_shift: " << ceph::_page_shift << "\n"
+      << setw(20) << "max_dirty_bh: " << max_dirty_bhs << "\n"
+      << setw(20) << "write extent size: " << bl_size << "\n\n";
+
+  ObjectCacher obc(g_ceph_context, "test", writeback, lock, NULL, NULL,
+                  max_cache, // max cache size, 1MB
+                  1, // max objects, just one
+                  max_dirty, // max dirty, 512KB
+                  target_dirty, // target dirty, 256KB
+                  max_dirty_age,
+                  true);
+  obc.start();
+
+  SnapContext snapc;
+  ceph_tid_t journal_tid = 0;
+  std::string oid("flush_test_obj");
+  ObjectCacher::ObjectSet object_set(NULL, 0, 0);
+  ceph::bufferlist zeroes_bl;
+  zeroes_bl.append_zero(bl_size);
+
+  std::map<int, C_SaferCond> create_finishers;
+
+  utime_t last_start;
+  for (int i = 0; i < max_dirty_bhs; ++i) {
+    if (i == (max_dirty_bhs - 1)) {
+      last_start = ceph_clock_now();
+    }
+    ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, zeroes_bl,
+                                                  ceph::real_clock::zero(), 0,
+                                                  ++journal_tid);
+    ObjectExtent extent(oid, 0, zeroes_bl.length()*i, zeroes_bl.length(), 0);
+    extent.oloc.pool = 0;
+    extent.buffer_extents.push_back(make_pair(0, bl_size));
+    wr->extents.push_back(extent);
+    lock.lock();
+    obc.writex(wr, &object_set, &create_finishers[i]);
+    lock.unlock();
+  }
+  utime_t last_end = ceph_clock_now();
+
+  std::cout << "Write " << max_dirty_bhs << " extents"
+      << ", total size " << zeroes_bl.length() * max_dirty_bhs
+      << ", attain max dirty bufferheads " << max_dirty_bhs
+      << ", but below max dirty " << max_dirty << std::endl;
+
+  if (last_end - last_start > utime_t(max_dirty_age, 0)) {
+    std::cout << "Error: the last writex took more than " << max_dirty_age
+        << "s(max_dirty_age), fail to trigger flush" << std::endl;
+    fail = true;;
+  } else {
+    std::cout << "Info: the last writex took " << last_end - last_start
+        << ", success to trigger flush" << std::endl;
+  }
+
+  for (int i = 0; i < max_dirty_bhs; ++i) {
+    create_finishers[i].wait();
+  }
+
+  lock.lock();
+  C_SaferCond flushcond;
+  obc.flush_all(&flushcond);
+  done = obc.flush_all(&flushcond);
+  if (!done) {
+    lock.unlock();
+    flushcond.wait();
+    lock.lock();
+  }
+
+  obc.release_set(&object_set);
+  lock.unlock();
+  obc.stop();
+
+  if (fail) {
+    std::cout << "Test ObjectCacher flush completed failed" << std::endl;
+    return EXIT_FAILURE;
+  }
+
+  std::cout << "Test ObjectCacher flush completed successfully" << std::endl;
+  return EXIT_SUCCESS;
+}
+
+int main(int argc, const char **argv)
+{
+  auto args = argv_to_vec(argc, argv);
+  auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
+                        CODE_ENVIRONMENT_UTILITY,
+                        CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+  bool flush = false;
+  std::vector<const char*>::iterator i;
+  for (i = args.begin(); i != args.end();) {
+    if (ceph_argparse_flag(args, i, "--flush-test", NULL)) {
+      flush = true;
+    } else {
+      cerr << "unknown option " << *i << std::endl;
+      return EXIT_FAILURE;
+    }
+  }
+
+  if (flush) {
+    return flush_test();
+  }
+}