${MPICC} -DUSE_OBFS ${MPICFLAGS} ${MPILIBS} $^ -o $@ ../uofs/uofs.a
# ebofs
+ebofs: mkfs.ebofs test.ebofs
+
mkfs.ebofs: ebofs/mkfs.ebofs.cc config.cc common/Clock.o ebofs/ebo.o
${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
+test.ebofs: ebofs/test.ebofs.cc config.cc common/Clock.o ebofs/ebo.o
+ ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
+
+
testmpi: test/testmpi.cc msg/MPIMessenger.cc config.o common/Timer.o common/clock.o msg/Messenger.o msg/Dispatcher.o msg/error.o
fake_osdmap_expand: 0,
fake_osd_sync: true,
- debug: 100,
+ debug: 10,
debug_mds_balancer: 1,
debug_mds_log: 1,
debug_buffer: 0,
#undef dout
-#define dout(x) if (x <= g_conf.debug) cout << "allocator."
+#define dout(x) if (x <= g_conf.debug) cout << "ebofs.allocator."
void Allocator::dump_freelist()
{
+ if (0)
for (int b=0; b<EBOFS_NUM_FREE_BUCKETS; b++) {
dout(20) << "dump bucket " << b << endl;
if (fs->free_tab[b]->get_num_keys() > 0) {
int Allocator::allocate(Extent& ex, block_t num, block_t near)
{
+ /*
if (!near) {
near = num/2; // this is totally wrong and stupid.
}
+ */
int bucket;
}
}
- dout(1) << "allocator.alloc " << ex << " near " << near << endl;
+ dout(10) << "allocate " << ex << " near " << near << endl;
dump_freelist();
return num;
}
fs->free_tab[bucket]->remove(ex.start);
fs->free_blocks -= ex.length;
- dout(1) << "allocator.alloc partial " << ex << " near " << near << endl;
+ dout(10) << "allocate partial " << ex << " near " << near << endl;
dump_freelist();
return ex.length;
}
int Allocator::release(Extent& ex)
{
- dout(1) << "release " << ex << " (into limbo)" << endl;
+ dout(10) << "release " << ex << " (into limbo)" << endl;
limbo.insert(ex.start, ex.length);
return 0;
}
{
Extent newex = ex;
- dout(1) << "release " << ex << endl;
+ dout(10) << "release " << ex << endl;
// one after us?
for (int b=0; b<EBOFS_NUM_FREE_BUCKETS; b++) {
#include "config.h"
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <sys/file.h>
+#include <iostream>
+#include <cassert>
+#include <errno.h>
+
+#include <sys/uio.h>
+
+#include <sys/ioctl.h>
+#include <linux/fs.h>
+
#undef dout
-#define dout(x) if (x <= g_conf.debug) cout << "blockdevice."
+#define dout(x) if (x <= g_conf.debug) cout << "dev."
+
+
+// Return the device size in blocks.
+// The size is computed with fstat() the first time it is needed (while
+// num_blocks is still zero) and cached in num_blocks after that.
+block_t BlockDevice::get_num_blocks()
+{
+  // cached from an earlier call?
+  if (num_blocks)
+    return num_blocks;
+
+  struct stat info;
+  assert(fd > 0);   // need a positive descriptor to stat
+  int rc = ::fstat(fd, &info);
+  assert(rc == 0);
+
+  // integer division: a trailing partial block is not counted
+  num_blocks = info.st_size / (block_t)EBOFS_BLOCK_SIZE;
+  return num_blocks;
+}
int BlockDevice::io_thread_entry()
{
- dout(1) << "io_thread start" << endl;
+ dout(10) << "io_thread start" << endl;
// elevator nonsense!
bool dir_forward = true;
multimap<block_t,biovec*>::iterator i = io_queue.lower_bound(pos);
if (i == io_queue.end()) break;
+ // merge contiguous ops
+ list<biovec*> biols;
+ char type = i->second->type;
pos = i->first;
- biovec *bio = i->second;
+ while (pos == i->first &&
+ type == i->second->type) {
+ dout(20) << "io_thread dequeue io at " << pos << " " << (void*)i->second << endl;
+ biovec *bio = i->second;
+ biols.push_back(bio);
+ pos += bio->length;
+
+ multimap<block_t,biovec*>::iterator prev = i;
+ i++;
+ io_queue_map.erase(bio);
+ io_queue.erase(prev);
+
+ if (i == io_queue.end()) break;
+ }
- dout(20) << "io_thread dequeue io at " << pos << endl;
- io_queue_map.erase(i->second);
- io_queue.erase(i);
-
lock.Unlock();
- do_io(bio);
+ do_io(biols);
lock.Lock();
}
} else {
// reverse sweep
dout(20) << "io_thread reverse sweep" << endl;
pos = get_num_blocks();
+
while (1) {
// find i > pos
multimap<block_t,biovec*>::iterator i = io_queue.upper_bound(pos);
if (i == io_queue.begin()) break;
i--; // and back down one (to get i <= pos)
-
- pos = i->first;
- biovec *bio = i->second;
-
- dout(20) << "io_thread dequeue io at " << pos << endl;
- io_queue_map.erase(i->second);
- io_queue.erase(i);
+ // merge contiguous ops
+ list<biovec*> biols;
+ char type = i->second->type;
+ pos = i->first;
+ while (pos == i->first && type == i->second->type) {
+ dout(20) << "io_thread dequeue io at " << pos << " " << (void*)i->second << endl;
+ biovec *bio = i->second;
+ biols.push_back(bio);
+ pos += bio->length;
+
+ multimap<block_t,biovec*>::iterator prev = i;
+ bool begin = (i == io_queue.begin());
+ if (!begin) i--;
+ io_queue_map.erase(bio);
+ io_queue.erase(prev);
+
+ if (begin) break;
+ }
+
lock.Unlock();
- do_io(bio);
+ do_io(biols);
lock.Lock();
}
}
}
lock.Unlock();
- dout(1) << "io_thread finish" << endl;
+ dout(10) << "io_thread finish" << endl;
return 0;
}
-void BlockDevice::do_io(biovec *bio)
+void BlockDevice::do_io(list<biovec*>& biols)
{
int r;
+ assert(!biols.empty());
+
+ // get full range, type, bl
+ bufferlist bl;
+ bl.claim(biols.front()->bl);
+ block_t start = biols.front()->start;
+ block_t length = biols.front()->length;
+ char type = biols.front()->type;
+
+ list<biovec*>::iterator p = biols.begin();
+ for (p++; p != biols.end(); p++) {
+ length += (*p)->length;
+ bl.claim_append((*p)->bl);
+ }
- if (bio->type == biovec::IO_WRITE) {
- r = _write(bio->start, bio->length, bio->bl);
- } else if (bio->type == biovec::IO_READ) {
- r = _read(bio->start, bio->length, bio->bl);
+ // do it
+ dout(20) << "do_io start " << (type==biovec::IO_WRITE?"write":"read")
+ << " " << start << "~" << length << endl;
+ if (type == biovec::IO_WRITE) {
+ r = _write(start, length, bl);
+ } else if (type == biovec::IO_READ) {
+ r = _read(start, length, bl);
} else assert(0);
-
- dout(20) << "do_io finish " << (void*)bio << " " << bio->start << "~" << bio->length << " " << (void*)bio->cond << " " << (void*)bio->context << endl;
-
- if (bio->cond) {
- bio->cond->Signal();
- bio->rval = r;
- }
- else if (bio->context) {
- bio->context->finish((int)bio);
- delete bio->context;
- delete bio;
+ dout(20) << "do_io finish " << (type==biovec::IO_WRITE?"write":"read")
+ << " " << start << "~" << length << endl;
+
+ // finish
+ for (p = biols.begin(); p != biols.end(); p++) {
+ biovec *bio = *p;
+ if (bio->cond) {
+ //lock.Lock();
+ bio->rval = r;
+ bio->cond->Signal();
+ //lock.Unlock();
+ }
+ else if (bio->context) {
+ bio->context->finish((int)bio);
+ delete bio->context;
+ delete bio;
+ }
}
}
void BlockDevice::_submit_io(biovec *b)
{
// NOTE: lock must be held
- dout(1) << "_submit_io " << (void*)b << endl;
+ dout(15) << "_submit_io " << (void*)b << endl;
// wake up thread?
if (io_queue.empty()) io_wakeup.Signal();
-
- // queue
+
+ // queue anew
io_queue.insert(pair<block_t,biovec*>(b->start, b));
io_queue_map[b] = b->start;
}
{
// NOTE: lock must be held
if (io_queue_map.count(bio) == 0) {
- dout(1) << "_cancel_io " << (void*)bio << " FAILED" << endl;
+ dout(15) << "_cancel_io " << (void*)bio << " FAILED" << endl;
return -1;
}
- dout(1) << "_cancel_io " << (void*)bio << endl;
+ dout(15) << "_cancel_io " << (void*)bio << endl;
block_t b = io_queue_map[bio];
multimap<block_t,biovec*>::iterator p = io_queue.lower_bound(b);
off_t actual = lseek(fd, offset, SEEK_SET);
assert(actual == offset);
- off_t len = num*EBOFS_BLOCK_SIZE;
- assert((int)bl.length() >= len);
-
+ size_t len = num*EBOFS_BLOCK_SIZE;
+ assert(bl.length() >= len);
+
+ struct iovec iov[ bl.buffers().size() ];
+ int n = 0;
+ size_t left = len;
for (list<bufferptr>::iterator i = bl.buffers().begin();
i != bl.buffers().end();
i++) {
assert(i->length() % EBOFS_BLOCK_SIZE == 0);
- int blen = i->length();
- if (blen > len) blen = len;
-
- int got = ::read(fd, i->c_str(), blen);
- assert(got <= blen);
-
- len -= blen;
- if (len == 0) break;
+ iov[n].iov_base = i->c_str();
+ iov[n].iov_len = MIN(left, i->length());
+
+ left -= iov[n].iov_len;
+ n++;
+ if (left == 0) break;
}
+
+ int got = ::readv(fd, iov, n);
+ assert(got <= (int)len);
return 0;
}
assert(actual == offset);
// write buffers
- off_t len = num*EBOFS_BLOCK_SIZE;
-
+ size_t len = num*EBOFS_BLOCK_SIZE;
+
+ struct iovec iov[ bl.buffers().size() ];
+
+ int n = 0;
+ size_t left = len;
for (list<bufferptr>::iterator i = bl.buffers().begin();
i != bl.buffers().end();
i++) {
assert(i->length() % EBOFS_BLOCK_SIZE == 0);
+
+ iov[n].iov_base = i->c_str();
+ iov[n].iov_len = MIN(left, i->length());
- off_t left = i->length();
- if (left > len) left = len;
- int r = ::write(fd, i->c_str(), left);
- dout(1) << "write " << fd << " " << (void*)i->c_str() << " " << left << endl;
- if (r < 0) {
- dout(1) << "couldn't write bno " << bno << " num " << num << " (" << left << " bytes) p=" << (void*)i->c_str() << " r=" << r << " errno " << errno << " " << strerror(errno) << endl;
- } else {
- assert(r == left);
- }
-
- len -= left;
- if (len == 0) break;
+ left -= iov[n].iov_len;
+ n++;
+ if (left == 0) break;
+ }
+
+ int r = ::writev(fd, iov, n);
+
+ if (r < 0) {
+ dout(1) << "couldn't write bno " << bno << " num " << num
+ << " (" << num << " bytes) r=" << r
+ << " errno " << errno << " " << strerror(errno) << endl;
+ } else {
+ assert(r == (int)len);
}
return 0;
assert(fd>0);
// shut down io thread
- dout(1) << "close stopping io thread" << endl;
+ dout(10) << "close stopping io thread" << endl;
lock.Lock();
io_stop = true;
io_wakeup.Signal();
lock.Unlock();
io_thread.join();
- dout(1) << "close closing" << endl;
- return ::close(fd);
+ dout(1) << "close" << endl;
+ ::close(fd);
+ fd = 0;
+
+ return 0;
}
int BlockDevice::cancel_io(ioh_t ioh)
// FIXME?
if (r == 0 && pbio->context) {
- //pbio->context->finish(0); ******HELP!!!
+ //pbio->context->finish(0);
delete pbio->context;
delete pbio;
}
#include "types.h"
-#include <unistd.h>
-#include <stdlib.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-#include <sys/file.h>
-#include <iostream>
-#include <cassert>
-#include <errno.h>
-
-#include <sys/ioctl.h>
-#include <linux/fs.h>
typedef void *ioh_t;
void _submit_io(biovec *b);
int _cancel_io(biovec *bio);
- void do_io(biovec *b);
+ void do_io(list<biovec*>& biols);
// io_thread
}
// get size in blocks
- block_t get_num_blocks() {
- if (!num_blocks) {
- // stat
- struct stat st;
- assert(fd > 0);
- int r = ::fstat(fd, &st);
- assert(r == 0);
- num_blocks = st.st_size / (block_t)EBOFS_BLOCK_SIZE;
- }
- return num_blocks;
- }
+ block_t get_num_blocks();
int open();
int close();
#include "BufferCache.h"
+#include "Onode.h"
-#undef dout
-#define dout(x) if (x <= g_conf.debug) cout << "bc."
+/*********** BufferHead **************/
-/*********** BufferHead **************/
+#undef dout
+#define dout(x) if (x <= g_conf.debug) cout << "ebofs.bh."
+// Submit every queued partial write for this buffer head.
+// For each queued entry: allocate a one-block buffer, fill it from this
+// bh's data, overlay the entry's partial content, and write that single
+// block to the device with a C_OC_PartialTxFinish completion carrying the
+// entry's epoch.  The queue is cleared before returning.
+void BufferHead::finish_partials()
+{
+  dout(10) << "finish_partials on " << *this << endl;
+
+  map<block_t, PartialWrite>::iterator cur = partial_write.begin();
+  while (cur != partial_write.end()) {
+    PartialWrite& pw = cur->second;
+    dout(10) << "finish_partials submitting queued write to " << pw.block << endl;
+
+    // work on a private copy of the raw block; this may be a past write
+    bufferlist blockbl;
+    blockbl.push_back( oc->bc->bufferpool.alloc(EBOFS_BLOCK_SIZE) );
+    blockbl.copy_in(0, EBOFS_BLOCK_SIZE, data);
+    apply_partial( blockbl, pw.partial );
+
+    oc->bc->dev.write( pw.block, 1, blockbl,
+                       new C_OC_PartialTxFinish( oc, pw.epoch ));
+    cur++;
+  }
+
+  partial_write.clear();
+}
+// Queue this bh's current partial content as a pending write to disk
+// block b.  A block that is queued for the first time bumps the unflushed
+// count for epoch_modified; re-queueing an already queued block replaces
+// the old entry and leaves the count alone.
+void BufferHead::queue_partial_write(block_t b)
+{
+  map<block_t, PartialWrite>::iterator old = partial_write.find(b);
+  if (old == partial_write.end()) {
+    // nothing queued for this block yet
+    oc->bc->inc_unflushed( epoch_modified );
+  } else {
+    // overwrite previous partial write;
+    // it had better be the same epoch if it's the same block!
+    assert( old->second.epoch == epoch_modified );
+    partial_write.erase(old);
+  }
+
+  PartialWrite& pw = partial_write[ b ];
+  pw.partial = partial;
+  pw.block = b;
+  pw.epoch = epoch_modified;
+}
/************ ObjectCache **************/
+#undef dout
+#define dout(x) if (x <= g_conf.debug) cout << "ebofs.oc."
+
+
void ObjectCache::rx_finish(ioh_t ioh, block_t start, block_t length)
{
list<Context*> waiters;
- bc->lock.Lock();
+ bc->ebofs_lock.Lock();
dout(10) << "rx_finish " << start << "~" << length << endl;
for (map<block_t, BufferHead*>::iterator p = data.lower_bound(start);
p != data.end();
p++) {
+ dout(10) << "rx_finish ?" << *p->second << endl;
+
// past?
if (p->first >= start+length) break;
+ if (p->second->end() > start+length) break; // past
+
+ assert(p->first >= start);
+ assert(p->second->end() <= start+length);
+
+ dout(10) << "rx_finish !" << *p->second << endl;
+
+ if (p->second->rx_ioh == ioh)
+ p->second->rx_ioh = 0;
+
+ if (p->second->is_partial_writes())
+ p->second->finish_partials();
if (p->second->is_rx()) {
- if (p->second->get_version() == 0) {
- assert(p->second->end() <= start+length);
- dout(10) << "rx_finish rx -> clean on " << *p->second << endl;
- bc->mark_clean(p->second);
- }
+ assert(p->second->get_version() == 0);
+ assert(p->second->end() <= start+length);
+ dout(10) << "rx_finish rx -> clean on " << *p->second << endl;
+ bc->mark_clean(p->second);
}
else if (p->second->is_partial()) {
- dout(10) << "rx_finish partial -> dirty on " << *p->second << endl;
+ dout(10) << "rx_finish partial -> clean on " << *p->second << endl;
p->second->apply_partial();
- bc->mark_dirty(p->second);
+ bc->mark_clean(p->second);
}
else {
- dout(10) << "rx_finish ignoring " << *p->second << endl;
+ dout(10) << "rx_finish ignoring status on (dirty|tx) " << *p->second << endl;
+ assert(p->second->is_dirty() || p->second->is_tx());
}
-
- if (p->second->ioh == ioh) p->second->ioh = 0;
// trigger waiters
waiters.splice(waiters.begin(), p->second->waitfor_read);
finish_contexts(waiters);
- bc->lock.Unlock();
+ bc->ebofs_lock.Unlock();
}
-void ObjectCache::tx_finish(ioh_t ioh, block_t start, block_t length, version_t version)
+void ObjectCache::tx_finish(ioh_t ioh, block_t start, block_t length,
+ version_t version, version_t epoch)
{
list<Context*> waiters;
-
- bc->lock.Lock();
-
+
+ bc->ebofs_lock.Lock();
+
dout(10) << "tx_finish " << start << "~" << length << " v" << version << endl;
for (map<block_t, BufferHead*>::iterator p = data.lower_bound(start);
p != data.end();
p++) {
dout(20) << "tx_finish ?bh " << *p->second << endl;
assert(p->first == p->second->start());
- //dout(10) << "tx_finish bh " << *p->second << endl;
// past?
if (p->first >= start+length) break;
+ if (p->second->tx_ioh == ioh)
+ p->second->tx_ioh = 0;
+
if (!p->second->is_tx()) {
dout(10) << "tx_finish bh not marked tx, skipping" << endl;
continue;
assert(p->second->is_tx());
assert(p->second->end() <= start+length);
- dout(10) << "tx_finish tx -> clean on " << *p->second << endl;
- p->second->set_last_flushed(version);
- bc->mark_clean(p->second);
+ if (version == p->second->version) {
+ dout(10) << "tx_finish tx -> clean on " << *p->second << endl;
+ p->second->set_last_flushed(version);
+ bc->mark_clean(p->second);
- if (p->second->ioh == ioh) {
- p->second->ioh = 0;
- }
- else if (p->second->shadow_ioh == ioh) {
- p->second->shadow_ioh = 0;
+ // trigger waiters
+ waiters.splice(waiters.begin(), p->second->waitfor_flush);
+ } else {
+ dout(10) << "tx_finish leaving tx, " << p->second->version << " > " << version
+ << " on " << *p->second << endl;
}
-
- // trigger waiters
- waiters.splice(waiters.begin(), p->second->waitfor_flush);
}
finish_contexts(waiters);
- bc->lock.Unlock();
+ // update unflushed counter
+ assert(bc->get_unflushed(epoch) > 0);
+ bc->dec_unflushed(epoch);
+
+ bc->ebofs_lock.Unlock();
+}
+
+// Completion hook for a partial-block write in the given epoch.
+// Takes ebofs_lock, drops the epoch's unflushed count by one, and releases
+// the lock again.
+void ObjectCache::partial_tx_finish(version_t epoch)
+{
+  BufferCache *cache = bc;
+  cache->ebofs_lock.Lock();
+
+  dout(10) << "partial_tx_finish in epoch " << epoch << endl;
+
+  // a write must still be accounted for in this epoch
+  assert(cache->get_unflushed(epoch) > 0);
+  cache->dec_unflushed(epoch);
+
+  cache->ebofs_lock.Unlock();
}
-int ObjectCache::map_read(block_t start, block_t len,
+/*
+ * map a range of blocks into buffer_heads.
+ * - create missing buffer_heads as necessary.
+ * - fragment along disk extent boundaries
+ */
+
+int ObjectCache::map_read(Onode *on,
+ block_t start, block_t len,
map<block_t, BufferHead*>& hits,
map<block_t, BufferHead*>& missing,
map<block_t, BufferHead*>& rx,
map<block_t, BufferHead*>& partial) {
map<block_t, BufferHead*>::iterator p = data.lower_bound(start);
- // p->first >= start
block_t cur = start;
block_t left = len;
// at end?
if (p == data.end()) {
// rest is a miss.
- BufferHead *n = new BufferHead(this);
- bc->add_bh(n);
- n->set_start( cur );
- n->set_length( left );
- data[cur] = n;
- missing[cur] = n;
+ vector<Extent> exv;
+ on->map_extents(cur, left, exv); // we might consider some prefetch here.
+ for (unsigned i=0; i<exv.size(); i++) {
+ BufferHead *n = new BufferHead(this);
+ bc->add_bh(n);
+ n->set_start( cur );
+ n->set_length( exv[i].length );
+ data[cur] = n;
+ missing[cur] = n;
+ cur += exv[i].length;
+ }
break;
}
} else if (p->first > cur) {
// gap.. miss
block_t next = p->first;
- BufferHead *n = new BufferHead(this);
- bc->add_bh(n);
- n->set_start( cur );
- if (next - cur < left)
- n->set_length( next - cur );
- else
- n->set_length( left );
- data[cur] = n;
- missing[cur] = n;
-
- cur += n->length();
- left -= n->length();
+ vector<Extent> exv;
+ on->map_extents(cur, MIN(next-cur, left), exv); // we might consider some prefetch here
+
+ for (unsigned i=0; i<exv.size(); i++) {
+ BufferHead *n = new BufferHead(this);
+ bc->add_bh(n);
+ n->set_start( cur );
+ n->set_length( exv[i].length );
+ data[cur] = n;
+ missing[cur] = n;
+ cur += n->length();
+ left -= n->length();
+ }
continue; // more?
}
else
* map a range of pages on an object's buffer cache.
*
* - break up bufferheads that don't fall completely within the range
+ * - cancel rx ops we obsolete.
+ * - resubmit rx ops if we split bufferheads
+ *
+ * - leave potentially obsoleted tx ops alone (for now)
+ * - don't worry about disk extent boundaries (yet)
*/
-int ObjectCache::map_write(block_t start, block_t len,
- map<block_t, BufferHead*>& hits)
+int ObjectCache::map_write(Onode *on,
+ block_t start, block_t len,
+ interval_set<block_t>& alloc,
+ map<block_t, BufferHead*>& hits)
{
map<block_t, BufferHead*>::iterator p = data.lower_bound(start);
// p->first >= start
}
for (; left > 0; p++) {
+ // max for this bh (bc of (re)alloc on disk)
+ block_t max = left;
+ bool newalloc = false;
+
+ // based on alloc/no-alloc boundary ...
+ if (alloc.contains(cur, left)) {
+ if (alloc.contains(cur)) {
+ block_t ends = alloc.end_after(cur);
+ max = MIN(left, ends-cur);
+ newalloc = true;
+ } else {
+ if (alloc.starts_after(cur)) {
+ block_t st = alloc.start_after(cur);
+ max = MIN(left, st-cur);
+ }
+ }
+ }
+
+ // based on disk extent boundary ...
+ vector<Extent> exv;
+ on->map_extents(cur, max, exv);
+ if (exv.size() > 1)
+ max = exv[0].length;
+
+ if (newalloc) {
+ dout(10) << "map_write " << cur << "~" << max << " is new alloc on disk" << endl;
+ } else {
+ dout(10) << "map_write " << cur << "~" << max << " keeps old alloc on disk" << endl;
+ }
+
// at end?
if (p == data.end()) {
BufferHead *n = new BufferHead(this);
bc->add_bh(n);
n->set_start( cur );
- n->set_length( left );
+ n->set_length( max );
data[cur] = n;
hits[cur] = n;
break;
}
-
+
if (p->first <= cur) {
- // have it (or part of it)
- BufferHead *e = p->second;
-
- if (p->first == cur && p->second->length() <= left) {
- // whole bufferhead, piece of cake.
- } else {
- if (e->is_clean()) {
- // we'll need to cut the buffer! :(
- if (p->first == cur && p->second->length() > left) {
- // we want left bit (one splice)
- bc->split(e, cur+left);
+ BufferHead *bh = p->second;
+ dout(10) << "map_write bh " << *bh << " intersected" << endl;
+
+ if (p->first < cur) {
+ if (cur+max >= p->first+p->second->length()) {
+ // we want right bit (one splice)
+ if (bh->is_rx() && bc->bh_cancel_read(bh)) {
+ BufferHead *right = bc->split(bh, cur);
+ bc->bh_read(on, bh); // reread left bit
+ bh = right;
+ } else if (bh->is_tx() && !newalloc && bc->bh_cancel_write(bh)) {
+ BufferHead *right = bc->split(bh, cur);
+ bc->bh_write(on, bh); // reread left bit
+ bh = right;
+ } else {
+ bh = bc->split(bh, cur); // just split it
}
- else if (p->first < cur && cur+left >= p->first+p->second->length()) {
- // we want right bit (one splice)
- e = bc->split(e, cur);
+ p++;
+ assert(p->second == bh);
+ } else {
+ // we want middle bit (two splices)
+ if (bh->is_rx() && bc->bh_cancel_read(bh)) {
+ BufferHead *middle = bc->split(bh, cur);
+ bc->bh_read(on, bh); // reread left
p++;
- assert(p->second == e);
+ assert(p->second == middle);
+ BufferHead *right = bc->split(middle, cur+max);
+ bc->bh_read(on, right); // reread right
+ bh = middle;
+ } else if (bh->is_tx() && !newalloc && bc->bh_cancel_write(bh)) {
+ BufferHead *middle = bc->split(bh, cur);
+ bc->bh_write(on, bh); // redo left
+ p++;
+ assert(p->second == middle);
+ BufferHead *right = bc->split(middle, cur+max);
+ bc->bh_write(on, right); // redo right
+ bh = middle;
} else {
- // we want middle bit (two splices)
- e = bc->split(e, cur);
+ BufferHead *middle = bc->split(bh, cur);
p++;
- assert(p->second == e);
- bc->split(e, cur+left);
+ assert(p->second == middle);
+ bc->split(middle, cur+max);
+ bh = middle;
+ }
+ }
+ } else if (p->first == cur) {
+ if (p->second->length() <= max) {
+ // whole bufferhead, piece of cake.
+ } else {
+ // we want left bit (one splice)
+ if (bh->is_rx() && bc->bh_cancel_read(bh)) {
+ BufferHead *right = bc->split(bh, cur+max);
+ bc->bh_read(on, right); // re-rx the right bit
+ } else if (bh->is_tx() && !newalloc && bc->bh_cancel_write(bh)) {
+ BufferHead *right = bc->split(bh, cur+max);
+ bc->bh_write(on, right); // re-tx the right bit
+ } else {
+ bc->split(bh, cur+max); // just split
}
- }
+ }
}
- // FIXME
- hits[cur] = e;
+ // put in our map
+ hits[cur] = bh;
// keep going.
- block_t lenfromcur = e->length();
- if (e->start() < cur)
- lenfromcur -= cur - e->start();
+ block_t lenfromcur = bh->length();
+ if (bh->start() < cur)
+ lenfromcur -= cur - bh->start();
if (lenfromcur < left) {
cur += lenfromcur;
} else {
// gap!
block_t next = p->first;
+ block_t glen = MIN(next-cur, max);
+ dout(10) << "map_write gap " << cur << "~" << glen << endl;
BufferHead *n = new BufferHead(this);
bc->add_bh(n);
n->set_start( cur );
- if (next - cur < left)
- n->set_length( next - cur );
- else
- n->set_length( left );
+ n->set_length( glen );
data[cur] = n;
hits[cur] = n;
- cur += n->length();
- left -= n->length();
+ cur += glen;
+ left -= glen;
continue; // more?
}
}
/************** BufferCache ***************/
+#undef dout
+#define dout(x) if (x <= g_conf.debug) cout << "ebofs.bc."
+
+
BufferHead *BufferCache::split(BufferHead *orig, block_t after)
{
}
+// Start a device read that fills bh.
+// A missing bh is moved to the rx state; any other bh must be partial.
+// The bh has to map to exactly one disk extent of the onode.  New buffers
+// are allocated into bh->data, and the returned io handle is stored in
+// bh->rx_ioh.  Completion is delivered through C_OC_RxFinish.
+void BufferCache::bh_read(Onode *on, BufferHead *bh)
+{
+  dout(5) << "bh_read " << *on << " on " << *bh << endl;
+
+  if (!bh->is_missing()) {
+    assert(bh->is_partial());
+  } else {
+    mark_rx(bh);
+  }
+
+  // look up the disk extent; there should be only one
+  vector<Extent> extents;
+  on->map_extents(bh->start(), bh->length(), extents);
+  assert(extents.size() == 1);
+  Extent disk = extents[0];
+
+  // new buffers to read into
+  bufferpool.alloc(EBOFS_BLOCK_SIZE*bh->length(), bh->data);
+
+  // no read handle may be recorded on this bh yet
+  assert(bh->rx_ioh == 0);
+
+  dout(20) << "bh_read " << *bh << " from " << disk << endl;
+
+  bh->rx_ioh = dev.read(disk.start, disk.length, bh->data,
+                        new C_OC_RxFinish(on->oc,
+                                          bh->start(), bh->length()));
+}
+
+// Try to cancel the pending read recorded in bh->rx_ioh.
+// Returns true when the device reports the io cancelled; the handle is
+// then cleared and the bh marked missing.  Returns false, leaving the bh
+// untouched, when the cancel fails.
+bool BufferCache::bh_cancel_read(BufferHead *bh)
+{
+  assert(bh->rx_ioh);
+
+  if (dev.cancel_io(bh->rx_ioh) < 0)
+    return false;
+
+  dout(10) << "bh_cancel_read on " << *bh << endl;
+  bh->rx_ioh = 0;
+  mark_missing(bh);
+  return true;
+}
+
+// Start a device write of bh's data.
+// The bh must be dirty with a non-zero version; it is moved to the tx
+// state.  It has to map to exactly one disk extent of the onode.  The io
+// handle is stored in bh->tx_ioh, completion is delivered through
+// C_OC_TxFinish, and the unflushed count of bh->epoch_modified is bumped.
+void BufferCache::bh_write(Onode *on, BufferHead *bh)
+{
+  dout(5) << "bh_write " << *on << " on " << *bh << endl;
+
+  assert(bh->get_version() > 0);
+  assert(bh->is_dirty());
+  mark_tx(bh);
+
+  // look up the disk extent; exactly one is expected
+  vector<Extent> extents;
+  on->map_extents(bh->start(), bh->length(), extents);
+  assert(extents.size() == 1);
+  Extent disk = extents[0];
+
+  dout(20) << "bh_write " << *bh << " to " << disk << endl;
+
+  //assert(bh->tx_ioh == 0);
+
+  bh->tx_ioh = dev.write(disk.start, disk.length, bh->data,
+                         new C_OC_TxFinish(on->oc,
+                                           bh->start(), bh->length(),
+                                           bh->get_version(),
+                                           bh->epoch_modified));
+
+  // one more write outstanding in this epoch
+  epoch_unflushed[ bh->epoch_modified ]++;
+}
+
+
+// Try to cancel the pending write recorded in bh->tx_ioh.
+// Returns true when the device reports the io cancelled: the handle is
+// cleared, the bh goes back to dirty, and the unflushed count of
+// bh->epoch_modified is dropped by one.  Returns false, leaving the bh
+// untouched, when the cancel fails.
+bool BufferCache::bh_cancel_write(BufferHead *bh)
+{
+  assert(bh->tx_ioh);
+  if (dev.cancel_io(bh->tx_ioh) >= 0) {
+    dout(10) << "bh_cancel_write on " << *bh << endl;
+    bh->tx_ioh = 0;
+    // The write was cancelled, so the cached data is still unwritten: the
+    // bh is dirty, not missing.  Callers split the bh and re-issue it via
+    // bh_write(), which asserts is_dirty().
+    mark_dirty(bh);
+    epoch_unflushed[ bh->epoch_modified ]--;   // assert.. this should be the same epoch!
+    return true;
+  }
+  return false;
+}
+
+
+// Queue a partial write for a one-block partial bh.
+// The bh must be partial, exactly one block long, and have a non-zero
+// version.  Its single block is resolved to a disk block through the
+// onode's extent map, and the bh's partial content is queued against it.
+void BufferCache::bh_queue_partial_write(Onode *on, BufferHead *bh)
+{
+  dout(5) << "bh_queue_partial_write " << *on << " on " << *bh << endl;
+
+  assert(bh->get_version() > 0);
+  assert(bh->is_partial());
+  assert(bh->length() == 1);
+
+  // resolve the disk block behind this bh
+  vector<Extent> extents;
+  on->map_extents(bh->start(), bh->length(), extents);
+  assert(extents.size() == 1);
+  assert(extents[0].length == 1);
+  block_t diskblock = extents[0].start;
+
+  // copy map state, queue for this block
+  bh->queue_partial_write( diskblock );
+}
#include "AlignedBufferPool.h"
#include "BlockDevice.h"
-#define BH_STATE_DIRTY 1
+#include "include/interval_set.h"
class ObjectCache;
class BufferCache;
+class Onode;
class BufferHead : public LRUObject {
public:
+ /*
+ * - buffer_heads should always break across disk extent boundaries
+ * - partial buffer_heads are always 1 block.
+ */
const static int STATE_MISSING = 0; // missing; data is on disk, but not loaded.
const static int STATE_CLEAN = 1; // Rw clean
const static int STATE_DIRTY = 2; // RW dirty
const static int STATE_TX = 3; // Rw flushing to disk
const static int STATE_RX = 4; // w reading from disk
- const static int STATE_PARTIAL = 5; // reading from disk, + partial content map.
+ const static int STATE_PARTIAL = 5; // reading from disk, + partial content map. always 1 block.
+
+ class PartialWrite {
+ public:
+ map<off_t, bufferlist> partial; // partial dirty content overlayed onto incoming data
+ block_t block;
+ version_t epoch;
+ };
public:
ObjectCache *oc;
- bufferlist data, shadow_data;
- ioh_t ioh, shadow_ioh; // any pending read/write op
- version_t tx_epoch; // epoch this write is in
+ bufferlist data;
+
+ ioh_t rx_ioh; //
+ ioh_t tx_ioh; //
list<Context*> waitfor_read;
list<Context*> waitfor_flush;
private:
- map<off_t, bufferlist> partial; // partial dirty content overlayed onto incoming data
+ map<off_t, bufferlist> partial; // partial dirty content overlayed onto incoming data
+
+ map<block_t, PartialWrite> partial_write; // queued writes w/ partial content
int ref;
int state;
+
+ public:
+ version_t epoch_modified;
+
version_t version; // current version in cache
version_t last_flushed; // last version flushed to disk
public:
BufferHead(ObjectCache *o) :
- oc(o), ioh(0), shadow_ioh(0), tx_epoch(0),
- ref(0), state(STATE_MISSING), version(0), last_flushed(0)
+ oc(o), //cancellable_ioh(0), tx_epoch(0),
+ rx_ioh(0), tx_ioh(0),
+ ref(0), state(STATE_MISSING), epoch_modified(0), version(0), last_flushed(0)
{}
ObjectCache *get_oc() { return oc; }
bool is_rx() { return state == STATE_RX; }
bool is_partial() { return state == STATE_PARTIAL; }
- /*
- void substr(block_t start, block_t len, bufferlist& sub) {
- // determine offset in bufferlist
- block_t start = object_loc.start - off;
- block_t len = num - start;
- if (start+len > object_loc.length)
- len = object_loc.length - start;
-
- sub.substr_of(data, start*EBOFS_BLOCK_SIZE, len*EBOFS_BLOCK_SIZE);
- }
- */
+ bool is_partial_writes() { return !partial_write.empty(); }
+ void finish_partials();
+ void queue_partial_write(block_t b);
+
void copy_partial_substr(off_t start, off_t end, bufferlist& bl) {
map<off_t, bufferlist>::iterator i = partial.begin();
*/
}
void apply_partial() {
+ apply_partial(data, partial);
+ }
+ void apply_partial(bufferlist& bl, map<off_t, bufferlist>& pm) {
const off_t bhstart = start() * EBOFS_BLOCK_SIZE;
//assert(partial_is_complete());
- for (map<off_t, bufferlist>::iterator i = partial.begin();
- i != partial.end();
+ for (map<off_t, bufferlist>::iterator i = pm.begin();
+ i != pm.end();
i++) {
int pos = i->first - bhstart;
- data.copy_in(pos, i->second.length(), i->second);
+ bl.copy_in(pos, i->second.length(), i->second);
}
- partial.clear();
+ pm.clear();
}
void add_partial(off_t off, bufferlist& p) {
// trim overlap
class ObjectCache {
- private:
+ public:
object_t object_id;
BufferCache *bc;
+ private:
map<block_t, BufferHead*> data;
public:
}
bool is_empty() { return data.empty(); }
- int map_read(block_t start, block_t len,
+ int map_read(Onode *on,
+ block_t start, block_t len,
map<block_t, BufferHead*>& hits, // hits
map<block_t, BufferHead*>& missing, // read these from disk
map<block_t, BufferHead*>& rx, // wait for these to finish reading from disk
map<block_t, BufferHead*>& partial); // (maybe) wait for these to read from disk
- int map_write(block_t start, block_t len,
+ int map_write(Onode *on,
+ block_t start, block_t len,
+ interval_set<block_t>& alloc,
map<block_t, BufferHead*>& hits); // can write to these.
BufferHead *split(BufferHead *bh, block_t off);
version_t& low, version_t& high);
void rx_finish(ioh_t ioh, block_t start, block_t length);
- void tx_finish(ioh_t ioh, block_t start, block_t length, version_t v);
-
+ void tx_finish(ioh_t ioh, block_t start, block_t length, version_t v, version_t epoch);
+ void partial_tx_finish(version_t epoch);
};
class C_OC_RxFinish : public Context {
ObjectCache *oc;
block_t start, length;
version_t version;
+ version_t epoch;
public:
- C_OC_TxFinish(ObjectCache *o, block_t s, block_t l, version_t v) :
- oc(o), start(s), length(l), version(v) {}
+ C_OC_TxFinish(ObjectCache *o, block_t s, block_t l, version_t v, version_t e) :
+ oc(o), start(s), length(l), version(v), epoch(e) {}
void finish(int r) {
ioh_t ioh = (ioh_t)r;
- if (ioh)
- oc->tx_finish(ioh, start, length, version);
+ if (ioh) {
+ oc->tx_finish(ioh, start, length, version, epoch);
+ }
+ }
+};
+
+// Completion context for a queued partial-block write.
+// finish() interprets its argument as an io handle; when it is non-zero
+// the object cache is told that a partial write of `epoch` completed.
+class C_OC_PartialTxFinish : public Context {
+  ObjectCache *oc;
+  version_t epoch;
+public:
+  C_OC_PartialTxFinish(ObjectCache *cache, version_t ep) :
+    oc(cache), epoch(ep) {}
+  void finish(int r) {
+    ioh_t handle = (ioh_t)r;
+    if (handle)
+      oc->partial_tx_finish(epoch);
  }
};
class BufferCache {
public:
- Mutex &lock; // hack: ref to global lock
+ Mutex &ebofs_lock; // hack: this is a ref to global ebofs_lock
BlockDevice &dev;
AlignedBufferPool &bufferpool;
off_t stat_partial;
off_t stat_missing;
+ map<version_t, int> epoch_unflushed;
+
public:
- BufferCache(BlockDevice& d, AlignedBufferPool& bp, Mutex& glock) :
- lock(glock), dev(d), bufferpool(bp),
+ BufferCache(BlockDevice& d, AlignedBufferPool& bp, Mutex& el) :
+ ebofs_lock(el), dev(d), bufferpool(bp),
stat_waiter(0),
stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_partial(0), stat_missing(0)
{}
off_t get_stat_clean() { return stat_clean; }
off_t get_stat_partial() { return stat_partial; }
+  // Number of writes still outstanding for `epoch`.
+  // Uses map operator[], so an epoch that is not tracked yet gets a
+  // zero entry created for it.
+  int get_unflushed(version_t epoch) {
+    int& outstanding = epoch_unflushed[epoch];
+    return outstanding;
+  }
+  // Record one more outstanding write for `epoch`.
+  void inc_unflushed(version_t epoch) {
+    int& outstanding = epoch_unflushed[epoch];
+    outstanding++;
+  }
+  // Record that one outstanding write for `epoch` is done.
+  // If the epoch's count reaches zero and somebody is blocked in
+  // waitfor_stat() (stat_waiter != 0), stat_cond is signalled.
+  void dec_unflushed(version_t epoch) {
+    int& outstanding = epoch_unflushed[epoch];
+    outstanding--;
+    if (stat_waiter && outstanding == 0)
+      stat_cond.Signal();
+  }
+
void waitfor_stat() {
stat_waiter++;
- stat_cond.Wait(lock);
+ stat_cond.Wait(ebofs_lock);
stat_waiter--;
}
set_state(bh2, bh1->get_state());
}
+ void mark_missing(BufferHead *bh) { set_state(bh, BufferHead::STATE_MISSING); };
void mark_clean(BufferHead *bh) { set_state(bh, BufferHead::STATE_CLEAN); };
void mark_rx(BufferHead *bh) { set_state(bh, BufferHead::STATE_RX); };
void mark_partial(BufferHead *bh) { set_state(bh, BufferHead::STATE_PARTIAL); };
};
+ // io
+ void bh_read(Onode *on, BufferHead *bh);
+ void bh_write(Onode *on, BufferHead *bh);
+ void bh_queue_partial_write(Onode *on, BufferHead *bh);
+
+ bool bh_cancel_read(BufferHead *bh);
+ bool bh_cancel_write(BufferHead *bh);
+
+ friend class C_E_FlushPartial;
+
+
BufferHead *split(BufferHead *orig, block_t after);
// note: this will fail in mount -> unmount -> mount type situations, bc
// prior state isn't fully cleaned up.
+ dout(1) << "mount" << endl;
+
ebofs_lock.Lock();
assert(!mounted);
struct ebofs_super *sb1 = (struct ebofs_super*)bp1.c_str();
struct ebofs_super *sb2 = (struct ebofs_super*)bp2.c_str();
- dout(2) << "mount super @0 epoch " << sb1->epoch << endl;
- dout(2) << "mount super @1 epoch " << sb2->epoch << endl;
+ dout(3) << "mount super @0 epoch " << sb1->epoch << endl;
+ dout(3) << "mount super @1 epoch " << sb2->epoch << endl;
// pick newest super
struct ebofs_super *sb = 0;
else
sb = sb2;
super_epoch = sb->epoch;
- dout(2) << "mount epoch " << super_epoch << endl;
+ dout(3) << "mount epoch " << super_epoch << endl;
assert(super_epoch == sb->epoch);
// init node pools
- dout(2) << "mount nodepool" << endl;
+ dout(3) << "mount nodepool" << endl;
nodepool.init( &sb->nodepool );
nodepool.read_usemap( dev, super_epoch );
nodepool.read_clean_nodes( dev );
// open tables
- dout(2) << "mount opening tables" << endl;
+ dout(3) << "mount opening tables" << endl;
object_tab = new Table<object_t, Extent>( nodepool, sb->object_tab );
for (int i=0; i<EBOFS_NUM_FREE_BUCKETS; i++)
free_tab[i] = new Table<block_t, block_t>( nodepool, sb->free_tab[i] );
oc_tab = new Table<idpair_t, bool>( nodepool, sb->oc_tab );
co_tab = new Table<idpair_t, bool>( nodepool, sb->co_tab );
- dout(2) << "mount starting commit thread" << endl;
+ dout(3) << "mount starting commit thread" << endl;
commit_thread.create();
- dout(2) << "mount mounted" << endl;
+ dout(1) << "mount mounted" << endl;
mounted = true;
ebofs_lock.Unlock();
write_super(1, superbp1);
// free memory
- dout(1) << "mkfs: cleaning up" << endl;
+ dout(3) << "mkfs: cleaning up" << endl;
close_tables();
dout(1) << "mkfs: done" << endl;
ebofs_lock.Lock();
// mark unmounting
+ dout(1) << "umount start" << endl;
readonly = true;
unmounting = true;
commit_cond.Signal();
// wait
+ dout(2) << "umount stopping commit thread" << endl;
ebofs_lock.Unlock();
commit_thread.join();
+ ebofs_lock.Lock();
// free memory
+ dout(2) << "umount cleaning up" << endl;
close_tables();
+ dout(1) << "umount done" << endl;
ebofs_lock.Unlock();
return 0;
}
{
struct ebofs_super sb;
- dout(1) << "prepare_super v" << epoch << endl;
+ dout(10) << "prepare_super v" << epoch << endl;
// fill in super
memset(&sb, 0, sizeof(sb));
{
block_t bno = epoch & 1;
- dout(1) << "write_super v" << epoch << " to b" << bno << endl;
+ dout(10) << "write_super v" << epoch << " to b" << bno << endl;
dev.write(bno, 1, bp);
}
int Ebofs::commit_thread_entry()
-{
- dout(10) << "commit_thread start" << endl;
-
+{
ebofs_lock.Lock();
+ dout(10) << "commit_thread start" << endl;
+
+ commit_thread_started = true;
+ sync_cond.Signal();
while (mounted) {
ebofs_lock.Unlock();
write_super(super_epoch, superbp);
ebofs_lock.Lock();
-
+
+ sync_cond.Signal();
+
dout(10) << "commit_thread commit finish" << endl;
}
- ebofs_lock.Unlock();
-
dout(10) << "commit_thread finish" << endl;
+ ebofs_lock.Unlock();
return 0;
}
// attr
unsigned off = sizeof(eo);
- for (map<string, AttrVal >::iterator i = on->attr.begin();
+ for (map<string, AttrVal>::iterator i = on->attr.begin();
i != on->attr.end();
i++) {
bl.copy_in(off, i->first.length()+1, i->first.c_str());
inodes_flushing++;
write_onode(on, new C_E_InodeFlush(this));
on->mark_clean();
+ on->uncommitted.clear(); // commit allocated blocks
}
+ dirty_onodes.clear();
// cnodes
for (set<Cnode*>::iterator i = dirty_cnodes.begin();
write_cnode(cn, new C_E_InodeFlush(this));
cn->mark_clean();
}
+ dirty_cnodes.clear();
dout(10) << "commit_inodes_start writing " << inodes_flushing << " onodes+cnodes" << endl;
}
{
// caller must hold ebofs_lock
while (inodes_flushing > 0) {
- dout(10) << "commit_inodes_wait for " << inodes_flushing << " onodes+cnodes to flush" << endl;
+ dout(10) << "commit_inodes_wait waiting for " << inodes_flushing << " onodes+cnodes to flush" << endl;
inode_commit_cond.Wait(ebofs_lock);
}
dout(10) << "commit_inodes_wait all flushed" << endl;
// *** buffer cache ***
-// ... should already hold lock ...
void Ebofs::trim_buffer_cache()
{
- //ebofs_lock.Lock();
-
- // flush any dirty items?
- while (bc.lru_dirty.lru_get_size() > bc.lru_dirty.lru_get_max()) {
- BufferHead *bh = (BufferHead*) bc.lru_dirty.lru_expire();
- if (!bh) break;
-
- bc.lru_dirty.lru_insert_bot(bh);
+ ebofs_lock.Lock();
+ dout(10) << "trim_buffer_cache start: "
+ << bc.lru_rest.lru_get_size() << " rest + "
+ << bc.lru_dirty.lru_get_size() << " dirty " << endl;
- dout(10) << "trim_buffer_cache dirty " << *bh << endl;
- assert(bh->is_dirty());
-
- Onode *on = get_onode( bh->oc->get_object_id() );
- bh_write(on, bh);
- put_onode(on);
- }
-
- // trim bufferheads
+ // trim trimmable bufferheads
while (bc.lru_rest.lru_get_size() > bc.lru_rest.lru_get_max()) {
BufferHead *bh = (BufferHead*) bc.lru_rest.lru_expire();
if (!bh) break;
-
- dout(10) << "trim_buffer_cache rest " << *bh << endl;
+
+ dout(10) << "trim_buffer_cache trimming " << *bh << endl;
assert(bh->is_clean());
ObjectCache *oc = bh->oc;
put_onode(on);
}
}
- dout(10) << "trim_buffer_cache "
+ dout(10) << "trim_buffer_cache finish: "
<< bc.lru_rest.lru_get_size() << " rest + "
<< bc.lru_dirty.lru_get_size() << " dirty " << endl;
-
- //ebofs_lock.Unlock();
+
+ ebofs_lock.Unlock();
}
-
-void Ebofs::commit_bc_wait(version_t epoch)
-{
- dout(1) << "commit_bc_wait" << endl;
-}
-
-void Ebofs::flush_all()
+void Ebofs::sync()
{
- // FIXME what about partial heads?
+ ebofs_lock.Lock();
+ dout(3) << "sync in " << super_epoch << endl;
- dout(1) << "flush_all" << endl;
-
- bc.lock.Lock();
-
- while (bc.get_stat_dirty() > 0 || // not strictly necessary
- bc.get_stat_tx() > 0 ||
- bc.get_stat_partial() > 0 ||
- bc.get_stat_rx() > 0) {
-
- // write all dirty bufferheads
- while (!bc.dirty_bh.empty()) {
- set<BufferHead*>::iterator i = bc.dirty_bh.begin();
- BufferHead *bh = *i;
- if (bh->ioh) continue;
- Onode *on = get_onode(bh->oc->get_object_id());
- bh_write(on, bh);
- put_onode(on);
- }
-
- // wait for all tx and partial buffers to flush
- dout(1) << "flush_all waiting for "
- << bc.get_stat_dirty() << " dirty, "
- << bc.get_stat_tx() << " tx, "
- << bc.get_stat_rx() << " rx, "
- << bc.get_stat_partial() << " partial"
- << endl;
- bc.waitfor_stat();
+ // Loop on the flag instead of testing it once: a condition wait may return
+ // without the predicate holding, so re-check after every wakeup.
+ while (!commit_thread_started) {
+ dout(10) << "sync waiting for commit thread to start" << endl;
+ sync_cond.Wait(ebofs_lock);
}
- bc.lock.Unlock();
- dout(1) << "flush_all done" << endl;
-}
-
-
-
-// ? is this the best way ?
-class C_E_FlushPartial : public Context {
- Ebofs *ebofs;
- Onode *on;
- BufferHead *bh;
-public:
- C_E_FlushPartial(Ebofs *e, Onode *o, BufferHead *b) : ebofs(e), on(o), bh(b) {}
- void finish(int r) {
- if (r == 0) ebofs->bh_write(on, bh);
- }
-};
-
-
-void Ebofs::bh_read(Onode *on, BufferHead *bh)
-{
- dout(5) << "bh_read " << *on << " on " << *bh << endl;
-
- if (bh->is_missing()) {
- bc.mark_rx(bh);
- } else {
- assert(bh->is_partial());
+ // Loop rather than test once: sync_cond is signalled both when the commit
+ // thread starts and after each commit, so a wakeup alone does not prove
+ // that mid_commit has been cleared.
+ while (mid_commit) {
+ dout(10) << "sync waiting for commit in progress" << endl;
+ sync_cond.Wait(ebofs_lock);
}
- // get extents
- vector<Extent> ex;
- on->map_extents(bh->start(), bh->length(), ex);
-
- // alloc new buffer
- bc.bufferpool.alloc(EBOFS_BLOCK_SIZE*bh->length(), bh->data); // new buffers!
+ commit_cond.Signal(); // trigger a commit
- // lay out on disk
- block_t bhoff = 0;
- for (unsigned i=0; i<ex.size(); i++) {
- dout(10) << "bh_read " << bhoff << ": " << ex[i] << endl;
- bufferlist sub;
- sub.substr_of(bh->data, bhoff*EBOFS_BLOCK_SIZE, ex[i].length*EBOFS_BLOCK_SIZE);
-
- //if (bh->is_partial())
- //bh->waitfor_read.push_back(new C_E_FlushPartial(this, on, bh));
+ sync_cond.Wait(ebofs_lock); // wait
- assert(bh->ioh == 0);
- bh->ioh = dev.read(ex[i].start, ex[i].length, sub,
- new C_OC_RxFinish(on->oc,
- bhoff + bh->start(), ex[i].length));
-
- bhoff += ex[i].length;
- }
+ dout(3) << "sync finish in " << super_epoch << endl;
+ ebofs_lock.Unlock();
}
-void Ebofs::bh_write(Onode *on, BufferHead *bh)
-{
- dout(5) << "bh_write " << *on << " on " << *bh << endl;
- assert(bh->get_version() > 0);
- assert(bh->is_dirty());
- bc.mark_tx(bh);
- bh->tx_epoch = super_epoch; // note the epoch!
+void Ebofs::commit_bc_wait(version_t epoch)
+{
+ dout(10) << "commit_bc_wait on epoch " << epoch << endl;
+
+ while (bc.get_unflushed(epoch) > 0) {
+ dout(10) << "commit_bc_wait " << bc.get_unflushed(epoch) << " unflushed in epoch " << epoch << endl;
+ bc.waitfor_stat();
+ }
- // get extents
- vector<Extent> ex;
- on->map_extents(bh->start(), bh->length(), ex);
+ dout(10) << "commit_bc_wait all flushed for epoch " << epoch << endl;
+}
- // lay out on disk
- block_t bhoff = 0;
- for (unsigned i=0; i<ex.size(); i++) {
- dout(10) << "bh_write bh off " << bhoff << ": " << ex[i] << endl;
- bufferlist sub;
- sub.substr_of(bh->data, bhoff*EBOFS_BLOCK_SIZE, ex[i].length*EBOFS_BLOCK_SIZE);
- assert(bh->ioh == 0);
- bh->ioh = dev.write(ex[i].start, ex[i].length, sub,
- new C_OC_TxFinish(on->oc,
- bhoff + bh->start(), ex[i].length,
- bh->get_version()));
- bhoff += ex[i].length;
- }
-}
/*
* allocate a write to blocks on disk.
- * take care to not overwrite any "safe" data blocks.
- * break up bufferheads in bh_hits that span realloc boundaries.
- * final bufferhead set stored in final!
+ * - take care to not overwrite any "safe" data blocks.
+ * - allocate/map new extents on disk as necessary
*/
void Ebofs::alloc_write(Onode *on,
- block_t start, block_t len,
- map<block_t, BufferHead*>& hits)
+ block_t start, block_t len,
+ interval_set<block_t>& alloc)
{
// first decide what pages to (re)allocate
- interval_set<block_t> alloc;
on->map_alloc_regions(start, len, alloc);
- dout(10) << "alloc_write need to alloc " << alloc << endl;
+ dout(10) << "alloc_write need to (re)alloc " << alloc << endl;
// merge alloc into onode uncommitted map
- cout << "union of " << on->uncommitted << " and " << alloc << endl;
on->uncommitted.union_of(alloc);
-
- dout(10) << "alloc_write onode uncommitted now " << on->uncommitted << endl;
+ dout(10) << "alloc_write onode.uncommitted is now " << on->uncommitted << endl;
// allocate the space
for (map<block_t,block_t>::iterator i = alloc.m.begin();
cur += ex.length;
}
}
-
- // now break up the bh's as necessary
- block_t cur = start;
- block_t left = len;
-
- map<block_t,BufferHead*>::iterator bhp = hits.begin();
- map<block_t,block_t>::iterator ap = alloc.m.begin();
-
- block_t aoff = 0;
- while (left > 0) {
- assert(cur == bhp->first);
- BufferHead *bh = bhp->second;
- assert(cur == bh->start());
- assert(left >= bh->length());
-
- assert(ap->first+aoff == bh->start());
- if (ap->second-aoff == bh->length()) {
- // perfect.
- cur += bh->length();
- left -= bh->length();
- ap++;
- aoff = 0;
- bhp++;
- continue;
- }
-
- if (bh->length() < ap->second-aoff) {
- // bh is within alloc range.
- cur += bh->length();
- left -= bh->length();
- aoff += bh->length();
- bhp++;
- continue;
- }
-
- // bh spans alloc boundary, split it!
- assert(bh->length() > ap->second - aoff);
- BufferHead *n = bc.split(bh, bh->start() + ap->second-aoff);
- hits[n->start()] = n; // add new guy to hit map
-
- // bh is now shortened...
- cur += bh->length();
- left -= bh->length();
- assert(ap->second == aoff + bh->length());
- aoff = 0;
- ap++;
- continue;
- }
}
+
void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
{
ObjectCache *oc = on->get_oc(&bc);
block_t blast = (len+off-1) / EBOFS_BLOCK_SIZE;
block_t blen = blast-bstart+1;
+ // allocate write on disk.
+ interval_set<block_t> alloc;
+ alloc_write(on, bstart, blen, alloc);
+
// map b range onto buffer_heads
map<block_t, BufferHead*> hits;
- oc->map_write(bstart, blen, hits);
-
- // allocate write on disk. break buffer_heads across realloc/no realloc boundaries
- alloc_write(on, bstart, blen, hits);
+ oc->map_write(on, bstart, blen, alloc, hits);
// get current versions
version_t lowv, highv;
i++) {
BufferHead *bh = i->second;
bh->set_version(highv+1);
+ bh->epoch_modified = super_epoch;
- // cancel old io?
- if (bh->is_tx()) {
- if (bh->tx_epoch == super_epoch) {
- // try to cancel the old io (just bc it's a waste)
- dout(10) << "apply_write canceling old io on " << *bh << endl;
- bc.dev.cancel_io(bh->ioh);
- bh->ioh = 0;
- } else {
- // this tx is from prior epoch! shadow+copy the buffer before we modify it.
- bh->shadow_data.claim(bh->data);
- bc.bufferpool.alloc(EBOFS_BLOCK_SIZE*bh->length(), bh->data); // new buffers!
- bh->data.copy_in(0, bh->length()*EBOFS_BLOCK_SIZE, bh->shadow_data);
- bh->shadow_ioh = bh->ioh;
- bh->ioh = 0;
- }
+ // old write in progress?
+ if (bh->is_tx()) { // copy the buffer to avoid munging up in-flight write
+ dout(10) << "apply_write tx pending, copying buffer on " << *bh << endl;
+ bufferlist temp;
+ temp.claim(bh->data);
+ bc.bufferpool.alloc(EBOFS_BLOCK_SIZE*bh->length(), bh->data);
+ bh->data.copy_in(0, bh->length()*EBOFS_BLOCK_SIZE, temp);
+ }
+
+ // need to split off partial?
+ if (bh->is_missing() && bh->length() > 1 &&
+ (bh->start() == bstart && off % EBOFS_BLOCK_SIZE != 0)) {
+ BufferHead *right = bc.split(bh, bh->start()+1);
+ hits[right->start()] = right;
+ dout(10) << "apply_write split off left block for partial write; rest is " << *right << endl;
+ }
+ // Split off the last block only when the write really ends mid-block and
+ // short of the current object size. (len+off) must be parenthesized: '%'
+ // binds tighter than '+', so the bare form tested len + (off % EBOFS_BLOCK_SIZE),
+ // which is nonzero for practically every write.
+ if (bh->is_missing() && bh->length() > 1 &&
+ (bh->last() == blast && (len+off) % EBOFS_BLOCK_SIZE != 0) &&
+ (len+off < on->object_size)) {
+ BufferHead *right = bc.split(bh, bh->last());
+ hits[right->start()] = right;
+ dout(10) << "apply_write split off right block for upcoming partial write; rest is " << *right << endl;
+ }
// partial at head or tail?
if ((bh->start() == bstart && off % EBOFS_BLOCK_SIZE != 0) ||
- (bh->last() == blast && (len+off) % EBOFS_BLOCK_SIZE != 0) ||
- (len % EBOFS_BLOCK_SIZE != 0)) {
+ (bh->last() == blast && (len+off) % EBOFS_BLOCK_SIZE != 0)) {
// locate ourselves in bh
unsigned off_in_bh = opos - bh->start()*EBOFS_BLOCK_SIZE;
assert(off_in_bh >= 0);
bh->data.zero();
bh->apply_partial();
bc.mark_dirty(bh);
- if (bh->ioh) {
- bc.dev.cancel_io( bh->ioh );
- bh->ioh = 0;
- }
- bh_write(on, bh);
+ bc.bh_write(on, bh);
}
else if (bh->is_rx()) {
dout(10) << "apply_write rx -> partial " << *bh << endl;
+ assert(bh->length() == 1);
bc.mark_partial(bh);
+ bc.bh_queue_partial_write(on, bh); // queue the eventual write
}
else if (bh->is_missing()) {
dout(10) << "apply_write missing -> partial " << *bh << endl;
- bh_read(on, bh);
+ assert(bh->length() == 1);
bc.mark_partial(bh);
+ bc.bh_read(on, bh);
+ bc.bh_queue_partial_write(on, bh); // queue the eventual write
}
else if (bh->is_partial()) {
- if (bh->ioh == 0) {
- dout(10) << "apply_write submitting rx for partial " << *bh << endl;
- bh_read(on, bh);
- }
+ dout(10) << "apply_write already partial, no need to submit rx on " << *bh << endl;
+ bc.bh_queue_partial_write(on, bh); // queue the eventual write
}
+
+
} else {
assert(bh->is_clean() || bh->is_dirty() || bh->is_tx());
if (!bh->is_dirty())
bc.mark_dirty(bh);
- bh_write(on, bh);
+ bc.bh_write(on, bh);
}
continue;
}
if (!bh->is_dirty())
bc.mark_dirty(bh);
- bh_write(on, bh);
+ bc.bh_write(on, bh);
}
assert(zleft == 0);
map<block_t, BufferHead*> missing; // read these
map<block_t, BufferHead*> rx; // wait for these
map<block_t, BufferHead*> partials; // ??
- oc->map_read(bstart, blen, hits, missing, rx, partials);
+ oc->map_read(on, bstart, blen, hits, missing, rx, partials);
// missing buffers?
if (!missing.empty()) {
i != missing.end();
i++) {
dout(15) <<"attempt_read missing buffer " << *(i->second) << endl;
- bh_read(on, i->second);
+ bc.bh_read(on, i->second);
}
BufferHead *wait_on = missing.begin()->second;
wait_on->waitfor_read.push_back(new C_E_Cond(will_wait_on));
bool mounted, unmounting;
bool readonly;
version_t super_epoch;
+ bool commit_thread_started, mid_commit;
+ Cond commit_cond; // to wake up the commit thread
+ Cond sync_cond;
void prepare_super(version_t epoch, bufferptr& bp);
void write_super(version_t epoch, bufferptr& bp);
-
- Cond commit_cond; // to wake up the commit thread
int commit_thread_entry();
class CommitThread : public Thread {
BufferCache bc;
pthread_t flushd_thread_id;
+ version_t trigger_commit();
void commit_bc_wait(version_t epoch);
public:
+ void sync();
void trim_buffer_cache();
- void flush_all();
+
protected:
//void zero(Onode *on, size_t len, off_t off, off_t write_thru);
void alloc_write(Onode *on,
block_t start, block_t len,
- map<block_t, BufferHead*>& hits);
+ interval_set<block_t>& alloc);
void apply_write(Onode *on, size_t len, off_t off, bufferlist& bl);
bool attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond *will_wait_on);
- // io
- void bh_read(Onode *on, BufferHead *bh);
- void bh_write(Onode *on, BufferHead *bh);
-
- friend class C_E_FlushPartial;
-
int flushd_thread();
static int flusd_thread_entry(void *p) {
return ((Ebofs*)p)->flushd_thread();
public:
Ebofs(BlockDevice& d) :
dev(d),
- mounted(false), unmounting(false), readonly(false), super_epoch(0),
+ mounted(false), unmounting(false), readonly(false),
+ super_epoch(0), commit_thread_started(false), mid_commit(false),
commit_thread(this),
free_blocks(0), allocator(this),
bufferpool(EBOFS_BLOCK_SIZE),
+ nodepool(ebofs_lock),
object_tab(0), collection_tab(0), oc_tab(0), co_tab(0),
inodes_flushing(0),
bc(dev, bufferpool, ebofs_lock) {
*
*/
-
-
class Onode : public LRUObject {
private:
int ref;
void get() {
if (ref == 0) lru_pin();
ref++;
- cout << "onode.get " << ref << endl;
+ //cout << "ebofs.onode.get " << ref << endl;
}
void put() {
ref--;
if (ref == 0) lru_unpin();
- cout << "onode.put " << ref << endl;
+ //cout << "ebofs.onode.put " << ref << endl;
}
void mark_dirty() {
return sizeof(Extent) * extents.size();
}
-
};
/** table **/
-class _Table {
- int asdfasdfasdf;
-};
+#define dbtout dout(20)
template<class K, class V>
-class Table : public _Table {
+class Table {
private:
NodePool &pool;
int main(int argc, char **argv)
{
// args
- char *filename = 0;
- if (argc > 1) filename = argv[1];
- if (!filename) return -1;
+ vector<char*> args;
+ argv_to_vec(argc, argv, args);
+ parse_config_options(args);
+
+ if (args.size() < 1) {
+ cerr << "usage: mkfs.ebofs [options] <device file>" << endl;
+ return -1;
+ }
+ char *filename = args[0];
// device
BlockDevice dev(filename);
Ebofs mfs(dev);
mfs.mkfs();
- // test-o-rama!
- Ebofs fs(dev);
- fs.mount();
-
- if (0) { // test
- bufferlist bl;
- char crap[10000];
- memset(crap, 0, 10000);
- bl.append(crap, 10000);
- fs.write(10, bl.length(), 200, bl, (Context*)0);
- fs.trim_buffer_cache();
- fs.write(10, bl.length(), 3222, bl, (Context*)0);
- fs.trim_buffer_cache();
- fs.write(10, 5000, 3222, bl, (Context*)0);
- }
-
- // test small writes
if (1) {
- char crap[10000];
- memset(crap, 0, 10000);
- bufferlist bl;
- bl.append(crap, 10000);
-
- // write
- srand(0);
- for (int i=0; i<100; i++) {
- off_t off = rand() % 1000000;
- size_t len = 100;
- cout << "writing bit at " << off << " len " << len << endl;
- fs.write(10, len, off, bl, (Context*)0);
- }
+ // test-o-rama!
+ Ebofs fs(dev);
+ fs.mount();
- if (0) {
- // read
- srand(0);
- for (int i=0; i<100; i++) {
+ if (1) { // test
bufferlist bl;
- off_t off = rand() % 1000000;
- size_t len = 100;
- cout << "read bit at " << off << " len " << len << endl;
- int r = fs.read(10, len, off, bl);
- assert(bl.length() == len);
- assert(r == 0);
- }
+ char crap[10000];
+ memset(crap, 0, 10000);
+ bl.append(crap, 10000);
+ fs.write(10, bl.length(), 200, bl, (Context*)0);
+ sleep(1);
+ fs.trim_buffer_cache();
+ fs.write(10, bl.length(), 5222, bl, (Context*)0);
+ fs.trim_buffer_cache();
+ //fs.write(10, 5000, 3222, bl, (Context*)0);
}
-
- // flush
- fs.flush_all();
- fs.trim_buffer_cache();
-
+
+ // test small writes
if (0) {
- // read again
- srand(0);
- for (int i=0; i<100; i++) {
+ char crap[10000];
+ memset(crap, 0, 10000);
bufferlist bl;
- off_t off = rand() % 1000000;
- size_t len = 100;
- cout << "read bit at " << off << " len " << len << endl;
- int r = fs.read(10, len, off, bl);
- assert(bl.length() == len);
- assert(r == 0);
+ bl.append(crap, 10000);
+
+ // write
+ srand(0);
+ for (int i=0; i<100; i++) {
+ off_t off = rand() % 1000000;
+ size_t len = 100;
+ cout << "writing bit at " << off << " len " << len << endl;
+ fs.write(10, len, off, bl, (Context*)0);
+ }
+
+ if (0) {
+ // read
+ srand(0);
+ for (int i=0; i<100; i++) {
+ bufferlist bl;
+ off_t off = rand() % 1000000;
+ size_t len = 100;
+ cout << "read bit at " << off << " len " << len << endl;
+ int r = fs.read(10, len, off, bl);
+ assert(bl.length() == len);
+ assert(r == 0);
+ }
+ }
+
+ // flush
+ fs.sync();
+ fs.trim_buffer_cache();
+ //fs.trim_buffer_cache();
+
+ if (0) {
+ // read again
+ srand(0);
+ for (int i=0; i<100; i++) {
+ bufferlist bl;
+ off_t off = rand() % 1000000;
+ size_t len = 100;
+ cout << "read bit at " << off << " len " << len << endl;
+ int r = fs.read(10, len, off, bl);
+ assert(bl.length() == len);
+ assert(r == 0);
+ }
+
+ // flush
+ fs.sync();
+ fs.trim_buffer_cache();
+ }
+
+ // write on empty cache
+ srand(0);
+ for (int i=0; i<100; i++) {
+ off_t off = rand() % 1000000;
+ size_t len = 100;
+ cout << "writing bit at " << off << " len " << len << endl;
+ fs.write(10, len, off, bl, (Context*)0);
+ }
+
}
-
- // flush
+
+ fs.sync();
fs.trim_buffer_cache();
- }
-
- // write on empty cache
- srand(0);
- for (int i=0; i<100; i++) {
- off_t off = rand() % 1000000;
- size_t len = 100;
- cout << "writing bit at " << off << " len " << len << endl;
- fs.write(10, len, off, bl, (Context*)0);
- }
-
+ fs.trim_onode_cache();
+
+ fs.umount();
}
-
- fs.flush_all();
- fs.trim_buffer_cache();
- fs.trim_onode_cache();
-
- fs.umount();
dev.close();
}
set<nodeid_t> clean; // aka used
set<nodeid_t> limbo;
- Mutex lock;
+ Mutex &ebofs_lock;
Cond commit_cond;
int flushing;
public:
- NodePool() : bufferpool(EBOFS_NODE_BYTES),
+ NodePool(Mutex &el) :
+ bufferpool(EBOFS_NODE_BYTES),
+ ebofs_lock(el),
flushing(0) {}
~NodePool() {
// nodes
to read. so it only really works when called from mount()!
*/
for (unsigned r=0; r<region_loc.size(); r++) {
- dout(3) << "read region " << r << " at " << region_loc[r] << endl;
+ dout(3) << "ebofs.nodepool.read region " << r << " at " << region_loc[r] << endl;
for (block_t boff = 0; boff < region_loc[r].length; boff++) {
nodeid_t nid = make_nodeid(r, boff);
if (!clean.count(nid)) continue;
- dout(20) << "read node " << nid << endl;
+ dout(20) << "ebofs.nodepool.read node " << nid << endl;
bufferptr bp = bufferpool.alloc(EBOFS_NODE_BYTES);
dev.read(region_loc[r].start + (block_t)boff, EBOFS_NODE_BLOCKS,
Node *n = new Node(nid, bp, Node::STATE_CLEAN);
node_map[nid] = n;
+ dout(10) << "ebofs.nodepool.read node " << n << " at " << (void*)n << endl;
}
}
return 0;
};
void flushed_usemap() {
- lock.Lock();
+ ebofs_lock.Lock();
flushing--;
if (flushing == 0)
commit_cond.Signal();
- lock.Unlock();
+ ebofs_lock.Unlock();
}
public:
};
void flushed_node(nodeid_t nid) {
- lock.Lock();
+ ebofs_lock.Lock();
assert(tx.count(nid));
+ // mark nid clean|limbo
if (tx.count(nid)) {
tx.erase(nid);
clean.insert(nid);
+
+ // make node itself clean
+ node_map[nid]->set_state(Node::STATE_CLEAN);
}
else {
assert(limbo.count(nid));
}
-
+
flushing--;
if (flushing == 0)
commit_cond.Signal();
- lock.Unlock();
+ ebofs_lock.Unlock();
}
public:
void commit_start(BlockDevice& dev, version_t version) {
- lock.Lock();
assert(!is_committing());
// write map
free.insert(*i);
}
limbo.clear();
-
- lock.Unlock();
}
+ // Block until is_committing() turns false. The mutex is no longer taken
+ // here: commit_cond.Wait() is handed ebofs_lock directly, so the caller
+ // presumably must already hold ebofs_lock -- TODO confirm at call sites.
void commit_wait() {
- lock.Lock();
while (is_committing()) {
- commit_cond.Wait(lock);
+ commit_cond.Wait(ebofs_lock);
}
- lock.Unlock();
}
// new node
Node* new_node(int type) {
nodeid_t nid = alloc_id();
- dbtout << "pool.new_node " << nid << endl;
+ dout(15) << "ebofs.nodepool.new_node " << nid << endl;
// alloc node
bufferptr bp = bufferpool.alloc(EBOFS_NODE_BYTES);
void release(Node *n) {
const nodeid_t nid = n->get_id();
- dbtout << "pool.release on " << nid << endl;
+ dout(15) << "ebofs.nodepool.release on " << nid << endl;
node_map.erase(nid);
if (n->is_dirty()) {
}
void release_all() {
- set<Node*> left;
- for (map<nodeid_t,Node*>::iterator i = node_map.begin();
- i != node_map.end();
- i++)
- left.insert(i->second);
- for (set<Node*>::iterator i = left.begin();
- i != left.end();
- i++)
- release( *i );
+ while (!node_map.empty()) {
+ map<nodeid_t,Node*>::iterator i = node_map.begin();
+ dout(2) << "ebofs.nodepool.release_all leftover " << i->first << " " << i->second << endl;
+ release( i->second );
+ }
assert(node_map.empty());
}
// get new node id?
nodeid_t oldid = n->get_id();
nodeid_t newid = alloc_id();
- dbtout << "pool.dirty_node on " << oldid << " now " << newid << endl;
+ dout(2) << "ebofs.nodepool.dirty_node on " << oldid << " now " << newid << endl;
// release old block
if (n->is_clean()) {
using namespace std;
using namespace __gnu_cxx;
-#define dbtout cout
-
#define MIN(a,b) ((a)<=(b) ? (a):(b))
#define MAX(a,b) ((a)>=(b) ? (a):(b))
bool contains(T i) {
typename map<T,T>::iterator p = find_inc(i);
- if (p == end()) return false;
+ if (p == m.end()) return false;
if (p->first > i) return false;
if (p->first+p->second <= i) return false;
assert(p->first <= i && p->first+p->second > i);
}
bool contains(T start, T len) {
typename map<T,T>::iterator p = find_inc(start);
- if (p == end()) return false;
+ if (p == m.end()) return false;
if (p->first > start) return false;
if (p->first+p->second <= start) return false;
assert(p->first <= start && p->first+p->second > start);
if (p->first+p->second < start+len) return false;
return true;
}
+
+ // outer range of set
+ // True when the set holds no intervals at all.
+ bool empty() {
+ return m.size() == 0;
+ }
+ // Key of the first entry in m, i.e. where the lowest interval begins.
+ // The set must not be empty.
+ T start() {
+ assert(!empty());
+ return m.begin()->first;
+ }
+ // Start plus length of the last entry in m, i.e. one past the highest
+ // value covered. The set must not be empty.
+ T end() {
+ assert(!empty());
+ typename map<T,T>::reverse_iterator last = m.rbegin();
+ return last->first + last->second;
+ }
+
+ // interval start after p (where p not in set)
+ // For an i that is not in the set: does find_inc(i) locate any interval?
+ // (presumably the next interval beginning after i -- confirm against find_inc)
+ bool starts_after(T i) {
+ assert(!contains(i));
+ return find_inc(i) != m.end();
+ }
+ // Start of the interval that find_inc(i) locates, for an i not in the set.
+ // Callers are expected to check starts_after(i) first: if find_inc() comes
+ // back with m.end() there is no entry to read, so assert rather than
+ // dereference the end iterator.
+ T start_after(T i) {
+ assert(!contains(i));
+ typename map<T,T>::iterator p = find_inc(i);
+ assert(p != m.end());
+ return p->first;
+ }
+
+ // interval end that contains start
+ // End (start plus length) of the interval that contains `start`.
+ // `start` must be a member of the set.
+ T end_after(T start) {
+ assert(contains(start));
+ typename map<T,T>::iterator hit = find_inc(start);
+ return hit->first + hit->second;
+ }
void insert(T start, T len) {
typename map<T,T>::iterator p = find_adj(start);