ceph-common: rework the sharded threadpool worker logic
Author:     Somnath Roy <somnath.roy@sandisk.com>
AuthorDate: Tue, 20 May 2014 23:01:34 +0000 (16:01 -0700)
Commit:     Somnath Roy <somnath.roy@sandisk.com>
CommitDate: Sat, 31 May 2014 01:45:31 +0000 (18:45 -0700)
The _process() method of the derived queue now processes one request at a
time; the outer loop is driven by the sharded threadpool. The
stop/pause/drain functionality is likewise controlled by the sharded threadpool.

Signed-off-by: Somnath Roy <somnath.roy@sandisk.com>
src/common/WorkQueue.cc
src/common/WorkQueue.h
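
To make the inverted control flow concrete before the diffs, here is a small
standalone sketch. It is illustrative only: std::thread, std::mutex,
std::condition_variable, and std::atomic stand in for ceph's
WorkThreadSharded, Mutex, Cond, and atomic_t, and the names OneShotWQ and
process_one() are hypothetical. The point is the contract: the pool owns the
outer loop, and each _process()-style call handles at most one item before
returning.

    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <deque>
    #include <iostream>
    #include <mutex>
    #include <thread>
    #include <vector>

    // Hypothetical stand-in for a derived sharded work queue. The key
    // change this commit makes: the _process() analogue handles AT MOST
    // one item and returns, so the pool-owned loop can recheck its flags.
    struct OneShotWQ {
      std::mutex m;
      std::condition_variable cv;
      std::deque<int> q;
      bool wake = false;                // set by return_waiting_threads()

      void queue(int v) {
        { std::lock_guard<std::mutex> l(m); q.push_back(v); }
        cv.notify_one();
      }

      // return_waiting_threads() analogue: kick idle workers back out.
      void return_waiting_threads() {
        { std::lock_guard<std::mutex> l(m); wake = true; }
        cv.notify_all();
      }

      // _process() analogue: one item per call.
      void process_one(std::atomic<int> &in_process) {
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [this] { return wake || !q.empty(); });
        if (q.empty())
          return;                       // woken only to recheck pool flags
        int item = q.front();
        q.pop_front();
        ++in_process;                   // pause()/drain() wait on this count
        l.unlock();
        std::cout << "processed " << item << "\n";  // the work, unlocked
        --in_process;
      }
    };

    int main() {
      OneShotWQ wq;
      std::atomic<int> in_process{0};
      std::atomic<bool> stop{false};
      std::vector<std::thread> workers;
      for (int i = 0; i < 2; ++i)
        workers.emplace_back([&] {      // the pool owns the outer loop now
          while (!stop.load())
            wq.process_one(in_process);
        });
      for (int v = 0; v < 5; ++v)
        wq.queue(v);
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
      stop.store(true);                 // stop(): set flag, then wake idlers
      wq.return_waiting_threads();
      for (auto &t : workers)
        t.join();
    }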

diff --git a/src/common/WorkQueue.cc b/src/common/WorkQueue.cc
index 16f79528873d4a3f40a5402763aca704d0650d25..ea5678c5921d08769b391e2595cd0174143835c4 100644
--- a/src/common/WorkQueue.cc
+++ b/src/common/WorkQueue.cc
@@ -256,7 +256,8 @@ void ThreadPool::drain(WorkQueue_* wq)
 }
 
 ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, uint32_t pnum_threads):
-  cct(pcct_), name(nm), lockname(nm + "::lock"), shardedpool_lock(lockname.c_str()), num_threads(pnum_threads), wq(NULL) {}
+  cct(pcct_), name(nm), lockname(nm + "::lock"), shardedpool_lock(lockname.c_str()), num_threads(pnum_threads),
+  stop_threads(0), pause_threads(0), drain_threads(0), in_process(0), wq(NULL) {}
 
 void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
 {
@@ -267,7 +268,29 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
   ss << name << " thread " << (void*)pthread_self();
   heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str());
 
-  wq->_process(thread_index, hb);
+  while (!stop_threads.read()) {
+    if (pause_threads.read()) {
+      shardedpool_lock.Lock();
+      // park while paused; keep resetting the heartbeat so this worker
+      // is not flagged as hung, and recheck the flag every two seconds
+      while (pause_threads.read()) {
+        cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
+        shardedpool_cond.WaitInterval(cct, shardedpool_lock, utime_t(2, 0));
+      }
+      shardedpool_lock.Unlock();
+    }
+
+    wq->_process(thread_index, hb, in_process);
+
+    // a pause()/drain() caller may be blocked on wait_cond; wake it so
+    // it can recheck in_process and the shard queues
+    if ((pause_threads.read() != 0) || (drain_threads.read() != 0)) {
+      shardedpool_lock.Lock();
+      wait_cond.Signal();
+      shardedpool_lock.Unlock();
+    }
+
+  }
 
   ldout(cct,10) << "sharded worker finish" << dendl;
 
@@ -302,9 +321,9 @@ void ShardedThreadPool::start()
 void ShardedThreadPool::stop()
 {
   ldout(cct,10) << "stop" << dendl;
+  stop_threads.set(1);
   assert (wq != NULL);
-  wq->stop_threads_on_queue();
-
+  wq->return_waiting_threads();
   for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
        p != threads_shardedpool.end();
        ++p) {
@@ -318,32 +337,50 @@ void ShardedThreadPool::stop()
 void ShardedThreadPool::pause()
 {
   ldout(cct,10) << "pause" << dendl;
+  shardedpool_lock.Lock();
+  pause_threads.set(1);
   assert (wq != NULL);
-  wq->pause_threads_on_queue();
+  wq->return_waiting_threads();
+  while (in_process.read()) {
+    wait_cond.Wait(shardedpool_lock);
+  }
+  shardedpool_lock.Unlock();
   ldout(cct,10) << "paused" << dendl; 
 }
 
 void ShardedThreadPool::pause_new()
 {
   ldout(cct,10) << "pause_new" << dendl;
+  shardedpool_lock.Lock();
+  pause_threads.set(1);
   assert (wq != NULL);
-  wq->pause_new_threads_on_queue();
+  wq->return_waiting_threads();
+  shardedpool_lock.Unlock();
   ldout(cct,10) << "paused_new" << dendl;
 }
 
 void ShardedThreadPool::unpause()
 {
   ldout(cct,10) << "unpause" << dendl;
-  assert (wq != NULL);
-  wq->unpause_threads_on_queue();
+  shardedpool_lock.Lock();
+  pause_threads.set(0);
+  shardedpool_cond.Signal();
+  shardedpool_lock.Unlock();
   ldout(cct,10) << "unpaused" << dendl;
 }
 
 void ShardedThreadPool::drain()
 {
   ldout(cct,10) << "drain" << dendl;
+  shardedpool_lock.Lock();
+  drain_threads.set(1);
   assert (wq != NULL);
-  wq->drain_threads_on_queue();
+  wq->return_waiting_threads();
+  while (in_process.read() || !wq->is_all_shard_empty()) {
+    wait_cond.Wait(shardedpool_lock);
+  }
+  drain_threads.set(0);
+  shardedpool_lock.Unlock();
   ldout(cct,10) << "drained" << dendl;
 }
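
Aside: the pause handshake implemented above is easier to see in isolation.
Below is a minimal sketch with standard primitives (hypothetical names;
std::mutex, std::condition_variable, and std::atomic stand in for ceph's
Mutex, Cond, and atomic_t). pause() raises a flag, relies on
return_waiting_threads() to kick idle workers out of the queue wait, and then
blocks on wait_cond until in-flight work drains; workers park on the pool
condition until unpause() signals them.

    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    // Minimal model of the pause()/unpause() rendezvous (hypothetical
    // names). The real worker also resets its heartbeat around the timed
    // wait, and pause() first calls wq->return_waiting_threads() so that
    // workers idling inside _process() return to the loop and see the flag.
    struct PauseGate {
      std::mutex lock;                    // shardedpool_lock analogue
      std::condition_variable pool_cond;  // shardedpool_cond analogue
      std::condition_variable wait_cond;  // wait_cond analogue
      std::atomic<int> pause{0};
      std::atomic<int> in_process{0};

      // Worker side: runs once per pool-loop iteration, before _process().
      void worker_checkpoint() {
        if (pause.load()) {
          std::unique_lock<std::mutex> l(lock);
          while (pause.load())
            pool_cond.wait_for(l, std::chrono::seconds(2));
        }
      }

      // Worker side: runs after _process() returns, mirroring the loop above.
      void finish_item() {
        if (pause.load()) {
          std::lock_guard<std::mutex> l(lock);
          wait_cond.notify_one();         // let pause() recheck in_process
        }
      }

      // Controller side.
      void pause_all() {
        std::unique_lock<std::mutex> l(lock);
        pause.store(1);
        while (in_process.load())         // wait out in-flight requests
          wait_cond.wait(l);
      }

      void unpause_all() {
        { std::lock_guard<std::mutex> l(lock); pause.store(0); }
        pool_cond.notify_all();
      }
    };

drain() follows the same pattern, but keeps waiting until every shard queue
is empty as well (is_all_shard_empty()) and clears its flag before returning.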
 
diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h
index 41ad6bd407e7176da56a1f4ddacbeec6d82eb5b3..6311cd5e04954696e68c6bc954f6d2fb4b07c317 100644
--- a/src/common/WorkQueue.h
+++ b/src/common/WorkQueue.h
@@ -436,48 +436,43 @@ class ShardedThreadPool {
   string lockname;
   Mutex shardedpool_lock;
   Cond shardedpool_cond;
+  Cond wait_cond;
   uint32_t num_threads;
+  atomic_t stop_threads;
+  atomic_t pause_threads;
+  atomic_t drain_threads;
+  atomic_t in_process;
 
 public:
 
   class baseShardedWQ {
-
+  
   public:
     time_t timeout_interval, suicide_interval;
-
-  protected:
-    atomic_t stop_threads;
-    atomic_t pause_threads;
-    atomic_t drain_threads;
-    atomic_t in_process;
-
-
-  public:
-
-    baseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti)
-                                       ,stop_threads(0), pause_threads(0)
-                                       ,drain_threads(0), in_process(0) {}
+    baseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
     virtual ~baseShardedWQ() {}
-    virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb) = 0;
-    virtual void stop_threads_on_queue() = 0;
-    virtual void pause_threads_on_queue() = 0;
-    virtual void pause_new_threads_on_queue() = 0;
-    virtual void unpause_threads_on_queue() = 0;
-    virtual void drain_threads_on_queue() = 0;
 
-  };
+    virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process) = 0;
+    virtual void return_waiting_threads() = 0;
+    virtual bool is_all_shard_empty() = 0;
+  };
 
   template <typename T>
   class ShardedWQ: public baseShardedWQ {
+  
+    ShardedThreadPool* sharded_pool;
+
+  protected:
+    virtual void _enqueue(T) = 0;
+    virtual void _enqueue_front(T) = 0;
+
 
   public:
-    ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp):baseShardedWQ(ti, sti) {
+    ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): baseShardedWQ(ti, sti),
+                                                             sharded_pool(tp) {
       tp->set_wq(this);
-    
     }
-
-    virtual void _enqueue(T) = 0;
-    virtual void _enqueue_front(T) = 0;
+    virtual ~ShardedWQ() {}
 
     void queue(T item) {
       _enqueue(item);
@@ -485,6 +480,10 @@ public:
     void queue_front(T item) {
       _enqueue_front(item);
     }
+    void drain() {
+      sharded_pool->drain();
+    }
+    
   };
 
 private:
@@ -502,6 +501,13 @@ private:
   };
 
   vector<WorkThreadSharded*> threads_shardedpool;
+  void start_threads();
+  void shardedthreadpool_worker(uint32_t thread_index);
+  void set_wq(baseShardedWQ* swq) {
+    wq = swq;
+  }
+
+
 
 public:
 
@@ -509,10 +515,6 @@ public:
 
   ~ShardedThreadPool(){};
 
-  void set_wq(baseShardedWQ* swq) {
-    wq = swq;
-  }
-
   /// start thread pool thread
   void start();
   /// stop thread pool thread
@@ -526,11 +528,6 @@ public:
   /// wait for all work to complete
   void drain();
 
-  void start_threads();
-  void shardedthreadpool_worker(uint32_t thread_index);
-
-
-
 };
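
For reference, here is a sketch of a derived queue written against the new
interface (hypothetical code, loosely modeled on the OSD-style sharded
operation queues this API serves; it is not part of this commit). Each worker
maps to a shard with its own lock, condition, and list; _process() handles
one item per call, uses a bounded wait so the pool's stop/pause/drain flags
are rechecked, and maintains in_process for pause()/drain().

    // Sketch only: assumes ceph's Mutex, Cond, atomic_t, utime_t, and
    // heartbeat_handle_d, plus the interface declared above.
    struct SimpleShardedWQ : public ShardedThreadPool::ShardedWQ<int> {
      struct Shard {
        Mutex lock;
        Cond cond;
        list<int> items;
        Shard() : lock("SimpleShardedWQ::Shard::lock") {}
      };
      CephContext *cct;
      vector<Shard*> shards;

      SimpleShardedWQ(CephContext *c, uint32_t n, time_t ti, time_t sti,
                      ShardedThreadPool *tp)
        : ShardedThreadPool::ShardedWQ<int>(ti, sti, tp), cct(c) {
        for (uint32_t i = 0; i < n; ++i)
          shards.push_back(new Shard);
      }

      // One item per call; the bounded wait bounces idle workers back to
      // the pool loop (heartbeat resets for long items are omitted).
      void _process(uint32_t thread_index, heartbeat_handle_d *hb,
                    atomic_t &in_process) {
        Shard *s = shards[thread_index % shards.size()];
        s->lock.Lock();
        if (s->items.empty()) {
          // cut short by queue() or return_waiting_threads()
          s->cond.WaitInterval(cct, s->lock, utime_t(2, 0));
          if (s->items.empty()) {
            s->lock.Unlock();
            return;
          }
        }
        int item = s->items.front();
        s->items.pop_front();
        in_process.inc();     // pause()/drain() block while this is nonzero
        s->lock.Unlock();
        // ... do the actual work here, unlocked ...
        in_process.dec();
      }

      // Wake every worker parked in the WaitInterval above.
      void return_waiting_threads() {
        for (vector<Shard*>::iterator p = shards.begin();
             p != shards.end(); ++p) {
          (*p)->lock.Lock();
          (*p)->cond.Signal();
          (*p)->lock.Unlock();
        }
      }

      // Used by drain() to decide when all queued work is gone.
      bool is_all_shard_empty() {
        bool empty = true;
        for (vector<Shard*>::iterator p = shards.begin();
             p != shards.end(); ++p) {
          (*p)->lock.Lock();
          empty = empty && (*p)->items.empty();
          (*p)->lock.Unlock();
        }
        return empty;
      }

    protected:
      void _enqueue(int item) {
        Shard *s = shards[item % shards.size()];  // trivial placement
        s->lock.Lock();
        s->items.push_back(item);
        s->cond.Signal();
        s->lock.Unlock();
      }
      void _enqueue_front(int item) {
        Shard *s = shards[item % shards.size()];
        s->lock.Lock();
        s->items.push_front(item);
        s->cond.Signal();
        s->lock.Unlock();
      }
    };

With those three methods in place, the pool drives everything from outside:
drain(), for example, raises drain_threads, calls return_waiting_threads(),
and blocks on wait_cond until in_process reads zero and is_all_shard_empty()
returns true.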