From 08c0ffdc8921d99069edce49dafb597be4d92d8b Mon Sep 17 00:00:00 2001 From: Adam Emerson Date: Thu, 21 Nov 2024 17:58:21 -0500 Subject: [PATCH] rgw/multisite/datalog: C++20 Coroutine Refactor Rewrite all of `RGWDataChangesLog` and supporting classes to use non-blocking, C++20 coroutines. Make interfaces for `optional_yield` and `librados::AioCompletion`. Signed-off-by: Adam C. Emerson --- src/rgw/driver/rados/rgw_datalog.cc | 1197 ++++++++++++----------- src/rgw/driver/rados/rgw_datalog.h | 199 ++-- src/rgw/driver/rados/rgw_log_backing.cc | 1079 ++++++++++---------- src/rgw/driver/rados/rgw_log_backing.h | 320 +++--- src/rgw/driver/rados/rgw_notify.cc | 1 + src/rgw/driver/rados/rgw_sal_rados.cc | 85 +- src/rgw/driver/rados/rgw_sal_rados.h | 2 + src/rgw/driver/rados/rgw_tools.cc | 1 + src/rgw/radosgw-admin/radosgw-admin.cc | 11 +- src/rgw/rgw_appmain.cc | 1 + src/rgw/rgw_realm_reloader.cc | 1 + src/rgw/rgw_sal.h | 3 + src/test/rgw/CMakeLists.txt | 5 +- src/test/rgw/test_log_backing.cc | 374 +++---- 14 files changed, 1607 insertions(+), 1672 deletions(-) diff --git a/src/rgw/driver/rados/rgw_datalog.cc b/src/rgw/driver/rados/rgw_datalog.cc index 49371553ca7ce..9a498666a0594 100644 --- a/src/rgw/driver/rados/rgw_datalog.cc +++ b/src/rgw/driver/rados/rgw_datalog.cc @@ -1,33 +1,49 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab ft=cpp +#include +#include #include // for std::shared_lock +#include #include -#include "common/async/yield_context.h" -#include "common/debug.h" -#include "common/containers.h" -#include "common/errno.h" -#include "common/error_code.h" +#include +#include +#include + +#include "include/fs_types.h" +#include "include/neorados/RADOS.hpp" #include "common/async/blocked_completion.h" #include "common/async/librados_completion.h" +#include "common/async/yield_context.h" + +#include "common/dout.h" +#include "common/containers.h" +#include "common/error_code.h" -#include "cls/fifo/cls_fifo_types.h" -#include "cls/log/cls_log_client.h" +#include "neorados/cls/fifo.h" +#include "neorados/cls/log.h" -#include "cls_fifo_legacy.h" +#include "rgw_asio_thread.h" +#include "rgw_bucket.h" #include "rgw_bucket_layout.h" #include "rgw_datalog.h" #include "rgw_log_backing.h" #include "rgw_tools.h" #include "rgw_sal_rados.h" -#define dout_context g_ceph_context static constexpr auto dout_subsys = ceph_subsys_rgw; -namespace bs = boost::system; -namespace lr = librados; +using namespace std::literals; + +namespace sys = boost::system; + +namespace nlog = ::neorados::cls::log; +namespace fifo = ::neorados::cls::fifo; + +namespace async = ceph::async; +namespace buffer = ceph::buffer; using ceph::containers::tiny_vector; @@ -104,11 +120,12 @@ class RGWDataChangesOmap final : public RGWDataChangesBE { std::vector oids; public: - RGWDataChangesOmap(lr::IoCtx& ioctx, + RGWDataChangesOmap(neorados::RADOS& r, + neorados::IOContext loc, RGWDataChangesLog& datalog, uint64_t gen_id, int num_shards) - : RGWDataChangesBE(ioctx, datalog, gen_id) { + : RGWDataChangesBE(r, std::move(loc), datalog, gen_id) { oids.reserve(num_shards); for (auto i = 0; i < num_shards; ++i) { oids.push_back(get_oid(i)); @@ -117,305 +134,214 @@ public: ~RGWDataChangesOmap() override = default; void prepare(ceph::real_time ut, const std::string& key, - ceph::buffer::list&& entry, entries& out) override { + buffer::list&& entry, entries& out) override { if (!std::holds_alternative(out)) { ceph_assert(std::visit([](const auto& v) { return std::empty(v); }, out)); out = centries(); } - cls::log::entry e; - cls_log_add_prepare_entry(e, ut, {}, key, entry); + cls::log::entry e{ut, {}, key, std::move(entry)}; std::get(out).push_back(std::move(e)); } - int push(const DoutPrefixProvider *dpp, int index, entries&& items, optional_yield y) override { - lr::ObjectWriteOperation op; - cls_log_add(op, std::get(items), true); - auto r = rgw_rados_operate(dpp, ioctx, oids[index], std::move(op), y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": failed to push to " << oids[index] << cpp_strerror(-r) - << dendl; - } - return r; + asio::awaitable push(const DoutPrefixProvider *dpp, int index, + entries&& items) override { + co_await r.execute( + oids[index], loc, + neorados::WriteOp{}.exec(nlog::add(std::get(items))), + asio::use_awaitable); + co_return; } - int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now, - const std::string& key, ceph::buffer::list&& bl, - optional_yield y) override { - lr::ObjectWriteOperation op; - cls_log_add(op, now, {}, key, bl); - auto r = rgw_rados_operate(dpp, ioctx, oids[index], std::move(op), y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": failed to push to " << oids[index] - << cpp_strerror(-r) << dendl; - } - return r; + asio::awaitable push(const DoutPrefixProvider *dpp, int index, + ceph::real_time now, const std::string& key, + buffer::list&& bl) override { + co_await r.execute( + oids[index], loc, + neorados::WriteOp{}.exec(nlog::add(now, {}, key, std::move(bl))), + asio::use_awaitable); + co_return; } - int list(const DoutPrefixProvider *dpp, int index, int max_entries, - std::vector& entries, - std::optional marker, - std::string* out_marker, bool* truncated, - optional_yield y) override { - std::vector log_entries; - lr::ObjectReadOperation op; - cls_log_list(op, {}, {}, std::string(marker.value_or("")), - max_entries, log_entries, out_marker, truncated); - auto r = rgw_rados_operate(dpp, ioctx, oids[index], std::move(op), nullptr, y); - if (r == -ENOENT) { - *truncated = false; - return 0; - } - if (r < 0) { + + asio::awaitable, + std::string>> + list(const DoutPrefixProvider* dpp, int shard, + std::span entries, + std::string marker) override { + std::vector entrystore{entries.size()}; + + try { + auto [lentries, lmark] = + co_await nlog::list(r, oids[shard], loc, {}, {}, marker, entrystore, + asio::use_awaitable); + + entries = entries.first(lentries.size()); + std::ranges::transform(lentries, std::begin(entries), + [](const auto& e) { + rgw_data_change_log_entry entry; + entry.log_id = e.id; + entry.log_timestamp = e.timestamp; + auto liter = e.data.cbegin(); + decode(entry.entry, liter); + return entry; + }); + co_return std::make_tuple(std::move(entries), lmark); + } catch (const buffer::error& err) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": failed to list " << oids[index] - << cpp_strerror(-r) << dendl; - return r; - } - for (auto iter = log_entries.begin(); iter != log_entries.end(); ++iter) { - rgw_data_change_log_entry log_entry; - log_entry.log_id = iter->id; - auto rt = iter->timestamp; - log_entry.log_timestamp = rt; - auto liter = iter->data.cbegin(); - try { - decode(log_entry.entry, liter); - } catch (ceph::buffer::error& err) { + << ": failed to decode data changes log entry: " + << err.what() << dendl; + throw; + } catch (const sys::system_error& e) { + if (e.code() == sys::errc::no_such_file_or_directory) { + co_return std::make_tuple(entries.first(0), std::string{}); + } else { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": failed to decode data changes log entry: " - << err.what() << dendl; - return -EIO; + << ": failed to list " << oids[shard] + << ": " << e.what() << dendl; + throw; } - entries.push_back(log_entry); - } - return 0; - } - int get_info(const DoutPrefixProvider *dpp, int index, - RGWDataChangesLogInfo *info, optional_yield y) override { - cls::log::header header; - lr::ObjectReadOperation op; - cls_log_info(op, &header); - auto r = rgw_rados_operate(dpp, ioctx, oids[index], std::move(op), nullptr, y); - if (r == -ENOENT) r = 0; - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": failed to get info from " << oids[index] - << cpp_strerror(-r) << dendl; - } else { - info->marker = header.max_marker; - info->last_update = header.max_time; } - return r; } - int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker, - optional_yield y) override { - lr::ObjectWriteOperation op; - cls_log_trim(op, {}, {}, {}, std::string(marker)); - auto r = rgw_rados_operate(dpp, ioctx, oids[index], std::move(op), y); - if (r == -ENOENT) r = -ENODATA; - if (r < 0 && r != -ENODATA) { + asio::awaitable get_info(const DoutPrefixProvider *dpp, + int index) override { + try { + auto header = co_await nlog::info(r, oids[index], loc, + asio::use_awaitable); + co_return RGWDataChangesLogInfo{.marker = header.max_marker, + .last_update = header.max_time}; + } catch (const sys::system_error& e) { + if (e.code() == sys::errc::no_such_file_or_directory) { + co_return RGWDataChangesLogInfo{}; + } ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": failed to get info from " << oids[index] - << cpp_strerror(-r) << dendl; + << ": failed to get info from " << oids[index] + << ": " << e.what() << dendl; + throw; } - return r; } - int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker, - lr::AioCompletion* c) override { - lr::ObjectWriteOperation op; - cls_log_trim(op, {}, {}, {}, std::string(marker)); - auto r = ioctx.aio_operate(oids[index], c, &op, 0); - if (r == -ENOENT) r = -ENODATA; - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": failed to get info from " << oids[index] - << cpp_strerror(-r) << dendl; + asio::awaitable trim(const DoutPrefixProvider *dpp, int index, + std::string_view marker) override { + try { + co_await nlog::trim(r, oids[index], loc, {}, std::string{marker}, + asio::use_awaitable); + co_return; + } catch (const sys::system_error& e) { + if (e.code() == sys::errc::no_such_file_or_directory) { + co_return; + } else { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ + << ": failed to get trim " << oids[index] + << ": " << e.what() << dendl; + throw; + } } - return r; } std::string_view max_marker() const override { return "99999999"; } - int is_empty(const DoutPrefixProvider *dpp, optional_yield y) override { - for (auto shard = 0u; shard < oids.size(); ++shard) { - std::vector log_entries; - lr::ObjectReadOperation op; - std::string out_marker; - bool truncated; - cls_log_list(op, {}, {}, {}, 1, log_entries, &out_marker, &truncated); - auto r = rgw_rados_operate(dpp, ioctx, oids[shard], std::move(op), nullptr, y); - if (r == -ENOENT) { - continue; - } - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": failed to list " << oids[shard] - << cpp_strerror(-r) << dendl; - return r; - } - if (!log_entries.empty()) { - return 0; + asio::awaitable is_empty(const DoutPrefixProvider* dpp) override { + std::vector entrystore{1}; + for (auto oid = 0; oid < std::ssize(oids); ++oid) { + try { + auto [entries, marker] = + co_await nlog::list(r, oids[oid], loc, {}, {}, {}, entrystore, + asio::use_awaitable); + if (!entries.empty()) { + co_return false; + } + } catch (const sys::system_error& e) { + if (e.code() == sys::errc::no_such_file_or_directory) { + continue; + } } } - return 1; + co_return true; } }; class RGWDataChangesFIFO final : public RGWDataChangesBE { - using centries = std::vector; + using centries = std::vector; tiny_vector fifos; public: - RGWDataChangesFIFO(lr::IoCtx& ioctx, + RGWDataChangesFIFO(neorados::RADOS& r, + neorados::IOContext loc, RGWDataChangesLog& datalog, - uint64_t gen_id, int shards) - : RGWDataChangesBE(ioctx, datalog, gen_id), - fifos(shards, [&ioctx, this](std::size_t i, auto emplacer) { - emplacer.emplace(ioctx, get_oid(i)); + uint64_t gen_id, + int num_shards) + : RGWDataChangesBE(r, std::move(loc), datalog, gen_id), + fifos(num_shards, [&r, &loc, this](std::size_t i, auto emplacer) { + emplacer.emplace(r, get_oid(i), loc); }) {} ~RGWDataChangesFIFO() override = default; void prepare(ceph::real_time, const std::string&, - ceph::buffer::list&& entry, entries& out) override { + buffer::list&& entry, entries& out) override { if (!std::holds_alternative(out)) { ceph_assert(std::visit([](auto& v) { return std::empty(v); }, out)); out = centries(); } std::get(out).push_back(std::move(entry)); } - int push(const DoutPrefixProvider *dpp, int index, entries&& items, - optional_yield y) override { - auto r = fifos[index].push(dpp, std::get(items), y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": unable to push to FIFO: " << get_oid(index) - << ": " << cpp_strerror(-r) << dendl; - } - return r; + asio::awaitable push(const DoutPrefixProvider* dpp, int index, + entries&& items) override { + co_return co_await fifos[index].push(dpp, std::get(items)); } - int push(const DoutPrefixProvider *dpp, int index, ceph::real_time, - const std::string&, ceph::buffer::list&& bl, - optional_yield y) override { - auto r = fifos[index].push(dpp, std::move(bl), y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": unable to push to FIFO: " << get_oid(index) - << ": " << cpp_strerror(-r) << dendl; - } - return r; + asio::awaitable push(const DoutPrefixProvider* dpp, int index, + ceph::real_time, const std::string&, + buffer::list&& bl) override { + co_return co_await fifos[index].push(dpp, std::move(bl)); } - int list(const DoutPrefixProvider *dpp, int index, int max_entries, - std::vector& entries, - std::optional marker, std::string* out_marker, - bool* truncated, optional_yield y) override { - std::vector log_entries; - bool more = false; - auto r = fifos[index].list(dpp, max_entries, marker, &log_entries, &more, - y); - if (r < 0) { + asio::awaitable, + std::string>> + list(const DoutPrefixProvider* dpp, int shard, + std::span entries, + std::string marker) override { + try { + std::vector log_entries{entries.size()}; + auto [lentries, outmark] = + co_await fifos[shard].list(dpp, marker, log_entries); + entries = entries.first(lentries.size()); + std::ranges::transform(lentries, entries.begin(), + [](const auto& e) { + rgw_data_change_log_entry entry ; + entry.log_id = e.marker; + entry.log_timestamp = e.mtime; + auto liter = e.data.cbegin(); + decode(entry.entry, liter); + return entry; + }); + co_return std::make_tuple(std::move(entries), + outmark ? std::move(*outmark) : std::string{}); + } catch (const buffer::error& err) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": unable to list FIFO: " << get_oid(index) - << ": " << cpp_strerror(-r) << dendl; - return r; + << ": failed to decode data changes log entry: " + << err.what() << dendl; + throw; } - for (const auto& entry : log_entries) { - rgw_data_change_log_entry log_entry; - log_entry.log_id = entry.marker; - log_entry.log_timestamp = entry.mtime; - auto liter = entry.data.cbegin(); - try { - decode(log_entry.entry, liter); - } catch (const buffer::error& err) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": failed to decode data changes log entry: " - << err.what() << dendl; - return -EIO; - } - entries.push_back(std::move(log_entry)); - } - if (truncated) - *truncated = more; - if (out_marker && !log_entries.empty()) { - *out_marker = log_entries.back().marker; - } - return 0; } - int get_info(const DoutPrefixProvider *dpp, int index, - RGWDataChangesLogInfo *info, optional_yield y) override { + asio::awaitable + get_info(const DoutPrefixProvider *dpp, int index) override { auto& fifo = fifos[index]; - auto r = fifo.read_meta(dpp, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": unable to get FIFO metadata: " << get_oid(index) - << ": " << cpp_strerror(-r) << dendl; - return r; - } - rados::cls::fifo::info m; - fifo.meta(dpp, m, y); - auto p = m.head_part_num; - if (p < 0) { - info->marker = ""; - info->last_update = ceph::real_clock::zero(); - return 0; - } - rgw::cls::fifo::part_info h; - r = fifo.get_part_info(dpp, p, &h, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": unable to get part info: " << get_oid(index) << "/" << p - << ": " << cpp_strerror(-r) << dendl; - return r; - } - info->marker = rgw::cls::fifo::marker{p, h.last_ofs}.to_string(); - info->last_update = h.max_time; - return 0; + auto [marker, last_update] = co_await fifo.last_entry_info(dpp); + co_return RGWDataChangesLogInfo{ .marker = marker, + .last_update = last_update }; } - int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker, - optional_yield y) override { - auto r = fifos[index].trim(dpp, marker, false, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": unable to trim FIFO: " << get_oid(index) - << ": " << cpp_strerror(-r) << dendl; - } - return r; - } - int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker, - librados::AioCompletion* c) override { - int r = 0; - if (marker == rgw::cls::fifo::marker(0, 0).to_string()) { - rgw_complete_aio_completion(c, -ENODATA); - } else { - // This null_yield is used for lazily opening FIFOs. - // - // shouldn't exist, but it can't be eliminated - // since your caller is an RGWCoroutine in the data sync code. - // - // It can be eliminated after Reef when we can get rid of - // AioCompletion entirely. - fifos[index].trim(dpp, marker, false, c, null_yield); - } - return r; + asio::awaitable trim(const DoutPrefixProvider *dpp, int index, + std::string_view marker) override { + co_await fifos[index].trim(dpp, std::string{marker}, false); } std::string_view max_marker() const override { - static const std::string mm = - rgw::cls::fifo::marker::max().to_string(); - return std::string_view(mm); + static const auto max_mark = fifo::FIFO::max_marker(); + return std::string_view(max_mark); } - int is_empty(const DoutPrefixProvider *dpp, optional_yield y) override { - std::vector log_entries; - bool more = false; + asio::awaitable is_empty(const DoutPrefixProvider *dpp) override { + std::vector entrystore; for (auto shard = 0u; shard < fifos.size(); ++shard) { - auto r = fifos[shard].list(dpp, 1, {}, &log_entries, &more, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": unable to list FIFO: " << get_oid(shard) - << ": " << cpp_strerror(-r) << dendl; - return r; - } - if (!log_entries.empty()) { - return 0; + auto [lentries, outmark] = + co_await fifos[shard].list(dpp, {}, entrystore); + if (!lentries.empty()) { + co_return false; } } - return 1; + co_return true; } }; @@ -425,9 +351,8 @@ RGWDataChangesLog::RGWDataChangesLog(CephContext* cct) prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size) {} -bs::error_code DataLogBackends::handle_init(entries_t e) noexcept { +void DataLogBackends::handle_init(entries_t e) { std::unique_lock l(m); - for (const auto& [gen_id, gen] : e) { if (gen.pruned) { lderr(datalog.cct) @@ -443,88 +368,116 @@ bs::error_code DataLogBackends::handle_init(entries_t e) noexcept { switch (gen.type) { case log_type::omap: emplace(gen_id, - boost::intrusive_ptr(new RGWDataChangesOmap(ioctx, datalog, gen_id, shards)) - ); + boost::intrusive_ptr( + new RGWDataChangesOmap(rados, loc, datalog, gen_id, shards))); break; case log_type::fifo: emplace(gen_id, - boost::intrusive_ptr(new RGWDataChangesFIFO(ioctx, datalog, gen_id, shards)) - ); + boost::intrusive_ptr( + new RGWDataChangesFIFO(rados, loc, datalog, gen_id, shards))); break; default: lderr(datalog.cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ << ": IMPOSSIBLE: invalid log type: gen_id=" << gen_id << ", type" << gen.type << dendl; - return bs::error_code(EFAULT, bs::system_category()); + throw sys::system_error{EFAULT, sys::generic_category()}; } - } catch (const bs::system_error& err) { + } catch (const sys::system_error& err) { lderr(datalog.cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ << ": error setting up backend: gen_id=" << gen_id << ", err=" << err.what() << dendl; - return err.code(); + throw; } } - return {}; } -bs::error_code DataLogBackends::handle_new_gens(entries_t e) noexcept { - return handle_init(std::move(e)); + +void DataLogBackends::handle_new_gens(entries_t e) { + handle_init(std::move(e)); } -bs::error_code DataLogBackends::handle_empty_to(uint64_t new_tail) noexcept { + +void DataLogBackends::handle_empty_to(uint64_t new_tail) { std::unique_lock l(m); auto i = cbegin(); if (i->first < new_tail) { - return {}; + return; } if (new_tail >= (cend() - 1)->first) { lderr(datalog.cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ << ": ERROR: attempt to trim head: new_tail=" << new_tail << dendl; - return bs::error_code(EFAULT, bs::system_category()); + throw sys::system_error(EFAULT, sys::system_category()); } erase(i, upper_bound(new_tail)); - return {}; } int RGWDataChangesLog::start(const DoutPrefixProvider *dpp, const RGWZone* _zone, const RGWZoneParams& zoneparams, - rgw::sal::RadosStore* store) + rgw::sal::RadosStore* _store) { zone = _zone; + store = _store; ceph_assert(zone); auto defbacking = to_log_type( cct->_conf.get_val("rgw_default_data_log_backing")); // Should be guaranteed by `set_enum_allowed` ceph_assert(defbacking); auto log_pool = zoneparams.log_pool; - auto r = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), log_pool, ioctx, true, false); - if (r < 0) { + + try { + std::exception_ptr eptr; + std::tie(eptr, loc) = + asio::co_spawn(store->get_io_context(), + rgw::init_iocontext(dpp, store->get_neorados(), + log_pool, rgw::create, + asio::use_awaitable), + async::use_blocked); + if (eptr) { + std::rethrow_exception(eptr); + } + } catch (const sys::system_error& e) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": Failed to initialized ioctx, r=" << r - << ", pool=" << log_pool << dendl; - return -r; + << ": Failed to initialized ioctx: " << e.what() + << ", pool=" << log_pool << dendl; + return ceph::from_error_code(e.code()); + } catch (const std::exception& e) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ + << ": Failed to initialized ioctx: " << e.what() + << ", pool=" << log_pool << dendl; + return -EIO; } - // This null_yield is in startup code, so it doesn't matter that much. - auto besr = logback_generations::init( - dpp, ioctx, metadata_log_oid(), [this](uint64_t gen_id, int shard) { - return get_oid(gen_id, shard); - }, - num_shards, *defbacking, null_yield, *this); - - - if (!besr) { + // Blocking in startup code, not ideal, but won't hurt anything. + try { + std::exception_ptr eptr; + std::tie(eptr, bes) = + asio::co_spawn( + store->get_io_context().get_executor(), + logback_generations::init( + dpp, store->get_neorados(), metadata_log_oid(), loc, + [this](uint64_t gen_id, int shard) { + return get_oid(gen_id, shard); + }, num_shards, *defbacking, *this), + async::use_blocked); + if (eptr) { + std::rethrow_exception(eptr); + } + } catch (const sys::system_error& e) { lderr(cct) << __PRETTY_FUNCTION__ << ": Error initializing backends: " - << besr.error().message() << dendl; - return ceph::from_error_code(besr.error()); + << e.what() << dendl; + return ceph::from_error_code(e.code()); + } catch (const std::exception& e) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ + << ": Error initializing backends: " << e.what() + << dendl; + return -EIO; } - bes = std::move(*besr); - renew_thread = make_named_thread("rgw_dt_lg_renew", - &RGWDataChangesLog::renew_run, this); + asio::co_spawn(store->get_io_context().get_executor(), + renew_run(), asio::detached); return 0; } @@ -536,10 +489,11 @@ int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) { return static_cast(r); } -int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp) +asio::awaitable +RGWDataChangesLog::renew_entries(const DoutPrefixProvider* dpp) { if (!zone->log_data) - return 0; + co_return; /* we can't keep the bucket name as part of the cls::log::entry, and we need * it later, so we keep two lists under the map */ @@ -557,7 +511,7 @@ int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp) auto index = choose_oid(bs); rgw_data_change change; - bufferlist bl; + buffer::list bl; change.entity_type = ENTITY_TYPE_BUCKET; change.key = bs.get_key(); change.timestamp = ut; @@ -572,34 +526,24 @@ int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp) auto& [buckets, entries] = p; auto now = real_clock::now(); - - // This null_yield can stay (for now) as we're in our own thread. - auto ret = be->push(dpp, index, std::move(entries), null_yield); - if (ret < 0) { - /* we don't really need to have a special handling for failed cases here, - * as this is just an optimization. */ - ldpp_dout(dpp, -1) << "ERROR: be->push() returned " << ret << dendl; - return ret; - } - + co_await be->push(dpp, index, std::move(entries)); auto expiration = now; expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window); for (auto& [bs, gen] : buckets) { update_renewed(bs, gen, expiration); } } - - return 0; + co_return; } auto RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, uint64_t gen) -> ChangeStatusPtr { - ceph_assert(ceph_mutex_is_locked(lock)); ChangeStatusPtr status; if (!changes.find({bs, gen}, status)) { - status = std::make_shared(); + status = std::make_shared(store->get_io_context() + .get_executor()); changes.add({bs, gen}, status); } return status; @@ -633,15 +577,21 @@ int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) { return choose_oid(bs); } -bool RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp, - const rgw_bucket& bucket, - optional_yield y) const +asio::awaitable +RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp, + const rgw_bucket& bucket) const { if (!bucket_filter) { - return true; + co_return true; } - return bucket_filter(bucket, y, dpp); + co_return co_await asio::spawn( + co_await asio::this_coro::executor, + [this, dpp, &bucket](asio::yield_context yc) { + optional_yield y(yc); + return bucket_filter(bucket, y, dpp); + }, asio::use_awaitable); + co_return true; } std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const { @@ -650,19 +600,20 @@ std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const { fmt::format("{}.{}", prefix, i)); } -int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, - const RGWBucketInfo& bucket_info, - const rgw::bucket_log_layout_generation& gen, - int shard_id, optional_yield y) +asio::awaitable +RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, + const RGWBucketInfo& bucket_info, + const rgw::bucket_log_layout_generation& gen, + int shard_id) { if (!zone->log_data) { - return 0; + co_return; } auto& bucket = bucket_info.bucket; - if (!filter_bucket(dpp, bucket, y)) { - return 0; + if (!co_await filter_bucket(dpp, bucket)) { + co_return; } if (observer) { @@ -692,181 +643,313 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, /* no need to send, recently completed */ sl.unlock(); register_renew(bs, gen); - return 0; + co_return; } - RefCountedCond* cond; - if (status->pending) { - cond = status->cond; - - ceph_assert(cond); - - status->cond->get(); + co_await status->cond.async_wait(sl, asio::use_awaitable); sl.unlock(); - - int ret = cond->wait(); - cond->put(); - if (!ret) { - register_renew(bs, gen); - } - return ret; + register_renew(bs, gen); + co_return; } - status->cond = new RefCountedCond; + status->cond.notify(sl); status->pending = true; ceph::real_time expiration; + status->cur_sent = now; - int ret; - - do { - status->cur_sent = now; - - expiration = now; - expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window); + expiration = now; + expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window); - sl.unlock(); - - ceph::buffer::list bl; - rgw_data_change change; - change.entity_type = ENTITY_TYPE_BUCKET; - change.key = bs.get_key(); - change.timestamp = now; - change.gen = gen.gen; - encode(change, bl); - - ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl; + sl.unlock(); - auto be = bes->head(); - // TODO: pass y once we fix the deadlock from https://tracker.ceph.com/issues/63373 - ret = be->push(dpp, index, now, change.key, std::move(bl), null_yield); + buffer::list bl; + rgw_data_change change; + change.entity_type = ENTITY_TYPE_BUCKET; + change.key = bs.get_key(); + change.timestamp = now; + change.gen = gen.gen; + encode(change, bl); - now = real_clock::now(); + ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl; - sl.lock(); + auto be = bes->head(); + co_await be->push(dpp, index, now, change.key, std::move(bl)); - } while (!ret && real_clock::now() > expiration); + now = real_clock::now(); - cond = status->cond; + sl.lock(); status->pending = false; /* time of when operation started, not completed */ status->cur_expiration = status->cur_sent; status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window); - status->cond = nullptr; + status->cond.notify(sl); sl.unlock(); - cond->done(ret); - cond->put(); + co_return; +} - return ret; +int RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, + const RGWBucketInfo& bucket_info, + const rgw::bucket_log_layout_generation& gen, + int shard_id, optional_yield y) +{ + std::exception_ptr eptr; + if (y) { + auto& yield = y.get_yield_context(); + try { + asio::co_spawn(yield.get_executor(), + add_entry(dpp, bucket_info, gen, shard_id), + yield); + } catch (const std::exception&) { + eptr = std::current_exception(); + } + } else { + maybe_warn_about_blocking(dpp); + eptr = asio::co_spawn(store->get_io_context().get_executor(), + add_entry(dpp, bucket_info, gen, shard_id), + async::use_blocked); + } + return ceph::from_exception(eptr); } -int DataLogBackends::list(const DoutPrefixProvider *dpp, int shard, int max_entries, - std::vector& entries, - std::string_view marker, std::string* out_marker, - bool* truncated, optional_yield y) +asio::awaitable, + std::string>> +DataLogBackends::list(const DoutPrefixProvider *dpp, int shard, + std::span entries, + std::string marker) { - const auto [start_id, start_cursor] = cursorgen(marker); - auto gen_id = start_id; + const auto [start_id, // Starting generation + start_cursor // Cursor to be used when listing the + // starting generation + ] = cursorgen(marker); + auto gen_id = start_id; // Current generation being listed + // Cursor with prepended generation, returned to caller std::string out_cursor; - while (max_entries > 0) { - std::vector gentries; + // Span to return to caller + auto entries_out = entries; + // Iterator (for inserting stuff into span) + auto out = entries_out.begin(); + // Allocated storage for raw listings from backend + std::vector gentries{entries.size()}; + while (out < entries_out.end()) { std::unique_lock l(m); auto i = lower_bound(gen_id); - if (i == end()) return 0; + if (i == end()) + break; auto be = i->second; l.unlock(); gen_id = be->gen_id; - auto r = be->list(dpp, shard, max_entries, gentries, - gen_id == start_id ? start_cursor : std::string{}, - &out_cursor, truncated, y); - if (r < 0) - return r; - - if (out_marker && !out_cursor.empty()) { - *out_marker = gencursor(gen_id, out_cursor); - } - for (auto& g : gentries) { - g.log_id = gencursor(gen_id, g.log_id); + auto inspan = std::span{gentries}.first(entries_out.end() - out); + // Since later generations continue listings from the + // first, start them at the beginning. + auto incursor = gen_id == start_id ? start_cursor : std::string{}; + auto [raw_entries, raw_cursor] + = co_await be->list(dpp, shard, inspan, incursor); + out = std::transform(std::make_move_iterator(raw_entries.begin()), + std::make_move_iterator(raw_entries.end()), + out, [gen_id](rgw_data_change_log_entry e) { + e.log_id = gencursor(gen_id, e.log_id); + return e; + }); + if (!raw_cursor.empty()) { + out_cursor = gencursor(gen_id, raw_cursor); + } else { + out_cursor.clear(); + break; } - if (int s = gentries.size(); s < 0 || s > max_entries) - max_entries = 0; - else - max_entries -= gentries.size(); - - std::move(gentries.begin(), gentries.end(), - std::back_inserter(entries)); ++gen_id; } - return 0; + entries_out = entries_out.first(out - entries_out.begin()); + co_return std::make_tuple(entries_out, std::move(out_cursor)); } -int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries, - std::vector& entries, - std::string_view marker, - std::string* out_marker, bool* truncated, - optional_yield y) +asio::awaitable, + std::string>> +RGWDataChangesLog::list_entries(const DoutPrefixProvider* dpp, int shard, + int max_entries, std::string marker) { assert(shard < num_shards); - return bes->list(dpp, shard, max_entries, entries, marker, out_marker, - truncated, y); + if (max_entries <= 0) { + co_return std::make_tuple(std::vector{}, + std::string{}); + } + std::vector entries(max_entries); + entries.resize(max_entries); + auto [spanentries, outmark] = co_await bes->list(dpp, shard, entries, marker); + entries.resize(spanentries.size()); + co_return std::make_tuple(std::move(entries), std::move(outmark)); } -int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int max_entries, - std::vector& entries, - LogMarker& marker, bool *ptruncated, - optional_yield y) +int RGWDataChangesLog::list_entries( + const DoutPrefixProvider *dpp, int shard, + int max_entries, std::vector& entries, + std::string_view marker, std::string* out_marker, bool* truncated, + optional_yield y) { - bool truncated; - entries.clear(); - for (; marker.shard < num_shards && int(entries.size()) < max_entries; - marker.shard++, marker.marker.clear()) { - int ret = list_entries(dpp, marker.shard, max_entries - entries.size(), - entries, marker.marker, NULL, &truncated, y); - if (ret == -ENOENT) { - continue; + assert(shard < num_shards); + std::exception_ptr eptr; + std::tuple, + std::string> out; + if (std::ssize(entries) < max_entries) { + entries.resize(max_entries); + } + if (y) { + auto& yield = y.get_yield_context(); + try { + out = asio::co_spawn(yield.get_executor(), + bes->list(dpp, shard, entries, + std::string{marker}), + yield); + } catch (const std::exception&) { + eptr = std::current_exception(); } - if (ret < 0) { - return ret; + } else { + maybe_warn_about_blocking(dpp); + std::tie(eptr, out) = asio::co_spawn(store->get_io_context().get_executor(), + bes->list(dpp, shard, entries, + std::string{marker}), + async::use_blocked); + } + if (eptr) { + return ceph::from_exception(eptr); + } + auto& [outries, outmark] = out; + if (auto size = std::ssize(outries); size < std::ssize(entries)) { + entries.resize(size); + } + if (truncated) { + *truncated = !outmark.empty(); + } + if (out_marker) { + *out_marker = std::move(outmark); + } + return 0; +} + +asio::awaitable, + RGWDataChangesLogMarker>> +RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, + int max_entries, RGWDataChangesLogMarker marker) +{ + if (max_entries <= 0) { + co_return std::make_tuple(std::vector{}, + RGWDataChangesLogMarker{}); + } + + std::vector entries(max_entries); + std::span remaining{entries}; + + do { + std::span outspan; + std::string outmark; + std::tie(outspan, outmark) = co_await bes->list(dpp, marker.shard, + remaining, marker.marker); + remaining = remaining.last(remaining.size() - outspan.size()); + if (!outmark.empty()) { + marker.marker = std::move(outmark); + } else if (outmark.empty() && marker.shard < (num_shards - 1)) { + ++marker.shard; + marker.marker.clear(); + } else { + marker.clear(); } - if (!truncated) { - *ptruncated = false; - return 0; + } while (!remaining.empty() && marker); + if (!remaining.empty()) { + entries.resize(entries.size() - remaining.size()); + } + co_return std::make_tuple(std::move(entries), std::move(marker)); +} + +int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp,int max_entries, + std::vector& entries, + RGWDataChangesLogMarker& marker, bool *ptruncated, + optional_yield y) +{ + std::exception_ptr eptr; + std::tuple, + RGWDataChangesLogMarker> out; + if (std::ssize(entries) < max_entries) { + entries.resize(max_entries); + } + if (y) { + auto& yield = y.get_yield_context(); + try { + out = asio::co_spawn(yield.get_executor(), + list_entries(dpp, max_entries, + RGWDataChangesLogMarker{marker}), + yield); + } catch (const std::exception&) { + eptr = std::current_exception(); } + } else { + maybe_warn_about_blocking(dpp); + std::tie(eptr, out) = + asio::co_spawn(store->get_io_context().get_executor(), + list_entries(dpp, max_entries, + RGWDataChangesLogMarker{marker}), + async::use_blocked); + } + if (eptr) { + return ceph::from_exception(eptr); } - *ptruncated = (marker.shard < num_shards); + auto& [outries, outmark] = out; + if (auto size = std::ssize(outries); size < std::ssize(entries)) { + entries.resize(size); + } + if (ptruncated) { + *ptruncated = (outmark.shard > 0 || !outmark.marker.empty()); + } + marker = std::move(outmark); return 0; } -int RGWDataChangesLog::get_info(const DoutPrefixProvider *dpp, int shard_id, - RGWDataChangesLogInfo *info, optional_yield y) +int RGWDataChangesLog::get_info(const DoutPrefixProvider* dpp, int shard_id, + RGWDataChangesLogInfo* info, optional_yield y) { assert(shard_id < num_shards); auto be = bes->head(); - auto r = be->get_info(dpp, shard_id, info, y); + std::exception_ptr eptr; + if (y) { + auto& yield = y.get_yield_context(); + try { + *info = asio::co_spawn(yield.get_executor(), + be->get_info(dpp, shard_id), + yield); + } catch (const std::exception&) { + eptr = std::current_exception(); + } + } else { + maybe_warn_about_blocking(dpp); + std::tie(eptr, *info) = asio::co_spawn(store->get_io_context().get_executor(), + be->get_info(dpp, shard_id), + async::use_blocked); + } if (!info->marker.empty()) { info->marker = gencursor(be->gen_id, info->marker); } - return r; + return ceph::from_exception(eptr); } -int DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, - std::string_view marker, optional_yield y) +asio::awaitable DataLogBackends::trim_entries( + const DoutPrefixProvider *dpp, int shard_id, + std::string_view marker) { - auto [target_gen, cursor] = cursorgen(marker); + auto [target_gen, cursor] = cursorgen(std::string{marker}); std::unique_lock l(m); const auto head_gen = (end() - 1)->second->gen_id; const auto tail_gen = begin()->first; - if (target_gen < tail_gen) return 0; + if (target_gen < tail_gen) + co_return; auto r = 0; for (auto be = lower_bound(0)->second; be->gen_id <= target_gen && be->gen_id <= head_gen && r >= 0; be = upper_bound(be->gen_id)->second) { l.unlock(); auto c = be->gen_id == target_gen ? cursor : be->max_marker(); - r = be->trim(dpp, shard_id, c, y); + co_await be->trim(dpp, shard_id, c); if (r == -ENOENT) r = -ENODATA; if (r == -ENODATA && be->gen_id < target_gen) @@ -875,86 +958,46 @@ int DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, break; l.lock(); }; - return r; + co_return; } int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker, optional_yield y) { assert(shard_id < num_shards); - return bes->trim_entries(dpp, shard_id, marker, y); -} - -class GenTrim : public rgw::cls::fifo::Completion { -public: - DataLogBackends* const bes; - const int shard_id; - const uint64_t target_gen; - const std::string cursor; - const uint64_t head_gen; - const uint64_t tail_gen; - boost::intrusive_ptr be; - - GenTrim(const DoutPrefixProvider *dpp, DataLogBackends* bes, int shard_id, uint64_t target_gen, - std::string cursor, uint64_t head_gen, uint64_t tail_gen, - boost::intrusive_ptr be, - lr::AioCompletion* super) - : Completion(dpp, super), bes(bes), shard_id(shard_id), target_gen(target_gen), - cursor(std::move(cursor)), head_gen(head_gen), tail_gen(tail_gen), - be(std::move(be)) {} - - void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) { - auto gen_id = be->gen_id; - be.reset(); - if (r == -ENOENT) - r = -ENODATA; - if (r == -ENODATA && gen_id < target_gen) - r = 0; - if (r < 0) { - complete(std::move(p), r); - return; - } - - { - std::unique_lock l(bes->m); - auto i = bes->upper_bound(gen_id); - if (i == bes->end() || i->first > target_gen || i->first > head_gen) { - l.unlock(); - complete(std::move(p), -ENODATA); - return; - } - be = i->second; + std::exception_ptr eptr; + if (y) { + auto& yield = y.get_yield_context(); + try { + asio::co_spawn(yield.get_executor(), + bes->trim_entries(dpp, shard_id, marker), + yield); + } catch (const std::exception& e) { + eptr = std::current_exception(); } - auto c = be->gen_id == target_gen ? cursor : be->max_marker(); - be->trim(dpp, shard_id, c, call(std::move(p))); - } -}; - -void DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker, - librados::AioCompletion* c) -{ - auto [target_gen, cursor] = cursorgen(marker); - std::unique_lock l(m); - const auto head_gen = (end() - 1)->second->gen_id; - const auto tail_gen = begin()->first; - if (target_gen < tail_gen) { - l.unlock(); - rgw_complete_aio_completion(c, -ENODATA); - return; + } else { + maybe_warn_about_blocking(dpp); + eptr = asio::co_spawn(store->get_io_context().get_executor(), + bes->trim_entries(dpp, shard_id, marker), + async::use_blocked); } - auto be = begin()->second; - l.unlock(); - auto gt = std::make_unique(dpp, this, shard_id, target_gen, - std::string(cursor), head_gen, tail_gen, - be, c); + return ceph::from_exception(eptr); +} - auto cc = be->gen_id == target_gen ? cursor : be->max_marker(); - be->trim(dpp, shard_id, cc, GenTrim::call(std::move(gt))); +int RGWDataChangesLog::trim_entries(const DoutPrefixProvider* dpp, int shard_id, + std::string_view marker, + librados::AioCompletion* c) { + asio::co_spawn(store->get_io_context().get_executor(), + bes->trim_entries(dpp, shard_id, marker), + c); + return 0; } -int DataLogBackends::trim_generations(const DoutPrefixProvider *dpp, - std::optional& through, - optional_yield y) { + +asio::awaitable DataLogBackends::trim_generations( + const DoutPrefixProvider *dpp, + std::optional& through) +{ if (size() != 1) { std::vector candidates; { @@ -967,10 +1010,7 @@ int DataLogBackends::trim_generations(const DoutPrefixProvider *dpp, std::optional highest; for (auto& be : candidates) { - auto r = be->is_empty(dpp, y); - if (r < 0) { - return r; - } else if (r == 1) { + if (co_await be->is_empty(dpp)) { highest = be->gen_id; } else { break; @@ -979,83 +1019,89 @@ int DataLogBackends::trim_generations(const DoutPrefixProvider *dpp, through = highest; if (!highest) { - return 0; - } - auto ec = empty_to(dpp, *highest, y); - if (ec) { - return ceph::from_error_code(ec); + co_return; } + co_await empty_to(dpp, *highest); } - return ceph::from_error_code(remove_empty(dpp, y)); + co_await remove_empty(dpp); + co_return; } -int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker, - librados::AioCompletion* c) -{ - assert(shard_id < num_shards); - bes->trim_entries(dpp, shard_id, marker, c); - return 0; -} - bool RGWDataChangesLog::going_down() const { return down_flag; } RGWDataChangesLog::~RGWDataChangesLog() { + shutdown(); +} + +void RGWDataChangesLog::shutdown() { down_flag = true; - if (renew_thread.joinable()) { - renew_stop(); - renew_thread.join(); - } + renew_stop(); } -void RGWDataChangesLog::renew_run() noexcept { +asio::awaitable RGWDataChangesLog::renew_run() { static constexpr auto runs_per_prune = 150; auto run = 0; - for (;;) { - const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: "); - ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl; - int r = renew_entries(&dp); - if (r < 0) { - ldpp_dout(&dp, 0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl; - } + renew_timer.emplace(co_await asio::this_coro::executor); + std::string_view operation; + const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: "); + for (;;) try { + ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: start" + << dendl; + operation = "RGWDataChangesLog::renew_entries"sv; + co_await renew_entries(&dp); + operation = {}; + if (going_down()) + break; - if (going_down()) - break; + if (run == runs_per_prune) { + std::optional through; + ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl; + operation = "trim_generations"sv; + co_await bes->trim_generations(&dp, through); + operation = {}; + if (through) { + ldpp_dout(&dp, 2) + << "RGWDataChangesLog::ChangesRenewThread: pruned generations " + << "through " << *through << "." << dendl; + } else { + ldpp_dout(&dp, 2) + << "RGWDataChangesLog::ChangesRenewThread: nothing to prune." + << dendl; + } + run = 0; + } else { + ++run; + } - if (run == runs_per_prune) { - std::optional through; - ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl; - // This null_yield can stay, for now, as it's in its own thread. - trim_generations(&dp, through, null_yield); - if (r < 0) { - derr << "RGWDataChangesLog::ChangesRenewThread: failed pruning r=" - << r << dendl; - } else if (through) { - ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruned generations " - << "through " << *through << "." << dendl; + int interval = cct->_conf->rgw_data_log_window * 3 / 4; + renew_timer->expires_after(std::chrono::seconds(interval)); + co_await renew_timer->async_wait(asio::use_awaitable); + } catch (sys::system_error& e) { + if (e.code() == asio::error::operation_aborted) { + ldpp_dout(&dp, 10) + << "RGWDataChangesLog::renew_entries canceled, going down" << dendl; + break; } else { - ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: nothing to prune." - << dendl; + ldpp_dout(&dp, 0) + << "renew_thread: ERROR: " + << (operation.empty() ? operation : "_conf->rgw_data_log_window * 3 / 4; - std::unique_lock locker{renew_lock}; - renew_cond.wait_for(locker, std::chrono::seconds(interval)); - } } void RGWDataChangesLog::renew_stop() { - std::lock_guard l{renew_lock}; - renew_cond.notify_all(); + std::lock_guard l{lock}; + if (renew_timer) { + renew_timer->cancel(); + } } void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen) @@ -1083,13 +1129,46 @@ std::string RGWDataChangesLog::max_marker() const { } int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y) { - return ceph::from_error_code(bes->new_backing(dpp, type, y)); + std::exception_ptr eptr; + if (y) { + auto& yield = y.get_yield_context(); + try { + asio::co_spawn(yield.get_executor(), + bes->new_backing(dpp, type), + yield); + } catch (const std::exception&) { + eptr = std::current_exception(); + } + } else { + maybe_warn_about_blocking(dpp); + eptr = asio::co_spawn(store->get_io_context().get_executor(), + bes->new_backing(dpp, type), + async::use_blocked); + } + return ceph::from_exception(eptr); } int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp, std::optional& through, optional_yield y) { - return bes->trim_generations(dpp, through, y); + std::exception_ptr eptr; + if (y) { + auto& yield = y.get_yield_context(); + try { + asio::co_spawn(yield.get_executor(), + bes->trim_generations(dpp, through), + yield); + } catch (const std::exception& e) { + eptr = std::current_exception(); + } + + } else { + maybe_warn_about_blocking(dpp); + eptr = asio::co_spawn(store->get_io_context().get_executor(), + bes->trim_generations(dpp, through), + async::use_blocked); + } + return ceph::from_exception(eptr); } void RGWDataChangesLogInfo::dump(Formatter *f) const @@ -1102,9 +1181,5 @@ void RGWDataChangesLogInfo::dump(Formatter *f) const void RGWDataChangesLogInfo::decode_json(JSONObj *obj) { JSONDecoder::decode_json("marker", marker, obj); - utime_t ut; - JSONDecoder::decode_json("last_update", ut, obj); - last_update = ut.to_real_time(); + JSONDecoder::decode_json("last_update", last_update, obj); } - - diff --git a/src/rgw/driver/rados/rgw_datalog.h b/src/rgw/driver/rados/rgw_datalog.h index f8ef38464115d..6537f821bde43 100644 --- a/src/rgw/driver/rados/rgw_datalog.h +++ b/src/rgw/driver/rados/rgw_datalog.h @@ -6,11 +6,16 @@ #include #include #include +#include +#include +#include #include #include #include #include +#include + #include #include #include @@ -18,33 +23,44 @@ #include -#include "common/async/yield_context.h" +#include "include/neorados/RADOS.hpp" + #include "include/buffer.h" #include "include/encoding.h" #include "include/function2.hpp" -#include "include/rados/librados.hpp" +#include "common/async/async_cond.h" +#include "common/async/yield_context.h" #include "common/ceph_context.h" #include "common/ceph_json.h" #include "common/ceph_time.h" #include "common/Formatter.h" #include "common/lru_map.h" -#include "common/RefCountedObj.h" #include "cls/log/cls_log_types.h" #include "rgw_basic_types.h" #include "rgw_log_backing.h" #include "rgw_sync_policy.h" -#include "rgw_zone.h" #include "rgw_trim_bilog.h" +#include "rgw_zone.h" +namespace asio = boost::asio; namespace bc = boost::container; enum DataLogEntityType { ENTITY_TYPE_UNKNOWN = 0, ENTITY_TYPE_BUCKET = 1, }; +inline std::ostream& operator <<(std::ostream& m, + const DataLogEntityType& t) { + switch (t) { + case ENTITY_TYPE_BUCKET: + return m << "bucket"; + default: + return m << "unknown"; + } +} struct rgw_data_change { DataLogEntityType entity_type; @@ -84,6 +100,13 @@ struct rgw_data_change { static void generate_test_instances(std::list& l); }; WRITE_CLASS_ENCODER(rgw_data_change) +inline std::ostream& operator <<(std::ostream& m, + const rgw_data_change& c) { + return m << "[entity_type: " << c.entity_type + << ", key: " << c.key + << ", timestamp: " << c.timestamp + << ", gen: " << c.gen << "]"; +} struct rgw_data_change_log_entry { std::string log_id; @@ -110,6 +133,12 @@ struct rgw_data_change_log_entry { void decode_json(JSONObj* obj); }; WRITE_CLASS_ENCODER(rgw_data_change_log_entry) +inline std::ostream& operator <<(std::ostream& m, + const rgw_data_change_log_entry& e) { + return m << "[log_id: " << e.log_id + << ", log_timestamp: " << e.log_timestamp + << ", entry: " << e.entry << "]"; +} struct RGWDataChangesLogInfo { std::string marker; @@ -124,6 +153,17 @@ struct RGWDataChangesLogMarker { std::string marker; RGWDataChangesLogMarker() = default; + RGWDataChangesLogMarker(int shard, std::string marker) + : shard(shard), marker(std::move(marker)) {} + + operator bool() const { + return (shard > 0 || !marker.empty()); + } + + void clear() { + shard = 0; + marker.clear(); + } }; class RGWDataChangesLog; @@ -163,12 +203,13 @@ class DataLogBackends final std::mutex m; RGWDataChangesLog& datalog; - DataLogBackends(librados::IoCtx& ioctx, - std::string oid, + DataLogBackends(neorados::RADOS& rados, + const neorados::Object oid, + const neorados::IOContext& loc, fu2::unique_function&& get_oid, int shards, RGWDataChangesLog& datalog) noexcept - : logback_generations(ioctx, oid, std::move(get_oid), + : logback_generations(rados, oid, loc, std::move(get_oid), shards), datalog(datalog) {} public: @@ -178,22 +219,19 @@ public: --i; return i->second; } - int list(const DoutPrefixProvider *dpp, int shard, int max_entries, - std::vector& entries, - std::string_view marker, std::string* out_marker, bool* truncated, - optional_yield y); - int trim_entries(const DoutPrefixProvider *dpp, int shard_id, - std::string_view marker, optional_yield y); - void trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker, - librados::AioCompletion* c); - - bs::error_code handle_init(entries_t e) noexcept override; - bs::error_code handle_new_gens(entries_t e) noexcept override; - bs::error_code handle_empty_to(uint64_t new_tail) noexcept override; - - int trim_generations(const DoutPrefixProvider *dpp, - std::optional& through, - optional_yield y); + asio::awaitable, + std::string>> + list(const DoutPrefixProvider *dpp, int shard, + std::span entries, + std::string marker); + asio::awaitable trim_entries(const DoutPrefixProvider *dpp, int shard_id, + std::string_view marker); + void handle_init(entries_t e) override; + void handle_new_gens(entries_t e) override; + void handle_empty_to(uint64_t new_tail) override; + + asio::awaitable trim_generations(const DoutPrefixProvider *dpp, + std::optional& through); }; struct BucketGen { @@ -231,7 +269,8 @@ inline bool operator <(const BucketGen& l, const BucketGen& r) { class RGWDataChangesLog { friend DataLogBackends; CephContext *cct; - librados::IoCtx ioctx; + rgw::sal::RadosStore* store = nullptr; + neorados::IOContext loc; rgw::BucketChangeObserver *observer = nullptr; const RGWZone* zone; std::unique_ptr bes; @@ -243,9 +282,8 @@ class RGWDataChangesLog { } std::string prefix; - ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock"); - ceph::shared_mutex modified_lock = - ceph::make_shared_mutex("RGWDataChangesLog::modified_lock"); + std::mutex lock; + std::shared_mutex modified_lock; bc::flat_map> modified_shards; std::atomic down_flag = { false }; @@ -255,8 +293,11 @@ class RGWDataChangesLog { ceph::real_time cur_expiration; ceph::real_time cur_sent; bool pending = false; - RefCountedCond* cond = nullptr; - ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::ChangeStatus"); + ceph::async::async_cond cond; + std::mutex lock; + + ChangeStatus(boost::asio::io_context::executor_type executor) + : cond(executor) {} }; using ChangeStatusPtr = std::shared_ptr; @@ -272,16 +313,16 @@ class RGWDataChangesLog { uint64_t gen, ceph::real_time expiration); - ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock"); - ceph::condition_variable renew_cond; - void renew_run() noexcept; + std::optional renew_timer; + asio::awaitable renew_run(); void renew_stop(); - std::thread renew_thread; - std::function bucket_filter; + std::function bucket_filter; bool going_down() const; - bool filter_bucket(const DoutPrefixProvider *dpp, const rgw_bucket& bucket, optional_yield y) const; - int renew_entries(const DoutPrefixProvider *dpp); + asio::awaitable filter_bucket(const DoutPrefixProvider* dpp, + const rgw_bucket& bucket) const; + asio::awaitable renew_entries(const DoutPrefixProvider *dpp); public: @@ -291,28 +332,39 @@ public: int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, const RGWZoneParams& zoneparams, rgw::sal::RadosStore* store); int choose_oid(const rgw_bucket_shard& bs); - int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, - const rgw::bucket_log_layout_generation& gen, int shard_id, - optional_yield y); + asio::awaitable add_entry(const DoutPrefixProvider *dpp, + const RGWBucketInfo& bucket_info, + const rgw::bucket_log_layout_generation& gen, + int shard_id); + int add_entry(const DoutPrefixProvider *dpp, + const RGWBucketInfo& bucket_info, + const rgw::bucket_log_layout_generation& gen, + int shard_id, optional_yield y); int get_log_shard_id(rgw_bucket& bucket, int shard_id); + asio::awaitable, + std::string>> + list_entries(const DoutPrefixProvider* dpp, int shard, int max_entries, + std::string marker); int list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries, std::vector& entries, std::string_view marker, std::string* out_marker, bool* truncated, optional_yield y); + asio::awaitable, + RGWDataChangesLogMarker>> + list_entries(const DoutPrefixProvider *dpp, int max_entries, + RGWDataChangesLogMarker marker); + int list_entries(const DoutPrefixProvider *dpp, int max_entries, + std::vector& entries, + RGWDataChangesLogMarker& marker, bool* ptruncated, + optional_yield y); + int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker, optional_yield y); - int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker, - librados::AioCompletion* c); // :( + int trim_entries(const DoutPrefixProvider *dpp, int shard_id, + std::string_view marker, librados::AioCompletion* c); int get_info(const DoutPrefixProvider *dpp, int shard_id, RGWDataChangesLogInfo *info, optional_yield y); - using LogMarker = RGWDataChangesLogMarker; - - int list_entries(const DoutPrefixProvider *dpp, int max_entries, - std::vector& entries, - LogMarker& marker, bool* ptruncated, - optional_yield y); - void mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen); auto read_clear_modified() { std::unique_lock wl{modified_lock}; @@ -338,14 +390,17 @@ public: int trim_generations(const DoutPrefixProvider *dpp, std::optional& through, optional_yield y); + void shutdown(); }; class RGWDataChangesBE : public boost::intrusive_ref_counter { protected: - librados::IoCtx& ioctx; - CephContext* const cct; + neorados::RADOS& r; + neorados::IOContext loc; RGWDataChangesLog& datalog; + CephContext* cct{r.cct()}; + std::string get_oid(int shard_id) { return datalog.get_oid(gen_id, shard_id); } @@ -355,34 +410,30 @@ public: const uint64_t gen_id; - RGWDataChangesBE(librados::IoCtx& ioctx, + RGWDataChangesBE(neorados::RADOS& r, + neorados::IOContext loc, RGWDataChangesLog& datalog, uint64_t gen_id) - : ioctx(ioctx), cct(static_cast(ioctx.cct())), - datalog(datalog), gen_id(gen_id) {} + : r(r), loc(std::move(loc)), datalog(datalog), gen_id(gen_id) {} virtual ~RGWDataChangesBE() = default; - virtual void prepare(ceph::real_time now, - const std::string& key, - ceph::buffer::list&& entry, - entries& out) = 0; - virtual int push(const DoutPrefixProvider *dpp, int index, entries&& items, - optional_yield y) = 0; - virtual int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now, - const std::string& key, ceph::buffer::list&& bl, - optional_yield y) = 0; - virtual int list(const DoutPrefixProvider *dpp, int shard, int max_entries, - std::vector& entries, - std::optional marker, - std::string* out_marker, bool* truncated, - optional_yield y) = 0; - virtual int get_info(const DoutPrefixProvider *dpp, int index, - RGWDataChangesLogInfo *info, optional_yield y) = 0; - virtual int trim(const DoutPrefixProvider *dpp, int index, - std::string_view marker, optional_yield y) = 0; - virtual int trim(const DoutPrefixProvider *dpp, int index, - std::string_view marker, librados::AioCompletion* c) = 0; + virtual void prepare(ceph::real_time now, const std::string& key, + ceph::buffer::list&& entry, entries& out) = 0; + virtual asio::awaitable push(const DoutPrefixProvider *dpp, int index, + entries&& items) = 0; + virtual asio::awaitable push(const DoutPrefixProvider *dpp, int index, + ceph::real_time now, + const std::string& key, + ceph::buffer::list&& bl) = 0; + virtual asio::awaitable, + std::string>> + list(const DoutPrefixProvider* dpp, int shard, + std::span entries, std::string marker) = 0; + virtual asio::awaitable + get_info(const DoutPrefixProvider *dpp, int index) = 0; + virtual asio::awaitable trim(const DoutPrefixProvider *dpp, int index, + std::string_view marker) = 0; virtual std::string_view max_marker() const = 0; // 1 on empty, 0 on non-empty, negative on error. - virtual int is_empty(const DoutPrefixProvider *dpp, optional_yield y) = 0; + virtual asio::awaitable is_empty(const DoutPrefixProvider *dpp) = 0; }; diff --git a/src/rgw/driver/rados/rgw_log_backing.cc b/src/rgw/driver/rados/rgw_log_backing.cc index 75a238f6d447f..3a6efb10b3645 100644 --- a/src/rgw/driver/rados/rgw_log_backing.cc +++ b/src/rgw/driver/rados/rgw_log_backing.cc @@ -1,15 +1,35 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab ft=cpp -#include "cls/log/cls_log_client.h" -#include "cls/version/cls_version_client.h" +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "include/neorados/RADOS.hpp" + +#include "common/async/blocked_completion.h" +#include "common/async/async_call.h" + +#include "neorados/cls/log.h" +#include "neorados/cls/version.h" +#include "neorados/cls/fifo.h" #include "rgw_log_backing.h" -#include "rgw_tools.h" -#include "cls_fifo_legacy.h" +#include "rgw_common.h" + using namespace std::chrono_literals; -namespace cb = ceph::buffer; + +namespace async = ceph::async; +namespace buffer = ceph::buffer; + +namespace version = neorados::cls::version; static constexpr auto dout_subsys = ceph_subsys_rgw; @@ -31,684 +51,647 @@ inline std::ostream& operator <<(std::ostream& m, const shard_check& t) { namespace { /// Return the shard type, and a bool to see whether it has entries. -shard_check -probe_shard(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid, - bool& fifo_unsupported, optional_yield y) +asio::awaitable +probe_shard(const DoutPrefixProvider* dpp, neorados::RADOS& rados, + const neorados::Object& obj, const neorados::IOContext& loc, + bool& fifo_unsupported) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " probing oid=" << oid - << dendl; + << " probing obj=" << obj << dendl; if (!fifo_unsupported) { - std::unique_ptr fifo; - auto r = rgw::cls::fifo::FIFO::open(dpp, ioctx, oid, - &fifo, y, - std::nullopt, true); - switch (r) { - case 0: + sys::error_code ec; + auto fifo = co_await fifo::FIFO::open( + dpp, rados, obj, loc, + asio::redirect_error(asio::use_awaitable, ec), + std::nullopt, true); + if (!ec) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": oid=" << oid << " is FIFO" + << ": obj=" << obj << " is FIFO" << dendl; - return shard_check::fifo; - - case -ENODATA: + co_return shard_check::fifo; + } else if (ec == sys::errc::no_message_available) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": oid=" << oid << " is empty and therefore OMAP" + << ": obj=" << obj << " is empty and therefore OMAP" << dendl; - return shard_check::omap; - - case -ENOENT: + co_return shard_check::omap; + } else if (ec == sys::errc::no_such_file_or_directory) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": oid=" << oid << " does not exist" + << ": obj=" << obj << " does not exist" << dendl; - return shard_check::dne; - - case -EPERM: + co_return shard_check::dne; + } else if (ec == sys::errc::operation_not_permitted) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << ": FIFO is unsupported, marking." << dendl; fifo_unsupported = true; - return shard_check::omap; - - default: + co_return shard_check::omap; + } else { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": error probing: r=" << r - << ", oid=" << oid << dendl; - return shard_check::corrupt; + << ": error probing: " << ec.message() + << ", obj=" << obj << dendl; + co_return shard_check::corrupt; } } else { // Since FIFO is unsupported, OMAP is the only alternative - return shard_check::omap; + co_return shard_check::omap; } } -tl::expected -handle_dne(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, - log_type def, - std::string oid, - bool fifo_unsupported, - optional_yield y) +asio::awaitable handle_dne(const DoutPrefixProvider *dpp, + neorados::RADOS& rados, + const neorados::Object& obj, + const neorados::IOContext& loc, + log_type def, + bool fifo_unsupported) { if (def == log_type::fifo) { if (fifo_unsupported) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " WARNING: FIFO set as default but not supported by OSD. " << "Falling back to OMAP." << dendl; - return log_type::omap; + co_return log_type::omap; } - std::unique_ptr fifo; - auto r = rgw::cls::fifo::FIFO::create(dpp, ioctx, oid, - &fifo, y, - std::nullopt); - if (r < 0) { + try { + auto fifo = co_await fifo::FIFO::create(dpp, rados, obj, loc, + asio::use_awaitable); + } catch (const std::exception& e) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " error creating FIFO: r=" << r - << ", oid=" << oid << dendl; - return tl::unexpected(bs::error_code(-r, bs::system_category())); + << " error creating FIFO: " << e.what() + << ", obj=" << obj << dendl; + throw; } } - return def; + co_return def; } } -tl::expected -log_backing_type(const DoutPrefixProvider *dpp, - librados::IoCtx& ioctx, +asio::awaitable +log_backing_type(const DoutPrefixProvider* dpp, + neorados::RADOS& rados, + const neorados::IOContext& loc, log_type def, int shards, - const fu2::unique_function& get_oid, - optional_yield y) + const fu2::unique_function& get_oid) { auto check = shard_check::dne; bool fifo_unsupported = false; for (int i = 0; i < shards; ++i) { - auto c = probe_shard(dpp, ioctx, get_oid(i), fifo_unsupported, y); + auto c = co_await probe_shard(dpp, rados, neorados::Object(get_oid(i)), loc, + fifo_unsupported); if (c == shard_check::corrupt) - return tl::unexpected(bs::error_code(EIO, bs::system_category())); + throw sys::system_error(EIO, sys::generic_category()); if (c == shard_check::dne) continue; if (check == shard_check::dne) { check = c; continue; } - if (check != c) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " clashing types: check=" << check << ", c=" << c << dendl; - return tl::unexpected(bs::error_code(EIO, bs::system_category())); + throw sys::system_error(EIO, sys::generic_category()); } } if (check == shard_check::corrupt) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " should be unreachable!" << dendl; - return tl::unexpected(bs::error_code(EIO, bs::system_category())); + throw sys::system_error(EIO, sys::system_category()); } if (check == shard_check::dne) - return handle_dne(dpp, ioctx, - def, - get_oid(0), - fifo_unsupported, - y); + co_return co_await handle_dne(dpp, rados, get_oid(0), loc, def, + fifo_unsupported); - return (check == shard_check::fifo ? log_type::fifo : log_type::omap); + co_return (check == shard_check::fifo ? log_type::fifo : log_type::omap); } -bs::error_code log_remove(const DoutPrefixProvider *dpp, - librados::IoCtx& ioctx, - int shards, - const fu2::unique_function& get_oid, - bool leave_zero, - optional_yield y) +asio::awaitable log_remove( + const DoutPrefixProvider *dpp, + neorados::RADOS& rados, + const neorados::IOContext& loc, + int shards, + const fu2::unique_function& get_oid, + bool leave_zero) { - bs::error_code ec; + sys::error_code ec; for (int i = 0; i < shards; ++i) { auto oid = get_oid(i); - rados::cls::fifo::info info; - uint32_t part_header_size = 0, part_entry_overhead = 0; - - auto r = rgw::cls::fifo::get_meta(dpp, ioctx, oid, std::nullopt, &info, - &part_header_size, &part_entry_overhead, - 0, y, true); - if (r == -ENOENT) continue; - if (r == 0 && info.head_part_num > -1) { + auto [info, part_header_size, part_entry_overhead] + = co_await fifo::FIFO::get_meta(rados, oid, loc, std::nullopt, + asio::redirect_error(asio::use_awaitable, + ec)); + if (ec == sys::errc::no_such_file_or_directory) continue; + if (!ec && info.head_part_num > -1) { for (auto j = info.tail_part_num; j <= info.head_part_num; ++j) { - librados::ObjectWriteOperation op; - op.remove(); - auto part_oid = info.part_oid(j); - auto subr = rgw_rados_operate(dpp, ioctx, part_oid, std::move(op), y); - if (subr < 0 && subr != -ENOENT) { - if (!ec) - ec = bs::error_code(-subr, bs::system_category()); + sys::error_code subec; + co_await rados.execute(info.part_oid(j), + loc, neorados::WriteOp{}.remove(), + asio::redirect_error(asio::use_awaitable, subec)); + if (subec && subec != sys::errc::no_such_file_or_directory) { + if (!ec) ec = subec; ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": failed removing FIFO part: part_oid=" << part_oid - << ", subr=" << subr << dendl; + << ": failed removing FIFO part " << j + << ": " << subec.message() << dendl; } } } - if (r < 0 && r != -ENODATA) { - if (!ec) - ec = bs::error_code(-r, bs::system_category()); + if (ec && ec != sys::errc::no_message_available) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": failed checking FIFO part: oid=" << oid - << ", r=" << r << dendl; + << ": failed checking FIFO part: oid=" << oid + << ": " << ec.message() << dendl; } - librados::ObjectWriteOperation op; + neorados::WriteOp op; if (i == 0 && leave_zero) { // Leave shard 0 in existence, but remove contents and // omap. cls_lock stores things in the xattrs. And sync needs to // rendezvous with locks on generation 0 shard 0. - op.omap_set_header({}); - op.omap_clear(); - op.truncate(0); + op.set_omap_header({}) + .clear_omap() + .truncate(0); } else { op.remove(); } - r = rgw_rados_operate(dpp, ioctx, oid, std::move(op), y); - if (r < 0 && r != -ENOENT) { - if (!ec) - ec = bs::error_code(-r, bs::system_category()); + co_await rados.execute(oid, loc, std::move(op), + asio::redirect_error(asio::use_awaitable, ec)); + if (ec && ec != sys::errc::no_such_file_or_directory) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": failed removing shard: oid=" << oid - << ", r=" << r << dendl; + << ": failed removing shard: oid=" << oid + << ": " << ec.message() << dendl; } } - return ec; + if (ec) + throw sys::error_code(ec); + co_return; } logback_generations::~logback_generations() { if (watchcookie > 0) { - auto cct = static_cast(ioctx.cct()); - auto r = ioctx.unwatch2(watchcookie); - if (r < 0) { + auto cct = rados.cct(); + sys::error_code ec; + rados.unwatch(watchcookie, loc, asio::detached); + if (ec) { lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ << ": failed unwatching oid=" << oid - << ", r=" << r << dendl; + << ", " << ec.message() << dendl; } } } -bs::error_code logback_generations::setup(const DoutPrefixProvider *dpp, - log_type def, - optional_yield y) noexcept +asio::awaitable logback_generations::setup(const DoutPrefixProvider *dpp, + log_type def) { + bool must_create = false; try { // First, read. - auto cct = static_cast(ioctx.cct()); - auto res = read(dpp, y); - if (!res && res.error() != bs::errc::no_such_file_or_directory) { - return res.error(); - } - if (res) { - std::unique_lock lock(m); - std::tie(entries_, version) = std::move(*res); - } else { - // Are we the first? Then create generation 0 and the generations - // metadata. - librados::ObjectWriteOperation op; - auto type = log_backing_type(dpp, ioctx, def, shards, - [this](int shard) { - return this->get_oid(0, shard); - }, y); - if (!type) - return type.error(); - - logback_generation l; - l.type = *type; - - std::unique_lock lock(m); - version.ver = 1; - static constexpr auto TAG_LEN = 24; - version.tag.clear(); - append_rand_alpha(cct, version.tag, version.tag, TAG_LEN); - op.create(true); - cls_version_set(op, version); - cb::list bl; - entries_.emplace(0, std::move(l)); - encode(entries_, bl); - lock.unlock(); - - op.write_full(bl); - auto r = rgw_rados_operate(dpp, ioctx, oid, std::move(op), y); - if (r < 0 && r != -EEXIST) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": failed writing oid=" << oid - << ", r=" << r << dendl; - return bs::error_code(-r, bs::system_category()); + auto [es, v] = co_await read(dpp); + co_await async::async_dispatch( + strand, + [&] { + entries = std::move(es); + version = std::move(v); + }, asio::use_awaitable); + } catch (const sys::system_error& e) { + if (e.code() != sys::errc::no_such_file_or_directory) { + throw; + } + // No co_awaiting in a catch block. + must_create = true; + } + if (must_create) { + // Are we the first? Then create generation 0 and the generations + // metadata. + auto type = co_await log_backing_type(dpp, rados, loc, def, shards, + [this](int shard) { + return this->get_oid(0, shard); + }); + auto op = co_await async::async_dispatch( + strand, + [this, type] { + neorados::WriteOp op; + logback_generation l; + l.type = type; + version.ver = 1; + static constexpr auto TAG_LEN = 24; + version.tag.clear(); + append_rand_alpha(rados.cct(), version.tag, version.tag, TAG_LEN); + buffer::list bl; + entries.emplace(0, std::move(l)); + encode(entries, bl); + op.create(true) + .exec(version::set(version)) + .write_full(std::move(bl)); + return op; + }, asio::use_awaitable); + + sys::error_code ec; + co_await rados.execute(oid, loc, std::move(op), + asio::redirect_error(asio::use_awaitable, ec)); + if (ec && ec != sys::errc::file_exists) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": failed writing oid=" << oid + << ", " << ec.message() << dendl; + throw sys::system_error(ec); + } + // Did someone race us? Then re-read. + if (ec == sys::errc::file_exists) { + auto [es, v] = co_await read(dpp); + if (es.empty()) { + throw sys::system_error{ + EIO, sys::generic_category(), + "Generations metadata object exists, but empty."}; } - // Did someone race us? Then re-read. - if (r != 0) { - res = read(dpp, y); - if (!res) - return res.error(); - if (res->first.empty()) - return bs::error_code(EIO, bs::system_category()); - auto l = res->first.begin()->second; - // In the unlikely event that someone raced us, created - // generation zero, incremented, then erased generation zero, - // don't leave generation zero lying around. - if (l.gen_id != 0) { - auto ec = log_remove(dpp, ioctx, shards, - [this](int shard) { - return this->get_oid(0, shard); - }, true, y); - if (ec) return ec; - } - std::unique_lock lock(m); - std::tie(entries_, version) = std::move(*res); + auto l = es.begin()->second; + // In the unlikely event that someone raced us, created + // generation zero, incremented, then erased generation zero, + // don't leave generation zero lying around. + if (l.gen_id != 0) { + co_await log_remove(dpp, rados, loc, shards, + [this](int shard) { + return this->get_oid(0, shard); + }, true); } + co_await async::async_dispatch( + strand, + [&] { + entries = std::move(es); + version = std::move(v); + }, asio::use_awaitable); } - // Pass all non-empty generations to the handler - std::unique_lock lock(m); - auto i = lowest_nomempty(entries_); - entries_t e; - std::copy(i, entries_.cend(), - std::inserter(e, e.end())); - m.unlock(); - auto ec = watch(); - if (ec) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": failed to re-establish watch, unsafe to continue: oid=" - << oid << ", ec=" << ec.message() << dendl; - } - return handle_init(std::move(e)); - } catch (const std::bad_alloc&) { - return bs::error_code(ENOMEM, bs::system_category()); } -} -bs::error_code logback_generations::update(const DoutPrefixProvider *dpp, optional_yield y) noexcept -{ + // Pass all non-empty generations to the handler + auto e = co_await async::async_dispatch( + strand, + [&] { + auto i = lowest_nomempty(entries); + entries_t e; + std::copy(i, entries.cend(), + std::inserter(e, e.end())); + return e; + }, asio::use_awaitable); try { - auto res = read(dpp, y); - if (!res) { - return res.error(); - } - - std::unique_lock l(m); - auto& [es, v] = *res; - if (v == version) { - // Nothing to do! - return {}; - } - - // Check consistency and prepare update - if (es.empty()) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": INCONSISTENCY! Read empty update." << dendl; - return bs::error_code(EFAULT, bs::system_category()); - } - auto cur_lowest = lowest_nomempty(entries_); - // Straight up can't happen - assert(cur_lowest != entries_.cend()); - auto new_lowest = lowest_nomempty(es); - if (new_lowest == es.cend()) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": INCONSISTENCY! Read update with no active head." << dendl; - return bs::error_code(EFAULT, bs::system_category()); - } - if (new_lowest->first < cur_lowest->first) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": INCONSISTENCY! Tail moved wrong way." << dendl; - return bs::error_code(EFAULT, bs::system_category()); - } - - std::optional highest_empty; - if (new_lowest->first > cur_lowest->first && new_lowest != es.begin()) { - --new_lowest; - highest_empty = new_lowest->first; - } - - entries_t new_entries; - - if ((es.end() - 1)->first < (entries_.end() - 1)->first) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": INCONSISTENCY! Head moved wrong way." << dendl; - return bs::error_code(EFAULT, bs::system_category()); - } - - if ((es.end() - 1)->first > (entries_.end() - 1)->first) { - auto ei = es.lower_bound((entries_.end() - 1)->first + 1); - std::copy(ei, es.end(), std::inserter(new_entries, new_entries.end())); - } - - // Everything checks out! + co_await watch(); + } catch (const std::exception& e) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": failed to re-establish watch, unsafe to continue: oid=" + << oid << ", " << e.what() << dendl; + throw; + } + handle_init(std::move(e)); + co_return; +} - version = v; - entries_ = es; - l.unlock(); +asio::awaitable logback_generations::update(const DoutPrefixProvider *dpp) +{ + auto [es, v] = co_await read(dpp); + auto [do_nothing, highest_empty, new_entries] = + co_await async::async_dispatch( + strand, + [&]() -> std::tuple, entries_t> { + if (v == version) { + // Nothing to do! + return {true, {}, {}}; + } + // Check consistency and prepare update + if (es.empty()) { + throw sys::system_error{ + EFAULT, sys::generic_category(), + "logback_generations::update: INCONSISTENCY! Read empty update."}; + } + auto cur_lowest = lowest_nomempty(entries); + // Straight up can't happen + assert(cur_lowest != entries.cend()); + auto new_lowest = lowest_nomempty(es); + if (new_lowest == es.cend()) { + throw sys::system_error{ + EFAULT, sys::generic_category(), + "logback_generations::update: INCONSISTENCY! Read update with no " + "active head!"}; + } + if (new_lowest->first < cur_lowest->first) { + throw sys::system_error{ + EFAULT, sys::generic_category(), + "logback_generations::update: INCONSISTENCY! Tail moved wrong way."}; + } + std::optional highest_empty; + if (new_lowest->first > cur_lowest->first && new_lowest != es.begin()) { + --new_lowest; + highest_empty = new_lowest->first; + } + entries_t new_entries; - if (highest_empty) { - auto ec = handle_empty_to(*highest_empty); - if (ec) return ec; - } + if ((es.end() - 1)->first < (entries.end() - 1)->first) { + throw sys::system_error{ + EFAULT, sys::generic_category(), + "logback_generations::update: INCONSISTENCY! Head moved wrong way."}; + } + if ((es.end() - 1)->first > (entries.end() - 1)->first) { + auto ei = es.lower_bound((entries.end() - 1)->first + 1); + std::copy(ei, es.end(), std::inserter(new_entries, new_entries.end())); + } + // Everything checks out! + version = std::move(v); + entries = std::move(es); + return {false, highest_empty, new_entries}; + }, asio::use_awaitable); + + if (do_nothing) { + co_return; + } + if (highest_empty) { + handle_empty_to(*highest_empty); + } - if (!new_entries.empty()) { - auto ec = handle_new_gens(std::move(new_entries)); - if (ec) return ec; - } - } catch (const std::bad_alloc&) { - return bs::error_code(ENOMEM, bs::system_category()); + if (!new_entries.empty()) { + handle_new_gens(std::move(new_entries)); } - return {}; + co_return; } -auto logback_generations::read(const DoutPrefixProvider *dpp, optional_yield y) noexcept -> - tl::expected, bs::error_code> +auto logback_generations::read(const DoutPrefixProvider *dpp) + -> asio::awaitable> { - try { - librados::ObjectReadOperation op; - std::unique_lock l(m); - cls_version_check(op, version, VER_COND_GE); - l.unlock(); - obj_version v2; - cls_version_read(op, &v2); - cb::list bl; - op.read(0, 0, &bl, nullptr); - auto r = rgw_rados_operate(dpp, ioctx, oid, std::move(op), nullptr, y); - if (r < 0) { - if (r == -ENOENT) { - ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": oid=" << oid - << " not found" << dendl; - } else { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": failed reading oid=" << oid - << ", r=" << r << dendl; - } - return tl::unexpected(bs::error_code(-r, bs::system_category())); - } - auto bi = bl.cbegin(); - entries_t e; - try { - decode(e, bi); - } catch (const cb::error& err) { - return tl::unexpected(err.code()); + neorados::ReadOp op; + op.exec(version::check(co_await async::async_dispatch( + strand, + [this] { + return version; + }, asio::use_awaitable), + VER_COND_GE)); + obj_version v2; + op.exec(version::read(&v2)); + buffer::list bl; + op.read(0, 0, &bl, nullptr); + sys::error_code ec; + co_await rados.execute(oid, loc, std::move(op), nullptr, + asio::redirect_error(asio::use_awaitable, ec)); + if (ec) { + if (ec == sys::errc::no_such_file_or_directory) { + ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": oid=" << oid + << " not found" << dendl; + } else { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": failed reading oid=" << oid + << ", " << ec.message() << dendl; } - return std::pair{ std::move(e), std::move(v2) }; - } catch (const std::bad_alloc&) { - return tl::unexpected(bs::error_code(ENOMEM, bs::system_category())); + throw sys::system_error(ec); } + auto bi = bl.cbegin(); + entries_t e; + decode(e, bi); + co_return std::pair{std::move(e), std::move(v2)}; } -bs::error_code logback_generations::write(const DoutPrefixProvider *dpp, entries_t&& e, - std::unique_lock&& l_, - optional_yield y) noexcept +// This *must* be executed in logback_generations::strand +// +// The return value must be used. If true, run update and retry. +asio::awaitable logback_generations::write(const DoutPrefixProvider *dpp, + entries_t&& e) { - auto l = std::move(l_); - ceph_assert(l.mutex() == &m && - l.owns_lock()); - try { - librados::ObjectWriteOperation op; - cls_version_check(op, version, VER_COND_GE); - cb::list bl; - encode(e, bl); - op.write_full(bl); - cls_version_inc(op); - auto oldv = version; - l.unlock(); - auto r = rgw_rados_operate(dpp, ioctx, oid, std::move(op), y); - if (r == 0) { - if (oldv != version) { - return { ECANCELED, bs::system_category() }; - } - entries_ = std::move(e); - version.inc(); - return {}; - } - if (r < 0 && r != -ECANCELED) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": failed reading oid=" << oid - << ", r=" << r << dendl; - return { -r, bs::system_category() }; - } - if (r == -ECANCELED) { - auto ec = update(dpp, y); - if (ec) { - return ec; - } else { - return { ECANCELED, bs::system_category() }; - } - } - } catch (const std::bad_alloc&) { - return { ENOMEM, bs::system_category() }; + ceph_assert(co_await asio::this_coro::executor == strand); + bool canceled = false; + buffer::list bl; + encode(e, bl); + neorados::WriteOp op; + op.exec(version::check(version, VER_COND_GE)) + .write_full(std::move(bl)) + .exec(version::inc()); + sys::error_code ec; + co_await rados.execute(oid, loc, std::move(op), + asio::redirect_error(asio::use_awaitable, ec)); + if (!ec) { + entries = std::move(e); + version.inc(); + } else if (ec == sys::errc::operation_canceled) { + canceled = true; + } else { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": failed reading oid=" << oid + << ", " << ec.message() << dendl; + throw sys::system_error(ec); } - return {}; + co_return canceled; } -bs::error_code logback_generations::watch() noexcept { - try { - auto cct = static_cast(ioctx.cct()); - auto r = ioctx.watch2(oid, &watchcookie, this); - if (r < 0) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": failed to set watch oid=" << oid - << ", r=" << r << dendl; - return { -r, bs::system_category() }; - } - } catch (const std::bad_alloc&) { - return bs::error_code(ENOMEM, bs::system_category()); - } - return {}; +asio::awaitable logback_generations::watch() +{ + watchcookie = co_await rados.watch(oid, loc, std::nullopt, std::ref(*this), + asio::use_awaitable); + co_return; } -bs::error_code logback_generations::new_backing(const DoutPrefixProvider *dpp, - log_type type, - optional_yield y) noexcept { +asio::awaitable +logback_generations::new_backing(const DoutPrefixProvider* dpp, log_type type) { static constexpr auto max_tries = 10; - try { - auto ec = update(dpp, y); - if (ec) return ec; - auto tries = 0; - entries_t new_entries; - do { - std::unique_lock l(m); - auto last = entries_.end() - 1; - if (last->second.type == type) { - // Nothing to be done - return {}; - } - auto newgenid = last->first + 1; - logback_generation newgen; - newgen.gen_id = newgenid; - newgen.type = type; - new_entries.emplace(newgenid, newgen); - auto es = entries_; - es.emplace(newgenid, std::move(newgen)); - ec = write(dpp, std::move(es), std::move(l), y); - ++tries; - } while (ec == bs::errc::operation_canceled && - tries < max_tries); - if (tries >= max_tries) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": exhausted retry attempts." << dendl; - return ec; - } - - if (ec) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": write failed with ec=" << ec.message() << dendl; - return ec; - } + auto tries = 0; + entries_t new_entries; + bool canceled = false; + do { + co_await update(dpp); + canceled = + co_await asio::co_spawn( + strand, + [](logback_generations& l, log_type type, entries_t& new_entries, + const DoutPrefixProvider* dpp) + -> asio::awaitable { + auto last = l.entries.end() - 1; + if (last->second.type == type) { + // Nothing to be done + co_return false; + } + auto newgenid = last->first + 1; + logback_generation newgen; + newgen.gen_id = newgenid; + newgen.type = type; + new_entries.emplace(newgenid, newgen); + auto es = l.entries; + es.emplace(newgenid, std::move(newgen)); + co_return co_await l.write(dpp, std::move(es)); + }(*this, type, new_entries, dpp), + asio::use_awaitable); + ++tries; + } while (canceled && tries < max_tries); + if (tries >= max_tries) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": exhausted retry attempts." << dendl; + throw sys::system_error(ECANCELED, sys::generic_category(), + "Exhausted retry attempts"); + } - cb::list bl, rbl; + co_await rados.notify(oid, loc, {}, 10s, asio::use_awaitable); + handle_new_gens(new_entries); + co_return; +} - auto r = rgw_rados_notify(dpp, ioctx, oid, bl, 10'000, &rbl, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": notify failed with r=" << r << dendl; - return { -r, bs::system_category() }; - } - ec = handle_new_gens(new_entries); - } catch (const std::bad_alloc&) { - return bs::error_code(ENOMEM, bs::system_category()); +asio::awaitable +logback_generations::empty_to(const DoutPrefixProvider* dpp, + uint64_t gen_id) { + static constexpr auto max_tries = 10; + auto tries = 0; + uint64_t newtail = 0; + bool canceled = false; + do { + co_await update(dpp); + canceled = + co_await asio::co_spawn( + strand, + [](logback_generations& l, uint64_t gen_id, + uint64_t& newtail, const DoutPrefixProvider* dpp) + -> asio::awaitable { + { + auto last = l.entries.end() - 1; + if (gen_id >= last->first) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": Attempt to trim beyond the possible." + << dendl; + throw sys::system_error{EINVAL, sys::system_category()}; + } + } + auto es = l.entries; + auto ei = es.upper_bound(gen_id); + if (ei == es.begin()) { + // Nothing to be done. + co_return false; + } + for (auto i = es.begin(); i < ei; ++i) { + newtail = i->first; + i->second.pruned = ceph::real_clock::now(); + } + co_return co_await l.write(dpp, std::move(es)); + }(*this, gen_id, newtail, dpp), + asio::use_awaitable); + if (canceled) { + co_await update(dpp); + } + ++tries; + } while (canceled && tries < max_tries); + if (tries >= max_tries) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": exhausted retry attempts." << dendl; + throw sys::system_error(ECANCELED, sys::generic_category(), + "Exhausted retry attempts"); } - return {}; + co_await rados.notify(oid, loc, {}, 10s, asio::use_awaitable); + handle_empty_to(newtail); + co_return; } -bs::error_code logback_generations::empty_to(const DoutPrefixProvider *dpp, - uint64_t gen_id, - optional_yield y) noexcept { +asio::awaitable +logback_generations::remove_empty(const DoutPrefixProvider* dpp) { static constexpr auto max_tries = 10; - try { - auto ec = update(dpp, y); - if (ec) return ec; - auto tries = 0; - uint64_t newtail = 0; - do { - std::unique_lock l(m); - { - auto last = entries_.end() - 1; - if (gen_id >= last->first) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": Attempt to trim beyond the possible." << dendl; - return bs::error_code(EINVAL, bs::system_category()); - } + co_await update(dpp); + auto tries = 0; + entries_t new_entries; + bool nothing_to_do = + co_await async::async_dispatch( + strand, + [this] { + ceph_assert(!entries.empty()); + auto i = lowest_nomempty(entries); + return i == entries.begin(); + }, asio::use_awaitable); + if (nothing_to_do) + co_return; + entries_t es; + auto now = ceph::real_clock::now(); + bool canceled = false; + do { + entries_t es2; + co_await async::async_dispatch( + strand, + [this, now, &es, &es2] { + std::copy_if(entries.cbegin(), entries.cend(), + std::inserter(es, es.end()), + [now](const auto& e) { + if (!e.second.pruned) + return false; + auto pruned = *e.second.pruned; + return (now - pruned) >= 1h; + }); + es2 = entries; + }, asio::use_awaitable); + for (const auto& [gen_id, e] : es) { + ceph_assert(e.pruned); + co_await log_remove(dpp, rados, loc, shards, + [this, gen_id = gen_id](int shard) { + return this->get_oid(gen_id, shard); + }, (gen_id == 0)); + if (auto i = es2.find(gen_id); i != es2.end()) { + es2.erase(i); } - auto es = entries_; - auto ei = es.upper_bound(gen_id); - if (ei == es.begin()) { - // Nothing to be done. - return {}; - } - for (auto i = es.begin(); i < ei; ++i) { - newtail = i->first; - i->second.pruned = ceph::real_clock::now(); - } - ec = write(dpp, std::move(es), std::move(l), y); - ++tries; - } while (ec == bs::errc::operation_canceled && - tries < max_tries); - if (tries >= max_tries) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": exhausted retry attempts." << dendl; - return ec; } - - if (ec) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": write failed with ec=" << ec.message() << dendl; - return ec; - } - - cb::list bl, rbl; - - auto r = rgw_rados_notify(dpp, ioctx, oid, bl, 10'000, &rbl, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": notify failed with r=" << r << dendl; - return { -r, bs::system_category() }; - } - ec = handle_empty_to(newtail); - } catch (const std::bad_alloc&) { - return bs::error_code(ENOMEM, bs::system_category()); + es.clear(); + canceled = co_await co_spawn( + strand, + write(dpp, std::move(es2)), + asio::use_awaitable); + if (canceled) { + co_await update(dpp); + } + ++tries; + } while (canceled && tries < max_tries); + if (tries >= max_tries) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": exhausted retry attempts." << dendl; + throw sys::system_error(ECANCELED, sys::generic_category(), + "Exhausted retry attempts"); } - return {}; + co_await rados.notify(oid, loc, {}, 10s, asio::use_awaitable); } -bs::error_code logback_generations::remove_empty(const DoutPrefixProvider *dpp, optional_yield y) noexcept { - static constexpr auto max_tries = 10; - try { - auto ec = update(dpp, y); - if (ec) return ec; - auto tries = 0; - entries_t new_entries; - entries_t es; - auto now = ceph::real_clock::now(); - { - std::unique_lock l(m); - ceph_assert(!entries_.empty()); - { - auto i = lowest_nomempty(entries_); - if (i == entries_.begin()) { - return {}; - } - } - l.unlock(); - } - do { - std::copy_if(entries_.cbegin(), entries_.cend(), - std::inserter(es, es.end()), - [now](const auto& e) { - if (!e.second.pruned) - return false; - - auto pruned = *e.second.pruned; - return (now - pruned) >= 1h; - }); - auto es2 = entries_; - for (const auto& [gen_id, e] : es) { - ceph_assert(e.pruned); - auto ec = log_remove(dpp, ioctx, shards, - [this, gen_id = gen_id](int shard) { - return this->get_oid(gen_id, shard); - }, (gen_id == 0), y); - if (ec) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": Error pruning: gen_id=" << gen_id - << " ec=" << ec.message() << dendl; - } - if (auto i = es2.find(gen_id); i != es2.end()) { - es2.erase(i); - } +void logback_generations::operator ()(sys::error_code ec, + uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist&& bl) { + co_spawn(rados.get_executor(), + handle_notify(ec, notify_id, cookie, notifier_id, std::move(bl)), + asio::detached); +} + +asio::awaitable +logback_generations::handle_notify(sys::error_code ec, + uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist&& bl) { + const DoutPrefix dp(rados.cct(), dout_subsys, + "logback generations handle_notify: "); + if (!ec) { + if (notifier_id != my_id) { + try { + co_await update(&dp); + } catch (const std::exception& e) { + ldpp_dout(&dp, -1) + << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": update failed (" << e.what() + << "), no one to report to and no safe way to continue." + << dendl; + std::terminate(); } - std::unique_lock l(m); - es.clear(); - ec = write(dpp, std::move(es2), std::move(l), y); - ++tries; - } while (ec == bs::errc::operation_canceled && - tries < max_tries); - if (tries >= max_tries) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": exhausted retry attempts." << dendl; - return ec; } - + co_await rados.notify_ack(oid, loc, notify_id, watchcookie, {}, + asio::use_awaitable); + } else { + ec.clear(); + co_await rados.unwatch(watchcookie, loc, + asio::redirect_error(asio::use_awaitable, ec)); if (ec) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": write failed with ec=" << ec.message() << dendl; - return ec; + ldpp_dout(&dp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": failed to set unwatch oid=" << oid + << ", " << ec.message() << dendl; } - } catch (const std::bad_alloc&) { - return bs::error_code(ENOMEM, bs::system_category()); - } - return {}; -} -void logback_generations::handle_notify(uint64_t notify_id, - uint64_t cookie, - uint64_t notifier_id, - bufferlist& bl) -{ - auto cct = static_cast(ioctx.cct()); - const DoutPrefix dp(cct, dout_subsys, "logback generations handle_notify: "); - if (notifier_id != my_id) { - auto ec = update(&dp, null_yield); - if (ec) { - lderr(cct) + try { + co_await watch(); + } catch (const std::exception& e) { + ldpp_dout(&dp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": update failed, no one to report to and no safe way to continue." - << dendl; - abort(); + << ": failed to re-establish watch, unsafe to continue: oid=" + << oid << ": " << e.what() << dendl; + std::terminate(); } } - cb::list rbl; - ioctx.notify_ack(oid, notify_id, watchcookie, rbl); -} - -void logback_generations::handle_error(uint64_t cookie, int err) { - auto cct = static_cast(ioctx.cct()); - auto r = ioctx.unwatch2(watchcookie); - if (r < 0) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": failed to set unwatch oid=" << oid - << ", r=" << r << dendl; - } - - auto ec = watch(); - if (ec) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << ": failed to re-establish watch, unsafe to continue: oid=" - << oid << ", ec=" << ec.message() << dendl; - } } diff --git a/src/rgw/driver/rados/rgw_log_backing.h b/src/rgw/driver/rados/rgw_log_backing.h index 737d6725eb438..999aa0d98d7c2 100644 --- a/src/rgw/driver/rados/rgw_log_backing.h +++ b/src/rgw/driver/rados/rgw_log_backing.h @@ -8,28 +8,29 @@ #include #include -#include +#include +#include +#include #include #include #include -#include "include/rados/librados.hpp" +#include "include/neorados/RADOS.hpp" #include "include/encoding.h" -#include "include/expected.hpp" #include "include/function2.hpp" #include "cls/version/cls_version_types.h" -#include "common/async/yield_context.h" -#include "common/Formatter.h" #include "common/strtol.h" -namespace bc = boost::container; -namespace bs = boost::system; +#include "neorados/cls/fifo.h" -#include "cls_fifo_legacy.h" +namespace container = boost::container; +namespace sys = boost::system; +namespace asio = boost::asio; +namespace fifo = neorados::cls::fifo; /// Type of log backing, stored in the mark used in the quick check, /// and passed to checking functions. @@ -70,25 +71,22 @@ inline std::ostream& operator <<(std::ostream& m, const log_type& t) { } /// Look over the shards in a log and determine the type. -tl::expected -log_backing_type(const DoutPrefixProvider *dpp, - librados::IoCtx& ioctx, +asio::awaitable +log_backing_type(const DoutPrefixProvider* dpp, + neorados::RADOS& rados, + const neorados::IOContext& loc, log_type def, - int shards, //< Total number of shards - /// A function taking a shard number and - /// returning an oid. - const fu2::unique_function& get_oid, - optional_yield y); + int shards, + const fu2::unique_function& get_oid); /// Remove all log shards and associated parts of fifos. -bs::error_code log_remove(librados::IoCtx& ioctx, - int shards, //< Total number of shards - /// A function taking a shard number and - /// returning an oid. - const fu2::unique_function& get_oid, - bool leave_zero, - optional_yield y); - +asio::awaitable log_remove( + const DoutPrefixProvider *dpp, + neorados::RADOS& rados, + const neorados::IOContext& loc, + int shards, + const fu2::unique_function& get_oid, + bool leave_zero); struct logback_generation { uint64_t gen_id = 0; @@ -117,21 +115,23 @@ inline std::ostream& operator <<(std::ostream& m, const logback_generation& g) { << (g.pruned ? "PRUNED" : "NOT PRUNED") << "]"; } -class logback_generations : public librados::WatchCtx2 { +class logback_generations { public: - using entries_t = bc::flat_map; + using entries_t = container::flat_map; protected: - librados::IoCtx& ioctx; - logback_generations(librados::IoCtx& ioctx, - std::string oid, - fu2::unique_function&& get_oid, - int shards) noexcept - : ioctx(ioctx), oid(oid), get_oid(std::move(get_oid)), + neorados::RADOS& rados; + neorados::IOContext loc; + logback_generations( + neorados::RADOS& rados, + neorados::Object oid, + neorados::IOContext loc, + fu2::unique_function get_oid, + int shards) noexcept + : rados(rados), loc(std::move(loc)), oid(oid), get_oid(std::move(get_oid)), shards(shards) {} - uint64_t my_id = ioctx.get_instance_id(); + uint64_t my_id = rados.instance_id(); private: const std::string oid; @@ -145,16 +145,15 @@ private: uint64_t watchcookie = 0; obj_version version; - std::mutex m; - entries_t entries_; + asio::strand strand{ + asio::make_strand(rados.get_executor())}; + entries_t entries; - tl::expected, bs::error_code> - read(const DoutPrefixProvider *dpp, optional_yield y) noexcept; - bs::error_code write(const DoutPrefixProvider *dpp, entries_t&& e, std::unique_lock&& l_, - optional_yield y) noexcept; - bs::error_code setup(const DoutPrefixProvider *dpp, log_type def, optional_yield y) noexcept; - - bs::error_code watch() noexcept; + asio::awaitable> + read(const DoutPrefixProvider *dpp); + asio::awaitable write(const DoutPrefixProvider *dpp, entries_t&& e); + asio::awaitable setup(const DoutPrefixProvider *dpp, log_type def); + asio::awaitable watch(); auto lowest_nomempty(const entries_t& es) { return std::find_if(es.begin(), es.end(), @@ -166,68 +165,65 @@ private: public: /// For the use of watch/notify. - - void handle_notify(uint64_t notify_id, - uint64_t cookie, - uint64_t notifier_id, - bufferlist& bl) override final; - - void handle_error(uint64_t cookie, int err) override final; + void operator ()(sys::error_code ec, + uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist&& bl); + asio::awaitable handle_notify(sys::error_code ec, + uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist&& bl); /// Public interface - virtual ~logback_generations(); template - static tl::expected, bs::error_code> - init(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx_, std::string oid_, - fu2::unique_function&& get_oid_, - int shards_, log_type def, optional_yield y, - Args&& ...args) noexcept { - try { - T* lgp = new T(ioctx_, std::move(oid_), - std::move(get_oid_), - shards_, std::forward(args)...); - std::unique_ptr lg(lgp); - lgp = nullptr; - auto ec = lg->setup(dpp, def, y); - if (ec) - return tl::unexpected(ec); - // Obnoxiousness for C++ Compiler in Bionic Beaver - return tl::expected, bs::error_code>(std::move(lg)); - } catch (const std::bad_alloc&) { - return tl::unexpected(bs::error_code(ENOMEM, bs::system_category())); - } + static asio::awaitable> init( + const DoutPrefixProvider *dpp, + neorados::RADOS& r_, + const neorados::Object& oid_, + const neorados::IOContext& loc_, + fu2::unique_function&& get_oid_, + int shards_, log_type def, + Args&& ...args) { + std::unique_ptr lg{new T(r_, oid_, loc_, + std::move(get_oid_), + shards_, std::forward(args)...)}; + co_await lg->setup(dpp, def); + co_return lg; } - bs::error_code update(const DoutPrefixProvider *dpp, optional_yield y) noexcept; + asio::awaitable update(const DoutPrefixProvider *dpp); - entries_t entries() const { - return entries_; + entries_t get_entries() const { + return entries; } - bs::error_code new_backing(const DoutPrefixProvider *dpp, log_type type, optional_yield y) noexcept; + asio::awaitable new_backing(const DoutPrefixProvider *dpp, + log_type type); - bs::error_code empty_to(const DoutPrefixProvider *dpp, uint64_t gen_id, optional_yield y) noexcept; + asio::awaitable empty_to(const DoutPrefixProvider *dpp, uint64_t gen_id); - bs::error_code remove_empty(const DoutPrefixProvider *dpp, optional_yield y) noexcept; + asio::awaitable remove_empty(const DoutPrefixProvider *dpp); // Callbacks, to be defined by descendant. /// Handle initialization on startup /// /// @param e All non-empty generations - virtual bs::error_code handle_init(entries_t e) noexcept = 0; + virtual void handle_init(entries_t e) = 0; /// Handle new generations. /// /// @param e Map of generations added since last update - virtual bs::error_code handle_new_gens(entries_t e) noexcept = 0; + virtual void handle_new_gens(entries_t e) = 0; /// Handle generations being marked empty /// /// @param new_tail Lowest non-empty generation - virtual bs::error_code handle_empty_to(uint64_t new_tail) noexcept = 0; + virtual void handle_empty_to(uint64_t new_tail) = 0; }; inline std::string gencursor(uint64_t gen_id, std::string_view cursor) { @@ -236,171 +232,79 @@ inline std::string gencursor(uint64_t gen_id, std::string_view cursor) { std::string(cursor)); } -inline std::pair -cursorgen(std::string_view cursor_) { - if (cursor_.empty()) { +inline std::pair +cursorgen(std::optional cursor_) { + if (!cursor_ || cursor_->empty()) { return { 0, "" }; } - std::string_view cursor = cursor_; + std::string_view cursor = *cursor_; if (cursor[0] != 'G') { - return { 0, cursor }; + return { 0, std::string{cursor} }; } cursor.remove_prefix(1); auto gen_id = ceph::consume(cursor); if (!gen_id || cursor[0] != '@') { - return { 0, cursor_ }; + return { 0, *cursor_ }; } cursor.remove_prefix(1); - return { *gen_id, cursor }; + return { *gen_id, std::string{cursor} }; } class LazyFIFO { - librados::IoCtx& ioctx; - std::string oid; + neorados::RADOS& r; + const std::string oid; + const neorados::IOContext loc; std::mutex m; - std::unique_ptr fifo; + std::unique_ptr fifo; - int lazy_init(const DoutPrefixProvider *dpp, optional_yield y) { + asio::awaitable lazy_init(const DoutPrefixProvider *dpp) { std::unique_lock l(m); if (fifo) { - return 0; + co_return; } else { l.unlock(); // FIFO supports multiple clients by design, so it's safe to // race to create them. - std::unique_ptr fifo_tmp; - auto r = rgw::cls::fifo::FIFO::create(dpp, ioctx, oid, &fifo_tmp, y); - if (r) { - return r; - } + auto fifo_tmp = co_await fifo::FIFO::create(dpp, r, oid, loc, + asio::use_awaitable); l.lock(); if (!fifo) { // We won the race fifo = std::move(fifo_tmp); } } - return 0; + l.unlock(); + co_return; } public: - LazyFIFO(librados::IoCtx& ioctx, std::string oid) - : ioctx(ioctx), oid(std::move(oid)) {} - - int read_meta(const DoutPrefixProvider *dpp, optional_yield y) { - auto r = lazy_init(dpp, y); - if (r < 0) return r; - return fifo->read_meta(dpp, y); - } - - int meta(const DoutPrefixProvider *dpp, rados::cls::fifo::info& info, optional_yield y) { - auto r = lazy_init(dpp, y); - if (r < 0) return r; - info = fifo->meta(); - return 0; - } - - int get_part_layout_info(const DoutPrefixProvider *dpp, - std::uint32_t& part_header_size, - std::uint32_t& part_entry_overhead, - optional_yield y) { - auto r = lazy_init(dpp, y); - if (r < 0) return r; - std::tie(part_header_size, part_entry_overhead) - = fifo->get_part_layout_info(); - return 0; - } - - int push(const DoutPrefixProvider *dpp, - const ceph::buffer::list& bl, - optional_yield y) { - auto r = lazy_init(dpp, y); - if (r < 0) return r; - return fifo->push(dpp, bl, y); - } - - int push(const DoutPrefixProvider *dpp, - ceph::buffer::list& bl, - librados::AioCompletion* c, - optional_yield y) { - auto r = lazy_init(dpp, y); - if (r < 0) return r; - fifo->push(dpp, bl, c); - return 0; - } - - int push(const DoutPrefixProvider *dpp, - const std::vector& data_bufs, - optional_yield y) { - auto r = lazy_init(dpp, y); - if (r < 0) return r; - return fifo->push(dpp, data_bufs, y); - } - - int push(const DoutPrefixProvider *dpp, - const std::vector& data_bufs, - librados::AioCompletion* c, - optional_yield y) { - auto r = lazy_init(dpp, y); - if (r < 0) return r; - fifo->push(dpp, data_bufs, c); - return 0; - } - - int list(const DoutPrefixProvider *dpp, - int max_entries, std::optional markstr, - std::vector* out, - bool* more, optional_yield y) { - auto r = lazy_init(dpp, y); - if (r < 0) return r; - return fifo->list(dpp, max_entries, markstr, out, more, y); - } - - int list(const DoutPrefixProvider *dpp, int max_entries, std::optional markstr, - std::vector* out, bool* more, - librados::AioCompletion* c, optional_yield y) { - auto r = lazy_init(dpp, y); - if (r < 0) return r; - fifo->list(dpp, max_entries, markstr, out, more, c); - return 0; - } - - int trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exclusive, optional_yield y) { - auto r = lazy_init(dpp, y); - if (r < 0) return r; - return fifo->trim(dpp, markstr, exclusive, y); - } + LazyFIFO(neorados::RADOS& r, std::string oid, neorados::IOContext loc) + : r(r), oid(std::move(oid)), loc(std::move(loc)) {} - int trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exclusive, librados::AioCompletion* c, - optional_yield y) { - auto r = lazy_init(dpp, y); - if (r < 0) return r; - fifo->trim(dpp, markstr, exclusive, c); - return 0; + template + asio::awaitable push(const DoutPrefixProvider *dpp, Args&& ...args) { + co_await lazy_init(dpp); + co_return co_await fifo->push(dpp, std::forward(args)..., + asio::use_awaitable); } - int get_part_info(const DoutPrefixProvider *dpp, int64_t part_num, rados::cls::fifo::part_header* header, - optional_yield y) { - auto r = lazy_init(dpp, y); - if (r < 0) return r; - return fifo->get_part_info(dpp, part_num, header, y); + asio::awaitable, std::optional>> + list(const DoutPrefixProvider *dpp, std::string markstr, + std::span entries) { + co_await lazy_init(dpp); + co_return co_await fifo->list(dpp, markstr, entries, asio::use_awaitable); } - int get_part_info(const DoutPrefixProvider *dpp, int64_t part_num, rados::cls::fifo::part_header* header, - librados::AioCompletion* c, optional_yield y) { - auto r = lazy_init(dpp, y); - if (r < 0) return r; - fifo->get_part_info(part_num, header, c); - return 0; + asio::awaitable trim(const DoutPrefixProvider *dpp, + std::string markstr, bool exclusive) { + co_await lazy_init(dpp); + co_return co_await fifo->trim(dpp, markstr, exclusive, asio::use_awaitable); } - int get_head_info(const DoutPrefixProvider *dpp, fu2::unique_function< - void(int r, rados::cls::fifo::part_header&&)>&& f, - librados::AioCompletion* c, - optional_yield y) { - auto r = lazy_init(dpp, y); - if (r < 0) return r; - fifo->get_head_info(dpp, std::move(f), c); - return 0; + asio::awaitable> + last_entry_info(const DoutPrefixProvider *dpp) { + co_await lazy_init(dpp); + co_return co_await fifo->last_entry_info(dpp, asio::use_awaitable); } }; diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index 03161fd81666f..77bba50d81a2f 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -22,6 +22,7 @@ #include "rgw_url.h" #include #include +#include "librados/AioCompletionImpl.h" #include diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 228d83cb3cb25..5e82d5b0a2306 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -13,79 +13,77 @@ * */ -#include -#include -#include -#include +#include +#include #include -#include -#include + #include -#include + #include #include +#include + #include "common/async/blocked_completion.h" -#include "include/function2.hpp" -#include "common/Clock.h" #include "common/ceph_time.h" +#include "common/Clock.h" #include "common/errno.h" -#include "role.h" -#include "rgw_obj_types.h" -#include "rgw_rados.h" -#include "rgw_sal.h" -#include "rgw_sal_rados.h" -#include "rgw_bucket.h" -#include "rgw_multi.h" +#include "librados/AioCompletionImpl.h" + +#include "cls/rgw/cls_rgw_client.h" + #include "rgw_acl.h" -#include "rgw_acl_s3.h" -#include "rgw_aio.h" #include "rgw_aio_throttle.h" -#include "rgw_tools.h" -#include "rgw_tracer.h" -#include "rgw_oidc_provider.h" - -#include "rgw_zone.h" -#include "rgw_rest_conn.h" -#include "rgw_service.h" +#include "rgw_bucket.h" +#include "rgw_bucket_logging.h" #include "rgw_lc.h" #include "rgw_lc_tier.h" +#include "rgw_lc_tier.h" #include "rgw_mdlog.h" -#include "rgw_rest_admin.h" +#include "rgw_multi.h" +#include "rgw_obj_types.h" +#include "rgw_oidc_provider.h" +#include "rgw_rados.h" #include "rgw_rest_bucket.h" -#include "rgw_rest_metadata.h" -#include "rgw_rest_log.h" #include "rgw_rest_config.h" +#include "rgw_rest_conn.h" +#include "rgw_rest_log.h" +#include "rgw_rest_metadata.h" #include "rgw_rest_ratelimit.h" #include "rgw_rest_realm.h" #include "rgw_rest_user.h" -#include "rgw_lc_tier.h" -#include "rgw_bucket_logging.h" -#include "services/svc_sys_obj.h" -#include "services/svc_mdlog.h" -#include "services/svc_cls.h" +#include "rgw_sal.h" +#include "rgw_sal_rados.h" +#include "rgw_service.h" +#include "rgw_tools.h" +#include "rgw_tracer.h" +#include "rgw_zone.h" + #include "services/svc_bilog_rados.h" #include "services/svc_bi_rados.h" -#include "services/svc_zone.h" -#include "services/svc_tier_rados.h" -#include "services/svc_quota.h" +#include "services/svc_cls.h" #include "services/svc_config_key.h" -#include "services/svc_zone_utils.h" -#include "services/svc_user.h" +#include "services/svc_mdlog.h" +#include "services/svc_quota.h" +#include "services/svc_sys_obj.h" #include "services/svc_sys_obj_cache.h" -#include "cls/rgw/cls_rgw_client.h" +#include "services/svc_tier_rados.h" +#include "services/svc_user.h" +#include "services/svc_zone.h" +#include "services/svc_zone_utils.h" #include "account.h" #include "buckets.h" #include "group.h" #include "groups.h" -#include "roles.h" -#include "users.h" #include "rgw_pubsub.h" +#include "role.h" +#include "roles.h" #include "topic.h" #include "topics.h" +#include "users.h" #define dout_subsys ceph_subsys_rgw @@ -2287,6 +2285,11 @@ int RadosStore::meta_remove(const DoutPrefixProvider* dpp, std::string& metadata return ctl()->meta.mgr->remove(metadata_key, y, dpp); } +void RadosStore::shutdown(void) { + svc()->datalog_rados->shutdown(); + return; +} + void RadosStore::finalize(void) { if (rados) diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index cbb51872decd0..08ab55bcf8278 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -427,6 +427,8 @@ class RadosStore : public StoreDriver { virtual const std::string& get_compression_type(const rgw_placement_rule& rule) override; virtual bool valid_placement(const rgw_placement_rule& rule) override; + virtual void shutdown(void) override; + virtual void finalize(void) override; virtual CephContext* ctx(void) override { return rados->ctx(); } diff --git a/src/rgw/driver/rados/rgw_tools.cc b/src/rgw/driver/rados/rgw_tools.cc index 2a12b583508c4..70fa258a554f1 100644 --- a/src/rgw/driver/rados/rgw_tools.cc +++ b/src/rgw/driver/rados/rgw_tools.cc @@ -4,6 +4,7 @@ #include "auth/AuthRegistry.h" #include "common/errno.h" +#include "librados/AioCompletionImpl.h" #include "librados/librados_asio.h" #include "include/stringify.h" diff --git a/src/rgw/radosgw-admin/radosgw-admin.cc b/src/rgw/radosgw-admin/radosgw-admin.cc index 2dcd2650f7647..466e030c15f7f 100644 --- a/src/rgw/radosgw-admin/radosgw-admin.cc +++ b/src/rgw/radosgw-admin/radosgw-admin.cc @@ -1238,9 +1238,14 @@ static void show_topics_info_v2(const rgw_pubsub_topic& topic, class StoreDestructor { rgw::sal::Driver* driver; + ceph::async::io_context_pool* pool; public: - explicit StoreDestructor(rgw::sal::Driver* _s) : driver(_s) {} + explicit StoreDestructor(rgw::sal::Driver* _s, + ceph::async::io_context_pool* pool) + : driver(_s), pool(pool) {} ~StoreDestructor() { + driver->shutdown(); + pool->finish(); DriverManager::close_storage(driver); rgw_http_client_cleanup(); } @@ -4703,7 +4708,7 @@ int main(int argc, const char **argv) oath_init(); - StoreDestructor store_destructor(driver); + StoreDestructor store_destructor(driver, &context_pool); if (raw_storage_op) { switch (opt_cmd) { @@ -10859,7 +10864,7 @@ next: } auto datalog_svc = static_cast(driver)->svc()->datalog_rados; - RGWDataChangesLog::LogMarker log_marker; + RGWDataChangesLogMarker log_marker; do { std::vector entries; diff --git a/src/rgw/rgw_appmain.cc b/src/rgw/rgw_appmain.cc index ded764dde3973..ea0b130347f5c 100644 --- a/src/rgw/rgw_appmain.cc +++ b/src/rgw/rgw_appmain.cc @@ -600,6 +600,7 @@ void rgw::AppMain::shutdown(std::function finalize_async_signals) lua_background->shutdown(); } + env.driver->shutdown(); // Do this before closing storage so requests don't try to call into // closed storage. context_pool->finish(); diff --git a/src/rgw/rgw_realm_reloader.cc b/src/rgw/rgw_realm_reloader.cc index e975ee66abf22..7b2a90b5ae7da 100644 --- a/src/rgw/rgw_realm_reloader.cc +++ b/src/rgw/rgw_realm_reloader.cc @@ -97,6 +97,7 @@ void RGWRealmReloader::reload() // TODO: make RGWRados responsible for rgw_log_usage lifetime rgw_log_usage_finalize(); + env.driver->shutdown(); // destroy the existing driver DriverManager::close_storage(env.driver); env.driver = nullptr; diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index 18ab24732885a..1cf8c6cf3eb1f 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -678,6 +678,9 @@ class Driver { /** Check to see if this placement rule is valid */ virtual bool valid_placement(const rgw_placement_rule& rule) = 0; + /** Shut down background tasks, to be called while Asio is running. */ + virtual void shutdown(void) { }; + /** Clean up a driver for termination */ virtual void finalize(void) = 0; diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index 6c923b4d0e8b4..9bcc4383a8faa 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -318,7 +318,10 @@ add_executable(unittest_log_backing test_log_backing.cc) target_include_directories(unittest_log_backing SYSTEM PRIVATE "${CMAKE_SOURCE_DIR}/src/rgw" SYSTEM PRIVATE "${CMAKE_SOURCE_DIR}/src/rgw/store/rados") -target_link_libraries(unittest_log_backing radostest-cxx ${UNITTEST_LIBS} +target_link_libraries(unittest_log_backing + libneorados + neoradostest-support + ${UNITTEST_LIBS} ${rgw_libs}) add_executable(unittest_rgw_lua test_rgw_lua.cc) diff --git a/src/test/rgw/test_log_backing.cc b/src/test/rgw/test_log_backing.cc index 984d03d2ee061..125a3d1fa4def 100644 --- a/src/test/rgw/test_log_backing.cc +++ b/src/test/rgw/test_log_backing.cc @@ -14,178 +14,100 @@ #include "rgw_log_backing.h" -#include -#include #include +#include +#include + +#include +#include + #include -#include "common/Clock.h" // for ceph_clock_now() -#include "include/types.h" #include "include/rados/librados.hpp" +#include "include/neorados/RADOS.hpp" -#include "test/librados/test_cxx.h" -#include "global/global_context.h" +#include "common/ceph_time.h" -#include "cls/log/cls_log_client.h" +#include "neorados/cls/fifo.h" +#include "neorados/cls/log.h" -#include "rgw_tools.h" -#include "cls_fifo_legacy.h" +#include "test/neorados/common_tests.h" #include "gtest/gtest.h" -namespace lr = librados; -namespace cb = ceph::buffer; -namespace fifo = rados::cls::fifo; -namespace RCf = rgw::cls::fifo; - -auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT); -const DoutPrefix dp(cct, 1, "test log backing: "); - -class LogBacking : public testing::Test { -protected: - static constexpr int SHARDS = 3; - const std::string pool_name = get_temp_pool_name(); - lr::Rados rados; - lr::IoCtx ioctx; - lr::Rados rados2; - lr::IoCtx ioctx2; - - void SetUp() override { - ASSERT_EQ("", create_one_pool_pp(pool_name, rados)); - ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx)); - connect_cluster_pp(rados2); - ASSERT_EQ(0, rados2.ioctx_create(pool_name.c_str(), ioctx2)); - } - void TearDown() override { - destroy_one_pool_pp(pool_name, rados); - } +namespace asio = boost::asio; +namespace buffer = ceph::buffer; - std::string get_oid(uint64_t gen_id, int i) const { - return (gen_id > 0 ? - fmt::format("shard@G{}.{}", gen_id, i) : - fmt::format("shard.{}", i)); - } +namespace fifo = neorados::cls::fifo; +namespace logn = neorados::cls::log; - void make_omap() { - for (int i = 0; i < SHARDS; ++i) { - using ceph::encode; - lr::ObjectWriteOperation op; - cb::list bl; - encode(i, bl); - cls_log_add(op, ceph::real_clock::now(), {}, "meow", bl); - auto r = rgw_rados_operate(&dp, ioctx, get_oid(0, i), std::move(op), - null_yield); - ASSERT_GE(r, 0); - } - } +namespace { +inline constexpr int SHARDS = 3; +std::string get_oid(uint64_t gen_id, int i) { + return (gen_id > 0 ? + fmt::format("shard@G{}.{}", gen_id, i) : + fmt::format("shard.{}", i)); +} - void add_omap(int i) { +asio::awaitable make_omap(neorados::RADOS& rados, + const neorados::IOContext& loc) { + for (int i = 0; i < SHARDS; ++i) { using ceph::encode; - lr::ObjectWriteOperation op; - cb::list bl; + neorados::WriteOp op; + buffer::list bl; encode(i, bl); - cls_log_add(op, ceph::real_clock::now(), {}, "meow", bl); - auto r = rgw_rados_operate(&dp, ioctx, get_oid(0, i), std::move(op), null_yield); - ASSERT_GE(r, 0); - } - - void empty_omap() { - for (int i = 0; i < SHARDS; ++i) { - auto oid = get_oid(0, i); - std::string to_marker; - { - lr::ObjectReadOperation op; - std::vector entries; - bool truncated = false; - cls_log_list(op, {}, {}, {}, 1, entries, &to_marker, &truncated); - auto r = rgw_rados_operate(&dp, ioctx, oid, std::move(op), nullptr, null_yield); - ASSERT_GE(r, 0); - ASSERT_FALSE(entries.empty()); - } - { - lr::ObjectWriteOperation op; - cls_log_trim(op, {}, {}, {}, to_marker); - auto r = rgw_rados_operate(&dp, ioctx, oid, std::move(op), null_yield); - ASSERT_GE(r, 0); - } - { - lr::ObjectReadOperation op; - std::vector entries; - bool truncated = false; - cls_log_list(op, {}, {}, {}, 1, entries, &to_marker, &truncated); - auto r = rgw_rados_operate(&dp, ioctx, oid, std::move(op), nullptr, null_yield); - ASSERT_GE(r, 0); - ASSERT_TRUE(entries.empty()); - } - } + op.exec(logn::add(ceph::real_clock::now(), {}, "meow", std::move(bl))); + co_await rados.execute(get_oid(0, i), loc, std::move(op), + asio::use_awaitable); } + co_return; +} - void make_fifo() - { - for (int i = 0; i < SHARDS; ++i) { - std::unique_ptr fifo; - auto r = RCf::FIFO::create(&dp, ioctx, get_oid(0, i), &fifo, null_yield); - ASSERT_EQ(0, r); - ASSERT_TRUE(fifo); - } - } - - void add_fifo(int i) - { - using ceph::encode; - std::unique_ptr fifo; - auto r = RCf::FIFO::open(&dp, ioctx, get_oid(0, i), &fifo, null_yield); - ASSERT_GE(0, r); - ASSERT_TRUE(fifo); - cb::list bl; - encode(i, bl); - r = fifo->push(&dp, bl, null_yield); - ASSERT_GE(0, r); - } - - void assert_empty() { - std::vector result; - lr::ObjectCursor next; - auto r = ioctx.object_list(ioctx.object_list_begin(), ioctx.object_list_end(), - 100, {}, &result, &next); - ASSERT_GE(r, 0); - ASSERT_TRUE(result.empty()); +asio::awaitable make_fifo(const DoutPrefixProvider* dpp, + neorados::RADOS& rados, + const neorados::IOContext& loc) { + for (int i = 0; i < SHARDS; ++i) { + auto fifo = co_await fifo::FIFO::create(dpp, rados, get_oid(0, i), loc, + asio::use_awaitable); + EXPECT_TRUE(fifo); } -}; +} +} -TEST_F(LogBacking, TestOmap) +CORO_TEST_F(LogBacking, TestOmap, NeoRadosTest) { - make_omap(); - auto stat = log_backing_type(&dp, ioctx, log_type::fifo, SHARDS, - [this](int shard){ return get_oid(0, shard); }, - null_yield); - ASSERT_EQ(log_type::omap, *stat); + co_await make_omap(rados(), pool()); + auto stat = co_await log_backing_type( + dpp(), rados(), pool(), log_type::fifo, SHARDS, + [](int shard){ return get_oid(0, shard); }); + EXPECT_EQ(log_type::omap, stat); } -TEST_F(LogBacking, TestOmapEmpty) + +CORO_TEST_F(LogBacking, TestOmapEmpty, NeoRadosTest) { - auto stat = log_backing_type(&dp, ioctx, log_type::omap, SHARDS, - [this](int shard){ return get_oid(0, shard); }, - null_yield); - ASSERT_EQ(log_type::omap, *stat); + auto stat = co_await log_backing_type( + dpp(), rados(), pool(), log_type::omap, SHARDS, + [](int shard){ return get_oid(0, shard); }); + EXPECT_EQ(log_type::omap, stat); } -TEST_F(LogBacking, TestFIFO) +CORO_TEST_F(LogBacking, TestFIFO, NeoRadosTest) { - make_fifo(); - auto stat = log_backing_type(&dp, ioctx, log_type::fifo, SHARDS, - [this](int shard){ return get_oid(0, shard); }, - null_yield); - ASSERT_EQ(log_type::fifo, *stat); + co_await make_fifo(dpp(), rados(), pool()); + auto stat = co_await log_backing_type( + dpp(), rados(), pool(), log_type::fifo, SHARDS, + [](int shard){ return get_oid(0, shard); }); + EXPECT_EQ(log_type::fifo, stat); } -TEST_F(LogBacking, TestFIFOEmpty) +CORO_TEST_F(LogBacking, TestFIFOEmpty, NeoRadosTest) { - auto stat = log_backing_type(&dp, ioctx, log_type::fifo, SHARDS, - [this](int shard){ return get_oid(0, shard); }, - null_yield); - ASSERT_EQ(log_type::fifo, *stat); + auto stat = co_await log_backing_type( + dpp(), rados(), pool(), log_type::fifo, SHARDS, + [](int shard){ return get_oid(0, shard); }); + EXPECT_EQ(log_type::fifo, stat); } TEST(CursorGen, RoundTrip) { @@ -214,153 +136,133 @@ public: using logback_generations::logback_generations; - bs::error_code handle_init(entries_t e) noexcept { + void handle_init(entries_t e) override { got_entries = e; - return {}; } - bs::error_code handle_new_gens(entries_t e) noexcept { + void handle_new_gens(entries_t e) override { got_entries = e; - return {}; } - bs::error_code handle_empty_to(uint64_t new_tail) noexcept { + void handle_empty_to(uint64_t new_tail) override { tail = new_tail; - return {}; } }; -TEST_F(LogBacking, GenerationSingle) -{ - auto lgr = logback_generations::init( - &dp, ioctx, "foobar", [this](uint64_t gen_id, int shard) { - return get_oid(gen_id, shard); - }, SHARDS, log_type::fifo, null_yield); - ASSERT_TRUE(lgr); - - auto lg = std::move(*lgr); +CORO_TEST_F(LogBacking, GenerationSingle, NeoRadosTest) { + auto lg = co_await logback_generations::init( + dpp(), rados(), "foobar", pool(), &get_oid, SHARDS, log_type::fifo); - ASSERT_EQ(0, lg->got_entries.begin()->first); + EXPECT_FALSE(lg->got_entries.empty()); + EXPECT_EQ(0, lg->got_entries.begin()->first); - ASSERT_EQ(0, lg->got_entries[0].gen_id); - ASSERT_EQ(log_type::fifo, lg->got_entries[0].type); - ASSERT_FALSE(lg->got_entries[0].pruned); + EXPECT_EQ(0, lg->got_entries[0].gen_id); + EXPECT_EQ(log_type::fifo, lg->got_entries[0].type); + EXPECT_FALSE(lg->got_entries[0].pruned); - auto ec = lg->empty_to(&dp, 0, null_yield); - ASSERT_TRUE(ec); + EXPECT_THROW({ + co_await lg->empty_to(dpp(), 0); + }, sys::system_error); lg.reset(); - lg = *logback_generations::init( - &dp, ioctx, "foobar", [this](uint64_t gen_id, int shard) { - return get_oid(gen_id, shard); - }, SHARDS, log_type::fifo, null_yield); + lg = co_await logback_generations::init( + dpp(), rados(), "foobar", pool(), &get_oid, SHARDS, log_type::fifo); - ASSERT_EQ(0, lg->got_entries.begin()->first); + EXPECT_EQ(0, lg->got_entries.begin()->first); - ASSERT_EQ(0, lg->got_entries[0].gen_id); - ASSERT_EQ(log_type::fifo, lg->got_entries[0].type); - ASSERT_FALSE(lg->got_entries[0].pruned); + EXPECT_EQ(0, lg->got_entries[0].gen_id); + EXPECT_EQ(log_type::fifo, lg->got_entries[0].type); + EXPECT_FALSE(lg->got_entries[0].pruned); lg->got_entries.clear(); - ec = lg->new_backing(&dp, log_type::omap, null_yield); - ASSERT_FALSE(ec); + co_await lg->new_backing(dpp(), log_type::omap); - ASSERT_EQ(1, lg->got_entries.size()); - ASSERT_EQ(1, lg->got_entries[1].gen_id); - ASSERT_EQ(log_type::omap, lg->got_entries[1].type); - ASSERT_FALSE(lg->got_entries[1].pruned); + EXPECT_EQ(1, lg->got_entries.size()); + EXPECT_EQ(1, lg->got_entries[1].gen_id); + EXPECT_EQ(log_type::omap, lg->got_entries[1].type); + EXPECT_FALSE(lg->got_entries[1].pruned); lg.reset(); - lg = *logback_generations::init( - &dp, ioctx, "foobar", [this](uint64_t gen_id, int shard) { - return get_oid(gen_id, shard); - }, SHARDS, log_type::fifo, null_yield); + lg = co_await logback_generations::init( + dpp(), rados(), "foobar", pool(), &get_oid, SHARDS, log_type::fifo); - ASSERT_EQ(2, lg->got_entries.size()); - ASSERT_EQ(0, lg->got_entries[0].gen_id); - ASSERT_EQ(log_type::fifo, lg->got_entries[0].type); - ASSERT_FALSE(lg->got_entries[0].pruned); + EXPECT_EQ(2, lg->got_entries.size()); + EXPECT_EQ(0, lg->got_entries[0].gen_id); + EXPECT_EQ(log_type::fifo, lg->got_entries[0].type); + EXPECT_FALSE(lg->got_entries[0].pruned); - ASSERT_EQ(1, lg->got_entries[1].gen_id); - ASSERT_EQ(log_type::omap, lg->got_entries[1].type); - ASSERT_FALSE(lg->got_entries[1].pruned); + EXPECT_EQ(1, lg->got_entries[1].gen_id); + EXPECT_EQ(log_type::omap, lg->got_entries[1].type); + EXPECT_FALSE(lg->got_entries[1].pruned); - ec = lg->empty_to(&dp, 0, null_yield); - ASSERT_FALSE(ec); + co_await lg->empty_to(dpp(), 0); - ASSERT_EQ(0, *lg->tail); + EXPECT_EQ(0, *lg->tail); lg.reset(); - lg = *logback_generations::init( - &dp, ioctx, "foobar", [this](uint64_t gen_id, int shard) { - return get_oid(gen_id, shard); - }, SHARDS, log_type::fifo, null_yield); + lg = co_await logback_generations::init( + dpp(), rados(), "foobar", pool(), &get_oid, SHARDS, log_type::fifo); - ASSERT_EQ(1, lg->got_entries.size()); - ASSERT_EQ(1, lg->got_entries[1].gen_id); - ASSERT_EQ(log_type::omap, lg->got_entries[1].type); - ASSERT_FALSE(lg->got_entries[1].pruned); + EXPECT_EQ(1, lg->got_entries.size()); + EXPECT_EQ(1, lg->got_entries[1].gen_id); + EXPECT_EQ(log_type::omap, lg->got_entries[1].type); + EXPECT_FALSE(lg->got_entries[1].pruned); } -TEST_F(LogBacking, GenerationWN) -{ - auto lg1 = *logback_generations::init( - &dp, ioctx, "foobar", [this](uint64_t gen_id, int shard) { - return get_oid(gen_id, shard); - }, SHARDS, log_type::fifo, null_yield); +CORO_TEST_F(LogBacking, GenerationWN, NeoRadosTest) { + auto lg1 = co_await logback_generations::init( + dpp(), rados(), "foobar", pool(), &get_oid, SHARDS, log_type::fifo); - auto ec = lg1->new_backing(&dp, log_type::omap, null_yield); - ASSERT_FALSE(ec); + co_await lg1->new_backing(dpp(), log_type::omap); - ASSERT_EQ(1, lg1->got_entries.size()); - ASSERT_EQ(1, lg1->got_entries[1].gen_id); - ASSERT_EQ(log_type::omap, lg1->got_entries[1].type); - ASSERT_FALSE(lg1->got_entries[1].pruned); + EXPECT_EQ(1, lg1->got_entries.size()); + EXPECT_EQ(1, lg1->got_entries[1].gen_id); + EXPECT_EQ(log_type::omap, lg1->got_entries[1].type); + EXPECT_FALSE(lg1->got_entries[1].pruned); lg1->got_entries.clear(); - auto lg2 = *logback_generations::init( - &dp, ioctx2, "foobar", [this](uint64_t gen_id, int shard) { - return get_oid(gen_id, shard); - }, SHARDS, log_type::fifo, null_yield); + auto rados2 = co_await neorados::RADOS::Builder{} + .build(asio_context, boost::asio::use_awaitable); + + auto lg2 = co_await logback_generations::init( + dpp(), rados2, "foobar", pool(), &get_oid, SHARDS, log_type::fifo); - ASSERT_EQ(2, lg2->got_entries.size()); + EXPECT_EQ(2, lg2->got_entries.size()); - ASSERT_EQ(0, lg2->got_entries[0].gen_id); - ASSERT_EQ(log_type::fifo, lg2->got_entries[0].type); - ASSERT_FALSE(lg2->got_entries[0].pruned); + EXPECT_EQ(0, lg2->got_entries[0].gen_id); + EXPECT_EQ(log_type::fifo, lg2->got_entries[0].type); + EXPECT_FALSE(lg2->got_entries[0].pruned); - ASSERT_EQ(1, lg2->got_entries[1].gen_id); - ASSERT_EQ(log_type::omap, lg2->got_entries[1].type); - ASSERT_FALSE(lg2->got_entries[1].pruned); + EXPECT_EQ(1, lg2->got_entries[1].gen_id); + EXPECT_EQ(log_type::omap, lg2->got_entries[1].type); + EXPECT_FALSE(lg2->got_entries[1].pruned); lg2->got_entries.clear(); - ec = lg1->new_backing(&dp, log_type::fifo, null_yield); - ASSERT_FALSE(ec); + co_await lg1->new_backing(dpp(), log_type::fifo); - ASSERT_EQ(1, lg1->got_entries.size()); - ASSERT_EQ(2, lg1->got_entries[2].gen_id); - ASSERT_EQ(log_type::fifo, lg1->got_entries[2].type); - ASSERT_FALSE(lg1->got_entries[2].pruned); + EXPECT_EQ(1, lg1->got_entries.size()); + EXPECT_EQ(2, lg1->got_entries[2].gen_id); + EXPECT_EQ(log_type::fifo, lg1->got_entries[2].type); + EXPECT_FALSE(lg1->got_entries[2].pruned); - ASSERT_EQ(1, lg2->got_entries.size()); - ASSERT_EQ(2, lg2->got_entries[2].gen_id); - ASSERT_EQ(log_type::fifo, lg2->got_entries[2].type); - ASSERT_FALSE(lg2->got_entries[2].pruned); + EXPECT_EQ(1, lg2->got_entries.size()); + EXPECT_EQ(2, lg2->got_entries[2].gen_id); + EXPECT_EQ(log_type::fifo, lg2->got_entries[2].type); + EXPECT_FALSE(lg2->got_entries[2].pruned); lg1->got_entries.clear(); lg2->got_entries.clear(); - ec = lg2->empty_to(&dp, 1, null_yield); - ASSERT_FALSE(ec); + co_await lg2->empty_to(dpp(), 1); - ASSERT_EQ(1, *lg1->tail); - ASSERT_EQ(1, *lg2->tail); + EXPECT_EQ(1, *lg1->tail); + EXPECT_EQ(1, *lg2->tail); lg1->tail.reset(); lg2->tail.reset(); -- 2.39.5