#include "db/db_test_util.h"
#include "port/stack_trace.h"
+#include "util/fault_injection_test_env.h"
#include "util/sync_point.h"
namespace rocksdb {
#endif // ROCKSDB_LITE
}
+TEST_F(DBFlushTest, SyncFail) {
+ std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
+ new FaultInjectionTestEnv(Env::Default()));
+ Options options;
+ options.disable_auto_compactions = true;
+ options.env = fault_injection_env.get();
+
+ SyncPoint::GetInstance()->LoadDependency(
+ {{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
+ {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ Reopen(options);
+ Put("key", "value");
+ auto* cfd =
+ reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
+ ->cfd();
+ int refs_before = cfd->current()->TEST_refs();
+ FlushOptions flush_options;
+ flush_options.wait = false;
+ ASSERT_OK(dbfull()->Flush(flush_options));
+ fault_injection_env->SetFilesystemActive(false);
+ TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
+ TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
+ fault_injection_env->SetFilesystemActive(true);
+ dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_EQ("", FilesPerLevel()); // flush failed.
+ // Flush job should release ref count to current version.
+ ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
+ Destroy(options);
+}
+
} // namespace rocksdb
int main(int argc, char** argv) {
}
Status DBImpl::SyncClosedLogs(JobContext* job_context) {
+ TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
mutex_.AssertHeld();
autovector<log::Writer*, 1> logs_to_sync;
uint64_t current_log_number = logfile_number_;
MarkLogsSynced(current_log_number - 1, true, s);
if (!s.ok()) {
bg_error_ = s;
+ TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
return s;
}
}
// is unlocked by the current thread.
if (s.ok()) {
s = flush_job.Run(&file_meta);
+ } else {
+ flush_job.Cancel();
}
if (s.ok()) {
++it;
}
}
- assert(logs_.empty() || logs_[0].number > up_to ||
+ assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
(logs_.size() == 1 && !logs_[0].getting_synced));
log_sync_cv_.SignalAll();
}
return s;
}
+void FlushJob::Cancel() {
+ db_mutex_->AssertHeld();
+ assert(base_ != nullptr);
+ base_->Unref();
+}
+
Status FlushJob::WriteLevel0Table() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_FLUSH_WRITE_L0);
~FlushJob();
- // Require db_mutex held
+ // Require db_mutex held.
+ // Once PickMemTable() is called, either Run() or Cancel() has to be call.
void PickMemTable();
Status Run(FileMetaData* file_meta = nullptr);
+ void Cancel();
TableProperties GetTableProperties() const { return table_properties_; }
private: