else if (bh->is_partial()) {
dout(10) << "rx_finish partial -> tx on " << *bh << dendl;
- if (1) {
- // double-check what block i am
- vector<Extent> exv;
- on->map_extents(bh->start(), 1, exv, 0);
- assert(exv.size() == 1);
- assert(exv[0].start != 0);
- block_t cur_block = exv[0].start;
- assert(cur_block == bh->partial_tx_to);
- }
+ // see what block i am
+ vector<Extent> exv;
+ on->map_extents(bh->start(), 1, exv, 0);
+ assert(exv.size() == 1);
+ assert(exv[0].start != 0);
+ block_t cur_block = exv[0].start;
// verify csum
assert(bl.length() == (unsigned)EBOFS_BLOCK_SIZE);
bh->oc->on->bad_byte_extents.subtract(new_over);
}
- // ok, cancel my low-level partial (since we're still here, and can bh_write ourselves)
- bc->cancel_partial( bh->rx_from.start, bh->partial_tx_to, bh->partial_tx_epoch );
-
// apply partial to myself
assert(bh->data.length() == 0);
bufferptr bp = buffer::create_page_aligned(EBOFS_BLOCK_SIZE);
// write "normally"
bc->mark_dirty(bh);
- bc->bh_write(on, bh, bh->partial_tx_to);//cur_block);
+ bc->bh_write(on, bh, cur_block);
assert(bh->oc->on->is_dirty());
// clean up a bit
- bh->partial_tx_to = 0;
- bh->partial_tx_epoch = 0;
bh->partial.clear();
}
else {
dout(10) << "truncate " << *bh << " unshadowing " << *bh->shadow_of << dendl;
// shadow
bh->shadow_of->remove_shadow(bh);
- if (bh->is_partial())
- bc->cancel_shadow_partial(bh->rx_from.start, bh);
- } else {
- // normal
- if (bh->is_partial() && uncom)
- bc->bh_cancel_partial_write(bh);
}
for (map<block_t,list<Context*> >::iterator p = bh->waitfor_read.begin();
dout(0) << "clone_to PARTIAL FIXME NOT FULLY IMPLEMENTED ******" << dendl;
nbh->partial = bh->partial;
bc->mark_partial(nbh);
- // register as shadow_partial
- bc->add_shadow_partial(bh->rx_from.start, nbh);
} else {
// clean buffer will shadow
bh->add_shadow(nbh);
else
oc->rx_finish(ioh, start, length, bl);
- // finish any partials?
- // note: these are partials that were re-written after a commit,
- // or for whom the OC was destroyed (eg truncated after a commit)
- if (length == 1) {
- map<block_t,PartialWriteSet>::iterator sp = partial_write.find(diskstart);
- if (sp != partial_write.end()) {
- block_t pblock = diskstart;
-
- // verify csum
- csum_t want = sp->second.csum;
- csum_t actual = calc_csum(bl.c_str(), bl.length());
- if (actual != want || rand() % 5 == 0) {
- dout(0) << "rx_finish bad csum on partial block " << pblock << dendl;
- derr(0) << "rx_finish bad csum on partial block " << pblock << " ****************" << dendl;
- poison_commit = true;
- *sp->second.on->get_extent_csum_ptr(sp->second.oblock, 1) = actual;
- sp->second.on->data_csum += actual - want;
-
- interval_set<off_t> bad;
- bad.insert(sp->second.oblock*EBOFS_BLOCK_SIZE, EBOFS_BLOCK_SIZE);
- sp->second.on->bad_byte_extents.union_of(bad);
-
- interval_set<off_t> overwritten;
- for (map<block_t, PartialWrite>::iterator p = sp->second.writes.begin();
- p != sp->second.writes.end();
- p++) {
- interval_set<off_t> o;
- for (map<off_t,bufferlist>::iterator q = p->second.partial.begin();
- q != p->second.partial.end();
- q++)
- o.insert(sp->second.oblock*EBOFS_BLOCK_SIZE+q->first, q->second.length());
- overwritten.union_of(o);
- }
- interval_set<off_t> new_over;
- new_over.intersection_of(sp->second.on->bad_byte_extents, overwritten);
- sp->second.on->bad_byte_extents.subtract(new_over);
-
- dout(10) << "rx_finish overwrote " << overwritten << ", newly " << new_over
- << ", now " << sp->second.on->bad_byte_extents.m << dendl;
- }
-
- for (map<block_t, PartialWrite>::iterator p = sp->second.writes.begin();
- p != sp->second.writes.end();
- p++) {
- dout(10) << "rx_finish partial from " << pblock << " -> " << p->first
- << " for epoch " << p->second.epoch
- << dendl;
- // make the combined block
- bufferlist combined;
- bufferptr bp = buffer::create_page_aligned(EBOFS_BLOCK_SIZE);
- combined.push_back( bp );
- combined.copy_in((pblock-diskstart)*EBOFS_BLOCK_SIZE, (pblock-diskstart+1)*EBOFS_BLOCK_SIZE, bl);
- do_apply_partial( combined, p->second.partial );
-
- // write it!
- dev.write(pblock, 1, combined,
- new C_OC_PartialTxFinish( this, p->second.epoch ),
- "finish_partials");
- }
- partial_write.erase(sp);
- }
- }
-
- // shadow partials?
- {
- list<Context*> waiters;
- map<block_t, set<BufferHead*> >::iterator sp = shadow_partials.lower_bound(diskstart);
- while (sp != shadow_partials.end()) {
- if (sp->first >= diskstart+length) break;
- assert(sp->first >= diskstart);
-
- block_t pblock = sp->first;
- set<BufferHead*> ls;
- ls.swap( sp->second );
-
- map<block_t, set<BufferHead*> >::iterator t = sp;
- sp++;
- shadow_partials.erase(t);
-
- for (set<BufferHead*>::iterator p = ls.begin();
- p != ls.end();
- ++p) {
- BufferHead *bh = *p;
- dout(10) << "rx_finish applying shadow_partial for " << pblock
- << " to " << *bh << dendl;
- bufferptr bp = buffer::create_page_aligned(EBOFS_BLOCK_SIZE);
- bh->data.clear();
- bh->data.push_back( bp );
- bh->data.copy_in((pblock-diskstart)*EBOFS_BLOCK_SIZE,
- EBOFS_BLOCK_SIZE,
- bl);
- bh->apply_partial();
- mark_clean(bh);
-
- // trigger waiters
- for (map<block_t,list<Context*> >::iterator p = bh->waitfor_read.begin();
- p != bh->waitfor_read.end();
- p++) {
- assert(p->first >= bh->start() && p->first < bh->end());
- waiters.splice(waiters.begin(), p->second);
- }
- bh->waitfor_read.clear();
- }
- }
-
- // kick waiters
- finish_contexts(waiters);
- }
-
// done.
ebofs_lock.Unlock();
}
-void BufferCache::partial_tx_finish(version_t epoch)
-{
- ebofs_lock.Lock();
-
- dout(10) << "partial_tx_finish in epoch " << epoch << dendl;
-
- // update unflushed counter
- assert(get_unflushed(EBOFS_BC_FLUSH_PARTIAL, epoch) > 0);
- dec_unflushed(EBOFS_BC_FLUSH_PARTIAL, epoch);
-
- ebofs_lock.Unlock();
-}
-
-
-
-
-void BufferCache::bh_queue_partial_write(Onode *on, BufferHead *bh)
-{
- assert(bh->get_version() > 0);
-
- assert(bh->is_partial());
- assert(bh->length() == 1);
-
- // get the block no
- vector<Extent> exv;
- on->map_extents(bh->start(), bh->length(), exv, 0);
- assert(exv.size() == 1);
- assert(exv[0].start != 0);
- block_t b = exv[0].start;
- assert(exv[0].length == 1);
- bh->partial_tx_to = exv[0].start;
- bh->partial_tx_epoch = bh->epoch_modified;
-
- dout(10) << "bh_queue_partial_write " << *on << " on " << *bh << " block " << b << " epoch " << bh->epoch_modified << dendl;
-
-
- // copy map state, queue for this block
- assert(bh->rx_from.length == 1);
- csum_t csum = *on->get_extent_csum_ptr(bh->start(), 1);
- queue_partial(on, bh->start(), csum,
- bh->rx_from.start, bh->partial_tx_to, bh->partial, bh->partial_tx_epoch );
-}
-
-void BufferCache::bh_cancel_partial_write(BufferHead *bh)
-{
- assert(bh->is_partial());
- assert(bh->length() == 1);
-
- cancel_partial( bh->rx_from.start, bh->partial_tx_to, bh->partial_tx_epoch );
-}
-
-
-void BufferCache::queue_partial(Onode *on, block_t oblock, csum_t csum,
- block_t from, block_t to,
- map<off_t, bufferlist>& partial,
- version_t epoch)
-{
- dout(10) << "queue_partial " << on->object_id << " at " << oblock
- << " from disk " << from << " -> " << to
- << " in epoch " << epoch
- << dendl;
-
- if (partial_write[from].writes.count(to)) {
- // this should be in the same epoch.
- assert( partial_write[from].writes[to].epoch == epoch);
- assert(0); // actually.. no!
- } else {
- inc_unflushed( EBOFS_BC_FLUSH_PARTIAL, epoch );
- }
-
- on->get(); // one ref for each <from,to> pair.
- partial_write[from].on = on;
- partial_write[from].csum = csum;
- partial_write[from].oblock = oblock;
- partial_write[from].writes[to].partial = partial;
- partial_write[from].writes[to].epoch = epoch;
-}
-
-void BufferCache::cancel_partial(block_t from, block_t to, version_t epoch)
-{
- assert(partial_write.count(from));
- assert(partial_write[from].writes.count(to));
- assert(partial_write[from].writes[to].epoch == epoch);
-
- dout(10) << "cancel_partial " << from << " -> " << to
- << " (was epoch " << partial_write[from].writes[to].epoch << ")"
- << dendl;
-
- partial_write[from].writes.erase(to);
- partial_write[from].on->put(); // one ref per <from,to> pair.
- if (partial_write[from].writes.empty())
- partial_write.erase(from);
-
- dec_unflushed( EBOFS_BC_FLUSH_PARTIAL, epoch );
-}
-
-
-void BufferCache::add_shadow_partial(block_t from, BufferHead *bh)
-{
- dout(10) << "add_shadow_partial from " << from << " " << *bh << dendl;
- shadow_partials[from].insert(bh);
-}
-
-void BufferCache::cancel_shadow_partial(block_t from, BufferHead *bh)
-{
- dout(10) << "cancel_shadow_partial from " << from << " " << *bh << dendl;
- shadow_partials[from].erase(bh);
-}
Extent rx_from;
ioh_t tx_ioh; //
block_t tx_block;
- block_t partial_tx_to;
- version_t partial_tx_epoch;
map<off_t, bufferlist> partial; // partial dirty content overlayed onto incoming data
public:
BufferHead(ObjectCache *o, block_t start, block_t len) :
oc(o), //cancellable_ioh(0), tx_epoch(0),
- rx_ioh(0), tx_ioh(0), tx_block(0), partial_tx_to(0), partial_tx_epoch(0),
+ rx_ioh(0), tx_ioh(0), tx_block(0),
shadow_of(0),
ref(0), state(STATE_MISSING), epoch_modified(0), version(0), last_flushed(0),
object_loc(start, len),
bool bh_cancel_read(BufferHead *bh);
bool bh_cancel_write(BufferHead *bh, version_t cur_epoch);
- void bh_queue_partial_write(Onode *on, BufferHead *bh);
- void bh_cancel_partial_write(BufferHead *bh);
-
- void queue_partial(Onode *on, block_t opos, csum_t csum, block_t from, block_t to, map<off_t, bufferlist>& partial, version_t epoch);
- void cancel_partial(block_t from, block_t to, version_t epoch);
-
- void add_shadow_partial(block_t from, BufferHead *bh);
- void cancel_shadow_partial(block_t from, BufferHead *bh);
-
void rx_finish(ObjectCache *oc, ioh_t ioh, block_t start, block_t len, block_t diskstart, bufferlist& bl);
void tx_finish(ObjectCache *oc, ioh_t ioh, block_t start, block_t len, version_t v, version_t e);
- void partial_tx_finish(version_t epoch);
friend class C_E_FlushPartial;
}
};
-class C_OC_PartialTxFinish : public BlockDevice::callback {
- BufferCache *bc;
- version_t epoch;
-public:
- C_OC_PartialTxFinish(BufferCache *b, version_t e) :
- bc(b), epoch(e) {}
- void finish(ioh_t ioh, int r) {
- bc->partial_tx_finish(epoch);
- }
-};
-
#endif