git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Cache warming blocks during flush (#8561)
author: Akanksha Mahajan <akankshamahajan@fb.com>
Tue, 3 Aug 2021 19:42:22 +0000 (12:42 -0700)
committer: Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Tue, 3 Aug 2021 19:44:15 +0000 (12:44 -0700)
Summary:
Insert warm blocks (data, uncompressed dict, index and filter blocks) into the block cache during flush. This behavior is enabled via the option BlockBasedTableOptions.prepopulate_block_cache.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8561

Test Plan: Added unit test

Reviewed By: anand1976

Differential Revision: D29773411

Pulled By: akankshamahajan15

fbshipit-source-id: 6631123c10134340ef0bd7e90baafaa6deba0e66

HISTORY.md
db/db_block_cache_test.cc
include/rocksdb/table.h
table/block_based/block_based_table_builder.cc
table/block_based/block_based_table_builder.h

index 42fd34181017aa07eace8ae4e5a0a7b7f0ef65ac..a7456738055c7c958808e30b18bf56f39b1aa584 100644 (file)
@@ -5,6 +5,7 @@
 ### New Features
 * Made the EventListener extend the Customizable class.
 * EventListeners that have a non-empty Name() and that are registered with the ObjectRegistry can now be serialized to/from the OPTIONS file.
+* Insert warm blocks (data blocks, uncompressed dict blocks, index and filter blocks) into the block cache during flush when BlockBasedTableOptions.prepopulate_block_cache is enabled. Previously only data blocks were inserted.
 
 ### Performance Improvements
 * Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value.
index 6c03dbf444e98da6d3403dc8c5efd6a81b258a6b..faf6f8a5059231ea8aa4d5c84000bbfff5584b8e 100644 (file)
@@ -497,10 +497,46 @@ TEST_F(DBBlockCacheTest, WarmCacheWithDataBlocksDuringFlush) {
     ASSERT_OK(Put(ToString(i), value));
     ASSERT_OK(Flush());
     ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_ADD));
+    ASSERT_EQ(value, Get(ToString(i)));
+    ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_HIT));
+  }
+}
+
+// This test caches all types of blocks during flush.
+TEST_F(DBBlockCacheTest, WarmCacheWithBlocksDuringFlush) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+
+  BlockBasedTableOptions table_options;
+  table_options.block_cache = NewLRUCache(1 << 25, 0, false);
+  table_options.cache_index_and_filter_blocks = true;
+  table_options.prepopulate_block_cache =
+      BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly;
+  table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  DestroyAndReopen(options);
+
+  std::string value(kValueSize, 'a');
+  for (size_t i = 1; i < 2; i++) {
+    ASSERT_OK(Put(ToString(i), value));
+    ASSERT_OK(Flush());
+    ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_ADD));
+    ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_INDEX_ADD));
+    ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_FILTER_ADD));
 
     ASSERT_EQ(value, Get(ToString(i)));
+
     ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_DATA_MISS));
     ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_INDEX_MISS));
+    ASSERT_EQ(i * 3, options.statistics->getTickerCount(BLOCK_CACHE_INDEX_HIT));
+
+    ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_FILTER_MISS));
+    ASSERT_EQ(i * 2,
+              options.statistics->getTickerCount(BLOCK_CACHE_FILTER_HIT));
   }
 }
 #endif
index 13b31ee47b9f7ec6a3d14aa05d19518b7e3dc01b..94045ff6bf5f9f8038c7e4a47b69806a55487c36 100644 (file)
@@ -464,22 +464,18 @@ struct BlockBasedTableOptions {
   // Default: 256 KB (256 * 1024).
   size_t max_auto_readahead_size = 256 * 1024;
 
-  // If enabled, prepopulate warm/hot data blocks which are already in memory
-  // into block cache at the time of flush. On a flush, the data block that is
-  // in memory (in memtables) get flushed to the device. If using Direct IO,
-  // additional IO is incurred to read this data back into memory again, which
-  // is avoided by enabling this option. This further helps if the workload
-  // exhibits high temporal locality, where most of the reads go to recently
-  // written data. This also helps in case of Distributed FileSystem.
-  //
-  // Right now, this is enabled only for flush for data blocks. We plan to
-  // expand this option to cover compactions in the future and for other types
-  // of blocks.
+  // If enabled, prepopulate warm/hot blocks (data, uncompressed dict, index and
+  // filter blocks) which are already in memory into block cache at the time of
+  // flush. On a flush, the block that is in memory (in memtables) gets flushed
+  // to the device. If using Direct IO, additional IO is incurred to read this
+  // data back into memory again, which is avoided by enabling this option. This
+  // further helps if the workload exhibits high temporal locality, where most
+  // of the reads go to recently written data. This also helps in case of
+  // Distributed FileSystem.
   enum class PrepopulateBlockCache : char {
     // Disable prepopulate block cache.
     kDisable,
-    // Prepopulate data blocks during flush only. Plan to extend it to all block
-    // types.
+    // Prepopulate blocks during flush only.
     kFlushOnly,
   };
 
index bb8cfa14d62a1b64bd478d55f77824500b7f7e75..189a5dd9985bfaaf020f83d35dcbe47522243afa 100644 (file)
@@ -36,6 +36,7 @@
 #include "table/block_based/block_based_table_factory.h"
 #include "table/block_based/block_based_table_reader.h"
 #include "table/block_based/block_builder.h"
+#include "table/block_based/block_like_traits.h"
 #include "table/block_based/filter_block.h"
 #include "table/block_based/filter_policy_internal.h"
 #include "table/block_based/full_filter_block.h"
@@ -994,33 +995,34 @@ void BlockBasedTableBuilder::Flush() {
                                              r->get_offset());
     r->pc_rep->EmitBlock(block_rep);
   } else {
-    WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
+    WriteBlock(&r->data_block, &r->pending_handle, BlockType::kData);
   }
 }
 
 void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
                                         BlockHandle* handle,
-                                        bool is_data_block) {
+                                        BlockType block_type) {
   block->Finish();
   std::string raw_block_contents;
   block->SwapAndReset(raw_block_contents);
   if (rep_->state == Rep::State::kBuffered) {
-    assert(is_data_block);
+    assert(block_type == BlockType::kData);
     rep_->data_block_buffers.emplace_back(std::move(raw_block_contents));
     rep_->data_begin_offset += rep_->data_block_buffers.back().size();
     return;
   }
-  WriteBlock(raw_block_contents, handle, is_data_block);
+  WriteBlock(raw_block_contents, handle, block_type);
 }
 
 void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
                                         BlockHandle* handle,
-                                        bool is_data_block) {
+                                        BlockType block_type) {
   Rep* r = rep_;
   assert(r->state == Rep::State::kUnbuffered);
   Slice block_contents;
   CompressionType type;
   Status compress_status;
+  bool is_data_block = block_type == BlockType::kData;
   CompressAndVerifyBlock(raw_block_contents, is_data_block,
                          *(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
                          &(r->compressed_output), &(block_contents), &type,
@@ -1030,8 +1032,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
     return;
   }
 
-  WriteRawBlock(block_contents, type, handle, is_data_block,
-                &raw_block_contents);
+  WriteRawBlock(block_contents, type, handle, block_type, &raw_block_contents);
   r->compressed_output.clear();
   if (is_data_block) {
     if (r->filter_builder != nullptr) {
@@ -1189,9 +1190,10 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
 void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
                                            CompressionType type,
                                            BlockHandle* handle,
-                                           bool is_data_block,
+                                           BlockType block_type,
                                            const Slice* raw_block_contents) {
   Rep* r = rep_;
+  bool is_data_block = block_type == BlockType::kData;
   Status s = Status::OK();
   IOStatus io_s = IOStatus::OK();
   StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
@@ -1247,13 +1249,12 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
     io_s = r->file->Append(Slice(trailer, kBlockTrailerSize));
     if (io_s.ok()) {
       assert(s.ok());
-      if (is_data_block &&
-          r->table_options.prepopulate_block_cache ==
-              BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly) {
+      if (r->table_options.prepopulate_block_cache ==
+          BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly) {
         if (type == kNoCompression) {
-          s = InsertBlockInCache(block_contents, handle);
+          s = InsertBlockInCacheHelper(block_contents, handle, block_type);
         } else if (raw_block_contents != nullptr) {
-          s = InsertBlockInCache(*raw_block_contents, handle);
+          s = InsertBlockInCacheHelper(*raw_block_contents, handle, block_type);
         }
         if (!s.ok()) {
           r->SetStatus(s);
@@ -1328,10 +1329,8 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
     }
 
     r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size());
-
     WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
-                  &r->pending_handle, true /* is_data_block*/,
-                  &block_rep->contents);
+                  &r->pending_handle, BlockType::kData, &block_rep->contents);
     if (!ok()) {
       break;
     }
@@ -1460,8 +1459,30 @@ Status BlockBasedTableBuilder::InsertBlockInCompressedCache(
   return s;
 }
 
+Status BlockBasedTableBuilder::InsertBlockInCacheHelper(
+    const Slice& block_contents, const BlockHandle* handle,
+    BlockType block_type) {
+  Status s;
+  if (block_type == BlockType::kData || block_type == BlockType::kIndex) {
+    s = InsertBlockInCache<Block>(block_contents, handle, block_type);
+  } else if (block_type == BlockType::kFilter) {
+    if (rep_->filter_builder->IsBlockBased()) {
+      s = InsertBlockInCache<Block>(block_contents, handle, block_type);
+    } else {
+      s = InsertBlockInCache<ParsedFullFilterBlock>(block_contents, handle,
+                                                    block_type);
+    }
+  } else if (block_type == BlockType::kCompressionDictionary) {
+    s = InsertBlockInCache<UncompressionDict>(block_contents, handle,
+                                              block_type);
+  }
+  return s;
+}
+
+template <typename TBlocklike>
 Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
-                                                  const BlockHandle* handle) {
+                                                  const BlockHandle* handle,
+                                                  BlockType block_type) {
   // Uncompressed regular block cache
   Cache* block_cache = rep_->table_options.block_cache.get();
   Status s;
@@ -1479,15 +1500,25 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
 
     const size_t read_amp_bytes_per_bit =
         rep_->table_options.read_amp_bytes_per_bit;
-    Block* block = new Block(std::move(results), read_amp_bytes_per_bit);
-    size_t charge = block->ApproximateMemoryUsage();
-    s = block_cache->Insert(key, block, charge, &DeleteEntryCached<Block>);
-    if (s.ok()) {
-      BlockBasedTable::UpdateCacheInsertionMetrics(
-          BlockType::kData, nullptr /*get_context*/, charge,
-          s.IsOkOverwritten(), rep_->ioptions.stats);
-    } else {
-      RecordTick(rep_->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
+
+    TBlocklike* block_holder = BlocklikeTraits<TBlocklike>::Create(
+        std::move(results), read_amp_bytes_per_bit,
+        rep_->ioptions.statistics.get(),
+        false /*rep_->blocks_definitely_zstd_compressed*/,
+        rep_->table_options.filter_policy.get());
+
+    if (block_holder->own_bytes()) {
+      size_t charge = block_holder->ApproximateMemoryUsage();
+      s = block_cache->Insert(key, block_holder, charge,
+                              &DeleteEntryCached<TBlocklike>);
+
+      if (s.ok()) {
+        BlockBasedTable::UpdateCacheInsertionMetrics(
+            block_type, nullptr /*get_context*/, charge, s.IsOkOverwritten(),
+            rep_->ioptions.stats);
+      } else {
+        RecordTick(rep_->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
+      }
     }
   }
   return s;
@@ -1507,7 +1538,8 @@ void BlockBasedTableBuilder::WriteFilterBlock(
           rep_->filter_builder->Finish(filter_block_handle, &s);
       assert(s.ok() || s.IsIncomplete());
       rep_->props.filter_size += filter_content.size();
-      WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
+      WriteRawBlock(filter_content, kNoCompression, &filter_block_handle,
+                    BlockType::kFilter);
     }
   }
   if (ok() && !empty_filter_block) {
@@ -1541,7 +1573,7 @@ void BlockBasedTableBuilder::WriteIndexBlock(
   if (ok()) {
     for (const auto& item : index_blocks.meta_blocks) {
       BlockHandle block_handle;
-      WriteBlock(item.second, &block_handle, false /* is_data_block */);
+      WriteBlock(item.second, &block_handle, BlockType::kIndex);
       if (!ok()) {
         break;
       }
@@ -1550,10 +1582,11 @@ void BlockBasedTableBuilder::WriteIndexBlock(
   }
   if (ok()) {
     if (rep_->table_options.enable_index_compression) {
-      WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
+      WriteBlock(index_blocks.index_block_contents, index_block_handle,
+                 BlockType::kIndex);
     } else {
       WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
-                    index_block_handle);
+                    index_block_handle, BlockType::kIndex);
     }
   }
   // If there are more index partitions, finish them and write them out
@@ -1567,10 +1600,10 @@ void BlockBasedTableBuilder::WriteIndexBlock(
       }
       if (rep_->table_options.enable_index_compression) {
         WriteBlock(index_blocks.index_block_contents, index_block_handle,
-                   false);
+                   BlockType::kIndex);
       } else {
         WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
-                      index_block_handle);
+                      index_block_handle, BlockType::kIndex);
       }
       // The last index_block_handle will be for the partition index block
     }
@@ -1665,7 +1698,7 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
                                          &property_block_builder);
 
     WriteRawBlock(property_block_builder.Finish(), kNoCompression,
-                  &properties_block_handle);
+                  &properties_block_handle, BlockType::kProperties);
   }
   if (ok()) {
 #ifndef NDEBUG
@@ -1691,7 +1724,8 @@ void BlockBasedTableBuilder::WriteCompressionDictBlock(
     BlockHandle compression_dict_block_handle;
     if (ok()) {
       WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
-                    &compression_dict_block_handle);
+                    &compression_dict_block_handle,
+                    BlockType::kCompressionDictionary);
 #ifndef NDEBUG
       Slice compression_dict = rep_->compression_dict->GetRawDict();
       TEST_SYNC_POINT_CALLBACK(
@@ -1711,7 +1745,7 @@ void BlockBasedTableBuilder::WriteRangeDelBlock(
   if (ok() && !rep_->range_del_block.empty()) {
     BlockHandle range_del_block_handle;
     WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression,
-                  &range_del_block_handle);
+                  &range_del_block_handle, BlockType::kRangeDeletion);
     meta_index_builder->Add(kRangeDelBlock, range_del_block_handle);
   }
 }
@@ -1872,8 +1906,7 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
         }
         r->index_builder->OnKeyAdded(key);
       }
-      WriteBlock(Slice(data_block), &r->pending_handle,
-                 true /* is_data_block */);
+      WriteBlock(Slice(data_block), &r->pending_handle, BlockType::kData);
       if (ok() && i + 1 < r->data_block_buffers.size()) {
         assert(next_block_iter != nullptr);
         Slice first_key_in_next_block = next_block_iter->key();
@@ -1935,7 +1968,7 @@ Status BlockBasedTableBuilder::Finish() {
   if (ok()) {
     // flush the meta index block
     WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
-                  &metaindex_block_handle);
+                  &metaindex_block_handle, BlockType::kMetaIndex);
   }
   if (ok()) {
     WriteFooter(metaindex_block_handle, index_block_handle);
index 65be35b194864ce073146cce83d7c4741d2b1d38..ed91dbf3293444ba2898b1c807bbfc6443de1a0e 100644 (file)
@@ -9,6 +9,7 @@
 
 #pragma once
 #include <stdint.h>
+
 #include <limits>
 #include <string>
 #include <utility>
@@ -108,20 +109,27 @@ class BlockBasedTableBuilder : public TableBuilder {
   // Call block's Finish() method and then
   // - in buffered mode, buffer the uncompressed block contents.
   // - in unbuffered mode, write the compressed block contents to file.
-  void WriteBlock(BlockBuilder* block, BlockHandle* handle, bool is_data_block);
+  void WriteBlock(BlockBuilder* block, BlockHandle* handle,
+                  BlockType blocktype);
 
   // Compress and write block content to the file.
   void WriteBlock(const Slice& block_contents, BlockHandle* handle,
-                  bool is_data_block);
+                  BlockType block_type);
   // Directly write data to the file.
   void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle,
-                     bool is_data_block = false,
-                     const Slice* raw_data = nullptr);
+
+                     BlockType block_type, const Slice* raw_data = nullptr);
 
   void SetupCacheKeyPrefix(const TableBuilderOptions& tbo);
 
+  template <typename TBlocklike>
   Status InsertBlockInCache(const Slice& block_contents,
-                            const BlockHandle* handle);
+                            const BlockHandle* handle, BlockType block_type);
+
+  Status InsertBlockInCacheHelper(const Slice& block_contents,
+                                  const BlockHandle* handle,
+                                  BlockType block_type);
+
   Status InsertBlockInCompressedCache(const Slice& block_contents,
                                       const CompressionType type,
                                       const BlockHandle* handle);