#include <tr1/memory>
#include <errno.h>
using std::string;
+#include "common/perf_counters.h"
int LevelDBStore::init(ostream &out, bool create_if_missing)
{
if (!status.ok()) {
out << status.ToString() << std::endl;
return -EINVAL;
- } else
- return 0;
+ }
+
+ PerfCountersBuilder plb(g_ceph_context, "leveldb", l_leveldb_first, l_leveldb_last);
+ plb.add_u64_counter(l_leveldb_gets, "leveldb_get");
+ plb.add_u64_counter(l_leveldb_txns, "leveldb_transaction");
+ plb.add_u64_counter(l_leveldb_compact, "leveldb_compact");
+ plb.add_u64_counter(l_leveldb_compact_range, "leveldb_compact_range");
+ plb.add_u64_counter(l_leveldb_compact_queue_merge, "leveldb_compact_queue_merge");
+ plb.add_u64(l_leveldb_compact_queue_len, "leveldb_compact_queue_len");
+ logger = plb.create_perf_counters();
+ cct->get_perfcounters_collection()->add(logger);
+ return 0;
+}
+
+LevelDBStore::~LevelDBStore()
+{
+ close();
+ delete logger;
+}
+
+void LevelDBStore::close()
+{
+ // stop compaction thread
+ compact_queue_lock.Lock();
+ if (compact_thread.is_started()) {
+ compact_queue_stop = true;
+ compact_queue_cond.Signal();
+ compact_queue_lock.Unlock();
+ compact_thread.join();
+ } else {
+ compact_queue_lock.Unlock();
+ }
+
+ cct->get_perfcounters_collection()->remove(logger);
+}
+
+int LevelDBStore::submit_transaction(KeyValueDB::Transaction t)
+{
+ LevelDBTransactionImpl * _t =
+ static_cast<LevelDBTransactionImpl *>(t.get());
+ leveldb::Status s = db->Write(leveldb::WriteOptions(), &(_t->bat));
+ logger->inc(l_leveldb_txns);
+ return s.ok() ? 0 : -1;
+}
+
+int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
+{
+ LevelDBTransactionImpl * _t =
+ static_cast<LevelDBTransactionImpl *>(t.get());
+ leveldb::WriteOptions options;
+ options.sync = true;
+ leveldb::Status s = db->Write(options, &(_t->bat));
+ logger->inc(l_leveldb_txns);
+ return s.ok() ? 0 : -1;
}
void LevelDBStore::LevelDBTransactionImpl::set(
} else if (!it->valid())
break;
}
+ logger->inc(l_leveldb_gets);
return 0;
}
return 0;
}
+void LevelDBStore::compact()
+{
+ logger->inc(l_leveldb_compact);
+ db->CompactRange(NULL, NULL);
+}
+
+
void LevelDBStore::compact_thread_entry()
{
compact_queue_lock.Lock();
while (!compact_queue.empty()) {
pair<string,string> range = compact_queue.front();
compact_queue.pop_front();
+ logger->set(l_leveldb_compact_queue_len, compact_queue.size());
compact_queue_lock.Unlock();
+ logger->inc(l_leveldb_compact_range);
compact_range(range.first, range.second);
compact_queue_lock.Lock();
continue;
// merge with existing range to the right
compact_queue.push_back(make_pair(start, p->second));
compact_queue.erase(p);
+ logger->inc(l_leveldb_compact_queue_merge);
break;
} else if (p->second >= start && p->second < end) {
// merge with existing range to the left
compact_queue.push_back(make_pair(p->first, end));
compact_queue.erase(p);
+ logger->inc(l_leveldb_compact_queue_merge);
break;
} else {
++p;
if (p == compact_queue.end()) {
// no merge, new entry.
compact_queue.push_back(make_pair(start, end));
+ logger->set(l_leveldb_compact_queue_len, compact_queue.size());
}
compact_queue_cond.Signal();
if (!compact_thread.is_started()) {
#include "leveldb/filter_policy.h"
#endif
+#include "common/ceph_context.h"
+
+class PerfCounters;
+
+enum {
+ l_leveldb_first = 34300,
+ l_leveldb_gets,
+ l_leveldb_txns,
+ l_leveldb_compact,
+ l_leveldb_compact_range,
+ l_leveldb_compact_queue_merge,
+ l_leveldb_compact_queue_len,
+ l_leveldb_last,
+};
+
/**
* Uses LevelDB to implement the KeyValueDB interface
*/
class LevelDBStore : public KeyValueDB {
+ CephContext *cct;
+ PerfCounters *logger;
string path;
boost::scoped_ptr<leveldb::DB> db;
boost::scoped_ptr<leveldb::Cache> db_cache;
public:
/// compact the underlying leveldb store
- void compact() {
- db->CompactRange(NULL, NULL);
- }
+ void compact();
/// compact leveldb for all keys with a given prefix
void compact_prefix(const string& prefix) {
{}
} options;
- LevelDBStore(const string &path) :
+ LevelDBStore(CephContext *c, const string &path) :
+ cct(c),
+ logger(NULL),
path(path),
db_cache(NULL),
#ifdef HAVE_LEVELDB_FILTER_POLICY
options()
{}
- ~LevelDBStore() {
- compact_queue_lock.Lock();
- if (compact_thread.is_started()) {
- compact_queue_stop = true;
- compact_queue_cond.Signal();
- compact_queue_lock.Unlock();
- compact_thread.join();
- } else {
- compact_queue_lock.Unlock();
- }
- }
+ ~LevelDBStore();
/// Opens underlying db
int open(ostream &out) {
return init(out, true);
}
+ void close();
+
class LevelDBTransactionImpl : public KeyValueDB::TransactionImpl {
public:
leveldb::WriteBatch bat;
new LevelDBTransactionImpl(this));
}
- int submit_transaction(KeyValueDB::Transaction t) {
- LevelDBTransactionImpl * _t =
- static_cast<LevelDBTransactionImpl *>(t.get());
- leveldb::Status s = db->Write(leveldb::WriteOptions(), &(_t->bat));
- return s.ok() ? 0 : -1;
- }
-
- int submit_transaction_sync(KeyValueDB::Transaction t) {
- LevelDBTransactionImpl * _t =
- static_cast<LevelDBTransactionImpl *>(t.get());
- leveldb::WriteOptions options;
- options.sync = true;
- leveldb::Status s = db->Write(options, &(_t->bat));
- return s.ok() ? 0 : -1;
- }
-
+ int submit_transaction(KeyValueDB::Transaction t);
+ int submit_transaction_sync(KeyValueDB::Transaction t);
int get(
const string &prefix,
const std::set<string> &key,