]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fixed `Block::MetricsIter` to only create metrics on successful seek and added readho...
authorKosie van der Merwe <kosie.vandermerwe@gmail.com>
Fri, 15 Feb 2013 20:05:04 +0000 (12:05 -0800)
committerKosie van der Merwe <kosie.vandermerwe@gmail.com>
Fri, 15 Feb 2013 20:05:04 +0000 (12:05 -0800)
Summary:
Changed `Block::MetricsIter` to only record metrics when the seek finds the value the user was looking for.

Added extra benchmark to db_bench.

Test Plan: make check

Reviewers: vamsi, dhruba

Reviewed By: vamsi

CC: leveldb
Differential Revision: https://reviews.facebook.net/D8433

db/db_bench.cc
db/dbformat.cc
db/dbformat.h
table/block.cc
table/block.h
table/block_test.cc

index 8469d6c3aac63cae45f8c078e6af8410d8a7301f..5258c07e27157f7be2e780e49578b6b05ca3a02f 100644 (file)
@@ -741,6 +741,8 @@ class Benchmark {
         method = &Benchmark::ReadWhileWriting;
       } else if (name == Slice("readrandomwriterandom")) {
         method = &Benchmark::ReadRandomWriteRandom;
+      } else if (name == Slice("readhotwriterandom")) {
+        method = &Benchmark::ReadHotWriteRandom;
       } else if (name == Slice("compact")) {
         method = &Benchmark::Compact;
       } else if (name == Slice("crc32c")) {
@@ -1261,6 +1263,54 @@ class Benchmark {
     thread->stats.AddMessage(msg);
   }
 
+  void ReadHotWriteRandom(ThreadState* thread) {
+    ReadOptions options(FLAGS_verify_checksum, true);
+    RandomGenerator gen;
+    std::string value;
+    long found = 0;
+    int get_weight = 0;
+    int put_weight = 0;
+    long reads_done = 0;
+    long writes_done = 0;
+    // the number of iterations is the larger of read_ or write_
+    for (long i = 0; i < readwrites_; i++) {
+      char key[100];
+      if (get_weight == 0 && put_weight == 0) {
+        // one batch complated, reinitialize for next batch
+        get_weight = FLAGS_readwritepercent;
+        put_weight = 100 - get_weight;
+      }
+      int k = thread->rand.Next() % FLAGS_num;
+      if (get_weight > 0) {
+        k = k/100*100;
+      }
+      snprintf(key, sizeof(key), "%016d", k);
+      if (get_weight > 0) {
+        // do all the gets first
+        if (db_->Get(options, key, &value).ok()) {
+          found++;
+        }
+        get_weight--;
+        reads_done++;
+      } else  if (put_weight > 0) {
+        // then do all the corresponding number of puts
+        // for all the gets we have done earlier
+        Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
+        if (!s.ok()) {
+          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
+          exit(1);
+        }
+        put_weight--;
+        writes_done++;
+      }
+      thread->stats.FinishedSingleOp(db_);
+    }
+    char msg[100];
+    snprintf(msg, sizeof(msg), "( reads:%ld writes:%ld total:%ld )",
+             reads_done, writes_done, readwrites_);
+    thread->stats.AddMessage(msg);
+  }
+
   void Compact(ThreadState* thread) {
     db_->CompactRange(NULL, NULL);
   }
index 7b7336abc5f1d97ab1ee56e0e7c292a6eccc3ead..ce03aab3e22e23a2c1244079887921550c38e233 100644 (file)
@@ -47,13 +47,16 @@ const char* InternalKeyComparator::Name() const {
   return "leveldb.InternalKeyComparator";
 }
 
-int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
+int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey,
+                                   bool* user_key_eq) const {
+  assert(user_key_eq != nullptr);
   // Order by:
   //    increasing user key (according to user-supplied comparator)
   //    decreasing sequence number
   //    decreasing type (though sequence# should be enough to disambiguate)
   int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
   if (r == 0) {
+    *user_key_eq = true;
     const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
     const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
     if (anum > bnum) {
@@ -61,10 +64,17 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
     } else if (anum < bnum) {
       r = +1;
     }
+  } else {
+    *user_key_eq = false;
   }
   return r;
 }
 
+int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
+  bool ignored;
+  return Compare(akey, bkey, &ignored);
+}
+
 void InternalKeyComparator::FindShortestSeparator(
       std::string* start,
       const Slice& limit) const {
index 66f0e76fa6405287f69d5ab358681db837182480..748fbee831ef02c4eab284c80bef2ff6ebd9ee7b 100644 (file)
@@ -96,6 +96,13 @@ class InternalKeyComparator : public Comparator {
 
   const Comparator* user_comparator() const { return user_comparator_; }
 
+  // Compares "a" and "b" and sets "user_key_eq" to true if they have an user
+  // key that compares equal under the user comparator and false otherwise.
+  //
+  // We need to expose this information in terms of an extra flag, because ties
+  // where both values have the same user key are broken with sequence number
+  // and record type.
+  int Compare(const Slice& a, const Slice& b, bool* user_key_eq) const;
   int Compare(const InternalKey& a, const InternalKey& b) const;
 };
 
index 18d021f886cf9838f017383ad8d23580473dadef..387517e45c8129d47a6c02167e1efc77950634ed 100644 (file)
@@ -8,6 +8,7 @@
 
 #include <vector>
 #include <algorithm>
+#include "db/dbformat.h"
 #include "leveldb/db.h"
 #include "leveldb/comparator.h"
 #include "table/format.h"
@@ -120,6 +121,39 @@ class Block::Iter : public Iterator {
     value_ = Slice(data_ + offset, 0);
   }
 
+  // Returns the first restart index possibly containing "target".
+  // Returns "num_restarts_" if this search encounters an error key.
+  uint32_t FindFirstRestartPointContaining(const Slice& target) {
+    // Binary search in restart array to find the first restart point
+    // with a key >= target
+    uint32_t left = 0;
+    uint32_t right = num_restarts_ - 1;
+    while (left < right) {
+      uint32_t mid = (left + right + 1) / 2;
+      uint32_t region_offset = GetRestartPoint(mid);
+      uint32_t shared, non_shared, value_length;
+      const char* key_ptr = DecodeEntry(data_ + region_offset,
+                                        data_ + restarts_,
+                                        &shared, &non_shared, &value_length);
+      if (key_ptr == NULL || (shared != 0)) {
+        CorruptionError();
+        return num_restarts_;
+      }
+      Slice mid_key(key_ptr, non_shared);
+      if (Compare(mid_key, target) < 0) {
+        // Key at "mid" is smaller than "target".  Therefore all
+        // blocks before "mid" are uninteresting.
+        left = mid;
+      } else {
+        // Key at "mid" is >= "target".  Therefore all blocks at or
+        // after "mid" are uninteresting.
+        right = mid - 1;
+      }
+    }
+
+    return left;
+  }
+
  public:
   Iter(const Comparator* comparator,
        uint64_t file_number,
@@ -181,35 +215,8 @@ class Block::Iter : public Iterator {
   }
 
   virtual void Seek(const Slice& target) {
-    // Binary search in restart array to find the first restart point
-    // with a key >= target
-    uint32_t left = 0;
-    uint32_t right = num_restarts_ - 1;
-    while (left < right) {
-      uint32_t mid = (left + right + 1) / 2;
-      uint32_t region_offset = GetRestartPoint(mid);
-      uint32_t shared, non_shared, value_length;
-      const char* key_ptr = DecodeEntry(data_ + region_offset,
-                                        data_ + restarts_,
-                                        &shared, &non_shared, &value_length);
-      if (key_ptr == NULL || (shared != 0)) {
-        CorruptionError();
-        return;
-      }
-      Slice mid_key(key_ptr, non_shared);
-      if (Compare(mid_key, target) < 0) {
-        // Key at "mid" is smaller than "target".  Therefore all
-        // blocks before "mid" are uninteresting.
-        left = mid;
-      } else {
-        // Key at "mid" is >= "target".  Therefore all blocks at or
-        // after "mid" are uninteresting.
-        right = mid - 1;
-      }
-    }
-
     // Linear search (within restart block) for first key >= target
-    SeekToRestartPoint(left);
+    SeekToRestartPoint(FindFirstRestartPointContaining(target));
     while (true) {
       if (!ParseNextKey()) {
         return;
@@ -232,7 +239,7 @@ class Block::Iter : public Iterator {
     }
   }
 
- private:
+ protected:
   friend class Block;
 
   void CorruptionError() {
@@ -286,6 +293,7 @@ class Block::MetricsIter : public Block::Iter {
   Cache::Handle* cache_handle_;
   void* metrics_handler_;
   BlockMetrics* metrics_;
+  const InternalKeyComparator* icmp_;
 
  public:
   MetricsIter(const Comparator* comparator,
@@ -302,7 +310,8 @@ class Block::MetricsIter : public Block::Iter {
         cache_(cache),
         cache_handle_(cache_handle),
         metrics_handler_(metrics_handler),
-        metrics_(NULL) {
+        metrics_(NULL),
+        icmp_(dynamic_cast<const InternalKeyComparator*>(comparator_)) {
   }
 
   virtual ~MetricsIter() {
@@ -315,19 +324,35 @@ class Block::MetricsIter : public Block::Iter {
   }
 
   virtual void Seek(const Slice& target) {
-    Block::Iter::Seek(target);
-    RecordAccess();
+    if (icmp_ == nullptr) {
+      Block::Iter::Seek(target);
+      return;
+    }
+
+    SeekToRestartPoint(FindFirstRestartPointContaining(target));
+    bool user_key_eq = false;
+    while (true) {
+      if (!ParseNextKey()) {
+        return;
+      }
+      if (icmp_->Compare(key_, target, &user_key_eq) >= 0) {
+        break;
+      }
+    }
+
+    if (user_key_eq && Valid()) {
+      RecordAccess();
+    }
   }
 
  private:
   void RecordAccess() {
-    if (Valid()) {
-      if (metrics_ == nullptr) {
-        metrics_ = new BlockMetrics(file_number_, block_offset_,
-                                    num_restarts_, kBytesPerRestart);
-      }
-      metrics_->RecordAccess(restart_index_, restart_offset_);
+    assert(Valid());
+    if (metrics_ == nullptr) {
+      metrics_ = new BlockMetrics(file_number_, block_offset_,
+                                  num_restarts_, kBytesPerRestart);
     }
+    metrics_->RecordAccess(restart_index_, restart_offset_);
   }
 };
 
index 0a055008141db992d5bc189861b053e84ccae2cb..81a1c937baf8a3318cdd434af318e3642bd4b7d1 100644 (file)
@@ -37,7 +37,8 @@ class Block {
                         uint64_t block_offset);
 
   // Creates a new iterator that keeps track of accesses. When this iterator is
-  // deleted it frees the cache handle and passes the metrics to the cache specified.
+  // deleted it frees the cache handle and passes the metrics to the cache
+  // specified.
   // REQUIRES: cache, cache_handle, metrics_handler must be non-NULL
   Iterator* NewMetricsIterator(const Comparator* comparator,
                                uint64_t file_number,
index 57dedade86d4f7fcb5ec8cc98279dbff4b23d5ac..50adbe03576a8da8dded1a9b8085afca3f9b824c 100644 (file)
@@ -104,6 +104,130 @@ TEST(BlockTest, SimpleTest) {
   delete iter;
 }
 
+class MockCache : public Cache {
+ public:
+  bool generated_metrics;
+  bool was_released;
+
+ public:
+  MockCache() {
+    Reset();
+  }
+
+  // Stub implementations so class compiles
+  virtual Cache::Handle* Insert(const Slice& key, void* value, size_t charge,
+                         void (*deleter)(const Slice& key, void* value)) {
+    return nullptr;
+  }
+  virtual Cache::Handle* Lookup(const Slice& key) { return nullptr; }
+  virtual void* Value(Cache::Handle* handle) { return nullptr; }
+  virtual void Erase(const Slice& key) {}
+  virtual uint64_t NewId() { return 4;}
+  virtual size_t GetCapacity() { return 0; }
+
+  virtual void Release(Cache::Handle* handle) {
+    generated_metrics = false;
+    was_released = true;
+  }
+
+  virtual void ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler,
+                                       BlockMetrics* metrics) {
+    generated_metrics = metrics != nullptr;
+    delete metrics;
+    was_released = true;
+  }
+
+  void Reset() {
+    generated_metrics = false;
+    was_released = false;
+  }
+};
+
+TEST(BlockTest, MetricsIter) {
+  Random rnd(301);
+  Options options = Options();
+  options.comparator = new InternalKeyComparator(options.comparator);
+  std::vector<std::string> keys;
+  BlockBuilder builder(&options);
+  int num_records = 1000;
+  char buf[20];
+  char* p = &buf[0];
+
+  // add a bunch of records to a block
+  for (int i = 0; i < num_records; i++) {
+    // generate random kvs
+    sprintf(p, "%6d", i);
+    std::string k(p);
+    std::string v = RandomString(&rnd, 100); // 100 byte values
+
+    // write kvs to the block
+    InternalKey ik(k, 100, kTypeValue);
+    Slice key = ik.Encode();
+    Slice value(v);
+    if (i % 2 == 0) { // only add even keys
+      builder.Add(key, value);
+    }
+
+    // remember kvs in a lookaside array
+    keys.push_back(key.ToString());
+  }
+
+  // read serialized contents of the block
+  Slice rawblock = builder.Finish();
+
+  // create block reader
+  BlockContents contents;
+  contents.data = rawblock;
+  contents.cachable = false;
+  contents.heap_allocated = false;
+  Block reader(contents);
+
+  MockCache c;
+  Cache::Handle* ch = reinterpret_cast<Cache::Handle*>(&c);
+
+  c.Reset();
+  Iterator* iter = reader.NewMetricsIterator(options.comparator, 0, 0,
+                                             &c, ch, &c);
+  delete iter;
+  ASSERT_TRUE(c.was_released);
+  ASSERT_TRUE(!c.generated_metrics) << "needlessly generated metrics.\n";
+
+  for (int i = 1; i < num_records; i += 2) {
+    c.Reset();
+    iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+    iter->Seek(keys[i]);
+    delete iter;
+    ASSERT_TRUE(c.was_released);
+    ASSERT_TRUE(!c.generated_metrics) << "generated metrics for unfound row.\n";
+  }
+
+  c.Reset();
+  iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+  iter->SeekToFirst();
+  iter->Next();
+  iter->SeekToLast();
+  iter->Prev();
+  delete iter;
+  ASSERT_TRUE(c.was_released);
+  ASSERT_TRUE(!c.generated_metrics) << "generated metrics for non-Seek().\n";
+
+  for (int i = 0; i < num_records; i += 2) {
+    c.Reset();
+    iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+    iter->Seek(keys[i]);
+    delete iter;
+    ASSERT_TRUE(c.was_released);
+    ASSERT_TRUE(c.generated_metrics) << "didn't generate metrics for found row\n";
+  }
+
+  c.Reset();
+  iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+  iter->Seek(InternalKey("     2", 140, kTypeDeletion).Encode());
+  delete iter;
+  ASSERT_TRUE(c.was_released);
+  ASSERT_TRUE(c.generated_metrics) << "didn't generate metrics for Seek().\n";
+}
+
 class BlockMetricsTest {
  private:
  public: