From 7af85120485683634702c4f1031746eaa0ff9e6e Mon Sep 17 00:00:00 2001 From: sage Date: Fri, 22 Sep 2006 20:37:45 +0000 Subject: [PATCH] fixed partial bug. still ugly, and there's a lingering issue with commit notification... git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@874 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/ebofs/BlockDevice.cc | 4 ++-- ceph/ebofs/BufferCache.cc | 43 +++++++++++++++++++++------------------ ceph/ebofs/BufferCache.h | 21 ++++++++++++------- ceph/ebofs/Ebofs.cc | 34 +++++++++++++++++-------------- 4 files changed, 58 insertions(+), 44 deletions(-) diff --git a/ceph/ebofs/BlockDevice.cc b/ceph/ebofs/BlockDevice.cc index 5c3530518eb9a..21e6723f8f849 100644 --- a/ceph/ebofs/BlockDevice.cc +++ b/ceph/ebofs/BlockDevice.cc @@ -88,7 +88,7 @@ block_t BlockDevice::get_num_blocks() void BlockDevice::barrier() { lock.Lock(); - dout(10) << "barrier" << endl; + dout(-10) << "barrier" << endl; if (!use_next_queue && !io_queue.empty()) use_next_queue = true; @@ -98,7 +98,7 @@ void BlockDevice::barrier() void BlockDevice::_bump_queue() { if (io_queue.empty() && use_next_queue) { - dout(10) << "_bump_queue next_io_queue (" << next_io_queue.size() + dout(-10) << "_bump_queue next_io_queue (" << next_io_queue.size() << ") -> io_queue (" << io_queue.size() << ")" << endl; // empty, duh. use_next_queue = false; io_queue.swap(next_io_queue); diff --git a/ceph/ebofs/BufferCache.cc b/ceph/ebofs/BufferCache.cc index bcb68b5910637..55c3a10a2b390 100644 --- a/ceph/ebofs/BufferCache.cc +++ b/ceph/ebofs/BufferCache.cc @@ -333,7 +333,8 @@ int ObjectCache::map_read(block_t start, block_t len, */ int ObjectCache::map_write(block_t start, block_t len, interval_set& alloc, - map& hits) + map& hits, + version_t super_epoch) { map::iterator p = data.lower_bound(start); @@ -409,7 +410,7 @@ int ObjectCache::map_write(block_t start, block_t len, BufferHead *right = bc->split(bh, cur); bc->bh_read(on, bh); // reread left bit bh = right; - } else if (bh->is_tx() && !newalloc && bc->bh_cancel_write(bh)) { + } else if (bh->is_tx() && !newalloc && bc->bh_cancel_write(bh, super_epoch)) { BufferHead *right = bc->split(bh, cur); bc->bh_write(on, bh); // rewrite left bit bh = right; @@ -428,7 +429,7 @@ int ObjectCache::map_write(block_t start, block_t len, BufferHead *right = bc->split(middle, cur+max); bc->bh_read(on, right); // reread right bh = middle; - } else if (bh->is_tx() && !newalloc && bc->bh_cancel_write(bh)) { + } else if (bh->is_tx() && !newalloc && bc->bh_cancel_write(bh, super_epoch)) { BufferHead *middle = bc->split(bh, cur); bc->bh_write(on, bh); // redo left p++; @@ -452,7 +453,7 @@ int ObjectCache::map_write(block_t start, block_t len, if (bh->is_rx() && bc->bh_cancel_read(bh)) { BufferHead *right = bc->split(bh, cur+max); bc->bh_read(on, right); // re-rx the right bit - } else if (bh->is_tx() && !newalloc && bc->bh_cancel_write(bh)) { + } else if (bh->is_tx() && !newalloc && bc->bh_cancel_write(bh, super_epoch)) { BufferHead *right = bc->split(bh, cur+max); bc->bh_write(on, right); // re-tx the right bit } else { @@ -462,7 +463,7 @@ int ObjectCache::map_write(block_t start, block_t len, } // try to cancel tx? - if (bh->is_tx() && !newalloc) bc->bh_cancel_write(bh); + if (bh->is_tx() && !newalloc) bc->bh_cancel_write(bh, super_epoch); // put in our map hits[cur] = bh; @@ -526,7 +527,7 @@ int ObjectCache::scan_versions(block_t start, block_t len, } */ -void ObjectCache::truncate(block_t blocks) +void ObjectCache::truncate(block_t blocks, version_t super_epoch) { dout(7) << "truncate " << hex << object_id << dec << " " << blocks << " blocks" @@ -549,7 +550,7 @@ void ObjectCache::truncate(block_t blocks) BufferHead *right = bc->split(bh, blocks); bc->bh_read(on, bh); // reread left bit bh = right; - } else if (bh->is_tx() && uncom && bc->bh_cancel_write(bh)) { + } else if (bh->is_tx() && uncom && bc->bh_cancel_write(bh, super_epoch)) { BufferHead *right = bc->split(bh, blocks); bc->bh_write(on, bh); // rewrite left bit bh = right; @@ -563,7 +564,7 @@ void ObjectCache::truncate(block_t blocks) if (bh->is_rx()) bc->bh_cancel_read(bh); if (bh->is_tx() && uncom) - bc->bh_cancel_write(bh); + bc->bh_cancel_write(bh, super_epoch); if (bh->is_partial() && uncom) bc->bh_cancel_partial_write(bh); } @@ -676,7 +677,8 @@ void BufferCache::bh_read(Onode *on, BufferHead *bh, block_t from) dout(20) << "bh_read " << *bh << " from " << ex << endl; C_OC_RxFinish *fin = new C_OC_RxFinish(ebofs_lock, on->oc, - bh->start(), bh->length()); + bh->start(), bh->length(), + ex.start); bufferpool.alloc(EBOFS_BLOCK_SIZE*bh->length(), fin->bl); // new buffers! @@ -746,13 +748,17 @@ void BufferCache::bh_write(Onode *on, BufferHead *bh, block_t shouldbe) } -bool BufferCache::bh_cancel_write(BufferHead *bh) +bool BufferCache::bh_cancel_write(BufferHead *bh, version_t cur_epoch) { if (bh->tx_ioh && dev.cancel_io(bh->tx_ioh) >= 0) { dout(10) << "bh_cancel_write on " << *bh << endl; bh->tx_ioh = 0; mark_dirty(bh); + + assert(bh->epoch_modified == cur_epoch); + assert(bh->epoch_modified > 0); dec_unflushed( bh->epoch_modified ); // assert.. this should be the same epoch! + int l = bh->oc->put(); assert(l); return true; @@ -781,10 +787,11 @@ void BufferCache::tx_finish(ObjectCache *oc, void BufferCache::rx_finish(ObjectCache *oc, ioh_t ioh, block_t start, block_t length, + block_t diskstart, bufferlist& bl) { ebofs_lock.Lock(); - dout(10) << "rx_finish ioh " << ioh << " on " << start << "~" << length << endl; + dout(-10) << "rx_finish ioh " << ioh << " on " << start << "~" << length << endl; // oc if (oc->put() == 0) @@ -793,7 +800,7 @@ void BufferCache::rx_finish(ObjectCache *oc, oc->rx_finish(ioh, start, length, bl); // finish any partials? - map >::iterator sp = partial_write.lower_bound(start); + map >::iterator sp = partial_write.lower_bound(diskstart); while (sp != partial_write.end()) { if (sp->first >= start+length) break; assert(sp->first >= start); @@ -838,11 +845,7 @@ void BufferCache::partial_tx_finish(version_t epoch) { ebofs_lock.Lock(); - /* don't need oc! - // oc? - if (oc->put() == 0) - delete oc; - */ + dout(-10) << "partial_tx_finish in epoch " << epoch << endl; // update unflushed counter assert(get_unflushed(epoch) > 0); @@ -870,7 +873,7 @@ void BufferCache::bh_queue_partial_write(Onode *on, BufferHead *bh) bh->partial_tx_to = exv[0].start; bh->partial_tx_epoch = bh->epoch_modified; - dout(10) << "bh_queue_partial_write " << *on << " on " << *bh << " block " << b << " epoch " << bh->epoch_modified << endl; + dout(-10) << "bh_queue_partial_write " << *on << " on " << *bh << " block " << b << " epoch " << bh->epoch_modified << endl; // copy map state, queue for this block @@ -890,7 +893,7 @@ void BufferCache::bh_cancel_partial_write(BufferHead *bh) void BufferCache::queue_partial(block_t from, block_t to, map& partial, version_t epoch) { - dout(10) << "queue_partial " << from << " -> " << to + dout(-10) << "queue_partial " << from << " -> " << to << " in epoch " << epoch << endl; @@ -912,7 +915,7 @@ void BufferCache::cancel_partial(block_t from, block_t to, version_t epoch) assert(partial_write[from].count(to)); assert(partial_write[from][to].epoch == epoch); - dout(10) << "cancel_partial " << from << " -> " << to + dout(-10) << "cancel_partial " << from << " -> " << to << " (was epoch " << partial_write[from][to].epoch << ")" << endl; diff --git a/ceph/ebofs/BufferCache.h b/ceph/ebofs/BufferCache.h index 96f4ab6c46fd5..d7678fa9a17cb 100644 --- a/ceph/ebofs/BufferCache.h +++ b/ceph/ebofs/BufferCache.h @@ -372,7 +372,8 @@ class ObjectCache { int map_write(block_t start, block_t len, interval_set& alloc, - map& hits); // can write to these. + map& hits, + version_t super_epoch); // can write to these. BufferHead *split(BufferHead *bh, block_t off); @@ -383,7 +384,7 @@ class ObjectCache { void rx_finish(ioh_t ioh, block_t start, block_t length, bufferlist& bl); void tx_finish(ioh_t ioh, block_t start, block_t length, version_t v, version_t epoch); - void truncate(block_t blocks); + void truncate(block_t blocks, version_t super_epoch); // void tear_down(); void dump() { @@ -509,6 +510,11 @@ class BufferCache { off_t get_stat_clean() { return stat_clean; } off_t get_stat_partial() { return stat_partial; } + + map &get_unflushed() { + return epoch_unflushed; + } + int get_unflushed(version_t epoch) { return epoch_unflushed[epoch]; } @@ -571,7 +577,7 @@ class BufferCache { void bh_write(Onode *on, BufferHead *bh, block_t shouldbe=0); bool bh_cancel_read(BufferHead *bh); - bool bh_cancel_write(BufferHead *bh); + bool bh_cancel_write(BufferHead *bh, version_t cur_epoch); void bh_queue_partial_write(Onode *on, BufferHead *bh); void bh_cancel_partial_write(BufferHead *bh); @@ -579,7 +585,7 @@ class BufferCache { void queue_partial(block_t from, block_t to, map& partial, version_t epoch); void cancel_partial(block_t from, block_t to, version_t epoch); - void rx_finish(ObjectCache *oc, ioh_t ioh, block_t start, block_t len, bufferlist& bl); + void rx_finish(ObjectCache *oc, ioh_t ioh, block_t start, block_t len, block_t diskstart, bufferlist& bl); void tx_finish(ObjectCache *oc, ioh_t ioh, block_t start, block_t len, version_t v, version_t e); void partial_tx_finish(version_t epoch); @@ -594,12 +600,13 @@ class C_OC_RxFinish : public BlockDevice::callback { Mutex &lock; ObjectCache *oc; block_t start, length; + block_t diskstart; public: bufferlist bl; - C_OC_RxFinish(Mutex &m, ObjectCache *o, block_t s, block_t l) : - lock(m), oc(o), start(s), length(l) {} + C_OC_RxFinish(Mutex &m, ObjectCache *o, block_t s, block_t l, block_t ds) : + lock(m), oc(o), start(s), length(l), diskstart(ds) {} void finish(ioh_t ioh, int r) { - oc->bc->rx_finish(oc, ioh, start, length, bl); + oc->bc->rx_finish(oc, ioh, start, length, diskstart, bl); } }; diff --git a/ceph/ebofs/Ebofs.cc b/ceph/ebofs/Ebofs.cc index 29983dd97eb80..f81a5f2aebf68 100644 --- a/ceph/ebofs/Ebofs.cc +++ b/ceph/ebofs/Ebofs.cc @@ -382,12 +382,12 @@ int Ebofs::commit_thread_entry() << ", " << get_limbo_blocks() << " (" << 100*get_limbo_blocks()/dev.get_num_blocks() << "%) limbo in " << get_limbo_extents() << endl; - dout(2) << "commit_thread nodes: " + dout(-2) << "commit_thread nodes: " << 100*nodepool.num_used()/nodepool.num_total() << "% used, " << nodepool.num_free() << " (" << 100*nodepool.num_free()/nodepool.num_total() << "%) free, " << nodepool.num_limbo() << " (" << 100*nodepool.num_limbo()/nodepool.num_total() << "%) limbo, " << nodepool.num_total() << " total." << endl; - dout(2) << "commit_thread bc: " + dout(-2) << "commit_thread bc: " << "size " << bc.get_size() << ", trimmable " << bc.get_trimmable() << ", max " << g_conf.ebofs_bc_size @@ -415,20 +415,20 @@ int Ebofs::commit_thread_entry() // wait for it all to flush (drops global lock) commit_bc_wait(super_epoch-1); - dout(30) << "commit_thread bc flushed" << endl; + dout(-30) << "commit_thread bc flushed" << endl; commit_inodes_wait(); - dout(30) << "commit_thread inodes flushed" << endl; + dout(-30) << "commit_thread inodes flushed" << endl; nodepool.commit_wait(); - dout(30) << "commit_thread btree nodes flushed" << endl; + dout(-30) << "commit_thread btree nodes flushed" << endl; // ok, now (synchronously) write the prior super! - dout(10) << "commit_thread commit flushed, writing super for prior epoch" << endl; + dout(-10) << "commit_thread commit flushed, writing super for prior epoch" << endl; dev.barrier(); ebofs_lock.Unlock(); write_super(super_epoch, superbp); ebofs_lock.Lock(); - dout(10) << "commit_thread wrote super" << endl; + dout(-10) << "commit_thread wrote super" << endl; // free limbo space now // (since we're done allocating things, @@ -452,7 +452,7 @@ int Ebofs::commit_thread_entry() sync_cond.Signal(); - dout(10) << "commit_thread commit finish" << endl; + dout(-10) << "commit_thread commit finish" << endl; } // trim bc? @@ -773,7 +773,7 @@ void Ebofs::remove_onode(Onode *on) // tear down buffer cache if (on->oc) { - on->oc->truncate(0); // this will kick readers along the way. + on->oc->truncate(0, super_epoch); // this will kick readers along the way. on->close_oc(); } @@ -1236,11 +1236,14 @@ void Ebofs::commit_bc_wait(version_t epoch) dout(10) << "commit_bc_wait on epoch " << epoch << endl; while (bc.get_unflushed(epoch) > 0) { - dout(10) << "commit_bc_wait " << bc.get_unflushed(epoch) << " unflushed in epoch " << epoch << endl; + //dout(10) << "commit_bc_wait " << bc.get_unflushed(epoch) << " unflushed in epoch " << epoch << endl; + dout(-10) << "commit_bc_wait epoch " << epoch << ", unflushed " << bc.get_unflushed() << endl; bc.waitfor_stat(); } - dout(10) << "commit_bc_wait all flushed for epoch " << epoch << endl; + bc.get_unflushed().erase(epoch); + + dout(-10) << "commit_bc_wait all flushed for epoch " << epoch << "; " << bc.get_unflushed() << endl; } @@ -1314,7 +1317,7 @@ void Ebofs::alloc_write(Onode *on, // reallocate uncommitted too? // ( --> yes. we can always make better allocation decisions later, with more information. ) - if (1) { + if (0) { list tx; ObjectCache *oc = on->get_oc(&bc); @@ -1336,7 +1339,8 @@ void Ebofs::alloc_write(Onode *on, assert(old.size() == 1); if (bh->start() >= start && bh->end() <= start+len) { - if (bc.bh_cancel_write(bh)) { + assert(bh->epoch_modified == super_epoch); + if (bc.bh_cancel_write(bh, super_epoch)) { if (bh->length() == 1) dout(10) << "alloc_write unallocated tx " << old[0] << ", canceled " << *bh << endl; allocator.unallocate(old[0]); // release (into free) @@ -1452,7 +1456,7 @@ void Ebofs::apply_write(Onode *on, off_t off, size_t len, bufferlist& bl) // map b range onto buffer_heads map hits; - oc->map_write(bstart, blen, alloc, hits); + oc->map_write(bstart, blen, alloc, hits, super_epoch); // get current versions //version_t lowv, highv; @@ -2248,7 +2252,7 @@ int Ebofs::_truncate(object_t oid, off_t size) // truncate buffer cache if (on->oc) - on->oc->truncate(on->object_blocks); + on->oc->truncate(on->object_blocks, super_epoch); // update uncommitted interval_set uncom; -- 2.39.5