]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Modified Files:
authorcarlosm <carlosm@29311d96-e01e-0410-9327-a35deaab8ce9>
Sun, 10 Jul 2005 18:29:28 +0000 (18:29 +0000)
committercarlosm <carlosm@29311d96-e01e-0410-9327-a35deaab8ce9>
Sun, 10 Jul 2005 18:29:28 +0000 (18:29 +0000)
  Makefile config.cc client/Buffercache.cc client/Buffercache.h
  client/Client.cc client/Client.h include/config.h

Buffercache with flushing after max age and shrinking based on max size. Still
not bug free: e.g. copying a large file eventually corrupts buffercache size
variables with the consequence that flushing stops.

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@437 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/client/Buffercache.cc
ceph/client/Buffercache.h
ceph/client/Client.cc
ceph/client/Client.h
ceph/config.cc
ceph/config.h

index a1d9020a43beb443c96582b07cd3b35c6c78c566..7062900e1179774e350ffb93ce343332fd64e946 100644 (file)
@@ -70,13 +70,32 @@ void Bufferhead::alloc_buffers(size_t size)
   dout(7) << "bc: allocated " << bl.buffers().size() << " buffers (" << bl.length() << " bytes) " << endl;
 }
 
-void Bufferhead::dirty() {
+void Bufferhead::miss_start(size_t miss_len) 
+{
+  assert(state == BUFHD_STATE_CLEAN);
+  state = BUFHD_STATE_INFLIGHT;
+  this->miss_len = miss_len;
+  bc->lru.lru_touch(this);
+}
+
+void Bufferhead::miss_finish() 
+{
+  assert(state == BUFHD_STATE_INFLIGHT);
+  state = BUFHD_STATE_CLEAN;
+  //assert(bl.length() == miss_len);
+  wakeup_read_waiters();
+  wakeup_write_waiters();
+}
+
+void Bufferhead::dirty() 
+{
   if (state == BUFHD_STATE_CLEAN) {
     dout(10) << "bc: dirtying clean buffer size: " << bl.length() << endl;
     state = BUFHD_STATE_DIRTY;
     dirty_since = time(NULL); // start clock for dirty buffer here
+    bc->lru.lru_touch(this);
     bc->clean_to_dirty(bl.length());
-    dout(10) << "bc: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " flushing_size: " << bc->get_flushing_size() << endl;
+    dout(6) << "bc: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " flushing_size: " << bc->get_flushing_size() << " age: " << bc->dirty_buffers.get_age() << endl;
     assert(!bc->dirty_buffers.exist(this));
     bc->dirty_buffers.insert(this); 
     get();
@@ -88,7 +107,8 @@ void Bufferhead::dirty() {
   }
 }
 
-void Bufferhead::leave_dirtybuffers() {
+void Bufferhead::dirtybuffers_erase() 
+{
   dout(10) << "bc: erase in dirtybuffers size: " << bl.length() << " in state " << state << endl;
   assert(bc->dirty_buffers.exist(this));
   bc->dirty_buffers.erase(this);
@@ -98,21 +118,23 @@ void Bufferhead::leave_dirtybuffers() {
   put();
 }
 
-void Bufferhead::flush_start() {
+void Bufferhead::flush_start() 
+{
   dout(10) << "bc: flush_start" << endl;
   assert(state == BUFHD_STATE_DIRTY);
   state = BUFHD_STATE_INFLIGHT;
-  leave_dirtybuffers();
+  dirtybuffers_erase();
   bc->dirty_to_flushing(bl.length());
-  dout(10) << "bc: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " flushing_size: " << bc->get_flushing_size() << endl;
+  dout(6) << "bc: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " flushing_size: " << bc->get_flushing_size() << " age: " << bc->dirty_buffers.get_age() << endl;
 }
 
-void Bufferhead::flush_finish() {
+void Bufferhead::flush_finish() 
+{
   dout(10) << "bc: flush_finish" << endl;
   assert(state == BUFHD_STATE_INFLIGHT);
   state = BUFHD_STATE_CLEAN;
   bc->flushing_to_clean(bl.length());
-  dout(10) << "bc: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " flushing_size: " << bc->get_flushing_size() << endl;
+  dout(6) << "bc: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " flushing_size: " << bc->get_flushing_size() << " age: " << bc->dirty_buffers.get_age() << endl;
   wakeup_write_waiters(); // readers never wait on flushes
 }
 
@@ -128,6 +150,41 @@ void Bufferhead::claim_append(Bufferhead *other)
 
 // -- Dirtybuffers methods
 
+void Dirtybuffers::erase(Bufferhead* bh) 
+{
+  dout(7) << "dirtybuffer: erase bh->ino: " << bh->ino << " offset: " << bh->offset << endl;
+  int osize = _dbufs.size();
+  for (multimap<time_t, Bufferhead*>::iterator it = _dbufs.lower_bound(bh->dirty_since);
+       it != _dbufs.upper_bound(bh->dirty_since);
+       it++) {
+   if (it->second == bh) {
+      _dbufs.erase(it);
+      break;
+    }
+  }
+  assert(_dbufs.size() == osize - 1);
+}
+
+void Dirtybuffers::insert(Bufferhead* bh) 
+{
+  dout(7) << "dirtybuffer: insert bh->ino: " << bh->ino << " offset: " << bh->offset << endl;
+  _dbufs.insert(pair<time_t, Bufferhead*>(bh->dirty_since, bh));
+}
+
+bool Dirtybuffers::exist(Bufferhead* bh) 
+{
+  for (multimap<time_t, Bufferhead*>::iterator it = _dbufs.lower_bound(bh->dirty_since);
+      it != _dbufs.upper_bound(bh->dirty_since);
+      it++ ) {
+    if (it->second == bh) {
+      dout(10) << "dirtybuffer: found bh->ino: " << bh->ino << " offset: " << bh->offset << endl;
+      return true;
+    }
+  }
+  return false;
+}
+
+
 void Dirtybuffers::get_expired(time_t ttl, size_t left_dirty, list<Bufferhead*>& to_flush) 
 {
   time_t now = time(NULL);
@@ -224,7 +281,7 @@ void Filecache::simplify()
       Bufferhead *bh = next->second;
       start->second->claim_append(bh);
       if (bh->state == BUFHD_STATE_DIRTY) {
-        bh->leave_dirtybuffers(); 
+        bh->dirtybuffers_erase(); 
        bh->state = BUFHD_STATE_CLEAN; 
       }
       removed.push_back(bh);
@@ -372,7 +429,7 @@ void Buffercache::release_file(inodeno_t ino)
 
     decrease_size(it->second->bl.length());
 
-    dout(10) << "bc: clean_size: " << get_clean_size() << " dirty_size: " << get_dirty_size() << " flushing_size: " << get_flushing_size() << endl;
+    dout(6) << "bc: clean_size: " << get_clean_size() << " dirty_size: " << get_dirty_size() << " flushing_size: " << get_flushing_size() << " age: " << dirty_buffers.get_age() << endl;
     assert(clean_size >= 0);
     delete it->second;    
   }
@@ -381,6 +438,19 @@ void Buffercache::release_file(inodeno_t ino)
   delete fc;  
 }
 
+void Buffercache::get_reclaimable(size_t min_size, list<Bufferhead*>& reclaimed)
+{
+  while (min_size > 0) {
+    if (Bufferhead *bh = (Bufferhead*)lru.lru_expire()) {
+      reclaimed.push_back(bh);
+      min_size -= bh->bl.length();
+    } else {
+      break;
+    }
+  }
+}
+
+
 size_t Buffercache::reclaim(size_t min_size)
 {
   dout(7) << "bc: reclaim min_size: " << min_size << endl;
@@ -395,7 +465,7 @@ size_t Buffercache::reclaim(size_t min_size)
 
       decrease_size(bh->bl.length());
 
-      dout(10) << "bc: clean_size: " << get_clean_size() << " dirty_size: " << get_dirty_size() << " flushing_size: " << get_flushing_size() << endl;
+      dout(6) << "bc: clean_size: " << get_clean_size() << " dirty_size: " << get_dirty_size() << " flushing_size: " << get_flushing_size() << " age: " << dirty_buffers.get_age() << endl;
       assert(clean_size >= 0);
       bh->fc->buffer_map.erase(bh->offset);
       if (bh->fc->buffer_map.empty()) {
index f270bc38548ce73bbf2bd7f8f29b47ebb1959ed9..849d7d393580eb53460bdcdce47398a4ab061443 100644 (file)
@@ -98,22 +98,10 @@ class Bufferhead : public LRUObject {
     write_waiters.clear(); 
   }
   
-  void miss_start(size_t miss_len) {
-       assert(state == BUFHD_STATE_CLEAN);
-       state = BUFHD_STATE_INFLIGHT;
-       this->miss_len = miss_len;
-  }
-  
-  void miss_finish() {
-       assert(state == BUFHD_STATE_INFLIGHT);
-       state = BUFHD_STATE_CLEAN;
-       //assert(bl.length() == miss_len);
-       wakeup_read_waiters();
-       wakeup_write_waiters();
-  }
-  
+  void miss_start(size_t miss_len);
+  void miss_finish();
   void dirty();
-  void leave_dirtybuffers();
+  void dirtybuffers_erase();
   void flush_start();
   void flush_finish();
   void claim_append(Bufferhead* other);
@@ -125,39 +113,14 @@ class Dirtybuffers {
   multimap<time_t, Bufferhead*> _dbufs;
 
  public:
-  void erase(Bufferhead* bh) {
-    dout(7) << "dirtybuffer: erase bh->ino: " << bh->ino << " offset: " << bh->offset << endl;
-    int osize = _dbufs.size();
-    for (multimap<time_t, Bufferhead*>::iterator it = _dbufs.lower_bound(bh->dirty_since);
-         it != _dbufs.upper_bound(bh->dirty_since);
-        it++) {
-     if (it->second == bh) {
-        _dbufs.erase(it);
-        break;
-      }
-    }
-    assert(_dbufs.size() == osize - 1);
-  }
-
-  void insert(Bufferhead* bh) {
-    dout(7) << "dirtybuffer: insert bh->ino: " << bh->ino << " offset: " << bh->offset << endl;
-    _dbufs.insert(pair<time_t, Bufferhead*>(bh->dirty_since, bh));
-  }
-
+  void erase(Bufferhead* bh);
+  void insert(Bufferhead* bh);
   bool empty() { return _dbufs.empty(); }
-
-  bool exist(Bufferhead* bh) {
-    for (multimap<time_t, Bufferhead*>::iterator it = _dbufs.lower_bound(bh->dirty_since);
-        it != _dbufs.upper_bound(bh->dirty_since);
-       it++ ) {
-      if (it->second == bh) {
-       dout(10) << "dirtybuffer: found bh->ino: " << bh->ino << " offset: " << bh->offset << endl;
-        return true;
-      }
-    }
-    return false;
-  }
+  bool exist(Bufferhead* bh);
   void get_expired(time_t ttl, size_t left_dirty, list<Bufferhead*>& to_flush);
+  time_t get_age() { 
+    if (!_dbufs.empty()) return time(NULL) - _dbufs.begin()->second->dirty_since;
+  }
 };
 
 
@@ -216,6 +179,7 @@ class Buffercache {
   LRU lru;
   Dirtybuffers dirty_buffers;
   list<Cond*> waitfor_flushed;
+  set<Bufferhead*> flushing_buffers;
 
   Buffercache() : dirty_size(0), flushing_size(0), clean_size(0) { }
   
@@ -236,6 +200,12 @@ class Buffercache {
     return bcache_map[ino];
   }
       
+  void wait_for_flush(Mutex &lock) {
+    Cond cond;
+    waitfor_flushed.push_back(&cond);
+    cond.Wait(lock);
+  }
+
   void clean_to_dirty(size_t size) {
     clean_size -= size;
     assert(clean_size >= 0);
@@ -266,6 +236,8 @@ class Buffercache {
   size_t get_clean_size() { return clean_size; }
   size_t get_dirty_size() { return dirty_size; }
   size_t get_flushing_size() { return flushing_size; }
+  size_t get_total_size() { return clean_size + dirty_size + flushing_size; }
+  void get_reclaimable(size_t min_size, list<Bufferhead*>&);
 
   void insert(Bufferhead *bh);
   void dirty(inodeno_t ino, size_t size, off_t offset, const char *src);
index 9d4636d864054eddf6809fc443345a2cda08f97b..14d965913098a256425dd783d63771bbcada0edd 100644 (file)
@@ -421,7 +421,7 @@ public:
 };
 
 
-int Client::flush_inode_buffers(Inode *in)
+void Client::flush_inode_buffers(Inode *in)
 {
   if (!in->inflight_buffers.empty()) {
     dout(7) << "inflight buffers of sync write, waiting" << endl;
@@ -458,13 +458,23 @@ public:
   Bufferhead *bh;
   C_Client_FlushFinish(Bufferhead *bh) {
     this->bh = bh;
+    bh->bc->flushing_buffers.insert(bh);
   }
   void finish(int r) {
     bh->flush_finish();
+    bh->bc->flushing_buffers.erase(bh);
+    if (bh->bc->flushing_buffers.empty()) {
+      for (list<Cond*>::iterator it = bh->bc->waitfor_flushed.begin();
+           it != bh->bc->waitfor_flushed.end();
+          it++) {
+       (*it)->Signal();
+      }
+      bh->bc->waitfor_flushed.clear();
+    }
   }
 };
 
-int Client::flush_buffers(int ttl, size_t dirty_size)
+void Client::flush_buffers(int ttl, size_t dirty_size)
 {
   // ttl = 0 or dirty_size = 0: flush all
   if (!bc.dirty_buffers.empty()) {
@@ -476,24 +486,41 @@ int Client::flush_buffers(int ttl, size_t dirty_size)
       (*it)->flush_start();
       C_Client_FlushFinish *onfinish = new C_Client_FlushFinish(*it);
       filer->write((*it)->ino, g_OSD_FileLayout, (*it)->bl.length(), (*it)->offset, (*it)->bl, 0, onfinish);
+      bc.wait_for_flush(client_lock);
     }
   } else {
        dout(7) << "no dirty buffers" << endl;
   }
 }
 
+void Client::trim_bcache()
+{
+  if (bc.get_total_size() >  g_conf.client_bcache_size) {
+    // need to free buffers 
+    if (bc.get_dirty_size() > g_conf.client_bcache_hiwater * bc.get_total_size()) {
+      // flush buffers until we have low water mark
+      size_t want_target_size = (size_t)  g_conf.client_bcache_lowater * bc.get_total_size();
+      flush_buffers(g_conf.client_bcache_ttl, want_target_size);
+    }
+    // Now reclaim buffers
+    bc.reclaim(bc.get_total_size() - g_conf.client_bcache_size);
+  }
+}
+      
+                                                                                  
+
 /*
  * release inode (read cached) buffers from memory
  */
-int Client::release_inode_buffers(Inode *in)
+void Client::release_inode_buffers(Inode *in)
 {
 #ifdef BUFFERCACHE
-  bc.release_file(in->inode.ino);
+  // Check first we actually cached the file
+  if (bc.bcache_map.count(in->inode.ino)) bc.release_file(in->inode.ino);
 #endif
 }
 
 
-
 void Client::handle_file_caps(MClientFileCaps *m)
 {
   Fh *f = fh_map[m->get_fh()];
@@ -1214,7 +1241,7 @@ int Client::close(fileh_t fh)
 
   release_inode_buffers(in);
   put_inode( in );
-
+  
   MClientReply *reply = make_request(req, true, mds_auth);
   assert(reply);
   int result = reply->get_result();
@@ -1378,6 +1405,8 @@ int Client::read(fileh_t fh, char *buf, size_t size, off_t offset)
     rvalue = bc.touch_continuous(hits, size, offset);
     fc->copy_out(rvalue, offset, buf);
     dout(7) << "read bc no hit: returned first " << rvalue << " bytes" << endl;
+
+    trim_bcache();
   }
 #endif
   // done!
@@ -1451,6 +1480,8 @@ int Client::write(fileh_t fh, const char *buf, size_t size, off_t offset)
     } 
     bc.dirty(in->inode.ino, size, offset, buf);
 
+    trim_bcache();
+
     /*
       hack for now.. replace this with a real buffer cache
 
index 9ec3e54ab0380fbf28194b9a6ac621fc9de2a0c8..e038b12b265920143c19580aefe9a5b7ec841eca 100644 (file)
@@ -291,9 +291,10 @@ class Client : public Dispatcher {
   // buffer cache
   Buffercache bc;
   
-  int flush_buffers(int ttl, size_t dirty_size);     // flush dirty buffers
-  int flush_inode_buffers(Inode *in);     // flush buffered writes
-  int release_inode_buffers(Inode *in);   // release cached reads
+  void flush_buffers(int ttl, size_t dirty_size);     // flush dirty buffers
+  void trim_bcache();
+  void flush_inode_buffers(Inode *in);     // flush buffered writes
+  void release_inode_buffers(Inode *in);   // release cached reads
                
   friend class SyntheticClient;
 
index 271fefae08394810a54735d6ec7245ceebd27474..06ac0ad9f58087eb3ebba00737a9dc0757ef780e 100644 (file)
@@ -60,9 +60,15 @@ md_config_t g_conf = {
   client_cache_mid: .5,
   client_cache_stat_ttl: 10, // seconds until cached stat results become invalid
   client_use_random_mds:  false,
+
   client_bcache_alloc_minsize: 1024,
   client_bcache_alloc_maxsize: 262144,
   client_bcache_ttl: 30, // seconds until dirty buffers are written to disk
+  client_bcache_size: 10485760, // 10MB *for testing*
+  client_bcache_lowater: .6, // fraction of size
+  client_bcache_hiwater: .8, 
+  client_bcache_maxfrag: 10, // max actual relative # of bheads over opt rel # of bheads
+
   client_trace: 0,
   fuse_direct_io: 1,
   
index 869fa65b08f98ab471efe8835343b0950040e9e9..d94d4d17071baad808d3992b1644fbe20de400c0 100644 (file)
@@ -35,9 +35,15 @@ struct md_config_t {
   float    client_cache_mid;
   int      client_cache_stat_ttl;
   bool     client_use_random_mds;          // debug flag
+
   int      client_bcache_alloc_minsize;
   int      client_bcache_alloc_maxsize;
   int      client_bcache_ttl;
+  int      client_bcache_size;
+  float    client_bcache_lowater;
+  float    client_bcache_hiwater;
+  int      client_bcache_maxfrag;
+
   int      client_trace;
   int      fuse_direct_io;