template<typename T>
uint32_t binary_search (T *array, size_t size, T key)
{
- int c = size >> 1;
- int i = c;
+ int low = 0;
+ int high = size;
+ int p = (low + high) / 2;
- while (c != 0)
+ while (low < high - 1)
{
- c = c >> 1;
- if (array[i] < key)
+ if (array[p] > key)
{
- i = i + c;
+ high = p;
}
- else if (array[i] > key)
+ else if (array[p] < key)
{
- i = i - c;
+ low = p;
}
else
- return i;
+ return p;
+
+ p = (low + high) / 2;
}
- if (array[i] < key)
- i++;
- return i;
+ if (array[p] < key)
+ p++;
+ else if (array[p] > key && p > 0)
+ p--;
+ return p;
}
\f // Management.
+int OSBDB::opendb(DBTYPE type, int flags)
+{
+ db = new Db(env, 0);
+ db->set_error_stream (&std::cerr);
+ db->set_message_stream (&std::cout);
+ db->set_flags (0);
+ if (!g_conf.bdbstore_btree)
+ {
+ if (g_conf.bdbstore_pagesize > 0)
+ db->set_pagesize (g_conf.bdbstore_pagesize);
+ if (g_conf.bdbstore_ffactor > 0 && g_conf.bdbstore_nelem > 0)
+ {
+ db->set_h_ffactor (g_conf.bdbstore_ffactor);
+ db->set_h_nelem (g_conf.bdbstore_nelem);
+ }
+ }
+ if (g_conf.bdbstore_cachesize > 0)
+ {
+ db->set_cachesize (0, g_conf.bdbstore_cachesize, 0);
+ }
+
+ int ret;
+ if ((ret = db->open (NULL, device.c_str(), NULL, type, flags, 0)) != 0)
+ {
+ derr(1) << "failed to open database: " << device << ": "
+ << strerror(ret) << std::endl;
+ return -EINVAL;
+ }
+ opened = true;
+ return 0;
+}
+
int OSBDB::mount()
{
+ dout(2) << "mount " << device << endl;
+
+ if (mounted)
+ return 0;
+
+ if (!opened)
+ {
+ int ret;
+ if ((ret = opendb ()) != 0)
+ return ret;
+ }
+
// XXX Do we want anything else in the superblock?
Dbt key (OSBDB_SUPERBLOCK_KEY, 1);
value.set_dlen (sizeof (super));
value.set_ulen (sizeof (super));
value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
- if (db.get (NULL, &key, &value, 0) != 0)
+
+ if (db->get (NULL, &key, &value, 0) != 0)
return -EINVAL; // XXX how to say "badly formed fs?"
+
+ dout(2) << ".mount " << super << endl;
+
if (super.version != OSBDB_THIS_VERSION)
return -EINVAL;
DBTYPE t;
- db.get_type (&t);
+ db->get_type (&t);
if (t == DB_BTREE)
{
u_int32_t minkey;
- db.get_bt_minkey (&minkey);
+ u_int32_t flags;
+ db->get_bt_minkey (&minkey);
+ db->get_flags (&flags);
dout(1) << "mounted version " << OSBDB_THIS_VERSION << "; Btree; "
- << "min keys per page: " << minkey << endl;
+ << "min keys per page: " << minkey << "; flags: "
+ << hex << flags << endl;
+ cout << dec;
}
else
{
u_int32_t ffactor;
u_int32_t nelem;
- db.get_h_ffactor (&ffactor);
- db.get_h_nelem (&nelem);
+ u_int32_t flags;
+ db->get_h_ffactor (&ffactor);
+ db->get_h_nelem (&nelem);
+ db->get_flags (&flags);
dout(1) << "mounted version " << OSBDB_THIS_VERSION << "; Hash; "
<< "fill factor: " << ffactor
- << " table size: " << nelem << endl;
+ << " table size: " << nelem << "; flags: "
+ << hex << flags << endl;
+ cout << dec;
}
+ mounted = true;
return 0;
}
int OSBDB::umount()
{
- // Anything?
+ if (!mounted)
+ return -EINVAL;
+ sync();
+ int ret;
+ if (opened)
+ {
+ if ((ret = db->close (0)) != 0)
+ {
+ derr(1) << "close: " << db_strerror(ret) << endl;
+ return -EINVAL;
+ }
+ delete db;
+ db = NULL;
+ }
+ mounted = false;
+ opened = false;
return 0;
}
int OSBDB::mkfs()
{
+ if (mounted)
+ return -EINVAL;
+
+ dout(2) << "mkfs" << endl;
+
+ unlink (device.c_str());
+ int ret;
+ if ((ret = opendb((g_conf.bdbstore_btree ? DB_BTREE : DB_HASH), DB_CREATE)) != 0)
+ {
+ derr(1) << "failed to open database: " << device << ": "
+ << strerror(ret) << std::endl;
+ return -EINVAL;
+ }
+ opened = true;
+ dout(3) << "..opened " << device << endl;
+
uint32_t c;
- int ret = db.truncate (NULL, &c, 0);
+ ret = db->truncate (NULL, &c, 0);
if (ret != 0)
- return -EIO; // ???
+ {
+ return -EIO; // ???
+ }
Dbt key (OSBDB_SUPERBLOCK_KEY, 1);
struct stored_superblock sb;
sb.version = OSBDB_THIS_VERSION;
Dbt value (&sb, sizeof (sb));
- if (db.put (NULL, &key, &value, 0) != 0)
- return -EIO; // ???
+ dout(3) << "..writing superblock" << endl;
+ if (db->put (NULL, &key, &value, 0) != 0)
+ {
+ return -EIO; // ???
+ }
+ dout(3) << "..wrote superblock" << endl;
return 0;
}
int OSBDB::pick_object_revision_lt(object_t& oid)
{
- return -1; // FIXME
+ if (!mounted)
+ return -EINVAL;
+
+ // XXX this is pretty lame. Can we do better?
+ assert(oid.rev > 0);
+ oid.rev--;
+ while (oid.rev > 0)
+ {
+ if (exists (oid))
+ {
+ return 0;
+ }
+ oid.rev--;
+ }
+ return -EEXIST; // FIXME
}
bool OSBDB::exists(object_t oid)
{
+ dout(2) << "exists " << oid << endl;
struct stat st;
return (stat (oid, &st) == 0);
}
int OSBDB::statfs (struct statfs *st)
{
- return 0;
+ return -ENOSYS;
}
int OSBDB::stat(object_t oid, struct stat *st)
{
+ if (!mounted)
+ return -EINVAL;
+
+ dout(2) << "stat " << oid << endl;
+
+ object_inode_key ikey = new_object_inode_key(oid);
stored_object obj;
- Dbt key (&oid, sizeof (object_t));
+ Dbt key (&ikey, sizeof_object_inode_key());
Dbt value (&obj, sizeof (obj));
- value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
+ value.set_flags (DB_DBT_USERMEM);
value.set_ulen (sizeof (obj));
- if (db.get (NULL, &key, &value, 0) != 0)
- return -ENOENT;
+ dout(3) << " lookup " << ikey << endl;
+ int ret;
+ if ((ret = db->get (NULL, &key, &value, 0)) != 0)
+ {
+ derr(1) << " get returned " << ret << endl;
+ return -ENOENT;
+ }
st->st_size = obj.length;
+ dout(3) << "stat length:" << obj.length << endl;
return 0;
}
int OSBDB::remove(object_t oid, Context *onsafe)
{
- Dbt key (&oid, sizeof (object_t));
- db.del (NULL, &key, 0);
+ if (!mounted)
+ return -EINVAL;
- // FIXME remove attributes for this object.
+ dout(2) << "remove " << oid << endl;
+
+ oid_t id;
+ mkoid(id, oid);
+ Dbt key (&id, sizeof (oid_t));
+ db->del (NULL, &key, 0);
+ object_inode_key _ikey = new_object_inode_key (oid);
+ Dbt ikey (&_ikey, sizeof_object_inode_key());
+ db->del (NULL, &ikey, 0);
+
+ attrs_id aids = new_attrs_id (oid);
+ Dbt askey (&aids, sizeof_attrs_id());
+ Dbt asval;
+ asval.set_flags (DB_DBT_MALLOC);
+ if (db->get (NULL, &askey, &asval, 0) == 0)
+ {
+ // We have attributes; remove them.
+ stored_attrs *sap = (stored_attrs *) asval.get_data();
+ auto_ptr<stored_attrs> sa (sap);
+ for (unsigned i = 0; i < sap->count; i++)
+ {
+ attr_id aid = new_attr_id (oid, sap->names[i].name);
+ Dbt akey (&aid, sizeof (aid));
+ db->del (NULL, &akey, 0);
+ }
+ db->del (NULL, &askey, 0);
+ }
return 0;
}
int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
{
+ if (!mounted)
+ return -EINVAL;
+
+ dout(2) << "truncate " << size << endl;
+
if (size > 0xFFFFFFFF)
return -ENOSPC;
+ object_inode_key ikey = new_object_inode_key (oid);
stored_object obj;
- Dbt key (&oid, sizeof (oid));
+ Dbt key (&ikey, sizeof_object_inode_key());
Dbt value (&obj, sizeof (obj));
value.set_dlen (sizeof (obj));
value.set_ulen (sizeof (obj));
- value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
+ value.set_flags (DB_DBT_USERMEM);
- if (db.get (NULL, &key, &value, 0) != 0)
- {
- return -ENOENT;
- }
+ if (db->get (NULL, &key, &value, 0) != 0)
+ return -ENOENT;
if (obj.length < size)
{
+ oid_t id;
+ mkoid (id, oid);
+ Dbt okey (&id, sizeof (oid_t));
char b[] = { '\0' };
Dbt newVal (b, 1);
- newVal.set_doff ((size_t) size + offsetof (stored_object, data) - 1);
+ newVal.set_doff ((size_t) size);
newVal.set_dlen (1);
newVal.set_ulen (1);
newVal.set_flags (DB_DBT_PARTIAL);
- if (db.put (NULL, &key, &newVal, 0) != 0)
- {
- return -EIO;
- }
+ if (db->put (NULL, &okey, &newVal, 0) != 0)
+ return -EIO;
obj.length = size;
- value.set_doff (0);
- value.set_dlen (sizeof (off_t));
- value.set_ulen (sizeof (off_t));
- value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
- if (db.put (NULL, &key, &value, 0) != 0)
- {
- return -EIO;
- }
+ value.set_ulen (sizeof (obj));
+ if (db->put (NULL, &key, &value, 0) != 0)
+ return -EIO;
}
else if (obj.length > size)
{
+ obj.length = size;
+ Dbt tval (&obj, sizeof (obj));
+ tval.set_ulen (sizeof (obj));
+ tval.set_flags (DB_DBT_USERMEM);
+ if (db->put (NULL, &key, &tval, 0) != 0)
+ return -EIO;
if (size == 0)
{
- obj.length = 0;
- Dbt tval (&obj, sizeof (obj));
- tval.set_ulen (sizeof (obj));
- tval.set_flags (DB_DBT_USERMEM);
- if (db.put (NULL, &key, &tval, 0) != 0)
- {
- return -EIO;
- }
- }
- else if (size < 2 * 1024 * 1024) // XXX
- {
- size_t newSz = sizeof (stored_object) + (size_t) size;
- auto_ptr<stored_object> obj2 ((stored_object *) malloc (newSz));
- stored_object *objp = obj2.get();
- Dbt tval (objp, newSz);
- tval.set_doff (0);
- tval.set_dlen (newSz);
- tval.set_ulen (newSz);
- tval.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
- if (db.get (NULL, &key, &tval, 0) != 0)
- {
- return -EIO;
- }
-
- objp->length = size;
- tval.set_ulen (newSz);
- tval.set_size (newSz);
- tval.set_flags (DB_DBT_USERMEM);
- if (db.put (NULL, &key, &tval, 0) != 0)
- {
- return -EIO;
- }
+ char x[1];
+ oid_t id;
+ mkoid (id, oid);
+ Dbt okey (&id, sizeof (oid_t));
+ Dbt oval (&x, 0);
+ if (db->put (NULL, &okey, &oval, 0) != 0)
+ return -EIO;
}
else
{
- // XXX FIXME
- std::cerr << "WARNING: punting on truncating file to size "
- << size << std::endl;
- obj.length = size;
- value.set_doff (0);
- value.set_dlen (sizeof (off_t));
- value.set_ulen (sizeof (off_t));
- value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
- db.put (NULL, &key, &value, 0);
+ oid_t id;
+ mkoid (id, oid);
+ Dbt okey (&id, sizeof (oid_t));
+ Dbt oval;
+ oval.set_flags (DB_DBT_MALLOC);
+ if (db->get (NULL, &okey, &oval, 0) != 0)
+ return -EIO;
+ auto_ptr<char> ovalPtr ((char *) oval.get_data());
+ oval.set_size ((size_t) size);
+ oval.set_ulen ((size_t) size);
+ if (db->put (NULL, &okey, &oval, 0) != 0)
+ return -EIO;
}
}
int OSBDB::read(object_t oid, off_t offset, size_t len, bufferlist& bl)
{
- stored_object obj;
- Dbt key (&oid, sizeof (oid));
- Dbt value (&obj, sizeof (obj));
+ if (!mounted)
+ return -EINVAL;
- value.set_ulen (sizeof (obj));
- value.set_dlen (sizeof (obj));
- value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
-
- if (db.get (NULL, &key, &value, 0) != 0)
+ dout(2) << "read " << oid << " " << offset << " "
+ << len << endl;
+
+ DbTxn *txn = NULL;
+ //env->txn_begin (NULL, &txn, 0);
+
+ object_inode_key _ikey = new_object_inode_key (oid);
+ stored_object obj;
+ Dbt ikey (&_ikey, sizeof_object_inode_key());
+ Dbt ival (&obj, sizeof (obj));
+ ival.set_flags (DB_DBT_USERMEM);
+ ival.set_ulen (sizeof(obj));
+
+ dout(3) << " get " << _ikey << endl;
+ int ret;
+ if ((ret = db->get (txn, &ikey, &ival, 0)) != 0)
{
+ //txn->abort();
+ derr(1) << "get returned " << db_strerror (ret) << endl;
return -ENOENT;
}
- if (offset >= obj.length)
- return 0;
-
- char *buf = bl.c_str();
- Dbt dval (buf, len);
- dval.set_doff (offsetof (stored_object, data) + (size_t) offset);
- dval.set_dlen (len);
- dval.set_ulen (len);
- dval.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
- if (db.get (NULL, &key, &dval, 0) != 0)
+ if (offset == 0 && len >= obj.length)
{
- return -EIO;
+ len = obj.length;
+ dout(3) << " doing full read of " << len << endl;
+ oid_t id;
+ mkoid (id, oid);
+ Dbt key (&id, sizeof (oid_t));
+ Dbt value (bl.c_str(), len);
+ value.set_ulen (len);
+ value.set_flags (DB_DBT_USERMEM);
+ dout(3) << " getting " << oid << endl;
+ if ((ret = db->get (txn, &key, &value, 0)) != 0)
+ {
+ derr(1) << " get returned " << db_strerror (ret) << endl;
+ //txn->abort();
+ return -EIO;
+ }
+ }
+ else
+ {
+ if (offset > obj.length)
+ return 0;
+ if (offset + len > obj.length)
+ len = obj.length - (size_t) offset;
+ dout(3) << " doing partial read of " << len << endl;
+ oid_t id;
+ mkoid (id, oid);
+ Dbt key (&id, sizeof (oid));
+ Dbt value (bl.c_str(), len);
+ value.set_doff ((size_t) offset);
+ value.set_dlen (len);
+ value.set_ulen (len);
+ value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
+ dout(3) << " getting " << oid << endl;
+ if ((ret = db->get (NULL, &key, &value, 0)) != 0)
+ {
+ derr(1) << "get returned " << db_strerror (ret) << endl;
+ //txn->abort();
+ return -EIO;
+ }
}
- return dval.get_size();
+ //txn->commit (0);
+ return len;
}
int OSBDB::write(object_t oid, off_t offset, size_t len,
bufferlist& bl, Context *onsafe)
{
- stored_object obj;
- Dbt key (&oid, sizeof (oid));
- Dbt value (&obj, sizeof (obj));
+ if (!mounted)
+ return -EINVAL;
- if (db.get (NULL, &key, &value, 0) != 0)
- {
- obj.length = 0;
- }
+ dout(2) << "write " << oid << " " << offset << " "
+ << len << endl;
+
+ if (offset > 0xFFFFFFFFL || offset + len > 0xFFFFFFFFL)
+ return -ENOSPC;
+
+ DbTxn *txn = NULL;
+ //env->txn_begin (NULL, &txn, 0);
- if (offset + len > obj.length)
+ object_inode_key _ikey = new_object_inode_key (oid);
+ stored_object obj;
+ Dbt ikey (&_ikey, sizeof_object_inode_key());
+ Dbt ival (&obj, sizeof (obj));
+ ival.set_ulen (sizeof (obj));
+ ival.set_flags (DB_DBT_USERMEM);
+
+ int ret;
+ dout(3) << " getting " << _ikey << endl;
+ if (db->get (txn, &ikey, &ival, 0) != 0)
{
+ dout(3) << " writing new object" << endl;
+
+ // New object.
obj.length = (size_t) offset + len;
- value.set_dlen (offsetof (stored_object, data));
- value.set_ulen (offsetof (stored_object, data));
- value.set_flags (DB_DBT_PARTIAL);
- if (db.put (NULL, &key, &value, 0) != 0)
+ dout(3) << " mapping " << _ikey << " => "
+ << obj << endl;
+ if ((ret = db->put (txn, &ikey, &ival, 0)) != 0)
{
+ derr(1) << " put returned " << db_strerror (ret) << endl;
return -EIO;
}
+
+ oid_t id;
+ mkoid (id, oid);
+ Dbt key (&id, sizeof (oid_t));
+ Dbt value (bl.c_str(), len);
+ if (offset == 0) // whole object
+ {
+ value.set_flags (DB_DBT_USERMEM);
+ value.set_ulen (len);
+ }
+ else
+ {
+ value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
+ value.set_ulen (len);
+ value.set_doff ((size_t) offset);
+ value.set_dlen (len);
+ }
+ dout(3) << " mapping " << oid << " => ("
+ << obj.length << " bytes)" << endl;
+ if ((ret = db->put (txn, &key, &value, 0)) != 0)
+ {
+ derr(1) << " put returned " << db_strerror (ret) << endl;
+ return -EIO;
+ }
+ return len;
}
- char *buf = bl.c_str();
- Dbt dval (buf, len);
- dval.set_doff (offsetof (stored_object, data) + (size_t) offset);
- dval.set_dlen (len);
- dval.set_ulen (len);
- dval.set_flags (DB_DBT_PARTIAL);
- if (db.put (NULL, &key, &dval, 0) != 0)
+ if (offset == 0 && len >= obj.length)
{
- return -EIO;
+ if (len != obj.length)
+ {
+ obj.length = len;
+ if ((ret = db->put (txn, &ikey, &ival, 0)) != 0)
+ {
+ derr(1) << " put returned " << db_strerror (ret) << endl;
+ return -EIO;
+ }
+ }
+ oid_t id;
+ mkoid(id, oid);
+ Dbt key (&id, sizeof (oid_t));
+ Dbt value (bl.c_str(), len);
+ if (db->put (txn, &key, &value, 0) != 0)
+ {
+ return -EIO;
+ }
+ }
+ else
+ {
+ if (offset + len > obj.length)
+ {
+ obj.length = (size_t) offset + len;
+ if (db->put (NULL, &ikey, &ival, 0) != 0)
+ {
+ return -EIO;
+ }
+ }
+ oid_t id;
+ mkoid(id, oid);
+ Dbt key (&id, sizeof (oid_t));
+ Dbt value (bl.c_str(), len);
+ value.set_doff ((size_t) offset);
+ value.set_dlen (len);
+ value.set_ulen (len);
+ value.set_flags (DB_DBT_PARTIAL);
+ if (db->put (NULL, &key, &value, 0) != 0)
+ {
+ return -EIO;
+ }
}
return len;
int OSBDB::clone(object_t oid, object_t noid)
{
- struct stored_object obj;
+ if (!mounted)
+ return -EINVAL;
+
+ dout(2) << "clone " << oid << ", " << noid << endl;
if (exists (noid))
return -EEXIST;
- Dbt key (&oid, sizeof (oid));
- Dbt value (&obj, sizeof (obj));
- Dbt newKey (&noid, sizeof (noid));
+ object_inode_key _ikey = new_object_inode_key (oid);
+ object_inode_key _nikey = new_object_inode_key (noid);
+ stored_object obj;
+ Dbt ikey (&_ikey, sizeof_object_inode_key());
+ Dbt ival (&obj, sizeof (obj));
+ Dbt nikey (&_nikey, sizeof_object_inode_key());
+ ival.set_ulen (sizeof (obj));
+ ival.set_flags (DB_DBT_USERMEM);
+
+ oid_t id, nid;
+ mkoid(id, oid);
+ mkoid(nid, noid);
+ Dbt key (&id, sizeof (oid_t));
+ Dbt nkey (&oid, sizeof (oid_t));
+ Dbt value;
+ value.set_flags (DB_DBT_MALLOC);
- value.set_ulen (sizeof (obj));
- value.set_dlen (sizeof (obj));
- if (db.get (NULL, &key, &value, 0) != 0)
+ if (db->get (NULL, &ikey, &ival, 0) != 0)
return -ENOENT;
+ if (db->get (NULL, &key, &value, 0) != 0)
+ return -ENOENT;
+ auto_ptr<char> valueptr ((char *) value.get_data());
- db.put (NULL, &newKey, &value, 0);
-
- auto_ptr<char> buf (new char[4096]);
- for (size_t o = 0; o < obj.length; o += 4096)
- {
- Dbt block (buf.get(), 4096);
- block.set_ulen (4096);
- block.set_doff (offsetof (stored_object, data) + o);
- block.set_dlen (4096);
- block.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
- if (db.get (NULL, &key, &block, 0) != 0)
- {
- return -EIO;
- }
-
- block.set_flags (DB_DBT_PARTIAL);
- if (db.put (NULL, &newKey, &block, 0) != 0)
- {
- return -EIO;
- }
- }
+ if (db->put (NULL, &nikey, &ival, 0) != 0)
+ return -EIO;
+ if (db->put (NULL, &nkey, &value, 0) != 0)
+ return -EIO;
return 0;
}
int OSBDB::list_collections(list<coll_t>& ls)
{
+ if (!mounted)
+ return -EINVAL;
+
+ dout(2) << "list_collections" << endl;
+
Dbt key (COLLECTIONS_KEY, 1);
Dbt value;
value.set_flags (DB_DBT_MALLOC);
- if (db.get (NULL, &key, &value, 0) != 0)
+ if (db->get (NULL, &key, &value, 0) != 0)
return 0; // no collections.
auto_ptr<stored_colls> sc ((stored_colls *) value.get_data());
int OSBDB::create_collection(coll_t c, Context *onsafe)
{
+ if (!mounted)
+ return -EINVAL;
+
+ dout(2) << "create_collection " << c << endl;
+
Dbt key (COLLECTIONS_KEY, 1);
Dbt value;
value.set_flags (DB_DBT_MALLOC);
stored_colls *scp = NULL;
size_t sz = 0;
bool created = false;
- if (db.get (NULL, &key, &value, 0) != 0)
+ if (db->get (NULL, &key, &value, 0) != 0)
{
sz = sizeof (stored_colls) + sizeof (coll_t);
scp = (stored_colls *) malloc (sz);
if (scp->colls[ins] == c)
return -EEXIST;
+ dout(3) << "..insertion point: " << ins << endl;
+
// Make room for a new collection ID.
if (!created)
{
sz += sizeof (coll_t);
+ dout(3) << "..increase size to " << sz << endl;
stored_colls *scp2 = (stored_colls *) realloc (scp, sz);
+ sc.release ();
sc.reset (scp2);
scp = scp2;
}
- memmove (&scp->colls[ins], &scp->colls[ins + 1],
- (scp->count - ins) * sizeof (coll_t));
+ int n = (scp->count - ins) * sizeof (coll_t);
+ if (n > 0)
+ {
+ dout(3) << "..moving " << n << " bytes up" << endl;
+ memmove (&scp->colls[ins + 1], &scp->colls[ins], n);
+ }
+ scp->count++;
scp->colls[ins] = c;
+ dout(3) << "..collections: " << scp << endl;
+
// Put the modified collection list back.
{
Dbt value2 (scp, sz);
- if (db.put (NULL, &key, &value, 0) != 0)
+ if (db->put (NULL, &key, &value2, 0) != 0)
{
return -EIO;
}
new_coll.count = 0;
Dbt coll_key (&c, sizeof (coll_t));
Dbt coll_value (&new_coll, sizeof (stored_coll));
- if (db.put (NULL, &coll_key, &coll_value, 0) != 0)
+ if (db->put (NULL, &coll_key, &coll_value, 0) != 0)
{
return -EIO;
}
int OSBDB::destroy_collection(coll_t c, Context *onsafe)
{
+ if (!mounted)
+ return -EINVAL;
+
+ dout(2) << "destroy_collection " << c << endl;
+
Dbt key (COLLECTIONS_KEY, 1);
Dbt value;
value.set_flags (DB_DBT_MALLOC);
- if (db.get (NULL, &key, &value, 0) != 0)
+ if (db->get (NULL, &key, &value, 0) != 0)
{
return -ENOENT; // XXX
}
// Modify the record size to be one less.
Dbt nvalue (scp, value.get_size() - sizeof (coll_t));
nvalue.set_flags (DB_DBT_USERMEM);
- if (db.put (NULL, &key, &nvalue, 0) != 0)
+ if (db->put (NULL, &key, &nvalue, 0) != 0)
{
return -EIO;
}
// Delete the collection.
Dbt collKey (&c, sizeof (coll_t));
- if (db.del (NULL, &collKey, 0) != 0)
+ if (db->del (NULL, &collKey, 0) != 0)
{
return -EIO;
}
bool OSBDB::collection_exists(coll_t c)
{
+ if (!mounted)
+ return -EINVAL;
+
+ dout(2) << "collection_exists " << c << endl;
+
Dbt key (COLLECTIONS_KEY, 1);
Dbt value;
value.set_flags (DB_DBT_MALLOC);
- if (db.get (NULL, &key, &value, 0) != 0)
+ if (db->get (NULL, &key, &value, 0) != 0)
return false;
stored_colls *scp = (stored_colls *) value.get_data();
int OSBDB::collection_stat(coll_t c, struct stat *st)
{
+ if (!mounted)
+ return -EINVAL;
+
+ dout(2) << "collection_stat " << c << endl;
return -ENOSYS;
}
int OSBDB::collection_add(coll_t c, object_t o, Context *onsafe)
{
+ if (!mounted)
+ return -EINVAL;
+
+ dout(2) << "collection_add " << c << " " << o << endl;
+
Dbt key (&c, sizeof (coll_t));
Dbt value;
value.set_flags (DB_DBT_MALLOC);
- if (db.get (NULL, &key, &value, 0) != 0)
+ if (db->get (NULL, &key, &value, 0) != 0)
{
return -ENOENT;
}
// Make room for the new value, and add it.
sz += sizeof (object_t);
scp = (stored_coll *) realloc (scp, sz);
+ sc.release();
sc.reset (scp);
if (ins < scp->count)
{
- size_t n = (scp->count - ins - 1) * sizeof (object_t);
+ size_t n = (scp->count - ins) * sizeof (object_t);
memmove (&scp->objects[ins + 1], &scp->objects[ins], n);
}
scp->count++;
scp->objects[ins] = o;
+ dout(3) << "..collection: " << scp << endl;
+
Dbt nvalue (scp, sz);
- if (db.put (NULL, &key, &nvalue, 0) != 0)
+ if (db->put (NULL, &key, &nvalue, 0) != 0)
{
return -EIO;
}
int OSBDB::collection_remove(coll_t c, object_t o, Context *onsafe)
{
+ if (!mounted)
+ return -EINVAL;
+
+ dout(2) << "collection_remove " << c << " " << o << endl;
+
Dbt key (&c, sizeof (coll_t));
Dbt value;
value.set_flags (DB_DBT_MALLOC);
- if (db.get (NULL, &key, &value, 0) != 0)
+ if (db->get (NULL, &key, &value, 0) != 0)
{
return -ENOENT;
}
}
scp->count--;
+ dout(3) << "..collection " << scp << endl;
+
Dbt nval (scp, value.get_size() - sizeof (object_t));
- if (db.put (NULL, &key, &nval, 0) != 0)
+ if (db->put (NULL, &key, &nval, 0) != 0)
{
return -EIO;
}
int OSBDB::collection_list(coll_t c, list<object_t>& o)
{
+ if (!mounted)
+ return -EINVAL;
+
Dbt key (&c, sizeof (coll_t));
Dbt value;
- if (db.get (NULL, &key, &value, 0) != 0)
+ if (db->get (NULL, &key, &value, 0) != 0)
return -ENOENT;
stored_coll *scp = (stored_coll *) value.get_data();
int OSBDB::_setattr(object_t oid, const char *name,
const void *value, size_t size, Context *onsafe)
{
+ if (!mounted)
+ return -EINVAL;
+
if (strlen (name) >= OSBDB_MAX_ATTR_LEN)
return -ENAMETOOLONG;
// Add name to attribute list, if needed.
- attrs_id aids;
- aids.oid = oid;
- Dbt attrs_key (&aids, sizeof (aids));
+ attrs_id aids = new_attrs_id (oid);
+ Dbt attrs_key (&aids, sizeof_attrs_id());
Dbt attrs_val;
attrs_val.set_flags (DB_DBT_MALLOC);
stored_attrs *sap = NULL;
size_t sz = 0;
- if (db.get (NULL, &attrs_key, &attrs_val, 0) != 0)
+
+ dout(3) << " getting " << aids << endl;
+ if (db->get (NULL, &attrs_key, &attrs_val, 0) != 0)
{
+ dout(2) << " first attribute" << endl;
sz = sizeof (stored_attrs);
- sap = new stored_attrs;
+ sap = (stored_attrs *) malloc(sz);
sap->count = 0;
}
else
{
sz = attrs_val.get_size();
sap = (stored_attrs *) attrs_val.get_data();
+ dout(2) << " add to list of " << sap->count << " attrs" << endl;
}
auto_ptr<stored_attrs> sa (sap);
int ins = 0;
if (sap->count > 0)
ins = binary_search<attr_name> (sap->names, sap->count, _name);
- if (sap->count == 0 || sap->names[ins] != _name)
+ dout(3) << " insertion point is " << ins << endl;
+ if (sap->count == 0 || strcmp (sap->names[ins].name, name) != 0)
{
sz += sizeof (attr_name);
+ dout(3) << " realloc 0x" << hex << ((void *) sap) << " to "
+ << dec << sz << endl;
sap = (stored_attrs *) realloc (sap, sz);
+ dout(3) << " returns 0x" << hex << ((void *) sap) << endl;
+ sa.release ();
sa.reset (sap);
- size_t n = (sap->count - ins - 1) * sizeof (attr_name);
+ int n = (sap->count - ins) * sizeof (attr_name);
if (n > 0)
{
+ dout(3) << " move " << n << " bytes from 0x"
+ << hex << (&sap->names[ins]) << " to 0x"
+ << hex << (&sap->names[ins+1]) << endl;
memmove (&sap->names[ins+1], &sap->names[ins], n);
}
- sap->names[ins] = _name;
+ memset (&sap->names[ins], 0, sizeof (attr_name));
+ strncpy (sap->names[ins].name, name, OSBDB_MAX_ATTR_LEN);
sap->count++;
Dbt newAttrs_val (sap, sz);
newAttrs_val.set_ulen (sz);
newAttrs_val.set_flags (DB_DBT_USERMEM);
- if (db.put (NULL, &attrs_key, &newAttrs_val, 0) != 0)
+ dout(3) << " putting " << aids << endl;
+ if (db->put (NULL, &attrs_key, &newAttrs_val, 0) != 0)
return -EIO;
}
+ else
+ {
+ dout(3) << " attribute " << name << " already exists" << endl;
+ }
+
+ dout(3) << " attributes list: " << sap << endl;
// Add the attribute.
- attr_id aid;
- aid.oid = oid;
- strncpy (aid.name.name, name, OSBDB_MAX_ATTR_LEN);
+ attr_id aid = new_attr_id (oid, name);
Dbt attr_key (&aid, sizeof (aid));
Dbt attr_val ((void *) value, size);
- if (db.put (NULL, &attr_key, &attr_val, 0) != 0)
+ dout(3) << " writing attribute key " << aid << endl;
+ if (db->put (NULL, &attr_key, &attr_val, 0) != 0)
return -EIO;
return 0;
const void *value, size_t size,
Context *onsafe)
{
+ if (!mounted)
+ return -EINVAL;
+
+ dout(2) << "setattr " << oid << ":" << name << " => ("
+ << size << " bytes)" << endl;
int ret = _setattr (oid, name, value, size, onsafe);
return ret;
}
int OSBDB::setattrs(object_t oid, map<string,bufferptr>& aset,
Context *onsafe)
{
+ if (!mounted)
+ return -EINVAL;
+
map<string,bufferptr>::iterator it;
for (it = aset.begin(); it != aset.end(); it++)
{
return 0;
}
+int OSBDB::_getattr (object_t oid, const char *name, void *value, size_t size)
+{
+ if (!mounted)
+ return -EINVAL;
+
+ attr_id aid = new_attr_id (oid, name);
+ Dbt key (&aid, sizeof (aid));
+ Dbt val (value, size);
+ val.set_ulen (size);
+ val.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
+
+ if (db->get (NULL, &key, &val, 0) != 0)
+ {
+ return -ENOENT;
+ }
+
+ return val.get_size();
+}
+
+int OSBDB::getattr(object_t oid, const char *name, void *value, size_t size)
+{
+ if (!mounted)
+ return -EINVAL;
+
+ return _getattr (oid, name, value, size);
+}
+
+int OSBDB::getattrs(object_t oid, map<string,bufferptr>& aset)
+{
+ if (!mounted)
+ return -EINVAL;
+
+ int count = 0;
+ for (map<string,bufferptr>::iterator it = aset.begin();
+ it != aset.end(); it++)
+ {
+ int ret = _getattr (oid, (*it).first.c_str(),
+ (*it).second.c_str(),
+ (*it).second.length());
+ if (ret < 0)
+ return ret;
+ count += ret;
+ }
+ return count;
+}
+
+int OSBDB::rmattr(object_t oid, const char *name, Context *onsafe)
+{
+ if (!mounted)
+ return -EINVAL;
+ attrs_id aids = new_attrs_id (oid);
+ Dbt askey (&aids, sizeof_attrs_id());
+ Dbt asvalue;
+ asvalue.set_flags (DB_DBT_MALLOC);
+
+ if (db->get (NULL, &askey, &asvalue, 0) != 0)
+ return -ENOENT;
+
+ stored_attrs *sap = (stored_attrs *) asvalue.get_data();
+ auto_ptr<stored_attrs> sa (sap);
+
+ if (sap->count == 0)
+ return -ENOENT;
+
+ attr_name _name;
+ memset(&name, 0, sizeof (_name));
+ strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
+ int ins = binary_search<attr_name> (sap->names, sap->count, _name);
+ if (strcmp (sap->names[ins].name, name) != 0)
+ return -ENOENT;
+
+ // Shift the later elements down by one, if needed.
+ int n = (sap->count - ins) * sizeof (attr_name);
+ if (n > 0)
+ memmove (&(sap->names[ins]), &(sap->names[ins + 1]), n);
+ sap->count--;
+ asvalue.set_size(asvalue.get_size() - sizeof (attr_name));
+ int ret;
+ if ((ret = db->put (NULL, &askey, &asvalue, 0)) != 0)
+ {
+ derr(1) << "put stored_attrs " << db_strerror (ret) << endl;
+ return -EIO;
+ }
+
+ // Remove the attribute.
+ attr_id aid = new_attr_id (oid, name);
+ Dbt key (&aid, sizeof (aid));
+ if ((ret = db->del (NULL, &key, 0)) != 0)
+ derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+
+ return 0;
+}
+
int OSBDB::listattr(object_t oid, char *attrs, size_t size)
{
- attrs_id aids;
- aids.oid = oid;
- Dbt key (&aids, sizeof (aids));
+ if (!mounted)
+ return -EINVAL;
+
+ dout(2) << "listattr " << oid << endl;
+
+ attrs_id aids = new_attrs_id (oid);
+ Dbt key (&aids, sizeof_attrs_id());
Dbt value;
value.set_flags (DB_DBT_MALLOC);
- if (!db.get (NULL, &key, &value, 0) != 0)
- return -ENOENT;
+ int ret;
+ if ((ret = db->get (NULL, &key, &value, 0)) != 0)
+ {
+ derr(1) << "fetching " << aids << ": " << db_strerror (ret)
+ << endl;
+ return -ENOENT;
+ }
stored_attrs *attrsp = (stored_attrs *) value.get_data();
auto_ptr<stored_attrs> _attrs (attrsp);
for (unsigned i = 0; i < attrsp->count && s < size; i++)
{
int n = MIN (OSBDB_MAX_ATTR_LEN,
- MIN (strlen (attrsp->names[i].name), size - s));
+ MIN (strlen (attrsp->names[i].name), size - s - 1));
strncpy (p, attrsp->names[i].name, n);
- p = p + n;
+ p[n] = '\0';
+ p = p + n + 1;
}
return 0;
}
const void *value, size_t size,
Context *onsafe)
{
- return -ENOSYS;
+ if (!mounted)
+ return -EINVAL;
+
+ dout(2) << "collection_setattr" << cid << " " << name
+ << " (" << size << " bytes)" << endl;
+ if (strlen (name) >= OSBDB_MAX_ATTR_LEN)
+ return -ENAMETOOLONG;
+
+ // Add name to attribute list, if needed.
+ coll_attrs_id aids = new_coll_attrs_id (cid);
+ Dbt attrs_key (&aids, sizeof_coll_attrs_id());
+ Dbt attrs_val;
+ attrs_val.set_flags (DB_DBT_MALLOC);
+ stored_attrs *sap = NULL;
+ size_t sz = 0;
+
+ dout(3) << " getting " << aids << endl;
+ if (db->get (NULL, &attrs_key, &attrs_val, 0) != 0)
+ {
+ dout(2) << " first attribute" << endl;
+ sz = sizeof (stored_attrs);
+ sap = (stored_attrs *) malloc(sz);
+ sap->count = 0;
+ }
+ else
+ {
+ sz = attrs_val.get_size();
+ sap = (stored_attrs *) attrs_val.get_data();
+ dout(2) << " add to list of " << sap->count << " attrs" << endl;
+ }
+ auto_ptr<stored_attrs> sa (sap);
+
+ attr_name _name;
+ strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
+
+ int ins = 0;
+ if (sap->count > 0)
+ ins = binary_search<attr_name> (sap->names, sap->count, _name);
+ dout(3) << " insertion point is " << ins << endl;
+ if (sap->count == 0 || strcmp (sap->names[ins].name, name) != 0)
+ {
+ sz += sizeof (attr_name);
+ dout(3) << " realloc 0x" << hex << ((void *) sap) << " to "
+ << dec << sz << endl;
+ sap = (stored_attrs *) realloc (sap, sz);
+ dout(3) << " returns 0x" << hex << ((void *) sap) << endl;
+ sa.release ();
+ sa.reset (sap);
+ int n = (sap->count - ins) * sizeof (attr_name);
+ if (n > 0)
+ {
+ dout(3) << " move " << n << " bytes from 0x"
+ << hex << (&sap->names[ins]) << " to 0x"
+ << hex << (&sap->names[ins+1]) << endl;
+ memmove (&sap->names[ins+1], &sap->names[ins], n);
+ }
+ memset (&sap->names[ins], 0, sizeof (attr_name));
+ strncpy (sap->names[ins].name, name, OSBDB_MAX_ATTR_LEN);
+ sap->count++;
+
+ Dbt newAttrs_val (sap, sz);
+ newAttrs_val.set_ulen (sz);
+ newAttrs_val.set_flags (DB_DBT_USERMEM);
+ dout(3) << " putting " << aids << endl;
+ if (db->put (NULL, &attrs_key, &newAttrs_val, 0) != 0)
+ return -EIO;
+ }
+ else
+ {
+ dout(3) << " attribute " << name << " already exists" << endl;
+ }
+
+ dout(3) << " attributes list: " << sap << endl;
+
+ // Add the attribute.
+ coll_attr_id aid = new_coll_attr_id (cid, name);
+ Dbt attr_key (&aid, sizeof (aid));
+ Dbt attr_val ((void *) value, size);
+ dout(3) << " writing attribute key " << aid << endl;
+ if (db->put (NULL, &attr_key, &attr_val, 0) != 0)
+ return -EIO;
+
+ return 0;
}
int OSBDB::collection_rmattr(coll_t cid, const char *name,
Context *onsafe)
{
- return -ENOSYS;
+ if (!mounted)
+ return -EINVAL;
+
+ coll_attrs_id aids = new_coll_attrs_id (cid);
+ Dbt askey (&aids, sizeof_coll_attrs_id());
+ Dbt asvalue;
+ asvalue.set_flags (DB_DBT_MALLOC);
+
+ if (db->get (NULL, &askey, &asvalue, 0) != 0)
+ return -ENOENT;
+
+ stored_attrs *sap = (stored_attrs *) asvalue.get_data();
+ auto_ptr<stored_attrs> sa (sap);
+
+ if (sap->count == 0)
+ return -ENOENT;
+
+ attr_name _name;
+ memset(&name, 0, sizeof (_name));
+ strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
+ int ins = binary_search<attr_name> (sap->names, sap->count, _name);
+ if (strcmp (sap->names[ins].name, name) != 0)
+ return -ENOENT;
+
+ // Shift the later elements down by one, if needed.
+ int n = (sap->count - ins) * sizeof (attr_name);
+ if (n > 0)
+ memmove (&(sap->names[ins]), &(sap->names[ins + 1]), n);
+ sap->count--;
+ asvalue.set_size(asvalue.get_size() - sizeof (attr_name));
+ int ret;
+ if ((ret = db->put (NULL, &askey, &asvalue, 0)) != 0)
+ {
+ derr(1) << "put stored_attrs " << db_strerror (ret) << endl;
+ return -EIO;
+ }
+
+ // Remove the attribute.
+ coll_attr_id aid = new_coll_attr_id (cid, name);
+ Dbt key (&aid, sizeof (aid));
+ if ((ret = db->del (NULL, &key, 0)) != 0)
+ derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+
+ return 0;
}
int OSBDB::collection_getattr(coll_t cid, const char *name,
void *value, size_t size)
{
- return -ENOSYS;
+ if (!mounted)
+ return -EINVAL;
+
+ dout(2) << "collection_getattr " << cid << " " << name << endl;
+
+ coll_attr_id caid = new_coll_attr_id (cid, name);
+ Dbt key (&caid, sizeof (caid));
+ Dbt val (value, size);
+ val.set_ulen (size);
+ val.set_dlen (size);
+ val.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
+
+ if (db->get (NULL, &key, &val, 0) != 0)
+ return -ENOENT;
+
+ return val.get_size();
}
int OSBDB::collection_listattr(coll_t cid, char *attrs, size_t size)
{
- return -ENOSYS;
+ if (!mounted)
+ return -EINVAL;
+
+ dout(2) << "collection_listattr " << cid << endl;
+
+ coll_attrs_id caids = new_coll_attrs_id (cid);
+ Dbt key (&caids, sizeof_coll_attrs_id());
+ Dbt value;
+ value.set_flags (DB_DBT_MALLOC);
+
+ int ret;
+ if ((ret = db->get (NULL, &key, &value, 0)) != 0)
+ {
+ derr(1) << "fetching " << caids << ": " << db_strerror (ret)
+ << endl;
+ return -ENOENT;
+ }
+
+ stored_attrs *attrsp = (stored_attrs *) value.get_data();
+ auto_ptr<stored_attrs> _attrs (attrsp);
+ size_t s = 0;
+ char *p = attrs;
+ for (unsigned i = 0; i < attrsp->count && s < size; i++)
+ {
+ int n = MIN (OSBDB_MAX_ATTR_LEN,
+ MIN (strlen (attrsp->names[i].name), size - s - 1));
+ strncpy (p, attrsp->names[i].name, n);
+ p[n] = '\0';
+ p = p + n + 1;
+ }
+ return 0;
}
\f // Sync.
void OSBDB::sync (Context *onsync)
{
+ if (!mounted)
+ return;
+
sync();
// huh?
}
void OSBDB::sync()
{
- db.sync(0);
+ if (!mounted)
+ return;
+
+ db->sync(0);
}
uint32_t version;
};
+inline ostream& operator<<(ostream& out, const stored_superblock sb)
+{
+ out << "osbdb.super(" << sb.version << ")" << endl;
+ return out;
+}
+
+/**
+ * An object identifier; we define this so we can have a POD object to
+ * work with.
+ */
+struct oid_t // POD
+{
+ char id[16];
+};
+
+inline void mkoid (oid_t& id, object_t& oid)
+{
+ // XXX byte order?
+ memcpy (id.id, &oid, sizeof (oid_t));
+}
+
+inline ostream& operator<<(ostream& out, const oid_t id)
+{
+ for (int i = 0; i < 16; i++)
+ {
+ out.fill('0');
+ out << setw(2) << hex << (id.id[i] & 0xFF);
+ if ((i & 3) == 3)
+ out << ':';
+ }
+ out.unsetf(ios::right);
+ out << dec;
+ return out;
+}
+
/**
- * A stored object. These are mapped by the raw 128-bit object ID, and
- * include the length of the object.
+ * An "inode" key. We map a 'stored_object' struct to this key for
+ * every object.
+ */
+struct object_inode_key // POD
+{
+ oid_t oid;
+ char tag;
+};
+
+/**
+ * "Constructor" for an object_inode_key.
+ */
+inline object_inode_key new_object_inode_key (object_t& oid)
+{
+ object_inode_key key;
+ memset(&key, 0, sizeof (object_inode_key));
+ mkoid (key.oid, oid);
+ key.tag = 'i';
+ return key;
+}
+
+/*
+ * We use this, instead of sizeof(), to try and guarantee that we
+ * don't include the structure padding, if any.
+ *
+ * This *should* return 17: sizeof (oid_t) == 16; sizeof (char) == 1.
+ */
+inline size_t sizeof_object_inode_key()
+{
+ return offsetof(object_inode_key, tag) + sizeof (char);
+}
+
+ // Frank Poole: Unfortunately, that sounds a little
+ // like famous last words.
+ // -- 2001: A Space Odyssey
+
+inline ostream& operator<<(ostream& out, const object_inode_key o)
+{
+ out << o.tag << "/" << o.oid;
+ return out;
+}
+
+/**
+ * A stored object. This is essentially the "inode" of the object,
+ * containing things like the object's length. The object itself is
+ * stored as-is, mapped by the 128-bit object ID.
*/
struct stored_object
{
uint32_t length;
- char data[0]; // Actually variable-length
};
+inline ostream& operator<<(ostream& out, const stored_object s)
+{
+ out << "inode(l:" << s.length << ")";
+ return out;
+}
+
/*
* Key referencing the list of attribute names for an object. This is
* simply the object's ID, with an additional character 'a' appended.
*/
-struct attrs_id
+struct attrs_id // POD
{
- attrs_id() : pad('a') { }
-
- object_t oid;
- const char pad;
+ oid_t oid;
+ char tag;
};
+/*
+ * "Construtor" for attrs_id.
+ */
+inline struct attrs_id new_attrs_id (object_t& oid)
+{
+ attrs_id aid;
+ memset (&aid, 0, sizeof (attrs_id));
+ mkoid(aid.oid, oid);
+ aid.tag = 'a';
+ return aid;
+}
+
+/*
+ * See explanation for sizeof_object_inode_id.
+ */
+inline size_t sizeof_attrs_id()
+{
+ return offsetof(struct attrs_id, tag) + sizeof (char);
+}
+
+inline ostream& operator<<(ostream& out, const attrs_id id)
+{
+ out << id.tag << "/" << id.oid;
+ return out;
+}
+
/*
* Encapsulation of a single attribute name.
*/
-struct attr_name
+struct attr_name // POD
{
char name[OSBDB_MAX_ATTR_LEN];
};
+inline ostream& operator<<(ostream& out, const attr_name n)
+{
+ out << n.name;
+ return out;
+}
+
inline bool operator<(const attr_name n1, const attr_name n2)
{
return (strncmp (n1.name, n2.name, OSBDB_MAX_ATTR_LEN) < 0);
inline bool operator==(const attr_name n1, const attr_name n2)
{
+ std::cerr << n1.name << " == " << n2.name << "?" << endl;
return (strncmp (n1.name, n2.name, OSBDB_MAX_ATTR_LEN) == 0);
}
attr_name names[0]; // actually variable-length
};
+inline ostream& operator<<(ostream& out, const stored_attrs *sa)
+{
+ out << sa->count << " [ ";
+ for (unsigned i = 0; i < sa->count; i++)
+ out << sa->names[i] << (i == sa->count - 1 ? " " : ", ");
+ out << "]";
+ return out;
+}
+
/*
* An object attribute key. An object attribute is mapped simply by
* the object ID appended with the attribute name. Attribute names
* may not be empty, and must be less than 256 characters, in this
* implementation.
*/
-struct attr_id
+struct attr_id // POD
{
- object_t oid;
+ oid_t oid;
attr_name name;
};
+inline attr_id new_attr_id (object_t& oid, const char *name)
+{
+ attr_id aid;
+ memset(&aid, 0, sizeof (attr_id));
+ mkoid (aid.oid, oid);
+ strncpy (aid.name.name, name, OSBDB_MAX_ATTR_LEN);
+ return aid;
+}
+
+inline ostream& operator<<(ostream &out, const attr_id id)
+{
+ out << id.oid << ":" << id.name;
+ return out;
+}
+
+/*
+ * A key for a collection attributes list.
+ */
+struct coll_attrs_id // POD
+{
+ coll_t cid;
+ char tag;
+};
+
+inline coll_attrs_id new_coll_attrs_id (coll_t cid)
+{
+ coll_attrs_id catts;
+ memset(&catts, 0, sizeof (coll_attrs_id));
+ catts.cid = cid;
+ catts.tag = 'C';
+ return catts;
+}
+
+inline size_t sizeof_coll_attrs_id()
+{
+ return offsetof(coll_attrs_id, tag) + sizeof (char);
+}
+
+inline ostream& operator<<(ostream& out, coll_attrs_id id)
+{
+ out << id.tag << "/" << id.cid;
+ return out;
+}
+
/*
* A collection attribute key. Similar to
*/
-struct coll_attr_id
+struct coll_attr_id // POD
{
coll_t cid;
attr_name name;
};
+inline coll_attr_id new_coll_attr_id (coll_t cid, const char *name)
+{
+ coll_attr_id catt;
+ memset(&catt, 0, sizeof (coll_attr_id));
+ catt.cid = cid;
+ strncpy (catt.name.name, name, OSBDB_MAX_ATTR_LEN);
+ return catt;
+}
+
+inline ostream& operator<<(ostream& out, coll_attr_id id)
+{
+ out << id.cid << ":" << id.name;
+ return out;
+}
+
/*
* This is the key we store the master collections list under.
*/
coll_t colls[0]; // actually variable-length
};
+inline ostream& operator<<(ostream& out, stored_colls *c)
+{
+ out << c->count << " [ ";
+ for (unsigned i = 0; i < c->count; i++)
+ {
+ out << hex << c->colls[i];
+ if (i < c->count - 1)
+ out << ", ";
+ }
+ out << " ]" << dec;
+ return out;
+}
+
/*
* A stored collection (a bag of object IDs). These are referenced by
* the bare collection identifier type, a coll_t (thus, a 32-bit
object_t objects[0]; // actually variable-length
};
+inline ostream& operator<<(ostream& out, stored_coll *c)
+{
+ out << c->count << " [ ";
+ for (unsigned i = 0; i < c->count; i++)
+ {
+ out << c->objects[i];
+ if (i < c->count - 1)
+ out << ", ";
+ }
+ out << " ]";
+ return out;
+}
+
/*
* The object store interface for Berkeley DB.
*/
class OSBDB : public ObjectStore
{
private:
- // DbEnv env;
- Db db;
+ DbEnv *env;
+ Db *db;
string device;
+ bool mounted;
+ bool opened;
public:
- OSBDB(const char *dev) : db (NULL, 0), device (dev)
+ OSBDB(const char *dev)
+ : env(0), db (0), device (dev), mounted(false), opened(false)
{
- if (!g_conf.bdbstore_btree)
+ /*env = new DbEnv (DB_CXX_NO_EXCEPTIONS);
+ env->set_error_stream (&std::cerr);
+ // WTF? You can't open an env if you set this flag here, but BDB
+ // says you also can't set it after you open the env.
+ //env->set_flags (DB_LOG_INMEMORY, 1);
+ char *p = strrchr (dev, '/');
+ int env_flags = (DB_CREATE | DB_THREAD | DB_INIT_LOCK
+ | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOG);
+ if (p != NULL)
{
- if (g_conf.bdbstore_pagesize > 0)
- db.set_pagesize (g_conf.bdbstore_pagesize);
- if (g_conf.bdbstore_ffactor > 0 && g_conf.bdbstore_nelem > 0)
+ *p = '\0';
+ if (env->open (dev, env_flags, 0) != 0)
{
- db.set_h_ffactor (g_conf.bdbstore_ffactor);
- db.set_h_nelem (g_conf.bdbstore_nelem);
+ std::cerr << "failed to open environment: "
+ << dev << std::endl;
+ ::abort();
}
+ *p = '/';
+ dev = p+1;
}
- if (db.open (NULL, dev, "OSD", (g_conf.bdbstore_btree ? DB_BTREE : DB_HASH),
- DB_CREATE, 0) != 0)
+ else
{
- std::cerr << "failed to open database" << std::endl;
- ::abort();
+ if (env->open (NULL, env_flags, 0) != 0)
+ {
+ std::cerr << "failed to open environment: ." << std::endl;
+ ::abort();
+ }
}
+
+ // Double WTF: if you remove the DB_LOG_INMEMORY bit, db->open
+ // fails, inexplicably, with EINVAL!*/
+ // env->set_flags (DB_DIRECT_DB | /*DB_AUTO_COMMIT |*/ DB_LOG_INMEMORY, 1);
}
~OSBDB()
{
- db.close (0);
+ if (mounted)
+ {
+ umount();
+ }
+ if (env != NULL)
+ {
+ env->close (0);
+ delete env;
+ }
}
int mount();
const void *value, size_t size, Context *onsafe=0);
int setattrs(object_t oid, map<string,bufferptr>& aset,
Context *onsafe=0);
- int clone(object_t oid, object_t noid);
-
+ int getattr(object_t oid, const char *name,
+ void *value, size_t size);
+ int getattrs(object_t oid, map<string,bufferptr>& aset);
+ int rmattr(object_t oid, const char *name,
+ Context *onsafe=0);
int listattr(object_t oid, char *attrs, size_t size);
+
+ int clone(object_t oid, object_t noid);
// Collections.
void sync();
private:
+ int opendb (DBTYPE type=DB_UNKNOWN, int flags=0);
+
int _setattr(object_t oid, const char *name, const void *value,
size_t size, Context *onsync);
+ int _getattr(object_t oid, const char *name, void *value, size_t size);
};