// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include <stddef.h>
#include <stdio.h>
#include <algorithm>
ASSERT_FALSE(iter->UpperBoundCheckResult() == IterBoundCheck::kOutOfBound);
}
+TEST_P(
+ BlockBasedTableTest,
+ IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnBuilderFinish) {
+ constexpr std::size_t kSizeDummyEntry = 256 * 1024;
+ constexpr std::size_t kMetaDataChargeOverhead = 10000;
+ constexpr std::size_t kCacheCapacity = 8 * 1024 * 1024;
+ constexpr std::size_t kMaxDictBytes = 1024;
+ constexpr std::size_t kMaxDictBufferBytes = 1024;
+
+ BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+ LRUCacheOptions lo;
+ lo.capacity = kCacheCapacity;
+ lo.num_shard_bits = 0; // 2^0 shard
+ lo.strict_capacity_limit = true;
+ std::shared_ptr<Cache> cache(NewLRUCache(lo));
+ table_options.block_cache = cache;
+ table_options.flush_block_policy_factory =
+ std::make_shared<FlushBlockEveryKeyPolicyFactory>();
+
+ Options options;
+ options.compression = kSnappyCompression;
+ options.compression_opts.max_dict_bytes = kMaxDictBytes;
+ options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+ test::StringSink* sink = new test::StringSink();
+ std::unique_ptr<FSWritableFile> holder(sink);
+ std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+ std::move(holder), "test_file_name", FileOptions()));
+
+ ImmutableOptions ioptions(options);
+ MutableCFOptions moptions(options);
+ InternalKeyComparator ikc(options.comparator);
+ IntTblPropCollectorFactories int_tbl_prop_collector_factories;
+
+ std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
+ TableBuilderOptions(ioptions, moptions, ikc,
+ &int_tbl_prop_collector_factories, kSnappyCompression,
+ options.compression_opts, kUnknownColumnFamily,
+ "test_cf", -1 /* level */),
+ file_writer.get()));
+
+ std::string key1 = "key1";
+ std::string value1 = "val1";
+ InternalKey ik1(key1, 0 /* sequnce number */, kTypeValue);
+ // Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy
+ // therefore won't trigger any data block's buffering
+ builder->Add(ik1.Encode(), value1);
+ ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+
+ std::string key2 = "key2";
+ std::string value2 = "val2";
+ InternalKey ik2(key2, 1 /* sequnce number */, kTypeValue);
+ // Adding the second key will trigger a flush of the last data block (the one
+ // containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger
+ // buffering of that data block.
+ builder->Add(ik2.Encode(), value2);
+ // Cache reservation will increase for last buffered data block (the one
+ // containing key1 and value1) since the buffer limit is not exceeded after
+ // that buffering and the cache will not be full after this reservation
+ EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
+ EXPECT_LT(cache->GetPinnedUsage(),
+ 1 * kSizeDummyEntry + kMetaDataChargeOverhead);
+
+ ASSERT_OK(builder->Finish());
+ EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+}
+
+TEST_P(
+ BlockBasedTableTest,
+ IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnBufferLimitExceed) {
+ constexpr std::size_t kSizeDummyEntry = 256 * 1024;
+ constexpr std::size_t kMetaDataChargeOverhead = 10000;
+ constexpr std::size_t kCacheCapacity = 8 * 1024 * 1024;
+ constexpr std::size_t kMaxDictBytes = 1024;
+ constexpr std::size_t kMaxDictBufferBytes = 2 * kSizeDummyEntry;
+
+ BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+ LRUCacheOptions lo;
+ lo.capacity = kCacheCapacity;
+ lo.num_shard_bits = 0; // 2^0 shard
+ lo.strict_capacity_limit = true;
+ std::shared_ptr<Cache> cache(NewLRUCache(lo));
+ table_options.block_cache = cache;
+ table_options.flush_block_policy_factory =
+ std::make_shared<FlushBlockEveryKeyPolicyFactory>();
+
+ Options options;
+ options.compression = kSnappyCompression;
+ options.compression_opts.max_dict_bytes = kMaxDictBytes;
+ options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+ test::StringSink* sink = new test::StringSink();
+ std::unique_ptr<FSWritableFile> holder(sink);
+ std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+ std::move(holder), "test_file_name", FileOptions()));
+
+ ImmutableOptions ioptions(options);
+ MutableCFOptions moptions(options);
+ InternalKeyComparator ikc(options.comparator);
+ IntTblPropCollectorFactories int_tbl_prop_collector_factories;
+
+ std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
+ TableBuilderOptions(ioptions, moptions, ikc,
+ &int_tbl_prop_collector_factories, kSnappyCompression,
+ options.compression_opts, kUnknownColumnFamily,
+ "test_cf", -1 /* level */),
+ file_writer.get()));
+
+ std::string key1 = "key1";
+ std::string value1(kSizeDummyEntry, '0');
+ InternalKey ik1(key1, 0 /* sequnce number */, kTypeValue);
+ // Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy
+ // therefore won't trigger any data block's buffering
+ builder->Add(ik1.Encode(), value1);
+ ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+
+ std::string key2 = "key2";
+ std::string value2(kSizeDummyEntry, '0');
+ InternalKey ik2(key2, 1 /* sequnce number */, kTypeValue);
+ // Adding the second key will trigger a flush of the last data block (the one
+ // containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger
+ // buffering of the last data block.
+ builder->Add(ik2.Encode(), value2);
+ // Cache reservation will increase for last buffered data block (the one
+ // containing key1 and value1) since the buffer limit is not exceeded after
+ // the buffering and the cache will not be full after this reservation
+ EXPECT_GE(cache->GetPinnedUsage(), 2 * kSizeDummyEntry);
+ EXPECT_LT(cache->GetPinnedUsage(),
+ 2 * kSizeDummyEntry + kMetaDataChargeOverhead);
+
+ std::string key3 = "key3";
+ std::string value3 = "val3";
+ InternalKey ik3(key3, 2 /* sequnce number */, kTypeValue);
+ // Adding the third key will trigger a flush of the last data block (the one
+ // containing key2 and value2) by FlushBlockEveryKeyPolicy and hence trigger
+ // buffering of the last data block.
+ builder->Add(ik3.Encode(), value3);
+ // Cache reservation will decrease since the buffer limit is now exceeded
+ // after the last buffering and EnterUnbuffered() is triggered
+ EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+
+ ASSERT_OK(builder->Finish());
+ EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+}
+
+TEST_P(
+ BlockBasedTableTest,
+ IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnCacheFull) {
+ constexpr std::size_t kSizeDummyEntry = 256 * 1024;
+ constexpr std::size_t kMetaDataChargeOverhead = 10000;
+ // A small kCacheCapacity is chosen so that increase cache reservation for
+ // buffering two data blocks, each containing key1/value1, key2/a big
+ // value2, will cause cache full
+ constexpr std::size_t kCacheCapacity =
+ 1 * kSizeDummyEntry + kSizeDummyEntry / 2;
+ constexpr std::size_t kMaxDictBytes = 1024;
+ // A big kMaxDictBufferBytes is chosen so that adding a big key value pair
+ // (key2, value2) won't exceed the buffer limit
+ constexpr std::size_t kMaxDictBufferBytes = 1024 * 1024 * 1024;
+
+ BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+ LRUCacheOptions lo;
+ lo.capacity = kCacheCapacity;
+ lo.num_shard_bits = 0; // 2^0 shard
+ lo.strict_capacity_limit = true;
+ std::shared_ptr<Cache> cache(NewLRUCache(lo));
+ table_options.block_cache = cache;
+ table_options.flush_block_policy_factory =
+ std::make_shared<FlushBlockEveryKeyPolicyFactory>();
+
+ Options options;
+ options.compression = kSnappyCompression;
+ options.compression_opts.max_dict_bytes = kMaxDictBytes;
+ options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+ test::StringSink* sink = new test::StringSink();
+ std::unique_ptr<FSWritableFile> holder(sink);
+ std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+ std::move(holder), "test_file_name", FileOptions()));
+
+ ImmutableOptions ioptions(options);
+ MutableCFOptions moptions(options);
+ InternalKeyComparator ikc(options.comparator);
+ IntTblPropCollectorFactories int_tbl_prop_collector_factories;
+
+ std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
+ TableBuilderOptions(ioptions, moptions, ikc,
+ &int_tbl_prop_collector_factories, kSnappyCompression,
+ options.compression_opts, kUnknownColumnFamily,
+ "test_cf", -1 /* level */),
+ file_writer.get()));
+
+ std::string key1 = "key1";
+ std::string value1 = "val1";
+ InternalKey ik1(key1, 0 /* sequnce number */, kTypeValue);
+ // Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy
+ // therefore won't trigger any data block's buffering
+ builder->Add(ik1.Encode(), value1);
+ ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+
+ std::string key2 = "key2";
+ std::string value2(kSizeDummyEntry, '0');
+ InternalKey ik2(key2, 1 /* sequnce number */, kTypeValue);
+ // Adding the second key will trigger a flush of the last data block (the one
+ // containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger
+ // buffering of the last data block.
+ builder->Add(ik2.Encode(), value2);
+ // Cache reservation will increase for the last buffered data block (the one
+ // containing key1 and value1) since the buffer limit is not exceeded after
+ // the buffering and the cache will not be full after this reservation
+ EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
+ EXPECT_LT(cache->GetPinnedUsage(),
+ 1 * kSizeDummyEntry + kMetaDataChargeOverhead);
+
+ std::string key3 = "key3";
+ std::string value3 = "value3";
+ InternalKey ik3(key3, 2 /* sequnce number */, kTypeValue);
+ // Adding the third key will trigger a flush of the last data block (the one
+ // containing key2 and value2) by FlushBlockEveryKeyPolicy and hence trigger
+ // buffering of the last data block.
+ builder->Add(ik3.Encode(), value3);
+ // Cache reservation will decrease since the cache is now full after
+ // increasing reservation for the last buffered block and EnterUnbuffered() is
+ // triggered
+ EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+
+ ASSERT_OK(builder->Finish());
+ EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+}
+
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {