#include "rocksdb/slice.h"
#include "rocksdb/cache.h"
#include "rocksdb/filter_policy.h"
-
+#include "rocksdb/utilities/convenience.h"
using std::string;
#include "common/perf_counters.h"
#include "KeyValueDB.h"
#include "RocksDBStore.h"
-int RocksDBStore::init()
-{
- options.write_buffer_size = g_conf->rocksdb_write_buffer_size;
- options.write_buffer_num = g_conf->rocksdb_write_buffer_num;
- options.min_write_buffer_number_to_merge = g_conf->rocksdb_min_write_buffer_number_to_merge;
-
- options.level0_file_num_compaction_trigger = g_conf->rocksdb_level0_file_num_compaction_trigger;
- options.level0_slowdown_writes_trigger = g_conf->rocksdb_level0_slowdown_writes_trigger;
- options.level0_stop_writes_trigger = g_conf->rocksdb_level0_stop_writes_trigger;
-
- options.max_bytes_for_level_base = g_conf->rocksdb_max_bytes_for_level_base;
- options.max_bytes_for_level_multiplier = g_conf->rocksdb_max_bytes_for_level_multiplier;
- options.target_file_size_base = g_conf->rocksdb_target_file_size_base;
- options.target_file_size_multiplier = g_conf->rocksdb_target_file_size_multiplier;
- options.num_levels = g_conf->rocksdb_num_levels;
- options.cache_size = g_conf->rocksdb_cache_size;
- options.block_size = g_conf->rocksdb_block_size;
- options.bloom_bits_per_key = g_conf->rocksdb_bloom_bits_per_key;
-
- options.max_background_compactions = g_conf->rocksdb_max_background_compactions;
- options.compaction_threads = g_conf->rocksdb_compaction_threads;
- options.max_background_flushes = g_conf->rocksdb_max_background_flushes;
- options.flusher_threads = g_conf->rocksdb_flusher_threads;
-
- options.max_open_files = g_conf->rocksdb_max_open_files;
- options.compression_type = g_conf->rocksdb_compression;
- options.paranoid_checks = g_conf->rocksdb_paranoid;
- options.log_file = g_conf->rocksdb_log;
- options.info_log_level = g_conf->rocksdb_info_log_level;
- options.wal_dir = g_conf->rocksdb_wal_dir;
- options.disableDataSync = g_conf->rocksdb_disableDataSync;
- options.disableWAL = g_conf->rocksdb_disableWAL;
+int RocksDBStore::init(string _options_str)
+{
+ options_str = _options_str;
+ //try parse options
+ rocksdb::Options opt;
+ rocksdb::Status status = rocksdb::GetOptionsFromString(opt, options_str, &opt);
+ if (!status.ok()) {
+ derr << status.ToString() << dendl;
+ return -EINVAL;
+ }
return 0;
}
int RocksDBStore::do_open(ostream &out, bool create_if_missing)
{
- rocksdb::Options ldoptions;
- rocksdb::BlockBasedTableOptions table_options;
- auto env = rocksdb::Env::Default();
-
- ldoptions.write_buffer_size = options.write_buffer_size;
- ldoptions.max_write_buffer_number = options.write_buffer_num;
- ldoptions.min_write_buffer_number_to_merge = options.min_write_buffer_number_to_merge;
-
- ldoptions.level0_file_num_compaction_trigger = options.level0_file_num_compaction_trigger;
- if(options.level0_slowdown_writes_trigger >= 0)
- ldoptions.level0_slowdown_writes_trigger = options.level0_slowdown_writes_trigger;
- if(options.level0_stop_writes_trigger >= 0)
- ldoptions.level0_stop_writes_trigger = options.level0_stop_writes_trigger;
-
- ldoptions.max_bytes_for_level_base = options.max_bytes_for_level_base;
- ldoptions.max_bytes_for_level_multiplier = options.max_bytes_for_level_multiplier;
- ldoptions.target_file_size_base = options.target_file_size_base;
- ldoptions.target_file_size_multiplier = options.target_file_size_multiplier;
- ldoptions.num_levels = options.num_levels;
- if (options.cache_size) {
- table_options.block_cache = rocksdb::NewLRUCache(options.cache_size);
- }
- table_options.block_size = options.block_size;
- table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(options.bloom_bits_per_key, true));
-
- ldoptions.max_background_compactions = options.max_background_compactions;
- ldoptions.max_background_flushes = options.max_background_flushes;
- //High priority threadpool is used for flusher
- env->SetBackgroundThreads(options.flusher_threads, rocksdb::Env::Priority::HIGH);
- //Low priority threadpool is used for compaction
- env->SetBackgroundThreads(options.compaction_threads, rocksdb::Env::Priority::LOW);
-
- ldoptions.max_open_files = options.max_open_files;
- if (options.compression_type.length() == 0)
- ldoptions.compression = rocksdb::kNoCompression;
- else if(options.compression_type == "snappy")
- ldoptions.compression = rocksdb::kSnappyCompression;
- else if(options.compression_type == "zlib")
- ldoptions.compression = rocksdb::kZlibCompression;
- else if(options.compression_type == "bzip2")
- ldoptions.compression = rocksdb::kBZip2Compression;
- else
- ldoptions.compression = rocksdb::kNoCompression;
-
- if(options.disableDataSync) {
- derr << "Warning: DataSync is disabled, may lose data on node failure" << dendl;
- ldoptions.disableDataSync = options.disableDataSync;
- }
-
- if(options.disableWAL) {
- derr << "Warning: Write Ahead Log is disabled, may lose data on failure" << dendl;
- }
- if(options.wal_dir.length())
- ldoptions.wal_dir = options.wal_dir;
-
- if (options.block_restart_interval)
- table_options.block_restart_interval = options.block_restart_interval;
-
- ldoptions.error_if_exists = options.error_if_exists;
- ldoptions.paranoid_checks = options.paranoid_checks;
- ldoptions.create_if_missing = create_if_missing;
- if (options.log_file.length()) {
- env->NewLogger(options.log_file, &ldoptions.info_log);
- ldoptions.info_log->SetInfoLogLevel((rocksdb::InfoLogLevel)get_info_log_level(options.info_log_level));
- } else {
- ldoptions.info_log_level = (rocksdb::InfoLogLevel)get_info_log_level(options.info_log_level);
+ rocksdb::Options opt;
+ rocksdb::Status status;
+ status = rocksdb::GetOptionsFromString(opt, options_str, &opt);
+ if (!status.ok()) {
+ derr << status.ToString() << dendl;
+ return -EINVAL;
}
+ opt.create_if_missing = create_if_missing;
- //apply table_options
- ldoptions.table_factory.reset(NewBlockBasedTableFactory(table_options));
-
- //apply env setting
- ldoptions.env = env;
- rocksdb::Status status = rocksdb::DB::Open(ldoptions, path, &db);
+ status = rocksdb::DB::Open(opt, path, &db);
if (!status.ok()) {
- out << status.ToString() << std::endl;
+ derr << status.ToString() << dendl;
return -EINVAL;
}
// Ensure db is destroyed before dependent db_cache and filterpolicy
delete db;
- delete filterpolicy;
}
void RocksDBStore::close()
RocksDBTransactionImpl * _t =
static_cast<RocksDBTransactionImpl *>(t.get());
rocksdb::WriteOptions woptions;
- woptions.disableWAL = options.disableWAL;
+ //woptions.disableWAL = options.disableWAL;
rocksdb::Status s = db->Write(woptions, _t->bat);
utime_t lat = ceph_clock_now(g_ceph_context) - start;
logger->inc(l_rocksdb_txns);
static_cast<RocksDBTransactionImpl *>(t.get());
rocksdb::WriteOptions woptions;
woptions.sync = true;
- woptions.disableWAL = options.disableWAL;
+ //woptions.disableWAL = options.disableWAL;
rocksdb::Status s = db->Write(woptions, _t->bat);
utime_t lat = ceph_clock_now(g_ceph_context) - start;
logger->inc(l_rocksdb_txns);
#include "common/Formatter.h"
#include "common/ceph_context.h"
-
class PerfCounters;
enum {
class WriteBatch;
class Iterator;
}
-
/**
* Uses RocksDB to implement the KeyValueDB interface
*/
CephContext *cct;
PerfCounters *logger;
string path;
- const rocksdb::FilterPolicy *filterpolicy;
+ string options_str;
rocksdb::DB *db;
int do_open(ostream &out, bool create_if_missing);
void compact();
static int _test_init(const string& dir);
- int init();
+ int init(string options_str);
/// compact rocksdb for all keys with a given prefix
void compact_prefix(const string& prefix) {
compact_range(prefix, past_prefix(prefix));
}
int get_info_log_level(string info_log_level);
- /**
- * options_t: Holds options which are minimally interpreted
- * on initialization and then passed through to RocksDB.
- * We transform a couple of these into actual RocksDB
- * structures, but the rest are simply passed through unchanged. See
- * rocksdb/options.h for more precise details on each.
- *
- * Set them after constructing the RocksDBStore, but before calling
- * open() or create_and_open().
- */
- struct options_t {
- uint64_t write_buffer_size; /// in-memory write buffer size
- int write_buffer_num; /// in-memory write buffer number
- int min_write_buffer_number_to_merge;
-
- int level0_file_num_compaction_trigger;
- int level0_slowdown_writes_trigger;
- int level0_stop_writes_trigger;
-
- uint64_t max_bytes_for_level_base;
- int max_bytes_for_level_multiplier;
- uint64_t target_file_size_base;
- int target_file_size_multiplier;
- int num_levels;
- uint64_t cache_size; /// size of extra decompressed cache to use
- uint64_t block_size; /// user data per block
- int bloom_bits_per_key; /// number of bits per entry to put in a bloom filter
-
- int max_background_compactions;
- int compaction_threads;
- int max_background_flushes;
- int flusher_threads;
-
- uint64_t max_open_files;
- string compression_type; /// whether to use libsnappy compression or not
- bool paranoid_checks;
- string log_file;
- string wal_dir;
- string info_log_level;
- bool disableDataSync;
- bool disableWAL;
-
- int block_restart_interval;
- bool error_if_exists;
-
- options_t() :
- write_buffer_size(0),
- write_buffer_num(0),
- min_write_buffer_number_to_merge(0),
- level0_file_num_compaction_trigger(0),
- level0_slowdown_writes_trigger(-1),
- level0_stop_writes_trigger(-1),
- max_bytes_for_level_base(0),
- max_bytes_for_level_multiplier(0),
- target_file_size_base(0),
- target_file_size_multiplier(0),
- num_levels(0),
- cache_size(0), /// size of extra decompressed cache to use
- block_size(0), /// user data per block
- bloom_bits_per_key(0), /// number of bits per entry to put in a bloom filter
- max_background_compactions(0),
- compaction_threads(0),
- max_background_flushes(0),
- flusher_threads(0),
-
- max_open_files(0),
- compression_type("none"),
- paranoid_checks(false), //< set to true if you want paranoid checks
- info_log_level("info"),
- disableDataSync(false),
- disableWAL(false),
-
- block_restart_interval(0), //< 0 means default
- error_if_exists(false) //< set to true if you want to check nonexistence
- {}
- } options;
-
RocksDBStore(CephContext *c, const string &path) :
cct(c),
logger(NULL),
path(path),
compact_queue_lock("RocksDBStore::compact_thread_lock"),
compact_queue_stop(false),
- compact_thread(this),
- options()
+ compact_thread(this)
{}
~RocksDBStore();