]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/: DBObjectMap and KeyValueDB interface with tests
authorSamuel Just <rexludorum@gmail.com>
Fri, 3 Feb 2012 17:16:09 +0000 (09:16 -0800)
committerSamuel Just <samuel.just@dreamhost.com>
Thu, 1 Mar 2012 18:11:42 +0000 (10:11 -0800)
DBObjectMap is an implementation of ObjectMap in terms of KeyValueDB.

Signed-off-by: Samuel Just <samuel.just@dreamhost.com>
src/common/config_opts.h
src/os/DBObjectMap.cc [new file with mode: 0644]
src/os/DBObjectMap.h [new file with mode: 0644]
src/os/FileStore.cc
src/os/KeyValueDB.h [new file with mode: 0644]
src/os/ObjectMap.h
src/test/ObjectMap/KeyValueDBMemory.cc [new file with mode: 0644]
src/test/ObjectMap/KeyValueDBMemory.h [new file with mode: 0644]
src/test/ObjectMap/test_object_map.cc [new file with mode: 0644]
src/test/encoding/types.h

index e2a84d152c536ad224692e7feadeadd65d9482a2..a83503e22712919573fb2f84988103c88a8ee6f0 100644 (file)
@@ -302,6 +302,7 @@ OPTION(osd_min_pg_log_entries, OPT_U32, 1000) // number of entries to keep in th
 OPTION(osd_op_complaint_time, OPT_FLOAT, 30) // how many seconds old makes an op complaint-worthy
 OPTION(osd_command_max_records, OPT_INT, 256)
 OPTION(filestore, OPT_BOOL, false)
+OPTION(filestore_debug_omap_check, OPT_BOOL, 0) // Expensive debugging check on sync
 OPTION(filestore_max_sync_interval, OPT_DOUBLE, 5)    // seconds
 OPTION(filestore_min_sync_interval, OPT_DOUBLE, .01)  // seconds
 OPTION(filestore_fake_attrs, OPT_BOOL, false)
diff --git a/src/os/DBObjectMap.cc b/src/os/DBObjectMap.cc
new file mode 100644 (file)
index 0000000..614feb1
--- /dev/null
@@ -0,0 +1,972 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include <iostream>
+#include <inttypes.h>
+#include "include/buffer.h"
+#include <set>
+#include <map>
+#include <string>
+#include <tr1/memory>
+
+#include <string>
+#include <vector>
+#include <tr1/memory>
+
+#include "CollectionIndex.h"
+#include "ObjectMap.h"
+#include "KeyValueDB.h"
+#include "DBObjectMap.h"
+#include <errno.h>
+
+#include "common/debug.h"
+#include "common/config.h"
+#define DOUT_SUBSYS filestore
+#undef dout_prefix
+#define dout_prefix *_dout << "filestore "
+
+const string DBObjectMap::USER_PREFIX = "_USER_";
+const string DBObjectMap::SYS_PREFIX = "_SYS_";
+const string DBObjectMap::COMPLETE_PREFIX = "_COMPLETE_";
+const string DBObjectMap::HEADER_KEY = "HEADER";
+const string DBObjectMap::USER_HEADER_KEY = "USER_HEADER";
+const string DBObjectMap::LEAF_PREFIX = "_LEAF_";
+const string DBObjectMap::GLOBAL_STATE_KEY = "HEADER";
+const string DBObjectMap::REVERSE_LEAF_PREFIX = "_REVLEAF_";
+
+static void append_escaped(const string &in, string *out)
+{
+  for (string::const_iterator i = in.begin(); i != in.end(); ++i) {
+    if (*i == '%') {
+      out->push_back('%');
+      out->push_back('p');
+    } else if (*i == '.') {
+      out->push_back('%');
+      out->push_back('e');
+    } else if (*i == '_') {
+      out->push_back('%');
+      out->push_back('u');
+    } else {
+      out->push_back(*i);
+    }
+  }
+}
+
+bool DBObjectMap::check(std::ostream &out)
+{
+  bool retval = true;
+  map<uint64_t, uint64_t> parent_to_num_children;
+  map<uint64_t, uint64_t> parent_to_actual_num_children;
+  KeyValueDB::Iterator iter = db->get_iterator(LEAF_PREFIX);
+  for (iter->seek_to_first(); iter->valid(); iter->next()) {
+    _Header header;
+    bufferlist bl = iter->value();
+    while (true) {
+      bufferlist::iterator bliter = bl.begin();
+      header.decode(bliter);
+      if (header.seq != 0)
+       parent_to_actual_num_children[header.seq] = header.num_children;
+      if (header.parent == 0)
+       break;
+
+      if (!parent_to_num_children.count(header.parent))
+       parent_to_num_children[header.parent] = 0;
+      parent_to_num_children[header.parent]++;
+      if (parent_to_actual_num_children.count(header.parent))
+       break;
+
+      set<string> to_get;
+      map<string, bufferlist> got;
+      to_get.insert(HEADER_KEY);
+      db->get(sys_parent_prefix(header), to_get, &got);
+      if (!got.size()) {
+       out << "Missing: seq " << header.parent << std::endl;
+       retval = false;
+       break;
+      } else {
+       bl = got.begin()->second;
+      }
+    }
+  }
+
+  for (map<uint64_t, uint64_t>::iterator i = parent_to_num_children.begin();
+       i != parent_to_num_children.end();
+       parent_to_num_children.erase(i++)) {
+    if (!parent_to_actual_num_children.count(i->first))
+      continue;
+    if (parent_to_actual_num_children[i->first] != i->second) {
+      out << "Invalid: seq " << i->first << " recorded children: "
+         << parent_to_actual_num_children[i->first] << " found: "
+         << i->second << std::endl;
+      retval = false;
+    }
+    parent_to_actual_num_children.erase(i->first);
+  }
+  return retval;
+}
+
+string DBObjectMap::hobject_key(coll_t c, const hobject_t &hoid)
+{
+  string out;
+  append_escaped(c.to_str(), &out);
+  out.push_back('.');
+  append_escaped(hoid.oid.name, &out);
+  out.push_back('.');
+  append_escaped(hoid.get_key(), &out);
+  out.push_back('.');
+
+  char snap_with_hash[1000];
+  char *t = snap_with_hash;
+  char *end = t + sizeof(snap_with_hash);
+  if (hoid.snap == CEPH_NOSNAP)
+    t += snprintf(t, end - t, ".head");
+  else if (hoid.snap == CEPH_SNAPDIR)
+    t += snprintf(t, end - t, ".snapdir");
+  else
+    t += snprintf(t, end - t, ".%llx", (long long unsigned)hoid.snap);
+  snprintf(t, end - t, ".%.*X", (int)(sizeof(hoid.hash)*2), hoid.hash);
+  out += string(snap_with_hash);
+  return out;
+}
+
+string DBObjectMap::map_header_key(coll_t c, const hobject_t &hoid)
+{
+  return hobject_key(c, hoid);
+}
+
+string DBObjectMap::header_key(uint64_t seq)
+{
+  char buf[100];
+  snprintf(buf, sizeof(buf), "%.*" PRId64, (int)(2*sizeof(seq)), seq);
+  return string(buf);
+}
+
+string DBObjectMap::complete_prefix(Header header)
+{
+  return USER_PREFIX + header_key(header->seq) + COMPLETE_PREFIX;
+}
+
+string DBObjectMap::user_prefix(Header header)
+{
+  return USER_PREFIX + header_key(header->seq) + USER_PREFIX;
+}
+
+string DBObjectMap::sys_prefix(Header header)
+{
+  return USER_PREFIX + header_key(header->seq) + SYS_PREFIX;
+}
+
+string DBObjectMap::sys_parent_prefix(_Header header)
+{
+  return USER_PREFIX + header_key(header.parent) + SYS_PREFIX;
+}
+
+int DBObjectMap::DBObjectMapIteratorImpl::init()
+{
+  invalid = false;
+  if (ready) {
+    return 0;
+  }
+  assert(!parent_iter);
+  if (header->parent) {
+    Header parent = map->lookup_parent(header);
+    if (!parent) {
+      assert(0);
+      return -EINVAL;
+    }
+    parent_iter.reset(new DBObjectMapIteratorImpl(map, parent));
+  }
+  key_iter = map->db->get_iterator(map->user_prefix(header));
+  assert(key_iter);
+  complete_iter = map->db->get_iterator(map->complete_prefix(header));
+  assert(complete_iter);
+  cur_iter = key_iter;
+  assert(cur_iter);
+  ready = true;
+  return 0;
+}
+
+ObjectMap::ObjectMapIterator DBObjectMap::get_iterator(
+  const hobject_t &hoid,
+  CollectionIndex::IndexedPath path)
+{
+  Header header = lookup_map_header(path->coll(), hoid);
+  if (!header)
+    return ObjectMapIterator(new EmptyIteratorImpl());
+  return _get_iterator(header);
+}
+
+int DBObjectMap::DBObjectMapIteratorImpl::seek_to_first()
+{
+  init();
+  r = 0;
+  if (parent_iter) {
+    r = parent_iter->seek_to_first();
+    if (r < 0)
+      return r;
+  }
+  r = key_iter->seek_to_first();
+  if (r < 0)
+    return r;
+  return adjust();
+}
+
+int DBObjectMap::DBObjectMapIteratorImpl::seek_to_last()
+{
+  init();
+  r = 0;
+  if (parent_iter) {
+    r = parent_iter->seek_to_last();
+    if (r < 0)
+      return r;
+    if (parent_iter->valid())
+      r = parent_iter->next();
+    if (r < 0)
+      return r;
+  }
+  r = key_iter->seek_to_last();
+  if (r < 0)
+    return r;
+  if (key_iter->valid())
+    r = key_iter->next();
+  if (r < 0)
+    return r;
+  return adjust();
+}
+
+int DBObjectMap::DBObjectMapIteratorImpl::lower_bound(const string &to)
+{
+  init();
+  r = 0;
+  if (parent_iter) {
+    r = parent_iter->lower_bound(to);
+    if (r < 0)
+      return r;
+  }
+  r = key_iter->lower_bound(to);
+  if (r < 0)
+    return r;
+  return adjust();
+}
+
+int DBObjectMap::DBObjectMapIteratorImpl::upper_bound(const string &after)
+{
+  init();
+  r = 0;
+  if (parent_iter) {
+    r = parent_iter->upper_bound(after);
+    if (r < 0)
+      return r;
+  }
+  r = key_iter->upper_bound(after);
+  if (r < 0)
+    return r;
+  return adjust();
+}
+
+bool DBObjectMap::DBObjectMapIteratorImpl::valid()
+{
+  bool valid = !invalid && ready;
+  assert(!valid || cur_iter->valid());
+  return valid;
+}
+
+bool DBObjectMap::DBObjectMapIteratorImpl::valid_parent()
+{
+  if (parent_iter && parent_iter->valid() &&
+      (!key_iter->valid() || key_iter->key() > parent_iter->key()))
+    return true;
+  return false;
+}
+
+int DBObjectMap::DBObjectMapIteratorImpl::next()
+{
+  assert(cur_iter->valid());
+  assert(valid());
+  cur_iter->next();
+  return adjust();
+}
+
+int DBObjectMap::DBObjectMapIteratorImpl::next_parent()
+{
+  if (!parent_iter || !parent_iter->valid()) {
+    invalid = true;
+    return 0;
+  }
+  r = next();
+  if (r < 0)
+    return r;
+  if (!valid() || on_parent() || !parent_iter->valid())
+    return 0;
+
+  return lower_bound(parent_iter->key());
+}
+
+int DBObjectMap::DBObjectMapIteratorImpl::in_complete_region(const string &to_test,
+                                                            string *begin,
+                                                            string *end)
+{
+  complete_iter->upper_bound(to_test);
+  if (complete_iter->valid())
+    complete_iter->prev();
+  else
+    complete_iter->seek_to_last();
+
+  if (!complete_iter->valid())
+    return false;
+
+  string _end;
+  if (begin)
+    *begin = complete_iter->key();
+  _end = string(complete_iter->value().c_str());
+  if (end)
+    *end = _end;
+  return (to_test >= complete_iter->key()) && (!_end.size() || _end > to_test);
+}
+
+/**
+ * Moves parent_iter to the next position both out of the complete_region and 
+ * not equal to key_iter.  Then, we set cur_iter to parent_iter if valid and
+ * less than key_iter and key_iter otherwise.
+ */
+int DBObjectMap::DBObjectMapIteratorImpl::adjust()
+{
+  string begin, end;
+  while (parent_iter && parent_iter->valid()) {
+    if (in_complete_region(parent_iter->key(), &begin, &end)) {
+      if (end.size() == 0) {
+       parent_iter->seek_to_last();
+       if (parent_iter->valid())
+         parent_iter->next();
+      } else
+       parent_iter->lower_bound(end);
+    } else if (key_iter->valid() && key_iter->key() == parent_iter->key()) {
+      parent_iter->next();
+    } else {
+      break;
+    }
+  }
+  if (valid_parent()) {
+    cur_iter = parent_iter;
+  } else if (key_iter->valid()) {
+    cur_iter = key_iter;
+  } else {
+    invalid = true;
+  }
+  assert(invalid || cur_iter->valid());
+  return 0;
+}
+
+
+string DBObjectMap::DBObjectMapIteratorImpl::key()
+{
+  return cur_iter->key();
+}
+
+bufferlist DBObjectMap::DBObjectMapIteratorImpl::value()
+{
+  return cur_iter->value();
+}
+
+int DBObjectMap::DBObjectMapIteratorImpl::status()
+{
+  return r;
+}
+
+int DBObjectMap::set_keys(const hobject_t &hoid,
+                         CollectionIndex::IndexedPath path,
+                         const map<string, bufferlist> &set)
+{
+  KeyValueDB::Transaction t = db->get_transaction();
+  Header header = lookup_create_map_header(path->coll(), hoid, t);
+  if (!header)
+    return -EINVAL;
+
+  t->set(user_prefix(header), set);
+
+  return db->submit_transaction(t);
+}
+
+int DBObjectMap::set_header(const hobject_t &hoid,
+                           CollectionIndex::IndexedPath path,
+                           const bufferlist &bl)
+{
+  KeyValueDB::Transaction t = db->get_transaction();
+  Header header = lookup_create_map_header(path->coll(), hoid, t);
+  if (!header)
+    return -EINVAL;
+  _set_header(header, bl, t);
+  return db->submit_transaction(t);
+}
+
+void DBObjectMap::_set_header(Header header, const bufferlist &bl,
+                             KeyValueDB::Transaction t)
+{
+  map<string, bufferlist> to_set;
+  to_set[USER_HEADER_KEY] = bl;
+  t->set(sys_prefix(header), to_set);
+}
+
+int DBObjectMap::get_header(const hobject_t &hoid,
+                           CollectionIndex::IndexedPath path,
+                           bufferlist *bl)
+{
+  Header header = lookup_map_header(path->coll(), hoid);
+  if (!header) {
+    return 0;
+  }
+  return _get_header(header, bl);
+}
+
+int DBObjectMap::_get_header(Header header,
+                            bufferlist *bl)
+{
+  int r = 0;
+  map<string, bufferlist> out;
+  while (r == 0) {
+    out.clear();
+    set<string> to_get;
+    to_get.insert(USER_HEADER_KEY);
+    int r = db->get(sys_prefix(header), to_get, &out);
+    if (r == 0 && out.size())
+      break;
+    Header current(header);
+    if (!current->parent)
+      break;
+    header = lookup_parent(current);
+  }
+
+  if (r < 0)
+    return r;
+  if (out.size())
+    bl->swap(out.begin()->second);
+  return 0;
+}
+
+int DBObjectMap::clear(const hobject_t &hoid,
+                      CollectionIndex::IndexedPath path)
+{
+  KeyValueDB::Transaction t = db->get_transaction();
+  Header header = lookup_map_header(path->coll(), hoid);
+  if (!header)
+    return -ENOENT;
+  remove_map_header(path->coll(), hoid, header, t);
+  assert(header->num_children > 0);
+  header->num_children--;
+  int r = _clear(header, t);
+  if (r < 0)
+    return r;
+  return db->submit_transaction(t);
+}
+
+int DBObjectMap::_clear(Header header,
+                       KeyValueDB::Transaction t)
+{
+  while (1) {
+    if (header->num_children) {
+      set_header(header, t);
+      break;
+    }
+    clear_header(header, t);
+    if (!header->parent)
+      break;
+    Header parent = lookup_parent(header);
+    if (!parent) {
+      return -EINVAL;
+    }
+    assert(parent->num_children > 0);
+    parent->num_children--;
+    header.swap(parent);
+  }
+  return 0;
+}
+
+int DBObjectMap::merge_new_complete(Header header,
+                                   const map<string, string> &new_complete,
+                                   DBObjectMapIterator iter,
+                                   KeyValueDB::Transaction t)
+{
+  KeyValueDB::Iterator complete_iter = db->get_iterator(
+    complete_prefix(header)
+    );
+  map<string, string>::const_iterator i = new_complete.begin();
+  set<string> to_remove;
+  map<string, bufferlist> to_add;
+
+  string begin, end;
+  int r = 0;
+  while (i != new_complete.end()) {
+    string new_begin = i->first;
+    string new_end = i->second;
+    r = iter->in_complete_region(new_begin, &begin, &end);
+    if (r < 0)
+      return r;
+    if (r) {
+      to_remove.insert(begin);
+      new_begin = begin;
+    }
+    ++i;
+    while (i != new_complete.end()) {
+      if (!new_end.size() || i->first <= new_end) {
+       if (!new_end.size() && i->second > new_end) {
+         new_end = i->second;
+       }
+       ++i;
+       continue;
+      }
+
+      r = iter->in_complete_region(new_end, &begin, &end);
+      if (r < 0)
+       return r;
+      if (r) {
+       to_remove.insert(begin);
+       new_end = end;
+       continue;
+      }
+      break;
+    }
+    bufferlist bl;
+    bl.append(bufferptr(new_end.c_str(), new_end.size() + 1));
+    to_add.insert(make_pair(new_begin, bl));
+  }
+  t->rmkeys(complete_prefix(header), to_remove);
+  t->set(complete_prefix(header), to_add);
+  return 0;
+}
+
+int DBObjectMap::copy_up_header(Header header,
+                               KeyValueDB::Transaction t)
+{
+  bufferlist bl;
+  int r = _get_header(header, &bl);
+  if (r < 0)
+    return r;
+
+  _set_header(header, bl, t);
+  return 0;
+}
+
+int DBObjectMap::need_parent(DBObjectMapIterator iter)
+{
+  int r = iter->seek_to_first();
+  if (r < 0)
+    return r;
+
+  if (!iter->valid())
+    return 0;
+
+  string begin, end;
+  if (iter->in_complete_region(iter->key(), &begin, &end) && end == "") {
+    return 0;
+  }
+  return 1;
+}
+
+int DBObjectMap::rm_keys(const hobject_t &hoid,
+                        CollectionIndex::IndexedPath path,
+                        const set<string> &to_clear)
+{
+  Header header = lookup_map_header(path->coll(), hoid);
+  if (!header)
+    return -ENOENT;
+  KeyValueDB::Transaction t = db->get_transaction();
+  t->rmkeys(user_prefix(header), to_clear);
+  if (!header->parent) {
+    return db->submit_transaction(t);
+  }
+
+  // Copy up keys from parent around to_clear
+  int keep_parent;
+  {
+    DBObjectMapIterator iter = _get_iterator(header);
+    iter->seek_to_first();
+    map<string, string> new_complete;
+    map<string, bufferlist> to_write;
+    unsigned copied = 0;
+    for(set<string>::const_iterator i = to_clear.begin();
+       i != to_clear.end();
+      ) {
+      copied = 0;
+      iter->lower_bound(*i);
+      ++i;
+      if (!iter->valid())
+       break;
+      string begin = iter->key();
+      if (!iter->on_parent())
+       iter->next_parent();
+      if (new_complete.size() && new_complete.rbegin()->second == begin) {
+       begin = new_complete.rbegin()->first;
+      }
+      while (iter->valid() && copied < 20) {
+       if (!to_clear.count(iter->key()))
+         to_write[iter->key()].append(iter->value());
+       if (i != to_clear.end() && *i <= iter->key()) {
+         ++i;
+         copied = 0;
+       }
+
+       iter->next_parent();
+       copied++;
+      }
+      if (iter->valid()) {
+       new_complete[begin] = iter->key();
+      } else {
+       new_complete[begin] = "";
+       break;
+      }
+    }
+    t->set(user_prefix(header), to_write);
+    merge_new_complete(header, new_complete, iter, t);
+    keep_parent = need_parent(iter);
+    if (keep_parent < 0)
+      return keep_parent;
+  }
+  if (!keep_parent) {
+    copy_up_header(header, t);
+    Header parent = lookup_parent(header);
+    if (!parent)
+      return -EINVAL;
+    parent->num_children--;
+    _clear(parent, t);
+    header->parent = 0;
+    set_header(header, t);
+    t->rmkeys_by_prefix(complete_prefix(header));
+  }
+  return db->submit_transaction(t);
+}
+
+int DBObjectMap::get(const hobject_t &hoid,
+                    CollectionIndex::IndexedPath path,
+                    bufferlist *_header,
+                    map<string, bufferlist> *out)
+{
+  Header header = lookup_map_header(path->coll(), hoid);
+  if (!header)
+    return -ENOENT;
+  _get_header(header, _header);
+  ObjectMapIterator iter = _get_iterator(header);
+  for (iter->seek_to_first(); iter->valid(); iter->next()) {
+    if (iter->status())
+      return iter->status();
+    out->insert(make_pair(iter->key(), iter->value()));
+  }
+  return 0;
+}
+
+int DBObjectMap::get_keys(const hobject_t &hoid,
+                         CollectionIndex::IndexedPath path,
+                         set<string> *keys)
+{
+  Header header = lookup_map_header(path->coll(), hoid);
+  if (!header);
+    return -ENOENT;
+  ObjectMapIterator iter = get_iterator(hoid, path);
+  for (; iter->valid(); iter->next()) {
+    if (iter->status())
+      return iter->status();
+    keys->insert(iter->key());
+  }
+  return 0;
+}
+
+int DBObjectMap::scan(Header header,
+                     const set<string> &in_keys,
+                     set<string> *out_keys,
+                     map<string, bufferlist> *out_values)
+{
+  ObjectMapIterator db_iter = _get_iterator(header);
+  for (set<string>::const_iterator key_iter = in_keys.begin();
+       key_iter != in_keys.end();
+       ++key_iter) {
+    db_iter->lower_bound(*key_iter);
+    if (db_iter->status())
+      return db_iter->status();
+    if (db_iter->valid() && db_iter->key() == *key_iter) {
+      if (out_keys)
+       out_keys->insert(*key_iter);
+      if (out_values)
+       out_values->insert(make_pair(db_iter->key(), db_iter->value()));
+    }
+  }
+  return 0;
+}
+
+int DBObjectMap::get_values(const hobject_t &hoid,
+                           CollectionIndex::IndexedPath path,
+                           const set<string> &keys,
+                           map<string, bufferlist> *out)
+{
+  Header header = lookup_map_header(path->coll(), hoid);
+  if (!header)
+    return -ENOENT;
+  return scan(header, keys, 0, out);;
+}
+
+int DBObjectMap::check_keys(const hobject_t &hoid,
+                           CollectionIndex::IndexedPath path,
+                           const set<string> &keys,
+                           set<string> *out)
+{
+  Header header = lookup_map_header(path->coll(), hoid);
+  if (!header)
+    return -ENOENT;
+  return scan(header, keys, out, 0);
+}
+
+int DBObjectMap::clone_keys(const hobject_t &hoid,
+                           CollectionIndex::IndexedPath path,
+                           const hobject_t &target,
+                           CollectionIndex::IndexedPath target_path)
+{
+  KeyValueDB::Transaction t = db->get_transaction();
+  {
+    Header destination = lookup_map_header(target_path->coll(), target);
+    if (destination) {
+      remove_map_header(target_path->coll(), target, destination, t);
+      destination->num_children--;
+      _clear(destination, t);
+    }
+  }
+
+  Header parent = lookup_map_header(path->coll(), hoid);
+  if (!parent)
+    return db->submit_transaction(t);
+
+  Header source = generate_new_header(path->coll(), hoid, parent);
+  Header destination = generate_new_header(target_path->coll(), target, parent);
+  _Header lsource, ldestination;
+  source->num_children = parent->num_children;
+  lsource.parent = source->seq;
+  ldestination.parent = destination->seq;
+  parent->num_children = 2;
+  set_header(parent, t);
+  set_header(source, t);
+  set_header(destination, t);
+  set_map_header(target_path->coll(), target, ldestination, t);
+
+  string hkey(header_key(parent->seq));
+  KeyValueDB::Iterator iter = db->get_iterator(REVERSE_LEAF_PREFIX);
+  iter->lower_bound(hkey);
+  assert(iter->valid());
+  assert(string(iter->key(), 0, hkey.size()) == hkey);
+  for (; iter->valid() && string(iter->key(), 0, hkey.size()) == hkey;
+       iter->next()) {
+    pair<coll_t, hobject_t> p;
+    bufferlist bl = iter->value();
+    bufferlist::iterator bliter = bl.begin();
+    ::decode(p, bliter);
+    remove_map_header(p.first, p.second, parent, t);
+    set_map_header(p.first, p.second, lsource , t);
+  }
+  return db->submit_transaction(t);
+}
+
+int DBObjectMap::link_keys(const hobject_t &hoid,
+                          CollectionIndex::IndexedPath path,
+                          const hobject_t &target,
+                          CollectionIndex::IndexedPath target_path)
+{
+  KeyValueDB::Transaction t = db->get_transaction();
+  {
+    Header destination = lookup_map_header(target_path->coll(), target);
+    if (destination) {
+      remove_map_header(target_path->coll(), target, destination, t);
+      destination->num_children--;
+      _clear(destination, t);
+    }
+  }
+  Header header = lookup_create_map_header(path->coll(), hoid, t);
+
+  assert(header->num_children > 0);
+  header->num_children++;
+  set_header(header, t);
+  _Header ldestination;
+  ldestination.parent = header->seq;
+  set_map_header(target_path->coll(), target, ldestination, t);
+  return db->submit_transaction(t);
+}
+
+int DBObjectMap::init() {
+  State state;
+  map<string, bufferlist> result;
+  set<string> to_get;
+  to_get.insert(GLOBAL_STATE_KEY);
+  int r = db->get(SYS_PREFIX, to_get, &result);
+  if (r < 0)
+    return r;
+  if (result.size()) {
+    bufferlist::iterator bliter = result.begin()->second.begin();
+    state.decode(bliter);
+    next_seq = state.seq;
+  }
+  dout(20) << "(init)dbobjectmap: seq is " << next_seq << dendl;
+  return 0;
+}
+
+int DBObjectMap::write_state() {
+  dout(20) << "dbobjectmap: seq is " << next_seq << dendl;
+  KeyValueDB::Transaction t = db->get_transaction();
+  State state;
+  state.seq = next_seq;
+  bufferlist bl;
+  state.encode(bl);
+  map<string, bufferlist> to_write;
+  to_write[GLOBAL_STATE_KEY] = bl;
+  t->set(SYS_PREFIX, to_write);
+  return db->submit_transaction(t);
+}
+
+
+DBObjectMap::Header DBObjectMap::lookup_map_header(coll_t c, const hobject_t &hoid)
+{
+  Mutex::Locker l(header_lock);
+  _Header lheader;
+  while (true) {
+    map<string, bufferlist> out;
+    set<string> keys;
+    keys.insert(map_header_key(c, hoid));
+    int r = db->get(LEAF_PREFIX, keys, &out);
+    if (r < 0)
+      return Header();
+    if (out.size() < 1)
+      return Header();
+    bufferlist::iterator iter = out.begin()->second.begin();
+    lheader.decode(iter);
+
+    if (in_use.count(lheader.parent)) {
+      header_cond.Wait(header_lock);
+      continue;
+    }
+    in_use.insert(lheader.parent);
+    break;
+  }
+
+  dout(20) << "lookup_map_header: parent seq is " << lheader.parent
+       << " for hoid " << hoid << dendl;
+  map<string, bufferlist> out;
+  set<string> keys;
+  keys.insert(HEADER_KEY);
+  int r = db->get(sys_parent_prefix(lheader), keys, &out);
+  if (r < 0)
+    return Header();
+  assert(out.size());
+
+  Header header = Header(new _Header(), RemoveOnDelete(this));
+  header->seq = lheader.parent;
+
+  bufferlist::iterator iter = out.begin()->second.begin();
+  header->decode(iter);
+  return header;
+}
+
+DBObjectMap::Header DBObjectMap::generate_new_header(coll_t c, const hobject_t &hoid,
+                                                    Header parent)
+{
+  Mutex::Locker l(header_lock);
+  Header header = Header(new _Header(), RemoveOnDelete(this));
+  header->seq = next_seq++;
+  if (parent)
+    header->parent = parent->seq;
+  header->num_children = 1;
+  header->c = c;
+  header->hoid = hoid;
+  assert(!in_use.count(header->seq));
+  in_use.insert(header->seq);
+
+  write_state();
+  return header;
+}
+
+DBObjectMap::Header DBObjectMap::lookup_parent(Header input)
+{
+  Mutex::Locker l(header_lock);
+  while (in_use.count(input->parent))
+    header_cond.Wait(header_lock);
+  map<string, bufferlist> out;
+  set<string> keys;
+  keys.insert(HEADER_KEY);
+
+  dout(20) << "lookup_parent: parent " << input->parent
+       << " for seq " << input->seq << dendl;
+  int r = db->get(sys_parent_prefix(input), keys, &out);
+  if (r < 0) {
+    assert(0);
+    return Header();
+  }
+  if (out.size() < 1) {
+    assert(0);
+    return Header();
+  }
+
+  Header header = Header(new _Header(), RemoveOnDelete(this));
+  header->seq = input->parent;
+  bufferlist::iterator iter = out.begin()->second.begin();
+  header->decode(iter);
+  dout(20) << "lookup_parent: parent seq is " << header->seq << " with parent "
+       << header->parent << dendl;
+  in_use.insert(header->seq);
+  return header;
+}
+
+DBObjectMap::Header DBObjectMap::lookup_create_map_header(coll_t c, const hobject_t &hoid,
+                                                         KeyValueDB::Transaction t)
+{
+  Header header = lookup_map_header(c, hoid);
+  if (!header) {
+    header = generate_new_header(c, hoid, Header());
+    set_header(header, t);
+    _Header lheader;
+    lheader.parent = header->seq;
+    set_map_header(c, hoid, lheader, t);
+  }
+  return header;
+}
+
+void DBObjectMap::clear_header(Header header, KeyValueDB::Transaction t)
+{
+  dout(20) << "clear_header: clearing seq " << header->seq << dendl;
+  t->rmkeys_by_prefix(user_prefix(header));
+  t->rmkeys_by_prefix(sys_prefix(header));
+  t->rmkeys_by_prefix(complete_prefix(header));
+  set<string> keys;
+  keys.insert(header_key(header->seq));
+  t->rmkeys(USER_PREFIX, keys);
+}
+
+void DBObjectMap::set_header(Header header, KeyValueDB::Transaction t)
+{
+  dout(20) << "set_header: setting seq " << header->seq << dendl;
+  map<string, bufferlist> to_write;
+  header->encode(to_write[HEADER_KEY]);
+  t->set(sys_prefix(header), to_write);
+}
+
+void DBObjectMap::remove_map_header(coll_t c, const hobject_t &hoid,
+                                   Header header,
+                                   KeyValueDB::Transaction t)
+{
+  dout(20) << "remove_map_header: removing " << header->seq
+       << " hoid " << hoid << dendl;
+  set<string> to_remove;
+  to_remove.insert(map_header_key(c, hoid));
+  t->rmkeys(LEAF_PREFIX, to_remove);
+  to_remove.clear();
+  to_remove.insert(header_key(header->seq) +
+                  map_header_key(c, hoid));
+  t->rmkeys(REVERSE_LEAF_PREFIX, to_remove);
+}
+
+void DBObjectMap::set_map_header(coll_t c, const hobject_t &hoid, _Header header,
+                                KeyValueDB::Transaction t)
+{
+  dout(20) << "set_map_header: setting " << header.seq
+       << " hoid " << hoid << " parent seq "
+       << header.parent << dendl;
+  map<string, bufferlist> to_set;
+  header.encode(to_set[map_header_key(c, hoid)]);
+  t->set(LEAF_PREFIX, to_set);
+  to_set.clear();
+  ::encode(make_pair(c, hoid), to_set[
+            header_key(header.parent) +
+            map_header_key(c, hoid)]);
+  t->set(REVERSE_LEAF_PREFIX, to_set);
+}
diff --git a/src/os/DBObjectMap.h b/src/os/DBObjectMap.h
new file mode 100644 (file)
index 0000000..605e603
--- /dev/null
@@ -0,0 +1,423 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+#ifndef DBOBJECTMAP_DB_H
+#define DBOBJECTMAP_DB_H
+
+#include "include/buffer.h"
+#include <set>
+#include <map>
+#include <string>
+
+#include <string>
+#include <vector>
+#include <tr1/memory>
+#include <boost/scoped_ptr.hpp>
+
+#include "CollectionIndex.h"
+#include "ObjectMap.h"
+#include "KeyValueDB.h"
+#include "osd/osd_types.h"
+#include "common/Mutex.h"
+#include "common/Cond.h"
+
+/**
+ * DBObjectMap: Implements ObjectMap in terms of KeyValueDB
+ *
+ * Prefix space structure:
+ *
+ * @see complete_prefix
+ * @see user_prefix
+ * @see sys_prefix
+ *
+ * - LEAF_PREFIX: Contains mapping from (coll_t,hobject_t)->seq
+ * - REVERSE_LEAF_PREFIX: Contains mapping from seq->[(coll_t, hobject_t)]
+ *                        @see set_map_header
+ *                        @see remove_map_header
+ * - SYS_PREFIX: GLOBAL_STATE_KEY - contains next seq number
+ *                                  @see State
+ *                                  @see write_state
+ *                                  @see init
+ *                                  @see generate_new_header
+ * - USER_PREFIX + header_key(header->seq) + USER_PREFIX
+ *              : key->value for header->seq
+ * - USER_PREFIX + header_key(header->seq) + COMPLETE_PREFIX: see below
+ * - USER_PREFIX + header_key(header->seq) + SYS_PREFIX
+ *              : USER_HEADER_KEY - omap header for header->seq
+ *              : HEADER_KEY - encoding of header for header->seq
+ *
+ * For each node (represented by a header, not counting LEAF_PREFIX space), we
+ * store three mappings: the key mapping, the complete mapping, and the parent.
+ * The complete mapping (COMPLETE_PREFIX space) is key->key.  Each x->y entry in
+ * this mapping indicates that the key mapping contains all entries on [x,y).
+ * Note, max string is represented by "", so ""->"" indicates that the parent
+ * is unnecessary (@see rm_keys).  When looking up a key not contained in the
+ * the complete set, we have to check the parent if we don't find it in the
+ * key set.  During rm_keys, we copy keys from the parent and update the
+ * complete set to reflect the change @see rm_keys.
+ */
+class DBObjectMap : public ObjectMap {
+public:
+  boost::scoped_ptr<KeyValueDB> db;
+
+  /**
+   * Each header has a unique id.  This value is updated and written *before*
+   * the transaction writing th new header
+   */
+  uint64_t next_seq;
+
+  /**
+   * Serializes access to next_seq as well as the in_use set
+   */
+  Mutex header_lock;
+  Cond header_cond;
+
+  /**
+   * Set of headers currently in use
+   */
+  set<uint64_t> in_use;
+
+  DBObjectMap(KeyValueDB *db) : db(db), next_seq(1),
+                               header_lock("DBOBjectMap")
+    {}
+
+  int set_keys(
+    const hobject_t &hoid,
+    CollectionIndex::IndexedPath path,
+    const map<string, bufferlist> &set
+    );
+
+  int set_header(
+    const hobject_t &hoid,
+    CollectionIndex::IndexedPath path,
+    const bufferlist &bl
+    );
+
+  int get_header(
+    const hobject_t &hoid,
+    CollectionIndex::IndexedPath path,
+    bufferlist *bl
+    );
+
+  int clear(
+    const hobject_t &hoid,
+    CollectionIndex::IndexedPath path
+    );
+
+  int rm_keys(
+    const hobject_t &hoid,
+    CollectionIndex::IndexedPath path,
+    const set<string> &to_clear
+    );
+
+  int get(
+    const hobject_t &hoid,
+    CollectionIndex::IndexedPath path,
+    bufferlist *header,
+    map<string, bufferlist> *out
+    );
+
+  int get_keys(
+    const hobject_t &hoid,
+    CollectionIndex::IndexedPath path,
+    set<string> *keys
+    );
+
+  int get_values(
+    const hobject_t &hoid,
+    CollectionIndex::IndexedPath path,
+    const set<string> &keys,
+    map<string, bufferlist> *out
+    );
+
+  int check_keys(
+    const hobject_t &hoid,
+    CollectionIndex::IndexedPath path,
+    const set<string> &keys,
+    set<string> *out
+    );
+
+  int clone_keys(
+    const hobject_t &hoid,
+    CollectionIndex::IndexedPath path,
+    const hobject_t &target,
+    CollectionIndex::IndexedPath target_path
+    );
+
+  int link_keys(
+    const hobject_t &hoid,
+    CollectionIndex::IndexedPath path,
+    const hobject_t &target,
+    CollectionIndex::IndexedPath target_path
+    );
+
+  /// Read initial state from backing store
+  int init();
+
+  /// Consistency check, debug, there must be no parallel writes
+  bool check(std::ostream &out);
+
+  ObjectMapIterator get_iterator(const hobject_t &hoid,
+                                CollectionIndex::IndexedPath path);
+
+  static const string USER_PREFIX;
+  static const string SYS_PREFIX;
+  static const string COMPLETE_PREFIX;
+  static const string HEADER_KEY;
+  static const string USER_HEADER_KEY;
+  static const string LEAF_PREFIX;
+  static const string GLOBAL_STATE_KEY;
+  static const string REVERSE_LEAF_PREFIX;
+
+  /// persistent state for store @see generate_header
+  struct State {
+    uint64_t seq;
+    State() : seq(0) {}
+    State(uint64_t seq) : seq(seq) {}
+
+    void encode(bufferlist &bl) const {
+      ENCODE_START(1, 1, bl);
+      ::encode(seq, bl);
+      ENCODE_FINISH(bl);
+    }
+
+    void decode(bufferlist::iterator &bl) {
+      DECODE_START(1, bl);
+      ::decode(seq, bl);
+      DECODE_FINISH(bl);
+    }
+
+    void dump(Formatter *f) const {
+      f->dump_unsigned("seq", seq);
+    }
+
+    static void generate_test_instances(list<State*> &o) {
+      o.push_back(new State(0));
+      o.push_back(new State(20));
+    }
+  };
+
+  struct _Header {
+    uint64_t seq;
+    uint64_t parent;
+    uint64_t num_children;
+
+    coll_t c;
+    hobject_t hoid;
+
+    void encode(bufferlist &bl) const {
+      ENCODE_START(1, 1, bl);
+      ::encode(seq, bl);
+      ::encode(parent, bl);
+      ::encode(num_children, bl);
+      ::encode(c, bl);
+      ::encode(hoid, bl);
+      ENCODE_FINISH(bl);
+    }
+
+    void decode(bufferlist::iterator &bl) {
+      DECODE_START(1, bl);
+      ::decode(seq, bl);
+      ::decode(parent, bl);
+      ::decode(num_children, bl);
+      ::decode(c, bl);
+      ::decode(hoid, bl);
+      DECODE_FINISH(bl);
+    }
+
+    void dump(Formatter *f) const {
+      f->dump_unsigned("seq", seq);
+      f->dump_unsigned("parent", parent);
+      f->dump_unsigned("num_children", num_children);
+      f->dump_stream("coll") << c;
+      f->dump_stream("oid") << hoid;
+    }
+
+    static void generate_test_instances(list<_Header*> &o) {
+      o.push_back(new _Header);
+      o.push_back(new _Header);
+      o.back()->parent = 20;
+      o.back()->seq = 30;
+    }
+
+    _Header() : seq(0), parent(0), num_children(1) {}
+  };
+
+private:
+  /// Implicit lock on Header->seq
+  typedef std::tr1::shared_ptr<_Header> Header;
+
+  /// String munging
+  string hobject_key(coll_t c, const hobject_t &hoid);
+  string map_header_key(coll_t c, const hobject_t &hoid);
+  string header_key(uint64_t seq);
+  string complete_prefix(Header header);
+  string user_prefix(Header header);
+  string sys_prefix(Header header);
+  string sys_parent_prefix(_Header header);
+  string sys_parent_prefix(Header header) {
+    return sys_parent_prefix(*header);
+  }
+
+  class EmptyIteratorImpl : public ObjectMapIteratorImpl {
+  public:
+    int seek_to_first() { return 0; }
+    int seek_to_last() { return 0; }
+    int upper_bound(const string &after) { return 0; }
+    int lower_bound(const string &to) { return 0; }
+    bool valid() { return false; }
+    int next() { assert(0); return 0; }
+    string key() { assert(0); return ""; }
+    bufferlist value() { assert(0); return bufferlist(); }
+    int status() { return 0; }
+  };
+
+
+  /// Iterator
+  class DBObjectMapIteratorImpl : public ObjectMapIteratorImpl {
+  public:
+    DBObjectMap *map;
+
+    /// NOTE: implicit lock on header->seq AND for all ancestors
+    Header header;
+
+    /// parent_iter == NULL iff no parent
+    std::tr1::shared_ptr<DBObjectMapIteratorImpl> parent_iter;
+    KeyValueDB::Iterator key_iter;
+    KeyValueDB::Iterator complete_iter;
+
+    /// cur_iter points to currently valid iterator
+    std::tr1::shared_ptr<ObjectMapIteratorImpl> cur_iter;
+    int r;
+
+    /// init() called, key_iter, complete_iter, parent_iter filled in
+    bool ready;
+    /// past end
+    bool invalid;
+
+    DBObjectMapIteratorImpl(DBObjectMap *map, Header header) :
+      map(map), header(header), r(0), ready(false), invalid(true) {}
+    int seek_to_first();
+    int seek_to_last();
+    int upper_bound(const string &after);
+    int lower_bound(const string &to);
+    bool valid();
+    int next();
+    string key();
+    bufferlist value();
+    int status();
+
+    bool on_parent() {
+      return cur_iter == parent_iter;
+    }
+
+    /// skips to next valid parent entry
+    int next_parent();
+
+    /// Tests whether to_test is in complete region
+    int in_complete_region(const string &to_test, ///< [in] key to test
+                          string *begin,         ///< [out] beginning of region
+                          string *end            ///< [out] end of region
+      ); ///< @returns true if to_test is in the complete region, else false
+
+  private:
+    int init();
+    bool valid_parent();
+    int adjust();
+  };
+
+  typedef std::tr1::shared_ptr<DBObjectMapIteratorImpl> DBObjectMapIterator;
+  DBObjectMapIterator _get_iterator(Header header) {
+    return DBObjectMapIterator(new DBObjectMapIteratorImpl(this, header));
+  }
+
+  /// sys
+
+  /// Removes node corresponding to header
+  void clear_header(Header header, KeyValueDB::Transaction t);
+
+  /// Set node containing input to new contents
+  void set_header(Header input, KeyValueDB::Transaction t);
+
+  /// Remove leaf node corresponding to hoid in c
+  void remove_map_header(coll_t c, const hobject_t &hoid,
+                        Header header,
+                        KeyValueDB::Transaction t);
+
+  /// Set leaf node for c and hoid to the value of header
+  void set_map_header(coll_t c, const hobject_t &hoid, _Header header,
+                    KeyValueDB::Transaction t);
+
+  /// Lookup or create header for c hoid
+  Header lookup_create_map_header(coll_t c, const hobject_t &hoid,
+                                 KeyValueDB::Transaction t);
+
+  /**
+   * Generate new header for c hoid with new seq number
+   *
+   * Has the side effect of syncronously saving the new DBObjectMap state
+   */
+  Header generate_new_header(coll_t c, const hobject_t &hoid, Header parent);
+
+  /// Lookup leaf header for c hoid
+  Header lookup_map_header(coll_t c, const hobject_t &hoid);
+
+  /// Lookup header node for input
+  Header lookup_parent(Header input);
+
+
+  /// Helpers
+  int _get_header(Header header, bufferlist *bl);
+
+  /// Scan keys in header into out_keys and out_values (if nonnull)
+  int scan(Header header,
+          const set<string> &in_keys,
+          set<string> *out_keys,
+          map<string, bufferlist> *out_values);
+
+  /// Remove header and all related prefixes
+  int _clear(Header header,
+            KeyValueDB::Transaction t);
+  /// Adds to t operations necessary to add new_complete to the complete set
+  int merge_new_complete(Header header,
+                        const map<string, string> &new_complete,
+                        DBObjectMapIterator iter,
+                        KeyValueDB::Transaction t);
+
+  /// Writes out State (mainly next_seq)
+  int write_state();
+
+  /// 0 if the complete set now contains all of key space, < 0 on error, 1 else
+  int need_parent(DBObjectMapIterator iter);
+
+  /// Copies header entry from parent @see rm_keys
+  int copy_up_header(Header header,
+                    KeyValueDB::Transaction t);
+
+  /// Sets header @see set_header
+  void _set_header(Header header, const bufferlist &bl,
+                  KeyValueDB::Transaction t);
+
+
+  /** 
+   * Removes header seq lock once Header is out of scope
+   * @see lookup_parent
+   * @see generate_new_header
+   */
+  class RemoveOnDelete {
+  public:
+    DBObjectMap *db;
+    uint64_t seq;
+    RemoveOnDelete(DBObjectMap *db) :
+      db(db), seq(seq) {}
+    void operator() (_Header *header) {
+      Mutex::Locker l(db->header_lock);
+      db->in_use.erase(header->seq);
+      db->header_cond.Signal();
+      delete header;
+    }
+  };
+  friend class RemoveOnDelete;
+};
+WRITE_CLASS_ENCODER(DBObjectMap::_Header)
+WRITE_CLASS_ENCODER(DBObjectMap::State)
+
+#endif
index ecf5f7420c91a084775a403d6cb6aaa5b53607f8..0c1632de3264e8b32e3fe636674602d1cccf927f 100644 (file)
@@ -1727,7 +1727,20 @@ int FileStore::mount()
       ret = -1;
       goto close_current_fd;
     }
-    object_map.reset(new DBObjectMap(omap_store));
+    DBObjectMap *dbomap = new DBObjectMap(omap_store);
+    ret = dbomap->init();
+    if (ret < 0) {
+      derr << "Error initializing DBObjectMap: " << ret << dendl;
+      goto close_current_fd;
+    }
+    stringstream err2;
+
+    if (g_conf->filestore_debug_omap_check && !dbomap->check(err2)) {
+      derr << err2.str() << dendl;;
+      ret = -EINVAL;
+      goto close_current_fd;
+    }
+    object_map.reset(dbomap);
   }
 
   // journal
@@ -1812,6 +1825,15 @@ int FileStore::mount()
     goto close_current_fd;
   }
 
+  {
+    stringstream err2;
+    if (!object_map->check(err2)) {
+      derr << err2.str() << dendl;;
+      ret = -EINVAL;
+      goto close_current_fd;
+    }
+  }
+
   journal_start();
 
   op_tp.start();
@@ -3162,6 +3184,11 @@ void FileStore::sync_entry()
        derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
        assert(0);
       }
+      stringstream errstream;
+      if (!object_map->check(errstream)) {
+       derr << errstream.str() << dendl;
+       assert(0);
+      }
 
       if (btrfs_stable_commits) {
 
@@ -3936,6 +3963,7 @@ int FileStore::omap_get(coll_t c, const hobject_t &hoid,
                        bufferlist *header,
                        map<string, bufferlist> *out)
 {
+  dout(15) << __func__ << " " << c << "/" << hoid << dendl;
   IndexedPath path;
   int r = lfn_find(c, hoid, &path);
   if (r < 0)
@@ -3946,6 +3974,7 @@ int FileStore::omap_get(coll_t c, const hobject_t &hoid,
 int FileStore::omap_get_header(coll_t c, const hobject_t &hoid,
                               bufferlist *bl)
 {
+  dout(15) << __func__ << " " << c << "/" << hoid << dendl;
   IndexedPath path;
   int r = lfn_find(c, hoid, &path);
   if (r < 0)
@@ -3955,6 +3984,7 @@ int FileStore::omap_get_header(coll_t c, const hobject_t &hoid,
 
 int FileStore::omap_get_keys(coll_t c, const hobject_t &hoid, set<string> *keys)
 {
+  dout(15) << __func__ << " " << c << "/" << hoid << dendl;
   IndexedPath path;
   int r = lfn_find(c, hoid, &path);
   if (r < 0)
@@ -3966,6 +3996,7 @@ int FileStore::omap_get_values(coll_t c, const hobject_t &hoid,
                               const set<string> &keys,
                               map<string, bufferlist> *out)
 {
+  dout(15) << __func__ << " " << c << "/" << hoid << dendl;
   IndexedPath path;
   int r = lfn_find(c, hoid, &path);
   if (r < 0)
@@ -3977,6 +4008,7 @@ int FileStore::omap_check_keys(coll_t c, const hobject_t &hoid,
                               const set<string> &keys,
                               set<string> *out)
 {
+  dout(15) << __func__ << " " << c << "/" << hoid << dendl;
   IndexedPath path;
   int r = lfn_find(c, hoid, &path);
   if (r < 0)
@@ -3987,6 +4019,7 @@ int FileStore::omap_check_keys(coll_t c, const hobject_t &hoid,
 ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(coll_t c,
                                                          const hobject_t &hoid)
 {
+  dout(15) << __func__ << " " << c << "/" << hoid << dendl;
   IndexedPath path;
   int r = lfn_find(c, hoid, &path);
   if (r < 0)
@@ -4045,6 +4078,7 @@ int FileStore::_collection_remove(coll_t c, const hobject_t& o)
 
 
 int FileStore::_omap_clear(coll_t cid, const hobject_t &hoid) {
+  dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
   IndexedPath path;
   int r = lfn_find(cid, hoid, &path);
   if (r < 0)
@@ -4053,6 +4087,7 @@ int FileStore::_omap_clear(coll_t cid, const hobject_t &hoid) {
 }
 int FileStore::_omap_setkeys(coll_t cid, const hobject_t &hoid,
                             const map<string, bufferlist> &aset) {
+  dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
   IndexedPath path;
   int r = lfn_find(cid, hoid, &path);
   if (r < 0)
@@ -4061,6 +4096,7 @@ int FileStore::_omap_setkeys(coll_t cid, const hobject_t &hoid,
 }
 int FileStore::_omap_rmkeys(coll_t cid, const hobject_t &hoid,
                           const set<string> &keys) {
+  dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
   IndexedPath path;
   int r = lfn_find(cid, hoid, &path);
   if (r < 0)
@@ -4070,6 +4106,7 @@ int FileStore::_omap_rmkeys(coll_t cid, const hobject_t &hoid,
 int FileStore::_omap_setheader(coll_t cid, const hobject_t &hoid,
                               const bufferlist &bl)
 {
+  dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
   IndexedPath path;
   int r = lfn_find(cid, hoid, &path);
   if (r < 0)
diff --git a/src/os/KeyValueDB.h b/src/os/KeyValueDB.h
new file mode 100644 (file)
index 0000000..0c7fd54
--- /dev/null
@@ -0,0 +1,73 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+#ifndef KEY_VALUE_DB_H
+#define KEY_VALUE_DB_H
+
+#include "include/buffer.h"
+#include <set>
+#include <map>
+#include <string>
+#include <tr1/memory>
+#include "ObjectMap.h"
+
+using std::string;
+/**
+ * Defines virtual interface to be implemented by key value store
+ *
+ * Kyoto Cabinet or LevelDB should implement this
+ */
+class KeyValueDB {
+public:
+  class TransactionImpl {
+  public:
+    /// Set Keys
+    virtual void set(
+      const string &prefix,                 ///< [in] Prefix for keys
+      const std::map<string, bufferlist> &to_set ///< [in] keys/values to set
+      ) = 0;
+
+    /// Removes Keys
+    virtual void rmkeys(
+      const string &prefix,   ///< [in] Prefix to search for
+      const std::set<string> &keys ///< [in] Keys to remove
+      ) = 0;
+
+    /// Removes keys beginning with prefix
+    virtual void rmkeys_by_prefix(
+      const string &prefix ///< [in] Prefix by which to remove keys
+      ) = 0;
+
+    virtual ~TransactionImpl() {};
+  };
+  typedef std::tr1::shared_ptr< TransactionImpl > Transaction;
+
+  virtual Transaction get_transaction() = 0;
+  virtual int submit_transaction(Transaction) = 0;
+
+  /// Retrieve Keys
+  virtual int get(
+    const string &prefix,        ///< [in] Prefix for key
+    const std::set<string> &key,      ///< [in] Key to retrieve
+    std::map<string, bufferlist> *out ///< [out] Key value retrieved
+    ) = 0;
+
+  class IteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
+  public:
+    virtual int seek_to_first() = 0;
+    virtual int seek_to_last() = 0;
+    virtual int upper_bound(const string &after) = 0;
+    virtual int lower_bound(const string &to) = 0;
+    virtual bool valid() = 0;
+    virtual int next() = 0;
+    virtual int prev() = 0;
+    virtual string key() = 0;
+    virtual bufferlist value() = 0;
+    virtual int status() = 0;
+    virtual ~IteratorImpl() {}
+  };
+  typedef std::tr1::shared_ptr< IteratorImpl > Iterator;
+  virtual Iterator get_iterator(const string &prefix) = 0;
+
+  virtual ~KeyValueDB() {}
+};
+
+#endif
index d6e1115a08e9b10aa8c3a225fe2cc826b7c616a1..361d99210cb8480f5b69f301de8a0021f9f108fc 100644 (file)
@@ -109,6 +109,8 @@ public:
     CollectionIndex::IndexedPath target_path ///< [in] path to target
     ) { return 0; }
 
+  virtual bool check(std::ostream &out) { return true; }
+
   class ObjectMapIteratorImpl {
   public:
     virtual int seek_to_first() = 0;
diff --git a/src/test/ObjectMap/KeyValueDBMemory.cc b/src/test/ObjectMap/KeyValueDBMemory.cc
new file mode 100644 (file)
index 0000000..596059d
--- /dev/null
@@ -0,0 +1,165 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+#include "include/encoding.h"
+#include "KeyValueDBMemory.h"
+#include <map>
+#include <set>
+#include <tr1/memory>
+#include <iostream>
+
+using namespace std;
+
+class MemIterator : public KeyValueDB::IteratorImpl {
+  string prefix;
+  KeyValueDBMemory *db;
+
+  bool ready;
+  map<string, bufferlist>::iterator iter;
+
+public:
+  MemIterator(const string &prefix,
+             KeyValueDBMemory *db) :
+    prefix(prefix), db(db), ready(false) {}
+
+  int seek_to_first() {
+    if (!db->db.count(prefix)) {
+      ready = false;
+      return 0;
+    }
+    iter = db->db[prefix].begin();
+    ready = true;
+    return 0;
+  }
+
+  int seek_to_last() {
+    if (!db->db.count(prefix)) {
+      ready = false;
+      return 0;
+    } else if (db->db[prefix].size() == 0) {
+      iter = db->db[prefix].end();
+    } else {
+      iter = --db->db[prefix].end();
+    }
+    ready = true;
+    return 0;
+  }
+
+  int lower_bound(const string &to) {
+    if (!db->db.count(prefix)) {
+      ready = false;
+      return 0;
+    }
+    iter = db->db[prefix].lower_bound(to);
+    ready = true;
+    return 0;
+  }
+
+  int upper_bound(const string &after) {
+    if (!db->db.count(prefix)) {
+      ready = false;
+      return 0;
+    }
+    iter = db->db[prefix].upper_bound(after);
+    ready = true;
+    return 0;
+  }
+
+  bool valid() {
+    return ready && iter != db->db[prefix].end();
+  }
+
+  bool begin() {
+    return ready && iter == db->db[prefix].begin();
+  }
+
+  int prev() {
+    if (valid() && iter != db->db[prefix].begin())
+      iter--;
+    return 0;
+  }
+
+  int next() {
+    if (valid())
+      iter++;
+    return 0;
+  }
+
+  string key() {
+    if (valid())
+      return iter->first;
+    else
+      return "";
+  }
+
+  bufferlist value() {
+    if (valid())
+      return iter->second;
+    else
+      return bufferlist();
+  }
+
+  int status() {
+    return 0;
+  }
+};
+
+int KeyValueDBMemory::get(const string &prefix,
+                         const std::set<string> &key,
+                         map<string, bufferlist> *out) {
+  if (!db.count(prefix))
+    return 0;
+
+  for (std::set<string>::const_iterator i = key.begin();
+       i != key.end();
+       ++i) {
+    if (db[prefix].count(*i))
+      (*out)[*i] = db[prefix][*i];
+  }
+  return 0;
+}
+
+int KeyValueDBMemory::get_keys(const string &prefix,
+                              const std::set<string> &key,
+                              std::set<string> *out) {
+  if (!db.count(prefix))
+    return 0;
+
+  for (std::set<string>::const_iterator i = key.begin();
+       i != key.end();
+       ++i) {
+    if (db[prefix].count(*i))
+      out->insert(*i);
+  }
+  return 0;
+}
+
+int KeyValueDBMemory::set(const string &prefix,
+                         const map<string, bufferlist> &to_set) {
+  for (map<string, bufferlist>::const_iterator i = to_set.begin();
+       i != to_set.end();
+       ++i) {
+    bufferlist bl = i->second;
+    db[prefix][i->first] = i->second;
+  }
+  return 0;
+}
+
+int KeyValueDBMemory::rmkeys(const string &prefix,
+                            const std::set<string> &keys) {
+  if (!db.count(prefix))
+    return 0;
+  for (std::set<string>::const_iterator i = keys.begin();
+       i != keys.end();
+       ++i) {
+    db[prefix].erase(*i);
+  }
+  return 0;
+}
+
+int KeyValueDBMemory::rmkeys_by_prefix(const string &prefix) {
+  db.erase(prefix);
+  return 0;
+}
+
+KeyValueDB::Iterator KeyValueDBMemory::get_iterator(const string &prefix) {
+  return tr1::shared_ptr<IteratorImpl>(new MemIterator(prefix, this));
+}
diff --git a/src/test/ObjectMap/KeyValueDBMemory.h b/src/test/ObjectMap/KeyValueDBMemory.h
new file mode 100644 (file)
index 0000000..e2ce32a
--- /dev/null
@@ -0,0 +1,126 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+#include <map>
+#include <set>
+#include <string>
+#include <tr1/memory>
+
+#include "os/KeyValueDB.h"
+#include "include/buffer.h"
+#include "include/Context.h"
+
+using std::string;
+
+class KeyValueDBMemory : public KeyValueDB {
+public:
+  std::map<string, std::map<string, bufferlist> > db;
+
+  int get(
+    const string &prefix,
+    const std::set<string> &key,
+    std::map<string, bufferlist> *out
+    );
+
+  int get_keys(
+    const string &prefix,
+    const std::set<string> &key,
+    std::set<string> *out
+    );
+
+  int set(
+    const string &prefix,
+    const std::map<string, bufferlist> &to_set
+    );
+
+  int rmkeys(
+    const string &prefix,
+    const std::set<string> &keys
+    );
+
+  int rmkeys_by_prefix(
+    const string &prefix
+    );
+
+  class TransactionImpl_ : public TransactionImpl {
+  public:
+    list<Context *> on_commit;
+    KeyValueDBMemory *db;
+
+    TransactionImpl_(KeyValueDBMemory *db) : db(db) {}
+
+
+    struct SetOp : public Context {
+      KeyValueDBMemory *db;
+      string prefix;
+      std::map<string, bufferlist> to_set;
+      SetOp(KeyValueDBMemory *db,
+           const string &prefix,
+           const std::map<string, bufferlist> &to_set)
+       : db(db), prefix(prefix), to_set(to_set) {}
+      void finish(int r) {
+       db->set(prefix, to_set);
+      }
+    };
+    void set(const string &prefix, const std::map<string, bufferlist> &to_set) {
+      on_commit.push_back(new SetOp(db, prefix, to_set));
+    }
+
+    struct RmKeysOp : public Context {
+      KeyValueDBMemory *db;
+      string prefix;
+      std::set<string> keys;
+      RmKeysOp(KeyValueDBMemory *db,
+              const string &prefix,
+              const std::set<string> &keys)
+       : db(db), prefix(prefix), keys(keys) {}
+      void finish(int r) {
+       db->rmkeys(prefix, keys);
+      }
+    };
+    void rmkeys(const string &prefix, const std::set<string> &to_remove) {
+      on_commit.push_back(new RmKeysOp(db, prefix, to_remove));
+    }
+
+    struct RmKeysByPrefixOp : public Context {
+      KeyValueDBMemory *db;
+      string prefix;
+      RmKeysByPrefixOp(KeyValueDBMemory *db,
+                      const string &prefix)
+       : db(db), prefix(prefix) {}
+      void finish(int r) {
+       db->rmkeys_by_prefix(prefix);
+      }
+    };
+    void rmkeys_by_prefix(const string &prefix) {
+      on_commit.push_back(new RmKeysByPrefixOp(db, prefix));
+    }
+
+    int complete() {
+      for (list<Context *>::iterator i = on_commit.begin();
+          i != on_commit.end();
+          on_commit.erase(i++)) {
+       (*i)->finish(0);
+       delete *i;
+      }
+      return 0;
+    }
+
+    ~TransactionImpl_() {
+      for (list<Context *>::iterator i = on_commit.begin();
+          i != on_commit.end();
+          on_commit.erase(i++)) {
+       delete *i;
+      }
+    }
+  };
+
+  Transaction get_transaction() {
+    return Transaction(new TransactionImpl_(this));
+  }
+
+  int submit_transaction(Transaction trans) {
+    return static_cast<TransactionImpl_*>(trans.get())->complete();
+  }
+
+  friend class MemIterator;
+  Iterator get_iterator(const string &prefix);
+};
diff --git a/src/test/ObjectMap/test_object_map.cc b/src/test/ObjectMap/test_object_map.cc
new file mode 100644 (file)
index 0000000..d888a84
--- /dev/null
@@ -0,0 +1,598 @@
+#include <tr1/memory>
+#include <map>
+#include <set>
+#include <boost/scoped_ptr.hpp>
+
+#include "os/CollectionIndex.h"
+#include "include/buffer.h"
+#include "test/ObjectMap/KeyValueDBMemory.h"
+#include "os/KeyValueDB.h"
+#include "os/DBObjectMap.h"
+#include "os/LevelDBStore.h"
+#include <sys/types.h>
+#include "global/global_init.h"
+#include "common/ceph_argparse.h"
+#include <dirent.h>
+
+#include "gtest/gtest.h"
+#include "stdlib.h"
+
+using namespace std;
+
+template <typename T>
+typename T::iterator rand_choose(T &cont) {
+  if (cont.size() == 0) {
+    return cont.end();
+  }
+  int index = rand() % cont.size();
+  typename T::iterator retval = cont.begin();
+
+  for (; index > 0; --index) retval++;
+  return retval;
+}
+
+string num_str(unsigned i) {
+  char buf[100];
+  snprintf(buf, sizeof(buf), "%.10d", i);
+  return string(buf);
+}
+
+class ObjectMapTest : public ::testing::Test {
+public:
+  boost::scoped_ptr< ObjectMap > db;
+  set<string> key_space;
+  set<string> object_name_space;
+  map<string, map<string, string> > omap;
+  map<string, string > hmap;
+  CollectionIndex::IndexedPath def_collection;
+  unsigned seq;
+
+  ObjectMapTest() : db(), seq(0) {}
+
+  virtual void SetUp() {
+    char *path = getenv("OBJECT_MAP_PATH");
+    if (!path) {
+      db.reset(new DBObjectMap(new KeyValueDBMemory()));
+      return;
+    }
+
+    string strpath(path);
+
+    cerr << "using path " << strpath << std::endl;;
+    LevelDBStore *store = new LevelDBStore(strpath);
+    assert(!store->init(cerr));
+
+    db.reset(new DBObjectMap(store));
+  }
+
+  virtual void TearDown() {
+    std::cerr << "Checking..." << std::endl;
+    assert(db->check(std::cerr));
+  }
+
+  string val_from_key(const string &object, const string &key) {
+    return object + "_" + key + "_" + num_str(seq++);
+  }
+
+  void set_key(const string &objname, const string &key, const string &value) {
+    set_key(hobject_t(sobject_t(objname, CEPH_NOSNAP)), def_collection,
+           key, value);
+  }
+
+  void set_key(hobject_t hoid, CollectionIndex::IndexedPath path,
+              string key, string value) {
+    map<string, bufferlist> to_write;
+    bufferptr bp(value.c_str(), value.size());
+    bufferlist bl;
+    bl.append(bp);
+    to_write.insert(make_pair(key, bl));
+    db->set_keys(hoid, path, to_write);
+  }
+
+  void set_header(const string &objname, const string &value) {
+    set_header(hobject_t(sobject_t(objname, CEPH_NOSNAP)), def_collection,
+              value);
+  }
+
+  void set_header(hobject_t hoid, CollectionIndex::IndexedPath path,
+                 const string &value) {
+    bufferlist header;
+    header.append(bufferptr(value.c_str(), value.size() + 1));
+    db->set_header(hoid, path, header);
+  }
+
+  int get_header(const string &objname, string *value) {
+    return get_header(hobject_t(sobject_t(objname, CEPH_NOSNAP)), def_collection,
+                     value);
+  }
+
+  int get_header(hobject_t hoid, CollectionIndex::IndexedPath path,
+                string *value) {
+    bufferlist header;
+    int r = db->get_header(hoid, path, &header);
+    if (r < 0)
+      return r;
+    if (header.length())
+      *value = string(header.c_str());
+    else
+      *value = string("");
+    return 0;
+  }
+
+  int get_key(const string &objname, const string &key, string *value) {
+    return get_key(hobject_t(sobject_t(objname, CEPH_NOSNAP)), def_collection,
+                  key, value);
+  }
+
+  int get_key(hobject_t hoid, CollectionIndex::IndexedPath path,
+             string key, string *value) {
+    set<string> to_get;
+    to_get.insert(key);
+    map<string, bufferlist> got;
+    db->get_values(hoid, path, to_get, &got);
+    if (got.size()) {
+      *value = string(got.begin()->second.c_str(),
+                     got.begin()->second.length());
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+
+  void remove_key(const string &objname, const string &key) {
+    remove_key(hobject_t(sobject_t(objname, CEPH_NOSNAP)), def_collection,
+              key);
+  }
+
+  void remove_key(hobject_t hoid, CollectionIndex::IndexedPath path,
+                 string key) {
+    set<string> to_remove;
+    to_remove.insert(key);
+    db->rm_keys(hoid, path, to_remove);
+  }
+
+  void clone(const string &objname, const string &target) {
+    clone(hobject_t(sobject_t(objname, CEPH_NOSNAP)), def_collection,
+         hobject_t(sobject_t(target, CEPH_NOSNAP)), def_collection);
+  }
+
+  void clone(hobject_t hoid, CollectionIndex::IndexedPath path,
+            hobject_t hoid2, CollectionIndex::IndexedPath path2) {
+    db->clone_keys(hoid, path, hoid2, path2);
+  }
+
+  void clear(const string &objname) {
+    clear(hobject_t(sobject_t(objname, CEPH_NOSNAP)), def_collection);
+  }
+
+  void clear(hobject_t hoid, CollectionIndex::IndexedPath path) {
+    db->clear(hoid, path);
+  }
+
+  void def_init() {
+    for (unsigned i = 0; i < 1000; ++i) {
+      key_space.insert("key_" + num_str(i));
+    }
+    for (unsigned i = 0; i < 1000; ++i) {
+      object_name_space.insert("name_" + num_str(i));
+    }
+    init_default_collection("def_collection");
+  }
+
+  void init_key_set(const set<string> &keys) {
+    key_space = keys;
+  }
+
+  void init_object_name_space(const set<string> &onamespace) {
+    object_name_space = onamespace;
+  }
+
+  void init_default_collection(const string &coll_name) {
+    def_collection = CollectionIndex::get_testing_path(
+      "/" + coll_name, coll_t(coll_name));
+  }
+
+  void auto_set_key(ostream &out) {
+    set<string>::iterator key = rand_choose(key_space);
+    set<string>::iterator object = rand_choose(object_name_space);
+
+    string value = val_from_key(*object, *key);
+
+    omap[*object][*key] = value;
+    set_key(*object, *key, value);
+
+    out << "auto_set_key " << *object << ": " << *key << " -> "
+       << value << std::endl;
+  }
+
+  void keys_on_object(const string &object, set<string> *out) {
+    if (!omap.count(object))
+      return;
+    const map<string, string> &kmap = omap.find(object)->second;
+    for (map<string, string>::const_iterator i = kmap.begin();
+        i != kmap.end();
+        ++i) {
+      out->insert(i->first);
+    }
+  }
+
+  void keys_off_object(const string &object, set<string> *out) {
+    *out = key_space;
+    set<string> kspace;
+    keys_on_object(object, &kspace);
+    for (set<string>::iterator i = kspace.begin();
+        i != kspace.end();
+        ++i) {
+      out->erase(*i);
+    }
+  }
+
+  int auto_check_present_key(ostream &out) {
+    set<string>::iterator object = rand_choose(object_name_space);
+    set<string> kspace;
+    keys_on_object(*object, &kspace);
+    set<string>::iterator key = rand_choose(kspace);
+    if (key == kspace.end()) {
+      return 1;
+    }
+
+    string result;
+    int r = get_key(*object, *key, &result);
+    if (!r) {
+      out << "auto_check_present_key: failed to find key "
+         << *key << " on object " << *object << std::endl;
+      return 0;
+    }
+
+    if (result != omap[*object][*key]) {
+      out << "auto_check_present_key: for key "
+         << *key << " on object " << *object
+         << " found value " << result << " where we should have found "
+         << omap[*object][*key] << std::endl;
+      return 0;
+    }
+
+    out << "auto_check_present_key: for key "
+       << *key << " on object " << *object
+       << " found value " << result << " where we should have found "
+       << omap[*object][*key] << std::endl;
+    return 1;
+  }
+
+  int auto_check_absent_key(ostream &out) {
+    set<string>::iterator object = rand_choose(object_name_space);
+    set<string> kspace;
+    keys_off_object(*object, &kspace);
+    set<string>::iterator key = rand_choose(kspace);
+    if (key == kspace.end()) {
+      return 1;
+    }
+
+    string result;
+    int r = get_key(*object, *key, &result);
+    if (!r) {
+      out << "auto_check_absent_key: did not find key "
+         << *key << " on object " << *object << std::endl;
+      return 1;
+    }
+
+    out << "auto_check_basent_key: for key "
+       << *key << " on object " << *object
+       << " found value " << result << " where we should have found nothing"
+       << std::endl;
+    return 0;
+  }
+
+  void auto_clone_key(ostream &out) {
+    set<string>::iterator object = rand_choose(object_name_space);
+    set<string>::iterator target = rand_choose(object_name_space);
+    while (target == object) {
+      target = rand_choose(object_name_space);
+    }
+    out << "clone " << *object << " to " << *target;
+    clone(*object, *target);
+    if (!omap.count(*object)) {
+      out << " source missing.";
+      omap.erase(*target);
+    } else {
+      out << " source present.";
+      omap[*target] = omap[*object];
+    }
+    if (!hmap.count(*object)) {
+      out << " hmap source missing." << std::endl;
+      hmap.erase(*target);
+    } else {
+      out << " hmap source present." << std::endl;
+      hmap[*target] = hmap[*object];
+    }
+  }
+
+  void auto_remove_key(ostream &out) {
+    set<string>::iterator object = rand_choose(object_name_space);
+    set<string> kspace;
+    keys_on_object(*object, &kspace);
+    set<string>::iterator key = rand_choose(kspace);
+    if (key == kspace.end()) {
+      return;
+    }
+    out << "removing " << *key << " from " << *object << std::endl;
+    omap[*object].erase(*key);
+    remove_key(*object, *key);
+  }
+
+  void auto_delete_object(ostream &out) {
+    set<string>::iterator object = rand_choose(object_name_space);
+    out << "auto_delete_object " << *object << std::endl;
+    clear(*object);
+    omap.erase(*object);
+    hmap.erase(*object);
+  }
+
+  void auto_write_header(ostream &out) {
+    set<string>::iterator object = rand_choose(object_name_space);
+    string header = val_from_key(*object, "HEADER");
+    out << "auto_write_header: " << *object << " -> " << header << std::endl;
+    set_header(*object, header);
+    hmap[*object] = header;
+  }
+
+  int auto_verify_header(ostream &out) {
+    set<string>::iterator object = rand_choose(object_name_space);
+    out << "verify_header: " << *object << " ";
+    string header;
+    int r = get_header(*object, &header);
+    if (r < 0) {
+      assert(0);
+    }
+    if (header.size() == 0) {
+      if (hmap.count(*object)) {
+       out << " failed to find header " << hmap[*object] << std::endl;
+       return 0;
+      } else {
+       out << " found no header" << std::endl;
+       return 1;
+      }
+    }
+
+    if (!hmap.count(*object)) {
+      out << " found header " << header << " should have been empty"
+             << std::endl;
+      return 0;
+    } else if (header == hmap[*object]) {
+      out << " found correct header " << header << std::endl;
+      return 1;
+    } else {
+      out << " found incorrect header " << header
+         << " where we should have found " << hmap[*object] << std::endl;
+      return 0;
+    }
+  }
+};
+
+int main(int argc, char **argv) {
+  vector<const char*> args;
+  argv_to_vec(argc, (const char **)argv, args);
+
+  global_init(args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
+  common_init_finish(g_ceph_context);
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
+
+TEST_F(ObjectMapTest, CreateOneObject) {
+  hobject_t hoid(sobject_t("foo", CEPH_NOSNAP));
+  CollectionIndex::IndexedPath path = CollectionIndex::get_testing_path(
+    "/bar", coll_t("foo_coll"));
+
+  map<string, bufferlist> to_set;
+  string key("test");
+  string val("test_val");
+  bufferptr bp(val.c_str(), val.size());
+  bufferlist bl;
+  bl.append(bp);
+  to_set.insert(make_pair(key, bl));
+  ASSERT_FALSE(db->set_keys(hoid, path, to_set));
+
+  map<string, bufferlist> got;
+  set<string> to_get;
+  to_get.insert(key);
+  to_get.insert("not there");
+  db->get_values(hoid, path, to_get, &got);
+  ASSERT_EQ(got.size(), (unsigned)1);
+  ASSERT_EQ(string(got[key].c_str(), got[key].length()), val);
+
+  bufferlist header;
+  got.clear();
+  db->get(hoid, path, &header, &got);
+  ASSERT_EQ(got.size(), (unsigned)1);
+  ASSERT_EQ(string(got[key].c_str(), got[key].length()), val);
+  ASSERT_EQ(header.length(), (unsigned)0);
+
+  db->rm_keys(hoid, path, to_get);
+  got.clear();
+  db->get(hoid, path, &header, &got);
+  ASSERT_EQ(got.size(), (unsigned)0);
+
+  db->clear(hoid, path);
+  db->get(hoid, path, &header, &got);
+  ASSERT_EQ(got.size(), (unsigned)0);
+}
+
+TEST_F(ObjectMapTest, CloneOneObject) {
+  hobject_t hoid(sobject_t("foo", CEPH_NOSNAP));
+  hobject_t hoid2(sobject_t("foo2", CEPH_NOSNAP));
+  CollectionIndex::IndexedPath path = CollectionIndex::get_testing_path(
+    "/bar", coll_t("foo_coll"));
+
+  set_key(hoid, path, "foo", "bar");
+  set_key(hoid, path, "foo2", "bar2");
+  string result;
+  int r = get_key(hoid, path, "foo", &result);
+  ASSERT_EQ(r, 1);
+  ASSERT_EQ(result, "bar");
+
+  db->clone_keys(hoid, path, hoid2, path);
+  r = get_key(hoid, path, "foo", &result);
+  ASSERT_EQ(r, 1);
+  ASSERT_EQ(result, "bar");
+  r = get_key(hoid2, path, "foo", &result);
+  ASSERT_EQ(r, 1);
+  ASSERT_EQ(result, "bar");
+
+  remove_key(hoid, path, "foo");
+  r = get_key(hoid2, path, "foo", &result);
+  ASSERT_EQ(r, 1);
+  ASSERT_EQ(result, "bar");
+  r = get_key(hoid, path, "foo", &result);
+  ASSERT_EQ(r, 0);
+  r = get_key(hoid, path, "foo2", &result);
+  ASSERT_EQ(r, 1);
+  ASSERT_EQ(result, "bar2");
+
+  set_key(hoid, path, "foo", "baz");
+  remove_key(hoid, path, "foo");
+  r = get_key(hoid, path, "foo", &result);
+  ASSERT_EQ(r, 0);
+
+  set_key(hoid, path, "foo2", "baz");
+  remove_key(hoid, path, "foo2");
+  r = get_key(hoid, path, "foo2", &result);
+  ASSERT_EQ(r, 0);
+
+  map<string, bufferlist> got;
+  bufferlist header;
+
+  got.clear();
+  db->clear(hoid, path);
+  db->get(hoid, path, &header, &got);
+  ASSERT_EQ(got.size(), (unsigned)0);
+
+  got.clear();
+  r = db->clear(hoid2, path);
+  ASSERT_EQ(0, r);
+  db->get(hoid2, path, &header, &got);
+  ASSERT_EQ(got.size(), (unsigned)0);
+
+  set_key(hoid, path, "baz", "bar");
+  got.clear();
+  db->get(hoid, path, &header, &got);
+  ASSERT_EQ(got.size(), (unsigned)1);
+  db->clear(hoid, path);
+  db->clear(hoid2, path);
+}
+
+TEST_F(ObjectMapTest, OddEvenClone) {
+  hobject_t hoid(sobject_t("foo", CEPH_NOSNAP));
+  hobject_t hoid2(sobject_t("foo2", CEPH_NOSNAP));
+  hobject_t hoid_link(sobject_t("foo_link", CEPH_NOSNAP));
+  CollectionIndex::IndexedPath path = CollectionIndex::get_testing_path(
+    "/bar", coll_t("foo_coll"));
+
+
+  for (unsigned i = 0; i < 1000; ++i) {
+    set_key(hoid, path, "foo" + num_str(i), "bar" + num_str(i));
+  }
+
+  db->link_keys(hoid, path, hoid_link, path);
+  db->clone_keys(hoid, path, hoid2, path);
+
+  int r = 0;
+  for (unsigned i = 0; i < 1000; ++i) {
+    string result;
+    r = get_key(hoid, path, "foo" + num_str(i), &result);
+    ASSERT_EQ(1, r);
+    ASSERT_EQ("bar" + num_str(i), result);
+    r = get_key(hoid2, path, "foo" + num_str(i), &result);
+    ASSERT_EQ(1, r);
+    ASSERT_EQ("bar" + num_str(i), result);
+
+    r = get_key(hoid2, path, "foo" + num_str(i), &result);
+    ASSERT_EQ(1, r);
+    ASSERT_EQ("bar" + num_str(i), result);
+    if (i % 2) {
+      remove_key(hoid, path, "foo" + num_str(i));
+    } else {
+      remove_key(hoid2, path, "foo" + num_str(i));
+    }
+  }
+
+  for (unsigned i = 0; i < 1000; ++i) {
+    string result;
+    string result2;
+    string result3;
+    r = get_key(hoid, path, "foo" + num_str(i), &result);
+    int r3 = get_key(hoid_link, path, "foo" + num_str(i), &result3);
+    int r2 = get_key(hoid2, path, "foo" + num_str(i), &result2);
+    if (i % 2) {
+      ASSERT_EQ(0, r);
+      ASSERT_EQ(0, r3);
+      ASSERT_EQ(1, r2);
+      ASSERT_EQ("bar" + num_str(i), result2);
+    } else {
+      ASSERT_EQ(0, r2);
+      ASSERT_EQ(1, r);
+      ASSERT_EQ(1, r3);
+      ASSERT_EQ("bar" + num_str(i), result);
+      ASSERT_EQ("bar" + num_str(i), result3);
+    }
+  }
+
+  db->clear(hoid_link, path);
+
+  {
+    ObjectMap::ObjectMapIterator iter = db->get_iterator(hoid, path);
+    iter->seek_to_first();
+    for (unsigned i = 0; i < 1000; ++i) {
+      if (!(i % 2)) {
+       ASSERT_TRUE(iter->valid());
+       ASSERT_EQ("foo" + num_str(i), iter->key());
+       iter->next();
+      }
+    }
+  }
+
+  {
+    ObjectMap::ObjectMapIterator iter2 = db->get_iterator(hoid2, path);
+    iter2->seek_to_first();
+    for (unsigned i = 0; i < 1000; ++i) {
+      if (i % 2) {
+       ASSERT_TRUE(iter2->valid());
+       ASSERT_EQ("foo" + num_str(i), iter2->key());
+       iter2->next();
+      }
+    }
+  }
+
+  db->clear(hoid, path);
+  db->clear(hoid2, path);
+}
+
+TEST_F(ObjectMapTest, RandomTest) {
+  def_init();
+  for (unsigned i = 0; i < 1000; ++i) {
+    unsigned val = rand();
+    val <<= 8;
+    val %= 100;
+    if (!(i%100))
+      std::cout << "on op " << i
+               << " val is " << val << std::endl;
+
+    if (val < 7) {
+      auto_write_header(std::cerr);
+    } else if (val < 14) {
+      ASSERT_TRUE(auto_verify_header(std::cerr));
+    } else if (val < 30) {
+      auto_set_key(std::cerr);
+    } else if (val < 55) {
+      ASSERT_TRUE(auto_check_present_key(std::cerr));
+    } else if (val < 70) {
+      ASSERT_TRUE(auto_check_absent_key(std::cerr));
+    } else if (val < 76) {
+      auto_delete_object(std::cerr);
+    } else if (val < 85) {
+      auto_clone_key(std::cerr);
+    } else {
+      auto_remove_key(std::cerr);
+    }
+  }
+}
index bdb4e911eded65e57ebbdd06cf6effadf94d5bcb..f8fd5932d0466cc549665e8480c4b05e29319fbd 100644 (file)
@@ -77,6 +77,10 @@ TYPE(MonMap)
 TYPE(MonCap)
 TYPE(MonCaps)
 
+#include "os/DBObjectMap.h"
+TYPE(DBObjectMap::_Header)
+TYPE(DBObjectMap::State)
+
 // --- messages ---
 #include "messages/MAuth.h"
 MESSAGE(MAuth)