git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix data race on VersionSet::io_status_ (#7034)
author Yanqin Jin <yanqin@fb.com>
Sat, 27 Jun 2020 15:55:49 +0000 (08:55 -0700)
committer Andrew Kryczka <andrewkr@fb.com>
Thu, 9 Jul 2020 22:51:01 +0000 (15:51 -0700)
Summary:
After https://github.com/facebook/rocksdb/issues/6949, VersionSet::io_status_ can be accessed concurrently by multiple
threads without holding a lock, causing TSAN tests to fail. For example, a bg flush thread
resets io_status_ before calling LogAndApply(), while another thread that is already in
the middle of LogAndApply() reads io_status_. This is a bug.

We do not have to reset io_status_ each time we call LogAndApply(). io_status_
is part of the state of VersionSet, and it indicates the outcome of preceding
MANIFEST/CURRENT files IO operations. Its value should be updated only when:

1. MANIFEST/CURRENT file IO fails for the first time.
2. MANIFEST/CURRENT file IO succeeds as part of recovering from a prior
   failure without a process restart, e.g. by calling Resume().

Test Plan (devserver):
COMPILE_WITH_TSAN=1 make check
COMPILE_WITH_TSAN=1 make db_test2
./db_test2 --gtest_filter=DBTest2.CompactionStall
Pull Request resolved: https://github.com/facebook/rocksdb/pull/7034

Reviewed By: zhichao-cao

Differential Revision: D22247137

Pulled By: riversand963

fbshipit-source-id: 77b83e05390f3ee3cd2d96d3fdd6fe4f225e3216

db/compaction/compaction_job.cc
db/db_impl/db_impl_compaction_flush.cc
db/memtable_list.cc
db/version_set.cc
db/version_set.h

index 46e685abd64b0b2bcfe7843710ae2a9c9b05fc16..9908bb180150d9f9a9bc94edfb4630242415d166 100644 (file)
@@ -721,7 +721,6 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
   cfd->internal_stats()->AddCompactionStats(
       compact_->compaction->output_level(), thread_pri_, compaction_stats_);
 
-  versions_->SetIOStatus(IOStatus::OK());
   if (status.ok()) {
     status = InstallCompactionResults(mutable_cf_options);
   }
index 1ab4cf523a1cf26f2e418dddee038a2357a684c2..f93d70551ab5327ba833bea9797b3a99e5a3a2df 100644 (file)
@@ -2703,7 +2703,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
     for (const auto& f : *c->inputs(0)) {
       c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
     }
-    versions_->SetIOStatus(IOStatus::OK());
     status = versions_->LogAndApply(c->column_family_data(),
                                     *c->mutable_cf_options(), c->edit(),
                                     &mutex_, directories_.GetDbDir());
@@ -2761,7 +2760,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
       }
     }
 
-    versions_->SetIOStatus(IOStatus::OK());
     status = versions_->LogAndApply(c->column_family_data(),
                                     *c->mutable_cf_options(), c->edit(),
                                     &mutex_, directories_.GetDbDir());
index 89de07f3d05eecb05e76ce2b6f8a6d790f4fbef8..f38ead559135d3dfd2d7f0c407e135e2bfbf7193 100644 (file)
@@ -470,7 +470,6 @@ Status MemTableList::TryInstallMemtableFlushResults(
       }
 
       // this can release and reacquire the mutex.
-      vset->SetIOStatus(IOStatus::OK());
       s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
                             db_directory);
       *io_s = vset->io_status();
index 8696b57f35ea455581348b53eb9f556279b08a5a..db282a51e1b1f22e8f0d3ece45304011cf06fb96 100644 (file)
@@ -3878,10 +3878,6 @@ Status VersionSet::ProcessManifestWrites(
   }
 #endif  // NDEBUG
 
-  uint64_t new_manifest_file_size = 0;
-  Status s;
-  IOStatus io_s;
-
   assert(pending_manifest_file_number_ == 0);
   if (!descriptor_log_ ||
       manifest_file_size_ > db_options_->max_manifest_file_size) {
@@ -3911,6 +3907,9 @@ Status VersionSet::ProcessManifestWrites(
     }
   }
 
+  uint64_t new_manifest_file_size = 0;
+  Status s;
+  IOStatus io_s;
   {
     FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
     mu->Unlock();
@@ -3947,9 +3946,9 @@ Status VersionSet::ProcessManifestWrites(
       std::string descriptor_fname =
           DescriptorFileName(dbname_, pending_manifest_file_number_);
       std::unique_ptr<FSWritableFile> descriptor_file;
-      s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
-                          opt_file_opts);
-      if (s.ok()) {
+      io_s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
+                             opt_file_opts);
+      if (io_s.ok()) {
         descriptor_file->SetPreallocationBlockSize(
             db_options_->manifest_preallocation_size);
 
@@ -3958,7 +3957,10 @@ Status VersionSet::ProcessManifestWrites(
             nullptr, db_options_->listeners));
         descriptor_log_.reset(
             new log::Writer(std::move(file_writer), 0, false));
-        s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
+        s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get(),
+                                        io_s);
+      } else {
+        s = io_s;
       }
     }
 
@@ -3994,7 +3996,6 @@ Status VersionSet::ProcessManifestWrites(
 #endif /* !NDEBUG */
         io_s = descriptor_log_->AddRecord(record);
         if (!io_s.ok()) {
-          io_status_ = io_s;
           s = io_s;
           break;
         }
@@ -4005,12 +4006,9 @@ Status VersionSet::ProcessManifestWrites(
             "VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
       }
       if (!io_s.ok()) {
-        io_status_ = io_s;
         s = io_s;
         ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
                         s.ToString().c_str());
-      } else if (io_status_.IsIOError()) {
-        io_status_ = io_s;
       }
     }
 
@@ -4020,10 +4018,7 @@ Status VersionSet::ProcessManifestWrites(
       io_s = SetCurrentFile(fs_, dbname_, pending_manifest_file_number_,
                             db_directory);
       if (!io_s.ok()) {
-        io_status_ = io_s;
         s = io_s;
-      } else if (io_status_.IsIOError()) {
-        io_status_ = io_s;
       }
       TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
     }
@@ -4044,6 +4039,14 @@ Status VersionSet::ProcessManifestWrites(
     mu->Lock();
   }
 
+  if (!io_s.ok()) {
+    if (io_status_.ok()) {
+      io_status_ = io_s;
+    }
+  } else if (!io_status_.ok()) {
+    io_status_ = io_s;
+  }
+
   // Append the old manifest file to the obsolete_manifest_ list to be deleted
   // by PurgeObsoleteFiles later.
   if (s.ok() && new_descriptor_log) {
@@ -5297,7 +5300,7 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
 
 Status VersionSet::WriteCurrentStateToManifest(
     const std::unordered_map<uint32_t, MutableCFState>& curr_state,
-    log::Writer* log) {
+    log::Writer* log, IOStatus& io_s) {
   // TODO: Break up into multiple records to reduce memory usage on recovery?
 
   // WARNING: This method doesn't hold a mutex!!
@@ -5306,6 +5309,7 @@ Status VersionSet::WriteCurrentStateToManifest(
   // LogAndApply. Column family manipulations can only happen within LogAndApply
   // (the same single thread), so we're safe to iterate.
 
+  assert(io_s.ok());
   if (db_options_->write_dbid_to_manifest) {
     VersionEdit edit_for_db_id;
     assert(!db_id_.empty());
@@ -5315,10 +5319,9 @@ Status VersionSet::WriteCurrentStateToManifest(
       return Status::Corruption("Unable to Encode VersionEdit:" +
                                 edit_for_db_id.DebugString(true));
     }
-    IOStatus io_s = log->AddRecord(db_id_record);
+    io_s = log->AddRecord(db_id_record);
     if (!io_s.ok()) {
-      io_status_ = io_s;
-      return std::move(io_s);
+      return io_s;
     }
   }
 
@@ -5345,10 +5348,9 @@ Status VersionSet::WriteCurrentStateToManifest(
         return Status::Corruption(
             "Unable to Encode VersionEdit:" + edit.DebugString(true));
       }
-      IOStatus io_s = log->AddRecord(record);
+      io_s = log->AddRecord(record);
       if (!io_s.ok()) {
-        io_status_ = io_s;
-        return std::move(io_s);
+        return io_s;
       }
     }
 
@@ -5398,10 +5400,9 @@ Status VersionSet::WriteCurrentStateToManifest(
         return Status::Corruption(
             "Unable to Encode VersionEdit:" + edit.DebugString(true));
       }
-      IOStatus io_s = log->AddRecord(record);
+      io_s = log->AddRecord(record);
       if (!io_s.ok()) {
-        io_status_ = io_s;
-        return std::move(io_s);
+        return io_s;
       }
     }
   }
index 16661e097731ab5ca91a53474aa7493cb332713d..829a7d0cf885bc7eb8f7a533fddf2a34bf1a34f3 100644 (file)
@@ -1159,10 +1159,7 @@ class VersionSet {
   static uint64_t GetTotalSstFilesSize(Version* dummy_versions);
 
   // Get the IO Status returned by written Manifest.
-  IOStatus io_status() const { return io_status_; }
-
-  // Set the IO Status to OK. Called before Manifest write if needed.
-  void SetIOStatus(const IOStatus& s) { io_status_ = s; }
+  const IOStatus& io_status() const { return io_status_; }
 
  protected:
   using VersionBuilderMap =
@@ -1205,7 +1202,7 @@ class VersionSet {
   // Save current contents to *log
   Status WriteCurrentStateToManifest(
       const std::unordered_map<uint32_t, MutableCFState>& curr_state,
-      log::Writer* log);
+      log::Writer* log, IOStatus& io_s);
 
   void AppendVersion(ColumnFamilyData* column_family_data, Version* v);