git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
add rocksdb support
author xinxinsh <xinxin.shu@intel.com>
Tue, 18 Feb 2014 01:27:44 +0000 (09:27 +0800)
committer Sage Weil <sage@redhat.com>
Wed, 30 Jul 2014 04:45:25 +0000 (21:45 -0700)
Signed-off-by: xinxin shu <xinxin.shu@intel.com>
configure.ac
src/Makefile-env.am
src/common/config_opts.h
src/os/KeyValueDB.cc
src/os/Makefile.am
src/os/RocksDBStore.cc [new file with mode: 0644]
src/os/RocksDBStore.h [new file with mode: 0644]

index 9506b427a3599275d5913e3592ab5438ed73db6c..a77e73ae32feb21036285ef10b3e466770694805 100644 (file)
@@ -529,6 +529,31 @@ AS_IF([test "x$with_kinetic" = "xyes"],
             [AC_DEFINE([HAVE_KINETIC], [1], [Defined if you have kinetic enabled])])
 AM_CONDITIONAL(WITH_KINETIC, [ test "$with_kinetic" = "yes" ])
 
+# use rocksdb
+AC_ARG_WITH([librocksdb],
+            [AS_HELP_STRING([--with-librocksdb], [build rocksdb support])],
+            [],
+            [with_librocksdb=no])
+AS_IF([test "x$with_librocksdb" = "xyes"],
+            [PKG_CHECK_MODULES([LIBROCKSDB], [rocksdb], [], [true])])
+AS_IF([test "x$with_librocksdb" = "xyes"],
+            [AC_DEFINE([HAVE_LIBROCKSDB], [1], [Defined if you have librocksdb enabled])])
+AM_CONDITIONAL(WITH_LIBROCKSDB, [ test "$with_librocksdb" = "yes" ])
+
+# add rocksdb support
+# check libz
+AS_IF([test "x$with_librocksdb" = "xyes"],
+            [AC_CHECK_LIB([z], [gzread], [], [AC_MSG_FAILURE([libz not found])])])
+# check libbz2
+AS_IF([test "x$with_librocksdb" = "xyes"],
+            [AC_CHECK_LIB([bz2], [BZ2_bzCompressInit], [], [AC_MSG_FAILURE([libbz2 not found])])])
+# check librt
+AS_IF([test "x$with_librocksdb" = "xyes"],
+            [AC_CHECK_LIB([rt], [clock_gettime], [], [AC_MSG_FAILURE([librt not found])])])
+# use system rocksdb
+#AC_CHECK_LIB([rocksdb], [rocksdb_open], [], [AC_MSG_FAILURE([librocks not found])],[-lsnappy -lpthread -lrt -lbz2 -lz])
+#AC_CHECK_LIB([rocksdb], [open], [], [AC_MSG_FAILURE([librocks not found])],[-lsnappy -lpthread -lrt -lbz2 -lz])
+
 # use system libs3?
 AC_ARG_WITH([system-libs3],
        [AS_HELP_STRING([--with-system-libs3], [use system libs3])],
index 3bf1b6d93737054e723420b76ab76514e05355ae..f866e6fa4ee9e5ea0a2de4fe748d6495ca8e804c 100644 (file)
@@ -165,6 +165,10 @@ if WITH_LIBZFS
 LIBOS += libos_zfs.a -lzfs
 endif # WITH_LIBZFS
 
+if WITH_LIBROCKSDB
+LIBOS += libos_rocksdb.la -lrocksdb
+endif # WITH_LIBROCKSDB
+
 if WITH_TCMALLOC
 LIBPERFGLUE += -ltcmalloc
 endif # WITH_TCMALLOC
index 6c034a54a4b3093119a63fb7433020835ad037e0..7aeb0d62583686f44dfe4fdd4c3b3facafa83aa9 100644 (file)
@@ -576,6 +576,16 @@ OPTION(kinetic_user_id, OPT_INT, 1) // kinetic user to authenticate as
 OPTION(kinetic_hmac_key, OPT_STR, "asdfasdf") // kinetic key to authenticate with
 OPTION(kinetic_use_ssl, OPT_BOOL, false) // whether to secure kinetic traffic with TLS
 
+OPTION(rocksdb_compact_on_mount, OPT_BOOL, false)
+OPTION(rocksdb_write_buffer_size, OPT_U64, 0) // rocksdb write buffer size
+OPTION(rocksdb_cache_size, OPT_U64, 0) // rocksdb cache size
+OPTION(rocksdb_block_size, OPT_U64, 0) // rocksdb block size
+OPTION(rocksdb_bloom_size, OPT_INT, 0) // rocksdb bloom bits per entry
+OPTION(rocksdb_max_open_files, OPT_INT, 0) // rocksdb max open files
+OPTION(rocksdb_compression, OPT_STR, "") // rocksdb compression algorithm: none, snappy, zlib, bzip2 (empty means none)
+OPTION(rocksdb_paranoid, OPT_BOOL, false) // rocksdb paranoid flag
+OPTION(rocksdb_log, OPT_STR, "")  // path to rocksdb info log file (empty disables it)
+
 /**
  * osd_client_op_priority and osd_recovery_op_priority adjust the relative
  * priority of client io vs recovery io.
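
Editor's note: all of the options above default to 0 or an empty string, which the new RocksDBStore (added below) treats as "keep the RocksDB default": init() copies each g_conf->rocksdb_* value into its options struct, and do_open() only overrides rocksdb::Options fields that are non-zero or non-empty. A minimal sketch of that flow, not part of the commit; the helper name is illustrative:

#include <iostream>
#include <sstream>
#include "os/RocksDBStore.h"

// Illustrative helper only: shows how the rocksdb_* config options reach RocksDB.
int open_rocksdb_store(CephContext *cct, const std::string &dir)
{
  RocksDBStore store(cct, dir);
  store.init();                          // copies g_conf->rocksdb_* into store.options
  std::ostringstream err;
  int r = store.create_and_open(err);    // do_open() with create_if_missing = true
  if (r < 0)
    std::cerr << err.str() << std::endl; // rocksdb::Status text on failure
  return r;
}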
index 8e590e2a31e8b59e9ffbdd9548198d780a987e6c..e73932933329b1060dd8c438298a56f6c8825a51 100644 (file)
@@ -3,6 +3,9 @@
 
 #include "KeyValueDB.h"
 #include "LevelDBStore.h"
+#ifdef HAVE_LIBROCKSDB
+#include "RocksDBStore.h"
+#endif
 
 KeyValueDB *KeyValueDB::create(CephContext *cct, const string& type,
                               const string& dir)
@@ -14,6 +17,11 @@ KeyValueDB *KeyValueDB::create(CephContext *cct, const string& type,
   if (kv_type == KV_TYPE_KINETIC) {
     store = new KineticStore(g_ceph_context);
   }
+#endif
+#ifdef HAVE_LIBROCKSDB
+  if (type == "rocksdb") {
+    return new RocksDBStore(cct, dir);
+  }
 #endif
   return NULL;
 }
@@ -27,6 +35,11 @@ int KeyValueDB::test_init(const string& type, const string& dir)
   if (kv_type == KV_TYPE_KINETIC) {
     return 0;
   }
+#endif
+#ifdef HAVE_LIBROCKSDB
+  if (type == "rocksdb") {
+    return RocksDBStore::_test_init(dir);
+  }
 #endif
   return -EINVAL;
 }
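
Editor's note: the factory above now recognizes a third backend string. A minimal sketch of how a caller selects it, not part of the commit; the function name and path are illustrative:

#include <string>
#include "os/KeyValueDB.h"

// Illustrative only: select the new backend by name through the factory.
KeyValueDB *make_omap_store(CephContext *cct, const std::string &dir)
{
  KeyValueDB *kv = KeyValueDB::create(cct, "rocksdb", dir);
  // NULL means the type is unknown or rocksdb support was not compiled in
  return kv;
}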
index 07b0e6f0ecf2b6736afdaa469deca6377de46cbf..6f6e85c3e1096b342a271b8d64dd2b1b5f90ee49 100644 (file)
@@ -21,6 +21,7 @@ libos_la_SOURCES = \
        os/KeyValueStore.cc \
        os/ObjectStore.cc \
        os/WBThrottle.cc \
+        os/KeyValueDB.cc \
        common/TrackedOp.cc
 
 if LINUX
@@ -67,6 +68,12 @@ noinst_HEADERS += \
        os/XfsFileStoreBackend.h \
        os/ZFSFileStoreBackend.h
 
+if WITH_LIBROCKSDB
+libos_rocksdb_la_SOURCES = os/RocksDBStore.cc
+libos_rocksdb_la_CXXFLAGS = ${AM_CXXFLAGS} ${LIBROCKSDB_CFLAGS} -std=gnu++11
+noinst_LTLIBRARIES += libos_rocksdb.la
+noinst_HEADERS += os/RocksDBStore.h
+endif
 if WITH_LIBZFS
 libos_zfs_a_SOURCES = os/ZFS.cc
 libos_zfs_a_CXXFLAGS = ${AM_CXXFLAGS} ${LIBZFS_CFLAGS}
diff --git a/src/os/RocksDBStore.cc b/src/os/RocksDBStore.cc
new file mode 100644 (file)
index 0000000..5cb73e4
--- /dev/null
@@ -0,0 +1,458 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <set>
+#include <map>
+#include <string>
+#include <tr1/memory>
+#include <errno.h>
+
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/write_batch.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/filter_policy.h"
+
+using std::string;
+#include "common/perf_counters.h"
+#include "KeyValueDB.h"
+#include "RocksDBStore.h"
+
+
+int RocksDBStore::init()
+{
+  options.write_buffer_size = g_conf->rocksdb_write_buffer_size;
+  options.cache_size = g_conf->rocksdb_cache_size;
+  options.block_size = g_conf->rocksdb_block_size;
+  options.bloom_size = g_conf->rocksdb_bloom_size;
+  options.compression_type = g_conf->rocksdb_compression;
+  options.paranoid_checks = g_conf->rocksdb_paranoid;
+  options.max_open_files = g_conf->rocksdb_max_open_files;
+  options.log_file = g_conf->rocksdb_log;
+  return 0;
+}
+
+int RocksDBStore::do_open(ostream &out, bool create_if_missing)
+{
+  rocksdb::Options ldoptions;
+
+  if (options.write_buffer_size)
+    ldoptions.write_buffer_size = options.write_buffer_size;
+  if (options.max_open_files)
+    ldoptions.max_open_files = options.max_open_files;
+  if (options.cache_size) {
+    ldoptions.block_cache = rocksdb::NewLRUCache(options.cache_size);
+  }
+  if (options.block_size)
+    ldoptions.block_size = options.block_size;
+  if (options.bloom_size) {
+    const rocksdb::FilterPolicy *_filterpolicy =
+       rocksdb::NewBloomFilterPolicy(options.bloom_size);
+    ldoptions.filter_policy = _filterpolicy;
+    filterpolicy = _filterpolicy;
+  }
+  if (options.compression_type.length() == 0)
+    ldoptions.compression = rocksdb::kNoCompression;
+  else if (options.compression_type == "snappy")
+    ldoptions.compression = rocksdb::kSnappyCompression;
+  else if (options.compression_type == "zlib")
+    ldoptions.compression = rocksdb::kZlibCompression;
+  else if (options.compression_type == "bzip2")
+    ldoptions.compression = rocksdb::kBZip2Compression;
+  else
+    ldoptions.compression = rocksdb::kNoCompression;
+  if (options.block_restart_interval)
+    ldoptions.block_restart_interval = options.block_restart_interval;
+
+  ldoptions.error_if_exists = options.error_if_exists;
+  ldoptions.paranoid_checks = options.paranoid_checks;
+  ldoptions.create_if_missing = create_if_missing;
+
+  if (options.log_file.length()) {
+    rocksdb::Env *env = rocksdb::Env::Default();
+    env->NewLogger(options.log_file, &ldoptions.info_log);
+  }
+
+  //rocksdb::DB *_db;
+  rocksdb::Status status = rocksdb::DB::Open(ldoptions, path, &db);
+  if (!status.ok()) {
+    out << status.ToString() << std::endl;
+    return -EINVAL;
+  }
+  //db.reset(_db);
+
+  if (g_conf->rocksdb_compact_on_mount) {
+    derr << "Compacting rocksdb store..." << dendl;
+    compact();
+    derr << "Finished compacting rocksdb store" << dendl;
+  }
+
+
+  PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
+  plb.add_u64_counter(l_rocksdb_gets, "rocksdb_get");
+  plb.add_u64_counter(l_rocksdb_txns, "rocksdb_transaction");
+  plb.add_u64_counter(l_rocksdb_compact, "rocksdb_compact");
+  plb.add_u64_counter(l_rocksdb_compact_range, "rocksdb_compact_range");
+  plb.add_u64_counter(l_rocksdb_compact_queue_merge, "rocksdb_compact_queue_merge");
+  plb.add_u64(l_rocksdb_compact_queue_len, "rocksdb_compact_queue_len");
+  logger = plb.create_perf_counters();
+  cct->get_perfcounters_collection()->add(logger);
+  return 0;
+}
+
+int RocksDBStore::_test_init(const string& dir)
+{
+  rocksdb::Options options;
+  options.create_if_missing = true;
+  rocksdb::DB *db;
+  rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
+  delete db;
+  return status.ok() ? 0 : -EIO;
+}
+
+RocksDBStore::~RocksDBStore()
+{
+  close();
+  delete logger;
+
+  // Ensure db is destroyed before the filterpolicy it depends on
+  delete db;
+}
+
+void RocksDBStore::close()
+{
+  // stop compaction thread
+  compact_queue_lock.Lock();
+  if (compact_thread.is_started()) {
+    compact_queue_stop = true;
+    compact_queue_cond.Signal();
+    compact_queue_lock.Unlock();
+    compact_thread.join();
+  } else {
+    compact_queue_lock.Unlock();
+  }
+
+  if (logger)
+    cct->get_perfcounters_collection()->remove(logger);
+}
+
+int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
+{
+  RocksDBTransactionImpl * _t =
+    static_cast<RocksDBTransactionImpl *>(t.get());
+  rocksdb::Status s = db->Write(rocksdb::WriteOptions(), _t->bat);
+  logger->inc(l_rocksdb_txns);
+  return s.ok() ? 0 : -1;
+}
+
+int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
+{
+  RocksDBTransactionImpl * _t =
+    static_cast<RocksDBTransactionImpl *>(t.get());
+  rocksdb::WriteOptions options;
+  options.sync = true;
+  rocksdb::Status s = db->Write(options, _t->bat);
+  logger->inc(l_rocksdb_txns);
+  return s.ok() ? 0 : -1;
+}
+
+RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
+{
+  db = _db;
+  bat = new rocksdb::WriteBatch();
+}
+RocksDBStore::RocksDBTransactionImpl::~RocksDBTransactionImpl()
+{
+  delete bat;
+}
+void RocksDBStore::RocksDBTransactionImpl::set(
+  const string &prefix,
+  const string &k,
+  const bufferlist &to_set_bl)
+{
+  buffers.push_back(to_set_bl);
+  buffers.rbegin()->rebuild();
+  bufferlist &bl = *(buffers.rbegin());
+  string key = combine_strings(prefix, k);
+  keys.push_back(key);
+  bat->Delete(rocksdb::Slice(*(keys.rbegin())));
+  bat->Put(rocksdb::Slice(*(keys.rbegin())),
+         rocksdb::Slice(bl.c_str(), bl.length()));
+}
+
+void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
+                                                const string &k)
+{
+  string key = combine_strings(prefix, k);
+  keys.push_back(key);
+  bat->Delete(rocksdb::Slice(*(keys.rbegin())));
+}
+
+void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
+{
+  KeyValueDB::Iterator it = db->get_iterator(prefix);
+  for (it->seek_to_first();
+       it->valid();
+       it->next()) {
+    string key = combine_strings(prefix, it->key());
+    keys.push_back(key);
+    bat->Delete(*(keys.rbegin()));
+  }
+}
+
+int RocksDBStore::get(
+    const string &prefix,
+    const std::set<string> &keys,
+    std::map<string, bufferlist> *out)
+{
+  KeyValueDB::Iterator it = get_iterator(prefix);
+  for (std::set<string>::const_iterator i = keys.begin();
+       i != keys.end();
+       ++i) {
+    it->lower_bound(*i);
+    if (it->valid() && it->key() == *i) {
+      out->insert(make_pair(*i, it->value()));
+    } else if (!it->valid())
+      break;
+  }
+  logger->inc(l_rocksdb_gets);
+  return 0;
+}
+
+string RocksDBStore::combine_strings(const string &prefix, const string &value)
+{
+  string out = prefix;
+  out.push_back(0);
+  out.append(value);
+  return out;
+}
+
+bufferlist RocksDBStore::to_bufferlist(rocksdb::Slice in)
+{
+  bufferlist bl;
+  bl.append(bufferptr(in.data(), in.size()));
+  return bl;
+}
+
+int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
+{
+  string in_prefix = in.ToString();
+  size_t prefix_len = in_prefix.find('\0');
+  if (prefix_len >= in_prefix.size())
+    return -EINVAL;
+
+  if (prefix)
+    *prefix = string(in_prefix, 0, prefix_len);
+  if (key)
+    *key= string(in_prefix, prefix_len + 1);
+  return 0;
+}
+
+void RocksDBStore::compact()
+{
+  logger->inc(l_rocksdb_compact);
+  db->CompactRange(NULL, NULL);
+}
+
+
+void RocksDBStore::compact_thread_entry()
+{
+  compact_queue_lock.Lock();
+  while (!compact_queue_stop) {
+    while (!compact_queue.empty()) {
+      pair<string,string> range = compact_queue.front();
+      compact_queue.pop_front();
+      logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
+      compact_queue_lock.Unlock();
+      logger->inc(l_rocksdb_compact_range);
+      compact_range(range.first, range.second);
+      compact_queue_lock.Lock();
+      continue;
+    }
+    compact_queue_cond.Wait(compact_queue_lock);
+  }
+  compact_queue_lock.Unlock();
+}
+
+void RocksDBStore::compact_range_async(const string& start, const string& end)
+{
+  Mutex::Locker l(compact_queue_lock);
+
+  // try to merge adjacent ranges.  this is O(n), but the queue should
+  // be short.  note that we do not cover all overlap cases and merge
+  // opportunities here, but we capture the ones we currently need.
+  list< pair<string,string> >::iterator p = compact_queue.begin();
+  while (p != compact_queue.end()) {
+    if (p->first == start && p->second == end) {
+      // dup; no-op
+      return;
+    }
+    if (p->first <= end && p->first > start) {
+      // merge with existing range to the right
+      compact_queue.push_back(make_pair(start, p->second));
+      compact_queue.erase(p);
+      logger->inc(l_rocksdb_compact_queue_merge);
+      break;
+    }
+    if (p->second >= start && p->second < end) {
+      // merge with existing range to the left
+      compact_queue.push_back(make_pair(p->first, end));
+      compact_queue.erase(p);
+      logger->inc(l_rocksdb_compact_queue_merge);
+      break;
+    }
+    ++p;
+  }
+  if (p == compact_queue.end()) {
+    // no merge, new entry.
+    compact_queue.push_back(make_pair(start, end));
+    logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
+  }
+  compact_queue_cond.Signal();
+  if (!compact_thread.is_started()) {
+    compact_thread.create();
+  }
+}
+bool RocksDBStore::check_omap_dir(string &omap_dir)
+{
+  rocksdb::Options options;
+  options.create_if_missing = true;
+  rocksdb::DB *db;
+  rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
+  delete db;
+  return status.ok();
+}
+void RocksDBStore::compact_range(const string& start, const string& end)
+{
+    rocksdb::Slice cstart(start);
+    rocksdb::Slice cend(end);
+    db->CompactRange(&cstart, &cend);
+}
+RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
+{
+  delete dbiter;
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
+{
+  dbiter->SeekToFirst();
+  return dbiter->status().ok() ? 0 : -1;
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
+{
+  rocksdb::Slice slice_prefix(prefix);
+  dbiter->Seek(slice_prefix);
+  return dbiter->status().ok() ? 0 : -1;
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
+{
+  dbiter->SeekToLast();
+  return dbiter->status().ok() ? 0 : -1;
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
+{
+  string limit = past_prefix(prefix);
+  rocksdb::Slice slice_limit(limit);
+  dbiter->Seek(slice_limit);
+
+  if (!dbiter->Valid()) {
+    dbiter->SeekToLast();
+  } else {
+    dbiter->Prev();
+  }
+  return dbiter->status().ok() ? 0 : -1;
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
+{
+  lower_bound(prefix, after);
+  if (valid()) {
+    pair<string,string> key = raw_key();
+    if (key.first == prefix && key.second == after)
+      next();
+  }
+  return dbiter->status().ok() ? 0 : -1;
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
+{
+  string bound = combine_strings(prefix, to);
+  rocksdb::Slice slice_bound(bound);
+  dbiter->Seek(slice_bound);
+  return dbiter->status().ok() ? 0 : -1;
+}
+bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
+{
+  return dbiter->Valid();
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
+{
+  if (valid())
+    dbiter->Next();
+  return dbiter->status().ok() ? 0 : -1;
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
+{
+  if (valid())
+    dbiter->Prev();
+  return dbiter->status().ok() ? 0 : -1;
+}
+string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
+{
+  string out_key;
+  split_key(dbiter->key(), 0, &out_key);
+  return out_key;
+}
+pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
+{
+  string prefix, key;
+  split_key(dbiter->key(), &prefix, &key);
+  return make_pair(prefix, key);
+}
+bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
+{
+  return to_bufferlist(dbiter->value());
+}
+int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
+{
+  return dbiter->status().ok() ? 0 : -1;
+}
+
+bool RocksDBStore::in_prefix(const string &prefix, rocksdb::Slice key)
+{
+  return (key.compare(rocksdb::Slice(past_prefix(prefix))) < 0) &&
+    (key.compare(rocksdb::Slice(prefix)) > 0);
+}
+string RocksDBStore::past_prefix(const string &prefix)
+{
+  string limit = prefix;
+  limit.push_back(1);
+  return limit;
+}
+
+
+RocksDBStore::WholeSpaceIterator RocksDBStore::_get_iterator()
+{
+  return std::tr1::shared_ptr<KeyValueDB::WholeSpaceIteratorImpl>(
+    new RocksDBWholeSpaceIteratorImpl(
+      db->NewIterator(rocksdb::ReadOptions())
+    )
+  );
+}
+
+RocksDBStore::WholeSpaceIterator RocksDBStore::_get_snapshot_iterator()
+{
+  const rocksdb::Snapshot *snapshot;
+  rocksdb::ReadOptions options;
+
+  snapshot = db->GetSnapshot();
+  options.snapshot = snapshot;
+
+  return std::tr1::shared_ptr<KeyValueDB::WholeSpaceIteratorImpl>(
+    new RocksDBSnapshotIteratorImpl(db, snapshot,
+      db->NewIterator(options))
+  );
+}
+
+RocksDBStore::RocksDBSnapshotIteratorImpl::~RocksDBSnapshotIteratorImpl()
+{
+  db->ReleaseSnapshot(snapshot);
+}
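
Editor's note: the store implements the generic KeyValueDB transaction and get interfaces used above. A minimal round-trip sketch, not part of the commit; it assumes a RocksDBStore that has already been opened (e.g. via create_and_open()):

#include <map>
#include <set>
#include <string>
#include "os/RocksDBStore.h"

// Illustrative only: write one key through a transaction, then read it back.
void example_roundtrip(RocksDBStore &store)
{
  bufferlist bl;
  bl.append("bar", 3);

  KeyValueDB::Transaction t = store.get_transaction();
  t->set("prefix", "foo", bl);        // stored under "prefix\0foo" (combine_strings)
  store.submit_transaction_sync(t);   // synchronous rocksdb::DB::Write

  std::set<std::string> keys;
  keys.insert("foo");
  std::map<std::string, bufferlist> out;
  store.get("prefix", keys, &out);    // out["foo"] now holds "bar"
}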
diff --git a/src/os/RocksDBStore.h b/src/os/RocksDBStore.h
new file mode 100644 (file)
index 0000000..103f955
--- /dev/null
@@ -0,0 +1,310 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef ROCKS_DB_STORE_H
+#define ROCKS_DB_STORE_H
+
+#include "include/types.h"
+#include "include/buffer.h"
+#include "KeyValueDB.h"
+#include <set>
+#include <map>
+#include <string>
+#include <tr1/memory>
+#include <boost/scoped_ptr.hpp>
+
+#include <errno.h>
+#include "common/errno.h"
+#include "common/dout.h"
+#include "include/assert.h"
+#include "common/Formatter.h"
+
+#include "common/ceph_context.h"
+
+class PerfCounters;
+
+enum {
+  l_rocksdb_first = 34300,
+  l_rocksdb_gets,
+  l_rocksdb_txns,
+  l_rocksdb_compact,
+  l_rocksdb_compact_range,
+  l_rocksdb_compact_queue_merge,
+  l_rocksdb_compact_queue_len,
+  l_rocksdb_last,
+};
+
+namespace rocksdb {
+  class DB;
+  class Cache;
+  class FilterPolicy;
+  class Snapshot;
+  class Slice;
+  class WriteBatch;
+  class Iterator;
+}
+
+/**
+ * Uses RocksDB to implement the KeyValueDB interface
+ */
+class RocksDBStore : public KeyValueDB {
+  CephContext *cct;
+  PerfCounters *logger;
+  string path;
+  const rocksdb::FilterPolicy *filterpolicy;
+  rocksdb::DB *db;
+
+  int do_open(ostream &out, bool create_if_missing);
+
+  // manage async compactions
+  Mutex compact_queue_lock;
+  Cond compact_queue_cond;
+  list< pair<string,string> > compact_queue;
+  bool compact_queue_stop;
+  class CompactThread : public Thread {
+    RocksDBStore *db;
+  public:
+    CompactThread(RocksDBStore *d) : db(d) {}
+    void *entry() {
+      db->compact_thread_entry();
+      return NULL;
+    }
+    friend class RocksDBStore;
+  } compact_thread;
+
+  void compact_thread_entry();
+
+  void compact_range(const string& start, const string& end);
+  void compact_range_async(const string& start, const string& end);
+
+public:
+  /// compact the underlying rocksdb store
+  void compact();
+
+  static int _test_init(const string& dir);
+  int init();
+  /// compact rocksdb for all keys with a given prefix
+  void compact_prefix(const string& prefix) {
+    compact_range(prefix, past_prefix(prefix));
+  }
+  void compact_prefix_async(const string& prefix) {
+    compact_range_async(prefix, past_prefix(prefix));
+  }
+
+  void compact_range(const string& prefix, const string& start, const string& end) {
+    compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
+  }
+  void compact_range_async(const string& prefix, const string& start, const string& end) {
+    compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end));
+  }
+
+  /**
+   * options_t: Holds options which are minimally interpreted
+   * on initialization and then passed through to RocksDB.
+   * We transform a couple of these into actual RocksDB
+   * structures, but the rest are simply passed through unchanged. See
+   * rocksdb/options.h for more precise details on each.
+   *
+   * Set them after constructing the RocksDBStore, but before calling
+   * open() or create_and_open().
+   */
+  struct options_t {
+    uint64_t write_buffer_size; /// in-memory write buffer size
+    int max_open_files; /// maximum number of files RocksDB can open at once
+    uint64_t cache_size; /// size of extra decompressed cache to use
+    uint64_t block_size; /// user data per block
+    int bloom_size; /// number of bits per entry to put in a bloom filter
+    string compression_type; /// compression algorithm: none, snappy, zlib, or bzip2
+
+    // don't change these ones. No, seriously
+    int block_restart_interval;
+    bool error_if_exists;
+    bool paranoid_checks;
+
+    string log_file;
+
+    options_t() :
+      write_buffer_size(0), //< 0 means default
+      max_open_files(0), //< 0 means default
+      cache_size(0), //< 0 means no cache (default)
+      block_size(0), //< 0 means default
+      bloom_size(0), //< 0 means no bloom filter (default)
+      compression_type("none"), //< set to false for no compression
+      block_restart_interval(0), //< 0 means default
+      error_if_exists(false), //< set to true if you want to check nonexistence
+      paranoid_checks(false) //< set to true if you want paranoid checks
+    {}
+  } options;
+
+  RocksDBStore(CephContext *c, const string &path) :
+    cct(c),
+    logger(NULL),
+    path(path),
+    compact_queue_lock("RocksDBStore::compact_thread_lock"),
+    compact_queue_stop(false),
+    compact_thread(this),
+    options()
+  {}
+
+  ~RocksDBStore();
+
+  static bool check_omap_dir(string &omap_dir);
+  /// Opens underlying db
+  int open(ostream &out) {
+    return do_open(out, false);
+  }
+  /// Creates underlying db if missing and opens it
+  int create_and_open(ostream &out) {
+    return do_open(out, true);
+  }
+
+  void close();
+
+  class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl {
+  public:
+    rocksdb::WriteBatch *bat;
+    list<bufferlist> buffers;
+    list<string> keys;
+    RocksDBStore *db;
+
+    RocksDBTransactionImpl(RocksDBStore *_db);
+    ~RocksDBTransactionImpl();
+    void set(
+      const string &prefix,
+      const string &k,
+      const bufferlist &bl);
+    void rmkey(
+      const string &prefix,
+      const string &k);
+    void rmkeys_by_prefix(
+      const string &prefix
+      );
+  };
+
+  KeyValueDB::Transaction get_transaction() {
+    return std::tr1::shared_ptr< RocksDBTransactionImpl >(
+      new RocksDBTransactionImpl(this));
+  }
+
+  int submit_transaction(KeyValueDB::Transaction t);
+  int submit_transaction_sync(KeyValueDB::Transaction t);
+  int get(
+    const string &prefix,
+    const std::set<string> &key,
+    std::map<string, bufferlist> *out
+    );
+
+  class RocksDBWholeSpaceIteratorImpl :
+    public KeyValueDB::WholeSpaceIteratorImpl {
+  protected:
+    rocksdb::Iterator *dbiter;
+  public:
+    RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) :
+      dbiter(iter) { }
+    //virtual ~RocksDBWholeSpaceIteratorImpl() { }
+    ~RocksDBWholeSpaceIteratorImpl();
+
+    int seek_to_first();
+    int seek_to_first(const string &prefix);
+    int seek_to_last();
+    int seek_to_last(const string &prefix);
+    int upper_bound(const string &prefix, const string &after);
+    int lower_bound(const string &prefix, const string &to);
+    bool valid();
+    int next();
+    int prev();
+    string key();
+    pair<string,string> raw_key();
+    bufferlist value();
+    int status();
+  };
+
+  class RocksDBSnapshotIteratorImpl : public RocksDBWholeSpaceIteratorImpl {
+    rocksdb::DB *db;
+    const rocksdb::Snapshot *snapshot;
+  public:
+    RocksDBSnapshotIteratorImpl(rocksdb::DB *db, const rocksdb::Snapshot *s,
+                               rocksdb::Iterator *iter) :
+      RocksDBWholeSpaceIteratorImpl(iter), db(db), snapshot(s) { }
+
+    ~RocksDBSnapshotIteratorImpl();
+  };
+
+  /// Utility
+  static string combine_strings(const string &prefix, const string &value);
+  static int split_key(rocksdb::Slice in, string *prefix, string *key);
+  static bufferlist to_bufferlist(rocksdb::Slice in);
+  static bool in_prefix(const string &prefix, rocksdb::Slice key);
+  static string past_prefix(const string &prefix);
+
+  virtual uint64_t get_estimated_size(map<string,uint64_t> &extra) {
+    DIR *store_dir = opendir(path.c_str());
+    if (!store_dir) {
+      lderr(cct) << __func__ << " something happened opening the store: "
+                 << cpp_strerror(errno) << dendl;
+      return 0;
+    }
+
+    uint64_t total_size = 0;
+    uint64_t sst_size = 0;
+    uint64_t log_size = 0;
+    uint64_t misc_size = 0;
+
+    struct dirent *entry = NULL;
+    while ((entry = readdir(store_dir)) != NULL) {
+      string n(entry->d_name);
+
+      if (n == "." || n == "..")
+        continue;
+
+      string fpath = path + '/' + n;
+      struct stat s;
+      int err = stat(fpath.c_str(), &s);
+      if (err < 0)
+       err = -errno;
+      // we may race against rocksdb while reading files; this should only
+      // happen when those files are being updated, data is being shuffled
+      // and files get removed, in which case there's not much of a problem
+      // as we'll get to them next time around.
+      if ((err < 0) && (err != -ENOENT)) {
+        lderr(cct) << __func__ << " error obtaining stats for " << fpath
+                   << ": " << cpp_strerror(err) << dendl;
+        goto err;
+      }
+
+      size_t pos = n.find_last_of('.');
+      if (pos == string::npos) {
+        misc_size += s.st_size;
+        continue;
+      }
+
+      string ext = n.substr(pos+1);
+      if (ext == "sst") {
+        sst_size += s.st_size;
+      } else if (ext == "log") {
+        log_size += s.st_size;
+      } else {
+        misc_size += s.st_size;
+      }
+    }
+
+    total_size = sst_size + log_size + misc_size;
+
+    extra["sst"] = sst_size;
+    extra["log"] = log_size;
+    extra["misc"] = misc_size;
+    extra["total"] = total_size;
+
+err:
+    closedir(store_dir);
+    return total_size;
+  }
+
+
+protected:
+  WholeSpaceIterator _get_iterator();
+
+  WholeSpaceIterator _get_snapshot_iterator();
+
+};
+
+#endif
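
Editor's note: a short sketch of the key encoding used throughout the store, not part of the commit. combine_strings() joins prefix and key with a NUL byte, past_prefix() appends '\x01' (the first string greater than every "prefix\0..." key), so a whole prefix is the range [prefix, past_prefix(prefix)) that compact_prefix() above compacts, and split_key() recovers the (prefix, key) pair at the first NUL:

#include <cassert>
#include <string>
#include "rocksdb/slice.h"
#include "os/RocksDBStore.h"

// Illustrative only: exercises the static key-encoding helpers.
void key_encoding_example()
{
  std::string k = RocksDBStore::combine_strings("p", "k");    // "p\0k" (3 bytes)
  assert(RocksDBStore::in_prefix("p", rocksdb::Slice(k)));    // "p" < "p\0k" < "p\x01"

  std::string prefix, key;
  RocksDBStore::split_key(rocksdb::Slice(k), &prefix, &key);
  assert(prefix == "p" && key == "k");
}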