From 969fe2e8596472dede7bc8b19d5f80f7c5626ffe Mon Sep 17 00:00:00 2001 From: sage Date: Wed, 14 Dec 2005 04:00:27 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@520 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/Makefile | 6 + ceph/config.cc | 2 +- ceph/ebofs/Allocator.cc | 13 +- ceph/ebofs/BlockDevice.cc | 226 +++++++++++++------ ceph/ebofs/BlockDevice.h | 26 +-- ceph/ebofs/BufferCache.cc | 431 ++++++++++++++++++++++++++++-------- ceph/ebofs/BufferCache.h | 135 ++++++++--- ceph/ebofs/Ebofs.cc | 356 ++++++++++------------------- ceph/ebofs/Ebofs.h | 21 +- ceph/ebofs/Onode.h | 7 +- ceph/ebofs/Table.h | 6 +- ceph/ebofs/mkfs.ebofs.cc | 165 +++++++------- ceph/ebofs/nodes.h | 52 +++-- ceph/ebofs/types.h | 2 - ceph/include/interval_set.h | 40 +++- 15 files changed, 898 insertions(+), 590 deletions(-) diff --git a/ceph/Makefile b/ceph/Makefile index 95bb09848c5cf..6ae152ac31a5c 100644 --- a/ceph/Makefile +++ b/ceph/Makefile @@ -129,9 +129,15 @@ obfstest: tcpsyn.cc mds/allmds.o client/Client.o client/Buffercache.o osd/OSD.cc ${MPICC} -DUSE_OBFS ${MPICFLAGS} ${MPILIBS} $^ -o $@ ../uofs/uofs.a # ebofs +ebofs: mkfs.ebofs test.ebofs + mkfs.ebofs: ebofs/mkfs.ebofs.cc config.cc common/Clock.o ebofs/ebo.o ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@ +test.ebofs: ebofs/test.ebofs.cc config.cc common/Clock.o ebofs/ebo.o + ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@ + + testmpi: test/testmpi.cc msg/MPIMessenger.cc config.o common/Timer.o common/clock.o msg/Messenger.o msg/Dispatcher.o msg/error.o diff --git a/ceph/config.cc b/ceph/config.cc index ea7c8ed355017..8b818095d754e 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -50,7 +50,7 @@ md_config_t g_conf = { fake_osdmap_expand: 0, fake_osd_sync: true, - debug: 100, + debug: 10, debug_mds_balancer: 1, debug_mds_log: 1, debug_buffer: 0, diff --git a/ceph/ebofs/Allocator.cc b/ceph/ebofs/Allocator.cc index dacde19a5162e..69bd76466dae7 100644 --- a/ceph/ebofs/Allocator.cc +++ b/ceph/ebofs/Allocator.cc @@ -4,11 +4,12 @@ #undef dout -#define dout(x) if (x <= g_conf.debug) cout << "allocator." +#define dout(x) if (x <= g_conf.debug) cout << "ebofs.allocator." void Allocator::dump_freelist() { + if (0) for (int b=0; bfree_tab[b]->get_num_keys() > 0) { @@ -59,9 +60,11 @@ int Allocator::find(Extent& ex, int bucket, block_t num, block_t near) int Allocator::allocate(Extent& ex, block_t num, block_t near) { + /* if (!near) { near = num/2; // this is totally wrong and stupid. } + */ int bucket; @@ -109,7 +112,7 @@ int Allocator::allocate(Extent& ex, block_t num, block_t near) } } - dout(1) << "allocator.alloc " << ex << " near " << near << endl; + dout(10) << "allocate " << ex << " near " << near << endl; dump_freelist(); return num; } @@ -124,7 +127,7 @@ int Allocator::allocate(Extent& ex, block_t num, block_t near) fs->free_tab[bucket]->remove(ex.start); fs->free_blocks -= ex.length; - dout(1) << "allocator.alloc partial " << ex << " near " << near << endl; + dout(10) << "allocate partial " << ex << " near " << near << endl; dump_freelist(); return ex.length; } @@ -137,7 +140,7 @@ int Allocator::allocate(Extent& ex, block_t num, block_t near) int Allocator::release(Extent& ex) { - dout(1) << "release " << ex << " (into limbo)" << endl; + dout(10) << "release " << ex << " (into limbo)" << endl; limbo.insert(ex.start, ex.length); return 0; } @@ -157,7 +160,7 @@ int Allocator::release_now(Extent& ex) { Extent newex = ex; - dout(1) << "release " << ex << endl; + dout(10) << "release " << ex << endl; // one after us? 
for (int b=0; b +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + #undef dout -#define dout(x) if (x <= g_conf.debug) cout << "blockdevice." +#define dout(x) if (x <= g_conf.debug) cout << "dev." + + +block_t BlockDevice::get_num_blocks() +{ + if (!num_blocks) { + // stat + struct stat st; + assert(fd > 0); + int r = ::fstat(fd, &st); + assert(r == 0); + num_blocks = st.st_size / (block_t)EBOFS_BLOCK_SIZE; + } + return num_blocks; +} int BlockDevice::io_thread_entry() { - dout(1) << "io_thread start" << endl; + dout(10) << "io_thread start" << endl; // elevator nonsense! bool dir_forward = true; @@ -30,36 +59,61 @@ int BlockDevice::io_thread_entry() multimap::iterator i = io_queue.lower_bound(pos); if (i == io_queue.end()) break; + // merge contiguous ops + list biols; + char type = i->second->type; pos = i->first; - biovec *bio = i->second; + while (pos == i->first && + type == i->second->type) { + dout(20) << "io_thread dequeue io at " << pos << " " << (void*)i->second << endl; + biovec *bio = i->second; + biols.push_back(bio); + pos += bio->length; + + multimap::iterator prev = i; + i++; + io_queue_map.erase(bio); + io_queue.erase(prev); + + if (i == io_queue.end()) break; + } - dout(20) << "io_thread dequeue io at " << pos << endl; - io_queue_map.erase(i->second); - io_queue.erase(i); - lock.Unlock(); - do_io(bio); + do_io(biols); lock.Lock(); } } else { // reverse sweep dout(20) << "io_thread reverse sweep" << endl; pos = get_num_blocks(); + while (1) { // find i > pos multimap::iterator i = io_queue.upper_bound(pos); if (i == io_queue.begin()) break; i--; // and back down one (to get i <= pos) - - pos = i->first; - biovec *bio = i->second; - - dout(20) << "io_thread dequeue io at " << pos << endl; - io_queue_map.erase(i->second); - io_queue.erase(i); + // merge continguous ops + list biols; + char type = i->second->type; + pos = i->first; + while (pos == i->first && type == i->second->type) { + dout(20) << "io_thread dequeue io at " << pos << " " << (void*)i->second << endl; + biovec *bio = i->second; + biols.push_back(bio); + pos += bio->length; + + multimap::iterator prev = i; + bool begin = (i == io_queue.begin()); + if (!begin) i--; + io_queue_map.erase(bio); + io_queue.erase(prev); + + if (begin) break; + } + lock.Unlock(); - do_io(bio); + do_io(biols); lock.Lock(); } } @@ -74,30 +128,53 @@ int BlockDevice::io_thread_entry() } lock.Unlock(); - dout(1) << "io_thread finish" << endl; + dout(10) << "io_thread finish" << endl; return 0; } -void BlockDevice::do_io(biovec *bio) +void BlockDevice::do_io(list& biols) { int r; + assert(!biols.empty()); + + // get full range, type, bl + bufferlist bl; + bl.claim(biols.front()->bl); + block_t start = biols.front()->start; + block_t length = biols.front()->length; + char type = biols.front()->type; + + list::iterator p = biols.begin(); + for (p++; p != biols.end(); p++) { + length += (*p)->length; + bl.claim_append((*p)->bl); + } - if (bio->type == biovec::IO_WRITE) { - r = _write(bio->start, bio->length, bio->bl); - } else if (bio->type == biovec::IO_READ) { - r = _read(bio->start, bio->length, bio->bl); + // do it + dout(20) << "do_io start " << (type==biovec::IO_WRITE?"write":"read") + << " " << start << "~" << length << endl; + if (type == biovec::IO_WRITE) { + r = _write(start, length, bl); + } else if (type == biovec::IO_READ) { + r = _read(start, length, bl); } else assert(0); - - dout(20) << "do_io finish " << (void*)bio << " " << bio->start << "~" << bio->length << " 
" << (void*)bio->cond << " " << (void*)bio->context << endl; - - if (bio->cond) { - bio->cond->Signal(); - bio->rval = r; - } - else if (bio->context) { - bio->context->finish((int)bio); - delete bio->context; - delete bio; + dout(20) << "do_io finish " << (type==biovec::IO_WRITE?"write":"read") + << " " << start << "~" << length << endl; + + // finish + for (p = biols.begin(); p != biols.end(); p++) { + biovec *bio = *p; + if (bio->cond) { + //lock.Lock(); + bio->rval = r; + bio->cond->Signal(); + //lock.Unlock(); + } + else if (bio->context) { + bio->context->finish((int)bio); + delete bio->context; + delete bio; + } } } @@ -107,12 +184,12 @@ void BlockDevice::do_io(biovec *bio) void BlockDevice::_submit_io(biovec *b) { // NOTE: lock must be held - dout(1) << "_submit_io " << (void*)b << endl; + dout(15) << "_submit_io " << (void*)b << endl; // wake up thread? if (io_queue.empty()) io_wakeup.Signal(); - - // queue + + // queue anew io_queue.insert(pair(b->start, b)); io_queue_map[b] = b->start; } @@ -121,11 +198,11 @@ int BlockDevice::_cancel_io(biovec *bio) { // NOTE: lock must be held if (io_queue_map.count(bio) == 0) { - dout(1) << "_cancel_io " << (void*)bio << " FAILED" << endl; + dout(15) << "_cancel_io " << (void*)bio << " FAILED" << endl; return -1; } - dout(1) << "_cancel_io " << (void*)bio << endl; + dout(15) << "_cancel_io " << (void*)bio << endl; block_t b = io_queue_map[bio]; multimap::iterator p = io_queue.lower_bound(b); @@ -150,23 +227,27 @@ int BlockDevice::_read(block_t bno, unsigned num, bufferlist& bl) off_t actual = lseek(fd, offset, SEEK_SET); assert(actual == offset); - off_t len = num*EBOFS_BLOCK_SIZE; - assert((int)bl.length() >= len); - + size_t len = num*EBOFS_BLOCK_SIZE; + assert(bl.length() >= len); + + struct iovec iov[ bl.buffers().size() ]; + int n = 0; + size_t left = len; for (list::iterator i = bl.buffers().begin(); i != bl.buffers().end(); i++) { assert(i->length() % EBOFS_BLOCK_SIZE == 0); - int blen = i->length(); - if (blen > len) blen = len; - - int got = ::read(fd, i->c_str(), blen); - assert(got <= blen); - - len -= blen; - if (len == 0) break; + iov[n].iov_base = i->c_str(); + iov[n].iov_len = MIN(left, i->length()); + + left -= iov[n].iov_len; + n++; + if (left == 0) break; } + + int got = ::readv(fd, iov, n); + assert(got <= (int)len); return 0; } @@ -182,25 +263,33 @@ int BlockDevice::_write(unsigned bno, unsigned num, bufferlist& bl) assert(actual == offset); // write buffers - off_t len = num*EBOFS_BLOCK_SIZE; - + size_t len = num*EBOFS_BLOCK_SIZE; + + struct iovec iov[ bl.buffers().size() ]; + + int n = 0; + size_t left = len; for (list::iterator i = bl.buffers().begin(); i != bl.buffers().end(); i++) { assert(i->length() % EBOFS_BLOCK_SIZE == 0); + + iov[n].iov_base = i->c_str(); + iov[n].iov_len = MIN(left, i->length()); - off_t left = i->length(); - if (left > len) left = len; - int r = ::write(fd, i->c_str(), left); - dout(1) << "write " << fd << " " << (void*)i->c_str() << " " << left << endl; - if (r < 0) { - dout(1) << "couldn't write bno " << bno << " num " << num << " (" << left << " bytes) p=" << (void*)i->c_str() << " r=" << r << " errno " << errno << " " << strerror(errno) << endl; - } else { - assert(r == left); - } - - len -= left; - if (len == 0) break; + left -= iov[n].iov_len; + n++; + if (left == 0) break; + } + + int r = ::writev(fd, iov, n); + + if (r < 0) { + dout(1) << "couldn't write bno " << bno << " num " << num + << " (" << num << " bytes) r=" << r + << " errno " << errno << " " << strerror(errno) << endl; + } 
else { + assert(r == (int)len); } return 0; @@ -247,15 +336,18 @@ int BlockDevice::close() assert(fd>0); // shut down io thread - dout(1) << "close stopping io thread" << endl; + dout(10) << "close stopping io thread" << endl; lock.Lock(); io_stop = true; io_wakeup.Signal(); lock.Unlock(); io_thread.join(); - dout(1) << "close closing" << endl; - return ::close(fd); + dout(1) << "close" << endl; + ::close(fd); + fd = 0; + + return 0; } int BlockDevice::cancel_io(ioh_t ioh) @@ -268,7 +360,7 @@ int BlockDevice::cancel_io(ioh_t ioh) // FIXME? if (r == 0 && pbio->context) { - //pbio->context->finish(0); ******HELP!!! + //pbio->context->finish(0); delete pbio->context; delete pbio; } diff --git a/ceph/ebofs/BlockDevice.h b/ceph/ebofs/BlockDevice.h index 7513069b8b8f4..531799402f0a2 100644 --- a/ceph/ebofs/BlockDevice.h +++ b/ceph/ebofs/BlockDevice.h @@ -9,18 +9,6 @@ #include "types.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include typedef void *ioh_t; @@ -58,7 +46,7 @@ class BlockDevice { void _submit_io(biovec *b); int _cancel_io(biovec *bio); - void do_io(biovec *b); + void do_io(list& biols); // io_thread @@ -89,17 +77,7 @@ class BlockDevice { } // get size in blocks - block_t get_num_blocks() { - if (!num_blocks) { - // stat - struct stat st; - assert(fd > 0); - int r = ::fstat(fd, &st); - assert(r == 0); - num_blocks = st.st_size / (block_t)EBOFS_BLOCK_SIZE; - } - return num_blocks; - } + block_t get_num_blocks(); int open(); int close(); diff --git a/ceph/ebofs/BufferCache.cc b/ceph/ebofs/BufferCache.cc index 19d24daa2c282..3f15e6a0a0038 100644 --- a/ceph/ebofs/BufferCache.cc +++ b/ceph/ebofs/BufferCache.cc @@ -1,17 +1,52 @@ #include "BufferCache.h" +#include "Onode.h" -#undef dout -#define dout(x) if (x <= g_conf.debug) cout << "bc." +/*********** BufferHead **************/ -/*********** BufferHead **************/ +#undef dout +#define dout(x) if (x <= g_conf.debug) cout << "ebofs.bh." +void BufferHead::finish_partials() +{ + dout(10) << "finish_partials on " << *this << endl; + + // submit partial writes + for (map::iterator p = partial_write.begin(); + p != partial_write.end(); + p++) { + dout(10) << "finish_partials submitting queued write to " << p->second.block << endl; + // copy raw buffer; this may be a past write + bufferlist bl; + bl.push_back( oc->bc->bufferpool.alloc(EBOFS_BLOCK_SIZE) ); + bl.copy_in(0, EBOFS_BLOCK_SIZE, data); + apply_partial( bl, p->second.partial ); + + oc->bc->dev.write( p->second.block, 1, bl, + new C_OC_PartialTxFinish( oc, p->second.epoch )); + } + partial_write.clear(); +} +void BufferHead::queue_partial_write(block_t b) +{ + if (partial_write.count(b)) { + // overwrite previous partial write + // note that it better be same epoch if it's the same block!! + assert( partial_write[b].epoch == epoch_modified ); + partial_write.erase(b); + } else { + oc->bc->inc_unflushed( epoch_modified ); + } + partial_write[ b ].partial = partial; + partial_write[ b ].block = b; + partial_write[ b ].epoch = epoch_modified; +} @@ -21,37 +56,53 @@ /************ ObjectCache **************/ +#undef dout +#define dout(x) if (x <= g_conf.debug) cout << "ebofs.oc." + + void ObjectCache::rx_finish(ioh_t ioh, block_t start, block_t length) { list waiters; - bc->lock.Lock(); + bc->ebofs_lock.Lock(); dout(10) << "rx_finish " << start << "~" << length << endl; for (map::iterator p = data.lower_bound(start); p != data.end(); p++) { + dout(10) << "rx_finish ?" << *p->second << endl; + // past? 
if (p->first >= start+length) break; + if (p->second->end() > start+length) break; // past + + assert(p->first >= start); + assert(p->second->end() <= start+length); + + dout(10) << "rx_finish !" << *p->second << endl; + + if (p->second->rx_ioh == ioh) + p->second->rx_ioh = 0; + + if (p->second->is_partial_writes()) + p->second->finish_partials(); if (p->second->is_rx()) { - if (p->second->get_version() == 0) { - assert(p->second->end() <= start+length); - dout(10) << "rx_finish rx -> clean on " << *p->second << endl; - bc->mark_clean(p->second); - } + assert(p->second->get_version() == 0); + assert(p->second->end() <= start+length); + dout(10) << "rx_finish rx -> clean on " << *p->second << endl; + bc->mark_clean(p->second); } else if (p->second->is_partial()) { - dout(10) << "rx_finish partial -> dirty on " << *p->second << endl; + dout(10) << "rx_finish partial -> clean on " << *p->second << endl; p->second->apply_partial(); - bc->mark_dirty(p->second); + bc->mark_clean(p->second); } else { - dout(10) << "rx_finish ignoring " << *p->second << endl; + dout(10) << "rx_finish ignoring status on (dirty|tx) " << *p->second << endl; + assert(p->second->is_dirty() || p->second->is_tx()); } - - if (p->second->ioh == ioh) p->second->ioh = 0; // trigger waiters waiters.splice(waiters.begin(), p->second->waitfor_read); @@ -59,27 +110,30 @@ void ObjectCache::rx_finish(ioh_t ioh, block_t start, block_t length) finish_contexts(waiters); - bc->lock.Unlock(); + bc->ebofs_lock.Unlock(); } -void ObjectCache::tx_finish(ioh_t ioh, block_t start, block_t length, version_t version) +void ObjectCache::tx_finish(ioh_t ioh, block_t start, block_t length, + version_t version, version_t epoch) { list waiters; - - bc->lock.Lock(); - + + bc->ebofs_lock.Lock(); + dout(10) << "tx_finish " << start << "~" << length << " v" << version << endl; for (map::iterator p = data.lower_bound(start); p != data.end(); p++) { dout(20) << "tx_finish ?bh " << *p->second << endl; assert(p->first == p->second->start()); - //dout(10) << "tx_finish bh " << *p->second << endl; // past? 
if (p->first >= start+length) break; + if (p->second->tx_ioh == ioh) + p->second->tx_ioh = 0; + if (!p->second->is_tx()) { dout(10) << "tx_finish bh not marked tx, skipping" << endl; continue; @@ -88,35 +142,56 @@ void ObjectCache::tx_finish(ioh_t ioh, block_t start, block_t length, version_t assert(p->second->is_tx()); assert(p->second->end() <= start+length); - dout(10) << "tx_finish tx -> clean on " << *p->second << endl; - p->second->set_last_flushed(version); - bc->mark_clean(p->second); + if (version == p->second->version) { + dout(10) << "tx_finish tx -> clean on " << *p->second << endl; + p->second->set_last_flushed(version); + bc->mark_clean(p->second); - if (p->second->ioh == ioh) { - p->second->ioh = 0; - } - else if (p->second->shadow_ioh == ioh) { - p->second->shadow_ioh = 0; + // trigger waiters + waiters.splice(waiters.begin(), p->second->waitfor_flush); + } else { + dout(10) << "tx_finish leaving tx, " << p->second->version << " > " << version + << " on " << *p->second << endl; } - - // trigger waiters - waiters.splice(waiters.begin(), p->second->waitfor_flush); } finish_contexts(waiters); - bc->lock.Unlock(); + // update unflushed counter + assert(bc->get_unflushed(epoch) > 0); + bc->dec_unflushed(epoch); + + bc->ebofs_lock.Unlock(); +} + +void ObjectCache::partial_tx_finish(version_t epoch) +{ + bc->ebofs_lock.Lock(); + + dout(10) << "partial_tx_finish in epoch " << epoch << endl; + + // update unflushed counter + assert(bc->get_unflushed(epoch) > 0); + bc->dec_unflushed(epoch); + + bc->ebofs_lock.Unlock(); } -int ObjectCache::map_read(block_t start, block_t len, +/* + * map a range of blocks into buffer_heads. + * - create missing buffer_heads as necessary. + * - fragment along disk extent boundaries + */ + +int ObjectCache::map_read(Onode *on, + block_t start, block_t len, map& hits, map& missing, map& rx, map& partial) { map::iterator p = data.lower_bound(start); - // p->first >= start block_t cur = start; block_t left = len; @@ -132,12 +207,17 @@ int ObjectCache::map_read(block_t start, block_t len, // at end? if (p == data.end()) { // rest is a miss. - BufferHead *n = new BufferHead(this); - bc->add_bh(n); - n->set_start( cur ); - n->set_length( left ); - data[cur] = n; - missing[cur] = n; + vector exv; + on->map_extents(cur, left, exv); // we might consider some prefetch here. + for (unsigned i=0; iadd_bh(n); + n->set_start( cur ); + n->set_length( exv[i].length ); + data[cur] = n; + missing[cur] = n; + cur += exv[i].length; + } break; } @@ -172,18 +252,19 @@ int ObjectCache::map_read(block_t start, block_t len, } else if (p->first > cur) { // gap.. miss block_t next = p->first; - BufferHead *n = new BufferHead(this); - bc->add_bh(n); - n->set_start( cur ); - if (next - cur < left) - n->set_length( next - cur ); - else - n->set_length( left ); - data[cur] = n; - missing[cur] = n; - - cur += n->length(); - left -= n->length(); + vector exv; + on->map_extents(cur, MIN(next-cur, left), exv); // we might consider some prefetch here + + for (unsigned i=0; iadd_bh(n); + n->set_start( cur ); + n->set_length( exv[i].length ); + data[cur] = n; + missing[cur] = n; + cur += n->length(); + left -= n->length(); + } continue; // more? } else @@ -198,9 +279,16 @@ int ObjectCache::map_read(block_t start, block_t len, * map a range of pages on an object's buffer cache. * * - break up bufferheads that don't fall completely within the range + * - cancel rx ops we obsolete. 
+ * - resubmit rx ops if we split bufferheads + * + * - leave potentially obsoleted tx ops alone (for now) + * - don't worry about disk extent boundaries (yet) */ -int ObjectCache::map_write(block_t start, block_t len, - map& hits) +int ObjectCache::map_write(Onode *on, + block_t start, block_t len, + interval_set& alloc, + map& hits) { map::iterator p = data.lower_bound(start); // p->first >= start @@ -216,52 +304,117 @@ int ObjectCache::map_write(block_t start, block_t len, } for (; left > 0; p++) { + // max for this bh (bc of (re)alloc on disk) + block_t max = left; + bool newalloc = false; + + // based on alloc/no-alloc boundary ... + if (alloc.contains(cur, left)) { + if (alloc.contains(cur)) { + block_t ends = alloc.end_after(cur); + max = MIN(left, ends-cur); + newalloc = true; + } else { + if (alloc.starts_after(cur)) { + block_t st = alloc.start_after(cur); + max = MIN(left, st-cur); + } + } + } + + // based on disk extent boundary ... + vector exv; + on->map_extents(cur, max, exv); + if (exv.size() > 1) + max = exv[0].length; + + if (newalloc) { + dout(10) << "map_write " << cur << "~" << max << " is new alloc on disk" << endl; + } else { + dout(10) << "map_write " << cur << "~" << max << " keeps old alloc on disk" << endl; + } + // at end? if (p == data.end()) { BufferHead *n = new BufferHead(this); bc->add_bh(n); n->set_start( cur ); - n->set_length( left ); + n->set_length( max ); data[cur] = n; hits[cur] = n; break; } - + if (p->first <= cur) { - // have it (or part of it) - BufferHead *e = p->second; - - if (p->first == cur && p->second->length() <= left) { - // whole bufferhead, piece of cake. - } else { - if (e->is_clean()) { - // we'll need to cut the buffer! :( - if (p->first == cur && p->second->length() > left) { - // we want left bit (one splice) - bc->split(e, cur+left); + BufferHead *bh = p->second; + dout(10) << "map_write bh " << *bh << " intersected" << endl; + + if (p->first < cur) { + if (cur+max >= p->first+p->second->length()) { + // we want right bit (one splice) + if (bh->is_rx() && bc->bh_cancel_read(bh)) { + BufferHead *right = bc->split(bh, cur); + bc->bh_read(on, bh); // reread left bit + bh = right; + } else if (bh->is_tx() && !newalloc && bc->bh_cancel_write(bh)) { + BufferHead *right = bc->split(bh, cur); + bc->bh_write(on, bh); // reread left bit + bh = right; + } else { + bh = bc->split(bh, cur); // just split it } - else if (p->first < cur && cur+left >= p->first+p->second->length()) { - // we want right bit (one splice) - e = bc->split(e, cur); + p++; + assert(p->second == bh); + } else { + // we want middle bit (two splices) + if (bh->is_rx() && bc->bh_cancel_read(bh)) { + BufferHead *middle = bc->split(bh, cur); + bc->bh_read(on, bh); // reread left p++; - assert(p->second == e); + assert(p->second == middle); + BufferHead *right = bc->split(middle, cur+max); + bc->bh_read(on, right); // reread right + bh = middle; + } else if (bh->is_tx() && !newalloc && bc->bh_cancel_write(bh)) { + BufferHead *middle = bc->split(bh, cur); + bc->bh_write(on, bh); // redo left + p++; + assert(p->second == middle); + BufferHead *right = bc->split(middle, cur+max); + bc->bh_write(on, right); // redo right + bh = middle; } else { - // we want middle bit (two splices) - e = bc->split(e, cur); + BufferHead *middle = bc->split(bh, cur); p++; - assert(p->second == e); - bc->split(e, cur+left); + assert(p->second == middle); + bc->split(middle, cur+max); + bh = middle; + } + } + } else if (p->first == cur) { + if (p->second->length() <= max) { + // whole bufferhead, 
piece of cake. + } else { + // we want left bit (one splice) + if (bh->is_rx() && bc->bh_cancel_read(bh)) { + BufferHead *right = bc->split(bh, cur+max); + bc->bh_read(on, right); // re-rx the right bit + } else if (bh->is_tx() && !newalloc && bc->bh_cancel_write(bh)) { + BufferHead *right = bc->split(bh, cur+max); + bc->bh_write(on, right); // re-tx the right bit + } else { + bc->split(bh, cur+max); // just split } - } + } } - // FIXME - hits[cur] = e; + // put in our map + hits[cur] = bh; // keep going. - block_t lenfromcur = e->length(); - if (e->start() < cur) - lenfromcur -= cur - e->start(); + block_t lenfromcur = bh->length(); + if (bh->start() < cur) + lenfromcur -= cur - bh->start(); if (lenfromcur < left) { cur += lenfromcur; @@ -273,18 +426,17 @@ int ObjectCache::map_write(block_t start, block_t len, } else { // gap! block_t next = p->first; + block_t glen = MIN(next-cur, max); + dout(10) << "map_write gap " << cur << "~" << glen << endl; BufferHead *n = new BufferHead(this); bc->add_bh(n); n->set_start( cur ); - if (next - cur < left) - n->set_length( next - cur ); - else - n->set_length( left ); + n->set_length( glen ); data[cur] = n; hits[cur] = n; - cur += n->length(); - left -= n->length(); + cur += glen; + left -= glen; continue; // more? } } @@ -326,6 +478,10 @@ int ObjectCache::scan_versions(block_t start, block_t len, /************** BufferCache ***************/ +#undef dout +#define dout(x) if (x <= g_conf.debug) cout << "ebofs.bc." + + BufferHead *BufferCache::split(BufferHead *orig, block_t after) { @@ -352,5 +508,106 @@ BufferHead *BufferCache::split(BufferHead *orig, block_t after) } +void BufferCache::bh_read(Onode *on, BufferHead *bh) +{ + dout(5) << "bh_read " << *on << " on " << *bh << endl; + + if (bh->is_missing()) { + mark_rx(bh); + } else { + assert(bh->is_partial()); + } + + // get extent. there should be only one! + vector exv; + on->map_extents(bh->start(), bh->length(), exv); + assert(exv.size() == 1); + Extent ex = exv[0]; + + // alloc new buffer + bufferpool.alloc(EBOFS_BLOCK_SIZE*bh->length(), bh->data); // new buffers! + + // this should be empty!! + assert(bh->rx_ioh == 0); + + dout(20) << "bh_read " << *bh << " from " << ex << endl; + + bh->rx_ioh = dev.read(ex.start, ex.length, bh->data, + new C_OC_RxFinish(on->oc, + bh->start(), bh->length())); +} + +bool BufferCache::bh_cancel_read(BufferHead *bh) +{ + assert(bh->rx_ioh); + if (dev.cancel_io(bh->rx_ioh) >= 0) { + dout(10) << "bh_cancel_read on " << *bh << endl; + bh->rx_ioh = 0; + mark_missing(bh); + return true; + } + return false; +} + +void BufferCache::bh_write(Onode *on, BufferHead *bh) +{ + dout(5) << "bh_write " << *on << " on " << *bh << endl; + assert(bh->get_version() > 0); + + assert(bh->is_dirty()); + mark_tx(bh); + + // get extents + vector exv; + on->map_extents(bh->start(), bh->length(), exv); + assert(exv.size() == 1); + Extent ex = exv[0]; + + dout(20) << "bh_write " << *bh << " to " << ex << endl; + + //assert(bh->tx_ioh == 0); + + bh->tx_ioh = dev.write(ex.start, ex.length, bh->data, + new C_OC_TxFinish(on->oc, + bh->start(), bh->length(), + bh->get_version(), + bh->epoch_modified)); + + epoch_unflushed[ bh->epoch_modified ]++; +} + + +bool BufferCache::bh_cancel_write(BufferHead *bh) +{ + assert(bh->tx_ioh); + if (dev.cancel_io(bh->tx_ioh) >= 0) { + dout(10) << "bh_cancel_write on " << *bh << endl; + bh->tx_ioh = 0; + mark_missing(bh); + epoch_unflushed[ bh->epoch_modified ]--; // assert.. this should be the same epoch! 
+ return true; + } + return false; +} + + +void BufferCache::bh_queue_partial_write(Onode *on, BufferHead *bh) +{ + dout(5) << "bh_queue_partial_write " << *on << " on " << *bh << endl; + assert(bh->get_version() > 0); + + assert(bh->is_partial()); + assert(bh->length() == 1); + + // get the block + vector exv; + on->map_extents(bh->start(), bh->length(), exv); + assert(exv.size() == 1); + block_t b = exv[0].start; + assert(exv[0].length == 1); + + // copy map state, queue for this block + bh->queue_partial_write( b ); +} diff --git a/ceph/ebofs/BufferCache.h b/ceph/ebofs/BufferCache.h index 2d13648fb75f3..0ebefbb3603f1 100644 --- a/ceph/ebofs/BufferCache.h +++ b/ceph/ebofs/BufferCache.h @@ -10,35 +10,54 @@ #include "AlignedBufferPool.h" #include "BlockDevice.h" -#define BH_STATE_DIRTY 1 +#include "include/interval_set.h" class ObjectCache; class BufferCache; +class Onode; class BufferHead : public LRUObject { public: + /* + * - buffer_heads should always break across disk extent boundaries + * - partial buffer_heads are always 1 block. + */ const static int STATE_MISSING = 0; // missing; data is on disk, but not loaded. const static int STATE_CLEAN = 1; // Rw clean const static int STATE_DIRTY = 2; // RW dirty const static int STATE_TX = 3; // Rw flushing to disk const static int STATE_RX = 4; // w reading from disk - const static int STATE_PARTIAL = 5; // reading from disk, + partial content map. + const static int STATE_PARTIAL = 5; // reading from disk, + partial content map. always 1 block. + + class PartialWrite { + public: + map partial; // partial dirty content overlayed onto incoming data + block_t block; + version_t epoch; + }; public: ObjectCache *oc; - bufferlist data, shadow_data; - ioh_t ioh, shadow_ioh; // any pending read/write op - version_t tx_epoch; // epoch this write is in + bufferlist data; + + ioh_t rx_ioh; // + ioh_t tx_ioh; // list waitfor_read; list waitfor_flush; private: - map partial; // partial dirty content overlayed onto incoming data + map partial; // partial dirty content overlayed onto incoming data + + map partial_write; // queued writes w/ partial content int ref; int state; + + public: + version_t epoch_modified; + version_t version; // current version in cache version_t last_flushed; // last version flushed to disk @@ -48,8 +67,9 @@ class BufferHead : public LRUObject { public: BufferHead(ObjectCache *o) : - oc(o), ioh(0), shadow_ioh(0), tx_epoch(0), - ref(0), state(STATE_MISSING), version(0), last_flushed(0) + oc(o), //cancellable_ioh(0), tx_epoch(0), + rx_ioh(0), tx_ioh(0), + ref(0), state(STATE_MISSING), epoch_modified(0), version(0), last_flushed(0) {} ObjectCache *get_oc() { return oc; } @@ -97,17 +117,10 @@ class BufferHead : public LRUObject { bool is_rx() { return state == STATE_RX; } bool is_partial() { return state == STATE_PARTIAL; } - /* - void substr(block_t start, block_t len, bufferlist& sub) { - // determine offset in bufferlist - block_t start = object_loc.start - off; - block_t len = num - start; - if (start+len > object_loc.length) - len = object_loc.length - start; - - sub.substr_of(data, start*EBOFS_BLOCK_SIZE, len*EBOFS_BLOCK_SIZE); - } - */ + bool is_partial_writes() { return !partial_write.empty(); } + void finish_partials(); + void queue_partial_write(block_t b); + void copy_partial_substr(off_t start, off_t end, bufferlist& bl) { map::iterator i = partial.begin(); @@ -194,15 +207,18 @@ class BufferHead : public LRUObject { */ } void apply_partial() { + apply_partial(data, partial); + } + void apply_partial(bufferlist& bl, 
map& pm) { const off_t bhstart = start() * EBOFS_BLOCK_SIZE; //assert(partial_is_complete()); - for (map::iterator i = partial.begin(); - i != partial.end(); + for (map::iterator i = pm.begin(); + i != pm.end(); i++) { int pos = i->first - bhstart; - data.copy_in(pos, i->second.length(), i->second); + bl.copy_in(pos, i->second.length(), i->second); } - partial.clear(); + pm.clear(); } void add_partial(off_t off, bufferlist& p) { // trim overlap @@ -269,10 +285,11 @@ inline ostream& operator<<(ostream& out, BufferHead& bh) class ObjectCache { - private: + public: object_t object_id; BufferCache *bc; + private: map data; public: @@ -289,13 +306,16 @@ class ObjectCache { } bool is_empty() { return data.empty(); } - int map_read(block_t start, block_t len, + int map_read(Onode *on, + block_t start, block_t len, map& hits, // hits map& missing, // read these from disk map& rx, // wait for these to finish reading from disk map& partial); // (maybe) wait for these to read from disk - int map_write(block_t start, block_t len, + int map_write(Onode *on, + block_t start, block_t len, + interval_set& alloc, map& hits); // can write to these. BufferHead *split(BufferHead *bh, block_t off); @@ -304,8 +324,8 @@ class ObjectCache { version_t& low, version_t& high); void rx_finish(ioh_t ioh, block_t start, block_t length); - void tx_finish(ioh_t ioh, block_t start, block_t length, version_t v); - + void tx_finish(ioh_t ioh, block_t start, block_t length, version_t v, version_t epoch); + void partial_tx_finish(version_t epoch); }; class C_OC_RxFinish : public Context { @@ -325,20 +345,36 @@ class C_OC_TxFinish : public Context { ObjectCache *oc; block_t start, length; version_t version; + version_t epoch; public: - C_OC_TxFinish(ObjectCache *o, block_t s, block_t l, version_t v) : - oc(o), start(s), length(l), version(v) {} + C_OC_TxFinish(ObjectCache *o, block_t s, block_t l, version_t v, version_t e) : + oc(o), start(s), length(l), version(v), epoch(e) {} void finish(int r) { ioh_t ioh = (ioh_t)r; - if (ioh) - oc->tx_finish(ioh, start, length, version); + if (ioh) { + oc->tx_finish(ioh, start, length, version, epoch); + } + } +}; + +class C_OC_PartialTxFinish : public Context { + ObjectCache *oc; + version_t epoch; +public: + C_OC_PartialTxFinish(ObjectCache *o, version_t e) : + oc(o), epoch(e) {} + void finish(int r) { + ioh_t ioh = (ioh_t)r; + if (ioh) { + oc->partial_tx_finish(epoch); + } } }; class BufferCache { public: - Mutex &lock; // hack: ref to global lock + Mutex &ebofs_lock; // hack: this is a ref to global ebofs_lock BlockDevice &dev; AlignedBufferPool &bufferpool; @@ -357,9 +393,11 @@ class BufferCache { off_t stat_partial; off_t stat_missing; + map epoch_unflushed; + public: - BufferCache(BlockDevice& d, AlignedBufferPool& bp, Mutex& glock) : - lock(glock), dev(d), bufferpool(bp), + BufferCache(BlockDevice& d, AlignedBufferPool& bp, Mutex& el) : + ebofs_lock(el), dev(d), bufferpool(bp), stat_waiter(0), stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_partial(0), stat_missing(0) {} @@ -416,9 +454,22 @@ class BufferCache { off_t get_stat_clean() { return stat_clean; } off_t get_stat_partial() { return stat_partial; } + int get_unflushed(version_t epoch) { + return epoch_unflushed[epoch]; + } + void inc_unflushed(version_t epoch) { + epoch_unflushed[epoch]++; + } + void dec_unflushed(version_t epoch) { + epoch_unflushed[epoch]--; + if (stat_waiter && + epoch_unflushed[epoch] == 0) + stat_cond.Signal(); + } + void waitfor_stat() { stat_waiter++; - stat_cond.Wait(lock); + 
stat_cond.Wait(ebofs_lock); stat_waiter--; } @@ -447,6 +498,7 @@ class BufferCache { set_state(bh2, bh1->get_state()); } + void mark_missing(BufferHead *bh) { set_state(bh, BufferHead::STATE_MISSING); }; void mark_clean(BufferHead *bh) { set_state(bh, BufferHead::STATE_CLEAN); }; void mark_rx(BufferHead *bh) { set_state(bh, BufferHead::STATE_RX); }; void mark_partial(BufferHead *bh) { set_state(bh, BufferHead::STATE_PARTIAL); }; @@ -457,6 +509,17 @@ class BufferCache { }; + // io + void bh_read(Onode *on, BufferHead *bh); + void bh_write(Onode *on, BufferHead *bh); + void bh_queue_partial_write(Onode *on, BufferHead *bh); + + bool bh_cancel_read(BufferHead *bh); + bool bh_cancel_write(BufferHead *bh); + + friend class C_E_FlushPartial; + + BufferHead *split(BufferHead *orig, block_t after); diff --git a/ceph/ebofs/Ebofs.cc b/ceph/ebofs/Ebofs.cc index 77b4471d09ad0..89295c672624d 100644 --- a/ceph/ebofs/Ebofs.cc +++ b/ceph/ebofs/Ebofs.cc @@ -11,6 +11,8 @@ int Ebofs::mount() // note: this will fail in mount -> unmount -> mount type situations, bc // prior state isn't fully cleaned up. + dout(1) << "mount" << endl; + ebofs_lock.Lock(); assert(!mounted); @@ -22,8 +24,8 @@ int Ebofs::mount() struct ebofs_super *sb1 = (struct ebofs_super*)bp1.c_str(); struct ebofs_super *sb2 = (struct ebofs_super*)bp2.c_str(); - dout(2) << "mount super @0 epoch " << sb1->epoch << endl; - dout(2) << "mount super @1 epoch " << sb2->epoch << endl; + dout(3) << "mount super @0 epoch " << sb1->epoch << endl; + dout(3) << "mount super @1 epoch " << sb2->epoch << endl; // pick newest super struct ebofs_super *sb = 0; @@ -32,17 +34,17 @@ int Ebofs::mount() else sb = sb2; super_epoch = sb->epoch; - dout(2) << "mount epoch " << super_epoch << endl; + dout(3) << "mount epoch " << super_epoch << endl; assert(super_epoch == sb->epoch); // init node pools - dout(2) << "mount nodepool" << endl; + dout(3) << "mount nodepool" << endl; nodepool.init( &sb->nodepool ); nodepool.read_usemap( dev, super_epoch ); nodepool.read_clean_nodes( dev ); // open tables - dout(2) << "mount opening tables" << endl; + dout(3) << "mount opening tables" << endl; object_tab = new Table( nodepool, sb->object_tab ); for (int i=0; i( nodepool, sb->free_tab[i] ); @@ -51,10 +53,10 @@ int Ebofs::mount() oc_tab = new Table( nodepool, sb->oc_tab ); co_tab = new Table( nodepool, sb->co_tab ); - dout(2) << "mount starting commit thread" << endl; + dout(3) << "mount starting commit thread" << endl; commit_thread.create(); - dout(2) << "mount mounted" << endl; + dout(1) << "mount mounted" << endl; mounted = true; ebofs_lock.Unlock(); @@ -124,7 +126,7 @@ int Ebofs::mkfs() write_super(1, superbp1); // free memory - dout(1) << "mkfs: cleaning up" << endl; + dout(3) << "mkfs: cleaning up" << endl; close_tables(); dout(1) << "mkfs: done" << endl; @@ -149,6 +151,7 @@ int Ebofs::umount() ebofs_lock.Lock(); // mark unmounting + dout(1) << "umount start" << endl; readonly = true; unmounting = true; @@ -156,12 +159,16 @@ int Ebofs::umount() commit_cond.Signal(); // wait + dout(2) << "umount stopping commit thread" << endl; ebofs_lock.Unlock(); commit_thread.join(); + ebofs_lock.Lock(); // free memory + dout(2) << "umount cleaning up" << endl; close_tables(); + dout(1) << "umount done" << endl; ebofs_lock.Unlock(); return 0; } @@ -172,7 +179,7 @@ void Ebofs::prepare_super(version_t epoch, bufferptr& bp) { struct ebofs_super sb; - dout(1) << "prepare_super v" << epoch << endl; + dout(10) << "prepare_super v" << epoch << endl; // fill in super memset(&sb, 0, sizeof(sb)); 
@@ -223,16 +230,18 @@ void Ebofs::write_super(version_t epoch, bufferptr& bp) { block_t bno = epoch & 1; - dout(1) << "write_super v" << epoch << " to b" << bno << endl; + dout(10) << "write_super v" << epoch << " to b" << bno << endl; dev.write(bno, 1, bp); } int Ebofs::commit_thread_entry() -{ - dout(10) << "commit_thread start" << endl; - +{ ebofs_lock.Lock(); + dout(10) << "commit_thread start" << endl; + + commit_thread_started = true; + sync_cond.Signal(); while (mounted) { @@ -272,13 +281,14 @@ int Ebofs::commit_thread_entry() ebofs_lock.Unlock(); write_super(super_epoch, superbp); ebofs_lock.Lock(); - + + sync_cond.Signal(); + dout(10) << "commit_thread commit finish" << endl; } - ebofs_lock.Unlock(); - dout(10) << "commit_thread finish" << endl; + ebofs_lock.Unlock(); return 0; } @@ -386,7 +396,7 @@ void Ebofs::write_onode(Onode *on, Context *c) // attr unsigned off = sizeof(eo); - for (map::iterator i = on->attr.begin(); + for (map::iterator i = on->attr.begin(); i != on->attr.end(); i++) { bl.copy_in(off, i->first.length()+1, i->first.c_str()); @@ -618,7 +628,9 @@ void Ebofs::commit_inodes_start() inodes_flushing++; write_onode(on, new C_E_InodeFlush(this)); on->mark_clean(); + on->uncommitted.clear(); // commit allocated blocks } + dirty_onodes.clear(); // cnodes for (set::iterator i = dirty_cnodes.begin(); @@ -629,6 +641,7 @@ void Ebofs::commit_inodes_start() write_cnode(cn, new C_E_InodeFlush(this)); cn->mark_clean(); } + dirty_cnodes.clear(); dout(10) << "commit_inodes_start writing " << inodes_flushing << " onodes+cnodes" << endl; } @@ -637,7 +650,7 @@ void Ebofs::commit_inodes_wait() { // caller must hold ebofs_lock while (inodes_flushing > 0) { - dout(10) << "commit_inodes_wait for " << inodes_flushing << " onodes+cnodes to flush" << endl; + dout(10) << "commit_inodes_wait waiting for " << inodes_flushing << " onodes+cnodes to flush" << endl; inode_commit_cond.Wait(ebofs_lock); } dout(10) << "commit_inodes_wait all flushed" << endl; @@ -651,32 +664,19 @@ void Ebofs::commit_inodes_wait() // *** buffer cache *** -// ... should already hold lock ... void Ebofs::trim_buffer_cache() { - //ebofs_lock.Lock(); - - // flush any dirty items? 
- while (bc.lru_dirty.lru_get_size() > bc.lru_dirty.lru_get_max()) { - BufferHead *bh = (BufferHead*) bc.lru_dirty.lru_expire(); - if (!bh) break; - - bc.lru_dirty.lru_insert_bot(bh); + ebofs_lock.Lock(); + dout(10) << "trim_buffer_cache start: " + << bc.lru_rest.lru_get_size() << " rest + " + << bc.lru_dirty.lru_get_size() << " dirty " << endl; - dout(10) << "trim_buffer_cache dirty " << *bh << endl; - assert(bh->is_dirty()); - - Onode *on = get_onode( bh->oc->get_object_id() ); - bh_write(on, bh); - put_onode(on); - } - - // trim bufferheads + // trim trimmable bufferheads while (bc.lru_rest.lru_get_size() > bc.lru_rest.lru_get_max()) { BufferHead *bh = (BufferHead*) bc.lru_rest.lru_expire(); if (!bh) break; - - dout(10) << "trim_buffer_cache rest " << *bh << endl; + + dout(10) << "trim_buffer_cache trimming " << *bh << endl; assert(bh->is_clean()); ObjectCache *oc = bh->oc; @@ -690,161 +690,72 @@ void Ebofs::trim_buffer_cache() put_onode(on); } } - dout(10) << "trim_buffer_cache " + dout(10) << "trim_buffer_cache finish: " << bc.lru_rest.lru_get_size() << " rest + " << bc.lru_dirty.lru_get_size() << " dirty " << endl; - - //ebofs_lock.Unlock(); + + ebofs_lock.Unlock(); } - -void Ebofs::commit_bc_wait(version_t epoch) -{ - dout(1) << "commit_bc_wait" << endl; -} - -void Ebofs::flush_all() +void Ebofs::sync() { - // FIXME what about partial heads? + ebofs_lock.Lock(); + dout(3) << "sync in " << super_epoch << endl; - dout(1) << "flush_all" << endl; - - bc.lock.Lock(); - - while (bc.get_stat_dirty() > 0 || // not strictly necessary - bc.get_stat_tx() > 0 || - bc.get_stat_partial() > 0 || - bc.get_stat_rx() > 0) { - - // write all dirty bufferheads - while (!bc.dirty_bh.empty()) { - set::iterator i = bc.dirty_bh.begin(); - BufferHead *bh = *i; - if (bh->ioh) continue; - Onode *on = get_onode(bh->oc->get_object_id()); - bh_write(on, bh); - put_onode(on); - } - - // wait for all tx and partial buffers to flush - dout(1) << "flush_all waiting for " - << bc.get_stat_dirty() << " dirty, " - << bc.get_stat_tx() << " tx, " - << bc.get_stat_rx() << " rx, " - << bc.get_stat_partial() << " partial" - << endl; - bc.waitfor_stat(); + if (!commit_thread_started) { + dout(10) << "sync waiting for commit thread to start" << endl; + sync_cond.Wait(ebofs_lock); } - bc.lock.Unlock(); - dout(1) << "flush_all done" << endl; -} - - - -// ? is this the best way ? -class C_E_FlushPartial : public Context { - Ebofs *ebofs; - Onode *on; - BufferHead *bh; -public: - C_E_FlushPartial(Ebofs *e, Onode *o, BufferHead *b) : ebofs(e), on(o), bh(b) {} - void finish(int r) { - if (r == 0) ebofs->bh_write(on, bh); - } -}; - - -void Ebofs::bh_read(Onode *on, BufferHead *bh) -{ - dout(5) << "bh_read " << *on << " on " << *bh << endl; - - if (bh->is_missing()) { - bc.mark_rx(bh); - } else { - assert(bh->is_partial()); + if (mid_commit) { + dout(10) << "sync waiting for commit in progress" << endl; + sync_cond.Wait(ebofs_lock); } - // get extents - vector ex; - on->map_extents(bh->start(), bh->length(), ex); - - // alloc new buffer - bc.bufferpool.alloc(EBOFS_BLOCK_SIZE*bh->length(), bh->data); // new buffers! 
+ commit_cond.Signal(); // trigger a commit - // lay out on disk - block_t bhoff = 0; - for (unsigned i=0; idata, bhoff*EBOFS_BLOCK_SIZE, ex[i].length*EBOFS_BLOCK_SIZE); - - //if (bh->is_partial()) - //bh->waitfor_read.push_back(new C_E_FlushPartial(this, on, bh)); + sync_cond.Wait(ebofs_lock); // wait - assert(bh->ioh == 0); - bh->ioh = dev.read(ex[i].start, ex[i].length, sub, - new C_OC_RxFinish(on->oc, - bhoff + bh->start(), ex[i].length)); - - bhoff += ex[i].length; - } + dout(3) << "sync finish in " << super_epoch << endl; + ebofs_lock.Unlock(); } -void Ebofs::bh_write(Onode *on, BufferHead *bh) -{ - dout(5) << "bh_write " << *on << " on " << *bh << endl; - assert(bh->get_version() > 0); - assert(bh->is_dirty()); - bc.mark_tx(bh); - bh->tx_epoch = super_epoch; // note the epoch! +void Ebofs::commit_bc_wait(version_t epoch) +{ + dout(10) << "commit_bc_wait on epoch " << epoch << endl; + + while (bc.get_unflushed(epoch) > 0) { + dout(10) << "commit_bc_wait " << bc.get_unflushed(epoch) << " unflushed in epoch " << epoch << endl; + bc.waitfor_stat(); + } - // get extents - vector ex; - on->map_extents(bh->start(), bh->length(), ex); + dout(10) << "commit_bc_wait all flushed for epoch " << epoch << endl; +} - // lay out on disk - block_t bhoff = 0; - for (unsigned i=0; idata, bhoff*EBOFS_BLOCK_SIZE, ex[i].length*EBOFS_BLOCK_SIZE); - assert(bh->ioh == 0); - bh->ioh = dev.write(ex[i].start, ex[i].length, sub, - new C_OC_TxFinish(on->oc, - bhoff + bh->start(), ex[i].length, - bh->get_version())); - bhoff += ex[i].length; - } -} /* * allocate a write to blocks on disk. - * take care to not overwrite any "safe" data blocks. - * break up bufferheads in bh_hits that span realloc boundaries. - * final bufferhead set stored in final! + * - take care to not overwrite any "safe" data blocks. + * - allocate/map new extents on disk as necessary */ void Ebofs::alloc_write(Onode *on, - block_t start, block_t len, - map& hits) + block_t start, block_t len, + interval_set& alloc) { // first decide what pages to (re)allocate - interval_set alloc; on->map_alloc_regions(start, len, alloc); - dout(10) << "alloc_write need to alloc " << alloc << endl; + dout(10) << "alloc_write need to (re)alloc " << alloc << endl; // merge alloc into onode uncommitted map - cout << "union of " << on->uncommitted << " and " << alloc << endl; on->uncommitted.union_of(alloc); - - dout(10) << "alloc_write onode uncommitted now " << on->uncommitted << endl; + dout(10) << "alloc_write onode.uncommitted is now " << on->uncommitted << endl; // allocate the space for (map::iterator i = alloc.m.begin(); @@ -868,58 +779,11 @@ void Ebofs::alloc_write(Onode *on, cur += ex.length; } } - - // now break up the bh's as necessary - block_t cur = start; - block_t left = len; - - map::iterator bhp = hits.begin(); - map::iterator ap = alloc.m.begin(); - - block_t aoff = 0; - while (left > 0) { - assert(cur == bhp->first); - BufferHead *bh = bhp->second; - assert(cur == bh->start()); - assert(left >= bh->length()); - - assert(ap->first+aoff == bh->start()); - if (ap->second-aoff == bh->length()) { - // perfect. - cur += bh->length(); - left -= bh->length(); - ap++; - aoff = 0; - bhp++; - continue; - } - - if (bh->length() < ap->second-aoff) { - // bh is within alloc range. - cur += bh->length(); - left -= bh->length(); - aoff += bh->length(); - bhp++; - continue; - } - - // bh spans alloc boundary, split it! 
- assert(bh->length() > ap->second - aoff); - BufferHead *n = bc.split(bh, bh->start() + ap->second-aoff); - hits[n->start()] = n; // add new guy to hit map - - // bh is now shortened... - cur += bh->length(); - left -= bh->length(); - assert(ap->second == aoff + bh->length()); - aoff = 0; - ap++; - continue; - } } + void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl) { ObjectCache *oc = on->get_oc(&bc); @@ -949,12 +813,13 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl) block_t blast = (len+off-1) / EBOFS_BLOCK_SIZE; block_t blen = blast-bstart+1; + // allocate write on disk. + interval_set alloc; + alloc_write(on, bstart, blen, alloc); + // map b range onto buffer_heads map hits; - oc->map_write(bstart, blen, hits); - - // allocate write on disk. break buffer_heads across realloc/no realloc boundaries - alloc_write(on, bstart, blen, hits); + oc->map_write(on, bstart, blen, alloc, hits); // get current versions version_t lowv, highv; @@ -970,28 +835,35 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl) i++) { BufferHead *bh = i->second; bh->set_version(highv+1); + bh->epoch_modified = super_epoch; - // cancel old io? - if (bh->is_tx()) { - if (bh->tx_epoch == super_epoch) { - // try to cancel the old io (just bc it's a waste) - dout(10) << "apply_write canceling old io on " << *bh << endl; - bc.dev.cancel_io(bh->ioh); - bh->ioh = 0; - } else { - // this tx is from prior epoch! shadow+copy the buffer before we modify it. - bh->shadow_data.claim(bh->data); - bc.bufferpool.alloc(EBOFS_BLOCK_SIZE*bh->length(), bh->data); // new buffers! - bh->data.copy_in(0, bh->length()*EBOFS_BLOCK_SIZE, bh->shadow_data); - bh->shadow_ioh = bh->ioh; - bh->ioh = 0; - } + // old write in progress? + if (bh->is_tx()) { // copy the buffer to avoid munging up in-flight write + dout(10) << "apply_write tx pending, copying buffer on " << *bh << endl; + bufferlist temp; + temp.claim(bh->data); + bc.bufferpool.alloc(EBOFS_BLOCK_SIZE*bh->length(), bh->data); + bh->data.copy_in(0, bh->length()*EBOFS_BLOCK_SIZE, temp); + } + + // need to split off partial? + if (bh->is_missing() && bh->length() > 1 && + (bh->start() == bstart && off % EBOFS_BLOCK_SIZE != 0)) { + BufferHead *right = bc.split(bh, bh->start()+1); + hits[right->start()] = right; + dout(10) << "apply_write split off left block for partial write; rest is " << *right << endl; + } + if (bh->is_missing() && bh->length() > 1 && + (bh->last() == blast && len+off % EBOFS_BLOCK_SIZE != 0) && + (len+off < on->object_size)) { + BufferHead *right = bc.split(bh, bh->last()); + hits[right->start()] = right; + dout(10) << "apply_write split off right block for upcoming partial write; rest is " << *right << endl; } // partial at head or tail? 
if ((bh->start() == bstart && off % EBOFS_BLOCK_SIZE != 0) || - (bh->last() == blast && (len+off) % EBOFS_BLOCK_SIZE != 0) || - (len % EBOFS_BLOCK_SIZE != 0)) { + (bh->last() == blast && (len+off) % EBOFS_BLOCK_SIZE != 0)) { // locate ourselves in bh unsigned off_in_bh = opos - bh->start()*EBOFS_BLOCK_SIZE; assert(off_in_bh >= 0); @@ -1029,27 +901,27 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl) bh->data.zero(); bh->apply_partial(); bc.mark_dirty(bh); - if (bh->ioh) { - bc.dev.cancel_io( bh->ioh ); - bh->ioh = 0; - } - bh_write(on, bh); + bc.bh_write(on, bh); } else if (bh->is_rx()) { dout(10) << "apply_write rx -> partial " << *bh << endl; + assert(bh->length() == 1); bc.mark_partial(bh); + bc.bh_queue_partial_write(on, bh); // queue the eventual write } else if (bh->is_missing()) { dout(10) << "apply_write missing -> partial " << *bh << endl; - bh_read(on, bh); + assert(bh->length() == 1); bc.mark_partial(bh); + bc.bh_read(on, bh); + bc.bh_queue_partial_write(on, bh); // queue the eventual write } else if (bh->is_partial()) { - if (bh->ioh == 0) { - dout(10) << "apply_write submitting rx for partial " << *bh << endl; - bh_read(on, bh); - } + dout(10) << "apply_write already partial, no need to submit rx on " << *bh << endl; + bc.bh_queue_partial_write(on, bh); // queue the eventual write } + + } else { assert(bh->is_clean() || bh->is_dirty() || bh->is_tx()); @@ -1077,7 +949,7 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl) if (!bh->is_dirty()) bc.mark_dirty(bh); - bh_write(on, bh); + bc.bh_write(on, bh); } continue; } @@ -1118,7 +990,7 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl) if (!bh->is_dirty()) bc.mark_dirty(bh); - bh_write(on, bh); + bc.bh_write(on, bh); } assert(zleft == 0); @@ -1155,7 +1027,7 @@ bool Ebofs::attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond map missing; // read these map rx; // wait for these map partials; // ?? - oc->map_read(bstart, blen, hits, missing, rx, partials); + oc->map_read(on, bstart, blen, hits, missing, rx, partials); // missing buffers? 
if (!missing.empty()) { @@ -1163,7 +1035,7 @@ bool Ebofs::attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond i != missing.end(); i++) { dout(15) <<"attempt_read missing buffer " << *(i->second) << endl; - bh_read(on, i->second); + bc.bh_read(on, i->second); } BufferHead *wait_on = missing.begin()->second; wait_on->waitfor_read.push_back(new C_E_Cond(will_wait_on)); diff --git a/ceph/ebofs/Ebofs.h b/ceph/ebofs/Ebofs.h index 4830a8ad8c975..5888594544b96 100644 --- a/ceph/ebofs/Ebofs.h +++ b/ceph/ebofs/Ebofs.h @@ -38,11 +38,12 @@ class Ebofs : public ObjectStore { bool mounted, unmounting; bool readonly; version_t super_epoch; + bool commit_thread_started, mid_commit; + Cond commit_cond; // to wake up the commit thread + Cond sync_cond; void prepare_super(version_t epoch, bufferptr& bp); void write_super(version_t epoch, bufferptr& bp); - - Cond commit_cond; // to wake up the commit thread int commit_thread_entry(); class CommitThread : public Thread { @@ -120,26 +121,22 @@ class Ebofs : public ObjectStore { BufferCache bc; pthread_t flushd_thread_id; + version_t trigger_commit(); void commit_bc_wait(version_t epoch); public: + void sync(); void trim_buffer_cache(); - void flush_all(); + protected: //void zero(Onode *on, size_t len, off_t off, off_t write_thru); void alloc_write(Onode *on, block_t start, block_t len, - map& hits); + interval_set& alloc); void apply_write(Onode *on, size_t len, off_t off, bufferlist& bl); bool attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond *will_wait_on); - // io - void bh_read(Onode *on, BufferHead *bh); - void bh_write(Onode *on, BufferHead *bh); - - friend class C_E_FlushPartial; - int flushd_thread(); static int flusd_thread_entry(void *p) { return ((Ebofs*)p)->flushd_thread(); @@ -148,10 +145,12 @@ class Ebofs : public ObjectStore { public: Ebofs(BlockDevice& d) : dev(d), - mounted(false), unmounting(false), readonly(false), super_epoch(0), + mounted(false), unmounting(false), readonly(false), + super_epoch(0), commit_thread_started(false), mid_commit(false), commit_thread(this), free_blocks(0), allocator(this), bufferpool(EBOFS_BLOCK_SIZE), + nodepool(ebofs_lock), object_tab(0), collection_tab(0), oc_tab(0), co_tab(0), inodes_flushing(0), bc(dev, bufferpool, ebofs_lock) { diff --git a/ceph/ebofs/Onode.h b/ceph/ebofs/Onode.h index 3f747bd7d0163..d32fc94017600 100644 --- a/ceph/ebofs/Onode.h +++ b/ceph/ebofs/Onode.h @@ -19,8 +19,6 @@ * */ - - class Onode : public LRUObject { private: int ref; @@ -60,12 +58,12 @@ public: void get() { if (ref == 0) lru_pin(); ref++; - cout << "onode.get " << ref << endl; + //cout << "ebofs.onode.get " << ref << endl; } void put() { ref--; if (ref == 0) lru_unpin(); - cout << "onode.put " << ref << endl; + //cout << "ebofs.onode.put " << ref << endl; } void mark_dirty() { @@ -290,7 +288,6 @@ public: return sizeof(Extent) * extents.size(); } - }; diff --git a/ceph/ebofs/Table.h b/ceph/ebofs/Table.h index d0812841e4165..eb6541d0c18d3 100644 --- a/ceph/ebofs/Table.h +++ b/ceph/ebofs/Table.h @@ -6,13 +6,11 @@ /** table **/ -class _Table { - int asdfasdfasdf; -}; +#define dbtout dout(20) template -class Table : public _Table { +class Table { private: NodePool &pool; diff --git a/ceph/ebofs/mkfs.ebofs.cc b/ceph/ebofs/mkfs.ebofs.cc index 733e202f2ca9d..3bc6b23beecbe 100644 --- a/ceph/ebofs/mkfs.ebofs.cc +++ b/ceph/ebofs/mkfs.ebofs.cc @@ -6,9 +6,15 @@ int main(int argc, char **argv) { // args - char *filename = 0; - if (argc > 1) filename = argv[1]; - if (!filename) return -1; + vector args; + 
argv_to_vec(argc, argv, args); + parse_config_options(args); + + if (args.size() < 1) { + cerr << "usage: mkfs.ebofs [options] " << endl; + return -1; + } + char *filename = args[0]; // device BlockDevice dev(filename); @@ -21,89 +27,94 @@ int main(int argc, char **argv) Ebofs mfs(dev); mfs.mkfs(); - // test-o-rama! - Ebofs fs(dev); - fs.mount(); - - if (0) { // test - bufferlist bl; - char crap[10000]; - memset(crap, 0, 10000); - bl.append(crap, 10000); - fs.write(10, bl.length(), 200, bl, (Context*)0); - fs.trim_buffer_cache(); - fs.write(10, bl.length(), 3222, bl, (Context*)0); - fs.trim_buffer_cache(); - fs.write(10, 5000, 3222, bl, (Context*)0); - } - - // test small writes if (1) { - char crap[10000]; - memset(crap, 0, 10000); - bufferlist bl; - bl.append(crap, 10000); - - // write - srand(0); - for (int i=0; i<100; i++) { - off_t off = rand() % 1000000; - size_t len = 100; - cout << "writing bit at " << off << " len " << len << endl; - fs.write(10, len, off, bl, (Context*)0); - } + // test-o-rama! + Ebofs fs(dev); + fs.mount(); - if (0) { - // read - srand(0); - for (int i=0; i<100; i++) { + if (1) { // test bufferlist bl; - off_t off = rand() % 1000000; - size_t len = 100; - cout << "read bit at " << off << " len " << len << endl; - int r = fs.read(10, len, off, bl); - assert(bl.length() == len); - assert(r == 0); - } + char crap[10000]; + memset(crap, 0, 10000); + bl.append(crap, 10000); + fs.write(10, bl.length(), 200, bl, (Context*)0); + sleep(1); + fs.trim_buffer_cache(); + fs.write(10, bl.length(), 5222, bl, (Context*)0); + fs.trim_buffer_cache(); + //fs.write(10, 5000, 3222, bl, (Context*)0); } - - // flush - fs.flush_all(); - fs.trim_buffer_cache(); - + + // test small writes if (0) { - // read again - srand(0); - for (int i=0; i<100; i++) { + char crap[10000]; + memset(crap, 0, 10000); bufferlist bl; - off_t off = rand() % 1000000; - size_t len = 100; - cout << "read bit at " << off << " len " << len << endl; - int r = fs.read(10, len, off, bl); - assert(bl.length() == len); - assert(r == 0); + bl.append(crap, 10000); + + // write + srand(0); + for (int i=0; i<100; i++) { + off_t off = rand() % 1000000; + size_t len = 100; + cout << "writing bit at " << off << " len " << len << endl; + fs.write(10, len, off, bl, (Context*)0); + } + + if (0) { + // read + srand(0); + for (int i=0; i<100; i++) { + bufferlist bl; + off_t off = rand() % 1000000; + size_t len = 100; + cout << "read bit at " << off << " len " << len << endl; + int r = fs.read(10, len, off, bl); + assert(bl.length() == len); + assert(r == 0); + } + } + + // flush + fs.sync(); + fs.trim_buffer_cache(); + //fs.trim_buffer_cache(); + + if (0) { + // read again + srand(0); + for (int i=0; i<100; i++) { + bufferlist bl; + off_t off = rand() % 1000000; + size_t len = 100; + cout << "read bit at " << off << " len " << len << endl; + int r = fs.read(10, len, off, bl); + assert(bl.length() == len); + assert(r == 0); + } + + // flush + fs.sync(); + fs.trim_buffer_cache(); + } + + // write on empty cache + srand(0); + for (int i=0; i<100; i++) { + off_t off = rand() % 1000000; + size_t len = 100; + cout << "writing bit at " << off << " len " << len << endl; + fs.write(10, len, off, bl, (Context*)0); + } + } - - // flush + + fs.sync(); fs.trim_buffer_cache(); - } - - // write on empty cache - srand(0); - for (int i=0; i<100; i++) { - off_t off = rand() % 1000000; - size_t len = 100; - cout << "writing bit at " << off << " len " << len << endl; - fs.write(10, len, off, bl, (Context*)0); - } - + fs.trim_onode_cache(); + + 
fs.umount(); } - - fs.flush_all(); - fs.trim_buffer_cache(); - fs.trim_onode_cache(); - - fs.umount(); dev.close(); } diff --git a/ceph/ebofs/nodes.h b/ceph/ebofs/nodes.h index d9c54e5fb415c..6064f9c7e33a2 100644 --- a/ceph/ebofs/nodes.h +++ b/ceph/ebofs/nodes.h @@ -126,7 +126,7 @@ class NodePool { set clean; // aka used set limbo; - Mutex lock; + Mutex &ebofs_lock; Cond commit_cond; int flushing; @@ -142,7 +142,9 @@ class NodePool { public: - NodePool() : bufferpool(EBOFS_NODE_BYTES), + NodePool(Mutex &el) : + bufferpool(EBOFS_NODE_BYTES), + ebofs_lock(el), flushing(0) {} ~NodePool() { // nodes @@ -227,13 +229,13 @@ class NodePool { to read. so it only really works when called from mount()! */ for (unsigned r=0; rset_state(Node::STATE_CLEAN); } else { assert(limbo.count(nid)); } - + flushing--; if (flushing == 0) commit_cond.Signal(); - lock.Unlock(); + ebofs_lock.Unlock(); } public: void commit_start(BlockDevice& dev, version_t version) { - lock.Lock(); assert(!is_committing()); // write map @@ -392,16 +398,12 @@ class NodePool { free.insert(*i); } limbo.clear(); - - lock.Unlock(); } void commit_wait() { - lock.Lock(); while (is_committing()) { - commit_cond.Wait(lock); + commit_cond.Wait(ebofs_lock); } - lock.Unlock(); } @@ -443,7 +445,7 @@ class NodePool { // new node Node* new_node(int type) { nodeid_t nid = alloc_id(); - dbtout << "pool.new_node " << nid << endl; + dout(15) << "ebofs.nodepool.new_node " << nid << endl; // alloc node bufferptr bp = bufferpool.alloc(EBOFS_NODE_BYTES); @@ -458,7 +460,7 @@ class NodePool { void release(Node *n) { const nodeid_t nid = n->get_id(); - dbtout << "pool.release on " << nid << endl; + dout(15) << "ebofs.nodepool.release on " << nid << endl; node_map.erase(nid); if (n->is_dirty()) { @@ -474,15 +476,11 @@ class NodePool { } void release_all() { - set left; - for (map::iterator i = node_map.begin(); - i != node_map.end(); - i++) - left.insert(i->second); - for (set::iterator i = left.begin(); - i != left.end(); - i++) - release( *i ); + while (!node_map.empty()) { + map::iterator i = node_map.begin(); + dout(2) << "ebofs.nodepool.release_all leftover " << i->first << " " << i->second << endl; + release( i->second ); + } assert(node_map.empty()); } @@ -490,7 +488,7 @@ class NodePool { // get new node id? nodeid_t oldid = n->get_id(); nodeid_t newid = alloc_id(); - dbtout << "pool.dirty_node on " << oldid << " now " << newid << endl; + dout(2) << "ebofs.nodepool.dirty_node on " << oldid << " now " << newid << endl; // release old block if (n->is_clean()) { diff --git a/ceph/ebofs/types.h b/ceph/ebofs/types.h index f5c34a873bc2f..b0637bfcd134f 100644 --- a/ceph/ebofs/types.h +++ b/ceph/ebofs/types.h @@ -11,8 +11,6 @@ using namespace std; using namespace __gnu_cxx; -#define dbtout cout - #define MIN(a,b) ((a)<=(b) ? (a):(b)) #define MAX(a,b) ((a)>=(b) ? 
(a):(b)) diff --git a/ceph/include/interval_set.h b/ceph/include/interval_set.h index 21fe454fad1f7..0214630cfda85 100644 --- a/ceph/include/interval_set.h +++ b/ceph/include/interval_set.h @@ -44,7 +44,7 @@ class interval_set { bool contains(T i) { typename map::iterator p = find_inc(i); - if (p == end()) return false; + if (p == m.end()) return false; if (p->first > i) return false; if (p->first+p->second <= i) return false; assert(p->first <= i && p->first+p->second > i); @@ -52,13 +52,49 @@ class interval_set { } bool contains(T start, T len) { typename map::iterator p = find_inc(start); - if (p == end()) return false; + if (p == m.end()) return false; if (p->first > start) return false; if (p->first+p->second <= start) return false; assert(p->first <= start && p->first+p->second > start); if (p->first+p->second < start+len) return false; return true; } + + // outer range of set + bool empty() { + return m.empty(); + } + T start() { + assert(!empty()); + typename map::iterator p = m.begin(); + return p->first; + } + T end() { + assert(!empty()); + typename map::iterator p = m.end(); + p--; + return p->first+p->second; + } + + // interval start after p (where p not in set) + bool starts_after(T i) { + assert(!contains(i)); + typename map::iterator p = find_inc(i); + if (p == m.end()) return false; + return true; + } + T start_after(T i) { + assert(!contains(i)); + typename map::iterator p = find_inc(i); + return p->first; + } + + // interval end that contains start + T end_after(T start) { + assert(contains(start)); + typename map::iterator p = find_inc(start); + return p->first+p->second; + } void insert(T start, T len) { typename map::iterator p = find_adj(start); -- 2.39.5
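Editor's note: the largest behavioral change in this patch is that BlockDevice's io_thread now dequeues runs of queued biovecs that are contiguous on disk and share the same direction, and hands the whole run to do_io(), which concatenates their bufferlists and issues a single readv()/writev(). The short standalone sketch below models only that coalescing policy, under simplified assumptions: biovec here is a cut-down stand-in struct, coalesce_forward is a hypothetical helper invented for illustration, and the reverse elevator sweep and the actual bufferlist/iovec handling are omitted. It is not the ebofs code itself, just a way to see the batching the patched io_thread_entry() performs.

// Standalone sketch of the contiguous-I/O coalescing policy (simplified
// stand-in types; not the actual ebofs API).
#include <iostream>
#include <list>
#include <map>
#include <utility>

typedef unsigned block_t;

struct biovec {
  char    type;            // 'r' = read, 'w' = write
  block_t start, length;   // extent on disk, in blocks
  biovec(char t, block_t s, block_t l) : type(t), start(s), length(l) {}
};

// Pull one mergeable run out of the queue, starting at or after 'pos':
// ops merge while each begins exactly where the previous one ended and
// has the same direction.  'pos' is advanced past the run (elevator style).
std::list<biovec*> coalesce_forward(std::multimap<block_t, biovec*>& queue,
                                    block_t& pos)
{
  std::list<biovec*> batch;
  std::multimap<block_t, biovec*>::iterator i = queue.lower_bound(pos);
  if (i == queue.end()) return batch;

  char type = i->second->type;
  pos = i->first;
  while (i != queue.end() && i->first == pos && i->second->type == type) {
    batch.push_back(i->second);
    pos += i->second->length;           // next op must start right here
    queue.erase(i++);                   // erase old entry, keep walking
  }
  return batch;
}

int main()
{
  std::multimap<block_t, biovec*> q;
  biovec a('w', 10, 4), b('w', 14, 2), c('r', 16, 1), d('w', 30, 8);
  q.insert(std::make_pair(a.start, &a));
  q.insert(std::make_pair(b.start, &b));
  q.insert(std::make_pair(c.start, &c));
  q.insert(std::make_pair(d.start, &d));

  block_t pos = 0;
  while (!q.empty()) {
    std::list<biovec*> batch = coalesce_forward(q, pos);
    if (batch.empty()) { pos = 0; continue; }   // nothing past pos: wrap around
    std::cout << "submit one '" << batch.front()->type << "' batch of "
              << batch.size() << " op(s) starting at block "
              << batch.front()->start << std::endl;
  }
  // expected: a 2-op write batch at 10, a 1-op read at 16, a 1-op write at 30
  return 0;
}

Running the sketch submits the two adjacent writes at blocks 10 and 14 as one batch, while the read at 16 and the non-adjacent write at 30 each go out alone, which mirrors why the patched do_io() can claim the merged bufferlists into a single iovec array before calling _read()/_write().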