From: Kosie van der Merwe Date: Fri, 15 Feb 2013 20:10:00 +0000 (-0800) Subject: Removed Cache from metrics flow for Block::MetricsIter to DBImpl X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2c0dde198c5804ca48f27d6c512c57dee45f06a0;p=rocksdb.git Removed Cache from metrics flow for Block::MetricsIter to DBImpl Summary: Before this patch we had to send metrics to `Cache`, which then sent the metrics to `DBImpl`. Now we directly send them to `DBImpl`, by the use of a pointer to a pointer of a `BlockMetrics` instance. We can do this safely and without additional locks because: * `Block::MetricsIter` runs in the same thread as `DBImpl::Get()` (in which the `BlockMetrics` pointer resides) * `DBImpl::Get()` locks `DBImpl` again after the `Block::MetricsIter` is destroyed, so we don't have to do an additional lock. Test Plan: make check Reviewers: vamsi, dhruba Reviewed By: vamsi CC: leveldb Differential Revision: https://reviews.facebook.net/D8583 --- diff --git a/db/db_impl.cc b/db/db_impl.cc index 86b2c30d..857c2a28 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -187,6 +187,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname, bg_cv_(&mutex_), mem_(new MemTable(internal_comparator_, NumberLevels())), logfile_number_(0), + tmp_metrics_store_(new std::vector()), tmp_batch_(new WriteBatch), bg_compaction_scheduled_(0), bg_logstats_scheduled_(false), @@ -229,13 +230,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname, host_name_ = "localhost"; } last_log_ts = 0; - - // Set up metrics taking - if (is_hotcold_) { - assert(options_.block_cache != NULL); - - options_.block_cache->AddHandler(this, &DBImpl::HandleMetrics); - } } DBImpl::~DBImpl() { @@ -243,10 +237,16 @@ DBImpl::~DBImpl() { if (flush_on_destroy_) { FlushMemTable(FlushOptions()); } + mutex_.Lock(); + assert(tmp_metrics_store_ != nullptr); if (is_hotcold_) { - options_.block_cache->RemoveHandler(this); + for (BlockMetrics* bm: *tmp_metrics_store_) { + delete bm; + } + tmp_metrics_store_->clear(); } - mutex_.Lock(); + assert(tmp_metrics_store_->empty()); + delete tmp_metrics_store_; shutting_down_.Release_Store(this); // Any non-NULL value is ok while (bg_compaction_scheduled_ || bg_logstats_scheduled_ || bg_flushing_metrics_scheduled_) { @@ -1156,7 +1156,9 @@ void DBImpl::TEST_ForceFlushMetrics() { assert(is_hotcold_); assert(options_.block_cache != NULL); - options_.block_cache->ForceFlushMetrics(); + mutex_.Lock(); + FlushTempMetrics(); + mutex_.Unlock(); TEST_WaitForMetricsFlush(); } @@ -1632,145 +1634,6 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } -namespace { -// Helper function that takes a vector of BlockMetrics and joins all the -// metrics together that share the same key. -// -// Assumes it has exclusive access to metrics. -void CompactMetrics(std::vector& metrics) { - std::map unique; - - for (size_t i = 0; i < metrics.size(); ++i) { - assert(metrics[i] != NULL); - const std::string& key = metrics[i]->GetDBKey(); - - if (unique.count(key) == 0) { - unique[key] = metrics[i]; - } else { - assert(unique[key]->IsCompatible(metrics[i])); - unique[key]->Join(metrics[i]); - delete metrics[i]; - } - } - - metrics.clear(); - - for (std::map::iterator it = unique.begin(); - it != unique.end(); ++it) { - metrics.push_back(it->second); - } -} -} -void DBImpl::FlushMetrics(void* db) { - DBImpl* dbimpl = reinterpret_cast(db); - - while (true) { - std::vector*> unflushed_metrics; - - // Get unflushed metrics. - { - MutexLock l(&dbimpl->mutex_); - - if (dbimpl->unflushed_metrics_.empty()) { - dbimpl->bg_flushing_metrics_scheduled_ = false; - dbimpl->bg_cv_.SignalAll(); - return; - } - - unflushed_metrics = dbimpl->unflushed_metrics_; - dbimpl->unflushed_metrics_.clear(); - } - - // Join all the metrics together in a single vector. - std::vector metrics; - for (size_t i = 0; i < unflushed_metrics.size(); ++i) { - metrics.insert(metrics.end(), unflushed_metrics[i]->begin(), - unflushed_metrics[i]->end()); - delete unflushed_metrics[i]; - } - - CompactMetrics(metrics); - - // Flush metrics to database. - DB* metrics_db = dbimpl->metrics_db_; - for (size_t i = 0; i < metrics.size(); ++i) { - assert(metrics[i] != NULL); - const std::string& key = metrics[i]->GetDBKey(); - // TODO: fix this code by some means so that we don't lose metrics - // already in the database. Using a read-update-write cycle is far - // too slow so we temporarily replaced it with an overwrite, with - // the old code left commented out to show the ideal logic. - metrics_db->Put(WriteOptions(), key, metrics[i]->GetDBValue()); - /* - BlockMetrics* db_metrics = NULL; - - std::string db_value; - Status s = metrics_db->Get(ReadOptions(), key, &db_value); - - if (s.ok()) { - db_metrics = BlockMetrics::Create(key, db_value); - } - - // We check if metrics[i] and db_metrics are compatible because - // the metrics in the metrics db might have gotten corrupted. - if (db_metrics != NULL && metrics[i]->IsCompatible(db_metrics)) { - db_metrics->Join(metrics[i]); - metrics_db->Put(WriteOptions(), key, db_metrics->GetDBValue()); - } else { - metrics_db->Put(WriteOptions(), key, metrics[i]->GetDBValue()); - // TODO: Log an error here. - } - delete db_metrics; - */ - } - - for (size_t i = 0; i < metrics.size(); ++i) { - delete metrics[i]; - } - } -} - -void DBImpl::HandleMetrics(void* db, std::vector* metrics) { - DBImpl* dbimpl = reinterpret_cast(db); - - // If we are shutting down or we are not a hot-cold db discard metrics. - // - // is_hotcold_ is constant after construction and shutting_down_ is an atomic - // so we don't need to acquire mutex_ . - if (!dbimpl->is_hotcold_ || dbimpl->shutting_down_.Acquire_Load()) { - for (size_t i = 0; i < metrics->size(); ++i) { - delete (*metrics)[i]; - } - delete metrics; - - return; - } - - MutexLock l(&dbimpl->mutex_); - dbimpl->unflushed_metrics_.push_back(metrics); - - if (!dbimpl->bg_flushing_metrics_scheduled_) { - dbimpl->bg_flushing_metrics_scheduled_ = true; - dbimpl->env_->Schedule(&DBImpl::FlushMetrics, dbimpl); - } -} - -bool DBImpl::TEST_IsHot(const Iterator* iter) { - assert(is_hotcold_); - assert(iter != NULL); - - BlockMetrics* bm = NULL; - ReadOptions metrics_read_options; - bool result = IsRecordHot(iter, metrics_db_, metrics_read_options, &bm); - delete bm; - - return result; -} - -bool DBImpl::TEST_IsHotCold() { - return is_hotcold_; -} - // // Given a sequence number, return the sequence number of the // earliest snapshot that this sequence number is visible in. @@ -2145,6 +2008,138 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { return versions_->MaxNextLevelOverlappingBytes(); } + +// Number of metrics that are stored they get flushed to the +// metrics db. +size_t kMetricsFlushThreshold = 10000u; + +namespace { +// Helper function that takes a vector of BlockMetrics and joins all the +// metrics together that share the same key. +// +// Assumes it has exclusive access to metrics. +void CompactMetrics(std::vector& metrics) { + std::map unique; + + for (size_t i = 0; i < metrics.size(); ++i) { + assert(metrics[i] != NULL); + const std::string& key = metrics[i]->GetDBKey(); + + if (unique.count(key) == 0) { + unique[key] = metrics[i]; + } else { + assert(unique[key]->IsCompatible(metrics[i])); + unique[key]->Join(metrics[i]); + delete metrics[i]; + } + } + + metrics.clear(); + + for (std::map::iterator it = unique.begin(); + it != unique.end(); ++it) { + metrics.push_back(it->second); + } +} +} +void DBImpl::FlushMetrics(void* db) { + DBImpl* dbimpl = reinterpret_cast(db); + + while (true) { + std::vector*> unflushed_metrics; + + // Get unflushed metrics. + { + MutexLock l(&dbimpl->mutex_); + + if (dbimpl->unflushed_metrics_.empty()) { + dbimpl->bg_flushing_metrics_scheduled_ = false; + dbimpl->bg_cv_.SignalAll(); + return; + } + + unflushed_metrics = dbimpl->unflushed_metrics_; + dbimpl->unflushed_metrics_.clear(); + } + + // Join all the metrics together in a single vector. + std::vector metrics; + for (size_t i = 0; i < unflushed_metrics.size(); ++i) { + metrics.insert(metrics.end(), unflushed_metrics[i]->begin(), + unflushed_metrics[i]->end()); + delete unflushed_metrics[i]; + } + + CompactMetrics(metrics); + + // Flush metrics to database. + DB* metrics_db = dbimpl->metrics_db_; + for (size_t i = 0; i < metrics.size(); ++i) { + assert(metrics[i] != NULL); + const std::string& key = metrics[i]->GetDBKey(); + // TODO: fix this code by some means so that we don't lose metrics + // already in the database. Using a read-update-write cycle is far + // too slow so we temporarily replaced it with an overwrite, with + // the old code left commented out to show the ideal logic. + metrics_db->Put(WriteOptions(), key, metrics[i]->GetDBValue()); + /* + BlockMetrics* db_metrics = NULL; + + std::string db_value; + Status s = metrics_db->Get(ReadOptions(), key, &db_value); + + if (s.ok()) { + db_metrics = BlockMetrics::Create(key, db_value); + } + + // We check if metrics[i] and db_metrics are compatible because + // the metrics in the metrics db might have gotten corrupted. + if (db_metrics != NULL && metrics[i]->IsCompatible(db_metrics)) { + db_metrics->Join(metrics[i]); + metrics_db->Put(WriteOptions(), key, db_metrics->GetDBValue()); + } else { + metrics_db->Put(WriteOptions(), key, metrics[i]->GetDBValue()); + // TODO: Log an error here. + } + delete db_metrics; + */ + } + + for (size_t i = 0; i < metrics.size(); ++i) { + delete metrics[i]; + } + } +} + +void DBImpl::FlushTempMetrics() { + mutex_.AssertHeld(); + assert(is_hotcold_); + + unflushed_metrics_.push_back(tmp_metrics_store_); + tmp_metrics_store_ = new std::vector(); + + if (!bg_flushing_metrics_scheduled_) { + bg_flushing_metrics_scheduled_ = true; + env_->Schedule(&DBImpl::FlushMetrics, this); + } +} + +bool DBImpl::TEST_IsHot(const Iterator* iter) { + assert(is_hotcold_); + assert(iter != NULL); + + BlockMetrics* bm = NULL; + ReadOptions metrics_read_options; + bool result = IsRecordHot(iter, metrics_db_, metrics_read_options, &bm); + delete bm; + + return result; +} + +bool DBImpl::TEST_IsHotCold() { + return is_hotcold_; +} + Status DBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) { @@ -2167,6 +2162,7 @@ Status DBImpl::Get(const ReadOptions& options, bool have_stat_update = false; Version::GetStats stats; + BlockMetrics* bm = nullptr; // Unlock while reading from files and memtables { mutex_.Unlock(); @@ -2179,7 +2175,7 @@ Status DBImpl::Get(const ReadOptions& options, } else { ReadOptions read_options = options; if (read_options.record_accesses && is_hotcold_) { - read_options.metrics_handler = this; + read_options.metrics_instance = &bm; } // If the database is hot-cold then we know we can use short circuiting // as the compaction logic guarantees that this will be valid. @@ -2188,6 +2184,15 @@ Status DBImpl::Get(const ReadOptions& options, } mutex_.Lock(); } + if (bm != nullptr) { + assert(is_hotcold_); + assert(tmp_metrics_store_ != nullptr); + tmp_metrics_store_->push_back(bm); + + if (tmp_metrics_store_->size() >= kMetricsFlushThreshold) { + FlushTempMetrics(); + } + } if (!options_.disable_seek_compaction && have_stat_update && current->UpdateStats(stats)) { @@ -2619,11 +2624,6 @@ Status DB::InternalOpen(const Options& options, const std::string& dbname, DB* metrics_db = NULL; if (with_hotcold) { - if (options.no_block_cache) { - return Status::InvalidArgument("Cannot have HotCold seperation when " - "no_block_cache is true."); - } - // Creates directory for db in case it might not create as we don't create // intermediate directories. options.env->CreateDir(dbname); diff --git a/db/db_impl.h b/db/db_impl.h index 65140682..c91898af 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -192,10 +192,10 @@ class DBImpl : public DB { // This gets run in the background thread to handle flushing metrics we // receive from the cache to metrics_db_. Only up to a single instance of this // thread will run at a time per DBImpl instance. - static void FlushMetrics(void*); - // This is the callback function that gets passed to Cache to handle metrics. - static void HandleMetrics(void* db, std::vector* metrics); + static void FlushMetrics(void* db); + // Flushes tmp_metrics_store_ to disk. + void FlushTempMetrics(); // Returns the list of live files in 'live' and the list // of all files in the filesystem in 'allfiles'. @@ -250,6 +250,7 @@ class DBImpl : public DB { unique_ptr log_; // Metrics that have been received from the cache, but have not yet been // flushed to metrics_db_. + std::vector* tmp_metrics_store_; std::vector*> unflushed_metrics_; std::string host_name_; diff --git a/db/version_set.cc b/db/version_set.cc index d57adb39..dd13494e 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -547,7 +547,7 @@ Status Version::Get(const ReadOptions& options, ReadOptions read_options = options; if (level < vset_->options_->min_hotcold_level) { - read_options.metrics_handler = nullptr; + read_options.metrics_instance = nullptr; } for (uint32_t i = 0; i < files.size(); ++i) { diff --git a/include/leveldb/cache.h b/include/leveldb/cache.h index 34333d96..c729adcb 100644 --- a/include/leveldb/cache.h +++ b/include/leveldb/cache.h @@ -20,7 +20,6 @@ #include #include -#include #include "leveldb/slice.h" namespace leveldb { @@ -34,8 +33,6 @@ class Cache; extern shared_ptr NewLRUCache(size_t capacity); extern shared_ptr NewLRUCache(size_t capacity, int numShardBits); -class BlockMetrics; - class Cache { public: Cache() { } @@ -91,36 +88,6 @@ class Cache { // returns the maximum configured capacity of the cache virtual size_t GetCapacity() = 0; - - // Releases the handle and passes the metrics to the metrics handler - // that was added with the AddHandler() method. - // If handler was not registered then metrics is deleted. - virtual void ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler, - BlockMetrics* metrics); - - // Adds a handler for a std::vector of metrics. Whenever enough metrics are - // acquired by ReleaseAndRecordMetrics() handler_func is called. The first - // parameter for the callback function is the handler itself. - // Note that handler_func can get called on any thread and hence must be - // thread-safe. - // - // REQUIRES: handler != NULL - // REQUIRES: handler_func != NULL - // REQUIRES: hander hasn't been added before. (handler can get re-added after - // it was removed by RemoveHandler()) - virtual void AddHandler( - void* handler, - void (*handler_func)(void*, std::vector*)); - - // Removes a handler. This call flushes all the cached metrics before - // unregistering the handler. - // - // REQUIRES: handler has been added with AddHandler() - virtual void RemoveHandler(void* handler); - - // Forcefully flushes all metrics currently held by the cache. - virtual void ForceFlushMetrics(); - private: void LRU_Remove(Handle* e); void LRU_Append(Handle* e); diff --git a/include/leveldb/options.h b/include/leveldb/options.h index c119a976..95ca4056 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -380,6 +380,7 @@ struct Options { size_t manifest_preallocation_size; }; +class BlockMetrics; // Options that control read operations struct ReadOptions { // If true, all data read from underlying storage will be @@ -409,17 +410,17 @@ struct ReadOptions { fill_cache(true), snapshot(NULL), record_accesses(true), - metrics_handler(NULL) { + metrics_instance(nullptr) { } ReadOptions(bool cksum, bool cache) : verify_checksums(cksum), fill_cache(cache), snapshot(NULL), record_accesses(true), - metrics_handler(NULL) { + metrics_instance(nullptr) { } // Internal parameters - void* metrics_handler; + BlockMetrics** metrics_instance; }; // Options that control write operations diff --git a/table/block.cc b/table/block.cc index 387517e4..e2ed4970 100644 --- a/table/block.cc +++ b/table/block.cc @@ -289,10 +289,7 @@ class Block::Iter : public Iterator { // This is the iterator returned by Block::NewMetricsIterator() on success. class Block::MetricsIter : public Block::Iter { private: - Cache* cache_; - Cache::Handle* cache_handle_; - void* metrics_handler_; - BlockMetrics* metrics_; + BlockMetrics** metrics_instance_; const InternalKeyComparator* icmp_; public: @@ -302,25 +299,14 @@ class Block::MetricsIter : public Block::Iter { const char* data, uint32_t restarts, uint32_t num_restarts, - Cache* cache, - Cache::Handle* cache_handle, - void* metrics_handler) + BlockMetrics** metrics_instance) : Block::Iter(comparator, file_number, block_offset, data, restarts, num_restarts), - cache_(cache), - cache_handle_(cache_handle), - metrics_handler_(metrics_handler), - metrics_(NULL), + metrics_instance_(metrics_instance), icmp_(dynamic_cast(comparator_)) { } virtual ~MetricsIter() { - if (metrics_) { - cache_->ReleaseAndRecordMetrics(cache_handle_, metrics_handler_, - metrics_); - } else { - cache_->Release(cache_handle_); - } } virtual void Seek(const Slice& target) { @@ -348,11 +334,11 @@ class Block::MetricsIter : public Block::Iter { private: void RecordAccess() { assert(Valid()); - if (metrics_ == nullptr) { - metrics_ = new BlockMetrics(file_number_, block_offset_, - num_restarts_, kBytesPerRestart); + if (*metrics_instance_ == nullptr) { + *metrics_instance_ = new BlockMetrics(file_number_, block_offset_, + num_restarts_, kBytesPerRestart); } - metrics_->RecordAccess(restart_index_, restart_offset_); + (*metrics_instance_)->RecordAccess(restart_index_, restart_offset_); } }; @@ -387,34 +373,21 @@ Iterator* Block::NewIterator(const Comparator* cmp, Iterator* Block::NewMetricsIterator(const Comparator* cmp, uint64_t file_number, uint64_t block_offset, - Cache* cache, - Cache::Handle* cache_handle, - void* metrics_handler) { - assert(cache != nullptr); - assert(cache_handle != nullptr); - assert(metrics_handler != nullptr); - - Iterator* iter = nullptr; + BlockMetrics** metrics_instance) { + assert(metrics_instance != nullptr); + if (size_ < 2*sizeof(uint32_t)) { - iter = NewErrorIterator(Status::Corruption("bad block contents")); + return NewErrorIterator(Status::Corruption("bad block contents")); } const uint32_t num_restarts = NumRestarts(); if (num_restarts == 0) { - iter = NewEmptyIterator(); - } - - if (iter == nullptr) { + return NewEmptyIterator(); + } else { // MetricsIter takes ownership of the cache handle and will delete it. - iter = new MetricsIter(cmp, file_number, block_offset, data_, + return new MetricsIter(cmp, file_number, block_offset, data_, restart_offset_, num_restarts, - cache, cache_handle, metrics_handler); - } else { - // Release the cache handle as neither the error iterator nor the empty - // iterator need it's contents. - cache->Release(cache_handle); + metrics_instance); } - - return iter; } bool Block::GetBlockIterInfo(const Iterator* iter, diff --git a/table/block.h b/table/block.h index 81a1c937..6cb250cd 100644 --- a/table/block.h +++ b/table/block.h @@ -36,16 +36,12 @@ class Block { uint64_t file_number, uint64_t block_offset); - // Creates a new iterator that keeps track of accesses. When this iterator is - // deleted it frees the cache handle and passes the metrics to the cache - // specified. - // REQUIRES: cache, cache_handle, metrics_handler must be non-NULL + // Creates a new iterator that keeps track of accesses. + // REQUIRES: metrics_instance must be non-NULL Iterator* NewMetricsIterator(const Comparator* comparator, uint64_t file_number, uint64_t block_offset, - Cache* cache, - Cache::Handle* cache_handle, - void* metrics_handler); + BlockMetrics** metrics_instance); // Returns true if iter is a Block iterator and also knows that which file // and block it belongs to. diff --git a/table/block_test.cc b/table/block_test.cc index 50adbe03..04cc2868 100644 --- a/table/block_test.cc +++ b/table/block_test.cc @@ -104,45 +104,6 @@ TEST(BlockTest, SimpleTest) { delete iter; } -class MockCache : public Cache { - public: - bool generated_metrics; - bool was_released; - - public: - MockCache() { - Reset(); - } - - // Stub implementations so class compiles - virtual Cache::Handle* Insert(const Slice& key, void* value, size_t charge, - void (*deleter)(const Slice& key, void* value)) { - return nullptr; - } - virtual Cache::Handle* Lookup(const Slice& key) { return nullptr; } - virtual void* Value(Cache::Handle* handle) { return nullptr; } - virtual void Erase(const Slice& key) {} - virtual uint64_t NewId() { return 4;} - virtual size_t GetCapacity() { return 0; } - - virtual void Release(Cache::Handle* handle) { - generated_metrics = false; - was_released = true; - } - - virtual void ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler, - BlockMetrics* metrics) { - generated_metrics = metrics != nullptr; - delete metrics; - was_released = true; - } - - void Reset() { - generated_metrics = false; - was_released = false; - } -}; - TEST(BlockTest, MetricsIter) { Random rnd(301); Options options = Options(); @@ -182,50 +143,41 @@ TEST(BlockTest, MetricsIter) { contents.heap_allocated = false; Block reader(contents); - MockCache c; - Cache::Handle* ch = reinterpret_cast(&c); + BlockMetrics* bm = nullptr;; - c.Reset(); Iterator* iter = reader.NewMetricsIterator(options.comparator, 0, 0, - &c, ch, &c); + &bm); delete iter; - ASSERT_TRUE(c.was_released); - ASSERT_TRUE(!c.generated_metrics) << "needlessly generated metrics.\n"; + ASSERT_TRUE(bm == nullptr) << "needlessly generated metrics.\n"; for (int i = 1; i < num_records; i += 2) { - c.Reset(); - iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c); + iter = reader.NewMetricsIterator(options.comparator, 0, 0, &bm); iter->Seek(keys[i]); delete iter; - ASSERT_TRUE(c.was_released); - ASSERT_TRUE(!c.generated_metrics) << "generated metrics for unfound row.\n"; + ASSERT_TRUE(bm == nullptr) << "generated metrics for unfound row.\n"; } - c.Reset(); - iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c); + iter = reader.NewMetricsIterator(options.comparator, 0, 0, &bm); iter->SeekToFirst(); iter->Next(); iter->SeekToLast(); iter->Prev(); delete iter; - ASSERT_TRUE(c.was_released); - ASSERT_TRUE(!c.generated_metrics) << "generated metrics for non-Seek().\n"; + ASSERT_TRUE(bm == nullptr) << "generated metrics for non-Seek().\n"; for (int i = 0; i < num_records; i += 2) { - c.Reset(); - iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c); + iter = reader.NewMetricsIterator(options.comparator, 0, 0, &bm); iter->Seek(keys[i]); delete iter; - ASSERT_TRUE(c.was_released); - ASSERT_TRUE(c.generated_metrics) << "didn't generate metrics for found row\n"; + ASSERT_TRUE(bm != nullptr) << "didn't generate metrics for found row\n"; + delete bm; bm = nullptr; } - c.Reset(); - iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c); + iter = reader.NewMetricsIterator(options.comparator, 0, 0, &bm); iter->Seek(InternalKey(" 2", 140, kTypeDeletion).Encode()); delete iter; - ASSERT_TRUE(c.was_released); - ASSERT_TRUE(c.generated_metrics) << "didn't generate metrics for Seek().\n"; + ASSERT_TRUE(bm != nullptr) << "didn't generate metrics for Seek().\n"; + delete bm; bm = nullptr; } class BlockMetricsTest { diff --git a/table/table.cc b/table/table.cc index 818c22c1..d2aa14fd 100644 --- a/table/table.cc +++ b/table/table.cc @@ -261,23 +261,22 @@ Iterator* Table::BlockReader(void* arg, Iterator* iter; if (block != NULL) { - if (options.metrics_handler == NULL || cache_handle == NULL) { + if (options.metrics_instance == NULL || cache_handle == NULL) { iter = block->NewIterator(table->rep_->options.comparator, table->rep_->file_number, handle.offset()); - if (cache_handle == NULL) { - iter->RegisterCleanup(&DeleteBlock, block, NULL); - } else { - iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle); - } } else { iter = block->NewMetricsIterator(table->rep_->options.comparator, table->rep_->file_number, handle.offset(), - block_cache, - cache_handle, - options.metrics_handler); + options.metrics_instance); + } + + if (cache_handle == NULL) { + iter->RegisterCleanup(&DeleteBlock, block, NULL); + } else { + iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle); } } else { iter = NewErrorIterator(s); diff --git a/util/cache.cc b/util/cache.cc index bd61d480..daaea440 100644 --- a/util/cache.cc +++ b/util/cache.cc @@ -3,14 +3,11 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include -#include #include #include -#include #include "leveldb/cache.h" #include "port/port.h" -#include "table/block.h" #include "util/hash.h" #include "util/mutexlock.h" @@ -19,21 +16,6 @@ namespace leveldb { Cache::~Cache() { } -// Stub methods to not break backwards compatibility. -void Cache::ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler, - BlockMetrics* metrics) { - delete metrics; -} -void Cache::AddHandler( - void* handler, - void (*handler_func)(void*, std::vector*)) { -} -void Cache::RemoveHandler(void* handler) { -} -void Cache::ForceFlushMetrics() { -} - - namespace { // LRU cache implementation @@ -149,15 +131,11 @@ class HandleTable { } }; -// Number of metrics that are stored per shard before they get flushed to the -// handler_func. -size_t kMetricsFlushThreshold = 10000u; - // A single shard of sharded cache. class LRUCache { public: LRUCache(); - virtual ~LRUCache(); + ~LRUCache(); // Separate from constructor so caller can easily make an array of LRUCache void SetCapacity(size_t capacity) { capacity_ = capacity; } @@ -170,15 +148,6 @@ class LRUCache { void Release(Cache::Handle* handle); void Erase(const Slice& key, uint32_t hash); - - void ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler, - BlockMetrics* metrics); - void AddHandler( - void* handler, - void (*handler_func)(void*, std::vector*)); - void RemoveHandler(void* handler); - void ForceFlushMetrics(); - private: void LRU_Remove(LRUHandle* e); void LRU_Append(LRUHandle* e); @@ -197,11 +166,6 @@ class LRUCache { LRUHandle lru_; HandleTable table_; - - // Stores handler_funcs for the handlers. - std::map*)> handlers_; - // Stores the cache of metrics for specific handlers. - std::map*> metrics_store_; }; LRUCache::LRUCache() @@ -213,14 +177,6 @@ LRUCache::LRUCache() } LRUCache::~LRUCache() { - // Flush cached metrics to the handlers. - std::map*>::iterator it; - for (it = metrics_store_.begin(); - it != metrics_store_.end(); ++it) { - void* handler = it->first; - (*handlers_[handler])(handler, metrics_store_[handler]); - } - for (LRUHandle* e = lru_.next; e != &lru_; ) { LRUHandle* next = e->next; assert(e->refs == 1); // Error if caller has an unreleased handle @@ -310,59 +266,6 @@ void LRUCache::Erase(const Slice& key, uint32_t hash) { } } -void LRUCache::ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler, - BlockMetrics* metrics) { - MutexLock l(&mutex_); - - // Stores the metrics in the cache for the handler if handler has been added; - // otherwise, we just delete it. - if (metrics_store_.count(handler) != 0) { - metrics_store_[handler]->push_back(metrics); - if (metrics_store_[handler]->size() >= kMetricsFlushThreshold) { - (*handlers_[handler])(handler, metrics_store_[handler]); - metrics_store_[handler] = new std::vector(); - } - } else { - delete metrics; - } - - Unref(reinterpret_cast(handle)); -} - -void LRUCache::AddHandler( - void* handler, - void (*handler_func)(void*, std::vector*)) { - assert(handler != NULL); - assert(handler_func != NULL); - assert(handlers_.count(handler) == 0); - assert(metrics_store_.count(handler) == 0); - - MutexLock l(&mutex_); - - handlers_[handler] = handler_func; - metrics_store_[handler] = new std::vector(); -} - -void LRUCache::RemoveHandler(void* handler) { - assert(handlers_.count(handler) != 0); - assert(metrics_store_.count(handler) != 0); - MutexLock l(&mutex_); - - (*handlers_[handler])(handler, metrics_store_[handler]); - handlers_.erase(handler); - metrics_store_.erase(handler); -} - -void LRUCache::ForceFlushMetrics() { - std::map*>::iterator it; - for (it = metrics_store_.begin(); - it != metrics_store_.end(); ++it) { - void* handler = it->first; - (*handlers_[handler])(handler, metrics_store_[handler]); - metrics_store_[handler] = new std::vector(); - } -} - static int kNumShardBits = 4; // default values, can be overridden class ShardedLRUCache : public Cache { @@ -371,7 +274,6 @@ class ShardedLRUCache : public Cache { port::Mutex id_mutex_; uint64_t last_id_; int numShardBits; - size_t numShards_; size_t capacity_; static inline uint32_t HashSlice(const Slice& s) { @@ -385,10 +287,10 @@ class ShardedLRUCache : public Cache { void init(size_t capacity, int numbits) { numShardBits = numbits; capacity_ = capacity; - numShards_ = 1u << numShardBits; - shard_ = new LRUCache[numShards_]; - const size_t per_shard = (capacity + (numShards_ - 1)) / numShards_; - for (size_t s = 0; s < numShards_; s++) { + int numShards = 1 << numShardBits; + shard_ = new LRUCache[numShards]; + const size_t per_shard = (capacity + (numShards - 1)) / numShards; + for (int s = 0; s < numShards; s++) { shard_[s].SetCapacity(per_shard); } } @@ -432,31 +334,6 @@ class ShardedLRUCache : public Cache { virtual uint64_t GetCapacity() { return capacity_; } - - void ReleaseAndRecordMetrics(Handle* handle, void* handler, - BlockMetrics* metrics) { - LRUHandle* h = reinterpret_cast(handle); - - shard_[Shard(h->hash)].ReleaseAndRecordMetrics(handle, handler, metrics); - } - - void AddHandler(void* handler, - void (*handler_func)(void*, std::vector*)) { - for (size_t i = 0; i < numShards_; ++i) { - shard_[i].AddHandler(handler, handler_func); - } - } - void RemoveHandler(void* handler) { - for (size_t i = 0; i < numShards_; ++i) { - shard_[i].RemoveHandler(handler); - } - } - - void ForceFlushMetrics() { - for (size_t i = 0; i < numShards_; ++i) { - shard_[i].ForceFlushMetrics(); - } - } }; } // end anonymous namespace