]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Use a priority queue to merge files.
authorAbhishek Kona <abhishekk@fb.com>
Wed, 26 Dec 2012 19:51:36 +0000 (11:51 -0800)
committerAbhishek Kona <abhishekk@fb.com>
Wed, 2 Jan 2013 21:52:25 +0000 (13:52 -0800)
Summary:
Use a std::priority_queue in merger.cc instead of doing a o(n) search
every time.
Currently only the ForwardIteration uses a Priority Queue.

Test Plan: make all check

Reviewers: dhruba

Reviewed By: dhruba

CC: emayanke, zshao
Differential Revision: https://reviews.facebook.net/D7629

table/iter_heap.h [new file with mode: 0644]
table/merger.cc

diff --git a/table/iter_heap.h b/table/iter_heap.h
new file mode 100644 (file)
index 0000000..52518ba
--- /dev/null
@@ -0,0 +1,63 @@
+// Copyright 2008-present Facebook. All Rights Reserved.
+#ifndef STORAGE_LEVELDB_ITER_HEAP_H_
+#define STORAGE_LEVELDB_ITER_HEAP_H_
+
+#include <queue>
+
+#include "leveldb/comparator.h"
+#include "table/iterator_wrapper.h"
+
+namespace leveldb {
+
+// Return the max of two keys.
+class MaxIteratorComparator {
+ public:
+  MaxIteratorComparator(const Comparator* comparator) :
+    comparator_(comparator) {}
+
+  bool operator()(IteratorWrapper* a, IteratorWrapper* b) {
+    return comparator_->Compare(a->key(), b->key()) <= 0;
+  }
+ private:
+  const Comparator* comparator_;
+};
+
+// Return the max of two keys.
+class MinIteratorComparator {
+ public:
+  // if maxHeap is set comparator returns the max value.
+  // else returns the min Value.
+  // Can use to create a minHeap or a maxHeap.
+  MinIteratorComparator(const Comparator* comparator) :
+    comparator_(comparator) {}
+
+  bool operator()(IteratorWrapper* a, IteratorWrapper* b) {
+    return comparator_->Compare(a->key(), b->key()) > 0;
+  }
+ private:
+  const Comparator* comparator_;
+};
+
+typedef std::priority_queue<
+          IteratorWrapper*,
+          std::vector<IteratorWrapper*>,
+          MaxIteratorComparator> MaxIterHeap;
+
+typedef std::priority_queue<
+          IteratorWrapper*,
+          std::vector<IteratorWrapper*>,
+          MinIteratorComparator> MinIterHeap;
+
+// Return's a new MaxHeap of IteratorWrapper's using the provided Comparator.
+MaxIterHeap NewMaxIterHeap(const Comparator* comparator) {
+  return MaxIterHeap(MaxIteratorComparator(comparator));
+}
+
+// Return's a new MinHeap of IteratorWrapper's using the provided Comparator.
+MinIterHeap NewMinIterHeap(const Comparator* comparator) {
+  return MinIterHeap(MinIteratorComparator(comparator));
+}
+
+}  // namespace leveldb
+
+#endif  // STORAGE_LEVELDB_ITER_HEAP_H_
index 2dde4dc21fde9d86e98f5a3f3b493745d07a22f7..5f7b24bd875952225de33eae3d385eff93823b12 100644 (file)
@@ -6,11 +6,13 @@
 
 #include "leveldb/comparator.h"
 #include "leveldb/iterator.h"
+#include "table/iter_heap.h"
 #include "table/iterator_wrapper.h"
 
 namespace leveldb {
 
 namespace {
+
 class MergingIterator : public Iterator {
  public:
   MergingIterator(const Comparator* comparator, Iterator** children, int n)
@@ -18,10 +20,17 @@ class MergingIterator : public Iterator {
         children_(new IteratorWrapper[n]),
         n_(n),
         current_(NULL),
-        direction_(kForward) {
+        direction_(kForward),
+        maxHeap_(NewMaxIterHeap(comparator_)),
+        minHeap_ (NewMinIterHeap(comparator_)) {
     for (int i = 0; i < n; i++) {
       children_[i].Set(children[i]);
     }
+    for (int i = 0; i < n; ++i) {
+      if (children_[i].Valid()) {
+        minHeap_.push(&children_[i]);
+      }
+    }
   }
 
   virtual ~MergingIterator() {
@@ -33,24 +42,36 @@ class MergingIterator : public Iterator {
   }
 
   virtual void SeekToFirst() {
+    ClearHeaps();
     for (int i = 0; i < n_; i++) {
       children_[i].SeekToFirst();
+      if (children_[i].Valid()) {
+        minHeap_.push(&children_[i]);
+      }
     }
     FindSmallest();
     direction_ = kForward;
   }
 
   virtual void SeekToLast() {
+    ClearHeaps();
     for (int i = 0; i < n_; i++) {
       children_[i].SeekToLast();
+      if (children_[i].Valid()) {
+        maxHeap_.push(&children_[i]);
+      }
     }
     FindLargest();
     direction_ = kReverse;
   }
 
   virtual void Seek(const Slice& target) {
+    ClearHeaps();
     for (int i = 0; i < n_; i++) {
       children_[i].Seek(target);
+      if (children_[i].Valid()) {
+        minHeap_.push(&children_[i]);
+      }
     }
     FindSmallest();
     direction_ = kForward;
@@ -65,6 +86,7 @@ class MergingIterator : public Iterator {
     // the smallest child and key() == current_->key().  Otherwise,
     // we explicitly position the non-current_ children.
     if (direction_ != kForward) {
+      ClearHeaps();
       for (int i = 0; i < n_; i++) {
         IteratorWrapper* child = &children_[i];
         if (child != current_) {
@@ -73,24 +95,32 @@ class MergingIterator : public Iterator {
               comparator_->Compare(key(), child->key()) == 0) {
             child->Next();
           }
+          if (child->Valid()) {
+            minHeap_.push(child);
+          }
         }
       }
       direction_ = kForward;
     }
 
+    // as the current points to the current record. move the iterator forward.
+    // and if it is valid add it to the heap.
     current_->Next();
+    if (current_->Valid()){
+      minHeap_.push(current_);
+    }
     FindSmallest();
   }
 
   virtual void Prev() {
     assert(Valid());
-
     // Ensure that all children are positioned before key().
     // If we are moving in the reverse direction, it is already
     // true for all of the non-current_ children since current_ is
     // the largest child and key() == current_->key().  Otherwise,
     // we explicitly position the non-current_ children.
     if (direction_ != kReverse) {
+      ClearHeaps();
       for (int i = 0; i < n_; i++) {
         IteratorWrapper* child = &children_[i];
         if (child != current_) {
@@ -102,12 +132,18 @@ class MergingIterator : public Iterator {
             // Child has no entries >= key().  Position at last entry.
             child->SeekToLast();
           }
+          if (child->Valid()) {
+            maxHeap_.push(child);
+          }
         }
       }
       direction_ = kReverse;
     }
 
     current_->Prev();
+    if (current_->Valid()) {
+      maxHeap_.push(current_);
+    }
     FindLargest();
   }
 
@@ -135,51 +171,47 @@ class MergingIterator : public Iterator {
  private:
   void FindSmallest();
   void FindLargest();
+  void ClearHeaps();
 
-  // We might want to use a heap in case there are lots of children.
-  // For now we use a simple array since we expect a very small number
-  // of children in leveldb.
   const Comparator* comparator_;
   IteratorWrapper* children_;
   int n_;
   IteratorWrapper* current_;
-
   // Which direction is the iterator moving?
   enum Direction {
     kForward,
     kReverse
   };
   Direction direction_;
+  MaxIterHeap maxHeap_;
+  MinIterHeap minHeap_;
 };
 
 void MergingIterator::FindSmallest() {
-  IteratorWrapper* smallest = NULL;
-  for (int i = 0; i < n_; i++) {
-    IteratorWrapper* child = &children_[i];
-    if (child->Valid()) {
-      if (smallest == NULL) {
-        smallest = child;
-      } else if (comparator_->Compare(child->key(), smallest->key()) < 0) {
-        smallest = child;
-      }
-    }
+  assert (direction_ == kForward);
+  if (minHeap_.empty()) {
+    current_ = NULL;
+  } else {
+    current_ = minHeap_.top();
+    assert(current_->Valid());
+    minHeap_.pop();
   }
-  current_ = smallest;
 }
 
 void MergingIterator::FindLargest() {
-  IteratorWrapper* largest = NULL;
-  for (int i = n_-1; i >= 0; i--) {
-    IteratorWrapper* child = &children_[i];
-    if (child->Valid()) {
-      if (largest == NULL) {
-        largest = child;
-      } else if (comparator_->Compare(child->key(), largest->key()) > 0) {
-        largest = child;
-      }
-    }
+  assert(direction_ == kReverse);
+  if (maxHeap_.empty()) {
+    current_ = NULL;
+  } else {
+    current_ = maxHeap_.top();
+    assert(current_->Valid());
+    maxHeap_.pop();
   }
-  current_ = largest;
+}
+
+void MergingIterator::ClearHeaps() {
+  maxHeap_ = NewMaxIterHeap(comparator_);
+  minHeap_ = NewMinIterHeap(comparator_);
 }
 }  // namespace