]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Flush job should release its reference to the current version if log sync fails
author: Yi Wu <yiwu@fb.com>
Fri, 20 Jan 2017 07:03:45 +0000 (23:03 -0800)
committer: Yi Wu <yiwu@fb.com>
Fri, 20 Jan 2017 18:47:01 +0000 (10:47 -0800)
Summary:
Fix a bug where, when syncing the log fails, FlushJob::Run() is not executed
and the reference to cfd->current() is never released.
Closes https://github.com/facebook/rocksdb/pull/1792

Differential Revision: D4441316

Pulled By: yiwu-arbug

fbshipit-source-id: 5523e28

db/db_flush_test.cc
db/db_impl.cc
db/flush_job.cc
db/flush_job.h
db/version_set.h
util/fault_injection_test_env.cc

index ab4b1ab4c3fce71a1014b41cfac8ca231c794e1b..3bacb513f207d853707559400d911489a6201db0 100644 (file)
@@ -9,6 +9,7 @@
 
 #include "db/db_test_util.h"
 #include "port/stack_trace.h"
+#include "util/fault_injection_test_env.h"
 #include "util/sync_point.h"
 
 namespace rocksdb {
@@ -47,6 +48,38 @@ TEST_F(DBFlushTest, FlushWhileWritingManifest) {
 #endif  // ROCKSDB_LITE
 }
 
+TEST_F(DBFlushTest, SyncFail) {  // A flush whose log sync fails must leave the current version's ref count unchanged.
+  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
+      new FaultInjectionTestEnv(Env::Default()));
+  Options options;
+  options.disable_auto_compactions = true;
+  options.env = fault_injection_env.get();  // the DB is opened on the fault-injection env
+
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
+       {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});  // presumably (predecessor, successor) pairs -- confirm against SyncPoint docs
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  Reopen(options);
+  Put("key", "value");
+  auto* cfd =
+      reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
+          ->cfd();
+  int refs_before = cfd->current()->TEST_refs();  // baseline ref count, compared again at the end
+  FlushOptions flush_options;
+  flush_options.wait = false;  // do not wait inside Flush(); the test waits explicitly below
+  ASSERT_OK(dbfull()->Flush(flush_options));
+  fault_injection_env->SetFilesystemActive(false);  // while inactive, TestWritableFile::Sync() returns IOError
+  TEST_SYNC_POINT("DBFlushTest::SyncFail:1");  // intended to let SyncClosedLogs start only after the filesystem is inactive
+  TEST_SYNC_POINT("DBFlushTest::SyncFail:2");  // intended to hold the test here until SyncClosedLogs has reported failure
+  fault_injection_env->SetFilesystemActive(true);  // re-activate the filesystem before waiting and cleanup
+  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_EQ("", FilesPerLevel());  // flush failed.
+  // Flush job should release ref count to current version.
+  ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
+  Destroy(options);
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
index f884701cdaf5277e23a9581b67410b9b7ab0a794..e8e83626b2beab18e71d434f3bb5b551d058c6b0 100644 (file)
@@ -1842,6 +1842,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
 }
 
 Status DBImpl::SyncClosedLogs(JobContext* job_context) {
+  TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
   mutex_.AssertHeld();
   autovector<log::Writer*, 1> logs_to_sync;
   uint64_t current_log_number = logfile_number_;
@@ -1878,6 +1879,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
     MarkLogsSynced(current_log_number - 1, true, s);
     if (!s.ok()) {
       bg_error_ = s;
+      TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
       return s;
     }
   }
@@ -1928,6 +1930,8 @@ Status DBImpl::FlushMemTableToOutputFile(
   // is unlocked by the current thread.
   if (s.ok()) {
     s = flush_job.Run(&file_meta);
+  } else {
+    flush_job.Cancel();
   }
 
   if (s.ok()) {
@@ -2762,7 +2766,7 @@ void DBImpl::MarkLogsSynced(
       ++it;
     }
   }
-  assert(logs_.empty() || logs_[0].number > up_to ||
+  assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
          (logs_.size() == 1 && !logs_[0].getting_synced));
   log_sync_cv_.SignalAll();
 }
index 33c295a5f058de34f05d27c5e2e60a6dd78cbd72..e957fd7af11a0a5bada1831915cc7327c3f99a0a 100644 (file)
@@ -230,6 +230,12 @@ Status FlushJob::Run(FileMetaData* file_meta) {
   return s;
 }
 
+void FlushJob::Cancel() {  // Abandons the flush job by dropping the reference held on base_.
+  db_mutex_->AssertHeld();  // caller must hold the DB mutex
+  assert(base_ != nullptr);  // presumably set by PickMemTable() -- confirm in flush_job.cc
+  base_->Unref();  // called from FlushMemTableToOutputFile() when Run() is skipped
+}
+
 Status FlushJob::WriteLevel0Table() {
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_FLUSH_WRITE_L0);
index 5a3229cb60c7fe38542233402595322d44ec5cfd..31672dd2796204f59ea2978fca01dfd7c7fee4d5 100644 (file)
@@ -67,9 +67,11 @@ class FlushJob {
 
   ~FlushJob();
 
-  // Require db_mutex held
+  // Require db_mutex held.
+  // Once PickMemTable() is called, either Run() or Cancel() has to be called.
   void PickMemTable();
   Status Run(FileMetaData* file_meta = nullptr);
+  void Cancel();
   TableProperties GetTableProperties() const { return table_properties_; }
 
  private:
index 0d7b85e8ccce26ce036f7940a4eae467d3a8a7b2..08ed9620157d535a1c31c6d828c5f5af3440840a 100644 (file)
@@ -520,6 +520,8 @@ class Version {
     return next_;
   }
 
+  int TEST_refs() const { return refs_; }  // returns refs_ so tests can compare reference counts (used by DBFlushTest.SyncFail)
+
   VersionStorageInfo* storage_info() { return &storage_info_; }
 
   VersionSet* version_set() { return vset_; }
index e0a39c5c79ce69fb720960040d43d85765ef3e5b..dd41e5b07829e1472cf68cb41a7a9a59c2a1ac72 100644 (file)
@@ -149,7 +149,7 @@ Status TestWritableFile::Flush() {
 
 Status TestWritableFile::Sync() {
   if (!env_->IsFilesystemActive()) {
-    return Status::OK();
+    return Status::IOError("FaultInjectionTestEnv: not active");
   }
   // No need to actual sync.
   state_.pos_at_last_sync_ = state_.pos_;