From a2d26647c011274b61805f8ac17c3422e9b9b63c Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Fri, 11 Jul 2025 14:57:02 -0400 Subject: [PATCH] neorados/fifo: Rewrite as proper I/O object Split nominal handle object and reference-counted implementation. While we're at it, add lazy-open functionality. Signed-off-by: Adam C. Emerson --- src/cls/fifo/cls_fifo_types.h | 4 + src/neorados/cls/common.h | 5 +- src/neorados/cls/fifo.h | 1675 ++------------------ src/neorados/cls/fifo/detail/fifo.h | 1602 +++++++++++++++++++ src/neorados/cls/fifo/entry.h | 39 + src/test/cls_fifo/ceph_test_neocls_fifo.cc | 180 ++- 6 files changed, 1857 insertions(+), 1648 deletions(-) create mode 100644 src/neorados/cls/fifo/detail/fifo.h create mode 100644 src/neorados/cls/fifo/entry.h diff --git a/src/cls/fifo/cls_fifo_types.h b/src/cls/fifo/cls_fifo_types.h index 2ae601e4aaf..139b2986007 100644 --- a/src/cls/fifo/cls_fifo_types.h +++ b/src/cls/fifo/cls_fifo_types.h @@ -610,4 +610,8 @@ template<> struct fmt::formatter : fmt::ostream_formatter {}; template<> struct fmt::formatter : fmt::ostream_formatter {}; +template<> +struct fmt::formatter : fmt::ostream_formatter {}; +template<> +struct fmt::formatter : fmt::ostream_formatter {}; #endif diff --git a/src/neorados/cls/common.h b/src/neorados/cls/common.h index 3ce18103f26..59d6cdeff16 100644 --- a/src/neorados/cls/common.h +++ b/src/neorados/cls/common.h @@ -12,7 +12,6 @@ #pragma once #include -#include #include #include #include @@ -35,6 +34,10 @@ #include "include/buffer.h" #include "include/encoding.h" +#if !defined(__clang__) && defined(__GNUC__) && (__GNUC__ < 13) +#define BROKEN_CO_COMPOSED +#endif + /// \file neorados/cls/common.h /// /// \brief Helpers for writing simple CLS clients diff --git a/src/neorados/cls/fifo.h b/src/neorados/cls/fifo.h index fe0f94a85a9..f50f769bc81 100644 --- a/src/neorados/cls/fifo.h +++ b/src/neorados/cls/fifo.h @@ -25,27 +25,21 @@ /// that object once processing is complete. The head object is the /// notional 'name' of the FIFO, provided at creation or opening time. -#include -#include +#include "fifo/detail/fifo.h" + #include #include -#include #include -#include #include #include #include -#include #include -#include #include -#include #include -#include - -#include +#include +#include #include #include @@ -53,1136 +47,60 @@ #include "include/buffer.h" #include "include/neorados/RADOS.hpp" -#include "common/debug.h" -#include "common/strtol.h" - -#include "neorados/cls/common.h" +#include "common/dout.h" +#include "common/dout_fmt.h" #include "cls/fifo/cls_fifo_types.h" -#include "cls/fifo/cls_fifo_ops.h" - -// Asio's co_compse generates spurious warnings when compiled with -// -O0. the 'mismatched' `operator new` calls directly into the -// matching `operator new`, returning its result. -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wmismatched-new-delete" namespace neorados::cls::fifo { -/// Entries to be returned by the list operation -struct entry { - /// Data stored in the entry - ceph::buffer::list data; - /// Marker (for trimming, continuing list operations) - std::string marker; - /// Time stored (set by the OSD) - ceph::real_time mtime; -}; - -inline std::ostream& operator <<(std::ostream& m, const entry& e) { - return m << "[data: " << e.data - << ", marker: " << e.marker - << ", mtime: " << e.mtime << "]"; -} - /// This is the FIFO client class. It handles the logic of keeping /// state synchronized with that on the server, journal processing, /// and multipart push. class FIFO { friend class FIFOtest; - /// A marker is a part number and byte offset used to indicate an - /// entry. - struct marker { - std::int64_t num = 0; - std::uint64_t ofs = 0; - - /// Default constructor - marker() = default; - - /// Construct from part and offset - /// - /// \param num Part number - /// \param ofs Offset within the part - marker(std::int64_t num, std::uint64_t ofs) : num(num), ofs(ofs) {} - - /// Return a string representation of the marker - std::string to_string() { - return fmt::format("{:0>20}:{:0>20}", num, ofs); - } - }; - - /// Maximum number of retries if we run into a conflict. Limited to - /// keep us from locking up if we never hit the end condition. - static constexpr auto MAX_RACE_RETRIES = 10; - - /// RADOS handle - neorados::RADOS& rados; - /// Head object - const neorados::Object obj; - /// Locator - const neorados::IOContext ioc; - /// Total size of the part header. - std::uint32_t part_header_size; - /// The additional space required to store an entry, above the size - /// of the entry itself. - std::uint32_t part_entry_overhead; - /// Local copy of FIFO data - rados::cls::fifo::info info; - - /// Mutex protecting local data; - std::mutex m; - - /// Constructor - /// - /// \param rados RADOS handle - /// \param obj Head object - /// \param ioc Locator - /// \param part_header_size Total size of the part header. - /// \param part_entry_overhead The additional space required to - /// store an entry, above the size of the - /// entry itself. - FIFO(neorados::RADOS& rados, - neorados::Object obj, - neorados::IOContext ioc, - std::uint32_t part_header_size, - std::uint32_t part_entry_overhead, - rados::cls::fifo::info info) - : rados(rados), obj(std::move(obj)), ioc(std::move(ioc)), - part_header_size(part_header_size), - part_entry_overhead(part_entry_overhead), - info(std::move(info)) {} - - /// \name Primitives - /// - /// Low-level coroutininized operations in the FIFO objclass. - ///@{ public: -#if 0 - /// \brief Retrieve FIFO metadata - /// - /// \param rados RADOS handle - /// \param obj Head object for FIFO - /// \param ioc Locator - /// \param token Boost.Asio CompletionToken - /// \param objv Operation will fail if FIFO is not at this version - /// - /// \return The metadata info, part header size, and entry overhead - /// in a way appropriate to the completion token. - template CompletionToken> - static auto get_meta(neorados::RADOS& rados, - neorados::Object obj, - neorados::IOContext ioc, - std::optional objv, - CompletionToken&& token) { - namespace fifo = rados::cls::fifo; - neorados::ReadOp op; - fifo::op::get_meta gm; - gm.version = objv; - return exec( - rados, std::move(obj), std::move(ioc), - fifo::op::CLASS, fifo::op::GET_META, std::move(gm), - [](fifo::op::get_meta_reply&& ret) { - return std::make_tuple(std::move(ret.info), - ret.part_header_size, - ret.part_entry_overhead); - }, std::forward(token)); - } -#endif + using executor_type = neorados::RADOS::executor_type; - /// \brief Retrieve FIFO metadata - /// - /// \param rados RADOS handle - /// \param obj Head object for FIFO - /// \param ioc Locator - /// \param objv Operation will fail if FIFO is not at this version - /// \param token Boost.Asio CompletionToken - /// - /// \return The metadata info, part header size, and entry overhead - /// in a way appropriate to the completion token. - template CompletionToken> - static auto get_meta(neorados::RADOS& rados, - neorados::Object obj, - neorados::IOContext ioc, - std::optional objv, - CompletionToken&& token) { - namespace fifo = rados::cls::fifo; - namespace asio = boost::asio; - namespace sys = boost::system; - return asio::async_initiate< - CompletionToken, void(sys::error_code, rados::cls::fifo::info, - uint32_t, uint32_t)> - (asio::experimental::co_composed< - void(sys::error_code, fifo::info, uint32_t, uint32_t)> - ([](auto state, RADOS& r, Object obj, IOContext ioc, - std::optional objv) -> void { - try { - state.throw_if_cancelled(true); - state.reset_cancellation_state(asio::enable_terminal_cancellation()); - buffer::list in; - fifo::op::get_meta gm; - gm.version = objv; - encode(gm, in); - ReadOp op; - buffer::list out; - op.exec(fifo::op::CLASS, fifo::op::GET_META, in, &out); - co_await r.execute(std::move(obj), std::move(ioc), std::move(op), nullptr, - asio::deferred); - fifo::op::get_meta_reply ret; - decode(ret, out); - co_return std::make_tuple(sys::error_code{}, std::move(ret.info), - ret.part_header_size, - ret.part_entry_overhead); - } catch (const sys::system_error& e) { - co_return std::make_tuple(e.code(), fifo::info{}, uint32_t{}, - uint32_t{}); - } - }, rados.get_executor()), - token, std::ref(rados), std::move(obj), std::move(ioc), std::move(objv)); - } - -#if 0 private: - /// \brief Retrieve part info - /// - /// \param part_num Number of part to query - /// \param token Boost.Asio CompletionToken - /// - /// \return The part info in a way appropriate to the completion token. - template - CompletionToken> - auto get_part_info(std::int64_t part_num, - CompletionToken&& token) { - namespace fifo = rados::cls::fifo; - std::unique_lock l(m); - Object part_oid = info.part_oid(part_num);; - l.unlock(); - - return exec( - rados, std::move(std::move(part_oid)), ioc, - fifo::op::CLASS, fifo::op::GET_PART_INFO, - fifo::op::get_part_info{}, - [](fifo::op::get_part_info_reply&& ret) { - return std::move(ret.header); - }, std::forward(token)); - } -#endif - - /// \brief Retrieve part info - /// - /// \param part_num Number of part to query - /// \param token Boost.Asio CompletionToken - /// - /// \return The part info in a way appropriate to the completion token. - template - CompletionToken> - auto get_part_info(std::int64_t part_num, - CompletionToken&& token) { - namespace fifo = rados::cls::fifo; - namespace asio = boost::asio; - namespace sys = boost::system; - std::unique_lock l(m); - Object part_oid = info.part_oid(part_num);; - l.unlock(); - return asio::async_initiate< - CompletionToken, void(sys::error_code, fifo::part_header)> - (asio::experimental::co_composed< - void(sys::error_code, fifo::part_header)> - ([](auto state, Object part_oid, FIFO* f) -> void { - try { - state.throw_if_cancelled(true); - state.reset_cancellation_state(asio::enable_terminal_cancellation()); - buffer::list in; - fifo::op::get_part_info gpi; - encode(gpi, in); - ReadOp op; - buffer::list out; - op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, &out); - co_await f->rados.execute(std::move(part_oid), f->ioc, std::move(op), - nullptr, asio::deferred); - - fifo::op::get_part_info_reply ret; - decode(ret, out); - co_return std::make_tuple(sys::error_code{}, std::move(ret.header)); - } catch (const sys::system_error& e) { - co_return std::make_tuple(e.code(), fifo::part_header{}); - } - }, rados.get_executor()), - token, std::move(part_oid), this); - } - - /// \brief Create a new part object - /// - /// \param part_num Part number - /// \param token Boost.Asio CompletionToken - /// - /// \return Nothing, but may error in a way appropriate to the - /// completion token. - template CompletionToken> - auto create_part(std::int64_t part_num, CompletionToken&& token) { - namespace fifo = rados::cls::fifo; - namespace buffer = ceph::buffer; - neorados::WriteOp op; - op.create(false); /* We don't need exclusivity, part_init ensures we're - creating from the same journal entry. */ - std::unique_lock l(m); - fifo::op::init_part ip; - ip.params = info.params; - buffer::list in; - encode(ip, in); - op.exec(fifo::op::CLASS, fifo::op::INIT_PART, std::move(in)); - auto oid = info.part_oid(part_num); - l.unlock(); - return rados.execute(oid, ioc, std::move(op), - std::forward(token)); - } - - /// \brief Remove a part object - /// - /// \param part_num Part number - /// \param token Boost.Asio CompletionToken - /// - /// \return Nothing, but may error in a way appropriate to the - /// completion token. - template CompletionToken> - auto remove_part(std::int64_t part_num, CompletionToken&& token) { - namespace fifo = rados::cls::fifo; - namespace buffer = ceph::buffer; - neorados::WriteOp op; - op.remove(); - std::unique_lock l(m); - auto oid = info.part_oid(part_num); - l.unlock(); - return rados.execute(oid, ioc, std::move(op), - std::forward(token)); - } - - /// \brief Update objclass FIFO metadata - /// - /// \param objv Current metadata version (objclass will error on mismatch) - /// \param update Changes to make to metadata - /// \param token Boost.Asio CompletionToken - /// - /// \return Nothing, but may error in a way appropriate to the - /// completion token. - template CompletionToken> - auto update_meta(const rados::cls::fifo::objv& objv, - const rados::cls::fifo::update& update, - CompletionToken&& token) { - namespace fifo = rados::cls::fifo; - namespace buffer = ceph::buffer; - neorados::WriteOp op; - fifo::op::update_meta um; - - um.version = objv; - um.tail_part_num = update.tail_part_num(); - um.head_part_num = update.head_part_num(); - um.min_push_part_num = update.min_push_part_num(); - um.max_push_part_num = update.max_push_part_num(); - um.journal_entries_add = std::move(update).journal_entries_add(); - um.journal_entries_rm = std::move(update).journal_entries_rm(); - - buffer::list in; - encode(um, in); - op.exec(fifo::op::CLASS, fifo::op::UPDATE_META, std::move(in)); - return rados.execute(obj, ioc, std::move(op), - std::forward(token)); - } - - /// \brief Create FIFO head object - /// - /// \param rados RADOS handle - /// \param obj Head object for the FIFO - /// \param ioc Locator - /// \param objv Version (error if FIFO exists and this doesn't match) - /// \param oid_prefix Prefix for all object names - /// \param exclusive If true, error if object already exists - /// \param max_part_size Max size of part objects - /// \param max_entry_size Max size of entries - /// \param token Boost.Asio CompletionToken - /// - /// \return Nothing, but may error in a way appropriate to the - /// completion token. - template CompletionToken> - static auto create_meta(neorados::RADOS& rados, - neorados::Object obj, - neorados::IOContext ioc, - std::optional objv, - std::optional oid_prefix, - bool exclusive, - std::uint64_t max_part_size, - std::uint64_t max_entry_size, - CompletionToken&& token) { - namespace fifo = rados::cls::fifo; - namespace buffer = ceph::buffer; - neorados::WriteOp op; - fifo::op::create_meta cm; - - cm.id = obj; - cm.version = objv; - cm.oid_prefix = oid_prefix; - cm.max_part_size = max_part_size; - cm.max_entry_size = max_entry_size; - cm.exclusive = exclusive; - - buffer::list in; - encode(cm, in); - op.exec(fifo::op::CLASS, fifo::op::CREATE_META, in); - return rados.execute(std::move(obj), std::move(ioc), std::move(op), - std::forward(token)); - } - - /// \brief Push some entries to a given part - /// - /// \param dpp Prefix for debug prints - /// \param entries Entries to push - /// \param token Boost.Asio CompletionToken - /// - /// \return Possibly errors in a way appropriate to the completion - /// token. - template CompletionToken> - auto push_entries(const DoutPrefixProvider* dpp, - std::deque entries, - CompletionToken&& token) { - namespace asio = boost::asio; - namespace sys = boost::system; - namespace fifo = rados::cls::fifo; - namespace buffer = ceph::buffer; - return asio::async_initiate - (asio::experimental::co_composed - ([](auto state, const DoutPrefixProvider* dpp, - std::deque entries, FIFO* f) -> void { - try { - state.throw_if_cancelled(true); - state.reset_cancellation_state(asio::enable_terminal_cancellation()); - - std::unique_lock l(f->m); - auto head_part_num = f->info.head_part_num; - auto oid = f->info.part_oid(head_part_num); - l.unlock(); - - WriteOp op; - op.assert_exists(); - - fifo::op::push_part pp; - - pp.data_bufs = std::move(entries); - pp.total_len = 0; - - for (const auto &bl : pp.data_bufs) - pp.total_len += bl.length(); - - buffer::list in; - encode(pp, in); - int pushes; - op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in, - [&pushes](sys::error_code, int r, const buffer::list &) { - pushes = r; - }); - op.returnvec(); - co_await f->rados.execute(std::move(oid), f->ioc, std::move(op), - asio::deferred); - co_return {sys::error_code{}, pushes}; - } catch (const sys::system_error& e) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " push entries failed: " << e.what() << dendl; - co_return {e.code(), 0}; - } - }, rados.get_executor()), - token, dpp, std::move(entries), this); - } - - /// \brief List entries from a given part - /// - /// \param dpp Prefix provider for debug logging - /// \param part_num Part number to list - /// \param ofs Offset from which to list - /// \param result Span giving space for results - /// \param token Boost.Asio CompletionToken - /// - /// \return A subspan containing results, a bool indicating whether there - /// are more entries within the part, and a bool indicating whether - /// the part is full all in a way appropriate to the completion - /// token. - template, bool, bool)> - CompletionToken> - auto list_part(const DoutPrefixProvider* dpp, - std::int64_t part_num, std::uint64_t ofs, - std::span result, - CompletionToken&& token) { - namespace asio = boost::asio; - namespace sys = boost::system; - namespace fifo = rados::cls::fifo; - namespace buffer = ceph::buffer; - return asio::async_initiate< - CompletionToken, void(sys::error_code, std::span, bool, bool)> - (asio::experimental::co_composed< - void(sys::error_code, std::span, bool, bool)> - ([](auto state, const DoutPrefixProvider* dpp, std::int64_t part_num, - std::uint64_t ofs,std::span result, - FIFO* f) -> void { - try { - state.throw_if_cancelled(true); - state.reset_cancellation_state(asio::enable_terminal_cancellation()); - - std::unique_lock l(f->m); - auto oid = f->info.part_oid(part_num); - l.unlock(); - - ReadOp op; - fifo::op::list_part lp; - - lp.ofs = ofs; - lp.max_entries = result.size(); - - buffer::list in; - encode(lp, in); - buffer::list bl; - op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, &bl, nullptr); - co_await f->rados.execute(oid, f->ioc, std::move(op), nullptr, - asio::deferred); - bool more, full_part; - { - ceph::buffer::list::const_iterator bi = bl.begin(); - DECODE_START(1, bi); - std::string tag; - decode(tag, bi); - uint32_t len; - decode(len, bi); - if (len > result.size()) { - throw buffer::end_of_buffer{}; - } - result = result.first(len); - for (auto i = 0u; i < len; ++i) { - fifo::part_list_entry entry; - decode(entry, bi); - result[i] = {.data = std::move(entry.data), - .marker = marker{part_num, entry.ofs}.to_string(), - .mtime = entry.mtime}; - } - decode(more, bi); - decode(full_part, bi); - DECODE_FINISH(bi); - } - co_return {sys::error_code{}, std::move(result), more, full_part}; - } catch (const sys::system_error& e) { - if (e.code() != sys::errc::no_such_file_or_directory) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " list entries failed: " << e.what() << dendl; - } - co_return {e.code(), std::span{}, false, false}; - } - }, rados.get_executor()), - token, dpp, part_num, ofs, std::move(result), this); - } - - /// \brief Trim entries on a given part - /// - /// \param dpp Prefix provider for debug logging - /// \param part_num Part to trim - /// \param ofs Offset to which to trim - /// \param exclusive If true, exclude end of range from trim - /// \param token Boost.Asio CompletionToken - /// - /// \return Possibly errors in a way appropriate to the completion - /// token. - template CompletionToken> - auto trim_part(const DoutPrefixProvider* dpp, - std::int64_t part_num, - std::uint64_t ofs, - bool exclusive, - CompletionToken&& token) { - namespace asio = boost::asio; - namespace sys = boost::system; - namespace fifo = rados::cls::fifo; - namespace buffer = ceph::buffer; - return asio::async_initiate - (asio::experimental::co_composed - ([](auto state, const DoutPrefixProvider* dpp, - int64_t part_num, uint64_t ofs, bool exclusive, FIFO* f) -> void { - try { - state.throw_if_cancelled(true); - state.reset_cancellation_state(asio::enable_terminal_cancellation()); - - std::unique_lock l(f->m); - auto oid = f->info.part_oid(part_num); - l.unlock(); - - WriteOp op; - fifo::op::trim_part tp; - - tp.ofs = ofs; - tp.exclusive = exclusive; - - buffer::list in; - encode(tp, in); - op.exec(fifo::op::CLASS, fifo::op::TRIM_PART, in); - co_await f->rados.execute(std::move(oid), f->ioc, std::move(op), - asio::deferred); - co_return sys::error_code{}; - } catch (const sys::system_error& e) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " trim part failed: " << e.what() << dendl; - co_return e.code(); - } - }, rados.get_executor()), - token, dpp, part_num, ofs, exclusive, this); - } - - ///@} - - /// \name Logics - /// - /// Internal logic used to implement the public interface - /// - ///@{ - - /// \brief Convert a string to a marker - /// - /// This is not a free function since an empty string always refers - /// to the tail part. - /// - /// \param s String representation of the marker - /// \param l Ownership of mutex - /// - /// \returns The marker or nullopt if the string representation is invalid - std::optional to_marker(std::string_view s, - std::unique_lock& l) { - assert(l.owns_lock()); - marker m; - if (s.empty()) { - m.num = info.tail_part_num; - m.ofs = 0; - return m; - } - - auto pos = s.find(':'); - if (pos == s.npos) { - return std::nullopt; - } - - auto num = s.substr(0, pos); - auto ofs = s.substr(pos + 1); - - auto n = ceph::parse(num); - if (!n) { - return std::nullopt; - } - m.num = *n; - auto o = ceph::parse(ofs); - if (!o) { - return std::nullopt; - } - m.ofs = *o; - return m; - } - - /// \brief Force re-read of metadata - /// - /// \param dpp Prefix provider for debug logging - /// \param token Boost.Asio CompletionToken - /// - /// \return Possibly errors in a way appropriate to the completion - /// token. - template CompletionToken> - auto read_meta(const DoutPrefixProvider* dpp, - CompletionToken&& token) { - namespace asio = boost::asio; - namespace sys = boost::system; - return asio::async_initiate - (asio::experimental::co_composed - ([](auto state, const DoutPrefixProvider* dpp, - FIFO* f) -> void { - try { - state.throw_if_cancelled(true); - state.reset_cancellation_state(asio::enable_terminal_cancellation()); - - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering" << dendl; - auto [info, part_header_size, part_entry_overhead] - = co_await get_meta(f->rados, f->obj, f->ioc, std::nullopt, - asio::deferred); - std::unique_lock l(f->m); - if (info.version.same_or_later(f->info.version)) { - f->info = std::move(info); - f->part_header_size = part_header_size; - f->part_entry_overhead = part_entry_overhead; - } - } catch (const sys::system_error& e) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " read_meta failed: " << e.what() << dendl; - co_return e.code(); - } - co_return sys::error_code{}; - }, rados.get_executor()), - token, dpp, this); - } - - /// \brief Update local metadata - /// - /// \param dpp Debugging prefix provider - /// \param objv Current metadata version (objclass will error on mismatch) - /// \param update Changes to make to metadata - /// \param token Boost.Asio CompletionToken - /// - /// \exception boost::system::system_error equivalent to - /// boost::system::errc::operation_canceled on version mismatch. - void apply_update(const DoutPrefixProvider *dpp, - rados::cls::fifo::info* info, - const rados::cls::fifo::objv& objv, - const rados::cls::fifo::update& update) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering" << dendl; - std::unique_lock l(m); - if (objv != info->version) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " version mismatch, canceling" << dendl; - throw boost::system::system_error(ECANCELED, - boost::system::generic_category()); - } - - info->apply_update(update); - } - - /// \brief Update metadata locally and in the objclass - /// - /// \param dpp Debug prefix provider - /// \param update The update to apply - /// \param objv Current object version - /// \param token Boost.Asio CompletionToken - /// - /// \return True if the operation was canceled. false otherwise in a - /// way appropriate to the completion token. - template CompletionToken> - auto update_meta(const DoutPrefixProvider* dpp, - rados::cls::fifo::update update, - rados::cls::fifo::objv version, - CompletionToken&& token) { - namespace asio = boost::asio; - namespace sys = boost::system; - namespace fifo = rados::cls::fifo; - return asio::async_initiate - (asio::experimental::co_composed - ([](auto state, const DoutPrefixProvider* dpp, - fifo::update update, fifo::objv version, - FIFO* f) -> void { - try { - state.throw_if_cancelled(true); - state.reset_cancellation_state(asio::enable_terminal_cancellation()); - - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering" << dendl; - - auto [ec] = co_await f->update_meta(version, update, - asio::as_tuple(asio::deferred)); - bool canceled; - if (ec && ec != sys::errc::operation_canceled) { - throw sys::system_error(ec); - } - canceled = (ec == sys::errc::operation_canceled); - if (!canceled) { - try { - f->apply_update(dpp, &f->info, version, update); - } catch (const sys::system_error& e) { - if (e.code() == sys::errc::operation_canceled) { - canceled = true; - } else { - throw; - } - } - } - if (canceled) { - co_await f->read_meta(dpp, asio::deferred); - } - if (canceled) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled" << dendl; - } - co_return {sys::error_code{}, canceled}; - } catch (const sys::system_error& e) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " failed with error: " << e.what() << dendl; - co_return {e.code(), false}; - } - }, rados.get_executor()), - token, dpp, std::move(update), std::move(version), this); - } - - /// \brief Process the journal - /// - /// \param dpp Debug prefix provider - /// \param token Boost.Asio CompletionToken - /// - /// \return Nothing, but may error in a way appropriate to the - /// completion token. - template CompletionToken> - auto process_journal(const DoutPrefixProvider* dpp, - CompletionToken&& token) { - namespace asio = boost::asio; - namespace sys = boost::system; - namespace fifo = rados::cls::fifo; - return asio::async_initiate - (asio::experimental::co_composed - ([](auto state, const DoutPrefixProvider* dpp, FIFO* f) -> void { - try { - state.throw_if_cancelled(true); - state.reset_cancellation_state(asio::enable_terminal_cancellation()); - - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering." << dendl; - std::vector processed; - - std::unique_lock l(f->m); - auto tmpjournal = f->info.journal; - auto new_tail = f->info.tail_part_num; - auto new_head = f->info.head_part_num; - auto new_max = f->info.max_push_part_num; - l.unlock(); - - for (auto& entry : tmpjournal) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " processing entry: entry=" << entry - << dendl; - switch (entry.op) { - using enum fifo::journal_entry::Op; - case create: - ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " Creating part " << entry.part_num << dendl; - co_await f->create_part(entry.part_num, asio::deferred); - if (entry.part_num > new_max) { - new_max = entry.part_num; - } - break; - case set_head: - ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " Setting head to " << entry.part_num << dendl; - if (entry.part_num > new_head) { - new_head = entry.part_num; - } - break; - case remove: - try { - ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " Removing part " << entry.part_num - << dendl; - - co_await f->remove_part(entry.part_num, asio::deferred); - if (entry.part_num >= new_tail) { - new_tail = entry.part_num + 1; - } - } catch (const sys::system_error& e) { - if (e.code() != sys::errc::no_such_file_or_directory) { - throw; - } - } - break; - default: - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " unknown journaled op: entry=" << entry - << dendl; - throw sys::system_error{EINVAL, sys::generic_category()}; - } - - processed.push_back(std::move(entry)); - } - - // Postprocess - bool canceled = true; + executor_type executor; - for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " postprocessing: i=" << i << dendl; - - std::optional tail_part_num; - std::optional head_part_num; - std::optional max_part_num; - - std::unique_lock l(f->m); - auto objv = f->info.version; - if (new_tail > tail_part_num) - tail_part_num = new_tail; - if (new_head > f->info.head_part_num) - head_part_num = new_head; - if (new_max > f->info.max_push_part_num) - max_part_num = new_max; - l.unlock(); - - if (processed.empty() && !tail_part_num && !max_part_num) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " nothing to update any more: i=" << i - << dendl; - canceled = false; - break; - } - auto u = fifo::update().tail_part_num(tail_part_num) - .head_part_num(head_part_num).max_push_part_num(max_part_num) - .journal_entries_rm(processed); - ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " Calling update_meta: update=" << u - << dendl; - - canceled = co_await f->update_meta(dpp, u, objv, asio::deferred); - if (canceled) { - std::vector new_processed; - std::unique_lock l(f->m); - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " update canceled, retrying: i=" << i - << dendl; - for (auto& e : processed) { - if (f->info.journal.contains(e)) { - new_processed.push_back(e); - } - } - processed = std::move(new_processed); - } - } - if (canceled) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled too many times, giving up" << dendl; - throw sys::system_error(ECANCELED, sys::generic_category()); - } - } catch (const sys::system_error& e) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " process_journal failed" << ":" << e.what() - << dendl; - co_return e.code(); - } - co_return sys::error_code{}; - }, rados.get_executor()), - token, dpp, this); +public: + auto get_executor() const { + return executor; } - /// \brief Create a new part - /// - /// And possibly set it as head - /// - /// \param dpp Debug prefix provider - /// \param new_part_num New part to create - /// \param is_head True if part is to be new head - /// \param token Boost.Asio CompletionToken - /// - /// \return Nothing, but may error in a way appropriate to the - /// completion token. - template CompletionToken> - auto prepare_new_part(const DoutPrefixProvider* dpp, - std::int64_t new_part_num, bool is_head, - CompletionToken&& token) { - namespace asio = boost::asio; - namespace sys = boost::system; - namespace fifo = rados::cls::fifo; - return asio::async_initiate - (asio::experimental::co_composed - ([](auto state, const DoutPrefixProvider* dpp, - std::int64_t new_part_num, bool is_head, FIFO* f) -> void { - try { - state.throw_if_cancelled(true); - state.reset_cancellation_state(asio::enable_terminal_cancellation()); + /// The default maximum size of every part object (that is, every + /// object holding entries) + static constexpr std::uint64_t default_max_part_size = 4 * 1024 * 1024; + /// The default maximum entry size + static constexpr std::uint64_t default_max_entry_size = 32 * 1024; - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering" << dendl; - std::unique_lock l(f->m); - using enum fifo::journal_entry::Op; - std::vector jentries{{create, new_part_num}}; - if (f->info.journal.contains({create, new_part_num}) && - (!is_head || f->info.journal.contains({set_head, new_part_num}))) { - l.unlock(); - ldpp_dout(dpp, 5) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " new part journaled, but not processed" - << dendl; - co_await f->process_journal(dpp, asio::deferred); - co_return sys::error_code{}; - } - auto version = f->info.version; +private: - if (is_head) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " needs new head" << dendl; - jentries.push_back({set_head, new_part_num}); - } - l.unlock(); + std::shared_ptr impl; - bool canceled = true; - for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) { - canceled = false; - ldpp_dout(dpp, 20) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " updating metadata: i=" << i << dendl; - auto u = fifo::update{}.journal_entries_add(jentries); - canceled = co_await f->update_meta(dpp, u, version, - asio::deferred); - if (canceled) { - std::unique_lock l(f->m); - version = f->info.version; - auto found = (f->info.journal.contains({create, new_part_num}) || - f->info.journal.contains({set_head, new_part_num})); - if ((f->info.max_push_part_num >= new_part_num && - f->info.head_part_num >= new_part_num)) { - ldpp_dout(dpp, 20) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " raced, but journaled and processed: i=" << i - << dendl; - co_return sys::error_code{}; - } - if (found) { - ldpp_dout(dpp, 20) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " raced, journaled but not processed: i=" << i - << dendl; - canceled = false; - } - l.unlock(); - } - } - if (canceled) { - ldpp_dout(dpp, -1) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled too many times, giving up" << dendl; - throw sys::system_error{ECANCELED, sys::generic_category()}; - } - co_await f->process_journal(dpp, asio::deferred); - co_return sys::error_code{}; - } catch (const sys::system_error& e) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " prepare_new_part failed: " << e.what() - << dendl; - co_return e.code(); - } - }, rados.get_executor()), - token, dpp, new_part_num, is_head, this); - } + FIFO(std::shared_ptr&& impl) + : executor(impl->get_executor()), impl(std::move(impl)) {} - /// \brief Set a new part as head - /// - /// In practice we create a new part and set it as head in one go. + /// Make sure each operation has a reference to the implementation /// - /// \param dpp Debug prefix provider - /// \param new_head_part_num New head part number - /// \param token Boost.Asio CompletionToken + /// I don't think we need a work-guard since `co_composed` let us + /// pass executors to keep alive /// - /// \return Nothing, but may error in a way appropriate to the - /// completion token. - template CompletionToken> - auto prepare_new_head(const DoutPrefixProvider* dpp, - std::int64_t new_head_part_num, - CompletionToken&& token) { - namespace asio = boost::asio; - namespace sys = boost::system; - namespace fifo = rados::cls::fifo; - return asio::async_initiate - (asio::experimental::co_composed - ([](auto state, const DoutPrefixProvider* dpp, - std::int64_t new_head_part_num, FIFO* f) -> void { - try { - state.throw_if_cancelled(true); - state.reset_cancellation_state(asio::enable_terminal_cancellation()); - - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering" << dendl; - std::unique_lock l(f->m); - auto max_push_part_num = f->info.max_push_part_num; - auto version = f->info.version; - l.unlock(); - - if (max_push_part_num < new_head_part_num) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " need new part" << dendl; - co_await f->prepare_new_part(dpp, new_head_part_num, true, - asio::deferred); - std::unique_lock l(f->m); - if (f->info.max_push_part_num < new_head_part_num) { - ldpp_dout(dpp, -1) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " inconsistency, push part less than head part." - << dendl; - throw sys::system_error(EIO, sys::generic_category()); - } - l.unlock(); - } - - using enum fifo::journal_entry::Op; - fifo::journal_entry jentry; - jentry.op = set_head; - jentry.part_num = new_head_part_num; - - bool canceled = true; - for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) { - canceled = false; - ldpp_dout(dpp, 20) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " updating metadata: i=" << i << dendl; - auto u = fifo::update{}.journal_entries_add({{jentry}}); - canceled = co_await f->update_meta(dpp, u, version, - asio::deferred); - if (canceled) { - std::unique_lock l(f->m); - auto found = - (f->info.journal.contains({create, new_head_part_num}) || - f->info.journal.contains({set_head, new_head_part_num})); - version = f->info.version; - if ((f->info.head_part_num >= new_head_part_num)) { - ldpp_dout(dpp, 20) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " raced, but journaled and processed: i=" << i - << dendl; - co_return sys::error_code{}; - } - if (found) { - ldpp_dout(dpp, 20) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " raced, journaled but not processed: i=" << i - << dendl; - canceled = false; - } - l.unlock(); - } - } - if (canceled) { - ldpp_dout(dpp, -1) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled too many times, giving up" << dendl; - throw sys::system_error(ECANCELED, sys::generic_category()); - } - co_await f->process_journal(dpp, asio::deferred); - co_return sys::error_code{}; - } catch (const sys::system_error& e) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " prepare_new_head failed: " << e.what() - << dendl; - co_return e.code(); - } - }, rados.get_executor()), - token, dpp, new_head_part_num, this); + /// \param token The token to annotate + template + auto consign(CompletionToken&& token) { + return boost::asio::consign( + std::forward(token), + // Even though RADOS holds the executor across operations, we + // still need it live between operations. + std::make_pair(impl, get_executor())); } - ///@} - - public: - FIFO(const FIFO&) = delete; - FIFO& operator =(const FIFO&) = delete; - FIFO(FIFO&&) = delete; - FIFO& operator =(FIFO&&) = delete; - /// \brief Open an existing FIFO /// /// \param dpp Prefix provider for debug logging @@ -1200,7 +118,7 @@ public: void(boost::system::error_code, std::unique_ptr)> CompletionToken> static auto open(const DoutPrefixProvider* dpp, - neorados::RADOS& rados, + neorados::RADOS rados, neorados::Object obj, neorados::IOContext ioc, CompletionToken&& token, @@ -1210,44 +128,27 @@ public: namespace sys = boost::system; return asio::async_initiate)> - (asio::experimental::co_composed)> - ([](auto state, const DoutPrefixProvider* dpp, neorados::RADOS& rados, + (asio::co_composed)> + ([](auto state, const DoutPrefixProvider* dpp, neorados::RADOS rados, neorados::Object obj, neorados::IOContext ioc, std::optional objv, bool probe) -> void { try { state.throw_if_cancelled(true); state.reset_cancellation_state(asio::enable_terminal_cancellation()); - ldpp_dout(dpp, 20) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering" << dendl; - auto [info, size, over] = co_await get_meta(rados, obj, ioc, objv, - asio::deferred); - std::unique_ptr f(new FIFO(rados, - std::move(obj), - std::move(ioc), - size, over, std::move(info))); - probe = 0; - - // If there are journal entries, process them, in case - // someone crashed mid-transaction. - if (!info.journal.empty()) { - ldpp_dout(dpp, 20) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " processing leftover journal" << dendl; - co_await f->process_journal(dpp, asio::deferred); - } - co_return {sys::error_code{}, std::move(f)}; - } catch (const sys::system_error& e) { - if (!probe || - (probe && !(e.code() == sys::errc::no_such_file_or_directory || - e.code() == sys::errc::no_message_available))) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " open failed:" << e.what() << dendl; - } - co_return {e.code(), std::unique_ptr{}}; - } + auto e = rados.get_executor(); + auto impl = std::make_shared(std::move(rados), + std::move(obj), + std::move(ioc)); + + co_await impl->do_open(dpp, objv, probe, + boost::asio::consign(asio::deferred, impl)); + co_return {sys::error_code{}, + std::unique_ptr{new FIFO(std::move(impl))}}; + } catch (const sys::system_error &e) { + co_return {e.code(), std::unique_ptr{}}; + } }, rados.get_executor()), - token, dpp, std::ref(rados), std::move(obj), std::move(ioc), + token, dpp, std::move(rados), std::move(obj), std::move(ioc), std::move(objv), probe); } @@ -1270,7 +171,7 @@ public: void(boost::system::error_code, std::unique_ptr)> CompletionToken> static auto create(const DoutPrefixProvider* dpp, - neorados::RADOS& rados, + neorados::RADOS rados, neorados::Object obj, neorados::IOContext ioc, CompletionToken&& token, @@ -1283,9 +184,9 @@ public: namespace sys = boost::system; return asio::async_initiate)> - (asio::experimental::co_composed)> - ([](auto state, const DoutPrefixProvider* dpp, neorados::RADOS& rados, + ([](auto state, const DoutPrefixProvider* dpp, neorados::RADOS rados, neorados::Object obj, neorados::IOContext ioc, std::optional objv, std::optional oid_prefix, bool exclusive, @@ -1294,24 +195,23 @@ public: state.throw_if_cancelled(true); state.reset_cancellation_state(asio::enable_terminal_cancellation()); - ldpp_dout(dpp, 20) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering" << dendl; - ldpp_dout(dpp, 10) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " Calling create_meta" << dendl; - co_await create_meta(rados, obj, ioc, objv, oid_prefix, exclusive, - max_part_size, max_entry_size, asio::deferred); - auto f = co_await open(dpp, rados, std::move(obj), std::move(ioc), - asio::deferred, objv); - co_return std::make_tuple(sys::error_code{}, std::move(f)); + auto impl = std::make_shared(std::move(rados), + std::move(obj), + std::move(ioc)); + + co_await impl->do_create(dpp, objv, std::move(oid_prefix), exclusive, + max_part_size, max_entry_size, + asio::deferred); + co_return {sys::error_code{}, + std::unique_ptr{new FIFO(std::move(impl))}}; + } catch (const sys::system_error& e) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " create failed: " << e.what() << dendl; + ldpp_dout_fmt(dpp, -1, "FIFO::create:{}: create failed: {}", + __LINE__, e.what()); co_return {e.code(), std::unique_ptr{}}; } }, rados.get_executor()), - token, dpp, std::ref(rados), std::move(obj), std::move(ioc), + token, dpp, std::move(rados), std::move(obj), std::move(ioc), std::move(objv), std::move(oid_prefix), exclusive, max_part_size, max_entry_size); } @@ -1325,143 +225,15 @@ public: /// \return Nothing, but may error in a way appropriate to the /// completion token. template CompletionToken> + void(boost::system::error_code)> CompletionToken> auto push(const DoutPrefixProvider* dpp, - std::deque entries, + std::deque entries, CompletionToken&& token) { - namespace asio = boost::asio; - namespace sys = boost::system; - namespace buffer = ceph::buffer; - return asio::async_initiate - (asio::experimental::co_composed - ([](auto state, const DoutPrefixProvider* dpp, - std::deque remaining, FIFO* f) -> void { - try { - state.throw_if_cancelled(true); - state.reset_cancellation_state(asio::enable_terminal_cancellation()); - - std::unique_lock l(f->m); - auto max_entry_size = f->info.params.max_entry_size; - auto need_new_head = f->info.need_new_head(); - auto head_part_num = f->info.head_part_num; - l.unlock(); - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering" << dendl; - if (remaining.empty()) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " empty push, returning success" << dendl; - co_return sys::error_code{}; - } - - // Validate sizes - for (const auto& bl : remaining) { - if (bl.length() > max_entry_size) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entry bigger than max_entry_size" - << dendl; - co_return sys::error_code{E2BIG, sys::generic_category()}; - } - } - - if (need_new_head) { - ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " need new head: " - << head_part_num + 1 << dendl; - co_await f->prepare_new_head(dpp, head_part_num + 1, asio::deferred); - } - - std::deque batch; - - uint64_t batch_len = 0; - auto retries = 0; - bool canceled = true; - while ((!remaining.empty() || !batch.empty()) && - (retries <= MAX_RACE_RETRIES)) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " preparing push: remaining=" << remaining.size() - << " batch=" << batch.size() << " retries=" << retries - << dendl; - std::unique_lock l(f->m); - head_part_num = f->info.head_part_num; - auto max_part_size = f->info.params.max_part_size; - auto overhead = f->part_entry_overhead; - l.unlock(); - - while (!remaining.empty() && - (remaining.front().length() + batch_len <= max_part_size)) { - /* We can send entries with data_len up to max_entry_size, - however, we want to also account the overhead when - dealing with multiple entries. Previous check doesn't - account for overhead on purpose. */ - batch_len += remaining.front().length() + overhead; - batch.push_back(std::move(remaining.front())); - remaining.pop_front(); - } - ldpp_dout(dpp, 20) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " prepared push: remaining=" << remaining.size() - << " batch=" << batch.size() << " retries=" << retries - << " batch_len=" << batch_len << dendl; - - auto [ec, n] = - co_await f->push_entries(dpp, batch, - asio::as_tuple(asio::deferred)); - if (ec == sys::errc::result_out_of_range) { - canceled = true; - ++retries; - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " need new head " << head_part_num + 1 - << dendl; - co_await f->prepare_new_head(dpp, head_part_num + 1, - asio::deferred); - continue; - } else if (ec == sys::errc::no_such_file_or_directory) { - ldpp_dout(dpp, 20) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " racing client trimmed part, rereading metadata " - << dendl; - canceled = true; - ++retries; - co_await f->read_meta(dpp, asio::deferred); - continue; - } else if (ec) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " push_entries failed: " << ec.message() - << dendl; - throw sys::system_error(ec); - } - assert(n >= 0); - // Made forward progress! - canceled = false; - retries = 0; - batch_len = 0; - if (n == ssize(batch)) { - batch.clear(); - } else { - batch.erase(batch.begin(), batch.begin() + n); - for (const auto& b : batch) { - batch_len += b.length() + overhead; - } - } - } - if (canceled) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled too many times, giving up." - << dendl; - co_return sys::error_code{ECANCELED, sys::generic_category()}; - } - co_return sys::error_code{}; - } catch (const sys::system_error& e) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " push failed: " << e.what() << dendl; - co_return e.code(); - } - }, rados.get_executor()), - token, dpp, std::move(entries), this); + return impl->push(dpp, std::move(entries), + consign(std::forward(token))); } - /// \brief Push an entry to the FIFO + /// \brief Push entries to the FIFO /// /// \param dpp Prefix provider for debug logging /// \param entries Entries to push @@ -1510,115 +282,16 @@ public: /// returned entries, and marker is non-null if the listing was /// incomplete, in a way appropriate to the completion token. template, - std::string)> CompletionToken> + void(boost::system::error_code, std::span, + std::string)> CompletionToken> auto list(const DoutPrefixProvider* dpp, std::string markstr, std::span entries, CompletionToken&& token) { - namespace asio = boost::asio; - namespace sys = boost::system; - return asio::async_initiate, - std::string)> - (asio::experimental::co_composed, - std::string)> - ([](auto state, const DoutPrefixProvider* dpp, - std::string markstr, std::span entries, - FIFO* f) -> void { - try { - state.throw_if_cancelled(true); - state.reset_cancellation_state(asio::enable_terminal_cancellation()); - - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering" << dendl; - std::unique_lock l(f->m); - std::int64_t part_num = f->info.tail_part_num; - std::uint64_t ofs = 0; - if (!markstr.empty()) { - auto marker = f->to_marker(markstr, l); - if (!marker) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " invalid marker string: " << markstr - << dendl; - throw sys::system_error{EINVAL, sys::generic_category()}; - } - part_num = marker->num; - ofs = marker->ofs; - } - l.unlock(); - - bool more = false; - - auto entries_left = entries; - - while (entries_left.size() > 0) { - more = false; - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entries_left.size()=" - << entries_left.size() << dendl; - auto [ec, res, part_more, part_full] = - co_await f->list_part(dpp, part_num, ofs, entries_left, - asio::as_tuple(asio::deferred)); - if (ec == sys::errc::no_such_file_or_directory) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " missing part, rereading metadata" - << dendl; - co_await f->read_meta(dpp, asio::deferred); - std::unique_lock l(f->m); - if (part_num < f->info.tail_part_num) { - /* raced with trim? restart */ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " raced with trim, restarting" << dendl; - entries_left = entries; - part_num = f->info.tail_part_num; - l.unlock(); - ofs = 0; - continue; - } - l.unlock(); - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " assuming part was not written yet, " - << "so end of data" << dendl; - break; - } else if (ec) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " list_entries failed: " << ec.message() - << dendl; - throw sys::system_error(ec); - } - more = part_full || part_more; - entries_left = entries_left.last(entries_left.size() - res.size()); - - if (!part_full) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " head part is not full, so we can assume " - << "we're done" << dendl; - break; - } - if (!part_more) { - ++part_num; - ofs = 0; - } - } - std::string marker; - if (entries_left.size() > 0) { - entries = entries.first(entries.size() - entries_left.size()); - } - if (more && !entries.empty()) { - marker = entries.back().marker; - } - co_return {sys::error_code{}, std::move(entries), std::move(marker)}; - } catch (const sys::system_error& e) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " list failed: " << e.what() << dendl; - co_return {e.code(), std::span{}, std::string{}}; - } - }, rados.get_executor()), - token, dpp, std::move(markstr), std::move(entries), this); + return impl->list(dpp, std::move(markstr), entries, + consign(std::forward(token))); } - /// \brief Push entries to the FIFO + /// \brief Trim entries from the FIFO /// /// \param dpp Prefix provider for debug logging /// \param marker Marker to which to trim @@ -1629,193 +302,71 @@ public: /// \return Nothing, but may error in a way appropriate to the /// completion token. template CompletionToken> + void(boost::system::error_code)> CompletionToken> auto trim(const DoutPrefixProvider* dpp, std::string marker, bool exclusive, CompletionToken&& token) { - namespace asio = boost::asio; - namespace sys = boost::system; - namespace fifo = rados::cls::fifo; - return asio::async_initiate - (asio::experimental::co_composed - ([](auto state, const DoutPrefixProvider* dpp, - std::string markstr, bool exclusive, FIFO* f) -> void { - try { - state.throw_if_cancelled(true); - state.reset_cancellation_state(asio::enable_terminal_cancellation()); - - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering" << dendl; - bool overshoot = false; - std::unique_lock l(f->m); - auto marker = f->to_marker(markstr, l); - if (!marker) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " invalid marker string: " << markstr - << dendl; - throw sys::system_error{EINVAL, sys::generic_category()}; - } - auto part_num = marker->num; - auto ofs = marker->ofs; - auto hn = f->info.head_part_num; - const auto max_part_size = f->info.params.max_part_size; - if (part_num > hn) { - l.unlock(); - co_await f->read_meta(dpp, asio::deferred); - l.lock(); - hn = f->info.head_part_num; - if (part_num > hn) { - overshoot = true; - part_num = hn; - ofs = max_part_size; - } - } - if (part_num < f->info.tail_part_num) { - throw sys::system_error(ENODATA, sys::generic_category()); - } - auto pn = f->info.tail_part_num; - l.unlock(); - - while (pn < part_num) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " pn=" << pn << dendl; - auto [ec] = co_await f->trim_part(dpp, pn, max_part_size, false, - asio::as_tuple(asio::deferred)); - if (ec && ec == sys::errc::no_such_file_or_directory) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " trim_part failed: " << ec.message() - << dendl; - throw sys::system_error(ec); - } - ++pn; - } - - auto [ec] = co_await f->trim_part(dpp, pn, ofs, exclusive, - asio::as_tuple(asio::deferred)); - if (ec && ec == sys::errc::no_such_file_or_directory) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " trim_part failed: " << ec.message() - << dendl; - throw sys::system_error(ec); - } + return impl->trim(dpp, std::move(marker), exclusive, + consign(std::forward(token))); + } - l.lock(); - auto tail_part_num = f->info.tail_part_num; - auto objv = f->info.version; - l.unlock(); - bool canceled = tail_part_num < part_num; - int retries = 0; - while ((tail_part_num < part_num) && - canceled && - (retries <= MAX_RACE_RETRIES)) { - canceled = co_await f->update_meta(dpp, fifo::update{} - .tail_part_num(part_num), - objv, asio::deferred); - if (canceled) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled: retries=" << retries - << dendl; - l.lock(); - tail_part_num = f->info.tail_part_num; - objv = f->info.version; - l.unlock(); - ++retries; - } - } - if (canceled) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled too many times, giving up" - << dendl; - throw sys::system_error(EIO, sys::generic_category()); - } - co_return (overshoot ? - sys::error_code{ENODATA, sys::generic_category()} : - sys::error_code{}); - co_return sys::error_code{}; - } catch (const sys::system_error& e) { - if (ceph::from_error_code(e.code()) != -ENODATA) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " trim failed: " << e.what() << dendl; - } - co_return e.code(); - } - }, rados.get_executor()), - token, dpp, std::move(marker), exclusive, this); + /// \brief Get information on the last entry + /// + /// \param dpp Prefix provider for debug logging + /// \param token Boost.Asio CompletionToken + /// + /// \return {marker, time} for the latest entry in a way appropriate + /// to the completion token. + template CompletionToken> + auto last_entry_info(const DoutPrefixProvider* dpp, + CompletionToken&& token) { + return impl->last_entry_info(dpp, + consign(std::forward(token))); } static constexpr auto max_list_entries = rados::cls::fifo::op::MAX_LIST_ENTRIES; - /// The default maximum size of every part object (that is, every - /// object holding entries) - static constexpr std::uint64_t default_max_part_size = 4 * 1024 * 1024; - - /// The default maximum entry size - static constexpr std::uint64_t default_max_entry_size = 32 * 1024; - /// Return a marker comparing less than any other marker. static auto min_marker() { - return marker{std::numeric_limits::max(), - std::numeric_limits::max()} + using detail::FIFOImpl; + return FIFOImpl::marker{ + std::numeric_limits::max(), + std::numeric_limits::max()} .to_string(); } /// Return a marker comparing greater than any other marker. static auto max_marker() { - return marker{std::numeric_limits::max(), - std::numeric_limits::max()} + using detail::FIFOImpl; + return FIFOImpl::marker{ + std::numeric_limits::max(), + std::numeric_limits::max()} .to_string(); } - /// \brief Get information on the last entry + /// \brief Retrieve FIFO metadata /// - /// \param dpp Prefix provider for debug logging + /// \param rados RADOS handle + /// \param obj Head object for FIFO + /// \param ioc Locator /// \param token Boost.Asio CompletionToken + /// \param objv Operation will fail if FIFO is not at this version /// - /// \return {marker, time} for the latest entry in a way appropriate - /// to the completion token. + /// \return The metadata info, part header size, and entry overhead + /// in a way appropriate to the completion token. template - CompletionToken> - auto last_entry_info(const DoutPrefixProvider* dpp, + void(boost::system::error_code, rados::cls::fifo::info, + uint32_t, uint32_t)> CompletionToken> + static auto get_meta(neorados::RADOS rados, Object obj, IOContext ioc, + std::optional objv, CompletionToken&& token) { - namespace asio = boost::asio; - namespace sys = boost::system; - namespace buffer = ceph::buffer; - return asio::async_initiate< - CompletionToken, void(sys::error_code, std::string, ceph::real_time)> - (asio::experimental::co_composed< - void(sys::error_code, std::string, ceph::real_time)> - ([](auto state, const DoutPrefixProvider* dpp, - FIFO* f) -> void { - try { - state.throw_if_cancelled(true); - state.reset_cancellation_state(asio::enable_terminal_cancellation()); - - co_await f->read_meta(dpp, asio::deferred); - std::unique_lock l(f->m); - auto head_part_num = f->info.head_part_num; - l.unlock(); - if (head_part_num < 0) { - co_return {sys::error_code{}, std::string{}, - ceph::real_clock::zero()}; - } else { - auto header = - co_await f->get_part_info(head_part_num, asio::deferred); - co_return {sys::error_code{}, - marker{head_part_num, header.last_ofs}.to_string(), - header.max_time}; - } - } catch (const sys::system_error& e) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " push failed: " << e.what() << dendl; - co_return {e.code(), std::string{}, ceph::real_time{}}; - } - }, rados.get_executor()), - token, dpp, this); + return detail::FIFOImpl::get_meta(rados, std::move(obj), std::move(ioc), + std::move(objv), + std::forward(token)); } }; } // namespace neorados::cls::fifo { -#pragma GCC diagnostic pop diff --git a/src/neorados/cls/fifo/detail/fifo.h b/src/neorados/cls/fifo/detail/fifo.h new file mode 100644 index 00000000000..e11215fa462 --- /dev/null +++ b/src/neorados/cls/fifo/detail/fifo.h @@ -0,0 +1,1602 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright contributors to the Ceph project + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "include/neorados/RADOS.hpp" + +#include "common/async/service.h" + +#include "common/dout.h" +#include "common/dout_fmt.h" + +#include "cls/fifo/cls_fifo_types.h" +#include "cls/fifo/cls_fifo_ops.h" + +#include "neorados/cls/common.h" +#include "neorados/cls/fifo/entry.h" + +#pragma once + +/// \file neorados/cls/fifo/detail/fifo.h +/// +/// \brief NeoRADOS implementation of FIFO class + +namespace neorados::cls::fifo { +class FIFOtest; +namespace detail { + +namespace asio = boost::asio; +namespace sys = boost::system; + +namespace async = ceph::async; +namespace buffer = ceph::buffer; +namespace fifo = rados::cls::fifo; + +using neorados::RADOS; +using neorados::Object; +using neorados::IOContext; +using neorados::ReadOp; +using neorados::WriteOp; + + +class FIFOImpl : public std::enable_shared_from_this, + public async::service_list_base_hook { + friend FIFOtest; + friend async::service; + + /// Maximum number of retries if we run into a conflict. Limited to + /// keep us from locking up if we never hit the end condition. + static constexpr auto MAX_RACE_RETRIES = 10; + + /// RADOS handle + RADOS rados; + /// Head object + const Object obj; + /// Locator + const IOContext ioc; + /// Total size of the part header. + std::uint32_t part_header_size; + /// The additional space required to store an entry, above the size + /// of the entry itself. + std::uint32_t part_entry_overhead; + + async::service& svc; + std::uint64_t subsystem = 0; + + void service_shutdown() { + // In case the last owner of a reference is an op we're about to + // cancel. (This can happen if the `RADOS` object + auto service_ref = shared_from_this(); + rados.cancel_subsystem(subsystem); + } + + /// Local copy of FIFO data + fifo::info info; + + /// Mutex protecting local data; + std::mutex m; + +public: + + /// Constructor + /// + /// \param rados RADOS handle + /// \param obj Head object + /// \param ioc Locator + /// \param part_header_size Total size of the part header. + /// \param part_entry_overhead The additional space required to + /// store an entry, above the size of the + /// entry itself. + FIFOImpl(RADOS rados, Object obj, IOContext ioc) + : rados(rados), obj(std::move(obj)), ioc(std::move(ioc)), + svc(boost::asio::use_service>( + boost::asio::query(rados.get_executor(), + boost::asio::execution::context))), + subsystem(rados.new_subsystem()) { + svc.add(*this); + } + + ~FIFOImpl() { + svc.remove(*this); + } + + /// A marker is a part number and byte offset used to indicate an + /// entry. + struct marker { + std::int64_t num = 0; + std::uint64_t ofs = 0; + + /// Default constructor + marker() = default; + + /// Construct from part and offset + /// + /// \param num Part number + /// \param ofs Offset within the part + marker(std::int64_t num, std::uint64_t ofs) : num(num), ofs(ofs) {} + + /// Return a string representation of the marker + std::string to_string() { + return fmt::format("{:0>20}:{:0>20}", num, ofs); + } + }; + + + /// \name Primitives + /// + /// Low-level coroutininized operations in the FIFO objclass. + ///@{ + +#if !defined(BROKEN_CO_COMPOSED) + /// \brief Retrieve FIFO metadata + /// + /// \param rados RADOS handle + /// \param obj Head object for FIFO + /// \param ioc Locator + /// \param token Boost.Asio CompletionToken + /// \param objv Operation will fail if FIFO is not at this version + /// + /// \return The metadata info, part header size, and entry overhead + /// in a way appropriate to the completion token. + template CompletionToken> + static auto get_meta(RADOS& rados, Object obj, IOContext ioc, + std::optional objv, + CompletionToken&& token) { + ReadOp op; + fifo::op::get_meta gm; + gm.version = objv; + return exec( + rados, std::move(obj), std::move(ioc), + fifo::op::CLASS, fifo::op::GET_META, std::move(gm), + [](fifo::op::get_meta_reply&& ret) { + return std::make_tuple(std::move(ret.info), + ret.part_header_size, + ret.part_entry_overhead); + }, std::forward(token)); + } +#else // BROKEN_CO_COMPOSED + /// \brief Retrieve FIFO metadata + /// + /// \param rados RADOS handle + /// \param obj Head object for FIFO + /// \param ioc Locator + /// \param objv Operation will fail if FIFO is not at this version + /// \param token Boost.Asio CompletionToken + /// + /// \return The metadata info, part header size, and entry overhead + /// in a way appropriate to the completion token. + template CompletionToken> + static auto get_meta(RADOS& rados, Object obj, IOContext ioc, + std::optional objv, + CompletionToken&& token) { + return asio::async_initiate< + CompletionToken, void(sys::error_code, fifo::info, + uint32_t, uint32_t)> + (asio::co_composed< + void(sys::error_code, fifo::info, uint32_t, uint32_t)> + ([](auto state, RADOS& r, Object obj, IOContext ioc, + std::optional objv) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + buffer::list in; + fifo::op::get_meta gm; + gm.version = objv; + encode(gm, in); + ReadOp op; + buffer::list out; + op.exec(fifo::op::CLASS, fifo::op::GET_META, in, &out); + co_await r.execute(std::move(obj), std::move(ioc), std::move(op), nullptr, + asio::deferred); + fifo::op::get_meta_reply ret; + decode(ret, out); + co_return std::make_tuple(sys::error_code{}, std::move(ret.info), + ret.part_header_size, + ret.part_entry_overhead); + } catch (const sys::system_error& e) { + co_return std::make_tuple(e.code(), fifo::info{}, uint32_t{}, + uint32_t{}); + } + }, rados.get_executor()), + token, std::ref(rados), std::move(obj), std::move(ioc), std::move(objv)); + } +#endif // BROKEN_CO_COMPOSED + +#if !defined(BROKEN_CO_COMPOSED) + /// \brief Retrieve part info + /// + /// \param part_num Number of part to query + /// \param token Boost.Asio CompletionToken + /// + /// \return The part info in a way appropriate to the completion token. + template CompletionToken> + auto get_part_info(std::int64_t part_num, + CompletionToken&& token) { + std::unique_lock l(m); + Object part_oid = info.part_oid(part_num);; + l.unlock(); + + return exec( + rados, std::move(std::move(part_oid)), ioc, + fifo::op::CLASS, fifo::op::GET_PART_INFO, + fifo::op::get_part_info{}, + [](fifo::op::get_part_info_reply&& ret) { + return std::move(ret.header); + }, std::forward(token)); + } +#else // BROKEN_CO_COMPOSED + /// \brief Retrieve part info + /// + /// \param part_num Number of part to query + /// \param token Boost.Asio CompletionToken + /// + /// \return The part info in a way appropriate to the completion token. + template + CompletionToken> + auto get_part_info(std::int64_t part_num, + CompletionToken&& token) { + std::unique_lock l(m); + Object part_oid = info.part_oid(part_num);; + l.unlock(); + return asio::async_initiate< + CompletionToken, void(sys::error_code, fifo::part_header)> + (asio::co_composed< + void(sys::error_code, fifo::part_header)> + ([](auto state, Object part_oid, FIFOImpl* f) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + buffer::list in; + fifo::op::get_part_info gpi; + encode(gpi, in); + ReadOp op; + buffer::list out; + op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, &out); + co_await f->rados.execute(std::move(part_oid), f->ioc, std::move(op), + nullptr, asio::deferred); + + fifo::op::get_part_info_reply ret; + decode(ret, out); + co_return std::make_tuple(sys::error_code{}, std::move(ret.header)); + } catch (const sys::system_error& e) { + co_return std::make_tuple(e.code(), fifo::part_header{}); + } + }, rados.get_executor()), + token, std::move(part_oid), this); + } +#endif // BROKEN_CO_COMPOSED + +private: + + /// \brief Create a new part object + /// + /// \param part_num Part number + /// \param token Boost.Asio CompletionToken + /// + /// \return Nothing, but may error in a way appropriate to the + /// completion token. + template CompletionToken> + auto create_part(std::int64_t part_num, CompletionToken&& token) { + WriteOp op; + op.create(false); /* We don't need exclusivity, part_init ensures we're + creating from the same journal entry. */ + std::unique_lock l(m); + fifo::op::init_part ip; + ip.params = info.params; + buffer::list in; + encode(ip, in); + op.exec(fifo::op::CLASS, fifo::op::INIT_PART, std::move(in)); + auto oid = info.part_oid(part_num); + l.unlock(); + return rados.execute(oid, ioc, std::move(op), + std::forward(token)); + } + + /// \brief Remove a part object + /// + /// \param part_num Part number + /// \param token Boost.Asio CompletionToken + /// + /// \return Nothing, but may error in a way appropriate to the + /// completion token. + template CompletionToken> + auto remove_part(std::int64_t part_num, CompletionToken&& token) { + WriteOp op; + op.remove(); + std::unique_lock l(m); + auto oid = info.part_oid(part_num); + l.unlock(); + return rados.execute(oid, ioc, std::move(op), + std::forward(token)); + } + + /// \brief Update objclass FIFO metadata + /// + /// \param objv Current metadata version (objclass will error on mismatch) + /// \param update Changes to make to metadata + /// \param token Boost.Asio CompletionToken + /// + /// \return Nothing, but may error in a way appropriate to the + /// completion token. + template CompletionToken> + auto update_meta(const fifo::objv& objv, const fifo::update& update, + CompletionToken&& token) { + WriteOp op; + fifo::op::update_meta um; + + um.version = objv; + um.tail_part_num = update.tail_part_num(); + um.head_part_num = update.head_part_num(); + um.min_push_part_num = update.min_push_part_num(); + um.max_push_part_num = update.max_push_part_num(); + um.journal_entries_add = std::move(update).journal_entries_add(); + um.journal_entries_rm = std::move(update).journal_entries_rm(); + + buffer::list in; + encode(um, in); + op.exec(fifo::op::CLASS, fifo::op::UPDATE_META, std::move(in)); + return rados.execute(obj, ioc, std::move(op), + std::forward(token)); + } + + /// \brief Create FIFO head object + /// + /// \param rados RADOS handle + /// \param obj Head object for the FIFO + /// \param ioc Locator + /// \param objv Version (error if FIFO exists and this doesn't match) + /// \param oid_prefix Prefix for all object names + /// \param exclusive If true, error if object already exists + /// \param max_part_size Max size of part objects + /// \param max_entry_size Max size of entries + /// \param token Boost.Asio CompletionToken + /// + /// \return Nothing, but may error in a way appropriate to the + /// completion token. + template CompletionToken> + static auto create_meta(RADOS& rados, Object obj, IOContext ioc, + std::optional objv, + std::optional oid_prefix, + bool exclusive, + std::uint64_t max_part_size, + std::uint64_t max_entry_size, + CompletionToken&& token) { + WriteOp op; + fifo::op::create_meta cm; + + cm.id = obj; + cm.version = objv; + cm.oid_prefix = oid_prefix; + cm.max_part_size = max_part_size; + cm.max_entry_size = max_entry_size; + cm.exclusive = exclusive; + + buffer::list in; + encode(cm, in); + op.exec(fifo::op::CLASS, fifo::op::CREATE_META, in); + return rados.execute(std::move(obj), std::move(ioc), std::move(op), + std::forward(token)); + } + + /// \brief Push some entries to a given part + /// + /// \param dpp Prefix for debug prints + /// \param entries Entries to push + /// \param token Boost.Asio CompletionToken + /// + /// \return Possibly errors in a way appropriate to the completion + /// token. + template CompletionToken> + auto push_entries(const DoutPrefixProvider* dpp, + std::deque entries, + CompletionToken&& token) { + return asio::async_initiate + (asio::co_composed + ([](auto state, const DoutPrefixProvider* dpp, + std::deque entries, FIFOImpl* f) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + + std::unique_lock l(f->m); + auto head_part_num = f->info.head_part_num; + auto oid = f->info.part_oid(head_part_num); + l.unlock(); + + WriteOp op; + op.assert_exists(); + + fifo::op::push_part pp; + + pp.data_bufs = std::move(entries); + pp.total_len = 0; + + for (const auto &bl : pp.data_bufs) + pp.total_len += bl.length(); + + buffer::list in; + encode(pp, in); + int pushes; + op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in, + [&pushes](sys::error_code, int r, const buffer::list &) { + pushes = r; + }); + op.returnvec(); + co_await f->rados.execute(std::move(oid), f->ioc, std::move(op), + asio::deferred); + co_return {sys::error_code{}, pushes}; + } catch (const sys::system_error& e) { + ldpp_dout_fmt(dpp, 10, "push_entries failed: {}", e.what()); + co_return {e.code(), 0}; + } + }, rados.get_executor()), + token, dpp, std::move(entries), this); + } + + /// \brief List entries from a given part + /// + /// \param dpp Prefix provider for debug logging + /// \param part_num Part number to list + /// \param ofs Offset from which to list + /// \param result Span giving space for results + /// \param token Boost.Asio CompletionToken + /// + /// \return A subspan containing results, a bool indicating whether there + /// are more entries within the part, and a bool indicating whether + /// the part is full all in a way appropriate to the completion + /// token. + template, bool, bool)> + CompletionToken> + auto list_part(const DoutPrefixProvider* dpp, + std::int64_t part_num, std::uint64_t ofs, + std::span result, + CompletionToken&& token) { + return asio::async_initiate< + CompletionToken, void(sys::error_code, std::span, bool, bool)> + (asio::co_composed< + void(sys::error_code, std::span, bool, bool)> + ([](auto state, const DoutPrefixProvider* dpp, std::int64_t part_num, + std::uint64_t ofs,std::span result, + FIFOImpl* f) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + + std::unique_lock l(f->m); + auto oid = f->info.part_oid(part_num); + l.unlock(); + + ReadOp op; + fifo::op::list_part lp; + + lp.ofs = ofs; + lp.max_entries = result.size(); + + buffer::list in; + encode(lp, in); + buffer::list bl; + op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, &bl, nullptr); + co_await f->rados.execute(oid, f->ioc, std::move(op), nullptr, + asio::deferred); + bool more, full_part; + { + buffer::list::const_iterator bi = bl.begin(); + DECODE_START(1, bi); + std::string tag; + decode(tag, bi); + uint32_t len; + decode(len, bi); + if (len > result.size()) { + throw buffer::end_of_buffer{}; + } + result = result.first(len); + for (auto i = 0u; i < len; ++i) { + fifo::part_list_entry entry; + decode(entry, bi); + result[i] = {.data = std::move(entry.data), + .marker = marker{part_num, entry.ofs}.to_string(), + .mtime = entry.mtime}; + } + decode(more, bi); + decode(full_part, bi); + DECODE_FINISH(bi); + } + co_return {sys::error_code{}, std::move(result), more, full_part}; + } catch (const sys::system_error &e) { + if (e.code() != sys::errc::no_such_file_or_directory) { + ldpp_dout_fmt(dpp, 10, "list_part failed: {}", e.what()); + } + co_return {e.code(), std::span{}, false, false}; + } + }, rados.get_executor()), + token, dpp, part_num, ofs, std::move(result), this); + } + + /// \brief Trim entries on a given part + /// + /// \param dpp Prefix provider for debug logging + /// \param part_num Part to trim + /// \param ofs Offset to which to trim + /// \param exclusive If true, exclude end of range from trim + /// \param token Boost.Asio CompletionToken + /// + /// \return Possibly errors in a way appropriate to the completion + /// token. + template CompletionToken> + auto trim_part(const DoutPrefixProvider* dpp, + std::int64_t part_num, + std::uint64_t ofs, + bool exclusive, + CompletionToken&& token) { + return asio::async_initiate + (asio::co_composed + ([](auto state, const DoutPrefixProvider* dpp, + int64_t part_num, uint64_t ofs, bool exclusive, FIFOImpl* f) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + + std::unique_lock l(f->m); + auto oid = f->info.part_oid(part_num); + l.unlock(); + + WriteOp op; + fifo::op::trim_part tp; + + tp.ofs = ofs; + tp.exclusive = exclusive; + + buffer::list in; + encode(tp, in); + op.exec(fifo::op::CLASS, fifo::op::TRIM_PART, in); + co_await f->rados.execute(std::move(oid), f->ioc, std::move(op), + asio::deferred); + co_return sys::error_code{}; + } catch (const sys::system_error& e) { + ldpp_dout_fmt(dpp, 10, "trim_part failed: ", + e.what()); + co_return e.code(); + } + }, rados.get_executor()), + token, dpp, part_num, ofs, exclusive, this); + } + ///@} + + /// \name Logics + /// + /// Internal logic used to implement the public interface + /// + ///@{ + + /// \brief Convert a string to a marker + /// + /// This is not a free function since an empty string always refers + /// to the tail part. + /// + /// \param s String representation of the marker + /// \param l Ownership of mutex + /// + /// \returns The marker or nullopt if the string representation is invalid + std::optional to_marker(std::string_view s, + std::unique_lock& l) { + assert(l.owns_lock()); + marker m; + if (s.empty()) { + m.num = info.tail_part_num; + m.ofs = 0; + return m; + } + + auto pos = s.find(':'); + if (pos == s.npos) { + return std::nullopt; + } + + auto num = s.substr(0, pos); + auto ofs = s.substr(pos + 1); + + auto n = ceph::parse(num); + if (!n) { + return std::nullopt; + } + m.num = *n; + auto o = ceph::parse(ofs); + if (!o) { + return std::nullopt; + } + m.ofs = *o; + return m; + } + + /// \brief Force re-read of metadata + /// + /// \param dpp Prefix provider for debug logging + /// \param token Boost.Asio CompletionToken + /// + /// \return Possibly errors in a way appropriate to the completion + /// token. + template CompletionToken> + auto read_meta(const DoutPrefixProvider* dpp, + CompletionToken&& token) { + return asio::async_initiate + (asio::co_composed + ([](auto state, const DoutPrefixProvider* dpp, + FIFOImpl* f) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + + ldpp_dout_fmt(dpp, 20, "read_meta: entering"); + auto [info, part_header_size, part_entry_overhead] = co_await get_meta( + f->rados, f->obj, f->ioc, std::nullopt, asio::deferred); + std::unique_lock l(f->m); + if (info.version.same_or_later(f->info.version)) { + f->info = std::move(info); + f->part_header_size = part_header_size; + f->part_entry_overhead = part_entry_overhead; + } + } catch (const sys::system_error& e) { + ldpp_dout_fmt(dpp, 5, "read_meta failed: {}", e.what()); + co_return e.code(); + } + co_return sys::error_code{}; + }, rados.get_executor()), + token, dpp, this); + } + + /// \brief Update local metadata + /// + /// \param dpp Debugging prefix provider + /// \param objv Current metadata version (objclass will error on mismatch) + /// \param update Changes to make to metadata + /// \param token Boost.Asio CompletionToken + /// + /// \exception boost::system::system_error equivalent to + /// boost::system::errc::operation_canceled on version mismatch. + void apply_update(const DoutPrefixProvider *dpp, + fifo::info* info, + const fifo::objv& objv, + const fifo::update& update) { + ldpp_dout_fmt(dpp, 20, "apply_update: entering"); + std::unique_lock l(m); + if (objv != info->version) { + ldpp_dout_fmt(dpp, 10, "apply_update, {}: version mismatch, canceling", + __LINE__); + throw sys::system_error(ECANCELED, sys::generic_category()); + } + + info->apply_update(update); + } + + /// \brief Update metadata locally and in the objclass + /// + /// \param dpp Debug prefix provider + /// \param update The update to apply + /// \param objv Current object version + /// \param token Boost.Asio CompletionToken + /// + /// \return True if the operation was canceled. false otherwise in a + /// way appropriate to the completion token. + template CompletionToken> + auto update_meta(const DoutPrefixProvider* dpp, + fifo::update update, fifo::objv version, + CompletionToken&& token) { + return asio::async_initiate + (asio::co_composed + ([](auto state, const DoutPrefixProvider* dpp, + fifo::update update, fifo::objv version, + FIFOImpl* f) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + + ldpp_dout_fmt(dpp, 20, "update_meta: entering"); + auto [ec] = co_await f->update_meta(version, update, + asio::as_tuple(asio::deferred)); + bool canceled; + if (ec && ec != sys::errc::operation_canceled) { + throw sys::system_error(ec); + } + canceled = (ec == sys::errc::operation_canceled); + if (!canceled) { + try { + f->apply_update(dpp, &f->info, version, update); + } catch (const sys::system_error& e) { + if (e.code() == sys::errc::operation_canceled) { + canceled = true; + } else { + throw; + } + } + } + if (canceled) { + co_await f->read_meta(dpp, asio::deferred); + } + if (canceled) { + ldpp_dout_fmt(dpp, 20, "update_meta, {} canceled", __LINE__); + } + co_return {sys::error_code{}, canceled}; + } catch (const sys::system_error& e) { + ldpp_dout_fmt(dpp, 10, "update_meta failed with error: {}", e.what()); + co_return {e.code(), false}; + } + }, rados.get_executor()), + token, dpp, std::move(update), std::move(version), this); + } + + /// \brief Process the journal + /// + /// \param dpp Debug prefix provider + /// \param token Boost.Asio CompletionToken + /// + /// \return Nothing, but may error in a way appropriate to the + /// completion token. + template CompletionToken> + auto process_journal(const DoutPrefixProvider* dpp, + CompletionToken&& token) { + return asio::async_initiate + (asio::co_composed + ([](auto state, const DoutPrefixProvider* dpp, FIFOImpl* f) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + + ldpp_dout_fmt(dpp, 20, "process_journal: entering", __LINE__); + std::vector processed; + + std::unique_lock l(f->m); + auto tmpjournal = f->info.journal; + auto new_tail = f->info.tail_part_num; + auto new_head = f->info.head_part_num; + auto new_max = f->info.max_push_part_num; + l.unlock(); + + for (auto& entry : tmpjournal) { + ldpp_dout_fmt(dpp, 20, "process_journal, {} processing entry: entry=", + __LINE__, entry); + switch (entry.op) { + using enum fifo::journal_entry::Op; + case create: + ldpp_dout_fmt(dpp, 10, "process_journal, {}: Creating part {}", + __LINE__, entry.part_num); + co_await f->create_part(entry.part_num, asio::deferred); + if (entry.part_num > new_max) { + new_max = entry.part_num; + } + break; + case set_head: + ldpp_dout_fmt(dpp, 10, "process_journal, {}: Setting head to {}", + __LINE__, entry.part_num); + if (entry.part_num > new_head) { + new_head = entry.part_num; + } + break; + case remove: + try { + ldpp_dout_fmt(dpp, 10, "process_journal, {}: Removing part {}", + __LINE__, entry.part_num); + co_await f->remove_part(entry.part_num, asio::deferred); + if (entry.part_num >= new_tail) { + new_tail = entry.part_num + 1; + } + } catch (const sys::system_error& e) { + if (e.code() != sys::errc::no_such_file_or_directory) { + throw; + } + } + break; + default: + ldpp_dout_fmt(dpp, 1, "process_journal, {}: " + "unknown journaled op: entry={}", + __LINE__, entry); + throw sys::system_error{EINVAL, sys::generic_category()}; + } + + processed.push_back(std::move(entry)); + } + + // Postprocess + bool canceled = true; + + for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) { + ldpp_dout_fmt(dpp, 20, "process_journal, {}: postprocessing: i={}", + __LINE__, i); + + std::optional tail_part_num; + std::optional head_part_num; + std::optional max_part_num; + + std::unique_lock l(f->m); + auto objv = f->info.version; + if (new_tail > tail_part_num) + tail_part_num = new_tail; + if (new_head > f->info.head_part_num) + head_part_num = new_head; + if (new_max > f->info.max_push_part_num) + max_part_num = new_max; + l.unlock(); + + if (processed.empty() && !tail_part_num && !max_part_num) { + ldpp_dout_fmt(dpp, 20, "process_journal, {}: " + "nothing to update any more: i={}", + __LINE__, i); + canceled = false; + break; + } + auto u = fifo::update().tail_part_num(tail_part_num) + .head_part_num(head_part_num).max_push_part_num(max_part_num) + .journal_entries_rm(processed); + ldpp_dout_fmt(dpp, 10, "process_journal, {}: " + "Calling update_meta: update=", + __LINE__, u); + + canceled = co_await f->update_meta(dpp, u, objv, asio::deferred); + if (canceled) { + std::vector new_processed; + std::unique_lock l(f->m); + ldpp_dout_fmt(dpp, 20, "process_journal, {}: " + "update canceled, retrying: i=", + __LINE__, i); + for (auto& e : processed) { + if (f->info.journal.contains(e)) { + new_processed.push_back(e); + } + } + processed = std::move(new_processed); + } + } + if (canceled) { + ldpp_dout_fmt(dpp, 5, "process_journal, {}: " + "canceled too many times, giving up", + __LINE__); + throw sys::system_error(ECANCELED, sys::generic_category()); + } + } catch (const sys::system_error& e) { + ldpp_dout_fmt(dpp, 3, "process_journal: failed: {}", e.what()); + co_return e.code(); + } + co_return sys::error_code{}; + }, rados.get_executor()), + token, dpp, this); + } + + /// \brief Create a new part + /// + /// And possibly set it as head + /// + /// \param dpp Debug prefix provider + /// \param new_part_num New part to create + /// \param is_head True if part is to be new head + /// \param token Boost.Asio CompletionToken + /// + /// \return Nothing, but may error in a way appropriate to the + /// completion token. + template CompletionToken> + auto prepare_new_part(const DoutPrefixProvider* dpp, + std::int64_t new_part_num, bool is_head, + CompletionToken&& token) { + return asio::async_initiate + (asio::co_composed + ([](auto state, const DoutPrefixProvider* dpp, + std::int64_t new_part_num, bool is_head, FIFOImpl* f) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + + + ldpp_dout_fmt(dpp, 20, "prepare_new_part: entering"); + std::unique_lock l(f->m); + using enum fifo::journal_entry::Op; + std::vector jentries{{create, new_part_num}}; + if (f->info.journal.contains({create, new_part_num}) && + (!is_head || f->info.journal.contains({set_head, new_part_num}))) { + l.unlock(); + ldpp_dout_fmt(dpp, 5, "prepare_new_part, {} new part journaled, but not processed", + __LINE__); + co_await f->process_journal(dpp, asio::deferred); + co_return sys::error_code{}; + } + auto version = f->info.version; + + if (is_head) { + ldpp_dout_fmt(dpp, 20, "prepare_new_part, {}: needs new head", + __LINE__); + jentries.push_back({set_head, new_part_num}); + } + l.unlock(); + + bool canceled = true; + for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) { + canceled = false; + ldpp_dout_fmt(dpp, 20, "prepare_new_part, {}: updating metadata: i={}", + __LINE__, i); + auto u = fifo::update{}.journal_entries_add(jentries); + canceled = co_await f->update_meta(dpp, u, version, + asio::deferred); + if (canceled) { + std::unique_lock l(f->m); + version = f->info.version; + auto found = (f->info.journal.contains({create, new_part_num}) || + f->info.journal.contains({set_head, new_part_num})); + if ((f->info.max_push_part_num >= new_part_num && + f->info.head_part_num >= new_part_num)) { + ldpp_dout_fmt(dpp, 20, "prepare_new_part, {}: " + "raced, but journaled and processed: i={}", + __LINE__, i); + co_return sys::error_code{}; + } + if (found) { + ldpp_dout_fmt(dpp, 20, + "prepare_new_part, {}: " + "raced, journaled but not processed: i={}", + __LINE__, i); + canceled = false; + } + l.unlock(); + } + } + if (canceled) { + ldpp_dout_fmt(dpp, 5, "prepare_new_part, {}: " + "canceled too many times, giving up", + __LINE__); + throw sys::system_error{ECANCELED, sys::generic_category()}; + } + co_await f->process_journal(dpp, asio::deferred); + co_return sys::error_code{}; + } catch (const sys::system_error& e) { + ldpp_dout_fmt(dpp, 5, "prepare_new_part failed: {}", e.what()); + co_return e.code(); + } + }, rados.get_executor()), + token, dpp, new_part_num, is_head, this); + } + + /// \brief Set a new part as head + /// + /// In practice we create a new part and set it as head in one go. + /// + /// \param dpp Debug prefix provider + /// \param new_head_part_num New head part number + /// \param token Boost.Asio CompletionToken + /// + /// \return Nothing, but may error in a way appropriate to the + /// completion token. + template CompletionToken> + auto prepare_new_head(const DoutPrefixProvider* dpp, + std::int64_t new_head_part_num, + CompletionToken&& token) { + return asio::async_initiate + (asio::co_composed + ([](auto state, const DoutPrefixProvider* dpp, + std::int64_t new_head_part_num, FIFOImpl* f) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + + ldpp_dout_fmt(dpp, 20, "prepare_new_head: entering"); + std::unique_lock l(f->m); + auto max_push_part_num = f->info.max_push_part_num; + auto version = f->info.version; + l.unlock(); + + if (max_push_part_num < new_head_part_num) { + ldpp_dout_fmt(dpp, 20, "prepare_new_head, {}: need new part", + __LINE__); + co_await f->prepare_new_part(dpp, new_head_part_num, true, + asio::deferred); + std::unique_lock l(f->m); + if (f->info.max_push_part_num < new_head_part_num) { + ldpp_dout_fmt(dpp, 1, "prepare_new_head, {}: inconsistency, " + "push part less than head part.", + __LINE__); + throw sys::system_error(EIO, sys::generic_category()); + } + l.unlock(); + } + + using enum fifo::journal_entry::Op; + fifo::journal_entry jentry; + jentry.op = set_head; + jentry.part_num = new_head_part_num; + + bool canceled = true; + for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) { + canceled = false; + ldpp_dout_fmt(dpp, 20, "prepare_new_head, {}: " + "updating metadata: i={}", __LINE__, i); + auto u = fifo::update{}.journal_entries_add({{jentry}}); + canceled = co_await f->update_meta(dpp, u, version, + asio::deferred); + if (canceled) { + std::unique_lock l(f->m); + auto found = + (f->info.journal.contains({create, new_head_part_num}) || + f->info.journal.contains({set_head, new_head_part_num})); + version = f->info.version; + if ((f->info.head_part_num >= new_head_part_num)) { + ldpp_dout_fmt(dpp, 20, "prepare_new_head, {}: raced, " + "but journaled and processed: i={}", + __LINE__, i); + co_return sys::error_code{}; + } + if (found) { + ldpp_dout_fmt(dpp, 20, + "prepare_new_head, {}: raced, " + "journaled but not processed: i={}", + __LINE__, i); + canceled = false; + } + l.unlock(); + } + } + if (canceled) { + ldpp_dout_fmt(dpp, 5, "prepare_new_head, {}: " + "canceled too many times, giving up", + __LINE__); + throw sys::system_error(ECANCELED, sys::generic_category()); + } + co_await f->process_journal(dpp, asio::deferred); + co_return sys::error_code{}; + } catch (const sys::system_error& e) { + ldpp_dout_fmt(dpp, 3, "prepare_new_head failed: {}", e.what()); + co_return e.code(); + } + }, rados.get_executor()), + token, dpp, new_head_part_num, this); + } + + ///@} + +public: + + /// \brief Open an existing FIFO + /// + /// \param dpp Prefix provider for debug logging + /// \param objv Operation will fail if FIFO is not at this version + /// \param probe If true, the caller is probing the existence of the + /// FIFO. Don't print errors if we can't find it. + /// \param token Boost.Asio CompletionToken + /// + /// \return A `unique_ptr` to the open FIFO in a way appropriate to + /// the completion token. + template CompletionToken> + auto do_open(const DoutPrefixProvider* dpp, + std::optional objv, + bool probe, CompletionToken&& token) { + return asio::async_initiate + (asio::co_composed + ([](auto state, const DoutPrefixProvider* dpp, + std::optional objv, bool probe, + FIFOImpl* f) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + ldpp_dout_fmt(dpp, 20, "do_open: entering"); + std::tie(f->info, f->part_header_size, f->part_entry_overhead) + = co_await get_meta(f->rados, f->obj, f->ioc, objv, + asio::deferred); + probe = 0; + + // If there are journal entries, process them, in case + // someone crashed mid-transaction. + if (!f->info.journal.empty()) { + ldpp_dout_fmt(dpp, 20, "do_open, {}: processing leftover journal", + __LINE__); + co_await f->process_journal(dpp, asio::deferred); + } + co_return sys::error_code{}; + } catch (const sys::system_error& e) { + if (!probe || + (probe && !(e.code() == sys::errc::no_such_file_or_directory || + e.code() == sys::errc::no_message_available))) { + ldpp_dout_fmt(dpp, 5, "do_open failed: {}:", e.what()); + } + co_return e.code(); + } + }, rados.get_executor()), + token, dpp, std::move(objv), probe, this); + } + + template CompletionToken> + auto do_create(const DoutPrefixProvider* dpp, + std::optional objv, + std::optional oid_prefix, + bool exclusive, + std::uint64_t max_part_size, + std::uint64_t max_entry_size, + CompletionToken&& token) { + return asio::async_initiate + (asio::co_composed + ([](auto state, const DoutPrefixProvider* dpp, + std::optional objv, + std::optional oid_prefix, + bool exclusive, + std::uint64_t max_part_size, + std::uint64_t max_entry_size, + FIFOImpl* f) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + ldpp_dout_fmt(dpp, 20, "do_create: entering"); + co_await create_meta(f->rados, f->obj, f->ioc, objv, oid_prefix, + exclusive, max_part_size, max_entry_size, + asio::deferred); + co_await f->do_open(dpp, objv, false, asio::deferred); + + co_return sys::error_code{}; + } catch (const sys::system_error& e) { + ldpp_dout_fmt(dpp, 5, "do_create failed: {}", e.what()); + co_return e.code(); + } + }, rados.get_executor()), + token, dpp, std::move(objv), oid_prefix, exclusive, max_part_size, + max_entry_size, this); + } + +public: + + /// \brief Push entries to the FIFO + /// + /// \param dpp Prefix provider for debug logging + /// \param entries Vector of entries + /// \param token Boost.Asio CompletionToken + /// + /// \return Nothing, but may error in a way appropriate to the + /// completion token. + template CompletionToken> + auto push(const DoutPrefixProvider* dpp, + std::deque entries, + CompletionToken&& token) { + return asio::async_initiate + (asio::co_composed + ([](auto state, const DoutPrefixProvider* dpp, + std::deque remaining, FIFOImpl* f) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + + std::unique_lock l(f->m); + auto max_entry_size = f->info.params.max_entry_size; + auto need_new_head = f->info.need_new_head(); + auto head_part_num = f->info.head_part_num; + l.unlock(); + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering" << dendl; + if (remaining.empty()) { + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " empty push, returning success" << dendl; + co_return sys::error_code{}; + } + + // Validate sizes + for (const auto& bl : remaining) { + if (bl.length() > max_entry_size) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entry bigger than max_entry_size" + << dendl; + co_return sys::error_code{E2BIG, sys::generic_category()}; + } + } + + if (need_new_head) { + ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " need new head: " + << head_part_num + 1 << dendl; + co_await f->prepare_new_head(dpp, head_part_num + 1, asio::deferred); + } + + std::deque batch; + + uint64_t batch_len = 0; + auto retries = 0; + bool canceled = true; + while ((!remaining.empty() || !batch.empty()) && + (retries <= MAX_RACE_RETRIES)) { + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " preparing push: remaining=" << remaining.size() + << " batch=" << batch.size() << " retries=" << retries + << dendl; + std::unique_lock l(f->m); + head_part_num = f->info.head_part_num; + auto max_part_size = f->info.params.max_part_size; + auto overhead = f->part_entry_overhead; + l.unlock(); + + while (!remaining.empty() && + (remaining.front().length() + batch_len <= max_part_size)) { + /* We can send entries with data_len up to max_entry_size, + however, we want to also account the overhead when + dealing with multiple entries. Previous check doesn't + account for overhead on purpose. */ + batch_len += remaining.front().length() + overhead; + batch.push_back(std::move(remaining.front())); + remaining.pop_front(); + } + ldpp_dout(dpp, 20) + << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " prepared push: remaining=" << remaining.size() + << " batch=" << batch.size() << " retries=" << retries + << " batch_len=" << batch_len << dendl; + + auto [ec, n] = + co_await f->push_entries(dpp, batch, + asio::as_tuple(asio::deferred)); + if (ec == sys::errc::result_out_of_range) { + canceled = true; + ++retries; + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " need new head " << head_part_num + 1 + << dendl; + co_await f->prepare_new_head(dpp, head_part_num + 1, + asio::deferred); + continue; + } else if (ec == sys::errc::no_such_file_or_directory) { + ldpp_dout(dpp, 20) + << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " racing client trimmed part, rereading metadata " + << dendl; + canceled = true; + ++retries; + co_await f->read_meta(dpp, asio::deferred); + continue; + } else if (ec) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " push_entries failed: " << ec.message() + << dendl; + throw sys::system_error(ec); + } + assert(n >= 0); + // Made forward progress! + canceled = false; + retries = 0; + batch_len = 0; + if (n == ssize(batch)) { + batch.clear(); + } else { + batch.erase(batch.begin(), batch.begin() + n); + for (const auto& b : batch) { + batch_len += b.length() + overhead; + } + } + } + if (canceled) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " canceled too many times, giving up." + << dendl; + co_return sys::error_code{ECANCELED, sys::generic_category()}; + } + co_return sys::error_code{}; + } catch (const sys::system_error& e) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " push failed: " << e.what() << dendl; + co_return e.code(); + } + }, rados.get_executor()), + token, dpp, std::move(entries), this); + } + + /// \brief List entries in the FIFO + /// + /// \param dpp Prefix provider for debug logging + /// \param markstr Marker to resume listing + /// \param entries Space for entries + /// \param token Boost.Asio CompletionToken + /// + /// \return (span, marker) where the span is long enough to hold + /// returned entries, and marker is non-null if the listing was + /// incomplete, in a way appropriate to the completion token. + template, + std::string)> CompletionToken> + auto list(const DoutPrefixProvider* dpp, + std::string markstr, std::span entries, + CompletionToken&& token) { + return asio::async_initiate, + std::string)> + (asio::co_composed, + std::string)> + ([](auto state, const DoutPrefixProvider* dpp, + std::string markstr, std::span entries, + FIFOImpl* f) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering" << dendl; + std::unique_lock l(f->m); + std::int64_t part_num = f->info.tail_part_num; + std::uint64_t ofs = 0; + if (!markstr.empty()) { + auto marker = f->to_marker(markstr, l); + if (!marker) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " invalid marker string: " << markstr + << dendl; + throw sys::system_error{EINVAL, sys::generic_category()}; + } + part_num = marker->num; + ofs = marker->ofs; + } + l.unlock(); + + bool more = false; + + auto entries_left = entries; + + while (entries_left.size() > 0) { + more = false; + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entries_left.size()=" + << entries_left.size() << dendl; + auto [ec, res, part_more, part_full] = + co_await f->list_part(dpp, part_num, ofs, entries_left, + asio::as_tuple(asio::deferred)); + if (ec == sys::errc::no_such_file_or_directory) { + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " missing part, rereading metadata" + << dendl; + co_await f->read_meta(dpp, asio::deferred); + std::unique_lock l(f->m); + if (part_num < f->info.tail_part_num) { + /* raced with trim? restart */ + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " raced with trim, restarting" << dendl; + entries_left = entries; + part_num = f->info.tail_part_num; + l.unlock(); + ofs = 0; + continue; + } + l.unlock(); + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " assuming part was not written yet, " + << "so end of data" << dendl; + break; + } else if (ec) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " list_entries failed: " << ec.message() + << dendl; + throw sys::system_error(ec); + } + more = part_full || part_more; + entries_left = entries_left.last(entries_left.size() - res.size()); + + if (!part_full) { + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " head part is not full, so we can assume " + << "we're done" << dendl; + break; + } + if (!part_more) { + ++part_num; + ofs = 0; + } + } + std::string marker; + if (entries_left.size() > 0) { + entries = entries.first(entries.size() - entries_left.size()); + } + if (more && !entries.empty()) { + marker = entries.back().marker; + } + co_return {sys::error_code{}, std::move(entries), std::move(marker)}; + } catch (const sys::system_error& e) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " list failed: " << e.what() << dendl; + co_return {e.code(), std::span{}, std::string{}}; + } + }, rados.get_executor()), + token, dpp, std::move(markstr), std::move(entries), this); + } + + /// \brief Trim entries from the FIFO + /// + /// \param dpp Prefix provider for debug logging + /// \param marker Marker to which to trim + /// \param exclusive If true, exclude the marked element from trim, + /// if false, trim it. + /// \param token Boost.Asio CompletionToken + /// + /// \return Nothing, but may error in a way appropriate to the + /// completion token. + template CompletionToken> + auto trim(const DoutPrefixProvider* dpp, + std::string marker, bool exclusive, + CompletionToken&& token) { + return asio::async_initiate + (asio::co_composed + ([](auto state, const DoutPrefixProvider* dpp, + std::string markstr, bool exclusive, FIFOImpl* f) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering" << dendl; + bool overshoot = false; + std::unique_lock l(f->m); + auto marker = f->to_marker(markstr, l); + if (!marker) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " invalid marker string: " << markstr + << dendl; + throw sys::system_error{EINVAL, sys::generic_category()}; + } + auto part_num = marker->num; + auto ofs = marker->ofs; + auto hn = f->info.head_part_num; + const auto max_part_size = f->info.params.max_part_size; + if (part_num > hn) { + l.unlock(); + co_await f->read_meta(dpp, asio::deferred); + l.lock(); + hn = f->info.head_part_num; + if (part_num > hn) { + overshoot = true; + part_num = hn; + ofs = max_part_size; + } + } + if (part_num < f->info.tail_part_num) { + throw sys::system_error(ENODATA, sys::generic_category()); + } + auto pn = f->info.tail_part_num; + l.unlock(); + + while (pn < part_num) { + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " pn=" << pn << dendl; + auto [ec] = co_await f->trim_part(dpp, pn, max_part_size, false, + asio::as_tuple(asio::deferred)); + if (ec && ec == sys::errc::no_such_file_or_directory) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " trim_part failed: " << ec.message() + << dendl; + throw sys::system_error(ec); + } + ++pn; + } + + auto [ec] = co_await f->trim_part(dpp, pn, ofs, exclusive, + asio::as_tuple(asio::deferred)); + if (ec && ec == sys::errc::no_such_file_or_directory) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " trim_part failed: " << ec.message() + << dendl; + throw sys::system_error(ec); + } + + l.lock(); + auto tail_part_num = f->info.tail_part_num; + auto objv = f->info.version; + l.unlock(); + bool canceled = tail_part_num < part_num; + int retries = 0; + while ((tail_part_num < part_num) && + canceled && + (retries <= MAX_RACE_RETRIES)) { + canceled = co_await f->update_meta(dpp, fifo::update{} + .tail_part_num(part_num), + objv, asio::deferred); + if (canceled) { + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " canceled: retries=" << retries + << dendl; + l.lock(); + tail_part_num = f->info.tail_part_num; + objv = f->info.version; + l.unlock(); + ++retries; + } + } + if (canceled) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " canceled too many times, giving up" + << dendl; + throw sys::system_error(EIO, sys::generic_category()); + } + co_return (overshoot ? + sys::error_code{ENODATA, sys::generic_category()} : + sys::error_code{}); + } catch (const sys::system_error& e) { + if (ceph::from_error_code(e.code()) != -ENODATA) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " trim failed: " << e.what() << dendl; + } + co_return e.code(); + } + }, rados.get_executor()), + token, dpp, std::move(marker), exclusive, this); + } + + /// \brief Get information on the last entry + /// + /// \param dpp Prefix provider for debug logging + /// \param token Boost.Asio CompletionToken + /// + /// \return {marker, time} for the latest entry in a way appropriate + /// to the completion token. + template + CompletionToken> + auto last_entry_info(const DoutPrefixProvider* dpp, + CompletionToken&& token) { + return asio::async_initiate< + CompletionToken, void(sys::error_code, std::string, ceph::real_time)> + (asio::co_composed + ([](auto state, const DoutPrefixProvider* dpp, + FIFOImpl* f) -> void { + try { + state.throw_if_cancelled(true); + state.reset_cancellation_state(asio::enable_terminal_cancellation()); + + co_await f->read_meta(dpp, asio::deferred); + std::unique_lock l(f->m); + auto head_part_num = f->info.head_part_num; + l.unlock(); + + if (head_part_num < 0) { + co_return {sys::error_code{}, std::string{}, + ceph::real_clock::zero()}; + } else { + auto header = + co_await f->get_part_info(head_part_num, asio::deferred); + co_return {sys::error_code{}, + marker{head_part_num, header.last_ofs}.to_string(), + header.max_time}; + } + } catch (const sys::system_error& e) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " read failed: " << e.what() << dendl; + co_return {e.code(), std::string{}, ceph::real_time{}}; + } + }, rados.get_executor()), + token, dpp, this); + } + + using executor_type = RADOS::executor_type; + executor_type get_executor() { + return rados.get_executor(); + } +}; +} // namespace detail +} // namespace neorados::clas::fifo diff --git a/src/neorados/cls/fifo/entry.h b/src/neorados/cls/fifo/entry.h new file mode 100644 index 00000000000..6720dd71f14 --- /dev/null +++ b/src/neorados/cls/fifo/entry.h @@ -0,0 +1,39 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM + * + * See file COPYING for license information. + * + */ + +/// \file neorados/cls/fifo/entry.h +/// +/// \brief Entries returned by FIFO list +/// +/// Part of the public FIFO interface but needed by `FIFOImpl`. + +#pragma once + +#include "include/buffer.h" +#include "common/ceph_time.h" + +namespace neorados::cls::fifo { +/// Entries to be returned by the list operation +struct entry { + /// Data stored in the entry + ceph::buffer::list data; + /// Marker (for trimming, continuing list operations) + std::string marker; + /// Time stored (set by the OSD) + ceph::real_time mtime; +}; + +inline std::ostream& operator <<(std::ostream& m, const entry& e) { + return m << "[data: " << e.data + << ", marker: " << e.marker + << ", mtime: " << e.mtime << "]"; +} +} // neorados::cls::fifo diff --git a/src/test/cls_fifo/ceph_test_neocls_fifo.cc b/src/test/cls_fifo/ceph_test_neocls_fifo.cc index 8b9accf9d01..042b6023d5c 100644 --- a/src/test/cls_fifo/ceph_test_neocls_fifo.cc +++ b/src/test/cls_fifo/ceph_test_neocls_fifo.cc @@ -13,7 +13,6 @@ #include #include -#include #include #include #include @@ -37,45 +36,55 @@ namespace asio = boost::asio; namespace sys = boost::system; namespace buffer = ceph::buffer; -namespace fifo = neorados::cls::fifo; using namespace std::literals; namespace neorados::cls::fifo { +namespace fifo = rados::cls::fifo; + class FIFOtest { public: template - static auto create_meta(Args&&... args) { - return FIFO::create_meta(std::forward(args)...); + static auto create_meta(neorados::RADOS rados, Args&&... args) { + return detail::FIFOImpl::create_meta(rados, + std::forward(args)...); } template - static auto get_meta(Args&&... args) { - return FIFO::get_meta(std::forward(args)...); + static auto get_meta(neorados::RADOS rados, Args&&... args) { + return FIFO::get_meta(rados, std::forward(args)...); } - template - static auto read_meta(fifo::FIFO& f, Args&&... args) { - return f.read_meta(std::forward(args)...); - } - static auto meta(fifo::FIFO& f) { - return f.info; + template CompletionToken> + static auto read_meta(FIFO& f, const DoutPrefixProvider* dpp, + CompletionToken&& token) { + return f.impl->read_meta(dpp, + f.consign(std::forward(token))); } - template - static auto get_part_info(fifo::FIFO& f, Args&&... args) { - return f.get_part_info(std::forward(args)...); + static auto meta(FIFO& f) { + return f.impl->info; } - static auto get_part_layout_info(fifo::FIFO& f) { - return std::make_tuple(f.part_header_size, f.part_entry_overhead); + template CompletionToken> + static auto get_part_info(FIFO& f, std::int64_t num, CompletionToken&& token) { + return f.impl->get_part_info(num,f.consign( + std::forward(token))); } + static auto get_part_layout_info(FIFO& f) { + return std::make_tuple( + f.impl->part_header_size, + f.impl->part_entry_overhead); + } }; } -using fifo::FIFOtest; +using neorados::cls::fifo::FIFO; +using neorados::cls::fifo::FIFOtest; +using neorados::cls::fifo::entry; auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT); const DoutPrefix dp(cct, 1, "test legacy cls fifo: "); @@ -88,19 +97,19 @@ CORO_TEST_F(cls_fifo, create, NeoRadosTest) sys::error_code ec; co_await FIFOtest::create_meta(rados(), fifo_id, pool(), std::nullopt, std::nullopt, false, 0, - fifo::FIFO::default_max_entry_size, + FIFO::default_max_entry_size, asio::redirect_error(asio::use_awaitable, ec)); EXPECT_EQ(sys::errc::invalid_argument, ec); co_await FIFOtest::create_meta(rados(), fifo_id, pool(), std::nullopt, std::nullopt, false, - fifo::FIFO::default_max_part_size, 0, + FIFO::default_max_part_size, 0, asio::redirect_error(asio::use_awaitable, ec)); EXPECT_EQ(sys::errc::invalid_argument, ec); co_await FIFOtest::create_meta(rados(), fifo_id, pool(), std::nullopt, std::nullopt, false, - fifo::FIFO::default_max_part_size, - fifo::FIFO::default_max_entry_size, + FIFO::default_max_part_size, + FIFO::default_max_entry_size, asio::redirect_error(asio::use_awaitable, ec)); EXPECT_FALSE(ec); neorados::ReadOp op; @@ -111,30 +120,31 @@ CORO_TEST_F(cls_fifo, create, NeoRadosTest) /* test idempotency */ co_await FIFOtest::create_meta(rados(), fifo_id, pool(), std::nullopt, std::nullopt, false, - fifo::FIFO::default_max_part_size, - fifo::FIFO::default_max_entry_size, + FIFO::default_max_part_size, + FIFO::default_max_entry_size, asio::redirect_error(asio::use_awaitable, ec)); EXPECT_FALSE(ec); } CORO_TEST_F(cls_fifo, get_info, NeoRadosTest) { + auto r = rados(); std::string_view fifo_id = "fifo"; - co_await FIFOtest::create_meta(rados(), fifo_id, pool(), std::nullopt, + co_await FIFOtest::create_meta(r, fifo_id, pool(), std::nullopt, std::nullopt, false, - fifo::FIFO::default_max_part_size, - fifo::FIFO::default_max_entry_size, + FIFO::default_max_part_size, + FIFO::default_max_entry_size, asio::use_awaitable); auto [info, part_header_size, part_entry_overhead] = - co_await FIFOtest::get_meta(rados(), fifo_id, pool(), std::nullopt, + co_await FIFOtest::get_meta(r, fifo_id, pool(), std::nullopt, asio::use_awaitable); EXPECT_GT(part_header_size, 0); EXPECT_GT(part_entry_overhead, 0); EXPECT_FALSE(info.version.instance.empty()); std::tie(info, part_header_size, part_entry_overhead) = - co_await FIFOtest::get_meta(rados(), fifo_id, pool(), info.version, + co_await FIFOtest::get_meta(r, fifo_id, pool(), info.version, asio::use_awaitable); @@ -142,20 +152,20 @@ CORO_TEST_F(cls_fifo, get_info, NeoRadosTest) objv.instance = "foo"; objv.ver = 12; - sys::error_code ec; - std::tie(info, part_header_size, part_entry_overhead) = - co_await FIFOtest::get_meta(rados(), fifo_id, pool(), objv, - asio::redirect_error(asio::use_awaitable, ec)); + EXPECT_THROW({ + std::tie(info, part_header_size, part_entry_overhead) = + co_await FIFOtest::get_meta(r, fifo_id, pool(), objv, + asio::use_awaitable); - EXPECT_EQ(sys::errc::operation_canceled, ec); + }, sys::system_error); } CORO_TEST_F(fifo, open_default, NeoRadosTest) { std::string_view fifo_id = "fifo"; - auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(), - asio::use_awaitable); + auto f = co_await FIFO::create(&dp, rados(), fifo_id, pool(), + asio::use_awaitable); // force reading from backend co_await FIFOtest::read_meta(*f, &dp, asio::use_awaitable); auto info = FIFOtest::meta(*f); @@ -173,10 +183,10 @@ CORO_TEST_F(fifo, open_params, NeoRadosTest) objv.instance = "fooz"s; objv.ver = 10; - auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(), - asio::use_awaitable, - objv, oid_prefix, false, - max_part_size, max_entry_size); + auto f = co_await FIFO::create(&dp, rados(), fifo_id, pool(), + asio::use_awaitable, + objv, oid_prefix, false, + max_part_size, max_entry_size); // force reading from backend co_await FIFOtest::read_meta(*f, &dp, asio::use_awaitable); @@ -188,7 +198,7 @@ CORO_TEST_F(fifo, open_params, NeoRadosTest) } template -inline T decode_entry(const fifo::entry& entry) +inline T decode_entry(const entry& entry) { T val; auto iter = entry.data.cbegin(); @@ -200,8 +210,8 @@ CORO_TEST_F(fifo, push_list_trim, NeoRadosTest) { std::string_view fifo_id = "fifo"; - auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(), - asio::use_awaitable); + auto f = co_await FIFO::create(&dp, rados(), fifo_id, pool(), + asio::use_awaitable); static constexpr auto max_entries = 10u; for (auto i = 0u; i < max_entries; ++i) { buffer::list bl; @@ -209,7 +219,7 @@ CORO_TEST_F(fifo, push_list_trim, NeoRadosTest) co_await f->push(&dp, std::move(bl), asio::use_awaitable); } - std::array entries; + std::array entries; /* get entries one by one */ std::string inmark; for (auto i = 0u; i < max_entries; ++i) { @@ -264,10 +274,10 @@ CORO_TEST_F(fifo, push_too_big, NeoRadosTest) static constexpr auto max_entry_size = 128ull; std::string_view fifo_id = "fifo"; - auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(), - asio::use_awaitable, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); + auto f = co_await FIFO::create(&dp, rados(), fifo_id, pool(), + asio::use_awaitable, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); std::array buf; buf.fill('\0'); @@ -285,10 +295,10 @@ CORO_TEST_F(fifo, multiple_parts, NeoRadosTest) static constexpr auto max_entry_size = 128ull; std::string_view fifo_id = "fifo"; - auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(), - asio::use_awaitable, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); + auto f = co_await FIFO::create(&dp, rados(), fifo_id, pool(), + asio::use_awaitable, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); std::array buf; buf.fill('\0'); const auto [part_header_size, part_entry_overhead] = @@ -309,7 +319,7 @@ CORO_TEST_F(fifo, multiple_parts, NeoRadosTest) /* head should have advanced */ EXPECT_GT(info.head_part_num, 0); - std::vector entries{max_entries}; + std::vector entries{max_entries}; { /* list all at once */ auto [result, marker] = co_await f->list(&dp, {}, entries, @@ -327,7 +337,7 @@ CORO_TEST_F(fifo, multiple_parts, NeoRadosTest) std::string marker; /* get entries one by one */ for (auto i = 0u; i < max_entries; ++i) { - std::span result; + std::span result; std::tie(result, marker) = co_await f->list(&dp, marker, std::span(entries).first(1), asio::use_awaitable); @@ -345,7 +355,7 @@ CORO_TEST_F(fifo, multiple_parts, NeoRadosTest) marker.clear(); for (auto i = 0u; i < max_entries; ++i) { /* read single entry */ - std::span result; + std::span result; std::tie(result, marker) = co_await f->list(&dp, {}, std::span(entries).first(1), asio::use_awaitable); @@ -389,10 +399,10 @@ CORO_TEST_F(fifo, two_pushers, NeoRadosTest) static constexpr auto max_entry_size = 128ull; std::string_view fifo_id = "fifo"; - auto f1 = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(), - asio::use_awaitable, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); + auto f1 = co_await FIFO::create(&dp, rados(), fifo_id, pool(), + asio::use_awaitable, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); std::array buf; buf.fill('\0'); const auto [part_header_size, part_entry_overhead] = @@ -402,7 +412,7 @@ CORO_TEST_F(fifo, two_pushers, NeoRadosTest) const auto max_entries = entries_per_part * 4 + 1; - auto f2 = co_await fifo::FIFO::open(&dp, rados(), fifo_id, pool(), + auto f2 = co_await FIFO::open(&dp, rados(), fifo_id, pool(), asio::use_awaitable); std::vector fifos{f1.get(), f2.get()}; @@ -415,7 +425,7 @@ CORO_TEST_F(fifo, two_pushers, NeoRadosTest) } /* list all by both */ - std::vector entries{max_entries}; + std::vector entries{max_entries}; auto [result, marker] = co_await f1->list(&dp, {}, entries, asio::use_awaitable); EXPECT_TRUE(marker.empty()); @@ -438,10 +448,10 @@ CORO_TEST_F(fifo, two_pushers_trim, NeoRadosTest) static constexpr auto max_entry_size = 128ull; std::string_view fifo_id = "fifo"; - auto f1 = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(), - asio::use_awaitable, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); + auto f1 = co_await FIFO::create(&dp, rados(), fifo_id, pool(), + asio::use_awaitable, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); std::array buf; buf.fill('\0'); const auto [part_header_size, part_entry_overhead] = @@ -451,8 +461,8 @@ CORO_TEST_F(fifo, two_pushers_trim, NeoRadosTest) const auto max_entries = entries_per_part * 4 + 1; - auto f2 = co_await fifo::FIFO::open(&dp, rados(), fifo_id, pool(), - asio::use_awaitable); + auto f2 = co_await FIFO::open(&dp, rados(), fifo_id, pool(), + asio::use_awaitable); /* push one entry to f2 and the rest to f1 */ for (auto i = 0u; i < max_entries; ++i) { buffer::list bl; @@ -464,7 +474,7 @@ CORO_TEST_F(fifo, two_pushers_trim, NeoRadosTest) /* trim half by fifo1 */ auto num = max_entries / 2; - std::vector entries{max_entries}; + std::vector entries{max_entries}; auto [result, marker] = co_await f1->list(&dp, {}, std::span(entries).first(num), asio::use_awaitable); @@ -501,10 +511,10 @@ CORO_TEST_F(fifo, push_batch, NeoRadosTest) static constexpr auto max_entry_size = 128ull; std::string_view fifo_id = "fifo"; - auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(), - asio::use_awaitable, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); + auto f = co_await FIFO::create(&dp, rados(), fifo_id, pool(), + asio::use_awaitable, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); std::array buf; buf.fill('\0'); const auto [part_header_size, part_entry_overhead] = @@ -525,7 +535,7 @@ CORO_TEST_F(fifo, push_batch, NeoRadosTest) co_await f->push(&dp, std::move(bufs), asio::use_awaitable); /* list all */ - std::vector entries{max_entries}; + std::vector entries{max_entries}; auto [result, marker] = co_await f->list(&dp, {}, entries, asio::use_awaitable); EXPECT_TRUE(marker.empty()); @@ -541,8 +551,8 @@ CORO_TEST_F(fifo, push_batch, NeoRadosTest) CORO_TEST_F(fifo, trim_exclusive, NeoRadosTest) { std::string_view fifo_id = "fifo"; - auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(), - asio::use_awaitable); + auto f = co_await FIFO::create(&dp, rados(), fifo_id, pool(), + asio::use_awaitable); static constexpr auto max_entries = 10u; for (auto i = 0u; i < max_entries; ++i) { @@ -551,7 +561,7 @@ CORO_TEST_F(fifo, trim_exclusive, NeoRadosTest) co_await f->push(&dp, std::move(bl), asio::use_awaitable); } - std::array entries; + std::array entries; auto [result, marker] = co_await f->list(&dp, {}, std::span{entries}.first(1), asio::use_awaitable); auto val = decode_entry(result.front()); @@ -582,8 +592,8 @@ CORO_TEST_F(fifo, trim_exclusive, NeoRadosTest) CORO_TEST_F(fifo, trim_all, NeoRadosTest) { std::string_view fifo_id = "fifo"; - auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(), - asio::use_awaitable); + auto f = co_await FIFO::create(&dp, rados(), fifo_id, pool(), + asio::use_awaitable); static constexpr auto max_entries = 10u; for (auto i = 0u; i < max_entries; ++i) { @@ -597,7 +607,7 @@ CORO_TEST_F(fifo, trim_all, NeoRadosTest) asio::redirect_error(asio::use_awaitable, ec)); EXPECT_EQ(sys::errc::no_message_available, ec); - std::array entries; + std::array entries; auto [result, marker] = co_await f->list(&dp, {}, entries, asio::use_awaitable); EXPECT_TRUE(result.empty()); @@ -611,9 +621,9 @@ TEST(neocls_fifo_bare, lambdata) std::optional rados; neorados::IOContext pool; std::string_view fifo_id = "fifo"; - std::unique_ptr f; + std::unique_ptr f; static constexpr auto max_entries = 10u; - std::array list_entries; + std::array list_entries; bool completed = false; neorados::RADOS::Builder{}.build( c, @@ -625,9 +635,9 @@ TEST(neocls_fifo_bare, lambdata) [&](sys::error_code ec, int64_t poolid) { ASSERT_FALSE(ec); pool.set_pool(poolid); - fifo::FIFO::create( + FIFO::create( &dp, *rados, fifo_id, pool, - [&](sys::error_code ec, std::unique_ptr f_) { + [&](sys::error_code ec, std::unique_ptr f_) { ASSERT_FALSE(ec); f = std::move(f_); std::array entries; @@ -640,7 +650,7 @@ TEST(neocls_fifo_bare, lambdata) ASSERT_FALSE(ec); f->list( &dp, {}, list_entries, - [&](sys::error_code ec, std::span result, + [&](sys::error_code ec, std::span result, std::string marker) { ASSERT_FALSE(ec); ASSERT_EQ(max_entries, result.size()); @@ -655,7 +665,7 @@ TEST(neocls_fifo_bare, lambdata) ASSERT_EQ(sys::errc::no_message_available, ec); f->list( &dp, {}, list_entries, - [&](sys::error_code ec, std::span result, + [&](sys::error_code ec, std::span result, std::string marker) { ASSERT_FALSE(ec); ASSERT_TRUE(result.empty()); -- 2.39.5