From: Kosie van der Merwe Date: Fri, 15 Feb 2013 20:05:04 +0000 (-0800) Subject: Fixed `Block::MetricsIter` to only create metrics on successful seek and added readho... X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9a9a560ba9e250e4347f2fd20df012f3302cf35c;p=rocksdb.git Fixed `Block::MetricsIter` to only create metrics on successful seek and added readhotwriterandom to db_bench Summary: Changed `Block::MetricsIter` to only record metrics when the seek finds the value the user was looking for. Added extra benchmark to db_bench. Test Plan: make check Reviewers: vamsi, dhruba Reviewed By: vamsi CC: leveldb Differential Revision: https://reviews.facebook.net/D8433 --- diff --git a/db/db_bench.cc b/db/db_bench.cc index 8469d6c3..5258c07e 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -741,6 +741,8 @@ class Benchmark { method = &Benchmark::ReadWhileWriting; } else if (name == Slice("readrandomwriterandom")) { method = &Benchmark::ReadRandomWriteRandom; + } else if (name == Slice("readhotwriterandom")) { + method = &Benchmark::ReadHotWriteRandom; } else if (name == Slice("compact")) { method = &Benchmark::Compact; } else if (name == Slice("crc32c")) { @@ -1261,6 +1263,54 @@ class Benchmark { thread->stats.AddMessage(msg); } + void ReadHotWriteRandom(ThreadState* thread) { + ReadOptions options(FLAGS_verify_checksum, true); + RandomGenerator gen; + std::string value; + long found = 0; + int get_weight = 0; + int put_weight = 0; + long reads_done = 0; + long writes_done = 0; + // the number of iterations is the larger of read_ or write_ + for (long i = 0; i < readwrites_; i++) { + char key[100]; + if (get_weight == 0 && put_weight == 0) { + // one batch complated, reinitialize for next batch + get_weight = FLAGS_readwritepercent; + put_weight = 100 - get_weight; + } + int k = thread->rand.Next() % FLAGS_num; + if (get_weight > 0) { + k = k/100*100; + } + snprintf(key, sizeof(key), "%016d", k); + if (get_weight > 0) { + // do all the gets first + if (db_->Get(options, key, &value).ok()) { + found++; + } + get_weight--; + reads_done++; + } else if (put_weight > 0) { + // then do all the corresponding number of puts + // for all the gets we have done earlier + Status s = db_->Put(write_options_, key, gen.Generate(value_size_)); + if (!s.ok()) { + fprintf(stderr, "put error: %s\n", s.ToString().c_str()); + exit(1); + } + put_weight--; + writes_done++; + } + thread->stats.FinishedSingleOp(db_); + } + char msg[100]; + snprintf(msg, sizeof(msg), "( reads:%ld writes:%ld total:%ld )", + reads_done, writes_done, readwrites_); + thread->stats.AddMessage(msg); + } + void Compact(ThreadState* thread) { db_->CompactRange(NULL, NULL); } diff --git a/db/dbformat.cc b/db/dbformat.cc index 7b7336ab..ce03aab3 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -47,13 +47,16 @@ const char* InternalKeyComparator::Name() const { return "leveldb.InternalKeyComparator"; } -int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { +int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey, + bool* user_key_eq) const { + assert(user_key_eq != nullptr); // Order by: // increasing user key (according to user-supplied comparator) // decreasing sequence number // decreasing type (though sequence# should be enough to disambiguate) int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey)); if (r == 0) { + *user_key_eq = true; const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8); const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8); if (anum > bnum) { @@ -61,10 +64,17 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { } else if (anum < bnum) { r = +1; } + } else { + *user_key_eq = false; } return r; } +int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { + bool ignored; + return Compare(akey, bkey, &ignored); +} + void InternalKeyComparator::FindShortestSeparator( std::string* start, const Slice& limit) const { diff --git a/db/dbformat.h b/db/dbformat.h index 66f0e76f..748fbee8 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -96,6 +96,13 @@ class InternalKeyComparator : public Comparator { const Comparator* user_comparator() const { return user_comparator_; } + // Compares "a" and "b" and sets "user_key_eq" to true if they have an user + // key that compares equal under the user comparator and false otherwise. + // + // We need to expose this information in terms of an extra flag, because ties + // where both values have the same user key are broken with sequence number + // and record type. + int Compare(const Slice& a, const Slice& b, bool* user_key_eq) const; int Compare(const InternalKey& a, const InternalKey& b) const; }; diff --git a/table/block.cc b/table/block.cc index 18d021f8..387517e4 100644 --- a/table/block.cc +++ b/table/block.cc @@ -8,6 +8,7 @@ #include #include +#include "db/dbformat.h" #include "leveldb/db.h" #include "leveldb/comparator.h" #include "table/format.h" @@ -120,6 +121,39 @@ class Block::Iter : public Iterator { value_ = Slice(data_ + offset, 0); } + // Returns the first restart index possibly containing "target". + // Returns "num_restarts_" if this search encounters an error key. + uint32_t FindFirstRestartPointContaining(const Slice& target) { + // Binary search in restart array to find the first restart point + // with a key >= target + uint32_t left = 0; + uint32_t right = num_restarts_ - 1; + while (left < right) { + uint32_t mid = (left + right + 1) / 2; + uint32_t region_offset = GetRestartPoint(mid); + uint32_t shared, non_shared, value_length; + const char* key_ptr = DecodeEntry(data_ + region_offset, + data_ + restarts_, + &shared, &non_shared, &value_length); + if (key_ptr == NULL || (shared != 0)) { + CorruptionError(); + return num_restarts_; + } + Slice mid_key(key_ptr, non_shared); + if (Compare(mid_key, target) < 0) { + // Key at "mid" is smaller than "target". Therefore all + // blocks before "mid" are uninteresting. + left = mid; + } else { + // Key at "mid" is >= "target". Therefore all blocks at or + // after "mid" are uninteresting. + right = mid - 1; + } + } + + return left; + } + public: Iter(const Comparator* comparator, uint64_t file_number, @@ -181,35 +215,8 @@ class Block::Iter : public Iterator { } virtual void Seek(const Slice& target) { - // Binary search in restart array to find the first restart point - // with a key >= target - uint32_t left = 0; - uint32_t right = num_restarts_ - 1; - while (left < right) { - uint32_t mid = (left + right + 1) / 2; - uint32_t region_offset = GetRestartPoint(mid); - uint32_t shared, non_shared, value_length; - const char* key_ptr = DecodeEntry(data_ + region_offset, - data_ + restarts_, - &shared, &non_shared, &value_length); - if (key_ptr == NULL || (shared != 0)) { - CorruptionError(); - return; - } - Slice mid_key(key_ptr, non_shared); - if (Compare(mid_key, target) < 0) { - // Key at "mid" is smaller than "target". Therefore all - // blocks before "mid" are uninteresting. - left = mid; - } else { - // Key at "mid" is >= "target". Therefore all blocks at or - // after "mid" are uninteresting. - right = mid - 1; - } - } - // Linear search (within restart block) for first key >= target - SeekToRestartPoint(left); + SeekToRestartPoint(FindFirstRestartPointContaining(target)); while (true) { if (!ParseNextKey()) { return; @@ -232,7 +239,7 @@ class Block::Iter : public Iterator { } } - private: + protected: friend class Block; void CorruptionError() { @@ -286,6 +293,7 @@ class Block::MetricsIter : public Block::Iter { Cache::Handle* cache_handle_; void* metrics_handler_; BlockMetrics* metrics_; + const InternalKeyComparator* icmp_; public: MetricsIter(const Comparator* comparator, @@ -302,7 +310,8 @@ class Block::MetricsIter : public Block::Iter { cache_(cache), cache_handle_(cache_handle), metrics_handler_(metrics_handler), - metrics_(NULL) { + metrics_(NULL), + icmp_(dynamic_cast(comparator_)) { } virtual ~MetricsIter() { @@ -315,19 +324,35 @@ class Block::MetricsIter : public Block::Iter { } virtual void Seek(const Slice& target) { - Block::Iter::Seek(target); - RecordAccess(); + if (icmp_ == nullptr) { + Block::Iter::Seek(target); + return; + } + + SeekToRestartPoint(FindFirstRestartPointContaining(target)); + bool user_key_eq = false; + while (true) { + if (!ParseNextKey()) { + return; + } + if (icmp_->Compare(key_, target, &user_key_eq) >= 0) { + break; + } + } + + if (user_key_eq && Valid()) { + RecordAccess(); + } } private: void RecordAccess() { - if (Valid()) { - if (metrics_ == nullptr) { - metrics_ = new BlockMetrics(file_number_, block_offset_, - num_restarts_, kBytesPerRestart); - } - metrics_->RecordAccess(restart_index_, restart_offset_); + assert(Valid()); + if (metrics_ == nullptr) { + metrics_ = new BlockMetrics(file_number_, block_offset_, + num_restarts_, kBytesPerRestart); } + metrics_->RecordAccess(restart_index_, restart_offset_); } }; diff --git a/table/block.h b/table/block.h index 0a055008..81a1c937 100644 --- a/table/block.h +++ b/table/block.h @@ -37,7 +37,8 @@ class Block { uint64_t block_offset); // Creates a new iterator that keeps track of accesses. When this iterator is - // deleted it frees the cache handle and passes the metrics to the cache specified. + // deleted it frees the cache handle and passes the metrics to the cache + // specified. // REQUIRES: cache, cache_handle, metrics_handler must be non-NULL Iterator* NewMetricsIterator(const Comparator* comparator, uint64_t file_number, diff --git a/table/block_test.cc b/table/block_test.cc index 57dedade..50adbe03 100644 --- a/table/block_test.cc +++ b/table/block_test.cc @@ -104,6 +104,130 @@ TEST(BlockTest, SimpleTest) { delete iter; } +class MockCache : public Cache { + public: + bool generated_metrics; + bool was_released; + + public: + MockCache() { + Reset(); + } + + // Stub implementations so class compiles + virtual Cache::Handle* Insert(const Slice& key, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value)) { + return nullptr; + } + virtual Cache::Handle* Lookup(const Slice& key) { return nullptr; } + virtual void* Value(Cache::Handle* handle) { return nullptr; } + virtual void Erase(const Slice& key) {} + virtual uint64_t NewId() { return 4;} + virtual size_t GetCapacity() { return 0; } + + virtual void Release(Cache::Handle* handle) { + generated_metrics = false; + was_released = true; + } + + virtual void ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler, + BlockMetrics* metrics) { + generated_metrics = metrics != nullptr; + delete metrics; + was_released = true; + } + + void Reset() { + generated_metrics = false; + was_released = false; + } +}; + +TEST(BlockTest, MetricsIter) { + Random rnd(301); + Options options = Options(); + options.comparator = new InternalKeyComparator(options.comparator); + std::vector keys; + BlockBuilder builder(&options); + int num_records = 1000; + char buf[20]; + char* p = &buf[0]; + + // add a bunch of records to a block + for (int i = 0; i < num_records; i++) { + // generate random kvs + sprintf(p, "%6d", i); + std::string k(p); + std::string v = RandomString(&rnd, 100); // 100 byte values + + // write kvs to the block + InternalKey ik(k, 100, kTypeValue); + Slice key = ik.Encode(); + Slice value(v); + if (i % 2 == 0) { // only add even keys + builder.Add(key, value); + } + + // remember kvs in a lookaside array + keys.push_back(key.ToString()); + } + + // read serialized contents of the block + Slice rawblock = builder.Finish(); + + // create block reader + BlockContents contents; + contents.data = rawblock; + contents.cachable = false; + contents.heap_allocated = false; + Block reader(contents); + + MockCache c; + Cache::Handle* ch = reinterpret_cast(&c); + + c.Reset(); + Iterator* iter = reader.NewMetricsIterator(options.comparator, 0, 0, + &c, ch, &c); + delete iter; + ASSERT_TRUE(c.was_released); + ASSERT_TRUE(!c.generated_metrics) << "needlessly generated metrics.\n"; + + for (int i = 1; i < num_records; i += 2) { + c.Reset(); + iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c); + iter->Seek(keys[i]); + delete iter; + ASSERT_TRUE(c.was_released); + ASSERT_TRUE(!c.generated_metrics) << "generated metrics for unfound row.\n"; + } + + c.Reset(); + iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c); + iter->SeekToFirst(); + iter->Next(); + iter->SeekToLast(); + iter->Prev(); + delete iter; + ASSERT_TRUE(c.was_released); + ASSERT_TRUE(!c.generated_metrics) << "generated metrics for non-Seek().\n"; + + for (int i = 0; i < num_records; i += 2) { + c.Reset(); + iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c); + iter->Seek(keys[i]); + delete iter; + ASSERT_TRUE(c.was_released); + ASSERT_TRUE(c.generated_metrics) << "didn't generate metrics for found row\n"; + } + + c.Reset(); + iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c); + iter->Seek(InternalKey(" 2", 140, kTypeDeletion).Encode()); + delete iter; + ASSERT_TRUE(c.was_released); + ASSERT_TRUE(c.generated_metrics) << "didn't generate metrics for Seek().\n"; +} + class BlockMetricsTest { private: public: