]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
os/RocksDBStore: interpret some configurations.
authorXiaoxi Chen <xiaoxi.chen@intel.com>
Mon, 11 May 2015 15:30:22 +0000 (23:30 +0800)
committerXiaoxi Chen <xiaoxi.chen@intel.com>
Wed, 13 May 2015 00:25:53 +0000 (08:25 +0800)
compact_on_mount,
compaction_threads,
flusher_threads,
disableWAL.

Signed-off-by: Xiaoxi Chen <xiaoxi.chen@intel.com>
src/os/RocksDBStore.cc
src/os/RocksDBStore.h
src/test/objectstore/TestRocksdbOptionParse.cc

index 4c600d8be4b973d36bfed7f8935daa01dfd34c5b..cc06bd36a9a8e3f63d4656b84082b124b2385354 100644 (file)
 #include "rocksdb/utilities/convenience.h"
 using std::string;
 #include "common/perf_counters.h"
+#include "include/str_map.h"
 #include "KeyValueDB.h"
 #include "RocksDBStore.h"
 
+int string2bool(string val, bool &b_val)
+{
+  if (strcasecmp(val.c_str(), "false") == 0) {
+    b_val = false;
+    return 0;
+  } else if (strcasecmp(val.c_str(), "true") == 0) {
+    b_val = true;
+    return 0;
+  } else {
+    std::string err;
+    int b = strict_strtol(val.c_str(), 10, &err);
+    if (!err.empty())
+      return -EINVAL;
+    b_val = !!b;
+    return 0;
+  }
+}
+  
+int RocksDBStore::tryInterpret(const string key, const string val, rocksdb::Options &opt)
+{
+  if (key == "compaction_threads") {
+    std::string err;
+    int f = strict_sistrtoll(val.c_str(), &err);
+    if (!err.empty())
+      return -EINVAL;
+    //Low priority threadpool is used for compaction
+    opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
+  } else if (key == "flusher_threads") {
+    std::string err;
+    int f = strict_sistrtoll(val.c_str(), &err);
+    if (!err.empty())
+      return -EINVAL;
+    //High priority threadpool is used for flusher
+    opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
+  } else if (key == "compact_on_mount") {
+    int ret = string2bool(val, compact_on_mount);
+    if (ret != 0)
+      return ret;
+  } else if (key == "disableWAL") {
+    int ret = string2bool(val, disableWAL);
+    if (ret != 0)
+      return ret;
+  } else {
+    //unrecognize config options.
+    return -EINVAL;
+  }
+  return 0;
+}
+
+int RocksDBStore::ParseOptionsFromString(const string opt_str, rocksdb::Options &opt)
+{
+  map<string, string> str_map;
+  int r = get_str_map(opt_str, "\n;", &str_map);
+  if (r < 0)
+    return r;
+  map<string, string>::iterator it;
+  for(it = str_map.begin(); it != str_map.end(); it++) {
+    string this_opt = it->first + "=" + it->second;
+    rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt); 
+    if (!status.ok()) {
+      //unrecognized by rocksdb, try to interpret by ourselves.
+      r = tryInterpret(it->first, it->second, opt);
+      if (r < 0) {
+       derr << status.ToString() << dendl;
+       return -EINVAL;
+      }
+    }
+  }
+  return 0;
+}
 
 int RocksDBStore::init(string _options_str)
 {
   options_str = _options_str;
-  //try parse options
   rocksdb::Options opt;
-  rocksdb::Status status = rocksdb::GetOptionsFromString(opt, options_str, &opt); 
-  if (!status.ok()) {
-    derr << status.ToString() << dendl;
+  //try parse options
+  int r = ParseOptionsFromString(options_str, opt); 
+  if (r != 0) {
     return -EINVAL;
   }
   return 0;
@@ -38,9 +108,9 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing)
 {
   rocksdb::Options opt;
   rocksdb::Status status;
-  status = rocksdb::GetOptionsFromString(opt, options_str, &opt); 
-  if (!status.ok()) {
-    derr << status.ToString() << dendl;
+
+  int r = ParseOptionsFromString(options_str, opt); 
+  if (r != 0) {
     return -EINVAL;
   }
   opt.create_if_missing = create_if_missing;
@@ -64,11 +134,11 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing)
   logger = plb.create_perf_counters();
   cct->get_perfcounters_collection()->add(logger);
 
// if (g_conf->rocksdb_compact_on_mount) {
-   // derr << "Compacting rocksdb store..." << dendl;
-    //compact();
-    //derr << "Finished compacting rocksdb store" << dendl;
// }
 if (compact_on_mount) {
+    derr << "Compacting rocksdb store..." << dendl;
+    compact();
+    derr << "Finished compacting rocksdb store" << dendl;
+  }
   return 0;
 }
 
@@ -114,7 +184,7 @@ int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
   RocksDBTransactionImpl * _t =
     static_cast<RocksDBTransactionImpl *>(t.get());
   rocksdb::WriteOptions woptions;
-  //woptions.disableWAL = options.disableWAL;
+  woptions.disableWAL = disableWAL;
   rocksdb::Status s = db->Write(woptions, _t->bat);
   utime_t lat = ceph_clock_now(g_ceph_context) - start;
   logger->inc(l_rocksdb_txns);
@@ -129,7 +199,7 @@ int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
     static_cast<RocksDBTransactionImpl *>(t.get());
   rocksdb::WriteOptions woptions;
   woptions.sync = true;
-  //woptions.disableWAL = options.disableWAL;
+  woptions.disableWAL = disableWAL;
   rocksdb::Status s = db->Write(woptions, _t->bat);
   utime_t lat = ceph_clock_now(g_ceph_context) - start;
   logger->inc(l_rocksdb_txns);
index 0204b1a8125a5e0d98c957a59997b102e04ec471..e44977f3e46bbd7dc23cca4dfe4d4b6de6a52a14 100644 (file)
@@ -43,6 +43,7 @@ namespace rocksdb{
   class Slice;
   class WriteBatch;
   class Iterator;
+  struct Options;
 }
 /**
  * Uses RocksDB to implement the KeyValueDB interface
@@ -51,9 +52,8 @@ class RocksDBStore : public KeyValueDB {
   CephContext *cct;
   PerfCounters *logger;
   string path;
-  string options_str;
   rocksdb::DB *db;
-
+  string options_str;
   int do_open(ostream &out, bool create_if_missing);
 
   // manage async compactions
@@ -79,8 +79,12 @@ class RocksDBStore : public KeyValueDB {
 
 public:
   /// compact the underlying rocksdb store
+  bool compact_on_mount;
+  bool disableWAL;
   void compact();
 
+  int tryInterpret(const string key, const string val, rocksdb::Options &opt);
+  int ParseOptionsFromString(const string opt_str, rocksdb::Options &opt);
   static int _test_init(const string& dir);
   int init(string options_str);
   /// compact rocksdb for all keys with a given prefix
@@ -105,7 +109,9 @@ public:
     path(path),
     compact_queue_lock("RocksDBStore::compact_thread_lock"),
     compact_queue_stop(false),
-    compact_thread(this)
+    compact_thread(this),
+    compact_on_mount(false),
+    disableWAL(false)
   {}
 
   ~RocksDBStore();
index de7a7b3142896935d71099d5e49f51e2e9560018..4daf970e670cbf736105cf93fbb1ca86e2527b79 100644 (file)
@@ -1,12 +1,20 @@
 #include <gtest/gtest.h>
+#include "include/Context.h"
+#include "common/ceph_argparse.h"
+#include "global/global_init.h"
 #include "rocksdb/db.h"
-#include "rocksdb/utilities/convenience.h"
+#include "rocksdb/env.h"
+#include "rocksdb/thread_status.h"
+#include "os/RocksDBStore.h"
 #include <iostream>
 using namespace std;
 
+const string dir("store_test_temp_dir");
+
 TEST(RocksDBOption, simple) {
   rocksdb::Options options;
   rocksdb::Status status;
+  RocksDBStore *db = new RocksDBStore(g_ceph_context, dir);
   string options_string = ""
                          "write_buffer_size=536870912;"
                          "create_if_missing=true;"
@@ -20,12 +28,8 @@ TEST(RocksDBOption, simple) {
                          "num_levels = 3;"
                          "compression = kNoCompression;"
                          "disable_data_sync = false;";
-  status = rocksdb::GetOptionsFromString(options, options_string, &options);
-  if (!status.ok()) {
-    cerr << status.ToString() << std::endl;
-    ASSERT_TRUE(status.ok());
-  }
-  ASSERT_TRUE(status.ok());
+  int r = db->ParseOptionsFromString(options_string, options);
+  ASSERT_EQ(0, r);
   ASSERT_EQ(536870912, options.write_buffer_size);
   ASSERT_EQ(4, options.max_write_buffer_number);
   ASSERT_EQ(4, options.max_background_compactions);
@@ -38,8 +42,44 @@ TEST(RocksDBOption, simple) {
   ASSERT_FALSE(options.disableDataSync);
  // ASSERT_EQ("none", options.compression);
 }
+TEST(RocksDBOption, interpret) {
+  rocksdb::Options options;
+  rocksdb::Status status;
+  RocksDBStore *db = new RocksDBStore(g_ceph_context, dir);
+  string options_string = "compact_on_mount = true; compaction_threads=10;flusher_threads=5;";
+  
+  int r = db->ParseOptionsFromString(options_string, options);
+  ASSERT_EQ(0, r);
+  ASSERT_TRUE(db->compact_on_mount);
+  //check thread pool setting
+  options.env->SleepForMicroseconds(100000);
+  std::vector<rocksdb::ThreadStatus> thread_list;
+  status = options.env->GetThreadList(&thread_list);
+  ASSERT_TRUE(status.ok());
+
+  int num_high_pri_threads = 0;
+  int num_low_pri_threads = 0;
+  for (vector<rocksdb::ThreadStatus>::iterator it = thread_list.begin();
+       it!= thread_list.end();
+       it++) {
+    if (it->thread_type == rocksdb::ThreadStatus::HIGH_PRIORITY)
+      num_high_pri_threads++;
+    if (it->thread_type == rocksdb::ThreadStatus::LOW_PRIORITY)
+      num_low_pri_threads++;
+  }
+  ASSERT_EQ(15, thread_list.size());
+  //low pri threads is compaction_threads
+  ASSERT_EQ(10, num_low_pri_threads);
+  //high pri threads is flusher_threads
+  ASSERT_EQ(5, num_high_pri_threads);
+}
 
 int main(int argc, char **argv) {
+  vector<const char*> args;
+  argv_to_vec(argc, (const char **)argv, args);
+  env_to_vec(args);
+  global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
+  common_init_finish(g_ceph_context);
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();
 }