]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
client oc fixup
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 9 Aug 2006 04:23:06 +0000 (04:23 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 9 Aug 2006 04:23:06 +0000 (04:23 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@792 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/client/Client.cc
ceph/client/Client.h
ceph/client/FileCache.cc
ceph/client/FileCache.h
ceph/common/Clock.h
ceph/config.cc
ceph/config.h
ceph/include/lru.h
ceph/osdc/ObjectCacher.cc
ceph/osdc/ObjectCacher.h
ceph/osdc/Objecter.cc

index 2089c800c8febc29ca2c79abb62eaf4def67e577..8cb83aad971b26cf55c249d37d3bfaa8a52a7a33 100644 (file)
@@ -538,6 +538,7 @@ void Client::dispatch(Message *m)
        } else {
          dout(10) << "unmounting: trim pass, size still " << lru.lru_get_size() 
                           << "+" << inode_map.size() << endl;
+         dump_cache();   
        }
   }
 
@@ -1541,8 +1542,10 @@ int Client::open(const char *relpath, int mode)
        // caps included?
        int mds = MSG_ADDR_NUM(reply->get_source());
 
-       if (f->inode->caps.empty()) // first caps?
+       if (f->inode->caps.empty()) {// first caps?
+         dout(7) << " first caps on " << hex << f->inode->inode.ino << dec << endl;
          f->inode->get();
+       }
 
        int new_caps = reply->get_file_caps();
        new_caps &= CAP_FILE_WR|CAP_FILE_RD;    // HACK: test synchronous read/write
@@ -1865,7 +1868,7 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset)
        // create a buffer that refers to *buf, but doesn't try to free it when it's done.
        bufferlist blist;
        blist.push_back( new buffer(buf, size, BUFFER_MODE_NOCOPY|BUFFER_MODE_NOFREE) );
-         
+       
        // issue write
        Cond cond;
        bool done = false;
index 904d5680ce2828339c6797c344796a5e9d41496f..4a08e6d976b580cf1bc991c225acc839381438f2 100644 (file)
@@ -148,11 +148,11 @@ class Inode {
 
   void get() { 
        ref++; 
-       //cout << "inode.get on " << hex << inode.ino << dec << " now " << ref << endl;
+       cout << "inode.get on " << hex << inode.ino << dec << " now " << ref << endl;
   }
   void put() { 
        ref--; assert(ref >= 0); 
-       //cout << "inode.put on " << hex << inode.ino << dec << " now " << ref << endl;
+       cout << "inode.put on " << hex << inode.ino << dec << " now " << ref << endl;
   }
 
   Inode(inode_t _inode, ObjectCacher *_oc) : 
index 2092c11a982ee61b8e37704ed08c1ebd216a72b5..8aefd35f18824009d9a39e933d404df3341d50dd 100644 (file)
 
 void FileCache::flush_dirty(Context *onflush)
 {
-  oc->flush_set(inode.ino, onflush);
+  if (oc->flush_set(inode.ino, onflush)) {
+       onflush->finish(0);
+       delete onflush;
+  }
 }
 
 off_t FileCache::release_clean()
@@ -31,15 +34,12 @@ bool FileCache::is_dirty()
 void FileCache::empty(Context *onempty)
 {
   off_t unclean = release_clean();
-  bool clean = oc->flush_set(inode.ino);
+  bool clean = oc->flush_set(inode.ino, onempty);
   assert(!unclean == clean);
-  
+
   if (clean) {
        onempty->finish(0);
        delete onempty;
-  } else {
-       clean = oc->flush_set(inode.ino, onempty);      
-       assert(!clean);
   }
 }
 
@@ -147,3 +147,17 @@ void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& clien
   if (num_writing == 0 && !caps_callbacks.empty())
        check_caps();
 }
+
+bool FileCache::all_safe()
+{
+  return !oc->set_is_dirty_or_committing(inode.ino);
+}
+
+void FileCache::add_safe_waiter(Context *c) 
+{
+  bool safe = oc->commit_set(inode.ino, c);
+  if (safe) {
+       c->finish(0);
+       delete c;
+  }
+}
index 1b0eb3b948e02fa0851552fd0776cc75d21ff3b7..2e2cfc4e31c4e59d7d60f8ac26846f0c1c099e3c 100644 (file)
@@ -19,12 +19,12 @@ class FileCache {
 
   int num_reading;
   int num_writing;
-  int num_unsafe;
+  //int num_unsafe;
 
   // waiters
   list<Cond*> waitfor_read;
   list<Cond*> waitfor_write;
-  list<Context*> waitfor_safe;
+  //list<Context*> waitfor_safe;
   bool waitfor_release;
 
  public:
@@ -32,17 +32,17 @@ class FileCache {
        oc(_oc), 
        inode(_inode),
        latest_caps(0),
-       num_reading(0), num_writing(0), num_unsafe(0),
+       num_reading(0), num_writing(0),// num_unsafe(0),
        waitfor_release(false) {}
 
   // waiters/waiting
   bool can_read() { return latest_caps & CAP_FILE_RD; }
   bool can_write() { return latest_caps & CAP_FILE_WR; }
-  bool all_safe() { return num_unsafe == 0; }
+  bool all_safe();// { return num_unsafe == 0; }
 
   void add_read_waiter(Cond *c) { waitfor_read.push_back(c); }
   void add_write_waiter(Cond *c) { waitfor_write.push_back(c); }
-  void add_safe_waiter(Context *c) { waitfor_safe.push_back(c); }
+  void add_safe_waiter(Context *c);// { waitfor_safe.push_back(c); }
 
   // ...
   void flush_dirty(Context *onflush=0);
index 492e73c99f0312026a41e43fd2b64f5d1a36a617..4ee2fcd6fe5a2e43dc537bd5d8c561fa7b41d95e 100644 (file)
@@ -132,7 +132,7 @@ class Clock {
   // relative time (from startup)
   const utime_t& now() {
        gettimeofday(&last.timeval(), NULL);
-       last -= zero;
+       //last -= zero;
        //last = abs_last - start_offset;
        return last;
   }
index 3096757ce1dc7ed8accf24f7ee7eee539bbbd0c3..c0e75b18a1888047c2b75b424ae8c4b23a75870a 100644 (file)
@@ -71,6 +71,7 @@ md_config_t g_conf = {
   debug_mds_log: 1,
   debug_buffer: 0,
   debug_filer: 0,
+  debug_objecter: 0,
   debug_objectcacher: 0,
   debug_client: 0,
   debug_osd: 0,
@@ -384,6 +385,11 @@ void parse_config_options(vector<char*>& args)
                g_conf.debug_filer = atoi(args[++i]);
          else 
                g_debug_after_conf.debug_filer = atoi(args[++i]);
+       else if (strcmp(args[i], "--debug_objecter") == 0) 
+         if (!g_conf.debug_after) 
+               g_conf.debug_objecter = atoi(args[++i]);
+         else 
+               g_debug_after_conf.debug_objecter = atoi(args[++i]);
        else if (strcmp(args[i], "--debug_objectcacher") == 0) 
          if (!g_conf.debug_after) 
                g_conf.debug_objectcacher = atoi(args[++i]);
index b60d18ae97774e5fbeb08d47b51362e77a5aa190..ef28dc98c3db0ce928d38fcafc506a1c3c967f9e 100644 (file)
@@ -45,6 +45,7 @@ struct md_config_t {
   int debug_mds_log;
   int debug_buffer;
   int debug_filer;
+  int debug_objecter;
   int debug_objectcacher;
   int debug_client;
   int debug_osd;
index a8079ef13873ce5968c4332cb8fc035b46591e87..38a86e4d7da9c915c689454e9cf1c8a898a618a3 100644 (file)
@@ -258,15 +258,13 @@ class LRU {
 
 
   // expire -- expire a single item
-  LRUObject *lru_expire() {
+  LRUObject *lru_get_next_expire() {
        LRUObject *p;
-
+       
        // look through tail of bot
        while (lru_bot.get_length()) {
          p = lru_bot.get_tail();
-
-         if (!p->lru_pinned) 
-               return lru_remove(p);   // yay.
+         if (!p->lru_pinned) return p;
 
          // move to pintail
          lru_bot.remove(p);
@@ -276,8 +274,7 @@ class LRU {
        // ok, try head then
        while (lru_top.get_length()) {
          p = lru_top.get_tail();
-         if (!p->lru_pinned) 
-               return lru_remove( p );
+         if (!p->lru_pinned) return p;
 
          // move to pintail
          lru_top.remove(p);
@@ -287,6 +284,13 @@ class LRU {
        // no luck!
        return NULL;
   }
+  
+  LRUObject *lru_expire() {
+       LRUObject *p = lru_get_next_expire();
+       if (p) 
+         return lru_remove(p);
+       return NULL;
+  }
 
 
   void lru_status() {
index 39dbe598789f625748d3bf8d3f8cce0bb7181f45..4686ed27d923ee021e74e3edaf0d6fa67baac34c 100644 (file)
@@ -417,14 +417,14 @@ void ObjectCacher::lock_ack(list<object_t>& oids, tid_t tid)
        } 
        
        Object *ob = objects[oid];
+
+       list<Context*> ls;
        
        assert(tid <= ob->last_write_tid);
        if (ob->last_write_tid == tid) {
          dout(10) << "lock_ack " << *ob
                           << " tid " << tid << endl;
 
-         list<Context*> ls;
-
          switch (ob->lock_state) {
          case Object::LOCK_RDUNLOCKING: 
          case Object::LOCK_WRUNLOCKING: 
@@ -448,14 +448,21 @@ void ObjectCacher::lock_ack(list<object_t>& oids, tid_t tid)
          
          ob->last_ack_tid = tid;
          
-         finish_contexts(ls);
-
          if (ob->can_close())
                close_object(ob);
        } else {
          dout(10) << "lock_ack " << *ob 
                           << " tid " << tid << " obsolete" << endl;
        }
+
+       // waiters?
+       if (ob->waitfor_ack.count(tid)) {
+         ls.splice(ls.end(), ob->waitfor_ack[tid]);
+         ob->waitfor_ack.erase(tid);
+       }
+
+       finish_contexts(ls);
+
   }
 }
 
@@ -553,6 +560,22 @@ void ObjectCacher::bh_write_commit(object_t oid, off_t start, size_t length, tid
 }
 
 
+void ObjectCacher::flush()
+{
+  utime_t cutoff = g_clock.now();
+  //cutoff.sec_ref() -= g_conf.client_oc_max_dirty_age;
+
+  dout(10) << "flush" << endl;
+  
+  while (1) {
+       BufferHead *bh = (BufferHead*) lru_dirty.lru_get_next_expire();
+       if (!bh) break;
+       if (bh->last_write > cutoff) break;
+
+       bh_write(bh->ob, bh);
+  }    
+}
+
 void ObjectCacher::trim(off_t max)
 {
   if (max < 0) 
@@ -717,6 +740,8 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
 
 int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino)
 {
+  utime_t now = g_clock.now();
+  
   for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
           ex_it != wr->extents.end();
        ex_it++) {
@@ -748,8 +773,12 @@ int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino)
 
        // it's dirty.
        mark_dirty(bh);
+       touch_bh(bh);
+       bh->last_write = now;
   }
 
+  delete wr;
+
   trim();
   return 0;
 }
@@ -801,7 +830,10 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex&
          Object *o = get_object(i->first, ino);
          rdlock(o);
        }
-       
+
+       // readx will hose rd
+       list<ObjectExtent> extents = rd->extents;
+
        // do the read, into our cache
        Cond cond;
        bool done = false;
@@ -811,8 +843,8 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex&
        while (!done) cond.Wait(lock);
        
        // release the locks
-       for (list<ObjectExtent>::iterator ex_it = rd->extents.begin();
-                ex_it != rd->extents.end();
+       for (list<ObjectExtent>::iterator ex_it = extents.begin();
+                ex_it != extents.end();
                 ex_it++) {
          assert(objects.count(ex_it->oid));
          Object *o = objects[ex_it->oid];
@@ -873,20 +905,23 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute
        wrlock(o);
   }
   
+  // writex will hose wr
+  list<ObjectExtent> extents = wr->extents;
+
   // do the write, into our cache
   writex(wr, ino);
   
   // flush 
   // ...and release the locks?
-  for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
-          ex_it != wr->extents.end();
+  for (list<ObjectExtent>::iterator ex_it = extents.begin();
+          ex_it != extents.end();
           ex_it++) {
        assert(objects.count(ex_it->oid));
        Object *o = objects[ex_it->oid];
        
        wrunlock(o);
   }
-  
+
   return 0;
 }
  
@@ -1101,12 +1136,11 @@ bool ObjectCacher::flush(Object *ob)
 }
 
 // flush.  non-blocking, takes callback.
-// returns true if already flushed, and deletes the callback.
+// returns true if already flushed
 bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish)
 {
   if (objects_by_ino.count(ino) == 0) {
        dout(10) << "flush_set on " << hex << ino << dec << " dne" << endl;
-       delete onfinish;
        return true;
   }
 
@@ -1115,6 +1149,7 @@ bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish)
   C_Gather *gather = 0; // we'll need to wait for all objects to flush!
 
   set<Object*>& s = objects_by_ino[ino];
+  bool safe = true;
   for (set<Object*>::iterator i = s.begin();
           i != s.end();
           i++) {
@@ -1122,20 +1157,21 @@ bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish)
 
        if (!flush(ob)) {
          // we'll need to gather...
-         if (!gather) 
+         if (!gather && onfinish
                gather = new C_Gather(onfinish);
+         safe = false;
 
          dout(10) << "flush_set " << hex << ino << dec << " will wait for ack tid " 
                           << ob->last_write_tid 
                           << " on " << *ob
                           << endl;
-         ob->waitfor_ack[ob->last_write_tid].push_back(gather->new_sub());
+         if (gather)
+               ob->waitfor_ack[ob->last_write_tid].push_back(gather->new_sub());
        }
   }
   
-  if (!gather) {
+  if (safe) {
        dout(10) << "flush_set " << hex << ino << dec << " has no dirty|tx bhs" << endl;
-       delete onfinish;
        return true;
   }
   return false;
@@ -1150,7 +1186,6 @@ bool ObjectCacher::commit_set(inodeno_t ino, Context *onfinish)
 
   if (objects_by_ino.count(ino) == 0) {
        dout(10) << "commit_set on " << hex << ino << dec << " dne" << endl;
-       delete onfinish;
        return true;
   }
 
@@ -1159,6 +1194,7 @@ bool ObjectCacher::commit_set(inodeno_t ino, Context *onfinish)
   C_Gather *gather = 0; // we'll need to wait for all objects to commit
 
   set<Object*>& s = objects_by_ino[ino];
+  bool safe = true;
   for (set<Object*>::iterator i = s.begin();
           i != s.end();
           i++) {
@@ -1167,18 +1203,19 @@ bool ObjectCacher::commit_set(inodeno_t ino, Context *onfinish)
        // make sure it's flushing.
        flush_set(ino);
 
-       if (ob->last_write_tid < ob->last_commit_tid) {
+       if (ob->last_write_tid > ob->last_commit_tid) {
          dout(10) << "commit_set " << hex << ino << dec << " " << *ob 
                           << " will finish on commit tid " << ob->last_write_tid
                           << endl;
-         if (!gather) gather = new C_Gather(onfinish);
-         ob->waitfor_commit[ob->last_write_tid].push_back( gather->new_sub() );
+         if (!gather && onfinish) gather = new C_Gather(onfinish);
+         safe = false;
+         if (gather)
+               ob->waitfor_commit[ob->last_write_tid].push_back( gather->new_sub() );
        }
   }
 
-  if (!gather) {
+  if (safe) {
        dout(10) << "commit_set " << hex << ino << dec << " all committed" << endl;
-       delete onfinish;
        return true;
   }
   return false;
index f637709b500fc0fb3de23fa9d8021d285a14b7cc..f2a23cb029055dbcf5bbd7a7ced29ece8b0869cf 100644 (file)
@@ -41,6 +41,7 @@ class ObjectCacher {
        Object *ob;
        bufferlist  bl;
        tid_t last_write_tid;  // version of bh (if non-zero)
+       utime_t last_write;
        
        map< off_t, list<Context*> > waitfor_read;
        
@@ -314,6 +315,7 @@ class ObjectCacher {
   void bh_write(Object *ob, BufferHead *bh);
 
   void trim(off_t max=-1);
+  void flush();
 
   bool flush(Object *o);
   off_t release(Object *o);
index 8d75ec2ceb845d6d60adc581ef74d9ce67655ea5..fa7ab29c88d2aeebe66ed9bf171f3698fcbdd0a2 100644 (file)
@@ -14,7 +14,7 @@
 
 #include "config.h"
 #undef dout
-#define dout(x)  if (x <= g_conf.debug || x <= g_conf.debug_filer) cout << messenger->get_myaddr() << ".objecter "
+#define dout(x)  if (x <= g_conf.debug || x <= g_conf.debug_objecter) cout << messenger->get_myaddr() << ".objecter "
 
 
 // messages ------------------------------