#include "db/wal_manager.h"
#include "file/file_util.h"
#include "file/filename.h"
-#include "options/options_parser.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/metadata.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/utilities/checkpoint.h"
-#include "rocksdb/utilities/options_util.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/file_checksum_helper.h"
Status Checkpoint::CreateCheckpoint(const std::string& /*checkpoint_dir*/,
uint64_t /*log_size_for_flush*/,
- uint64_t* /*sequence_number_ptr*/,
- const std::string& /*db_log_dir*/,
- const std::string& /*wal_dir*/) {
+ uint64_t* /*sequence_number_ptr*/) {
return Status::NotSupported("");
}
-void CheckpointImpl::CleanStagingDirectory(
- const std::string& full_private_path, Logger* info_log) {
- std::vector<std::string> subchildren;
+void CheckpointImpl::CleanStagingDirectory(const std::string& full_private_path,
+ Logger* info_log) {
+ std::vector<std::string> subchildren;
Status s = db_->GetEnv()->FileExists(full_private_path);
if (s.IsNotFound()) {
return;
}
- ROCKS_LOG_INFO(info_log, "File exists %s -- %s",
- full_private_path.c_str(), s.ToString().c_str());
+ ROCKS_LOG_INFO(info_log, "File exists %s -- %s", full_private_path.c_str(),
+ s.ToString().c_str());
s = db_->GetEnv()->GetChildren(full_private_path, &subchildren);
if (s.ok()) {
for (auto& subchild : subchildren) {
}
// finally delete the private dir
s = db_->GetEnv()->DeleteDir(full_private_path);
- ROCKS_LOG_INFO(info_log, "Delete dir %s -- %s",
- full_private_path.c_str(), s.ToString().c_str());
+ ROCKS_LOG_INFO(info_log, "Delete dir %s -- %s", full_private_path.c_str(),
+ s.ToString().c_str());
}
Status Checkpoint::ExportColumnFamily(
// Builds an openable snapshot of RocksDB
Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
uint64_t log_size_for_flush,
- uint64_t* sequence_number_ptr,
- const std::string& db_log_dir,
- const std::string& wal_dir) {
+ uint64_t* sequence_number_ptr) {
DBOptions db_options = db_->GetDBOptions();
Status s = db_->GetEnv()->FileExists(checkpoint_dir);
return Status::InvalidArgument("invalid checkpoint directory name");
}
- std::string parsed_checkpoint_dir =
- checkpoint_dir.substr(0, final_nonslash_idx + 1);
- std::string full_private_path = parsed_checkpoint_dir + ".tmp";
+ std::string full_private_path =
+ checkpoint_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
ROCKS_LOG_INFO(db_options.info_log,
"Snapshot process -- using temporary directory %s",
full_private_path.c_str());
CleanStagingDirectory(full_private_path, db_options.info_log.get());
// create snapshot directory
s = db_->GetEnv()->CreateDir(full_private_path);
-
- // Remove the last `/`s if needed
- std::string parsed_log_dir =
- db_log_dir.empty()
- ? ""
- : db_log_dir.substr(0, db_log_dir.find_last_not_of('/') + 1);
- std::string parsed_wal_dir =
- wal_dir.empty() ? ""
- : wal_dir.substr(0, wal_dir.find_last_not_of('/') + 1);
-
- // Info log files are not copied or linked, just update the option value.
- std::string value_log_dir = parsed_log_dir == db_->GetName() ||
- parsed_log_dir == parsed_checkpoint_dir
- ? ""
- : parsed_log_dir;
-
- // If the wal_dir is empty, or the same as the source db dir, update the
- // option value to the checkpoint dir.
- std::string value_wal_dir; // Option value to override
- std::string new_wal_dir; // The target location to copy/link WAL files
- if (parsed_wal_dir.empty() || parsed_wal_dir == db_->GetName() ||
- parsed_wal_dir == parsed_checkpoint_dir) {
- value_wal_dir = parsed_checkpoint_dir;
- new_wal_dir = full_private_path; // Copy to the temp dir
- } else {
- value_wal_dir = parsed_wal_dir;
- std::string prefix = parsed_checkpoint_dir + "/";
- // If checkpoint_dir is parent of wal_dir, create the wal dir inside the tmp
- // dir; otherwise, create it directly.
- new_wal_dir =
- parsed_wal_dir.rfind(prefix, 0) == 0
- ? full_private_path + "/" + parsed_wal_dir.substr(prefix.size())
- : parsed_wal_dir;
- s = db_->GetEnv()->FileExists(new_wal_dir);
- if (s.IsNotFound()) {
- s = db_->GetEnv()->CreateDir(new_wal_dir);
- }
- }
-
uint64_t sequence_number = 0;
if (s.ok()) {
// enable file deletions
s = CreateCustomCheckpoint(
db_options,
[&](const std::string& src_dirname, const std::string& fname,
- FileType type) {
+ FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s",
fname.c_str());
- // WAL file links may be created in another location.
- return db_->GetFileSystem()->LinkFile(
- src_dirname + fname,
- (type == kWalFile ? new_wal_dir : full_private_path) + fname,
- IOOptions(), nullptr);
+ return db_->GetFileSystem()->LinkFile(src_dirname + fname,
+ full_private_path + fname,
+ IOOptions(), nullptr);
} /* link_file_cb */,
[&](const std::string& src_dirname, const std::string& fname,
- uint64_t size_limit_bytes, FileType type,
+ uint64_t size_limit_bytes, FileType,
const std::string& /* checksum_func_name */,
const std::string& /* checksum_val */) {
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
- if (type == kOptionsFile) {
- // Modify and rewrite option files
- return CopyOptionsFile(src_dirname + fname,
- full_private_path + fname, value_log_dir,
- value_wal_dir);
- } else {
- // Copy other files. WAL files may be copied to another location.
- return Status(CopyFile(
- db_->GetFileSystem(), src_dirname + fname,
- (type == kWalFile ? new_wal_dir : full_private_path) + fname,
- size_limit_bytes, db_options.use_fsync));
- }
+ return CopyFile(db_->GetFileSystem(), src_dirname + fname,
+ full_private_path + fname, size_limit_bytes,
+ db_options.use_fsync);
} /* copy_file_cb */,
[&](const std::string& fname, const std::string& contents, FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
if (s.ok()) {
// move tmp private backup to real snapshot directory
- s = db_->GetEnv()->RenameFile(full_private_path, parsed_checkpoint_dir);
+ s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
}
if (s.ok()) {
std::unique_ptr<Directory> checkpoint_directory;
- s = db_->GetEnv()->NewDirectory(parsed_checkpoint_dir,
- &checkpoint_directory);
+ s = db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
if (s.ok() && checkpoint_directory != nullptr) {
s = checkpoint_directory->Fsync();
}
auto wal_dir = ioptions.GetWalDir();
for (size_t i = 0; s.ok() && i < wal_size; ++i) {
if ((live_wal_files[i]->Type() == kAliveLogFile) &&
- (!flush_memtable ||
- live_wal_files[i]->LogNumber() >= min_log_num)) {
+ (!flush_memtable || live_wal_files[i]->LogNumber() >= min_log_num)) {
if (i + 1 == wal_size) {
s = copy_file_cb(wal_dir, live_wal_files[i]->PathName(),
live_wal_files[i]->SizeFileBytes(), kWalFile,
return s;
}
-
-Status CheckpointImpl::CopyOptionsFile(const std::string& src_file,
- const std::string& target_file,
- const std::string& db_log_dir,
- const std::string& wal_dir) {
- Status s;
- DBOptions src_db_options;
- std::vector<ColumnFamilyDescriptor> src_cf_descs;
- s = LoadOptionsFromFile(ConfigOptions(), src_file, &src_db_options,
- &src_cf_descs);
- if (!s.ok()) {
- return s;
- }
-
- // Override these 2 options
- src_db_options.db_log_dir = db_log_dir;
- src_db_options.wal_dir = wal_dir;
-
- std::vector<std::string> src_cf_names;
- std::vector<ColumnFamilyOptions> src_cf_opts;
- src_cf_names.reserve(src_cf_descs.size());
- src_cf_opts.reserve(src_cf_descs.size());
- for (ColumnFamilyDescriptor desc : src_cf_descs) {
- src_cf_names.push_back(desc.name);
- src_cf_opts.push_back(desc.options);
- }
-
- return PersistRocksDBOptions(src_db_options, src_cf_names, src_cf_opts,
- target_file, db_->GetFileSystem());
-}
-
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
#ifndef OS_WIN
#include <unistd.h>
#endif
-#include <iomanip>
#include <iostream>
-#include <sstream>
#include <thread>
#include <utility>
-#include <vector>
#include "db/db_impl/db_impl.h"
#include "file/file_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
-#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
-#include "rocksdb/utilities/options_util.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
}
return result;
}
-
- static std::string IntToFixedWidthString(size_t i, int len) {
- std::stringstream ss;
- ss << std::setw(len) << std::setfill('0') << i;
- return ss.str();
- }
};
TEST_F(CheckpointTest, GetSnapshotLink) {
delete snapshot_db;
}
-TEST_F(CheckpointTest, CheckpointWithOptionsDirsTest) {
- // If the checkpoint and the source db share the same wal_dir, files may be
- // corrupted if both write to or delete from the same wal_dir. db_log_dir
- // should also be updated during checkpointing, but it is less important since
- // log files are not copied or linked to the checkpoint.
-
- // 8 bytes key, 1 kB record, 4 kB MemTable. Each batch should trigger 25
- // flushes
- const int key_len = 8;
- const int value_len = 1016;
- const size_t num_keys = 100;
- const size_t buffer_size = 4096;
-
- std::string value(value_len, ' ');
-
- std::vector<std::string> dirs1 = {"", "", "", ""};
- std::vector<std::string> dirs2 = {"/logs", "/wal", "", ""};
- std::vector<std::string> dirs3 = {"/logs", "/wal", "/logs", "/wal"};
-
- for (auto dirs : {dirs1, dirs2, dirs3}) {
- std::string src_log_dir = dirs[0].empty() ? "" : dbname_ + dirs[0];
- std::string src_wal_dir = dirs[1].empty() ? "" : dbname_ + dirs[1];
- std::string snap_log_dir = dirs[2].empty() ? "" : snapshot_name_ + dirs[2];
- std::string snap_wal_dir = dirs[3].empty() ? "" : snapshot_name_ + dirs[3];
-
- Options src_opts = CurrentOptions();
- WriteOptions w_opts;
- ReadOptions r_opts;
- DB* snapshotDB;
- Checkpoint* checkpoint;
-
- src_opts = CurrentOptions();
- delete db_;
- db_ = nullptr;
- ASSERT_OK(DestroyDB(dbname_, src_opts));
-
- // Create a database
- src_opts.create_if_missing = true;
- src_opts.write_buffer_size = buffer_size;
- src_opts.OptimizeUniversalStyleCompaction(buffer_size);
- src_opts.db_log_dir = src_log_dir;
- src_opts.wal_dir = src_wal_dir;
-
- ASSERT_OK(DB::Open(src_opts, dbname_, &db_));
-
- // Write to src db
- for (size_t i = 1; i <= num_keys; i++) {
- ASSERT_OK(db_->Put(w_opts, IntToFixedWidthString(i, key_len), value));
- }
-
- // Take a snapshot
- ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
- ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, 0, nullptr,
- snap_log_dir, snap_wal_dir));
-
- // Write to src db again
- for (size_t i = num_keys + 1; i <= num_keys * 2; i++) {
- ASSERT_OK(db_->Put(w_opts, IntToFixedWidthString(i, key_len), value));
- }
-
- std::string result;
- std::string key1 = IntToFixedWidthString(num_keys, key_len);
- std::string key2 = IntToFixedWidthString(num_keys * 2, key_len);
- std::string key3 = IntToFixedWidthString(num_keys * 3, key_len);
-
- ASSERT_OK(db_->Get(r_opts, key1, &result));
- ASSERT_OK(db_->Get(r_opts, key2, &result));
-
- // Open snapshot with its own options
- DBOptions snap_opts;
- std::vector<ColumnFamilyDescriptor> snap_cfs;
- ASSERT_OK(LoadLatestOptions(ConfigOptions(), snapshot_name_, &snap_opts,
- &snap_cfs));
-
- ASSERT_EQ(snap_opts.db_log_dir, snap_log_dir);
- ASSERT_EQ(snap_opts.wal_dir,
- snap_wal_dir.empty() ? snapshot_name_ : snap_wal_dir);
-
- std::vector<ColumnFamilyHandle*> handles;
- ASSERT_OK(
- DB::Open(snap_opts, snapshot_name_, snap_cfs, &handles, &snapshotDB));
- for (ColumnFamilyHandle* handle : handles) {
- delete handle;
- }
- handles.clear();
-
- ASSERT_OK(snapshotDB->Get(r_opts, key1, &result));
- ASSERT_TRUE(snapshotDB->Get(r_opts, key2, &result).IsNotFound());
-
- // Write to snapshot
- for (size_t i = num_keys * 2 + 1; i <= num_keys * 3; i++) {
- ASSERT_OK(
- snapshotDB->Put(w_opts, IntToFixedWidthString(i, key_len), value));
- }
-
- ASSERT_OK(snapshotDB->Get(r_opts, key3, &result));
- ASSERT_TRUE(db_->Get(r_opts, key3, &result).IsNotFound());
-
- // Close and reopen the snapshot
- delete snapshotDB;
- ASSERT_OK(
- DB::Open(snap_opts, snapshot_name_, snap_cfs, &handles, &snapshotDB));
- for (ColumnFamilyHandle* handle : handles) {
- delete handle;
- }
- handles.clear();
- ASSERT_TRUE(snapshotDB->Get(r_opts, key2, &result).IsNotFound());
- ASSERT_OK(snapshotDB->Get(r_opts, key3, &result));
-
- delete snapshotDB;
-
- // Close and reopen the source db
- delete db_;
- src_opts.create_if_missing = false;
- ASSERT_OK(DB::Open(src_opts, dbname_, &db_));
- ASSERT_OK(db_->Get(r_opts, key2, &result));
- ASSERT_TRUE(db_->Get(r_opts, key3, &result).IsNotFound());
- delete db_;
-
- // Delete the snapshot
- Options del_opts;
- del_opts.db_log_dir = snap_opts.db_log_dir;
- del_opts.wal_dir = snap_opts.wal_dir;
- ASSERT_OK(DestroyDB(snapshot_name_, del_opts));
-
- // Reopen the source db again
- ASSERT_OK(DB::Open(src_opts, dbname_, &db_));
-
- delete db_;
- db_ = nullptr;
- ASSERT_OK(DestroyDB(dbname_, src_opts));
-
- dbname_ = test::PerThreadDBPath(env_, "db_test");
-
- delete checkpoint;
- }
-}
-
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {