bg_cv_(&mutex_),
mem_(new MemTable(internal_comparator_, NumberLevels())),
logfile_number_(0),
+ tmp_metrics_store_(new std::vector<BlockMetrics*>()),
tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(0),
bg_logstats_scheduled_(false),
host_name_ = "localhost";
}
last_log_ts = 0;
-
- // Set up metrics taking
- if (is_hotcold_) {
- assert(options_.block_cache != NULL);
-
- options_.block_cache->AddHandler(this, &DBImpl::HandleMetrics);
- }
}
DBImpl::~DBImpl() {
if (flush_on_destroy_) {
FlushMemTable(FlushOptions());
}
+ mutex_.Lock();
+ assert(tmp_metrics_store_ != nullptr);
if (is_hotcold_) {
- options_.block_cache->RemoveHandler(this);
+ for (BlockMetrics* bm: *tmp_metrics_store_) {
+ delete bm;
+ }
+ tmp_metrics_store_->clear();
}
- mutex_.Lock();
+ assert(tmp_metrics_store_->empty());
+ delete tmp_metrics_store_;
shutting_down_.Release_Store(this); // Any non-NULL value is ok
while (bg_compaction_scheduled_ || bg_logstats_scheduled_ ||
bg_flushing_metrics_scheduled_) {
assert(is_hotcold_);
assert(options_.block_cache != NULL);
- options_.block_cache->ForceFlushMetrics();
+ mutex_.Lock();
+ FlushTempMetrics();
+ mutex_.Unlock();
TEST_WaitForMetricsFlush();
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
-namespace {
-// Helper function that takes a vector of BlockMetrics and joins all the
-// metrics together that share the same key.
-//
-// Assumes it has exclusive access to metrics.
-void CompactMetrics(std::vector<BlockMetrics*>& metrics) {
- std::map<std::string, BlockMetrics*> unique;
-
- for (size_t i = 0; i < metrics.size(); ++i) {
- assert(metrics[i] != NULL);
- const std::string& key = metrics[i]->GetDBKey();
-
- if (unique.count(key) == 0) {
- unique[key] = metrics[i];
- } else {
- assert(unique[key]->IsCompatible(metrics[i]));
- unique[key]->Join(metrics[i]);
- delete metrics[i];
- }
- }
-
- metrics.clear();
-
- for (std::map<std::string, BlockMetrics*>::iterator it = unique.begin();
- it != unique.end(); ++it) {
- metrics.push_back(it->second);
- }
-}
-}
-void DBImpl::FlushMetrics(void* db) {
- DBImpl* dbimpl = reinterpret_cast<DBImpl*>(db);
-
- while (true) {
- std::vector<std::vector<BlockMetrics*>*> unflushed_metrics;
-
- // Get unflushed metrics.
- {
- MutexLock l(&dbimpl->mutex_);
-
- if (dbimpl->unflushed_metrics_.empty()) {
- dbimpl->bg_flushing_metrics_scheduled_ = false;
- dbimpl->bg_cv_.SignalAll();
- return;
- }
-
- unflushed_metrics = dbimpl->unflushed_metrics_;
- dbimpl->unflushed_metrics_.clear();
- }
-
- // Join all the metrics together in a single vector.
- std::vector<BlockMetrics*> metrics;
- for (size_t i = 0; i < unflushed_metrics.size(); ++i) {
- metrics.insert(metrics.end(), unflushed_metrics[i]->begin(),
- unflushed_metrics[i]->end());
- delete unflushed_metrics[i];
- }
-
- CompactMetrics(metrics);
-
- // Flush metrics to database.
- DB* metrics_db = dbimpl->metrics_db_;
- for (size_t i = 0; i < metrics.size(); ++i) {
- assert(metrics[i] != NULL);
- const std::string& key = metrics[i]->GetDBKey();
- // TODO: fix this code by some means so that we don't lose metrics
- // already in the database. Using a read-update-write cycle is far
- // too slow so we temporarily replaced it with an overwrite, with
- // the old code left commented out to show the ideal logic.
- metrics_db->Put(WriteOptions(), key, metrics[i]->GetDBValue());
- /*
- BlockMetrics* db_metrics = NULL;
-
- std::string db_value;
- Status s = metrics_db->Get(ReadOptions(), key, &db_value);
-
- if (s.ok()) {
- db_metrics = BlockMetrics::Create(key, db_value);
- }
-
- // We check if metrics[i] and db_metrics are compatible because
- // the metrics in the metrics db might have gotten corrupted.
- if (db_metrics != NULL && metrics[i]->IsCompatible(db_metrics)) {
- db_metrics->Join(metrics[i]);
- metrics_db->Put(WriteOptions(), key, db_metrics->GetDBValue());
- } else {
- metrics_db->Put(WriteOptions(), key, metrics[i]->GetDBValue());
- // TODO: Log an error here.
- }
- delete db_metrics;
- */
- }
-
- for (size_t i = 0; i < metrics.size(); ++i) {
- delete metrics[i];
- }
- }
-}
-
-void DBImpl::HandleMetrics(void* db, std::vector<BlockMetrics*>* metrics) {
- DBImpl* dbimpl = reinterpret_cast<DBImpl*>(db);
-
- // If we are shutting down or we are not a hot-cold db discard metrics.
- //
- // is_hotcold_ is constant after construction and shutting_down_ is an atomic
- // so we don't need to acquire mutex_ .
- if (!dbimpl->is_hotcold_ || dbimpl->shutting_down_.Acquire_Load()) {
- for (size_t i = 0; i < metrics->size(); ++i) {
- delete (*metrics)[i];
- }
- delete metrics;
-
- return;
- }
-
- MutexLock l(&dbimpl->mutex_);
- dbimpl->unflushed_metrics_.push_back(metrics);
-
- if (!dbimpl->bg_flushing_metrics_scheduled_) {
- dbimpl->bg_flushing_metrics_scheduled_ = true;
- dbimpl->env_->Schedule(&DBImpl::FlushMetrics, dbimpl);
- }
-}
-
-bool DBImpl::TEST_IsHot(const Iterator* iter) {
- assert(is_hotcold_);
- assert(iter != NULL);
-
- BlockMetrics* bm = NULL;
- ReadOptions metrics_read_options;
- bool result = IsRecordHot(iter, metrics_db_, metrics_read_options, &bm);
- delete bm;
-
- return result;
-}
-
-bool DBImpl::TEST_IsHotCold() {
- return is_hotcold_;
-}
-
//
// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
return versions_->MaxNextLevelOverlappingBytes();
}
+
+// Number of metrics that are stored before they get flushed to the
+// metrics db.
+size_t kMetricsFlushThreshold = 10000u;
+
+namespace {
+// Helper function that takes a vector of BlockMetrics and joins all the
+// metrics together that share the same key.
+//
+// Assumes it has exclusive access to metrics.
+// On return, `metrics` holds exactly one BlockMetrics* per distinct DB key;
+// ownership of the surviving objects transfers back to the caller.
+void CompactMetrics(std::vector<BlockMetrics*>& metrics) {
+ std::map<std::string, BlockMetrics*> unique;
+
+ for (size_t i = 0; i < metrics.size(); ++i) {
+ assert(metrics[i] != NULL);
+ const std::string& key = metrics[i]->GetDBKey();
+
+ if (unique.count(key) == 0) {
+ // First occurrence of this key becomes the canonical object.
+ unique[key] = metrics[i];
+ } else {
+ // Duplicate key: merge into the canonical object and free the duplicate.
+ assert(unique[key]->IsCompatible(metrics[i]));
+ unique[key]->Join(metrics[i]);
+ delete metrics[i];
+ }
+ }
+
+ metrics.clear();
+
+ // Repopulate the (now empty) input vector with the de-duplicated metrics.
+ for (std::map<std::string, BlockMetrics*>::iterator it = unique.begin();
+ it != unique.end(); ++it) {
+ metrics.push_back(it->second);
+ }
+}
+}
+// Background task (scheduled via Env::Schedule) that drains
+// unflushed_metrics_ and persists the joined metrics to metrics_db_.
+// Loops until the queue is observed empty under mutex_, then clears
+// bg_flushing_metrics_scheduled_ and signals waiters before returning.
+// Only the queue hand-off happens under mutex_; the Put() calls to the
+// metrics db run without the lock held.
+void DBImpl::FlushMetrics(void* db) {
+ DBImpl* dbimpl = reinterpret_cast<DBImpl*>(db);
+
+ while (true) {
+ std::vector<std::vector<BlockMetrics*>*> unflushed_metrics;
+
+ // Get unflushed metrics.
+ {
+ MutexLock l(&dbimpl->mutex_);
+
+ if (dbimpl->unflushed_metrics_.empty()) {
+ // Nothing left: mark the background task as no longer scheduled and
+ // wake anyone waiting (e.g. TEST_WaitForMetricsFlush / destructor).
+ dbimpl->bg_flushing_metrics_scheduled_ = false;
+ dbimpl->bg_cv_.SignalAll();
+ return;
+ }
+
+ // Take ownership of the queued vectors so the lock can be dropped.
+ unflushed_metrics = dbimpl->unflushed_metrics_;
+ dbimpl->unflushed_metrics_.clear();
+ }
+
+ // Join all the metrics together in a single vector.
+ std::vector<BlockMetrics*> metrics;
+ for (size_t i = 0; i < unflushed_metrics.size(); ++i) {
+ metrics.insert(metrics.end(), unflushed_metrics[i]->begin(),
+ unflushed_metrics[i]->end());
+ delete unflushed_metrics[i];
+ }
+
+ CompactMetrics(metrics);
+
+ // Flush metrics to database.
+ DB* metrics_db = dbimpl->metrics_db_;
+ for (size_t i = 0; i < metrics.size(); ++i) {
+ assert(metrics[i] != NULL);
+ const std::string& key = metrics[i]->GetDBKey();
+ // TODO: fix this code by some means so that we don't lose metrics
+ // already in the database. Using a read-update-write cycle is far
+ // too slow so we temporarily replaced it with an overwrite, with
+ // the old code left commented out to show the ideal logic.
+ metrics_db->Put(WriteOptions(), key, metrics[i]->GetDBValue());
+ /*
+ BlockMetrics* db_metrics = NULL;
+
+ std::string db_value;
+ Status s = metrics_db->Get(ReadOptions(), key, &db_value);
+
+ if (s.ok()) {
+ db_metrics = BlockMetrics::Create(key, db_value);
+ }
+
+ // We check if metrics[i] and db_metrics are compatible because
+ // the metrics in the metrics db might have gotten corrupted.
+ if (db_metrics != NULL && metrics[i]->IsCompatible(db_metrics)) {
+ db_metrics->Join(metrics[i]);
+ metrics_db->Put(WriteOptions(), key, db_metrics->GetDBValue());
+ } else {
+ metrics_db->Put(WriteOptions(), key, metrics[i]->GetDBValue());
+ // TODO: Log an error here.
+ }
+ delete db_metrics;
+ */
+ }
+
+ // All metrics are owned by this function at this point; free them.
+ for (size_t i = 0; i < metrics.size(); ++i) {
+ delete metrics[i];
+ }
+ }
+}
+
+// Hands the accumulated tmp_metrics_store_ over to the background flusher:
+// the full vector is queued on unflushed_metrics_, a fresh empty store is
+// allocated in its place, and FlushMetrics is scheduled unless an instance
+// is already pending.
+//
+// REQUIRES: mutex_ is held.
+// REQUIRES: this is a hot-cold db (is_hotcold_).
+void DBImpl::FlushTempMetrics() {
+ mutex_.AssertHeld();
+ assert(is_hotcold_);
+
+ // Ownership of the current store moves to the queue; replace it so callers
+ // can keep appending without blocking on the flush.
+ unflushed_metrics_.push_back(tmp_metrics_store_);
+ tmp_metrics_store_ = new std::vector<BlockMetrics*>();
+
+ // At most one background flusher runs per DBImpl instance.
+ if (!bg_flushing_metrics_scheduled_) {
+ bg_flushing_metrics_scheduled_ = true;
+ env_->Schedule(&DBImpl::FlushMetrics, this);
+ }
+}
+
+// Test helper: reports whether the record currently pointed at by iter is
+// classified as hot according to metrics_db_. Any BlockMetrics object
+// allocated by IsRecordHot as a side effect is freed before returning.
+//
+// REQUIRES: this is a hot-cold db; iter is non-NULL.
+bool DBImpl::TEST_IsHot(const Iterator* iter) {
+ assert(is_hotcold_);
+ assert(iter != NULL);
+
+ BlockMetrics* bm = NULL;
+ ReadOptions metrics_read_options;
+ bool result = IsRecordHot(iter, metrics_db_, metrics_read_options, &bm);
+ delete bm;
+
+ return result;
+}
+
+// Test helper: returns whether this db was opened with hot-cold separation.
+// is_hotcold_ is constant after construction, so no locking is needed.
+bool DBImpl::TEST_IsHotCold() {
+ return is_hotcold_;
+}
+
Status DBImpl::Get(const ReadOptions& options,
const Slice& key,
std::string* value) {
bool have_stat_update = false;
Version::GetStats stats;
+ BlockMetrics* bm = nullptr;
// Unlock while reading from files and memtables
{
mutex_.Unlock();
} else {
ReadOptions read_options = options;
if (read_options.record_accesses && is_hotcold_) {
- read_options.metrics_handler = this;
+ read_options.metrics_instance = &bm;
}
// If the database is hot-cold then we know we can use short circuiting
// as the compaction logic guarantees that this will be valid.
}
mutex_.Lock();
}
+ if (bm != nullptr) {
+ assert(is_hotcold_);
+ assert(tmp_metrics_store_ != nullptr);
+ tmp_metrics_store_->push_back(bm);
+
+ if (tmp_metrics_store_->size() >= kMetricsFlushThreshold) {
+ FlushTempMetrics();
+ }
+ }
if (!options_.disable_seek_compaction &&
have_stat_update && current->UpdateStats(stats)) {
DB* metrics_db = NULL;
if (with_hotcold) {
- if (options.no_block_cache) {
- return Status::InvalidArgument("Cannot have HotCold seperation when "
- "no_block_cache is true.");
- }
-
// Creates directory for db in case it might not create as we don't create
// intermediate directories.
options.env->CreateDir(dbname);
// This gets run in the background thread to handle flushing metrics we
// receive from the cache to metrics_db_. Only up to a single instance of this
// thread will run at a time per DBImpl instance.
- static void FlushMetrics(void*);
- // This is the callback function that gets passed to Cache to handle metrics.
- static void HandleMetrics(void* db, std::vector<BlockMetrics*>* metrics);
+ static void FlushMetrics(void* db);
+ // Flushes tmp_metrics_store_ to disk.
+ void FlushTempMetrics();
// Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'allfiles'.
unique_ptr<log::Writer> log_;
// Metrics that have been received from the cache, but have not yet been
// flushed to metrics_db_.
+ std::vector<BlockMetrics*>* tmp_metrics_store_;
std::vector<std::vector<BlockMetrics*>*> unflushed_metrics_;
std::string host_name_;
ReadOptions read_options = options;
if (level < vset_->options_->min_hotcold_level) {
- read_options.metrics_handler = nullptr;
+ read_options.metrics_instance = nullptr;
}
for (uint32_t i = 0; i < files.size(); ++i) {
#include <memory>
#include <stdint.h>
-#include <vector>
#include "leveldb/slice.h"
namespace leveldb {
extern shared_ptr<Cache> NewLRUCache(size_t capacity);
extern shared_ptr<Cache> NewLRUCache(size_t capacity, int numShardBits);
-class BlockMetrics;
-
class Cache {
public:
Cache() { }
// returns the maximum configured capacity of the cache
virtual size_t GetCapacity() = 0;
-
- // Releases the handle and passes the metrics to the metrics handler
- // that was added with the AddHandler() method.
- // If handler was not registered then metrics is deleted.
- virtual void ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler,
- BlockMetrics* metrics);
-
- // Adds a handler for a std::vector of metrics. Whenever enough metrics are
- // acquired by ReleaseAndRecordMetrics() handler_func is called. The first
- // parameter for the callback function is the handler itself.
- // Note that handler_func can get called on any thread and hence must be
- // thread-safe.
- //
- // REQUIRES: handler != NULL
- // REQUIRES: handler_func != NULL
- // REQUIRES: hander hasn't been added before. (handler can get re-added after
- // it was removed by RemoveHandler())
- virtual void AddHandler(
- void* handler,
- void (*handler_func)(void*, std::vector<BlockMetrics*>*));
-
- // Removes a handler. This call flushes all the cached metrics before
- // unregistering the handler.
- //
- // REQUIRES: handler has been added with AddHandler()
- virtual void RemoveHandler(void* handler);
-
- // Forcefully flushes all metrics currently held by the cache.
- virtual void ForceFlushMetrics();
-
private:
void LRU_Remove(Handle* e);
void LRU_Append(Handle* e);
size_t manifest_preallocation_size;
};
+class BlockMetrics;
// Options that control read operations
struct ReadOptions {
// If true, all data read from underlying storage will be
fill_cache(true),
snapshot(NULL),
record_accesses(true),
- metrics_handler(NULL) {
+ metrics_instance(nullptr) {
}
ReadOptions(bool cksum, bool cache) :
verify_checksums(cksum), fill_cache(cache),
snapshot(NULL),
record_accesses(true),
- metrics_handler(NULL) {
+ metrics_instance(nullptr) {
}
// Internal parameters
- void* metrics_handler;
+ BlockMetrics** metrics_instance;
};
// Options that control write operations
// This is the iterator returned by Block::NewMetricsIterator() on success.
class Block::MetricsIter : public Block::Iter {
private:
- Cache* cache_;
- Cache::Handle* cache_handle_;
- void* metrics_handler_;
- BlockMetrics* metrics_;
+ BlockMetrics** metrics_instance_;
const InternalKeyComparator* icmp_;
public:
const char* data,
uint32_t restarts,
uint32_t num_restarts,
- Cache* cache,
- Cache::Handle* cache_handle,
- void* metrics_handler)
+ BlockMetrics** metrics_instance)
: Block::Iter(comparator, file_number, block_offset, data, restarts,
num_restarts),
- cache_(cache),
- cache_handle_(cache_handle),
- metrics_handler_(metrics_handler),
- metrics_(NULL),
+ metrics_instance_(metrics_instance),
icmp_(dynamic_cast<const InternalKeyComparator*>(comparator_)) {
}
virtual ~MetricsIter() {
- if (metrics_) {
- cache_->ReleaseAndRecordMetrics(cache_handle_, metrics_handler_,
- metrics_);
- } else {
- cache_->Release(cache_handle_);
- }
}
virtual void Seek(const Slice& target) {
private:
void RecordAccess() {
assert(Valid());
- if (metrics_ == nullptr) {
- metrics_ = new BlockMetrics(file_number_, block_offset_,
- num_restarts_, kBytesPerRestart);
+ if (*metrics_instance_ == nullptr) {
+ *metrics_instance_ = new BlockMetrics(file_number_, block_offset_,
+ num_restarts_, kBytesPerRestart);
}
- metrics_->RecordAccess(restart_index_, restart_offset_);
+ (*metrics_instance_)->RecordAccess(restart_index_, restart_offset_);
}
};
Iterator* Block::NewMetricsIterator(const Comparator* cmp,
uint64_t file_number,
uint64_t block_offset,
- Cache* cache,
- Cache::Handle* cache_handle,
- void* metrics_handler) {
- assert(cache != nullptr);
- assert(cache_handle != nullptr);
- assert(metrics_handler != nullptr);
-
- Iterator* iter = nullptr;
+ BlockMetrics** metrics_instance) {
+ assert(metrics_instance != nullptr);
+
if (size_ < 2*sizeof(uint32_t)) {
- iter = NewErrorIterator(Status::Corruption("bad block contents"));
+ return NewErrorIterator(Status::Corruption("bad block contents"));
}
const uint32_t num_restarts = NumRestarts();
if (num_restarts == 0) {
- iter = NewEmptyIterator();
- }
-
- if (iter == nullptr) {
+ return NewEmptyIterator();
+ } else {
- // MetricsIter takes ownership of the cache handle and will delete it.
+ // MetricsIter records accesses into *metrics_instance; the cache
+ // handle is managed separately by the caller.
- iter = new MetricsIter(cmp, file_number, block_offset, data_,
+ return new MetricsIter(cmp, file_number, block_offset, data_,
restart_offset_, num_restarts,
- cache, cache_handle, metrics_handler);
- } else {
- // Release the cache handle as neither the error iterator nor the empty
- // iterator need it's contents.
- cache->Release(cache_handle);
+ metrics_instance);
}
-
- return iter;
}
bool Block::GetBlockIterInfo(const Iterator* iter,
uint64_t file_number,
uint64_t block_offset);
- // Creates a new iterator that keeps track of accesses. When this iterator is
- // deleted it frees the cache handle and passes the metrics to the cache
- // specified.
- // REQUIRES: cache, cache_handle, metrics_handler must be non-NULL
+ // Creates a new iterator that keeps track of accesses.
+ // REQUIRES: metrics_instance must be non-NULL
Iterator* NewMetricsIterator(const Comparator* comparator,
uint64_t file_number,
uint64_t block_offset,
- Cache* cache,
- Cache::Handle* cache_handle,
- void* metrics_handler);
+ BlockMetrics** metrics_instance);
// Returns true if iter is a Block iterator and also knows that which file
// and block it belongs to.
delete iter;
}
-class MockCache : public Cache {
- public:
- bool generated_metrics;
- bool was_released;
-
- public:
- MockCache() {
- Reset();
- }
-
- // Stub implementations so class compiles
- virtual Cache::Handle* Insert(const Slice& key, void* value, size_t charge,
- void (*deleter)(const Slice& key, void* value)) {
- return nullptr;
- }
- virtual Cache::Handle* Lookup(const Slice& key) { return nullptr; }
- virtual void* Value(Cache::Handle* handle) { return nullptr; }
- virtual void Erase(const Slice& key) {}
- virtual uint64_t NewId() { return 4;}
- virtual size_t GetCapacity() { return 0; }
-
- virtual void Release(Cache::Handle* handle) {
- generated_metrics = false;
- was_released = true;
- }
-
- virtual void ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler,
- BlockMetrics* metrics) {
- generated_metrics = metrics != nullptr;
- delete metrics;
- was_released = true;
- }
-
- void Reset() {
- generated_metrics = false;
- was_released = false;
- }
-};
-
TEST(BlockTest, MetricsIter) {
Random rnd(301);
Options options = Options();
contents.heap_allocated = false;
Block reader(contents);
- MockCache c;
- Cache::Handle* ch = reinterpret_cast<Cache::Handle*>(&c);
+ BlockMetrics* bm = nullptr;;
- c.Reset();
Iterator* iter = reader.NewMetricsIterator(options.comparator, 0, 0,
- &c, ch, &c);
+ &bm);
delete iter;
- ASSERT_TRUE(c.was_released);
- ASSERT_TRUE(!c.generated_metrics) << "needlessly generated metrics.\n";
+ ASSERT_TRUE(bm == nullptr) << "needlessly generated metrics.\n";
for (int i = 1; i < num_records; i += 2) {
- c.Reset();
- iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+ iter = reader.NewMetricsIterator(options.comparator, 0, 0, &bm);
iter->Seek(keys[i]);
delete iter;
- ASSERT_TRUE(c.was_released);
- ASSERT_TRUE(!c.generated_metrics) << "generated metrics for unfound row.\n";
+ ASSERT_TRUE(bm == nullptr) << "generated metrics for unfound row.\n";
}
- c.Reset();
- iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+ iter = reader.NewMetricsIterator(options.comparator, 0, 0, &bm);
iter->SeekToFirst();
iter->Next();
iter->SeekToLast();
iter->Prev();
delete iter;
- ASSERT_TRUE(c.was_released);
- ASSERT_TRUE(!c.generated_metrics) << "generated metrics for non-Seek().\n";
+ ASSERT_TRUE(bm == nullptr) << "generated metrics for non-Seek().\n";
for (int i = 0; i < num_records; i += 2) {
- c.Reset();
- iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+ iter = reader.NewMetricsIterator(options.comparator, 0, 0, &bm);
iter->Seek(keys[i]);
delete iter;
- ASSERT_TRUE(c.was_released);
- ASSERT_TRUE(c.generated_metrics) << "didn't generate metrics for found row\n";
+ ASSERT_TRUE(bm != nullptr) << "didn't generate metrics for found row\n";
+ delete bm; bm = nullptr;
}
- c.Reset();
- iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+ iter = reader.NewMetricsIterator(options.comparator, 0, 0, &bm);
iter->Seek(InternalKey(" 2", 140, kTypeDeletion).Encode());
delete iter;
- ASSERT_TRUE(c.was_released);
- ASSERT_TRUE(c.generated_metrics) << "didn't generate metrics for Seek().\n";
+ ASSERT_TRUE(bm != nullptr) << "didn't generate metrics for Seek().\n";
+ delete bm; bm = nullptr;
}
class BlockMetricsTest {
Iterator* iter;
if (block != NULL) {
- if (options.metrics_handler == NULL || cache_handle == NULL) {
+ if (options.metrics_instance == NULL || cache_handle == NULL) {
iter = block->NewIterator(table->rep_->options.comparator,
table->rep_->file_number,
handle.offset());
- if (cache_handle == NULL) {
- iter->RegisterCleanup(&DeleteBlock, block, NULL);
- } else {
- iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
- }
} else {
iter = block->NewMetricsIterator(table->rep_->options.comparator,
table->rep_->file_number,
handle.offset(),
- block_cache,
- cache_handle,
- options.metrics_handler);
+ options.metrics_instance);
+ }
+
+ if (cache_handle == NULL) {
+ iter->RegisterCleanup(&DeleteBlock, block, NULL);
+ } else {
+ iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
}
} else {
iter = NewErrorIterator(s);
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <assert.h>
-#include <map>
#include <stdio.h>
#include <stdlib.h>
-#include <vector>
#include "leveldb/cache.h"
#include "port/port.h"
-#include "table/block.h"
#include "util/hash.h"
#include "util/mutexlock.h"
Cache::~Cache() {
}
-// Stub methods to not break backwards compatibility.
-void Cache::ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler,
- BlockMetrics* metrics) {
- delete metrics;
-}
-void Cache::AddHandler(
- void* handler,
- void (*handler_func)(void*, std::vector<BlockMetrics*>*)) {
-}
-void Cache::RemoveHandler(void* handler) {
-}
-void Cache::ForceFlushMetrics() {
-}
-
-
namespace {
// LRU cache implementation
}
};
-// Number of metrics that are stored per shard before they get flushed to the
-// handler_func.
-size_t kMetricsFlushThreshold = 10000u;
-
// A single shard of sharded cache.
class LRUCache {
public:
LRUCache();
- virtual ~LRUCache();
+ ~LRUCache();
// Separate from constructor so caller can easily make an array of LRUCache
void SetCapacity(size_t capacity) { capacity_ = capacity; }
void Release(Cache::Handle* handle);
void Erase(const Slice& key, uint32_t hash);
-
- void ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler,
- BlockMetrics* metrics);
- void AddHandler(
- void* handler,
- void (*handler_func)(void*, std::vector<BlockMetrics*>*));
- void RemoveHandler(void* handler);
- void ForceFlushMetrics();
-
private:
void LRU_Remove(LRUHandle* e);
void LRU_Append(LRUHandle* e);
LRUHandle lru_;
HandleTable table_;
-
- // Stores handler_funcs for the handlers.
- std::map<void*, void (*)(void*, std::vector<BlockMetrics*>*)> handlers_;
- // Stores the cache of metrics for specific handlers.
- std::map<void*, std::vector<BlockMetrics*>*> metrics_store_;
};
LRUCache::LRUCache()
}
LRUCache::~LRUCache() {
- // Flush cached metrics to the handlers.
- std::map<void*, std::vector<BlockMetrics*>*>::iterator it;
- for (it = metrics_store_.begin();
- it != metrics_store_.end(); ++it) {
- void* handler = it->first;
- (*handlers_[handler])(handler, metrics_store_[handler]);
- }
-
for (LRUHandle* e = lru_.next; e != &lru_; ) {
LRUHandle* next = e->next;
assert(e->refs == 1); // Error if caller has an unreleased handle
}
}
-void LRUCache::ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler,
- BlockMetrics* metrics) {
- MutexLock l(&mutex_);
-
- // Stores the metrics in the cache for the handler if handler has been added;
- // otherwise, we just delete it.
- if (metrics_store_.count(handler) != 0) {
- metrics_store_[handler]->push_back(metrics);
- if (metrics_store_[handler]->size() >= kMetricsFlushThreshold) {
- (*handlers_[handler])(handler, metrics_store_[handler]);
- metrics_store_[handler] = new std::vector<BlockMetrics*>();
- }
- } else {
- delete metrics;
- }
-
- Unref(reinterpret_cast<LRUHandle*>(handle));
-}
-
-void LRUCache::AddHandler(
- void* handler,
- void (*handler_func)(void*, std::vector<BlockMetrics*>*)) {
- assert(handler != NULL);
- assert(handler_func != NULL);
- assert(handlers_.count(handler) == 0);
- assert(metrics_store_.count(handler) == 0);
-
- MutexLock l(&mutex_);
-
- handlers_[handler] = handler_func;
- metrics_store_[handler] = new std::vector<BlockMetrics*>();
-}
-
-void LRUCache::RemoveHandler(void* handler) {
- assert(handlers_.count(handler) != 0);
- assert(metrics_store_.count(handler) != 0);
- MutexLock l(&mutex_);
-
- (*handlers_[handler])(handler, metrics_store_[handler]);
- handlers_.erase(handler);
- metrics_store_.erase(handler);
-}
-
-void LRUCache::ForceFlushMetrics() {
- std::map<void*, std::vector<BlockMetrics*>*>::iterator it;
- for (it = metrics_store_.begin();
- it != metrics_store_.end(); ++it) {
- void* handler = it->first;
- (*handlers_[handler])(handler, metrics_store_[handler]);
- metrics_store_[handler] = new std::vector<BlockMetrics*>();
- }
-}
-
static int kNumShardBits = 4; // default values, can be overridden
class ShardedLRUCache : public Cache {
port::Mutex id_mutex_;
uint64_t last_id_;
int numShardBits;
- size_t numShards_;
size_t capacity_;
static inline uint32_t HashSlice(const Slice& s) {
void init(size_t capacity, int numbits) {
numShardBits = numbits;
capacity_ = capacity;
- numShards_ = 1u << numShardBits;
- shard_ = new LRUCache[numShards_];
- const size_t per_shard = (capacity + (numShards_ - 1)) / numShards_;
- for (size_t s = 0; s < numShards_; s++) {
+ int numShards = 1 << numShardBits;
+ shard_ = new LRUCache[numShards];
+ const size_t per_shard = (capacity + (numShards - 1)) / numShards;
+ for (int s = 0; s < numShards; s++) {
shard_[s].SetCapacity(per_shard);
}
}
virtual uint64_t GetCapacity() {
return capacity_;
}
-
- void ReleaseAndRecordMetrics(Handle* handle, void* handler,
- BlockMetrics* metrics) {
- LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
-
- shard_[Shard(h->hash)].ReleaseAndRecordMetrics(handle, handler, metrics);
- }
-
- void AddHandler(void* handler,
- void (*handler_func)(void*, std::vector<BlockMetrics*>*)) {
- for (size_t i = 0; i < numShards_; ++i) {
- shard_[i].AddHandler(handler, handler_func);
- }
- }
- void RemoveHandler(void* handler) {
- for (size_t i = 0; i < numShards_; ++i) {
- shard_[i].RemoveHandler(handler);
- }
- }
-
- void ForceFlushMetrics() {
- for (size_t i = 0; i < numShards_; ++i) {
- shard_[i].ForceFlushMetrics();
- }
- }
};
} // end anonymous namespace