]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
delete superversions in BackgroundCallPurge (#6146)
author: 解轶伦 <xylnasos@qq.com>
Tue, 17 Dec 2019 21:20:42 +0000 (13:20 -0800)
committer: Yanqin Jin <yanqin@fb.com>
Thu, 2 Jan 2020 20:20:29 +0000 (12:20 -0800)
Summary:
I found that CleanupSuperVersion() may block Get() for 30ms or more (each MemTable is 256MB).

Then I found that the "delete sv" call, which runs ~SuperVersion(), is what takes the time.

The backtrace looks like this:

DBImpl::GetImpl() -> DBImpl::ReturnAndCleanupSuperVersion() ->
DBImpl::CleanupSuperVersion() : delete sv; -> ~SuperVersion()

I think it's better to do the delete in a background thread. Please review it.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6146

Differential Revision: D18972066

fbshipit-source-id: 0f7b0b70b9bb1e27ad6fc1c8a408fbbf237ae08c

db/arena_wrapped_db_iter.cc
db/column_family.cc
db/column_family.h
db/db_impl/db_impl.cc
db/db_impl/db_impl.h
db/db_impl/db_impl_compaction_flush.cc
db/db_impl/db_impl_secondary.cc
db/db_test2.cc
db/forward_iterator.cc
include/rocksdb/options.h

index 840c20e9e4c55ae06ba1e3a9bd8dca0a5f117983..c2de8db9e53a6bd845fe6e508217dddaf872961c 100644 (file)
@@ -64,7 +64,7 @@ Status ArenaWrappedDBIter::Refresh() {
     arena_.~Arena();
     new (&arena_) Arena();
 
-    SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
+    SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
     if (read_callback_) {
       read_callback_->Refresh(latest_seq);
     }
index f66759818e873d1528d30f44eb3987cbde3639af..fd87dc287702c6cdf6ff2ef087fc22532df7106e 100644 (file)
@@ -1080,9 +1080,8 @@ Compaction* ColumnFamilyData::CompactRange(
   return result;
 }
 
-SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
-    InstrumentedMutex* db_mutex) {
-  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
+SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
+  SuperVersion* sv = GetThreadLocalSuperVersion(db);
   sv->Ref();
   if (!ReturnThreadLocalSuperVersion(sv)) {
     // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
@@ -1094,8 +1093,7 @@ SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
   return sv;
 }
 
-SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
-    InstrumentedMutex* db_mutex) {
+SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
   // The SuperVersion is cached in thread local storage to avoid acquiring
   // mutex when SuperVersion does not change since the last use. When a new
   // SuperVersion is installed, the compaction or flush thread cleans up
@@ -1122,16 +1120,21 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
 
     if (sv && sv->Unref()) {
       RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
-      db_mutex->Lock();
+      db->mutex()->Lock();
       // NOTE: underlying resources held by superversion (sst files) might
       // not be released until the next background job.
       sv->Cleanup();
-      sv_to_delete = sv;
+      if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
+        db->AddSuperVersionsToFreeQueue(sv);
+        db->SchedulePurge();
+      } else {
+        sv_to_delete = sv;
+      }
     } else {
-      db_mutex->Lock();
+      db->mutex()->Lock();
     }
     sv = super_version_->Ref();
-    db_mutex->Unlock();
+    db->mutex()->Unlock();
 
     delete sv_to_delete;
   }
index 135504ea21bd847f4c655f44339a7a36bf40c373..a6277d2156b2613ae98c489d43ed6974afa4cd37 100644 (file)
@@ -430,11 +430,11 @@ class ColumnFamilyData {
   SuperVersion* GetSuperVersion() { return super_version_; }
   // thread-safe
   // Return a already referenced SuperVersion to be used safely.
-  SuperVersion* GetReferencedSuperVersion(InstrumentedMutex* db_mutex);
+  SuperVersion* GetReferencedSuperVersion(DBImpl* db);
   // thread-safe
   // Get SuperVersion stored in thread local storage. If it does not exist,
   // get a reference from a current SuperVersion.
-  SuperVersion* GetThreadLocalSuperVersion(InstrumentedMutex* db_mutex);
+  SuperVersion* GetThreadLocalSuperVersion(DBImpl* db);
   // Try to return SuperVersion back to thread local storage. Retrun true on
   // success and false on failure. It fails when the thread local storage
   // contains anything other than SuperVersion::kSVInUse flag.
index ee73cc3fd750292830c77f8567c4d62cc0f51e86..91984736dcdac872f008e0728727bad8b9a9e5b4 100644 (file)
@@ -879,7 +879,7 @@ Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
           column_family);
   ColumnFamilyData* cfd = cfh->cfd();
 
-  SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+  SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
   Version* version = super_version->current;
 
   Status s =
@@ -895,7 +895,6 @@ void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
       AddToLogsToFreeQueue(l);
     }
     job_context->logs_to_free.clear();
-    SchedulePurge();
   }
 }
 
@@ -1322,6 +1321,14 @@ void DBImpl::BackgroundCallPurge() {
     delete log_writer;
     mutex_.Lock();
   }
+  while (!superversions_to_free_queue_.empty()) {
+    assert(!superversions_to_free_queue_.empty());
+    SuperVersion* sv = superversions_to_free_queue_.front();
+    superversions_to_free_queue_.pop_front();
+    mutex_.Unlock();
+    delete sv;
+    mutex_.Lock();
+  }
   for (const auto& file : purge_files_) {
     const PurgeFileInfo& purge_file = file.second;
     const std::string& fname = purge_file.fname;
@@ -1374,10 +1381,14 @@ static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
     state->db->FindObsoleteFiles(&job_context, false, true);
     if (state->background_purge) {
       state->db->ScheduleBgLogWriterClose(&job_context);
+      state->db->AddSuperVersionsToFreeQueue(state->super_version);
+      state->db->SchedulePurge();
     }
     state->mu->Unlock();
 
-    delete state->super_version;
+    if (!state->background_purge) {
+      delete state->super_version;
+    }
     if (job_context.HaveSomethingToDelete()) {
       if (state->background_purge) {
         // PurgeObsoleteFiles here does not delete files. Instead, it adds the
@@ -2452,7 +2463,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
     result = nullptr;
 
 #else
-    SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
+    SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
     auto iter = new ForwardIterator(this, read_options, cfd, sv);
     result = NewDBIterator(
         env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
@@ -2478,7 +2489,7 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
                                             ReadCallback* read_callback,
                                             bool allow_blob,
                                             bool allow_refresh) {
-  SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
+  SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
 
   // Try to generate a DB iterator tree in continuous memory area to be
   // cache friendly. Here is an example of result:
@@ -2557,7 +2568,7 @@ Status DBImpl::NewIterators(
 #else
     for (auto cfh : column_families) {
       auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
-      SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
+      SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
       auto iter = new ForwardIterator(this, read_options, cfd, sv);
       iterators->push_back(NewDBIterator(
           env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
@@ -2884,7 +2895,7 @@ bool DBImpl::GetAggregatedIntProperty(const Slice& property,
 
 SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
   // TODO(ljin): consider using GetReferencedSuperVersion() directly
-  return cfd->GetThreadLocalSuperVersion(&mutex_);
+  return cfd->GetThreadLocalSuperVersion(this);
 }
 
 // REQUIRED: this function should only be called on the write thread or if the
@@ -2902,11 +2913,19 @@ SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
 void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
   // Release SuperVersion
   if (sv->Unref()) {
+    bool defer_purge =
+            immutable_db_options().avoid_unnecessary_blocking_io;
     {
       InstrumentedMutexLock l(&mutex_);
       sv->Cleanup();
+      if (defer_purge) {
+        AddSuperVersionsToFreeQueue(sv);
+        SchedulePurge();
+      }
+    }
+    if (!defer_purge) {
+      delete sv;
     }
-    delete sv;
     RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
   }
   RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
@@ -3912,7 +3931,7 @@ Status DBImpl::IngestExternalFiles(
     start_file_number += args[i - 1].external_files.size();
     auto* cfd =
         static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
-    SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+    SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
     exec_results[i].second = ingestion_jobs[i].Prepare(
         args[i].external_files, start_file_number, super_version);
     exec_results[i].first = true;
@@ -3923,7 +3942,7 @@ Status DBImpl::IngestExternalFiles(
   {
     auto* cfd =
         static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
-    SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+    SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
     exec_results[0].second = ingestion_jobs[0].Prepare(
         args[0].external_files, next_file_number, super_version);
     exec_results[0].first = true;
@@ -4192,7 +4211,7 @@ Status DBImpl::CreateColumnFamilyWithImport(
   dummy_sv_ctx.Clean();
 
   if (status.ok()) {
-    SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
+    SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
     status = import_job.Prepare(next_file_number, sv);
     CleanupSuperVersion(sv);
   }
@@ -4269,7 +4288,7 @@ Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
   }
   std::vector<SuperVersion*> sv_list;
   for (auto cfd : cfd_list) {
-    sv_list.push_back(cfd->GetReferencedSuperVersion(&mutex_));
+    sv_list.push_back(cfd->GetReferencedSuperVersion(this));
   }
   for (auto& sv : sv_list) {
     VersionStorageInfo* vstorage = sv->current->storage_info();
@@ -4294,14 +4313,23 @@ Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
       break;
     }
   }
+  bool defer_purge =
+          immutable_db_options().avoid_unnecessary_blocking_io;
   {
     InstrumentedMutexLock l(&mutex_);
     for (auto sv : sv_list) {
       if (sv && sv->Unref()) {
         sv->Cleanup();
-        delete sv;
+        if (defer_purge) {
+          AddSuperVersionsToFreeQueue(sv);
+        } else {
+          delete sv;
+        }
       }
     }
+    if (defer_purge) {
+      SchedulePurge();
+    }
     for (auto cfd : cfd_list) {
       cfd->Unref();
     }
index 67a81259da9122916e1370d40e09e87892fc946e..478a9253286a7de2d145fb9e3b32bac5476d8ef7 100644 (file)
@@ -798,6 +798,10 @@ class DBImpl : public DB {
     logs_to_free_queue_.push_back(log_writer);
   }
 
+  void AddSuperVersionsToFreeQueue(SuperVersion* sv) {
+    superversions_to_free_queue_.push_back(sv);
+  }
+
   void SetSnapshotChecker(SnapshotChecker* snapshot_checker);
 
   // Fill JobContext with snapshot information needed by flush and compaction.
@@ -1890,6 +1894,7 @@ class DBImpl : public DB {
 
   // A queue to store log writers to close
   std::deque<log::Writer*> logs_to_free_queue_;
+  std::deque<SuperVersion*> superversions_to_free_queue_;
   int unscheduled_flushes_;
   int unscheduled_compactions_;
 
index b01fdbc965c48be32026e0a6935f7aa511c48ac9..f399b97d119e96d5b9487e1fed45556003a011fa 100644 (file)
@@ -659,7 +659,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
     // one/both sides of the interval are unbounded. But it requires more
     // changes to RangesOverlapWithMemtables.
     Range range(*begin, *end);
-    SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+    SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
     cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
     CleanupSuperVersion(super_version);
   }
index a113019a9283196dc055c075e34b50ef5618f7ac..4c5b4e7946839a5b64208aeef22ffee3caee9bf6 100644 (file)
@@ -406,7 +406,7 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
     const ReadOptions& read_options, ColumnFamilyData* cfd,
     SequenceNumber snapshot, ReadCallback* read_callback) {
   assert(nullptr != cfd);
-  SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+  SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
   auto db_iter = NewArenaWrappedDbIterator(
       env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
       snapshot,
index 6b0ee157e4c95984c85e93e60685372209a73a47..b30dac6b5258000c95d7506af751b5550a9fc5a7 100644 (file)
@@ -4210,6 +4210,42 @@ TEST_F(DBTest2, SeekFileRangeDeleteTail) {
   }
   db_->ReleaseSnapshot(s1);
 }
+
+TEST_F(DBTest2, BackgroundPurgeTest) {
+  Options options = CurrentOptions();
+  options.write_buffer_manager = std::make_shared<rocksdb::WriteBufferManager>(1 << 20);
+  options.avoid_unnecessary_blocking_io = true;
+  DestroyAndReopen(options);
+  size_t base_value = options.write_buffer_manager->memory_usage();
+
+  ASSERT_OK(Put("a", "a"));
+  Iterator* iter = db_->NewIterator(ReadOptions());
+  ASSERT_OK(Flush());
+  size_t value = options.write_buffer_manager->memory_usage();
+  ASSERT_GT(value, base_value);
+
+  db_->GetEnv()->SetBackgroundThreads(1, Env::Priority::HIGH);
+  test::SleepingBackgroundTask sleeping_task_after;
+  db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
+                          &sleeping_task_after, Env::Priority::HIGH);
+  delete iter;
+
+  Env::Default()->SleepForMicroseconds(100000);
+  value = options.write_buffer_manager->memory_usage();
+  ASSERT_GT(value, base_value);
+
+  sleeping_task_after.WakeUp();
+  sleeping_task_after.WaitUntilDone();
+
+  test::SleepingBackgroundTask sleeping_task_after2;
+  db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
+                          &sleeping_task_after2, Env::Priority::HIGH);
+  sleeping_task_after2.WakeUp();
+  sleeping_task_after2.WaitUntilDone();
+
+  value = options.write_buffer_manager->memory_usage();
+  ASSERT_EQ(base_value, value);
+}
 }  // namespace rocksdb
 
 #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
index c875008c7695331a3820c03245253ba51112eecf..ae039db03ce19988d40b1fc793b6f6b0a32fe4aa 100644 (file)
@@ -239,9 +239,13 @@ void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
     db->FindObsoleteFiles(&job_context, false, true);
     if (background_purge_on_iterator_cleanup) {
       db->ScheduleBgLogWriterClose(&job_context);
+      db->AddSuperVersionsToFreeQueue(sv);
+      db->SchedulePurge();
     }
     db->mutex_.Unlock();
-    delete sv;
+    if (!background_purge_on_iterator_cleanup) {
+      delete sv;
+    }
     if (job_context.HaveSomethingToDelete()) {
       db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
     }
@@ -614,7 +618,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
   Cleanup(refresh_sv);
   if (refresh_sv) {
     // New
-    sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
+    sv_ = cfd_->GetReferencedSuperVersion(db_);
   }
   ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                        kMaxSequenceNumber /* upper_bound */);
@@ -668,7 +672,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
 void ForwardIterator::RenewIterators() {
   SuperVersion* svnew;
   assert(sv_);
-  svnew = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
+  svnew = cfd_->GetReferencedSuperVersion(db_);
 
   if (mutable_iter_ != nullptr) {
     DeleteIterator(mutable_iter_, true /* is_arena */);
index 624aa5245730d5becc89e39eacb365e70f4fdf08..7bc354bc121ae720eee526687db52ae7dd94243d 100644 (file)
@@ -1079,10 +1079,10 @@ struct DBOptions {
   // independently if the process crashes later and tries to recover.
   bool atomic_flush = false;
 
-  // If true, ColumnFamilyHandle's and Iterator's destructors won't delete
-  // obsolete files directly and will instead schedule a background job
-  // to do it. Use it if you're destroying iterators or ColumnFamilyHandle-s
-  // from latency-sensitive threads.
+  // If true, working thread may avoid doing unnecessary and long-latency
+  // operation (such as deleting obsolete files directly or deleting memtable)
+  // and will instead schedule a background job to do it.
+  // Use it if you're latency-sensitive.
   // If set to true, takes precedence over
   // ReadOptions::background_purge_on_iterator_cleanup.
   bool avoid_unnecessary_blocking_io = false;