From 45175ca2e1b25aa43927424af4e36693e7359564 Mon Sep 17 00:00:00 2001 From: hx235 <83968999+hx235@users.noreply.github.com> Date: Wed, 8 Sep 2021 16:23:41 -0700 Subject: [PATCH] Charge read to rate limiter in BackupEngine (#8722) Summary: Context: While all the non-trivial write operations in BackupEngine go through the RateLimiter, reads currently do not. In general, this is not a huge issue because (especially since some I/O efficiency fixes) reads in BackupEngine are mostly limited by corresponding writes, for both backup and restore. But in principle we should charge the RateLimiter for reads as well. - Charged read operations in `BackupEngineImpl::CopyOrCreateFile`, `BackupEngineImpl::ReadFileAndComputeChecksum`, `BackupEngineImpl::BackupMeta::LoadFromFile` and `BackupEngineImpl::GetFileDbIdentities` Pull Request resolved: https://github.com/facebook/rocksdb/pull/8722 Test Plan: - Passed existing tests - Passed added unit tests Reviewed By: pdillinger Differential Revision: D30610464 Pulled By: hx235 fbshipit-source-id: 9b08c9387159a5385c8d390d6666377a0d0117e5 --- HISTORY.md | 1 + include/rocksdb/utilities/backup_engine.h | 6 + utilities/backupable/backupable_db.cc | 59 ++++++- utilities/backupable/backupable_db_test.cc | 170 +++++++++++++++++++++ 4 files changed, 228 insertions(+), 8 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index d4f19fb77..3d84951ce 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * Fix a race in item ref counting in LRUCache when promoting an item from the SecondaryCache. * Fix a race in BackupEngine if RateLimiter is reconfigured during concurrent Restore operations. * Fix a bug on POSIX in which failure to create a lock file (e.g. out of space) can prevent future LockFile attempts in the same process on the same file from succeeding. +* Fix a bug that backup_rate_limiter and restore_rate_limiter in BackupEngine could not limit read rates. ### New Features * RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions. diff --git a/include/rocksdb/utilities/backup_engine.h b/include/rocksdb/utilities/backup_engine.h index 8c2eb2083..f508d562c 100644 --- a/include/rocksdb/utilities/backup_engine.h +++ b/include/rocksdb/utilities/backup_engine.h @@ -76,6 +76,9 @@ struct BackupEngineOptions { // Max bytes that can be transferred in a second during backup. // If 0, go as fast as you can + // This limit only applies to writes. To also limit reads, + // a rate limiter able to also limit reads (e.g, its mode = kAllIo) + // have to be passed in through the option "backup_rate_limiter" // Default: 0 uint64_t backup_rate_limit; @@ -86,6 +89,9 @@ struct BackupEngineOptions { // Max bytes that can be transferred in a second during restore. // If 0, go as fast as you can + // This limit only applies to writes. To also limit reads, + // a rate limiter able to also limit reads (e.g, its mode = kAllIo) + // have to be passed in through the option "restore_rate_limiter" // Default: 0 uint64_t restore_rate_limit; diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 4e8b1f576..9f045062e 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -407,7 +407,7 @@ class BackupEngineImpl { Status LoadFromFile( const std::string& backup_dir, const std::unordered_map& abs_path_to_size, - Logger* info_log, + RateLimiter* rate_limiter, Logger* info_log, std::unordered_set* reported_ignored_fields); Status StoreToFile( bool sync, const TEST_FutureSchemaVersion2Options* test_future_options); @@ -550,7 +550,8 @@ class BackupEngineImpl { // Obtain db_id and db_session_id from the table properties of file_path Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options, - const std::string& file_path, std::string* db_id, + const std::string& file_path, + RateLimiter* rate_limiter, std::string* db_id, std::string* db_session_id); struct CopyOrCreateResult { @@ -1088,7 +1089,8 @@ Status BackupEngineImpl::Initialize() { &abs_path_to_size); if (s.ok()) { s = backup_iter->second->LoadFromFile( - options_.backup_dir, abs_path_to_size, options_.info_log, + options_.backup_dir, abs_path_to_size, + options_.backup_rate_limiter.get(), options_.info_log, &reported_ignored_fields_); } if (s.IsCorruption() || s.IsNotSupported()) { @@ -1954,6 +1956,10 @@ Status BackupEngineImpl::CopyOrCreateFile( size_t buffer_to_read = (buf_size < size_limit) ? buf_size : static_cast(size_limit); s = src_reader->Read(buffer_to_read, &data, buf.get()); + if (rate_limiter != nullptr) { + rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kRead); + } processed_buffer_size += buffer_to_read; } else { data = contents; @@ -2046,8 +2052,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem( // Prepare db_session_id to add to the file name // Ignore the returned status // In the failed cases, db_id and db_session_id will be empty - GetFileDbIdentities(db_env_, src_env_options, src_dir + fname, &db_id, - &db_session_id) + GetFileDbIdentities(db_env_, src_env_options, src_dir + fname, + rate_limiter, &db_id, &db_session_id) .PermitUncheckedError(); } // Calculate checksum if checksum and db session id are not available. @@ -2259,7 +2265,10 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum( size_t buffer_to_read = (buf_size < size_limit) ? buf_size : static_cast(size_limit); s = src_reader->Read(buffer_to_read, &data, buf.get()); - + if (rate_limiter != nullptr) { + rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kRead); + } if (!s.ok()) { return s; } @@ -2276,6 +2285,7 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum( Status BackupEngineImpl::GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options, const std::string& file_path, + RateLimiter* rate_limiter, std::string* db_id, std::string* db_session_id) { assert(db_id != nullptr || db_session_id != nullptr); @@ -2300,6 +2310,13 @@ Status BackupEngineImpl::GetFileDbIdentities(Env* src_env, table_properties = sst_reader.GetInitTableProperties(); } else { table_properties = tp.get(); + if (table_properties != nullptr && rate_limiter != nullptr) { + // sizeof(*table_properties) is a sufficent but far-from-exact + // approximation of read bytes due to metaindex block, std::string + // properties and varint compression + rate_limiter->Request(sizeof(*table_properties), Env::IO_LOW, + nullptr /* stats */, RateLimiter::OpType::kRead); + } } } else { ROCKS_LOG_INFO(options_.info_log, "Failed to read %s: %s", @@ -2623,7 +2640,7 @@ const std::string kNonIgnorableFieldPrefix{"ni::"}; Status BackupEngineImpl::BackupMeta::LoadFromFile( const std::string& backup_dir, const std::unordered_map& abs_path_to_size, - Logger* info_log, + RateLimiter* rate_limiter, Logger* info_log, std::unordered_set* reported_ignored_fields) { assert(reported_ignored_fields); assert(Empty()); @@ -2645,6 +2662,10 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile( // Failures handled at the end std::string line; if (backup_meta_reader->ReadLine(&line)) { + if (rate_limiter != nullptr) { + rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kRead); + } if (StartsWith(line, kSchemaVersionPrefix)) { std::string ver = line.substr(kSchemaVersionPrefix.size()); if (ver == "2" || StartsWith(ver, "2.")) { @@ -2658,14 +2679,28 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile( return Status::Corruption("Unexpected empty line"); } } - if (!line.empty() || backup_meta_reader->ReadLine(&line)) { + if (!line.empty()) { + timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); + } else if (backup_meta_reader->ReadLine(&line)) { + if (rate_limiter != nullptr) { + rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kRead); + } timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); } if (backup_meta_reader->ReadLine(&line)) { + if (rate_limiter != nullptr) { + rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kRead); + } sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); } uint32_t num_files = UINT32_MAX; while (backup_meta_reader->ReadLine(&line)) { + if (rate_limiter != nullptr) { + rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kRead); + } if (line.empty()) { return Status::Corruption("Unexpected empty line"); } @@ -2705,6 +2740,10 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile( std::vector> files; bool footer_present = false; while (backup_meta_reader->ReadLine(&line)) { + if (rate_limiter != nullptr) { + rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kRead); + } std::vector components = StringSplit(line, ' '); if (components.size() < 1) { @@ -2792,6 +2831,10 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile( if (footer_present) { assert(schema_major_version >= 2); while (backup_meta_reader->ReadLine(&line)) { + if (rate_limiter != nullptr) { + rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kRead); + } if (line.empty()) { return Status::Corruption("Unexpected empty line"); } diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 2d4cfc1f0..de8c79639 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -13,6 +13,8 @@ #include #include +#include +#include #include #include #include @@ -2622,6 +2624,174 @@ TEST_P(BackupEngineRateLimitingTestWithParam, RateLimiting) { AssertBackupConsistency(0, 0, 100000, 100010); } + +TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingVerifyBackup) { + const std::size_t kMicrosPerSec = 1000 * 1000LL; + std::shared_ptr backupThrottler(NewGenericRateLimiter( + 1, 100 * 1000 /* refill_period_us */, 10 /* fairness */, + RateLimiter::Mode::kAllIo /* mode */)); + + bool makeThrottler = std::get<0>(GetParam()); + if (makeThrottler) { + backupable_options_->backup_rate_limiter = backupThrottler; + } + + bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false; + backupable_options_->max_background_operations = is_single_threaded ? 1 : 10; + + const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first; + if (makeThrottler) { + backupable_options_->backup_rate_limiter->SetBytesPerSecond( + backup_rate_limiter_limit); + } else { + backupable_options_->backup_rate_limit = backup_rate_limiter_limit; + } + + DestroyDB(dbname_, Options()); + OpenDBAndBackupEngine(true /* destroy_old_data */); + FillDB(db_.get(), 0, 100000); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + + std::vector backup_infos; + BackupInfo backup_info; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(1, backup_infos.size()); + const int backup_id = 1; + ASSERT_EQ(backup_id, backup_infos[0].backup_id); + ASSERT_OK(backup_engine_->GetBackupInfo(backup_id, &backup_info, + true /* include_file_details */)); + + std::uint64_t bytes_read_during_verify_backup = 0; + for (BackupFileInfo backup_file_info : backup_info.file_details) { + bytes_read_during_verify_backup += backup_file_info.size; + } + + auto start_verify_backup = db_chroot_env_->NowMicros(); + ASSERT_OK( + backup_engine_->VerifyBackup(backup_id, true /* verify_with_checksum */)); + auto verify_backup_time = db_chroot_env_->NowMicros() - start_verify_backup; + auto rate_limited_verify_backup_time = + (bytes_read_during_verify_backup * kMicrosPerSec) / + backup_rate_limiter_limit; + + if (makeThrottler) { + EXPECT_GE(verify_backup_time, 0.8 * rate_limited_verify_backup_time); + } + CloseDBAndBackupEngine(); + AssertBackupConsistency(backup_id, 0, 100000, 100010); + DestroyDB(dbname_, Options()); +} + +TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingChargeReadInBackup) { + bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false; + backupable_options_->max_background_operations = is_single_threaded ? 1 : 10; + + const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first; + std::shared_ptr backup_rate_limiter(NewGenericRateLimiter( + backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kWritesOnly /* mode */)); + backupable_options_->backup_rate_limiter = backup_rate_limiter; + + DestroyDB(dbname_, Options()); + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, + kShareWithChecksum /* shared_option */); + FillDB(db_.get(), 0, 10); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + std::int64_t total_bytes_through_with_no_read_charged = + backup_rate_limiter->GetTotalBytesThrough(); + CloseBackupEngine(); + + backup_rate_limiter.reset(NewGenericRateLimiter( + backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */)); + backupable_options_->backup_rate_limiter = backup_rate_limiter; + + OpenBackupEngine(true); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + std::int64_t total_bytes_through_with_read_charged = + backup_rate_limiter->GetTotalBytesThrough(); + EXPECT_GT(total_bytes_through_with_read_charged, + total_bytes_through_with_no_read_charged); + CloseDBAndBackupEngine(); + AssertBackupConsistency(1, 0, 10, 20); + DestroyDB(dbname_, Options()); +} + +TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingChargeReadInRestore) { + bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false; + backupable_options_->max_background_operations = is_single_threaded ? 1 : 10; + + const std::uint64_t restore_rate_limiter_limit = + std::get<2>(GetParam()).second; + std::shared_ptr restore_rate_limiter(NewGenericRateLimiter( + restore_rate_limiter_limit, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kWritesOnly /* mode */)); + backupable_options_->restore_rate_limiter = restore_rate_limiter; + + DestroyDB(dbname_, Options()); + OpenDBAndBackupEngine(true /* destroy_old_data */); + FillDB(db_.get(), 0, 10); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + CloseDBAndBackupEngine(); + DestroyDB(dbname_, Options()); + + OpenBackupEngine(false /* destroy_old_data */); + ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_)); + std::int64_t total_bytes_through_with_no_read_charged = + restore_rate_limiter->GetTotalBytesThrough(); + CloseBackupEngine(); + DestroyDB(dbname_, Options()); + + restore_rate_limiter.reset(NewGenericRateLimiter( + restore_rate_limiter_limit, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */)); + backupable_options_->restore_rate_limiter = restore_rate_limiter; + + OpenBackupEngine(false /* destroy_old_data */); + ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_)); + std::int64_t total_bytes_through_with_read_charged = + restore_rate_limiter->GetTotalBytesThrough(); + EXPECT_EQ(total_bytes_through_with_read_charged, + total_bytes_through_with_no_read_charged * 2); + CloseBackupEngine(); + AssertBackupConsistency(1, 0, 10, 20); + DestroyDB(dbname_, Options()); +} + +TEST_P(BackupEngineRateLimitingTestWithParam, + RateLimitingChargeReadInInitialize) { + bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false; + backupable_options_->max_background_operations = is_single_threaded ? 1 : 10; + + const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first; + std::shared_ptr backup_rate_limiter(NewGenericRateLimiter( + backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */)); + backupable_options_->backup_rate_limiter = backup_rate_limiter; + + DestroyDB(dbname_, Options()); + OpenDBAndBackupEngine(true /* destroy_old_data */); + FillDB(db_.get(), 0, 10); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + CloseDBAndBackupEngine(); + AssertBackupConsistency(1, 0, 10, 20); + + std::int64_t total_bytes_through_before_initialize = + backupable_options_->backup_rate_limiter->GetTotalBytesThrough(); + OpenDBAndBackupEngine(false /* destroy_old_data */); + // We charge read in BackupEngineImpl::BackupMeta::LoadFromFile, + // which is called in BackupEngineImpl::Initialize() during + // OpenBackupEngine(false) + EXPECT_GT(backupable_options_->backup_rate_limiter->GetTotalBytesThrough(), + total_bytes_through_before_initialize); + CloseDBAndBackupEngine(); + DestroyDB(dbname_, Options()); +} #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) TEST_F(BackupEngineTest, ReadOnlyBackupEngine) { -- 2.47.3