]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: MonitorDBStore: Add a key/value store to be used in the monitor
authorJoao Eduardo Luis <joao.luis@inktank.com>
Tue, 12 Jun 2012 22:51:10 +0000 (23:51 +0100)
committerJoao Eduardo Luis <joao.luis@inktank.com>
Wed, 20 Feb 2013 20:47:41 +0000 (20:47 +0000)
Signed-off-by: Joao Eduardo Luis <joao.luis@inktank.com>
src/Makefile.am
src/mon/MonitorDBStore.h [new file with mode: 0644]

index efff334e0456b3c172e916a7f4ffd2dcd3db5126..5d4c31449daf7c3fa44e0dc032710a6dc5a76fe4 100644 (file)
@@ -75,8 +75,8 @@ endif
 # monitor
 ceph_mon_SOURCES = ceph_mon.cc
 ceph_mon_LDFLAGS = $(AM_LDFLAGS)
-ceph_mon_LDADD = libmon.a $(LIBGLOBAL_LDA)
-ceph_mon_CXXFLAGS = ${AM_CXXFLAGS}
+ceph_mon_LDADD = libmon.a $(LIBOS_LDA) $(LIBGLOBAL_LDA)
+ceph_mon_CXXFLAGS = ${CRYPTO_CXXFLAGS} ${AM_CXXFLAGS} $(LEVELDB_INCLUDE)
 bin_PROGRAMS += ceph-mon
 
 # osd
@@ -106,6 +106,7 @@ ceph_CXXFLAGS = ${AM_CXXFLAGS}
 
 ceph_conf_SOURCES = ceph_conf.cc
 ceph_conf_LDADD = $(LIBGLOBAL_LDA)
+ceph_conf_CXXFLAGS = ${AM_CXXFLAGS} $(LEVELDB_INCLUDE)
 ceph_authtool_SOURCES = ceph_authtool.cc
 ceph_authtool_LDADD = $(LIBGLOBAL_LDA)
 ceph_filestore_dump_SOURCES = tools/ceph-filestore-dump.cc objclass/class_debug.cc \
@@ -129,7 +130,7 @@ rgw_dencoder_src = rgw/rgw_dencoder.cc \
                    rgw/rgw_acl.cc
 
 ceph_dencoder_SOURCES = test/encoding/ceph_dencoder.cc ${rgw_dencoder_src}
-ceph_dencoder_CXXFLAGS = ${AM_CXXFLAGS}
+ceph_dencoder_CXXFLAGS = ${AM_CXXFLAGS} $(LEVELDB_INCLUDE)
 ceph_dencoder_LDADD = $(LIBGLOBAL_LDA) libcls_lock_client.a libcls_rgw_client.a libosd.a libmds.a $(LIBOS_LDA) libmon.a
 bin_PROGRAMS += ceph-dencoder
 
@@ -1330,8 +1331,9 @@ libmon_a_SOURCES = \
        mon/LogMonitor.cc \
        mon/AuthMonitor.cc \
        mon/Elector.cc \
-       mon/MonitorStore.cc
-libmon_a_CXXFLAGS= ${AM_CXXFLAGS}
+       mon/MonitorStore.cc \
+       os/LevelDBStore.cc
+libmon_a_CXXFLAGS= ${AM_CXXFLAGS} $(LEVELDB_INCLUDE)
 noinst_LIBRARIES += libmon.a
 
 libmds_a_SOURCES = \
@@ -1812,7 +1814,7 @@ noinst_HEADERS = \
         mon/MonClient.h\
         mon/MonMap.h\
         mon/Monitor.h\
-        mon/MonitorStore.h\
+        mon/MonitorDBStore.h\
         mon/OSDMonitor.h\
         mon/PGMap.h\
         mon/PGMonitor.h\
diff --git a/src/mon/MonitorDBStore.h b/src/mon/MonitorDBStore.h
new file mode 100644 (file)
index 0000000..911281b
--- /dev/null
@@ -0,0 +1,292 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+* Ceph - scalable distributed file system
+*
+* Copyright (C) 2012 Inktank, Inc.
+*
+* This is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License version 2.1, as published by the Free Software
+* Foundation. See file COPYING.
+*/
+#ifndef CEPH_MONITOR_DB_STORE_H
+#define CEPH_MONITOR_DB_STORE_H
+
+#include "include/types.h"
+#include "include/buffer.h"
+#include <set>
+#include <map>
+#include <string>
+#include <boost/scoped_ptr.hpp>
+#include <sstream>
+#include "os/KeyValueDB.h"
+#include "os/LevelDBStore.h"
+
+#include "include/assert.h"
+#include "common/Formatter.h"
+
+class MonitorDBStore
+{
+  boost::scoped_ptr<LevelDBStore> db;
+
+ public:
+
+  struct Op {
+    uint8_t type;
+    string prefix;
+    string key;
+    bufferlist bl;
+
+    Op() { }
+    Op(int t, string p, string k)
+      : type(t), prefix(p), key(k) { }
+    Op(int t, const string& p, string k, bufferlist& b)
+      : type(t), prefix(p), key(k), bl(b) { }
+
+    void encode(bufferlist& encode_bl) const {
+      ENCODE_START(1, 1, encode_bl);
+      ::encode(type, encode_bl);
+      ::encode(prefix, encode_bl);
+      ::encode(key, encode_bl);
+      ::encode(bl, encode_bl);
+      ENCODE_FINISH(encode_bl);
+    }
+
+    void decode(bufferlist::iterator& decode_bl) {
+      DECODE_START(1, decode_bl);
+      ::decode(type, decode_bl);
+      ::decode(prefix, decode_bl);
+      ::decode(key, decode_bl);
+      ::decode(bl, decode_bl);
+      DECODE_FINISH(decode_bl);
+    }
+  };
+
+  struct Transaction {
+    list<Op> ops;
+
+    enum {
+      OP_PUT   = 1,
+      OP_ERASE = 2,
+    };
+
+    void put(string prefix, string key, bufferlist& bl) {
+      ops.push_back(Op(OP_PUT, prefix, key, bl));
+    }
+
+    void put(string prefix, version_t ver, bufferlist& bl) {
+      ostringstream os;
+      os << ver;
+      put(prefix, os.str(), bl);
+    }
+
+    void put(string prefix, string key, version_t ver) {
+      bufferlist bl;
+      ::encode(ver, bl);
+      put(prefix, key, bl);
+    }
+
+    void erase(string prefix, string key) {
+      ops.push_back(Op(OP_ERASE, prefix, key));
+    }
+
+    void erase(string prefix, version_t ver) {
+      ostringstream os;
+      os << ver;
+      erase(prefix, os.str());
+    }
+
+    void encode(bufferlist& bl) const {
+      ENCODE_START(1, 1, bl);
+      ::encode(ops, bl);
+      ENCODE_FINISH(bl);
+    }
+
+    void decode(bufferlist::iterator& bl) {
+      DECODE_START(1, bl);
+      ::decode(ops, bl);
+      DECODE_FINISH(bl);
+    }
+
+    void append(Transaction& other) {
+      ops.splice(ops.end(), other.ops);
+    }
+
+    void append_from_encoded(bufferlist& bl) {
+      Transaction other;
+      bufferlist::iterator it = bl.begin();
+      other.decode(it);
+      append(other);
+    }
+
+    bool empty() {
+      return (ops.size() == 0);
+    }
+
+    void dump(ceph::Formatter *f) {
+      f->open_object_section("transaction");
+      f->open_array_section("ops");
+      list<Op>::iterator it;
+      int op_num = 0;
+      for (it = ops.begin(); it != ops.end(); ++it) {
+       Op& op = *it;
+       f->open_object_section("op");
+       f->dump_int("op_num", op_num++);
+       switch (op.type) {
+       case OP_PUT:
+         {
+           f->dump_string("type", "PUT");
+           f->dump_string("prefix", op.prefix);
+           f->dump_string("key", op.key);
+           ostringstream os;
+           op.bl.hexdump(os);
+           f->dump_unsigned("length", op.bl.length());
+           f->dump_string("bl", os.str());
+         }
+         break;
+       case OP_ERASE:
+         {
+           f->dump_string("type", "ERASE");
+           f->dump_string("prefix", op.prefix);
+           f->dump_string("key", op.key);
+         }
+         break;
+       default:
+         {
+           f->dump_string("type", "unknown");
+           f->dump_unsigned("op_code", op.type);
+           break;
+         }
+       }
+       f->close_section();
+      }
+      f->close_section();
+      f->close_section();
+    }
+  };
+
+  int apply_transaction(MonitorDBStore::Transaction& t) {
+    KeyValueDB::Transaction dbt = db->get_transaction();
+
+    for (list<Op>::iterator it = t.ops.begin(); it != t.ops.end(); ++it) {
+      Op& op = *it;
+      switch (op.type) {
+      case Transaction::OP_PUT:
+       dbt->set(op.prefix, op.key, op.bl);
+       break;
+      case Transaction::OP_ERASE:
+       dbt->rmkey(op.prefix, op.key);
+       break;
+      default:
+       derr << __func__ << " unknown op type " << op.type << dendl;
+       ceph_assert(0);
+       break;
+      }
+    }
+    return db->submit_transaction_sync(dbt);
+  }
+
+  int get(const string& prefix, const string& key, bufferlist& bl) {
+    set<string> k;
+    k.insert(key);
+    map<string,bufferlist> out;
+
+    db->get(prefix, k, &out);
+    if (!out.empty())
+      bl.append(out[key]);
+
+    return 0;
+  }
+
+  int get(const string& prefix, const version_t ver, bufferlist& bl) {
+    ostringstream os;
+    os << ver;
+    return get(prefix, os.str(), bl);
+  }
+
+  version_t get(const string& prefix, const string& key) {
+    bufferlist bl;
+    get(prefix, key, bl);
+    if (!bl.length()) // if key does not exist, assume its value is 0
+      return 0;
+
+    version_t ver;
+    bufferlist::iterator p = bl.begin();
+    ::decode(ver, p);
+    return ver;
+  }
+
+  bool exists(const string& prefix, const string& key) {
+    KeyValueDB::Iterator it = db->get_iterator(prefix);
+    int err = it->lower_bound(key);
+    if (err < 0)
+      return false;
+
+    return (it->valid() && it->key() == key);
+  }
+
+  bool exists(const string& prefix, version_t ver) {
+    ostringstream os;
+    os << ver;
+    return exists(prefix, os.str());
+  }
+
+  string combine_strings(const string& prefix, const string& value) {
+    string out = prefix;
+    out.push_back('_');
+    out.append(value);
+    return out;
+  }
+
+  string combine_strings(const string& prefix, const version_t ver) {
+    ostringstream os;
+    os << ver;
+    return combine_strings(prefix, os.str());
+  }
+
+  void clear(set<string>& prefixes) {
+    set<string>::iterator iter;
+    KeyValueDB::Transaction dbt = db->get_transaction();
+
+    for (iter = prefixes.begin(); iter != prefixes.end(); ++iter) {
+      dbt->rmkeys_by_prefix((*iter));
+    }
+    db->submit_transaction_sync(dbt);
+  }
+
+  MonitorDBStore(const string& path) : db(0) {
+
+    string::const_reverse_iterator rit;
+    int pos = 0;
+    for (rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) {
+      if (*rit != '/')
+       break;
+    }
+    ostringstream os;
+    os << path.substr(0, path.size() - pos) << "/store.db";
+
+    string full_path = os.str();
+
+    LevelDBStore *db_ptr = new LevelDBStore(full_path);
+    if (!db_ptr) {
+      cerr << __func__ << " error initializing level db back storage in "
+          << full_path << std::endl;
+      assert(0 != "MonitorDBStore: error initializing level db back storage");
+    }
+    cout << __func__ << " initializing back storage in "
+        << full_path << std::endl;
+    assert(!db_ptr->init(cerr));
+    db.reset(db_ptr);
+  }
+  MonitorDBStore(LevelDBStore *db_ptr) {
+    db.reset(db_ptr);
+  }
+  ~MonitorDBStore() { }
+
+};
+
+WRITE_CLASS_ENCODER(MonitorDBStore::Op);
+WRITE_CLASS_ENCODER(MonitorDBStore::Transaction);
+
+#endif /* CEPH_MONITOR_DB_STORE_H */