dout(7) << "bc: allocated " << bl.buffers().size() << " buffers (" << bl.length() << " bytes) " << endl;
}
-void Bufferhead::dirty() {
+// Begin a read miss on this buffer: it must currently be clean, becomes
+// in-flight, remembers miss_len, and is touched in the cache LRU.
+void Bufferhead::miss_start(size_t miss_len)
+{
+  // a miss may only be started on a clean buffer
+  assert(state == BUFHD_STATE_CLEAN);
+  this->miss_len = miss_len;
+  state = BUFHD_STATE_INFLIGHT;
+  bc->lru.lru_touch(this);
+}
+
+// Finish a read miss: the buffer must be in flight; it goes back to the clean
+// state and both the read waiters and the write waiters are woken.
+void Bufferhead::miss_finish()
+{
+  assert(state == BUFHD_STATE_INFLIGHT);
+  //assert(bl.length() == miss_len);
+  state = BUFHD_STATE_CLEAN;
+
+  // readers first, then writers
+  wakeup_read_waiters();
+  wakeup_write_waiters();
+}
+
+void Bufferhead::dirty()
+{
if (state == BUFHD_STATE_CLEAN) {
dout(10) << "bc: dirtying clean buffer size: " << bl.length() << endl;
state = BUFHD_STATE_DIRTY;
dirty_since = time(NULL); // start clock for dirty buffer here
+ bc->lru.lru_touch(this);
bc->clean_to_dirty(bl.length());
- dout(10) << "bc: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " flushing_size: " << bc->get_flushing_size() << endl;
+ dout(6) << "bc: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " flushing_size: " << bc->get_flushing_size() << " age: " << bc->dirty_buffers.get_age() << endl;
assert(!bc->dirty_buffers.exist(this));
bc->dirty_buffers.insert(this);
get();
}
}
-void Bufferhead::leave_dirtybuffers() {
+void Bufferhead::dirtybuffers_erase()
+{
dout(10) << "bc: erase in dirtybuffers size: " << bl.length() << " in state " << state << endl;
assert(bc->dirty_buffers.exist(this));
bc->dirty_buffers.erase(this);
put();
}
-void Bufferhead::flush_start() {
+void Bufferhead::flush_start()
+{
dout(10) << "bc: flush_start" << endl;
assert(state == BUFHD_STATE_DIRTY);
state = BUFHD_STATE_INFLIGHT;
- leave_dirtybuffers();
+ dirtybuffers_erase();
bc->dirty_to_flushing(bl.length());
- dout(10) << "bc: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " flushing_size: " << bc->get_flushing_size() << endl;
+ dout(6) << "bc: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " flushing_size: " << bc->get_flushing_size() << " age: " << bc->dirty_buffers.get_age() << endl;
}
-void Bufferhead::flush_finish() {
+void Bufferhead::flush_finish()
+{
dout(10) << "bc: flush_finish" << endl;
assert(state == BUFHD_STATE_INFLIGHT);
state = BUFHD_STATE_CLEAN;
bc->flushing_to_clean(bl.length());
- dout(10) << "bc: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " flushing_size: " << bc->get_flushing_size() << endl;
+ dout(6) << "bc: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " flushing_size: " << bc->get_flushing_size() << " age: " << bc->dirty_buffers.get_age() << endl;
wakeup_write_waiters(); // readers never wait on flushes
}
// -- Dirtybuffers methods
+// Remove bh from the dirty set. Entries are keyed by dirty_since, so only the
+// entries that share bh's timestamp are scanned. Asserts that exactly one
+// entry was removed.
+void Dirtybuffers::erase(Bufferhead* bh)
+{
+  dout(7) << "dirtybuffer: erase bh->ino: " << bh->ino << " offset: " << bh->offset << endl;
+  typedef multimap<time_t, Bufferhead*>::iterator dbuf_iter;
+  const size_t osize = _dbufs.size();
+  // look the key range up once instead of calling upper_bound() on every iteration
+  pair<dbuf_iter, dbuf_iter> range = _dbufs.equal_range(bh->dirty_since);
+  for (dbuf_iter it = range.first; it != range.second; ++it) {
+    if (it->second == bh) {
+      _dbufs.erase(it);
+      break;   // 'it' is invalid now; at most one entry per buffer is removed
+    }
+  }
+  // unsigned-safe form of "size shrank by exactly one"
+  assert(_dbufs.size() + 1 == osize);
+}
+
+// Add bh to the dirty set, keyed by its dirty_since timestamp.
+void Dirtybuffers::insert(Bufferhead* bh)
+{
+  dout(7) << "dirtybuffer: insert bh->ino: " << bh->ino << " offset: " << bh->offset << endl;
+  typedef multimap<time_t, Bufferhead*>::value_type dbuf_entry;
+  _dbufs.insert(dbuf_entry(bh->dirty_since, bh));
+}
+
+// Return true if bh is present in the dirty set. Only the entries stored
+// under bh->dirty_since are examined.
+bool Dirtybuffers::exist(Bufferhead* bh)
+{
+  typedef multimap<time_t, Bufferhead*>::iterator dbuf_iter;
+  pair<dbuf_iter, dbuf_iter> same_time = _dbufs.equal_range(bh->dirty_since);
+  dbuf_iter cur = same_time.first;
+  while (cur != same_time.second) {
+    if (cur->second == bh) {
+      dout(10) << "dirtybuffer: found bh->ino: " << bh->ino << " offset: " << bh->offset << endl;
+      return true;
+    }
+    ++cur;
+  }
+  return false;
+}
+
+
void Dirtybuffers::get_expired(time_t ttl, size_t left_dirty, list<Bufferhead*>& to_flush)
{
time_t now = time(NULL);
Bufferhead *bh = next->second;
start->second->claim_append(bh);
if (bh->state == BUFHD_STATE_DIRTY) {
- bh->leave_dirtybuffers();
+ bh->dirtybuffers_erase();
bh->state = BUFHD_STATE_CLEAN;
}
removed.push_back(bh);
decrease_size(it->second->bl.length());
- dout(10) << "bc: clean_size: " << get_clean_size() << " dirty_size: " << get_dirty_size() << " flushing_size: " << get_flushing_size() << endl;
+ dout(6) << "bc: clean_size: " << get_clean_size() << " dirty_size: " << get_dirty_size() << " flushing_size: " << get_flushing_size() << " age: " << dirty_buffers.get_age() << endl;
assert(clean_size >= 0);
delete it->second;
}
delete fc;
}
+// Expire buffers from the LRU and append them to `reclaimed` until at least
+// min_size bytes have been collected or the LRU has nothing more to expire.
+void Buffercache::get_reclaimable(size_t min_size, list<Bufferhead*>& reclaimed)
+{
+  size_t remaining = min_size;
+  while (remaining > 0) {
+    Bufferhead *bh = (Bufferhead*)lru.lru_expire();
+    if (!bh) break;   // nothing left to expire
+    reclaimed.push_back(bh);
+    const size_t len = bh->bl.length();
+    // 'remaining' is unsigned: clamp at zero instead of subtracting past it,
+    // which would wrap to a huge value and drain the whole LRU.
+    remaining = (len >= remaining) ? 0 : remaining - len;
+  }
+}
+
+
size_t Buffercache::reclaim(size_t min_size)
{
dout(7) << "bc: reclaim min_size: " << min_size << endl;
decrease_size(bh->bl.length());
- dout(10) << "bc: clean_size: " << get_clean_size() << " dirty_size: " << get_dirty_size() << " flushing_size: " << get_flushing_size() << endl;
+ dout(6) << "bc: clean_size: " << get_clean_size() << " dirty_size: " << get_dirty_size() << " flushing_size: " << get_flushing_size() << " age: " << dirty_buffers.get_age() << endl;
assert(clean_size >= 0);
bh->fc->buffer_map.erase(bh->offset);
if (bh->fc->buffer_map.empty()) {
write_waiters.clear();
}
- void miss_start(size_t miss_len) {
- assert(state == BUFHD_STATE_CLEAN);
- state = BUFHD_STATE_INFLIGHT;
- this->miss_len = miss_len;
- }
-
- void miss_finish() {
- assert(state == BUFHD_STATE_INFLIGHT);
- state = BUFHD_STATE_CLEAN;
- //assert(bl.length() == miss_len);
- wakeup_read_waiters();
- wakeup_write_waiters();
- }
-
+ void miss_start(size_t miss_len);
+ void miss_finish();
void dirty();
- void leave_dirtybuffers();
+ void dirtybuffers_erase();
void flush_start();
void flush_finish();
void claim_append(Bufferhead* other);
multimap<time_t, Bufferhead*> _dbufs;
public:
- void erase(Bufferhead* bh) {
- dout(7) << "dirtybuffer: erase bh->ino: " << bh->ino << " offset: " << bh->offset << endl;
- int osize = _dbufs.size();
- for (multimap<time_t, Bufferhead*>::iterator it = _dbufs.lower_bound(bh->dirty_since);
- it != _dbufs.upper_bound(bh->dirty_since);
- it++) {
- if (it->second == bh) {
- _dbufs.erase(it);
- break;
- }
- }
- assert(_dbufs.size() == osize - 1);
- }
-
- void insert(Bufferhead* bh) {
- dout(7) << "dirtybuffer: insert bh->ino: " << bh->ino << " offset: " << bh->offset << endl;
- _dbufs.insert(pair<time_t, Bufferhead*>(bh->dirty_since, bh));
- }
-
+ void erase(Bufferhead* bh);
+ void insert(Bufferhead* bh);
bool empty() { return _dbufs.empty(); }
-
- bool exist(Bufferhead* bh) {
- for (multimap<time_t, Bufferhead*>::iterator it = _dbufs.lower_bound(bh->dirty_since);
- it != _dbufs.upper_bound(bh->dirty_since);
- it++ ) {
- if (it->second == bh) {
- dout(10) << "dirtybuffer: found bh->ino: " << bh->ino << " offset: " << bh->offset << endl;
- return true;
- }
- }
- return false;
- }
+ bool exist(Bufferhead* bh);
void get_expired(time_t ttl, size_t left_dirty, list<Bufferhead*>& to_flush);
+ // Seconds elapsed since the dirty_since time of the first entry in _dbufs
+ // (the map is ordered by dirty_since, so that is the earliest timestamp).
+ // Returns 0 when there are no dirty buffers, rather than falling off the
+ // end of the function without a return value.
+ time_t get_age() {
+   if (_dbufs.empty()) return 0;
+   return time(NULL) - _dbufs.begin()->second->dirty_since;
+ }
};
LRU lru;
Dirtybuffers dirty_buffers;
list<Cond*> waitfor_flushed;
+ set<Bufferhead*> flushing_buffers;
Buffercache() : dirty_size(0), flushing_size(0), clean_size(0) { }
return bcache_map[ino];
}
+ // Block the caller until the flush-finish handler signals that no buffers
+ // remain in flushing_buffers. `lock` is passed straight to Cond::Wait.
+ // NOTE(review): assumes the caller holds `lock` and that flushing_buffers
+ // and waitfor_flushed are only touched under it -- confirm against callers.
+ void wait_for_flush(Mutex &lock) {
+   // Nothing in flight means nobody will ever signal us; do not block.
+   if (flushing_buffers.empty()) return;
+   Cond cond;
+   waitfor_flushed.push_back(&cond);
+   cond.Wait(lock);
+   // The signaller clears the list, so this is normally a no-op. It only
+   // matters if Wait returned without that signal: 'cond' lives on this
+   // stack frame and must not stay registered after we return.
+   waitfor_flushed.remove(&cond);
+ }
+
void clean_to_dirty(size_t size) {
clean_size -= size;
assert(clean_size >= 0);
size_t get_clean_size() { return clean_size; }
size_t get_dirty_size() { return dirty_size; }
size_t get_flushing_size() { return flushing_size; }
+ size_t get_total_size() { return clean_size + dirty_size + flushing_size; }
+ void get_reclaimable(size_t min_size, list<Bufferhead*>&);
void insert(Bufferhead *bh);
void dirty(inodeno_t ino, size_t size, off_t offset, const char *src);
};
-int Client::flush_inode_buffers(Inode *in)
+void Client::flush_inode_buffers(Inode *in)
{
if (!in->inflight_buffers.empty()) {
dout(7) << "inflight buffers of sync write, waiting" << endl;
Bufferhead *bh;
C_Client_FlushFinish(Bufferhead *bh) {
this->bh = bh;
+ bh->bc->flushing_buffers.insert(bh);
}
void finish(int r) {
bh->flush_finish();
+ bh->bc->flushing_buffers.erase(bh);
+ if (bh->bc->flushing_buffers.empty()) {
+ for (list<Cond*>::iterator it = bh->bc->waitfor_flushed.begin();
+ it != bh->bc->waitfor_flushed.end();
+ it++) {
+ (*it)->Signal();
+ }
+ bh->bc->waitfor_flushed.clear();
+ }
}
};
-int Client::flush_buffers(int ttl, size_t dirty_size)
+void Client::flush_buffers(int ttl, size_t dirty_size)
{
// ttl = 0 or dirty_size = 0: flush all
if (!bc.dirty_buffers.empty()) {
(*it)->flush_start();
C_Client_FlushFinish *onfinish = new C_Client_FlushFinish(*it);
filer->write((*it)->ino, g_OSD_FileLayout, (*it)->bl.length(), (*it)->offset, (*it)->bl, 0, onfinish);
+ bc.wait_for_flush(client_lock);
}
} else {
dout(7) << "no dirty buffers" << endl;
}
}
+// Trim the buffer cache when its total size exceeds
+// g_conf.client_bcache_size: if the dirty share is above the high-water
+// fraction, flush towards the low-water fraction first, then reclaim the
+// excess over the configured size.
+void Client::trim_bcache()
+{
+  if (bc.get_total_size() <= (size_t) g_conf.client_bcache_size) return;  // within budget
+
+  // need to free buffers
+  if (bc.get_dirty_size() > g_conf.client_bcache_hiwater * bc.get_total_size()) {
+    // flush buffers until we have low water mark.
+    // The whole product is cast: a cast binds tighter than '*', so casting
+    // the fraction alone truncates it to 0 and the target is always 0.
+    size_t want_target_size = (size_t) (g_conf.client_bcache_lowater * bc.get_total_size());
+    flush_buffers(g_conf.client_bcache_ttl, want_target_size);
+  }
+  // Now reclaim buffers
+  bc.reclaim(bc.get_total_size() - g_conf.client_bcache_size);
+}
+
+
+
/*
* release inode (read cached) buffers from memory
*/
-int Client::release_inode_buffers(Inode *in)
+void Client::release_inode_buffers(Inode *in)
{
#ifdef BUFFERCACHE
- bc.release_file(in->inode.ino);
+ // Check first we actually cached the file
+ if (bc.bcache_map.count(in->inode.ino)) bc.release_file(in->inode.ino);
#endif
}
-
void Client::handle_file_caps(MClientFileCaps *m)
{
Fh *f = fh_map[m->get_fh()];
release_inode_buffers(in);
put_inode( in );
-
+
MClientReply *reply = make_request(req, true, mds_auth);
assert(reply);
int result = reply->get_result();
rvalue = bc.touch_continuous(hits, size, offset);
fc->copy_out(rvalue, offset, buf);
dout(7) << "read bc no hit: returned first " << rvalue << " bytes" << endl;
+
+ trim_bcache();
}
#endif
// done!
}
bc.dirty(in->inode.ino, size, offset, buf);
+ trim_bcache();
+
/*
hack for now.. replace this with a real buffer cache
// buffer cache
Buffercache bc;
- int flush_buffers(int ttl, size_t dirty_size); // flush dirty buffers
- int flush_inode_buffers(Inode *in); // flush buffered writes
- int release_inode_buffers(Inode *in); // release cached reads
+ void flush_buffers(int ttl, size_t dirty_size); // flush dirty buffers
+ void trim_bcache();
+ void flush_inode_buffers(Inode *in); // flush buffered writes
+ void release_inode_buffers(Inode *in); // release cached reads
friend class SyntheticClient;
client_cache_mid: .5,
client_cache_stat_ttl: 10, // seconds until cached stat results become invalid
client_use_random_mds: false,
+
client_bcache_alloc_minsize: 1024,
client_bcache_alloc_maxsize: 262144,
client_bcache_ttl: 30, // seconds until dirty buffers are written to disk
+ client_bcache_size: 10485760, // 10MB *for testing*
+ client_bcache_lowater: .6, // fraction of size
+ client_bcache_hiwater: .8,
+ client_bcache_maxfrag: 10, // max actual relative # of bheads over opt rel # of bheads
+
client_trace: 0,
fuse_direct_io: 1,
float client_cache_mid;
int client_cache_stat_ttl;
bool client_use_random_mds; // debug flag
+
int client_bcache_alloc_minsize;
int client_bcache_alloc_maxsize;
int client_bcache_ttl;
+ int client_bcache_size;
+ float client_bcache_lowater;
+ float client_bcache_hiwater;
+ int client_bcache_maxfrag;
+
int client_trace;
int fuse_direct_io;