]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
fio_ceph_objectstore: add option to cycle throttle values during test
authorSamuel Just <sjust@redhat.com>
Fri, 26 Jul 2019 22:16:11 +0000 (15:16 -0700)
committerSamuel Just <sjust@redhat.com>
Fri, 27 Sep 2019 22:55:18 +0000 (15:55 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/test/fio/fio_ceph_objectstore.cc

index b60b117da954dc55f99022bcf7806ea07e3703b2..aabf0c22e80090c89cf3e33d712ecbb06a83c475 100644 (file)
@@ -19,6 +19,7 @@
 #include "include/intarith.h"
 #include "include/stringify.h"
 #include "include/random.h"
+#include "include/str_list.h"
 #include "common/perf_counters.h"
 #include "common/TracepointProvider.h"
 
@@ -38,7 +39,10 @@ struct Options {
   thread_data* td;
   char* conf;
   char* perf_output_file;
+  char* throttle_values;
+  char* deferred_throttle_values;
   unsigned long long
+    cycle_throttle_period,
     oi_attr_len_low,
     oi_attr_len_high,
     snapset_attr_len_low,
@@ -164,6 +168,31 @@ static std::vector<fio_option> ceph_options{
     o.off1   = offsetof(Options, check_files);
     o.def    = "0";
   }),
+  make_option([] (fio_option& o) {
+    o.name   = "bluestore_throttle";
+    o.lname  = "set bluestore throttle";
+    o.type   = FIO_OPT_STR_STORE;
+    o.help   = "comma delimited list of throttle values",
+    o.off1   = offsetof(Options, throttle_values);
+    o.def    = 0;
+  }),
+  make_option([] (fio_option& o) {
+    o.name   = "bluestore_deferred_throttle";
+    o.lname  = "set bluestore deferred throttle";
+    o.type   = FIO_OPT_STR_STORE;
+    o.help   = "comma delimited list of throttle values",
+    o.off1   = offsetof(Options, deferred_throttle_values);
+    o.def    = 0;
+  }),
+  make_option([] (fio_option& o) {
+    o.name   = "vary_bluestore_throttle_period";
+    o.lname = "period between different throttle values";
+    o.type   = FIO_OPT_STR_VAL;
+    o.help = "set to non-zero value to periodically cycle through throttle options";
+    o.off1   = offsetof(Options, cycle_throttle_period);
+    o.def    = "0";
+    o.minval = 0;
+  }),
   {} // fio expects a 'null'-terminated list
 };
 
@@ -441,6 +470,7 @@ struct Object {
 /// or just a client using its own objects from the shared pool
 struct Job {
   Engine* engine; //< shared ptr to the global Engine
+  const unsigned subjob_number; //< subjob num
   std::vector<Collection> collections; //< job's private collections to spread objects over
   std::vector<Object> objects; //< associate an object with each fio_file
   std::vector<io_u*> events; //< completions for fio_ceph_os_event()
@@ -448,6 +478,24 @@ struct Job {
 
   bufferptr one_for_all_data; //< preallocated buffer long enough
                               //< to use for vairious operations
+  std::mutex throttle_lock;
+  const vector<unsigned> throttle_values;
+  const vector<unsigned> deferred_throttle_values;
+  std::chrono::duration<double> cycle_throttle_period;
+  mono_clock::time_point last = ceph::mono_clock::zero();
+  unsigned index = 0;
+
+  static vector<unsigned> parse_throttle_str(const char *p) {
+    vector<unsigned> ret;
+
+    ceph::for_each_substr(p, ",\"", [&ret] (auto &&s) mutable {
+      if (s.size() > 0) {
+       ret.push_back(std::stoul(std::string(s)));
+      }
+    });
+    return ret;
+  }
+  void check_throttle();
 
   Job(Engine* engine, const thread_data* td);
   ~Job();
@@ -455,8 +503,15 @@ struct Job {
 
 Job::Job(Engine* engine, const thread_data* td)
   : engine(engine),
+    subjob_number(td->subjob_number),
     events(td->o.iodepth),
-    unlink(td->o.unlink)
+    unlink(td->o.unlink),
+    throttle_values(
+      parse_throttle_str(static_cast<Options*>(td->eo)->throttle_values)),
+    deferred_throttle_values(
+      parse_throttle_str(static_cast<Options*>(td->eo)->deferred_throttle_values)),
+    cycle_throttle_period(
+      static_cast<Options*>(td->eo)->cycle_throttle_period)
 {
   engine->ref();
   auto o = static_cast<Options*>(td->eo);
@@ -552,6 +607,45 @@ Job::~Job()
   engine->deref();
 }
 
+void Job::check_throttle()
+{
+  if (subjob_number != 0)
+    return;
+
+  std::lock_guard<std::mutex> l(throttle_lock);
+  if (throttle_values.empty() && deferred_throttle_values.empty())
+    return;
+
+  if (ceph::mono_clock::is_zero(last) ||
+      ((cycle_throttle_period != cycle_throttle_period.zero()) &&
+       (ceph::mono_clock::now() - last) > cycle_throttle_period)) {
+    unsigned tvals = throttle_values.size() ? throttle_values.size() : 1;
+    unsigned dtvals = deferred_throttle_values.size() ? deferred_throttle_values.size() : 1;
+    if (!throttle_values.empty()) {
+      std::string val = std::to_string(throttle_values[index % tvals]);
+      std::cerr << "Setting bluestore_throttle_bytes to " << val << std::endl;
+      int r = engine->cct->_conf.set_val(
+       "bluestore_throttle_bytes",
+       val,
+       nullptr);
+      ceph_assert(r == 0);
+    }
+    if (!deferred_throttle_values.empty()) {
+      std::string val = std::to_string(deferred_throttle_values[(index / tvals) % dtvals]);
+      std::cerr << "Setting bluestore_deferred_throttle_bytes to " << val << std::endl;
+      int r = engine->cct->_conf.set_val(
+       "bluestore_throttle_deferred_bytes",
+       val,
+       nullptr);
+      ceph_assert(r == 0);
+    }
+    engine->cct->_conf.apply_changes(nullptr);
+    index++;
+    index %= tvals * dtvals;
+    last = ceph::mono_clock::now();
+  }
+}
+
 int fio_ceph_os_setup(thread_data* td)
 {
   // if there are multiple jobs, they must run in the same process against a
@@ -637,6 +731,8 @@ enum fio_q_status fio_ceph_os_queue(thread_data* td, io_u* u)
   auto& coll = object.coll;
   auto& os = job->engine->os;
 
+  job->check_throttle();
+
   if (u->ddir == DDIR_WRITE) {
     // provide a hint if we're likely to read this data back
     const int flags = td_rw(td) ? CEPH_OSD_OP_FLAG_FADVISE_WILLNEED : 0;