### New Features
* TransactionOptions::skip_concurrency_control allows pessimistic transactions to skip the overhead of concurrency control. Could be used for optimizing certain transactions or during recovery.
-* Introduced CacheAllocator, which lets the user specify custom allocator for memory in block cache.
### Bug Fixes
* Avoid creating empty SSTs and subsequently deleting them in certain cases during compaction.
}
LRUCache::LRUCache(size_t capacity, int num_shard_bits,
- bool strict_capacity_limit, double high_pri_pool_ratio,
- std::shared_ptr<CacheAllocator> allocator)
- : ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
- std::move(allocator)) {
+ bool strict_capacity_limit, double high_pri_pool_ratio)
+ : ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
num_shards_ = 1 << num_shard_bits;
shards_ = reinterpret_cast<LRUCacheShard*>(
port::cacheline_aligned_alloc(sizeof(LRUCacheShard) * num_shards_));
std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits,
cache_opts.strict_capacity_limit,
- cache_opts.high_pri_pool_ratio,
- cache_opts.cache_allocator);
+ cache_opts.high_pri_pool_ratio);
}
-std::shared_ptr<Cache> NewLRUCache(
- size_t capacity, int num_shard_bits, bool strict_capacity_limit,
- double high_pri_pool_ratio,
- std::shared_ptr<CacheAllocator> cache_allocator) {
+std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
+ bool strict_capacity_limit,
+ double high_pri_pool_ratio) {
if (num_shard_bits >= 20) {
return nullptr; // the cache cannot be sharded into too many fine pieces
}
num_shard_bits = GetDefaultCacheShardBits(capacity);
}
return std::make_shared<LRUCache>(capacity, num_shard_bits,
- strict_capacity_limit, high_pri_pool_ratio,
- std::move(cache_allocator));
+ strict_capacity_limit, high_pri_pool_ratio);
}
} // namespace rocksdb
class LRUCache : public ShardedCache {
public:
LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
- double high_pri_pool_ratio,
- std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
+ double high_pri_pool_ratio);
virtual ~LRUCache();
virtual const char* Name() const override { return "LRUCache"; }
virtual CacheShard* GetShard(int shard) override;
namespace rocksdb {
ShardedCache::ShardedCache(size_t capacity, int num_shard_bits,
- bool strict_capacity_limit,
- std::shared_ptr<CacheAllocator> allocator)
- : Cache(std::move(allocator)),
- num_shard_bits_(num_shard_bits),
+ bool strict_capacity_limit)
+ : num_shard_bits_(num_shard_bits),
capacity_(capacity),
strict_capacity_limit_(strict_capacity_limit),
last_id_(1) {}
strict_capacity_limit_);
ret.append(buffer);
}
- snprintf(buffer, kBufferSize, " cache_allocator : %s\n",
- cache_allocator() ? cache_allocator()->Name() : "None");
- ret.append(buffer);
ret.append(GetShard(0)->GetPrintableOptions());
return ret;
}
// Keys are sharded by the highest num_shard_bits bits of hash value.
class ShardedCache : public Cache {
public:
- ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
- std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
+ ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit);
virtual ~ShardedCache() = default;
virtual const char* Name() const override = 0;
virtual CacheShard* GetShard(int shard) = 0;
#include <stdint.h>
#include <memory>
#include <string>
-#include "rocksdb/cache_allocator.h"
#include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
// BlockBasedTableOptions::cache_index_and_filter_blocks_with_high_priority.
double high_pri_pool_ratio = 0.0;
- // If non-nullptr will use this allocator instead of system allocator when
- // allocating memory for cache blocks. Call this method before you start using
- // the cache!
- std::shared_ptr<CacheAllocator> cache_allocator;
-
LRUCacheOptions() {}
LRUCacheOptions(size_t _capacity, int _num_shard_bits,
- bool _strict_capacity_limit, double _high_pri_pool_ratio,
- std::shared_ptr<CacheAllocator> _cache_allocator = nullptr)
+ bool _strict_capacity_limit, double _high_pri_pool_ratio)
: capacity(_capacity),
num_shard_bits(_num_shard_bits),
strict_capacity_limit(_strict_capacity_limit),
- high_pri_pool_ratio(_high_pri_pool_ratio),
- cache_allocator(std::move(_cache_allocator)) {}
+ high_pri_pool_ratio(_high_pri_pool_ratio) {}
};
// Create a new cache with a fixed size capacity. The cache is sharded
// high_pri_pool_pct.
// num_shard_bits = -1 means it is automatically determined: every shard
// will be at least 512KB and number of shard bits will not exceed 6.
-extern std::shared_ptr<Cache> NewLRUCache(
- size_t capacity, int num_shard_bits = -1,
- bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.0,
- std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
+extern std::shared_ptr<Cache> NewLRUCache(size_t capacity,
+ int num_shard_bits = -1,
+ bool strict_capacity_limit = false,
+ double high_pri_pool_ratio = 0.0);
extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts);
int num_shard_bits = -1,
bool strict_capacity_limit = false);
-
class Cache {
public:
// Depending on implementation, cache entries with high priority could be less
// likely to get evicted than low priority entries.
enum class Priority { HIGH, LOW };
- Cache(std::shared_ptr<CacheAllocator> allocator = nullptr)
- : cache_allocator_(std::move(allocator)) {}
+ Cache() {}
// Destroys all existing entries by calling the "deleter"
// function that was passed via the Insert() function.
virtual void TEST_mark_as_data_block(const Slice& /*key*/,
size_t /*charge*/) {}
- CacheAllocator* cache_allocator() const { return cache_allocator_.get(); }
-
private:
// No copying allowed
Cache(const Cache&);
Cache& operator=(const Cache&);
-
- std::shared_ptr<CacheAllocator> cache_allocator_;
};
} // namespace rocksdb
+++ /dev/null
-// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under both the GPLv2 (found in the
-// COPYING file in the root directory) and Apache 2.0 License
-// (found in the LICENSE.Apache file in the root directory).
-
-#pragma once
-
-// CacheAllocator is an interface that a client can implement to supply custom
-// cache allocation and deallocation methods. See rocksdb/cache.h for more
-// information.
-// All methods should be thread-safe.
-class CacheAllocator {
- public:
- virtual ~CacheAllocator() = default;
-
- // Name of the cache allocator, printed in the log
- virtual const char* Name() const = 0;
-
- // Allocate a block of at least size size
- virtual void* Allocate(size_t size) = 0;
- // Deallocate previously allocated block
- virtual void Deallocate(void* p) = 0;
- // Returns the memory size of the block allocated at p. The default
- // implementation that just returns the original allocation_size is fine.
- virtual size_t UsableSize(void* /*p*/, size_t allocation_size) const {
- // default implementation just returns the allocation size
- return allocation_size;
- }
-};
#include "table/full_filter_block.h"
#include "table/table_builder.h"
-#include "util/cache_allocator.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
size_t size = block_contents.size();
- auto ubuf =
- AllocateBlock(size + 1, block_cache_compressed->cache_allocator());
+ std::unique_ptr<char[]> ubuf(new char[size + 1]);
memcpy(ubuf.get(), block_contents.data(), size);
ubuf[size] = type;
std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
- size_t read_amp_bytes_per_bit, CacheAllocator* allocator = nullptr,
- const bool immortal_file = false) {
+ size_t read_amp_bytes_per_bit, const bool immortal_file = false) {
BlockContents contents;
- BlockFetcher block_fetcher(
- file, prefetch_buffer, footer, options, handle, &contents, ioptions,
- do_uncompress, compression_dict, cache_options, allocator, immortal_file);
+ BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
+ &contents, ioptions, do_uncompress,
+ compression_dict, cache_options, immortal_file);
Status s = block_fetcher.ReadBlockContents();
if (s.ok()) {
result->reset(new Block(std::move(contents), global_seqno,
return s;
}
-inline CacheAllocator* GetCacheAllocator(
- const BlockBasedTableOptions& table_options) {
- return table_options.block_cache.get()
- ? table_options.block_cache->cache_allocator()
- : nullptr;
-}
-
// Delete the resource that is held by the iterator.
template <class ResourceType>
void DeleteHeldResource(void* arg, void* /*ignored*/) {
rep->footer.metaindex_handle(), &meta, rep->ioptions,
true /* decompress */, Slice() /*compression dict*/,
rep->persistent_cache_options, kDisableGlobalSequenceNumber,
- 0 /* read_amp_bytes_per_bit */,
- GetCacheAllocator(rep->table_options));
+ 0 /* read_amp_bytes_per_bit */);
if (!s.ok()) {
ROCKS_LOG_ERROR(rep->ioptions.info_log,
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
- GetContext* get_context, CacheAllocator* allocator) {
+ GetContext* get_context) {
Status s;
Block* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr;
compression_dict);
s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data(),
compressed_block->size(), &contents,
- format_version, ioptions, allocator);
+ format_version, ioptions);
// Insert uncompressed block into block cache
if (s.ok()) {
const ReadOptions& /*read_options*/, const ImmutableCFOptions& ioptions,
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
- Cache::Priority priority, GetContext* get_context,
- CacheAllocator* allocator) {
+ Cache::Priority priority, GetContext* get_context) {
assert(raw_block->compression_type() == kNoCompression ||
block_cache_compressed != nullptr);
compression_dict);
s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
raw_block->size(), &contents, format_version,
- ioptions, allocator);
+ ioptions);
}
if (!s.ok()) {
delete raw_block;
BlockFetcher block_fetcher(rep->file.get(), prefetch_buffer, rep->footer,
ReadOptions(), filter_handle, &block,
rep->ioptions, false /* decompress */,
- dummy_comp_dict, rep->persistent_cache_options,
- GetCacheAllocator(rep->table_options));
+ dummy_comp_dict, rep->persistent_cache_options);
Status s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
&block_value, rep->ioptions, rep->blocks_maybe_compressed,
compression_dict, rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
- rep->table_options.read_amp_bytes_per_bit,
- GetCacheAllocator(rep->table_options),
- rep->immortal_table);
+ rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
}
if (s.ok()) {
block.value = block_value.release();
s = GetDataBlockFromCache(
key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
block_entry, rep->table_options.format_version, compression_dict,
- rep->table_options.read_amp_bytes_per_bit, is_index, get_context,
- GetCacheAllocator(rep->table_options));
+ rep->table_options.read_amp_bytes_per_bit, is_index, get_context);
if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
std::unique_ptr<Block> raw_block;
block_cache_compressed == nullptr && rep->blocks_maybe_compressed,
compression_dict, rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
- rep->table_options.read_amp_bytes_per_bit,
- GetCacheAllocator(rep->table_options),
- rep->immortal_table);
+ rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
}
if (s.ok()) {
.cache_index_and_filter_blocks_with_high_priority
? Cache::Priority::HIGH
: Cache::Priority::LOW,
- get_context, GetCacheAllocator(rep->table_options));
+ get_context);
}
}
}
BlockHandle handle = index_iter->value();
BlockContents contents;
Slice dummy_comp_dict;
- BlockFetcher block_fetcher(
- rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
- ReadOptions(), handle, &contents, rep_->ioptions,
- false /* decompress */, dummy_comp_dict /*compression dict*/,
- rep_->persistent_cache_options,
- GetCacheAllocator(rep_->table_options));
+ BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
+ rep_->footer, ReadOptions(), handle, &contents,
+ rep_->ioptions, false /* decompress */,
+ dummy_comp_dict /*compression dict*/,
+ rep_->persistent_cache_options);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
break;
s = handle.DecodeFrom(&input);
BlockContents contents;
Slice dummy_comp_dict;
- BlockFetcher block_fetcher(
- rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
- ReadOptions(), handle, &contents, rep_->ioptions,
- false /* decompress */, dummy_comp_dict /*compression dict*/,
- rep_->persistent_cache_options,
- GetCacheAllocator(rep_->table_options));
+ BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
+ rep_->footer, ReadOptions(), handle, &contents,
+ rep_->ioptions, false /* decompress */,
+ dummy_comp_dict /*compression dict*/,
+ rep_->persistent_cache_options);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
break;
rep_->file.get(), nullptr /* prefetch_buffer */, rep_->footer,
ReadOptions(), handle, &block, rep_->ioptions,
false /*decompress*/, dummy_comp_dict /*compression dict*/,
- rep_->persistent_cache_options,
- GetCacheAllocator(rep_->table_options));
+ rep_->persistent_cache_options);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
rep_->filter.reset(new BlockBasedFilterBlockReader(
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
- bool is_index = false, GetContext* get_context = nullptr,
- CacheAllocator* allocator = nullptr);
+ bool is_index = false, GetContext* get_context = nullptr);
// Put a raw block (maybe compressed) to the corresponding block caches.
// This method will perform decompression against raw_block if needed and then
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index = false, Cache::Priority pri = Cache::Priority::LOW,
- GetContext* get_context = nullptr, CacheAllocator* allocator = nullptr);
+ GetContext* get_context = nullptr);
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false.
#include "rocksdb/env.h"
#include "table/block.h"
#include "table/block_based_table_reader.h"
-#include "table/format.h"
#include "table/persistent_cache_helper.h"
-#include "util/cache_allocator.h"
+#include "table/format.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
if (cache_options_.persistent_cache &&
cache_options_.persistent_cache->IsCompressed()) {
// lookup uncompressed cache mode p-cache
- std::unique_ptr<char[]> raw_data;
status_ = PersistentCacheHelper::LookupRawPage(
- cache_options_, handle_, &raw_data, block_size_ + kBlockTrailerSize);
+ cache_options_, handle_, &heap_buf_, block_size_ + kBlockTrailerSize);
if (status_.ok()) {
- heap_buf_ = CacheAllocationPtr(raw_data.release());
used_buf_ = heap_buf_.get();
slice_ = Slice(heap_buf_.get(), block_size_);
return true;
// trivially allocated stack buffer instead of needing a full malloc()
used_buf_ = &stack_buf_[0];
} else {
- heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
+ heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
used_buf_ = heap_buf_.get();
}
}
// or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
assert(used_buf_ != heap_buf_.get());
- heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
+ heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
}
*contents_ = BlockContents(std::move(heap_buf_), block_size_, true,
if (do_uncompress_ && compression_type != kNoCompression) {
// compressed page, uncompress, update cache
UncompressionContext uncompression_ctx(compression_type, compression_dict_);
- status_ = UncompressBlockContents(uncompression_ctx, slice_.data(),
- block_size_, contents_, footer_.version(),
- ioptions_, allocator_);
+ status_ =
+ UncompressBlockContents(uncompression_ctx, slice_.data(), block_size_,
+ contents_, footer_.version(), ioptions_);
} else {
GetBlockContents();
}
#include "table/block.h"
#include "table/format.h"
-#include "util/cache_allocator.h"
-
namespace rocksdb {
class BlockFetcher {
public:
BlockContents* contents, const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options,
- CacheAllocator* allocator = nullptr,
const bool immortal_source = false)
: file_(file),
prefetch_buffer_(prefetch_buffer),
do_uncompress_(do_uncompress),
immortal_source_(immortal_source),
compression_dict_(compression_dict),
- cache_options_(cache_options),
- allocator_(allocator) {}
+ cache_options_(cache_options) {}
Status ReadBlockContents();
private:
const bool immortal_source_;
const Slice& compression_dict_;
const PersistentCacheOptions& cache_options_;
- CacheAllocator* allocator_;
Status status_;
Slice slice_;
char* used_buf_ = nullptr;
size_t block_size_;
- CacheAllocationPtr heap_buf_;
+ std::unique_ptr<char[]> heap_buf_;
char stack_buf_[kDefaultStackBufferSize];
bool got_from_prefetch_buffer_ = false;
rocksdb::CompressionType compression_type;
#include "table/block_based_table_reader.h"
#include "table/block_fetcher.h"
#include "table/persistent_cache_helper.h"
-#include "util/cache_allocator.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
Status UncompressBlockContentsForCompressionType(
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t format_version,
- const ImmutableCFOptions& ioptions,
- CacheAllocator* allocator) {
- CacheAllocationPtr ubuf;
+ const ImmutableCFOptions& ioptions) {
+ std::unique_ptr<char[]> ubuf;
assert(uncompression_ctx.type() != kNoCompression &&
"Invalid compression type");
if (!Snappy_GetUncompressedLength(data, n, &ulength)) {
return Status::Corruption(snappy_corrupt_msg);
}
- ubuf = AllocateBlock(ulength, allocator);
+ ubuf.reset(new char[ulength]);
if (!Snappy_Uncompress(data, n, ubuf.get())) {
return Status::Corruption(snappy_corrupt_msg);
}
break;
}
case kZlibCompression:
- ubuf = Zlib_Uncompress(
+ ubuf.reset(Zlib_Uncompress(
uncompression_ctx, data, n, &decompress_size,
- GetCompressFormatForVersion(kZlibCompression, format_version),
- allocator);
+ GetCompressFormatForVersion(kZlibCompression, format_version)));
if (!ubuf) {
static char zlib_corrupt_msg[] =
"Zlib not supported or corrupted Zlib compressed block contents";
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kBZip2Compression:
- ubuf = BZip2_Uncompress(
+ ubuf.reset(BZip2_Uncompress(
data, n, &decompress_size,
- GetCompressFormatForVersion(kBZip2Compression, format_version),
- allocator);
+ GetCompressFormatForVersion(kBZip2Compression, format_version)));
if (!ubuf) {
static char bzip2_corrupt_msg[] =
"Bzip2 not supported or corrupted Bzip2 compressed block contents";
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kLZ4Compression:
- ubuf = LZ4_Uncompress(
+ ubuf.reset(LZ4_Uncompress(
uncompression_ctx, data, n, &decompress_size,
- GetCompressFormatForVersion(kLZ4Compression, format_version),
- allocator);
+ GetCompressFormatForVersion(kLZ4Compression, format_version)));
if (!ubuf) {
static char lz4_corrupt_msg[] =
"LZ4 not supported or corrupted LZ4 compressed block contents";
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kLZ4HCCompression:
- ubuf = LZ4_Uncompress(
+ ubuf.reset(LZ4_Uncompress(
uncompression_ctx, data, n, &decompress_size,
- GetCompressFormatForVersion(kLZ4HCCompression, format_version),
- allocator);
+ GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
if (!ubuf) {
static char lz4hc_corrupt_msg[] =
"LZ4HC not supported or corrupted LZ4HC compressed block contents";
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kXpressCompression:
- // XPRESS allocates memory internally, thus no support for custom
- // allocator.
ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size));
if (!ubuf) {
static char xpress_corrupt_msg[] =
break;
case kZSTD:
case kZSTDNotFinalCompression:
- ubuf = ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size,
- allocator);
+ ubuf.reset(ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size));
if (!ubuf) {
static char zstd_corrupt_msg[] =
"ZSTD not supported or corrupted ZSTD compressed block contents";
Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
const char* data, size_t n,
BlockContents* contents, uint32_t format_version,
- const ImmutableCFOptions& ioptions,
- CacheAllocator* allocator) {
+ const ImmutableCFOptions& ioptions) {
assert(data[n] != kNoCompression);
assert(data[n] == uncompression_ctx.type());
- return UncompressBlockContentsForCompressionType(uncompression_ctx, data, n,
- contents, format_version,
- ioptions, allocator);
+ return UncompressBlockContentsForCompressionType(
+ uncompression_ctx, data, n, contents, format_version, ioptions);
}
} // namespace rocksdb
#include "port/port.h" // noexcept
#include "table/persistent_cache_options.h"
#include "util/file_reader_writer.h"
-#include "util/cache_allocator.h"
namespace rocksdb {
Slice data; // Actual contents of data
bool cachable; // True iff data can be cached
CompressionType compression_type;
- CacheAllocationPtr allocation;
+ std::unique_ptr<char[]> allocation;
BlockContents() : cachable(false), compression_type(kNoCompression) {}
CompressionType _compression_type)
: data(_data), cachable(_cachable), compression_type(_compression_type) {}
- BlockContents(CacheAllocationPtr&& _data, size_t _size, bool _cachable,
+ BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
CompressionType _compression_type)
: data(_data.get(), _size),
cachable(_cachable),
compression_type(_compression_type),
allocation(std::move(_data)) {}
- BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
- CompressionType _compression_type)
- : data(_data.get(), _size),
- cachable(_cachable),
- compression_type(_compression_type) {
- allocation.reset(_data.release());
- }
-
// The additional memory space taken by the block data.
size_t usable_size() const {
if (allocation.get() != nullptr) {
- auto allocator = allocation.get_deleter().allocator;
- if (allocator) {
- return allocator->UsableSize(allocation.get(), data.size());
- }
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
return malloc_usable_size(allocation.get());
#else
extern Status UncompressBlockContents(
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t compress_format_version,
- const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
+ const ImmutableCFOptions& ioptions);
// This is an extension to UncompressBlockContents that accepts
// a specific compression type. This is used by un-wrapped blocks
extern Status UncompressBlockContentsForCompressionType(
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t compress_format_version,
- const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
+ const ImmutableCFOptions& ioptions);
// Implementation details follow. Clients should ignore,
DynamicBloom bloom_;
PlainTableReaderFileInfo file_info_;
Arena arena_;
- CacheAllocationPtr index_block_alloc_;
- CacheAllocationPtr bloom_block_alloc_;
+ std::unique_ptr<char[]> index_block_alloc_;
+ std::unique_ptr<char[]> bloom_block_alloc_;
const ImmutableCFOptions& ioptions_;
uint64_t file_size_;
c.ResetTableReader();
}
-namespace {
-class CustomCacheAllocator : public CacheAllocator {
- public:
- virtual const char* Name() const override { return "CustomCacheAllocator"; }
-
- void* Allocate(size_t size) override {
- ++numAllocations;
- auto ptr = new char[size + 16];
- memcpy(ptr, "cache_allocator_", 16); // mangle first 16 bytes
- return reinterpret_cast<void*>(ptr + 16);
- }
- void Deallocate(void* p) override {
- ++numDeallocations;
- char* ptr = reinterpret_cast<char*>(p) - 16;
- delete[] ptr;
- }
-
- std::atomic<int> numAllocations;
- std::atomic<int> numDeallocations;
-};
-} // namespace
-
-TEST_P(BlockBasedTableTest, CacheAllocator) {
- auto custom_cache_allocator = std::make_shared<CustomCacheAllocator>();
- {
- Options opt;
- unique_ptr<InternalKeyComparator> ikc;
- ikc.reset(new test::PlainInternalKeyComparator(opt.comparator));
- opt.compression = kNoCompression;
- BlockBasedTableOptions table_options;
- table_options.block_size = 1024;
- LRUCacheOptions lruOptions;
- lruOptions.cache_allocator = custom_cache_allocator;
- lruOptions.capacity = 16 * 1024 * 1024;
- lruOptions.num_shard_bits = 4;
- table_options.block_cache = NewLRUCache(std::move(lruOptions));
- opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
-
- TableConstructor c(BytewiseComparator(),
- true /* convert_to_internal_key_ */);
- c.Add("k01", "hello");
- c.Add("k02", "hello2");
- c.Add("k03", std::string(10000, 'x'));
- c.Add("k04", std::string(200000, 'x'));
- c.Add("k05", std::string(300000, 'x'));
- c.Add("k06", "hello3");
- c.Add("k07", std::string(100000, 'x'));
- std::vector<std::string> keys;
- stl_wrappers::KVMap kvmap;
- const ImmutableCFOptions ioptions(opt);
- const MutableCFOptions moptions(opt);
- c.Finish(opt, ioptions, moptions, table_options, *ikc, &keys, &kvmap);
-
- unique_ptr<InternalIterator> iter(
- c.NewIterator(moptions.prefix_extractor.get()));
- iter->SeekToFirst();
- while (iter->Valid()) {
- iter->key();
- iter->value();
- iter->Next();
- }
- ASSERT_OK(iter->status());
- }
-
- // out of scope, block cache should have been deleted, all allocations
- // deallocated
- EXPECT_EQ(custom_cache_allocator->numAllocations.load(),
- custom_cache_allocator->numDeallocations.load());
- // make sure that allocations actually happened through the cache allocator
- EXPECT_GT(custom_cache_allocator->numAllocations.load(), 0);
-}
-
TEST_P(BlockBasedTableTest, NewIndexIteratorLeak) {
// A regression test to avoid data race described in
// https://github.com/facebook/rocksdb/issues/1267
int64_t bytes = 0;
int decompress_size;
while (ok && bytes < 1024 * 1048576) {
- CacheAllocationPtr uncompressed;
+ char *uncompressed = nullptr;
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression: {
// get size and allocate here to make comparison fair
ok = false;
break;
}
- uncompressed = AllocateBlock(ulength, nullptr);
+ uncompressed = new char[ulength];
ok = Snappy_Uncompress(compressed.data(), compressed.size(),
- uncompressed.get());
+ uncompressed);
break;
}
case rocksdb::kZlibCompression:
uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size, 2);
- ok = uncompressed.get() != nullptr;
+ ok = uncompressed != nullptr;
break;
case rocksdb::kBZip2Compression:
uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
&decompress_size, 2);
- ok = uncompressed.get() != nullptr;
+ ok = uncompressed != nullptr;
break;
case rocksdb::kLZ4Compression:
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size, 2);
- ok = uncompressed.get() != nullptr;
+ ok = uncompressed != nullptr;
break;
case rocksdb::kLZ4HCCompression:
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size, 2);
- ok = uncompressed.get() != nullptr;
+ ok = uncompressed != nullptr;
break;
case rocksdb::kXpressCompression:
- uncompressed.reset(XPRESS_Uncompress(
- compressed.data(), compressed.size(), &decompress_size));
- ok = uncompressed.get() != nullptr;
+ uncompressed = XPRESS_Uncompress(compressed.data(), compressed.size(),
+ &decompress_size);
+ ok = uncompressed != nullptr;
break;
case rocksdb::kZSTD:
uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size);
- ok = uncompressed.get() != nullptr;
+ ok = uncompressed != nullptr;
break;
default:
ok = false;
}
+ delete[] uncompressed;
bytes += input.size();
thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
}
+++ /dev/null
-// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under both the GPLv2 (found in the
-// COPYING file in the root directory) and Apache 2.0 License
-// (found in the LICENSE.Apache file in the root directory).
-//
-
-#pragma once
-
-#include "rocksdb/cache_allocator.h"
-
-namespace rocksdb {
-
-struct CustomDeleter {
- CustomDeleter(CacheAllocator* a = nullptr) : allocator(a) {}
-
- void operator()(char* ptr) const {
- if (allocator) {
- allocator->Deallocate(reinterpret_cast<void*>(ptr));
- } else {
- delete[] ptr;
- }
- }
-
- CacheAllocator* allocator;
-};
-
-using CacheAllocationPtr = std::unique_ptr<char[], CustomDeleter>;
-
-inline CacheAllocationPtr AllocateBlock(size_t size,
- CacheAllocator* allocator) {
- if (allocator) {
- auto block = reinterpret_cast<char*>(allocator->Allocate(size));
- return CacheAllocationPtr(block, allocator);
- }
- return CacheAllocationPtr(new char[size]);
-}
-
-} // namespace rocksdb
#include <string>
#include "rocksdb/options.h"
-#include "rocksdb/table.h"
-#include "util/cache_allocator.h"
#include "util/coding.h"
#include "util/compression_context_cache.h"
// header in varint32 format
// @param compression_dict Data for presetting the compression library's
// dictionary.
-inline CacheAllocationPtr Zlib_Uncompress(
- const UncompressionContext& ctx, const char* input_data,
- size_t input_length, int* decompress_size, uint32_t compress_format_version,
- CacheAllocator* allocator = nullptr, int windowBits = -14) {
+inline char* Zlib_Uncompress(const UncompressionContext& ctx,
+ const char* input_data, size_t input_length,
+ int* decompress_size,
+ uint32_t compress_format_version,
+ int windowBits = -14) {
#ifdef ZLIB
uint32_t output_len = 0;
if (compress_format_version == 2) {
_stream.next_in = (Bytef*)input_data;
_stream.avail_in = static_cast<unsigned int>(input_length);
- auto output = AllocateBlock(output_len, allocator);
+ char* output = new char[output_len];
- _stream.next_out = (Bytef*)output.get();
+ _stream.next_out = (Bytef*)output;
_stream.avail_out = static_cast<unsigned int>(output_len);
bool done = false;
size_t old_sz = output_len;
uint32_t output_len_delta = output_len / 5;
output_len += output_len_delta < 10 ? 10 : output_len_delta;
- auto tmp = AllocateBlock(output_len, allocator);
- memcpy(tmp.get(), output.get(), old_sz);
- output = std::move(tmp);
+ char* tmp = new char[output_len];
+ memcpy(tmp, output, old_sz);
+ delete[] output;
+ output = tmp;
// Set more output.
- _stream.next_out = (Bytef*)(output.get() + old_sz);
+ _stream.next_out = (Bytef*)(output + old_sz);
_stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
break;
}
case Z_BUF_ERROR:
default:
+ delete[] output;
inflateEnd(&_stream);
return nullptr;
}
// block header
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
-inline CacheAllocationPtr BZip2_Uncompress(
- const char* input_data, size_t input_length, int* decompress_size,
- uint32_t compress_format_version, CacheAllocator* allocator = nullptr) {
+inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
+ int* decompress_size,
+ uint32_t compress_format_version) {
#ifdef BZIP2
uint32_t output_len = 0;
if (compress_format_version == 2) {
_stream.next_in = (char*)input_data;
_stream.avail_in = static_cast<unsigned int>(input_length);
- auto output = AllocateBlock(output_len, allocator);
+ char* output = new char[output_len];
- _stream.next_out = (char*)output.get();
+ _stream.next_out = (char*)output;
_stream.avail_out = static_cast<unsigned int>(output_len);
bool done = false;
assert(compress_format_version != 2);
uint32_t old_sz = output_len;
output_len = output_len * 1.2;
- auto tmp = AllocateBlock(output_len, allocator);
- memcpy(tmp.get(), output.get(), old_sz);
- output = std::move(tmp);
+ char* tmp = new char[output_len];
+ memcpy(tmp, output, old_sz);
+ delete[] output;
+ output = tmp;
// Set more output.
- _stream.next_out = (char*)(output.get() + old_sz);
+ _stream.next_out = (char*)(output + old_sz);
_stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
break;
}
default:
+ delete[] output;
BZ2_bzDecompressEnd(&_stream);
return nullptr;
}
// header in varint32 format
// @param compression_dict Data for presetting the compression library's
// dictionary.
-inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
- const char* input_data,
- size_t input_length,
- int* decompress_size,
- uint32_t compress_format_version,
- CacheAllocator* allocator = nullptr) {
+inline char* LZ4_Uncompress(const UncompressionContext& ctx,
+ const char* input_data, size_t input_length,
+ int* decompress_size,
+ uint32_t compress_format_version) {
#ifdef LZ4
uint32_t output_len = 0;
if (compress_format_version == 2) {
input_data += 8;
}
- auto output = AllocateBlock(output_len, allocator);
+ char* output = new char[output_len];
#if LZ4_VERSION_NUMBER >= 10400 // r124+
LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
if (ctx.dict().size()) {
static_cast<int>(ctx.dict().size()));
}
*decompress_size = LZ4_decompress_safe_continue(
- stream, input_data, output.get(), static_cast<int>(input_length),
+ stream, input_data, output, static_cast<int>(input_length),
static_cast<int>(output_len));
LZ4_freeStreamDecode(stream);
#else // up to r123
- *decompress_size = LZ4_decompress_safe(input_data, output.get(),
- static_cast<int>(input_length),
- static_cast<int>(output_len));
+ *decompress_size =
+ LZ4_decompress_safe(input_data, output, static_cast<int>(input_length),
+ static_cast<int>(output_len));
#endif // LZ4_VERSION_NUMBER >= 10400
if (*decompress_size < 0) {
+ delete[] output;
return nullptr;
}
assert(*decompress_size == static_cast<int>(output_len));
(void)input_length;
(void)decompress_size;
(void)compress_format_version;
- (void)allocator;
return nullptr;
#endif
}
// @param compression_dict Data for presetting the compression library's
// dictionary.
-inline CacheAllocationPtr ZSTD_Uncompress(const UncompressionContext& ctx,
- const char* input_data,
- size_t input_length,
- int* decompress_size,
- CacheAllocator* allocator = nullptr) {
+inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
+ const char* input_data, size_t input_length,
+ int* decompress_size) {
#ifdef ZSTD
uint32_t output_len = 0;
if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
return nullptr;
}
- auto output = AllocateBlock(output_len, allocator);
+ char* output = new char[output_len];
size_t actual_output_length;
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
ZSTD_DCtx* context = ctx.GetZSTDContext();
assert(context != nullptr);
actual_output_length = ZSTD_decompress_usingDict(
- context, output.get(), output_len, input_data, input_length,
- ctx.dict().data(), ctx.dict().size());
+ context, output, output_len, input_data, input_length, ctx.dict().data(),
+ ctx.dict().size());
#else // up to v0.4.x
actual_output_length =
- ZSTD_decompress(output.get(), output_len, input_data, input_length);
+ ZSTD_decompress(output, output_len, input_data, input_length);
#endif // ZSTD_VERSION_NUMBER >= 500
assert(actual_output_length == output_len);
*decompress_size = static_cast<int>(actual_output_length);
(void)input_data;
(void)input_length;
(void)decompress_size;
- (void)allocator;
return nullptr;
#endif
}