bool has_end = false;
std::string end;
+ uint64_t options_file_number;
+
// serialization interface to read and write the object
static Status Read(const std::string& data_str, CompactionServiceInput* obj);
Status Write(std::string* output);
compaction_input.has_end = sub_compact->end.has_value();
compaction_input.end =
compaction_input.has_end ? sub_compact->end->ToString() : "";
+ compaction_input.options_file_number =
+ sub_compact->compaction->input_version()
+ ->version_set()
+ ->options_file_number();
+
+ TEST_SYNC_POINT_CALLBACK(
+ "CompactionServiceJob::ProcessKeyValueCompactionWithCompactionService",
+ &compaction_input);
std::string compaction_input_binary;
Status s = compaction_input.Write(&compaction_input_binary);
{"end",
{offsetof(struct CompactionServiceInput, end), OptionType::kEncodedString,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
+ {"options_file_number",
+ {offsetof(struct CompactionServiceInput, options_file_number),
+ OptionType::kUInt64T, OptionVerificationType::kNormal,
+ OptionTypeFlags::kNone}},
};
static std::unordered_map<std::string, OptionTypeInfo>
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
-
#include "db/db_test_util.h"
#include "port/stack_trace.h"
+#include "rocksdb/utilities/options_util.h"
#include "table/unique_id_impl.h"
namespace ROCKSDB_NAMESPACE {
ASSERT_TRUE(result.stats.is_remote_compaction);
}
+TEST_F(CompactionServiceTest, PreservedOptionsLocalCompaction) {
+ Options options = CurrentOptions();
+ options.level0_file_num_compaction_trigger = 2;
+ options.disable_auto_compactions = true;
+ DestroyAndReopen(options);
+
+ Random rnd(301);
+ for (auto i = 0; i < 2; ++i) {
+ for (auto j = 0; j < 10; ++j) {
+ ASSERT_OK(
+ Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "CompactionJob::ProcessKeyValueCompaction()::Processing", [&](void* arg) {
+ auto compaction = static_cast<Compaction*>(arg);
+ std::string options_file_name = OptionsFileName(
+ dbname_,
+ compaction->input_version()->version_set()->options_file_number());
+
+ // Change option twice to make sure the very first OPTIONS file gets
+ // purged
+ ASSERT_OK(dbfull()->SetOptions(
+ {{"level0_file_num_compaction_trigger", "4"}}));
+ ASSERT_EQ(4, dbfull()->GetOptions().level0_file_num_compaction_trigger);
+ ASSERT_OK(dbfull()->SetOptions(
+ {{"level0_file_num_compaction_trigger", "6"}}));
+ ASSERT_EQ(6, dbfull()->GetOptions().level0_file_num_compaction_trigger);
+ dbfull()->TEST_DeleteObsoleteFiles();
+
+ // For non-remote compactions, OPTIONS file can be deleted while
+ // using option at the start of the compaction
+ Status s = env_->FileExists(options_file_name);
+ ASSERT_NOK(s);
+ ASSERT_TRUE(s.IsNotFound());
+ // Should be old value
+ ASSERT_EQ(2, compaction->mutable_cf_options()
+ ->level0_file_num_compaction_trigger);
+ ASSERT_TRUE(dbfull()->min_options_file_numbers_.empty());
+ });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_TRUE(s.ok());
+}
+
+TEST_F(CompactionServiceTest, PreservedOptionsRemoteCompaction) {
+ // For non-remote compaction do not preserve options file
+ Options options = CurrentOptions();
+ options.level0_file_num_compaction_trigger = 2;
+ options.disable_auto_compactions = true;
+ ReopenWithCompactionService(&options);
+ GenerateTestData();
+
+ auto my_cs = GetCompactionService();
+
+ Random rnd(301);
+ for (auto i = 0; i < 2; ++i) {
+ for (auto j = 0; j < 10; ++j) {
+ ASSERT_OK(
+ Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ bool is_primary_called = false;
+ // This will be called twice. One from primary and one from remote.
+ // Try changing the option when called from remote. Otherwise, the new option
+ // will be used
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", [&](void* /*arg*/) {
+ if (!is_primary_called) {
+ is_primary_called = true;
+ return;
+ }
+ // Change the option right before the compaction run
+ ASSERT_OK(dbfull()->SetOptions(
+ {{"level0_file_num_compaction_trigger", "4"}}));
+ ASSERT_EQ(4, dbfull()->GetOptions().level0_file_num_compaction_trigger);
+ dbfull()->TEST_DeleteObsoleteFiles();
+ });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "CompactionServiceJob::ProcessKeyValueCompactionWithCompactionService",
+ [&](void* arg) {
+ auto input = static_cast<CompactionServiceInput*>(arg);
+ std::string options_file_name =
+ OptionsFileName(dbname_, input->options_file_number);
+
+ ASSERT_OK(env_->FileExists(options_file_name));
+ ASSERT_FALSE(dbfull()->min_options_file_numbers_.empty());
+ ASSERT_EQ(dbfull()->min_options_file_numbers_.front(),
+ input->options_file_number);
+
+ DBOptions db_options;
+ ConfigOptions config_options;
+ std::vector<ColumnFamilyDescriptor> all_column_families;
+ config_options.env = env_;
+ ASSERT_OK(LoadOptionsFromFile(config_options, options_file_name,
+ &db_options, &all_column_families));
+ bool has_cf = false;
+ for (auto& cf : all_column_families) {
+ if (cf.name == input->cf_name) {
+ // Should be old value
+ ASSERT_EQ(2, cf.options.level0_file_num_compaction_trigger);
+ has_cf = true;
+ }
+ }
+ ASSERT_TRUE(has_cf);
+ });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "CompactionJob::ProcessKeyValueCompaction()::Processing", [&](void* arg) {
+ auto compaction = static_cast<Compaction*>(arg);
+ ASSERT_EQ(2, compaction->mutable_cf_options()
+ ->level0_file_num_compaction_trigger);
+ });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_TRUE(s.ok());
+
+ CompactionServiceResult result;
+ my_cs->GetResult(&result);
+ ASSERT_OK(result.status);
+ ASSERT_TRUE(result.stats.is_manual_compaction);
+ ASSERT_TRUE(result.stats.is_remote_compaction);
+}
+
TEST_F(CompactionServiceTest, CorruptedOutput) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
}
}
+std::list<uint64_t>::iterator DBImpl::CaptureOptionsFileNumber() {
+ // We need to remember the iterator of our insert, because after the
+ // compaction is done, we need to remove that element from
+ // min_options_file_numbers_.
+ min_options_file_numbers_.push_back(versions_->options_file_number());
+ auto min_options_file_numbers_inserted_elem = min_options_file_numbers_.end();
+ --min_options_file_numbers_inserted_elem;
+ return min_options_file_numbers_inserted_elem;
+}
+
+void DBImpl::ReleaseOptionsFileNumber(
+ std::unique_ptr<std::list<uint64_t>::iterator>& v) {
+ if (v.get() != nullptr) {
+ min_options_file_numbers_.erase(*v.get());
+ v.reset();
+ }
+}
+
Status DBImpl::GetUpdatesSince(
SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
const TransactionLogIterator::ReadOptions& read_options) {
uint64_t GetObsoleteSstFilesSize();
+ uint64_t MinOptionsFileNumberToKeep();
+
// Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'candidate_files'.
// If force == false and the last call was less than
friend class XFTransactionWriteHandler;
friend class DBBlobIndexTest;
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
+ friend class CompactionServiceTest_PreservedOptionsLocalCompaction_Test;
+ friend class CompactionServiceTest_PreservedOptionsRemoteCompaction_Test;
#endif
struct CompactionState;
void ReleaseFileNumberFromPendingOutputs(
std::unique_ptr<std::list<uint64_t>::iterator>& v);
+ // Similar to pending_outputs, preserve OPTIONS file. Used for remote
+ // compaction.
+ std::list<uint64_t>::iterator CaptureOptionsFileNumber();
+ void ReleaseOptionsFileNumber(
+ std::unique_ptr<std::list<uint64_t>::iterator>& v);
+
// Sets bg error if there is an error writing to WAL.
IOStatus SyncClosedWals(const WriteOptions& write_options,
JobContext* job_context, VersionEdit* synced_wals,
// State is protected with db mutex.
std::list<uint64_t> pending_outputs_;
+ // Similar to pending_outputs_, FindObsoleteFiles()/PurgeObsoleteFiles() never
+ // deletes any OPTIONS file that has number bigger than any of the file number
+ // in min_options_file_numbers_.
+ std::list<uint64_t> min_options_file_numbers_;
+
// flush_queue_ and compaction_queue_ hold column families that we need to
// flush and compact, respectively.
// A column family is inserted into flush_queue_ when it satisfies condition
compaction_job.Prepare();
+ std::unique_ptr<std::list<uint64_t>::iterator> min_options_file_number_elem;
+ if (immutable_db_options().compaction_service != nullptr) {
+ min_options_file_number_elem.reset(
+ new std::list<uint64_t>::iterator(CaptureOptionsFileNumber()));
+ }
+
mutex_.Unlock();
TEST_SYNC_POINT("CompactFilesImpl:0");
TEST_SYNC_POINT("CompactFilesImpl:1");
TEST_SYNC_POINT("CompactFilesImpl:3");
mutex_.Lock();
+ if (immutable_db_options().compaction_service != nullptr) {
+ ReleaseOptionsFileNumber(min_options_file_number_elem);
+ }
+
bool compaction_released = false;
Status status =
compaction_job.Install(*c->mutable_cf_options(), &compaction_released);
&bg_bottom_compaction_scheduled_);
compaction_job.Prepare();
+ std::unique_ptr<std::list<uint64_t>::iterator> min_options_file_number_elem;
+ if (immutable_db_options().compaction_service != nullptr) {
+ min_options_file_number_elem.reset(
+ new std::list<uint64_t>::iterator(CaptureOptionsFileNumber()));
+ }
+
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id);
mutex_.Unlock();
compaction_job.Run().PermitUncheckedError();
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
mutex_.Lock();
+
+ if (immutable_db_options().compaction_service != nullptr) {
+ ReleaseOptionsFileNumber(min_options_file_number_elem);
+ }
+
status =
compaction_job.Install(*c->mutable_cf_options(), &compaction_released);
io_s = compaction_job.io_status();
return versions_->GetObsoleteSstFilesSize();
}
+uint64_t DBImpl::MinOptionsFileNumberToKeep() {
+ mutex_.AssertHeld();
+ if (!min_options_file_numbers_.empty()) {
+ return *min_options_file_numbers_.begin();
+ }
+ return std::numeric_limits<uint64_t>::max();
+}
+
Status DBImpl::DisableFileDeletions() {
Status s;
int my_disable_delete_obsolete_files;
// here but later find newer generated unfinalized files while scanning.
job_context->min_pending_output = MinObsoleteSstNumberToKeep();
job_context->files_to_quarantine = error_handler_.GetFilesToQuarantine();
+ job_context->min_options_file_number = MinOptionsFileNumberToKeep();
// Get obsolete files. This function will also update the list of
// pending files in VersionSet().
dbname_);
// File numbers of most recent two OPTIONS file in candidate_files (found in
- // previos FindObsoleteFiles(full_scan=true))
+ // previous FindObsoleteFiles(full_scan=true))
// At this point, there must not be any duplicate file numbers in
// candidate_files.
uint64_t optsfile_num1 = std::numeric_limits<uint64_t>::min();
}
}
+ // For remote compactions, we need to keep OPTIONS file that may get
+ // referenced by the remote worker
+
+ optsfile_num2 = std::min(optsfile_num2, state.min_options_file_number);
+
// Close WALs before trying to delete them.
for (const auto w : state.logs_to_free) {
// TODO: maybe check the return value of Close.
return s;
}
- // 2. Load the options from latest OPTIONS file
+ // 2. Load the options
DBOptions db_options;
ConfigOptions config_options;
config_options.env = override_options.env;
std::vector<ColumnFamilyDescriptor> all_column_families;
- s = LoadLatestOptions(config_options, name, &db_options,
- &all_column_families);
- // In a very rare scenario, loading options may fail if the options changed by
- // the primary host at the same time. Just retry once for now.
- if (!s.ok()) {
- s = LoadLatestOptions(config_options, name, &db_options,
+
+ std::string options_file_name =
+ OptionsFileName(name, compaction_input.options_file_number);
+
+ s = LoadOptionsFromFile(config_options, options_file_name, &db_options,
&all_column_families);
- if (!s.ok()) {
- return s;
- }
+ if (!s.ok()) {
+ return s;
}
// 3. Override pointer configurations in DBOptions with
// that corresponds to the set of files in 'live'.
uint64_t manifest_file_number;
uint64_t pending_manifest_file_number;
+
+ // Used for remote compaction. To prevent OPTIONS files from getting
+ // purged by PurgeObsoleteFiles() of the primary host
+ uint64_t min_options_file_number;
uint64_t log_number;
uint64_t prev_log_number;
--- /dev/null
+OPTIONS file to be loaded by remote worker is now preserved so that it does not get purged by the primary host. A similar technique as how we are preserving new SST files from getting purged is used for this. min_options_file_numbers_ is tracked like pending_outputs_ is tracked.