]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
flusher.
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 2 Oct 2006 03:50:44 +0000 (03:50 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 2 Oct 2006 03:50:44 +0000 (03:50 +0000)
failure upcall.

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@882 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/osdc/ObjectCacher.cc
ceph/osdc/ObjectCacher.h
ceph/osdc/Objecter.cc
ceph/osdc/Objecter.h

index 2708e9c3b9a2d273fea2225222ca4027f9cb8ec5..d7802153b6b602234b20e20d7d0e68575955565d 100644 (file)
@@ -64,12 +64,12 @@ ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *bh, off_t off)
 }
 
 
-void ObjectCacher::Object::merge(BufferHead *left, BufferHead *right)
+void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
 {
   assert(left->end() == right->start());
   assert(left->get_state() == right->get_state());
 
-  dout(10) << "merge " << *left << " + " << *right << endl;
+  dout(10) << "merge_left " << *left << " + " << *right << endl;
   oc->bh_remove(this, right);
   oc->bh_stat_sub(left);
   left->set_length( left->length() + right->length());
@@ -79,8 +79,9 @@ void ObjectCacher::Object::merge(BufferHead *left, BufferHead *right)
   left->bl.claim_append(right->bl);
   
   // version 
-  // note: this is sorta busted, but shouldn't be used, cuz we're pbly about to write.. right?
+  // note: this is sorta busted, but should only be used for dirty buffers
   left->last_write_tid =  MAX( left->last_write_tid, right->last_write_tid );
+  left->last_write = MAX( left->last_write, right->last_write );
 
   // waiters
   for (map<off_t, list<Context*> >::iterator p = right->waitfor_read.begin();
@@ -92,9 +93,54 @@ void ObjectCacher::Object::merge(BufferHead *left, BufferHead *right)
   // hose right
   delete right;
 
-  dout(10) << "merge result " << *left << endl;
+  dout(10) << "merge_left result " << *left << endl;
 }
 
+/* buggy possibly, but more importnatly, unnecessary.
+void ObjectCacher::Object::merge_right(BufferHead *left, BufferHead *right)
+{
+  assert(left->end() == right->start());
+  assert(left->get_state() == right->get_state());
+
+  dout(10) << "merge_right " << *left << " + " << *right << endl;
+  oc->bh_remove(this, left);
+  oc->bh_stat_sub(right);
+  data.erase(right->start());
+  right->set_start( left->start() );
+  data[right->start()] = right;
+  right->set_length( left->length() + right->length());
+  oc->bh_stat_add(right);
+
+  // data
+  bufferlist nbl;
+  nbl.claim(left->bl);
+  nbl.claim_append(right->bl);
+  right->bl.claim(nbl);
+  
+  // version 
+  // note: this is sorta busted, but should only be used for dirty buffers
+  right->last_write_tid =  MAX( left->last_write_tid, right->last_write_tid );
+
+  // waiters
+  map<off_t,list<Context*> > old;
+  old.swap(right->waitfor_read);
+
+  // take left's waiters
+  right->waitfor_read.swap(left->waitfor_read);
+
+  // shift old waiters
+  for (map<off_t, list<Context*> >::iterator p = old.begin();
+          p != old.end();
+          p++) 
+       right->waitfor_read[p->first + left->length()].swap( p->second );
+  
+  // hose left
+  delete left;
+
+  dout(10) << "merge_right result " << *right << endl;
+}
+*/
+
 /*
  * map a range of bytes into buffer_heads.
  * - create missing buffer_heads as necessary.
@@ -189,6 +235,7 @@ int ObjectCacher::Object::map_read(Objecter::OSDRead *rd,
  * map a range of extents on an object's buffer cache.
  * - combine any bh's we're writing into one
  * - break up bufferheads that don't fall completely within the range
+ * //no! - return a bh that includes the write.  may also include other dirty data to left and/or right.
  */
 ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr)
 {
@@ -211,7 +258,15 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr
     
     if (p != data.begin() && 
         (p == data.end() || p->first > cur)) {
-      p--;     // might overlap!
+      p--;     // might overlap or butt up!
+
+         /*// dirty and butts up?
+         if (p->first + p->second->length() == cur &&
+                 p->second->is_dirty()) {
+               dout(10) << "map_write will append to tail of " << *p->second << endl;
+               final = p->second;
+         }
+         */
       if (p->first + p->second->length() <= cur) 
         p++;   // doesn't overlap.
     }    
@@ -241,33 +296,42 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr
         BufferHead *bh = p->second;
         dout(10) << "map_write bh " << *bh << " intersected" << endl;
         
-        if (p->first < cur) {
+               /*if (bh->is_dirty()) {
+                 // already dirty, let's use it.
+                 final = bh;
+               } else {
+               */
+               if (p->first < cur) {
                  assert(final == 0);
-          if (cur + max >= p->first + p->second->length()) {
-            // we want right bit (one splice)
+                 if (cur + max >= p->first + p->second->length()) {
+                       // we want right bit (one splice)
                        final = split(bh, cur);   // just split it, take right half.
-            p++;
-            assert(p->second == final);
-          } else {
-            // we want middle bit (two splices)
+                       p++;
+                       assert(p->second == final);
+                 } else {
+                       // we want middle bit (two splices)
                        final = split(bh, cur);
                        p++;
                        assert(p->second == final);
                        split(final, cur+max);
                  }
-        } else if (p->first == cur) {
-          if (p->second->length() <= max) {
-            // whole bufferhead, piece of cake.
-          } else {
-            // we want left bit (one splice)
+               } else if (p->first == cur) {
+                 /*if (bh->is_dirty()) {
+                         // already dirty, use it.
+                       } 
+                       else*/
+                 if (p->second->length() <= max) {
+                       // whole bufferhead, piece of cake.
+                 } else {
+                       // we want left bit (one splice)
                        split(bh, cur + max);        // just split
                  }
                  if (final) 
-                       merge(final,bh);
+                       merge_left(final,bh);
                  else
                        final = bh;
-        }
-        
+               }
+               
         // keep going.
         off_t lenfromcur = final->end() - cur;
         cur += lenfromcur;
@@ -312,15 +376,17 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr
 
 /* private */
 
-void ObjectCacher::bh_read(Object *ob, BufferHead *bh)
+void ObjectCacher::bh_read(BufferHead *bh)
 {
   dout(7) << "bh_read on " << *bh << endl;
 
+  mark_rx(bh);
+
   // finisher
-  C_ReadFinish *onfinish = new C_ReadFinish(this, ob->get_oid(), bh->start(), bh->length());
+  C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_oid(), bh->start(), bh->length());
 
   // go
-  objecter->read(ob->get_oid(), bh->start(), bh->length(), &onfinish->bl,
+  objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), &onfinish->bl,
                                 onfinish);
 }
 
@@ -350,24 +416,29 @@ void ObjectCacher::bh_read_finish(object_t oid, off_t start, size_t length, buff
                                << opos << "~" << bh->start() - opos 
                                << endl;
                opos = bh->start();
-               p++;
                continue;
          }
          
          if (!bh->is_rx()) {
                dout(10) << "bh_read_finish skipping non-rx " << *bh << endl;
+               opos = bh->end();
+               p++;
                continue;
          }
          
+         assert(opos >= bh->start());
          assert(bh->start() == opos);   // we don't merge rx bh's... yet!
-         assert(bh->length() < start+(off_t)length-opos);
+         assert(bh->length() <= start+(off_t)length-opos);
          
          bh->bl.substr_of(bl,
-                                          start+length-opos,
+                                          opos-bh->start(),
                                           bh->length());
          mark_clean(bh);
          dout(10) << "bh_read_finish read " << *bh << endl;
-
+         
+         opos = bh->end();
+         p++;
+         
          // finishers?
          // called with lock held.
          list<Context*> ls;
@@ -383,22 +454,22 @@ void ObjectCacher::bh_read_finish(object_t oid, off_t start, size_t length, buff
 }
 
 
-void ObjectCacher::bh_write(Object *ob, BufferHead *bh)
+void ObjectCacher::bh_write(BufferHead *bh)
 {
   dout(7) << "bh_write " << *bh << endl;
   
   // finishers
-  C_WriteAck *onack = new C_WriteAck(this, ob->get_oid(), bh->start(), bh->length());
-  C_WriteCommit *oncommit = new C_WriteCommit(this, ob->get_oid(), bh->start(), bh->length());
+  C_WriteAck *onack = new C_WriteAck(this, bh->ob->get_oid(), bh->start(), bh->length());
+  C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->get_oid(), bh->start(), bh->length());
 
   // go
-  tid_t tid = objecter->write(ob->get_oid(), bh->start(), bh->length(), bh->bl,
+  tid_t tid = objecter->write(bh->ob->get_oid(), bh->start(), bh->length(), bh->bl,
                                                          onack, oncommit);
 
   // set bh last_write_tid
   onack->tid = tid;
   oncommit->tid = tid;
-  ob->last_write_tid = tid;
+  bh->ob->last_write_tid = tid;
   bh->last_write_tid = tid;
 
   mark_tx(bh);
@@ -482,13 +553,13 @@ void ObjectCacher::bh_write_ack(object_t oid, off_t start, size_t length, tid_t
        Object *ob = objects[oid];
        
        // apply to bh's!
-       off_t opos = start;
-       
-       for (map<off_t, BufferHead*>::iterator p = ob->data.lower_bound(opos);
-                p != ob->data.end() && opos < start+(off_t)length;
+       for (map<off_t, BufferHead*>::iterator p = ob->data.lower_bound(start);
+                p != ob->data.end();
                 p++) {
          BufferHead *bh = p->second;
          
+         if (bh->start() > start+(off_t)length) break;
+
          if (bh->start() < start &&
                  bh->end() > start+(off_t)length) {
                dout(20) << "bh_write_ack skipping " << *bh << endl;
@@ -560,22 +631,25 @@ void ObjectCacher::bh_write_commit(object_t oid, off_t start, size_t length, tid
 }
 
 
-void ObjectCacher::flush()
+void ObjectCacher::flush(off_t amount)
 {
   utime_t cutoff = g_clock.now();
   //cutoff.sec_ref() -= g_conf.client_oc_max_dirty_age;
 
-  dout(10) << "flush" << endl;
+  dout(10) << "flush " << amount << endl;
   
-  while (1) {
+  off_t did = 0;
+  while (amount == 0 || did < amount) {
        BufferHead *bh = (BufferHead*) lru_dirty.lru_get_next_expire();
        if (!bh) break;
        if (bh->last_write > cutoff) break;
 
-       bh_write(bh->ob, bh);
+       did += bh->length();
+       bh_write(bh);
   }    
 }
 
+
 void ObjectCacher::trim(off_t max)
 {
   if (max < 0) 
@@ -633,12 +707,12 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
        map<off_t, BufferHead*> hits, missing, rx;
     o->map_read(rd, hits, missing, rx);
        
-       if (!missing.empty() && !rx.empty()) {
+       if (!missing.empty() || !rx.empty()) {
          // read missing
          for (map<off_t, BufferHead*>::iterator bh_it = missing.begin();
                   bh_it != missing.end();
                   bh_it++) {
-               bh_read(o, bh_it->second);
+               bh_read(bh_it->second);
                if (success) {
                  dout(10) << "readx missed, waiting on " << *bh_it->second 
                                   << " off " << bh_it->first << endl;
@@ -660,6 +734,8 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
                }
          }       
        } else {
+         assert(!hits.empty());
+
          // make a plain list
          for (map<off_t, BufferHead*>::iterator bh_it = hits.begin();
                   bh_it != hits.end();
@@ -728,9 +804,11 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
           i != stripe_map.end();
           i++) {
        assert(pos == i->first);
+       dout(10) << "readx  adding buffer len " << i->second.length() << " at " << pos << endl;
        pos += i->second.length();
        rd->bl->claim_append(i->second);
   }
+  dout(10) << "readx  result is " << rd->bl->length() << endl;
 
   trim();
   
@@ -760,6 +838,7 @@ int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino)
        for (map<size_t,size_t>::iterator f_it = ex_it->buffer_extents.begin();
                 f_it != ex_it->buffer_extents.end();
                 f_it++) {
+         dout(10) << "writex writing " << f_it->first << "~" << f_it->second << " into " << *bh << " at " << opos << endl;
          size_t bhoff = bh->start() - opos;
          assert(f_it->second <= bh->length() - bhoff);
 
@@ -775,6 +854,22 @@ int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino)
        mark_dirty(bh);
        touch_bh(bh);
        bh->last_write = now;
+
+       // recombine with left?
+       map<off_t,BufferHead*>::iterator p = o->data.find(bh->start());
+       if (p != o->data.begin()) {
+         p--;
+         if (p->second->is_dirty()) {
+               o->merge_left(p->second,bh);
+               bh = p->second;
+         }
+       }
+       // right?
+       p = o->data.find(bh->start());
+       p++;
+       if (p != o->data.end() &&
+               p->second->is_dirty()) 
+         o->merge_left(p->second,bh);
   }
 
   delete wr;
@@ -787,8 +882,9 @@ int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino)
 // blocking wait for write.
 void ObjectCacher::wait_for_write(size_t len, Mutex& lock)
 {
-  while (get_stat_dirty() + (off_t)len > g_conf.client_oc_max_dirty) {
+  while (get_stat_dirty() > g_conf.client_oc_max_dirty) {
        dout(10) << "wait_for_write waiting" << endl;
+       flusher_cond.Signal();
        stat_waiter++;
        stat_cond.Wait(lock);
        stat_waiter--;
@@ -796,6 +892,48 @@ void ObjectCacher::wait_for_write(size_t len, Mutex& lock)
   }
 }
 
+void ObjectCacher::flusher_entry()
+{
+  dout(10) << "flusher start" << endl;
+  lock.Lock();
+  while (!flusher_stop) {
+       while (!flusher_stop) {
+         off_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() + get_stat_dirty();
+         dout(11) << "flusher "
+                          << all << " / " << g_conf.client_oc_size << ":  "
+                          << get_stat_tx() << " tx, "
+                          << get_stat_rx() << " rx, "
+                          << get_stat_clean() << " clean, "
+                          << get_stat_dirty() << " / " << g_conf.client_oc_max_dirty << " dirty"
+                          << endl;
+         if (get_stat_dirty() > g_conf.client_oc_max_dirty) {
+               // flush some dirty pages
+               dout(10) << "flusher " 
+                                << get_stat_dirty() << " / " << g_conf.client_oc_max_dirty << " dirty,"
+                                << " flushing some dirty bhs" << endl;
+               flush(get_stat_dirty() - g_conf.client_oc_max_dirty);
+         }
+         else {
+               // check tail of lru for old dirty items
+               utime_t cutoff = g_clock.now();
+               cutoff.sec_ref()--;
+               BufferHead *bh = 0;
+               while ((bh = (BufferHead*)lru_dirty.lru_get_next_expire()) != 0 &&
+                          bh->last_write < cutoff) {
+                 dout(10) << "flusher flushing aged dirty bh " << *bh << endl;
+                 bh_write(bh);
+               }
+               break;
+         }
+       }
+       if (flusher_stop) break;
+       flusher_cond.WaitInterval(lock, utime_t(1,0));
+  }
+  lock.Unlock();
+  dout(10) << "flusher finish" << endl;
+}
+
+
   
 // blocking.  atomic+sync.
 int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& lock)
@@ -1129,7 +1267,7 @@ bool ObjectCacher::flush(Object *ob)
        }
        if (!bh->is_dirty()) continue;
        
-       bh_write(ob, bh);
+       bh_write(bh);
        clean = false;
   }
   return clean;
@@ -1319,3 +1457,6 @@ void ObjectCacher::kick_sync_readers(inodeno_t ino)
 
   finish_contexts(ls);
 }
+
+
+
index f2a23cb029055dbcf5bbd7a7ced29ece8b0869cf..c3144e172bc6997102bd9f4e61ebc0c4d4565903 100644 (file)
@@ -6,6 +6,7 @@
 #include "include/Context.h"
 
 #include "common/Cond.h"
+#include "common/Thread.h"
 
 #include "Objecter.h"
 #include "Filer.h"
@@ -173,7 +174,8 @@ class ObjectCacher {
 
        // mid-level
        BufferHead *split(BufferHead *bh, off_t off);
-       void merge(BufferHead *left, BufferHead *right);
+       void merge_left(BufferHead *left, BufferHead *right);
+       void merge_right(BufferHead *left, BufferHead *right);
 
        int map_read(Objecter::OSDRead *rd,
                                 map<off_t, BufferHead*>& hits,
@@ -196,6 +198,20 @@ class ObjectCacher {
   set<BufferHead*>    dirty_bh;
   LRU   lru_dirty, lru_rest;
 
+  Cond flusher_cond;
+  bool flusher_stop;
+  void flusher_entry();
+  class FlusherThread : public Thread {
+       ObjectCacher *oc;
+  public:
+       FlusherThread(ObjectCacher *o) : oc(o) {}
+       void *entry() {
+         oc->flusher_entry();
+         return 0;
+       }
+  } flusher_thread;
+  
+
   // objects
   Object *get_object(object_t oid, inodeno_t ino) {
        // have it?
@@ -290,6 +306,7 @@ class ObjectCacher {
   void mark_tx(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_TX); };
   void mark_dirty(BufferHead *bh) { 
        bh_set_state(bh, BufferHead::STATE_DIRTY); 
+       lru_dirty.lru_touch(bh);
        //bh->set_dirty_stamp(g_clock.now());
   };
 
@@ -311,11 +328,11 @@ class ObjectCacher {
   }
 
   // io
-  void bh_read(Object *ob, BufferHead *bh);
-  void bh_write(Object *ob, BufferHead *bh);
+  void bh_read(BufferHead *bh);
+  void bh_write(BufferHead *bh);
 
   void trim(off_t max=-1);
-  void flush();
+  void flush(off_t amount=0);
 
   bool flush(Object *o);
   off_t release(Object *o);
@@ -387,8 +404,17 @@ class ObjectCacher {
  public:
   ObjectCacher(Objecter *o, Mutex& l) : 
        objecter(o), filer(o), lock(l),
+       flusher_stop(false), flusher_thread(this),
        stat_waiter(0),
        stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0) {
+       flusher_thread.create();
+  }
+  ~ObjectCacher() {
+       //lock.Lock();  // hmm.. watch out for deadlock!
+       flusher_stop = true;
+       flusher_cond.Signal();
+       //lock.Unlock();
+       flusher_thread.join();
   }
 
 
@@ -402,7 +428,7 @@ class ObjectCacher {
        void finish(int) {
          int r = oc->readx(rd, ino, onfinish);
          if (r > 0) {
-               onfinish->finish(0);
+               onfinish->finish(r);
                delete onfinish;
          }
        }
index 762ec4e080075f8f1a074f0d7cf9dc6198dcc000..c4ac65e9026a52b9d2b774e602d9ab8a75735ee7 100644 (file)
@@ -10,6 +10,8 @@
 #include "messages/MOSDMap.h"
 #include "messages/MOSDGetMap.h"
 
+#include "messages/MOSDFailure.h"
+
 #include <errno.h>
 
 #include "config.h"
@@ -704,3 +706,9 @@ tid_t Objecter::lockx(OSDLock *l, Context *onack, Context *oncommit)
 
 
 
+Message* Objecter::ms_handle_failure(msg_addr_t dest, entity_inst_t& inst)
+{
+  dout(0) << "ms_handle_failure " << dest << " inst " << inst << endl;
+  messenger->send_message(new MOSDFailure(dest, inst, osdmap->get_epoch()), MSG_ADDR_MON(0));
+  return 0;
+}
index 450f5701159131cc302a9fdc70b307a934f4d4fc..1b5b54fadf9e81231bf108c84355b564c599c4fc 100644 (file)
@@ -167,6 +167,10 @@ class Objecter {
                         Context *onack, Context *oncommit);
 
   tid_t lock(int op, object_t oid, Context *onack, Context *oncommit);
+
+
+  Message* ms_handle_failure(msg_addr_t dest, entity_inst_t& inst);
+
 };
 
 #endif