#include "util/logging.h"
#include "util/mock_env.h"
#include "util/mutexlock.h"
+#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
Status NewWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& soptions) override {
+ if (!IsFilesystemActive()) {
+ return Status::Corruption("Not Active");
+ }
+ // Not allow overwriting files
+ if (target()->FileExists(fname)) {
+ return Status::Corruption("File already exists.");
+ }
Status s = target()->NewWritableFile(fname, result, soptions);
if (s.ok()) {
result->reset(new TestWritableFile(fname, std::move(*result), this));
}
virtual Status DeleteFile(const std::string& f) override {
+ if (!IsFilesystemActive()) {
+ return Status::Corruption("Not Active");
+ }
Status s = EnvWrapper::DeleteFile(f);
if (!s.ok()) {
fprintf(stderr, "Cannot delete file %s: %s\n", f.c_str(),
virtual Status RenameFile(const std::string& s,
const std::string& t) override {
+ if (!IsFilesystemActive()) {
+ return Status::Corruption("Not Active");
+ }
Status ret = EnvWrapper::RenameFile(s, t);
if (ret.ok()) {
}
Status TestWritableFile::Append(const Slice& data) {
+ if (!env_->IsFilesystemActive()) {
+ return Status::Corruption("Not Active");
+ }
Status s = target_->Append(data);
- if (s.ok() && env_->IsFilesystemActive()) {
+ if (s.ok()) {
state_.pos_ += data.size();
}
return s;
ASSERT_OK(s);
}
- void Build(const WriteOptions& write_options, int start_idx, int num_vals) {
+ void Build(const WriteOptions& write_options, int start_idx, int num_vals,
+ bool sequential = true) {
std::string key_space, value_space;
WriteBatch batch;
for (int i = start_idx; i < start_idx + num_vals; i++) {
- Slice key = Key(i, &key_space);
+ Slice key = Key(sequential, i, &key_space);
batch.Clear();
batch.Put(key, Value(i, &value_space));
ASSERT_OK(db_->Write(write_options, &batch));
}
}
- Status ReadValue(int i, std::string* val) const {
+ Status ReadValue(int i, std::string* val, bool sequential) const {
std::string key_space, value_space;
- Slice key = Key(i, &key_space);
+ Slice key = Key(sequential, i, &key_space);
Value(i, &value_space);
ReadOptions options;
return db_->Get(options, key, val);
}
- Status Verify(int start_idx, int num_vals,
- ExpectedVerifResult expected) const {
+ Status Verify(int start_idx, int num_vals, ExpectedVerifResult expected,
+ bool seqeuntial = true) const {
std::string val;
std::string value_space;
Status s;
for (int i = start_idx; i < start_idx + num_vals && s.ok(); i++) {
Value(i, &value_space);
- s = ReadValue(i, &val);
+ s = ReadValue(i, &val, seqeuntial);
if (s.ok()) {
EXPECT_EQ(value_space, val);
}
}
// Return the ith key
- Slice Key(int i, std::string* storage) const {
+ Slice Key(bool seqeuntial, int i, std::string* storage) const {
+ int num = i;
+ if (!seqeuntial) {
+ // random transfer
+ const int m = 0x5bd1e995;
+ num *= m;
+ num ^= num << 24;
+ }
char buf[100];
- snprintf(buf, sizeof(buf), "%016d", i);
+ snprintf(buf, sizeof(buf), "%016d", num);
storage->assign(buf, strlen(buf));
return Slice(*storage);
}
write_options.sync = false;
std::string key_space, value_space;
- ASSERT_OK(
- db_->Put(write_options, Key(1, &key_space), Value(1, &value_space)));
+ ASSERT_OK(db_->Put(write_options, Key(true, 1, &key_space),
+ Value(1, &value_space)));
FlushOptions flush_options;
flush_options.wait = false;
ASSERT_OK(db_->Flush(flush_options));
write_options.sync = true;
- ASSERT_OK(
- db_->Put(write_options, Key(2, &key_space), Value(2, &value_space)));
+ ASSERT_OK(db_->Put(write_options, Key(true, 2, &key_space),
+ Value(2, &value_space)));
env_->SetFilesystemActive(false);
NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
ASSERT_OK(OpenDB());
std::string val;
Value(2, &value_space);
- ASSERT_OK(ReadValue(2, &val));
+ ASSERT_OK(ReadValue(2, &val, true));
ASSERT_EQ(value_space, val);
Value(1, &value_space);
- ASSERT_OK(ReadValue(1, &val));
+ ASSERT_OK(ReadValue(1, &val, true));
ASSERT_EQ(value_space, val);
}
+TEST_F(FaultInjectionTest, UninstalledCompaction) {
+ options_.target_file_size_base = 32 * 1024;
+ options_.write_buffer_size = 100 << 10; // 100KB
+ options_.level0_file_num_compaction_trigger = 6;
+ options_.level0_stop_writes_trigger = 1 << 10;
+ options_.level0_slowdown_writes_trigger = 1 << 10;
+ options_.max_background_compactions = 1;
+ OpenDB();
+
+ rocksdb::SyncPoint::GetInstance()->LoadDependency({
+ {"FaultInjectionTest::FaultTest:0", "DBImpl::BGWorkCompaction"},
+ {"CompactionJob::Run():End", "FaultInjectionTest::FaultTest:1"},
+ {"FaultInjectionTest::FaultTest:2",
+ "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ int kNumKeys = 1000;
+ Build(WriteOptions(), 0, kNumKeys, false);
+ FlushOptions flush_options;
+ flush_options.wait = true;
+ db_->Flush(flush_options);
+ ASSERT_OK(db_->Put(WriteOptions(), "", ""));
+ TEST_SYNC_POINT("FaultInjectionTest::FaultTest:0");
+ TEST_SYNC_POINT("FaultInjectionTest::FaultTest:1");
+ env_->SetFilesystemActive(false);
+ TEST_SYNC_POINT("FaultInjectionTest::FaultTest:2");
+ CloseDB();
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ ResetDBState(kResetDropUnsyncedData);
+
+ std::atomic<bool> opened(false);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::Open:Opened", [&](void* arg) { opened.store(true); });
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::BGWorkCompaction",
+ [&](void* arg) { ASSERT_TRUE(opened.load()); });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+ ASSERT_OK(OpenDB());
+ static_cast<DBImpl*>(db_)->TEST_WaitForCompact();
+ ASSERT_OK(Verify(0, kNumKeys, FaultInjectionTest::kValExpectFound, false));
+ ASSERT_OK(db_->Put(WriteOptions(), "", ""));
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
} // namespace rocksdb
int main(int argc, char** argv) {