]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix a buffer size race condition in BackupEngine (#8732)
authorPeter Dillinger <peterd@fb.com>
Wed, 1 Sep 2021 21:10:36 +0000 (14:10 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Wed, 1 Sep 2021 21:28:58 +0000 (14:28 -0700)
Summary:
If RateLimiter burst bytes changes during concurrent Restore
operations

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8732

Test Plan: updated unit test fails with TSAN before change, passes after

Reviewed By: ajkr

Differential Revision: D30683879

Pulled By: pdillinger

fbshipit-source-id: d0ddb3587ade91ee2a4d926b475acf7781b03086

HISTORY.md
utilities/backupable/backupable_db.cc
utilities/backupable/backupable_db_test.cc

index 94ee05da3249b8b64d2bebb719b3ba2d7da17fdc..f0aff62940aeb99181b7b1dc0f9a25274da3bc7a 100644 (file)
@@ -6,6 +6,7 @@
 * Fixed a bug that could lead to duplicate DB ID or DB session ID in POSIX environments without /proc/sys/kernel/random/uuid.
 * Fix a race in DumpStats() with column family destruction due to not taking a Ref on each entry while iterating the ColumnFamilySet.
 * Fix a race in item ref counting in LRUCache when promoting an item from the SecondaryCache.
+* Fix a race in BackupEngine if RateLimiter is reconfigured during concurrent Restore operations.
 
 ### New Features
 * RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions.
index 294fde6ca70486d723971d0a130fa6ab9705099d..5700f45b386df75299ba9f1b37f1714b6be00aec 100644 (file)
@@ -785,7 +785,6 @@ class BackupEngineImpl {
   std::unique_ptr<Directory> private_directory_;
 
   static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL;  // 5MB
-  mutable size_t copy_file_buffer_size_;
   bool read_only_;
   BackupStatistics backup_statistics_;
   std::unordered_set<std::string> reported_ignored_fields_;
@@ -924,7 +923,6 @@ BackupEngineImpl::BackupEngineImpl(const BackupEngineOptions& options,
       options_(options),
       db_env_(db_env),
       backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
-      copy_file_buffer_size_(kDefaultCopyFileBufferSize),
       read_only_(read_only) {
   if (options_.backup_rate_limiter == nullptr &&
       options_.backup_rate_limit > 0) {
@@ -1263,11 +1261,6 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
     s = backup_env_->CreateDir(private_dir);
   }
 
-  RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
-  if (rate_limiter) {
-    copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
-  }
-
   // A set into which we will insert the dst_paths that are calculated for live
   // files and live WAL files.
   // This is used to check whether a live files shares a dst_path with another
@@ -1291,6 +1284,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
             ? true
             : false;
     EnvOptions src_raw_env_options(db_options);
+    RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
     s = checkpoint.CreateCustomCheckpoint(
         db_options,
         [&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
@@ -1700,11 +1694,6 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
     DeleteChildren(db_dir);
   }
 
-  RateLimiter* rate_limiter = options_.restore_rate_limiter.get();
-  if (rate_limiter) {
-    copy_file_buffer_size_ =
-        static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
-  }
   Status s;
   std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
   std::string temporary_current_file;
@@ -1756,8 +1745,8 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
                    dst.c_str());
     CopyOrCreateWorkItem copy_or_create_work_item(
         GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
-        EnvOptions() /* src_env_options */, options_.sync, rate_limiter,
-        0 /* size_limit */);
+        EnvOptions() /* src_env_options */, options_.sync,
+        options_.restore_rate_limiter.get(), 0 /* size_limit */);
     RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
         copy_or_create_work_item.result.get_future(), file, dst,
         file_info->checksum_hex);
@@ -1916,13 +1905,17 @@ Status BackupEngineImpl::CopyOrCreateFile(
     return s;
   }
 
+  size_t buf_size =
+      rate_limiter ? static_cast<size_t>(rate_limiter->GetSingleBurstBytes())
+                   : kDefaultCopyFileBufferSize;
+
   std::unique_ptr<WritableFileWriter> dest_writer(
       new WritableFileWriter(std::move(dst_file), dst, dst_file_options));
   std::unique_ptr<SequentialFileReader> src_reader;
   std::unique_ptr<char[]> buf;
   if (!src.empty()) {
     src_reader.reset(new SequentialFileReader(std::move(src_file), src));
-    buf.reset(new char[copy_file_buffer_size_]);
+    buf.reset(new char[buf_size]);
   }
 
   Slice data;
@@ -1932,9 +1925,8 @@ Status BackupEngineImpl::CopyOrCreateFile(
       return Status::Incomplete("Backup stopped");
     }
     if (!src.empty()) {
-      size_t buffer_to_read = (copy_file_buffer_size_ < size_limit)
-                                  ? copy_file_buffer_size_
-                                  : static_cast<size_t>(size_limit);
+      size_t buffer_to_read =
+          (buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
       s = src_reader->Read(buffer_to_read, &data, buf.get());
       processed_buffer_size += buffer_to_read;
     } else {
@@ -2227,15 +2219,19 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum(
     return s;
   }
 
-  std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
+  RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
+  size_t buf_size =
+      rate_limiter ? static_cast<size_t>(rate_limiter->GetSingleBurstBytes())
+                   : kDefaultCopyFileBufferSize;
+  std::unique_ptr<char[]> buf(new char[buf_size]);
   Slice data;
 
   do {
     if (stop_backup_.load(std::memory_order_acquire)) {
       return Status::Incomplete("Backup stopped");
     }
-    size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
-      copy_file_buffer_size_ : static_cast<size_t>(size_limit);
+    size_t buffer_to_read =
+        (buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
     s = src_reader->Read(buffer_to_read, &data, buf.get());
 
     if (!s.ok()) {
index 2b7c5c8a347706a0a509a6c7e4cd97117151c954..97ed3aec1e925aa85b03879c93323c229a233365 100644 (file)
@@ -3120,6 +3120,13 @@ TEST_F(BackupEngineTest, Concurrency) {
   //
   // Because of the challenges of integrating this into db_stress,
   // this is a non-deterministic mini-stress test here instead.
+
+  // To check for a race condition in handling buffer size based on byte
+  // burst limit, we need a (generous) rate limiter
+  std::shared_ptr<RateLimiter> limiter{NewGenericRateLimiter(1000000000)};
+  backupable_options_->backup_rate_limiter = limiter;
+  backupable_options_->restore_rate_limiter = limiter;
+
   OpenDBAndBackupEngine(true, false, kShareWithChecksum);
 
   static constexpr int keys_iteration = 5000;
@@ -3145,8 +3152,9 @@ TEST_F(BackupEngineTest, Concurrency) {
   std::array<std::thread, 4> restore_verify_threads;
   for (uint32_t i = 0; i < read_threads.size(); ++i) {
     uint32_t sleep_micros = rng() % 100000;
-    read_threads[i] = std::thread(
-        [this, i, sleep_micros, &db_opts, &be_opts, &restore_verify_threads] {
+    read_threads[i] =
+        std::thread([this, i, sleep_micros, &db_opts, &be_opts,
+                     &restore_verify_threads, &limiter] {
           test_db_env_->SleepForMicroseconds(sleep_micros);
 
           // Whether to also re-open the BackupEngine, potentially seeing
@@ -3225,6 +3233,14 @@ TEST_F(BackupEngineTest, Concurrency) {
                                                  restore_db_dir));
           }
 
+          // Test for race condition in reconfiguring limiter
+          // FIXME: this could set to a different value in all threads, except
+          // GenericRateLimiter::SetBytesPerSecond has a write-write race
+          // reported by TSAN
+          if (i == 0) {
+            limiter->SetBytesPerSecond(2000000000);
+          }
+
           // Re-verify metadata (we don't receive updates from concurrently
           // creating a new backup)
           my_be->GetBackupInfo(&infos);