]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Hotcold: Changing compaction based on hotness of record.
authorKosie van der Merwe <kosie.vandermerwe@gmail.com>
Fri, 15 Feb 2013 19:51:50 +0000 (11:51 -0800)
committerKosie van der Merwe <kosie.vandermerwe@gmail.com>
Fri, 15 Feb 2013 19:51:50 +0000 (11:51 -0800)
Summary:
Changed `DoCompactionWork()` to output multiple files and imported https://reviews.facebook.net/D8025 (Ideally all issues with version_set.h, version_set.cc and version_set_test.cc should be brought up there so I can keep that patch up to date and we can decide whether to mainline it separately) so we can handle overlapping files in level 1+.

Added code to determine whether a record is hot or cold given the metrics DB and an iterator on the database (which allows us to get all the necessary data in order to lookup the hotness)

Test Plan:
make check

db_bench --benchmarks=readrandomwriterandom --hotcold=1

Reviewers: dhruba, vamsi, chip, heyongqiang

Reviewed By: vamsi

CC: leveldb
Differential Revision: https://reviews.facebook.net/D8163

18 files changed:
db/db_impl.cc
db/db_impl.h
db/db_iter.cc
db/db_test.cc
db/version_set.cc
db/version_set.h
db/version_set_test.cc
include/leveldb/cache.h
include/leveldb/iterator.h
include/leveldb/options.h
table/block.cc
table/block.h
table/merger.cc
table/metrics_info.cc [new file with mode: 0644]
table/metrics_info.h [new file with mode: 0644]
table/table.cc
table/two_level_iterator.cc
util/cache.cc

index 89fbadcae42a8dbfdfd021209a247c727968e3b8..2ce5750f1f7c963f942b54904a3062bd36b1b562 100644 (file)
@@ -33,6 +33,7 @@
 #include "port/port.h"
 #include "table/block.h"
 #include "table/merger.h"
+#include "table/metrics_info.h"
 #include "table/two_level_iterator.h"
 #include "util/coding.h"
 #include "util/logging.h"
@@ -96,6 +97,7 @@ struct DBImpl::CompactionState {
   // Files produced by compaction
   struct Output {
     uint64_t number;
+    bool skip; // Whether this output should be skipped due to being empty.
     uint64_t file_size;
     InternalKey smallest, largest;
   };
@@ -103,17 +105,24 @@ struct DBImpl::CompactionState {
   std::list<uint64_t> allocated_file_numbers;
 
   // State kept for output being generated
-  WritableFile* outfile;
-  TableBuilder* builder;
+  // We have potentially have more than one outfile due to hot-cold separation
+  // needing both a hot file and a cold file to output to.
+  size_t num_outfiles;
+  WritableFile** outfiles;
+  TableBuilder** builders;
 
   uint64_t total_bytes;
 
-  Output* current_output() { return &outputs[outputs.size()-1]; }
+  Output* current_output(size_t idx) {
+    assert(idx < num_outfiles);
+    return &outputs[outputs.size()-num_outfiles+idx];
+  }
 
   explicit CompactionState(Compaction* c)
       : compaction(c),
-        outfile(NULL),
-        builder(NULL),
+        num_outfiles(0),
+        outfiles(NULL),
+        builders(NULL),
         total_bytes(0) {
   }
 };
@@ -1168,6 +1177,15 @@ Status DBImpl::TEST_CompactMemTable() {
   return FlushMemTable(FlushOptions());
 }
 
+void DBImpl::TEST_ForceFlushMetrics() {
+  assert(is_hotcold_);
+  assert(options_.block_cache != NULL);
+
+  options_.block_cache->ForceFlushMetrics();
+
+  TEST_WaitForMetricsFlush();
+}
+
 Status DBImpl::TEST_WaitForCompactMemTable() {
   return WaitForCompactMemTable();
 }
@@ -1181,6 +1199,13 @@ Status DBImpl::TEST_WaitForCompact() {
   return bg_error_;
 }
 
+void DBImpl::TEST_WaitForMetricsFlush() {
+  MutexLock l(&mutex_);
+  while (bg_flushing_metrics_scheduled_) {
+    bg_cv_.Wait();
+  }
+}
+
 void DBImpl::MaybeScheduleCompaction() {
   mutex_.AssertHeld();
   if (bg_compaction_scheduled_ >= options_.max_background_compactions) {
@@ -1345,14 +1370,23 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
 
 void DBImpl::CleanupCompaction(CompactionState* compact) {
   mutex_.AssertHeld();
-  if (compact->builder != NULL) {
+  if (compact->builders != NULL) {
     // May happen if we get a shutdown call in the middle of compaction
-    compact->builder->Abandon();
-    delete compact->builder;
+
+    for (size_t i = 0; i < compact->num_outfiles; ++i) {
+      compact->builders[i]->Abandon();
+      delete compact->builders[i];
+      delete compact->outfiles[i];
+    }
+    delete[] compact->builders;
+    delete[] compact->outfiles;
+
+    compact->num_outfiles = 0;
+    compact->builders = NULL;
+    compact->outfiles = NULL;
   } else {
-    assert(compact->outfile == NULL);
+    assert(compact->outfiles == NULL);
   }
-  delete compact->outfile;
   for (size_t i = 0; i < compact->outputs.size(); i++) {
     const CompactionState::Output& out = compact->outputs[i];
     pending_outputs_.erase(out.number);
@@ -1366,7 +1400,7 @@ void DBImpl::CleanupCompaction(CompactionState* compact) {
 void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) {
   mutex_.AssertHeld();
   assert(compact != NULL);
-  assert(compact->builder == NULL);
+  assert(compact->builders == NULL);
   int filesNeeded = compact->compaction->num_input_files(1);
   for (int i = 0; i < filesNeeded; i++) {
     uint64_t file_number = versions_->NewFileNumber();
@@ -1389,32 +1423,56 @@ void DBImpl::ReleaseCompactionUnusedFileNumbers(CompactionState* compact) {
 
 Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
   assert(compact != NULL);
-  assert(compact->builder == NULL);
-  uint64_t file_number;
-  // If we have not yet exhausted the pre-allocated file numbers,
-  // then use the one from the front. Otherwise, we have to acquire
-  // the heavyweight lock and allocate a new file number.
-  if (!compact->allocated_file_numbers.empty()) {
-    file_number = compact->allocated_file_numbers.front();
-    compact->allocated_file_numbers.pop_front();
-  } else {
-    mutex_.Lock();
-    file_number = versions_->NewFileNumber();
-    pending_outputs_.insert(file_number);
-    mutex_.Unlock();
-  }
-  CompactionState::Output out;
-  out.number = file_number;
-  out.smallest.Clear();
-  out.largest.Clear();
-  compact->outputs.push_back(out);
+  assert(compact->builders == NULL);
 
-  // Make the output file
-  std::string fname = TableFileName(dbname_, file_number);
-  Status s = env_->NewWritableFile(fname, &compact->outfile);
-  if (s.ok()) {
-    compact->builder = new TableBuilder(options_, compact->outfile,
-                                        compact->compaction->level() + 1);
+  compact->num_outfiles = is_hotcold_?2:1;
+  compact->builders = new TableBuilder*[compact->num_outfiles];
+  compact->outfiles = new WritableFile*[compact->num_outfiles];
+
+  Status s;
+  for (size_t i = 0; i < compact->num_outfiles; ++i) {
+    uint64_t file_number;
+    // If we have not yet exhausted the pre-allocated file numbers,
+    // then use the one from the front. Otherwise, we have to acquire
+    // the heavyweight lock and allocate a new file number.
+    if (!compact->allocated_file_numbers.empty()) {
+      file_number = compact->allocated_file_numbers.front();
+      compact->allocated_file_numbers.pop_front();
+    } else {
+      mutex_.Lock();
+      file_number = versions_->NewFileNumber();
+      pending_outputs_.insert(file_number);
+      mutex_.Unlock();
+    }
+    CompactionState::Output out;
+    out.number = file_number;
+    out.smallest.Clear();
+    out.largest.Clear();
+    compact->outputs.push_back(out);
+
+    // Make the output file
+    std::string fname = TableFileName(dbname_, file_number);
+    s = env_->NewWritableFile(fname, &compact->outfiles[i]);
+    if (s.ok()) {
+      compact->builders[i] = new TableBuilder(options_, compact->outfiles[i],
+                                              compact->compaction->level() + 1);
+    } else {
+      // Clean up already constructed builders and outfiles
+      for (size_t j = 0; j < i; ++j) {
+        compact->builders[j]->Abandon();
+        delete compact->builders[j];
+        delete compact->outfiles[j];
+      }
+      delete[] compact->builders;
+      delete[] compact->outfiles;
+
+      compact->num_outfiles = 0;
+      compact->builders = NULL;
+      compact->outfiles = NULL;
+
+      // And stop looping
+      break;
+    }
   }
   return s;
 }
@@ -1422,55 +1480,79 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
 Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
                                           Iterator* input) {
   assert(compact != NULL);
-  assert(compact->outfile != NULL);
-  assert(compact->builder != NULL);
+  assert(compact->outfiles != NULL);
+  assert(compact->builders != NULL);
 
-  const uint64_t output_number = compact->current_output()->number;
-  assert(output_number != 0);
+  Status s;
 
-  // Check for iterator errors
-  Status s = input->status();
-  const uint64_t current_entries = compact->builder->NumEntries();
-  if (s.ok()) {
-    s = compact->builder->Finish();
-  } else {
-    compact->builder->Abandon();
-  }
-  const uint64_t current_bytes = compact->builder->FileSize();
-  compact->current_output()->file_size = current_bytes;
-  compact->total_bytes += current_bytes;
-  delete compact->builder;
-  compact->builder = NULL;
-
-  // Finish and check for file errors
-  if (s.ok() && !options_.disableDataSync) {
-    if (options_.use_fsync) {
-      s = compact->outfile->Fsync();
+  size_t i;
+  for (i = 0; i < compact->num_outfiles && s.ok(); ++i) {
+    const uint64_t output_number = compact->current_output(i)->number;
+    assert(output_number != 0);
+
+    // Check for iterator errors
+    s = input->status();
+    const uint64_t current_entries = compact->builders[i]->NumEntries();
+    if (s.ok() && current_entries > 0) {
+      // Don't write out the output file if it is empty.
+      s = compact->builders[i]->Finish();
     } else {
-      s = compact->outfile->Sync();
+      compact->builders[i]->Abandon();
+    }
+    const uint64_t current_bytes = compact->builders[i]->FileSize();
+    compact->current_output(i)->file_size = current_bytes;
+    compact->current_output(i)->skip = current_entries == 0;
+    compact->total_bytes += current_bytes;
+    delete compact->builders[i];
+    compact->builders[i] = nullptr;
+
+    // Finish and check for file errors
+    if (s.ok() && !options_.disableDataSync && current_entries > 0) {
+      if (options_.use_fsync) {
+        s = compact->outfiles[i]->Fsync();
+      } else {
+        s = compact->outfiles[i]->Sync();
+      }
+    }
+    if (s.ok() && current_entries > 0) {
+      s = compact->outfiles[i]->Close();
+    }
+    delete compact->outfiles[i];
+    compact->outfiles[i] = nullptr;
+
+    if (s.ok() && current_entries > 0) {
+      // Verify that the table is usable
+      Iterator* iter = table_cache_->NewIterator(ReadOptions(),
+                                                 output_number,
+                                                 current_bytes);
+      s = iter->status();
+      delete iter;
+      if (s.ok()) {
+        Log(options_.info_log,
+            "Generated table #%llu: %lld keys, %lld bytes",
+            (unsigned long long) output_number,
+            (unsigned long long) current_entries,
+            (unsigned long long) current_bytes);
+      }
     }
   }
-  if (s.ok()) {
-    s = compact->outfile->Close();
-  }
-  delete compact->outfile;
-  compact->outfile = NULL;
-
-  if (s.ok() && current_entries > 0) {
-    // Verify that the table is usable
-    Iterator* iter = table_cache_->NewIterator(ReadOptions(),
-                                               output_number,
-                                               current_bytes);
-    s = iter->status();
-    delete iter;
-    if (s.ok()) {
-      Log(options_.info_log,
-          "Generated table #%llu: %lld keys, %lld bytes",
-          (unsigned long long) output_number,
-          (unsigned long long) current_entries,
-          (unsigned long long) current_bytes);
+
+  // Clean up rest of the output files and builders if we experienced an error.
+  if (!s.ok()) {
+    for (; i < compact->num_outfiles; ++i) {
+      compact->builders[i]->Abandon();
+      delete compact->builders[i];
+      delete compact->outfiles[i];
     }
   }
+
+  delete[] compact->builders;
+  delete[] compact->outfiles;
+
+  compact->num_outfiles = 0;
+  compact->builders = NULL;
+  compact->outfiles = NULL;
+
   return s;
 }
 
@@ -1504,6 +1586,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
   const int level = compact->compaction->level();
   for (size_t i = 0; i < compact->outputs.size(); i++) {
     const CompactionState::Output& out = compact->outputs[i];
+    if (out.skip) continue;
     compact->compaction->edit()->AddFile(
         level + 1,
         out.number, out.file_size, out.smallest, out.largest);
@@ -1571,11 +1654,16 @@ void DBImpl::FlushMetrics(void* db) {
     CompactMetrics(metrics);
 
     // Flush metrics to database.
-    // TODO (opt): Remove read-modify-write as it is incredibly slow.
     DB* metrics_db = dbimpl->metrics_db_;
     for (size_t i = 0; i < metrics.size(); ++i) {
       assert(metrics[i] != NULL);
       const std::string& key = metrics[i]->GetDBKey();
+      // TODO: fix this code by some means so that we don't lose metrics
+      //       already in the database. Using a read-update-write cycle is far
+      //       too slow so we temporarily replaced it with an overwrite, with
+      //       the old code left commented out to show the ideal logic.
+      metrics_db->Put(WriteOptions(), key, metrics[i]->GetDBValue());
+      /*
       BlockMetrics* db_metrics = NULL;
 
       std::string db_value;
@@ -1595,6 +1683,7 @@ void DBImpl::FlushMetrics(void* db) {
         // TODO: Log an error here.
       }
       delete db_metrics;
+      */
     }
 
     for (size_t i = 0; i < metrics.size(); ++i) {
@@ -1628,6 +1717,22 @@ void DBImpl::HandleMetrics(void* db, std::vector<BlockMetrics*>* metrics) {
   }
 }
 
+bool DBImpl::TEST_IsHot(const Iterator* iter) {
+  assert(is_hotcold_);
+  assert(iter != NULL);
+
+  BlockMetrics* bm = NULL;
+  ReadOptions metrics_read_options;
+  bool result = IsRecordHot(iter, metrics_db_, metrics_read_options, &bm);
+  delete bm;
+
+  return result;
+}
+
+bool DBImpl::TEST_IsHotCold() {
+  return is_hotcold_;
+}
+
 //
 // Given a sequence number, return the sequence number of the
 // earliest snapshot that this sequence number is visible in.
@@ -1669,8 +1774,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   Log(options_.info_log, "Compaction start summary: %s\n", scratch);
 
   assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
-  assert(compact->builder == NULL);
-  assert(compact->outfile == NULL);
+  assert(compact->builders == NULL);
+  assert(compact->outfiles == NULL);
 
   SequenceNumber visible_at_tip = 0;
   SequenceNumber earliest_snapshot;
@@ -1692,6 +1797,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   // Release mutex while we're actually doing the compaction work
   mutex_.Unlock();
 
+  BlockMetrics* block_metrics_store = NULL;
+
   const uint64_t start_micros = env_->NowMicros();
   Iterator* input = versions_->MakeInputIterator(compact->compaction);
   input->SeekToFirst();
@@ -1719,7 +1826,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
     Slice value = input->value();
     Slice* compaction_filter_value = NULL;
     if (compact->compaction->ShouldStopBefore(key) &&
-        compact->builder != NULL) {
+        compact->builders != NULL) {
       status = FinishCompactionOutputFile(compact, input);
       if (!status.ok()) {
         break;
@@ -1808,20 +1915,34 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
 
     if (!drop) {
       // Open output file if necessary
-      if (compact->builder == NULL) {
+      if (compact->builders == NULL) {
         status = OpenCompactionOutputFile(compact);
         if (!status.ok()) {
           break;
         }
       }
-      if (compact->builder->NumEntries() == 0) {
-        compact->current_output()->smallest.DecodeFrom(key);
+
+      // Select output file
+      size_t outfile_idx = 0;
+      if (is_hotcold_ && IsRecordHot(input, metrics_db_, ReadOptions(),
+                                     &block_metrics_store)) {
+        outfile_idx = 1;
       }
-      compact->current_output()->largest.DecodeFrom(key);
-      compact->builder->Add(key, value);
+      assert(outfile_idx < compact->num_outfiles);
 
-      // Close output file if it is big enough
-      if (compact->builder->FileSize() >=
+
+      if (compact->builders[outfile_idx]->NumEntries() == 0) {
+        compact->current_output(outfile_idx)->smallest.DecodeFrom(key);
+      }
+      compact->current_output(outfile_idx)->largest.DecodeFrom(key);
+      compact->builders[outfile_idx]->Add(key, value);
+
+      // Close all the output files when any of them reach the file size limit.
+      // By checking only the file we added the record to we are still
+      // guaranteed to catch when this happens as none of the other current
+      // output files have changed their size.
+      // TODO: consider adding the size of all the builders together.
+      if (compact->builders[outfile_idx]->FileSize() >=
           compact->compaction->MaxOutputFileSize()) {
         status = FinishCompactionOutputFile(compact, input);
         if (!status.ok()) {
@@ -1833,10 +1954,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
     input->Next();
   }
 
+  delete block_metrics_store;
+
   if (status.ok() && shutting_down_.Acquire_Load()) {
     status = Status::IOError("Deleting DB during compaction");
   }
-  if (status.ok() && compact->builder != NULL) {
+  if (status.ok() && compact->builders != NULL) {
     status = FinishCompactionOutputFile(compact, input);
   }
   if (status.ok()) {
@@ -1852,10 +1975,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   stats.files_in_levelnp1 = compact->compaction->num_input_files(1);
 
   int num_output_files = compact->outputs.size();
-  if (compact->builder != NULL) {
+  if (compact->builders != NULL) {
     // An error occured so ignore the last output.
     assert(num_output_files > 0);
-    --num_output_files;
+    num_output_files -= compact->num_outfiles;
   }
   stats.files_out_levelnp1 = num_output_files;
 
@@ -1996,7 +2119,7 @@ Status DBImpl::Get(const ReadOptions& options,
       // Done
     } else {
       ReadOptions read_options = options;
-      if (is_hotcold_) {
+      if (read_options.record_accesses && is_hotcold_) {
         read_options.metrics_handler = this;
       }
       s = current->Get(read_options, lkey, value, &stats);
@@ -2019,7 +2142,7 @@ Status DBImpl::Get(const ReadOptions& options,
 Iterator* DBImpl::NewIterator(const ReadOptions& options) {
   SequenceNumber latest_snapshot;
   ReadOptions read_options = options;
-  if (is_hotcold_) {
+  if (read_options.record_accesses && is_hotcold_) {
     read_options.metrics_handler = this;
   }
   Iterator* internal_iter = NewInternalIterator(read_options, &latest_snapshot);
index b8021d368a9d303985ca648d5f59c4c4a95af87c..40120a649f7b2145c2e6c6968d33a1e09bf1f79f 100644 (file)
@@ -71,12 +71,19 @@ class DBImpl : public DB {
   // Force current memtable contents to be compacted.
   Status TEST_CompactMemTable();
 
+  // Forcefully flushes metrics to metrics DB from where it can be read.
+  // This method only returns after the flush finishes.
+  void TEST_ForceFlushMetrics();
+
   // Wait for memtable compaction
   Status TEST_WaitForCompactMemTable();
 
   // Wait for any compaction
   Status TEST_WaitForCompact();
 
+  // Wait for flushing of metrics to finish
+  void TEST_WaitForMetricsFlush();
+
   // Return an internal iterator over the current state of the database.
   // The keys of this iterator are internal keys (see format.h).
   // The returned iterator should be deleted when no longer needed.
@@ -91,6 +98,15 @@ class DBImpl : public DB {
 
   // Return the current manifest file no.
   uint64_t TEST_Current_Manifest_FileNo();
+
+  // Returns true if the record pointed to by iter is hot.
+  // REQUIRES: this is a hot-cold database.
+  // REQUIRES: iter is not NULL and belongs to this database.
+  bool TEST_IsHot(const Iterator* iter);
+
+  // Returns true if the DBImpl does hot-cold tracking.
+  bool TEST_IsHotCold();
+
  protected:
   Env* const env_;
   const std::string dbname_;
index 87dca2ded46453dec5e04cef652d2545086af485..a8ef4f1db0e8718083045877df38f26c311a86f5 100644 (file)
@@ -76,6 +76,13 @@ class DBIter: public Iterator {
     }
   }
 
+  virtual const Iterator* FindSubIterator() const {
+    if (!Valid()) {
+      return NULL;
+    }
+    return iter_;
+  };
+
   virtual void Next();
   virtual void Prev();
   virtual void Seek(const Slice& target);
index e18861c4c8f4e6c9be2813d581d62e35968f5354..5a90b893ef42cf1315594b7ef5c491f6be7fd9c4 100644 (file)
@@ -14,6 +14,7 @@
 #include "leveldb/cache.h"
 #include "leveldb/env.h"
 #include "leveldb/table.h"
+#include "util/coding.h"
 #include "util/hash.h"
 #include "util/logging.h"
 #include "util/mutexlock.h"
@@ -282,6 +283,10 @@ class DBTest {
     ASSERT_OK(TryReopen(options));
   }
 
+  void ReopenWithHotCold(Options* options = NULL) {
+    ASSERT_OK(TryReopenWithHotCold(options));
+  }
+
   void Close() {
     delete db_;
     db_ = NULL;
@@ -294,6 +299,13 @@ class DBTest {
     ASSERT_OK(TryReopen(options));
   }
 
+  void DestroyAndReopenWithHotCold(Options* options = NULL) {
+    delete db_;
+    db_ = NULL;
+    DestroyDB(dbname_, Options());
+    ASSERT_OK(TryReopenWithHotCold(options));
+  }
+
   Status PureReopen(Options* options, DB** db) {
     return DB::Open(*options, dbname_, db);
   }
@@ -313,6 +325,21 @@ class DBTest {
     return DB::Open(opts, dbname_, &db_);
   }
 
+  Status TryReopenWithHotCold(Options* options) {
+    delete db_;
+    db_ = NULL;
+    Options opts;
+    if (options != NULL) {
+      opts = *options;
+    } else {
+      opts = CurrentOptions();
+      opts.create_if_missing = true;
+    }
+    last_options_ = opts;
+
+    return DB::OpenWithHotCold(opts, dbname_, &db_);
+  }
+
   Status Put(const std::string& k, const std::string& v) {
     return db_->Put(WriteOptions(), k, v);
   }
@@ -2891,6 +2918,96 @@ TEST(DBTest, Randomized) {
   } while (ChangeOptions());
 }
 
+TEST(DBTest, HotCold) {
+  const uint32_t kNumKeys = 1 << 9;
+  const uint32_t kKeysToSkip = 6;
+  do {
+    DestroyAndReopenWithHotCold(NULL);
+    ASSERT_TRUE(db_ != NULL);
+
+    // Skip if this db is not hotcold
+    if (!dbfull()->TEST_IsHotCold()) {
+      continue;
+    }
+
+    // Populate db and push all data to sstable
+    for (uint32_t key = 0; key < kNumKeys; ++key) {
+      std::string skey;
+      PutFixed32(&skey, key);
+
+      Put(skey, skey);
+    }
+    dbfull()->TEST_CompactMemTable();
+    dbfull()->CompactRange(NULL, NULL);
+
+    // Flush metrics if any and check that all records are cold
+    dbfull()->TEST_ForceFlushMetrics();
+    {
+      ReadOptions opts;
+      opts.record_accesses = false;
+      Iterator* iter = db_->NewIterator(opts);
+      for (uint32_t key = 0; key < kNumKeys; ++key) {
+        std::string skey;
+        PutFixed32(&skey, key);
+
+        iter->Seek(skey);
+        ASSERT_TRUE(iter->Valid());
+        ASSERT_EQ(skey, iter->key().ToString());
+
+        ASSERT_TRUE(!dbfull()->TEST_IsHot(iter));
+      }
+      delete iter;
+    }
+
+    // Access a few rows and flush the metrics to the metrics db.
+    for (uint32_t key = 0; key < kNumKeys; key += kKeysToSkip) {
+      std::string skey;
+      PutFixed32(&skey, key);
+
+      ASSERT_EQ(skey, Get(skey));
+    }
+    dbfull()->TEST_ForceFlushMetrics();
+
+    // Check that keys that should be hot are hot.
+    // Due to false positives we don't check whether cold keys are actually
+    // cold.
+    {
+      ReadOptions opts;
+      opts.record_accesses = false;
+      Iterator* iter = db_->NewIterator(opts);
+      uint32_t totalColdKeys = 0; // Total number of keys that are supposed to
+                                  // be cold.
+      uint32_t totalHotColdKeys = 0; // Counts number of keys which are
+                                     // supposed to be cold that are actually
+                                     // hot
+      for (uint32_t key = 0; key < kNumKeys; ++key) {
+        std::string skey;
+        PutFixed32(&skey, key);
+
+        iter->Seek(skey);
+        ASSERT_TRUE(iter->Valid());
+        ASSERT_EQ(skey, iter->key().ToString());
+
+        if (key % kKeysToSkip == 0) {
+          ASSERT_TRUE(dbfull()->TEST_IsHot(iter));
+        } else {
+          ++totalColdKeys;
+          if (dbfull()->TEST_IsHot(iter)) {
+            ++totalHotColdKeys;
+          }
+        }
+      }
+      delete iter;
+
+      if (totalColdKeys != 0) {
+        fprintf(stderr, "Hot false-positive rate: %.2f%%\n",
+                double(totalHotColdKeys)/totalColdKeys*100);
+      }
+    }
+
+  } while (ChangeOptions());
+}
+
 std::string MakeKey(unsigned int num) {
   char buf[30];
   snprintf(buf, sizeof(buf), "%016u", num);
index 0e3ca89f007f85adbf5e108ec6463b1324cbdecf..3316c01949491c3e7984287e49ff420bee5132de 100644 (file)
@@ -28,6 +28,14 @@ static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
   return sum;
 }
 
+static int64_t TotalFileSize(const std::vector<FileGroup*>& file_groups) {
+  int64_t sum = 0;
+  for (size_t i = 0; i < file_groups.size() && file_groups[i]; i++) {
+    sum += file_groups[i]->total_file_size;
+  }
+  return sum;
+}
+
 Version::~Version() {
   assert(refs_ == 0);
 
@@ -35,6 +43,14 @@ Version::~Version() {
   prev_->next_ = next_;
   next_->prev_ = prev_;
 
+  // Free FileGroups
+  for (int level = 0; level < vset_->NumberLevels(); level++) {
+    for (size_t i = 0; i < file_groups_[level].size(); i++) {
+      delete file_groups_[level][i];
+    }
+  }
+  delete[] file_groups_;
+
   // Drop references to files
   for (int level = 0; level < vset_->NumberLevels(); level++) {
     for (size_t i = 0; i < files_[level].size(); i++) {
@@ -121,28 +137,181 @@ bool SomeFileOverlapsRange(
   return !BeforeFile(ucmp, largest_user_key, files[index]);
 }
 
-// An internal iterator.  For a given version/level pair, yields
-// information about the files in the level.  For a given entry, key()
-// is the largest key that occurs in the file, and value() is an
-// 16-byte value containing the file number and file size, both
-// encoded using EncodeFixed64.
-class Version::LevelFileNumIterator : public Iterator {
+Iterator* FileGroup::NewIterator(TableCache* cache,
+                                 const InternalKeyComparator* icmp,
+                                 const ReadOptions& options) {
+  if (files.size() == 1) {
+    Iterator* iter = cache->NewIterator(options, files[0]->number,
+                                        files[0]->file_size);
+    return iter;
+  }
+
+  Iterator** iters = new Iterator*[files.size()];
+  for (size_t i = 0; i < files.size(); ++i) {
+    iters[i] = cache->NewIterator(options, files[i]->number,
+                                  files[i]->file_size);
+  }
+  Iterator* iter = NewMergingIterator(icmp, iters, files.size());
+  delete[] iters;
+  return iter;
+}
+
+namespace {
+// Represents either the start or end of a file. "start" indicates which key we
+// use of "file".
+struct EndPoint {
+  FileMetaData* file;
+  bool start; // If true we use the first key; otherwise, the last.
+
+  EndPoint(FileMetaData* file, bool start)
+    : file(file), start(start) {
+  }
+};
+// Comparator to sort end points. Sort by the key of two end points. If a two
+// endpoints have the same key and one is a start and the other an end end
+// point, then the end end point is ordered before the start end point.
+struct BySmallestEndPoint {
+  const InternalKeyComparator* internal_comparator;
+
+  BySmallestEndPoint(const InternalKeyComparator* internal_comparator)
+    : internal_comparator(internal_comparator) {
+  }
+
+  bool operator()(const EndPoint& a, const EndPoint& b) {
+    const InternalKey& a_key = a.start?a.file->smallest:a.file->largest;
+    const InternalKey& b_key = b.start?b.file->smallest:b.file->largest;
+
+    int r = internal_comparator->Compare(a_key, b_key);
+
+    // if a_key < b_key or else if a_key == b_key and "a" is an start point and
+    // "b" is a end point then "a" comes before "b".
+    return (r < 0) || (r == 0 && a.start && !b.start);
+  }
+};
+}  // namespace
+void GroupFiles(
+    const InternalKeyComparator* internal_comparator,
+    const std::vector<FileMetaData*>& files,
+    std::vector<FileGroup*>& file_groups) {
+  file_groups.clear();
+
+  // Store and sort the end points of the files.
+  std::vector<EndPoint> end_points;
+  end_points.reserve(files.size() * 2);
+  for (size_t i = 0; i < files.size(); ++i) {
+    end_points.push_back(EndPoint(files[i], false));
+    end_points.push_back(EndPoint(files[i], true));
+  }
+  std::sort(end_points.begin(), end_points.end(),
+            BySmallestEndPoint(internal_comparator));
+
+  // Loop through end points finding groups of overlapping files.
+  size_t cur_unended_files = 0; // Number of files of which we've seen the
+                                // start point, but not the end point.
+  for (size_t i = 0; i < end_points.size(); ++i) {
+    FileGroup* cur_group;
+    if (end_points[i].start) {
+      if (cur_unended_files == 0) {
+        // Create a new group.
+        cur_group = new FileGroup();
+        file_groups.push_back(cur_group);
+
+        // First file in this group, so update the smallest key.
+        cur_group->smallest = end_points[i].file->smallest;
+      } else {
+        cur_group = file_groups[file_groups.size()-1];
+      }
+
+      // Add files to group when we first encounter them.
+      cur_group->files.push_back(end_points[i].file);
+      cur_group->total_file_size += end_points[i].file->file_size;
+
+      ++cur_unended_files;
+    } else {
+      --cur_unended_files;
+      assert(cur_unended_files >= 0);
+      if (cur_unended_files == 0) {
+        // No more files are going to overlap with current group so set the
+        // largest key in the group, cause we are either done or going to start
+        // a new group the next iteration.
+
+        cur_group = file_groups[file_groups.size()-1];
+        cur_group->largest = end_points[i].file->largest;
+      }
+    }
+  }
+}
+
+size_t FindGroup(const InternalKeyComparator& icmp,
+                 const std::vector<FileGroup*>& groups,
+                 const Slice& key) {
+  size_t left = 0;
+  size_t right = groups.size();
+  while (left < right) {
+    size_t mid = (left + right) / 2;
+    const FileGroup* g = groups[mid];
+    if (icmp.InternalKeyComparator::Compare(g->largest.Encode(), key) < 0) {
+      // Key at "mid.largest" is < "target".  Therefore all
+      // groups at or before "mid" are uninteresting.
+      left = mid + 1;
+    } else {
+      // Key at "mid.largest" is >= "target".  Therefore all groups
+      // after "mid" are uninteresting.
+      right = mid;
+    }
+  }
+  return right;
+}
+
+bool SomeGroupOverlapsRange(
+    const InternalKeyComparator& icmp,
+    const std::vector<FileGroup*>& groups,
+    const Slice* smallest_user_key,
+    const Slice* largest_user_key) {
+  const Comparator* ucmp = icmp.user_comparator();
+
+  // Binary search over group list
+  uint32_t index = 0;
+  if (smallest_user_key != NULL) {
+    // Find the earliest possible internal key for smallest_user_key
+    InternalKey small(*smallest_user_key, kMaxSequenceNumber,
+                      kValueTypeForSeek);
+    index = FindGroup(icmp, groups, small.Encode());
+  }
+
+  if (index >= groups.size()) {
+    // beginning of range is after all files, so no overlap.
+    return false;
+  }
+
+  return (largest_user_key == NULL ||
+          ucmp->Compare(*largest_user_key,
+                        groups[index]->smallest.user_key()) >= 0);
+}
+
+namespace {
+// An internal iterator. For a given list of groups, yields information about
+// those groups. For a given entry, key() is the largest key that occurs in the
+// group, and value() is an 8-byte value containing the index of the group in
+// the list, encoded using EncodeFixed64.
+class LevelGroupIdxIterator : public Iterator {
  public:
-  LevelFileNumIterator(const InternalKeyComparator& icmp,
-                       const std::vector<FileMetaData*>* flist)
+  LevelGroupIdxIterator(const InternalKeyComparator& icmp,
+                        const std::vector<FileGroup*>& groups)
       : icmp_(icmp),
-        flist_(flist),
-        index_(flist->size()) {        // Marks as invalid
+        groups_(groups),
+        index_(groups_.size()) {        // Marks as invalid
+    assert(!groups.empty());
   }
   virtual bool Valid() const {
-    return index_ < flist_->size();
+    return index_ < groups_.size();
   }
   virtual void Seek(const Slice& target) {
-    index_ = FindFile(icmp_, *flist_, target);
+    index_ = FindGroup(icmp_, groups_, target);
   }
   virtual void SeekToFirst() { index_ = 0; }
   virtual void SeekToLast() {
-    index_ = flist_->empty() ? 0 : flist_->size() - 1;
+    index_ = groups_.empty() ? 0 : groups_.size() - 1;
   }
   virtual void Next() {
     assert(Valid());
@@ -151,51 +320,86 @@ class Version::LevelFileNumIterator : public Iterator {
   virtual void Prev() {
     assert(Valid());
     if (index_ == 0) {
-      index_ = flist_->size();  // Marks as invalid
+      index_ = groups_.size();  // Marks as invalid
     } else {
       index_--;
     }
   }
   Slice key() const {
     assert(Valid());
-    return (*flist_)[index_]->largest.Encode();
+    return groups_[index_]->largest.Encode();
   }
   Slice value() const {
     assert(Valid());
-    EncodeFixed64(value_buf_, (*flist_)[index_]->number);
-    EncodeFixed64(value_buf_+8, (*flist_)[index_]->file_size);
+    EncodeFixed64(value_buf_, index_);
     return Slice(value_buf_, sizeof(value_buf_));
   }
   virtual Status status() const { return Status::OK(); }
  private:
   const InternalKeyComparator icmp_;
-  const std::vector<FileMetaData*>* const flist_;
-  uint32_t index_;
+  uint64_t level_;
+  const std::vector<FileGroup*>& groups_;
+  size_t index_;
 
   // Backing store for value().  Holds the file number and size.
-  mutable char value_buf_[16];
+  mutable char value_buf_[8];
 };
 
-static Iterator* GetFileIterator(void* arg,
-                                 const ReadOptions& options,
-                                 const Slice& file_value) {
-  TableCache* cache = reinterpret_cast<TableCache*>(arg);
-  if (file_value.size() != 16) {
-    return NewErrorIterator(
-        Status::Corruption("FileReader invoked with unexpected value"));
+// Struct that contains all the information that GetGroupIterator() needs to
+// function.
+struct GetGroupIteratorArg {
+  const InternalKeyComparator* icmp;
+  TableCache* cache;
+  const std::vector<FileGroup*>& groups;
+
+  GetGroupIteratorArg(const InternalKeyComparator* icmp,
+                      TableCache* cache,
+                      const std::vector<FileGroup*>& groups)
+    : icmp(icmp), cache(cache), groups(groups) {
+  }
+
+  // Function that gets passed to an Iterator in order to free an instance of
+  // this class.
+  static void IterDelete(void* arg1, void*) {
+    GetGroupIteratorArg* ggia = reinterpret_cast<GetGroupIteratorArg*>(arg1);
+    delete ggia;
+  }
+};
+// Returns an iterator over the specifided group.
+static Iterator* GetGroupIterator(void* arg,
+                                  const ReadOptions& options,
+                                  const Slice& index) {
+  GetGroupIteratorArg* ggia = reinterpret_cast<GetGroupIteratorArg*>(arg);
+  Iterator* iter;
+  if (index.size() != 8) {
+    iter = NewErrorIterator(
+        Status::Corruption("GetGroupIterator() invoked with unexpected index"));
   } else {
-    return cache->NewIterator(options,
-                              DecodeFixed64(file_value.data()),
-                              DecodeFixed64(file_value.data() + 8));
+    uint64_t iindex = DecodeFixed64(index.data());
+    assert(iindex < ggia->groups.size());
+    iter = ggia->groups[iindex]->NewIterator(ggia->cache, ggia->icmp, options);
   }
+  return iter;
 }
 
-Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
-                                            int level) const {
-  return NewTwoLevelIterator(
-      new LevelFileNumIterator(vset_->icmp_, &files_[level]),
-      &GetFileIterator, vset_->table_cache_, options);
+// Returns an iterator that concatenates together the given groups.
+// REQUIRES: The groups are sorted in order and are non-overlapping
+Iterator* NewGroupsConcatenatingIterator(
+        const ReadOptions& options,
+        TableCache* cache,
+        const InternalKeyComparator* icmp,
+        const std::vector<FileGroup*>& groups) {
+  assert(!groups.empty());
+
+  GetGroupIteratorArg* ggia = new GetGroupIteratorArg(icmp, cache, groups);
+  Iterator* iter = NewTwoLevelIterator(
+      new LevelGroupIdxIterator(*icmp, groups),
+      &GetGroupIterator, ggia, options);
+  iter->RegisterCleanup(&GetGroupIteratorArg::IterDelete, ggia, NULL);
+
+  return iter;
 }
+}  // namespace
 
 void Version::AddIterators(const ReadOptions& options,
                            std::vector<Iterator*>* iters) {
@@ -207,11 +411,14 @@ void Version::AddIterators(const ReadOptions& options,
   }
 
   // For levels > 0, we can use a concatenating iterator that sequentially
-  // walks through the non-overlapping files in the level, opening them
+  // walks through the non-overlapping groups in the level, opening them
   // lazily.
   for (int level = 1; level < vset_->NumberLevels(); level++) {
-    if (!files_[level].empty()) {
-      iters->push_back(NewConcatenatingIterator(options, level));
+    if (!file_groups_[level].empty()) {
+      iters->push_back(NewGroupsConcatenatingIterator(options,
+                                                      vset_->table_cache_,
+                                                      &vset_->icmp_,
+                                                      file_groups_[level]));
     }
   }
 }
@@ -229,6 +436,7 @@ struct Saver {
   const Comparator* ucmp;
   Slice user_key;
   std::string* value;
+  SequenceNumber seq;
   bool didIO;    // did we do any disk io?
 };
 }
@@ -243,11 +451,13 @@ static void SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
       s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted;
       if (s->state == kFound) {
         s->value->assign(v.data(), v.size());
+        s->seq = parsed_key.sequence;
       }
     }
   }
 }
 
+
 static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
   return a->number > b->number;
 }
@@ -263,6 +473,7 @@ Version::Version(VersionSet* vset, uint64_t version_number)
       offset_manifest_file_(0),
       version_number_(version_number) {
   files_ = new std::vector<FileMetaData*>[vset->NumberLevels()];
+  file_groups_ = new std::vector<FileGroup*>[vset->NumberLevels()]();
 }
 
 Status Version::Get(const ReadOptions& options,
@@ -282,53 +493,60 @@ Status Version::Get(const ReadOptions& options,
   // We can search level-by-level since entries never hop across
   // levels.  Therefore we are guaranteed that if we find data
   // in an smaller level, later levels are irrelevant.
-  std::vector<FileMetaData*> tmp;
-  FileMetaData* tmp2;
+  std::vector<FileMetaData*> files;
   for (int level = 0; level < vset_->NumberLevels(); level++) {
-    size_t num_files = files_[level].size();
-    if (num_files == 0) continue;
+    size_t num_groups = file_groups_[level].size();
+    if (files_[level].size() == 0) continue;
+    assert(num_groups > 0);
 
-    // Get the list of files to search in this level
-    FileMetaData* const* files = &files_[level][0];
-    if (level == 0) {
-      // Level-0 files may overlap each other.  Find all files that
-      // overlap user_key and process them in order from newest to oldest.
-      tmp.reserve(num_files);
-      for (uint32_t i = 0; i < num_files; i++) {
-        FileMetaData* f = files[i];
-        if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
-            ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
-          tmp.push_back(f);
+    const std::vector<FileGroup*>& groups = file_groups_[level];
+    size_t index = FindGroup(vset_->icmp_, groups, ikey);
+    if (index >= num_groups) {
+      continue;
+    } else if (ucmp->Compare(user_key,
+                             groups[index]->smallest.user_key()) >= 0) {
+      // Although user keys can span multiple groups we don't have to worry
+      // about checking the next group as we are only interested in the user
+      // key with the greatest sequence number and as internal keys with the same
+      // user key are sorted in order of decreasing sequence number, this group
+      // is guaranteed to contain the user key with the largest sequence
+      // number.
+
+      for (size_t i = 0; i < groups[index]->files.size(); i++) {
+        FileMetaData* f = file_groups_[level][index]->files[i];
+
+        if (i != 0 && ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
+          // We know the user key is greater than the smallest key of the first
+          // file (which is also the smallest key of the group) so we avoid
+          // calling the comparator.
+          // If the smallest key in the current file is greater than the user
+          // key, then no other file will overlap with the key so we can stop.
+          break;
         }
-      }
-      if (tmp.empty()) continue;
 
-      std::sort(tmp.begin(), tmp.end(), NewestFirst);
-      files = &tmp[0];
-      num_files = tmp.size();
-    } else {
-      // Binary search to find earliest index whose largest key >= ikey.
-      uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
-      if (index >= num_files) {
-        files = NULL;
-        num_files = 0;
-      } else {
-        tmp2 = files[index];
-        if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
-          // All of "tmp2" is past any data for user_key
-          files = NULL;
-          num_files = 0;
-        } else {
-          files = &tmp2;
-          num_files = 1;
+        if (ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
+          files.push_back(f);
         }
       }
     }
 
-    for (uint32_t i = 0; i < num_files; ++i) {
+    if (files.empty()) continue;
+
+    // Sort files since in level 0 we can simply look starting from the newest
+    // file.
+    if (level == 0) {
+      std::sort(files.begin(), files.end(), NewestFirst);
+    }
+
+    Saver saver;
+    saver.ucmp = ucmp;
+    std::string saved_value;
+    SequenceNumber saved_seq = 0;
+    SaverState saved_state = kNotFound;
+
+    for (uint32_t i = 0; i < files.size(); ++i) {
 
       FileMetaData* f = files[i];
-      Saver saver;
       saver.state = kNotFound;
       saver.ucmp = ucmp;
       saver.user_key = user_key;
@@ -363,15 +581,46 @@ Status Version::Get(const ReadOptions& options,
         case kNotFound:
           break;      // Keep searching in other files
         case kFound:
-          return s;
+          if (level == 0 || files.size() == 1) {
+            return s; // no need to look at other files
+          }
+
+          if (saver.seq >= saved_seq) { // save most recent
+            saved_value = *saver.value;
+            saved_seq = saver.seq;
+            saved_state = kFound;
+          }
+          break; // keep searching in other files
         case kDeleted:
-          s = Status::NotFound(Slice());  // Use empty error message for speed
-          return s;
+          if (level == 0 || files.size() == 1) {
+            s = Status::NotFound(Slice());  // Use empty error message for speed
+            return s;
+          }
+
+          if (saver.seq >= saved_seq) { // save most recent
+            saved_seq = saver.seq;
+            saved_state = kNotFound;
+          }
+          break;
         case kCorrupt:
           s = Status::Corruption("corrupted key for ", user_key);
           return s;
       }
     }
+
+    assert(saved_state == kNotFound || files.size() > 1);
+    assert(s.ok());
+    switch (saved_state) {
+      case kFound:
+        *value = saved_value;
+        return s;
+      case kDeleted:
+        s = Status::NotFound(Slice());// Use empty error message for speed
+        return s;
+      default:
+        assert(saved_state == kNotFound);
+    }
+    files.clear();
   }
 
   return Status::NotFound(Slice());  // Use an empty error message for speed
@@ -381,7 +630,10 @@ bool Version::UpdateStats(const GetStats& stats) {
   FileMetaData* f = stats.seek_file;
   if (f != NULL) {
     f->allowed_seeks--;
-    if (f->allowed_seeks <= 0 && file_to_compact_ == NULL) {
+    if (f->allowed_seeks <= 0 && file_to_compact_ == NULL &&
+        stats.seek_file_level < vset_->NumberLevels()-1) {
+      // We don't allow seek compactions to happen on the level with the
+      // greatest number.
       file_to_compact_ = f;
       file_to_compact_level_ = stats.seek_file_level;
       return true;
@@ -406,8 +658,8 @@ void Version::Unref() {
 bool Version::OverlapInLevel(int level,
                              const Slice* smallest_user_key,
                              const Slice* largest_user_key) {
-  return SomeFileOverlapsRange(vset_->icmp_, (level > 0), files_[level],
-                               smallest_user_key, largest_user_key);
+  return SomeGroupOverlapsRange(vset_->icmp_, file_groups_[level],
+                                smallest_user_key, largest_user_key);
 }
 
 int Version::PickLevelForMemTableOutput(
@@ -442,171 +694,42 @@ int Version::PickLevelForMemTableOutput(
 }
 
 // Store in "*inputs" all files in "level" that overlap [begin,end]
-// If hint_index is specified, then it points to a file in the
-// overlapping range.
-// The file_index returns a pointer to any file in an overlapping range.
+// Including all files that overlap files in those range, files that overlap
+// those files in turn and so on.
 void Version::GetOverlappingInputs(
     int level,
     const InternalKey* begin,
     const InternalKey* end,
-    std::vector<FileMetaData*>* inputs,
-    int hint_index,
-    int* file_index) {
+    std::vector<FileMetaData*>* inputs) {
+  assert(level >= 0);
+  assert(level < vset_->NumberLevels());
   inputs->clear();
+
+  const std::vector<FileGroup*>& groups = file_groups_[level];
+
   Slice user_begin, user_end;
+  size_t start_index = 0;
   if (begin != NULL) {
     user_begin = begin->user_key();
+    start_index = FindGroup(vset_->icmp_, groups, begin->Encode());
   }
   if (end != NULL) {
     user_end = end->user_key();
   }
-  if (file_index) {
-    *file_index = -1;
-  }
-  const Comparator* user_cmp = vset_->icmp_.user_comparator();
-  if (begin != NULL && end != NULL && level > 0) {
-    GetOverlappingInputsBinarySearch(level, user_begin, user_end, inputs,
-      hint_index, file_index);
-    return;
-  }
-  for (size_t i = 0; i < files_[level].size(); ) {
-    FileMetaData* f = files_[level][i++];
-    const Slice file_start = f->smallest.user_key();
-    const Slice file_limit = f->largest.user_key();
-    if (begin != NULL && user_cmp->Compare(file_limit, user_begin) < 0) {
-      // "f" is completely before specified range; skip it
-    } else if (end != NULL && user_cmp->Compare(file_start, user_end) > 0) {
-      // "f" is completely after specified range; skip it
-    } else {
-      inputs->push_back(f);
-      if (level == 0) {
-        // Level-0 files may overlap each other.  So check if the newly
-        // added file has expanded the range.  If so, restart search.
-        if (begin != NULL && user_cmp->Compare(file_start, user_begin) < 0) {
-          user_begin = file_start;
-          inputs->clear();
-          i = 0;
-        } else if (end != NULL && user_cmp->Compare(file_limit, user_end) > 0) {
-          user_end = file_limit;
-          inputs->clear();
-          i = 0;
-        }
-      } else if (file_index) {
-        *file_index = i-1;
-      }
-    }
-  }
-}
 
-// Store in "*inputs" all files in "level" that overlap [begin,end]
-// Employ binary search to find at least one file that overlaps the
-// specified range. From that file, iterate backwards and
-// forwards to find all overlapping files.
-void Version::GetOverlappingInputsBinarySearch(
-    int level,
-    const Slice& user_begin,
-    const Slice& user_end,
-    std::vector<FileMetaData*>* inputs,
-    int hint_index,
-    int* file_index) {
-  assert(level > 0);
-  int min = 0;
-  int mid = 0;
-  int max = files_[level].size() -1;
-  bool foundOverlap = false;
   const Comparator* user_cmp = vset_->icmp_.user_comparator();
+  while (start_index < groups.size()) {
+    const FileGroup* group = groups[start_index];
 
-  // if the caller already knows the index of a file that has overlap,
-  // then we can skip the binary search.
-  if (hint_index != -1) {
-    mid = hint_index;
-    foundOverlap = true;
-  }
-
-  while (!foundOverlap && min <= max) {
-    mid = (min + max)/2;
-    FileMetaData* f = files_[level][mid];
-    const Slice file_start = f->smallest.user_key();
-    const Slice file_limit = f->largest.user_key();
-    if (user_cmp->Compare(file_limit, user_begin) < 0) {
-      min = mid + 1;
-    } else if (user_cmp->Compare(user_end, file_start) < 0) {
-      max = mid - 1;
-    } else {
-      foundOverlap = true;
+    // If the current group is larger than the end range, stop searching.
+    if (end != NULL &&
+        user_cmp->Compare(group->smallest.user_key(), user_end) > 0) {
       break;
     }
-  }
 
-  // If there were no overlapping files, return immediately.
-  if (!foundOverlap) {
-    return;
-  }
-  // returns the index where an overlap is found
-  if (file_index) {
-    *file_index = mid;
-  }
-  ExtendOverlappingInputs(level, user_begin, user_end, inputs, mid);
-}
+    inputs->insert(inputs->end(), group->files.begin(), group->files.end());
 
-// Store in "*inputs" all files in "level" that overlap [begin,end]
-// The midIndex specifies the index of at least one file that
-// overlaps the specified range. From that file, iterate backward
-// and forward to find all overlapping files.
-void Version::ExtendOverlappingInputs(
-    int level,
-    const Slice& user_begin,
-    const Slice& user_end,
-    std::vector<FileMetaData*>* inputs,
-    int midIndex) {
-
-  const Comparator* user_cmp = vset_->icmp_.user_comparator();
-#ifndef NDEBUG
-  {
-    // assert that the file at midIndex overlaps with the range
-    assert(midIndex < files_[level].size());
-    FileMetaData* f = files_[level][midIndex];
-    const Slice fstart = f->smallest.user_key();
-    const Slice flimit = f->largest.user_key();
-    if (user_cmp->Compare(fstart, user_begin) >= 0) {
-      assert(user_cmp->Compare(fstart, user_end) <= 0);
-    } else {
-      assert(user_cmp->Compare(flimit, user_begin) >= 0);
-    }
-  }
-#endif
-  int startIndex = midIndex + 1;
-  int endIndex = midIndex;
-  int count __attribute__((unused)) = 0;
-
-  // check backwards from 'mid' to lower indices
-  for (int i = midIndex; i >= 0 ; i--) {
-    FileMetaData* f = files_[level][i];
-    const Slice file_limit = f->largest.user_key();
-    if (user_cmp->Compare(file_limit, user_begin) >= 0) {
-      startIndex = i;
-      assert((count++, true));
-    } else {
-      break;
-    }
-  }
-  // check forward from 'mid+1' to higher indices
-  for (unsigned int i = midIndex+1; i < files_[level].size(); i++) {
-    FileMetaData* f = files_[level][i];
-    const Slice file_start = f->smallest.user_key();
-    if (user_cmp->Compare(file_start, user_end) <= 0) {
-      assert((count++, true));
-      endIndex = i;
-    } else {
-      break;
-    }
-  }
-  assert(count == endIndex - startIndex + 1);
-
-  // insert overlapping files into vector
-  for (int i = startIndex; i <= endIndex; i++) {
-    FileMetaData* f = files_[level][i];
-    inputs->push_back(f);
+    ++start_index;
   }
 }
 
@@ -638,6 +761,13 @@ std::string Version::DebugString(bool hex) const {
   return r;
 }
 
+void Version::RebuildGroups(int level) {
+  for (size_t i = 0; i < file_groups_[level].size(); ++i) {
+    delete file_groups_[level][i];
+  }
+  GroupFiles(&vset_->icmp_, files_[level], file_groups_[level]);
+}
+
 // this is used to batch writes to the manifest file
 struct VersionSet::ManifestWriter {
   Status status;
@@ -717,21 +847,6 @@ class VersionSet::Builder {
 
   void CheckConsistency(Version* v) {
 #ifndef NDEBUG
-    for (int level = 0; level < vset_->NumberLevels(); level++) {
-      // Make sure there is no overlap in levels > 0
-      if (level > 0) {
-        for (uint32_t i = 1; i < v->files_[level].size(); i++) {
-          const InternalKey& prev_end = v->files_[level][i-1]->largest;
-          const InternalKey& this_begin = v->files_[level][i]->smallest;
-          if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
-            fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
-                    prev_end.DebugString().c_str(),
-                    this_begin.DebugString().c_str());
-            abort();
-          }
-        }
-      }
-    }
 #endif
   }
 
@@ -861,6 +976,8 @@ class VersionSet::Builder {
       for (; base_iter != base_end; ++base_iter) {
         MaybeAddFile(v, level, *base_iter);
       }
+
+      v->RebuildGroups(level);
     }
     CheckConsistency(v);
   }
@@ -870,11 +987,6 @@ class VersionSet::Builder {
       // File is deleted: do nothing
     } else {
       std::vector<FileMetaData*>* files = &v->files_[level];
-      if (level > 0 && !files->empty()) {
-        // Must not overlap
-        assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest,
-                                    f->smallest) < 0);
-      }
       f->refs++;
       files->push_back(f);
     }
@@ -1582,33 +1694,54 @@ bool VersionSet::ManifestContains(const std::string& record) const {
   return result;
 }
 
+uint64_t FileGroup::ApproximateOffsetOf(const VersionSet* vset,
+                                        const InternalKey& ikey) {
+  // ikey is before FileGroup so no overlap occurs.
+  if (vset->icmp_.Compare(ikey, smallest) < 0) return 0;
+  // Entire FileGroup is after ikey so just use total size.
+  if (vset->icmp_.Compare(largest, ikey) <= 0) return total_file_size;
+
+  uint64_t result = 0;
+  for (size_t i = 0; i < files.size(); i++) {
+    if (vset->icmp_.Compare(files[i]->largest, ikey) <= 0) {
+      // Entire file is before "ikey", so just add the file size
+      result += files[i]->file_size;
+    } else if (vset->icmp_.Compare(files[i]->smallest, ikey) > 0) {
+      // Entire file is after "ikey", so ignore
+
+      // files are sorted by start point so no more overlaps will occur.
+      break;
+    } else {
+      // "ikey" falls in the range for this table.  Add the
+      // approximate offset of "ikey" within the table.
+      Table* tableptr;
+      Iterator* iter = vset->table_cache_->NewIterator(
+          ReadOptions(), files[i]->number, files[i]->file_size, &tableptr);
+      if (tableptr != NULL) {
+        result += tableptr->ApproximateOffsetOf(ikey.Encode());
+      }
+      delete iter;
+    }
+  }
+  return result;
+}
 
 uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
   uint64_t result = 0;
   for (int level = 0; level < NumberLevels(); level++) {
-    const std::vector<FileMetaData*>& files = v->files_[level];
-    for (size_t i = 0; i < files.size(); i++) {
-      if (icmp_.Compare(files[i]->largest, ikey) <= 0) {
-        // Entire file is before "ikey", so just add the file size
-        result += files[i]->file_size;
-      } else if (icmp_.Compare(files[i]->smallest, ikey) > 0) {
-        // Entire file is after "ikey", so ignore
-        if (level > 0) {
-          // Files other than level 0 are sorted by meta->smallest, so
-          // no further files in this level will contain data for
-          // "ikey".
-          break;
-        }
-      } else {
-        // "ikey" falls in the range for this table.  Add the
-        // approximate offset of "ikey" within the table.
-        Table* tableptr;
-        Iterator* iter = table_cache_->NewIterator(
-            ReadOptions(), files[i]->number, files[i]->file_size, &tableptr);
-        if (tableptr != NULL) {
-          result += tableptr->ApproximateOffsetOf(ikey.Encode());
-        }
-        delete iter;
+    const std::vector<FileGroup*>& groups = v->file_groups_[level];
+    for (size_t i = 0; i < groups.size(); i++) {
+      // We can use FileGroup's ApproximateOffsetOf as it is efficient.
+      uint64_t group_offset = groups[i]->ApproximateOffsetOf(this, ikey);
+      result += group_offset;
+
+      if (group_offset == 0) {
+        // Since groups are non-empty we are guaranteed other groups after this
+        // one won't contain the key, as either the key falls before the
+        // current group or it is the first key in the group.
+        //
+        // So we can stop looping.
+        break;
       }
     }
   }
@@ -1701,31 +1834,35 @@ void VersionSet::GetRange2(const std::vector<FileMetaData*>& inputs1,
   GetRange(all, smallest, largest);
 }
 
+namespace {
+void DeleteFileGroups(void* arg1, void* arg2) {
+  std::vector<FileGroup*>* groups =
+    reinterpret_cast<std::vector<FileGroup*>*>(arg1);
+
+  for (size_t i = 0; i < groups->size(); ++i) {
+    delete (*groups)[i];
+  }
+  delete groups;
+}
+}
 Iterator* VersionSet::MakeInputIterator(Compaction* c) {
   ReadOptions options;
   options.verify_checksums = options_->paranoid_checks;
   options.fill_cache = false;
 
-  // Level-0 files have to be merged together.  For other levels,
-  // we will make a concatenating iterator per level.
-  // TODO(opt): use concatenating iterator for level-0 if there is no overlap
-  const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2);
+  // We construct groups of the input files, one for each level, then we merge
+  // these iterators together.
+  const int space = 2;
   Iterator** list = new Iterator*[space];
   int num = 0;
   for (int which = 0; which < 2; which++) {
     if (!c->inputs_[which].empty()) {
-      if (c->level() + which == 0) {
-        const std::vector<FileMetaData*>& files = c->inputs_[which];
-        for (size_t i = 0; i < files.size(); i++) {
-          list[num++] = table_cache_->NewIterator(
-              options, files[i]->number, files[i]->file_size);
-        }
-      } else {
-        // Create concatenating iterator for the files from this level
-        list[num++] = NewTwoLevelIterator(
-            new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
-            &GetFileIterator, table_cache_, options);
-      }
+      std::vector<FileGroup*>* groups = new std::vector<FileGroup*>();
+      GroupFiles(&icmp_, c->inputs_[which], *groups);
+      list[num] = NewGroupsConcatenatingIterator(options, table_cache_,
+                                                 &icmp_, *groups);
+      list[num]->RegisterCleanup(&DeleteFileGroups, groups, NULL);
+      ++num;
     }
   }
   assert(num <= space);
@@ -1883,14 +2020,10 @@ Compaction* VersionSet::PickCompactionBySize(int level, double score) {
 
     // Do not pick this file if its parents at level+1 are being compacted.
     // Maybe we can avoid redoing this work in SetupOtherInputs
-    int parent_index = -1;
-    if (ParentRangeInCompaction(&f->smallest, &f->largest, level,
-                                &parent_index)) {
+    if (ParentRangeInCompaction(&f->smallest, &f->largest, level)) {
       continue;
     }
     c->inputs_[0].push_back(f);
-    c->base_index_ = index;
-    c->parent_index_ = parent_index;
     break;
   }
 
@@ -1948,23 +2081,26 @@ Compaction* VersionSet::PickCompaction() {
   c->input_version_ = current_;
   c->input_version_->Ref();
 
-  // Files in level 0 may overlap each other, so pick up all overlapping ones
-  // Two level 0 compaction won't run at the same time, so don't need to worry
-  // about files on level 0 being compacted.
-  if (level == 0) {
-    assert(compactions_in_progress_[0].empty());
+  // Files may overlap each other, so pick up all overlapping ones
+  {
     InternalKey smallest, largest;
     GetRange(c->inputs_[0], &smallest, &largest);
     // Note that the next call will discard the file we placed in
     // c->inputs_[0] earlier and replace it with an overlapping set
     // which will include the picked file.
     c->inputs_[0].clear();
-    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
-    if (ParentRangeInCompaction(&smallest, &largest,
-                                level, &c->parent_index_)) {
+    current_->GetOverlappingInputs(level, &smallest, &largest, &c->inputs_[0]);
+    if (ParentRangeInCompaction(&smallest, &largest, level)) {
       delete c;
       return NULL;
     }
+    for (unsigned int i = 0; i < c->inputs_[0].size(); i++) {
+      FileMetaData* f = c->inputs_[0][i];
+      if (f->being_compacted) {
+        delete c;
+        return NULL;
+      }
+    }
     assert(!c->inputs_[0].empty());
   }
 
@@ -1981,11 +2117,13 @@ Compaction* VersionSet::PickCompaction() {
 
 // Returns true if any one of the parent files are being compacted
 bool VersionSet::ParentRangeInCompaction(const InternalKey* smallest,
-  const InternalKey* largest, int level, int* parent_index) {
+  const InternalKey* largest, int level) {
+  assert(level >= 0);
+  assert(level < NumberLevels()-1);
   std::vector<FileMetaData*> inputs;
 
   current_->GetOverlappingInputs(level+1, smallest, largest,
-                                 &inputs, *parent_index, parent_index);
+                                 &inputs);
   return FilesInCompaction(inputs);
 }
 
@@ -2001,11 +2139,13 @@ bool VersionSet::FilesInCompaction(std::vector<FileMetaData*>& files) {
 
 void VersionSet::SetupOtherInputs(Compaction* c) {
   const int level = c->level();
+  assert(level >= 0);
+  assert(level < NumberLevels()-1);
+
   InternalKey smallest, largest;
   GetRange(c->inputs_[0], &smallest, &largest);
 
-  current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1],
-                                 c->parent_index_, &c->parent_index_);
+  current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);
 
   // Get entire range covered by compaction
   InternalKey all_start, all_limit;
@@ -2015,8 +2155,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
   // changing the number of "level+1" files we pick up.
   if (!c->inputs_[1].empty()) {
     std::vector<FileMetaData*> expanded0;
-    current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0,
-                                   c->base_index_, NULL);
+    current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
     const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
     const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
     const int64_t expanded0_size = TotalFileSize(expanded0);
@@ -2028,8 +2167,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
       GetRange(expanded0, &new_start, &new_limit);
       std::vector<FileMetaData*> expanded1;
       current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
-                                     &expanded1, c->parent_index_,
-                                     &c->parent_index_);
+                                     &expanded1);
       if (expanded1.size() == c->inputs_[1].size() &&
           !FilesInCompaction(expanded1)) {
         Log(options_->info_log,
@@ -2053,8 +2191,10 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
   // Compute the set of grandparent files that overlap this compaction
   // (parent == level+1; grandparent == level+2)
   if (level + 2 < NumberLevels()) {
+    std::vector<FileMetaData*> grandparents;
     current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
-                                   &c->grandparents_);
+                                   &grandparents);
+    GroupFiles(&icmp_, grandparents, c->grandparents_);
   }
 
   if (false) {
@@ -2121,8 +2261,6 @@ Compaction::Compaction(int level, uint64_t target_file_size,
       grandparent_index_(0),
       seen_key_(false),
       overlapped_bytes_(0),
-      base_index_(-1),
-      parent_index_(-1),
       score_(0) {
   edit_ = new VersionEdit(number_levels_);
   level_ptrs_ = new size_t[number_levels_];
@@ -2137,6 +2275,9 @@ Compaction::~Compaction() {
   if (input_version_ != NULL) {
     input_version_->Unref();
   }
+  for (FileGroup* g: grandparents_) {
+    delete g;
+  }
 }
 
 bool Compaction::IsTrivialMove() const {
@@ -2184,7 +2325,7 @@ bool Compaction::ShouldStopBefore(const Slice& internal_key) {
       icmp->Compare(internal_key,
                     grandparents_[grandparent_index_]->largest.Encode()) > 0) {
     if (seen_key_) {
-      overlapped_bytes_ += grandparents_[grandparent_index_]->file_size;
+      overlapped_bytes_ += grandparents_[grandparent_index_]->total_file_size;
     }
     assert(grandparent_index_ + 1 >= grandparents_.size() ||
            icmp->Compare(grandparents_[grandparent_index_]->largest.Encode(),
index 1f0a9ac174e67f55fc59c4e936a001ed6051b72b..71f05fd03b107016849fb8113277564efdbd7e55 100644 (file)
@@ -57,6 +57,60 @@ extern bool SomeFileOverlapsRange(
     const Slice* smallest_user_key,
     const Slice* largest_user_key);
 
+// Group of files that overlap.
+class FileGroup {
+ public:
+  // smallest and largest store the smallest and largest key in any file in
+  // files.
+  InternalKey smallest;
+  InternalKey largest;
+
+  // Files are sorted by their smallest key.
+  std::vector<FileMetaData*> files;
+
+  // Total size of all files in files.
+  uint64_t total_file_size;
+
+  // Returns an iterator that iterates over all key-value pairs in this group.
+  // No duplicate suppression is done.
+  Iterator* NewIterator(TableCache* cache, const InternalKeyComparator* icmp,
+                        const ReadOptions& options);
+
+  // Return the approximate offset in the group of the data for
+  // "ikey".
+  uint64_t ApproximateOffsetOf(const VersionSet* vset,
+                               const InternalKey& ikey);
+};
+
+// Given a comparator and list of files, groups files into groups of
+// overlapping files. The groups are stored in file_groups and are
+// non-overlapping and are sorted by the given comparator. Additionally,
+// it is guaranteed that no group is empty and each group has the properties
+// guaranteed in FileGroup.
+// file_groups's contents are discarded by this function.
+void GroupFiles(
+    const InternalKeyComparator* internal_comparator,
+    const std::vector<FileMetaData*>& files,
+    std::vector<FileGroup*>& file_groups);
+
+// Return the smallest index i such that groups[i]->largest >= key.
+// Return groups.size() if there is no such group.
+// REQUIRES: "groups" contains a sorted list of non-overlapping groups.
+extern size_t FindGroup(const InternalKeyComparator& icmp,
+                        const std::vector<FileGroup*>& groups,
+                        const Slice& key);
+
+// Returns true iff some group in "groups" overlaps the user key range
+// [*smallest,*largest].
+// smallest==NULL represents a key smaller than all keys in the DB.
+// largest==NULL represents a key largest than all keys in the DB.
+// REQUIRES: "groups" contains a sorted list of non-overlapping groups.
+extern bool SomeGroupOverlapsRange(
+    const InternalKeyComparator& icmp,
+    const std::vector<FileGroup*>& groups,
+    const Slice* smallest_user_key,
+    const Slice* largest_user_key);
+
 class Version {
  public:
   // Append to *iters a sequence of iterators that will
@@ -88,24 +142,7 @@ class Version {
       int level,
       const InternalKey* begin,         // NULL means before all keys
       const InternalKey* end,           // NULL means after all keys
-      std::vector<FileMetaData*>* inputs,
-      int hint_index = -1,              // index of overlap file
-      int* file_index = NULL);          // return index of overlap file
-
-  void GetOverlappingInputsBinarySearch(
-      int level,
-      const Slice& begin,         // NULL means before all keys
-      const Slice& end,           // NULL means after all keys
-      std::vector<FileMetaData*>* inputs,
-      int hint_index,             // index of overlap file
-      int* file_index);           // return index of overlap file
-
-  void ExtendOverlappingInputs(
-      int level,
-      const Slice& begin,         // NULL means before all keys
-      const Slice& end,           // NULL means after all keys
-      std::vector<FileMetaData*>* inputs,
-      int index);                 // start extending from this index
+      std::vector<FileMetaData*>* inputs);
 
   // Returns true iff some file in the specified level overlaps
   // some part of [*smallest_user_key,*largest_user_key].
@@ -132,10 +169,10 @@ class Version {
 
  private:
   friend class Compaction;
+  friend class FileGroup;
   friend class VersionSet;
 
-  class LevelFileNumIterator;
-  Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;
+  void RebuildGroups(int level);
 
   VersionSet* vset_;            // VersionSet to which this Version belongs
   Version* next_;               // Next version in linked list
@@ -143,9 +180,12 @@ class Version {
   int refs_;                    // Number of live refs to this version
 
   // List of files per level, files in each level are arranged
-  // in increasing order of keys
+  // in increasing order of their start keys. Files may overlap.
   std::vector<FileMetaData*>* files_;
 
+  // List of file groups per level.
+  std::vector<FileGroup*>* file_groups_;
+
   // A list for the same set of files that are stored in files_,
   // but files in each level are now sorted based on file
   // size. The file with the largest size is at the front.
@@ -373,6 +413,7 @@ class VersionSet {
   struct ManifestWriter;
 
   friend class Compaction;
+  friend class FileGroup;
   friend class Version;
 
   void Init(int num_levels);
@@ -457,7 +498,7 @@ class VersionSet {
 
   // Returns true if any one of the parent files are being compacted
   bool ParentRangeInCompaction(const InternalKey* smallest,
-    const InternalKey* largest, int level, int* index);
+    const InternalKey* largest, int level);
 
   // Returns true if any one of the specified files are being compacted
   bool FilesInCompaction(std::vector<FileMetaData*>& files);
@@ -535,13 +576,11 @@ class Compaction {
 
   // State used to check for number of of overlapping grandparent files
   // (parent == level_ + 1, grandparent == level_ + 2)
-  std::vector<FileMetaData*> grandparents_;
+  std::vector<FileGroup*> grandparents_;
   size_t grandparent_index_;  // Index in grandparent_starts_
   bool seen_key_;             // Some output key has been seen
   int64_t overlapped_bytes_;  // Bytes of overlap between current output
                               // and grandparent files
-  int base_index_;   // index of the file in files_[level_]
-  int parent_index_; // index of some file with same range in files_[level_+1]
   double score_;     // score that was used to pick this compaction.
 
   // State for implementing IsBaseLevelForKey
index 75c558a71d21367f5bbcfae4df32023a4cb3e701..f56b49f6c70e5b1bf058f3a8b1af54892250301b 100644 (file)
@@ -172,6 +172,151 @@ TEST(FindFileTest, OverlappingFiles) {
   ASSERT_TRUE(Overlaps("600", "700"));
 }
 
+class FileGroupTest {
+ public:
+  std::vector<FileMetaData*> files_;
+  std::vector<FileGroup*> groups_;
+
+  FileGroupTest() { }
+
+  ~FileGroupTest() {
+    for (unsigned int i = 0; i < groups_.size(); i++) {
+      delete groups_[i];
+    }
+
+    for (unsigned int i = 0; i < files_.size(); i++) {
+      delete files_[i];
+    }
+  }
+
+  void AddFile(const char* smallest, const char* largest,
+               SequenceNumber smallest_seq = 100,
+               SequenceNumber largest_seq = 100) {
+    FileMetaData* f = new FileMetaData;
+    f->number = files_.size() + 1;
+    f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
+    f->largest = InternalKey(largest, largest_seq, kTypeValue);
+    f->file_size = smallest_seq + (largest_seq - smallest_seq)/2;
+    files_.push_back(f);
+  }
+
+  void RebuildGroups() {
+    for (unsigned int i = 0; i < groups_.size(); i++) {
+      delete groups_[i];
+    }
+
+    InternalKeyComparator cmp(BytewiseComparator());
+    GroupFiles(&cmp, files_, groups_);
+
+    CheckGroupsValid();
+  }
+
+  size_t Find(const char* key, SequenceNumber seq = 100) {
+    InternalKey target(key, seq, kTypeValue);
+    InternalKeyComparator cmp(BytewiseComparator());
+    return FindGroup(cmp, groups_, target.Encode());
+  }
+
+  bool Overlaps(const char* smallest, const char* largest) {
+    InternalKeyComparator cmp(BytewiseComparator());
+    Slice s(smallest != NULL ? smallest : "");
+    Slice l(largest != NULL ? largest : "");
+    return SomeGroupOverlapsRange(cmp, groups_,
+                                 (smallest != NULL ? &s : NULL),
+                                 (largest != NULL ? &l : NULL));
+  }
+
+  void CheckGroupsValid() {
+    InternalKeyComparator cmp(BytewiseComparator());
+
+    // Check group internal consistency.
+    for (size_t i = 0; i < groups_.size(); ++i) {
+      FileGroup& group = *groups_[i];
+
+      // Check all groups are non-empty
+      ASSERT_TRUE(!group.files.empty());
+
+      // Check files in all groups are sorted by smallest key.
+      for (size_t j = 0; j < group.files.size()-1; ++j) {
+        ASSERT_TRUE(cmp.Compare(group.files[j]->smallest,
+                                group.files[j+1]->smallest) < 0);
+      }
+
+      // Check smallest key in group is correct.
+      ASSERT_TRUE(cmp.Compare(group.smallest, group.files[0]->smallest) == 0);
+
+      // Check files in group actually overlap.
+      InternalKey largest_so_far = group.files[0]->largest;
+      for (size_t j = 0; j < group.files.size(); ++j) {
+        ASSERT_TRUE(cmp.Compare(largest_so_far,
+                                group.files[j]->smallest) >= 0);
+
+        if (cmp.Compare(largest_so_far, group.files[j]->largest) < 0) {
+          largest_so_far = group.files[j]->largest;
+        }
+      }
+
+      // Check largest key in group is correct.
+      ASSERT_TRUE(cmp.Compare(group.largest, largest_so_far) == 0);
+
+      // Check total file size is valid.
+      uint64_t total_file_size = 0;
+      for (size_t j = 0; j < group.files.size(); ++j) {
+        total_file_size += group.files[j]->file_size;
+      }
+      ASSERT_EQ(total_file_size, group.total_file_size);
+    }
+
+    // Check groups are sorted and don't overlap.
+    for (size_t i = 0; i < groups_.size()-1; ++i) {
+      ASSERT_TRUE(cmp.Compare(groups_[i]->largest,
+                              groups_[i+1]->smallest) < 0);
+    }
+  }
+};
+
+TEST(FileGroupTest, Construction) {
+  AddFile("150", "600");
+  AddFile("250", "700");
+  AddFile("650", "800");
+  AddFile("801", "805");
+  RebuildGroups();
+
+  ASSERT_EQ(groups_.size(), 2u);
+}
+
+TEST(FileGroupTest, OverlappingGroups) {
+  AddFile("150", "600");
+  AddFile("250", "700");
+  AddFile("650", "800");
+  AddFile("851", "855");
+  RebuildGroups();
+
+  ASSERT_TRUE(Overlaps(NULL, NULL));
+  ASSERT_TRUE(!Overlaps(NULL, "149"));
+  ASSERT_TRUE(Overlaps(NULL, "150"));
+  ASSERT_TRUE(Overlaps("855", NULL));
+  ASSERT_TRUE(!Overlaps("801", "850"));
+  ASSERT_TRUE(Overlaps("601", "649"));
+}
+
+TEST(FileGroupTest, FindingGroups) {
+  AddFile("100", "200");
+  AddFile("300", "400");
+  AddFile("600", "800");
+  RebuildGroups();
+
+  ASSERT_EQ(groups_.size(), 3u);
+
+  ASSERT_EQ(Find("0"), 0u);
+  ASSERT_EQ(Find("200"), 0u);
+  ASSERT_EQ(Find("201"), 1u);
+  ASSERT_EQ(Find("400"), 1u);
+  ASSERT_EQ(Find("401"), 2u);
+  ASSERT_EQ(Find("800"), 2u);
+  ASSERT_EQ(Find("801"), 3u);
+}
+
 }  // namespace leveldb
 
 int main(int argc, char** argv) {
index 0432a4749deb1031ea774fdd2bcd277d7e198dd0..6842bfa92006205bb9017cdb37ac71f7f014a8f5 100644 (file)
@@ -115,6 +115,9 @@ class Cache {
   // REQUIRES: handler has been added with AddHandler()
   virtual void RemoveHandler(void* handler);
 
+  // Forcefully flushes all metrics currently held by the cache.
+  virtual void ForceFlushMetrics();
+
  private:
   void LRU_Remove(Handle* e);
   void LRU_Append(Handle* e);
index ad543eb46cde9af30f9250ee2eaa7f0979cc2994..3487b8a4d41097dabeb4f5bf1c18331cf6d01138 100644 (file)
@@ -67,6 +67,14 @@ class Iterator {
   // If an error has occurred, return it.  Else return an ok status.
   virtual Status status() const = 0;
 
+  // If this Iterator makes use of some sub-Iterator to provide the current
+  // key-value pair, then this function returns a pointer to that
+  // Iterator-instance.
+  //
+  // Returns NULL if either Valid() is false or no sub-Iterator is providing
+  // the key-value pair.
+  virtual const Iterator* FindSubIterator() const { return NULL; };
+
   // Clients are allowed to register function/arg1/arg2 triples that
   // will be invoked when this iterator is destroyed.
   //
index c422fb027166ad183929ca76846835945402402c..3145fc46eedb74f047fd6b19f3b9e25debaf53b6 100644 (file)
@@ -381,15 +381,22 @@ struct ReadOptions {
   // Default: NULL
   const Snapshot* snapshot;
 
+  // If true accesses are recorded when the database is in hot-cold mode.
+  // Otherwise, they aren't.
+  // Default: true;
+  bool record_accesses;
+
   ReadOptions()
       : verify_checksums(false),
         fill_cache(true),
         snapshot(NULL),
+        record_accesses(true),
         metrics_handler(NULL) {
   }
   ReadOptions(bool cksum, bool cache) :
               verify_checksums(cksum), fill_cache(cache),
               snapshot(NULL),
+              record_accesses(true),
               metrics_handler(NULL) {
   }
 
index 11a4029ef1396dfe4cf3f7f8ce16d97e6bed9617..25ee5c7c484e6296b7f961bb89b27954be5da4cd 100644 (file)
@@ -227,8 +227,6 @@ class Block::Iter : public Iterator {
   }
 
  private:
-  friend class Block;
-
   void CorruptionError() {
     current_ = restarts_;
     restart_index_ = num_restarts_;
@@ -276,15 +274,21 @@ class Block::Iter : public Iterator {
 // This is the iterator returned by Block::NewMetricsIterator() on success.
 class Block::MetricsIter : public Block::Iter {
  private:
+  uint64_t file_number_;
+  uint64_t block_offset_;
   BlockMetrics* metrics_;
 
  public:
   MetricsIter(const Comparator* comparator,
+              uint64_t file_number,
+              uint64_t block_offset,
               const char* data,
               uint32_t restarts,
               uint32_t num_restarts,
               BlockMetrics* metrics)
       : Block::Iter(comparator, data, restarts, num_restarts),
+        file_number_(file_number),
+        block_offset_(block_offset),
         metrics_(metrics) {
   }
 
@@ -317,6 +321,8 @@ class Block::MetricsIter : public Block::Iter {
   }
 
  private:
+  friend class Block;
+
   void RecordAccess() {
     if (metrics_ != NULL && Valid()) {
       metrics_->RecordAccess(restart_index_, restart_offset_);
@@ -336,6 +342,22 @@ Iterator* Block::NewIterator(const Comparator* cmp) {
   }
 }
 
+
+Iterator* Block::NewIterator(const Comparator* cmp,
+                             uint64_t file_number,
+                             uint64_t block_offset) {
+  if (size_ < 2*sizeof(uint32_t)) {
+    return NewErrorIterator(Status::Corruption("bad block contents"));
+  }
+  const uint32_t num_restarts = NumRestarts();
+  if (num_restarts == 0) {
+    return NewEmptyIterator();
+  } else {
+    return new MetricsIter(cmp, file_number, block_offset, data_,
+                           restart_offset_, num_restarts, NULL);
+  }
+}
+
 Iterator* Block::NewMetricsIterator(const Comparator* cmp,
                                     uint64_t file_number,
                                     uint64_t block_offset,
@@ -352,11 +374,31 @@ Iterator* Block::NewMetricsIterator(const Comparator* cmp,
   } else {
     *metrics = new BlockMetrics(file_number, block_offset, num_restarts,
                                 kBytesPerRestart);
-    return new MetricsIter(cmp, data_, restart_offset_, num_restarts,
-                           *metrics);
+    return new MetricsIter(cmp, file_number, block_offset, data_,
+                           restart_offset_, num_restarts, *metrics);
   }
 }
 
+bool Block::GetBlockIterInfo(const Iterator* iter,
+                             uint64_t& file_number,
+                             uint64_t& block_offset,
+                             uint32_t& restart_index,
+                             uint32_t& restart_offset) {
+  const MetricsIter* biter = dynamic_cast<const MetricsIter*>(iter);
+
+  if (biter == NULL) {
+    return false;
+  }
+
+  file_number = biter->file_number_;
+  block_offset = biter->block_offset_;
+  restart_index = biter->restart_index_;
+  restart_offset = biter->restart_offset_;
+
+  return true;
+}
+
+
 BlockMetrics::BlockMetrics(uint64_t file_number, uint64_t block_offset,
                            uint32_t num_restarts, uint32_t bytes_per_restart)
   : file_number_(file_number),
@@ -377,6 +419,15 @@ BlockMetrics::BlockMetrics(uint64_t file_number, uint64_t block_offset,
   memcpy(metrics_, data.data(), kBlockMetricsSize);
 }
 
+void BlockMetrics::CreateDBKey(uint64_t file_number, uint64_t block_offset,
+                               std::string* db_key) {
+  assert(db_key != NULL);
+
+  db_key->clear();
+  PutFixed64(db_key, file_number);
+  PutFixed64(db_key, block_offset);
+}
+
 BlockMetrics* BlockMetrics::Create(uint64_t file_number, uint64_t block_offset,
                                    const std::string& db_value) {
   Slice data(db_value);
@@ -440,6 +491,11 @@ bool BlockMetrics::IsCompatible(const BlockMetrics* bm) const {
           bm->block_offset_ == block_offset_);
 }
 
+bool BlockMetrics::IsSameBlock(uint64_t file_number, uint64_t block_offset) const {
+  return (file_number_ == file_number &&
+          block_offset_ == block_offset);
+}
+
 void BlockMetrics::Join(const BlockMetrics* bm) {
   assert(IsCompatible(bm));
 
index a6e162cd54cb8b8e03e5b90efa160bc947316fef..8193a06799dab7c3b66f4915bd86f79a0e22a29e 100644 (file)
@@ -26,6 +26,12 @@ class Block {
   size_t size() const { return size_; }
   Iterator* NewIterator(const Comparator* comparator);
 
+  // Creates an iterator on the block that knows which file and block it
+  // belongs to.
+  Iterator* NewIterator(const Comparator* comparator,
+                        uint64_t file_number,
+                        uint64_t block_offset);
+
   // Creates a new iterator that keeps track of accesses.
   //
   // Creates a BlockMetrics object on the heap and sets metrics to it.
@@ -37,6 +43,14 @@ class Block {
                                uint64_t block_offset,
                                BlockMetrics** metrics);
 
+  // Returns true if iter is a Block iterator and also knows that which file
+  // and block it belongs to.
+  static bool GetBlockIterInfo(const Iterator* iter,
+                               uint64_t& file_number,
+                               uint64_t& block_offset,
+                               uint32_t& restart_index,
+                               uint32_t& restart_offset);
+
  private:
   uint32_t NumRestarts() const;
 
@@ -75,6 +89,11 @@ class BlockMetrics {
   BlockMetrics(uint64_t file_number, uint64_t block_offset,
                uint32_t num_restarts, uint32_t bytes_per_restart);
 
+  // Clears and puts the DB key for the file_number-block_offset-pair in
+  // *db_key.
+  static void CreateDBKey(uint64_t file_number, uint64_t block_offset,
+                          std::string* db_key);
+
   // Creates a BlockMetrics object from the DB key and value. Returns NULL if
   // either/both are invalid.
   static BlockMetrics* Create(const std::string& db_key,
@@ -101,6 +120,9 @@ class BlockMetrics {
   // Returns true if bm represents metrics for the same block.
   bool IsCompatible(const BlockMetrics* bm) const;
 
+  // Returns true if the given file_number and block_offset match this block's.
+  bool IsSameBlock(uint64_t file_number, uint64_t block_offset) const;
+
   // Joins the metrics from the other metrics into this one.
   // REQUIRES: this->IsCompatible(bm);
   void Join(const BlockMetrics* bm);
index 7c5ef92f4b5f3c10319973cdd63fc8e105190d14..410ff95893affc110d2e37f83e04a0b1f9be43a1 100644 (file)
@@ -168,6 +168,13 @@ class MergingIterator : public Iterator {
     return status;
   }
 
+  virtual const Iterator* FindSubIterator() const {
+    if (!Valid()) {
+      return NULL;
+    }
+    return current_->iter();
+  };
+
  private:
   void FindSmallest();
   void FindLargest();
diff --git a/table/metrics_info.cc b/table/metrics_info.cc
new file mode 100644 (file)
index 0000000..89b9326
--- /dev/null
@@ -0,0 +1,69 @@
+// Copyright (c) 2013 Facebook.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "table/metrics_info.h"
+
+#include "leveldb/status.h"
+#include "table/block.h"
+
+namespace leveldb {
+
+bool IsRecordHot(const Iterator* iter, DB* metrics_db,
+                 const ReadOptions& metrics_read_opts,
+                 BlockMetrics** block_metrics_store) {
+  assert(iter != NULL);
+  assert(metrics_db != NULL);
+  assert(block_metrics_store != NULL);
+
+  static const bool kDefaultHotness = false;
+
+  bool has_block_info = false;
+  uint64_t file_number;
+  uint64_t block_offset;
+  uint32_t restart_index;
+  uint32_t restart_offset;
+
+  do {
+    if (Block::GetBlockIterInfo(iter, file_number, block_offset, restart_index,
+                                restart_offset)) {
+      has_block_info = true;
+      continue;
+    }
+  } while ((iter = iter->FindSubIterator()) != NULL);
+
+  if (!has_block_info) {
+    return kDefaultHotness;
+  }
+
+  if ((*block_metrics_store) == NULL ||
+      !(*block_metrics_store)->IsSameBlock(file_number, block_offset)) {
+    // Stored block metrics is invalid so we have to load a new one.
+
+    // Free previous block metrics if any
+    if ((*block_metrics_store) != NULL) {
+      delete *block_metrics_store;
+      *block_metrics_store = NULL;
+    }
+
+    std::string db_key;
+    BlockMetrics::CreateDBKey(file_number, block_offset, &db_key);
+
+    std::string db_value;
+    Status s = metrics_db->Get(metrics_read_opts, db_key, &db_value);
+    if (!s.ok()) {
+      return kDefaultHotness;
+    }
+
+    *block_metrics_store = BlockMetrics::Create(file_number, block_offset,
+                                                db_value);
+  }
+
+  if ((*block_metrics_store) != NULL) {
+    return (*block_metrics_store)->IsHot(restart_index, restart_offset);
+  }
+
+  return kDefaultHotness;
+}
+
+}  // namespace leveldb
diff --git a/table/metrics_info.h b/table/metrics_info.h
new file mode 100644 (file)
index 0000000..f6d5c61
--- /dev/null
@@ -0,0 +1,31 @@
+// Copyright (c) 2013 Facebook.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef STORAGE_LEVELDB_TABLE_BLOCK_METRICS_INFO_H_
+#define STORAGE_LEVELDB_TABLE_BLOCK_METRICS_INFO_H_
+
+#include "leveldb/db.h"
+#include "leveldb/iterator.h"
+#include "leveldb/options.h"
+#include <stdint.h>
+
+namespace leveldb {
+
+class BlockMetrics;
+
+// Returns true if the record pointed too by iter is hot according to the
+// value stored in metrics_db.
+//
+// *block_metrics_store temporarily stores a BlockMetrics instance.  At the
+// end, when the application no longer plans to call IsRecordHot(), if
+// *block_metrics_store is not NULL then this value must be deleted.
+//
+// REQUIRES: neither iter, metrics_db nor block_metrics_store is NULL.
+bool IsRecordHot(const Iterator* iter, DB* metrics_db,
+                 const ReadOptions& metrics_read_opts,
+                 BlockMetrics** block_metrics_store);
+
+}  // namespace leveldb
+
+#endif  // STORAGE_LEVELDB_TABLE_BLOCK_METRICS_INFO_H_
index 2dd93c8840fbe05281900e6a9de4b0d922952056..48a0abffe4cc4564c00cce1d03b5ecead994daaf 100644 (file)
@@ -239,7 +239,9 @@ Iterator* Table::BlockReader(void* arg,
   Iterator* iter;
   if (block != NULL) {
     if (options.metrics_handler == NULL || cache_handle == NULL) {
-      iter = block->NewIterator(table->rep_->options.comparator);
+      iter = block->NewIterator(table->rep_->options.comparator,
+                                table->rep_->file_number,
+                                handle.offset());
 
       if (cache_handle == NULL) {
         iter->RegisterCleanup(&DeleteBlock, block, NULL);
index 7822ebab9c32ce579c42f9621545d7283e8332b9..66bdc58ecda89ae793b323eee466f58726898610 100644 (file)
@@ -53,6 +53,14 @@ class TwoLevelIterator: public Iterator {
     }
   }
 
+  virtual const Iterator* FindSubIterator() const {
+    if (!Valid()) {
+      return NULL;
+    }
+    return data_iter_.iter();
+  };
+
+
  private:
   void SaveError(const Status& s) {
     if (status_.ok() && !s.ok()) status_ = s;
index cf9e57819a3a94c0e735c300f73449669958c3d6..7dce68d725d755d2da330e6fc6aa5480ea817d1b 100644 (file)
@@ -30,6 +30,8 @@ void Cache::AddHandler(
 }
 void Cache::RemoveHandler(void* handler) {
 }
+void Cache::ForceFlushMetrics() {
+}
 
 
 namespace {
@@ -175,6 +177,7 @@ class LRUCache {
       void* handler,
       void (*handler_func)(void*, std::vector<BlockMetrics*>*));
   void RemoveHandler(void* handler);
+  void ForceFlushMetrics();
 
  private:
   void LRU_Remove(LRUHandle* e);
@@ -350,6 +353,15 @@ void LRUCache::RemoveHandler(void* handler) {
   metrics_store_.erase(handler);
 }
 
+void LRUCache::ForceFlushMetrics() {
+  std::map<void*, std::vector<BlockMetrics*>*>::iterator it;
+  for (it = metrics_store_.begin();
+       it != metrics_store_.end(); ++it) {
+    void* handler = it->first;
+    (*handlers_[handler])(handler, metrics_store_[handler]);
+    metrics_store_[handler] = new std::vector<BlockMetrics*>();
+  }
+}
 
 static int kNumShardBits = 4;         // default values, can be overridden
 
@@ -439,6 +451,12 @@ class ShardedLRUCache : public Cache {
       shard_[i].RemoveHandler(handler);
     }
   }
+
+  void ForceFlushMetrics() {
+    for (size_t i = 0; i < numShards_; ++i) {
+      shard_[i].ForceFlushMetrics();
+    }
+  }
 };
 
 }  // end anonymous namespace