]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix blob callback in compaction and atomic flush (#8681)
authorAkanksha Mahajan <akankshamahajan@fb.com>
Fri, 20 Aug 2021 18:37:53 +0000 (11:37 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Fri, 20 Aug 2021 18:41:14 +0000 (11:41 -0700)
Summary:
Pass BlobFileCompletionCallback  in case of atomic flush and
compaction job which is currently nullptr(default parameter).
BlobFileCompletionCallback is used in case of IntegratedBlobDB to report new blob files to
SstFileManager.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8681

Test Plan: CircleCI jobs

Reviewed By: ltamasi

Differential Revision: D30445998

Pulled By: akankshamahajan15

fbshipit-source-id: ba48093843864faec57f1f365cce7b5a569c4021

HISTORY.md
db/db_impl/db_impl_compaction_flush.cc
db/db_sst_test.cc

index aa728f4125914f15f9afb479e40968ed8f8fa22a..a424099e39638f2e50496fab7ff7e39f2c8c826a 100644 (file)
@@ -8,6 +8,7 @@
 * Fixed an issue where `OnFlushCompleted` was not called for atomic flush.
 * Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`.
 * Fixed a potential incorrect result in opt mode and assertion failures caused by releasing snapshot(s) during compaction.
+* Fixed passing of BlobFileCompletionCallback to Compaction job and Atomic flush job which was default paramter (nullptr). BlobFileCompletitionCallback is internal callback that manages addition of blob files to SSTFileManager.
 
 ### New Features
 * Made the EventListener extend the Customizable class.
index df849d5191ae77f9421d94fe04899989544773b9..b1679d75657c8256fe2992c3dc6663a87b1e6ea3 100644 (file)
@@ -411,7 +411,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
         stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
         false /* sync_output_directory */, false /* write_manifest */,
         thread_pri, io_tracer_, db_id_, db_session_id_,
-        cfd->GetFullHistoryTsLow()));
+        cfd->GetFullHistoryTsLow(), &blob_callback_));
   }
 
   std::vector<FileMetaData> file_meta(num_cfs);
@@ -1280,7 +1280,7 @@ Status DBImpl::CompactFilesImpl(
       c->mutable_cf_options()->report_bg_io_stats, dbname_,
       &compaction_job_stats, Env::Priority::USER, io_tracer_,
       &manual_compaction_paused_, nullptr, db_id_, db_session_id_,
-      c->column_family_data()->GetFullHistoryTsLow());
+      c->column_family_data()->GetFullHistoryTsLow(), &blob_callback_);
 
   // Creating a compaction influences the compaction score because the score
   // takes running compactions into account (by skipping files that are already
@@ -3193,7 +3193,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
         &compaction_job_stats, thread_pri, io_tracer_,
         is_manual ? &manual_compaction_paused_ : nullptr,
         is_manual ? manual_compaction->canceled : nullptr, db_id_,
-        db_session_id_, c->column_family_data()->GetFullHistoryTsLow());
+        db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
+        &blob_callback_);
     compaction_job.Prepare();
 
     NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
index 77984758fd60d97ac94bb036e44beb1b9c8af7fb..cba6fb0b7881ae3437fad46153a232e8daada122 100644 (file)
@@ -1569,6 +1569,96 @@ TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
   ASSERT_EQ(total_sst_files_size, 0);
 }
 
+// This test if blob files are recorded by SST File Manager when Compaction job
+// creates/delete them and in case of AtomicFlush.
+TEST_F(DBSSTTest, DBWithSFMForBlobFilesAtomicFlush) {
+  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
+  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
+  Options options = CurrentOptions();
+  options.sst_file_manager = sst_file_manager;
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+  options.disable_auto_compactions = true;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0.5;
+  options.atomic_flush = true;
+
+  int files_added = 0;
+  int files_deleted = 0;
+  int files_scheduled_to_delete = 0;
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::OnAddFile", [&](void* arg) {
+        const std::string* const file_path =
+            static_cast<const std::string*>(arg);
+        if (EndsWith(*file_path, ".blob")) {
+          files_added++;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
+        const std::string* const file_path =
+            static_cast<const std::string*>(arg);
+        if (EndsWith(*file_path, ".blob")) {
+          files_deleted++;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
+        assert(arg);
+        const std::string* const file_path =
+            static_cast<const std::string*>(arg);
+        if (EndsWith(*file_path, ".blob")) {
+          ++files_scheduled_to_delete;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  DestroyAndReopen(options);
+  Random rnd(301);
+
+  ASSERT_OK(Put("key_1", "value_1"));
+  ASSERT_OK(Put("key_2", "value_2"));
+  ASSERT_OK(Put("key_3", "value_3"));
+  ASSERT_OK(Put("key_4", "value_4"));
+  ASSERT_OK(Flush());
+
+  // Overwrite will create the garbage data.
+  ASSERT_OK(Put("key_3", "new_value_3"));
+  ASSERT_OK(Put("key_4", "new_value_4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key5", "blob_value5"));
+  ASSERT_OK(Put("Key6", "blob_value6"));
+  ASSERT_OK(Flush());
+
+  ASSERT_EQ(files_added, 3);
+  ASSERT_EQ(files_deleted, 0);
+  ASSERT_EQ(files_scheduled_to_delete, 0);
+  files_added = 0;
+
+  constexpr Slice* begin = nullptr;
+  constexpr Slice* end = nullptr;
+  // Compaction job will create a new file and delete the older files.
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+
+  ASSERT_EQ(files_added, 1);
+  ASSERT_EQ(files_deleted, 1);
+  ASSERT_EQ(files_scheduled_to_delete, 1);
+
+  Close();
+  ASSERT_OK(DestroyDB(dbname_, options));
+  sfm->WaitForEmptyTrash();
+  ASSERT_EQ(files_deleted, 4);
+  ASSERT_EQ(files_scheduled_to_delete, 4);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 #endif  // ROCKSDB_LITE
 
 }  // namespace ROCKSDB_NAMESPACE