]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/multisite/datalog: Test semaphores/recovery
authorAdam Emerson <aemerson@redhat.com>
Sat, 27 Apr 2024 17:20:12 +0000 (13:20 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Tue, 1 Apr 2025 15:10:14 +0000 (11:10 -0400)
Also tests renew_entry, backend, and list.

Signed-off-by: Adam Emerson <aemerson@redhat.com>
qa/suites/rgw/verify/tasks/cls.yaml
qa/workunits/rgw/test_rgw_datalog.sh [new file with mode: 0755]
src/rgw/driver/rados/rgw_datalog.cc
src/rgw/driver/rados/rgw_datalog.h
src/rgw/rgw_bucket_types.h
src/test/rgw/CMakeLists.txt
src/test/rgw/test_datalog.cc [new file with mode: 0644]

index 4748d1f69f5b0e7f553462ebf4773d877499497e..5800293484ef49c80163c15f6222c1ccc5e97548 100644 (file)
@@ -19,5 +19,6 @@ tasks:
         - cls/test_cls_sem_set.sh
         - rgw/test_rgw_gc_log.sh
         - rgw/test_rgw_obj.sh
+        - rgw/test_rgw_datalog.sh
         - rgw/test_librgw_file.sh
         - rgw/test_awssdkv4_sig.sh
diff --git a/qa/workunits/rgw/test_rgw_datalog.sh b/qa/workunits/rgw/test_rgw_datalog.sh
new file mode 100755 (executable)
index 0000000..87a5e2d
--- /dev/null
@@ -0,0 +1,5 @@
+#!/bin/sh -e
+
+ceph_test_datalog
+
+exit 0
index e9a14050e11d551d175f447a17cbd495510a566c..45ca67f021cde846d03c2708b90744497a93ef87 100644 (file)
@@ -363,11 +363,16 @@ RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
     prefix(get_prefix()),
     changes(cct->_conf->rgw_data_log_changes_size) {}
 
-RGWDataChangesLog::RGWDataChangesLog(CephContext* cct, bool log_data,
-                                    neorados::RADOS* rados)
-  : cct(cct), rados(rados), log_data(log_data),
-    num_shards(cct->_conf->rgw_data_log_num_shards),
-    prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size) {}
+RGWDataChangesLog::RGWDataChangesLog(CephContext *cct, bool log_data,
+                                     neorados::RADOS *rados,
+                                     std::optional<int> num_shards,
+                                     std::optional<uint64_t> sem_max_keys)
+    : cct(cct), rados(rados), log_data(log_data),
+      num_shards(num_shards ? *num_shards :
+                cct->_conf->rgw_data_log_num_shards),
+      prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size),
+      sem_max_keys(sem_max_keys ? *sem_max_keys :
+                  neorados::cls::sem_set::max_keys) {}
 
 
 void DataLogBackends::handle_init(entries_t e) {
index 1b6306eb8f6a7ae135f35ca809b8f2e2ace9b492..f63a890c54b8ce9176bebcc0337f6d8332d98409 100644 (file)
@@ -336,6 +336,20 @@ inline bool operator <(const BucketGen& l, const BucketGen& r) {
   }
 }
 
+inline std::ostream& operator <<(std::ostream& m, const BucketGen& bg) {
+  return m << "{" << bg.shard << ", " << bg.gen << "}";
+}
+
+namespace std {
+template <>
+struct hash<BucketGen> {
+  std::size_t operator ()(const BucketGen& bg) const noexcept {
+    return (hash<decltype(bg.shard)>{}(bg.shard) << 1)
+          ^ hash<decltype(bg.gen)>{}(bg.gen);
+  }
+};
+}
+
 class RGWDataChangesLog {
   friend class DataLogTestBase;
   friend DataLogBackends;
@@ -412,7 +426,9 @@ public:
   RGWDataChangesLog(CephContext* cct);
   // For testing.
   RGWDataChangesLog(CephContext* cct, bool log_data,
-                   neorados::RADOS* rados);
+                   neorados::RADOS* rados,
+                   std::optional<int> num_shards = std::nullopt,
+                   std::optional<uint64_t> sem_max_keys = std::nullopt);
   ~RGWDataChangesLog();
 
   asio::awaitable<void> start(const DoutPrefixProvider* dpp,
index ea379678ebe3f938c3a023fd4bffb4bebf71b3e5..c9e9da0d11e85cdcc8b32bf91ab8d96e01581d9e 100644 (file)
@@ -192,6 +192,18 @@ struct rgw_bucket {
 };
 WRITE_CLASS_ENCODER(rgw_bucket)
 
+namespace std {
+template<>
+struct hash<rgw_bucket>
+{
+  std::size_t operator ()(const rgw_bucket& b) const noexcept {
+    return ((std::hash<decltype(b.tenant)>{}(b.tenant) << 2) ^
+           (std::hash<decltype(b.name)>{}(b.name) << 1) ^
+           std::hash<decltype(b.bucket_id)>{}(b.bucket_id));
+  }
+};
+}
+
 inline std::ostream& operator<<(std::ostream& out, const rgw_bucket &b) {
   out << b.tenant << ":" << b.name << "[" << b.bucket_id << "])";
   return out;
@@ -231,6 +243,17 @@ struct rgw_bucket_shard {
   }
 }; /* rgw_bucket_shard */
 
+namespace std {
+template<>
+struct hash<rgw_bucket_shard>
+{
+  std::size_t operator ()(const rgw_bucket_shard& bs) const noexcept {
+    return ((std::hash<decltype(bs.bucket)>{}(bs.bucket) << 1) ^
+           std::hash<decltype(bs.shard_id)>{}(bs.shard_id));
+  }
+};
+}
+
 void encode(const rgw_bucket_shard& b, bufferlist& bl, uint64_t f=0);
 void decode(rgw_bucket_shard& b, bufferlist::const_iterator& bl);
 
index 0dadae510c1e7877e4cdcc6b03a57a4226da9aef..cf4252c90a1c6c673e66e3f7ac6e92e47dd24705 100644 (file)
@@ -356,3 +356,15 @@ target_include_directories(unittest_rgw_posix_driver
 target_link_libraries(unittest_rgw_posix_driver  ${UNITTEST_LIBS}
   ${rgw_libs} ${LMDB_LIBRARIES})
 endif(WITH_RADOSGW_POSIX)
+
+# ceph_test_datalog
+add_executable(ceph_test_datalog test_datalog.cc)
+target_include_directories(ceph_test_datalog
+  SYSTEM PRIVATE "${CMAKE_SOURCE_DIR}/src/rgw"
+  SYSTEM PRIVATE "${CMAKE_SOURCE_DIR}/src/rgw/store/rados")
+target_link_libraries(ceph_test_datalog
+  libneorados
+  neoradostest-support
+  ${UNITTEST_LIBS}
+  ${rgw_libs})
+install(TARGETS ceph_test_datalog DESTINATION ${CMAKE_INSTALL_BINDIR})
diff --git a/src/test/rgw/test_datalog.cc b/src/test/rgw/test_datalog.cc
new file mode 100644 (file)
index 0000000..4f32d0b
--- /dev/null
@@ -0,0 +1,448 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "rgw_datalog.h"
+
+#include <string_view>
+
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/use_awaitable.hpp>
+
+#include <boost/system/errc.hpp>
+#include <boost/system/error_code.hpp>
+
+#include <fmt/format.h>
+
+#include "include/neorados/RADOS.hpp"
+
+#include "neorados/cls/sem_set.h"
+
+#include "test/neorados/common_tests.h"
+
+#include "gtest/gtest.h"
+
+namespace asio = boost::asio;
+namespace ss = neorados::cls::sem_set;
+
+using neorados::WriteOp;
+
+class DataLogTestBase : public CoroTest {
+private:
+  const std::string prefix_{std::string{"test framework "} +
+                           testing::UnitTest::GetInstance()->
+                           current_test_info()->name() +
+                           std::string{": "}};
+
+  std::optional<neorados::RADOS> rados_;
+  neorados::IOContext pool_;
+  const std::string pool_name_ = get_temp_pool_name(
+    testing::UnitTest::GetInstance()->current_test_info()->name());
+  std::unique_ptr<DoutPrefix> dpp_;
+
+  boost::asio::awaitable<uint64_t> create_pool() {
+    co_return co_await ::create_pool(rados(), pool_name(),
+                                    boost::asio::use_awaitable);
+  }
+
+  boost::asio::awaitable<void> clean_pool() {
+    co_await rados().delete_pool(pool().get_pool(),
+                               boost::asio::use_awaitable);
+  }
+
+  virtual asio::awaitable<std::unique_ptr<RGWDataChangesLog>>
+  create_datalog() = 0;
+
+protected:
+
+  std::unique_ptr<RGWDataChangesLog> datalog;
+
+  neorados::RADOS& rados() noexcept { return *rados_; }
+  const std::string& pool_name() const noexcept { return pool_name_; }
+  const neorados::IOContext& pool() const noexcept { return pool_; }
+  std::string_view prefix() const noexcept { return prefix_; }
+  const DoutPrefixProvider* dpp() const noexcept { return dpp_.get(); }
+  auto execute(std::string_view oid, neorados::WriteOp&& op,
+              std::uint64_t* ver = nullptr) {
+    return rados().execute(oid, pool(), std::move(op),
+                          boost::asio::use_awaitable, ver);
+  }
+  auto execute(std::string_view oid, neorados::ReadOp&& op,
+              std::uint64_t* ver = nullptr) {
+    return rados().execute(oid, pool(), std::move(op), nullptr,
+                          boost::asio::use_awaitable, ver);
+  }
+  auto execute(std::string_view oid, neorados::WriteOp&& op,
+              neorados::IOContext ioc, std::uint64_t* ver = nullptr) {
+    return rados().execute(oid, std::move(ioc), std::move(op),
+                          boost::asio::use_awaitable, ver);
+  }
+  auto execute(std::string_view oid, neorados::ReadOp&& op,
+              neorados::IOContext ioc, std::uint64_t* ver = nullptr) {
+    return rados().execute(oid, std::move(ioc), std::move(op), nullptr,
+                          boost::asio::use_awaitable, ver);
+  }
+
+  asio::awaitable<bc::flat_map<std::string, uint64_t>>
+  read_all_sems_all_shards() {
+    bc::flat_map<std::string, uint64_t> all_sems;
+
+    for (auto i = 0; i < datalog->num_shards; ++i) {
+      co_await datalog->read_all_sems(i, &all_sems);
+    }
+    co_return std::move(all_sems);
+  }
+
+  asio::awaitable<bc::flat_map<BucketGen, uint64_t>>
+  read_all_log(const DoutPrefixProvider* dpp) {
+    bc::flat_map<BucketGen, uint64_t> all_keys;
+
+    RGWDataChangesLogMarker marker;
+    do {
+      std::vector<rgw_data_change_log_entry> entries;
+      std::tie(entries, marker) =
+       co_await datalog->list_entries(dpp, 1'000,
+                                      std::move(marker));
+      for (const auto& entry : entries) {
+       auto key = fmt::format("{}:{}", entry.entry.key, entry.entry.gen);
+       all_keys[BucketGen{key}] += 1;
+      }
+    } while (marker);
+    co_return std::move(all_keys);
+  }
+
+  asio::awaitable<void> add_entry(const DoutPrefixProvider* dpp,
+                                  const BucketGen& bg) {
+    RGWBucketInfo bi;
+    bi.bucket = bg.shard.bucket;
+    rgw::bucket_log_layout_generation gen;
+    gen.gen = bg.gen;
+    co_await datalog->add_entry(dpp, bi, gen, bg.shard.shard_id);
+    co_return;
+  }
+
+  auto renew_entries(const DoutPrefixProvider* dpp) {
+    return datalog->renew_entries(dpp);
+  }
+
+  auto oid(const BucketGen& bg) {
+    return datalog->get_oid(0, datalog->choose_oid(bg.shard));
+  }
+
+  auto sem_set_oid(const BucketGen& bg) {
+    return datalog->get_sem_set_oid(datalog->choose_oid(bg.shard));
+  }
+
+  auto loc() {
+    return datalog->loc;
+  }
+
+  auto recover(const DoutPrefixProvider* dpp) {
+    return datalog->recover(dpp, nullptr);
+  }
+
+  void add_to_cur_cycle(const BucketGen& bg) {
+    std::unique_lock l(datalog->lock);
+    datalog->cur_cycle.insert(bg);
+  }
+
+  void add_to_semaphores(const BucketGen& bg) {
+    std::unique_lock l(datalog->lock);
+    datalog->semaphores[datalog->choose_oid(bg.shard)].insert(bg.get_key());
+  }
+
+public:
+
+  /// \brief Create RADOS handle and pool for the test
+  boost::asio::awaitable<void> CoSetUp() override {
+    rados_ = co_await neorados::RADOS::Builder{}
+      .build(asio_context, boost::asio::use_awaitable);
+    dpp_ = std::make_unique<DoutPrefix>(rados().cct(), 0, prefix().data());
+    pool_.set_pool(co_await create_pool());
+    datalog = co_await create_datalog();
+    co_return;
+  }
+
+  ~DataLogTestBase() override = default;
+
+  /// \brief Delete pool used for testing
+  boost::asio::awaitable<void> CoTearDown() override {
+    co_await datalog->shutdown();
+    co_await clean_pool();
+    co_return;
+  }
+};
+
+class DataLogTest : public DataLogTestBase {
+private:
+  asio::awaitable<std::unique_ptr<RGWDataChangesLog>> create_datalog() override {
+    auto datalog = std::make_unique<RGWDataChangesLog>(rados().cct(), true,
+                                                      &rados());
+    co_await datalog->start(dpp(), rgw_pool(pool_name()), false, true, false);
+    co_return std::move(datalog);
+  }
+};
+
+class DataLogWatchless : public DataLogTestBase {
+private:
+  asio::awaitable<std::unique_ptr<RGWDataChangesLog>> create_datalog() override {
+    auto datalog = std::make_unique<RGWDataChangesLog>(rados().cct(), true,
+                                                      &rados());
+    co_await datalog->start(dpp(), rgw_pool(pool_name()), false, false, false);
+    co_return std::move(datalog);
+  }
+};
+
+class DataLogBulky : public DataLogTestBase {
+private:
+  asio::awaitable<std::unique_ptr<RGWDataChangesLog>> create_datalog() override {
+    // Decrease max push/list and force everything into one shard so we
+    // can test iterated increment/decrement/list code.
+    auto datalog = std::make_unique<RGWDataChangesLog>(rados().cct(), true,
+                                                      &rados(), 1, 7);
+    co_await datalog->start(dpp(), rgw_pool(pool_name()), false, true, false);
+    co_return std::move(datalog);
+  }
+};
+
+
+
+const std::vector<BucketGen> ref{
+  {{{"fred", "foo"}, 32}, 3},
+  {{{"fred", "foo"}, 32}, 0},
+  {{{"fred", "foo"}, 13}, 0},
+  {{{"", "bar"}, 13}, 0},
+  {{{"", "bar", "zardoz"}, 11}, 0}};
+
+const auto bulky =
+  []() {
+    std::vector<BucketGen> ref;
+    for (auto i = 0; i < 30; ++i) {
+      ref.push_back({{{"", fmt::format("bucket{}", i)}, i}, 0});
+      ref.push_back({{{fmt::format("tenant{}", i),
+                      fmt::format("bucket{}", i)}, i}, 0});
+      ref.push_back({{{fmt::format("tenant{}", i),
+                      fmt::format("bucket{}", i),
+                      fmt::format("instance{}", i)}, i}, 0});
+    }
+    return ref;
+  }();
+
+TEST(DataLogBG, TestRoundTrip) {
+  for (const auto& bg : ref) {
+    ASSERT_EQ(bg, BucketGen{bg.get_key()});
+  }
+}
+
+CORO_TEST_F(DataLog, TestSem, DataLogTest) {
+  for (const auto& bg : ref) {
+    co_await add_entry(dpp(), bg);
+    // Second send adds it to working set and creates the semaphore
+    co_await add_entry(dpp(), bg);
+    // Third should *not* increment the semaphore again.
+    co_await add_entry(dpp(), bg);
+  }
+  auto sems = co_await read_all_sems_all_shards();
+  for (const auto& bg : ref) {
+    EXPECT_TRUE(sems.contains(bg.get_key()));
+    EXPECT_EQ(1, sems[bg.get_key()]);
+  }
+  co_await renew_entries(dpp());
+  sems.clear();
+  sems = co_await read_all_sems_all_shards();
+  EXPECT_TRUE(sems.empty());
+  const auto log_entries = co_await read_all_log(dpp());
+  for (const auto& bg : ref) {
+    EXPECT_TRUE(log_entries.contains(bg));
+  }
+  co_return;
+}
+
+CORO_TEST_F(DataLog, SimpleRecovery, DataLogTest) {
+  for (const auto& bg : ref) {
+    co_await rados().execute(sem_set_oid(bg), loc(),
+                            WriteOp{}.exec(ss::increment(bg.get_key())),
+                            asio::use_awaitable);
+  }
+  co_await recover(dpp());
+  auto sems = co_await read_all_sems_all_shards();
+  EXPECT_TRUE(sems.empty());
+
+  auto log_entries = co_await read_all_log(dpp());
+  for (const auto& bg : ref) {
+    EXPECT_TRUE(log_entries.contains(bg));
+  }
+
+  co_return;
+}
+
+CORO_TEST_F(DataLog, CycleRecovery, DataLogTest) {
+  for (const auto& bg : ref) {
+    co_await rados().execute(sem_set_oid(bg), loc(),
+                            WriteOp{}.exec(ss::increment(bg.get_key())),
+                            asio::use_awaitable);
+  }
+  add_to_cur_cycle(ref[0]);
+  add_to_cur_cycle(ref[1]);
+  co_await recover(dpp());
+  auto sems = co_await read_all_sems_all_shards();
+  for (const auto& bg : {ref[0], ref[1]}) {
+    EXPECT_TRUE(sems.contains(bg.get_key()));
+  }
+  for (const auto& bg : {ref[2], ref[3], ref[4]}) {
+    EXPECT_FALSE(sems.contains(bg.get_key()));
+  }
+
+  auto log_entries = co_await read_all_log(dpp());
+  for (const auto& bg : ref) {
+    EXPECT_TRUE(log_entries.contains(bg));
+  }
+
+  co_return;
+}
+
+CORO_TEST_F(DataLog, SemaphoresRecovery, DataLogTest) {
+  for (const auto& bg : ref) {
+    co_await rados().execute(sem_set_oid(bg), loc(),
+                            WriteOp{}.exec(ss::increment(bg.get_key())),
+                            asio::use_awaitable);
+  }
+  add_to_semaphores(ref[0]);
+  add_to_semaphores(ref[1]);
+  co_await recover(dpp());
+  auto sems = co_await read_all_sems_all_shards();
+  for (const auto& bg : {ref[0], ref[1]}) {
+    EXPECT_TRUE(sems.contains(bg.get_key()));
+  }
+  for (const auto& bg : {ref[2], ref[3], ref[4]}) {
+    EXPECT_FALSE(sems.contains(bg.get_key()));
+  }
+
+  const auto log_entries = co_await read_all_log(dpp());
+  for (const auto& bg : ref) {
+    EXPECT_EQ(1, log_entries.at(bg));
+  }
+
+  co_return;
+}
+
+CORO_TEST_F(DataLogWatchless, NotWatching, DataLogWatchless) {
+  for (const auto& bg : ref) {
+    co_await add_entry(dpp(), bg);
+    // With watch down, we should bypass the data window and get two entries
+    co_await add_entry(dpp(), bg);
+  }
+  auto sems = co_await read_all_sems_all_shards();
+  EXPECT_TRUE(sems.empty());
+  const auto log_entries = co_await read_all_log(dpp());
+  for (const auto& bg : ref) {
+    EXPECT_EQ(2, log_entries.at(bg));
+  }
+  co_return;
+}
+
+CORO_TEST_F(DataLogBulky, TestSemBulky, DataLogBulky) {
+  for (const auto& bg : bulky) {
+    co_await add_entry(dpp(), bg);
+    // Second send adds it to working set and creates the semaphore
+    co_await add_entry(dpp(), bg);
+  }
+  auto sems = co_await read_all_sems_all_shards();
+  for (const auto& bg : bulky) {
+    EXPECT_TRUE(sems.contains(bg.get_key()));
+    EXPECT_EQ(1, sems[bg.get_key()]);
+  }
+  co_await renew_entries(dpp());
+  sems.clear();
+  sems = co_await read_all_sems_all_shards();
+  EXPECT_TRUE(sems.empty());
+  const auto log_entries = co_await read_all_log(dpp());
+  for (const auto& bg : bulky) {
+    EXPECT_TRUE(log_entries.contains(bg));
+  }
+  co_return;
+}
+
+CORO_TEST_F(DataLogBulky, BulkyRecovery, DataLogBulky) {
+  for (const auto& bg : bulky) {
+    co_await rados().execute(sem_set_oid(bg), loc(),
+                            WriteOp{}.exec(ss::increment(bg.get_key())),
+                            asio::use_awaitable);
+  }
+  co_await recover(dpp());
+  auto sems = co_await read_all_sems_all_shards();
+  EXPECT_TRUE(sems.empty());
+
+  auto log_entries = co_await read_all_log(dpp());
+  for (const auto& bg : bulky) {
+    EXPECT_TRUE(log_entries.contains(bg));
+  }
+
+  co_return;
+}
+
+CORO_TEST_F(DataLogBulky, BulkyCycleRecovery, DataLogBulky) {
+  for (const auto& bg : bulky) {
+    co_await rados().execute(sem_set_oid(bg), loc(),
+                            WriteOp{}.exec(ss::increment(bg.get_key())),
+                            asio::use_awaitable);
+  }
+  for (auto i = 0u; i < bulky.size(); ++i) {
+    if (i % 2 == 0) {
+      add_to_cur_cycle(bulky[i]);
+    }
+  }
+  co_await recover(dpp());
+  auto sems = co_await read_all_sems_all_shards();
+  for (auto i = 0u; i < bulky.size(); ++i) {
+    if (i % 2 == 0) {
+      EXPECT_TRUE(sems.contains(bulky[i].get_key()));
+    } else {
+      EXPECT_FALSE(sems.contains(bulky[i].get_key()));
+    }
+  }
+
+  auto log_entries = co_await read_all_log(dpp());
+  for (const auto& bg : bulky) {
+    EXPECT_TRUE(log_entries.contains(bg));
+  }
+  co_return;
+}
+
+CORO_TEST_F(DataLogBulky, BulkySemaphoresRecovery, DataLogBulky) {
+  for (const auto& bg : bulky) {
+    co_await rados().execute(sem_set_oid(bg), loc(),
+                            WriteOp{}.exec(ss::increment(bg.get_key())),
+                            asio::use_awaitable);
+  }
+  for (auto i = 0u; i < bulky.size(); ++i) {
+    if (i % 2 == 0) {
+      add_to_semaphores(bulky[i]);
+    }
+  }
+  co_await recover(dpp());
+  auto sems = co_await read_all_sems_all_shards();
+  for (auto i = 0u; i < bulky.size(); ++i) {
+    if (i % 2 == 0) {
+      EXPECT_TRUE(sems.contains(bulky[i].get_key()));
+    } else {
+      EXPECT_FALSE(sems.contains(bulky[i].get_key()));
+    }
+  }
+
+  auto log_entries = co_await read_all_log(dpp());
+  for (const auto& bg : bulky) {
+    EXPECT_TRUE(log_entries.contains(bg));
+  }
+  co_return;
+}