]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Removed Cache from metrics flow for Block::MetricsIter to DBImpl
authorKosie van der Merwe <kosie.vandermerwe@gmail.com>
Fri, 15 Feb 2013 20:10:00 +0000 (12:10 -0800)
committerKosie van der Merwe <kosie.vandermerwe@gmail.com>
Fri, 15 Feb 2013 20:10:00 +0000 (12:10 -0800)
Summary:
Before this patch we had to send metrics to `Cache`, which then sent the metrics to `DBImpl`. Now we directly send them to `DBImpl`, by the use of a pointer to a pointer of a `BlockMetrics` instance. We can do this safely and without additional locks because:

* `Block::MetricsIter` runs in the same thread as `DBImpl::Get()` (in which the `BlockMetrics` pointer resides)
* `DBImpl::Get()` locks `DBImpl` again after the `Block::MetricsIter` is destroyed, so we don't have to do an additional lock.

Test Plan: make check

Reviewers: vamsi, dhruba

Reviewed By: vamsi

CC: leveldb
Differential Revision: https://reviews.facebook.net/D8583

db/db_impl.cc
db/db_impl.h
db/version_set.cc
include/leveldb/cache.h
include/leveldb/options.h
table/block.cc
table/block.h
table/block_test.cc
table/table.cc
util/cache.cc

index 86b2c30d2749568aa96ae948b2fd6703101ccae6..857c2a2802ba91024078a8f733834b753ee7fb6a 100644 (file)
@@ -187,6 +187,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname,
       bg_cv_(&mutex_),
       mem_(new MemTable(internal_comparator_, NumberLevels())),
       logfile_number_(0),
+      tmp_metrics_store_(new std::vector<BlockMetrics*>()),
       tmp_batch_(new WriteBatch),
       bg_compaction_scheduled_(0),
       bg_logstats_scheduled_(false),
@@ -229,13 +230,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname,
     host_name_ = "localhost";
   }
   last_log_ts = 0;
-
-  // Set up metrics taking
-  if (is_hotcold_) {
-    assert(options_.block_cache != NULL);
-
-    options_.block_cache->AddHandler(this, &DBImpl::HandleMetrics);
-  }
 }
 
 DBImpl::~DBImpl() {
@@ -243,10 +237,16 @@ DBImpl::~DBImpl() {
   if (flush_on_destroy_) {
     FlushMemTable(FlushOptions());
   }
+  mutex_.Lock();
+  assert(tmp_metrics_store_ != nullptr);
   if (is_hotcold_) {
-    options_.block_cache->RemoveHandler(this);
+    for (BlockMetrics* bm: *tmp_metrics_store_) {
+      delete bm;
+    }
+    tmp_metrics_store_->clear();
   }
-  mutex_.Lock();
+  assert(tmp_metrics_store_->empty());
+  delete tmp_metrics_store_;
   shutting_down_.Release_Store(this);  // Any non-NULL value is ok
   while (bg_compaction_scheduled_ || bg_logstats_scheduled_ ||
          bg_flushing_metrics_scheduled_) {
@@ -1156,7 +1156,9 @@ void DBImpl::TEST_ForceFlushMetrics() {
   assert(is_hotcold_);
   assert(options_.block_cache != NULL);
 
-  options_.block_cache->ForceFlushMetrics();
+  mutex_.Lock();
+  FlushTempMetrics();
+  mutex_.Unlock();
 
   TEST_WaitForMetricsFlush();
 }
@@ -1632,145 +1634,6 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
   return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
 }
 
-namespace {
-// Helper function that takes a vector of BlockMetrics and joins all the
-// metrics together that share the same key.
-//
-// Assumes it has exclusive access to metrics.
-void CompactMetrics(std::vector<BlockMetrics*>& metrics) {
-  std::map<std::string, BlockMetrics*> unique;
-
-  for (size_t i = 0; i < metrics.size(); ++i) {
-    assert(metrics[i] != NULL);
-    const std::string& key = metrics[i]->GetDBKey();
-
-    if (unique.count(key) == 0) {
-      unique[key] = metrics[i];
-    } else {
-      assert(unique[key]->IsCompatible(metrics[i]));
-      unique[key]->Join(metrics[i]);
-      delete metrics[i];
-    }
-  }
-
-  metrics.clear();
-
-  for (std::map<std::string, BlockMetrics*>::iterator it = unique.begin();
-       it != unique.end(); ++it) {
-    metrics.push_back(it->second);
-  }
-}
-}
-void DBImpl::FlushMetrics(void* db) {
-  DBImpl* dbimpl = reinterpret_cast<DBImpl*>(db);
-
-  while (true) {
-    std::vector<std::vector<BlockMetrics*>*> unflushed_metrics;
-
-    // Get unflushed metrics.
-    {
-      MutexLock l(&dbimpl->mutex_);
-
-      if (dbimpl->unflushed_metrics_.empty()) {
-        dbimpl->bg_flushing_metrics_scheduled_ = false;
-        dbimpl->bg_cv_.SignalAll();
-        return;
-      }
-
-      unflushed_metrics = dbimpl->unflushed_metrics_;
-      dbimpl->unflushed_metrics_.clear();
-    }
-
-    // Join all the metrics together in a single vector.
-    std::vector<BlockMetrics*> metrics;
-    for (size_t i = 0; i < unflushed_metrics.size(); ++i) {
-      metrics.insert(metrics.end(), unflushed_metrics[i]->begin(),
-                     unflushed_metrics[i]->end());
-      delete unflushed_metrics[i];
-    }
-
-    CompactMetrics(metrics);
-
-    // Flush metrics to database.
-    DB* metrics_db = dbimpl->metrics_db_;
-    for (size_t i = 0; i < metrics.size(); ++i) {
-      assert(metrics[i] != NULL);
-      const std::string& key = metrics[i]->GetDBKey();
-      // TODO: fix this code by some means so that we don't lose metrics
-      //       already in the database. Using a read-update-write cycle is far
-      //       too slow so we temporarily replaced it with an overwrite, with
-      //       the old code left commented out to show the ideal logic.
-      metrics_db->Put(WriteOptions(), key, metrics[i]->GetDBValue());
-      /*
-      BlockMetrics* db_metrics = NULL;
-
-      std::string db_value;
-      Status s = metrics_db->Get(ReadOptions(), key, &db_value);
-
-      if (s.ok()) {
-        db_metrics = BlockMetrics::Create(key, db_value);
-      }
-
-      // We check if metrics[i] and db_metrics are compatible because
-      // the metrics in the metrics db might have gotten corrupted.
-      if (db_metrics != NULL && metrics[i]->IsCompatible(db_metrics)) {
-        db_metrics->Join(metrics[i]);
-        metrics_db->Put(WriteOptions(), key, db_metrics->GetDBValue());
-      } else {
-        metrics_db->Put(WriteOptions(), key, metrics[i]->GetDBValue());
-        // TODO: Log an error here.
-      }
-      delete db_metrics;
-      */
-    }
-
-    for (size_t i = 0; i < metrics.size(); ++i) {
-      delete metrics[i];
-    }
-  }
-}
-
-void DBImpl::HandleMetrics(void* db, std::vector<BlockMetrics*>* metrics) {
-  DBImpl* dbimpl = reinterpret_cast<DBImpl*>(db);
-
-  // If we are shutting down or we are not a hot-cold db discard metrics.
-  //
-  // is_hotcold_ is constant after construction and shutting_down_ is an atomic
-  // so we don't need to acquire mutex_ .
-  if (!dbimpl->is_hotcold_ || dbimpl->shutting_down_.Acquire_Load()) {
-    for (size_t i = 0; i < metrics->size(); ++i) {
-      delete (*metrics)[i];
-    }
-    delete metrics;
-
-    return;
-  }
-
-  MutexLock l(&dbimpl->mutex_);
-  dbimpl->unflushed_metrics_.push_back(metrics);
-
-  if (!dbimpl->bg_flushing_metrics_scheduled_) {
-    dbimpl->bg_flushing_metrics_scheduled_ = true;
-    dbimpl->env_->Schedule(&DBImpl::FlushMetrics, dbimpl);
-  }
-}
-
-bool DBImpl::TEST_IsHot(const Iterator* iter) {
-  assert(is_hotcold_);
-  assert(iter != NULL);
-
-  BlockMetrics* bm = NULL;
-  ReadOptions metrics_read_options;
-  bool result = IsRecordHot(iter, metrics_db_, metrics_read_options, &bm);
-  delete bm;
-
-  return result;
-}
-
-bool DBImpl::TEST_IsHotCold() {
-  return is_hotcold_;
-}
-
 //
 // Given a sequence number, return the sequence number of the
 // earliest snapshot that this sequence number is visible in.
@@ -2145,6 +2008,138 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
   return versions_->MaxNextLevelOverlappingBytes();
 }
 
+
+// Number of metrics that are stored they get flushed to the
+// metrics db.
+size_t kMetricsFlushThreshold = 10000u;
+
+namespace {
+// Helper function that takes a vector of BlockMetrics and joins all the
+// metrics together that share the same key.
+//
+// Assumes it has exclusive access to metrics.
+void CompactMetrics(std::vector<BlockMetrics*>& metrics) {
+  std::map<std::string, BlockMetrics*> unique;
+
+  for (size_t i = 0; i < metrics.size(); ++i) {
+    assert(metrics[i] != NULL);
+    const std::string& key = metrics[i]->GetDBKey();
+
+    if (unique.count(key) == 0) {
+      unique[key] = metrics[i];
+    } else {
+      assert(unique[key]->IsCompatible(metrics[i]));
+      unique[key]->Join(metrics[i]);
+      delete metrics[i];
+    }
+  }
+
+  metrics.clear();
+
+  for (std::map<std::string, BlockMetrics*>::iterator it = unique.begin();
+       it != unique.end(); ++it) {
+    metrics.push_back(it->second);
+  }
+}
+}
+void DBImpl::FlushMetrics(void* db) {
+  DBImpl* dbimpl = reinterpret_cast<DBImpl*>(db);
+
+  while (true) {
+    std::vector<std::vector<BlockMetrics*>*> unflushed_metrics;
+
+    // Get unflushed metrics.
+    {
+      MutexLock l(&dbimpl->mutex_);
+
+      if (dbimpl->unflushed_metrics_.empty()) {
+        dbimpl->bg_flushing_metrics_scheduled_ = false;
+        dbimpl->bg_cv_.SignalAll();
+        return;
+      }
+
+      unflushed_metrics = dbimpl->unflushed_metrics_;
+      dbimpl->unflushed_metrics_.clear();
+    }
+
+    // Join all the metrics together in a single vector.
+    std::vector<BlockMetrics*> metrics;
+    for (size_t i = 0; i < unflushed_metrics.size(); ++i) {
+      metrics.insert(metrics.end(), unflushed_metrics[i]->begin(),
+                     unflushed_metrics[i]->end());
+      delete unflushed_metrics[i];
+    }
+
+    CompactMetrics(metrics);
+
+    // Flush metrics to database.
+    DB* metrics_db = dbimpl->metrics_db_;
+    for (size_t i = 0; i < metrics.size(); ++i) {
+      assert(metrics[i] != NULL);
+      const std::string& key = metrics[i]->GetDBKey();
+      // TODO: fix this code by some means so that we don't lose metrics
+      //       already in the database. Using a read-update-write cycle is far
+      //       too slow so we temporarily replaced it with an overwrite, with
+      //       the old code left commented out to show the ideal logic.
+      metrics_db->Put(WriteOptions(), key, metrics[i]->GetDBValue());
+      /*
+      BlockMetrics* db_metrics = NULL;
+
+      std::string db_value;
+      Status s = metrics_db->Get(ReadOptions(), key, &db_value);
+
+      if (s.ok()) {
+        db_metrics = BlockMetrics::Create(key, db_value);
+      }
+
+      // We check if metrics[i] and db_metrics are compatible because
+      // the metrics in the metrics db might have gotten corrupted.
+      if (db_metrics != NULL && metrics[i]->IsCompatible(db_metrics)) {
+        db_metrics->Join(metrics[i]);
+        metrics_db->Put(WriteOptions(), key, db_metrics->GetDBValue());
+      } else {
+        metrics_db->Put(WriteOptions(), key, metrics[i]->GetDBValue());
+        // TODO: Log an error here.
+      }
+      delete db_metrics;
+      */
+    }
+
+    for (size_t i = 0; i < metrics.size(); ++i) {
+      delete metrics[i];
+    }
+  }
+}
+
+void DBImpl::FlushTempMetrics() {
+  mutex_.AssertHeld();
+  assert(is_hotcold_);
+
+  unflushed_metrics_.push_back(tmp_metrics_store_);
+  tmp_metrics_store_ = new std::vector<BlockMetrics*>();
+
+  if (!bg_flushing_metrics_scheduled_) {
+    bg_flushing_metrics_scheduled_ = true;
+    env_->Schedule(&DBImpl::FlushMetrics, this);
+  }
+}
+
+bool DBImpl::TEST_IsHot(const Iterator* iter) {
+  assert(is_hotcold_);
+  assert(iter != NULL);
+
+  BlockMetrics* bm = NULL;
+  ReadOptions metrics_read_options;
+  bool result = IsRecordHot(iter, metrics_db_, metrics_read_options, &bm);
+  delete bm;
+
+  return result;
+}
+
+bool DBImpl::TEST_IsHotCold() {
+  return is_hotcold_;
+}
+
 Status DBImpl::Get(const ReadOptions& options,
                    const Slice& key,
                    std::string* value) {
@@ -2167,6 +2162,7 @@ Status DBImpl::Get(const ReadOptions& options,
   bool have_stat_update = false;
   Version::GetStats stats;
 
+  BlockMetrics* bm = nullptr;
   // Unlock while reading from files and memtables
   {
     mutex_.Unlock();
@@ -2179,7 +2175,7 @@ Status DBImpl::Get(const ReadOptions& options,
     } else {
       ReadOptions read_options = options;
       if (read_options.record_accesses && is_hotcold_) {
-        read_options.metrics_handler = this;
+        read_options.metrics_instance = &bm;
       }
       // If the database is hot-cold then we know we can use short circuiting
       // as the compaction logic guarantees that this will be valid.
@@ -2188,6 +2184,15 @@ Status DBImpl::Get(const ReadOptions& options,
     }
     mutex_.Lock();
   }
+  if (bm != nullptr) {
+    assert(is_hotcold_);
+    assert(tmp_metrics_store_ != nullptr);
+    tmp_metrics_store_->push_back(bm);
+
+    if (tmp_metrics_store_->size() >= kMetricsFlushThreshold) {
+      FlushTempMetrics();
+    }
+  }
 
   if (!options_.disable_seek_compaction &&
       have_stat_update && current->UpdateStats(stats)) {
@@ -2619,11 +2624,6 @@ Status DB::InternalOpen(const Options& options, const std::string& dbname,
   DB* metrics_db = NULL;
 
   if (with_hotcold) {
-    if (options.no_block_cache) {
-      return Status::InvalidArgument("Cannot have HotCold seperation when "
-                                     "no_block_cache is true.");
-    }
-
     // Creates directory for db in case it might not create as we don't create
     // intermediate directories.
     options.env->CreateDir(dbname);
index 651406824137bef5e58252492c517aba30ea350c..c91898af5a0970a7f9dc114be8e1acf8ed6d0a59 100644 (file)
@@ -192,10 +192,10 @@ class DBImpl : public DB {
   // This gets run in the background thread to handle flushing metrics we
   // receive from the cache to metrics_db_. Only up to a single instance of this
   // thread will run at a time per DBImpl instance.
-  static void FlushMetrics(void*);
-  // This is the callback function that gets passed to Cache to handle metrics.
-  static void HandleMetrics(void* db, std::vector<BlockMetrics*>* metrics);
+  static void FlushMetrics(void* db);
 
+  // Flushes tmp_metrics_store_ to disk.
+  void FlushTempMetrics();
 
   // Returns the list of live files in 'live' and the list
   // of all files in the filesystem in 'allfiles'.
@@ -250,6 +250,7 @@ class DBImpl : public DB {
   unique_ptr<log::Writer> log_;
   // Metrics that have been received from the cache, but have not yet been
   // flushed to metrics_db_.
+  std::vector<BlockMetrics*>* tmp_metrics_store_;
   std::vector<std::vector<BlockMetrics*>*> unflushed_metrics_;
 
   std::string host_name_;
index d57adb399528ccf3868064d5e51caaf12dec43fc..dd13494e664b83a9ca1c5b80711a9f251026083e 100644 (file)
@@ -547,7 +547,7 @@ Status Version::Get(const ReadOptions& options,
 
     ReadOptions read_options = options;
     if (level < vset_->options_->min_hotcold_level) {
-      read_options.metrics_handler = nullptr;
+      read_options.metrics_instance = nullptr;
     }
 
     for (uint32_t i = 0; i < files.size(); ++i) {
index 34333d965f656d28356b42f91b8fc96b9ca1dfc7..c729adcb0a4017618e9ebbb20e305e265c2ceb27 100644 (file)
@@ -20,7 +20,6 @@
 
 #include <memory>
 #include <stdint.h>
-#include <vector>
 #include "leveldb/slice.h"
 
 namespace leveldb {
@@ -34,8 +33,6 @@ class Cache;
 extern shared_ptr<Cache> NewLRUCache(size_t capacity);
 extern shared_ptr<Cache> NewLRUCache(size_t capacity, int numShardBits);
 
-class BlockMetrics;
-
 class Cache {
  public:
   Cache() { }
@@ -91,36 +88,6 @@ class Cache {
   // returns the maximum configured capacity of the cache
   virtual size_t GetCapacity() = 0;
 
-
-  // Releases the handle and passes the metrics to the metrics handler
-  // that was added with the AddHandler() method.
-  // If handler was not registered then metrics is deleted.
-  virtual void ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler,
-                                       BlockMetrics* metrics);
-
-  // Adds a handler for a std::vector of metrics. Whenever enough metrics are
-  // acquired by ReleaseAndRecordMetrics() handler_func is called.  The first
-  // parameter for the callback function is the handler itself.
-  // Note that handler_func can get called on any thread and hence must be
-  // thread-safe.
-  //
-  // REQUIRES: handler != NULL
-  // REQUIRES: handler_func != NULL
-  // REQUIRES: hander hasn't been added before. (handler can get re-added after
-  //           it was removed by RemoveHandler())
-  virtual void AddHandler(
-      void* handler,
-      void (*handler_func)(void*, std::vector<BlockMetrics*>*));
-
-  // Removes a handler. This call flushes all the cached metrics before
-  // unregistering the handler.
-  //
-  // REQUIRES: handler has been added with AddHandler()
-  virtual void RemoveHandler(void* handler);
-
-  // Forcefully flushes all metrics currently held by the cache.
-  virtual void ForceFlushMetrics();
-
  private:
   void LRU_Remove(Handle* e);
   void LRU_Append(Handle* e);
index c119a9761888dce7c256ec744d992e87877388ae..95ca40567071b9bda653440090b7dbc27081b5ba 100644 (file)
@@ -380,6 +380,7 @@ struct Options {
   size_t manifest_preallocation_size;
 };
 
+class BlockMetrics;
 // Options that control read operations
 struct ReadOptions {
   // If true, all data read from underlying storage will be
@@ -409,17 +410,17 @@ struct ReadOptions {
         fill_cache(true),
         snapshot(NULL),
         record_accesses(true),
-        metrics_handler(NULL) {
+        metrics_instance(nullptr) {
   }
   ReadOptions(bool cksum, bool cache) :
               verify_checksums(cksum), fill_cache(cache),
               snapshot(NULL),
               record_accesses(true),
-              metrics_handler(NULL) {
+              metrics_instance(nullptr)  {
   }
 
   // Internal parameters
-  void* metrics_handler;
+  BlockMetrics** metrics_instance;
 };
 
 // Options that control write operations
index 387517e45c8129d47a6c02167e1efc77950634ed..e2ed4970f1d3b5afd510b98821ba36892e091f09 100644 (file)
@@ -289,10 +289,7 @@ class Block::Iter : public Iterator {
 // This is the iterator returned by Block::NewMetricsIterator() on success.
 class Block::MetricsIter : public Block::Iter {
  private:
-  Cache* cache_;
-  Cache::Handle* cache_handle_;
-  void* metrics_handler_;
-  BlockMetrics* metrics_;
+  BlockMetrics** metrics_instance_;
   const InternalKeyComparator* icmp_;
 
  public:
@@ -302,25 +299,14 @@ class Block::MetricsIter : public Block::Iter {
               const char* data,
               uint32_t restarts,
               uint32_t num_restarts,
-              Cache* cache,
-              Cache::Handle* cache_handle,
-              void* metrics_handler)
+              BlockMetrics** metrics_instance)
       : Block::Iter(comparator, file_number, block_offset, data, restarts,
                     num_restarts),
-        cache_(cache),
-        cache_handle_(cache_handle),
-        metrics_handler_(metrics_handler),
-        metrics_(NULL),
+        metrics_instance_(metrics_instance),
         icmp_(dynamic_cast<const InternalKeyComparator*>(comparator_)) {
   }
 
   virtual ~MetricsIter() {
-    if (metrics_) {
-      cache_->ReleaseAndRecordMetrics(cache_handle_, metrics_handler_,
-                                      metrics_);
-    } else {
-      cache_->Release(cache_handle_);
-    }
   }
 
   virtual void Seek(const Slice& target) {
@@ -348,11 +334,11 @@ class Block::MetricsIter : public Block::Iter {
  private:
   void RecordAccess() {
     assert(Valid());
-    if (metrics_ == nullptr) {
-      metrics_ = new BlockMetrics(file_number_, block_offset_,
-                                  num_restarts_, kBytesPerRestart);
+    if (*metrics_instance_ == nullptr) {
+      *metrics_instance_ = new BlockMetrics(file_number_, block_offset_,
+                                            num_restarts_, kBytesPerRestart);
     }
-    metrics_->RecordAccess(restart_index_, restart_offset_);
+    (*metrics_instance_)->RecordAccess(restart_index_, restart_offset_);
   }
 };
 
@@ -387,34 +373,21 @@ Iterator* Block::NewIterator(const Comparator* cmp,
 Iterator* Block::NewMetricsIterator(const Comparator* cmp,
                                     uint64_t file_number,
                                     uint64_t block_offset,
-                                    Cache* cache,
-                                    Cache::Handle* cache_handle,
-                                    void* metrics_handler) {
-  assert(cache != nullptr);
-  assert(cache_handle != nullptr);
-  assert(metrics_handler != nullptr);
-
-  Iterator* iter = nullptr;
+                                    BlockMetrics** metrics_instance) {
+  assert(metrics_instance != nullptr);
+
   if (size_ < 2*sizeof(uint32_t)) {
-    iter = NewErrorIterator(Status::Corruption("bad block contents"));
+    return NewErrorIterator(Status::Corruption("bad block contents"));
   }
   const uint32_t num_restarts = NumRestarts();
   if (num_restarts == 0) {
-    iter = NewEmptyIterator();
-  }
-
-  if (iter == nullptr) {
+    return NewEmptyIterator();
+  } else {
     // MetricsIter takes ownership of the cache handle and will delete it.
-    iter = new MetricsIter(cmp, file_number, block_offset, data_,
+    return new MetricsIter(cmp, file_number, block_offset, data_,
                            restart_offset_, num_restarts,
-                           cache, cache_handle, metrics_handler);
-  } else {
-    // Release the cache handle as neither the error iterator nor the empty
-    // iterator need it's contents.
-    cache->Release(cache_handle);
+                           metrics_instance);
   }
-
-  return iter;
 }
 
 bool Block::GetBlockIterInfo(const Iterator* iter,
index 81a1c937baf8a3318cdd434af318e3642bd4b7d1..6cb250cd4510151aae390595a33c82734746242b 100644 (file)
@@ -36,16 +36,12 @@ class Block {
                         uint64_t file_number,
                         uint64_t block_offset);
 
-  // Creates a new iterator that keeps track of accesses. When this iterator is
-  // deleted it frees the cache handle and passes the metrics to the cache
-  // specified.
-  // REQUIRES: cache, cache_handle, metrics_handler must be non-NULL
+  // Creates a new iterator that keeps track of accesses.
+  // REQUIRES: metrics_instance must be non-NULL
   Iterator* NewMetricsIterator(const Comparator* comparator,
                                uint64_t file_number,
                                uint64_t block_offset,
-                               Cache* cache,
-                               Cache::Handle* cache_handle,
-                               void* metrics_handler);
+                               BlockMetrics** metrics_instance);
 
   // Returns true if iter is a Block iterator and also knows that which file
   // and block it belongs to.
index 50adbe03576a8da8dded1a9b8085afca3f9b824c..04cc286833c18414d66737c18266678e22b2b0a2 100644 (file)
@@ -104,45 +104,6 @@ TEST(BlockTest, SimpleTest) {
   delete iter;
 }
 
-class MockCache : public Cache {
- public:
-  bool generated_metrics;
-  bool was_released;
-
- public:
-  MockCache() {
-    Reset();
-  }
-
-  // Stub implementations so class compiles
-  virtual Cache::Handle* Insert(const Slice& key, void* value, size_t charge,
-                         void (*deleter)(const Slice& key, void* value)) {
-    return nullptr;
-  }
-  virtual Cache::Handle* Lookup(const Slice& key) { return nullptr; }
-  virtual void* Value(Cache::Handle* handle) { return nullptr; }
-  virtual void Erase(const Slice& key) {}
-  virtual uint64_t NewId() { return 4;}
-  virtual size_t GetCapacity() { return 0; }
-
-  virtual void Release(Cache::Handle* handle) {
-    generated_metrics = false;
-    was_released = true;
-  }
-
-  virtual void ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler,
-                                       BlockMetrics* metrics) {
-    generated_metrics = metrics != nullptr;
-    delete metrics;
-    was_released = true;
-  }
-
-  void Reset() {
-    generated_metrics = false;
-    was_released = false;
-  }
-};
-
 TEST(BlockTest, MetricsIter) {
   Random rnd(301);
   Options options = Options();
@@ -182,50 +143,41 @@ TEST(BlockTest, MetricsIter) {
   contents.heap_allocated = false;
   Block reader(contents);
 
-  MockCache c;
-  Cache::Handle* ch = reinterpret_cast<Cache::Handle*>(&c);
+  BlockMetrics* bm = nullptr;;
 
-  c.Reset();
   Iterator* iter = reader.NewMetricsIterator(options.comparator, 0, 0,
-                                             &c, ch, &c);
+                                             &bm);
   delete iter;
-  ASSERT_TRUE(c.was_released);
-  ASSERT_TRUE(!c.generated_metrics) << "needlessly generated metrics.\n";
+  ASSERT_TRUE(bm == nullptr) << "needlessly generated metrics.\n";
 
   for (int i = 1; i < num_records; i += 2) {
-    c.Reset();
-    iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+    iter = reader.NewMetricsIterator(options.comparator, 0, 0, &bm);
     iter->Seek(keys[i]);
     delete iter;
-    ASSERT_TRUE(c.was_released);
-    ASSERT_TRUE(!c.generated_metrics) << "generated metrics for unfound row.\n";
+    ASSERT_TRUE(bm == nullptr) << "generated metrics for unfound row.\n";
   }
 
-  c.Reset();
-  iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+  iter = reader.NewMetricsIterator(options.comparator, 0, 0, &bm);
   iter->SeekToFirst();
   iter->Next();
   iter->SeekToLast();
   iter->Prev();
   delete iter;
-  ASSERT_TRUE(c.was_released);
-  ASSERT_TRUE(!c.generated_metrics) << "generated metrics for non-Seek().\n";
+  ASSERT_TRUE(bm == nullptr) << "generated metrics for non-Seek().\n";
 
   for (int i = 0; i < num_records; i += 2) {
-    c.Reset();
-    iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+    iter = reader.NewMetricsIterator(options.comparator, 0, 0, &bm);
     iter->Seek(keys[i]);
     delete iter;
-    ASSERT_TRUE(c.was_released);
-    ASSERT_TRUE(c.generated_metrics) << "didn't generate metrics for found row\n";
+    ASSERT_TRUE(bm != nullptr) << "didn't generate metrics for found row\n";
+    delete bm; bm = nullptr;
   }
 
-  c.Reset();
-  iter = reader.NewMetricsIterator(options.comparator, 0, 0, &c, ch, &c);
+  iter = reader.NewMetricsIterator(options.comparator, 0, 0, &bm);
   iter->Seek(InternalKey("     2", 140, kTypeDeletion).Encode());
   delete iter;
-  ASSERT_TRUE(c.was_released);
-  ASSERT_TRUE(c.generated_metrics) << "didn't generate metrics for Seek().\n";
+  ASSERT_TRUE(bm != nullptr) << "didn't generate metrics for Seek().\n";
+  delete bm; bm = nullptr;
 }
 
 class BlockMetricsTest {
index 818c22c12b570513892b5a5a7192623768558b9c..d2aa14fdbe2c13ea1aea413803d6cbfaba1a1273 100644 (file)
@@ -261,23 +261,22 @@ Iterator* Table::BlockReader(void* arg,
 
   Iterator* iter;
   if (block != NULL) {
-    if (options.metrics_handler == NULL || cache_handle == NULL) {
+    if (options.metrics_instance == NULL || cache_handle == NULL) {
       iter = block->NewIterator(table->rep_->options.comparator,
                                 table->rep_->file_number,
                                 handle.offset());
 
-      if (cache_handle == NULL) {
-        iter->RegisterCleanup(&DeleteBlock, block, NULL);
-      } else {
-        iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
-      }
     } else {
       iter = block->NewMetricsIterator(table->rep_->options.comparator,
                                        table->rep_->file_number,
                                        handle.offset(),
-                                       block_cache,
-                                       cache_handle,
-                                       options.metrics_handler);
+                                       options.metrics_instance);
+    }
+
+    if (cache_handle == NULL) {
+      iter->RegisterCleanup(&DeleteBlock, block, NULL);
+    } else {
+      iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
     }
   } else {
     iter = NewErrorIterator(s);
index bd61d480261fa214a1be7e000b336548ae5a4f37..daaea44085ec3af7e9e2c744fc29a00d5c87c2ed 100644 (file)
@@ -3,14 +3,11 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include <assert.h>
-#include <map>
 #include <stdio.h>
 #include <stdlib.h>
-#include <vector>
 
 #include "leveldb/cache.h"
 #include "port/port.h"
-#include "table/block.h"
 #include "util/hash.h"
 #include "util/mutexlock.h"
 
@@ -19,21 +16,6 @@ namespace leveldb {
 Cache::~Cache() {
 }
 
-// Stub methods to not break backwards compatibility.
-void Cache::ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler,
-                                    BlockMetrics* metrics) {
-  delete metrics;
-}
-void Cache::AddHandler(
-      void* handler,
-      void (*handler_func)(void*, std::vector<BlockMetrics*>*)) {
-}
-void Cache::RemoveHandler(void* handler) {
-}
-void Cache::ForceFlushMetrics() {
-}
-
-
 namespace {
 
 // LRU cache implementation
@@ -149,15 +131,11 @@ class HandleTable {
   }
 };
 
-// Number of metrics that are stored per shard before they get flushed to the
-// handler_func.
-size_t kMetricsFlushThreshold = 10000u;
-
 // A single shard of sharded cache.
 class LRUCache {
  public:
   LRUCache();
-  virtual ~LRUCache();
+  ~LRUCache();
 
   // Separate from constructor so caller can easily make an array of LRUCache
   void SetCapacity(size_t capacity) { capacity_ = capacity; }
@@ -170,15 +148,6 @@ class LRUCache {
   void Release(Cache::Handle* handle);
   void Erase(const Slice& key, uint32_t hash);
 
-
-  void ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler,
-                               BlockMetrics* metrics);
-  void AddHandler(
-      void* handler,
-      void (*handler_func)(void*, std::vector<BlockMetrics*>*));
-  void RemoveHandler(void* handler);
-  void ForceFlushMetrics();
-
  private:
   void LRU_Remove(LRUHandle* e);
   void LRU_Append(LRUHandle* e);
@@ -197,11 +166,6 @@ class LRUCache {
   LRUHandle lru_;
 
   HandleTable table_;
-
-  // Stores handler_funcs for the handlers.
-  std::map<void*, void (*)(void*, std::vector<BlockMetrics*>*)> handlers_;
-  // Stores the cache of metrics for specific handlers.
-  std::map<void*, std::vector<BlockMetrics*>*> metrics_store_;
 };
 
 LRUCache::LRUCache()
@@ -213,14 +177,6 @@ LRUCache::LRUCache()
 }
 
 LRUCache::~LRUCache() {
-  // Flush cached metrics to the handlers.
-  std::map<void*, std::vector<BlockMetrics*>*>::iterator it;
-  for (it = metrics_store_.begin();
-       it != metrics_store_.end(); ++it) {
-    void* handler = it->first;
-    (*handlers_[handler])(handler, metrics_store_[handler]);
-  }
-
   for (LRUHandle* e = lru_.next; e != &lru_; ) {
     LRUHandle* next = e->next;
     assert(e->refs == 1);  // Error if caller has an unreleased handle
@@ -310,59 +266,6 @@ void LRUCache::Erase(const Slice& key, uint32_t hash) {
   }
 }
 
-void LRUCache::ReleaseAndRecordMetrics(Cache::Handle* handle, void* handler,
-                                       BlockMetrics* metrics) {
-  MutexLock l(&mutex_);
-
-  // Stores the metrics in the cache for the handler if handler has been added;
-  // otherwise, we just delete it.
-  if (metrics_store_.count(handler) != 0) {
-    metrics_store_[handler]->push_back(metrics);
-    if (metrics_store_[handler]->size() >= kMetricsFlushThreshold) {
-      (*handlers_[handler])(handler, metrics_store_[handler]);
-      metrics_store_[handler] = new std::vector<BlockMetrics*>();
-    }
-  } else {
-    delete metrics;
-  }
-
-  Unref(reinterpret_cast<LRUHandle*>(handle));
-}
-
-void LRUCache::AddHandler(
-    void* handler,
-    void (*handler_func)(void*, std::vector<BlockMetrics*>*)) {
-  assert(handler != NULL);
-  assert(handler_func != NULL);
-  assert(handlers_.count(handler) == 0);
-  assert(metrics_store_.count(handler) == 0);
-
-  MutexLock l(&mutex_);
-
-  handlers_[handler] = handler_func;
-  metrics_store_[handler] = new std::vector<BlockMetrics*>();
-}
-
-void LRUCache::RemoveHandler(void* handler) {
-  assert(handlers_.count(handler) != 0);
-  assert(metrics_store_.count(handler) != 0);
-  MutexLock l(&mutex_);
-
-  (*handlers_[handler])(handler, metrics_store_[handler]);
-  handlers_.erase(handler);
-  metrics_store_.erase(handler);
-}
-
-void LRUCache::ForceFlushMetrics() {
-  std::map<void*, std::vector<BlockMetrics*>*>::iterator it;
-  for (it = metrics_store_.begin();
-       it != metrics_store_.end(); ++it) {
-    void* handler = it->first;
-    (*handlers_[handler])(handler, metrics_store_[handler]);
-    metrics_store_[handler] = new std::vector<BlockMetrics*>();
-  }
-}
-
 static int kNumShardBits = 4;         // default values, can be overridden
 
 class ShardedLRUCache : public Cache {
@@ -371,7 +274,6 @@ class ShardedLRUCache : public Cache {
   port::Mutex id_mutex_;
   uint64_t last_id_;
   int numShardBits;
-  size_t numShards_;
   size_t capacity_;
 
   static inline uint32_t HashSlice(const Slice& s) {
@@ -385,10 +287,10 @@ class ShardedLRUCache : public Cache {
   void init(size_t capacity, int numbits) {
     numShardBits = numbits;
     capacity_ = capacity;
-    numShards_ = 1u << numShardBits;
-    shard_ = new LRUCache[numShards_];
-    const size_t per_shard = (capacity + (numShards_ - 1)) / numShards_;
-    for (size_t s = 0; s < numShards_; s++) {
+    int numShards = 1 << numShardBits;
+    shard_ = new LRUCache[numShards];
+    const size_t per_shard = (capacity + (numShards - 1)) / numShards;
+    for (int s = 0; s < numShards; s++) {
       shard_[s].SetCapacity(per_shard);
     }
   }
@@ -432,31 +334,6 @@ class ShardedLRUCache : public Cache {
   virtual uint64_t GetCapacity() {
     return capacity_;
   }
-
-  void ReleaseAndRecordMetrics(Handle* handle, void* handler,
-                               BlockMetrics* metrics) {
-    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
-
-    shard_[Shard(h->hash)].ReleaseAndRecordMetrics(handle, handler, metrics);
-  }
-
-  void AddHandler(void* handler,
-                  void (*handler_func)(void*, std::vector<BlockMetrics*>*)) {
-    for (size_t i = 0; i < numShards_; ++i) {
-     shard_[i].AddHandler(handler, handler_func);
-    }
-  }
-  void RemoveHandler(void* handler) {
-    for (size_t i = 0; i < numShards_; ++i) {
-      shard_[i].RemoveHandler(handler);
-    }
-  }
-
-  void ForceFlushMetrics() {
-    for (size_t i = 0; i < numShards_; ++i) {
-      shard_[i].ForceFlushMetrics();
-    }
-  }
 };
 
 }  // end anonymous namespace