compact->builder->Abandon();
compact->builder.reset();
} else {
- assert(compact->outfile == nullptr);
+ assert(!status.ok() || compact->outfile == nullptr);
}
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
pending_outputs_[file_number] = compact->compaction->GetOutputPathId();
mutex_.Unlock();
}
+ // Make the output file
+ std::string fname = TableFileName(db_options_.db_paths, file_number,
+ compact->compaction->GetOutputPathId());
+ Status s = env_->NewWritableFile(fname, &compact->outfile, env_options_);
+
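+ // If the file cannot be created, log the failure and surface the error
+ // so that the caller aborts the compaction and cleans up its state.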
+ if (!s.ok()) {
+ Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
+ "[%s] OpenCompactionOutputFiles for table #%" PRIu64 " "
+ "fails at NewWritableFile with status %s",
+ compact->compaction->column_family_data()->GetName().c_str(),
+ file_number, s.ToString().c_str());
+ LogFlush(db_options_.info_log);
+ return s;
+ }
CompactionState::Output out;
out.number = file_number;
out.path_id = compact->compaction->GetOutputPathId();
out.smallest_seqno = out.largest_seqno = 0;
compact->outputs.push_back(out);
- // Make the output file
- std::string fname = TableFileName(db_options_.db_paths, file_number,
- compact->compaction->GetOutputPathId());
- Status s = env_->NewWritableFile(fname, &compact->outfile, env_options_);
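+ // outfile is guaranteed to be valid here since the error path above
+ // returns early. Low I/O priority keeps compaction writes from
+ // starving foreground I/O.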
+ compact->outfile->SetIOPriority(Env::IO_LOW);
+ compact->outfile->SetPreallocationBlockSize(
+ compact->compaction->OutputFilePreallocationSize(mutable_cf_options));
- if (s.ok()) {
- compact->outfile->SetIOPriority(Env::IO_LOW);
- compact->outfile->SetPreallocationBlockSize(
- compact->compaction->OutputFilePreallocationSize(mutable_cf_options));
-
- ColumnFamilyData* cfd = compact->compaction->column_family_data();
- compact->builder.reset(NewTableBuilder(
- *cfd->ioptions(), cfd->internal_comparator(), compact->outfile.get(),
- compact->compaction->OutputCompressionType(),
- cfd->ioptions()->compression_opts));
- }
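+ // Likewise, the builder can now be created unconditionally; the old
+ // s.ok() guard is unnecessary once file-creation errors return early.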
+ ColumnFamilyData* cfd = compact->compaction->column_family_data();
+ compact->builder.reset(NewTableBuilder(
+ *cfd->ioptions(), cfd->internal_comparator(), compact->outfile.get(),
+ compact->compaction->OutputCompressionType(),
+ cfd->ioptions()->compression_opts));
LogFlush(db_options_.info_log);
return s;
}
int64_t key_drop_obsolete = 0;
int64_t loop_cnt = 0;
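+ // Checking status.ok() in the loop condition lets the compaction stop
+ // consuming input as soon as an output-file error is recorded.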
while (input->Valid() && !shutting_down_.Acquire_Load() &&
- !cfd->IsDropped()) {
+ !cfd->IsDropped() && status.ok()) {
if (++loop_cnt > 1000) {
if (key_drop_user > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
// Special Env used to delay background operations
class SpecialEnv : public EnvWrapper {
public:
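+ // Deterministic PRNG used to decide when to inject write failures.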
+ Random rnd_;
+
// sstable Sync() calls are blocked while this pointer is non-nullptr.
port::AtomicPointer delay_sstable_sync_;
std::atomic<int> sync_counter_;
- explicit SpecialEnv(Env* base) : EnvWrapper(base) {
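+ // Percentage (0 to 100) of NewWritableFile calls that should fail at
+ // random.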
+ std::atomic<uint32_t> non_writeable_rate_;
+
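+ // Number of NewWritableFile calls that survive the random-failure
+ // check.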
+ std::atomic<uint32_t> new_writable_count_;
+
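+ // When > 0, every periodic_non_writable_-th surviving NewWritableFile
+ // call fails deterministically.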
+ std::atomic<uint32_t> periodic_non_writable_;
+
+ explicit SpecialEnv(Env* base) : EnvWrapper(base), rnd_(301) {
delay_sstable_sync_.Release_Store(nullptr);
drop_writes_.Release_Store(nullptr);
no_space_.Release_Store(nullptr);
log_write_error_.Release_Store(nullptr);
bytes_written_ = 0;
sync_counter_ = 0;
+ non_writeable_rate_ = 0;
+ new_writable_count_ = 0;
+ periodic_non_writable_ = 0;
}
Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
}
};
- if (non_writable_.Acquire_Load() != nullptr) {
- return Status::IOError("simulated write error");
+ if (non_writeable_rate_.load(std::memory_order_acquire) > 0) {
+ auto random_number = rnd_.Uniform(100);
+ if (random_number < non_writeable_rate_.load()) {
+ return Status::IOError("simulated random write error");
+ }
+ }
+
+ new_writable_count_++;
+
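+ // Deterministic failure injection: fail every periodic_fail-th call.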
+ auto periodic_fail = periodic_non_writable_.load();
+ if (periodic_fail > 0 &&
+ new_writable_count_.load() % periodic_fail == 0) {
+ return Status::IOError("simulated periodic write error");
}
Status s = target()->NewWritableFile(f, r, soptions);
options.env = env_;
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
- env_->non_writable_.Release_Store(env_); // Force errors for new files
+ env_->non_writeable_rate_.store(100); // Force errors for new files
std::string big(100000, 'x');
int errors = 0;
for (int i = 0; i < 20; i++) {
}
}
ASSERT_GT(errors, 0);
- env_->non_writable_.Release_Store(nullptr);
+ env_->non_writeable_rate_.store(0);
} while (ChangeCompactOptions());
}
ASSERT_TRUE(SizeAtLevel(0) > k128KB + k64KB - 2 * k5KB);
}
+TEST(DBTest, FileCreationRandomFailure) {
+ Options options;
+ options.env = env_;
+ options.create_if_missing = true;
+ options.write_buffer_size = 100000; // Small write buffer
+ options.target_file_size_base = 200000;
+ options.max_bytes_for_level_base = 1000000;
+ options.max_bytes_for_level_multiplier = 2;
+
+ DestroyAndReopen(&options);
+ Random rnd(301);
+
+ const int kTestSize = kCDTKeysPerBuffer * 4096;
+ const int kTotalIteration = 100;
+ // The second half of the test injects random failures
+ // into file creation.
+ const int kRandomFailureTest = kTotalIteration / 2;
+ std::vector<std::string> values;
+ for (int i = 0; i < kTestSize; ++i) {
+ values.push_back("NOT_FOUND");
+ }
+ for (int j = 0; j < kTotalIteration; ++j) {
+ if (j == kRandomFailureTest) {
+ env_->non_writeable_rate_.store(90);
+ }
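+ // From here on roughly 90% of NewWritableFile calls fail at random.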
+ for (int k = 0; k < kTestSize; ++k) {
+ // Here we expect some of the Puts to fail.
+ std::string value = RandomString(&rnd, 100);
+ Status s = Put(Key(k), Slice(value));
+ if (s.ok()) {
+ // update the latest successful put
+ values[k] = value;
+ }
+ // But every Put issued before the failure injection begins must succeed.
+ if (j < kRandomFailureTest) {
+ ASSERT_OK(s);
+ }
+ }
+ }
+
+ // If rocksdb does not handle the failures correctly, an internal assert
+ // will fail here.
+ dbfull()->TEST_WaitForFlushMemTable();
+ dbfull()->TEST_WaitForCompact();
+
+ // verify we have the latest successful update
+ for (int k = 0; k < kTestSize; ++k) {
+ auto v = Get(Key(k));
+ ASSERT_EQ(v, values[k]);
+ }
+
+ // reopen and reverify we have the latest successful update
+ env_->non_writeable_rate_.store(0);
+ Reopen(&options);
+ for (int k = 0; k < kTestSize; ++k) {
+ auto v = Get(Key(k));
+ ASSERT_EQ(v, values[k]);
+ }
+}
+
+TEST(DBTest, PartialCompactionFailure) {
+ Options options;
+ const int kKeySize = 16;
+ const int kKvSize = 1000;
+ const int kKeysPerBuffer = 100;
+ const int kNumL1Files = 5;
+ options.create_if_missing = true;
+ options.write_buffer_size = kKeysPerBuffer * kKvSize;
+ options.max_write_buffer_number = 2;
+ options.target_file_size_base =
+ options.write_buffer_size *
+ (options.max_write_buffer_number - 1);
+ options.level0_file_num_compaction_trigger = kNumL1Files;
+ options.max_bytes_for_level_base =
+ options.level0_file_num_compaction_trigger *
+ options.target_file_size_base;
+ options.max_bytes_for_level_multiplier = 2;
+ options.compression = kNoCompression;
+
+ // The number of NewWritableFile calls required by each operation.
+ const int kNumInitialNewWritableFiles = 4;
+ const int kNumLevel0FlushNewWritableFiles =
+ options.level0_file_num_compaction_trigger * 2;
+ const int kNumLevel1NewWritableFiles =
+ options.level0_file_num_compaction_trigger + 1;
+ // This setting makes one of the file creations fail
+ // in the first L0 -> L1 compaction while ensuring
+ // all flushes succeed.
+ env_->periodic_non_writable_ =
+ kNumInitialNewWritableFiles + kNumLevel0FlushNewWritableFiles +
+ kNumLevel1NewWritableFiles - 3;
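+ // With a trigger of 5 this evaluates to 4 + 10 + 6 - 3 = 17, so the
+ // 17th file creation (one of the L1 compaction outputs) is the one
+ // that fails.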
+ options.env = env_;
+
+ DestroyAndReopen(&options);
+
+ const int kNumKeys =
+ options.level0_file_num_compaction_trigger *
+ (options.max_write_buffer_number - 1) * kKeysPerBuffer;
+
+ Random rnd(301);
+ std::vector<std::string> keys;
+ std::vector<std::string> values;
+ for (int k = 0; k < kNumKeys; ++k) {
+ keys.emplace_back(RandomString(&rnd, kKeySize));
+ values.emplace_back(RandomString(&rnd, kKvSize - kKeySize));
+ ASSERT_OK(Put(Slice(keys[k]), Slice(values[k])));
+ }
+
+ dbfull()->TEST_WaitForFlushMemTable();
+ // Make sure the number of L0 files can trigger compaction.
+ ASSERT_GE(NumTableFilesAtLevel(0),
+ options.level0_file_num_compaction_trigger);
+ auto previous_num_level0_files = NumTableFilesAtLevel(0);
+ // Expect compaction to fail here as one file will fail its
+ // creation.
+ dbfull()->TEST_WaitForCompact();
+ // Verify L0 -> L1 compaction does fail.
+ ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+ // Verify all L0 files are still there.
+ ASSERT_EQ(NumTableFilesAtLevel(0), previous_num_level0_files);
+
+ // All key-values must exist after compaction fails.
+ for (int k = 0; k < kNumKeys; ++k) {
+ ASSERT_EQ(values[k], Get(keys[k]));
+ }
+
+ // Make sure RocksDB will not get into a corrupted state.
+ Reopen(&options);
+
+ // Verify again after reopen.
+ for (int k = 0; k < kNumKeys; ++k) {
+ ASSERT_EQ(values[k], Get(keys[k]));
+ }
+}
+
TEST(DBTest, DynamicCompactionOptions) {
const uint64_t k64KB = 1 << 16;
const uint64_t k128KB = 1 << 17;
options.max_bytes_for_level_multiplier = 4;
DestroyAndReopen(&options);
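+ // C++11 forbids default arguments in lambda parameter lists, so the
+ // stride is now passed explicitly at every call site.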
- auto gen_l0_kb = [this](int start, int size, int stride = 1) {
+ auto gen_l0_kb = [this](int start, int size, int stride) {
Random rnd(301);
std::vector<std::string> values;
for (int i = 0; i < size; i++) {
// Write 3 files that have the same key range, trigger compaction and
// result in one L1 file
- gen_l0_kb(0, 128);
+ gen_l0_kb(0, 128, 1);
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
- gen_l0_kb(0, 128);
+ gen_l0_kb(0, 128, 1);
ASSERT_EQ(NumTableFilesAtLevel(0), 2);
- gen_l0_kb(0, 128);
+ gen_l0_kb(0, 128, 1);
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("0,1", FilesPerLevel());
std::vector<LiveFileMetaData> metadata;
{"target_file_size_base", "65536"}
}));
- gen_l0_kb(0, 128);
+ gen_l0_kb(0, 128, 1);
ASSERT_EQ("1,1", FilesPerLevel());
- gen_l0_kb(0, 128);
+ gen_l0_kb(0, 128, 1);
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("0,2", FilesPerLevel());
metadata.clear();