From 146196756c6b2f5e5a7538c514ff5fd9a4fd3d3e Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Tue, 12 Jun 2012 23:51:10 +0100 Subject: [PATCH] mon: MonitorDBStore: Add a key/value store to be used in the monitor Signed-off-by: Joao Eduardo Luis --- src/Makefile.am | 14 +- src/mon/MonitorDBStore.h | 292 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 300 insertions(+), 6 deletions(-) create mode 100644 src/mon/MonitorDBStore.h diff --git a/src/Makefile.am b/src/Makefile.am index efff334e0456b..5d4c31449daf7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -75,8 +75,8 @@ endif # monitor ceph_mon_SOURCES = ceph_mon.cc ceph_mon_LDFLAGS = $(AM_LDFLAGS) -ceph_mon_LDADD = libmon.a $(LIBGLOBAL_LDA) -ceph_mon_CXXFLAGS = ${AM_CXXFLAGS} +ceph_mon_LDADD = libmon.a $(LIBOS_LDA) $(LIBGLOBAL_LDA) +ceph_mon_CXXFLAGS = ${CRYPTO_CXXFLAGS} ${AM_CXXFLAGS} $(LEVELDB_INCLUDE) bin_PROGRAMS += ceph-mon # osd @@ -106,6 +106,7 @@ ceph_CXXFLAGS = ${AM_CXXFLAGS} ceph_conf_SOURCES = ceph_conf.cc ceph_conf_LDADD = $(LIBGLOBAL_LDA) +ceph_conf_CXXFLAGS = ${AM_CXXFLAGS} $(LEVELDB_INCLUDE) ceph_authtool_SOURCES = ceph_authtool.cc ceph_authtool_LDADD = $(LIBGLOBAL_LDA) ceph_filestore_dump_SOURCES = tools/ceph-filestore-dump.cc objclass/class_debug.cc \ @@ -129,7 +130,7 @@ rgw_dencoder_src = rgw/rgw_dencoder.cc \ rgw/rgw_acl.cc ceph_dencoder_SOURCES = test/encoding/ceph_dencoder.cc ${rgw_dencoder_src} -ceph_dencoder_CXXFLAGS = ${AM_CXXFLAGS} +ceph_dencoder_CXXFLAGS = ${AM_CXXFLAGS} $(LEVELDB_INCLUDE) ceph_dencoder_LDADD = $(LIBGLOBAL_LDA) libcls_lock_client.a libcls_rgw_client.a libosd.a libmds.a $(LIBOS_LDA) libmon.a bin_PROGRAMS += ceph-dencoder @@ -1330,8 +1331,9 @@ libmon_a_SOURCES = \ mon/LogMonitor.cc \ mon/AuthMonitor.cc \ mon/Elector.cc \ - mon/MonitorStore.cc -libmon_a_CXXFLAGS= ${AM_CXXFLAGS} + mon/MonitorStore.cc \ + os/LevelDBStore.cc +libmon_a_CXXFLAGS= ${AM_CXXFLAGS} $(LEVELDB_INCLUDE) noinst_LIBRARIES += libmon.a libmds_a_SOURCES = \ @@ -1812,7 +1814,7 @@ noinst_HEADERS = \ mon/MonClient.h\ mon/MonMap.h\ mon/Monitor.h\ - mon/MonitorStore.h\ + mon/MonitorDBStore.h\ mon/OSDMonitor.h\ mon/PGMap.h\ mon/PGMonitor.h\ diff --git a/src/mon/MonitorDBStore.h b/src/mon/MonitorDBStore.h new file mode 100644 index 0000000000000..911281bc71784 --- /dev/null +++ b/src/mon/MonitorDBStore.h @@ -0,0 +1,292 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* +* Ceph - scalable distributed file system +* +* Copyright (C) 2012 Inktank, Inc. +* +* This is free software; you can redistribute it and/or +* modify it under the terms of the GNU Lesser General Public +* License version 2.1, as published by the Free Software +* Foundation. See file COPYING. +*/ +#ifndef CEPH_MONITOR_DB_STORE_H +#define CEPH_MONITOR_DB_STORE_H + +#include "include/types.h" +#include "include/buffer.h" +#include +#include +#include +#include +#include +#include "os/KeyValueDB.h" +#include "os/LevelDBStore.h" + +#include "include/assert.h" +#include "common/Formatter.h" + +class MonitorDBStore +{ + boost::scoped_ptr db; + + public: + + struct Op { + uint8_t type; + string prefix; + string key; + bufferlist bl; + + Op() { } + Op(int t, string p, string k) + : type(t), prefix(p), key(k) { } + Op(int t, const string& p, string k, bufferlist& b) + : type(t), prefix(p), key(k), bl(b) { } + + void encode(bufferlist& encode_bl) const { + ENCODE_START(1, 1, encode_bl); + ::encode(type, encode_bl); + ::encode(prefix, encode_bl); + ::encode(key, encode_bl); + ::encode(bl, encode_bl); + ENCODE_FINISH(encode_bl); + } + + void decode(bufferlist::iterator& decode_bl) { + DECODE_START(1, decode_bl); + ::decode(type, decode_bl); + ::decode(prefix, decode_bl); + ::decode(key, decode_bl); + ::decode(bl, decode_bl); + DECODE_FINISH(decode_bl); + } + }; + + struct Transaction { + list ops; + + enum { + OP_PUT = 1, + OP_ERASE = 2, + }; + + void put(string prefix, string key, bufferlist& bl) { + ops.push_back(Op(OP_PUT, prefix, key, bl)); + } + + void put(string prefix, version_t ver, bufferlist& bl) { + ostringstream os; + os << ver; + put(prefix, os.str(), bl); + } + + void put(string prefix, string key, version_t ver) { + bufferlist bl; + ::encode(ver, bl); + put(prefix, key, bl); + } + + void erase(string prefix, string key) { + ops.push_back(Op(OP_ERASE, prefix, key)); + } + + void erase(string prefix, version_t ver) { + ostringstream os; + os << ver; + erase(prefix, os.str()); + } + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(ops, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(ops, bl); + DECODE_FINISH(bl); + } + + void append(Transaction& other) { + ops.splice(ops.end(), other.ops); + } + + void append_from_encoded(bufferlist& bl) { + Transaction other; + bufferlist::iterator it = bl.begin(); + other.decode(it); + append(other); + } + + bool empty() { + return (ops.size() == 0); + } + + void dump(ceph::Formatter *f) { + f->open_object_section("transaction"); + f->open_array_section("ops"); + list::iterator it; + int op_num = 0; + for (it = ops.begin(); it != ops.end(); ++it) { + Op& op = *it; + f->open_object_section("op"); + f->dump_int("op_num", op_num++); + switch (op.type) { + case OP_PUT: + { + f->dump_string("type", "PUT"); + f->dump_string("prefix", op.prefix); + f->dump_string("key", op.key); + ostringstream os; + op.bl.hexdump(os); + f->dump_unsigned("length", op.bl.length()); + f->dump_string("bl", os.str()); + } + break; + case OP_ERASE: + { + f->dump_string("type", "ERASE"); + f->dump_string("prefix", op.prefix); + f->dump_string("key", op.key); + } + break; + default: + { + f->dump_string("type", "unknown"); + f->dump_unsigned("op_code", op.type); + break; + } + } + f->close_section(); + } + f->close_section(); + f->close_section(); + } + }; + + int apply_transaction(MonitorDBStore::Transaction& t) { + KeyValueDB::Transaction dbt = db->get_transaction(); + + for (list::iterator it = t.ops.begin(); it != t.ops.end(); ++it) { + Op& op = *it; + switch (op.type) { + case Transaction::OP_PUT: + dbt->set(op.prefix, op.key, op.bl); + break; + case Transaction::OP_ERASE: + dbt->rmkey(op.prefix, op.key); + break; + default: + derr << __func__ << " unknown op type " << op.type << dendl; + ceph_assert(0); + break; + } + } + return db->submit_transaction_sync(dbt); + } + + int get(const string& prefix, const string& key, bufferlist& bl) { + set k; + k.insert(key); + map out; + + db->get(prefix, k, &out); + if (!out.empty()) + bl.append(out[key]); + + return 0; + } + + int get(const string& prefix, const version_t ver, bufferlist& bl) { + ostringstream os; + os << ver; + return get(prefix, os.str(), bl); + } + + version_t get(const string& prefix, const string& key) { + bufferlist bl; + get(prefix, key, bl); + if (!bl.length()) // if key does not exist, assume its value is 0 + return 0; + + version_t ver; + bufferlist::iterator p = bl.begin(); + ::decode(ver, p); + return ver; + } + + bool exists(const string& prefix, const string& key) { + KeyValueDB::Iterator it = db->get_iterator(prefix); + int err = it->lower_bound(key); + if (err < 0) + return false; + + return (it->valid() && it->key() == key); + } + + bool exists(const string& prefix, version_t ver) { + ostringstream os; + os << ver; + return exists(prefix, os.str()); + } + + string combine_strings(const string& prefix, const string& value) { + string out = prefix; + out.push_back('_'); + out.append(value); + return out; + } + + string combine_strings(const string& prefix, const version_t ver) { + ostringstream os; + os << ver; + return combine_strings(prefix, os.str()); + } + + void clear(set& prefixes) { + set::iterator iter; + KeyValueDB::Transaction dbt = db->get_transaction(); + + for (iter = prefixes.begin(); iter != prefixes.end(); ++iter) { + dbt->rmkeys_by_prefix((*iter)); + } + db->submit_transaction_sync(dbt); + } + + MonitorDBStore(const string& path) : db(0) { + + string::const_reverse_iterator rit; + int pos = 0; + for (rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) { + if (*rit != '/') + break; + } + ostringstream os; + os << path.substr(0, path.size() - pos) << "/store.db"; + + string full_path = os.str(); + + LevelDBStore *db_ptr = new LevelDBStore(full_path); + if (!db_ptr) { + cerr << __func__ << " error initializing level db back storage in " + << full_path << std::endl; + assert(0 != "MonitorDBStore: error initializing level db back storage"); + } + cout << __func__ << " initializing back storage in " + << full_path << std::endl; + assert(!db_ptr->init(cerr)); + db.reset(db_ptr); + } + MonitorDBStore(LevelDBStore *db_ptr) { + db.reset(db_ptr); + } + ~MonitorDBStore() { } + +}; + +WRITE_CLASS_ENCODER(MonitorDBStore::Op); +WRITE_CLASS_ENCODER(MonitorDBStore::Transaction); + +#endif /* CEPH_MONITOR_DB_STORE_H */ -- 2.39.5