]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
make blob file close synchronous
authorYi Wu <yiwu@fb.com>
Fri, 25 Aug 2017 17:40:25 +0000 (10:40 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Fri, 25 Aug 2017 17:41:49 +0000 (10:41 -0700)
Summary:
Fixing flaky blob_db_test.

To close a blob file, blob db used to add a CloseSeqWrite job to the background thread to close it. Changing file close to be synchronous in order to simplify logic, and fix flaky blob_db_test.
Closes https://github.com/facebook/rocksdb/pull/2787

Differential Revision: D5699387

Pulled By: yiwu-arbug

fbshipit-source-id: dd07a945cd435cd3808fce7ee4ea57817409474a

utilities/blob_db/blob_db_impl.cc
utilities/blob_db/blob_db_impl.h
utilities/blob_db/blob_db_test.cc

index d787529b1dadea82dd49be1f54803be8b424ba40..553f89f2a58336fd387f0306cfb51ebce7f83670 100644 (file)
@@ -891,7 +891,10 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
   }
 
   if (blob_inserter.has_put()) {
-    CloseIf(blob_inserter.last_file());
+    s = CloseBlobFileIfNeeded(blob_inserter.last_file());
+    if (!s.ok()) {
+      return s;
+    }
   }
 
   // add deleted key to list of keys that have been deleted for book-keeping
@@ -1022,7 +1025,9 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options,
     extendTTL(&(bfile->ttl_range_), expiration);
   }
 
-  CloseIf(bfile);
+  if (s.ok()) {
+    s = CloseBlobFileIfNeeded(bfile);
+  }
 
   TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish");
   return s;
@@ -1362,58 +1367,44 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
   return std::make_pair(true, -1);
 }
 
-std::pair<bool, int64_t> BlobDBImpl::CloseSeqWrite(
-    std::shared_ptr<BlobFile> bfile, bool aborted) {
+Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
+  Status s;
+  ROCKS_LOG_INFO(db_options_.info_log, "Close blob file %" PRIu64,
+                 bfile->BlobFileNumber());
   {
     WriteLock wl(&mutex_);
 
-    // this prevents others from picking up this file
-    open_blob_files_.erase(bfile);
-
-    auto findit =
-        std::find(open_simple_files_.begin(), open_simple_files_.end(), bfile);
-    if (findit != open_simple_files_.end()) open_simple_files_.erase(findit);
+    if (bfile->HasTTL()) {
+      size_t erased __attribute__((__unused__)) = open_blob_files_.erase(bfile);
+      assert(erased == 1);
+    } else {
+      auto iter = std::find(open_simple_files_.begin(),
+                            open_simple_files_.end(), bfile);
+      assert(iter != open_simple_files_.end());
+      open_simple_files_.erase(iter);
+    }
   }
 
   if (!bfile->closed_.load()) {
     WriteLock lockbfile_w(&bfile->mutex_);
-    bfile->WriteFooterAndCloseLocked();
+    s = bfile->WriteFooterAndCloseLocked();
   }
 
-  return std::make_pair(false, -1);
-}
-
-void BlobDBImpl::CloseIf(const std::shared_ptr<BlobFile>& bfile) {
-  // atomic read
-  bool close = bfile->GetFileSize() > bdb_options_.blob_file_size;
-  if (!close) return;
-
-  if (debug_level_ >= 2) {
-    ROCKS_LOG_DEBUG(db_options_.info_log,
-                    "Scheduling file for close %s fsize: %" PRIu64
-                    " limit: %" PRIu64,
-                    bfile->PathName().c_str(), bfile->GetFileSize(),
-                    bdb_options_.blob_file_size);
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(db_options_.info_log,
+                    "Failed to close blob file %" PRIu64 "with error: %s",
+                    bfile->BlobFileNumber(), s.ToString().c_str());
   }
 
-  {
-    WriteLock wl(&mutex_);
+  return s;
+}
 
-    open_blob_files_.erase(bfile);
-    auto findit =
-        std::find(open_simple_files_.begin(), open_simple_files_.end(), bfile);
-    if (findit != open_simple_files_.end()) {
-      open_simple_files_.erase(findit);
-    } else {
-      ROCKS_LOG_WARN(db_options_.info_log,
-                     "File not found while closing %s fsize: %" PRIu64
-                     " Multithreaded Writes?",
-                     bfile->PathName().c_str(), bfile->GetFileSize());
-    }
+Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
+  // atomic read
+  if (bfile->GetFileSize() < bdb_options_.blob_file_size) {
+    return Status::OK();
   }
-
-  tqueue_.add(0, std::bind(&BlobDBImpl::CloseSeqWrite, this, bfile,
-                           std::placeholders::_1));
+  return CloseBlobFile(bfile);
 }
 
 bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked(
@@ -1585,7 +1576,7 @@ std::pair<bool, int64_t> BlobDBImpl::CheckSeqFiles(bool aborted) {
   }
 
   for (auto bfile : process_files) {
-    CloseSeqWrite(bfile, false);
+    CloseBlobFile(bfile);
   }
 
   return std::make_pair(true, -1);
@@ -1916,7 +1907,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
     delete transaction;
   }
   ROCKS_LOG_INFO(
-      db_options_.info_log, "%s blob file %" PRIu64 ".",
+      db_options_.info_log,
+      "%s blob file %" PRIu64
       ". Total blob records: %" PRIu64 ", Deletes: %" PRIu64 "/%" PRIu64
       " succeeded, Relocates: %" PRIu64 "/%" PRIu64 " succeeded.",
       s.ok() ? "Successfully garbage collected" : "Failed to garbage collect",
@@ -2334,8 +2326,8 @@ void BlobDBImpl::TEST_DeleteObsoleteFiles() {
   DeleteObsoleteFiles(false /*abort*/);
 }
 
-void BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
-  CloseSeqWrite(bfile, false /*abort*/);
+Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
+  return CloseBlobFile(bfile);
 }
 
 Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
index 9886dbe5b214a22f7497f2affd723d4f4645bc37..e7c49b20d4d1121a52908e77e53275a10f6714af 100644 (file)
@@ -263,7 +263,7 @@ class BlobDBImpl : public BlobDB {
 
   std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
 
-  void TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
+  Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
 
   Status TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
                                  GCStats* gc_stats);
@@ -293,11 +293,6 @@ class BlobDBImpl : public BlobDB {
   // this handler is called.
   void OnFlushBeginHandler(DB* db, const FlushJobInfo& info);
 
-  // timer queue callback to close a file by appending a footer
-  // removes file from open files list
-  std::pair<bool, int64_t> CloseSeqWrite(std::shared_ptr<BlobFile> bfile,
-                                         bool aborted);
-
   // is this file ready for Garbage collection. if the TTL of the file
   // has expired or if threshold of the file has been evicted
   // tt - current time
@@ -308,8 +303,11 @@ class BlobDBImpl : public BlobDB {
   // collect all the blob log files from the blob directory
   Status GetAllLogFiles(std::set<std::pair<uint64_t, std::string>>* file_nums);
 
-  // appends a task into timer queue to close the file
-  void CloseIf(const std::shared_ptr<BlobFile>& bfile);
+  // Close a file by appending a footer, and removes file from open files list.
+  Status CloseBlobFile(std::shared_ptr<BlobFile> bfile);
+
+  // Close a file if its size exceeds blob_file_size
+  Status CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile);
 
   uint64_t ExtractExpiration(const Slice& key, const Slice& value,
                              Slice* value_slice, std::string* new_value);
@@ -470,7 +468,7 @@ class BlobDBImpl : public BlobDB {
   // epoch or version of the open files.
   std::atomic<uint64_t> epoch_of_;
 
-  // typically we keep 4 open blob files (simple i.e. no TTL)
+  // All opened non-TTL blob files.
   std::vector<std::shared_ptr<BlobFile>> open_simple_files_;
 
   // all the blob files which are currently being appended to based
index 8ec01698aa8d3b73ab48921226947a66a29e9dd4..41c1482e7e6f74c9512bf94055e48f3859c45cb0 100644 (file)
@@ -185,7 +185,7 @@ TEST_F(BlobDBTest, PutWithTTL) {
   auto blob_files = bdb_impl->TEST_GetBlobFiles();
   ASSERT_EQ(1, blob_files.size());
   ASSERT_TRUE(blob_files[0]->HasTTL());
-  bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+  ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
   GCStats gc_stats;
   ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
   ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
@@ -214,7 +214,7 @@ TEST_F(BlobDBTest, PutUntil) {
   auto blob_files = bdb_impl->TEST_GetBlobFiles();
   ASSERT_EQ(1, blob_files.size());
   ASSERT_TRUE(blob_files[0]->HasTTL());
-  bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+  ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
   GCStats gc_stats;
   ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
   ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
@@ -246,7 +246,7 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) {
   auto blob_files = bdb_impl->TEST_GetBlobFiles();
   ASSERT_EQ(1, blob_files.size());
   ASSERT_FALSE(blob_files[0]->HasTTL());
-  bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+  ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
   GCStats gc_stats;
   ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
   ASSERT_EQ(0, gc_stats.num_deletes);
@@ -291,7 +291,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) {
   auto blob_files = bdb_impl->TEST_GetBlobFiles();
   ASSERT_EQ(1, blob_files.size());
   ASSERT_TRUE(blob_files[0]->HasTTL());
-  bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+  ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
   GCStats gc_stats;
   ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
   auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
@@ -338,7 +338,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) {
   auto blob_files = bdb_impl->TEST_GetBlobFiles();
   ASSERT_EQ(1, blob_files.size());
   ASSERT_TRUE(blob_files[0]->HasTTL());
-  bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+  ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
   GCStats gc_stats;
   ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
   auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
@@ -395,7 +395,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
   auto blob_files = bdb_impl->TEST_GetBlobFiles();
   ASSERT_EQ(1, blob_files.size());
   ASSERT_TRUE(blob_files[0]->HasTTL());
-  bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+  ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
   GCStats gc_stats;
   ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
   ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
@@ -592,7 +592,7 @@ TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
   }
   auto blob_files = blob_db_impl->TEST_GetBlobFiles();
   ASSERT_EQ(1, blob_files.size());
-  blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
+  ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
   // Test for data in SST
   size_t new_keys = 0;
   for (int i = 0; i < 100; i++) {
@@ -627,7 +627,7 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
       static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
   auto blob_files = blob_db_impl->TEST_GetBlobFiles();
   ASSERT_EQ(1, blob_files.size());
-  blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
+  ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
 
   SyncPoint::GetInstance()->LoadDependency(
       {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate",
@@ -663,7 +663,7 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
       static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
   auto blob_files = blob_db_impl->TEST_GetBlobFiles();
   ASSERT_EQ(1, blob_files.size());
-  blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
+  ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
   mock_env_->set_now_micros(300 * 1000000);
 
   SyncPoint::GetInstance()->LoadDependency(
@@ -708,7 +708,6 @@ TEST_F(BlobDBTest, GCOldestSimpleBlobFileWhenOutOfSpace) {
   ASSERT_EQ(11, blob_files.size());
   ASSERT_TRUE(blob_files[0]->HasTTL());
   ASSERT_TRUE(blob_files[0]->Immutable());
-  blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
   for (int i = 1; i <= 10; i++) {
     ASSERT_FALSE(blob_files[i]->HasTTL());
     if (i < 10) {
@@ -736,7 +735,7 @@ TEST_F(BlobDBTest, ReadWhileGC) {
     ASSERT_EQ(1, blob_files.size());
     std::shared_ptr<BlobFile> bfile = blob_files[0];
     uint64_t bfile_number = bfile->BlobFileNumber();
-    blob_db_impl->TEST_CloseBlobFile(bfile);
+    ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(bfile));
 
     switch (i) {
       case 0: