}
asio::awaitable<std::tuple<std::vector<rgw_data_change_log_entry>,
- std::string>>
+ std::string, bool>>
RGWDataChangesLog::list_entries(const DoutPrefixProvider* dpp, int shard,
int max_entries, std::string marker)
{
}
if (max_entries <= 0) {
co_return std::make_tuple(std::vector<rgw_data_change_log_entry>{},
- std::string{});
+ std::string{}, false);
}
std::vector<rgw_data_change_log_entry> entries(max_entries);
entries.resize(max_entries);
auto [spanentries, outmark] = co_await bes->list(dpp, shard, entries, marker);
entries.resize(spanentries.size());
- co_return std::make_tuple(std::move(entries), std::move(outmark));
-}
-
-int RGWDataChangesLog::list_entries(
- const DoutPrefixProvider *dpp, int shard,
- int max_entries, std::vector<rgw_data_change_log_entry>& entries,
- std::string_view marker, std::string* out_marker, bool* truncated,
- std::string* errstr, optional_yield y)
-{
- std::tuple<std::span<rgw_data_change_log_entry>,
- std::string> out;
- if (shard >= num_shards) [[unlikely]] {
- if (errstr) {
- *errstr = fmt::format("{} is not a valid shard. Valid shards are integers in [0, {})",
- shard, num_shards);
- }
- return -EINVAL;
- }
- if (std::ssize(entries) < max_entries) {
- entries.resize(max_entries);
- }
- try {
- if (y) {
- auto& yield = y.get_yield_context();
- out = asio::co_spawn(yield.get_executor(),
- bes->list(dpp, shard, entries,
- std::string{marker}),
- yield);
- } else {
- maybe_warn_about_blocking(dpp);
- out = asio::co_spawn(rados->get_executor(),
- bes->list(dpp, shard, entries,
- std::string{marker}),
- async::use_blocked);
- }
- } catch (const std::exception&) {
- return ceph::from_exception(std::current_exception());
- }
- auto& [outries, outmark] = out;
- if (auto size = std::ssize(outries); size < std::ssize(entries)) {
- entries.resize(size);
- }
- if (truncated) {
- *truncated = !outmark.empty();
- }
- if (out_marker) {
- *out_marker = std::move(outmark);
- }
- return 0;
+ bool truncated = !outmark.empty();
+ co_return std::make_tuple(std::move(entries), std::move(outmark), truncated);
}
asio::awaitable<std::tuple<std::vector<rgw_data_change_log_entry>,
- RGWDataChangesLogMarker>>
+ RGWDataChangesLogMarker, bool>>
RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp,
int max_entries, RGWDataChangesLogMarker marker)
{
if (max_entries <= 0) {
co_return std::make_tuple(std::vector<rgw_data_change_log_entry>{},
- RGWDataChangesLogMarker{});
+ RGWDataChangesLogMarker{}, false);
}
std::vector<rgw_data_change_log_entry> entries(max_entries);
if (!remaining.empty()) {
entries.resize(entries.size() - remaining.size());
}
- co_return std::make_tuple(std::move(entries), std::move(marker));
+ bool truncated = marker;
+ co_return std::make_tuple(std::move(entries), std::move(marker), truncated);
}
-int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp,int max_entries,
- std::vector<rgw_data_change_log_entry>& entries,
- RGWDataChangesLogMarker& marker, bool *ptruncated,
- optional_yield y)
-{
- std::tuple<std::vector<rgw_data_change_log_entry>,
- RGWDataChangesLogMarker> out;
- if (std::ssize(entries) < max_entries) {
- entries.resize(max_entries);
- }
- try {
- if (y) {
- auto& yield = y.get_yield_context();
- out = asio::co_spawn(yield.get_executor(),
- list_entries(dpp, max_entries,
- RGWDataChangesLogMarker{marker}),
- yield);
- } else {
- maybe_warn_about_blocking(dpp);
- out = asio::co_spawn(rados->get_executor(),
- list_entries(dpp, max_entries,
- RGWDataChangesLogMarker{marker}),
- async::use_blocked);
- }
- } catch (const std::exception&) {
- return ceph::from_exception(std::current_exception());
- }
- auto& [outries, outmark] = out;
- if (auto size = std::ssize(outries); size < std::ssize(entries)) {
- entries.resize(size);
- }
- if (ptruncated) {
- *ptruncated = (outmark.shard > 0 || !outmark.marker.empty());
- }
- marker = std::move(outmark);
- return 0;
-}
-
-int RGWDataChangesLog::get_info(const DoutPrefixProvider* dpp, int shard_id,
- RGWDataChangesLogInfo* info,
- std::string* errstr, optional_yield y)
+asio::awaitable<RGWDataChangesLogInfo>
+RGWDataChangesLog::get_info(const DoutPrefixProvider* dpp, int shard_id)
{
if (shard_id >= num_shards) [[unlikely]] {
- if (errstr) {
- *errstr = fmt::format(
+ throw sys::system_error{-EINVAL, sys::generic_category(),
+ fmt::format(
"{} is not a valid shard. Valid shards are integers in [0, {})",
- shard_id, num_shards);
- }
+ shard_id, num_shards)};
}
auto be = bes->head();
- try {
- if (y) {
- auto& yield = y.get_yield_context();
- *info = asio::co_spawn(yield.get_executor(),
- be->get_info(dpp, shard_id),
- yield);
- } else {
- maybe_warn_about_blocking(dpp);
- *info = asio::co_spawn(rados->get_executor(),
- be->get_info(dpp, shard_id),
- async::use_blocked);
- }
- } catch (const std::exception&) {
- return ceph::from_exception(std::current_exception());
- }
- if (!info->marker.empty()) {
- info->marker = gencursor(be->gen_id, info->marker);
+ auto info = co_await be->get_info(dpp, shard_id);
+ if (!info.marker.empty()) {
+ info.marker = gencursor(be->gen_id, info.marker);
}
- return 0;
+ co_return info;
}
asio::awaitable<void> DataLogBackends::trim_entries(
co_return;
}
-int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
- std::string_view marker, std::string* errstr,
- optional_yield y)
+asio::awaitable<void>
+RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+ std::string_view marker)
{
if (shard_id >= num_shards) [[unlikely]] {
- if (errstr) {
- *errstr = fmt::format(
+ throw sys::system_error{-EINVAL, sys::generic_category(),
+ fmt::format(
"{} is not a valid shard. Valid shards are integers in [0, {})",
- shard_id, num_shards);
- }
- }
- try {
- if (y) {
- auto& yield = y.get_yield_context();
- asio::co_spawn(yield.get_executor(),
- bes->trim_entries(dpp, shard_id, marker),
- yield);
- } else {
- maybe_warn_about_blocking(dpp);
- asio::co_spawn(rados->get_executor(),
- bes->trim_entries(dpp, shard_id, marker),
- async::use_blocked);
- }
- } catch (const std::exception& e) {
- return ceph::from_exception(std::current_exception());
+ shard_id, num_shards)};
}
- return 0;
+ auto be = bes->head();
+ co_return co_await bes->trim_entries(dpp, shard_id, marker);
}
-int RGWDataChangesLog::trim_entries(const DoutPrefixProvider* dpp, int shard_id,
- std::string_view marker,
- librados::AioCompletion* c)
+void RGWDataChangesLog::trim_entries(const DoutPrefixProvider* dpp, int shard_id,
+ std::string_view marker,
+ librados::AioCompletion* c)
{
- if (shard_id >= num_shards) [[unlikely]] {
- return -EINVAL;
- }
asio::co_spawn(rados->get_executor(),
- bes->trim_entries(dpp, shard_id, marker),
+ trim_entries(dpp, shard_id, marker),
c);
- return 0;
}
std::optional<uint64_t> through;
ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl;
operation = "trim_generations"sv;
- co_await bes->trim_generations(&dp, through);
+ co_await trim_generations(&dp, through);
operation = {};
if (through) {
ldpp_dout(&dp, 2)
"~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
}
-int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp,
- log_type type,optional_yield y)
+asio::awaitable<void>
+RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type type)
{
- try {
- if (y) {
- auto& yield = y.get_yield_context();
- asio::co_spawn(yield.get_executor(),
- bes->new_backing(dpp, type),
- yield);
- } else {
- maybe_warn_about_blocking(dpp);
- asio::co_spawn(rados->get_executor(),
- bes->new_backing(dpp, type),
- async::use_blocked);
- }
- } catch (const std::exception&) {
- return ceph::from_exception(std::current_exception());
- }
- return 0;;
+ co_return co_await bes->new_backing(dpp, type);
}
-int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp,
- std::optional<uint64_t>& through,
- optional_yield y)
+asio::awaitable<void>
+RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp,
+ std::optional<uint64_t>& through)
{
- try {
- if (y) {
- auto& yield = y.get_yield_context();
- asio::co_spawn(yield.get_executor(),
- bes->trim_generations(dpp, through),
- yield);
- } else {
- maybe_warn_about_blocking(dpp);
- asio::co_spawn(rados->get_executor(),
- bes->trim_generations(dpp, through),
- async::use_blocked);
- }
- } catch (const std::exception& e) {
- return ceph::from_exception(std::current_exception());
- }
-
- return 0;
+ co_return co_await bes->trim_generations(dpp, through);
}
asio::awaitable<std::pair<bc::flat_map<std::string, uint64_t>,
std::string>>
RGWDataChangesLog::read_sems(int index, std::string cursor) {
- namespace sem_set = neorados::cls::sem_set;
bc::flat_map<std::string, uint64_t> out;
try {
co_await rados->execute(
int shard_id, optional_yield y);
int get_log_shard_id(rgw_bucket& bucket, int shard_id);
asio::awaitable<std::tuple<std::vector<rgw_data_change_log_entry>,
- std::string>>
+ std::string, bool>>
list_entries(const DoutPrefixProvider* dpp, int shard, int max_entries,
std::string marker);
- int list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
- std::vector<rgw_data_change_log_entry>& entries,
- std::string_view marker, std::string* out_marker,
- bool* truncated, std::string* errstr, optional_yield y);
asio::awaitable<std::tuple<std::vector<rgw_data_change_log_entry>,
- RGWDataChangesLogMarker>>
+ RGWDataChangesLogMarker, bool>>
list_entries(const DoutPrefixProvider *dpp, int max_entries,
RGWDataChangesLogMarker marker);
- int list_entries(const DoutPrefixProvider *dpp, int max_entries,
- std::vector<rgw_data_change_log_entry>& entries,
- RGWDataChangesLogMarker& marker, bool* ptruncated,
- optional_yield y);
-
- int trim_entries(const DoutPrefixProvider *dpp, int shard_id,
- std::string_view marker, std::string* errstr, optional_yield y);
- int trim_entries(const DoutPrefixProvider *dpp, int shard_id,
- std::string_view marker, librados::AioCompletion* c);
- int get_info(const DoutPrefixProvider *dpp, int shard_id,
- RGWDataChangesLogInfo *info, std::string* errstr,
- optional_yield y);
-
+ asio::awaitable<RGWDataChangesLogInfo>
+ get_info(const DoutPrefixProvider* dpp, int shard_id);
+ asio::awaitable<void>
+ trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+ std::string_view marker);
+ void trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+ std::string_view marker, librados::AioCompletion* c);
+ asio::awaitable<void>
+ trim_generations(const DoutPrefixProvider *dpp,
+ std::optional<uint64_t>& through);
+ asio::awaitable<void>
+ change_format(const DoutPrefixProvider *dpp, log_type type);
void mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen);
auto read_clear_modified() {
std::unique_lock wl{modified_lock};
std::string get_sem_set_oid(int shard_id) const;
- int change_format(const DoutPrefixProvider *dpp, log_type type,
- optional_yield y);
- int trim_generations(const DoutPrefixProvider *dpp,
- std::optional<uint64_t>& through,
- optional_yield y);
asio::awaitable<std::pair<bc::flat_map<std::string, uint64_t>,
std::string>>
read_sems(int index, std::string cursor);
#include "common/ceph_json.h"
#include "common/strtol.h"
+#include "rgw/async_utils.h"
#include "rgw_rest.h"
#include "rgw_op.h"
#include "rgw_rest_s3.h"
// Note that last_marker is updated to be the marker of the last
// entry listed
- op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
- datalog_rados->list_entries(this, shard_id, max_entries, entries,
- marker, &last_marker, &truncated, nullptr, y);
+ auto store = static_cast<rgw::sal::RadosStore*>(driver);
+ op_ret = rgw::run_coro(
+ this,
+ store->get_io_context(),
+ store->svc()->datalog_rados->list_entries(this, shard_id,
+ max_entries, marker),
+ std::tie(entries, last_marker, truncated),
+ "RGWDataChangesLog::list_entries", y);
+
RGWDataChangesLogInfo info;
- op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
- datalog_rados->get_info(this, shard_id, &info, nullptr, y);
+ op_ret = rgw::run_coro(
+ this,
+ store->get_io_context(),
+ store->svc()->datalog_rados->get_info(this, shard_id),
+ info, "RGWDataChangesLog::get_info", y);
last_update = info.last_update;
}
return;
}
- op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
- datalog_rados->get_info(this, shard_id, &info, nullptr, y);
+ auto store = static_cast<rgw::sal::RadosStore*>(driver);
+ op_ret = rgw::run_coro(this, store->get_io_context(),
+ store->svc()->datalog_rados->get_info(this, shard_id),
+ info, "RGWDataChangesLog::get_info", y);
}
void RGWOp_DATALog_ShardInfo::send_response() {
return;
}
- op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
- datalog_rados->trim_entries(this, shard_id, marker, nullptr, y);
+ auto store = static_cast<rgw::sal::RadosStore*>(driver);
+ op_ret = rgw::run_coro(
+ this, store->get_io_context(),
+ store->svc()->datalog_rados->trim_entries(this, shard_id, marker),
+ "RGWDataChangesLog::trim_entries", y);
}
// not in header to avoid pulling in rgw_sync.h
int send_request(const DoutPrefixProvider *dpp) override {
set_status() << "sending request";
cn = stack->create_completion_notifier();
- return store->svc()->datalog_rados->trim_entries(dpp, shard, marker,
- cn->completion());
+ // Call cannot fail, all errors will be reported through the completion
+ store->svc()->datalog_rados->trim_entries(dpp, shard, marker,
+ cn->completion());
+ return 0;
}
int request_complete() override {
int r = cn->completion()->get_return_value();
// vim: ts=8 sw=2 smarttab ft=cpp
/*
- * Copyright (C) 2025 IBM
+ * Copyright (C) 2025 IBM
*/
#include <cerrno>
#include "radosgw-admin/orphan.h"
#include "radosgw-admin/sync_checkpoint.h"
+
+#include "rgw/async_utils.h"
+
#include "rgw_user.h"
#include "rgw_otp.h"
#include "rgw_rados.h"
} while (0)
using namespace std;
+using rgw::run_coro;
inline int posix_errortrans(int r)
{
}
}
-int run_coro(asio::awaitable<void> coro, std::string_view name) {
- try {
- // Blocking in startup code, not ideal, but won't hurt anything.
- asio::co_spawn(static_cast<rgw::sal::RadosStore*>(driver)->get_io_context(),
- std::move(coro),
- async::use_blocked);
- } catch (boost::system::system_error& e) {
- ldpp_dout(dpp(), -1) << name << ": failed: " << e.what() << dendl;
- return ceph::from_error_code(e.code());
- } catch (std::exception& e) {
- ldpp_dout(dpp(), -1) << name << ": failed: " << e.what() << dendl;
- return -EIO;
- }
- return 0;
-}
-
-
// This has an uncaught exception. Even if the exception is caught, the program
// would need to be terminated, so the warning is simply suppressed.
// coverity[root_function:SUPPRESS]
if (specified_shard_id) {
shard = shard_id;
}
- ret = run_coro(datalog->admin_sem_list(shard, max_entries, marker,
+ std::string err;
+ ret = run_coro(dpp(), context_pool,
+ datalog->admin_sem_list(shard, max_entries, marker,
cout, *formatter),
- "datalog seamphore list");
+ &err);
if (ret < 0) {
+ std::cerr << "datalog semaphore list: " << err << std::endl;
return ret;
}
}
std::cerr << "Specify the semaphore key with --marker." << std::endl;
return -EINVAL;
}
+ std::string errstr;
auto datalog = static_cast<rgw::sal::RadosStore*>(driver)
->svc()->datalog_rados;
- ret = run_coro(datalog->admin_sem_reset(marker, count.value_or(0)),
- "datalog seamphore reset");
+ ret = rgw::run_coro(dpp(), context_pool,
+ datalog->admin_sem_reset(marker, count.value_or(0)),
+ &errstr);
if (ret < 0) {
+ std::cerr << "datalog semaphore reset: " << errstr << std::endl;
return ret;
}
}
if (opt_cmd == OPT::DATALOG_LIST) {
formatter->open_array_section("entries");
- bool truncated;
+ bool truncated = false;
int count = 0;
if (max_entries < 0)
max_entries = 1000;
do {
std::vector<rgw_data_change_log_entry> entries;
if (specified_shard_id) {
- ret = datalog_svc->list_entries(dpp(), shard_id, max_entries - count,
- entries, marker,
- &marker, &truncated,
- &errstr, null_yield);
+ ret = run_coro(
+ dpp(),
+ context_pool,
+ datalog_svc->list_entries(dpp(), shard_id, max_entries - count,
+ marker),
+ std::tie(entries, marker, truncated),
+ &errstr);
} else {
- ret = datalog_svc->list_entries(dpp(), max_entries - count, entries,
- log_marker, &truncated, null_yield);
+ ret = run_coro(
+ dpp(),
+ context_pool,
+ datalog_svc->list_entries(dpp(), max_entries - count, log_marker),
+ std::tie(entries, log_marker, truncated),
+ &errstr);
}
if (ret < 0) {
cerr << "ERROR: datalog_svc->list_entries(): " << errstr << ": "
for (; i < g_ceph_context->_conf->rgw_data_log_num_shards; i++) {
vector<cls::log::entry> entries;
+ std::string errstr;
RGWDataChangesLogInfo info;
- static_cast<rgw::sal::RadosStore*>(driver)->svc()->
- datalog_rados->get_info(dpp(), i, &info, nullptr, null_yield);
+
+ int r = run_coro(dpp(), context_pool,
+ static_cast<rgw::sal::RadosStore*>(driver)->svc()->
+ datalog_rados->get_info(dpp(), i),
+ info, &errstr);
+
+ if (r < 0) {
+ std::cerr << "datalog status: " << errstr << std::endl;
+ return -r;
+ }
::encode_json("info", info, formatter.get());
return EINVAL;
}
+ std::string errstr;
auto datalog = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados;
- ret = datalog->trim_entries(dpp(), shard_id, marker, nullptr, null_yield);
+ ret = run_coro(dpp(), context_pool,
+ datalog->trim_entries(dpp(), shard_id, marker),
+ &errstr);
if (ret < 0 && ret != -ENODATA) {
- cerr << "ERROR: trim_entries(): " << cpp_strerror(-ret) << std::endl;
+ cerr << "ERROR: trim_entries(): " << errstr << std::endl;
return -ret;
}
}
return -EINVAL;
}
auto datalog = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados;
- ret = datalog->change_format(dpp(), *opt_log_type, null_yield);
+ std::string errstr;
+ ret = run_coro(dpp(), context_pool,
+ datalog->change_format(dpp(), *opt_log_type),
+ &errstr);
if (ret < 0) {
- cerr << "ERROR: change_format(): " << cpp_strerror(-ret) << std::endl;
+ cerr << "ERROR: change_format(): " << errstr << std::endl;
return -ret;
}
}
if (opt_cmd == OPT::DATALOG_PRUNE) {
auto datalog = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados;
std::optional<uint64_t> through;
- ret = datalog->trim_generations(dpp(), through, null_yield);
+ std::string errstr;
+ ret = run_coro(dpp(), context_pool,
+ datalog->trim_generations(dpp(), through),
+ &errstr);
if (ret < 0) {
- cerr << "ERROR: trim_generations(): " << cpp_strerror(-ret) << std::endl;
+ cerr << "ERROR: trim_generations(): " << errstr << std::endl;
return -ret;
}
RGWDataChangesLogMarker marker;
do {
std::vector<rgw_data_change_log_entry> entries;
- std::tie(entries, marker) =
+ std::tie(entries, marker, std::ignore) =
co_await datalog->list_entries(dpp, 1'000,
std::move(marker));
for (const auto& entry : entries) {