]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Add recovery semaphore commands to radosgw-admin
authorAdam Emerson <aemerson@redhat.com>
Tue, 17 Sep 2024 21:23:57 +0000 (17:23 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Tue, 1 Apr 2025 15:10:14 +0000 (11:10 -0400)
Signed-off-by: Adam Emerson <aemerson@redhat.com>
src/cls/sem_set/module.cc
src/cls/sem_set/ops.h
src/neorados/cls/sem_set.h
src/rgw/driver/rados/rgw_datalog.cc
src/rgw/driver/rados/rgw_datalog.h
src/rgw/radosgw-admin/radosgw-admin.cc
src/test/cli/radosgw-admin/help.t

index 66d33b9dbc36939204a0fe82a0484a82911b4f8e..3c0678368ca1a6ad433f2b9a3768ff0fd8bb337d 100644 (file)
@@ -104,6 +104,46 @@ int increment(cls_method_context_t hctx, buffer::list *in, buffer::list *out)
   return 0;
 }
 
+int reset(cls_method_context_t hctx, buffer::list *in, buffer::list *out)
+{
+  CLS_LOG(10, "%s", __PRETTY_FUNCTION__);
+
+  ss::reset op;
+  try {
+    auto iter = in->cbegin();
+    decode(op, iter);
+  } catch (const std::exception& e) {
+    CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__,
+            e.what());
+    return -EINVAL;
+  }
+
+  if (op.keys.size() > ::cls::sem_set::max_keys) {
+    CLS_ERR("ERROR: %s: too many keys: %zu", __PRETTY_FUNCTION__,
+           op.keys.size());
+    return -E2BIG;
+  }
+
+  for (const auto& [key_, v] : op.keys) try {
+      buffer::list valbl;
+      auto key = std::string(PREFIX) + key_;
+      sem_val val{v};
+      encode(val, valbl);
+      auto r = cls_cxx_map_set_val(hctx, key, &valbl);
+      if (r < 0) {
+        CLS_ERR("ERROR: %s: failed to reset semaphore: r=%d",
+                __PRETTY_FUNCTION__, r);
+        return r;
+      }
+    } catch (const std::exception& e) {
+      CLS_ERR("CAN'T HAPPEN: %s: failed to decode semaphore: %s",
+              __PRETTY_FUNCTION__, e.what());
+      return -EIO;
+    }
+
+  return 0;
+}
+
 int decrement(cls_method_context_t hctx, buffer::list *in, buffer::list *out)
 {
   CLS_LOG(10, "%s", __PRETTY_FUNCTION__);
@@ -248,6 +288,10 @@ CLS_INIT(sem_set)
                           CLS_METHOD_RD | CLS_METHOD_WR,
                           &decrement, &h_decrement);
 
+  cls_register_cxx_method(h_class, ss::RESET,
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          &reset, &h_decrement);
+
   cls_register_cxx_method(h_class, ss::LIST,
                           CLS_METHOD_RD,
                           &list, &h_list);
index 24d63dca136c7ab33f33826c529ecde07e1de3b2..be906f18d33231a30eb8b2918bbfece43ceb56c6 100644 (file)
@@ -85,6 +85,36 @@ struct decrement {
 };
 WRITE_CLASS_ENCODER(decrement);
 
+struct reset {
+  std::unordered_map<std::string, std::uint64_t> keys;
+
+  reset() = default;
+
+  reset(std::string s, uint64_t val = 0)
+    : keys({{std::move(s), val}}) {}
+
+  reset(decltype(keys) s)
+    : keys(std::move(s)) {}
+
+  template<std::input_iterator I>
+  reset(I begin, I end)
+    requires std::is_convertible_v<typename I::value_type, std::string>
+    : keys(begin, end) {}
+
+  void encode(buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(keys, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(keys, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(reset);
+
 struct list_op {
   std::uint64_t count;
   std::string cursor;
@@ -130,6 +160,7 @@ WRITE_CLASS_ENCODER(list_ret);
 inline constexpr auto CLASS = "sem_set";
 inline constexpr auto INCREMENT = "increment";
 inline constexpr auto DECREMENT = "decrement";
+inline constexpr auto RESET = "reset";
 inline constexpr auto LIST = "list";
 
 } // namespace cls::sem_set
index ec810af9313768cb756c534c9c0090af46bf2cc4..a42100b51adf22baa7082686bb03545672afcb14 100644 (file)
@@ -210,6 +210,30 @@ decrement(I begin, I end, ceph::timespan grace = 0ns)
   }};
 }
 
+/// \brief Reset semaphore
+///
+/// Append a call to a write operation that reset the semaphore
+/// on a key to a given value.
+///
+/// \param key Key to reset
+/// \param val Value to set it to
+///
+/// \note This function exists to be called by radosgw-admin when the
+/// administrator wants to reset a semaphore. It should not be called
+/// in normal RGW operation and can lead to unreplicated objects.
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto reset(std::string key, std::uint64_t val)
+{
+  namespace ss = ::cls::sem_set;
+  buffer::list in;
+  ss::reset call{std::move(key), val};
+  encode(call, in);
+  return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+    op.exec(ss::CLASS, ss::RESET, in);
+  }};
+}
+
 /// \brief List keys and semaphores
 ///
 /// Append a call to a read operation that lists keys and semaphores
index b989b81c468481a2cbe5336dc2b8ac8ba7ca827c..438430634649a61934b8aa8b46ac4c99511556e3 100644 (file)
@@ -52,6 +52,7 @@ namespace sys = boost::system;
 
 namespace nlog = ::neorados::cls::log;
 namespace fifo = ::neorados::cls::fifo;
+namespace ss = neorados::cls::sem_set;
 
 namespace async = ceph::async;
 namespace buffer = ceph::buffer;
@@ -369,8 +370,7 @@ RGWDataChangesLog::RGWDataChangesLog(CephContext *cct, bool log_data,
       num_shards(num_shards ? *num_shards :
                 cct->_conf->rgw_data_log_num_shards),
       prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size),
-      sem_max_keys(sem_max_keys ? *sem_max_keys :
-                  neorados::cls::sem_set::max_keys) {}
+      sem_max_keys(sem_max_keys ? *sem_max_keys : ss::max_keys) {}
 
 
 void DataLogBackends::handle_init(entries_t e) {
@@ -786,7 +786,6 @@ RGWDataChangesLog::renew_entries(const DoutPrefixProvider* dpp)
     co_return;
   }
 
-  namespace sem_set = neorados::cls::sem_set;
   // If we didn't error in pushing, we can now decrement the semaphores
   l.lock();
   for (auto index = 0u; index < unsigned(num_shards); ++index) {
@@ -800,7 +799,7 @@ RGWDataChangesLog::renew_entries(const DoutPrefixProvider* dpp)
       auto to_copy = std::min(sem_max_keys, keys.size());
       std::copy_n(keys.begin(), to_copy,
                  std::inserter(batch, batch.end()));
-      auto op = WriteOp{}.exec(sem_set::decrement(std::move(batch)));
+      auto op = WriteOp{}.exec(ss::decrement(std::move(batch)));
       l.unlock();
       co_await rados->execute(get_sem_set_oid(index), loc, std::move(op),
                              asio::use_awaitable);
@@ -956,9 +955,8 @@ void RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
     auto need_sem_set = register_renew(std::move(bg));
     if (need_sem_set) {
       using neorados::WriteOp;
-      using neorados::cls::sem_set::increment;
       rados->execute(get_sem_set_oid(index), loc,
-                    WriteOp{}.exec(increment(std::move(key))), y);
+                    WriteOp{}.exec(ss::increment(std::move(key))), y);
     }
     return;
   }
@@ -1573,8 +1571,8 @@ RGWDataChangesLog::read_sems(int index, std::string cursor) {
   try {
     co_await rados->execute(
       get_sem_set_oid(index), loc,
-      neorados::ReadOp{}.exec(sem_set::list(sem_max_keys, std::move(cursor),
-                                           &out, &cursor)),
+      neorados::ReadOp{}.exec(ss::list(sem_max_keys, std::move(cursor),
+                                      &out, &cursor)),
       nullptr, asio::use_awaitable);
   } catch (const sys::system_error& e) {
     if (e.code() != sys::errc::no_such_file_or_directory) {
@@ -1606,7 +1604,7 @@ RGWDataChangesLog::synthesize_entries(
       change.gen = bg.gen;
       encode(change, bl);
       be->prepare(timestamp, change.key, std::move(bl), batch);
-    } catch (const sys::error_code& e) {
+    } catch (const sys::system_error& e) {
       push_failed = true;
       ldpp_dout(dpp, -1) << "RGWDataChangesLog::synthesize_entries(): Unable to "
                         << "parse Bucketgen key: " << key << "Got exception: "
@@ -1695,7 +1693,7 @@ RGWDataChangesLog::decrement_sems(
     auto grace = ((ceph::mono_clock::now() - fetch_time) * 4) / 3;
     co_await rados->execute(
       get_sem_set_oid(index), loc, neorados::WriteOp{}.exec(
-       sem_set::decrement(std::move(batch), grace)),
+       ss::decrement(std::move(batch), grace)),
       asio::use_awaitable);
   }
 }
@@ -1761,6 +1759,108 @@ asio::awaitable<void> RGWDataChangesLog::recover(const DoutPrefixProvider* dpp,
   l.unlock();
 }
 
+asio::awaitable<void>
+RGWDataChangesLog::admin_sem_list(std::optional<int> req_shard,
+                                 std::uint64_t max_entries,
+                                 std::string marker,
+                                 std::ostream& m,
+                                 ceph::Formatter& formatter)
+{
+  int shard = req_shard.value_or(0);
+  std::string keptmark;
+
+  if (!marker.empty()) {
+    // Signal caught by radosgw-admin
+    BucketGen bg{marker};
+    auto index = choose_oid(bg.shard);
+    if (req_shard && *req_shard != index) {
+      throw sys::system_error{
+       EINVAL, sys::generic_category(),
+       fmt::format("Requested shard {} but marker is for shard {}",
+                   shard, index)};
+    }
+  }
+  bc::flat_map<std::string, std::uint64_t> entries;
+  std::uint64_t count = 0;
+  bool begin_next = false;
+  // So the marker traverses between shards if the last entry in the
+  // shard is the last needed for max_entries
+  std::string mkeep;
+  entries.reserve(sem_max_keys);
+  formatter.open_object_section("semaphores");
+  formatter.open_array_section("entries");
+  while ((max_entries == 0 || (count < max_entries)) && shard < num_shards) {
+    entries.clear();
+    try {
+      if (begin_next) {
+       marker.clear();
+       begin_next = false;
+      }
+      co_await rados->execute(get_sem_set_oid(shard), loc,
+                             neorados::ReadOp{}.
+                             exec(ss::list(std::min(max_entries - count,
+                                                    sem_max_keys),
+                                           marker,
+                                           &entries, &marker)),
+       nullptr, asio::use_awaitable);
+      if (!marker.empty()) {
+       mkeep = marker;
+      }
+    } catch (const sys::system_error& e) {
+      if (e.code() == sys::errc::no_such_file_or_directory) {
+       if (!req_shard) {
+         begin_next = true;
+         ++shard;
+         continue;
+       } else {
+         break;
+       }
+      } else {
+       throw;
+      }
+    }
+    for (auto i = entries.cbegin(); i != entries.cend(); ++i) {
+      const auto& [k, v] = *i;
+      formatter.open_object_section("semaphore");
+      formatter.dump_string("key", k);
+      formatter.dump_unsigned("count", v);
+      formatter.close_section();
+      ++count;
+    }
+    formatter.flush(m);
+    if (marker.empty()) {
+      if (!entries.empty()) {
+       mkeep = (entries.cend() - 1)->first;
+      }
+      if (!req_shard) {
+       ++shard;
+      } else {
+       break;
+      }
+    }
+  }
+  if (shard < num_shards && !req_shard && count == max_entries) {
+    marker = std::move(mkeep);
+  }
+  formatter.close_section();
+  formatter.dump_string("marker", marker);
+  formatter.close_section();
+  formatter.flush(m);
+  co_return;
+}
+
+asio::awaitable<void>
+RGWDataChangesLog::admin_sem_reset(std::string_view marker,
+                                  std::uint64_t count)
+{
+  // Exceptions here are caught by radosgw-admin
+  BucketGen bg{marker};
+  unsigned index = choose_oid(bg.shard);
+  auto wop = neorados::WriteOp{}.exec(ss::reset(std::string(marker), count));
+  co_await rados->execute(get_sem_set_oid(index), loc,
+                         std::move(wop), asio::use_awaitable);
+}
+
 void RGWDataChangesLogInfo::dump(Formatter *f) const
 {
   encode_json("marker", marker, f);
index ec55b588a79c189d272cb9cff9d709388ca298f6..161b2d91776e0948eff42e857ba03aecbf145f76 100644 (file)
@@ -533,6 +533,14 @@ public:
   asio::awaitable<void> shutdown();
   asio::awaitable<void> shutdown_or_timeout();
   void blocking_shutdown();
+
+  asio::awaitable<void> admin_sem_list(std::optional<int> req_shard,
+                                      std::uint64_t max_entries,
+                                      std::string marker,
+                                      std::ostream& m,
+                                      ceph::Formatter& formatter);
+  asio::awaitable<void> admin_sem_reset(std::string_view marker,
+                                       std::uint64_t count);
 };
 
 class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> {
index d59cc98cdc047940dc151d5a8c1f6eccae31bbb9..4e5cba2f79639cdac5e1cbd8ff38865c8b50e616 100644 (file)
@@ -11,6 +11,9 @@
 #include <optional>
 #include <iostream>
 
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/use_awaitable.hpp>
+
 extern "C" {
 #include <liboath/oath.h>
 }
@@ -31,6 +34,8 @@ extern "C" {
 #include "common/safe_io.h"
 #include "common/fault_injector.h"
 
+#include "common/async/blocked_completion.h"
+
 #include "include/util.h"
 
 #include "cls/rgw/cls_rgw_types.h"
@@ -292,6 +297,8 @@ void usage()
   cout << "  datalog trim                     trim data log\n";
   cout << "  datalog status                   read data log status\n";
   cout << "  datalog type                     change datalog type to --log_type={fifo,omap}\n";
+  cout << "  datalog semaphore list           List recovery semaphores\n";
+  cout << "  datalog semaphore reset          Reset recovery semaphore (use marker)\n";
   cout << "  orphans find                     deprecated -- init and run search for leaked rados objects (use job-id, pool)\n";
   cout << "  orphans finish                   deprecated -- clean up search for leaked rados objects\n";
   cout << "  orphans list-jobs                deprecated -- list the current job-ids for orphans search\n";
@@ -379,6 +386,8 @@ void usage()
   cout << "   --end-date=<date>                 end date in the format yyyy-mm-dd\n";
   cout << "   --bucket-id=<bucket-id>           bucket id\n";
   cout << "   --bucket-new-name=<bucket>        for bucket link: optional new name\n";
+  cout << "   --count=<count>                   optional for:\n";
+  cout << "                                       datalog semaphore reset\n";
   cout << "   --shard-id=<shard-id>             optional for:\n";
   cout << "                                       mdlog list\n";
   cout << "                                       data sync status\n";
@@ -814,6 +823,8 @@ enum class OPT {
   DATALOG_TRIM,
   DATALOG_TYPE,
   DATALOG_PRUNE,
+  DATALOG_SEMAPHORE_LIST,
+  DATALOG_SEMAPHORE_RESET,
   REALM_CREATE,
   REALM_DELETE,
   REALM_GET,
@@ -1059,6 +1070,8 @@ static SimpleCmd::Commands all_cmds = {
   { "datalog trim", OPT::DATALOG_TRIM },
   { "datalog type", OPT::DATALOG_TYPE },
   { "datalog prune", OPT::DATALOG_PRUNE },
+  { "datalog semaphore list", OPT::DATALOG_SEMAPHORE_LIST },
+  { "datalog semaphore reset", OPT::DATALOG_SEMAPHORE_RESET },
   { "realm create", OPT::REALM_CREATE },
   { "realm rm", OPT::REALM_DELETE },
   { "realm get", OPT::REALM_GET },
@@ -3485,6 +3498,27 @@ void init_realm_param(CephContext *cct, string& var, std::optional<string>& opt_
   }
 }
 
+int run_coro(asio::awaitable<void> coro, std::string_view name) {
+  try {
+    // Blocking in startup code, not ideal, but won't hurt anything.
+    std::exception_ptr eptr
+      = asio::co_spawn(static_cast<rgw::sal::RadosStore*>(driver)->get_io_context(),
+                      std::move(coro),
+                      async::use_blocked);
+    if (eptr) {
+      std::rethrow_exception(eptr);
+    }
+  } catch (boost::system::system_error& e) {
+    ldpp_dout(dpp(), -1) << name << ": failed: " << e.what() << dendl;
+    return ceph::from_error_code(e.code());
+  } catch (std::exception& e) {
+    ldpp_dout(dpp(), -1) << name << ": failed: " << e.what() << dendl;
+    return -EIO;
+  }
+  return 0;
+}
+
+
 // This has an uncaught exception. Even if the exception is caught, the program
 // would need to be terminated, so the warning is simply suppressed.
 // coverity[root_function:SUPPRESS]
@@ -3616,6 +3650,7 @@ int main(int argc, const char **argv)
   bool account_root_specified = false;
   int shard_id = -1;
   bool specified_shard_id = false;
+  std::optional<std::uint64_t> count;
   string client_id;
   string op_id;
   string op_mask_str;
@@ -3994,6 +4029,12 @@ int main(int argc, const char **argv)
         return EINVAL;
       }
       specified_shard_id = true;
+    } else if (ceph_argparse_witharg(args, i, &val, "--count", (char*)NULL)) {
+      count = strict_strtol(val.c_str(), 10, &err);
+      if (!err.empty()) {
+        cerr << "ERROR: failed to parse count: " << err << std::endl;
+        return EINVAL;
+      }
     } else if (ceph_argparse_witharg(args, i, &val, "--gen", (char*)NULL)) {
       gen = strict_strtoll(val.c_str(), 10, &err);
       if (!err.empty()) {
@@ -4477,6 +4518,7 @@ int main(int argc, const char **argv)
                         OPT::BILOG_STATUS,
                         OPT::DATA_SYNC_STATUS,
                         OPT::DATALOG_LIST,
+                        OPT::DATALOG_SEMAPHORE_LIST,
                         OPT::DATALOG_STATUS,
                         OPT::REALM_GET,
                         OPT::REALM_GET_DEFAULT,
@@ -10837,6 +10879,35 @@ next:
     }
   }
 
+  if (opt_cmd == OPT::DATALOG_SEMAPHORE_LIST) {
+    auto datalog = static_cast<rgw::sal::RadosStore*>(driver)
+      ->svc()->datalog_rados;
+    std::optional<int> shard;
+    if (specified_shard_id) {
+      shard = shard_id;
+    }
+    ret = run_coro(datalog->admin_sem_list(shard, max_entries, marker,
+                                          cout, *formatter),
+                  "datalog seamphore list");
+    if (ret < 0) {
+      return ret;
+    }
+  }
+
+  if (opt_cmd == OPT::DATALOG_SEMAPHORE_RESET) {
+    if (marker.empty()) {
+      std::cerr << "Specify the semaphore key with --marker." << std::endl;
+      return -EINVAL;
+    }
+    auto datalog = static_cast<rgw::sal::RadosStore*>(driver)
+      ->svc()->datalog_rados;
+    ret = run_coro(datalog->admin_sem_reset(marker, count.value_or(0)),
+                  "datalog seamphore reset");
+    if (ret < 0) {
+      return ret;
+    }
+  }
+
   if (opt_cmd == OPT::DATALOG_LIST) {
     formatter->open_array_section("entries");
     bool truncated;
index a5559f8fe2d377bcc46658624fbbff5d6ec6afce..7601e10dae3a945e6527ddb377998f7537ed02a4 100644 (file)
     datalog trim                     trim data log
     datalog status                   read data log status
     datalog type                     change datalog type to --log_type={fifo,omap}
+    datalog semaphore list           List recovery semaphores
+    datalog semaphore reset          Reset recovery semaphore (use marker)
     orphans find                     deprecated -- init and run search for leaked rados objects (use job-id, pool)
     orphans finish                   deprecated -- clean up search for leaked rados objects
     orphans list-jobs                deprecated -- list the current job-ids for orphans search
      --end-date=<date>                 end date in the format yyyy-mm-dd
      --bucket-id=<bucket-id>           bucket id
      --bucket-new-name=<bucket>        for bucket link: optional new name
+     --count=<count>                   optional for:
+                                         datalog semaphore reset
      --shard-id=<shard-id>             optional for:
                                          mdlog list
                                          data sync status