}
-void ObjectCacher::Object::merge(BufferHead *left, BufferHead *right)
+void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
{
assert(left->end() == right->start());
assert(left->get_state() == right->get_state());
- dout(10) << "merge " << *left << " + " << *right << endl;
+ dout(10) << "merge_left " << *left << " + " << *right << endl;
oc->bh_remove(this, right);
oc->bh_stat_sub(left);
left->set_length( left->length() + right->length());
left->bl.claim_append(right->bl);
// version
- // note: this is sorta busted, but shouldn't be used, cuz we're pbly about to write.. right?
+ // note: this is sorta busted, but should only be used for dirty buffers
left->last_write_tid = MAX( left->last_write_tid, right->last_write_tid );
+ left->last_write = MAX( left->last_write, right->last_write );
// waiters
for (map<off_t, list<Context*> >::iterator p = right->waitfor_read.begin();
// hose right
delete right;
- dout(10) << "merge result " << *left << endl;
+ dout(10) << "merge_left result " << *left << endl;
}
+/* possibly buggy, but more importantly, unnecessary.
+void ObjectCacher::Object::merge_right(BufferHead *left, BufferHead *right)
+{
+ assert(left->end() == right->start());
+ assert(left->get_state() == right->get_state());
+
+ dout(10) << "merge_right " << *left << " + " << *right << endl;
+ oc->bh_remove(this, left);
+ oc->bh_stat_sub(right);
+ data.erase(right->start());
+ right->set_start( left->start() );
+ data[right->start()] = right;
+ right->set_length( left->length() + right->length());
+ oc->bh_stat_add(right);
+
+ // data
+ bufferlist nbl;
+ nbl.claim(left->bl);
+ nbl.claim_append(right->bl);
+ right->bl.claim(nbl);
+
+ // version
+ // note: this is sorta busted, but should only be used for dirty buffers
+ right->last_write_tid = MAX( left->last_write_tid, right->last_write_tid );
+
+ // waiters
+ map<off_t,list<Context*> > old;
+ old.swap(right->waitfor_read);
+
+ // take left's waiters
+ right->waitfor_read.swap(left->waitfor_read);
+
+ // shift old waiters
+ for (map<off_t, list<Context*> >::iterator p = old.begin();
+ p != old.end();
+ p++)
+ right->waitfor_read[p->first + left->length()].swap( p->second );
+
+ // hose left
+ delete left;
+
+ dout(10) << "merge_right result " << *right << endl;
+}
+*/
+
/*
* map a range of bytes into buffer_heads.
* - create missing buffer_heads as necessary.
* map a range of extents on an object's buffer cache.
* - combine any bh's we're writing into one
* - break up bufferheads that don't fall completely within the range
+ * //no! - return a bh that includes the write. may also include other dirty data to left and/or right.
*/
ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr)
{
if (p != data.begin() &&
(p == data.end() || p->first > cur)) {
- p--; // might overlap!
+ p--; // might overlap or butt up!
+
+ /*// dirty and butts up?
+ if (p->first + p->second->length() == cur &&
+ p->second->is_dirty()) {
+ dout(10) << "map_write will append to tail of " << *p->second << endl;
+ final = p->second;
+ }
+ */
if (p->first + p->second->length() <= cur)
p++; // doesn't overlap.
}
BufferHead *bh = p->second;
dout(10) << "map_write bh " << *bh << " intersected" << endl;
- if (p->first < cur) {
+ /*if (bh->is_dirty()) {
+ // already dirty, let's use it.
+ final = bh;
+ } else {
+ */
+ if (p->first < cur) {
assert(final == 0);
- if (cur + max >= p->first + p->second->length()) {
- // we want right bit (one splice)
+ if (cur + max >= p->first + p->second->length()) {
+ // we want right bit (one splice)
final = split(bh, cur); // just split it, take right half.
- p++;
- assert(p->second == final);
- } else {
- // we want middle bit (two splices)
+ p++;
+ assert(p->second == final);
+ } else {
+ // we want middle bit (two splices)
final = split(bh, cur);
p++;
assert(p->second == final);
split(final, cur+max);
}
- } else if (p->first == cur) {
- if (p->second->length() <= max) {
- // whole bufferhead, piece of cake.
- } else {
- // we want left bit (one splice)
+ } else if (p->first == cur) {
+ /*if (bh->is_dirty()) {
+ // already dirty, use it.
+ }
+ else*/
+ if (p->second->length() <= max) {
+ // whole bufferhead, piece of cake.
+ } else {
+ // we want left bit (one splice)
split(bh, cur + max); // just split
}
if (final)
- merge(final,bh);
+ merge_left(final,bh);
else
final = bh;
- }
-
+ }
+
// keep going.
off_t lenfromcur = final->end() - cur;
cur += lenfromcur;
/* private */
-void ObjectCacher::bh_read(Object *ob, BufferHead *bh)
+void ObjectCacher::bh_read(BufferHead *bh)  // issue an object read for bh's extent; the owning object is reached through bh->ob
{
dout(7) << "bh_read on " << *bh << endl;
+ mark_rx(bh);  // presumably puts bh into the rx (read outstanding) state that bh_read_finish checks via is_rx() -- confirm
+
// finisher
- C_ReadFinish *onfinish = new C_ReadFinish(this, ob->get_oid(), bh->start(), bh->length());
+ C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_oid(), bh->start(), bh->length());  // completion context; its bl member receives the data
// go
- objecter->read(ob->get_oid(), bh->start(), bh->length(), &onfinish->bl,
+ objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), &onfinish->bl,  // request exactly bh's start/length from the object
onfinish);
}
<< opos << "~" << bh->start() - opos
<< endl;
opos = bh->start();
- p++;
continue;
}
if (!bh->is_rx()) {
dout(10) << "bh_read_finish skipping non-rx " << *bh << endl;
+ opos = bh->end();
+ p++;
continue;
}
+ assert(opos >= bh->start());
assert(bh->start() == opos); // we don't merge rx bh's... yet!
- assert(bh->length() < start+(off_t)length-opos);
+ assert(bh->length() <= start+(off_t)length-opos);
+ // bl holds the bytes that were read for [start, start+length), so the
+ // data belonging to this bh begins (opos - start) bytes into bl.
+ // Using opos - bh->start() here would always be 0 (see the assert that
+ // bh->start() == opos) and would hand every bh the head of the buffer.
bh->bl.substr_of(bl,
- start+length-opos,
+ opos-start,
bh->length());
mark_clean(bh);
dout(10) << "bh_read_finish read " << *bh << endl;
-
+
+ opos = bh->end();
+ p++;
+
// finishers?
// called with lock held.
list<Context*> ls;
}
-void ObjectCacher::bh_write(Object *ob, BufferHead *bh)
+void ObjectCacher::bh_write(BufferHead *bh)
{
dout(7) << "bh_write " << *bh << endl;
// finishers
- C_WriteAck *onack = new C_WriteAck(this, ob->get_oid(), bh->start(), bh->length());
- C_WriteCommit *oncommit = new C_WriteCommit(this, ob->get_oid(), bh->start(), bh->length());
+ C_WriteAck *onack = new C_WriteAck(this, bh->ob->get_oid(), bh->start(), bh->length());
+ C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->get_oid(), bh->start(), bh->length());
// go
- tid_t tid = objecter->write(ob->get_oid(), bh->start(), bh->length(), bh->bl,
+ tid_t tid = objecter->write(bh->ob->get_oid(), bh->start(), bh->length(), bh->bl,
onack, oncommit);
// set bh last_write_tid
onack->tid = tid;
oncommit->tid = tid;
- ob->last_write_tid = tid;
+ bh->ob->last_write_tid = tid;
bh->last_write_tid = tid;
mark_tx(bh);
Object *ob = objects[oid];
// apply to bh's!
- off_t opos = start;
-
- for (map<off_t, BufferHead*>::iterator p = ob->data.lower_bound(opos);
- p != ob->data.end() && opos < start+(off_t)length;
+ for (map<off_t, BufferHead*>::iterator p = ob->data.lower_bound(start);
+ p != ob->data.end();
p++) {
BufferHead *bh = p->second;
+ if (bh->start() > start+(off_t)length) break;
+
if (bh->start() < start &&
bh->end() > start+(off_t)length) {
dout(20) << "bh_write_ack skipping " << *bh << endl;
}
-void ObjectCacher::flush()
+void ObjectCacher::flush(off_t amount)  // write back dirty bhs taken from lru_dirty; amount is a byte budget, 0 means no budget
{
utime_t cutoff = g_clock.now();
//cutoff.sec_ref() -= g_conf.client_oc_max_dirty_age;
- dout(10) << "flush" << endl;
+ dout(10) << "flush " << amount << endl;
- while (1) {
+ off_t did = 0;  // summed length of the bhs handed to bh_write so far
+ while (amount == 0 || did < amount) {  // with amount == 0 only the breaks below end the loop
BufferHead *bh = (BufferHead*) lru_dirty.lru_get_next_expire();
if (!bh) break;
if (bh->last_write > cutoff) break;
- bh_write(bh->ob, bh);
+ did += bh->length();  // counted before the write is issued
+ bh_write(bh);
}
}
+
void ObjectCacher::trim(off_t max)
{
if (max < 0)
map<off_t, BufferHead*> hits, missing, rx;
o->map_read(rd, hits, missing, rx);
- if (!missing.empty() && !rx.empty()) {
+ if (!missing.empty() || !rx.empty()) {
// read missing
for (map<off_t, BufferHead*>::iterator bh_it = missing.begin();
bh_it != missing.end();
bh_it++) {
- bh_read(o, bh_it->second);
+ bh_read(bh_it->second);
if (success) {
dout(10) << "readx missed, waiting on " << *bh_it->second
<< " off " << bh_it->first << endl;
}
}
} else {
+ assert(!hits.empty());
+
// make a plain list
for (map<off_t, BufferHead*>::iterator bh_it = hits.begin();
bh_it != hits.end();
i != stripe_map.end();
i++) {
assert(pos == i->first);
+ dout(10) << "readx adding buffer len " << i->second.length() << " at " << pos << endl;
pos += i->second.length();
rd->bl->claim_append(i->second);
}
+ dout(10) << "readx result is " << rd->bl->length() << endl;
trim();
for (map<size_t,size_t>::iterator f_it = ex_it->buffer_extents.begin();
f_it != ex_it->buffer_extents.end();
f_it++) {
+ dout(10) << "writex writing " << f_it->first << "~" << f_it->second << " into " << *bh << " at " << opos << endl;
size_t bhoff = bh->start() - opos;
assert(f_it->second <= bh->length() - bhoff);
mark_dirty(bh);
touch_bh(bh);
bh->last_write = now;
+
+ // recombine with left?
+ // merge_left(left, right) asserts left->end() == right->start() and
+ // deletes `right`, so only merge with a dirty neighbour that actually
+ // butts up against bh, and always pass the lower-offset bh first.
+ map<off_t,BufferHead*>::iterator p = o->data.find(bh->start());
+ if (p != o->data.begin()) {
+   p--;
+   if (p->second->is_dirty() &&
+       p->second->end() == bh->start()) {
+     o->merge_left(p->second,bh);   // bh is absorbed into (and deleted in favour of) its left neighbour
+     bh = p->second;
+   }
+ }
+ // right?
+ p = o->data.find(bh->start());
+ p++;
+ if (p != o->data.end() &&
+     p->second->is_dirty() &&
+     p->second->start() == bh->end())
+   o->merge_left(bh,p->second);     // bh is the left-hand side here; the right neighbour is absorbed
}
delete wr;
// blocking wait for write.
void ObjectCacher::wait_for_write(size_t len, Mutex& lock)
{
- while (get_stat_dirty() + (off_t)len > g_conf.client_oc_max_dirty) {
+ while (get_stat_dirty() > g_conf.client_oc_max_dirty) {  // len no longer enters the test: block only while the dirty total is already over the limit
dout(10) << "wait_for_write waiting" << endl;
+ flusher_cond.Signal();  // wake the flusher, which waits on flusher_cond in flusher_entry()
stat_waiter++;
stat_cond.Wait(lock);
stat_waiter--;
}
}
+// Body of the background flusher thread.
+//
+// Takes `lock` and loops until flusher_stop is set.  Each pass logs the
+// cache totals, then either
+//  - asks flush() to write back the excess while the dirty total is above
+//    g_conf.client_oc_max_dirty, or
+//  - writes back bhs from the expire end of lru_dirty whose last_write is
+//    older than the cutoff (now, with sec_ref() decremented by one),
+// and finally waits on flusher_cond (signalled by wait_for_write()) with a
+// utime_t(1,0) timeout before starting the next pass.
+void ObjectCacher::flusher_entry()
+{
+  dout(10) << "flusher start" << endl;
+  lock.Lock();
+  while (!flusher_stop) {
+    while (!flusher_stop) {
+      off_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() + get_stat_dirty();
+      dout(11) << "flusher "
+               << all << " / " << g_conf.client_oc_size << ": "
+               << get_stat_tx() << " tx, "
+               << get_stat_rx() << " rx, "
+               << get_stat_clean() << " clean, "
+               << get_stat_dirty() << " / " << g_conf.client_oc_max_dirty << " dirty"
+               << endl;
+      if (get_stat_dirty() > g_conf.client_oc_max_dirty) {
+        // flush some dirty pages
+        dout(10) << "flusher "
+                 << get_stat_dirty() << " / " << g_conf.client_oc_max_dirty << " dirty,"
+                 << " flushing some dirty bhs" << endl;
+        off_t dirty_before = get_stat_dirty();
+        flush(dirty_before - g_conf.client_oc_max_dirty);
+        // flush() returns without writing anything when lru_dirty yields no
+        // bh or the next bh is newer than its cutoff.  If the dirty total
+        // did not drop, stop retrying and fall through to the timed wait
+        // rather than spinning here with the lock held.
+        if (get_stat_dirty() >= dirty_before) {
+          dout(10) << "flusher made no progress, waiting before retry" << endl;
+          break;
+        }
+      }
+      else {
+        // check tail of lru for old dirty items
+        utime_t cutoff = g_clock.now();
+        cutoff.sec_ref()--;
+        BufferHead *bh = 0;
+        while ((bh = (BufferHead*)lru_dirty.lru_get_next_expire()) != 0 &&
+               bh->last_write < cutoff) {
+          dout(10) << "flusher flushing aged dirty bh " << *bh << endl;
+          bh_write(bh);
+        }
+        break;
+      }
+    }
+    if (flusher_stop) break;
+    flusher_cond.WaitInterval(lock, utime_t(1,0));
+  }
+  lock.Unlock();
+  dout(10) << "flusher finish" << endl;
+}
+
+
// blocking. atomic+sync.
int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& lock)
}
if (!bh->is_dirty()) continue;
- bh_write(ob, bh);
+ bh_write(bh);
clean = false;
}
return clean;
finish_contexts(ls);
}
+
+
+