]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
2007-02-19 Casey Marshall <csm@soe.ucsc.edu>
authorrsdio <rsdio@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 20 Feb 2007 01:11:09 +0000 (01:11 +0000)
committerrsdio <rsdio@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 20 Feb 2007 01:11:09 +0000 (01:11 +0000)
* config.cc (g_conf): set `bdbstore_cachesize' to 0 (== use
default BDB cache size).
(parse_config_options): handle `--bdbstore-cachesize'.
* config.h (md_config_t): add `bdbstore_cachesize.'
* osbdb/OSBDB.cc (binary_search): reimplemented, non-dumb like.
(OSBDB::opendb): new method.
(OSBDB::mount): handle db opening here.
(OSBDB::umount): close the DB, dispose of Db pointer.
(OSBDB::mkfs): handle db opening here.
(OSBDB::pick_object_revision_lt): implemented.
(OSBDB::stat): handle split between object data and object
"inode."
(OSBDB::remove): likewise; also remove attributes.
(OSBDB::truncate): likewise; don't worry about large objects
here.
(OSBDB::read): handle data/inode split; only use DB_DBT_PARTIAL if
we have to.
(OSBDB::write): likewise.
(OSBDB::clone): handle data/inode split; just read in the whole
object.
(OSBDB::create_collection): fix collections list move.
(OSBDB::collection_add): fix objects list move.
(OSBDB::_setattr): handle attribute key changes; fix attribute
names move.
(OSBDB::_getattr): new method.
(OSBDB::getattr): new method.
(OSBDB::getattrs): new method.
(OSBDB::rmattr): new method.
(OSBDB::listattr): fix attribute key types; fix reading
attributes.
(OSBDB::collection_setattr): implemented.
(OSBDB::collection_rmattr): implemented.
(OSBDB::collection_getattr): implemented.
(OSBDB::collection_listattr): implemented.
* osbdb/OSBDB.h: enforce POD types for DB keys; add << operator
for DB key types. Make an "inode" type for object metadata, so we
don't need to prepend an object length to the object data.
(OSBDB::env, OSBDB::mounted, OSBDB::opened): new fields.
(OSBDB::OSBDB): don't initialize `db' here.
(OSBDB::~OSBDB): handle db closure.

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1111 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/config.cc
trunk/ceph/config.h
trunk/ceph/osbdb/OSBDB.cc
trunk/ceph/osbdb/OSBDB.h

index 4f1b110c07cc0b4ff5c83031618cf29e7c32d2dc..9219d3b9d16bf15ecffec16b690f68d94a1ee929 100644 (file)
@@ -307,7 +307,8 @@ md_config_t g_conf = {
   bdbstore_btree: false,
   bdbstore_ffactor: 0,
   bdbstore_nelem: 0,
-  bdbstore_pagesize: 0
+  bdbstore_pagesize: 0,
+  bdbstore_cachesize: 0
 #endif // USE_OSBDB
 };
 
@@ -799,6 +800,9 @@ void parse_config_options(std::vector<char*>& args)
     else if (strcmp(args[i], "--bdbstore-hash-pagesize") == 0) {
       g_conf.bdbstore_pagesize = atoi(args[++i]);
     }
+    else if (strcmp(args[i], "--bdbstore-cachesize") == 0) {
+      g_conf.bdbstore_cachesize = atoi(args[++i]);
+    }
 #endif // USE_OSBDB
 
     else {
index 0fa35bdc2390b5ed2a64f5d336d91e0dcecf5875..b0edea33ffacb5d8f54e0e5cbb52d7b2603dd81d 100644 (file)
@@ -296,6 +296,7 @@ struct md_config_t {
   int bdbstore_ffactor;
   int bdbstore_nelem;
   int bdbstore_pagesize;
+  int bdbstore_cachesize;
 #endif // USE_OSBDB
 };
 
index a61cf7207cbc97fb2fba670dd0a05b5f609f3026..c4f4f5a71acbc68d390ada0f32d924eecd5e3151 100644 (file)
@@ -32,33 +32,81 @@ using namespace std;
 template<typename T>
 uint32_t binary_search (T *array, size_t size, T key)
 {
-  int c = size >> 1;
-  int i = c;
+  int low = 0;
+  int high = size;
+  int p = (low + high) / 2;
 
-  while (c != 0)
+  while (low < high - 1)
     {
-      c = c >> 1;
-      if (array[i] < key)
+      if (array[p] > key)
         {
-          i = i + c;
+          high = p;
         }
-      else if (array[i] > key)
+      else if (array[p] < key)
         {
-          i = i - c;
+          low = p;
         }
       else
-        return i;
+        return p;
+
+      p = (low + high) / 2;
     }
 
-  if (array[i] < key)
-    i++;
-  return i;
+  if (array[p] < key)
+    p++;
+  else if (array[p] > key && p > 0)
+    p--;
+  return p;
 } 
 
 \f // Management.
 
+int OSBDB::opendb(DBTYPE type, int flags)
+{
+  db = new Db(env, 0);
+  db->set_error_stream (&std::cerr);
+  db->set_message_stream (&std::cout);
+  db->set_flags (0);
+  if (!g_conf.bdbstore_btree)
+    {
+      if (g_conf.bdbstore_pagesize > 0)
+        db->set_pagesize (g_conf.bdbstore_pagesize);
+      if (g_conf.bdbstore_ffactor > 0 && g_conf.bdbstore_nelem > 0)
+        {
+          db->set_h_ffactor (g_conf.bdbstore_ffactor);
+          db->set_h_nelem (g_conf.bdbstore_nelem);
+        }
+    }
+  if (g_conf.bdbstore_cachesize > 0)
+    {
+      db->set_cachesize (0, g_conf.bdbstore_cachesize, 0);
+    }
+
+  int ret;
+  if ((ret = db->open (NULL, device.c_str(), NULL, type, flags, 0)) != 0)
+    {
+      derr(1) << "failed to open database: " << device << ": "
+              << strerror(ret) << std::endl;
+      return -EINVAL;
+    }
+  opened = true;
+  return 0;
+}
+
 int OSBDB::mount()
 {
+  dout(2) << "mount " << device << endl;
+
+  if (mounted)
+    return 0;
+
+  if (!opened)
+    {
+      int ret;
+      if ((ret = opendb ()) != 0)
+        return ret;
+    }
+
   // XXX Do we want anything else in the superblock?
 
   Dbt key (OSBDB_SUPERBLOCK_KEY, 1);
@@ -67,55 +115,105 @@ int OSBDB::mount()
   value.set_dlen (sizeof (super));
   value.set_ulen (sizeof (super));
   value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
-  if (db.get (NULL, &key, &value, 0) != 0)
+
+  if (db->get (NULL, &key, &value, 0) != 0)
     return -EINVAL; // XXX how to say "badly formed fs?"
+
+  dout(2) << ".mount " << super << endl;
+
   if (super.version != OSBDB_THIS_VERSION)
     return -EINVAL;
 
   DBTYPE t;
-  db.get_type (&t);
+  db->get_type (&t);
 
   if (t == DB_BTREE)
     {
       u_int32_t minkey;
-      db.get_bt_minkey (&minkey);
+      u_int32_t flags;
+      db->get_bt_minkey (&minkey);
+      db->get_flags (&flags);
       dout(1) << "mounted version " << OSBDB_THIS_VERSION << "; Btree; "
-              << "min keys per page: " << minkey << endl;
+              << "min keys per page: " << minkey << "; flags: "
+              << hex << flags << endl;
+      cout << dec;
     }
   else
     {
       u_int32_t ffactor;
       u_int32_t nelem;
-      db.get_h_ffactor (&ffactor);
-      db.get_h_nelem (&nelem);
+      u_int32_t flags;
+      db->get_h_ffactor (&ffactor);
+      db->get_h_nelem (&nelem);
+      db->get_flags (&flags);
       dout(1) << "mounted version " << OSBDB_THIS_VERSION << "; Hash; "
               << "fill factor: " << ffactor
-              << " table size: " << nelem << endl;
+              << " table size: " << nelem << "; flags: "
+              << hex << flags << endl;
+      cout << dec;
     }
 
+  mounted = true;
   return 0;
 }
 
 int OSBDB::umount()
 {
-  // Anything?
+  if (!mounted)
+    return -EINVAL;
+  sync();
+  int ret;
+  if (opened)
+    {
+      if ((ret = db->close (0)) != 0)
+        {
+          derr(1) << "close: " << db_strerror(ret) << endl;
+          return -EINVAL;
+        }
+      delete db;
+      db = NULL;
+    }
+  mounted = false;
+  opened = false;
   return 0;
 }
 
 int OSBDB::mkfs()
 {
+  if (mounted)
+    return -EINVAL;
+
+  dout(2) << "mkfs" << endl;
+
+  unlink (device.c_str());
+  int ret;
+  if ((ret = opendb((g_conf.bdbstore_btree ? DB_BTREE : DB_HASH), DB_CREATE)) != 0)
+    {
+      derr(1) << "failed to open database: " << device << ": "
+              << strerror(ret) << std::endl;
+      return -EINVAL;
+    }
+  opened = true;
+  dout(3) << "..opened " << device << endl;
+
   uint32_t c;
-  int ret = db.truncate (NULL, &c, 0);
+  ret = db->truncate (NULL, &c, 0);
   if (ret != 0)
-    return -EIO; // ???
+    {
+      return -EIO; // ???
+    }
 
   Dbt key (OSBDB_SUPERBLOCK_KEY, 1);
   struct stored_superblock sb;
   sb.version = OSBDB_THIS_VERSION;
   Dbt value (&sb, sizeof (sb));
 
-  if (db.put (NULL, &key, &value, 0) != 0)
-    return -EIO; // ???
+  dout(3) << "..writing superblock" << endl;
+  if (db->put (NULL, &key, &value, 0) != 0)
+    {
+      return -EIO; // ???
+    }
+  dout(3) << "..wrote superblock" << endl;
 
   return 0;
 }
@@ -124,133 +222,170 @@ int OSBDB::mkfs()
 
 int OSBDB::pick_object_revision_lt(object_t& oid)
 {
-  return -1; // FIXME
+  if (!mounted)
+    return -EINVAL;
+
+  // XXX this is pretty lame. Can we do better?
+  assert(oid.rev > 0);
+  oid.rev--;
+  while (oid.rev > 0)
+    {
+      if (exists (oid))
+        {
+          return 0;
+        }
+      oid.rev--;
+    }
+  return -EEXIST; // FIXME
 }
 
 bool OSBDB::exists(object_t oid)
 {
+  dout(2) << "exists " << oid << endl;
   struct stat st;
   return (stat (oid, &st) == 0);
 }
 
 int OSBDB::statfs (struct statfs *st)
 {
-  return 0;
+  return -ENOSYS;
 }
 
 int OSBDB::stat(object_t oid, struct stat *st)
 {
+  if (!mounted)
+    return -EINVAL;
+
+  dout(2) << "stat " << oid << endl;
+
+  object_inode_key ikey = new_object_inode_key(oid);
   stored_object obj;
-  Dbt key (&oid, sizeof (object_t));
+  Dbt key (&ikey, sizeof_object_inode_key());
   Dbt value (&obj, sizeof (obj));
-  value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
+  value.set_flags (DB_DBT_USERMEM);
   value.set_ulen (sizeof (obj));
 
-  if (db.get (NULL, &key, &value, 0) != 0)
-    return -ENOENT;
+  dout(3) << "  lookup " << ikey << endl;
+  int ret;
+  if ((ret = db->get (NULL, &key, &value, 0)) != 0)
+    {
+      derr(1) << " get returned " << ret << endl;
+      return -ENOENT;
+    }
 
   st->st_size = obj.length;
+  dout(3) << "stat length:" << obj.length << endl;
   return 0;
 }
 
 int OSBDB::remove(object_t oid, Context *onsafe)
 {
-  Dbt key (&oid, sizeof (object_t));
-  db.del (NULL, &key, 0);
+  if (!mounted)
+    return -EINVAL;
 
-  // FIXME remove attributes for this object.
+  dout(2) << "remove " << oid << endl;
+
+  oid_t id;
+  mkoid(id, oid);
+  Dbt key (&id, sizeof (oid_t));
+  db->del (NULL, &key, 0);
+  object_inode_key _ikey = new_object_inode_key (oid);
+  Dbt ikey (&_ikey, sizeof_object_inode_key());
+  db->del (NULL, &ikey, 0);
+
+  attrs_id aids = new_attrs_id (oid);
+  Dbt askey (&aids, sizeof_attrs_id());
+  Dbt asval;
+  asval.set_flags (DB_DBT_MALLOC);
+  if (db->get (NULL, &askey, &asval, 0) == 0)
+    {
+      // We have attributes; remove them.
+      stored_attrs *sap = (stored_attrs *) asval.get_data();
+      auto_ptr<stored_attrs> sa (sap);
+      for (unsigned i = 0; i < sap->count; i++)
+        {
+          attr_id aid = new_attr_id (oid, sap->names[i].name);
+          Dbt akey (&aid, sizeof (aid));
+          db->del (NULL, &akey, 0);
+        }
+      db->del (NULL, &askey, 0);
+    }
 
   return 0;
 }
 
 int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
 {
+  if (!mounted)
+    return -EINVAL;
+
+  dout(2) << "truncate " << size << endl;
+
   if (size > 0xFFFFFFFF)
     return -ENOSPC;
 
+  object_inode_key ikey = new_object_inode_key (oid);
   stored_object obj;
-  Dbt key (&oid, sizeof (oid));
+  Dbt key (&ikey, sizeof_object_inode_key());
   Dbt value (&obj, sizeof (obj));
   value.set_dlen (sizeof (obj));
   value.set_ulen (sizeof (obj));
-  value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
+  value.set_flags (DB_DBT_USERMEM);
 
-  if (db.get (NULL, &key, &value, 0) != 0)
-    {
-      return -ENOENT;
-    }
+  if (db->get (NULL, &key, &value, 0) != 0)
+    return -ENOENT;
 
   if (obj.length < size)
     {
+      oid_t id;
+      mkoid (id, oid);
+      Dbt okey (&id, sizeof (oid_t));
       char b[] = { '\0' };
       Dbt newVal (b, 1);
-      newVal.set_doff ((size_t) size + offsetof (stored_object, data) - 1);
+      newVal.set_doff ((size_t) size);
       newVal.set_dlen (1);
       newVal.set_ulen (1);
       newVal.set_flags (DB_DBT_PARTIAL);
-      if (db.put (NULL, &key, &newVal, 0) != 0)
-        {
-          return -EIO;
-        }
+      if (db->put (NULL, &okey, &newVal, 0) != 0)
+        return -EIO;
 
       obj.length = size;
-      value.set_doff (0);
-      value.set_dlen (sizeof (off_t));
-      value.set_ulen (sizeof (off_t));
-      value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
-      if (db.put (NULL, &key, &value, 0) != 0)
-        {
-          return -EIO;
-        }
+      value.set_ulen (sizeof (obj));
+      if (db->put (NULL, &key, &value, 0) != 0)
+        return -EIO;
     }
   else if (obj.length > size)
     {
+      obj.length = size;
+      Dbt tval (&obj, sizeof (obj));
+      tval.set_ulen (sizeof (obj));
+      tval.set_flags (DB_DBT_USERMEM);
+      if (db->put (NULL, &key, &tval, 0) != 0)
+        return -EIO;
       if (size == 0)
         {
-          obj.length = 0;
-          Dbt tval (&obj, sizeof (obj));
-          tval.set_ulen (sizeof (obj));
-          tval.set_flags (DB_DBT_USERMEM);
-          if (db.put (NULL, &key, &tval, 0) != 0)
-            {
-              return -EIO;
-            }
-        }
-      else if (size < 2 * 1024 * 1024) // XXX
-        {
-          size_t newSz = sizeof (stored_object) + (size_t) size;
-          auto_ptr<stored_object> obj2 ((stored_object *) malloc (newSz));
-          stored_object *objp = obj2.get();
-          Dbt tval (objp, newSz);
-          tval.set_doff (0);
-          tval.set_dlen (newSz);
-          tval.set_ulen (newSz);
-          tval.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
-          if (db.get (NULL, &key, &tval, 0) != 0)
-            {
-              return -EIO;
-            }
-
-          objp->length = size;
-          tval.set_ulen (newSz);
-          tval.set_size (newSz);
-          tval.set_flags (DB_DBT_USERMEM);
-          if (db.put (NULL, &key, &tval, 0) != 0)
-            {
-              return -EIO;
-            }
+          char x[1];
+          oid_t id;
+          mkoid (id, oid);
+          Dbt okey (&id, sizeof (oid_t));
+          Dbt oval (&x, 0);
+          if (db->put (NULL, &okey, &oval, 0) != 0)
+            return -EIO;
         }
       else
         {
-          // XXX FIXME
-          std::cerr << "WARNING: punting on truncating file to size "
-                    << size << std::endl;
-          obj.length = size;
-          value.set_doff (0);
-          value.set_dlen (sizeof (off_t));
-          value.set_ulen (sizeof (off_t));
-          value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
-          db.put (NULL, &key, &value, 0);
+          oid_t id;
+          mkoid (id, oid);
+          Dbt okey (&id, sizeof (oid_t));
+          Dbt oval;
+          oval.set_flags (DB_DBT_MALLOC);
+          if (db->get (NULL, &okey, &oval, 0) != 0)
+            return -EIO;
+          auto_ptr<char> ovalPtr ((char *) oval.get_data());
+          oval.set_size ((size_t) size);
+          oval.set_ulen ((size_t) size);
+          if (db->put (NULL, &okey, &oval, 0) != 0)
+            return -EIO;
         }
     }
 
@@ -259,69 +394,183 @@ int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
 
 int OSBDB::read(object_t oid, off_t offset, size_t len, bufferlist& bl)
 {
-  stored_object obj;
-  Dbt key (&oid, sizeof (oid));
-  Dbt value (&obj, sizeof (obj));
+  if (!mounted)
+    return -EINVAL;
 
-  value.set_ulen (sizeof (obj));
-  value.set_dlen (sizeof (obj));
-  value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
-  
-  if (db.get (NULL, &key, &value, 0) != 0)
+  dout(2) << "read " << oid << " " << offset << " "
+          << len << endl;
+
+  DbTxn *txn = NULL;
+  //env->txn_begin (NULL, &txn, 0);
+
+  object_inode_key _ikey = new_object_inode_key (oid);
+  stored_object obj;
+  Dbt ikey (&_ikey, sizeof_object_inode_key());
+  Dbt ival (&obj, sizeof (obj));
+  ival.set_flags (DB_DBT_USERMEM);
+  ival.set_ulen (sizeof(obj));
+
+  dout(3) << "  get " << _ikey << endl;
+  int ret;
+  if ((ret = db->get (txn, &ikey, &ival, 0)) != 0)
     {
+      //txn->abort();
+      derr(1) << "get returned " << db_strerror (ret) << endl;
       return -ENOENT;
     }
 
-  if (offset >= obj.length)
-    return 0;
-
-  char *buf = bl.c_str();
-  Dbt dval (buf, len);
-  dval.set_doff (offsetof (stored_object, data) + (size_t) offset);
-  dval.set_dlen (len);
-  dval.set_ulen (len);
-  dval.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
-  if (db.get (NULL, &key, &dval, 0) != 0)
+  if (offset == 0 && len >= obj.length)
     {
-      return -EIO;
+      len = obj.length;
+      dout(3) << "  doing full read of " << len << endl;
+      oid_t id;
+      mkoid (id, oid);
+      Dbt key (&id, sizeof (oid_t));
+      Dbt value (bl.c_str(), len);
+      value.set_ulen (len);
+      value.set_flags (DB_DBT_USERMEM);
+      dout(3) << "  getting " << oid << endl;
+      if ((ret = db->get (txn, &key, &value, 0)) != 0)
+        {
+          derr(1) << " get returned " << db_strerror (ret) << endl;
+          //txn->abort();
+          return -EIO;
+        }
+    }
+  else
+    {
+      if (offset > obj.length)
+        return 0;
+      if (offset + len > obj.length)
+        len = obj.length - (size_t) offset;
+      dout(3) << "  doing partial read of " << len << endl;
+      oid_t id;
+      mkoid (id, oid);
+      Dbt key (&id, sizeof (oid));
+      Dbt value (bl.c_str(), len);
+      value.set_doff ((size_t) offset);
+      value.set_dlen (len);
+      value.set_ulen (len);
+      value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
+      dout(3) << "  getting " << oid << endl;
+      if ((ret = db->get (NULL, &key, &value, 0)) != 0)
+        {
+          derr(1) << "get returned " << db_strerror (ret) << endl;
+          //txn->abort();
+          return -EIO;
+        }
     }
 
-  return dval.get_size();
+  //txn->commit (0);
+  return len;
 }
 
 int OSBDB::write(object_t oid, off_t offset, size_t len,
                  bufferlist& bl, Context *onsafe)
 {
-  stored_object obj;
-  Dbt key (&oid, sizeof (oid));
-  Dbt value (&obj, sizeof (obj));
+  if (!mounted)
+    return -EINVAL;
 
-  if (db.get (NULL, &key, &value, 0) != 0)
-    {
-      obj.length = 0;
-    }
+  dout(2) << "write " << oid << " " << offset << " "
+          << len << endl;
+
+  if (offset > 0xFFFFFFFFL || offset + len > 0xFFFFFFFFL)
+    return -ENOSPC;
+
+  DbTxn *txn = NULL;
+  //env->txn_begin (NULL, &txn, 0);
 
-  if (offset + len > obj.length)
+  object_inode_key _ikey = new_object_inode_key (oid);
+  stored_object obj;
+  Dbt ikey (&_ikey, sizeof_object_inode_key());
+  Dbt ival (&obj, sizeof (obj));
+  ival.set_ulen (sizeof (obj));
+  ival.set_flags (DB_DBT_USERMEM);
+
+  int ret;
+  dout(3) << "  getting " << _ikey << endl;
+  if (db->get (txn, &ikey, &ival, 0) != 0)
     {
+      dout(3) << "  writing new object" << endl;
+
+      // New object.
       obj.length = (size_t) offset + len;
-      value.set_dlen (offsetof (stored_object, data));
-      value.set_ulen (offsetof (stored_object, data));
-      value.set_flags (DB_DBT_PARTIAL);
-      if (db.put (NULL, &key, &value, 0) != 0)
+      dout(3) << "  mapping " << _ikey << " => "
+              << obj << endl;
+      if ((ret = db->put (txn, &ikey, &ival, 0)) != 0)
         {
+          derr(1) << "  put returned " << db_strerror (ret) << endl; 
           return -EIO;
         }
+
+      oid_t id;
+      mkoid (id, oid);
+      Dbt key (&id, sizeof (oid_t));
+      Dbt value (bl.c_str(), len);
+      if (offset == 0) // whole object
+        {
+          value.set_flags (DB_DBT_USERMEM);
+          value.set_ulen (len);
+        }
+      else
+        {
+          value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
+          value.set_ulen (len);
+          value.set_doff ((size_t) offset);
+          value.set_dlen (len);
+        }
+      dout(3) << "  mapping " << oid << " => ("
+              << obj.length << " bytes)" << endl;
+      if ((ret = db->put (txn, &key, &value, 0)) != 0)
+        {
+          derr(1) << "  put returned " << db_strerror (ret) << endl; 
+          return -EIO;
+        }
+      return len;
     }
 
-  char *buf = bl.c_str();
-  Dbt dval (buf, len);
-  dval.set_doff (offsetof (stored_object, data) + (size_t) offset);
-  dval.set_dlen (len);
-  dval.set_ulen (len);
-  dval.set_flags (DB_DBT_PARTIAL);
-  if (db.put (NULL, &key, &dval, 0) != 0)
+  if (offset == 0 && len >= obj.length)
     {
-      return -EIO;
+      if (len != obj.length)
+        {
+          obj.length = len;
+          if ((ret = db->put (txn, &ikey, &ival, 0)) != 0)
+            {
+              derr(1) << "  put returned " << db_strerror (ret) << endl; 
+              return -EIO;
+            }
+        }
+      oid_t id;
+      mkoid(id, oid);
+      Dbt key (&id, sizeof (oid_t));
+      Dbt value (bl.c_str(), len);
+      if (db->put (txn, &key, &value, 0) != 0)
+        {
+          return -EIO;
+        }
+    }
+  else
+    {
+      if (offset + len > obj.length)
+        {
+          obj.length = (size_t) offset + len;
+          if (db->put (NULL, &ikey, &ival, 0) != 0)
+            {
+              return -EIO;
+            }
+        }
+      oid_t id;
+      mkoid(id, oid);
+      Dbt key (&id, sizeof (oid_t));
+      Dbt value (bl.c_str(), len);
+      value.set_doff ((size_t) offset);
+      value.set_dlen (len);
+      value.set_ulen (len);
+      value.set_flags (DB_DBT_PARTIAL);
+      if (db->put (NULL, &key, &value, 0) != 0)
+        {
+          return -EIO;
+        }
     }
 
   return len;
@@ -329,41 +578,41 @@ int OSBDB::write(object_t oid, off_t offset, size_t len,
 
 int OSBDB::clone(object_t oid, object_t noid)
 {
-  struct stored_object obj;
+  if (!mounted)
+    return -EINVAL;
+
+  dout(2) << "clone " << oid << ", " << noid << endl;
 
   if (exists (noid))
     return -EEXIST;
 
-  Dbt key (&oid, sizeof (oid));
-  Dbt value (&obj, sizeof (obj));
-  Dbt newKey (&noid, sizeof (noid));
+  object_inode_key _ikey = new_object_inode_key (oid);
+  object_inode_key _nikey = new_object_inode_key (noid);
+  stored_object obj;
+  Dbt ikey (&_ikey, sizeof_object_inode_key());
+  Dbt ival (&obj, sizeof (obj));
+  Dbt nikey (&_nikey, sizeof_object_inode_key());
+  ival.set_ulen (sizeof (obj));
+  ival.set_flags (DB_DBT_USERMEM);
+
+  oid_t id, nid;
+  mkoid(id, oid);
+  mkoid(nid, noid);
+  Dbt key (&id, sizeof (oid_t));
+  Dbt nkey (&oid, sizeof (oid_t));
+  Dbt value;
+  value.set_flags (DB_DBT_MALLOC);
 
-  value.set_ulen (sizeof (obj));
-  value.set_dlen (sizeof (obj));
-  if (db.get (NULL, &key, &value, 0) != 0)
+  if (db->get (NULL, &ikey, &ival, 0) != 0)
     return -ENOENT;
+  if (db->get (NULL, &key, &value, 0) != 0)
+    return -ENOENT;
+  auto_ptr<char> valueptr ((char *) value.get_data());
 
-  db.put (NULL, &newKey, &value, 0);
-
-  auto_ptr<char> buf (new char[4096]);
-  for (size_t o = 0; o < obj.length; o += 4096)
-    {
-      Dbt block (buf.get(), 4096);
-      block.set_ulen (4096);
-      block.set_doff (offsetof (stored_object, data) + o);
-      block.set_dlen (4096);
-      block.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
-      if (db.get (NULL, &key, &block, 0) != 0)
-        {
-          return -EIO;
-        }
-
-      block.set_flags (DB_DBT_PARTIAL);
-      if (db.put (NULL, &newKey, &block, 0) != 0)
-        {
-          return -EIO;
-        }
-    }
+  if (db->put (NULL, &nikey, &ival, 0) != 0)
+    return -EIO;
+  if (db->put (NULL, &nkey, &value, 0) != 0)
+    return -EIO;
 
   return 0;
 }
@@ -372,11 +621,16 @@ int OSBDB::clone(object_t oid, object_t noid)
 
 int OSBDB::list_collections(list<coll_t>& ls)
 {
+  if (!mounted)
+    return -EINVAL;
+
+  dout(2) << "list_collections" << endl;
+
   Dbt key (COLLECTIONS_KEY, 1);
   Dbt value;
   value.set_flags (DB_DBT_MALLOC);
 
-  if (db.get (NULL, &key, &value, 0) != 0)
+  if (db->get (NULL, &key, &value, 0) != 0)
     return 0; // no collections.
 
   auto_ptr<stored_colls> sc ((stored_colls *) value.get_data());
@@ -389,6 +643,11 @@ int OSBDB::list_collections(list<coll_t>& ls)
 
 int OSBDB::create_collection(coll_t c, Context *onsafe)
 {
+  if (!mounted)
+    return -EINVAL;
+
+  dout(2) << "create_collection " << c << endl;
+
   Dbt key (COLLECTIONS_KEY, 1);
   Dbt value;
   value.set_flags (DB_DBT_MALLOC);
@@ -396,7 +655,7 @@ int OSBDB::create_collection(coll_t c, Context *onsafe)
   stored_colls *scp = NULL;
   size_t sz = 0;
   bool created = false;
-  if (db.get (NULL, &key, &value, 0) != 0)
+  if (db->get (NULL, &key, &value, 0) != 0)
     {
       sz = sizeof (stored_colls) + sizeof (coll_t);
       scp = (stored_colls *) malloc (sz);
@@ -416,23 +675,34 @@ int OSBDB::create_collection(coll_t c, Context *onsafe)
   if (scp->colls[ins] == c)
     return -EEXIST;
 
+  dout(3) << "..insertion point: " << ins << endl;
+
   // Make room for a new collection ID.
   if (!created)
   {
     sz += sizeof (coll_t);
+    dout(3) << "..increase size to " << sz << endl;
     stored_colls *scp2 = (stored_colls *) realloc (scp, sz);
+    sc.release ();
     sc.reset (scp2);
     scp = scp2;
   }
 
-  memmove (&scp->colls[ins], &scp->colls[ins + 1],
-           (scp->count - ins) * sizeof (coll_t));
+  int n = (scp->count - ins) * sizeof (coll_t);
+  if (n > 0)
+    {
+      dout(3) << "..moving " << n << " bytes up" << endl;
+      memmove (&scp->colls[ins + 1], &scp->colls[ins], n);
+    }
+  scp->count++;
   scp->colls[ins] = c;
 
+  dout(3) << "..collections: " << scp << endl;
+
   // Put the modified collection list back.
   {
     Dbt value2 (scp, sz);
-    if (db.put (NULL, &key, &value, 0) != 0)
+    if (db->put (NULL, &key, &value2, 0) != 0)
       {
         return -EIO;
       }
@@ -444,7 +714,7 @@ int OSBDB::create_collection(coll_t c, Context *onsafe)
     new_coll.count = 0;
     Dbt coll_key (&c, sizeof (coll_t));
     Dbt coll_value (&new_coll, sizeof (stored_coll));
-    if (db.put (NULL, &coll_key, &coll_value, 0) != 0)
+    if (db->put (NULL, &coll_key, &coll_value, 0) != 0)
       {
         return -EIO;
       }
@@ -455,11 +725,16 @@ int OSBDB::create_collection(coll_t c, Context *onsafe)
 
 int OSBDB::destroy_collection(coll_t c, Context *onsafe)
 {
+  if (!mounted)
+    return -EINVAL;
+
+  dout(2) << "destroy_collection " << c << endl;
+
   Dbt key (COLLECTIONS_KEY, 1);
   Dbt value;
   value.set_flags (DB_DBT_MALLOC);
 
-  if (db.get (NULL, &key, &value, 0) != 0)
+  if (db->get (NULL, &key, &value, 0) != 0)
     {
       return -ENOENT; // XXX
     }
@@ -486,14 +761,14 @@ int OSBDB::destroy_collection(coll_t c, Context *onsafe)
   // Modify the record size to be one less.
   Dbt nvalue (scp, value.get_size() - sizeof (coll_t));
   nvalue.set_flags (DB_DBT_USERMEM);
-  if (db.put (NULL, &key, &nvalue, 0) != 0)
+  if (db->put (NULL, &key, &nvalue, 0) != 0)
     {
       return -EIO;
     }
 
   // Delete the collection.
   Dbt collKey (&c, sizeof (coll_t));
-  if (db.del (NULL, &collKey, 0) != 0)
+  if (db->del (NULL, &collKey, 0) != 0)
     {
       return -EIO;
     }
@@ -503,11 +778,16 @@ int OSBDB::destroy_collection(coll_t c, Context *onsafe)
 
 bool OSBDB::collection_exists(coll_t c)
 {
+  if (!mounted)
+    return -EINVAL;
+
+  dout(2) << "collection_exists " << c << endl;
+
   Dbt key (COLLECTIONS_KEY, 1);
   Dbt value;
   value.set_flags (DB_DBT_MALLOC);
 
-  if (db.get (NULL, &key, &value, 0) != 0)
+  if (db->get (NULL, &key, &value, 0) != 0)
     return false;
 
   stored_colls *scp = (stored_colls *) value.get_data();
@@ -521,16 +801,25 @@ bool OSBDB::collection_exists(coll_t c)
 
 int OSBDB::collection_stat(coll_t c, struct stat *st)
 {
+  if (!mounted)
+    return -EINVAL;
+
+  dout(2) << "collection_stat " << c << endl;
   return -ENOSYS;
 }
 
 int OSBDB::collection_add(coll_t c, object_t o, Context *onsafe)
 {
+  if (!mounted)
+    return -EINVAL;
+
+  dout(2) << "collection_add " << c << " " << o << endl;
+
   Dbt key (&c, sizeof (coll_t));
   Dbt value;
   value.set_flags (DB_DBT_MALLOC);
 
-  if (db.get (NULL, &key, &value, 0) != 0)
+  if (db->get (NULL, &key, &value, 0) != 0)
     {
       return -ENOENT;
     }
@@ -554,17 +843,20 @@ int OSBDB::collection_add(coll_t c, object_t o, Context *onsafe)
   // Make room for the new value, and add it.
   sz += sizeof (object_t);
   scp = (stored_coll *) realloc (scp, sz);
+  sc.release();
   sc.reset (scp);
   if (ins < scp->count)
     {
-      size_t n = (scp->count - ins - 1) * sizeof (object_t);
+      size_t n = (scp->count - ins) * sizeof (object_t);
       memmove (&scp->objects[ins + 1], &scp->objects[ins], n);
     }
   scp->count++;
   scp->objects[ins] = o;
 
+  dout(3) << "..collection: " << scp << endl;
+
   Dbt nvalue (scp, sz);
-  if (db.put (NULL, &key, &nvalue, 0) != 0)
+  if (db->put (NULL, &key, &nvalue, 0) != 0)
     {
       return -EIO;
     }
@@ -574,11 +866,16 @@ int OSBDB::collection_add(coll_t c, object_t o, Context *onsafe)
 
 int OSBDB::collection_remove(coll_t c, object_t o, Context *onsafe)
 {
+  if (!mounted)
+    return -EINVAL;
+
+  dout(2) << "collection_remove " << c << " " << o << endl;
+
   Dbt key (&c, sizeof (coll_t));
   Dbt value;
   value.set_flags (DB_DBT_MALLOC);
 
-  if (db.get (NULL, &key, &value, 0) != 0)
+  if (db->get (NULL, &key, &value, 0) != 0)
     {
       return -ENOENT;
     }
@@ -603,8 +900,10 @@ int OSBDB::collection_remove(coll_t c, object_t o, Context *onsafe)
     }
   scp->count--;
 
+  dout(3) << "..collection " << scp << endl;
+
   Dbt nval (scp, value.get_size() - sizeof (object_t));
-  if (db.put (NULL, &key, &nval, 0) != 0)
+  if (db->put (NULL, &key, &nval, 0) != 0)
     {
       return -EIO;
     }
@@ -614,9 +913,12 @@ int OSBDB::collection_remove(coll_t c, object_t o, Context *onsafe)
 
 int OSBDB::collection_list(coll_t c, list<object_t>& o)
 {
+  if (!mounted)
+    return -EINVAL;
+
   Dbt key (&c, sizeof (coll_t));
   Dbt value;
-  if (db.get (NULL, &key, &value, 0) != 0)
+  if (db->get (NULL, &key, &value, 0) != 0)
     return -ENOENT;
 
   stored_coll *scp = (stored_coll *) value.get_data();
@@ -632,27 +934,33 @@ int OSBDB::collection_list(coll_t c, list<object_t>& o)
 int OSBDB::_setattr(object_t oid, const char *name,
                     const void *value, size_t size, Context *onsafe)
 {
+  if (!mounted)
+    return -EINVAL;
+
   if (strlen (name) >= OSBDB_MAX_ATTR_LEN)
     return -ENAMETOOLONG;
 
   // Add name to attribute list, if needed.
-  attrs_id aids;
-  aids.oid = oid;
-  Dbt attrs_key (&aids, sizeof (aids));
+  attrs_id aids = new_attrs_id (oid);
+  Dbt attrs_key (&aids, sizeof_attrs_id());
   Dbt attrs_val;
   attrs_val.set_flags (DB_DBT_MALLOC);
   stored_attrs *sap = NULL;
   size_t sz = 0;
-  if (db.get (NULL, &attrs_key, &attrs_val, 0) != 0)
+
+  dout(3) << "  getting " << aids << endl;
+  if (db->get (NULL, &attrs_key, &attrs_val, 0) != 0)
     {
+      dout(2) << "  first attribute" << endl;
       sz = sizeof (stored_attrs);
-      sap = new stored_attrs;
+      sap = (stored_attrs *) malloc(sz);
       sap->count = 0;
     }
   else
     {
       sz = attrs_val.get_size();
       sap = (stored_attrs *) attrs_val.get_data();
+      dout(2) << "  add to list of " << sap->count << " attrs" << endl;
     }
   auto_ptr<stored_attrs> sa (sap);
 
@@ -662,33 +970,48 @@ int OSBDB::_setattr(object_t oid, const char *name,
   int ins = 0;
   if (sap->count > 0)
     ins = binary_search<attr_name> (sap->names, sap->count, _name);
-  if (sap->count == 0 || sap->names[ins] != _name)
+  dout(3) << "  insertion point is " << ins << endl;
+  if (sap->count == 0 || strcmp (sap->names[ins].name, name) != 0)
     {
       sz += sizeof (attr_name);
+      dout(3) << "  realloc 0x" << hex << ((void *) sap) << " to "
+              << dec << sz << endl;
       sap = (stored_attrs *) realloc (sap, sz);
+      dout(3) << "  returns 0x" << hex << ((void *) sap) << endl;
+      sa.release ();
       sa.reset (sap);
-      size_t n = (sap->count - ins - 1) * sizeof (attr_name);
+      int n = (sap->count - ins) * sizeof (attr_name);
       if (n > 0)
         {
+          dout(3) << "  move " << n  << " bytes from 0x"
+                  << hex << (&sap->names[ins]) << " to 0x"
+                  << hex << (&sap->names[ins+1]) << endl;
           memmove (&sap->names[ins+1], &sap->names[ins], n);
         }
-      sap->names[ins] = _name;
+      memset (&sap->names[ins], 0, sizeof (attr_name));
+      strncpy (sap->names[ins].name, name, OSBDB_MAX_ATTR_LEN);
       sap->count++;
 
       Dbt newAttrs_val (sap, sz);
       newAttrs_val.set_ulen (sz);
       newAttrs_val.set_flags (DB_DBT_USERMEM);
-      if (db.put (NULL, &attrs_key, &newAttrs_val, 0) != 0)
+      dout(3) << "  putting " << aids << endl;
+      if (db->put (NULL, &attrs_key, &newAttrs_val, 0) != 0)
         return -EIO;
     }
+  else
+    {
+      dout(3) << "  attribute " << name << " already exists" << endl;
+    }
+
+  dout(3) << "  attributes list: " << sap << endl;
 
   // Add the attribute.
-  attr_id aid;
-  aid.oid = oid;
-  strncpy (aid.name.name, name, OSBDB_MAX_ATTR_LEN);
+  attr_id aid = new_attr_id (oid, name);
   Dbt attr_key (&aid, sizeof (aid));
   Dbt attr_val ((void *) value, size);
-  if (db.put (NULL, &attr_key, &attr_val, 0) != 0)
+  dout(3) << "  writing attribute key " << aid << endl;
+  if (db->put (NULL, &attr_key, &attr_val, 0) != 0)
     return -EIO;
 
   return 0;
@@ -698,6 +1021,11 @@ int OSBDB::setattr(object_t oid, const char *name,
                    const void *value, size_t size,
                    Context *onsafe)
 {
+  if (!mounted)
+    return -EINVAL;
+
+  dout(2) << "setattr " << oid << ":" << name << " => ("
+          << size << " bytes)" << endl;
   int ret = _setattr (oid, name, value, size, onsafe);
   return ret;
 }
@@ -705,6 +1033,9 @@ int OSBDB::setattr(object_t oid, const char *name,
 int OSBDB::setattrs(object_t oid, map<string,bufferptr>& aset,
                     Context *onsafe)
 {
+  if (!mounted)
+    return -EINVAL;
+
   map<string,bufferptr>::iterator it;
   for (it = aset.begin(); it != aset.end(); it++)
     {
@@ -720,16 +1051,118 @@ int OSBDB::setattrs(object_t oid, map<string,bufferptr>& aset,
   return 0;
 }
 
+int OSBDB::_getattr (object_t oid, const char *name, void *value, size_t size)
+{
+  if (!mounted)
+    return -EINVAL;
+
+  attr_id aid = new_attr_id (oid, name);
+  Dbt key (&aid, sizeof (aid));
+  Dbt val (value, size);
+  val.set_ulen (size);
+  val.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
+
+  if (db->get (NULL, &key, &val, 0) != 0)
+    {
+      return -ENOENT;
+    }
+
+  return val.get_size();
+}
+
+int OSBDB::getattr(object_t oid, const char *name, void *value, size_t size)
+{
+  if (!mounted)
+    return -EINVAL;
+
+  return _getattr (oid, name, value, size);
+}
+
+int OSBDB::getattrs(object_t oid, map<string,bufferptr>& aset)
+{
+  if (!mounted)
+    return -EINVAL;
+
+  int count = 0;
+  for (map<string,bufferptr>::iterator it = aset.begin();
+       it != aset.end(); it++)
+    {
+      int ret = _getattr (oid, (*it).first.c_str(),
+                          (*it).second.c_str(),
+                          (*it).second.length());
+      if (ret < 0)
+        return ret;
+      count += ret;
+    }
+  return count;
+}
+
+int OSBDB::rmattr(object_t oid, const char *name, Context *onsafe)
+{
+  if (!mounted)
+    return -EINVAL;
+  attrs_id aids = new_attrs_id (oid);
+  Dbt askey (&aids, sizeof_attrs_id());
+  Dbt asvalue;
+  asvalue.set_flags (DB_DBT_MALLOC);
+
+  if (db->get (NULL, &askey, &asvalue, 0) != 0)
+    return -ENOENT;
+
+  stored_attrs *sap = (stored_attrs *) asvalue.get_data();
+  auto_ptr<stored_attrs> sa (sap);
+
+  if (sap->count == 0)
+    return -ENOENT;
+
+  attr_name _name;
+  memset(&name, 0, sizeof (_name));
+  strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
+  int ins = binary_search<attr_name> (sap->names, sap->count, _name);
+  if (strcmp (sap->names[ins].name, name) != 0)
+    return -ENOENT;
+
+  // Shift the later elements down by one, if needed.
+  int n = (sap->count - ins) * sizeof (attr_name);
+  if (n > 0)
+    memmove (&(sap->names[ins]), &(sap->names[ins + 1]), n);
+  sap->count--;
+  asvalue.set_size(asvalue.get_size() - sizeof (attr_name));
+  int ret;
+  if ((ret = db->put (NULL, &askey, &asvalue, 0)) != 0)
+    {
+      derr(1) << "put stored_attrs " << db_strerror (ret) << endl;
+      return -EIO;
+    }
+
+  // Remove the attribute.
+  attr_id aid = new_attr_id (oid, name);
+  Dbt key (&aid, sizeof (aid));
+  if ((ret = db->del (NULL, &key, 0)) != 0)
+    derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+
+  return 0;
+}
+
 int OSBDB::listattr(object_t oid, char *attrs, size_t size)
 {
-  attrs_id aids;
-  aids.oid = oid;
-  Dbt key (&aids, sizeof (aids));
+  if (!mounted)
+    return -EINVAL;
+
+  dout(2) << "listattr " << oid << endl;
+
+  attrs_id aids = new_attrs_id (oid);
+  Dbt key (&aids, sizeof_attrs_id());
   Dbt value;
   value.set_flags (DB_DBT_MALLOC);
 
-  if (!db.get (NULL, &key, &value, 0) != 0)
-    return -ENOENT;
+  int ret;
+  if ((ret = db->get (NULL, &key, &value, 0)) != 0)
+    {
+      derr(1) << "fetching " << aids << ": " << db_strerror (ret)
+              << endl;
+      return -ENOENT;
+    }
 
   stored_attrs *attrsp = (stored_attrs *) value.get_data();
   auto_ptr<stored_attrs> _attrs (attrsp);
@@ -738,9 +1171,10 @@ int OSBDB::listattr(object_t oid, char *attrs, size_t size)
   for (unsigned i = 0; i < attrsp->count && s < size; i++)
     {
       int n = MIN (OSBDB_MAX_ATTR_LEN,
-                   MIN (strlen (attrsp->names[i].name), size - s));
+                   MIN (strlen (attrsp->names[i].name), size - s - 1));
       strncpy (p, attrsp->names[i].name, n);
-      p = p + n;
+      p[n] = '\0';
+      p = p + n + 1;
     }
   return 0;
 }
@@ -751,35 +1185,211 @@ int OSBDB::collection_setattr(coll_t cid, const char *name,
                               const void *value, size_t size,
                               Context *onsafe)
 {
-  return -ENOSYS;
+  if (!mounted)
+    return -EINVAL;
+
+  dout(2) << "collection_setattr" << cid << " " << name
+          << " (" << size << " bytes)" << endl;
+  if (strlen (name) >= OSBDB_MAX_ATTR_LEN)
+    return -ENAMETOOLONG;
+
+  // Add name to attribute list, if needed.
+  coll_attrs_id aids = new_coll_attrs_id (cid);
+  Dbt attrs_key (&aids, sizeof_coll_attrs_id());
+  Dbt attrs_val;
+  attrs_val.set_flags (DB_DBT_MALLOC);
+  stored_attrs *sap = NULL;
+  size_t sz = 0;
+
+  dout(3) << "  getting " << aids << endl;
+  if (db->get (NULL, &attrs_key, &attrs_val, 0) != 0)
+    {
+      dout(2) << "  first attribute" << endl;
+      sz = sizeof (stored_attrs);
+      sap = (stored_attrs *) malloc(sz);
+      sap->count = 0;
+    }
+  else
+    {
+      sz = attrs_val.get_size();
+      sap = (stored_attrs *) attrs_val.get_data();
+      dout(2) << "  add to list of " << sap->count << " attrs" << endl;
+    }
+  auto_ptr<stored_attrs> sa (sap);
+
+  attr_name _name;
+  strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
+
+  int ins = 0;
+  if (sap->count > 0)
+    ins = binary_search<attr_name> (sap->names, sap->count, _name);
+  dout(3) << "  insertion point is " << ins << endl;
+  if (sap->count == 0 || strcmp (sap->names[ins].name, name) != 0)
+    {
+      sz += sizeof (attr_name);
+      dout(3) << "  realloc 0x" << hex << ((void *) sap) << " to "
+              << dec << sz << endl;
+      sap = (stored_attrs *) realloc (sap, sz);
+      dout(3) << "  returns 0x" << hex << ((void *) sap) << endl;
+      sa.release ();
+      sa.reset (sap);
+      int n = (sap->count - ins) * sizeof (attr_name);
+      if (n > 0)
+        {
+          dout(3) << "  move " << n  << " bytes from 0x"
+                  << hex << (&sap->names[ins]) << " to 0x"
+                  << hex << (&sap->names[ins+1]) << endl;
+          memmove (&sap->names[ins+1], &sap->names[ins], n);
+        }
+      memset (&sap->names[ins], 0, sizeof (attr_name));
+      strncpy (sap->names[ins].name, name, OSBDB_MAX_ATTR_LEN);
+      sap->count++;
+
+      Dbt newAttrs_val (sap, sz);
+      newAttrs_val.set_ulen (sz);
+      newAttrs_val.set_flags (DB_DBT_USERMEM);
+      dout(3) << "  putting " << aids << endl;
+      if (db->put (NULL, &attrs_key, &newAttrs_val, 0) != 0)
+        return -EIO;
+    }
+  else
+    {
+      dout(3) << "  attribute " << name << " already exists" << endl;
+    }
+
+  dout(3) << "  attributes list: " << sap << endl;
+
+  // Add the attribute.
+  coll_attr_id aid = new_coll_attr_id (cid, name);
+  Dbt attr_key (&aid, sizeof (aid));
+  Dbt attr_val ((void *) value, size);
+  dout(3) << "  writing attribute key " << aid << endl;
+  if (db->put (NULL, &attr_key, &attr_val, 0) != 0)
+    return -EIO;
+
+  return 0;
 }
 
 int OSBDB::collection_rmattr(coll_t cid, const char *name,
                              Context *onsafe)
 {
-  return -ENOSYS;
+  if (!mounted)
+    return -EINVAL;
+
+  coll_attrs_id aids = new_coll_attrs_id (cid);
+  Dbt askey (&aids, sizeof_coll_attrs_id());
+  Dbt asvalue;
+  asvalue.set_flags (DB_DBT_MALLOC);
+
+  if (db->get (NULL, &askey, &asvalue, 0) != 0)
+    return -ENOENT;
+
+  stored_attrs *sap = (stored_attrs *) asvalue.get_data();
+  auto_ptr<stored_attrs> sa (sap);
+
+  if (sap->count == 0)
+    return -ENOENT;
+
+  attr_name _name;
+  memset(&name, 0, sizeof (_name));
+  strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
+  int ins = binary_search<attr_name> (sap->names, sap->count, _name);
+  if (strcmp (sap->names[ins].name, name) != 0)
+    return -ENOENT;
+
+  // Shift the later elements down by one, if needed.
+  int n = (sap->count - ins) * sizeof (attr_name);
+  if (n > 0)
+    memmove (&(sap->names[ins]), &(sap->names[ins + 1]), n);
+  sap->count--;
+  asvalue.set_size(asvalue.get_size() - sizeof (attr_name));
+  int ret;
+  if ((ret = db->put (NULL, &askey, &asvalue, 0)) != 0)
+    {
+      derr(1) << "put stored_attrs " << db_strerror (ret) << endl;
+      return -EIO;
+    }
+
+  // Remove the attribute.
+  coll_attr_id aid = new_coll_attr_id (cid, name);
+  Dbt key (&aid, sizeof (aid));
+  if ((ret = db->del (NULL, &key, 0)) != 0)
+    derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+
+  return 0;
 }
 
 int OSBDB::collection_getattr(coll_t cid, const char *name,
                               void *value, size_t size)
 {
-  return -ENOSYS;
+  if (!mounted)
+    return -EINVAL;
+
+  dout(2) << "collection_getattr " << cid << " " << name << endl;
+
+  coll_attr_id caid = new_coll_attr_id (cid, name);
+  Dbt key (&caid, sizeof (caid));
+  Dbt val (value, size);
+  val.set_ulen (size);
+  val.set_dlen (size);
+  val.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
+
+  if (db->get (NULL, &key, &val, 0) != 0)
+    return -ENOENT;
+
+  return val.get_size();
 }
 
 int OSBDB::collection_listattr(coll_t cid, char *attrs, size_t size)
 {
-  return -ENOSYS;
+  if (!mounted)
+    return -EINVAL;
+
+  dout(2) << "collection_listattr " << cid << endl;
+
+  coll_attrs_id caids = new_coll_attrs_id (cid);
+  Dbt key (&caids, sizeof_coll_attrs_id());
+  Dbt value;
+  value.set_flags (DB_DBT_MALLOC);
+
+  int ret;
+  if ((ret = db->get (NULL, &key, &value, 0)) != 0)
+    {
+      derr(1) << "fetching " << caids << ": " << db_strerror (ret)
+              << endl;
+      return -ENOENT;
+    }
+
+  stored_attrs *attrsp = (stored_attrs *) value.get_data();
+  auto_ptr<stored_attrs> _attrs (attrsp);
+  size_t s = 0;
+  char *p = attrs;
+  for (unsigned i = 0; i < attrsp->count && s < size; i++)
+    {
+      int n = MIN (OSBDB_MAX_ATTR_LEN,
+                   MIN (strlen (attrsp->names[i].name), size - s - 1));
+      strncpy (p, attrsp->names[i].name, n);
+      p[n] = '\0';
+      p = p + n + 1;
+    }
+  return 0;
 }
 
 \f // Sync.
 
 void OSBDB::sync (Context *onsync)
 {
+  if (!mounted)
+    return;
+
   sync();
   // huh?
 }
 
 void OSBDB::sync()
 {
-  db.sync(0);
+  if (!mounted)
+    return;
+
+  db->sync(0);
 }
index 2e76b85ef43a4c1282faa6d488b996942873fff2..9ba42d206d29057e5fc799f1f57a7f409f5df7eb 100644 (file)
@@ -41,36 +41,150 @@ struct stored_superblock
   uint32_t version;
 };
 
+inline ostream& operator<<(ostream& out, const stored_superblock sb)
+{
+  out << "osbdb.super(" << sb.version << ")" << endl;
+  return out;
+}
+
+/**
+ * An object identifier; we define this so we can have a POD object to
+ * work with.
+ */
+struct oid_t // POD
+{
+  char id[16];
+};
+
+inline void mkoid (oid_t& id, object_t& oid)
+{
+  // XXX byte order?
+  memcpy (id.id, &oid, sizeof (oid_t));
+}
+
+inline ostream& operator<<(ostream& out, const oid_t id)
+{
+  for (int i = 0; i < 16; i++)
+    {
+      out.fill('0');
+      out << setw(2) << hex << (id.id[i] & 0xFF);
+      if ((i & 3) == 3)
+        out << ':';
+    }
+  out.unsetf(ios::right);
+  out << dec;
+  return out;
+}
+
 /**
- * A stored object. These are mapped by the raw 128-bit object ID, and
- * include the length of the object.
+ * An "inode" key. We map a 'stored_object' struct to this key for
+ * every object.
+ */
+struct object_inode_key // POD
+{
+  oid_t oid;
+  char tag;
+};
+
+/**
+ * "Constructor" for an object_inode_key.
+ */
+inline object_inode_key new_object_inode_key (object_t& oid)
+{
+  object_inode_key key;
+  memset(&key, 0, sizeof (object_inode_key));
+  mkoid (key.oid, oid);
+  key.tag = 'i';
+  return key;
+}
+
+/*
+ * We use this, instead of sizeof(), to try and guarantee that we
+ * don't include the structure padding, if any.
+ *
+ * This *should* return 17: sizeof (oid_t) == 16; sizeof (char) == 1.
+ */
+inline size_t sizeof_object_inode_key()
+{
+  return offsetof(object_inode_key, tag) + sizeof (char);
+}
+
+             // Frank Poole: Unfortunately, that sounds a little
+             //              like famous last words.
+             //   -- 2001: A Space Odyssey
+
+inline ostream& operator<<(ostream& out, const object_inode_key o)
+{
+  out << o.tag << "/" << o.oid;
+  return out;
+}
+
+/**
+ * A stored object. This is essentially the "inode" of the object,
+ * containing things like the object's length. The object itself is
+ * stored as-is, mapped by the 128-bit object ID.
  */
 struct stored_object
 {
   uint32_t length;
-  char data[0];    // Actually variable-length
 };
 
+inline ostream& operator<<(ostream& out, const stored_object s)
+{
+  out << "inode(l:" << s.length << ")";
+  return out;
+}
+
 /*
  * Key referencing the list of attribute names for an object. This is
  * simply the object's ID, with an additional character 'a' appended.
  */
-struct attrs_id
+struct attrs_id // POD
 {
-  attrs_id() : pad('a') { }
-
-  object_t oid;
-  const char pad;
+  oid_t oid;
+  char tag;
 };
 
+/*
+ * "Construtor" for attrs_id.
+ */
+inline struct attrs_id new_attrs_id (object_t& oid)
+{
+  attrs_id aid;
+  memset (&aid, 0, sizeof (attrs_id));
+  mkoid(aid.oid, oid);
+  aid.tag = 'a';
+  return aid;
+}
+
+/*
+ * See explanation for sizeof_object_inode_id.
+ */
+inline size_t sizeof_attrs_id()
+{
+  return offsetof(struct attrs_id, tag) + sizeof (char);
+}
+
+inline ostream& operator<<(ostream& out, const attrs_id id)
+{
+  out << id.tag << "/" << id.oid;
+  return out;
+}
+
 /*
  * Encapsulation of a single attribute name.
  */
-struct attr_name
+struct attr_name // POD
 {
   char name[OSBDB_MAX_ATTR_LEN];
 };
 
+inline ostream& operator<<(ostream& out, const attr_name n)
+{
+  out << n.name;
+  return out;
+}
+
 inline bool operator<(const attr_name n1, const attr_name n2)
 {
   return (strncmp (n1.name, n2.name, OSBDB_MAX_ATTR_LEN) < 0);
@@ -83,6 +197,7 @@ inline bool operator>(const attr_name n1, const attr_name n2)
 
 inline bool operator==(const attr_name n1, const attr_name n2)
 {
+  std::cerr << n1.name << " == " << n2.name << "?" << endl;
   return (strncmp (n1.name, n2.name, OSBDB_MAX_ATTR_LEN) == 0);
 }
 
@@ -110,27 +225,95 @@ struct stored_attrs
   attr_name names[0];   // actually variable-length
 };
 
+inline ostream& operator<<(ostream& out, const stored_attrs *sa)
+{
+  out << sa->count << " [ ";
+  for (unsigned i = 0; i < sa->count; i++)
+    out << sa->names[i] << (i == sa->count - 1 ? " " : ", ");
+  out << "]";
+  return out;
+}
+
 /*
  * An object attribute key. An object attribute is mapped simply by
  * the object ID appended with the attribute name. Attribute names
  * may not be empty, and must be less than 256 characters, in this
  * implementation.
  */
-struct attr_id
+struct attr_id // POD
 {
-  object_t oid;
+  oid_t oid;
   attr_name name;
 };
 
+inline attr_id new_attr_id (object_t& oid, const char *name)
+{
+  attr_id aid;
+  memset(&aid, 0, sizeof (attr_id));
+  mkoid (aid.oid, oid);
+  strncpy (aid.name.name, name, OSBDB_MAX_ATTR_LEN);
+  return aid;
+}
+
+inline ostream& operator<<(ostream &out, const attr_id id)
+{
+  out << id.oid << ":" << id.name;
+  return out;
+}
+
+/*
+ * A key for a collection attributes list.
+ */
+struct coll_attrs_id // POD
+{
+  coll_t cid;
+  char tag;
+};
+
+inline coll_attrs_id new_coll_attrs_id (coll_t cid)
+{
+  coll_attrs_id catts;
+  memset(&catts, 0, sizeof (coll_attrs_id));
+  catts.cid = cid;
+  catts.tag = 'C';
+  return catts;
+}
+
+inline size_t sizeof_coll_attrs_id()
+{
+  return offsetof(coll_attrs_id, tag) + sizeof (char);
+}
+
+inline ostream& operator<<(ostream& out, coll_attrs_id id)
+{
+  out << id.tag << "/" << id.cid;
+  return out;
+}
+
 /*
  * A collection attribute key. Similar to 
  */
-struct coll_attr_id
+struct coll_attr_id // POD
 {
   coll_t cid;
   attr_name name;
 };
 
+inline coll_attr_id new_coll_attr_id (coll_t cid, const char *name)
+{
+  coll_attr_id catt;
+  memset(&catt, 0, sizeof (coll_attr_id));
+  catt.cid = cid;
+  strncpy (catt.name.name, name, OSBDB_MAX_ATTR_LEN);
+  return catt;
+}
+
+inline ostream& operator<<(ostream& out, coll_attr_id id)
+{
+  out << id.cid << ":" << id.name;
+  return out;
+}
+
 /*
  * This is the key we store the master collections list under.
  */
@@ -151,6 +334,19 @@ struct stored_colls
   coll_t colls[0]; // actually variable-length
 };
 
+inline ostream& operator<<(ostream& out, stored_colls *c)
+{
+  out << c->count << " [ ";
+  for (unsigned i = 0; i < c->count; i++)
+    {
+      out << hex << c->colls[i];
+      if (i < c->count - 1)
+        out << ", ";
+    }
+  out << " ]" << dec;
+  return out;
+}
+
 /*
  * A stored collection (a bag of object IDs). These are referenced by
  * the bare collection identifier type, a coll_t (thus, a 32-bit
@@ -169,41 +365,81 @@ struct stored_coll
   object_t objects[0]; // actually variable-length
 };
 
+inline ostream& operator<<(ostream& out, stored_coll *c)
+{
+  out << c->count << " [ ";
+  for (unsigned i = 0; i < c->count; i++)
+    {
+      out << c->objects[i];
+      if (i < c->count - 1)
+        out << ", ";
+    }
+  out << " ]";
+  return out;
+}
+
 /*
  * The object store interface for Berkeley DB.
  */
 class OSBDB : public ObjectStore
 {
  private:
-  //  DbEnv env;
-  Db db;
+  DbEnv *env;
+  Db *db;
   string device;
+  bool mounted;
+  bool opened;
 
  public:
 
-  OSBDB(const char *dev) : db (NULL, 0), device (dev)
+  OSBDB(const char *dev)
+    : env(0), db (0), device (dev), mounted(false), opened(false)
   {
-    if (!g_conf.bdbstore_btree)
+    /*env = new DbEnv (DB_CXX_NO_EXCEPTIONS);
+    env->set_error_stream (&std::cerr);
+    // WTF? You can't open an env if you set this flag here, but BDB
+    // says you also can't set it after you open the env.
+    //env->set_flags (DB_LOG_INMEMORY, 1);
+    char *p = strrchr (dev, '/');
+    int env_flags = (DB_CREATE | DB_THREAD | DB_INIT_LOCK
+                     | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOG);
+    if (p != NULL)
       {
-        if (g_conf.bdbstore_pagesize > 0)
-          db.set_pagesize (g_conf.bdbstore_pagesize);
-        if (g_conf.bdbstore_ffactor > 0 && g_conf.bdbstore_nelem > 0)
+        *p = '\0';
+        if (env->open (dev, env_flags, 0) != 0)
           {
-            db.set_h_ffactor (g_conf.bdbstore_ffactor);
-            db.set_h_nelem (g_conf.bdbstore_nelem);
+            std::cerr << "failed to open environment: "
+                      << dev << std::endl;
+            ::abort();
           }
+        *p = '/';
+        dev = p+1;
       }
-    if (db.open (NULL, dev, "OSD", (g_conf.bdbstore_btree ? DB_BTREE : DB_HASH),
-                 DB_CREATE, 0) != 0)
+    else
       {
-        std::cerr << "failed to open database" << std::endl;
-        ::abort();
+        if (env->open (NULL, env_flags, 0) != 0)
+          {
+            std::cerr << "failed to open environment: ." << std::endl;
+            ::abort();
+          }
       }
+
+    // Double WTF: if you remove the DB_LOG_INMEMORY bit, db->open
+    // fails, inexplicably, with EINVAL!*/
+    //    env->set_flags (DB_DIRECT_DB | /*DB_AUTO_COMMIT |*/ DB_LOG_INMEMORY, 1);
   }
 
   ~OSBDB()
   {
-    db.close (0);
+    if (mounted)
+      {
+        umount();
+      }
+    if (env != NULL)
+      {
+        env->close (0);
+        delete env;
+      }
   }
 
   int mount();
@@ -230,9 +466,14 @@ class OSBDB : public ObjectStore
               const void *value, size_t size, Context *onsafe=0);
   int setattrs(object_t oid, map<string,bufferptr>& aset,
                Context *onsafe=0);
-  int clone(object_t oid, object_t noid);
-
+  int getattr(object_t oid, const char *name,
+              void *value, size_t size);
+  int getattrs(object_t oid, map<string,bufferptr>& aset);
+  int rmattr(object_t oid, const char *name,
+             Context *onsafe=0);
   int listattr(object_t oid, char *attrs, size_t size);
+
+  int clone(object_t oid, object_t noid);
   
   // Collections.
   
@@ -258,6 +499,9 @@ class OSBDB : public ObjectStore
   void sync();
 
 private:
+  int opendb (DBTYPE type=DB_UNKNOWN, int flags=0);
+
   int _setattr(object_t oid, const char *name, const void *value,
                size_t size, Context *onsync);
+  int _getattr(object_t oid, const char *name, void *value, size_t size);
 };