From fff39240db7cba67fed97cd3ceeb604586d2188a Mon Sep 17 00:00:00 2001 From: carlosm Date: Tue, 5 Jul 2005 19:21:26 +0000 Subject: [PATCH] Modified Files: Makefile client/Client.cc client/Client.h include/buffer.h include/bufferlist.h Buffercache-related changes in client/Client.* and include/buffer* Buffercache still buggy -- compile with -DBUFFERCACHE to enable code in client/Client.cc. git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@399 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/Makefile | 18 +-- ceph/client/Client.cc | 325 +++++++++++++++++++++++++++++--------- ceph/client/Client.h | 5 + ceph/include/buffer.h | 2 +- ceph/include/bufferlist.h | 2 +- 5 files changed, 266 insertions(+), 86 deletions(-) diff --git a/ceph/Makefile b/ceph/Makefile index c70143f40c36e..78978d152b439 100644 --- a/ceph/Makefile +++ b/ceph/Makefile @@ -68,13 +68,13 @@ gprof-helper.so: test/gprof-helper.c import: mds/allmds.o osd/OSD.o msg/FakeMessenger.o import.cc ${COMMON_OBJS} ${CC} ${CFLAGS} ${LIBS} $^ -o $@ -singleclient: mds/allmds.o osd/OSD.o fakesingleclient.o client/Client.o msg/FakeMessenger.o fsck.o ${COMMON_OBJS} +singleclient: mds/allmds.o osd/OSD.o fakesingleclient.o client/Client.o client/Buffercache.o msg/FakeMessenger.o fsck.o ${COMMON_OBJS} ${CC} ${CFLAGS} ${LIBS} $^ -o $@ tp: osd/tp.o ${CC} ${CFLAGS} ${LIBS} $^ -o $@ -fuseclient: client/Client.o client/fuse.o msg/FakeMessenger.o ${COMMON_OBJS} +fuseclient: client/Client.o client/Buffercache.o client/fuse.o msg/FakeMessenger.o ${COMMON_OBJS} ${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@ fakemds: test/fakemds.cc msg/FakeMessenger.o fakeclient/FakeClient.o osd/OSD.o mds/allmds.o ${COMMON_OBJS} @@ -86,25 +86,25 @@ mpitest: test/mpitest.o msg/MPIMessenger.cc mds/allmds.o osd/OSD.o fakeclient/Fa mttest: test/mttest.cc msg/MTMessenger.cc ${COMMON_OBJS} ${MPICC} ${CFLAGS} ${LIBS} $^ -o $@ -mpifuse: mpifuse.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/MPIMessenger.cc ${COMMON_OBJS} +mpifuse: mpifuse.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/MPIMessenger.cc ${COMMON_OBJS} ${MPICC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@ -tcpfuse: tcpfuse.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/TCPMessenger.cc ${COMMON_OBJS} +tcpfuse: tcpfuse.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/TCPMessenger.cc ${COMMON_OBJS} ${MPICC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@ -mpisyn: mpisyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/MPIMessenger.cc ${COMMON_OBJS} +mpisyn: mpisyn.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.o msg/MPIMessenger.cc ${COMMON_OBJS} ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@ -tcpsyn: tcpsyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/TCPMessenger.cc ${COMMON_OBJS} +tcpsyn: tcpsyn.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.o msg/TCPMessenger.cc ${COMMON_OBJS} ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@ -obfstest: tcpsyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.cc osd/OBFSStore.o msg/TCPMessenger.cc ${COMMON_OBJS} +obfstest: tcpsyn.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.cc osd/OBFSStore.o msg/TCPMessenger.cc ${COMMON_OBJS} ${MPICC} -DUSE_OBFS ${MPICFLAGS} ${MPILIBS} $^ -o $@ ../uofs/uofs.a -fakesyn: fakesyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/FakeMessenger.o ${COMMON_OBJS} +fakesyn: fakesyn.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.o msg/FakeMessenger.o ${COMMON_OBJS} ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@ -fakefuse: fakefuse.cc mds/allmds.o client/Client.o osd/OSD.o client/fuse.o msg/FakeMessenger.cc ${COMMON_OBJS} +fakefuse: fakefuse.cc mds/allmds.o client/Client.o client/Buffercache.o osd/OSD.o client/fuse.o msg/FakeMessenger.cc ${COMMON_OBJS} ${CC} -pg ${CFLAGS} ${LIBS} -lfuse $^ -o $@ testmpi: test/testmpi.cc msg/MPIMessenger.cc config.o common/Timer.o common/clock.o msg/Messenger.o msg/Dispatcher.o msg/error.o diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index 58eaf6cc6bb12..b7f86543bb04d 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -21,7 +21,7 @@ #include "include/config.h" #undef dout -#define dout(l) if (l<=g_conf.debug) cout << "client" << whoami << "." << pthread_self() << " " +#define dout(l) if (l<=g_conf.debug) cout << "client" << "." << pthread_self() << " " @@ -286,9 +286,6 @@ Dentry *Client::lookup(filepath& path) return dn; } - - - // ------- MClientReply *Client::make_request(MClientRequest *req) @@ -360,37 +357,111 @@ void Client::dispatch(Message *m) client_lock.Unlock(); } - - - - - - /* * flush inode (write cached) buffers to disk */ +class C_Client_FileFlushFinish : public Context { +public: + Filecache *fc; + Bufferhead *bh; + C_Client_FileFlushFinish(Filecache *fc, Bufferhead *bh) { + this->fc = fc; + this->bh = bh; + } + void finish(int r) { + bh->flush_finish(); + if (fc->dirty_buffers.empty()) { + // wake up flush waiters + for (list::iterator it = fc->waitfor_flushed.begin(); + it != fc->waitfor_flushed.end(); + it++) { + (*it)->Signal(); + } + fc->waitfor_flushed.clear(); + } + } +}; + + int Client::flush_inode_buffers(Inode *in) { - if (in->inflight_buffers.size() - /* || in->dirty_buffers.size() */) { - dout(7) << "inflight buffers, waiting" << endl; - Cond *cond = new Cond; - in->waitfor_flushed.push_back(cond); - cond->Wait(client_lock); - delete cond; - assert(in->inflight_buffers.empty()); - dout(7) << "inflight buffers flushed" << endl; + if (!in->inflight_buffers.empty()) { + dout(7) << "inflight buffers of sync write, waiting" << endl; + Cond *cond = new Cond; + in->waitfor_flushed.push_back(cond); + cond->Wait(client_lock); + delete cond; + assert(in->inflight_buffers.empty()); + dout(7) << "inflight buffers flushed" << endl; +#ifdef BUFFERCACHE + } else if (!bc.get_fc(in->inode.ino)->dirty_buffers.empty()) { + dout(7) << "inode " << in->inode.ino << " has dirty buffers" << endl; + Filecache *fc = bc.get_fc(in->inode.ino); + fc->simplify(); + for (set::iterator it = fc->dirty_buffers.begin(); + it != fc->dirty_buffers.end(); + it++) { + (*it)->flush_start(); + C_Client_FileFlushFinish *onfinish = new C_Client_FileFlushFinish(fc, *it); + filer->write(in->inode.ino, g_OSD_FileLayout, (*it)->bl.length(), (*it)->offset, (*it)->bl, 0, onfinish); + } + dout(7) << "dirty buffers, waiting" << endl; + fc->wait_for_flush(client_lock); +#endif } else { dout(7) << "no inflight buffers" << endl; } } +class C_Client_FlushFinish : public Context { +public: + Buffercache *bc; + Bufferhead *bh; + C_Client_FlushFinish(Buffercache *bc, Bufferhead *bh) { + this->bc = bc; + this->bh = bh; + } + void finish(int r) { + bh->flush_finish(); + if (bc->dirty_buffers.empty()) { + // wake up flush waiters + for (list::iterator it = bc->waitfor_flushed.begin(); + it != bc->waitfor_flushed.end(); + it++) { + (*it)->Signal(); + } + bc->waitfor_flushed.clear(); + } + } +}; + +int Client::flush_buffers() +{ + if (!bc.dirty_buffers.empty()) { + for (set::iterator it = bc.dirty_buffers.begin(); + it != bc.dirty_buffers.end(); + it++) { + (*it)->flush_start(); + C_Client_FlushFinish *onfinish = new C_Client_FlushFinish(&bc, *it); + filer->write((*it)->ino, g_OSD_FileLayout, (*it)->bl.length(), (*it)->offset, (*it)->bl, 0, onfinish); + } + dout(7) << "dirty buffers, waiting" << endl; + Cond cond; + bc.waitfor_flushed.push_back(&cond); + cond.Wait(client_lock); + } else { + dout(7) << "no dirty buffers" << endl; + } +} + /* * release inode (read cached) buffers from memory */ int Client::release_inode_buffers(Inode *in) { - dout(2) << "release_inode_buffers IMPLEMENT ME" << endl; +#ifdef BUFFERCACHE + bc.release_file(in->inode.ino); +#endif } @@ -475,8 +546,7 @@ int Client::mount(int mkfs) if (mkfs) m->set_mkfs(mkfs); client_lock.Unlock(); - MClientMountAck *reply = (MClientMountAck*)messenger->sendrecv(m, - MSG_ADDR_MDS(0), MDS_PORT_SERVER); + MClientMountAck *reply = (MClientMountAck*)messenger->sendrecv(m, MSG_ADDR_MDS(0), MDS_PORT_SERVER); client_lock.Lock(); assert(reply); @@ -1026,6 +1096,9 @@ int Client::close(fileh_t fh) // FIXME where does FUSE maintain user information req->set_caller_uid(getuid()); req->set_caller_gid(getgid()); + + // Make sure buffers are all clean! + flush_inode_buffers(in); // take note of the fact that we're mid-close /* mds may ack our close() after reissuing same fh to another open; remove from @@ -1061,17 +1134,38 @@ public: Mutex *mutex; int *rvalue; bool finished; - C_Client_Cond(Cond *cond, Mutex *mutex, int *rvalue) { - this->cond = cond; + C_Client_Cond(Cond *cond, Mutex *mutex, int *rvalue) { + this->cond = cond; + this->mutex = mutex; + this->rvalue = rvalue; + this->finished = false; + } + void finish(int r) { + //mutex->Lock(); + *rvalue = r; + finished = true; + cond->Signal(); + //mutex->Unlock(); + } +}; + +class C_Client_MissFinish : public Context { +public: + Bufferhead *bh; + Mutex *mutex; + int *rvalue; + bool finished; + C_Client_MissFinish(Bufferhead *bh, Mutex *mutex, int *rvalue) { + this->bh = bh; this->mutex = mutex; this->rvalue = rvalue; this->finished = false; } void finish(int r) { //mutex->Lock(); - *rvalue = r; + *rvalue += r; finished = true; - cond->Signal(); + bh->miss_finish(); //mutex->Unlock(); } }; @@ -1081,6 +1175,7 @@ int Client::read(fileh_t fh, char *buf, size_t size, off_t offset) { client_lock.Lock(); + dout(7) << "read len: " << size << " off: " << offset << endl; assert(fh_map.count(fh)); Fh *f = fh_map[fh]; Inode *in = f->inode; @@ -1095,56 +1190,121 @@ int Client::read(fileh_t fh, char *buf, size_t size, off_t offset) } if (cond) delete cond; + + // determine whether read range overlaps with file + // FIXME: maybe we should stat the file again? + dout(10) << "file size: " << in->inode.size << endl; + if (offset >= in->inode.size) { + client_lock.Unlock(); + return 0; + } + if (size > in->inode.size) size = in->inode.size; + int rvalue = 0; - if (0) { - // (some of) read from buffer? - // .... bleh .... - } else { - // issue read - Cond cond; +#ifndef BUFFERCACHE + { + Cond cond; + bufferlist blist; // data will go here - bufferlist blist; // data will go here - - C_Client_Cond *onfinish = new C_Client_Cond(&cond, &client_lock, &rvalue); - - filer->read(in->inode.ino, g_OSD_FileLayout, size, offset, &blist, onfinish); - - cond.Wait(client_lock); + C_Client_Cond *onfinish = new C_Client_Cond(&cond, &client_lock, &rvalue); + filer->read(in->inode.ino, g_OSD_FileLayout, size, offset, &blist, onfinish); + cond.Wait(client_lock); - // copy data into caller's buf - blist.copy(0, blist.length(), buf); + // copy data into caller's buf + blist.copy(0, blist.length(), buf); } +#else + // map buffercache + map hits, inflight; + map::iterator curbuf; + map holes; + map::iterator hole; + + Filecache *fc = bc.get_fc(in->inode.ino); + curbuf = fc->map_existing(size, offset, hits, inflight, holes); + + if (curbuf != fc->buffer_map.end() && hits.count(curbuf->first)) { + // sweet -- we can return stuff immediately: find out how much + dout(7) << "read bc hit" << endl; + rvalue = (int)bc.touch_continuous(hits, size, offset); + assert(rvalue > 0); + rvalue = fc->copy_out((size_t)rvalue, offset, buf); + assert(rvalue > 0); + dout(7) << "read bc hit: immediately returning " << rvalue << " bytes" << endl; + } + // issue reads for holes + int hole_rvalue = 0; //FIXME: don't really need to track rvalue in MissFinish context + for (hole = holes.begin(); hole != holes.end(); hole++) { + dout(7) << "read bc miss" << endl; + off_t hole_offset = hole->first; + size_t hole_size = hole->second; + + // insert new bufferhead without allocating buffers (Filer::handle_osd_read_reply allocates them) + Bufferhead *bh = new Bufferhead(in->inode.ino, hole_offset, &bc); + + // read into the buffercache: when finished transition state from inflight to clean + bh->miss_start(); + C_Client_MissFinish *onfinish = new C_Client_MissFinish(bh, &client_lock, &hole_rvalue); + filer->read(in->inode.ino, g_OSD_FileLayout, hole_size, hole_offset, &(bh->bl), onfinish); + dout(7) << "read bc miss: issued osd read len: " << hole_size << " off: " << hole_offset << endl; + } + + if (rvalue == 0) { + // we need to wait for the first buffer + dout(7) << "read bc miss: waiting for first buffer" << endl; + Bufferhead *bh; + if (curbuf == fc->buffer_map.end() && fc->buffer_map.count(offset)) { + dout(10) << "first buffer is currently read in" << endl; + bh = fc->buffer_map[offset]; + } else { + dout(10) << "first buffer is either hit or inflight" << endl; + bh = curbuf->second; + } + if (bh->state == BUFHD_STATE_INFLIGHT) { + bh->wait_for_read(client_lock); + } + + // buffer is filled -- see how much we can return + hits.clear(); inflight.clear(); holes.clear(); + fc->map_existing(size, offset, hits, inflight, holes); // FIXME: overkill + assert(hits.count(bh->offset)); + rvalue = bc.touch_continuous(hits, size, offset); + fc->copy_out(rvalue, offset, buf); + dout(7) << "read bc no hit: returned first " << rvalue << " bytes" << endl; + } +#endif + // done! client_lock.Unlock(); - return rvalue; + return rvalue; } - // hack.. see async write() below class C_Client_WriteBuffer : public Context { public: Inode *in; bufferlist *blist; C_Client_WriteBuffer(Inode *in, bufferlist *blist) { - this->in = in; - this->blist = blist; + this->in = in; + this->blist = blist; } void finish(int r) { - in->inflight_buffers.erase(blist); - delete blist; - - if (in->inflight_buffers.empty()) { - // wake up flush waiters - for (list::iterator it = in->waitfor_flushed.begin(); - it != in->waitfor_flushed.end(); - it++) { - (*it)->Signal(); - } - in->waitfor_flushed.clear(); - } + in->inflight_buffers.erase(blist); + delete blist; + + if (in->inflight_buffers.empty()) { + // wake up flush waiters + for (list::iterator it = in->waitfor_flushed.begin(); + it != in->waitfor_flushed.end(); + it++) { + (*it)->Signal(); + } + in->waitfor_flushed.clear(); + } } }; + int Client::write(fileh_t fh, const char *buf, size_t size, off_t offset) { client_lock.Lock(); @@ -1170,28 +1330,43 @@ int Client::write(fileh_t fh, const char *buf, size_t size, off_t offset) // buffered write? - if (false && f->caps & CFILE_CAP_WRBUFFER) { + if (f->caps & CFILE_CAP_WRBUFFER) { // buffered write - dout(10) << "buffered/async write" << endl; - - /* - hack for now.. replace this with a real buffer cache - - just copy the buffer, send the write off, and return immediately. - flush() will block until all outstanding writes complete. - */ - - bufferlist *blist = new bufferlist; - blist->push_back( new buffer(buf, size, BUFFER_MODE_COPY|BUFFER_MODE_FREE) ); - - in->inflight_buffers.insert(blist); - - Context *onfinish = new C_Client_WriteBuffer( in, blist ); - filer->write(in->inode.ino, g_OSD_FileLayout, size, offset, *blist, 0, onfinish); + dout(7) << "buffered/async write" << endl; + +#ifdef BUFFERCACHE + // map buffercache for writing + map buffers, inflight; + bc.map_or_alloc(in->inode.ino, size, offset, buffers, inflight); + + // wait for inflight buffers + while (!inflight.empty()) { + inflight.begin()->second->wait_for_write(client_lock); + buffers.clear(); inflight.clear(); + bc.map_or_alloc(in->inode.ino, size, offset, buffers, inflight); // FIXME: overkill + } + bc.dirty(in->inode.ino, size, offset, buf); +#else + /* + hack for now.. replace this with a real buffer cache + + just copy the buffer, send the write off, and return immediately. + flush() will block until all outstanding writes complete. + */ + + bufferlist *blist = new bufferlist; + blist->push_back( new buffer(buf, size, BUFFER_MODE_COPY|BUFFER_MODE_FREE) ); + + in->inflight_buffers.insert(blist); + + Context *onfinish = new C_Client_WriteBuffer( in, blist ); + filer->write(in->inode.ino, g_OSD_FileLayout, size, offset, *blist, 0, onfinish); +#endif } else { // synchronous write - dout(10) << "synchronous write" << endl; + // FIXME: do not bypass buffercache + dout(7) << "synchronous write" << endl; // create a buffer that refers to *buf, but doesn't try to free it when it's done. bufferlist blist; diff --git a/ceph/client/Client.h b/ceph/client/Client.h index 5c1800bd02371..11afd17d1afbc 100644 --- a/ceph/client/Client.h +++ b/ceph/client/Client.h @@ -1,6 +1,8 @@ #ifndef __CLIENT_H #define __CLIENT_H +#include "Buffercache.h" + #include "mds/MDCluster.h" #include "osd/OSDCluster.h" @@ -240,6 +242,9 @@ class Client : public Dispatcher { // buffer cache + Buffercache bc; + + int flush_buffers(); // flush dirty buffers int flush_inode_buffers(Inode *in); // flush buffered writes int release_inode_buffers(Inode *in); // release cached reads diff --git a/ceph/include/buffer.h b/ceph/include/buffer.h index 933d41e72bec7..028f3c6e196de 100644 --- a/ceph/include/buffer.h +++ b/ceph/include/buffer.h @@ -278,7 +278,7 @@ class bufferptr { assert(len >= 0 && off + len <= _len); memcpy(dest, c_str() + off, len); } - void copy_in(int off, int len, char *src) { + void copy_in(int off, int len, const char *src) { assert(off >= 0 && off <= _len); assert(len >= 0 && off + len <= _len); memcpy(c_str() + off, src, len); diff --git a/ceph/include/bufferlist.h b/ceph/include/bufferlist.h index 2222c99ec509d..0f9ddedd2bad5 100644 --- a/ceph/include/bufferlist.h +++ b/ceph/include/bufferlist.h @@ -161,7 +161,7 @@ class bufferlist { } } - void copy_in(int off, int len, char *src) { + void copy_in(int off, int len, const char *src) { assert(off >= 0); assert(off + len <= length()); -- 2.39.5