#include "rocksdb/utilities/convenience.h"
using std::string;
#include "common/perf_counters.h"
+#include "include/str_map.h"
#include "KeyValueDB.h"
#include "RocksDBStore.h"
+int string2bool(string val, bool &b_val)
+{
+ if (strcasecmp(val.c_str(), "false") == 0) {
+ b_val = false;
+ return 0;
+ } else if (strcasecmp(val.c_str(), "true") == 0) {
+ b_val = true;
+ return 0;
+ } else {
+ std::string err;
+ int b = strict_strtol(val.c_str(), 10, &err);
+ if (!err.empty())
+ return -EINVAL;
+ b_val = !!b;
+ return 0;
+ }
+}
+
+int RocksDBStore::tryInterpret(const string key, const string val, rocksdb::Options &opt)
+{
+ if (key == "compaction_threads") {
+ std::string err;
+ int f = strict_sistrtoll(val.c_str(), &err);
+ if (!err.empty())
+ return -EINVAL;
+ //Low priority threadpool is used for compaction
+ opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
+ } else if (key == "flusher_threads") {
+ std::string err;
+ int f = strict_sistrtoll(val.c_str(), &err);
+ if (!err.empty())
+ return -EINVAL;
+ //High priority threadpool is used for flusher
+ opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
+ } else if (key == "compact_on_mount") {
+ int ret = string2bool(val, compact_on_mount);
+ if (ret != 0)
+ return ret;
+ } else if (key == "disableWAL") {
+ int ret = string2bool(val, disableWAL);
+ if (ret != 0)
+ return ret;
+ } else {
+ //unrecognize config options.
+ return -EINVAL;
+ }
+ return 0;
+}
+
+int RocksDBStore::ParseOptionsFromString(const string opt_str, rocksdb::Options &opt)
+{
+ map<string, string> str_map;
+ int r = get_str_map(opt_str, "\n;", &str_map);
+ if (r < 0)
+ return r;
+ map<string, string>::iterator it;
+ for(it = str_map.begin(); it != str_map.end(); it++) {
+ string this_opt = it->first + "=" + it->second;
+ rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt);
+ if (!status.ok()) {
+ //unrecognized by rocksdb, try to interpret by ourselves.
+ r = tryInterpret(it->first, it->second, opt);
+ if (r < 0) {
+ derr << status.ToString() << dendl;
+ return -EINVAL;
+ }
+ }
+ }
+ return 0;
+}
int RocksDBStore::init(string _options_str)
{
options_str = _options_str;
- //try parse options
rocksdb::Options opt;
- rocksdb::Status status = rocksdb::GetOptionsFromString(opt, options_str, &opt);
- if (!status.ok()) {
- derr << status.ToString() << dendl;
+ //try parse options
+ int r = ParseOptionsFromString(options_str, opt);
+ if (r != 0) {
return -EINVAL;
}
return 0;
{
rocksdb::Options opt;
rocksdb::Status status;
- status = rocksdb::GetOptionsFromString(opt, options_str, &opt);
- if (!status.ok()) {
- derr << status.ToString() << dendl;
+
+ int r = ParseOptionsFromString(options_str, opt);
+ if (r != 0) {
return -EINVAL;
}
opt.create_if_missing = create_if_missing;
logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(logger);
- // if (g_conf->rocksdb_compact_on_mount) {
- // derr << "Compacting rocksdb store..." << dendl;
- //compact();
- //derr << "Finished compacting rocksdb store" << dendl;
- // }
+ if (compact_on_mount) {
+ derr << "Compacting rocksdb store..." << dendl;
+ compact();
+ derr << "Finished compacting rocksdb store" << dendl;
+ }
return 0;
}
RocksDBTransactionImpl * _t =
static_cast<RocksDBTransactionImpl *>(t.get());
rocksdb::WriteOptions woptions;
- //woptions.disableWAL = options.disableWAL;
+ woptions.disableWAL = disableWAL;
rocksdb::Status s = db->Write(woptions, _t->bat);
utime_t lat = ceph_clock_now(g_ceph_context) - start;
logger->inc(l_rocksdb_txns);
static_cast<RocksDBTransactionImpl *>(t.get());
rocksdb::WriteOptions woptions;
woptions.sync = true;
- //woptions.disableWAL = options.disableWAL;
+ woptions.disableWAL = disableWAL;
rocksdb::Status s = db->Write(woptions, _t->bat);
utime_t lat = ceph_clock_now(g_ceph_context) - start;
logger->inc(l_rocksdb_txns);
class Slice;
class WriteBatch;
class Iterator;
+ struct Options;
}
/**
* Uses RocksDB to implement the KeyValueDB interface
CephContext *cct;
PerfCounters *logger;
string path;
- string options_str;
rocksdb::DB *db;
-
+ string options_str;
int do_open(ostream &out, bool create_if_missing);
// manage async compactions
public:
/// compact the underlying rocksdb store
+ bool compact_on_mount;
+ bool disableWAL;
void compact();
+ int tryInterpret(const string key, const string val, rocksdb::Options &opt);
+ int ParseOptionsFromString(const string opt_str, rocksdb::Options &opt);
static int _test_init(const string& dir);
int init(string options_str);
/// compact rocksdb for all keys with a given prefix
path(path),
compact_queue_lock("RocksDBStore::compact_thread_lock"),
compact_queue_stop(false),
- compact_thread(this)
+ compact_thread(this),
+ compact_on_mount(false),
+ disableWAL(false)
{}
~RocksDBStore();
#include <gtest/gtest.h>
+#include "include/Context.h"
+#include "common/ceph_argparse.h"
+#include "global/global_init.h"
#include "rocksdb/db.h"
-#include "rocksdb/utilities/convenience.h"
+#include "rocksdb/env.h"
+#include "rocksdb/thread_status.h"
+#include "os/RocksDBStore.h"
#include <iostream>
using namespace std;
+const string dir("store_test_temp_dir");
+
TEST(RocksDBOption, simple) {
rocksdb::Options options;
rocksdb::Status status;
+ RocksDBStore *db = new RocksDBStore(g_ceph_context, dir);
string options_string = ""
"write_buffer_size=536870912;"
"create_if_missing=true;"
"num_levels = 3;"
"compression = kNoCompression;"
"disable_data_sync = false;";
- status = rocksdb::GetOptionsFromString(options, options_string, &options);
- if (!status.ok()) {
- cerr << status.ToString() << std::endl;
- ASSERT_TRUE(status.ok());
- }
- ASSERT_TRUE(status.ok());
+ int r = db->ParseOptionsFromString(options_string, options);
+ ASSERT_EQ(0, r);
ASSERT_EQ(536870912, options.write_buffer_size);
ASSERT_EQ(4, options.max_write_buffer_number);
ASSERT_EQ(4, options.max_background_compactions);
ASSERT_FALSE(options.disableDataSync);
// ASSERT_EQ("none", options.compression);
}
+TEST(RocksDBOption, interpret) {
+ rocksdb::Options options;
+ rocksdb::Status status;
+ RocksDBStore *db = new RocksDBStore(g_ceph_context, dir);
+ string options_string = "compact_on_mount = true; compaction_threads=10;flusher_threads=5;";
+
+ int r = db->ParseOptionsFromString(options_string, options);
+ ASSERT_EQ(0, r);
+ ASSERT_TRUE(db->compact_on_mount);
+ //check thread pool setting
+ options.env->SleepForMicroseconds(100000);
+ std::vector<rocksdb::ThreadStatus> thread_list;
+ status = options.env->GetThreadList(&thread_list);
+ ASSERT_TRUE(status.ok());
+
+ int num_high_pri_threads = 0;
+ int num_low_pri_threads = 0;
+ for (vector<rocksdb::ThreadStatus>::iterator it = thread_list.begin();
+ it!= thread_list.end();
+ it++) {
+ if (it->thread_type == rocksdb::ThreadStatus::HIGH_PRIORITY)
+ num_high_pri_threads++;
+ if (it->thread_type == rocksdb::ThreadStatus::LOW_PRIORITY)
+ num_low_pri_threads++;
+ }
+ ASSERT_EQ(15, thread_list.size());
+ //low pri threads is compaction_threads
+ ASSERT_EQ(10, num_low_pri_threads);
+ //high pri threads is flusher_threads
+ ASSERT_EQ(5, num_high_pri_threads);
+}
int main(int argc, char **argv) {
+ vector<const char*> args;
+ argv_to_vec(argc, (const char **)argv, args);
+ env_to_vec(args);
+ global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
+ common_init_finish(g_ceph_context);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}