From 9754764ca2e7dad0c72c68cc310fc836e5bb96e0 Mon Sep 17 00:00:00 2001 From: Xiaoxi Chen Date: Mon, 11 May 2015 23:30:22 +0800 Subject: [PATCH] os/RocksDBStore: interpret some configurations. compact_on_mount, compaction_threads, flusher_threads, disableWAL. Signed-off-by: Xiaoxi Chen --- src/os/RocksDBStore.cc | 98 ++++++++++++++++--- src/os/RocksDBStore.h | 12 ++- .../objectstore/TestRocksdbOptionParse.cc | 54 ++++++++-- 3 files changed, 140 insertions(+), 24 deletions(-) diff --git a/src/os/RocksDBStore.cc b/src/os/RocksDBStore.cc index 4c600d8be4b97..cc06bd36a9a8e 100644 --- a/src/os/RocksDBStore.cc +++ b/src/os/RocksDBStore.cc @@ -17,18 +17,88 @@ #include "rocksdb/utilities/convenience.h" using std::string; #include "common/perf_counters.h" +#include "include/str_map.h" #include "KeyValueDB.h" #include "RocksDBStore.h" +int string2bool(string val, bool &b_val) +{ + if (strcasecmp(val.c_str(), "false") == 0) { + b_val = false; + return 0; + } else if (strcasecmp(val.c_str(), "true") == 0) { + b_val = true; + return 0; + } else { + std::string err; + int b = strict_strtol(val.c_str(), 10, &err); + if (!err.empty()) + return -EINVAL; + b_val = !!b; + return 0; + } +} + +int RocksDBStore::tryInterpret(const string key, const string val, rocksdb::Options &opt) +{ + if (key == "compaction_threads") { + std::string err; + int f = strict_sistrtoll(val.c_str(), &err); + if (!err.empty()) + return -EINVAL; + //Low priority threadpool is used for compaction + opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW); + } else if (key == "flusher_threads") { + std::string err; + int f = strict_sistrtoll(val.c_str(), &err); + if (!err.empty()) + return -EINVAL; + //High priority threadpool is used for flusher + opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH); + } else if (key == "compact_on_mount") { + int ret = string2bool(val, compact_on_mount); + if (ret != 0) + return ret; + } else if (key == "disableWAL") { + int ret = string2bool(val, disableWAL); + if (ret != 0) + return ret; + } else { + //unrecognize config options. + return -EINVAL; + } + return 0; +} + +int RocksDBStore::ParseOptionsFromString(const string opt_str, rocksdb::Options &opt) +{ + map str_map; + int r = get_str_map(opt_str, "\n;", &str_map); + if (r < 0) + return r; + map::iterator it; + for(it = str_map.begin(); it != str_map.end(); it++) { + string this_opt = it->first + "=" + it->second; + rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt); + if (!status.ok()) { + //unrecognized by rocksdb, try to interpret by ourselves. + r = tryInterpret(it->first, it->second, opt); + if (r < 0) { + derr << status.ToString() << dendl; + return -EINVAL; + } + } + } + return 0; +} int RocksDBStore::init(string _options_str) { options_str = _options_str; - //try parse options rocksdb::Options opt; - rocksdb::Status status = rocksdb::GetOptionsFromString(opt, options_str, &opt); - if (!status.ok()) { - derr << status.ToString() << dendl; + //try parse options + int r = ParseOptionsFromString(options_str, opt); + if (r != 0) { return -EINVAL; } return 0; @@ -38,9 +108,9 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing) { rocksdb::Options opt; rocksdb::Status status; - status = rocksdb::GetOptionsFromString(opt, options_str, &opt); - if (!status.ok()) { - derr << status.ToString() << dendl; + + int r = ParseOptionsFromString(options_str, opt); + if (r != 0) { return -EINVAL; } opt.create_if_missing = create_if_missing; @@ -64,11 +134,11 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing) logger = plb.create_perf_counters(); cct->get_perfcounters_collection()->add(logger); - // if (g_conf->rocksdb_compact_on_mount) { - // derr << "Compacting rocksdb store..." << dendl; - //compact(); - //derr << "Finished compacting rocksdb store" << dendl; - // } + if (compact_on_mount) { + derr << "Compacting rocksdb store..." << dendl; + compact(); + derr << "Finished compacting rocksdb store" << dendl; + } return 0; } @@ -114,7 +184,7 @@ int RocksDBStore::submit_transaction(KeyValueDB::Transaction t) RocksDBTransactionImpl * _t = static_cast(t.get()); rocksdb::WriteOptions woptions; - //woptions.disableWAL = options.disableWAL; + woptions.disableWAL = disableWAL; rocksdb::Status s = db->Write(woptions, _t->bat); utime_t lat = ceph_clock_now(g_ceph_context) - start; logger->inc(l_rocksdb_txns); @@ -129,7 +199,7 @@ int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t) static_cast(t.get()); rocksdb::WriteOptions woptions; woptions.sync = true; - //woptions.disableWAL = options.disableWAL; + woptions.disableWAL = disableWAL; rocksdb::Status s = db->Write(woptions, _t->bat); utime_t lat = ceph_clock_now(g_ceph_context) - start; logger->inc(l_rocksdb_txns); diff --git a/src/os/RocksDBStore.h b/src/os/RocksDBStore.h index 0204b1a8125a5..e44977f3e46bb 100644 --- a/src/os/RocksDBStore.h +++ b/src/os/RocksDBStore.h @@ -43,6 +43,7 @@ namespace rocksdb{ class Slice; class WriteBatch; class Iterator; + struct Options; } /** * Uses RocksDB to implement the KeyValueDB interface @@ -51,9 +52,8 @@ class RocksDBStore : public KeyValueDB { CephContext *cct; PerfCounters *logger; string path; - string options_str; rocksdb::DB *db; - + string options_str; int do_open(ostream &out, bool create_if_missing); // manage async compactions @@ -79,8 +79,12 @@ class RocksDBStore : public KeyValueDB { public: /// compact the underlying rocksdb store + bool compact_on_mount; + bool disableWAL; void compact(); + int tryInterpret(const string key, const string val, rocksdb::Options &opt); + int ParseOptionsFromString(const string opt_str, rocksdb::Options &opt); static int _test_init(const string& dir); int init(string options_str); /// compact rocksdb for all keys with a given prefix @@ -105,7 +109,9 @@ public: path(path), compact_queue_lock("RocksDBStore::compact_thread_lock"), compact_queue_stop(false), - compact_thread(this) + compact_thread(this), + compact_on_mount(false), + disableWAL(false) {} ~RocksDBStore(); diff --git a/src/test/objectstore/TestRocksdbOptionParse.cc b/src/test/objectstore/TestRocksdbOptionParse.cc index de7a7b3142896..4daf970e670cb 100644 --- a/src/test/objectstore/TestRocksdbOptionParse.cc +++ b/src/test/objectstore/TestRocksdbOptionParse.cc @@ -1,12 +1,20 @@ #include +#include "include/Context.h" +#include "common/ceph_argparse.h" +#include "global/global_init.h" #include "rocksdb/db.h" -#include "rocksdb/utilities/convenience.h" +#include "rocksdb/env.h" +#include "rocksdb/thread_status.h" +#include "os/RocksDBStore.h" #include using namespace std; +const string dir("store_test_temp_dir"); + TEST(RocksDBOption, simple) { rocksdb::Options options; rocksdb::Status status; + RocksDBStore *db = new RocksDBStore(g_ceph_context, dir); string options_string = "" "write_buffer_size=536870912;" "create_if_missing=true;" @@ -20,12 +28,8 @@ TEST(RocksDBOption, simple) { "num_levels = 3;" "compression = kNoCompression;" "disable_data_sync = false;"; - status = rocksdb::GetOptionsFromString(options, options_string, &options); - if (!status.ok()) { - cerr << status.ToString() << std::endl; - ASSERT_TRUE(status.ok()); - } - ASSERT_TRUE(status.ok()); + int r = db->ParseOptionsFromString(options_string, options); + ASSERT_EQ(0, r); ASSERT_EQ(536870912, options.write_buffer_size); ASSERT_EQ(4, options.max_write_buffer_number); ASSERT_EQ(4, options.max_background_compactions); @@ -38,8 +42,44 @@ TEST(RocksDBOption, simple) { ASSERT_FALSE(options.disableDataSync); // ASSERT_EQ("none", options.compression); } +TEST(RocksDBOption, interpret) { + rocksdb::Options options; + rocksdb::Status status; + RocksDBStore *db = new RocksDBStore(g_ceph_context, dir); + string options_string = "compact_on_mount = true; compaction_threads=10;flusher_threads=5;"; + + int r = db->ParseOptionsFromString(options_string, options); + ASSERT_EQ(0, r); + ASSERT_TRUE(db->compact_on_mount); + //check thread pool setting + options.env->SleepForMicroseconds(100000); + std::vector thread_list; + status = options.env->GetThreadList(&thread_list); + ASSERT_TRUE(status.ok()); + + int num_high_pri_threads = 0; + int num_low_pri_threads = 0; + for (vector::iterator it = thread_list.begin(); + it!= thread_list.end(); + it++) { + if (it->thread_type == rocksdb::ThreadStatus::HIGH_PRIORITY) + num_high_pri_threads++; + if (it->thread_type == rocksdb::ThreadStatus::LOW_PRIORITY) + num_low_pri_threads++; + } + ASSERT_EQ(15, thread_list.size()); + //low pri threads is compaction_threads + ASSERT_EQ(10, num_low_pri_threads); + //high pri threads is flusher_threads + ASSERT_EQ(5, num_high_pri_threads); +} int main(int argc, char **argv) { + vector args; + argv_to_vec(argc, (const char **)argv, args); + env_to_vec(args); + global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); + common_init_finish(g_ceph_context); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } -- 2.39.5