data_ptr = GetVarint32Ptr(data_ptr, data_ptr + 1,
static_cast<uint32_t*>(&source_32));
source = static_cast<CacheTier>(source_32);
- handle_value_charge -= (data_ptr - ptr->get());
+ uint64_t data_size = 0;
+ data_ptr = GetVarint64Ptr(data_ptr, ptr->get() + handle_value_charge,
+ static_cast<uint64_t*>(&data_size));
+ assert(handle_value_charge > data_size);
+ handle_value_charge = data_size;
}
MemoryAllocator* allocator = cache_options_.memory_allocator.get();
}
auto internal_helper = GetHelper(cache_options_.enable_custom_split_merge);
- char header[10];
+ char header[20];
char* payload = header;
payload = EncodeVarint32(payload, static_cast<uint32_t>(type));
payload = EncodeVarint32(payload, static_cast<uint32_t>(source));
+ size_t data_size = (*helper->size_cb)(value);
+ char* data_size_ptr = payload;
+ payload = EncodeVarint64(payload, data_size);
size_t header_size = payload - header;
- size_t data_size = (*helper->size_cb)(value);
size_t total_size = data_size + header_size;
CacheAllocationPtr ptr =
AllocateBlock(total_size, cache_options_.memory_allocator.get());
val = Slice(compressed_val);
data_size = compressed_val.size();
+ payload = EncodeVarint64(data_size_ptr, data_size);
+ header_size = payload - header;
total_size = header_size + data_size;
PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes, data_size);
PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count, 1);
if (cache_options_.enable_custom_split_merge) {
- size_t charge{0};
- CacheValueChunk* value_chunks_head =
- SplitValueIntoChunks(val, cache_options_.compression_type, charge);
- return cache_->Insert(key, value_chunks_head, internal_helper, charge);
+ size_t split_charge{0};
+ CacheValueChunk* value_chunks_head = SplitValueIntoChunks(
+ val, cache_options_.compression_type, split_charge);
+ return cache_->Insert(key, value_chunks_head, internal_helper,
+ split_charge);
} else {
+#ifdef ROCKSDB_MALLOC_USABLE_SIZE
+ size_t charge = malloc_usable_size(ptr.get());
+#else
+ size_t charge = total_size;
+#endif
std::memcpy(ptr.get(), header, header_size);
CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr));
- return cache_->Insert(key, buf, internal_helper, total_size);
+ charge += sizeof(CacheAllocationPtr);
+ return cache_->Insert(key, buf, internal_helper, charge);
}
}
}
}
+size_t CompressedSecondaryCache::TEST_GetCharge(const Slice& key) {
+ Cache::Handle* lru_handle = cache_->Lookup(key);
+ if (lru_handle == nullptr) {
+ return 0;
+ }
+
+ size_t charge = cache_->GetCharge(lru_handle);
+ if (cache_->Value(lru_handle) != nullptr &&
+ !cache_options_.enable_custom_split_merge) {
+ charge -= 10;
+ }
+ cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
+ return charge;
+}
+
std::shared_ptr<SecondaryCache>
CompressedSecondaryCacheOptions::MakeSharedSecondaryCache() const {
return std::make_shared<CompressedSecondaryCache>(*this);
protected:
void BasicTestHelper(std::shared_ptr<SecondaryCache> sec_cache,
bool sec_cache_is_compressed) {
+ CompressedSecondaryCache* comp_sec_cache =
+ static_cast<CompressedSecondaryCache*>(sec_cache.get());
get_perf_context()->Reset();
bool kept_in_sec_cache{true};
// Lookup an non-existent key.
ASSERT_OK(sec_cache->Insert(key1, &item1, GetHelper(), false));
ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 1);
+ ASSERT_GT(comp_sec_cache->TEST_GetCharge(key1), 1000);
+
std::unique_ptr<SecondaryCacheResultHandle> handle1_2 =
sec_cache->Lookup(key1, GetHelper(), this, true, /*advise_erase=*/true,
/*stats=*/nullptr, kept_in_sec_cache);