From 33dc2d329db202d1d4fb23141e91fc6d30e4b1de Mon Sep 17 00:00:00 2001 From: sage Date: Thu, 18 May 2006 18:31:00 +0000 Subject: [PATCH] new objectstore transactions. way slicker! git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@770 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/ebofs/Ebofs.cc | 620 ++++++++++++++++++++++++++--------------- ceph/ebofs/Ebofs.h | 76 +++-- ceph/osd/Fake.h | 28 +- ceph/osd/OSD.cc | 103 ++++--- ceph/osd/ObjectStore.h | 234 +++++++++++++++- 5 files changed, 742 insertions(+), 319 deletions(-) diff --git a/ceph/ebofs/Ebofs.cc b/ceph/ebofs/Ebofs.cc index 8206c07a3c71d..23f97d3c6bf4d 100644 --- a/ceph/ebofs/Ebofs.cc +++ b/ceph/ebofs/Ebofs.cc @@ -1737,6 +1737,192 @@ bool Ebofs::write_will_block() } +unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) +{ + ebofs_lock.Lock(); + dout(7) << "apply_transaction start (" << t.ops.size() << " ops)" << endl; + + // do ops + unsigned r = 0; // bit fields indicate which ops failed. + int bit = 1; + for (list::iterator p = t.ops.begin(); + p != t.ops.end(); + p++) { + switch (*p) { + case Transaction::OP_WRITE: + { + object_t oid = t.oids.front(); t.oids.pop_front(); + off_t offset = t.offsets.front(); t.offsets.pop_front(); + size_t len = t.lengths.front(); t.lengths.pop_front(); + bufferlist bl = t.bls.front(); t.bls.pop_front(); + if (_write(oid, len, offset, bl) < 0) { + dout(7) << "apply_transaction fail on _write" << endl; + r &= bit; + } + } + break; + + case Transaction::OP_TRUNCATE: + { + object_t oid = t.oids.front(); t.oids.pop_front(); + size_t len = t.lengths.front(); t.lengths.pop_front(); + if (_truncate(oid, len) < 0) { + dout(7) << "apply_transaction fail on _truncate" << endl; + r &= bit; + } + } + break; + + case Transaction::OP_REMOVE: + { + object_t oid = t.oids.front(); t.oids.pop_front(); + if (_remove(oid) < 0) { + dout(7) << "apply_transaction fail on _remove" << endl; + r &= bit; + } + } + break; + + case Transaction::OP_SETATTR: + { + object_t oid = 
t.oids.front(); t.oids.pop_front(); + const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + pair attrval = t.attrvals.front(); t.attrvals.pop_front(); + if (_setattr(oid, attrname, attrval.first, attrval.second) < 0) { + dout(7) << "apply_transaction fail on _setattr" << endl; + r &= bit; + } + } + break; + + case Transaction::OP_RMATTR: + { + object_t oid = t.oids.front(); t.oids.pop_front(); + const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + if (_rmattr(oid, attrname) < 0) { + dout(7) << "apply_transaction fail on _rmattr" << endl; + r &= bit; + } + } + break; + + case Transaction::OP_MKCOLL: + { + coll_t cid = t.cids.front(); t.cids.pop_front(); + if (_create_collection(cid) < 0) { + dout(7) << "apply_transaction fail on _create_collection" << endl; + r &= bit; + } + } + break; + + case Transaction::OP_RMCOLL: + { + coll_t cid = t.cids.front(); t.cids.pop_front(); + if (_destroy_collection(cid) < 0) { + dout(7) << "apply_transaction fail on _remove_collection" << endl; + r &= bit; + } + } + break; + + case Transaction::OP_COLL_ADD: + { + coll_t cid = t.cids.front(); t.cids.pop_front(); + object_t oid = t.oids.front(); t.oids.pop_front(); + if (_collection_add(cid, oid) < 0) { + //dout(7) << "apply_transaction fail on _collection_add" << endl; + //r &= bit; + } + } + break; + + case Transaction::OP_COLL_REMOVE: + { + coll_t cid = t.cids.front(); t.cids.pop_front(); + object_t oid = t.oids.front(); t.oids.pop_front(); + if (_collection_remove(cid, oid) < 0) { + dout(7) << "apply_transaction fail on _collection_remove" << endl; + r &= bit; + } + } + break; + + case Transaction::OP_COLL_SETATTR: + { + coll_t cid = t.cids.front(); t.cids.pop_front(); + const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + pair attrval = t.attrvals.front(); t.attrvals.pop_front(); + if (_collection_setattr(cid, attrname, attrval.first, attrval.second) < 0) { + dout(7) << "apply_transaction fail on _collection_setattr" << endl; + 
r &= bit; + } + } + break; + + case Transaction::OP_COLL_RMATTR: + { + coll_t cid = t.cids.front(); t.cids.pop_front(); + const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + if (_collection_rmattr(cid, attrname) < 0) { + dout(7) << "apply_transaction fail on _collection_rmattr" << endl; + r &= bit; + } + } + break; + + default: + cerr << "bad op " << *p << endl; + assert(0); + } + + bit = bit << 1; + } + + dout(7) << "apply_transaction finish (r = " << r << ")" << endl; + + // set up commit waiter + if (r == 0) { + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + } else { + if (onsafe) delete onsafe; + } + + ebofs_lock.Unlock(); + return r; +} + + + +int Ebofs::_write(object_t oid, size_t length, off_t offset, bufferlist& bl) +{ + dout(7) << "_write " << hex << oid << dec << " len " << length << " off " << offset << endl; + + // out of space? + unsigned max = (length+offset) / EBOFS_BLOCK_SIZE + 10; // very conservative; assumes we have to rewrite + max += dirty_onodes.size() + dirty_cnodes.size(); + if (max >= free_blocks) { + dout(1) << "write failing, only " << free_blocks << " blocks free, may need up to " << max << endl; + return -ENOSPC; + } + + // get|create inode + Onode *on = get_onode(oid); + if (!on) on = new_onode(oid); // new inode! + + dirty_onode(on); // dirty onode! + + // apply write to buffer cache + apply_write(on, length, offset, bl); + + // done. 
+ put_onode(on); + trim_bc(); + + return length; +} + + int Ebofs::write(object_t oid, size_t len, off_t off, bufferlist& bl, bool fsync) @@ -1769,146 +1955,78 @@ int Ebofs::write(object_t oid, int Ebofs::write(object_t oid, size_t len, off_t off, bufferlist& bl, Context *onsafe) -{ - static map > null_setattrs; - static map > > null_cmods; - return write_transaction(oid, len, off, bl, null_setattrs, null_cmods, onsafe); -} - -int Ebofs::write_transaction(object_t oid, - size_t len, off_t off, - bufferlist& bl, - map >& setattrs, - map > >& cmods, - Context *onsafe) { ebofs_lock.Lock(); - dout(7) << "write " << hex << oid << dec << " len " << len << " off " << off << endl; assert(len > 0); // too much unflushed dirty data? (if so, block!) if (_write_will_block()) { - dout(10) << "write blocking" << endl; + dout(10) << "write blocking " + << hex << oid << dec << " len " << len << " off " << off << endl; + while (_write_will_block()) bc.waitfor_stat(); // waits on ebofs_lock - dout(7) << "write unblocked " << hex << oid << dec << " len " << len << " off " << off << endl; - } - // out of space? - //unsigned max = ((len+off) - MIN(off, on->object_size)) / EBOFS_BLOCK_SIZE + 10; - unsigned max = (len+off) / EBOFS_BLOCK_SIZE + 10; // very conservative; assumes we have to rewrite - max += dirty_onodes.size() + dirty_cnodes.size(); - if (max >= free_blocks) { - dout(1) << "write failing, only " << free_blocks << " blocks free, may need up to " << max << endl; - if (onsafe) delete onsafe; - ebofs_lock.Unlock(); - return -ENOSPC; + dout(7) << "write unblocked " + << hex << oid << dec << " len " << len << " off " << off << endl; } - - // get|create inode - Onode *on = get_onode(oid); - if (!on) on = new_onode(oid); // new inode! - dirty_onode(on); // dirty onode! - - // apply write to buffer cache - apply_write(on, len, off, bl); - - // join collections? 
- for (map > >::iterator cit = cmods.begin(); - cit != cmods.end(); - cit++) { - const coll_t cid = cit->first; - if (_collection_exists(cid)) { - if (oc_tab->lookup(idpair_t(oid,cid)) < 0) { - oc_tab->insert(idpair_t(oid,cid), true); - co_tab->insert(idpair_t(cid,oid), true); - } - } - } + // go + int r = _write(oid, len, off, bl); - // apply attribute changes - do_setattrs(on, setattrs); - do_csetattrs(cmods); - - // prepare (eventual) journal entry? - - // set up commit waiter - if (onsafe) { - // commit on next full fs commit. - // FIXME when we add journaling. - commit_waiters[super_epoch].push_back(onsafe); - //on->commit_waiters.push_back(onsafe); // in case we delete the object. + // commit waiter + if (r > 0) { + assert((size_t)r == len); + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + } else { + if (onsafe) delete onsafe; } - // done - put_onode(on); - - trim_bc(); - ebofs_lock.Unlock(); - return len; + return r; } - -int Ebofs::remove(object_t oid, Context *onsafe) +int Ebofs::_remove(object_t oid) { - static map > > null_cmods; - return remove_transaction(oid, null_cmods, onsafe); -} + dout(7) << "_remove " << hex << oid << dec << endl; -int Ebofs::remove_transaction(object_t oid, - map > >& cmods, - Context *onsafe) -{ - ebofs_lock.Lock(); - dout(7) << "remove " << hex << oid << dec << endl; - // get inode Onode *on = get_onode(oid); - if (!on) { - ebofs_lock.Unlock(); - return -ENOENT; - } + if (!on) return -ENOENT; // ok remove it! remove_onode(on); - // do collection mods - do_csetattrs(cmods); + return 0; +} + + +int Ebofs::remove(object_t oid, Context *onsafe) +{ + ebofs_lock.Lock(); + + // do it + int r = _remove(oid); // set up commit waiter - if (onsafe) { - // commit on next full fs commit. - // FIXME when we add journaling. 
- commit_waiters[super_epoch].push_back(onsafe); + if (r >= 0) { + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + } else { + if (onsafe) delete onsafe; } ebofs_lock.Unlock(); - return 0; + return r; } -int Ebofs::truncate(object_t oid, off_t size, Context *onsafe) +int Ebofs::_truncate(object_t oid, off_t size) { - static map > null_setattrs; - static map > > null_cmods; - return truncate_transaction(oid, size, null_setattrs, null_cmods, onsafe); -} + dout(7) << "_truncate " << hex << oid << dec << " size " << size << endl; -int Ebofs::truncate_transaction(object_t oid, off_t size, - map >& setattrs, - map > >& cmods, - Context *onsafe) -{ - ebofs_lock.Lock(); - dout(7) << "truncate " << hex << oid << dec << " size " << size << endl; - Onode *on = get_onode(oid); - if (!on) { - ebofs_lock.Unlock(); + if (!on) return -ENOENT; - } int r = 0; if (size > on->object_size) { @@ -1943,15 +2061,24 @@ int Ebofs::truncate_transaction(object_t oid, off_t size, assert(size == on->object_size); } - // apply attribute changes - do_setattrs(on, setattrs); - do_csetattrs(cmods); + put_onode(on); + return r; +} - // set up commit waiter - if (onsafe) - commit_waiters[super_epoch].push_back(onsafe); + +int Ebofs::truncate(object_t oid, off_t size, Context *onsafe) +{ + ebofs_lock.Lock(); - put_onode(on); + int r = _truncate(oid, size); + + // set up commit waiter + if (r >= 0) { + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + } else { + if (onsafe) delete onsafe; + } + ebofs_lock.Unlock(); return r; } @@ -1985,76 +2112,36 @@ int Ebofs::stat(object_t oid, struct stat *st) return 0; } -// attributes -void Ebofs::do_setattrs(Onode *on, map > &setattrs) +int Ebofs::_setattr(object_t oid, const char *name, const void *value, size_t size) { - for (map >::iterator it = setattrs.begin(); - it != setattrs.end(); - it++) { - string n(it->first); - if (it->second.first) { - AttrVal val((char*)it->second.first, it->second.second); - on->attr[n] = val; - } else { - 
on->attr.erase(n); - } - } -} - -void Ebofs::do_csetattrs(map > > &cmods) -{ - for (map > >::iterator cit = cmods.begin(); - cit != cmods.end(); - cit++) { - const coll_t cid = cit->first; - - if (!cit->second.empty()) { - // attrs - Cnode *cn = get_cnode(cid); - if (cn) { - for (map >::iterator ait = cit->second.begin(); - ait != cit->second.end(); - ait++) { - string n(ait->first); - if (ait->second.first) { - AttrVal val((char*)ait->second.first, ait->second.second); - cn->attr[n] = val; - } else { - cn->attr.erase(n); - } - } - dirty_cnode(cn); - put_cnode(cn); - } else { - dout(0) << "warning: collection " << hex << cid << dec << " dne for csetattrs" << endl; - } - } - } -} - -int Ebofs::setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe) -{ - ebofs_lock.Lock(); dout(8) << "setattr " << hex << oid << dec << " '" << name << "' len " << size << endl; Onode *on = get_onode(oid); - if (!on) { - ebofs_lock.Unlock(); - return -ENOENT; - } + if (!on) return -ENOENT; string n(name); AttrVal val((char*)value, size); on->attr[n] = val; dirty_onode(on); + put_onode(on); + return 0; +} - if (onsafe) - commit_waiters[super_epoch].push_back(onsafe); +int Ebofs::setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe) +{ + ebofs_lock.Lock(); + int r = _setattr(oid, name, value, size); + + // set up commit waiter + if (r >= 0) { + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + } else { + if (onsafe) delete onsafe; + } - put_onode(on); ebofs_lock.Unlock(); - return 0; + return r; } int Ebofs::getattr(object_t oid, const char *name, void *value, size_t size) @@ -2081,29 +2168,38 @@ int Ebofs::getattr(object_t oid, const char *name, void *value, size_t size) return r; } -int Ebofs::rmattr(object_t oid, const char *name, Context *onsafe) + +int Ebofs::_rmattr(object_t oid, const char *name) { - ebofs_lock.Lock(); - dout(8) << "rmattr " << hex << oid << dec << " '" << name << "'" << endl; + dout(8) 
<< "_rmattr " << hex << oid << dec << " '" << name << "'" << endl; Onode *on = get_onode(oid); - if (!on) { - ebofs_lock.Unlock(); - return -ENOENT; - } + if (!on) return -ENOENT; string n(name); on->attr.erase(n); - - if (onsafe) - commit_waiters[super_epoch].push_back(onsafe); - dirty_onode(on); put_onode(on); - ebofs_lock.Unlock(); return 0; } +int Ebofs::rmattr(object_t oid, const char *name, Context *onsafe) +{ + ebofs_lock.Lock(); + + int r = _rmattr(oid, name); + + // set up commit waiter + if (r >= 0) { + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + } else { + if (onsafe) delete onsafe; + } + + ebofs_lock.Unlock(); + return r; +} + int Ebofs::listattr(object_t oid, vector& attrs) { ebofs_lock.Lock(); @@ -2151,32 +2247,42 @@ int Ebofs::list_collections(list& ls) return num; } -int Ebofs::create_collection(coll_t cid) +int Ebofs::_create_collection(coll_t cid) { - ebofs_lock.Lock(); - dout(9) << "create_collection " << hex << cid << dec << endl; + dout(9) << "_create_collection " << hex << cid << dec << endl; - if (_collection_exists(cid)) { - ebofs_lock.Unlock(); + if (_collection_exists(cid)) return -EEXIST; - } Cnode *cn = new_cnode(cid); put_cnode(cn); + + return 0; +} + +int Ebofs::create_collection(coll_t cid, Context *onsafe) +{ + ebofs_lock.Lock(); + + int r = _create_collection(cid); + + // set up commit waiter + if (r >= 0) { + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + } else { + if (onsafe) delete onsafe; + } ebofs_lock.Unlock(); - return 0; + return r; } -int Ebofs::destroy_collection(coll_t cid) +int Ebofs::_destroy_collection(coll_t cid) { - ebofs_lock.Lock(); - dout(9) << "destroy_collection " << hex << cid << dec << endl; + dout(9) << "_destroy_collection " << hex << cid << dec << endl; - if (!_collection_exists(cid)) { - ebofs_lock.Unlock(); + if (!_collection_exists(cid)) return -ENOENT; - } Cnode *cn = get_cnode(cid); assert(cn); @@ -2192,15 +2298,30 @@ int Ebofs::destroy_collection(coll_t cid) } 
remove_cnode(cn); - ebofs_lock.Lock(); return 0; } +int Ebofs::destroy_collection(coll_t cid, Context *onsafe) +{ + ebofs_lock.Lock(); + + int r = _destroy_collection(cid); + + // set up commit waiter + if (r >= 0) { + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + } else { + if (onsafe) delete onsafe; + } + + ebofs_lock.Unlock(); + return r; +} + bool Ebofs::collection_exists(coll_t cid) { ebofs_lock.Lock(); dout(10) << "collection_exists " << hex << cid << dec << endl; - bool r = _collection_exists(cid); ebofs_lock.Unlock(); return r; @@ -2210,32 +2331,46 @@ bool Ebofs::_collection_exists(coll_t cid) return (collection_tab->lookup(cid) == 0); } -int Ebofs::collection_add(coll_t cid, object_t oid) +int Ebofs::_collection_add(coll_t cid, object_t oid) { - ebofs_lock.Lock(); - dout(9) << "collection_add " << hex << cid << " object " << oid << dec << endl; + dout(9) << "_collection_add " << hex << cid << " object " << oid << dec << endl; - if (!_collection_exists(cid)) { - ebofs_lock.Unlock(); + if (!_collection_exists(cid)) return -ENOENT; - } + if (oc_tab->lookup(idpair_t(oid,cid)) < 0) { oc_tab->insert(idpair_t(oid,cid), true); co_tab->insert(idpair_t(cid,oid), true); + return 0; + } else { + return -ENOENT; // FIXME? 
} +} + +int Ebofs::collection_add(coll_t cid, object_t oid, Context *onsafe) +{ + ebofs_lock.Lock(); + + int r = _collection_add(cid, oid); + + // set up commit waiter + if (r >= 0) { + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + } else { + if (onsafe) delete onsafe; + } + ebofs_lock.Unlock(); return 0; } -int Ebofs::collection_remove(coll_t cid, object_t oid) +int Ebofs::_collection_remove(coll_t cid, object_t oid) { - ebofs_lock.Lock(); - dout(9) << "collection_remove " << hex << cid << " object " << oid << dec << endl; + dout(9) << "_collection_remove " << hex << cid << " object " << oid << dec << endl; - if (!_collection_exists(cid)) { - ebofs_lock.Unlock(); + if (!_collection_exists(cid)) return -ENOENT; - } + oc_tab->remove(idpair_t(oid,cid)); co_tab->remove(idpair_t(cid,oid)); @@ -2245,6 +2380,23 @@ int Ebofs::collection_remove(coll_t cid, object_t oid) cnode_map.erase(cid); cnode_lru.lru_remove(cn); delete cn; + return 0; + } else { + return -ENOENT; // FIXME? + } +} + +int Ebofs::collection_remove(coll_t cid, object_t oid, Context *onsafe) +{ + ebofs_lock.Lock(); + + int r = _collection_remove(cid, oid); + + // set up commit waiter + if (r >= 0) { + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + } else { + if (onsafe) delete onsafe; } ebofs_lock.Unlock(); @@ -2280,23 +2432,36 @@ int Ebofs::collection_list(coll_t cid, list& ls) } -int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, size_t size) +int Ebofs::_collection_setattr(coll_t cid, const char *name, const void *value, size_t size) { - ebofs_lock.Lock(); - dout(10) << "collection_setattr " << hex << cid << dec << " '" << name << "' len " << size << endl; + dout(10) << "_collection_setattr " << hex << cid << dec << " '" << name << "' len " << size << endl; Cnode *cn = get_cnode(cid); - if (!cn) { - ebofs_lock.Unlock(); - return -ENOENT; - } + if (!cn) return -ENOENT; string n(name); AttrVal val((char*)value, size); cn->attr[n] = val; 
dirty_cnode(cn); - put_cnode(cn); + + return 0; +} + +int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, size_t size, Context *onsafe) +{ + ebofs_lock.Lock(); + dout(10) << "collection_setattr " << hex << cid << dec << " '" << name << "' len " << size << endl; + + int r = _collection_setattr(cid, name, value, size); + + // set up commit waiter + if (r >= 0) { + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + } else { + if (onsafe) delete onsafe; + } + ebofs_lock.Unlock(); return 0; } @@ -2326,22 +2491,35 @@ int Ebofs::collection_getattr(coll_t cid, const char *name, void *value, size_t return r; } -int Ebofs::collection_rmattr(coll_t cid, const char *name) +int Ebofs::_collection_rmattr(coll_t cid, const char *name) { - ebofs_lock.Lock(); - dout(10) << "collection_rmattr " << hex << cid << dec << " '" << name << "'" << endl; + dout(10) << "_collection_rmattr " << hex << cid << dec << " '" << name << "'" << endl; Cnode *cn = get_cnode(cid); - if (!cn) { - ebofs_lock.Unlock(); - return -ENOENT; - } + if (!cn) return -ENOENT; string n(name); cn->attr.erase(n); dirty_cnode(cn); put_cnode(cn); + + return 0; +} + +int Ebofs::collection_rmattr(coll_t cid, const char *name, Context *onsafe) +{ + ebofs_lock.Lock(); + + int r = _collection_rmattr(cid, name); + + // set up commit waiter + if (r >= 0) { + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + } else { + if (onsafe) delete onsafe; + } + ebofs_lock.Unlock(); return 0; } diff --git a/ceph/ebofs/Ebofs.h b/ceph/ebofs/Ebofs.h index cacca9b5de32d..ed2b4c4d2ffef 100644 --- a/ceph/ebofs/Ebofs.h +++ b/ceph/ebofs/Ebofs.h @@ -196,7 +196,6 @@ class Ebofs : public ObjectStore { } finisher_thread; - bool _write_will_block(); void alloc_more_node_space(); void do_csetattrs(map > > &cmods); @@ -232,63 +231,56 @@ class Ebofs : public ObjectStore { int statfs(struct statfs *buf); + // atomic transaction + unsigned apply_transaction(Transaction& t, Context *onsafe); // object 
interface bool exists(object_t); int stat(object_t, struct stat*); int read(object_t, size_t len, off_t off, bufferlist& bl); - int write(object_t oid, - size_t len, off_t off, - bufferlist& bl, bool fsync=true); - int write(object_t oid, - size_t len, off_t offset, - bufferlist& bl, - Context *onsafe); - int write_transaction(object_t oid, - size_t len, off_t offset, - bufferlist& bl, - map >& setattrs, - map > >& cmods, - Context *onsafe); - - int truncate(object_t oid, off_t size, - Context *onsafe=0); - int truncate_transaction(object_t oid, off_t size, - map >& setattrs, - map > >& cmods, - Context *onsafe); - - int remove(object_t oid, - Context *onsafe=0); - int remove_transaction(object_t, - map > >& cmods, - Context *onsafe=0); - + int write(object_t oid, size_t len, off_t off, bufferlist& bl, bool fsync=true); + int write(object_t oid, size_t len, off_t offset, bufferlist& bl, Context *onsafe); + int truncate(object_t oid, off_t size, Context *onsafe=0); + int remove(object_t oid, Context *onsafe=0); bool write_will_block(); // object attr - int setattr(object_t oid, const char *name, const void *value, size_t size, - Context *onsafe=0); + int setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe=0); int getattr(object_t oid, const char *name, void *value, size_t size); - int rmattr(object_t oid, const char *name, - Context *onsafe=0); + int rmattr(object_t oid, const char *name, Context *onsafe=0); int listattr(object_t oid, vector& attrs); - + // collections int list_collections(list& ls); - //int collection_stat(coll_t c, struct stat *st); - int create_collection(coll_t c); - int destroy_collection(coll_t c); - - bool _collection_exists(coll_t c); bool collection_exists(coll_t c); - int collection_add(coll_t c, object_t o); - int collection_remove(coll_t c, object_t o); + + int create_collection(coll_t c, Context *onsafe); + int destroy_collection(coll_t c, Context *onsafe); + int collection_add(coll_t c, object_t o, 
Context *onsafe); + int collection_remove(coll_t c, object_t o, Context *onsafe); + int collection_list(coll_t c, list& o); - int collection_setattr(object_t oid, const char *name, const void *value, size_t size); + int collection_setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe); int collection_getattr(object_t oid, const char *name, void *value, size_t size); - int collection_rmattr(coll_t cid, const char *name); + int collection_rmattr(coll_t cid, const char *name, Context *onsafe); int collection_listattr(object_t oid, vector& attrs); + +private: + // private interface -- use if caller already holds lock + bool _write_will_block(); + int _write(object_t oid, size_t len, off_t offset, bufferlist& bl); + int _truncate(object_t oid, off_t size); + int _remove(object_t oid); + int _setattr(object_t oid, const char *name, const void *value, size_t size); + int _rmattr(object_t oid, const char *name); + bool _collection_exists(coll_t c); + int _create_collection(coll_t c); + int _destroy_collection(coll_t c); + int _collection_add(coll_t c, object_t o); + int _collection_remove(coll_t c, object_t o); + int _collection_setattr(object_t oid, const char *name, const void *value, size_t size); + int _collection_rmattr(coll_t cid, const char *name); + }; diff --git a/ceph/osd/Fake.h b/ceph/osd/Fake.h index 3d20fe7239276..44bde8b42ab1d 100644 --- a/ceph/osd/Fake.h +++ b/ceph/osd/Fake.h @@ -45,19 +45,23 @@ class FakeStoreCollections { return r; } - int create_collection(coll_t c) { + int create_collection(coll_t c, + Context *onsafe=0) { faker_lock.Lock(); fakecollections[c].size(); + if (onsafe) g_timer.add_event_after(2.0,onsafe); faker_lock.Unlock(); return 0; } - int destroy_collection(coll_t c) { + int destroy_collection(coll_t c, + Context *onsafe=0) { int r = 0; faker_lock.Lock(); if (fakecollections.count(c)) { fakecollections.erase(c); //fakecattr.erase(c); + if (onsafe) g_timer.add_event_after(2.0,onsafe); } else r = -1; 
faker_lock.Unlock(); @@ -75,16 +79,20 @@ class FakeStoreCollections { return r; } - int collection_add(coll_t c, object_t o) { + int collection_add(coll_t c, object_t o, + Context *onsafe=0) { faker_lock.Lock(); fakecollections[c].insert(o); + if (onsafe) g_timer.add_event_after(2.0,onsafe); faker_lock.Unlock(); return 0; } - int collection_remove(coll_t c, object_t o) { + int collection_remove(coll_t c, object_t o, + Context *onsafe=0) { faker_lock.Lock(); fakecollections[c].erase(o); + if (onsafe) g_timer.add_event_after(2.0,onsafe); faker_lock.Unlock(); return 0; } @@ -182,9 +190,19 @@ class FakeStoreAttrs { } int collection_setattr(coll_t c, const char *name, - void *value, size_t size) { + void *value, size_t size, + Context *onsafe=0) { faker_lock.Lock(); int r = fakecattrs[c].setattr(name, value, size); + if (onsafe) g_timer.add_event_after(2.0,onsafe); + faker_lock.Unlock(); + return r; + } + int collection_rmattr(coll_t c, const char *name, + Context *onsafe=0) { + faker_lock.Lock(); + int r = fakecattrs[c].rmattr(name); + if (onsafe) g_timer.add_event_after(2.0,onsafe); faker_lock.Unlock(); return r; } diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 08925e221b1a5..49b7fe6b1ca47 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -2041,40 +2041,36 @@ void OSD::op_apply(MOSDOp *op, version_t version, Context* oncommit) { object_t oid = op->get_oid(); pg_t pgid = op->get_pg(); - int r; // if the target object is locked for writing by another client, put 'op' to the waiting queue // for _any_ op type -- eg only the locker can unlock! if (block_if_wrlocked(op)) return; // op will be handled later, after the object becomes unlocked + // prepare the transaction + ObjectStore::Transaction t; - // everybody will want to update the version. - map > setattrs; - setattrs["version"] = pair(&version, sizeof(version)); - - // and pg also gets matching version update. - map > > cmods; - cmods[pgid] = setattrs; - - - // do the op! 
+ // the op switch (op->get_op()) { case OSD_OP_WRLOCK: case OSD_OP_REP_WRLOCK: - // lock object - r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit); + { // lock object + //r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit); + t.setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t)); + } break; case OSD_OP_WRUNLOCK: case OSD_OP_REP_WRUNLOCK: - // unlock objects - r = store->rmattr(oid, "wrlock", oncommit); - - //unblock all operations that were waiting for this object to become unlocked - if (waiting_for_wr_unlock.count(oid)) { - take_waiters(waiting_for_wr_unlock[oid]); - waiting_for_wr_unlock.erase(oid); + { // unlock objects + //r = store->rmattr(oid, "wrlock", oncommit); + t.rmattr(oid, "wrlock"); + + // unblock all operations that were waiting for this object to become unlocked + if (waiting_for_wr_unlock.count(oid)) { + take_waiters(waiting_for_wr_unlock[oid]); + waiting_for_wr_unlock.erase(oid); + } } break; @@ -2086,45 +2082,60 @@ void OSD::op_apply(MOSDOp *op, version_t version, Context* oncommit) //bl = op->get_data(); bl.claim( op->get_data() ); - /* old way - r = store->write(op->get_oid(), - op->get_length(), - op->get_offset(), - bl, - oncommit); - store->setattr(op->get_oid(), "version", &v, sizeof(v)); - store->collection_add(pgid, oid); // FIXME : be careful w/ locking - */ - // go - r = store->write_transaction(op->get_oid(), - op->get_length(), op->get_offset(), - bl, - setattrs, cmods, - oncommit); + //r = store->write(op->get_oid(), + // op->get_length(), op->get_offset(), + // bl, oncommit); + t.write( oid, op->get_offset(), op->get_length(), bl ); } break; case OSD_OP_TRUNCATE: case OSD_OP_REP_TRUNCATE: - { // truncate - map > setattrs; - setattrs["version"] = pair(&version, sizeof(version)); - r = store->truncate_transaction(oid, op->get_offset(), - setattrs, cmods, - oncommit); + { // truncate + //r = store->truncate(oid, op->get_offset()); + t.truncate(oid, 
op->get_length() ); } break; case OSD_OP_DELETE: case OSD_OP_REP_DELETE: - // delete - r = store->remove_transaction(oid, - cmods, - oncommit); + { // delete + //r = store->remove(oid); + t.remove(oid); + } break; - + default: assert(0); } + + + // log in pg log + // FIXME + + + // object collection, version + if (op->get_op() == OSD_OP_TRUNCATE || + op->get_op() == OSD_OP_REP_TRUNCATE) { + // remove object from c + t.collection_remove(pgid, oid); + } else { + // add object to c + t.collection_add(pgid, oid); + + // object version + t.setattr(oid, "version", &version, sizeof(version)); + } + + // inc pg version + t.collection_setattr(pgid, "version", &version, sizeof(version)); + + // ok, go! + unsigned r = store->apply_transaction(t, oncommit); + if (r == 0 && // no errors + r == 2) { // or error on collection_add + cerr << "error applying transaction: r = " << r << endl; + assert(r == 0); + } } diff --git a/ceph/osd/ObjectStore.h b/ceph/osd/ObjectStore.h index bff84b1e8bbca..03c6ab2a78479 100644 --- a/ceph/osd/ObjectStore.h +++ b/ceph/osd/ObjectStore.h @@ -35,7 +35,224 @@ using namespace std; * low-level interface to the local OSD file system */ + + class ObjectStore { +public: + + /********************************* + * transaction + */ + class Transaction { + public: + static const int OP_WRITE = 1; // oid, offset, len, bl + static const int OP_TRUNCATE = 2; // oid, len + static const int OP_REMOVE = 3; // oid + static const int OP_SETATTR = 4; // oid, attrname, attrval + static const int OP_RMATTR = 5; // oid, attrname + static const int OP_MKCOLL = 6; // cid + static const int OP_RMCOLL = 7; // cid + static const int OP_COLL_ADD = 8; // cid, oid + static const int OP_COLL_REMOVE = 9; // cid, oid + static const int OP_COLL_SETATTR = 10; // cid, attrname, attrval + static const int OP_COLL_RMATTR = 11; // cid, attrname + + list ops; + list bls; + list oids; + list cids; + list offsets; + list lengths; + list attrnames; + list< pair > attrvals; + + void 
write(object_t oid, off_t off, size_t len, bufferlist& bl) { + int op = OP_WRITE; + ops.push_back(op); + oids.push_back(oid); + offsets.push_back(off); + lengths.push_back(len); + bls.push_back(bl); + } + void truncate(object_t oid, off_t off) { + int op = OP_TRUNCATE; + ops.push_back(op); + oids.push_back(oid); + offsets.push_back(off); + } + void remove(object_t oid) { + int op = OP_REMOVE; + ops.push_back(op); + oids.push_back(oid); + } + void setattr(object_t oid, const char* name, const void* val, int len) { + int op = OP_SETATTR; + ops.push_back(op); + oids.push_back(oid); + attrnames.push_back(name); + attrvals.push_back(pair(val,len)); + } + void rmattr(object_t oid, const char* name) { + int op = OP_RMATTR; + ops.push_back(op); + oids.push_back(oid); + attrnames.push_back(name); + } + void create_collection(coll_t cid) { + int op = OP_MKCOLL; + ops.push_back(op); + cids.push_back(cid); + } + void remove_collection(coll_t cid) { + int op = OP_RMCOLL; + ops.push_back(op); + cids.push_back(cid); + } + void collection_add(coll_t cid, object_t oid) { + int op = OP_COLL_ADD; + ops.push_back(op); + cids.push_back(cid); + oids.push_back(oid); + } + void collection_remove(coll_t cid, object_t oid) { + int op = OP_COLL_REMOVE; + ops.push_back(op); + cids.push_back(cid); + oids.push_back(oid); + } + void collection_setattr(coll_t cid, const char* name, const void* val, int len) { + int op = OP_COLL_SETATTR; + ops.push_back(op); + cids.push_back(cid); + attrnames.push_back(name); + attrvals.push_back(pair(val,len)); + } + void collection_rmattr(coll_t cid, const char* name) { + int op = OP_COLL_RMATTR; + ops.push_back(op); + cids.push_back(cid); + attrnames.push_back(name); + } + + // etc. + }; + + + + /* this implementation is here only for naive ObjectStores that + * do not do atomic transactions natively. it is not atomic. 
+   * t's argument queues are drained as the ops run; onsafe is handed to the final op only. */
+  virtual unsigned apply_transaction(Transaction& t, Context *onsafe) {
+    // non-atomic implementation: replay each op through the single-op virtuals, popping its arguments off t's queues
+    for (list<int>::iterator p = t.ops.begin();
+         p != t.ops.end();
+         p++) {
+      list<int>::iterator next = p; ++next;  // look ahead: inside the loop p itself can never equal end()
+      Context *last = (next == t.ops.end()) ? onsafe : 0;  // only the final op carries the completion
+
+      switch (*p) {
+      case Transaction::OP_WRITE:
+        {
+          object_t oid = t.oids.front(); t.oids.pop_front();
+          off_t offset = t.offsets.front(); t.offsets.pop_front();
+          size_t len = t.lengths.front(); t.lengths.pop_front();
+          bufferlist bl = t.bls.front(); t.bls.pop_front();
+          write(oid, len, offset, bl, last);
+        }
+        break;
+
+      case Transaction::OP_TRUNCATE:
+        {
+          object_t oid = t.oids.front(); t.oids.pop_front();
+          size_t len = t.lengths.front(); t.lengths.pop_front();
+          truncate(oid, len, last);
+        }
+        break;
+
+      case Transaction::OP_REMOVE:
+        {
+          object_t oid = t.oids.front(); t.oids.pop_front();
+          remove(oid, last);
+        }
+        break;
+
+      case Transaction::OP_SETATTR:
+        {
+          object_t oid = t.oids.front(); t.oids.pop_front();
+          const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+          pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
+          setattr(oid, attrname, attrval.first, attrval.second, last);
+        }
+        break;
+
+      case Transaction::OP_RMATTR:
+        {
+          object_t oid = t.oids.front(); t.oids.pop_front();
+          const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+          rmattr(oid, attrname, last);
+        }
+        break;
+
+      case Transaction::OP_MKCOLL:
+        {
+          coll_t cid = t.cids.front(); t.cids.pop_front();
+          create_collection(cid, last);
+        }
+        break;
+
+      case Transaction::OP_RMCOLL:
+        {
+          coll_t cid = t.cids.front(); t.cids.pop_front();
+          destroy_collection(cid, last);
+        }
+        break;
+
+      case Transaction::OP_COLL_ADD:
+        {
+          coll_t cid = t.cids.front(); t.cids.pop_front();
+          object_t oid = t.oids.front(); t.oids.pop_front();
+          collection_add(cid, oid, last);
+        }
+        break;
+
+      case Transaction::OP_COLL_REMOVE:
+        {
+          coll_t cid = t.cids.front(); t.cids.pop_front();
+          object_t oid = t.oids.front(); t.oids.pop_front();
+          collection_remove(cid, oid, last);
+        }
+        break;
+
+      case Transaction::OP_COLL_SETATTR:
+        {
+          coll_t cid = t.cids.front(); t.cids.pop_front();
+          const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+          pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
+          collection_setattr(cid, attrname, attrval.first, attrval.second, last);
+        }
+        break;
+
+      case Transaction::OP_COLL_RMATTR:
+        {
+          coll_t cid = t.cids.front(); t.cids.pop_front();
+          const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+          collection_rmattr(cid, attrname, last);
+        }
+        break;
+
+
+      default:
+        cerr << "bad op " << *p << endl;
+        assert(0);
+      }
+    }
+    return 0;  // FIXME count errors (the results of the individual ops are ignored for now)
+  }
+
+  /*********************************************/
+
+
  public:
   ObjectStore() {}
   virtual ~ObjectStore() {}
@@ -141,16 +358,23 @@ class ObjectStore {
 
   // collections
   virtual int list_collections(list<coll_t>& ls) {return 0;}//= 0;
-  virtual int create_collection(coll_t c) {return 0;}//= 0;
-  virtual int destroy_collection(coll_t c) {return 0;}//= 0;
+  virtual int create_collection(coll_t c,
+                                Context *onsafe=0) {return 0;}//= 0;
+  virtual int destroy_collection(coll_t c,
+                                 Context *onsafe=0) {return 0;}//= 0;
   virtual bool collection_exists(coll_t c) {return 0;}
   virtual int collection_stat(coll_t c, struct stat *st) {return 0;}//= 0;
-  virtual int collection_add(coll_t c, object_t o) {return 0;}//= 0;
-  virtual int collection_remove(coll_t c, object_t o) {return 0;}// = 0;
+  virtual int collection_add(coll_t c, object_t o,
+                             Context *onsafe=0) {return 0;}//= 0;
+  virtual int collection_remove(coll_t c, object_t o,
+                                Context *onsafe=0) {return 0;}// = 0;
   virtual int collection_list(coll_t c, list<object_t>& o) {return 0;}//= 0;
   virtual int collection_setattr(coll_t cid, const char *name,
-                                 const void *value, size_t size) {return 0;} //= 0;
+                                 const void *value, size_t size,
+                                 Context *onsafe=0) {return 0;} //= 0;
+  virtual int collection_rmattr(coll_t cid, const char *name,
+                                Context *onsafe=0) {return 0;} //= 0;
   virtual int collection_getattr(coll_t cid, const char *name,
                                  void *value, size_t size) {return 0;} //= 0;
   virtual int collection_listattr(coll_t cid, char *attrs, size_t size) {return 0;} //= 0;
-- 
2.39.5