git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/MemStore: implement reference 'memstore' backend
author Sage Weil <sage@inktank.com>
Fri, 6 Dec 2013 00:58:06 +0000 (16:58 -0800)
committer Sage Weil <sage@inktank.com>
Fri, 6 Dec 2013 07:13:28 +0000 (23:13 -0800)
This is (as near to) a trivial ObjectStore backend for the OSD as we can
get at the moment.  Everything is stored in memory.  We are slightly
tricky with the locking, but not overly so.

On umount we dump everything out to disk, and on mount we load it all in
again, so we have some very coarse persistence/durability... just enough
to make this usable in a non-failure environment.

Signed-off-by: Sage Weil <sage@inktank.com>
src/os/Makefile.am
src/os/MemStore.cc [new file with mode: 0644]
src/os/MemStore.h [new file with mode: 0644]
src/os/ObjectStore.cc

index 4f12a6a327816e88b1eb4b70159f38debac9afd9..e83d44e091afe178b22d930b32ae1bf7bfdacf5c 100644 (file)
@@ -1,18 +1,19 @@
 libos_la_SOURCES = \
+       os/chain_xattr.cc \
+       os/BtrfsFileStoreBackend.cc \
+       os/DBObjectMap.cc \
        os/FileJournal.cc \
        os/FileStore.cc \
-       os/chain_xattr.cc \
-       os/ObjectStore.cc \
-       os/JournalingObjectStore.cc \
-       os/LFNIndex.cc \
+       os/FlatIndex.cc \
+       os/GenericFileStoreBackend.cc \
        os/HashIndex.cc \
        os/IndexManager.cc \
-       os/FlatIndex.cc \
-       os/DBObjectMap.cc \
+       os/JournalingObjectStore.cc \
        os/LevelDBStore.cc \
+       os/LFNIndex.cc \
+       os/MemStore.cc \
+       os/ObjectStore.cc \
        os/WBThrottle.cc \
-       os/BtrfsFileStoreBackend.cc \
-       os/GenericFileStoreBackend.cc \
        os/ZFSFileStoreBackend.cc \
        common/TrackedOp.cc
 noinst_LTLIBRARIES += libos.la
@@ -20,26 +21,27 @@ noinst_LTLIBRARIES += libos.la
 noinst_HEADERS += \
        os/btrfs_ioctl.h \
        os/chain_xattr.h \
+       os/BtrfsFileStoreBackend.h \
        os/CollectionIndex.h \
+       os/DBObjectMap.h \
        os/FileJournal.h \
        os/FileStore.h \
-       os/BtrfsFileStoreBackend.h \
-       os/GenericFileStoreBackend.h \
-       os/ZFSFileStoreBackend.h \
        os/FlatIndex.h \
-       os/HashIndex.h \
        os/FDCache.h \
-       os/WBThrottle.h \
+       os/GenericFileStoreBackend.h \
+       os/HashIndex.h \
        os/IndexManager.h \
        os/Journal.h \
        os/JournalingObjectStore.h \
+       os/KeyValueDB.h \
+       os/LevelDBStore.h \
        os/LFNIndex.h \
+       os/MemStore.h \
+       os/ObjectMap.h \
        os/ObjectStore.h \
        os/SequencerPosition.h \
-       os/ObjectMap.h \
-       os/DBObjectMap.h \
-       os/KeyValueDB.h \
-       os/LevelDBStore.h
+       os/WBThrottle.h \
+       os/ZFSFileStoreBackend.h
 
 if WITH_LIBZFS
 libos_zfs_a_SOURCES = os/ZFS.cc
diff --git a/src/os/MemStore.cc b/src/os/MemStore.cc
new file mode 100644 (file)
index 0000000..063ae24
--- /dev/null
@@ -0,0 +1,1458 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "include/types.h"
+#include "include/stringify.h"
+#include "common/errno.h"
+#include "MemStore.h"
+
+#define dout_subsys ceph_subsys_filestore
+#undef dout_prefix
+#define dout_prefix *_dout << "memstore(" << path << ") "
+
+// Order two collection handles by the address of the Collection each one
+// points at.  Used for comparing collections for lock ordering.
+bool operator>(const MemStore::CollectionRef& l,
+               const MemStore::CollectionRef& r)
+{
+  const unsigned long lhs_addr = (unsigned long)l.get();
+  const unsigned long rhs_addr = (unsigned long)r.get();
+  return lhs_addr > rhs_addr;
+}
+
+
+// Always stores a value-initialized uuid in *fsid and returns 0.
+int MemStore::peek_journal_fsid(uuid_d *fsid)
+{
+  uuid_d blank = uuid_d();
+  *fsid = blank;
+  return 0;
+}
+
+// Load the saved collections via _load(), then start the finisher.
+// Returns the negative error from _load() (the finisher is not started in
+// that case), or 0 on success.
+int MemStore::mount()
+{
+  const int load_result = _load();
+  if (load_result >= 0) {
+    finisher.start();
+    return 0;
+  }
+  return load_result;
+}
+
+// Stop the finisher, then write everything out via _save().
+// Returns whatever _save() returns.
+int MemStore::umount()
+{
+  finisher.stop();
+  const int save_result = _save();
+  return save_result;
+}
+
+int MemStore::_save()
+{
+  dout(10) << __func__ << dendl;
+  Mutex::Locker l(apply_lock); // block any writer
+  dump_all();
+  set<coll_t> collections;
+  for (hash_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
+       p != coll_map.end();
+       ++p) {
+    dout(20) << __func__ << " coll " << p->first << " " << p->second << dendl;
+    collections.insert(p->first);
+    bufferlist bl;
+    assert(p->second);
+    p->second->encode(bl);
+    string fn = path + "/" + stringify(p->first);
+    int r = bl.write_file(fn.c_str());
+    if (r < 0)
+      return r;
+  }
+
+  string fn = path + "/collections";
+  bufferlist bl;
+  ::encode(collections, bl);
+  int r = bl.write_file(fn.c_str());
+  if (r < 0)
+    return r;
+
+  return 0;
+}
+
+// Debugging aid: run dump() through a "json-pretty" formatter, wrapped in
+// a top-level "store" object, and write the result to the log at level 0.
+void MemStore::dump_all()
+{
+  Formatter *f = new_formatter("json-pretty");
+  f->open_object_section("store");
+  dump(f);
+  f->close_section();
+  // The log line is started here and only terminated by the dendl below,
+  // so the formatter output lands in the same entry as the "dump:" prefix.
+  dout(0) << "dump:";
+  f->flush(*_dout);
+  *_dout << dendl;
+  delete f;
+}
+
+void MemStore::dump(Formatter *f)
+{
+  f->open_array_section("collections");
+  for (hash_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
+       p != coll_map.end();
+       ++p) {
+    f->open_object_section("collection");
+    f->dump_string("name", stringify(p->first));
+
+    f->open_array_section("xattrs");
+    for (map<string,bufferptr>::iterator q = p->second->xattr.begin();
+        q != p->second->xattr.end();
+        ++q) {
+      f->open_object_section("xattr");
+      f->dump_string("name", q->first);
+      f->dump_int("length", q->second.length());
+      f->close_section();
+    }
+    f->close_section();
+
+    f->open_array_section("objects");
+    for (map<ghobject_t,ObjectRef>::iterator q = p->second->object_map.begin();
+        q != p->second->object_map.end();
+        ++q) {
+      f->open_object_section("object");
+      f->dump_string("name", stringify(q->first));
+      if (q->second)
+       q->second->dump(f);
+      f->close_section();
+    }
+    f->close_section();
+
+    f->close_section();
+  }
+  f->close_section();
+}
+
+int MemStore::_load()
+{
+  dout(10) << __func__ << dendl;
+  bufferlist bl;
+  string fn = path + "/collections";
+  string err;
+  int r = bl.read_file(fn.c_str(), &err);
+  if (r < 0)
+    return r;
+
+  set<coll_t> collections;
+  bufferlist::iterator p = bl.begin();
+  ::decode(collections, p);
+
+  for (set<coll_t>::iterator q = collections.begin();
+       q != collections.end();
+       ++q) {
+    string fn = path + "/" + stringify(*q);
+    bufferlist cbl;
+    int r = cbl.read_file(fn.c_str(), &err);
+    if (r < 0)
+      return r;
+    CollectionRef c(new Collection);
+    bufferlist::iterator p = cbl.begin();
+    c->decode(p);
+    coll_map[*q] = c;
+  }
+
+  dump_all();
+
+  return 0;  
+}
+
+// Store the stringified uuid under the "fs_fsid" meta key; asserts that
+// the write succeeded.
+void MemStore::set_fsid(uuid_d u)
+{
+  const string fsid_str = stringify(u);
+  int r = write_meta("fs_fsid", fsid_str);
+  assert(r >= 0);
+}
+
+// Read the "fs_fsid" meta key and parse it into a uuid.  Asserts that both
+// the read and the parse succeed.
+uuid_d MemStore::get_fsid()
+{
+  string stored;
+  int r = read_meta("fs_fsid", &stored);
+  assert(r >= 0);
+
+  uuid_d result;
+  bool parsed = result.parse(stored.c_str());
+  assert(parsed);
+  return result;
+}
+
+int MemStore::mkfs()
+{
+  string fsid_str;
+  int r = read_meta("fs_fsid", &fsid_str);
+  if (r == -ENOENT) {
+    uuid_d fsid;
+    fsid.generate_random();
+    fsid_str = stringify(fsid);
+    r = write_meta("fs_fsid", fsid_str);
+    if (r < 0)
+      return r;
+    dout(1) << __func__ << " new fsid " << fsid_str << dendl;
+  } else {
+    dout(1) << __func__ << " had fsid " << fsid_str << dendl;
+  }
+
+  string fn = path + "/collections";
+  derr << path << dendl;
+  bufferlist bl;
+  set<coll_t> collections;
+  ::encode(collections, bl);
+  r = bl.write_file(fn.c_str());
+  if (r < 0)
+    return r;
+
+  return 0;
+}
+
+// Fill in fixed, made-up capacity numbers.  Only f_bsize, f_blocks,
+// f_bfree and f_bavail are written; per the original note these are the
+// only fields that matter.  Always returns 0.
+int MemStore::statfs(struct statfs *st)
+{
+  dout(10) << __func__ << dendl;
+  const unsigned long fake_blocks = 1000000;
+  st->f_bsize = 1024;
+  st->f_blocks = fake_blocks;
+  st->f_bfree = fake_blocks;
+  st->f_bavail = fake_blocks;
+  return 0;
+}
+
+// fixme: placeholder -- returns a value-initialized stats object.
+filestore_perf_stat_t MemStore::get_cur_stats()
+{
+  filestore_perf_stat_t stats = filestore_perf_stat_t();
+  return stats;
+}
+
+// Look up `cid` in coll_map under a read lock on coll_lock.  Returns the
+// collection handle, or an empty CollectionRef when it is not present.
+MemStore::CollectionRef MemStore::get_collection(coll_t cid)
+{
+  RWLock::RLocker l(coll_lock);
+  CollectionRef found;
+  hash_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
+  if (cp != coll_map.end())
+    found = cp->second;
+  return found;
+}
+
+
+// ---------------
+// read operations
+
+// True when collection `cid` exists and contains object `oid`.
+bool MemStore::exists(coll_t cid, const ghobject_t& oid)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return false;
+  RWLock::RLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return false;
+  return true;
+}
+
+int MemStore::stat(
+    coll_t cid,
+    const ghobject_t& oid,
+    struct stat *st,
+    bool allow_eio)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::RLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  st->st_size = o->data.length();
+  st->st_blksize = 4096;
+  st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize;
+  st->st_nlink = 1;
+  return 0;
+}
+
+int MemStore::read(
+    coll_t cid,
+    const ghobject_t& oid,
+    uint64_t offset,
+    size_t len,
+    bufferlist& bl,
+    bool allow_eio)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << " "
+          << offset << "~" << len << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::RLocker lc(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  if (offset >= o->data.length())
+    return 0;
+  size_t l = len;
+  if (l == 0)  // note: len == 0 means read the entire object
+    l = o->data.length();
+  else if (offset + l > o->data.length())
+    l = o->data.length() - offset;
+  bl.clear();
+  bl.substr_of(o->data, offset, l);
+  return bl.length();
+}
+
+int MemStore::fiemap(coll_t cid, const ghobject_t& oid,
+                    uint64_t offset, size_t len, bufferlist& bl)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
+          << len << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::RLocker lc(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  if (offset >= o->data.length())
+    return 0;
+  size_t l = len;
+  if (offset + l > o->data.length())
+    l = o->data.length() - offset;
+  map<uint64_t, uint64_t> m;
+  m[offset] = l;
+  ::encode(m, bl);
+  return 0;  
+}
+
+int MemStore::getattr(coll_t cid, const ghobject_t& oid,
+                     const char *name, bufferptr& value)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << " " << name << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::RLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  string k(name);
+  if (!o->xattr.count(k)) {
+    return -ENODATA;
+  }
+  value = o->xattr[k];
+  return 0;
+}
+
+int MemStore::getattrs(coll_t cid, const ghobject_t& oid,
+                      map<string,bufferptr>& aset, bool user_only)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::RLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  if (user_only) {
+    for (map<string,bufferptr>::iterator p = o->xattr.begin();
+        p != o->xattr.end();
+        ++p) {
+      if (p->first.length() > 1 && p->first[0] == '_') {
+       aset[p->first.substr(1)] = p->second;
+      }
+    }
+  } else {
+    aset = o->xattr;
+  }
+  return 0;
+}
+
+// Append the id of every collection in coll_map to `ls`.  Always returns 0.
+int MemStore::list_collections(vector<coll_t>& ls)
+{
+  dout(10) << __func__ << dendl;
+  RWLock::RLocker l(coll_lock);
+  hash_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
+  while (p != coll_map.end()) {
+    ls.push_back(p->first);
+    ++p;
+  }
+  return 0;
+}
+
+// True when `cid` is present in coll_map.
+bool MemStore::collection_exists(coll_t cid)
+{
+  dout(10) << __func__ << " " << cid << dendl;
+  RWLock::RLocker l(coll_lock);
+  const bool present = coll_map.find(cid) != coll_map.end();
+  return present;
+}
+
+int MemStore::collection_getattr(coll_t cid, const char *name,
+                                void *value, size_t size)
+{
+  dout(10) << __func__ << " " << cid << " " << name << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::RLocker lc(c->lock);
+
+  if (!c->xattr.count(name))
+    return -ENOENT;
+  bufferlist bl;
+  bl.append(c->xattr[name]);
+  size_t l = MIN(size, bl.length());
+  bl.copy(0, size, (char *)value);
+  return l;
+}
+
+// Replace the contents of `bl` with collection xattr `name`.  Returns the
+// resulting length of `bl`, or -ENOENT when the collection or the
+// attribute does not exist.
+int MemStore::collection_getattr(coll_t cid, const char *name, bufferlist& bl)
+{
+  dout(10) << __func__ << " " << cid << " " << name << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::RLocker l(c->lock);
+
+  map<string,bufferptr>::iterator p = c->xattr.find(name);
+  if (p == c->xattr.end())
+    return -ENOENT;
+
+  bl.clear();
+  bl.append(p->second);
+  return bl.length();
+}
+
+// Replace `aset` with a copy of the collection's xattr map.  Returns
+// -ENOENT when the collection does not exist, 0 otherwise.
+int MemStore::collection_getattrs(coll_t cid, map<string,bufferptr> &aset)
+{
+  dout(10) << __func__ << " " << cid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (c) {
+    RWLock::RLocker l(c->lock);
+    aset = c->xattr;
+    return 0;
+  }
+  return -ENOENT;
+}
+
+// True when collection `cid` holds no objects.
+//
+// A collection that does not exist is also reported as empty.  The return
+// type is bool, so an errno value cannot be handed back here: a negative
+// errno would merely convert to `true`.  That result is kept, but spelled
+// out explicitly.
+bool MemStore::collection_empty(coll_t cid)
+{
+  dout(10) << __func__ << " " << cid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return true;
+  RWLock::RLocker l(c->lock);
+
+  return c->object_map.empty();
+}
+
+// Append the id of every object in collection `cid` to `o`, in
+// object_map order.  Returns -ENOENT when the collection does not exist,
+// 0 otherwise.
+int MemStore::collection_list(coll_t cid, vector<ghobject_t>& o)
+{
+  dout(10) << __func__ << " " << cid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::RLocker l(c->lock);
+
+  map<ghobject_t,ObjectRef>::iterator p = c->object_map.begin();
+  while (p != c->object_map.end()) {
+    o.push_back(p->first);
+    ++p;
+  }
+  return 0;
+}
+
+int MemStore::collection_list_partial(coll_t cid, ghobject_t start,
+                                     int min, int max, snapid_t snap, 
+                                     vector<ghobject_t> *ls, ghobject_t *next)
+{
+  dout(10) << __func__ << " " << cid << " " << start << " " << min << "-"
+          << max << " " << snap << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::RLocker l(c->lock);
+
+  map<ghobject_t,ObjectRef>::iterator p = c->object_map.lower_bound(start);
+  while (p != c->object_map.end() &&
+        ls->size() < (unsigned)max) {
+    ls->push_back(p->first);
+    ++p;
+  }
+  if (p == c->object_map.end())
+    *next = ghobject_t::get_max();
+  else
+    *next = p->first;
+  return 0;
+}
+
+int MemStore::collection_list_range(coll_t cid,
+                                   ghobject_t start, ghobject_t end,
+                                   snapid_t seq, vector<ghobject_t> *ls)
+{
+  dout(10) << __func__ << " " << cid << " " << start << " " << end
+          << " " << seq << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::RLocker l(c->lock);
+
+  map<ghobject_t,ObjectRef>::iterator p = c->object_map.lower_bound(start);
+  while (p != c->object_map.end() &&
+        p->first < end) {
+    ls->push_back(p->first);
+    ++p;
+  }
+  return 0;
+}
+
+int MemStore::omap_get(
+    coll_t cid,                ///< [in] Collection containing oid
+    const ghobject_t &oid,   ///< [in] Object containing omap
+    bufferlist *header,      ///< [out] omap header
+    map<string, bufferlist> *out /// < [out] Key to value map
+    )
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::RLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  *header = o->omap_header;
+  *out = o->omap;
+  return 0;
+}
+
+int MemStore::omap_get_header(
+    coll_t cid,                ///< [in] Collection containing oid
+    const ghobject_t &oid,   ///< [in] Object containing omap
+    bufferlist *header,      ///< [out] omap header
+    bool allow_eio ///< [in] don't assert on eio
+    )
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::RLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  *header = o->omap_header;
+  return 0;
+}
+
+int MemStore::omap_get_keys(
+    coll_t cid,              ///< [in] Collection containing oid
+    const ghobject_t &oid, ///< [in] Object containing omap
+    set<string> *keys      ///< [out] Keys defined on oid
+    )
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::RLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  for (map<string,bufferlist>::iterator p = o->omap.begin();
+       p != o->omap.end();
+       ++p)
+    keys->insert(p->first);
+  return 0;
+}
+
+int MemStore::omap_get_values(
+    coll_t cid,                    ///< [in] Collection containing oid
+    const ghobject_t &oid,       ///< [in] Object containing omap
+    const set<string> &keys,     ///< [in] Keys to get
+    map<string, bufferlist> *out ///< [out] Returned keys and values
+    )
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::RLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  for (set<string>::const_iterator p = keys.begin();
+       p != keys.end();
+       ++p) {
+    map<string,bufferlist>::iterator q = o->omap.find(*p);
+    if (q != o->omap.end())
+      out->insert(*q);
+  }
+  return 0;
+}
+
+int MemStore::omap_check_keys(
+    coll_t cid,                ///< [in] Collection containing oid
+    const ghobject_t &oid,   ///< [in] Object containing omap
+    const set<string> &keys, ///< [in] Keys to check
+    set<string> *out         ///< [out] Subset of keys defined on oid
+    )
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::RLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  for (set<string>::const_iterator p = keys.begin();
+       p != keys.end();
+       ++p) {
+    map<string,bufferlist>::iterator q = o->omap.find(*p);
+    if (q != o->omap.end())
+      out->insert(*p);
+  }
+  return 0;
+}
+
+ObjectMap::ObjectMapIterator MemStore::get_omap_iterator(coll_t cid,
+                                                        const ghobject_t& oid)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return ObjectMap::ObjectMapIterator();
+  RWLock::RLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return ObjectMap::ObjectMapIterator();
+  return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o));
+}
+
+
+// ---------------
+// write operations
+
+int MemStore::queue_transactions(Sequencer *osr,
+                                list<Transaction*>& tls,
+                                TrackedOpRef op)
+{
+  // fixme: ignore the Sequencer and serialize everything.
+  Mutex::Locker l(apply_lock);
+
+  for (list<Transaction*>::iterator p = tls.begin(); p != tls.end(); ++p)
+    _do_transaction(**p);
+
+  Context *on_apply = NULL, *on_apply_sync = NULL, *on_commit = NULL;
+  ObjectStore::Transaction::collect_contexts(tls, &on_apply, &on_commit,
+                                            &on_apply_sync);
+  if (on_apply)
+    finisher.queue(on_apply);
+  if (on_apply_sync)
+    finisher.queue(on_apply_sync);
+  if (on_commit)
+    finisher.queue(on_commit);
+  return 0;
+}
+
+// Apply every op contained in transaction `t`, in order.
+//
+// Each op's operands are pulled from the transaction iterator inside its
+// case block and passed to the matching _xxx() helper, whose return value
+// lands in `r`.  A negative `r` is tolerated when it is -ENODATA, or
+// -ENOENT from any op other than the clone ops and OP_COLL_ADD; any other
+// error is logged together with a dump of the transaction and then
+// asserts.  `pos` counts ops from 0 and is only used in that error report.
+void MemStore::_do_transaction(Transaction& t)
+{
+  Transaction::iterator i = t.begin();
+  int pos = 0;
+
+  while (i.have_op()) {
+    int op = i.get_op();
+    int r = 0;
+
+    switch (op) {
+    case Transaction::OP_NOP:
+      break;
+    case Transaction::OP_TOUCH:
+      {
+       coll_t cid = i.get_cid();
+       ghobject_t oid = i.get_oid();
+       r = _touch(cid, oid);
+      }
+      break;
+      
+    case Transaction::OP_WRITE:
+      {
+       coll_t cid = i.get_cid();
+       ghobject_t oid = i.get_oid();
+       uint64_t off = i.get_length();
+       uint64_t len = i.get_length();
+       bool replica = i.get_replica();
+       bufferlist bl;
+       i.get_bl(bl);
+       r = _write(cid, oid, off, len, bl, replica);
+      }
+      break;
+      
+    case Transaction::OP_ZERO:
+      {
+       coll_t cid = i.get_cid();
+       ghobject_t oid = i.get_oid();
+       uint64_t off = i.get_length();
+       uint64_t len = i.get_length();
+       r = _zero(cid, oid, off, len);
+      }
+      break;
+      
+    case Transaction::OP_TRIMCACHE:
+      {
+       // the operands are still read from the iterator, then discarded
+       i.get_cid();
+       i.get_oid();
+       i.get_length();
+       i.get_length();
+       // deprecated, no-op
+      }
+      break;
+      
+    case Transaction::OP_TRUNCATE:
+      {
+       coll_t cid = i.get_cid();
+       ghobject_t oid = i.get_oid();
+       uint64_t off = i.get_length();
+       r = _truncate(cid, oid, off);
+      }
+      break;
+      
+    case Transaction::OP_REMOVE:
+      {
+       coll_t cid = i.get_cid();
+       ghobject_t oid = i.get_oid();
+       r = _remove(cid, oid);
+      }
+      break;
+      
+    case Transaction::OP_SETATTR:
+      {
+       coll_t cid = i.get_cid();
+       ghobject_t oid = i.get_oid();
+       string name = i.get_attrname();
+       bufferlist bl;
+       i.get_bl(bl);
+       // a single attr is applied through the multi-attr helper
+       map<string, bufferptr> to_set;
+       to_set[name] = bufferptr(bl.c_str(), bl.length());
+       r = _setattrs(cid, oid, to_set);
+      }
+      break;
+      
+    case Transaction::OP_SETATTRS:
+      {
+       coll_t cid = i.get_cid();
+       ghobject_t oid = i.get_oid();
+       map<string, bufferptr> aset;
+       i.get_attrset(aset);
+       r = _setattrs(cid, oid, aset);
+      }
+      break;
+
+    case Transaction::OP_RMATTR:
+      {
+       coll_t cid = i.get_cid();
+       ghobject_t oid = i.get_oid();
+       string name = i.get_attrname();
+       r = _rmattr(cid, oid, name.c_str());
+      }
+      break;
+
+    case Transaction::OP_RMATTRS:
+      {
+       coll_t cid = i.get_cid();
+       ghobject_t oid = i.get_oid();
+       r = _rmattrs(cid, oid);
+      }
+      break;
+      
+    case Transaction::OP_CLONE:
+      {
+       coll_t cid = i.get_cid();
+       ghobject_t oid = i.get_oid();
+       ghobject_t noid = i.get_oid();
+       r = _clone(cid, oid, noid);
+      }
+      break;
+
+    case Transaction::OP_CLONERANGE:
+      {
+       coll_t cid = i.get_cid();
+       ghobject_t oid = i.get_oid();
+       ghobject_t noid = i.get_oid();
+       uint64_t off = i.get_length();
+       uint64_t len = i.get_length();
+       // source and destination offsets are the same for this op
+       r = _clone_range(cid, oid, noid, off, len, off);
+      }
+      break;
+
+    case Transaction::OP_CLONERANGE2:
+      {
+       coll_t cid = i.get_cid();
+       ghobject_t oid = i.get_oid();
+       ghobject_t noid = i.get_oid();
+       uint64_t srcoff = i.get_length();
+       uint64_t len = i.get_length();
+       uint64_t dstoff = i.get_length();
+       r = _clone_range(cid, oid, noid, srcoff, len, dstoff);
+      }
+      break;
+
+    case Transaction::OP_MKCOLL:
+      {
+       coll_t cid = i.get_cid();
+       r = _create_collection(cid);
+      }
+      break;
+
+    case Transaction::OP_RMCOLL:
+      {
+       coll_t cid = i.get_cid();
+       r = _destroy_collection(cid);
+      }
+      break;
+
+    case Transaction::OP_COLL_ADD:
+      {
+       coll_t ncid = i.get_cid();
+       coll_t ocid = i.get_cid();
+       ghobject_t oid = i.get_oid();
+       r = _collection_add(ncid, ocid, oid);
+      }
+      break;
+
+    case Transaction::OP_COLL_REMOVE:
+       {
+       // handled exactly like OP_REMOVE
+       coll_t cid = i.get_cid();
+       ghobject_t oid = i.get_oid();
+       r = _remove(cid, oid);
+       }
+      break;
+
+    case Transaction::OP_COLL_MOVE:
+      assert(0 == "deprecated");
+      break;
+
+    case Transaction::OP_COLL_MOVE_RENAME:
+      {
+       coll_t oldcid = i.get_cid();
+       ghobject_t oldoid = i.get_oid();
+       coll_t newcid = i.get_cid();
+       ghobject_t newoid = i.get_oid();
+       r = _collection_move_rename(oldcid, oldoid, newcid, newoid);
+      }
+      break;
+
+    case Transaction::OP_COLL_SETATTR:
+      {
+       coll_t cid = i.get_cid();
+       string name = i.get_attrname();
+       bufferlist bl;
+       i.get_bl(bl);
+       r = _collection_setattr(cid, name.c_str(), bl.c_str(), bl.length());
+      }
+      break;
+
+    case Transaction::OP_COLL_RMATTR:
+      {
+       coll_t cid = i.get_cid();
+       string name = i.get_attrname();
+       r = _collection_rmattr(cid, name.c_str());
+      }
+      break;
+
+    case Transaction::OP_COLL_RENAME:
+      {
+       coll_t cid(i.get_cid());
+       coll_t ncid(i.get_cid());
+       r = _collection_rename(cid, ncid);
+      }
+      break;
+
+    case Transaction::OP_OMAP_CLEAR:
+      {
+       coll_t cid(i.get_cid());
+       ghobject_t oid = i.get_oid();
+       r = _omap_clear(cid, oid);
+      }
+      break;
+    case Transaction::OP_OMAP_SETKEYS:
+      {
+       coll_t cid(i.get_cid());
+       ghobject_t oid = i.get_oid();
+       map<string, bufferlist> aset;
+       i.get_attrset(aset);
+       r = _omap_setkeys(cid, oid, aset);
+      }
+      break;
+    case Transaction::OP_OMAP_RMKEYS:
+      {
+       coll_t cid(i.get_cid());
+       ghobject_t oid = i.get_oid();
+       set<string> keys;
+       i.get_keyset(keys);
+       r = _omap_rmkeys(cid, oid, keys);
+      }
+      break;
+    case Transaction::OP_OMAP_RMKEYRANGE:
+      {
+       coll_t cid(i.get_cid());
+       ghobject_t oid = i.get_oid();
+       string first, last;
+       first = i.get_key();
+       last = i.get_key();
+       r = _omap_rmkeyrange(cid, oid, first, last);
+      }
+      break;
+    case Transaction::OP_OMAP_SETHEADER:
+      {
+       coll_t cid(i.get_cid());
+       ghobject_t oid = i.get_oid();
+       bufferlist bl;
+       i.get_bl(bl);
+       r = _omap_setheader(cid, oid, bl);
+      }
+      break;
+    case Transaction::OP_SPLIT_COLLECTION:
+      assert(0 == "deprecated");
+      break;
+    case Transaction::OP_SPLIT_COLLECTION2:
+      {
+       coll_t cid(i.get_cid());
+       uint32_t bits(i.get_u32());
+       uint32_t rem(i.get_u32());
+       coll_t dest(i.get_cid());
+       r = _split_collection(cid, bits, rem, dest);
+      }
+      break;
+
+    default:
+      derr << "bad op " << op << dendl;
+      assert(0);
+    }
+
+    // Decide whether a failed op can be ignored or is fatal.
+    if (r < 0) {
+      bool ok = false;
+
+      if (r == -ENOENT && !(op == Transaction::OP_CLONERANGE ||
+                           op == Transaction::OP_CLONE ||
+                           op == Transaction::OP_CLONERANGE2 ||
+                           op == Transaction::OP_COLL_ADD))
+       // -ENOENT is usually okay
+       ok = true;
+      if (r == -ENODATA)
+       ok = true;
+
+      if (!ok) {
+       // Pick the most specific message for the log; later matches
+       // overwrite earlier ones.
+       const char *msg = "unexpected error code";
+
+       if (r == -ENOENT && (op == Transaction::OP_CLONERANGE ||
+                            op == Transaction::OP_CLONE ||
+                            op == Transaction::OP_CLONERANGE2))
+         msg = "ENOENT on clone suggests osd bug";
+
+       if (r == -ENOSPC)
+         // For now, if we hit _any_ ENOSPC, crash, before we do any damage
+         // by partially applying transactions.
+         msg = "ENOSPC handling not implemented";
+
+       if (r == -ENOTEMPTY) {
+         msg = "ENOTEMPTY suggests garbage data in osd data dir";
+         dump_all();
+       }
+
+       dout(0) << " error " << cpp_strerror(r) << " not handled on operation " << op
+               << " (op " << pos << ", counting from 0)" << dendl;
+       dout(0) << msg << dendl;
+       // This log line stays open until the dendl after the JSON dump.
+       dout(0) << " transaction dump:\n";
+       JSONFormatter f(true);
+       f.open_object_section("transaction");
+       t.dump(&f);
+       f.close_section();
+       f.flush(*_dout);
+       *_dout << dendl;
+       assert(0 == "unexpected error");
+      }
+    }
+
+    ++pos;
+  }
+}
+
+// Make sure object `oid` exists in collection `cid`, creating an empty
+// object when it does not.  An existing object is left untouched.
+//
+// Returns -ENOENT when the collection does not exist, 0 otherwise.
+int MemStore::_touch(coll_t cid, const ghobject_t& oid)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::WLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o) {
+    // Both indexes must refer to the *same* Object.  Giving each its own
+    // instance would make lookups and listings/dumps operate on different
+    // copies of what is supposed to be one object.
+    o.reset(new Object);
+    c->object_map[oid] = o;
+    c->object_hash[oid] = o;
+  }
+  return 0;
+}
+
+int MemStore::_write(coll_t cid, const ghobject_t& oid,
+                    uint64_t offset, size_t len, const bufferlist& bl,
+                    bool replica)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << " "
+          << offset << "~" << len << dendl;
+  assert(len == bl.length());
+
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::WLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o) {
+    // write implicitly creates a missing object
+    o.reset(new Object);
+    c->object_map[oid] = o;
+    c->object_hash[oid] = o;
+  }
+
+  _write_into_bl(bl, offset, &o->data);
+  return 0;
+}
+
+void MemStore::_write_into_bl(const bufferlist& src, unsigned offset,
+                             bufferlist *dst)
+{
+  unsigned len = src.length();
+
+  // before
+  bufferlist newdata;
+  if (dst->length() >= offset) {
+    newdata.substr_of(*dst, 0, offset);
+  } else {
+    newdata.substr_of(*dst, 0, dst->length());
+    bufferptr bp(offset - dst->length());
+    bp.zero();
+    newdata.append(bp);
+  }
+
+  newdata.append(src);
+
+  // after
+  if (dst->length() > offset + len) {
+    bufferlist tail;
+    tail.substr_of(*dst, offset + len, dst->length() - (offset + len));
+    newdata.append(tail);
+  }
+
+  dst->claim(newdata);
+}
+
+int MemStore::_zero(coll_t cid, const ghobject_t& oid,
+                   uint64_t offset, size_t len)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
+          << len << dendl;
+  bufferptr bp(len);
+  bp.zero();
+  bufferlist bl;
+  bl.push_back(bp);
+  return _write(cid, oid, offset, len, bl);
+}
+
+int MemStore::_truncate(coll_t cid, const ghobject_t& oid, uint64_t size)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << " " << size << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::WLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  if (o->data.length() > size) {
+    bufferlist bl;
+    bl.substr_of(o->data, 0, size);
+    o->data.claim(bl);
+  } else if (o->data.length() == size) {
+    // do nothing
+  } else {
+    bufferptr bp(size - o->data.length());
+    bp.zero();
+    o->data.append(bp);
+  }
+  return 0;
+}
+
+// Drop object `oid` from both indexes of collection `cid`.  Returns
+// -ENOENT when the collection or the object does not exist, 0 otherwise.
+int MemStore::_remove(coll_t cid, const ghobject_t& oid)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::WLocker l(c->lock);
+
+  ObjectRef victim = c->get_object(oid);
+  if (!victim)
+    return -ENOENT;
+
+  c->object_hash.erase(oid);
+  c->object_map.erase(oid);
+  return 0;
+}
+
+int MemStore::_setattrs(coll_t cid, const ghobject_t& oid,
+                       map<string,bufferptr>& aset)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::WLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  for (map<string,bufferptr>::const_iterator p = aset.begin(); p != aset.end(); ++p)
+    o->xattr[p->first] = p->second;
+  return 0;
+}
+
+int MemStore::_rmattr(coll_t cid, const ghobject_t& oid, const char *name)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << " " << name << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::WLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  if (!o->xattr.count(name))
+    return -ENODATA;
+  o->xattr.erase(name);
+  return 0;
+}
+
+// Remove every attribute from oid.
+//
+// Returns -ENOENT if the collection or the object is missing, else 0.
+int MemStore::_rmattrs(coll_t cid, const ghobject_t& oid)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef coll = get_collection(cid);
+  if (!coll) {
+    return -ENOENT;
+  }
+  RWLock::WLocker guard(coll->lock);
+
+  ObjectRef obj = coll->get_object(oid);
+  if (!obj) {
+    return -ENOENT;
+  }
+  obj->xattr.clear();
+  return 0;
+}
+
+int MemStore::_clone(coll_t cid, const ghobject_t& oldoid,
+                    const ghobject_t& newoid)
+{
+  dout(10) << __func__ << " " << cid << " " << oldoid
+          << " -> " << newoid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::WLocker l(c->lock);
+
+  ObjectRef oo = c->get_object(oldoid);
+  if (!oo)
+    return -ENOENT;
+  ObjectRef no = c->get_object(newoid);
+  if (!no) {
+    no.reset(new Object);
+    c->object_map[newoid] = no;
+    c->object_hash[newoid] = no;
+  }
+  no->data = oo->data;
+  no->omap_header = oo->omap_header;
+  no->omap = oo->omap;
+  return 0;
+}
+
+int MemStore::_clone_range(coll_t cid, const ghobject_t& oldoid,
+                          const ghobject_t& newoid,
+                          uint64_t srcoff, uint64_t len, uint64_t dstoff)
+{
+  dout(10) << __func__ << " " << cid << " "
+          << oldoid << " " << srcoff << "~" << len << " -> "
+          << newoid << " " << dstoff << "~" << len
+          << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::WLocker l(c->lock);
+
+  ObjectRef oo = c->get_object(oldoid);
+  if (!oo)
+    return -ENOENT;
+  ObjectRef no = c->get_object(newoid);
+  if (!no) {
+    no.reset(new Object);
+    c->object_map[newoid] = no;
+    c->object_hash[newoid] = no;
+  }
+  if (srcoff >= oo->data.length())
+    return 0;
+  if (srcoff + len >= oo->data.length())
+    len = oo->data.length() - srcoff;
+  bufferlist bl;
+  bl.substr_of(oo->data, srcoff, len);
+  _write_into_bl(bl, dstoff, &no->data);
+  return len;
+}
+
+// Drop oid's entire omap: all key/value pairs and the omap header.
+//
+// Returns -ENOENT if the collection or the object is missing, else 0.
+int MemStore::_omap_clear(coll_t cid, const ghobject_t &oid)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::WLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  o->omap.clear();
+  // the header is part of the omap; leaving it behind would let a stale
+  // header show up after a clear
+  o->omap_header.clear();
+  return 0;
+}
+
+int MemStore::_omap_setkeys(coll_t cid, const ghobject_t &oid,
+                           const map<string, bufferlist> &aset)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::WLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  for (map<string,bufferlist>::const_iterator p = aset.begin(); p != aset.end(); ++p)
+    o->omap[p->first] = p->second;
+  return 0;
+}
+
+int MemStore::_omap_rmkeys(coll_t cid, const ghobject_t &oid,
+                          const set<string> &keys)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::WLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  for (set<string>::const_iterator p = keys.begin(); p != keys.end(); ++p)
+    o->omap.erase(*p);
+  return 0;
+}
+
+int MemStore::_omap_rmkeyrange(coll_t cid, const ghobject_t &oid,
+                              const string& first, const string& last)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << " " << first
+          << " " << last << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::WLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  map<string,bufferlist>::iterator p = o->omap.upper_bound(first);
+  map<string,bufferlist>::iterator e = o->omap.lower_bound(last);
+  while (p != e)
+    o->omap.erase(p++);
+  return 0;
+}
+
+int MemStore::_omap_setheader(coll_t cid, const ghobject_t &oid,
+                             const bufferlist &bl)
+{
+  dout(10) << __func__ << " " << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::WLocker l(c->lock);
+
+  ObjectRef o = c->get_object(oid);
+  if (!o)
+    return -ENOENT;
+  o->omap_header = bl;
+  return 0;
+}
+
+// Create an empty collection named cid.
+//
+// Returns -EEXIST if a collection with that id already exists, else 0.
+int MemStore::_create_collection(coll_t cid)
+{
+  dout(10) << __func__ << " " << cid << dendl;
+  RWLock::WLocker guard(coll_lock);
+  if (coll_map.find(cid) != coll_map.end()) {
+    return -EEXIST;
+  }
+  CollectionRef fresh(new Collection);
+  coll_map[cid] = fresh;
+  return 0;
+}
+
+// Remove collection cid, which must be empty.
+//
+// Returns -ENOENT if there is no such collection, -ENOTEMPTY if it still
+// holds objects, else 0.
+int MemStore::_destroy_collection(coll_t cid)
+{
+  dout(10) << __func__ << " " << cid << dendl;
+  RWLock::WLocker guard(coll_lock);
+  hash_map<coll_t,CollectionRef>::iterator entry = coll_map.find(cid);
+  if (entry == coll_map.end()) {
+    return -ENOENT;
+  }
+
+  // peek at the object index under the collection's own lock, and release
+  // that lock again before the collection is dropped from coll_map
+  bool has_objects;
+  {
+    RWLock::RLocker peek(entry->second->lock);
+    has_objects = !entry->second->object_map.empty();
+  }
+  if (has_objects) {
+    return -ENOTEMPTY;
+  }
+  coll_map.erase(entry);
+  return 0;
+}
+
+int MemStore::_collection_add(coll_t cid, coll_t ocid, const ghobject_t& oid)
+{
+  dout(10) << __func__ << " " << cid << " " << ocid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  CollectionRef oc = get_collection(ocid);
+  if (!oc)
+    return -ENOENT;
+  RWLock::WLocker l1(MIN(c, oc)->lock);
+  RWLock::WLocker l2(MAX(c, oc)->lock);
+
+  if (c->object_hash.count(oid))
+    return -EEXIST;
+  if (oc->object_hash.count(oid) == 0)
+    return -ENOENT;
+  ObjectRef o = oc->object_hash[oid];
+  c->object_map[oid] = o;
+  c->object_hash[oid] = o;
+  return 0;
+}
+
+int MemStore::_collection_move_rename(coll_t oldcid, const ghobject_t& oldoid,
+                                     coll_t cid, const ghobject_t& oid)
+{
+  dout(10) << __func__ << " " << oldcid << " " << oldoid << " -> "
+          << cid << " " << oid << dendl;
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  CollectionRef oc = get_collection(oldcid);
+  if (!oc)
+    return -ENOENT;
+  RWLock::WLocker l1(MIN(c, oc)->lock);
+  RWLock::WLocker l2(MAX(c, oc)->lock);
+
+  if (c->object_hash.count(oid))
+    return -EEXIST;
+  if (oc->object_hash.count(oldoid) == 0)
+    return -ENOENT;
+  ObjectRef o = oc->object_hash[oldoid];
+  c->object_map[oid] = o;
+  c->object_hash[oid] = o;
+  oc->object_map.erase(oldoid);
+  oc->object_hash.erase(oldoid);
+  return 0; 
+}
+
+int MemStore::_collection_setattr(coll_t cid, const char *name,
+                                 const void *value, size_t size)
+{
+  dout(10) << __func__ << " " << cid << " " << name << dendl;
+  hash_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
+  if (cp == coll_map.end())
+    return -ENOENT;
+  RWLock::WLocker l(cp->second->lock);
+
+  cp->second->xattr[name] = bufferptr((const char *)value, size);
+  return 0;
+}
+
+// Set (insert or overwrite) every attribute in aset on collection cid.
+// Attributes not mentioned in aset are left untouched.
+//
+// Returns -ENOENT if the collection does not exist, else 0.
+int MemStore::_collection_setattrs(coll_t cid, map<string,bufferptr> &aset)
+{
+  dout(10) << __func__ << " " << cid << dendl;
+  // Resolve through get_collection(), like the object-level mutators in
+  // this file, instead of reading coll_map directly: coll_map is documented
+  // as protected by coll_lock, which this function does not take.  Holding
+  // a CollectionRef also keeps the collection alive while we work on it.
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::WLocker l(c->lock);
+
+  for (map<string,bufferptr>::const_iterator p = aset.begin();
+       p != aset.end();
+       ++p) {
+    c->xattr[p->first] = p->second;
+  }
+  return 0;
+}
+
+// Remove attribute name from collection cid.
+//
+// Returns -ENOENT if the collection does not exist, -ENODATA if it has no
+// such attribute, else 0.
+int MemStore::_collection_rmattr(coll_t cid, const char *name)
+{
+  dout(10) << __func__ << " " << cid << " " << name << dendl;
+  // Resolve through get_collection(), like the object-level mutators in
+  // this file, instead of reading coll_map directly: coll_map is documented
+  // as protected by coll_lock, which this function does not take.  Holding
+  // a CollectionRef also keeps the collection alive while we work on it.
+  CollectionRef c = get_collection(cid);
+  if (!c)
+    return -ENOENT;
+  RWLock::WLocker l(c->lock);
+
+  if (c->xattr.count(name) == 0)
+    return -ENODATA;
+  c->xattr.erase(name);
+  return 0;
+}
+
+// Re-key collection cid as ncid; the collection object itself is unchanged.
+//
+// Returns -ENOENT if cid does not exist, -EEXIST if ncid is already taken,
+// else 0.
+int MemStore::_collection_rename(const coll_t &cid, const coll_t &ncid)
+{
+  dout(10) << __func__ << " " << cid << " -> " << ncid << dendl;
+  RWLock::WLocker guard(coll_lock);
+  if (!coll_map.count(cid)) {
+    return -ENOENT;
+  }
+  if (coll_map.count(ncid) != 0) {
+    return -EEXIST;
+  }
+  // publish under the new id, then retire the old one
+  CollectionRef moved = coll_map[cid];
+  coll_map[ncid] = moved;
+  coll_map.erase(cid);
+  return 0;
+}
+
+int MemStore::_split_collection(coll_t cid, uint32_t bits, uint32_t match,
+                               coll_t dest)
+{
+  dout(10) << __func__ << " " << cid << " " << bits << " " << match << " "
+          << dest << dendl;
+  CollectionRef sc = get_collection(cid);
+  if (!sc)
+    return -ENOENT;
+  CollectionRef dc = get_collection(dest);
+  if (!dc)
+    return -ENOENT;
+  RWLock::WLocker l1(MIN(sc, dc)->lock);
+  RWLock::WLocker l2(MAX(sc, dc)->lock);
+
+  map<ghobject_t,ObjectRef>::iterator p = sc->object_map.begin();
+  while (p != sc->object_map.end()) {
+    if (p->first.match(bits, match)) {
+      dout(20) << " moving " << p->first << dendl;
+      dc->object_map.insert(make_pair(p->first, p->second));
+      dc->object_hash.insert(make_pair(p->first, p->second));
+      sc->object_hash.erase(p->first);
+      sc->object_map.erase(p++);
+    } else {
+      ++p;
+    }
+  }
+
+  return 0;
+}
diff --git a/src/os/MemStore.h b/src/os/MemStore.h
new file mode 100644 (file)
index 0000000..9c4d003
--- /dev/null
@@ -0,0 +1,357 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013- Sage Weil <sage@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef CEPH_MEMSTORE_H
+#define CEPH_MEMSTORE_H
+
+#include <ext/hash_map>
+using namespace __gnu_cxx;
+
+#include "include/assert.h"
+#include "common/Finisher.h"
+#include "common/RWLock.h"
+#include "ObjectStore.h"
+
+class MemStore : public ObjectStore {
+public:
+  struct Object {
+    bufferlist data;
+    map<string,bufferptr> xattr;
+    bufferlist omap_header;
+    map<string,bufferlist> omap;
+
+    void encode(bufferlist& bl) const {
+      ENCODE_START(1, 1, bl);
+      ::encode(data, bl);
+      ::encode(xattr, bl);
+      ::encode(omap_header, bl);
+      ::encode(omap, bl);
+      ENCODE_FINISH(bl);
+    }
+    void decode(bufferlist::iterator& p) {
+      DECODE_START(1, p);
+      ::decode(data, p);
+      ::decode(xattr, p);
+      ::decode(omap_header, p);
+      ::decode(omap, p);
+      DECODE_FINISH(p);
+    }
+    void dump(Formatter *f) const {
+      f->dump_int("data_len", data.length());
+      f->dump_int("omap_header_len", omap_header.length());
+
+      f->open_array_section("xattrs");
+      for (map<string,bufferptr>::const_iterator p = xattr.begin();
+          p != xattr.end();
+          ++p) {
+       f->open_object_section("xattr");
+       f->dump_string("name", p->first);
+       f->dump_int("length", p->second.length());
+       f->close_section();     
+      }
+      f->close_section();
+
+      f->open_array_section("omap");
+      for (map<string,bufferlist>::const_iterator p = omap.begin();
+          p != omap.end();
+          ++p) {
+       f->open_object_section("pair");
+       f->dump_string("key", p->first);
+       f->dump_int("length", p->second.length());
+       f->close_section();     
+      }
+      f->close_section();
+    }
+  };
+  typedef std::tr1::shared_ptr<Object> ObjectRef;
+
+  struct Collection {
+    hash_map<ghobject_t, ObjectRef> object_hash;  ///< for lookup
+    map<ghobject_t, ObjectRef> object_map;        ///< for iteration
+    map<string,bufferptr> xattr;
+    RWLock lock;   ///< for object_{map,hash}
+
+    // NOTE: The lock only needs to protect the object_map/hash, not the
+    // contents of individual objects.  The osd is already sequencing
+    // reads and writes, so we will never see them concurrently at this
+    // level.
+
+    ObjectRef get_object(ghobject_t oid) {
+      hash_map<ghobject_t,ObjectRef>::iterator o = object_hash.find(oid);
+      if (o == object_hash.end())
+       return ObjectRef();
+      return o->second;
+    }
+
+    void encode(bufferlist& bl) const {
+      ENCODE_START(1, 1, bl);
+      ::encode(xattr, bl);
+      uint32_t s = object_map.size();
+      ::encode(s, bl);
+      for (map<ghobject_t, ObjectRef>::const_iterator p = object_map.begin();
+          p != object_map.end();
+          ++p) {
+       ::encode(p->first, bl);
+       p->second->encode(bl);
+      }
+      ENCODE_FINISH(bl);
+    }
+    void decode(bufferlist::iterator& p) {
+      DECODE_START(1, p);
+      ::decode(xattr, p);
+      uint32_t s;
+      ::decode(s, p);
+      while (s--) {
+       ghobject_t k;
+       ::decode(k, p);
+       ObjectRef o(new Object);
+       o->decode(p);
+       object_map.insert(make_pair(k, o));
+       object_hash.insert(make_pair(k, o));
+      }
+      DECODE_FINISH(p);
+    }
+
+    Collection() : lock("MemStore::Collection::lock") {}
+  };
+  typedef std::tr1::shared_ptr<Collection> CollectionRef;
+
+private:
+  class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
+    CollectionRef c;
+    ObjectRef o;
+    map<string,bufferlist>::iterator it;
+  public:
+    OmapIteratorImpl(CollectionRef c, ObjectRef o)
+      : c(c), o(o), it(o->omap.begin()) {}
+
+    int seek_to_first() {
+      RWLock::RLocker l(c->lock);
+      it = o->omap.begin();
+      return 0;
+    }
+    int upper_bound(const string &after) {
+      RWLock::RLocker l(c->lock);
+      it = o->omap.upper_bound(after);
+      return 0;
+    }
+    int lower_bound(const string &to) {
+      RWLock::RLocker l(c->lock);
+      it = o->omap.lower_bound(to);
+      return 0;
+    }
+    bool valid() {
+      RWLock::RLocker l(c->lock);
+      return it != o->omap.end();      
+    }
+    int next() {
+      RWLock::RLocker l(c->lock);
+      ++it;
+      return 0;
+    }
+    string key() {
+      RWLock::RLocker l(c->lock);
+      return it->first;
+    }
+    bufferlist value() {
+      RWLock::RLocker l(c->lock);
+      return it->second;
+    }
+    int status() {
+      return 0;
+    }
+  };
+
+
+  // ---- store state ----
+
+  /// every collection in the store, keyed by collection id
+  hash_map<coll_t, CollectionRef> coll_map;
+  RWLock coll_lock;    ///< rwlock to protect coll_map
+  Mutex apply_lock;    ///< serialize all updates
+
+  /// Look up a collection by id.  Callers in MemStore.cc treat a null
+  /// result as "no such collection" and return -ENOENT.
+  CollectionRef get_collection(coll_t cid);
+
+  // NOTE(review): presumably runs transaction completion callbacks --
+  // confirm against queue_transactions() in MemStore.cc.
+  Finisher finisher;
+
+  /// Apply one transaction to the in-memory state.
+  void _do_transaction(Transaction& t);
+
+  /// Write src into *dst at byte position offset (helper for the data
+  /// mutators below).
+  void _write_into_bl(const bufferlist& src, unsigned offset, bufferlist *dst);
+
+  // ---- object mutators ----
+  // The implementations return 0 on success (_clone_range returns the
+  // number of bytes copied) and a negative errno such as -ENOENT on
+  // failure.
+
+  int _touch(coll_t cid, const ghobject_t& oid);
+  int _write(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len, const bufferlist& bl,
+      bool replica = false);
+  int _zero(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len);
+  int _truncate(coll_t cid, const ghobject_t& oid, uint64_t size);
+  int _remove(coll_t cid, const ghobject_t& oid);
+  int _setattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr>& aset);
+  int _rmattr(coll_t cid, const ghobject_t& oid, const char *name);
+  int _rmattrs(coll_t cid, const ghobject_t& oid);
+  int _clone(coll_t cid, const ghobject_t& oldoid, const ghobject_t& newoid);
+  int _clone_range(coll_t cid, const ghobject_t& oldoid,
+                  const ghobject_t& newoid,
+                  uint64_t srcoff, uint64_t len, uint64_t dstoff);
+  int _omap_clear(coll_t cid, const ghobject_t &oid);
+  int _omap_setkeys(coll_t cid, const ghobject_t &oid,
+                   const map<string, bufferlist> &aset);
+  int _omap_rmkeys(coll_t cid, const ghobject_t &oid, const set<string> &keys);
+  int _omap_rmkeyrange(coll_t cid, const ghobject_t &oid,
+                      const string& first, const string& last);
+  int _omap_setheader(coll_t cid, const ghobject_t &oid, const bufferlist &bl);
+
+  // ---- collection mutators ----
+
+  int _create_collection(coll_t c);
+  int _destroy_collection(coll_t c);
+  int _collection_add(coll_t cid, coll_t ocid, const ghobject_t& oid);
+  int _collection_move_rename(coll_t oldcid, const ghobject_t& oldoid,
+                             coll_t cid, const ghobject_t& o);
+  int _collection_setattr(coll_t cid, const char *name, const void *value,
+                         size_t size);
+  int _collection_setattrs(coll_t cid, map<string,bufferptr> &aset);
+  int _collection_rmattr(coll_t cid, const char *name);
+  int _collection_rename(const coll_t &cid, const coll_t &ncid);
+  int _split_collection(coll_t cid, uint32_t bits, uint32_t rem, coll_t dest);
+
+  // ---- persistence / debugging ----
+  // NOTE(review): per the commit description the store is written to disk
+  // on umount and read back on mount; _save()/_load() are presumably those
+  // two steps -- confirm in MemStore.cc.
+
+  int _save();
+  int _load();
+
+  void dump(Formatter *f);
+  void dump_all();
+
+public:
+  /// Construct a store rooted at path.  Only sets up the locks and the
+  /// finisher; nothing is read or written here.
+  MemStore(CephContext *cct, const string& path)
+    : ObjectStore(path),
+      coll_lock("MemStore::coll_lock"),
+      apply_lock("MemStore::apply_lock"),
+      finisher(cct) { }
+  ~MemStore() { }
+
+  /// No-op for this backend; always returns 0.
+  int update_version_stamp() {
+    return 0;
+  }
+  /// This backend always reports target version 1.
+  uint32_t get_target_version() {
+    return 1;
+  }
+
+  int peek_journal_fsid(uuid_d *fsid);
+
+  /// Always reports the store as not in use.
+  bool test_mount_in_use() {
+    return false;
+  }
+
+  int mount();
+  int umount();
+
+  /// Fixed limit of 4096.
+  int get_max_object_name_length() {
+    return 4096;
+  }
+
+  int mkfs();
+  /// No-op for this backend; always returns 0.
+  int mkjournal() {
+    return 0;
+  }
+
+  /// No-op: get_allow_sharded_objects() is unconditionally true.
+  void set_allow_sharded_objects() {
+  }
+  bool get_allow_sharded_objects() {
+    return true;
+  }
+
+  int statfs(struct statfs *buf);
+
+  // ---- object reads ----
+
+  bool exists(coll_t cid, const ghobject_t& oid);
+  int stat(
+    coll_t cid,
+    const ghobject_t& oid,
+    struct stat *st,
+    bool allow_eio = false); // struct stat?
+  int read(
+    coll_t cid,
+    const ghobject_t& oid,
+    uint64_t offset,
+    size_t len,
+    bufferlist& bl,
+    bool allow_eio = false);
+  int fiemap(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len, bufferlist& bl);
+  int getattr(coll_t cid, const ghobject_t& oid, const char *name, bufferptr& value);
+  int getattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr>& aset, bool user_only = false);
+
+  // ---- collection reads ----
+
+  int list_collections(vector<coll_t>& ls);
+  bool collection_exists(coll_t c);
+  int collection_getattr(coll_t cid, const char *name,
+                        void *value, size_t size);
+  int collection_getattr(coll_t cid, const char *name, bufferlist& bl);
+  int collection_getattrs(coll_t cid, map<string,bufferptr> &aset);
+  bool collection_empty(coll_t c);
+  int collection_list(coll_t cid, vector<ghobject_t>& o);
+  int collection_list_partial(coll_t cid, ghobject_t start,
+                             int min, int max, snapid_t snap, 
+                             vector<ghobject_t> *ls, ghobject_t *next);
+  int collection_list_range(coll_t cid, ghobject_t start, ghobject_t end,
+                           snapid_t seq, vector<ghobject_t> *ls);
+
+  // ---- omap reads ----
+
+  /// Get omap header and all key/value pairs
+  int omap_get(
+    coll_t cid,                ///< [in] Collection containing oid
+    const ghobject_t &oid,   ///< [in] Object containing omap
+    bufferlist *header,      ///< [out] omap header
+    map<string, bufferlist> *out ///< [out] Key to value map
+    );
+
+  /// Get omap header
+  int omap_get_header(
+    coll_t cid,                ///< [in] Collection containing oid
+    const ghobject_t &oid,   ///< [in] Object containing omap
+    bufferlist *header,      ///< [out] omap header
+    bool allow_eio = false ///< [in] don't assert on eio
+    );
+
+  /// Get keys defined on oid
+  int omap_get_keys(
+    coll_t cid,              ///< [in] Collection containing oid
+    const ghobject_t &oid, ///< [in] Object containing omap
+    set<string> *keys      ///< [out] Keys defined on oid
+    );
+
+  /// Get key values
+  int omap_get_values(
+    coll_t cid,                    ///< [in] Collection containing oid
+    const ghobject_t &oid,       ///< [in] Object containing omap
+    const set<string> &keys,     ///< [in] Keys to get
+    map<string, bufferlist> *out ///< [out] Returned keys and values
+    );
+
+  /// Filters keys into out which are defined on oid
+  int omap_check_keys(
+    coll_t cid,                ///< [in] Collection containing oid
+    const ghobject_t &oid,   ///< [in] Object containing omap
+    const set<string> &keys, ///< [in] Keys to check
+    set<string> *out         ///< [out] Subset of keys defined on oid
+    );
+
+  /// Get an iterator over the omap of oid
+  ObjectMap::ObjectMapIterator get_omap_iterator(
+    coll_t cid,              ///< [in] collection
+    const ghobject_t &oid  ///< [in] object
+    );
+
+  void set_fsid(uuid_d u);
+  uuid_d get_fsid();
+
+  filestore_perf_stat_t get_cur_stats();
+
+  /// Submit a list of transactions to be applied to the store.
+  int queue_transactions(
+    Sequencer *osr, list<Transaction*>& tls,
+    TrackedOpRef op = TrackedOpRef());
+};
+
+
+
+
+#endif
index 07466f1c2502ebf468bdce1f24cd690b5cb553cd..db22d978497dfec68afd7a56a4475438cc917663 100644 (file)
@@ -17,6 +17,7 @@
 #include "ObjectStore.h"
 #include "common/Formatter.h"
 #include "FileStore.h"
+#include "MemStore.h"
 #include "common/safe_io.h"
 
 ObjectStore *ObjectStore::create(CephContext *cct,
@@ -27,6 +28,9 @@ ObjectStore *ObjectStore::create(CephContext *cct,
   if (type == "filestore") {
     return new FileStore(data, journal);
   }
+  if (type == "memstore") {
+    return new MemStore(cct, data);
+  }
   return NULL;
 }