std::string trash_dir = test::TmpDir(env_) + "/trash";
int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
Status s;
- options.sst_file_manager.reset(NewSstFileManager(
- env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s));
+ options.sst_file_manager.reset(
+ NewSstFileManager(env_, nullptr, trash_dir, 0, false, &s));
ASSERT_OK(s);
+ options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
ASSERT_OK(TryReopen(options));
// Return delete rate limit in bytes per second.
// thread-safe
virtual int64_t GetDeleteRateBytesPerSecond() = 0;
+
+ // Update the delete rate limit in bytes per second.
+ // zero means disable delete rate limiting and delete files immediately
+ // thread-safe
+ virtual void SetDeleteRateBytesPerSecond(int64_t delete_rate) = 0;
};
// Create a new SstFileManager that can be shared among multiple RocksDB
cv_(&mu_),
info_log_(info_log),
sst_file_manager_(sst_file_manager) {
- if (rate_bytes_per_sec_ <= 0) {
- // Rate limiting is disabled
- bg_thread_.reset();
- } else {
- bg_thread_.reset(
- new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this));
- }
+ bg_thread_.reset(
+ new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this));
}
DeleteScheduler::~DeleteScheduler() {
Status DeleteScheduler::DeleteFile(const std::string& file_path) {
Status s;
- if (rate_bytes_per_sec_ <= 0) {
+ if (rate_bytes_per_sec_.load() <= 0) {
// Rate limiting is disabled
+ TEST_SYNC_POINT("DeleteScheduler::DeleteFile");
s = env_->DeleteFile(file_path);
if (s.ok() && sst_file_manager_) {
sst_file_manager_->OnDeleteFile(file_path);
// Delete all files in queue_
uint64_t start_time = env_->NowMicros();
uint64_t total_deleted_bytes = 0;
+ int64_t current_delete_rate = rate_bytes_per_sec_.load();
while (!queue_.empty() && !closing_) {
+ if (current_delete_rate != rate_bytes_per_sec_.load()) {
+ // User changed the delete rate
+ current_delete_rate = rate_bytes_per_sec_.load();
+ start_time = env_->NowMicros();
+ total_deleted_bytes = 0;
+ }
+
+ // Get new file to delete
std::string path_in_trash = queue_.front();
queue_.pop();
}
// Apply penlty if necessary
- uint64_t total_penlty =
- ((total_deleted_bytes * kMicrosInSecond) / rate_bytes_per_sec_);
- while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {}
+ uint64_t total_penlty;
+ if (current_delete_rate > 0) {
+ // rate limiting is enabled
+ total_penlty =
+ ((total_deleted_bytes * kMicrosInSecond) / current_delete_rate);
+ while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {}
+ } else {
+ // rate limiting is disabled
+ total_penlty = 0;
+ }
TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait",
&total_penlty);
~DeleteScheduler();
// Return delete rate limit in bytes per second
- int64_t GetRateBytesPerSecond() { return rate_bytes_per_sec_; }
+ int64_t GetRateBytesPerSecond() { return rate_bytes_per_sec_.load(); }
+
+ // Set delete rate limit in bytes per second
+ void SetRateBytesPerSecond(int64_t bytes_per_sec) {
+ return rate_bytes_per_sec_.store(bytes_per_sec);
+ }
// Move file to trash directory and schedule it's deletion
Status DeleteFile(const std::string& fname);
// Path to the trash directory
std::string trash_dir_;
// Maximum number of bytes that should be deleted per second
- int64_t rate_bytes_per_sec_;
+ std::atomic<int64_t> rate_bytes_per_sec_;
// Mutex to protect queue_, pending_files_, bg_errors_, closing_
InstrumentedMutex mu_;
// Queue of files in trash that need to be deleted
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
+
+TEST_F(DeleteSchedulerTest, DynamicRateLimiting1) {
+ std::vector<uint64_t> penalties;
+ int bg_delete_file = 0;
+ int fg_delete_file = 0;
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::DeleteTrashFile:DeleteFile",
+ [&](void* arg) { bg_delete_file++; });
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::DeleteFile",
+ [&](void* arg) { fg_delete_file++; });
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::BackgroundEmptyTrash:Wait",
+ [&](void* arg) { penalties.push_back(*(static_cast<int*>(arg))); });
+
+ rocksdb::SyncPoint::GetInstance()->LoadDependency({
+ {"DeleteSchedulerTest::DynamicRateLimiting1:1",
+ "DeleteScheduler::BackgroundEmptyTrash"},
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ rate_bytes_per_sec_ = 0; // Disable rate limiting initially
+ NewDeleteScheduler();
+
+
+ int num_files = 10; // 10 files
+ uint64_t file_size = 1024; // every file is 1 kb
+
+ std::vector<int64_t> delete_kbs_per_sec = {512, 200, 0, 100, 50, -2, 25};
+ for (size_t t = 0; t < delete_kbs_per_sec.size(); t++) {
+ penalties.clear();
+ bg_delete_file = 0;
+ fg_delete_file = 0;
+ rocksdb::SyncPoint::GetInstance()->ClearTrace();
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ DestroyAndCreateDir(dummy_files_dir_);
+ rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024;
+ delete_scheduler_->SetRateBytesPerSecond(rate_bytes_per_sec_);
+
+ // Create 100 dummy files, every file is 1 Kb
+ std::vector<std::string> generated_files;
+ for (int i = 0; i < num_files; i++) {
+ std::string file_name = "file" + ToString(i) + ".data";
+ generated_files.push_back(NewDummyFile(file_name, file_size));
+ }
+
+ // Delete dummy files and measure time spent to empty trash
+ for (int i = 0; i < num_files; i++) {
+ ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i]));
+ }
+ ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0);
+
+ if (rate_bytes_per_sec_ > 0) {
+ uint64_t delete_start_time = env_->NowMicros();
+ TEST_SYNC_POINT("DeleteSchedulerTest::DynamicRateLimiting1:1");
+ delete_scheduler_->WaitForEmptyTrash();
+ uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;
+
+ auto bg_errors = delete_scheduler_->GetBackgroundErrors();
+ ASSERT_EQ(bg_errors.size(), 0);
+
+ uint64_t total_files_size = 0;
+ uint64_t expected_penlty = 0;
+ ASSERT_EQ(penalties.size(), num_files);
+ for (int i = 0; i < num_files; i++) {
+ total_files_size += file_size;
+ expected_penlty = ((total_files_size * 1000000) / rate_bytes_per_sec_);
+ ASSERT_EQ(expected_penlty, penalties[i]);
+ }
+ ASSERT_GT(time_spent_deleting, expected_penlty * 0.9);
+ ASSERT_EQ(bg_delete_file, num_files);
+ ASSERT_EQ(fg_delete_file, 0);
+ } else {
+ ASSERT_EQ(penalties.size(), 0);
+ ASSERT_EQ(bg_delete_file, 0);
+ ASSERT_EQ(fg_delete_file, num_files);
+ }
+
+ ASSERT_EQ(CountFilesInDir(trash_dir_), 0);
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ }
+}
+
} // namespace rocksdb
int main(int argc, char** argv) {
return delete_scheduler_.GetRateBytesPerSecond();
}
+void SstFileManagerImpl::SetDeleteRateBytesPerSecond(int64_t delete_rate) {
+ return delete_scheduler_.SetRateBytesPerSecond(delete_rate);
+}
+
Status SstFileManagerImpl::ScheduleFileDeletion(const std::string& file_path) {
return delete_scheduler_.DeleteFile(file_path);
}
new SstFileManagerImpl(env, info_log, trash_dir, rate_bytes_per_sec);
Status s;
- if (trash_dir != "" && rate_bytes_per_sec > 0) {
+ if (trash_dir != "") {
s = env->CreateDirIfMissing(trash_dir);
if (s.ok() && delete_existing_trash) {
std::vector<std::string> files_in_trash;
// Return delete rate limit in bytes per second.
virtual int64_t GetDeleteRateBytesPerSecond() override;
+ // Update the delete rate limit in bytes per second.
+ virtual void SetDeleteRateBytesPerSecond(int64_t delete_rate) override;
+
// Move file to trash directory and schedule it's deletion.
virtual Status ScheduleFileDeletion(const std::string& file_path);