From f89a6d1833f45220ce558919de656f99cd1c9342 Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Tue, 29 Nov 2022 23:21:51 -0500 Subject: [PATCH] cls/fifo: Remove all use of part tags Part tags make part creation and setting the head non-idempotent, leading to issues where racing RGWs may get confused about the correct tag for a part. (Or worse, potentially have the metadata header hold a different value for a part than the part's header.) Consistency only requires that all nodes agree on the number. Fixes: https://tracker.ceph.com/issues/57562 Signed-off-by: Adam C. Emerson --- src/cls/fifo/cls_fifo.cc | 35 +- src/cls/fifo/cls_fifo_ops.h | 15 +- src/cls/fifo/cls_fifo_types.h | 50 +- src/neorados/CMakeLists.txt | 3 - src/neorados/cls/fifo.cc | 387 ----- src/neorados/cls/fifo.h | 1756 ----------------------- src/rgw/CMakeLists.txt | 7 +- src/rgw/driver/rados/cls_fifo_legacy.cc | 149 +- src/rgw/driver/rados/cls_fifo_legacy.h | 12 +- src/test/CMakeLists.txt | 1 - src/test/cls_fifo/CMakeLists.txt | 34 - src/test/cls_fifo/bench_cls_fifo.cc | 469 ------ src/test/cls_fifo/test_cls_fifo.cc | 741 ---------- src/test/rgw/test_cls_fifo_legacy.cc | 1 - 14 files changed, 94 insertions(+), 3566 deletions(-) delete mode 100644 src/neorados/cls/fifo.cc delete mode 100644 src/neorados/cls/fifo.h delete mode 100644 src/test/cls_fifo/CMakeLists.txt delete mode 100644 src/test/cls_fifo/bench_cls_fifo.cc delete mode 100644 src/test/cls_fifo/test_cls_fifo.cc diff --git a/src/cls/fifo/cls_fifo.cc b/src/cls/fifo/cls_fifo.cc index 14313a73516..5e2e2d42f85 100644 --- a/src/cls/fifo/cls_fifo.cc +++ b/src/cls/fifo/cls_fifo.cc @@ -117,7 +117,6 @@ int read_part_header(cls_method_context_t hctx, std::ostringstream ss; ss << part_header->max_time; CLS_LOG(5, "%s:%d read part_header:\n" - "\ttag=%s\n" "\tmagic=0x%" PRIx64 "\n" "\tmin_ofs=%" PRId64 "\n" "\tlast_ofs=%" PRId64 "\n" @@ -126,7 +125,6 @@ int read_part_header(cls_method_context_t hctx, "\tmax_index=%" PRId64 "\n" "\tmax_time=%s\n", 
__PRETTY_FUNCTION__, __LINE__, - part_header->tag.c_str(), part_header->magic, part_header->min_ofs, part_header->last_ofs, @@ -406,11 +404,6 @@ int init_part(cls_method_context_t hctx, ceph::buffer::list* in, std::uint64_t size; - if (op.tag.empty()) { - CLS_ERR("%s: tag required", __PRETTY_FUNCTION__); - return -EINVAL; - } - int r = cls_cxx_stat2(hctx, &size, nullptr); if (r < 0 && r != -ENOENT) { CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__, r); @@ -424,8 +417,7 @@ int init_part(cls_method_context_t hctx, ceph::buffer::list* in, return r; } - if (!(part_header.tag == op.tag && - part_header.params == op.params)) { + if (!(part_header.params == op.params)) { CLS_ERR("%s: failed to re-create existing part with different " "params", __PRETTY_FUNCTION__); return -EEXIST; @@ -436,7 +428,6 @@ int init_part(cls_method_context_t hctx, ceph::buffer::list* in, part_header part_header; - part_header.tag = op.tag; part_header.params = op.params; part_header.min_ofs = CLS_FIFO_MAX_PART_HEADER_SIZE; @@ -475,11 +466,6 @@ int push_part(cls_method_context_t hctx, ceph::buffer::list* in, return -EINVAL; } - if (op.tag.empty()) { - CLS_ERR("%s: tag required", __PRETTY_FUNCTION__); - return -EINVAL; - } - part_header part_header; int r = read_part_header(hctx, &part_header); if (r < 0) { @@ -487,11 +473,6 @@ int push_part(cls_method_context_t hctx, ceph::buffer::list* in, return r; } - if (!(part_header.tag == op.tag)) { - CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__); - return -EINVAL; - } - std::uint64_t effective_len = op.total_len + op.data_bufs.size() * part_entry_overhead; @@ -782,12 +763,6 @@ int trim_part(cls_method_context_t hctx, return r; } - if (op.tag && - !(part_header.tag == *op.tag)) { - CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__); - return -EINVAL; - } - if (op.ofs < part_header.min_ofs) { return 0; } @@ -866,12 +841,6 @@ int list_part(cls_method_context_t hctx, ceph::buffer::list* in, return r; } - if (op.tag && - !(part_header.tag 
== *op.tag)) { - CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__); - return -EINVAL; - } - EntryReader reader(hctx, part_header, op.ofs); if (op.ofs >= part_header.min_ofs && @@ -885,8 +854,6 @@ int list_part(cls_method_context_t hctx, ceph::buffer::list* in, op::list_part_reply reply; - reply.tag = part_header.tag; - auto max_entries = std::min(op.max_entries, op::MAX_LIST_ENTRIES); for (int i = 0; i < max_entries && !reader.end(); ++i) { diff --git a/src/cls/fifo/cls_fifo_ops.h b/src/cls/fifo/cls_fifo_ops.h index 91a012c0847..e850c635c0b 100644 --- a/src/cls/fifo/cls_fifo_ops.h +++ b/src/cls/fifo/cls_fifo_ops.h @@ -149,17 +149,18 @@ WRITE_CLASS_ENCODER(update_meta) struct init_part { - std::string tag; data_params params; void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); + std::string tag; encode(tag, bl); encode(params, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); + std::string tag; decode(tag, bl); decode(params, bl); DECODE_FINISH(bl); @@ -169,12 +170,12 @@ WRITE_CLASS_ENCODER(init_part) struct push_part { - std::string tag; std::deque data_bufs; std::uint64_t total_len{0}; void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); + std::string tag; encode(tag, bl); encode(data_bufs, bl); encode(total_len, bl); @@ -182,6 +183,7 @@ struct push_part } void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); + std::string tag; decode(tag, bl); decode(data_bufs, bl); decode(total_len, bl); @@ -192,12 +194,12 @@ WRITE_CLASS_ENCODER(push_part) struct trim_part { - std::optional tag; std::uint64_t ofs{0}; bool exclusive = false; void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); + std::optional tag; encode(tag, bl); encode(ofs, bl); encode(exclusive, bl); @@ -205,6 +207,7 @@ struct trim_part } void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); + std::optional tag; decode(tag, bl); decode(ofs, bl); decode(exclusive, bl); @@ -215,12 
+218,12 @@ WRITE_CLASS_ENCODER(trim_part) struct list_part { - std::optional tag; std::uint64_t ofs{0}; int max_entries{100}; void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); + std::optional tag; encode(tag, bl); encode(ofs, bl); encode(max_entries, bl); @@ -228,6 +231,7 @@ struct list_part } void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); + std::optional tag; decode(tag, bl); decode(ofs, bl); decode(max_entries, bl); @@ -239,7 +243,6 @@ inline constexpr int MAX_LIST_ENTRIES = 512; struct list_part_reply { - std::string tag; std::vector entries; bool more{false}; bool full_part{false}; /* whether part is full or still can be written to. @@ -247,6 +250,7 @@ struct list_part_reply void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); + std::string tag; encode(tag, bl); encode(entries, bl); encode(more, bl); @@ -255,6 +259,7 @@ struct list_part_reply } void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); + std::string tag; decode(tag, bl); decode(entries, bl); decode(more, bl); diff --git a/src/cls/fifo/cls_fifo_types.h b/src/cls/fifo/cls_fifo_types.h index 3ba5a293621..cc02a5dcd45 100644 --- a/src/cls/fifo/cls_fifo_types.h +++ b/src/cls/fifo/cls_fifo_types.h @@ -126,13 +126,17 @@ struct journal_entry { remove = 3, } op{Op::unknown}; - std::int64_t part_num{0}; - std::string part_tag; + std::int64_t part_num{-1}; + + journal_entry() = default; + journal_entry(Op op, std::int64_t part_num) + : op(op), part_num(part_num) {} void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); encode((int)op, bl); encode(part_num, bl); + std::string part_tag; encode(part_tag, bl); ENCODE_FINISH(bl); } @@ -142,6 +146,7 @@ struct journal_entry { decode(i, bl); op = static_cast(i); decode(part_num, bl); + std::string part_tag; decode(part_tag, bl); DECODE_FINISH(bl); } @@ -149,8 +154,7 @@ struct journal_entry { friend bool operator ==(const journal_entry& lhs, const journal_entry& rhs) { return 
(lhs.op == rhs.op && - lhs.part_num == rhs.part_num && - lhs.part_tag == rhs.part_tag); + lhs.part_num == rhs.part_num); } }; WRITE_CLASS_ENCODER(journal_entry) @@ -169,8 +173,7 @@ inline std::ostream& operator <<(std::ostream& m, const journal_entry::Op& o) { } inline std::ostream& operator <<(std::ostream& m, const journal_entry& j) { return m << "op: " << j.op << ", " - << "part_num: " << j.part_num << ", " - << "part_tag: " << j.part_tag; + << "part_num: " << j.part_num; } // This is actually a useful builder, since otherwise we end up with @@ -311,9 +314,6 @@ struct info { std::int64_t min_push_part_num{0}; std::int64_t max_push_part_num{-1}; - std::string head_tag; - std::map tags; - std::multimap journal; bool need_new_head() const { @@ -334,6 +334,8 @@ struct info { encode(head_part_num, bl); encode(min_push_part_num, bl); encode(max_push_part_num, bl); + std::string head_tag; + std::map tags; encode(tags, bl); encode(head_tag, bl); encode(journal, bl); @@ -349,6 +351,8 @@ struct info { decode(head_part_num, bl); decode(min_push_part_num, bl); decode(max_push_part_num, bl); + std::string head_tag; + std::map tags; decode(tags, bl); decode(head_tag, bl); decode(journal, bl); @@ -361,14 +365,6 @@ struct info { return fmt::format("{}.{}", oid_prefix, part_num); } - journal_entry next_journal_entry(std::string tag) const { - journal_entry entry; - entry.op = journal_entry::Op::create; - entry.part_num = max_push_part_num + 1; - entry.part_tag = std::move(tag); - return entry; - } - std::optional apply_update(const update& update) { if (update.tail_part_num()) { @@ -393,10 +389,6 @@ struct info { "allowed, part num={}", entry.part_num); } - if (entry.op == journal_entry::Op::create) { - tags[entry.part_num] = entry.part_tag; - } - journal.emplace(entry.part_num, entry); } @@ -405,14 +397,7 @@ struct info { } if (update.head_part_num()) { - tags.erase(head_part_num); head_part_num = *update.head_part_num(); - auto iter = tags.find(head_part_num); - if (iter != 
tags.end()) { - head_tag = iter->second; - } else { - head_tag.erase(); - } } return std::nullopt; @@ -428,8 +413,6 @@ inline std::ostream& operator <<(std::ostream& m, const info& i) { << "head_part_num: " << i.head_part_num << ", " << "min_push_part_num: " << i.min_push_part_num << ", " << "max_push_part_num: " << i.max_push_part_num << ", " - << "head_tag: " << i.head_tag << ", " - << "tags: {" << i.tags << "}, " << "journal: {" << i.journal; } @@ -470,8 +453,6 @@ inline std::ostream& operator <<(std::ostream& m, } struct part_header { - std::string tag; - data_params params; std::uint64_t magic{0}; @@ -485,6 +466,7 @@ struct part_header { void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); + std::string tag; encode(tag, bl); encode(params, bl); encode(magic, bl); @@ -498,6 +480,7 @@ struct part_header { } void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); + std::string tag; decode(tag, bl); decode(params, bl); decode(magic, bl); @@ -513,8 +496,7 @@ struct part_header { WRITE_CLASS_ENCODER(part_header) inline std::ostream& operator <<(std::ostream& m, const part_header& p) { using ceph::operator <<; - return m << "tag: " << p.tag << ", " - << "params: {" << p.params << "}, " + return m << "params: {" << p.params << "}, " << "magic: " << p.magic << ", " << "min_ofs: " << p.min_ofs << ", " << "last_ofs: " << p.last_ofs << ", " diff --git a/src/neorados/CMakeLists.txt b/src/neorados/CMakeLists.txt index 3c7aee7c6fe..c66043ac9f9 100644 --- a/src/neorados/CMakeLists.txt +++ b/src/neorados/CMakeLists.txt @@ -39,6 +39,3 @@ target_link_libraries(libneorados PRIVATE # ${BLKID_LIBRARIES} ${CRYPTO_LIBS} ${EXTRALIBS}) # target_link_libraries(libneorados ${rados_libs}) # install(TARGETS libneorados DESTINATION ${CMAKE_INSTALL_LIBDIR}) -add_library(neorados_cls_fifo STATIC cls/fifo.cc) -target_link_libraries(neorados_cls_fifo PRIVATE - libneorados ceph-common fmt::fmt) diff --git a/src/neorados/cls/fifo.cc b/src/neorados/cls/fifo.cc 
deleted file mode 100644 index f95ede5ff8f..00000000000 --- a/src/neorados/cls/fifo.cc +++ /dev/null @@ -1,387 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2020 Red Hat - * Author: Adam C. Emerson - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include -#include -#include -#include - -#undef FMT_HEADER_ONLY -#define FMT_HEADER_ONLY 1 -#include - -#include - -#include "include/neorados/RADOS.hpp" - -#include "include/buffer.h" - -#include "common/random_string.h" - -#include "cls/fifo/cls_fifo_types.h" -#include "cls/fifo/cls_fifo_ops.h" - -#include "fifo.h" - -using namespace std; - -namespace neorados::cls::fifo { -namespace bs = boost::system; -namespace cb = ceph::buffer; -namespace fifo = rados::cls::fifo; - -void create_meta(WriteOp& op, std::string_view id, - std::optional objv, - std::optional oid_prefix, - bool exclusive, - std::uint64_t max_part_size, - std::uint64_t max_entry_size) -{ - fifo::op::create_meta cm; - - cm.id = id; - cm.version = objv; - cm.oid_prefix = oid_prefix; - cm.max_part_size = max_part_size; - cm.max_entry_size = max_entry_size; - cm.exclusive = exclusive; - - cb::list in; - encode(cm, in); - op.exec(fifo::op::CLASS, fifo::op::CREATE_META, in); -} - -void get_meta(ReadOp& op, std::optional objv, - bs::error_code* ec_out, fifo::info* info, - std::uint32_t* part_header_size, - std::uint32_t* part_entry_overhead) -{ - fifo::op::get_meta gm; - gm.version = objv; - cb::list in; - encode(gm, in); - op.exec(fifo::op::CLASS, fifo::op::GET_META, in, - [ec_out, info, part_header_size, - part_entry_overhead](bs::error_code ec, const cb::list& bl) { - fifo::op::get_meta_reply reply; - if (!ec) try { - auto iter = bl.cbegin(); - 
decode(reply, iter); - } catch (const cb::error& err) { - ec = err.code(); - } - if (ec_out) *ec_out = ec; - if (info) *info = std::move(reply.info); - if (part_header_size) *part_header_size = reply.part_header_size; - if (part_entry_overhead) - *part_entry_overhead = reply.part_entry_overhead; - }); -}; - -void update_meta(WriteOp& op, const fifo::objv& objv, - const fifo::update& update) -{ - fifo::op::update_meta um; - - um.version = objv; - um.tail_part_num = update.tail_part_num(); - um.head_part_num = update.head_part_num(); - um.min_push_part_num = update.min_push_part_num(); - um.max_push_part_num = update.max_push_part_num(); - um.journal_entries_add = std::move(update).journal_entries_add(); - um.journal_entries_rm = std::move(update).journal_entries_rm(); - - cb::list in; - encode(um, in); - op.exec(fifo::op::CLASS, fifo::op::UPDATE_META, in); -} - -void part_init(WriteOp& op, std::string_view tag, - fifo::data_params params) -{ - fifo::op::init_part ip; - - ip.tag = tag; - ip.params = params; - - cb::list in; - encode(ip, in); - op.exec(fifo::op::CLASS, fifo::op::INIT_PART, in); -} - -void push_part(WriteOp& op, std::string_view tag, - std::deque data_bufs, - fu2::unique_function f) -{ - fifo::op::push_part pp; - - pp.tag = tag; - pp.data_bufs = data_bufs; - pp.total_len = 0; - - for (const auto& bl : data_bufs) - pp.total_len += bl.length(); - - cb::list in; - encode(pp, in); - op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in, - [f = std::move(f)](bs::error_code ec, int r, const cb::list&) mutable { - std::move(f)(ec, r); - }); - op.returnvec(); -} - -void trim_part(WriteOp& op, - std::optional tag, - std::uint64_t ofs, bool exclusive) -{ - fifo::op::trim_part tp; - - tp.tag = tag; - tp.ofs = ofs; - tp.exclusive = exclusive; - - bufferlist in; - encode(tp, in); - op.exec(fifo::op::CLASS, fifo::op::TRIM_PART, in); -} - -void list_part(ReadOp& op, - std::optional tag, - std::uint64_t ofs, - std::uint64_t max_entries, - bs::error_code* ec_out, - 
std::vector* entries, - bool* more, - bool* full_part, - std::string* ptag) -{ - fifo::op::list_part lp; - - lp.tag = tag; - lp.ofs = ofs; - lp.max_entries = max_entries; - - bufferlist in; - encode(lp, in); - op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, - [entries, more, full_part, ptag, ec_out](bs::error_code ec, - const cb::list& bl) { - if (ec) { - if (ec_out) *ec_out = ec; - return; - } - - fifo::op::list_part_reply reply; - auto iter = bl.cbegin(); - try { - decode(reply, iter); - } catch (const cb::error& err) { - if (ec_out) *ec_out = ec; - return; - } - - if (entries) *entries = std::move(reply.entries); - if (more) *more = reply.more; - if (full_part) *full_part = reply.full_part; - if (ptag) *ptag = reply.tag; - }); -} - -void get_part_info(ReadOp& op, - bs::error_code* out_ec, - fifo::part_header* header) -{ - fifo::op::get_part_info gpi; - - bufferlist in; - encode(gpi, in); - op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, - [out_ec, header](bs::error_code ec, const cb::list& bl) { - if (ec) { - if (out_ec) *out_ec = ec; - } - fifo::op::get_part_info_reply reply; - auto iter = bl.cbegin(); - try { - decode(reply, iter); - } catch (const cb::error& err) { - if (out_ec) *out_ec = ec; - return; - } - - if (header) *header = std::move(reply.header); - }); -} - -std::optional FIFO::to_marker(std::string_view s) { - marker m; - if (s.empty()) { - m.num = info.tail_part_num; - m.ofs = 0; - return m; - } - - auto pos = s.find(':'); - if (pos == string::npos) { - return std::nullopt; - } - - auto num = s.substr(0, pos); - auto ofs = s.substr(pos + 1); - - auto n = ceph::parse(num); - if (!n) { - return std::nullopt; - } - m.num = *n; - auto o = ceph::parse(ofs); - if (!o) { - return std::nullopt; - } - m.ofs = *o; - return m; -} - -bs::error_code FIFO::apply_update(fifo::info* info, - const fifo::objv& objv, - const fifo::update& update) { - std::unique_lock l(m); - auto err = info->apply_update(update); - if (objv != info->version) { - 
ldout(r->cct(), 0) << __func__ << "(): Raced locally!" << dendl; - return errc::raced; - } - if (err) { - ldout(r->cct(), 0) << __func__ << "(): ERROR: " << err << dendl; - return errc::update_failed; - } - - ++info->version.ver; - - return {}; -} - -std::string FIFO::generate_tag() const -{ - static constexpr auto HEADER_TAG_SIZE = 16; - return gen_rand_alphanumeric_plain(r->cct(), HEADER_TAG_SIZE); -} - -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wnon-virtual-dtor" -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wnon-virtual-dtor" -class error_category : public ceph::converting_category { -public: - error_category(){} - const char* name() const noexcept override; - const char* message(int ev, char*, std::size_t) const noexcept override; - std::string message(int ev) const override; - bs::error_condition default_error_condition(int ev) const noexcept - override; - bool equivalent(int ev, const bs::error_condition& c) const - noexcept override; - using ceph::converting_category::equivalent; - int from_code(int ev) const noexcept override; -}; -#pragma GCC diagnostic pop -#pragma clang diagnostic pop - -const char* error_category::name() const noexcept { - return "FIFO"; -} - -const char* error_category::message(int ev, char*, std::size_t) const noexcept { - if (ev == 0) - return "No error"; - - switch (static_cast(ev)) { - case errc::raced: - return "Retry-race count exceeded"; - - case errc::inconsistency: - return "Inconsistent result! 
New head before old head"; - - case errc::entry_too_large: - return "Pushed entry too large"; - - case errc::invalid_marker: - return "Invalid marker string"; - - case errc::update_failed: - return "Update failed"; - } - - return "Unknown error"; -} - -std::string error_category::message(int ev) const { - return message(ev, nullptr, 0); -} - -bs::error_condition -error_category::default_error_condition(int ev) const noexcept { - switch (static_cast(ev)) { - case errc::raced: - return bs::errc::operation_canceled; - - case errc::inconsistency: - return bs::errc::io_error; - - case errc::entry_too_large: - return bs::errc::value_too_large; - - case errc::invalid_marker: - return bs::errc::invalid_argument; - - case errc::update_failed: - return bs::errc::invalid_argument; - } - - return { ev, *this }; -} - -bool error_category::equivalent(int ev, const bs::error_condition& c) const noexcept { - return default_error_condition(ev) == c; -} - -int error_category::from_code(int ev) const noexcept { - switch (static_cast(ev)) { - case errc::raced: - return -ECANCELED; - - case errc::inconsistency: - return -EIO; - - case errc::entry_too_large: - return -E2BIG; - - case errc::invalid_marker: - return -EINVAL; - - case errc::update_failed: - return -EINVAL; - - } - return -EDOM; -} - -const bs::error_category& error_category() noexcept { - static const class error_category c; - return c; -} - -} diff --git a/src/neorados/cls/fifo.h b/src/neorados/cls/fifo.h deleted file mode 100644 index b95f7f94ffe..00000000000 --- a/src/neorados/cls/fifo.h +++ /dev/null @@ -1,1756 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2020 Red Hat - * Author: Adam C. 
Emerson - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_NEORADOS_CLS_FIFIO_H -#define CEPH_NEORADOS_CLS_FIFIO_H - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -#undef FMT_HEADER_ONLY -#define FMT_HEADER_ONLY 1 -#include - -#include "include/neorados/RADOS.hpp" -#include "include/buffer.h" - -#include "common/allocate_unique.h" -#include "common/async/bind_handler.h" -#include "common/async/bind_like.h" -#include "common/async/completion.h" -#include "common/async/forward_handler.h" - -#include "common/dout.h" - -#include "cls/fifo/cls_fifo_types.h" -#include "cls/fifo/cls_fifo_ops.h" - -namespace neorados::cls::fifo { -namespace ba = boost::asio; -namespace bs = boost::system; -namespace ca = ceph::async; -namespace cb = ceph::buffer; -namespace fifo = rados::cls::fifo; - -inline constexpr auto dout_subsys = ceph_subsys_rados; -inline constexpr std::uint64_t default_max_part_size = 4 * 1024 * 1024; -inline constexpr std::uint64_t default_max_entry_size = 32 * 1024; -inline constexpr auto MAX_RACE_RETRIES = 10; - - -const boost::system::error_category& error_category() noexcept; - -enum class errc { - raced = 1, - inconsistency, - entry_too_large, - invalid_marker, - update_failed -}; -} - -namespace boost::system { -template<> -struct is_error_code_enum<::neorados::cls::fifo::errc> { - static const bool value = true; -}; -template<> -struct is_error_condition_enum<::neorados::cls::fifo::errc> { - static const bool value = false; -}; -} - -namespace neorados::cls::fifo { -// explicit conversion: -inline bs::error_code make_error_code(errc e) noexcept { - return { static_cast(e), error_category() }; -} - -inline bs::error_code make_error_category(errc e) noexcept { - return { static_cast(e), error_category() }; -} - 
-void create_meta(WriteOp& op, std::string_view id, - std::optional objv, - std::optional oid_prefix, - bool exclusive = false, - std::uint64_t max_part_size = default_max_part_size, - std::uint64_t max_entry_size = default_max_entry_size); -void get_meta(ReadOp& op, std::optional objv, - bs::error_code* ec_out, fifo::info* info, - std::uint32_t* part_header_size, - std::uint32_t* part_entry_overhead); - -void update_meta(WriteOp& op, const fifo::objv& objv, - const fifo::update& desc); - -void part_init(WriteOp& op, std::string_view tag, - fifo::data_params params); - -void push_part(WriteOp& op, std::string_view tag, - std::deque data_bufs, - fu2::unique_function); -void trim_part(WriteOp& op, std::optional tag, - std::uint64_t ofs, - bool exclusive); -void list_part(ReadOp& op, - std::optional tag, - std::uint64_t ofs, - std::uint64_t max_entries, - bs::error_code* ec_out, - std::vector* entries, - bool* more, - bool* full_part, - std::string* ptag); -void get_part_info(ReadOp& op, - bs::error_code* out_ec, - fifo::part_header* header); - -struct marker { - std::int64_t num = 0; - std::uint64_t ofs = 0; - - marker() = default; - marker(std::int64_t num, std::uint64_t ofs) : num(num), ofs(ofs) {} - static marker max() { - return { std::numeric_limits::max(), - std::numeric_limits::max() }; - } - - std::string to_string() { - return fmt::format("{:0>20}:{:0>20}", num, ofs); - } -}; - -struct list_entry { - cb::list data; - std::string marker; - ceph::real_time mtime; -}; - -using part_info = fifo::part_header; - -namespace detail { -template -class JournalProcessor; -} - -/// Completions, Handlers, and CompletionTokens -/// =========================================== -/// -/// This class is based on Boost.Asio. For information, see -/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio.html -/// -/// As summary, Asio's design is that of functions taking completion -/// handlers. 
Every handler has a signature, like -/// (boost::system::error_code, std::string). The completion handler -/// receives the result of the function, and the signature is the type -/// of that result. -/// -/// The completion handler is specified with a CompletionToken. The -/// CompletionToken is any type that has a specialization of -/// async_complete and async_result. See -/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/async_completion.html -/// and https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/async_result.html -/// -/// The return type of a function taking a CompletionToken is -/// async_result::return_type. -/// -/// Functions -/// --------- -/// -/// The default implementations treat whatever value is described as a -/// function, whose parameters correspond to the signature, and calls -/// it upon completion. -/// -/// EXAMPLE: -/// Let f be an asynchronous function whose signature is (bs::error_code, int) -/// Let g be an asynchronous function whose signature is -/// (bs::error_code, int, std::string). -/// -/// -/// f([](bs::error_code ec, int i) { ... }); -/// g([](bs::error_code ec, int i, std::string s) { ... }); -/// -/// Will schedule asynchronous tasks, and the provided lambdas will be -/// called on completion. In this case, f and g return void. -/// -/// There are other specializations. Commonly used ones are. -/// -/// Futures -/// ------- -/// -/// A CompletionToken of boost::asio::use_future will complete with a -/// promise whose type matches (minus any initial error_code) the -/// function's signature. The corresponding future is returned. If the -/// error_code of the result is non-zero, the future is set with an -/// exception of type boost::asio::system_error. -/// -/// See https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/use_future_t.html -/// -/// EXAMPLE: -/// -/// std::future = f(ba::use_future); -/// std::future = g(ba::use_future). 
-/// -/// Coroutines -/// ---------- -/// -/// A CompletionToken of type spawn::yield_context suspends execution -/// of the current coroutine until completion of the operation. See -/// src/spawn/README.md -/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/spawn.html and -/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/yield_context.html -/// -/// Operations given this CompletionToken return their results, modulo -/// any leading error_code. A non-zero error code will be thrown, by -/// default, but may be bound to a variable instead with the overload -/// of the array-subscript oeprator. -/// -/// EXAMPLE: -/// // Within a function with a yield_context parameter named y -/// -/// try { -/// int i = f(y); -/// } catch (const bs::system_error& ec) { ... } -/// -/// bs::error_code ec; -/// auto [i, s] = g(y[ec]); -/// -/// Blocking calls -/// -------------- -/// -/// ceph::async::use_blocked, defined in src/common/async/blocked_completion.h -/// Suspends the current thread of execution, returning the results of -/// the operation on resumption. Its calling convention is analogous to -/// that of yield_context. -/// -/// EXAMPLE: -/// try { -/// int i = f(ca::use_blocked); -/// } catch (const bs::system_error& e) { ... } -/// -/// bs::error_code ec; -/// auto [i, s] = g(ca::use_blocked[ec]); -/// -/// librados Completions -/// -------------------- -/// -/// If src/common/async/librados_completion.h is included in the -/// current translation unit, then librados::AioCompletion* may be used -/// as a CompletionToken. This is only permitted when the completion -/// signature is either bs::system_error or void. The return type of -/// functions provided a CompletionToken of AioCompletion* is void. If -/// the signature includes an error code and the error code is set, -/// then the error is translated to an int which is set as the result -/// of the AioCompletion. 
-/// -/// EXAMPLE: -/// // Assume an asynchronous function h whose signature is bs::error_code. -/// -/// AioCompletion* c = Rados::aio_create_completion(); -/// h(c); -/// int r = c.get_return_value(); -/// -/// See also src/test/cls_fifo/bench_cls_fifo.cc for a full, simple -/// example of a program using this class with coroutines. -/// -/// -/// Markers -/// ======= -/// -/// Markers represent a position within the FIFO. Internally, they are -/// part/offset pairs. Externally, they are ordered but otherwise -/// opaque strings. Markers that compare lower denote positions closer -/// to the tail. -/// -/// A marker is returned with every entry from a list() operation. They -/// may be supplied to a list operation to resume from a given -/// position, and must be supplied to trim give the position to which -/// to trim. - -class FIFO { -public: - - FIFO(const FIFO&) = delete; - FIFO& operator =(const FIFO&) = delete; - FIFO(FIFO&&) = delete; - FIFO& operator =(FIFO&&) = delete; - - /// Open an existing FIFO. - /// Signature: (bs::error_code ec, std::unique_ptr f) - template - static auto open(RADOS& r, //< RADOS handle - const IOContext& ioc, //< Context for pool, namespace, etc. - Object oid, //< OID for the 'main' object of the FIFO - CT&& ct, //< CompletionToken - /// Fail if is not this version - std::optional objv = std::nullopt, - /// Default executor. By default use the one - /// associated with the RADOS handle. 
- std::optional executor = std::nullopt) { - ba::async_completion)> init(ct); - auto e = ba::get_associated_executor(init.completion_handler, - executor.value_or(r.get_executor())); - auto a = ba::get_associated_allocator(init.completion_handler); - _read_meta_( - &r, oid, ioc, objv, - ca::bind_ea( - e, a, - [&r, ioc, oid, executor, handler = std::move(init.completion_handler)] - (bs::error_code ec, fifo::info info, - std::uint32_t size, std::uint32_t over) mutable { - std::unique_ptr f( - new FIFO(r, ioc, oid, executor.value_or(r.get_executor()))); - f->info = info; - f->part_header_size = size; - f->part_entry_overhead = over; - // If there are journal entries, process them, in case - // someone crashed mid-transaction. - if (!ec && !info.journal.empty()) { - auto e = ba::get_associated_executor(handler, f->get_executor()); - auto a = ba::get_associated_allocator(handler); - auto g = f.get(); - g->_process_journal( - ca::bind_ea( - e, a, - [f = std::move(f), - handler = std::move(handler)](bs::error_code ec) mutable { - std::move(handler)(ec, std::move(f)); - })); - return; - } - std::move(handler)(ec, std::move(f)); - return; - })); - return init.result.get(); - } - - /// Open an existing or create a new FIFO. - /// Signature: (bs::error_code ec, std::unique_ptr f) - template - static auto create(RADOS& r, /// RADOS handle - const IOContext& ioc, /// Context for pool, namespace, etc. - Object oid, /// OID for the 'main' object of the FIFO - CT&& ct, /// CompletionToken - /// Fail if FIFO exists and is not this version - std::optional objv = std::nullopt, - /// Custom prefix for parts - std::optional oid_prefix = std::nullopt, - /// Fail if FIFO already exists - bool exclusive = false, - /// Size at which a part is considered full - std::uint64_t max_part_size = default_max_part_size, - /// Maximum size of any entry - std::uint64_t max_entry_size = default_max_entry_size, - /// Default executor. By default use the one - /// associated with the RADOS handle. 
- std::optional executor = std::nullopt) { - ba::async_completion)> init(ct); - WriteOp op; - create_meta(op, oid, objv, oid_prefix, exclusive, max_part_size, - max_entry_size); - auto e = ba::get_associated_executor(init.completion_handler, - executor.value_or(r.get_executor())); - auto a = ba::get_associated_allocator(init.completion_handler); - r.execute( - oid, ioc, std::move(op), - ca::bind_ea( - e, a, - [objv, &r, ioc, oid, executor, handler = std::move(init.completion_handler)] - (bs::error_code ec) mutable { - if (ec) { - std::move(handler)(ec, nullptr); - return; - } - auto e = ba::get_associated_executor( - handler, executor.value_or(r.get_executor())); - auto a = ba::get_associated_allocator(handler); - FIFO::_read_meta_( - &r, oid, ioc, objv, - ca::bind_ea( - e, a, - [&r, ioc, executor, oid, handler = std::move(handler)] - (bs::error_code ec, fifo::info info, - std::uint32_t size, std::uint32_t over) mutable { - std::unique_ptr f( - new FIFO(r, ioc, oid, executor.value_or(r.get_executor()))); - f->info = info; - f->part_header_size = size; - f->part_entry_overhead = over; - if (!ec && !info.journal.empty()) { - auto e = ba::get_associated_executor(handler, - f->get_executor()); - auto a = ba::get_associated_allocator(handler); - auto g = f.get(); - g->_process_journal( - ca::bind_ea( - e, a, - [f = std::move(f), handler = std::move(handler)] - (bs::error_code ec) mutable { - std::move(handler)(ec, std::move(f)); - })); - return; - } - std::move(handler)(ec, std::move(f)); - })); - })); - return init.result.get(); - } - - /// Force a re-read of FIFO metadata. 
- /// Signature: (bs::error_code ec) - template - auto read_meta(CT&& ct, //< CompletionToken - /// Fail if FIFO not at this version - std::optional objv = std::nullopt) { - std::unique_lock l(m); - auto version = info.version; - l.unlock(); - ba::async_completion init(ct); - auto e = ba::get_associated_executor(init.completion_handler, - get_executor()); - auto a = ba::get_associated_allocator(init.completion_handler); - _read_meta_( - r, oid, ioc, objv, - ca::bind_ea( - e, a, - [this, version, handler = std::move(init.completion_handler)] - (bs::error_code ec, fifo::info newinfo, - std::uint32_t size, std::uint32_t over) mutable { - std::unique_lock l(m); - if (version == info.version) { - info = newinfo; - part_header_size = size; - part_entry_overhead = over; - } - l.unlock(); - return std::move(handler)(ec); - })); - return init.result.get(); - } - - /// Return a reference to currently known metadata - const fifo::info& meta() const { - return info; - } - - /// Return header size and entry overhead of partitions. - std::pair get_part_layout_info() { - return {part_header_size, part_entry_overhead}; - } - - /// Push a single entry to the FIFO. - /// Signature: (bs::error_code) - template - auto push(const cb::list& bl, //< Bufferlist holding entry to push - CT&& ct //< CompletionToken - ) { - return push(std::vector{ bl }, std::forward(ct)); - } - - /// Push a many entries to the FIFO. - /// Signature: (bs::error_code) - template - auto push(const std::vector& data_bufs, //< Entries to push - CT&& ct //< CompletionToken - ) { - ba::async_completion init(ct); - std::unique_lock l(m); - auto max_entry_size = info.params.max_entry_size; - auto need_new_head = info.need_new_head(); - l.unlock(); - auto e = ba::get_associated_executor(init.completion_handler, - get_executor()); - auto a = ba::get_associated_allocator(init.completion_handler); - if (data_bufs.empty() ) { - // Can't fail if you don't try. 
- e.post(ca::bind_handler(std::move(init.completion_handler), - bs::error_code{}), a); - return init.result.get(); - } - - // Validate sizes - for (const auto& bl : data_bufs) { - if (bl.length() > max_entry_size) { - ldout(r->cct(), 10) << __func__ << "(): entry too large: " - << bl.length() << " > " - << info.params.max_entry_size << dendl; - e.post(ca::bind_handler(std::move(init.completion_handler), - errc::entry_too_large), a); - return init.result.get(); - } - } - - auto p = ca::bind_ea(e, a, - Pusher(this, {data_bufs.begin(), data_bufs.end()}, - {}, 0, std::move(init.completion_handler))); - - if (need_new_head) { - _prepare_new_head(std::move(p)); - } else { - e.dispatch(std::move(p), a); - } - return init.result.get(); - } - - /// List the entries in a FIFO - /// Signature(bs::error_code ec, bs::vector entries, bool more) - /// - /// More is true if entries beyond the last exist. - /// The list entries are of the form: - /// data - Contents of the entry - /// marker - String representing the position of this entry within the FIFO. - /// mtime - Time (on the OSD) at which the entry was pushed. - template - auto list(int max_entries, //< Maximum number of entries to fetch - /// Optionally, a marker indicating the position after - /// which to begin listing. If null, begin at the tail. 
- std::optional markstr, - CT&& ct //< CompletionToken - ) { - ba::async_completion, bool)> init(ct); - std::unique_lock l(m); - std::int64_t part_num = info.tail_part_num; - l.unlock(); - std::uint64_t ofs = 0; - auto a = ba::get_associated_allocator(init.completion_handler); - auto e = ba::get_associated_executor(init.completion_handler); - - if (markstr) { - auto marker = to_marker(*markstr); - if (!marker) { - ldout(r->cct(), 0) << __func__ - << "(): failed to parse marker (" << *markstr - << ")" << dendl; - e.post(ca::bind_handler(std::move(init.completion_handler), - errc::invalid_marker, - std::vector{}, false), a); - return init.result.get(); - } - part_num = marker->num; - ofs = marker->ofs; - } - - using handler_type = decltype(init.completion_handler); - auto ls = ceph::allocate_unique>( - a, this, part_num, ofs, max_entries, - std::move(init.completion_handler)); - ls.release()->list(); - return init.result.get(); - } - - /// Trim entries from the tail to the given position - /// Signature: (bs::error_code) - template - auto trim(std::string_view markstr, //< Position to which to trim, inclusive - bool exclusive, //< If true, trim markers up to but NOT INCLUDING - //< markstr, otherwise trim markstr as well. 
- CT&& ct //< CompletionToken - ) { - auto m = to_marker(markstr); - ba::async_completion init(ct); - auto a = ba::get_associated_allocator(init.completion_handler); - auto e = ba::get_associated_executor(init.completion_handler); - if (!m) { - ldout(r->cct(), 0) << __func__ << "(): failed to parse marker: marker=" - << markstr << dendl; - e.post(ca::bind_handler(std::move(init.completion_handler), - errc::invalid_marker), a); - return init.result.get(); - } else { - using handler_type = decltype(init.completion_handler); - auto t = ceph::allocate_unique>( - a, this, m->num, m->ofs, exclusive, std::move(init.completion_handler)); - t.release()->trim(); - } - return init.result.get(); - } - - /// Get information about a specific partition - /// Signature: (bs::error_code, part_info) - /// - /// part_info has the following entries - /// tag - A random string identifying this partition. Used internally - /// as a sanity check to make sure operations haven't been misdirected - /// params - Data parameters, identical for every partition within a - /// FIFO and the same as what is returned from get_part_layout() - /// magic - A random magic number, used internally as a prefix to - /// every entry stored on the OSD to ensure sync - /// min_ofs - Offset of the first entry - /// max_ofs - Offset of the highest entry - /// min_index - Minimum entry index - /// max_index - Maximum entry index - /// max_time - Time of the latest push - /// - /// The difference between ofs and index is that ofs is a byte - /// offset. Index is a count. Nothing really uses indices, but - /// they're tracked and sanity-checked as an invariant on the OSD. - /// - /// max_ofs and max_time are the two that have been used externally - /// so far. 
- template - auto get_part_info(int64_t part_num, // The number of the partition - CT&& ct // CompletionToken - ) { - - ba::async_completion init(ct); - fifo::op::get_part_info gpi; - cb::list in; - encode(gpi, in); - ReadOp op; - auto e = ba::get_associated_executor(init.completion_handler, - get_executor()); - auto a = ba::get_associated_allocator(init.completion_handler); - auto reply = ceph::allocate_unique< - ExecDecodeCB>(a); - - op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, - std::ref(*reply)); - std::unique_lock l(m); - auto part_oid = info.part_oid(part_num); - l.unlock(); - r->execute(part_oid, ioc, std::move(op), nullptr, - ca::bind_ea(e, a, - PartInfoGetter(std::move(init.completion_handler), - std::move(reply)))); - return init.result.get(); - } - - using executor_type = ba::executor; - - /// Return the default executor, as specified at creation. - ba::executor get_executor() const { - return executor; - } - -private: - template - friend class detail::JournalProcessor; - RADOS* const r; - const IOContext ioc; - const Object oid; - std::mutex m; - - fifo::info info; - - std::uint32_t part_header_size = 0xdeadbeef; - std::uint32_t part_entry_overhead = 0xdeadbeef; - - ba::executor executor; - - std::optional to_marker(std::string_view s); - - template - static void assoc_delete(const Handler& handler, T* t) { - using Alloc = ba::associated_allocator_t; - using Traits = typename std::allocator_traits; - using RebindAlloc = typename Traits::template rebind_alloc; - using RebindTraits = typename std::allocator_traits; - RebindAlloc a(get_associated_allocator(handler)); - RebindTraits::destroy(a, t); - RebindTraits::deallocate(a, t, 1); - } - - FIFO(RADOS& r, - IOContext ioc, - Object oid, - ba::executor executor) - : r(&r), ioc(std::move(ioc)), oid(oid), executor(executor) {} - - std::string generate_tag() const; - - template - struct ExecDecodeCB { - bs::error_code ec; - T result; - void operator()(bs::error_code e, const cb::list& r) { - if (e) { 
- ec = e; - return; - } - try { - auto p = r.begin(); - using ceph::decode; - decode(result, p); - } catch (const cb::error& err) { - ec = err.code(); - } - } - }; - - template - class MetaReader { - Handler handler; - using allocator_type = boost::asio::associated_allocator_t; - using decoder_type = ExecDecodeCB; - using decoder_ptr = ceph::allocated_unique_ptr; - decoder_ptr decoder; - public: - MetaReader(Handler&& handler, decoder_ptr&& decoder) - : handler(std::move(handler)), decoder(std::move(decoder)) {} - - void operator ()(bs::error_code ec) { - if (!ec) { - ec = decoder->ec; - } - auto reply = std::move(decoder->result); - decoder.reset(); // free handler-allocated memory before dispatching - - std::move(handler)(ec, std::move(reply.info), - std::move(reply.part_header_size), - std::move(reply.part_entry_overhead)); - } - }; - - // Renamed to get around a compiler bug in Bionic that kept - // complaining we weren't capturing 'this' to make a static function call. - template - static void _read_meta_(RADOS* r, const Object& oid, const IOContext& ioc, - std::optional objv, - Handler&& handler, /* error_code, info, uint64, - uint64 */ - std::optional executor = std::nullopt){ - fifo::op::get_meta gm; - - gm.version = objv; - - cb::list in; - encode(gm, in); - ReadOp op; - - auto a = ba::get_associated_allocator(handler); - auto reply = - ceph::allocate_unique>(a); - - auto e = ba::get_associated_executor(handler); - op.exec(fifo::op::CLASS, fifo::op::GET_META, in, std::ref(*reply)); - r->execute(oid, ioc, std::move(op), nullptr, - ca::bind_ea(e, a, MetaReader(std::move(handler), - std::move(reply)))); - }; - - template - void _read_meta(Handler&& handler /* error_code */) { - auto e = ba::get_associated_executor(handler, get_executor()); - auto a = ba::get_associated_allocator(handler); - _read_meta_(r, oid, ioc, - std::nullopt, - ca::bind_ea( - e, a, - [this, - handler = std::move(handler)](bs::error_code ec, - fifo::info&& info, - std::uint64_t phs, - 
std::uint64_t peo) mutable { - std::unique_lock l(m); - if (ec) { - l.unlock(); - std::move(handler)(ec); - return; - } - // We have a newer version already! - if (!info.version.same_or_later(this->info.version)) { - l.unlock(); - std::move(handler)(bs::error_code{}); - return; - } - this->info = std::move(info); - part_header_size = phs; - part_entry_overhead = peo; - l.unlock(); - std::move(handler)(bs::error_code{}); - }), get_executor()); - } - - bs::error_code apply_update(fifo::info* info, - const fifo::objv& objv, - const fifo::update& update); - - - template - void _update_meta(const fifo::update& update, - fifo::objv version, - Handler&& handler /* error_code, bool */) { - WriteOp op; - - cls::fifo::update_meta(op, info.version, update); - - auto a = ba::get_associated_allocator(handler); - auto e = ba::get_associated_executor(handler, get_executor()); - - r->execute( - oid, ioc, std::move(op), - ca::bind_ea( - e, a, - [this, e, a, version, update, - handler = std::move(handler)](bs::error_code ec) mutable { - if (ec && ec != bs::errc::operation_canceled) { - std::move(handler)(ec, bool{}); - return; - } - - auto canceled = (ec == bs::errc::operation_canceled); - - if (!canceled) { - ec = apply_update(&info, - version, - update); - if (ec) { - canceled = true; - } - } - - if (canceled) { - _read_meta( - ca::bind_ea( - e, a, - [handler = std::move(handler)](bs::error_code ec) mutable { - std::move(handler)(ec, ec ? 
false : true); - })); - return; - } - std::move(handler)(ec, false); - return; - })); - } - - template - auto _process_journal(Handler&& handler /* error_code */) { - auto a = ba::get_associated_allocator(std::ref(handler)); - auto j = ceph::allocate_unique>( - a, this, std::move(handler)); - auto p = j.release(); - p->process(); - } - - template - class NewPartPreparer { - FIFO* f; - Handler handler; - std::vector jentries; - int i; - std::int64_t new_head_part_num; - - public: - - void operator ()(bs::error_code ec, bool canceled) { - if (ec) { - std::move(handler)(ec); - return; - } - - if (canceled) { - std::unique_lock l(f->m); - auto iter = f->info.journal.find(jentries.front().part_num); - auto max_push_part_num = f->info.max_push_part_num; - auto head_part_num = f->info.head_part_num; - auto version = f->info.version; - auto found = (iter != f->info.journal.end()); - l.unlock(); - if ((max_push_part_num >= jentries.front().part_num && - head_part_num >= new_head_part_num)) { - /* raced, but new part was already written */ - std::move(handler)(bs::error_code{}); - return; - } - if (i >= MAX_RACE_RETRIES) { - std::move(handler)(errc::raced); - return; - } - if (!found) { - auto e = ba::get_associated_executor(handler, f->get_executor()); - auto a = ba::get_associated_allocator(handler); - f->_update_meta(fifo::update{} - .journal_entries_add(jentries), - version, - ca::bind_ea( - e, a, - NewPartPreparer(f, std::move(handler), - jentries, - i + 1, new_head_part_num))); - return; - } - // Fall through. We still need to process the journal. 
- } - f->_process_journal(std::move(handler)); - return; - } - - NewPartPreparer(FIFO* f, - Handler&& handler, - std::vector jentries, - int i, std::int64_t new_head_part_num) - : f(f), handler(std::move(handler)), jentries(std::move(jentries)), - i(i), new_head_part_num(new_head_part_num) {} - }; - - template - void _prepare_new_part(bool is_head, - Handler&& handler /* error_code */) { - std::unique_lock l(m); - std::vector jentries = { info.next_journal_entry(generate_tag()) }; - std::int64_t new_head_part_num = info.head_part_num; - auto version = info.version; - - if (is_head) { - auto new_head_jentry = jentries.front(); - new_head_jentry.op = fifo::journal_entry::Op::set_head; - new_head_part_num = jentries.front().part_num; - jentries.push_back(std::move(new_head_jentry)); - } - l.unlock(); - - auto e = ba::get_associated_executor(handler, get_executor()); - auto a = ba::get_associated_allocator(handler); - _update_meta(fifo::update{}.journal_entries_add(jentries), - version, - ca::bind_ea( - e, a, - NewPartPreparer(this, std::move(handler), - jentries, 0, new_head_part_num))); - } - - template - class NewHeadPreparer { - FIFO* f; - Handler handler; - int i; - std::int64_t new_head_num; - - public: - - void operator ()(bs::error_code ec, bool canceled) { - std::unique_lock l(f->m); - auto head_part_num = f->info.head_part_num; - auto version = f->info.version; - l.unlock(); - - if (ec) { - std::move(handler)(ec); - return; - } - if (canceled) { - if (i >= MAX_RACE_RETRIES) { - std::move(handler)(errc::raced); - return; - } - - // Raced, but there's still work to do! 
- if (head_part_num < new_head_num) { - auto e = ba::get_associated_executor(handler, f->get_executor()); - auto a = ba::get_associated_allocator(handler); - f->_update_meta(fifo::update{}.head_part_num(new_head_num), - version, - ca::bind_ea( - e, a, - NewHeadPreparer(f, std::move(handler), - i + 1, - new_head_num))); - return; - } - } - // Either we succeeded, or we were raced by someone who did it for us. - std::move(handler)(bs::error_code{}); - return; - } - - NewHeadPreparer(FIFO* f, - Handler&& handler, - int i, std::int64_t new_head_num) - : f(f), handler(std::move(handler)), i(i), new_head_num(new_head_num) {} - }; - - template - void _prepare_new_head(Handler&& handler /* error_code */) { - std::unique_lock l(m); - int64_t new_head_num = info.head_part_num + 1; - auto max_push_part_num = info.max_push_part_num; - auto version = info.version; - l.unlock(); - - if (max_push_part_num < new_head_num) { - auto e = ba::get_associated_executor(handler, get_executor()); - auto a = ba::get_associated_allocator(handler); - _prepare_new_part( - true, - ca::bind_ea( - e, a, - [this, new_head_num, - handler = std::move(handler)](bs::error_code ec) mutable { - if (ec) { - handler(ec); - return; - } - std::unique_lock l(m); - if (info.max_push_part_num < new_head_num) { - l.unlock(); - ldout(r->cct(), 0) - << "ERROR: " << __func__ - << ": after new part creation: meta_info.max_push_part_num=" - << info.max_push_part_num << " new_head_num=" - << info.max_push_part_num << dendl; - std::move(handler)(errc::inconsistency); - } else { - l.unlock(); - std::move(handler)(bs::error_code{}); - } - })); - return; - } - auto e = ba::get_associated_executor(handler, get_executor()); - auto a = ba::get_associated_allocator(handler); - _update_meta(fifo::update{}.head_part_num(new_head_num), - version, - ca::bind_ea( - e, a, - NewHeadPreparer(this, std::move(handler), 0, - new_head_num))); - } - - template - struct ExecHandleCB { - bs::error_code ec; - T result; - void 
operator()(bs::error_code e, const T& t) { - if (e) { - ec = e; - return; - } - result = t; - } - }; - - template - class EntryPusher { - Handler handler; - using allocator_type = boost::asio::associated_allocator_t; - using decoder_type = ExecHandleCB; - using decoder_ptr = ceph::allocated_unique_ptr; - decoder_ptr decoder; - - public: - - EntryPusher(Handler&& handler, decoder_ptr&& decoder) - : handler(std::move(handler)), decoder(std::move(decoder)) {} - - void operator ()(bs::error_code ec) { - if (!ec) { - ec = decoder->ec; - } - auto reply = std::move(decoder->result); - decoder.reset(); // free handler-allocated memory before dispatching - - std::move(handler)(ec, std::move(reply)); - } - }; - - template - auto push_entries(const std::deque& data_bufs, - Handler&& handler /* error_code, int */) { - WriteOp op; - std::unique_lock l(m); - auto head_part_num = info.head_part_num; - auto tag = info.head_tag; - auto oid = info.part_oid(head_part_num); - l.unlock(); - - auto a = ba::get_associated_allocator(handler); - auto reply = ceph::allocate_unique>(a); - - auto e = ba::get_associated_executor(handler, get_executor()); - push_part(op, tag, data_bufs, std::ref(*reply)); - return r->execute(oid, ioc, std::move(op), - ca::bind_ea(e, a, EntryPusher(std::move(handler), - std::move(reply)))); - } - - template - auto trim_part(int64_t part_num, - uint64_t ofs, - std::optional tag, - bool exclusive, - CT&& ct) { - WriteOp op; - cls::fifo::trim_part(op, tag, ofs, exclusive); - return r->execute(info.part_oid(part_num), ioc, std::move(op), - std::forward(ct)); - } - - - template - class Pusher { - FIFO* f; - std::deque remaining; - std::deque batch; - int i; - Handler handler; - - void prep_then_push(const unsigned successes) { - std::unique_lock l(f->m); - auto max_part_size = f->info.params.max_part_size; - auto part_entry_overhead = f->part_entry_overhead; - l.unlock(); - - uint64_t batch_len = 0; - if (successes > 0) { - if (successes == batch.size()) { - 
batch.clear(); - } else { - batch.erase(batch.begin(), batch.begin() + successes); - for (const auto& b : batch) { - batch_len += b.length() + part_entry_overhead; - } - } - } - - if (batch.empty() && remaining.empty()) { - std::move(handler)(bs::error_code{}); - return; - } - - while (!remaining.empty() && - (remaining.front().length() + batch_len <= max_part_size)) { - - /* We can send entries with data_len up to max_entry_size, - however, we want to also account the overhead when - dealing with multiple entries. Previous check doesn't - account for overhead on purpose. */ - batch_len += remaining.front().length() + part_entry_overhead; - batch.push_back(std::move(remaining.front())); - remaining.pop_front(); - } - push(); - } - - void push() { - auto e = ba::get_associated_executor(handler, f->get_executor()); - auto a = ba::get_associated_allocator(handler); - f->push_entries(batch, - ca::bind_ea(e, a, - Pusher(f, std::move(remaining), - batch, i, - std::move(handler)))); - } - - public: - - // Initial call! - void operator ()() { - prep_then_push(0); - } - - // Called with response to push_entries - void operator ()(bs::error_code ec, int r) { - if (ec == bs::errc::result_out_of_range) { - auto e = ba::get_associated_executor(handler, f->get_executor()); - auto a = ba::get_associated_allocator(handler); - f->_prepare_new_head( - ca::bind_ea(e, a, - Pusher(f, std::move(remaining), - std::move(batch), i, - std::move(handler)))); - return; - } - if (ec) { - std::move(handler)(ec); - return; - } - i = 0; // We've made forward progress, so reset the race counter! 
- prep_then_push(r); - } - - // Called with response to prepare_new_head - void operator ()(bs::error_code ec) { - if (ec == bs::errc::operation_canceled) { - if (i == MAX_RACE_RETRIES) { - ldout(f->r->cct(), 0) - << "ERROR: " << __func__ - << "(): race check failed too many times, likely a bug" << dendl; - std::move(handler)(make_error_code(errc::raced)); - return; - } - ++i; - } else if (ec) { - std::move(handler)(ec); - return; - } - - if (batch.empty()) { - prep_then_push(0); - return; - } else { - push(); - return; - } - } - - Pusher(FIFO* f, std::deque&& remaining, - std::deque batch, int i, - Handler&& handler) - : f(f), remaining(std::move(remaining)), - batch(std::move(batch)), i(i), - handler(std::move(handler)) {} - }; - - template - class Lister { - FIFO* f; - std::vector result; - bool more = false; - std::int64_t part_num; - std::uint64_t ofs; - int max_entries; - bs::error_code ec_out; - std::vector entries; - bool part_more = false; - bool part_full = false; - Handler handler; - - void handle(bs::error_code ec) { - auto h = std::move(handler); - auto m = more; - auto r = std::move(result); - - FIFO::assoc_delete(h, this); - std::move(h)(ec, std::move(r), m); - } - - public: - Lister(FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries, - Handler&& handler) - : f(f), part_num(part_num), ofs(ofs), max_entries(max_entries), - handler(std::move(handler)) { - result.reserve(max_entries); - } - - - Lister(const Lister&) = delete; - Lister& operator =(const Lister&) = delete; - Lister(Lister&&) = delete; - Lister& operator =(Lister&&) = delete; - - void list() { - if (max_entries > 0) { - ReadOp op; - ec_out.clear(); - part_more = false; - part_full = false; - entries.clear(); - - std::unique_lock l(f->m); - auto part_oid = f->info.part_oid(part_num); - l.unlock(); - - list_part(op, - {}, - ofs, - max_entries, - &ec_out, - &entries, - &part_more, - &part_full, - nullptr); - auto e = ba::get_associated_executor(handler, f->get_executor()); - 
auto a = ba::get_associated_allocator(handler); - f->r->execute( - part_oid, - f->ioc, - std::move(op), - nullptr, - ca::bind_ea( - e, a, - [t = std::unique_ptr(this), this, - part_oid](bs::error_code ec) mutable { - t.release(); - if (ec == bs::errc::no_such_file_or_directory) { - auto e = ba::get_associated_executor(handler, - f->get_executor()); - auto a = ba::get_associated_allocator(handler); - f->_read_meta( - ca::bind_ea( - e, a, - [this](bs::error_code ec) mutable { - if (ec) { - handle(ec); - return; - } - - if (part_num < f->info.tail_part_num) { - /* raced with trim? restart */ - max_entries += result.size(); - result.clear(); - part_num = f->info.tail_part_num; - ofs = 0; - list(); - } - /* assuming part was not written yet, so end of data */ - more = false; - handle({}); - return; - })); - return; - } - if (ec) { - ldout(f->r->cct(), 0) - << __func__ - << "(): list_part() on oid=" << part_oid - << " returned ec=" << ec.message() << dendl; - handle(ec); - return; - } - if (ec_out) { - ldout(f->r->cct(), 0) - << __func__ - << "(): list_part() on oid=" << f->info.part_oid(part_num) - << " returned ec=" << ec_out.message() << dendl; - handle(ec_out); - return; - } - - more = part_full || part_more; - for (auto& entry : entries) { - list_entry e; - e.data = std::move(entry.data); - e.marker = marker{part_num, entry.ofs}.to_string(); - e.mtime = entry.mtime; - result.push_back(std::move(e)); - } - max_entries -= entries.size(); - entries.clear(); - if (max_entries > 0 && - part_more) { - list(); - return; - } - - if (!part_full) { /* head part is not full */ - handle({}); - return; - } - ++part_num; - ofs = 0; - list(); - })); - } else { - handle({}); - return; - } - } - }; - - template - class Trimmer { - FIFO* f; - std::int64_t part_num; - std::uint64_t ofs; - bool exclusive; - Handler handler; - std::int64_t pn; - int i = 0; - - void handle(bs::error_code ec) { - auto h = std::move(handler); - - FIFO::assoc_delete(h, this); - return std::move(h)(ec); - } 
- - void update() { - std::unique_lock l(f->m); - auto objv = f->info.version; - l.unlock(); - auto a = ba::get_associated_allocator(handler); - auto e = ba::get_associated_executor(handler, f->get_executor()); - f->_update_meta( - fifo::update{}.tail_part_num(part_num), - objv, - ca::bind_ea( - e, a, - [this, t = std::unique_ptr(this)](bs::error_code ec, - bool canceled) mutable { - t.release(); - if (canceled) - if (i >= MAX_RACE_RETRIES) { - ldout(f->r->cct(), 0) - << "ERROR: " << __func__ - << "(): race check failed too many times, likely a bug" - << dendl; - handle(errc::raced); - return; - } - std::unique_lock l(f->m); - auto tail_part_num = f->info.tail_part_num; - l.unlock(); - if (tail_part_num < part_num) { - ++i; - update(); - return; - } - handle({}); - return; - })); - } - - public: - Trimmer(FIFO* f, std::int64_t part_num, std::uint64_t ofs, - bool exclusive, Handler&& handler) - : f(f), part_num(part_num), ofs(ofs), exclusive(exclusive), - handler(std::move(handler)) { - std::unique_lock l(f->m); - pn = f->info.tail_part_num; - } - - void trim() { - auto a = ba::get_associated_allocator(handler); - auto e = ba::get_associated_executor(handler, f->get_executor()); - if (pn < part_num) { - std::unique_lock l(f->m); - auto max_part_size = f->info.params.max_part_size; - l.unlock(); - f->trim_part( - pn, max_part_size, std::nullopt, - false, - ca::bind_ea( - e, a, - [t = std::unique_ptr(this), - this](bs::error_code ec) mutable { - t.release(); - if (ec && ec != bs::errc::no_such_file_or_directory) { - ldout(f->r->cct(), 0) - << __func__ << "(): ERROR: trim_part() on part=" - << pn << " returned ec=" << ec.message() << dendl; - handle(ec); - return; - } - ++pn; - trim(); - })); - return; - } - f->trim_part( - part_num, ofs, std::nullopt, exclusive, - ca::bind_ea( - e, a, - [t = std::unique_ptr(this), - this](bs::error_code ec) mutable { - t.release(); - if (ec && ec != bs::errc::no_such_file_or_directory) { - ldout(f->r->cct(), 0) - << __func__ << "(): 
ERROR: trim_part() on part=" << part_num - << " returned ec=" << ec.message() << dendl; - handle(ec); - return; - } - std::unique_lock l(f->m); - auto tail_part_num = f->info.tail_part_num; - l.unlock(); - if (part_num <= tail_part_num) { - /* don't need to modify meta info */ - handle({}); - return; - } - update(); - })); - } - }; - - template - class PartInfoGetter { - Handler handler; - using allocator_type = boost::asio::associated_allocator_t; - using decoder_type = ExecDecodeCB; - using decoder_ptr = ceph::allocated_unique_ptr; - decoder_ptr decoder; - public: - PartInfoGetter(Handler&& handler, decoder_ptr&& decoder) - : handler(std::move(handler)), decoder(std::move(decoder)) {} - - void operator ()(bs::error_code ec) { - if (!ec) { - ec = decoder->ec; - } - auto reply = std::move(decoder->result); - decoder.reset(); // free handler-allocated memory before dispatching - - auto p = ca::bind_handler(std::move(handler), - ec, std::move(reply.header)); - std::move(p)(); - } - }; - - -}; - -namespace detail { -template -class JournalProcessor { -private: - FIFO* const fifo; - Handler handler; - - std::vector processed; - std::multimap journal; - std::multimap::iterator iter; - std::int64_t new_tail; - std::int64_t new_head; - std::int64_t new_max; - int race_retries = 0; - - template - auto create_part(int64_t part_num, std::string_view tag, CT&& ct) { - WriteOp op; - op.create(false); /* We don't need exclusivity, part_init ensures - we're creating from the same journal entry. 
*/ - std::unique_lock l(fifo->m); - part_init(op, tag, fifo->info.params); - auto oid = fifo->info.part_oid(part_num); - l.unlock(); - return fifo->r->execute(oid, fifo->ioc, - std::move(op), std::forward(ct)); - } - - template - auto remove_part(int64_t part_num, std::string_view tag, CT&& ct) { - WriteOp op; - op.remove(); - std::unique_lock l(fifo->m); - auto oid = fifo->info.part_oid(part_num); - l.unlock(); - return fifo->r->execute(oid, fifo->ioc, - std::move(op), std::forward(ct)); - } - - template - void process_journal_entry(const fifo::journal_entry& entry, - PP&& pp) { - switch (entry.op) { - case fifo::journal_entry::Op::unknown: - std::move(pp)(errc::inconsistency); - return; - break; - - case fifo::journal_entry::Op::create: - create_part(entry.part_num, entry.part_tag, std::move(pp)); - return; - break; - case fifo::journal_entry::Op::set_head: - ba::post(ba::get_associated_executor(handler, fifo->get_executor()), - [pp = std::move(pp)]() mutable { - std::move(pp)(bs::error_code{}); - }); - return; - break; - case fifo::journal_entry::Op::remove: - remove_part(entry.part_num, entry.part_tag, std::move(pp)); - return; - break; - } - std::move(pp)(errc::inconsistency); - return; - } - - auto journal_entry_finisher(const fifo::journal_entry& entry) { - auto a = ba::get_associated_allocator(handler); - auto e = ba::get_associated_executor(handler, fifo->get_executor()); - return - ca::bind_ea( - e, a, - [t = std::unique_ptr(this), this, - entry](bs::error_code ec) mutable { - t.release(); - if (entry.op == fifo::journal_entry::Op::remove && - ec == bs::errc::no_such_file_or_directory) - ec.clear(); - - if (ec) { - ldout(fifo->r->cct(), 0) - << __func__ - << "(): ERROR: failed processing journal entry for part=" - << entry.part_num << " with error " << ec.message() - << " Bug or inconsistency." << dendl; - handle(errc::inconsistency); - return; - } else { - switch (entry.op) { - case fifo::journal_entry::Op::unknown: - // Can't happen. 
Filtered out in process_journal_entry. - abort(); - break; - - case fifo::journal_entry::Op::create: - if (entry.part_num > new_max) { - new_max = entry.part_num; - } - break; - case fifo::journal_entry::Op::set_head: - if (entry.part_num > new_head) { - new_head = entry.part_num; - } - break; - case fifo::journal_entry::Op::remove: - if (entry.part_num >= new_tail) { - new_tail = entry.part_num + 1; - } - break; - } - processed.push_back(entry); - } - ++iter; - process(); - }); - } - - struct JournalPostprocessor { - std::unique_ptr j_; - bool first; - void operator ()(bs::error_code ec, bool canceled) { - std::optional tail_part_num; - std::optional head_part_num; - std::optional max_part_num; - - auto j = j_.release(); - - if (!first && !ec && !canceled) { - j->handle({}); - return; - } - - if (canceled) { - if (j->race_retries >= MAX_RACE_RETRIES) { - ldout(j->fifo->r->cct(), 0) << "ERROR: " << __func__ << - "(): race check failed too many times, likely a bug" << dendl; - j->handle(errc::raced); - return; - } - - ++j->race_retries; - - std::vector new_processed; - std::unique_lock l(j->fifo->m); - for (auto& e : j->processed) { - auto jiter = j->fifo->info.journal.find(e.part_num); - /* journal entry was already processed */ - if (jiter == j->fifo->info.journal.end() || - !(jiter->second == e)) { - continue; - } - new_processed.push_back(e); - } - j->processed = std::move(new_processed); - } - - std::unique_lock l(j->fifo->m); - auto objv = j->fifo->info.version; - if (j->new_tail > j->fifo->info.tail_part_num) { - tail_part_num = j->new_tail; - } - - if (j->new_head > j->fifo->info.head_part_num) { - head_part_num = j->new_head; - } - - if (j->new_max > j->fifo->info.max_push_part_num) { - max_part_num = j->new_max; - } - l.unlock(); - - if (j->processed.empty() && - !tail_part_num && - !max_part_num) { - /* nothing to update anymore */ - j->handle({}); - return; - } - auto a = ba::get_associated_allocator(j->handler); - auto e = 
ba::get_associated_executor(j->handler, j->fifo->get_executor()); - j->fifo->_update_meta(fifo::update{} - .tail_part_num(tail_part_num) - .head_part_num(head_part_num) - .max_push_part_num(max_part_num) - .journal_entries_rm(j->processed), - objv, - ca::bind_ea( - e, a, - JournalPostprocessor{j, false})); - return; - } - - JournalPostprocessor(JournalProcessor* j, bool first) - : j_(j), first(first) {} - }; - - void postprocess() { - if (processed.empty()) { - handle({}); - return; - } - JournalPostprocessor(this, true)({}, false); - } - - void handle(bs::error_code ec) { - auto e = ba::get_associated_executor(handler, fifo->get_executor()); - auto a = ba::get_associated_allocator(handler); - auto h = std::move(handler); - FIFO::assoc_delete(h, this); - e.dispatch(ca::bind_handler(std::move(h), ec), a); - return; - } - -public: - - JournalProcessor(FIFO* fifo, Handler&& handler) - : fifo(fifo), handler(std::move(handler)) { - std::unique_lock l(fifo->m); - journal = fifo->info.journal; - iter = journal.begin(); - new_tail = fifo->info.tail_part_num; - new_head = fifo->info.head_part_num; - new_max = fifo->info.max_push_part_num; - } - - JournalProcessor(const JournalProcessor&) = delete; - JournalProcessor& operator =(const JournalProcessor&) = delete; - JournalProcessor(JournalProcessor&&) = delete; - JournalProcessor& operator =(JournalProcessor&&) = delete; - - void process() { - if (iter != journal.end()) { - const auto entry = iter->second; - process_journal_entry(entry, - journal_entry_finisher(entry)); - return; - } else { - postprocess(); - return; - } - } -}; -} -} - -#endif // CEPH_RADOS_CLS_FIFIO_H diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 15c4d4c1d78..11f047c61db 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -426,7 +426,7 @@ set(radosgw_admin_srcs add_executable(radosgw-admin ${radosgw_admin_srcs}) target_link_libraries(radosgw-admin ${rgw_libs} librados cls_rgw_client cls_otp_client cls_lock_client 
cls_refcount_client - cls_log_client cls_timeindex_client neorados_cls_fifo + cls_log_client cls_timeindex_client cls_version_client cls_user_client global ${LIB_RESOLV} OATH::OATH @@ -438,7 +438,7 @@ set(radosgw_es_srcs add_executable(radosgw-es ${radosgw_es_srcs}) target_link_libraries(radosgw-es ${rgw_libs} librados cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client - cls_log_client cls_timeindex_client neorados_cls_fifo + cls_log_client cls_timeindex_client cls_version_client cls_user_client global ${LIB_RESOLV} ${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${BLKID_LIBRARIES}) @@ -456,7 +456,7 @@ set(radosgw_object_expirer_srcs add_executable(radosgw-object-expirer ${radosgw_object_expirer_srcs}) target_link_libraries(radosgw-object-expirer ${rgw_libs} librados cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client - cls_log_client cls_timeindex_client neorados_cls_fifo + cls_log_client cls_timeindex_client cls_version_client cls_user_client global ${LIB_RESOLV} ${CURL_LIBRARIES} ${EXPAT_LIBRARIES}) @@ -493,7 +493,6 @@ target_link_libraries(rgw cls_refcount_client cls_log_client cls_timeindex_client - neorados_cls_fifo cls_version_client cls_user_client ${LIB_RESOLV} diff --git a/src/rgw/driver/rados/cls_fifo_legacy.cc b/src/rgw/driver/rados/cls_fifo_legacy.cc index 5b83c850caf..3edec238014 100644 --- a/src/rgw/driver/rados/cls_fifo_legacy.cc +++ b/src/rgw/driver/rados/cls_fifo_legacy.cc @@ -121,12 +121,10 @@ void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv, op->exec(fifo::op::CLASS, fifo::op::UPDATE_META, in); } -void part_init(lr::ObjectWriteOperation* op, std::string_view tag, - fifo::data_params params) +void part_init(lr::ObjectWriteOperation* op, fifo::data_params params) { fifo::op::init_part ip; - ip.tag = tag; ip.params = params; cb::list in; @@ -134,14 +132,13 @@ void part_init(lr::ObjectWriteOperation* op, std::string_view tag, op->exec(fifo::op::CLASS, fifo::op::INIT_PART, in); } -int push_part(const 
DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid, std::string_view tag, +int push_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid, std::deque data_bufs, std::uint64_t tid, optional_yield y) { lr::ObjectWriteOperation op; fifo::op::push_part pp; - pp.tag = tag; pp.data_bufs = data_bufs; pp.total_len = 0; @@ -169,14 +166,13 @@ int push_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string return retval; } -void push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag, +void push_part(lr::IoCtx& ioctx, const std::string& oid, std::deque data_bufs, std::uint64_t tid, lr::AioCompletion* c) { lr::ObjectWriteOperation op; fifo::op::push_part pp; - pp.tag = tag; pp.data_bufs = data_bufs; pp.total_len = 0; @@ -191,12 +187,10 @@ void push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag, } void trim_part(lr::ObjectWriteOperation* op, - std::optional tag, std::uint64_t ofs, bool exclusive) { fifo::op::trim_part tp; - tp.tag = tag; tp.ofs = ofs; tp.exclusive = exclusive; @@ -206,16 +200,14 @@ void trim_part(lr::ObjectWriteOperation* op, } int list_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid, - std::optional tag, std::uint64_t ofs, - std::uint64_t max_entries, + std::uint64_t ofs, std::uint64_t max_entries, std::vector* entries, - bool* more, bool* full_part, std::string* ptag, + bool* more, bool* full_part, std::uint64_t tid, optional_yield y) { lr::ObjectReadOperation op; fifo::op::list_part lp; - lp.tag = tag; lp.ofs = ofs; lp.max_entries = max_entries; @@ -231,7 +223,6 @@ int list_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string if (entries) *entries = std::move(reply.entries); if (more) *more = reply.more; if (full_part) *full_part = reply.full_part; - if (ptag) *ptag = reply.tag; } catch (const cb::error& err) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ @@ -253,14 +244,12 @@ struct 
list_entry_completion : public lr::ObjectOperationCompletion { std::vector* entries; bool* more; bool* full_part; - std::string* ptag; std::uint64_t tid; list_entry_completion(CephContext* cct, int* r_out, std::vector* entries, - bool* more, bool* full_part, std::string* ptag, - std::uint64_t tid) + bool* more, bool* full_part, std::uint64_t tid) : cct(cct), r_out(r_out), entries(entries), more(more), - full_part(full_part), ptag(ptag), tid(tid) {} + full_part(full_part), tid(tid) {} virtual ~list_entry_completion() = default; void handle_completion(int r, bufferlist& bl) override { if (r >= 0) try { @@ -270,7 +259,6 @@ struct list_entry_completion : public lr::ObjectOperationCompletion { if (entries) *entries = std::move(reply.entries); if (more) *more = reply.more; if (full_part) *full_part = reply.full_part; - if (ptag) *ptag = reply.tag; } catch (const cb::error& err) { lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ @@ -288,18 +276,16 @@ struct list_entry_completion : public lr::ObjectOperationCompletion { }; lr::ObjectReadOperation list_part(CephContext* cct, - std::optional tag, std::uint64_t ofs, std::uint64_t max_entries, int* r_out, std::vector* entries, bool* more, bool* full_part, - std::string* ptag, std::uint64_t tid) + std::uint64_t tid) { lr::ObjectReadOperation op; fifo::op::list_part lp; - lp.tag = tag; lp.ofs = ofs; lp.max_entries = max_entries; @@ -307,7 +293,7 @@ lr::ObjectReadOperation list_part(CephContext* cct, encode(lp, in); op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, new list_entry_completion(cct, r_out, entries, more, full_part, - ptag, tid)); + tid)); return op; } @@ -421,14 +407,6 @@ std::optional FIFO::to_marker(std::string_view s) return m; } -std::string FIFO::generate_tag() const -{ - static constexpr auto HEADER_TAG_SIZE = 16; - return gen_rand_alphanumeric_plain(static_cast(ioctx.cct()), - HEADER_TAG_SIZE); -} - - int FIFO::apply_update(const DoutPrefixProvider *dpp, fifo::info* info, const fifo::objv& objv, @@ -577,7 
+555,7 @@ void FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& updat assert(r >= 0); } -int FIFO::create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid, +int FIFO::create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid, optional_yield y) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ @@ -586,7 +564,7 @@ int FIFO::create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::stri op.create(false); /* We don't need exclusivity, part_init ensures we're creating from the same journal entry. */ std::unique_lock l(m); - part_init(&op, tag, info.params); + part_init(&op, info.params); auto oid = info.part_oid(part_num); l.unlock(); auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y); @@ -598,7 +576,7 @@ int FIFO::create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::stri return r; } -int FIFO::remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid, +int FIFO::remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid, optional_yield y) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ @@ -636,20 +614,21 @@ int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, opti << " processing entry: entry=" << entry << " tid=" << tid << dendl; switch (entry.op) { - case fifo::journal_entry::Op::create: - r = create_part(dpp, entry.part_num, entry.part_tag, tid, y); + using enum fifo::journal_entry::Op; + case create: + r = create_part(dpp, entry.part_num, tid, y); if (entry.part_num > new_max) { new_max = entry.part_num; } break; - case fifo::journal_entry::Op::set_head: + case set_head: r = 0; if (entry.part_num > new_head) { new_head = entry.part_num; } break; - case fifo::journal_entry::Op::remove: - r = remove_part(dpp, entry.part_num, entry.part_tag, tid, y); + case remove: + r = remove_part(dpp, entry.part_num, tid, y); if (r == -ENOENT) r = 0; if 
(entry.part_num >= new_tail) { new_tail = entry.part_num + 1; @@ -745,7 +724,10 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " entering: tid=" << tid << dendl; std::unique_lock l(m); - std::vector jentries = { info.next_journal_entry(generate_tag()) }; + using enum fifo::journal_entry::Op; + std::vector jentries{{ + create, info.max_push_part_num + 1 + }}; if (info.journal.find(jentries.front().part_num) != info.journal.end()) { l.unlock(); ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__ @@ -764,10 +746,7 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui if (is_head) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " needs new head: tid=" << tid << dendl; - auto new_head_jentry = jentries.front(); - new_head_jentry.op = fifo::journal_entry::Op::set_head; - new_head_part_num = jentries.front().part_num; - jentries.push_back(std::move(new_head_jentry)); + jentries.push_back({ set_head, jentries.front().part_num }); } l.unlock(); @@ -950,7 +929,10 @@ void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::u lr::AioCompletion* c) { std::unique_lock l(m); - std::vector jentries = { info.next_journal_entry(generate_tag()) }; + using enum fifo::journal_entry::Op; + std::vector jentries{{ + create, info.max_push_part_num + 1 + }}; if (info.journal.find(jentries.front().part_num) != info.journal.end()) { l.unlock(); ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__ @@ -963,10 +945,7 @@ void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::u auto version = info.version; if (is_head) { - auto new_head_jentry = jentries.front(); - new_head_jentry.op = fifo::journal_entry::Op::set_head; - new_head_part_num = jentries.front().part_num; - jentries.push_back(std::move(new_head_jentry)); + jentries.push_back({ set_head, jentries.front().part_num }); } 
l.unlock(); @@ -1091,11 +1070,10 @@ int FIFO::push_entries(const DoutPrefixProvider *dpp, const std::deque << " entering: tid=" << tid << dendl; std::unique_lock l(m); auto head_part_num = info.head_part_num; - auto tag = info.head_tag; const auto part_oid = info.part_oid(head_part_num); l.unlock(); - auto r = push_part(dpp, ioctx, part_oid, tag, data_bufs, tid, y); + auto r = push_part(dpp, ioctx, part_oid, data_bufs, tid, y); if (r < 0) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " push_part failed: r=" << r << " tid=" << tid << dendl; @@ -1108,15 +1086,13 @@ void FIFO::push_entries(const std::deque& data_bufs, { std::unique_lock l(m); auto head_part_num = info.head_part_num; - auto tag = info.head_tag; const auto part_oid = info.part_oid(head_part_num); l.unlock(); - push_part(ioctx, part_oid, tag, data_bufs, tid, c); + push_part(ioctx, part_oid, data_bufs, tid, c); } int FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs, - std::optional tag, bool exclusive, std::uint64_t tid, optional_yield y) { @@ -1126,7 +1102,7 @@ int FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t of std::unique_lock l(m); const auto part_oid = info.part_oid(part_num); l.unlock(); - rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive); + rgw::cls::fifo::trim_part(&op, ofs, exclusive); auto r = rgw_rados_operate(dpp, ioctx, part_oid, &op, y); if (r < 0) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ @@ -1136,7 +1112,6 @@ int FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t of } void FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs, - std::optional tag, bool exclusive, std::uint64_t tid, lr::AioCompletion* c) { @@ -1146,7 +1121,7 @@ void FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t o std::unique_lock l(m); const auto part_oid = info.part_oid(part_num); l.unlock(); - rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive); + 
rgw::cls::fifo::trim_part(&op, ofs, exclusive); auto r = ioctx.aio_operate(part_oid, c, &op); ceph_assert(r >= 0); } @@ -1622,8 +1597,8 @@ int FIFO::list(const DoutPrefixProvider *dpp, int max_entries, auto part_oid = info.part_oid(part_num); l.unlock(); - r = list_part(dpp, ioctx, part_oid, {}, ofs, max_entries, &entries, - &part_more, &part_full, nullptr, tid, y); + r = list_part(dpp, ioctx, part_oid, ofs, max_entries, &entries, + &part_more, &part_full, tid, y); if (r == -ENOENT) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " missing part, rereading metadata" @@ -1735,7 +1710,7 @@ int FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exc << " pn=" << pn << " tid=" << tid << dendl; std::unique_lock l(m); l.unlock(); - r = trim_part(dpp, pn, max_part_size, std::nullopt, false, tid, y); + r = trim_part(dpp, pn, max_part_size, false, tid, y); if (r < 0 && r == -ENOENT) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " trim_part failed: r=" << r @@ -1744,7 +1719,7 @@ int FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exc } ++pn; } - r = trim_part(dpp, part_num, ofs, std::nullopt, exclusive, tid, y); + r = trim_part(dpp, part_num, ofs, exclusive, tid, y); if (r < 0 && r != -ENOENT) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " trim_part failed: r=" << r @@ -1837,13 +1812,12 @@ struct Trimmer : public Completion { if (pn < part_num) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " pn=" << pn << " tid=" << tid << dendl; - fifo->trim_part(dpp, pn++, max_part_size, std::nullopt, - false, tid, call(std::move(p))); + fifo->trim_part(dpp, pn++, max_part_size, false, tid, + call(std::move(p))); } else { update = true; canceled = tail_part_num < part_num; - fifo->trim_part(dpp, part_num, ofs, std::nullopt, exclusive, tid, - call(std::move(p))); + fifo->trim_part(dpp, part_num, ofs, exclusive, tid, call(std::move(p))); } return; } @@ -1870,8 +1844,8 
@@ struct Trimmer : public Completion { std::unique_lock l(fifo->m); const auto max_part_size = fifo->info.params.max_part_size; l.unlock(); - fifo->trim_part(dpp, pn++, max_part_size, std::nullopt, - false, tid, call(std::move(p))); + fifo->trim_part(dpp, pn++, max_part_size, false, tid, + call(std::move(p))); return; } @@ -1880,8 +1854,7 @@ struct Trimmer : public Completion { l.unlock(); update = true; canceled = tail_part_num < part_num; - fifo->trim_part(dpp, part_num, ofs, std::nullopt, exclusive, tid, - call(std::move(p))); + fifo->trim_part(dpp, part_num, ofs, exclusive, tid, call(std::move(p))); return; } @@ -1942,8 +1915,7 @@ void FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool ex } else { trimmer->update = true; } - trim_part(dpp, pn, ofs, std::nullopt, exclusive, - tid, Trimmer::call(std::move(trimmer))); + trim_part(dpp, pn, ofs, exclusive, tid, Trimmer::call(std::move(trimmer))); } int FIFO::get_part_info(const DoutPrefixProvider *dpp, int64_t part_num, @@ -2064,8 +2036,7 @@ private: pp_callback, } state; - void create_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num, - std::string_view tag) { + void create_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " entering: tid=" << tid << dendl; state = entry_callback; @@ -2073,7 +2044,7 @@ private: op.create(false); /* We don't need exclusivity, part_init ensures we're creating from the same journal entry. 
*/ std::unique_lock l(fifo->m); - part_init(&op, tag, fifo->info.params); + part_init(&op, fifo->info.params); auto oid = fifo->info.part_oid(part_num); l.unlock(); auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op); @@ -2081,8 +2052,7 @@ private: return; } - void remove_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num, - std::string_view tag) { + void remove_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " entering: tid=" << tid << dendl; state = entry_callback; @@ -2105,7 +2075,8 @@ private: << " finishing entry: entry=" << entry << " tid=" << tid << dendl; - if (entry.op == fifo::journal_entry::Op::remove && r == -ENOENT) + using enum fifo::journal_entry::Op; + if (entry.op == remove && r == -ENOENT) r = 0; if (r < 0) { @@ -2116,18 +2087,18 @@ private: return; } else { switch (entry.op) { - case fifo::journal_entry::Op::unknown: - case fifo::journal_entry::Op::set_head: + case unknown: + case set_head: // Can't happen. Filtered out in process. 
complete(std::move(p), -EIO); return; - case fifo::journal_entry::Op::create: + case create: if (entry.part_num > new_max) { new_max = entry.part_num; } break; - case fifo::journal_entry::Op::remove: + case remove: if (entry.part_num >= new_tail) { new_tail = entry.part_num + 1; } @@ -2269,18 +2240,19 @@ public: << " tid=" << tid << dendl; const auto entry = iter->second; switch (entry.op) { - case fifo::journal_entry::Op::create: - create_part(dpp, std::move(p), entry.part_num, entry.part_tag); + using enum fifo::journal_entry::Op; + case create: + create_part(dpp, std::move(p), entry.part_num); return; - case fifo::journal_entry::Op::set_head: + case set_head: if (entry.part_num > new_head) { new_head = entry.part_num; } processed.push_back(entry); ++iter; continue; - case fifo::journal_entry::Op::remove: - remove_part(dpp, std::move(p), entry.part_num, entry.part_tag); + case remove: + remove_part(dpp, std::move(p), entry.part_num); return; default: ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ @@ -2375,9 +2347,8 @@ public: l.unlock(); read = false; - auto op = list_part(f->cct, {}, ofs, max_entries, &r_out, - &entries, &part_more, &part_full, - nullptr, tid); + auto op = list_part(f->cct, ofs, max_entries, &r_out, + &entries, &part_more, &part_full, tid); f->ioctx.aio_operate(part_oid, call(std::move(p)), &op, nullptr); } else { complete(std::move(p), 0); diff --git a/src/rgw/driver/rados/cls_fifo_legacy.h b/src/rgw/driver/rados/cls_fifo_legacy.h index 02ad25bc6a9..96c393e43a7 100644 --- a/src/rgw/driver/rados/cls_fifo_legacy.h +++ b/src/rgw/driver/rados/cls_fifo_legacy.h @@ -125,8 +125,6 @@ class FIFO { std::string oid) : ioctx(std::move(ioc)), oid(oid) {} - std::string generate_tag() const; - int apply_update(const DoutPrefixProvider *dpp, fifo::info* info, const fifo::objv& objv, @@ -138,9 +136,9 @@ class FIFO { void _update_meta(const DoutPrefixProvider *dpp, const fifo::update& update, fifo::objv version, bool* pcanceled, std::uint64_t tid, 
lr::AioCompletion* c); - int create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid, + int create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid, optional_yield y); - int remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid, + int remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid, optional_yield y); int process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y); void process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c); @@ -153,11 +151,9 @@ class FIFO { void push_entries(const std::deque& data_bufs, std::uint64_t tid, lr::AioCompletion* c); int trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs, - std::optional tag, bool exclusive, - std::uint64_t tid, optional_yield y); + bool exclusive, std::uint64_t tid, optional_yield y); void trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs, - std::optional tag, bool exclusive, - std::uint64_t tid, lr::AioCompletion* c); + bool exclusive, std::uint64_t tid, lr::AioCompletion* c); /// Force refresh of metadata, yielding/blocking style int read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y); diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 8909788a4f8..c9fa36e3821 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -38,7 +38,6 @@ if(NOT WIN32) # libcls_* dependencies cascade to osd, kv and other libs that are not # available on Windows yet. 
add_subdirectory(cls_hello) - add_subdirectory(cls_fifo) add_subdirectory(cls_cas) add_subdirectory(cls_lock) add_subdirectory(cls_log) diff --git a/src/test/cls_fifo/CMakeLists.txt b/src/test/cls_fifo/CMakeLists.txt deleted file mode 100644 index 3abf7634a2f..00000000000 --- a/src/test/cls_fifo/CMakeLists.txt +++ /dev/null @@ -1,34 +0,0 @@ -add_executable(ceph_test_cls_fifo - test_cls_fifo.cc - ) -target_link_libraries(ceph_test_cls_fifo - neorados_cls_fifo - libneorados - spawn - ${UNITTEST_LIBS} - ${BLKID_LIBRARIES} - ${CMAKE_DL_LIBS} - ${CRYPTO_LIBS} - ${EXTRALIBS} - neoradostest-support - ) -install(TARGETS - ceph_test_cls_fifo - DESTINATION ${CMAKE_INSTALL_BINDIR}) - -add_executable(ceph_bench_cls_fifo - bench_cls_fifo.cc - ) -target_link_libraries(ceph_bench_cls_fifo - neorados_cls_fifo - libneorados - spawn - ${UNITTEST_LIBS} - ${BLKID_LIBRARIES} - ${CMAKE_DL_LIBS} - ${CRYPTO_LIBS} - ${EXTRALIBS} - ) -install(TARGETS - ceph_test_cls_fifo - DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/src/test/cls_fifo/bench_cls_fifo.cc b/src/test/cls_fifo/bench_cls_fifo.cc deleted file mode 100644 index a8ba1aa4b94..00000000000 --- a/src/test/cls_fifo/bench_cls_fifo.cc +++ /dev/null @@ -1,469 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2020 Red Hat, Inc. - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. 
- * - */ - - -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#undef FMT_HEADER_ONLY -#define FMT_HEADER_ONLY 1 -#include -#include -#include - -#include - -#include "include/neorados/RADOS.hpp" - -#include "neorados/cls/fifo.h" - -using namespace std; - -namespace ba = boost::asio; -namespace bs = boost::system; -namespace bpo = boost::program_options; -namespace cb = ceph::buffer; -namespace R = neorados; -namespace RCf = neorados::cls::fifo; -namespace fifo = rados::cls::fifo; -namespace s = spawn; -namespace sc = std::chrono; - -#if FMT_VERSION >= 90000 -template<> -struct fmt::formatter : fmt::ostream_formatter {}; -#endif - -namespace { -static constexpr auto PUSH = 0x01 << 0; -static constexpr auto PULL = 0x01 << 1; -static constexpr auto BOTH = PUSH | PULL; -static constexpr auto CLEAN = 0x01 << 2; -static constexpr auto METADATA = 0x01 << 3; -static constexpr auto PARTINFO = 0x01 << 4; -static constexpr auto LIST = 0x01 << 5; - -struct benchmark { - std::uint32_t entries = 0; - sc::duration elapsed = 0ns; - - std::uint64_t ratio() const { - return entries/std::max(elapsed, - sc::duration(1ns)).count(); - } - benchmark() = default; - benchmark(std::uint32_t entries, sc::duration elapsed) - : entries(entries), elapsed(elapsed) {} -}; - -benchmark push(RCf::FIFO& f, const std::uint32_t count, - const std::uint32_t entry_size, const std::uint32_t push_entries, - s::yield_context y) -{ - cb::list entry; - entry.push_back(cb::create_small_page_aligned(entry_size)); - entry.zero(); - - std::vector entries(std::min(count, push_entries), entry); - auto remaining = count; - auto start = sc::steady_clock::now(); - while (remaining) { - if (entries.size() > remaining) { - entries.resize(remaining); - } - f.push(entries, y); - remaining -= entries.size(); - } - auto finish = sc::steady_clock::now(); - return benchmark(count, (finish - start)); -} - -benchmark pull(RCf::FIFO& f, const std::uint32_t count, - const 
std::uint32_t pull_entries, s::yield_context y) -{ - auto remaining = count; - std::uint32_t got = 0; - - auto start = sc::steady_clock::now(); - while (remaining) { - auto [result, more] = f.list(std::min(remaining, pull_entries), - std::nullopt, y); - if (result.empty()) - break; - got += result.size(); - remaining -= result.size(); - f.trim(result.back().marker, false, y); - } - auto finish = sc::steady_clock::now(); - return benchmark(got, (finish - start)); -} - -void concurpull(const std::string& oid, const std::int64_t pool, - const std::uint32_t count, const std::uint32_t pull_entries, - std::promise notify, const bool* const exit_early) -{ - ba::io_context c; - benchmark bench; - std::exception_ptr ex; - s::spawn( - c, - [&](s::yield_context y) { - try { - auto r = R::RADOS::Builder{}.build(c, y); - R::IOContext ioc(pool); - auto f = RCf::FIFO::open(r, ioc, oid, y); - auto remaining = count; - std::uint32_t got = 0; - - auto start = sc::steady_clock::now(); - while (remaining) { - if (*exit_early) break; - auto [result, more] = - f->list(std::min(remaining, pull_entries), std::nullopt, y); - if (result.empty()) { - // We just keep going assuming they'll push more. 
- continue; - } - got += result.size(); - remaining -= result.size(); - if (*exit_early) break; - f->trim(result.back().marker, false, y); - } - auto finish = sc::steady_clock::now(); - bench.entries = got; - bench.elapsed = finish - start; - } catch (const std::exception&) { - ex = std::current_exception(); - } - }); - c.run(); - if (ex) { - notify.set_exception(std::current_exception()); - } else { - notify.set_value(bench); - } -} - -void clean(R::RADOS& r, const R::IOContext& ioc, RCf::FIFO& f, - s::yield_context y) -{ - f.read_meta(y); - const auto info = f.meta(); - if (info.head_part_num > -1) { - for (auto i = info.tail_part_num; i <= info.head_part_num; ++i) { - R::WriteOp op; - op.remove(); - r.execute(info.part_oid(i), ioc, std::move(op), y); - } - } - R::WriteOp op; - op.remove(); - r.execute(info.id, ioc, std::move(op), y); -} -} - -int main(int argc, char* argv[]) -{ - const std::string_view prog(argv[0]); - std::string command; - try { - std::uint32_t count = 0; - std::string oid; - std::string pool; - std::uint32_t entry_size = 0; - std::uint32_t push_entries = 0; - std::uint32_t pull_entries = 0; - std::uint64_t max_part_size = 0; - std::uint64_t max_entry_size = 0; - std::int64_t part_num = 0; - std::string marker; - - bpo::options_description desc(fmt::format("{} options", prog)); - desc.add_options() - ("help", "show help") - ("oid", bpo::value(&oid)->default_value("fifo"s), - "the base oid for the fifo") - ("pool", bpo::value(&pool)->default_value("fifo_benchmark"s), - "the base oid for the fifo") - ("count", bpo::value(&count)->default_value(1024), - "total count of items") - ("entry-size", bpo::value(&entry_size)->default_value(64), - "size of entries to push") - ("push-entries", - bpo::value(&push_entries) - ->default_value(512), "entries to push per call") - ("max-part-size", bpo::value(&max_part_size) - ->default_value(RCf::default_max_part_size), - "maximum entry size allowed by FIFO") - ("max-entry-size", bpo::value(&max_entry_size) - 
->default_value(RCf::default_max_entry_size), - "maximum entry size allowed by FIFO") - ("pull-entries", - bpo::value(&pull_entries) - ->default_value(512), "entries to pull per call") - ("part-num", - bpo::value(&part_num) - ->default_value(-1), "partition number, -1 for head") - ("marker", bpo::value(&marker), "marker to begin list") - ("command", bpo::value(&command), - "the operation to perform"); - - bpo::positional_options_description p; - p.add("command", 1); - - bpo::variables_map vm; - - bpo::store(bpo::command_line_parser(argc, argv). - options(desc).positional(p).run(), vm); - - bpo::notify(vm); - - if (vm.count("help")) { - fmt::print(std::cout, "{}", desc); - fmt::print(std::cout, "\n{} commands:\n", prog); - fmt::print(std::cout, " push\t\t\t push entries into fifo\n"); - fmt::print(std::cout, " pull\t\t\t retrieve and trim entries\n"); - fmt::print(std::cout, " both\t\t\t both at once, in two threads\n"); - fmt::print(std::cout, " metadata\t\t\t print metadata\n"); - fmt::print(std::cout, " partinfo\t\t\t print metadata\n"); - fmt::print(std::cout, " list\t\t\t list entries\n"); - fmt::print(std::cout, " clean\t\t\t clean up\n"); - return 0; - } - - - if (vm.find("command") == vm.end()) { - fmt::print(std::cerr, "{}: a command is required\n", prog); - return 1; - } - - int op = 0; - if (command == "push"s) { - op = PUSH; - } else if (command == "pull"s) { - op = PULL; - } else if (command == "both"s) { - op = BOTH; - } else if (command == "clean"s) { - op = CLEAN; - } else if (command == "metadata"s) { - op = METADATA; - } else if (command == "partinfo"s) { - op = PARTINFO; - } else if (command == "list"s) { - op = LIST; - } else { - fmt::print(std::cerr, "{}: {} is not a valid command\n", - prog, command); - return 1; - } - - if (!(op & PULL) && !vm["pull-entries"].defaulted()) { - fmt::print(std::cerr, "{}: pull-entries is only meaningful when pulling\n", - prog); - return 1; - } - - if (!(op & PUSH)) { - for (const auto& p : { "entry-size"s, 
"push-entries"s, "max-part-size"s, - "max-entry-size"s }) { - if (!vm[p].defaulted()) { - fmt::print(std::cerr, "{}: {} is only meaningful when pushing\n", - prog, p); - return 1; - } - } - } - - if (!(op & BOTH) && !(op & LIST) && !vm["count"].defaulted()) { - fmt::print(std::cerr, "{}: count is only meaningful when pulling, pushing, both, or listing\n", - prog); - return 1; - } - - if (!(op & PARTINFO) && !vm["part-num"].defaulted()) { - fmt::print(std::cerr, "{}: part-num is only meaningful when getting part info\n", - prog); - return 1; - } - - if (count == 0) { - fmt::print(std::cerr, "{}: count must be nonzero\n", prog); - return 1; - } - - if ((op & PULL) && (pull_entries == 0)) { - fmt::print(std::cerr, - "{}: pull-entries must be nonzero\n", prog); - return 1; - } - - if (!(op & LIST) && vm.count("marker") > 0) { - fmt::print(std::cerr, "{}: marker is only meaningful when listing\n", - prog); - return 1; - } - - if (op & PUSH) { - if (entry_size == 0) { - fmt::print(std::cerr, "{}: entry-size must be nonzero\n", prog); - return 1; - } - if (push_entries== 0) { - fmt::print(std::cerr, "{}: push-entries must be nonzero\n", prog); - return 1; - } - if (max_entry_size == 0) { - fmt::print(std::cerr, "{}: max-entry-size must be nonzero\n", prog); - return 1; - } - if (max_part_size == 0) { - fmt::print(std::cerr, "{}: max-part-size must be nonzero\n", prog); - return 1; - } - if (entry_size > max_entry_size) { - fmt::print(std::cerr, - "{}: entry-size may not be greater than max-entry-size\n", - prog); - return 1; - } - if (max_entry_size >= max_part_size) { - fmt::print(std::cerr, - "{}: max-entry-size may be less than max-part-size\n", - prog); - return 1; - } - } - - ba::io_context c; - benchmark pushmark, pullmark; - fifo::info meta; - fifo::part_header partinfo; - bool more = false; - std::vector entries; - s::spawn( - c, - [&](s::yield_context y) { - auto r = R::RADOS::Builder{}.build(c, y); - bs::error_code ec; - std::int64_t pid; - pid = 
r.lookup_pool(pool, y[ec]); - if (ec) { - r.create_pool(pool, std::nullopt, y); - pid = r.lookup_pool(pool, y); - } - const R::IOContext ioc(pid); - auto f = RCf::FIFO::create(r, ioc, oid, y, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); - - switch (op) { - case PUSH: - pushmark = push(*f, count, entry_size, push_entries, y); - break; - - case PULL: - pullmark = pull(*f, count, pull_entries, y); - break; - - case METADATA: - meta = f->meta(); - break; - - case PARTINFO: - meta = f->meta(); - if (part_num == -1) { - part_num = meta.head_part_num; - } - partinfo = f->get_part_info(part_num, y); - break; - - case LIST: - if (vm.count("marker") == 0) { - std::tie(entries, more) = f->list(count, std::nullopt, y); - } else { - std::tie(entries, more) = f->list(count, marker, y); - } - break; - - case BOTH: { - std::promise notify; - bool exit_early = false; - - auto notifier = notify.get_future(); - std::thread t(concurpull, oid, pid, count, pull_entries, - std::move(notify), &exit_early); - t.detach(); - try { - pushmark = push(*f, count, entry_size, push_entries, y); - } catch (const std::exception&) { - exit_early = true; - notifier.wait(); - throw; - } - pullmark = notifier.get(); - } - } - - if (op & CLEAN) - clean(r, ioc, *f, y); - }); - c.run(); - if (op & PUSH) { - fmt::print("Pushed {} in {} at {}/s\n", - pushmark.entries, pushmark.elapsed, pushmark.ratio()); - } - if (op & PULL) { - if (pullmark.entries == count) { - fmt::print(std::cout, "Pulled {} in {} at {}/s\n", - pullmark.entries, pullmark.elapsed, pullmark.ratio()); - } else { - fmt::print(std::cout, "Pulled {} (of {} requested), in {} at {}/s\n", - pullmark.entries, count, pullmark.elapsed, pullmark.ratio()); - } - } - if (op & METADATA) { - fmt::print(std::cout, "Metadata: [{}]\n", meta); - } - if (op & PARTINFO) { - fmt::print(std::cout, "Info for partition {}: [{}]\n", part_num, partinfo); - } - if (op & LIST) { - for (const auto& entry : entries) { - fmt::print(std::cout, 
"{}\t{}\n", entry.marker, entry.mtime); - } - if (more) { - fmt::print(std::cout, "..."); - } - } - } catch (const std::exception& e) { - if (command.empty()) { - fmt::print(std::cerr, "{}: {}\n", prog, e.what()); - } else { - fmt::print(std::cerr, "{}: {}: {}\n", prog, command, e.what()); - } - return 1; - } - - return 0; -} diff --git a/src/test/cls_fifo/test_cls_fifo.cc b/src/test/cls_fifo/test_cls_fifo.cc deleted file mode 100644 index c7a40413bb8..00000000000 --- a/src/test/cls_fifo/test_cls_fifo.cc +++ /dev/null @@ -1,741 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2019 Red Hat, Inc. - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include -#include -#include - -#include -#include - -#include - -#include "include/scope_guard.h" -#include "include/types.h" -#include "include/neorados/RADOS.hpp" - -#include "cls/fifo/cls_fifo_ops.h" - -#include "neorados/cls/fifo.h" - -#include "test/neorados/common_tests.h" - -#include "gtest/gtest.h" - -using namespace std; - -namespace R = neorados; -namespace ba = boost::asio; -namespace bs = boost::system; -namespace cb = ceph::buffer; -namespace fifo = rados::cls::fifo; -namespace RCf = neorados::cls::fifo; -namespace s = spawn; - -namespace { -void fifo_create(R::RADOS& r, - const R::IOContext& ioc, - const R::Object& oid, - std::string_view id, - s::yield_context y, - std::optional objv = std::nullopt, - std::optional oid_prefix = std::nullopt, - bool exclusive = false, - std::uint64_t max_part_size = RCf::default_max_part_size, - std::uint64_t max_entry_size = RCf::default_max_entry_size) -{ - R::WriteOp op; - RCf::create_meta(op, id, objv, oid_prefix, exclusive, max_part_size, - max_entry_size); - 
r.execute(oid, ioc, std::move(op), y); -} -} - -TEST(ClsFIFO, TestCreate) { - ba::io_context c; - auto fifo_id = "fifo"sv; - R::Object oid(fifo_id); - - s::spawn(c, [&](s::yield_context y) { - auto r = R::RADOS::Builder{}.build(c, y); - auto pool = create_pool(r, get_temp_pool_name(), y); - auto sg = make_scope_guard( - [&] { - r.delete_pool(pool, y); - }); - R::IOContext ioc(pool); - bs::error_code ec; - fifo_create(r, ioc, oid, ""s, y[ec]); - EXPECT_EQ(bs::errc::invalid_argument, ec); - fifo_create(r, ioc, oid, fifo_id, y[ec], std::nullopt, - std::nullopt, false, 0); - EXPECT_EQ(bs::errc::invalid_argument, ec); - fifo_create(r, ioc, oid, {}, y[ec], - std::nullopt, std::nullopt, - false, RCf::default_max_part_size, 0); - EXPECT_EQ(bs::errc::invalid_argument, ec); - fifo_create(r, ioc, oid, fifo_id, y); - { - std::uint64_t size; - std::uint64_t size2; - { - R::ReadOp op; - op.stat(&size, nullptr); - r.execute(oid, ioc, std::move(op), - nullptr, y); - EXPECT_GT(size, 0); - } - - { - R::ReadOp op; - op.stat(&size2, nullptr); - r.execute(oid, ioc, std::move(op), nullptr, y); - } - EXPECT_EQ(size2, size); - } - /* test idempotency */ - fifo_create(r, ioc, oid, fifo_id, y); - fifo_create(r, ioc, oid, {}, y[ec], std::nullopt, - std::nullopt, false); - EXPECT_EQ(bs::errc::invalid_argument, ec); - fifo_create(r, ioc, oid, {}, y[ec], std::nullopt, - "myprefix"sv, false); - EXPECT_EQ(bs::errc::invalid_argument, ec); - fifo_create(r, ioc, oid, "foo"sv, y[ec], - std::nullopt, std::nullopt, false); - EXPECT_EQ(bs::errc::file_exists, ec); - }); - c.run(); -} - -TEST(ClsFIFO, TestGetInfo) { - ba::io_context c; - auto fifo_id = "fifo"sv; - R::Object oid(fifo_id); - - s::spawn(c, [&](s::yield_context y) { - auto r = R::RADOS::Builder{}.build(c, y); - auto pool = create_pool(r, get_temp_pool_name(), y); - auto sg = make_scope_guard( - [&] { - r.delete_pool(pool, y); - }); - R::IOContext ioc(pool); - /* first successful create */ - fifo_create(r, ioc, oid, fifo_id, y); - - fifo::info 
info; - std::uint32_t part_header_size; - std::uint32_t part_entry_overhead; - { - R::ReadOp op; - RCf::get_meta(op, std::nullopt, - nullptr, &info, &part_header_size, - &part_entry_overhead); - r.execute(oid, ioc, std::move(op), nullptr, y); - EXPECT_GT(part_header_size, 0); - EXPECT_GT(part_entry_overhead, 0); - EXPECT_FALSE(info.version.instance.empty()); - } - { - R::ReadOp op; - RCf::get_meta(op, info.version, - nullptr, &info, &part_header_size, - &part_entry_overhead); - r.execute(oid, ioc, std::move(op), nullptr, y); - } - { - R::ReadOp op; - fifo::objv objv; - objv.instance = "foo"; - objv.ver = 12; - RCf::get_meta(op, objv, - nullptr, &info, &part_header_size, - &part_entry_overhead); - ASSERT_ANY_THROW(r.execute(oid, ioc, std::move(op), - nullptr, y)); - } - }); - c.run(); -} - -TEST(FIFO, TestOpenDefault) { - ba::io_context c; - auto fifo_id = "fifo"s; - - s::spawn(c, [&](s::yield_context y) { - auto r = R::RADOS::Builder{}.build(c, y); - auto pool = create_pool(r, get_temp_pool_name(), y); - auto sg = make_scope_guard( - [&] { - r.delete_pool(pool, y); - }); - R::IOContext ioc(pool); - auto fifo = RCf::FIFO::create(r, ioc, fifo_id, y); - // force reading from backend - fifo->read_meta(y); - auto info = fifo->meta(); - EXPECT_EQ(info.id, fifo_id); - }); - c.run(); -} - -TEST(FIFO, TestOpenParams) { - ba::io_context c; - auto fifo_id = "fifo"sv; - - s::spawn(c, [&](s::yield_context y) { - auto r = R::RADOS::Builder{}.build(c, y); - auto pool = create_pool(r, get_temp_pool_name(), y); - auto sg = make_scope_guard( - [&] { - r.delete_pool(pool, y); - }); - R::IOContext ioc(pool); - - const std::uint64_t max_part_size = 10 * 1024; - const std::uint64_t max_entry_size = 128; - auto oid_prefix = "foo.123."sv; - fifo::objv objv; - objv.instance = "fooz"s; - objv.ver = 10; - - /* first successful create */ - auto f = RCf::FIFO::create(r, ioc, fifo_id, y, objv, oid_prefix, - false, max_part_size, - max_entry_size); - - - /* force reading from backend */ - 
f->read_meta(y); - auto info = f->meta(); - ASSERT_EQ(info.id, fifo_id); - ASSERT_EQ(info.params.max_part_size, max_part_size); - ASSERT_EQ(info.params.max_entry_size, max_entry_size); - ASSERT_EQ(info.version, objv); - }); - c.run(); -} - -namespace { -template -std::pair decode_entry(const RCf::list_entry& entry) -{ - T val; - auto iter = entry.data.cbegin(); - decode(val, iter); - return std::make_pair(std::move(val), entry.marker); -} -} - - - -TEST(FIFO, TestPushListTrim) { - ba::io_context c; - auto fifo_id = "fifo"sv; - - s::spawn(c, [&](s::yield_context y) mutable { - auto r = R::RADOS::Builder{}.build(c, y); - auto pool = create_pool(r, get_temp_pool_name(), y); - auto sg = make_scope_guard( - [&] { - r.delete_pool(pool, y); - }); - R::IOContext ioc(pool); - auto f = RCf::FIFO::create(r, ioc, fifo_id, y); - static constexpr auto max_entries = 10u; - for (uint32_t i = 0; i < max_entries; ++i) { - cb::list bl; - encode(i, bl); - f->push(bl, y); - } - - std::optional marker; - /* get entries one by one */ - - for (auto i = 0u; i < max_entries; ++i) { - auto [result, more] = f->list(1, marker, y); - - bool expected_more = (i != (max_entries - 1)); - ASSERT_EQ(expected_more, more); - ASSERT_EQ(1, result.size()); - - std::uint32_t val; - std::tie(val, marker) = - decode_entry(result.front()); - - ASSERT_EQ(i, val); - } - - /* get all entries at once */ - std::string markers[max_entries]; - std::uint32_t min_entry = 0; - { - auto [result, more] = f->list(max_entries * 10, std::nullopt, - y); - - ASSERT_FALSE(more); - ASSERT_EQ(max_entries, result.size()); - - - for (auto i = 0u; i < max_entries; ++i) { - std::uint32_t val; - - std::tie(val, markers[i]) = - decode_entry(result[i]); - ASSERT_EQ(i, val); - } - - - /* trim one entry */ - f->trim(markers[min_entry], false, y); - ++min_entry; - } - - auto [result, more] = f->list(max_entries * 10, - std::nullopt, y); - - ASSERT_FALSE(more); - ASSERT_EQ(max_entries - min_entry, result.size()); - - for (auto i = 
min_entry; i < max_entries; ++i) { - std::uint32_t val; - - std::tie(val, markers[i - min_entry]) = - decode_entry(result[i - min_entry]); - ASSERT_EQ(i, val); - } - - }); - c.run(); -} - - -TEST(FIFO, TestPushTooBig) { - ba::io_context c; - auto fifo_id = "fifo"sv; - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - - s::spawn(c, [&](s::yield_context y) { - auto r = R::RADOS::Builder{}.build(c, y); - auto pool = create_pool(r, get_temp_pool_name(), y); - auto sg = make_scope_guard( - [&] { - r.delete_pool(pool, y); - }); - R::IOContext ioc(pool); - - auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); - - char buf[max_entry_size + 1]; - memset(buf, 0, sizeof(buf)); - - cb::list bl; - bl.append(buf, sizeof(buf)); - - bs::error_code ec; - f->push(bl, y[ec]); - EXPECT_EQ(RCf::errc::entry_too_large, ec); - }); - c.run(); -} - - -TEST(FIFO, TestMultipleParts) { - ba::io_context c; - auto fifo_id = "fifo"sv; - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - - s::spawn(c, [&](s::yield_context y) mutable { - auto r = R::RADOS::Builder{}.build(c, y); - auto pool = create_pool(r, get_temp_pool_name(), y); - auto sg = make_scope_guard( - [&] { - r.delete_pool(pool, y); - }); - R::IOContext ioc(pool); - - auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); - - - char buf[max_entry_size]; - memset(buf, 0, sizeof(buf)); - - const auto [part_header_size, part_entry_overhead] = - f->get_part_layout_info(); - - const auto entries_per_part = - (max_part_size - part_header_size) / - (max_entry_size + part_entry_overhead); - - const auto max_entries = entries_per_part * 4 + 1; - - /* push enough entries */ - for (auto i = 0u; i < max_entries; ++i) { - cb::list bl; - - *(int *)buf = i; - bl.append(buf, sizeof(buf)); - - f->push(bl, y); - } - - auto info = 
f->meta(); - - ASSERT_EQ(info.id, fifo_id); - /* head should have advanced */ - ASSERT_GT(info.head_part_num, 0); - - - /* list all at once */ - auto [result, more] = f->list(max_entries, std::nullopt, y); - EXPECT_EQ(false, more); - - ASSERT_EQ(max_entries, result.size()); - - for (auto i = 0u; i < max_entries; ++i) { - auto& bl = result[i].data; - ASSERT_EQ(i, *(int *)bl.c_str()); - } - - std::optional marker; - /* get entries one by one */ - - for (auto i = 0u; i < max_entries; ++i) { - auto [result, more] = f->list(1, marker, y); - ASSERT_EQ(result.size(), 1); - const bool expected_more = (i != (max_entries - 1)); - ASSERT_EQ(expected_more, more); - - std::uint32_t val; - std::tie(val, marker) = - decode_entry(result.front()); - - auto& entry = result.front(); - auto& bl = entry.data; - ASSERT_EQ(i, *(int *)bl.c_str()); - marker = entry.marker; - } - - /* trim one at a time */ - marker.reset(); - for (auto i = 0u; i < max_entries; ++i) { - /* read single entry */ - { - auto [result, more] = f->list(1, marker, y); - ASSERT_EQ(result.size(), 1); - const bool expected_more = (i != (max_entries - 1)); - ASSERT_EQ(expected_more, more); - - marker = result.front().marker; - - f->trim(*marker, false, y); - } - - /* check tail */ - info = f->meta(); - ASSERT_EQ(info.tail_part_num, i / entries_per_part); - - /* try to read all again, see how many entries left */ - auto [result, more] = f->list(max_entries, marker, y); - ASSERT_EQ(max_entries - i - 1, result.size()); - ASSERT_EQ(false, more); - } - - /* tail now should point at head */ - info = f->meta(); - ASSERT_EQ(info.head_part_num, info.tail_part_num); - - /* check old tails are removed */ - for (auto i = 0; i < info.tail_part_num; ++i) { - bs::error_code ec; - f->get_part_info(i, y[ec]); - ASSERT_EQ(bs::errc::no_such_file_or_directory, ec); - } - /* check current tail exists */ - f->get_part_info(info.tail_part_num, y); - }); - c.run(); -} - - -TEST(FIFO, TestTwoPushers) { - ba::io_context c; - auto fifo_id = 
"fifo"sv; - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - - s::spawn(c, [&](s::yield_context y) { - auto r = R::RADOS::Builder{}.build(c, y); - auto pool = create_pool(r, get_temp_pool_name(), y); - auto sg = make_scope_guard( - [&] { - r.delete_pool(pool, y); - }); - R::IOContext ioc(pool); - - auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); - - - - char buf[max_entry_size]; - memset(buf, 0, sizeof(buf)); - - - auto [part_header_size, part_entry_overhead] = - f->get_part_layout_info(); - - const auto entries_per_part = - (max_part_size - part_header_size) / - (max_entry_size + part_entry_overhead); - - const auto max_entries = entries_per_part * 4 + 1; - - auto f2 = RCf::FIFO::open(r, ioc, fifo_id, y); - - std::vector fifos{&f, &f2}; - - for (auto i = 0u; i < max_entries; ++i) { - cb::list bl; - *(int *)buf = i; - bl.append(buf, sizeof(buf)); - - auto& f = fifos[i % fifos.size()]; - - (*f)->push(bl, y); - } - - /* list all by both */ - { - auto [result, more] = f2->list(max_entries, std::nullopt, y); - - ASSERT_EQ(false, more); - ASSERT_EQ(max_entries, result.size()); - } - auto [result, more] = f2->list(max_entries, std::nullopt, y); - ASSERT_EQ(false, more); - ASSERT_EQ(max_entries, result.size()); - - for (auto i = 0u; i < max_entries; ++i) { - auto& bl = result[i].data; - ASSERT_EQ(i, *(int *)bl.c_str()); - } - }); - c.run(); -} - - -TEST(FIFO, TestTwoPushersTrim) { - ba::io_context c; - auto fifo_id = "fifo"sv; - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - - s::spawn(c, [&](s::yield_context y) { - auto r = R::RADOS::Builder{}.build(c, y); - auto pool = create_pool(r, get_temp_pool_name(), y); - auto sg = make_scope_guard( - [&] { - r.delete_pool(pool, y); - }); - R::IOContext ioc(pool); - - auto f1 = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt, - std::nullopt, false, 
max_part_size, - max_entry_size); - - char buf[max_entry_size]; - memset(buf, 0, sizeof(buf)); - - - auto [part_header_size, part_entry_overhead] = - f1->get_part_layout_info(); - - const auto entries_per_part = - (max_part_size - part_header_size) / - (max_entry_size + part_entry_overhead); - - const auto max_entries = entries_per_part * 4 + 1; - - auto f2 = RCf::FIFO::open(r, ioc, fifo_id, y); - - /* push one entry to f2 and the rest to f1 */ - - for (auto i = 0u; i < max_entries; ++i) { - cb::list bl; - - *(int *)buf = i; - bl.append(buf, sizeof(buf)); - - auto f = (i < 1 ? &f2 : &f1); - (*f)->push(bl, y); - } - - /* trim half by fifo1 */ - auto num = max_entries / 2; - - std::string marker; - { - auto [result, more] = f1->list(num, std::nullopt, y); - - ASSERT_EQ(true, more); - ASSERT_EQ(num, result.size()); - - for (auto i = 0u; i < num; ++i) { - auto& bl = result[i].data; - ASSERT_EQ(i, *(int *)bl.c_str()); - } - - auto& entry = result[num - 1]; - marker = entry.marker; - - f1->trim(marker, false, y); - - /* list what's left by fifo2 */ - - } - - const auto left = max_entries - num; - auto [result, more] = f2->list(left, marker, y); - ASSERT_EQ(left, result.size()); - ASSERT_EQ(false, more); - - for (auto i = num; i < max_entries; ++i) { - auto& bl = result[i - num].data; - ASSERT_EQ(i, *(int *)bl.c_str()); - } - }); - c.run(); -} - -TEST(FIFO, TestPushBatch) { - ba::io_context c; - auto fifo_id = "fifo"sv; - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - - s::spawn(c, [&](s::yield_context y) { - auto r = R::RADOS::Builder{}.build(c, y); - auto pool = create_pool(r, get_temp_pool_name(), y); - auto sg = make_scope_guard( - [&] { - r.delete_pool(pool, y); - }); - R::IOContext ioc(pool); - - auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); - - - char buf[max_entry_size]; - memset(buf, 0, sizeof(buf)); - - auto [part_header_size, 
part_entry_overhead] - = f->get_part_layout_info(); - - auto entries_per_part = - (max_part_size - part_header_size) / - (max_entry_size + part_entry_overhead); - - auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */ - - std::vector bufs; - - for (auto i = 0u; i < max_entries; ++i) { - cb::list bl; - - *(int *)buf = i; - bl.append(buf, sizeof(buf)); - - bufs.push_back(bl); - } - - f->push(bufs, y); - - /* list all */ - - auto [result, more] = f->list(max_entries, std::nullopt, y); - ASSERT_EQ(false, more); - ASSERT_EQ(max_entries, result.size()); - - for (auto i = 0u; i < max_entries; ++i) { - auto& bl = result[i].data; - ASSERT_EQ(i, *(int *)bl.c_str()); - } - - auto& info = f->meta(); - ASSERT_EQ(info.head_part_num, 4); - }); - c.run(); -} - -TEST(FIFO, TestTrimExclusive) { - ba::io_context c; - auto fifo_id = "fifo"sv; - - s::spawn(c, [&](s::yield_context y) mutable { - auto r = R::RADOS::Builder{}.build(c, y); - auto pool = create_pool(r, get_temp_pool_name(), y); - auto sg = make_scope_guard( - [&] { - r.delete_pool(pool, y); - }); - R::IOContext ioc(pool); - auto f = RCf::FIFO::create(r, ioc, fifo_id, y); - static constexpr auto max_entries = 10u; - for (uint32_t i = 0; i < max_entries; ++i) { - cb::list bl; - encode(i, bl); - f->push(bl, y); - } - - { - auto [result, more] = f->list(1, std::nullopt, y); - auto [val, marker] = - decode_entry(result.front()); - ASSERT_EQ(0, val); - f->trim(marker, true, y); - } - { - auto [result, more] = f->list(max_entries, std::nullopt, y); - auto [val, marker] = decode_entry(result.front()); - ASSERT_EQ(0, val); - f->trim(result[4].marker, true, y); - } - { - auto [result, more] = f->list(max_entries, std::nullopt, y); - auto [val, marker] = - decode_entry(result.front()); - ASSERT_EQ(4, val); - f->trim(result.back().marker, true, y); - } - { - auto [result, more] = f->list(max_entries, std::nullopt, y); - auto [val, marker] = - decode_entry(result.front()); - ASSERT_EQ(result.size(), 
1); - ASSERT_EQ(max_entries - 1, val); - } - }); - c.run(); -} diff --git a/src/test/rgw/test_cls_fifo_legacy.cc b/src/test/rgw/test_cls_fifo_legacy.cc index 64c53ec03e4..1fa5f8681ba 100644 --- a/src/test/rgw/test_cls_fifo_legacy.cc +++ b/src/test/rgw/test_cls_fifo_legacy.cc @@ -784,7 +784,6 @@ TEST_F(AioLegacyFIFO, TestMultipleParts) { auto c = R::Rados::aio_create_completion(); f->get_head_info(&dp, [&](int r, RCf::part_info&& p) { - ASSERT_TRUE(p.tag.empty()); ASSERT_EQ(0, p.magic); ASSERT_EQ(0, p.min_ofs); ASSERT_EQ(0, p.last_ofs); -- 2.39.5