ceph-common: Implementation of the sharded threadpool.
author    Somnath Roy <somnath.roy@sandisk.com>
          Wed, 14 May 2014 22:50:25 +0000 (15:50 -0700)
committer Somnath Roy <somnath.roy@sandisk.com>
          Sat, 31 May 2014 01:44:27 +0000 (18:44 -0700)
The thread pool will have only a single work queue, and internally the
work queue will have multiple storage data structures. Based on some
logic (which is specific to the derived class implementation), the work
queue will shard the requests among these storage structures. Each
storage structure will be guarded by a finer-grained synchronization
object. Sharded threadpool threads will be assigned to work on a shard
based on some algorithm, which again is specific to the derived class
implementation.

Signed-off-by: Somnath Roy <somnath.roy@sandisk.com>
src/common/WorkQueue.cc
src/common/WorkQueue.h
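
To make the design concrete before the diff: a derived work queue built
on this pattern might keep several internal lists, each guarded by its
own lock, and route items to a shard by hashing them. The sketch below
is purely illustrative; the class name, the use of std::mutex in place
of Ceph's Mutex, and the hash-based routing are all assumptions, not
part of this commit.

#include <cstdint>
#include <functional>
#include <list>
#include <mutex>
#include <vector>

// Illustrative sketch only -- not part of this commit.  Items are
// sharded across several lists, each with its own finer-grained lock,
// so threads working on different shards never contend.
template <typename T>
class HashShardedQueue {
  struct Shard {
    std::mutex lock;     // one lock per shard, not one global lock
    std::list<T> items;  // this shard's storage structure
  };
  std::vector<Shard> shards;

public:
  explicit HashShardedQueue(size_t num_shards) : shards(num_shards) {}

  // Sharding logic: derive the shard from the item itself (here via
  // std::hash, which must be specialized for T; a derived class could
  // use any scheme).
  void enqueue(const T &item) {
    Shard &s = shards[std::hash<T>()(item) % shards.size()];
    std::lock_guard<std::mutex> l(s.lock);
    s.items.push_back(item);
  }

  // Thread-to-shard assignment: each worker is pinned by its index.
  bool dequeue(uint32_t thread_index, T *out) {
    Shard &s = shards[thread_index % shards.size()];
    std::lock_guard<std::mutex> l(s.lock);
    if (s.items.empty())
      return false;
    *out = s.items.front();
    s.items.pop_front();
    return true;
  }
};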

index f47435bf3abc680e7c21b470331be7dcb44f70ce..16f79528873d4a3f40a5402763aca704d0650d25 100644 (file)
@@ -255,3 +255,95 @@ void ThreadPool::drain(WorkQueue_* wq)
   _lock.Unlock();
 }
 
+ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, uint32_t pnum_threads):
+  cct(pcct_), name(nm), lockname(nm + "::lock"), shardedpool_lock(lockname.c_str()), num_threads(pnum_threads), wq(NULL) {}
+
+void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
+{
+  assert (wq != NULL);
+  ldout(cct,10) << "worker start" << dendl;
+
+  std::stringstream ss;
+  ss << name << " thread " << (void*)pthread_self();
+  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str());
+
+  wq->_process(thread_index, hb);
+
+  ldout(cct,10) << "sharded worker finish" << dendl;
+
+  cct->get_heartbeat_map()->remove_worker(hb);
+
+}
+
+void ShardedThreadPool::start_threads()
+{
+  assert(shardedpool_lock.is_locked());
+  uint32_t thread_index = 0;
+  while (threads_shardedpool.size() < num_threads) {
+
+    WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
+    ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
+    threads_shardedpool.push_back(wt);
+    wt->create();
+    thread_index++;
+  }
+}
+
+void ShardedThreadPool::start()
+{
+  ldout(cct,10) << "start" << dendl;
+
+  shardedpool_lock.Lock();
+  start_threads();
+  shardedpool_lock.Unlock();
+  ldout(cct,15) << "started" << dendl;
+}
+
+void ShardedThreadPool::stop()
+{
+  ldout(cct,10) << "stop" << dendl;
+  assert (wq != NULL);
+  wq->stop_threads_on_queue();
+
+  for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
+       p != threads_shardedpool.end();
+       ++p) {
+    (*p)->join();
+    delete *p;
+  }
+  threads_shardedpool.clear();
+  ldout(cct,15) << "stopped" << dendl;
+}
+
+void ShardedThreadPool::pause()
+{
+  ldout(cct,10) << "pause" << dendl;
+  assert (wq != NULL);
+  wq->pause_threads_on_queue();
+  ldout(cct,10) << "paused" << dendl; 
+}
+
+void ShardedThreadPool::pause_new()
+{
+  ldout(cct,10) << "pause_new" << dendl;
+  assert (wq != NULL);
+  wq->pause_new_threads_on_queue();
+  ldout(cct,10) << "paused_new" << dendl;
+}
+
+void ShardedThreadPool::unpause()
+{
+  ldout(cct,10) << "unpause" << dendl;
+  assert (wq != NULL);
+  wq->unpause_threads_on_queue();
+  ldout(cct,10) << "unpaused" << dendl;
+}
+
+void ShardedThreadPool::drain()
+{
+  ldout(cct,10) << "drain" << dendl;
+  assert (wq != NULL);
+  wq->drain_threads_on_queue();
+  ldout(cct,10) << "drained" << dendl;
+}
+
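
Taken together, the functions above give the sharded pool the same
lifecycle as the existing ThreadPool. A hypothetical caller might drive
it as follows; MyShardedWQ and MyItem are assumed names for a derived
queue (a sketch of such a subclass appears after the header diff
below), not code from this commit.

// Hypothetical usage -- everything other than ShardedThreadPool's own
// API is an assumption for illustration.
ShardedThreadPool pool(g_ceph_context, "sharded_pool", 8);
MyShardedWQ wq(10, 60, &pool);  // constructor registers itself via set_wq()

MyItem *item = new MyItem;
pool.start();          // spawn 8 workers, each running wq._process(index, hb)
wq.queue(item);        // routed to a shard by the derived queue's logic
pool.drain();          // block until all queued work has completed
pool.stop();           // signal workers to exit, then join and delete them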
index 71cf89605e7c466de91a7ca444803f7edadbd8d5..41ad6bd407e7176da56a1f4ddacbeec6d82eb5b3 100644 (file)
@@ -41,13 +41,13 @@ public:
     heartbeat_handle_d *hb;
     time_t grace;
     time_t suicide_grace;
+  public:
     TPHandle(
       CephContext *cct,
       heartbeat_handle_d *hb,
       time_t grace,
       time_t suicide_grace)
       : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
-  public:
     void reset_tp_timeout();
     void suspend_tp_timeout();
   };
@@ -429,4 +429,109 @@ public:
   }
 };
 
+class ShardedThreadPool {
+
+  CephContext *cct;
+  string name;
+  string lockname;
+  Mutex shardedpool_lock;
+  Cond shardedpool_cond;
+  uint32_t num_threads;
+
+public:
+
+  class baseShardedWQ {
+
+  public:
+    time_t timeout_interval, suicide_interval;
+
+  protected:
+    atomic_t stop_threads;
+    atomic_t pause_threads;
+    atomic_t drain_threads;
+    atomic_t in_process;
+
+
+  public:
+
+    baseShardedWQ(time_t ti, time_t sti)
+      : timeout_interval(ti), suicide_interval(sti),
+        stop_threads(0), pause_threads(0),
+        drain_threads(0), in_process(0) {}
+    virtual ~baseShardedWQ() {}
+    virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb) = 0;
+    virtual void stop_threads_on_queue() = 0;
+    virtual void pause_threads_on_queue() = 0;
+    virtual void pause_new_threads_on_queue() = 0;
+    virtual void unpause_threads_on_queue() = 0;
+    virtual void drain_threads_on_queue() = 0;
+
+  };
+
+  template <typename T>
+  class ShardedWQ: public baseShardedWQ {
+
+  public:
+    ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp)
+      : baseShardedWQ(ti, sti) {
+      tp->set_wq(this);
+    }
+
+    virtual void _enqueue(T) = 0;
+    virtual void _enqueue_front(T) = 0;
+
+    void queue(T item) {
+      _enqueue(item);
+    }
+    void queue_front(T item) {
+      _enqueue_front(item);
+    }
+  };
+
+private:
+
+  baseShardedWQ* wq;
+  // threads
+  struct WorkThreadSharded : public Thread {
+    ShardedThreadPool *pool;
+    uint32_t thread_index;
+    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),thread_index(pthread_index) {}
+    void *entry() {
+      pool->shardedthreadpool_worker(thread_index);
+      return 0;
+    }
+  };
+
+  vector<WorkThreadSharded*> threads_shardedpool;
+
+public:
+
+  ShardedThreadPool(CephContext *cct_, string nm, uint32_t pnum_threads);
+
+  ~ShardedThreadPool() {}
+
+  void set_wq(baseShardedWQ* swq) {
+    wq = swq;
+  }
+
+  /// start thread pool thread
+  void start();
+  /// stop thread pool thread
+  void stop();
+  /// pause thread pool (if it is not already paused)
+  void pause();
+  /// pause initiation of new work
+  void pause_new();
+  /// resume work in thread pool.  must match each pause() call 1:1 to resume.
+  void unpause();
+  /// wait for all work to complete
+  void drain();
+
+  void start_threads();
+  void shardedthreadpool_worker(uint32_t thread_index);
+
+};
+
+
 #endif
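
For completeness, here is a skeletal view of what a concrete subclass
has to provide: the two storage virtuals from ShardedWQ<T> plus the
five control virtuals inherited from baseShardedWQ. This is a
hypothetical sketch of the contract, not code from this commit; the
real logic for honoring the pause/drain flags inside _process() is up
to the implementation.

// Hypothetical skeleton -- illustrates the virtuals a derived queue
// must implement; not part of this commit.
struct MyItem { /* payload */ };

class MyShardedWQ : public ShardedThreadPool::ShardedWQ<MyItem*> {
public:
  MyShardedWQ(time_t ti, time_t sti, ShardedThreadPool *tp)
    : ShardedThreadPool::ShardedWQ<MyItem*>(ti, sti, tp) {}

  // Worker body: each pool thread enters here once and loops until
  // stop is requested, resetting its heartbeat via hb as it goes.
  void _process(uint32_t thread_index, heartbeat_handle_d *hb) {
    while (!stop_threads.read()) {
      // honor pause_threads/drain_threads, pop from the shard this
      // thread_index maps to (under that shard's lock), do the work
    }
  }

  // Storage virtuals: route the item to one of the internal shards.
  void _enqueue(MyItem *item)       { /* push to the item's shard */ }
  void _enqueue_front(MyItem *item) { /* push to that shard's front */ }

  // Control virtuals: flip the flags the worker loop checks.
  void stop_threads_on_queue()      { stop_threads.set(1);  /* wake all */ }
  void pause_threads_on_queue()     { pause_threads.set(1); /* wait for in_process == 0 */ }
  void pause_new_threads_on_queue() { pause_threads.set(1); }
  void unpause_threads_on_queue()   { pause_threads.set(0); /* wake all */ }
  void drain_threads_on_queue()     { drain_threads.set(1); /* wait until shards empty */ }
};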