]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osbdb: Remove dead code.
authorTommi Virtanen <tommi.virtanen@dreamhost.com>
Thu, 7 Apr 2011 19:24:06 +0000 (12:24 -0700)
committerTommi Virtanen <tommi.virtanen@dreamhost.com>
Thu, 7 Apr 2011 19:24:06 +0000 (12:24 -0700)
Nothing has used the BerkeleyDB-backed object storage in years.

Signed-off-by: Tommi Virtanen <tommi.virtanen@dreamhost.com>
12 files changed:
debian/copyright
src/Makefile.am
src/common/config.h
src/doc/bdb.txt [deleted file]
src/os/BDBMap.h [deleted file]
src/os/FakeStoreBDBCollections.h [deleted file]
src/os/FileStore.h
src/osbdb/OSBDB.cc [deleted file]
src/osbdb/OSBDB.h [deleted file]
src/osd/OSD.cc
src/test/old/testos.cc
src/test/old/testosbdb.cc [deleted file]

index 95d227b5ccbc44f73b87be098153c735c62d2b9a..33a936e351481d89d8cfaa29a08ec13dae733ea2 100644 (file)
@@ -17,10 +17,6 @@ Copyright:
     Copyright (C) 2010 Canonical
 License: GPL2
 
-Files: src/osbdb/OSBDB.h
-Copyright: Copyright (C) 2007 Casey Marshall <csm@soe.ucsc.edu>
-License: LGPL2.1
-
 Files: src/mount/canonicalize.c
 Copyright: Copyright (C) 1993 Rick Sladkey <jrs@world.std.com>
 License: LGPL2 or later
index f123f677bbb725b846a5e1694359ad476622ab20..4b87b3f071b7417a5c027df3ad5b76669a19bbfe 100644 (file)
@@ -1024,15 +1024,12 @@ noinst_HEADERS = \
         msg/tcp.h\
        objclass/objclass.h\
        os/btrfs_ioctl.h\
-        os/BDBMap.h\
         os/Fake.h\
-        os/FakeStoreBDBCollections.h\
         os/FileJournal.h\
         os/FileStore.h\
         os/Journal.h\
         os/JournalingObjectStore.h\
         os/ObjectStore.h\
-        osbdb/OSBDB.h\
         osd/Ager.h\
         osd/OSD.h\
         osd/OSDCaps.h\
index 295b66cb6a44471560396474dbdc9956f25ea0ec..0f51d118b4fde7d7bf53374ff7d9e723d1cf5a0e 100644 (file)
@@ -488,16 +488,6 @@ public:
   int   bdev_fake_mb;
   int   bdev_fake_max_mb;
 
-#ifdef USE_OSBDB
-  bool bdbstore;
-  int debug_bdbstore;
-  bool bdbstore_btree;
-  int bdbstore_ffactor;
-  int bdbstore_nelem;
-  int bdbstore_pagesize;
-  int bdbstore_cachesize;
-  bool bdbstore_transactional;
-#endif // USE_OSBDB
 };
 
 extern md_config_t g_conf;
diff --git a/src/doc/bdb.txt b/src/doc/bdb.txt
deleted file mode 100644 (file)
index 63e647f..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-OBJECT STORE ON BERKELEY DB
----------------------------
-
-OSBDB is an implementation of an object store that uses Berkeley DB as
-the underlying storage. It is meant to be an alternative to EBOFS.
-
-BUILDING
---------
-
-You will need to have Berkeley DB installed, including the developent
-packages. We've tested this with Berkeley DB 4.4.20 on Ubuntu 6.10.
-
-To compile OSBDB support, you need to pass the argument "want_bdb=yes"
-to "make." If you don't specify this, OSBDB and all its associated
-support is not included in the executables.
-
-RUNNING
--------
-
-To use OSBDB in Ceph, simply pass the --bdbstore flag to programs. You
-don't need to create a "device" for OSBDB ahead of time; Berkeley DB
-will take care of creating the files. You also *cannot* use a raw
-device as your store -- it must be regular file.
-
-OSBDB additionally accepts the following flags:
-
-  --bdbstore-btree         Configures OSBDB to use the "Btree"
-                           database type for Berkeley DB. The default
-                           database type is "Hash".
-
-  --bdbstore-hash-ffactor  Sets the "fill factor" for the hash
-                           database type. Takes an integer argument.
-
-  --bdbstore-hash-nelem    Sets the "nelem" parameter for the hash
-                           database type. Takes an integer argument.
-
-  --bdbstore-hash-pagesize Sets the page size for the hash database
-                           type. Takes an integer argument.
-
-  --bdbstore-cachesize     Sets the cache size. Takes an integer
-                           argument, which must be a power of two, and
-                           no less than 20 KiB.
-
-  --bdbstore-transactional Enable (in-memory-only) transactions for
-                           all operations in the OSBDB store.
-
-  --debug-bdbstore         Set the debug level. Takes an integer
-                           argument.
diff --git a/src/os/BDBMap.h b/src/os/BDBMap.h
deleted file mode 100644 (file)
index 434c87f..0000000
+++ /dev/null
@@ -1,138 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef CEPH_BERKELEYDB_H
-#define CEPH_BERKELEYDB_H
-
-#include <db.h>
-#include <list>
-#include <unistd.h>
-
-#include "common/debug.h"
-
-using namespace std;
-
-template<typename K, typename D>
-class BDBMap {
- private:
-  DB *dbp;
-  
- public:
-  BDBMap() : dbp(0) {}
-  ~BDBMap() {
-    close();
-  }
-
-  bool is_open() { return dbp ? true:false; }
-
-  // open/close
-  int open(const char *fn) {
-    //cout << "open " << fn << endl;
-
-    int r;
-    if ((r = db_create(&dbp, NULL, 0)) != 0) {
-      derr << "db_create: " << db_strerror(r) << dendl;
-      assert(0);
-    }
-
-    dbp->set_errfile(dbp, stderr);
-    dbp->set_errpfx(dbp, "bdbmap");
-
-    r = dbp->open(dbp, NULL, fn, NULL, DB_BTREE, DB_CREATE, 0644);
-    if (r != 0) {
-      dbp->err(dbp, r, "%s", fn);
-    }
-    assert(r == 0);
-    return 0;
-  }
-  void close() {
-    if (dbp) {
-      dbp->close(dbp,0);
-      dbp = 0;
-    }
-  }
-  void remove(const char *fn) {
-    if (!dbp) open(fn);
-    if (dbp) {
-      dbp->remove(dbp, fn, 0, 0);
-      dbp = 0;
-    } else {
-      ::unlink(fn);
-    }
-  }
-  
-  // accessors
-  int put(K key,
-          D data) {
-    DBT k;
-    memset(&k, 0, sizeof(k)); 
-    k.data = &key;
-    k.size = sizeof(K);
-    DBT d;
-    memset(&d, 0, sizeof(d));
-    d.data = &data;
-    d.size = sizeof(data);
-    return dbp->put(dbp, NULL, &k, &d, 0);
-  }
-
-  int get(K key,
-          D& data) {
-    DBT k;
-    memset(&k, 0, sizeof(k)); 
-    k.data = &key;
-    k.size = sizeof(key);
-    DBT d;
-    memset(&d, 0, sizeof(d));
-    d.data = &data;
-    d.size = sizeof(data);
-    int r = dbp->get(dbp, NULL, &k, &d, 0);
-    return r;
-  }
-
-  int del(K key) {
-    DBT k;
-    memset(&k, 0, sizeof(k)); 
-    k.data = &key;
-    k.size = sizeof(key);
-    return dbp->del(dbp, NULL, &k, 0);
-  }
-
-  int list_keys(list<K>& ls) {
-    DBC *cursor = 0;
-    int r = dbp->cursor(dbp, NULL, &cursor, 0);
-    assert(r == 0);
-
-    DBT k,d;
-    memset(&k, 0, sizeof(k));
-    memset(&d, 0, sizeof(d));
-
-    while ((r = cursor->c_get(cursor, &k, &d, DB_NEXT)) == 0) {
-      K key;
-      assert(k.size == sizeof(key));
-      memcpy(&key, k.data, k.size);
-      ls.push_back(key);
-    }
-    if (r != DB_NOTFOUND) {
-      dbp->err(dbp, r, "DBcursor->get");
-      assert(r == DB_NOTFOUND);
-    }
-
-    cursor->c_close(cursor);
-    return 0;
-  }
-  
-};
-
-#endif
diff --git a/src/os/FakeStoreBDBCollections.h b/src/os/FakeStoreBDBCollections.h
deleted file mode 100644 (file)
index 34b7e43..0000000
+++ /dev/null
@@ -1,167 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef CEPH_FAKESTOREBDBCOLLECTIONS_H
-#define CEPH_FAKESTOREBDBCOLLECTIONS_H
-
-#include "BDBMap.h"
-#include "ObjectStore.h"
-#include "common/Mutex.h"
-
-#define BDBHASH_DIRS       128LL
-#define BDBHASH_FUNC(x)    (((x) ^ ((x)>>30) ^ ((x)>>18) ^ ((x)>>45) ^ 0xdead1234) * 884811 % BDBHASH_DIRS)
-
-class FakeStoreBDBCollections {
- private:
-  int whoami;
-  string basedir;
-
-  Mutex bdblock;
-
-  // collection dbs
-  BDBMap<coll_t, int>                 collections;
-  map<coll_t, BDBMap<object_t, int>*> collection_map;
-  
-  // dirs
-  void get_dir(string& dir) {
-    dir = basedir + "/" + string(whoami);
-  }
-  void get_collfn(coll_t c, string &fn) {
-    char s[100];
-    snprintf(s, sizeof(s), "%d/%02llx/%016llx.co", whoami, BDBHASH_FUNC(c), c);
-    fn = basedir + "/" + s;
-  }
-
-  void open_collections() {
-    string cfn;
-    get_dir(cfn);
-    cfn += "/collections";
-    collections.open(cfn.c_str());  
-    list<coll_t> ls;
-    collections.list_keys(ls);
-  }
-  void close_collections() {
-    if (collections.is_open())
-      collections.close();
-    
-    for (map<coll_t, BDBMap<object_t, int>*>::iterator it = collection_map.begin();
-         it != collection_map.end();
-         it++) {
-      it->second->close();
-    }
-    collection_map.clear();
-  }
-  
-  int open_collection(coll_t c) {
-    if (collection_map.count(c))
-      return 0;  // already open.
-    
-    string fn;
-    get_collfn(c,fn);
-    collection_map[c] = new BDBMap<coll_t,int>;
-    int r = collection_map[c]->open(fn.c_str());
-    if (r != 0)
-      collection_map.erase(c);  // failed
-    return r;
-  }
-  
- public:
-  FakeStoreBDBCollections(int w, string& bd) : whoami(w), basedir(bd) {}
-  ~FakeStoreBDBCollections() {
-    close_collections();
-  }
-
-  int list_collections(list<coll_t>& ls) {
-    bdblock.Lock();
-    if (!collections.is_open()) open_collections();
-    
-    ls.clear();
-    collections.list_keys(ls);
-    bdblock.Unlock();
-    return 0;
-  }
-  int create_collection(coll_t c) {
-    bdblock.Lock();
-    if (!collections.is_open()) open_collections();
-    
-    collections.put(c, 1);
-    open_collection(c);
-    bdblock.Unlock();
-    return 0;
-  }
-  int destroy_collection(coll_t c) {
-    bdblock.Lock();
-    if (!collections.is_open()) open_collections();
-    
-    collections.del(c);
-    
-    open_collection(c);
-    collection_map[c]->close();
-    
-    string fn;
-    get_collfn(c,fn);
-    collection_map[c]->remove(fn.c_str());
-    delete collection_map[c];
-    collection_map.erase(c);
-    bdblock.Unlock();
-    return 0;
-  }
-  int collection_stat(coll_t c, struct stat *st) {
-    bdblock.Lock();
-    if (!collections.is_open()) open_collections();
-    
-    string fn;
-    get_collfn(c,fn);
-    int r = ::stat(fn.c_str(), st);
-    bdblock.Unlock();
-    return r;
-  }
-  bool collection_exists(coll_t c) {
-    bdblock.Lock();
-    struct stat st;
-    int r = collection_stat(c, &st) == 0;
-    bdblock.Unlock();
-    return r;
-  }
-  int collection_add(coll_t c, object_t o) {
-    bdblock.Lock();
-    if (!collections.is_open()) open_collections();
-    
-    open_collection(c);
-    collection_map[c]->put(o,1);
-    bdblock.Unlock();
-    return 0;
-  }
-  int collection_remove(coll_t c, object_t o) {
-    bdblock.Lock();
-    if (!collections.is_open()) open_collections();
-    
-    open_collection(c);
-    collection_map[c]->del(o);
-    bdblock.Unlock();
-    return 0;
-  }
-  int collection_list(coll_t c, list<object_t>& o) {
-    bdblock.Lock();
-    if (!collections.is_open()) open_collections();
-    
-    open_collection(c);
-    collection_map[c]->list_keys(o);
-    bdblock.Unlock();
-    return 0;
-  }
-};
-
-#endif
index bfe1bc35975e5270abc7df566ef59e7712d803a8..a052b1a03452d0be1bea43cd82f6d65e6e509ab5 100644 (file)
@@ -25,7 +25,6 @@
 #include "common/Mutex.h"
 
 #include "Fake.h"
-//#include "FakeStoreBDBCollections.h"
 
 #include <map>
 #include <deque>
diff --git a/src/osbdb/OSBDB.cc b/src/osbdb/OSBDB.cc
deleted file mode 100644 (file)
index 2adc2d5..0000000
+++ /dev/null
@@ -1,2169 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/* OSBDB.cc -- ObjectStore on top of Berkeley DB.
-   Copyright (C) 2007 Casey Marshall <csm@soe.ucsc.edu>
-
-Ceph - scalable distributed file system
-
-This is free software; you can redistribute it and/or
-modify it under the terms of the GNU Lesser General Public
-License version 2.1, as published by the Free Software 
-Foundation.  See file COPYING. */
-
-
-#include <map>
-#include <string>
-#include <cerrno>
-#include "OSBDB.h"
-#include "common/Timer.h"
-
-using namespace std;
-
-#define dout_prefix *_dout << "bdbstore(" << device << ")@" << __LINE__ << "."
-#define derr(x) dout(x)
-
-#define CLEANUP(onsafe) do { \
-    dout(6) << "DELETE " << hex << onsafe << dec << dendl; \
-    delete onsafe; \
-  } while (0)
-#define COMMIT(onsafe) do { \
-    dout(6) << "COMMIT " << hex << onsafe << dec << dendl; \
-    sync(onsafe); \
-  } while (0)
-
-\f // Have a lock, already.
-
-class scoped_lock
-{
-private:
-  Mutex *m;
-public:
-  scoped_lock(Mutex *m) : m(m) { m->Lock(); }
-  ~scoped_lock() { m->Unlock(); }
-};
-
-\f // Utilities.
-
-// Starting off with my own bsearch; mail reader to follow...
-
-// Perform a binary search on a sorted array, returning the insertion
-// point for key, or key if it is exactly found. In other words, this
-// will return a pointer to the element that will come after key if
-// key were to be inserted into the sorted array.
-//
-// Requires that T have < and > operators defined.
-template<typename T>
-uint32_t binary_search (T *array, size_t size, T key)
-{
-  int low = 0;
-  int high = size;
-  int p = (low + high) / 2;
-
-  while (low < high - 1)
-    {
-      if (array[p] > key)
-        {
-          high = p;
-        }
-      else if (array[p] < key)
-        {
-          low = p;
-        }
-      else
-        return p;
-
-      p = (low + high) / 2;
-    }
-
-  if (array[p] < key)
-    p++;
-  else if (array[p] > key && p > 0)
-    p--;
-  return p;
-} 
-
-\f // Management.
-
-DbEnv *OSBDB::getenv ()
-{
-  DbEnv *envp = new DbEnv (DB_CXX_NO_EXCEPTIONS);
-  if (g_conf.debug > 1 || g_conf.debug_bdbstore > 1)
-    envp->set_error_stream (&std::cerr);
-  if (g_conf.debug > 2 || g_conf.debug_bdbstore > 2)
-    envp->set_message_stream (&std::cout);
-  envp->set_flags (DB_LOG_INMEMORY, 1);
-  //env->set_flags (DB_DIRECT_DB, 1);
-  int env_flags = (DB_CREATE
-                   | DB_THREAD
-                   //| DB_INIT_LOCK
-                   | DB_INIT_MPOOL
-                   //| DB_INIT_TXN
-                   //| DB_INIT_LOG
-                   | DB_PRIVATE);
-  if (envp->open (NULL, env_flags, 0) != 0)
-    {
-      std::cerr << "failed to open environment " << std::endl;
-      assert(0);
-    }
-  return envp;
-}
-
-int OSBDB::opendb(DBTYPE type, int flags, bool new_env)
-{
-  env = getenv();
-  db = new Db(env, 0);
-  db->set_error_stream (&std::cerr);
-  db->set_message_stream (&std::cout);
-  db->set_flags (0);
-  if (!g_conf.bdbstore_btree)
-    {
-      if (g_conf.bdbstore_pagesize > 0)
-        db->set_pagesize (g_conf.bdbstore_pagesize);
-      if (g_conf.bdbstore_ffactor > 0 && g_conf.bdbstore_nelem > 0)
-        {
-          db->set_h_ffactor (g_conf.bdbstore_ffactor);
-          db->set_h_nelem (g_conf.bdbstore_nelem);
-        }
-    }
-  if (g_conf.bdbstore_cachesize > 0)
-    {
-      db->set_cachesize (0, g_conf.bdbstore_cachesize, 0);
-    }
-
-  flags = flags | DB_THREAD;
-  if (transactional)
-    flags = flags | DB_AUTO_COMMIT;
-
-  int ret;
-  if ((ret = db->open (NULL, device.c_str(), NULL, type, flags, 0)) != 0)
-    {
-      derr(1) << "failed to open database: " << device << ": "
-              << db_strerror(ret) << std::dendl;
-      return -EINVAL;
-    }
-  opened = true;
-  return 0;
-}
-
-int OSBDB::mount()
-{
-  dout(2) << "mount " << device << dendl;
-
-  if (mounted)
-    {
-      dout(4) << "..already mounted" << dendl;
-      return 0;
-    }
-
-  if (!opened)
-    {
-      int ret;
-      if ((ret = opendb ()) != 0)
-        {
-          dout(4) << "..returns " << ret << dendl;
-          return ret;
-        }
-    }
-
-  // XXX Do we want anything else in the superblock?
-
-  Dbt key (OSBDB_SUPERBLOCK_KEY, 1);
-  stored_superblock super;
-  Dbt value (&super, sizeof (super));
-  value.set_dlen (sizeof (super));
-  value.set_ulen (sizeof (super));
-  value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
-
-  if (db->get (NULL, &key, &value, 0) != 0)
-    {
-      dout(4) << "..get superblock fails" << dendl;
-      return -EINVAL; // XXX how to say "badly formed fs?"
-    }
-
-  dout(3) << ".mount " << super << dendl;
-
-  if (super.version != OSBDB_THIS_VERSION)
-    {
-      dout(4) << "version mismatch (" << super.version << ")" << dendl;
-      return -EINVAL;
-    }
-
-  DBTYPE t;
-  db->get_type (&t);
-
-  if (t == DB_BTREE)
-    {
-      u_int32_t minkey;
-      u_int32_t flags;
-      db->get_bt_minkey (&minkey);
-      db->get_flags (&flags);
-      dout(1) << "mounted version " << OSBDB_THIS_VERSION << "; Btree; "
-              << "min keys per page: " << minkey << "; flags: "
-              << hex << flags << dec << dendl;
-      cout << dec;
-    }
-  else
-    {
-      u_int32_t ffactor;
-      u_int32_t nelem;
-      u_int32_t flags;
-      db->get_h_ffactor (&ffactor);
-      db->get_h_nelem (&nelem);
-      db->get_flags (&flags);
-      dout(1) << "mounted version " << OSBDB_THIS_VERSION << "; Hash; "
-              << "fill factor: " << ffactor
-              << " table size: " << nelem << "; flags: "
-              << hex << flags << dec << dendl;
-      cout << dec;
-    }
-
-  mounted = true;
-  dout(4) << "..mounted" << dendl;
-  return 0;
-}
-
-int OSBDB::umount()
-{
-  if (!mounted)
-    return -EINVAL;
-
-  dout(2) << "umount" << dendl;
-
-  int ret;
-  if (opened)
-    {
-      if (transactional)
-        {
-          env->log_flush (NULL);
-          if ((ret = env->lsn_reset (device.c_str(), 0)) != 0)
-            {
-              derr(1) << "lsn_reset: " << db_strerror (ret) << dendl;
-            }
-        }
-
-      db->sync (0);
-
-      if ((ret = db->close (0)) != 0)
-        {
-          derr(1) << "close: " << db_strerror(ret) << dendl;
-          return -EINVAL;
-        }
-      delete db;
-      db = NULL;
-
-      if (env)
-        {
-          env->close (0);
-          delete env;
-          env = NULL;
-        }
-    }
-  mounted = false;
-  opened = false;
-  dout(4) << "..unmounted" << dendl;
-  return 0;
-}
-
-int OSBDB::mkfs()
-{
-  if (mounted)
-    return -EINVAL;
-
-  dout(2) << "mkfs" << dendl;
-
-  string d = env_dir;
-  d += device;
-  unlink (d.c_str());
-
-  int ret;
-  if ((ret = opendb((g_conf.bdbstore_btree ? DB_BTREE : DB_HASH),
-                    DB_CREATE, true)) != 0)
-    {
-      derr(1) << "failed to open database: " << device << ": "
-              << db_strerror(ret) << std::dendl;
-      return -EINVAL;
-    }
-  opened = true;
-  dout(3) << "..opened " << device << dendl;
-
-  uint32_t c;
-  ret = db->truncate (NULL, &c, 0);
-  if (ret != 0)
-    {
-      derr(1) << "db truncate failed: " << db_strerror (ret) << dendl;
-      return -EIO; // ???
-    }
-
-  Dbt key (OSBDB_SUPERBLOCK_KEY, 1);
-  struct stored_superblock sb;
-  sb.version = OSBDB_THIS_VERSION;
-  Dbt value (&sb, sizeof (sb));
-
-  dout(3) << "..writing superblock" << dendl;
-  if ((ret = db->put (NULL, &key, &value, 0)) != 0)
-    {
-      derr(1) << "failed to write superblock: " << db_strerror (ret)
-              << dendl;
-      return -EIO;
-    }
-  dout(3) << "..wrote superblock" << dendl;
-  dout(4) << "..mkfs done" << dendl;
-  return 0;
-}
-
-\f // Objects.
-
-int OSBDB::pick_object_revision_lt(object_t& oid)
-{
-  // Not really needed.
-  dout(0) << "pick_object_revision_lt " << oid << dendl;
-  return -ENOSYS;
-}
-
-bool OSBDB::exists(object_t oid)
-{
-  dout(2) << "exists " << oid << dendl;
-  struct stat st;
-  bool ret = (stat (oid, &st) == 0);
-  dout(4) << "..returns " << ret << dendl;
-  return ret;
-}
-
-int OSBDB::statfs (struct statfs *st)
-{
-  // Hacky?
-  if (::statfs (device.c_str(), st) != 0)
-    {
-      int ret = -errno;
-      derr(1) << "statfs returns " << ret << dendl;
-      return ret;
-    }
-  st->f_type = OSBDB_MAGIC;
-  dout(4) << "..statfs OK" << dendl;
-  return 0;
-}
-
-int OSBDB::stat(object_t oid, struct stat *st)
-{
-  if (!mounted)
-    {
-      dout(4) << "not mounted!" << dendl;
-      return -EINVAL;
-    }
-
-  dout(2) << "stat " << oid << dendl;
-
-  object_inode_key ikey = new_object_inode_key(oid);
-  stored_object obj;
-  Dbt key (&ikey, sizeof_object_inode_key());
-  Dbt value (&obj, sizeof (obj));
-  value.set_flags (DB_DBT_USERMEM);
-  value.set_ulen (sizeof (obj));
-
-  dout(3) << "  lookup " << ikey << dendl;
-  int ret;
-  if ((ret = db->get (NULL, &key, &value, 0)) != 0)
-    {
-      derr(1) << " get returned " << ret << dendl;
-      return -ENOENT;
-    }
-
-  st->st_size = obj.length;
-  dout(3) << "stat length:" << obj.length << dendl;
-  dout(4) << "..stat OK" << dendl;
-  return 0;
-}
-
-int OSBDB::remove(object_t oid, Context *onsafe)
-{
-  if (!mounted)
-    {
-      derr(1) << "not mounted!" << dendl;
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EINVAL;
-    }
-
-  dout(6) << "Context " << hex << onsafe << dec << dendl;
-  scoped_lock __lock(&lock);
-  dout(2) << "remove " << oid << dendl;
-
-  DbTxn *txn = NULL;
-  if (transactional)
-    env->txn_begin (NULL, &txn, 0);
-
-  oid_t id;
-  mkoid(id, oid);
-  Dbt key (&id, sizeof (oid_t));
-  int ret;
-  if ((ret = db->del (txn, &key, 0)) != 0)
-    {
-      derr(1) << ".del returned error: " << db_strerror (ret) << dendl;
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EIO;
-    }
-
-  object_inode_key _ikey = new_object_inode_key (oid);
-  Dbt ikey (&_ikey, sizeof_object_inode_key());
-  if ((ret = db->del (txn, &ikey, 0)) != 0)
-    {
-      derr(1) << ".del returned error: " << db_strerror (ret) << dendl;
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EIO;
-    }
-
-  attrs_id aids = new_attrs_id (oid);
-  Dbt askey (&aids, sizeof_attrs_id());
-  Dbt asval;
-  asval.set_flags (DB_DBT_MALLOC);
-  if (db->get (txn, &askey, &asval, 0) == 0)
-    {
-      // We have attributes; remove them.
-      stored_attrs *sap = (stored_attrs *) asval.get_data();
-      auto_ptr<stored_attrs> sa (sap);
-      for (unsigned i = 0; i < sap->count; i++)
-        {
-          attr_id aid = new_attr_id (oid, sap->names[i].name);
-          Dbt akey (&aid, sizeof (aid));
-          if ((ret = db->del (txn, &akey, 0)) != 0)
-            {
-              derr(1) << ".del returns error: " << db_strerror (ret) << dendl;
-              if (txn != NULL)
-                txn->abort();
-              if (onsafe != NULL)
-                CLEANUP(onsafe);
-              return -EIO;
-            }
-        }
-      if ((ret = db->del (txn, &askey, 0)) != 0)
-        {
-          derr(1) << ".del returns error: " << db_strerror (ret) << dendl;
-          if (txn != NULL)
-            txn->abort();
-          if (onsafe != NULL)
-            CLEANUP(onsafe);
-          return -EIO;
-        }
-    }
-
-  // XXX check del return value
-
-  if (txn != NULL)
-    txn->commit (0);
-  if (onsafe != NULL)
-    COMMIT(onsafe);
-  dout(4) << "..remove OK" << dendl;
-  return 0;
-}
-
-int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
-{
-  dout(6) << "Context " << hex << onsafe << dec << dendl;
-
-  if (!mounted)
-    {
-      derr(1) << "not mounted!" << dendl;
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EINVAL;
-    }
-
-  scoped_lock __lock(&lock);
-  dout(2) << "truncate " << size << dendl;
-
-  if (size > 0xFFFFFFFF)
-    {
-      derr(1) << "object size too big!" << dendl;
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -ENOSPC;
-    }
-
-  DbTxn *txn = NULL;
-
-  if (transactional)
-    env->txn_begin (NULL, &txn, 0);
-
-  object_inode_key ikey = new_object_inode_key (oid);
-  stored_object obj;
-  Dbt key (&ikey, sizeof_object_inode_key());
-  Dbt value (&obj, sizeof (obj));
-  value.set_dlen (sizeof (obj));
-  value.set_ulen (sizeof (obj));
-  value.set_flags (DB_DBT_USERMEM);
-
-  if (db->get (txn, &key, &value, 0) != 0)
-    {
-      if (txn)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      dout(4) << "..returns -ENOENT" << dendl;
-      return -ENOENT;
-    }
-
-  if (obj.length < size)
-    {
-      oid_t id;
-      mkoid (id, oid);
-      Dbt okey (&id, sizeof (oid_t));
-      char b[] = { '\0' };
-      Dbt newVal (b, 1);
-      newVal.set_doff ((size_t) size);
-      newVal.set_dlen (1);
-      newVal.set_ulen (1);
-      newVal.set_flags (DB_DBT_PARTIAL);
-      if (db->put (txn, &okey, &newVal, 0) != 0)
-        {
-          if (txn)
-            txn->abort();
-          if (onsafe != NULL)
-            CLEANUP(onsafe);
-          derr(1) << ".updating object failed" << dendl;
-          return -EIO;
-        }
-
-      obj.length = size;
-      value.set_ulen (sizeof (obj));
-      if (db->put (txn, &key, &value, 0) != 0)
-        {
-          if (txn)
-            txn->abort();
-          if (onsafe != NULL)
-            CLEANUP(onsafe);
-          derr(1) << ".updating object info failed" << dendl;
-          return -EIO;
-        }
-    }
-  else if (obj.length > size)
-    {
-      obj.length = size;
-      Dbt tval (&obj, sizeof (obj));
-      tval.set_ulen (sizeof (obj));
-      tval.set_flags (DB_DBT_USERMEM);
-      if (db->put (txn, &key, &tval, 0) != 0)
-        {
-          if (txn)
-            txn->abort();
-          if (onsafe != NULL)
-            CLEANUP(onsafe);
-          derr(1) << ".updating object info failed" << dendl;
-          return -EIO;
-        }
-      if (size == 0)
-        {
-          char x[1];
-          oid_t id;
-          mkoid (id, oid);
-          Dbt okey (&id, sizeof (oid_t));
-          Dbt oval (&x, 0);
-          if (db->put (txn, &okey, &oval, 0) != 0)
-            {
-              if (txn)
-                txn->abort();
-              if (onsafe != NULL)
-                CLEANUP(onsafe);
-              derr(1) << ".updating object failed" << dendl;
-              return -EIO;
-            }
-        }
-      else
-        {
-          oid_t id;
-          mkoid (id, oid);
-          Dbt okey (&id, sizeof (oid_t));
-          Dbt oval;
-          oval.set_flags (DB_DBT_MALLOC);
-          if (db->get (txn, &okey, &oval, 0) != 0)
-            {
-              if (txn)
-                txn->abort();
-              if (onsafe != NULL)
-                CLEANUP(onsafe);
-              derr(1) << ".getting old object failed" << dendl;
-              return -EIO;
-            }
-          auto_ptr<char> ovalPtr ((char *) oval.get_data());
-          oval.set_size ((size_t) size);
-          oval.set_ulen ((size_t) size);
-          if (db->put (txn, &okey, &oval, 0) != 0)
-            {
-              if (txn)
-                txn->abort();
-              if (onsafe != NULL)
-                CLEANUP(onsafe);
-              derr(1) << ".putting new object failed" << dendl;
-              return -EIO;
-            }
-        }
-    }
-
-  if (txn)
-    txn->commit (0);
-  if (onsafe != NULL)
-    COMMIT(onsafe);
-
-  dout(4) << "..truncate OK" << dendl;
-  return 0;
-}
-
-int OSBDB::read(object_t oid, off_t offset, size_t len, bufferlist& bl)
-{
-  if (!mounted)
-    {
-      derr(1) << "not mounted!" << dendl;
-      return -EINVAL;
-    }
-
-  dout(2) << "read " << oid << " " << offset << " "
-          << len << dendl;
-
-  if (bl.length() < len)
-    {
-      int remain = len - bl.length();
-      bufferptr ptr (remain);
-      bl.push_back(ptr);
-    }
-
-  DbTxn *txn = NULL;
-  if (transactional)
-    env->txn_begin (NULL, &txn, 0);
-
-  object_inode_key _ikey = new_object_inode_key (oid);
-  stored_object obj;
-  Dbt ikey (&_ikey, sizeof_object_inode_key());
-  Dbt ival (&obj, sizeof (obj));
-  ival.set_flags (DB_DBT_USERMEM);
-  ival.set_ulen (sizeof(obj));
-
-  dout(3) << "..get " << _ikey << dendl;
-  int ret;
-  if ((ret = db->get (txn, &ikey, &ival, 0)) != 0)
-    {
-      if (txn)
-        txn->abort();
-      derr(1) << "get returned " << db_strerror (ret) << dendl;
-      return -ENOENT;
-    }
-
-  dout(3) << "..object has size " << obj.length << dendl;
-
-  if (offset == 0 && len >= obj.length)
-    {
-      len = obj.length;
-      dout(3) << "..doing full read of " << len << dendl;
-      oid_t id;
-      mkoid (id, oid);
-      Dbt key (&id, sizeof (oid_t));
-      Dbt value (bl.c_str(), len);
-      value.set_ulen (len);
-      value.set_flags (DB_DBT_USERMEM);
-      dout(3) << "..getting " << oid << dendl;
-      if ((ret = db->get (txn, &key, &value, 0)) != 0)
-        {
-          derr(1) << ".get returned " << db_strerror (ret) << dendl;
-          if (txn)
-            txn->abort();
-          return -EIO;
-        }
-    }
-  else
-    {
-      if (offset > obj.length)
-        {
-          dout(2) << "..offset out of range" << dendl;
-          return 0;
-        }
-      if (offset + len > obj.length)
-        len = obj.length - (size_t) offset;
-      dout(3) << "..doing partial read of " << len << dendl;
-      oid_t id;
-      mkoid (id, oid);
-      Dbt key (&id, sizeof (oid));
-      Dbt value;
-      char *data = bl.c_str();
-      dout(3) << ".bufferlist c_str returned " << ((void*) data) << dendl;
-      value.set_data (data);
-      value.set_doff ((size_t) offset);
-      value.set_dlen (len);
-      value.set_ulen (len);
-      value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
-      dout(3) << "..getting " << oid << dendl;
-      if ((ret = db->get (txn, &key, &value, 0)) != 0)
-        {
-          derr(1) << ".get returned " << db_strerror (ret) << dendl;
-          if (txn)
-            txn->abort();
-          return -EIO;
-        }
-    }
-
-  if (txn)
-    txn->commit (0);
-  dout(4) << "..read OK, returning " << len << dendl;
-  return len;
-}
-
-int OSBDB::write(object_t oid, off_t offset, size_t len,
-                 bufferlist& bl, Context *onsafe)
-{
-  dout(6) << "Context " << hex << onsafe << dec << dendl;
-  if (!mounted)
-    {
-      derr(1) << "not mounted!" << dendl;
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EINVAL;
-    }
-
-  scoped_lock __lock(&lock);
-  dout(2) << "write " << oid << " " << offset << " "
-          << len << dendl;
-
-  if (offset > 0xFFFFFFFFL || offset + len > 0xFFFFFFFFL)
-    {
-      derr(1) << "object too big" << dendl;
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -ENOSPC;
-    }
-
-  DbTxn *txn = NULL;
-  if (transactional)
-    env->txn_begin (txn, &txn, 0);
-
-  object_inode_key _ikey = new_object_inode_key (oid);
-  stored_object obj;
-  Dbt ikey (&_ikey, sizeof_object_inode_key());
-  Dbt ival (&obj, sizeof (obj));
-  ival.set_ulen (sizeof (obj));
-  ival.set_flags (DB_DBT_USERMEM);
-
-  int ret;
-  dout(3) << "..getting " << _ikey << dendl;
-  if (db->get (txn, &ikey, &ival, 0) != 0)
-    {
-      dout(3) << "..writing new object" << dendl;
-
-      // New object.
-      obj.length = (size_t) offset + len;
-      dout(3) << "..mapping " << _ikey << " => "
-              << obj << dendl;
-      if ((ret = db->put (txn, &ikey, &ival, 0)) != 0)
-        {
-          derr(1) << "..put returned " << db_strerror (ret) << dendl;
-          if (txn)
-            txn->abort();
-          if (onsafe != NULL)
-            CLEANUP(onsafe);
-          return -EIO;
-        }
-
-      oid_t id;
-      mkoid (id, oid);
-      Dbt key (&id, sizeof (oid_t));
-      Dbt value (bl.c_str(), len);
-      if (offset == 0) // whole object
-        {
-          value.set_flags (DB_DBT_USERMEM);
-          value.set_ulen (len);
-        }
-      else
-        {
-          value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
-          value.set_ulen (len);
-          value.set_doff ((size_t) offset);
-          value.set_dlen (len);
-        }
-      dout(3) << "..mapping " << oid << " => ("
-              << obj.length << " bytes)" << dendl;
-      if ((ret = db->put (txn, &key, &value, 0)) != 0)
-        {
-          derr(1) << "..put returned " << db_strerror (ret) << dendl;
-          if (txn)
-            txn->abort();
-          if (onsafe != NULL)
-            CLEANUP(onsafe);
-          return -EIO;
-        }
-
-      if (txn != NULL)
-        txn->commit (0);
-      if (onsafe != NULL)
-        COMMIT(onsafe);
-
-      dout(4) << "..write OK, returning " << len << dendl;
-      return len;
-    }
-
-  if (offset == 0 && len >= obj.length)
-    {
-      if (len != obj.length)
-        {
-          obj.length = len;
-          if ((ret = db->put (txn, &ikey, &ival, 0)) != 0)
-            {
-              derr(1) << "  put returned " << db_strerror (ret) << dendl;
-              if (txn)
-                txn->abort();
-              if (onsafe != NULL)
-                CLEANUP(onsafe);
-              return -EIO;
-            }
-        }
-      oid_t id;
-      mkoid(id, oid);
-      Dbt key (&id, sizeof (oid_t));
-      Dbt value (bl.c_str(), len);
-      if (db->put (txn, &key, &value, 0) != 0)
-        {
-          if (txn)
-            txn->abort();
-          if (onsafe != NULL)
-            CLEANUP(onsafe);
-          derr(1) << "..writing object failed!" << dendl;
-          return -EIO;
-        }
-    }
-  else
-    {
-      if (offset + len > obj.length)
-        {
-          obj.length = (size_t) offset + len;
-          if (db->put (txn, &ikey, &ival, 0) != 0)
-            {
-              if (txn)
-                txn->abort();
-              if (onsafe != NULL)
-                CLEANUP(onsafe);
-              derr(1) << "..writing object info failed!" << dendl;
-              return -EIO;
-            }
-        }
-      oid_t id;
-      mkoid(id, oid);
-      Dbt key (&id, sizeof (oid_t));
-      Dbt value (bl.c_str(), len);
-      value.set_doff ((size_t) offset);
-      value.set_dlen (len);
-      value.set_ulen (len);
-      value.set_flags (DB_DBT_PARTIAL);
-      if (db->put (txn, &key, &value, 0) != 0)
-        {
-          if (txn)
-            txn->abort();
-          if (onsafe != NULL)
-            CLEANUP(onsafe);
-          derr(1) << "..writing object failed!" << dendl;
-          return -EIO;
-        }
-    }
-
-  if (txn != NULL)
-    txn->commit (0);
-  if (onsafe != NULL)
-    COMMIT(onsafe);
-
-  dout(4) << "..write OK, returning " << len << dendl;
-  return len;
-}
-
-int OSBDB::clone(object_t oid, object_t noid)
-{
-  if (!mounted)
-    {
-      derr(1) << "not mounted!" << dendl;
-      return -EINVAL;
-    }
-
-  dout(2) << "clone " << oid << ", " << noid << dendl;
-
-  if (exists (noid))
-    {
-      dout(4) << "..target exists; returning -EEXIST" << dendl;
-      return -EEXIST;
-    }
-
-  DbTxn *txn = NULL;
-  if (transactional)
-    env->txn_begin (NULL, &txn, 0);
-
-  object_inode_key _ikey = new_object_inode_key (oid);
-  object_inode_key _nikey = new_object_inode_key (noid);
-  stored_object obj;
-  Dbt ikey (&_ikey, sizeof_object_inode_key());
-  Dbt ival (&obj, sizeof (obj));
-  Dbt nikey (&_nikey, sizeof_object_inode_key());
-  ival.set_ulen (sizeof (obj));
-  ival.set_flags (DB_DBT_USERMEM);
-
-  oid_t id, nid;
-  mkoid(id, oid);
-  mkoid(nid, noid);
-  Dbt key (&id, sizeof (oid_t));
-  Dbt nkey (&oid, sizeof (oid_t));
-  Dbt value;
-  value.set_flags (DB_DBT_MALLOC);
-
-  if (db->get (txn, &ikey, &ival, 0) != 0)
-    {
-      if (txn)
-        txn->abort();
-      derr(1) << "..getting object info failed!" << dendl;
-      return -ENOENT;
-    }
-  if (db->get (txn, &key, &value, 0) != 0)
-    {
-      if (txn)
-        txn->abort();
-      derr(1) << "..getting original object failed" << dendl;
-      return -ENOENT;
-    }
-  auto_ptr<char> valueptr ((char *) value.get_data());
-
-  if (db->put (txn, &nikey, &ival, 0) != 0)
-    {
-      if (txn)
-        txn->abort();
-      derr(1) << "..putting object info failed" << dendl;
-      return -EIO;
-    }
-  if (db->put (txn, &nkey, &value, 0) != 0)
-    {
-      if (txn)
-        txn->abort();
-      derr(1) << "..putting new object failed" << dendl;
-      return -EIO;
-    }
-
-  if (txn)
-    txn->commit (0);
-
-  dout(4) << "..clone OK" << dendl;
-  return 0;
-}
-
-\f // Collections
-
-int OSBDB::list_collections(list<coll_t>& ls)
-{
-  if (!mounted)
-    {
-      derr(1) << "not mounted!" << dendl;
-      return -EINVAL;
-    }
-
-  dout(2) << "list_collections" << dendl;
-
-  Dbt key (COLLECTIONS_KEY, 1);
-  Dbt value;
-  value.set_flags (DB_DBT_MALLOC);
-
-  if (db->get (NULL, &key, &value, 0) != 0)
-    {
-      dout(4) << "..no collections" << dendl;
-      return 0; // no collections.
-    }
-
-  auto_ptr<stored_colls> sc ((stored_colls *) value.get_data());
-  stored_colls *scp = sc.get();
-  for (uint32_t i = 0; i < sc->count; i++)
-    ls.push_back (scp->colls[i]);
-
-  dout(4) << "..list_collections returns " << scp->count << dendl;
-  return scp->count;
-}
-
-int OSBDB::create_collection(coll_t c, Context *onsafe)
-{
-  dout(6) << "Context " << hex << onsafe << dec << dendl;
-  if (!mounted)
-    {
-      derr(1) << "not mounted" << dendl;
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EINVAL;
-    }
-
-  scoped_lock __lock(&lock);
-  dout(2) << "create_collection " << hex << c << dec << dendl;
-
-  Dbt key (COLLECTIONS_KEY, 1);
-  Dbt value;
-  value.set_flags (DB_DBT_MALLOC);
-
-  DbTxn *txn = NULL;
-  if (transactional)
-    env->txn_begin (NULL, &txn, 0);
-
-  stored_colls *scp = NULL;
-  size_t sz = 0;
-  bool created = false;
-  if (db->get (txn, &key, &value, 0) != 0)
-    {
-      sz = sizeof (stored_colls) + sizeof (coll_t);
-      scp = (stored_colls *) malloc (sz);
-      scp->count = 0;
-      created = true;
-    }
-  else
-    {
-      scp = (stored_colls *) value.get_data();
-      sz = value.get_size();
-    }
-
-  auto_ptr<stored_colls> sc (scp);
-  int ins = 0;
-  if (scp->count > 0)
-    ins = binary_search<coll_t> (scp->colls, scp->count, c);
-  if (ins < scp->count && scp->colls[ins] == c)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      derr(1) << ".collection " << c << " already exists " << dendl;
-      return -EEXIST;
-    }
-
-  dout(3) << "..insertion point: " << ins << dendl;
-
-  // Make room for a new collection ID.
-  if (!created)
-  {
-    sz += sizeof (coll_t);
-    dout(3) << "..increase size to " << sz << dendl;
-    stored_colls *scp2 = (stored_colls *) realloc (scp, sz);
-    sc.release ();
-    sc.reset (scp2);
-    scp = scp2;
-  }
-
-  int n = (scp->count - ins) * sizeof (coll_t);
-  if (n > 0)
-    {
-      dout(3) << "..moving " << n << " bytes up" << dendl;
-      memmove (&scp->colls[ins + 1], &scp->colls[ins], n);
-    }
-  scp->count++;
-  scp->colls[ins] = c;
-
-  dout(3) << "..collections: " << scp << dendl;
-
-  // Put the modified collection list back.
-  {
-    Dbt value2 (scp, sz);
-    if (db->put (txn, &key, &value2, 0) != 0)
-      {
-        if (txn != NULL)
-          txn->abort();
-        if (onsafe != NULL)
-          CLEANUP(onsafe);
-        derr(1) << ".writing new collections list failed" << dendl;
-        return -EIO;
-      }
-  }
-
-  // Create the new collection.
-  {
-    stored_coll new_coll;
-    new_coll.count = 0;
-    Dbt coll_key (&c, sizeof (coll_t));
-    Dbt coll_value (&new_coll, sizeof (stored_coll));
-    if (db->put (txn, &coll_key, &coll_value, 0) != 0)
-      {
-        if (txn != NULL)
-          txn->abort();
-        if (onsafe != NULL)
-          CLEANUP(onsafe);
-        derr(1) << ".writing new collection failed" << dendl;
-        return -EIO;
-      }
-  }
-
-  if (txn != NULL)
-    txn->commit (0);
-  if (onsafe != NULL)
-    COMMIT(onsafe);
-
-  dout(4) << "..create_collection OK" << dendl;
-  return 0;
-}
-
-int OSBDB::destroy_collection(coll_t c, Context *onsafe)
-{
-  dout(6) << "Context " << hex << onsafe << dec << dendl;
-  if (!mounted)
-    {
-      derr(1) << "not mounted" << dendl;
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EINVAL;
-    }
-
-  scoped_lock __lock(&lock);
-  dout(2) << "destroy_collection " << hex << c << dec << dendl;
-
-  Dbt key (COLLECTIONS_KEY, 1);
-  Dbt value;
-  value.set_flags (DB_DBT_MALLOC);
-  DbTxn *txn = NULL;
-
-  if (transactional)
-    env->txn_begin (NULL, &txn, 0);
-
-  if (db->get (txn, &key, &value, 0) != 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      derr(1) << ".collection list doesn't exist" << dendl;
-      return -ENOENT; // XXX
-    }
-
-  stored_colls *scp = (stored_colls *) value.get_data();
-  auto_ptr<stored_colls> valueBuf (scp);
-  if (scp->count == 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      derr(1) << ".collection " << c << " not listed" << dendl;
-      return -ENOENT;
-    }
-  uint32_t ins = binary_search<coll_t> (scp->colls, scp->count, c);
-  dout(4) << "..insertion point is " << ins << dendl;
-  if (ins >= scp->count || scp->colls[ins] != c)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      derr(1) << ".collection " << c << " not listed" << dendl;
-      return -ENOENT;
-    }
-
-  dout(4) << "..collections list is " << scp << dendl;
-
-  // Move the rest of the list down in memory, if needed.
-  if (ins < scp->count)
-    {
-      size_t n = scp->count - ins - 1;
-      dout(4) << "..shift list down " << n << dendl;
-      memmove (&scp->colls[ins], &scp->colls[ins + 1], n);
-    }
-
-  dout(4) << "..collections list is " << scp << dendl;
-
-  // Modify the record size to be one less.
-  Dbt nvalue (scp, value.get_size() - sizeof (coll_t));
-  nvalue.set_flags (DB_DBT_USERMEM);
-  if (db->put (txn, &key, &nvalue, 0) != 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      derr(1) << ".putting modified collection list failed" << dendl;
-      return -EIO;
-    }
-
-  // Delete the collection.
-  Dbt collKey (&c, sizeof (coll_t));
-  if (db->del (txn, &collKey, 0) != 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      derr(1) << ".deleting collection failed" << dendl;
-      return -EIO;
-    }
-
-  if (txn != NULL)
-    txn->commit (0);
-  if (onsafe != NULL)
-    COMMIT(onsafe);
-  dout(4) << "..destroy_collection OK" << dendl;
-  return 0;
-}
-
-bool OSBDB::collection_exists(coll_t c)
-{
-  if (!mounted)
-    {
-      derr(1) << "not mounted" << dendl;
-      return -EINVAL;
-    }
-
-  dout(2) << "collection_exists " << hex << c << dec << dendl;
-
-  /*Dbt key (COLLECTIONS_KEY, 1);
-  Dbt value;
-  value.set_flags (DB_DBT_MALLOC);
-
-  if (db->get (NULL, &key, &value, 0) != 0)
-    {
-      dout(4) << "..no collection list; return false" << dendl;
-      return false;
-    }
-
-  stored_colls *scp = (stored_colls *) value.get_data();
-  auto_ptr<stored_colls> sc (scp);
-  dout(5) << "..collection list is " << scp << dendl;
-  if (scp->count == 0)
-    {
-      dout(4) << "..empty collection list; return false" << dendl;
-      return false;
-    }
-  uint32_t ins = binary_search<coll_t> (scp->colls, scp->count, c);
-  dout(4) << "..insertion point is " << ins << dendl;
-
-  int ret = (scp->colls[ins] == c);
-  dout(4) << "..returns " << ret << dendl;
-  return ret;*/
-
-  Dbt key (&c, sizeof (coll_t));
-  Dbt value;
-  value.set_flags (DB_DBT_MALLOC);
-  if (db->get (NULL, &key, &value, 0) != 0)
-    {
-      dout(4) << "..no collection, return false" << dendl;
-      return false;
-    }
-  void *val = value.get_data();
-  free (val);
-  dout(4) << "..collection exists; return true" << dendl;
-  return true;
-}
-
-int OSBDB::collection_stat(coll_t c, struct stat *st)
-{
-  if (!mounted)
-    {
-      derr(1) << "not mounted" << dendl;
-      return -EINVAL;
-    }
-
-  dout(2) << "collection_stat " << c << dendl;
-  // XXX is this needed?
-  return -ENOSYS;
-}
-
-int OSBDB::collection_add(coll_t c, object_t o, Context *onsafe)
-{
-  dout(6) << "Context " << hex << onsafe << dec << dendl;
-  if (!mounted)
-    {
-      dout(2) << "not mounted" << dendl;
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EINVAL;
-    }
-
-  scoped_lock __lock(&lock);
-  dout(2) << "collection_add " << hex << c << dec << " " << o << dendl;
-
-  Dbt key (&c, sizeof (coll_t));
-  Dbt value;
-  value.set_flags (DB_DBT_MALLOC);
-  DbTxn *txn = NULL;
-
-  if (transactional)
-    env->txn_begin (NULL, &txn, 0);
-
-  if (db->get (txn, &key, &value, 0) != 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      derr(1) << "failed to find collection" << dendl;
-      return -ENOENT;
-    }
-
-  size_t sz = value.get_size();
-  stored_coll *scp = (stored_coll *) value.get_data();
-  auto_ptr<stored_coll> sc (scp);
-
-  // Find the insertion point for the new object ID.
-  uint32_t ins = 0;
-  if (scp->count > 0)
-    {
-      ins = binary_search<object_t> (scp->objects, scp->count, o);
-      // Already there?
-      if (ins < scp->count && scp->objects[ins] == o)
-        {
-          if (txn != NULL)
-            txn->abort();
-          if (onsafe != NULL)
-            CLEANUP(onsafe);
-          derr(1) << "collection already has object" << dendl;
-          return -EEXIST;
-        }
-    }
-
-  // Make room for the new value, and add it.
-  sz += sizeof (object_t);
-  scp = (stored_coll *) realloc (scp, sz);
-  sc.release();
-  sc.reset (scp);
-  dout(3) << "..current collection: " << scp << dendl;
-  if (ins < scp->count - 1)
-    {
-      size_t n = (scp->count - ins) * sizeof (object_t);
-      dout(3) << "..move up " << n << " bytes" << dendl;
-      memmove (&scp->objects[ins + 1], &scp->objects[ins], n);
-    }
-  scp->count++;
-  scp->objects[ins] = o;
-
-  dout(3) << "..collection: " << scp << dendl;
-
-  Dbt nvalue (scp, sz);
-  if (db->put (txn, &key, &nvalue, 0) != 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      derr(1) << "..putting modified collection failed" << dendl;
-      return -EIO;
-    }
-
-  if (txn != NULL)
-    txn->commit (0);
-  if (onsafe != NULL)
-    COMMIT(onsafe);
-  dout(4) << "..collection add OK" << dendl;
-  return 0;
-}
-
-int OSBDB::collection_remove(coll_t c, object_t o, Context *onsafe)
-{
-  dout(6) << "Context " << hex << onsafe << dec << dendl;
-  if (!mounted)
-    {
-      derr(1) << "not mounted" << dendl;
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EINVAL;
-    }
-
-  scoped_lock __lock(&lock);
-  dout(2) << "collection_remove " << hex << c << dec << " " << o << dendl;
-
-  Dbt key (&c, sizeof (coll_t));
-  Dbt value;
-  value.set_flags (DB_DBT_MALLOC);
-  DbTxn *txn = NULL;
-  if (transactional)
-    env->txn_begin (NULL, &txn, 0);
-
-  if (db->get (txn, &key, &value, 0) != 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      dout(1) << "..collection doesn't exist" << dendl;
-      return -ENOENT;
-    }
-
-  stored_coll *scp = (stored_coll *) value.get_data();
-  auto_ptr<stored_coll> sc (scp);
-
-  dout(5) << "..collection is " << scp << dendl;
-  if (scp->count == 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      dout(1) << "..collection is empty" << dendl;
-      return -ENOENT;
-    }
-  uint32_t ins = binary_search<object_t> (scp->objects, scp->count, o);
-  dout(4) << "..insertion point is " << ins << dendl;
-  if (ins >= scp->count || scp->objects[ins] != o)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      dout(1) << "..object not in collection" << dendl;
-      return -ENOENT;
-    }
-
-  if (ins < scp->count - 1)
-    {
-      size_t n = (scp->count - ins - 1) * sizeof (object_t);
-      dout(5) << "..moving " << n << " bytes down" << dendl;
-      memmove (&scp->objects[ins], &scp->objects[ins + 1], n);
-    }
-  scp->count--;
-
-  dout(3) << "..collection " << scp << dendl;
-
-  Dbt nval (scp, value.get_size() - sizeof (object_t));
-  if (db->put (txn, &key, &nval, 0) != 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      derr(1) << "..putting modified collection failed" << dendl;
-      return -EIO;
-    }
-
-  if (txn != NULL)
-    txn->commit (0);
-  if (onsafe != NULL)
-    COMMIT(onsafe);
-  dout(4) << "..collection remove OK" << dendl;
-  return 0;
-}
-
-int OSBDB::collection_list(coll_t c, list<object_t>& o)
-{
-  if (!mounted)
-    {
-      derr(1) << "not mounted" << dendl;
-      return -EINVAL;
-    }
-
-  Dbt key (&c, sizeof (coll_t));
-  Dbt value;
-  DbTxn *txn = NULL;
-
-  if (transactional)
-    env->txn_begin (NULL, &txn, 0);
-
-  if (db->get (txn, &key, &value, 0) != 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      return -ENOENT;
-    }
-
-  stored_coll *scp = (stored_coll *) value.get_data();
-  auto_ptr<stored_coll> sc (scp);
-  for (uint32_t i = 0; i < scp->count; i++)
-    o.push_back (scp->objects[i]);
-
-  if (txn != NULL)
-    txn->commit (0);
-  return 0;
-}
-
-\f // Attributes
-
-int OSBDB::_setattr(object_t oid, const char *name,
-                    const void *value, size_t size, Context *onsafe,
-                    DbTxn *txn)
-{
-  dout(6) << "Context " << hex << onsafe << dec << dendl;
-  if (!mounted)
-    {
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EINVAL;
-    }
-
-  if (strlen (name) >= OSBDB_MAX_ATTR_LEN)
-    {
-      derr(1) << "name too long: " << name << dendl;
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -ENAMETOOLONG;
-    }
-
-  scoped_lock __lock(&lock);
-
-  // Add name to attribute list, if needed.
-  attrs_id aids = new_attrs_id (oid);
-  Dbt attrs_key (&aids, sizeof_attrs_id());
-  Dbt attrs_val;
-  attrs_val.set_flags (DB_DBT_MALLOC);
-  stored_attrs *sap = NULL;
-  size_t sz = 0;
-
-  dout(3) << "  getting " << aids << dendl;
-  if (db->get (txn, &attrs_key, &attrs_val, 0) != 0)
-    {
-      dout(2) << "  first attribute" << dendl;
-      sz = sizeof (stored_attrs);
-      sap = (stored_attrs *) malloc(sz);
-      sap->count = 0;
-    }
-  else
-    {
-      sz = attrs_val.get_size();
-      sap = (stored_attrs *) attrs_val.get_data();
-      dout(2) << "..add to list of " << sap->count << " attrs" << dendl;
-    }
-  auto_ptr<stored_attrs> sa (sap);
-
-  attr_name _name;
-  strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
-
-  int ins = 0;
-  if (sap->count > 0)
-    ins = binary_search<attr_name> (sap->names, sap->count, _name);
-  dout(3) << "..insertion point is " << ins << dendl;
-  if (sap->count == 0 ||
-      (ins >= sap->count || strcmp (sap->names[ins].name, name) != 0))
-    {
-      sz += sizeof (attr_name);
-      dout(3) << "..realloc " << ((void *) sap) << " to "
-              << dec << sz << dendl;
-      sap = (stored_attrs *) realloc (sap, sz);
-      dout(3) << "..returns " << ((void *) sap) << dendl;
-      sa.release ();
-      sa.reset (sap);
-      int n = (sap->count - ins) * sizeof (attr_name);
-      if (n > 0)
-        {
-          dout(3) << "..move " << n  << " bytes from 0x"
-                  << hex << (&sap->names[ins]) << " to 0x"
-                  << hex << (&sap->names[ins+1]) << dec << dendl;
-          memmove (&sap->names[ins+1], &sap->names[ins], n);
-        }
-      memset (&sap->names[ins], 0, sizeof (attr_name));
-      strncpy (sap->names[ins].name, name, OSBDB_MAX_ATTR_LEN);
-      sap->count++;
-
-      Dbt newAttrs_val (sap, sz);
-      newAttrs_val.set_ulen (sz);
-      newAttrs_val.set_flags (DB_DBT_USERMEM);
-      dout(3) << "..putting " << aids << dendl;
-      if (db->put (txn, &attrs_key, &newAttrs_val, 0) != 0)
-        {
-          derr(1) << ".writing attributes list failed" << dendl;
-          if (onsafe != NULL)
-            CLEANUP(onsafe);
-          return -EIO;
-        }
-    }
-  else
-    {
-      dout(3) << "..attribute " << name << " already exists" << dendl;
-    }
-
-  dout(5) << "..attributes list: " << sap << dendl;
-
-  // Add the attribute.
-  attr_id aid = new_attr_id (oid, name);
-  Dbt attr_key (&aid, sizeof (aid));
-  Dbt attr_val ((void *) value, size);
-  dout(3) << "..writing attribute key " << aid << dendl;
-  if (db->put (txn, &attr_key, &attr_val, 0) != 0)
-    {
-      derr(1) << ".writing attribute key failed" << dendl;
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EIO;
-    }
-
-  dout(4) << "..setattr OK" << dendl;
-  if (onsafe != NULL)
-    COMMIT(onsafe);
-  return 0;
-}
-
-int OSBDB::setattr(object_t oid, const char *name,
-                   const void *value, size_t size,
-                   Context *onsafe)
-{
-  dout(6) << "Context " << hex << onsafe << dec << dendl;
-  if (!mounted)
-    {
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EINVAL;
-    }
-
-  DbTxn *txn = NULL;
-  if (transactional)
-    env->txn_begin (NULL, &txn, 0);
-
-  dout(2) << "setattr " << oid << ":" << name << " => ("
-          << size << " bytes)" << dendl;
-  int ret = _setattr (oid, name, value, size, onsafe, txn);
-  if (ret == 0)
-    {
-      if (txn != NULL)
-        txn->commit (0);
-    }
-  else
-    {
-      if (txn != NULL)
-        txn->abort();
-    }
-  return ret;
-}
-
-int OSBDB::setattrs(object_t oid, map<string,bufferptr>& aset,
-                    Context *onsafe)
-{
-  dout(6) << "Context " << hex << onsafe << dec << dendl;
-  if (!mounted)
-    {
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EINVAL;
-    }
-
-  DbTxn *txn = NULL;
-
-  if (transactional)
-    env->txn_begin (NULL, &txn, 0);
-
-  map<string,bufferptr>::iterator it;
-  for (it = aset.begin(); it != aset.end(); it++)
-    {
-      string name = it->first;
-      bufferptr value = it->second;
-      int ret = _setattr (oid, name.c_str(), value.c_str(),
-                          value.length(), onsafe, txn);
-      if (ret != 0)
-        {
-          if (txn != NULL)
-            txn->abort();
-          return ret;
-        }
-    }
-
-  if (txn != NULL)
-    txn->commit (0);
-  return 0;
-}
-
-int OSBDB::_getattr (object_t oid, const char *name, void *value, size_t size)
-{
-  if (!mounted)
-    return -EINVAL;
-
-  dout(2) << "_getattr " << oid << " " << name << " " << size << dendl;
-
-  attr_id aid = new_attr_id (oid, name);
-  Dbt key (&aid, sizeof (aid));
-  Dbt val (value, size);
-  val.set_ulen (size);
-  val.set_doff (0);
-  val.set_dlen (size);
-  val.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
-
-  int ret;
-  if ((ret = db->get (NULL, &key, &val, 0)) != 0)
-    {
-      derr(1) << ".getting value failed: " << db_strerror (ret) << dendl; 
-      return -ENOENT;
-    }
-
-  dout(4) << ".._getattr OK; returns " << val.get_size() << dendl;
-  return val.get_size();
-}
-
-int OSBDB::getattr(object_t oid, const char *name, void *value, size_t size)
-{
-  if (!mounted)
-    return -EINVAL;
-
-  return _getattr (oid, name, value, size);
-}
-
-int OSBDB::getattrs(object_t oid, map<string,bufferptr>& aset)
-{
-  if (!mounted)
-    return -EINVAL;
-
-  for (map<string,bufferptr>::iterator it = aset.begin();
-       it != aset.end(); it++)
-    {
-      int ret = _getattr (oid, (*it).first.c_str(),
-                          (*it).second.c_str(),
-                          (*it).second.length());
-      if (ret < 0)
-        return ret;
-    }
-  return 0;
-}
-
-int OSBDB::rmattr(object_t oid, const char *name, Context *onsafe)
-{
-  dout(6) << "Context " << hex << onsafe << dec << dendl;
-  if (!mounted)
-    {
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EINVAL;
-    }
-
-  scoped_lock __lock(&lock);
-  dout(2) << "rmattr " << oid << " " << name << dendl;
-
-  attrs_id aids = new_attrs_id (oid);
-  Dbt askey (&aids, sizeof_attrs_id());
-  Dbt asvalue;
-  asvalue.set_flags (DB_DBT_MALLOC);
-
-  DbTxn *txn = NULL;
-
-  if (transactional)
-    env->txn_begin (NULL, &txn, 0);
-
-  if (db->get (txn, &askey, &asvalue, 0) != 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -ENOENT;
-    }
-
-  stored_attrs *sap = (stored_attrs *) asvalue.get_data();
-  auto_ptr<stored_attrs> sa (sap);
-
-  dout(5) << "..attributes list " << sap << dendl;
-
-  if (sap->count == 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      derr(1) << ".empty attribute list" << dendl;
-      return -ENOENT;
-    }
-
-  attr_name _name;
-  memset(&_name, 0, sizeof (_name));
-  strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
-  int ins = binary_search<attr_name> (sap->names, sap->count, _name);
-  dout(4) << "..insertion point is " << ins << dendl;
-  if (ins >= sap->count || strcmp (sap->names[ins].name, name) != 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      derr(1) << ".attribute not found in list" << dendl;
-      return -ENOENT;
-    }
-
-  // Shift the later elements down by one, if needed.
-  int n = (sap->count - ins) * sizeof (attr_name);
-  if (n > 0)
-    {
-      dout(4) << "..shift down by " << n << dendl;
-      memmove (&(sap->names[ins]), &(sap->names[ins + 1]), n);
-    }
-  sap->count--;
-  dout(5) << "..attributes list now " << sap << dendl;
-
-  asvalue.set_size(asvalue.get_size() - sizeof (attr_name));
-  int ret;
-  if ((ret = db->put (txn, &askey, &asvalue, 0)) != 0)
-    {
-      derr(1) << "put stored_attrs " << db_strerror (ret) << dendl;
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EIO;
-    }
-
-  // Remove the attribute.
-  attr_id aid = new_attr_id (oid, name);
-  Dbt key (&aid, sizeof (aid));
-  if ((ret = db->del (txn, &key, 0)) != 0)
-    {
-      derr(1) << "deleting " << aid << ": " << db_strerror(ret) << dendl;
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EIO;
-    }
-
-  if (txn != NULL)
-    txn->commit (0);
-  if (onsafe != NULL)
-    COMMIT(onsafe);
-  dout(4) << "..rmattr OK" << dendl;
-  return 0;
-}
-
-int OSBDB::listattr(object_t oid, char *attrs, size_t size)
-{
-  if (!mounted)
-    return -EINVAL;
-
-  dout(2) << "listattr " << oid << dendl;
-
-  attrs_id aids = new_attrs_id (oid);
-  Dbt key (&aids, sizeof_attrs_id());
-  Dbt value;
-  value.set_flags (DB_DBT_MALLOC);
-
-  // XXX Transactions for read atomicity???
-
-  int ret;
-  if ((ret = db->get (NULL, &key, &value, 0)) != 0)
-    {
-      derr(1) << "fetching " << aids << ": " << db_strerror (ret)
-              << dendl;
-      return -ENOENT;
-    }
-
-  stored_attrs *attrsp = (stored_attrs *) value.get_data();
-  auto_ptr<stored_attrs> _attrs (attrsp);
-  size_t s = 0;
-  char *p = attrs;
-  for (unsigned i = 0; i < attrsp->count && s < size; i++)
-    {
-      int n = MIN (OSBDB_MAX_ATTR_LEN,
-                   MIN (strlen (attrsp->names[i].name), size - s - 1));
-      strncpy (p, attrsp->names[i].name, n);
-      p[n] = '\0';
-      p = p + n + 1;
-    }
-
-  dout(4) << "listattr OK" << dendl;
-  return 0;
-}
-
-\f // Collection attributes.
-
-int OSBDB::collection_setattr(coll_t cid, const char *name,
-                              const void *value, size_t size,
-                              Context *onsafe)
-{
-  dout(6) << "Context " << hex << onsafe << dec << dendl;
-  if (!mounted)
-    {
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EINVAL;
-    }
-
-  scoped_lock __lock(&lock);
-  dout(2) << "collection_setattr " << hex << cid << dec << " " << name
-          << " (" << size << " bytes)" << dendl;
-  if (strlen (name) >= OSBDB_MAX_ATTR_LEN)
-    {
-      derr(1) << "name too long" << dendl;
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -ENAMETOOLONG;
-    }
-
-  // Add name to attribute list, if needed.
-  coll_attrs_id aids = new_coll_attrs_id (cid);
-  Dbt attrs_key (&aids, sizeof_coll_attrs_id());
-  Dbt attrs_val;
-  attrs_val.set_flags (DB_DBT_MALLOC);
-  stored_attrs *sap = NULL;
-  size_t sz = 0;
-
-  DbTxn *txn = NULL;
-  if (transactional)
-    env->txn_begin (NULL, &txn, 0);
-
-  dout(3) << "  getting " << aids << dendl;
-  if (db->get (txn, &attrs_key, &attrs_val, 0) != 0)
-    {
-      dout(2) << "  first attribute" << dendl;
-      sz = sizeof (stored_attrs);
-      sap = (stored_attrs *) malloc(sz);
-      sap->count = 0;
-    }
-  else
-    {
-      sz = attrs_val.get_size();
-      sap = (stored_attrs *) attrs_val.get_data();
-      dout(2) << "  add to list of " << sap->count << " attrs" << dendl;
-    }
-  auto_ptr<stored_attrs> sa (sap);
-
-  attr_name _name;
-  strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
-
-  int ins = 0;
-  if (sap->count > 0)
-    ins = binary_search<attr_name> (sap->names, sap->count, _name);
-  dout(3) << "  insertion point is " << ins << dendl;
-  if (ins >= sap->count || strcmp (sap->names[ins].name, name) != 0)
-    {
-      sz += sizeof (attr_name);
-      dout(3) << "  realloc " << hex << ((void *) sap) << " to "
-              << dec << sz << dendl;
-      sap = (stored_attrs *) realloc (sap, sz);
-      dout(3) << "  returns " << hex << ((void *) sap) << dec << dendl;
-      sa.release ();
-      sa.reset (sap);
-      int n = (sap->count - ins) * sizeof (attr_name);
-      if (n > 0)
-        {
-          dout(3) << "  move " << n  << " bytes from 0x"
-                  << hex << (&sap->names[ins]) << " to 0x"
-                  << hex << (&sap->names[ins+1]) << dec << dendl;
-          memmove (&sap->names[ins+1], &sap->names[ins], n);
-        }
-      memset (&sap->names[ins], 0, sizeof (attr_name));
-      strncpy (sap->names[ins].name, name, OSBDB_MAX_ATTR_LEN);
-      sap->count++;
-
-      Dbt newAttrs_val (sap, sz);
-      newAttrs_val.set_ulen (sz);
-      newAttrs_val.set_flags (DB_DBT_USERMEM);
-      dout(3) << "  putting " << aids << dendl;
-      if (db->put (txn, &attrs_key, &newAttrs_val, 0) != 0)
-        {
-          if (txn != NULL)
-            txn->abort();
-          if (onsafe != NULL)
-            CLEANUP(onsafe);
-          derr(1) << ".putting new attributes failed" << dendl;
-          return -EIO;
-        }
-    }
-  else
-    {
-      dout(3) << "..attribute " << name << " already exists" << dendl;
-    }
-
-  dout(3) << "..attributes list: " << sap << dendl;
-
-  // Add the attribute.
-  coll_attr_id aid = new_coll_attr_id (cid, name);
-  Dbt attr_key (&aid, sizeof (aid));
-  Dbt attr_val ((void *) value, size);
-  dout(3) << "  writing attribute key " << aid << dendl;
-  if (db->put (txn, &attr_key, &attr_val, 0) != 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      derr(1) << ".putting attribute failed" << dendl;
-      return -EIO;
-    }
-
-  if (txn != NULL)
-    txn->commit (0);
-  if (onsafe != NULL)
-    COMMIT(onsafe);
-
-  dout(4) << "..collection setattr OK" << dendl;
-  return 0;
-}
-
-int OSBDB::collection_rmattr(coll_t cid, const char *name,
-                             Context *onsafe)
-{
-  dout(6) << "Context " << hex << onsafe << dec << dendl;
-  if (!mounted)
-    {
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EINVAL;
-    }
-
-  scoped_lock __lock(&lock);
-  dout(2) << "collection_rmattr " << hex << cid << dec
-          << " " << name << dendl;
-
-  coll_attrs_id aids = new_coll_attrs_id (cid);
-  Dbt askey (&aids, sizeof_coll_attrs_id());
-  Dbt asvalue;
-  asvalue.set_flags (DB_DBT_MALLOC);
-
-  DbTxn *txn = NULL;
-  if (transactional)
-    env->txn_begin (NULL, &txn, 0);
-
-  if (db->get (txn, &askey, &asvalue, 0) != 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      derr(1) << ".no attributes list" << dendl;
-      return -ENOENT;
-    }
-
-  stored_attrs *sap = (stored_attrs *) asvalue.get_data();
-  auto_ptr<stored_attrs> sa (sap);
-
-  dout(5) << "..attributes list " << sap << dendl;
-  if (sap->count == 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      derr(1) << ".empty attributes list" << dendl;
-      return -ENOENT;
-    }
-
-  attr_name _name;
-  memset(&_name, 0, sizeof (_name));
-  strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
-  int ins = binary_search<attr_name> (sap->names, sap->count, _name);
-  if (ins >= sap->count || strcmp (sap->names[ins].name, name) != 0)
-    {
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      derr(1) << ".attribute not listed" << dendl;
-      return -ENOENT;
-    }
-
-  // Shift the later elements down by one, if needed.
-  int n = (sap->count - ins) * sizeof (attr_name);
-  if (n > 0)
-    {
-      dout(4) << "..shift down by " << n << dendl;
-      memmove (&(sap->names[ins]), &(sap->names[ins + 1]), n);
-    }
-  sap->count--;
-  dout(5) << "..attributes list now " << sap << dendl;
-
-  asvalue.set_size(asvalue.get_size() - sizeof (attr_name));
-  int ret;
-  if ((ret = db->put (txn, &askey, &asvalue, 0)) != 0)
-    {
-      derr(1) << "put stored_attrs " << db_strerror (ret) << dendl;
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EIO;
-    }
-
-  // Remove the attribute.
-  coll_attr_id aid = new_coll_attr_id (cid, name);
-  Dbt key (&aid, sizeof (aid));
-  if ((ret = db->del (txn, &key, 0)) != 0)
-    {
-      derr(1) << "deleting " << aid << ": " << db_strerror(ret) << dendl;
-      if (txn != NULL)
-        txn->abort();
-      if (onsafe != NULL)
-        CLEANUP(onsafe);
-      return -EIO;
-    }
-
-  if (txn != NULL)
-    txn->commit (0);
-  if (onsafe != NULL)
-    COMMIT(onsafe);
-
-  dout(4) << "..collection rmattr OK" << dendl;
-  return 0;
-}
-
-int OSBDB::collection_getattr(coll_t cid, const char *name,
-                              void *value, size_t size)
-{
-  if (!mounted)
-    return -EINVAL;
-
-  dout(2) << "collection_getattr " << hex << cid << dec
-          << " " << name << dendl;
-
-  // XXX transactions/read isolation?
-
-  coll_attr_id caid = new_coll_attr_id (cid, name);
-  Dbt key (&caid, sizeof (caid));
-  Dbt val (value, size);
-  val.set_ulen (size);
-  val.set_dlen (size);
-  val.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
-
-  if (db->get (NULL, &key, &val, 0) != 0)
-    {
-      derr(1) << ".no attribute entry" << dendl;
-      return -ENOENT;
-    }
-
-  dout(4) << "..collection getattr OK; returns " << val.get_size() << dendl;
-  return val.get_size();
-}
-
-int OSBDB::collection_listattr(coll_t cid, char *attrs, size_t size)
-{
-  if (!mounted)
-    return -EINVAL;
-
-  dout(2) << "collection_listattr " << hex << cid << dec << dendl;
-
-  // XXX transactions/read isolation?
-
-  coll_attrs_id caids = new_coll_attrs_id (cid);
-  Dbt key (&caids, sizeof_coll_attrs_id());
-  Dbt value;
-  value.set_flags (DB_DBT_MALLOC);
-
-  int ret;
-  if ((ret = db->get (NULL, &key, &value, 0)) != 0)
-    {
-      derr(1) << "fetching " << caids << ": " << db_strerror (ret)
-              << dendl;
-      return -ENOENT;
-    }
-
-  stored_attrs *attrsp = (stored_attrs *) value.get_data();
-  auto_ptr<stored_attrs> _attrs (attrsp);
-  size_t s = 0;
-  char *p = attrs;
-  for (unsigned i = 0; i < attrsp->count && s < size; i++)
-    {
-      int n = MIN (OSBDB_MAX_ATTR_LEN,
-                   MIN (strlen (attrsp->names[i].name), size - s - 1));
-      strncpy (p, attrsp->names[i].name, n);
-      p[n] = '\0';
-      p = p + n + 1;
-    }
-  return 0;
-}
-
-\f // Sync.
-
-void OSBDB::sync (Context *onsync)
-{
-  if (!mounted)
-    return;
-
-  sync();
-
-  if (onsync != NULL)
-    {
-      g_timer.add_event_after(0.1, onsync);
-    }
-}
-
-void OSBDB::sync()
-{
-  if (!mounted)
-    return;
-
-  if (transactional)
-    {
-      env->log_flush (NULL);
-      env->lsn_reset (device.c_str(), 0);
-    }
-  db->sync(0);
-}
diff --git a/src/osbdb/OSBDB.h b/src/osbdb/OSBDB.h
deleted file mode 100644 (file)
index 8eb2004..0000000
+++ /dev/null
@@ -1,482 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/* OSBDB.h -- ObjectStore on Berkeley DB. -*- c++ -*-
-   Copyright (C) 2007 Casey Marshall <csm@soe.ucsc.edu>
-
-Ceph - scalable distributed file system
-
-This is free software; you can redistribute it and/or
-modify it under the terms of the GNU Lesser General Public
-License version 2.1, as published by the Free Software 
-Foundation.  See file COPYING. */
-
-
-#include <db_cxx.h>
-#include "osd/ObjectStore.h"
-
-#define OSBDB_MAGIC 0x05BDB
-
-/*
- * Maximum length of an attribute name.
- */
-#define OSBDB_MAX_ATTR_LEN 256
-
-#define OSBDB_THIS_VERSION 1
-
-#define OSBDB_SUPERBLOCK_KEY ((void *) "s")
-
-/*
- * The "superblock" of the BDB object store. We store one of these in
- * the DB, to store version and other information. We don't record
- * anything special here, just the version number the database was
- * written with.
- *
- * In principle, this structure is variable-length, depending on the
- * software version writing the superblock.
- */
-struct stored_superblock
-{
-  uint32_t version;
-};
-
-inline ostream& operator<<(ostream& out, const stored_superblock sb)
-{
-  out << "osbdb.super(" << sb.version << ")" << endl;
-  return out;
-}
-
-/**
- * An object identifier; we define this so we can have a POD object to
- * work with.
- */
-struct oid_t // POD
-{
-  char id[16];
-};
-
-inline void mkoid (oid_t& id, object_t& oid)
-{
-  // XXX byte order?
-  memcpy (id.id, &oid, sizeof (oid_t));
-}
-
-inline ostream& operator<<(ostream& out, const oid_t id)
-{
-  for (int i = 0; i < 16; i++)
-    {
-      out.fill('0');
-      out << setw(2) << hex << (id.id[i] & 0xFF);
-      if ((i & 3) == 3)
-        out << ':';
-    }
-  out.unsetf(ios::right);
-  out << dec;
-  return out;
-}
-
-/**
- * An "inode" key. We map a 'stored_object' struct to this key for
- * every object.
- */
-struct object_inode_key // POD
-{
-  oid_t oid;
-  char tag;
-};
-
-/**
- * "Constructor" for an object_inode_key.
- */
-inline object_inode_key new_object_inode_key (object_t& oid)
-{
-  object_inode_key key;
-  memset(&key, 0, sizeof (object_inode_key));
-  mkoid (key.oid, oid);
-  key.tag = 'i';
-  return key;
-}
-
-/*
- * We use this, instead of sizeof(), to try and guarantee that we
- * don't include the structure padding, if any.
- *
- * This *should* return 17: sizeof (oid_t) == 16; sizeof (char) == 1.
- */
-inline size_t sizeof_object_inode_key()
-{
-  return offsetof(object_inode_key, tag) + sizeof (char);
-}
-
-             // Frank Poole: Unfortunately, that sounds a little
-             //              like famous last words.
-             //   -- 2001: A Space Odyssey
-
-inline ostream& operator<<(ostream& out, const object_inode_key o)
-{
-  out << o.tag << "/" << o.oid;
-  return out;
-}
-
-/**
- * A stored object. This is essentially the "inode" of the object,
- * containing things like the object's length. The object itself is
- * stored as-is, mapped by the 128-bit object ID.
- */
-struct stored_object
-{
-  uint32_t length;
-};
-
-inline ostream& operator<<(ostream& out, const stored_object s)
-{
-  out << "inode(l:" << s.length << ")";
-  return out;
-}
-
-/*
- * Key referencing the list of attribute names for an object. This is
- * simply the object's ID, with an additional character 'a' appended.
- */
-struct attrs_id // POD
-{
-  oid_t oid;
-  char tag;
-};
-
-/*
- * "Construtor" for attrs_id.
- */
-inline struct attrs_id new_attrs_id (object_t& oid)
-{
-  attrs_id aid;
-  memset (&aid, 0, sizeof (attrs_id));
-  mkoid(aid.oid, oid);
-  aid.tag = 'a';
-  return aid;
-}
-
-/*
- * See explanation for sizeof_object_inode_id.
- */
-inline size_t sizeof_attrs_id()
-{
-  return offsetof(struct attrs_id, tag) + sizeof (char);
-}
-
-inline ostream& operator<<(ostream& out, const attrs_id id)
-{
-  out << id.tag << "/" << id.oid;
-  return out;
-}
-
-/*
- * Encapsulation of a single attribute name.
- */
-struct attr_name // POD
-{
-  char name[OSBDB_MAX_ATTR_LEN];
-};
-
-inline ostream& operator<<(ostream& out, const attr_name n)
-{
-  out << n.name;
-  return out;
-}
-
-inline bool operator<(const attr_name n1, const attr_name n2)
-{
-  return (strncmp (n1.name, n2.name, OSBDB_MAX_ATTR_LEN) < 0);
-}
-
-inline bool operator>(const attr_name n1, const attr_name n2)
-{
-  return (strncmp (n1.name, n2.name, OSBDB_MAX_ATTR_LEN) > 0);
-}
-
-inline bool operator==(const attr_name n1, const attr_name n2)
-{
-  std::cerr << n1.name << " == " << n2.name << "?" << endl;
-  return (strncmp (n1.name, n2.name, OSBDB_MAX_ATTR_LEN) == 0);
-}
-
-inline bool operator!=(const attr_name n1, const attr_name n2)
-{
-  return !(n1 == n2);
-}
-
-inline bool operator>=(const attr_name n1, const attr_name n2)
-{
-  return !(n1 < n2);
-}
-
-inline bool operator<=(const attr_name n1, const attr_name n2)
-{
-  return !(n1 > n2);
-}
-
-/*
- * A list of an object or collection's attribute names.
- */
-struct stored_attrs
-{
-  uint32_t count;
-  attr_name names[0];   // actually variable-length
-};
-
-inline ostream& operator<<(ostream& out, const stored_attrs *sa)
-{
-  out << sa->count << " [ ";
-  for (unsigned i = 0; i < sa->count; i++)
-    out << sa->names[i] << (i == sa->count - 1 ? " " : ", ");
-  out << "]";
-  return out;
-}
-
-/*
- * An object attribute key. An object attribute is mapped simply by
- * the object ID appended with the attribute name. Attribute names
- * may not be empty, and must be less than 256 characters, in this
- * implementation.
- */
-struct attr_id // POD
-{
-  oid_t oid;
-  attr_name name;
-};
-
-inline attr_id new_attr_id (object_t& oid, const char *name)
-{
-  attr_id aid;
-  memset(&aid, 0, sizeof (attr_id));
-  mkoid (aid.oid, oid);
-  strncpy (aid.name.name, name, OSBDB_MAX_ATTR_LEN);
-  return aid;
-}
-
-inline ostream& operator<<(ostream &out, const attr_id id)
-{
-  out << id.oid << ":" << id.name;
-  return out;
-}
-
-/*
- * A key for a collection attributes list.
- */
-struct coll_attrs_id // POD
-{
-  coll_t cid;
-  char tag;
-};
-
-inline coll_attrs_id new_coll_attrs_id (coll_t cid)
-{
-  coll_attrs_id catts;
-  memset(&catts, 0, sizeof (coll_attrs_id));
-  catts.cid = cid;
-  catts.tag = 'C';
-  return catts;
-}
-
-inline size_t sizeof_coll_attrs_id()
-{
-  return offsetof(coll_attrs_id, tag) + sizeof (char);
-}
-
-inline ostream& operator<<(ostream& out, coll_attrs_id id)
-{
-  out << id.tag << "/" << id.cid;
-  return out;
-}
-
-/*
- * A collection attribute key. Similar to 
- */
-struct coll_attr_id // POD
-{
-  coll_t cid;
-  attr_name name;
-};
-
-inline coll_attr_id new_coll_attr_id (coll_t cid, const char *name)
-{
-  coll_attr_id catt;
-  memset(&catt, 0, sizeof (coll_attr_id));
-  catt.cid = cid;
-  strncpy (catt.name.name, name, OSBDB_MAX_ATTR_LEN);
-  return catt;
-}
-
-inline ostream& operator<<(ostream& out, coll_attr_id id)
-{
-  out << id.cid << ":" << id.name;
-  return out;
-}
-
-/*
- * This is the key we store the master collections list under.
- */
-#define COLLECTIONS_KEY ((void *) "c")
-
-/*
- * The master list of collections. There should be one of these per
- * OSD. The sole reason for this structure is to have the ability
- * to enumerate all collections stored on this OSD.
- */
-struct stored_colls
-{
-  // The number of collections.
-  uint32_t count;
-
-  // The collection identifiers. This is a sorted list of coll_t
-  // values.
-  coll_t colls[0]; // actually variable-length
-};
-
-inline ostream& operator<<(ostream& out, stored_colls *c)
-{
-  out << c->count << " [ ";
-  for (unsigned i = 0; i < c->count; i++)
-    {
-      out << hex << c->colls[i];
-      if (i < c->count - 1)
-        out << ", ";
-    }
-  out << " ]" << dec;
-  return out;
-}
-
-/*
- * A stored collection (a bag of object IDs). These are referenced by
- * the bare collection identifier type, a coll_t (thus, a 32-bit
- * integer). Internally this is stored as a sorted list of object IDs.
- *
- * Note, this structure places all collection items in a single
- * record; this may be a memory burden for large collections.
- */
-struct stored_coll
-{
-  // The size of this collection.
-  uint32_t count;
-
-  // The object IDs in this collection. This is a sorted list of all
-  // object ID's in this collection.
-  object_t objects[0]; // actually variable-length
-};
-
-inline ostream& operator<<(ostream& out, stored_coll *c)
-{
-  out << c->count << " [ ";
-  for (unsigned i = 0; i < c->count; i++)
-    {
-      out << c->objects[i];
-      if (i < c->count - 1)
-        out << ", ";
-    }
-  out << " ]";
-  return out;
-}
-
-class OSBDBException : public std::exception
-{
-  const char *msg;
-
-public:
-  OSBDBException(const char *msg) : msg(msg) { }
-  const char *what() { return msg; }
-};
-
-/*
- * The object store interface for Berkeley DB.
- */
-class OSBDB : public ObjectStore
-{
- private:
-  Mutex lock;
-  DbEnv *env;
-  Db *db;
-  string device;
-  string env_dir;
-  bool mounted;
-  bool opened;
-  bool transactional;
-
- public:
-
-  OSBDB(const char *dev) throw(OSBDBException)
-    : lock(true), env(0), db (0), device (dev), mounted(false), opened(false),
-      transactional(g_conf.bdbstore_transactional)
-  {
-  }
-
-  ~OSBDB()
-  {
-    if (mounted)
-      {
-        umount();
-      }
-  }
-
-  int mount();
-  int umount();
-  int mkfs();
-
-  int statfs(struct statfs *buf);
-
-  int pick_object_revision_lt(object_t& oid);
-
-  bool exists(object_t oid);
-  int stat(object_t oid, struct stat *st);
-
-  int remove(object_t oid, Context *onsafe=0);
-
-  int truncate(object_t oid, off_t size, Context *onsafe=0);
-
-  int read(object_t oid, off_t offset, size_t len,
-           bufferlist& bl);
-  int write(object_t oid, off_t offset, size_t len,
-            bufferlist& bl, Context *onsafe);
-
-  int setattr(object_t oid, const char *name,
-              const void *value, size_t size, Context *onsafe=0);
-  int setattrs(object_t oid, map<string,bufferptr>& aset,
-               Context *onsafe=0);
-  int getattr(object_t oid, const char *name,
-              void *value, size_t size);
-  int getattrs(object_t oid, map<string,bufferptr>& aset);
-  int rmattr(object_t oid, const char *name,
-             Context *onsafe=0);
-  int listattr(object_t oid, char *attrs, size_t size);
-
-  int clone(object_t oid, object_t noid);
-  
-  // Collections.
-  
-  int list_collections(list<coll_t>& ls);
-  int create_collection(coll_t c, Context *onsafe=0);
-  int destroy_collection(coll_t c, Context *onsafe=0);
-  bool collection_exists(coll_t c);
-  int collection_stat(coll_t c, struct stat *st);
-  int collection_add(coll_t c, object_t o, Context *onsafe=0);
-  int collection_remove(coll_t c, object_t o, Context *onsafe=0);
-  int collection_list(coll_t c, list<object_t>& o);
-
-  int collection_setattr(coll_t cid, const char *name,
-                         const void *value, size_t size,
-                         Context *onsafe=0);
-  int collection_rmattr(coll_t cid, const char *name,
-                        Context *onsafe=0);
-  int collection_getattr(coll_t cid, const char *name,
-                         void *value, size_t size);
-  int collection_listattr(coll_t cid, char *attrs, size_t size);
-
-  void sync(Context *onsync);
-  void sync();
-
-private:
-  int opendb (DBTYPE type=DB_UNKNOWN, int flags=0, bool new_env=false);
-
-  int _setattr(object_t oid, const char *name, const void *value,
-               size_t size, Context *onsync, DbTxn *txn);
-  int _getattr(object_t oid, const char *name, void *value, size_t size);
-  DbEnv *getenv();
-};
index 5906e8f94212228cd94f39e50f7cc49267a44d04..1079f7d7cf33fffb692a8ca6a8ce22af8a466ba8 100644 (file)
 #include "common/ceph_argparse.h"
 #include "os/FileStore.h"
 
-#ifdef USE_OSBDB
-#include "osbdb/OSBDB.h"
-#endif // USE_OSBDB
-
-
 #include "ReplicatedPG.h"
 //#include "RAID4PG.h"
 
index 24c81590d899c8d913c6af5e3ef7047845b20137..83641f829d7c07064fc4c64d841075726e47eb67 100644 (file)
@@ -11,7 +11,6 @@ Foundation.  See file COPYING. */
 
 #include "osd/ObjectStore.h"
 #include "ebofs/Ebofs.h"
-#include "osbdb/OSBDB.h"
 #include "include/buffer.h"
 
 #include <iostream>
@@ -140,15 +139,6 @@ int main (int argc, char **argv)
         }
       os = new Ebofs (osd_file);
     }
-  else if (strcasecmp (osd_name, "osbdb") == 0)
-    {
-      os = new OSBDB (osd_file);
-    }
-  else if (strcasecmp (osd_name, "osbdb-btree") == 0)
-    {
-      g_conf.bdbstore_btree = true;
-      os = new OSBDB (osd_file);
-    }
   else
     {
       cerr << "I don't know about object store \"" << osd_name << "\""
diff --git a/src/test/old/testosbdb.cc b/src/test/old/testosbdb.cc
deleted file mode 100644 (file)
index 19268e7..0000000
+++ /dev/null
@@ -1,347 +0,0 @@
-/* testosbdb.cc -- test OSBDB.
-   Copyright (C) 2007 Casey Marshall <csm@soe.ucsc.edu> */
-
-
-#include <iostream>
-#include "osbdb/OSBDB.h"
-
-using namespace std;
-
-int
-main (int argc, char **argv)
-{
-  vector<char *> args;
-  argv_to_vec (argc, argv, args);
-  parse_config_options (args);
-
-  g_conf.debug_bdbstore = 10;
-  //g_conf.bdbstore_btree = true;
-  char dbfile[256];
-  strncpy (dbfile, "/tmp/testosbdb/db.XXXXXX", 256);
-  mktemp (dbfile);
-  OSBDB *os = new OSBDB(dbfile);
-  auto_ptr<OSBDB> osPtr (os);
-  os->mkfs();
-  os->mount();
-
-  // Put an object.
-  object_t oid (0xDEADBEEF00000000ULL, 0xFEEDFACE);
-
-  cout << "sizeof oid_t is " << sizeof (oid_t) << endl;
-  cout << "offsetof oid_t.id " << offsetof (oid_t, id) << endl;
-
-  cout << sizeof (object_t) << endl;
-  cout << sizeof (oid.ino) << endl;
-  cout << sizeof (oid.bno) << endl;
-  cout << sizeof (oid.rev) << endl;
-
-  // Shouldn't be there.
-  if (os->exists (oid))
-    {
-      cout << "FAIL: oid shouldn't be there " << oid << endl;
-    }
-
-  // Write an object.
-  char *x = (char *) malloc (1024);
-  memset(x, 0xaa, 1024);
-  bufferptr bp (x, 1024);
-  bufferlist bl;
-  bl.push_back (bp);
-
-  if (os->write (oid, 0L, 1024, bl, NULL) != 1024)
-    {
-      cout << "FAIL: writing object" << endl;
-    }
-
-  os->sync();
-
-  // Should be there.
-  if (!os->exists (oid))
-    {
-      cout << "FAIL: oid should be there: " << oid << endl;
-    }
-
-  memset(x, 0, 1024);
-  if (os->read (oid, 0, 1024, bl) != 1024)
-    {
-      cout << "FAIL: reading object" << endl;
-    }
-
-  for (int i = 0; i < 1024; i++)
-    {
-      if ((x[i] & 0xFF) != 0xaa)
-        {
-          cout << "FAIL: data read out is different" << endl;
-          break;
-        }
-    }
-
-  // Set some attributes
-  if (os->setattr (oid, "alpha", "value", strlen ("value")) != 0)
-    {
-      cout << "FAIL: set attribute" << endl;
-    }
-  if (os->setattr (oid, "beta", "value", strlen ("value")) != 0)
-    {
-      cout << "FAIL: set attribute" << endl;
-    }
-  if (os->setattr (oid, "gamma", "value", strlen ("value")) != 0)
-    {
-      cout << "FAIL: set attribute" << endl;
-    }
-  if (os->setattr (oid, "fred", "value", strlen ("value")) != 0)
-    {
-      cout << "FAIL: set attribute" << endl;
-    }
-
-  char *attrs = (char *) malloc (1024);
-  if (os->listattr (oid, attrs, 1024) != 0)
-    {
-      cout << "FAIL: listing attributes" << endl;
-    }
-  else
-    {
-      char *p = attrs;
-      if (strcmp (p, "alpha") != 0)
-        {
-          cout << "FAIL: should be \"alpha:\" \"" << p << "\"" << endl;
-        }
-      p = p + strlen (p) + 1;
-      if (strcmp (p, "beta") != 0)
-        {
-          cout << "FAIL: should be \"beta:\" \"" << p << "\"" << endl;
-        }
-      p = p + strlen (p) + 1;
-      if (strcmp (p, "fred") != 0)
-        {
-          cout << "FAIL: should be \"fred:\" \"" << p << "\"" << endl;
-        }
-      p = p + strlen (p) + 1;
-      if (strcmp (p, "gamma") != 0)
-        {
-          cout << "FAIL: should be \"gamma:\" \"" << p << "\"" << endl;
-        }
-    }
-
-  char attrvalue[256];
-  memset(attrvalue, 0, sizeof (attrvalue));
-  if (os->getattr (oid, "alpha", attrvalue, sizeof(attrvalue)) < 0)
-    {
-      cout << "FAIL: getattr alpha" << endl;
-    }
-  else if (strncmp ("value", attrvalue, strlen("value")) != 0)
-    {
-      cout << "FAIL: read attribute value differs" << endl;
-    }
-  memset(attrvalue, 0, sizeof (attrvalue));
-  if (os->getattr (oid, "fred", attrvalue, sizeof(attrvalue)) < 0)
-    {
-      cout << "FAIL: getattr fred" << endl;
-    }
-  else if (strncmp ("value", attrvalue, strlen("value")) != 0)
-    {
-      cout << "FAIL: read attribute value differs" << endl;
-    }
-  memset(attrvalue, 0, sizeof (attrvalue));
-  if (os->getattr (oid, "beta", attrvalue, sizeof(attrvalue)) < 0)
-    {
-      cout << "FAIL: getattr beta" << endl;
-    }
-  else if (strncmp ("value", attrvalue, strlen("value")) != 0)
-    {
-      cout << "FAIL: read attribute value differs" << endl;
-    }
-  memset(attrvalue, 0, sizeof (attrvalue));
-  if (os->getattr (oid, "gamma", attrvalue, sizeof(attrvalue)) < 0)
-    {
-      cout << "FAIL: getattr gamma" << endl;
-    }
-  else if (strncmp ("value", attrvalue, strlen("value")) != 0)
-    {
-      cout << "FAIL: read attribute value differs" << endl;
-    }
-
-  if (os->setattr (oid, "alpha", "different", strlen("different")) != 0)
-    cout << "FAIL: setattr overwrite" << endl;
-  memset(attrvalue, 0, sizeof (attrvalue));
-  if (os->getattr (oid, "alpha", attrvalue, sizeof(attrvalue)) < 0)
-    {
-      cout << "FAIL: getattr alpha" << endl;
-    }
-  else if (strncmp ("different", attrvalue, strlen("different")) != 0)
-    {
-      cout << "FAIL: read attribute value differs" << endl;
-    }
-
-  if (os->rmattr (oid, "alpha") != 0)
-    {
-      cout << "FAIL: rmattr alpha" << endl;
-    }
-  if (os->rmattr (oid, "fred") != 0)
-    {
-      cout << "FAIL: rmattr fred" << endl;
-    }
-  if (os->rmattr (oid, "beta") != 0)
-    {
-      cout << "FAIL: rmattr beta" << endl;
-    }
-  if (os->rmattr (oid, "gamma") != 0)
-    {
-      cout << "FAIL: rmattr gamma" << endl;
-    }
-
-  coll_t cid = 0xCAFEBABE;
-  if (os->create_collection (cid) != 0)
-    {
-      cout << "FAIL: create_collection" << endl;
-    }
-  if (os->create_collection (cid + 10) != 0)
-    {
-      cout << "FAIL: create_collection" << endl;
-    }
-  if (os->create_collection (cid + 5) != 0)
-    {
-      cout << "FAIL: create_collection" << endl;
-    }
-  if (os->create_collection (42) != 0)
-    {
-      cout << "FAIL: create_collection" << endl;
-    }
-
-  if (os->collection_add (cid, oid) != 0)
-    {
-      cout << "FAIL: collection_add" << endl;
-    }
-
-  list<coll_t> ls;
-  if (os->list_collections (ls) < 0)
-    {
-      cout << "FAIL: list_collections" << endl;
-    }
-  cout << "collections: ";
-  for (list<coll_t>::iterator it = ls.begin(); it != ls.end(); it++)
-    {
-      cout << *it << ", ";
-    }
-  cout << endl;
-
-  if (os->destroy_collection (0xCAFEBABE + 10) != 0)
-    {
-      cout << "FAIL: destroy_collection" << endl;
-    }
-
-  if (os->destroy_collection (0xCAFEBADE + 10) == 0)
-    {
-      cout << "FAIL: destroy_collection" << endl;
-    }
-
-  object_t oid2 (12345, 12345);
-  for (int i = 0; i < 8; i++)
-    {
-      oid2.rev++;
-      if (os->collection_add (cid, oid2) != 0)
-        {
-          cout << "FAIL: collection_add" << endl;
-        }
-    }
-  for (int i = 0; i < 8; i++)
-    {
-      if (os->collection_remove (cid, oid2) != 0)
-        {
-          cout << "FAIL: collection_remove" << endl;
-        }
-      oid2.rev--;
-    }
-
-  if (os->collection_setattr (cid, "alpha", "value", 5) != 0)
-    cout << "FAIL: collection_setattr" << endl;
-  if (os->collection_setattr (cid, "beta", "value", 5) != 0)
-    cout << "FAIL: collection_setattr" << endl;
-  if (os->collection_setattr (cid, "gamma", "value", 5) != 0)
-    cout << "FAIL: collection_setattr" << endl;
-  if (os->collection_setattr (cid, "fred", "value", 5) != 0)
-    cout << "FAIL: collection_setattr" << endl;
-
-  memset (attrvalue, 0, sizeof (attrvalue));
-  if (os->collection_getattr (cid, "alpha", attrvalue, sizeof (attrvalue)) < 0)
-    cout << "FAIL: collection_getattr" << endl;
-  else if (strncmp (attrvalue, "value", 5) != 0)
-    cout << "FAIL: collection attribute value different" << endl;
-  memset (attrvalue, 0, sizeof (attrvalue));
-  if (os->collection_getattr (cid, "beta", attrvalue, sizeof (attrvalue)) < 0)
-    cout << "FAIL: collection_getattr" << endl;
-  else if (strncmp (attrvalue, "value", 5) != 0)
-    cout << "FAIL: collection attribute value different" << endl;
-  memset (attrvalue, 0, sizeof (attrvalue));
-  if (os->collection_getattr (cid, "gamma", attrvalue, sizeof (attrvalue)) < 0)
-    cout << "FAIL: collection_getattr" << endl;
-  else if (strncmp (attrvalue, "value", 5) != 0)
-    cout << "FAIL: collection attribute value different" << endl;
-  memset (attrvalue, 0, sizeof (attrvalue));
-  if (os->collection_getattr (cid, "fred", attrvalue, sizeof (attrvalue)) < 0)
-    cout << "FAIL: collection_getattr" << endl;
-  else if (strncmp (attrvalue, "value", 5) != 0)
-    cout << "FAIL: collection attribute value different" << endl;
-
-  if (os->collection_setattr (cid, "alpha", "eulavvalue", 10) != 0)
-    cout << "FAIL: collection setattr overwrite" << endl;
-  memset (attrvalue, 0, sizeof (attrvalue));
-  if (os->collection_getattr (cid, "alpha", attrvalue, sizeof (attrvalue)) < 0)
-    cout << "FAIL: collection_getattr" << endl;
-  else if (strncmp (attrvalue, "eulavvalue", 10) != 0)
-    cout << "FAIL: collection attribute value different" << endl;
-  memset (attrvalue, 0, sizeof (attrvalue));
-  if (os->collection_getattr (cid, "beta", attrvalue, sizeof (attrvalue)) < 0)
-    cout << "FAIL: collection_getattr" << endl;
-  else if (strncmp (attrvalue, "value", 5) != 0)
-    cout << "FAIL: collection attribute value different" << endl;
-  memset (attrvalue, 0, sizeof (attrvalue));
-  if (os->collection_getattr (cid, "gamma", attrvalue, sizeof (attrvalue)) < 0)
-    cout << "FAIL: collection_getattr" << endl;
-  else if (strncmp (attrvalue, "value", 5) != 0)
-    cout << "FAIL: collection attribute value different" << endl;
-  memset (attrvalue, 0, sizeof (attrvalue));
-  if (os->collection_getattr (cid, "fred", attrvalue, sizeof (attrvalue)) < 0)
-    cout << "FAIL: collection_getattr" << endl;
-  else if (strncmp (attrvalue, "value", 5) != 0)
-    cout << "FAIL: collection attribute value different" << endl;
-
-  if (os->collection_rmattr (cid, "alpha") != 0)
-    cout << "FAIL: collection_rmattr" << endl;
-  if (os->collection_rmattr (cid, "fred") != 0)
-    cout << "FAIL: collection_rmattr" << endl;
-  if (os->collection_rmattr (cid, "beta") != 0)
-    cout << "FAIL: collection_rmattr" << endl;
-  if (os->collection_rmattr (cid, "gamma") != 0)
-    cout << "FAIL: collection_rmattr" << endl;
-
-  if (os->collection_rmattr (cid, "alpha") == 0)
-    cout << "FAIL: collection_rmattr (nonexistent)" << endl;
-
-  // Truncate the object.
-  if (os->truncate (oid, 512, NULL) != 0)
-    {
-      cout << "FAIL: truncate" << endl;
-    }
-
-  // Expand the object.
-  if (os->truncate (oid, 1200, NULL) != 0)
-    {
-      cout << "FAIL: expand" << endl;
-    }
-
-  // Delete the object.
-  if (os->remove (oid) != 0)
-    {
-      cout << "FAIL: could not remove object" << endl;
-    }
-
-  // Shouldn't be there
-  if (os->exists (oid))
-    {
-      cout << "FAIL: should not be there" << endl;
-    }
-
-  os->sync();
-  exit (0);
-}