]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os: add prototype KineticStore 2091/head
authorJosh Durgin <josh.durgin@inktank.com>
Thu, 29 May 2014 19:23:30 +0000 (12:23 -0700)
committerJosh Durgin <josh.durgin@inktank.com>
Thu, 10 Jul 2014 20:23:35 +0000 (13:23 -0700)
Implement the KeyValueDB interface using libkinetic_client,
and allow it to be configured as the backend for the KeyValueStore,
running the entire OSD on it.

This prototype implementation has no transaction safety, and is
only suitable as a proof of concept. Since the libkinetic_client
API does not provide reverse iteration over keys without also reading
the value off disk, it implements iterators in a very slow but correct way.
These are used heavily by the KeyValueDB callers, so this is a bottleneck
in performance.

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
configure.ac
src/common/config_opts.h
src/os/KeyValueStore.cc
src/os/KeyValueStore.h
src/os/KineticStore.cc [new file with mode: 0644]
src/os/KineticStore.h [new file with mode: 0644]
src/os/Makefile.am
src/osd/Makefile.am

index c21ace0867ff850f9c8edcd6713536f43efa4706..0da47323e368b836faf7a51ef0c3724212ed6635 100644 (file)
@@ -517,6 +517,18 @@ AC_LANG_POP([C++])
 # Find supported SIMD / SSE extensions supported by the compiler
 AX_INTEL_FEATURES()
 
+# kinetic osd backend?
+AC_ARG_WITH([kinetic],
+            [AS_HELP_STRING([--with-kinetic], [build kinetic support])],
+            [],
+            [with_kinetic=no])
+# no pkg-config support yet
+#AS_IF([test "x$with_kinetic" = "xyes"],
+#            [PKG_CHECK_MODULES([KINETIC], [kinetic_client], [], [true])])
+AS_IF([test "x$with_kinetic" = "xyes"],
+            [AC_DEFINE([HAVE_KINETIC], [1], [Defined if you have kinetic enabled])])
+AM_CONDITIONAL(WITH_KINETIC, [ test "$with_kinetic" = "yes" ])
+
 # use system libs3?
 AC_ARG_WITH([system-libs3],
        [AS_HELP_STRING([--with-system-libs3], [use system libs3])],
index 338db2b5906928efb0700cc80cea73f6174c654a..885353930d820cadc06d7891e7dac26a50697d17 100644 (file)
@@ -569,6 +569,14 @@ OPTION(leveldb_paranoid, OPT_BOOL, false) // leveldb paranoid flag
 OPTION(leveldb_log, OPT_STR, "/dev/null")  // enable leveldb log file
 OPTION(leveldb_compact_on_mount, OPT_BOOL, false)
 
+OPTION(osd_keyvaluedb, OPT_STR, "leveldb")
+
+OPTION(kinetic_host, OPT_STR, "") // hostname or ip address of a kinetic drive to use
+OPTION(kinetic_port, OPT_INT, 8123) // port number of the kinetic drive
+OPTION(kinetic_user_id, OPT_INT, 1) // kinetic user to authenticate as
+OPTION(kinetic_hmac_key, OPT_STR, "asdfasdf") // kinetic key to authenticate with
+OPTION(kinetic_use_ssl, OPT_BOOL, false) // whether to secure kinetic traffic with TLS
+
 /**
  * osd_client_op_priority and osd_recovery_op_priority adjust the relative
  * priority of client io vs recovery io.
index 3bcbede58ea2546348d6a282ea173a50e18bc642..82d6a364dccc9bdb59a49465763ae82fcfb85d49 100644 (file)
 #include "common/sync_filesystem.h"
 #include "LevelDBStore.h"
 
+#ifdef HAVE_KINETIC
+#include "KineticStore.h"
+#endif
+
 #include "common/ceph_crypto.h"
 using ceph::crypto::SHA1;
 
@@ -590,6 +594,10 @@ int KeyValueStore::mkfs()
     KeyValueDB *store;
     if (kv_type == KV_TYPE_LEVELDB) {
       store = new LevelDBStore(g_ceph_context, current_fn);
+#ifdef HAVE_KINETIC
+    } else if (kv_type == KV_TYPE_KINETIC) {
+      store = new KineticStore(g_ceph_context);
+#endif
     } else {
       derr << "KeyValueStore::mkfs error: unknown backend type" << kv_type << dendl;
       ret = -1;
@@ -790,6 +798,10 @@ int KeyValueStore::mount()
     KeyValueDB *store;
     if (kv_type == KV_TYPE_LEVELDB) {
       store = new LevelDBStore(g_ceph_context, current_fn);
+#ifdef HAVE_KINETIC
+    } else if (kv_type == KV_TYPE_KINETIC) {
+      store = new KineticStore(g_ceph_context);
+#endif
     } else {
       derr << "KeyValueStore::mount error: unknown backend type" << kv_type
            << dendl;
index 5f62c5a0bfa5690539a3f0a087e013f4d3ecadf0..9609016942bd09cbd0f4d62f8d6dc737b9cfecc1 100644 (file)
@@ -43,6 +43,7 @@ using namespace std;
 enum kvstore_types {
     KV_TYPE_NONE = 0,
     KV_TYPE_LEVELDB,
+    KV_TYPE_KINETIC,
     KV_TYPE_OTHER
 };
 
@@ -442,7 +443,17 @@ class KeyValueStore : public ObjectStore,
                 bool update_to=false);
   ~KeyValueStore();
 
-  int _detect_backend() { kv_type = KV_TYPE_LEVELDB; return 0; }
+  int _detect_backend() {
+    if (g_conf->osd_keyvaluedb == "leveldb")
+      kv_type = KV_TYPE_LEVELDB;
+#ifdef HAVE_KINETIC
+    else if (g_conf->osd_keyvaluedb == "kinetic")
+      kv_type = KV_TYPE_KINETIC;
+#endif
+    else
+      return -EINVAL;
+    return 0;
+  }
   bool test_mount_in_use();
   int version_stamp_is_valid(uint32_t *version);
   int update_version_stamp();
diff --git a/src/os/KineticStore.cc b/src/os/KineticStore.cc
new file mode 100644 (file)
index 0000000..ba77376
--- /dev/null
@@ -0,0 +1,309 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#include "KineticStore.h"
+#include "common/ceph_crypto.h"
+
+#include <set>
+#include <map>
+#include <string>
+#include "include/memory.h"
+#include <errno.h>
+using std::string;
+#include "common/perf_counters.h"
+
+#define dout_subsys ceph_subsys_keyvaluestore
+
+int KineticStore::init()
+{
+  // init defaults.  caller can override these if they want
+  // prior to calling open.
+  host = cct->_conf->kinetic_host;
+  port = cct->_conf->kinetic_port;
+  user_id = cct->_conf->kinetic_user_id;
+  hmac_key = cct->_conf->kinetic_hmac_key;
+  use_ssl = cct->_conf->kinetic_use_ssl;
+  return 0;
+}
+
+int KineticStore::do_open(ostream &out, bool create_if_missing)
+{
+  kinetic::KineticConnectionFactory conn_factory =
+    kinetic::NewKineticConnectionFactory();
+  kinetic::ConnectionOptions options;
+  options.host = host;
+  options.port = port;
+  options.user_id = user_id;
+  options.hmac_key = hmac_key;
+  options.use_ssl = use_ssl;
+  kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10);
+  if (!status.ok()) {
+    derr << "Unable to connect to kinetic store " << host << ":" << port
+        << " : " << status.ToString() << dendl;
+    return -EINVAL;
+  }
+
+  PerfCountersBuilder plb(g_ceph_context, "kinetic", l_kinetic_first, l_kinetic_last);
+  plb.add_u64_counter(l_kinetic_gets, "kinetic_get");
+  plb.add_u64_counter(l_kinetic_txns, "kinetic_transaction");
+  logger = plb.create_perf_counters();
+  cct->get_perfcounters_collection()->add(logger);
+  return 0;
+}
+
+KineticStore::KineticStore(CephContext *c) :
+  cct(c),
+  logger(NULL)
+{
+  host = c->_conf->kinetic_host;
+  port = c->_conf->kinetic_port;
+  user_id = c->_conf->kinetic_user_id;
+  hmac_key = c->_conf->kinetic_hmac_key;
+  use_ssl = c->_conf->kinetic_use_ssl;
+}
+
+KineticStore::~KineticStore()
+{
+  close();
+  delete logger;
+}
+
+void KineticStore::close()
+{
+  kinetic_conn.reset();
+  if (logger)
+    cct->get_perfcounters_collection()->remove(logger);
+}
+
+int KineticStore::submit_transaction(KeyValueDB::Transaction t)
+{
+  KineticTransactionImpl * _t =
+    static_cast<KineticTransactionImpl *>(t.get());
+
+  dout(20) << "kinetic submit_transaction" << dendl;
+
+  for (vector<KineticOp>::iterator it = _t->ops.begin();
+       it != _t->ops.end(); ++it) {
+    kinetic::KineticStatus status(kinetic::StatusCode::OK, "");
+    if (it->type == KINETIC_OP_WRITE) {
+      string data(it->data.c_str(), it->data.length());
+      kinetic::KineticRecord record(data, "", "",
+                                   com::seagate::kinetic::client::proto::Message_Algorithm_SHA1);
+      dout(30) << "kinetic before put of " << it->key << " (" << data.length() << " bytes)" << dendl;
+      status = kinetic_conn->Put(it->key, "", kinetic::WriteMode::IGNORE_VERSION,
+                                record);
+      dout(30) << "kinetic after put of " << it->key << dendl;
+    } else {
+      assert(it->type == KINETIC_OP_DELETE);
+      dout(30) << "kinetic before delete" << dendl;
+      status = kinetic_conn->Delete(it->key, "",
+                                   kinetic::WriteMode::IGNORE_VERSION);
+      dout(30) << "kinetic after delete" << dendl;
+    }
+    if (!status.ok()) {
+      derr << "kinetic error submitting transaction: "
+          << status.message() << dendl;
+      return -1;
+    }
+  }
+
+  logger->inc(l_kinetic_txns);
+  return 0;
+}
+
+int KineticStore::submit_transaction_sync(KeyValueDB::Transaction t)
+{
+  return submit_transaction(t);
+}
+
+void KineticStore::KineticTransactionImpl::set(
+  const string &prefix,
+  const string &k,
+  const bufferlist &to_set_bl)
+{
+  string key = combine_strings(prefix, k);
+  dout(30) << "kinetic set key " << key << dendl;
+  ops.push_back(KineticOp(KINETIC_OP_WRITE, key, to_set_bl));
+}
+
+void KineticStore::KineticTransactionImpl::rmkey(const string &prefix,
+                                                const string &k)
+{
+  string key = combine_strings(prefix, k);
+  dout(30) << "kinetic rm key " << key << dendl;
+  ops.push_back(KineticOp(KINETIC_OP_DELETE, key));
+}
+
+void KineticStore::KineticTransactionImpl::rmkeys_by_prefix(const string &prefix)
+{
+  dout(20) << "kinetic rmkeys_by_prefix " << prefix << dendl;
+  KeyValueDB::Iterator it = db->get_iterator(prefix);
+  for (it->seek_to_first();
+       it->valid();
+       it->next()) {
+    string key = combine_strings(prefix, it->key());
+    ops.push_back(KineticOp(KINETIC_OP_DELETE, key));
+    dout(30) << "kinetic rm key by prefix: " << key << dendl;
+  }
+}
+
+int KineticStore::get(
+    const string &prefix,
+    const std::set<string> &keys,
+    std::map<string, bufferlist> *out)
+{
+  dout(30) << "kinetic get prefix: " << prefix << " keys: " << keys << dendl;
+  for (std::set<string>::const_iterator i = keys.begin();
+       i != keys.end();
+       ++i) {
+    unique_ptr<kinetic::KineticRecord> record;
+    string key = combine_strings(prefix, *i);
+    dout(30) << "before get key " << key << dendl;
+    kinetic::KineticStatus status = kinetic_conn->Get(key, record);
+    if (!status.ok())
+      break;
+    dout(30) << "kinetic get got key: " << key << dendl;
+    out->insert(make_pair(key, to_bufferlist(*record.get())));
+  }
+  logger->inc(l_kinetic_gets);
+  return 0;
+}
+
+string KineticStore::combine_strings(const string &prefix, const string &value)
+{
+  string out = prefix;
+  out.push_back(1);
+  out.append(value);
+  return out;
+}
+
+bufferlist KineticStore::to_bufferlist(const kinetic::KineticRecord &record)
+{
+  bufferlist bl;
+  bl.append(*(record.value()));
+  return bl;
+}
+
+int KineticStore::split_key(string in_prefix, string *prefix, string *key)
+{
+  size_t prefix_len = in_prefix.find('\1');
+  if (prefix_len >= in_prefix.size())
+    return -EINVAL;
+
+  if (prefix)
+    *prefix = string(in_prefix, 0, prefix_len);
+  if (key)
+    *key= string(in_prefix, prefix_len + 1);
+  return 0;
+}
+
+KineticStore::KineticWholeSpaceIteratorImpl::KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn) : kinetic_conn(conn),
+   kinetic_status(kinetic::StatusCode::OK, "")
+{
+  dout(30) << "kinetic iterator constructor()" << dendl;
+  const static string last_key = "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF";
+  kinetic::KeyRangeIterator it =
+    kinetic_conn->IterateKeyRange("", true, last_key, true, 1024);
+  while (it != kinetic::KeyRangeEnd()) {
+    try {
+      keys.insert(*it);
+      dout(30) << "kinetic iterator added " << *it << dendl;
+    } catch (std::runtime_error &e) {
+      kinetic_status = kinetic::KineticStatus(kinetic::StatusCode::CLIENT_INTERNAL_ERROR, e.what());
+      return;
+    }
+    ++it;
+  }
+  keys_iter = keys.begin();
+}
+
+int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
+{
+  dout(30) << "kinetic iterator seek_to_first(prefix): " << prefix << dendl;
+  keys_iter = keys.lower_bound(prefix);
+  return 0;
+}
+
+int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last()
+{
+  dout(30) << "kinetic iterator seek_to_last()" << dendl;
+  keys_iter = keys.end();
+  if (keys.begin() != keys_iter)
+    --keys_iter;
+  return 0;
+}
+
+int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
+{
+  dout(30) << "kinetic iterator seek_to_last(prefix): " << prefix << dendl;
+  keys_iter = keys.upper_bound(prefix + "\2");
+  if (keys.begin() == keys_iter) {
+    keys_iter = keys.end();
+  } else {
+    --keys_iter;
+  }
+  return 0;
+}
+
+int KineticStore::KineticWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) {
+  dout(30) << "kinetic iterator upper_bound()" << dendl;
+  string bound = combine_strings(prefix, after);
+  keys_iter = keys.upper_bound(bound);
+  return 0;
+}
+
+int KineticStore::KineticWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) {
+  dout(30) << "kinetic iterator lower_bound()" << dendl;
+  string bound = combine_strings(prefix, to);
+  keys_iter = keys.lower_bound(bound);
+  return 0;
+}
+
+bool KineticStore::KineticWholeSpaceIteratorImpl::valid() {
+  dout(30) << "kinetic iterator valid()" << dendl;
+  return keys_iter != keys.end();
+}
+
+int KineticStore::KineticWholeSpaceIteratorImpl::next() {
+  dout(30) << "kinetic iterator next()" << dendl;
+  if (keys_iter != keys.end()) {
+      ++keys_iter;
+      return 0;
+  }
+  return -1;
+}
+
+int KineticStore::KineticWholeSpaceIteratorImpl::prev() {
+  dout(30) << "kinetic iterator prev()" << dendl;
+  if (keys_iter != keys.begin()) {
+      --keys_iter;
+      return 0;
+  }
+  keys_iter = keys.end();
+  return -1;
+}
+
+string KineticStore::KineticWholeSpaceIteratorImpl::key() {
+  dout(30) << "kinetic iterator key()" << dendl;
+  string out_key;
+  split_key(*keys_iter, NULL, &out_key);
+  return out_key;
+}
+
+pair<string,string> KineticStore::KineticWholeSpaceIteratorImpl::raw_key() {
+  dout(30) << "kinetic iterator raw_key()" << dendl;
+  string prefix, key;
+  split_key(*keys_iter, &prefix, &key);
+  return make_pair(prefix, key);
+}
+
+bufferlist KineticStore::KineticWholeSpaceIteratorImpl::value() {
+  dout(30) << "kinetic iterator value()" << dendl;
+  unique_ptr<kinetic::KineticRecord> record;
+  kinetic_status = kinetic_conn->Get(*keys_iter, record);
+  return to_bufferlist(*record.get());
+}
+
+int KineticStore::KineticWholeSpaceIteratorImpl::status() {
+  dout(30) << "kinetic iterator status()" << dendl;
+  return kinetic_status.ok() ? 0 : -1;
+}
diff --git a/src/os/KineticStore.h b/src/os/KineticStore.h
new file mode 100644 (file)
index 0000000..57b8a49
--- /dev/null
@@ -0,0 +1,159 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef KINETIC_STORE_H
+#define KINETIC_STORE_H
+
+#include "include/types.h"
+#include "include/buffer.h"
+#include "KeyValueDB.h"
+#include <set>
+#include <map>
+#include <string>
+#include "include/memory.h"
+#include <kinetic/kinetic.h>
+
+#include <errno.h>
+#include "common/errno.h"
+#include "common/dout.h"
+#include "include/assert.h"
+#include "common/Formatter.h"
+
+#include "common/ceph_context.h"
+
+class PerfCounters;
+
+enum {
+  l_kinetic_first = 34400,
+  l_kinetic_gets,
+  l_kinetic_txns,
+  l_kinetic_last,
+};
+
+/**
+ * Uses Kinetic to implement the KeyValueDB interface
+ */
+class KineticStore : public KeyValueDB {
+  CephContext *cct;
+  PerfCounters *logger;
+  string host;
+  int port;
+  int user_id;
+  string hmac_key;
+  bool use_ssl;
+  std::unique_ptr<kinetic::BlockingKineticConnection> kinetic_conn;
+
+  int do_open(ostream &out, bool create_if_missing);
+
+public:
+  KineticStore(CephContext *c);
+  ~KineticStore();
+
+  int init();
+
+  /// Opens underlying db
+  int open(ostream &out) {
+    return do_open(out, false);
+  }
+  /// Creates underlying db if missing and opens it
+  int create_and_open(ostream &out) {
+    return do_open(out, true);
+  }
+
+  void close();
+
+  enum KineticOpType {
+    KINETIC_OP_WRITE,
+    KINETIC_OP_DELETE,
+  };
+
+  struct KineticOp {
+    KineticOpType type;
+    std::string key;
+    bufferlist data;
+    KineticOp(KineticOpType type, const string &key) : type(type), key(key) {}
+    KineticOp(KineticOpType type, const string &key, const bufferlist &data)
+      : type(type), key(key), data(data) {}
+  };
+
+  class KineticTransactionImpl : public KeyValueDB::TransactionImpl {
+  public:
+    vector<KineticOp> ops;
+    KineticStore *db;
+
+    KineticTransactionImpl(KineticStore *db) : db(db) {}
+    void set(
+      const string &prefix,
+      const string &k,
+      const bufferlist &bl);
+    void rmkey(
+      const string &prefix,
+      const string &k);
+    void rmkeys_by_prefix(
+      const string &prefix
+      );
+  };
+
+  KeyValueDB::Transaction get_transaction() {
+    return ceph::shared_ptr< KineticTransactionImpl >(
+      new KineticTransactionImpl(this));
+  }
+
+  int submit_transaction(KeyValueDB::Transaction t);
+  int submit_transaction_sync(KeyValueDB::Transaction t);
+  int get(
+    const string &prefix,
+    const std::set<string> &key,
+    std::map<string, bufferlist> *out
+    );
+
+  class KineticWholeSpaceIteratorImpl :
+    public KeyValueDB::WholeSpaceIteratorImpl {
+    std::set<std::string> keys;
+    std::set<std::string>::iterator keys_iter;
+    kinetic::BlockingKineticConnection *kinetic_conn;
+    kinetic::KineticStatus kinetic_status;
+  public:
+    KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn);
+    virtual ~KineticWholeSpaceIteratorImpl() { }
+
+    int seek_to_first() {
+      return seek_to_first("");
+    }
+    int seek_to_first(const string &prefix);
+    int seek_to_last();
+    int seek_to_last(const string &prefix);
+    int upper_bound(const string &prefix, const string &after);
+    int lower_bound(const string &prefix, const string &to);
+    bool valid();
+    int next();
+    int prev();
+    string key();
+    pair<string,string> raw_key();
+    bufferlist value();
+    int status();
+  };
+
+  /// Utility
+  static string combine_strings(const string &prefix, const string &value);
+  static int split_key(string in_prefix, string *prefix, string *key);
+  static bufferlist to_bufferlist(const kinetic::KineticRecord &record);
+  virtual uint64_t get_estimated_size(map<string,uint64_t> &extra) {
+    // not used by the osd
+    return 0;
+  }
+
+
+protected:
+  WholeSpaceIterator _get_iterator() {
+    return ceph::shared_ptr<KeyValueDB::WholeSpaceIteratorImpl>(
+                                                               new KineticWholeSpaceIteratorImpl(kinetic_conn.get()));
+  }
+
+  // TODO: remove snapshots from interface
+  WholeSpaceIterator _get_snapshot_iterator() {
+    return _get_iterator();
+  }
+
+};
+
+#endif
index 252c678a64337f836feb90f6e9db3b2ea8e52b3e..f6ac4989c4a27d8be0dd3d13363be8a37063af4a 100644 (file)
@@ -29,6 +29,8 @@ if WITH_LIBZFS
 libos_la_SOURCES += os/ZFSFileStoreBackend.cc
 endif
 
+libos_la_CXXFLAGS = ${AM_CXXFLAGS}
+libos_la_LIBADD =
 noinst_LTLIBRARIES += libos.la
 
 noinst_HEADERS += \
@@ -66,3 +68,9 @@ noinst_LIBRARIES += libos_zfs.a
 noinst_HEADERS += os/ZFS.h
 endif
 
+if WITH_KINETIC
+libos_la_SOURCES += os/KineticStore.cc
+libos_la_CXXFLAGS += -std=gnu++11
+libos_la_LIBADD += -lkinetic_client -lprotobuf -lglog -lgflags libcrypto.a
+noinst_HEADERS += os/KineticStore.h
+endif
index 8a2cb2bda4f39b794d67d26054e7629dfb4bb7f9..0e8e0586510c433e76fb2b7fb496d618f1779889 100644 (file)
@@ -19,6 +19,8 @@ libosd_la_SOURCES = \
        osd/osd_types.cc \
        osd/ECUtil.cc \
        objclass/class_api.cc
+
+libosd_la_CXXFLAGS = ${AM_CXXFLAGS}
 libosd_la_LIBADD = $(LIBOSDC) $(LIBOS)
 noinst_LTLIBRARIES += libosd.la
 
@@ -45,3 +47,6 @@ noinst_HEADERS += \
        osd/Watch.h \
        osd/osd_types.h
 
+if WITH_KINETIC
+libosd_la_CXXFLAGS += -std=gnu++11
+endif