--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM
+ *
+ * See file COPYING for license information.
+ *
+ */
+
+#pragma once
+
+/// \file neorados/cls/fifo.h
+///
+/// \brief NeoRADOS interface to FIFO class
+///
+/// The `fifo` object class stores a queue structured log across
+/// multiple OSDs. Each FIFO comprises a head object with metadata
+/// such as the current head and tail objects, as well as a set of
+/// data objects, containing a bunch of entries. New entries may be
+/// pushed at the head and trimmed at the tail. Entries may be
+/// retrieved for processing with the `list` operation. Each entry
+/// comes with a marker that may be used to trim up to (inclusively)
+/// that object once processing is complete. The head object is the
+/// notional 'name' of the FIFO, provided at creation or opening time.
+
+#include <cerrno>
+#include <coroutine>
+#include <cstdint>
+#include <deque>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <span>
+#include <string>
+#include <string_view>
+#include <utility>
+#include <vector>
+
+#include <fmt/format.h>
+
+#include <boost/asio/as_tuple.hpp>
+#include <boost/asio/async_result.hpp>
+#include <boost/asio/deferred.hpp>
+
+#include <boost/asio/experimental/co_composed.hpp>
+
+#include <boost/system/error_code.hpp>
+#include <boost/system/system_error.hpp>
+
+#include "include/buffer.h"
+#include "include/neorados/RADOS.hpp"
+
+#include "common/debug.h"
+#include "common/strtol.h"
+
+#include "neorados/cls/common.h"
+
+#include "cls/fifo/cls_fifo_types.h"
+#include "cls/fifo/cls_fifo_ops.h"
+
+// Asio's co_composed generates spurious warnings when compiled with
+// -O0: the 'mismatched' `operator new` calls directly into the
+// matching `operator new`, returning its result.
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wmismatched-new-delete"
+
+namespace neorados::cls::fifo {
+/// Entries to be returned by the list operation
+struct entry {
+  /// Data stored in the entry
+  ceph::buffer::list data;
+  /// Marker (for trimming, continuing list operations)
+  /// Format is "<part num>:<offset>", zero-padded (see FIFO::marker).
+  std::string marker;
+  /// Time stored (set by the OSD)
+  ceph::real_time mtime;
+};
+
+/// Render an entry for debugging/logging.
+inline std::ostream& operator <<(std::ostream& m, const entry& e) {
+  return m << "[data: " << e.data
+           << ", marker: " << e.marker
+           << ", mtime: " << e.mtime << "]";
+}
+
+/// This is the FIFO client class. It handles the logic of keeping
+/// state synchronized with that on the server, journal processing,
+/// and multipart push.
+class FIFO {
+ friend class FIFOtest;
+  /// A marker is a part number and byte offset used to indicate an
+  /// entry.
+  struct marker {
+    /// Part number containing the entry
+    std::int64_t num = 0;
+    /// Byte offset of the entry within the part
+    std::uint64_t ofs = 0;
+
+    /// Default constructor
+    marker() = default;
+
+    /// Construct from part and offset
+    ///
+    /// \param num Part number
+    /// \param ofs Offset within the part
+    marker(std::int64_t num, std::uint64_t ofs) : num(num), ofs(ofs) {}
+
+    /// Return a string representation of the marker
+    ///
+    /// Zero-padded to fixed width so markers compare lexicographically;
+    /// const so it can be called on const markers and temporaries.
+    std::string to_string() const {
+      return fmt::format("{:0>20}:{:0>20}", num, ofs);
+    }
+  };
+
+  /// Maximum number of retries if we run into a conflict. Limited to
+  /// keep us from locking up if we never hit the end condition.
+  static constexpr auto MAX_RACE_RETRIES = 10;
+
+  /// RADOS handle
+  neorados::RADOS& rados;
+  /// Head object
+  const neorados::Object obj;
+  /// Locator
+  const neorados::IOContext ioc;
+  /// Total size of the part header.
+  std::uint32_t part_header_size;
+  /// The additional space required to store an entry, above the size
+  /// of the entry itself.
+  std::uint32_t part_entry_overhead;
+  /// Local copy of FIFO data
+  rados::cls::fifo::info info;
+
+  /// Mutex protecting local data;
+  /// guards info, part_header_size and part_entry_overhead.
+  std::mutex m;
+
+  /// Constructor
+  ///
+  /// Private: NOTE(review) instances appear to be produced by static
+  /// members of this class (e.g. after get_meta) — confirm against the
+  /// rest of the file.
+  ///
+  /// \param rados RADOS handle
+  /// \param obj Head object
+  /// \param ioc Locator
+  /// \param part_header_size Total size of the part header.
+  /// \param part_entry_overhead The additional space required to
+  ///                            store an entry, above the size of the
+  ///                            entry itself.
+  FIFO(neorados::RADOS& rados,
+       neorados::Object obj,
+       neorados::IOContext ioc,
+       std::uint32_t part_header_size,
+       std::uint32_t part_entry_overhead,
+       rados::cls::fifo::info info)
+    : rados(rados), obj(std::move(obj)), ioc(std::move(ioc)),
+      part_header_size(part_header_size),
+      part_entry_overhead(part_entry_overhead),
+      info(std::move(info)) {}
+
+ /// \name Primitives
+ ///
+ /// Low-level coroutininized operations in the FIFO objclass.
+ ///@{
+
+public:
+  /// \brief Retrieve FIFO metadata
+  ///
+  /// \param rados RADOS handle
+  /// \param obj Head object for FIFO
+  /// \param ioc Locator
+  /// \param objv Operation will fail if FIFO is not at this version
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return The metadata info, part header size, and entry overhead
+  /// in a way appropriate to the completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code, rados::cls::fifo::info,
+         uint32_t, uint32_t)> CompletionToken>
+  static auto get_meta(neorados::RADOS& rados,
+                       neorados::Object obj,
+                       neorados::IOContext ioc,
+                       std::optional<rados::cls::fifo::objv> objv,
+                       CompletionToken&& token) {
+    namespace fifo = rados::cls::fifo;
+    fifo::op::get_meta gm;
+    gm.version = objv;
+    // exec builds and submits the read op itself; the reply is
+    // unpacked into the completion signature's values.
+    return exec<fifo::op::get_meta_reply>(
+      rados, std::move(obj), std::move(ioc),
+      fifo::op::CLASS, fifo::op::GET_META, std::move(gm),
+      [](fifo::op::get_meta_reply&& ret) {
+	return std::make_tuple(std::move(ret.info),
+			       ret.part_header_size,
+			       ret.part_entry_overhead);
+      }, std::forward<CompletionToken>(token));
+  }
+
+private:
+  /// \brief Retrieve part info
+  ///
+  /// \param part_num Number of part to query
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return The part info in a way appropriate to the completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code, rados::cls::fifo::part_header)>
+  CompletionToken>
+  auto get_part_info(std::int64_t part_num,
+		     CompletionToken&& token) {
+    namespace fifo = rados::cls::fifo;
+    // Hold the lock only long enough to compute the part's object
+    // name from the local metadata.
+    std::unique_lock l(m);
+    Object part_oid = info.part_oid(part_num);
+    l.unlock();
+
+    return exec<fifo::op::get_part_info_reply>(
+      rados, std::move(part_oid), ioc,
+      fifo::op::CLASS, fifo::op::GET_PART_INFO,
+      fifo::op::get_part_info{},
+      [](fifo::op::get_part_info_reply&& ret) {
+	return std::move(ret.header);
+      }, std::forward<CompletionToken>(token));
+  }
+
+
+  /// \brief Create a new part object
+  ///
+  /// The part is initialized via INIT_PART with the FIFO's current
+  /// params.
+  ///
+  /// \param part_num Part number
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return Nothing, but may error in a way appropriate to the
+  /// completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code)> CompletionToken>
+  auto create_part(std::int64_t part_num, CompletionToken&& token) {
+    namespace fifo = rados::cls::fifo;
+    namespace buffer = ceph::buffer;
+    neorados::WriteOp op;
+    op.create(false); /* We don't need exclusivity, part_init ensures we're
+			 creating from the same journal entry. */
+    // Params and object name are read from local metadata under the lock.
+    std::unique_lock l(m);
+    fifo::op::init_part ip;
+    ip.params = info.params;
+    buffer::list in;
+    encode(ip, in);
+    op.exec(fifo::op::CLASS, fifo::op::INIT_PART, std::move(in));
+    auto oid = info.part_oid(part_num);
+    l.unlock();
+    return rados.execute(oid, ioc, std::move(op),
+			 std::forward<CompletionToken>(token));
+  }
+
+  /// \brief Remove a part object
+  ///
+  /// NOTE(review): ENOENT from the OSD is propagated to the caller;
+  /// process_journal tolerates it when replaying remove entries —
+  /// confirm that is the intent for other callers.
+  ///
+  /// \param part_num Part number
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return Nothing, but may error in a way appropriate to the
+  /// completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code)> CompletionToken>
+  auto remove_part(std::int64_t part_num, CompletionToken&& token) {
+    namespace fifo = rados::cls::fifo;
+    namespace buffer = ceph::buffer;
+    neorados::WriteOp op;
+    op.remove();
+    std::unique_lock l(m);
+    auto oid = info.part_oid(part_num);
+    l.unlock();
+    return rados.execute(oid, ioc, std::move(op),
+			 std::forward<CompletionToken>(token));
+  }
+
+  /// \brief Update objclass FIFO metadata
+  ///
+  /// \param objv Current metadata version (objclass will error on mismatch)
+  /// \param update Changes to make to metadata
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return Nothing, but may error in a way appropriate to the
+  /// completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code)> CompletionToken>
+  auto update_meta(const rados::cls::fifo::objv& objv,
+		   const rados::cls::fifo::update& update,
+		   CompletionToken&& token) {
+    namespace fifo = rados::cls::fifo;
+    namespace buffer = ceph::buffer;
+    neorados::WriteOp op;
+    fifo::op::update_meta um;
+
+    um.version = objv;
+    um.tail_part_num = update.tail_part_num();
+    um.head_part_num = update.head_part_num();
+    um.min_push_part_num = update.min_push_part_num();
+    um.max_push_part_num = update.max_push_part_num();
+    // NOTE(review): `update` is a const lvalue, so std::move yields a
+    // const rvalue that binds to the const accessor overloads — these
+    // lines copy rather than move. Confirm whether rvalue-qualified
+    // accessors were intended here.
+    um.journal_entries_add = std::move(update).journal_entries_add();
+    um.journal_entries_rm = std::move(update).journal_entries_rm();
+
+    buffer::list in;
+    encode(um, in);
+    op.exec(fifo::op::CLASS, fifo::op::UPDATE_META, std::move(in));
+    return rados.execute(obj, ioc, std::move(op),
+			 std::forward<CompletionToken>(token));
+  }
+
+  /// \brief Create FIFO head object
+  ///
+  /// \param rados RADOS handle
+  /// \param obj Head object for the FIFO
+  /// \param ioc Locator
+  /// \param objv Version (error if FIFO exists and this doesn't match)
+  /// \param oid_prefix Prefix for all object names
+  /// \param exclusive If true, error if object already exists
+  /// \param max_part_size Max size of part objects
+  /// \param max_entry_size Max size of entries
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return Nothing, but may error in a way appropriate to the
+  /// completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code)> CompletionToken>
+  static auto create_meta(neorados::RADOS& rados,
+			  neorados::Object obj,
+			  neorados::IOContext ioc,
+			  std::optional<rados::cls::fifo::objv> objv,
+			  std::optional<std::string> oid_prefix,
+			  bool exclusive,
+			  std::uint64_t max_part_size,
+			  std::uint64_t max_entry_size,
+			  CompletionToken&& token) {
+    namespace fifo = rados::cls::fifo;
+    namespace buffer = ceph::buffer;
+    neorados::WriteOp op;
+    fifo::op::create_meta cm;
+
+    cm.id = obj;
+    cm.version = objv;
+    cm.oid_prefix = oid_prefix;
+    cm.max_part_size = max_part_size;
+    cm.max_entry_size = max_entry_size;
+    cm.exclusive = exclusive;
+
+    buffer::list in;
+    encode(cm, in);
+    // Move the encoded payload, consistent with the other ops in this
+    // class; `in` is not used afterwards.
+    op.exec(fifo::op::CLASS, fifo::op::CREATE_META, std::move(in));
+    return rados.execute(std::move(obj), std::move(ioc), std::move(op),
+			 std::forward<CompletionToken>(token));
+  }
+
+  /// \brief Push some entries to a given part
+  ///
+  /// \param dpp Prefix for debug prints
+  /// \param entries Entries to push
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return The int reported by the PUSH_PART handler (presumably the
+  /// number of entries pushed — confirm), or an error, in a way
+  /// appropriate to the completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code, int)> CompletionToken>
+  auto push_entries(const DoutPrefixProvider* dpp,
+		    std::deque<ceph::buffer::list> entries,
+		    CompletionToken&& token) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    namespace fifo = rados::cls::fifo;
+    namespace buffer = ceph::buffer;
+    return asio::async_initiate<CompletionToken, void(sys::error_code, int)>
+      (asio::experimental::co_composed<void(sys::error_code, int)>
+       ([](auto state, const DoutPrefixProvider* dpp,
+	   std::deque<buffer::list> entries, FIFO* f) -> void {
+	 try {
+	   state.throw_if_cancelled(true);
+	   state.reset_cancellation_state(asio::enable_terminal_cancellation());
+
+	   // Target the current head part as recorded locally.
+	   std::unique_lock l(f->m);
+	   auto head_part_num = f->info.head_part_num;
+	   auto oid = f->info.part_oid(head_part_num);
+	   l.unlock();
+
+	   WriteOp op;
+	   op.assert_exists();
+
+	   fifo::op::push_part pp;
+
+	   pp.data_bufs = std::move(entries);
+	   pp.total_len = 0;
+
+	   for (const auto &bl : pp.data_bufs)
+	     pp.total_len += bl.length();
+
+	   buffer::list in;
+	   encode(pp, in);
+	   // Filled in by the exec callback below; returnvec() makes the
+	   // op's per-exec return value available to it.
+	   int pushes;
+	   op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in,
+		   [&pushes](sys::error_code, int r, const buffer::list &) {
+		     pushes = r;
+		   });
+	   op.returnvec();
+	   co_await f->rados.execute(std::move(oid), f->ioc, std::move(op),
+				     asio::deferred);
+	   co_return {sys::error_code{}, pushes};
+	 } catch (const sys::system_error& e) {
+	   ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " push entries failed: " << e.what() << dendl;
+	   co_return {e.code(), 0};
+	 }
+       }, rados.get_executor()),
+       token, dpp, std::move(entries), this);
+  }
+
+  /// \brief List entries from a given part
+  ///
+  /// \param dpp Prefix provider for debug logging
+  /// \param part_num Part number to list
+  /// \param ofs Offset from which to list
+  /// \param result Span giving space for results
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return A subspan containing results, a bool indicating whether there
+  ///         are more entries within the part, and a bool indicating whether
+  ///         the part is full all in a way appropriate to the completion
+  ///         token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code, std::span<entry>, bool, bool)>
+  CompletionToken>
+  auto list_part(const DoutPrefixProvider* dpp,
+		 std::int64_t part_num, std::uint64_t ofs,
+		 std::span<entry> result,
+		 CompletionToken&& token) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    namespace fifo = rados::cls::fifo;
+    namespace buffer = ceph::buffer;
+    return asio::async_initiate<
+      CompletionToken, void(sys::error_code, std::span<entry>, bool, bool)>
+      (asio::experimental::co_composed<
+        void(sys::error_code, std::span<entry>, bool, bool)>
+       ([](auto state, const DoutPrefixProvider* dpp, std::int64_t part_num,
+	   std::uint64_t ofs,std::span<entry> result,
+	   FIFO* f) -> void {
+	 try {
+	   state.throw_if_cancelled(true);
+	   state.reset_cancellation_state(asio::enable_terminal_cancellation());
+
+	   std::unique_lock l(f->m);
+	   auto oid = f->info.part_oid(part_num);
+	   l.unlock();
+
+	   ReadOp op;
+	   fifo::op::list_part lp;
+
+	   lp.ofs = ofs;
+	   lp.max_entries = result.size();
+
+	   buffer::list in;
+	   encode(lp, in);
+	   buffer::list bl;
+	   op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, &bl, nullptr);
+	   co_await f->rados.execute(oid, f->ioc, std::move(op), nullptr,
+				     asio::deferred);
+	   bool more, full_part;
+	   {
+	     // Reply layout: tag, entry count, entries, then the `more`
+	     // and `full_part` flags.
+	     ceph::buffer::list::const_iterator bi = bl.begin();
+	     DECODE_START(1, bi);
+	     std::string tag;
+	     decode(tag, bi);
+	     uint32_t len;
+	     decode(len, bi);
+	     // The OSD should never return more than requested; treat an
+	     // oversized count as a malformed reply.
+	     if (len > result.size()) {
+	       throw buffer::end_of_buffer{};
+	     }
+	     result = result.first(len);
+	     for (auto i = 0u; i < len; ++i) {
+	       fifo::part_list_entry entry;
+	       decode(entry, bi);
+	       result[i] = {.data = std::move(entry.data),
+			    .marker = marker{part_num, entry.ofs}.to_string(),
+			    .mtime = entry.mtime};
+	     }
+	     decode(more, bi);
+	     decode(full_part, bi);
+	     DECODE_FINISH(bi);
+	   }
+	   co_return {sys::error_code{}, std::move(result), more, full_part};
+	 } catch (const sys::system_error& e) {
+	   // ENOENT is not logged at error level; presumably expected
+	   // when the part object does not (yet) exist.
+	   if (e.code() != sys::errc::no_such_file_or_directory) {
+	     ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " list entries failed: " << e.what() << dendl;
+	   }
+	   co_return {e.code(), std::span<entry>{}, false, false};
+	 }
+       }, rados.get_executor()),
+       token, dpp, part_num, ofs, std::move(result), this);
+  }
+
+  /// \brief Trim entries on a given part
+  ///
+  /// \param dpp Prefix provider for debug logging
+  /// \param part_num Part to trim
+  /// \param ofs Offset to which to trim
+  /// \param exclusive If true, exclude end of range from trim
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return Possibly errors in a way appropriate to the completion
+  /// token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code)> CompletionToken>
+  auto trim_part(const DoutPrefixProvider* dpp,
+		 std::int64_t part_num,
+		 std::uint64_t ofs,
+		 bool exclusive,
+		 CompletionToken&& token) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    namespace fifo = rados::cls::fifo;
+    namespace buffer = ceph::buffer;
+    return asio::async_initiate<CompletionToken, void(sys::error_code)>
+      (asio::experimental::co_composed<void(sys::error_code)>
+       ([](auto state, const DoutPrefixProvider* dpp,
+	   int64_t part_num, uint64_t ofs, bool exclusive, FIFO* f) -> void {
+	 try {
+	   state.throw_if_cancelled(true);
+	   state.reset_cancellation_state(asio::enable_terminal_cancellation());
+
+	   std::unique_lock l(f->m);
+	   auto oid = f->info.part_oid(part_num);
+	   l.unlock();
+
+	   WriteOp op;
+	   fifo::op::trim_part tp;
+
+	   // exclusive=true leaves the entry at ofs in place (see
+	   // \param exclusive above).
+	   tp.ofs = ofs;
+	   tp.exclusive = exclusive;
+
+	   buffer::list in;
+	   encode(tp, in);
+	   op.exec(fifo::op::CLASS, fifo::op::TRIM_PART, in);
+	   co_await f->rados.execute(std::move(oid), f->ioc, std::move(op),
+				     asio::deferred);
+	   co_return sys::error_code{};
+	 } catch (const sys::system_error& e) {
+	   ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " trim part failed: " << e.what() << dendl;
+	   co_return e.code();
+	 }
+       }, rados.get_executor()),
+       token, dpp, part_num, ofs, exclusive, this);
+  }
+
+ ///@}
+
+ /// \name Logics
+ ///
+ /// Internal logic used to implement the public interface
+ ///
+ ///@{
+
+  /// \brief Convert a string to a marker
+  ///
+  /// This is not a free function since an empty string always refers
+  /// to the tail part.
+  ///
+  /// \param s String representation of the marker ("<num>:<ofs>")
+  /// \param l Ownership of mutex
+  ///
+  /// \returns The marker or nullopt if the string representation is invalid
+  std::optional<marker> to_marker(std::string_view s,
+				  std::unique_lock<std::mutex>& l) {
+    assert(l.owns_lock());
+    marker m;
+    // An empty string denotes the logical beginning: offset zero of
+    // the current tail part.
+    if (s.empty()) {
+      m.num = info.tail_part_num;
+      m.ofs = 0;
+      return m;
+    }
+
+    auto pos = s.find(':');
+    if (pos == s.npos) {
+      return std::nullopt;
+    }
+
+    auto num = s.substr(0, pos);
+    auto ofs = s.substr(pos + 1);
+
+    // Both halves must parse as the marker's integer types.
+    auto n = ceph::parse<decltype(m.num)>(num);
+    if (!n) {
+      return std::nullopt;
+    }
+    m.num = *n;
+    auto o = ceph::parse<decltype(m.ofs)>(ofs);
+    if (!o) {
+      return std::nullopt;
+    }
+    m.ofs = *o;
+    return m;
+  }
+
+  /// \brief Force re-read of metadata
+  ///
+  /// \param dpp Prefix provider for debug logging
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return Possibly errors in a way appropriate to the completion
+  /// token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code)> CompletionToken>
+  auto read_meta(const DoutPrefixProvider* dpp,
+		 CompletionToken&& token) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    return asio::async_initiate<CompletionToken, void(sys::error_code)>
+      (asio::experimental::co_composed<void(sys::error_code)>
+       ([](auto state, const DoutPrefixProvider* dpp,
+	   FIFO* f) -> void {
+	 try {
+	   state.throw_if_cancelled(true);
+	   state.reset_cancellation_state(asio::enable_terminal_cancellation());
+
+	   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " entering" << dendl;
+	   auto [info, part_header_size, part_entry_overhead]
+	     = co_await get_meta(f->rados, f->obj, f->ioc, std::nullopt,
+				 asio::deferred);
+	   // Only install the fetched metadata if it is at least as new
+	   // as our local copy — presumably guards against racing
+	   // read_metas completing out of order; confirm.
+	   std::unique_lock l(f->m);
+	   if (info.version.same_or_later(f->info.version)) {
+	     f->info = std::move(info);
+	     f->part_header_size = part_header_size;
+	     f->part_entry_overhead = part_entry_overhead;
+	   }
+	 } catch (const sys::system_error& e) {
+	   ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " read_meta failed: " << e.what() << dendl;
+	   co_return e.code();
+	 }
+	 co_return sys::error_code{};
+       }, rados.get_executor()),
+       token, dpp, this);
+  }
+
+  /// \brief Apply an update to a local copy of the FIFO metadata
+  ///
+  /// Synchronous; takes the mutex itself, so the caller must not
+  /// already hold it.
+  ///
+  /// \param dpp Debugging prefix provider
+  /// \param info Local metadata to update
+  /// \param objv Version the update was computed against (must match
+  ///             info's current version)
+  /// \param update Changes to make to metadata
+  ///
+  /// \exception boost::system::system_error equivalent to
+  /// boost::system::errc::operation_canceled on version mismatch.
+  void apply_update(const DoutPrefixProvider *dpp,
+		    rados::cls::fifo::info* info,
+		    const rados::cls::fifo::objv& objv,
+		    const rados::cls::fifo::update& update) {
+    ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+		       << " entering" << dendl;
+    std::unique_lock l(m);
+    if (objv != info->version) {
+      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			 << " version mismatch, canceling" << dendl;
+      throw boost::system::system_error(ECANCELED,
+					boost::system::generic_category());
+    }
+
+    info->apply_update(update);
+  }
+
+  /// \brief Update metadata locally and in the objclass
+  ///
+  /// \param dpp Debug prefix provider
+  /// \param update The update to apply
+  /// \param version Current object version
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return True if the operation was canceled. false otherwise in a
+  /// way appropriate to the completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code, bool)> CompletionToken>
+  auto update_meta(const DoutPrefixProvider* dpp,
+		   rados::cls::fifo::update update,
+		   rados::cls::fifo::objv version,
+		   CompletionToken&& token) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    namespace fifo = rados::cls::fifo;
+    return asio::async_initiate<CompletionToken, void(sys::error_code, bool)>
+      (asio::experimental::co_composed<void(sys::error_code, bool)>
+       ([](auto state, const DoutPrefixProvider* dpp,
+	   fifo::update update, fifo::objv version,
+	   FIFO* f) -> void {
+	 try {
+	   state.throw_if_cancelled(true);
+	   state.reset_cancellation_state(asio::enable_terminal_cancellation());
+
+	   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " entering" << dendl;
+
+	   // ECANCELED from either the objclass or the local apply means
+	   // our version was stale; any other error is fatal here.
+	   auto [ec] = co_await f->update_meta(version, update,
+					       asio::as_tuple(asio::deferred));
+	   bool canceled;
+	   if (ec && ec != sys::errc::operation_canceled) {
+	     throw sys::system_error(ec);
+	   }
+	   canceled = (ec == sys::errc::operation_canceled);
+	   if (!canceled) {
+	     try {
+	       f->apply_update(dpp, &f->info, version, update);
+	     } catch (const sys::system_error& e) {
+	       if (e.code() == sys::errc::operation_canceled) {
+		 canceled = true;
+	       } else {
+		 throw;
+	       }
+	     }
+	   }
+	   // On cancellation, refresh the local metadata so the caller
+	   // can recompute its update and retry.
+	   if (canceled) {
+	     co_await f->read_meta(dpp, asio::deferred);
+	   }
+	   if (canceled) {
+	     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " canceled" << dendl;
+	   }
+	   co_return {sys::error_code{}, canceled};
+	 } catch (const sys::system_error& e) {
+	   ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " failed with error: " << e.what() << dendl;
+	   co_return {e.code(), false};
+	 }
+       }, rados.get_executor()),
+       token, dpp, std::move(update), std::move(version), this);
+  }
+
+  /// \brief Process the journal
+  ///
+  /// Replays every journaled operation (create/set_head/remove), then
+  /// repeatedly attempts to update the metadata to reflect the results
+  /// until the update succeeds or MAX_RACE_RETRIES is exhausted.
+  ///
+  /// \param dpp Debug prefix provider
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return Nothing, but may error in a way appropriate to the
+  /// completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code)> CompletionToken>
+  auto process_journal(const DoutPrefixProvider* dpp,
+		       CompletionToken&& token) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    namespace fifo = rados::cls::fifo;
+    return asio::async_initiate<CompletionToken, void(sys::error_code)>
+      (asio::experimental::co_composed<void(sys::error_code)>
+       ([](auto state, const DoutPrefixProvider* dpp, FIFO* f) -> void {
+	 try {
+	   state.throw_if_cancelled(true);
+	   state.reset_cancellation_state(asio::enable_terminal_cancellation());
+
+	   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " entering." << dendl;
+	   std::vector<fifo::journal_entry> processed;
+
+	   // Snapshot the journal and counters under the lock; all OSD
+	   // operations happen against the snapshot.
+	   std::unique_lock l(f->m);
+	   auto tmpjournal = f->info.journal;
+	   auto new_tail = f->info.tail_part_num;
+	   auto new_head = f->info.head_part_num;
+	   auto new_max = f->info.max_push_part_num;
+	   l.unlock();
+
+	   for (auto& entry : tmpjournal) {
+	     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " processing entry: entry=" << entry
+				<< dendl;
+	     switch (entry.op) {
+	       using enum fifo::journal_entry::Op;
+	     case create:
+	       ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				  << " Creating part " << entry.part_num << dendl;
+	       co_await f->create_part(entry.part_num, asio::deferred);
+	       if (entry.part_num > new_max) {
+		 new_max = entry.part_num;
+	       }
+	       break;
+	     case set_head:
+	       ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				  << " Setting head to " << entry.part_num << dendl;
+	       if (entry.part_num > new_head) {
+		 new_head = entry.part_num;
+	       }
+	       break;
+	     case remove:
+	       try {
+		 ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				    << " Removing part " << entry.part_num
+				    << dendl;
+
+		 // ENOENT is fine: the part may already have been
+		 // removed by an earlier, interrupted replay.
+		 co_await f->remove_part(entry.part_num, asio::deferred);
+		 if (entry.part_num >= new_tail) {
+		   new_tail = entry.part_num + 1;
+		 }
+	       } catch (const sys::system_error& e) {
+		 if (e.code() != sys::errc::no_such_file_or_directory) {
+		   throw;
+		 }
+	       }
+	       break;
+	     default:
+	       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				  << " unknown journaled op: entry=" << entry
+				  << dendl;
+	       throw sys::system_error{EINVAL, sys::generic_category()};
+	     }
+
+	     processed.push_back(std::move(entry));
+	   }
+
+	   // Postprocess
+	   bool canceled = true;
+
+	   for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
+	     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " postprocessing: i=" << i << dendl;
+
+	     std::optional<std::int64_t> tail_part_num;
+	     std::optional<std::int64_t> head_part_num;
+	     std::optional<std::int64_t> max_part_num;
+
+	     std::unique_lock l(f->m);
+	     auto objv = f->info.version;
+	     // Compare against the current local metadata, not the
+	     // freshly-constructed (empty) optionals above; comparing
+	     // new_tail to an engaged-from-nullopt optional is always
+	     // true and would force a redundant update every pass.
+	     if (new_tail > f->info.tail_part_num)
+	       tail_part_num = new_tail;
+	     if (new_head > f->info.head_part_num)
+	       head_part_num = new_head;
+	     if (new_max > f->info.max_push_part_num)
+	       max_part_num = new_max;
+	     l.unlock();
+
+	     if (processed.empty() && !tail_part_num && !max_part_num) {
+	       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				  << " nothing to update any more: i=" << i
+				  << dendl;
+	       canceled = false;
+	       break;
+	     }
+	     auto u = fifo::update().tail_part_num(tail_part_num)
+	       .head_part_num(head_part_num).max_push_part_num(max_part_num)
+	       .journal_entries_rm(processed);
+	     ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " Calling update_meta: update=" << u
+				<< dendl;
+
+	     canceled = co_await f->update_meta(dpp, u, objv, asio::deferred);
+	     if (canceled) {
+	       // Another client updated the metadata; drop any entries
+	       // it already removed from the journal and retry.
+	       std::vector<fifo::journal_entry> new_processed;
+	       std::unique_lock l(f->m);
+	       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				  << " update canceled, retrying: i=" << i
+				  << dendl;
+	       for (auto& e : processed) {
+		 if (f->info.journal.contains(e)) {
+		   new_processed.push_back(e);
+		 }
+	       }
+	       processed = std::move(new_processed);
+	     }
+	   }
+	   if (canceled) {
+	     ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " canceled too many times, giving up" << dendl;
+	     throw sys::system_error(ECANCELED, sys::generic_category());
+	   }
+	 } catch (const sys::system_error& e) {
+	   ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " process_journal failed" << ":" << e.what()
+			      << dendl;
+	   co_return e.code();
+	 }
+	 co_return sys::error_code{};
+       }, rados.get_executor()),
+       token, dpp, this);
+  }
+
+  /// \brief Create a new part
+  ///
+  /// And possibly set it as head
+  ///
+  /// \param dpp Debug prefix provider
+  /// \param new_part_num New part to create
+  /// \param is_head True if part is to be new head
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return Nothing, but may error in a way appropriate to the
+  /// completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code)> CompletionToken>
+  auto prepare_new_part(const DoutPrefixProvider* dpp,
+			std::int64_t new_part_num, bool is_head,
+			CompletionToken&& token) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    namespace fifo = rados::cls::fifo;
+    return asio::async_initiate<CompletionToken, void(sys::error_code)>
+      (asio::experimental::co_composed<void(sys::error_code)>
+       ([](auto state, const DoutPrefixProvider* dpp,
+	   std::int64_t new_part_num, bool is_head, FIFO* f) -> void {
+	 try {
+	   state.throw_if_cancelled(true);
+	   state.reset_cancellation_state(asio::enable_terminal_cancellation());
+
+
+	   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " entering" << dendl;
+	   std::unique_lock l(f->m);
+	   using enum fifo::journal_entry::Op;
+	   // Journal the create (plus set_head if requested) so an
+	   // interrupted operation can be completed later by
+	   // process_journal.
+	   std::vector<fifo::journal_entry> jentries{{create, new_part_num}};
+	   // Already journaled (by us or another client) but not yet
+	   // processed: just run the journal.
+	   if (f->info.journal.contains({create, new_part_num}) &&
+	       (!is_head || f->info.journal.contains({set_head, new_part_num}))) {
+	     l.unlock();
+	     ldpp_dout(dpp, 5)
+	       << __PRETTY_FUNCTION__ << ":" << __LINE__
+	       << " new part journaled, but not processed"
+	       << dendl;
+	     co_await f->process_journal(dpp, asio::deferred);
+	     co_return sys::error_code{};
+	   }
+	   auto version = f->info.version;
+
+	   if (is_head) {
+	     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " needs new head" << dendl;
+	     jentries.push_back({set_head, new_part_num});
+	   }
+	   l.unlock();
+
+	   bool canceled = true;
+	   for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
+	     canceled = false;
+	     ldpp_dout(dpp, 20)
+	       << __PRETTY_FUNCTION__ << ":" << __LINE__
+	       << " updating metadata: i=" << i << dendl;
+	     auto u = fifo::update{}.journal_entries_add(jentries);
+	     canceled = co_await f->update_meta(dpp, u, version,
+						asio::deferred);
+	     if (canceled) {
+	       // Raced with another client; see whether it already did
+	       // our work (journaled and/or processed) before retrying.
+	       std::unique_lock l(f->m);
+	       version = f->info.version;
+	       auto found = (f->info.journal.contains({create, new_part_num}) ||
+			     f->info.journal.contains({set_head, new_part_num}));
+	       if ((f->info.max_push_part_num >= new_part_num &&
+		    f->info.head_part_num >= new_part_num)) {
+		 ldpp_dout(dpp, 20)
+		   << __PRETTY_FUNCTION__ << ":" << __LINE__
+		   << " raced, but journaled and processed: i=" << i
+		   << dendl;
+		 co_return sys::error_code{};
+	       }
+	       if (found) {
+		 ldpp_dout(dpp, 20)
+		   << __PRETTY_FUNCTION__ << ":" << __LINE__
+		   << " raced, journaled but not processed: i=" << i
+		   << dendl;
+		 canceled = false;
+	       }
+	       l.unlock();
+	     }
+	   }
+	   if (canceled) {
+	     ldpp_dout(dpp, -1)
+	       << __PRETTY_FUNCTION__ << ":" << __LINE__
+	       << " canceled too many times, giving up" << dendl;
+	     throw sys::system_error{ECANCELED, sys::generic_category()};
+	   }
+	   co_await f->process_journal(dpp, asio::deferred);
+	   co_return sys::error_code{};
+	 } catch (const sys::system_error& e) {
+	   ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " prepare_new_part failed: " << e.what()
+			      << dendl;
+	   co_return e.code();
+	 }
+       }, rados.get_executor()),
+       token, dpp, new_part_num, is_head, this);
+  }
+
+  /// \brief Set a new part as head
+  ///
+  /// In practice we create a new part and set it as head in one go.
+  ///
+  /// \param dpp Debug prefix provider
+  /// \param new_head_part_num New head part number
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return Nothing, but may error in a way appropriate to the
+  /// completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code)> CompletionToken>
+  auto prepare_new_head(const DoutPrefixProvider* dpp,
+			std::int64_t new_head_part_num,
+			CompletionToken&& token) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    namespace fifo = rados::cls::fifo;
+    return asio::async_initiate<CompletionToken, void(sys::error_code)>
+      (asio::experimental::co_composed<void(sys::error_code)>
+       ([](auto state, const DoutPrefixProvider* dpp,
+	   std::int64_t new_head_part_num, FIFO* f) -> void {
+	 try {
+	   state.throw_if_cancelled(true);
+	   state.reset_cancellation_state(asio::enable_terminal_cancellation());
+
+	   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " entering" << dendl;
+	   // Snapshot the fields we need under the lock; all awaits
+	   // below happen with the lock released.
+	   std::unique_lock l(f->m);
+	   auto max_push_part_num = f->info.max_push_part_num;
+	   auto version = f->info.version;
+	   l.unlock();
+
+	   // If the target part hasn't been created yet, create it
+	   // (and mark it head) first.
+	   if (max_push_part_num < new_head_part_num) {
+	     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " need new part" << dendl;
+	     co_await f->prepare_new_part(dpp, new_head_part_num, true,
+					  asio::deferred);
+	     std::unique_lock l(f->m);
+	     if (f->info.max_push_part_num < new_head_part_num) {
+	       ldpp_dout(dpp, -1)
+		 << __PRETTY_FUNCTION__ << ":" << __LINE__
+		 << " inconsistency, push part less than head part."
+		 << dendl;
+	       throw sys::system_error(EIO, sys::generic_category());
+	     }
+	     l.unlock();
+	   }
+
+	   using enum fifo::journal_entry::Op;
+	   fifo::journal_entry jentry;
+	   jentry.op = set_head;
+	   jentry.part_num = new_head_part_num;
+
+	   // Journal the set_head, retrying while racing writers bump
+	   // the metadata version underneath us.
+	   bool canceled = true;
+	   for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
+	     canceled = false;
+	     ldpp_dout(dpp, 20)
+	       << __PRETTY_FUNCTION__ << ":" << __LINE__
+	       << " updating metadata: i=" << i << dendl;
+	     auto u = fifo::update{}.journal_entries_add({{jentry}});
+	     canceled = co_await f->update_meta(dpp, u, version,
+						asio::deferred);
+	     if (canceled) {
+	       // Re-examine state: the racing writer may already have
+	       // journaled (or even applied) the same change for us.
+	       std::unique_lock l(f->m);
+	       auto found =
+		 (f->info.journal.contains({create, new_head_part_num}) ||
+		  f->info.journal.contains({set_head, new_head_part_num}));
+	       version = f->info.version;
+	       if ((f->info.head_part_num >= new_head_part_num)) {
+		 ldpp_dout(dpp, 20)
+		   << __PRETTY_FUNCTION__ << ":" << __LINE__
+		   << " raced, but journaled and processed: i=" << i
+		   << dendl;
+		 co_return sys::error_code{};
+	       }
+	       if (found) {
+		 ldpp_dout(dpp, 20)
+		   << __PRETTY_FUNCTION__ << ":" << __LINE__
+		   << " raced, journaled but not processed: i=" << i
+		   << dendl;
+		 canceled = false;
+	       }
+	       l.unlock();
+	     }
+	   }
+	   if (canceled) {
+	     ldpp_dout(dpp, -1)
+	       << __PRETTY_FUNCTION__ << ":" << __LINE__
+	       << " canceled too many times, giving up" << dendl;
+	     throw sys::system_error(ECANCELED, sys::generic_category());
+	   }
+	   // Apply whatever is journaled (ours or the racer's).
+	   co_await f->process_journal(dpp, asio::deferred);
+	   co_return sys::error_code{};
+	 } catch (const sys::system_error& e) {
+	   ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " prepare_new_head failed: " << e.what()
+			      << dendl;
+	   co_return e.code();
+	 }
+       }, rados.get_executor()),
+       token, dpp, new_head_part_num, this);
+  }
+
+ ///@}
+
+
+public:
+
+  // FIFO handles are neither copyable nor movable: in-flight
+  // asynchronous operations capture a raw `this`, so the object must
+  // stay at a fixed address (hence the unique_ptr returned by
+  // open/create).
+  FIFO(const FIFO&) = delete;
+  FIFO& operator =(const FIFO&) = delete;
+  FIFO(FIFO&&) = delete;
+  FIFO& operator =(FIFO&&) = delete;
+
+  /// \brief Open an existing FIFO
+  ///
+  /// \param dpp Prefix provider for debug logging
+  /// \param rados RADOS handle
+  /// \param obj Head object for FIFO
+  /// \param ioc Locator
+  /// \param token Boost.Asio CompletionToken
+  /// \param objv Operation will fail if FIFO is not at this version
+  /// \param probe If true, the caller is probing the existence of the
+  /// FIFO. Don't print errors if we can't find it.
+  ///
+  /// \return A `unique_ptr` to the open FIFO in a way appropriate to
+  /// the completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code, std::unique_ptr<FIFO>)>
+  CompletionToken>
+  static auto open(const DoutPrefixProvider* dpp,
+		   neorados::RADOS& rados,
+		   neorados::Object obj,
+		   neorados::IOContext ioc,
+		   CompletionToken&& token,
+		   std::optional<rados::cls::fifo::objv> objv = std::nullopt,
+		   bool probe = false) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    return asio::async_initiate<CompletionToken,
+				void(sys::error_code, std::unique_ptr<FIFO>)>
+      (asio::experimental::co_composed<void(sys::error_code, std::unique_ptr<FIFO>)>
+       ([](auto state, const DoutPrefixProvider* dpp, neorados::RADOS& rados,
+	   neorados::Object obj, neorados::IOContext ioc,
+	   std::optional<rados::cls::fifo::objv> objv, bool probe) -> void {
+	 try {
+	   state.throw_if_cancelled(true);
+	   state.reset_cancellation_state(asio::enable_terminal_cancellation());
+	   ldpp_dout(dpp, 20)
+	     << __PRETTY_FUNCTION__ << ":" << __LINE__
+	     << " entering" << dendl;
+	   auto [info, size, over] = co_await get_meta(rados, obj, ioc, objv,
+						       asio::deferred);
+	   std::unique_ptr<FIFO> f(new FIFO(rados,
+					    std::move(obj),
+					    std::move(ioc),
+					    size, over, std::move(info)));
+
+	   // If there are journal entries, process them, in case
+	   // someone crashed mid-transaction. N.B. `info` was moved
+	   // into the FIFO above, so we must consult `f->info` here,
+	   // not the moved-from local.
+	   if (!f->info.journal.empty()) {
+	     ldpp_dout(dpp, 20)
+	       << __PRETTY_FUNCTION__ << ":" << __LINE__
+	       << " processing leftover journal" << dendl;
+	     co_await f->process_journal(dpp, asio::deferred);
+	   }
+	   co_return {sys::error_code{}, std::move(f)};
+	 } catch (const sys::system_error& e) {
+	   // When probing, ENOENT/ENOMSG simply mean "no FIFO here";
+	   // don't spam the log for those.
+	   if (!probe ||
+	       !(e.code() == sys::errc::no_such_file_or_directory ||
+		 e.code() == sys::errc::no_message_available)) {
+	     ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " open failed:" << e.what() << dendl;
+	   }
+	   co_return {e.code(), std::unique_ptr<FIFO>{}};
+	 }
+       }, rados.get_executor()),
+       token, dpp, std::ref(rados), std::move(obj), std::move(ioc),
+       std::move(objv), probe);
+  }
+
+  /// \brief Create and open a FIFO
+  ///
+  /// \param dpp Prefix provider for debug logging
+  /// \param rados RADOS handle
+  /// \param obj Head object for FIFO
+  /// \param ioc Locator
+  /// \param token Boost.Asio CompletionToken
+  /// \param objv Operation will fail if FIFO exists and is not at this version
+  /// \param oid_prefix Prefix for all objects
+  /// \param exclusive Fail if the FIFO already exists
+  /// \param max_part_size Maximum allowed size of parts
+  /// \param max_entry_size Maximum allowed size of entries
+  ///
+  /// \return A `unique_ptr` to the open FIFO in a way appropriate to
+  /// the completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code, std::unique_ptr<FIFO>)>
+  CompletionToken>
+  static auto create(const DoutPrefixProvider* dpp,
+		     neorados::RADOS& rados,
+		     neorados::Object obj,
+		     neorados::IOContext ioc,
+		     CompletionToken&& token,
+		     std::optional<rados::cls::fifo::objv> objv = std::nullopt,
+		     std::optional<std::string> oid_prefix = std::nullopt,
+		     bool exclusive = false,
+		     std::uint64_t max_part_size = default_max_part_size,
+		     std::uint64_t max_entry_size = default_max_entry_size) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    return asio::async_initiate<CompletionToken,
+				void(sys::error_code, std::unique_ptr<FIFO>)>
+      (asio::experimental::co_composed<void(sys::error_code,
+					    std::unique_ptr<FIFO>)>
+       ([](auto state, const DoutPrefixProvider* dpp, neorados::RADOS& rados,
+	   neorados::Object obj, neorados::IOContext ioc,
+	   std::optional<rados::cls::fifo::objv> objv,
+	   std::optional<std::string> oid_prefix, bool exclusive,
+	   std::uint64_t max_part_size, std::uint64_t max_entry_size) -> void {
+	 try {
+	   state.throw_if_cancelled(true);
+	   state.reset_cancellation_state(asio::enable_terminal_cancellation());
+
+	   ldpp_dout(dpp, 20)
+	     << __PRETTY_FUNCTION__ << ":" << __LINE__
+	     << " entering" << dendl;
+	   ldpp_dout(dpp, 10)
+	     << __PRETTY_FUNCTION__ << ":" << __LINE__
+	     << " Calling create_meta" << dendl;
+	   // create_meta is idempotent when not exclusive, so
+	   // create-then-open is safe against concurrent creators.
+	   co_await create_meta(rados, obj, ioc, objv, oid_prefix, exclusive,
+				max_part_size, max_entry_size, asio::deferred);
+	   auto f = co_await open(dpp, rados, std::move(obj), std::move(ioc),
+				  asio::deferred, objv);
+	   co_return std::make_tuple(sys::error_code{}, std::move(f));
+	 } catch (const sys::system_error& e) {
+	   ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " create failed: " << e.what() << dendl;
+	   co_return {e.code(), std::unique_ptr<FIFO>{}};
+	 }
+       }, rados.get_executor()),
+       token, dpp, std::ref(rados), std::move(obj), std::move(ioc),
+       std::move(objv), std::move(oid_prefix), exclusive, max_part_size,
+       max_entry_size);
+  }
+
+  /// \brief Push entries to the FIFO
+  ///
+  /// \param dpp Prefix provider for debug logging
+  /// \param entries Vector of entries
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return Nothing, but may error in a way appropriate to the
+  /// completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code)> CompletionToken>
+  auto push(const DoutPrefixProvider* dpp,
+	    std::deque<ceph::buffer::list> entries,
+	    CompletionToken&& token) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    namespace buffer = ceph::buffer;
+    return asio::async_initiate<CompletionToken,
+				void(sys::error_code)>
+      (asio::experimental::co_composed<void(sys::error_code)>
+       ([](auto state, const DoutPrefixProvider* dpp,
+	   std::deque<buffer::list> remaining, FIFO* f) -> void {
+	 try {
+	   state.throw_if_cancelled(true);
+	   state.reset_cancellation_state(asio::enable_terminal_cancellation());
+
+	   // Snapshot metadata under the lock; awaits below run
+	   // unlocked.
+	   std::unique_lock l(f->m);
+	   auto max_entry_size = f->info.params.max_entry_size;
+	   auto need_new_head = f->info.need_new_head();
+	   auto head_part_num = f->info.head_part_num;
+	   l.unlock();
+	   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " entering" << dendl;
+	   if (remaining.empty()) {
+	     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " empty push, returning success" << dendl;
+	     co_return sys::error_code{};
+	   }
+
+	   // Validate sizes
+	   for (const auto& bl : remaining) {
+	     if (bl.length() > max_entry_size) {
+	       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				  << " entry bigger than max_entry_size"
+				  << dendl;
+	       co_return sys::error_code{E2BIG, sys::generic_category()};
+	     }
+	   }
+
+	   if (need_new_head) {
+	     ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " need new head: "
+				<< head_part_num + 1 << dendl;
+	     co_await f->prepare_new_head(dpp, head_part_num + 1, asio::deferred);
+	   }
+
+	   // Move entries from `remaining` into `batch` a part-sized
+	   // chunk at a time, pushing each batch; retry on races
+	   // (part filled up or trimmed under us).
+	   std::deque<buffer::list> batch;
+
+	   uint64_t batch_len = 0;
+	   auto retries = 0;
+	   bool canceled = true;
+	   while ((!remaining.empty() || !batch.empty()) &&
+		  (retries <= MAX_RACE_RETRIES)) {
+	     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " preparing push: remaining=" << remaining.size()
+				<< " batch=" << batch.size() << " retries=" << retries
+				<< dendl;
+	     std::unique_lock l(f->m);
+	     head_part_num = f->info.head_part_num;
+	     auto max_part_size = f->info.params.max_part_size;
+	     auto overhead = f->part_entry_overhead;
+	     l.unlock();
+
+	     while (!remaining.empty() &&
+		    (remaining.front().length() + batch_len <= max_part_size)) {
+	       /* We can send entries with data_len up to max_entry_size,
+		  however, we want to also account the overhead when
+		  dealing with multiple entries. Previous check doesn't
+		  account for overhead on purpose. */
+	       batch_len += remaining.front().length() + overhead;
+	       batch.push_back(std::move(remaining.front()));
+	       remaining.pop_front();
+	     }
+	     ldpp_dout(dpp, 20)
+	       << __PRETTY_FUNCTION__ << ":" << __LINE__
+	       << " prepared push: remaining=" << remaining.size()
+	       << " batch=" << batch.size() << " retries=" << retries
+	       << " batch_len=" << batch_len << dendl;
+
+	     auto [ec, n] =
+	       co_await f->push_entries(dpp, batch,
+					asio::as_tuple(asio::deferred));
+	     if (ec == sys::errc::result_out_of_range) {
+	       // Part filled up while we were pushing; roll the head
+	       // forward and retry the same batch.
+	       canceled = true;
+	       ++retries;
+	       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				  << " need new head " << head_part_num + 1
+				  << dendl;
+	       co_await f->prepare_new_head(dpp, head_part_num + 1,
+					    asio::deferred);
+	       continue;
+	     } else if (ec == sys::errc::no_such_file_or_directory) {
+	       ldpp_dout(dpp, 20)
+		 << __PRETTY_FUNCTION__ << ":" << __LINE__
+		 << " racing client trimmed part, rereading metadata "
+		 << dendl;
+	       canceled = true;
+	       ++retries;
+	       co_await f->read_meta(dpp, asio::deferred);
+	       continue;
+	     } else if (ec) {
+	       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				  << " push_entries failed: " << ec.message()
+				  << dendl;
+	       throw sys::system_error(ec);
+	     }
+	     assert(n >= 0);
+	     // Made forward progress!
+	     canceled = false;
+	     retries = 0;
+	     batch_len = 0;
+	     // `n` entries were accepted; drop them from the batch and
+	     // recompute the residual batch length.
+	     if (n == ssize(batch)) {
+	       batch.clear();
+	     } else {
+	       batch.erase(batch.begin(), batch.begin() + n);
+	       for (const auto& b : batch) {
+		 batch_len += b.length() + overhead;
+	       }
+	     }
+	   }
+	   if (canceled) {
+	     ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " canceled too many times, giving up."
+				<< dendl;
+	     co_return sys::error_code{ECANCELED, sys::generic_category()};
+	   }
+	   co_return sys::error_code{};
+	 } catch (const sys::system_error& e) {
+	   ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " push failed: " << e.what() << dendl;
+	   co_return e.code();
+	 }
+       }, rados.get_executor()),
+       token, dpp, std::move(entries), this);
+  }
+
+  /// \brief Push entries to the FIFO
+  ///
+  /// Convenience overload: the entries are moved out of the supplied
+  /// span into the batch that gets pushed.
+  ///
+  /// \param dpp Prefix provider for debug logging
+  /// \param entries Entries to push
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return Nothing, but may error in a way appropriate to the
+  /// completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code)> CompletionToken>
+  auto push(const DoutPrefixProvider* dpp,
+	    std::span<ceph::buffer::list> entries,
+	    CompletionToken&& token) {
+    namespace buffer = ceph::buffer;
+    std::deque<buffer::list> deque{std::make_move_iterator(entries.begin()),
+				   std::make_move_iterator(entries.end())};
+    return push(dpp, std::move(deque), std::forward<CompletionToken>(token));
+  }
+
+  /// \brief Push a single entry to the FIFO
+  ///
+  /// \param dpp Prefix provider for debug logging
+  /// \param entry Entry to push
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return Nothing, but may error in a way appropriate to the
+  /// completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code)> CompletionToken>
+  auto push(const DoutPrefixProvider* dpp,
+	    ceph::buffer::list entry,
+	    CompletionToken&& token) {
+    namespace buffer = ceph::buffer;
+    // Wrap the single entry in a one-element batch and delegate to
+    // the deque overload.
+    std::deque<buffer::list> batch;
+    batch.emplace_back(std::move(entry));
+    return push(dpp, std::move(batch), std::forward<CompletionToken>(token));
+  }
+
+  /// \brief List entries in the FIFO
+  ///
+  /// \param dpp Prefix provider for debug logging
+  /// \param markstr Marker to resume listing
+  /// \param entries Space for entries
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return (span<entry>, marker) where the span is long enough to hold
+  /// returned entries, and marker is non-null if the listing was
+  /// incomplete, in a way appropriate to the completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code, std::span<entry>,
+	 std::string)> CompletionToken>
+  auto list(const DoutPrefixProvider* dpp,
+	    std::string markstr, std::span<entry> entries,
+	    CompletionToken&& token) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    return asio::async_initiate<CompletionToken,
+				void(sys::error_code, std::span<entry>,
+				     std::string)>
+      (asio::experimental::co_composed<void(sys::error_code,
+					    std::span<entry>,
+					    std::string)>
+       ([](auto state, const DoutPrefixProvider* dpp,
+	   std::string markstr, std::span<entry> entries,
+	   FIFO* f) -> void {
+	 try {
+	   state.throw_if_cancelled(true);
+	   state.reset_cancellation_state(asio::enable_terminal_cancellation());
+
+	   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " entering" << dendl;
+	   // Start at the tail, or at the supplied marker if any.
+	   std::unique_lock l(f->m);
+	   std::int64_t part_num = f->info.tail_part_num;
+	   std::uint64_t ofs = 0;
+	   if (!markstr.empty()) {
+	     auto marker = f->to_marker(markstr, l);
+	     if (!marker) {
+	       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				  << " invalid marker string: " << markstr
+				  << dendl;
+	       throw sys::system_error{EINVAL, sys::generic_category()};
+	     }
+	     part_num = marker->num;
+	     ofs = marker->ofs;
+	   }
+	   l.unlock();
+
+	   bool more = false;
+
+	   // Walk parts, filling `entries` front-to-back;
+	   // `entries_left` is the unfilled suffix.
+	   auto entries_left = entries;
+
+	   while (entries_left.size() > 0) {
+	     more = false;
+	     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " entries_left.size()="
+				<< entries_left.size() << dendl;
+	     auto [ec, res, part_more, part_full] =
+	       co_await f->list_part(dpp, part_num, ofs, entries_left,
+				     asio::as_tuple(asio::deferred));
+	     if (ec == sys::errc::no_such_file_or_directory) {
+	       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				  << " missing part, rereading metadata"
+				  << dendl;
+	       co_await f->read_meta(dpp, asio::deferred);
+	       std::unique_lock l(f->m);
+	       if (part_num < f->info.tail_part_num) {
+		 /* raced with trim? restart */
+		 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				    << " raced with trim, restarting" << dendl;
+		 entries_left = entries;
+		 part_num = f->info.tail_part_num;
+		 l.unlock();
+		 ofs = 0;
+		 continue;
+	       }
+	       l.unlock();
+	       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				  << " assuming part was not written yet, "
+				  << "so end of data" << dendl;
+	       break;
+	     } else if (ec) {
+	       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				  << " list_entries failed: " << ec.message()
+				  << dendl;
+	       throw sys::system_error(ec);
+	     }
+	     more = part_full || part_more;
+	     entries_left = entries_left.last(entries_left.size() - res.size());
+
+	     if (!part_full) {
+	       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				  << " head part is not full, so we can assume "
+				  << "we're done" << dendl;
+	       break;
+	     }
+	     if (!part_more) {
+	       // Exhausted this part; move on to the next one.
+	       ++part_num;
+	       ofs = 0;
+	     }
+	   }
+	   // Shrink the result span to what was actually filled, and
+	   // hand back a resume marker if there may be more data.
+	   std::string marker;
+	   if (entries_left.size() > 0) {
+	     entries = entries.first(entries.size() - entries_left.size());
+	   }
+	   if (more && !entries.empty()) {
+	     marker = entries.back().marker;
+	   }
+	   co_return {sys::error_code{}, std::move(entries), std::move(marker)};
+	 } catch (const sys::system_error& e) {
+	   ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " list failed: " << e.what() << dendl;
+	   co_return {e.code(), std::span<entry>{}, std::string{}};
+	 }
+       }, rados.get_executor()),
+       token, dpp, std::move(markstr), std::move(entries), this);
+  }
+
+  /// \brief Trim entries from the FIFO
+  ///
+  /// \param dpp Prefix provider for debug logging
+  /// \param marker Marker to which to trim
+  /// \param exclusive If true, exclude the marked element from trim,
+  /// if false, trim it.
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return Nothing, but may error in a way appropriate to the
+  /// completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code)> CompletionToken>
+  auto trim(const DoutPrefixProvider* dpp,
+	    std::string marker, bool exclusive,
+	    CompletionToken&& token) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    namespace fifo = rados::cls::fifo;
+    return asio::async_initiate<CompletionToken,
+				void(sys::error_code)>
+      (asio::experimental::co_composed<void(sys::error_code)>
+       ([](auto state, const DoutPrefixProvider* dpp,
+	   std::string markstr, bool exclusive, FIFO* f) -> void {
+	 try {
+	   state.throw_if_cancelled(true);
+	   state.reset_cancellation_state(asio::enable_terminal_cancellation());
+
+	   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " entering" << dendl;
+	   bool overshoot = false;
+	   std::unique_lock l(f->m);
+	   auto marker = f->to_marker(markstr, l);
+	   if (!marker) {
+	     ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " invalid marker string: " << markstr
+				<< dendl;
+	     throw sys::system_error{EINVAL, sys::generic_category()};
+	   }
+	   auto part_num = marker->num;
+	   auto ofs = marker->ofs;
+	   auto hn = f->info.head_part_num;
+	   const auto max_part_size = f->info.params.max_part_size;
+	   // Marker beyond the head: refresh metadata; if it is still
+	   // beyond the head, clamp to the end of the head part and
+	   // report ENODATA below.
+	   if (part_num > hn) {
+	     l.unlock();
+	     co_await f->read_meta(dpp, asio::deferred);
+	     l.lock();
+	     hn = f->info.head_part_num;
+	     if (part_num > hn) {
+	       overshoot = true;
+	       part_num = hn;
+	       ofs = max_part_size;
+	     }
+	   }
+	   if (part_num < f->info.tail_part_num) {
+	     throw sys::system_error(ENODATA, sys::generic_category());
+	   }
+	   auto pn = f->info.tail_part_num;
+	   l.unlock();
+
+	   // Fully trim every part before the marked one.
+	   while (pn < part_num) {
+	     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " pn=" << pn << dendl;
+	     auto [ec] = co_await f->trim_part(dpp, pn, max_part_size, false,
+					       asio::as_tuple(asio::deferred));
+	     // ENOENT just means a racing client already trimmed this
+	     // part away; any other error is fatal.
+	     if (ec && ec != sys::errc::no_such_file_or_directory) {
+	       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				  << " trim_part failed: " << ec.message()
+				  << dendl;
+	       throw sys::system_error(ec);
+	     }
+	     ++pn;
+	   }
+
+	   // Partially trim the marked part up to (or excluding) ofs.
+	   auto [ec] = co_await f->trim_part(dpp, pn, ofs, exclusive,
+					     asio::as_tuple(asio::deferred));
+	   if (ec && ec != sys::errc::no_such_file_or_directory) {
+	     ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " trim_part failed: " << ec.message()
+				<< dendl;
+	     throw sys::system_error(ec);
+	   }
+
+	   // Advance tail_part_num in the metadata, retrying on races.
+	   l.lock();
+	   auto tail_part_num = f->info.tail_part_num;
+	   auto objv = f->info.version;
+	   l.unlock();
+	   bool canceled = tail_part_num < part_num;
+	   int retries = 0;
+	   while ((tail_part_num < part_num) &&
+		  canceled &&
+		  (retries <= MAX_RACE_RETRIES)) {
+	     canceled = co_await f->update_meta(dpp, fifo::update{}
+						.tail_part_num(part_num),
+						objv, asio::deferred);
+	     if (canceled) {
+	       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				  << " canceled: retries=" << retries
+				  << dendl;
+	       l.lock();
+	       tail_part_num = f->info.tail_part_num;
+	       objv = f->info.version;
+	       l.unlock();
+	       ++retries;
+	     }
+	   }
+	   if (canceled) {
+	     ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+				<< " canceled too many times, giving up"
+				<< dendl;
+	     throw sys::system_error(EIO, sys::generic_category());
+	   }
+	   co_return (overshoot ?
+		      sys::error_code{ENODATA, sys::generic_category()} :
+		      sys::error_code{});
+	 } catch (const sys::system_error& e) {
+	   ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " trim failed: " << e.what() << dendl;
+	   co_return e.code();
+	 }
+       }, rados.get_executor()),
+       token, dpp, std::move(marker), exclusive, this);
+  }
+
+  /// Maximum number of entries a single part listing can return.
+  static constexpr auto max_list_entries =
+    rados::cls::fifo::op::MAX_LIST_ENTRIES;
+
+  /// The default maximum size of every part object (that is, every
+  /// object holding entries)
+  static constexpr std::uint64_t default_max_part_size = 4 * 1024 * 1024;
+
+  /// The default maximum entry size
+  static constexpr std::uint64_t default_max_entry_size = 32 * 1024;
+
+  /// Return a marker comparing less than any other marker.
+  //
+  // Use `lowest()` for both fields: with `max()` this would be
+  // identical to max_marker() and compare *greater* than everything.
+  static auto min_marker() {
+    return marker{std::numeric_limits<decltype(marker::num)>::lowest(),
+		  std::numeric_limits<decltype(marker::ofs)>::lowest()}
+      .to_string();
+  }
+
+  /// Return a marker comparing greater than any other marker.
+  static auto max_marker() {
+    using num_type = decltype(marker::num);
+    using ofs_type = decltype(marker::ofs);
+    // Both fields saturated => sorts after every real marker.
+    marker greatest{std::numeric_limits<num_type>::max(),
+		    std::numeric_limits<ofs_type>::max()};
+    return greatest.to_string();
+  }
+
+  /// \brief Get information on the last entry
+  ///
+  /// \param dpp Prefix provider for debug logging
+  /// \param token Boost.Asio CompletionToken
+  ///
+  /// \return {marker, time} for the latest entry in a way appropriate
+  /// to the completion token.
+  template<boost::asio::completion_token_for<
+    void(boost::system::error_code, std::string, ceph::real_time)>
+  CompletionToken>
+  auto last_entry_info(const DoutPrefixProvider* dpp,
+		       CompletionToken&& token) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
+    namespace buffer = ceph::buffer;
+    return asio::async_initiate<
+      CompletionToken, void(sys::error_code, std::string, ceph::real_time)>
+      (asio::experimental::co_composed<
+	void(sys::error_code, std::string, ceph::real_time)>
+       ([](auto state, const DoutPrefixProvider* dpp,
+	   FIFO* f) -> void {
+	 try {
+	   state.throw_if_cancelled(true);
+	   state.reset_cancellation_state(asio::enable_terminal_cancellation());
+
+	   // Refresh metadata so head_part_num is current.
+	   co_await f->read_meta(dpp, asio::deferred);
+	   std::unique_lock l(f->m);
+	   auto head_part_num = f->info.head_part_num;
+	   l.unlock();
+
+	   if (head_part_num < 0) {
+	     // Empty FIFO: no head part has ever been created.
+	     co_return {sys::error_code{}, std::string{},
+			ceph::real_clock::zero()};
+	   } else {
+	     auto header =
+	       co_await f->get_part_info(head_part_num, asio::deferred);
+	     co_return {sys::error_code{},
+			marker{head_part_num, header.last_ofs}.to_string(),
+			header.max_time};
+	   }
+	 } catch (const sys::system_error& e) {
+	   // (Fixed copy-paste: this is last_entry_info, not push.)
+	   ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+			      << " last_entry_info failed: " << e.what() << dendl;
+	   co_return {e.code(), std::string{}, ceph::real_time{}};
+	 }
+       }, rados.get_executor()),
+       token, dpp, this);
+  }
+};
+} // namespace neorados::cls::fifo
+#pragma GCC diagnostic pop
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM
+ *
+ * See file COPYING for license information.
+ *
+ */
+
+#include "neorados/cls/fifo.h"
+
+#include <array>
+#include <boost/system/detail/errc.hpp>
+#include <coroutine>
+#include <memory>
+#include <new>
+#include <string_view>
+#include <utility>
+
+#include <boost/asio/post.hpp>
+#include <boost/asio/redirect_error.hpp>
+#include <boost/asio/use_awaitable.hpp>
+
+#include <boost/system/errc.hpp>
+#include <boost/system/error_code.hpp>
+
+#include "include/neorados/RADOS.hpp"
+
+#include "cls/version/cls_version_types.h"
+
+#include "test/neorados/common_tests.h"
+
+#include "gtest/gtest.h"
+
+namespace asio = boost::asio;
+namespace sys = boost::system;
+namespace buffer = ceph::buffer;
+namespace fifo = neorados::cls::fifo;
+
+using namespace std::literals;
+
+namespace neorados::cls::fifo {
+// Test-only accessor: forwards to FIFO's private static helpers and
+// exposes private state (metadata, part layout) for assertions.
+// Presumably declared a friend of FIFO — TODO confirm, the friend
+// declaration is not visible in this file.
+class FIFOtest {
+public:
+  template<typename... Args>
+  static auto create_meta(Args&&... args) {
+    return FIFO::create_meta(std::forward<Args>(args)...);
+  }
+
+  template<typename... Args>
+  static auto get_meta(Args&&... args) {
+    return FIFO::get_meta(std::forward<Args>(args)...);
+  }
+
+  template<typename... Args>
+  static auto read_meta(fifo::FIFO& f, Args&&... args) {
+    return f.read_meta(std::forward<Args>(args)...);
+  }
+
+  // Snapshot of the FIFO's cached metadata (returned by value).
+  static auto meta(fifo::FIFO& f) {
+    return f.info;
+  }
+
+  template<typename... Args>
+  static auto get_part_info(fifo::FIFO& f, Args&&... args) {
+    return f.get_part_info(std::forward<Args>(args)...);
+  }
+
+  // (part_header_size, part_entry_overhead) for sizing calculations.
+  static auto get_part_layout_info(fifo::FIFO& f) {
+    return std::make_tuple(f.part_header_size, f.part_entry_overhead);
+  }
+
+};
+}
+
+using fifo::FIFOtest;
+
+// Process-lifetime context and debug prefix shared by all tests
+// below. NOTE(review): `cct` is intentionally never freed.
+auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT);
+const DoutPrefix dp(cct, 1, "test legacy cls fifo: ");
+
+// create_meta must reject zero part/entry sizes, succeed with sane
+// parameters (leaving a non-empty head object), and be idempotent.
+CORO_TEST_F(cls_fifo, create, NeoRadosTest)
+{
+  std::string_view fifo_id = "fifo";
+
+
+  sys::error_code ec;
+  // Zero max_part_size -> EINVAL.
+  co_await FIFOtest::create_meta(rados(), fifo_id, pool(), std::nullopt,
+				 std::nullopt, false, 0,
+				 fifo::FIFO::default_max_entry_size,
+				 asio::redirect_error(asio::use_awaitable, ec));
+  EXPECT_EQ(sys::errc::invalid_argument, ec);
+  // Zero max_entry_size -> EINVAL.
+  co_await FIFOtest::create_meta(rados(), fifo_id, pool(), std::nullopt,
+				 std::nullopt, false,
+				 fifo::FIFO::default_max_part_size, 0,
+				 asio::redirect_error(asio::use_awaitable, ec));
+
+  EXPECT_EQ(sys::errc::invalid_argument, ec);
+  co_await FIFOtest::create_meta(rados(), fifo_id, pool(), std::nullopt,
+				 std::nullopt, false,
+				 fifo::FIFO::default_max_part_size,
+				 fifo::FIFO::default_max_entry_size,
+				 asio::redirect_error(asio::use_awaitable, ec));
+  EXPECT_FALSE(ec);
+  neorados::ReadOp op;
+  std::uint64_t size;
+  op.stat(&size, nullptr);
+  co_await execute(fifo_id, std::move(op), nullptr);
+  EXPECT_GT(size, 0);
+  /* test idempotency */
+  co_await FIFOtest::create_meta(rados(), fifo_id, pool(), std::nullopt,
+				 std::nullopt, false,
+				 fifo::FIFO::default_max_part_size,
+				 fifo::FIFO::default_max_entry_size,
+				 asio::redirect_error(asio::use_awaitable, ec));
+  EXPECT_FALSE(ec);
+}
+
+// get_meta returns sane layout values, succeeds against the matching
+// version, and fails (ECANCELED) against a mismatched version.
+CORO_TEST_F(cls_fifo, get_info, NeoRadosTest)
+{
+  std::string_view fifo_id = "fifo";
+
+  co_await FIFOtest::create_meta(rados(), fifo_id, pool(), std::nullopt,
+				 std::nullopt, false,
+				 fifo::FIFO::default_max_part_size,
+				 fifo::FIFO::default_max_entry_size,
+				 asio::use_awaitable);
+  auto [info, part_header_size, part_entry_overhead] =
+    co_await FIFOtest::get_meta(rados(), fifo_id, pool(), std::nullopt,
+				asio::use_awaitable);
+  EXPECT_GT(part_header_size, 0);
+  EXPECT_GT(part_entry_overhead, 0);
+  EXPECT_FALSE(info.version.instance.empty());
+
+  // Reading with the version we just observed must succeed.
+  std::tie(info, part_header_size, part_entry_overhead) =
+    co_await FIFOtest::get_meta(rados(), fifo_id, pool(), info.version,
+				asio::use_awaitable);
+
+
+  // A fabricated, mismatched version must be rejected.
+  decltype(info.version) objv;
+  objv.instance = "foo";
+  objv.ver = 12;
+
+  sys::error_code ec;
+  std::tie(info, part_header_size, part_entry_overhead) =
+    co_await FIFOtest::get_meta(rados(), fifo_id, pool(), objv,
+				asio::redirect_error(asio::use_awaitable, ec));
+
+  EXPECT_EQ(sys::errc::operation_canceled, ec);
+}
+
+// create+open with defaults; round-trip the metadata through the
+// backend and check the FIFO id.
+CORO_TEST_F(fifo, open_default, NeoRadosTest)
+{
+  std::string_view fifo_id = "fifo";
+
+  auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(),
+				       asio::use_awaitable);
+  // force reading from backend
+  co_await FIFOtest::read_meta(*f, &dp, asio::use_awaitable);
+  auto info = FIFOtest::meta(*f);
+  EXPECT_EQ(info.id, fifo_id);
+}
+
+// create with explicit sizes/prefix/version; all parameters must
+// survive a round trip through the backend.
+CORO_TEST_F(fifo, open_params, NeoRadosTest)
+{
+  std::string_view fifo_id = "fifo";
+
+  const std::uint64_t max_part_size = 10 * 1024;
+  const std::uint64_t max_entry_size = 128;
+  auto oid_prefix = "foo.123."s;
+  rados::cls::fifo::objv objv;
+  objv.instance = "fooz"s;
+  objv.ver = 10;
+
+  auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(),
+				       asio::use_awaitable,
+				       objv, oid_prefix, false,
+				       max_part_size, max_entry_size);
+
+  // force reading from backend
+  co_await FIFOtest::read_meta(*f, &dp, asio::use_awaitable);
+  auto info = FIFOtest::meta(*f);
+  EXPECT_EQ(info.id, fifo_id);
+  EXPECT_EQ(info.params.max_part_size, max_part_size);
+  EXPECT_EQ(info.params.max_entry_size, max_entry_size);
+  EXPECT_EQ(info.version, objv);
+}
+
+/// Decode a single value of type T from a FIFO entry's data buffer.
+template<class T>
+inline T decode_entry(const fifo::entry& entry)
+{
+  T val;
+  auto iter = entry.data.cbegin();
+  decode(val, iter);
+  // Return the local directly: `return std::move(val)` would inhibit
+  // NRVO (pessimizing move); a plain return still moves if needed.
+  return val;
+}
+
+// Push N entries, list them one-at-a-time and all-at-once, then trim
+// one entry and confirm the listing window shrinks accordingly.
+CORO_TEST_F(fifo, push_list_trim, NeoRadosTest)
+{
+  std::string_view fifo_id = "fifo";
+
+  auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(),
+				       asio::use_awaitable);
+  static constexpr auto max_entries = 10u;
+  for (auto i = 0u; i < max_entries; ++i) {
+    buffer::list bl;
+    encode(i, bl);
+    co_await f->push(&dp, std::move(bl), asio::use_awaitable);
+  }
+
+  std::array<fifo::entry, max_entries> entries;
+  /* get entries one by one */
+  std::string inmark;
+  for (auto i = 0u; i < max_entries; ++i) {
+    auto [result, marker] =
+      co_await f->list(&dp, inmark, std::span{entries}.first(1),
+		       asio::use_awaitable);
+    // Only the last listing should come back without a resume marker.
+    bool expected_marker = (i != (max_entries - 1));
+    EXPECT_EQ(expected_marker, !marker.empty());
+    EXPECT_EQ(1, result.size());
+
+    auto val = decode_entry<std::uint32_t>(result.front());
+
+    EXPECT_EQ(i, val);
+    inmark = marker;
+  }
+
+  /* get all entries at once */
+  std::array<std::string, max_entries> markers;
+
+  std::uint32_t min_entry = 0;
+  auto [res, marker] = co_await f->list(&dp, {}, entries,
+					asio::use_awaitable);
+
+  EXPECT_TRUE(marker.empty());
+  EXPECT_EQ(max_entries, res.size());
+  for (auto i = 0u; i < max_entries; ++i) {
+    auto val = decode_entry<std::uint32_t>(res[i]);
+    markers[i] = res[i].marker;
+    EXPECT_EQ(i, val);
+  }
+
+  /* trim one entry */
+  co_await f->trim(&dp, markers[min_entry], false, asio::use_awaitable);
+  ++min_entry;
+
+  // After trimming the first entry, a full listing starts at the
+  // second entry and is one shorter.
+  std::tie(res, marker) = co_await f->list(&dp, {}, entries,
+					   asio::use_awaitable);
+
+  EXPECT_TRUE(marker.empty());
+  EXPECT_EQ(max_entries - min_entry, res.size());
+
+  for (auto i = min_entry; i < max_entries; ++i) {
+    auto val = decode_entry<std::uint32_t>(res[i - min_entry]);
+    markers[i - min_entry] = res[i - min_entry].marker;
+    EXPECT_EQ(i, val);
+  }
+}
+
+// Pushing an entry one byte over max_entry_size must fail with E2BIG
+// (argument_list_too_long).
+CORO_TEST_F(fifo, push_too_big, NeoRadosTest)
+{
+  static constexpr auto max_part_size = 2048ull;
+  static constexpr auto max_entry_size = 128ull;
+  std::string_view fifo_id = "fifo";
+
+  auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(),
+				       asio::use_awaitable, std::nullopt,
+				       std::nullopt, false, max_part_size,
+				       max_entry_size);
+
+  std::array<char, max_entry_size + 1> buf;
+  buf.fill('\0');
+  buffer::list bl;
+  bl.append(buf.data(), sizeof(buf));
+
+  sys::error_code ec;
+  co_await f->push(&dp, bl, asio::redirect_error(asio::use_awaitable, ec));
+  EXPECT_EQ(sys::errc::argument_list_too_long, ec);
+}
+
+// Push enough entries to span several parts, then verify listing (all
+// at once and one at a time) and incremental trimming, including that
+// fully-trimmed part objects are removed.
+CORO_TEST_F(fifo, multiple_parts, NeoRadosTest)
+{
+  static constexpr auto max_part_size = 2048ull;
+  static constexpr auto max_entry_size = 128ull;
+  std::string_view fifo_id = "fifo";
+
+  // Pack a value into the leading bytes of an entry buffer. The previous
+  // code wrote through *std::launder(reinterpret_cast<int*>(buf.data())),
+  // which is undefined behavior: std::launder does not begin the lifetime
+  // of an int object inside a char array. Byte-wise packing is well
+  // defined and round-trips with unstamp() below regardless of endianness.
+  auto stamp = [](char* p, std::uint32_t v) {
+    for (std::size_t b = 0; b < sizeof(v); ++b)
+      p[b] = static_cast<char>((v >> (8 * b)) & 0xff);
+  };
+  // Inverse of stamp(): recover the value from the front of an entry.
+  auto unstamp = [](const char* p) {
+    std::uint32_t v = 0;
+    for (std::size_t b = 0; b < sizeof(v); ++b)
+      v |= std::uint32_t(static_cast<unsigned char>(p[b])) << (8 * b);
+    return v;
+  };
+
+  auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(),
+                                       asio::use_awaitable, std::nullopt,
+                                       std::nullopt, false, max_part_size,
+                                       max_entry_size);
+  std::array<char, max_entry_size> buf;
+  buf.fill('\0');
+  const auto [part_header_size, part_entry_overhead] =
+    FIFOtest::get_part_layout_info(*f);
+  const auto entries_per_part = ((max_part_size - part_header_size) /
+                                 (max_entry_size + part_entry_overhead));
+  const auto max_entries = entries_per_part * 4 + 1;
+  /* push enough entries to cross several part boundaries */
+  for (auto i = 0u; i < max_entries; ++i) {
+    buffer::list bl;
+    stamp(buf.data(), i);
+    bl.append(buf.data(), buf.size());
+    co_await f->push(&dp, bl, asio::use_awaitable);
+  }
+
+  auto info = FIFOtest::meta(*f);
+  EXPECT_EQ(info.id, fifo_id);
+  /* head should have advanced past part 0 */
+  EXPECT_GT(info.head_part_num, 0);
+
+  std::vector<fifo::entry> entries{max_entries};
+  {
+    /* list all at once */
+    auto [result, marker] = co_await f->list(&dp, {}, entries,
+                                             asio::use_awaitable);
+
+    EXPECT_TRUE(marker.empty());
+    EXPECT_EQ(max_entries, result.size());
+
+    for (auto i = 0u; i < max_entries; ++i) {
+      EXPECT_EQ(i, unstamp(result[i].data.c_str()));
+    }
+  }
+
+  std::string marker;
+  /* get entries one by one, resuming from the returned marker */
+  for (auto i = 0u; i < max_entries; ++i) {
+    std::span<fifo::entry> result;
+    std::tie(result, marker) = co_await f->list(&dp, marker,
+                                                std::span(entries).first(1),
+                                                asio::use_awaitable);
+
+    EXPECT_EQ(1, result.size());
+    /* the marker is only empty once the last entry has been listed */
+    const bool expected_more = (i != (max_entries - 1));
+    EXPECT_EQ(expected_more, !marker.empty());
+    EXPECT_EQ(i, unstamp(result.front().data.c_str()));
+  }
+
+  /* trim one at a time */
+  marker.clear();
+  for (auto i = 0u; i < max_entries; ++i) {
+    /* read the single entry currently at the tail */
+    std::span<fifo::entry> result;
+    std::tie(result, marker) = co_await f->list(&dp, {},
+                                                std::span(entries).first(1),
+                                                asio::use_awaitable);
+    EXPECT_EQ(result.size(), 1);
+    const bool expected_more = (i != (max_entries - 1));
+    EXPECT_EQ(expected_more, !marker.empty());
+
+    /* inclusively trim that one entry */
+    co_await f->trim(&dp, result.front().marker, false, asio::use_awaitable);
+
+    /* the tail part number should track the trim point */
+    info = FIFOtest::meta(*f);
+    EXPECT_EQ(info.tail_part_num, i / entries_per_part);
+
+    /* re-list everything remaining and check the count */
+    std::tie(result, marker) = co_await f->list(&dp, marker, entries,
+                                                asio::use_awaitable);
+    EXPECT_EQ(max_entries - i - 1, result.size());
+    EXPECT_TRUE(marker.empty());
+  }
+
+  /* tail now should point at head */
+  info = FIFOtest::meta(*f);
+  EXPECT_EQ(info.head_part_num, info.tail_part_num);
+
+  /* fully trimmed part objects should have been deleted */
+  for (auto i = 0; i < info.tail_part_num; ++i) {
+    sys::error_code ec;
+    co_await FIFOtest::get_part_info(*f, i, asio::redirect_error(
+                                       asio::use_awaitable, ec));
+    EXPECT_EQ(sys::errc::no_such_file_or_directory, ec);
+  }
+  /* but the current tail part must still exist */
+  co_await FIFOtest::get_part_info(*f, info.tail_part_num, asio::use_awaitable);
+}
+
+// Two handles (one creator, one opener) alternate pushes; both must
+// then see the complete, ordered log.
+CORO_TEST_F(fifo, two_pushers, NeoRadosTest)
+{
+  static constexpr auto max_part_size = 2048ull;
+  static constexpr auto max_entry_size = 128ull;
+  std::string_view fifo_id = "fifo";
+
+  // Pack a value into the leading bytes of an entry buffer. The previous
+  // code wrote through *std::launder(reinterpret_cast<int*>(...)), which
+  // is undefined behavior (no int object lives in the char array);
+  // byte-wise packing is well defined and round-trips with unstamp().
+  auto stamp = [](char* p, std::uint32_t v) {
+    for (std::size_t b = 0; b < sizeof(v); ++b)
+      p[b] = static_cast<char>((v >> (8 * b)) & 0xff);
+  };
+  auto unstamp = [](const char* p) {
+    std::uint32_t v = 0;
+    for (std::size_t b = 0; b < sizeof(v); ++b)
+      v |= std::uint32_t(static_cast<unsigned char>(p[b])) << (8 * b);
+    return v;
+  };
+
+  // f1 creates the FIFO...
+  auto f1 = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(),
+                                        asio::use_awaitable, std::nullopt,
+                                        std::nullopt, false, max_part_size,
+                                        max_entry_size);
+  std::array<char, max_entry_size> buf;
+  buf.fill('\0');
+  const auto [part_header_size, part_entry_overhead] =
+    FIFOtest::get_part_layout_info(*f1);
+  const auto entries_per_part = ((max_part_size - part_header_size) /
+                                 (max_entry_size + part_entry_overhead));
+  const auto max_entries = entries_per_part * 4 + 1;
+
+  // ...while f2 opens the same FIFO by name.
+  auto f2 = co_await fifo::FIFO::open(&dp, rados(), fifo_id, pool(),
+                                      asio::use_awaitable);
+  std::vector fifos{f1.get(), f2.get()};
+
+  /* alternate pushes between the two handles */
+  for (auto i = 0u; i < max_entries; ++i) {
+    buffer::list bl;
+    stamp(buf.data(), i);
+    bl.append(buf.data(), buf.size());
+    auto f = fifos[i % fifos.size()];
+    co_await f->push(&dp, bl, asio::use_awaitable);
+  }
+
+  /* list all by both handles */
+  std::vector<fifo::entry> entries{max_entries};
+  auto [result, marker] = co_await f1->list(&dp, {}, entries,
+                                            asio::use_awaitable);
+  EXPECT_TRUE(marker.empty());
+  EXPECT_EQ(max_entries, result.size());
+
+  std::tie(result, marker) = co_await f2->list(&dp, {}, entries,
+                                               asio::use_awaitable);
+  EXPECT_TRUE(marker.empty());
+  EXPECT_EQ(max_entries, result.size());
+
+  /* entries must come back in push order */
+  for (auto i = 0u; i < max_entries; ++i) {
+    EXPECT_EQ(i, unstamp(result[i].data.c_str()));
+  }
+}
+
+// One handle trims half the log; the other handle must observe only the
+// remaining entries when listing from the trim marker.
+CORO_TEST_F(fifo, two_pushers_trim, NeoRadosTest)
+{
+  static constexpr auto max_part_size = 2048ull;
+  static constexpr auto max_entry_size = 128ull;
+  std::string_view fifo_id = "fifo";
+
+  // Pack a value into the leading bytes of an entry buffer. The previous
+  // code wrote through *std::launder(reinterpret_cast<int*>(...)), which
+  // is undefined behavior (no int object lives in the char array);
+  // byte-wise packing is well defined and round-trips with unstamp().
+  auto stamp = [](char* p, std::uint32_t v) {
+    for (std::size_t b = 0; b < sizeof(v); ++b)
+      p[b] = static_cast<char>((v >> (8 * b)) & 0xff);
+  };
+  auto unstamp = [](const char* p) {
+    std::uint32_t v = 0;
+    for (std::size_t b = 0; b < sizeof(v); ++b)
+      v |= std::uint32_t(static_cast<unsigned char>(p[b])) << (8 * b);
+    return v;
+  };
+
+  auto f1 = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(),
+                                        asio::use_awaitable, std::nullopt,
+                                        std::nullopt, false, max_part_size,
+                                        max_entry_size);
+  std::array<char, max_entry_size> buf;
+  buf.fill('\0');
+  const auto [part_header_size, part_entry_overhead] =
+    FIFOtest::get_part_layout_info(*f1);
+  const auto entries_per_part = ((max_part_size - part_header_size) /
+                                 (max_entry_size + part_entry_overhead));
+  const auto max_entries = entries_per_part * 4 + 1;
+
+  auto f2 = co_await fifo::FIFO::open(&dp, rados(), fifo_id, pool(),
+                                      asio::use_awaitable);
+  /* push one entry through f2 and the rest through f1 */
+  for (auto i = 0u; i < max_entries; ++i) {
+    buffer::list bl;
+    stamp(buf.data(), i);
+    bl.append(buf.data(), buf.size());
+    auto& f = (i < 1 ? f2 : f1);
+    co_await f->push(&dp, bl, asio::use_awaitable);
+  }
+
+  /* trim the first half via fifo1 */
+  auto num = max_entries / 2;
+  std::vector<fifo::entry> entries{max_entries};
+  auto [result, marker] = co_await f1->list(&dp, {},
+                                            std::span(entries).first(num),
+                                            asio::use_awaitable);
+  EXPECT_FALSE(marker.empty());
+  EXPECT_EQ(num, result.size());
+
+  for (auto i = 0u; i < num; ++i) {
+    EXPECT_EQ(i, unstamp(result[i].data.c_str()));
+  }
+
+  /* the returned marker names the last listed entry */
+  auto& entry = result[num - 1];
+  EXPECT_EQ(marker, entry.marker);
+  co_await f1->trim(&dp, marker, false, asio::use_awaitable);
+
+  /* list what's left via fifo2, resuming from the trim marker */
+  const auto left = max_entries - num;
+  std::tie(result, marker) = co_await f2->list(&dp, marker,
+                                               std::span(entries).first(left),
+                                               asio::use_awaitable);
+
+  EXPECT_EQ(left, result.size());
+  EXPECT_TRUE(marker.empty());
+
+  for (auto i = num; i < max_entries; ++i) {
+    EXPECT_EQ(i, unstamp(result[i - num].data.c_str()));
+  }
+}
+
+// Push a whole batch of entries with a single push() call, then verify
+// order, count, and that the head advanced the expected number of parts.
+CORO_TEST_F(fifo, push_batch, NeoRadosTest)
+{
+  static constexpr auto max_part_size = 2048ull;
+  static constexpr auto max_entry_size = 128ull;
+  std::string_view fifo_id = "fifo";
+
+  // Pack a value into the leading bytes of an entry buffer. The previous
+  // code wrote through *std::launder(reinterpret_cast<int*>(...)), which
+  // is undefined behavior (no int object lives in the char array);
+  // byte-wise packing is well defined and round-trips with unstamp().
+  auto stamp = [](char* p, std::uint32_t v) {
+    for (std::size_t b = 0; b < sizeof(v); ++b)
+      p[b] = static_cast<char>((v >> (8 * b)) & 0xff);
+  };
+  auto unstamp = [](const char* p) {
+    std::uint32_t v = 0;
+    for (std::size_t b = 0; b < sizeof(v); ++b)
+      v |= std::uint32_t(static_cast<unsigned char>(p[b])) << (8 * b);
+    return v;
+  };
+
+  auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(),
+                                       asio::use_awaitable, std::nullopt,
+                                       std::nullopt, false, max_part_size,
+                                       max_entry_size);
+  std::array<char, max_entry_size> buf;
+  buf.fill('\0');
+  const auto [part_header_size, part_entry_overhead] =
+    FIFOtest::get_part_layout_info(*f);
+  const auto entries_per_part = ((max_part_size - part_header_size) /
+                                 (max_entry_size + part_entry_overhead));
+  const auto max_entries = entries_per_part * 4 + 1;
+
+  /* build the whole batch up front */
+  std::deque<buffer::list> bufs;
+  for (auto i = 0u; i < max_entries; ++i) {
+    buffer::list bl;
+    stamp(buf.data(), i);
+    bl.append(buf.data(), buf.size());
+    bufs.push_back(bl);
+  }
+  EXPECT_EQ(max_entries, bufs.size());
+  /* push everything with one call */
+  co_await f->push(&dp, std::move(bufs), asio::use_awaitable);
+
+  /* list all and check order */
+  std::vector<fifo::entry> entries{max_entries};
+  auto [result, marker] = co_await f->list(&dp, {}, entries,
+                                           asio::use_awaitable);
+  EXPECT_TRUE(marker.empty());
+  EXPECT_EQ(max_entries, result.size());
+  for (auto i = 0u; i < max_entries; ++i) {
+    EXPECT_EQ(i, unstamp(result[i].data.c_str()));
+  }
+  /* 4 * entries_per_part + 1 entries should land the head on part 4 */
+  auto info = FIFOtest::meta(*f);
+  EXPECT_EQ(info.head_part_num, 4);
+}
+
+// Exclusive trim removes everything strictly before the given marker,
+// leaving the marked entry itself in place.
+CORO_TEST_F(fifo, trim_exclusive, NeoRadosTest)
+{
+  std::string_view fifo_id = "fifo";
+  auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(),
+                                       asio::use_awaitable);
+
+  // Push the values 0..9, one per entry.
+  static constexpr auto max_entries = 10u;
+  for (auto i = 0u; i < max_entries; ++i) {
+    buffer::list bl;
+    encode(i, bl);
+    co_await f->push(&dp, std::move(bl), asio::use_awaitable);
+  }
+
+  std::array<fifo::entry, max_entries> entries;
+
+  // Peek at the first entry; the returned marker names it.
+  auto [result, marker] = co_await f->list(&dp, {}, std::span{entries}.first(1),
+                                           asio::use_awaitable);
+  EXPECT_EQ(0, decode_entry<std::uint32_t>(result.front()));
+  EXPECT_EQ(marker, result.front().marker);
+
+  // Exclusive trim at entry 0's marker must keep entry 0.
+  co_await f->trim(&dp, marker, true, asio::use_awaitable);
+  std::tie(result, marker) = co_await f->list(&dp, {}, entries,
+                                              asio::use_awaitable);
+  EXPECT_EQ(0, decode_entry<std::uint32_t>(result.front()));
+
+  // Exclusive trim at entry 4's marker removes entries 0-3 only.
+  co_await f->trim(&dp, result[4].marker, true, asio::use_awaitable);
+  std::tie(result, marker) = co_await f->list(&dp, {}, entries,
+                                              asio::use_awaitable);
+  EXPECT_EQ(4, decode_entry<std::uint32_t>(result.front()));
+
+  // Exclusive trim at the final entry leaves exactly that entry behind.
+  co_await f->trim(&dp, result.back().marker, true, asio::use_awaitable);
+  std::tie(result, marker) = co_await f->list(&dp, {}, entries,
+                                              asio::use_awaitable);
+  EXPECT_EQ(1, result.size());
+  EXPECT_EQ(max_entries - 1, decode_entry<std::uint32_t>(result.front()));
+}
+
+// Trimming inclusively at max_marker() empties the FIFO; the operation
+// itself is expected to complete with ENODATA.
+CORO_TEST_F(fifo, trim_all, NeoRadosTest)
+{
+  std::string_view fifo_id = "fifo";
+  auto f = co_await fifo::FIFO::create(&dp, rados(), fifo_id, pool(),
+                                       asio::use_awaitable);
+
+  // Fill the FIFO with ten encoded integers.
+  static constexpr auto max_entries = 10u;
+  for (auto i = 0u; i < max_entries; ++i) {
+    buffer::list bl;
+    encode(i, bl);
+    co_await f->push(&dp, std::move(bl), asio::use_awaitable);
+  }
+
+  // Trim everything; no_message_available (ENODATA) is the expected
+  // completion for a trim point at the maximum marker.
+  sys::error_code ec;
+  co_await f->trim(&dp, f->max_marker(), false,
+                   asio::redirect_error(asio::use_awaitable, ec));
+  EXPECT_EQ(sys::errc::no_message_available, ec);
+
+  // Listing afterwards must find nothing and no continuation marker.
+  std::array<fifo::entry, max_entries> entries;
+  auto [result, marker] = co_await f->list(&dp, {}, entries,
+                                           asio::use_awaitable);
+  EXPECT_TRUE(result.empty());
+  EXPECT_TRUE(marker.empty());
+}
+
+// Exercise the FIFO API with plain completion handlers instead of
+// coroutines: connect, create a pool, create a FIFO, push ten entries,
+// list them, trim everything, and list again. Each step runs inside the
+// completion handler of the previous step, so the whole scenario is one
+// nested callback chain driven by c.run().
+TEST(neocls_fifo_bare, lambdata)
+{
+ asio::io_context c;
+
+ std::optional<neorados::RADOS> rados;
+ neorados::IOContext pool;
+ std::string_view fifo_id = "fifo";
+ std::unique_ptr<fifo::FIFO> f;
+ static constexpr auto max_entries = 10u;
+ std::array<fifo::entry, max_entries> list_entries;
+ // Set only by the innermost handler; proves the whole chain ran.
+ bool completed = false;
+ neorados::RADOS::Builder{}.build(
+ c,
+ [&](sys::error_code ec, neorados::RADOS r_) {
+ ASSERT_FALSE(ec);
+ rados = std::move(r_);
+ // Create a temporary pool to host the FIFO's objects.
+ create_pool(
+ *rados, get_temp_pool_name(),
+ [&](sys::error_code ec, int64_t poolid) {
+ ASSERT_FALSE(ec);
+ pool.set_pool(poolid);
+ fifo::FIFO::create(
+ &dp, *rados, fifo_id, pool,
+ [&](sys::error_code ec, std::unique_ptr<fifo::FIFO> f_) {
+ ASSERT_FALSE(ec);
+ f = std::move(f_);
+ // Push ten encoded integers as a single batch.
+ std::array<buffer::list, max_entries> entries;
+ for (auto i = 0u; i < max_entries; ++i) {
+ encode(i, entries[i]);
+ }
+ f->push(
+ &dp, entries,
+ [&](sys::error_code ec) {
+ ASSERT_FALSE(ec);
+ // All ten entries should come back in push order,
+ // with no continuation marker.
+ f->list(
+ &dp, {}, list_entries,
+ [&](sys::error_code ec, std::span<fifo::entry> result,
+ std::string marker) {
+ ASSERT_FALSE(ec);
+ ASSERT_EQ(max_entries, result.size());
+ ASSERT_TRUE(marker.empty());
+ for (auto i = 0u; i < max_entries; ++i) {
+ auto val = decode_entry<std::uint32_t>(result[i]);
+ EXPECT_EQ(i, val);
+ }
+ // Trim everything; ENODATA is the expected completion
+ // when trimming at the maximum marker.
+ f->trim(
+ &dp, f->max_marker(), false,
+ [&](sys::error_code ec) {
+ ASSERT_EQ(sys::errc::no_message_available, ec);
+ // A final list must find the FIFO empty.
+ f->list(
+ &dp, {}, list_entries,
+ [&](sys::error_code ec, std::span<fifo::entry> result,
+ std::string marker) {
+ ASSERT_FALSE(ec);
+ ASSERT_TRUE(result.empty());
+ ASSERT_TRUE(marker.empty());
+ completed = true;
+ });
+ });
+ });
+ });
+ });
+ });
+ });
+ // Drive all the queued async work to completion.
+ c.run();
+ ASSERT_TRUE(completed);
+}