From: Kosie van der Merwe Date: Fri, 15 Feb 2013 20:02:15 +0000 (-0800) Subject: Hotcold: Short circuiting behaviour for file lookups when finding key in hot file X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ea12281d6f3b7e43d2e28d7444896a4c85a8ee95;p=rocksdb.git Hotcold: Short circuiting behaviour for file lookups when finding key in hot file Summary: Before this update both hot and cold file had to be queried in order to get the latest value for a given key. Now if the key is found in the hot file then we know the latest value is in the hot file. Changed compaction to not allow a user key to have a record with a smaller sequence number in the hot file if there is a record with a larger sequence number in the cold file. Added short circuiting to `Version::Get()` to assume that if a key is found in a newer file then it has a newer sequence number. Test Plan: `make check` ran `db_bench` compiled with assertions on and in hot cold mode. Reviewers: vamsi, dhruba Reviewed By: vamsi CC: leveldb Differential Revision: https://reviews.facebook.net/D8451 --- diff --git a/db/db_impl.cc b/db/db_impl.cc index 8b0c3484..14ef2ede 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1421,6 +1421,12 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { out.smallest.Clear(); out.largest.Clear(); compact->outputs.push_back(out); +#ifndef NDEBUG + if (i > 0) { + // Check that file numbers are strictly increasing. + assert(out.number > compact->outputs[compact->outputs.size()-2].number); + } +#endif // Make the output file std::string fname = TableFileName(dbname_, file_number); @@ -1458,6 +1464,10 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, assert(!compact->outfiles.empty()); assert(!compact->builders.empty()); +#ifndef NDEBUG + std::vector> iters; +#endif + Status s; size_t i; @@ -1499,7 +1509,11 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, output_number, current_bytes); s = iter->status(); +#ifndef NDEBUG + iters.push_back(unique_ptr(iter)); +#else delete iter; +#endif if (s.ok()) { Log(options_.info_log, "Generated table #%llu: %lld keys, %lld bytes", @@ -1517,6 +1531,57 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, } } +#ifndef NDEBUG + if (s.ok() && is_hotcold_ && iters.size() >= 2) { + // Check that for a given user key, that there is no cold record with a + // greater sequence number than a record in the hot file. + assert(iters.size() == 2); // we should only have 2 files but we may have + // less if one of the files is empty and hence + // gets dropped. + assert(iters[0]->status().ok()); + assert(iters[1]->status().ok()); + + Iterator* hot = iters[1].get(); + Iterator* cold = iters[0].get(); + + hot->SeekToFirst(); + for(hot->SeekToFirst(); hot->Valid(); hot->Next()) { + ParsedInternalKey hot_key; + if (!ParseInternalKey(hot->key(), &hot_key)) { + continue; // skip error keys. + } + + // Move the cold iterator to the position past the hot key. + cold->Seek(hot->key()); + + // Move to the position just before the hot key, skipping error keys. + ParsedInternalKey cold_key; + if (cold->Valid()) { + cold->Prev(); + } else { + assert(cold->status().ok()); // bomb out if we suddenly had an IO error + cold->SeekToFirst(); // Assuming !cold->Valid() because hot->key() is + // past the end of cold's last key. + } + while (cold->Valid() && !ParseInternalKey(cold->key(), &cold_key)) { + cold->Prev(); + } + + assert(cold->status().ok()); // bomb out if we suddenly had an IO error + if (!cold->Valid()) { + // Went past the start of the cold file iterator, so look at next hot + // key. + continue; + } + + // If the hot user key and the cold user key compare equal then the cold + // key has a greater sequence number than the hot key + assert(user_comparator()->Compare(hot_key.user_key, + cold_key.user_key) != 0); + } + } +#endif + compact->num_outfiles = 0; compact->builders.clear(); compact->outfiles.clear(); @@ -1774,6 +1839,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { ParsedInternalKey ikey; std::string current_user_key; bool has_current_user_key = false; + bool encountered_error_key = false; + bool can_be_hot = true; // States whether the current user key can be hot or not. + // Once the user key has been written to the cold + // file other versions of it can no longer be written + // to the hot file. SequenceNumber last_sequence_for_key __attribute__((unused)) = kMaxSequenceNumber; SequenceNumber visible_in_snapshot = kMaxSequenceNumber; @@ -1806,6 +1876,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (!ParseInternalKey(key, &ikey)) { // Do not hide error keys current_user_key.clear(); + encountered_error_key = true; + can_be_hot = false; has_current_user_key = false; last_sequence_for_key = kMaxSequenceNumber; visible_in_snapshot = kMaxSequenceNumber; @@ -1816,6 +1888,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // First occurrence of this user key current_user_key.assign(ikey.user_key.data(), ikey.user_key.size()); has_current_user_key = true; + can_be_hot = true; last_sequence_for_key = kMaxSequenceNumber; visible_in_snapshot = kMaxSequenceNumber; } @@ -1892,12 +1965,25 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // Select output file size_t outfile_idx = 0; - if (is_hotcold_ && IsRecordHot(input.get(), metrics_db_, ReadOptions(), - &block_metrics_store)) { - outfile_idx = 1; + if (is_hotcold_ && + !encountered_error_key && + can_be_hot && + IsRecordHot(input.get(), metrics_db_, ReadOptions(), + &block_metrics_store)) { + outfile_idx = 1; // Use 1 as the hot file as it has a larger file + // number. So we can use the short circuiting logic + // of Version::Get() to avoid having to do lookups per + // level for hot keys. + // + // Due to short circuiting if we encounter an error + // key, we need to make all keys after it cold or else + // our short circuiting logic may be false. As a run + // of user keys can be interrupted with an error key. } assert(outfile_idx < compact->num_outfiles); - + if (outfile_idx == 0) { + can_be_hot = false; + } if (compact->builders[outfile_idx]->NumEntries() == 0) { compact->current_output(outfile_idx)->smallest.DecodeFrom(key); @@ -2089,7 +2175,9 @@ Status DBImpl::Get(const ReadOptions& options, if (read_options.record_accesses && is_hotcold_) { read_options.metrics_handler = this; } - s = current->Get(read_options, lkey, value, &stats); + // If the database is hot-cold then we know we can use short circuiting + // as the compaction logic guarantees that this will be valid. + s = current->Get(read_options, lkey, value, &stats, is_hotcold_); have_stat_update = true; } mutex_.Lock(); diff --git a/db/db_test.cc b/db/db_test.cc index 1fdcd26b..e6f7284a 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3053,6 +3053,84 @@ TEST(DBTest, HotCold) { } while (ChangeOptions()); } +TEST(DBTest, HotColdShortCircuit) { + const uint32_t kNumKeys = 1 << 9; + const uint32_t kKeysToSkip = 6; + do { + DestroyAndReopenWithHotCold(NULL); + ASSERT_TRUE(db_ != NULL); + + + // Skip if this db is not hotcold + if (!dbfull()->TEST_IsHotCold()) { + continue; + } + + // Populate db and push all data to sstable + for (uint32_t key = 0; key < kNumKeys; ++key) { + std::string skey; + PutFixed32(&skey, key); + + Put(skey, skey); + } + dbfull()->TEST_CompactMemTable(); + dbfull()->CompactRange(NULL, NULL); + + // Flush metrics if any and check that all records are cold + dbfull()->TEST_ForceFlushMetrics(); + { + ReadOptions opts; + opts.record_accesses = false; + Iterator* iter = db_->NewIterator(opts); + for (uint32_t key = 0; key < kNumKeys; ++key) { + std::string skey; + PutFixed32(&skey, key); + + iter->Seek(skey); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(skey, iter->key().ToString()); + + ASSERT_TRUE(!dbfull()->TEST_IsHot(iter)); + } + delete iter; + } + + // Access a few rows and flush the metrics to the metrics db. + for (uint32_t key = 0; key < kNumKeys; key += kKeysToSkip) { + std::string skey; + PutFixed32(&skey, key); + + ASSERT_EQ(skey, Get(skey)); + } + dbfull()->TEST_ForceFlushMetrics(); + + const Snapshot* snapshot = dbfull()->GetSnapshot(); + + // Populate db and push all data to sstable + for (uint32_t key = 0; key < kNumKeys; ++key) { + std::string skey; + PutFixed32(&skey, key); + + Put(skey, skey + "0"); + } + dbfull()->TEST_CompactMemTable(); + dbfull()->CompactRange(NULL, NULL); + + // Check that all values are valid and that short circuiting logic did not fail us. + { + for (uint32_t key = 0; key < kNumKeys; ++key) { + std::string skey; + PutFixed32(&skey, key); + + ASSERT_EQ(skey, Get(skey, snapshot)); + ASSERT_EQ(skey + "0", Get(skey)); + } + } + + dbfull()->ReleaseSnapshot(snapshot); + } while (ChangeOptions()); +} + std::string MakeKey(unsigned int num) { char buf[30]; snprintf(buf, sizeof(buf), "%016u", num); diff --git a/db/version_set.cc b/db/version_set.cc index 9f0457e3..7a1c6854 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -479,7 +479,8 @@ Version::Version(VersionSet* vset, uint64_t version_number) Status Version::Get(const ReadOptions& options, const LookupKey& k, std::string* value, - GetStats* stats) { + GetStats* stats, + bool short_circuit) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); const Comparator* ucmp = vset_->icmp_.user_comparator(); @@ -534,7 +535,7 @@ Status Version::Get(const ReadOptions& options, // Sort files since in level 0 we can simply look starting from the newest // file. - if (level == 0) { + if (level == 0 || short_circuit) { std::sort(files.begin(), files.end(), NewestFirst); } @@ -581,7 +582,7 @@ Status Version::Get(const ReadOptions& options, case kNotFound: break; // Keep searching in other files case kFound: - if (level == 0 || files.size() == 1) { + if (level == 0 || short_circuit || files.size() == 1) { return s; // no need to look at other files } @@ -592,7 +593,7 @@ Status Version::Get(const ReadOptions& options, } break; // keep searching in other files case kDeleted: - if (level == 0 || files.size() == 1) { + if (level == 0 || short_circuit || files.size() == 1) { s = Status::NotFound(Slice()); // Use empty error message for speed return s; } diff --git a/db/version_set.h b/db/version_set.h index 6b642f55..c070e1bc 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -121,13 +121,18 @@ class Version { // Lookup the value for key. If found, store it in *val and // return OK. Else return a non-OK status. Fills *stats. + // + // If short_circuit is true, then if multiple files in a level have a range + // that overlaps the given key: The files are sorted from largest to smallest + // file number and the first file that contains the lookup key's value is + // used and the other files are skip. // REQUIRES: lock is not held struct GetStats { FileMetaData* seek_file; int seek_file_level; }; Status Get(const ReadOptions&, const LookupKey& key, std::string* val, - GetStats* stats); + GetStats* stats, bool short_circuit = false); // Adds "stats" into the current state. Returns true if a new // compaction may need to be triggered, false otherwise.