From cbd0043ca7549ab7260151a4458752265e70e3ee Mon Sep 17 00:00:00 2001
From: xinxinsh
Date: Tue, 18 Feb 2014 09:27:44 +0800
Subject: [PATCH] add rocksdb support

Signed-off-by: xinxin shu
---
 configure.ac             |  25 +++
 src/Makefile-env.am      |   4 +
 src/common/config_opts.h |  10 +
 src/os/KeyValueDB.cc     |  13 ++
 src/os/Makefile.am       |   7 +
 src/os/RocksDBStore.cc   | 458 +++++++++++++++++++++++++++++++++++++++
 src/os/RocksDBStore.h    | 310 ++++++++++++++++++++++++++
 7 files changed, 827 insertions(+)
 create mode 100644 src/os/RocksDBStore.cc
 create mode 100644 src/os/RocksDBStore.h

diff --git a/configure.ac b/configure.ac
index 9506b427a3599..a77e73ae32feb 100644
--- a/configure.ac
+++ b/configure.ac
@@ -529,6 +529,31 @@ AS_IF([test "x$with_kinetic" = "xyes"],
             [AC_DEFINE([HAVE_KINETIC], [1], [Defined if you have kinetic enabled])])
 AM_CONDITIONAL(WITH_KINETIC, [ test "$with_kinetic" = "yes" ])
 
+# use rocksdb
+AC_ARG_WITH([librocksdb],
+            [AS_HELP_STRING([--with-librocksdb], [build rocksdb support])],
+            [],
+            [with_librocksdb=no])
+AS_IF([test "x$with_librocksdb" = "xyes"],
+            [PKG_CHECK_MODULES([LIBROCKSDB], [rocksdb], [], [true])])
+AS_IF([test "x$with_librocksdb" = "xyes"],
+            [AC_DEFINE([HAVE_LIBROCKSDB], [1], [Defined if you have librocksdb enabled])])
+AM_CONDITIONAL(WITH_LIBROCKSDB, [ test "$with_librocksdb" = "yes" ])
+
+#add rocksdb support
+# check libz
+AS_IF([test "x$with_librocksdb" = "xyes"],
+            [AC_CHECK_LIB([z], [gzread], [], [AC_MSG_FAILURE([libz not found])])])
+# check libbz2
+AS_IF([test "x$with_librocksdb" = "xyes"],
+            [AC_CHECK_LIB([bz2], [BZ2_bzCompressInit], [], [AC_MSG_FAILURE([libbz2 not found])])])
+# check librt
+AS_IF([test "x$with_librocksdb" = "xyes"],
+            [AC_CHECK_LIB([rt], [clock_gettime], [], [AC_MSG_FAILURE([librt not found])])])
+# use system rocksdb
+#AC_CHECK_LIB([rocksdb], [rocksdb_open], [], [AC_MSG_FAILURE([librocks not found])],[-lsnappy -lpthread -lrt -lbz2 -lz])
+#AC_CHECK_LIB([rocksdb], [open], [], [AC_MSG_FAILURE([librocks not found])],[-lsnappy -lpthread -lrt -lbz2 -lz])
+
 # use system libs3?
 AC_ARG_WITH([system-libs3],
 	[AS_HELP_STRING([--with-system-libs3], [use system libs3])],
diff --git a/src/Makefile-env.am b/src/Makefile-env.am
index 3bf1b6d937370..f866e6fa4ee9e 100644
--- a/src/Makefile-env.am
+++ b/src/Makefile-env.am
@@ -165,6 +165,10 @@ if WITH_LIBZFS
 LIBOS += libos_zfs.a -lzfs
 endif # WITH_LIBZFS
 
+if WITH_LIBROCKSDB
+LIBOS += libos_rocksdb.la -lrocksdb
+endif # WITH_LIBROCKSDB
+
 if WITH_TCMALLOC
 LIBPERFGLUE += -ltcmalloc
 endif # WITH_TCMALLOC
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 6c034a54a4b30..7aeb0d6258368 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -576,6 +576,16 @@ OPTION(kinetic_user_id, OPT_INT, 1) // kinetic user to authenticate as
 OPTION(kinetic_hmac_key, OPT_STR, "asdfasdf") // kinetic key to authenticate with
 OPTION(kinetic_use_ssl, OPT_BOOL, false) // whether to secure kinetic traffic with TLS
 
+OPTION(rocksdb_compact_on_mount, OPT_BOOL, false)
+OPTION(rocksdb_write_buffer_size, OPT_U64, 0) // rocksdb write buffer size
+OPTION(rocksdb_cache_size, OPT_U64, 0) // rocksdb cache size
+OPTION(rocksdb_block_size, OPT_U64, 0) // rocksdb block size
+OPTION(rocksdb_bloom_size, OPT_INT, 0) // rocksdb bloom bits per entry
+OPTION(rocksdb_max_open_files, OPT_INT, 0) // rocksdb max open files
+OPTION(rocksdb_compression, OPT_STR, "") // rocksdb uses compression : none, snappy, zlib, bzip2
+OPTION(rocksdb_paranoid, OPT_BOOL, false) // rocksdb paranoid flag
+OPTION(rocksdb_log, OPT_STR, "") // enable rocksdb log file
+
 /**
  * osd_client_op_priority and osd_recovery_op_priority adjust the relative
  * priority of client io vs recovery io.
diff --git a/src/os/KeyValueDB.cc b/src/os/KeyValueDB.cc
index 8e590e2a31e8b..e73932933329b 100644
--- a/src/os/KeyValueDB.cc
+++ b/src/os/KeyValueDB.cc
@@ -3,6 +3,9 @@
 
 #include "KeyValueDB.h"
 #include "LevelDBStore.h"
+#ifdef HAVE_LIBROCKSDB
+#include "RocksDBStore.h"
+#endif
 
 KeyValueDB *KeyValueDB::create(CephContext *cct, const string& type,
 			       const string& dir)
@@ -14,6 +17,11 @@ KeyValueDB *KeyValueDB::create(CephContext *cct, const string& type,
   if (kv_type == KV_TYPE_KINETIC) {
     store = new KineticStore(g_ceph_context);
   }
+#endif
+#ifdef HAVE_LIBROCKSDB
+  if (type == "rocksdb") {
+    return new RocksDBStore(cct, dir);
+  }
 #endif
   return NULL;
 }
@@ -27,6 +35,11 @@ int KeyValueDB::test_init(const string& type, const string& dir)
   if (kv_type == KV_TYPE_KINETIC) {
     return 0;
   }
+#endif
+#ifdef HAVE_LIBROCKSDB
+  if (type == "rocksdb"){
+    return RocksDBStore::_test_init(dir);
+  }
 #endif
   return -EINVAL;
 }
diff --git a/src/os/Makefile.am b/src/os/Makefile.am
index 07b0e6f0ecf2b..6f6e85c3e1096 100644
--- a/src/os/Makefile.am
+++ b/src/os/Makefile.am
@@ -21,6 +21,7 @@ libos_la_SOURCES = \
 	os/KeyValueStore.cc \
 	os/ObjectStore.cc \
 	os/WBThrottle.cc \
+	os/KeyValueDB.cc \
 	common/TrackedOp.cc
 
 if LINUX
@@ -67,6 +68,12 @@ noinst_HEADERS += \
 	os/XfsFileStoreBackend.h \
 	os/ZFSFileStoreBackend.h
 
+if WITH_LIBROCKSDB
+libos_rocksdb_la_SOURCES = os/RocksDBStore.cc
+libos_rocksdb_la_CXXFLAGS = ${AM_CXXFLAGS} ${LIBROCKSDB_CFLAGS} -std=gnu++11
+noinst_LTLIBRARIES += libos_rocksdb.la
+noinst_HEADERS += os/RocksDBStore.h
+endif
 if WITH_LIBZFS
 libos_zfs_a_SOURCES = os/ZFS.cc
 libos_zfs_a_CXXFLAGS = ${AM_CXXFLAGS} ${LIBZFS_CFLAGS}
diff --git a/src/os/RocksDBStore.cc b/src/os/RocksDBStore.cc
new file mode 100644
index 0000000000000..5cb73e4766225
--- /dev/null
+++ b/src/os/RocksDBStore.cc
@@ -0,0 +1,458 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <set>
+#include <map>
+#include <string>
+#include <tr1/memory>
+#include <errno.h>
+
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/write_batch.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/filter_policy.h"
+
+using std::string;
+#include "common/perf_counters.h"
+#include "KeyValueDB.h"
+#include "RocksDBStore.h"
+
+
+int RocksDBStore::init()
+{
+  options.write_buffer_size = g_conf->rocksdb_write_buffer_size;
+  options.cache_size = g_conf->rocksdb_cache_size;
+  options.block_size = g_conf->rocksdb_block_size;
+  options.bloom_size = g_conf->rocksdb_bloom_size;
+  options.compression_type = g_conf->rocksdb_compression;
+  options.paranoid_checks = g_conf->rocksdb_paranoid;
+  options.max_open_files = g_conf->rocksdb_max_open_files;
+  options.log_file = g_conf->rocksdb_log;
+  return 0;
+}
+
+int RocksDBStore::do_open(ostream &out, bool create_if_missing)
+{
+  rocksdb::Options ldoptions;
+
+  if (options.write_buffer_size)
+    ldoptions.write_buffer_size = options.write_buffer_size;
+  if (options.max_open_files)
+    ldoptions.max_open_files = options.max_open_files;
+  if (options.cache_size) {
+    ldoptions.block_cache = rocksdb::NewLRUCache(options.cache_size);
+  }
+  if (options.block_size)
+    ldoptions.block_size = options.block_size;
+  if (options.bloom_size) {
+    const rocksdb::FilterPolicy *_filterpolicy =
+      rocksdb::NewBloomFilterPolicy(options.bloom_size);
+    ldoptions.filter_policy = _filterpolicy;
+    filterpolicy = _filterpolicy;
+  }
+  if (options.compression_type.length() == 0)
+    ldoptions.compression = rocksdb::kNoCompression;
+  else if(options.compression_type == "snappy")
+    ldoptions.compression = rocksdb::kSnappyCompression;
+  else if(options.compression_type == "zlib")
+    ldoptions.compression = rocksdb::kZlibCompression;
+  else if(options.compression_type == "bzip2")
+    ldoptions.compression = rocksdb::kBZip2Compression;
+  else
+    ldoptions.compression = rocksdb::kNoCompression;
+  if (options.block_restart_interval)
+    ldoptions.block_restart_interval = options.block_restart_interval;
+
+  ldoptions.error_if_exists = options.error_if_exists;
+  ldoptions.paranoid_checks = options.paranoid_checks;
+  ldoptions.create_if_missing = create_if_missing;
+
+  if (options.log_file.length()) {
+    rocksdb::Env *env = rocksdb::Env::Default();
+    env->NewLogger(options.log_file, &ldoptions.info_log);
+  }
+
+  // 'db' is a raw pointer owned by this object; the destructor deletes it.
+  rocksdb::Status status = rocksdb::DB::Open(ldoptions, path, &db);
+  if (!status.ok()) {
+    out << status.ToString() << std::endl;
+    return -EINVAL;
+  }
+
+  // Register the perf counters before any compaction: compact() increments
+  // one of them and would dereference a NULL logger otherwise.
+  PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
+  plb.add_u64_counter(l_rocksdb_gets, "rocksdb_get");
+  plb.add_u64_counter(l_rocksdb_txns, "rocksdb_transaction");
+  plb.add_u64_counter(l_rocksdb_compact, "rocksdb_compact");
+  plb.add_u64_counter(l_rocksdb_compact_range, "rocksdb_compact_range");
+  plb.add_u64_counter(l_rocksdb_compact_queue_merge, "rocksdb_compact_queue_merge");
+  plb.add_u64(l_rocksdb_compact_queue_len, "rocksdb_compact_queue_len");
+  logger = plb.create_perf_counters();
+  cct->get_perfcounters_collection()->add(logger);
+
+  if (g_conf->rocksdb_compact_on_mount) {
+    derr << "Compacting rocksdb store..." << dendl;
+    compact();
+    derr << "Finished compacting rocksdb store" << dendl;
+  }
+  return 0;
+}
+
+int RocksDBStore::_test_init(const string& dir)
+{
+  rocksdb::Options options;
+  options.create_if_missing = true;
+  rocksdb::DB *db = NULL;  // never hand an indeterminate pointer to delete
+  rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
+  delete db;
+  return status.ok() ? 0 : -EIO;
+}
+
+RocksDBStore::~RocksDBStore()
+{
+  close();
+  delete logger;
+  // Destroy db before the filter policy it was opened with
+  delete db;
+  delete filterpolicy;
+}
+
+void RocksDBStore::close()
+{
+  // stop compaction thread
+  compact_queue_lock.Lock();
+  if (compact_thread.is_started()) {
+    compact_queue_stop = true;
+    compact_queue_cond.Signal();
+    compact_queue_lock.Unlock();
+    compact_thread.join();
+  } else {
+    compact_queue_lock.Unlock();
+  }
+
+  if (logger)
+    cct->get_perfcounters_collection()->remove(logger);
+}
+
+int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
+{
+  RocksDBTransactionImpl * _t =
+    static_cast<RocksDBTransactionImpl *>(t.get());
+  rocksdb::Status s = db->Write(rocksdb::WriteOptions(), _t->bat);
+  logger->inc(l_rocksdb_txns);
+  return s.ok() ? 0 : -1;
+}
+
+int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
+{
+  RocksDBTransactionImpl * _t =
+    static_cast<RocksDBTransactionImpl *>(t.get());
+  rocksdb::WriteOptions options;
+  options.sync = true;
+  rocksdb::Status s = db->Write(options, _t->bat);
+  logger->inc(l_rocksdb_txns);
+  return s.ok() ? 0 : -1;
+}
+
+RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
+{
+  db = _db;
+  bat = new rocksdb::WriteBatch();
+}
+RocksDBStore::RocksDBTransactionImpl::~RocksDBTransactionImpl()
+{
+  delete bat;
+}
+void RocksDBStore::RocksDBTransactionImpl::set(
+  const string &prefix,
+  const string &k,
+  const bufferlist &to_set_bl)
+{
+  buffers.push_back(to_set_bl);
+  buffers.rbegin()->rebuild();
+  bufferlist &bl = *(buffers.rbegin());
+  string key = combine_strings(prefix, k);
+  keys.push_back(key);
+  bat->Delete(rocksdb::Slice(*(keys.rbegin())));
+  bat->Put(rocksdb::Slice(*(keys.rbegin())),
+           rocksdb::Slice(bl.c_str(), bl.length()));
+}
+
+void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
+                                                 const string &k)
+{
+  string key = combine_strings(prefix, k);
+  keys.push_back(key);
+  bat->Delete(rocksdb::Slice(*(keys.rbegin())));
+}
+
+void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
+{
+  KeyValueDB::Iterator it = db->get_iterator(prefix);
+  for (it->seek_to_first();
+       it->valid();
+       it->next()) {
+    string key = combine_strings(prefix, it->key());
+    keys.push_back(key);
+    bat->Delete(*(keys.rbegin()));
+  }
+}
+
+int RocksDBStore::get(
+    const string &prefix,
+    const std::set<string> &keys,
+    std::map<string, bufferlist> *out)
+{
+  KeyValueDB::Iterator it = get_iterator(prefix);
+  for (std::set<string>::const_iterator i = keys.begin();
+       i != keys.end();
+       ++i) {
+    it->lower_bound(*i);
+    if (it->valid() && it->key() == *i) {
+      out->insert(make_pair(*i, it->value()));
+    } else if (!it->valid())
+      break;
+  }
+  logger->inc(l_rocksdb_gets);
+  return 0;
+}
+
+string RocksDBStore::combine_strings(const string &prefix, const string &value)
+{
+  string out = prefix;
+  out.push_back(0);
+  out.append(value);
+  return out;
+}
+
+bufferlist RocksDBStore::to_bufferlist(rocksdb::Slice in)
+{
+  bufferlist bl;
+  bl.append(bufferptr(in.data(), in.size()));
+  return bl;
+}
+
+int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
+{
+  string in_prefix = in.ToString();
+  size_t prefix_len = in_prefix.find('\0');
+  if (prefix_len >= in_prefix.size())
+    return -EINVAL;
+
+  if (prefix)
+    *prefix = string(in_prefix, 0, prefix_len);
+  if (key)
+    *key= string(in_prefix, prefix_len + 1);
+  return 0;
+}
+
+void RocksDBStore::compact()
+{
+  logger->inc(l_rocksdb_compact);
+  db->CompactRange(NULL, NULL);
+}
+
+
+void RocksDBStore::compact_thread_entry()
+{
+  compact_queue_lock.Lock();
+  while (!compact_queue_stop) {
+    while (!compact_queue.empty()) {
+      pair<string,string> range = compact_queue.front();
+      compact_queue.pop_front();
+      logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
+      compact_queue_lock.Unlock();
+      logger->inc(l_rocksdb_compact_range);
+      compact_range(range.first, range.second);
+      compact_queue_lock.Lock();
+      continue;
+    }
+    compact_queue_cond.Wait(compact_queue_lock);
+  }
+  compact_queue_lock.Unlock();
+}
+
+void RocksDBStore::compact_range_async(const string& start, const string& end)
+{
+  Mutex::Locker l(compact_queue_lock);
+
+  // try to merge adjacent ranges.  this is O(n), but the queue should
+  // be short.  note that we do not cover all overlap cases and merge
+  // opportunities here, but we capture the ones we currently need.
+  list< pair<string,string> >::iterator p = compact_queue.begin();
+  while (p != compact_queue.end()) {
+    if (p->first == start && p->second == end) {
+      // dup; no-op
+      return;
+    }
+    if (p->first <= end && p->first > start) {
+      // merge with existing range to the right
+      compact_queue.push_back(make_pair(start, p->second));
+      p = compact_queue.erase(p);  // keep p valid; the merged entry follows it, so p != end()
+      logger->inc(l_rocksdb_compact_queue_merge);
+      break;
+    }
+    if (p->second >= start && p->second < end) {
+      // merge with existing range to the left
+      compact_queue.push_back(make_pair(p->first, end));
+      p = compact_queue.erase(p);  // keep p valid; the merged entry follows it, so p != end()
+      logger->inc(l_rocksdb_compact_queue_merge);
+      break;
+    }
+    ++p;
+  }
+  if (p == compact_queue.end()) {
+    // no merge, new entry.
+    compact_queue.push_back(make_pair(start, end));
+    logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
+  }
+  compact_queue_cond.Signal();
+  if (!compact_thread.is_started()) {
+    compact_thread.create();
+  }
+}
+bool RocksDBStore::check_omap_dir(string &omap_dir)
+{
+  rocksdb::Options options;
+  options.create_if_missing = true;
+  rocksdb::DB *db = NULL;  // never hand an indeterminate pointer to delete
+  rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
+  delete db;
+  return status.ok();
+}
+void RocksDBStore::compact_range(const string& start, const string& end)
+{
+  rocksdb::Slice cstart(start);
+  rocksdb::Slice cend(end);
+  db->CompactRange(&cstart, &cend);
+}
+RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
+{
+  delete dbiter;
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
+{
+  dbiter->SeekToFirst();
+  return dbiter->status().ok() ? 0 : -1;
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
+{
+  rocksdb::Slice slice_prefix(prefix);
+  dbiter->Seek(slice_prefix);
+  return dbiter->status().ok() ? 0 : -1;
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
+{
+  dbiter->SeekToLast();
+  return dbiter->status().ok() ? 0 : -1;
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
+{
+  string limit = past_prefix(prefix);
+  rocksdb::Slice slice_limit(limit);
+  dbiter->Seek(slice_limit);
+
+  if (!dbiter->Valid()) {
+    dbiter->SeekToLast();
+  } else {
+    dbiter->Prev();
+  }
+  return dbiter->status().ok() ? 0 : -1;
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
+{
+  lower_bound(prefix, after);
+  if (valid()) {
+    pair<string,string> key = raw_key();
+    if (key.first == prefix && key.second == after)
+      next();
+  }
+  return dbiter->status().ok() ? 0 : -1;
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
+{
+  string bound = combine_strings(prefix, to);
+  rocksdb::Slice slice_bound(bound);
+  dbiter->Seek(slice_bound);
+  return dbiter->status().ok() ? 0 : -1;
+}
+bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
+{
+  return dbiter->Valid();
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
+{
+  if (valid())
+    dbiter->Next();
+  return dbiter->status().ok() ? 0 : -1;
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
+{
+  if (valid())
+    dbiter->Prev();
+  return dbiter->status().ok() ? 0 : -1;
+}
+string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
+{
+  string out_key;
+  split_key(dbiter->key(), 0, &out_key);
+  return out_key;
+}
+pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
+{
+  string prefix, key;
+  split_key(dbiter->key(), &prefix, &key);
+  return make_pair(prefix, key);
+}
+bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
+{
+  return to_bufferlist(dbiter->value());
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
+{
+  return dbiter->status().ok() ? 0 : -1;
+}
+
+bool RocksDBStore::in_prefix(const string &prefix, rocksdb::Slice key)
+{
+  return (key.compare(rocksdb::Slice(past_prefix(prefix))) < 0) &&
+    (key.compare(rocksdb::Slice(prefix)) > 0);
+}
+string RocksDBStore::past_prefix(const string &prefix)
+{
+  string limit = prefix;
+  limit.push_back(1);
+  return limit;
+}
+
+
+RocksDBStore::WholeSpaceIterator RocksDBStore::_get_iterator()
+{
+  return std::tr1::shared_ptr<KeyValueDB::WholeSpaceIteratorImpl>(
+    new RocksDBWholeSpaceIteratorImpl(
+      db->NewIterator(rocksdb::ReadOptions())
+    )
+  );
+}
+
+RocksDBStore::WholeSpaceIterator RocksDBStore::_get_snapshot_iterator()
+{
+  const rocksdb::Snapshot *snapshot;
+  rocksdb::ReadOptions options;
+
+  snapshot = db->GetSnapshot();
+  options.snapshot = snapshot;
+
+  return std::tr1::shared_ptr<KeyValueDB::WholeSpaceIteratorImpl>(
+    new RocksDBSnapshotIteratorImpl(db, snapshot,
+      db->NewIterator(options))
+  );
+}
+
+RocksDBStore::RocksDBSnapshotIteratorImpl::~RocksDBSnapshotIteratorImpl()
+{
+  db->ReleaseSnapshot(snapshot);
+}
diff --git a/src/os/RocksDBStore.h b/src/os/RocksDBStore.h
new file mode 100644
index 0000000000000..103f955863dfb
--- /dev/null
+++ b/src/os/RocksDBStore.h
@@ -0,0 +1,310 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef ROCKS_DB_STORE_H
+#define ROCKS_DB_STORE_H
+
+#include "include/types.h"
+#include "include/buffer.h"
+#include "KeyValueDB.h"
+#include <set>
+#include <map>
+#include <string>
+#include <tr1/memory>
+#include <boost/scoped_ptr.hpp>
+
+#include <errno.h>
+#include "common/errno.h"
+#include "common/dout.h"
+#include "include/assert.h"
+#include "common/Formatter.h"
+
+#include "common/ceph_context.h"
+
+class PerfCounters;
+
+enum {
+  l_rocksdb_first = 34300,
+  l_rocksdb_gets,
+  l_rocksdb_txns,
+  l_rocksdb_compact,
+  l_rocksdb_compact_range,
+  l_rocksdb_compact_queue_merge,
+  l_rocksdb_compact_queue_len,
+  l_rocksdb_last,
+};
+
+namespace rocksdb{
+  class DB;
+  class Cache;
+  class FilterPolicy;
+  class Snapshot;
+  class Slice;
+  class WriteBatch;
+  class Iterator;
+}
+
+/**
+ * Uses RocksDB to implement the KeyValueDB interface
+ */
+class RocksDBStore : public KeyValueDB {
+  CephContext *cct;
+  PerfCounters *logger;
+  string path;
+  const rocksdb::FilterPolicy *filterpolicy;
+  rocksdb::DB *db;
+
+  int do_open(ostream &out, bool create_if_missing);
+
+  // manage async compactions
+  Mutex compact_queue_lock;
+  Cond compact_queue_cond;
+  list< pair<string,string> > compact_queue;
+  bool compact_queue_stop;
+  class CompactThread : public Thread {
+    RocksDBStore *db;
+  public:
+    CompactThread(RocksDBStore *d) : db(d) {}
+    void *entry() {
+      db->compact_thread_entry();
+      return NULL;
+    }
+    friend class RocksDBStore;
+  } compact_thread;
+
+  void compact_thread_entry();
+
+  void compact_range(const string& start, const string& end);
+  void compact_range_async(const string& start, const string& end);
+
+public:
+  /// compact the underlying rocksdb store
+  void compact();
+
+  static int _test_init(const string& dir);
+  int init();
+  /// compact rocksdb for all keys with a given prefix
+  void compact_prefix(const string& prefix) {
+    compact_range(prefix, past_prefix(prefix));
+  }
+  void compact_prefix_async(const string& prefix) {
+    compact_range_async(prefix, past_prefix(prefix));
+  }
+
+  void compact_range(const string& prefix, const string& start, const string& end) {
+    compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
+  }
+  void compact_range_async(const string& prefix, const string& start, const string& end) {
+    compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end));
+  }
+
+  /**
+   * options_t: Holds options which are minimally interpreted
+   * on initialization and then passed through to RocksDB.
+   * We transform a couple of these into actual RocksDB
+   * structures, but the rest are simply passed through unchanged. See
+   * rocksdb/options.h for more precise details on each.
+   *
+   * Set them after constructing the RocksDBStore, but before calling
+   * open() or create_and_open().
+   */
+  struct options_t {
+    uint64_t write_buffer_size; /// in-memory write buffer size
+    int max_open_files; /// maximum number of files RocksDB can open at once
+    uint64_t cache_size; /// size of extra decompressed cache to use
+    uint64_t block_size; /// user data per block
+    int bloom_size; /// number of bits per entry to put in a bloom filter
+    string compression_type; /// codec name: "snappy", "zlib" or "bzip2"; anything else means none
+
+    // don't change these ones. No, seriously
+    int block_restart_interval;
+    bool error_if_exists;
+    bool paranoid_checks;
+
+    string log_file;
+
+    options_t() :
+      write_buffer_size(0), //< 0 means default
+      max_open_files(0), //< 0 means default
+      cache_size(0), //< 0 means no cache (default)
+      block_size(0), //< 0 means default
+      bloom_size(0), //< 0 means no bloom filter (default)
+      compression_type("none"), //< no compression unless a known codec is named
+      block_restart_interval(0), //< 0 means default
+      error_if_exists(false), //< set to true if you want to check nonexistence
+      paranoid_checks(false) //< set to true if you want paranoid checks
+    {}
+  } options;
+
+  RocksDBStore(CephContext *c, const string &path) :
+    cct(c),
+    logger(NULL),
+    path(path), filterpolicy(NULL), db(NULL), // the destructor deletes both, opened or not
+    compact_queue_lock("RocksDBStore::compact_thread_lock"),
+    compact_queue_stop(false),
+    compact_thread(this),
+    options()
+  {}
+
+  ~RocksDBStore();
+
+  static bool check_omap_dir(string &omap_dir);
+  /// Opens underlying db
+  int open(ostream &out) {
+    return do_open(out, false);
+  }
+  /// Creates underlying db if missing and opens it
+  int create_and_open(ostream &out) {
+    return do_open(out, true);
+  }
+
+  void close();
+
+  class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl {
+  public:
+    rocksdb::WriteBatch *bat;
+    list<bufferlist> buffers;
+    list<string> keys;
+    RocksDBStore *db;
+
+    RocksDBTransactionImpl(RocksDBStore *_db);
+    ~RocksDBTransactionImpl();
+    void set(
+      const string &prefix,
+      const string &k,
+      const bufferlist &bl);
+    void rmkey(
+      const string &prefix,
+      const string &k);
+    void rmkeys_by_prefix(
+      const string &prefix
+      );
+  };
+
+  KeyValueDB::Transaction get_transaction() {
+    return std::tr1::shared_ptr< RocksDBTransactionImpl >(
+      new RocksDBTransactionImpl(this));
+  }
+
+  int submit_transaction(KeyValueDB::Transaction t);
+  int submit_transaction_sync(KeyValueDB::Transaction t);
+  int get(
+    const string &prefix,
+    const std::set<string> &key,
+    std::map<string, bufferlist> *out
+    );
+
+  class RocksDBWholeSpaceIteratorImpl :
+    public KeyValueDB::WholeSpaceIteratorImpl {
+  protected:
+    rocksdb::Iterator *dbiter;
+  public:
+    RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) :
+      dbiter(iter) { }
+    //virtual ~RocksDBWholeSpaceIteratorImpl() { }
+    ~RocksDBWholeSpaceIteratorImpl();
+
+    int seek_to_first();
+    int seek_to_first(const string &prefix);
+    int seek_to_last();
+    int seek_to_last(const string &prefix);
+    int upper_bound(const string &prefix, const string &after);
+    int lower_bound(const string &prefix, const string &to);
+    bool valid();
+    int next();
+    int prev();
+    string key();
+    pair<string,string> raw_key();
+    bufferlist value();
+    int status();
+  };
+
+  class RocksDBSnapshotIteratorImpl : public RocksDBWholeSpaceIteratorImpl {
+    rocksdb::DB *db;
+    const rocksdb::Snapshot *snapshot;
+  public:
+    RocksDBSnapshotIteratorImpl(rocksdb::DB *db, const rocksdb::Snapshot *s,
+                                rocksdb::Iterator *iter) :
+      RocksDBWholeSpaceIteratorImpl(iter), db(db), snapshot(s) { }
+
+    ~RocksDBSnapshotIteratorImpl();
+  };
+
+  /// Utility
+  static string combine_strings(const string &prefix, const string &value);
+  static int split_key(rocksdb::Slice in, string *prefix, string *key);
+  static bufferlist to_bufferlist(rocksdb::Slice in);
+  static bool in_prefix(const string &prefix, rocksdb::Slice key);
+  static string past_prefix(const string &prefix);
+
+  virtual uint64_t get_estimated_size(map<string,uint64_t> &extra) {
+    DIR *store_dir = opendir(path.c_str());
+    if (!store_dir) {
+      lderr(cct) << __func__ << " something happened opening the store: "
+                 << cpp_strerror(errno) << dendl;
+      return 0;
+    }
+
+    uint64_t total_size = 0;
+    uint64_t sst_size = 0;
+    uint64_t log_size = 0;
+    uint64_t misc_size = 0;
+
+    struct dirent *entry = NULL;
+    while ((entry = readdir(store_dir)) != NULL) {
+      string n(entry->d_name);
+
+      if (n == "." || n == "..")
+        continue;
+
+      string fpath = path + '/' + n;
+      struct stat s;
+      int err = stat(fpath.c_str(), &s);
+      if (err < 0)
+        err = -errno;
+      // we may race against rocksdb removing files while we scan; a vanished
+      // file (ENOENT) leaves 's' unfilled, so skip it instead of counting it.
+      if (err == -ENOENT)
+        continue;
+      if (err < 0) {
+        lderr(cct) << __func__ << " error obtaining stats for " << fpath
+                   << ": " << cpp_strerror(err) << dendl;
+        goto err;
+      }
+
+      size_t pos = n.find_last_of('.');
+      if (pos == string::npos) {
+        misc_size += s.st_size;
+        continue;
+      }
+
+      string ext = n.substr(pos+1);
+      if (ext == "sst") {
+        sst_size += s.st_size;
+      } else if (ext == "log") {
+        log_size += s.st_size;
+      } else {
+        misc_size += s.st_size;
+      }
+    }
+
+    total_size = sst_size + log_size + misc_size;
+
+    extra["sst"] = sst_size;
+    extra["log"] = log_size;
+    extra["misc"] = misc_size;
+    extra["total"] = total_size;
+
+err:
+    closedir(store_dir);
+    return total_size;
+  }
+
+
+protected:
+  WholeSpaceIterator _get_iterator();
+
+  WholeSpaceIterator _get_snapshot_iterator();
+
+};
+
+#endif
-- 
2.39.5