== todo
-papers to read
-- gribble et al 2000, scalable distributed hash table
-- sagiv blink trees
-- johnson and colbrook's DE and DB-trees (maybe fewer locks?)
-
+ebofs
+- verify proper behavior of conflicting/overlapping reads of clones
-rados todo
+rados paper todo
- better experiments
- flush log only in response to subsequent read or write?
- better behaving recovery
- snapshots
rados snapshots
-- write is tagged with current_rev.
- - if latest object.rev != current_rev, we clone it before applying write.
-- reads are tagged with readv.
- - we read from newest object whose snapv >= readv.
-
+- attr.crev is rev we were created in.
+- oid.rev=0 is "live". defined for attr.crev <= rev.
+- otherwise, defined for attr.crev <= rev < oid.rev (i.e. oid.rev is upper bound, non-inclusive.)
+
+- write is tagged with op.rev
+ - if op.rev != the live object's attr.crev:
+   - we clone the live object to oid.rev=op.rev
+   - the clone keeps the old attr.crev.
+   - then set the live attr.crev=op.rev.
+- delete is tagged with op.rev
+ - rename live to oid.rev = op.rev
+- read is tagged with op.rev
+ - if 0, we read from 0 (if it exists).
+ - otherwise we choose the object rev based on op.rev vs oid.rev, and then verify that attr.crev <= op.rev.
partial[cur] = e;
dout(20) << "map_read partial " << *e << endl;
}
- else assert(0);
+ else {
+ dout(0) << "map_read ??? " << *e << endl;
+ assert(0);
+ }
block_t lenfromcur = MIN(e->end() - cur, left);
cur += lenfromcur;
bc->bh_cancel_read(bh);
if (bh->is_tx() && uncom)
bc->bh_cancel_write(bh, super_epoch);
- if (bh->is_partial() && uncom)
- bc->bh_cancel_partial_write(bh);
+ if (bh->shadow_of) {
+ dout(10) << "truncate " << *bh << " unshadowing " << *bh->shadow_of << endl;
+ // shadow
+ bh->shadow_of->remove_shadow(bh);
+ if (bh->is_partial())
+ bc->cancel_shadow_partial(bh->rx_from.start, bh);
+ } else {
+ // normal
+ if (bh->is_partial() && uncom)
+ bc->bh_cancel_partial_write(bh);
+ }
}
for (map<block_t,list<Context*> >::iterator p = bh->waitfor_read.begin();
}
+void ObjectCache::clone_to(Onode *other)
+{
+  // Duplicate this cache's in-flight buffers (dirty, tx, partial) into the
+  // clone target 'other', sharing refs to the underlying data buffers.
+  // Clean/missing extents are not copied -- the clone can read those from
+  // disk (the clone shares the original's committed extents).
+  ObjectCache *ton = 0;   // target's cache; opened lazily on first dup
+
+  for (map<block_t, BufferHead*>::iterator p = data.begin();
+       p != data.end();
+       p++) {
+    BufferHead *bh = p->second;
+    dout(10) << "clone_to ? " << *bh << endl;
+    if (bh->is_dirty() || bh->is_tx() || bh->is_partial()) {
+      // dup dirty or tx bh's
+      if (!ton)
+        ton = other->get_oc(bc);
+      BufferHead *nbh = new BufferHead(ton);
+      nbh->set_start( bh->start() );
+      nbh->set_length( bh->length() );
+      nbh->data = bh->data;      // just copy refs to underlying buffers.
+      bc->add_bh(nbh);
+
+      if (bh->is_partial()) {
+        dout(0) << "clone_to PARTIAL FIXME NOT FULLY IMPLEMENTED ******" << endl;
+        // the dup'ed partial can't complete on its own; key it by the block
+        // where the original's pending read starts so rx_finish can apply
+        // the partial content and mark the shadow clean when the rx lands.
+        nbh->partial = bh->partial;
+        bc->mark_partial(nbh);
+        // register as shadow_partial
+        bc->add_shadow_partial(bh->rx_from.start, nbh);
+      } else {
+        // clean buffer will shadow
+        bh->add_shadow(nbh);
+        bc->mark_clean(nbh);
+      }
+
+      dout(10) << "clone_to dup " << *bh << " -> " << *nbh << endl;
+    }
+  }
+}
+
+
/************** BufferCache ***************/
block_t newleftlen = after - orig->start();
right->set_start( after );
right->set_length( orig->length() - newleftlen );
-
+
// shorten left
stat_sub(orig);
orig->set_length( newleftlen );
right->rx_from.start += newleftlen;
}
+ // dup shadows
+ for (set<BufferHead*>::iterator p = orig->shadows.begin();
+ p != orig->shadows.end();
+ ++p)
+ right->add_shadow(*p);
+
// split buffers too
bufferlist bl;
bl.claim(orig->data);
}
}
- //
+ // shadow partials?
+ {
+ list<Context*> waiters;
+ map<block_t, set<BufferHead*> >::iterator sp = shadow_partials.lower_bound(diskstart);
+ while (sp != shadow_partials.end()) {
+ if (sp->first >= diskstart+length) break;
+ assert(sp->first >= diskstart);
+
+ block_t pblock = sp->first;
+ set<BufferHead*> ls;
+ ls.swap( sp->second );
+
+ map<block_t, set<BufferHead*> >::iterator t = sp;
+ sp++;
+ shadow_partials.erase(t);
+
+ for (set<BufferHead*>::iterator p = ls.begin();
+ p != ls.end();
+ ++p) {
+ BufferHead *bh = *p;
+ dout(10) << "rx_finish applying shadow_partial for " << pblock
+ << " to " << *bh << endl;
+ bufferptr bp = buffer::create_page_aligned(EBOFS_BLOCK_SIZE);
+ bh->data.clear();
+ bh->data.push_back( bp );
+ bh->data.copy_in((pblock-diskstart)*EBOFS_BLOCK_SIZE,
+ (pblock-diskstart+1)*EBOFS_BLOCK_SIZE,
+ bl);
+ bh->apply_partial();
+ bh->set_state(BufferHead::STATE_CLEAN);
+
+ // trigger waiters
+ for (map<block_t,list<Context*> >::iterator p = bh->waitfor_read.begin();
+ p != bh->waitfor_read.end();
+ p++) {
+ assert(p->first >= bh->start() && p->first < bh->end());
+ waiters.splice(waiters.begin(), p->second);
+ }
+ bh->waitfor_read.clear();
+ }
+ }
+
+ // kick waiters
+ finish_contexts(waiters);
+ }
+
+ // done.
ebofs_lock.Unlock();
}
}
+// Register a shadow (cloned) partial bh, keyed by the disk block where the
+// original's in-flight read starts; rx_finish applies the partial overlay
+// and marks the shadow clean when that read completes.
+void BufferCache::add_shadow_partial(block_t from, BufferHead *bh)
+{
+  dout(10) << "add_shadow_partial from " << from << " " << *bh << endl;
+  shadow_partials[from].insert(bh);
+}
+// Unregister a shadow partial (e.g. when the shadow bh is truncated away
+// before the read it was waiting on completes).
+void BufferCache::cancel_shadow_partial(block_t from, BufferHead *bh)
+{
+  dout(10) << "cancel_shadow_partial from " << from << " " << *bh << endl;
+  shadow_partials[from].erase(bh);
+}
map<off_t, bufferlist> partial; // partial dirty content overlayed onto incoming data
map< block_t, list<Context*> > waitfor_read;
+
+ set<BufferHead*> shadows; // shadow bh's that clone()ed me.
+ BufferHead* shadow_of;
private:
int ref;
BufferHead(ObjectCache *o) :
oc(o), //cancellable_ioh(0), tx_epoch(0),
rx_ioh(0), tx_ioh(0), tx_block(0), partial_tx_to(0), partial_tx_epoch(0),
+ shadow_of(0),
ref(0), state(STATE_MISSING), epoch_modified(0), version(0), last_flushed(0)
{}
+  // drop our pins on any remaining shadow bh's so they aren't leaked.
+  ~BufferHead() {
+    unpin_shadows();
+  }
ObjectCache *get_oc() { return oc; }
  void set_state(int s) {
    if (s == STATE_PARTIAL || s == STATE_RX || s == STATE_TX) get();
    if (state == STATE_PARTIAL || state == STATE_RX || state == STATE_TX) put();
+
+    // NOTE(review): leaving TX or PARTIAL presumably means the pending
+    // write / partial overlay has resolved, so we no longer need to keep
+    // our clone()ed shadows pinned -- confirm against tx_finish/rx_finish.
+    if ((state == STATE_TX && s != STATE_TX) ||
+        (state == STATE_PARTIAL && s != STATE_PARTIAL))
+      unpin_shadows();
+
    state = s;
  }
int get_state() { return state; }
//void cancel_partials();
//void queue_partial_write(block_t b);
+  // make 'dup' a shadow of this bh: remember it and pin it (ref) until our
+  // pending state resolves (see unpin_shadows, called from set_state/dtor).
+  void add_shadow(BufferHead *dup) {
+    shadows.insert(dup);
+    dup->shadow_of = this;
+    dup->get();
+  }
+  // detach and unpin a single shadow (e.g. when it is truncated away).
+  void remove_shadow(BufferHead *dup) {
+    shadows.erase(dup);
+    dup->shadow_of = 0;
+    dup->put();
+  }
+  // detach and unpin all shadows at once.
+  void unpin_shadows() {
+    for (set<BufferHead*>::iterator p = shadows.begin();
+         p != shadows.end();
+         ++p) {
+      //cout << "unpin shadow " << *p << endl;
+      (*p)->shadow_of = 0;
+      (*p)->put();
+    }
+    shadows.clear();
+  }
void copy_partial_substr(off_t start, off_t end, bufferlist& bl) {
map<off_t, bufferlist>::iterator i = partial.begin();
void truncate(block_t blocks, version_t super_epoch);
// void tear_down();
+ void clone_to(Onode *other);
+
void dump() {
for (map<block_t,BufferHead*>::iterator i = data.begin();
i != data.end();
};
map<block_t, map<block_t, PartialWrite> > partial_write; // queued writes w/ partial content
+ map<block_t, set<BufferHead*> > shadow_partials;
public:
BufferCache(BlockDevice& d, Mutex& el) :
void queue_partial(block_t from, block_t to, map<off_t, bufferlist>& partial, version_t epoch);
void cancel_partial(block_t from, block_t to, version_t epoch);
+ void add_shadow_partial(block_t from, BufferHead *bh);
+ void cancel_shadow_partial(block_t from, BufferHead *bh);
+
void rx_finish(ObjectCache *oc, ioh_t ioh, block_t start, block_t len, block_t diskstart, bufferlist& bl);
void tx_finish(ObjectCache *oc, ioh_t ioh, block_t start, block_t len, version_t v, version_t e);
void partial_tx_finish(version_t epoch);
<< eo->num_extents << " extents" << endl;
assert(eo->object_id == oid);
}
+ on->readonly = eo->readonly;
on->onode_loc = eo->onode_loc;
on->object_size = eo->object_size;
on->object_blocks = eo->object_blocks;
{
// onode
struct ebofs_onode eo;
+ eo.readonly = on->readonly;
eo.onode_loc = on->onode_loc;
eo.object_id = on->object_id;
eo.object_size = on->object_size;
// get|create inode
Onode *on = get_onode(oid);
if (!on) on = new_onode(oid); // new inode!
+ if (on->readonly) {
+ put_onode(on);
+ return -EACCES;
+ }
dirty_onode(on); // dirty onode!
Onode *on = get_onode(oid);
if (!on)
return -ENOENT;
-
+ if (on->readonly) {
+ put_onode(on);
+ return -EACCES;
+ }
+
int r = 0;
if (size > on->object_size) {
r = -EINVAL; // whatever
}
// truncate buffer cache
- if (on->oc)
+ if (on->oc) {
on->oc->truncate(on->object_blocks, super_epoch);
+ if (on->oc->is_empty())
+ on->close_oc();
+ }
// update uncommitted
interval_set<block_t> uncom;
int Ebofs::_clone(object_t from, object_t to)
{
+ dout(7) << "_clone " << from << " -> " << to << endl;
+
Onode *fon = get_onode(from);
if (!fon) return -ENOENT;
Onode *ton = get_onode(to);
if (ton) {
put_onode(fon);
+ put_onode(ton);
return -EEXIST;
}
ton = new_onode(to);
assert(ton);
// copy easy bits
+ ton->readonly = true;
ton->object_size = fon->object_size;
ton->object_blocks = fon->object_blocks;
ton->attr = fon->attr;
allocator.alloc_inc(p->second);
}
+ // clear uncommitted
+ fon->uncommitted.clear();
+
+ // muck with ObjectCache
+ if (fon->oc)
+ fon->oc->clone_to( ton );
+
+ // ok!
+ put_onode(ton);
+ put_onode(fon);
return 0;
}
Onode *on = get_onode(oid);
if (!on) return -ENOENT;
+ if (on->readonly) {
+ put_onode(on);
+ return -EACCES;
+ }
string n(name);
on->attr[n] = buffer::copy((char*)value, size);
Onode *on = get_onode(oid);
if (!on) return -ENOENT;
+ if (on->readonly) {
+ put_onode(on);
+ return -EACCES;
+ }
on->attr = attrset;
dirty_onode(on);
Onode *on = get_onode(oid);
if (!on) return -ENOENT;
+ if (on->readonly) {
+ put_onode(on);
+ return -EACCES;
+ }
string n(name);
on->attr.erase(n);
version_t version; // incremented on each modify.
// data
+ bool readonly;
Extent onode_loc;
off_t object_size;
unsigned object_blocks;
public:
Onode(object_t oid) : ref(0), object_id(oid), version(0),
- object_size(0), object_blocks(0), oc(0),
- dirty(false), dangling(false), deleted(false) {
+ readonly(false),
+ object_size(0), object_blocks(0), oc(0),
+ dirty(false), dangling(false), deleted(false) {
onode_loc.length = 0;
}
~Onode() {
void get() {
if (ref == 0) lru_pin();
ref++;
- //cout << "ebofs.onode.get " << hex << object_id << dec << " " << ref << endl;
+ cout << "ebofs.onode.get " << hex << object_id << dec << " " << ref << endl;
}
void put() {
ref--;
if (ref == 0) lru_unpin();
- //cout << "ebofs.onode.put " << hex << object_id << dec << " " << ref << endl;
+ cout << "ebofs.onode.put " << hex << object_id << dec << " " << ref << endl;
}
void mark_dirty() {
while (!stop) {
object_t oid;
- oid.ino = (rand() % 2) + 0x10000000;
+ oid.ino = (rand() % 10) + 0x10000000;
coll_t cid = rand() % 50;
off_t off = rand() % 10000;//0;//rand() % 1000000;
off_t len = 1+rand() % 100000;
if (rand() % 2) a = "two";
int l = 3;//rand() % 10;
- switch (rand() % 9) {
+ switch (rand() % 10) {
case 0:
{
+ oid.rev = rand() % 10;
cout << t << " read " << hex << oid << dec << " at " << off << " len " << len << endl;
bufferlist bl;
fs.read(oid, off, len, bl);
fs.truncate(oid, 0);
}
break;
+
+ case 9:
+ {
+ object_t newoid = oid;
+ newoid.rev = rand() % 10;
+ cout << t << " clone " << oid << " to " << newoid << endl;
+ fs.clone(oid, newoid, 0);
+ }
}
Ebofs fs(filename);
if (fs.mount() < 0) return -1;
+
+ // explicit tests
+ if (1) {
+ // verify that clone() plays nice with partial writes
+ object_t oid(1,1);
+ bufferptr bp(10000);
+ bp.zero();
+ bufferlist bl;
+ bl.push_back(bp);
+ fs.write(oid, 0, 10000, bl, 0);
+
+ fs.sync();
+ fs.trim_buffer_cache();
+
+ // induce a partial write
+ bufferlist bl2;
+ bl2.substr_of(bl, 0, 100);
+ fs.write(oid, 100, 100, bl2, 0);
+
+ // clone it
+ object_t oid2;
+ oid2 = oid;
+ oid2.rev = 1;
+ fs.clone(oid, oid2, 0);
+
+ // ...
+ if (0) {
+ // make sure partial still behaves after orig is removed...
+ fs.remove(oid, 0);
+
+ // or i read for oid2...
+ bufferlist rbl;
+ fs.read(oid2, 0, 200, rbl);
+ }
+ if (1) {
+ // make sure things behave if we remove the clone
+ fs.remove(oid2,0);
+ }
+ }
+ // /explicit tests
+
list<Tester*> ls;
for (int i=0; i<threads; i++) {
Tester *t = new Tester(fs);
object_t object_id; /* for kicks */
off_t object_size; /* file size in bytes. should this be 64-bit? */
unsigned object_blocks;
+ bool readonly;
int num_collections;
int num_attr; // num attr in onode