WorkQueue: Taking care of potential race condition during pause()
author     Somnath Roy <somnath.roy@sandisk.com>
           Sat, 24 May 2014 02:20:46 +0000 (19:20 -0700)
committer  Somnath Roy <somnath.roy@sandisk.com>
           Sat, 31 May 2014 01:46:08 +0000 (18:46 -0700)
Introduced two variables to keep track of the number of threads paused
and drained during threadpool pause/drain. The pause()/drain() call now
waits until the number of paused/drained threads equals the total number
of thread pool threads.

Signed-off-by: Somnath Roy <somnath.roy@sandisk.com>
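
The fix boils down to a counting barrier: each worker, while holding the pool
lock, records that it has parked (or found its shard drained) and signals
wait_cond, and pause()/drain() block until that count reaches the total thread
count. Below is a minimal standalone sketch of the same pattern built on the
C++ standard library instead of Ceph's Mutex/Cond/atomic_t; the names
(PausablePool, pause_requested, worker_cond) are illustrative only, and the
heartbeat/timeout handling in the real code is omitted.

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

class PausablePool {                          // illustrative, not Ceph's ShardedThreadPool
  std::mutex lock;
  std::condition_variable worker_cond;        // workers sleep here while paused
  std::condition_variable wait_cond;          // pause() sleeps here until all workers parked
  std::atomic<bool> pause_requested{false};
  std::atomic<bool> stop_requested{false};
  uint32_t num_paused = 0;                    // protected by `lock`, like num_paused in the patch
  uint32_t num_threads;
  std::vector<std::thread> workers;

  void worker() {
    while (!stop_requested.load()) {
      if (pause_requested.load()) {
        std::unique_lock<std::mutex> l(lock);
        ++num_paused;                         // announce "I am parked"
        wait_cond.notify_all();               // let pause() re-check its condition
        worker_cond.wait(l, [this] { return !pause_requested.load(); });
        --num_paused;
      }
      // ... dequeue and process one work item here ...
      std::this_thread::yield();
    }
  }

public:
  explicit PausablePool(uint32_t n) : num_threads(n) {
    for (uint32_t i = 0; i < n; ++i)
      workers.emplace_back([this] { worker(); });
  }

  // Mirrors the patched pause(): block until every worker has counted itself
  // in, i.e. `while (num_threads != num_paused) wait_cond.Wait(...)`.
  void pause() {
    pause_requested.store(true);
    std::unique_lock<std::mutex> l(lock);
    wait_cond.wait(l, [this] { return num_paused == num_threads; });
  }

  void unpause() {
    pause_requested.store(false);
    worker_cond.notify_all();
  }

  ~PausablePool() {
    unpause();                                // make sure nobody stays parked
    stop_requested.store(true);
    for (auto& t : workers) t.join();
  }
};

The point mirrored from the patch is that pause() no longer polls the
in_process counter to decide the pool is idle; it returns only after every
worker has explicitly counted itself as paused (num_threads == num_paused),
and drain() in the patch does the same with num_drained once each shard's
queue is empty.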
src/common/WorkQueue.cc
src/common/WorkQueue.h
src/osd/OSD.cc
src/osd/OSD.h

index ea5678c5921d08769b391e2595cd0174143835c4..d3815c40f109ceda99b8518a751883795196a457 100644 (file)
@@ -257,7 +257,7 @@ void ThreadPool::drain(WorkQueue_* wq)
 
 ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, uint32_t pnum_threads):
   cct(pcct_), name(nm), lockname(nm + "::lock"), shardedpool_lock(lockname.c_str()), num_threads(pnum_threads), 
-  stop_threads(0), pause_threads(0),drain_threads(0), in_process(0), wq(NULL) {}
+  stop_threads(0), pause_threads(0),drain_threads(0), in_process(0), num_paused(0), num_drained(0), wq(NULL) {}
 
 void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
 {
@@ -271,21 +271,31 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
   while (!stop_threads.read()) {
     if(pause_threads.read()) {
       shardedpool_lock.Lock();
+      ++num_paused;
+      wait_cond.Signal();
       while(pause_threads.read()) {
        cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
        shardedpol_cond.WaitInterval(cct, shardedpool_lock, utime_t(2, 0));
       }
+      --num_paused;
       shardedpool_lock.Unlock();
     }
-
-    wq->_process(thread_index, hb, in_process);
-
-    if ((pause_threads.read() != 0) || (drain_threads.read() != 0)) {
+    if (drain_threads.read()) {
       shardedpool_lock.Lock();
-      wait_cond.Signal();
+      if (wq->is_shard_empty(thread_index)) {
+        ++num_drained;
+        wait_cond.Signal();
+        while (drain_threads.read()) {
+          cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
+          shardedpol_cond.WaitInterval(cct, shardedpool_lock, utime_t(2, 0));
+        }
+        --num_drained;
+      }
       shardedpool_lock.Unlock();
     }
 
+    wq->_process(thread_index, hb);
+
   }
 
   ldout(cct,10) << "sharded worker finish" << dendl;
@@ -341,7 +351,7 @@ void ShardedThreadPool::pause()
   pause_threads.set(1);
   assert (wq != NULL);
   wq->return_waiting_threads();
-  while (in_process.read()){
+  while (num_threads != num_paused){
     wait_cond.Wait(shardedpool_lock);
   }
   shardedpool_lock.Unlock();
@@ -376,7 +386,7 @@ void ShardedThreadPool::drain()
   drain_threads.set(1);
   assert (wq != NULL);
   wq->return_waiting_threads();
-  while (in_process.read() || !wq->is_all_shard_empty()){
+  while (num_threads != num_drained) {
     wait_cond.Wait(shardedpool_lock);
   }
   drain_threads.set(0);
index 6311cd5e04954696e68c6bc954f6d2fb4b07c317..352ed5ea1b610bad3a514c047dabbc27aacd754d 100644 (file)
@@ -442,6 +442,8 @@ class ShardedThreadPool {
   atomic_t pause_threads;
   atomic_t drain_threads;
   atomic_t in_process; 
+  uint32_t num_paused;
+  uint32_t num_drained;
 
 public:
 
@@ -452,9 +454,9 @@ public:
     baseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
     virtual ~baseShardedWQ() {}
 
-    virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process ) = 0;
+    virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb ) = 0;
     virtual void return_waiting_threads() = 0;
-    virtual bool is_all_shard_empty() = 0;
+    virtual bool is_shard_empty(uint32_t thread_index) = 0;
   };      
 
   template <typename T>
index aa947529164260955959ab954fa2aeca2c1f93c0..6eb2df44932d96662c4699d4ad0f8e4712360e04 100644 (file)
@@ -7962,7 +7962,7 @@ void OSD::enqueue_op(PG *pg, OpRequestRef op)
   pg->queue_op(op);
 }
 
-void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process ) {
+void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb ) {
 
   uint32_t shard_index = thread_index % num_shards;
 
@@ -7981,11 +7981,9 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb, a
       return;
     }
   }
-
   pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
   sdata->pg_for_processing[&*(item.first)].push_back(item.second);
   sdata->sdata_op_ordering_lock.Unlock();
-  in_process.inc();
   ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, suicide_interval);
 
   (item.first)->lock_suspend_timeout(tp_handle);
@@ -8015,7 +8013,6 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb, a
 
   osd->dequeue_op(item.first, op, tp_handle);
   (item.first)->unlock();
-  in_process.dec();
 
 }
 
index 08c1219c00e01fdc8e5cb163bb9257b8d23f1c05..08922bd0363e296b57c7cbbfc947b02cd5724b8a 100644 (file)
@@ -1336,7 +1336,7 @@ private:
         }
       }
 
-      void _process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process );
+      void _process(uint32_t thread_index, heartbeat_handle_d *hb);
       void _enqueue(pair <PGRef, OpRequestRef> item);
       void _enqueue_front(pair <PGRef, OpRequestRef> item);
       
@@ -1398,26 +1398,9 @@ private:
         }
 
       }
-
-      bool is_all_shard_empty() {
-        bool is_empty = true;
-        for(uint32_t i = 0; i < num_shards; i++) {
-          ShardData* sdata = shard_list[i];
-          assert(NULL != sdata);
-          sdata->sdata_op_ordering_lock.Lock();
-          if (!sdata->pqueue.empty()) {
-           is_empty = false;
-            sdata->sdata_op_ordering_lock.Unlock();
-            break;
-          }
-          sdata->sdata_op_ordering_lock.Unlock();
-        }
-        return is_empty;
-
-      }
  
-      bool is_shard_empty(uint32_t shard_index) {
-
+      bool is_shard_empty(uint32_t thread_index) {
+        uint32_t shard_index = thread_index % num_shards; 
         ShardData* sdata = shard_list[shard_index];
         assert(NULL != sdata);
         Mutex::Locker l(sdata->sdata_op_ordering_lock);