#include "include/config.h"
#undef dout
-#define dout(l) if (l<=g_conf.debug) cout << "client" << whoami << "." << pthread_self() << " "
+#define dout(l) if (l<=g_conf.debug) cout << "client" << "." << pthread_self() << " "
return dn;
}
-
-
-
// -------
MClientReply *Client::make_request(MClientRequest *req)
client_lock.Unlock();
}
-
-
-
-
-
-
/*
* flush inode (write cached) buffers to disk
*/
+class C_Client_FileFlushFinish : public Context {
+public:
+ Filecache *fc;
+ Bufferhead *bh;
+ C_Client_FileFlushFinish(Filecache *fc, Bufferhead *bh) {
+ this->fc = fc;
+ this->bh = bh;
+ }
+ void finish(int r) {
+ bh->flush_finish();
+ if (fc->dirty_buffers.empty()) {
+ // wake up flush waiters
+ for (list<Cond*>::iterator it = fc->waitfor_flushed.begin();
+ it != fc->waitfor_flushed.end();
+ it++) {
+ (*it)->Signal();
+ }
+ fc->waitfor_flushed.clear();
+ }
+ }
+};
+
+
int Client::flush_inode_buffers(Inode *in)
{
- if (in->inflight_buffers.size()
- /* || in->dirty_buffers.size() */) {
- dout(7) << "inflight buffers, waiting" << endl;
- Cond *cond = new Cond;
- in->waitfor_flushed.push_back(cond);
- cond->Wait(client_lock);
- delete cond;
- assert(in->inflight_buffers.empty());
- dout(7) << "inflight buffers flushed" << endl;
+ if (!in->inflight_buffers.empty()) {
+ dout(7) << "inflight buffers of sync write, waiting" << endl;
+ Cond *cond = new Cond;
+ in->waitfor_flushed.push_back(cond);
+ cond->Wait(client_lock);
+ delete cond;
+ assert(in->inflight_buffers.empty());
+ dout(7) << "inflight buffers flushed" << endl;
+#ifdef BUFFERCACHE
+ } else if (!bc.get_fc(in->inode.ino)->dirty_buffers.empty()) {
+ dout(7) << "inode " << in->inode.ino << " has dirty buffers" << endl;
+ Filecache *fc = bc.get_fc(in->inode.ino);
+ fc->simplify();
+ for (set<Bufferhead*>::iterator it = fc->dirty_buffers.begin();
+ it != fc->dirty_buffers.end();
+ it++) {
+ (*it)->flush_start();
+ C_Client_FileFlushFinish *onfinish = new C_Client_FileFlushFinish(fc, *it);
+ filer->write(in->inode.ino, g_OSD_FileLayout, (*it)->bl.length(), (*it)->offset, (*it)->bl, 0, onfinish);
+ }
+ dout(7) << "dirty buffers, waiting" << endl;
+ fc->wait_for_flush(client_lock);
+#endif
} else {
dout(7) << "no inflight buffers" << endl;
}
}
+class C_Client_FlushFinish : public Context {
+public:
+ Buffercache *bc;
+ Bufferhead *bh;
+ C_Client_FlushFinish(Buffercache *bc, Bufferhead *bh) {
+ this->bc = bc;
+ this->bh = bh;
+ }
+ void finish(int r) {
+ bh->flush_finish();
+ if (bc->dirty_buffers.empty()) {
+ // wake up flush waiters
+ for (list<Cond*>::iterator it = bc->waitfor_flushed.begin();
+ it != bc->waitfor_flushed.end();
+ it++) {
+ (*it)->Signal();
+ }
+ bc->waitfor_flushed.clear();
+ }
+ }
+};
+
+int Client::flush_buffers()
+{
+ if (!bc.dirty_buffers.empty()) {
+ for (set<Bufferhead*>::iterator it = bc.dirty_buffers.begin();
+ it != bc.dirty_buffers.end();
+ it++) {
+ (*it)->flush_start();
+ C_Client_FlushFinish *onfinish = new C_Client_FlushFinish(&bc, *it);
+ filer->write((*it)->ino, g_OSD_FileLayout, (*it)->bl.length(), (*it)->offset, (*it)->bl, 0, onfinish);
+ }
+ dout(7) << "dirty buffers, waiting" << endl;
+ Cond cond;
+ bc.waitfor_flushed.push_back(&cond);
+ cond.Wait(client_lock);
+ } else {
+ dout(7) << "no dirty buffers" << endl;
+ }
+}
+
/*
* release inode (read cached) buffers from memory
*/
int Client::release_inode_buffers(Inode *in)
{
- dout(2) << "release_inode_buffers IMPLEMENT ME" << endl;
+#ifdef BUFFERCACHE
+ bc.release_file(in->inode.ino);
+#endif
}
if (mkfs) m->set_mkfs(mkfs);
client_lock.Unlock();
- MClientMountAck *reply = (MClientMountAck*)messenger->sendrecv(m,
- MSG_ADDR_MDS(0), MDS_PORT_SERVER);
+ MClientMountAck *reply = (MClientMountAck*)messenger->sendrecv(m, MSG_ADDR_MDS(0), MDS_PORT_SERVER);
client_lock.Lock();
assert(reply);
// FIXME where does FUSE maintain user information
req->set_caller_uid(getuid());
req->set_caller_gid(getgid());
+
+ // Make sure buffers are all clean!
+ flush_inode_buffers(in);
// take note of the fact that we're mid-close
/* mds may ack our close() after reissuing same fh to another open; remove from
Mutex *mutex;
int *rvalue;
bool finished;
- C_Client_Cond(Cond *cond, Mutex *mutex, int *rvalue) {
- this->cond = cond;
+ C_Client_Cond(Cond *cond, Mutex *mutex, int *rvalue) {
+ this->cond = cond;
+ this->mutex = mutex;
+ this->rvalue = rvalue;
+ this->finished = false;
+ }
+ void finish(int r) {
+ //mutex->Lock();
+ *rvalue = r;
+ finished = true;
+ cond->Signal();
+ //mutex->Unlock();
+ }
+};
+
+class C_Client_MissFinish : public Context {
+public:
+ Bufferhead *bh;
+ Mutex *mutex;
+ int *rvalue;
+ bool finished;
+ C_Client_MissFinish(Bufferhead *bh, Mutex *mutex, int *rvalue) {
+ this->bh = bh;
this->mutex = mutex;
this->rvalue = rvalue;
this->finished = false;
}
void finish(int r) {
//mutex->Lock();
- *rvalue = r;
+ *rvalue += r;
finished = true;
- cond->Signal();
+ bh->miss_finish();
//mutex->Unlock();
}
};
{
client_lock.Lock();
+ dout(7) << "read len: " << size << " off: " << offset << endl;
assert(fh_map.count(fh));
Fh *f = fh_map[fh];
Inode *in = f->inode;
}
if (cond) delete cond;
+
+ // determine whether read range overlaps with file
+ // FIXME: maybe we should stat the file again?
+ dout(10) << "file size: " << in->inode.size << endl;
+ if (offset >= in->inode.size) {
+ client_lock.Unlock();
+ return 0;
+ }
+ if (size > in->inode.size) size = in->inode.size;
+
int rvalue = 0;
- if (0) {
- // (some of) read from buffer?
- // .... bleh ....
- } else {
- // issue read
- Cond cond;
+#ifndef BUFFERCACHE
+ {
+ Cond cond;
+ bufferlist blist; // data will go here
- bufferlist blist; // data will go here
-
- C_Client_Cond *onfinish = new C_Client_Cond(&cond, &client_lock, &rvalue);
-
- filer->read(in->inode.ino, g_OSD_FileLayout, size, offset, &blist, onfinish);
-
- cond.Wait(client_lock);
+ C_Client_Cond *onfinish = new C_Client_Cond(&cond, &client_lock, &rvalue);
+ filer->read(in->inode.ino, g_OSD_FileLayout, size, offset, &blist, onfinish);
+ cond.Wait(client_lock);
- // copy data into caller's buf
- blist.copy(0, blist.length(), buf);
+ // copy data into caller's buf
+ blist.copy(0, blist.length(), buf);
}
+#else
+ // map buffercache
+ map<off_t, Bufferhead*> hits, inflight;
+ map<off_t, Bufferhead*>::iterator curbuf;
+ map<off_t, size_t> holes;
+ map<off_t, size_t>::iterator hole;
+
+ Filecache *fc = bc.get_fc(in->inode.ino);
+ curbuf = fc->map_existing(size, offset, hits, inflight, holes);
+
+ if (curbuf != fc->buffer_map.end() && hits.count(curbuf->first)) {
+ // sweet -- we can return stuff immediately: find out how much
+ dout(7) << "read bc hit" << endl;
+ rvalue = (int)bc.touch_continuous(hits, size, offset);
+ assert(rvalue > 0);
+ rvalue = fc->copy_out((size_t)rvalue, offset, buf);
+ assert(rvalue > 0);
+ dout(7) << "read bc hit: immediately returning " << rvalue << " bytes" << endl;
+ }
+ // issue reads for holes
+ int hole_rvalue = 0; //FIXME: don't really need to track rvalue in MissFinish context
+ for (hole = holes.begin(); hole != holes.end(); hole++) {
+ dout(7) << "read bc miss" << endl;
+ off_t hole_offset = hole->first;
+ size_t hole_size = hole->second;
+
+ // insert new bufferhead without allocating buffers (Filer::handle_osd_read_reply allocates them)
+ Bufferhead *bh = new Bufferhead(in->inode.ino, hole_offset, &bc);
+
+ // read into the buffercache: when finished transition state from inflight to clean
+ bh->miss_start();
+ C_Client_MissFinish *onfinish = new C_Client_MissFinish(bh, &client_lock, &hole_rvalue);
+ filer->read(in->inode.ino, g_OSD_FileLayout, hole_size, hole_offset, &(bh->bl), onfinish);
+ dout(7) << "read bc miss: issued osd read len: " << hole_size << " off: " << hole_offset << endl;
+ }
+
+ if (rvalue == 0) {
+ // we need to wait for the first buffer
+ dout(7) << "read bc miss: waiting for first buffer" << endl;
+ Bufferhead *bh;
+ if (curbuf == fc->buffer_map.end() && fc->buffer_map.count(offset)) {
+ dout(10) << "first buffer is currently read in" << endl;
+ bh = fc->buffer_map[offset];
+ } else {
+ dout(10) << "first buffer is either hit or inflight" << endl;
+ bh = curbuf->second;
+ }
+ if (bh->state == BUFHD_STATE_INFLIGHT) {
+ bh->wait_for_read(client_lock);
+ }
+
+ // buffer is filled -- see how much we can return
+ hits.clear(); inflight.clear(); holes.clear();
+ fc->map_existing(size, offset, hits, inflight, holes); // FIXME: overkill
+ assert(hits.count(bh->offset));
+ rvalue = bc.touch_continuous(hits, size, offset);
+ fc->copy_out(rvalue, offset, buf);
+ dout(7) << "read bc no hit: returned first " << rvalue << " bytes" << endl;
+ }
+#endif
+ // done!
client_lock.Unlock();
- return rvalue;
+ return rvalue;
}
-
// hack.. see async write() below
class C_Client_WriteBuffer : public Context {
public:
Inode *in;
bufferlist *blist;
C_Client_WriteBuffer(Inode *in, bufferlist *blist) {
- this->in = in;
- this->blist = blist;
+ this->in = in;
+ this->blist = blist;
}
void finish(int r) {
- in->inflight_buffers.erase(blist);
- delete blist;
-
- if (in->inflight_buffers.empty()) {
- // wake up flush waiters
- for (list<Cond*>::iterator it = in->waitfor_flushed.begin();
- it != in->waitfor_flushed.end();
- it++) {
- (*it)->Signal();
- }
- in->waitfor_flushed.clear();
- }
+ in->inflight_buffers.erase(blist);
+ delete blist;
+
+ if (in->inflight_buffers.empty()) {
+ // wake up flush waiters
+ for (list<Cond*>::iterator it = in->waitfor_flushed.begin();
+ it != in->waitfor_flushed.end();
+ it++) {
+ (*it)->Signal();
+ }
+ in->waitfor_flushed.clear();
+ }
}
};
+
int Client::write(fileh_t fh, const char *buf, size_t size, off_t offset)
{
client_lock.Lock();
// buffered write?
- if (false && f->caps & CFILE_CAP_WRBUFFER) {
+ if (f->caps & CFILE_CAP_WRBUFFER) {
// buffered write
- dout(10) << "buffered/async write" << endl;
-
- /*
- hack for now.. replace this with a real buffer cache
-
- just copy the buffer, send the write off, and return immediately.
- flush() will block until all outstanding writes complete.
- */
-
- bufferlist *blist = new bufferlist;
- blist->push_back( new buffer(buf, size, BUFFER_MODE_COPY|BUFFER_MODE_FREE) );
-
- in->inflight_buffers.insert(blist);
-
- Context *onfinish = new C_Client_WriteBuffer( in, blist );
- filer->write(in->inode.ino, g_OSD_FileLayout, size, offset, *blist, 0, onfinish);
+ dout(7) << "buffered/async write" << endl;
+
+#ifdef BUFFERCACHE
+ // map buffercache for writing
+ map<off_t, Bufferhead*> buffers, inflight;
+ bc.map_or_alloc(in->inode.ino, size, offset, buffers, inflight);
+
+ // wait for inflight buffers
+ while (!inflight.empty()) {
+ inflight.begin()->second->wait_for_write(client_lock);
+ buffers.clear(); inflight.clear();
+ bc.map_or_alloc(in->inode.ino, size, offset, buffers, inflight); // FIXME: overkill
+ }
+ bc.dirty(in->inode.ino, size, offset, buf);
+#else
+ /*
+ hack for now.. replace this with a real buffer cache
+
+ just copy the buffer, send the write off, and return immediately.
+ flush() will block until all outstanding writes complete.
+ */
+
+ bufferlist *blist = new bufferlist;
+ blist->push_back( new buffer(buf, size, BUFFER_MODE_COPY|BUFFER_MODE_FREE) );
+
+ in->inflight_buffers.insert(blist);
+
+ Context *onfinish = new C_Client_WriteBuffer( in, blist );
+ filer->write(in->inode.ino, g_OSD_FileLayout, size, offset, *blist, 0, onfinish);
+#endif
} else {
// synchronous write
- dout(10) << "synchronous write" << endl;
+ // FIXME: do not bypass buffercache
+ dout(7) << "synchronous write" << endl;
// create a buffer that refers to *buf, but doesn't try to free it when it's done.
bufferlist blist;