]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ObjectCacher: always complete flush_set() callback
authorJosh Durgin <josh.durgin@inktank.com>
Fri, 22 Mar 2013 19:13:36 +0000 (12:13 -0700)
committerJosh Durgin <josh.durgin@inktank.com>
Thu, 28 Mar 2013 17:46:58 +0000 (10:46 -0700)
This removes the last remnants of
b5e9995f59d363ba00d9cac413d9b754ee44e370. If there's nothing to flush,
immediately call the callback instead of deleting it. Callers were
assuming they were responsible for completing the callback whenever
flush_set() returned true, and always called complete(0) in this
case. Simplify the interface and just do this in flush_set(), so that
it always calls the callback.

Since C_GatherBuilder deletes its finisher if there are no subs,
only set its finisher when subs are present. This way we can still
call ->complete() for the callback.

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
src/client/Client.cc
src/osdc/ObjectCacher.cc
src/test/osdc/object_cacher_stress.cc

index 1f5302ca580bfb810605db460888bf50350b2519..e81e60cca69454ab12d0b8775251e6cc420b2082 100644 (file)
@@ -2698,11 +2698,7 @@ bool Client::_flush(Inode *in, Context *onfinish)
   if (!onfinish) {
     onfinish = new C_Client_PutInode(this, in);
   }
-  bool safe = objectcacher->flush_set(&in->oset, onfinish);
-  if (safe) {
-    onfinish->complete(0);
-  }
-  return safe;
+  return objectcacher->flush_set(&in->oset, onfinish);
 }
 
 void Client::_flush_range(Inode *in, int64_t offset, uint64_t size)
index 3c4bc3c66f6fcc69dcbd6d1fe08b20f426b5c30c..33d555e7a3c001c4a9d7318e8b19bec05ae5a6a5 100644 (file)
@@ -1502,14 +1502,14 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
   assert(lock.is_locked());
   if (oset->objects.empty()) {
     ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
-    delete onfinish;
+    onfinish->complete(0);
     return true;
   }
 
   ldout(cct, 10) << "flush_set " << oset << dendl;
 
   // we'll need to wait for all objects to flush!
-  C_GatherBuilder gather(cct, onfinish);
+  C_GatherBuilder gather(cct);
 
   bool safe = true;
   for (xlist<Object*>::iterator i = oset->objects.begin();
@@ -1528,11 +1528,14 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
         ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
     }
   }
+  if (gather.has_subs())
+    gather.set_finisher(onfinish);
   if (onfinish != NULL)
     gather.activate();
   
   if (safe) {
     ldout(cct, 10) << "flush_set " << oset << " has no dirty|tx bhs" << dendl;
+    onfinish->complete(0);
     return true;
   }
   return false;
@@ -1545,14 +1548,14 @@ bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv, Context
   assert(lock.is_locked());
   if (oset->objects.empty()) {
     ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
-    delete onfinish;
+    onfinish->complete(0);
     return true;
   }
 
   ldout(cct, 10) << "flush_set " << oset << " on " << exv.size() << " ObjectExtents" << dendl;
 
   // we'll need to wait for all objects to flush!
-  C_GatherBuilder gather(cct, onfinish);
+  C_GatherBuilder gather(cct);
 
   bool safe = true;
   for (vector<ObjectExtent>::iterator p = exv.begin();
@@ -1576,11 +1579,14 @@ bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv, Context
         ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
     }
   }
+  if (gather.has_subs())
+    gather.set_finisher(onfinish);
   if (onfinish != NULL)
     gather.activate();
   
   if (safe) {
     ldout(cct, 10) << "flush_set " << oset << " has no dirty|tx bhs" << dendl;
+    onfinish->complete(0);
     return true;
   }
   return false;
index e2c58e8da9ebd38e632db93e113d1a5ba1dc20e5..ef713ae98cc97c79501b2bc004f2b4a38d4fd7c2 100644 (file)
@@ -147,13 +147,11 @@ int stress_test(uint64_t num_ops, uint64_t num_objs,
   bool already_flushed = obc.flush_set(&object_set, onfinish);
   std::cout << "already flushed = " << already_flushed << std::endl;
   lock.Unlock();
-  if (!already_flushed) {
-    mylock.Lock();
-    while (!done) {
-      cond.Wait(mylock);
-    }
-    mylock.Unlock();
+  mylock.Lock();
+  while (!done) {
+    cond.Wait(mylock);
   }
+  mylock.Unlock();
 
   lock.Lock();
   bool unclean = obc.release_set(&object_set);