]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Charge read to rate limiter in BackupEngine (#8722)
authorhx235 <83968999+hx235@users.noreply.github.com>
Wed, 8 Sep 2021 23:23:41 +0000 (16:23 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Wed, 8 Sep 2021 23:24:40 +0000 (16:24 -0700)
Summary:
Context:
While all the non-trivial write operations in BackupEngine go through the RateLimiter, reads currently do not. In general, this is not a huge issue because (especially since some I/O efficiency fixes) reads in BackupEngine are mostly limited by corresponding writes, for both backup and restore. But in principle we should charge the RateLimiter for reads as well.
- Charged read operations in `BackupEngineImpl::CopyOrCreateFile`, `BackupEngineImpl::ReadFileAndComputeChecksum`, `BackupEngineImpl::BackupMeta::LoadFromFile` and `BackupEngineImpl::GetFileDbIdentities`

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8722

Test Plan:
- Passed existing tests
- Passed added unit tests

Reviewed By: pdillinger

Differential Revision: D30610464

Pulled By: hx235

fbshipit-source-id: 9b08c9387159a5385c8d390d6666377a0d0117e5

HISTORY.md
include/rocksdb/utilities/backup_engine.h
utilities/backupable/backupable_db.cc
utilities/backupable/backupable_db_test.cc

index d4f19fb7722c264aa65adeee0da17521094509aa..3d84951cef19b594c23528c6a3e932c45bb1fc0a 100644 (file)
@@ -8,6 +8,7 @@
 * Fix a race in item ref counting in LRUCache when promoting an item from the SecondaryCache.
 * Fix a race in BackupEngine if RateLimiter is reconfigured during concurrent Restore operations.
 * Fix a bug on POSIX in which failure to create a lock file (e.g. out of space) can prevent future LockFile attempts in the same process on the same file from succeeding.
+* Fix a bug that backup_rate_limiter and restore_rate_limiter in BackupEngine could not limit read rates.
 
 ### New Features
 * RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions.
index 8c2eb20833c35bee7025f566341053f003cc92bd..f508d562c247f69396deaebf9b27e046a24a78a6 100644 (file)
@@ -76,6 +76,9 @@ struct BackupEngineOptions {
 
   // Max bytes that can be transferred in a second during backup.
   // If 0, go as fast as you can
+  // This limit only applies to writes. To also limit reads,
+  // a rate limiter able to also limit reads (e.g, its mode = kAllIo)
+  // have to be passed in through the option "backup_rate_limiter"
   // Default: 0
   uint64_t backup_rate_limit;
 
@@ -86,6 +89,9 @@ struct BackupEngineOptions {
 
   // Max bytes that can be transferred in a second during restore.
   // If 0, go as fast as you can
+  // This limit only applies to writes. To also limit reads,
+  // a rate limiter able to also limit reads (e.g, its mode = kAllIo)
+  // have to be passed in through the option "restore_rate_limiter"
   // Default: 0
   uint64_t restore_rate_limit;
 
index 4e8b1f57652ca3372bfdbe5574e289402743998f..9f045062e589fb9f8f571b57a54cdc8addb80968 100644 (file)
@@ -407,7 +407,7 @@ class BackupEngineImpl {
     Status LoadFromFile(
         const std::string& backup_dir,
         const std::unordered_map<std::string, uint64_t>& abs_path_to_size,
-        Logger* info_log,
+        RateLimiter* rate_limiter, Logger* info_log,
         std::unordered_set<std::string>* reported_ignored_fields);
     Status StoreToFile(
         bool sync, const TEST_FutureSchemaVersion2Options* test_future_options);
@@ -550,7 +550,8 @@ class BackupEngineImpl {
 
   // Obtain db_id and db_session_id from the table properties of file_path
   Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options,
-                             const std::string& file_path, std::string* db_id,
+                             const std::string& file_path,
+                             RateLimiter* rate_limiter, std::string* db_id,
                              std::string* db_session_id);
 
   struct CopyOrCreateResult {
@@ -1088,7 +1089,8 @@ Status BackupEngineImpl::Initialize() {
           &abs_path_to_size);
       if (s.ok()) {
         s = backup_iter->second->LoadFromFile(
-            options_.backup_dir, abs_path_to_size, options_.info_log,
+            options_.backup_dir, abs_path_to_size,
+            options_.backup_rate_limiter.get(), options_.info_log,
             &reported_ignored_fields_);
       }
       if (s.IsCorruption() || s.IsNotSupported()) {
@@ -1954,6 +1956,10 @@ Status BackupEngineImpl::CopyOrCreateFile(
       size_t buffer_to_read =
           (buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
       s = src_reader->Read(buffer_to_read, &data, buf.get());
+      if (rate_limiter != nullptr) {
+        rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
+                              RateLimiter::OpType::kRead);
+      }
       processed_buffer_size += buffer_to_read;
     } else {
       data = contents;
@@ -2046,8 +2052,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
       // Prepare db_session_id to add to the file name
       // Ignore the returned status
       // In the failed cases, db_id and db_session_id will be empty
-      GetFileDbIdentities(db_env_, src_env_options, src_dir + fname, &db_id,
-                          &db_session_id)
+      GetFileDbIdentities(db_env_, src_env_options, src_dir + fname,
+                          rate_limiter, &db_id, &db_session_id)
           .PermitUncheckedError();
     }
     // Calculate checksum if checksum and db session id are not available.
@@ -2259,7 +2265,10 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum(
     size_t buffer_to_read =
         (buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
     s = src_reader->Read(buffer_to_read, &data, buf.get());
-
+    if (rate_limiter != nullptr) {
+      rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
+                            RateLimiter::OpType::kRead);
+    }
     if (!s.ok()) {
       return s;
     }
@@ -2276,6 +2285,7 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum(
 Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
                                              const EnvOptions& src_env_options,
                                              const std::string& file_path,
+                                             RateLimiter* rate_limiter,
                                              std::string* db_id,
                                              std::string* db_session_id) {
   assert(db_id != nullptr || db_session_id != nullptr);
@@ -2300,6 +2310,13 @@ Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
       table_properties = sst_reader.GetInitTableProperties();
     } else {
       table_properties = tp.get();
+      if (table_properties != nullptr && rate_limiter != nullptr) {
+        // sizeof(*table_properties) is a sufficent but far-from-exact
+        // approximation of read bytes due to metaindex block, std::string
+        // properties and varint compression
+        rate_limiter->Request(sizeof(*table_properties), Env::IO_LOW,
+                              nullptr /* stats */, RateLimiter::OpType::kRead);
+      }
     }
   } else {
     ROCKS_LOG_INFO(options_.info_log, "Failed to read %s: %s",
@@ -2623,7 +2640,7 @@ const std::string kNonIgnorableFieldPrefix{"ni::"};
 Status BackupEngineImpl::BackupMeta::LoadFromFile(
     const std::string& backup_dir,
     const std::unordered_map<std::string, uint64_t>& abs_path_to_size,
-    Logger* info_log,
+    RateLimiter* rate_limiter, Logger* info_log,
     std::unordered_set<std::string>* reported_ignored_fields) {
   assert(reported_ignored_fields);
   assert(Empty());
@@ -2645,6 +2662,10 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
   // Failures handled at the end
   std::string line;
   if (backup_meta_reader->ReadLine(&line)) {
+    if (rate_limiter != nullptr) {
+      rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
+                            RateLimiter::OpType::kRead);
+    }
     if (StartsWith(line, kSchemaVersionPrefix)) {
       std::string ver = line.substr(kSchemaVersionPrefix.size());
       if (ver == "2" || StartsWith(ver, "2.")) {
@@ -2658,14 +2679,28 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
       return Status::Corruption("Unexpected empty line");
     }
   }
-  if (!line.empty() || backup_meta_reader->ReadLine(&line)) {
+  if (!line.empty()) {
+    timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
+  } else if (backup_meta_reader->ReadLine(&line)) {
+    if (rate_limiter != nullptr) {
+      rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
+                            RateLimiter::OpType::kRead);
+    }
     timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
   }
   if (backup_meta_reader->ReadLine(&line)) {
+    if (rate_limiter != nullptr) {
+      rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
+                            RateLimiter::OpType::kRead);
+    }
     sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
   }
   uint32_t num_files = UINT32_MAX;
   while (backup_meta_reader->ReadLine(&line)) {
+    if (rate_limiter != nullptr) {
+      rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
+                            RateLimiter::OpType::kRead);
+    }
     if (line.empty()) {
       return Status::Corruption("Unexpected empty line");
     }
@@ -2705,6 +2740,10 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
   std::vector<std::shared_ptr<FileInfo>> files;
   bool footer_present = false;
   while (backup_meta_reader->ReadLine(&line)) {
+    if (rate_limiter != nullptr) {
+      rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
+                            RateLimiter::OpType::kRead);
+    }
     std::vector<std::string> components = StringSplit(line, ' ');
 
     if (components.size() < 1) {
@@ -2792,6 +2831,10 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
   if (footer_present) {
     assert(schema_major_version >= 2);
     while (backup_meta_reader->ReadLine(&line)) {
+      if (rate_limiter != nullptr) {
+        rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
+                              RateLimiter::OpType::kRead);
+      }
       if (line.empty()) {
         return Status::Corruption("Unexpected empty line");
       }
index 2d4cfc1f043e244b70c5b70ebf25ffd73500011b..de8c7963970fda1fff79ff339bc98b64f7a7d699 100644 (file)
@@ -13,6 +13,8 @@
 
 #include <algorithm>
 #include <array>
+#include <cstddef>
+#include <cstdint>
 #include <limits>
 #include <random>
 #include <string>
@@ -2622,6 +2624,174 @@ TEST_P(BackupEngineRateLimitingTestWithParam, RateLimiting) {
 
   AssertBackupConsistency(0, 0, 100000, 100010);
 }
+
+TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingVerifyBackup) {
+  const std::size_t kMicrosPerSec = 1000 * 1000LL;
+  std::shared_ptr<RateLimiter> backupThrottler(NewGenericRateLimiter(
+      1, 100 * 1000 /* refill_period_us */, 10 /* fairness */,
+      RateLimiter::Mode::kAllIo /* mode */));
+
+  bool makeThrottler = std::get<0>(GetParam());
+  if (makeThrottler) {
+    backupable_options_->backup_rate_limiter = backupThrottler;
+  }
+
+  bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false;
+  backupable_options_->max_background_operations = is_single_threaded ? 1 : 10;
+
+  const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first;
+  if (makeThrottler) {
+    backupable_options_->backup_rate_limiter->SetBytesPerSecond(
+        backup_rate_limiter_limit);
+  } else {
+    backupable_options_->backup_rate_limit = backup_rate_limiter_limit;
+  }
+
+  DestroyDB(dbname_, Options());
+  OpenDBAndBackupEngine(true /* destroy_old_data */);
+  FillDB(db_.get(), 0, 100000);
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+                                            false /* flush_before_backup */));
+
+  std::vector<BackupInfo> backup_infos;
+  BackupInfo backup_info;
+  backup_engine_->GetBackupInfo(&backup_infos);
+  ASSERT_EQ(1, backup_infos.size());
+  const int backup_id = 1;
+  ASSERT_EQ(backup_id, backup_infos[0].backup_id);
+  ASSERT_OK(backup_engine_->GetBackupInfo(backup_id, &backup_info,
+                                          true /* include_file_details */));
+
+  std::uint64_t bytes_read_during_verify_backup = 0;
+  for (BackupFileInfo backup_file_info : backup_info.file_details) {
+    bytes_read_during_verify_backup += backup_file_info.size;
+  }
+
+  auto start_verify_backup = db_chroot_env_->NowMicros();
+  ASSERT_OK(
+      backup_engine_->VerifyBackup(backup_id, true /* verify_with_checksum */));
+  auto verify_backup_time = db_chroot_env_->NowMicros() - start_verify_backup;
+  auto rate_limited_verify_backup_time =
+      (bytes_read_during_verify_backup * kMicrosPerSec) /
+      backup_rate_limiter_limit;
+
+  if (makeThrottler) {
+    EXPECT_GE(verify_backup_time, 0.8 * rate_limited_verify_backup_time);
+  }
+  CloseDBAndBackupEngine();
+  AssertBackupConsistency(backup_id, 0, 100000, 100010);
+  DestroyDB(dbname_, Options());
+}
+
+TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingChargeReadInBackup) {
+  bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false;
+  backupable_options_->max_background_operations = is_single_threaded ? 1 : 10;
+
+  const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first;
+  std::shared_ptr<RateLimiter> backup_rate_limiter(NewGenericRateLimiter(
+      backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
+      10 /* fairness */, RateLimiter::Mode::kWritesOnly /* mode */));
+  backupable_options_->backup_rate_limiter = backup_rate_limiter;
+
+  DestroyDB(dbname_, Options());
+  OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
+                        kShareWithChecksum /* shared_option */);
+  FillDB(db_.get(), 0, 10);
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+                                            false /* flush_before_backup */));
+  std::int64_t total_bytes_through_with_no_read_charged =
+      backup_rate_limiter->GetTotalBytesThrough();
+  CloseBackupEngine();
+
+  backup_rate_limiter.reset(NewGenericRateLimiter(
+      backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
+      10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */));
+  backupable_options_->backup_rate_limiter = backup_rate_limiter;
+
+  OpenBackupEngine(true);
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+                                            false /* flush_before_backup */));
+  std::int64_t total_bytes_through_with_read_charged =
+      backup_rate_limiter->GetTotalBytesThrough();
+  EXPECT_GT(total_bytes_through_with_read_charged,
+            total_bytes_through_with_no_read_charged);
+  CloseDBAndBackupEngine();
+  AssertBackupConsistency(1, 0, 10, 20);
+  DestroyDB(dbname_, Options());
+}
+
+TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingChargeReadInRestore) {
+  bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false;
+  backupable_options_->max_background_operations = is_single_threaded ? 1 : 10;
+
+  const std::uint64_t restore_rate_limiter_limit =
+      std::get<2>(GetParam()).second;
+  std::shared_ptr<RateLimiter> restore_rate_limiter(NewGenericRateLimiter(
+      restore_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
+      10 /* fairness */, RateLimiter::Mode::kWritesOnly /* mode */));
+  backupable_options_->restore_rate_limiter = restore_rate_limiter;
+
+  DestroyDB(dbname_, Options());
+  OpenDBAndBackupEngine(true /* destroy_old_data */);
+  FillDB(db_.get(), 0, 10);
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+                                            false /* flush_before_backup */));
+  CloseDBAndBackupEngine();
+  DestroyDB(dbname_, Options());
+
+  OpenBackupEngine(false /* destroy_old_data */);
+  ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_));
+  std::int64_t total_bytes_through_with_no_read_charged =
+      restore_rate_limiter->GetTotalBytesThrough();
+  CloseBackupEngine();
+  DestroyDB(dbname_, Options());
+
+  restore_rate_limiter.reset(NewGenericRateLimiter(
+      restore_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
+      10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */));
+  backupable_options_->restore_rate_limiter = restore_rate_limiter;
+
+  OpenBackupEngine(false /* destroy_old_data */);
+  ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_));
+  std::int64_t total_bytes_through_with_read_charged =
+      restore_rate_limiter->GetTotalBytesThrough();
+  EXPECT_EQ(total_bytes_through_with_read_charged,
+            total_bytes_through_with_no_read_charged * 2);
+  CloseBackupEngine();
+  AssertBackupConsistency(1, 0, 10, 20);
+  DestroyDB(dbname_, Options());
+}
+
+TEST_P(BackupEngineRateLimitingTestWithParam,
+       RateLimitingChargeReadInInitialize) {
+  bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false;
+  backupable_options_->max_background_operations = is_single_threaded ? 1 : 10;
+
+  const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first;
+  std::shared_ptr<RateLimiter> backup_rate_limiter(NewGenericRateLimiter(
+      backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
+      10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */));
+  backupable_options_->backup_rate_limiter = backup_rate_limiter;
+
+  DestroyDB(dbname_, Options());
+  OpenDBAndBackupEngine(true /* destroy_old_data */);
+  FillDB(db_.get(), 0, 10);
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+                                            false /* flush_before_backup */));
+  CloseDBAndBackupEngine();
+  AssertBackupConsistency(1, 0, 10, 20);
+
+  std::int64_t total_bytes_through_before_initialize =
+      backupable_options_->backup_rate_limiter->GetTotalBytesThrough();
+  OpenDBAndBackupEngine(false /* destroy_old_data */);
+  // We charge read in BackupEngineImpl::BackupMeta::LoadFromFile,
+  // which is called in BackupEngineImpl::Initialize() during
+  // OpenBackupEngine(false)
+  EXPECT_GT(backupable_options_->backup_rate_limiter->GetTotalBytesThrough(),
+            total_bytes_through_before_initialize);
+  CloseDBAndBackupEngine();
+  DestroyDB(dbname_, Options());
+}
 #endif  // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
 
 TEST_F(BackupEngineTest, ReadOnlyBackupEngine) {