]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Introduce CacheAllocator, a custom allocator for cache blocks (#4437)
authorIgor Canadi <igor@rockset.com>
Wed, 3 Oct 2018 00:21:54 +0000 (17:21 -0700)
committerFosco Marotto <fjm@fb.com>
Wed, 31 Oct 2018 20:02:53 +0000 (13:02 -0700)
Summary:
This is a conceptually simple change, but it touches many files to
pass the allocator through function calls.

We introduce CacheAllocator, which can be used by clients to configure
custom allocator for cache blocks. Our motivation is to hook this up
with folly's `JemallocNodumpAllocator`
(https://github.com/facebook/folly/blob/f43ce6d6866b7b994b3019df561109afae050ebc/folly/experimental/JemallocNodumpAllocator.h),
but there are many other possible use cases.

Additionally, this commit cleans up memory allocation in
`util/compression.h`, making sure that all allocations are wrapped in a
unique_ptr as soon as possible.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4437

Differential Revision: D10132814

Pulled By: yiwu-arbug

fbshipit-source-id: be1343a4b69f6048df127939fea9bbc96969f564

19 files changed:
HISTORY.md
cache/lru_cache.cc
cache/lru_cache.h
cache/sharded_cache.cc
cache/sharded_cache.h
include/rocksdb/cache.h
include/rocksdb/cache_allocator.h [new file with mode: 0644]
table/block_based_table_builder.cc
table/block_based_table_reader.cc
table/block_based_table_reader.h
table/block_fetcher.cc
table/block_fetcher.h
table/format.cc
table/format.h
table/plain_table_reader.h
table/table_test.cc
tools/db_bench_tool.cc
util/cache_allocator.h [new file with mode: 0644]
util/compression.h

index 0648814fd13f1bca41461245b33d93d27c1e03da..7c6b945753bc16c81eabc88ade0b5b1728a8f002 100644 (file)
@@ -20,6 +20,7 @@
 
 ### New Features
 * TransactionOptions::skip_concurrency_control allows pessimistic transactions to skip the overhead of concurrency control. Could be used for optimizing certain transactions or during recovery.
+* Introduced CacheAllocator, which lets the user specify custom allocator for memory in block cache.
 
 ### Bug Fixes
 * Avoid creating empty SSTs and subsequently deleting them in certain cases during compaction.
index d4cbb9a45f3850f6247518716a9e5dfeaaed7e95..9f3acd16abc795c4434447932ef1a83f5aec8f91 100644 (file)
@@ -461,8 +461,10 @@ std::string LRUCacheShard::GetPrintableOptions() const {
 }
 
 LRUCache::LRUCache(size_t capacity, int num_shard_bits,
-                   bool strict_capacity_limit, double high_pri_pool_ratio)
-    : ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
+                   bool strict_capacity_limit, double high_pri_pool_ratio,
+                   std::shared_ptr<CacheAllocator> allocator)
+    : ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
+                   std::move(allocator)) {
   num_shards_ = 1 << num_shard_bits;
   shards_ = reinterpret_cast<LRUCacheShard*>(
       port::cacheline_aligned_alloc(sizeof(LRUCacheShard) * num_shards_));
@@ -537,12 +539,14 @@ double LRUCache::GetHighPriPoolRatio() {
 std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
   return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits,
                      cache_opts.strict_capacity_limit,
-                     cache_opts.high_pri_pool_ratio);
+                     cache_opts.high_pri_pool_ratio,
+                     cache_opts.cache_allocator);
 }
 
-std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
-                                   bool strict_capacity_limit,
-                                   double high_pri_pool_ratio) {
+std::shared_ptr<Cache> NewLRUCache(
+    size_t capacity, int num_shard_bits, bool strict_capacity_limit,
+    double high_pri_pool_ratio,
+    std::shared_ptr<CacheAllocator> cache_allocator) {
   if (num_shard_bits >= 20) {
     return nullptr;  // the cache cannot be sharded into too many fine pieces
   }
@@ -554,7 +558,8 @@ std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
     num_shard_bits = GetDefaultCacheShardBits(capacity);
   }
   return std::make_shared<LRUCache>(capacity, num_shard_bits,
-                                    strict_capacity_limit, high_pri_pool_ratio);
+                                    strict_capacity_limit, high_pri_pool_ratio,
+                                    std::move(cache_allocator));
 }
 
 }  // namespace rocksdb
index 3c067f0c1e6067d160ed94ea283788de74606493..0b925a16657ed1360cd21c32a05513af07c4ed14 100644 (file)
@@ -279,7 +279,8 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard : public CacheShard {
 class LRUCache : public ShardedCache {
  public:
   LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
-           double high_pri_pool_ratio);
+           double high_pri_pool_ratio,
+           std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
   virtual ~LRUCache();
   virtual const char* Name() const override { return "LRUCache"; }
   virtual CacheShard* GetShard(int shard) override;
index 6a0a22282115ca1b3d49dd8106b7ce24c9e2853b..3fef82a79da30fab69e331ad525c22e99df6ca28 100644 (file)
 namespace rocksdb {
 
 ShardedCache::ShardedCache(size_t capacity, int num_shard_bits,
-                           bool strict_capacity_limit)
-    : num_shard_bits_(num_shard_bits),
+                           bool strict_capacity_limit,
+                           std::shared_ptr<CacheAllocator> allocator)
+    : Cache(std::move(allocator)),
+      num_shard_bits_(num_shard_bits),
       capacity_(capacity),
       strict_capacity_limit_(strict_capacity_limit),
       last_id_(1) {}
@@ -142,6 +144,9 @@ std::string ShardedCache::GetPrintableOptions() const {
              strict_capacity_limit_);
     ret.append(buffer);
   }
+  snprintf(buffer, kBufferSize, "    cache_allocator : %s\n",
+           cache_allocator() ? cache_allocator()->Name() : "None");
+  ret.append(buffer);
   ret.append(GetShard(0)->GetPrintableOptions());
   return ret;
 }
index 4f9dea2ad0fe670ad68b1374a418ece3eca6915a..9876a882bad05f94e531e7064eaf02826409ef0c 100644 (file)
@@ -47,7 +47,8 @@ class CacheShard {
 // Keys are sharded by the highest num_shard_bits bits of hash value.
 class ShardedCache : public Cache {
  public:
-  ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit);
+  ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
+               std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
   virtual ~ShardedCache() = default;
   virtual const char* Name() const override = 0;
   virtual CacheShard* GetShard(int shard) = 0;
index da3b934d83073e881a3f1c00eb8e41dd178d0a3c..d4d2b65109a54e1b83ab747ba8d1c78f178e1c99 100644 (file)
@@ -25,6 +25,7 @@
 #include <stdint.h>
 #include <memory>
 #include <string>
+#include "rocksdb/cache_allocator.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/status.h"
@@ -58,13 +59,20 @@ struct LRUCacheOptions {
   // BlockBasedTableOptions::cache_index_and_filter_blocks_with_high_priority.
   double high_pri_pool_ratio = 0.0;
 
+  // If non-nullptr will use this allocator instead of system allocator when
+  // allocating memory for cache blocks. Call this method before you start using
+  // the cache!
+  std::shared_ptr<CacheAllocator> cache_allocator;
+
   LRUCacheOptions() {}
   LRUCacheOptions(size_t _capacity, int _num_shard_bits,
-                  bool _strict_capacity_limit, double _high_pri_pool_ratio)
+                  bool _strict_capacity_limit, double _high_pri_pool_ratio,
+                  std::shared_ptr<CacheAllocator> _cache_allocator = nullptr)
       : capacity(_capacity),
         num_shard_bits(_num_shard_bits),
         strict_capacity_limit(_strict_capacity_limit),
-        high_pri_pool_ratio(_high_pri_pool_ratio) {}
+        high_pri_pool_ratio(_high_pri_pool_ratio),
+        cache_allocator(std::move(_cache_allocator)) {}
 };
 
 // Create a new cache with a fixed size capacity. The cache is sharded
@@ -75,10 +83,10 @@ struct LRUCacheOptions {
 // high_pri_pool_pct.
 // num_shard_bits = -1 means it is automatically determined: every shard
 // will be at least 512KB and number of shard bits will not exceed 6.
-extern std::shared_ptr<Cache> NewLRUCache(size_t capacity,
-                                          int num_shard_bits = -1,
-                                          bool strict_capacity_limit = false,
-                                          double high_pri_pool_ratio = 0.0);
+extern std::shared_ptr<Cache> NewLRUCache(
+    size_t capacity, int num_shard_bits = -1,
+    bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.0,
+    std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
 
 extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts);
 
@@ -91,13 +99,15 @@ extern std::shared_ptr<Cache> NewClockCache(size_t capacity,
                                             int num_shard_bits = -1,
                                             bool strict_capacity_limit = false);
 
+
 class Cache {
  public:
   // Depending on implementation, cache entries with high priority could be less
   // likely to get evicted than low priority entries.
   enum class Priority { HIGH, LOW };
 
-  Cache() {}
+  Cache(std::shared_ptr<CacheAllocator> allocator = nullptr)
+      : cache_allocator_(std::move(allocator)) {}
 
   // Destroys all existing entries by calling the "deleter"
   // function that was passed via the Insert() function.
@@ -228,10 +238,14 @@ class Cache {
   virtual void TEST_mark_as_data_block(const Slice& /*key*/,
                                        size_t /*charge*/) {}
 
+  CacheAllocator* cache_allocator() const { return cache_allocator_.get(); }
+
  private:
   // No copying allowed
   Cache(const Cache&);
   Cache& operator=(const Cache&);
+
+  std::shared_ptr<CacheAllocator> cache_allocator_;
 };
 
 }  // namespace rocksdb
diff --git a/include/rocksdb/cache_allocator.h b/include/rocksdb/cache_allocator.h
new file mode 100644 (file)
index 0000000..5bbec0e
--- /dev/null
@@ -0,0 +1,29 @@
+// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+// CacheAllocator is an interface that a client can implement to supply custom
+// cache allocation and deallocation methods. See rocksdb/cache.h for more
+// information.
+// All methods should be thread-safe.
+class CacheAllocator {
+ public:
+  virtual ~CacheAllocator() = default;
+
+  // Name of the cache allocator, printed in the log
+  virtual const char* Name() const = 0;
+
+  // Allocate a block of at least size size
+  virtual void* Allocate(size_t size) = 0;
+  // Deallocate previously allocated block
+  virtual void Deallocate(void* p) = 0;
+  // Returns the memory size of the block allocated at p. The default
+  // implementation that just returns the original allocation_size is fine.
+  virtual size_t UsableSize(void* /*p*/, size_t allocation_size) const {
+    // default implementation just returns the allocation size
+    return allocation_size;
+  }
+};
index 59c385d65aeec7855bb662ad8ceb6b3a881d1f13..26c14d21dd3e8bef310e9b4b19e43e9fa53d11cd 100644 (file)
@@ -39,6 +39,7 @@
 #include "table/full_filter_block.h"
 #include "table/table_builder.h"
 
+#include "util/cache_allocator.h"
 #include "util/coding.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
@@ -654,7 +655,8 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
 
     size_t size = block_contents.size();
 
-    std::unique_ptr<char[]> ubuf(new char[size + 1]);
+    auto ubuf =
+        AllocateBlock(size + 1, block_cache_compressed->cache_allocator());
     memcpy(ubuf.get(), block_contents.data(), size);
     ubuf[size] = type;
 
index 9f2e02d68063e4cd3a66b9fa038351ae22c895b8..7e7cec1b17b37ee4fa58c52f2b88165192436f09 100644 (file)
@@ -80,11 +80,12 @@ Status ReadBlockFromFile(
     std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
     bool do_uncompress, const Slice& compression_dict,
     const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
-    size_t read_amp_bytes_per_bit, const bool immortal_file = false) {
+    size_t read_amp_bytes_per_bit, CacheAllocator* allocator = nullptr,
+    const bool immortal_file = false) {
   BlockContents contents;
-  BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
-                             &contents, ioptions, do_uncompress,
-                             compression_dict, cache_options, immortal_file);
+  BlockFetcher block_fetcher(
+      file, prefetch_buffer, footer, options, handle, &contents, ioptions,
+      do_uncompress, compression_dict, cache_options, allocator, immortal_file);
   Status s = block_fetcher.ReadBlockContents();
   if (s.ok()) {
     result->reset(new Block(std::move(contents), global_seqno,
@@ -94,6 +95,13 @@ Status ReadBlockFromFile(
   return s;
 }
 
+inline CacheAllocator* GetCacheAllocator(
+    const BlockBasedTableOptions& table_options) {
+  return table_options.block_cache.get()
+             ? table_options.block_cache->cache_allocator()
+             : nullptr;
+}
+
 // Delete the resource that is held by the iterator.
 template <class ResourceType>
 void DeleteHeldResource(void* arg, void* /*ignored*/) {
@@ -1150,7 +1158,8 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
       rep->footer.metaindex_handle(), &meta, rep->ioptions,
       true /* decompress */, Slice() /*compression dict*/,
       rep->persistent_cache_options, kDisableGlobalSequenceNumber,
-      0 /* read_amp_bytes_per_bit */);
+      0 /* read_amp_bytes_per_bit */,
+      GetCacheAllocator(rep->table_options));
 
   if (!s.ok()) {
     ROCKS_LOG_ERROR(rep->ioptions.info_log,
@@ -1173,7 +1182,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
     const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
     BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
     const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
-    GetContext* get_context) {
+    GetContext* get_context, CacheAllocator* allocator) {
   Status s;
   Block* compressed_block = nullptr;
   Cache::Handle* block_cache_compressed_handle = nullptr;
@@ -1230,7 +1239,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
                                           compression_dict);
   s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data(),
                               compressed_block->size(), &contents,
-                              format_version, ioptions);
+                              format_version, ioptions, allocator);
 
   // Insert uncompressed block into block cache
   if (s.ok()) {
@@ -1292,7 +1301,8 @@ Status BlockBasedTable::PutDataBlockToCache(
     const ReadOptions& /*read_options*/, const ImmutableCFOptions& ioptions,
     CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
     const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
-    Cache::Priority priority, GetContext* get_context) {
+    Cache::Priority priority, GetContext* get_context,
+    CacheAllocator* allocator) {
   assert(raw_block->compression_type() == kNoCompression ||
          block_cache_compressed != nullptr);
 
@@ -1305,7 +1315,7 @@ Status BlockBasedTable::PutDataBlockToCache(
                                            compression_dict);
     s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
                                 raw_block->size(), &contents, format_version,
-                                ioptions);
+                                ioptions, allocator);
   }
   if (!s.ok()) {
     delete raw_block;
@@ -1402,7 +1412,8 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
   BlockFetcher block_fetcher(rep->file.get(), prefetch_buffer, rep->footer,
                              ReadOptions(), filter_handle, &block,
                              rep->ioptions, false /* decompress */,
-                             dummy_comp_dict, rep->persistent_cache_options);
+                             dummy_comp_dict, rep->persistent_cache_options,
+                             GetCacheAllocator(rep->table_options));
   Status s = block_fetcher.ReadBlockContents();
 
   if (!s.ok()) {
@@ -1700,7 +1711,9 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
           &block_value, rep->ioptions, rep->blocks_maybe_compressed,
           compression_dict, rep->persistent_cache_options,
           is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
-          rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
+          rep->table_options.read_amp_bytes_per_bit,
+          GetCacheAllocator(rep->table_options),
+          rep->immortal_table);
     }
     if (s.ok()) {
       block.value = block_value.release();
@@ -1792,7 +1805,8 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
     s = GetDataBlockFromCache(
         key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
         block_entry, rep->table_options.format_version, compression_dict,
-        rep->table_options.read_amp_bytes_per_bit, is_index, get_context);
+        rep->table_options.read_amp_bytes_per_bit, is_index, get_context,
+        GetCacheAllocator(rep->table_options));
 
     if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
       std::unique_ptr<Block> raw_block;
@@ -1804,7 +1818,9 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
             block_cache_compressed == nullptr && rep->blocks_maybe_compressed,
             compression_dict, rep->persistent_cache_options,
             is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
-            rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
+            rep->table_options.read_amp_bytes_per_bit,
+            GetCacheAllocator(rep->table_options),
+            rep->immortal_table);
       }
 
       if (s.ok()) {
@@ -1817,7 +1833,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
                             .cache_index_and_filter_blocks_with_high_priority
                 ? Cache::Priority::HIGH
                 : Cache::Priority::LOW,
-            get_context);
+            get_context, GetCacheAllocator(rep->table_options));
       }
     }
   }
@@ -2524,11 +2540,12 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
     BlockHandle handle = index_iter->value();
     BlockContents contents;
     Slice dummy_comp_dict;
-    BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
-                               rep_->footer, ReadOptions(), handle, &contents,
-                               rep_->ioptions, false /* decompress */,
-                               dummy_comp_dict /*compression dict*/,
-                               rep_->persistent_cache_options);
+    BlockFetcher block_fetcher(
+        rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
+        ReadOptions(), handle, &contents, rep_->ioptions,
+        false /* decompress */, dummy_comp_dict /*compression dict*/,
+        rep_->persistent_cache_options,
+        GetCacheAllocator(rep_->table_options));
     s = block_fetcher.ReadBlockContents();
     if (!s.ok()) {
       break;
@@ -2550,11 +2567,12 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
     s = handle.DecodeFrom(&input);
     BlockContents contents;
     Slice dummy_comp_dict;
-    BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
-                               rep_->footer, ReadOptions(), handle, &contents,
-                               rep_->ioptions, false /* decompress */,
-                               dummy_comp_dict /*compression dict*/,
-                               rep_->persistent_cache_options);
+    BlockFetcher block_fetcher(
+        rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
+        ReadOptions(), handle, &contents, rep_->ioptions,
+        false /* decompress */, dummy_comp_dict /*compression dict*/,
+        rep_->persistent_cache_options,
+        GetCacheAllocator(rep_->table_options));
     s = block_fetcher.ReadBlockContents();
     if (!s.ok()) {
       break;
@@ -2858,7 +2876,8 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file,
               rep_->file.get(), nullptr /* prefetch_buffer */, rep_->footer,
               ReadOptions(), handle, &block, rep_->ioptions,
               false /*decompress*/, dummy_comp_dict /*compression dict*/,
-              rep_->persistent_cache_options);
+              rep_->persistent_cache_options,
+              GetCacheAllocator(rep_->table_options));
           s = block_fetcher.ReadBlockContents();
           if (!s.ok()) {
             rep_->filter.reset(new BlockBasedFilterBlockReader(
index 3cada0c2c2da3489438fa4ad0747693fee2ee73e..59a0f36b56bc792d4e9a6bffa3b08aaf12c54bce 100644 (file)
@@ -303,7 +303,8 @@ class BlockBasedTable : public TableReader {
       const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
       BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
       const Slice& compression_dict, size_t read_amp_bytes_per_bit,
-      bool is_index = false, GetContext* get_context = nullptr);
+      bool is_index = false, GetContext* get_context = nullptr,
+      CacheAllocator* allocator = nullptr);
 
   // Put a raw block (maybe compressed) to the corresponding block caches.
   // This method will perform decompression against raw_block if needed and then
@@ -322,7 +323,7 @@ class BlockBasedTable : public TableReader {
       CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
       const Slice& compression_dict, size_t read_amp_bytes_per_bit,
       bool is_index = false, Cache::Priority pri = Cache::Priority::LOW,
-      GetContext* get_context = nullptr);
+      GetContext* get_context = nullptr, CacheAllocator* allocator = nullptr);
 
   // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
   // after a call to Seek(key), until handle_result returns false.
index ea97066ec40fa621316393a84da2177760aa15ec..489705758442a888f4c77aec883e29ca035c1921 100644 (file)
@@ -17,8 +17,9 @@
 #include "rocksdb/env.h"
 #include "table/block.h"
 #include "table/block_based_table_reader.h"
-#include "table/persistent_cache_helper.h"
 #include "table/format.h"
+#include "table/persistent_cache_helper.h"
+#include "util/cache_allocator.h"
 #include "util/coding.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
@@ -107,9 +108,11 @@ bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
   if (cache_options_.persistent_cache &&
       cache_options_.persistent_cache->IsCompressed()) {
     // lookup uncompressed cache mode p-cache
+    std::unique_ptr<char[]> raw_data;
     status_ = PersistentCacheHelper::LookupRawPage(
-        cache_options_, handle_, &heap_buf_, block_size_ + kBlockTrailerSize);
+        cache_options_, handle_, &raw_data, block_size_ + kBlockTrailerSize);
     if (status_.ok()) {
+      heap_buf_ = CacheAllocationPtr(raw_data.release());
       used_buf_ = heap_buf_.get();
       slice_ = Slice(heap_buf_.get(), block_size_);
       return true;
@@ -132,7 +135,7 @@ void BlockFetcher::PrepareBufferForBlockFromFile() {
     // trivially allocated stack buffer instead of needing a full malloc()
     used_buf_ = &stack_buf_[0];
   } else {
-    heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
+    heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
     used_buf_ = heap_buf_.get();
   }
 }
@@ -170,7 +173,7 @@ void BlockFetcher::GetBlockContents() {
     // or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
     if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
       assert(used_buf_ != heap_buf_.get());
-      heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
+      heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
       memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
     }
     *contents_ = BlockContents(std::move(heap_buf_), block_size_, true,
@@ -228,9 +231,9 @@ Status BlockFetcher::ReadBlockContents() {
   if (do_uncompress_ && compression_type != kNoCompression) {
     // compressed page, uncompress, update cache
     UncompressionContext uncompression_ctx(compression_type, compression_dict_);
-    status_ =
-        UncompressBlockContents(uncompression_ctx, slice_.data(), block_size_,
-                                contents_, footer_.version(), ioptions_);
+    status_ = UncompressBlockContents(uncompression_ctx, slice_.data(),
+                                      block_size_, contents_, footer_.version(),
+                                      ioptions_, allocator_);
   } else {
     GetBlockContents();
   }
index 9e0d2448dd57973f199c197024e4aca8b8ed7ef4..a8d9d657240ab442b9c838f45f0a0513da532f01 100644 (file)
@@ -11,6 +11,8 @@
 #include "table/block.h"
 #include "table/format.h"
 
+#include "util/cache_allocator.h"
+
 namespace rocksdb {
 class BlockFetcher {
  public:
@@ -26,6 +28,7 @@ class BlockFetcher {
                BlockContents* contents, const ImmutableCFOptions& ioptions,
                bool do_uncompress, const Slice& compression_dict,
                const PersistentCacheOptions& cache_options,
+               CacheAllocator* allocator = nullptr,
                const bool immortal_source = false)
       : file_(file),
         prefetch_buffer_(prefetch_buffer),
@@ -37,7 +40,8 @@ class BlockFetcher {
         do_uncompress_(do_uncompress),
         immortal_source_(immortal_source),
         compression_dict_(compression_dict),
-        cache_options_(cache_options) {}
+        cache_options_(cache_options),
+        allocator_(allocator) {}
   Status ReadBlockContents();
 
  private:
@@ -54,11 +58,12 @@ class BlockFetcher {
   const bool immortal_source_;
   const Slice& compression_dict_;
   const PersistentCacheOptions& cache_options_;
+  CacheAllocator* allocator_;
   Status status_;
   Slice slice_;
   char* used_buf_ = nullptr;
   size_t block_size_;
-  std::unique_ptr<char[]> heap_buf_;
+  CacheAllocationPtr heap_buf_;
   char stack_buf_[kDefaultStackBufferSize];
   bool got_from_prefetch_buffer_ = false;
   rocksdb::CompressionType compression_type;
index 16d959c3dceb94f6b99526cea6edc81a3a162cc8..f565fb14acc1b668a8aed2690b4e366e1e90eb42 100644 (file)
@@ -19,6 +19,7 @@
 #include "table/block_based_table_reader.h"
 #include "table/block_fetcher.h"
 #include "table/persistent_cache_helper.h"
+#include "util/cache_allocator.h"
 #include "util/coding.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
@@ -279,8 +280,9 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
 Status UncompressBlockContentsForCompressionType(
     const UncompressionContext& uncompression_ctx, const char* data, size_t n,
     BlockContents* contents, uint32_t format_version,
-    const ImmutableCFOptions& ioptions) {
-  std::unique_ptr<char[]> ubuf;
+    const ImmutableCFOptions& ioptions,
+    CacheAllocator* allocator) {
+  CacheAllocationPtr ubuf;
 
   assert(uncompression_ctx.type() != kNoCompression &&
          "Invalid compression type");
@@ -296,7 +298,7 @@ Status UncompressBlockContentsForCompressionType(
       if (!Snappy_GetUncompressedLength(data, n, &ulength)) {
         return Status::Corruption(snappy_corrupt_msg);
       }
-      ubuf.reset(new char[ulength]);
+      ubuf = AllocateBlock(ulength, allocator);
       if (!Snappy_Uncompress(data, n, ubuf.get())) {
         return Status::Corruption(snappy_corrupt_msg);
       }
@@ -304,9 +306,10 @@ Status UncompressBlockContentsForCompressionType(
       break;
     }
     case kZlibCompression:
-      ubuf.reset(Zlib_Uncompress(
+      ubuf = Zlib_Uncompress(
           uncompression_ctx, data, n, &decompress_size,
-          GetCompressFormatForVersion(kZlibCompression, format_version)));
+          GetCompressFormatForVersion(kZlibCompression, format_version),
+          allocator);
       if (!ubuf) {
         static char zlib_corrupt_msg[] =
           "Zlib not supported or corrupted Zlib compressed block contents";
@@ -316,9 +319,10 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
       break;
     case kBZip2Compression:
-      ubuf.reset(BZip2_Uncompress(
+      ubuf = BZip2_Uncompress(
           data, n, &decompress_size,
-          GetCompressFormatForVersion(kBZip2Compression, format_version)));
+          GetCompressFormatForVersion(kBZip2Compression, format_version),
+          allocator);
       if (!ubuf) {
         static char bzip2_corrupt_msg[] =
           "Bzip2 not supported or corrupted Bzip2 compressed block contents";
@@ -328,9 +332,10 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
       break;
     case kLZ4Compression:
-      ubuf.reset(LZ4_Uncompress(
+      ubuf = LZ4_Uncompress(
           uncompression_ctx, data, n, &decompress_size,
-          GetCompressFormatForVersion(kLZ4Compression, format_version)));
+          GetCompressFormatForVersion(kLZ4Compression, format_version),
+          allocator);
       if (!ubuf) {
         static char lz4_corrupt_msg[] =
           "LZ4 not supported or corrupted LZ4 compressed block contents";
@@ -340,9 +345,10 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
       break;
     case kLZ4HCCompression:
-      ubuf.reset(LZ4_Uncompress(
+      ubuf = LZ4_Uncompress(
           uncompression_ctx, data, n, &decompress_size,
-          GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
+          GetCompressFormatForVersion(kLZ4HCCompression, format_version),
+          allocator);
       if (!ubuf) {
         static char lz4hc_corrupt_msg[] =
           "LZ4HC not supported or corrupted LZ4HC compressed block contents";
@@ -352,6 +358,8 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
       break;
     case kXpressCompression:
+      // XPRESS allocates memory internally, thus no support for custom
+      // allocator.
       ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size));
       if (!ubuf) {
         static char xpress_corrupt_msg[] =
@@ -363,7 +371,8 @@ Status UncompressBlockContentsForCompressionType(
       break;
     case kZSTD:
     case kZSTDNotFinalCompression:
-      ubuf.reset(ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size));
+      ubuf = ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size,
+                             allocator);
       if (!ubuf) {
         static char zstd_corrupt_msg[] =
             "ZSTD not supported or corrupted ZSTD compressed block contents";
@@ -396,11 +405,13 @@ Status UncompressBlockContentsForCompressionType(
 Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
                                const char* data, size_t n,
                                BlockContents* contents, uint32_t format_version,
-                               const ImmutableCFOptions& ioptions) {
+                               const ImmutableCFOptions& ioptions,
+                               CacheAllocator* allocator) {
   assert(data[n] != kNoCompression);
   assert(data[n] == uncompression_ctx.type());
-  return UncompressBlockContentsForCompressionType(
-      uncompression_ctx, data, n, contents, format_version, ioptions);
+  return UncompressBlockContentsForCompressionType(uncompression_ctx, data, n,
+                                                   contents, format_version,
+                                                   ioptions, allocator);
 }
 
 }  // namespace rocksdb
index 6e0e99c1c74c181367607b6d2450f2f8c6b1762e..441d7107e07111c01fa9753f2d8285f8b6981864 100644 (file)
@@ -26,6 +26,7 @@
 #include "port/port.h"  // noexcept
 #include "table/persistent_cache_options.h"
 #include "util/file_reader_writer.h"
+#include "util/cache_allocator.h"
 
 namespace rocksdb {
 
@@ -192,7 +193,7 @@ struct BlockContents {
   Slice data;     // Actual contents of data
   bool cachable;  // True iff data can be cached
   CompressionType compression_type;
-  std::unique_ptr<char[]> allocation;
+  CacheAllocationPtr allocation;
 
   BlockContents() : cachable(false), compression_type(kNoCompression) {}
 
@@ -200,16 +201,28 @@ struct BlockContents {
                 CompressionType _compression_type)
       : data(_data), cachable(_cachable), compression_type(_compression_type) {}
 
-  BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
+  BlockContents(CacheAllocationPtr&& _data, size_t _size, bool _cachable,
                 CompressionType _compression_type)
       : data(_data.get(), _size),
         cachable(_cachable),
         compression_type(_compression_type),
         allocation(std::move(_data)) {}
 
+  BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
+                CompressionType _compression_type)
+      : data(_data.get(), _size),
+        cachable(_cachable),
+        compression_type(_compression_type) {
+    allocation.reset(_data.release());
+  }
+
   // The additional memory space taken by the block data.
   size_t usable_size() const {
     if (allocation.get() != nullptr) {
+      auto allocator = allocation.get_deleter().allocator;
+      if (allocator) {
+        return allocator->UsableSize(allocation.get(), data.size());
+      }
 #ifdef ROCKSDB_MALLOC_USABLE_SIZE
       return malloc_usable_size(allocation.get());
 #else
@@ -252,7 +265,7 @@ extern Status ReadBlockContents(
 extern Status UncompressBlockContents(
     const UncompressionContext& uncompression_ctx, const char* data, size_t n,
     BlockContents* contents, uint32_t compress_format_version,
-    const ImmutableCFOptions& ioptions);
+    const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
 
 // This is an extension to UncompressBlockContents that accepts
 // a specific compression type. This is used by un-wrapped blocks
@@ -260,7 +273,7 @@ extern Status UncompressBlockContents(
 extern Status UncompressBlockContentsForCompressionType(
     const UncompressionContext& uncompression_ctx, const char* data, size_t n,
     BlockContents* contents, uint32_t compress_format_version,
-    const ImmutableCFOptions& ioptions);
+    const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
 
 // Implementation details follow.  Clients should ignore,
 
index df08a98fa17d2411a802aa9a0db82a45b7fad8e5..d7a8ed4aa4856f3034464389c7a55036e1a1695a 100644 (file)
@@ -153,8 +153,8 @@ class PlainTableReader: public TableReader {
   DynamicBloom bloom_;
   PlainTableReaderFileInfo file_info_;
   Arena arena_;
-  std::unique_ptr<char[]> index_block_alloc_;
-  std::unique_ptr<char[]> bloom_block_alloc_;
+  CacheAllocationPtr index_block_alloc_;
+  CacheAllocationPtr bloom_block_alloc_;
 
   const ImmutableCFOptions& ioptions_;
   uint64_t file_size_;
index 26383fa81797bd0d4c076561d6382e83f981980a..a525c58663abb1c54923ee582d9c91e3215acc29 100644 (file)
@@ -2477,6 +2477,78 @@ TEST_P(BlockBasedTableTest, BlockCacheLeak) {
   c.ResetTableReader();
 }
 
+namespace {
+class CustomCacheAllocator : public CacheAllocator {
+ public:
+  virtual const char* Name() const override { return "CustomCacheAllocator"; }
+
+  void* Allocate(size_t size) override {
+    ++numAllocations;
+    auto ptr = new char[size + 16];
+    memcpy(ptr, "cache_allocator_", 16);  // mangle first 16 bytes
+    return reinterpret_cast<void*>(ptr + 16);
+  }
+  void Deallocate(void* p) override {
+    ++numDeallocations;
+    char* ptr = reinterpret_cast<char*>(p) - 16;
+    delete[] ptr;
+  }
+
+  std::atomic<int> numAllocations;
+  std::atomic<int> numDeallocations;
+};
+}  // namespace
+
+TEST_P(BlockBasedTableTest, CacheAllocator) {
+  auto custom_cache_allocator = std::make_shared<CustomCacheAllocator>();
+  {
+    Options opt;
+    unique_ptr<InternalKeyComparator> ikc;
+    ikc.reset(new test::PlainInternalKeyComparator(opt.comparator));
+    opt.compression = kNoCompression;
+    BlockBasedTableOptions table_options;
+    table_options.block_size = 1024;
+    LRUCacheOptions lruOptions;
+    lruOptions.cache_allocator = custom_cache_allocator;
+    lruOptions.capacity = 16 * 1024 * 1024;
+    lruOptions.num_shard_bits = 4;
+    table_options.block_cache = NewLRUCache(std::move(lruOptions));
+    opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+    TableConstructor c(BytewiseComparator(),
+                       true /* convert_to_internal_key_ */);
+    c.Add("k01", "hello");
+    c.Add("k02", "hello2");
+    c.Add("k03", std::string(10000, 'x'));
+    c.Add("k04", std::string(200000, 'x'));
+    c.Add("k05", std::string(300000, 'x'));
+    c.Add("k06", "hello3");
+    c.Add("k07", std::string(100000, 'x'));
+    std::vector<std::string> keys;
+    stl_wrappers::KVMap kvmap;
+    const ImmutableCFOptions ioptions(opt);
+    const MutableCFOptions moptions(opt);
+    c.Finish(opt, ioptions, moptions, table_options, *ikc, &keys, &kvmap);
+
+    unique_ptr<InternalIterator> iter(
+        c.NewIterator(moptions.prefix_extractor.get()));
+    iter->SeekToFirst();
+    while (iter->Valid()) {
+      iter->key();
+      iter->value();
+      iter->Next();
+    }
+    ASSERT_OK(iter->status());
+  }
+
+  // out of scope, block cache should have been deleted, all allocations
+  // deallocated
+  EXPECT_EQ(custom_cache_allocator->numAllocations.load(),
+            custom_cache_allocator->numDeallocations.load());
+  // make sure that allocations actually happened through the cache allocator
+  EXPECT_GT(custom_cache_allocator->numAllocations.load(), 0);
+}
+
 TEST_P(BlockBasedTableTest, NewIndexIteratorLeak) {
   // A regression test to avoid data race described in
   // https://github.com/facebook/rocksdb/issues/1267
index e3560d6fac86d027504fd5185f1ea4a3b7907550..9b72f3a9106145390cced5e4f5eb1d5961df7c51 100644 (file)
@@ -3040,7 +3040,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     int64_t bytes = 0;
     int decompress_size;
     while (ok && bytes < 1024 * 1048576) {
-      char *uncompressed = nullptr;
+      CacheAllocationPtr uncompressed;
       switch (FLAGS_compression_type_e) {
         case rocksdb::kSnappyCompression: {
           // get size and allocate here to make comparison fair
@@ -3050,45 +3050,44 @@ void VerifyDBFromDB(std::string& truth_db_name) {
             ok = false;
             break;
           }
-          uncompressed = new char[ulength];
+          uncompressed = AllocateBlock(ulength, nullptr);
           ok = Snappy_Uncompress(compressed.data(), compressed.size(),
-                                 uncompressed);
+                                 uncompressed.get());
           break;
         }
       case rocksdb::kZlibCompression:
         uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(),
                                        compressed.size(), &decompress_size, 2);
-        ok = uncompressed != nullptr;
+        ok = uncompressed.get() != nullptr;
         break;
       case rocksdb::kBZip2Compression:
         uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
                                         &decompress_size, 2);
-        ok = uncompressed != nullptr;
+        ok = uncompressed.get() != nullptr;
         break;
       case rocksdb::kLZ4Compression:
         uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
                                       compressed.size(), &decompress_size, 2);
-        ok = uncompressed != nullptr;
+        ok = uncompressed.get() != nullptr;
         break;
       case rocksdb::kLZ4HCCompression:
         uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
                                       compressed.size(), &decompress_size, 2);
-        ok = uncompressed != nullptr;
+        ok = uncompressed.get() != nullptr;
         break;
       case rocksdb::kXpressCompression:
-        uncompressed = XPRESS_Uncompress(compressed.data(), compressed.size(),
-          &decompress_size);
-        ok = uncompressed != nullptr;
+        uncompressed.reset(XPRESS_Uncompress(
+            compressed.data(), compressed.size(), &decompress_size));
+        ok = uncompressed.get() != nullptr;
         break;
       case rocksdb::kZSTD:
         uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
                                        compressed.size(), &decompress_size);
-        ok = uncompressed != nullptr;
+        ok = uncompressed.get() != nullptr;
         break;
       default:
         ok = false;
       }
-      delete[] uncompressed;
       bytes += input.size();
       thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
     }
diff --git a/util/cache_allocator.h b/util/cache_allocator.h
new file mode 100644 (file)
index 0000000..68dd3dc
--- /dev/null
@@ -0,0 +1,38 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+
+#pragma once
+
+#include "rocksdb/cache_allocator.h"
+
+namespace rocksdb {
+
+struct CustomDeleter {
+  CustomDeleter(CacheAllocator* a = nullptr) : allocator(a) {}
+
+  void operator()(char* ptr) const {
+    if (allocator) {
+      allocator->Deallocate(reinterpret_cast<void*>(ptr));
+    } else {
+      delete[] ptr;
+    }
+  }
+
+  CacheAllocator* allocator;
+};
+
+using CacheAllocationPtr = std::unique_ptr<char[], CustomDeleter>;
+
+inline CacheAllocationPtr AllocateBlock(size_t size,
+                                        CacheAllocator* allocator) {
+  if (allocator) {
+    auto block = reinterpret_cast<char*>(allocator->Allocate(size));
+    return CacheAllocationPtr(block, allocator);
+  }
+  return CacheAllocationPtr(new char[size]);
+}
+
+}  // namespace rocksdb
index e918e14fbec736b686f2a63e21774bae0d9c21b1..d7bab05ed0f450e1d8f0bb09ce6ae0321cbddfe1 100644 (file)
@@ -14,6 +14,8 @@
 #include <string>
 
 #include "rocksdb/options.h"
+#include "rocksdb/table.h"
+#include "util/cache_allocator.h"
 #include "util/coding.h"
 #include "util/compression_context_cache.h"
 
@@ -495,11 +497,10 @@ inline bool Zlib_Compress(const CompressionContext& ctx,
 // header in varint32 format
 // @param compression_dict Data for presetting the compression library's
 //    dictionary.
-inline char* Zlib_Uncompress(const UncompressionContext& ctx,
-                             const char* input_data, size_t input_length,
-                             int* decompress_size,
-                             uint32_t compress_format_version,
-                             int windowBits = -14) {
+inline CacheAllocationPtr Zlib_Uncompress(
+    const UncompressionContext& ctx, const char* input_data,
+    size_t input_length, int* decompress_size, uint32_t compress_format_version,
+    CacheAllocator* allocator = nullptr, int windowBits = -14) {
 #ifdef ZLIB
   uint32_t output_len = 0;
   if (compress_format_version == 2) {
@@ -541,9 +542,9 @@ inline char* Zlib_Uncompress(const UncompressionContext& ctx,
   _stream.next_in = (Bytef*)input_data;
   _stream.avail_in = static_cast<unsigned int>(input_length);
 
-  char* output = new char[output_len];
+  auto output = AllocateBlock(output_len, allocator);
 
-  _stream.next_out = (Bytef*)output;
+  _stream.next_out = (Bytef*)output.get();
   _stream.avail_out = static_cast<unsigned int>(output_len);
 
   bool done = false;
@@ -561,19 +562,17 @@ inline char* Zlib_Uncompress(const UncompressionContext& ctx,
         size_t old_sz = output_len;
         uint32_t output_len_delta = output_len / 5;
         output_len += output_len_delta < 10 ? 10 : output_len_delta;
-        char* tmp = new char[output_len];
-        memcpy(tmp, output, old_sz);
-        delete[] output;
-        output = tmp;
+        auto tmp = AllocateBlock(output_len, allocator);
+        memcpy(tmp.get(), output.get(), old_sz);
+        output = std::move(tmp);
 
         // Set more output.
-        _stream.next_out = (Bytef*)(output + old_sz);
+        _stream.next_out = (Bytef*)(output.get() + old_sz);
         _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
         break;
       }
       case Z_BUF_ERROR:
       default:
-        delete[] output;
         inflateEnd(&_stream);
         return nullptr;
     }
@@ -660,9 +659,9 @@ inline bool BZip2_Compress(const CompressionContext& /*ctx*/,
 // block header
 // compress_format_version == 2 -- decompressed size is included in the block
 // header in varint32 format
-inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
-                              int* decompress_size,
-                              uint32_t compress_format_version) {
+inline CacheAllocationPtr BZip2_Uncompress(
+    const char* input_data, size_t input_length, int* decompress_size,
+    uint32_t compress_format_version, CacheAllocator* allocator = nullptr) {
 #ifdef BZIP2
   uint32_t output_len = 0;
   if (compress_format_version == 2) {
@@ -690,9 +689,9 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
   _stream.next_in = (char*)input_data;
   _stream.avail_in = static_cast<unsigned int>(input_length);
 
-  char* output = new char[output_len];
+  auto output = AllocateBlock(output_len, allocator);
 
-  _stream.next_out = (char*)output;
+  _stream.next_out = (char*)output.get();
   _stream.avail_out = static_cast<unsigned int>(output_len);
 
   bool done = false;
@@ -709,18 +708,16 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
         assert(compress_format_version != 2);
         uint32_t old_sz = output_len;
         output_len = output_len * 1.2;
-        char* tmp = new char[output_len];
-        memcpy(tmp, output, old_sz);
-        delete[] output;
-        output = tmp;
+        auto tmp = AllocateBlock(output_len, allocator);
+        memcpy(tmp.get(), output.get(), old_sz);
+        output = std::move(tmp);
 
         // Set more output.
-        _stream.next_out = (char*)(output + old_sz);
+        _stream.next_out = (char*)(output.get() + old_sz);
         _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
         break;
       }
       default:
-        delete[] output;
         BZ2_bzDecompressEnd(&_stream);
         return nullptr;
     }
@@ -814,10 +811,12 @@ inline bool LZ4_Compress(const CompressionContext& ctx,
 // header in varint32 format
 // @param compression_dict Data for presetting the compression library's
 //    dictionary.
-inline char* LZ4_Uncompress(const UncompressionContext& ctx,
-                            const char* input_data, size_t input_length,
-                            int* decompress_size,
-                            uint32_t compress_format_version) {
+inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
+                                         const char* input_data,
+                                         size_t input_length,
+                                         int* decompress_size,
+                                         uint32_t compress_format_version,
+                                         CacheAllocator* allocator = nullptr) {
 #ifdef LZ4
   uint32_t output_len = 0;
   if (compress_format_version == 2) {
@@ -837,7 +836,7 @@ inline char* LZ4_Uncompress(const UncompressionContext& ctx,
     input_data += 8;
   }
 
-  char* output = new char[output_len];
+  auto output = AllocateBlock(output_len, allocator);
 #if LZ4_VERSION_NUMBER >= 10400  // r124+
   LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
   if (ctx.dict().size()) {
@@ -845,17 +844,16 @@ inline char* LZ4_Uncompress(const UncompressionContext& ctx,
                         static_cast<int>(ctx.dict().size()));
   }
   *decompress_size = LZ4_decompress_safe_continue(
-      stream, input_data, output, static_cast<int>(input_length),
+      stream, input_data, output.get(), static_cast<int>(input_length),
       static_cast<int>(output_len));
   LZ4_freeStreamDecode(stream);
 #else   // up to r123
-  *decompress_size =
-      LZ4_decompress_safe(input_data, output, static_cast<int>(input_length),
-                          static_cast<int>(output_len));
+  *decompress_size = LZ4_decompress_safe(input_data, output.get(),
+                                         static_cast<int>(input_length),
+                                         static_cast<int>(output_len));
 #endif  // LZ4_VERSION_NUMBER >= 10400
 
   if (*decompress_size < 0) {
-    delete[] output;
     return nullptr;
   }
   assert(*decompress_size == static_cast<int>(output_len));
@@ -866,6 +864,7 @@ inline char* LZ4_Uncompress(const UncompressionContext& ctx,
   (void)input_length;
   (void)decompress_size;
   (void)compress_format_version;
+  (void)allocator;
   return nullptr;
 #endif
 }
@@ -1028,9 +1027,11 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
 
 // @param compression_dict Data for presetting the compression library's
 //    dictionary.
-inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
-                             const char* input_data, size_t input_length,
-                             int* decompress_size) {
+inline CacheAllocationPtr ZSTD_Uncompress(const UncompressionContext& ctx,
+                                          const char* input_data,
+                                          size_t input_length,
+                                          int* decompress_size,
+                                          CacheAllocator* allocator = nullptr) {
 #ifdef ZSTD
   uint32_t output_len = 0;
   if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
@@ -1038,17 +1039,17 @@ inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
     return nullptr;
   }
 
-  char* output = new char[output_len];
+  auto output = AllocateBlock(output_len, allocator);
   size_t actual_output_length;
 #if ZSTD_VERSION_NUMBER >= 500  // v0.5.0+
   ZSTD_DCtx* context = ctx.GetZSTDContext();
   assert(context != nullptr);
   actual_output_length = ZSTD_decompress_usingDict(
-      context, output, output_len, input_data, input_length, ctx.dict().data(),
-      ctx.dict().size());
+      context, output.get(), output_len, input_data, input_length,
+      ctx.dict().data(), ctx.dict().size());
 #else   // up to v0.4.x
   actual_output_length =
-      ZSTD_decompress(output, output_len, input_data, input_length);
+      ZSTD_decompress(output.get(), output_len, input_data, input_length);
 #endif  // ZSTD_VERSION_NUMBER >= 500
   assert(actual_output_length == output_len);
   *decompress_size = static_cast<int>(actual_output_length);
@@ -1058,6 +1059,7 @@ inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
   (void)input_data;
   (void)input_length;
   (void)decompress_size;
+  (void)allocator;
   return nullptr;
 #endif
 }