## 6.23.0 (2021-07-16)
### Behavior Changes
* Obsolete keys in the bottommost level that were preserved for a snapshot will now be cleaned upon snapshot release in all cases. This form of compaction (snapshot release triggered compaction) previously had an artificial limitation that multiple tombstones needed to be present.
-
### Bug Fixes
* Blob file checksums are now printed in hexadecimal format when using the `manifest_dump` `ldb` command.
* `GetLiveFilesMetaData()` now populates the `temperature`, `oldest_ancester_time`, and `file_creation_time` fields of its `LiveFileMetaData` results when the information is available. Previously these fields always contained zero indicating unknown.
* Fix continuous logging of an existing background error on every user write
* Fix a bug that `Get()` return Status::OK() and an empty value for non-existent key when `read_options.read_tier = kBlockCacheTier`.
* Fix a bug that stat in `get_context` didn't accumulate to statistics when query is failed.
+* Fixed handling of DBOptions::wal_dir with LoadLatestOptions() or ldb --try_load_options on a copied or moved DB. Previously, when the WAL directory is same as DB directory (default), a copied or moved DB would reference the old path of the DB as the WAL directory, potentially corrupting both copies. Under this change, the wal_dir from DB::GetOptions() or LoadLatestOptions() may now be empty, indicating that the current DB directory is used for WALs. This is also a subtle API change.
### New Features
* ldb has a new feature, `list_live_files_metadata`, that shows the live SST files, as well as their LSM storage level and the column family they belong to.
ROCKS_LOG_WARN(
immutable_db_options_.info_log,
"Unable to Sync WAL file %s with error -- %s",
- LogFileName(immutable_db_options_.wal_dir, log_number).c_str(),
+ LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(),
s.ToString().c_str());
// Retain the first error
if (ret.ok()) {
const Status DBImpl::CreateArchivalDirectory() {
if (immutable_db_options_.WAL_ttl_seconds > 0 ||
immutable_db_options_.WAL_size_limit_MB > 0) {
- std::string archivalPath = ArchivalDirectory(immutable_db_options_.wal_dir);
+ std::string archivalPath =
+ ArchivalDirectory(immutable_db_options_.GetWalDir());
return env_->CreateDirIfMissing(archivalPath);
}
return Status::OK();
ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
Env* env = soptions.env;
std::vector<std::string> filenames;
- bool wal_in_db_path = IsWalDirSameAsDBPath(&soptions);
+ bool wal_in_db_path = soptions.IsWalDirSameAsDBPath();
// Reset the logger because it holds a handle to the
// log file and prevents cleanup and directory removal
std::vector<std::string> walDirFiles;
std::string archivedir = ArchivalDirectory(dbname);
bool wal_dir_exists = false;
- if (dbname != soptions.wal_dir) {
+ if (!soptions.IsWalDirSameAsDBPath(dbname)) {
wal_dir_exists = env->GetChildren(soptions.wal_dir, &walDirFiles).ok();
archivedir = ArchivalDirectory(soptions.wal_dir);
}
}
// Add log files in wal_dir
- if (immutable_db_options_.wal_dir != dbname_) {
+
+ if (!immutable_db_options_.IsWalDirSameAsDBPath(dbname_)) {
std::vector<std::string> log_files;
Status s = env_->GetChildren(immutable_db_options_.wal_dir, &log_files);
s.PermitUncheckedError(); // TODO: What should we do on error?
blob_file.GetPath());
}
+ auto wal_dir = immutable_db_options_.GetWalDir();
for (auto file_num : state.log_delete_files) {
if (file_num > 0) {
- candidate_files.emplace_back(LogFileName(file_num),
- immutable_db_options_.wal_dir);
+ candidate_files.emplace_back(LogFileName(file_num), wal_dir);
}
}
for (const auto& filename : state.manifest_delete_files) {
fname = BlobFileName(candidate_file.file_path, number);
dir_to_sync = candidate_file.file_path;
} else {
- dir_to_sync =
- (type == kWalFile) ? immutable_db_options_.wal_dir : dbname_;
+ dir_to_sync = (type == kWalFile) ? wal_dir : dbname_;
fname = dir_to_sync +
((!dir_to_sync.empty() && dir_to_sync.back() == '/') ||
(!to_delete.empty() && to_delete.front() == '/')
result.recycle_log_file_num = 0;
}
- if (result.wal_dir.empty()) {
+ if (result.db_paths.size() == 0) {
+ result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
+ } else if (result.wal_dir.empty()) {
// Use dbname as default
result.wal_dir = dbname;
}
- if (result.wal_dir.back() == '/') {
- result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
+ if (!result.wal_dir.empty()) {
+ // If there is a wal_dir already set, check to see if the wal_dir is the
+ // same as the dbname AND the same as the db_path[0] (which must exist from
+ // a few lines ago). If the wal_dir matches both of these values, then clear
+ // the wal_dir value, which will make wal_dir == dbname. Most likely this
+ // condition was the result of reading an old options file where we forced
+ // wal_dir to be set (to dbname).
+ auto npath = NormalizePath(dbname + "/");
+ if (npath == NormalizePath(result.wal_dir + "/") &&
+ npath == NormalizePath(result.db_paths[0].path + "/")) {
+ result.wal_dir.clear();
+ }
}
- if (result.db_paths.size() == 0) {
- result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
+ if (!result.wal_dir.empty() && result.wal_dir.back() == '/') {
+ result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
}
if (result.use_direct_reads && result.compaction_readahead_size == 0) {
#ifndef ROCKSDB_LITE
ImmutableDBOptions immutable_db_options(result);
- if (!IsWalDirSameAsDBPath(&immutable_db_options)) {
+ if (!immutable_db_options.IsWalDirSameAsDBPath()) {
// Either the WAL dir and db_paths[0]/db_name are not the same, or we
// cannot tell for sure. In either case, assume they're different and
// explicitly cleanup the trash log files (bypass DeleteScheduler)
// DeleteScheduler::CleanupDirectory on the same dir later, it will be
// safe
std::vector<std::string> filenames;
- Status s = result.env->GetChildren(result.wal_dir, &filenames);
+ auto wal_dir = immutable_db_options.GetWalDir();
+ Status s = result.env->GetChildren(wal_dir, &filenames);
s.PermitUncheckedError(); //**TODO: What to do on error?
for (std::string& filename : filenames) {
if (filename.find(".log.trash", filename.length() -
std::string(".log.trash").length()) !=
std::string::npos) {
- std::string trash_file = result.wal_dir + "/" + filename;
+ std::string trash_file = wal_dir + "/" + filename;
result.env->DeleteFile(trash_file).PermitUncheckedError();
}
}
// Note that prev_log_number() is no longer used, but we pay
// attention to it in case we are recovering a database
// produced by an older version of rocksdb.
+ auto wal_dir = immutable_db_options_.GetWalDir();
if (!immutable_db_options_.best_efforts_recovery) {
- s = env_->GetChildren(immutable_db_options_.wal_dir, &files_in_wal_dir);
+ s = env_->GetChildren(wal_dir, &files_in_wal_dir);
}
if (s.IsNotFound()) {
- return Status::InvalidArgument("wal_dir not found",
- immutable_db_options_.wal_dir);
+ return Status::InvalidArgument("wal_dir not found", wal_dir);
} else if (!s.ok()) {
return s;
}
"existing log file: ",
file);
} else {
- wal_files[number] =
- LogFileName(immutable_db_options_.wal_dir, number);
+ wal_files[number] = LogFileName(wal_dir, number);
}
}
}
if (s.ok()) {
const std::string normalized_dbname = NormalizePath(dbname_);
const std::string normalized_wal_dir =
- NormalizePath(immutable_db_options_.wal_dir);
+ NormalizePath(immutable_db_options_.GetWalDir());
if (immutable_db_options_.best_efforts_recovery) {
filenames = std::move(files_in_dbname);
} else if (normalized_dbname == normalized_wal_dir) {
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(wal_number);
// Open the log file
- std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number);
+ std::string fname =
+ LogFileName(immutable_db_options_.GetWalDir(), wal_number);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d", wal_number,
Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
LogFileNumberSize* log_ptr) {
LogFileNumberSize log(wal_number);
- std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number);
+ std::string fname =
+ LogFileName(immutable_db_options_.GetWalDir(), wal_number);
Status s;
// This gets the appear size of the wals, not including preallocated space.
s = env_->GetFileSize(fname, &log.size);
BuildDBOptions(immutable_db_options_, mutable_db_options_);
FileOptions opt_file_options =
fs_->OptimizeForLogWrite(file_options_, db_options);
- std::string log_fname =
- LogFileName(immutable_db_options_.wal_dir, log_file_num);
+ std::string wal_dir = immutable_db_options_.GetWalDir();
+ std::string log_fname = LogFileName(wal_dir, log_file_num);
if (recycle_log_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"reusing log %" PRIu64 " from recycle list\n",
recycle_log_number);
- std::string old_log_fname =
- LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
+ std::string old_log_fname = LogFileName(wal_dir, recycle_log_number);
TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
}
DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
- s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
+ s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.GetWalDir());
if (s.ok()) {
std::vector<std::string> paths;
for (auto& db_path : impl->immutable_db_options_.db_paths) {
return s;
}
- impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_);
+ impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
impl->mutex_.Lock();
// Handles create_if_missing, error_if_exists
assert(logs != nullptr);
std::vector<std::string> filenames;
Status s;
- s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
+ s = env_->GetChildren(immutable_db_options_.GetWalDir(), &filenames);
if (s.IsNotFound()) {
return Status::InvalidArgument("Failed to open wal_dir",
- immutable_db_options_.wal_dir);
+ immutable_db_options_.GetWalDir());
} else if (!s.ok()) {
return s;
}
// initialize log reader from log_number
// TODO: min_log_number_to_keep_2pc check needed?
// Open the log file
- std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
+ std::string fname =
+ LogFileName(immutable_db_options_.GetWalDir(), log_number);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d", log_number,
static_cast<int>(immutable_db_options_.wal_recovery_mode));
&impl->write_controller_, impl->io_tracer_));
impl->column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
- impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_);
+ impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
impl->mutex_.Lock();
s = impl->Recover(column_families, true, false, false);
}
// Get wal file in wal_dir
- if (dbname.compare(options.wal_dir) != 0) {
- if (!env->GetChildren(options.wal_dir, &files).ok()) {
- Error(options.info_log,
- "Error when reading %s dir\n",
- options.wal_dir.c_str());
+ const auto& wal_dir = options.GetWalDir(dbname);
+ if (!options.IsWalDirSameAsDBPath(dbname)) {
+ if (!env->GetChildren(wal_dir, &files).ok()) {
+ Error(options.info_log, "Error when reading %s dir\n", wal_dir.c_str());
return;
}
wal_info.clear();
for (const std::string& file : files) {
if (ParseFileName(file, &number, &type)) {
if (type == kWalFile) {
- if (env->GetFileSize(options.wal_dir + "/" + file, &file_size).ok()) {
+ if (env->GetFileSize(wal_dir + "/" + file, &file_size).ok()) {
wal_info.append(file)
.append(" size: ")
.append(std::to_string(file_size))
.append(" ; ");
} else {
Error(options.info_log, "Error when reading LOG file %s/%s\n",
- options.wal_dir.c_str(), file.c_str());
+ wal_dir.c_str(), file.c_str());
}
}
}
}
}
- Header(options.info_log, "Write Ahead Log file in %s: %s\n",
- options.wal_dir.c_str(), wal_info.c_str());
+ Header(options.info_log, "Write Ahead Log file in %s: %s\n", wal_dir.c_str(),
+ wal_info.c_str());
}
} // namespace ROCKSDB_NAMESPACE
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
ReopenWithColumnFamilies({"default", "test1", "test2"}, options);
}
+
+TEST_F(DBTest2, RenameDirectory) {
+ Options options = CurrentOptions();
+ DestroyAndReopen(options);
+ ASSERT_OK(Put("foo", "value0"));
+ Close();
+ auto old_dbname = dbname_;
+ auto new_dbname = dbname_ + "_2";
+ EXPECT_OK(env_->RenameFile(dbname_, new_dbname));
+ options.create_if_missing = false;
+ dbname_ = new_dbname;
+ ASSERT_OK(TryReopen(options));
+ ASSERT_EQ("value0", Get("foo"));
+ Destroy(options);
+ dbname_ = old_dbname;
+}
} // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
}
// search wal_dir if user uses a customize wal_dir
- bool same = false;
- Status status = env_->AreFilesSame(db_options_.wal_dir, dbname_, &same);
- if (status.IsNotSupported()) {
- same = db_options_.wal_dir == dbname_;
- status = Status::OK();
- } else if (!status.ok()) {
- return status;
- }
-
+ bool same = immutable_db_options_.IsWalDirSameAsDBPath(dbname_);
if (!same) {
- to_search_paths.push_back(db_options_.wal_dir);
+ to_search_paths.push_back(immutable_db_options_.wal_dir);
}
for (size_t path_id = 0; path_id < to_search_paths.size(); path_id++) {
ROCKS_LOG_INFO(db_options_.info_log, "Searching path %s\n",
to_search_paths[path_id].c_str());
- status = env_->GetChildren(to_search_paths[path_id], &filenames);
+ Status status = env_->GetChildren(to_search_paths[path_id], &filenames);
if (!status.ok()) {
return status;
}
}
void ConvertLogFilesToTables() {
+ const auto& wal_dir = immutable_db_options_.GetWalDir();
for (size_t i = 0; i < logs_.size(); i++) {
// we should use LogFileName(wal_dir, logs_[i]) here. user might uses wal_dir option.
- std::string logname = LogFileName(db_options_.wal_dir, logs_[i]);
- Status status = ConvertLogToTable(logs_[i]);
+ std::string logname = LogFileName(wal_dir, logs_[i]);
+ Status status = ConvertLogToTable(wal_dir, logs_[i]);
if (!status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Log #%" PRIu64 ": ignoring conversion error: %s",
}
}
- Status ConvertLogToTable(uint64_t log) {
+ Status ConvertLogToTable(const std::string& wal_dir, uint64_t log) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
std::shared_ptr<Logger> info_log;
};
// Open the log file
- std::string logname = LogFileName(db_options_.wal_dir, log);
+ std::string logname = LogFileName(wal_dir, log);
const auto& fs = env_->GetFileSystem();
std::unique_ptr<SequentialFileReader> lfile_reader;
Status status = SequentialFileReader::Create(
#ifndef ROCKSDB_LITE
Status WalManager::DeleteFile(const std::string& fname, uint64_t number) {
- auto s = env_->DeleteFile(db_options_.wal_dir + "/" + fname);
+ auto s = env_->DeleteFile(wal_dir_ + "/" + fname);
if (s.ok()) {
MutexLock l(&read_first_record_cache_mutex_);
read_first_record_cache_.erase(number);
Status s;
// list wal files in main db dir.
VectorLogPtr logs;
- s = GetSortedWalsOfType(db_options_.wal_dir, logs, kAliveLogFile);
+ s = GetSortedWalsOfType(wal_dir_, logs, kAliveLogFile);
if (!s.ok()) {
return s;
}
files.clear();
// list wal files in archive dir.
- std::string archivedir = ArchivalDirectory(db_options_.wal_dir);
+ std::string archivedir = ArchivalDirectory(wal_dir_);
Status exists = env_->FileExists(archivedir);
if (exists.ok()) {
s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile);
return s;
}
iter->reset(new TransactionLogIteratorImpl(
- db_options_.wal_dir, &db_options_, read_options, file_options_, seq,
+ wal_dir_, &db_options_, read_options, file_options_, seq,
std::move(wal_files), version_set, seq_per_batch_, io_tracer_));
return (*iter)->status();
}
purge_wal_files_last_run_ = now_seconds;
- std::string archival_dir = ArchivalDirectory(db_options_.wal_dir);
+ std::string archival_dir = ArchivalDirectory(wal_dir_);
std::vector<std::string> files;
s = env_->GetChildren(archival_dir, &files);
if (!s.ok()) {
for (size_t i = 0; i < files_del_num; ++i) {
std::string const file_path = archived_logs[i]->PathName();
- s = DeleteDBFile(&db_options_, db_options_.wal_dir + "/" + file_path,
- db_options_.wal_dir, false,
+ s = DeleteDBFile(&db_options_, wal_dir_ + "/" + file_path, wal_dir_, false,
/*force_fg=*/!wal_in_db_path_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s",
}
void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
- auto archived_log_name = ArchivedLogFileName(db_options_.wal_dir, number);
+ auto archived_log_name = ArchivedLogFileName(wal_dir_, number);
// The sync point below is used in (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1");
Status s = env_->RenameFile(fname, archived_log_name);
}
Status s;
if (type == kAliveLogFile) {
- std::string fname = LogFileName(db_options_.wal_dir, number);
+ std::string fname = LogFileName(wal_dir_, number);
s = ReadFirstLine(fname, number, sequence);
if (!s.ok() && env_->FileExists(fname).ok()) {
// return any error that is not caused by non-existing file
if (type == kArchivedLogFile || !s.ok()) {
// check if the file got moved to archive.
- std::string archived_file =
- ArchivedLogFileName(db_options_.wal_dir, number);
+ std::string archived_file = ArchivedLogFileName(wal_dir_, number);
s = ReadFirstLine(archived_file, number, sequence);
// maybe the file was deleted from archive dir. If that's the case, return
// Status::OK(). The caller with identify this as empty file because
Status s;
uint64_t size_bytes;
- s = env_->GetFileSize(LogFileName(db_options_.wal_dir, number), &size_bytes);
+ s = env_->GetFileSize(LogFileName(wal_dir_, number), &size_bytes);
if (!s.ok()) {
return s;
fs_(db_options.fs, io_tracer),
purge_wal_files_last_run_(0),
seq_per_batch_(seq_per_batch),
- wal_in_db_path_(IsWalDirSameAsDBPath(&db_options)),
+ wal_dir_(db_options_.GetWalDir()),
+ wal_in_db_path_(db_options_.IsWalDirSameAsDBPath()),
io_tracer_(io_tracer) {}
Status GetSortedWalFiles(VectorLogPtr& files);
bool seq_per_batch_;
+ const std::string& wal_dir_;
+
bool wal_in_db_path_;
// obsolete files will be deleted every this seconds if ttl deletion is
#endif
}
-bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options) {
- bool same = false;
- assert(!db_options->db_paths.empty());
- Status s = db_options->env->AreFilesSame(db_options->wal_dir,
- db_options->db_paths[0].path, &same);
- if (s.IsNotSupported()) {
- same = db_options->wal_dir == db_options->db_paths[0].path;
- }
- return same;
-}
-
// requested_checksum_func_name brings the function name of the checksum
// generator in checksum_factory. Empty string is permitted, in which case the
// name of the generator created by the factory is unchecked. When
const std::string& path_to_sync, const bool force_bg,
const bool force_fg);
-extern bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options);
-
extern IOStatus GenerateOneFileChecksum(
FileSystem* fs, const std::string& file_path,
FileChecksumGenFactory* checksum_factory,
db_host_id.c_str());
}
+bool ImmutableDBOptions::IsWalDirSameAsDBPath() const {
+ assert(!db_paths.empty());
+ return IsWalDirSameAsDBPath(db_paths[0].path);
+}
+
+bool ImmutableDBOptions::IsWalDirSameAsDBPath(
+ const std::string& db_path) const {
+ bool same = wal_dir.empty();
+ if (!same) {
+ Status s = env->AreFilesSame(wal_dir, db_path, &same);
+ if (s.IsNotSupported()) {
+ same = wal_dir == db_path;
+ }
+ }
+ return same;
+}
+
+const std::string& ImmutableDBOptions::GetWalDir() const {
+ if (wal_dir.empty()) {
+ assert(!db_paths.empty());
+ return db_paths[0].path;
+ } else {
+ return wal_dir;
+ }
+}
+
+const std::string& ImmutableDBOptions::GetWalDir(
+ const std::string& path) const {
+ if (wal_dir.empty()) {
+ return path;
+ } else {
+ return wal_dir;
+ }
+}
+
MutableDBOptions::MutableDBOptions()
: max_background_jobs(2),
base_background_compactions(-1),
bool use_fsync;
std::vector<DbPath> db_paths;
std::string db_log_dir;
+ // The wal_dir option from the file. To determine the
+ // directory in use, the GetWalDir or IsWalDirSameAsDBPath
+ // methods should be used instead of accessing this variable directly.
std::string wal_dir;
size_t max_log_file_size;
size_t log_file_time_to_roll;
Statistics* stats;
Logger* logger;
std::shared_ptr<CompactionService> compaction_service;
+
+ bool IsWalDirSameAsDBPath() const;
+ bool IsWalDirSameAsDBPath(const std::string& path) const;
+ const std::string& GetWalDir() const;
+ const std::string& GetWalDir(const std::string& path) const;
};
struct MutableDBOptions {
db_ = nullptr;
return;
}
- if (options_.env->FileExists(options_.wal_dir).IsNotFound()) {
- options_.wal_dir = db_path_;
- fprintf(
- stderr,
- "wal_dir loaded from the option file doesn't exist. Ignore it.\n");
+ if (!options_.wal_dir.empty()) {
+ if (options_.env->FileExists(options_.wal_dir).IsNotFound()) {
+ options_.wal_dir = db_path_;
+ fprintf(
+ stderr,
+ "wal_dir loaded from the option file doesn't exist. Ignore it.\n");
+ }
}
// If merge operator is not set, set a string append operator.
if (!s.ok()) {
std::cerr << "Error when getting WAL files" << std::endl;
} else {
+ std::string wal_dir;
+ if (options_.wal_dir.empty()) {
+ wal_dir = db_->GetName();
+ } else {
+ wal_dir = NormalizePath(options_.wal_dir + "/");
+ }
for (auto& wal : wal_files) {
// TODO(qyang): option.wal_dir should be passed into ldb command
- std::string filename = db_->GetOptions().wal_dir + wal->PathName();
+ std::string filename = wal_dir + wal->PathName();
std::cout << filename << std::endl;
// TODO(myabandeh): allow configuring is_write_commited
DumpWalFile(options_, filename, true, true, true /* is_write_commited */,
ASSERT_EQ(column_families[1].options.num_levels, cf_opts.num_levels);
ASSERT_EQ(column_families[1].options.write_buffer_size, 268435456);
}
+
+TEST_F(LdbCmdTest, RenameDbAndLoadOptions) {
+ Env* env = TryLoadCustomOrDefaultEnv();
+ Options opts;
+ opts.env = env;
+ opts.create_if_missing = false;
+
+ std::string old_dbname = test::PerThreadDBPath(env, "ldb_cmd_test");
+ std::string new_dbname = old_dbname + "_2";
+ DestroyDB(old_dbname, opts);
+ DestroyDB(new_dbname, opts);
+
+ char old_arg[1024];
+ snprintf(old_arg, sizeof(old_arg), "--db=%s", old_dbname.c_str());
+ char new_arg[1024];
+ snprintf(new_arg, sizeof(old_arg), "--db=%s", new_dbname.c_str());
+ const char* argv1[] = {"./ldb",
+ old_arg,
+ "put",
+ "key1",
+ "value1",
+ "--try_load_options",
+ "--create_if_missing"};
+
+ const char* argv2[] = {"./ldb", old_arg, "get", "key1", "--try_load_options"};
+ const char* argv3[] = {"./ldb", new_arg, "put",
+ "key2", "value2", "--try_load_options"};
+
+ const char* argv4[] = {"./ldb", new_arg, "get", "key1", "--try_load_options"};
+ const char* argv5[] = {"./ldb", new_arg, "get", "key2", "--try_load_options"};
+
+ ASSERT_EQ(
+ 0, LDBCommandRunner::RunCommand(7, argv1, opts, LDBOptions(), nullptr));
+ ASSERT_EQ(
+ 0, LDBCommandRunner::RunCommand(5, argv2, opts, LDBOptions(), nullptr));
+ ConfigOptions config_opts;
+ Options options;
+ std::vector<ColumnFamilyDescriptor> column_families;
+ config_opts.env = env;
+ ASSERT_OK(
+ LoadLatestOptions(config_opts, old_dbname, &options, &column_families));
+ ASSERT_EQ(options.wal_dir, "");
+
+ ASSERT_OK(env->RenameFile(old_dbname, new_dbname));
+ ASSERT_NE(
+ 0, LDBCommandRunner::RunCommand(6, argv1, opts, LDBOptions(), nullptr));
+ ASSERT_NE(
+ 0, LDBCommandRunner::RunCommand(5, argv2, opts, LDBOptions(), nullptr));
+ ASSERT_EQ(
+ 0, LDBCommandRunner::RunCommand(6, argv3, opts, LDBOptions(), nullptr));
+ ASSERT_EQ(
+ 0, LDBCommandRunner::RunCommand(5, argv4, opts, LDBOptions(), nullptr));
+ ASSERT_EQ(
+ 0, LDBCommandRunner::RunCommand(5, argv5, opts, LDBOptions(), nullptr));
+ DestroyDB(new_dbname, opts);
+}
} // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
// Link WAL files. Copy exact size of last one because it is the only one
// that has changes after the last flush.
+ ImmutableDBOptions ioptions(db_options);
+ auto wal_dir = ioptions.GetWalDir();
for (size_t i = 0; s.ok() && i < wal_size; ++i) {
if ((live_wal_files[i]->Type() == kAliveLogFile) &&
(!flush_memtable ||
live_wal_files[i]->LogNumber() >= min_log_num)) {
if (i + 1 == wal_size) {
- s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
+ s = copy_file_cb(wal_dir, live_wal_files[i]->PathName(),
live_wal_files[i]->SizeFileBytes(), kWalFile,
kUnknownFileChecksumFuncName, kUnknownFileChecksum);
break;
}
if (same_fs) {
// we only care about live log files
- s = link_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
- kWalFile);
+ s = link_file_cb(wal_dir, live_wal_files[i]->PathName(), kWalFile);
if (s.IsNotSupported()) {
same_fs = false;
s = Status::OK();
}
}
if (!same_fs) {
- s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), 0,
- kWalFile, kUnknownFileChecksumFuncName,
- kUnknownFileChecksum);
+ s = copy_file_cb(wal_dir, live_wal_files[i]->PathName(), 0, kWalFile,
+ kUnknownFileChecksumFuncName, kUnknownFileChecksum);
}
}
}
// Ignore the errors for future releases when ignore_unknown_options=true
ASSERT_OK(LoadLatestOptions(ignore_opts, dbname_, &db_opts, &cf_descs));
}
+
+TEST_F(OptionsUtilTest, RenameDatabaseDirectory) {
+ DB* db;
+ Options options;
+ DBOptions db_opts;
+ std::vector<ColumnFamilyDescriptor> cf_descs;
+ std::vector<ColumnFamilyHandle*> handles;
+
+ options.create_if_missing = true;
+
+ ASSERT_OK(DB::Open(options, dbname_, &db));
+ ASSERT_OK(db->Put(WriteOptions(), "foo", "value0"));
+ delete db;
+
+ auto new_dbname = dbname_ + "_2";
+
+ ASSERT_OK(options.env->RenameFile(dbname_, new_dbname));
+ ASSERT_OK(LoadLatestOptions(new_dbname, options.env, &db_opts, &cf_descs));
+ ASSERT_EQ(cf_descs.size(), 1U);
+
+ db_opts.create_if_missing = false;
+ ASSERT_OK(DB::Open(db_opts, new_dbname, cf_descs, &handles, &db));
+ std::string value;
+ ASSERT_OK(db->Get(ReadOptions(), "foo", &value));
+ ASSERT_EQ("value0", value);
+ // close the db
+ for (auto* handle : handles) {
+ delete handle;
+ }
+ delete db;
+ Options new_options(db_opts, cf_descs[0].options);
+ ASSERT_OK(DestroyDB(new_dbname, new_options, cf_descs));
+ ASSERT_OK(DestroyDB(dbname_, options));
+}
+
+TEST_F(OptionsUtilTest, WalDirSettings) {
+ DB* db;
+ Options options;
+ DBOptions db_opts;
+ std::vector<ColumnFamilyDescriptor> cf_descs;
+ std::vector<ColumnFamilyHandle*> handles;
+
+ options.create_if_missing = true;
+
+ // Open a DB with no wal dir set. The wal_dir should stay empty
+ ASSERT_OK(DB::Open(options, dbname_, &db));
+ delete db;
+ ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
+ ASSERT_EQ(db_opts.wal_dir, "");
+
+ // Open a DB with wal_dir == dbname. The wal_dir should be set to empty
+ options.wal_dir = dbname_;
+ ASSERT_OK(DB::Open(options, dbname_, &db));
+ delete db;
+ ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
+ ASSERT_EQ(db_opts.wal_dir, "");
+
+ // Open a DB with no wal_dir but a db_path==dbname_. The wal_dir should be
+ // empty
+ options.wal_dir = "";
+ options.db_paths.emplace_back(dbname_, std::numeric_limits<uint64_t>::max());
+ ASSERT_OK(DB::Open(options, dbname_, &db));
+ delete db;
+ ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
+ ASSERT_EQ(db_opts.wal_dir, "");
+
+ // Open a DB with no wal_dir==dbname_ and db_path==dbname_. The wal_dir
+ // should be empty
+ options.wal_dir = dbname_ + "/";
+ options.db_paths.emplace_back(dbname_, std::numeric_limits<uint64_t>::max());
+ ASSERT_OK(DB::Open(options, dbname_, &db));
+ delete db;
+ ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
+ ASSERT_EQ(db_opts.wal_dir, "");
+ ASSERT_OK(DestroyDB(dbname_, options));
+
+ // Open a DB with no wal_dir but db_path != db_name. The wal_dir == dbname_
+ options.wal_dir = "";
+ options.db_paths.clear();
+ options.db_paths.emplace_back(dbname_ + "_0",
+ std::numeric_limits<uint64_t>::max());
+ ASSERT_OK(DB::Open(options, dbname_, &db));
+ delete db;
+ ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
+ ASSERT_EQ(db_opts.wal_dir, dbname_);
+ ASSERT_OK(DestroyDB(dbname_, options));
+
+ // Open a DB with wal_dir != db_name. The wal_dir remains unchanged
+ options.wal_dir = dbname_ + "/wal";
+ options.db_paths.clear();
+ ASSERT_OK(DB::Open(options, dbname_, &db));
+ delete db;
+ ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
+ ASSERT_EQ(db_opts.wal_dir, dbname_ + "/wal");
+ ASSERT_OK(DestroyDB(dbname_, options));
+}
+
+TEST_F(OptionsUtilTest, WalDirInOptins) {
+ DB* db;
+ Options options;
+ DBOptions db_opts;
+ std::vector<ColumnFamilyDescriptor> cf_descs;
+ std::vector<ColumnFamilyHandle*> handles;
+
+ // Store an options file with wal_dir=dbname_ and make sure it still loads
+ // when the input wal_dir is empty
+ options.create_if_missing = true;
+ options.wal_dir = "";
+ ASSERT_OK(DB::Open(options, dbname_, &db));
+ delete db;
+ options.wal_dir = dbname_;
+ std::string options_file;
+ ASSERT_OK(GetLatestOptionsFileName(dbname_, options.env, &options_file));
+ ASSERT_OK(PersistRocksDBOptions(options, {"default"}, {options},
+ dbname_ + "/" + options_file,
+ options.env->GetFileSystem().get()));
+ ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
+ ASSERT_EQ(db_opts.wal_dir, dbname_);
+ options.wal_dir = "";
+ ASSERT_OK(DB::Open(options, dbname_, &db));
+ delete db;
+ ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
+ ASSERT_EQ(db_opts.wal_dir, "");
+}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {