From 5c9ff9d5c9d5c85e5ad39a83708848439ab9690c Mon Sep 17 00:00:00 2001 From: Adam Emerson Date: Sat, 27 Apr 2024 13:20:12 -0400 Subject: [PATCH] rgw/multisite/datalog: Test semaphores/recovery Also tests renew_entry, backend, and list. Signed-off-by: Adam Emerson --- qa/suites/rgw/verify/tasks/cls.yaml | 1 + qa/workunits/rgw/test_rgw_datalog.sh | 5 + src/rgw/driver/rados/rgw_datalog.cc | 15 +- src/rgw/driver/rados/rgw_datalog.h | 18 +- src/rgw/rgw_bucket_types.h | 23 ++ src/test/rgw/CMakeLists.txt | 12 + src/test/rgw/test_datalog.cc | 448 +++++++++++++++++++++++++++ 7 files changed, 516 insertions(+), 6 deletions(-) create mode 100755 qa/workunits/rgw/test_rgw_datalog.sh create mode 100644 src/test/rgw/test_datalog.cc diff --git a/qa/suites/rgw/verify/tasks/cls.yaml b/qa/suites/rgw/verify/tasks/cls.yaml index 4748d1f69f5..5800293484e 100644 --- a/qa/suites/rgw/verify/tasks/cls.yaml +++ b/qa/suites/rgw/verify/tasks/cls.yaml @@ -19,5 +19,6 @@ tasks: - cls/test_cls_sem_set.sh - rgw/test_rgw_gc_log.sh - rgw/test_rgw_obj.sh + - rgw/test_rgw_datalog.sh - rgw/test_librgw_file.sh - rgw/test_awssdkv4_sig.sh diff --git a/qa/workunits/rgw/test_rgw_datalog.sh b/qa/workunits/rgw/test_rgw_datalog.sh new file mode 100755 index 00000000000..87a5e2d1058 --- /dev/null +++ b/qa/workunits/rgw/test_rgw_datalog.sh @@ -0,0 +1,5 @@ +#!/bin/sh -e + +ceph_test_datalog + +exit 0 diff --git a/src/rgw/driver/rados/rgw_datalog.cc b/src/rgw/driver/rados/rgw_datalog.cc index e9a14050e11..45ca67f021c 100644 --- a/src/rgw/driver/rados/rgw_datalog.cc +++ b/src/rgw/driver/rados/rgw_datalog.cc @@ -363,11 +363,16 @@ RGWDataChangesLog::RGWDataChangesLog(CephContext* cct) prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size) {} -RGWDataChangesLog::RGWDataChangesLog(CephContext* cct, bool log_data, - neorados::RADOS* rados) - : cct(cct), rados(rados), log_data(log_data), - num_shards(cct->_conf->rgw_data_log_num_shards), - prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size) {} +RGWDataChangesLog::RGWDataChangesLog(CephContext *cct, bool log_data, + neorados::RADOS *rados, + std::optional num_shards, + std::optional sem_max_keys) + : cct(cct), rados(rados), log_data(log_data), + num_shards(num_shards ? *num_shards : + cct->_conf->rgw_data_log_num_shards), + prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size), + sem_max_keys(sem_max_keys ? *sem_max_keys : + neorados::cls::sem_set::max_keys) {} void DataLogBackends::handle_init(entries_t e) { diff --git a/src/rgw/driver/rados/rgw_datalog.h b/src/rgw/driver/rados/rgw_datalog.h index 1b6306eb8f6..f63a890c54b 100644 --- a/src/rgw/driver/rados/rgw_datalog.h +++ b/src/rgw/driver/rados/rgw_datalog.h @@ -336,6 +336,20 @@ inline bool operator <(const BucketGen& l, const BucketGen& r) { } } +inline std::ostream& operator <<(std::ostream& m, const BucketGen& bg) { + return m << "{" << bg.shard << ", " << bg.gen << "}"; +} + +namespace std { +template <> +struct hash { + std::size_t operator ()(const BucketGen& bg) const noexcept { + return (hash{}(bg.shard) << 1) + ^ hash{}(bg.gen); + } +}; +} + class RGWDataChangesLog { friend class DataLogTestBase; friend DataLogBackends; @@ -412,7 +426,9 @@ public: RGWDataChangesLog(CephContext* cct); // For testing. RGWDataChangesLog(CephContext* cct, bool log_data, - neorados::RADOS* rados); + neorados::RADOS* rados, + std::optional num_shards = std::nullopt, + std::optional sem_max_keys = std::nullopt); ~RGWDataChangesLog(); asio::awaitable start(const DoutPrefixProvider* dpp, diff --git a/src/rgw/rgw_bucket_types.h b/src/rgw/rgw_bucket_types.h index ea379678ebe..c9e9da0d11e 100644 --- a/src/rgw/rgw_bucket_types.h +++ b/src/rgw/rgw_bucket_types.h @@ -192,6 +192,18 @@ struct rgw_bucket { }; WRITE_CLASS_ENCODER(rgw_bucket) +namespace std { +template<> +struct hash +{ + std::size_t operator ()(const rgw_bucket& b) const noexcept { + return ((std::hash{}(b.tenant) << 2) ^ + (std::hash{}(b.name) << 1) ^ + std::hash{}(b.bucket_id)); + } +}; +} + inline std::ostream& operator<<(std::ostream& out, const rgw_bucket &b) { out << b.tenant << ":" << b.name << "[" << b.bucket_id << "])"; return out; @@ -231,6 +243,17 @@ struct rgw_bucket_shard { } }; /* rgw_bucket_shard */ +namespace std { +template<> +struct hash +{ + std::size_t operator ()(const rgw_bucket_shard& bs) const noexcept { + return ((std::hash{}(bs.bucket) << 1) ^ + std::hash{}(bs.shard_id)); + } +}; +} + void encode(const rgw_bucket_shard& b, bufferlist& bl, uint64_t f=0); void decode(rgw_bucket_shard& b, bufferlist::const_iterator& bl); diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index 0dadae510c1..cf4252c90a1 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -356,3 +356,15 @@ target_include_directories(unittest_rgw_posix_driver target_link_libraries(unittest_rgw_posix_driver ${UNITTEST_LIBS} ${rgw_libs} ${LMDB_LIBRARIES}) endif(WITH_RADOSGW_POSIX) + +# ceph_test_datalog +add_executable(ceph_test_datalog test_datalog.cc) +target_include_directories(ceph_test_datalog + SYSTEM PRIVATE "${CMAKE_SOURCE_DIR}/src/rgw" + SYSTEM PRIVATE "${CMAKE_SOURCE_DIR}/src/rgw/store/rados") +target_link_libraries(ceph_test_datalog + libneorados + neoradostest-support + ${UNITTEST_LIBS} + ${rgw_libs}) +install(TARGETS ceph_test_datalog DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/src/test/rgw/test_datalog.cc b/src/test/rgw/test_datalog.cc new file mode 100644 index 00000000000..4f32d0b9d27 --- /dev/null +++ b/src/test/rgw/test_datalog.cc @@ -0,0 +1,448 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "rgw_datalog.h" + +#include + +#include +#include + +#include +#include + +#include + +#include "include/neorados/RADOS.hpp" + +#include "neorados/cls/sem_set.h" + +#include "test/neorados/common_tests.h" + +#include "gtest/gtest.h" + +namespace asio = boost::asio; +namespace ss = neorados::cls::sem_set; + +using neorados::WriteOp; + +class DataLogTestBase : public CoroTest { +private: + const std::string prefix_{std::string{"test framework "} + + testing::UnitTest::GetInstance()-> + current_test_info()->name() + + std::string{": "}}; + + std::optional rados_; + neorados::IOContext pool_; + const std::string pool_name_ = get_temp_pool_name( + testing::UnitTest::GetInstance()->current_test_info()->name()); + std::unique_ptr dpp_; + + boost::asio::awaitable create_pool() { + co_return co_await ::create_pool(rados(), pool_name(), + boost::asio::use_awaitable); + } + + boost::asio::awaitable clean_pool() { + co_await rados().delete_pool(pool().get_pool(), + boost::asio::use_awaitable); + } + + virtual asio::awaitable> + create_datalog() = 0; + +protected: + + std::unique_ptr datalog; + + neorados::RADOS& rados() noexcept { return *rados_; } + const std::string& pool_name() const noexcept { return pool_name_; } + const neorados::IOContext& pool() const noexcept { return pool_; } + std::string_view prefix() const noexcept { return prefix_; } + const DoutPrefixProvider* dpp() const noexcept { return dpp_.get(); } + auto execute(std::string_view oid, neorados::WriteOp&& op, + std::uint64_t* ver = nullptr) { + return rados().execute(oid, pool(), std::move(op), + boost::asio::use_awaitable, ver); + } + auto execute(std::string_view oid, neorados::ReadOp&& op, + std::uint64_t* ver = nullptr) { + return rados().execute(oid, pool(), std::move(op), nullptr, + boost::asio::use_awaitable, ver); + } + auto execute(std::string_view oid, neorados::WriteOp&& op, + neorados::IOContext ioc, std::uint64_t* ver = nullptr) { + return rados().execute(oid, std::move(ioc), std::move(op), + boost::asio::use_awaitable, ver); + } + auto execute(std::string_view oid, neorados::ReadOp&& op, + neorados::IOContext ioc, std::uint64_t* ver = nullptr) { + return rados().execute(oid, std::move(ioc), std::move(op), nullptr, + boost::asio::use_awaitable, ver); + } + + asio::awaitable> + read_all_sems_all_shards() { + bc::flat_map all_sems; + + for (auto i = 0; i < datalog->num_shards; ++i) { + co_await datalog->read_all_sems(i, &all_sems); + } + co_return std::move(all_sems); + } + + asio::awaitable> + read_all_log(const DoutPrefixProvider* dpp) { + bc::flat_map all_keys; + + RGWDataChangesLogMarker marker; + do { + std::vector entries; + std::tie(entries, marker) = + co_await datalog->list_entries(dpp, 1'000, + std::move(marker)); + for (const auto& entry : entries) { + auto key = fmt::format("{}:{}", entry.entry.key, entry.entry.gen); + all_keys[BucketGen{key}] += 1; + } + } while (marker); + co_return std::move(all_keys); + } + + asio::awaitable add_entry(const DoutPrefixProvider* dpp, + const BucketGen& bg) { + RGWBucketInfo bi; + bi.bucket = bg.shard.bucket; + rgw::bucket_log_layout_generation gen; + gen.gen = bg.gen; + co_await datalog->add_entry(dpp, bi, gen, bg.shard.shard_id); + co_return; + } + + auto renew_entries(const DoutPrefixProvider* dpp) { + return datalog->renew_entries(dpp); + } + + auto oid(const BucketGen& bg) { + return datalog->get_oid(0, datalog->choose_oid(bg.shard)); + } + + auto sem_set_oid(const BucketGen& bg) { + return datalog->get_sem_set_oid(datalog->choose_oid(bg.shard)); + } + + auto loc() { + return datalog->loc; + } + + auto recover(const DoutPrefixProvider* dpp) { + return datalog->recover(dpp, nullptr); + } + + void add_to_cur_cycle(const BucketGen& bg) { + std::unique_lock l(datalog->lock); + datalog->cur_cycle.insert(bg); + } + + void add_to_semaphores(const BucketGen& bg) { + std::unique_lock l(datalog->lock); + datalog->semaphores[datalog->choose_oid(bg.shard)].insert(bg.get_key()); + } + +public: + + /// \brief Create RADOS handle and pool for the test + boost::asio::awaitable CoSetUp() override { + rados_ = co_await neorados::RADOS::Builder{} + .build(asio_context, boost::asio::use_awaitable); + dpp_ = std::make_unique(rados().cct(), 0, prefix().data()); + pool_.set_pool(co_await create_pool()); + datalog = co_await create_datalog(); + co_return; + } + + ~DataLogTestBase() override = default; + + /// \brief Delete pool used for testing + boost::asio::awaitable CoTearDown() override { + co_await datalog->shutdown(); + co_await clean_pool(); + co_return; + } +}; + +class DataLogTest : public DataLogTestBase { +private: + asio::awaitable> create_datalog() override { + auto datalog = std::make_unique(rados().cct(), true, + &rados()); + co_await datalog->start(dpp(), rgw_pool(pool_name()), false, true, false); + co_return std::move(datalog); + } +}; + +class DataLogWatchless : public DataLogTestBase { +private: + asio::awaitable> create_datalog() override { + auto datalog = std::make_unique(rados().cct(), true, + &rados()); + co_await datalog->start(dpp(), rgw_pool(pool_name()), false, false, false); + co_return std::move(datalog); + } +}; + +class DataLogBulky : public DataLogTestBase { +private: + asio::awaitable> create_datalog() override { + // Decrease max push/list and force everything into one shard so we + // can test iterated increment/decrement/list code. + auto datalog = std::make_unique(rados().cct(), true, + &rados(), 1, 7); + co_await datalog->start(dpp(), rgw_pool(pool_name()), false, true, false); + co_return std::move(datalog); + } +}; + + + +const std::vector ref{ + {{{"fred", "foo"}, 32}, 3}, + {{{"fred", "foo"}, 32}, 0}, + {{{"fred", "foo"}, 13}, 0}, + {{{"", "bar"}, 13}, 0}, + {{{"", "bar", "zardoz"}, 11}, 0}}; + +const auto bulky = + []() { + std::vector ref; + for (auto i = 0; i < 30; ++i) { + ref.push_back({{{"", fmt::format("bucket{}", i)}, i}, 0}); + ref.push_back({{{fmt::format("tenant{}", i), + fmt::format("bucket{}", i)}, i}, 0}); + ref.push_back({{{fmt::format("tenant{}", i), + fmt::format("bucket{}", i), + fmt::format("instance{}", i)}, i}, 0}); + } + return ref; + }(); + +TEST(DataLogBG, TestRoundTrip) { + for (const auto& bg : ref) { + ASSERT_EQ(bg, BucketGen{bg.get_key()}); + } +} + +CORO_TEST_F(DataLog, TestSem, DataLogTest) { + for (const auto& bg : ref) { + co_await add_entry(dpp(), bg); + // Second send adds it to working set and creates the semaphore + co_await add_entry(dpp(), bg); + // Third should *not* increment the semaphore again. + co_await add_entry(dpp(), bg); + } + auto sems = co_await read_all_sems_all_shards(); + for (const auto& bg : ref) { + EXPECT_TRUE(sems.contains(bg.get_key())); + EXPECT_EQ(1, sems[bg.get_key()]); + } + co_await renew_entries(dpp()); + sems.clear(); + sems = co_await read_all_sems_all_shards(); + EXPECT_TRUE(sems.empty()); + const auto log_entries = co_await read_all_log(dpp()); + for (const auto& bg : ref) { + EXPECT_TRUE(log_entries.contains(bg)); + } + co_return; +} + +CORO_TEST_F(DataLog, SimpleRecovery, DataLogTest) { + for (const auto& bg : ref) { + co_await rados().execute(sem_set_oid(bg), loc(), + WriteOp{}.exec(ss::increment(bg.get_key())), + asio::use_awaitable); + } + co_await recover(dpp()); + auto sems = co_await read_all_sems_all_shards(); + EXPECT_TRUE(sems.empty()); + + auto log_entries = co_await read_all_log(dpp()); + for (const auto& bg : ref) { + EXPECT_TRUE(log_entries.contains(bg)); + } + + co_return; +} + +CORO_TEST_F(DataLog, CycleRecovery, DataLogTest) { + for (const auto& bg : ref) { + co_await rados().execute(sem_set_oid(bg), loc(), + WriteOp{}.exec(ss::increment(bg.get_key())), + asio::use_awaitable); + } + add_to_cur_cycle(ref[0]); + add_to_cur_cycle(ref[1]); + co_await recover(dpp()); + auto sems = co_await read_all_sems_all_shards(); + for (const auto& bg : {ref[0], ref[1]}) { + EXPECT_TRUE(sems.contains(bg.get_key())); + } + for (const auto& bg : {ref[2], ref[3], ref[4]}) { + EXPECT_FALSE(sems.contains(bg.get_key())); + } + + auto log_entries = co_await read_all_log(dpp()); + for (const auto& bg : ref) { + EXPECT_TRUE(log_entries.contains(bg)); + } + + co_return; +} + +CORO_TEST_F(DataLog, SemaphoresRecovery, DataLogTest) { + for (const auto& bg : ref) { + co_await rados().execute(sem_set_oid(bg), loc(), + WriteOp{}.exec(ss::increment(bg.get_key())), + asio::use_awaitable); + } + add_to_semaphores(ref[0]); + add_to_semaphores(ref[1]); + co_await recover(dpp()); + auto sems = co_await read_all_sems_all_shards(); + for (const auto& bg : {ref[0], ref[1]}) { + EXPECT_TRUE(sems.contains(bg.get_key())); + } + for (const auto& bg : {ref[2], ref[3], ref[4]}) { + EXPECT_FALSE(sems.contains(bg.get_key())); + } + + const auto log_entries = co_await read_all_log(dpp()); + for (const auto& bg : ref) { + EXPECT_EQ(1, log_entries.at(bg)); + } + + co_return; +} + +CORO_TEST_F(DataLogWatchless, NotWatching, DataLogWatchless) { + for (const auto& bg : ref) { + co_await add_entry(dpp(), bg); + // With watch down, we should bypass the data window and get two entries + co_await add_entry(dpp(), bg); + } + auto sems = co_await read_all_sems_all_shards(); + EXPECT_TRUE(sems.empty()); + const auto log_entries = co_await read_all_log(dpp()); + for (const auto& bg : ref) { + EXPECT_EQ(2, log_entries.at(bg)); + } + co_return; +} + +CORO_TEST_F(DataLogBulky, TestSemBulky, DataLogBulky) { + for (const auto& bg : bulky) { + co_await add_entry(dpp(), bg); + // Second send adds it to working set and creates the semaphore + co_await add_entry(dpp(), bg); + } + auto sems = co_await read_all_sems_all_shards(); + for (const auto& bg : bulky) { + EXPECT_TRUE(sems.contains(bg.get_key())); + EXPECT_EQ(1, sems[bg.get_key()]); + } + co_await renew_entries(dpp()); + sems.clear(); + sems = co_await read_all_sems_all_shards(); + EXPECT_TRUE(sems.empty()); + const auto log_entries = co_await read_all_log(dpp()); + for (const auto& bg : bulky) { + EXPECT_TRUE(log_entries.contains(bg)); + } + co_return; +} + +CORO_TEST_F(DataLogBulky, BulkyRecovery, DataLogBulky) { + for (const auto& bg : bulky) { + co_await rados().execute(sem_set_oid(bg), loc(), + WriteOp{}.exec(ss::increment(bg.get_key())), + asio::use_awaitable); + } + co_await recover(dpp()); + auto sems = co_await read_all_sems_all_shards(); + EXPECT_TRUE(sems.empty()); + + auto log_entries = co_await read_all_log(dpp()); + for (const auto& bg : bulky) { + EXPECT_TRUE(log_entries.contains(bg)); + } + + co_return; +} + +CORO_TEST_F(DataLogBulky, BulkyCycleRecovery, DataLogBulky) { + for (const auto& bg : bulky) { + co_await rados().execute(sem_set_oid(bg), loc(), + WriteOp{}.exec(ss::increment(bg.get_key())), + asio::use_awaitable); + } + for (auto i = 0u; i < bulky.size(); ++i) { + if (i % 2 == 0) { + add_to_cur_cycle(bulky[i]); + } + } + co_await recover(dpp()); + auto sems = co_await read_all_sems_all_shards(); + for (auto i = 0u; i < bulky.size(); ++i) { + if (i % 2 == 0) { + EXPECT_TRUE(sems.contains(bulky[i].get_key())); + } else { + EXPECT_FALSE(sems.contains(bulky[i].get_key())); + } + } + + auto log_entries = co_await read_all_log(dpp()); + for (const auto& bg : bulky) { + EXPECT_TRUE(log_entries.contains(bg)); + } + co_return; +} + +CORO_TEST_F(DataLogBulky, BulkySemaphoresRecovery, DataLogBulky) { + for (const auto& bg : bulky) { + co_await rados().execute(sem_set_oid(bg), loc(), + WriteOp{}.exec(ss::increment(bg.get_key())), + asio::use_awaitable); + } + for (auto i = 0u; i < bulky.size(); ++i) { + if (i % 2 == 0) { + add_to_semaphores(bulky[i]); + } + } + co_await recover(dpp()); + auto sems = co_await read_all_sems_all_shards(); + for (auto i = 0u; i < bulky.size(); ++i) { + if (i % 2 == 0) { + EXPECT_TRUE(sems.contains(bulky[i].get_key())); + } else { + EXPECT_FALSE(sems.contains(bulky[i].get_key())); + } + } + + auto log_entries = co_await read_all_log(dpp()); + for (const auto& bg : bulky) { + EXPECT_TRUE(log_entries.contains(bg)); + } + co_return; +} -- 2.39.5