]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
ThreadPool to allow decrease number of threads and increase of number of threads...
author sdong <siying.d@fb.com>
Mon, 19 May 2014 17:40:18 +0000 (10:40 -0700)
committer sdong <siying.d@fb.com>
Mon, 19 May 2014 18:52:12 +0000 (11:52 -0700)
Summary:
Add a feature to decrease the number of threads in the thread pool.
Also instantly schedule more threads if the number of threads is increased.

Here is how it is implemented: each background thread is given a thread ID. After the number of threads is decreased, all threads are woken up. The thread with the largest thread ID terminates. If there are more threads to terminate, that thread wakes up all threads again before it exits.

Another change is that when the number of threads is increased, more threads are created and all previously excessive threads are woken up to do the work.

Test Plan: Add a unit test.

Reviewers: haobo, dhruba

Reviewed By: haobo

CC: yhchiang, igor, nkg-, leveldb
Differential Revision: https://reviews.facebook.net/D18675

util/env_posix.cc
util/env_test.cc

index 5cbd5bd009d845197424f76c2a59f1476330d006..52787517a5062732f74ce42de048a781e965bc85 100644 (file)
@@ -1470,17 +1470,50 @@ class PosixEnv : public Env {
       }
     }
 
-    void BGThread() {
+    // Return true if there is at least one thread that needs to terminate.
+    bool HasExcessiveThread() {
+      return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
+    }
+
+    // Return true iff the current thread is the excessive thread to terminate.
+    // Always terminate the running thread that was added last, even if there is
+    // more than one thread to terminate.
+    bool IsLastExcessiveThread(size_t thread_id) {
+      return HasExcessiveThread() &&
+             thread_id == bgthreads_.size() - 1;
+    }
+
+    // Return true if thread_id is at or beyond the current thread limit.
+    bool IsExcessiveThread(size_t thread_id) {
+      return static_cast<int>(thread_id) >= total_threads_limit_;
+    }
+
+    void BGThread(size_t thread_id) {
       while (true) {
         // Wait until there is an item that is ready to run
         PthreadCall("lock", pthread_mutex_lock(&mu_));
-        while (queue_.empty() && !exit_all_threads_) {
+        // Stop waiting if the thread needs to do work or needs to terminate.
+        while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
+               (queue_.empty() || IsExcessiveThread(thread_id))) {
           PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
         }
         if (exit_all_threads_) { // mechanism to let BG threads exit safely
           PthreadCall("unlock", pthread_mutex_unlock(&mu_));
           break;
         }
+        if (IsLastExcessiveThread(thread_id)) {
+          // Current thread is the last generated one and is excessive.
+          // We always terminate excessive thread in the reverse order of
+          // generation time.
+          pthread_detach(bgthreads_.back());
+          bgthreads_.pop_back();
+          if (HasExcessiveThread()) {
+            // There is still at least one more excessive thread to terminate.
+            WakeUpAllThreads();
+          }
+          PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+          break;
+        }
         void (*function)(void*) = queue_.front().function;
         void* arg = queue_.front().arg;
         queue_.pop_front();
@@ -1491,36 +1524,50 @@ class PosixEnv : public Env {
       }
     }
 
+    // Helper struct for passing arguments when creating threads.
+    struct BGThreadMetadata {
+      ThreadPool* thread_pool_;
+      size_t thread_id_;  // ID handed to BGThread() for the new thread.
+      explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id)
+          : thread_pool_(thread_pool), thread_id_(thread_id) {}
+    };
+
     static void* BGThreadWrapper(void* arg) {
-      reinterpret_cast<ThreadPool*>(arg)->BGThread();
+      BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
+      size_t thread_id = meta->thread_id_;
+      ThreadPool* tp = meta->thread_pool_;
+      delete meta;
+      tp->BGThread(thread_id);
       return nullptr;
     }
 
+    void WakeUpAllThreads() {  // Broadcast on bgsignal_ to wake every waiter.
+      PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
+    }
+
     void SetBackgroundThreads(int num) {
       PthreadCall("lock", pthread_mutex_lock(&mu_));
-      if (num > total_threads_limit_) {
+      if (exit_all_threads_) {
+        PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+        return;
+      }
+      if (num != total_threads_limit_) {
         total_threads_limit_ = num;
+        WakeUpAllThreads();
+        StartBGThreads();
       }
       assert(total_threads_limit_ > 0);
       PthreadCall("unlock", pthread_mutex_unlock(&mu_));
     }
 
-    void Schedule(void (*function)(void*), void* arg) {
-      PthreadCall("lock", pthread_mutex_lock(&mu_));
-
-      if (exit_all_threads_) {
-        PthreadCall("unlock", pthread_mutex_unlock(&mu_));
-        return;
-      }
+    void StartBGThreads() {
       // Start background thread if necessary
       while ((int)bgthreads_.size() < total_threads_limit_) {
         pthread_t t;
         PthreadCall(
-          "create thread",
-          pthread_create(&t,
-                         nullptr,
-                         &ThreadPool::BGThreadWrapper,
-                         this));
+            "create thread",
+            pthread_create(&t, nullptr, &ThreadPool::BGThreadWrapper,
+                           new BGThreadMetadata(this, bgthreads_.size())));
 
         // Set the thread name to aid debugging
 #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
@@ -1534,6 +1581,17 @@ class PosixEnv : public Env {
 
         bgthreads_.push_back(t);
       }
+    }
+
+    void Schedule(void (*function)(void*), void* arg) {
+      PthreadCall("lock", pthread_mutex_lock(&mu_));
+
+      if (exit_all_threads_) {
+        PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+        return;
+      }
+
+      StartBGThreads();
 
       // Add to priority queue
       queue_.push_back(BGItem());
@@ -1541,8 +1599,14 @@ class PosixEnv : public Env {
       queue_.back().arg = arg;
       queue_len_.store(queue_.size(), std::memory_order_relaxed);
 
-      // always wake up at least one waiting thread.
-      PthreadCall("signal", pthread_cond_signal(&bgsignal_));
+      if (!HasExcessiveThread()) {
+        // Wake up at least one waiting thread.
+        PthreadCall("signal", pthread_cond_signal(&bgsignal_));
+      } else {
+        // Need to wake up all threads to make sure the one woken
+        // up is not the one to terminate.
+        WakeUpAllThreads();
+      }
 
       PthreadCall("unlock", pthread_mutex_unlock(&mu_));
     }
index 1ac3773b2e66a8a459c804dc36213e5e616277c0..2abce6f3a7e054cf241c57ca9aa8caa5069183e5 100644 (file)
@@ -200,6 +200,197 @@ TEST(EnvPosixTest, TwoPools) {
   ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
 }
 
+TEST(EnvPosixTest, DecreaseNumBgThreads) {
+  class SleepingBackgroundTask {
+   public:
+    explicit SleepingBackgroundTask()
+        : bg_cv_(&mutex_), should_sleep_(true), sleeping_(false) {}
+    void DoSleep() {
+      MutexLock l(&mutex_);
+      sleeping_ = true;
+      while (should_sleep_) {
+        bg_cv_.Wait();
+      }
+      sleeping_ = false;
+      bg_cv_.SignalAll();
+    }
+
+    void WakeUp() {
+      MutexLock l(&mutex_);
+      should_sleep_ = false;
+      bg_cv_.SignalAll();
+
+      while (sleeping_) {
+        bg_cv_.Wait();
+      }
+    }
+
+    bool IsSleeping() {
+      MutexLock l(&mutex_);
+      return sleeping_;
+    }
+
+    static void DoSleepTask(void* arg) {
+      reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
+    }
+
+   private:
+    port::Mutex mutex_;
+    port::CondVar bg_cv_;  // Signalled on wake-up request and when sleep ends
+    bool should_sleep_;
+    bool sleeping_;
+  };
+
+  std::vector<SleepingBackgroundTask> tasks(10);
+
+  // Set the number of threads to 1 first.
+  env_->SetBackgroundThreads(1, Env::Priority::HIGH);
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+
+  // Schedule 3 tasks. Task 0 running; tasks 1, 2 waiting.
+  for (size_t i = 0; i < 3; i++) {
+    env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[i],
+                   Env::Priority::HIGH);
+    Env::Default()->SleepForMicroseconds(kDelayMicros);
+  }
+  ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+  ASSERT_TRUE(tasks[0].IsSleeping());
+  ASSERT_TRUE(!tasks[1].IsSleeping());
+  ASSERT_TRUE(!tasks[2].IsSleeping());
+
+  // Increase to 2 threads. Tasks 0, 1 running; task 2 waiting.
+  env_->SetBackgroundThreads(2, Env::Priority::HIGH);
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+  ASSERT_TRUE(tasks[0].IsSleeping());
+  ASSERT_TRUE(tasks[1].IsSleeping());
+  ASSERT_TRUE(!tasks[2].IsSleeping());
+
+  // Shrink back to 1 thread. Tasks 0, 1 still running; task 2 waiting.
+  env_->SetBackgroundThreads(1, Env::Priority::HIGH);
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+  ASSERT_TRUE(tasks[0].IsSleeping());
+  ASSERT_TRUE(tasks[1].IsSleeping());
+  ASSERT_TRUE(!tasks[2].IsSleeping());
+
+  // Task 1 finishes. Task 0 running; task 2 waiting.
+  tasks[1].WakeUp();
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+  ASSERT_TRUE(tasks[0].IsSleeping());
+  ASSERT_TRUE(!tasks[1].IsSleeping());
+  ASSERT_TRUE(!tasks[2].IsSleeping());
+
+  // Increase to 5 threads. Tasks 0 and 2 running.
+  env_->SetBackgroundThreads(5, Env::Priority::HIGH);
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+  ASSERT_TRUE(tasks[0].IsSleeping());
+  ASSERT_TRUE(tasks[2].IsSleeping());
+
+  // Change the number of threads a couple of times while there are not
+  // enough tasks to occupy them.
+  env_->SetBackgroundThreads(7, Env::Priority::HIGH);
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  tasks[2].WakeUp();
+  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+  env_->SetBackgroundThreads(3, Env::Priority::HIGH);
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+  env_->SetBackgroundThreads(4, Env::Priority::HIGH);
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+  env_->SetBackgroundThreads(5, Env::Priority::HIGH);
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+  env_->SetBackgroundThreads(4, Env::Priority::HIGH);
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+
+  Env::Default()->SleepForMicroseconds(kDelayMicros * 50);
+
+  // Enqueue 5 more tasks. Thread pool size is now 4.
+  // Tasks 0, 3, 4, 5 running; tasks 6, 7 waiting.
+  for (size_t i = 3; i < 8; i++) {
+    env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[i],
+                   Env::Priority::HIGH);
+  }
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+  ASSERT_TRUE(tasks[3].IsSleeping());
+  ASSERT_TRUE(tasks[4].IsSleeping());
+  ASSERT_TRUE(tasks[5].IsSleeping());
+  ASSERT_TRUE(!tasks[6].IsSleeping());
+  ASSERT_TRUE(!tasks[7].IsSleeping());
+
+  // Wake up tasks 0, 3 and 4. Tasks 5, 6, 7 running.
+  tasks[0].WakeUp();
+  tasks[3].WakeUp();
+  tasks[4].WakeUp();
+
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+  for (size_t i = 5; i < 8; i++) {
+    ASSERT_TRUE(tasks[i].IsSleeping());
+  }
+
+  // Shrink back to 1 thread. Tasks 5, 6, 7 still running.
+  env_->SetBackgroundThreads(1, Env::Priority::HIGH);
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_TRUE(tasks[5].IsSleeping());
+  ASSERT_TRUE(tasks[6].IsSleeping());
+  ASSERT_TRUE(tasks[7].IsSleeping());
+
+  // Wake up task 6. Tasks 5, 7 running.
+  tasks[6].WakeUp();
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_TRUE(tasks[5].IsSleeping());
+  ASSERT_TRUE(!tasks[6].IsSleeping());
+  ASSERT_TRUE(tasks[7].IsSleeping());
+
+  // Wake up task 7. Task 5 running.
+  tasks[7].WakeUp();
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_TRUE(!tasks[7].IsSleeping());
+
+  // Enqueue tasks 8 and 9. Task 5 running; one of 8, 9 might be running.
+  env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[8],
+                 Env::Priority::HIGH);
+  env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[9],
+                 Env::Priority::HIGH);
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), 0);
+  ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping());
+
+  // Increase to 4 threads. Tasks 5, 8, 9 running.
+  env_->SetBackgroundThreads(4, Env::Priority::HIGH);
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+  ASSERT_TRUE(tasks[8].IsSleeping());
+  ASSERT_TRUE(tasks[9].IsSleeping());
+
+  // Shrink to 1 thread.
+  env_->SetBackgroundThreads(1, Env::Priority::HIGH);
+
+  // Wake up task 9.
+  tasks[9].WakeUp();
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_TRUE(!tasks[9].IsSleeping());
+  ASSERT_TRUE(tasks[8].IsSleeping());
+
+  // Wake up task 8.
+  tasks[8].WakeUp();
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_TRUE(!tasks[8].IsSleeping());
+
+  // Wake up the last sleeping task (task 5).
+  tasks[5].WakeUp();
+
+  Env::Default()->SleepForMicroseconds(kDelayMicros);
+  ASSERT_TRUE(!tasks[5].IsSleeping());
+}
+
 #ifdef OS_LINUX
 // To make sure the Env::GetUniqueId() related tests work correctly, The files
 // should be stored in regular storage like "hard disk" or "flash device".