}
}
- void BGThread() {
+ // Return true if there is at least one thread needs to terminate.
+ bool HasExcessiveThread() {
+ return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
+ }
+
+ // Return true iff the current thread is the excessive thread to terminate.
+ // Always terminate the running thread that is added last, even if there are
+ // more than one thread to terminate.
+ bool IsLastExcessiveThread(size_t thread_id) {
+ return HasExcessiveThread() &&
+ thread_id == bgthreads_.size() - 1;
+ }
+
+ // Is one of the threads to terminate.
+ bool IsExcessiveThread(size_t thread_id) {
+ return static_cast<int>(thread_id) >= total_threads_limit_;
+ }
+
+ void BGThread(size_t thread_id) {
while (true) {
// Wait until there is an item that is ready to run
PthreadCall("lock", pthread_mutex_lock(&mu_));
- while (queue_.empty() && !exit_all_threads_) {
+ // Stop waiting if the thread needs to do work or needs to terminate.
+ while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
+ (queue_.empty() || IsExcessiveThread(thread_id))) {
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
}
if (exit_all_threads_) { // mechanism to let BG threads exit safely
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
break;
}
+ if (IsLastExcessiveThread(thread_id)) {
+ // Current thread is the last generated one and is excessive.
+ // We always terminate excessive thread in the reverse order of
+ // generation time.
+ pthread_detach(bgthreads_.back());
+ bgthreads_.pop_back();
+ if (HasExcessiveThread()) {
+ // There is still at least more excessive thread to terminate.
+ WakeUpAllThreads();
+ }
+ PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+ break;
+ }
void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg;
queue_.pop_front();
}
}
+ // Helper struct for passing arguments when creating threads.
+ struct BGThreadMetadata {
+ ThreadPool* thread_pool_;
+ size_t thread_id_; // Thread count in the thread.
+ explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id)
+ : thread_pool_(thread_pool), thread_id_(thread_id) {}
+ };
+
static void* BGThreadWrapper(void* arg) {
- reinterpret_cast<ThreadPool*>(arg)->BGThread();
+ BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
+ size_t thread_id = meta->thread_id_;
+ ThreadPool* tp = meta->thread_pool_;
+ delete meta;
+ tp->BGThread(thread_id);
return nullptr;
}
+ void WakeUpAllThreads() {
+ PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
+ }
+
void SetBackgroundThreads(int num) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
- if (num > total_threads_limit_) {
+ if (exit_all_threads_) {
+ PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+ return;
+ }
+ if (num != total_threads_limit_) {
total_threads_limit_ = num;
+ WakeUpAllThreads();
+ StartBGThreads();
}
assert(total_threads_limit_ > 0);
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
- void Schedule(void (*function)(void*), void* arg) {
- PthreadCall("lock", pthread_mutex_lock(&mu_));
-
- if (exit_all_threads_) {
- PthreadCall("unlock", pthread_mutex_unlock(&mu_));
- return;
- }
+ void StartBGThreads() {
// Start background thread if necessary
while ((int)bgthreads_.size() < total_threads_limit_) {
pthread_t t;
PthreadCall(
- "create thread",
- pthread_create(&t,
- nullptr,
- &ThreadPool::BGThreadWrapper,
- this));
+ "create thread",
+ pthread_create(&t, nullptr, &ThreadPool::BGThreadWrapper,
+ new BGThreadMetadata(this, bgthreads_.size())));
// Set the thread name to aid debugging
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
bgthreads_.push_back(t);
}
+ }
+
+ void Schedule(void (*function)(void*), void* arg) {
+ PthreadCall("lock", pthread_mutex_lock(&mu_));
+
+ if (exit_all_threads_) {
+ PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+ return;
+ }
+
+ StartBGThreads();
// Add to priority queue
queue_.push_back(BGItem());
queue_.back().arg = arg;
queue_len_.store(queue_.size(), std::memory_order_relaxed);
- // always wake up at least one waiting thread.
- PthreadCall("signal", pthread_cond_signal(&bgsignal_));
+ if (!HasExcessiveThread()) {
+ // Wake up at least one waiting thread.
+ PthreadCall("signal", pthread_cond_signal(&bgsignal_));
+ } else {
+ // Need to wake up all threads to make sure the one woken
+ // up is not the one to terminate.
+ WakeUpAllThreads();
+ }
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
}
+TEST(EnvPosixTest, DecreaseNumBgThreads) {
+ class SleepingBackgroundTask {
+ public:
+ explicit SleepingBackgroundTask()
+ : bg_cv_(&mutex_), should_sleep_(true), sleeping_(false) {}
+ void DoSleep() {
+ MutexLock l(&mutex_);
+ sleeping_ = true;
+ while (should_sleep_) {
+ bg_cv_.Wait();
+ }
+ sleeping_ = false;
+ bg_cv_.SignalAll();
+ }
+
+ void WakeUp() {
+ MutexLock l(&mutex_);
+ should_sleep_ = false;
+ bg_cv_.SignalAll();
+
+ while (sleeping_) {
+ bg_cv_.Wait();
+ }
+ }
+
+ bool IsSleeping() {
+ MutexLock l(&mutex_);
+ return sleeping_;
+ }
+
+ static void DoSleepTask(void* arg) {
+ reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
+ }
+
+ private:
+ port::Mutex mutex_;
+ port::CondVar bg_cv_; // Signalled when background work finishes
+ bool should_sleep_;
+ bool sleeping_;
+ };
+
+ std::vector<SleepingBackgroundTask> tasks(10);
+
+ // Set number of thread to 1 first.
+ env_->SetBackgroundThreads(1, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+
+ // Schedule 3 tasks. 0 running; Task 1, 2 waiting.
+ for (size_t i = 0; i < 3; i++) {
+ env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[i],
+ Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ }
+ ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ ASSERT_TRUE(tasks[0].IsSleeping());
+ ASSERT_TRUE(!tasks[1].IsSleeping());
+ ASSERT_TRUE(!tasks[2].IsSleeping());
+
+ // Increase to 2 threads. Task 0, 1 running; 2 waiting
+ env_->SetBackgroundThreads(2, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ ASSERT_TRUE(tasks[0].IsSleeping());
+ ASSERT_TRUE(tasks[1].IsSleeping());
+ ASSERT_TRUE(!tasks[2].IsSleeping());
+
+ // Shrink back to 1 thread. Still task 0, 1 running, 2 waiting
+ env_->SetBackgroundThreads(1, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ ASSERT_TRUE(tasks[0].IsSleeping());
+ ASSERT_TRUE(tasks[1].IsSleeping());
+ ASSERT_TRUE(!tasks[2].IsSleeping());
+
+ // The last task finishes. Task 0 running, 2 waiting.
+ tasks[1].WakeUp();
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ ASSERT_TRUE(tasks[0].IsSleeping());
+ ASSERT_TRUE(!tasks[1].IsSleeping());
+ ASSERT_TRUE(!tasks[2].IsSleeping());
+
+ // Increase to 5 threads. Task 0 and 2 running.
+ env_->SetBackgroundThreads(5, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ ASSERT_TRUE(tasks[0].IsSleeping());
+ ASSERT_TRUE(tasks[2].IsSleeping());
+
+ // Change number of threads a couple of times while there is no sufficient
+ // tasks.
+ env_->SetBackgroundThreads(7, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ tasks[2].WakeUp();
+ ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ env_->SetBackgroundThreads(3, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ env_->SetBackgroundThreads(4, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ env_->SetBackgroundThreads(5, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ env_->SetBackgroundThreads(4, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+
+ Env::Default()->SleepForMicroseconds(kDelayMicros * 50);
+
+ // Enqueue 5 more tasks. Thread pool size now is 4.
+ // Task 0, 3, 4, 5 running;6, 7 waiting.
+ for (size_t i = 3; i < 8; i++) {
+ env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[i],
+ Env::Priority::HIGH);
+ }
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ ASSERT_TRUE(tasks[3].IsSleeping());
+ ASSERT_TRUE(tasks[4].IsSleeping());
+ ASSERT_TRUE(tasks[5].IsSleeping());
+ ASSERT_TRUE(!tasks[6].IsSleeping());
+ ASSERT_TRUE(!tasks[7].IsSleeping());
+
+ // Wake up task 0, 3 and 4. Task 5, 6, 7 running.
+ tasks[0].WakeUp();
+ tasks[3].WakeUp();
+ tasks[4].WakeUp();
+
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ for (size_t i = 5; i < 8; i++) {
+ ASSERT_TRUE(tasks[i].IsSleeping());
+ }
+
+ // Shrink back to 1 thread. Still task 5, 6, 7 running
+ env_->SetBackgroundThreads(1, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_TRUE(tasks[5].IsSleeping());
+ ASSERT_TRUE(tasks[6].IsSleeping());
+ ASSERT_TRUE(tasks[7].IsSleeping());
+
+ // Wake up task 6. Task 5, 7 running
+ tasks[6].WakeUp();
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_TRUE(tasks[5].IsSleeping());
+ ASSERT_TRUE(!tasks[6].IsSleeping());
+ ASSERT_TRUE(tasks[7].IsSleeping());
+
+ // Wake up threads 7. Task 5 running
+ tasks[7].WakeUp();
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_TRUE(!tasks[7].IsSleeping());
+
+ // Enqueue thread 8 and 9. Task 5 running; one of 8, 9 might be running.
+ env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[8],
+ Env::Priority::HIGH);
+ env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[9],
+ Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), 0);
+ ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping());
+
+ // Increase to 4 threads. Task 5, 8, 9 running.
+ env_->SetBackgroundThreads(4, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ ASSERT_TRUE(tasks[8].IsSleeping());
+ ASSERT_TRUE(tasks[9].IsSleeping());
+
+ // Shrink to 1 thread
+ env_->SetBackgroundThreads(1, Env::Priority::HIGH);
+
+ // Wake up thread 9.
+ tasks[9].WakeUp();
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_TRUE(!tasks[9].IsSleeping());
+ ASSERT_TRUE(tasks[8].IsSleeping());
+
+ // Wake up thread 8
+ tasks[8].WakeUp();
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_TRUE(!tasks[8].IsSleeping());
+
+ // Wake up the last thread
+ tasks[5].WakeUp();
+
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_TRUE(!tasks[5].IsSleeping());
+}
+
#ifdef OS_LINUX
// To make sure the Env::GetUniqueId() related tests work correctly, The files
// should be stored in regular storage like "hard disk" or "flash device".