return 0;
}
+int reset(cls_method_context_t hctx, buffer::list *in, buffer::list *out)
+{
+ CLS_LOG(10, "%s", __PRETTY_FUNCTION__);
+
+ ss::reset op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const std::exception& e) {
+ CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__,
+ e.what());
+ return -EINVAL;
+ }
+
+ if (op.keys.size() > ::cls::sem_set::max_keys) {
+ CLS_ERR("ERROR: %s: too many keys: %zu", __PRETTY_FUNCTION__,
+ op.keys.size());
+ return -E2BIG;
+ }
+
+ for (const auto& [key_, v] : op.keys) try {
+ buffer::list valbl;
+ auto key = std::string(PREFIX) + key_;
+ sem_val val{v};
+ encode(val, valbl);
+ auto r = cls_cxx_map_set_val(hctx, key, &valbl);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: failed to reset semaphore: r=%d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ } catch (const std::exception& e) {
+ CLS_ERR("CAN'T HAPPEN: %s: failed to decode semaphore: %s",
+ __PRETTY_FUNCTION__, e.what());
+ return -EIO;
+ }
+
+ return 0;
+}
+
int decrement(cls_method_context_t hctx, buffer::list *in, buffer::list *out)
{
CLS_LOG(10, "%s", __PRETTY_FUNCTION__);
CLS_METHOD_RD | CLS_METHOD_WR,
&decrement, &h_decrement);
+ cls_register_cxx_method(h_class, ss::RESET,
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ &reset, &h_decrement);
+
cls_register_cxx_method(h_class, ss::LIST,
CLS_METHOD_RD,
&list, &h_list);
};
WRITE_CLASS_ENCODER(decrement);
+struct reset {
+ std::unordered_map<std::string, std::uint64_t> keys;
+
+ reset() = default;
+
+ reset(std::string s, uint64_t val = 0)
+ : keys({{std::move(s), val}}) {}
+
+ reset(decltype(keys) s)
+ : keys(std::move(s)) {}
+
+ template<std::input_iterator I>
+ reset(I begin, I end)
+ requires std::is_convertible_v<typename I::value_type, std::string>
+ : keys(begin, end) {}
+
+ void encode(buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(keys, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(keys, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(reset);
+
struct list_op {
std::uint64_t count;
std::string cursor;
inline constexpr auto CLASS = "sem_set";
inline constexpr auto INCREMENT = "increment";
inline constexpr auto DECREMENT = "decrement";
+inline constexpr auto RESET = "reset";
inline constexpr auto LIST = "list";
} // namespace cls::sem_set
}};
}
+/// \brief Reset semaphore
+///
+/// Append a call to a write operation that reset the semaphore
+/// on a key to a given value.
+///
+/// \param key Key to reset
+/// \param val Value to set it to
+///
+/// \note This function exists to be called by radosgw-admin when the
+/// administrator wants to reset a semaphore. It should not be called
+/// in normal RGW operation and can lead to unreplicated objects.
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto reset(std::string key, std::uint64_t val)
+{
+ namespace ss = ::cls::sem_set;
+ buffer::list in;
+ ss::reset call{std::move(key), val};
+ encode(call, in);
+ return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+ op.exec(ss::CLASS, ss::RESET, in);
+ }};
+}
+
/// \brief List keys and semaphores
///
/// Append a call to a read operation that lists keys and semaphores
namespace nlog = ::neorados::cls::log;
namespace fifo = ::neorados::cls::fifo;
+namespace ss = neorados::cls::sem_set;
namespace async = ceph::async;
namespace buffer = ceph::buffer;
num_shards(num_shards ? *num_shards :
cct->_conf->rgw_data_log_num_shards),
prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size),
- sem_max_keys(sem_max_keys ? *sem_max_keys :
- neorados::cls::sem_set::max_keys) {}
+ sem_max_keys(sem_max_keys ? *sem_max_keys : ss::max_keys) {}
void DataLogBackends::handle_init(entries_t e) {
co_return;
}
- namespace sem_set = neorados::cls::sem_set;
// If we didn't error in pushing, we can now decrement the semaphores
l.lock();
for (auto index = 0u; index < unsigned(num_shards); ++index) {
auto to_copy = std::min(sem_max_keys, keys.size());
std::copy_n(keys.begin(), to_copy,
std::inserter(batch, batch.end()));
- auto op = WriteOp{}.exec(sem_set::decrement(std::move(batch)));
+ auto op = WriteOp{}.exec(ss::decrement(std::move(batch)));
l.unlock();
co_await rados->execute(get_sem_set_oid(index), loc, std::move(op),
asio::use_awaitable);
auto need_sem_set = register_renew(std::move(bg));
if (need_sem_set) {
using neorados::WriteOp;
- using neorados::cls::sem_set::increment;
rados->execute(get_sem_set_oid(index), loc,
- WriteOp{}.exec(increment(std::move(key))), y);
+ WriteOp{}.exec(ss::increment(std::move(key))), y);
}
return;
}
try {
co_await rados->execute(
get_sem_set_oid(index), loc,
- neorados::ReadOp{}.exec(sem_set::list(sem_max_keys, std::move(cursor),
- &out, &cursor)),
+ neorados::ReadOp{}.exec(ss::list(sem_max_keys, std::move(cursor),
+ &out, &cursor)),
nullptr, asio::use_awaitable);
} catch (const sys::system_error& e) {
if (e.code() != sys::errc::no_such_file_or_directory) {
change.gen = bg.gen;
encode(change, bl);
be->prepare(timestamp, change.key, std::move(bl), batch);
- } catch (const sys::error_code& e) {
+ } catch (const sys::system_error& e) {
push_failed = true;
ldpp_dout(dpp, -1) << "RGWDataChangesLog::synthesize_entries(): Unable to "
<< "parse Bucketgen key: " << key << "Got exception: "
auto grace = ((ceph::mono_clock::now() - fetch_time) * 4) / 3;
co_await rados->execute(
get_sem_set_oid(index), loc, neorados::WriteOp{}.exec(
- sem_set::decrement(std::move(batch), grace)),
+ ss::decrement(std::move(batch), grace)),
asio::use_awaitable);
}
}
l.unlock();
}
+asio::awaitable<void>
+RGWDataChangesLog::admin_sem_list(std::optional<int> req_shard,
+ std::uint64_t max_entries,
+ std::string marker,
+ std::ostream& m,
+ ceph::Formatter& formatter)
+{
+ int shard = req_shard.value_or(0);
+ std::string keptmark;
+
+ if (!marker.empty()) {
+ // Signal caught by radosgw-admin
+ BucketGen bg{marker};
+ auto index = choose_oid(bg.shard);
+ if (req_shard && *req_shard != index) {
+ throw sys::system_error{
+ EINVAL, sys::generic_category(),
+ fmt::format("Requested shard {} but marker is for shard {}",
+ shard, index)};
+ }
+ }
+ bc::flat_map<std::string, std::uint64_t> entries;
+ std::uint64_t count = 0;
+ bool begin_next = false;
+ // So the marker traverses between shards if the last entry in the
+ // shard is the last needed for max_entries
+ std::string mkeep;
+ entries.reserve(sem_max_keys);
+ formatter.open_object_section("semaphores");
+ formatter.open_array_section("entries");
+ while ((max_entries == 0 || (count < max_entries)) && shard < num_shards) {
+ entries.clear();
+ try {
+ if (begin_next) {
+ marker.clear();
+ begin_next = false;
+ }
+ co_await rados->execute(get_sem_set_oid(shard), loc,
+ neorados::ReadOp{}.
+ exec(ss::list(std::min(max_entries - count,
+ sem_max_keys),
+ marker,
+ &entries, &marker)),
+ nullptr, asio::use_awaitable);
+ if (!marker.empty()) {
+ mkeep = marker;
+ }
+ } catch (const sys::system_error& e) {
+ if (e.code() == sys::errc::no_such_file_or_directory) {
+ if (!req_shard) {
+ begin_next = true;
+ ++shard;
+ continue;
+ } else {
+ break;
+ }
+ } else {
+ throw;
+ }
+ }
+ for (auto i = entries.cbegin(); i != entries.cend(); ++i) {
+ const auto& [k, v] = *i;
+ formatter.open_object_section("semaphore");
+ formatter.dump_string("key", k);
+ formatter.dump_unsigned("count", v);
+ formatter.close_section();
+ ++count;
+ }
+ formatter.flush(m);
+ if (marker.empty()) {
+ if (!entries.empty()) {
+ mkeep = (entries.cend() - 1)->first;
+ }
+ if (!req_shard) {
+ ++shard;
+ } else {
+ break;
+ }
+ }
+ }
+ if (shard < num_shards && !req_shard && count == max_entries) {
+ marker = std::move(mkeep);
+ }
+ formatter.close_section();
+ formatter.dump_string("marker", marker);
+ formatter.close_section();
+ formatter.flush(m);
+ co_return;
+}
+
+asio::awaitable<void>
+RGWDataChangesLog::admin_sem_reset(std::string_view marker,
+ std::uint64_t count)
+{
+ // Exceptions here are caught by radosgw-admin
+ BucketGen bg{marker};
+ unsigned index = choose_oid(bg.shard);
+ auto wop = neorados::WriteOp{}.exec(ss::reset(std::string(marker), count));
+ co_await rados->execute(get_sem_set_oid(index), loc,
+ std::move(wop), asio::use_awaitable);
+}
+
void RGWDataChangesLogInfo::dump(Formatter *f) const
{
encode_json("marker", marker, f);
asio::awaitable<void> shutdown();
asio::awaitable<void> shutdown_or_timeout();
void blocking_shutdown();
+
+ asio::awaitable<void> admin_sem_list(std::optional<int> req_shard,
+ std::uint64_t max_entries,
+ std::string marker,
+ std::ostream& m,
+ ceph::Formatter& formatter);
+ asio::awaitable<void> admin_sem_reset(std::string_view marker,
+ std::uint64_t count);
};
class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> {
#include <optional>
#include <iostream>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/use_awaitable.hpp>
+
extern "C" {
#include <liboath/oath.h>
}
#include "common/safe_io.h"
#include "common/fault_injector.h"
+#include "common/async/blocked_completion.h"
+
#include "include/util.h"
#include "cls/rgw/cls_rgw_types.h"
cout << " datalog trim trim data log\n";
cout << " datalog status read data log status\n";
cout << " datalog type change datalog type to --log_type={fifo,omap}\n";
+ cout << " datalog semaphore list List recovery semaphores\n";
+ cout << " datalog semaphore reset Reset recovery semaphore (use marker)\n";
cout << " orphans find deprecated -- init and run search for leaked rados objects (use job-id, pool)\n";
cout << " orphans finish deprecated -- clean up search for leaked rados objects\n";
cout << " orphans list-jobs deprecated -- list the current job-ids for orphans search\n";
cout << " --end-date=<date> end date in the format yyyy-mm-dd\n";
cout << " --bucket-id=<bucket-id> bucket id\n";
cout << " --bucket-new-name=<bucket> for bucket link: optional new name\n";
+ cout << " --count=<count> optional for:\n";
+ cout << " datalog semaphore reset\n";
cout << " --shard-id=<shard-id> optional for:\n";
cout << " mdlog list\n";
cout << " data sync status\n";
DATALOG_TRIM,
DATALOG_TYPE,
DATALOG_PRUNE,
+ DATALOG_SEMAPHORE_LIST,
+ DATALOG_SEMAPHORE_RESET,
REALM_CREATE,
REALM_DELETE,
REALM_GET,
{ "datalog trim", OPT::DATALOG_TRIM },
{ "datalog type", OPT::DATALOG_TYPE },
{ "datalog prune", OPT::DATALOG_PRUNE },
+ { "datalog semaphore list", OPT::DATALOG_SEMAPHORE_LIST },
+ { "datalog semaphore reset", OPT::DATALOG_SEMAPHORE_RESET },
{ "realm create", OPT::REALM_CREATE },
{ "realm rm", OPT::REALM_DELETE },
{ "realm get", OPT::REALM_GET },
}
}
+int run_coro(asio::awaitable<void> coro, std::string_view name) {
+ try {
+ // Blocking in startup code, not ideal, but won't hurt anything.
+ std::exception_ptr eptr
+ = asio::co_spawn(static_cast<rgw::sal::RadosStore*>(driver)->get_io_context(),
+ std::move(coro),
+ async::use_blocked);
+ if (eptr) {
+ std::rethrow_exception(eptr);
+ }
+ } catch (boost::system::system_error& e) {
+ ldpp_dout(dpp(), -1) << name << ": failed: " << e.what() << dendl;
+ return ceph::from_error_code(e.code());
+ } catch (std::exception& e) {
+ ldpp_dout(dpp(), -1) << name << ": failed: " << e.what() << dendl;
+ return -EIO;
+ }
+ return 0;
+}
+
+
// This has an uncaught exception. Even if the exception is caught, the program
// would need to be terminated, so the warning is simply suppressed.
// coverity[root_function:SUPPRESS]
bool account_root_specified = false;
int shard_id = -1;
bool specified_shard_id = false;
+ std::optional<std::uint64_t> count;
string client_id;
string op_id;
string op_mask_str;
return EINVAL;
}
specified_shard_id = true;
+ } else if (ceph_argparse_witharg(args, i, &val, "--count", (char*)NULL)) {
+ count = strict_strtol(val.c_str(), 10, &err);
+ if (!err.empty()) {
+ cerr << "ERROR: failed to parse count: " << err << std::endl;
+ return EINVAL;
+ }
} else if (ceph_argparse_witharg(args, i, &val, "--gen", (char*)NULL)) {
gen = strict_strtoll(val.c_str(), 10, &err);
if (!err.empty()) {
OPT::BILOG_STATUS,
OPT::DATA_SYNC_STATUS,
OPT::DATALOG_LIST,
+ OPT::DATALOG_SEMAPHORE_LIST,
OPT::DATALOG_STATUS,
OPT::REALM_GET,
OPT::REALM_GET_DEFAULT,
}
}
+ if (opt_cmd == OPT::DATALOG_SEMAPHORE_LIST) {
+ auto datalog = static_cast<rgw::sal::RadosStore*>(driver)
+ ->svc()->datalog_rados;
+ std::optional<int> shard;
+ if (specified_shard_id) {
+ shard = shard_id;
+ }
+ ret = run_coro(datalog->admin_sem_list(shard, max_entries, marker,
+ cout, *formatter),
+ "datalog seamphore list");
+ if (ret < 0) {
+ return ret;
+ }
+ }
+
+ if (opt_cmd == OPT::DATALOG_SEMAPHORE_RESET) {
+ if (marker.empty()) {
+ std::cerr << "Specify the semaphore key with --marker." << std::endl;
+ return -EINVAL;
+ }
+ auto datalog = static_cast<rgw::sal::RadosStore*>(driver)
+ ->svc()->datalog_rados;
+ ret = run_coro(datalog->admin_sem_reset(marker, count.value_or(0)),
+ "datalog seamphore reset");
+ if (ret < 0) {
+ return ret;
+ }
+ }
+
if (opt_cmd == OPT::DATALOG_LIST) {
formatter->open_array_section("entries");
bool truncated;
datalog trim trim data log
datalog status read data log status
datalog type change datalog type to --log_type={fifo,omap}
+ datalog semaphore list List recovery semaphores
+ datalog semaphore reset Reset recovery semaphore (use marker)
orphans find deprecated -- init and run search for leaked rados objects (use job-id, pool)
orphans finish deprecated -- clean up search for leaked rados objects
orphans list-jobs deprecated -- list the current job-ids for orphans search
--end-date=<date> end date in the format yyyy-mm-dd
--bucket-id=<bucket-id> bucket id
--bucket-new-name=<bucket> for bucket link: optional new name
+ --count=<count> optional for:
+ datalog semaphore reset
--shard-id=<shard-id> optional for:
mdlog list
data sync status