}
+ // allocate a single buffer
buffer* alloc() {
// get more memory?
if (freelist.empty()) {
#include "Ebofs.h"
+#undef dout
+#define dout(x) if (x <= g_conf.debug) cout << "allocator."
+
+
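+// debug helper: print every free-extent bucket (start + length pairs)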
+void Allocator::dump_freelist()
+{
+ for (int b=0; b<EBOFS_NUM_FREE_BUCKETS; b++) {
+ cout << "bucket " << b << endl;
+ if (fs->free_tab[b]->get_num_keys() > 0) {
+ Table<block_t,block_t>::Cursor cursor(fs->free_tab[b]);
+ fs->free_tab[b]->find(0, cursor);
+ while (1) {
+ cout << " ex " << cursor.current().key << " + " << cursor.current().value << endl;
+ if (cursor.move_right() < 0) break;
+ }
+ } else {
+ cout << " empty" << endl;
+ }
+ }
+}
+
+
int Allocator::find(Extent& ex, int bucket, block_t num, block_t near)
{
Table<block_t,block_t>::Cursor cursor(fs->free_tab[bucket]);
do {
if (cursor.current().value >= num)
found = true;
- } while (!found && cursor.move_right() > 0);
+ } while (!found && cursor.move_right() >= 0);
}
if (!found) {
// look to the left
fs->free_tab[bucket]->find( near, cursor );
- while (!found && cursor.move_left() > 0)
+ while (!found && cursor.move_left() >= 0)
if (cursor.current().value >= num)
found = true;
}
}
dout(1) << "allocator.alloc " << ex << " near " << near << endl;
+ dump_freelist();
return num;
}
}
fs->free_tab[bucket]->remove(ex.start);
fs->free_blocks -= ex.length;
+ dout(1) << "allocator.alloc partial " << ex << " near " << near << endl;
+ dump_freelist();
return ex.length;
}
}
dout(1) << "allocate failed, fs full! " << fs->free_blocks << endl;
+ dump_freelist();
return -1;
}
{
Extent newex = ex;
+ dout(1) << "release " << ex << endl;
+
// one after us?
for (int b=0; b<EBOFS_NUM_FREE_BUCKETS; b++) {
Table<block_t,block_t>::Cursor cursor(fs->free_tab[b]);
for (int b=0; b<EBOFS_NUM_FREE_BUCKETS; b++) {
Table<block_t,block_t>::Cursor cursor(fs->free_tab[b]);
fs->free_tab[b]->find( newex.start+newex.length, cursor );
- if (cursor.move_left() > 0 &&
+ if (cursor.move_left() >= 0 &&
(cursor.current().key + cursor.current().value == newex.start)) {
// merge
newex.start = cursor.current().key;
// ok, insert newex
int b = pick_bucket(ex.length);
fs->free_tab[b]->insert(ex.start, ex.length);
+
+ dump_freelist();
return 0;
}
int find(Extent& ex, int bucket, block_t num, block_t near);
+ void dump_freelist();
+
public:
Allocator(Ebofs *f) : fs(f) {}
bio->rval = r;
}
else if (bio->context) {
- bio->context->finish(r);
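+    // pass the ioh (the bio pointer itself) through finish()'s int argument,
+    // so the object cache can match and clear the corresponding bh->ioh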
+ bio->context->finish((int)bio);
delete bio->context;
delete bio;
}
// FIXME?
if (r == 0 && pbio->context) {
- pbio->context->finish(-1);
+ pbio->context->finish(0);
delete pbio->context;
delete pbio;
}
-void ObjectCache::rx_finish(block_t start, block_t length)
+void ObjectCache::rx_finish(ioh_t ioh, block_t start, block_t length)
{
list<Context*> waiters;
else {
dout(10) << "rx_finish ignoring " << *p->second << endl;
}
+
+ if (p->second->ioh == ioh) p->second->ioh = 0;
// trigger waiters
waiters.splice(waiters.begin(), p->second->waitfor_read);
}
-void ObjectCache::tx_finish(block_t start, block_t length, version_t version)
+void ObjectCache::tx_finish(ioh_t ioh, block_t start, block_t length, version_t version)
{
list<Context*> waiters;
p->second->set_last_flushed(version);
bc->mark_clean(p->second);
+ if (p->second->ioh == ioh) p->second->ioh = 0;
+
// trigger waiters
waiters.splice(waiters.begin(), p->second->waitfor_flush);
}
block_t cur = start;
block_t left = len;
- if (p != data.begin() && p->first < cur) {
+    if (p != data.begin() &&
+        (p == data.end() || p->first > cur)) {
p--; // might overlap!
if (p->first + p->second->length() <= cur)
p++; // doesn't overlap.
block_t cur = start;
block_t left = len;
- if (p != data.begin() && p->first < cur) {
+    if (p != data.begin() &&
+        (p == data.end() || p->first > cur)) {
p--; // might overlap!
if (p->first + p->second->length() <= cur)
p++; // doesn't overlap.
}
*/
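+  // Copy byte range [start,end) out of the partial fragment map into bl.
+  // Caller must ensure the range is fully covered (see have_partial_range).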
+ void copy_partial_substr(off_t start, off_t end, bufferlist& bl) {
+ map<off_t, bufferlist>::iterator i = partial.begin();
+
+ // skip first bits (fully to left)
+    while (i != partial.end() &&
+           (i->first + i->second.length() < start))
+      i++;
+ assert(i != partial.end());
+ assert(i->first <= start);
+
+ // first
+ unsigned bhoff = MAX(start, i->first) - i->first;
+    unsigned bhlen = MIN((unsigned)(end-start), i->second.length() - bhoff);  // don't run past this fragment
+ bl.substr_of( i->second, bhoff, bhlen );
+
+ off_t pos = i->first + i->second.length();
+
+ // have continuous to end?
+ for (i++; i != partial.end(); i++) {
+ if (pos >= end) break;
+ assert(pos == i->first);
+
+ pos = i->first + i->second.length();
+
+ if (pos <= end) { // this whole frag
+ bl.append( i->second );
+ } else { // partial end
+ unsigned bhlen = end-start-bl.length();
+ bufferlist frag;
+ frag.substr_of( i->second, 0, bhlen );
+ bl.claim_append(frag);
+ break; // done.
+ }
+ }
+
+ assert(pos >= end);
+ assert(bl.length() == (unsigned)(end-start));
+ }
+
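+  // Do the fragments in 'partial' cover [start,end) contiguously, with no gaps?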
+ bool have_partial_range(off_t start, off_t end) {
+ map<off_t, bufferlist>::iterator i = partial.begin();
+
+ // skip first bits (fully to left)
+    while (i != partial.end() &&
+           (i->first + i->second.length() < start))
+      i++;
+ if (i == partial.end()) return false;
+
+ // have start?
+ if (i->first > start) return false;
+ off_t pos = i->first + i->second.length();
+
+ // have continuous to end?
+ for (i++; i != partial.end(); i++) {
+ assert(pos <= i->first);
+ if (pos < i->first) return false;
+ assert(pos == i->first);
+ pos = i->first + i->second.length();
+ if (pos >= end) break; // gone far enough
+ }
+
+ if (pos >= end) return true;
+ return false;
+ }
+
bool partial_is_complete(off_t size) {
+ return have_partial_range( (off_t)(start()*EBOFS_BLOCK_SIZE),
+ MIN( size, (off_t)(end()*EBOFS_BLOCK_SIZE) ) );
+ /*
map<off_t, bufferlist>::iterator i = partial.begin();
if (i == partial.end()) return false;
if (i->first != (off_t)(object_loc.start * EBOFS_BLOCK_SIZE)) return false;
off_t upto = MIN( size, (off_t)(end()*EBOFS_BLOCK_SIZE) );
if (pos == upto) return true;
return false;
+ */
}
void apply_partial() {
const off_t bhstart = start() * EBOFS_BLOCK_SIZE;
int scan_versions(block_t start, block_t len,
version_t& low, version_t& high);
- void rx_finish(block_t start, block_t length);
- void tx_finish(block_t start, block_t length, version_t v);
+ void rx_finish(ioh_t ioh, block_t start, block_t length);
+ void tx_finish(ioh_t ioh, block_t start, block_t length, version_t v);
};
C_OC_RxFinish(ObjectCache *o, block_t s, block_t l) :
oc(o), start(s), length(l) {}
void finish(int r) {
- if (r == 0)
- oc->rx_finish(start, length);
+ ioh_t ioh = (ioh_t)r;
+ if (ioh)
+ oc->rx_finish(ioh, start, length);
}
};
C_OC_TxFinish(ObjectCache *o, block_t s, block_t l, version_t v) :
oc(o), start(s), length(l), version(v) {}
void finish(int r) {
- if (r == 0)
- oc->tx_finish(start, length, version);
+ ioh_t ioh = (ioh_t)r;
+ if (ioh)
+ oc->tx_finish(ioh, start, length, version);
}
};
bc.lock.Unlock();
}
-class C_E_Flush : public Context {
- int *i;
- Mutex *m;
- Cond *c;
-public:
- C_E_Flush(int *_i, Mutex *_m, Cond *_c) : i(_i), m(_m), c(_c) {}
- void finish(int r) {
- (*i)--;
- m->Lock();
- c->Signal();
- m->Unlock();
- }
-};
void Ebofs::flush_all()
{
// FIXME what about partial heads?
- // write all dirty bufferheads
+ dout(1) << "flush_all" << endl;
+
bc.lock.Lock();
- dout(1) << "flush_all writing dirty bufferheads" << endl;
- while (!bc.dirty_bh.empty()) {
- set<BufferHead*>::iterator i = bc.dirty_bh.begin();
- BufferHead *bh = *i;
- if (bh->ioh) continue;
- Onode *on = get_onode(bh->oc->get_object_id());
- bh_write(on, bh);
- put_onode(on);
- }
- dout(1) << "flush_all submitted" << endl;
+ while (bc.get_stat_dirty() > 0 || // not strictly necessary
+ bc.get_stat_tx() > 0 ||
+ bc.get_stat_partial() > 0 ||
+ bc.get_stat_rx() > 0) {
+
+ // write all dirty bufferheads
+ while (!bc.dirty_bh.empty()) {
+ set<BufferHead*>::iterator i = bc.dirty_bh.begin();
+ BufferHead *bh = *i;
+ if (bh->ioh) continue;
+ Onode *on = get_onode(bh->oc->get_object_id());
+ bh_write(on, bh);
+ put_onode(on);
+ }
-
- while (bc.get_stat_tx() > 0 ||
- bc.get_stat_partial() > 0) {
- dout(1) << "flush_all waiting for " << bc.get_stat_tx() << " tx, " << bc.get_stat_partial() << " partial" << endl;
+ // wait for all tx and partial buffers to flush
+ dout(1) << "flush_all waiting for "
+ << bc.get_stat_dirty() << " dirty, "
+ << bc.get_stat_tx() << " tx, "
+ << bc.get_stat_rx() << " rx, "
+ << bc.get_stat_partial() << " partial"
+ << endl;
bc.waitfor_stat();
}
bc.lock.Unlock();
void Ebofs::bh_read(Onode *on, BufferHead *bh)
{
dout(5) << "bh_read " << *on << " on " << *bh << endl;
- assert(bh->get_version() == 0);
- assert(bh->is_rx() || bh->is_partial());
+
+ if (bh->is_missing()) {
+ bc.mark_rx(bh);
+ } else {
+ assert(bh->is_partial());
+ }
// get extents
vector<Extent> ex;
on->map_extents(bh->start(), bh->length(), ex);
+
+ // alloc new buffer
+ bc.bufferpool.alloc_list(bh->length(), bh->data); // new buffers!
// lay out on disk
- block_t ooff = 0;
+ block_t bhoff = 0;
for (unsigned i=0; i<ex.size(); i++) {
- dout(10) << "bh_read " << ooff << ": " << ex[i] << endl;
+ dout(10) << "bh_read " << bhoff << ": " << ex[i] << endl;
bufferlist sub;
- sub.substr_of(bh->data, ooff*EBOFS_BLOCK_SIZE, ex[i].length*EBOFS_BLOCK_SIZE);
+ sub.substr_of(bh->data, bhoff*EBOFS_BLOCK_SIZE, ex[i].length*EBOFS_BLOCK_SIZE);
- if (bh->is_partial())
- bh->waitfor_read.push_back(new C_E_FlushPartial(this, on, bh));
+ //if (bh->is_partial())
+ //bh->waitfor_read.push_back(new C_E_FlushPartial(this, on, bh));
assert(bh->ioh == 0);
bh->ioh = dev.read(ex[i].start, ex[i].length, sub,
- new C_OC_RxFinish(on->oc, ooff, ex[i].length));
-
- ooff += ex[i].length;
+ new C_OC_RxFinish(on->oc,
+ bhoff + bh->start(), ex[i].length));
+
+ bhoff += ex[i].length;
}
}
// partial at head or tail?
if ((bh->start() == bstart && off % EBOFS_BLOCK_SIZE != 0) ||
- (bh->last() == blast && (len+off) % EBOFS_BLOCK_SIZE != 0)) {
+ (bh->last() == blast && (len+off) % EBOFS_BLOCK_SIZE != 0) ||
+ (len % EBOFS_BLOCK_SIZE != 0)) {
// locate ourselves in bh
unsigned off_in_bh = opos - bh->start()*EBOFS_BLOCK_SIZE;
assert(off_in_bh >= 0);
else if (bh->is_missing()) {
dout(10) << "apply_write missing -> partial " << *bh << endl;
bh_read(on, bh);
+ bc.mark_partial(bh);
}
else if (bh->is_partial()) {
if (bh->ioh == 0) {
continue;
}
- // ok, we're talking full blocks now.
-
+ // ok, we're talking full block(s) now.
+ assert(opos % EBOFS_BLOCK_SIZE == 0);
+ assert(zleft+left >= (off_t)(EBOFS_BLOCK_SIZE*bh->length()));
// alloc new buffers.
- bc.bufferpool.alloc_list(len, bh->data);
+ bc.bufferpool.alloc_list(bh->length(), bh->data);
// copy!
unsigned len_in_bh = bh->length()*EBOFS_BLOCK_SIZE;
// *** file i/o ***
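+// completion context that just signals a Cond; read() below uses it to
+// sleep until the buffers it is waiting on become available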
+class C_E_Cond : public Context {
+ Cond *cond;
+public:
+ C_E_Cond(Cond *c) : cond(c) {}
+ void finish(int r) {
+ cond->Signal();
+ }
+};
+
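+// Try to assemble [off, off+len) from the cache into bl without blocking.
+// On success returns true.  Otherwise it issues any needed reads, queues
+// will_wait_on to be signalled when something arrives, and returns false.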
+bool Ebofs::attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond *will_wait_on)
+{
+ dout(10) << "attempt_read " << *on << " len " << len << " off " << off << endl;
+ ObjectCache *oc = on->get_oc(&bc);
+
+ // map
+ block_t bstart = off / EBOFS_BLOCK_SIZE;
+ block_t blast = (len+off-1) / EBOFS_BLOCK_SIZE;
+ block_t blen = blast-bstart+1;
+
+ map<block_t, BufferHead*> hits;
+ map<block_t, BufferHead*> missing; // read these
+ map<block_t, BufferHead*> rx; // wait for these
+  map<block_t, BufferHead*> partials;  // partially valid; use if they cover our range, else wait
+ oc->map_read(bstart, blen, hits, missing, rx, partials);
+
+ // missing buffers?
+ if (!missing.empty()) {
+ for (map<block_t,BufferHead*>::iterator i = missing.begin();
+ i != missing.end();
+ i++) {
+ dout(15) <<"attempt_read missing buffer " << *(i->second) << endl;
+ bh_read(on, i->second);
+ }
+ BufferHead *wait_on = missing.begin()->second;
+ wait_on->waitfor_read.push_back(new C_E_Cond(will_wait_on));
+ return false;
+ }
+
+ // are partials sufficient?
+ bool partials_ok = true;
+ for (map<block_t,BufferHead*>::iterator i = partials.begin();
+ i != partials.end();
+ i++) {
+ off_t start = MAX( off, (off_t)(i->second->start()*EBOFS_BLOCK_SIZE) );
+ off_t end = MIN( off+len, (off_t)(i->second->end()*EBOFS_BLOCK_SIZE) );
+
+ if (!i->second->have_partial_range(start, end)) {
+ if (partials_ok) {
+ // wait on this one
+ dout(15) <<"attempt_read insufficient partial buffer " << *(i->second) << endl;
+ i->second->waitfor_read.push_back(new C_E_Cond(will_wait_on));
+ }
+ partials_ok = false;
+ }
+ }
+ if (!partials_ok) return false;
+
+ // wait on rx?
+ if (!rx.empty()) {
+ BufferHead *wait_on = rx.begin()->second;
+ dout(15) <<"attempt_read waiting for read to finish on " << *wait_on << endl;
+ wait_on->waitfor_read.push_back(new C_E_Cond(will_wait_on));
+ return false;
+ }
+
+ // yay, we have it all!
+ // concurrently walk thru hits, partials.
+ map<block_t,BufferHead*>::iterator h = hits.begin();
+ map<block_t,BufferHead*>::iterator p = partials.begin();
+
+ off_t pos = off;
+ block_t curblock = bstart;
+ while (curblock <= blast) {
+ BufferHead *bh = 0;
+    if (h != hits.end() && h->first == curblock) {
+      bh = h->second;
+      h++;
+    } else if (p != partials.end() && p->first == curblock) {
+      bh = p->second;
+      p++;
+    } else assert(0);  // every block must be covered by a hit or a partial
+
+ off_t bhstart = (off_t)(bh->start()*EBOFS_BLOCK_SIZE);
+ off_t bhend = (off_t)(bh->end()*EBOFS_BLOCK_SIZE);
+ off_t start = MAX( pos, bhstart );
+ off_t end = MIN( off+len, bhend );
+
+ if (bh->is_partial()) {
+ // copy from a partial block. yuck!
+ bufferlist frag;
+ bh->copy_partial_substr( start, end, frag );
+ bl.claim_append( frag );
+ pos += frag.length();
+ } else {
+ // copy from a full block.
+ if (bhstart == start && bhend == end) {
+ bl.append( bh->data );
+ pos += bh->data.length();
+ } else {
+ bufferlist frag;
+ frag.substr_of(bh->data, start-bhstart, end-start);
+ pos += frag.length();
+ bl.claim_append( frag );
+ }
+ }
+
+ curblock = bh->end();
+ assert((off_t)(curblock*EBOFS_BLOCK_SIZE) == pos ||
+ end != bhend);
+ }
+
+ assert(bl.length() == len);
+ return true;
+}
+
int Ebofs::read(object_t oid,
size_t len, off_t off,
bufferlist& bl)
{
+ Onode *on = get_onode(oid);
+ if (!on)
+ return -1; // object dne?
+
+ // read data into bl. block as necessary.
+ Cond cond;
+
+ bc.lock.Lock();
+ while (1) {
+ // check size bound
+    if (off >= on->object_size)
+      break;
+ size_t will_read = MIN( off+len, on->object_size ) - off;
+
+ if (attempt_read(on, will_read, off, bl, &cond))
+ break; // yay
+
+ // wait
+ cond.Wait(bc.lock);
+ }
+  put_onode(on);
+  bc.lock.Unlock();
+
return 0;
}
void zero(Onode *on, size_t len, off_t off, off_t write_thru);
void apply_write(Onode *on, size_t len, off_t off, bufferlist& bl);
- bool attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl);
+ bool attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond *will_wait_on);
// io
void bh_read(Onode *on, BufferHead *bh);
open[l+1] = table->pool.get_node( open[l].index_item(pos[l]).node );
pos[l+1] = open[l+1].size() - 1;
}
- return 0;
+ return 1;
}
int move_right() {
if (table->depth == 0) return OOB;
open[l+1] = table->pool.get_node( open[l].index_item(pos[l]).node );
pos[l+1] = 0; // furthest left
}
- return 0;
+ return 1;
}
// ** modifications **
Ebofs fs(dev);
fs.mkfs();
- if (1) { // test
+ if (0) { // test
bufferlist bl;
char crap[10000];
memset(crap, 0, 10000);
fs.write(10, 5000, 3222, bl, 0);
}
+ // test small writes
+ if (1) {
+ char crap[10000];
+ memset(crap, 0, 10000);
+ bufferlist bl;
+ bl.append(crap, 10000);
+
+ // write
+ srand(0);
+ for (int i=0; i<100; i++) {
+ off_t off = rand() % 1000000;
+ size_t len = 100;
+ cout << "writing bit at " << off << " len " << len << endl;
+ fs.write(10, len, off, bl, 0);
+ }
+
+ if (0) {
+ // read
+ srand(0);
+ for (int i=0; i<100; i++) {
+ bufferlist bl;
+ off_t off = rand() % 1000000;
+ size_t len = 100;
+ cout << "read bit at " << off << " len " << len << endl;
+ int r = fs.read(10, len, off, bl);
+ assert(bl.length() == len);
+ assert(r == 0);
+ }
+ }
+
+ // flush
+ fs.flush_all();
+ fs.trim_buffer_cache();
+
+ if (0) {
+ // read again
+ srand(0);
+ for (int i=0; i<100; i++) {
+ bufferlist bl;
+ off_t off = rand() % 1000000;
+ size_t len = 100;
+ cout << "read bit at " << off << " len " << len << endl;
+ int r = fs.read(10, len, off, bl);
+ assert(bl.length() == len);
+ assert(r == 0);
+ }
+
+ // flush
+ fs.trim_buffer_cache();
+ }
+
+ // write on empty cache
+ srand(0);
+ for (int i=0; i<100; i++) {
+ off_t off = rand() % 1000000;
+ size_t len = 100;
+ cout << "writing bit at " << off << " len " << len << endl;
+ fs.write(10, len, off, bl, 0);
+ }
+
+ }
+
fs.flush_all();
fs.trim_buffer_cache();
fs.trim_onode_cache();
bufferptr tempbp(bp, len, off);
push_back(tempbp);
}
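+  // append a copy of another list (claim_append would consume its argument,
+  // so work on a temporary)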
+ void append(bufferlist& bl) {
+ bufferlist temp = bl; // copy list
+ claim_append(temp); // and append
+ }
/*