From 55cf6bad2fcf47c8a74fcb363a27a2cedb59ab52 Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Fri, 14 Jan 2011 16:11:01 -0800 Subject: [PATCH] C_Gather: Rewrite for thread safety. Previously, C_Gather wasn't thread safe at all, and there was an issue with creating subs while some subs were being finished. These issues are now fixed. Signed-off-by: Greg Farnum --- src/include/Context.h | 62 +++++++++++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 23 deletions(-) diff --git a/src/include/Context.h b/src/include/Context.h index 1591602cc2be4..655ea5b33e0a0 100644 --- a/src/include/Context.h +++ b/src/include/Context.h @@ -114,16 +114,20 @@ class C_Gather : public Context { private: int result; Context *onfinish; - std::set waitfor; - int num; + std::set waitfor; + int sub_created_count; + int sub_existing_count; + Mutex lock; bool any; /* if true, OR, otherwise, AND */ bool activated; - bool sub_finish(void *sub, int num, int r) { - assert(waitfor.count(num)); - waitfor.erase(num); + bool sub_finish(Context* sub, int r) { + Mutex::Locker l(lock); + assert(waitfor.count(sub)); + waitfor.erase(sub); + --sub_existing_count; - //generic_dout(0) << "C_Gather " << this << ".sub_finish(r=" << r << ") " << sub << " " << num << " of " << waitfor << dendl; + //generic_dout(0) << "C_Gather " << this << ".sub_finish(r=" << r << ") " << sub << " " << dendl; if (r < 0 && result == 0) result = r; @@ -132,17 +136,21 @@ private: return false; // no finisher set yet, ignore. if (any && onfinish) { + lock.Unlock(); onfinish->finish(result); + lock.Lock(); delete onfinish; onfinish = 0; } - if (!waitfor.empty()) + if (sub_existing_count) return false; // more subs left // last one if (!any && onfinish) { + lock.Unlock(); onfinish->finish(result); + lock.Lock(); delete onfinish; onfinish = 0; } @@ -151,50 +159,58 @@ private: class C_GatherSub : public Context { C_Gather *gather; - int num; public: - C_GatherSub(C_Gather *g, int n) : gather(g), num(n) {} + C_GatherSub(C_Gather *g) : gather(g) {} void finish(int r) { - if (gather->sub_finish((void *)this, num, r)) { + if (gather->sub_finish(this, r)) delete gather; // last one! - gather = 0; - } + gather = 0; } ~C_GatherSub() { if (gather) - gather->rm_sub(num); + gather->rm_sub(this); } }; public: - C_Gather(Context *f=0, bool an=false) : result(0), onfinish(f), num(0), any(an), + C_Gather(Context *f=0, bool an=false) : result(0), onfinish(f), sub_created_count(0), + sub_existing_count(0), + lock("C_Gather::lock", true, false), //disable lockdep + any(an), activated(onfinish ? true : false) { //generic_dout(0) << "C_Gather " << this << ".new" << dendl; } ~C_Gather() { //generic_dout(0) << "C_Gather " << this << ".delete" << dendl; + assert(sub_existing_count == 0); + assert(waitfor.empty()); assert(!onfinish); } void set_finisher(Context *c) { + Mutex::Locker l(lock); assert(!onfinish); onfinish = c; activated = true; } Context *new_sub() { - num++; - waitfor.insert(num); - Context *s = new C_GatherSub(this, num); - //generic_dout(0) << "C_Gather " << this << ".new_sub " << num << " " << s << dendl; + Mutex::Locker l(lock); + sub_created_count++; + sub_existing_count++; + Context *s = new C_GatherSub(this); + waitfor.insert(s); + //generic_dout(0) << "C_Gather " << this << ".new_sub is " << sub_created_count << " " << s << dendl; return s; } - void rm_sub(int n) { - num--; - waitfor.erase(n); + void rm_sub(Context *s) { + Mutex::Locker l(lock); + assert(waitfor.count(s)); + waitfor.erase(s); + sub_existing_count--; } - bool empty() { return num == 0; } - int get_num() { return num; } + bool empty() { Mutex::Locker l(lock); return sub_existing_count == 0; } + int get_num() { Mutex::Locker l(lock); return sub_created_count; } void finish(int r) { assert(0); // nobody should ever call me. -- 2.39.5