]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Support SstFileManager::SetDeleteRateBytesPerSecond()
authorIslam AbdelRahman <tec@fb.com>
Thu, 16 Mar 2017 19:06:04 +0000 (12:06 -0700)
committerIslam AbdelRahman <tec@fb.com>
Thu, 6 Apr 2017 23:55:23 +0000 (16:55 -0700)
Summary:
Update DeleteScheduler component to support changing delete rate in runtime by introducing
SstFileManager::SetDeleteRateBytesPerSecond()
Closes https://github.com/facebook/rocksdb/pull/1994

Differential Revision: D4719906

Pulled By: IslamAbdelRahman

fbshipit-source-id: e6b8d9e

db/db_sst_test.cc
include/rocksdb/sst_file_manager.h
util/delete_scheduler.cc
util/delete_scheduler.h
util/delete_scheduler_test.cc
util/sst_file_manager_impl.cc
util/sst_file_manager_impl.h

index 903fca7625859632fb1adf1e872090748e55d52e..d329935773b7a57901a516246c293cb27e25199e 100644 (file)
@@ -328,9 +328,10 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
   std::string trash_dir = test::TmpDir(env_) + "/trash";
   int64_t rate_bytes_per_sec = 1024 * 10;  // 10 Kbs / Sec
   Status s;
-  options.sst_file_manager.reset(NewSstFileManager(
-      env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s));
+  options.sst_file_manager.reset(
+      NewSstFileManager(env_, nullptr, trash_dir, 0, false, &s));
   ASSERT_OK(s);
+  options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
   auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
 
   ASSERT_OK(TryReopen(options));
index f8fee2742f8ede924d52222592c0fa978dfaa52c..7490615c5b43ee660709e642557ef04470f802db 100644 (file)
@@ -50,6 +50,11 @@ class SstFileManager {
   // Return delete rate limit in bytes per second.
   // thread-safe
   virtual int64_t GetDeleteRateBytesPerSecond() = 0;
+
+  // Update the delete rate limit in bytes per second.
+  // zero means disable delete rate limiting and delete files immediately
+  // thread-safe
+  virtual void SetDeleteRateBytesPerSecond(int64_t delete_rate) = 0;
 };
 
 // Create a new SstFileManager that can be shared among multiple RocksDB
index 91387e3459dea3de02b45c295cd6a95936d6e3b8..a2ac477160d10ea3493fb3938420aa1b6f649c1b 100644 (file)
@@ -29,13 +29,8 @@ DeleteScheduler::DeleteScheduler(Env* env, const std::string& trash_dir,
       cv_(&mu_),
       info_log_(info_log),
       sst_file_manager_(sst_file_manager) {
-  if (rate_bytes_per_sec_ <= 0) {
-    // Rate limiting is disabled
-    bg_thread_.reset();
-  } else {
-    bg_thread_.reset(
-        new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this));
-  }
+  bg_thread_.reset(
+      new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this));
 }
 
 DeleteScheduler::~DeleteScheduler() {
@@ -51,8 +46,9 @@ DeleteScheduler::~DeleteScheduler() {
 
 Status DeleteScheduler::DeleteFile(const std::string& file_path) {
   Status s;
-  if (rate_bytes_per_sec_ <= 0) {
+  if (rate_bytes_per_sec_.load() <= 0) {
     // Rate limiting is disabled
+    TEST_SYNC_POINT("DeleteScheduler::DeleteFile");
     s = env_->DeleteFile(file_path);
     if (s.ok() && sst_file_manager_) {
       sst_file_manager_->OnDeleteFile(file_path);
@@ -147,7 +143,16 @@ void DeleteScheduler::BackgroundEmptyTrash() {
     // Delete all files in queue_
     uint64_t start_time = env_->NowMicros();
     uint64_t total_deleted_bytes = 0;
+    int64_t current_delete_rate = rate_bytes_per_sec_.load();
     while (!queue_.empty() && !closing_) {
+      if (current_delete_rate != rate_bytes_per_sec_.load()) {
+        // User changed the delete rate
+        current_delete_rate = rate_bytes_per_sec_.load();
+        start_time = env_->NowMicros();
+        total_deleted_bytes = 0;
+      }
+
+      // Get new file to delete
       std::string path_in_trash = queue_.front();
       queue_.pop();
 
@@ -164,9 +169,16 @@ void DeleteScheduler::BackgroundEmptyTrash() {
       }
 
       // Apply penlty if necessary
-      uint64_t total_penlty =
-          ((total_deleted_bytes * kMicrosInSecond) / rate_bytes_per_sec_);
-      while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {}
+      uint64_t total_penlty;
+      if (current_delete_rate > 0) {
+        // rate limiting is enabled
+        total_penlty =
+            ((total_deleted_bytes * kMicrosInSecond) / current_delete_rate);
+        while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {}
+      } else {
+        // rate limiting is disabled
+        total_penlty = 0;
+      }
       TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait",
                                &total_penlty);
 
index 878a117e56537bf541efb8d456e369452c560166..678bf5c177b0b1a62a00a93066e45420eadecf5a 100644 (file)
@@ -39,7 +39,12 @@ class DeleteScheduler {
   ~DeleteScheduler();
 
   // Return delete rate limit in bytes per second
-  int64_t GetRateBytesPerSecond() { return rate_bytes_per_sec_; }
+  int64_t GetRateBytesPerSecond() { return rate_bytes_per_sec_.load(); }
+
+  // Set delete rate limit in bytes per second
+  void SetRateBytesPerSecond(int64_t bytes_per_sec) {
+    return rate_bytes_per_sec_.store(bytes_per_sec);
+  }
 
   // Move file to trash directory and schedule it's deletion
   Status DeleteFile(const std::string& fname);
@@ -64,7 +69,7 @@ class DeleteScheduler {
   // Path to the trash directory
   std::string trash_dir_;
   // Maximum number of bytes that should be deleted per second
-  int64_t rate_bytes_per_sec_;
+  std::atomic<int64_t> rate_bytes_per_sec_;
   // Mutex to protect queue_, pending_files_, bg_errors_, closing_
   InstrumentedMutex mu_;
   // Queue of files in trash that need to be deleted
index 087ba38afb4968a9364b37a1cb135c9f7d6441df..0301c23cff70a59ce536b0d19fc2bf527d3428fe 100644 (file)
@@ -422,6 +422,90 @@ TEST_F(DeleteSchedulerTest, MoveToTrashError) {
 
   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
 }
+
+TEST_F(DeleteSchedulerTest, DynamicRateLimiting1) {
+  std::vector<uint64_t> penalties;
+  int bg_delete_file = 0;
+  int fg_delete_file = 0;
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::DeleteTrashFile:DeleteFile",
+      [&](void* arg) { bg_delete_file++; });
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::DeleteFile",
+      [&](void* arg) { fg_delete_file++; });
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::BackgroundEmptyTrash:Wait",
+      [&](void* arg) { penalties.push_back(*(static_cast<int*>(arg))); });
+
+  rocksdb::SyncPoint::GetInstance()->LoadDependency({
+      {"DeleteSchedulerTest::DynamicRateLimiting1:1",
+       "DeleteScheduler::BackgroundEmptyTrash"},
+  });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  rate_bytes_per_sec_ = 0;  // Disable rate limiting initially
+  NewDeleteScheduler();
+
+
+  int num_files = 10;  // 10 files
+  uint64_t file_size = 1024;  // every file is 1 kb
+
+  std::vector<int64_t> delete_kbs_per_sec = {512, 200, 0, 100, 50, -2, 25};
+  for (size_t t = 0; t < delete_kbs_per_sec.size(); t++) {
+    penalties.clear();
+    bg_delete_file = 0;
+    fg_delete_file = 0;
+    rocksdb::SyncPoint::GetInstance()->ClearTrace();
+    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+    DestroyAndCreateDir(dummy_files_dir_);
+    rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024;
+    delete_scheduler_->SetRateBytesPerSecond(rate_bytes_per_sec_);
+
+    // Create 100 dummy files, every file is 1 Kb
+    std::vector<std::string> generated_files;
+    for (int i = 0; i < num_files; i++) {
+      std::string file_name = "file" + ToString(i) + ".data";
+      generated_files.push_back(NewDummyFile(file_name, file_size));
+    }
+
+    // Delete dummy files and measure time spent to empty trash
+    for (int i = 0; i < num_files; i++) {
+      ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i]));
+    }
+    ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0);
+
+    if (rate_bytes_per_sec_ > 0) {
+      uint64_t delete_start_time = env_->NowMicros();
+      TEST_SYNC_POINT("DeleteSchedulerTest::DynamicRateLimiting1:1");
+      delete_scheduler_->WaitForEmptyTrash();
+      uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;
+
+      auto bg_errors = delete_scheduler_->GetBackgroundErrors();
+      ASSERT_EQ(bg_errors.size(), 0);
+
+      uint64_t total_files_size = 0;
+      uint64_t expected_penlty = 0;
+      ASSERT_EQ(penalties.size(), num_files);
+      for (int i = 0; i < num_files; i++) {
+        total_files_size += file_size;
+        expected_penlty = ((total_files_size * 1000000) / rate_bytes_per_sec_);
+        ASSERT_EQ(expected_penlty, penalties[i]);
+      }
+      ASSERT_GT(time_spent_deleting, expected_penlty * 0.9);
+      ASSERT_EQ(bg_delete_file, num_files);
+      ASSERT_EQ(fg_delete_file, 0);
+    } else {
+      ASSERT_EQ(penalties.size(), 0);
+      ASSERT_EQ(bg_delete_file, 0);
+      ASSERT_EQ(fg_delete_file, num_files);
+    }
+
+    ASSERT_EQ(CountFilesInDir(trash_dir_), 0);
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  }
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
index 208971cb5a9069d07394321fa856b262cd0368af..863118938e83b1ec72cdab299a0f21b906724b9c 100644 (file)
@@ -87,6 +87,10 @@ int64_t SstFileManagerImpl::GetDeleteRateBytesPerSecond() {
   return delete_scheduler_.GetRateBytesPerSecond();
 }
 
+void SstFileManagerImpl::SetDeleteRateBytesPerSecond(int64_t delete_rate) {
+  return delete_scheduler_.SetRateBytesPerSecond(delete_rate);
+}
+
 Status SstFileManagerImpl::ScheduleFileDeletion(const std::string& file_path) {
   return delete_scheduler_.DeleteFile(file_path);
 }
@@ -127,7 +131,7 @@ SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<Logger> info_log,
       new SstFileManagerImpl(env, info_log, trash_dir, rate_bytes_per_sec);
 
   Status s;
-  if (trash_dir != "" && rate_bytes_per_sec > 0) {
+  if (trash_dir != "") {
     s = env->CreateDirIfMissing(trash_dir);
     if (s.ok() && delete_existing_trash) {
       std::vector<std::string> files_in_trash;
index 543c9d95a1a6261edfa493fe4e8c68fdacb7d036..c9df6e9ca577a5266975fa3c68b17ac0ea42aef7 100644 (file)
@@ -64,6 +64,9 @@ class SstFileManagerImpl : public SstFileManager {
   // Return delete rate limit in bytes per second.
   virtual int64_t GetDeleteRateBytesPerSecond() override;
 
+  // Update the delete rate limit in bytes per second.
+  virtual void SetDeleteRateBytesPerSecond(int64_t delete_rate) override;
+
   // Move file to trash directory and schedule it's deletion.
   virtual Status ScheduleFileDeletion(const std::string& file_path);