]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
2007-03-01 Casey Marshall <csm@soe.ucsc.edu>
authorrsdio <rsdio@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 2 Mar 2007 04:07:56 +0000 (04:07 +0000)
committerrsdio <rsdio@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 2 Mar 2007 04:07:56 +0000 (04:07 +0000)
* config.cc (g_conf): initialize bdbstore_transactional.
(parse_config_options): accept `--bdbstore-transactional' and
`--debug-bdbstore'.
* config.h (struct md_config_t): add `bdbstore_transactional.'
* doc/bdb.txt: new file.
* osbdb/OSBDB.cc (statfs): implemented.
Added optional transactions to most methods.
* osbdb/OSBDB.h: updated for transactions.
* test/testosbdb.cc: new file.
* test/testos.cc: updated.

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1156 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/config.cc
trunk/ceph/config.h
trunk/ceph/doc/bdb.txt [new file with mode: 0644]
trunk/ceph/osbdb/OSBDB.cc
trunk/ceph/osbdb/OSBDB.h
trunk/ceph/test/testos.cc
trunk/ceph/test/testosbdb.cc [new file with mode: 0644]

index 28bfee898f3985476da1a9606acc6cad3ef787de..207bf2e8236543a21f7082f18fd9f0ed2e5702d5 100644 (file)
@@ -309,7 +309,8 @@ md_config_t g_conf = {
   bdbstore_ffactor: 0,
   bdbstore_nelem: 0,
   bdbstore_pagesize: 0,
-  bdbstore_cachesize: 0
+  bdbstore_cachesize: 0,
+  bdbstore_transactional: false
 #endif // USE_OSBDB
 };
 
@@ -813,6 +814,12 @@ void parse_config_options(std::vector<char*>& args)
     else if (strcmp(args[i], "--bdbstore-cachesize") == 0) {
       g_conf.bdbstore_cachesize = atoi(args[++i]);
     }
+    else if (strcmp(args[i], "--bdbstore-transactional") == 0) {
+      g_conf.bdbstore_transactional = true;
+    }
+    else if (strcmp(args[i], "--debug-bdbstore") == 0) {
+      g_conf.debug_bdbstore = atoi(args[++i]);
+    }
 #endif // USE_OSBDB
 
     else {
index cf0b4540b07c88a5c1171f120e1622a8292e7ac4..a39f280d77f9adad8a4247aee6e16fccf0971767 100644 (file)
@@ -298,6 +298,7 @@ struct md_config_t {
   int bdbstore_nelem;
   int bdbstore_pagesize;
   int bdbstore_cachesize;
+  bool bdbstore_transactional;
 #endif // USE_OSBDB
 };
 
diff --git a/trunk/ceph/doc/bdb.txt b/trunk/ceph/doc/bdb.txt
new file mode 100644 (file)
index 0000000..63e647f
--- /dev/null
@@ -0,0 +1,48 @@
+OBJECT STORE ON BERKELEY DB
+---------------------------
+
+OSBDB is an implementation of an object store that uses Berkeley DB as
+the underlying storage. It is meant to be an alternative to EBOFS.
+
+BUILDING
+--------
+
+You will need to have Berkeley DB installed, including the developent
+packages. We've tested this with Berkeley DB 4.4.20 on Ubuntu 6.10.
+
+To compile OSBDB support, you need to pass the argument "want_bdb=yes"
+to "make." If you don't specify this, OSBDB and all its associated
+support is not included in the executables.
+
+RUNNING
+-------
+
+To use OSBDB in Ceph, simply pass the --bdbstore flag to programs. You
+don't need to create a "device" for OSBDB ahead of time; Berkeley DB
+will take care of creating the files. You also *cannot* use a raw
+device as your store -- it must be regular file.
+
+OSBDB additionally accepts the following flags:
+
+  --bdbstore-btree         Configures OSBDB to use the "Btree"
+                           database type for Berkeley DB. The default
+                           database type is "Hash".
+
+  --bdbstore-hash-ffactor  Sets the "fill factor" for the hash
+                           database type. Takes an integer argument.
+
+  --bdbstore-hash-nelem    Sets the "nelem" parameter for the hash
+                           database type. Takes an integer argument.
+
+  --bdbstore-hash-pagesize Sets the page size for the hash database
+                           type. Takes an integer argument.
+
+  --bdbstore-cachesize     Sets the cache size. Takes an integer
+                           argument, which must be a power of two, and
+                           no less than 20 KiB.
+
+  --bdbstore-transactional Enable (in-memory-only) transactions for
+                           all operations in the OSBDB store.
+
+  --debug-bdbstore         Set the debug level. Takes an integer
+                           argument.
index c4f4f5a71acbc68d390ada0f32d924eecd5e3151..45d5423561a83f51e90e7f04e69038a4630113c4 100644 (file)
@@ -61,8 +61,33 @@ uint32_t binary_search (T *array, size_t size, T key)
 
 \f // Management.
 
-int OSBDB::opendb(DBTYPE type, int flags)
+int OSBDB::opendb(DBTYPE type, int flags, bool new_env)
 {
+  // BDB transactions require an environment.
+  if (g_conf.bdbstore_transactional)
+    {
+      env = new DbEnv (DB_CXX_NO_EXCEPTIONS);
+      env->set_error_stream (&std::cerr);
+      env->set_message_stream (&std::cout);
+      env->set_flags (DB_LOG_INMEMORY, 1);
+      //env->set_flags (DB_DIRECT_DB, 1);
+      int env_flags = (DB_CREATE
+                       | DB_THREAD
+                       | DB_INIT_LOCK
+                       | DB_INIT_MPOOL
+                       | DB_INIT_TXN
+                       | DB_INIT_LOG
+                       | DB_PRIVATE);
+      //if (new_env)
+      //  env->remove (env_dir.c_str(), 0);
+      if (env->open (NULL, env_flags, 0) != 0)
+        {
+          std::cerr << "failed to open environment " << std::endl;
+          return -EIO;
+        }
+
+    }
+
   db = new Db(env, 0);
   db->set_error_stream (&std::cerr);
   db->set_message_stream (&std::cout);
@@ -82,6 +107,10 @@ int OSBDB::opendb(DBTYPE type, int flags)
       db->set_cachesize (0, g_conf.bdbstore_cachesize, 0);
     }
 
+  flags = flags | DB_THREAD;
+  if (transactional)
+    flags = flags | DB_AUTO_COMMIT;
+
   int ret;
   if ((ret = db->open (NULL, device.c_str(), NULL, type, flags, 0)) != 0)
     {
@@ -161,10 +190,23 @@ int OSBDB::umount()
 {
   if (!mounted)
     return -EINVAL;
-  sync();
+
+  dout(2) << "umount" << endl;
+
   int ret;
   if (opened)
     {
+      if (transactional)
+        {
+          env->log_flush (NULL);
+          if ((ret = env->lsn_reset (device.c_str(), 0)) != 0)
+            {
+              derr(1) << "lsn_reset: " << db_strerror (ret) << endl;
+            }
+        }
+
+      db->sync (0);
+
       if ((ret = db->close (0)) != 0)
         {
           derr(1) << "close: " << db_strerror(ret) << endl;
@@ -172,6 +214,13 @@ int OSBDB::umount()
         }
       delete db;
       db = NULL;
+
+      if (env)
+        {
+          env->close (0);
+          delete env;
+          env = NULL;
+        }
     }
   mounted = false;
   opened = false;
@@ -186,11 +235,13 @@ int OSBDB::mkfs()
   dout(2) << "mkfs" << endl;
 
   unlink (device.c_str());
+
   int ret;
-  if ((ret = opendb((g_conf.bdbstore_btree ? DB_BTREE : DB_HASH), DB_CREATE)) != 0)
+  if ((ret = opendb((g_conf.bdbstore_btree ? DB_BTREE : DB_HASH),
+                    DB_CREATE, true)) != 0)
     {
       derr(1) << "failed to open database: " << device << ": "
-              << strerror(ret) << std::endl;
+              << db_strerror(ret) << std::endl;
       return -EINVAL;
     }
   opened = true;
@@ -200,6 +251,7 @@ int OSBDB::mkfs()
   ret = db->truncate (NULL, &c, 0);
   if (ret != 0)
     {
+      derr(1) << "db truncate failed: " << db_strerror (ret) << endl;
       return -EIO; // ???
     }
 
@@ -209,9 +261,11 @@ int OSBDB::mkfs()
   Dbt value (&sb, sizeof (sb));
 
   dout(3) << "..writing superblock" << endl;
-  if (db->put (NULL, &key, &value, 0) != 0)
+  if ((ret = db->put (NULL, &key, &value, 0)) != 0)
     {
-      return -EIO; // ???
+      derr(1) << "failed to write superblock: " << db_strerror (ret)
+              << endl;
+      return -EIO;
     }
   dout(3) << "..wrote superblock" << endl;
 
@@ -222,21 +276,8 @@ int OSBDB::mkfs()
 
 int OSBDB::pick_object_revision_lt(object_t& oid)
 {
-  if (!mounted)
-    return -EINVAL;
-
-  // XXX this is pretty lame. Can we do better?
-  assert(oid.rev > 0);
-  oid.rev--;
-  while (oid.rev > 0)
-    {
-      if (exists (oid))
-        {
-          return 0;
-        }
-      oid.rev--;
-    }
-  return -EEXIST; // FIXME
+  // Not really needed.
+  return -ENOSYS;
 }
 
 bool OSBDB::exists(object_t oid)
@@ -248,7 +289,11 @@ bool OSBDB::exists(object_t oid)
 
 int OSBDB::statfs (struct statfs *st)
 {
-  return -ENOSYS;
+  // Hacky?
+  if (::statfs (device.c_str(), st) != 0)
+    return -errno;
+  st->f_type = OSBDB_MAGIC;
+  return 0;
 }
 
 int OSBDB::stat(object_t oid, struct stat *st)
@@ -285,19 +330,23 @@ int OSBDB::remove(object_t oid, Context *onsafe)
 
   dout(2) << "remove " << oid << endl;
 
+  DbTxn *txn = NULL;
+  if (transactional)
+    env->txn_begin (NULL, &txn, 0);
+
   oid_t id;
   mkoid(id, oid);
   Dbt key (&id, sizeof (oid_t));
   db->del (NULL, &key, 0);
   object_inode_key _ikey = new_object_inode_key (oid);
   Dbt ikey (&_ikey, sizeof_object_inode_key());
-  db->del (NULL, &ikey, 0);
+  db->del (txn, &ikey, 0);
 
   attrs_id aids = new_attrs_id (oid);
   Dbt askey (&aids, sizeof_attrs_id());
   Dbt asval;
   asval.set_flags (DB_DBT_MALLOC);
-  if (db->get (NULL, &askey, &asval, 0) == 0)
+  if (db->get (txn, &askey, &asval, 0) == 0)
     {
       // We have attributes; remove them.
       stored_attrs *sap = (stored_attrs *) asval.get_data();
@@ -306,11 +355,13 @@ int OSBDB::remove(object_t oid, Context *onsafe)
         {
           attr_id aid = new_attr_id (oid, sap->names[i].name);
           Dbt akey (&aid, sizeof (aid));
-          db->del (NULL, &akey, 0);
+          db->del (txn, &akey, 0);
         }
-      db->del (NULL, &askey, 0);
+      db->del (txn, &askey, 0);
     }
 
+  if (txn)
+    txn->commit (0);
   return 0;
 }
 
@@ -324,6 +375,11 @@ int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
   if (size > 0xFFFFFFFF)
     return -ENOSPC;
 
+  DbTxn *txn = NULL;
+
+  if (transactional)
+    env->txn_begin (NULL, &txn, 0);
+
   object_inode_key ikey = new_object_inode_key (oid);
   stored_object obj;
   Dbt key (&ikey, sizeof_object_inode_key());
@@ -332,8 +388,12 @@ int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
   value.set_ulen (sizeof (obj));
   value.set_flags (DB_DBT_USERMEM);
 
-  if (db->get (NULL, &key, &value, 0) != 0)
-    return -ENOENT;
+  if (db->get (txn, &key, &value, 0) != 0)
+    {
+      if (txn)
+        txn->abort();
+      return -ENOENT;
+    }
 
   if (obj.length < size)
     {
@@ -346,13 +406,21 @@ int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
       newVal.set_dlen (1);
       newVal.set_ulen (1);
       newVal.set_flags (DB_DBT_PARTIAL);
-      if (db->put (NULL, &okey, &newVal, 0) != 0)
-        return -EIO;
+      if (db->put (txn, &okey, &newVal, 0) != 0)
+        {
+          if (txn)
+            txn->abort();
+          return -EIO;
+        }
 
       obj.length = size;
       value.set_ulen (sizeof (obj));
-      if (db->put (NULL, &key, &value, 0) != 0)
-        return -EIO;
+      if (db->put (txn, &key, &value, 0) != 0)
+        {
+          if (txn)
+            txn->abort();
+          return -EIO;
+        }
     }
   else if (obj.length > size)
     {
@@ -360,8 +428,12 @@ int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
       Dbt tval (&obj, sizeof (obj));
       tval.set_ulen (sizeof (obj));
       tval.set_flags (DB_DBT_USERMEM);
-      if (db->put (NULL, &key, &tval, 0) != 0)
-        return -EIO;
+      if (db->put (txn, &key, &tval, 0) != 0)
+        {
+          if (txn)
+            txn->abort();
+          return -EIO;
+        }
       if (size == 0)
         {
           char x[1];
@@ -369,8 +441,12 @@ int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
           mkoid (id, oid);
           Dbt okey (&id, sizeof (oid_t));
           Dbt oval (&x, 0);
-          if (db->put (NULL, &okey, &oval, 0) != 0)
-            return -EIO;
+          if (db->put (txn, &okey, &oval, 0) != 0)
+            {
+              if (txn)
+                txn->abort();
+              return -EIO;
+            }
         }
       else
         {
@@ -379,16 +455,27 @@ int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
           Dbt okey (&id, sizeof (oid_t));
           Dbt oval;
           oval.set_flags (DB_DBT_MALLOC);
-          if (db->get (NULL, &okey, &oval, 0) != 0)
-            return -EIO;
+          if (db->get (txn, &okey, &oval, 0) != 0)
+            {
+              if (txn)
+                txn->abort();
+              return -EIO;
+            }
           auto_ptr<char> ovalPtr ((char *) oval.get_data());
           oval.set_size ((size_t) size);
           oval.set_ulen ((size_t) size);
-          if (db->put (NULL, &okey, &oval, 0) != 0)
-            return -EIO;
+          if (db->put (txn, &okey, &oval, 0) != 0)
+            {
+              if (txn)
+                txn->abort();
+              return -EIO;
+            }
         }
     }
 
+  if (txn)
+    txn->commit (0);
+
   return 0;
 }
 
@@ -401,7 +488,8 @@ int OSBDB::read(object_t oid, off_t offset, size_t len, bufferlist& bl)
           << len << endl;
 
   DbTxn *txn = NULL;
-  //env->txn_begin (NULL, &txn, 0);
+  if (transactional)
+    env->txn_begin (NULL, &txn, 0);
 
   object_inode_key _ikey = new_object_inode_key (oid);
   stored_object obj;
@@ -414,7 +502,8 @@ int OSBDB::read(object_t oid, off_t offset, size_t len, bufferlist& bl)
   int ret;
   if ((ret = db->get (txn, &ikey, &ival, 0)) != 0)
     {
-      //txn->abort();
+      if (txn)
+        txn->abort();
       derr(1) << "get returned " << db_strerror (ret) << endl;
       return -ENOENT;
     }
@@ -433,7 +522,8 @@ int OSBDB::read(object_t oid, off_t offset, size_t len, bufferlist& bl)
       if ((ret = db->get (txn, &key, &value, 0)) != 0)
         {
           derr(1) << " get returned " << db_strerror (ret) << endl;
-          //txn->abort();
+          if (txn)
+            txn->abort();
           return -EIO;
         }
     }
@@ -453,15 +543,17 @@ int OSBDB::read(object_t oid, off_t offset, size_t len, bufferlist& bl)
       value.set_ulen (len);
       value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
       dout(3) << "  getting " << oid << endl;
-      if ((ret = db->get (NULL, &key, &value, 0)) != 0)
+      if ((ret = db->get (txn, &key, &value, 0)) != 0)
         {
           derr(1) << "get returned " << db_strerror (ret) << endl;
-          //txn->abort();
+          if (txn)
+            txn->abort();
           return -EIO;
         }
     }
 
-  //txn->commit (0);
+  if (txn)
+    txn->commit (0);
   return len;
 }
 
@@ -478,7 +570,8 @@ int OSBDB::write(object_t oid, off_t offset, size_t len,
     return -ENOSPC;
 
   DbTxn *txn = NULL;
-  //env->txn_begin (NULL, &txn, 0);
+  if (transactional)
+    env->txn_begin (txn, &txn, 0);
 
   object_inode_key _ikey = new_object_inode_key (oid);
   stored_object obj;
@@ -499,7 +592,9 @@ int OSBDB::write(object_t oid, off_t offset, size_t len,
               << obj << endl;
       if ((ret = db->put (txn, &ikey, &ival, 0)) != 0)
         {
-          derr(1) << "  put returned " << db_strerror (ret) << endl; 
+          derr(1) << "  put returned " << db_strerror (ret) << endl;
+          if (txn)
+            txn->abort();
           return -EIO;
         }
 
@@ -523,9 +618,14 @@ int OSBDB::write(object_t oid, off_t offset, size_t len,
               << obj.length << " bytes)" << endl;
       if ((ret = db->put (txn, &key, &value, 0)) != 0)
         {
-          derr(1) << "  put returned " << db_strerror (ret) << endl; 
+          derr(1) << "  put returned " << db_strerror (ret) << endl;
+          if (txn)
+            txn->abort();
           return -EIO;
         }
+
+      if (txn)
+        txn->commit (0);
       return len;
     }
 
@@ -536,7 +636,9 @@ int OSBDB::write(object_t oid, off_t offset, size_t len,
           obj.length = len;
           if ((ret = db->put (txn, &ikey, &ival, 0)) != 0)
             {
-              derr(1) << "  put returned " << db_strerror (ret) << endl; 
+              derr(1) << "  put returned " << db_strerror (ret) << endl;
+              if (txn)
+                txn->abort();
               return -EIO;
             }
         }
@@ -546,6 +648,8 @@ int OSBDB::write(object_t oid, off_t offset, size_t len,
       Dbt value (bl.c_str(), len);
       if (db->put (txn, &key, &value, 0) != 0)
         {
+          if (txn)
+            txn->abort();
           return -EIO;
         }
     }
@@ -554,8 +658,10 @@ int OSBDB::write(object_t oid, off_t offset, size_t len,
       if (offset + len > obj.length)
         {
           obj.length = (size_t) offset + len;
-          if (db->put (NULL, &ikey, &ival, 0) != 0)
+          if (db->put (txn, &ikey, &ival, 0) != 0)
             {
+              if (txn)
+                txn->abort();
               return -EIO;
             }
         }
@@ -567,12 +673,16 @@ int OSBDB::write(object_t oid, off_t offset, size_t len,
       value.set_dlen (len);
       value.set_ulen (len);
       value.set_flags (DB_DBT_PARTIAL);
-      if (db->put (NULL, &key, &value, 0) != 0)
+      if (db->put (txn, &key, &value, 0) != 0)
         {
+          if (txn)
+            txn->abort();
           return -EIO;
         }
     }
 
+  if (txn)
+    txn->commit (0);
   return len;
 }
 
@@ -586,6 +696,10 @@ int OSBDB::clone(object_t oid, object_t noid)
   if (exists (noid))
     return -EEXIST;
 
+  DbTxn *txn = NULL;
+  if (transactional)
+    env->txn_begin (NULL, &txn, 0);
+
   object_inode_key _ikey = new_object_inode_key (oid);
   object_inode_key _nikey = new_object_inode_key (noid);
   stored_object obj;
@@ -603,17 +717,35 @@ int OSBDB::clone(object_t oid, object_t noid)
   Dbt value;
   value.set_flags (DB_DBT_MALLOC);
 
-  if (db->get (NULL, &ikey, &ival, 0) != 0)
-    return -ENOENT;
-  if (db->get (NULL, &key, &value, 0) != 0)
-    return -ENOENT;
+  if (db->get (txn, &ikey, &ival, 0) != 0)
+    {
+      if (txn)
+        txn->abort();
+      return -ENOENT;
+    }
+  if (db->get (txn, &key, &value, 0) != 0)
+    {
+      if (txn)
+        txn->abort();
+      return -ENOENT;
+    }
   auto_ptr<char> valueptr ((char *) value.get_data());
 
-  if (db->put (NULL, &nikey, &ival, 0) != 0)
-    return -EIO;
-  if (db->put (NULL, &nkey, &value, 0) != 0)
-    return -EIO;
+  if (db->put (txn, &nikey, &ival, 0) != 0)
+    {
+      if (txn)
+        txn->abort();
+      return -EIO;
+    }
+  if (db->put (txn, &nkey, &value, 0) != 0)
+    {
+      if (txn)
+        txn->abort();
+      return -EIO;
+    }
 
+  if (txn)
+    txn->commit (0);
   return 0;
 }
 
@@ -652,10 +784,14 @@ int OSBDB::create_collection(coll_t c, Context *onsafe)
   Dbt value;
   value.set_flags (DB_DBT_MALLOC);
 
+  DbTxn *txn = NULL;
+  if (transactional)
+    env->txn_begin (NULL, &txn, 0);
+
   stored_colls *scp = NULL;
   size_t sz = 0;
   bool created = false;
-  if (db->get (NULL, &key, &value, 0) != 0)
+  if (db->get (txn, &key, &value, 0) != 0)
     {
       sz = sizeof (stored_colls) + sizeof (coll_t);
       scp = (stored_colls *) malloc (sz);
@@ -673,7 +809,11 @@ int OSBDB::create_collection(coll_t c, Context *onsafe)
   if (scp->count > 0)
     ins = binary_search<coll_t> (scp->colls, scp->count, c);
   if (scp->colls[ins] == c)
-    return -EEXIST;
+    {
+      if (txn != NULL)
+        txn->abort();
+      return -EEXIST;
+    }
 
   dout(3) << "..insertion point: " << ins << endl;
 
@@ -702,8 +842,10 @@ int OSBDB::create_collection(coll_t c, Context *onsafe)
   // Put the modified collection list back.
   {
     Dbt value2 (scp, sz);
-    if (db->put (NULL, &key, &value2, 0) != 0)
+    if (db->put (txn, &key, &value2, 0) != 0)
       {
+        if (txn != NULL)
+          txn->abort();
         return -EIO;
       }
   }
@@ -714,12 +856,16 @@ int OSBDB::create_collection(coll_t c, Context *onsafe)
     new_coll.count = 0;
     Dbt coll_key (&c, sizeof (coll_t));
     Dbt coll_value (&new_coll, sizeof (stored_coll));
-    if (db->put (NULL, &coll_key, &coll_value, 0) != 0)
+    if (db->put (txn, &coll_key, &coll_value, 0) != 0)
       {
+        if (txn != NULL)
+          txn->abort();
         return -EIO;
       }
   }
 
+  if (txn != NULL)
+    txn->commit (0);
   return 0;
 }
 
@@ -733,9 +879,15 @@ int OSBDB::destroy_collection(coll_t c, Context *onsafe)
   Dbt key (COLLECTIONS_KEY, 1);
   Dbt value;
   value.set_flags (DB_DBT_MALLOC);
+  DbTxn *txn = NULL;
 
-  if (db->get (NULL, &key, &value, 0) != 0)
+  if (transactional)
+    env->txn_begin (NULL, &txn, 0);
+
+  if (db->get (txn, &key, &value, 0) != 0)
     {
+      if (txn != NULL)
+        txn->abort();
       return -ENOENT; // XXX
     }
 
@@ -743,11 +895,15 @@ int OSBDB::destroy_collection(coll_t c, Context *onsafe)
   auto_ptr<stored_colls> valueBuf (scp);
   if (scp->count == 0)
     {
+      if (txn != NULL)
+        txn->abort();
       return -ENOENT;
     }
   uint32_t ins = binary_search<coll_t> (scp->colls, scp->count, c);
   if (scp->colls[ins] != c)
     {
+      if (txn != NULL)
+        txn->abort();
       return -ENOENT;
     }
 
@@ -761,18 +917,24 @@ int OSBDB::destroy_collection(coll_t c, Context *onsafe)
   // Modify the record size to be one less.
   Dbt nvalue (scp, value.get_size() - sizeof (coll_t));
   nvalue.set_flags (DB_DBT_USERMEM);
-  if (db->put (NULL, &key, &nvalue, 0) != 0)
+  if (db->put (txn, &key, &nvalue, 0) != 0)
     {
+      if (txn != NULL)
+        txn->abort();
       return -EIO;
     }
 
   // Delete the collection.
   Dbt collKey (&c, sizeof (coll_t));
-  if (db->del (NULL, &collKey, 0) != 0)
+  if (db->del (txn, &collKey, 0) != 0)
     {
+      if (txn != NULL)
+        txn->abort();
       return -EIO;
     }
 
+  if (txn != NULL)
+    txn->commit (0);
   return 0;
 }
 
@@ -805,6 +967,7 @@ int OSBDB::collection_stat(coll_t c, struct stat *st)
     return -EINVAL;
 
   dout(2) << "collection_stat " << c << endl;
+  // XXX is this needed?
   return -ENOSYS;
 }
 
@@ -818,9 +981,15 @@ int OSBDB::collection_add(coll_t c, object_t o, Context *onsafe)
   Dbt key (&c, sizeof (coll_t));
   Dbt value;
   value.set_flags (DB_DBT_MALLOC);
+  DbTxn *txn = NULL;
 
-  if (db->get (NULL, &key, &value, 0) != 0)
+  if (transactional)
+    env->txn_begin (NULL, &txn, 0);
+
+  if (db->get (txn, &key, &value, 0) != 0)
     {
+      if (txn != NULL)
+        txn->abort();
       return -ENOENT;
     }
 
@@ -836,6 +1005,8 @@ int OSBDB::collection_add(coll_t c, object_t o, Context *onsafe)
       // Already there?
       if (scp->objects[ins] == o)
         {
+          if (txn != NULL)
+            txn->abort();
           return -EEXIST;
         }
     }
@@ -856,11 +1027,15 @@ int OSBDB::collection_add(coll_t c, object_t o, Context *onsafe)
   dout(3) << "..collection: " << scp << endl;
 
   Dbt nvalue (scp, sz);
-  if (db->put (NULL, &key, &nvalue, 0) != 0)
+  if (db->put (txn, &key, &nvalue, 0) != 0)
     {
+      if (txn != NULL)
+        txn->abort();
       return -EIO;
     }
 
+  if (txn != NULL)
+    txn->commit (0);
   return 0;
 }
 
@@ -874,9 +1049,15 @@ int OSBDB::collection_remove(coll_t c, object_t o, Context *onsafe)
   Dbt key (&c, sizeof (coll_t));
   Dbt value;
   value.set_flags (DB_DBT_MALLOC);
+  DbTxn *txn = NULL;
+  if (transactional)
+    env->txn_begin (NULL, &txn, 0);
 
-  if (db->get (NULL, &key, &value, 0) != 0)
+  if (db->get (txn, &key, &value, 0) != 0)
     {
+      if (txn != NULL)
+        txn->abort();
       return -ENOENT;
     }
 
@@ -885,11 +1066,15 @@ int OSBDB::collection_remove(coll_t c, object_t o, Context *onsafe)
 
   if (scp->count == 0)
     {
+      if (txn != NULL)
+        txn->abort();
       return -ENOENT;
     }
   uint32_t ins = binary_search<object_t> (scp->objects, scp->count, o);
   if (scp->objects[ins] != o)
     {
+      if (txn != NULL)
+        txn->abort();
       return -ENOENT;
     }
 
@@ -903,11 +1088,15 @@ int OSBDB::collection_remove(coll_t c, object_t o, Context *onsafe)
   dout(3) << "..collection " << scp << endl;
 
   Dbt nval (scp, value.get_size() - sizeof (object_t));
-  if (db->put (NULL, &key, &nval, 0) != 0)
+  if (db->put (txn, &key, &nval, 0) != 0)
     {
+      if (txn != NULL)
+        txn->abort();
       return -EIO;
     }
 
+  if (txn != NULL)
+    txn->commit (0);
   return 0;
 }
 
@@ -918,21 +1107,33 @@ int OSBDB::collection_list(coll_t c, list<object_t>& o)
 
   Dbt key (&c, sizeof (coll_t));
   Dbt value;
-  if (db->get (NULL, &key, &value, 0) != 0)
-    return -ENOENT;
+  DbTxn *txn = NULL;
+
+  if (transactional)
+    env->txn_begin (NULL, &txn, 0);
+
+  if (db->get (txn, &key, &value, 0) != 0)
+    {
+      if (txn != NULL)
+        txn->abort();
+      return -ENOENT;
+    }
 
   stored_coll *scp = (stored_coll *) value.get_data();
   auto_ptr<stored_coll> sc (scp);
   for (uint32_t i = 0; i < scp->count; i++)
     o.push_back (scp->objects[i]);
 
+  if (txn != NULL)
+    txn->commit (0);
   return 0;
 }
 
 \f // Attributes
 
 int OSBDB::_setattr(object_t oid, const char *name,
-                    const void *value, size_t size, Context *onsafe)
+                    const void *value, size_t size, Context *onsafe,
+                    DbTxn *txn)
 {
   if (!mounted)
     return -EINVAL;
@@ -949,7 +1150,7 @@ int OSBDB::_setattr(object_t oid, const char *name,
   size_t sz = 0;
 
   dout(3) << "  getting " << aids << endl;
-  if (db->get (NULL, &attrs_key, &attrs_val, 0) != 0)
+  if (db->get (txn, &attrs_key, &attrs_val, 0) != 0)
     {
       dout(2) << "  first attribute" << endl;
       sz = sizeof (stored_attrs);
@@ -996,7 +1197,7 @@ int OSBDB::_setattr(object_t oid, const char *name,
       newAttrs_val.set_ulen (sz);
       newAttrs_val.set_flags (DB_DBT_USERMEM);
       dout(3) << "  putting " << aids << endl;
-      if (db->put (NULL, &attrs_key, &newAttrs_val, 0) != 0)
+      if (db->put (txn, &attrs_key, &newAttrs_val, 0) != 0)
         return -EIO;
     }
   else
@@ -1011,7 +1212,7 @@ int OSBDB::_setattr(object_t oid, const char *name,
   Dbt attr_key (&aid, sizeof (aid));
   Dbt attr_val ((void *) value, size);
   dout(3) << "  writing attribute key " << aid << endl;
-  if (db->put (NULL, &attr_key, &attr_val, 0) != 0)
+  if (db->put (txn, &attr_key, &attr_val, 0) != 0)
     return -EIO;
 
   return 0;
@@ -1024,9 +1225,23 @@ int OSBDB::setattr(object_t oid, const char *name,
   if (!mounted)
     return -EINVAL;
 
+  DbTxn *txn = NULL;
+  if (transactional)
+    env->txn_begin (NULL, &txn, 0);
+
   dout(2) << "setattr " << oid << ":" << name << " => ("
           << size << " bytes)" << endl;
-  int ret = _setattr (oid, name, value, size, onsafe);
+  int ret = _setattr (oid, name, value, size, onsafe, txn);
+  if (ret == 0)
+    {
+      if (txn != NULL)
+        txn->commit (0);
+    }
+  else
+    {
+      if (txn != NULL)
+        txn->abort();
+    }
   return ret;
 }
 
@@ -1036,18 +1251,28 @@ int OSBDB::setattrs(object_t oid, map<string,bufferptr>& aset,
   if (!mounted)
     return -EINVAL;
 
+  DbTxn *txn = NULL;
+
+  if (transactional)
+    env->txn_begin (NULL, &txn, 0);
+
   map<string,bufferptr>::iterator it;
   for (it = aset.begin(); it != aset.end(); it++)
     {
       string name = it->first;
       bufferptr value = it->second;
       int ret = _setattr (oid, name.c_str(), value.c_str(),
-                          value.length(), onsafe);
+                          value.length(), onsafe, txn);
       if (ret != 0)
         {
+          if (txn != NULL)
+            txn->abort();
           return ret;
         }
     }
+
+  if (txn != NULL)
+    txn->commit (0);
   return 0;
 }
 
@@ -1106,21 +1331,38 @@ int OSBDB::rmattr(object_t oid, const char *name, Context *onsafe)
   Dbt asvalue;
   asvalue.set_flags (DB_DBT_MALLOC);
 
-  if (db->get (NULL, &askey, &asvalue, 0) != 0)
-    return -ENOENT;
+  DbTxn *txn = NULL;
+
+  if (transactional)
+    env->txn_begin (NULL, &txn, 0);
+
+  if (db->get (txn, &askey, &asvalue, 0) != 0)
+    {
+      if (txn != NULL)
+        txn->abort();
+      return -ENOENT;
+    }
 
   stored_attrs *sap = (stored_attrs *) asvalue.get_data();
   auto_ptr<stored_attrs> sa (sap);
 
   if (sap->count == 0)
-    return -ENOENT;
+    {
+      if (txn != NULL)
+        txn->abort();
+      return -ENOENT;
+    }
 
   attr_name _name;
   memset(&name, 0, sizeof (_name));
   strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
   int ins = binary_search<attr_name> (sap->names, sap->count, _name);
   if (strcmp (sap->names[ins].name, name) != 0)
-    return -ENOENT;
+    {
+      if (txn != NULL)
+        txn->abort();
+      return -ENOENT;
+    }
 
   // Shift the later elements down by one, if needed.
   int n = (sap->count - ins) * sizeof (attr_name);
@@ -1129,18 +1371,27 @@ int OSBDB::rmattr(object_t oid, const char *name, Context *onsafe)
   sap->count--;
   asvalue.set_size(asvalue.get_size() - sizeof (attr_name));
   int ret;
-  if ((ret = db->put (NULL, &askey, &asvalue, 0)) != 0)
+  if ((ret = db->put (txn, &askey, &asvalue, 0)) != 0)
     {
       derr(1) << "put stored_attrs " << db_strerror (ret) << endl;
+      if (txn != NULL)
+        txn->abort();
       return -EIO;
     }
 
   // Remove the attribute.
   attr_id aid = new_attr_id (oid, name);
   Dbt key (&aid, sizeof (aid));
-  if ((ret = db->del (NULL, &key, 0)) != 0)
-    derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+  if ((ret = db->del (txn, &key, 0)) != 0)
+    {
+      derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+      if (txn != NULL)
+        txn->abort();
+      return -EIO;
+    }
 
+  if (txn != NULL)
+    txn->commit (0);
   return 0;
 }
 
@@ -1156,6 +1407,8 @@ int OSBDB::listattr(object_t oid, char *attrs, size_t size)
   Dbt value;
   value.set_flags (DB_DBT_MALLOC);
 
+  // XXX Transactions for read atomicity???
+
   int ret;
   if ((ret = db->get (NULL, &key, &value, 0)) != 0)
     {
@@ -1201,8 +1454,12 @@ int OSBDB::collection_setattr(coll_t cid, const char *name,
   stored_attrs *sap = NULL;
   size_t sz = 0;
 
+  DbTxn *txn = NULL;
+  if (transactional)
+    env->txn_begin (NULL, &txn, 0);
+
   dout(3) << "  getting " << aids << endl;
-  if (db->get (NULL, &attrs_key, &attrs_val, 0) != 0)
+  if (db->get (txn, &attrs_key, &attrs_val, 0) != 0)
     {
       dout(2) << "  first attribute" << endl;
       sz = sizeof (stored_attrs);
@@ -1249,8 +1506,12 @@ int OSBDB::collection_setattr(coll_t cid, const char *name,
       newAttrs_val.set_ulen (sz);
       newAttrs_val.set_flags (DB_DBT_USERMEM);
       dout(3) << "  putting " << aids << endl;
-      if (db->put (NULL, &attrs_key, &newAttrs_val, 0) != 0)
-        return -EIO;
+      if (db->put (txn, &attrs_key, &newAttrs_val, 0) != 0)
+        {
+          if (txn != NULL)
+            txn->abort();
+          return -EIO;
+        }
     }
   else
     {
@@ -1264,9 +1525,15 @@ int OSBDB::collection_setattr(coll_t cid, const char *name,
   Dbt attr_key (&aid, sizeof (aid));
   Dbt attr_val ((void *) value, size);
   dout(3) << "  writing attribute key " << aid << endl;
-  if (db->put (NULL, &attr_key, &attr_val, 0) != 0)
-    return -EIO;
+  if (db->put (txn, &attr_key, &attr_val, 0) != 0)
+    {
+      if (txn != NULL)
+        txn->abort();
+      return -EIO;
+    }
 
+  if (txn != NULL)
+    txn->commit (0);
   return 0;
 }
 
@@ -1281,21 +1548,37 @@ int OSBDB::collection_rmattr(coll_t cid, const char *name,
   Dbt asvalue;
   asvalue.set_flags (DB_DBT_MALLOC);
 
-  if (db->get (NULL, &askey, &asvalue, 0) != 0)
-    return -ENOENT;
+  DbTxn *txn = NULL;
+  if (transactional)
+    env->txn_begin (NULL, &txn, 0);
+
+  if (db->get (txn, &askey, &asvalue, 0) != 0)
+    {
+      if (txn != NULL)
+        txn->abort();
+      return -ENOENT;
+    }
 
   stored_attrs *sap = (stored_attrs *) asvalue.get_data();
   auto_ptr<stored_attrs> sa (sap);
 
   if (sap->count == 0)
-    return -ENOENT;
+    {
+      if (txn != NULL)
+        txn->abort();
+      return -ENOENT;
+    }
 
   attr_name _name;
   memset(&name, 0, sizeof (_name));
   strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
   int ins = binary_search<attr_name> (sap->names, sap->count, _name);
   if (strcmp (sap->names[ins].name, name) != 0)
-    return -ENOENT;
+    {
+      if (txn != NULL)
+        txn->abort();
+      return -ENOENT;
+    }
 
   // Shift the later elements down by one, if needed.
   int n = (sap->count - ins) * sizeof (attr_name);
@@ -1304,18 +1587,27 @@ int OSBDB::collection_rmattr(coll_t cid, const char *name,
   sap->count--;
   asvalue.set_size(asvalue.get_size() - sizeof (attr_name));
   int ret;
-  if ((ret = db->put (NULL, &askey, &asvalue, 0)) != 0)
+  if ((ret = db->put (txn, &askey, &asvalue, 0)) != 0)
     {
       derr(1) << "put stored_attrs " << db_strerror (ret) << endl;
+      if (txn != NULL)
+        txn->abort();
       return -EIO;
     }
 
   // Remove the attribute.
   coll_attr_id aid = new_coll_attr_id (cid, name);
   Dbt key (&aid, sizeof (aid));
-  if ((ret = db->del (NULL, &key, 0)) != 0)
-    derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+  if ((ret = db->del (txn, &key, 0)) != 0)
+    {
+      derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+      if (txn != NULL)
+        txn->abort();
+      return -EIO;
+    }
 
+  if (txn != NULL)
+    txn->commit (0);
   return 0;
 }
 
@@ -1327,6 +1619,8 @@ int OSBDB::collection_getattr(coll_t cid, const char *name,
 
   dout(2) << "collection_getattr " << cid << " " << name << endl;
 
+  // XXX transactions/read isolation?
+
   coll_attr_id caid = new_coll_attr_id (cid, name);
   Dbt key (&caid, sizeof (caid));
   Dbt val (value, size);
@@ -1347,6 +1641,8 @@ int OSBDB::collection_listattr(coll_t cid, char *attrs, size_t size)
 
   dout(2) << "collection_listattr " << cid << endl;
 
+  // XXX transactions/read isolation?
+
   coll_attrs_id caids = new_coll_attrs_id (cid);
   Dbt key (&caids, sizeof_coll_attrs_id());
   Dbt value;
index 9ba42d206d29057e5fc799f1f57a7f409f5df7eb..61cf4b16c48b54033bdd65961001c90441cacf5e 100644 (file)
@@ -12,11 +12,7 @@ Foundation.  See file COPYING. */
 #include <db_cxx.h>
 #include "osd/ObjectStore.h"
 
-// Redefine this to use a different BDB access type. DB_BTREE is
-// probably the only other one that makes sense.
-#ifndef OSBDB_DB_TYPE
-#define OSBDB_DB_TYPE DB_HASH
-#endif // OSBDB_DB_TYPE
+#define OSBDB_MAGIC 0x05BDB
 
 /*
  * Maximum length of an attribute name.
@@ -378,6 +374,15 @@ inline ostream& operator<<(ostream& out, stored_coll *c)
   return out;
 }
 
+class OSBDBException : public std::exception
+{
+  const char *msg;
+
+public:
+  OSBDBException(const char *msg) : msg(msg) { }
+  const char *what() { return msg; }
+};
+
 /*
  * The object store interface for Berkeley DB.
  */
@@ -387,46 +392,17 @@ class OSBDB : public ObjectStore
   DbEnv *env;
   Db *db;
   string device;
+  string env_dir;
   bool mounted;
   bool opened;
+  bool transactional;
 
  public:
 
-  OSBDB(const char *dev)
-    : env(0), db (0), device (dev), mounted(false), opened(false)
+  OSBDB(const char *dev) throw(OSBDBException)
+    : env(0), db (0), device (dev), mounted(false), opened(false),
+      transactional(g_conf.bdbstore_transactional)
   {
-    /*env = new DbEnv (DB_CXX_NO_EXCEPTIONS);
-    env->set_error_stream (&std::cerr);
-    // WTF? You can't open an env if you set this flag here, but BDB
-    // says you also can't set it after you open the env.
-    //env->set_flags (DB_LOG_INMEMORY, 1);
-    char *p = strrchr (dev, '/');
-    int env_flags = (DB_CREATE | DB_THREAD | DB_INIT_LOCK
-                     | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOG);
-    if (p != NULL)
-      {
-        *p = '\0';
-        if (env->open (dev, env_flags, 0) != 0)
-          {
-            std::cerr << "failed to open environment: "
-                      << dev << std::endl;
-            ::abort();
-          }
-        *p = '/';
-        dev = p+1;
-      }
-    else
-      {
-        if (env->open (NULL, env_flags, 0) != 0)
-          {
-            std::cerr << "failed to open environment: ." << std::endl;
-            ::abort();
-          }
-      }
-
-    // Double WTF: if you remove the DB_LOG_INMEMORY bit, db->open
-    // fails, inexplicably, with EINVAL!*/
-    //    env->set_flags (DB_DIRECT_DB | /*DB_AUTO_COMMIT |*/ DB_LOG_INMEMORY, 1);
   }
 
   ~OSBDB()
@@ -435,11 +411,6 @@ class OSBDB : public ObjectStore
       {
         umount();
       }
-    if (env != NULL)
-      {
-        env->close (0);
-        delete env;
-      }
   }
 
   int mount();
@@ -499,9 +470,9 @@ class OSBDB : public ObjectStore
   void sync();
 
 private:
-  int opendb (DBTYPE type=DB_UNKNOWN, int flags=0);
+  int opendb (DBTYPE type=DB_UNKNOWN, int flags=0, bool new_env=false);
 
   int _setattr(object_t oid, const char *name, const void *value,
-               size_t size, Context *onsync);
+               size_t size, Context *onsync, DbTxn *txn);
   int _getattr(object_t oid, const char *name, void *value, size_t size);
 };
index 0296f05a4934597c32305d6cff86927e84c176f4..ee27a16ee4786a5259c7e0ee42dcc69144d13f19 100644 (file)
@@ -16,6 +16,7 @@ Foundation.  See file COPYING. */
 
 #include <iostream>
 #include <cerrno>
+#include <vector>
 
 #include <fcntl.h>
 #include <sys/mount.h>
@@ -38,6 +39,7 @@ to_msec (struct timeval &time)
 
 int main (int argc, char **argv)
 {
+  vector<char *> args;
   char *osd_name = "ebofs";
   unsigned object_size = 1024;
   unsigned object_count = 1024;
@@ -47,13 +49,13 @@ int main (int argc, char **argv)
   char *mountcmd = "mount /tmp/testos";
   char *umountcmd = "umount /tmp/testos";
 
+  bool ebofs_raw_device = false;
   bool inhibit_remount = (getenv("TESTOS_INHIBIT_REMOUNT") != NULL);
 
   if (argc > 1
       && (strcmp (argv[1], "-h") == 0
           || strcmp (argv[1], "-help") == 0
-          || strcmp (argv[1], "--help") == 0
-          || argc > 6))
+          || strcmp (argv[1], "--help") == 0))
     {
       cout << "usage: " << argv[0] << " [store [object-size [object-count [iterations [seed]]]]]" << endl;
       cout << endl;
@@ -68,16 +70,28 @@ int main (int argc, char **argv)
       exit (0);
     }
 
+  argv_to_vec (argc, argv, args);
+  for (vector<char*>::iterator it = args.begin(); it != args.end();
+       it++)
+    cout << *it << " ";
+  cout << endl;
+  parse_config_options (args);
+  for (vector<char*>::iterator it = args.begin(); it != args.end();
+       it++)
+    cout << *it << " ";
+  cout << endl;
+
+  argc = args.size();
+  if (argc > 0)
+    osd_name = args[0];
   if (argc > 1)
-    osd_name = argv[1];
+    object_size = (unsigned) atol (args[1]);
   if (argc > 2)
-    object_size = (unsigned) atol (argv[2]);
+    object_count = (unsigned) atol (args[2]);
   if (argc > 3)
-    object_count = (unsigned) atol (argv[3]);
+    write_iter = (unsigned) atol (args[3]);
   if (argc > 4)
-    write_iter = (unsigned) atol (argv[4]);
-  if (argc > 5)
-    random_seed = (unsigned) atol (argv[5]);
+    random_seed = (unsigned) atol (args[4]);
 
   // algin object size to 'long'
   object_size = ((object_size + (sizeof (long) - 1)) / sizeof (long)) * sizeof (long);
@@ -86,6 +100,18 @@ int main (int argc, char **argv)
   strcpy (osd_file, "/tmp/testos/testos.XXXXXX");
   mktemp (osd_file);
 
+  if (strcasecmp (osd_name, "ebofs") == 0)
+    {
+      char *dev_env = getenv ("TESTOS_EBOFS_DEV");
+      if (dev_env != NULL)
+        {
+          // Assume it is a true device.
+          strncpy (osd_file, dev_env, 32);
+          inhibit_remount = true;
+          ebofs_raw_device = true;
+        }
+    }
+
   if (!inhibit_remount)
     {
       if (system (mountcmd) != 0)
@@ -98,42 +124,29 @@ int main (int argc, char **argv)
   ObjectStore *os = NULL;
   if (strcasecmp (osd_name, "ebofs") == 0)
     {
-      FILE *f = fopen (osd_file, "w");
-      if (f == NULL)
+      if (!ebofs_raw_device)
         {
-          cerr << "failed to open " << osd_file << ": " << strerror (errno)
-               << endl;
-          exit (1);
+          FILE *f = fopen (osd_file, "w");
+          if (f == NULL)
+            {
+              cerr << "failed to open " << osd_file << ": " << strerror (errno)
+                   << endl;
+              exit (1);
+            }
+          // 1G file.
+          fseek (f, 1024 * 1024 * 1024, SEEK_SET);
+          fputc ('\0', f);
+          fclose (f);
         }
-      // 1G file.
-      fseek (f, 1024 * 1024 * 1024, SEEK_SET);
-      fputc ('\0', f);
-      fclose (f);
-      // 20K cache
-      g_conf.ebofs_bc_size = 5; // times 4K
       os = new Ebofs (osd_file);
     }
   else if (strcasecmp (osd_name, "osbdb") == 0)
     {
-      char *e = getenv ("OSBDB_FFACTOR");
-      if (e != NULL)
-        g_conf.bdbstore_ffactor = atol(e);
-      e = getenv ("OSBDB_NELEM");
-      if (e != NULL)
-        g_conf.bdbstore_nelem = atol(e);
-      e = getenv ("OSBDB_PAGESIZE");
-      if (e != NULL)
-        g_conf.bdbstore_pagesize = atol(e);
-      g_conf.debug_bdbstore = 1;
-      // 20K cache
-      g_conf.bdbstore_cachesize = 20 * 1024;
       os = new OSBDB (osd_file);
     }
   else if (strcasecmp (osd_name, "osbdb-btree") == 0)
     {
       g_conf.bdbstore_btree = true;
-      // 20K cache
-      g_conf.bdbstore_cachesize = 20 * 1024;
       os = new OSBDB (osd_file);
     }
   else
@@ -230,6 +243,17 @@ int main (int argc, char **argv)
       
       os->mount();
 
+      // Shuffle the OIDs.
+      for (int j = 0; j < object_count; j++)
+        {
+          int x = random() % object_count;
+          if (x < 0)
+            x = -x;
+          object_t o = oids[j];
+          oids[j] = oids[x];
+          oids[x] = o;
+        }
+
       begin = g_clock.now();
       for (unsigned o = 0; o < object_count; o++)
         {
@@ -269,32 +293,42 @@ int main (int argc, char **argv)
 
   cerr << "Finished in " << (total_write + total_read) << endl;
 
-  double write_mean = (double) total_write / write_iter;
+  double write_mean = ((double) total_write) / ((double) write_iter);
   double write_sd = 0.0;
   for (unsigned i = 0; i < write_iter; i++)
     {
-      double x = (double) writes[i] - write_mean;
+      double x = ((double) writes[i]) - write_mean;
       write_sd += x * x;
     }
-  write_sd = sqrt (write_sd / write_iter);
+  write_sd = sqrt (write_sd / ((double) write_iter));
 
-  double read_mean = (double) total_read / write_iter;
+  double read_mean = ((double) total_read) / ((double) write_iter);
   double read_sd = 0.0;
   for (unsigned i = 0; i < write_iter; i++)
     {
-      double x = (double) reads[i] - read_mean;
+      double x = ((double) reads[i]) - read_mean;
       write_sd += x * x;
     }
-  read_sd = sqrt (read_sd / write_iter);
+  read_sd = sqrt (read_sd / ((double) write_iter));
 
   cout << "TESTOS: write " << osd_name << ":" << object_size << ":"
        << object_count << ":" << write_iter << ":" << random_seed
        << " -- " << write_mean << " " << write_sd << endl;
 
+  cout << "TESTOS: write.raw -- ";
+  for (int i = 0; i < write_iter; i++)
+    cout << ((double) writes[i]) << " ";
+  cout << endl;
+
   cout << "TESTOS: read " << osd_name << ":" << object_size << ":"
        << object_count << ":" << write_iter << ":" << random_seed
        << " -- " << read_mean << " " << read_sd << endl;
 
+  cout << "TESTOS: read.raw -- ";
+  for (int i = 0; i < write_iter; i++)
+    cout << ((double) reads[i]) << " ";
+  cout << endl;
+
   unlink (osd_file);
   if (!inhibit_remount)
     {
diff --git a/trunk/ceph/test/testosbdb.cc b/trunk/ceph/test/testosbdb.cc
new file mode 100644 (file)
index 0000000..50a4ee6
--- /dev/null
@@ -0,0 +1,215 @@
+/* testosbdb.cc -- test OSBDB.
+   Copyright (C) 2007 Casey Marshall <csm@soe.ucsc.edu> */
+
+
+#include <iostream>
+#include "osbdb/OSBDB.h"
+
+using namespace std;
+
+int
+main (int argc, char **argv)
+{
+  vector<char *> args;
+  argv_to_vec (argc, argv, args);
+  parse_config_options (args);
+
+  g_conf.debug_bdbstore = 3;
+  //g_conf.bdbstore_btree = true;
+  char dbfile[256];
+  strncpy (dbfile, "/tmp/testosbdb/db.XXXXXX", 256);
+  mktemp (dbfile);
+  OSBDB *os = new OSBDB(dbfile);
+  auto_ptr<OSBDB> osPtr (os);
+  os->mkfs();
+  os->mount();
+
+  // Put an object.
+  object_t oid (0xDEADBEEF00000000ULL, 0xFEEDFACE);
+
+  cout << "sizeof oid_t is " << sizeof (oid_t) << endl;
+  cout << "offsetof oid_t.id " << offsetof (oid_t, id) << endl;
+
+  cout << sizeof (object_t) << endl;
+  cout << sizeof (oid.ino) << endl;
+  cout << sizeof (oid.bno) << endl;
+  cout << sizeof (oid.rev) << endl;
+
+  // Shouldn't be there.
+  if (os->exists (oid))
+    {
+      cout << "FAIL: oid shouldn't be there " << oid << endl;
+    }
+
+  // Write an object.
+  char *x = (char *) malloc (1024);
+  memset(x, 0xaa, 1024);
+  bufferptr bp (x, 1024);
+  bufferlist bl;
+  bl.push_back (bp);
+
+  if (os->write (oid, 0L, 1024, bl, NULL) != 1024)
+    {
+      cout << "FAIL: writing object" << endl;
+    }
+
+  os->sync();
+
+  // Should be there.
+  if (!os->exists (oid))
+    {
+      cout << "FAIL: oid should be there: " << oid << endl;
+    }
+
+  memset(x, 0, 1024);
+  if (os->read (oid, 0, 1024, bl) != 1024)
+    {
+      cout << "FAIL: reading object" << endl;
+    }
+
+  for (int i = 0; i < 1024; i++)
+    {
+      if ((x[i] & 0xFF) != 0xaa)
+        {
+          cout << "FAIL: data read out is different" << endl;
+          break;
+        }
+    }
+
+  // Set some attributes
+  if (os->setattr (oid, "alpha", "value", strlen ("value")) != 0)
+    {
+      cout << "FAIL: set attribute" << endl;
+    }
+  if (os->setattr (oid, "beta", "value", strlen ("value")) != 0)
+    {
+      cout << "FAIL: set attribute" << endl;
+    }
+  if (os->setattr (oid, "gamma", "value", strlen ("value")) != 0)
+    {
+      cout << "FAIL: set attribute" << endl;
+    }
+  if (os->setattr (oid, "fred", "value", strlen ("value")) != 0)
+    {
+      cout << "FAIL: set attribute" << endl;
+    }
+
+  char *attrs = (char *) malloc (1024);
+  if (os->listattr (oid, attrs, 1024) != 0)
+    {
+      cout << "FAIL: listing attributes" << endl;
+    }
+  else
+    {
+      char *p = attrs;
+      if (strcmp (p, "alpha") != 0)
+        {
+          cout << "FAIL: should be \"alpha:\" \"" << p << "\"" << endl;
+        }
+      p = p + strlen (p) + 1;
+      if (strcmp (p, "beta") != 0)
+        {
+          cout << "FAIL: should be \"beta:\" \"" << p << "\"" << endl;
+        }
+      p = p + strlen (p) + 1;
+      if (strcmp (p, "fred") != 0)
+        {
+          cout << "FAIL: should be \"fred:\" \"" << p << "\"" << endl;
+        }
+      p = p + strlen (p) + 1;
+      if (strcmp (p, "gamma") != 0)
+        {
+          cout << "FAIL: should be \"gamma:\" \"" << p << "\"" << endl;
+        }
+    }
+
+  coll_t cid = 0xCAFEBABE;
+  if (os->create_collection (cid) != 0)
+    {
+      cout << "FAIL: create_collection" << endl;
+    }
+  if (os->create_collection (cid + 10) != 0)
+    {
+      cout << "FAIL: create_collection" << endl;
+    }
+  if (os->create_collection (cid + 5) != 0)
+    {
+      cout << "FAIL: create_collection" << endl;
+    }
+  if (os->create_collection (42) != 0)
+    {
+      cout << "FAIL: create_collection" << endl;
+    }
+
+  if (os->collection_add (cid, oid) != 0)
+    {
+      cout << "FAIL: collection_add" << endl;
+    }
+
+  list<coll_t> ls;
+  if (os->list_collections (ls) < 0)
+    {
+      cout << "FAIL: list_collections" << endl;
+    }
+  cout << "collections: ";
+  for (list<coll_t>::iterator it = ls.begin(); it != ls.end(); it++)
+    {
+      cout << *it << ", ";
+    }
+  cout << endl;
+
+  if (os->destroy_collection (0xCAFEBABE + 10) != 0)
+    {
+      cout << "FAIL: destroy_collection" << endl;
+    }
+
+  if (os->destroy_collection (0xCAFEBADE + 10) == 0)
+    {
+      cout << "FAIL: destroy_collection" << endl;
+    }
+
+  object_t oid2 (12345, 12345);
+  for (int i = 0; i < 8; i++)
+    {
+      oid2.rev++;
+      if (os->collection_add (cid, oid2) != 0)
+        {
+          cout << "FAIL: collection_add" << endl;
+        }
+    }
+  for (int i = 0; i < 8; i++)
+    {
+      if (os->collection_remove (cid, oid2) != 0)
+        {
+          cout << "FAIL: collection_remove" << endl;
+        }
+      oid2.rev--;
+    }
+
+  // Truncate the object.
+  if (os->truncate (oid, 512, NULL) != 0)
+    {
+      cout << "FAIL: truncate" << endl;
+    }
+
+  // Expand the object.
+  if (os->truncate (oid, 1200, NULL) != 0)
+    {
+      cout << "FAIL: expand" << endl;
+    }
+
+  // Delete the object.
+  if (os->remove (oid) != 0)
+    {
+      cout << "FAIL: could not remove object" << endl;
+    }
+
+  // Shouldn't be there
+  if (os->exists (oid))
+    {
+      cout << "FAIL: should not be there" << endl;
+    }
+
+  os->sync();
+  exit (0);
+}