]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Hotcold: Short circuiting behaviour for file lookups when finding key in hot file
authorKosie van der Merwe <kosie.vandermerwe@gmail.com>
Fri, 15 Feb 2013 20:02:15 +0000 (12:02 -0800)
committerKosie van der Merwe <kosie.vandermerwe@gmail.com>
Fri, 15 Feb 2013 20:02:15 +0000 (12:02 -0800)
Summary:
Before this update both hot and cold file had to be queried in order to get the latest value for a given key. Now if the key is found in the hot file then we know the latest value is in the hot file.

Changed compaction to not allow a user key to have a record with a smaller sequence number in the hot file if there is a record with a larger sequence number in the cold file.

Added short circuiting to `Version::Get()` to assume that if a key is found in a newer file then it has a newer sequence number.

Test Plan:
`make check`

ran `db_bench` compiled with assertions on and in hot cold mode.

Reviewers: vamsi, dhruba

Reviewed By: vamsi

CC: leveldb
Differential Revision: https://reviews.facebook.net/D8451

db/db_impl.cc
db/db_test.cc
db/version_set.cc
db/version_set.h

index 8b0c3484cd8e6f94fd0b0a3ae675885b30ffd30c..14ef2ede8d61b5e89d258e5bcdbf27e2db15ed83 100644 (file)
@@ -1421,6 +1421,12 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
     out.smallest.Clear();
     out.largest.Clear();
     compact->outputs.push_back(out);
+#ifndef NDEBUG
+    if (i > 0) {
+      // Check that file numbers are strictly increasing.
+      assert(out.number > compact->outputs[compact->outputs.size()-2].number);
+    }
+#endif
 
     // Make the output file
     std::string fname = TableFileName(dbname_, file_number);
@@ -1458,6 +1464,10 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
   assert(!compact->outfiles.empty());
   assert(!compact->builders.empty());
 
+#ifndef NDEBUG
+  std::vector<unique_ptr<Iterator>> iters;
+#endif
+
   Status s;
 
   size_t i;
@@ -1499,7 +1509,11 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
                                                  output_number,
                                                  current_bytes);
       s = iter->status();
+#ifndef NDEBUG
+      iters.push_back(unique_ptr<Iterator>(iter));
+#else
       delete iter;
+#endif
       if (s.ok()) {
         Log(options_.info_log,
             "Generated table #%llu: %lld keys, %lld bytes",
@@ -1517,6 +1531,57 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
     }
   }
 
+#ifndef NDEBUG
+  if (s.ok() && is_hotcold_ && iters.size() >= 2) {
+    // Check that for a given user key, that there is no cold record with a
+    // greater sequence number than a record in the hot file.
+    assert(iters.size() == 2); // we should only have 2 files but we may have
+                               // less if one of the files is empty and hence
+                               // gets dropped.
+    assert(iters[0]->status().ok());
+    assert(iters[1]->status().ok());
+
+    Iterator* hot = iters[1].get();
+    Iterator* cold = iters[0].get();
+
+    hot->SeekToFirst();
+    for(hot->SeekToFirst(); hot->Valid(); hot->Next()) {
+      ParsedInternalKey hot_key;
+      if (!ParseInternalKey(hot->key(), &hot_key)) {
+        continue; // skip error keys.
+      }
+
+      // Move the cold iterator to the position past the hot key.
+      cold->Seek(hot->key());
+
+      // Move to the position just before the hot key, skipping error keys.
+      ParsedInternalKey cold_key;
+      if (cold->Valid()) {
+        cold->Prev();
+      } else {
+        assert(cold->status().ok()); // bomb out if we suddenly had an IO error
+        cold->SeekToFirst(); // Assuming !cold->Valid() because hot->key() is
+                             // past the end of cold's last key.
+      }
+      while (cold->Valid() && !ParseInternalKey(cold->key(), &cold_key)) {
+        cold->Prev();
+      }
+
+      assert(cold->status().ok()); // bomb out if we suddenly had an IO error
+      if (!cold->Valid()) {
+        // Went past the start of the cold file iterator, so look at next hot
+        // key.
+        continue;
+      }
+
+      // If the hot user key and the cold user key compare equal then the cold
+      // key has a greater sequence number than the hot key
+      assert(user_comparator()->Compare(hot_key.user_key,
+                                        cold_key.user_key) != 0);
+    }
+  }
+#endif
+
   compact->num_outfiles = 0;
   compact->builders.clear();
   compact->outfiles.clear();
@@ -1774,6 +1839,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   ParsedInternalKey ikey;
   std::string current_user_key;
   bool has_current_user_key = false;
+  bool encountered_error_key = false;
+  bool can_be_hot = true; // States whether the current user key can be hot or not.
+                          // Once the user key has been written to the cold
+                          // file other versions of it can no longer be written
+                          // to the hot file.
   SequenceNumber last_sequence_for_key __attribute__((unused)) =
     kMaxSequenceNumber;
   SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
@@ -1806,6 +1876,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
     if (!ParseInternalKey(key, &ikey)) {
       // Do not hide error keys
       current_user_key.clear();
+      encountered_error_key = true;
+      can_be_hot = false;
       has_current_user_key = false;
       last_sequence_for_key = kMaxSequenceNumber;
       visible_in_snapshot = kMaxSequenceNumber;
@@ -1816,6 +1888,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
         // First occurrence of this user key
         current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
         has_current_user_key = true;
+        can_be_hot = true;
         last_sequence_for_key = kMaxSequenceNumber;
         visible_in_snapshot = kMaxSequenceNumber;
       }
@@ -1892,12 +1965,25 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
 
       // Select output file
       size_t outfile_idx = 0;
-      if (is_hotcold_ && IsRecordHot(input.get(), metrics_db_, ReadOptions(),
-                                     &block_metrics_store)) {
-        outfile_idx = 1;
+      if (is_hotcold_ &&
+          !encountered_error_key &&
+          can_be_hot &&
+          IsRecordHot(input.get(), metrics_db_, ReadOptions(),
+                      &block_metrics_store)) {
+        outfile_idx = 1; // Use 1 as the hot file as it has a larger file
+                         // number. So we can use the short circuiting logic
+                         // of Version::Get() to avoid having to do lookups per
+                         // level for hot keys.
+                         //
+                         // Due to short circuiting if we encounter an error
+                         // key, we need to make all keys after it cold or else
+                         // our short circuiting logic may be false. As a run
+                         // of user keys can be interrupted with an error key.
       }
       assert(outfile_idx < compact->num_outfiles);
-
+      if (outfile_idx == 0) {
+        can_be_hot = false;
+      }
 
       if (compact->builders[outfile_idx]->NumEntries() == 0) {
         compact->current_output(outfile_idx)->smallest.DecodeFrom(key);
@@ -2089,7 +2175,9 @@ Status DBImpl::Get(const ReadOptions& options,
       if (read_options.record_accesses && is_hotcold_) {
         read_options.metrics_handler = this;
       }
-      s = current->Get(read_options, lkey, value, &stats);
+      // If the database is hot-cold then we know we can use short circuiting
+      // as the compaction logic guarantees that this will be valid.
+      s = current->Get(read_options, lkey, value, &stats, is_hotcold_);
       have_stat_update = true;
     }
     mutex_.Lock();
index 1fdcd26b6a45d6456c952474e3efd751737c3c11..e6f7284aa2dd8d1977cbda7c43db398390dfd572 100644 (file)
@@ -3053,6 +3053,84 @@ TEST(DBTest, HotCold) {
   } while (ChangeOptions());
 }
 
+TEST(DBTest, HotColdShortCircuit) {
+  const uint32_t kNumKeys = 1 << 9;
+  const uint32_t kKeysToSkip = 6;
+  do {
+    DestroyAndReopenWithHotCold(NULL);
+    ASSERT_TRUE(db_ != NULL);
+
+
+    // Skip if this db is not hotcold
+    if (!dbfull()->TEST_IsHotCold()) {
+      continue;
+    }
+
+    // Populate db and push all data to sstable
+    for (uint32_t key = 0; key < kNumKeys; ++key) {
+      std::string skey;
+      PutFixed32(&skey, key);
+
+      Put(skey, skey);
+    }
+    dbfull()->TEST_CompactMemTable();
+    dbfull()->CompactRange(NULL, NULL);
+
+    // Flush metrics if any and check that all records are cold
+    dbfull()->TEST_ForceFlushMetrics();
+    {
+      ReadOptions opts;
+      opts.record_accesses = false;
+      Iterator* iter = db_->NewIterator(opts);
+      for (uint32_t key = 0; key < kNumKeys; ++key) {
+        std::string skey;
+        PutFixed32(&skey, key);
+
+        iter->Seek(skey);
+        ASSERT_TRUE(iter->Valid());
+        ASSERT_EQ(skey, iter->key().ToString());
+
+        ASSERT_TRUE(!dbfull()->TEST_IsHot(iter));
+      }
+      delete iter;
+    }
+
+    // Access a few rows and flush the metrics to the metrics db.
+    for (uint32_t key = 0; key < kNumKeys; key += kKeysToSkip) {
+      std::string skey;
+      PutFixed32(&skey, key);
+
+      ASSERT_EQ(skey, Get(skey));
+    }
+    dbfull()->TEST_ForceFlushMetrics();
+
+    const Snapshot* snapshot = dbfull()->GetSnapshot();
+
+    // Populate db and push all data to sstable
+    for (uint32_t key = 0; key < kNumKeys; ++key) {
+      std::string skey;
+      PutFixed32(&skey, key);
+
+      Put(skey, skey + "0");
+    }
+    dbfull()->TEST_CompactMemTable();
+    dbfull()->CompactRange(NULL, NULL);
+
+    // Check that all values are valid and that short circuiting logic did not fail us.
+    {
+      for (uint32_t key = 0; key < kNumKeys; ++key) {
+        std::string skey;
+        PutFixed32(&skey, key);
+
+        ASSERT_EQ(skey, Get(skey, snapshot));
+        ASSERT_EQ(skey + "0", Get(skey));
+      }
+    }
+
+    dbfull()->ReleaseSnapshot(snapshot);
+  } while (ChangeOptions());
+}
+
 std::string MakeKey(unsigned int num) {
   char buf[30];
   snprintf(buf, sizeof(buf), "%016u", num);
index 9f0457e3ef7250fa1dd2ac5e9a544753a770425e..7a1c6854d1bb768d83608d1178c09cdb12ce1a5f 100644 (file)
@@ -479,7 +479,8 @@ Version::Version(VersionSet* vset, uint64_t version_number)
 Status Version::Get(const ReadOptions& options,
                     const LookupKey& k,
                     std::string* value,
-                    GetStats* stats) {
+                    GetStats* stats,
+                    bool short_circuit) {
   Slice ikey = k.internal_key();
   Slice user_key = k.user_key();
   const Comparator* ucmp = vset_->icmp_.user_comparator();
@@ -534,7 +535,7 @@ Status Version::Get(const ReadOptions& options,
 
     // Sort files since in level 0 we can simply look starting from the newest
     // file.
-    if (level == 0) {
+    if (level == 0 || short_circuit) {
       std::sort(files.begin(), files.end(), NewestFirst);
     }
 
@@ -581,7 +582,7 @@ Status Version::Get(const ReadOptions& options,
         case kNotFound:
           break;      // Keep searching in other files
         case kFound:
-          if (level == 0 || files.size() == 1) {
+          if (level == 0 || short_circuit || files.size() == 1) {
             return s; // no need to look at other files
           }
 
@@ -592,7 +593,7 @@ Status Version::Get(const ReadOptions& options,
           }
           break; // keep searching in other files
         case kDeleted:
-          if (level == 0 || files.size() == 1) {
+          if (level == 0 || short_circuit || files.size() == 1) {
             s = Status::NotFound(Slice());  // Use empty error message for speed
             return s;
           }
index 6b642f55528009552cb35048f314e1d4ace42edc..c070e1bca88498b38cb60bb189e1d8140ff0fbfb 100644 (file)
@@ -121,13 +121,18 @@ class Version {
 
   // Lookup the value for key.  If found, store it in *val and
   // return OK.  Else return a non-OK status.  Fills *stats.
+  //
+  // If short_circuit is true, then if multiple files in a level have a range
+  // that overlaps the given key: The files are sorted from largest to smallest
+  // file number and the first file that contains the lookup key's value is
+  // used and the other files are skip.
   // REQUIRES: lock is not held
   struct GetStats {
     FileMetaData* seek_file;
     int seek_file_level;
   };
   Status Get(const ReadOptions&, const LookupKey& key, std::string* val,
-             GetStats* stats);
+             GetStats* stats, bool short_circuit = false);
 
   // Adds "stats" into the current state.  Returns true if a new
   // compaction may need to be triggered, false otherwise.