#include "file/sequence_file_reader.h"
#include "file/writable_file_writer.h"
#include "logging/logging.h"
+#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/transaction_log.h"
bool sync;
RateLimiter* rate_limiter;
uint64_t size_limit;
+ Statistics* stats;
std::promise<CopyOrCreateResult> result;
std::function<void()> progress_callback;
std::string src_checksum_func_name;
sync(false),
rate_limiter(nullptr),
size_limit(0),
+ stats(nullptr),
src_checksum_func_name(kUnknownFileChecksumFuncName),
src_checksum_hex(""),
db_id(""),
sync = o.sync;
rate_limiter = o.rate_limiter;
size_limit = o.size_limit;
+ stats = o.stats;
result = std::move(o.result);
progress_callback = std::move(o.progress_callback);
src_checksum_func_name = std::move(o.src_checksum_func_name);
CopyOrCreateWorkItem(
std::string _src_path, std::string _dst_path, std::string _contents,
Env* _src_env, Env* _dst_env, EnvOptions _src_env_options, bool _sync,
- RateLimiter* _rate_limiter, uint64_t _size_limit,
+ RateLimiter* _rate_limiter, uint64_t _size_limit, Statistics* _stats,
std::function<void()> _progress_callback = []() {},
const std::string& _src_checksum_func_name =
kUnknownFileChecksumFuncName,
sync(_sync),
rate_limiter(_rate_limiter),
size_limit(_size_limit),
+ stats(_stats),
progress_callback(_progress_callback),
src_checksum_func_name(_src_checksum_func_name),
src_checksum_hex(_src_checksum_hex),
BackupID backup_id, bool shared, const std::string& src_dir,
const std::string& fname, // starts with "/"
const EnvOptions& src_env_options, RateLimiter* rate_limiter,
- FileType file_type, uint64_t size_bytes, uint64_t size_limit = 0,
- bool shared_checksum = false,
+ FileType file_type, uint64_t size_bytes, Statistics* stats,
+ uint64_t size_limit = 0, bool shared_checksum = false,
std::function<void()> progress_callback = []() {},
const std::string& contents = std::string(),
const std::string& src_checksum_func_name = kUnknownFileChecksumFuncName,
port::SetCpuPriority(0, priority);
current_priority = priority;
}
+ // `bytes_read` and `bytes_written` stats are enabled based on
+ // compile-time support and cannot be dynamically toggled. So we do not
+ // need to worry about `PerfLevel` here, unlike many other
+ // `IOStatsContext` / `PerfContext` stats.
+ uint64_t prev_bytes_read = IOSTATS(bytes_read);
+ uint64_t prev_bytes_written = IOSTATS(bytes_written);
+
CopyOrCreateResult result;
result.status = CopyOrCreateFile(
work_item.src_path, work_item.dst_path, work_item.contents,
work_item.sync, work_item.rate_limiter, &result.size,
&result.checksum_hex, work_item.size_limit,
work_item.progress_callback);
+
+ RecordTick(work_item.stats, BACKUP_READ_BYTES,
+ IOSTATS(bytes_read) - prev_bytes_read);
+ RecordTick(work_item.stats, BACKUP_WRITE_BYTES,
+ IOSTATS(bytes_written) - prev_bytes_written);
+
result.db_id = work_item.db_id;
result.db_session_id = work_item.db_session_id;
if (result.status.ok() && !work_item.src_checksum_hex.empty()) {
BackupID new_backup_id = latest_backup_id_ + 1;
+ // `bytes_read` and `bytes_written` stats are enabled based on compile-time
+ // support and cannot be dynamically toggled. So we do not need to worry about
+ // `PerfLevel` here, unlike many other `IOStatsContext` / `PerfContext` stats.
+ uint64_t prev_bytes_read = IOSTATS(bytes_read);
+ uint64_t prev_bytes_written = IOSTATS(bytes_written);
+
assert(backups_.find(new_backup_id) == backups_.end());
auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id));
std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
// Add a CopyOrCreateWorkItem to the channel for each live file
Status disabled = db->DisableFileDeletions();
+ DBOptions db_options = db->GetDBOptions();
+ Statistics* stats = db_options.statistics.get();
if (s.ok()) {
CheckpointImpl checkpoint(db);
uint64_t sequence_number = 0;
- DBOptions db_options = db->GetDBOptions();
FileChecksumGenFactory* db_checksum_factory =
db_options.file_checksum_gen_factory.get();
const std::string kFileChecksumGenFactoryName =
options_.share_table_files &&
(type == kTableFile || type == kBlobFile),
src_dirname, fname, src_env_options, rate_limiter, type,
- size_bytes, size_limit_bytes,
+ size_bytes, db_options.statistics.get(), size_limit_bytes,
options_.share_files_with_checksum &&
(type == kTableFile || type == kBlobFile),
options.progress_callback, "" /* contents */,
live_dst_paths, backup_items_to_finish, new_backup_id,
false /* shared */, "" /* src_dir */, fname,
EnvOptions() /* src_env_options */, rate_limiter, type,
- contents.size(), 0 /* size_limit */, false /* shared_checksum */,
- options.progress_callback, contents);
+ contents.size(), db_options.statistics.get(), 0 /* size_limit */,
+ false /* shared_checksum */, options.progress_callback, contents);
} /* create_file_cb */,
&sequence_number, options.flush_before_backup ? 0 : port::kMaxUint64,
compare_checksum);
if (s.ok()) {
backup_statistics_.IncrementNumberSuccessBackup();
- }
- if (!s.ok()) {
+ // here we know that we succeeded and installed the new backup
+ latest_backup_id_ = new_backup_id;
+ latest_valid_backup_id_ = new_backup_id;
+ if (new_backup_id_ptr) {
+ *new_backup_id_ptr = new_backup_id;
+ }
+ ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");
+
+ // backup_speed is in MiB/s (bytes per microsecond divided by 1.048576)
+ double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
+ ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u",
+ new_backup->GetNumberFiles());
+ char human_size[16];
+ AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
+ ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size);
+ ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds",
+ backup_time);
+ ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
+ ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s",
+ backup_statistics_.ToString().c_str());
+ } else {
backup_statistics_.IncrementNumberFailBackup();
// clean all the files we might have created
ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s",
// delete files that we might have already written
might_need_garbage_collect_ = true;
DeleteBackup(new_backup_id).PermitUncheckedError();
- return s;
}
- // here we know that we succeeded and installed the new backup
- // in the LATEST_BACKUP file
- latest_backup_id_ = new_backup_id;
- latest_valid_backup_id_ = new_backup_id;
- if (new_backup_id_ptr) {
- *new_backup_id_ptr = new_backup_id;
- }
- ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");
-
- // backup_speed is in byte/second
- double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
- ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u",
- new_backup->GetNumberFiles());
- char human_size[16];
- AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
- ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size);
- ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds",
- backup_time);
- ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
- ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s",
- backup_statistics_.ToString().c_str());
+ RecordTick(stats, BACKUP_READ_BYTES, IOSTATS(bytes_read) - prev_bytes_read);
+ RecordTick(stats, BACKUP_WRITE_BYTES,
+ IOSTATS(bytes_written) - prev_bytes_written);
return s;
}
CopyOrCreateWorkItem copy_or_create_work_item(
GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
EnvOptions() /* src_env_options */, options_.sync,
- options_.restore_rate_limiter.get(), 0 /* size_limit */);
+ options_.restore_rate_limiter.get(), 0 /* size_limit */,
+ nullptr /* stats */);
RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(), file, dst,
file_info->checksum_hex);
BackupID backup_id, bool shared, const std::string& src_dir,
const std::string& fname, const EnvOptions& src_env_options,
RateLimiter* rate_limiter, FileType file_type, uint64_t size_bytes,
- uint64_t size_limit, bool shared_checksum,
+ Statistics* stats, uint64_t size_limit, bool shared_checksum,
std::function<void()> progress_callback, const std::string& contents,
const std::string& src_checksum_func_name,
const std::string& src_checksum_str) {
CopyOrCreateWorkItem copy_or_create_work_item(
src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents,
db_env_, backup_env_, src_env_options, options_.sync, rate_limiter,
- size_limit, progress_callback, src_checksum_func_name, checksum_hex,
- db_id, db_session_id);
+ size_limit, stats, progress_callback, src_checksum_func_name,
+ checksum_hex, db_id, db_session_id);
BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(), shared, need_to_copy,
backup_env_, temp_dest_path, final_dest_path, dst_relative);
}
s = backup_meta_file->Append(Slice(buf.str()));
+ IOSTATS_ADD(bytes_written, buf.str().size());
if (s.ok() && sync) {
s = backup_meta_file->Sync();
}
DestroyDB(dbname_, options_);
}
+// Sums the sizes of all non-directory entries under `backup_dir`, descending
+// into subdirectories, into `*total_size`. Returns the first non-OK status hit.
+// Not done through `BackupEngine`: the metadata file size is hard to get there.
+Status GetSizeOfBackupFiles(FileSystem* backup_fs,
+ const std::string& backup_dir, size_t* total_size) {
+ *total_size = 0;  // on error the caller sees a partial sum
+ std::vector<std::string> dir_stack = {backup_dir};  // directories left to visit
+ Status s;
+ while (s.ok() && !dir_stack.empty()) {  // any failure ends the traversal
+ std::string dir = std::move(dir_stack.back());
+ dir_stack.pop_back();
+ std::vector<std::string> children;
+ s = backup_fs->GetChildren(dir, IOOptions(), &children, nullptr /* dbg */);
+ for (size_t i = 0; s.ok() && i < children.size(); ++i) {
+ std::string path = dir + "/" + children[i];
+ bool is_dir;  // only read after IsDirectory() reports OK
+ s = backup_fs->IsDirectory(path, IOOptions(), &is_dir, nullptr /* dbg */);
+ uint64_t file_size = 0;  // stays 0 for directories
+ if (s.ok()) {
+ if (is_dir) {
+ dir_stack.emplace_back(std::move(path));  // visited by a later iteration
+ } else {
+ s = backup_fs->GetFileSize(path, IOOptions(), &file_size,
+ nullptr /* dbg */);
+ }
+ }
+ if (s.ok()) {
+ *total_size += file_size;
+ }
+ }
+ }
+ return s;
+}
+
+TEST_F(BackupEngineTest, IOStats) {
+ // Verifies that the `BACKUP_READ_BYTES` and `BACKUP_WRITE_BYTES` tickers
+ // agree with the total size of the files found in the backup directory.
+
+ // These ticker stats are expected to be populated regardless of `PerfLevel`
+ // in the user thread, so the test runs with it set to `kDisable`.
+ SetPerfLevel(kDisable);
+
+ options_.statistics = CreateDBStatistics();
+ OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
+ kShareWithChecksum);
+
+ FillDB(db_.get(), 0 /* from */, 100 /* to */, kFlushMost);
+
+ ASSERT_EQ(0, options_.statistics->getTickerCount(BACKUP_READ_BYTES));  // no backup yet
+ ASSERT_EQ(0, options_.statistics->getTickerCount(BACKUP_WRITE_BYTES));
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+ false /* flush_before_backup */));
+
+ size_t orig_backup_files_size;  // bytes under backupdir_ after the first backup
+ ASSERT_OK(GetSizeOfBackupFiles(test_backup_env_->GetFileSystem().get(),
+ backupdir_, &orig_backup_files_size));
+ size_t expected_bytes_written = orig_backup_files_size;  // assumes backupdir_ started empty
+ ASSERT_EQ(expected_bytes_written,
+ options_.statistics->getTickerCount(BACKUP_WRITE_BYTES));
+ // Bytes read is harder to pin down: besides copying, there are reads such as
+ // `GetSortedWalFiles()` finding the first sequence number, or the
+ // `CreateNewBackup()` thread finding SST file session IDs. Intended bounds:
+ // written <= read < 2 * written. NOTE(review): the ASSERT_LT below checks
+ // written < 2 * read, which the ASSERT_GE already implies if nonzero; confirm.
+ ASSERT_GE(options_.statistics->getTickerCount(BACKUP_READ_BYTES),
+ expected_bytes_written);
+ ASSERT_LT(expected_bytes_written,
+ 2 * options_.statistics->getTickerCount(BACKUP_READ_BYTES));
+
+ FillDB(db_.get(), 100 /* from */, 200 /* to */, kFlushMost);
+
+ ASSERT_OK(options_.statistics->Reset());  // so tickers cover only the second backup
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+ false /* flush_before_backup */));
+ size_t final_backup_files_size;
+ ASSERT_OK(GetSizeOfBackupFiles(test_backup_env_->GetFileSystem().get(),
+ backupdir_, &final_backup_files_size));
+ expected_bytes_written = final_backup_files_size - orig_backup_files_size;  // growth from backup #2
+ ASSERT_EQ(expected_bytes_written,
+ options_.statistics->getTickerCount(BACKUP_WRITE_BYTES));
+ // Same loose read bounds (and the same caveat) as for the first backup above.
+ ASSERT_GE(options_.statistics->getTickerCount(BACKUP_READ_BYTES),
+ expected_bytes_written);
+ ASSERT_LT(expected_bytes_written,
+ 2 * options_.statistics->getTickerCount(BACKUP_READ_BYTES));
+}
+
} // anon namespace
} // namespace ROCKSDB_NAMESPACE