]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
support bulk loading with universal compaction
authorAaron Gao <gzh@fb.com>
Wed, 26 Apr 2017 20:28:39 +0000 (13:28 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Wed, 26 Apr 2017 20:41:32 +0000 (13:41 -0700)
Summary:
Support buck load with universal compaction.
More test cases to be added.
Closes https://github.com/facebook/rocksdb/pull/2202

Differential Revision: D4935360

Pulled By: lightmark

fbshipit-source-id: cc3ca1b6f42faa503207dab1408d6bcf393ee5b5

db/compaction_picker_universal.cc
db/db_impl.cc
db/external_sst_file_basic_test.cc
db/external_sst_file_ingestion_job.cc
db/external_sst_file_ingestion_job.h
db/external_sst_file_test.cc

index 0c61efe861acc7a95c5d9cfcb4b106fbfbf04763..a14ea64737f442ea6bd078d8d8b817f52e27a1d5 100644 (file)
@@ -342,8 +342,6 @@ Compaction* UniversalCompactionPicker::PickCompaction(
       assert(f->smallest_seqno <= f->largest_seqno);
       if (is_first) {
         is_first = false;
-      } else {
-        assert(prev_smallest_seqno > f->largest_seqno);
       }
       prev_smallest_seqno = f->smallest_seqno;
     }
index fd934bc3f32e4a7704083ca0fc628f2ff5f5eae1..19046f3fd57423e4f9727cce068dda2b28165adf 100644 (file)
@@ -2532,7 +2532,8 @@ Status DBImpl::IngestExternalFile(
     if (status.ok()) {
       bool need_flush = false;
       status = ingestion_job.NeedsFlush(&need_flush);
-
+      TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
+                               &need_flush);
       if (status.ok() && need_flush) {
         mutex_.Unlock();
         status = FlushMemTable(cfd, FlushOptions(), true /* writes_stopped */);
index 9c6e08e7a3831554984effc2fb10147fd1fbc4b3..66a0220be7de29d560eb4ed26d0155e4e4c24195 100644 (file)
@@ -182,92 +182,94 @@ TEST_F(ExternalSSTFileBasicTest, NoCopy) {
 }
 
 TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) {
-  Options options = CurrentOptions();
-  DestroyAndReopen(options);
-  std::map<std::string, std::string> true_data;
-
-  int file_id = 1;
-
-  ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2, 3, 4, 5, 6}, file_id++,
-                                       &true_data));
-  // File dont overwrite any keys, No seqno needed
-  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
-
-  ASSERT_OK(GenerateAndAddExternalFile(options, {10, 11, 12, 13}, file_id++,
-                                       &true_data));
-  // File dont overwrite any keys, No seqno needed
-  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
-
-  ASSERT_OK(
-      GenerateAndAddExternalFile(options, {1, 4, 6}, file_id++, &true_data));
-  // File overwrite some keys, a seqno will be assigned
-  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1);
-
-  ASSERT_OK(
-      GenerateAndAddExternalFile(options, {11, 15, 19}, file_id++, &true_data));
-  // File overwrite some keys, a seqno will be assigned
-  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
-
-  ASSERT_OK(
-      GenerateAndAddExternalFile(options, {120, 130}, file_id++, &true_data));
-  // File dont overwrite any keys, No seqno needed
-  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
-
-  ASSERT_OK(
-      GenerateAndAddExternalFile(options, {1, 130}, file_id++, &true_data));
-  // File overwrite some keys, a seqno will be assigned
-  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3);
-
-  // Write some keys through normal write path
-  for (int i = 0; i < 50; i++) {
-    ASSERT_OK(Put(Key(i), "memtable"));
-    true_data[Key(i)] = "memtable";
-  }
-  SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
-
-  ASSERT_OK(
-      GenerateAndAddExternalFile(options, {60, 61, 62}, file_id++, &true_data));
-  // File dont overwrite any keys, No seqno needed
-  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
-
-  ASSERT_OK(
-      GenerateAndAddExternalFile(options, {40, 41, 42}, file_id++, &true_data));
-  // File overwrite some keys, a seqno will be assigned
-  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1);
-
-  ASSERT_OK(
-      GenerateAndAddExternalFile(options, {20, 30, 40}, file_id++, &true_data));
-  // File overwrite some keys, a seqno will be assigned
-  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2);
-
-  const Snapshot* snapshot = db_->GetSnapshot();
-
-  // We will need a seqno for the file regardless if the file overwrite
-  // keys in the DB or not because we have a snapshot
-  ASSERT_OK(
-      GenerateAndAddExternalFile(options, {1000, 1002}, file_id++, &true_data));
-  // A global seqno will be assigned anyway because of the snapshot
-  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3);
-
-  ASSERT_OK(
-      GenerateAndAddExternalFile(options, {2000, 3002}, file_id++, &true_data));
-  // A global seqno will be assigned anyway because of the snapshot
-  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4);
-
-  ASSERT_OK(GenerateAndAddExternalFile(options, {1, 20, 40, 100, 150},
-                                       file_id++, &true_data));
-  // A global seqno will be assigned anyway because of the snapshot
-  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
-
-  db_->ReleaseSnapshot(snapshot);
-
-  ASSERT_OK(
-      GenerateAndAddExternalFile(options, {5000, 5001}, file_id++, &true_data));
-  // No snapshot anymore, no need to assign a seqno
-  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
-
-  size_t kcnt = 0;
-  VerifyDBFromMap(true_data, &kcnt, false);
+  do {
+    Options options = CurrentOptions();
+    DestroyAndReopen(options);
+    std::map<std::string, std::string> true_data;
+
+    int file_id = 1;
+
+    ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2, 3, 4, 5, 6}, file_id++,
+                                         &true_data));
+    // File dont overwrite any keys, No seqno needed
+    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
+
+    ASSERT_OK(GenerateAndAddExternalFile(options, {10, 11, 12, 13}, file_id++,
+                                         &true_data));
+    // File dont overwrite any keys, No seqno needed
+    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
+
+    ASSERT_OK(
+        GenerateAndAddExternalFile(options, {1, 4, 6}, file_id++, &true_data));
+    // File overwrite some keys, a seqno will be assigned
+    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1);
+
+    ASSERT_OK(GenerateAndAddExternalFile(options, {11, 15, 19}, file_id++,
+                                         &true_data));
+    // File overwrite some keys, a seqno will be assigned
+    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
+
+    ASSERT_OK(
+        GenerateAndAddExternalFile(options, {120, 130}, file_id++, &true_data));
+    // File dont overwrite any keys, No seqno needed
+    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
+
+    ASSERT_OK(
+        GenerateAndAddExternalFile(options, {1, 130}, file_id++, &true_data));
+    // File overwrite some keys, a seqno will be assigned
+    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3);
+
+    // Write some keys through normal write path
+    for (int i = 0; i < 50; i++) {
+      ASSERT_OK(Put(Key(i), "memtable"));
+      true_data[Key(i)] = "memtable";
+    }
+    SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
+
+    ASSERT_OK(GenerateAndAddExternalFile(options, {60, 61, 62}, file_id++,
+                                         &true_data));
+    // File dont overwrite any keys, No seqno needed
+    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
+
+    ASSERT_OK(GenerateAndAddExternalFile(options, {40, 41, 42}, file_id++,
+                                         &true_data));
+    // File overwrite some keys, a seqno will be assigned
+    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1);
+
+    ASSERT_OK(GenerateAndAddExternalFile(options, {20, 30, 40}, file_id++,
+                                         &true_data));
+    // File overwrite some keys, a seqno will be assigned
+    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2);
+
+    const Snapshot* snapshot = db_->GetSnapshot();
+
+    // We will need a seqno for the file regardless if the file overwrite
+    // keys in the DB or not because we have a snapshot
+    ASSERT_OK(GenerateAndAddExternalFile(options, {1000, 1002}, file_id++,
+                                         &true_data));
+    // A global seqno will be assigned anyway because of the snapshot
+    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3);
+
+    ASSERT_OK(GenerateAndAddExternalFile(options, {2000, 3002}, file_id++,
+                                         &true_data));
+    // A global seqno will be assigned anyway because of the snapshot
+    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4);
+
+    ASSERT_OK(GenerateAndAddExternalFile(options, {1, 20, 40, 100, 150},
+                                         file_id++, &true_data));
+    // A global seqno will be assigned anyway because of the snapshot
+    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
+
+    db_->ReleaseSnapshot(snapshot);
+
+    ASSERT_OK(GenerateAndAddExternalFile(options, {5000, 5001}, file_id++,
+                                         &true_data));
+    // No snapshot anymore, no need to assign a seqno
+    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
+
+    size_t kcnt = 0;
+    VerifyDBFromMap(true_data, &kcnt, false);
+  } while (ChangeCompactOptions());
 }
 
 TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) {
index 28ccc7dc71f4a5fb9c4b5e192fcf58fb140a4d71..9a6b3b29d351c73df738f74895f2af0bf9d47c58 100644 (file)
@@ -156,34 +156,34 @@ Status ExternalSstFileIngestionJob::Run() {
 
   bool consumed_seqno = false;
   bool force_global_seqno = false;
-  const SequenceNumber last_seqno = versions_->LastSequence();
+
   if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) {
     // We need to assign a global sequence number to all the files even
     // if the dont overlap with any ranges since we have snapshots
     force_global_seqno = true;
   }
-
+  const SequenceNumber last_seqno = versions_->LastSequence();
   SuperVersion* super_version = cfd_->GetSuperVersion();
   edit_.SetColumnFamily(cfd_->GetID());
   // The levels that the files will be ingested into
+
   for (IngestedFileInfo& f : files_to_ingest_) {
-    bool overlap_with_db = false;
-    status = AssignLevelForIngestedFile(super_version, &f, &overlap_with_db);
+    SequenceNumber assigned_seqno = 0;
+    status = AssignLevelAndSeqnoForIngestedFile(
+        super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
+        &f, &assigned_seqno);
     if (!status.ok()) {
       return status;
     }
-
-    if (overlap_with_db || force_global_seqno) {
-      status = AssignGlobalSeqnoForIngestedFile(&f, last_seqno + 1);
+    status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
+    TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
+                             &assigned_seqno);
+    if (assigned_seqno == last_seqno + 1) {
       consumed_seqno = true;
-    } else {
-      status = AssignGlobalSeqnoForIngestedFile(&f, 0);
     }
-
     if (!status.ok()) {
       return status;
     }
-
     edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
                   f.fd.GetFileSize(), f.smallest_internal_key(),
                   f.largest_internal_key(), f.assigned_seqno, f.assigned_seqno,
@@ -388,15 +388,25 @@ Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables(
   return status;
 }
 
-Status ExternalSstFileIngestionJob::AssignLevelForIngestedFile(
-    SuperVersion* sv, IngestedFileInfo* file_to_ingest, bool* overlap_with_db) {
-  *overlap_with_db = false;
+Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
+    SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
+    IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) {
+  Status status;
+  *assigned_seqno = 0;
+  const SequenceNumber last_seqno = versions_->LastSequence();
+  if (force_global_seqno) {
+    *assigned_seqno = last_seqno + 1;
+    if (compaction_style == kCompactionStyleUniversal) {
+      file_to_ingest->picked_level = 0;
+      return status;
+    }
+  }
 
+  bool overlap_with_db = false;
   Arena arena;
   ReadOptions ro;
   ro.total_order_seek = true;
 
-  Status status;
   int target_level = 0;
   auto* vstorage = cfd_->current()->storage_info();
   for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) {
@@ -423,24 +433,46 @@ Status ExternalSstFileIngestionJob::AssignLevelForIngestedFile(
       if (!status.ok()) {
         return status;
       }
-
       if (overlap_with_level) {
         // We must use L0 or any level higher than `lvl` to be able to overwrite
         // the keys that we overlap with in this level, We also need to assign
         // this file a seqno to overwrite the existing keys in level `lvl`
-        *overlap_with_db = true;
+        overlap_with_db = true;
         break;
       }
+
+      if (compaction_style == kCompactionStyleUniversal && lvl != 0) {
+        const std::vector<FileMetaData*>& level_files =
+            vstorage->LevelFiles(lvl);
+        const SequenceNumber level_largest_seqno =
+            (*max_element(level_files.begin(), level_files.end(),
+                          [](FileMetaData* f1, FileMetaData* f2) {
+                            return f1->largest_seqno < f2->largest_seqno;
+                          }))
+                ->largest_seqno;
+        if (level_largest_seqno != 0) {
+          *assigned_seqno = level_largest_seqno;
+        } else {
+          continue;
+        }
+      }
+    } else if (compaction_style == kCompactionStyleUniversal) {
+      continue;
     }
 
     // We dont overlap with any keys in this level, but we still need to check
     // if our file can fit in it
-
     if (IngestedFileFitInLevel(file_to_ingest, lvl)) {
       target_level = lvl;
     }
   }
+ TEST_SYNC_POINT_CALLBACK(
+      "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
+      &overlap_with_db);
   file_to_ingest->picked_level = target_level;
+  if (overlap_with_db && *assigned_seqno == 0) {
+    *assigned_seqno = last_seqno + 1;
+  }
   return status;
 }
 
index 9efa19e9dc160c95df7a829b6df82677afc0011f..20892bed51c6afb76ca3b3974262f7d1a276f664 100644 (file)
@@ -114,12 +114,14 @@ class ExternalSstFileIngestionJob {
   // REQUIRES: Mutex held
   Status IngestedFilesOverlapWithMemtables(SuperVersion* sv, bool* overlap);
 
-  // Assign `file_to_ingest` the lowest possible level that it can
-  // be ingested to.
+  // Assign `file_to_ingest` the appropriate sequence number and  the lowest
+  // possible level that it can be ingested to according to compaction_style.
   // REQUIRES: Mutex held
-  Status AssignLevelForIngestedFile(SuperVersion* sv,
-                                    IngestedFileInfo* file_to_ingest,
-                                    bool* overlap_with_db);
+  Status AssignLevelAndSeqnoForIngestedFile(SuperVersion* sv,
+                                            bool force_global_seqno,
+                                            CompactionStyle compaction_style,
+                                            IngestedFileInfo* file_to_ingest,
+                                            SequenceNumber* assigned_seqno);
 
   // Set the file global sequence number to `seqno`
   Status AssignGlobalSeqnoForIngestedFile(IngestedFileInfo* file_to_ingest,
index 4916c04d36a94e1052ff834b9c8ba784cd4ccedd..e88b62e648a879d526fb25928579e52e3cb97818 100644 (file)
@@ -315,8 +315,7 @@ TEST_F(ExternalSSTFileTest, Basic) {
       ASSERT_EQ(Get(Key(k)), value);
     }
     DestroyAndRecreateExternalSSTFilesDir();
-  } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
-                         kSkipFIFOCompaction));
+  } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
 }
 class SstFileWriterCollector : public TablePropertiesCollector {
  public:
@@ -556,8 +555,7 @@ TEST_F(ExternalSSTFileTest, AddList) {
       ASSERT_EQ(Get(Key(k)), value);
     }
     DestroyAndRecreateExternalSSTFilesDir();
-  } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
-                         kSkipFIFOCompaction));
+  } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
 }
 
 TEST_F(ExternalSSTFileTest, AddListAtomicity) {
@@ -599,8 +597,7 @@ TEST_F(ExternalSSTFileTest, AddListAtomicity) {
       ASSERT_EQ(Get(Key(k)), value);
     }
     DestroyAndRecreateExternalSSTFilesDir();
-  } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
-                         kSkipFIFOCompaction));
+  } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
 }
 // This test reporduce a bug that can happen in some cases if the DB started
 // purging obsolete files when we are adding an external sst file.
@@ -831,12 +828,31 @@ TEST_F(ExternalSSTFileTest, MultiThreaded) {
 
     fprintf(stderr, "Verified %d values\n", num_files * keys_per_file);
     DestroyAndRecreateExternalSSTFilesDir();
-  } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
-                         kSkipFIFOCompaction));
+  } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
 }
 
 TEST_F(ExternalSSTFileTest, OverlappingRanges) {
   Random rnd(301);
+  int picked_level = 0;
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+    "ExternalSstFileIngestionJob::Run", [&picked_level](void* arg) {
+      ASSERT_TRUE(arg != nullptr);
+      picked_level = *(static_cast<int*>(arg));
+    });
+  bool need_flush = false;
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+    "DBImpl::IngestExternalFile:NeedFlush", [&need_flush](void* arg) {
+      ASSERT_TRUE(arg != nullptr);
+      need_flush = *(static_cast<bool*>(arg));
+    });
+  bool overlap_with_db = false;
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
+      [&overlap_with_db](void* arg) {
+        ASSERT_TRUE(arg != nullptr);
+        overlap_with_db = *(static_cast<bool*>(arg));
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
   do {
     Options options = CurrentOptions();
     DestroyAndReopen(options);
@@ -889,15 +905,27 @@ TEST_F(ExternalSSTFileTest, OverlappingRanges) {
 
         // Insert the generated file
         s = DeprecatedAddFile({file_name});
-
         auto it = true_data.lower_bound(Key(range_start));
-        if (it != true_data.end() && it->first <= Key(range_end)) {
-          // This range overlap with data already exist in DB
-          ASSERT_NOK(s);
-          failed_add_file++;
+        if (option_config_ != kUniversalCompaction &&
+            option_config_ != kUniversalCompactionMultiLevel) {
+          if (it != true_data.end() && it->first <= Key(range_end)) {
+            // This range overlap with data already exist in DB
+            ASSERT_NOK(s);
+            failed_add_file++;
+          } else {
+            ASSERT_OK(s);
+            success_add_file++;
+          }
         } else {
-          ASSERT_OK(s);
-          success_add_file++;
+          if ((it != true_data.end() && it->first <= Key(range_end)) ||
+              need_flush || picked_level > 0 || overlap_with_db) {
+            // This range overlap with data already exist in DB
+            ASSERT_NOK(s);
+            failed_add_file++;
+          } else {
+            ASSERT_OK(s);
+            success_add_file++;
+          }
         }
       }
 
@@ -930,8 +958,7 @@ TEST_F(ExternalSSTFileTest, OverlappingRanges) {
     }
     printf("keys/values verified\n");
     DestroyAndRecreateExternalSSTFilesDir();
-  } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
-                         kSkipFIFOCompaction));
+  } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
 }
 
 TEST_F(ExternalSSTFileTest, PickedLevel) {