### 3.9.0 (12/8/2014)
-### New Features
-* Add rocksdb::GetThreadList(), which in the future will return the current status of all
- rocksdb-related threads. We will have more code instruments in the following RocksDB
- releases.
-
### Public API changes
* New API to create a checkpoint added. Given a directory name, creates a new
database which is an image of the existing database.
flush_job_test \
wal_manager_test \
listener_test \
- compaction_job_test \
- thread_list_test
+ compaction_job_test
SUBSET := $(shell echo $(TESTS) |sed s/^.*$(ROCKSDBTESTS_START)/$(ROCKSDBTESTS_START)/)
#include "util/stop_watch.h"
#include "util/sync_point.h"
#include "util/string_util.h"
-#include "util/thread_status_impl.h"
namespace rocksdb {
}
DBImpl::~DBImpl() {
- EraseThreadStatusDbInfo();
mutex_.Lock();
if (flush_on_destroy_) {
}
} // MutexLock l(&mutex_)
- // this is outside the mutex
- if (s.ok()) {
- NewThreadStatusCfInfo(
- reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
- }
return s;
}
// Note that here we erase the associated cf_info of the to-be-dropped
// cfd before its ref-count goes to zero to avoid having to erase cf_info
// later inside db_mutex.
- EraseThreadStatusCfInfo(cfd);
assert(cfd->IsDropped());
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
if (cfd != nullptr) {
handles->push_back(
new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
- impl->NewThreadStatusCfInfo(cfd);
} else {
if (db_options.create_missing_column_families) {
// missing column family, create it
return result;
}
-#if ROCKSDB_USING_THREAD_STATUS
-void DBImpl::NewThreadStatusCfInfo(
- ColumnFamilyData* cfd) const {
- if (db_options_.enable_thread_tracking) {
- ThreadStatusImpl::NewColumnFamilyInfo(
- this, GetName(), cfd, cfd->GetName());
- }
-}
-
-void DBImpl::EraseThreadStatusCfInfo(
- ColumnFamilyData* cfd) const {
- if (db_options_.enable_thread_tracking) {
- ThreadStatusImpl::EraseColumnFamilyInfo(cfd);
- }
-}
-
-void DBImpl::EraseThreadStatusDbInfo() const {
- if (db_options_.enable_thread_tracking) {
- ThreadStatusImpl::EraseDatabaseInfo(this);
- }
-}
-
-Status GetThreadList(std::vector<ThreadStatus>* thread_list) {
- return thread_local_status.GetThreadList(thread_list);
-}
-#else
-void DBImpl::NewThreadStatusCfInfo(
- ColumnFamilyData* cfd) const {
-}
-
-void DBImpl::EraseThreadStatusCfInfo(
- ColumnFamilyData* cfd) const {
-}
-
-void DBImpl::EraseThreadStatusDbInfo() const {
-}
-#endif // ROCKSDB_USING_THREAD_STATUS
-
//
// A global method that can dump out the build version
void DumpRocksDBBuildVersion(Logger * log) {
void NotifyOnFlushCompleted(ColumnFamilyData* cfd, uint64_t file_number,
const MutableCFOptions& mutable_cf_options);
- void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
-
- void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const;
-
- void EraseThreadStatusDbInfo() const;
-
private:
friend class DB;
friend class InternalStats;
#include "db/merge_context.h"
#include "db/db_iter.h"
#include "util/perf_context_imp.h"
-#include "util/thread_status_impl.h"
namespace rocksdb {
}
}
impl->mutex_.Unlock();
- if (s.ok()) {
- *dbptr = impl;
- for (auto* h : *handles) {
- impl->NewThreadStatusCfInfo(
- reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
- }
- } else {
+ if (!s.ok()) {
for (auto h : *handles) {
delete h;
}
#include "rocksdb/table.h"
#include "rocksdb/options.h"
#include "rocksdb/table_properties.h"
-#include "rocksdb/thread_status.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "rocksdb/utilities/checkpoint.h"
#include "table/block_based_table_factory.h"
#include "util/testutil.h"
#include "util/mock_env.h"
#include "util/string_util.h"
-#include "util/thread_status_impl.h"
namespace rocksdb {
sleeping_task_low3.WaitUntilDone();
}
-#if ROCKSDB_USING_THREAD_STATUS
-TEST(DBTest, GetThreadList) {
- Options options;
- options.env = env_;
- options.enable_thread_tracking = true;
- TryReopen(options);
-
- std::vector<ThreadStatus> thread_list;
- Status s = GetThreadList(&thread_list);
-
- for (int i = 0; i < 2; ++i) {
- // repeat the test with differet number of high / low priority threads
- const int kTestCount = 3;
- const unsigned int kHighPriCounts[kTestCount] = {3, 2, 5};
- const unsigned int kLowPriCounts[kTestCount] = {10, 15, 3};
- for (int test = 0; test < kTestCount; ++test) {
- // Change the number of threads in high / low priority pool.
- env_->SetBackgroundThreads(kHighPriCounts[test], Env::HIGH);
- env_->SetBackgroundThreads(kLowPriCounts[test], Env::LOW);
- // Wait to ensure the all threads has been registered
- env_->SleepForMicroseconds(100000);
- s = GetThreadList(&thread_list);
- ASSERT_OK(s);
- unsigned int thread_type_counts[ThreadStatus::ThreadType::TOTAL];
- memset(thread_type_counts, 0, sizeof(thread_type_counts));
- for (auto thread : thread_list) {
- ASSERT_LT(thread.thread_type, ThreadStatus::ThreadType::TOTAL);
- thread_type_counts[thread.thread_type]++;
- }
- // Verify the total number of threades
- ASSERT_EQ(
- thread_list.size(),
- kHighPriCounts[test] + kLowPriCounts[test]);
- // Verify the number of high-priority threads
- ASSERT_EQ(
- thread_type_counts[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY],
- kHighPriCounts[test]);
- // Verify the number of low-priority threads
- ASSERT_EQ(
- thread_type_counts[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY],
- kLowPriCounts[test]);
- }
- if (i == 0) {
- // repeat the test with multiple column families
- CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options);
- ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true);
- }
- }
- db_->DropColumnFamily(handles_[2]);
- delete handles_[2];
- handles_.erase(handles_.begin() + 2);
- ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true);
- Close();
- ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true);
-}
-
-TEST(DBTest, DisableThreadList) {
- Options options;
- options.env = env_;
- options.enable_thread_tracking = false;
- TryReopen(options);
- CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options);
- // Verify non of the column family info exists
- ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, false);
-}
-#endif // ROCKSDB_USING_THREAD_STATUS
-
TEST(DBTest, DynamicCompactionOptions) {
// minimum write buffer size is enforced at 64KB
const uint64_t k32KB = 1 << 15;
#include "rocksdb/types.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/listener.h"
-#include "rocksdb/thread_status.h"
namespace rocksdb {
Status RepairDB(const std::string& dbname, const Options& options);
#endif
-#if ROCKSDB_USING_THREAD_STATUS
-// Obtain the status of all rocksdb-related threads.
-Status GetThreadList(std::vector<ThreadStatus>* thread_list);
-#endif
-
} // namespace rocksdb
+++ /dev/null
-// Copyright (c) 2014, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
-
-#pragma once
-
-#include <cstddef>
-#include <string>
-
-#ifndef ROCKSDB_USING_THREAD_STATUS
-#define ROCKSDB_USING_THREAD_STATUS \
- !defined(ROCKSDB_LITE) && \
- !defined(NROCKSDB_THREAD_STATUS) && \
- !defined(OS_MACOSX) && \
- !defined(IOS_CROSS_COMPILE)
-#endif
-
-namespace rocksdb {
-
-// A structure that describes the current status of a thread.
-// The status of active threads can be fetched using
-// rocksdb::GetThreadList().
-struct ThreadStatus {
- enum ThreadType {
- ROCKSDB_HIGH_PRIORITY = 0x0,
- ROCKSDB_LOW_PRIORITY = 0x1,
- USER_THREAD = 0x2,
- TOTAL = 0x3
- };
-
-#if ROCKSDB_USING_THREAD_STATUS
- ThreadStatus(const uint64_t _id,
- const ThreadType _thread_type,
- const std::string& _db_name,
- const std::string& _cf_name,
- const std::string& _event) :
- thread_id(_id), thread_type(_thread_type),
- db_name(_db_name),
- cf_name(_cf_name),
- event(_event) {}
-
- // An unique ID for the thread.
- const uint64_t thread_id;
-
- // The type of the thread, it could be ROCKSDB_HIGH_PRIORITY,
- // ROCKSDB_LOW_PRIORITY, and USER_THREAD
- const ThreadType thread_type;
-
- // The name of the DB instance where the thread is currently
- // involved with. It would be set to empty string if the thread
- // does not involve in any DB operation.
- const std::string db_name;
-
- // The name of the column family where the thread is currently
- // It would be set to empty string if the thread does not involve
- // in any column family.
- const std::string cf_name;
-
- // The event that the current thread is involved.
- // It would be set to empty string if the information about event
- // is not currently available.
- const std::string event;
-#endif // ROCKSDB_USING_THREAD_STATUS
-};
-
-} // namespace rocksdb
#include "util/random.h"
#include "util/iostats_context_imp.h"
#include "util/rate_limiter.h"
-#include "util/thread_status_impl.h"
// Get nano time for mach systems
#ifdef __MACH__
namespace rocksdb {
-#if ROCKSDB_USING_THREAD_STATUS
-extern ThreadStatusImpl thread_local_status;
-#endif
-
namespace {
// A wrapper for fadvise, if the platform doesn't support fadvise,
BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
size_t thread_id = meta->thread_id_;
ThreadPool* tp = meta->thread_pool_;
-#if ROCKSDB_USING_THREAD_STATUS
- // for thread-status
- thread_local_status.SetThreadType(
- (tp->GetThreadPriority() == Env::Priority::HIGH ?
- ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY :
- ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY));
-#endif
delete meta;
tp->BGThread(thread_id);
-#if ROCKSDB_USING_THREAD_STATUS
- thread_local_status.UnregisterThread();
-#endif
return nullptr;
}
+++ /dev/null
-// Copyright (c) 2014, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
-
-#include <mutex>
-#include <condition_variable>
-
-#include "util/thread_status_impl.h"
-#include "util/testharness.h"
-#include "rocksdb/db.h"
-
-#if ROCKSDB_USING_THREAD_STATUS
-
-namespace rocksdb {
-
-class SleepingBackgroundTask {
- public:
- SleepingBackgroundTask(const void* db_key, const std::string& db_name,
- const void* cf_key, const std::string& cf_name)
- : db_key_(db_key), db_name_(db_name),
- cf_key_(cf_key), cf_name_(cf_name),
- should_sleep_(true), sleeping_count_(0) {
- ThreadStatusImpl::NewColumnFamilyInfo(
- db_key_, db_name_, cf_key_, cf_name_);
- }
-
- ~SleepingBackgroundTask() {
- ThreadStatusImpl::EraseDatabaseInfo(db_key_);
- }
-
- void DoSleep() {
- thread_local_status.SetColumnFamilyInfoKey(cf_key_);
- std::unique_lock<std::mutex> l(mutex_);
- sleeping_count_++;
- while (should_sleep_) {
- bg_cv_.wait(l);
- }
- sleeping_count_--;
- bg_cv_.notify_all();
- thread_local_status.SetColumnFamilyInfoKey(0);
- }
- void WakeUp() {
- std::unique_lock<std::mutex> l(mutex_);
- should_sleep_ = false;
- bg_cv_.notify_all();
- }
- void WaitUntilDone() {
- std::unique_lock<std::mutex> l(mutex_);
- while (sleeping_count_ > 0) {
- bg_cv_.wait(l);
- }
- }
-
- static void DoSleepTask(void* arg) {
- reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
- }
-
- private:
- const void* db_key_;
- const std::string db_name_;
- const void* cf_key_;
- const std::string cf_name_;
- std::mutex mutex_;
- std::condition_variable bg_cv_;
- bool should_sleep_;
- std::atomic<int> sleeping_count_;
-};
-
-class ThreadListTest {
- public:
- ThreadListTest() {
- }
-};
-
-TEST(ThreadListTest, SimpleColumnFamilyInfoTest) {
- Env* env = Env::Default();
- const int kHighPriorityThreads = 3;
- const int kLowPriorityThreads = 5;
- const int kSleepingHighPriThreads = kHighPriorityThreads - 1;
- const int kSleepingLowPriThreads = kLowPriorityThreads / 3;
- env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH);
- env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW);
-
- SleepingBackgroundTask sleeping_task(
- reinterpret_cast<void*>(1234), "sleeping",
- reinterpret_cast<void*>(5678), "pikachu");
-
- for (int test = 0; test < kSleepingHighPriThreads; ++test) {
- env->Schedule(&SleepingBackgroundTask::DoSleepTask,
- &sleeping_task, Env::Priority::HIGH);
- }
- for (int test = 0; test < kSleepingLowPriThreads; ++test) {
- env->Schedule(&SleepingBackgroundTask::DoSleepTask,
- &sleeping_task, Env::Priority::LOW);
- }
-
- // make sure everything is scheduled.
- env->SleepForMicroseconds(10000);
-
- std::vector<ThreadStatus> thread_list;
-
- // Verify the number of sleeping threads in each pool.
- GetThreadList(&thread_list);
- int sleeping_count[ThreadStatus::ThreadType::TOTAL] = {0};
- for (auto thread_status : thread_list) {
- if (thread_status.cf_name == "pikachu" &&
- thread_status.db_name == "sleeping") {
- sleeping_count[thread_status.thread_type]++;
- }
- }
- ASSERT_EQ(
- sleeping_count[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY],
- kSleepingHighPriThreads);
- ASSERT_EQ(
- sleeping_count[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY],
- kSleepingLowPriThreads);
- ASSERT_EQ(
- sleeping_count[ThreadStatus::ThreadType::USER_THREAD], 0);
-
- sleeping_task.WakeUp();
- sleeping_task.WaitUntilDone();
-
- // Verify none of the threads are sleeping
- GetThreadList(&thread_list);
- for (int i = 0; i < ThreadStatus::ThreadType::TOTAL; ++i) {
- sleeping_count[i] = 0;
- }
-
- for (auto thread_status : thread_list) {
- if (thread_status.cf_name == "pikachu" &&
- thread_status.db_name == "sleeping") {
- sleeping_count[thread_status.thread_type]++;
- }
- }
- ASSERT_EQ(
- sleeping_count[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY], 0);
- ASSERT_EQ(
- sleeping_count[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY], 0);
- ASSERT_EQ(
- sleeping_count[ThreadStatus::ThreadType::USER_THREAD], 0);
-}
-
-} // namespace rocksdb
-
-int main(int argc, char** argv) {
- return rocksdb::test::RunAllTests();
-}
-
-#else
-
-int main(int argc, char** argv) {
- return 0;
-}
-
-#endif // ROCKSDB_USING_THREAD_STATUS
+++ /dev/null
-// Copyright (c) 2014, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
-
-#include "port/likely.h"
-#include "util/mutexlock.h"
-#include "util/thread_status_impl.h"
-
-namespace rocksdb {
-
-#if ROCKSDB_USING_THREAD_STATUS
-__thread ThreadStatusData* ThreadStatusImpl::thread_status_data_ = nullptr;
-std::mutex ThreadStatusImpl::thread_list_mutex_;
-std::unordered_set<ThreadStatusData*> ThreadStatusImpl::thread_data_set_;
-std::unordered_map<const void*, std::unique_ptr<ConstantColumnFamilyInfo>>
- ThreadStatusImpl::cf_info_map_;
-std::unordered_map<const void*, std::unordered_set<const void*>>
- ThreadStatusImpl::db_key_map_;
-
-ThreadStatusImpl thread_local_status;
-
-ThreadStatusImpl::~ThreadStatusImpl() {
- assert(thread_data_set_.size() == 0);
-}
-
-void ThreadStatusImpl::UnregisterThread() {
- if (thread_status_data_ != nullptr) {
- std::lock_guard<std::mutex> lck(thread_list_mutex_);
- thread_data_set_.erase(thread_status_data_);
- delete thread_status_data_;
- thread_status_data_ = nullptr;
- }
-}
-
-void ThreadStatusImpl::SetThreadType(
- ThreadStatus::ThreadType ttype) {
- auto* data = InitAndGet();
- data->thread_type.store(ttype, std::memory_order_relaxed);
-}
-
-void ThreadStatusImpl::SetColumnFamilyInfoKey(
- const void* cf_key) {
- auto* data = InitAndGet();
- data->cf_key.store(cf_key, std::memory_order_relaxed);
-}
-
-void ThreadStatusImpl::SetEventInfoPtr(
- const ThreadEventInfo* event_info) {
- auto* data = InitAndGet();
- data->event_info.store(event_info, std::memory_order_relaxed);
-}
-
-Status ThreadStatusImpl::GetThreadList(
- std::vector<ThreadStatus>* thread_list) const {
- thread_list->clear();
- std::vector<std::shared_ptr<ThreadStatusData>> valid_list;
-
- std::lock_guard<std::mutex> lck(thread_list_mutex_);
- for (auto* thread_data : thread_data_set_) {
- assert(thread_data);
- auto thread_type = thread_data->thread_type.load(
- std::memory_order_relaxed);
- auto cf_key = thread_data->cf_key.load(
- std::memory_order_relaxed);
- auto iter = cf_info_map_.find(cf_key);
- assert(cf_key == 0 || iter != cf_info_map_.end());
- auto* cf_info = iter != cf_info_map_.end() ?
- iter->second.get() : nullptr;
- auto* event_info = thread_data->event_info.load(
- std::memory_order_relaxed);
- const std::string* db_name = nullptr;
- const std::string* cf_name = nullptr;
- const std::string* event_name = nullptr;
- if (cf_info != nullptr) {
- db_name = &cf_info->db_name;
- cf_name = &cf_info->cf_name;
- // display lower-level info only when higher-level info is available.
- if (event_info != nullptr) {
- event_name = &event_info->event_name;
- }
- }
- thread_list->emplace_back(
- thread_data->thread_id, thread_type,
- db_name ? *db_name : "",
- cf_name ? *cf_name : "",
- event_name ? *event_name : "");
- }
-
- return Status::OK();
-}
-
-ThreadStatusData* ThreadStatusImpl::InitAndGet() {
- if (UNLIKELY(thread_status_data_ == nullptr)) {
- thread_status_data_ = new ThreadStatusData();
- thread_status_data_->thread_id = reinterpret_cast<uint64_t>(
- thread_status_data_);
- std::lock_guard<std::mutex> lck(thread_list_mutex_);
- thread_data_set_.insert(thread_status_data_);
- }
- return thread_status_data_;
-}
-
-void ThreadStatusImpl::NewColumnFamilyInfo(
- const void* db_key, const std::string& db_name,
- const void* cf_key, const std::string& cf_name) {
- std::lock_guard<std::mutex> lck(thread_list_mutex_);
-
- cf_info_map_[cf_key].reset(
- new ConstantColumnFamilyInfo(db_key, db_name, cf_name));
- db_key_map_[db_key].insert(cf_key);
-}
-
-void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) {
- std::lock_guard<std::mutex> lck(thread_list_mutex_);
- auto cf_pair = cf_info_map_.find(cf_key);
- assert(cf_pair != cf_info_map_.end());
-
- auto* cf_info = cf_pair->second.get();
- assert(cf_info);
-
- // Remove its entry from db_key_map_ by the following steps:
- // 1. Obtain the entry in db_key_map_ whose set contains cf_key
- // 2. Remove it from the set.
- auto db_pair = db_key_map_.find(cf_info->db_key);
- assert(db_pair != db_key_map_.end());
- size_t result __attribute__((unused)) = db_pair->second.erase(cf_key);
- assert(result);
-
- cf_pair->second.reset();
- result = cf_info_map_.erase(cf_key);
- assert(result);
-}
-
-void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) {
- std::lock_guard<std::mutex> lck(thread_list_mutex_);
- auto db_pair = db_key_map_.find(db_key);
- if (UNLIKELY(db_pair == db_key_map_.end())) {
- // In some occasional cases such as DB::Open fails, we won't
- // register ColumnFamilyInfo for a db.
- return;
- }
-
- size_t result __attribute__((unused)) = 0;
- for (auto cf_key : db_pair->second) {
- auto cf_pair = cf_info_map_.find(cf_key);
- assert(cf_pair != cf_info_map_.end());
- cf_pair->second.reset();
- result = cf_info_map_.erase(cf_key);
- assert(result);
- }
- db_key_map_.erase(db_key);
-}
-
-#else
-
-ThreadStatusImpl::~ThreadStatusImpl() {
-}
-
-void ThreadStatusImpl::UnregisterThread() {
-}
-
-void ThreadStatusImpl::SetThreadType(
- ThreadStatus::ThreadType ttype) {
-}
-
-void ThreadStatusImpl::SetColumnFamilyInfoKey(
- const void* cf_key) {
-}
-
-void ThreadStatusImpl::SetEventInfoPtr(
- const ThreadEventInfo* event_info) {
-}
-
-Status ThreadStatusImpl::GetThreadList(
- std::vector<ThreadStatus>* thread_list) const {
- return Status::NotSupported(
- "GetThreadList is not supported in the current running environment.");
-}
-
-void ThreadStatusImpl::NewColumnFamilyInfo(
- const void* db_key, const std::string& db_name,
- const void* cf_key, const std::string& cf_name) {
-}
-
-void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) {
-}
-
-void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) {
-}
-
-ThreadStatusImpl thread_local_status;
-#endif // ROCKSDB_USING_THREAD_STATUS
-} // namespace rocksdb
+++ /dev/null
-// Copyright (c) 2013, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
-//
-// The implementation of ThreadStatus. It is implemented via combination
-// of macros and thread-local variables.
-//
-// Note that we make get and set access to ThreadStatusData lockless.
-// As a result, ThreadStatusData as a whole is not atomic. However,
-// we guarantee consistent ThreadStatusData all the time whenever
-// user call GetThreadList(). This consistency guarantee is done
-// by having the following constraint in the internal implementation
-// of set and get order:
-//
-// 1. When reset any information in ThreadStatusData, always start from
-// clearing up the lower-level information first.
-// 2. When setting any information in ThreadStatusData, always start from
-// setting the higher-level information.
-// 3. When returning ThreadStatusData to the user, fields are fetched from
-// higher-level to lower-level. In addition, where there's a nullptr
-// in one field, then all fields that has lower-level than that field
-// should be ignored.
-//
-// The high to low level information would be:
-// thread_id > thread_type > db > cf > event > event_count > event_details
-//
-// This means user might not always get full information, but whenever
-// returned by the GetThreadList() is guaranteed to be consistent.
-#pragma once
-#include <unordered_set>
-#include <atomic>
-#include <string>
-#include <unordered_map>
-#include <mutex>
-#include <list>
-#include <vector>
-#include "rocksdb/status.h"
-#include "rocksdb/thread_status.h"
-#include "port/port_posix.h"
-
-namespace rocksdb {
-
-class ColumnFamilyHandle;
-
-// The mutable version of ThreadStatus. It has a static set maintaining
-// the set of current registered threades.
-//
-// Note that it is suggested to call the above macros.
-struct ConstantColumnFamilyInfo {
-#if ROCKSDB_USING_THREAD_STATUS
- public:
- ConstantColumnFamilyInfo(
- const void* _db_key,
- const std::string& _db_name,
- const std::string& _cf_name) :
- db_key(_db_key), db_name(_db_name), cf_name(_cf_name) {}
- const void* db_key;
- const std::string db_name;
- const std::string cf_name;
-#endif // ROCKSDB_USING_THREAD_STATUS
-};
-
-struct ThreadEventInfo {
-#if ROCKSDB_USING_THREAD_STATUS
- public:
- const std::string event_name;
-#endif // ROCKSDB_USING_THREAD_STATUS
-};
-
-// the internal data-structure that is used to reflect the current
-// status of a thread using a set of atomic pointers.
-struct ThreadStatusData {
-#if ROCKSDB_USING_THREAD_STATUS
- explicit ThreadStatusData() : thread_id(0) {
- thread_type.store(ThreadStatus::ThreadType::USER_THREAD);
- cf_key.store(0);
- event_info.store(nullptr);
- }
- uint64_t thread_id;
- std::atomic<ThreadStatus::ThreadType> thread_type;
- std::atomic<const void*> cf_key;
- std::atomic<const ThreadEventInfo*> event_info;
-#endif // ROCKSDB_USING_THREAD_STATUS
-};
-
-class ThreadStatusImpl {
- public:
- ThreadStatusImpl() {}
-
- // Releases all ThreadStatusData of all active threads.
- ~ThreadStatusImpl();
-
- void UnregisterThread();
-
- // Set the thread type of the current thread.
- void SetThreadType(ThreadStatus::ThreadType ttype);
-
- // Update the column-family info of the current thread by setting
- // its thread-local pointer of ThreadEventInfo to the correct entry.
- void SetColumnFamilyInfoKey(const void* cf_key);
-
- // Update the event info of the current thread by setting
- // its thread-local pointer of ThreadEventInfo to the correct entry.
- void SetEventInfoPtr(const ThreadEventInfo* event_info);
-
- Status GetThreadList(
- std::vector<ThreadStatus>* thread_list) const;
-
- // Create an entry in the global ColumnFamilyInfo table for the
- // specified column family. This function should be called only
- // when the current thread does not hold db_mutex.
- static void NewColumnFamilyInfo(
- const void* db_key, const std::string& db_name,
- const void* cf_key, const std::string& cf_name);
-
- // Erase all ConstantColumnFamilyInfo that is associated with the
- // specified db instance. This function should be called only when
- // the current thread does not hold db_mutex.
- static void EraseDatabaseInfo(const void* db_key);
-
- // Erase the ConstantColumnFamilyInfo that is associated with the
- // specified ColumnFamilyData. This function should be called only
- // when the current thread does not hold db_mutex.
- static void EraseColumnFamilyInfo(const void* cf_key);
-
- // Verifies whether the input ColumnFamilyHandles matches
- // the information stored in the current cf_info_map.
- static void TEST_VerifyColumnFamilyInfoMap(
- const std::vector<ColumnFamilyHandle*>& handles,
- bool check_exist);
-
- protected:
-
-#if ROCKSDB_USING_THREAD_STATUS
- // The thread-local variable for storing thread status.
- static __thread ThreadStatusData* thread_status_data_;
-
- // Obtain the pointer to the thread status data. It also performs
- // initialization when necessary.
- ThreadStatusData* InitAndGet();
-
- // The mutex that protects cf_info_map and db_key_map.
- static std::mutex thread_list_mutex_;
-
- // The current status data of all active threads.
- static std::unordered_set<ThreadStatusData*> thread_data_set_;
-
- // A global map that keeps the column family information. It is stored
- // globally instead of inside DB is to avoid the situation where DB is
- // closing while GetThreadList function already get the pointer to its
- // CopnstantColumnFamilyInfo.
- static std::unordered_map<
- const void*, std::unique_ptr<ConstantColumnFamilyInfo>> cf_info_map_;
-
- // A db_key to cf_key map that allows erasing elements in cf_info_map
- // associated to the same db_key faster.
- static std::unordered_map<
- const void*, std::unordered_set<const void*>> db_key_map_;
-#else
- static ThreadStatusData* thread_status_data_;
-#endif // ROCKSDB_USING_THREAD_STATUS
-};
-
-
-extern ThreadStatusImpl thread_local_status;
-} // namespace rocksdb
+++ /dev/null
-// Copyright (c) 2014, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
-
-#include <mutex>
-
-#include "util/thread_status_impl.h"
-#include "db/column_family.h"
-#if ROCKSDB_USING_THREAD_STATUS
-
-namespace rocksdb {
-void ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(
- const std::vector<ColumnFamilyHandle*>& handles,
- bool check_exist) {
- std::unique_lock<std::mutex> lock(thread_list_mutex_);
- if (check_exist) {
- assert(cf_info_map_.size() == handles.size());
- }
- for (auto* handle : handles) {
- auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd();
- auto iter __attribute__((unused)) = cf_info_map_.find(cfd);
- if (check_exist) {
- assert(iter != cf_info_map_.end());
- assert(iter->second);
- assert(iter->second->cf_name == cfd->GetName());
- } else {
- assert(iter == cf_info_map_.end());
- }
- }
-}
-} // namespace rocksdb
-#endif // ROCKSDB_USING_THREAD_STATUS
#include "db/db_impl.h"
#include "db/version_set.h"
#include "table/get_context.h"
-#include "util/thread_status_impl.h"
namespace rocksdb {
if (!s.ok()) {
return s;
}
- NewThreadStatusCfInfo(cfd_);
version_ = cfd_->GetSuperVersion()->current;
user_comparator_ = cfd_->user_comparator();
auto* vstorage = version_->storage_info();