From: Somnath Roy
Date: Sat, 24 May 2014 02:20:46 +0000 (-0700)
Subject: WorkQueue: Taking care of potential race condition during pause()
X-Git-Tag: v0.83~110^2~6
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b05da1ca9c0b9bc68605bfe152a9a03c72499d0e;p=ceph.git

WorkQueue: Taking care of potential race condition during pause()

Introduced two variables to keep track of the number of threads paused and
drained during threadpool pause/drain. The pause()/drain() call now waits
until the number of paused/drained threads equals the total number of
thread pool threads.

Signed-off-by: Somnath Roy
---

diff --git a/src/common/WorkQueue.cc b/src/common/WorkQueue.cc
index ea5678c5921..d3815c40f10 100644
--- a/src/common/WorkQueue.cc
+++ b/src/common/WorkQueue.cc
@@ -257,7 +257,7 @@ void ThreadPool::drain(WorkQueue_* wq)
 ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, uint32_t pnum_threads):
   cct(pcct_), name(nm), lockname(nm + "::lock"), shardedpool_lock(lockname.c_str()), num_threads(pnum_threads),
-  stop_threads(0), pause_threads(0),drain_threads(0), in_process(0), wq(NULL) {}
+  stop_threads(0), pause_threads(0),drain_threads(0), in_process(0), num_paused(0), num_drained(0), wq(NULL) {}
 
 void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
 {
@@ -271,21 +271,31 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
   while (!stop_threads.read()) {
     if(pause_threads.read()) {
       shardedpool_lock.Lock();
+      ++num_paused;
+      wait_cond.Signal();
       while(pause_threads.read()) {
         cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
         shardedpol_cond.WaitInterval(cct, shardedpool_lock, utime_t(2, 0));
       }
+      --num_paused;
       shardedpool_lock.Unlock();
     }
-
-    wq->_process(thread_index, hb, in_process);
-
-    if ((pause_threads.read() != 0) || (drain_threads.read() != 0)) {
+    if (drain_threads.read()) {
       shardedpool_lock.Lock();
-      wait_cond.Signal();
+      if (wq->is_shard_empty(thread_index)) {
+        ++num_drained;
+        wait_cond.Signal();
+        while (drain_threads.read()) {
+          cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
+          shardedpol_cond.WaitInterval(cct, shardedpool_lock, utime_t(2, 0));
+        }
+        --num_drained;
+      }
       shardedpool_lock.Unlock();
     }
+    wq->_process(thread_index, hb);
+
   }
 
   ldout(cct,10) << "sharded worker finish" << dendl;
@@ -341,7 +351,7 @@ void ShardedThreadPool::pause()
   pause_threads.set(1);
   assert (wq != NULL);
   wq->return_waiting_threads();
-  while (in_process.read()){
+  while (num_threads != num_paused){
     wait_cond.Wait(shardedpool_lock);
   }
   shardedpool_lock.Unlock();
@@ -376,7 +386,7 @@ void ShardedThreadPool::drain()
   drain_threads.set(1);
   assert (wq != NULL);
   wq->return_waiting_threads();
-  while (in_process.read() || !wq->is_all_shard_empty()){
+  while (num_threads != num_drained) {
     wait_cond.Wait(shardedpool_lock);
   }
   drain_threads.set(0);
diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h
index 6311cd5e049..352ed5ea1b6 100644
--- a/src/common/WorkQueue.h
+++ b/src/common/WorkQueue.h
@@ -442,6 +442,8 @@ class ShardedThreadPool {
   atomic_t pause_threads;
   atomic_t drain_threads;
   atomic_t in_process;
+  uint32_t num_paused;
+  uint32_t num_drained;
 
 public:
 
@@ -452,9 +454,9 @@ public:
     baseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
     virtual ~baseShardedWQ() {}
 
-    virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process ) = 0;
+    virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb ) = 0;
     virtual void return_waiting_threads() = 0;
-    virtual bool is_all_shard_empty() = 0;
+    virtual bool is_shard_empty(uint32_t thread_index) = 0;
   };
 
   template <typename T>
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index aa947529164..6eb2df44932 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -7962,7 +7962,7 @@ void OSD::enqueue_op(PG *pg, OpRequestRef op)
   pg->queue_op(op);
 }
 
-void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process ) {
+void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb ) {
 
   uint32_t shard_index = thread_index % num_shards;
 
@@ -7981,11 +7981,9 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb, a
       return;
     }
   }
-
   pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
   sdata->pg_for_processing[&*(item.first)].push_back(item.second);
   sdata->sdata_op_ordering_lock.Unlock();
-  in_process.inc();
   ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, suicide_interval);
 
   (item.first)->lock_suspend_timeout(tp_handle);
@@ -8015,7 +8013,6 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb, a
   osd->dequeue_op(item.first, op, tp_handle);
 
   (item.first)->unlock();
-  in_process.dec();
 }
 
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 08c1219c00e..08922bd0363 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -1336,7 +1336,7 @@ private:
       }
     }
 
-    void _process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process );
+    void _process(uint32_t thread_index, heartbeat_handle_d *hb);
     void _enqueue(pair <PGRef, OpRequestRef> item);
     void _enqueue_front(pair <PGRef, OpRequestRef> item);
 
@@ -1398,26 +1398,9 @@ private:
       }
     }
 
-
-    bool is_all_shard_empty() {
-      bool is_empty = true;
-      for(uint32_t i = 0; i < num_shards; i++) {
-        ShardData* sdata = shard_list[i];
-        assert(NULL != sdata);
-        sdata->sdata_op_ordering_lock.Lock();
-        if (!sdata->pqueue.empty()) {
-          is_empty = false;
-          sdata->sdata_op_ordering_lock.Unlock();
-          break;
-        }
-        sdata->sdata_op_ordering_lock.Unlock();
-      }
-      return is_empty;
-
-    }
-    bool is_shard_empty(uint32_t shard_index) {
-
+    bool is_shard_empty(uint32_t thread_index) {
+      uint32_t shard_index = thread_index % num_shards;
       ShardData* sdata = shard_list[shard_index];
       assert(NULL != sdata);
       Mutex::Locker l(sdata->sdata_op_ordering_lock);
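
For reference, the pause barrier this patch implements can be sketched in
isolation as follows. This is a minimal, hypothetical approximation written
against standard C++11 primitives (std::mutex, std::condition_variable,
std::atomic) rather than Ceph's Mutex/Cond/atomic_t wrappers, and names such
as SimplePausePool are illustrative only. Each worker that sees a pause
request parks itself, increments a shared num_paused counter and signals the
pausing thread; pause() returns only once num_paused equals the total thread
count, which is the condition the patch uses in place of the old in_process
check.

// Minimal sketch (not Ceph code) of the pause barrier added by this commit,
// using standard C++11 primitives instead of Ceph's Mutex/Cond/atomic_t.
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

class SimplePausePool {                    // illustrative name only
  std::mutex lock;                         // plays the role of shardedpool_lock
  std::condition_variable worker_cond;     // workers wait here while paused
  std::condition_variable wait_cond;       // pause() waits here for the barrier
  std::atomic<bool> pause_requested{false};
  std::atomic<bool> stop_requested{false};
  unsigned num_paused = 0;                 // protected by lock
  const unsigned num_threads;
  std::vector<std::thread> workers;

  void worker() {
    while (!stop_requested.load()) {
      if (pause_requested.load()) {
        std::unique_lock<std::mutex> l(lock);
        ++num_paused;                      // mirrors ++num_paused in the patch
        wait_cond.notify_all();            // let pause() re-check the count
        worker_cond.wait(l, [this] { return !pause_requested.load(); });
        --num_paused;
      }
      // ... the real worker would call wq->_process(thread_index, hb) here ...
    }
  }

public:
  explicit SimplePausePool(unsigned n) : num_threads(n) {
    for (unsigned i = 0; i < n; ++i)
      workers.emplace_back(&SimplePausePool::worker, this);
  }

  void pause() {
    pause_requested.store(true);
    std::unique_lock<std::mutex> l(lock);
    // Mirrors "while (num_threads != num_paused) wait_cond.Wait(...)": only
    // return once every worker has parked itself, so no op is in flight.
    wait_cond.wait(l, [this] { return num_paused == num_threads; });
  }

  void unpause() {
    {
      std::lock_guard<std::mutex> l(lock); // update under the lock to avoid a lost wakeup
      pause_requested.store(false);
    }
    worker_cond.notify_all();
  }

  ~SimplePausePool() {
    stop_requested.store(true);
    unpause();                             // release any parked workers
    for (auto &t : workers) t.join();
  }
};

A caller would simply construct SimplePausePool p(n), call p.pause() to reach
the quiescent point and p.unpause() to resume. drain() in the patch follows
the same barrier shape, with the extra condition that a worker only counts
itself as drained once wq->is_shard_empty(thread_index) reports its own shard
empty.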