#include "db/wal_manager.h"
#include "file/file_util.h"
#include "file/filename.h"
+#include "options/options_parser.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/metadata.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/utilities/checkpoint.h"
+#include "rocksdb/utilities/options_util.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/file_checksum_helper.h"
Status Checkpoint::CreateCheckpoint(const std::string& /*checkpoint_dir*/,
uint64_t /*log_size_for_flush*/,
- uint64_t* /*sequence_number_ptr*/) {
+ uint64_t* /*sequence_number_ptr*/,
+ const std::string& /*db_log_dir*/,
+ const std::string& /*wal_dir*/) {
return Status::NotSupported("");
}
// Builds an openable snapshot of RocksDB
Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
uint64_t log_size_for_flush,
- uint64_t* sequence_number_ptr) {
+ uint64_t* sequence_number_ptr,
+ const std::string& db_log_dir,
+ const std::string& wal_dir) {
DBOptions db_options = db_->GetDBOptions();
Status s = db_->GetEnv()->FileExists(checkpoint_dir);
return Status::InvalidArgument("invalid checkpoint directory name");
}
- std::string full_private_path =
- checkpoint_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
- ROCKS_LOG_INFO(
- db_options.info_log,
- "Snapshot process -- using temporary directory %s",
- full_private_path.c_str());
+ std::string parsed_checkpoint_dir =
+ checkpoint_dir.substr(0, final_nonslash_idx + 1);
+ std::string full_private_path = parsed_checkpoint_dir + ".tmp";
+ ROCKS_LOG_INFO(db_options.info_log,
+ "Snapshot process -- using temporary directory %s",
+ full_private_path.c_str());
CleanStagingDirectory(full_private_path, db_options.info_log.get());
// create snapshot directory
s = db_->GetEnv()->CreateDir(full_private_path);
+
+ // Remove the last `/`s if needed
+ std::string parsed_log_dir =
+ db_log_dir.empty()
+ ? ""
+ : db_log_dir.substr(0, db_log_dir.find_last_not_of('/') + 1);
+ std::string parsed_wal_dir =
+ wal_dir.empty() ? ""
+ : wal_dir.substr(0, wal_dir.find_last_not_of('/') + 1);
+
+ // Info log files are not copied or linked, just update the option value.
+ std::string value_log_dir = parsed_log_dir == db_->GetName() ||
+ parsed_log_dir == parsed_checkpoint_dir
+ ? ""
+ : parsed_log_dir;
+
+ // If the wal_dir is empty, or the same as the source db dir, update the
+ // option value to the checkpoint dir.
+ std::string value_wal_dir; // Option value to override
+ std::string new_wal_dir; // The target location to copy/link WAL files
+ if (parsed_wal_dir.empty() || parsed_wal_dir == db_->GetName() ||
+ parsed_wal_dir == parsed_checkpoint_dir) {
+ value_wal_dir = parsed_checkpoint_dir;
+ new_wal_dir = full_private_path; // Copy to the temp dir
+ } else {
+ value_wal_dir = parsed_wal_dir;
+ std::string prefix = parsed_checkpoint_dir + "/";
+ // If checkpoint_dir is parent of wal_dir, create the wal dir inside the tmp
+ // dir; otherwise, create it directly.
+ new_wal_dir =
+ parsed_wal_dir.rfind(prefix, 0) == 0
+ ? full_private_path + "/" + parsed_wal_dir.substr(prefix.size())
+ : parsed_wal_dir;
+ s = db_->GetEnv()->FileExists(new_wal_dir);
+ if (s.IsNotFound()) {
+ s = db_->GetEnv()->CreateDir(new_wal_dir);
+ }
+ }
+
uint64_t sequence_number = 0;
if (s.ok()) {
// enable file deletions
s = CreateCustomCheckpoint(
db_options,
[&](const std::string& src_dirname, const std::string& fname,
- FileType) {
+ FileType type) {
ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s",
fname.c_str());
- return db_->GetFileSystem()->LinkFile(src_dirname + fname,
- full_private_path + fname,
- IOOptions(), nullptr);
+ // WAL file links may be created in another location.
+ return db_->GetFileSystem()->LinkFile(
+ src_dirname + fname,
+ (type == kWalFile ? new_wal_dir : full_private_path) + fname,
+ IOOptions(), nullptr);
} /* link_file_cb */,
[&](const std::string& src_dirname, const std::string& fname,
- uint64_t size_limit_bytes, FileType,
+ uint64_t size_limit_bytes, FileType type,
const std::string& /* checksum_func_name */,
const std::string& /* checksum_val */) {
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
- return CopyFile(db_->GetFileSystem(), src_dirname + fname,
- full_private_path + fname, size_limit_bytes,
- db_options.use_fsync);
+ if (type == kOptionsFile) {
+ // Modify and rewrite option files
+ return CopyOptionsFile(src_dirname + fname,
+ full_private_path + fname, value_log_dir,
+ value_wal_dir);
+ } else {
+ // Copy other files. WAL files may be copied to another location.
+ return Status(CopyFile(
+ db_->GetFileSystem(), src_dirname + fname,
+ (type == kWalFile ? new_wal_dir : full_private_path) + fname,
+ size_limit_bytes, db_options.use_fsync));
+ }
} /* copy_file_cb */,
[&](const std::string& fname, const std::string& contents, FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
if (s.ok()) {
// move tmp private backup to real snapshot directory
- s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
+ s = db_->GetEnv()->RenameFile(full_private_path, parsed_checkpoint_dir);
}
if (s.ok()) {
std::unique_ptr<Directory> checkpoint_directory;
- s = db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
+ s = db_->GetEnv()->NewDirectory(parsed_checkpoint_dir,
+ &checkpoint_directory);
if (s.ok() && checkpoint_directory != nullptr) {
s = checkpoint_directory->Fsync();
}
return s;
}
+
+Status CheckpointImpl::CopyOptionsFile(const std::string& src_file,
+ const std::string& target_file,
+ const std::string& db_log_dir,
+ const std::string& wal_dir) {
+ Status s;
+ DBOptions src_db_options;
+ std::vector<ColumnFamilyDescriptor> src_cf_descs;
+ s = LoadOptionsFromFile(ConfigOptions(), src_file, &src_db_options,
+ &src_cf_descs);
+ if (!s.ok()) {
+ return s;
+ }
+
+ // Override these 2 options
+ src_db_options.db_log_dir = db_log_dir;
+ src_db_options.wal_dir = wal_dir;
+
+ std::vector<std::string> src_cf_names;
+ std::vector<ColumnFamilyOptions> src_cf_opts;
+ src_cf_names.reserve(src_cf_descs.size());
+ src_cf_opts.reserve(src_cf_descs.size());
+ for (ColumnFamilyDescriptor desc : src_cf_descs) {
+ src_cf_names.push_back(desc.name);
+ src_cf_opts.push_back(desc.options);
+ }
+
+ return PersistRocksDBOptions(src_db_options, src_cf_names, src_cf_opts,
+ target_file, db_->GetFileSystem());
+}
+
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
#ifndef OS_WIN
#include <unistd.h>
#endif
+#include <iomanip>
#include <iostream>
+#include <sstream>
#include <thread>
#include <utility>
+#include <vector>
#include "db/db_impl/db_impl.h"
#include "file/file_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
+#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
+#include "rocksdb/utilities/options_util.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
}
return result;
}
+
+ static std::string IntToFixedWidthString(size_t i, int len) {
+ std::stringstream ss;
+ ss << std::setw(len) << std::setfill('0') << i;
+ return ss.str();
+ }
};
TEST_F(CheckpointTest, GetSnapshotLink) {
delete snapshot_db;
}
+TEST_F(CheckpointTest, CheckpointWithOptionsDirsTest) {
+ // If the checkpoint and the source db share the same wal_dir, files may be
+ // corrupted if both write to or delete from the same wal_dir. db_log_dir
+ // should also be updated during checkpointing, but it is less important since
+ // log files are not copied or linked to the checkpoint.
+
+ // 8 bytes key, 1 kB record, 4 kB MemTable. Each batch should trigger 25
+ // flushes
+ const int key_len = 8;
+ const int value_len = 1016;
+ const size_t num_keys = 100;
+ const size_t buffer_size = 4096;
+
+ std::string value(value_len, ' ');
+
+ std::vector<std::string> dirs1 = {"", "", "", ""};
+ std::vector<std::string> dirs2 = {"/logs", "/wal", "", ""};
+ std::vector<std::string> dirs3 = {"/logs", "/wal", "/logs", "/wal"};
+
+ for (auto dirs : {dirs1, dirs2, dirs3}) {
+ std::string src_log_dir = dirs[0].empty() ? "" : dbname_ + dirs[0];
+ std::string src_wal_dir = dirs[1].empty() ? "" : dbname_ + dirs[1];
+ std::string snap_log_dir = dirs[2].empty() ? "" : snapshot_name_ + dirs[2];
+ std::string snap_wal_dir = dirs[3].empty() ? "" : snapshot_name_ + dirs[3];
+
+ Options src_opts = CurrentOptions();
+ WriteOptions w_opts;
+ ReadOptions r_opts;
+ DB* snapshotDB;
+ Checkpoint* checkpoint;
+
+ src_opts = CurrentOptions();
+ delete db_;
+ db_ = nullptr;
+ ASSERT_OK(DestroyDB(dbname_, src_opts));
+
+ // Create a database
+ src_opts.create_if_missing = true;
+ src_opts.write_buffer_size = buffer_size;
+ src_opts.OptimizeUniversalStyleCompaction(buffer_size);
+ src_opts.db_log_dir = src_log_dir;
+ src_opts.wal_dir = src_wal_dir;
+
+ ASSERT_OK(DB::Open(src_opts, dbname_, &db_));
+
+ // Write to src db
+ for (size_t i = 1; i <= num_keys; i++) {
+ ASSERT_OK(db_->Put(w_opts, IntToFixedWidthString(i, key_len), value));
+ }
+
+ // Take a snapshot
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, 0, nullptr,
+ snap_log_dir, snap_wal_dir));
+
+ // Write to src db again
+ for (size_t i = num_keys + 1; i <= num_keys * 2; i++) {
+ ASSERT_OK(db_->Put(w_opts, IntToFixedWidthString(i, key_len), value));
+ }
+
+ std::string result;
+ std::string key1 = IntToFixedWidthString(num_keys, key_len);
+ std::string key2 = IntToFixedWidthString(num_keys * 2, key_len);
+ std::string key3 = IntToFixedWidthString(num_keys * 3, key_len);
+
+ ASSERT_OK(db_->Get(r_opts, key1, &result));
+ ASSERT_OK(db_->Get(r_opts, key2, &result));
+
+ // Open snapshot with its own options
+ DBOptions snap_opts;
+ std::vector<ColumnFamilyDescriptor> snap_cfs;
+ ASSERT_OK(LoadLatestOptions(ConfigOptions(), snapshot_name_, &snap_opts,
+ &snap_cfs));
+
+ ASSERT_EQ(snap_opts.db_log_dir, snap_log_dir);
+ ASSERT_EQ(snap_opts.wal_dir,
+ snap_wal_dir.empty() ? snapshot_name_ : snap_wal_dir);
+
+ std::vector<ColumnFamilyHandle*> handles;
+ ASSERT_OK(
+ DB::Open(snap_opts, snapshot_name_, snap_cfs, &handles, &snapshotDB));
+ for (ColumnFamilyHandle* handle : handles) {
+ delete handle;
+ }
+ handles.clear();
+
+ ASSERT_OK(snapshotDB->Get(r_opts, key1, &result));
+ ASSERT_TRUE(snapshotDB->Get(r_opts, key2, &result).IsNotFound());
+
+ // Write to snapshot
+ for (size_t i = num_keys * 2 + 1; i <= num_keys * 3; i++) {
+ ASSERT_OK(
+ snapshotDB->Put(w_opts, IntToFixedWidthString(i, key_len), value));
+ }
+
+ ASSERT_OK(snapshotDB->Get(r_opts, key3, &result));
+ ASSERT_TRUE(db_->Get(r_opts, key3, &result).IsNotFound());
+
+ // Close and reopen the snapshot
+ delete snapshotDB;
+ ASSERT_OK(
+ DB::Open(snap_opts, snapshot_name_, snap_cfs, &handles, &snapshotDB));
+ for (ColumnFamilyHandle* handle : handles) {
+ delete handle;
+ }
+ handles.clear();
+ ASSERT_TRUE(snapshotDB->Get(r_opts, key2, &result).IsNotFound());
+ ASSERT_OK(snapshotDB->Get(r_opts, key3, &result));
+
+ delete snapshotDB;
+
+ // Close and reopen the source db
+ delete db_;
+ src_opts.create_if_missing = false;
+ ASSERT_OK(DB::Open(src_opts, dbname_, &db_));
+ ASSERT_OK(db_->Get(r_opts, key2, &result));
+ ASSERT_TRUE(db_->Get(r_opts, key3, &result).IsNotFound());
+ delete db_;
+
+ // Delete the snapshot
+ Options del_opts;
+ del_opts.db_log_dir = snap_opts.db_log_dir;
+ del_opts.wal_dir = snap_opts.wal_dir;
+ ASSERT_OK(DestroyDB(snapshot_name_, del_opts));
+
+ // Reopen the source db again
+ ASSERT_OK(DB::Open(src_opts, dbname_, &db_));
+
+ delete db_;
+ db_ = nullptr;
+ ASSERT_OK(DestroyDB(dbname_, src_opts));
+
+ dbname_ = test::PerThreadDBPath(env_, "db_test");
+
+ delete checkpoint;
+ }
+}
+
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {