\f // Management.
-int OSBDB::opendb(DBTYPE type, int flags)
+int OSBDB::opendb(DBTYPE type, int flags, bool new_env)
{
+ // BDB transactions require an environment.
+ if (g_conf.bdbstore_transactional)
+ {
+ env = new DbEnv (DB_CXX_NO_EXCEPTIONS);
+ env->set_error_stream (&std::cerr);
+ env->set_message_stream (&std::cout);
+ env->set_flags (DB_LOG_INMEMORY, 1);
+ //env->set_flags (DB_DIRECT_DB, 1);
+ int env_flags = (DB_CREATE
+ | DB_THREAD
+ | DB_INIT_LOCK
+ | DB_INIT_MPOOL
+ | DB_INIT_TXN
+ | DB_INIT_LOG
+ | DB_PRIVATE);
+ //if (new_env)
+ // env->remove (env_dir.c_str(), 0);
+ if (env->open (NULL, env_flags, 0) != 0)
+ {
+ std::cerr << "failed to open environment " << std::endl;
+ return -EIO;
+ }
+
+ }
+
db = new Db(env, 0);
db->set_error_stream (&std::cerr);
db->set_message_stream (&std::cout);
db->set_cachesize (0, g_conf.bdbstore_cachesize, 0);
}
+ flags = flags | DB_THREAD;
+ if (transactional)
+ flags = flags | DB_AUTO_COMMIT;
+
int ret;
if ((ret = db->open (NULL, device.c_str(), NULL, type, flags, 0)) != 0)
{
{
if (!mounted)
return -EINVAL;
- sync();
+
+ dout(2) << "umount" << endl;
+
int ret;
if (opened)
{
+ if (transactional)
+ {
+ env->log_flush (NULL);
+ if ((ret = env->lsn_reset (device.c_str(), 0)) != 0)
+ {
+ derr(1) << "lsn_reset: " << db_strerror (ret) << endl;
+ }
+ }
+
+ db->sync (0);
+
if ((ret = db->close (0)) != 0)
{
derr(1) << "close: " << db_strerror(ret) << endl;
}
delete db;
db = NULL;
+
+ if (env)
+ {
+ env->close (0);
+ delete env;
+ env = NULL;
+ }
}
mounted = false;
opened = false;
dout(2) << "mkfs" << endl;
unlink (device.c_str());
+
int ret;
- if ((ret = opendb((g_conf.bdbstore_btree ? DB_BTREE : DB_HASH), DB_CREATE)) != 0)
+ if ((ret = opendb((g_conf.bdbstore_btree ? DB_BTREE : DB_HASH),
+ DB_CREATE, true)) != 0)
{
derr(1) << "failed to open database: " << device << ": "
- << strerror(ret) << std::endl;
+ << db_strerror(ret) << std::endl;
return -EINVAL;
}
opened = true;
ret = db->truncate (NULL, &c, 0);
if (ret != 0)
{
+ derr(1) << "db truncate failed: " << db_strerror (ret) << endl;
return -EIO; // ???
}
Dbt value (&sb, sizeof (sb));
dout(3) << "..writing superblock" << endl;
- if (db->put (NULL, &key, &value, 0) != 0)
+ if ((ret = db->put (NULL, &key, &value, 0)) != 0)
{
- return -EIO; // ???
+ derr(1) << "failed to write superblock: " << db_strerror (ret)
+ << endl;
+ return -EIO;
}
dout(3) << "..wrote superblock" << endl;
int OSBDB::pick_object_revision_lt(object_t& oid)
{
- if (!mounted)
- return -EINVAL;
-
- // XXX this is pretty lame. Can we do better?
- assert(oid.rev > 0);
- oid.rev--;
- while (oid.rev > 0)
- {
- if (exists (oid))
- {
- return 0;
- }
- oid.rev--;
- }
- return -EEXIST; // FIXME
+ // Not really needed.
+ return -ENOSYS;
}
bool OSBDB::exists(object_t oid)
int OSBDB::statfs (struct statfs *st)
{
- return -ENOSYS;
+ // Hacky?
+ if (::statfs (device.c_str(), st) != 0)
+ return -errno;
+ st->f_type = OSBDB_MAGIC;
+ return 0;
}
int OSBDB::stat(object_t oid, struct stat *st)
dout(2) << "remove " << oid << endl;
+ DbTxn *txn = NULL;
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
oid_t id;
mkoid(id, oid);
Dbt key (&id, sizeof (oid_t));
db->del (NULL, &key, 0);
object_inode_key _ikey = new_object_inode_key (oid);
Dbt ikey (&_ikey, sizeof_object_inode_key());
- db->del (NULL, &ikey, 0);
+ db->del (txn, &ikey, 0);
attrs_id aids = new_attrs_id (oid);
Dbt askey (&aids, sizeof_attrs_id());
Dbt asval;
asval.set_flags (DB_DBT_MALLOC);
- if (db->get (NULL, &askey, &asval, 0) == 0)
+ if (db->get (txn, &askey, &asval, 0) == 0)
{
// We have attributes; remove them.
stored_attrs *sap = (stored_attrs *) asval.get_data();
{
attr_id aid = new_attr_id (oid, sap->names[i].name);
Dbt akey (&aid, sizeof (aid));
- db->del (NULL, &akey, 0);
+ db->del (txn, &akey, 0);
}
- db->del (NULL, &askey, 0);
+ db->del (txn, &askey, 0);
}
+ if (txn)
+ txn->commit (0);
return 0;
}
if (size > 0xFFFFFFFF)
return -ENOSPC;
+ DbTxn *txn = NULL;
+
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
object_inode_key ikey = new_object_inode_key (oid);
stored_object obj;
Dbt key (&ikey, sizeof_object_inode_key());
value.set_ulen (sizeof (obj));
value.set_flags (DB_DBT_USERMEM);
- if (db->get (NULL, &key, &value, 0) != 0)
- return -ENOENT;
+ if (db->get (txn, &key, &value, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ return -ENOENT;
+ }
if (obj.length < size)
{
newVal.set_dlen (1);
newVal.set_ulen (1);
newVal.set_flags (DB_DBT_PARTIAL);
- if (db->put (NULL, &okey, &newVal, 0) != 0)
- return -EIO;
+ if (db->put (txn, &okey, &newVal, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ return -EIO;
+ }
obj.length = size;
value.set_ulen (sizeof (obj));
- if (db->put (NULL, &key, &value, 0) != 0)
- return -EIO;
+ if (db->put (txn, &key, &value, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ return -EIO;
+ }
}
else if (obj.length > size)
{
Dbt tval (&obj, sizeof (obj));
tval.set_ulen (sizeof (obj));
tval.set_flags (DB_DBT_USERMEM);
- if (db->put (NULL, &key, &tval, 0) != 0)
- return -EIO;
+ if (db->put (txn, &key, &tval, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ return -EIO;
+ }
if (size == 0)
{
char x[1];
mkoid (id, oid);
Dbt okey (&id, sizeof (oid_t));
Dbt oval (&x, 0);
- if (db->put (NULL, &okey, &oval, 0) != 0)
- return -EIO;
+ if (db->put (txn, &okey, &oval, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ return -EIO;
+ }
}
else
{
Dbt okey (&id, sizeof (oid_t));
Dbt oval;
oval.set_flags (DB_DBT_MALLOC);
- if (db->get (NULL, &okey, &oval, 0) != 0)
- return -EIO;
+ if (db->get (txn, &okey, &oval, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ return -EIO;
+ }
auto_ptr<char> ovalPtr ((char *) oval.get_data());
oval.set_size ((size_t) size);
oval.set_ulen ((size_t) size);
- if (db->put (NULL, &okey, &oval, 0) != 0)
- return -EIO;
+ if (db->put (txn, &okey, &oval, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ return -EIO;
+ }
}
}
+ if (txn)
+ txn->commit (0);
+
return 0;
}
<< len << endl;
DbTxn *txn = NULL;
- //env->txn_begin (NULL, &txn, 0);
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
object_inode_key _ikey = new_object_inode_key (oid);
stored_object obj;
int ret;
if ((ret = db->get (txn, &ikey, &ival, 0)) != 0)
{
- //txn->abort();
+ if (txn)
+ txn->abort();
derr(1) << "get returned " << db_strerror (ret) << endl;
return -ENOENT;
}
if ((ret = db->get (txn, &key, &value, 0)) != 0)
{
derr(1) << " get returned " << db_strerror (ret) << endl;
- //txn->abort();
+ if (txn)
+ txn->abort();
return -EIO;
}
}
value.set_ulen (len);
value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
dout(3) << " getting " << oid << endl;
- if ((ret = db->get (NULL, &key, &value, 0)) != 0)
+ if ((ret = db->get (txn, &key, &value, 0)) != 0)
{
derr(1) << "get returned " << db_strerror (ret) << endl;
- //txn->abort();
+ if (txn)
+ txn->abort();
return -EIO;
}
}
- //txn->commit (0);
+ if (txn)
+ txn->commit (0);
return len;
}
return -ENOSPC;
DbTxn *txn = NULL;
- //env->txn_begin (NULL, &txn, 0);
+ if (transactional)
+ env->txn_begin (txn, &txn, 0);
object_inode_key _ikey = new_object_inode_key (oid);
stored_object obj;
<< obj << endl;
if ((ret = db->put (txn, &ikey, &ival, 0)) != 0)
{
- derr(1) << " put returned " << db_strerror (ret) << endl;
+ derr(1) << " put returned " << db_strerror (ret) << endl;
+ if (txn)
+ txn->abort();
return -EIO;
}
<< obj.length << " bytes)" << endl;
if ((ret = db->put (txn, &key, &value, 0)) != 0)
{
- derr(1) << " put returned " << db_strerror (ret) << endl;
+ derr(1) << " put returned " << db_strerror (ret) << endl;
+ if (txn)
+ txn->abort();
return -EIO;
}
+
+ if (txn)
+ txn->commit (0);
return len;
}
obj.length = len;
if ((ret = db->put (txn, &ikey, &ival, 0)) != 0)
{
- derr(1) << " put returned " << db_strerror (ret) << endl;
+ derr(1) << " put returned " << db_strerror (ret) << endl;
+ if (txn)
+ txn->abort();
return -EIO;
}
}
Dbt value (bl.c_str(), len);
if (db->put (txn, &key, &value, 0) != 0)
{
+ if (txn)
+ txn->abort();
return -EIO;
}
}
if (offset + len > obj.length)
{
obj.length = (size_t) offset + len;
- if (db->put (NULL, &ikey, &ival, 0) != 0)
+ if (db->put (txn, &ikey, &ival, 0) != 0)
{
+ if (txn)
+ txn->abort();
return -EIO;
}
}
value.set_dlen (len);
value.set_ulen (len);
value.set_flags (DB_DBT_PARTIAL);
- if (db->put (NULL, &key, &value, 0) != 0)
+ if (db->put (txn, &key, &value, 0) != 0)
{
+ if (txn)
+ txn->abort();
return -EIO;
}
}
+ if (txn)
+ txn->commit (0);
return len;
}
if (exists (noid))
return -EEXIST;
+ DbTxn *txn = NULL;
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
object_inode_key _ikey = new_object_inode_key (oid);
object_inode_key _nikey = new_object_inode_key (noid);
stored_object obj;
Dbt value;
value.set_flags (DB_DBT_MALLOC);
- if (db->get (NULL, &ikey, &ival, 0) != 0)
- return -ENOENT;
- if (db->get (NULL, &key, &value, 0) != 0)
- return -ENOENT;
+ if (db->get (txn, &ikey, &ival, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ return -ENOENT;
+ }
+ if (db->get (txn, &key, &value, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ return -ENOENT;
+ }
auto_ptr<char> valueptr ((char *) value.get_data());
- if (db->put (NULL, &nikey, &ival, 0) != 0)
- return -EIO;
- if (db->put (NULL, &nkey, &value, 0) != 0)
- return -EIO;
+ if (db->put (txn, &nikey, &ival, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ return -EIO;
+ }
+ if (db->put (txn, &nkey, &value, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ return -EIO;
+ }
+ if (txn)
+ txn->commit (0);
return 0;
}
Dbt value;
value.set_flags (DB_DBT_MALLOC);
+ DbTxn *txn = NULL;
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
stored_colls *scp = NULL;
size_t sz = 0;
bool created = false;
- if (db->get (NULL, &key, &value, 0) != 0)
+ if (db->get (txn, &key, &value, 0) != 0)
{
sz = sizeof (stored_colls) + sizeof (coll_t);
scp = (stored_colls *) malloc (sz);
if (scp->count > 0)
ins = binary_search<coll_t> (scp->colls, scp->count, c);
if (scp->colls[ins] == c)
- return -EEXIST;
+ {
+ if (txn != NULL)
+ txn->abort();
+ return -EEXIST;
+ }
dout(3) << "..insertion point: " << ins << endl;
// Put the modified collection list back.
{
Dbt value2 (scp, sz);
- if (db->put (NULL, &key, &value2, 0) != 0)
+ if (db->put (txn, &key, &value2, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
return -EIO;
}
}
new_coll.count = 0;
Dbt coll_key (&c, sizeof (coll_t));
Dbt coll_value (&new_coll, sizeof (stored_coll));
- if (db->put (NULL, &coll_key, &coll_value, 0) != 0)
+ if (db->put (txn, &coll_key, &coll_value, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
return -EIO;
}
}
+ if (txn != NULL)
+ txn->commit (0);
return 0;
}
Dbt key (COLLECTIONS_KEY, 1);
Dbt value;
value.set_flags (DB_DBT_MALLOC);
+ DbTxn *txn = NULL;
- if (db->get (NULL, &key, &value, 0) != 0)
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
+ if (db->get (txn, &key, &value, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
return -ENOENT; // XXX
}
auto_ptr<stored_colls> valueBuf (scp);
if (scp->count == 0)
{
+ if (txn != NULL)
+ txn->abort();
return -ENOENT;
}
uint32_t ins = binary_search<coll_t> (scp->colls, scp->count, c);
if (scp->colls[ins] != c)
{
+ if (txn != NULL)
+ txn->abort();
return -ENOENT;
}
// Modify the record size to be one less.
Dbt nvalue (scp, value.get_size() - sizeof (coll_t));
nvalue.set_flags (DB_DBT_USERMEM);
- if (db->put (NULL, &key, &nvalue, 0) != 0)
+ if (db->put (txn, &key, &nvalue, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
return -EIO;
}
// Delete the collection.
Dbt collKey (&c, sizeof (coll_t));
- if (db->del (NULL, &collKey, 0) != 0)
+ if (db->del (txn, &collKey, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
return -EIO;
}
+ if (txn != NULL)
+ txn->commit (0);
return 0;
}
return -EINVAL;
dout(2) << "collection_stat " << c << endl;
+ // XXX is this needed?
return -ENOSYS;
}
Dbt key (&c, sizeof (coll_t));
Dbt value;
value.set_flags (DB_DBT_MALLOC);
+ DbTxn *txn = NULL;
- if (db->get (NULL, &key, &value, 0) != 0)
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
+ if (db->get (txn, &key, &value, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
return -ENOENT;
}
// Already there?
if (scp->objects[ins] == o)
{
+ if (txn != NULL)
+ txn->abort();
return -EEXIST;
}
}
dout(3) << "..collection: " << scp << endl;
Dbt nvalue (scp, sz);
- if (db->put (NULL, &key, &nvalue, 0) != 0)
+ if (db->put (txn, &key, &nvalue, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
return -EIO;
}
+ if (txn != NULL)
+ txn->commit (0);
return 0;
}
Dbt key (&c, sizeof (coll_t));
Dbt value;
value.set_flags (DB_DBT_MALLOC);
+ DbTxn *txn = NULL;
+
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
- if (db->get (NULL, &key, &value, 0) != 0)
+ if (db->get (txn, &key, &value, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
return -ENOENT;
}
if (scp->count == 0)
{
+ if (txn != NULL)
+ txn->abort();
return -ENOENT;
}
uint32_t ins = binary_search<object_t> (scp->objects, scp->count, o);
if (scp->objects[ins] != o)
{
+ if (txn != NULL)
+ txn->abort();
return -ENOENT;
}
dout(3) << "..collection " << scp << endl;
Dbt nval (scp, value.get_size() - sizeof (object_t));
- if (db->put (NULL, &key, &nval, 0) != 0)
+ if (db->put (txn, &key, &nval, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
return -EIO;
}
+ if (txn != NULL)
+ txn->commit (0);
return 0;
}
Dbt key (&c, sizeof (coll_t));
Dbt value;
- if (db->get (NULL, &key, &value, 0) != 0)
- return -ENOENT;
+ DbTxn *txn = NULL;
+
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
+ if (db->get (txn, &key, &value, 0) != 0)
+ {
+ if (txn != NULL)
+ txn->abort();
+ return -ENOENT;
+ }
stored_coll *scp = (stored_coll *) value.get_data();
auto_ptr<stored_coll> sc (scp);
for (uint32_t i = 0; i < scp->count; i++)
o.push_back (scp->objects[i]);
+ if (txn != NULL)
+ txn->commit (0);
return 0;
}
\f // Attributes
int OSBDB::_setattr(object_t oid, const char *name,
- const void *value, size_t size, Context *onsafe)
+ const void *value, size_t size, Context *onsafe,
+ DbTxn *txn)
{
if (!mounted)
return -EINVAL;
size_t sz = 0;
dout(3) << " getting " << aids << endl;
- if (db->get (NULL, &attrs_key, &attrs_val, 0) != 0)
+ if (db->get (txn, &attrs_key, &attrs_val, 0) != 0)
{
dout(2) << " first attribute" << endl;
sz = sizeof (stored_attrs);
newAttrs_val.set_ulen (sz);
newAttrs_val.set_flags (DB_DBT_USERMEM);
dout(3) << " putting " << aids << endl;
- if (db->put (NULL, &attrs_key, &newAttrs_val, 0) != 0)
+ if (db->put (txn, &attrs_key, &newAttrs_val, 0) != 0)
return -EIO;
}
else
Dbt attr_key (&aid, sizeof (aid));
Dbt attr_val ((void *) value, size);
dout(3) << " writing attribute key " << aid << endl;
- if (db->put (NULL, &attr_key, &attr_val, 0) != 0)
+ if (db->put (txn, &attr_key, &attr_val, 0) != 0)
return -EIO;
return 0;
if (!mounted)
return -EINVAL;
+ DbTxn *txn = NULL;
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
dout(2) << "setattr " << oid << ":" << name << " => ("
<< size << " bytes)" << endl;
- int ret = _setattr (oid, name, value, size, onsafe);
+ int ret = _setattr (oid, name, value, size, onsafe, txn);
+ if (ret == 0)
+ {
+ if (txn != NULL)
+ txn->commit (0);
+ }
+ else
+ {
+ if (txn != NULL)
+ txn->abort();
+ }
return ret;
}
if (!mounted)
return -EINVAL;
+ DbTxn *txn = NULL;
+
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
map<string,bufferptr>::iterator it;
for (it = aset.begin(); it != aset.end(); it++)
{
string name = it->first;
bufferptr value = it->second;
int ret = _setattr (oid, name.c_str(), value.c_str(),
- value.length(), onsafe);
+ value.length(), onsafe, txn);
if (ret != 0)
{
+ if (txn != NULL)
+ txn->abort();
return ret;
}
}
+
+ if (txn != NULL)
+ txn->commit (0);
return 0;
}
Dbt asvalue;
asvalue.set_flags (DB_DBT_MALLOC);
- if (db->get (NULL, &askey, &asvalue, 0) != 0)
- return -ENOENT;
+ DbTxn *txn = NULL;
+
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
+ if (db->get (txn, &askey, &asvalue, 0) != 0)
+ {
+ if (txn != NULL)
+ txn->abort();
+ return -ENOENT;
+ }
stored_attrs *sap = (stored_attrs *) asvalue.get_data();
auto_ptr<stored_attrs> sa (sap);
if (sap->count == 0)
- return -ENOENT;
+ {
+ if (txn != NULL)
+ txn->abort();
+ return -ENOENT;
+ }
attr_name _name;
memset(&name, 0, sizeof (_name));
strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
int ins = binary_search<attr_name> (sap->names, sap->count, _name);
if (strcmp (sap->names[ins].name, name) != 0)
- return -ENOENT;
+ {
+ if (txn != NULL)
+ txn->abort();
+ return -ENOENT;
+ }
// Shift the later elements down by one, if needed.
int n = (sap->count - ins) * sizeof (attr_name);
sap->count--;
asvalue.set_size(asvalue.get_size() - sizeof (attr_name));
int ret;
- if ((ret = db->put (NULL, &askey, &asvalue, 0)) != 0)
+ if ((ret = db->put (txn, &askey, &asvalue, 0)) != 0)
{
derr(1) << "put stored_attrs " << db_strerror (ret) << endl;
+ if (txn != NULL)
+ txn->abort();
return -EIO;
}
// Remove the attribute.
attr_id aid = new_attr_id (oid, name);
Dbt key (&aid, sizeof (aid));
- if ((ret = db->del (NULL, &key, 0)) != 0)
- derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+ if ((ret = db->del (txn, &key, 0)) != 0)
+ {
+ derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+ if (txn != NULL)
+ txn->abort();
+ return -EIO;
+ }
+ if (txn != NULL)
+ txn->commit (0);
return 0;
}
Dbt value;
value.set_flags (DB_DBT_MALLOC);
+ // XXX Transactions for read atomicity???
+
int ret;
if ((ret = db->get (NULL, &key, &value, 0)) != 0)
{
stored_attrs *sap = NULL;
size_t sz = 0;
+ DbTxn *txn = NULL;
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
dout(3) << " getting " << aids << endl;
- if (db->get (NULL, &attrs_key, &attrs_val, 0) != 0)
+ if (db->get (txn, &attrs_key, &attrs_val, 0) != 0)
{
dout(2) << " first attribute" << endl;
sz = sizeof (stored_attrs);
newAttrs_val.set_ulen (sz);
newAttrs_val.set_flags (DB_DBT_USERMEM);
dout(3) << " putting " << aids << endl;
- if (db->put (NULL, &attrs_key, &newAttrs_val, 0) != 0)
- return -EIO;
+ if (db->put (txn, &attrs_key, &newAttrs_val, 0) != 0)
+ {
+ if (txn != NULL)
+ txn->abort();
+ return -EIO;
+ }
}
else
{
Dbt attr_key (&aid, sizeof (aid));
Dbt attr_val ((void *) value, size);
dout(3) << " writing attribute key " << aid << endl;
- if (db->put (NULL, &attr_key, &attr_val, 0) != 0)
- return -EIO;
+ if (db->put (txn, &attr_key, &attr_val, 0) != 0)
+ {
+ if (txn != NULL)
+ txn->abort();
+ return -EIO;
+ }
+ if (txn != NULL)
+ txn->commit (0);
return 0;
}
Dbt asvalue;
asvalue.set_flags (DB_DBT_MALLOC);
- if (db->get (NULL, &askey, &asvalue, 0) != 0)
- return -ENOENT;
+ DbTxn *txn = NULL;
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
+ if (db->get (txn, &askey, &asvalue, 0) != 0)
+ {
+ if (txn != NULL)
+ txn->abort();
+ return -ENOENT;
+ }
stored_attrs *sap = (stored_attrs *) asvalue.get_data();
auto_ptr<stored_attrs> sa (sap);
if (sap->count == 0)
- return -ENOENT;
+ {
+ if (txn != NULL)
+ txn->abort();
+ return -ENOENT;
+ }
attr_name _name;
memset(&name, 0, sizeof (_name));
strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
int ins = binary_search<attr_name> (sap->names, sap->count, _name);
if (strcmp (sap->names[ins].name, name) != 0)
- return -ENOENT;
+ {
+ if (txn != NULL)
+ txn->abort();
+ return -ENOENT;
+ }
// Shift the later elements down by one, if needed.
int n = (sap->count - ins) * sizeof (attr_name);
sap->count--;
asvalue.set_size(asvalue.get_size() - sizeof (attr_name));
int ret;
- if ((ret = db->put (NULL, &askey, &asvalue, 0)) != 0)
+ if ((ret = db->put (txn, &askey, &asvalue, 0)) != 0)
{
derr(1) << "put stored_attrs " << db_strerror (ret) << endl;
+ if (txn != NULL)
+ txn->abort();
return -EIO;
}
// Remove the attribute.
coll_attr_id aid = new_coll_attr_id (cid, name);
Dbt key (&aid, sizeof (aid));
- if ((ret = db->del (NULL, &key, 0)) != 0)
- derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+ if ((ret = db->del (txn, &key, 0)) != 0)
+ {
+ derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+ if (txn != NULL)
+ txn->abort();
+ return -EIO;
+ }
+ if (txn != NULL)
+ txn->commit (0);
return 0;
}
dout(2) << "collection_getattr " << cid << " " << name << endl;
+ // XXX transactions/read isolation?
+
coll_attr_id caid = new_coll_attr_id (cid, name);
Dbt key (&caid, sizeof (caid));
Dbt val (value, size);
dout(2) << "collection_listattr " << cid << endl;
+ // XXX transactions/read isolation?
+
coll_attrs_id caids = new_coll_attrs_id (cid);
Dbt key (&caids, sizeof_coll_attrs_id());
Dbt value;
#include <iostream>
#include <cerrno>
+#include <vector>
#include <fcntl.h>
#include <sys/mount.h>
int main (int argc, char **argv)
{
+ vector<char *> args;
char *osd_name = "ebofs";
unsigned object_size = 1024;
unsigned object_count = 1024;
char *mountcmd = "mount /tmp/testos";
char *umountcmd = "umount /tmp/testos";
+ bool ebofs_raw_device = false;
bool inhibit_remount = (getenv("TESTOS_INHIBIT_REMOUNT") != NULL);
if (argc > 1
&& (strcmp (argv[1], "-h") == 0
|| strcmp (argv[1], "-help") == 0
- || strcmp (argv[1], "--help") == 0
- || argc > 6))
+ || strcmp (argv[1], "--help") == 0))
{
cout << "usage: " << argv[0] << " [store [object-size [object-count [iterations [seed]]]]]" << endl;
cout << endl;
exit (0);
}
+ argv_to_vec (argc, argv, args);
+ for (vector<char*>::iterator it = args.begin(); it != args.end();
+ it++)
+ cout << *it << " ";
+ cout << endl;
+ parse_config_options (args);
+ for (vector<char*>::iterator it = args.begin(); it != args.end();
+ it++)
+ cout << *it << " ";
+ cout << endl;
+
+ argc = args.size();
+ if (argc > 0)
+ osd_name = args[0];
if (argc > 1)
- osd_name = argv[1];
+ object_size = (unsigned) atol (args[1]);
if (argc > 2)
- object_size = (unsigned) atol (argv[2]);
+ object_count = (unsigned) atol (args[2]);
if (argc > 3)
- object_count = (unsigned) atol (argv[3]);
+ write_iter = (unsigned) atol (args[3]);
if (argc > 4)
- write_iter = (unsigned) atol (argv[4]);
- if (argc > 5)
- random_seed = (unsigned) atol (argv[5]);
+ random_seed = (unsigned) atol (args[4]);
// algin object size to 'long'
object_size = ((object_size + (sizeof (long) - 1)) / sizeof (long)) * sizeof (long);
strcpy (osd_file, "/tmp/testos/testos.XXXXXX");
mktemp (osd_file);
+ if (strcasecmp (osd_name, "ebofs") == 0)
+ {
+ char *dev_env = getenv ("TESTOS_EBOFS_DEV");
+ if (dev_env != NULL)
+ {
+ // Assume it is a true device.
+ strncpy (osd_file, dev_env, 32);
+ inhibit_remount = true;
+ ebofs_raw_device = true;
+ }
+ }
+
if (!inhibit_remount)
{
if (system (mountcmd) != 0)
ObjectStore *os = NULL;
if (strcasecmp (osd_name, "ebofs") == 0)
{
- FILE *f = fopen (osd_file, "w");
- if (f == NULL)
+ if (!ebofs_raw_device)
{
- cerr << "failed to open " << osd_file << ": " << strerror (errno)
- << endl;
- exit (1);
+ FILE *f = fopen (osd_file, "w");
+ if (f == NULL)
+ {
+ cerr << "failed to open " << osd_file << ": " << strerror (errno)
+ << endl;
+ exit (1);
+ }
+ // 1G file.
+ fseek (f, 1024 * 1024 * 1024, SEEK_SET);
+ fputc ('\0', f);
+ fclose (f);
}
- // 1G file.
- fseek (f, 1024 * 1024 * 1024, SEEK_SET);
- fputc ('\0', f);
- fclose (f);
- // 20K cache
- g_conf.ebofs_bc_size = 5; // times 4K
os = new Ebofs (osd_file);
}
else if (strcasecmp (osd_name, "osbdb") == 0)
{
- char *e = getenv ("OSBDB_FFACTOR");
- if (e != NULL)
- g_conf.bdbstore_ffactor = atol(e);
- e = getenv ("OSBDB_NELEM");
- if (e != NULL)
- g_conf.bdbstore_nelem = atol(e);
- e = getenv ("OSBDB_PAGESIZE");
- if (e != NULL)
- g_conf.bdbstore_pagesize = atol(e);
- g_conf.debug_bdbstore = 1;
- // 20K cache
- g_conf.bdbstore_cachesize = 20 * 1024;
os = new OSBDB (osd_file);
}
else if (strcasecmp (osd_name, "osbdb-btree") == 0)
{
g_conf.bdbstore_btree = true;
- // 20K cache
- g_conf.bdbstore_cachesize = 20 * 1024;
os = new OSBDB (osd_file);
}
else
os->mount();
+ // Shuffle the OIDs.
+ for (int j = 0; j < object_count; j++)
+ {
+ int x = random() % object_count;
+ if (x < 0)
+ x = -x;
+ object_t o = oids[j];
+ oids[j] = oids[x];
+ oids[x] = o;
+ }
+
begin = g_clock.now();
for (unsigned o = 0; o < object_count; o++)
{
cerr << "Finished in " << (total_write + total_read) << endl;
- double write_mean = (double) total_write / write_iter;
+ double write_mean = ((double) total_write) / ((double) write_iter);
double write_sd = 0.0;
for (unsigned i = 0; i < write_iter; i++)
{
- double x = (double) writes[i] - write_mean;
+ double x = ((double) writes[i]) - write_mean;
write_sd += x * x;
}
- write_sd = sqrt (write_sd / write_iter);
+ write_sd = sqrt (write_sd / ((double) write_iter));
- double read_mean = (double) total_read / write_iter;
+ double read_mean = ((double) total_read) / ((double) write_iter);
double read_sd = 0.0;
for (unsigned i = 0; i < write_iter; i++)
{
- double x = (double) reads[i] - read_mean;
+ double x = ((double) reads[i]) - read_mean;
write_sd += x * x;
}
- read_sd = sqrt (read_sd / write_iter);
+ read_sd = sqrt (read_sd / ((double) write_iter));
cout << "TESTOS: write " << osd_name << ":" << object_size << ":"
<< object_count << ":" << write_iter << ":" << random_seed
<< " -- " << write_mean << " " << write_sd << endl;
+ cout << "TESTOS: write.raw -- ";
+ for (int i = 0; i < write_iter; i++)
+ cout << ((double) writes[i]) << " ";
+ cout << endl;
+
cout << "TESTOS: read " << osd_name << ":" << object_size << ":"
<< object_count << ":" << write_iter << ":" << random_seed
<< " -- " << read_mean << " " << read_sd << endl;
+ cout << "TESTOS: read.raw -- ";
+ for (int i = 0; i < write_iter; i++)
+ cout << ((double) reads[i]) << " ";
+ cout << endl;
+
unlink (osd_file);
if (!inhibit_remount)
{
--- /dev/null
+/* testosbdb.cc -- test OSBDB.
+ Copyright (C) 2007 Casey Marshall <csm@soe.ucsc.edu> */
+
+
+#include <iostream>
+#include "osbdb/OSBDB.h"
+
+using namespace std;
+
+int
+main (int argc, char **argv)
+{
+ vector<char *> args;
+ argv_to_vec (argc, argv, args);
+ parse_config_options (args);
+
+ g_conf.debug_bdbstore = 3;
+ //g_conf.bdbstore_btree = true;
+ char dbfile[256];
+ strncpy (dbfile, "/tmp/testosbdb/db.XXXXXX", 256);
+ mktemp (dbfile);
+ OSBDB *os = new OSBDB(dbfile);
+ auto_ptr<OSBDB> osPtr (os);
+ os->mkfs();
+ os->mount();
+
+ // Put an object.
+ object_t oid (0xDEADBEEF00000000ULL, 0xFEEDFACE);
+
+ cout << "sizeof oid_t is " << sizeof (oid_t) << endl;
+ cout << "offsetof oid_t.id " << offsetof (oid_t, id) << endl;
+
+ cout << sizeof (object_t) << endl;
+ cout << sizeof (oid.ino) << endl;
+ cout << sizeof (oid.bno) << endl;
+ cout << sizeof (oid.rev) << endl;
+
+ // Shouldn't be there.
+ if (os->exists (oid))
+ {
+ cout << "FAIL: oid shouldn't be there " << oid << endl;
+ }
+
+ // Write an object.
+ char *x = (char *) malloc (1024);
+ memset(x, 0xaa, 1024);
+ bufferptr bp (x, 1024);
+ bufferlist bl;
+ bl.push_back (bp);
+
+ if (os->write (oid, 0L, 1024, bl, NULL) != 1024)
+ {
+ cout << "FAIL: writing object" << endl;
+ }
+
+ os->sync();
+
+ // Should be there.
+ if (!os->exists (oid))
+ {
+ cout << "FAIL: oid should be there: " << oid << endl;
+ }
+
+ memset(x, 0, 1024);
+ if (os->read (oid, 0, 1024, bl) != 1024)
+ {
+ cout << "FAIL: reading object" << endl;
+ }
+
+ for (int i = 0; i < 1024; i++)
+ {
+ if ((x[i] & 0xFF) != 0xaa)
+ {
+ cout << "FAIL: data read out is different" << endl;
+ break;
+ }
+ }
+
+ // Set some attributes
+ if (os->setattr (oid, "alpha", "value", strlen ("value")) != 0)
+ {
+ cout << "FAIL: set attribute" << endl;
+ }
+ if (os->setattr (oid, "beta", "value", strlen ("value")) != 0)
+ {
+ cout << "FAIL: set attribute" << endl;
+ }
+ if (os->setattr (oid, "gamma", "value", strlen ("value")) != 0)
+ {
+ cout << "FAIL: set attribute" << endl;
+ }
+ if (os->setattr (oid, "fred", "value", strlen ("value")) != 0)
+ {
+ cout << "FAIL: set attribute" << endl;
+ }
+
+ char *attrs = (char *) malloc (1024);
+ if (os->listattr (oid, attrs, 1024) != 0)
+ {
+ cout << "FAIL: listing attributes" << endl;
+ }
+ else
+ {
+ char *p = attrs;
+ if (strcmp (p, "alpha") != 0)
+ {
+ cout << "FAIL: should be \"alpha:\" \"" << p << "\"" << endl;
+ }
+ p = p + strlen (p) + 1;
+ if (strcmp (p, "beta") != 0)
+ {
+ cout << "FAIL: should be \"beta:\" \"" << p << "\"" << endl;
+ }
+ p = p + strlen (p) + 1;
+ if (strcmp (p, "fred") != 0)
+ {
+ cout << "FAIL: should be \"fred:\" \"" << p << "\"" << endl;
+ }
+ p = p + strlen (p) + 1;
+ if (strcmp (p, "gamma") != 0)
+ {
+ cout << "FAIL: should be \"gamma:\" \"" << p << "\"" << endl;
+ }
+ }
+
+ coll_t cid = 0xCAFEBABE;
+ if (os->create_collection (cid) != 0)
+ {
+ cout << "FAIL: create_collection" << endl;
+ }
+ if (os->create_collection (cid + 10) != 0)
+ {
+ cout << "FAIL: create_collection" << endl;
+ }
+ if (os->create_collection (cid + 5) != 0)
+ {
+ cout << "FAIL: create_collection" << endl;
+ }
+ if (os->create_collection (42) != 0)
+ {
+ cout << "FAIL: create_collection" << endl;
+ }
+
+ if (os->collection_add (cid, oid) != 0)
+ {
+ cout << "FAIL: collection_add" << endl;
+ }
+
+ list<coll_t> ls;
+ if (os->list_collections (ls) < 0)
+ {
+ cout << "FAIL: list_collections" << endl;
+ }
+ cout << "collections: ";
+ for (list<coll_t>::iterator it = ls.begin(); it != ls.end(); it++)
+ {
+ cout << *it << ", ";
+ }
+ cout << endl;
+
+ if (os->destroy_collection (0xCAFEBABE + 10) != 0)
+ {
+ cout << "FAIL: destroy_collection" << endl;
+ }
+
+ if (os->destroy_collection (0xCAFEBADE + 10) == 0)
+ {
+ cout << "FAIL: destroy_collection" << endl;
+ }
+
+ object_t oid2 (12345, 12345);
+ for (int i = 0; i < 8; i++)
+ {
+ oid2.rev++;
+ if (os->collection_add (cid, oid2) != 0)
+ {
+ cout << "FAIL: collection_add" << endl;
+ }
+ }
+ for (int i = 0; i < 8; i++)
+ {
+ if (os->collection_remove (cid, oid2) != 0)
+ {
+ cout << "FAIL: collection_remove" << endl;
+ }
+ oid2.rev--;
+ }
+
+ // Truncate the object.
+ if (os->truncate (oid, 512, NULL) != 0)
+ {
+ cout << "FAIL: truncate" << endl;
+ }
+
+ // Expand the object.
+ if (os->truncate (oid, 1200, NULL) != 0)
+ {
+ cout << "FAIL: expand" << endl;
+ }
+
+ // Delete the object.
+ if (os->remove (oid) != 0)
+ {
+ cout << "FAIL: could not remove object" << endl;
+ }
+
+ // Shouldn't be there
+ if (os->exists (oid))
+ {
+ cout << "FAIL: should not be there" << endl;
+ }
+
+ os->sync();
+ exit (0);
+}