]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Account for dictionary-building buffer in global memory limit (#8428)
authorHui Xiao <huixiao@fb.com>
Wed, 8 Sep 2021 19:34:35 +0000 (12:34 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Wed, 8 Sep 2021 19:35:46 +0000 (12:35 -0700)
Summary:
Context:
Some data blocks are temporarily buffered in memory in BlockBasedTableBuilder for building compression dictionary used in data block compression. Currently this memory usage is not counted toward our global memory usage utilizing block cache capacity. To improve that, this PR charges that memory usage into the block cache to achieve better memory tracking and limiting.

- Reserve memory in block cache for buffered data blocks that are used to build a compression dictionary
- Release all the memory associated with buffering the data blocks mentioned above in EnterUnbuffered(), which is called when (a) buffer limit is exceeded after buffering OR (b) the block cache becomes full after reservation OR (c) BlockBasedTableBuilder calls Finish()

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8428

Test Plan:
- Passing existing unit tests
- Passing new unit tests

Reviewed By: ajkr

Differential Revision: D30755305

Pulled By: hx235

fbshipit-source-id: 6e66665020b775154a94c4c5e0f2adaeaff13981

cache/cache_entry_roles.cc
cache/cache_entry_roles.h
cache/cache_reservation_manager.cc
table/block_based/block_based_table_builder.cc
table/table_test.cc

index dbc7120639803b9d53ca8579e186dc823c9fbb49..4416b7c2f40ec7c8e3364b65dbd8e78f19124549 100644 (file)
@@ -19,6 +19,7 @@ std::array<const char*, kNumCacheEntryRoles> kCacheEntryRoleToCamelString{{
     "IndexBlock",
     "OtherBlock",
     "WriteBuffer",
+    "CompressionDictionaryBuildingBuffer",
     "Misc",
 }};
 
@@ -30,6 +31,7 @@ std::array<const char*, kNumCacheEntryRoles> kCacheEntryRoleToHyphenString{{
     "index-block",
     "other-block",
     "write-buffer",
+    "compression-dictionary-building-buffer",
     "misc",
 }};
 
index 22148e00c41f2eae6a72ae1b170d814640df8044..9a6a2ad245e3832ad8232c5dd2b6813559f217ae 100644 (file)
@@ -14,6 +14,8 @@
 namespace ROCKSDB_NAMESPACE {
 
 // Classifications of block cache entries, for reporting statistics
+// Adding new enum to this class requires corresponding updates to
+// kCacheEntryRoleToCamelString and kCacheEntryRoleToHyphenString
 enum class CacheEntryRole {
   // Block-based table data block
   kDataBlock,
@@ -29,6 +31,9 @@ enum class CacheEntryRole {
   kOtherBlock,
   // WriteBufferManager reservations to account for memtable usage
   kWriteBuffer,
+  // BlockBasedTableBuilder reservations to account for
+  // compression dictionary building buffer's memory usage
+  kCompressionDictionaryBuildingBuffer,
   // Default bucket, for miscellaneous cache entries. Do not use for
   // entries that could potentially add up to large usage.
   kMisc,
index d6f62d647e0b533d010c2e48aedffa4152e0c194..6a00748718009d44ca79976b9a8914cf4ca36804 100644 (file)
@@ -69,6 +69,9 @@ Status CacheReservationManager::UpdateCacheReservation(
 // This makes it possible to keep the template definitions in the .cc file.
 template Status CacheReservationManager::UpdateCacheReservation<
     CacheEntryRole::kWriteBuffer>(std::size_t new_mem_used);
+template Status CacheReservationManager::UpdateCacheReservation<
+    CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
+    std::size_t new_mem_used);
 // For cache reservation manager unit tests
 template Status CacheReservationManager::UpdateCacheReservation<
     CacheEntryRole::kMisc>(std::size_t new_mem_used);
index be2cae968cbc69518819d4c6371336a75365f290..d56530965bbdc8ef9a9a2cf5a183435ba37c4d5d 100644 (file)
@@ -21,6 +21,8 @@
 #include <unordered_map>
 #include <utility>
 
+#include "cache/cache_entry_roles.h"
+#include "cache/cache_reservation_manager.h"
 #include "db/dbformat.h"
 #include "index_builder.h"
 #include "memory/memory_allocator.h"
@@ -312,7 +314,7 @@ struct BlockBasedTableBuilder::Rep {
   // `kBuffered` state is allowed only as long as the buffering of uncompressed
   // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
   uint64_t buffer_limit;
-
+  std::unique_ptr<CacheReservationManager> cache_rev_mng;
   const bool use_delta_encoding_for_index_values;
   std::unique_ptr<FilterBlockBuilder> filter_builder;
   char cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
@@ -444,6 +446,12 @@ struct BlockBasedTableBuilder::Rep {
       buffer_limit = std::min(tbo.target_file_size,
                               compression_opts.max_dict_buffer_bytes);
     }
+    if (table_options.no_block_cache) {
+      cache_rev_mng.reset(nullptr);
+    } else {
+      cache_rev_mng.reset(
+          new CacheReservationManager(table_options.block_cache));
+    }
     for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
       compression_ctxs[i].reset(new CompressionContext(compression_type));
     }
@@ -896,10 +904,24 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
       assert(!r->data_block.empty());
       r->first_key_in_next_block = &key;
       Flush();
+      if (r->state == Rep::State::kBuffered) {
+        bool exceeds_buffer_limit =
+            (r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
+        bool is_cache_full = false;
+
+        // Increase cache reservation for the last buffered data block
+        // only if the block is not going to be unbuffered immediately
+        // and there exists a cache reservation manager
+        if (!exceeds_buffer_limit && r->cache_rev_mng != nullptr) {
+          Status s = r->cache_rev_mng->UpdateCacheReservation<
+              CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
+              r->data_begin_offset);
+          is_cache_full = s.IsIncomplete();
+        }
 
-      if (r->state == Rep::State::kBuffered && r->buffer_limit != 0 &&
-          r->data_begin_offset > r->buffer_limit) {
-        EnterUnbuffered();
+        if (exceeds_buffer_limit || is_cache_full) {
+          EnterUnbuffered();
+        }
       }
 
       // Add item to index block.
@@ -1910,10 +1932,16 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
                                         r->pending_handle);
       }
     }
-
     std::swap(iter, next_block_iter);
   }
   r->data_block_buffers.clear();
+  r->data_begin_offset = 0;
+  if (r->cache_rev_mng != nullptr) {
+    Status s = r->cache_rev_mng->UpdateCacheReservation<
+        CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
+        r->data_begin_offset);
+    s.PermitUncheckedError();
+  }
 }
 
 Status BlockBasedTableBuilder::Finish() {
index f805418af72a5483197b024df7b612af13328dc7..ad07531863aebaae48347743fe3519d3e8c10674 100644 (file)
@@ -7,6 +7,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
+#include <stddef.h>
 #include <stdio.h>
 
 #include <algorithm>
@@ -4746,6 +4747,239 @@ TEST_P(BlockBasedTableTest, OutOfBoundOnNext) {
   ASSERT_FALSE(iter->UpperBoundCheckResult() == IterBoundCheck::kOutOfBound);
 }
 
+TEST_P(
+    BlockBasedTableTest,
+    IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnBuilderFinish) {
+  constexpr std::size_t kSizeDummyEntry = 256 * 1024;
+  constexpr std::size_t kMetaDataChargeOverhead = 10000;
+  constexpr std::size_t kCacheCapacity = 8 * 1024 * 1024;
+  constexpr std::size_t kMaxDictBytes = 1024;
+  constexpr std::size_t kMaxDictBufferBytes = 1024;
+
+  BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+  LRUCacheOptions lo;
+  lo.capacity = kCacheCapacity;
+  lo.num_shard_bits = 0;  // 2^0 shard
+  lo.strict_capacity_limit = true;
+  std::shared_ptr<Cache> cache(NewLRUCache(lo));
+  table_options.block_cache = cache;
+  table_options.flush_block_policy_factory =
+      std::make_shared<FlushBlockEveryKeyPolicyFactory>();
+
+  Options options;
+  options.compression = kSnappyCompression;
+  options.compression_opts.max_dict_bytes = kMaxDictBytes;
+  options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+  test::StringSink* sink = new test::StringSink();
+  std::unique_ptr<FSWritableFile> holder(sink);
+  std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+      std::move(holder), "test_file_name", FileOptions()));
+
+  ImmutableOptions ioptions(options);
+  MutableCFOptions moptions(options);
+  InternalKeyComparator ikc(options.comparator);
+  IntTblPropCollectorFactories int_tbl_prop_collector_factories;
+
+  std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
+      TableBuilderOptions(ioptions, moptions, ikc,
+                          &int_tbl_prop_collector_factories, kSnappyCompression,
+                          options.compression_opts, kUnknownColumnFamily,
+                          "test_cf", -1 /* level */),
+      file_writer.get()));
+
+  std::string key1 = "key1";
+  std::string value1 = "val1";
+  InternalKey ik1(key1, 0 /* sequnce number */, kTypeValue);
+  // Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy
+  // therefore won't trigger any data block's buffering
+  builder->Add(ik1.Encode(), value1);
+  ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+
+  std::string key2 = "key2";
+  std::string value2 = "val2";
+  InternalKey ik2(key2, 1 /* sequnce number */, kTypeValue);
+  // Adding the second key will trigger a flush of the last data block (the one
+  // containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger
+  // buffering of that data block.
+  builder->Add(ik2.Encode(), value2);
+  // Cache reservation will increase for last buffered data block (the one
+  // containing key1 and value1) since the buffer limit is not exceeded after
+  // that buffering and the cache will not be full after this reservation
+  EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
+  EXPECT_LT(cache->GetPinnedUsage(),
+            1 * kSizeDummyEntry + kMetaDataChargeOverhead);
+
+  ASSERT_OK(builder->Finish());
+  EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+}
+
+TEST_P(
+    BlockBasedTableTest,
+    IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnBufferLimitExceed) {
+  constexpr std::size_t kSizeDummyEntry = 256 * 1024;
+  constexpr std::size_t kMetaDataChargeOverhead = 10000;
+  constexpr std::size_t kCacheCapacity = 8 * 1024 * 1024;
+  constexpr std::size_t kMaxDictBytes = 1024;
+  constexpr std::size_t kMaxDictBufferBytes = 2 * kSizeDummyEntry;
+
+  BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+  LRUCacheOptions lo;
+  lo.capacity = kCacheCapacity;
+  lo.num_shard_bits = 0;  // 2^0 shard
+  lo.strict_capacity_limit = true;
+  std::shared_ptr<Cache> cache(NewLRUCache(lo));
+  table_options.block_cache = cache;
+  table_options.flush_block_policy_factory =
+      std::make_shared<FlushBlockEveryKeyPolicyFactory>();
+
+  Options options;
+  options.compression = kSnappyCompression;
+  options.compression_opts.max_dict_bytes = kMaxDictBytes;
+  options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+  test::StringSink* sink = new test::StringSink();
+  std::unique_ptr<FSWritableFile> holder(sink);
+  std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+      std::move(holder), "test_file_name", FileOptions()));
+
+  ImmutableOptions ioptions(options);
+  MutableCFOptions moptions(options);
+  InternalKeyComparator ikc(options.comparator);
+  IntTblPropCollectorFactories int_tbl_prop_collector_factories;
+
+  std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
+      TableBuilderOptions(ioptions, moptions, ikc,
+                          &int_tbl_prop_collector_factories, kSnappyCompression,
+                          options.compression_opts, kUnknownColumnFamily,
+                          "test_cf", -1 /* level */),
+      file_writer.get()));
+
+  std::string key1 = "key1";
+  std::string value1(kSizeDummyEntry, '0');
+  InternalKey ik1(key1, 0 /* sequnce number */, kTypeValue);
+  // Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy
+  // therefore won't trigger any data block's buffering
+  builder->Add(ik1.Encode(), value1);
+  ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+
+  std::string key2 = "key2";
+  std::string value2(kSizeDummyEntry, '0');
+  InternalKey ik2(key2, 1 /* sequnce number */, kTypeValue);
+  // Adding the second key will trigger a flush of the last data block (the one
+  // containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger
+  // buffering of the last data block.
+  builder->Add(ik2.Encode(), value2);
+  // Cache reservation will increase for last buffered data block (the one
+  // containing key1 and value1) since the buffer limit is not exceeded after
+  // the buffering and the cache will not be full after this reservation
+  EXPECT_GE(cache->GetPinnedUsage(), 2 * kSizeDummyEntry);
+  EXPECT_LT(cache->GetPinnedUsage(),
+            2 * kSizeDummyEntry + kMetaDataChargeOverhead);
+
+  std::string key3 = "key3";
+  std::string value3 = "val3";
+  InternalKey ik3(key3, 2 /* sequnce number */, kTypeValue);
+  // Adding the third key will trigger a flush of the last data block (the one
+  // containing key2 and value2) by FlushBlockEveryKeyPolicy and hence trigger
+  // buffering of the last data block.
+  builder->Add(ik3.Encode(), value3);
+  // Cache reservation will decrease since the buffer limit is now exceeded
+  // after the last buffering and EnterUnbuffered() is triggered
+  EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+
+  ASSERT_OK(builder->Finish());
+  EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+}
+
+TEST_P(
+    BlockBasedTableTest,
+    IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnCacheFull) {
+  constexpr std::size_t kSizeDummyEntry = 256 * 1024;
+  constexpr std::size_t kMetaDataChargeOverhead = 10000;
+  // A small kCacheCapacity is chosen so that increase cache reservation for
+  // buffering two data blocks, each containing key1/value1, key2/a big
+  // value2, will cause cache full
+  constexpr std::size_t kCacheCapacity =
+      1 * kSizeDummyEntry + kSizeDummyEntry / 2;
+  constexpr std::size_t kMaxDictBytes = 1024;
+  // A big kMaxDictBufferBytes is chosen so that adding a big key value pair
+  // (key2, value2) won't exceed the buffer limit
+  constexpr std::size_t kMaxDictBufferBytes = 1024 * 1024 * 1024;
+
+  BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+  LRUCacheOptions lo;
+  lo.capacity = kCacheCapacity;
+  lo.num_shard_bits = 0;  // 2^0 shard
+  lo.strict_capacity_limit = true;
+  std::shared_ptr<Cache> cache(NewLRUCache(lo));
+  table_options.block_cache = cache;
+  table_options.flush_block_policy_factory =
+      std::make_shared<FlushBlockEveryKeyPolicyFactory>();
+
+  Options options;
+  options.compression = kSnappyCompression;
+  options.compression_opts.max_dict_bytes = kMaxDictBytes;
+  options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+  test::StringSink* sink = new test::StringSink();
+  std::unique_ptr<FSWritableFile> holder(sink);
+  std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+      std::move(holder), "test_file_name", FileOptions()));
+
+  ImmutableOptions ioptions(options);
+  MutableCFOptions moptions(options);
+  InternalKeyComparator ikc(options.comparator);
+  IntTblPropCollectorFactories int_tbl_prop_collector_factories;
+
+  std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
+      TableBuilderOptions(ioptions, moptions, ikc,
+                          &int_tbl_prop_collector_factories, kSnappyCompression,
+                          options.compression_opts, kUnknownColumnFamily,
+                          "test_cf", -1 /* level */),
+      file_writer.get()));
+
+  std::string key1 = "key1";
+  std::string value1 = "val1";
+  InternalKey ik1(key1, 0 /* sequnce number */, kTypeValue);
+  // Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy
+  // therefore won't trigger any data block's buffering
+  builder->Add(ik1.Encode(), value1);
+  ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+
+  std::string key2 = "key2";
+  std::string value2(kSizeDummyEntry, '0');
+  InternalKey ik2(key2, 1 /* sequnce number */, kTypeValue);
+  // Adding the second key will trigger a flush of the last data block (the one
+  // containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger
+  // buffering of the last data block.
+  builder->Add(ik2.Encode(), value2);
+  // Cache reservation will increase for the last buffered data block (the one
+  // containing key1 and value1) since the buffer limit is not exceeded after
+  // the buffering and the cache will not be full after this reservation
+  EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
+  EXPECT_LT(cache->GetPinnedUsage(),
+            1 * kSizeDummyEntry + kMetaDataChargeOverhead);
+
+  std::string key3 = "key3";
+  std::string value3 = "value3";
+  InternalKey ik3(key3, 2 /* sequnce number */, kTypeValue);
+  // Adding the third key will trigger a flush of the last data block (the one
+  // containing key2 and value2) by FlushBlockEveryKeyPolicy and hence trigger
+  // buffering of the last data block.
+  builder->Add(ik3.Encode(), value3);
+  // Cache reservation will decrease since the cache is now full after
+  // increasing reservation for the last buffered block and EnterUnbuffered() is
+  // triggered
+  EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+
+  ASSERT_OK(builder->Finish());
+  EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {