From: Yueh-Hsuan Chiang Date: Wed, 10 Dec 2014 23:50:13 +0000 (-0800) Subject: Remove ThreadStatus API from 3.9 X-Git-Tag: rocksdb-3.9~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0d0d9bd3dcc7a717bb0ceb39648b23ae1bc7c7f3;p=rocksdb.git Remove ThreadStatus API from 3.9 Summary: Remove ThreadStatus API from 3.9 Test Plan: make dbg -j32 make OPT=-DROCKSDB_LITE shared_lib -j32 Reviewers: sdong, igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D30087 --- diff --git a/HISTORY.md b/HISTORY.md index ad626711..2692513e 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,11 +2,6 @@ ### 3.9.0 (12/8/2014) -### New Features -* Add rocksdb::GetThreadList(), which in the future will return the current status of all - rocksdb-related threads. We will have more code instruments in the following RocksDB - releases. - ### Public API changes * New API to create a checkpoint added. Given a directory name, creates a new database which is an image of the existing database. diff --git a/Makefile b/Makefile index 1ca41f8f..16bc93d0 100644 --- a/Makefile +++ b/Makefile @@ -164,8 +164,7 @@ TESTS = \ flush_job_test \ wal_manager_test \ listener_test \ - compaction_job_test \ - thread_list_test + compaction_job_test SUBSET := $(shell echo $(TESTS) |sed s/^.*$(ROCKSDBTESTS_START)/$(ROCKSDBTESTS_START)/) diff --git a/db/db_impl.cc b/db/db_impl.cc index 96515441..a2a6c161 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -77,7 +77,6 @@ #include "util/stop_watch.h" #include "util/sync_point.h" #include "util/string_util.h" -#include "util/thread_status_impl.h" namespace rocksdb { @@ -246,7 +245,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) } DBImpl::~DBImpl() { - EraseThreadStatusDbInfo(); mutex_.Lock(); if (flush_on_destroy_) { @@ -2505,11 +2503,6 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, } } // MutexLock l(&mutex_) - // this is outside the mutex - if (s.ok()) { - NewThreadStatusCfInfo( - reinterpret_cast(*handle)->cfd()); - } return s; } @@ -2545,7 +2538,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { // Note that here we erase the associated cf_info of the to-be-dropped // cfd before its ref-count goes to zero to avoid having to erase cf_info // later inside db_mutex. - EraseThreadStatusCfInfo(cfd); assert(cfd->IsDropped()); auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size * @@ -3574,7 +3566,6 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, if (cfd != nullptr) { handles->push_back( new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); - impl->NewThreadStatusCfInfo(cfd); } else { if (db_options.create_missing_column_families) { // missing column family, create it @@ -3739,44 +3730,6 @@ Status DestroyDB(const std::string& dbname, const Options& options) { return result; } -#if ROCKSDB_USING_THREAD_STATUS -void DBImpl::NewThreadStatusCfInfo( - ColumnFamilyData* cfd) const { - if (db_options_.enable_thread_tracking) { - ThreadStatusImpl::NewColumnFamilyInfo( - this, GetName(), cfd, cfd->GetName()); - } -} - -void DBImpl::EraseThreadStatusCfInfo( - ColumnFamilyData* cfd) const { - if (db_options_.enable_thread_tracking) { - ThreadStatusImpl::EraseColumnFamilyInfo(cfd); - } -} - -void DBImpl::EraseThreadStatusDbInfo() const { - if (db_options_.enable_thread_tracking) { - ThreadStatusImpl::EraseDatabaseInfo(this); - } -} - -Status GetThreadList(std::vector* thread_list) { - return thread_local_status.GetThreadList(thread_list); -} -#else -void DBImpl::NewThreadStatusCfInfo( - ColumnFamilyData* cfd) const { -} - -void DBImpl::EraseThreadStatusCfInfo( - ColumnFamilyData* cfd) const { -} - -void DBImpl::EraseThreadStatusDbInfo() const { -} -#endif // ROCKSDB_USING_THREAD_STATUS - // // A global method that can dump out the build version void DumpRocksDBBuildVersion(Logger * log) { diff --git a/db/db_impl.h b/db/db_impl.h index 6577733b..0b2c4773 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -267,12 +267,6 @@ class DBImpl : public DB { void NotifyOnFlushCompleted(ColumnFamilyData* cfd, uint64_t file_number, const MutableCFOptions& mutable_cf_options); - void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const; - - void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const; - - void EraseThreadStatusDbInfo() const; - private: friend class DB; friend class InternalStats; diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 8b0beb7e..2581d845 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -10,7 +10,6 @@ #include "db/merge_context.h" #include "db/db_iter.h" #include "util/perf_context_imp.h" -#include "util/thread_status_impl.h" namespace rocksdb { @@ -152,13 +151,7 @@ Status DB::OpenForReadOnly( } } impl->mutex_.Unlock(); - if (s.ok()) { - *dbptr = impl; - for (auto* h : *handles) { - impl->NewThreadStatusCfInfo( - reinterpret_cast(h)->cfd()); - } - } else { + if (!s.ok()) { for (auto h : *handles) { delete h; } diff --git a/db/db_test.cc b/db/db_test.cc index 5d40a7b3..8bc755be 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -33,7 +33,6 @@ #include "rocksdb/table.h" #include "rocksdb/options.h" #include "rocksdb/table_properties.h" -#include "rocksdb/thread_status.h" #include "rocksdb/utilities/write_batch_with_index.h" #include "rocksdb/utilities/checkpoint.h" #include "table/block_based_table_factory.h" @@ -51,7 +50,6 @@ #include "util/testutil.h" #include "util/mock_env.h" #include "util/string_util.h" -#include "util/thread_status_impl.h" namespace rocksdb { @@ -9104,73 +9102,6 @@ TEST(DBTest, DynamicMemtableOptions) { sleeping_task_low3.WaitUntilDone(); } -#if ROCKSDB_USING_THREAD_STATUS -TEST(DBTest, GetThreadList) { - Options options; - options.env = env_; - options.enable_thread_tracking = true; - TryReopen(options); - - std::vector thread_list; - Status s = GetThreadList(&thread_list); - - for (int i = 0; i < 2; ++i) { - // repeat the test with differet number of high / low priority threads - const int kTestCount = 3; - const unsigned int kHighPriCounts[kTestCount] = {3, 2, 5}; - const unsigned int kLowPriCounts[kTestCount] = {10, 15, 3}; - for (int test = 0; test < kTestCount; ++test) { - // Change the number of threads in high / low priority pool. - env_->SetBackgroundThreads(kHighPriCounts[test], Env::HIGH); - env_->SetBackgroundThreads(kLowPriCounts[test], Env::LOW); - // Wait to ensure the all threads has been registered - env_->SleepForMicroseconds(100000); - s = GetThreadList(&thread_list); - ASSERT_OK(s); - unsigned int thread_type_counts[ThreadStatus::ThreadType::TOTAL]; - memset(thread_type_counts, 0, sizeof(thread_type_counts)); - for (auto thread : thread_list) { - ASSERT_LT(thread.thread_type, ThreadStatus::ThreadType::TOTAL); - thread_type_counts[thread.thread_type]++; - } - // Verify the total number of threades - ASSERT_EQ( - thread_list.size(), - kHighPriCounts[test] + kLowPriCounts[test]); - // Verify the number of high-priority threads - ASSERT_EQ( - thread_type_counts[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY], - kHighPriCounts[test]); - // Verify the number of low-priority threads - ASSERT_EQ( - thread_type_counts[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY], - kLowPriCounts[test]); - } - if (i == 0) { - // repeat the test with multiple column families - CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options); - ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true); - } - } - db_->DropColumnFamily(handles_[2]); - delete handles_[2]; - handles_.erase(handles_.begin() + 2); - ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true); - Close(); - ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true); -} - -TEST(DBTest, DisableThreadList) { - Options options; - options.env = env_; - options.enable_thread_tracking = false; - TryReopen(options); - CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options); - // Verify non of the column family info exists - ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, false); -} -#endif // ROCKSDB_USING_THREAD_STATUS - TEST(DBTest, DynamicCompactionOptions) { // minimum write buffer size is enforced at 64KB const uint64_t k32KB = 1 << 15; diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 72878ff5..59ef6b4a 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -22,7 +22,6 @@ #include "rocksdb/types.h" #include "rocksdb/transaction_log.h" #include "rocksdb/listener.h" -#include "rocksdb/thread_status.h" namespace rocksdb { @@ -548,11 +547,6 @@ Status DestroyDB(const std::string& name, const Options& options); Status RepairDB(const std::string& dbname, const Options& options); #endif -#if ROCKSDB_USING_THREAD_STATUS -// Obtain the status of all rocksdb-related threads. -Status GetThreadList(std::vector* thread_list); -#endif - } // namespace rocksdb diff --git a/include/rocksdb/thread_status.h b/include/rocksdb/thread_status.h deleted file mode 100644 index f622aa40..00000000 --- a/include/rocksdb/thread_status.h +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright (c) 2014, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#pragma once - -#include -#include - -#ifndef ROCKSDB_USING_THREAD_STATUS -#define ROCKSDB_USING_THREAD_STATUS \ - !defined(ROCKSDB_LITE) && \ - !defined(NROCKSDB_THREAD_STATUS) && \ - !defined(OS_MACOSX) && \ - !defined(IOS_CROSS_COMPILE) -#endif - -namespace rocksdb { - -// A structure that describes the current status of a thread. -// The status of active threads can be fetched using -// rocksdb::GetThreadList(). -struct ThreadStatus { - enum ThreadType { - ROCKSDB_HIGH_PRIORITY = 0x0, - ROCKSDB_LOW_PRIORITY = 0x1, - USER_THREAD = 0x2, - TOTAL = 0x3 - }; - -#if ROCKSDB_USING_THREAD_STATUS - ThreadStatus(const uint64_t _id, - const ThreadType _thread_type, - const std::string& _db_name, - const std::string& _cf_name, - const std::string& _event) : - thread_id(_id), thread_type(_thread_type), - db_name(_db_name), - cf_name(_cf_name), - event(_event) {} - - // An unique ID for the thread. - const uint64_t thread_id; - - // The type of the thread, it could be ROCKSDB_HIGH_PRIORITY, - // ROCKSDB_LOW_PRIORITY, and USER_THREAD - const ThreadType thread_type; - - // The name of the DB instance where the thread is currently - // involved with. It would be set to empty string if the thread - // does not involve in any DB operation. - const std::string db_name; - - // The name of the column family where the thread is currently - // It would be set to empty string if the thread does not involve - // in any column family. - const std::string cf_name; - - // The event that the current thread is involved. - // It would be set to empty string if the information about event - // is not currently available. - const std::string event; -#endif // ROCKSDB_USING_THREAD_STATUS -}; - -} // namespace rocksdb diff --git a/util/env_posix.cc b/util/env_posix.cc index 039e79c4..93cca28c 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -42,7 +42,6 @@ #include "util/random.h" #include "util/iostats_context_imp.h" #include "util/rate_limiter.h" -#include "util/thread_status_impl.h" // Get nano time for mach systems #ifdef __MACH__ @@ -76,10 +75,6 @@ int rocksdb_kill_odds = 0; namespace rocksdb { -#if ROCKSDB_USING_THREAD_STATUS -extern ThreadStatusImpl thread_local_status; -#endif - namespace { // A wrapper for fadvise, if the platform doesn't support fadvise, @@ -1667,18 +1662,8 @@ class PosixEnv : public Env { BGThreadMetadata* meta = reinterpret_cast(arg); size_t thread_id = meta->thread_id_; ThreadPool* tp = meta->thread_pool_; -#if ROCKSDB_USING_THREAD_STATUS - // for thread-status - thread_local_status.SetThreadType( - (tp->GetThreadPriority() == Env::Priority::HIGH ? - ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY : - ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY)); -#endif delete meta; tp->BGThread(thread_id); -#if ROCKSDB_USING_THREAD_STATUS - thread_local_status.UnregisterThread(); -#endif return nullptr; } diff --git a/util/thread_list_test.cc b/util/thread_list_test.cc deleted file mode 100644 index b5ff60cc..00000000 --- a/util/thread_list_test.cc +++ /dev/null @@ -1,156 +0,0 @@ -// Copyright (c) 2014, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#include -#include - -#include "util/thread_status_impl.h" -#include "util/testharness.h" -#include "rocksdb/db.h" - -#if ROCKSDB_USING_THREAD_STATUS - -namespace rocksdb { - -class SleepingBackgroundTask { - public: - SleepingBackgroundTask(const void* db_key, const std::string& db_name, - const void* cf_key, const std::string& cf_name) - : db_key_(db_key), db_name_(db_name), - cf_key_(cf_key), cf_name_(cf_name), - should_sleep_(true), sleeping_count_(0) { - ThreadStatusImpl::NewColumnFamilyInfo( - db_key_, db_name_, cf_key_, cf_name_); - } - - ~SleepingBackgroundTask() { - ThreadStatusImpl::EraseDatabaseInfo(db_key_); - } - - void DoSleep() { - thread_local_status.SetColumnFamilyInfoKey(cf_key_); - std::unique_lock l(mutex_); - sleeping_count_++; - while (should_sleep_) { - bg_cv_.wait(l); - } - sleeping_count_--; - bg_cv_.notify_all(); - thread_local_status.SetColumnFamilyInfoKey(0); - } - void WakeUp() { - std::unique_lock l(mutex_); - should_sleep_ = false; - bg_cv_.notify_all(); - } - void WaitUntilDone() { - std::unique_lock l(mutex_); - while (sleeping_count_ > 0) { - bg_cv_.wait(l); - } - } - - static void DoSleepTask(void* arg) { - reinterpret_cast(arg)->DoSleep(); - } - - private: - const void* db_key_; - const std::string db_name_; - const void* cf_key_; - const std::string cf_name_; - std::mutex mutex_; - std::condition_variable bg_cv_; - bool should_sleep_; - std::atomic sleeping_count_; -}; - -class ThreadListTest { - public: - ThreadListTest() { - } -}; - -TEST(ThreadListTest, SimpleColumnFamilyInfoTest) { - Env* env = Env::Default(); - const int kHighPriorityThreads = 3; - const int kLowPriorityThreads = 5; - const int kSleepingHighPriThreads = kHighPriorityThreads - 1; - const int kSleepingLowPriThreads = kLowPriorityThreads / 3; - env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH); - env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW); - - SleepingBackgroundTask sleeping_task( - reinterpret_cast(1234), "sleeping", - reinterpret_cast(5678), "pikachu"); - - for (int test = 0; test < kSleepingHighPriThreads; ++test) { - env->Schedule(&SleepingBackgroundTask::DoSleepTask, - &sleeping_task, Env::Priority::HIGH); - } - for (int test = 0; test < kSleepingLowPriThreads; ++test) { - env->Schedule(&SleepingBackgroundTask::DoSleepTask, - &sleeping_task, Env::Priority::LOW); - } - - // make sure everything is scheduled. - env->SleepForMicroseconds(10000); - - std::vector thread_list; - - // Verify the number of sleeping threads in each pool. - GetThreadList(&thread_list); - int sleeping_count[ThreadStatus::ThreadType::TOTAL] = {0}; - for (auto thread_status : thread_list) { - if (thread_status.cf_name == "pikachu" && - thread_status.db_name == "sleeping") { - sleeping_count[thread_status.thread_type]++; - } - } - ASSERT_EQ( - sleeping_count[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY], - kSleepingHighPriThreads); - ASSERT_EQ( - sleeping_count[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY], - kSleepingLowPriThreads); - ASSERT_EQ( - sleeping_count[ThreadStatus::ThreadType::USER_THREAD], 0); - - sleeping_task.WakeUp(); - sleeping_task.WaitUntilDone(); - - // Verify none of the threads are sleeping - GetThreadList(&thread_list); - for (int i = 0; i < ThreadStatus::ThreadType::TOTAL; ++i) { - sleeping_count[i] = 0; - } - - for (auto thread_status : thread_list) { - if (thread_status.cf_name == "pikachu" && - thread_status.db_name == "sleeping") { - sleeping_count[thread_status.thread_type]++; - } - } - ASSERT_EQ( - sleeping_count[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY], 0); - ASSERT_EQ( - sleeping_count[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY], 0); - ASSERT_EQ( - sleeping_count[ThreadStatus::ThreadType::USER_THREAD], 0); -} - -} // namespace rocksdb - -int main(int argc, char** argv) { - return rocksdb::test::RunAllTests(); -} - -#else - -int main(int argc, char** argv) { - return 0; -} - -#endif // ROCKSDB_USING_THREAD_STATUS diff --git a/util/thread_status_impl.cc b/util/thread_status_impl.cc deleted file mode 100644 index 35dc181e..00000000 --- a/util/thread_status_impl.cc +++ /dev/null @@ -1,194 +0,0 @@ -// Copyright (c) 2014, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#include "port/likely.h" -#include "util/mutexlock.h" -#include "util/thread_status_impl.h" - -namespace rocksdb { - -#if ROCKSDB_USING_THREAD_STATUS -__thread ThreadStatusData* ThreadStatusImpl::thread_status_data_ = nullptr; -std::mutex ThreadStatusImpl::thread_list_mutex_; -std::unordered_set ThreadStatusImpl::thread_data_set_; -std::unordered_map> - ThreadStatusImpl::cf_info_map_; -std::unordered_map> - ThreadStatusImpl::db_key_map_; - -ThreadStatusImpl thread_local_status; - -ThreadStatusImpl::~ThreadStatusImpl() { - assert(thread_data_set_.size() == 0); -} - -void ThreadStatusImpl::UnregisterThread() { - if (thread_status_data_ != nullptr) { - std::lock_guard lck(thread_list_mutex_); - thread_data_set_.erase(thread_status_data_); - delete thread_status_data_; - thread_status_data_ = nullptr; - } -} - -void ThreadStatusImpl::SetThreadType( - ThreadStatus::ThreadType ttype) { - auto* data = InitAndGet(); - data->thread_type.store(ttype, std::memory_order_relaxed); -} - -void ThreadStatusImpl::SetColumnFamilyInfoKey( - const void* cf_key) { - auto* data = InitAndGet(); - data->cf_key.store(cf_key, std::memory_order_relaxed); -} - -void ThreadStatusImpl::SetEventInfoPtr( - const ThreadEventInfo* event_info) { - auto* data = InitAndGet(); - data->event_info.store(event_info, std::memory_order_relaxed); -} - -Status ThreadStatusImpl::GetThreadList( - std::vector* thread_list) const { - thread_list->clear(); - std::vector> valid_list; - - std::lock_guard lck(thread_list_mutex_); - for (auto* thread_data : thread_data_set_) { - assert(thread_data); - auto thread_type = thread_data->thread_type.load( - std::memory_order_relaxed); - auto cf_key = thread_data->cf_key.load( - std::memory_order_relaxed); - auto iter = cf_info_map_.find(cf_key); - assert(cf_key == 0 || iter != cf_info_map_.end()); - auto* cf_info = iter != cf_info_map_.end() ? - iter->second.get() : nullptr; - auto* event_info = thread_data->event_info.load( - std::memory_order_relaxed); - const std::string* db_name = nullptr; - const std::string* cf_name = nullptr; - const std::string* event_name = nullptr; - if (cf_info != nullptr) { - db_name = &cf_info->db_name; - cf_name = &cf_info->cf_name; - // display lower-level info only when higher-level info is available. - if (event_info != nullptr) { - event_name = &event_info->event_name; - } - } - thread_list->emplace_back( - thread_data->thread_id, thread_type, - db_name ? *db_name : "", - cf_name ? *cf_name : "", - event_name ? *event_name : ""); - } - - return Status::OK(); -} - -ThreadStatusData* ThreadStatusImpl::InitAndGet() { - if (UNLIKELY(thread_status_data_ == nullptr)) { - thread_status_data_ = new ThreadStatusData(); - thread_status_data_->thread_id = reinterpret_cast( - thread_status_data_); - std::lock_guard lck(thread_list_mutex_); - thread_data_set_.insert(thread_status_data_); - } - return thread_status_data_; -} - -void ThreadStatusImpl::NewColumnFamilyInfo( - const void* db_key, const std::string& db_name, - const void* cf_key, const std::string& cf_name) { - std::lock_guard lck(thread_list_mutex_); - - cf_info_map_[cf_key].reset( - new ConstantColumnFamilyInfo(db_key, db_name, cf_name)); - db_key_map_[db_key].insert(cf_key); -} - -void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) { - std::lock_guard lck(thread_list_mutex_); - auto cf_pair = cf_info_map_.find(cf_key); - assert(cf_pair != cf_info_map_.end()); - - auto* cf_info = cf_pair->second.get(); - assert(cf_info); - - // Remove its entry from db_key_map_ by the following steps: - // 1. Obtain the entry in db_key_map_ whose set contains cf_key - // 2. Remove it from the set. - auto db_pair = db_key_map_.find(cf_info->db_key); - assert(db_pair != db_key_map_.end()); - size_t result __attribute__((unused)) = db_pair->second.erase(cf_key); - assert(result); - - cf_pair->second.reset(); - result = cf_info_map_.erase(cf_key); - assert(result); -} - -void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) { - std::lock_guard lck(thread_list_mutex_); - auto db_pair = db_key_map_.find(db_key); - if (UNLIKELY(db_pair == db_key_map_.end())) { - // In some occasional cases such as DB::Open fails, we won't - // register ColumnFamilyInfo for a db. - return; - } - - size_t result __attribute__((unused)) = 0; - for (auto cf_key : db_pair->second) { - auto cf_pair = cf_info_map_.find(cf_key); - assert(cf_pair != cf_info_map_.end()); - cf_pair->second.reset(); - result = cf_info_map_.erase(cf_key); - assert(result); - } - db_key_map_.erase(db_key); -} - -#else - -ThreadStatusImpl::~ThreadStatusImpl() { -} - -void ThreadStatusImpl::UnregisterThread() { -} - -void ThreadStatusImpl::SetThreadType( - ThreadStatus::ThreadType ttype) { -} - -void ThreadStatusImpl::SetColumnFamilyInfoKey( - const void* cf_key) { -} - -void ThreadStatusImpl::SetEventInfoPtr( - const ThreadEventInfo* event_info) { -} - -Status ThreadStatusImpl::GetThreadList( - std::vector* thread_list) const { - return Status::NotSupported( - "GetThreadList is not supported in the current running environment."); -} - -void ThreadStatusImpl::NewColumnFamilyInfo( - const void* db_key, const std::string& db_name, - const void* cf_key, const std::string& cf_name) { -} - -void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) { -} - -void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) { -} - -ThreadStatusImpl thread_local_status; -#endif // ROCKSDB_USING_THREAD_STATUS -} // namespace rocksdb diff --git a/util/thread_status_impl.h b/util/thread_status_impl.h deleted file mode 100644 index a6e9a7e5..00000000 --- a/util/thread_status_impl.h +++ /dev/null @@ -1,167 +0,0 @@ -// Copyright (c) 2013, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. -// -// The implementation of ThreadStatus. It is implemented via combination -// of macros and thread-local variables. -// -// Note that we make get and set access to ThreadStatusData lockless. -// As a result, ThreadStatusData as a whole is not atomic. However, -// we guarantee consistent ThreadStatusData all the time whenever -// user call GetThreadList(). This consistency guarantee is done -// by having the following constraint in the internal implementation -// of set and get order: -// -// 1. When reset any information in ThreadStatusData, always start from -// clearing up the lower-level information first. -// 2. When setting any information in ThreadStatusData, always start from -// setting the higher-level information. -// 3. When returning ThreadStatusData to the user, fields are fetched from -// higher-level to lower-level. In addition, where there's a nullptr -// in one field, then all fields that has lower-level than that field -// should be ignored. -// -// The high to low level information would be: -// thread_id > thread_type > db > cf > event > event_count > event_details -// -// This means user might not always get full information, but whenever -// returned by the GetThreadList() is guaranteed to be consistent. -#pragma once -#include -#include -#include -#include -#include -#include -#include -#include "rocksdb/status.h" -#include "rocksdb/thread_status.h" -#include "port/port_posix.h" - -namespace rocksdb { - -class ColumnFamilyHandle; - -// The mutable version of ThreadStatus. It has a static set maintaining -// the set of current registered threades. -// -// Note that it is suggested to call the above macros. -struct ConstantColumnFamilyInfo { -#if ROCKSDB_USING_THREAD_STATUS - public: - ConstantColumnFamilyInfo( - const void* _db_key, - const std::string& _db_name, - const std::string& _cf_name) : - db_key(_db_key), db_name(_db_name), cf_name(_cf_name) {} - const void* db_key; - const std::string db_name; - const std::string cf_name; -#endif // ROCKSDB_USING_THREAD_STATUS -}; - -struct ThreadEventInfo { -#if ROCKSDB_USING_THREAD_STATUS - public: - const std::string event_name; -#endif // ROCKSDB_USING_THREAD_STATUS -}; - -// the internal data-structure that is used to reflect the current -// status of a thread using a set of atomic pointers. -struct ThreadStatusData { -#if ROCKSDB_USING_THREAD_STATUS - explicit ThreadStatusData() : thread_id(0) { - thread_type.store(ThreadStatus::ThreadType::USER_THREAD); - cf_key.store(0); - event_info.store(nullptr); - } - uint64_t thread_id; - std::atomic thread_type; - std::atomic cf_key; - std::atomic event_info; -#endif // ROCKSDB_USING_THREAD_STATUS -}; - -class ThreadStatusImpl { - public: - ThreadStatusImpl() {} - - // Releases all ThreadStatusData of all active threads. - ~ThreadStatusImpl(); - - void UnregisterThread(); - - // Set the thread type of the current thread. - void SetThreadType(ThreadStatus::ThreadType ttype); - - // Update the column-family info of the current thread by setting - // its thread-local pointer of ThreadEventInfo to the correct entry. - void SetColumnFamilyInfoKey(const void* cf_key); - - // Update the event info of the current thread by setting - // its thread-local pointer of ThreadEventInfo to the correct entry. - void SetEventInfoPtr(const ThreadEventInfo* event_info); - - Status GetThreadList( - std::vector* thread_list) const; - - // Create an entry in the global ColumnFamilyInfo table for the - // specified column family. This function should be called only - // when the current thread does not hold db_mutex. - static void NewColumnFamilyInfo( - const void* db_key, const std::string& db_name, - const void* cf_key, const std::string& cf_name); - - // Erase all ConstantColumnFamilyInfo that is associated with the - // specified db instance. This function should be called only when - // the current thread does not hold db_mutex. - static void EraseDatabaseInfo(const void* db_key); - - // Erase the ConstantColumnFamilyInfo that is associated with the - // specified ColumnFamilyData. This function should be called only - // when the current thread does not hold db_mutex. - static void EraseColumnFamilyInfo(const void* cf_key); - - // Verifies whether the input ColumnFamilyHandles matches - // the information stored in the current cf_info_map. - static void TEST_VerifyColumnFamilyInfoMap( - const std::vector& handles, - bool check_exist); - - protected: - -#if ROCKSDB_USING_THREAD_STATUS - // The thread-local variable for storing thread status. - static __thread ThreadStatusData* thread_status_data_; - - // Obtain the pointer to the thread status data. It also performs - // initialization when necessary. - ThreadStatusData* InitAndGet(); - - // The mutex that protects cf_info_map and db_key_map. - static std::mutex thread_list_mutex_; - - // The current status data of all active threads. - static std::unordered_set thread_data_set_; - - // A global map that keeps the column family information. It is stored - // globally instead of inside DB is to avoid the situation where DB is - // closing while GetThreadList function already get the pointer to its - // CopnstantColumnFamilyInfo. - static std::unordered_map< - const void*, std::unique_ptr> cf_info_map_; - - // A db_key to cf_key map that allows erasing elements in cf_info_map - // associated to the same db_key faster. - static std::unordered_map< - const void*, std::unordered_set> db_key_map_; -#else - static ThreadStatusData* thread_status_data_; -#endif // ROCKSDB_USING_THREAD_STATUS -}; - - -extern ThreadStatusImpl thread_local_status; -} // namespace rocksdb diff --git a/util/thread_status_impl_debug.cc b/util/thread_status_impl_debug.cc deleted file mode 100644 index 5489499d..00000000 --- a/util/thread_status_impl_debug.cc +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (c) 2014, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#include - -#include "util/thread_status_impl.h" -#include "db/column_family.h" -#if ROCKSDB_USING_THREAD_STATUS - -namespace rocksdb { -void ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap( - const std::vector& handles, - bool check_exist) { - std::unique_lock lock(thread_list_mutex_); - if (check_exist) { - assert(cf_info_map_.size() == handles.size()); - } - for (auto* handle : handles) { - auto* cfd = reinterpret_cast(handle)->cfd(); - auto iter __attribute__((unused)) = cf_info_map_.find(cfd); - if (check_exist) { - assert(iter != cf_info_map_.end()); - assert(iter->second); - assert(iter->second->cf_name == cfd->GetName()); - } else { - assert(iter == cf_info_map_.end()); - } - } -} -} // namespace rocksdb -#endif // ROCKSDB_USING_THREAD_STATUS diff --git a/utilities/compacted_db/compacted_db_impl.cc b/utilities/compacted_db/compacted_db_impl.cc index fd35698b..335dae77 100644 --- a/utilities/compacted_db/compacted_db_impl.cc +++ b/utilities/compacted_db/compacted_db_impl.cc @@ -8,7 +8,6 @@ #include "db/db_impl.h" #include "db/version_set.h" #include "table/get_context.h" -#include "util/thread_status_impl.h" namespace rocksdb { @@ -103,7 +102,6 @@ Status CompactedDBImpl::Init(const Options& options) { if (!s.ok()) { return s; } - NewThreadStatusCfInfo(cfd_); version_ = cfd_->GetSuperVersion()->current; user_comparator_ = cfd_->user_comparator(); auto* vstorage = version_->storage_info();