]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
C_Gather: Rewrite for thread safety.
authorGreg Farnum <gregf@hq.newdream.net>
Sat, 15 Jan 2011 00:11:01 +0000 (16:11 -0800)
committerGreg Farnum <gregf@hq.newdream.net>
Sat, 15 Jan 2011 00:11:01 +0000 (16:11 -0800)
Previously, C_Gather wasn't thread safe at all,
and there was an issue with creating subs while some
subs were being finished.
These issues are now fixed.

Signed-off-by: Greg Farnum <gregf@hq.newdream.net>
src/include/Context.h

index 1591602cc2be444a01aaf67ee24df67b0c636b2e..655ea5b33e0a05507e29e5f21d5933fb31af9a19 100644 (file)
@@ -114,16 +114,20 @@ class C_Gather : public Context {
 private:
   int result;
   Context *onfinish;
-  std::set<int> waitfor;
-  int num;
+  std::set<Context*> waitfor;
+  int sub_created_count;
+  int sub_existing_count;
+  Mutex lock;
   bool any;  /* if true, OR, otherwise, AND */
   bool activated;
 
-  bool sub_finish(void *sub, int num, int r) {
-    assert(waitfor.count(num));
-    waitfor.erase(num);
+  bool sub_finish(Context* sub, int r) {
+    Mutex::Locker l(lock);
+    assert(waitfor.count(sub));
+    waitfor.erase(sub);
+    --sub_existing_count;
 
-    //generic_dout(0) << "C_Gather " << this << ".sub_finish(r=" << r << ") " << sub << " " << num << " of " << waitfor << dendl;
+    //generic_dout(0) << "C_Gather " << this << ".sub_finish(r=" << r << ") " << sub << " " << dendl;
 
     if (r < 0 && result == 0)
       result = r;
@@ -132,17 +136,21 @@ private:
       return false;  // no finisher set yet, ignore.
 
     if (any && onfinish) {
+      lock.Unlock();
       onfinish->finish(result);
+      lock.Lock();
       delete onfinish;
       onfinish = 0;
     }
 
-    if (!waitfor.empty()) 
+    if (sub_existing_count)
       return false;  // more subs left
 
     // last one
     if (!any && onfinish) {
+      lock.Unlock();
       onfinish->finish(result);
+      lock.Lock();
       delete onfinish;
       onfinish = 0;
     }
@@ -151,50 +159,58 @@ private:
 
   class C_GatherSub : public Context {
     C_Gather *gather;
-    int num;
   public:
-    C_GatherSub(C_Gather *g, int n) : gather(g), num(n) {}
+    C_GatherSub(C_Gather *g) : gather(g) {}
     void finish(int r) {
-      if (gather->sub_finish((void *)this, num, r)) {
+      if (gather->sub_finish(this, r))
        delete gather;   // last one!
-       gather = 0;
-      }
+      gather = 0;
     }
     ~C_GatherSub() {
       if (gather)
-       gather->rm_sub(num);
+       gather->rm_sub(this);
     }
   };
 
 public:
-  C_Gather(Context *f=0, bool an=false) : result(0), onfinish(f), num(0), any(an),
+  C_Gather(Context *f=0, bool an=false) : result(0), onfinish(f), sub_created_count(0),
+                                          sub_existing_count(0),
+                                          lock("C_Gather::lock", true, false), //disable lockdep
+                                          any(an),
                                          activated(onfinish ? true : false) {
     //generic_dout(0) << "C_Gather " << this << ".new" << dendl;
   }
   ~C_Gather() {
     //generic_dout(0) << "C_Gather " << this << ".delete" << dendl;
+    assert(sub_existing_count == 0);
+    assert(waitfor.empty());
     assert(!onfinish);
   }
 
   void set_finisher(Context *c) {
+    Mutex::Locker l(lock);
     assert(!onfinish);
     onfinish = c;
     activated = true;
   }
   Context *new_sub() {
-    num++;
-    waitfor.insert(num);
-    Context *s = new C_GatherSub(this, num);
-    //generic_dout(0) << "C_Gather " << this << ".new_sub " << num << " " << s << dendl;
+    Mutex::Locker l(lock);
+    sub_created_count++;
+    sub_existing_count++;
+    Context *s = new C_GatherSub(this);
+    waitfor.insert(s);
+    //generic_dout(0) << "C_Gather " << this << ".new_sub is " << sub_created_count << " " << s << dendl;
     return s;
   }
-  void rm_sub(int n) {
-    num--;
-    waitfor.erase(n);
+  void rm_sub(Context *s) {
+    Mutex::Locker l(lock);
+    assert(waitfor.count(s));
+    waitfor.erase(s);
+    sub_existing_count--;
   }
 
-  bool empty() { return num == 0; }
-  int get_num() { return num; }
+  bool empty() { Mutex::Locker l(lock); return sub_existing_count == 0; }
+  int get_num() { Mutex::Locker l(lock); return sub_created_count; }
 
   void finish(int r) {
     assert(0);    // nobody should ever call me.