endif()
endif()
-option(WITH_LEVELDB "LevelDB is here" ON)
-if(WITH_LEVELDB)
- if(LEVELDB_PREFIX)
- include_directories(SYSTEM ${LEVELDB_PREFIX}/include)
- link_directories(${LEVELDB_PREFIX}/lib)
- endif()
- find_package(leveldb REQUIRED)
- find_file(HAVE_LEVELDB_FILTER_POLICY leveldb/filter_policy.h PATHS ${LEVELDB_INCLUDE_DIR})
-endif(WITH_LEVELDB)
-
find_package(snappy REQUIRED)
option(WITH_BROTLI "Brotli compression support" OFF)
BuildRequires: gperftools-devel >= 2.4
%endif
%endif
-BuildRequires: leveldb-devel > 1.2
BuildRequires: libaio-devel
BuildRequires: libblkid-devel >= 2.17
BuildRequires: cryptsetup-devel
+++ /dev/null
-# - Find LevelDB
-#
-# LEVELDB_INCLUDE_DIR - Where to find leveldb/db.h
-# LEVELDB_LIBRARIES - List of libraries when using LevelDB.
-# LEVELDB_FOUND - True if LevelDB found.
-
-find_path(LEVELDB_INCLUDE_DIR leveldb/db.h
- HINTS $ENV{LEVELDB_ROOT}/include
- DOC "Path in which the file leveldb/db.h is located." )
-
-find_library(LEVELDB_LIBRARIES leveldb
- HINTS $ENV{LEVELDB_ROOT}/lib
- DOC "Path to leveldb library." )
-
-mark_as_advanced(LEVELDB_INCLUDE_DIR LEVELDB_LIBRARIES)
-
-include(FindPackageHandleStandardArgs)
-find_package_handle_standard_args(leveldb DEFAULT_MSG LEVELDB_LIBRARIES LEVELDB_INCLUDE_DIR)
librdmacm-dev,
libkeyutils-dev,
libldap2-dev,
- libleveldb-dev,
liblttng-ust-dev,
liblua5.3-dev,
liblz4-dev (>= 0.0~r131),
/* PMEM_DEVICE (OSD) conditional compilation */
#cmakedefine HAVE_BLUESTORE_PMEM
-/* Defined if LevelDB supports bloom filters */
-#cmakedefine HAVE_LEVELDB_FILTER_POLICY
-
/* Define if you have tcmalloc */
#cmakedefine HAVE_LIBTCMALLOC
#cmakedefine LIBTCMALLOC_MISSING_ALIGNED_ALLOC
/* define if radosgw enabled */
#cmakedefine WITH_RADOSGW
-/* define if leveldb is enabled */
-#cmakedefine WITH_LEVELDB
-
/* define if radosgw has openssl support */
#cmakedefine WITH_CURL_OPENSSL
rocksdb_cache/ShardedCache.cc
rocksdb_cache/BinnedLRUCache.cc)
-if (WITH_LEVELDB)
- list(APPEND kv_srcs LevelDBStore.cc)
-endif (WITH_LEVELDB)
-
add_library(kv STATIC ${kv_srcs}
$<TARGET_OBJECTS:common_prioritycache_obj>)
-target_link_libraries(kv ${LEVELDB_LIBRARIES}
+target_link_libraries(kv
RocksDB::RocksDB
heap_profiler)
// vim: ts=8 sw=2 smarttab
#include "KeyValueDB.h"
-#ifdef WITH_LEVELDB
-#include "LevelDBStore.h"
-#endif
#include "MemDB.h"
#include "RocksDBStore.h"
map<string,string> options,
void *p)
{
-#ifdef WITH_LEVELDB
- if (type == "leveldb") {
- return new LevelDBStore(cct, dir);
- }
-#endif
if (type == "rocksdb") {
return new RocksDBStore(cct, dir, options, p);
}
int KeyValueDB::test_init(const string& type, const string& dir)
{
-#ifdef WITH_LEVELDB
- if (type == "leveldb") {
- return LevelDBStore::_test_init(dir);
- }
-#endif
if (type == "rocksdb") {
return RocksDBStore::_test_init(dir);
}
/**
* Defines virtual interface to be implemented by key value store
*
- * Kyoto Cabinet or LevelDB should implement this
+ * Implementations such as RocksDBStore and MemDB implement this
*/
class KeyValueDB {
public:
virtual void close() { }
- /// Try to repair K/V database. leveldb and rocksdb require that database must be not opened.
+ /// Try to repair K/V database. rocksdb requires that database must be not opened.
virtual int repair(std::ostream &out) { return 0; }
virtual Transaction get_transaction() = 0;
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-#include "LevelDBStore.h"
-
-#include <set>
-#include <map>
-#include <string>
-#include <cerrno>
-
-#include "common/debug.h"
-#include "common/perf_counters.h"
-
-// re-include our assert to clobber the system one; fix dout:
-#include "include/ceph_assert.h"
-
-#define dout_context cct
-#define dout_subsys ceph_subsys_leveldb
-#undef dout_prefix
-#define dout_prefix *_dout << "leveldb: "
-
-using std::list;
-using std::string;
-using std::ostream;
-using std::pair;
-using std::vector;
-
-using ceph::bufferlist;
-using ceph::bufferptr;
-
-class CephLevelDBLogger : public leveldb::Logger {
- CephContext *cct;
-public:
- explicit CephLevelDBLogger(CephContext *c) : cct(c) {
- cct->get();
- }
- ~CephLevelDBLogger() override {
- cct->put();
- }
-
- // Write an entry to the log file with the specified format.
- void Logv(const char* format, va_list ap) override {
- dout(1);
- char buf[65536];
- vsnprintf(buf, sizeof(buf), format, ap);
- *_dout << buf << dendl;
- }
-};
-
-leveldb::Logger *create_leveldb_ceph_logger()
-{
- return new CephLevelDBLogger(g_ceph_context);
-}
-
-int LevelDBStore::init(string option_str)
-{
- // init defaults. caller can override these if they want
- // prior to calling open.
- options.write_buffer_size = g_conf()->leveldb_write_buffer_size;
- options.cache_size = g_conf()->leveldb_cache_size;
- options.block_size = g_conf()->leveldb_block_size;
- options.bloom_size = g_conf()->leveldb_bloom_size;
- options.compression_enabled = g_conf()->leveldb_compression;
- options.paranoid_checks = g_conf()->leveldb_paranoid;
- options.max_open_files = g_conf()->leveldb_max_open_files;
- options.log_file = g_conf()->leveldb_log;
- return 0;
-}
-
-int LevelDBStore::open(ostream &out, const std::string& cfs) {
- if (!cfs.empty()) {
- ceph_abort_msg("Not implemented");
- }
- return do_open(out, false);
-}
-
-int LevelDBStore::create_and_open(ostream &out, const std::string& cfs) {
- if (!cfs.empty()) {
- ceph_abort_msg("Not implemented");
- }
- return do_open(out, true);
-}
-
-int LevelDBStore::load_leveldb_options(bool create_if_missing, leveldb::Options &ldoptions)
-{
- if (options.write_buffer_size)
- ldoptions.write_buffer_size = options.write_buffer_size;
- if (options.max_open_files)
- ldoptions.max_open_files = options.max_open_files;
- if (options.cache_size) {
- leveldb::Cache *_db_cache = leveldb::NewLRUCache(options.cache_size);
- db_cache.reset(_db_cache);
- ldoptions.block_cache = db_cache.get();
- }
- if (options.block_size)
- ldoptions.block_size = options.block_size;
- if (options.bloom_size) {
-#ifdef HAVE_LEVELDB_FILTER_POLICY
- const leveldb::FilterPolicy *_filterpolicy =
- leveldb::NewBloomFilterPolicy(options.bloom_size);
- filterpolicy.reset(_filterpolicy);
- ldoptions.filter_policy = filterpolicy.get();
-#else
- ceph_abort_msg("bloom size set but installed leveldb doesn't support bloom filters");
-#endif
- }
- if (options.compression_enabled)
- ldoptions.compression = leveldb::kSnappyCompression;
- else
- ldoptions.compression = leveldb::kNoCompression;
- if (options.block_restart_interval)
- ldoptions.block_restart_interval = options.block_restart_interval;
-
- ldoptions.error_if_exists = options.error_if_exists;
- ldoptions.paranoid_checks = options.paranoid_checks;
- ldoptions.create_if_missing = create_if_missing;
-
- if (g_conf()->leveldb_log_to_ceph_log) {
- ceph_logger = new CephLevelDBLogger(g_ceph_context);
- ldoptions.info_log = ceph_logger;
- }
-
- if (options.log_file.length()) {
- leveldb::Env *env = leveldb::Env::Default();
- env->NewLogger(options.log_file, &ldoptions.info_log);
- }
- return 0;
-}
-
-int LevelDBStore::do_open(ostream &out, bool create_if_missing)
-{
- leveldb::Options ldoptions;
- int r = load_leveldb_options(create_if_missing, ldoptions);
- if (r) {
- dout(1) << "load leveldb options failed" << dendl;
- return r;
- }
-
- leveldb::DB *_db;
- leveldb::Status status = leveldb::DB::Open(ldoptions, path, &_db);
- db.reset(_db);
- if (!status.ok()) {
- out << status.ToString() << std::endl;
- return -EINVAL;
- }
-
- PerfCountersBuilder plb(g_ceph_context, "leveldb", l_leveldb_first, l_leveldb_last);
- plb.add_u64_counter(l_leveldb_gets, "leveldb_get", "Gets");
- plb.add_u64_counter(l_leveldb_txns, "leveldb_transaction", "Transactions");
- plb.add_time_avg(l_leveldb_get_latency, "leveldb_get_latency", "Get Latency");
- plb.add_time_avg(l_leveldb_submit_latency, "leveldb_submit_latency", "Submit Latency");
- plb.add_time_avg(l_leveldb_submit_sync_latency, "leveldb_submit_sync_latency", "Submit Sync Latency");
- plb.add_u64_counter(l_leveldb_compact, "leveldb_compact", "Compactions");
- plb.add_u64_counter(l_leveldb_compact_range, "leveldb_compact_range", "Compactions by range");
- plb.add_u64_counter(l_leveldb_compact_queue_merge, "leveldb_compact_queue_merge", "Mergings of ranges in compaction queue");
- plb.add_u64(l_leveldb_compact_queue_len, "leveldb_compact_queue_len", "Length of compaction queue");
- logger = plb.create_perf_counters();
- cct->get_perfcounters_collection()->add(logger);
-
- if (g_conf()->leveldb_compact_on_mount) {
- derr << "Compacting leveldb store..." << dendl;
- compact();
- derr << "Finished compacting leveldb store" << dendl;
- }
- return 0;
-}
-
-int LevelDBStore::_test_init(const string& dir)
-{
- leveldb::Options options;
- options.create_if_missing = true;
- leveldb::DB *db;
- leveldb::Status status = leveldb::DB::Open(options, dir, &db);
- delete db;
- return status.ok() ? 0 : -EIO;
-}
-
-LevelDBStore::~LevelDBStore()
-{
- close();
-}
-
-void LevelDBStore::close()
-{
- // stop compaction thread
- compact_queue_lock.lock();
- if (compact_thread.is_started()) {
- compact_queue_stop = true;
- compact_queue_cond.notify_all();
- compact_queue_lock.unlock();
- compact_thread.join();
- } else {
- compact_queue_lock.unlock();
- }
-
- if (logger) {
- cct->get_perfcounters_collection()->remove(logger);
- delete logger;
- logger = nullptr;
- }
-
- // Ensure db is destroyed before dependent db_cache and filterpolicy
- db.reset();
- delete ceph_logger;
-}
-
-int LevelDBStore::repair(std::ostream &out)
-{
- leveldb::Options ldoptions;
- int r = load_leveldb_options(false, ldoptions);
- if (r) {
- dout(1) << "load leveldb options failed" << dendl;
- out << "load leveldb options failed" << std::endl;
- return r;
- }
- leveldb::Status status = leveldb::RepairDB(path, ldoptions);
- if (status.ok()) {
- return 0;
- } else {
- out << "repair leveldb failed : " << status.ToString() << std::endl;
- return 1;
- }
-}
-
-int LevelDBStore::submit_transaction(KeyValueDB::Transaction t)
-{
- utime_t start = ceph_clock_now();
- LevelDBTransactionImpl * _t =
- static_cast<LevelDBTransactionImpl *>(t.get());
- leveldb::Status s = db->Write(leveldb::WriteOptions(), &(_t->bat));
- utime_t lat = ceph_clock_now() - start;
- logger->inc(l_leveldb_txns);
- logger->tinc(l_leveldb_submit_latency, lat);
- return s.ok() ? 0 : -1;
-}
-
-int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
-{
- utime_t start = ceph_clock_now();
- LevelDBTransactionImpl * _t =
- static_cast<LevelDBTransactionImpl *>(t.get());
- leveldb::WriteOptions options;
- options.sync = true;
- leveldb::Status s = db->Write(options, &(_t->bat));
- utime_t lat = ceph_clock_now() - start;
- logger->inc(l_leveldb_txns);
- logger->tinc(l_leveldb_submit_sync_latency, lat);
- return s.ok() ? 0 : -1;
-}
-
-void LevelDBStore::LevelDBTransactionImpl::set(
- const string &prefix,
- const string &k,
- const bufferlist &to_set_bl)
-{
- string key = combine_strings(prefix, k);
- size_t bllen = to_set_bl.length();
- // bufferlist::c_str() is non-constant, so we can't call c_str()
- if (to_set_bl.is_contiguous() && bllen > 0) {
- // bufferlist contains just one ptr or they're contiguous
- bat.Put(leveldb::Slice(key), leveldb::Slice(to_set_bl.buffers().front().c_str(), bllen));
- } else if ((bllen <= 32 * 1024) && (bllen > 0)) {
- // 2+ bufferptrs that are not contiguopus
- // allocate buffer on stack and copy bl contents to that buffer
- // make sure the buffer isn't too large or we might crash here...
- char* slicebuf = (char*) alloca(bllen);
- leveldb::Slice newslice(slicebuf, bllen);
- for (const auto& node : to_set_bl.buffers()) {
- const size_t ptrlen = node.length();
- memcpy(static_cast<void*>(slicebuf), node.c_str(), ptrlen);
- slicebuf += ptrlen;
- }
- bat.Put(leveldb::Slice(key), newslice);
- } else {
- // 2+ bufferptrs that are not contiguous, and enormous in size
- bufferlist val = to_set_bl;
- bat.Put(leveldb::Slice(key), leveldb::Slice(val.c_str(), val.length()));
- }
-}
-
-void LevelDBStore::LevelDBTransactionImpl::rmkey(const string &prefix,
- const string &k)
-{
- string key = combine_strings(prefix, k);
- bat.Delete(leveldb::Slice(key));
-}
-
-void LevelDBStore::LevelDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
-{
- KeyValueDB::Iterator it = db->get_iterator(prefix);
- for (it->seek_to_first();
- it->valid();
- it->next()) {
- bat.Delete(leveldb::Slice(combine_strings(prefix, it->key())));
- }
-}
-
-void LevelDBStore::LevelDBTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end)
-{
- KeyValueDB::Iterator it = db->get_iterator(prefix);
- it->lower_bound(start);
- while (it->valid()) {
- if (it->key() >= end) {
- break;
- }
- bat.Delete(combine_strings(prefix, it->key()));
- it->next();
- }
-}
-
-int LevelDBStore::get(
- const string &prefix,
- const std::set<string> &keys,
- std::map<string, bufferlist> *out)
-{
- utime_t start = ceph_clock_now();
- for (std::set<string>::const_iterator i = keys.begin();
- i != keys.end(); ++i) {
- std::string value;
- std::string bound = combine_strings(prefix, *i);
- auto status = db->Get(leveldb::ReadOptions(), leveldb::Slice(bound), &value);
- if (status.ok())
- (*out)[*i].append(value);
- }
- utime_t lat = ceph_clock_now() - start;
- logger->inc(l_leveldb_gets);
- logger->tinc(l_leveldb_get_latency, lat);
- return 0;
-}
-
-int LevelDBStore::get(const string &prefix,
- const string &key,
- bufferlist *out)
-{
- ceph_assert(out && (out->length() == 0));
- utime_t start = ceph_clock_now();
- int r = 0;
- string value, k;
- leveldb::Status s;
- k = combine_strings(prefix, key);
- s = db->Get(leveldb::ReadOptions(), leveldb::Slice(k), &value);
- if (s.ok()) {
- out->append(value);
- } else {
- r = -ENOENT;
- }
- utime_t lat = ceph_clock_now() - start;
- logger->inc(l_leveldb_gets);
- logger->tinc(l_leveldb_get_latency, lat);
- return r;
-}
-
-string LevelDBStore::combine_strings(const string &prefix, const string &value)
-{
- string out = prefix;
- out.push_back(0);
- out.append(value);
- return out;
-}
-
-bufferlist LevelDBStore::to_bufferlist(leveldb::Slice in)
-{
- bufferlist bl;
- bl.append(bufferptr(in.data(), in.size()));
- return bl;
-}
-
-int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key)
-{
- size_t prefix_len = 0;
-
- // Find separator inside Slice
- char* separator = (char*) memchr(in.data(), 0, in.size());
- if (separator == NULL)
- return -EINVAL;
- prefix_len = size_t(separator - in.data());
- if (prefix_len >= in.size())
- return -EINVAL;
-
- if (prefix)
- *prefix = string(in.data(), prefix_len);
- if (key)
- *key = string(separator+1, in.size() - prefix_len - 1);
- return 0;
-}
-
-void LevelDBStore::compact()
-{
- logger->inc(l_leveldb_compact);
- db->CompactRange(NULL, NULL);
-}
-
-
-void LevelDBStore::compact_thread_entry()
-{
- std::unique_lock l{compact_queue_lock};
- while (!compact_queue_stop) {
- while (!compact_queue.empty()) {
- pair<string,string> range = compact_queue.front();
- compact_queue.pop_front();
- logger->set(l_leveldb_compact_queue_len, compact_queue.size());
- l.unlock();
- logger->inc(l_leveldb_compact_range);
- if (range.first.empty() && range.second.empty()) {
- compact();
- } else {
- compact_range(range.first, range.second);
- }
- l.lock();
- continue;
- }
- if (compact_queue_stop)
- break;
- compact_queue_cond.wait(l);
- }
-}
-
-void LevelDBStore::compact_range_async(const string& start, const string& end)
-{
- std::lock_guard l(compact_queue_lock);
-
- // try to merge adjacent ranges. this is O(n), but the queue should
- // be short. note that we do not cover all overlap cases and merge
- // opportunities here, but we capture the ones we currently need.
- list< pair<string,string> >::iterator p = compact_queue.begin();
- while (p != compact_queue.end()) {
- if (p->first == start && p->second == end) {
- // dup; no-op
- return;
- }
- if (p->first <= end && p->first > start) {
- // merge with existing range to the right
- compact_queue.push_back(make_pair(start, p->second));
- compact_queue.erase(p);
- logger->inc(l_leveldb_compact_queue_merge);
- break;
- }
- if (p->second >= start && p->second < end) {
- // merge with existing range to the left
- compact_queue.push_back(make_pair(p->first, end));
- compact_queue.erase(p);
- logger->inc(l_leveldb_compact_queue_merge);
- break;
- }
- ++p;
- }
- if (p == compact_queue.end()) {
- // no merge, new entry.
- compact_queue.push_back(make_pair(start, end));
- logger->set(l_leveldb_compact_queue_len, compact_queue.size());
- }
- compact_queue_cond.notify_all();
- if (!compact_thread.is_started()) {
- compact_thread.create("levdbst_compact");
- }
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-#ifndef LEVEL_DB_STORE_H
-#define LEVEL_DB_STORE_H
-
-#include "include/types.h"
-#include "include/buffer_fwd.h"
-#include "KeyValueDB.h"
-#include <set>
-#include <map>
-#include <string>
-#include <boost/scoped_ptr.hpp>
-#include "leveldb/db.h"
-#include "leveldb/env.h"
-#include "leveldb/write_batch.h"
-#include "leveldb/slice.h"
-#include "leveldb/cache.h"
-#ifdef HAVE_LEVELDB_FILTER_POLICY
-#include "leveldb/filter_policy.h"
-#endif
-
-#include <errno.h>
-#include "common/errno.h"
-#include "common/dout.h"
-#include "include/ceph_assert.h"
-#include "common/Formatter.h"
-#include "common/Cond.h"
-
-#include "common/ceph_context.h"
-#include "include/common_fwd.h"
-
-// reinclude our assert to clobber the system one
-# include "include/ceph_assert.h"
-
-enum {
- l_leveldb_first = 34300,
- l_leveldb_gets,
- l_leveldb_txns,
- l_leveldb_get_latency,
- l_leveldb_submit_latency,
- l_leveldb_submit_sync_latency,
- l_leveldb_compact,
- l_leveldb_compact_range,
- l_leveldb_compact_queue_merge,
- l_leveldb_compact_queue_len,
- l_leveldb_last,
-};
-
-extern leveldb::Logger *create_leveldb_ceph_logger();
-
-class CephLevelDBLogger;
-
-/**
- * Uses LevelDB to implement the KeyValueDB interface
- */
-class LevelDBStore : public KeyValueDB {
- CephContext *cct;
- PerfCounters *logger;
- CephLevelDBLogger *ceph_logger;
- std::string path;
- boost::scoped_ptr<leveldb::Cache> db_cache;
-#ifdef HAVE_LEVELDB_FILTER_POLICY
- boost::scoped_ptr<const leveldb::FilterPolicy> filterpolicy;
-#endif
- boost::scoped_ptr<leveldb::DB> db;
-
- int load_leveldb_options(bool create_if_missing, leveldb::Options &opts);
- int do_open(std::ostream &out, bool create_if_missing);
-
- // manage async compactions
- ceph::mutex compact_queue_lock =
- ceph::make_mutex("LevelDBStore::compact_thread_lock");
- ceph::condition_variable compact_queue_cond;
- std::list<std::pair<std::string, std::string>> compact_queue;
- bool compact_queue_stop;
- class CompactThread : public Thread {
- LevelDBStore *db;
- public:
- explicit CompactThread(LevelDBStore *d) : db(d) {}
- void *entry() override {
- db->compact_thread_entry();
- return NULL;
- }
- friend class LevelDBStore;
- } compact_thread;
-
- void compact_thread_entry();
-
- void compact_range(const std::string& start, const std::string& end) {
- leveldb::Slice cstart(start);
- leveldb::Slice cend(end);
- db->CompactRange(&cstart, &cend);
- }
- void compact_range_async(const std::string& start, const std::string& end);
-
-public:
- /// compact the underlying leveldb store
- void compact() override;
-
- void compact_async() override {
- compact_range_async({}, {});
- }
-
- /// compact db for all keys with a given prefix
- void compact_prefix(const std::string& prefix) override {
- compact_range(prefix, past_prefix(prefix));
- }
- void compact_prefix_async(const std::string& prefix) override {
- compact_range_async(prefix, past_prefix(prefix));
- }
- void compact_range(const std::string& prefix,
- const std::string& start, const std::string& end) override {
- compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
- }
- void compact_range_async(const std::string& prefix,
- const std::string& start, const std::string& end) override {
- compact_range_async(combine_strings(prefix, start),
- combine_strings(prefix, end));
- }
-
-
- /**
- * options_t: Holds options which are minimally interpreted
- * on initialization and then passed through to LevelDB.
- * We transform a couple of these into actual LevelDB
- * structures, but the rest are simply passed through unchanged. See
- * leveldb/options.h for more precise details on each.
- *
- * Set them after constructing the LevelDBStore, but before calling
- * open() or create_and_open().
- */
- struct options_t {
- uint64_t write_buffer_size; /// in-memory write buffer size
- int max_open_files; /// maximum number of files LevelDB can open at once
- uint64_t cache_size; /// size of extra decompressed cache to use
- uint64_t block_size; /// user data per block
- int bloom_size; /// number of bits per entry to put in a bloom filter
- bool compression_enabled; /// whether to use libsnappy compression or not
-
- // don't change these ones. No, seriously
- int block_restart_interval;
- bool error_if_exists;
- bool paranoid_checks;
-
- std::string log_file;
-
- options_t() :
- write_buffer_size(0), //< 0 means default
- max_open_files(0), //< 0 means default
- cache_size(0), //< 0 means no cache (default)
- block_size(0), //< 0 means default
- bloom_size(0), //< 0 means no bloom filter (default)
- compression_enabled(true), //< set to false for no compression
- block_restart_interval(0), //< 0 means default
- error_if_exists(false), //< set to true if you want to check nonexistence
- paranoid_checks(false) //< set to true if you want paranoid checks
- {}
- } options;
-
- LevelDBStore(CephContext *c, const std::string &path) :
- cct(c),
- logger(NULL),
- ceph_logger(NULL),
- path(path),
- db_cache(NULL),
-#ifdef HAVE_LEVELDB_FILTER_POLICY
- filterpolicy(NULL),
-#endif
- compact_queue_stop(false),
- compact_thread(this),
- options()
- {}
-
- ~LevelDBStore() override;
-
- static int _test_init(const std::string& dir);
- int init(std::string option_str="") override;
-
- /// Opens underlying db
- int open(std::ostream &out, const std::string& cfs="") override;
- /// Creates underlying db if missing and opens it
- int create_and_open(std::ostream &out, const std::string& cfs="") override;
-
- void close() override;
-
- PerfCounters *get_perf_counters() override
- {
- return logger;
- }
- int repair(std::ostream &out) override;
-
- class LevelDBTransactionImpl : public KeyValueDB::TransactionImpl {
- public:
- leveldb::WriteBatch bat;
- LevelDBStore *db;
- explicit LevelDBTransactionImpl(LevelDBStore *db) : db(db) {}
- void set(
- const std::string &prefix,
- const std::string &k,
- const ceph::buffer::list &bl) override;
- using KeyValueDB::TransactionImpl::set;
- void rmkey(
- const std::string &prefix,
- const std::string &k) override;
- void rmkeys_by_prefix(
- const std::string &prefix
- ) override;
- virtual void rm_range_keys(
- const std::string &prefix,
- const std::string &start,
- const std::string &end) override;
-
- using KeyValueDB::TransactionImpl::rmkey;
- };
-
- KeyValueDB::Transaction get_transaction() override {
- return std::make_shared<LevelDBTransactionImpl>(this);
- }
-
- int submit_transaction(KeyValueDB::Transaction t) override;
- int submit_transaction_sync(KeyValueDB::Transaction t) override;
- int get(
- const std::string &prefix,
- const std::set<std::string> &key,
- std::map<std::string, ceph::buffer::list> *out
- ) override;
-
- int get(const std::string &prefix,
- const std::string &key,
- ceph::buffer::list *value) override;
-
- using KeyValueDB::get;
-
- class LevelDBWholeSpaceIteratorImpl :
- public KeyValueDB::WholeSpaceIteratorImpl {
- protected:
- boost::scoped_ptr<leveldb::Iterator> dbiter;
- public:
- explicit LevelDBWholeSpaceIteratorImpl(leveldb::Iterator *iter) :
- dbiter(iter) { }
- ~LevelDBWholeSpaceIteratorImpl() override { }
-
- int seek_to_first() override {
- dbiter->SeekToFirst();
- return dbiter->status().ok() ? 0 : -1;
- }
- int seek_to_first(const std::string &prefix) override {
- leveldb::Slice slice_prefix(prefix);
- dbiter->Seek(slice_prefix);
- return dbiter->status().ok() ? 0 : -1;
- }
- int seek_to_last() override {
- dbiter->SeekToLast();
- return dbiter->status().ok() ? 0 : -1;
- }
- int seek_to_last(const std::string &prefix) override {
- std::string limit = past_prefix(prefix);
- leveldb::Slice slice_limit(limit);
- dbiter->Seek(slice_limit);
-
- if (!dbiter->Valid()) {
- dbiter->SeekToLast();
- } else {
- dbiter->Prev();
- }
- return dbiter->status().ok() ? 0 : -1;
- }
- int upper_bound(const std::string &prefix, const std::string &after) override {
- lower_bound(prefix, after);
- if (valid()) {
- std::pair<std::string,std::string> key = raw_key();
- if (key.first == prefix && key.second == after)
- next();
- }
- return dbiter->status().ok() ? 0 : -1;
- }
- int lower_bound(const std::string &prefix, const std::string &to) override {
- std::string bound = combine_strings(prefix, to);
- leveldb::Slice slice_bound(bound);
- dbiter->Seek(slice_bound);
- return dbiter->status().ok() ? 0 : -1;
- }
- bool valid() override {
- return dbiter->Valid();
- }
- int next() override {
- if (valid())
- dbiter->Next();
- return dbiter->status().ok() ? 0 : -1;
- }
- int prev() override {
- if (valid())
- dbiter->Prev();
- return dbiter->status().ok() ? 0 : -1;
- }
- std::string key() override {
- std::string out_key;
- split_key(dbiter->key(), 0, &out_key);
- return out_key;
- }
- std::pair<std::string,std::string> raw_key() override {
- std::string prefix, key;
- split_key(dbiter->key(), &prefix, &key);
- return std::make_pair(prefix, key);
- }
- bool raw_key_is_prefixed(const std::string &prefix) override {
- leveldb::Slice key = dbiter->key();
- if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
- return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
- } else {
- return false;
- }
- }
- ceph::buffer::list value() override {
- return to_bufferlist(dbiter->value());
- }
-
- ceph::bufferptr value_as_ptr() override {
- leveldb::Slice data = dbiter->value();
- return ceph::bufferptr(data.data(), data.size());
- }
-
- int status() override {
- return dbiter->status().ok() ? 0 : -1;
- }
- };
-
- /// Utility
- static std::string combine_strings(const std::string &prefix, const std::string &value);
- static int split_key(leveldb::Slice in, std::string *prefix, std::string *key);
- static ceph::buffer::list to_bufferlist(leveldb::Slice in);
- static std::string past_prefix(const std::string &prefix) {
- std::string limit = prefix;
- limit.push_back(1);
- return limit;
- }
-
- uint64_t get_estimated_size(std::map<std::string,std::uint64_t> &extra) override {
- DIR *store_dir = opendir(path.c_str());
- if (!store_dir) {
- lderr(cct) << __func__ << " something happened opening the store: "
- << cpp_strerror(errno) << dendl;
- return 0;
- }
-
- uint64_t total_size = 0;
- uint64_t sst_size = 0;
- uint64_t log_size = 0;
- uint64_t misc_size = 0;
-
- struct dirent *entry = NULL;
- while ((entry = readdir(store_dir)) != NULL) {
- std::string n(entry->d_name);
-
- if (n == "." || n == "..")
- continue;
-
- std::string fpath = path + '/' + n;
- struct stat s;
- int err = stat(fpath.c_str(), &s);
- if (err < 0)
- err = -errno;
- // we may race against leveldb while reading files; this should only
- // happen when those files are being updated, data is being shuffled
- // and files get removed, in which case there's not much of a problem
- // as we'll get to them next time around.
- if (err == -ENOENT) {
- continue;
- }
- if (err < 0) {
- lderr(cct) << __func__ << " error obtaining stats for " << fpath
- << ": " << cpp_strerror(err) << dendl;
- goto err;
- }
-
- size_t pos = n.find_last_of('.');
- if (pos == std::string::npos) {
- misc_size += s.st_size;
- continue;
- }
-
- std::string ext = n.substr(pos+1);
- if (ext == "sst") {
- sst_size += s.st_size;
- } else if (ext == "log") {
- log_size += s.st_size;
- } else {
- misc_size += s.st_size;
- }
- }
-
- total_size = sst_size + log_size + misc_size;
-
- extra["sst"] = sst_size;
- extra["log"] = log_size;
- extra["misc"] = misc_size;
- extra["total"] = total_size;
-
-err:
- closedir(store_dir);
- return total_size;
- }
-
-
- WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override {
- return std::make_shared<LevelDBWholeSpaceIteratorImpl>(
- db->NewIterator(leveldb::ReadOptions()));
- }
-
-};
-
-#endif
target_link_libraries(get_command_descriptions
mon
global
- ${LEVELDB_LIBRARIES}
${EXTRALIBS}
${BLKID_LIBRARIES}
${CMAKE_DL_LIBS}
INSTANTIATE_TEST_SUITE_P(
KeyValueDB,
KVTest,
- ::testing::Values("leveldb", "rocksdb", "memdb"));
+ ::testing::Values("rocksdb", "memdb"));
INSTANTIATE_TEST_SUITE_P(
KeyValueDB,