std::unique_ptr<Directory> private_directory_;
static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB
- mutable size_t copy_file_buffer_size_;
bool read_only_;
BackupStatistics backup_statistics_;
std::unordered_set<std::string> reported_ignored_fields_;
options_(options),
db_env_(db_env),
backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
- copy_file_buffer_size_(kDefaultCopyFileBufferSize),
read_only_(read_only) {
if (options_.backup_rate_limiter == nullptr &&
options_.backup_rate_limit > 0) {
s = backup_env_->CreateDir(private_dir);
}
- RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
- if (rate_limiter) {
- copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
- }
-
// A set into which we will insert the dst_paths that are calculated for live
// files and live WAL files.
// This is used to check whether a live files shares a dst_path with another
? true
: false;
EnvOptions src_raw_env_options(db_options);
+ RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
s = checkpoint.CreateCustomCheckpoint(
db_options,
[&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
DeleteChildren(db_dir);
}
- RateLimiter* rate_limiter = options_.restore_rate_limiter.get();
- if (rate_limiter) {
- copy_file_buffer_size_ =
- static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
- }
Status s;
std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
std::string temporary_current_file;
dst.c_str());
CopyOrCreateWorkItem copy_or_create_work_item(
GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
- EnvOptions() /* src_env_options */, options_.sync, rate_limiter,
- 0 /* size_limit */);
+ EnvOptions() /* src_env_options */, options_.sync,
+ options_.restore_rate_limiter.get(), 0 /* size_limit */);
RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(), file, dst,
file_info->checksum_hex);
return s;
}
+ size_t buf_size =
+ rate_limiter ? static_cast<size_t>(rate_limiter->GetSingleBurstBytes())
+ : kDefaultCopyFileBufferSize;
+
std::unique_ptr<WritableFileWriter> dest_writer(
new WritableFileWriter(std::move(dst_file), dst, dst_file_options));
std::unique_ptr<SequentialFileReader> src_reader;
std::unique_ptr<char[]> buf;
if (!src.empty()) {
src_reader.reset(new SequentialFileReader(std::move(src_file), src));
- buf.reset(new char[copy_file_buffer_size_]);
+ buf.reset(new char[buf_size]);
}
Slice data;
return Status::Incomplete("Backup stopped");
}
if (!src.empty()) {
- size_t buffer_to_read = (copy_file_buffer_size_ < size_limit)
- ? copy_file_buffer_size_
- : static_cast<size_t>(size_limit);
+ size_t buffer_to_read =
+ (buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
s = src_reader->Read(buffer_to_read, &data, buf.get());
processed_buffer_size += buffer_to_read;
} else {
return s;
}
- std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
+ RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
+ size_t buf_size =
+ rate_limiter ? static_cast<size_t>(rate_limiter->GetSingleBurstBytes())
+ : kDefaultCopyFileBufferSize;
+ std::unique_ptr<char[]> buf(new char[buf_size]);
Slice data;
do {
if (stop_backup_.load(std::memory_order_acquire)) {
return Status::Incomplete("Backup stopped");
}
- size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
- copy_file_buffer_size_ : static_cast<size_t>(size_limit);
+ size_t buffer_to_read =
+ (buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
s = src_reader->Read(buffer_to_read, &data, buf.get());
if (!s.ok()) {
//
// Because of the challenges of integrating this into db_stress,
// this is a non-deterministic mini-stress test here instead.
+
+ // To check for a race condition in handling buffer size based on byte
+ // burst limit, we need a (generous) rate limiter
+ std::shared_ptr<RateLimiter> limiter{NewGenericRateLimiter(1000000000)};
+ backupable_options_->backup_rate_limiter = limiter;
+ backupable_options_->restore_rate_limiter = limiter;
+
OpenDBAndBackupEngine(true, false, kShareWithChecksum);
static constexpr int keys_iteration = 5000;
std::array<std::thread, 4> restore_verify_threads;
for (uint32_t i = 0; i < read_threads.size(); ++i) {
uint32_t sleep_micros = rng() % 100000;
- read_threads[i] = std::thread(
- [this, i, sleep_micros, &db_opts, &be_opts, &restore_verify_threads] {
+ read_threads[i] =
+ std::thread([this, i, sleep_micros, &db_opts, &be_opts,
+ &restore_verify_threads, &limiter] {
test_db_env_->SleepForMicroseconds(sleep_micros);
// Whether to also re-open the BackupEngine, potentially seeing
restore_db_dir));
}
+ // Test for race condition in reconfiguring limiter
+ // FIXME: this could set to a different value in all threads, except
+ // GenericRateLimiter::SetBytesPerSecond has a write-write race
+ // reported by TSAN
+ if (i == 0) {
+ limiter->SetBytesPerSecond(2000000000);
+ }
+
// Re-verify metadata (we don't receive updates from concurrently
// creating a new backup)
my_be->GetBackupInfo(&infos);