}
+unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
+{
+ ebofs_lock.Lock();
+ dout(7) << "apply_transaction start (" << t.ops.size() << " ops)" << endl;
+
+ // do ops
+ unsigned r = 0; // bit fields indicate which ops failed.
+ int bit = 1;
+ for (list<int>::iterator p = t.ops.begin();
+ p != t.ops.end();
+ p++) {
+ switch (*p) {
+ case Transaction::OP_WRITE:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ off_t offset = t.offsets.front(); t.offsets.pop_front();
+ size_t len = t.lengths.front(); t.lengths.pop_front();
+ bufferlist bl = t.bls.front(); t.bls.pop_front();
+ if (_write(oid, len, offset, bl) < 0) {
+ dout(7) << "apply_transaction fail on _write" << endl;
+ r &= bit;
+ }
+ }
+ break;
+
+ case Transaction::OP_TRUNCATE:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ size_t len = t.lengths.front(); t.lengths.pop_front();
+ if (_truncate(oid, len) < 0) {
+ dout(7) << "apply_transaction fail on _truncate" << endl;
+ r &= bit;
+ }
+ }
+ break;
+
+ case Transaction::OP_REMOVE:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ if (_remove(oid) < 0) {
+ dout(7) << "apply_transaction fail on _remove" << endl;
+ r &= bit;
+ }
+ }
+ break;
+
+ case Transaction::OP_SETATTR:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
+ if (_setattr(oid, attrname, attrval.first, attrval.second) < 0) {
+ dout(7) << "apply_transaction fail on _setattr" << endl;
+ r &= bit;
+ }
+ }
+ break;
+
+ case Transaction::OP_RMATTR:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ if (_rmattr(oid, attrname) < 0) {
+ dout(7) << "apply_transaction fail on _rmattr" << endl;
+ r &= bit;
+ }
+ }
+ break;
+
+ case Transaction::OP_MKCOLL:
+ {
+ coll_t cid = t.cids.front(); t.cids.pop_front();
+ if (_create_collection(cid) < 0) {
+ dout(7) << "apply_transaction fail on _create_collection" << endl;
+ r &= bit;
+ }
+ }
+ break;
+
+ case Transaction::OP_RMCOLL:
+ {
+ coll_t cid = t.cids.front(); t.cids.pop_front();
+ if (_destroy_collection(cid) < 0) {
+ dout(7) << "apply_transaction fail on _remove_collection" << endl;
+ r &= bit;
+ }
+ }
+ break;
+
+ case Transaction::OP_COLL_ADD:
+ {
+ coll_t cid = t.cids.front(); t.cids.pop_front();
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ if (_collection_add(cid, oid) < 0) {
+ //dout(7) << "apply_transaction fail on _collection_add" << endl;
+ //r &= bit;
+ }
+ }
+ break;
+
+ case Transaction::OP_COLL_REMOVE:
+ {
+ coll_t cid = t.cids.front(); t.cids.pop_front();
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ if (_collection_remove(cid, oid) < 0) {
+ dout(7) << "apply_transaction fail on _collection_remove" << endl;
+ r &= bit;
+ }
+ }
+ break;
+
+ case Transaction::OP_COLL_SETATTR:
+ {
+ coll_t cid = t.cids.front(); t.cids.pop_front();
+ const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
+ if (_collection_setattr(cid, attrname, attrval.first, attrval.second) < 0) {
+ dout(7) << "apply_transaction fail on _collection_setattr" << endl;
+ r &= bit;
+ }
+ }
+ break;
+
+ case Transaction::OP_COLL_RMATTR:
+ {
+ coll_t cid = t.cids.front(); t.cids.pop_front();
+ const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ if (_collection_rmattr(cid, attrname) < 0) {
+ dout(7) << "apply_transaction fail on _collection_rmattr" << endl;
+ r &= bit;
+ }
+ }
+ break;
+
+ default:
+ cerr << "bad op " << *p << endl;
+ assert(0);
+ }
+
+ bit = bit << 1;
+ }
+
+ dout(7) << "apply_transaction finish (r = " << r << ")" << endl;
+
+ // set up commit waiter
+ if (r == 0) {
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ } else {
+ if (onsafe) delete onsafe;
+ }
+
+ ebofs_lock.Unlock();
+ return r;
+}
+
+
+
+int Ebofs::_write(object_t oid, size_t length, off_t offset, bufferlist& bl)
+{
+ dout(7) << "_write " << hex << oid << dec << " len " << length << " off " << offset << endl;
+
+ // out of space?
+ unsigned max = (length+offset) / EBOFS_BLOCK_SIZE + 10; // very conservative; assumes we have to rewrite
+ max += dirty_onodes.size() + dirty_cnodes.size();
+ if (max >= free_blocks) {
+ dout(1) << "write failing, only " << free_blocks << " blocks free, may need up to " << max << endl;
+ return -ENOSPC;
+ }
+
+ // get|create inode
+ Onode *on = get_onode(oid);
+ if (!on) on = new_onode(oid); // new inode!
+
+ dirty_onode(on); // dirty onode!
+
+ // apply write to buffer cache
+ apply_write(on, length, offset, bl);
+
+ // done.
+ put_onode(on);
+ trim_bc();
+
+ return length;
+}
+
+
int Ebofs::write(object_t oid,
size_t len, off_t off,
bufferlist& bl, bool fsync)
int Ebofs::write(object_t oid,
size_t len, off_t off,
bufferlist& bl, Context *onsafe)
-{
- static map<const char*, pair<void*,int> > null_setattrs;
- static map<coll_t, map<const char*, pair<void*,int> > > null_cmods;
- return write_transaction(oid, len, off, bl, null_setattrs, null_cmods, onsafe);
-}
-
-int Ebofs::write_transaction(object_t oid,
- size_t len, off_t off,
- bufferlist& bl,
- map<const char*, pair<void*,int> >& setattrs,
- map<coll_t, map<const char*, pair<void*,int> > >& cmods,
- Context *onsafe)
{
ebofs_lock.Lock();
- dout(7) << "write " << hex << oid << dec << " len " << len << " off " << off << endl;
assert(len > 0);
// too much unflushed dirty data? (if so, block!)
if (_write_will_block()) {
- dout(10) << "write blocking" << endl;
+ dout(10) << "write blocking "
+ << hex << oid << dec << " len " << len << " off " << off << endl;
+
while (_write_will_block())
bc.waitfor_stat(); // waits on ebofs_lock
- dout(7) << "write unblocked " << hex << oid << dec << " len " << len << " off " << off << endl;
- }
- // out of space?
- //unsigned max = ((len+off) - MIN(off, on->object_size)) / EBOFS_BLOCK_SIZE + 10;
- unsigned max = (len+off) / EBOFS_BLOCK_SIZE + 10; // very conservative; assumes we have to rewrite
- max += dirty_onodes.size() + dirty_cnodes.size();
- if (max >= free_blocks) {
- dout(1) << "write failing, only " << free_blocks << " blocks free, may need up to " << max << endl;
- if (onsafe) delete onsafe;
- ebofs_lock.Unlock();
- return -ENOSPC;
+ dout(7) << "write unblocked "
+ << hex << oid << dec << " len " << len << " off " << off << endl;
}
-
- // get|create inode
- Onode *on = get_onode(oid);
- if (!on) on = new_onode(oid); // new inode!
- dirty_onode(on); // dirty onode!
-
- // apply write to buffer cache
- apply_write(on, len, off, bl);
-
- // join collections?
- for (map<coll_t, map<const char*, pair<void*,int> > >::iterator cit = cmods.begin();
- cit != cmods.end();
- cit++) {
- const coll_t cid = cit->first;
- if (_collection_exists(cid)) {
- if (oc_tab->lookup(idpair_t(oid,cid)) < 0) {
- oc_tab->insert(idpair_t(oid,cid), true);
- co_tab->insert(idpair_t(cid,oid), true);
- }
- }
- }
+ // go
+ int r = _write(oid, len, off, bl);
- // apply attribute changes
- do_setattrs(on, setattrs);
- do_csetattrs(cmods);
-
- // prepare (eventual) journal entry?
-
- // set up commit waiter
- if (onsafe) {
- // commit on next full fs commit.
- // FIXME when we add journaling.
- commit_waiters[super_epoch].push_back(onsafe);
- //on->commit_waiters.push_back(onsafe); // in case we delete the object.
+ // commit waiter
+ if (r > 0) {
+ assert((size_t)r == len);
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ } else {
+ if (onsafe) delete onsafe;
}
- // done
- put_onode(on);
-
- trim_bc();
-
ebofs_lock.Unlock();
- return len;
+ return r;
}
-
-int Ebofs::remove(object_t oid, Context *onsafe)
+int Ebofs::_remove(object_t oid)
{
- static map<coll_t, map<const char*, pair<void*,int> > > null_cmods;
- return remove_transaction(oid, null_cmods, onsafe);
-}
+ dout(7) << "_remove " << hex << oid << dec << endl;
-int Ebofs::remove_transaction(object_t oid,
- map<coll_t, map<const char*, pair<void*,int> > >& cmods,
- Context *onsafe)
-{
- ebofs_lock.Lock();
- dout(7) << "remove " << hex << oid << dec << endl;
-
// get inode
Onode *on = get_onode(oid);
- if (!on) {
- ebofs_lock.Unlock();
- return -ENOENT;
- }
+ if (!on) return -ENOENT;
// ok remove it!
remove_onode(on);
- // do collection mods
- do_csetattrs(cmods);
+ return 0;
+}
+
+
+int Ebofs::remove(object_t oid, Context *onsafe)
+{
+ ebofs_lock.Lock();
+
+ // do it
+ int r = _remove(oid);
// set up commit waiter
- if (onsafe) {
- // commit on next full fs commit.
- // FIXME when we add journaling.
- commit_waiters[super_epoch].push_back(onsafe);
+ if (r >= 0) {
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ } else {
+ if (onsafe) delete onsafe;
}
ebofs_lock.Unlock();
- return 0;
+ return r;
}
-int Ebofs::truncate(object_t oid, off_t size, Context *onsafe)
+int Ebofs::_truncate(object_t oid, off_t size)
{
- static map<const char*, pair<void*,int> > null_setattrs;
- static map<coll_t, map<const char*, pair<void*,int> > > null_cmods;
- return truncate_transaction(oid, size, null_setattrs, null_cmods, onsafe);
-}
+ dout(7) << "_truncate " << hex << oid << dec << " size " << size << endl;
-int Ebofs::truncate_transaction(object_t oid, off_t size,
- map<const char*, pair<void*,int> >& setattrs,
- map<coll_t, map<const char*, pair<void*,int> > >& cmods,
- Context *onsafe)
-{
- ebofs_lock.Lock();
- dout(7) << "truncate " << hex << oid << dec << " size " << size << endl;
-
Onode *on = get_onode(oid);
- if (!on) {
- ebofs_lock.Unlock();
+ if (!on)
return -ENOENT;
- }
int r = 0;
if (size > on->object_size) {
assert(size == on->object_size);
}
- // apply attribute changes
- do_setattrs(on, setattrs);
- do_csetattrs(cmods);
+ put_onode(on);
+ return r;
+}
- // set up commit waiter
- if (onsafe)
- commit_waiters[super_epoch].push_back(onsafe);
+
+int Ebofs::truncate(object_t oid, off_t size, Context *onsafe)
+{
+ ebofs_lock.Lock();
- put_onode(on);
+ int r = _truncate(oid, size);
+
+ // set up commit waiter
+ if (r >= 0) {
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ } else {
+ if (onsafe) delete onsafe;
+ }
+
ebofs_lock.Unlock();
return r;
}
return 0;
}
-// attributes
-void Ebofs::do_setattrs(Onode *on, map<const char*, pair<void*,int> > &setattrs)
+int Ebofs::_setattr(object_t oid, const char *name, const void *value, size_t size)
{
- for (map<const char*, pair<void*,int> >::iterator it = setattrs.begin();
- it != setattrs.end();
- it++) {
- string n(it->first);
- if (it->second.first) {
- AttrVal val((char*)it->second.first, it->second.second);
- on->attr[n] = val;
- } else {
- on->attr.erase(n);
- }
- }
-}
-
-void Ebofs::do_csetattrs(map<coll_t, map<const char*, pair<void*,int> > > &cmods)
-{
- for (map<coll_t, map<const char*, pair<void*,int> > >::iterator cit = cmods.begin();
- cit != cmods.end();
- cit++) {
- const coll_t cid = cit->first;
-
- if (!cit->second.empty()) {
- // attrs
- Cnode *cn = get_cnode(cid);
- if (cn) {
- for (map<const char*, pair<void*,int> >::iterator ait = cit->second.begin();
- ait != cit->second.end();
- ait++) {
- string n(ait->first);
- if (ait->second.first) {
- AttrVal val((char*)ait->second.first, ait->second.second);
- cn->attr[n] = val;
- } else {
- cn->attr.erase(n);
- }
- }
- dirty_cnode(cn);
- put_cnode(cn);
- } else {
- dout(0) << "warning: collection " << hex << cid << dec << " dne for csetattrs" << endl;
- }
- }
- }
-}
-
-int Ebofs::setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe)
-{
- ebofs_lock.Lock();
dout(8) << "setattr " << hex << oid << dec << " '" << name << "' len " << size << endl;
Onode *on = get_onode(oid);
- if (!on) {
- ebofs_lock.Unlock();
- return -ENOENT;
- }
+ if (!on) return -ENOENT;
string n(name);
AttrVal val((char*)value, size);
on->attr[n] = val;
dirty_onode(on);
+ put_onode(on);
+ return 0;
+}
- if (onsafe)
- commit_waiters[super_epoch].push_back(onsafe);
+int Ebofs::setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe)
+{
+ ebofs_lock.Lock();
+ int r = _setattr(oid, name, value, size);
+
+ // set up commit waiter
+ if (r >= 0) {
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ } else {
+ if (onsafe) delete onsafe;
+ }
- put_onode(on);
ebofs_lock.Unlock();
- return 0;
+ return r;
}
int Ebofs::getattr(object_t oid, const char *name, void *value, size_t size)
return r;
}
-int Ebofs::rmattr(object_t oid, const char *name, Context *onsafe)
+
+int Ebofs::_rmattr(object_t oid, const char *name)
{
- ebofs_lock.Lock();
- dout(8) << "rmattr " << hex << oid << dec << " '" << name << "'" << endl;
+ dout(8) << "_rmattr " << hex << oid << dec << " '" << name << "'" << endl;
Onode *on = get_onode(oid);
- if (!on) {
- ebofs_lock.Unlock();
- return -ENOENT;
- }
+ if (!on) return -ENOENT;
string n(name);
on->attr.erase(n);
-
- if (onsafe)
- commit_waiters[super_epoch].push_back(onsafe);
-
dirty_onode(on);
put_onode(on);
- ebofs_lock.Unlock();
return 0;
}
+int Ebofs::rmattr(object_t oid, const char *name, Context *onsafe)
+{
+ ebofs_lock.Lock();
+
+ int r = _rmattr(oid, name);
+
+ // set up commit waiter
+ if (r >= 0) {
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ } else {
+ if (onsafe) delete onsafe;
+ }
+
+ ebofs_lock.Unlock();
+ return r;
+}
+
int Ebofs::listattr(object_t oid, vector<string>& attrs)
{
ebofs_lock.Lock();
return num;
}
-int Ebofs::create_collection(coll_t cid)
+int Ebofs::_create_collection(coll_t cid)
{
- ebofs_lock.Lock();
- dout(9) << "create_collection " << hex << cid << dec << endl;
+ dout(9) << "_create_collection " << hex << cid << dec << endl;
- if (_collection_exists(cid)) {
- ebofs_lock.Unlock();
+ if (_collection_exists(cid))
return -EEXIST;
- }
Cnode *cn = new_cnode(cid);
put_cnode(cn);
+
+ return 0;
+}
+
+int Ebofs::create_collection(coll_t cid, Context *onsafe)
+{
+ ebofs_lock.Lock();
+
+ int r = _create_collection(cid);
+
+ // set up commit waiter
+ if (r >= 0) {
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ } else {
+ if (onsafe) delete onsafe;
+ }
ebofs_lock.Unlock();
- return 0;
+ return r;
}
-int Ebofs::destroy_collection(coll_t cid)
+int Ebofs::_destroy_collection(coll_t cid)
{
- ebofs_lock.Lock();
- dout(9) << "destroy_collection " << hex << cid << dec << endl;
+ dout(9) << "_destroy_collection " << hex << cid << dec << endl;
- if (!_collection_exists(cid)) {
- ebofs_lock.Unlock();
+ if (!_collection_exists(cid))
return -ENOENT;
- }
Cnode *cn = get_cnode(cid);
assert(cn);
}
remove_cnode(cn);
- ebofs_lock.Lock();
return 0;
}
+int Ebofs::destroy_collection(coll_t cid, Context *onsafe)
+{
+ ebofs_lock.Lock();
+
+ int r = _destroy_collection(cid);
+
+ // set up commit waiter
+ if (r >= 0) {
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ } else {
+ if (onsafe) delete onsafe;
+ }
+
+ ebofs_lock.Unlock();
+ return r;
+}
+
bool Ebofs::collection_exists(coll_t cid)
{
ebofs_lock.Lock();
dout(10) << "collection_exists " << hex << cid << dec << endl;
-
bool r = _collection_exists(cid);
ebofs_lock.Unlock();
return r;
return (collection_tab->lookup(cid) == 0);
}
-int Ebofs::collection_add(coll_t cid, object_t oid)
+int Ebofs::_collection_add(coll_t cid, object_t oid)
{
- ebofs_lock.Lock();
- dout(9) << "collection_add " << hex << cid << " object " << oid << dec << endl;
+ dout(9) << "_collection_add " << hex << cid << " object " << oid << dec << endl;
- if (!_collection_exists(cid)) {
- ebofs_lock.Unlock();
+ if (!_collection_exists(cid))
return -ENOENT;
- }
+
if (oc_tab->lookup(idpair_t(oid,cid)) < 0) {
oc_tab->insert(idpair_t(oid,cid), true);
co_tab->insert(idpair_t(cid,oid), true);
+ return 0;
+ } else {
+ return -ENOENT; // FIXME?
}
+}
+
+int Ebofs::collection_add(coll_t cid, object_t oid, Context *onsafe)
+{
+ ebofs_lock.Lock();
+
+ int r = _collection_add(cid, oid);
+
+ // set up commit waiter
+ if (r >= 0) {
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ } else {
+ if (onsafe) delete onsafe;
+ }
+
ebofs_lock.Unlock();
return 0;
}
-int Ebofs::collection_remove(coll_t cid, object_t oid)
+int Ebofs::_collection_remove(coll_t cid, object_t oid)
{
- ebofs_lock.Lock();
- dout(9) << "collection_remove " << hex << cid << " object " << oid << dec << endl;
+ dout(9) << "_collection_remove " << hex << cid << " object " << oid << dec << endl;
- if (!_collection_exists(cid)) {
- ebofs_lock.Unlock();
+ if (!_collection_exists(cid))
return -ENOENT;
- }
+
oc_tab->remove(idpair_t(oid,cid));
co_tab->remove(idpair_t(cid,oid));
cnode_map.erase(cid);
cnode_lru.lru_remove(cn);
delete cn;
+ return 0;
+ } else {
+ return -ENOENT; // FIXME?
+ }
+}
+
+int Ebofs::collection_remove(coll_t cid, object_t oid, Context *onsafe)
+{
+ ebofs_lock.Lock();
+
+ int r = _collection_remove(cid, oid);
+
+ // set up commit waiter
+ if (r >= 0) {
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ } else {
+ if (onsafe) delete onsafe;
}
ebofs_lock.Unlock();
}
-int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, size_t size)
+int Ebofs::_collection_setattr(coll_t cid, const char *name, const void *value, size_t size)
{
- ebofs_lock.Lock();
- dout(10) << "collection_setattr " << hex << cid << dec << " '" << name << "' len " << size << endl;
+ dout(10) << "_collection_setattr " << hex << cid << dec << " '" << name << "' len " << size << endl;
Cnode *cn = get_cnode(cid);
- if (!cn) {
- ebofs_lock.Unlock();
- return -ENOENT;
- }
+ if (!cn) return -ENOENT;
string n(name);
AttrVal val((char*)value, size);
cn->attr[n] = val;
dirty_cnode(cn);
-
put_cnode(cn);
+
+ return 0;
+}
+
+int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, size_t size, Context *onsafe)
+{
+ ebofs_lock.Lock();
+ dout(10) << "collection_setattr " << hex << cid << dec << " '" << name << "' len " << size << endl;
+
+ int r = _collection_setattr(cid, name, value, size);
+
+ // set up commit waiter
+ if (r >= 0) {
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ } else {
+ if (onsafe) delete onsafe;
+ }
+
ebofs_lock.Unlock();
return 0;
}
return r;
}
-int Ebofs::collection_rmattr(coll_t cid, const char *name)
+int Ebofs::_collection_rmattr(coll_t cid, const char *name)
{
- ebofs_lock.Lock();
- dout(10) << "collection_rmattr " << hex << cid << dec << " '" << name << "'" << endl;
+ dout(10) << "_collection_rmattr " << hex << cid << dec << " '" << name << "'" << endl;
Cnode *cn = get_cnode(cid);
- if (!cn) {
- ebofs_lock.Unlock();
- return -ENOENT;
- }
+ if (!cn) return -ENOENT;
string n(name);
cn->attr.erase(n);
dirty_cnode(cn);
put_cnode(cn);
+
+ return 0;
+}
+
+int Ebofs::collection_rmattr(coll_t cid, const char *name, Context *onsafe)
+{
+ ebofs_lock.Lock();
+
+ int r = _collection_rmattr(cid, name);
+
+ // set up commit waiter
+ if (r >= 0) {
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ } else {
+ if (onsafe) delete onsafe;
+ }
+
ebofs_lock.Unlock();
return 0;
}
} finisher_thread;
- bool _write_will_block();
void alloc_more_node_space();
void do_csetattrs(map<coll_t, map<const char*, pair<void*,int> > > &cmods);
int statfs(struct statfs *buf);
+ // atomic transaction
+ unsigned apply_transaction(Transaction& t, Context *onsafe);
// object interface
bool exists(object_t);
int stat(object_t, struct stat*);
int read(object_t, size_t len, off_t off, bufferlist& bl);
- int write(object_t oid,
- size_t len, off_t off,
- bufferlist& bl, bool fsync=true);
- int write(object_t oid,
- size_t len, off_t offset,
- bufferlist& bl,
- Context *onsafe);
- int write_transaction(object_t oid,
- size_t len, off_t offset,
- bufferlist& bl,
- map<const char*, pair<void*,int> >& setattrs,
- map<coll_t, map<const char*, pair<void*,int> > >& cmods,
- Context *onsafe);
-
- int truncate(object_t oid, off_t size,
- Context *onsafe=0);
- int truncate_transaction(object_t oid, off_t size,
- map<const char*, pair<void*,int> >& setattrs,
- map<coll_t, map<const char*, pair<void*,int> > >& cmods,
- Context *onsafe);
-
- int remove(object_t oid,
- Context *onsafe=0);
- int remove_transaction(object_t,
- map<coll_t, map<const char*, pair<void*,int> > >& cmods,
- Context *onsafe=0);
-
+ int write(object_t oid, size_t len, off_t off, bufferlist& bl, bool fsync=true);
+ int write(object_t oid, size_t len, off_t offset, bufferlist& bl, Context *onsafe);
+ int truncate(object_t oid, off_t size, Context *onsafe=0);
+ int remove(object_t oid, Context *onsafe=0);
bool write_will_block();
// object attr
- int setattr(object_t oid, const char *name, const void *value, size_t size,
- Context *onsafe=0);
+ int setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe=0);
int getattr(object_t oid, const char *name, void *value, size_t size);
- int rmattr(object_t oid, const char *name,
- Context *onsafe=0);
+ int rmattr(object_t oid, const char *name, Context *onsafe=0);
int listattr(object_t oid, vector<string>& attrs);
-
+
// collections
int list_collections(list<coll_t>& ls);
- //int collection_stat(coll_t c, struct stat *st);
- int create_collection(coll_t c);
- int destroy_collection(coll_t c);
-
- bool _collection_exists(coll_t c);
bool collection_exists(coll_t c);
- int collection_add(coll_t c, object_t o);
- int collection_remove(coll_t c, object_t o);
+
+ int create_collection(coll_t c, Context *onsafe);
+ int destroy_collection(coll_t c, Context *onsafe);
+ int collection_add(coll_t c, object_t o, Context *onsafe);
+ int collection_remove(coll_t c, object_t o, Context *onsafe);
+
int collection_list(coll_t c, list<object_t>& o);
- int collection_setattr(object_t oid, const char *name, const void *value, size_t size);
+ int collection_setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe);
int collection_getattr(object_t oid, const char *name, void *value, size_t size);
- int collection_rmattr(coll_t cid, const char *name);
+ int collection_rmattr(coll_t cid, const char *name, Context *onsafe);
int collection_listattr(object_t oid, vector<string>& attrs);
+
+private:
+ // private interface -- use if caller already holds lock
+ bool _write_will_block();
+ int _write(object_t oid, size_t len, off_t offset, bufferlist& bl);
+ int _truncate(object_t oid, off_t size);
+ int _remove(object_t oid);
+ int _setattr(object_t oid, const char *name, const void *value, size_t size);
+ int _rmattr(object_t oid, const char *name);
+ bool _collection_exists(coll_t c);
+ int _create_collection(coll_t c);
+ int _destroy_collection(coll_t c);
+ int _collection_add(coll_t c, object_t o);
+ int _collection_remove(coll_t c, object_t o);
+ int _collection_setattr(object_t oid, const char *name, const void *value, size_t size);
+ int _collection_rmattr(coll_t cid, const char *name);
+
};
return r;
}
- int create_collection(coll_t c) {
+ int create_collection(coll_t c,
+ Context *onsafe=0) {
faker_lock.Lock();
fakecollections[c].size();
+ if (onsafe) g_timer.add_event_after(2.0,onsafe);
faker_lock.Unlock();
return 0;
}
- int destroy_collection(coll_t c) {
+ int destroy_collection(coll_t c,
+ Context *onsafe=0) {
int r = 0;
faker_lock.Lock();
if (fakecollections.count(c)) {
fakecollections.erase(c);
//fakecattr.erase(c);
+ if (onsafe) g_timer.add_event_after(2.0,onsafe);
} else
r = -1;
faker_lock.Unlock();
return r;
}
- int collection_add(coll_t c, object_t o) {
+ int collection_add(coll_t c, object_t o,
+ Context *onsafe=0) {
faker_lock.Lock();
fakecollections[c].insert(o);
+ if (onsafe) g_timer.add_event_after(2.0,onsafe);
faker_lock.Unlock();
return 0;
}
- int collection_remove(coll_t c, object_t o) {
+ int collection_remove(coll_t c, object_t o,
+ Context *onsafe=0) {
faker_lock.Lock();
fakecollections[c].erase(o);
+ if (onsafe) g_timer.add_event_after(2.0,onsafe);
faker_lock.Unlock();
return 0;
}
}
int collection_setattr(coll_t c, const char *name,
- void *value, size_t size) {
+ void *value, size_t size,
+ Context *onsafe=0) {
faker_lock.Lock();
int r = fakecattrs[c].setattr(name, value, size);
+ if (onsafe) g_timer.add_event_after(2.0,onsafe);
+ faker_lock.Unlock();
+ return r;
+ }
+ int collection_rmattr(coll_t c, const char *name,
+ Context *onsafe=0) {
+ faker_lock.Lock();
+ int r = fakecattrs[c].rmattr(name);
+ if (onsafe) g_timer.add_event_after(2.0,onsafe);
faker_lock.Unlock();
return r;
}
{
object_t oid = op->get_oid();
pg_t pgid = op->get_pg();
- int r;
// if the target object is locked for writing by another client, put 'op' to the waiting queue
// for _any_ op type -- eg only the locker can unlock!
if (block_if_wrlocked(op))
return; // op will be handled later, after the object becomes unlocked
+ // prepare the transaction
+ ObjectStore::Transaction t;
- // everybody will want to update the version.
- map<const char*, pair<void*,int> > setattrs;
- setattrs["version"] = pair<void*,int>(&version, sizeof(version));
-
- // and pg also gets matching version update.
- map<coll_t, map<const char*, pair<void*,int> > > cmods;
- cmods[pgid] = setattrs;
-
-
- // do the op!
+ // the op
switch (op->get_op()) {
case OSD_OP_WRLOCK:
case OSD_OP_REP_WRLOCK:
- // lock object
- r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit);
+ { // lock object
+ //r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit);
+ t.setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t));
+ }
break;
case OSD_OP_WRUNLOCK:
case OSD_OP_REP_WRUNLOCK:
- // unlock objects
- r = store->rmattr(oid, "wrlock", oncommit);
-
- //unblock all operations that were waiting for this object to become unlocked
- if (waiting_for_wr_unlock.count(oid)) {
- take_waiters(waiting_for_wr_unlock[oid]);
- waiting_for_wr_unlock.erase(oid);
+ { // unlock objects
+ //r = store->rmattr(oid, "wrlock", oncommit);
+ t.rmattr(oid, "wrlock");
+
+ // unblock all operations that were waiting for this object to become unlocked
+ if (waiting_for_wr_unlock.count(oid)) {
+ take_waiters(waiting_for_wr_unlock[oid]);
+ waiting_for_wr_unlock.erase(oid);
+ }
}
break;
//bl = op->get_data();
bl.claim( op->get_data() );
- /* old way
- r = store->write(op->get_oid(),
- op->get_length(),
- op->get_offset(),
- bl,
- oncommit);
- store->setattr(op->get_oid(), "version", &v, sizeof(v));
- store->collection_add(pgid, oid); // FIXME : be careful w/ locking
- */
-
// go
- r = store->write_transaction(op->get_oid(),
- op->get_length(), op->get_offset(),
- bl,
- setattrs, cmods,
- oncommit);
+ //r = store->write(op->get_oid(),
+ // op->get_length(), op->get_offset(),
+ // bl, oncommit);
+ t.write( oid, op->get_offset(), op->get_length(), bl );
}
break;
case OSD_OP_TRUNCATE:
case OSD_OP_REP_TRUNCATE:
- { // truncate
- map<const char*, pair<void*,int> > setattrs;
- setattrs["version"] = pair<void*,int>(&version, sizeof(version));
- r = store->truncate_transaction(oid, op->get_offset(),
- setattrs, cmods,
- oncommit);
+ { // truncate
+ //r = store->truncate(oid, op->get_offset());
+ t.truncate(oid, op->get_length() );
}
break;
case OSD_OP_DELETE:
case OSD_OP_REP_DELETE:
- // delete
- r = store->remove_transaction(oid,
- cmods,
- oncommit);
+ { // delete
+ //r = store->remove(oid);
+ t.remove(oid);
+ }
break;
-
+
default:
assert(0);
}
+
+
+ // log in pg log
+ // FIXME
+
+
+ // object collection, version
+ if (op->get_op() == OSD_OP_TRUNCATE ||
+ op->get_op() == OSD_OP_REP_TRUNCATE) {
+ // remove object from c
+ t.collection_remove(pgid, oid);
+ } else {
+ // add object to c
+ t.collection_add(pgid, oid);
+
+ // object version
+ t.setattr(oid, "version", &version, sizeof(version));
+ }
+
+ // inc pg version
+ t.collection_setattr(pgid, "version", &version, sizeof(version));
+
+ // ok, go!
+ unsigned r = store->apply_transaction(t, oncommit);
+ if (r == 0 && // no errors
+ r == 2) { // or error on collection_add
+ cerr << "error applying transaction: r = " << r << endl;
+ assert(r == 0);
+ }
}
* low-level interface to the local OSD file system
*/
+
+
class ObjectStore {
+public:
+
+ /*********************************
+ * transaction
+ */
+ class Transaction {
+ public:
+ static const int OP_WRITE = 1; // oid, offset, len, bl
+ static const int OP_TRUNCATE = 2; // oid, len
+ static const int OP_REMOVE = 3; // oid
+ static const int OP_SETATTR = 4; // oid, attrname, attrval
+ static const int OP_RMATTR = 5; // oid, attrname
+ static const int OP_MKCOLL = 6; // cid
+ static const int OP_RMCOLL = 7; // cid
+ static const int OP_COLL_ADD = 8; // cid, oid
+ static const int OP_COLL_REMOVE = 9; // cid, oid
+ static const int OP_COLL_SETATTR = 10; // cid, attrname, attrval
+ static const int OP_COLL_RMATTR = 11; // cid, attrname
+
+ list<int> ops;
+ list<bufferlist> bls;
+ list<object_t> oids;
+ list<coll_t> cids;
+ list<off_t> offsets;
+ list<size_t> lengths;
+ list<const char*> attrnames;
+ list< pair<const void*,int> > attrvals;
+
+ void write(object_t oid, off_t off, size_t len, bufferlist& bl) {
+ int op = OP_WRITE;
+ ops.push_back(op);
+ oids.push_back(oid);
+ offsets.push_back(off);
+ lengths.push_back(len);
+ bls.push_back(bl);
+ }
+ void truncate(object_t oid, off_t off) {
+ int op = OP_TRUNCATE;
+ ops.push_back(op);
+ oids.push_back(oid);
+ offsets.push_back(off);
+ }
+ void remove(object_t oid) {
+ int op = OP_REMOVE;
+ ops.push_back(op);
+ oids.push_back(oid);
+ }
+ void setattr(object_t oid, const char* name, const void* val, int len) {
+ int op = OP_SETATTR;
+ ops.push_back(op);
+ oids.push_back(oid);
+ attrnames.push_back(name);
+ attrvals.push_back(pair<const void*,int>(val,len));
+ }
+ void rmattr(object_t oid, const char* name) {
+ int op = OP_RMATTR;
+ ops.push_back(op);
+ oids.push_back(oid);
+ attrnames.push_back(name);
+ }
+ void create_collection(coll_t cid) {
+ int op = OP_MKCOLL;
+ ops.push_back(op);
+ cids.push_back(cid);
+ }
+ void remove_collection(coll_t cid) {
+ int op = OP_RMCOLL;
+ ops.push_back(op);
+ cids.push_back(cid);
+ }
+ void collection_add(coll_t cid, object_t oid) {
+ int op = OP_COLL_ADD;
+ ops.push_back(op);
+ cids.push_back(cid);
+ oids.push_back(oid);
+ }
+ void collection_remove(coll_t cid, object_t oid) {
+ int op = OP_COLL_REMOVE;
+ ops.push_back(op);
+ cids.push_back(cid);
+ oids.push_back(oid);
+ }
+ void collection_setattr(coll_t cid, const char* name, const void* val, int len) {
+ int op = OP_COLL_SETATTR;
+ ops.push_back(op);
+ cids.push_back(cid);
+ attrnames.push_back(name);
+ attrvals.push_back(pair<const void*,int>(val,len));
+ }
+ void collection_rmattr(coll_t cid, const char* name) {
+ int op = OP_COLL_RMATTR;
+ ops.push_back(op);
+ cids.push_back(cid);
+ attrnames.push_back(name);
+ }
+
+ // etc.
+ };
+
+
+
+ /* this implementation is here only for naive ObjectStores that
+ * do not do atomic transactions natively. it is not atomic.
+ */
+ virtual unsigned apply_transaction(Transaction& t, Context *onsafe) {
+ // non-atomic implementation
+ for (list<int>::iterator p = t.ops.begin();
+ p != t.ops.end();
+ p++) {
+ Context *last = 0;
+ if (p == t.ops.end()) last = onsafe;
+
+ switch (*p) {
+ case Transaction::OP_WRITE:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ off_t offset = t.offsets.front(); t.offsets.pop_front();
+ size_t len = t.lengths.front(); t.lengths.pop_front();
+ bufferlist bl = t.bls.front(); t.bls.pop_front();
+ write(oid, len, offset, bl, last);
+ }
+ break;
+
+ case Transaction::OP_TRUNCATE:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ size_t len = t.lengths.front(); t.lengths.pop_front();
+ truncate(oid, len, last);
+ }
+ break;
+
+ case Transaction::OP_REMOVE:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ remove(oid, last);
+ }
+ break;
+
+ case Transaction::OP_SETATTR:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
+ setattr(oid, attrname, attrval.first, attrval.second, last);
+ }
+ break;
+
+ case Transaction::OP_RMATTR:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ rmattr(oid, attrname, last);
+ }
+ break;
+
+ case Transaction::OP_MKCOLL:
+ {
+ coll_t cid = t.cids.front(); t.cids.pop_front();
+ create_collection(cid, last);
+ }
+ break;
+
+ case Transaction::OP_RMCOLL:
+ {
+ coll_t cid = t.cids.front(); t.cids.pop_front();
+ destroy_collection(cid, last);
+ }
+ break;
+
+ case Transaction::OP_COLL_ADD:
+ {
+ coll_t cid = t.cids.front(); t.cids.pop_front();
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ collection_add(cid, oid, last);
+ }
+ break;
+
+ case Transaction::OP_COLL_REMOVE:
+ {
+ coll_t cid = t.cids.front(); t.cids.pop_front();
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ collection_remove(cid, oid, last);
+ }
+ break;
+
+ case Transaction::OP_COLL_SETATTR:
+ {
+ coll_t cid = t.cids.front(); t.cids.pop_front();
+ const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
+ collection_setattr(cid, attrname, attrval.first, attrval.second, last);
+ }
+ break;
+
+ case Transaction::OP_COLL_RMATTR:
+ {
+ coll_t cid = t.cids.front(); t.cids.pop_front();
+ const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ collection_rmattr(cid, attrname, last);
+ }
+ break;
+
+
+ default:
+ cerr << "bad op " << *p << endl;
+ assert(0);
+ }
+ }
+ return 0; // FIXME count errors
+ }
+
+ /*********************************************/
+
+
+
public:
ObjectStore() {}
virtual ~ObjectStore() {}
// collections
virtual int list_collections(list<coll_t>& ls) {return 0;}//= 0;
- virtual int create_collection(coll_t c) {return 0;}//= 0;
- virtual int destroy_collection(coll_t c) {return 0;}//= 0;
+ virtual int create_collection(coll_t c,
+ Context *onsafe=0) {return 0;}//= 0;
+ virtual int destroy_collection(coll_t c,
+ Context *onsafe=0) {return 0;}//= 0;
virtual bool collection_exists(coll_t c) {return 0;}
virtual int collection_stat(coll_t c, struct stat *st) {return 0;}//= 0;
- virtual int collection_add(coll_t c, object_t o) {return 0;}//= 0;
- virtual int collection_remove(coll_t c, object_t o) {return 0;}// = 0;
+ virtual int collection_add(coll_t c, object_t o,
+ Context *onsafe=0) {return 0;}//= 0;
+ virtual int collection_remove(coll_t c, object_t o,
+ Context *onsafe=0) {return 0;}// = 0;
virtual int collection_list(coll_t c, list<object_t>& o) {return 0;}//= 0;
virtual int collection_setattr(coll_t cid, const char *name,
- const void *value, size_t size) {return 0;} //= 0;
+ const void *value, size_t size,
+ Context *onsafe=0) {return 0;} //= 0;
+ virtual int collection_rmattr(coll_t cid, const char *name,
+ Context *onsafe=0) {return 0;} //= 0;
virtual int collection_getattr(coll_t cid, const char *name,
void *value, size_t size) {return 0;} //= 0;
virtual int collection_listattr(coll_t cid, char *attrs, size_t size) {return 0;} //= 0;