From: Sage Weil Date: Mon, 19 Oct 2015 16:55:56 +0000 (-0400) Subject: kv: move KeyValueDB from os/ to kv/, libos.a to libkv.a X-Git-Tag: v10.0.1~116^2~22 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e1783d23332258b34f89aae35c853be481656f24;p=ceph.git kv: move KeyValueDB from os/ to kv/, libos.a to libkv.a Better code organization, and it will allow us to link less code into the mon. Signed-off-by: Sage Weil --- diff --git a/src/Makefile-env.am b/src/Makefile-env.am index 1d481da96f77..c794dcadf7f0 100644 --- a/src/Makefile-env.am +++ b/src/Makefile-env.am @@ -169,6 +169,7 @@ LIBMSG = libmsg.la LIBCRUSH = libcrush.la LIBCOMPRESSOR = libcompressor.la -lsnappy LIBJSON_SPIRIT = libjson_spirit.la +LIBKV = libkv.a LIBLOG = liblog.la LIBOS = libos.a LIBOS_TYPES = libos_types.a @@ -228,12 +229,12 @@ LIBMDS += $(LIBPERFGLUE) # OSD needs types LIBOSD += $(LIBOSD_TYPES) $(LIBOS_TYPES) -# libos linking order is ornery -LIBOS += $(LIBOS_TYPES) +# libkv/libos linking order is ornery if WITH_SLIBROCKSDB -LIBOS += rocksdb/librocksdb.a +LIBKV += rocksdb/librocksdb.a endif -LIBOS += -lbz2 -lz -lleveldb -lsnappy +LIBKV += -lbz2 -lz -lleveldb -lsnappy +LIBOS += $(LIBOS_TYPES) $(LIBKV) LIBMON += $(LIBMON_TYPES) diff --git a/src/Makefile.am b/src/Makefile.am index 3bb6c19daf71..2e4f1286a5da 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -14,6 +14,7 @@ include auth/Makefile.am include brag/Makefile.am include ceph-detect-init/Makefile.am include crush/Makefile.am +include kv/Makefile.am include mon/Makefile.am include mds/Makefile.am include os/Makefile.am diff --git a/src/kv/KeyValueDB.cc b/src/kv/KeyValueDB.cc new file mode 100644 index 000000000000..65ce487ed806 --- /dev/null +++ b/src/kv/KeyValueDB.cc @@ -0,0 +1,50 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "KeyValueDB.h" +#include "LevelDBStore.h" +#ifdef HAVE_LIBROCKSDB +#include "RocksDBStore.h" +#endif +#ifdef HAVE_KINETIC +#include "KineticStore.h" +#endif + +KeyValueDB *KeyValueDB::create(CephContext *cct, const string& type, + const string& dir) +{ + if (type == "leveldb") { + return new LevelDBStore(cct, dir); + } +#ifdef HAVE_KINETIC + if (type == "kinetic" && + cct->check_experimental_feature_enabled("kinetic")) { + return new KineticStore(cct); + } +#endif +#ifdef HAVE_LIBROCKSDB + if (type == "rocksdb" && + cct->check_experimental_feature_enabled("rocksdb")) { + return new RocksDBStore(cct, dir); + } +#endif + return NULL; +} + +int KeyValueDB::test_init(const string& type, const string& dir) +{ + if (type == "leveldb") { + return LevelDBStore::_test_init(dir); + } +#ifdef HAVE_KINETIC + if (type == "kinetic") { + return KineticStore::_test_init(g_ceph_context); + } +#endif +#ifdef HAVE_LIBROCKSDB + if (type == "rocksdb") { + return RocksDBStore::_test_init(dir); + } +#endif + return -EINVAL; +} diff --git a/src/kv/KeyValueDB.h b/src/kv/KeyValueDB.h new file mode 100644 index 000000000000..755a627f8f9f --- /dev/null +++ b/src/kv/KeyValueDB.h @@ -0,0 +1,233 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef KEY_VALUE_DB_H +#define KEY_VALUE_DB_H + +#include "include/buffer.h" +#include +#include +#include +#include +#include "include/memory.h" +#include + +using std::string; +/** + * Defines virtual interface to be implemented by key value store + * + * Kyoto Cabinet or LevelDB should implement this + */ +class KeyValueDB { +public: + class TransactionImpl { + public: + /// Set Keys + void set( + const std::string &prefix, ///< [in] Prefix for keys + const std::map &to_set ///< [in] keys/values to set + ) { + std::map::const_iterator it; + for (it = to_set.begin(); it != to_set.end(); ++it) + set(prefix, it->first, it->second); + } + + /// Set Key + virtual void set( + const std::string &prefix, ///< [in] Prefix for the key + const std::string &k, ///< [in] Key to set + const bufferlist &bl ///< [in] Value to set + ) = 0; + + + /// Removes Keys + void rmkeys( + const std::string &prefix, ///< [in] Prefix to search for + const std::set &keys ///< [in] Keys to remove + ) { + std::set::const_iterator it; + for (it = keys.begin(); it != keys.end(); ++it) + rmkey(prefix, *it); + } + + /// Remove Key + virtual void rmkey( + const std::string &prefix, ///< [in] Prefix to search for + const std::string &k ///< [in] Key to remove + ) = 0; + + /// Removes keys beginning with prefix + virtual void rmkeys_by_prefix( + const std::string &prefix ///< [in] Prefix by which to remove keys + ) = 0; + + virtual ~TransactionImpl() {} + }; + typedef ceph::shared_ptr< TransactionImpl > Transaction; + + /// create a new instance + static KeyValueDB *create(CephContext *cct, const std::string& type, + const std::string& dir); + + /// test whether we can successfully initialize; may have side effects (e.g., create) + static int test_init(const std::string& type, const std::string& dir); + virtual int init(string option_str="") = 0; + virtual int open(std::ostream &out) = 0; + virtual int create_and_open(std::ostream &out) = 0; + + virtual Transaction get_transaction() = 0; + virtual int submit_transaction(Transaction) = 0; + virtual int submit_transaction_sync(Transaction t) { + return submit_transaction(t); + } + + /// Retrieve Keys + virtual int get( + const std::string &prefix, ///< [in] Prefix for key + const std::set &key, ///< [in] Key to retrieve + std::map *out ///< [out] Key value retrieved + ) = 0; + virtual int get(const std::string &prefix, ///< [in] prefix + const std::string &key, ///< [in] key + bufferlist *value) { ///< [out] value + std::set ks; + ks.insert(key); + std::map om; + int r = get(prefix, ks, &om); + if (om.find(key) != om.end()) { + *value = om[key]; + } else { + *value = bufferlist(); + r = -ENOENT; + } + return r; + } + + class GenericIteratorImpl { + public: + virtual int seek_to_first() = 0; + virtual int upper_bound(const std::string &after) = 0; + virtual int lower_bound(const std::string &to) = 0; + virtual bool valid() = 0; + virtual int next() = 0; + virtual std::string key() = 0; + virtual bufferlist value() = 0; + virtual int status() = 0; + virtual ~GenericIteratorImpl() {} + }; + + class WholeSpaceIteratorImpl { + public: + virtual int seek_to_first() = 0; + virtual int seek_to_first(const std::string &prefix) = 0; + virtual int seek_to_last() = 0; + virtual int seek_to_last(const std::string &prefix) = 0; + virtual int upper_bound(const std::string &prefix, const std::string &after) = 0; + virtual int lower_bound(const std::string &prefix, const std::string &to) = 0; + virtual bool valid() = 0; + virtual int next() = 0; + virtual int prev() = 0; + virtual std::string key() = 0; + virtual std::pair raw_key() = 0; + virtual bufferlist value() = 0; + virtual int status() = 0; + virtual ~WholeSpaceIteratorImpl() { } + }; + typedef ceph::shared_ptr< WholeSpaceIteratorImpl > WholeSpaceIterator; + + class IteratorImpl : public GenericIteratorImpl { + const std::string prefix; + WholeSpaceIterator generic_iter; + public: + IteratorImpl(const std::string &prefix, WholeSpaceIterator iter) : + prefix(prefix), generic_iter(iter) { } + virtual ~IteratorImpl() { } + + int seek_to_first() { + return generic_iter->seek_to_first(prefix); + } + int seek_to_last() { + return generic_iter->seek_to_last(prefix); + } + int upper_bound(const std::string &after) { + return generic_iter->upper_bound(prefix, after); + } + int lower_bound(const std::string &to) { + return generic_iter->lower_bound(prefix, to); + } + bool valid() { + if (!generic_iter->valid()) + return false; + std::pair raw_key = generic_iter->raw_key(); + return (raw_key.first.compare(0, prefix.length(), prefix) == 0); + } + int next() { + if (valid()) + return generic_iter->next(); + return status(); + } + int prev() { + if (valid()) + return generic_iter->prev(); + return status(); + } + std::string key() { + return generic_iter->key(); + } + std::pair raw_key() { + return generic_iter->raw_key(); + } + bufferlist value() { + return generic_iter->value(); + } + int status() { + return generic_iter->status(); + } + }; + + typedef ceph::shared_ptr< IteratorImpl > Iterator; + + WholeSpaceIterator get_iterator() { + return _get_iterator(); + } + + Iterator get_iterator(const std::string &prefix) { + return ceph::shared_ptr( + new IteratorImpl(prefix, get_iterator()) + ); + } + + WholeSpaceIterator get_snapshot_iterator() { + return _get_snapshot_iterator(); + } + + Iterator get_snapshot_iterator(const std::string &prefix) { + return ceph::shared_ptr( + new IteratorImpl(prefix, get_snapshot_iterator()) + ); + } + + virtual uint64_t get_estimated_size(std::map &extra) = 0; + virtual int get_statfs(struct statfs *buf) { + return -EOPNOTSUPP; + } + + virtual ~KeyValueDB() {} + + /// compact the underlying store + virtual void compact() {} + + /// compact db for all keys with a given prefix + virtual void compact_prefix(const std::string& prefix) {} + /// compact db for all keys with a given prefix, async + virtual void compact_prefix_async(const std::string& prefix) {} + virtual void compact_range(const std::string& prefix, + const std::string& start, const std::string& end) {} + virtual void compact_range_async(const std::string& prefix, + const std::string& start, const std::string& end) {} + +protected: + virtual WholeSpaceIterator _get_iterator() = 0; + virtual WholeSpaceIterator _get_snapshot_iterator() = 0; +}; + +#endif diff --git a/src/kv/KineticStore.cc b/src/kv/KineticStore.cc new file mode 100644 index 000000000000..fb6e2bfe002e --- /dev/null +++ b/src/kv/KineticStore.cc @@ -0,0 +1,329 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include "KineticStore.h" +#include "common/ceph_crypto.h" + +#include +#include +#include +#include "include/memory.h" +#include +using std::string; +#include "common/perf_counters.h" + +#define dout_subsys ceph_subsys_keyvaluestore + +int KineticStore::init() +{ + // init defaults. caller can override these if they want + // prior to calling open. + host = cct->_conf->kinetic_host; + port = cct->_conf->kinetic_port; + user_id = cct->_conf->kinetic_user_id; + hmac_key = cct->_conf->kinetic_hmac_key; + use_ssl = cct->_conf->kinetic_use_ssl; + return 0; +} + +int KineticStore::_test_init(CephContext *c) +{ + kinetic::KineticConnectionFactory conn_factory = + kinetic::NewKineticConnectionFactory(); + + kinetic::ConnectionOptions options; + options.host = cct->_conf->kinetic_host; + options.port = cct->_conf->kinetic_port; + options.user_id = cct->_conf->kinetic_user_id; + options.hmac_key = cct->_conf->kinetic_hmac_key; + options.use_ssl = cct->_conf->kinetic_use_ssl; + + kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10); + kinetic_conn.reset(); + if (!status.ok()) + derr << __func__ << "Unable to connect to kinetic store " << options.host + << ":" << options.port << " : " << status.ToString() << dendl; + return status.ok() ? 0 : -EIO; +} + +int KineticStore::do_open(ostream &out, bool create_if_missing) +{ + kinetic::KineticConnectionFactory conn_factory = + kinetic::NewKineticConnectionFactory(); + kinetic::ConnectionOptions options; + options.host = host; + options.port = port; + options.user_id = user_id; + options.hmac_key = hmac_key; + options.use_ssl = use_ssl; + kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10); + if (!status.ok()) { + derr << "Unable to connect to kinetic store " << host << ":" << port + << " : " << status.ToString() << dendl; + return -EINVAL; + } + + PerfCountersBuilder plb(g_ceph_context, "kinetic", l_kinetic_first, l_kinetic_last); + plb.add_u64_counter(l_kinetic_gets, "kinetic_get", "Gets"); + plb.add_u64_counter(l_kinetic_txns, "kinetic_transaction", "Transactions"); + logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + return 0; +} + +KineticStore::KineticStore(CephContext *c) : + cct(c), + logger(NULL) +{ + host = c->_conf->kinetic_host; + port = c->_conf->kinetic_port; + user_id = c->_conf->kinetic_user_id; + hmac_key = c->_conf->kinetic_hmac_key; + use_ssl = c->_conf->kinetic_use_ssl; +} + +KineticStore::~KineticStore() +{ + close(); + delete logger; +} + +void KineticStore::close() +{ + kinetic_conn.reset(); + if (logger) + cct->get_perfcounters_collection()->remove(logger); +} + +int KineticStore::submit_transaction(KeyValueDB::Transaction t) +{ + KineticTransactionImpl * _t = + static_cast(t.get()); + + dout(20) << "kinetic submit_transaction" << dendl; + + for (vector::iterator it = _t->ops.begin(); + it != _t->ops.end(); ++it) { + kinetic::KineticStatus status(kinetic::StatusCode::OK, ""); + if (it->type == KINETIC_OP_WRITE) { + string data(it->data.c_str(), it->data.length()); + kinetic::KineticRecord record(data, "", "", + com::seagate::kinetic::client::proto::Message_Algorithm_SHA1); + dout(30) << "kinetic before put of " << it->key << " (" << data.length() << " bytes)" << dendl; + status = kinetic_conn->Put(it->key, "", kinetic::WriteMode::IGNORE_VERSION, + record); + dout(30) << "kinetic after put of " << it->key << dendl; + } else { + assert(it->type == KINETIC_OP_DELETE); + dout(30) << "kinetic before delete" << dendl; + status = kinetic_conn->Delete(it->key, "", + kinetic::WriteMode::IGNORE_VERSION); + dout(30) << "kinetic after delete" << dendl; + } + if (!status.ok()) { + derr << "kinetic error submitting transaction: " + << status.message() << dendl; + return -1; + } + } + + logger->inc(l_kinetic_txns); + return 0; +} + +int KineticStore::submit_transaction_sync(KeyValueDB::Transaction t) +{ + return submit_transaction(t); +} + +void KineticStore::KineticTransactionImpl::set( + const string &prefix, + const string &k, + const bufferlist &to_set_bl) +{ + string key = combine_strings(prefix, k); + dout(30) << "kinetic set key " << key << dendl; + ops.push_back(KineticOp(KINETIC_OP_WRITE, key, to_set_bl)); +} + +void KineticStore::KineticTransactionImpl::rmkey(const string &prefix, + const string &k) +{ + string key = combine_strings(prefix, k); + dout(30) << "kinetic rm key " << key << dendl; + ops.push_back(KineticOp(KINETIC_OP_DELETE, key)); +} + +void KineticStore::KineticTransactionImpl::rmkeys_by_prefix(const string &prefix) +{ + dout(20) << "kinetic rmkeys_by_prefix " << prefix << dendl; + KeyValueDB::Iterator it = db->get_iterator(prefix); + for (it->seek_to_first(); + it->valid(); + it->next()) { + string key = combine_strings(prefix, it->key()); + ops.push_back(KineticOp(KINETIC_OP_DELETE, key)); + dout(30) << "kinetic rm key by prefix: " << key << dendl; + } +} + +int KineticStore::get( + const string &prefix, + const std::set &keys, + std::map *out) +{ + dout(30) << "kinetic get prefix: " << prefix << " keys: " << keys << dendl; + for (std::set::const_iterator i = keys.begin(); + i != keys.end(); + ++i) { + unique_ptr record; + string key = combine_strings(prefix, *i); + dout(30) << "before get key " << key << dendl; + kinetic::KineticStatus status = kinetic_conn->Get(key, record); + if (!status.ok()) + break; + dout(30) << "kinetic get got key: " << key << dendl; + out->insert(make_pair(key, to_bufferlist(*record.get()))); + } + logger->inc(l_kinetic_gets); + return 0; +} + +string KineticStore::combine_strings(const string &prefix, const string &value) +{ + string out = prefix; + out.push_back(1); + out.append(value); + return out; +} + +bufferlist KineticStore::to_bufferlist(const kinetic::KineticRecord &record) +{ + bufferlist bl; + bl.append(*(record.value())); + return bl; +} + +int KineticStore::split_key(string in_prefix, string *prefix, string *key) +{ + size_t prefix_len = in_prefix.find('\1'); + if (prefix_len >= in_prefix.size()) + return -EINVAL; + + if (prefix) + *prefix = string(in_prefix, 0, prefix_len); + if (key) + *key= string(in_prefix, prefix_len + 1); + return 0; +} + +KineticStore::KineticWholeSpaceIteratorImpl::KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn) : kinetic_conn(conn), + kinetic_status(kinetic::StatusCode::OK, "") +{ + dout(30) << "kinetic iterator constructor()" << dendl; + const static string last_key = "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF"; + kinetic::KeyRangeIterator it = + kinetic_conn->IterateKeyRange("", true, last_key, true, 1024); + while (it != kinetic::KeyRangeEnd()) { + try { + keys.insert(*it); + dout(30) << "kinetic iterator added " << *it << dendl; + } catch (std::runtime_error &e) { + kinetic_status = kinetic::KineticStatus(kinetic::StatusCode::CLIENT_INTERNAL_ERROR, e.what()); + return; + } + ++it; + } + keys_iter = keys.begin(); +} + +int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_first(const string &prefix) +{ + dout(30) << "kinetic iterator seek_to_first(prefix): " << prefix << dendl; + keys_iter = keys.lower_bound(prefix); + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last() +{ + dout(30) << "kinetic iterator seek_to_last()" << dendl; + keys_iter = keys.end(); + if (keys.begin() != keys_iter) + --keys_iter; + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last(const string &prefix) +{ + dout(30) << "kinetic iterator seek_to_last(prefix): " << prefix << dendl; + keys_iter = keys.upper_bound(prefix + "\2"); + if (keys.begin() == keys_iter) { + keys_iter = keys.end(); + } else { + --keys_iter; + } + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) { + dout(30) << "kinetic iterator upper_bound()" << dendl; + string bound = combine_strings(prefix, after); + keys_iter = keys.upper_bound(bound); + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) { + dout(30) << "kinetic iterator lower_bound()" << dendl; + string bound = combine_strings(prefix, to); + keys_iter = keys.lower_bound(bound); + return 0; +} + +bool KineticStore::KineticWholeSpaceIteratorImpl::valid() { + dout(30) << "kinetic iterator valid()" << dendl; + return keys_iter != keys.end(); +} + +int KineticStore::KineticWholeSpaceIteratorImpl::next() { + dout(30) << "kinetic iterator next()" << dendl; + if (keys_iter != keys.end()) { + ++keys_iter; + return 0; + } + return -1; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::prev() { + dout(30) << "kinetic iterator prev()" << dendl; + if (keys_iter != keys.begin()) { + --keys_iter; + return 0; + } + keys_iter = keys.end(); + return -1; +} + +string KineticStore::KineticWholeSpaceIteratorImpl::key() { + dout(30) << "kinetic iterator key()" << dendl; + string out_key; + split_key(*keys_iter, NULL, &out_key); + return out_key; +} + +pair KineticStore::KineticWholeSpaceIteratorImpl::raw_key() { + dout(30) << "kinetic iterator raw_key()" << dendl; + string prefix, key; + split_key(*keys_iter, &prefix, &key); + return make_pair(prefix, key); +} + +bufferlist KineticStore::KineticWholeSpaceIteratorImpl::value() { + dout(30) << "kinetic iterator value()" << dendl; + unique_ptr record; + kinetic_status = kinetic_conn->Get(*keys_iter, record); + return to_bufferlist(*record.get()); +} + +int KineticStore::KineticWholeSpaceIteratorImpl::status() { + dout(30) << "kinetic iterator status()" << dendl; + return kinetic_status.ok() ? 0 : -1; +} diff --git a/src/kv/KineticStore.h b/src/kv/KineticStore.h new file mode 100644 index 000000000000..cbb7633466b1 --- /dev/null +++ b/src/kv/KineticStore.h @@ -0,0 +1,160 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef KINETIC_STORE_H +#define KINETIC_STORE_H + +#include "include/types.h" +#include "include/buffer.h" +#include "KeyValueDB.h" +#include +#include +#include +#include "include/memory.h" +#include + +#include +#include "common/errno.h" +#include "common/dout.h" +#include "include/assert.h" +#include "common/Formatter.h" + +#include "common/ceph_context.h" + +class PerfCounters; + +enum { + l_kinetic_first = 34400, + l_kinetic_gets, + l_kinetic_txns, + l_kinetic_last, +}; + +/** + * Uses Kinetic to implement the KeyValueDB interface + */ +class KineticStore : public KeyValueDB { + CephContext *cct; + PerfCounters *logger; + string host; + int port; + int user_id; + string hmac_key; + bool use_ssl; + std::unique_ptr kinetic_conn; + + int do_open(ostream &out, bool create_if_missing); + +public: + KineticStore(CephContext *c); + ~KineticStore(); + + static int _test_init(CephContext *c); + int init(); + + /// Opens underlying db + int open(ostream &out) { + return do_open(out, false); + } + /// Creates underlying db if missing and opens it + int create_and_open(ostream &out) { + return do_open(out, true); + } + + void close(); + + enum KineticOpType { + KINETIC_OP_WRITE, + KINETIC_OP_DELETE, + }; + + struct KineticOp { + KineticOpType type; + std::string key; + bufferlist data; + KineticOp(KineticOpType type, const string &key) : type(type), key(key) {} + KineticOp(KineticOpType type, const string &key, const bufferlist &data) + : type(type), key(key), data(data) {} + }; + + class KineticTransactionImpl : public KeyValueDB::TransactionImpl { + public: + vector ops; + KineticStore *db; + + KineticTransactionImpl(KineticStore *db) : db(db) {} + void set( + const string &prefix, + const string &k, + const bufferlist &bl); + void rmkey( + const string &prefix, + const string &k); + void rmkeys_by_prefix( + const string &prefix + ); + }; + + KeyValueDB::Transaction get_transaction() { + return ceph::shared_ptr< KineticTransactionImpl >( + new KineticTransactionImpl(this)); + } + + int submit_transaction(KeyValueDB::Transaction t); + int submit_transaction_sync(KeyValueDB::Transaction t); + int get( + const string &prefix, + const std::set &key, + std::map *out + ); + + class KineticWholeSpaceIteratorImpl : + public KeyValueDB::WholeSpaceIteratorImpl { + std::set keys; + std::set::iterator keys_iter; + kinetic::BlockingKineticConnection *kinetic_conn; + kinetic::KineticStatus kinetic_status; + public: + KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn); + virtual ~KineticWholeSpaceIteratorImpl() { } + + int seek_to_first() { + return seek_to_first(""); + } + int seek_to_first(const string &prefix); + int seek_to_last(); + int seek_to_last(const string &prefix); + int upper_bound(const string &prefix, const string &after); + int lower_bound(const string &prefix, const string &to); + bool valid(); + int next(); + int prev(); + string key(); + pair raw_key(); + bufferlist value(); + int status(); + }; + + /// Utility + static string combine_strings(const string &prefix, const string &value); + static int split_key(string in_prefix, string *prefix, string *key); + static bufferlist to_bufferlist(const kinetic::KineticRecord &record); + virtual uint64_t get_estimated_size(map &extra) { + // not used by the osd + return 0; + } + + +protected: + WholeSpaceIterator _get_iterator() { + return ceph::shared_ptr( + new KineticWholeSpaceIteratorImpl(kinetic_conn.get())); + } + + // TODO: remove snapshots from interface + WholeSpaceIterator _get_snapshot_iterator() { + return _get_iterator(); + } + +}; + +#endif diff --git a/src/kv/LevelDBStore.cc b/src/kv/LevelDBStore.cc new file mode 100644 index 000000000000..6a71aef9ec34 --- /dev/null +++ b/src/kv/LevelDBStore.cc @@ -0,0 +1,307 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include "LevelDBStore.h" + +#include +#include +#include +#include "include/memory.h" +#include +using std::string; +#include "common/debug.h" +#include "common/perf_counters.h" + +int LevelDBStore::init(string option_str) +{ + // init defaults. caller can override these if they want + // prior to calling open. + options.write_buffer_size = g_conf->leveldb_write_buffer_size; + options.cache_size = g_conf->leveldb_cache_size; + options.block_size = g_conf->leveldb_block_size; + options.bloom_size = g_conf->leveldb_bloom_size; + options.compression_enabled = g_conf->leveldb_compression; + options.paranoid_checks = g_conf->leveldb_paranoid; + options.max_open_files = g_conf->leveldb_max_open_files; + options.log_file = g_conf->leveldb_log; + return 0; +} + +int LevelDBStore::do_open(ostream &out, bool create_if_missing) +{ + leveldb::Options ldoptions; + + if (options.write_buffer_size) + ldoptions.write_buffer_size = options.write_buffer_size; + if (options.max_open_files) + ldoptions.max_open_files = options.max_open_files; + if (options.cache_size) { + leveldb::Cache *_db_cache = leveldb::NewLRUCache(options.cache_size); + db_cache.reset(_db_cache); + ldoptions.block_cache = db_cache.get(); + } + if (options.block_size) + ldoptions.block_size = options.block_size; + if (options.bloom_size) { +#ifdef HAVE_LEVELDB_FILTER_POLICY + const leveldb::FilterPolicy *_filterpolicy = + leveldb::NewBloomFilterPolicy(options.bloom_size); + filterpolicy.reset(_filterpolicy); + ldoptions.filter_policy = filterpolicy.get(); +#else + assert(0 == "bloom size set but installed leveldb doesn't support bloom filters"); +#endif + } + if (options.compression_enabled) + ldoptions.compression = leveldb::kSnappyCompression; + else + ldoptions.compression = leveldb::kNoCompression; + if (options.block_restart_interval) + ldoptions.block_restart_interval = options.block_restart_interval; + + ldoptions.error_if_exists = options.error_if_exists; + ldoptions.paranoid_checks = options.paranoid_checks; + ldoptions.create_if_missing = create_if_missing; + + if (options.log_file.length()) { + leveldb::Env *env = leveldb::Env::Default(); + env->NewLogger(options.log_file, &ldoptions.info_log); + } + + leveldb::DB *_db; + leveldb::Status status = leveldb::DB::Open(ldoptions, path, &_db); + db.reset(_db); + if (!status.ok()) { + out << status.ToString() << std::endl; + return -EINVAL; + } + + PerfCountersBuilder plb(g_ceph_context, "leveldb", l_leveldb_first, l_leveldb_last); + plb.add_u64_counter(l_leveldb_gets, "leveldb_get", "Gets"); + plb.add_u64_counter(l_leveldb_txns, "leveldb_transaction", "Transactions"); + plb.add_time_avg(l_leveldb_get_latency, "leveldb_get_latency", "Get Latency"); + plb.add_time_avg(l_leveldb_submit_latency, "leveldb_submit_latency", "Submit Latency"); + plb.add_time_avg(l_leveldb_submit_sync_latency, "leveldb_submit_sync_latency", "Submit Sync Latency"); + plb.add_u64_counter(l_leveldb_compact, "leveldb_compact", "Compactions"); + plb.add_u64_counter(l_leveldb_compact_range, "leveldb_compact_range", "Compactions by range"); + plb.add_u64_counter(l_leveldb_compact_queue_merge, "leveldb_compact_queue_merge", "Mergings of ranges in compaction queue"); + plb.add_u64(l_leveldb_compact_queue_len, "leveldb_compact_queue_len", "Length of compaction queue"); + logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + + if (g_conf->leveldb_compact_on_mount) { + derr << "Compacting leveldb store..." << dendl; + compact(); + derr << "Finished compacting leveldb store" << dendl; + } + return 0; +} + +int LevelDBStore::_test_init(const string& dir) +{ + leveldb::Options options; + options.create_if_missing = true; + leveldb::DB *db; + leveldb::Status status = leveldb::DB::Open(options, dir, &db); + delete db; + return status.ok() ? 0 : -EIO; +} + +LevelDBStore::~LevelDBStore() +{ + close(); + delete logger; + + // Ensure db is destroyed before dependent db_cache and filterpolicy + db.reset(); +} + +void LevelDBStore::close() +{ + // stop compaction thread + compact_queue_lock.Lock(); + if (compact_thread.is_started()) { + compact_queue_stop = true; + compact_queue_cond.Signal(); + compact_queue_lock.Unlock(); + compact_thread.join(); + } else { + compact_queue_lock.Unlock(); + } + + if (logger) + cct->get_perfcounters_collection()->remove(logger); +} + +int LevelDBStore::submit_transaction(KeyValueDB::Transaction t) +{ + utime_t start = ceph_clock_now(g_ceph_context); + LevelDBTransactionImpl * _t = + static_cast(t.get()); + leveldb::Status s = db->Write(leveldb::WriteOptions(), &(_t->bat)); + utime_t lat = ceph_clock_now(g_ceph_context) - start; + logger->inc(l_leveldb_txns); + logger->tinc(l_leveldb_submit_latency, lat); + return s.ok() ? 0 : -1; +} + +int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t) +{ + utime_t start = ceph_clock_now(g_ceph_context); + LevelDBTransactionImpl * _t = + static_cast(t.get()); + leveldb::WriteOptions options; + options.sync = true; + leveldb::Status s = db->Write(options, &(_t->bat)); + utime_t lat = ceph_clock_now(g_ceph_context) - start; + logger->inc(l_leveldb_txns); + logger->tinc(l_leveldb_submit_sync_latency, lat); + return s.ok() ? 0 : -1; +} + +void LevelDBStore::LevelDBTransactionImpl::set( + const string &prefix, + const string &k, + const bufferlist &to_set_bl) +{ + string key = combine_strings(prefix, k); + //bufferlist::c_str() is non-constant, so we need to make a copy + bufferlist val = to_set_bl; + bat.Delete(leveldb::Slice(key)); + bat.Put(leveldb::Slice(key), + leveldb::Slice(val.c_str(), val.length())); +} + +void LevelDBStore::LevelDBTransactionImpl::rmkey(const string &prefix, + const string &k) +{ + string key = combine_strings(prefix, k); + bat.Delete(leveldb::Slice(key)); +} + +void LevelDBStore::LevelDBTransactionImpl::rmkeys_by_prefix(const string &prefix) +{ + KeyValueDB::Iterator it = db->get_iterator(prefix); + for (it->seek_to_first(); + it->valid(); + it->next()) { + string key = combine_strings(prefix, it->key()); + bat.Delete(key); + } +} + +int LevelDBStore::get( + const string &prefix, + const std::set &keys, + std::map *out) +{ + utime_t start = ceph_clock_now(g_ceph_context); + KeyValueDB::Iterator it = get_iterator(prefix); + for (std::set::const_iterator i = keys.begin(); + i != keys.end(); + ++i) { + it->lower_bound(*i); + if (it->valid() && it->key() == *i) { + out->insert(make_pair(*i, it->value())); + } else if (!it->valid()) + break; + } + utime_t lat = ceph_clock_now(g_ceph_context) - start; + logger->inc(l_leveldb_gets); + logger->tinc(l_leveldb_get_latency, lat); + return 0; +} + +string LevelDBStore::combine_strings(const string &prefix, const string &value) +{ + string out = prefix; + out.push_back(0); + out.append(value); + return out; +} + +bufferlist LevelDBStore::to_bufferlist(leveldb::Slice in) +{ + bufferlist bl; + bl.append(bufferptr(in.data(), in.size())); + return bl; +} + +int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key) +{ + string in_prefix = in.ToString(); + size_t prefix_len = in_prefix.find('\0'); + if (prefix_len >= in_prefix.size()) + return -EINVAL; + + if (prefix) + *prefix = string(in_prefix, 0, prefix_len); + if (key) + *key= string(in_prefix, prefix_len + 1); + return 0; +} + +void LevelDBStore::compact() +{ + logger->inc(l_leveldb_compact); + db->CompactRange(NULL, NULL); +} + + +void LevelDBStore::compact_thread_entry() +{ + compact_queue_lock.Lock(); + while (!compact_queue_stop) { + while (!compact_queue.empty()) { + pair range = compact_queue.front(); + compact_queue.pop_front(); + logger->set(l_leveldb_compact_queue_len, compact_queue.size()); + compact_queue_lock.Unlock(); + logger->inc(l_leveldb_compact_range); + compact_range(range.first, range.second); + compact_queue_lock.Lock(); + continue; + } + compact_queue_cond.Wait(compact_queue_lock); + } + compact_queue_lock.Unlock(); +} + +void LevelDBStore::compact_range_async(const string& start, const string& end) +{ + Mutex::Locker l(compact_queue_lock); + + // try to merge adjacent ranges. this is O(n), but the queue should + // be short. note that we do not cover all overlap cases and merge + // opportunities here, but we capture the ones we currently need. + list< pair >::iterator p = compact_queue.begin(); + while (p != compact_queue.end()) { + if (p->first == start && p->second == end) { + // dup; no-op + return; + } + if (p->first <= end && p->first > start) { + // merge with existing range to the right + compact_queue.push_back(make_pair(start, p->second)); + compact_queue.erase(p); + logger->inc(l_leveldb_compact_queue_merge); + break; + } + if (p->second >= start && p->second < end) { + // merge with existing range to the left + compact_queue.push_back(make_pair(p->first, end)); + compact_queue.erase(p); + logger->inc(l_leveldb_compact_queue_merge); + break; + } + ++p; + } + if (p == compact_queue.end()) { + // no merge, new entry. + compact_queue.push_back(make_pair(start, end)); + logger->set(l_leveldb_compact_queue_len, compact_queue.size()); + } + compact_queue_cond.Signal(); + if (!compact_thread.is_started()) { + compact_thread.create(); + } +} diff --git a/src/kv/LevelDBStore.h b/src/kv/LevelDBStore.h new file mode 100644 index 000000000000..a42f2f067c7e --- /dev/null +++ b/src/kv/LevelDBStore.h @@ -0,0 +1,403 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef LEVEL_DB_STORE_H +#define LEVEL_DB_STORE_H + +#include "include/types.h" +#include "include/buffer.h" +#include "KeyValueDB.h" +#include +#include +#include +#include "include/memory.h" +#include +#include "leveldb/db.h" +#include "leveldb/env.h" +#include "leveldb/write_batch.h" +#include "leveldb/slice.h" +#include "leveldb/cache.h" +#ifdef HAVE_LEVELDB_FILTER_POLICY +#include "leveldb/filter_policy.h" +#endif + +#include +#include "common/errno.h" +#include "common/dout.h" +#include "include/assert.h" +#include "common/Formatter.h" +#include "common/Cond.h" + +#include "common/ceph_context.h" + +class PerfCounters; + +enum { + l_leveldb_first = 34300, + l_leveldb_gets, + l_leveldb_txns, + l_leveldb_get_latency, + l_leveldb_submit_latency, + l_leveldb_submit_sync_latency, + l_leveldb_compact, + l_leveldb_compact_range, + l_leveldb_compact_queue_merge, + l_leveldb_compact_queue_len, + l_leveldb_last, +}; + +/** + * Uses LevelDB to implement the KeyValueDB interface + */ +class LevelDBStore : public KeyValueDB { + CephContext *cct; + PerfCounters *logger; + string path; + boost::scoped_ptr db_cache; +#ifdef HAVE_LEVELDB_FILTER_POLICY + boost::scoped_ptr filterpolicy; +#endif + boost::scoped_ptr db; + + int do_open(ostream &out, bool create_if_missing); + + // manage async compactions + Mutex compact_queue_lock; + Cond compact_queue_cond; + list< pair > compact_queue; + bool compact_queue_stop; + class CompactThread : public Thread { + LevelDBStore *db; + public: + CompactThread(LevelDBStore *d) : db(d) {} + void *entry() { + db->compact_thread_entry(); + return NULL; + } + friend class LevelDBStore; + } compact_thread; + + void compact_thread_entry(); + + void compact_range(const string& start, const string& end) { + leveldb::Slice cstart(start); + leveldb::Slice cend(end); + db->CompactRange(&cstart, &cend); + } + void compact_range_async(const string& start, const string& end); + +public: + /// compact the underlying leveldb store + void compact(); + + /// compact db for all keys with a given prefix + void compact_prefix(const string& prefix) { + compact_range(prefix, past_prefix(prefix)); + } + void compact_prefix_async(const string& prefix) { + compact_range_async(prefix, past_prefix(prefix)); + } + void compact_range(const string& prefix, + const string& start, const string& end) { + compact_range(combine_strings(prefix, start), combine_strings(prefix, end)); + } + void compact_range_async(const string& prefix, + const string& start, const string& end) { + compact_range_async(combine_strings(prefix, start), + combine_strings(prefix, end)); + } + + + /** + * options_t: Holds options which are minimally interpreted + * on initialization and then passed through to LevelDB. + * We transform a couple of these into actual LevelDB + * structures, but the rest are simply passed through unchanged. See + * leveldb/options.h for more precise details on each. + * + * Set them after constructing the LevelDBStore, but before calling + * open() or create_and_open(). + */ + struct options_t { + uint64_t write_buffer_size; /// in-memory write buffer size + int max_open_files; /// maximum number of files LevelDB can open at once + uint64_t cache_size; /// size of extra decompressed cache to use + uint64_t block_size; /// user data per block + int bloom_size; /// number of bits per entry to put in a bloom filter + bool compression_enabled; /// whether to use libsnappy compression or not + + // don't change these ones. No, seriously + int block_restart_interval; + bool error_if_exists; + bool paranoid_checks; + + string log_file; + + options_t() : + write_buffer_size(0), //< 0 means default + max_open_files(0), //< 0 means default + cache_size(0), //< 0 means no cache (default) + block_size(0), //< 0 means default + bloom_size(0), //< 0 means no bloom filter (default) + compression_enabled(true), //< set to false for no compression + block_restart_interval(0), //< 0 means default + error_if_exists(false), //< set to true if you want to check nonexistence + paranoid_checks(false) //< set to true if you want paranoid checks + {} + } options; + + LevelDBStore(CephContext *c, const string &path) : + cct(c), + logger(NULL), + path(path), + db_cache(NULL), +#ifdef HAVE_LEVELDB_FILTER_POLICY + filterpolicy(NULL), +#endif + compact_queue_lock("LevelDBStore::compact_thread_lock"), + compact_queue_stop(false), + compact_thread(this), + options() + {} + + ~LevelDBStore(); + + static int _test_init(const string& dir); + int init(string option_str=""); + + /// Opens underlying db + int open(ostream &out) { + return do_open(out, false); + } + /// Creates underlying db if missing and opens it + int create_and_open(ostream &out) { + return do_open(out, true); + } + + void close(); + + class LevelDBTransactionImpl : public KeyValueDB::TransactionImpl { + public: + leveldb::WriteBatch bat; + LevelDBStore *db; + LevelDBTransactionImpl(LevelDBStore *db) : db(db) {} + void set( + const string &prefix, + const string &k, + const bufferlist &bl); + void rmkey( + const string &prefix, + const string &k); + void rmkeys_by_prefix( + const string &prefix + ); + }; + + KeyValueDB::Transaction get_transaction() { + return ceph::shared_ptr< LevelDBTransactionImpl >( + new LevelDBTransactionImpl(this)); + } + + int submit_transaction(KeyValueDB::Transaction t); + int submit_transaction_sync(KeyValueDB::Transaction t); + int get( + const string &prefix, + const std::set &key, + std::map *out + ); + + class LevelDBWholeSpaceIteratorImpl : + public KeyValueDB::WholeSpaceIteratorImpl { + protected: + boost::scoped_ptr dbiter; + public: + LevelDBWholeSpaceIteratorImpl(leveldb::Iterator *iter) : + dbiter(iter) { } + virtual ~LevelDBWholeSpaceIteratorImpl() { } + + int seek_to_first() { + dbiter->SeekToFirst(); + return dbiter->status().ok() ? 0 : -1; + } + int seek_to_first(const string &prefix) { + leveldb::Slice slice_prefix(prefix); + dbiter->Seek(slice_prefix); + return dbiter->status().ok() ? 0 : -1; + } + int seek_to_last() { + dbiter->SeekToLast(); + return dbiter->status().ok() ? 0 : -1; + } + int seek_to_last(const string &prefix) { + string limit = past_prefix(prefix); + leveldb::Slice slice_limit(limit); + dbiter->Seek(slice_limit); + + if (!dbiter->Valid()) { + dbiter->SeekToLast(); + } else { + dbiter->Prev(); + } + return dbiter->status().ok() ? 0 : -1; + } + int upper_bound(const string &prefix, const string &after) { + lower_bound(prefix, after); + if (valid()) { + pair key = raw_key(); + if (key.first == prefix && key.second == after) + next(); + } + return dbiter->status().ok() ? 0 : -1; + } + int lower_bound(const string &prefix, const string &to) { + string bound = combine_strings(prefix, to); + leveldb::Slice slice_bound(bound); + dbiter->Seek(slice_bound); + return dbiter->status().ok() ? 0 : -1; + } + bool valid() { + return dbiter->Valid(); + } + int next() { + if (valid()) + dbiter->Next(); + return dbiter->status().ok() ? 0 : -1; + } + int prev() { + if (valid()) + dbiter->Prev(); + return dbiter->status().ok() ? 0 : -1; + } + string key() { + string out_key; + split_key(dbiter->key(), 0, &out_key); + return out_key; + } + pair raw_key() { + string prefix, key; + split_key(dbiter->key(), &prefix, &key); + return make_pair(prefix, key); + } + bufferlist value() { + return to_bufferlist(dbiter->value()); + } + int status() { + return dbiter->status().ok() ? 0 : -1; + } + }; + + class LevelDBSnapshotIteratorImpl : public LevelDBWholeSpaceIteratorImpl { + leveldb::DB *db; + const leveldb::Snapshot *snapshot; + public: + LevelDBSnapshotIteratorImpl(leveldb::DB *db, const leveldb::Snapshot *s, + leveldb::Iterator *iter) : + LevelDBWholeSpaceIteratorImpl(iter), db(db), snapshot(s) { } + + ~LevelDBSnapshotIteratorImpl() { + assert(snapshot != NULL); + db->ReleaseSnapshot(snapshot); + } + }; + + /// Utility + static string combine_strings(const string &prefix, const string &value); + static int split_key(leveldb::Slice in, string *prefix, string *key); + static bufferlist to_bufferlist(leveldb::Slice in); + static string past_prefix(const string &prefix) { + string limit = prefix; + limit.push_back(1); + return limit; + } + + virtual uint64_t get_estimated_size(map &extra) { + DIR *store_dir = opendir(path.c_str()); + if (!store_dir) { + lderr(cct) << __func__ << " something happened opening the store: " + << cpp_strerror(errno) << dendl; + return 0; + } + + uint64_t total_size = 0; + uint64_t sst_size = 0; + uint64_t log_size = 0; + uint64_t misc_size = 0; + + struct dirent *entry = NULL; + while ((entry = readdir(store_dir)) != NULL) { + string n(entry->d_name); + + if (n == "." || n == "..") + continue; + + string fpath = path + '/' + n; + struct stat s; + int err = stat(fpath.c_str(), &s); + if (err < 0) + err = -errno; + // we may race against leveldb while reading files; this should only + // happen when those files are being updated, data is being shuffled + // and files get removed, in which case there's not much of a problem + // as we'll get to them next time around. + if (err == -ENOENT) { + continue; + } + if (err < 0) { + lderr(cct) << __func__ << " error obtaining stats for " << fpath + << ": " << cpp_strerror(err) << dendl; + goto err; + } + + size_t pos = n.find_last_of('.'); + if (pos == string::npos) { + misc_size += s.st_size; + continue; + } + + string ext = n.substr(pos+1); + if (ext == "sst") { + sst_size += s.st_size; + } else if (ext == "log") { + log_size += s.st_size; + } else { + misc_size += s.st_size; + } + } + + total_size = sst_size + log_size + misc_size; + + extra["sst"] = sst_size; + extra["log"] = log_size; + extra["misc"] = misc_size; + extra["total"] = total_size; + +err: + closedir(store_dir); + return total_size; + } + + +protected: + WholeSpaceIterator _get_iterator() { + return ceph::shared_ptr( + new LevelDBWholeSpaceIteratorImpl( + db->NewIterator(leveldb::ReadOptions()) + ) + ); + } + + WholeSpaceIterator _get_snapshot_iterator() { + const leveldb::Snapshot *snapshot; + leveldb::ReadOptions options; + + snapshot = db->GetSnapshot(); + options.snapshot = snapshot; + + return ceph::shared_ptr( + new LevelDBSnapshotIteratorImpl(db.get(), snapshot, + db->NewIterator(options)) + ); + } + +}; + +#endif diff --git a/src/kv/Makefile.am b/src/kv/Makefile.am new file mode 100644 index 000000000000..e5e4878b682f --- /dev/null +++ b/src/kv/Makefile.am @@ -0,0 +1,40 @@ +if ENABLE_SERVER + +libkv_a_SOURCES = \ + kv/KeyValueDB.cc \ + kv/LevelDBStore.cc +libkv_a_CXXFLAGS = ${AM_CXXFLAGS} -I rocksdb/include +libkv_a_LIBADD = + +noinst_LIBRARIES += libkv.a + +noinst_HEADERS += \ + kv/KeyValueDB.h \ + kv/LevelDBStore.h + +if WITH_SLIBROCKSDB +# build rocksdb with its own makefile +# for some stupid reason this needs -fPIC... +# PORTABLE=1 fixes the aarch64 build (-march=native doesn't work there) +rocksdb/librocksdb.a: + cd rocksdb && EXTRA_CXXFLAGS=-fPIC PORTABLE=1 make -j$(shell nproc) static_lib +libkv_a_CXXFLAGS += -I rocksdb/include -fPIC +libkv_a_SOURCES += kv/RocksDBStore.cc +libkv_a_LIBADD += rocksdb/librocksdb.a +noinst_HEADERS += kv/RocksDBStore.h +endif + +if WITH_DLIBROCKSDB +libkv_a_SOURCES += kv/RocksDBStore.cc +libkv_a_LIBADD += -lrocksdb +noinst_HEADERS += kv/RocksDBStore.h +endif + +if WITH_KINETIC +libkv_a_SOURCES += kv/KineticStore.cc +libkv_a_CXXFLAGS += -std=gnu++11 +libkv_a_LIBADD += -lkinetic_client -lprotobuf -lglog -lgflags libcrypto.a +noinst_HEADERS += kv/KineticStore.h +endif + +endif # ENABLE_SERVER diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc new file mode 100644 index 000000000000..a75274328601 --- /dev/null +++ b/src/kv/RocksDBStore.cc @@ -0,0 +1,519 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include +#include +#include +#include + +#include "rocksdb/db.h" +#include "rocksdb/table.h" +#include "rocksdb/env.h" +#include "rocksdb/write_batch.h" +#include "rocksdb/slice.h" +#include "rocksdb/cache.h" +#include "rocksdb/filter_policy.h" +#include "rocksdb/utilities/convenience.h" +using std::string; +#include "common/perf_counters.h" +#include "common/debug.h" +#include "include/str_map.h" +#include "KeyValueDB.h" +#include "RocksDBStore.h" + +int string2bool(string val, bool &b_val) +{ + if (strcasecmp(val.c_str(), "false") == 0) { + b_val = false; + return 0; + } else if (strcasecmp(val.c_str(), "true") == 0) { + b_val = true; + return 0; + } else { + std::string err; + int b = strict_strtol(val.c_str(), 10, &err); + if (!err.empty()) + return -EINVAL; + b_val = !!b; + return 0; + } +} + +int RocksDBStore::tryInterpret(const string key, const string val, rocksdb::Options &opt) +{ + if (key == "compaction_threads") { + std::string err; + int f = strict_sistrtoll(val.c_str(), &err); + if (!err.empty()) + return -EINVAL; + //Low priority threadpool is used for compaction + opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW); + } else if (key == "flusher_threads") { + std::string err; + int f = strict_sistrtoll(val.c_str(), &err); + if (!err.empty()) + return -EINVAL; + //High priority threadpool is used for flusher + opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH); + } else if (key == "compact_on_mount") { + int ret = string2bool(val, compact_on_mount); + if (ret != 0) + return ret; + } else if (key == "disableWAL") { + int ret = string2bool(val, disableWAL); + if (ret != 0) + return ret; + } else { + //unrecognize config options. + return -EINVAL; + } + return 0; +} + +int RocksDBStore::ParseOptionsFromString(const string opt_str, rocksdb::Options &opt) +{ + map str_map; + int r = get_str_map(opt_str, ",\n;", &str_map); + if (r < 0) + return r; + map::iterator it; + for(it = str_map.begin(); it != str_map.end(); ++it) { + string this_opt = it->first + "=" + it->second; + rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt); + if (!status.ok()) { + //unrecognized by rocksdb, try to interpret by ourselves. + r = tryInterpret(it->first, it->second, opt); + if (r < 0) { + derr << status.ToString() << dendl; + return -EINVAL; + } + } + lgeneric_dout(cct, 0) << " set rocksdb option " << it->first + << " = " << it->second << dendl; + } + return 0; +} + +int RocksDBStore::init(string _options_str) +{ + options_str = _options_str; + rocksdb::Options opt; + //try parse options + int r = ParseOptionsFromString(options_str, opt); + if (r != 0) { + return -EINVAL; + } + return 0; +} + +int RocksDBStore::do_open(ostream &out, bool create_if_missing) +{ + rocksdb::Options opt; + rocksdb::Status status; + + int r = ParseOptionsFromString(options_str, opt); + if (r != 0) { + return -EINVAL; + } + opt.create_if_missing = create_if_missing; + + status = rocksdb::DB::Open(opt, path, &db); + if (!status.ok()) { + derr << status.ToString() << dendl; + return -EINVAL; + } + + PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last); + plb.add_u64_counter(l_rocksdb_gets, "rocksdb_get", "Gets"); + plb.add_u64_counter(l_rocksdb_txns, "rocksdb_transaction", "Transactions"); + plb.add_time_avg(l_rocksdb_get_latency, "rocksdb_get_latency", "Get latency"); + plb.add_time_avg(l_rocksdb_submit_latency, "rocksdb_submit_latency", "Submit Latency"); + plb.add_time_avg(l_rocksdb_submit_sync_latency, "rocksdb_submit_sync_latency", "Submit Sync Latency"); + plb.add_u64_counter(l_rocksdb_compact, "rocksdb_compact", "Compactions"); + plb.add_u64_counter(l_rocksdb_compact_range, "rocksdb_compact_range", "Compactions by range"); + plb.add_u64_counter(l_rocksdb_compact_queue_merge, "rocksdb_compact_queue_merge", "Mergings of ranges in compaction queue"); + plb.add_u64(l_rocksdb_compact_queue_len, "rocksdb_compact_queue_len", "Length of compaction queue"); + logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + + if (compact_on_mount) { + derr << "Compacting rocksdb store..." << dendl; + compact(); + derr << "Finished compacting rocksdb store" << dendl; + } + return 0; +} + +int RocksDBStore::_test_init(const string& dir) +{ + rocksdb::Options options; + options.create_if_missing = true; + rocksdb::DB *db; + rocksdb::Status status = rocksdb::DB::Open(options, dir, &db); + delete db; + return status.ok() ? 0 : -EIO; +} + +RocksDBStore::~RocksDBStore() +{ + close(); + delete logger; + + // Ensure db is destroyed before dependent db_cache and filterpolicy + delete db; +} + +void RocksDBStore::close() +{ + // stop compaction thread + compact_queue_lock.Lock(); + if (compact_thread.is_started()) { + compact_queue_stop = true; + compact_queue_cond.Signal(); + compact_queue_lock.Unlock(); + compact_thread.join(); + } else { + compact_queue_lock.Unlock(); + } + + if (logger) + cct->get_perfcounters_collection()->remove(logger); +} + +int RocksDBStore::submit_transaction(KeyValueDB::Transaction t) +{ + utime_t start = ceph_clock_now(g_ceph_context); + RocksDBTransactionImpl * _t = + static_cast(t.get()); + rocksdb::WriteOptions woptions; + woptions.disableWAL = disableWAL; + rocksdb::Status s = db->Write(woptions, _t->bat); + utime_t lat = ceph_clock_now(g_ceph_context) - start; + logger->inc(l_rocksdb_txns); + logger->tinc(l_rocksdb_submit_latency, lat); + return s.ok() ? 0 : -1; +} + +int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t) +{ + utime_t start = ceph_clock_now(g_ceph_context); + RocksDBTransactionImpl * _t = + static_cast(t.get()); + rocksdb::WriteOptions woptions; + woptions.sync = true; + woptions.disableWAL = disableWAL; + rocksdb::Status s = db->Write(woptions, _t->bat); + utime_t lat = ceph_clock_now(g_ceph_context) - start; + logger->inc(l_rocksdb_txns); + logger->tinc(l_rocksdb_submit_sync_latency, lat); + return s.ok() ? 0 : -1; +} +int RocksDBStore::get_info_log_level(string info_log_level) +{ + if (info_log_level == "debug") { + return 0; + } else if (info_log_level == "info") { + return 1; + } else if (info_log_level == "warn") { + return 2; + } else if (info_log_level == "error") { + return 3; + } else if (info_log_level == "fatal") { + return 4; + } else { + return 1; + } +} + +RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db) +{ + db = _db; + bat = new rocksdb::WriteBatch(); +} +RocksDBStore::RocksDBTransactionImpl::~RocksDBTransactionImpl() +{ + delete bat; +} +void RocksDBStore::RocksDBTransactionImpl::set( + const string &prefix, + const string &k, + const bufferlist &to_set_bl) +{ + string key = combine_strings(prefix, k); + //bufferlist::c_str() is non-constant, so we need to make a copy + bufferlist val = to_set_bl; + bat->Delete(rocksdb::Slice(key)); + bat->Put(rocksdb::Slice(key), + rocksdb::Slice(val.c_str(), val.length())); +} + +void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, + const string &k) +{ + bat->Delete(combine_strings(prefix, k)); +} + +void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix) +{ + KeyValueDB::Iterator it = db->get_iterator(prefix); + for (it->seek_to_first(); + it->valid(); + it->next()) { + bat->Delete(combine_strings(prefix, it->key())); + } +} + +int RocksDBStore::get( + const string &prefix, + const std::set &keys, + std::map *out) +{ + utime_t start = ceph_clock_now(g_ceph_context); + KeyValueDB::Iterator it = get_iterator(prefix); + for (std::set::const_iterator i = keys.begin(); + i != keys.end(); + ++i) { + it->lower_bound(*i); + if (it->valid() && it->key() == *i) { + out->insert(make_pair(*i, it->value())); + } else if (!it->valid()) + break; + } + utime_t lat = ceph_clock_now(g_ceph_context) - start; + logger->inc(l_rocksdb_gets); + logger->tinc(l_rocksdb_get_latency, lat); + return 0; +} + +string RocksDBStore::combine_strings(const string &prefix, const string &value) +{ + string out = prefix; + out.push_back(0); + out.append(value); + return out; +} + +bufferlist RocksDBStore::to_bufferlist(rocksdb::Slice in) +{ + bufferlist bl; + bl.append(bufferptr(in.data(), in.size())); + return bl; +} + +int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key) +{ + string in_prefix = in.ToString(); + size_t prefix_len = in_prefix.find('\0'); + if (prefix_len >= in_prefix.size()) + return -EINVAL; + + if (prefix) + *prefix = string(in_prefix, 0, prefix_len); + if (key) + *key= string(in_prefix, prefix_len + 1); + return 0; +} + +void RocksDBStore::compact() +{ + logger->inc(l_rocksdb_compact); + db->CompactRange(NULL, NULL); +} + + +void RocksDBStore::compact_thread_entry() +{ + compact_queue_lock.Lock(); + while (!compact_queue_stop) { + while (!compact_queue.empty()) { + pair range = compact_queue.front(); + compact_queue.pop_front(); + logger->set(l_rocksdb_compact_queue_len, compact_queue.size()); + compact_queue_lock.Unlock(); + logger->inc(l_rocksdb_compact_range); + compact_range(range.first, range.second); + compact_queue_lock.Lock(); + continue; + } + compact_queue_cond.Wait(compact_queue_lock); + } + compact_queue_lock.Unlock(); +} + +void RocksDBStore::compact_range_async(const string& start, const string& end) +{ + Mutex::Locker l(compact_queue_lock); + + // try to merge adjacent ranges. this is O(n), but the queue should + // be short. note that we do not cover all overlap cases and merge + // opportunities here, but we capture the ones we currently need. + list< pair >::iterator p = compact_queue.begin(); + while (p != compact_queue.end()) { + if (p->first == start && p->second == end) { + // dup; no-op + return; + } + if (p->first <= end && p->first > start) { + // merge with existing range to the right + compact_queue.push_back(make_pair(start, p->second)); + compact_queue.erase(p); + logger->inc(l_rocksdb_compact_queue_merge); + break; + } + if (p->second >= start && p->second < end) { + // merge with existing range to the left + compact_queue.push_back(make_pair(p->first, end)); + compact_queue.erase(p); + logger->inc(l_rocksdb_compact_queue_merge); + break; + } + ++p; + } + if (p == compact_queue.end()) { + // no merge, new entry. + compact_queue.push_back(make_pair(start, end)); + logger->set(l_rocksdb_compact_queue_len, compact_queue.size()); + } + compact_queue_cond.Signal(); + if (!compact_thread.is_started()) { + compact_thread.create(); + } +} +bool RocksDBStore::check_omap_dir(string &omap_dir) +{ + rocksdb::Options options; + options.create_if_missing = true; + rocksdb::DB *db; + rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db); + delete db; + return status.ok(); +} +void RocksDBStore::compact_range(const string& start, const string& end) +{ + rocksdb::Slice cstart(start); + rocksdb::Slice cend(end); + db->CompactRange(&cstart, &cend); +} +RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl() +{ + delete dbiter; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first() +{ + dbiter->SeekToFirst(); + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix) +{ + rocksdb::Slice slice_prefix(prefix); + dbiter->Seek(slice_prefix); + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last() +{ + dbiter->SeekToLast(); + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix) +{ + string limit = past_prefix(prefix); + rocksdb::Slice slice_limit(limit); + dbiter->Seek(slice_limit); + + if (!dbiter->Valid()) { + dbiter->SeekToLast(); + } else { + dbiter->Prev(); + } + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) +{ + lower_bound(prefix, after); + if (valid()) { + pair key = raw_key(); + if (key.first == prefix && key.second == after) + next(); + } + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) +{ + string bound = combine_strings(prefix, to); + rocksdb::Slice slice_bound(bound); + dbiter->Seek(slice_bound); + return dbiter->status().ok() ? 0 : -1; +} +bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid() +{ + return dbiter->Valid(); +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next() +{ + if (valid()) + dbiter->Next(); + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev() +{ + if (valid()) + dbiter->Prev(); + return dbiter->status().ok() ? 0 : -1; +} +string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key() +{ + string out_key; + split_key(dbiter->key(), 0, &out_key); + return out_key; +} +pair RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key() +{ + string prefix, key; + split_key(dbiter->key(), &prefix, &key); + return make_pair(prefix, key); +} +bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value() +{ + return to_bufferlist(dbiter->value()); +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status() +{ + return dbiter->status().ok() ? 0 : -1; +} + +string RocksDBStore::past_prefix(const string &prefix) +{ + string limit = prefix; + limit.push_back(1); + return limit; +} + + +RocksDBStore::WholeSpaceIterator RocksDBStore::_get_iterator() +{ + return std::shared_ptr( + new RocksDBWholeSpaceIteratorImpl( + db->NewIterator(rocksdb::ReadOptions()) + ) + ); +} + +RocksDBStore::WholeSpaceIterator RocksDBStore::_get_snapshot_iterator() +{ + const rocksdb::Snapshot *snapshot; + rocksdb::ReadOptions options; + + snapshot = db->GetSnapshot(); + options.snapshot = snapshot; + + return std::shared_ptr( + new RocksDBSnapshotIteratorImpl(db, snapshot, + db->NewIterator(options)) + ); +} + +RocksDBStore::RocksDBSnapshotIteratorImpl::~RocksDBSnapshotIteratorImpl() +{ + db->ReleaseSnapshot(snapshot); +} diff --git a/src/kv/RocksDBStore.h b/src/kv/RocksDBStore.h new file mode 100644 index 000000000000..c03b20c86b3f --- /dev/null +++ b/src/kv/RocksDBStore.h @@ -0,0 +1,281 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef ROCKS_DB_STORE_H +#define ROCKS_DB_STORE_H + +#include "include/types.h" +#include "include/buffer.h" +#include "KeyValueDB.h" +#include +#include +#include +#include +#include + +#include +#include "common/errno.h" +#include "common/dout.h" +#include "include/assert.h" +#include "common/Formatter.h" +#include "common/Cond.h" + +#include "common/ceph_context.h" +class PerfCounters; + +enum { + l_rocksdb_first = 34300, + l_rocksdb_gets, + l_rocksdb_txns, + l_rocksdb_get_latency, + l_rocksdb_submit_latency, + l_rocksdb_submit_sync_latency, + l_rocksdb_compact, + l_rocksdb_compact_range, + l_rocksdb_compact_queue_merge, + l_rocksdb_compact_queue_len, + l_rocksdb_last, +}; + +namespace rocksdb{ + class DB; + class Cache; + class FilterPolicy; + class Snapshot; + class Slice; + class WriteBatch; + class Iterator; + struct Options; +} +/** + * Uses RocksDB to implement the KeyValueDB interface + */ +class RocksDBStore : public KeyValueDB { + CephContext *cct; + PerfCounters *logger; + string path; + rocksdb::DB *db; + string options_str; + int do_open(ostream &out, bool create_if_missing); + + // manage async compactions + Mutex compact_queue_lock; + Cond compact_queue_cond; + list< pair > compact_queue; + bool compact_queue_stop; + class CompactThread : public Thread { + RocksDBStore *db; + public: + CompactThread(RocksDBStore *d) : db(d) {} + void *entry() { + db->compact_thread_entry(); + return NULL; + } + friend class RocksDBStore; + } compact_thread; + + void compact_thread_entry(); + + void compact_range(const string& start, const string& end); + void compact_range_async(const string& start, const string& end); + +public: + /// compact the underlying rocksdb store + bool compact_on_mount; + bool disableWAL; + void compact(); + + int tryInterpret(const string key, const string val, rocksdb::Options &opt); + int ParseOptionsFromString(const string opt_str, rocksdb::Options &opt); + static int _test_init(const string& dir); + int init(string options_str); + /// compact rocksdb for all keys with a given prefix + void compact_prefix(const string& prefix) { + compact_range(prefix, past_prefix(prefix)); + } + void compact_prefix_async(const string& prefix) { + compact_range_async(prefix, past_prefix(prefix)); + } + + void compact_range(const string& prefix, const string& start, const string& end) { + compact_range(combine_strings(prefix, start), combine_strings(prefix, end)); + } + void compact_range_async(const string& prefix, const string& start, const string& end) { + compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end)); + } + int get_info_log_level(string info_log_level); + + RocksDBStore(CephContext *c, const string &path) : + cct(c), + logger(NULL), + path(path), + db(NULL), + compact_queue_lock("RocksDBStore::compact_thread_lock"), + compact_queue_stop(false), + compact_thread(this), + compact_on_mount(false), + disableWAL(false) + {} + + ~RocksDBStore(); + + static bool check_omap_dir(string &omap_dir); + /// Opens underlying db + int open(ostream &out) { + return do_open(out, false); + } + /// Creates underlying db if missing and opens it + int create_and_open(ostream &out) { + return do_open(out, true); + } + + void close(); + + class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl { + public: + rocksdb::WriteBatch *bat; + RocksDBStore *db; + + RocksDBTransactionImpl(RocksDBStore *_db); + ~RocksDBTransactionImpl(); + void set( + const string &prefix, + const string &k, + const bufferlist &bl); + void rmkey( + const string &prefix, + const string &k); + void rmkeys_by_prefix( + const string &prefix + ); + }; + + KeyValueDB::Transaction get_transaction() { + return std::shared_ptr< RocksDBTransactionImpl >( + new RocksDBTransactionImpl(this)); + } + + int submit_transaction(KeyValueDB::Transaction t); + int submit_transaction_sync(KeyValueDB::Transaction t); + int get( + const string &prefix, + const std::set &key, + std::map *out + ); + + class RocksDBWholeSpaceIteratorImpl : + public KeyValueDB::WholeSpaceIteratorImpl { + protected: + rocksdb::Iterator *dbiter; + public: + RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) : + dbiter(iter) { } + //virtual ~RocksDBWholeSpaceIteratorImpl() { } + ~RocksDBWholeSpaceIteratorImpl(); + + int seek_to_first(); + int seek_to_first(const string &prefix); + int seek_to_last(); + int seek_to_last(const string &prefix); + int upper_bound(const string &prefix, const string &after); + int lower_bound(const string &prefix, const string &to); + bool valid(); + int next(); + int prev(); + string key(); + pair raw_key(); + bufferlist value(); + int status(); + }; + + class RocksDBSnapshotIteratorImpl : public RocksDBWholeSpaceIteratorImpl { + rocksdb::DB *db; + const rocksdb::Snapshot *snapshot; + public: + RocksDBSnapshotIteratorImpl(rocksdb::DB *db, const rocksdb::Snapshot *s, + rocksdb::Iterator *iter) : + RocksDBWholeSpaceIteratorImpl(iter), db(db), snapshot(s) { } + + ~RocksDBSnapshotIteratorImpl(); + }; + + /// Utility + static string combine_strings(const string &prefix, const string &value); + static int split_key(rocksdb::Slice in, string *prefix, string *key); + static bufferlist to_bufferlist(rocksdb::Slice in); + static string past_prefix(const string &prefix); + + virtual uint64_t get_estimated_size(map &extra) { + DIR *store_dir = opendir(path.c_str()); + if (!store_dir) { + lderr(cct) << __func__ << " something happened opening the store: " + << cpp_strerror(errno) << dendl; + return 0; + } + + uint64_t total_size = 0; + uint64_t sst_size = 0; + uint64_t log_size = 0; + uint64_t misc_size = 0; + + struct dirent *entry = NULL; + while ((entry = readdir(store_dir)) != NULL) { + string n(entry->d_name); + + if (n == "." || n == "..") + continue; + + string fpath = path + '/' + n; + struct stat s; + int err = stat(fpath.c_str(), &s); + if (err < 0) + err = -errno; + // we may race against rocksdb while reading files; this should only + // happen when those files are being updated, data is being shuffled + // and files get removed, in which case there's not much of a problem + // as we'll get to them next time around. + if (err == -ENOENT) { + continue; + } + if (err < 0) { + lderr(cct) << __func__ << " error obtaining stats for " << fpath + << ": " << cpp_strerror(err) << dendl; + goto err; + } + + size_t pos = n.find_last_of('.'); + if (pos == string::npos) { + misc_size += s.st_size; + continue; + } + + string ext = n.substr(pos+1); + if (ext == "sst") { + sst_size += s.st_size; + } else if (ext == "log") { + log_size += s.st_size; + } else { + misc_size += s.st_size; + } + } + + total_size = sst_size + log_size + misc_size; + + extra["sst"] = sst_size; + extra["log"] = log_size; + extra["misc"] = misc_size; + extra["total"] = total_size; + +err: + closedir(store_dir); + return total_size; + } + + +protected: + WholeSpaceIterator _get_iterator(); + + WholeSpaceIterator _get_snapshot_iterator(); + +}; + +#endif diff --git a/src/mon/MonitorDBStore.h b/src/mon/MonitorDBStore.h index 5ba56a4b0b16..15558e3da986 100644 --- a/src/mon/MonitorDBStore.h +++ b/src/mon/MonitorDBStore.h @@ -21,7 +21,7 @@ #include #include #include -#include "os/KeyValueDB.h" +#include "kv/KeyValueDB.h" #include "include/assert.h" #include "common/Formatter.h" diff --git a/src/os/DBObjectMap.cc b/src/os/DBObjectMap.cc index 8ff7ef78fa57..3a878ccb5699 100644 --- a/src/os/DBObjectMap.cc +++ b/src/os/DBObjectMap.cc @@ -11,7 +11,7 @@ #include #include "ObjectMap.h" -#include "KeyValueDB.h" +#include "kv/KeyValueDB.h" #include "DBObjectMap.h" #include diff --git a/src/os/DBObjectMap.h b/src/os/DBObjectMap.h index ee252c1ccb1a..48fa8e936a01 100644 --- a/src/os/DBObjectMap.h +++ b/src/os/DBObjectMap.h @@ -12,7 +12,7 @@ #include #include "ObjectMap.h" -#include "KeyValueDB.h" +#include "kv/KeyValueDB.h" #include "osd/osd_types.h" #include "common/Mutex.h" #include "common/Cond.h" diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 4bee72656e67..d51e8861b870 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -69,7 +69,7 @@ #include "common/fd.h" #include "HashIndex.h" #include "DBObjectMap.h" -#include "KeyValueDB.h" +#include "kv/KeyValueDB.h" #include "common/ceph_crypto.h" using ceph::crypto::SHA1; diff --git a/src/os/GenericObjectMap.h b/src/os/GenericObjectMap.h index ecf28222d481..11d50db05536 100644 --- a/src/os/GenericObjectMap.h +++ b/src/os/GenericObjectMap.h @@ -26,7 +26,7 @@ #include "include/memory.h" #include "ObjectMap.h" -#include "KeyValueDB.h" +#include "kv/KeyValueDB.h" #include "osd/osd_types.h" #include "common/Mutex.h" #include "common/Cond.h" diff --git a/src/os/KeyValueDB.cc b/src/os/KeyValueDB.cc deleted file mode 100644 index 65ce487ed806..000000000000 --- a/src/os/KeyValueDB.cc +++ /dev/null @@ -1,50 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "KeyValueDB.h" -#include "LevelDBStore.h" -#ifdef HAVE_LIBROCKSDB -#include "RocksDBStore.h" -#endif -#ifdef HAVE_KINETIC -#include "KineticStore.h" -#endif - -KeyValueDB *KeyValueDB::create(CephContext *cct, const string& type, - const string& dir) -{ - if (type == "leveldb") { - return new LevelDBStore(cct, dir); - } -#ifdef HAVE_KINETIC - if (type == "kinetic" && - cct->check_experimental_feature_enabled("kinetic")) { - return new KineticStore(cct); - } -#endif -#ifdef HAVE_LIBROCKSDB - if (type == "rocksdb" && - cct->check_experimental_feature_enabled("rocksdb")) { - return new RocksDBStore(cct, dir); - } -#endif - return NULL; -} - -int KeyValueDB::test_init(const string& type, const string& dir) -{ - if (type == "leveldb") { - return LevelDBStore::_test_init(dir); - } -#ifdef HAVE_KINETIC - if (type == "kinetic") { - return KineticStore::_test_init(g_ceph_context); - } -#endif -#ifdef HAVE_LIBROCKSDB - if (type == "rocksdb") { - return RocksDBStore::_test_init(dir); - } -#endif - return -EINVAL; -} diff --git a/src/os/KeyValueDB.h b/src/os/KeyValueDB.h deleted file mode 100644 index 755a627f8f9f..000000000000 --- a/src/os/KeyValueDB.h +++ /dev/null @@ -1,233 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -#ifndef KEY_VALUE_DB_H -#define KEY_VALUE_DB_H - -#include "include/buffer.h" -#include -#include -#include -#include -#include "include/memory.h" -#include - -using std::string; -/** - * Defines virtual interface to be implemented by key value store - * - * Kyoto Cabinet or LevelDB should implement this - */ -class KeyValueDB { -public: - class TransactionImpl { - public: - /// Set Keys - void set( - const std::string &prefix, ///< [in] Prefix for keys - const std::map &to_set ///< [in] keys/values to set - ) { - std::map::const_iterator it; - for (it = to_set.begin(); it != to_set.end(); ++it) - set(prefix, it->first, it->second); - } - - /// Set Key - virtual void set( - const std::string &prefix, ///< [in] Prefix for the key - const std::string &k, ///< [in] Key to set - const bufferlist &bl ///< [in] Value to set - ) = 0; - - - /// Removes Keys - void rmkeys( - const std::string &prefix, ///< [in] Prefix to search for - const std::set &keys ///< [in] Keys to remove - ) { - std::set::const_iterator it; - for (it = keys.begin(); it != keys.end(); ++it) - rmkey(prefix, *it); - } - - /// Remove Key - virtual void rmkey( - const std::string &prefix, ///< [in] Prefix to search for - const std::string &k ///< [in] Key to remove - ) = 0; - - /// Removes keys beginning with prefix - virtual void rmkeys_by_prefix( - const std::string &prefix ///< [in] Prefix by which to remove keys - ) = 0; - - virtual ~TransactionImpl() {} - }; - typedef ceph::shared_ptr< TransactionImpl > Transaction; - - /// create a new instance - static KeyValueDB *create(CephContext *cct, const std::string& type, - const std::string& dir); - - /// test whether we can successfully initialize; may have side effects (e.g., create) - static int test_init(const std::string& type, const std::string& dir); - virtual int init(string option_str="") = 0; - virtual int open(std::ostream &out) = 0; - virtual int create_and_open(std::ostream &out) = 0; - - virtual Transaction get_transaction() = 0; - virtual int submit_transaction(Transaction) = 0; - virtual int submit_transaction_sync(Transaction t) { - return submit_transaction(t); - } - - /// Retrieve Keys - virtual int get( - const std::string &prefix, ///< [in] Prefix for key - const std::set &key, ///< [in] Key to retrieve - std::map *out ///< [out] Key value retrieved - ) = 0; - virtual int get(const std::string &prefix, ///< [in] prefix - const std::string &key, ///< [in] key - bufferlist *value) { ///< [out] value - std::set ks; - ks.insert(key); - std::map om; - int r = get(prefix, ks, &om); - if (om.find(key) != om.end()) { - *value = om[key]; - } else { - *value = bufferlist(); - r = -ENOENT; - } - return r; - } - - class GenericIteratorImpl { - public: - virtual int seek_to_first() = 0; - virtual int upper_bound(const std::string &after) = 0; - virtual int lower_bound(const std::string &to) = 0; - virtual bool valid() = 0; - virtual int next() = 0; - virtual std::string key() = 0; - virtual bufferlist value() = 0; - virtual int status() = 0; - virtual ~GenericIteratorImpl() {} - }; - - class WholeSpaceIteratorImpl { - public: - virtual int seek_to_first() = 0; - virtual int seek_to_first(const std::string &prefix) = 0; - virtual int seek_to_last() = 0; - virtual int seek_to_last(const std::string &prefix) = 0; - virtual int upper_bound(const std::string &prefix, const std::string &after) = 0; - virtual int lower_bound(const std::string &prefix, const std::string &to) = 0; - virtual bool valid() = 0; - virtual int next() = 0; - virtual int prev() = 0; - virtual std::string key() = 0; - virtual std::pair raw_key() = 0; - virtual bufferlist value() = 0; - virtual int status() = 0; - virtual ~WholeSpaceIteratorImpl() { } - }; - typedef ceph::shared_ptr< WholeSpaceIteratorImpl > WholeSpaceIterator; - - class IteratorImpl : public GenericIteratorImpl { - const std::string prefix; - WholeSpaceIterator generic_iter; - public: - IteratorImpl(const std::string &prefix, WholeSpaceIterator iter) : - prefix(prefix), generic_iter(iter) { } - virtual ~IteratorImpl() { } - - int seek_to_first() { - return generic_iter->seek_to_first(prefix); - } - int seek_to_last() { - return generic_iter->seek_to_last(prefix); - } - int upper_bound(const std::string &after) { - return generic_iter->upper_bound(prefix, after); - } - int lower_bound(const std::string &to) { - return generic_iter->lower_bound(prefix, to); - } - bool valid() { - if (!generic_iter->valid()) - return false; - std::pair raw_key = generic_iter->raw_key(); - return (raw_key.first.compare(0, prefix.length(), prefix) == 0); - } - int next() { - if (valid()) - return generic_iter->next(); - return status(); - } - int prev() { - if (valid()) - return generic_iter->prev(); - return status(); - } - std::string key() { - return generic_iter->key(); - } - std::pair raw_key() { - return generic_iter->raw_key(); - } - bufferlist value() { - return generic_iter->value(); - } - int status() { - return generic_iter->status(); - } - }; - - typedef ceph::shared_ptr< IteratorImpl > Iterator; - - WholeSpaceIterator get_iterator() { - return _get_iterator(); - } - - Iterator get_iterator(const std::string &prefix) { - return ceph::shared_ptr( - new IteratorImpl(prefix, get_iterator()) - ); - } - - WholeSpaceIterator get_snapshot_iterator() { - return _get_snapshot_iterator(); - } - - Iterator get_snapshot_iterator(const std::string &prefix) { - return ceph::shared_ptr( - new IteratorImpl(prefix, get_snapshot_iterator()) - ); - } - - virtual uint64_t get_estimated_size(std::map &extra) = 0; - virtual int get_statfs(struct statfs *buf) { - return -EOPNOTSUPP; - } - - virtual ~KeyValueDB() {} - - /// compact the underlying store - virtual void compact() {} - - /// compact db for all keys with a given prefix - virtual void compact_prefix(const std::string& prefix) {} - /// compact db for all keys with a given prefix, async - virtual void compact_prefix_async(const std::string& prefix) {} - virtual void compact_range(const std::string& prefix, - const std::string& start, const std::string& end) {} - virtual void compact_range_async(const std::string& prefix, - const std::string& start, const std::string& end) {} - -protected: - virtual WholeSpaceIterator _get_iterator() = 0; - virtual WholeSpaceIterator _get_snapshot_iterator() = 0; -}; - -#endif diff --git a/src/os/KeyValueStore.h b/src/os/KeyValueStore.h index 90e41ee85a17..4307901e5150 100644 --- a/src/os/KeyValueStore.h +++ b/src/os/KeyValueStore.h @@ -36,7 +36,7 @@ using namespace std; #include "common/Mutex.h" #include "GenericObjectMap.h" -#include "KeyValueDB.h" +#include "kv/KeyValueDB.h" #include "common/random_cache.hpp" #include "include/uuid.h" diff --git a/src/os/KineticStore.cc b/src/os/KineticStore.cc deleted file mode 100644 index fb6e2bfe002e..000000000000 --- a/src/os/KineticStore.cc +++ /dev/null @@ -1,329 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -#include "KineticStore.h" -#include "common/ceph_crypto.h" - -#include -#include -#include -#include "include/memory.h" -#include -using std::string; -#include "common/perf_counters.h" - -#define dout_subsys ceph_subsys_keyvaluestore - -int KineticStore::init() -{ - // init defaults. caller can override these if they want - // prior to calling open. - host = cct->_conf->kinetic_host; - port = cct->_conf->kinetic_port; - user_id = cct->_conf->kinetic_user_id; - hmac_key = cct->_conf->kinetic_hmac_key; - use_ssl = cct->_conf->kinetic_use_ssl; - return 0; -} - -int KineticStore::_test_init(CephContext *c) -{ - kinetic::KineticConnectionFactory conn_factory = - kinetic::NewKineticConnectionFactory(); - - kinetic::ConnectionOptions options; - options.host = cct->_conf->kinetic_host; - options.port = cct->_conf->kinetic_port; - options.user_id = cct->_conf->kinetic_user_id; - options.hmac_key = cct->_conf->kinetic_hmac_key; - options.use_ssl = cct->_conf->kinetic_use_ssl; - - kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10); - kinetic_conn.reset(); - if (!status.ok()) - derr << __func__ << "Unable to connect to kinetic store " << options.host - << ":" << options.port << " : " << status.ToString() << dendl; - return status.ok() ? 0 : -EIO; -} - -int KineticStore::do_open(ostream &out, bool create_if_missing) -{ - kinetic::KineticConnectionFactory conn_factory = - kinetic::NewKineticConnectionFactory(); - kinetic::ConnectionOptions options; - options.host = host; - options.port = port; - options.user_id = user_id; - options.hmac_key = hmac_key; - options.use_ssl = use_ssl; - kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10); - if (!status.ok()) { - derr << "Unable to connect to kinetic store " << host << ":" << port - << " : " << status.ToString() << dendl; - return -EINVAL; - } - - PerfCountersBuilder plb(g_ceph_context, "kinetic", l_kinetic_first, l_kinetic_last); - plb.add_u64_counter(l_kinetic_gets, "kinetic_get", "Gets"); - plb.add_u64_counter(l_kinetic_txns, "kinetic_transaction", "Transactions"); - logger = plb.create_perf_counters(); - cct->get_perfcounters_collection()->add(logger); - return 0; -} - -KineticStore::KineticStore(CephContext *c) : - cct(c), - logger(NULL) -{ - host = c->_conf->kinetic_host; - port = c->_conf->kinetic_port; - user_id = c->_conf->kinetic_user_id; - hmac_key = c->_conf->kinetic_hmac_key; - use_ssl = c->_conf->kinetic_use_ssl; -} - -KineticStore::~KineticStore() -{ - close(); - delete logger; -} - -void KineticStore::close() -{ - kinetic_conn.reset(); - if (logger) - cct->get_perfcounters_collection()->remove(logger); -} - -int KineticStore::submit_transaction(KeyValueDB::Transaction t) -{ - KineticTransactionImpl * _t = - static_cast(t.get()); - - dout(20) << "kinetic submit_transaction" << dendl; - - for (vector::iterator it = _t->ops.begin(); - it != _t->ops.end(); ++it) { - kinetic::KineticStatus status(kinetic::StatusCode::OK, ""); - if (it->type == KINETIC_OP_WRITE) { - string data(it->data.c_str(), it->data.length()); - kinetic::KineticRecord record(data, "", "", - com::seagate::kinetic::client::proto::Message_Algorithm_SHA1); - dout(30) << "kinetic before put of " << it->key << " (" << data.length() << " bytes)" << dendl; - status = kinetic_conn->Put(it->key, "", kinetic::WriteMode::IGNORE_VERSION, - record); - dout(30) << "kinetic after put of " << it->key << dendl; - } else { - assert(it->type == KINETIC_OP_DELETE); - dout(30) << "kinetic before delete" << dendl; - status = kinetic_conn->Delete(it->key, "", - kinetic::WriteMode::IGNORE_VERSION); - dout(30) << "kinetic after delete" << dendl; - } - if (!status.ok()) { - derr << "kinetic error submitting transaction: " - << status.message() << dendl; - return -1; - } - } - - logger->inc(l_kinetic_txns); - return 0; -} - -int KineticStore::submit_transaction_sync(KeyValueDB::Transaction t) -{ - return submit_transaction(t); -} - -void KineticStore::KineticTransactionImpl::set( - const string &prefix, - const string &k, - const bufferlist &to_set_bl) -{ - string key = combine_strings(prefix, k); - dout(30) << "kinetic set key " << key << dendl; - ops.push_back(KineticOp(KINETIC_OP_WRITE, key, to_set_bl)); -} - -void KineticStore::KineticTransactionImpl::rmkey(const string &prefix, - const string &k) -{ - string key = combine_strings(prefix, k); - dout(30) << "kinetic rm key " << key << dendl; - ops.push_back(KineticOp(KINETIC_OP_DELETE, key)); -} - -void KineticStore::KineticTransactionImpl::rmkeys_by_prefix(const string &prefix) -{ - dout(20) << "kinetic rmkeys_by_prefix " << prefix << dendl; - KeyValueDB::Iterator it = db->get_iterator(prefix); - for (it->seek_to_first(); - it->valid(); - it->next()) { - string key = combine_strings(prefix, it->key()); - ops.push_back(KineticOp(KINETIC_OP_DELETE, key)); - dout(30) << "kinetic rm key by prefix: " << key << dendl; - } -} - -int KineticStore::get( - const string &prefix, - const std::set &keys, - std::map *out) -{ - dout(30) << "kinetic get prefix: " << prefix << " keys: " << keys << dendl; - for (std::set::const_iterator i = keys.begin(); - i != keys.end(); - ++i) { - unique_ptr record; - string key = combine_strings(prefix, *i); - dout(30) << "before get key " << key << dendl; - kinetic::KineticStatus status = kinetic_conn->Get(key, record); - if (!status.ok()) - break; - dout(30) << "kinetic get got key: " << key << dendl; - out->insert(make_pair(key, to_bufferlist(*record.get()))); - } - logger->inc(l_kinetic_gets); - return 0; -} - -string KineticStore::combine_strings(const string &prefix, const string &value) -{ - string out = prefix; - out.push_back(1); - out.append(value); - return out; -} - -bufferlist KineticStore::to_bufferlist(const kinetic::KineticRecord &record) -{ - bufferlist bl; - bl.append(*(record.value())); - return bl; -} - -int KineticStore::split_key(string in_prefix, string *prefix, string *key) -{ - size_t prefix_len = in_prefix.find('\1'); - if (prefix_len >= in_prefix.size()) - return -EINVAL; - - if (prefix) - *prefix = string(in_prefix, 0, prefix_len); - if (key) - *key= string(in_prefix, prefix_len + 1); - return 0; -} - -KineticStore::KineticWholeSpaceIteratorImpl::KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn) : kinetic_conn(conn), - kinetic_status(kinetic::StatusCode::OK, "") -{ - dout(30) << "kinetic iterator constructor()" << dendl; - const static string last_key = "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF"; - kinetic::KeyRangeIterator it = - kinetic_conn->IterateKeyRange("", true, last_key, true, 1024); - while (it != kinetic::KeyRangeEnd()) { - try { - keys.insert(*it); - dout(30) << "kinetic iterator added " << *it << dendl; - } catch (std::runtime_error &e) { - kinetic_status = kinetic::KineticStatus(kinetic::StatusCode::CLIENT_INTERNAL_ERROR, e.what()); - return; - } - ++it; - } - keys_iter = keys.begin(); -} - -int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_first(const string &prefix) -{ - dout(30) << "kinetic iterator seek_to_first(prefix): " << prefix << dendl; - keys_iter = keys.lower_bound(prefix); - return 0; -} - -int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last() -{ - dout(30) << "kinetic iterator seek_to_last()" << dendl; - keys_iter = keys.end(); - if (keys.begin() != keys_iter) - --keys_iter; - return 0; -} - -int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last(const string &prefix) -{ - dout(30) << "kinetic iterator seek_to_last(prefix): " << prefix << dendl; - keys_iter = keys.upper_bound(prefix + "\2"); - if (keys.begin() == keys_iter) { - keys_iter = keys.end(); - } else { - --keys_iter; - } - return 0; -} - -int KineticStore::KineticWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) { - dout(30) << "kinetic iterator upper_bound()" << dendl; - string bound = combine_strings(prefix, after); - keys_iter = keys.upper_bound(bound); - return 0; -} - -int KineticStore::KineticWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) { - dout(30) << "kinetic iterator lower_bound()" << dendl; - string bound = combine_strings(prefix, to); - keys_iter = keys.lower_bound(bound); - return 0; -} - -bool KineticStore::KineticWholeSpaceIteratorImpl::valid() { - dout(30) << "kinetic iterator valid()" << dendl; - return keys_iter != keys.end(); -} - -int KineticStore::KineticWholeSpaceIteratorImpl::next() { - dout(30) << "kinetic iterator next()" << dendl; - if (keys_iter != keys.end()) { - ++keys_iter; - return 0; - } - return -1; -} - -int KineticStore::KineticWholeSpaceIteratorImpl::prev() { - dout(30) << "kinetic iterator prev()" << dendl; - if (keys_iter != keys.begin()) { - --keys_iter; - return 0; - } - keys_iter = keys.end(); - return -1; -} - -string KineticStore::KineticWholeSpaceIteratorImpl::key() { - dout(30) << "kinetic iterator key()" << dendl; - string out_key; - split_key(*keys_iter, NULL, &out_key); - return out_key; -} - -pair KineticStore::KineticWholeSpaceIteratorImpl::raw_key() { - dout(30) << "kinetic iterator raw_key()" << dendl; - string prefix, key; - split_key(*keys_iter, &prefix, &key); - return make_pair(prefix, key); -} - -bufferlist KineticStore::KineticWholeSpaceIteratorImpl::value() { - dout(30) << "kinetic iterator value()" << dendl; - unique_ptr record; - kinetic_status = kinetic_conn->Get(*keys_iter, record); - return to_bufferlist(*record.get()); -} - -int KineticStore::KineticWholeSpaceIteratorImpl::status() { - dout(30) << "kinetic iterator status()" << dendl; - return kinetic_status.ok() ? 0 : -1; -} diff --git a/src/os/KineticStore.h b/src/os/KineticStore.h deleted file mode 100644 index cbb7633466b1..000000000000 --- a/src/os/KineticStore.h +++ /dev/null @@ -1,160 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -#ifndef KINETIC_STORE_H -#define KINETIC_STORE_H - -#include "include/types.h" -#include "include/buffer.h" -#include "KeyValueDB.h" -#include -#include -#include -#include "include/memory.h" -#include - -#include -#include "common/errno.h" -#include "common/dout.h" -#include "include/assert.h" -#include "common/Formatter.h" - -#include "common/ceph_context.h" - -class PerfCounters; - -enum { - l_kinetic_first = 34400, - l_kinetic_gets, - l_kinetic_txns, - l_kinetic_last, -}; - -/** - * Uses Kinetic to implement the KeyValueDB interface - */ -class KineticStore : public KeyValueDB { - CephContext *cct; - PerfCounters *logger; - string host; - int port; - int user_id; - string hmac_key; - bool use_ssl; - std::unique_ptr kinetic_conn; - - int do_open(ostream &out, bool create_if_missing); - -public: - KineticStore(CephContext *c); - ~KineticStore(); - - static int _test_init(CephContext *c); - int init(); - - /// Opens underlying db - int open(ostream &out) { - return do_open(out, false); - } - /// Creates underlying db if missing and opens it - int create_and_open(ostream &out) { - return do_open(out, true); - } - - void close(); - - enum KineticOpType { - KINETIC_OP_WRITE, - KINETIC_OP_DELETE, - }; - - struct KineticOp { - KineticOpType type; - std::string key; - bufferlist data; - KineticOp(KineticOpType type, const string &key) : type(type), key(key) {} - KineticOp(KineticOpType type, const string &key, const bufferlist &data) - : type(type), key(key), data(data) {} - }; - - class KineticTransactionImpl : public KeyValueDB::TransactionImpl { - public: - vector ops; - KineticStore *db; - - KineticTransactionImpl(KineticStore *db) : db(db) {} - void set( - const string &prefix, - const string &k, - const bufferlist &bl); - void rmkey( - const string &prefix, - const string &k); - void rmkeys_by_prefix( - const string &prefix - ); - }; - - KeyValueDB::Transaction get_transaction() { - return ceph::shared_ptr< KineticTransactionImpl >( - new KineticTransactionImpl(this)); - } - - int submit_transaction(KeyValueDB::Transaction t); - int submit_transaction_sync(KeyValueDB::Transaction t); - int get( - const string &prefix, - const std::set &key, - std::map *out - ); - - class KineticWholeSpaceIteratorImpl : - public KeyValueDB::WholeSpaceIteratorImpl { - std::set keys; - std::set::iterator keys_iter; - kinetic::BlockingKineticConnection *kinetic_conn; - kinetic::KineticStatus kinetic_status; - public: - KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn); - virtual ~KineticWholeSpaceIteratorImpl() { } - - int seek_to_first() { - return seek_to_first(""); - } - int seek_to_first(const string &prefix); - int seek_to_last(); - int seek_to_last(const string &prefix); - int upper_bound(const string &prefix, const string &after); - int lower_bound(const string &prefix, const string &to); - bool valid(); - int next(); - int prev(); - string key(); - pair raw_key(); - bufferlist value(); - int status(); - }; - - /// Utility - static string combine_strings(const string &prefix, const string &value); - static int split_key(string in_prefix, string *prefix, string *key); - static bufferlist to_bufferlist(const kinetic::KineticRecord &record); - virtual uint64_t get_estimated_size(map &extra) { - // not used by the osd - return 0; - } - - -protected: - WholeSpaceIterator _get_iterator() { - return ceph::shared_ptr( - new KineticWholeSpaceIteratorImpl(kinetic_conn.get())); - } - - // TODO: remove snapshots from interface - WholeSpaceIterator _get_snapshot_iterator() { - return _get_iterator(); - } - -}; - -#endif diff --git a/src/os/LevelDBStore.cc b/src/os/LevelDBStore.cc deleted file mode 100644 index 6a71aef9ec34..000000000000 --- a/src/os/LevelDBStore.cc +++ /dev/null @@ -1,307 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -#include "LevelDBStore.h" - -#include -#include -#include -#include "include/memory.h" -#include -using std::string; -#include "common/debug.h" -#include "common/perf_counters.h" - -int LevelDBStore::init(string option_str) -{ - // init defaults. caller can override these if they want - // prior to calling open. - options.write_buffer_size = g_conf->leveldb_write_buffer_size; - options.cache_size = g_conf->leveldb_cache_size; - options.block_size = g_conf->leveldb_block_size; - options.bloom_size = g_conf->leveldb_bloom_size; - options.compression_enabled = g_conf->leveldb_compression; - options.paranoid_checks = g_conf->leveldb_paranoid; - options.max_open_files = g_conf->leveldb_max_open_files; - options.log_file = g_conf->leveldb_log; - return 0; -} - -int LevelDBStore::do_open(ostream &out, bool create_if_missing) -{ - leveldb::Options ldoptions; - - if (options.write_buffer_size) - ldoptions.write_buffer_size = options.write_buffer_size; - if (options.max_open_files) - ldoptions.max_open_files = options.max_open_files; - if (options.cache_size) { - leveldb::Cache *_db_cache = leveldb::NewLRUCache(options.cache_size); - db_cache.reset(_db_cache); - ldoptions.block_cache = db_cache.get(); - } - if (options.block_size) - ldoptions.block_size = options.block_size; - if (options.bloom_size) { -#ifdef HAVE_LEVELDB_FILTER_POLICY - const leveldb::FilterPolicy *_filterpolicy = - leveldb::NewBloomFilterPolicy(options.bloom_size); - filterpolicy.reset(_filterpolicy); - ldoptions.filter_policy = filterpolicy.get(); -#else - assert(0 == "bloom size set but installed leveldb doesn't support bloom filters"); -#endif - } - if (options.compression_enabled) - ldoptions.compression = leveldb::kSnappyCompression; - else - ldoptions.compression = leveldb::kNoCompression; - if (options.block_restart_interval) - ldoptions.block_restart_interval = options.block_restart_interval; - - ldoptions.error_if_exists = options.error_if_exists; - ldoptions.paranoid_checks = options.paranoid_checks; - ldoptions.create_if_missing = create_if_missing; - - if (options.log_file.length()) { - leveldb::Env *env = leveldb::Env::Default(); - env->NewLogger(options.log_file, &ldoptions.info_log); - } - - leveldb::DB *_db; - leveldb::Status status = leveldb::DB::Open(ldoptions, path, &_db); - db.reset(_db); - if (!status.ok()) { - out << status.ToString() << std::endl; - return -EINVAL; - } - - PerfCountersBuilder plb(g_ceph_context, "leveldb", l_leveldb_first, l_leveldb_last); - plb.add_u64_counter(l_leveldb_gets, "leveldb_get", "Gets"); - plb.add_u64_counter(l_leveldb_txns, "leveldb_transaction", "Transactions"); - plb.add_time_avg(l_leveldb_get_latency, "leveldb_get_latency", "Get Latency"); - plb.add_time_avg(l_leveldb_submit_latency, "leveldb_submit_latency", "Submit Latency"); - plb.add_time_avg(l_leveldb_submit_sync_latency, "leveldb_submit_sync_latency", "Submit Sync Latency"); - plb.add_u64_counter(l_leveldb_compact, "leveldb_compact", "Compactions"); - plb.add_u64_counter(l_leveldb_compact_range, "leveldb_compact_range", "Compactions by range"); - plb.add_u64_counter(l_leveldb_compact_queue_merge, "leveldb_compact_queue_merge", "Mergings of ranges in compaction queue"); - plb.add_u64(l_leveldb_compact_queue_len, "leveldb_compact_queue_len", "Length of compaction queue"); - logger = plb.create_perf_counters(); - cct->get_perfcounters_collection()->add(logger); - - if (g_conf->leveldb_compact_on_mount) { - derr << "Compacting leveldb store..." << dendl; - compact(); - derr << "Finished compacting leveldb store" << dendl; - } - return 0; -} - -int LevelDBStore::_test_init(const string& dir) -{ - leveldb::Options options; - options.create_if_missing = true; - leveldb::DB *db; - leveldb::Status status = leveldb::DB::Open(options, dir, &db); - delete db; - return status.ok() ? 0 : -EIO; -} - -LevelDBStore::~LevelDBStore() -{ - close(); - delete logger; - - // Ensure db is destroyed before dependent db_cache and filterpolicy - db.reset(); -} - -void LevelDBStore::close() -{ - // stop compaction thread - compact_queue_lock.Lock(); - if (compact_thread.is_started()) { - compact_queue_stop = true; - compact_queue_cond.Signal(); - compact_queue_lock.Unlock(); - compact_thread.join(); - } else { - compact_queue_lock.Unlock(); - } - - if (logger) - cct->get_perfcounters_collection()->remove(logger); -} - -int LevelDBStore::submit_transaction(KeyValueDB::Transaction t) -{ - utime_t start = ceph_clock_now(g_ceph_context); - LevelDBTransactionImpl * _t = - static_cast(t.get()); - leveldb::Status s = db->Write(leveldb::WriteOptions(), &(_t->bat)); - utime_t lat = ceph_clock_now(g_ceph_context) - start; - logger->inc(l_leveldb_txns); - logger->tinc(l_leveldb_submit_latency, lat); - return s.ok() ? 0 : -1; -} - -int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t) -{ - utime_t start = ceph_clock_now(g_ceph_context); - LevelDBTransactionImpl * _t = - static_cast(t.get()); - leveldb::WriteOptions options; - options.sync = true; - leveldb::Status s = db->Write(options, &(_t->bat)); - utime_t lat = ceph_clock_now(g_ceph_context) - start; - logger->inc(l_leveldb_txns); - logger->tinc(l_leveldb_submit_sync_latency, lat); - return s.ok() ? 0 : -1; -} - -void LevelDBStore::LevelDBTransactionImpl::set( - const string &prefix, - const string &k, - const bufferlist &to_set_bl) -{ - string key = combine_strings(prefix, k); - //bufferlist::c_str() is non-constant, so we need to make a copy - bufferlist val = to_set_bl; - bat.Delete(leveldb::Slice(key)); - bat.Put(leveldb::Slice(key), - leveldb::Slice(val.c_str(), val.length())); -} - -void LevelDBStore::LevelDBTransactionImpl::rmkey(const string &prefix, - const string &k) -{ - string key = combine_strings(prefix, k); - bat.Delete(leveldb::Slice(key)); -} - -void LevelDBStore::LevelDBTransactionImpl::rmkeys_by_prefix(const string &prefix) -{ - KeyValueDB::Iterator it = db->get_iterator(prefix); - for (it->seek_to_first(); - it->valid(); - it->next()) { - string key = combine_strings(prefix, it->key()); - bat.Delete(key); - } -} - -int LevelDBStore::get( - const string &prefix, - const std::set &keys, - std::map *out) -{ - utime_t start = ceph_clock_now(g_ceph_context); - KeyValueDB::Iterator it = get_iterator(prefix); - for (std::set::const_iterator i = keys.begin(); - i != keys.end(); - ++i) { - it->lower_bound(*i); - if (it->valid() && it->key() == *i) { - out->insert(make_pair(*i, it->value())); - } else if (!it->valid()) - break; - } - utime_t lat = ceph_clock_now(g_ceph_context) - start; - logger->inc(l_leveldb_gets); - logger->tinc(l_leveldb_get_latency, lat); - return 0; -} - -string LevelDBStore::combine_strings(const string &prefix, const string &value) -{ - string out = prefix; - out.push_back(0); - out.append(value); - return out; -} - -bufferlist LevelDBStore::to_bufferlist(leveldb::Slice in) -{ - bufferlist bl; - bl.append(bufferptr(in.data(), in.size())); - return bl; -} - -int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key) -{ - string in_prefix = in.ToString(); - size_t prefix_len = in_prefix.find('\0'); - if (prefix_len >= in_prefix.size()) - return -EINVAL; - - if (prefix) - *prefix = string(in_prefix, 0, prefix_len); - if (key) - *key= string(in_prefix, prefix_len + 1); - return 0; -} - -void LevelDBStore::compact() -{ - logger->inc(l_leveldb_compact); - db->CompactRange(NULL, NULL); -} - - -void LevelDBStore::compact_thread_entry() -{ - compact_queue_lock.Lock(); - while (!compact_queue_stop) { - while (!compact_queue.empty()) { - pair range = compact_queue.front(); - compact_queue.pop_front(); - logger->set(l_leveldb_compact_queue_len, compact_queue.size()); - compact_queue_lock.Unlock(); - logger->inc(l_leveldb_compact_range); - compact_range(range.first, range.second); - compact_queue_lock.Lock(); - continue; - } - compact_queue_cond.Wait(compact_queue_lock); - } - compact_queue_lock.Unlock(); -} - -void LevelDBStore::compact_range_async(const string& start, const string& end) -{ - Mutex::Locker l(compact_queue_lock); - - // try to merge adjacent ranges. this is O(n), but the queue should - // be short. note that we do not cover all overlap cases and merge - // opportunities here, but we capture the ones we currently need. - list< pair >::iterator p = compact_queue.begin(); - while (p != compact_queue.end()) { - if (p->first == start && p->second == end) { - // dup; no-op - return; - } - if (p->first <= end && p->first > start) { - // merge with existing range to the right - compact_queue.push_back(make_pair(start, p->second)); - compact_queue.erase(p); - logger->inc(l_leveldb_compact_queue_merge); - break; - } - if (p->second >= start && p->second < end) { - // merge with existing range to the left - compact_queue.push_back(make_pair(p->first, end)); - compact_queue.erase(p); - logger->inc(l_leveldb_compact_queue_merge); - break; - } - ++p; - } - if (p == compact_queue.end()) { - // no merge, new entry. - compact_queue.push_back(make_pair(start, end)); - logger->set(l_leveldb_compact_queue_len, compact_queue.size()); - } - compact_queue_cond.Signal(); - if (!compact_thread.is_started()) { - compact_thread.create(); - } -} diff --git a/src/os/LevelDBStore.h b/src/os/LevelDBStore.h deleted file mode 100644 index a42f2f067c7e..000000000000 --- a/src/os/LevelDBStore.h +++ /dev/null @@ -1,403 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -#ifndef LEVEL_DB_STORE_H -#define LEVEL_DB_STORE_H - -#include "include/types.h" -#include "include/buffer.h" -#include "KeyValueDB.h" -#include -#include -#include -#include "include/memory.h" -#include -#include "leveldb/db.h" -#include "leveldb/env.h" -#include "leveldb/write_batch.h" -#include "leveldb/slice.h" -#include "leveldb/cache.h" -#ifdef HAVE_LEVELDB_FILTER_POLICY -#include "leveldb/filter_policy.h" -#endif - -#include -#include "common/errno.h" -#include "common/dout.h" -#include "include/assert.h" -#include "common/Formatter.h" -#include "common/Cond.h" - -#include "common/ceph_context.h" - -class PerfCounters; - -enum { - l_leveldb_first = 34300, - l_leveldb_gets, - l_leveldb_txns, - l_leveldb_get_latency, - l_leveldb_submit_latency, - l_leveldb_submit_sync_latency, - l_leveldb_compact, - l_leveldb_compact_range, - l_leveldb_compact_queue_merge, - l_leveldb_compact_queue_len, - l_leveldb_last, -}; - -/** - * Uses LevelDB to implement the KeyValueDB interface - */ -class LevelDBStore : public KeyValueDB { - CephContext *cct; - PerfCounters *logger; - string path; - boost::scoped_ptr db_cache; -#ifdef HAVE_LEVELDB_FILTER_POLICY - boost::scoped_ptr filterpolicy; -#endif - boost::scoped_ptr db; - - int do_open(ostream &out, bool create_if_missing); - - // manage async compactions - Mutex compact_queue_lock; - Cond compact_queue_cond; - list< pair > compact_queue; - bool compact_queue_stop; - class CompactThread : public Thread { - LevelDBStore *db; - public: - CompactThread(LevelDBStore *d) : db(d) {} - void *entry() { - db->compact_thread_entry(); - return NULL; - } - friend class LevelDBStore; - } compact_thread; - - void compact_thread_entry(); - - void compact_range(const string& start, const string& end) { - leveldb::Slice cstart(start); - leveldb::Slice cend(end); - db->CompactRange(&cstart, &cend); - } - void compact_range_async(const string& start, const string& end); - -public: - /// compact the underlying leveldb store - void compact(); - - /// compact db for all keys with a given prefix - void compact_prefix(const string& prefix) { - compact_range(prefix, past_prefix(prefix)); - } - void compact_prefix_async(const string& prefix) { - compact_range_async(prefix, past_prefix(prefix)); - } - void compact_range(const string& prefix, - const string& start, const string& end) { - compact_range(combine_strings(prefix, start), combine_strings(prefix, end)); - } - void compact_range_async(const string& prefix, - const string& start, const string& end) { - compact_range_async(combine_strings(prefix, start), - combine_strings(prefix, end)); - } - - - /** - * options_t: Holds options which are minimally interpreted - * on initialization and then passed through to LevelDB. - * We transform a couple of these into actual LevelDB - * structures, but the rest are simply passed through unchanged. See - * leveldb/options.h for more precise details on each. - * - * Set them after constructing the LevelDBStore, but before calling - * open() or create_and_open(). - */ - struct options_t { - uint64_t write_buffer_size; /// in-memory write buffer size - int max_open_files; /// maximum number of files LevelDB can open at once - uint64_t cache_size; /// size of extra decompressed cache to use - uint64_t block_size; /// user data per block - int bloom_size; /// number of bits per entry to put in a bloom filter - bool compression_enabled; /// whether to use libsnappy compression or not - - // don't change these ones. No, seriously - int block_restart_interval; - bool error_if_exists; - bool paranoid_checks; - - string log_file; - - options_t() : - write_buffer_size(0), //< 0 means default - max_open_files(0), //< 0 means default - cache_size(0), //< 0 means no cache (default) - block_size(0), //< 0 means default - bloom_size(0), //< 0 means no bloom filter (default) - compression_enabled(true), //< set to false for no compression - block_restart_interval(0), //< 0 means default - error_if_exists(false), //< set to true if you want to check nonexistence - paranoid_checks(false) //< set to true if you want paranoid checks - {} - } options; - - LevelDBStore(CephContext *c, const string &path) : - cct(c), - logger(NULL), - path(path), - db_cache(NULL), -#ifdef HAVE_LEVELDB_FILTER_POLICY - filterpolicy(NULL), -#endif - compact_queue_lock("LevelDBStore::compact_thread_lock"), - compact_queue_stop(false), - compact_thread(this), - options() - {} - - ~LevelDBStore(); - - static int _test_init(const string& dir); - int init(string option_str=""); - - /// Opens underlying db - int open(ostream &out) { - return do_open(out, false); - } - /// Creates underlying db if missing and opens it - int create_and_open(ostream &out) { - return do_open(out, true); - } - - void close(); - - class LevelDBTransactionImpl : public KeyValueDB::TransactionImpl { - public: - leveldb::WriteBatch bat; - LevelDBStore *db; - LevelDBTransactionImpl(LevelDBStore *db) : db(db) {} - void set( - const string &prefix, - const string &k, - const bufferlist &bl); - void rmkey( - const string &prefix, - const string &k); - void rmkeys_by_prefix( - const string &prefix - ); - }; - - KeyValueDB::Transaction get_transaction() { - return ceph::shared_ptr< LevelDBTransactionImpl >( - new LevelDBTransactionImpl(this)); - } - - int submit_transaction(KeyValueDB::Transaction t); - int submit_transaction_sync(KeyValueDB::Transaction t); - int get( - const string &prefix, - const std::set &key, - std::map *out - ); - - class LevelDBWholeSpaceIteratorImpl : - public KeyValueDB::WholeSpaceIteratorImpl { - protected: - boost::scoped_ptr dbiter; - public: - LevelDBWholeSpaceIteratorImpl(leveldb::Iterator *iter) : - dbiter(iter) { } - virtual ~LevelDBWholeSpaceIteratorImpl() { } - - int seek_to_first() { - dbiter->SeekToFirst(); - return dbiter->status().ok() ? 0 : -1; - } - int seek_to_first(const string &prefix) { - leveldb::Slice slice_prefix(prefix); - dbiter->Seek(slice_prefix); - return dbiter->status().ok() ? 0 : -1; - } - int seek_to_last() { - dbiter->SeekToLast(); - return dbiter->status().ok() ? 0 : -1; - } - int seek_to_last(const string &prefix) { - string limit = past_prefix(prefix); - leveldb::Slice slice_limit(limit); - dbiter->Seek(slice_limit); - - if (!dbiter->Valid()) { - dbiter->SeekToLast(); - } else { - dbiter->Prev(); - } - return dbiter->status().ok() ? 0 : -1; - } - int upper_bound(const string &prefix, const string &after) { - lower_bound(prefix, after); - if (valid()) { - pair key = raw_key(); - if (key.first == prefix && key.second == after) - next(); - } - return dbiter->status().ok() ? 0 : -1; - } - int lower_bound(const string &prefix, const string &to) { - string bound = combine_strings(prefix, to); - leveldb::Slice slice_bound(bound); - dbiter->Seek(slice_bound); - return dbiter->status().ok() ? 0 : -1; - } - bool valid() { - return dbiter->Valid(); - } - int next() { - if (valid()) - dbiter->Next(); - return dbiter->status().ok() ? 0 : -1; - } - int prev() { - if (valid()) - dbiter->Prev(); - return dbiter->status().ok() ? 0 : -1; - } - string key() { - string out_key; - split_key(dbiter->key(), 0, &out_key); - return out_key; - } - pair raw_key() { - string prefix, key; - split_key(dbiter->key(), &prefix, &key); - return make_pair(prefix, key); - } - bufferlist value() { - return to_bufferlist(dbiter->value()); - } - int status() { - return dbiter->status().ok() ? 0 : -1; - } - }; - - class LevelDBSnapshotIteratorImpl : public LevelDBWholeSpaceIteratorImpl { - leveldb::DB *db; - const leveldb::Snapshot *snapshot; - public: - LevelDBSnapshotIteratorImpl(leveldb::DB *db, const leveldb::Snapshot *s, - leveldb::Iterator *iter) : - LevelDBWholeSpaceIteratorImpl(iter), db(db), snapshot(s) { } - - ~LevelDBSnapshotIteratorImpl() { - assert(snapshot != NULL); - db->ReleaseSnapshot(snapshot); - } - }; - - /// Utility - static string combine_strings(const string &prefix, const string &value); - static int split_key(leveldb::Slice in, string *prefix, string *key); - static bufferlist to_bufferlist(leveldb::Slice in); - static string past_prefix(const string &prefix) { - string limit = prefix; - limit.push_back(1); - return limit; - } - - virtual uint64_t get_estimated_size(map &extra) { - DIR *store_dir = opendir(path.c_str()); - if (!store_dir) { - lderr(cct) << __func__ << " something happened opening the store: " - << cpp_strerror(errno) << dendl; - return 0; - } - - uint64_t total_size = 0; - uint64_t sst_size = 0; - uint64_t log_size = 0; - uint64_t misc_size = 0; - - struct dirent *entry = NULL; - while ((entry = readdir(store_dir)) != NULL) { - string n(entry->d_name); - - if (n == "." || n == "..") - continue; - - string fpath = path + '/' + n; - struct stat s; - int err = stat(fpath.c_str(), &s); - if (err < 0) - err = -errno; - // we may race against leveldb while reading files; this should only - // happen when those files are being updated, data is being shuffled - // and files get removed, in which case there's not much of a problem - // as we'll get to them next time around. - if (err == -ENOENT) { - continue; - } - if (err < 0) { - lderr(cct) << __func__ << " error obtaining stats for " << fpath - << ": " << cpp_strerror(err) << dendl; - goto err; - } - - size_t pos = n.find_last_of('.'); - if (pos == string::npos) { - misc_size += s.st_size; - continue; - } - - string ext = n.substr(pos+1); - if (ext == "sst") { - sst_size += s.st_size; - } else if (ext == "log") { - log_size += s.st_size; - } else { - misc_size += s.st_size; - } - } - - total_size = sst_size + log_size + misc_size; - - extra["sst"] = sst_size; - extra["log"] = log_size; - extra["misc"] = misc_size; - extra["total"] = total_size; - -err: - closedir(store_dir); - return total_size; - } - - -protected: - WholeSpaceIterator _get_iterator() { - return ceph::shared_ptr( - new LevelDBWholeSpaceIteratorImpl( - db->NewIterator(leveldb::ReadOptions()) - ) - ); - } - - WholeSpaceIterator _get_snapshot_iterator() { - const leveldb::Snapshot *snapshot; - leveldb::ReadOptions options; - - snapshot = db->GetSnapshot(); - options.snapshot = snapshot; - - return ceph::shared_ptr( - new LevelDBSnapshotIteratorImpl(db.get(), snapshot, - db->NewIterator(options)) - ); - } - -}; - -#endif diff --git a/src/os/Makefile.am b/src/os/Makefile.am index fe126ec80177..3ea93007c28a 100644 --- a/src/os/Makefile.am +++ b/src/os/Makefile.am @@ -16,10 +16,8 @@ libos_a_SOURCES = \ os/HashIndex.cc \ os/IndexManager.cc \ os/JournalingObjectStore.cc \ - os/LevelDBStore.cc \ os/LFNIndex.cc \ os/MemStore.cc \ - os/KeyValueDB.cc \ os/KeyValueStore.cc \ os/ObjectStore.cc \ os/WBThrottle.cc @@ -43,8 +41,7 @@ if WITH_LIBZFS libos_a_SOURCES += os/ZFSFileStoreBackend.cc endif -libos_a_CXXFLAGS = ${AM_CXXFLAGS} -I rocksdb/include -libos_a_LIBADD = libos_types.a +libos_a_LIBADD = libos_types.a libkv.a if WITH_LTTNG libos_a_LIBADD += $(LIBOS_TP) @@ -71,8 +68,6 @@ noinst_HEADERS += \ os/IndexManager.h \ os/Journal.h \ os/JournalingObjectStore.h \ - os/KeyValueDB.h \ - os/LevelDBStore.h \ os/LFNIndex.h \ os/MemStore.h \ os/KeyValueStore.h \ @@ -84,24 +79,6 @@ noinst_HEADERS += \ os/XfsFileStoreBackend.h \ os/ZFSFileStoreBackend.h -if WITH_SLIBROCKSDB -# build rocksdb with its own makefile -# for some stupid reason this needs -fPIC... -# PORTABLE=1 fixes the aarch64 build (-march=native doesn't work there) -rocksdb/librocksdb.a: - cd rocksdb && EXTRA_CXXFLAGS=-fPIC PORTABLE=1 make -j$(shell nproc) static_lib -libos_a_CXXFLAGS += -I rocksdb/include -fPIC -libos_a_SOURCES += os/RocksDBStore.cc -libos_a_LIBADD += rocksdb/librocksdb.a -noinst_HEADERS += os/RocksDBStore.h -endif - -if WITH_DLIBROCKSDB -libos_a_SOURCES += os/RocksDBStore.cc -libos_a_LIBADD += -lrocksdb -noinst_HEADERS += os/RocksDBStore.h -endif - if WITH_LIBZFS libos_zfs_a_SOURCES = os/ZFS.cc libos_zfs_a_CXXFLAGS = ${AM_CXXFLAGS} ${LIBZFS_CFLAGS} @@ -109,11 +86,4 @@ noinst_LIBRARIES += libos_zfs.a noinst_HEADERS += os/ZFS.h endif -if WITH_KINETIC -libos_a_SOURCES += os/KineticStore.cc -libos_a_CXXFLAGS += -std=gnu++11 -libos_a_LIBADD += -lkinetic_client -lprotobuf -lglog -lgflags libcrypto.a -noinst_HEADERS += os/KineticStore.h -endif - endif # ENABLE_SERVER diff --git a/src/os/ObjectMap.h b/src/os/ObjectMap.h index 0e48d5531605..e7a64a47f9db 100644 --- a/src/os/ObjectMap.h +++ b/src/os/ObjectMap.h @@ -20,7 +20,7 @@ #include #include #include "include/memory.h" -#include "os/KeyValueDB.h" +#include "kv/KeyValueDB.h" /** * Encapsulates the FileStore key value store diff --git a/src/os/RocksDBStore.cc b/src/os/RocksDBStore.cc deleted file mode 100644 index a75274328601..000000000000 --- a/src/os/RocksDBStore.cc +++ /dev/null @@ -1,519 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include -#include -#include -#include -#include - -#include "rocksdb/db.h" -#include "rocksdb/table.h" -#include "rocksdb/env.h" -#include "rocksdb/write_batch.h" -#include "rocksdb/slice.h" -#include "rocksdb/cache.h" -#include "rocksdb/filter_policy.h" -#include "rocksdb/utilities/convenience.h" -using std::string; -#include "common/perf_counters.h" -#include "common/debug.h" -#include "include/str_map.h" -#include "KeyValueDB.h" -#include "RocksDBStore.h" - -int string2bool(string val, bool &b_val) -{ - if (strcasecmp(val.c_str(), "false") == 0) { - b_val = false; - return 0; - } else if (strcasecmp(val.c_str(), "true") == 0) { - b_val = true; - return 0; - } else { - std::string err; - int b = strict_strtol(val.c_str(), 10, &err); - if (!err.empty()) - return -EINVAL; - b_val = !!b; - return 0; - } -} - -int RocksDBStore::tryInterpret(const string key, const string val, rocksdb::Options &opt) -{ - if (key == "compaction_threads") { - std::string err; - int f = strict_sistrtoll(val.c_str(), &err); - if (!err.empty()) - return -EINVAL; - //Low priority threadpool is used for compaction - opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW); - } else if (key == "flusher_threads") { - std::string err; - int f = strict_sistrtoll(val.c_str(), &err); - if (!err.empty()) - return -EINVAL; - //High priority threadpool is used for flusher - opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH); - } else if (key == "compact_on_mount") { - int ret = string2bool(val, compact_on_mount); - if (ret != 0) - return ret; - } else if (key == "disableWAL") { - int ret = string2bool(val, disableWAL); - if (ret != 0) - return ret; - } else { - //unrecognize config options. - return -EINVAL; - } - return 0; -} - -int RocksDBStore::ParseOptionsFromString(const string opt_str, rocksdb::Options &opt) -{ - map str_map; - int r = get_str_map(opt_str, ",\n;", &str_map); - if (r < 0) - return r; - map::iterator it; - for(it = str_map.begin(); it != str_map.end(); ++it) { - string this_opt = it->first + "=" + it->second; - rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt); - if (!status.ok()) { - //unrecognized by rocksdb, try to interpret by ourselves. - r = tryInterpret(it->first, it->second, opt); - if (r < 0) { - derr << status.ToString() << dendl; - return -EINVAL; - } - } - lgeneric_dout(cct, 0) << " set rocksdb option " << it->first - << " = " << it->second << dendl; - } - return 0; -} - -int RocksDBStore::init(string _options_str) -{ - options_str = _options_str; - rocksdb::Options opt; - //try parse options - int r = ParseOptionsFromString(options_str, opt); - if (r != 0) { - return -EINVAL; - } - return 0; -} - -int RocksDBStore::do_open(ostream &out, bool create_if_missing) -{ - rocksdb::Options opt; - rocksdb::Status status; - - int r = ParseOptionsFromString(options_str, opt); - if (r != 0) { - return -EINVAL; - } - opt.create_if_missing = create_if_missing; - - status = rocksdb::DB::Open(opt, path, &db); - if (!status.ok()) { - derr << status.ToString() << dendl; - return -EINVAL; - } - - PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last); - plb.add_u64_counter(l_rocksdb_gets, "rocksdb_get", "Gets"); - plb.add_u64_counter(l_rocksdb_txns, "rocksdb_transaction", "Transactions"); - plb.add_time_avg(l_rocksdb_get_latency, "rocksdb_get_latency", "Get latency"); - plb.add_time_avg(l_rocksdb_submit_latency, "rocksdb_submit_latency", "Submit Latency"); - plb.add_time_avg(l_rocksdb_submit_sync_latency, "rocksdb_submit_sync_latency", "Submit Sync Latency"); - plb.add_u64_counter(l_rocksdb_compact, "rocksdb_compact", "Compactions"); - plb.add_u64_counter(l_rocksdb_compact_range, "rocksdb_compact_range", "Compactions by range"); - plb.add_u64_counter(l_rocksdb_compact_queue_merge, "rocksdb_compact_queue_merge", "Mergings of ranges in compaction queue"); - plb.add_u64(l_rocksdb_compact_queue_len, "rocksdb_compact_queue_len", "Length of compaction queue"); - logger = plb.create_perf_counters(); - cct->get_perfcounters_collection()->add(logger); - - if (compact_on_mount) { - derr << "Compacting rocksdb store..." << dendl; - compact(); - derr << "Finished compacting rocksdb store" << dendl; - } - return 0; -} - -int RocksDBStore::_test_init(const string& dir) -{ - rocksdb::Options options; - options.create_if_missing = true; - rocksdb::DB *db; - rocksdb::Status status = rocksdb::DB::Open(options, dir, &db); - delete db; - return status.ok() ? 0 : -EIO; -} - -RocksDBStore::~RocksDBStore() -{ - close(); - delete logger; - - // Ensure db is destroyed before dependent db_cache and filterpolicy - delete db; -} - -void RocksDBStore::close() -{ - // stop compaction thread - compact_queue_lock.Lock(); - if (compact_thread.is_started()) { - compact_queue_stop = true; - compact_queue_cond.Signal(); - compact_queue_lock.Unlock(); - compact_thread.join(); - } else { - compact_queue_lock.Unlock(); - } - - if (logger) - cct->get_perfcounters_collection()->remove(logger); -} - -int RocksDBStore::submit_transaction(KeyValueDB::Transaction t) -{ - utime_t start = ceph_clock_now(g_ceph_context); - RocksDBTransactionImpl * _t = - static_cast(t.get()); - rocksdb::WriteOptions woptions; - woptions.disableWAL = disableWAL; - rocksdb::Status s = db->Write(woptions, _t->bat); - utime_t lat = ceph_clock_now(g_ceph_context) - start; - logger->inc(l_rocksdb_txns); - logger->tinc(l_rocksdb_submit_latency, lat); - return s.ok() ? 0 : -1; -} - -int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t) -{ - utime_t start = ceph_clock_now(g_ceph_context); - RocksDBTransactionImpl * _t = - static_cast(t.get()); - rocksdb::WriteOptions woptions; - woptions.sync = true; - woptions.disableWAL = disableWAL; - rocksdb::Status s = db->Write(woptions, _t->bat); - utime_t lat = ceph_clock_now(g_ceph_context) - start; - logger->inc(l_rocksdb_txns); - logger->tinc(l_rocksdb_submit_sync_latency, lat); - return s.ok() ? 0 : -1; -} -int RocksDBStore::get_info_log_level(string info_log_level) -{ - if (info_log_level == "debug") { - return 0; - } else if (info_log_level == "info") { - return 1; - } else if (info_log_level == "warn") { - return 2; - } else if (info_log_level == "error") { - return 3; - } else if (info_log_level == "fatal") { - return 4; - } else { - return 1; - } -} - -RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db) -{ - db = _db; - bat = new rocksdb::WriteBatch(); -} -RocksDBStore::RocksDBTransactionImpl::~RocksDBTransactionImpl() -{ - delete bat; -} -void RocksDBStore::RocksDBTransactionImpl::set( - const string &prefix, - const string &k, - const bufferlist &to_set_bl) -{ - string key = combine_strings(prefix, k); - //bufferlist::c_str() is non-constant, so we need to make a copy - bufferlist val = to_set_bl; - bat->Delete(rocksdb::Slice(key)); - bat->Put(rocksdb::Slice(key), - rocksdb::Slice(val.c_str(), val.length())); -} - -void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, - const string &k) -{ - bat->Delete(combine_strings(prefix, k)); -} - -void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix) -{ - KeyValueDB::Iterator it = db->get_iterator(prefix); - for (it->seek_to_first(); - it->valid(); - it->next()) { - bat->Delete(combine_strings(prefix, it->key())); - } -} - -int RocksDBStore::get( - const string &prefix, - const std::set &keys, - std::map *out) -{ - utime_t start = ceph_clock_now(g_ceph_context); - KeyValueDB::Iterator it = get_iterator(prefix); - for (std::set::const_iterator i = keys.begin(); - i != keys.end(); - ++i) { - it->lower_bound(*i); - if (it->valid() && it->key() == *i) { - out->insert(make_pair(*i, it->value())); - } else if (!it->valid()) - break; - } - utime_t lat = ceph_clock_now(g_ceph_context) - start; - logger->inc(l_rocksdb_gets); - logger->tinc(l_rocksdb_get_latency, lat); - return 0; -} - -string RocksDBStore::combine_strings(const string &prefix, const string &value) -{ - string out = prefix; - out.push_back(0); - out.append(value); - return out; -} - -bufferlist RocksDBStore::to_bufferlist(rocksdb::Slice in) -{ - bufferlist bl; - bl.append(bufferptr(in.data(), in.size())); - return bl; -} - -int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key) -{ - string in_prefix = in.ToString(); - size_t prefix_len = in_prefix.find('\0'); - if (prefix_len >= in_prefix.size()) - return -EINVAL; - - if (prefix) - *prefix = string(in_prefix, 0, prefix_len); - if (key) - *key= string(in_prefix, prefix_len + 1); - return 0; -} - -void RocksDBStore::compact() -{ - logger->inc(l_rocksdb_compact); - db->CompactRange(NULL, NULL); -} - - -void RocksDBStore::compact_thread_entry() -{ - compact_queue_lock.Lock(); - while (!compact_queue_stop) { - while (!compact_queue.empty()) { - pair range = compact_queue.front(); - compact_queue.pop_front(); - logger->set(l_rocksdb_compact_queue_len, compact_queue.size()); - compact_queue_lock.Unlock(); - logger->inc(l_rocksdb_compact_range); - compact_range(range.first, range.second); - compact_queue_lock.Lock(); - continue; - } - compact_queue_cond.Wait(compact_queue_lock); - } - compact_queue_lock.Unlock(); -} - -void RocksDBStore::compact_range_async(const string& start, const string& end) -{ - Mutex::Locker l(compact_queue_lock); - - // try to merge adjacent ranges. this is O(n), but the queue should - // be short. note that we do not cover all overlap cases and merge - // opportunities here, but we capture the ones we currently need. - list< pair >::iterator p = compact_queue.begin(); - while (p != compact_queue.end()) { - if (p->first == start && p->second == end) { - // dup; no-op - return; - } - if (p->first <= end && p->first > start) { - // merge with existing range to the right - compact_queue.push_back(make_pair(start, p->second)); - compact_queue.erase(p); - logger->inc(l_rocksdb_compact_queue_merge); - break; - } - if (p->second >= start && p->second < end) { - // merge with existing range to the left - compact_queue.push_back(make_pair(p->first, end)); - compact_queue.erase(p); - logger->inc(l_rocksdb_compact_queue_merge); - break; - } - ++p; - } - if (p == compact_queue.end()) { - // no merge, new entry. - compact_queue.push_back(make_pair(start, end)); - logger->set(l_rocksdb_compact_queue_len, compact_queue.size()); - } - compact_queue_cond.Signal(); - if (!compact_thread.is_started()) { - compact_thread.create(); - } -} -bool RocksDBStore::check_omap_dir(string &omap_dir) -{ - rocksdb::Options options; - options.create_if_missing = true; - rocksdb::DB *db; - rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db); - delete db; - return status.ok(); -} -void RocksDBStore::compact_range(const string& start, const string& end) -{ - rocksdb::Slice cstart(start); - rocksdb::Slice cend(end); - db->CompactRange(&cstart, &cend); -} -RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl() -{ - delete dbiter; -} -int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first() -{ - dbiter->SeekToFirst(); - return dbiter->status().ok() ? 0 : -1; -} -int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix) -{ - rocksdb::Slice slice_prefix(prefix); - dbiter->Seek(slice_prefix); - return dbiter->status().ok() ? 0 : -1; -} -int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last() -{ - dbiter->SeekToLast(); - return dbiter->status().ok() ? 0 : -1; -} -int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix) -{ - string limit = past_prefix(prefix); - rocksdb::Slice slice_limit(limit); - dbiter->Seek(slice_limit); - - if (!dbiter->Valid()) { - dbiter->SeekToLast(); - } else { - dbiter->Prev(); - } - return dbiter->status().ok() ? 0 : -1; -} -int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) -{ - lower_bound(prefix, after); - if (valid()) { - pair key = raw_key(); - if (key.first == prefix && key.second == after) - next(); - } - return dbiter->status().ok() ? 0 : -1; -} -int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) -{ - string bound = combine_strings(prefix, to); - rocksdb::Slice slice_bound(bound); - dbiter->Seek(slice_bound); - return dbiter->status().ok() ? 0 : -1; -} -bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid() -{ - return dbiter->Valid(); -} -int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next() -{ - if (valid()) - dbiter->Next(); - return dbiter->status().ok() ? 0 : -1; -} -int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev() -{ - if (valid()) - dbiter->Prev(); - return dbiter->status().ok() ? 0 : -1; -} -string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key() -{ - string out_key; - split_key(dbiter->key(), 0, &out_key); - return out_key; -} -pair RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key() -{ - string prefix, key; - split_key(dbiter->key(), &prefix, &key); - return make_pair(prefix, key); -} -bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value() -{ - return to_bufferlist(dbiter->value()); -} -int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status() -{ - return dbiter->status().ok() ? 0 : -1; -} - -string RocksDBStore::past_prefix(const string &prefix) -{ - string limit = prefix; - limit.push_back(1); - return limit; -} - - -RocksDBStore::WholeSpaceIterator RocksDBStore::_get_iterator() -{ - return std::shared_ptr( - new RocksDBWholeSpaceIteratorImpl( - db->NewIterator(rocksdb::ReadOptions()) - ) - ); -} - -RocksDBStore::WholeSpaceIterator RocksDBStore::_get_snapshot_iterator() -{ - const rocksdb::Snapshot *snapshot; - rocksdb::ReadOptions options; - - snapshot = db->GetSnapshot(); - options.snapshot = snapshot; - - return std::shared_ptr( - new RocksDBSnapshotIteratorImpl(db, snapshot, - db->NewIterator(options)) - ); -} - -RocksDBStore::RocksDBSnapshotIteratorImpl::~RocksDBSnapshotIteratorImpl() -{ - db->ReleaseSnapshot(snapshot); -} diff --git a/src/os/RocksDBStore.h b/src/os/RocksDBStore.h deleted file mode 100644 index c03b20c86b3f..000000000000 --- a/src/os/RocksDBStore.h +++ /dev/null @@ -1,281 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -#ifndef ROCKS_DB_STORE_H -#define ROCKS_DB_STORE_H - -#include "include/types.h" -#include "include/buffer.h" -#include "KeyValueDB.h" -#include -#include -#include -#include -#include - -#include -#include "common/errno.h" -#include "common/dout.h" -#include "include/assert.h" -#include "common/Formatter.h" -#include "common/Cond.h" - -#include "common/ceph_context.h" -class PerfCounters; - -enum { - l_rocksdb_first = 34300, - l_rocksdb_gets, - l_rocksdb_txns, - l_rocksdb_get_latency, - l_rocksdb_submit_latency, - l_rocksdb_submit_sync_latency, - l_rocksdb_compact, - l_rocksdb_compact_range, - l_rocksdb_compact_queue_merge, - l_rocksdb_compact_queue_len, - l_rocksdb_last, -}; - -namespace rocksdb{ - class DB; - class Cache; - class FilterPolicy; - class Snapshot; - class Slice; - class WriteBatch; - class Iterator; - struct Options; -} -/** - * Uses RocksDB to implement the KeyValueDB interface - */ -class RocksDBStore : public KeyValueDB { - CephContext *cct; - PerfCounters *logger; - string path; - rocksdb::DB *db; - string options_str; - int do_open(ostream &out, bool create_if_missing); - - // manage async compactions - Mutex compact_queue_lock; - Cond compact_queue_cond; - list< pair > compact_queue; - bool compact_queue_stop; - class CompactThread : public Thread { - RocksDBStore *db; - public: - CompactThread(RocksDBStore *d) : db(d) {} - void *entry() { - db->compact_thread_entry(); - return NULL; - } - friend class RocksDBStore; - } compact_thread; - - void compact_thread_entry(); - - void compact_range(const string& start, const string& end); - void compact_range_async(const string& start, const string& end); - -public: - /// compact the underlying rocksdb store - bool compact_on_mount; - bool disableWAL; - void compact(); - - int tryInterpret(const string key, const string val, rocksdb::Options &opt); - int ParseOptionsFromString(const string opt_str, rocksdb::Options &opt); - static int _test_init(const string& dir); - int init(string options_str); - /// compact rocksdb for all keys with a given prefix - void compact_prefix(const string& prefix) { - compact_range(prefix, past_prefix(prefix)); - } - void compact_prefix_async(const string& prefix) { - compact_range_async(prefix, past_prefix(prefix)); - } - - void compact_range(const string& prefix, const string& start, const string& end) { - compact_range(combine_strings(prefix, start), combine_strings(prefix, end)); - } - void compact_range_async(const string& prefix, const string& start, const string& end) { - compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end)); - } - int get_info_log_level(string info_log_level); - - RocksDBStore(CephContext *c, const string &path) : - cct(c), - logger(NULL), - path(path), - db(NULL), - compact_queue_lock("RocksDBStore::compact_thread_lock"), - compact_queue_stop(false), - compact_thread(this), - compact_on_mount(false), - disableWAL(false) - {} - - ~RocksDBStore(); - - static bool check_omap_dir(string &omap_dir); - /// Opens underlying db - int open(ostream &out) { - return do_open(out, false); - } - /// Creates underlying db if missing and opens it - int create_and_open(ostream &out) { - return do_open(out, true); - } - - void close(); - - class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl { - public: - rocksdb::WriteBatch *bat; - RocksDBStore *db; - - RocksDBTransactionImpl(RocksDBStore *_db); - ~RocksDBTransactionImpl(); - void set( - const string &prefix, - const string &k, - const bufferlist &bl); - void rmkey( - const string &prefix, - const string &k); - void rmkeys_by_prefix( - const string &prefix - ); - }; - - KeyValueDB::Transaction get_transaction() { - return std::shared_ptr< RocksDBTransactionImpl >( - new RocksDBTransactionImpl(this)); - } - - int submit_transaction(KeyValueDB::Transaction t); - int submit_transaction_sync(KeyValueDB::Transaction t); - int get( - const string &prefix, - const std::set &key, - std::map *out - ); - - class RocksDBWholeSpaceIteratorImpl : - public KeyValueDB::WholeSpaceIteratorImpl { - protected: - rocksdb::Iterator *dbiter; - public: - RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) : - dbiter(iter) { } - //virtual ~RocksDBWholeSpaceIteratorImpl() { } - ~RocksDBWholeSpaceIteratorImpl(); - - int seek_to_first(); - int seek_to_first(const string &prefix); - int seek_to_last(); - int seek_to_last(const string &prefix); - int upper_bound(const string &prefix, const string &after); - int lower_bound(const string &prefix, const string &to); - bool valid(); - int next(); - int prev(); - string key(); - pair raw_key(); - bufferlist value(); - int status(); - }; - - class RocksDBSnapshotIteratorImpl : public RocksDBWholeSpaceIteratorImpl { - rocksdb::DB *db; - const rocksdb::Snapshot *snapshot; - public: - RocksDBSnapshotIteratorImpl(rocksdb::DB *db, const rocksdb::Snapshot *s, - rocksdb::Iterator *iter) : - RocksDBWholeSpaceIteratorImpl(iter), db(db), snapshot(s) { } - - ~RocksDBSnapshotIteratorImpl(); - }; - - /// Utility - static string combine_strings(const string &prefix, const string &value); - static int split_key(rocksdb::Slice in, string *prefix, string *key); - static bufferlist to_bufferlist(rocksdb::Slice in); - static string past_prefix(const string &prefix); - - virtual uint64_t get_estimated_size(map &extra) { - DIR *store_dir = opendir(path.c_str()); - if (!store_dir) { - lderr(cct) << __func__ << " something happened opening the store: " - << cpp_strerror(errno) << dendl; - return 0; - } - - uint64_t total_size = 0; - uint64_t sst_size = 0; - uint64_t log_size = 0; - uint64_t misc_size = 0; - - struct dirent *entry = NULL; - while ((entry = readdir(store_dir)) != NULL) { - string n(entry->d_name); - - if (n == "." || n == "..") - continue; - - string fpath = path + '/' + n; - struct stat s; - int err = stat(fpath.c_str(), &s); - if (err < 0) - err = -errno; - // we may race against rocksdb while reading files; this should only - // happen when those files are being updated, data is being shuffled - // and files get removed, in which case there's not much of a problem - // as we'll get to them next time around. - if (err == -ENOENT) { - continue; - } - if (err < 0) { - lderr(cct) << __func__ << " error obtaining stats for " << fpath - << ": " << cpp_strerror(err) << dendl; - goto err; - } - - size_t pos = n.find_last_of('.'); - if (pos == string::npos) { - misc_size += s.st_size; - continue; - } - - string ext = n.substr(pos+1); - if (ext == "sst") { - sst_size += s.st_size; - } else if (ext == "log") { - log_size += s.st_size; - } else { - misc_size += s.st_size; - } - } - - total_size = sst_size + log_size + misc_size; - - extra["sst"] = sst_size; - extra["log"] = log_size; - extra["misc"] = misc_size; - extra["total"] = total_size; - -err: - closedir(store_dir); - return total_size; - } - - -protected: - WholeSpaceIterator _get_iterator(); - - WholeSpaceIterator _get_snapshot_iterator(); - -}; - -#endif diff --git a/src/os/newstore/NewStore.h b/src/os/newstore/NewStore.h index 97c5d6a8a979..e81a47cac030 100644 --- a/src/os/newstore/NewStore.h +++ b/src/os/newstore/NewStore.h @@ -27,7 +27,7 @@ #include "common/WorkQueue.h" #include "os/ObjectStore.h" #include "os/fs/FS.h" -#include "os/KeyValueDB.h" +#include "kv/KeyValueDB.h" #include "newstore_types.h" diff --git a/src/test/ObjectMap/KeyValueDBMemory.h b/src/test/ObjectMap/KeyValueDBMemory.h index 77342a0b5672..94e224b33f86 100644 --- a/src/test/ObjectMap/KeyValueDBMemory.h +++ b/src/test/ObjectMap/KeyValueDBMemory.h @@ -5,7 +5,7 @@ #include #include "include/memory.h" -#include "os/KeyValueDB.h" +#include "kv/KeyValueDB.h" #include "include/buffer.h" #include "include/Context.h" diff --git a/src/test/ObjectMap/test_keyvaluedb_atomicity.cc b/src/test/ObjectMap/test_keyvaluedb_atomicity.cc index ac8b27a5e4fc..6e7fc8d2fa77 100644 --- a/src/test/ObjectMap/test_keyvaluedb_atomicity.cc +++ b/src/test/ObjectMap/test_keyvaluedb_atomicity.cc @@ -1,7 +1,7 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- #include #include "include/buffer.h" -#include "os/KeyValueDB.h" +#include "kv/KeyValueDB.h" #include #include #include diff --git a/src/test/ObjectMap/test_keyvaluedb_iterators.cc b/src/test/ObjectMap/test_keyvaluedb_iterators.cc index 32a038183ee4..dd4c8cd7a24c 100644 --- a/src/test/ObjectMap/test_keyvaluedb_iterators.cc +++ b/src/test/ObjectMap/test_keyvaluedb_iterators.cc @@ -17,7 +17,7 @@ #include #include "test/ObjectMap/KeyValueDBMemory.h" -#include "os/KeyValueDB.h" +#include "kv/KeyValueDB.h" #include #include "global/global_init.h" #include "common/ceph_argparse.h" diff --git a/src/test/ObjectMap/test_object_map.cc b/src/test/ObjectMap/test_object_map.cc index 500ca32626f8..6af60cf50333 100644 --- a/src/test/ObjectMap/test_object_map.cc +++ b/src/test/ObjectMap/test_object_map.cc @@ -6,7 +6,7 @@ #include "include/buffer.h" #include "test/ObjectMap/KeyValueDBMemory.h" -#include "os/KeyValueDB.h" +#include "kv/KeyValueDB.h" #include "os/DBObjectMap.h" #include "os/HashIndex.h" #include diff --git a/src/test/objectstore/FileStoreTracker.h b/src/test/objectstore/FileStoreTracker.h index 11033a68b996..e350c80fe1f5 100644 --- a/src/test/objectstore/FileStoreTracker.h +++ b/src/test/objectstore/FileStoreTracker.h @@ -4,7 +4,7 @@ #define FILESTORE_TRACKER_H #include "test/common/ObjectContents.h" #include "os/FileStore.h" -#include "os/KeyValueDB.h" +#include "kv/KeyValueDB.h" #include #include #include diff --git a/src/test/objectstore/TestRocksdbOptionParse.cc b/src/test/objectstore/TestRocksdbOptionParse.cc index cdbbfa9329d8..eaccfa14c65e 100644 --- a/src/test/objectstore/TestRocksdbOptionParse.cc +++ b/src/test/objectstore/TestRocksdbOptionParse.cc @@ -5,7 +5,7 @@ #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/thread_status.h" -#include "os/RocksDBStore.h" +#include "kv/RocksDBStore.h" #include using namespace std; diff --git a/src/test/objectstore/test_idempotent.cc b/src/test/objectstore/test_idempotent.cc index 72b8e24dcc3e..d52f7dbf03b9 100644 --- a/src/test/objectstore/test_idempotent.cc +++ b/src/test/objectstore/test_idempotent.cc @@ -21,7 +21,7 @@ #include "common/debug.h" #include "test/common/ObjectContents.h" #include "FileStoreTracker.h" -#include "os/KeyValueDB.h" +#include "kv/KeyValueDB.h" #include "os/ObjectStore.h" void usage(const string &name) { diff --git a/src/test/objectstore/test_kv.cc b/src/test/objectstore/test_kv.cc index df3805b8d1d6..b561650b3098 100644 --- a/src/test/objectstore/test_kv.cc +++ b/src/test/objectstore/test_kv.cc @@ -17,7 +17,7 @@ #include #include #include -#include "os/KeyValueDB.h" +#include "kv/KeyValueDB.h" #include "include/Context.h" #include "common/ceph_argparse.h" #include "global/global_init.h" diff --git a/src/tools/ceph_kvstore_tool.cc b/src/tools/ceph_kvstore_tool.cc index dc8888d9a767..202c5df2f652 100644 --- a/src/tools/ceph_kvstore_tool.cc +++ b/src/tools/ceph_kvstore_tool.cc @@ -25,7 +25,7 @@ #include "include/stringify.h" #include "include/utime.h" #include "common/Clock.h" -#include "os/KeyValueDB.h" +#include "kv/KeyValueDB.h" using namespace std; diff --git a/src/tools/ceph_osdomap_tool.cc b/src/tools/ceph_osdomap_tool.cc index a22e9b876e59..465ffda9195e 100644 --- a/src/tools/ceph_osdomap_tool.cc +++ b/src/tools/ceph_osdomap_tool.cc @@ -20,7 +20,7 @@ #include "global/global_init.h" #include "os/DBObjectMap.h" -#include "os/KeyValueDB.h" +#include "kv/KeyValueDB.h" namespace po = boost::program_options; using namespace std;