Status LoadFromFile(
const std::string& backup_dir,
const std::unordered_map<std::string, uint64_t>& abs_path_to_size,
- Logger* info_log,
+ RateLimiter* rate_limiter, Logger* info_log,
std::unordered_set<std::string>* reported_ignored_fields);
Status StoreToFile(
bool sync, const TEST_FutureSchemaVersion2Options* test_future_options);
// Obtain db_id and db_session_id from the table properties of file_path
Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options,
- const std::string& file_path, std::string* db_id,
+ const std::string& file_path,
+ RateLimiter* rate_limiter, std::string* db_id,
std::string* db_session_id);
struct CopyOrCreateResult {
&abs_path_to_size);
if (s.ok()) {
s = backup_iter->second->LoadFromFile(
- options_.backup_dir, abs_path_to_size, options_.info_log,
+ options_.backup_dir, abs_path_to_size,
+ options_.backup_rate_limiter.get(), options_.info_log,
&reported_ignored_fields_);
}
if (s.IsCorruption() || s.IsNotSupported()) {
size_t buffer_to_read =
(buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
s = src_reader->Read(buffer_to_read, &data, buf.get());
+ if (rate_limiter != nullptr) {
+ rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
+ RateLimiter::OpType::kRead);
+ }
processed_buffer_size += buffer_to_read;
} else {
data = contents;
// Prepare db_session_id to add to the file name
// Ignore the returned status
// In the failed cases, db_id and db_session_id will be empty
- GetFileDbIdentities(db_env_, src_env_options, src_dir + fname, &db_id,
- &db_session_id)
+ GetFileDbIdentities(db_env_, src_env_options, src_dir + fname,
+ rate_limiter, &db_id, &db_session_id)
.PermitUncheckedError();
}
// Calculate checksum if checksum and db session id are not available.
size_t buffer_to_read =
(buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
s = src_reader->Read(buffer_to_read, &data, buf.get());
-
+ if (rate_limiter != nullptr) {
+ rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
+ RateLimiter::OpType::kRead);
+ }
if (!s.ok()) {
return s;
}
Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
const EnvOptions& src_env_options,
const std::string& file_path,
+ RateLimiter* rate_limiter,
std::string* db_id,
std::string* db_session_id) {
assert(db_id != nullptr || db_session_id != nullptr);
table_properties = sst_reader.GetInitTableProperties();
} else {
table_properties = tp.get();
+ if (table_properties != nullptr && rate_limiter != nullptr) {
+ // sizeof(*table_properties) is a sufficent but far-from-exact
+ // approximation of read bytes due to metaindex block, std::string
+ // properties and varint compression
+ rate_limiter->Request(sizeof(*table_properties), Env::IO_LOW,
+ nullptr /* stats */, RateLimiter::OpType::kRead);
+ }
}
} else {
ROCKS_LOG_INFO(options_.info_log, "Failed to read %s: %s",
Status BackupEngineImpl::BackupMeta::LoadFromFile(
const std::string& backup_dir,
const std::unordered_map<std::string, uint64_t>& abs_path_to_size,
- Logger* info_log,
+ RateLimiter* rate_limiter, Logger* info_log,
std::unordered_set<std::string>* reported_ignored_fields) {
assert(reported_ignored_fields);
assert(Empty());
// Failures handled at the end
std::string line;
if (backup_meta_reader->ReadLine(&line)) {
+ if (rate_limiter != nullptr) {
+ rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
+ RateLimiter::OpType::kRead);
+ }
if (StartsWith(line, kSchemaVersionPrefix)) {
std::string ver = line.substr(kSchemaVersionPrefix.size());
if (ver == "2" || StartsWith(ver, "2.")) {
return Status::Corruption("Unexpected empty line");
}
}
- if (!line.empty() || backup_meta_reader->ReadLine(&line)) {
+ if (!line.empty()) {
+ timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
+ } else if (backup_meta_reader->ReadLine(&line)) {
+ if (rate_limiter != nullptr) {
+ rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
+ RateLimiter::OpType::kRead);
+ }
timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
}
if (backup_meta_reader->ReadLine(&line)) {
+ if (rate_limiter != nullptr) {
+ rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
+ RateLimiter::OpType::kRead);
+ }
sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
}
uint32_t num_files = UINT32_MAX;
while (backup_meta_reader->ReadLine(&line)) {
+ if (rate_limiter != nullptr) {
+ rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
+ RateLimiter::OpType::kRead);
+ }
if (line.empty()) {
return Status::Corruption("Unexpected empty line");
}
std::vector<std::shared_ptr<FileInfo>> files;
bool footer_present = false;
while (backup_meta_reader->ReadLine(&line)) {
+ if (rate_limiter != nullptr) {
+ rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
+ RateLimiter::OpType::kRead);
+ }
std::vector<std::string> components = StringSplit(line, ' ');
if (components.size() < 1) {
if (footer_present) {
assert(schema_major_version >= 2);
while (backup_meta_reader->ReadLine(&line)) {
+ if (rate_limiter != nullptr) {
+ rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
+ RateLimiter::OpType::kRead);
+ }
if (line.empty()) {
return Status::Corruption("Unexpected empty line");
}
#include <algorithm>
#include <array>
+#include <cstddef>
+#include <cstdint>
#include <limits>
#include <random>
#include <string>
AssertBackupConsistency(0, 0, 100000, 100010);
}
+
+TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingVerifyBackup) {
+ const std::size_t kMicrosPerSec = 1000 * 1000LL;
+ std::shared_ptr<RateLimiter> backupThrottler(NewGenericRateLimiter(
+ 1, 100 * 1000 /* refill_period_us */, 10 /* fairness */,
+ RateLimiter::Mode::kAllIo /* mode */));
+
+ bool makeThrottler = std::get<0>(GetParam());
+ if (makeThrottler) {
+ backupable_options_->backup_rate_limiter = backupThrottler;
+ }
+
+ bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false;
+ backupable_options_->max_background_operations = is_single_threaded ? 1 : 10;
+
+ const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first;
+ if (makeThrottler) {
+ backupable_options_->backup_rate_limiter->SetBytesPerSecond(
+ backup_rate_limiter_limit);
+ } else {
+ backupable_options_->backup_rate_limit = backup_rate_limiter_limit;
+ }
+
+ DestroyDB(dbname_, Options());
+ OpenDBAndBackupEngine(true /* destroy_old_data */);
+ FillDB(db_.get(), 0, 100000);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+ false /* flush_before_backup */));
+
+ std::vector<BackupInfo> backup_infos;
+ BackupInfo backup_info;
+ backup_engine_->GetBackupInfo(&backup_infos);
+ ASSERT_EQ(1, backup_infos.size());
+ const int backup_id = 1;
+ ASSERT_EQ(backup_id, backup_infos[0].backup_id);
+ ASSERT_OK(backup_engine_->GetBackupInfo(backup_id, &backup_info,
+ true /* include_file_details */));
+
+ std::uint64_t bytes_read_during_verify_backup = 0;
+ for (BackupFileInfo backup_file_info : backup_info.file_details) {
+ bytes_read_during_verify_backup += backup_file_info.size;
+ }
+
+ auto start_verify_backup = db_chroot_env_->NowMicros();
+ ASSERT_OK(
+ backup_engine_->VerifyBackup(backup_id, true /* verify_with_checksum */));
+ auto verify_backup_time = db_chroot_env_->NowMicros() - start_verify_backup;
+ auto rate_limited_verify_backup_time =
+ (bytes_read_during_verify_backup * kMicrosPerSec) /
+ backup_rate_limiter_limit;
+
+ if (makeThrottler) {
+ EXPECT_GE(verify_backup_time, 0.8 * rate_limited_verify_backup_time);
+ }
+ CloseDBAndBackupEngine();
+ AssertBackupConsistency(backup_id, 0, 100000, 100010);
+ DestroyDB(dbname_, Options());
+}
+
+TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingChargeReadInBackup) {
+ bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false;
+ backupable_options_->max_background_operations = is_single_threaded ? 1 : 10;
+
+ const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first;
+ std::shared_ptr<RateLimiter> backup_rate_limiter(NewGenericRateLimiter(
+ backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
+ 10 /* fairness */, RateLimiter::Mode::kWritesOnly /* mode */));
+ backupable_options_->backup_rate_limiter = backup_rate_limiter;
+
+ DestroyDB(dbname_, Options());
+ OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
+ kShareWithChecksum /* shared_option */);
+ FillDB(db_.get(), 0, 10);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+ false /* flush_before_backup */));
+ std::int64_t total_bytes_through_with_no_read_charged =
+ backup_rate_limiter->GetTotalBytesThrough();
+ CloseBackupEngine();
+
+ backup_rate_limiter.reset(NewGenericRateLimiter(
+ backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
+ 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */));
+ backupable_options_->backup_rate_limiter = backup_rate_limiter;
+
+ OpenBackupEngine(true);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+ false /* flush_before_backup */));
+ std::int64_t total_bytes_through_with_read_charged =
+ backup_rate_limiter->GetTotalBytesThrough();
+ EXPECT_GT(total_bytes_through_with_read_charged,
+ total_bytes_through_with_no_read_charged);
+ CloseDBAndBackupEngine();
+ AssertBackupConsistency(1, 0, 10, 20);
+ DestroyDB(dbname_, Options());
+}
+
+TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingChargeReadInRestore) {
+ bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false;
+ backupable_options_->max_background_operations = is_single_threaded ? 1 : 10;
+
+ const std::uint64_t restore_rate_limiter_limit =
+ std::get<2>(GetParam()).second;
+ std::shared_ptr<RateLimiter> restore_rate_limiter(NewGenericRateLimiter(
+ restore_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
+ 10 /* fairness */, RateLimiter::Mode::kWritesOnly /* mode */));
+ backupable_options_->restore_rate_limiter = restore_rate_limiter;
+
+ DestroyDB(dbname_, Options());
+ OpenDBAndBackupEngine(true /* destroy_old_data */);
+ FillDB(db_.get(), 0, 10);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+ false /* flush_before_backup */));
+ CloseDBAndBackupEngine();
+ DestroyDB(dbname_, Options());
+
+ OpenBackupEngine(false /* destroy_old_data */);
+ ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_));
+ std::int64_t total_bytes_through_with_no_read_charged =
+ restore_rate_limiter->GetTotalBytesThrough();
+ CloseBackupEngine();
+ DestroyDB(dbname_, Options());
+
+ restore_rate_limiter.reset(NewGenericRateLimiter(
+ restore_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
+ 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */));
+ backupable_options_->restore_rate_limiter = restore_rate_limiter;
+
+ OpenBackupEngine(false /* destroy_old_data */);
+ ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_));
+ std::int64_t total_bytes_through_with_read_charged =
+ restore_rate_limiter->GetTotalBytesThrough();
+ EXPECT_EQ(total_bytes_through_with_read_charged,
+ total_bytes_through_with_no_read_charged * 2);
+ CloseBackupEngine();
+ AssertBackupConsistency(1, 0, 10, 20);
+ DestroyDB(dbname_, Options());
+}
+
+TEST_P(BackupEngineRateLimitingTestWithParam,
+ RateLimitingChargeReadInInitialize) {
+ bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false;
+ backupable_options_->max_background_operations = is_single_threaded ? 1 : 10;
+
+ const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first;
+ std::shared_ptr<RateLimiter> backup_rate_limiter(NewGenericRateLimiter(
+ backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
+ 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */));
+ backupable_options_->backup_rate_limiter = backup_rate_limiter;
+
+ DestroyDB(dbname_, Options());
+ OpenDBAndBackupEngine(true /* destroy_old_data */);
+ FillDB(db_.get(), 0, 10);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+ false /* flush_before_backup */));
+ CloseDBAndBackupEngine();
+ AssertBackupConsistency(1, 0, 10, 20);
+
+ std::int64_t total_bytes_through_before_initialize =
+ backupable_options_->backup_rate_limiter->GetTotalBytesThrough();
+ OpenDBAndBackupEngine(false /* destroy_old_data */);
+ // We charge read in BackupEngineImpl::BackupMeta::LoadFromFile,
+ // which is called in BackupEngineImpl::Initialize() during
+ // OpenBackupEngine(false)
+ EXPECT_GT(backupable_options_->backup_rate_limiter->GetTotalBytesThrough(),
+ total_bytes_through_before_initialize);
+ CloseDBAndBackupEngine();
+ DestroyDB(dbname_, Options());
+}
#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
TEST_F(BackupEngineTest, ReadOnlyBackupEngine) {