#include "include/intarith.h"
#include "include/stringify.h"
#include "include/random.h"
+#include "include/str_list.h"
#include "common/perf_counters.h"
#include "common/TracepointProvider.h"
thread_data* td;
char* conf;
char* perf_output_file;
+ char* throttle_values;
+ char* deferred_throttle_values;
unsigned long long
+ cycle_throttle_period,
oi_attr_len_low,
oi_attr_len_high,
snapset_attr_len_low,
o.off1 = offsetof(Options, check_files);
o.def = "0";
}),
+ make_option([] (fio_option& o) {
+ o.name = "bluestore_throttle";
+ o.lname = "set bluestore throttle";
+ o.type = FIO_OPT_STR_STORE;
+ o.help = "comma delimited list of throttle values",
+ o.off1 = offsetof(Options, throttle_values);
+ o.def = 0;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "bluestore_deferred_throttle";
+ o.lname = "set bluestore deferred throttle";
+ o.type = FIO_OPT_STR_STORE;
+ o.help = "comma delimited list of throttle values",
+ o.off1 = offsetof(Options, deferred_throttle_values);
+ o.def = 0;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "vary_bluestore_throttle_period";
+ o.lname = "period between different throttle values";
+ o.type = FIO_OPT_STR_VAL;
+ o.help = "set to non-zero value to periodically cycle through throttle options";
+ o.off1 = offsetof(Options, cycle_throttle_period);
+ o.def = "0";
+ o.minval = 0;
+ }),
{} // fio expects a 'null'-terminated list
};
/// or just a client using its own objects from the shared pool
struct Job {
Engine* engine; //< shared ptr to the global Engine
+ const unsigned subjob_number; //< subjob num
std::vector<Collection> collections; //< job's private collections to spread objects over
std::vector<Object> objects; //< associate an object with each fio_file
std::vector<io_u*> events; //< completions for fio_ceph_os_event()
bufferptr one_for_all_data; //< preallocated buffer long enough
//< to use for vairious operations
+ std::mutex throttle_lock;
+ const vector<unsigned> throttle_values;
+ const vector<unsigned> deferred_throttle_values;
+ std::chrono::duration<double> cycle_throttle_period;
+ mono_clock::time_point last = ceph::mono_clock::zero();
+ unsigned index = 0;
+
+ static vector<unsigned> parse_throttle_str(const char *p) {
+ vector<unsigned> ret;
+
+ ceph::for_each_substr(p, ",\"", [&ret] (auto &&s) mutable {
+ if (s.size() > 0) {
+ ret.push_back(std::stoul(std::string(s)));
+ }
+ });
+ return ret;
+ }
+ void check_throttle();
Job(Engine* engine, const thread_data* td);
~Job();
Job::Job(Engine* engine, const thread_data* td)
: engine(engine),
+ subjob_number(td->subjob_number),
events(td->o.iodepth),
- unlink(td->o.unlink)
+ unlink(td->o.unlink),
+ throttle_values(
+ parse_throttle_str(static_cast<Options*>(td->eo)->throttle_values)),
+ deferred_throttle_values(
+ parse_throttle_str(static_cast<Options*>(td->eo)->deferred_throttle_values)),
+ cycle_throttle_period(
+ static_cast<Options*>(td->eo)->cycle_throttle_period)
{
engine->ref();
auto o = static_cast<Options*>(td->eo);
engine->deref();
}
+void Job::check_throttle()
+{
+ if (subjob_number != 0)
+ return;
+
+ std::lock_guard<std::mutex> l(throttle_lock);
+ if (throttle_values.empty() && deferred_throttle_values.empty())
+ return;
+
+ if (ceph::mono_clock::is_zero(last) ||
+ ((cycle_throttle_period != cycle_throttle_period.zero()) &&
+ (ceph::mono_clock::now() - last) > cycle_throttle_period)) {
+ unsigned tvals = throttle_values.size() ? throttle_values.size() : 1;
+ unsigned dtvals = deferred_throttle_values.size() ? deferred_throttle_values.size() : 1;
+ if (!throttle_values.empty()) {
+ std::string val = std::to_string(throttle_values[index % tvals]);
+ std::cerr << "Setting bluestore_throttle_bytes to " << val << std::endl;
+ int r = engine->cct->_conf.set_val(
+ "bluestore_throttle_bytes",
+ val,
+ nullptr);
+ ceph_assert(r == 0);
+ }
+ if (!deferred_throttle_values.empty()) {
+ std::string val = std::to_string(deferred_throttle_values[(index / tvals) % dtvals]);
+ std::cerr << "Setting bluestore_deferred_throttle_bytes to " << val << std::endl;
+ int r = engine->cct->_conf.set_val(
+ "bluestore_throttle_deferred_bytes",
+ val,
+ nullptr);
+ ceph_assert(r == 0);
+ }
+ engine->cct->_conf.apply_changes(nullptr);
+ index++;
+ index %= tvals * dtvals;
+ last = ceph::mono_clock::now();
+ }
+}
+
int fio_ceph_os_setup(thread_data* td)
{
// if there are multiple jobs, they must run in the same process against a
auto& coll = object.coll;
auto& os = job->engine->os;
+ job->check_throttle();
+
if (u->ddir == DDIR_WRITE) {
// provide a hint if we're likely to read this data back
const int flags = td_rw(td) ? CEPH_OSD_OP_FLAG_FADVISE_WILLNEED : 0;