]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
objectcacher: only want for commit
authorSage Weil <sage.weil@dreamhost.com>
Fri, 26 Aug 2011 16:47:03 +0000 (09:47 -0700)
committerSage Weil <sage.weil@dreamhost.com>
Fri, 26 Aug 2011 16:47:03 +0000 (09:47 -0700)
There was some old, weird stuff going on here where we would wait for the
ACK and COMMIT separately.  This is just wrong.  Writeback does not
complete until the data is committed on disk.

Simplify by waiting only for commit, removing all the 'ack' code, and
going back to a single callback (flush_set).

I didn't notice this for 05063867e2a54176ffc9bbc73391f52766ab403f; both of
these cleanups are needed to fix this.

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/client/Client.cc
src/osdc/ObjectCacher.cc
src/osdc/ObjectCacher.h

index bb9cafc6cbacefa4d1cfc0574e78353d0310075b..ffdb0c7b87dfdd73d021d606e087d3b178a05e40 100644 (file)
@@ -162,7 +162,6 @@ Client::Client(Messenger *m, MonClient *mc)
   objecter = new Objecter(cct, messenger, monclient, osdmap, client_lock, timer);
   objecter->set_client_incarnation(0);  // client always 0, for now.
   objectcacher = new ObjectCacher(cct, objecter, client_lock, 
-                                 0,                            // all ack callback
                                  client_flush_set_callback,    // all commit callback
                                  (void*)this);
   filer = new Filer(objecter);
index a23cf97ea6fac9b6e47ccb138fcbb56ad732042a..ac2a60a6a8e809fc0d7e6d8e7d08829b90c2c1bb 100644 (file)
@@ -18,9 +18,9 @@
 
 ObjectCacher::
 ObjectCacher(CephContext *cct_, Objecter *o, Mutex& l, flush_set_callback_t flush_callback,
-              flush_set_callback_t commit_callback, void *callback_arg) : 
+            void *flush_callback_arg) : 
     cct(cct_), objecter(o), filer(o), lock(l),
-    flush_set_callback(flush_callback), commit_set_callback(commit_callback), flush_set_callback_arg(callback_arg),
+    flush_set_callback(flush_callback), flush_set_callback_arg(flush_callback_arg),
     flusher_stop(false), flusher_thread(this),
     stat_waiter(0),
     stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0) {
@@ -539,8 +539,6 @@ void ObjectCacher::bh_write(BufferHead *bh)
   ldout(cct, 7) << "bh_write " << *bh << dendl;
   
   // finishers
-  C_WriteAck *onack = new C_WriteAck(this, bh->ob->oloc.pool,
-                                     bh->ob->get_soid(), bh->start(), bh->length());
   C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
                                               bh->ob->get_soid(), bh->start(), bh->length());
 
@@ -551,10 +549,9 @@ void ObjectCacher::bh_write(BufferHead *bh)
                              bh->start(), bh->length(),
                              bh->snapc, bh->bl, bh->last_write, 0,
                              oset->truncate_size, oset->truncate_seq,
-                             onack, oncommit);
+                                   NULL, oncommit);
 
   // set bh last_write_tid
-  onack->tid = tid;
   oncommit->tid = tid;
   bh->ob->last_write_tid = tid;
   bh->last_write_tid = tid;
@@ -579,9 +576,9 @@ void ObjectCacher::lock_ack(int poolid, list<sobject_t>& oids, tid_t tid)
     list<Context*> ls;
 
     // waiters?
-    if (ob->waitfor_ack.count(tid)) {
-      ls.splice(ls.end(), ob->waitfor_ack[tid]);
-      ob->waitfor_ack.erase(tid);
+    if (ob->waitfor_commit.count(tid)) {
+      ls.splice(ls.end(), ob->waitfor_commit[tid]);
+      ob->waitfor_commit.erase(tid);
     }
     
     assert(tid <= ob->last_write_tid);
@@ -610,7 +607,7 @@ void ObjectCacher::lock_ack(int poolid, list<sobject_t>& oids, tid_t tid)
         assert(0);
       }
       
-      ob->last_ack_tid = tid;
+      ob->last_commit_tid = tid;
       
       if (ob->can_close())
         close_object(ob);
@@ -624,17 +621,17 @@ void ObjectCacher::lock_ack(int poolid, list<sobject_t>& oids, tid_t tid)
   }
 }
 
-void ObjectCacher::bh_write_ack(int poolid, sobject_t oid, loff_t start, uint64_t length, tid_t tid)
+void ObjectCacher::bh_write_commit(int poolid, sobject_t oid, loff_t start, uint64_t length, tid_t tid)
 {
   //lock.Lock();
   
-  ldout(cct, 7) << "bh_write_ack " 
+  ldout(cct, 7) << "bh_write_commit " 
           << oid 
           << " tid " << tid
           << " " << start << "~" << length
           << dendl;
   if (objects[poolid].count(oid) == 0) {
-    ldout(cct, 7) << "bh_write_ack no object cache" << dendl;
+    ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
     assert(0);
   } else {
     Object *ob = objects[poolid][oid];
@@ -649,65 +646,30 @@ void ObjectCacher::bh_write_ack(int poolid, sobject_t oid, loff_t start, uint64_
 
       if (bh->start() < start &&
           bh->end() > start+(loff_t)length) {
-        ldout(cct, 20) << "bh_write_ack skipping " << *bh << dendl;
+        ldout(cct, 20) << "bh_write_commit skipping " << *bh << dendl;
         continue;
       }
       
       // make sure bh is tx
       if (!bh->is_tx()) {
-        ldout(cct, 10) << "bh_write_ack skipping non-tx " << *bh << dendl;
+        ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
         continue;
       }
       
       // make sure bh tid matches
       if (bh->last_write_tid != tid) {
         assert(bh->last_write_tid > tid);
-        ldout(cct, 10) << "bh_write_ack newer tid on " << *bh << dendl;
+        ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
         continue;
       }
       
       // ok!  mark bh clean.
       mark_clean(bh);
-      ldout(cct, 10) << "bh_write_ack clean " << *bh << dendl;
+      ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
     }
     
-    // update object last_ack.
-    assert(ob->last_ack_tid < tid);
-    ob->last_ack_tid = tid;
-
-    // waiters?
-    if (ob->waitfor_ack.count(tid)) {
-      list<Context*> ls;
-      ls.splice(ls.begin(), ob->waitfor_ack[tid]);
-      ob->waitfor_ack.erase(tid);
-      finish_contexts(cct, ls);
-    }
-
-    // is the entire object set now clean?
-    if (flush_set_callback && ob->oset->dirty_or_tx == 0) {
-      flush_set_callback(flush_set_callback_arg, ob->oset);
-    }
-  }
-  //lock.Unlock();
-}
-
-void ObjectCacher::bh_write_commit(int poolid, sobject_t oid, loff_t start, uint64_t length, tid_t tid)
-{
-  //lock.Lock();
-  
-  // update object last_commit
-  ldout(cct, 7) << "bh_write_commit " 
-          << oid 
-          << " tid " << tid
-          << " " << start << "~" << length
-          << dendl;
-  if (objects[poolid].count(oid) == 0) {
-    ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
-    //assert(0);
-  } else {
-    Object *ob = objects[poolid][oid];
-    
     // update last_commit.
+    assert(ob->last_commit_tid < tid);
     ob->last_commit_tid = tid;
 
     // waiters?
@@ -719,23 +681,19 @@ void ObjectCacher::bh_write_commit(int poolid, sobject_t oid, loff_t start, uint
     }
 
     // is the entire object set now clean and fully committed?
-    if (commit_set_callback &&
-       ob->last_commit_tid == ob->last_write_tid) {
-      ObjectSet *oset = ob->oset;
-      if (ob->can_close())
-       close_object(ob);
-      if (commit_set_callback) {
-       if (oset->dirty_or_tx == 0) {        // nothing dirty/tx
-         commit_set_callback(flush_set_callback_arg, oset);      
-       }
-      }
+    ObjectSet *oset = ob->oset;
+    if (ob->can_close())
+      close_object(ob);
+
+    // is the entire object set now clean?
+    if (flush_set_callback &&
+       oset->dirty_or_tx == 0) {        // nothing dirty/tx
+      flush_set_callback(flush_set_callback_arg, oset);      
     }
   }
-
-  //  lock.Unlock();
+  //lock.Unlock();
 }
 
-
 void ObjectCacher::flush(loff_t amount)
 {
   utime_t cutoff = ceph_clock_now(cct);
@@ -1494,7 +1452,7 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
                << " on " << *ob
                << dendl;
       if (onfinish != NULL)
-        ob->waitfor_ack[ob->last_write_tid].push_back(gather.new_sub());
+        ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
     }
   }
   if (onfinish != NULL)
@@ -1712,9 +1670,9 @@ void ObjectCacher::truncate_set(ObjectSet *oset, vector<ObjectExtent>& exls)
   }
 
   // did we truncate off dirty data?
-  if (commit_set_callback &&
+  if (flush_set_callback &&
       were_dirty && oset->dirty_or_tx == 0)
-    commit_set_callback(flush_set_callback_arg, oset);
+    flush_set_callback(flush_set_callback_arg, oset);
 }
 
 
index 73fd7abd35804a136503e522dcd50d46984e6822..f19ca63ae946abd4bb1a67a062074a3e6ad75295 100644 (file)
@@ -145,12 +145,10 @@ class ObjectCacher {
     map<loff_t, BufferHead*>     data;
 
     tid_t last_write_tid;  // version of bh (if non-zero)
-    tid_t last_ack_tid;    // last update acked.
     tid_t last_commit_tid; // last update commited.
 
     int dirty_or_tx;
 
-    map< tid_t, list<Context*> > waitfor_ack;
     map< tid_t, list<Context*> > waitfor_commit;
     list<Context*> waitfor_rd;
     list<Context*> waitfor_wr;
@@ -176,7 +174,7 @@ class ObjectCacher {
     Object(ObjectCacher *_oc, sobject_t o, ObjectSet *os, object_locator_t& l) : 
       oc(_oc),
       oid(o), oset(os), set_item(this), oloc(l),
-      last_write_tid(0), last_ack_tid(0), last_commit_tid(0),
+      last_write_tid(0), last_commit_tid(0),
       dirty_or_tx(0),
       lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0) {
       // add to set
@@ -198,7 +196,7 @@ class ObjectCacher {
 
     bool can_close() {
       return data.empty() && lock_state == LOCK_NONE &&
-        waitfor_ack.empty() && waitfor_commit.empty() &&
+        waitfor_commit.empty() &&
         waitfor_rd.empty() && waitfor_wr.empty() &&
        dirty_or_tx == 0;
     }
@@ -275,7 +273,7 @@ class ObjectCacher {
  private:
   Mutex& lock;
   
-  flush_set_callback_t flush_set_callback, commit_set_callback;
+  flush_set_callback_t flush_set_callback;
   void *flush_set_callback_arg;
 
   vector<hash_map<sobject_t, Object*> > objects; // indexed by pool_id
@@ -466,7 +464,6 @@ class ObjectCacher {
 
  public:
   void bh_read_finish(int poolid, sobject_t oid, loff_t offset, uint64_t length, bufferlist &bl);
-  void bh_write_ack(int poolid, sobject_t oid, loff_t offset, uint64_t length, tid_t t);
   void bh_write_commit(int poolid, sobject_t oid, loff_t offset, uint64_t length, tid_t t);
   void lock_ack(int poolid, list<sobject_t>& oids, tid_t tid);
 
@@ -485,20 +482,6 @@ class ObjectCacher {
     }
   };
 
-  class C_WriteAck : public Context {
-    ObjectCacher *oc;
-    int poolid;
-    sobject_t oid;
-    loff_t start;
-    uint64_t length;
-  public:
-    tid_t tid;
-    C_WriteAck(ObjectCacher *c, int _poolid, sobject_t o, loff_t s, uint64_t l) :
-      oc(c), poolid(_poolid), oid(o), start(s), length(l) {}
-    void finish(int r) {
-      oc->bh_write_ack(poolid, oid, start, length, tid);
-    }
-  };
   class C_WriteCommit : public Context {
     ObjectCacher *oc;
     int poolid;
@@ -533,8 +516,7 @@ class ObjectCacher {
  public:
   ObjectCacher(CephContext *cct_, Objecter *o, Mutex& l,
               flush_set_callback_t flush_callback,
-              flush_set_callback_t commit_callback,
-              void *callback_arg);
+              void *flush_callback_arg);
   ~ObjectCacher() {
     // we should be empty.
     for (vector<hash_map<sobject_t, Object *> >::iterator i = objects.begin();
@@ -693,7 +675,7 @@ inline ostream& operator<<(ostream& out, ObjectCacher::Object &ob)
 {
   out << "object["
       << ob.get_soid() << " oset " << ob.oset << dec
-      << " wr " << ob.last_write_tid << "/" << ob.last_ack_tid << "/" << ob.last_commit_tid;
+      << " wr " << ob.last_write_tid << "/" << ob.last_commit_tid;
 
   switch (ob.lock_state) {
   case ObjectCacher::Object::LOCK_WRLOCKING: out << " wrlocking"; break;