method = &Benchmark::ReadWhileWriting;
} else if (name == Slice("readrandomwriterandom")) {
method = &Benchmark::ReadRandomWriteRandom;
+ } else if (name == Slice("readhotwriterandom")) {
+ method = &Benchmark::ReadHotWriteRandom;
} else if (name == Slice("compact")) {
method = &Benchmark::Compact;
} else if (name == Slice("crc32c")) {
thread->stats.AddMessage(msg);
}
+ void ReadHotWriteRandom(ThreadState* thread) {
+ ReadOptions options(FLAGS_verify_checksum, true);
+ RandomGenerator gen;
+ std::string value;
+ long found = 0;
+ int get_weight = 0;
+ int put_weight = 0;
+ long reads_done = 0;
+ long writes_done = 0;
+ // the number of iterations is the larger of read_ or write_
+ for (long i = 0; i < readwrites_; i++) {
+ char key[100];
+ if (get_weight == 0 && put_weight == 0) {
+ // one batch complated, reinitialize for next batch
+ get_weight = FLAGS_readwritepercent;
+ put_weight = 100 - get_weight;
+ }
+ int k = thread->rand.Next() % FLAGS_num;
+ if (get_weight > 0) {
+ k = k/100*100;
+ }
+ snprintf(key, sizeof(key), "%016d", k);
+ if (get_weight > 0) {
+ // do all the gets first
+ if (db_->Get(options, key, &value).ok()) {
+ found++;
+ }
+ get_weight--;
+ reads_done++;
+ } else if (put_weight > 0) {
+ // then do all the corresponding number of puts
+ // for all the gets we have done earlier
+ Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
+ if (!s.ok()) {
+ fprintf(stderr, "put error: %s\n", s.ToString().c_str());
+ exit(1);
+ }
+ put_weight--;
+ writes_done++;
+ }
+ thread->stats.FinishedSingleOp(db_);
+ }
+ char msg[100];
+ snprintf(msg, sizeof(msg), "( reads:%ld writes:%ld total:%ld )",
+ reads_done, writes_done, readwrites_);
+ thread->stats.AddMessage(msg);
+ }
+
// Manually compacts the entire key range of the database.
void Compact(ThreadState* thread) {
  db_->CompactRange(NULL, NULL);
}
return "leveldb.InternalKeyComparator";
}
-int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
+int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey,
+ bool* user_key_eq) const {
+ assert(user_key_eq != nullptr);
// Order by:
// increasing user key (according to user-supplied comparator)
// decreasing sequence number
// decreasing type (though sequence# should be enough to disambiguate)
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0) {
+ *user_key_eq = true;
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
if (anum > bnum) {
} else if (anum < bnum) {
r = +1;
}
+ } else {
+ *user_key_eq = false;
}
return r;
}
+int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
+ bool ignored;
+ return Compare(akey, bkey, &ignored);
+}
+
void InternalKeyComparator::FindShortestSeparator(
std::string* start,
const Slice& limit) const {
const Comparator* user_comparator() const { return user_comparator_; }
+ // Compares "a" and "b" and sets "user_key_eq" to true if they have an user
+ // key that compares equal under the user comparator and false otherwise.
+ //
+ // We need to expose this information in terms of an extra flag, because ties
+ // where both values have the same user key are broken with sequence number
+ // and record type.
+ int Compare(const Slice& a, const Slice& b, bool* user_key_eq) const;
int Compare(const InternalKey& a, const InternalKey& b) const;
};
#include <vector>
#include <algorithm>
+#include "db/dbformat.h"
#include "leveldb/db.h"
#include "leveldb/comparator.h"
#include "table/format.h"
value_ = Slice(data_ + offset, 0);
}
// Returns the first restart index possibly containing "target".
// Returns "num_restarts_" if this search encounters an error key.
//
// NOTE(review): assumes num_restarts_ >= 1 -- if it were 0, the unsigned
// "right" below would underflow.  Confirm every block carries at least
// one restart point.
uint32_t FindFirstRestartPointContaining(const Slice& target) {
  // Binary search in restart array to find the first restart point
  // with a key >= target
  uint32_t left = 0;
  uint32_t right = num_restarts_ - 1;
  while (left < right) {
    // Round the midpoint up so the loop makes progress when
    // left == right - 1 (we assign left = mid below).
    uint32_t mid = (left + right + 1) / 2;
    uint32_t region_offset = GetRestartPoint(mid);
    uint32_t shared, non_shared, value_length;
    const char* key_ptr = DecodeEntry(data_ + region_offset,
                                      data_ + restarts_,
                                      &shared, &non_shared, &value_length);
    // A restart-point entry must not share a prefix with a prior key.
    if (key_ptr == NULL || (shared != 0)) {
      CorruptionError();
      return num_restarts_;
    }
    Slice mid_key(key_ptr, non_shared);
    if (Compare(mid_key, target) < 0) {
      // Key at "mid" is smaller than "target". Therefore all
      // blocks before "mid" are uninteresting.
      left = mid;
    } else {
      // Key at "mid" is >= "target". Therefore all blocks at or
      // after "mid" are uninteresting.
      right = mid - 1;
    }
  }

  return left;
}
+
public:
Iter(const Comparator* comparator,
uint64_t file_number,
}
virtual void Seek(const Slice& target) {
- // Binary search in restart array to find the first restart point
- // with a key >= target
- uint32_t left = 0;
- uint32_t right = num_restarts_ - 1;
- while (left < right) {
- uint32_t mid = (left + right + 1) / 2;
- uint32_t region_offset = GetRestartPoint(mid);
- uint32_t shared, non_shared, value_length;
- const char* key_ptr = DecodeEntry(data_ + region_offset,
- data_ + restarts_,
- &shared, &non_shared, &value_length);
- if (key_ptr == NULL || (shared != 0)) {
- CorruptionError();
- return;
- }
- Slice mid_key(key_ptr, non_shared);
- if (Compare(mid_key, target) < 0) {
- // Key at "mid" is smaller than "target". Therefore all
- // blocks before "mid" are uninteresting.
- left = mid;
- } else {
- // Key at "mid" is >= "target". Therefore all blocks at or
- // after "mid" are uninteresting.
- right = mid - 1;
- }
- }
-
// Linear search (within restart block) for first key >= target
- SeekToRestartPoint(left);
+ SeekToRestartPoint(FindFirstRestartPointContaining(target));
while (true) {
if (!ParseNextKey()) {
return;
}
}
- private:
+ protected:
friend class Block;
void CorruptionError() {
Cache::Handle* cache_handle_;
void* metrics_handler_;
BlockMetrics* metrics_;
+ const InternalKeyComparator* icmp_;
public:
MetricsIter(const Comparator* comparator,
cache_(cache),
cache_handle_(cache_handle),
metrics_handler_(metrics_handler),
- metrics_(NULL) {
+ metrics_(NULL),
+ icmp_(dynamic_cast<const InternalKeyComparator*>(comparator_)) {
}
// Empty: cleanup (handle release, metrics hand-off) is presumably done by
// the base class destructor -- TODO confirm against Block::Iter.
virtual ~MetricsIter() {
}
virtual void Seek(const Slice& target) {
- Block::Iter::Seek(target);
- RecordAccess();
+ if (icmp_ == nullptr) {
+ Block::Iter::Seek(target);
+ return;
+ }
+
+ SeekToRestartPoint(FindFirstRestartPointContaining(target));
+ bool user_key_eq = false;
+ while (true) {
+ if (!ParseNextKey()) {
+ return;
+ }
+ if (icmp_->Compare(key_, target, &user_key_eq) >= 0) {
+ break;
+ }
+ }
+
+ if (user_key_eq && Valid()) {
+ RecordAccess();
+ }
}
private:
// Records an access to the current entry's restart region, lazily
// allocating the BlockMetrics object on first use.
// REQUIRES: Valid()
void RecordAccess() {
  assert(Valid());
  if (metrics_ == nullptr) {
    metrics_ = new BlockMetrics(file_number_, block_offset_,
                                num_restarts_, kBytesPerRestart);
  }
  metrics_->RecordAccess(restart_index_, restart_offset_);
}
};
uint64_t block_offset);
// Creates a new iterator that keeps track of accesses. When this iterator is
- // deleted it frees the cache handle and passes the metrics to the cache specified.
+ // deleted it frees the cache handle and passes the metrics to the cache
+ // specified.
// REQUIRES: cache, cache_handle, metrics_handler must be non-NULL
Iterator* NewMetricsIterator(const Comparator* comparator,
uint64_t file_number,
delete iter;
}
+class MockCache : public Cache {
+ public:
+ bool generated_metrics;
+ bool was_released;
+
+ public:
+ MockCache() {
+ Reset();
+ }
+
+ // Stub implementations so class compiles
+ virtual Cache::Handle* Insert(const Slice& key, void* value, size_t charge,
+ void (*deleter)(const Slice& key, void* value)) {
+ return nullptr;
+ }
+ virtual Cache::Handle* Lookup(const Slice& key) { return nullptr; }
+ virtual void* Value(Cache::Handle* handle) { return nullptr; }
+ virtual void Erase(const Slice& key) {}
+ virtual uint64_t NewId() { return 4;}
+ virtual size_t GetCapacity() { return 0; }
+
+ virtual void Release(Cache::Handle* handle) {
+ generated_metrics = false;
+ was_released = true;
+ }
+
+ virtual void ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler,
+ BlockMetrics* metrics) {
+ generated_metrics = metrics != nullptr;
+ delete metrics;
+ was_released = true;
+ }
+
+ void Reset() {
+ generated_metrics = false;
+ was_released = false;
+ }
+};
+
+TEST(BlockTest, MetricsIter) {
+ Random rnd(301);
+ Options options = Options();
+ options.comparator = new InternalKeyComparator(options.comparator);
+ std::vector<std::string> keys;
+ BlockBuilder builder(&options);
+ int num_records = 1000;
+ char buf[20];
+ char* p = &buf[0];
+
+ // add a bunch of records to a block
+ for (int i = 0; i < num_records; i++) {
+ // generate random kvs
+ sprintf(p, "%6d", i);
+ std::string k(p);
+ std::string v = RandomString(&rnd, 100); // 100 byte values
+
+ // write kvs to the block
+ InternalKey ik(k, 100, kTypeValue);
+ Slice key = ik.Encode();
+ Slice value(v);
+ if (i % 2 == 0) { // only add even keys
+ builder.Add(key, value);
+ }
+
+ // remember kvs in a lookaside array
+ keys.push_back(key.ToString());
+ }
+
+ // read serialized contents of the block
+ Slice rawblock = builder.Finish();
+
+ // create block reader
+ BlockContents contents;
+ contents.data = rawblock;
+ contents.cachable = false;
+ contents.heap_allocated = false;
+ Block reader(contents);
+
+ MockCache c;
+ Cache::Handle* ch = reinterpret_cast<Cache::Handle*>(&c);
+
+ c.Reset();
+ Iterator* iter = reader.NewMetricsIterator(options.comparator, 0, 0,
+ &c, ch, &c);
+ delete iter;
+ ASSERT_TRUE(c.was_released);
+ ASSERT_TRUE(!c.generated_metrics) << "needlessly generated metrics.\n";
+
+ for (int i = 1; i < num_records; i += 2) {
+ c.Reset();
+ iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+ iter->Seek(keys[i]);
+ delete iter;
+ ASSERT_TRUE(c.was_released);
+ ASSERT_TRUE(!c.generated_metrics) << "generated metrics for unfound row.\n";
+ }
+
+ c.Reset();
+ iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+ iter->SeekToFirst();
+ iter->Next();
+ iter->SeekToLast();
+ iter->Prev();
+ delete iter;
+ ASSERT_TRUE(c.was_released);
+ ASSERT_TRUE(!c.generated_metrics) << "generated metrics for non-Seek().\n";
+
+ for (int i = 0; i < num_records; i += 2) {
+ c.Reset();
+ iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+ iter->Seek(keys[i]);
+ delete iter;
+ ASSERT_TRUE(c.was_released);
+ ASSERT_TRUE(c.generated_metrics) << "didn't generate metrics for found row\n";
+ }
+
+ c.Reset();
+ iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+ iter->Seek(InternalKey(" 2", 140, kTypeDeletion).Encode());
+ delete iter;
+ ASSERT_TRUE(c.was_released);
+ ASSERT_TRUE(c.generated_metrics) << "didn't generate metrics for Seek().\n";
+}
+
class BlockMetricsTest {
private:
public: