]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Add KeyValueStore implementation
authorHaomai Wang <haomaiwang@gmail.com>
Wed, 29 Jan 2014 09:50:10 +0000 (17:50 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Wed, 29 Jan 2014 13:50:15 +0000 (21:50 +0800)
KeyValueStore is another ObjectStore implementation with FileStore. It
uses KV store wrapper(StripObjectMap) which inherited GenericObjectMap
to implement ObjectStore APIs.

Each object has a header key in KV backend, which encapsulated the metadata
of object such as size, the status of keys. A complete object data maybe spread
around multi keys. The CRUD operation of object need to access the header key
of object to know the details, then the actual data keys will be get.

Now the actual KV backend of KeyValueStore is only LevelDB, more KV backend
(RocksDB, NVM API) will be introduced in the near future.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/os/KeyValueStore.cc [new file with mode: 0644]
src/os/KeyValueStore.h [new file with mode: 0644]
src/os/Makefile.am
src/os/ObjectStore.cc

diff --git a/src/os/KeyValueStore.cc b/src/os/KeyValueStore.cc
new file mode 100644 (file)
index 0000000..e209dbc
--- /dev/null
@@ -0,0 +1,2978 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 UnitedStack <haomai@unitedstack.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "include/int_types.h"
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/file.h>
+#include <errno.h>
+#include <dirent.h>
+
+#include <iostream>
+#include <map>
+
+#include "include/compat.h"
+
+#include <fstream>
+#include <sstream>
+
+#include "KeyValueStore.h"
+#include "common/BackTrace.h"
+#include "include/types.h"
+
+#include "osd/osd_types.h"
+#include "include/color.h"
+#include "include/buffer.h"
+
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/run_cmd.h"
+#include "common/safe_io.h"
+#include "common/perf_counters.h"
+#include "common/sync_filesystem.h"
+#include "LevelDBStore.h"
+
+#include "common/ceph_crypto.h"
+using ceph::crypto::SHA1;
+
+#include "include/assert.h"
+
+#include "common/config.h"
+
+#define dout_subsys ceph_subsys_keyvaluestore
+
+const string KeyValueStore::OBJECT_STRIP_PREFIX = "_STRIP_";
+const string KeyValueStore::OBJECT_XATTR = "__OBJATTR__";
+const string KeyValueStore::OBJECT_OMAP = "__OBJOMAP__";
+const string KeyValueStore::OBJECT_OMAP_HEADER = "__OBJOMAP_HEADER__";
+const string KeyValueStore::OBJECT_OMAP_HEADER_KEY = "__OBJOMAP_HEADER__KEY_";
+const string KeyValueStore::COLLECTION = "__COLLECTION__";
+const string KeyValueStore::COLLECTION_ATTR = "__COLL_ATTR__";
+
+// ============== StripObjectMap Implementation =================
+
+void StripObjectMap::sync_wrap(StripObjectHeader &strip_header,
+                               KeyValueDB::Transaction t,
+                               const SequencerPosition &spos)
+{
+  dout(10) << __func__ << " cid: " << strip_header.cid << "oid: "
+           << strip_header.oid << " setting spos to " << strip_header.spos
+           << dendl;
+  strip_header.spos = spos;
+  strip_header.header->data.clear();
+  ::encode(strip_header, strip_header.header->data);
+
+  sync(strip_header.header, t);
+}
+
+bool StripObjectMap::check_spos(const StripObjectHeader &header,
+                                const SequencerPosition *spos)
+{
+  if (!spos || *spos > header.spos) {
+    stringstream out;
+    if (spos)
+      dout(10) << "cid: " << "oid: " << header.oid
+               << " not skipping op, *spos " << *spos << dendl;
+    else
+      dout(10) << "cid: " << "oid: " << header.oid
+               << " not skipping op, *spos " << "empty" << dendl;
+    dout(10) << " > header.spos " << header.spos << dendl;
+    return false;
+  } else {
+    dout(10) << "cid: " << "oid: " << header.oid << " skipping op, *spos "
+             << *spos << " <= header.spos " << header.spos << dendl;
+    return true;
+  }
+}
+
+int StripObjectMap::save_strip_header(StripObjectHeader &strip_header,
+                                      KeyValueDB::Transaction t)
+{
+  strip_header.header->data.clear();
+  ::encode(strip_header, strip_header.header->data);
+
+  set_header(strip_header.cid, strip_header.oid, *(strip_header.header), t);
+  return 0;
+}
+
+int StripObjectMap::create_strip_header(const coll_t &cid,
+                                        const ghobject_t &oid,
+                                        StripObjectHeader &strip_header,
+                                        KeyValueDB::Transaction t)
+{
+  Header header = lookup_create_header(cid, oid, t);
+  if (!header)
+    return -EINVAL;
+
+  strip_header.oid = oid;
+  strip_header.cid = cid;
+  strip_header.header = header;
+
+  return 0;
+}
+
+int StripObjectMap::lookup_strip_header(const coll_t &cid,
+                                        const ghobject_t &oid,
+                                        StripObjectHeader &strip_header)
+{
+  Header header = lookup_header(cid, oid);
+
+  if (!header) {
+    dout(20) << "lookup_strip_header failed to get strip_header "
+             << " cid " << cid <<" oid " << oid << dendl;
+    return -ENOENT;
+  }
+
+  if (header->data.length()) {
+    bufferlist::iterator bliter = header->data.begin();
+    ::decode(strip_header, bliter);
+  }
+
+  if (strip_header.strip_size == 0)
+    strip_header.strip_size = default_strip_size;
+
+  strip_header.oid = oid;
+  strip_header.cid = cid;
+  strip_header.header = header;
+
+  dout(10) << "lookup_strip_header done " << " cid " << cid << " oid "
+           << oid << dendl;
+  return 0;
+}
+
+int StripObjectMap::file_to_extents(uint64_t offset, size_t len,
+                                    uint64_t strip_size,
+                                    vector<StripExtent> &extents)
+{
+  if (len == 0)
+    return 0;
+
+  uint64_t start, end, strip_offset, extent_offset, extent_len;
+  start = offset / strip_size;
+  end = (offset + len) / strip_size;
+  strip_offset = start * strip_size;
+
+  // "offset" may in the middle of first strip object
+  if (offset > strip_offset) {
+    extent_offset = offset - strip_offset;
+    if (extent_offset + len <= strip_size)
+      extent_len = len;
+    else
+      extent_len = strip_size - extent_offset;
+    extents.push_back(StripExtent(start, extent_offset, extent_len));
+    start++;
+    strip_offset += strip_size;
+  }
+
+  for (; start < end; ++start) {
+    extents.push_back(StripExtent(start, 0, strip_size));
+    strip_offset += strip_size;
+  }
+
+  // The end of strip object may be partial
+  if (offset + len > strip_offset)
+    extents.push_back(StripExtent(start, 0, offset+len-strip_offset));
+
+  assert(extents.size());
+  dout(10) << "file_to_extents done " << dendl;
+  return 0;
+}
+
+void StripObjectMap::clone_wrap(StripObjectHeader &old_header,
+                                const coll_t &cid, const ghobject_t &oid,
+                                KeyValueDB::Transaction t,
+                                const SequencerPosition &spos,
+                                StripObjectHeader *origin_header,
+                                StripObjectHeader *target_header)
+{
+  Header new_origin_header;
+
+  if (target_header)
+    *target_header = old_header;
+
+  clone(old_header.header, cid, oid, t, &new_origin_header,
+        &target_header->header);
+
+  old_header.header = new_origin_header;
+
+  if (origin_header)
+    origin_header->spos = spos;
+
+  if (target_header) {
+    target_header->oid = oid;
+    target_header->cid = cid;
+    target_header->spos = spos;
+  }
+}
+
+void StripObjectMap::rename_wrap(const coll_t &cid, const ghobject_t &oid,
+                                 KeyValueDB::Transaction t,
+                                 const SequencerPosition &spos,
+                                 StripObjectHeader *header)
+{
+  assert(header);
+  rename(header->header, cid, oid, t);
+
+  if (header) {
+    header->oid = oid;
+    header->cid = cid;
+    header->spos = spos;
+  }
+}
+
+
+// =========== KeyValueStore::SubmitManager Implementation ==============
+
+uint64_t KeyValueStore::SubmitManager::op_submit_start()
+{
+  lock.Lock();
+  uint64_t op = ++op_seq;
+  dout(10) << "op_submit_start " << op << dendl;
+  return op;
+}
+
+void KeyValueStore::SubmitManager::op_submit_finish(uint64_t op)
+{
+  dout(10) << "op_submit_finish " << op << dendl;
+  if (op != op_submitted + 1) {
+      dout(0) << "op_submit_finish " << op << " expected " << (op_submitted + 1)
+          << ", OUT OF ORDER" << dendl;
+      assert(0 == "out of order op_submit_finish");
+  }
+  op_submitted = op;
+  lock.Unlock();
+}
+
+
+// ========= KeyValueStore::BufferTransaction Implementation ============
+
+int KeyValueStore::BufferTransaction::check_coll(const coll_t &cid)
+{
+  int r = store->_check_coll(cid);
+  if (r == 0)
+    return r;
+
+  StripHeaderMap::iterator it = strip_headers.find(
+      make_pair(get_coll_for_coll(), make_ghobject_for_coll(cid)));
+  if (it != strip_headers.end() && !it->second.deleted) {
+    return 0;
+  }
+  return -ENOENT;
+}
+
+int KeyValueStore::BufferTransaction::lookup_cached_header(
+    const coll_t &cid, const ghobject_t &oid,
+    StripObjectMap::StripObjectHeader **strip_header,
+    bool create_if_missing)
+{
+  if (check_coll(cid) < 0)
+    return -ENOENT;
+
+  StripObjectMap::StripObjectHeader header;
+  int r = 0;
+
+  StripHeaderMap::iterator it = strip_headers.find(make_pair(cid, oid));
+  if (it != strip_headers.end()) {
+    if (it->second.deleted)
+      return -ENOENT;
+
+    if (strip_header)
+      *strip_header = &it->second;
+    return 0;
+  }
+
+  r = store->backend->lookup_strip_header(cid, oid, header);
+  if (r < 0 && create_if_missing) {
+    r = store->backend->create_strip_header(cid, oid, header, t);
+  }
+
+  if (r < 0) {
+    dout(10) << __func__  << " " << cid << "/" << oid << " "
+             << " r = " << r << dendl;
+    return r;
+  }
+
+  strip_headers[make_pair(cid, oid)] = header;
+  if (strip_header)
+    *strip_header = &strip_headers[make_pair(cid, oid)];
+  return r;
+}
+
+int KeyValueStore::BufferTransaction::get_buffer_key(
+    StripObjectMap::StripObjectHeader *strip_header, const string &prefix,
+    const string &key, bufferlist &bl)
+{
+  if (strip_header->buffers.count(make_pair(prefix, key))) {
+    bl.swap(strip_header->buffers[make_pair(prefix, key)]);
+    return 0;
+  }
+
+  set<string> keys;
+  map<string, bufferlist> out;
+  keys.insert(key);
+  int r = store->backend->get_values(strip_header->cid, strip_header->oid,
+                                     prefix, keys, &out);
+  if (r < 0) {
+    dout(10) << __func__  << " " << strip_header->cid << "/"
+             << strip_header->oid << " " << " r = " << r << dendl;
+    return r;
+  }
+
+  assert(out.size() == 1);
+  bl.swap(out.begin()->second);
+  return 0;
+}
+
+void KeyValueStore::BufferTransaction::set_buffer_keys(
+     const string &prefix, StripObjectMap::StripObjectHeader *strip_header,
+     map<string, bufferlist> &values)
+{
+  if (store->backend->check_spos(*strip_header, &spos))
+    return ;
+
+  store->backend->set_keys(strip_header->header, prefix, values, t);
+
+  for (map<string, bufferlist>::iterator iter = values.begin();
+       iter != values.end(); ++iter) {
+    strip_header->buffers[make_pair(prefix, iter->first)].swap(iter->second);
+  }
+}
+
+int KeyValueStore::BufferTransaction::remove_buffer_keys(
+     const string &prefix, StripObjectMap::StripObjectHeader *strip_header,
+     const set<string> &keys)
+{
+  if (store->backend->check_spos(*strip_header, &spos))
+    return 0;
+
+  for (set<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
+    strip_header->buffers[make_pair(prefix, *iter)] = bufferlist();
+  }
+
+  return store->backend->rm_keys(strip_header->header, prefix, keys, t);
+}
+
+void KeyValueStore::BufferTransaction::clear_buffer_keys(
+     const string &prefix, StripObjectMap::StripObjectHeader *strip_header)
+{
+  for (map<pair<string, string>, bufferlist>::iterator iter = strip_header->buffers.begin();
+       iter != strip_header->buffers.end(); ++iter) {
+    if (iter->first.first == prefix)
+      iter->second = bufferlist();
+  }
+}
+
+int KeyValueStore::BufferTransaction::clear_buffer(
+     StripObjectMap::StripObjectHeader *strip_header)
+{
+  if (store->backend->check_spos(*strip_header, &spos))
+    return 0;
+
+  strip_header->deleted = true;
+
+  return store->backend->clear(strip_header->header, t);
+}
+
+void KeyValueStore::BufferTransaction::clone_buffer(
+    StripObjectMap::StripObjectHeader *old_header,
+    const coll_t &cid, const ghobject_t &oid)
+{
+  if (store->backend->check_spos(*old_header, &spos))
+    return ;
+
+  // Remove target ahead to avoid dead lock
+  strip_headers.erase(make_pair(cid, oid));
+
+  StripObjectMap::StripObjectHeader new_origin_header, new_target_header;
+
+  store->backend->clone_wrap(*old_header, cid, oid, t, spos,
+                             &new_origin_header, &new_target_header);
+
+  // FIXME: Lacking of lock for origin header(now become parent), it will
+  // cause other operation can get the origin header while submitting
+  // transactions
+  strip_headers[make_pair(cid, old_header->oid)] = new_origin_header;
+  strip_headers[make_pair(cid, oid)] = new_target_header;
+}
+
+void KeyValueStore::BufferTransaction::rename_buffer(
+    StripObjectMap::StripObjectHeader *old_header,
+    const coll_t &cid, const ghobject_t &oid)
+{
+  if (store->backend->check_spos(*old_header, &spos))
+    return ;
+
+  // FIXME: Lacking of lock for origin header, it will cause other operation
+  // can get the origin header while submitting transactions
+  store->backend->rename_wrap(cid, oid, t, spos, old_header);
+
+  strip_headers.erase(make_pair(old_header->cid, old_header->oid));
+  strip_headers[make_pair(cid, oid)] = *old_header;
+}
+
+int KeyValueStore::BufferTransaction::submit_transaction()
+{
+  int r = 0;
+
+  for (StripHeaderMap::iterator header_iter = strip_headers.begin();
+       header_iter != strip_headers.end(); ++header_iter) {
+    StripObjectMap::StripObjectHeader header = header_iter->second;
+
+    if (store->backend->check_spos(header, &spos))
+      continue;
+
+    if (header.deleted)
+      continue;
+
+    r = store->backend->save_strip_header(header, t);
+    if (r < 0) {
+      dout(10) << __func__ << " save strip header failed " << dendl;
+      goto out;
+    }
+  }
+
+out:
+
+  dout(5) << __func__ << " r = " << r << dendl;
+  return store->backend->submit_transaction(t);
+}
+
+// =========== KeyValueStore Intern Helper Implementation ==============
+
+ostream& operator<<(ostream& out, const KeyValueStore::OpSequencer& s)
+{
+  assert(&out);
+  return out << *s.parent;
+}
+
+int KeyValueStore::_create_current()
+{
+  struct stat st;
+  int ret = ::stat(current_fn.c_str(), &st);
+  if (ret == 0) {
+    // current/ exists
+    if (!S_ISDIR(st.st_mode)) {
+      dout(0) << "_create_current: current/ exists but is not a directory" << dendl;
+      ret = -EINVAL;
+    }
+  } else {
+    ret = ::mkdir(current_fn.c_str(), 0755);
+    if (ret < 0) {
+      ret = -errno;
+      dout(0) << "_create_current: mkdir " << current_fn << " failed: "<< cpp_strerror(ret) << dendl;
+    }
+  }
+
+  return ret;
+}
+
+
+
+// =========== KeyValueStore API Implementation ==============
+
+KeyValueStore::KeyValueStore(const std::string &base,
+                             const char *name, bool do_update) :
+  ObjectStore(base),
+  internal_name(name),
+  basedir(base),
+  fsid_fd(-1), op_fd(-1), current_fd(-1),
+  kv_type(KV_TYPE_NONE),
+  backend(NULL),
+  ondisk_finisher(g_ceph_context),
+  lock("KeyValueStore::lock"),
+  default_osr("default"),
+  op_queue_len(0), op_queue_bytes(0),
+  op_finisher(g_ceph_context),
+  op_tp(g_ceph_context, "KeyValueStore::op_tp",
+        g_conf->filestore_op_threads, "keyvaluestore_op_threads"),
+  op_wq(this, g_conf->filestore_op_thread_timeout,
+        g_conf->filestore_op_thread_suicide_timeout, &op_tp),
+  logger(NULL),
+  read_error_lock("KeyValueStore::read_error_lock"),
+  m_fail_eio(g_conf->filestore_fail_eio),
+  do_update(do_update)
+{
+  ostringstream oss;
+  oss << basedir << "/current";
+  current_fn = oss.str();
+
+  ostringstream sss;
+  sss << basedir << "/current/commit_op_seq";
+  current_op_seq_fn = sss.str();
+
+  // initialize logger
+  PerfCountersBuilder plb(g_ceph_context, internal_name, 0, 1);
+  logger = plb.create_perf_counters();
+
+  g_ceph_context->get_perfcounters_collection()->add(logger);
+  g_ceph_context->_conf->add_observer(this);
+}
+
+KeyValueStore::~KeyValueStore()
+{
+  g_ceph_context->_conf->remove_observer(this);
+  g_ceph_context->get_perfcounters_collection()->remove(logger);
+
+  delete logger;
+}
+
+int KeyValueStore::statfs(struct statfs *buf)
+{
+  if (::statfs(basedir.c_str(), buf) < 0) {
+    int r = -errno;
+    assert(!m_fail_eio || r != -EIO);
+    return r;
+  }
+  return 0;
+}
+
+int KeyValueStore::mkfs()
+{
+  int ret = 0;
+  char fsid_fn[PATH_MAX];
+  uuid_d old_fsid;
+
+  dout(1) << "mkfs in " << basedir << dendl;
+
+  // open+lock fsid
+  snprintf(fsid_fn, sizeof(fsid_fn), "%s/fsid", basedir.c_str());
+  fsid_fd = ::open(fsid_fn, O_RDWR|O_CREAT, 0644);
+  if (fsid_fd < 0) {
+    ret = -errno;
+    derr << "mkfs: failed to open " << fsid_fn << ": " << cpp_strerror(ret) << dendl;
+    return ret;
+  }
+
+  if (lock_fsid() < 0) {
+    ret = -EBUSY;
+    goto close_fsid_fd;
+  }
+
+  if (read_fsid(fsid_fd, &old_fsid) < 0 || old_fsid.is_zero()) {
+    if (fsid.is_zero()) {
+      fsid.generate_random();
+      dout(1) << "mkfs generated fsid " << fsid << dendl;
+    } else {
+      dout(1) << "mkfs using provided fsid " << fsid << dendl;
+    }
+
+    char fsid_str[40];
+    fsid.print(fsid_str);
+    strcat(fsid_str, "\n");
+    ret = ::ftruncate(fsid_fd, 0);
+    if (ret < 0) {
+      ret = -errno;
+      derr << "mkfs: failed to truncate fsid: " << cpp_strerror(ret) << dendl;
+      goto close_fsid_fd;
+    }
+    ret = safe_write(fsid_fd, fsid_str, strlen(fsid_str));
+    if (ret < 0) {
+      derr << "mkfs: failed to write fsid: " << cpp_strerror(ret) << dendl;
+      goto close_fsid_fd;
+    }
+    if (::fsync(fsid_fd) < 0) {
+      ret = errno;
+      derr << "mkfs: close failed: can't write fsid: "
+           << cpp_strerror(ret) << dendl;
+      goto close_fsid_fd;
+    }
+    dout(10) << "mkfs fsid is " << fsid << dendl;
+  } else {
+    if (!fsid.is_zero() && fsid != old_fsid) {
+      derr << "mkfs on-disk fsid " << old_fsid << " != provided " << fsid << dendl;
+      ret = -EINVAL;
+      goto close_fsid_fd;
+    }
+    fsid = old_fsid;
+    dout(1) << "mkfs fsid is already set to " << fsid << dendl;
+  }
+
+  // version stamp
+  ret = write_version_stamp();
+  if (ret < 0) {
+    derr << "mkfs: write_version_stamp() failed: "
+         << cpp_strerror(ret) << dendl;
+    goto close_fsid_fd;
+  }
+
+  ret = _create_current();
+  if (ret < 0) {
+    derr << "mkfs: failed to create current/ " << cpp_strerror(ret) << dendl;
+    goto close_fsid_fd;
+  }
+
+  if (_detect_backend()) {
+    derr << "KeyValueStore::mkfs error in _detect_backend" << dendl;
+    ret = -1;
+    goto close_fsid_fd;
+  }
+
+  {
+    KeyValueDB *store;
+    if (kv_type == KV_TYPE_LEVELDB) {
+      store = new LevelDBStore(g_ceph_context, current_fn);
+    } else {
+      derr << "KeyValueStore::mkfs error: unknown backend type" << kv_type << dendl;
+      ret = -1;
+      goto close_fsid_fd;
+    }
+
+    store->init();
+    stringstream err;
+    if (store->create_and_open(err)) {
+      derr << "KeyValueStore::mkfs failed to create keyvaluestore backend: "
+           << err.str() << dendl;
+      ret = -1;
+      delete store;
+      goto close_fsid_fd;
+    } else {
+      delete store;
+      dout(1) << "keyvaluestore backend exists/created" << dendl;
+    }
+  }
+
+  dout(1) << "mkfs done in " << basedir << dendl;
+  ret = 0;
+
+ close_fsid_fd:
+  TEMP_FAILURE_RETRY(::close(fsid_fd));
+  fsid_fd = -1;
+  return ret;
+}
+
+int KeyValueStore::read_fsid(int fd, uuid_d *uuid)
+{
+  char fsid_str[40];
+  int ret = safe_read(fd, fsid_str, sizeof(fsid_str));
+  if (ret < 0)
+    return ret;
+  if (ret == 8) {
+    // old 64-bit fsid... mirror it.
+    *(uint64_t*)&uuid->uuid[0] = *(uint64_t*)fsid_str;
+    *(uint64_t*)&uuid->uuid[8] = *(uint64_t*)fsid_str;
+    return 0;
+  }
+
+  if (ret > 36)
+    fsid_str[36] = 0;
+  if (!uuid->parse(fsid_str))
+    return -EINVAL;
+  return 0;
+}
+
+int KeyValueStore::lock_fsid()
+{
+  struct flock l;
+  memset(&l, 0, sizeof(l));
+  l.l_type = F_WRLCK;
+  l.l_whence = SEEK_SET;
+  l.l_start = 0;
+  l.l_len = 0;
+  int r = ::fcntl(fsid_fd, F_SETLK, &l);
+  if (r < 0) {
+    int err = errno;
+    dout(0) << "lock_fsid failed to lock " << basedir
+            << "/fsid, is another ceph-osd still running? "
+            << cpp_strerror(err) << dendl;
+    return -err;
+  }
+  return 0;
+}
+
+bool KeyValueStore::test_mount_in_use()
+{
+  dout(5) << "test_mount basedir " << basedir << dendl;
+  char fn[PATH_MAX];
+  snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
+
+  // verify fs isn't in use
+
+  fsid_fd = ::open(fn, O_RDWR, 0644);
+  if (fsid_fd < 0)
+    return 0;   // no fsid, ok.
+  bool inuse = lock_fsid() < 0;
+  TEMP_FAILURE_RETRY(::close(fsid_fd));
+  fsid_fd = -1;
+  return inuse;
+}
+
+int KeyValueStore::update_version_stamp()
+{
+  return write_version_stamp();
+}
+
+int KeyValueStore::version_stamp_is_valid(uint32_t *version)
+{
+  bufferptr bp(PATH_MAX);
+  int ret = safe_read_file(basedir.c_str(), "store_version",
+      bp.c_str(), bp.length());
+  if (ret < 0) {
+    if (ret == -ENOENT)
+      return 0;
+    return ret;
+  }
+  bufferlist bl;
+  bl.push_back(bp);
+  bufferlist::iterator i = bl.begin();
+  ::decode(*version, i);
+  if (*version == target_version)
+    return 1;
+  else
+    return 0;
+}
+
+int KeyValueStore::write_version_stamp()
+{
+  bufferlist bl;
+  ::encode(target_version, bl);
+
+  return safe_write_file(basedir.c_str(), "store_version",
+      bl.c_str(), bl.length());
+}
+
+int KeyValueStore::mount()
+{
+  int ret;
+  char buf[PATH_MAX];
+
+  dout(5) << "basedir " << basedir << dendl;
+
+  // make sure global base dir exists
+  if (::access(basedir.c_str(), R_OK | W_OK)) {
+    ret = -errno;
+    derr << "KeyValueStore::mount: unable to access basedir '" << basedir
+         << "': " << cpp_strerror(ret) << dendl;
+    goto done;
+  }
+
+  // get fsid
+  snprintf(buf, sizeof(buf), "%s/fsid", basedir.c_str());
+  fsid_fd = ::open(buf, O_RDWR, 0644);
+  if (fsid_fd < 0) {
+    ret = -errno;
+    derr << "KeyValueStore::mount: error opening '" << buf << "': "
+         << cpp_strerror(ret) << dendl;
+    goto done;
+  }
+
+  ret = read_fsid(fsid_fd, &fsid);
+  if (ret < 0) {
+    derr << "KeyValueStore::mount: error reading fsid_fd: "
+         << cpp_strerror(ret) << dendl;
+    goto close_fsid_fd;
+  }
+
+  if (lock_fsid() < 0) {
+    derr << "KeyValueStore::mount: lock_fsid failed" << dendl;
+    ret = -EBUSY;
+    goto close_fsid_fd;
+  }
+
+  dout(10) << "mount fsid is " << fsid << dendl;
+
+  uint32_t version_stamp;
+  ret = version_stamp_is_valid(&version_stamp);
+  if (ret < 0) {
+    derr << "KeyValueStore::mount : error in version_stamp_is_valid: "
+         << cpp_strerror(ret) << dendl;
+    goto close_fsid_fd;
+  } else if (ret == 0) {
+    if (do_update) {
+      derr << "KeyValueStore::mount : stale version stamp detected: "
+           << version_stamp << ". Proceeding, do_update "
+           << "is set, performing disk format upgrade." << dendl;
+    } else {
+      ret = -EINVAL;
+      derr << "KeyValueStore::mount : stale version stamp " << version_stamp
+           << ". Please run the KeyValueStore update script before starting "
+           << "the OSD, or set keyvaluestore_update_to to " << target_version
+           << dendl;
+      goto close_fsid_fd;
+    }
+  }
+
+  current_fd = ::open(current_fn.c_str(), O_RDONLY);
+  if (current_fd < 0) {
+    ret = -errno;
+    derr << "KeyValueStore::mount: error opening: " << current_fn << ": "
+         << cpp_strerror(ret) << dendl;
+    goto close_fsid_fd;
+  }
+
+  assert(current_fd >= 0);
+
+  if (_detect_backend()) {
+    derr << "KeyValueStore::mount error in _detect_backend" << dendl;
+    ret = -1;
+    goto close_current_fd;
+  }
+
+  {
+    KeyValueDB *store;
+    if (kv_type == KV_TYPE_LEVELDB) {
+      store = new LevelDBStore(g_ceph_context, current_fn);
+    } else {
+      derr << "KeyValueStore::mount error: unknown backend type" << kv_type
+           << dendl;
+      ret = -1;
+      goto close_current_fd;
+    }
+
+    store->init();
+    stringstream err;
+    if (store->open(err)) {
+      derr << "KeyValueStore::mount Error initializing keyvaluestore backend: "
+           << err.str() << dendl;
+      ret = -1;
+      delete store;
+      goto close_current_fd;
+    }
+
+    StripObjectMap *dbomap = new StripObjectMap(store);
+    ret = dbomap->init(do_update);
+    if (ret < 0) {
+      delete dbomap;
+      derr << "Error initializing StripObjectMap: " << ret << dendl;
+      goto close_current_fd;
+    }
+    stringstream err2;
+
+    if (g_conf->filestore_debug_omap_check && !dbomap->check(err2)) {
+      derr << err2.str() << dendl;;
+      delete dbomap;
+      ret = -EINVAL;
+      goto close_current_fd;
+    }
+
+    backend.reset(dbomap);
+  }
+
+  op_tp.start();
+  op_finisher.start();
+  ondisk_finisher.start();
+
+  // all okay.
+  return 0;
+
+close_current_fd:
+  TEMP_FAILURE_RETRY(::close(current_fd));
+  current_fd = -1;
+close_fsid_fd:
+  TEMP_FAILURE_RETRY(::close(fsid_fd));
+  fsid_fd = -1;
+done:
+  assert(!m_fail_eio || ret != -EIO);
+  return ret;
+}
+
+int KeyValueStore::umount()
+{
+  dout(5) << "umount " << basedir << dendl;
+
+  op_tp.stop();
+  op_finisher.stop();
+  ondisk_finisher.stop();
+
+  if (fsid_fd >= 0) {
+    TEMP_FAILURE_RETRY(::close(fsid_fd));
+    fsid_fd = -1;
+  }
+  if (op_fd >= 0) {
+    TEMP_FAILURE_RETRY(::close(op_fd));
+    op_fd = -1;
+  }
+  if (current_fd >= 0) {
+    TEMP_FAILURE_RETRY(::close(current_fd));
+    current_fd = -1;
+  }
+
+  backend.reset();
+
+  // nothing
+  return 0;
+}
+
+int KeyValueStore::get_max_object_name_length()
+{
+  lock.Lock();
+  int ret = pathconf(basedir.c_str(), _PC_NAME_MAX);
+  if (ret < 0) {
+    int err = errno;
+    lock.Unlock();
+    if (err == 0)
+      return -EDOM;
+    return -err;
+  }
+  lock.Unlock();
+  return ret;
+}
+
+int KeyValueStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
+                                      TrackedOpRef osd_op,
+                                      ThreadPool::TPHandle *handle)
+{
+  Context *onreadable;
+  Context *ondisk;
+  Context *onreadable_sync;
+  ObjectStore::Transaction::collect_contexts(
+    tls, &onreadable, &ondisk, &onreadable_sync);
+
+  // set up the sequencer
+  OpSequencer *osr;
+  if (!posr)
+    posr = &default_osr;
+  if (posr->p) {
+    osr = static_cast<OpSequencer *>(posr->p);
+    dout(5) << "queue_transactions existing " << *osr << "/" << osr->parent
+            << dendl; //<< " w/ q " << osr->q << dendl;
+  } else {
+    osr = new OpSequencer;
+    osr->parent = posr;
+    posr->p = osr;
+    dout(5) << "queue_transactions new " << *osr << "/" << osr->parent << dendl;
+  }
+
+  Op *o = build_op(tls, ondisk, onreadable, onreadable_sync, osd_op);
+  uint64_t op = submit_manager.op_submit_start();
+  o->op = op;
+  dout(5) << "queue_transactions (trailing journal) " << op << " "
+          << tls <<dendl;
+  queue_op(osr, o);
+
+  submit_manager.op_submit_finish(op);
+
+  return 0;
+}
+
+
+// ============== KeyValueStore Op Handler =================
+
+KeyValueStore::Op *KeyValueStore::build_op(list<Transaction*>& tls,
+        Context *ondisk, Context *onreadable, Context *onreadable_sync,
+        TrackedOpRef osd_op)
+{
+  uint64_t bytes = 0, ops = 0;
+  for (list<Transaction*>::iterator p = tls.begin();
+       p != tls.end();
+       ++p) {
+    bytes += (*p)->get_num_bytes();
+    ops += (*p)->get_num_ops();
+  }
+
+  Op *o = new Op;
+  o->start = ceph_clock_now(g_ceph_context);
+  o->tls.swap(tls);
+  o->ondisk = ondisk;
+  o->onreadable = onreadable;
+  o->onreadable_sync = onreadable_sync;
+  o->ops = ops;
+  o->bytes = bytes;
+  o->osd_op = osd_op;
+  return o;
+}
+
+void KeyValueStore::queue_op(OpSequencer *osr, Op *o)
+{
+  // queue op on sequencer, then queue sequencer for the threadpool,
+  // so that regardless of which order the threads pick up the
+  // sequencer, the op order will be preserved.
+
+  osr->queue(o);
+
+  dout(5) << "queue_op " << o << " seq " << o->op << " " << *osr << " "
+          << o->bytes << " bytes" << "   (queue has " << op_queue_len
+          << " ops and " << op_queue_bytes << " bytes)" << dendl;
+  op_wq.queue(osr);
+}
+
+void KeyValueStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
+{
+  // inject a stall?
+  if (g_conf->filestore_inject_stall) {
+    int orig = g_conf->filestore_inject_stall;
+    dout(5) << "_do_op filestore_inject_stall " << orig << ", sleeping" << dendl;
+    for (int n = 0; n < g_conf->filestore_inject_stall; n++)
+      sleep(1);
+    g_conf->set_val("filestore_inject_stall", "0");
+    dout(5) << "_do_op done stalling" << dendl;
+  }
+
+  // FIXME: Suppose the collection of transaction only affect objects in the
+  // one PG, so this lock will ensure no other concurrent write operation
+  osr->apply_lock.Lock();
+  Op *o = osr->peek_queue();
+  dout(5) << "_do_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
+  int r = _do_transactions(o->tls, o->op, &handle);
+  dout(10) << "_do_op " << o << " seq " << o->op << " r = " << r
+           << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
+
+  if (o->ondisk) {
+    if (r < 0) {
+      delete o->ondisk;
+      o->ondisk = 0;
+    } else {
+      ondisk_finisher.queue(o->ondisk, r);
+    }
+  }
+}
+
+void KeyValueStore::_finish_op(OpSequencer *osr)
+{
+  Op *o = osr->dequeue();
+
+  dout(10) << "_finish_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << dendl;
+  osr->apply_lock.Unlock();  // locked in _do_op
+
+  utime_t lat = ceph_clock_now(g_ceph_context);
+  lat -= o->start;
+
+  if (o->onreadable_sync) {
+    o->onreadable_sync->complete(0);
+  }
+  op_finisher.queue(o->onreadable);
+  delete o;
+}
+
+// Combine all the ops in the same transaction using "BufferTransaction" and
+// cache the middle results in order to make visible to the following ops.
+//
+// Lock: KeyValueStore use "in_use" in GenericObjectMap to avoid concurrent
+// operation on the same object. Not sure ReadWrite lock should be applied to
+// improve concurrent performance. In the future, I'd like to remove apply_lock
+// on "osr" and introduce PG RWLock.
+int KeyValueStore::_do_transactions(list<Transaction*> &tls, uint64_t op_seq,
+  ThreadPool::TPHandle *handle)
+{
+  int r = 0;
+
+  uint64_t bytes = 0, ops = 0;
+  for (list<Transaction*>::iterator p = tls.begin();
+       p != tls.end();
+       ++p) {
+    bytes += (*p)->get_num_bytes();
+    ops += (*p)->get_num_ops();
+  }
+
+  int trans_num = 0;
+  SequencerPosition spos(op_seq, trans_num, 0);
+  BufferTransaction bt(this, spos);
+
+  for (list<Transaction*>::iterator p = tls.begin();
+       p != tls.end();
+       ++p, trans_num++) {
+    r = _do_transaction(**p, bt, spos, handle);
+    if (r < 0)
+      break;
+    if (handle)
+      handle->reset_tp_timeout();
+  }
+
+  r = bt.submit_transaction();
+  if (r < 0) {
+    assert(0 == "unexpected error");  // FIXME
+  }
+
+  return r;
+}
+
+unsigned KeyValueStore::_do_transaction(Transaction& transaction,
+                                        BufferTransaction &t,
+                                        SequencerPosition& spos,
+                                        ThreadPool::TPHandle *handle)
+{
+  dout(10) << "_do_transaction on " << &transaction << dendl;
+
+  Transaction::iterator i = transaction.begin();
+
+  while (i.have_op()) {
+    if (handle)
+      handle->reset_tp_timeout();
+
+    int op = i.get_op();
+    int r = 0;
+
+    switch (op) {
+    case Transaction::OP_NOP:
+      break;
+
+    case Transaction::OP_TOUCH:
+      {
+        coll_t cid = i.get_cid();
+        ghobject_t oid = i.get_oid();
+        r = _touch(cid, oid, t);
+      }
+      break;
+
+    case Transaction::OP_WRITE:
+      {
+        coll_t cid = i.get_cid();
+        ghobject_t oid = i.get_oid();
+        uint64_t off = i.get_length();
+        uint64_t len = i.get_length();
+        bool replica = i.get_replica();
+        bufferlist bl;
+        i.get_bl(bl);
+        r = _write(cid, oid, off, len, bl, t, replica);
+      }
+      break;
+
+    case Transaction::OP_ZERO:
+      {
+        coll_t cid = i.get_cid();
+        ghobject_t oid = i.get_oid();
+        uint64_t off = i.get_length();
+        uint64_t len = i.get_length();
+        r = _zero(cid, oid, off, len, t);
+      }
+      break;
+
+    case Transaction::OP_TRIMCACHE:
+      {
+        i.get_cid();
+        i.get_oid();
+        i.get_length();
+        i.get_length();
+        // deprecated, no-op
+      }
+      break;
+
+    case Transaction::OP_TRUNCATE:
+      {
+        coll_t cid = i.get_cid();
+        ghobject_t oid = i.get_oid();
+        uint64_t off = i.get_length();
+        r = _truncate(cid, oid, off, t);
+      }
+      break;
+
+    case Transaction::OP_REMOVE:
+      {
+        coll_t cid = i.get_cid();
+        ghobject_t oid = i.get_oid();
+        r = _remove(cid, oid, t);
+      }
+      break;
+
+    case Transaction::OP_SETATTR:
+      {
+        coll_t cid = i.get_cid();
+        ghobject_t oid = i.get_oid();
+        string name = i.get_attrname();
+        bufferlist bl;
+        i.get_bl(bl);
+        map<string, bufferptr> to_set;
+        to_set[name] = bufferptr(bl.c_str(), bl.length());
+        r = _setattrs(cid, oid, to_set, t);
+        if (r == -ENOSPC)
+          dout(0) << " ENOSPC on setxattr on " << cid << "/" << oid
+                  << " name " << name << " size " << bl.length() << dendl;
+      }
+      break;
+
+    case Transaction::OP_SETATTRS:
+      {
+        coll_t cid = i.get_cid();
+        ghobject_t oid = i.get_oid();
+        map<string, bufferptr> aset;
+        i.get_attrset(aset);
+        r = _setattrs(cid, oid, aset, t);
+        if (r == -ENOSPC)
+          dout(0) << " ENOSPC on setxattrs on " << cid << "/" << oid << dendl;
+      }
+      break;
+
+    case Transaction::OP_RMATTR:
+      {
+        coll_t cid = i.get_cid();
+        ghobject_t oid = i.get_oid();
+        string name = i.get_attrname();
+        r = _rmattr(cid, oid, name.c_str(), t);
+      }
+      break;
+
+    case Transaction::OP_RMATTRS:
+      {
+        coll_t cid = i.get_cid();
+        ghobject_t oid = i.get_oid();
+        r = _rmattrs(cid, oid, t);
+      }
+      break;
+
+    case Transaction::OP_CLONE:
+      {
+        coll_t cid = i.get_cid();
+        ghobject_t oid = i.get_oid();
+        ghobject_t noid = i.get_oid();
+        r = _clone(cid, oid, noid, t);
+      }
+      break;
+
+    case Transaction::OP_CLONERANGE:
+      {
+        coll_t cid = i.get_cid();
+        ghobject_t oid = i.get_oid();
+        ghobject_t noid = i.get_oid();
+        uint64_t off = i.get_length();
+        uint64_t len = i.get_length();
+        r = _clone_range(cid, oid, noid, off, len, off, t);
+      }
+      break;
+
+    case Transaction::OP_CLONERANGE2:
+      {
+        coll_t cid = i.get_cid();
+        ghobject_t oid = i.get_oid();
+        ghobject_t noid = i.get_oid();
+        uint64_t srcoff = i.get_length();
+        uint64_t len = i.get_length();
+        uint64_t dstoff = i.get_length();
+        r = _clone_range(cid, oid, noid, srcoff, len, dstoff, t);
+      }
+      break;
+
+    case Transaction::OP_MKCOLL:
+      {
+        coll_t cid = i.get_cid();
+        r = _create_collection(cid, t);
+      }
+      break;
+
+    case Transaction::OP_RMCOLL:
+      {
+        coll_t cid = i.get_cid();
+        r = _destroy_collection(cid, t);
+      }
+      break;
+
+    case Transaction::OP_COLL_ADD:
+      {
+        coll_t ncid = i.get_cid();
+        coll_t ocid = i.get_cid();
+        ghobject_t oid = i.get_oid();
+        r = _collection_add(ncid, ocid, oid, t);
+      }
+      break;
+
+    case Transaction::OP_COLL_REMOVE:
+       {
+        coll_t cid = i.get_cid();
+        ghobject_t oid = i.get_oid();
+        r = _remove(cid, oid, t);
+       }
+      break;
+
+    case Transaction::OP_COLL_MOVE:
+      {
+        // WARNING: this is deprecated and buggy; only here to replay old journals.
+        coll_t ocid = i.get_cid();
+        coll_t ncid = i.get_cid();
+        ghobject_t oid = i.get_oid();
+        r = _collection_add(ocid, ncid, oid, t);
+        if (r == 0)
+          r = _remove(ocid, oid, t);
+      }
+      break;
+
+    case Transaction::OP_COLL_MOVE_RENAME:
+      {
+        coll_t oldcid = i.get_cid();
+        ghobject_t oldoid = i.get_oid();
+        coll_t newcid = i.get_cid();
+        ghobject_t newoid = i.get_oid();
+        r = _collection_move_rename(oldcid, oldoid, newcid, newoid, t);
+      }
+      break;
+
+    case Transaction::OP_COLL_SETATTR:
+      {
+        coll_t cid = i.get_cid();
+        string name = i.get_attrname();
+        bufferlist bl;
+        i.get_bl(bl);
+        r = _collection_setattr(cid, name.c_str(), bl.c_str(), bl.length(), t);
+      }
+      break;
+
+    case Transaction::OP_COLL_RMATTR:
+      {
+        coll_t cid = i.get_cid();
+        string name = i.get_attrname();
+        r = _collection_rmattr(cid, name.c_str(), t);
+      }
+      break;
+
+    case Transaction::OP_STARTSYNC:
+      {
+        start_sync();
+        break;
+      }
+
+    case Transaction::OP_COLL_RENAME:
+      {
+        coll_t cid(i.get_cid());
+        coll_t ncid(i.get_cid());
+        r = _collection_rename(cid, ncid, t);
+      }
+      break;
+
+    case Transaction::OP_OMAP_CLEAR:
+      {
+        coll_t cid(i.get_cid());
+        ghobject_t oid = i.get_oid();
+        r = _omap_clear(cid, oid, t);
+      }
+      break;
+    case Transaction::OP_OMAP_SETKEYS:
+      {
+        coll_t cid(i.get_cid());
+        ghobject_t oid = i.get_oid();
+        map<string, bufferlist> aset;
+        i.get_attrset(aset);
+        r = _omap_setkeys(cid, oid, aset, t);
+      }
+      break;
+    case Transaction::OP_OMAP_RMKEYS:
+      {
+        coll_t cid(i.get_cid());
+        ghobject_t oid = i.get_oid();
+        set<string> keys;
+        i.get_keyset(keys);
+        r = _omap_rmkeys(cid, oid, keys, t);
+      }
+      break;
+    case Transaction::OP_OMAP_RMKEYRANGE:
+      {
+        coll_t cid(i.get_cid());
+        ghobject_t oid = i.get_oid();
+        string first, last;
+        first = i.get_key();
+        last = i.get_key();
+        r = _omap_rmkeyrange(cid, oid, first, last, t);
+      }
+      break;
+    case Transaction::OP_OMAP_SETHEADER:
+      {
+        coll_t cid(i.get_cid());
+        ghobject_t oid = i.get_oid();
+        bufferlist bl;
+        i.get_bl(bl);
+        r = _omap_setheader(cid, oid, bl, t);
+      }
+      break;
+    case Transaction::OP_SPLIT_COLLECTION:
+      {
+        coll_t cid(i.get_cid());
+        uint32_t bits(i.get_u32());
+        uint32_t rem(i.get_u32());
+        coll_t dest(i.get_cid());
+        r = _split_collection_create(cid, bits, rem, dest, t);
+      }
+      break;
+    case Transaction::OP_SPLIT_COLLECTION2:
+      {
+        coll_t cid(i.get_cid());
+        uint32_t bits(i.get_u32());
+        uint32_t rem(i.get_u32());
+        coll_t dest(i.get_cid());
+        r = _split_collection(cid, bits, rem, dest, t);
+      }
+      break;
+
+    default:
+      derr << "bad op " << op << dendl;
+      assert(0);
+    }
+
+    if (r < 0) {
+      bool ok = false;
+
+      if (r == -ENOENT && !(op == Transaction::OP_CLONERANGE ||
+                            op == Transaction::OP_CLONE ||
+                            op == Transaction::OP_CLONERANGE2))
+        // -ENOENT is normally okay
+        // ...including on a replayed OP_RMCOLL with checkpoint mode
+        ok = true;
+      if (r == -ENODATA)
+        ok = true;
+
+      if (!ok) {
+        const char *msg = "unexpected error code";
+
+        if (r == -ENOENT && (op == Transaction::OP_CLONERANGE ||
+                            op == Transaction::OP_CLONE ||
+                            op == Transaction::OP_CLONERANGE2))
+          msg = "ENOENT on clone suggests osd bug";
+
+        if (r == -ENOSPC)
+          // For now, if we hit _any_ ENOSPC, crash, before we do any damage
+          // by partially applying transactions.
+          msg = "ENOSPC handling not implemented";
+
+        if (r == -ENOTEMPTY) {
+          msg = "ENOTEMPTY suggests garbage data in osd data dir";
+        }
+
+        dout(0) << " error " << cpp_strerror(r) << " not handled on operation "
+                << op << " (" << spos << ", or op " << spos.op
+                << ", counting from 0)" << dendl;
+        dout(0) << msg << dendl;
+        dout(0) << " transaction dump:\n";
+        JSONFormatter f(true);
+        f.open_object_section("transaction");
+        transaction.dump(&f);
+        f.close_section();
+        f.flush(*_dout);
+        *_dout << dendl;
+        assert(0 == "unexpected error");
+
+        if (r == -EMFILE) {
+          dump_open_fds(g_ceph_context);
+        }
+      }
+    }
+
+    spos.op++;
+  }
+
+  return 0;  // FIXME count errors
+}
+
+
+// =========== KeyValueStore Op Implementation ==============
+// objects
+
+int KeyValueStore::_check_coll(const coll_t &cid)
+{
+  if (is_coll_obj(cid))
+    return 0;
+
+  StripObjectMap::StripObjectHeader header;
+  int r = backend->lookup_strip_header(get_coll_for_coll(),
+                                       make_ghobject_for_coll(cid), header);
+  if (r < 0) {
+    dout(10) << __func__ << " could not find header r = " << r << dendl;
+    return -ENOENT;
+  }
+
+  return 0;
+}
+
+bool KeyValueStore::exists(coll_t cid, const ghobject_t& oid)
+{
+  dout(10) << __func__ << "collection: " << cid << " object: " << oid
+           << dendl;
+  int r;
+  StripObjectMap::StripObjectHeader header;
+
+  r = _check_coll(cid);
+  if (r < 0) {
+    return r;
+  }
+
+  r = backend->lookup_strip_header(cid, oid, header);
+  if (r < 0) {
+    return false;
+  }
+
+  return true;
+}
+
+int KeyValueStore::stat(coll_t cid, const ghobject_t& oid,
+                        struct stat *st, bool allow_eio)
+{
+  dout(10) << "stat " << cid << "/" << oid << dendl;
+
+  StripObjectMap::StripObjectHeader header;
+
+  int r = _check_coll(cid);
+  if (r < 0) {
+    return r;
+  }
+
+  r = backend->lookup_strip_header(cid, oid, header);
+  if (r < 0) {
+    dout(10) << "stat " << cid << "/" << oid << "=" << r << dendl;
+    return -ENOENT;
+  }
+
+  st->st_blocks = header.max_size / header.strip_size;
+  if (header.max_size % header.strip_size)
+    st->st_blocks++;
+  st->st_nlink = 1;
+  st->st_size = header.max_size;
+  st->st_blksize = header.strip_size;
+
+  return r;
+}
+
+int KeyValueStore::_generic_read(coll_t cid, const ghobject_t& oid,
+                                 uint64_t offset, size_t len, bufferlist& bl,
+                                 bool allow_eio, BufferTransaction *bt)
+{
+  dout(15) << __func__ << " " << cid << "/" << oid << " " << offset << "~"
+           << len << dendl;
+
+  int r;
+  StripObjectMap::StripObjectHeader header;
+
+  r = _check_coll(cid);
+  if (r < 0) {
+    return r;
+  }
+
+  // use strip_header buffer
+  if (bt) {
+    StripObjectMap::StripObjectHeader *cache_header;
+    r = bt->lookup_cached_header(cid, oid, &cache_header, false);
+    if (r == 0) {
+      header = *cache_header;
+    }
+  } else {
+    r = backend->lookup_strip_header(cid, oid, header);
+  }
+
+  if (r < 0) {
+    dout(10) << __func__ << " " << cid << "/" << oid << " " << offset << "~"
+              << len << " header isn't exist: r = " << r << dendl;
+    return r;
+  }
+
+  if (header.max_size < offset) {
+    r = -EINVAL;
+    dout(10) << __func__ << " " << cid << "/" << oid << ")"
+             << " offset exceed the length of bl"<< dendl;
+    return r;
+  }
+
+  if (len == 0)
+    len = header.max_size - offset;
+
+  if (offset + len > header.max_size)
+    len = header.max_size - offset;
+
+  vector<StripObjectMap::StripExtent> extents;
+  StripObjectMap::file_to_extents(offset, len, header.strip_size,
+                                  extents);
+  map<string, bufferlist> out;
+  set<string> keys;
+
+  for (vector<StripObjectMap::StripExtent>::iterator iter = extents.begin();
+       iter != extents.end(); ++iter) {
+    bufferlist old;
+    string key = strip_object_key(iter->no);
+
+    if (bt && header.buffers.count(make_pair(OBJECT_STRIP_PREFIX, key))) {
+      assert(header.bits[iter->no]);
+      out[key] = header.buffers[make_pair(OBJECT_STRIP_PREFIX, key)];
+    } else if (header.bits[iter->no]) {
+      keys.insert(key);
+    }
+  }
+
+  r = backend->get_values(cid, oid, OBJECT_STRIP_PREFIX, keys, &out);
+  if (r < 0) {
+    dout(10) << __func__ << " " << cid << "/" << oid << " " << offset << "~"
+             << len << " = " << r << dendl;
+    return r;
+  }
+  if (out.size() != keys.size()) {
+    r = -EINVAL;
+    dout(10) << __func__ << " " << cid << "/" << oid << " " << offset << "~"
+             << len << " get incorrect key/value pairs " << dendl;
+    return r;
+  }
+
+  uint64_t readed = 0;
+
+  for (vector<StripObjectMap::StripExtent>::iterator iter = extents.begin();
+       iter != extents.end(); ++iter) {
+    string key = strip_object_key(iter->no);
+    if (readed + header.strip_size > header.max_size) {
+      if (header.bits[iter->no]) {
+        out[key].copy(0, iter->len, bl);
+      } else {
+        bl.append_zero(iter->len);
+      }
+
+      break;
+    }
+
+    if (header.bits[iter->no]) {
+      bl.append(out[key]);
+    } else {
+      bl.append_zero(header.strip_size);
+    }
+    readed += header.strip_size;
+  }
+
+  dout(10) << __func__ << " " << cid << "/" << oid << " " << offset
+           << "~" << bl.length() << "/" << len << " r = " << r << dendl;
+
+  return bl.length();
+}
+
+
+int KeyValueStore::read(coll_t cid, const ghobject_t& oid, uint64_t offset,
+                        size_t len, bufferlist& bl, bool allow_eio)
+{
+  return _generic_read(cid, oid, offset, len, bl, allow_eio);
+}
+
+int KeyValueStore::fiemap(coll_t cid, const ghobject_t& oid,
+                          uint64_t offset, size_t len, bufferlist& bl)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
+           << len << dendl;
+  int r;
+  StripObjectMap::StripObjectHeader header;
+
+  r = _check_coll(cid);
+  if (r < 0) {
+    return r;
+  }
+
+  r = backend->lookup_strip_header(cid, oid, header);
+  if (r < 0) {
+    dout(10) << "fiemap " << cid << "/" << oid << " " << offset << "~" << len
+             << " failed to get header: r = " << r << dendl;
+    return r;
+  }
+
+  vector<StripObjectMap::StripExtent> extents;
+  StripObjectMap::file_to_extents(offset, len, header.strip_size,
+                                  extents);
+
+  map<uint64_t, uint64_t> m;
+  for (vector<StripObjectMap::StripExtent>::iterator iter = extents.begin();
+       iter != extents.end(); ++iter) {
+    m[iter->offset] = iter->len;
+  }
+  ::encode(m, bl);
+  return 0;
+}
+
+int KeyValueStore::_remove(coll_t cid, const ghobject_t& oid,
+                           BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << "/" << oid << dendl;
+
+  int r;
+  StripObjectMap::StripObjectHeader *header;
+
+  r = t.lookup_cached_header(cid, oid, &header, false);
+  if (r < 0) {
+    dout(10) << __func__ << " " << cid << "/" << oid << " "
+             << " failed to get header: r = " << r << dendl;
+    return r;
+  }
+
+  r = t.clear_buffer(header);
+
+  dout(10) << __func__ << " " << cid << "/" << oid << " = " << r << dendl;
+  return r;
+}
+
+int KeyValueStore::_truncate(coll_t cid, const ghobject_t& oid, uint64_t size,
+                             BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << "/" << oid << " size " << size
+           << dendl;
+
+  int r;
+  StripObjectMap::StripObjectHeader *header;
+
+  r = t.lookup_cached_header(cid, oid, &header, false);
+  if (r < 0) {
+    dout(10) << __func__ << " " << cid << "/" << oid << " " << size
+             << " failed to get header: r = " << r << dendl;
+    return r;
+  }
+
+  if (header->max_size == size)
+    return 0;
+
+  if (header->max_size > size) {
+    vector<StripObjectMap::StripExtent> extents;
+    StripObjectMap::file_to_extents(size, header->max_size,
+                                    header->strip_size, extents);
+    assert(extents.size());
+
+    vector<StripObjectMap::StripExtent>::iterator iter = extents.begin();
+    if (iter->offset != 0) {
+      bufferlist value;
+      bufferlist old;
+      map<string, bufferlist> values;
+      r = t.get_buffer_key(header, OBJECT_STRIP_PREFIX,
+                           strip_object_key(iter->no), old);
+      if (r < 0) {
+        dout(10) << __func__ << " " << cid << "/" << oid << " "
+                 << size << " = " << r << dendl;
+        return r;
+      }
+
+      old.copy(0, iter->offset, value);
+      value.append_zero(header->strip_size-iter->offset);
+      assert(value.length() == header->strip_size);
+      ++iter;
+
+      values[strip_object_key(iter->no)] = value;
+      t.set_buffer_keys(OBJECT_STRIP_PREFIX, header, values);
+    }
+
+    set<string> keys;
+    for (; iter != extents.end(); ++iter) {
+      if (header->bits[iter->no]) {
+        keys.insert(strip_object_key(iter->no));
+        header->bits[iter->no] = 0;
+      }
+    }
+    r = t.remove_buffer_keys(OBJECT_STRIP_PREFIX, header, keys);
+    if (r < 0) {
+      dout(10) << __func__ << " " << cid << "/" << oid << " "
+               << size << " = " << r << dendl;
+      return r;
+    }
+  }
+
+  header->bits.resize(size/header->strip_size+1);
+  header->max_size = size;
+
+  dout(10) << __func__ << " " << cid << "/" << oid << " size " << size << " = "
+           << r << dendl;
+  return r;
+}
+
+int KeyValueStore::_touch(coll_t cid, const ghobject_t& oid,
+                          BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << "/" << oid << dendl;
+
+  int r;
+  StripObjectMap::StripObjectHeader *header;
+
+  r = t.lookup_cached_header(cid, oid, &header, true);
+  if (r < 0) {
+    dout(10) << __func__ << " " << cid << "/" << oid << " "
+             << " failed to get header: r = " << r << dendl;
+    r = -EINVAL;
+    return r;
+  }
+
+  dout(10) << __func__ << " " << cid << "/" << oid << " = " << r << dendl;
+  return r;
+}
+
+int KeyValueStore::_write(coll_t cid, const ghobject_t& oid,
+                          uint64_t offset, size_t len, const bufferlist& bl,
+                          BufferTransaction &t, bool replica)
+{
+  dout(15) << __func__ << " " << cid << "/" << oid << " " << offset << "~"
+           << len << dendl;
+  int r;
+  StripObjectMap::StripObjectHeader *header;
+
+  r = t.lookup_cached_header(cid, oid, &header, true);
+  if (r < 0) {
+    dout(10) << __func__ << " " << cid << "/" << oid << " " << offset
+             << "~" << len << " failed to get header: r = " << r << dendl;
+    return r;
+  }
+
+  if (len > bl.length())
+    len = bl.length();
+
+  if (len + offset > header->max_size) {
+    header->max_size = len + offset;
+    header->bits.resize(header->max_size/header->strip_size+1);
+  }
+
+  vector<StripObjectMap::StripExtent> extents;
+  StripObjectMap::file_to_extents(offset, len, header->strip_size,
+                                  extents);
+  uint64_t bl_offset = 0;
+  map<string, bufferlist> values;
+  for (vector<StripObjectMap::StripExtent>::iterator iter = extents.begin();
+       iter != extents.end(); ++iter) {
+    bufferlist value;
+    string key = strip_object_key(iter->no);
+    if (header->bits[iter->no]) {
+      if (iter->offset == 0 && iter->len == header->strip_size) {
+        bl.copy(bl_offset, iter->len, value);
+        bl_offset += iter->len;
+      } else {
+        bufferlist old;
+        r = t.get_buffer_key(header, OBJECT_STRIP_PREFIX, key, old);
+        if (r < 0) {
+          dout(10) << __func__ << " failed to get value " << cid << "/" << oid
+                   << " " << offset << "~" << len << " = " << r << dendl;
+          return r;
+        }
+
+        old.copy(0, iter->offset, value);
+        bl.copy(bl_offset, iter->len, value);
+        bl_offset += iter->len;
+
+        if (value.length() != header->strip_size)
+          old.copy(value.length(), header->strip_size-value.length(), value);
+      }
+    } else {
+      if (iter->offset)
+        value.append_zero(iter->offset);
+      bl.copy(bl_offset, iter->len, value);
+      bl_offset += iter->len;
+
+      if (value.length() < header->strip_size)
+        value.append_zero(header->strip_size-value.length());
+
+      header->bits[iter->no] = 1;
+    }
+    assert(value.length() == header->strip_size);
+    values[key].swap(value);
+  }
+  assert(bl_offset == len);
+
+  t.set_buffer_keys(OBJECT_STRIP_PREFIX, header, values);
+  dout(10) << __func__ << " " << cid << "/" << oid << " " << offset << "~" << len
+           << " = " << r << dendl;
+
+  return r;
+}
+
+int KeyValueStore::_zero(coll_t cid, const ghobject_t& oid, uint64_t offset,
+                         size_t len, BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << "/" << oid << " " << offset << "~" << len << dendl;
+
+  bufferptr bp(len);
+  bp.zero();
+  bufferlist bl;
+  bl.push_back(bp);
+  int r = _write(cid, oid, offset, len, bl, t);
+
+  dout(20) << __func__ << " " << cid << "/" << oid << " " << offset << "~"
+           << len << " = " << r << dendl;
+  return r;
+}
+
+int KeyValueStore::_clone(coll_t cid, const ghobject_t& oldoid,
+                          const ghobject_t& newoid, BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << "/" << oldoid << " -> " << cid << "/"
+           << newoid << dendl;
+
+  if (oldoid == newoid)
+    return 0;
+
+  int r;
+  StripObjectMap::StripObjectHeader *old_header;
+
+  r = t.lookup_cached_header(cid, oldoid, &old_header, false);
+  if (r < 0) {
+    dout(10) << __func__ << " " << cid << "/" << oldoid << " -> " << cid << "/"
+             << newoid << " = " << r << dendl;
+    return r;
+  }
+
+  t.clone_buffer(old_header, cid, newoid);
+
+  dout(10) << __func__ << " " << cid << "/" << oldoid << " -> " << cid << "/"
+           << newoid << " = " << r << dendl;
+  return r;
+}
+
+int KeyValueStore::_clone_range(coll_t cid, const ghobject_t& oldoid,
+                                const ghobject_t& newoid, uint64_t srcoff,
+                                uint64_t len, uint64_t dstoff,
+                                BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << "/" << oldoid << " -> " << cid << "/"
+           << newoid << " " << srcoff << "~" << len << " to " << dstoff
+           << dendl;
+
+  int r;
+  bufferlist bl;
+
+  r = _generic_read(cid, oldoid, srcoff, len, bl, &t);
+  if (r < 0)
+    goto out;
+
+  r = _write(cid, newoid, dstoff, len, bl, t);
+
+ out:
+  dout(10) << __func__ << " " << cid << "/" << oldoid << " -> " << cid << "/"
+           << newoid << " " << srcoff << "~" << len << " to " << dstoff
+           << " = " << r << dendl;
+  return r;
+}
+
+// attrs
+
+int KeyValueStore::getattr(coll_t cid, const ghobject_t& oid, const char *name,
+                           bufferptr &bp)
+{
+  dout(15) << __func__ << " " << cid << "/" << oid << " '" << name << "'"
+           << dendl;
+
+  int r;
+  map<string, bufferlist> got;
+  set<string> to_get;
+
+  r = _check_coll(cid);
+  if (r < 0) {
+    return r;
+  }
+
+  to_get.insert(string(name));
+  r = backend->get_values(cid, oid, OBJECT_XATTR, to_get, &got);
+  if (r < 0 && r != -ENOENT) {
+    dout(10) << __func__ << " get_xattrs err r =" << r << dendl;
+    goto out;
+  }
+  if (got.empty()) {
+    dout(10) << __func__ << " got.size() is 0" << dendl;
+    return -ENODATA;
+  }
+  bp = bufferptr(got.begin()->second.c_str(),
+                 got.begin()->second.length());
+  r = 0;
+
+ out:
+  dout(10) << __func__ << " " << cid << "/" << oid << " '" << name << "' = "
+           << r << dendl;
+  return r;
+}
+
+int KeyValueStore::getattrs(coll_t cid, const ghobject_t& oid,
+                           map<string,bufferptr>& aset, bool user_only)
+{
+  int r;
+  map<string, bufferlist> attr_aset;
+
+  r = _check_coll(cid);
+  if (r < 0) {
+    return r;
+  }
+
+  r = backend->get(cid, oid, OBJECT_XATTR, &attr_aset);
+  if (r < 0 && r != -ENOENT) {
+    dout(10) << __func__ << " could not get attrs r = " << r << dendl;
+    goto out;
+  }
+
+  if (r == -ENOENT)
+    r = 0;
+
+  for (map<string, bufferlist>::iterator i = attr_aset.begin();
+       i != attr_aset.end(); ++i) {
+    string key;
+    if (user_only) {
+      if (i->first[0] != '_')
+        continue;
+      if (i->first == "_")
+        continue;
+      key = i->first.substr(1, i->first.size());
+    } else {
+      key = i->first;
+    }
+    aset.insert(make_pair(key,
+                bufferptr(i->second.c_str(), i->second.length())));
+  }
+
+ out:
+  dout(10) << __func__ << " " << cid << "/" << oid << " = " << r << dendl;
+
+  return r;
+}
+
+int KeyValueStore::_setattrs(coll_t cid, const ghobject_t& oid,
+                             map<string, bufferptr>& aset,
+                             BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << "/" << oid << dendl;
+
+  int r;
+
+  StripObjectMap::StripObjectHeader *header;
+  map<string, bufferlist> attrs;
+
+  r = t.lookup_cached_header(cid, oid, &header, false);
+  if (r < 0)
+    goto out;
+
+  for (map<string, bufferptr>::iterator it = aset.begin();
+       it != aset.end(); ++it) {
+    attrs[it->first].push_back(it->second);
+  }
+
+  t.set_buffer_keys(OBJECT_XATTR, header, attrs);
+
+out:
+  dout(10) << __func__ << " " << cid << "/" << oid << " = " << r << dendl;
+  return r;
+}
+
+
+int KeyValueStore::_rmattr(coll_t cid, const ghobject_t& oid, const char *name,
+                           BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << "/" << oid << " '" << name << "'"
+           << dendl;
+
+  int r;
+  set<string> to_remove;
+  StripObjectMap::StripObjectHeader *header;
+
+  r = t.lookup_cached_header(cid, oid, &header, false);
+  if (r < 0) {
+    dout(10) << __func__ << " could not find header r = " << r
+             << dendl;
+    return r;
+  }
+
+  to_remove.insert(string(name));
+  r = t.remove_buffer_keys(OBJECT_XATTR, header, to_remove);
+
+  dout(10) << __func__ << " " << cid << "/" << oid << " '" << name << "' = "
+           << r << dendl;
+  return r;
+}
+
+int KeyValueStore::_rmattrs(coll_t cid, const ghobject_t& oid,
+                            BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << "/" << oid << dendl;
+
+  int r;
+  set<string> attrs;
+
+  StripObjectMap::StripObjectHeader *header;
+
+  r = t.lookup_cached_header(cid, oid, &header, false);
+  if (r < 0) {
+    dout(10) << __func__ << " could not find header r = " << r
+             << dendl;
+    return r;
+  }
+
+  r = backend->get_keys(cid, oid, OBJECT_XATTR, &attrs);
+  if (r < 0 && r != -ENOENT) {
+    dout(10) << __func__ << " could not get attrs r = " << r << dendl;
+    assert(!m_fail_eio || r != -EIO);
+    return r;
+  }
+
+  r = t.remove_buffer_keys(OBJECT_XATTR, header, attrs);
+  t.clear_buffer_keys(OBJECT_XATTR, header);
+
+  dout(10) << __func__ <<  " " << cid << "/" << oid << " = " << r << dendl;
+  return r;
+}
+
+// collection attrs
+
+int KeyValueStore::collection_getattr(coll_t c, const char *name,
+                                      void *value, size_t size)
+{
+  dout(15) << __func__ << " " << c.to_str() << " '" << name << "' len "
+           << size << dendl;
+
+  bufferlist bl;
+  int r;
+
+  r = collection_getattr(c, name, bl);
+  if (r < 0)
+      goto out;
+
+  if (bl.length() < size) {
+    r = bl.length();
+    bl.copy(0, bl.length(), static_cast<char*>(value));
+  } else {
+    r = size;
+    bl.copy(0, size, static_cast<char*>(value));
+  }
+
+out:
+  dout(10) << __func__ << " " << c.to_str() << " '" << name << "' len "
+           << size << " = " << r << dendl;
+  return r;
+}
+
+int KeyValueStore::collection_getattr(coll_t c, const char *name,
+                                      bufferlist& bl)
+{
+  dout(15) << __func__ << " " << c.to_str() << " '" << name
+           << "'" << dendl;
+
+  int r = _check_coll(c);
+  if (r < 0) {
+    return r;
+  }
+
+  set<string> keys;
+  map<string, bufferlist> out;
+  keys.insert(string(name));
+
+  r = backend->get_values(get_coll_for_coll(), make_ghobject_for_coll(c),
+                          COLLECTION_ATTR, keys, &out);
+  if (r < 0) {
+    dout(10) << __func__ << " could not get key" << string(name) << dendl;
+    r = -EINVAL;
+  }
+
+  assert(out.size());
+  bl.swap(out.begin()->second);
+
+  dout(10) << __func__ << " " << c.to_str() << " '" << name << "' len "
+           << bl.length() << " = " << r << dendl;
+  return bl.length();
+}
+
+int KeyValueStore::collection_getattrs(coll_t cid,
+                                       map<string, bufferptr> &aset)
+{
+  dout(10) << __func__ << " " << cid.to_str() << dendl;
+
+  int r = _check_coll(cid);
+  if (r < 0) {
+    return r;
+  }
+
+  map<string, bufferlist> out;
+  set<string> keys;
+
+  for (map<string, bufferptr>::iterator it = aset.begin();
+       it != aset.end(); it++) {
+      keys.insert(it->first);
+  }
+
+  r = backend->get_values(get_coll_for_coll(), make_ghobject_for_coll(cid),
+                          COLLECTION_ATTR, keys, &out);
+  if (r < 0) {
+    dout(10) << __func__ << " could not get keys" << dendl;
+    r = -EINVAL;
+    goto out;
+  }
+
+  for (map<string, bufferlist>::iterator it = out.begin(); it != out.end();
+       ++it) {
+    bufferptr ptr(it->second.c_str(), it->second.length());
+    aset.insert(make_pair(it->first, ptr));
+  }
+
+ out:
+  dout(10) << __func__ << " " << cid.to_str() << " = " << r << dendl;
+  return r;
+}
+
+int KeyValueStore::_collection_setattr(coll_t c, const char *name,
+                                       const void *value, size_t size,
+                                       BufferTransaction &t)
+{
+  dout(10) << __func__ << " " << c << " '" << name << "' len "
+           << size << dendl;
+
+  int r;
+  bufferlist bl;
+  map<string, bufferlist> out;
+  StripObjectMap::StripObjectHeader *header;
+
+  r = t.lookup_cached_header(get_coll_for_coll(),
+                             make_ghobject_for_coll(c),
+                             &header, false);
+  if (r < 0) {
+    dout(10) << __func__ << " could not find header r = " << r << dendl;
+    return r;
+  }
+
+  bl.append(reinterpret_cast<const char*>(value), size);
+  out.insert(make_pair(string(name), bl));
+
+  t.set_buffer_keys(COLLECTION_ATTR, header, out);
+
+  dout(10) << __func__ << " " << c << " '"
+           << name << "' len " << size << " = " << r << dendl;
+  return r;
+}
+
+int KeyValueStore::_collection_rmattr(coll_t c, const char *name,
+                                      BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << c << dendl;
+
+  int r = _check_coll(c);
+  if (r < 0) {
+    return r;
+  }
+
+  bufferlist bl;
+  set<string> out;
+  StripObjectMap::StripObjectHeader *header;
+
+  r = t.lookup_cached_header(get_coll_for_coll(),
+                             make_ghobject_for_coll(c),
+                             &header, false);
+  if (r < 0) {
+    dout(10) << __func__ << " could not find header r = " << r << dendl;
+    return r;
+  }
+
+  out.insert(string(name));
+  r = t.remove_buffer_keys(COLLECTION_ATTR, header, out);
+
+  dout(10) << __func__ << " " << c << " = " << r << dendl;
+  return r;
+}
+
+int KeyValueStore::_collection_setattrs(coll_t cid,
+                                        map<string,bufferptr>& aset,
+                                        BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << dendl;
+
+  map<string, bufferlist> attrs;
+  StripObjectMap::StripObjectHeader *header;
+  int r = t.lookup_cached_header(get_coll_for_coll(),
+                                 make_ghobject_for_coll(cid),
+                                 &header, false);
+  if (r < 0) {
+    dout(10) << __func__ << " could not find header r = " << r << dendl;
+    return r;
+  }
+
+  for (map<string, bufferptr>::iterator it = aset.begin(); it != aset.end();
+       ++it) {
+    attrs[it->first].push_back(it->second);
+  }
+
+  t.set_buffer_keys(COLLECTION_ATTR, header, attrs);
+
+  dout(10) << __func__ << " " << cid << " = " << r << dendl;
+  return r;
+}
+
+
+// collections
+
+int KeyValueStore::_create_collection(coll_t c, BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << c << dendl;
+
+  int r;
+  StripObjectMap::StripObjectHeader *header;
+  bufferlist bl;
+
+  r = t.lookup_cached_header(get_coll_for_coll(),
+                             make_ghobject_for_coll(c), &header,
+                             false);
+  if (r == 0) {
+    r = -EEXIST;
+    return r;
+  }
+
+  r = t.lookup_cached_header(get_coll_for_coll(),
+                             make_ghobject_for_coll(c), &header,
+                             true);
+
+  dout(10) << __func__ << " cid " << c << " r = " << r << dendl;
+  return r;
+}
+
+int KeyValueStore::_destroy_collection(coll_t c, BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << c << dendl;
+
+  int r;
+  uint64_t modified_object = 0;
+  StripObjectMap::StripObjectHeader *header;
+  vector<ghobject_t> oids;
+
+  r = t.lookup_cached_header(get_coll_for_coll(), make_ghobject_for_coll(c),
+                             &header, false);
+  if (r < 0) {
+    goto out;
+  }
+
+  // All modified objects are marked deleted
+  for (BufferTransaction::StripHeaderMap::iterator iter = t.strip_headers.begin();
+       iter != t.strip_headers.end(); iter++) {
+    // sum the total modified object in this PG
+    if (iter->first.first != c)
+      continue;
+
+    modified_object++;
+    if (!iter->second.deleted) {
+      r = -ENOTEMPTY;
+      goto out;
+    }
+  }
+
+  r = backend->list_objects(c, ghobject_t(), modified_object+1, &oids,
+                            0);
+  // No other object
+  if (oids.size() != modified_object && oids.size() != 0) {
+    r = -ENOTEMPTY;
+    goto out;
+  }
+
+  for(vector<ghobject_t>::iterator iter = oids.begin();
+      iter != oids.end(); ++iter) {
+    if (!t.strip_headers.count(make_pair(c, *iter))) {
+      r = -ENOTEMPTY;
+      goto out;
+    }
+  }
+
+  r = t.clear_buffer(header);
+
+out:
+  dout(10) << __func__ << " " << c << " = " << r << dendl;
+  return r;
+}
+
+
+int KeyValueStore::_collection_add(coll_t c, coll_t oldcid,
+                                   const ghobject_t& o,
+                                   BufferTransaction &t)
+{
+  dout(15) << __func__ <<  " " << c << "/" << o << " from " << oldcid << "/"
+           << o << dendl;
+
+  bufferlist bl;
+  StripObjectMap::StripObjectHeader *header, *old_header;
+
+  int r = t.lookup_cached_header(oldcid, o, &old_header, false);
+  if (r < 0) {
+    goto out;
+  }
+
+  r = t.lookup_cached_header(c, o, &header, false);
+  if (r == 0) {
+    r = -EEXIST;
+    dout(10) << __func__ << " " << c << "/" << o << " from " << oldcid << "/"
+             << o << " already exist " << dendl;
+    goto out;
+  }
+
+  r = _generic_read(oldcid, o, 0, old_header->max_size, bl, &t);
+  if (r < 0) {
+    r = -EINVAL;
+    goto out;
+  }
+
+  r = _write(c, o, 0, bl.length(), bl, t);
+  if (r < 0) {
+    r = -EINVAL;
+  }
+
+out:
+  dout(10) << __func__ << " " << c << "/" << o << " from " << oldcid << "/"
+           << o << " = " << r << dendl;
+  return r;
+}
+
+int KeyValueStore::_collection_move_rename(coll_t oldcid,
+                                           const ghobject_t& oldoid,
+                                           coll_t c, const ghobject_t& o,
+                                           BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << c << "/" << o << " from " << oldcid << "/"
+           << oldoid << dendl;
+  int r;
+  StripObjectMap::StripObjectHeader *header;
+
+  r = t.lookup_cached_header(c, o, &header, false);
+  if (r == 0) {
+    dout(10) << __func__ << " " << oldcid << "/" << oldoid << " -> " << c
+             << "/" << o << " = " << r << dendl;
+    return -EEXIST;
+  }
+
+  r = t.lookup_cached_header(oldcid, oldoid, &header, false);
+  if (r < 0) {
+    dout(10) << __func__ << " " << oldcid << "/" << oldoid << " -> " << c
+             << "/" << o << " = " << r << dendl;
+    return r;
+  }
+
+  t.rename_buffer(header, c, o);
+
+  dout(10) << __func__ << " " << c << "/" << o << " from " << oldcid << "/"
+           << oldoid << " = " << r << dendl;
+  return r;
+}
+
+int KeyValueStore::_collection_remove_recursive(const coll_t &cid,
+                                                BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << dendl;
+
+  StripObjectMap::StripObjectHeader *header;
+
+  int r = t.lookup_cached_header(get_coll_for_coll(),
+                                 make_ghobject_for_coll(cid),
+                                 &header, false);
+  if (r < 0) {
+    return 0;
+  }
+
+  vector<ghobject_t> objects;
+  ghobject_t max;
+  while (!max.is_max()) {
+    r = collection_list_partial(cid, max, 200, 300, 0, &objects, &max);
+    if (r < 0)
+      return r;
+
+    for (vector<ghobject_t>::iterator i = objects.begin();
+         i != objects.end(); ++i) {
+      r = _remove(cid, *i, t);
+
+      if (r < 0)
+        return r;
+    }
+  }
+
+  r = t.clear_buffer(header);
+
+  dout(10) << __func__ << " " << cid  << " r = " << r << dendl;
+  return 0;
+}
+
+int KeyValueStore::_collection_rename(const coll_t &cid, const coll_t &ncid,
+                                      BufferTransaction &t)
+{
+  dout(10) << __func__ << " origin cid " << cid << " new cid " << ncid
+           << dendl;
+  return -EOPNOTSUPP;
+}
+
+int KeyValueStore::list_collections(vector<coll_t>& ls)
+{
+  dout(10) << __func__ << " " << dendl;
+
+  vector<ghobject_t> oids;
+  ghobject_t next;
+  backend->list_objects(get_coll_for_coll(), ghobject_t(), 0, &oids, &next);
+  assert(next == ghobject_t::get_max());
+
+  for (vector<ghobject_t>::const_iterator iter = oids.begin();
+       iter != oids.end(); ++iter) {
+    ls.push_back(coll_t(iter->hobj.oid.name));
+  }
+
+  return 0;
+}
+
+bool KeyValueStore::collection_exists(coll_t c)
+{
+  dout(10) << __func__ << " " << dendl;
+
+  StripObjectMap::StripObjectHeader header;
+
+  int r = _check_coll(c);
+  if (r < 0) {
+    return false;
+  }
+  return true;
+}
+
+bool KeyValueStore::collection_empty(coll_t c)
+{
+  dout(10) << __func__ << " " << dendl;
+
+  int r = _check_coll(c);
+  if (r < 0) {
+    return false;
+  }
+
+  vector<ghobject_t> oids;
+  backend->list_objects(c, ghobject_t(), 1, &oids, 0);
+
+  return oids.empty();
+}
+
+int KeyValueStore::collection_list_range(coll_t c, ghobject_t start,
+                                         ghobject_t end, snapid_t seq,
+                                         vector<ghobject_t> *ls)
+{
+  bool done = false;
+  ghobject_t next = start;
+
+  int r = _check_coll(c);
+  if (r < 0) {
+    return r;
+  }
+
+  while (!done) {
+    vector<ghobject_t> next_objects;
+    r = collection_list_partial(c, next, get_ideal_list_min(),
+                                get_ideal_list_max(), seq,
+                                &next_objects, &next);
+    if (r < 0)
+      return r;
+
+    ls->insert(ls->end(), next_objects.begin(), next_objects.end());
+
+    // special case for empty collection
+    if (ls->empty()) {
+      break;
+    }
+
+    while (!ls->empty() && ls->back() >= end) {
+      ls->pop_back();
+      done = true;
+    }
+
+    if (next >= end) {
+      done = true;
+    }
+  }
+
+  return 0;
+}
+
+int KeyValueStore::collection_list_partial(coll_t c, ghobject_t start,
+                                           int min, int max, snapid_t seq,
+                                           vector<ghobject_t> *ls,
+                                           ghobject_t *next)
+{
+  dout(10) << __func__ << " " << c << " start:" << start << " is_max:"
+           << start.is_max() << dendl;
+
+  if (min < 0 || max < 0)
+      return -EINVAL;
+
+  if (start.is_max())
+      return 0;
+
+  return backend->list_objects(c, start, max, ls, next);
+}
+
+int KeyValueStore::collection_list(coll_t c, vector<ghobject_t>& ls)
+{
+  return collection_list_partial(c, ghobject_t(), 0, 0, 0, &ls, 0);
+}
+
+int KeyValueStore::collection_version_current(coll_t c, uint32_t *version)
+{
+  *version = COLLECTION_VERSION;
+  if (*version == target_version)
+    return 1;
+  else
+    return 0;
+}
+
+// omap
+
+int KeyValueStore::omap_get(coll_t c, const ghobject_t &hoid,
+                            bufferlist *header, map<string, bufferlist> *out)
+{
+  dout(15) << __func__ << " " << c << "/" << hoid << dendl;
+
+  int r = _check_coll(c);
+  if (r < 0) {
+    return r;
+  }
+
+  r = backend->get(c, hoid, OBJECT_OMAP, out);
+  if (r < 0 && r != -ENOENT) {
+    dout(10) << __func__ << " err r =" << r << dendl;
+    return r;
+  }
+
+  return omap_get_header(c, hoid, header, false);
+}
+
+int KeyValueStore::omap_get_header(coll_t c, const ghobject_t &hoid,
+                                   bufferlist *bl, bool allow_eio)
+{
+  dout(15) << __func__ << " " << c << "/" << hoid << dendl;
+
+  int r = _check_coll(c);
+  if (r < 0) {
+    return r;
+  }
+
+  set<string> keys;
+  map<string, bufferlist> got;
+
+  keys.insert(OBJECT_OMAP_HEADER_KEY);
+  r = backend->get_values(c, hoid, OBJECT_OMAP_HEADER, keys, &got);
+  if (r < 0 && r != -ENOENT) {
+    assert(allow_eio || !m_fail_eio || r != -EIO);
+    dout(10) << __func__ << " err r =" << r << dendl;
+    return r;
+  }
+
+  if (got.size()) {
+    assert(got.size() == 1);
+    bl->swap(got.begin()->second);
+  }
+
+  return 0;
+}
+
+int KeyValueStore::omap_get_keys(coll_t c, const ghobject_t &hoid, set<string> *keys)
+{
+  dout(15) << __func__ << " " << c << "/" << hoid << dendl;
+
+  int r = _check_coll(c);
+  if (r < 0) {
+    return r;
+  }
+
+  r = backend->get_keys(c, hoid, OBJECT_OMAP, keys);
+  if (r < 0 && r != -ENOENT) {
+    assert(!m_fail_eio || r != -EIO);
+    return r;
+  }
+  return 0;
+}
+
+int KeyValueStore::omap_get_values(coll_t c, const ghobject_t &hoid,
+                                   const set<string> &keys,
+                                   map<string, bufferlist> *out)
+{
+  dout(15) << __func__ << " " << c << "/" << hoid << dendl;
+
+  int r = _check_coll(c);
+  if (r < 0) {
+    return r;
+  }
+
+  r = backend->get_values(c, hoid, OBJECT_OMAP, keys, out);
+  if (r < 0 && r != -ENOENT) {
+    assert(!m_fail_eio || r != -EIO);
+    return r;
+  }
+  return 0;
+}
+
+int KeyValueStore::omap_check_keys(coll_t c, const ghobject_t &hoid,
+                                   const set<string> &keys, set<string> *out)
+{
+  dout(15) << __func__ << " " << c << "/" << hoid << dendl;
+
+  int r = _check_coll(c);
+  if (r < 0) {
+    return r;
+  }
+
+  r = backend->check_keys(c, hoid, OBJECT_OMAP, keys, out);
+  if (r < 0 && r != -ENOENT) {
+    assert(!m_fail_eio || r != -EIO);
+    return r;
+  }
+  return 0;
+}
+
+ObjectMap::ObjectMapIterator KeyValueStore::get_omap_iterator(
+    coll_t c, const ghobject_t &hoid)
+{
+  dout(15) << __func__ << " " << c << "/" << hoid << dendl;
+  return backend->get_iterator(c, hoid, OBJECT_OMAP);
+}
+
+int KeyValueStore::_omap_clear(coll_t cid, const ghobject_t &hoid,
+                               BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
+
+  StripObjectMap::StripObjectHeader *header;
+
+  int r = t.lookup_cached_header(cid, hoid, &header, false);
+  if (r < 0) {
+    dout(10) << __func__ << " " << cid << "/" << hoid << " "
+             << " failed to get header: r = " << r << dendl;
+    return r;
+  }
+
+  set<string> keys;
+  r = backend->get_keys(cid, hoid, OBJECT_OMAP, &keys);
+  if (r < 0 && r != -ENOENT) {
+    dout(10) << __func__ << " could not get omap_keys r = " << r << dendl;
+    assert(!m_fail_eio || r != -EIO);
+    return r;
+  }
+
+  r = t.remove_buffer_keys(OBJECT_OMAP, header, keys);
+  if (r < 0) {
+    dout(10) << __func__ << " could not remove keys r = " << r << dendl;
+    return r;
+  }
+
+  keys.clear();
+  keys.insert(OBJECT_OMAP_HEADER_KEY);
+  r = t.remove_buffer_keys(OBJECT_OMAP_HEADER, header, keys);
+  if (r < 0) {
+    dout(10) << __func__ << " could not remove keys r = " << r << dendl;
+    return r;
+  }
+
+  t.clear_buffer_keys(OBJECT_OMAP_HEADER, header);
+
+  dout(10) << __func__ << " " << cid << "/" << hoid << " r = " << r << dendl;
+  return 0;
+}
+
+int KeyValueStore::_omap_setkeys(coll_t cid, const ghobject_t &hoid,
+                                 map<string, bufferlist> &aset,
+                                 BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
+
+  StripObjectMap::StripObjectHeader *header;
+
+  int r = t.lookup_cached_header(cid, hoid, &header, false);
+  if (r < 0) {
+    dout(10) << __func__ << " " << cid << "/" << hoid << " "
+             << " failed to get header: r = " << r << dendl;
+    return r;
+  }
+
+  t.set_buffer_keys(OBJECT_OMAP, header, aset);
+
+  return 0;
+}
+
+int KeyValueStore::_omap_rmkeys(coll_t cid, const ghobject_t &hoid,
+                                const set<string> &keys,
+                                BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
+
+  StripObjectMap::StripObjectHeader *header;
+
+  int r = t.lookup_cached_header(cid, hoid, &header, false);
+  if (r < 0) {
+    dout(10) << __func__ << " " << cid << "/" << hoid << " "
+             << " failed to get header: r = " << r << dendl;
+    return r;
+  }
+
+  r = t.remove_buffer_keys(OBJECT_OMAP, header, keys);
+
+  dout(10) << __func__ << " " << cid << "/" << hoid << " r = " << r << dendl;
+  return r;
+}
+
+int KeyValueStore::_omap_rmkeyrange(coll_t cid, const ghobject_t &hoid,
+                                    const string& first, const string& last,
+                                    BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << "/" << hoid << " [" << first << ","
+           << last << "]" << dendl;
+
+  set<string> keys;
+  {
+    ObjectMap::ObjectMapIterator iter = get_omap_iterator(cid, hoid);
+    if (!iter)
+      return -ENOENT;
+
+    for (iter->lower_bound(first); iter->valid() && iter->key() < last;
+         iter->next()) {
+      keys.insert(iter->key());
+    }
+  }
+  return _omap_rmkeys(cid, hoid, keys, t);
+}
+
+int KeyValueStore::_omap_setheader(coll_t cid, const ghobject_t &hoid,
+                                   const bufferlist &bl,
+                                   BufferTransaction &t)
+{
+  dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
+
+  map<string, bufferlist> sets;
+  StripObjectMap::StripObjectHeader *header;
+
+  int r = t.lookup_cached_header(cid, hoid, &header, false);
+  if (r < 0) {
+    dout(10) << __func__ << " " << cid << "/" << hoid << " "
+             << " failed to get header: r = " << r << dendl;
+    return r;
+  }
+
+  sets[OBJECT_OMAP_HEADER_KEY] = bl;
+  t.set_buffer_keys(OBJECT_OMAP_HEADER, header, sets);
+  return 0;
+}
+
+int KeyValueStore::_split_collection(coll_t cid, uint32_t bits, uint32_t rem,
+                                     coll_t dest, BufferTransaction &t)
+{
+  {
+    dout(15) << __func__ << " " << cid << " bits: " << bits << dendl;
+
+    StripObjectMap::StripObjectHeader *header;
+
+    int r = t.lookup_cached_header(get_coll_for_coll(),
+                                   make_ghobject_for_coll(cid),
+                                   &header, false);
+    if (r < 0) {
+      dout(2) << __func__ << ": " << cid << " DNE" << dendl;
+      return 0;
+    }
+
+    r = t.lookup_cached_header(get_coll_for_coll(),
+                               make_ghobject_for_coll(dest),
+                               &header, false);
+    if (r < 0) {
+      dout(2) << __func__ << ": " << dest << " DNE" << dendl;
+      return 0;
+    }
+
+    vector<ghobject_t> objects;
+    ghobject_t next, current;
+    int move_size = 0;
+    while (1) {
+      collection_list_partial(cid, current, get_ideal_list_min(),
+                              get_ideal_list_max(), 0, &objects, &next);
+
+      dout(20) << __func__ << cid << "objects size: " << objects.size()
+              << dendl;
+
+      if (objects.empty())
+        break;
+
+      for (vector<ghobject_t>::iterator i = objects.begin();
+          i != objects.end(); ++i) {
+        if (i->match(bits, rem)) {
+          if (_collection_add(dest, cid, *i, t) < 0) {
+            return -1;
+          }
+          _remove(cid, *i, t);
+          move_size++;
+        }
+      }
+
+      objects.clear();
+      current = next;
+    }
+
+    dout(20) << __func__ << "move" << move_size << " object from " << cid
+             << "to " << dest << dendl;
+  }
+
+  if (g_conf->filestore_debug_verify_split) {
+    vector<ghobject_t> objects;
+    ghobject_t next;
+    while (1) {
+      collection_list_partial(cid, next, get_ideal_list_min(),
+                              get_ideal_list_max(), 0, &objects, &next);
+      if (objects.empty())
+        break;
+
+      for (vector<ghobject_t>::iterator i = objects.begin();
+           i != objects.end(); ++i) {
+        dout(20) << __func__ << ": " << *i << " still in source "
+                 << cid << dendl;
+        assert(!i->match(bits, rem));
+      }
+      objects.clear();
+    }
+
+    next = ghobject_t();
+    while (1) {
+      collection_list_partial(dest, next, get_ideal_list_min(),
+                              get_ideal_list_max(), 0, &objects, &next);
+      if (objects.empty())
+        break;
+
+      for (vector<ghobject_t>::iterator i = objects.begin();
+           i != objects.end(); ++i) {
+        dout(20) << __func__ << ": " << *i << " now in dest "
+                 << *i << dendl;
+        assert(i->match(bits, rem));
+      }
+      objects.clear();
+    }
+  }
+  return 0;
+}
+
+const char** KeyValueStore::get_tracked_conf_keys() const
+{
+  static const char* KEYS[] = {
+    "filestore_min_sync_interval",
+    "filestore_max_sync_interval",
+    "filestore_queue_max_ops",
+    "filestore_queue_max_bytes",
+    "filestore_queue_committing_max_ops",
+    "filestore_queue_committing_max_bytes",
+    "filestore_commit_timeout",
+    "filestore_dump_file",
+    "filestore_kill_at",
+    "filestore_fail_eio",
+    "filestore_replica_fadvise",
+    "filestore_sloppy_crc",
+    "filestore_sloppy_crc_block_size",
+    NULL
+  };
+  return KEYS;
+}
+
+void KeyValueStore::handle_conf_change(const struct md_config_t *conf,
+                                       const std::set <std::string> &changed)
+{
+}
+
+void KeyValueStore::dump_transactions(list<ObjectStore::Transaction*>& ls, uint64_t seq, OpSequencer *osr)
+{
+}
+
+// ============== KeyValueStore Debug EIO Injection =================
+
+void KeyValueStore::inject_data_error(const ghobject_t &oid) {
+  Mutex::Locker l(read_error_lock);
+  dout(10) << __func__ << ": init error on " << oid << dendl;
+  data_error_set.insert(oid);
+}
+
+void KeyValueStore::inject_mdata_error(const ghobject_t &oid) {
+  Mutex::Locker l(read_error_lock);
+  dout(10) << __func__ << ": init error on " << oid << dendl;
+  mdata_error_set.insert(oid);
+}
+
+void KeyValueStore::debug_obj_on_delete(const ghobject_t &oid) {
+  Mutex::Locker l(read_error_lock);
+  dout(10) << __func__ << ": clear error on " << oid << dendl;
+  data_error_set.erase(oid);
+  mdata_error_set.erase(oid);
+}
+
+bool KeyValueStore::debug_data_eio(const ghobject_t &oid) {
+  Mutex::Locker l(read_error_lock);
+  if (data_error_set.count(oid)) {
+    dout(10) << __func__ << ": inject error on " << oid << dendl;
+    return true;
+  } else {
+    return false;
+  }
+}
+
+bool KeyValueStore::debug_mdata_eio(const ghobject_t &oid) {
+  Mutex::Locker l(read_error_lock);
+  if (mdata_error_set.count(oid)) {
+    dout(10) << __func__ << ": inject error on " << oid << dendl;
+    return true;
+  } else {
+    return false;
+  }
+}
diff --git a/src/os/KeyValueStore.h b/src/os/KeyValueStore.h
new file mode 100644 (file)
index 0000000..23d9159
--- /dev/null
@@ -0,0 +1,594 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 UnitedStack <haomai@unitedstack.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+
+#ifndef CEPH_KEYVALUESTORE_H
+#define CEPH_KEYVALUESTORE_H
+
+#include "include/types.h"
+
+#include <map>
+#include <deque>
+#include <boost/scoped_ptr.hpp>
+#include <fstream>
+using namespace std;
+
+#include <ext/hash_map>
+using namespace __gnu_cxx;
+
+#include "include/assert.h"
+
+#include "ObjectStore.h"
+
+#include "common/WorkQueue.h"
+#include "common/Finisher.h"
+#include "common/fd.h"
+
+#include "common/Mutex.h"
+#include "GenericObjectMap.h"
+#include "SequencerPosition.h"
+#include "KeyValueDB.h"
+
+#include "include/uuid.h"
+
+enum kvstore_types {
+    KV_TYPE_NONE = 0,
+    KV_TYPE_LEVELDB,
+    KV_TYPE_OTHER
+};
+
+
+class StripObjectMap: public GenericObjectMap {
+ public:
+
+  struct StripExtent {
+    uint64_t no;
+    uint64_t offset;    // in key
+    uint64_t len;    // in key
+    StripExtent(uint64_t n, uint64_t off, size_t len):
+      no(n), offset(off), len(len) {}
+  };
+
+  // -- strip object --
+  struct StripObjectHeader {
+    // Persistent state
+    uint64_t strip_size;
+    uint64_t max_size;
+    vector<char> bits;
+    SequencerPosition spos;
+
+    // soft state
+    Header header; // FIXME: Hold lock to avoid concurrent operations, it will
+                   // also block read operation which not should be permitted.
+    coll_t cid;
+    ghobject_t oid;
+    bool deleted;
+    map<pair<string, string>, bufferlist> buffers;  // pair(prefix, key)
+
+    StripObjectHeader(): strip_size(default_strip_size), max_size(0), deleted(false) {}
+
+    void encode(bufferlist &bl) const {
+      ENCODE_START(1, 1, bl);
+      ::encode(strip_size, bl);
+      ::encode(max_size, bl);
+      ::encode(bits, bl);
+      ::encode(spos, bl);
+      ENCODE_FINISH(bl);
+    }
+
+    void decode(bufferlist::iterator &bl) {
+      DECODE_START(1, bl);
+      ::decode(strip_size, bl);
+      ::decode(max_size, bl);
+      ::decode(bits, bl);
+      ::decode(spos, bl);
+      DECODE_FINISH(bl);
+    }
+  };
+
+  bool check_spos(const StripObjectHeader &header,
+                  const SequencerPosition *spos);
+  void sync_wrap(StripObjectHeader &strip_header, KeyValueDB::Transaction t,
+                 const SequencerPosition &spos);
+
+  static int file_to_extents(uint64_t offset, size_t len, uint64_t strip_size,
+                             vector<StripExtent> &extents);
+  int lookup_strip_header(const coll_t & cid, const ghobject_t &oid,
+                          StripObjectHeader &header);
+  int save_strip_header(StripObjectHeader &header, KeyValueDB::Transaction t);
+  int create_strip_header(const coll_t &cid, const ghobject_t &oid,
+                          StripObjectHeader &strip_header,
+                          KeyValueDB::Transaction t);
+  void clone_wrap(StripObjectHeader &old_header,
+                  const coll_t &cid, const ghobject_t &oid,
+                  KeyValueDB::Transaction t,
+                  const SequencerPosition &spos,
+                  StripObjectHeader *origin_header,
+                  StripObjectHeader *target_header);
+  void rename_wrap(const coll_t &cid, const ghobject_t &oid,
+                   KeyValueDB::Transaction t,
+                   const SequencerPosition &spos,
+                   StripObjectHeader *header);
+
+
+  StripObjectMap(KeyValueDB *db): GenericObjectMap(db) {}
+
+  static const uint64_t default_strip_size = 1024;
+};
+
+
+class KeyValueStore : public ObjectStore,
+                      public md_config_obs_t {
+ public:
+  objectstore_perf_stat_t get_cur_stats() {
+    objectstore_perf_stat_t ret;
+    return ret;
+  }
+
+  static const uint32_t target_version = 1;
+
+ private:
+  string internal_name; // internal name, used to name the perfcounter instance
+  string basedir;
+  std::string current_fn;
+  std::string current_op_seq_fn;
+  uuid_d fsid;
+
+  int fsid_fd, op_fd, current_fd;
+
+  enum kvstore_types kv_type;
+
+  deque<uint64_t> snaps;
+
+  // ObjectMap
+  boost::scoped_ptr<StripObjectMap> backend;
+
+  Finisher ondisk_finisher;
+
+  Mutex lock;
+
+  int _create_current();
+
+  /// read a uuid from fd
+  int read_fsid(int fd, uuid_d *uuid);
+
+  /// lock fsid_fd
+  int lock_fsid();
+
+  string strip_object_key(uint64_t no) {
+    char n[100];
+    snprintf(n, 100, "%ld", no);
+    return string(n);
+  }
+
+  // A special coll used by store collection info, each obj in this coll
+  // represent a coll_t
+  static bool is_coll_obj(coll_t c) {
+    return c == coll_t("COLLECTIONS");
+  }
+  static coll_t get_coll_for_coll() {
+    return coll_t("COLLECTIONS");
+  }
+  static ghobject_t make_ghobject_for_coll(const coll_t &col) {
+    return ghobject_t(hobject_t(sobject_t(col.to_str(), CEPH_NOSNAP)));
+  }
+
+  // Each transaction has side effect which may influent the following
+  // operations, we need to make it visible for the following within
+  // transaction by caching middle result.
+  // Side effects contains:
+  // 1. Creating/Deleting collection
+  // 2. Creating/Deleting object
+  // 3. Object modify(including omap, xattr)
+  // 4. Clone or rename
+  struct BufferTransaction {
+    typedef pair<coll_t, ghobject_t> uniq_id;
+    typedef map<uniq_id, StripObjectMap::StripObjectHeader> StripHeaderMap;
+
+    //Dirty records
+    StripHeaderMap strip_headers;
+
+    KeyValueStore *store;
+
+    SequencerPosition spos;
+    KeyValueDB::Transaction t;
+
+    int check_coll(const coll_t &cid);
+    int lookup_cached_header(const coll_t &cid, const ghobject_t &oid,
+                             StripObjectMap::StripObjectHeader **strip_header,
+                             bool create_if_missing);
+    int get_buffer_key(StripObjectMap::StripObjectHeader *strip_header,
+                       const string &prefix, const string &key,
+                       bufferlist &out);
+    void set_buffer_keys(const string &prefix,
+                         StripObjectMap::StripObjectHeader *strip_header,
+                         map<string, bufferlist> &bl);
+    int remove_buffer_keys(const string &prefix,
+                           StripObjectMap::StripObjectHeader *strip_header,
+                           const set<string> &keys);
+    void clear_buffer_keys(const string &prefix,
+                           StripObjectMap::StripObjectHeader *strip_header);
+    int clear_buffer(StripObjectMap::StripObjectHeader *strip_header);
+    void clone_buffer(StripObjectMap::StripObjectHeader *old_header,
+                      const coll_t &cid, const ghobject_t &oid);
+    void rename_buffer(StripObjectMap::StripObjectHeader *old_header,
+                       const coll_t &cid, const ghobject_t &oid);
+    int submit_transaction();
+
+    BufferTransaction(KeyValueStore *store,
+                      SequencerPosition &spos): store(store), spos(spos) {
+      t = store->backend->get_transaction();
+    }
+  };
+
+  // -- op workqueue --
+  struct Op {
+    utime_t start;
+    uint64_t op;
+    list<Transaction*> tls;
+    Context *ondisk, *onreadable, *onreadable_sync;
+    uint64_t ops, bytes;
+    TrackedOpRef osd_op;
+  };
+  class OpSequencer : public Sequencer_impl {
+    Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock)
+    list<Op*> q;
+    list<uint64_t> jq;
+    Cond cond;
+   public:
+    Sequencer *parent;
+    Mutex apply_lock;  // for apply mutual exclusion
+
+    void queue(Op *o) {
+      Mutex::Locker l(qlock);
+      q.push_back(o);
+    }
+    Op *peek_queue() {
+      assert(apply_lock.is_locked());
+      return q.front();
+    }
+    Op *dequeue() {
+      assert(apply_lock.is_locked());
+      Mutex::Locker l(qlock);
+      Op *o = q.front();
+      q.pop_front();
+      cond.Signal();
+      return o;
+    }
+    void flush() {
+      Mutex::Locker l(qlock);
+
+      // get max for journal _or_ op queues
+      uint64_t seq = 0;
+      if (!q.empty())
+        seq = q.back()->op;
+      if (!jq.empty() && jq.back() > seq)
+        seq = jq.back();
+
+      if (seq) {
+        // everything prior to our watermark to drain through either/both
+        // queues
+        while ((!q.empty() && q.front()->op <= seq) ||
+                (!jq.empty() && jq.front() <= seq))
+          cond.Wait(qlock);
+      }
+    }
+
+    OpSequencer()
+      : qlock("KeyValueStore::OpSequencer::qlock", false, false),
+       parent(0),
+       apply_lock("KeyValueStore::OpSequencer::apply_lock", false, false) {}
+    ~OpSequencer() {
+      assert(q.empty());
+    }
+
+    const string& get_name() const {
+      return parent->get_name();
+    }
+  };
+
+  friend ostream& operator<<(ostream& out, const OpSequencer& s);
+
+  Sequencer default_osr;
+  deque<OpSequencer*> op_queue;
+  uint64_t op_queue_len, op_queue_bytes;
+  Finisher op_finisher;
+
+  ThreadPool op_tp;
+  struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {
+    KeyValueStore *store;
+    OpWQ(KeyValueStore *fs, time_t timeout, time_t suicide_timeout,
+         ThreadPool *tp) :
+      ThreadPool::WorkQueue<OpSequencer>("KeyValueStore::OpWQ",
+                                         timeout, suicide_timeout, tp),
+      store(fs) {}
+
+    bool _enqueue(OpSequencer *osr) {
+      store->op_queue.push_back(osr);
+      return true;
+    }
+    void _dequeue(OpSequencer *o) {
+      assert(0);
+    }
+    bool _empty() {
+      return store->op_queue.empty();
+    }
+    OpSequencer *_dequeue() {
+      if (store->op_queue.empty())
+       return NULL;
+      OpSequencer *osr = store->op_queue.front();
+      store->op_queue.pop_front();
+      return osr;
+    }
+    void _process(OpSequencer *osr, ThreadPool::TPHandle &handle) {
+      store->_do_op(osr, handle);
+    }
+    void _process_finish(OpSequencer *osr) {
+      store->_finish_op(osr);
+    }
+    void _clear() {
+      assert(store->op_queue.empty());
+    }
+  } op_wq;
+
+  void _do_op(OpSequencer *osr, ThreadPool::TPHandle &handle);
+  void _finish_op(OpSequencer *osr);
+  Op *build_op(list<Transaction*>& tls, Context *ondisk, Context *onreadable,
+               Context *onreadable_sync, TrackedOpRef osd_op);
+  void queue_op(OpSequencer *osr, Op *o);
+
+  PerfCounters *logger;
+
+ public:
+
+  KeyValueStore(const std::string &base,
+                const char *internal_name = "keyvaluestore",
+                bool update_to=false);
+  ~KeyValueStore();
+
+  int _detect_backend() { kv_type = KV_TYPE_LEVELDB; return 0; }
+  bool test_mount_in_use();
+  int version_stamp_is_valid(uint32_t *version);
+  int update_version_stamp();
+  uint32_t get_target_version() {
+    return target_version;
+  }
+  int peek_journal_fsid(uuid_d *id) {
+    *id = fsid;
+    return 0;
+  }
+
+  int write_version_stamp();
+  int mount();
+  int umount();
+  int get_max_object_name_length();
+  int mkfs();
+  int mkjournal() {return 0;}
+
+  /**
+   ** set_allow_sharded_objects()
+   **
+   ** Before sharded ghobject_t can be specified this function must be called
+   **/
+  void set_allow_sharded_objects() {}
+
+  /**
+   ** get_allow_sharded_objects()
+   **
+   ** return value: true if set_allow_sharded_objects() called, otherwise false
+   **/
+  bool get_allow_sharded_objects() {return false;}
+
+  int statfs(struct statfs *buf);
+
+  int _do_transactions(
+    list<Transaction*> &tls, uint64_t op_seq,
+    ThreadPool::TPHandle *handle);
+  int do_transactions(list<Transaction*> &tls, uint64_t op_seq) {
+    return _do_transactions(tls, op_seq, 0);
+  }
+  unsigned _do_transaction(Transaction& transaction,
+                           BufferTransaction &bt,
+                           SequencerPosition& spos,
+                           ThreadPool::TPHandle *handle);
+
+  int queue_transactions(Sequencer *osr, list<Transaction*>& tls,
+                         TrackedOpRef op = TrackedOpRef(),
+                         ThreadPool::TPHandle *handle = NULL);
+
+
+  // ------------------
+  // objects
+
+  // Read operation need call "check_coll", checking "coll_t" in write
+  // operation is done by lookup_cached_header
+  int _check_coll(const coll_t &cid);
+
+  int _generic_read(coll_t cid, const ghobject_t& oid, uint64_t offset,
+                    size_t len, bufferlist& bl, bool allow_eio = false,
+                    BufferTransaction *bt = 0);
+  bool exists(coll_t cid, const ghobject_t& oid);
+  int stat(coll_t cid, const ghobject_t& oid, struct stat *st,
+           bool allow_eio = false);
+  int read(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len,
+           bufferlist& bl, bool allow_eio = false);
+  int fiemap(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len,
+             bufferlist& bl);
+
+  int _touch(coll_t cid, const ghobject_t& oid, BufferTransaction &t);
+  int _write(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len,
+             const bufferlist& bl, BufferTransaction &t, bool replica = false);
+  int _zero(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len,
+            BufferTransaction &t);
+  int _truncate(coll_t cid, const ghobject_t& oid, uint64_t size,
+                BufferTransaction &t);
+  int _clone(coll_t cid, const ghobject_t& oldoid, const ghobject_t& newoid,
+             BufferTransaction &t);
+  int _clone_range(coll_t cid, const ghobject_t& oldoid,
+                   const ghobject_t& newoid, uint64_t srcoff,
+                   uint64_t len, uint64_t dstoff, BufferTransaction &t);
+  int _remove(coll_t cid, const ghobject_t& oid, BufferTransaction &t);
+
+
+  void start_sync() {}
+  void sync() {}
+  void flush() {}
+  void sync_and_flush() {}
+
+  void set_fsid(uuid_d u) { fsid = u; }
+  uuid_d get_fsid() { return fsid; }
+
+  // DEBUG read error injection, an object is removed from both on delete()
+  Mutex read_error_lock;
+  set<ghobject_t> data_error_set; // read() will return -EIO
+  set<ghobject_t> mdata_error_set; // getattr(),stat() will return -EIO
+  void inject_data_error(const ghobject_t &oid);
+  void inject_mdata_error(const ghobject_t &oid);
+  void debug_obj_on_delete(const ghobject_t &oid);
+  bool debug_data_eio(const ghobject_t &oid);
+  bool debug_mdata_eio(const ghobject_t &oid);
+
+  // attrs
+  int getattr(coll_t cid, const ghobject_t& oid, const char *name,
+              bufferptr &bp);
+  int getattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr>& aset,
+               bool user_only = false);
+
+  int _setattrs(coll_t cid, const ghobject_t& oid,
+                map<string, bufferptr>& aset, BufferTransaction &t);
+  int _rmattr(coll_t cid, const ghobject_t& oid, const char *name,
+              BufferTransaction &t);
+  int _rmattrs(coll_t cid, const ghobject_t& oid, BufferTransaction &t);
+
+  int collection_getattr(coll_t c, const char *name, void *value, size_t size);
+  int collection_getattr(coll_t c, const char *name, bufferlist& bl);
+  int collection_getattrs(coll_t cid, map<string,bufferptr> &aset);
+
+  int _collection_setattr(coll_t c, const char *name, const void *value,
+                          size_t size, BufferTransaction &t);
+  int _collection_rmattr(coll_t c, const char *name, BufferTransaction &t);
+  int _collection_setattrs(coll_t cid, map<string,bufferptr> &aset,
+                           BufferTransaction &t);
+
+  // collections
+  int _create_collection(coll_t c, BufferTransaction &t);
+  int _destroy_collection(coll_t c, BufferTransaction &t);
+  int _collection_add(coll_t c, coll_t ocid, const ghobject_t& oid,
+                      BufferTransaction &t);
+  int _collection_move_rename(coll_t oldcid, const ghobject_t& oldoid,
+                              coll_t c, const ghobject_t& o,
+                              BufferTransaction &t);
+  int _collection_remove_recursive(const coll_t &cid,
+                                   BufferTransaction &t);
+  int _collection_rename(const coll_t &cid, const coll_t &ncid,
+                         BufferTransaction &t);
+  int list_collections(vector<coll_t>& ls);
+  bool collection_exists(coll_t c);
+  bool collection_empty(coll_t c);
+  int collection_list(coll_t c, vector<ghobject_t>& oid);
+  int collection_list_partial(coll_t c, ghobject_t start,
+                              int min, int max, snapid_t snap,
+                              vector<ghobject_t> *ls, ghobject_t *next);
+  int collection_list_range(coll_t c, ghobject_t start, ghobject_t end,
+                            snapid_t seq, vector<ghobject_t> *ls);
+  int collection_version_current(coll_t c, uint32_t *version);
+
+  // omap (see ObjectStore.h for documentation)
+  int omap_get(coll_t c, const ghobject_t &oid, bufferlist *header,
+               map<string, bufferlist> *out);
+  int omap_get_header(
+    coll_t c,
+    const ghobject_t &oid,
+    bufferlist *out,
+    bool allow_eio = false);
+  int omap_get_keys(coll_t c, const ghobject_t &oid, set<string> *keys);
+  int omap_get_values(coll_t c, const ghobject_t &oid, const set<string> &keys,
+                      map<string, bufferlist> *out);
+  int omap_check_keys(coll_t c, const ghobject_t &oid, const set<string> &keys,
+                      set<string> *out);
+  ObjectMap::ObjectMapIterator get_omap_iterator(coll_t c,
+                                                 const ghobject_t &oid);
+
+  void dump_transactions(list<ObjectStore::Transaction*>& ls, uint64_t seq,
+                         OpSequencer *osr);
+
+ private:
+  void _inject_failure() {}
+
+  // omap
+  int _omap_clear(coll_t cid, const ghobject_t &oid,
+                  BufferTransaction &t);
+  int _omap_setkeys(coll_t cid, const ghobject_t &oid,
+                    map<string, bufferlist> &aset,
+                    BufferTransaction &t);
+  int _omap_rmkeys(coll_t cid, const ghobject_t &oid, const set<string> &keys,
+                   BufferTransaction &t);
+  int _omap_rmkeyrange(coll_t cid, const ghobject_t &oid,
+                       const string& first, const string& last,
+                       BufferTransaction &t);
+  int _omap_setheader(coll_t cid, const ghobject_t &oid, const bufferlist &bl,
+                      BufferTransaction &t);
+  int _split_collection(coll_t cid, uint32_t bits, uint32_t rem, coll_t dest,
+                        BufferTransaction &t);
+  int _split_collection_create(coll_t cid, uint32_t bits, uint32_t rem,
+                               coll_t dest, BufferTransaction &t){
+    return 0;
+  }
+
+  virtual const char** get_tracked_conf_keys() const;
+  virtual void handle_conf_change(const struct md_config_t *conf,
+                                  const std::set <std::string> &changed);
+
+  std::string m_osd_rollback_to_cluster_snap;
+  bool m_osd_use_stale_snap;
+  bool m_fail_eio;
+
+  int do_update;
+
+
+  static const string OBJECT_STRIP_PREFIX;
+  static const string OBJECT_XATTR;
+  static const string OBJECT_OMAP;
+  static const string OBJECT_OMAP_HEADER;
+  static const string OBJECT_OMAP_HEADER_KEY;
+  static const string COLLECTION;
+  static const string COLLECTION_ATTR;
+  static const uint32_t COLLECTION_VERSION = 1;
+
+  class SubmitManager {
+    Mutex lock;
+    uint64_t op_seq;
+    uint64_t op_submitted;
+   public:
+    SubmitManager() :
+        lock("JOS::SubmitManager::lock", false, true, false, g_ceph_context),
+        op_seq(0), op_submitted(0)
+    {}
+    uint64_t op_submit_start();
+    void op_submit_finish(uint64_t op);
+    void set_op_seq(uint64_t seq) {
+        Mutex::Locker l(lock);
+        op_submitted = op_seq = seq;
+    }
+    uint64_t get_op_seq() {
+        return op_seq;
+    }
+  } submit_manager;
+};
+
+WRITE_CLASS_ENCODER(StripObjectMap::StripObjectHeader)
+
+#endif
index f89b20894ac889449a8f0559aaf63830d73587b0..2bba7aad6a3c25f6d898527cb1080af11c95dc04 100644 (file)
@@ -1,6 +1,7 @@
 libos_la_SOURCES = \
        os/chain_xattr.cc \
        os/DBObjectMap.cc \
+       os/GenericObjectMap.cc \
        os/FileJournal.cc \
        os/FileStore.cc \
        os/FlatIndex.cc \
@@ -11,6 +12,7 @@ libos_la_SOURCES = \
        os/LevelDBStore.cc \
        os/LFNIndex.cc \
        os/MemStore.cc \
+       os/KeyValueStore.cc \
        os/ObjectStore.cc \
        os/WBThrottle.cc \
        common/TrackedOp.cc
@@ -31,6 +33,7 @@ noinst_HEADERS += \
        os/BtrfsFileStoreBackend.h \
        os/CollectionIndex.h \
        os/DBObjectMap.h \
+       os/GenericObjectMap.h \
        os/FileJournal.h \
        os/FileStore.h \
        os/FlatIndex.h \
@@ -44,6 +47,7 @@ noinst_HEADERS += \
        os/LevelDBStore.h \
        os/LFNIndex.h \
        os/MemStore.h \
+       os/KeyValueStore.h \
        os/ObjectMap.h \
        os/ObjectStore.h \
        os/SequencerPosition.h \
index 8a8375f569974faabdee0748b2d66722c1e307ad..701dc31039ff63abbaef80b1214720c18c613f3c 100644 (file)
@@ -18,6 +18,7 @@
 #include "common/Formatter.h"
 #include "FileStore.h"
 #include "MemStore.h"
+#include "KeyValueStore.h"
 #include "common/safe_io.h"
 
 ObjectStore *ObjectStore::create(CephContext *cct,
@@ -31,6 +32,9 @@ ObjectStore *ObjectStore::create(CephContext *cct,
   if (type == "memstore") {
     return new MemStore(cct, data);
   }
+  if (type == "keyvaluestore") {
+    return new KeyValueStore(data);
+  }
   return NULL;
 }