--- /dev/null
+// Copyright 2008-present Facebook. All Rights Reserved.
+#ifndef STORAGE_LEVELDB_ITER_HEAP_H_
+#define STORAGE_LEVELDB_ITER_HEAP_H_
+
+#include <queue>
+
+#include "leveldb/comparator.h"
+#include "table/iterator_wrapper.h"
+
+namespace leveldb {
+
+// Return the max of two keys.
+class MaxIteratorComparator {
+ public:
+ MaxIteratorComparator(const Comparator* comparator) :
+ comparator_(comparator) {}
+
+ bool operator()(IteratorWrapper* a, IteratorWrapper* b) {
+ return comparator_->Compare(a->key(), b->key()) <= 0;
+ }
+ private:
+ const Comparator* comparator_;
+};
+
+// Return the max of two keys.
+class MinIteratorComparator {
+ public:
+ // if maxHeap is set comparator returns the max value.
+ // else returns the min Value.
+ // Can use to create a minHeap or a maxHeap.
+ MinIteratorComparator(const Comparator* comparator) :
+ comparator_(comparator) {}
+
+ bool operator()(IteratorWrapper* a, IteratorWrapper* b) {
+ return comparator_->Compare(a->key(), b->key()) > 0;
+ }
+ private:
+ const Comparator* comparator_;
+};
+
+typedef std::priority_queue<
+ IteratorWrapper*,
+ std::vector<IteratorWrapper*>,
+ MaxIteratorComparator> MaxIterHeap;
+
+typedef std::priority_queue<
+ IteratorWrapper*,
+ std::vector<IteratorWrapper*>,
+ MinIteratorComparator> MinIterHeap;
+
+// Return's a new MaxHeap of IteratorWrapper's using the provided Comparator.
+MaxIterHeap NewMaxIterHeap(const Comparator* comparator) {
+ return MaxIterHeap(MaxIteratorComparator(comparator));
+}
+
+// Return's a new MinHeap of IteratorWrapper's using the provided Comparator.
+MinIterHeap NewMinIterHeap(const Comparator* comparator) {
+ return MinIterHeap(MinIteratorComparator(comparator));
+}
+
+} // namespace leveldb
+
+#endif // STORAGE_LEVELDB_ITER_HEAP_H_
#include "leveldb/comparator.h"
#include "leveldb/iterator.h"
+#include "table/iter_heap.h"
#include "table/iterator_wrapper.h"
namespace leveldb {
namespace {
+
class MergingIterator : public Iterator {
public:
MergingIterator(const Comparator* comparator, Iterator** children, int n)
children_(new IteratorWrapper[n]),
n_(n),
current_(NULL),
- direction_(kForward) {
+ direction_(kForward),
+ maxHeap_(NewMaxIterHeap(comparator_)),
+ minHeap_ (NewMinIterHeap(comparator_)) {
for (int i = 0; i < n; i++) {
children_[i].Set(children[i]);
}
+ for (int i = 0; i < n; ++i) {
+ if (children_[i].Valid()) {
+ minHeap_.push(&children_[i]);
+ }
+ }
}
virtual ~MergingIterator() {
}
virtual void SeekToFirst() {
+ ClearHeaps();
for (int i = 0; i < n_; i++) {
children_[i].SeekToFirst();
+ if (children_[i].Valid()) {
+ minHeap_.push(&children_[i]);
+ }
}
FindSmallest();
direction_ = kForward;
}
virtual void SeekToLast() {
+ ClearHeaps();
for (int i = 0; i < n_; i++) {
children_[i].SeekToLast();
+ if (children_[i].Valid()) {
+ maxHeap_.push(&children_[i]);
+ }
}
FindLargest();
direction_ = kReverse;
}
virtual void Seek(const Slice& target) {
+ ClearHeaps();
for (int i = 0; i < n_; i++) {
children_[i].Seek(target);
+ if (children_[i].Valid()) {
+ minHeap_.push(&children_[i]);
+ }
}
FindSmallest();
direction_ = kForward;
// the smallest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children.
if (direction_ != kForward) {
+ ClearHeaps();
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child != current_) {
comparator_->Compare(key(), child->key()) == 0) {
child->Next();
}
+ if (child->Valid()) {
+ minHeap_.push(child);
+ }
}
}
direction_ = kForward;
}
+ // as the current points to the current record. move the iterator forward.
+ // and if it is valid add it to the heap.
current_->Next();
+ if (current_->Valid()){
+ minHeap_.push(current_);
+ }
FindSmallest();
}
virtual void Prev() {
assert(Valid());
-
// Ensure that all children are positioned before key().
// If we are moving in the reverse direction, it is already
// true for all of the non-current_ children since current_ is
// the largest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children.
if (direction_ != kReverse) {
+ ClearHeaps();
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child != current_) {
// Child has no entries >= key(). Position at last entry.
child->SeekToLast();
}
+ if (child->Valid()) {
+ maxHeap_.push(child);
+ }
}
}
direction_ = kReverse;
}
current_->Prev();
+ if (current_->Valid()) {
+ maxHeap_.push(current_);
+ }
FindLargest();
}
private:
void FindSmallest();
void FindLargest();
+ void ClearHeaps();
- // We might want to use a heap in case there are lots of children.
- // For now we use a simple array since we expect a very small number
- // of children in leveldb.
const Comparator* comparator_;
IteratorWrapper* children_;
int n_;
IteratorWrapper* current_;
-
// Which direction is the iterator moving?
enum Direction {
kForward,
kReverse
};
Direction direction_;
+ MaxIterHeap maxHeap_;
+ MinIterHeap minHeap_;
};
void MergingIterator::FindSmallest() {
- IteratorWrapper* smallest = NULL;
- for (int i = 0; i < n_; i++) {
- IteratorWrapper* child = &children_[i];
- if (child->Valid()) {
- if (smallest == NULL) {
- smallest = child;
- } else if (comparator_->Compare(child->key(), smallest->key()) < 0) {
- smallest = child;
- }
- }
+ assert (direction_ == kForward);
+ if (minHeap_.empty()) {
+ current_ = NULL;
+ } else {
+ current_ = minHeap_.top();
+ assert(current_->Valid());
+ minHeap_.pop();
}
- current_ = smallest;
}
void MergingIterator::FindLargest() {
- IteratorWrapper* largest = NULL;
- for (int i = n_-1; i >= 0; i--) {
- IteratorWrapper* child = &children_[i];
- if (child->Valid()) {
- if (largest == NULL) {
- largest = child;
- } else if (comparator_->Compare(child->key(), largest->key()) > 0) {
- largest = child;
- }
- }
+ assert(direction_ == kReverse);
+ if (maxHeap_.empty()) {
+ current_ = NULL;
+ } else {
+ current_ = maxHeap_.top();
+ assert(current_->Valid());
+ maxHeap_.pop();
}
- current_ = largest;
+}
+
+void MergingIterator::ClearHeaps() {
+ maxHeap_ = NewMaxIterHeap(comparator_);
+ minHeap_ = NewMinIterHeap(comparator_);
}
} // namespace