#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
+#include "table/metrics_info.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
// Files produced by compaction
struct Output {
// Table file number; also the key used for pending_outputs_ bookkeeping.
uint64_t number;
+ bool skip; // Whether this output should be skipped due to being empty.
// NOTE(review): `skip` is only assigned in FinishCompactionOutputFile();
// an Output pushed by OpenCompactionOutputFile() that never reaches
// Finish leaves it uninitialized -- confirm no reader sees it before then.
// Final size in bytes and the key range covered, filled in as the table
// file is built and finished.
uint64_t file_size;
InternalKey smallest, largest;
};
std::list<uint64_t> allocated_file_numbers;
// State kept for output being generated
- WritableFile* outfile;
- TableBuilder* builder;
+ // We may have more than one outfile because hot-cold separation
+ // needs both a hot file and a cold file to output to.
+ size_t num_outfiles;
+ WritableFile** outfiles;
+ TableBuilder** builders;
uint64_t total_bytes;
- Output* current_output() { return &outputs[outputs.size()-1]; }
+ // Returns the idx-th output of the currently open batch of output files.
+ // The last num_outfiles entries of `outputs` belong to that batch, so
+ // OpenCompactionOutputFile() must have pushed the batch before calling.
+ Output* current_output(size_t idx) {
+ assert(idx < num_outfiles);
+ return &outputs[outputs.size()-num_outfiles+idx];
+ }
// Starts with no open output files; OpenCompactionOutputFile() allocates
// the builders/outfiles arrays on demand.
explicit CompactionState(Compaction* c)
: compaction(c),
- outfile(NULL),
- builder(NULL),
+ num_outfiles(0),
+ outfiles(NULL),
+ builders(NULL),
total_bytes(0) {
}
};
return FlushMemTable(FlushOptions());
}
+// Test hook: forces the block cache to flush its access metrics to the
+// metrics DB and blocks until the background flush completes.
+// REQUIRES: this is a hot-cold database with a block cache configured.
+// NOTE(review): assumes bg_flushing_metrics_scheduled_ is already set by the
+// time TEST_WaitForMetricsFlush() acquires the mutex; if ForceFlushMetrics()
+// only schedules the flush asynchronously, the wait below could return
+// before the flush has actually run -- confirm.
+void DBImpl::TEST_ForceFlushMetrics() {
+ assert(is_hotcold_);
+ assert(options_.block_cache != NULL);
+
+ options_.block_cache->ForceFlushMetrics();
+
+ TEST_WaitForMetricsFlush();
+}
+
// Test hook: thin wrapper forwarding to WaitForCompactMemTable().
Status DBImpl::TEST_WaitForCompactMemTable() {
return WaitForCompactMemTable();
}
return bg_error_;
}
+// Blocks the caller until no background metrics flush is in flight, i.e.
+// until bg_flushing_metrics_scheduled_ is false. Relies on bg_cv_ being
+// signalled when that flag changes.
+void DBImpl::TEST_WaitForMetricsFlush() {
+ MutexLock l(&mutex_);
+ while (bg_flushing_metrics_scheduled_) {
+ bg_cv_.Wait();
+ }
+}
+
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (bg_compaction_scheduled_ >= options_.max_background_compactions) {
void DBImpl::CleanupCompaction(CompactionState* compact) {
mutex_.AssertHeld();
- if (compact->builder != NULL) {
+ if (compact->builders != NULL) {
// May happen if we get a shutdown call in the middle of compaction
- compact->builder->Abandon();
- delete compact->builder;
+
+ for (size_t i = 0; i < compact->num_outfiles; ++i) {
+ compact->builders[i]->Abandon();
+ delete compact->builders[i];
+ delete compact->outfiles[i];
+ }
+ delete[] compact->builders;
+ delete[] compact->outfiles;
+
+ compact->num_outfiles = 0;
+ compact->builders = NULL;
+ compact->outfiles = NULL;
} else {
- assert(compact->outfile == NULL);
+ assert(compact->outfiles == NULL);
}
- delete compact->outfile;
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
pending_outputs_.erase(out.number);
void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) {
mutex_.AssertHeld();
assert(compact != NULL);
- assert(compact->builder == NULL);
+ assert(compact->builders == NULL);
int filesNeeded = compact->compaction->num_input_files(1);
for (int i = 0; i < filesNeeded; i++) {
uint64_t file_number = versions_->NewFileNumber();
+// Opens a fresh batch of compaction output files: one file for a normal DB,
+// or two (hot + cold) when is_hotcold_ is set. For each file this allocates
+// an Output record, a WritableFile and a TableBuilder. If any
+// NewWritableFile() call fails, the whole batch is torn down and the error
+// is returned.
+// NOTE(review): Output entries already pushed onto compact->outputs (with
+// skip/file_size never set) are NOT popped on failure -- confirm that error
+// paths only ever read out.number from them.
Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
assert(compact != NULL);
- assert(compact->builder == NULL);
- uint64_t file_number;
- // If we have not yet exhausted the pre-allocated file numbers,
- // then use the one from the front. Otherwise, we have to acquire
- // the heavyweight lock and allocate a new file number.
- if (!compact->allocated_file_numbers.empty()) {
- file_number = compact->allocated_file_numbers.front();
- compact->allocated_file_numbers.pop_front();
- } else {
- mutex_.Lock();
- file_number = versions_->NewFileNumber();
- pending_outputs_.insert(file_number);
- mutex_.Unlock();
- }
- CompactionState::Output out;
- out.number = file_number;
- out.smallest.Clear();
- out.largest.Clear();
- compact->outputs.push_back(out);
+ assert(compact->builders == NULL);
- // Make the output file
- std::string fname = TableFileName(dbname_, file_number);
- Status s = env_->NewWritableFile(fname, &compact->outfile);
- if (s.ok()) {
- compact->builder = new TableBuilder(options_, compact->outfile,
- compact->compaction->level() + 1);
+ compact->num_outfiles = is_hotcold_?2:1;
+ compact->builders = new TableBuilder*[compact->num_outfiles];
+ compact->outfiles = new WritableFile*[compact->num_outfiles];
+
+ Status s;
+ for (size_t i = 0; i < compact->num_outfiles; ++i) {
+ uint64_t file_number;
+ // If we have not yet exhausted the pre-allocated file numbers,
+ // then use the one from the front. Otherwise, we have to acquire
+ // the heavyweight lock and allocate a new file number.
+ if (!compact->allocated_file_numbers.empty()) {
+ file_number = compact->allocated_file_numbers.front();
+ compact->allocated_file_numbers.pop_front();
+ } else {
+ mutex_.Lock();
+ file_number = versions_->NewFileNumber();
+ pending_outputs_.insert(file_number);
+ mutex_.Unlock();
+ }
+ CompactionState::Output out;
+ out.number = file_number;
+ out.smallest.Clear();
+ out.largest.Clear();
+ compact->outputs.push_back(out);
+
+ // Make the output file
+ std::string fname = TableFileName(dbname_, file_number);
+ s = env_->NewWritableFile(fname, &compact->outfiles[i]);
+ if (s.ok()) {
+ compact->builders[i] = new TableBuilder(options_, compact->outfiles[i],
+ compact->compaction->level() + 1);
+ } else {
+ // Clean up already constructed builders and outfiles
+ for (size_t j = 0; j < i; ++j) {
+ compact->builders[j]->Abandon();
+ delete compact->builders[j];
+ delete compact->outfiles[j];
+ }
+ delete[] compact->builders;
+ delete[] compact->outfiles;
+
+ compact->num_outfiles = 0;
+ compact->builders = NULL;
+ compact->outfiles = NULL;
+
+ // And stop looping
+ break;
+ }
}
return s;
}
+// Finishes the currently open batch of output files. For each builder in
+// the batch: writes out the table unless it holds zero entries (in which
+// case the builder is abandoned and the Output is marked skip), syncs and
+// closes the file, and verifies the written table is readable through the
+// table cache. On the first error the remaining builders are abandoned.
+// Always frees the builders/outfiles arrays and resets num_outfiles to 0.
+// NOTE(review): mixes nullptr and NULL; the rest of this file uses NULL.
Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
Iterator* input) {
assert(compact != NULL);
- assert(compact->outfile != NULL);
- assert(compact->builder != NULL);
+ assert(compact->outfiles != NULL);
+ assert(compact->builders != NULL);
- const uint64_t output_number = compact->current_output()->number;
- assert(output_number != 0);
+ Status s;
- // Check for iterator errors
- Status s = input->status();
- const uint64_t current_entries = compact->builder->NumEntries();
- if (s.ok()) {
- s = compact->builder->Finish();
- } else {
- compact->builder->Abandon();
- }
- const uint64_t current_bytes = compact->builder->FileSize();
- compact->current_output()->file_size = current_bytes;
- compact->total_bytes += current_bytes;
- delete compact->builder;
- compact->builder = NULL;
-
- // Finish and check for file errors
- if (s.ok() && !options_.disableDataSync) {
- if (options_.use_fsync) {
- s = compact->outfile->Fsync();
+ size_t i;
+ for (i = 0; i < compact->num_outfiles && s.ok(); ++i) {
+ const uint64_t output_number = compact->current_output(i)->number;
+ assert(output_number != 0);
+
+ // Check for iterator errors
+ s = input->status();
+ const uint64_t current_entries = compact->builders[i]->NumEntries();
+ if (s.ok() && current_entries > 0) {
+ // Don't write out the output file if it is empty.
+ s = compact->builders[i]->Finish();
} else {
- s = compact->outfile->Sync();
+ compact->builders[i]->Abandon();
+ }
+ const uint64_t current_bytes = compact->builders[i]->FileSize();
+ compact->current_output(i)->file_size = current_bytes;
+ compact->current_output(i)->skip = current_entries == 0;
+ compact->total_bytes += current_bytes;
+ delete compact->builders[i];
+ compact->builders[i] = nullptr;
+
+ // Finish and check for file errors
+ if (s.ok() && !options_.disableDataSync && current_entries > 0) {
+ if (options_.use_fsync) {
+ s = compact->outfiles[i]->Fsync();
+ } else {
+ s = compact->outfiles[i]->Sync();
+ }
+ }
+ if (s.ok() && current_entries > 0) {
+ s = compact->outfiles[i]->Close();
+ }
+ delete compact->outfiles[i];
+ compact->outfiles[i] = nullptr;
+
+ if (s.ok() && current_entries > 0) {
+ // Verify that the table is usable
+ Iterator* iter = table_cache_->NewIterator(ReadOptions(),
+ output_number,
+ current_bytes);
+ s = iter->status();
+ delete iter;
+ if (s.ok()) {
+ Log(options_.info_log,
+ "Generated table #%llu: %lld keys, %lld bytes",
+ (unsigned long long) output_number,
+ (unsigned long long) current_entries,
+ (unsigned long long) current_bytes);
+ }
+ }
}
- if (s.ok()) {
- s = compact->outfile->Close();
- }
- delete compact->outfile;
- compact->outfile = NULL;
-
- if (s.ok() && current_entries > 0) {
- // Verify that the table is usable
- Iterator* iter = table_cache_->NewIterator(ReadOptions(),
- output_number,
- current_bytes);
- s = iter->status();
- delete iter;
- if (s.ok()) {
- Log(options_.info_log,
- "Generated table #%llu: %lld keys, %lld bytes",
- (unsigned long long) output_number,
- (unsigned long long) current_entries,
- (unsigned long long) current_bytes);
+
+ // Clean up rest of the output files and builders if we experienced an error.
+ // `i` already points past the failed index; that iteration released its own
+ // builder/outfile before the loop condition re-checked s.ok().
+ if (!s.ok()) {
+ for (; i < compact->num_outfiles; ++i) {
+ compact->builders[i]->Abandon();
+ delete compact->builders[i];
+ delete compact->outfiles[i];
}
}
+
+ delete[] compact->builders;
+ delete[] compact->outfiles;
+
+ compact->num_outfiles = 0;
+ compact->builders = NULL;
+ compact->outfiles = NULL;
+
return s;
}
const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
+ if (out.skip) continue;
compact->compaction->edit()->AddFile(
level + 1,
out.number, out.file_size, out.smallest, out.largest);
CompactMetrics(metrics);
// Flush metrics to database.
- // TODO (opt): Remove read-modify-write as it is incredibly slow.
DB* metrics_db = dbimpl->metrics_db_;
for (size_t i = 0; i < metrics.size(); ++i) {
assert(metrics[i] != NULL);
const std::string& key = metrics[i]->GetDBKey();
+ // TODO: fix this code by some means so that we don't lose metrics
+ // already in the database. Using a read-update-write cycle is far
+ // too slow so we temporarily replaced it with an overwrite, with
+ // the old code left commented out to show the ideal logic.
+ metrics_db->Put(WriteOptions(), key, metrics[i]->GetDBValue());
+ /*
BlockMetrics* db_metrics = NULL;
std::string db_value;
// TODO: Log an error here.
}
delete db_metrics;
+ */
}
for (size_t i = 0; i < metrics.size(); ++i) {
}
}
+// Test hook: returns true if the record `iter` currently points at is
+// classified as hot by IsRecordHot() against the metrics DB.
+// REQUIRES: this is a hot-cold database; iter != NULL and belongs to this DB.
+bool DBImpl::TEST_IsHot(const Iterator* iter) {
+ assert(is_hotcold_);
+ assert(iter != NULL);
+
+ BlockMetrics* bm = NULL;
+ ReadOptions metrics_read_options;
+ bool result = IsRecordHot(iter, metrics_db_, metrics_read_options, &bm);
+ // bm may have been populated by IsRecordHot(); this hook has no further
+ // use for it, so free it here.
+ delete bm;
+
+ return result;
+}
+
+// Test hook: whether this DBImpl was opened with hot-cold tracking enabled.
+bool DBImpl::TEST_IsHotCold() {
+ return is_hotcold_;
+}
+
//
// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
Log(options_.info_log, "Compaction start summary: %s\n", scratch);
assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
- assert(compact->builder == NULL);
- assert(compact->outfile == NULL);
+ assert(compact->builders == NULL);
+ assert(compact->outfiles == NULL);
SequenceNumber visible_at_tip = 0;
SequenceNumber earliest_snapshot;
// Release mutex while we're actually doing the compaction work
mutex_.Unlock();
+ BlockMetrics* block_metrics_store = NULL;
+
const uint64_t start_micros = env_->NowMicros();
Iterator* input = versions_->MakeInputIterator(compact->compaction);
input->SeekToFirst();
Slice value = input->value();
Slice* compaction_filter_value = NULL;
if (compact->compaction->ShouldStopBefore(key) &&
- compact->builder != NULL) {
+ compact->builders != NULL) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
if (!drop) {
// Open output file if necessary
- if (compact->builder == NULL) {
+ if (compact->builders == NULL) {
status = OpenCompactionOutputFile(compact);
if (!status.ok()) {
break;
}
}
- if (compact->builder->NumEntries() == 0) {
- compact->current_output()->smallest.DecodeFrom(key);
+
+ // Select output file
+ size_t outfile_idx = 0;
+ if (is_hotcold_ && IsRecordHot(input, metrics_db_, ReadOptions(),
+ &block_metrics_store)) {
+ outfile_idx = 1;
}
- compact->current_output()->largest.DecodeFrom(key);
- compact->builder->Add(key, value);
+ assert(outfile_idx < compact->num_outfiles);
- // Close output file if it is big enough
- if (compact->builder->FileSize() >=
+
+ if (compact->builders[outfile_idx]->NumEntries() == 0) {
+ compact->current_output(outfile_idx)->smallest.DecodeFrom(key);
+ }
+ compact->current_output(outfile_idx)->largest.DecodeFrom(key);
+ compact->builders[outfile_idx]->Add(key, value);
+
+ // Close all the output files when any of them reach the file size limit.
+ // By checking only the file we added the record to we are still
+ // guaranteed to catch when this happens as none of the other current
+ // output files have changed their size.
+ // TODO: consider adding the size of all the builders together.
+ if (compact->builders[outfile_idx]->FileSize() >=
compact->compaction->MaxOutputFileSize()) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
input->Next();
}
+ delete block_metrics_store;
+
if (status.ok() && shutting_down_.Acquire_Load()) {
status = Status::IOError("Deleting DB during compaction");
}
- if (status.ok() && compact->builder != NULL) {
+ if (status.ok() && compact->builders != NULL) {
status = FinishCompactionOutputFile(compact, input);
}
if (status.ok()) {
stats.files_in_levelnp1 = compact->compaction->num_input_files(1);
int num_output_files = compact->outputs.size();
- if (compact->builder != NULL) {
+ if (compact->builders != NULL) {
// An error occured so ignore the last output.
assert(num_output_files > 0);
- --num_output_files;
+ num_output_files -= compact->num_outfiles;
}
stats.files_out_levelnp1 = num_output_files;
// Done
} else {
ReadOptions read_options = options;
- if (is_hotcold_) {
+ if (read_options.record_accesses && is_hotcold_) {
read_options.metrics_handler = this;
}
s = current->Get(read_options, lkey, value, &stats);
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
ReadOptions read_options = options;
- if (is_hotcold_) {
+ if (read_options.record_accesses && is_hotcold_) {
read_options.metrics_handler = this;
}
Iterator* internal_iter = NewInternalIterator(read_options, &latest_snapshot);
// Force current memtable contents to be compacted.
Status TEST_CompactMemTable();
+ // Forcefully flushes metrics to metrics DB from where it can be read.
+ // This method only returns after the flush finishes.
+ void TEST_ForceFlushMetrics();
+
// Wait for memtable compaction
Status TEST_WaitForCompactMemTable();
// Wait for any compaction
Status TEST_WaitForCompact();
+ // Wait for flushing of metrics to finish
+ void TEST_WaitForMetricsFlush();
+
// Return an internal iterator over the current state of the database.
// The keys of this iterator are internal keys (see format.h).
// The returned iterator should be deleted when no longer needed.
// Return the current manifest file no.
uint64_t TEST_Current_Manifest_FileNo();
+
+ // Returns true if the record pointed to by iter is hot.
+ // REQUIRES: this is a hot-cold database.
+ // REQUIRES: iter is not NULL and belongs to this database.
+ bool TEST_IsHot(const Iterator* iter);
+
+ // Returns true if the DBImpl does hot-cold tracking.
+ bool TEST_IsHotCold();
+
protected:
Env* const env_;
const std::string dbname_;
}
}
+ // Returns the iterator this object is currently positioned on, or NULL
+ // if this iterator is not valid. Returns the raw member iter_ without
+ // transferring ownership.
+ // NOTE(review): stray ';' after the method body (harmless but untidy).
+ virtual const Iterator* FindSubIterator() const {
+ if (!Valid()) {
+ return NULL;
+ }
+ return iter_;
+ };
+
virtual void Next();
virtual void Prev();
virtual void Seek(const Slice& target);
#include "leveldb/cache.h"
#include "leveldb/env.h"
#include "leveldb/table.h"
+#include "util/coding.h"
#include "util/hash.h"
#include "util/logging.h"
#include "util/mutexlock.h"
ASSERT_OK(TryReopen(options));
}
+ // Reopens the database via TryReopenWithHotCold(), asserting success.
+ void ReopenWithHotCold(Options* options = NULL) {
+ ASSERT_OK(TryReopenWithHotCold(options));
+ }
+
void Close() {
delete db_;
db_ = NULL;
ASSERT_OK(TryReopen(options));
}
+ // Deletes the open DB handle, destroys the on-disk database, then reopens
+ // it with hot-cold tracking, asserting success.
+ void DestroyAndReopenWithHotCold(Options* options = NULL) {
+ delete db_;
+ db_ = NULL;
+ DestroyDB(dbname_, Options());
+ ASSERT_OK(TryReopenWithHotCold(options));
+ }
+
Status PureReopen(Options* options, DB** db) {
return DB::Open(*options, dbname_, db);
}
return DB::Open(opts, dbname_, &db_);
}
+ // Closes any open DB handle and reopens the database via
+ // DB::OpenWithHotCold(). When `options` is NULL, falls back to
+ // CurrentOptions() with create_if_missing set. Records the options used
+ // in last_options_ and returns the open status.
+ Status TryReopenWithHotCold(Options* options) {
+ delete db_;
+ db_ = NULL;
+ Options opts;
+ if (options != NULL) {
+ opts = *options;
+ } else {
+ opts = CurrentOptions();
+ opts.create_if_missing = true;
+ }
+ last_options_ = opts;
+
+ return DB::OpenWithHotCold(opts, dbname_, &db_);
+ }
+
Status Put(const std::string& k, const std::string& v) {
return db_->Put(WriteOptions(), k, v);
}
} while (ChangeOptions());
}
+// End-to-end hot-cold test: writes 512 keys and compacts them into tables,
+// verifies every record starts cold, then reads every kKeysToSkip-th key,
+// flushes metrics, and verifies those keys are now reported hot. Keys that
+// should be cold are only counted (not asserted) because hotness detection
+// admits false positives; the observed false-positive rate is printed.
+TEST(DBTest, HotCold) {
+ const uint32_t kNumKeys = 1 << 9;
+ const uint32_t kKeysToSkip = 6;
+ do {
+ DestroyAndReopenWithHotCold(NULL);
+ ASSERT_TRUE(db_ != NULL);
+
+ // Skip if this db is not hotcold
+ // (continue jumps straight to the ChangeOptions() loop condition).
+ if (!dbfull()->TEST_IsHotCold()) {
+ continue;
+ }
+
+ // Populate db and push all data to sstable
+ for (uint32_t key = 0; key < kNumKeys; ++key) {
+ std::string skey;
+ PutFixed32(&skey, key);
+
+ // NOTE(review): Put() status ignored; ASSERT_OK would catch setup
+ // failures earlier.
+ Put(skey, skey);
+ }
+ dbfull()->TEST_CompactMemTable();
+ dbfull()->CompactRange(NULL, NULL);
+
+ // Flush metrics if any and check that all records are cold
+ dbfull()->TEST_ForceFlushMetrics();
+ {
+ ReadOptions opts;
+ // Don't record these verification reads as accesses, or they would
+ // perturb the hotness metrics under test.
+ opts.record_accesses = false;
+ Iterator* iter = db_->NewIterator(opts);
+ for (uint32_t key = 0; key < kNumKeys; ++key) {
+ std::string skey;
+ PutFixed32(&skey, key);
+
+ iter->Seek(skey);
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ(skey, iter->key().ToString());
+
+ ASSERT_TRUE(!dbfull()->TEST_IsHot(iter));
+ }
+ delete iter;
+ }
+
+ // Access a few rows and flush the metrics to the metrics db.
+ for (uint32_t key = 0; key < kNumKeys; key += kKeysToSkip) {
+ std::string skey;
+ PutFixed32(&skey, key);
+
+ ASSERT_EQ(skey, Get(skey));
+ }
+ dbfull()->TEST_ForceFlushMetrics();
+
+ // Check that keys that should be hot are hot.
+ // Due to false positives we don't check whether cold keys are actually
+ // cold.
+ {
+ ReadOptions opts;
+ opts.record_accesses = false;
+ Iterator* iter = db_->NewIterator(opts);
+ uint32_t totalColdKeys = 0; // Total number of keys that are supposed to
+ // be cold.
+ uint32_t totalHotColdKeys = 0; // Counts number of keys which are
+ // supposed to be cold that are actually
+ // hot
+ for (uint32_t key = 0; key < kNumKeys; ++key) {
+ std::string skey;
+ PutFixed32(&skey, key);
+
+ iter->Seek(skey);
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ(skey, iter->key().ToString());
+
+ if (key % kKeysToSkip == 0) {
+ ASSERT_TRUE(dbfull()->TEST_IsHot(iter));
+ } else {
+ ++totalColdKeys;
+ if (dbfull()->TEST_IsHot(iter)) {
+ ++totalHotColdKeys;
+ }
+ }
+ }
+ delete iter;
+
+ if (totalColdKeys != 0) {
+ fprintf(stderr, "Hot false-positive rate: %.2f%%\n",
+ double(totalHotColdKeys)/totalColdKeys*100);
+ }
+ }
+
+ } while (ChangeOptions());
+}
+
std::string MakeKey(unsigned int num) {
char buf[30];
snprintf(buf, sizeof(buf), "%016u", num);
return sum;
}
+// Sums total_file_size over the given file groups.
+// NOTE(review): the loop also stops at the first NULL entry; GroupFiles()
+// never stores NULLs, so either this condition is dead or some caller passes
+// a NULL-terminated vector -- confirm before simplifying.
+static int64_t TotalFileSize(const std::vector<FileGroup*>& file_groups) {
+ int64_t sum = 0;
+ for (size_t i = 0; i < file_groups.size() && file_groups[i]; i++) {
+ sum += file_groups[i]->total_file_size;
+ }
+ return sum;
+}
+
Version::~Version() {
assert(refs_ == 0);
prev_->next_ = next_;
next_->prev_ = prev_;
+ // Free FileGroups
+ for (int level = 0; level < vset_->NumberLevels(); level++) {
+ for (size_t i = 0; i < file_groups_[level].size(); i++) {
+ delete file_groups_[level][i];
+ }
+ }
+ delete[] file_groups_;
+
// Drop references to files
for (int level = 0; level < vset_->NumberLevels(); level++) {
for (size_t i = 0; i < files_[level].size(); i++) {
return !BeforeFile(ucmp, largest_user_key, files[index]);
}
-// An internal iterator. For a given version/level pair, yields
-// information about the files in the level. For a given entry, key()
-// is the largest key that occurs in the file, and value() is an
-// 16-byte value containing the file number and file size, both
-// encoded using EncodeFixed64.
-class Version::LevelFileNumIterator : public Iterator {
+// Returns an iterator over all table files in this group. A single-file
+// group returns the table-cache iterator directly; otherwise the per-file
+// iterators are combined with a merging iterator, since files within a
+// group may overlap in key range.
+Iterator* FileGroup::NewIterator(TableCache* cache,
+ const InternalKeyComparator* icmp,
+ const ReadOptions& options) {
+ if (files.size() == 1) {
+ Iterator* iter = cache->NewIterator(options, files[0]->number,
+ files[0]->file_size);
+ return iter;
+ }
+
+ Iterator** iters = new Iterator*[files.size()];
+ for (size_t i = 0; i < files.size(); ++i) {
+ iters[i] = cache->NewIterator(options, files[i]->number,
+ files[i]->file_size);
+ }
+ Iterator* iter = NewMergingIterator(icmp, iters, files.size());
+ // Only the temporary array is freed here; the merging iterator is assumed
+ // to take ownership of the child iterators -- TODO confirm that contract.
+ delete[] iters;
+ return iter;
+}
+
+namespace {
+// Represents either the start or end of a file. "start" indicates which key we
+// use of "file".
+struct EndPoint {
+ // Non-owning pointer to the file this end point belongs to.
+ FileMetaData* file;
+ bool start; // If true we use the first key; otherwise, the last.
+
+ EndPoint(FileMetaData* file, bool start)
+ : file(file), start(start) {
+ }
+};
+// Comparator to sort end points by key. If two end points have the same key
+// and one is a start point while the other is an end point, the start point
+// is ordered before the end point; this keeps files that merely touch at a
+// shared key in the same overlap group (see operator() below).
+struct BySmallestEndPoint {
+ // Non-owning; must outlive the sort using this comparator.
+ const InternalKeyComparator* internal_comparator;
+
+ BySmallestEndPoint(const InternalKeyComparator* internal_comparator)
+ : internal_comparator(internal_comparator) {
+ }
+
+ bool operator()(const EndPoint& a, const EndPoint& b) {
+ const InternalKey& a_key = a.start?a.file->smallest:a.file->largest;
+ const InternalKey& b_key = b.start?b.file->smallest:b.file->largest;
+
+ int r = internal_comparator->Compare(a_key, b_key);
+
+ // "a" orders before "b" if a_key < b_key, or if the keys are equal and
+ // "a" is a start point while "b" is an end point. Ordering starts
+ // before ends on key ties keeps files that merely touch at a key in
+ // the same overlap group in GroupFiles().
+ return (r < 0) || (r == 0 && a.start && !b.start);
+ }
+};
+} // namespace
+// Partitions `files` into groups of transitively-overlapping files (by key
+// range under `internal_comparator`). `file_groups` is cleared first and
+// then receives one newly-allocated FileGroup per maximal run of
+// overlapping files, with each group's smallest/largest keys and
+// total_file_size filled in; the caller owns the FileGroup objects.
+//
+// Sweep-line algorithm: sort all file start/end points, then scan while
+// tracking how many files are currently "open"; the count dropping to zero
+// closes a group.
+void GroupFiles(
+ const InternalKeyComparator* internal_comparator,
+ const std::vector<FileMetaData*>& files,
+ std::vector<FileGroup*>& file_groups) {
+ file_groups.clear();
+
+ // Store and sort the end points of the files.
+ std::vector<EndPoint> end_points;
+ end_points.reserve(files.size() * 2);
+ for (size_t i = 0; i < files.size(); ++i) {
+ end_points.push_back(EndPoint(files[i], false));
+ end_points.push_back(EndPoint(files[i], true));
+ }
+ std::sort(end_points.begin(), end_points.end(),
+ BySmallestEndPoint(internal_comparator));
+
+ // Loop through end points finding groups of overlapping files.
+ size_t cur_unended_files = 0; // Number of files of which we've seen the
+ // start point, but not the end point.
+ for (size_t i = 0; i < end_points.size(); ++i) {
+ FileGroup* cur_group;
+ if (end_points[i].start) {
+ if (cur_unended_files == 0) {
+ // Create a new group.
+ cur_group = new FileGroup();
+ file_groups.push_back(cur_group);
+
+ // First file in this group, so update the smallest key.
+ cur_group->smallest = end_points[i].file->smallest;
+ } else {
+ cur_group = file_groups[file_groups.size()-1];
+ }
+
+ // Add files to group when we first encounter them.
+ cur_group->files.push_back(end_points[i].file);
+ cur_group->total_file_size += end_points[i].file->file_size;
+
+ ++cur_unended_files;
+ } else {
+ // A matching start point must have preceded this end point. Check
+ // BEFORE decrementing: cur_unended_files is unsigned, so the old
+ // post-decrement `assert(cur_unended_files >= 0)` was vacuously true
+ // and could never catch the wraparound it was meant to detect.
+ assert(cur_unended_files > 0);
+ --cur_unended_files;
+ if (cur_unended_files == 0) {
+ // No more files are going to overlap with current group so set the
+ // largest key in the group, cause we are either done or going to start
+ // a new group the next iteration.
+
+ cur_group = file_groups[file_groups.size()-1];
+ cur_group->largest = end_points[i].file->largest;
+ }
+ }
+ }
+}
+
+// Binary search over `groups` (sorted by largest key, non-overlapping, as
+// produced by GroupFiles). Returns the index of the first group whose
+// largest key is >= `key` (an encoded internal key), or groups.size() if
+// every group ends before `key`. The explicit InternalKeyComparator::
+// qualification forces a non-virtual call on this lookup path.
+size_t FindGroup(const InternalKeyComparator& icmp,
+ const std::vector<FileGroup*>& groups,
+ const Slice& key) {
+ size_t left = 0;
+ size_t right = groups.size();
+ while (left < right) {
+ size_t mid = (left + right) / 2;
+ const FileGroup* g = groups[mid];
+ if (icmp.InternalKeyComparator::Compare(g->largest.Encode(), key) < 0) {
+ // Key at "mid.largest" is < "target". Therefore all
+ // groups at or before "mid" are uninteresting.
+ left = mid + 1;
+ } else {
+ // Key at "mid.largest" is >= "target". Therefore all groups
+ // after "mid" are uninteresting.
+ right = mid;
+ }
+ }
+ return right;
+}
+
+// Returns true iff some group in `groups` overlaps the user-key range
+// [*smallest_user_key, *largest_user_key]. A NULL bound means the range is
+// unbounded on that side.
+// REQUIRES: `groups` sorted and non-overlapping (as produced by GroupFiles).
+bool SomeGroupOverlapsRange(
+ const InternalKeyComparator& icmp,
+ const std::vector<FileGroup*>& groups,
+ const Slice* smallest_user_key,
+ const Slice* largest_user_key) {
+ const Comparator* ucmp = icmp.user_comparator();
+
+ // Binary search over group list
+ uint32_t index = 0;
+ if (smallest_user_key != NULL) {
+ // Find the earliest possible internal key for smallest_user_key
+ InternalKey small(*smallest_user_key, kMaxSequenceNumber,
+ kValueTypeForSeek);
+ index = FindGroup(icmp, groups, small.Encode());
+ }
+
+ if (index >= groups.size()) {
+ // beginning of range is after all files, so no overlap.
+ return false;
+ }
+
+ // Overlap iff the found group starts at or before the range's upper bound.
+ return (largest_user_key == NULL ||
+ ucmp->Compare(*largest_user_key,
+ groups[index]->smallest.user_key()) >= 0);
+}
+
+namespace {
+// An internal iterator. For a given list of groups, yields information about
+// those groups. For a given entry, key() is the largest key that occurs in the
+// group, and value() is an 8-byte value containing the index of the group in
+// the list, encoded using EncodeFixed64.
+class LevelGroupIdxIterator : public Iterator {
public:
- LevelFileNumIterator(const InternalKeyComparator& icmp,
- const std::vector<FileMetaData*>* flist)
+ LevelGroupIdxIterator(const InternalKeyComparator& icmp,
+ const std::vector<FileGroup*>& groups)
: icmp_(icmp),
- flist_(flist),
- index_(flist->size()) { // Marks as invalid
+ groups_(groups),
+ index_(groups_.size()) { // Marks as invalid
+ assert(!groups.empty());
}
virtual bool Valid() const {
- return index_ < flist_->size();
+ return index_ < groups_.size();
}
virtual void Seek(const Slice& target) {
- index_ = FindFile(icmp_, *flist_, target);
+ index_ = FindGroup(icmp_, groups_, target);
}
virtual void SeekToFirst() { index_ = 0; }
virtual void SeekToLast() {
- index_ = flist_->empty() ? 0 : flist_->size() - 1;
+ index_ = groups_.empty() ? 0 : groups_.size() - 1;
}
virtual void Next() {
assert(Valid());
virtual void Prev() {
assert(Valid());
if (index_ == 0) {
- index_ = flist_->size(); // Marks as invalid
+ index_ = groups_.size(); // Marks as invalid
} else {
index_--;
}
}
Slice key() const {
assert(Valid());
- return (*flist_)[index_]->largest.Encode();
+ return groups_[index_]->largest.Encode();
}
Slice value() const {
assert(Valid());
- EncodeFixed64(value_buf_, (*flist_)[index_]->number);
- EncodeFixed64(value_buf_+8, (*flist_)[index_]->file_size);
+ EncodeFixed64(value_buf_, index_);
return Slice(value_buf_, sizeof(value_buf_));
}
virtual Status status() const { return Status::OK(); }
private:
const InternalKeyComparator icmp_;
- const std::vector<FileMetaData*>* const flist_;
- uint32_t index_;
+ uint64_t level_;
+ const std::vector<FileGroup*>& groups_;
+ size_t index_;
// Backing store for value(). Holds the file number and size.
- mutable char value_buf_[16];
+ mutable char value_buf_[8];
};
-static Iterator* GetFileIterator(void* arg,
- const ReadOptions& options,
- const Slice& file_value) {
- TableCache* cache = reinterpret_cast<TableCache*>(arg);
- if (file_value.size() != 16) {
- return NewErrorIterator(
- Status::Corruption("FileReader invoked with unexpected value"));
+// Struct that contains all the information that GetGroupIterator() needs to
+// function.
+struct GetGroupIteratorArg {
+ const InternalKeyComparator* icmp;
+ TableCache* cache;
+ // NOTE(review): reference member -- the object owning `groups` (the
+ // Version, presumably) must outlive the two-level iterator that holds
+ // this arg; confirm it is pinned for the iterator's lifetime.
+ const std::vector<FileGroup*>& groups;
+
+ GetGroupIteratorArg(const InternalKeyComparator* icmp,
+ TableCache* cache,
+ const std::vector<FileGroup*>& groups)
+ : icmp(icmp), cache(cache), groups(groups) {
+ }
+
+ // Function that gets passed to an Iterator in order to free an instance of
+ // this class.
+ static void IterDelete(void* arg1, void*) {
+ GetGroupIteratorArg* ggia = reinterpret_cast<GetGroupIteratorArg*>(arg1);
+ delete ggia;
+ }
+};
+// Returns an iterator over the specified group.
+// Second-level callback for the two-level iterator: decodes the 8-byte
+// group index produced by LevelGroupIdxIterator::value() and returns an
+// iterator over that group. `arg` is a GetGroupIteratorArg.
+static Iterator* GetGroupIterator(void* arg,
+ const ReadOptions& options,
+ const Slice& index) {
+ GetGroupIteratorArg* ggia = reinterpret_cast<GetGroupIteratorArg*>(arg);
+ Iterator* iter;
+ if (index.size() != 8) {
+ iter = NewErrorIterator(
+ Status::Corruption("GetGroupIterator() invoked with unexpected index"));
} else {
- return cache->NewIterator(options,
- DecodeFixed64(file_value.data()),
- DecodeFixed64(file_value.data() + 8));
+ uint64_t iindex = DecodeFixed64(index.data());
+ assert(iindex < ggia->groups.size());
+ iter = ggia->groups[iindex]->NewIterator(ggia->cache, ggia->icmp, options);
}
+ return iter;
}
-Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
-  int level) const {
- return NewTwoLevelIterator(
- new LevelFileNumIterator(vset_->icmp_, &files_[level]),
- &GetFileIterator, vset_->table_cache_, options);
+// Returns an iterator that concatenates together the given groups.
+// REQUIRES: The groups are sorted in order and are non-overlapping
+Iterator* NewGroupsConcatenatingIterator(
+ const ReadOptions& options,
+ TableCache* cache,
+ const InternalKeyComparator* icmp,
+ const std::vector<FileGroup*>& groups) {
+ assert(!groups.empty());
+
+ GetGroupIteratorArg* ggia = new GetGroupIteratorArg(icmp, cache, groups);
+ Iterator* iter = NewTwoLevelIterator(
+ new LevelGroupIdxIterator(*icmp, groups),
+ &GetGroupIterator, ggia, options);
+ // The returned iterator owns ggia and frees it via the cleanup hook.
+ iter->RegisterCleanup(&GetGroupIteratorArg::IterDelete, ggia, NULL);
+
+ return iter;
}
+} // namespace
void Version::AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iters) {
}
// For levels > 0, we can use a concatenating iterator that sequentially
- // walks through the non-overlapping files in the level, opening them
+ // walks through the non-overlapping groups in the level, opening them
// lazily.
for (int level = 1; level < vset_->NumberLevels(); level++) {
- if (!files_[level].empty()) {
- iters->push_back(NewConcatenatingIterator(options, level));
+ if (!file_groups_[level].empty()) {
+ iters->push_back(NewGroupsConcatenatingIterator(options,
+ vset_->table_cache_,
+ &vset_->icmp_,
+ file_groups_[level]));
}
}
}
const Comparator* ucmp;
Slice user_key;
std::string* value;
+ SequenceNumber seq;
bool didIO; // did we do any disk io?
};
}
s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted;
if (s->state == kFound) {
s->value->assign(v.data(), v.size());
+ s->seq = parsed_key.sequence;
}
}
}
}
+
// Sort predicate: newer files (larger file numbers) order first.
static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
return a->number > b->number;
}
offset_manifest_file_(0),
version_number_(version_number) {
files_ = new std::vector<FileMetaData*>[vset->NumberLevels()];
+ file_groups_ = new std::vector<FileGroup*>[vset->NumberLevels()]();
}
Status Version::Get(const ReadOptions& options,
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in an smaller level, later levels are irrelevant.
- std::vector<FileMetaData*> tmp;
- FileMetaData* tmp2;
+ std::vector<FileMetaData*> files;
+ // Candidate files for the current level (drawn from the single group
+ // that can hold user_key); reused across levels (cleared at the end
+ // of each iteration).
for (int level = 0; level < vset_->NumberLevels(); level++) {
- size_t num_files = files_[level].size();
- if (num_files == 0) continue;
+ size_t num_groups = file_groups_[level].size();
+ if (files_[level].size() == 0) continue;
+ assert(num_groups > 0);
- // Get the list of files to search in this level
- FileMetaData* const* files = &files_[level][0];
- if (level == 0) {
- // Level-0 files may overlap each other. Find all files that
- // overlap user_key and process them in order from newest to oldest.
- tmp.reserve(num_files);
- for (uint32_t i = 0; i < num_files; i++) {
- FileMetaData* f = files[i];
- if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
- ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
- tmp.push_back(f);
+ const std::vector<FileGroup*>& groups = file_groups_[level];
+ size_t index = FindGroup(vset_->icmp_, groups, ikey);
+ if (index >= num_groups) {
+ continue;
+ } else if (ucmp->Compare(user_key,
+ groups[index]->smallest.user_key()) >= 0) {
+ // Although user keys can span multiple groups we don't have to worry
+ // about checking the next group as we are only interested in the user
+ // key with the greatest sequence number and as internal keys with the same
+ // user key are sorted in order of decreasing sequence number, this group
+ // is guaranteed to contain the user key with the largest sequence
+ // number.
+
+ for (size_t i = 0; i < groups[index]->files.size(); i++) {
+ FileMetaData* f = file_groups_[level][index]->files[i];
+
+ if (i != 0 && ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
+ // We know the user key is greater than the smallest key of the first
+ // file (which is also the smallest key of the group) so we avoid
+ // calling the comparator.
+ // If the smallest key in the current file is greater than the user
+ // key, then no other file will overlap with the key so we can stop.
+ break;
}
- }
- if (tmp.empty()) continue;
- std::sort(tmp.begin(), tmp.end(), NewestFirst);
- files = &tmp[0];
- num_files = tmp.size();
- } else {
- // Binary search to find earliest index whose largest key >= ikey.
- uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
- if (index >= num_files) {
- files = NULL;
- num_files = 0;
- } else {
- tmp2 = files[index];
- if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
- // All of "tmp2" is past any data for user_key
- files = NULL;
- num_files = 0;
- } else {
- files = &tmp2;
- num_files = 1;
+ if (ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
+ files.push_back(f);
}
}
}
- for (uint32_t i = 0; i < num_files; ++i) {
+ if (files.empty()) continue;
+
+ // Sort files since in level 0 we can simply look starting from the newest
+ // file.
+ if (level == 0) {
+ std::sort(files.begin(), files.end(), NewestFirst);
+ }
+
+ Saver saver;
+ saver.ucmp = ucmp;
+ std::string saved_value;
+ SequenceNumber saved_seq = 0;
+ SaverState saved_state = kNotFound;
+ // When a level holds several candidate files we cannot stop at the
+ // first match: track the match with the largest sequence number.
+
+ for (uint32_t i = 0; i < files.size(); ++i) {
FileMetaData* f = files[i];
- Saver saver;
saver.state = kNotFound;
saver.ucmp = ucmp;
saver.user_key = user_key;
case kNotFound:
break; // Keep searching in other files
case kFound:
- return s;
+ if (level == 0 || files.size() == 1) {
+ return s; // no need to look at other files
+ }
+
+ if (saver.seq >= saved_seq) { // save most recent
+ saved_value = *saver.value;
+ saved_seq = saver.seq;
+ saved_state = kFound;
+ }
+ break; // keep searching in other files
case kDeleted:
- s = Status::NotFound(Slice()); // Use empty error message for speed
- return s;
+ // NOTE(review): SaveValue only assigns saver.seq for kTypeValue
+ // entries (kFound); on a deletion saver.seq still holds the value
+ // left by an earlier file, or is uninitialized -- confirm, and set
+ // seq unconditionally in SaveValue.
+ // NOTE(review): recording the newest entry as kNotFound rather
+ // than kDeleted makes the kDeleted case of the post-loop switch
+ // unreachable and can resurrect a deleted key from a deeper
+ // level -- verify this should be saved_state = kDeleted.
+ if (level == 0 || files.size() == 1) {
+ s = Status::NotFound(Slice()); // Use empty error message for speed
+ return s;
+ }
+
+ if (saver.seq >= saved_seq) { // save most recent
+ saved_seq = saver.seq;
+ saved_state = kNotFound;
+ }
+ break;
case kCorrupt:
s = Status::Corruption("corrupted key for ", user_key);
return s;
}
}
+
+ // No early return happened: decide this level's outcome from the best
+ // candidate seen (see NOTE above regarding the kDeleted case).
+ assert(saved_state == kNotFound || files.size() > 1);
+ assert(s.ok());
+ switch (saved_state) {
+ case kFound:
+ *value = saved_value;
+ return s;
+ case kDeleted:
+ s = Status::NotFound(Slice());// Use empty error message for speed
+ return s;
+ default:
+ assert(saved_state == kNotFound);
+ }
+ files.clear();
}
return Status::NotFound(Slice()); // Use an empty error message for speed
FileMetaData* f = stats.seek_file;
if (f != NULL) {
f->allowed_seeks--;
- if (f->allowed_seeks <= 0 && file_to_compact_ == NULL) {
+ if (f->allowed_seeks <= 0 && file_to_compact_ == NULL &&
+ stats.seek_file_level < vset_->NumberLevels()-1) {
+ // We don't allow seek compactions to happen on the level with the
+ // greatest number.
file_to_compact_ = f;
file_to_compact_level_ = stats.seek_file_level;
return true;
bool Version::OverlapInLevel(int level,
const Slice* smallest_user_key,
const Slice* largest_user_key) {
- return SomeFileOverlapsRange(vset_->icmp_, (level > 0), files_[level],
- smallest_user_key, largest_user_key);
+ return SomeGroupOverlapsRange(vset_->icmp_, file_groups_[level],
+ smallest_user_key, largest_user_key);
}
int Version::PickLevelForMemTableOutput(
}
// Store in "*inputs" all files in "level" that overlap [begin,end]
-// If hint_index is specified, then it points to a file in the
-// overlapping range.
-// The file_index returns a pointer to any file in an overlapping range.
+// Including all files that overlap files in those range, files that overlap
+// those files in turn and so on.
void Version::GetOverlappingInputs(
int level,
const InternalKey* begin,
const InternalKey* end,
- std::vector<FileMetaData*>* inputs,
- int hint_index,
- int* file_index) {
+ std::vector<FileMetaData*>* inputs) {
+ assert(level >= 0);
+ assert(level < vset_->NumberLevels());
inputs->clear();
+
+ const std::vector<FileGroup*>& groups = file_groups_[level];
+
Slice user_begin, user_end;
+ size_t start_index = 0;
if (begin != NULL) {
user_begin = begin->user_key();
+ start_index = FindGroup(vset_->icmp_, groups, begin->Encode());
}
if (end != NULL) {
user_end = end->user_key();
}
- if (file_index) {
- *file_index = -1;
- }
- const Comparator* user_cmp = vset_->icmp_.user_comparator();
- if (begin != NULL && end != NULL && level > 0) {
- GetOverlappingInputsBinarySearch(level, user_begin, user_end, inputs,
- hint_index, file_index);
- return;
- }
- for (size_t i = 0; i < files_[level].size(); ) {
- FileMetaData* f = files_[level][i++];
- const Slice file_start = f->smallest.user_key();
- const Slice file_limit = f->largest.user_key();
- if (begin != NULL && user_cmp->Compare(file_limit, user_begin) < 0) {
- // "f" is completely before specified range; skip it
- } else if (end != NULL && user_cmp->Compare(file_start, user_end) > 0) {
- // "f" is completely after specified range; skip it
- } else {
- inputs->push_back(f);
- if (level == 0) {
- // Level-0 files may overlap each other. So check if the newly
- // added file has expanded the range. If so, restart search.
- if (begin != NULL && user_cmp->Compare(file_start, user_begin) < 0) {
- user_begin = file_start;
- inputs->clear();
- i = 0;
- } else if (end != NULL && user_cmp->Compare(file_limit, user_end) > 0) {
- user_end = file_limit;
- inputs->clear();
- i = 0;
- }
- } else if (file_index) {
- *file_index = i-1;
- }
- }
- }
-}
-// Store in "*inputs" all files in "level" that overlap [begin,end]
-// Employ binary search to find at least one file that overlaps the
-// specified range. From that file, iterate backwards and
-// forwards to find all overlapping files.
-void Version::GetOverlappingInputsBinarySearch(
- int level,
- const Slice& user_begin,
- const Slice& user_end,
- std::vector<FileMetaData*>* inputs,
- int hint_index,
- int* file_index) {
- assert(level > 0);
- int min = 0;
- int mid = 0;
- int max = files_[level].size() -1;
- bool foundOverlap = false;
const Comparator* user_cmp = vset_->icmp_.user_comparator();
+ // Scan forward from the first group whose largest key reaches "begin"
+ // and stop once a group starts past "end". Whole groups are emitted,
+ // which is what transitively pulls in files that merely overlap other
+ // overlapping files.
+ while (start_index < groups.size()) {
+ const FileGroup* group = groups[start_index];
- // if the caller already knows the index of a file that has overlap,
- // then we can skip the binary search.
- if (hint_index != -1) {
- mid = hint_index;
- foundOverlap = true;
- }
-
- while (!foundOverlap && min <= max) {
- mid = (min + max)/2;
- FileMetaData* f = files_[level][mid];
- const Slice file_start = f->smallest.user_key();
- const Slice file_limit = f->largest.user_key();
- if (user_cmp->Compare(file_limit, user_begin) < 0) {
- min = mid + 1;
- } else if (user_cmp->Compare(user_end, file_start) < 0) {
- max = mid - 1;
- } else {
- foundOverlap = true;
+ // If the current group is larger than the end range, stop searching.
+ if (end != NULL &&
+ user_cmp->Compare(group->smallest.user_key(), user_end) > 0) {
break;
}
- }
- // If there were no overlapping files, return immediately.
- if (!foundOverlap) {
- return;
- }
- // returns the index where an overlap is found
- if (file_index) {
- *file_index = mid;
- }
- ExtendOverlappingInputs(level, user_begin, user_end, inputs, mid);
-}
+ inputs->insert(inputs->end(), group->files.begin(), group->files.end());
-// Store in "*inputs" all files in "level" that overlap [begin,end]
-// The midIndex specifies the index of at least one file that
-// overlaps the specified range. From that file, iterate backward
-// and forward to find all overlapping files.
-void Version::ExtendOverlappingInputs(
- int level,
- const Slice& user_begin,
- const Slice& user_end,
- std::vector<FileMetaData*>* inputs,
- int midIndex) {
-
- const Comparator* user_cmp = vset_->icmp_.user_comparator();
-#ifndef NDEBUG
- {
- // assert that the file at midIndex overlaps with the range
- assert(midIndex < files_[level].size());
- FileMetaData* f = files_[level][midIndex];
- const Slice fstart = f->smallest.user_key();
- const Slice flimit = f->largest.user_key();
- if (user_cmp->Compare(fstart, user_begin) >= 0) {
- assert(user_cmp->Compare(fstart, user_end) <= 0);
- } else {
- assert(user_cmp->Compare(flimit, user_begin) >= 0);
- }
- }
-#endif
- int startIndex = midIndex + 1;
- int endIndex = midIndex;
- int count __attribute__((unused)) = 0;
-
- // check backwards from 'mid' to lower indices
- for (int i = midIndex; i >= 0 ; i--) {
- FileMetaData* f = files_[level][i];
- const Slice file_limit = f->largest.user_key();
- if (user_cmp->Compare(file_limit, user_begin) >= 0) {
- startIndex = i;
- assert((count++, true));
- } else {
- break;
- }
- }
- // check forward from 'mid+1' to higher indices
- for (unsigned int i = midIndex+1; i < files_[level].size(); i++) {
- FileMetaData* f = files_[level][i];
- const Slice file_start = f->smallest.user_key();
- if (user_cmp->Compare(file_start, user_end) <= 0) {
- assert((count++, true));
- endIndex = i;
- } else {
- break;
- }
- }
- assert(count == endIndex - startIndex + 1);
-
- // insert overlapping files into vector
- for (int i = startIndex; i <= endIndex; i++) {
- FileMetaData* f = files_[level][i];
- inputs->push_back(f);
+ ++start_index;
}
}
return r;
}
+// Rebuilds the FileGroup index for "level" from files_[level]. The old
+// groups are deleted first; GroupFiles discards the vector's stale
+// contents before repopulating it.
+void Version::RebuildGroups(int level) {
+ for (size_t i = 0; i < file_groups_[level].size(); ++i) {
+ delete file_groups_[level][i];
+ }
+ GroupFiles(&vset_->icmp_, files_[level], file_groups_[level]);
+}
+
// this is used to batch writes to the manifest file
struct VersionSet::ManifestWriter {
Status status;
void CheckConsistency(Version* v) {
#ifndef NDEBUG
+ // The old per-level non-overlap assertion was removed: with grouping,
+ // files within a level may overlap (see the files_ comment in the
+ // header). TODO(review): consider validating the FileGroup invariants
+ // (non-empty, sorted, non-overlapping groups) here instead.
- for (int level = 0; level < vset_->NumberLevels(); level++) {
- // Make sure there is no overlap in levels > 0
- if (level > 0) {
- for (uint32_t i = 1; i < v->files_[level].size(); i++) {
- const InternalKey& prev_end = v->files_[level][i-1]->largest;
- const InternalKey& this_begin = v->files_[level][i]->smallest;
- if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
- fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
- prev_end.DebugString().c_str(),
- this_begin.DebugString().c_str());
- abort();
- }
- }
- }
- }
#endif
}
for (; base_iter != base_end; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
+
+ v->RebuildGroups(level);
}
CheckConsistency(v);
}
// File is deleted: do nothing
} else {
std::vector<FileMetaData*>* files = &v->files_[level];
- if (level > 0 && !files->empty()) {
- // Must not overlap
- assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest,
- f->smallest) < 0);
- }
f->refs++;
files->push_back(f);
}
return result;
}
+// Returns the approximate number of bytes in this group that are stored
+// before "ikey".
+uint64_t FileGroup::ApproximateOffsetOf(const VersionSet* vset,
+ const InternalKey& ikey) {
+ // ikey is before FileGroup so no overlap occurs.
+ if (vset->icmp_.Compare(ikey, smallest) < 0) return 0;
+ // Entire FileGroup is at or before ikey so just use the total size.
+ if (vset->icmp_.Compare(largest, ikey) <= 0) return total_file_size;
+
+ uint64_t result = 0;
+ for (size_t i = 0; i < files.size(); i++) {
+ if (vset->icmp_.Compare(files[i]->largest, ikey) <= 0) {
+ // Entire file is before "ikey", so just add the file size
+ result += files[i]->file_size;
+ } else if (vset->icmp_.Compare(files[i]->smallest, ikey) > 0) {
+ // Entire file is after "ikey", so ignore
+
+ // files are sorted by start point so no more overlaps will occur.
+ break;
+ } else {
+ // "ikey" falls in the range for this table. Add the
+ // approximate offset of "ikey" within the table.
+ Table* tableptr;
+ Iterator* iter = vset->table_cache_->NewIterator(
+ ReadOptions(), files[i]->number, files[i]->file_size, &tableptr);
+ if (tableptr != NULL) {
+ result += tableptr->ApproximateOffsetOf(ikey.Encode());
+ }
+ delete iter;
+ }
+ }
+ return result;
+}
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
uint64_t result = 0;
for (int level = 0; level < NumberLevels(); level++) {
- const std::vector<FileMetaData*>& files = v->files_[level];
- for (size_t i = 0; i < files.size(); i++) {
- if (icmp_.Compare(files[i]->largest, ikey) <= 0) {
- // Entire file is before "ikey", so just add the file size
- result += files[i]->file_size;
- } else if (icmp_.Compare(files[i]->smallest, ikey) > 0) {
- // Entire file is after "ikey", so ignore
- if (level > 0) {
- // Files other than level 0 are sorted by meta->smallest, so
- // no further files in this level will contain data for
- // "ikey".
- break;
- }
- } else {
- // "ikey" falls in the range for this table. Add the
- // approximate offset of "ikey" within the table.
- Table* tableptr;
- Iterator* iter = table_cache_->NewIterator(
- ReadOptions(), files[i]->number, files[i]->file_size, &tableptr);
- if (tableptr != NULL) {
- result += tableptr->ApproximateOffsetOf(ikey.Encode());
- }
- delete iter;
+ const std::vector<FileGroup*>& groups = v->file_groups_[level];
+ // Walk this level's sorted, non-overlapping groups and sum the bytes
+ // that fall before "ikey".
+ for (size_t i = 0; i < groups.size(); i++) {
+ // We can use FileGroup's ApproximateOffsetOf as it is efficient.
+ uint64_t group_offset = groups[i]->ApproximateOffsetOf(this, ikey);
+ result += group_offset;
+
+ if (group_offset == 0) {
+ // Since groups are non-empty we are guaranteed other groups after this
+ // one won't contain the key, as either the key falls before the
+ // current group or it is the first key in the group.
+ //
+ // So we can stop looping.
+ break;
}
}
}
GetRange(all, smallest, largest);
}
+namespace {
+// Cleanup callback for Iterator::RegisterCleanup: deletes the FileGroup
+// vector heap-allocated in MakeInputIterator, and each group in it.
+// arg2 is unused.
+void DeleteFileGroups(void* arg1, void* arg2) {
+ std::vector<FileGroup*>* groups =
+ reinterpret_cast<std::vector<FileGroup*>*>(arg1);
+
+ for (size_t i = 0; i < groups->size(); ++i) {
+ delete (*groups)[i];
+ }
+ delete groups;
+}
+}
Iterator* VersionSet::MakeInputIterator(Compaction* c) {
ReadOptions options;
options.verify_checksums = options_->paranoid_checks;
options.fill_cache = false;
- // Level-0 files have to be merged together. For other levels,
- // we will make a concatenating iterator per level.
- // TODO(opt): use concatenating iterator for level-0 if there is no overlap
- const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2);
+ // We construct groups of the input files, one for each level, then we merge
+ // these iterators together.
+ const int space = 2;
Iterator** list = new Iterator*[space];
int num = 0;
for (int which = 0; which < 2; which++) {
if (!c->inputs_[which].empty()) {
- if (c->level() + which == 0) {
- const std::vector<FileMetaData*>& files = c->inputs_[which];
- for (size_t i = 0; i < files.size(); i++) {
- list[num++] = table_cache_->NewIterator(
- options, files[i]->number, files[i]->file_size);
- }
- } else {
- // Create concatenating iterator for the files from this level
- list[num++] = NewTwoLevelIterator(
- new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
- &GetFileIterator, table_cache_, options);
- }
+ std::vector<FileGroup*>* groups = new std::vector<FileGroup*>();
+ GroupFiles(&icmp_, c->inputs_[which], *groups);
+ // The freshly-built groups (and the vector itself) are reclaimed by
+ // DeleteFileGroups when the iterator is destroyed.
+ list[num] = NewGroupsConcatenatingIterator(options, table_cache_,
+ &icmp_, *groups);
+ list[num]->RegisterCleanup(&DeleteFileGroups, groups, NULL);
+ ++num;
}
}
assert(num <= space);
// Do not pick this file if its parents at level+1 are being compacted.
// Maybe we can avoid redoing this work in SetupOtherInputs
- int parent_index = -1;
- if (ParentRangeInCompaction(&f->smallest, &f->largest, level,
- &parent_index)) {
+ if (ParentRangeInCompaction(&f->smallest, &f->largest, level)) {
continue;
}
c->inputs_[0].push_back(f);
- c->base_index_ = index;
- c->parent_index_ = parent_index;
break;
}
c->input_version_ = current_;
c->input_version_->Ref();
- // Files in level 0 may overlap each other, so pick up all overlapping ones
- // Two level 0 compaction won't run at the same time, so don't need to worry
- // about files on level 0 being compacted.
- if (level == 0) {
- assert(compactions_in_progress_[0].empty());
+ // Files may overlap each other, so pick up all overlapping ones
+ {
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
c->inputs_[0].clear();
- current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
- if (ParentRangeInCompaction(&smallest, &largest,
- level, &c->parent_index_)) {
+ current_->GetOverlappingInputs(level, &smallest, &largest, &c->inputs_[0]);
+ if (ParentRangeInCompaction(&smallest, &largest, level)) {
delete c;
return NULL;
}
+ for (unsigned int i = 0; i < c->inputs_[0].size(); i++) {
+ FileMetaData* f = c->inputs_[0][i];
+ if (f->being_compacted) {
+ delete c;
+ return NULL;
+ }
+ }
assert(!c->inputs_[0].empty());
}
// Returns true if any one of the parent files are being compacted
bool VersionSet::ParentRangeInCompaction(const InternalKey* smallest,
- const InternalKey* largest, int level, int* parent_index) {
+ const InternalKey* largest, int level) {
+ assert(level >= 0);
+ // level+1 must exist: compactions are never picked from the last level.
+ assert(level < NumberLevels()-1);
std::vector<FileMetaData*> inputs;
current_->GetOverlappingInputs(level+1, smallest, largest,
- &inputs, *parent_index, parent_index);
+ &inputs);
return FilesInCompaction(inputs);
}
void VersionSet::SetupOtherInputs(Compaction* c) {
const int level = c->level();
+ assert(level >= 0);
+ assert(level < NumberLevels()-1);
+
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
- current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1],
- c->parent_index_, &c->parent_index_);
+ current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);
// Get entire range covered by compaction
InternalKey all_start, all_limit;
// changing the number of "level+1" files we pick up.
if (!c->inputs_[1].empty()) {
std::vector<FileMetaData*> expanded0;
- current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0,
- c->base_index_, NULL);
+ current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const int64_t expanded0_size = TotalFileSize(expanded0);
GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1;
current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
- &expanded1, c->parent_index_,
- &c->parent_index_);
+ &expanded1);
if (expanded1.size() == c->inputs_[1].size() &&
!FilesInCompaction(expanded1)) {
Log(options_->info_log,
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
if (level + 2 < NumberLevels()) {
+ std::vector<FileMetaData*> grandparents;
current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
- &c->grandparents_);
+ &grandparents);
+ GroupFiles(&icmp_, grandparents, c->grandparents_);
}
if (false) {
grandparent_index_(0),
seen_key_(false),
overlapped_bytes_(0),
- base_index_(-1),
- parent_index_(-1),
score_(0) {
edit_ = new VersionEdit(number_levels_);
level_ptrs_ = new size_t[number_levels_];
if (input_version_ != NULL) {
input_version_->Unref();
}
+ for (FileGroup* g: grandparents_) {
+ delete g;
+ }
}
bool Compaction::IsTrivialMove() const {
icmp->Compare(internal_key,
grandparents_[grandparent_index_]->largest.Encode()) > 0) {
if (seen_key_) {
- overlapped_bytes_ += grandparents_[grandparent_index_]->file_size;
+ overlapped_bytes_ += grandparents_[grandparent_index_]->total_file_size;
}
assert(grandparent_index_ + 1 >= grandparents_.size() ||
icmp->Compare(grandparents_[grandparent_index_]->largest.Encode(),
const Slice* smallest_user_key,
const Slice* largest_user_key);
+// Group of files that overlap.
+class FileGroup {
+ public:
+ // smallest and largest store the smallest and largest key in any file in
+ // files.
+ InternalKey smallest;
+ InternalKey largest;
+
+ // Files are sorted by their smallest key.
+ // NOTE: FileGroup does not own the pointed-to FileMetaData objects.
+ std::vector<FileMetaData*> files;
+
+ // Total size of all files in files.
+ uint64_t total_file_size;
+
+ // Returns an iterator that iterates over all key-value pairs in this group.
+ // No duplicate suppression is done.
+ Iterator* NewIterator(TableCache* cache, const InternalKeyComparator* icmp,
+ const ReadOptions& options);
+
+ // Return the approximate offset in the group of the data for
+ // "ikey".
+ uint64_t ApproximateOffsetOf(const VersionSet* vset,
+ const InternalKey& ikey);
+};
+
+// Given a comparator and list of files, groups files into groups of
+// overlapping files. The groups are stored in file_groups and are
+// non-overlapping and are sorted by the given comparator. Additionally,
+// it is guaranteed that no group is empty and each group has the properties
+// guaranteed in FileGroup.
+// file_groups's contents are discarded by this function.
+void GroupFiles(
+ const InternalKeyComparator* internal_comparator,
+ const std::vector<FileMetaData*>& files,
+ std::vector<FileGroup*>& file_groups);
+
+// Return the smallest index i such that groups[i]->largest >= key.
+// Return groups.size() if there is no such group.
+// REQUIRES: "groups" contains a sorted list of non-overlapping groups.
+extern size_t FindGroup(const InternalKeyComparator& icmp,
+ const std::vector<FileGroup*>& groups,
+ const Slice& key);
+
+// Returns true iff some group in "groups" overlaps the user key range
+// [*smallest,*largest].
+// smallest==NULL represents a key smaller than all keys in the DB.
+// largest==NULL represents a key largest than all keys in the DB.
+// REQUIRES: "groups" contains a sorted list of non-overlapping groups.
+extern bool SomeGroupOverlapsRange(
+ const InternalKeyComparator& icmp,
+ const std::vector<FileGroup*>& groups,
+ const Slice* smallest_user_key,
+ const Slice* largest_user_key);
+
class Version {
public:
// Append to *iters a sequence of iterators that will
int level,
const InternalKey* begin, // NULL means before all keys
const InternalKey* end, // NULL means after all keys
- std::vector<FileMetaData*>* inputs,
- int hint_index = -1, // index of overlap file
- int* file_index = NULL); // return index of overlap file
-
- void GetOverlappingInputsBinarySearch(
- int level,
- const Slice& begin, // NULL means before all keys
- const Slice& end, // NULL means after all keys
- std::vector<FileMetaData*>* inputs,
- int hint_index, // index of overlap file
- int* file_index); // return index of overlap file
-
- void ExtendOverlappingInputs(
- int level,
- const Slice& begin, // NULL means before all keys
- const Slice& end, // NULL means after all keys
- std::vector<FileMetaData*>* inputs,
- int index); // start extending from this index
+ std::vector<FileMetaData*>* inputs);
// Returns true iff some file in the specified level overlaps
// some part of [*smallest_user_key,*largest_user_key].
private:
friend class Compaction;
+ friend class FileGroup;
friend class VersionSet;
- class LevelFileNumIterator;
- Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;
+ void RebuildGroups(int level);
VersionSet* vset_; // VersionSet to which this Version belongs
Version* next_; // Next version in linked list
int refs_; // Number of live refs to this version
// List of files per level, files in each level are arranged
- // in increasing order of keys
+ // in increasing order of their start keys. Files may overlap.
std::vector<FileMetaData*>* files_;
+ // List of file groups per level.
+ std::vector<FileGroup*>* file_groups_;
+
// A list for the same set of files that are stored in files_,
// but files in each level are now sorted based on file
// size. The file with the largest size is at the front.
struct ManifestWriter;
friend class Compaction;
+ friend class FileGroup;
friend class Version;
void Init(int num_levels);
// Returns true if any one of the parent files are being compacted
bool ParentRangeInCompaction(const InternalKey* smallest,
- const InternalKey* largest, int level, int* index);
+ const InternalKey* largest, int level);
// Returns true if any one of the specified files are being compacted
bool FilesInCompaction(std::vector<FileMetaData*>& files);
// State used to check for number of of overlapping grandparent files
// (parent == level_ + 1, grandparent == level_ + 2)
- std::vector<FileMetaData*> grandparents_;
+ std::vector<FileGroup*> grandparents_;
size_t grandparent_index_; // Index in grandparent_starts_
bool seen_key_; // Some output key has been seen
int64_t overlapped_bytes_; // Bytes of overlap between current output
// and grandparent files
- int base_index_; // index of the file in files_[level_]
- int parent_index_; // index of some file with same range in files_[level_+1]
double score_; // score that was used to pick this compaction.
// State for implementing IsBaseLevelForKey
ASSERT_TRUE(Overlaps("600", "700"));
}
+// Test fixture for GroupFiles / FindGroup / SomeGroupOverlapsRange.
+// Owns both the FileMetaData fixtures and the groups built from them.
+class FileGroupTest {
+ public:
+ std::vector<FileMetaData*> files_;
+ std::vector<FileGroup*> groups_;
+
+ FileGroupTest() { }
+
+ ~FileGroupTest() {
+ for (unsigned int i = 0; i < groups_.size(); i++) {
+ delete groups_[i];
+ }
+
+ for (unsigned int i = 0; i < files_.size(); i++) {
+ delete files_[i];
+ }
+ }
+
+ // Adds a file spanning [smallest, largest] to the fixture set.
+ void AddFile(const char* smallest, const char* largest,
+ SequenceNumber smallest_seq = 100,
+ SequenceNumber largest_seq = 100) {
+ FileMetaData* f = new FileMetaData;
+ f->number = files_.size() + 1;
+ f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
+ f->largest = InternalKey(largest, largest_seq, kTypeValue);
+ f->file_size = smallest_seq + (largest_seq - smallest_seq)/2;
+ files_.push_back(f);
+ }
+
+ // Recomputes groups_ from files_ and validates every documented
+ // FileGroup invariant.
+ void RebuildGroups() {
+ for (unsigned int i = 0; i < groups_.size(); i++) {
+ delete groups_[i];
+ }
+
+ InternalKeyComparator cmp(BytewiseComparator());
+ GroupFiles(&cmp, files_, groups_);
+
+ CheckGroupsValid();
+ }
+
+ // Returns the index FindGroup picks for (key, seq).
+ size_t Find(const char* key, SequenceNumber seq = 100) {
+ InternalKey target(key, seq, kTypeValue);
+ InternalKeyComparator cmp(BytewiseComparator());
+ return FindGroup(cmp, groups_, target.Encode());
+ }
+
+ // NULL bounds mean "unbounded" on that side.
+ bool Overlaps(const char* smallest, const char* largest) {
+ InternalKeyComparator cmp(BytewiseComparator());
+ Slice s(smallest != NULL ? smallest : "");
+ Slice l(largest != NULL ? largest : "");
+ return SomeGroupOverlapsRange(cmp, groups_,
+ (smallest != NULL ? &s : NULL),
+ (largest != NULL ? &l : NULL));
+ }
+
+ void CheckGroupsValid() {
+ InternalKeyComparator cmp(BytewiseComparator());
+
+ // Check group internal consistency.
+ for (size_t i = 0; i < groups_.size(); ++i) {
+ FileGroup& group = *groups_[i];
+
+ // Check all groups are non-empty
+ ASSERT_TRUE(!group.files.empty());
+
+ // Check files in all groups are sorted by smallest key.
+ // (The "j + 1 < size" form avoids unsigned wrap-around of size()-1.)
+ for (size_t j = 0; j + 1 < group.files.size(); ++j) {
+ ASSERT_TRUE(cmp.Compare(group.files[j]->smallest,
+ group.files[j+1]->smallest) < 0);
+ }
+
+ // Check smallest key in group is correct.
+ ASSERT_TRUE(cmp.Compare(group.smallest, group.files[0]->smallest) == 0);
+
+ // Check files in group actually overlap.
+ InternalKey largest_so_far = group.files[0]->largest;
+ for (size_t j = 0; j < group.files.size(); ++j) {
+ ASSERT_TRUE(cmp.Compare(largest_so_far,
+ group.files[j]->smallest) >= 0);
+
+ if (cmp.Compare(largest_so_far, group.files[j]->largest) < 0) {
+ largest_so_far = group.files[j]->largest;
+ }
+ }
+
+ // Check largest key in group is correct.
+ ASSERT_TRUE(cmp.Compare(group.largest, largest_so_far) == 0);
+
+ // Check total file size is valid.
+ uint64_t total_file_size = 0;
+ for (size_t j = 0; j < group.files.size(); ++j) {
+ total_file_size += group.files[j]->file_size;
+ }
+ ASSERT_EQ(total_file_size, group.total_file_size);
+ }
+
+ // Check groups are sorted and don't overlap.
+ // Guarded with "i + 1 < size": the original "i < size()-1" underflows
+ // to SIZE_MAX when groups_ is empty and reads out of bounds.
+ for (size_t i = 0; i + 1 < groups_.size(); ++i) {
+ ASSERT_TRUE(cmp.Compare(groups_[i]->largest,
+ groups_[i+1]->smallest) < 0);
+ }
+ }
+};
+
+TEST(FileGroupTest, Construction) {
+ AddFile("150", "600");
+ AddFile("250", "700");
+ AddFile("650", "800");
+ AddFile("801", "805");
+ RebuildGroups();
+
+ // Files 1-3 chain-overlap ([150,600],[250,700],[650,800]) into one
+ // group; "801" starts past that group's largest key "800" and forms a
+ // second group.
+ ASSERT_EQ(groups_.size(), 2u);
+}
+
+TEST(FileGroupTest, OverlappingGroups) {
+ AddFile("150", "600");
+ AddFile("250", "700");
+ AddFile("650", "800");
+ AddFile("851", "855");
+ RebuildGroups();
+
+ // Resulting groups cover [150,800] and [851,855]; (800,851) is a gap.
+ ASSERT_TRUE(Overlaps(NULL, NULL));
+ ASSERT_TRUE(!Overlaps(NULL, "149"));
+ ASSERT_TRUE(Overlaps(NULL, "150"));
+ ASSERT_TRUE(Overlaps("855", NULL));
+ ASSERT_TRUE(!Overlaps("801", "850"));
+ ASSERT_TRUE(Overlaps("601", "649"));
+}
+
+TEST(FileGroupTest, FindingGroups) {
+ AddFile("100", "200");
+ AddFile("300", "400");
+ AddFile("600", "800");
+ RebuildGroups();
+
+ ASSERT_EQ(groups_.size(), 3u);
+
+ // FindGroup returns the first group whose largest key is >= the
+ // target, or groups_.size() when the target is past every group.
+ ASSERT_EQ(Find("0"), 0u);
+ ASSERT_EQ(Find("200"), 0u);
+ ASSERT_EQ(Find("201"), 1u);
+ ASSERT_EQ(Find("400"), 1u);
+ ASSERT_EQ(Find("401"), 2u);
+ ASSERT_EQ(Find("800"), 2u);
+ ASSERT_EQ(Find("801"), 3u);
+}
+
+
} // namespace leveldb
int main(int argc, char** argv) {
// REQUIRES: handler has been added with AddHandler()
virtual void RemoveHandler(void* handler);
+ // Forcefully flushes all metrics currently held by the cache.
+ virtual void ForceFlushMetrics();
+
private:
void LRU_Remove(Handle* e);
void LRU_Append(Handle* e);
// If an error has occurred, return it. Else return an ok status.
virtual Status status() const = 0;
+ // If this Iterator makes use of some sub-Iterator to provide the current
+ // key-value pair, then this function returns a pointer to that
+ // Iterator-instance.
+ //
+ // Returns NULL if either Valid() is false or no sub-Iterator is providing
+ // the key-value pair.
+ virtual const Iterator* FindSubIterator() const { return NULL; };
+
// Clients are allowed to register function/arg1/arg2 triples that
// will be invoked when this iterator is destroyed.
//
// Default: NULL
const Snapshot* snapshot;
+ // If true accesses are recorded when the database is in hot-cold mode.
+ // Otherwise, they aren't.
+ // Default: true;
+ bool record_accesses;
+
ReadOptions()
: verify_checksums(false),
fill_cache(true),
snapshot(NULL),
+ record_accesses(true),
metrics_handler(NULL) {
}
ReadOptions(bool cksum, bool cache) :
verify_checksums(cksum), fill_cache(cache),
snapshot(NULL),
+ record_accesses(true),
metrics_handler(NULL) {
}
}
private:
- friend class Block;
-
void CorruptionError() {
current_ = restarts_;
restart_index_ = num_restarts_;
// This is the iterator returned by Block::NewMetricsIterator() on success.
class Block::MetricsIter : public Block::Iter {
private:
+ uint64_t file_number_;
+ uint64_t block_offset_;
BlockMetrics* metrics_;
public:
MetricsIter(const Comparator* comparator,
+ uint64_t file_number,
+ uint64_t block_offset,
const char* data,
uint32_t restarts,
uint32_t num_restarts,
BlockMetrics* metrics)
: Block::Iter(comparator, data, restarts, num_restarts),
+ file_number_(file_number),
+ block_offset_(block_offset),
metrics_(metrics) {
}
}
private:
+ friend class Block;
+
void RecordAccess() {
if (metrics_ != NULL && Valid()) {
metrics_->RecordAccess(restart_index_, restart_offset_);
}
}
+
+// Like NewIterator(cmp), but tags the returned iterator with the file number
+// and block offset it belongs to so Block::GetBlockIterInfo() can recover
+// them later. Passes NULL metrics, so accesses through this iterator are not
+// recorded.
+Iterator* Block::NewIterator(const Comparator* cmp,
+ uint64_t file_number,
+ uint64_t block_offset) {
+ // Sanity check: too small to even hold the restart-array length.
+ if (size_ < 2*sizeof(uint32_t)) {
+ return NewErrorIterator(Status::Corruption("bad block contents"));
+ }
+ const uint32_t num_restarts = NumRestarts();
+ if (num_restarts == 0) {
+ return NewEmptyIterator();
+ } else {
+ // NULL metrics: the iterator carries location info but records nothing.
+ return new MetricsIter(cmp, file_number, block_offset, data_,
+ restart_offset_, num_restarts, NULL);
+ }
+}
+
Iterator* Block::NewMetricsIterator(const Comparator* cmp,
uint64_t file_number,
uint64_t block_offset,
} else {
*metrics = new BlockMetrics(file_number, block_offset, num_restarts,
kBytesPerRestart);
- return new MetricsIter(cmp, data_, restart_offset_, num_restarts,
- *metrics);
+ return new MetricsIter(cmp, file_number, block_offset, data_,
+ restart_offset_, num_restarts, *metrics);
}
}
+// If iter is (dynamically) a Block::MetricsIter, copies its file/block
+// location and current restart position into the out-parameters and returns
+// true. Otherwise returns false and leaves the out-parameters untouched.
+// Relies on RTTI (dynamic_cast) to identify the iterator type.
+bool Block::GetBlockIterInfo(const Iterator* iter,
+ uint64_t& file_number,
+ uint64_t& block_offset,
+ uint32_t& restart_index,
+ uint32_t& restart_offset) {
+ const MetricsIter* biter = dynamic_cast<const MetricsIter*>(iter);
+
+ if (biter == NULL) {
+ return false;
+ }
+
+ file_number = biter->file_number_;
+ block_offset = biter->block_offset_;
+ restart_index = biter->restart_index_;
+ restart_offset = biter->restart_offset_;
+
+ return true;
+}
+
+
BlockMetrics::BlockMetrics(uint64_t file_number, uint64_t block_offset,
uint32_t num_restarts, uint32_t bytes_per_restart)
: file_number_(file_number),
memcpy(metrics_, data.data(), kBlockMetricsSize);
}
+// Builds the metrics-DB key for a block: the file number followed by the
+// block offset, each fixed64-encoded. Any previous contents of *db_key are
+// discarded.
+void BlockMetrics::CreateDBKey(uint64_t file_number, uint64_t block_offset,
+ std::string* db_key) {
+ assert(db_key != NULL);
+
+ db_key->clear();
+ PutFixed64(db_key, file_number);
+ PutFixed64(db_key, block_offset);
+}
+
BlockMetrics* BlockMetrics::Create(uint64_t file_number, uint64_t block_offset,
const std::string& db_value) {
Slice data(db_value);
bm->block_offset_ == block_offset_);
}
+// Returns true iff this metrics object describes the block identified by
+// the given file number and block offset.
+bool BlockMetrics::IsSameBlock(uint64_t file_number, uint64_t block_offset) const {
+ if (file_number_ != file_number) {
+ return false;
+ }
+ return block_offset_ == block_offset;
+}
+
void BlockMetrics::Join(const BlockMetrics* bm) {
assert(IsCompatible(bm));
size_t size() const { return size_; }
Iterator* NewIterator(const Comparator* comparator);
+ // Creates an iterator on the block that knows which file and block it
+ // belongs to.
+ Iterator* NewIterator(const Comparator* comparator,
+ uint64_t file_number,
+ uint64_t block_offset);
+
// Creates a new iterator that keeps track of accesses.
//
// Creates a BlockMetrics object on the heap and sets metrics to it.
uint64_t block_offset,
BlockMetrics** metrics);
+ // Returns true if iter is a Block iterator and also knows that which file
+ // and block it belongs to.
+ static bool GetBlockIterInfo(const Iterator* iter,
+ uint64_t& file_number,
+ uint64_t& block_offset,
+ uint32_t& restart_index,
+ uint32_t& restart_offset);
+
private:
uint32_t NumRestarts() const;
BlockMetrics(uint64_t file_number, uint64_t block_offset,
uint32_t num_restarts, uint32_t bytes_per_restart);
+ // Clears and puts the DB key for the file_number-block_offset-pair in
+ // *db_key.
+ static void CreateDBKey(uint64_t file_number, uint64_t block_offset,
+ std::string* db_key);
+
// Creates a BlockMetrics object from the DB key and value. Returns NULL if
// either/both are invalid.
static BlockMetrics* Create(const std::string& db_key,
// Returns true if bm represents metrics for the same block.
bool IsCompatible(const BlockMetrics* bm) const;
+ // Returns true if the given file_number and block_offset match this block's.
+ bool IsSameBlock(uint64_t file_number, uint64_t block_offset) const;
+
// Joins the metrics from the other metrics into this one.
// REQUIRES: this->IsCompatible(bm);
void Join(const BlockMetrics* bm);
return status;
}
+ // Exposes the child iterator currently supplying key()/value(), or NULL
+ // when this merging iterator is not positioned at a valid entry.
+ virtual const Iterator* FindSubIterator() const {
+ if (!Valid()) {
+ return NULL;
+ }
+ return current_->iter();
+ }
+
private:
void FindSmallest();
void FindLargest();
--- /dev/null
+// Copyright (c) 2013 Facebook.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "table/metrics_info.h"
+
+#include "leveldb/status.h"
+#include "table/block.h"
+
+namespace leveldb {
+
+// Returns true if the record iter currently points at is marked hot in
+// metrics_db. Walks iter's sub-iterator chain to find the Block iterator
+// actually supplying the record, loads (or reuses) that block's metrics from
+// metrics_db, and consults them. Records without usable metrics are treated
+// as cold.
+//
+// *block_metrics_store caches the most recently loaded BlockMetrics across
+// calls; the caller owns (and must eventually delete) whatever is left there.
+bool IsRecordHot(const Iterator* iter, DB* metrics_db,
+                 const ReadOptions& metrics_read_opts,
+                 BlockMetrics** block_metrics_store) {
+  assert(iter != NULL);
+  assert(metrics_db != NULL);
+  assert(block_metrics_store != NULL);
+
+  // Records whose block metrics are unknown or unreadable default to cold.
+  static const bool kDefaultHotness = false;
+
+  uint64_t file_number;
+  uint64_t block_offset;
+  uint32_t restart_index;
+  uint32_t restart_offset;
+
+  // Descend through the iterator tree and stop at the first iterator that
+  // knows its file/block/restart location. Stopping at the first hit avoids
+  // accidentally overwriting the info with that of a deeper iterator.
+  bool has_block_info = false;
+  for (; iter != NULL; iter = iter->FindSubIterator()) {
+    if (Block::GetBlockIterInfo(iter, file_number, block_offset, restart_index,
+                                restart_offset)) {
+      has_block_info = true;
+      break;
+    }
+  }
+
+  if (!has_block_info) {
+    return kDefaultHotness;
+  }
+
+  if ((*block_metrics_store) == NULL ||
+      !(*block_metrics_store)->IsSameBlock(file_number, block_offset)) {
+    // Cached metrics (if any) are for a different block: discard and reload.
+    // delete on NULL is a no-op, so no separate NULL check is needed.
+    delete *block_metrics_store;
+    *block_metrics_store = NULL;
+
+    std::string db_key;
+    BlockMetrics::CreateDBKey(file_number, block_offset, &db_key);
+
+    std::string db_value;
+    Status s = metrics_db->Get(metrics_read_opts, db_key, &db_value);
+    if (!s.ok()) {
+      // No stored metrics for this block (or a read error).
+      return kDefaultHotness;
+    }
+
+    // Create() may still return NULL if the stored value is invalid.
+    *block_metrics_store = BlockMetrics::Create(file_number, block_offset,
+                                                db_value);
+  }
+
+  if ((*block_metrics_store) != NULL) {
+    return (*block_metrics_store)->IsHot(restart_index, restart_offset);
+  }
+
+  return kDefaultHotness;
+}
+
+} // namespace leveldb
--- /dev/null
+// Copyright (c) 2013 Facebook.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef STORAGE_LEVELDB_TABLE_BLOCK_METRICS_INFO_H_
+#define STORAGE_LEVELDB_TABLE_BLOCK_METRICS_INFO_H_
+
+#include "leveldb/db.h"
+#include "leveldb/iterator.h"
+#include "leveldb/options.h"
+#include <stdint.h>
+
+namespace leveldb {
+
+class BlockMetrics;
+
+// Returns true if the record pointed to by iter is hot according to the
+// value stored in metrics_db.
+//
+// *block_metrics_store temporarily stores a BlockMetrics instance. At the
+// end, when the application no longer plans to call IsRecordHot(), if
+// *block_metrics_store is not NULL then this value must be deleted.
+//
+// REQUIRES: neither iter, metrics_db nor block_metrics_store is NULL.
+bool IsRecordHot(const Iterator* iter, DB* metrics_db,
+ const ReadOptions& metrics_read_opts,
+ BlockMetrics** block_metrics_store);
+
+} // namespace leveldb
+
+#endif // STORAGE_LEVELDB_TABLE_BLOCK_METRICS_INFO_H_
Iterator* iter;
if (block != NULL) {
if (options.metrics_handler == NULL || cache_handle == NULL) {
- iter = block->NewIterator(table->rep_->options.comparator);
+ iter = block->NewIterator(table->rep_->options.comparator,
+ table->rep_->file_number,
+ handle.offset());
if (cache_handle == NULL) {
iter->RegisterCleanup(&DeleteBlock, block, NULL);
}
}
+ // Exposes the leaf data-block iterator currently supplying key()/value(),
+ // or NULL when this iterator is not positioned at a valid entry.
+ virtual const Iterator* FindSubIterator() const {
+ if (!Valid()) {
+ return NULL;
+ }
+ return data_iter_.iter();
+ }
+
private:
void SaveError(const Status& s) {
if (status_.ok() && !s.ok()) status_ = s;
}
void Cache::RemoveHandler(void* handler) {
}
+// Default implementation: a Cache that collects no metrics has nothing to
+// flush, so this is a no-op. Metrics-collecting caches (see LRUCache)
+// override this.
+void Cache::ForceFlushMetrics() {
+}
namespace {
void* handler,
void (*handler_func)(void*, std::vector<BlockMetrics*>*));
void RemoveHandler(void* handler);
+ void ForceFlushMetrics();
private:
void LRU_Remove(LRUHandle* e);
metrics_store_.erase(handler);
}
+// Immediately invokes every registered handler with the metrics collected so
+// far, then resets each handler's collection to a fresh empty vector.
+//
+// NOTE(review): the old vector is handed to the handler and not deleted
+// here, so the handler is presumed to take ownership -- confirm against the
+// handler contract used by the regular flush path.
+void LRUCache::ForceFlushMetrics() {
+  std::map<void*, std::vector<BlockMetrics*>*>::iterator it;
+  for (it = metrics_store_.begin();
+       it != metrics_store_.end(); ++it) {
+    void* handler = it->first;
+    // Use the iterator's mapped value directly rather than re-looking the
+    // handler up in metrics_store_ on every access.
+    (*handlers_[handler])(handler, it->second);
+    it->second = new std::vector<BlockMetrics*>();
+  }
+}
static int kNumShardBits = 4; // default values, can be overridden
shard_[i].RemoveHandler(handler);
}
}
+
+ // Forwards the forced metrics flush to every shard.
+ void ForceFlushMetrics() {
+ for (size_t i = 0; i < numShards_; ++i) {
+ shard_[i].ForceFlushMetrics();
+ }
+ }
};
} // end anonymous namespace