From e0c4ef098eb6f5a84c827266e3b360810eea425f Mon Sep 17 00:00:00 2001 From: Adam Emerson Date: Tue, 17 Sep 2024 17:23:57 -0400 Subject: [PATCH] rgw: Add recovery semaphore commands to radosgw-admin Signed-off-by: Adam Emerson --- src/cls/sem_set/module.cc | 44 +++++++++ src/cls/sem_set/ops.h | 31 +++++++ src/neorados/cls/sem_set.h | 24 +++++ src/rgw/driver/rados/rgw_datalog.cc | 120 ++++++++++++++++++++++--- src/rgw/driver/rados/rgw_datalog.h | 8 ++ src/rgw/radosgw-admin/radosgw-admin.cc | 71 +++++++++++++++ src/test/cli/radosgw-admin/help.t | 4 + 7 files changed, 292 insertions(+), 10 deletions(-) diff --git a/src/cls/sem_set/module.cc b/src/cls/sem_set/module.cc index 66d33b9dbc369..3c0678368ca1a 100644 --- a/src/cls/sem_set/module.cc +++ b/src/cls/sem_set/module.cc @@ -104,6 +104,46 @@ int increment(cls_method_context_t hctx, buffer::list *in, buffer::list *out) return 0; } +int reset(cls_method_context_t hctx, buffer::list *in, buffer::list *out) +{ + CLS_LOG(10, "%s", __PRETTY_FUNCTION__); + + ss::reset op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const std::exception& e) { + CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__, + e.what()); + return -EINVAL; + } + + if (op.keys.size() > ::cls::sem_set::max_keys) { + CLS_ERR("ERROR: %s: too many keys: %zu", __PRETTY_FUNCTION__, + op.keys.size()); + return -E2BIG; + } + + for (const auto& [key_, v] : op.keys) try { + buffer::list valbl; + auto key = std::string(PREFIX) + key_; + sem_val val{v}; + encode(val, valbl); + auto r = cls_cxx_map_set_val(hctx, key, &valbl); + if (r < 0) { + CLS_ERR("ERROR: %s: failed to reset semaphore: r=%d", + __PRETTY_FUNCTION__, r); + return r; + } + } catch (const std::exception& e) { + CLS_ERR("CAN'T HAPPEN: %s: failed to decode semaphore: %s", + __PRETTY_FUNCTION__, e.what()); + return -EIO; + } + + return 0; +} + int decrement(cls_method_context_t hctx, buffer::list *in, buffer::list *out) { CLS_LOG(10, "%s", __PRETTY_FUNCTION__); @@ 
-248,6 +288,10 @@ CLS_INIT(sem_set) CLS_METHOD_RD | CLS_METHOD_WR, &decrement, &h_decrement); + cls_register_cxx_method(h_class, ss::RESET, + CLS_METHOD_RD | CLS_METHOD_WR, + &reset, &h_decrement); + cls_register_cxx_method(h_class, ss::LIST, CLS_METHOD_RD, &list, &h_list); diff --git a/src/cls/sem_set/ops.h b/src/cls/sem_set/ops.h index 24d63dca136c7..be906f18d3323 100644 --- a/src/cls/sem_set/ops.h +++ b/src/cls/sem_set/ops.h @@ -85,6 +85,36 @@ struct decrement { }; WRITE_CLASS_ENCODER(decrement); +struct reset { + std::unordered_map keys; + + reset() = default; + + reset(std::string s, uint64_t val = 0) + : keys({{std::move(s), val}}) {} + + reset(decltype(keys) s) + : keys(std::move(s)) {} + + template + reset(I begin, I end) + requires std::is_convertible_v + : keys(begin, end) {} + + void encode(buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(keys, bl); + ENCODE_FINISH(bl); + } + + void decode(buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(keys, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(reset); + struct list_op { std::uint64_t count; std::string cursor; @@ -130,6 +160,7 @@ WRITE_CLASS_ENCODER(list_ret); inline constexpr auto CLASS = "sem_set"; inline constexpr auto INCREMENT = "increment"; inline constexpr auto DECREMENT = "decrement"; +inline constexpr auto RESET = "reset"; inline constexpr auto LIST = "list"; } // namespace cls::sem_set diff --git a/src/neorados/cls/sem_set.h b/src/neorados/cls/sem_set.h index ec810af931376..a42100b51adf2 100644 --- a/src/neorados/cls/sem_set.h +++ b/src/neorados/cls/sem_set.h @@ -210,6 +210,30 @@ decrement(I begin, I end, ceph::timespan grace = 0ns) }}; } +/// \brief Reset semaphore +/// +/// Append a call to a write operation that resets the semaphore +/// on a key to a given value. +/// +/// \param key Key to reset +/// \param val Value to set it to +/// +/// \note This function exists to be called by radosgw-admin when the +/// administrator wants to reset a semaphore.
It should not be called +/// in normal RGW operation and can lead to unreplicated objects. +/// +/// \return The ClsWriteOp to be passed to WriteOp::exec +[[nodiscard]] inline auto reset(std::string key, std::uint64_t val) +{ + namespace ss = ::cls::sem_set; + buffer::list in; + ss::reset call{std::move(key), val}; + encode(call, in); + return ClsWriteOp{[in = std::move(in)](WriteOp& op) { + op.exec(ss::CLASS, ss::RESET, in); + }}; +} + /// \brief List keys and semaphores /// /// Append a call to a read operation that lists keys and semaphores diff --git a/src/rgw/driver/rados/rgw_datalog.cc b/src/rgw/driver/rados/rgw_datalog.cc index b989b81c46848..438430634649a 100644 --- a/src/rgw/driver/rados/rgw_datalog.cc +++ b/src/rgw/driver/rados/rgw_datalog.cc @@ -52,6 +52,7 @@ namespace sys = boost::system; namespace nlog = ::neorados::cls::log; namespace fifo = ::neorados::cls::fifo; +namespace ss = neorados::cls::sem_set; namespace async = ceph::async; namespace buffer = ceph::buffer; @@ -369,8 +370,7 @@ RGWDataChangesLog::RGWDataChangesLog(CephContext *cct, bool log_data, num_shards(num_shards ? *num_shards : cct->_conf->rgw_data_log_num_shards), prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size), - sem_max_keys(sem_max_keys ? *sem_max_keys : - neorados::cls::sem_set::max_keys) {} + sem_max_keys(sem_max_keys ? 
*sem_max_keys : ss::max_keys) {} void DataLogBackends::handle_init(entries_t e) { @@ -786,7 +786,6 @@ RGWDataChangesLog::renew_entries(const DoutPrefixProvider* dpp) co_return; } - namespace sem_set = neorados::cls::sem_set; // If we didn't error in pushing, we can now decrement the semaphores l.lock(); for (auto index = 0u; index < unsigned(num_shards); ++index) { @@ -800,7 +799,7 @@ RGWDataChangesLog::renew_entries(const DoutPrefixProvider* dpp) auto to_copy = std::min(sem_max_keys, keys.size()); std::copy_n(keys.begin(), to_copy, std::inserter(batch, batch.end())); - auto op = WriteOp{}.exec(sem_set::decrement(std::move(batch))); + auto op = WriteOp{}.exec(ss::decrement(std::move(batch))); l.unlock(); co_await rados->execute(get_sem_set_oid(index), loc, std::move(op), asio::use_awaitable); @@ -956,9 +955,8 @@ void RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, auto need_sem_set = register_renew(std::move(bg)); if (need_sem_set) { using neorados::WriteOp; - using neorados::cls::sem_set::increment; rados->execute(get_sem_set_oid(index), loc, - WriteOp{}.exec(increment(std::move(key))), y); + WriteOp{}.exec(ss::increment(std::move(key))), y); } return; } @@ -1573,8 +1571,8 @@ RGWDataChangesLog::read_sems(int index, std::string cursor) { try { co_await rados->execute( get_sem_set_oid(index), loc, - neorados::ReadOp{}.exec(sem_set::list(sem_max_keys, std::move(cursor), - &out, &cursor)), + neorados::ReadOp{}.exec(ss::list(sem_max_keys, std::move(cursor), + &out, &cursor)), nullptr, asio::use_awaitable); } catch (const sys::system_error& e) { if (e.code() != sys::errc::no_such_file_or_directory) { @@ -1606,7 +1604,7 @@ RGWDataChangesLog::synthesize_entries( change.gen = bg.gen; encode(change, bl); be->prepare(timestamp, change.key, std::move(bl), batch); - } catch (const sys::error_code& e) { + } catch (const sys::system_error& e) { push_failed = true; ldpp_dout(dpp, -1) << "RGWDataChangesLog::synthesize_entries(): Unable to " << "parse Bucketgen key: " 
<< key << "Got exception: " @@ -1695,7 +1693,7 @@ RGWDataChangesLog::decrement_sems( auto grace = ((ceph::mono_clock::now() - fetch_time) * 4) / 3; co_await rados->execute( get_sem_set_oid(index), loc, neorados::WriteOp{}.exec( - sem_set::decrement(std::move(batch), grace)), + ss::decrement(std::move(batch), grace)), asio::use_awaitable); } } @@ -1761,6 +1759,108 @@ asio::awaitable RGWDataChangesLog::recover(const DoutPrefixProvider* dpp, l.unlock(); } +asio::awaitable +RGWDataChangesLog::admin_sem_list(std::optional req_shard, + std::uint64_t max_entries, + std::string marker, + std::ostream& m, + ceph::Formatter& formatter) +{ + int shard = req_shard.value_or(0); + std::string keptmark; + + if (!marker.empty()) { + // Exceptions thrown here are caught by radosgw-admin + BucketGen bg{marker}; + auto index = choose_oid(bg.shard); + if (req_shard && *req_shard != index) { + throw sys::system_error{ + EINVAL, sys::generic_category(), + fmt::format("Requested shard {} but marker is for shard {}", + shard, index)}; + } + } + bc::flat_map entries; + std::uint64_t count = 0; + bool begin_next = false; + // So the marker traverses between shards if the last entry in the + // shard is the last needed for max_entries + std::string mkeep; + entries.reserve(sem_max_keys); + formatter.open_object_section("semaphores"); + formatter.open_array_section("entries"); + while ((max_entries == 0 || (count < max_entries)) && shard < num_shards) { + entries.clear(); + try { + if (begin_next) { + marker.clear(); + begin_next = false; + } + co_await rados->execute(get_sem_set_oid(shard), loc, + neorados::ReadOp{}.
+ exec(ss::list(std::min(max_entries - count, + sem_max_keys), + marker, + &entries, &marker)), + nullptr, asio::use_awaitable); + if (!marker.empty()) { + mkeep = marker; + } + } catch (const sys::system_error& e) { + if (e.code() == sys::errc::no_such_file_or_directory) { + if (!req_shard) { + begin_next = true; + ++shard; + continue; + } else { + break; + } + } else { + throw; + } + } + for (auto i = entries.cbegin(); i != entries.cend(); ++i) { + const auto& [k, v] = *i; + formatter.open_object_section("semaphore"); + formatter.dump_string("key", k); + formatter.dump_unsigned("count", v); + formatter.close_section(); + ++count; + } + formatter.flush(m); + if (marker.empty()) { + if (!entries.empty()) { + mkeep = (entries.cend() - 1)->first; + } + if (!req_shard) { + ++shard; + } else { + break; + } + } + } + if (shard < num_shards && !req_shard && count == max_entries) { + marker = std::move(mkeep); + } + formatter.close_section(); + formatter.dump_string("marker", marker); + formatter.close_section(); + formatter.flush(m); + co_return; +} + +asio::awaitable +RGWDataChangesLog::admin_sem_reset(std::string_view marker, + std::uint64_t count) +{ + // Exceptions here are caught by radosgw-admin + BucketGen bg{marker}; + unsigned index = choose_oid(bg.shard); + auto wop = neorados::WriteOp{}.exec(ss::reset(std::string(marker), count)); + co_await rados->execute(get_sem_set_oid(index), loc, + std::move(wop), asio::use_awaitable); +} + void RGWDataChangesLogInfo::dump(Formatter *f) const { encode_json("marker", marker, f); diff --git a/src/rgw/driver/rados/rgw_datalog.h b/src/rgw/driver/rados/rgw_datalog.h index ec55b588a79c1..161b2d91776e0 100644 --- a/src/rgw/driver/rados/rgw_datalog.h +++ b/src/rgw/driver/rados/rgw_datalog.h @@ -533,6 +533,14 @@ public: asio::awaitable shutdown(); asio::awaitable shutdown_or_timeout(); void blocking_shutdown(); + + asio::awaitable admin_sem_list(std::optional req_shard, + std::uint64_t max_entries, + std::string marker, + 
std::ostream& m, + ceph::Formatter& formatter); + asio::awaitable admin_sem_reset(std::string_view marker, + std::uint64_t count); }; class RGWDataChangesBE : public boost::intrusive_ref_counter { diff --git a/src/rgw/radosgw-admin/radosgw-admin.cc b/src/rgw/radosgw-admin/radosgw-admin.cc index d59cc98cdc047..4e5cba2f79639 100644 --- a/src/rgw/radosgw-admin/radosgw-admin.cc +++ b/src/rgw/radosgw-admin/radosgw-admin.cc @@ -11,6 +11,9 @@ #include #include +#include +#include + extern "C" { #include } @@ -31,6 +34,8 @@ extern "C" { #include "common/safe_io.h" #include "common/fault_injector.h" +#include "common/async/blocked_completion.h" + #include "include/util.h" #include "cls/rgw/cls_rgw_types.h" @@ -292,6 +297,8 @@ void usage() cout << " datalog trim trim data log\n"; cout << " datalog status read data log status\n"; cout << " datalog type change datalog type to --log_type={fifo,omap}\n"; + cout << " datalog semaphore list List recovery semaphores\n"; + cout << " datalog semaphore reset Reset recovery semaphore (use marker)\n"; cout << " orphans find deprecated -- init and run search for leaked rados objects (use job-id, pool)\n"; cout << " orphans finish deprecated -- clean up search for leaked rados objects\n"; cout << " orphans list-jobs deprecated -- list the current job-ids for orphans search\n"; @@ -379,6 +386,8 @@ void usage() cout << " --end-date= end date in the format yyyy-mm-dd\n"; cout << " --bucket-id= bucket id\n"; cout << " --bucket-new-name= for bucket link: optional new name\n"; + cout << " --count= optional for:\n"; + cout << " datalog semaphore reset\n"; cout << " --shard-id= optional for:\n"; cout << " mdlog list\n"; cout << " data sync status\n"; @@ -814,6 +823,8 @@ enum class OPT { DATALOG_TRIM, DATALOG_TYPE, DATALOG_PRUNE, + DATALOG_SEMAPHORE_LIST, + DATALOG_SEMAPHORE_RESET, REALM_CREATE, REALM_DELETE, REALM_GET, @@ -1059,6 +1070,8 @@ static SimpleCmd::Commands all_cmds = { { "datalog trim", OPT::DATALOG_TRIM }, { "datalog type", 
OPT::DATALOG_TYPE }, { "datalog prune", OPT::DATALOG_PRUNE }, + { "datalog semaphore list", OPT::DATALOG_SEMAPHORE_LIST }, + { "datalog semaphore reset", OPT::DATALOG_SEMAPHORE_RESET }, { "realm create", OPT::REALM_CREATE }, { "realm rm", OPT::REALM_DELETE }, { "realm get", OPT::REALM_GET }, @@ -3485,6 +3498,27 @@ void init_realm_param(CephContext *cct, string& var, std::optional& opt_ } } +int run_coro(asio::awaitable coro, std::string_view name) { + try { + // Blocking in startup code, not ideal, but won't hurt anything. + std::exception_ptr eptr + = asio::co_spawn(static_cast(driver)->get_io_context(), + std::move(coro), + async::use_blocked); + if (eptr) { + std::rethrow_exception(eptr); + } + } catch (boost::system::system_error& e) { + ldpp_dout(dpp(), -1) << name << ": failed: " << e.what() << dendl; + return ceph::from_error_code(e.code()); + } catch (std::exception& e) { + ldpp_dout(dpp(), -1) << name << ": failed: " << e.what() << dendl; + return -EIO; + } + return 0; +} + + // This has an uncaught exception. Even if the exception is caught, the program // would need to be terminated, so the warning is simply suppressed. 
// coverity[root_function:SUPPRESS] @@ -3616,6 +3650,7 @@ int main(int argc, const char **argv) bool account_root_specified = false; int shard_id = -1; bool specified_shard_id = false; + std::optional count; string client_id; string op_id; string op_mask_str; @@ -3994,6 +4029,12 @@ int main(int argc, const char **argv) return EINVAL; } specified_shard_id = true; + } else if (ceph_argparse_witharg(args, i, &val, "--count", (char*)NULL)) { + count = strict_strtol(val.c_str(), 10, &err); + if (!err.empty()) { + cerr << "ERROR: failed to parse count: " << err << std::endl; + return EINVAL; + } } else if (ceph_argparse_witharg(args, i, &val, "--gen", (char*)NULL)) { gen = strict_strtoll(val.c_str(), 10, &err); if (!err.empty()) { @@ -4477,6 +4518,7 @@ int main(int argc, const char **argv) OPT::BILOG_STATUS, OPT::DATA_SYNC_STATUS, OPT::DATALOG_LIST, + OPT::DATALOG_SEMAPHORE_LIST, OPT::DATALOG_STATUS, OPT::REALM_GET, OPT::REALM_GET_DEFAULT, @@ -10837,6 +10879,35 @@ next: } } + if (opt_cmd == OPT::DATALOG_SEMAPHORE_LIST) { + auto datalog = static_cast(driver) + ->svc()->datalog_rados; + std::optional shard; + if (specified_shard_id) { + shard = shard_id; + } + ret = run_coro(datalog->admin_sem_list(shard, max_entries, marker, + cout, *formatter), + "datalog seamphore list"); + if (ret < 0) { + return ret; + } + } + + if (opt_cmd == OPT::DATALOG_SEMAPHORE_RESET) { + if (marker.empty()) { + std::cerr << "Specify the semaphore key with --marker." 
<< std::endl; + return -EINVAL; + } + auto datalog = static_cast(driver) + ->svc()->datalog_rados; + ret = run_coro(datalog->admin_sem_reset(marker, count.value_or(0)), + "datalog seamphore reset"); + if (ret < 0) { + return ret; + } + } + if (opt_cmd == OPT::DATALOG_LIST) { formatter->open_array_section("entries"); bool truncated; diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index a5559f8fe2d37..7601e10dae3a9 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -162,6 +162,8 @@ datalog trim trim data log datalog status read data log status datalog type change datalog type to --log_type={fifo,omap} + datalog semaphore list List recovery semaphores + datalog semaphore reset Reset recovery semaphore (use marker) orphans find deprecated -- init and run search for leaked rados objects (use job-id, pool) orphans finish deprecated -- clean up search for leaked rados objects orphans list-jobs deprecated -- list the current job-ids for orphans search @@ -249,6 +251,8 @@ --end-date= end date in the format yyyy-mm-dd --bucket-id= bucket id --bucket-new-name= for bucket link: optional new name + --count= optional for: + datalog semaphore reset --shard-id= optional for: mdlog list data sync status -- 2.39.5