git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Modified Files:
author: carlosm <carlosm@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 5 Jul 2005 19:21:26 +0000 (19:21 +0000)
committer: carlosm <carlosm@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 5 Jul 2005 19:21:26 +0000 (19:21 +0000)
Makefile client/Client.cc client/Client.h include/buffer.h
include/bufferlist.h

Buffercache-related changes in client/Client.* and include/buffer*
Buffercache still buggy -- compile with -DBUFFERCACHE to enable code in client/Client.cc.

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@399 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/Makefile
ceph/client/Client.cc
ceph/client/Client.h
ceph/include/buffer.h
ceph/include/bufferlist.h

index c70143f40c36e739c51f13865e1493b324933164..78978d152b439ac00a8e4ccad323aeceec87a673 100644 (file)
@@ -68,13 +68,13 @@ gprof-helper.so: test/gprof-helper.c
 import: mds/allmds.o osd/OSD.o msg/FakeMessenger.o import.cc ${COMMON_OBJS} 
        ${CC} ${CFLAGS} ${LIBS} $^ -o $@
 
-singleclient: mds/allmds.o osd/OSD.o fakesingleclient.o client/Client.o  msg/FakeMessenger.o fsck.o ${COMMON_OBJS} 
+singleclient: mds/allmds.o osd/OSD.o fakesingleclient.o client/Client.o client/Buffercache.o msg/FakeMessenger.o fsck.o ${COMMON_OBJS} 
        ${CC} ${CFLAGS} ${LIBS} $^ -o $@
 
 tp: osd/tp.o
        ${CC} ${CFLAGS} ${LIBS} $^ -o $@
 
-fuseclient: client/Client.o client/fuse.o msg/FakeMessenger.o ${COMMON_OBJS} 
+fuseclient: client/Client.o client/Buffercache.o client/fuse.o msg/FakeMessenger.o ${COMMON_OBJS} 
        ${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
 
 fakemds: test/fakemds.cc msg/FakeMessenger.o fakeclient/FakeClient.o osd/OSD.o mds/allmds.o ${COMMON_OBJS} 
@@ -86,25 +86,25 @@ mpitest: test/mpitest.o msg/MPIMessenger.cc mds/allmds.o osd/OSD.o fakeclient/Fa
 mttest: test/mttest.cc msg/MTMessenger.cc ${COMMON_OBJS}
        ${MPICC} ${CFLAGS} ${LIBS} $^ -o $@
 
-mpifuse: mpifuse.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/MPIMessenger.cc ${COMMON_OBJS}
+mpifuse: mpifuse.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/MPIMessenger.cc ${COMMON_OBJS}
        ${MPICC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
 
-tcpfuse: tcpfuse.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/TCPMessenger.cc ${COMMON_OBJS}
+tcpfuse: tcpfuse.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/TCPMessenger.cc ${COMMON_OBJS}
        ${MPICC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
 
-mpisyn: mpisyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/MPIMessenger.cc ${COMMON_OBJS}
+mpisyn: mpisyn.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.o msg/MPIMessenger.cc ${COMMON_OBJS}
        ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
 
-tcpsyn: tcpsyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/TCPMessenger.cc ${COMMON_OBJS}
+tcpsyn: tcpsyn.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.o msg/TCPMessenger.cc ${COMMON_OBJS}
        ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
 
-obfstest: tcpsyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.cc osd/OBFSStore.o msg/TCPMessenger.cc ${COMMON_OBJS} 
+obfstest: tcpsyn.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.cc osd/OBFSStore.o msg/TCPMessenger.cc ${COMMON_OBJS} 
        ${MPICC} -DUSE_OBFS ${MPICFLAGS} ${MPILIBS} $^ -o $@ ../uofs/uofs.a
 
-fakesyn: fakesyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/FakeMessenger.o ${COMMON_OBJS}
+fakesyn: fakesyn.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.o msg/FakeMessenger.o ${COMMON_OBJS}
        ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
 
-fakefuse: fakefuse.cc mds/allmds.o client/Client.o osd/OSD.o client/fuse.o msg/FakeMessenger.cc ${COMMON_OBJS}
+fakefuse: fakefuse.cc mds/allmds.o client/Client.o client/Buffercache.o osd/OSD.o client/fuse.o msg/FakeMessenger.cc ${COMMON_OBJS}
        ${CC} -pg ${CFLAGS} ${LIBS} -lfuse $^ -o $@
 
 testmpi: test/testmpi.cc msg/MPIMessenger.cc config.o common/Timer.o common/clock.o msg/Messenger.o msg/Dispatcher.o msg/error.o
index 58eaf6cc6bb12c412917a4a9481a032956f7a015..b7f86543bb04df76da693c35b9880a52b447bb3b 100644 (file)
@@ -21,7 +21,7 @@
 
 #include "include/config.h"
 #undef dout
-#define  dout(l)    if (l<=g_conf.debug) cout << "client" << whoami << "." << pthread_self() << " "
+#define  dout(l)    if (l<=g_conf.debug) cout << "client" << "." << pthread_self() << " "
 
 
 
@@ -286,9 +286,6 @@ Dentry *Client::lookup(filepath& path)
   return dn;
 }
 
-
-
-
 // -------
 
 MClientReply *Client::make_request(MClientRequest *req)
@@ -360,37 +357,111 @@ void Client::dispatch(Message *m)
   client_lock.Unlock();
 }
 
-
-
-
-
-
-
 /*
  * flush inode (write cached) buffers to disk
  */
+class C_Client_FileFlushFinish : public Context {
+public:
+  Filecache *fc;
+  Bufferhead *bh;
+  C_Client_FileFlushFinish(Filecache *fc, Bufferhead *bh) {
+       this->fc = fc;
+       this->bh = bh;
+  }
+  void finish(int r) {
+    bh->flush_finish();
+       if (fc->dirty_buffers.empty()) {
+         // wake up flush waiters
+         for (list<Cond*>::iterator it = fc->waitfor_flushed.begin();
+                  it != fc->waitfor_flushed.end();
+                  it++) {
+               (*it)->Signal();
+         }
+         fc->waitfor_flushed.clear();
+       }
+  }
+};
+
+
 int Client::flush_inode_buffers(Inode *in)
 {
-  if (in->inflight_buffers.size() 
-         /* || in->dirty_buffers.size() */) {
-       dout(7) << "inflight buffers, waiting" << endl;
-       Cond *cond = new Cond;
-       in->waitfor_flushed.push_back(cond);
-       cond->Wait(client_lock);
-       delete cond;
-       assert(in->inflight_buffers.empty());
-       dout(7) << "inflight buffers flushed" << endl;
+  if (!in->inflight_buffers.empty()) {
+    dout(7) << "inflight buffers of sync write, waiting" << endl;
+    Cond *cond = new Cond;
+    in->waitfor_flushed.push_back(cond);
+    cond->Wait(client_lock);
+    delete cond;
+    assert(in->inflight_buffers.empty());
+    dout(7) << "inflight buffers flushed" << endl;
+#ifdef BUFFERCACHE
+  } else if (!bc.get_fc(in->inode.ino)->dirty_buffers.empty()) {
+    dout(7) << "inode " << in->inode.ino << " has dirty buffers" << endl;
+    Filecache *fc = bc.get_fc(in->inode.ino);
+    fc->simplify();
+    for (set<Bufferhead*>::iterator it = fc->dirty_buffers.begin();
+         it != fc->dirty_buffers.end();
+         it++) {
+      (*it)->flush_start();
+      C_Client_FileFlushFinish *onfinish = new C_Client_FileFlushFinish(fc, *it);
+      filer->write(in->inode.ino, g_OSD_FileLayout, (*it)->bl.length(), (*it)->offset, (*it)->bl, 0, onfinish);
+    }
+    dout(7) << "dirty buffers, waiting" << endl;
+    fc->wait_for_flush(client_lock);
+#endif
   } else {
        dout(7) << "no inflight buffers" << endl;
   }
 }
 
+class C_Client_FlushFinish : public Context {
+public:
+  Buffercache *bc;
+  Bufferhead *bh;
+  C_Client_FlushFinish(Buffercache *bc, Bufferhead *bh) {
+       this->bc = bc;
+    this->bh = bh;
+  }
+  void finish(int r) {
+    bh->flush_finish();
+       if (bc->dirty_buffers.empty()) {
+         // wake up flush waiters
+         for (list<Cond*>::iterator it = bc->waitfor_flushed.begin();
+                  it != bc->waitfor_flushed.end();
+                  it++) {
+               (*it)->Signal();
+         }
+         bc->waitfor_flushed.clear();
+       }
+  }
+};
+
+int Client::flush_buffers()
+{
+  if (!bc.dirty_buffers.empty()) {
+    for (set<Bufferhead*>::iterator it = bc.dirty_buffers.begin();
+         it != bc.dirty_buffers.end();
+         it++) {
+      (*it)->flush_start();
+      C_Client_FlushFinish *onfinish = new C_Client_FlushFinish(&bc, *it);
+      filer->write((*it)->ino, g_OSD_FileLayout, (*it)->bl.length(), (*it)->offset, (*it)->bl, 0, onfinish);
+    }
+    dout(7) << "dirty buffers, waiting" << endl;
+    Cond cond;
+    bc.waitfor_flushed.push_back(&cond);
+    cond.Wait(client_lock);
+  } else {
+       dout(7) << "no dirty buffers" << endl;
+  }
+}
+
 /*
  * release inode (read cached) buffers from memory
  */
 int Client::release_inode_buffers(Inode *in)
 {
-  dout(2) << "release_inode_buffers IMPLEMENT ME" << endl;
+#ifdef BUFFERCACHE
+  bc.release_file(in->inode.ino);
+#endif
 }
 
 
@@ -475,8 +546,7 @@ int Client::mount(int mkfs)
   if (mkfs) m->set_mkfs(mkfs);
 
   client_lock.Unlock();
-  MClientMountAck *reply = (MClientMountAck*)messenger->sendrecv(m,
-                                                                                                                                MSG_ADDR_MDS(0), MDS_PORT_SERVER);
+  MClientMountAck *reply = (MClientMountAck*)messenger->sendrecv(m, MSG_ADDR_MDS(0), MDS_PORT_SERVER);
   client_lock.Lock();
   assert(reply);
 
@@ -1026,6 +1096,9 @@ int Client::close(fileh_t fh)
   // FIXME where does FUSE maintain user information
   req->set_caller_uid(getuid());
   req->set_caller_gid(getgid());
+
+  // Make sure buffers are all clean!
+  flush_inode_buffers(in);
   
   // take note of the fact that we're mid-close
   /* mds may ack our close() after reissuing same fh to another open; remove from
@@ -1061,17 +1134,38 @@ public:
   Mutex *mutex;
   int *rvalue;
   bool finished;
-  C_Client_Cond(Cond *cond, Mutex *mutex, int *rvalue) { 
-       this->cond = cond;
+  C_Client_Cond(Cond *cond, Mutex *mutex, int *rvalue) {
+    this->cond = cond;
+    this->mutex = mutex;
+    this->rvalue = rvalue;
+    this->finished = false;
+  }
+  void finish(int r) {
+    //mutex->Lock();
+    *rvalue = r;
+    finished = true;
+    cond->Signal();
+    //mutex->Unlock();
+  }
+};
+
+class C_Client_MissFinish : public Context {
+public:
+  Bufferhead *bh;
+  Mutex *mutex;
+  int *rvalue;
+  bool finished;
+  C_Client_MissFinish(Bufferhead *bh, Mutex *mutex, int *rvalue) { 
+       this->bh = bh; 
        this->mutex = mutex;
        this->rvalue = rvalue;
        this->finished = false;
   }
   void finish(int r) {
        //mutex->Lock();
-       *rvalue = r;
+       *rvalue += r;
        finished = true;
-       cond->Signal();
+    bh->miss_finish();
        //mutex->Unlock();
   }
 };
@@ -1081,6 +1175,7 @@ int Client::read(fileh_t fh, char *buf, size_t size, off_t offset)
 {
   client_lock.Lock();
 
+  dout(7) << "read len: " << size << " off: " << offset << endl;
   assert(fh_map.count(fh));
   Fh *f = fh_map[fh];
   Inode *in = f->inode;
@@ -1095,56 +1190,121 @@ int Client::read(fileh_t fh, char *buf, size_t size, off_t offset)
   }
   if (cond) delete cond;
 
+
+  // determine whether read range overlaps with file
+  // FIXME: maybe we should stat the file again?
+  dout(10) << "file size: " << in->inode.size << endl;
+  if (offset >= in->inode.size) {
+    client_lock.Unlock();
+    return 0;
+  }
+  if (size > in->inode.size) size = in->inode.size;
+  
   int rvalue = 0;
-  if (0) {
-       // (some of) read from buffer?
-       // .... bleh ....
-  } else {
-       // issue read
-       Cond cond;
+#ifndef BUFFERCACHE
+  {
+    Cond cond;
+    bufferlist blist;   // data will go here
 
-       bufferlist blist;   // data will go here
-       
-       C_Client_Cond *onfinish = new C_Client_Cond(&cond, &client_lock, &rvalue);
-       
-       filer->read(in->inode.ino, g_OSD_FileLayout, size, offset, &blist, onfinish);
-       
-       cond.Wait(client_lock);
+    C_Client_Cond *onfinish = new C_Client_Cond(&cond, &client_lock, &rvalue);
+    filer->read(in->inode.ino, g_OSD_FileLayout, size, offset, &blist, onfinish);
+    cond.Wait(client_lock);
 
-       // copy data into caller's buf
-       blist.copy(0, blist.length(), buf);
+    // copy data into caller's buf
+    blist.copy(0, blist.length(), buf);
   }
 
+#else
+  // map buffercache 
+  map<off_t, Bufferhead*> hits, inflight;
+  map<off_t, Bufferhead*>::iterator curbuf;
+  map<off_t, size_t> holes;
+  map<off_t, size_t>::iterator hole;
+  
+  Filecache *fc = bc.get_fc(in->inode.ino);
+  curbuf = fc->map_existing(size, offset, hits, inflight, holes);  
+  
+  if (curbuf != fc->buffer_map.end() && hits.count(curbuf->first)) {
+    // sweet -- we can return stuff immediately: find out how much
+    dout(7) << "read bc hit" << endl;
+    rvalue = (int)bc.touch_continuous(hits, size, offset);
+    assert(rvalue > 0);
+    rvalue = fc->copy_out((size_t)rvalue, offset, buf);
+    assert(rvalue > 0);
+    dout(7) << "read bc hit: immediately returning " << rvalue << " bytes" << endl;
+  }
+  // issue reads for holes
+  int hole_rvalue = 0; //FIXME: don't really need to track rvalue in MissFinish context
+  for (hole = holes.begin(); hole != holes.end(); hole++) {
+    dout(7) << "read bc miss" << endl;
+    off_t hole_offset = hole->first;
+    size_t hole_size = hole->second;
+    
+    // insert new bufferhead without allocating buffers (Filer::handle_osd_read_reply allocates them)
+    Bufferhead *bh = new Bufferhead(in->inode.ino, hole_offset, &bc);
+    
+    // read into the buffercache: when finished transition state from inflight to clean
+    bh->miss_start();
+    C_Client_MissFinish *onfinish = new C_Client_MissFinish(bh, &client_lock, &hole_rvalue);   
+    filer->read(in->inode.ino, g_OSD_FileLayout, hole_size, hole_offset, &(bh->bl), onfinish);
+    dout(7) << "read bc miss: issued osd read len: " << hole_size << " off: " << hole_offset << endl;
+  }
+  
+  if (rvalue == 0) {
+    // we need to wait for the first buffer
+    dout(7) << "read bc miss: waiting for first buffer" << endl;
+    Bufferhead *bh;
+    if (curbuf == fc->buffer_map.end() && fc->buffer_map.count(offset)) {
+      dout(10) << "first buffer is currently read in" << endl;
+      bh = fc->buffer_map[offset];
+    } else {
+      dout(10) << "first buffer is either hit or inflight" << endl;
+      bh = curbuf->second;  
+    }
+    if (bh->state == BUFHD_STATE_INFLIGHT) {
+      bh->wait_for_read(client_lock);
+    }
+
+    // buffer is filled -- see how much we can return
+    hits.clear(); inflight.clear(); holes.clear();
+    fc->map_existing(size, offset, hits, inflight, holes); // FIXME: overkill
+    assert(hits.count(bh->offset));
+    rvalue = bc.touch_continuous(hits, size, offset);
+    fc->copy_out(rvalue, offset, buf);
+    dout(7) << "read bc no hit: returned first " << rvalue << " bytes" << endl;
+  }
+#endif
+  // done!
   client_lock.Unlock();
-  return rvalue;  
+  return rvalue;
 }
 
-
 // hack.. see async write() below
 class C_Client_WriteBuffer : public Context {
 public:
   Inode *in;
   bufferlist *blist;
   C_Client_WriteBuffer(Inode *in, bufferlist *blist) {
-       this->in = in;
-       this->blist = blist;
+        this->in = in;
+        this->blist = blist;
   }
   void finish(int r) {
-       in->inflight_buffers.erase(blist);
-       delete blist;
-       
-       if (in->inflight_buffers.empty()) {
-         // wake up flush waiters
-         for (list<Cond*>::iterator it = in->waitfor_flushed.begin();
-                  it != in->waitfor_flushed.end();
-                  it++) {
-               (*it)->Signal();
-         }
-         in->waitfor_flushed.clear();
-       }
+        in->inflight_buffers.erase(blist);
+        delete blist;
+
+        if (in->inflight_buffers.empty()) {
+          // wake up flush waiters
+          for (list<Cond*>::iterator it = in->waitfor_flushed.begin();
+                   it != in->waitfor_flushed.end();
+                   it++) {
+                (*it)->Signal();
+          }
+          in->waitfor_flushed.clear();
+        }
   }
 };
 
+
 int Client::write(fileh_t fh, const char *buf, size_t size, off_t offset) 
 {
   client_lock.Lock();
@@ -1170,28 +1330,43 @@ int Client::write(fileh_t fh, const char *buf, size_t size, off_t offset)
 
 
   // buffered write?
-  if (false && f->caps & CFILE_CAP_WRBUFFER) {
+  if (f->caps & CFILE_CAP_WRBUFFER) {
        // buffered write
-       dout(10) << "buffered/async write" << endl;
-
-       /*
-         hack for now.. replace this with a real buffer cache
-
-         just copy the buffer, send the write off, and return immediately.  
-         flush() will block until all outstanding writes complete.
-       */
-
-       bufferlist *blist = new bufferlist;
-       blist->push_back( new buffer(buf, size, BUFFER_MODE_COPY|BUFFER_MODE_FREE) );
-
-       in->inflight_buffers.insert(blist);
-
-       Context *onfinish = new C_Client_WriteBuffer( in, blist );
-       filer->write(in->inode.ino, g_OSD_FileLayout, size, offset, *blist, 0, onfinish);
+       dout(7) << "buffered/async write" << endl;
+    
+#ifdef BUFFERCACHE
+    // map buffercache for writing
+    map<off_t, Bufferhead*> buffers, inflight;
+    bc.map_or_alloc(in->inode.ino, size, offset, buffers, inflight); 
+    
+    // wait for inflight buffers
+    while (!inflight.empty()) {
+      inflight.begin()->second->wait_for_write(client_lock);
+      buffers.clear(); inflight.clear();
+      bc.map_or_alloc(in->inode.ino, size, offset, buffers, inflight); // FIXME: overkill
+    } 
+    bc.dirty(in->inode.ino, size, offset, buf);
+#else
+    /*
+      hack for now.. replace this with a real buffer cache
+
+      just copy the buffer, send the write off, and return immediately.  
+      flush() will block until all outstanding writes complete.
+    */
+
+    bufferlist *blist = new bufferlist;
+    blist->push_back( new buffer(buf, size, BUFFER_MODE_COPY|BUFFER_MODE_FREE) );
+
+    in->inflight_buffers.insert(blist);
+
+    Context *onfinish = new C_Client_WriteBuffer( in, blist );
+    filer->write(in->inode.ino, g_OSD_FileLayout, size, offset, *blist, 0, onfinish);
+#endif
 
   } else {
        // synchronous write
-       dout(10) << "synchronous write" << endl;
+    // FIXME: do not bypass buffercache
+       dout(7) << "synchronous write" << endl;
 
        // create a buffer that refers to *buf, but doesn't try to free it when it's done.
        bufferlist blist;
index 5c1800bd02371c710af8502c77fb8fd1407e5c76..11afd17d1afbc9cbfafdf079e12b921b9ac0bc98 100644 (file)
@@ -1,6 +1,8 @@
 #ifndef __CLIENT_H
 #define __CLIENT_H
 
+#include "Buffercache.h"
+
 #include "mds/MDCluster.h"
 #include "osd/OSDCluster.h"
 
@@ -240,6 +242,9 @@ class Client : public Dispatcher {
 
   
   // buffer cache
+  Buffercache bc;
+  
+  int flush_buffers();     // flush dirty buffers
   int flush_inode_buffers(Inode *in);     // flush buffered writes
   int release_inode_buffers(Inode *in);   // release cached reads
                
index 933d41e72bec7b4f73ab11349ba1bc77f778df98..028f3c6e196de8e1e4f9c4648d5ce4fe1bd815ff 100644 (file)
@@ -278,7 +278,7 @@ class bufferptr {
        assert(len >= 0 && off + len <= _len);
        memcpy(dest, c_str() + off, len);
   }
-  void copy_in(int off, int len, char *src) {
+  void copy_in(int off, int len, const char *src) {
        assert(off >= 0 && off <= _len);
        assert(len >= 0 && off + len <= _len);
        memcpy(c_str() + off, src, len);
index 2222c99ec509d4521db1c6aafd8e941151c15c31..0f9ddedd2bad5cf9a0e08d3385ace59e50232601 100644 (file)
@@ -161,7 +161,7 @@ class bufferlist {
        }
   }
 
-  void copy_in(int off, int len, char *src) {
+  void copy_in(int off, int len, const char *src) {
        assert(off >= 0);
        assert(off + len <= length());