### New Features
* TransactionOptions::skip_concurrency_control allows pessimistic transactions to skip the overhead of concurrency control. It can be used to optimize certain transactions or to speed up recovery.
+* Introduced CacheAllocator, which lets the user specify a custom allocator for block cache memory (see the usage sketch below).
### Bug Fixes
* Avoid creating empty SSTs and subsequently deleting them in certain cases during compaction.
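To make the new option concrete, here is a minimal sketch of plugging a custom
allocator into the block cache. It uses only the public API added in this
change; CountingAllocator is a hypothetical example implementation, not part
of the patch.

#include <atomic>
#include <cstddef>
#include <memory>
#include "rocksdb/cache.h"
#include "rocksdb/cache_allocator.h"

class CountingAllocator : public rocksdb::CacheAllocator {
 public:
  const char* Name() const override { return "CountingAllocator"; }
  void* Allocate(size_t size) override {
    bytes_allocated_ += size;
    return ::operator new(size);
  }
  void Deallocate(void* p) override { ::operator delete(p); }

  std::atomic<size_t> bytes_allocated_{0};
};

// Build an LRU block cache that allocates through CountingAllocator.
// Assign the result to BlockBasedTableOptions::block_cache before opening
// the DB so block allocations go through the custom allocator.
std::shared_ptr<rocksdb::Cache> MakeCountingBlockCache() {
  rocksdb::LRUCacheOptions opts;
  opts.capacity = 8 * 1024 * 1024;  // 8MB
  opts.num_shard_bits = 4;
  opts.cache_allocator = std::make_shared<CountingAllocator>();
  return rocksdb::NewLRUCache(opts);
}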
}
LRUCache::LRUCache(size_t capacity, int num_shard_bits,
- bool strict_capacity_limit, double high_pri_pool_ratio)
- : ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
+ bool strict_capacity_limit, double high_pri_pool_ratio,
+ std::shared_ptr<CacheAllocator> allocator)
+ : ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
+ std::move(allocator)) {
num_shards_ = 1 << num_shard_bits;
shards_ = reinterpret_cast<LRUCacheShard*>(
port::cacheline_aligned_alloc(sizeof(LRUCacheShard) * num_shards_));
std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits,
cache_opts.strict_capacity_limit,
- cache_opts.high_pri_pool_ratio);
+ cache_opts.high_pri_pool_ratio,
+ cache_opts.cache_allocator);
}
-std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
- bool strict_capacity_limit,
- double high_pri_pool_ratio) {
+std::shared_ptr<Cache> NewLRUCache(
+ size_t capacity, int num_shard_bits, bool strict_capacity_limit,
+ double high_pri_pool_ratio,
+ std::shared_ptr<CacheAllocator> cache_allocator) {
if (num_shard_bits >= 20) {
return nullptr; // the cache cannot be sharded into too many fine pieces
}
num_shard_bits = GetDefaultCacheShardBits(capacity);
}
return std::make_shared<LRUCache>(capacity, num_shard_bits,
- strict_capacity_limit, high_pri_pool_ratio);
+ strict_capacity_limit, high_pri_pool_ratio,
+ std::move(cache_allocator));
}
} // namespace rocksdb
class LRUCache : public ShardedCache {
public:
LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
- double high_pri_pool_ratio);
+ double high_pri_pool_ratio,
+ std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
virtual ~LRUCache();
virtual const char* Name() const override { return "LRUCache"; }
virtual CacheShard* GetShard(int shard) override;
namespace rocksdb {
ShardedCache::ShardedCache(size_t capacity, int num_shard_bits,
- bool strict_capacity_limit)
- : num_shard_bits_(num_shard_bits),
+ bool strict_capacity_limit,
+ std::shared_ptr<CacheAllocator> allocator)
+ : Cache(std::move(allocator)),
+ num_shard_bits_(num_shard_bits),
capacity_(capacity),
strict_capacity_limit_(strict_capacity_limit),
last_id_(1) {}
strict_capacity_limit_);
ret.append(buffer);
}
+ snprintf(buffer, kBufferSize, " cache_allocator : %s\n",
+ cache_allocator() ? cache_allocator()->Name() : "None");
+ ret.append(buffer);
ret.append(GetShard(0)->GetPrintableOptions());
return ret;
}
// Keys are sharded by the highest num_shard_bits bits of hash value.
class ShardedCache : public Cache {
public:
- ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit);
+ ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
+ std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
virtual ~ShardedCache() = default;
virtual const char* Name() const override = 0;
virtual CacheShard* GetShard(int shard) = 0;
#include <stdint.h>
#include <memory>
#include <string>
+#include "rocksdb/cache_allocator.h"
#include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
// BlockBasedTableOptions::cache_index_and_filter_blocks_with_high_priority.
double high_pri_pool_ratio = 0.0;
+  // If non-nullptr, this allocator is used instead of the system allocator
+  // when allocating memory for cache blocks. Set this field before the cache
+  // is first used!
+ std::shared_ptr<CacheAllocator> cache_allocator;
+
LRUCacheOptions() {}
LRUCacheOptions(size_t _capacity, int _num_shard_bits,
- bool _strict_capacity_limit, double _high_pri_pool_ratio)
+ bool _strict_capacity_limit, double _high_pri_pool_ratio,
+ std::shared_ptr<CacheAllocator> _cache_allocator = nullptr)
: capacity(_capacity),
num_shard_bits(_num_shard_bits),
strict_capacity_limit(_strict_capacity_limit),
- high_pri_pool_ratio(_high_pri_pool_ratio) {}
+ high_pri_pool_ratio(_high_pri_pool_ratio),
+ cache_allocator(std::move(_cache_allocator)) {}
};
// Create a new cache with a fixed size capacity. The cache is sharded
// high_pri_pool_pct.
// num_shard_bits = -1 means it is automatically determined: every shard
// will be at least 512KB and number of shard bits will not exceed 6.
-extern std::shared_ptr<Cache> NewLRUCache(size_t capacity,
- int num_shard_bits = -1,
- bool strict_capacity_limit = false,
- double high_pri_pool_ratio = 0.0);
+extern std::shared_ptr<Cache> NewLRUCache(
+ size_t capacity, int num_shard_bits = -1,
+ bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.0,
+ std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts);
int num_shard_bits = -1,
bool strict_capacity_limit = false);
+
class Cache {
public:
// Depending on implementation, cache entries with high priority could be less
// likely to get evicted than low priority entries.
enum class Priority { HIGH, LOW };
- Cache() {}
+ Cache(std::shared_ptr<CacheAllocator> allocator = nullptr)
+ : cache_allocator_(std::move(allocator)) {}
// Destroys all existing entries by calling the "deleter"
// function that was passed via the Insert() function.
virtual void TEST_mark_as_data_block(const Slice& /*key*/,
size_t /*charge*/) {}
+ CacheAllocator* cache_allocator() const { return cache_allocator_.get(); }
+
private:
// No copying allowed
Cache(const Cache&);
Cache& operator=(const Cache&);
+
+ std::shared_ptr<CacheAllocator> cache_allocator_;
};
} // namespace rocksdb
--- /dev/null
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <cstddef>
+
+namespace rocksdb {
+
+// CacheAllocator is an interface that a client can implement to supply custom
+// cache allocation and deallocation methods. See rocksdb/cache.h for more
+// information.
+// All methods should be thread-safe.
+class CacheAllocator {
+ public:
+ virtual ~CacheAllocator() = default;
+
+  // Name of the cache allocator, printed in the log.
+  virtual const char* Name() const = 0;
+
+  // Allocate a block of at least `size` bytes.
+  virtual void* Allocate(size_t size) = 0;
+  // Deallocate a previously allocated block.
+  virtual void Deallocate(void* p) = 0;
+  // Returns the memory size of the block allocated at p. The default
+  // implementation, which simply returns the requested allocation_size, is
+  // fine in most cases.
+  virtual size_t UsableSize(void* /*p*/, size_t allocation_size) const {
+    return allocation_size;
+  }
+};
+
+}  // namespace rocksdb
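For allocators that can report the true size of an allocation, overriding
UsableSize lets the block cache charge entries for the memory actually
reserved rather than the requested size (see BlockContents::usable_size
below). A sketch assuming a plain malloc-backed allocator and glibc's
malloc_usable_size; MallocCacheAllocator is a hypothetical example:

#include <cstdlib>
#include <malloc.h>  // malloc_usable_size (glibc)

class MallocCacheAllocator : public rocksdb::CacheAllocator {
 public:
  const char* Name() const override { return "MallocCacheAllocator"; }
  void* Allocate(size_t size) override { return std::malloc(size); }
  void Deallocate(void* p) override { std::free(p); }
  size_t UsableSize(void* p, size_t /*allocation_size*/) const override {
    // Report what malloc actually reserved, which may exceed the request.
    return malloc_usable_size(p);
  }
};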
#include "table/full_filter_block.h"
#include "table/table_builder.h"
+#include "util/cache_allocator.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
size_t size = block_contents.size();
- std::unique_ptr<char[]> ubuf(new char[size + 1]);
+ auto ubuf =
+ AllocateBlock(size + 1, block_cache_compressed->cache_allocator());
memcpy(ubuf.get(), block_contents.data(), size);
ubuf[size] = type;
std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
- size_t read_amp_bytes_per_bit, const bool immortal_file = false) {
+ size_t read_amp_bytes_per_bit, CacheAllocator* allocator = nullptr,
+ const bool immortal_file = false) {
BlockContents contents;
- BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
- &contents, ioptions, do_uncompress,
- compression_dict, cache_options, immortal_file);
+ BlockFetcher block_fetcher(
+ file, prefetch_buffer, footer, options, handle, &contents, ioptions,
+ do_uncompress, compression_dict, cache_options, allocator, immortal_file);
Status s = block_fetcher.ReadBlockContents();
if (s.ok()) {
result->reset(new Block(std::move(contents), global_seqno,
return s;
}
+inline CacheAllocator* GetCacheAllocator(
+ const BlockBasedTableOptions& table_options) {
+ return table_options.block_cache.get()
+ ? table_options.block_cache->cache_allocator()
+ : nullptr;
+}
+
// Delete the resource that is held by the iterator.
template <class ResourceType>
void DeleteHeldResource(void* arg, void* /*ignored*/) {
rep->footer.metaindex_handle(), &meta, rep->ioptions,
true /* decompress */, Slice() /*compression dict*/,
rep->persistent_cache_options, kDisableGlobalSequenceNumber,
- 0 /* read_amp_bytes_per_bit */);
+ 0 /* read_amp_bytes_per_bit */,
+ GetCacheAllocator(rep->table_options));
if (!s.ok()) {
ROCKS_LOG_ERROR(rep->ioptions.info_log,
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
- GetContext* get_context) {
+ GetContext* get_context, CacheAllocator* allocator) {
Status s;
Block* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr;
compression_dict);
s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data(),
compressed_block->size(), &contents,
- format_version, ioptions);
+ format_version, ioptions, allocator);
// Insert uncompressed block into block cache
if (s.ok()) {
const ReadOptions& /*read_options*/, const ImmutableCFOptions& ioptions,
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
- Cache::Priority priority, GetContext* get_context) {
+ Cache::Priority priority, GetContext* get_context,
+ CacheAllocator* allocator) {
assert(raw_block->compression_type() == kNoCompression ||
block_cache_compressed != nullptr);
compression_dict);
s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
raw_block->size(), &contents, format_version,
- ioptions);
+ ioptions, allocator);
}
if (!s.ok()) {
delete raw_block;
BlockFetcher block_fetcher(rep->file.get(), prefetch_buffer, rep->footer,
ReadOptions(), filter_handle, &block,
rep->ioptions, false /* decompress */,
- dummy_comp_dict, rep->persistent_cache_options);
+ dummy_comp_dict, rep->persistent_cache_options,
+ GetCacheAllocator(rep->table_options));
Status s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
&block_value, rep->ioptions, rep->blocks_maybe_compressed,
compression_dict, rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
- rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
+ rep->table_options.read_amp_bytes_per_bit,
+ GetCacheAllocator(rep->table_options),
+ rep->immortal_table);
}
if (s.ok()) {
block.value = block_value.release();
s = GetDataBlockFromCache(
key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
block_entry, rep->table_options.format_version, compression_dict,
- rep->table_options.read_amp_bytes_per_bit, is_index, get_context);
+ rep->table_options.read_amp_bytes_per_bit, is_index, get_context,
+ GetCacheAllocator(rep->table_options));
if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
std::unique_ptr<Block> raw_block;
block_cache_compressed == nullptr && rep->blocks_maybe_compressed,
compression_dict, rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
- rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
+ rep->table_options.read_amp_bytes_per_bit,
+ GetCacheAllocator(rep->table_options),
+ rep->immortal_table);
}
if (s.ok()) {
.cache_index_and_filter_blocks_with_high_priority
? Cache::Priority::HIGH
: Cache::Priority::LOW,
- get_context);
+ get_context, GetCacheAllocator(rep->table_options));
}
}
}
BlockHandle handle = index_iter->value();
BlockContents contents;
Slice dummy_comp_dict;
- BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
- rep_->footer, ReadOptions(), handle, &contents,
- rep_->ioptions, false /* decompress */,
- dummy_comp_dict /*compression dict*/,
- rep_->persistent_cache_options);
+ BlockFetcher block_fetcher(
+ rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
+ ReadOptions(), handle, &contents, rep_->ioptions,
+ false /* decompress */, dummy_comp_dict /*compression dict*/,
+ rep_->persistent_cache_options,
+ GetCacheAllocator(rep_->table_options));
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
break;
s = handle.DecodeFrom(&input);
BlockContents contents;
Slice dummy_comp_dict;
- BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
- rep_->footer, ReadOptions(), handle, &contents,
- rep_->ioptions, false /* decompress */,
- dummy_comp_dict /*compression dict*/,
- rep_->persistent_cache_options);
+ BlockFetcher block_fetcher(
+ rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
+ ReadOptions(), handle, &contents, rep_->ioptions,
+ false /* decompress */, dummy_comp_dict /*compression dict*/,
+ rep_->persistent_cache_options,
+ GetCacheAllocator(rep_->table_options));
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
break;
rep_->file.get(), nullptr /* prefetch_buffer */, rep_->footer,
ReadOptions(), handle, &block, rep_->ioptions,
false /*decompress*/, dummy_comp_dict /*compression dict*/,
- rep_->persistent_cache_options);
+ rep_->persistent_cache_options,
+ GetCacheAllocator(rep_->table_options));
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
rep_->filter.reset(new BlockBasedFilterBlockReader(
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
- bool is_index = false, GetContext* get_context = nullptr);
+ bool is_index = false, GetContext* get_context = nullptr,
+ CacheAllocator* allocator = nullptr);
// Put a raw block (maybe compressed) to the corresponding block caches.
// This method will perform decompression against raw_block if needed and then
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index = false, Cache::Priority pri = Cache::Priority::LOW,
- GetContext* get_context = nullptr);
+ GetContext* get_context = nullptr, CacheAllocator* allocator = nullptr);
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false.
#include "rocksdb/env.h"
#include "table/block.h"
#include "table/block_based_table_reader.h"
-#include "table/persistent_cache_helper.h"
#include "table/format.h"
+#include "table/persistent_cache_helper.h"
+#include "util/cache_allocator.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
if (cache_options_.persistent_cache &&
cache_options_.persistent_cache->IsCompressed()) {
// lookup uncompressed cache mode p-cache
+ std::unique_ptr<char[]> raw_data;
status_ = PersistentCacheHelper::LookupRawPage(
- cache_options_, handle_, &heap_buf_, block_size_ + kBlockTrailerSize);
+ cache_options_, handle_, &raw_data, block_size_ + kBlockTrailerSize);
if (status_.ok()) {
+ heap_buf_ = CacheAllocationPtr(raw_data.release());
used_buf_ = heap_buf_.get();
slice_ = Slice(heap_buf_.get(), block_size_);
return true;
// trivially allocated stack buffer instead of needing a full malloc()
used_buf_ = &stack_buf_[0];
} else {
- heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
+ heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
used_buf_ = heap_buf_.get();
}
}
// or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
assert(used_buf_ != heap_buf_.get());
- heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
+ heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
}
*contents_ = BlockContents(std::move(heap_buf_), block_size_, true,
if (do_uncompress_ && compression_type != kNoCompression) {
// compressed page, uncompress, update cache
UncompressionContext uncompression_ctx(compression_type, compression_dict_);
- status_ =
- UncompressBlockContents(uncompression_ctx, slice_.data(), block_size_,
- contents_, footer_.version(), ioptions_);
+ status_ = UncompressBlockContents(uncompression_ctx, slice_.data(),
+ block_size_, contents_, footer_.version(),
+ ioptions_, allocator_);
} else {
GetBlockContents();
}
#include "table/block.h"
#include "table/format.h"
+#include "util/cache_allocator.h"
+
namespace rocksdb {
class BlockFetcher {
public:
BlockContents* contents, const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options,
+ CacheAllocator* allocator = nullptr,
const bool immortal_source = false)
: file_(file),
prefetch_buffer_(prefetch_buffer),
do_uncompress_(do_uncompress),
immortal_source_(immortal_source),
compression_dict_(compression_dict),
- cache_options_(cache_options) {}
+ cache_options_(cache_options),
+ allocator_(allocator) {}
Status ReadBlockContents();
private:
const bool immortal_source_;
const Slice& compression_dict_;
const PersistentCacheOptions& cache_options_;
+ CacheAllocator* allocator_;
Status status_;
Slice slice_;
char* used_buf_ = nullptr;
size_t block_size_;
- std::unique_ptr<char[]> heap_buf_;
+ CacheAllocationPtr heap_buf_;
char stack_buf_[kDefaultStackBufferSize];
bool got_from_prefetch_buffer_ = false;
rocksdb::CompressionType compression_type;
#include "table/block_based_table_reader.h"
#include "table/block_fetcher.h"
#include "table/persistent_cache_helper.h"
+#include "util/cache_allocator.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
Status UncompressBlockContentsForCompressionType(
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t format_version,
- const ImmutableCFOptions& ioptions) {
- std::unique_ptr<char[]> ubuf;
+ const ImmutableCFOptions& ioptions,
+ CacheAllocator* allocator) {
+ CacheAllocationPtr ubuf;
assert(uncompression_ctx.type() != kNoCompression &&
"Invalid compression type");
if (!Snappy_GetUncompressedLength(data, n, &ulength)) {
return Status::Corruption(snappy_corrupt_msg);
}
- ubuf.reset(new char[ulength]);
+ ubuf = AllocateBlock(ulength, allocator);
if (!Snappy_Uncompress(data, n, ubuf.get())) {
return Status::Corruption(snappy_corrupt_msg);
}
break;
}
case kZlibCompression:
- ubuf.reset(Zlib_Uncompress(
+ ubuf = Zlib_Uncompress(
uncompression_ctx, data, n, &decompress_size,
- GetCompressFormatForVersion(kZlibCompression, format_version)));
+ GetCompressFormatForVersion(kZlibCompression, format_version),
+ allocator);
if (!ubuf) {
static char zlib_corrupt_msg[] =
"Zlib not supported or corrupted Zlib compressed block contents";
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kBZip2Compression:
- ubuf.reset(BZip2_Uncompress(
+ ubuf = BZip2_Uncompress(
data, n, &decompress_size,
- GetCompressFormatForVersion(kBZip2Compression, format_version)));
+ GetCompressFormatForVersion(kBZip2Compression, format_version),
+ allocator);
if (!ubuf) {
static char bzip2_corrupt_msg[] =
"Bzip2 not supported or corrupted Bzip2 compressed block contents";
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kLZ4Compression:
- ubuf.reset(LZ4_Uncompress(
+ ubuf = LZ4_Uncompress(
uncompression_ctx, data, n, &decompress_size,
- GetCompressFormatForVersion(kLZ4Compression, format_version)));
+ GetCompressFormatForVersion(kLZ4Compression, format_version),
+ allocator);
if (!ubuf) {
static char lz4_corrupt_msg[] =
"LZ4 not supported or corrupted LZ4 compressed block contents";
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kLZ4HCCompression:
- ubuf.reset(LZ4_Uncompress(
+ ubuf = LZ4_Uncompress(
uncompression_ctx, data, n, &decompress_size,
- GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
+ GetCompressFormatForVersion(kLZ4HCCompression, format_version),
+ allocator);
if (!ubuf) {
static char lz4hc_corrupt_msg[] =
"LZ4HC not supported or corrupted LZ4HC compressed block contents";
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kXpressCompression:
+      // XPRESS allocates memory internally, so a custom allocator is not
+      // supported.
ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size));
if (!ubuf) {
static char xpress_corrupt_msg[] =
break;
case kZSTD:
case kZSTDNotFinalCompression:
- ubuf.reset(ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size));
+ ubuf = ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size,
+ allocator);
if (!ubuf) {
static char zstd_corrupt_msg[] =
"ZSTD not supported or corrupted ZSTD compressed block contents";
Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
const char* data, size_t n,
BlockContents* contents, uint32_t format_version,
- const ImmutableCFOptions& ioptions) {
+ const ImmutableCFOptions& ioptions,
+ CacheAllocator* allocator) {
assert(data[n] != kNoCompression);
assert(data[n] == uncompression_ctx.type());
- return UncompressBlockContentsForCompressionType(
- uncompression_ctx, data, n, contents, format_version, ioptions);
+ return UncompressBlockContentsForCompressionType(uncompression_ctx, data, n,
+ contents, format_version,
+ ioptions, allocator);
}
} // namespace rocksdb
#include "port/port.h" // noexcept
#include "table/persistent_cache_options.h"
#include "util/file_reader_writer.h"
+#include "util/cache_allocator.h"
namespace rocksdb {
Slice data; // Actual contents of data
bool cachable; // True iff data can be cached
CompressionType compression_type;
- std::unique_ptr<char[]> allocation;
+ CacheAllocationPtr allocation;
BlockContents() : cachable(false), compression_type(kNoCompression) {}
CompressionType _compression_type)
: data(_data), cachable(_cachable), compression_type(_compression_type) {}
- BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
+ BlockContents(CacheAllocationPtr&& _data, size_t _size, bool _cachable,
CompressionType _compression_type)
: data(_data.get(), _size),
cachable(_cachable),
compression_type(_compression_type),
allocation(std::move(_data)) {}
+ BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
+ CompressionType _compression_type)
+ : data(_data.get(), _size),
+ cachable(_cachable),
+ compression_type(_compression_type) {
+ allocation.reset(_data.release());
+ }
+
// The additional memory space taken by the block data.
size_t usable_size() const {
if (allocation.get() != nullptr) {
+ auto allocator = allocation.get_deleter().allocator;
+ if (allocator) {
+ return allocator->UsableSize(allocation.get(), data.size());
+ }
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
return malloc_usable_size(allocation.get());
#else
extern Status UncompressBlockContents(
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t compress_format_version,
- const ImmutableCFOptions& ioptions);
+ const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
// This is an extension to UncompressBlockContents that accepts
// a specific compression type. This is used by un-wrapped blocks
extern Status UncompressBlockContentsForCompressionType(
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t compress_format_version,
- const ImmutableCFOptions& ioptions);
+ const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
// Implementation details follow. Clients should ignore,
DynamicBloom bloom_;
PlainTableReaderFileInfo file_info_;
Arena arena_;
- std::unique_ptr<char[]> index_block_alloc_;
- std::unique_ptr<char[]> bloom_block_alloc_;
+ CacheAllocationPtr index_block_alloc_;
+ CacheAllocationPtr bloom_block_alloc_;
const ImmutableCFOptions& ioptions_;
uint64_t file_size_;
c.ResetTableReader();
}
+namespace {
+class CustomCacheAllocator : public CacheAllocator {
+ public:
+ virtual const char* Name() const override { return "CustomCacheAllocator"; }
+
+ void* Allocate(size_t size) override {
+ ++numAllocations;
+ auto ptr = new char[size + 16];
+ memcpy(ptr, "cache_allocator_", 16); // mangle first 16 bytes
+ return reinterpret_cast<void*>(ptr + 16);
+ }
+ void Deallocate(void* p) override {
+ ++numDeallocations;
+ char* ptr = reinterpret_cast<char*>(p) - 16;
+ delete[] ptr;
+ }
+
+  std::atomic<int> numAllocations{0};
+  std::atomic<int> numDeallocations{0};
+};
+} // namespace
+
+TEST_P(BlockBasedTableTest, CacheAllocator) {
+ auto custom_cache_allocator = std::make_shared<CustomCacheAllocator>();
+ {
+ Options opt;
+ unique_ptr<InternalKeyComparator> ikc;
+ ikc.reset(new test::PlainInternalKeyComparator(opt.comparator));
+ opt.compression = kNoCompression;
+ BlockBasedTableOptions table_options;
+ table_options.block_size = 1024;
+ LRUCacheOptions lruOptions;
+ lruOptions.cache_allocator = custom_cache_allocator;
+ lruOptions.capacity = 16 * 1024 * 1024;
+ lruOptions.num_shard_bits = 4;
+    table_options.block_cache = NewLRUCache(lruOptions);
+ opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+ TableConstructor c(BytewiseComparator(),
+ true /* convert_to_internal_key_ */);
+ c.Add("k01", "hello");
+ c.Add("k02", "hello2");
+ c.Add("k03", std::string(10000, 'x'));
+ c.Add("k04", std::string(200000, 'x'));
+ c.Add("k05", std::string(300000, 'x'));
+ c.Add("k06", "hello3");
+ c.Add("k07", std::string(100000, 'x'));
+ std::vector<std::string> keys;
+ stl_wrappers::KVMap kvmap;
+ const ImmutableCFOptions ioptions(opt);
+ const MutableCFOptions moptions(opt);
+ c.Finish(opt, ioptions, moptions, table_options, *ikc, &keys, &kvmap);
+
+ unique_ptr<InternalIterator> iter(
+ c.NewIterator(moptions.prefix_extractor.get()));
+ iter->SeekToFirst();
+ while (iter->Valid()) {
+ iter->key();
+ iter->value();
+ iter->Next();
+ }
+ ASSERT_OK(iter->status());
+ }
+
+  // Out of scope: the block cache should have been destroyed and all
+  // allocations deallocated.
+ EXPECT_EQ(custom_cache_allocator->numAllocations.load(),
+ custom_cache_allocator->numDeallocations.load());
+ // make sure that allocations actually happened through the cache allocator
+ EXPECT_GT(custom_cache_allocator->numAllocations.load(), 0);
+}
+
TEST_P(BlockBasedTableTest, NewIndexIteratorLeak) {
// A regression test to avoid data race described in
// https://github.com/facebook/rocksdb/issues/1267
int64_t bytes = 0;
int decompress_size;
while (ok && bytes < 1024 * 1048576) {
- char *uncompressed = nullptr;
+ CacheAllocationPtr uncompressed;
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression: {
// get size and allocate here to make comparison fair
ok = false;
break;
}
- uncompressed = new char[ulength];
+ uncompressed = AllocateBlock(ulength, nullptr);
ok = Snappy_Uncompress(compressed.data(), compressed.size(),
- uncompressed);
+ uncompressed.get());
break;
}
case rocksdb::kZlibCompression:
uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size, 2);
- ok = uncompressed != nullptr;
+ ok = uncompressed.get() != nullptr;
break;
case rocksdb::kBZip2Compression:
uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
&decompress_size, 2);
- ok = uncompressed != nullptr;
+ ok = uncompressed.get() != nullptr;
break;
case rocksdb::kLZ4Compression:
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size, 2);
- ok = uncompressed != nullptr;
+ ok = uncompressed.get() != nullptr;
break;
case rocksdb::kLZ4HCCompression:
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size, 2);
- ok = uncompressed != nullptr;
+ ok = uncompressed.get() != nullptr;
break;
case rocksdb::kXpressCompression:
- uncompressed = XPRESS_Uncompress(compressed.data(), compressed.size(),
- &decompress_size);
- ok = uncompressed != nullptr;
+ uncompressed.reset(XPRESS_Uncompress(
+ compressed.data(), compressed.size(), &decompress_size));
+ ok = uncompressed.get() != nullptr;
break;
case rocksdb::kZSTD:
uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size);
- ok = uncompressed != nullptr;
+ ok = uncompressed.get() != nullptr;
break;
default:
ok = false;
}
- delete[] uncompressed;
bytes += input.size();
thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
}
--- /dev/null
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+
+#pragma once
+
+#include "rocksdb/cache_allocator.h"
+
+namespace rocksdb {
+
+struct CustomDeleter {
+ CustomDeleter(CacheAllocator* a = nullptr) : allocator(a) {}
+
+ void operator()(char* ptr) const {
+ if (allocator) {
+ allocator->Deallocate(reinterpret_cast<void*>(ptr));
+ } else {
+ delete[] ptr;
+ }
+ }
+
+ CacheAllocator* allocator;
+};
+
+using CacheAllocationPtr = std::unique_ptr<char[], CustomDeleter>;
+
+inline CacheAllocationPtr AllocateBlock(size_t size,
+ CacheAllocator* allocator) {
+ if (allocator) {
+ auto block = reinterpret_cast<char*>(allocator->Allocate(size));
+ return CacheAllocationPtr(block, allocator);
+ }
+ return CacheAllocationPtr(new char[size]);
+}
+
+} // namespace rocksdb
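A brief sketch of the two cleanup paths: the CustomDeleter captured at
allocation time routes the release back through the same allocator, and falls
back to delete[] when none was supplied.

// Both pointers release correctly when they go out of scope.
void AllocateBlockExample(rocksdb::CacheAllocator* custom) {
  // Assuming custom != nullptr: served by custom->Allocate() and later
  // freed via custom->Deallocate().
  rocksdb::CacheAllocationPtr a = rocksdb::AllocateBlock(4096, custom);
  // With no allocator: backed by new char[4096], freed via delete[].
  rocksdb::CacheAllocationPtr b = rocksdb::AllocateBlock(4096, nullptr);
}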
#include <string>
#include "rocksdb/options.h"
+#include "rocksdb/table.h"
+#include "util/cache_allocator.h"
#include "util/coding.h"
#include "util/compression_context_cache.h"
// header in varint32 format
// @param compression_dict Data for presetting the compression library's
// dictionary.
-inline char* Zlib_Uncompress(const UncompressionContext& ctx,
- const char* input_data, size_t input_length,
- int* decompress_size,
- uint32_t compress_format_version,
- int windowBits = -14) {
+inline CacheAllocationPtr Zlib_Uncompress(
+ const UncompressionContext& ctx, const char* input_data,
+ size_t input_length, int* decompress_size, uint32_t compress_format_version,
+ CacheAllocator* allocator = nullptr, int windowBits = -14) {
#ifdef ZLIB
uint32_t output_len = 0;
if (compress_format_version == 2) {
_stream.next_in = (Bytef*)input_data;
_stream.avail_in = static_cast<unsigned int>(input_length);
-  char* output = new char[output_len];
-  _stream.next_out = (Bytef*)output;
+  auto output = AllocateBlock(output_len, allocator);
+  _stream.next_out = (Bytef*)output.get();
_stream.avail_out = static_cast<unsigned int>(output_len);
bool done = false;
size_t old_sz = output_len;
uint32_t output_len_delta = output_len / 5;
output_len += output_len_delta < 10 ? 10 : output_len_delta;
- char* tmp = new char[output_len];
- memcpy(tmp, output, old_sz);
- delete[] output;
- output = tmp;
+ auto tmp = AllocateBlock(output_len, allocator);
+ memcpy(tmp.get(), output.get(), old_sz);
+ output = std::move(tmp);
// Set more output.
- _stream.next_out = (Bytef*)(output + old_sz);
+ _stream.next_out = (Bytef*)(output.get() + old_sz);
_stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
break;
}
case Z_BUF_ERROR:
default:
- delete[] output;
inflateEnd(&_stream);
return nullptr;
}
// block header
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
-inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
- int* decompress_size,
- uint32_t compress_format_version) {
+inline CacheAllocationPtr BZip2_Uncompress(
+ const char* input_data, size_t input_length, int* decompress_size,
+ uint32_t compress_format_version, CacheAllocator* allocator = nullptr) {
#ifdef BZIP2
uint32_t output_len = 0;
if (compress_format_version == 2) {
_stream.next_in = (char*)input_data;
_stream.avail_in = static_cast<unsigned int>(input_length);
-  char* output = new char[output_len];
-  _stream.next_out = (char*)output;
+  auto output = AllocateBlock(output_len, allocator);
+  _stream.next_out = (char*)output.get();
_stream.avail_out = static_cast<unsigned int>(output_len);
bool done = false;
assert(compress_format_version != 2);
uint32_t old_sz = output_len;
output_len = output_len * 1.2;
- char* tmp = new char[output_len];
- memcpy(tmp, output, old_sz);
- delete[] output;
- output = tmp;
+ auto tmp = AllocateBlock(output_len, allocator);
+ memcpy(tmp.get(), output.get(), old_sz);
+ output = std::move(tmp);
// Set more output.
- _stream.next_out = (char*)(output + old_sz);
+ _stream.next_out = (char*)(output.get() + old_sz);
_stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
break;
}
default:
- delete[] output;
BZ2_bzDecompressEnd(&_stream);
return nullptr;
}
// header in varint32 format
// @param compression_dict Data for presetting the compression library's
// dictionary.
-inline char* LZ4_Uncompress(const UncompressionContext& ctx,
- const char* input_data, size_t input_length,
- int* decompress_size,
- uint32_t compress_format_version) {
+inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
+ const char* input_data,
+ size_t input_length,
+ int* decompress_size,
+ uint32_t compress_format_version,
+ CacheAllocator* allocator = nullptr) {
#ifdef LZ4
uint32_t output_len = 0;
if (compress_format_version == 2) {
input_data += 8;
}
- char* output = new char[output_len];
+ auto output = AllocateBlock(output_len, allocator);
#if LZ4_VERSION_NUMBER >= 10400 // r124+
LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
if (ctx.dict().size()) {
static_cast<int>(ctx.dict().size()));
}
*decompress_size = LZ4_decompress_safe_continue(
- stream, input_data, output, static_cast<int>(input_length),
+ stream, input_data, output.get(), static_cast<int>(input_length),
static_cast<int>(output_len));
LZ4_freeStreamDecode(stream);
#else // up to r123
- *decompress_size =
- LZ4_decompress_safe(input_data, output, static_cast<int>(input_length),
- static_cast<int>(output_len));
+ *decompress_size = LZ4_decompress_safe(input_data, output.get(),
+ static_cast<int>(input_length),
+ static_cast<int>(output_len));
#endif // LZ4_VERSION_NUMBER >= 10400
if (*decompress_size < 0) {
- delete[] output;
return nullptr;
}
assert(*decompress_size == static_cast<int>(output_len));
(void)input_length;
(void)decompress_size;
(void)compress_format_version;
+ (void)allocator;
return nullptr;
#endif
}
// @param compression_dict Data for presetting the compression library's
// dictionary.
-inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
- const char* input_data, size_t input_length,
- int* decompress_size) {
+inline CacheAllocationPtr ZSTD_Uncompress(const UncompressionContext& ctx,
+ const char* input_data,
+ size_t input_length,
+ int* decompress_size,
+ CacheAllocator* allocator = nullptr) {
#ifdef ZSTD
uint32_t output_len = 0;
if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
return nullptr;
}
- char* output = new char[output_len];
+ auto output = AllocateBlock(output_len, allocator);
size_t actual_output_length;
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
ZSTD_DCtx* context = ctx.GetZSTDContext();
assert(context != nullptr);
actual_output_length = ZSTD_decompress_usingDict(
- context, output, output_len, input_data, input_length, ctx.dict().data(),
- ctx.dict().size());
+ context, output.get(), output_len, input_data, input_length,
+ ctx.dict().data(), ctx.dict().size());
#else // up to v0.4.x
actual_output_length =
- ZSTD_decompress(output, output_len, input_data, input_length);
+ ZSTD_decompress(output.get(), output_len, input_data, input_length);
#endif // ZSTD_VERSION_NUMBER >= 500
assert(actual_output_length == output_len);
*decompress_size = static_cast<int>(actual_output_length);
(void)input_data;
(void)input_length;
(void)decompress_size;
+ (void)allocator;
return nullptr;
#endif
}