From: Marcel Lauhoff Date: Tue, 11 Feb 2025 12:48:22 +0000 (+0100) Subject: common: Add WebCache X-Git-Tag: v21.0.1~14^2~17 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9618827200a5e7d686bb1e65dcf3f2bb2b1ee4df;p=ceph.git common: Add WebCache A cache data structure for values that need to be retrieved form outside systems (e.g Key Management Systems). Features: - Thread safe, optimized for concurrent lookups and cache hits - Entry TTL expiration - Cache replacement strategy tuned to "web" workloads (SIEVE) - Performance Counters on hit, miss, expire, size, capacity, clears Signed-off-by: Marcel Lauhoff On-behalf-of: SAP marcel.lauhoff@sap.com --- diff --git a/src/common/web_cache.h b/src/common/web_cache.h new file mode 100644 index 00000000000..35e102a936e --- /dev/null +++ b/src/common/web_cache.h @@ -0,0 +1,559 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2025 Clyso GmbH + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/ceph_context.h" +#include "common/ceph_time.h" +#include "include/ceph_assert.h" +#include "include/common_fwd.h" +#include "common/perf_counters_collection.h" + +// Web Cache +// A cache for data living on other systems like Key Management Systems +// Goals/Features +// (1) Thread safe, bias towards handling highly concurrent lookups +// (2) Expire entries by ttl +// (3) Cache replacement tuned to "web" workloads +// +// Algorithm +// The implementation is based on SIEVE [0] with additional TTL +// expiration support +// +// Data Structures +// - key lookup table: key -> Node{value ptr, metadata) (owns nodes) +// - SIEVE FIFO + hand intrusive list on lookup's nodes + pointer +// +// Cache Stampedes +// +// Occurs when numerous threads simultaneously request the same data +// that is not yet in the cache. This can lead to many redundant, +// costly requests, that may overwhelm external systems. Although +// WebCache does not have built-in stampede mitigation, the lookup_or +// function is designed to be used with additional logic, such as +// ceph::async::call_once / std::call_once. +// +// [0] Zhang, Yazhuo, et al. "{SIEVE} is Simpler than {LRU}: an Efficient +// {Turn-Key} Eviction Algorithm for Web Caches." 21st USENIX +// Symposium on Networked Systems Design and Implementation (NSDI 24). +// 2024. + +class WebCacheTest; + +namespace webcache { + +enum class Metric { + metrics_start = 84000, + hit, + miss, + expired, + size, + capacity, + clear, + metrics_stop +}; + +template +class WebCache { + public: + // TODO(irq0) let the user choose the value pointer type + using ValuePtr = std::shared_ptr; + + protected: + struct Node : public boost::intrusive::list_base_hook<> { + std::atomic_bool visited; + ceph::real_time expires_at; + Key const* key; + ValuePtr value; + + explicit Node(ValuePtr value, ceph::timespan ttl) + : visited(false), + expires_at(ceph::real_clock::now() + ttl), + key(nullptr), + value(std::move(value)) {}; + // for testing + Node(ValuePtr value, ceph::real_time expires_at) + : visited(false), + expires_at(expires_at), + key(nullptr), + value(std::move(value)) {}; + + Node(Node&&) = default; + Node& operator=(Node&&) = default; + Node(const Node&) = delete; + Node& operator=(const Node&) = delete; + ~Node() = default; + + friend std::ostream& operator<<( + std::ostream& os, const WebCache::Node& node) { + fmt::print( + os, "$n[{}->{} v:{} expires:{}]", fmt::ptr(node.key), + fmt::ptr(node.value.get()), node.visited.load(), node.expires_at); + return os; + } + }; + using SieveQueue = boost::intrusive::list; + + private: + CephContext* _cct; + std::string _name; + PerfCounters* _perf; + size_t _capacity; + ceph::timespan _ttl; + std::unordered_map _lookup; + SieveQueue _sieve_queue; + Node* _sieve_hand; + mutable std::shared_mutex _cache_mutex; + + protected: + // sieve_evict removes the next node using the SIEVE algorithm from + // _sieve_queue and returns a pointer to the evicted node + Node* sieve_evict(); + + // sieve_expire_erase_unmutexed removes all expired nodes from the + // sieve_queue in place. It writes the expired nodes to out_expired. + // Updates the sieve hand + static void sieve_expire_erase_unmutexed( + SieveQueue& sieve_queue, Node* sieve_hand, + ceph::real_time eviction_cutoff, SieveQueue& out_expired); + + using SieveRemoveRet = std::pair; + // sieve_remove_unmutexed removes a node from sieve_queue and + // returns an iterator to next (or end()) and an updated sieve hand + static SieveRemoveRet sieve_remove_unmutexed( + SieveQueue& sieve_queue, Node* sieve_hand, const Node& node); + + static PerfCounters* initialize_perf_counters( + CephContext* cct, const std::string& name); + + std::optional lookup_unmutexed(const Key& key); + Node& insert_or_existing_unmutexed(const Key& key, ValuePtr value); + + void perf_tinc(Metric metric, ceph::timespan elapsed) { + if (_perf != nullptr) { + _perf->tinc(static_cast(metric), elapsed); + } + } + void perf_inc(Metric metric, uint64_t inc = 1) { + if (_perf != nullptr) { + _perf->inc(static_cast(metric), inc); + } + } + void perf_set(Metric metric, uint64_t set = 1) { + if (_perf != nullptr) { + _perf->set(static_cast(metric), set); + } + } + + public: + // For testing, custom use + explicit WebCache(size_t capacity, ceph::timespan ttl); + + // For system use, activates perf counters + WebCache( + CephContext* cct, const std::string& name, size_t capacity, + ceph::timespan ttl = ceph::timespan::zero()); + + // If enabled, remove perf counters (Requires perf counter subsystem + // running) + ~WebCache(); + + // lookup returns the stored value for a given key or not + std::optional lookup(const Key& key); + + // add caches a key/value pair. If key already exists it does + // nothing. Return the stored timestamp of the k/v mapping + ceph::real_time add(const Key& key, ValuePtr val); + + // lookup_or returns a value for key. If none is cached yet, insert + // new_val and return that. A common use is with + // ceph::async::call_once to add cache stampede mitigation + ValuePtr lookup_or(const Key& key, ValuePtr new_val); + + // update_ttl_if updates an entry's TTL if its value matches val. + // Return true if we updated an entry. False otherwise. + bool update_ttl_if( + const Key& key, const ValuePtr& val_ptr, ceph::timespan new_ttl); + + // remove_if removes an entry if it finds an entry where its values + // matches expected_val. return true if we removed an entry, false + // otherwise + bool remove_if(const Key& key, const ValuePtr& expected_val); + + size_t size() const; + size_t clear(); + + // expire_erase erases all expired cache entries + size_t expire_erase(); + + const std::string& name() { return _name; } + + PerfCounters* perf() { return _perf; } + + friend std::ostream& operator<<( + std::ostream& os, const WebCache& cache) { + std::shared_lock lock(cache._cache_mutex); + const auto now = ceph::real_clock::now(); + os << "$" << cache._name << "["; + + for (const auto& node : cache._sieve_queue) { + const auto ttl = node.expires_at - now; + fmt::print( + os, "\"{}\"({}){}{}", *(node.key), + std::chrono::duration_cast(ttl), + (&node == cache._sieve_hand) ? "👉" : "", node.visited ? "▮" : "▯"); + if (&node != &cache._sieve_queue.back()) { + os << ", "; + } + } + os << "]"; + return os; + } + + friend std::ostream& operator<<(std::ostream& os, const SieveQueue& nodes) { + for (const auto& node : nodes) { + os << node; + if (&node != &nodes.back()) { + os << ", "; + } + } + return os; + } + + friend class WebCacheTest; + friend class WebCacheConcurrencyTest; + friend class WebCacheRandomizedTest; + friend class WebCacheTest_SieveExample_Test; + friend class WebCacheTest_ExpireEraseOne_Test; + friend class WebCacheTest_ExpireEraseAll_Test; + friend class WebCacheTest_ExpireEraseEmpty_Test; + friend class WebCacheTest_ExpireEraseUpdatedTTLs_Test; + friend class WebCacheTest_SieveRemoveHand_Test; +}; + +template +WebCache::Node* WebCache::sieve_evict() { + if (_sieve_queue.empty()) { + return nullptr; + } + if (_sieve_hand == nullptr) { + _sieve_hand = &_sieve_queue.back(); + } + Node* result = nullptr; + const size_t queue_size_before = _sieve_queue.size(); + + auto hand = _sieve_queue.iterator_to(*_sieve_hand); + const auto rev_it = typename SieveQueue::reverse_iterator(std::next(hand)); + for (auto it = rev_it; it != _sieve_queue.rend(); ++it) { + Node& node = (*it); + if (node.visited) { + node.visited = false; + --hand; + } else { + hand = std::prev(_sieve_queue.erase(std::next(it).base())); + result = &node; + break; + } + } + // every node was visited. we need still need to evict the tail + if (result == nullptr) { + result = &_sieve_queue.back(); + _sieve_queue.pop_back(); + } + + ceph_assertf( + queue_size_before == 0 || (queue_size_before - _sieve_queue.size()) == 1, + "%d -> %d capacity:%d", queue_size_before, _sieve_queue.size(), + _capacity); + _sieve_hand = (hand == _sieve_queue.begin()) ? &_sieve_queue.back() : &*hand; + return result; +} + +template +WebCache::WebCache(size_t capacity, ceph::timespan ttl) + : _cct(nullptr), + _perf(nullptr), + _capacity(capacity), + _ttl(ttl), + _lookup(), + _sieve_queue(), + _sieve_hand(nullptr) {} + +template +WebCache::WebCache( + CephContext* cct, const std::string& name, size_t capacity, + ceph::timespan ttl) + : _cct(cct), + _name(name), + _perf(initialize_perf_counters(cct, name)), + _capacity(capacity), + _ttl(ttl), + _lookup(), + _sieve_queue(), + _sieve_hand(nullptr) { + ceph_assert(cct != nullptr); + perf_set(Metric::capacity, capacity); +} + +template +WebCache::~WebCache() { + if (_cct != nullptr && _perf != nullptr) { + _cct->get_perfcounters_collection()->remove(_perf); + } +} + +template +PerfCounters* WebCache::initialize_perf_counters( + CephContext* cct, const std::string& name) { + PerfCountersBuilder pcb( + cct, name, static_cast(Metric::metrics_start), + static_cast(Metric::metrics_stop)); + pcb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL); + pcb.add_u64_counter(static_cast(Metric::hit), "hit", "Cache hits"); + pcb.add_u64_counter(static_cast(Metric::miss), "miss", "Cache misses"); + pcb.add_u64_counter( + static_cast(Metric::expired), "expired", "Expired cache entries"); + pcb.add_u64( + static_cast(Metric::size), "size", "Total number of cache entries"); + pcb.add_u64( + static_cast(Metric::capacity), "capacity", "Maximum cache capacity"); + pcb.add_u64_counter( + static_cast(Metric::clear), "clear", "Total number of cache clears"); + + auto* perf = pcb.create_perf_counters(); + cct->get_perfcounters_collection()->add(perf); + return perf; +} + +template +size_t WebCache::size() const { + std::shared_lock lock(_cache_mutex); + ceph_assert(_lookup.size() == _sieve_queue.size()); + return _lookup.size(); +} + +template +size_t WebCache::clear() { + std::lock_guard lock(_cache_mutex); + perf_inc(Metric::clear); + const size_t size_before = _sieve_queue.size(); + _sieve_queue.clear(); + _sieve_hand = nullptr; + _lookup.clear(); + perf_set(Metric::size, 0); + perf_set(Metric::hit, 0); + perf_set(Metric::miss, 0); + perf_set(Metric::expired, 0); + return size_before; +} + +template +ceph::real_time WebCache::add(const Key& key, ValuePtr value) { + // cache hit - fast under read lock + { + std::shared_lock lock(_cache_mutex); + if (auto search = _lookup.find(key); search != _lookup.end()) { + perf_inc(Metric::hit); + search->second.visited = true; + return search->second.expires_at; + } + } + // miss, take unique lock + { + std::lock_guard lock(_cache_mutex); + return insert_or_existing_unmutexed(key, value).expires_at; + } +} + +template +WebCache::ValuePtr WebCache::lookup_or( + const Key& key, ValuePtr new_val) { + { + std::shared_lock cache_lock(_cache_mutex); + auto maybe_value = lookup_unmutexed(key); + if (maybe_value.has_value()) { + perf_inc(Metric::hit); + return maybe_value.value(); + } + } + + // miss, take unique lock + { + std::lock_guard lock(_cache_mutex); + return insert_or_existing_unmutexed(key, new_val).value; + } +} + +template +bool WebCache::update_ttl_if( + const Key& key, const ValuePtr& val, ceph::timespan new_ttl) { + std::lock_guard lock(_cache_mutex); + if (auto search = _lookup.find(key); + (search != _lookup.end() && search->second.value == val)) { + search->second.expires_at = ceph::real_clock::now() + new_ttl; + return true; + } + return false; +} + +template +bool WebCache::remove_if( + const Key& key, const ValuePtr& expected_val) { + std::lock_guard lock(_cache_mutex); + if (auto search = _lookup.find(key); + (search != _lookup.end() && search->second.value == expected_val)) { + auto [_, hand_moved] = + sieve_remove_unmutexed(_sieve_queue, _sieve_hand, search->second); + _lookup.erase(search); + _sieve_hand = hand_moved; + return true; + } + return false; +} + +template +std::optional::ValuePtr> +WebCache::lookup_unmutexed(const Key& key) { + if (auto search = _lookup.find(key); search != _lookup.end()) { // cache hit + search->second.visited = true; + return search->second.value; + } else { + return std::nullopt; + } +} + +template +WebCache::Node& WebCache::insert_or_existing_unmutexed( + const Key& key, ValuePtr value) { + const auto& [it, took_place] = _lookup.emplace( + std::piecewise_construct, std::forward_as_tuple(key), + std::forward_as_tuple(std::move(value), _ttl)); + + if (took_place) { // cache miss + perf_inc(Metric::miss); + + ceph_assert(_lookup.size() == _sieve_queue.size() + 1); + // cache full? -> evict + if (_sieve_queue.size() >= _capacity) { + const auto node = sieve_evict(); + if (node != nullptr) { + _lookup.erase(*(node->key)); + } + } + auto& [stored_key, node] = *it; + node.key = &stored_key; + _sieve_queue.push_front(node); + perf_set(Metric::size, _lookup.size()); + return node; + } else { // cache hit + perf_inc(Metric::hit); + it->second.visited = true; + return it->second; + } +} + +template +WebCache::SieveRemoveRet +WebCache::sieve_remove_unmutexed( + SieveQueue& sieve_queue, Node* sieve_hand, const Node& node) { + const bool was_hand = &node == sieve_hand; + const auto node_it = sieve_queue.iterator_to(node); + auto it = sieve_queue.erase(node_it); + if (sieve_queue.empty()) { + return {sieve_queue.end(), nullptr}; + } + if (was_hand) { + return { + it, + (it == sieve_queue.begin()) ? &sieve_queue.back() : &(*std::prev(it))}; + } + return {it, sieve_hand}; +} + +template +std::optional::ValuePtr> +WebCache::lookup(const Key& key) { + std::shared_lock lock(_cache_mutex); + auto result = lookup_unmutexed(key); + if (result.has_value()) { + perf_inc(Metric::hit); + } else { + perf_inc(Metric::miss); + } + return result; +} + +template +void WebCache::sieve_expire_erase_unmutexed( + SieveQueue& sieve_queue, Node* sieve_hand, ceph::real_time eviction_cutoff, + SieveQueue& out_expired) { + // The sieve queue is ordered by ascending insertion time which + // would allow for efficient epiration by finding the first not + // expired element. BUT, since we allow updating TTLs this property + // no longer holds and we have do a full sieve queue sweep. + for (auto it = sieve_queue.begin(); it != sieve_queue.end();) { + Node& node = (*it); + const bool expired = node.expires_at <= eviction_cutoff; + if (expired) { + auto [next_it, next_hand] = + sieve_remove_unmutexed(sieve_queue, sieve_hand, node); + ceph_assert(!node.is_linked()); + out_expired.push_back(node); + it = next_it; + sieve_hand = next_hand; + } else { + ++it; + } + } +} + +template +size_t WebCache::expire_erase() { + std::lock_guard lock(_cache_mutex); + const auto expiration_cutoff = ceph::real_clock::now(); + const auto lookup_size_before = _lookup.size(); + SieveQueue expired; + sieve_expire_erase_unmutexed( + _sieve_queue, _sieve_hand, expiration_cutoff, expired); + const auto expired_size = expired.size(); + expired.clear_and_dispose( + [&](const Node* node) { _lookup.erase(*node->key); }); + ceph_assert((lookup_size_before - expired_size) == _lookup.size()); + perf_inc(Metric::expired, expired_size); + perf_set(Metric::size, _lookup.size()); + return expired_size; +} + +} // namespace webcache + +#if FMT_VERSION >= 90000 +template +struct fmt::formatter> : fmt::ostream_formatter { +}; +#endif diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt index cbbce114355..655bbe9a6de 100644 --- a/src/test/common/CMakeLists.txt +++ b/src/test/common/CMakeLists.txt @@ -114,6 +114,13 @@ add_executable(unittest_shared_cache add_ceph_unittest(unittest_shared_cache) target_link_libraries(unittest_shared_cache global) +# unittest_web_cache +add_executable(unittest_web_cache + test_web_cache.cc + ) +add_ceph_unittest(unittest_web_cache) +target_link_libraries(unittest_web_cache ceph-common) + # unittest_sloppy_crc_map add_executable(unittest_sloppy_crc_map test_sloppy_crc_map.cc diff --git a/src/test/common/test_web_cache.cc b/src/test/common/test_web_cache.cc new file mode 100644 index 00000000000..524e4c0beb2 --- /dev/null +++ b/src/test/common/test_web_cache.cc @@ -0,0 +1,642 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/ceph_context.h" +#include "common/ceph_time.h" +#include "common/dout.h" +#include "common/JSONFormatter.h" +#include "common/web_cache.h" +#include "gtest/gtest.h" +#include "include/ceph_assert.h" +#include "include/msgr.h" +#include "include/uuid.h" +#include "log/Log.h" + +using namespace std::chrono_literals; + +namespace webcache { + +class WebCacheTest : public ::testing::Test { + protected: + static constexpr size_t SIEVE_EXAMPLE_CACHE_SIZE = 7; + const std::string a_key = "testkey"; + const std::string a_value = "testvalue"; + const std::shared_ptr a_valptr = + std::make_shared(a_value); + + const std::unique_ptr _cct; + size_t _initialized_capacity; + std::unique_ptr> _uut; + + WebCacheTest() + : _cct(new CephContext(CEPH_ENTITY_TYPE_ANY)), + _initialized_capacity(SIEVE_EXAMPLE_CACHE_SIZE), + _uut(std::make_unique>( + SIEVE_EXAMPLE_CACHE_SIZE, 42s)) { + _cct->_log->start(); + } + void TearDown() override { lderr(_cct.get()) << "AFTER: " << *_uut << dendl; } + + void reset_cache(size_t capacity) { + _uut = std::make_unique>(capacity, 42s); + _initialized_capacity = capacity; + } + void reset_cache_system_mode(size_t capacity) { + _uut = std::make_unique>( + _cct.get(), "testing", capacity); + _initialized_capacity = capacity; + } + + std::vector sieve_queue_keys() { + std::vector keys; + std::transform( + _uut->_sieve_queue.begin(), _uut->_sieve_queue.end(), + std::back_inserter(keys), [](const auto& node) { return *node.key; }); + return keys; + } + size_t sieve_hand_pos() { + if (_uut->_sieve_hand == nullptr) { + return _uut->_capacity -1; + } + return std::distance( + _uut->_sieve_queue.begin(), + _uut->_sieve_queue.iterator_to(*_uut->_sieve_hand)); + } + std::optional sieve_hand_key() { + if (_uut->size() == 0) { + return std::nullopt; + } + if (_uut->_sieve_hand == nullptr) { + return *_uut->_sieve_queue.back().key; + } + return *_uut->_sieve_hand->key; + } +}; + +TEST_F(WebCacheTest, AddReturnsGreaterZeroTs) { + auto expires_ts = _uut->add(a_key, a_valptr); + ASSERT_GT(expires_ts, ceph::real_clock::zero()); +} + +TEST_F(WebCacheTest, MultipleAddsToEmptyReturnFirstTs) { + const std::vector inserts = { + _uut->add(a_key, a_valptr), + _uut->add(a_key, a_valptr), + _uut->add(a_key, a_valptr), + _uut->add(a_key, a_valptr), + }; + ASSERT_THAT(inserts, ::testing::Each(inserts[0])); +} + +TEST_F(WebCacheTest, TwoSameKeyAddsGivesSize1) { + const auto _ = { + _uut->add(a_key, a_valptr), + _uut->add(a_key, a_valptr), + _uut->add(a_key, a_valptr), + }; + ASSERT_THAT(_uut->size(), 1); +} + +TEST_F(WebCacheTest, TwoDistinctKeyAddsGiveSize2) { + const auto _ = { + _uut->add("a", a_valptr), + _uut->add("b", a_valptr), + }; + ASSERT_THAT(_uut->size(), 2); +} + +TEST_F(WebCacheTest, LookupNonExistingReturnsIsNullopt) { + ASSERT_FALSE(_uut->lookup("not-existing").has_value()); +} + +TEST_F(WebCacheTest, AddDoesNotOverwrite) { + _uut->add(a_key, a_valptr); + ASSERT_EQ(_uut->lookup(a_key).value(), a_valptr); + + auto another_valptr = std::make_shared("another-value"); + _uut->add(a_key, another_valptr); + ASSERT_EQ(_uut->lookup(a_key).value(), a_valptr); +} + +TEST_F(WebCacheTest, LookupReturnsWhatWasAdded) { + const std::string key = "testkey"; + const std::string value = "testvalue"; + auto addptr = std::make_shared(value); + _uut->add(key, addptr); + + auto maybe_val = _uut->lookup(key); + ASSERT_TRUE(maybe_val); + ASSERT_EQ(value, *maybe_val.value()); +} + +TEST_F(WebCacheTest, AddsOverCapacityDontGoOver) { + for (size_t i = 0; i < (_initialized_capacity + 10); ++i) { + _uut->add(std::to_string(i), a_valptr); + } + ASSERT_EQ(_initialized_capacity, _uut->size()); +} + +TEST_F(WebCacheTest, AddALotUnique) { + reset_cache(100); + for (size_t i = 0; i < 10000; ++i) { + uuid_d uuid; + uuid.generate_random(); + _uut->add(uuid.to_string(), a_valptr); + } +} + +TEST_F(WebCacheTest, CacheTakesValOwnership) { + const std::string a_key = "testkey"; + const std::string a_value = "testvalue"; + std::shared_ptr a_valptr = + std::make_shared(a_value); + ASSERT_EQ(1, a_valptr.use_count()); + _uut->add(a_key, a_valptr); + ASSERT_EQ(2, a_valptr.use_count()); + auto retrieved = _uut->lookup(a_key).value(); + ASSERT_EQ(3, retrieved.use_count()); + a_valptr.reset(); + ASSERT_EQ(2, retrieved.use_count()); +} + +TEST_F(WebCacheTest, SimpleLookupOr) { + auto value = _uut->lookup_or(a_key, std::make_shared("test")); + ASSERT_EQ("test", *value); +} + +TEST_F(WebCacheTest, LookupOrAddsToCache) { + auto future = _uut->lookup_or(a_key, std::make_shared("test")); + ASSERT_EQ("test", *_uut->lookup(a_key).value()); +} + +TEST_F(WebCacheTest, SieveRemoveHand) { + std::array::Node, 3> store{ + WebCache::Node{ + std::make_shared("a"), ceph::real_clock::from_time_t(1)}, + WebCache::Node{ + std::make_shared("b"), ceph::real_clock::from_time_t(2)}, + WebCache::Node{ + std::make_shared("c"), ceph::real_clock::from_time_t(3)}, + }; + WebCache::SieveQueue nodes; + for (auto& node : store) { + nodes.push_back(node); + } + // SIEVE QUEUE: a b c + WebCache::Node* hand = &store[2]; + lderr(_cct.get()) << "NODES: " << &store[0] << ", " << &store[1] << ", " + << &store[2] << dendl; + lderr(_cct.get()) << "HAND: " << hand << dendl; + { + auto [it, hand_moved] = + WebCache::sieve_remove_unmutexed( + nodes, hand, store[2]); + ASSERT_EQ(&*nodes.end(), &*it); // we removed the last element + ASSERT_EQ(hand_moved, &nodes.back()); // hand pointed to the last element + ASSERT_EQ(hand_moved, &store[1]); + hand = hand_moved; + } + + { + auto [it, hand_moved] = + WebCache::sieve_remove_unmutexed( + nodes, hand, store[0]); + ASSERT_EQ(*it->value, "b"); + ASSERT_EQ(hand_moved, &store[1]); + hand = hand_moved; + } + + { + auto [it, hand_moved] = + WebCache::sieve_remove_unmutexed( + nodes, hand, store[1]); + ASSERT_EQ(&*nodes.end(), &*it); + ASSERT_EQ(hand_moved, nullptr); + } +} + +TEST_F(WebCacheTest, ExpireEraseOne) { + WebCache::Node alive_node( + a_valptr, ceph::real_clock::from_time_t(301)); + WebCache::Node expired_node( + a_valptr, ceph::real_clock::from_time_t(10)); + WebCache::SieveQueue nodes; + nodes.push_back(alive_node); + nodes.push_back(expired_node); + WebCache::Node* hand = nullptr; + + lderr(_cct.get()) << "NODES: " << &alive_node << ":" << alive_node + << ", " << &expired_node << ":" << expired_node << dendl; + lderr(_cct.get()) << "BEFORE EXPIRE: " << alive_node << ", " << expired_node + << dendl; + EXPECT_EQ(2, nodes.size()); + const auto expiration_cutoff = ceph::real_clock::from_time_t(300); + WebCache::SieveQueue expired; + WebCache::sieve_expire_erase_unmutexed( + nodes, hand, expiration_cutoff, expired); + lderr(_cct.get()) << "AFTER EXPIRE: " << alive_node << ", " << expired_node + << dendl; + lderr(_cct.get()) << "EXPIRED: " << expired << dendl; + EXPECT_EQ(1, nodes.size()); + EXPECT_EQ(nullptr, hand); + ASSERT_EQ(1, expired.size()); + EXPECT_EQ(&*expired.begin(), &expired_node); +} + +TEST_F(WebCacheTest, ExpireEraseAll) { + WebCache::Node expired_node_1( + a_valptr, ceph::real_clock::from_time_t(23)); + WebCache::Node expired_node_2( + a_valptr, ceph::real_clock::from_time_t(42)); + WebCache::SieveQueue nodes; + nodes.push_back(expired_node_1); + nodes.push_back(expired_node_2); + WebCache::Node* hand = nullptr; + lderr(_cct.get()) << "NODES: " << &expired_node_1 << ":" + << expired_node_1 << ", " << &expired_node_2 << ":" + << expired_node_2 << dendl; + lderr(_cct.get()) << "BEFORE EXPIRE: " << expired_node_1 << ", " + << expired_node_2 << dendl; + EXPECT_EQ(2, nodes.size()); + const auto expiration_cutoff = ceph::real_clock::from_time_t(300); + WebCache::SieveQueue expired; + WebCache::sieve_expire_erase_unmutexed( + nodes, hand, expiration_cutoff, expired); + lderr(_cct.get()) << "AFTER EXPIRE: " << expired_node_1 << ", " + << expired_node_2 << dendl; + lderr(_cct.get()) << "EXPIRED: " << expired << dendl; + EXPECT_EQ(0, nodes.size()); + EXPECT_EQ(nullptr, hand); + ASSERT_EQ(2, expired.size()); + ASSERT_EQ(&*expired.begin(), &expired_node_1); + ASSERT_EQ(&expired.back(), &expired_node_2); +} + +TEST_F(WebCacheTest, ExpireEraseUpdatedTTLs) { + _uut->add("a", a_valptr); // oldest + _uut->add("b", a_valptr); + _uut->add("c", a_valptr); + _uut->add("d", a_valptr); + _uut->add("e", a_valptr); // newest + + _uut->update_ttl_if("a", a_valptr, std::chrono::seconds(0)); // expired + _uut->update_ttl_if( + "b", a_valptr, std::chrono::seconds(100000)); // not expired + _uut->update_ttl_if("c", a_valptr, std::chrono::seconds(0)); + _uut->update_ttl_if("d", a_valptr, std::chrono::seconds(100000)); + _uut->update_ttl_if("e", a_valptr, std::chrono::seconds(0)); + + EXPECT_EQ(5, _uut->size()) << *_uut; + EXPECT_EQ(3, _uut->expire_erase()); + EXPECT_EQ(2, _uut->size()) << *_uut; +} + +TEST_F(WebCacheTest, ExpireEraseEmpty) { + WebCache::SieveQueue nodes; + const auto expiration_cutoff = ceph::real_clock::from_time_t(42); + WebCache::SieveQueue expired; + WebCache::Node* hand = nullptr; + WebCache::sieve_expire_erase_unmutexed( + nodes, hand, expiration_cutoff, expired); + ASSERT_EQ(0, nodes.size()); + ASSERT_EQ(0, expired.size()); + EXPECT_EQ(nullptr, hand); +} + +TEST_F(WebCacheTest, CacheHasSizeZeroAfterClear) { + _uut->add("a", a_valptr); + _uut->add("b", a_valptr); + _uut->add("c", a_valptr); + ASSERT_EQ(3, _uut->size()); + ASSERT_EQ(3, _uut->clear()); + ASSERT_EQ(0, _uut->size()); +} + +TEST_F(WebCacheTest, SieveExample) { + // Example from https://cachemon.github.io/SIEVE-website/ + _uut->add("a", a_valptr); + _uut->add("b", a_valptr); + _uut->add("c", a_valptr); + _uut->add("d", a_valptr); + _uut->add("e", a_valptr); + _uut->add("f", a_valptr); + _uut->add("g", a_valptr); + + _uut->add("a", a_valptr); + _uut->add("b", a_valptr); + _uut->add("g", a_valptr); + ASSERT_THAT( + sieve_queue_keys(), + ::testing::ElementsAre("g", "f", "e", "d", "c", "b", "a")); + ASSERT_EQ(sieve_hand_pos(), 6); + + _uut->add("h", a_valptr); + ASSERT_THAT( + sieve_queue_keys(), + ::testing::ElementsAre("h", "g", "f", "e", "d", "b", "a")); + ASSERT_EQ(sieve_hand_pos(), 4); + + _uut->add("a", a_valptr); + ASSERT_THAT( + sieve_queue_keys(), + ::testing::ElementsAre("h", "g", "f", "e", "d", "b", "a")); + ASSERT_EQ(sieve_hand_pos(), 4); + + _uut->add("d", a_valptr); + ASSERT_THAT( + sieve_queue_keys(), + ::testing::ElementsAre("h", "g", "f", "e", "d", "b", "a")); + ASSERT_EQ(sieve_hand_pos(), 4); + + _uut->add("i", a_valptr); + ASSERT_THAT( + sieve_queue_keys(), + ::testing::ElementsAre("i", "h", "g", "f", "d", "b", "a")); + ASSERT_EQ(sieve_hand_pos(), 3); + + _uut->add("b", a_valptr); + ASSERT_THAT( + sieve_queue_keys(), + ::testing::ElementsAre("i", "h", "g", "f", "d", "b", "a")); + ASSERT_EQ(sieve_hand_pos(), 3); + + _uut->add("j", a_valptr); + ASSERT_THAT( + sieve_queue_keys(), + ::testing::ElementsAre("j", "i", "h", "g", "d", "b", "a")); + ASSERT_EQ(sieve_hand_pos(), 3); +} + +TEST_F(WebCacheTest, RemoveIfNonExistingReturnsFalse) { + ASSERT_FALSE(_uut->remove_if("h", a_valptr)); +} + +TEST_F(WebCacheTest, RemoveIfExistingDifferentValueReturnsFalseDoesNothing) { + _uut->add("a", a_valptr); + ASSERT_FALSE( + _uut->remove_if("a", std::make_shared("someothervalue"))); + ASSERT_THAT(sieve_queue_keys(), ::testing::ElementsAre("a")); +} + +TEST_F(WebCacheTest, RemoveIfExampleHandMovement) { + // Start with the example from https://cachemon.github.io/SIEVE-website/ + _uut->add("a", a_valptr); + _uut->add("b", a_valptr); + _uut->add("c", a_valptr); + _uut->add("d", a_valptr); + _uut->add("e", a_valptr); + _uut->add("f", a_valptr); + _uut->add("g", a_valptr); + _uut->add("a", a_valptr); + _uut->add("b", a_valptr); + _uut->add("g", a_valptr); + _uut->add("h", a_valptr); + ASSERT_THAT( + sieve_queue_keys(), + ::testing::ElementsAre("h", "g", "f", "e", "d", "b", "a")); + ASSERT_EQ(sieve_hand_key(), "d"); + + ASSERT_TRUE(_uut->remove_if("h", a_valptr)); + ASSERT_THAT( + sieve_queue_keys(), ::testing::ElementsAre("g", "f", "e", "d", "b", "a")); + ASSERT_EQ(sieve_hand_key(), "d"); + + ASSERT_TRUE(_uut->remove_if("d", a_valptr)); + ASSERT_THAT( + sieve_queue_keys(), ::testing::ElementsAre("g", "f", "e", "b", "a")); + ASSERT_EQ(sieve_hand_key(), "e"); + + ASSERT_TRUE(_uut->remove_if("a", a_valptr)); + ASSERT_THAT(sieve_queue_keys(), ::testing::ElementsAre("g", "f", "e", "b")); + ASSERT_EQ(sieve_hand_key(), "e"); + + ASSERT_TRUE(_uut->remove_if("f", a_valptr)); + ASSERT_THAT(sieve_queue_keys(), ::testing::ElementsAre("g", "e", "b")); + ASSERT_EQ(sieve_hand_key(), "e"); + + ASSERT_TRUE(_uut->remove_if("e", a_valptr)); + ASSERT_THAT(sieve_queue_keys(), ::testing::ElementsAre("g", "b")); + ASSERT_EQ(sieve_hand_key(), "g"); + + ASSERT_TRUE(_uut->remove_if("b", a_valptr)); + ASSERT_THAT(sieve_queue_keys(), ::testing::ElementsAre("g")); + ASSERT_EQ(sieve_hand_key(), "g"); + + ASSERT_TRUE(_uut->remove_if("g", a_valptr)); + ASSERT_THAT(0, _uut->size()); + ASSERT_FALSE(sieve_hand_key().has_value()); +} + +TEST_F(WebCacheTest, SieveAllVisited) { + reset_cache(2); + _uut->add("a", a_valptr); + _uut->add("b", a_valptr); + _uut->add("a", a_valptr); + _uut->add("b", a_valptr); + _uut->add("c", a_valptr); +} + +class WebCacheConcurrencyTest : public WebCacheTest { + void TearDown() override { + if (_uut->perf() != nullptr) { + JSONFormatter f(true); + _uut->perf()->dump_formatted(&f, false, select_labeled_t::labeled); + f.flush(std::cout); + _uut->perf()->reset(); + } + } +}; + +TEST_F(WebCacheConcurrencyTest, BasicAddSame) { + reset_cache_system_mode(100); + const auto num_threads = + std::max(std::thread::hardware_concurrency() * 100, 100U); + std::vector threads; + for (size_t i = 0; i < num_threads; ++i) { + threads.emplace_back([&]() { + for (size_t j = 0; j < 1000; ++j) { + _uut->add(a_key, a_valptr); + } + }); + } + for (auto& th : threads) { + th.join(); + } + + ASSERT_EQ(1, _uut->size()); + ASSERT_TRUE(_uut->lookup(a_key).has_value()); +} + +TEST_F(WebCacheConcurrencyTest, BasicAddUnique) { + reset_cache_system_mode(100); + const auto num_threads = + std::max(std::thread::hardware_concurrency() * 100, 100U); + std::vector threads; + for (size_t i = 0; i < num_threads; ++i) { + threads.emplace_back([&]() { + for (size_t j = 0; j < 10; ++j) { + uuid_d uuid; + uuid.generate_random(); + _uut->add(uuid.to_string(), a_valptr); + } + }); + } + for (auto& th : threads) { + th.join(); + } +} + +// Example: Mitigate cache stampedes using std::call_once +TEST_F(WebCacheConcurrencyTest, StampedeSyncCallOnce) { + struct CacheValue { + std::once_flag once; + std::string value; + }; + webcache::WebCache cache( + _cct.get(), "test_web_cache", 100); + std::atomic_int fetches = 0; + const auto num_threads = + std::max(std::thread::hardware_concurrency() * 100, 100U); + std::vector threads; + for (size_t i = 0; i < num_threads; ++i) { + threads.emplace_back([&]() { + std::shared_ptr cache_value = + cache.lookup_or(a_key, std::make_shared()); + std::call_once(cache_value->once, [cache_value, &fetches]() { + fetches++; + std::this_thread::sleep_for(1000ms); + cache_value->value = "test"; + }); + }); + } + for (auto& th : threads) { + th.join(); + } + ASSERT_EQ(fetches.load(), 1); +} + +// Example: Mitigate cache stampedes using a mutex per value +TEST_F(WebCacheConcurrencyTest, StampedeMutex) { + struct CacheValue { + std::mutex mutex; + std::string value; + std::function fn; + CacheValue(std::function fn) : fn(fn) {} + std::string get() { + std::unique_lock lock(mutex); + if (value.empty()) { + value = fn(); + } + return value; + } + }; + webcache::WebCache cache( + _cct.get(), "test_web_cache", 100); + std::atomic_int fetches = 0; + const auto num_threads = + std::max(std::thread::hardware_concurrency() * 100, 100U); + std::vector threads; + for (size_t i = 0; i < num_threads; ++i) { + threads.emplace_back([&]() { + std::shared_ptr cache_value = + cache.lookup_or(a_key, std::make_shared([&fetches]() { + fetches++; + std::this_thread::sleep_for(1000ms); + return "test"; + })); + cache_value->get(); + }); + } + for (auto& th : threads) { + th.join(); + } + ASSERT_EQ(fetches.load(), 1); +} + +class WebCacheRandomizedTest : public WebCacheTest { + void TearDown() override { + if (_uut->perf() != nullptr) { + JSONFormatter f(true); + _uut->perf()->dump_formatted(&f, false, select_labeled_t::labeled); + f.flush(std::cout); + _uut->perf()->reset(); + } + } +}; + +TEST_F(WebCacheRandomizedTest, RandomCallMainOperations) { + reset_cache_system_mode(1000); + const size_t ops = 100000; + const auto num_threads = std::max(std::thread::hardware_concurrency(), 8U); + + std::vector base(100); + std::iota(base.begin(), base.end(), 0); + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dist(0, base.size() - 1); + std::queue keys; + for (size_t i = 0; i < ops; ++i) { + keys.emplace(std::to_string(base[dist(gen)])); + } + + std::discrete_distribution<> op_dist({90, 1, 9}); + + std::mutex mutex; + std::vector threads; + std::atomic_int lookups; + std::atomic_int clears; + std::atomic_int expires; + for (size_t i = 0; i < num_threads; ++i) { + threads.emplace_back([&]() { + for (size_t op_i = 0; op_i < ops / num_threads; ++op_i) { + std::string key; + int op = -1; + { + std::unique_lock lock(mutex); + key = keys.front(); + keys.pop(); + op = op_dist(gen); + } + + switch (op) { + case 0: { // lookup_or + auto value = _uut->lookup_or(key, a_valptr); + lookups++; + } break; + case 1: // clear cache + _uut->clear(); + clears++;; + break; + case 2: // expire + _uut->expire_erase(); + expires++; + break; + default: + ceph_abort("should not happen"); + } + } + }); + } + for (auto& th : threads) { + th.join(); + } + EXPECT_GT(lookups, 1); + EXPECT_GT(expires, 1); + EXPECT_GT(clears, 1); + std::cout << "lookups: " << lookups + << ", expires: " << expires + << ", clears: " << clears + << std::endl; +} + +} // namespace webcache