From: Adam Emerson Date: Mon, 29 Apr 2024 21:39:59 +0000 (-0400) Subject: rgw/multisite/datalog: Remove cls_fifo_legacy X-Git-Tag: testing/wip-vshankar-testing-20250407.170244-debug~16^2~15 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=c0efcce83aca33dd8a776301440538735436218b;p=ceph-ci.git rgw/multisite/datalog: Remove cls_fifo_legacy Signed-off-by: Adam Emerson --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 0c95fd7fbb9..35cf3960e32 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -164,7 +164,6 @@ set(librgw_common_srcs driver/rados/buckets.cc rgw_bucket_logging.cc rgw_rest_bucket_logging.cc - driver/rados/cls_fifo_legacy.cc driver/rados/group.cc driver/rados/groups.cc driver/rados/rgw_bucket.cc diff --git a/src/rgw/driver/rados/cls_fifo_legacy.cc b/src/rgw/driver/rados/cls_fifo_legacy.cc deleted file mode 100644 index a5cf2e017ee..00000000000 --- a/src/rgw/driver/rados/cls_fifo_legacy.cc +++ /dev/null @@ -1,2542 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2020 Red Hat - * Author: Adam C. Emerson - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include -#include -#include -#include -#include - -#include - -#include "include/rados/librados.hpp" - -#include "include/buffer.h" - -#include "common/async/yield_context.h" -#include "common/random_string.h" -#include "common/strtol.h" // for ceph::parse() - -#include "cls/fifo/cls_fifo_types.h" -#include "cls/fifo/cls_fifo_ops.h" - -#include "cls_fifo_legacy.h" - -namespace rgw::cls::fifo { -namespace cb = ceph::buffer; -namespace fifo = rados::cls::fifo; - -using ceph::from_error_code; - -inline constexpr auto MAX_RACE_RETRIES = 10; - -void create_meta(lr::ObjectWriteOperation* op, - std::string_view id, - std::optional objv, - std::optional oid_prefix, - bool exclusive, - std::uint64_t max_part_size, - std::uint64_t max_entry_size) -{ - fifo::op::create_meta cm; - - cm.id = id; - cm.version = objv; - cm.oid_prefix = oid_prefix; - cm.max_part_size = max_part_size; - cm.max_entry_size = max_entry_size; - cm.exclusive = exclusive; - - cb::list in; - encode(cm, in); - op->exec(fifo::op::CLASS, fifo::op::CREATE_META, in); -} - -int get_meta(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid, - std::optional objv, fifo::info* info, - std::uint32_t* part_header_size, - std::uint32_t* part_entry_overhead, - uint64_t tid, optional_yield y, - bool probe) -{ - lr::ObjectReadOperation op; - fifo::op::get_meta gm; - gm.version = objv; - cb::list in; - encode(gm, in); - cb::list bl; - - op.exec(fifo::op::CLASS, fifo::op::GET_META, in, - &bl, nullptr); - auto r = rgw_rados_operate(dpp, ioctx, oid, std::move(op), nullptr, y); - if (r >= 0) try { - fifo::op::get_meta_reply reply; - auto iter = bl.cbegin(); - decode(reply, iter); - if (info) *info = std::move(reply.info); - if (part_header_size) *part_header_size = reply.part_header_size; - if (part_entry_overhead) - *part_entry_overhead = reply.part_entry_overhead; - } catch (const cb::error& err) { - ldpp_dout(dpp, -1) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " decode failed: " << err.what() - << " tid=" << tid << dendl; - r = from_error_code(err.code()); - } else if (!(probe && (r == -ENOENT || r == -ENODATA))) { - ldpp_dout(dpp, -1) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " fifo::op::GET_META failed r=" << r << " tid=" << tid - << dendl; - } - return r; -}; - -namespace { -void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv, - const fifo::update& update) -{ - fifo::op::update_meta um; - - um.version = objv; - um.tail_part_num = update.tail_part_num(); - um.head_part_num = update.head_part_num(); - um.min_push_part_num = update.min_push_part_num(); - um.max_push_part_num = update.max_push_part_num(); - um.journal_entries_add = update.journal_entries_add(); - um.journal_entries_rm = update.journal_entries_rm(); - - cb::list in; - encode(um, in); - op->exec(fifo::op::CLASS, fifo::op::UPDATE_META, in); -} - -void part_init(lr::ObjectWriteOperation* op, fifo::data_params params) -{ - fifo::op::init_part ip; - - ip.params = params; - - cb::list in; - encode(ip, in); - op->exec(fifo::op::CLASS, fifo::op::INIT_PART, in); -} - -int push_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid, - std::deque data_bufs, std::uint64_t tid, - optional_yield y) -{ - lr::ObjectWriteOperation op; - fifo::op::push_part pp; - - op.assert_exists(); - - pp.data_bufs = data_bufs; - pp.total_len = 0; - - for (const auto& bl : data_bufs) - pp.total_len += bl.length(); - - cb::list in; - encode(pp, in); - auto retval = 0; - op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in, nullptr, &retval); - auto r = rgw_rados_operate(dpp, ioctx, oid, std::move(op), y, lr::OPERATION_RETURNVEC); - if (r < 0) { - ldpp_dout(dpp, -1) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " fifo::op::PUSH_PART failed r=" << r - << " tid=" << tid << dendl; - return r; - } - if (retval < 0) { - ldpp_dout(dpp, -1) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " error handling response retval=" << retval - << " tid=" << tid << dendl; - } - return retval; -} - -void push_part(lr::IoCtx& ioctx, const std::string& oid, - std::deque data_bufs, std::uint64_t tid, - lr::AioCompletion* c) -{ - lr::ObjectWriteOperation op; - fifo::op::push_part pp; - - pp.data_bufs = data_bufs; - pp.total_len = 0; - - for (const auto& bl : data_bufs) - pp.total_len += bl.length(); - - cb::list in; - encode(pp, in); - op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in); - auto r = ioctx.aio_operate(oid, c, &op, lr::OPERATION_RETURNVEC); - ceph_assert(r >= 0); -} - -void trim_part(lr::ObjectWriteOperation* op, - std::uint64_t ofs, bool exclusive) -{ - fifo::op::trim_part tp; - - tp.ofs = ofs; - tp.exclusive = exclusive; - - cb::list in; - encode(tp, in); - op->exec(fifo::op::CLASS, fifo::op::TRIM_PART, in); -} - -int list_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid, - std::uint64_t ofs, std::uint64_t max_entries, - std::vector* entries, - bool* more, bool* full_part, - std::uint64_t tid, optional_yield y) -{ - lr::ObjectReadOperation op; - fifo::op::list_part lp; - - lp.ofs = ofs; - lp.max_entries = max_entries; - - cb::list in; - encode(lp, in); - cb::list bl; - op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, &bl, nullptr); - auto r = rgw_rados_operate(dpp, ioctx, oid, std::move(op), nullptr, y); - if (r >= 0) try { - fifo::op::list_part_reply reply; - auto iter = bl.cbegin(); - decode(reply, iter); - if (entries) *entries = std::move(reply.entries); - if (more) *more = reply.more; - if (full_part) *full_part = reply.full_part; - } catch (const cb::error& err) { - ldpp_dout(dpp, -1) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " decode failed: " << err.what() - << " tid=" << tid << dendl; - r = from_error_code(err.code()); - } else if (r != -ENOENT) { - ldpp_dout(dpp, -1) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid - << dendl; - } - return r; -} - -struct list_entry_completion : public lr::ObjectOperationCompletion { - CephContext* cct; - int* r_out; - std::vector* entries; - bool* more; - bool* full_part; - std::uint64_t tid; - - list_entry_completion(CephContext* cct, int* r_out, std::vector* entries, - bool* more, bool* full_part, std::uint64_t tid) - : cct(cct), r_out(r_out), entries(entries), more(more), - full_part(full_part), tid(tid) {} - virtual ~list_entry_completion() = default; - void handle_completion(int r, bufferlist& bl) override { - if (r >= 0) try { - fifo::op::list_part_reply reply; - auto iter = bl.cbegin(); - decode(reply, iter); - if (entries) *entries = std::move(reply.entries); - if (more) *more = reply.more; - if (full_part) *full_part = reply.full_part; - } catch (const cb::error& err) { - lderr(cct) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " decode failed: " << err.what() - << " tid=" << tid << dendl; - r = from_error_code(err.code()); - } else if (r < 0) { - lderr(cct) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid - << dendl; - } - if (r_out) *r_out = r; - } -}; - -lr::ObjectReadOperation list_part(CephContext* cct, - std::uint64_t ofs, - std::uint64_t max_entries, - int* r_out, - std::vector* entries, - bool* more, bool* full_part, - std::uint64_t tid) -{ - lr::ObjectReadOperation op; - fifo::op::list_part lp; - - lp.ofs = ofs; - lp.max_entries = max_entries; - - cb::list in; - encode(lp, in); - op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, - new list_entry_completion(cct, r_out, entries, more, full_part, - tid)); - return op; -} - -int get_part_info(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid, - fifo::part_header* header, - std::uint64_t tid, optional_yield y) -{ - lr::ObjectReadOperation op; - fifo::op::get_part_info gpi; - - cb::list in; - cb::list bl; - encode(gpi, in); - op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, &bl, nullptr); - auto r = rgw_rados_operate(dpp, ioctx, oid, std::move(op), nullptr, y); - if (r >= 0) try { - fifo::op::get_part_info_reply reply; - auto iter = bl.cbegin(); - decode(reply, iter); - if (header) *header = std::move(reply.header); - } catch (const cb::error& err) { - ldpp_dout(dpp, -1) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " decode failed: " << err.what() - << " tid=" << tid << dendl; - r = from_error_code(err.code()); - } else { - ldpp_dout(dpp, -1) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid - << dendl; - } - return r; -} - -struct partinfo_completion : public lr::ObjectOperationCompletion { - CephContext* cct; - int* rp; - fifo::part_header* h; - std::uint64_t tid; - partinfo_completion(CephContext* cct, int* rp, fifo::part_header* h, - std::uint64_t tid) : - cct(cct), rp(rp), h(h), tid(tid) { - } - virtual ~partinfo_completion() = default; - void handle_completion(int r, bufferlist& bl) override { - if (r >= 0) try { - fifo::op::get_part_info_reply reply; - auto iter = bl.cbegin(); - decode(reply, iter); - if (h) *h = std::move(reply.header); - } catch (const cb::error& err) { - r = from_error_code(err.code()); - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " decode failed: " << err.what() - << " tid=" << tid << dendl; - } else { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid - << dendl; - } - if (rp) { - *rp = r; - } - } -}; - -lr::ObjectReadOperation get_part_info(CephContext* cct, - fifo::part_header* header, - std::uint64_t tid, int* r = 0) -{ - lr::ObjectReadOperation op; - fifo::op::get_part_info gpi; - - cb::list in; - cb::list bl; - encode(gpi, in); - op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, - new partinfo_completion(cct, r, header, tid)); - return op; -} -} - -std::optional FIFO::to_marker(std::string_view s) -{ - marker m; - if (s.empty()) { - m.num = info.tail_part_num; - m.ofs = 0; - return m; - } - - auto pos = s.find(':'); - if (pos == s.npos) { - return std::nullopt; - } - - auto num = s.substr(0, pos); - auto ofs = s.substr(pos + 1); - - auto n = ceph::parse(num); - if (!n) { - return std::nullopt; - } - m.num = *n; - auto o = ceph::parse(ofs); - if (!o) { - return std::nullopt; - } - m.ofs = *o; - return m; -} - -int FIFO::apply_update(const DoutPrefixProvider *dpp, - fifo::info* info, - const fifo::objv& objv, - const fifo::update& update, - std::uint64_t tid) -{ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - std::unique_lock l(m); - if (objv != info->version) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " version mismatch, canceling: tid=" << tid << dendl; - return -ECANCELED; - } - - info->apply_update(update); - return {}; -} - -int FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& update, - fifo::objv version, bool* pcanceled, - std::uint64_t tid, optional_yield y) -{ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - lr::ObjectWriteOperation op; - bool canceled = false; - update_meta(&op, version, update); - auto r = rgw_rados_operate(dpp, ioctx, oid, std::move(op), y); - if (r >= 0 || r == -ECANCELED) { - canceled = (r == -ECANCELED); - if (!canceled) { - r = apply_update(dpp, &info, version, update, tid); - if (r < 0) canceled = true; - } - if (canceled) { - r = read_meta(dpp, tid, y); - canceled = r < 0 ? false : true; - } - } - if (pcanceled) *pcanceled = canceled; - if (canceled) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled: tid=" << tid << dendl; - } - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " returning error: r=" << r << " tid=" << tid << dendl; - } - return r; -} - -struct Updater : public Completion { - FIFO* fifo; - fifo::update update; - fifo::objv version; - bool reread = false; - bool* pcanceled = nullptr; - std::uint64_t tid; - Updater(const DoutPrefixProvider *dpp, FIFO* fifo, lr::AioCompletion* super, - const fifo::update& update, fifo::objv version, - bool* pcanceled, std::uint64_t tid) - : Completion(dpp, super), fifo(fifo), update(update), version(version), - pcanceled(pcanceled) {} - - void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - if (reread) - handle_reread(dpp, std::move(p), r); - else - handle_update(dpp, std::move(p), r); - } - - void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " handling async update_meta: tid=" - << tid << dendl; - if (r < 0 && r != -ECANCELED) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " update failed: r=" << r << " tid=" << tid << dendl; - complete(std::move(p), r); - return; - } - bool canceled = (r == -ECANCELED); - if (!canceled) { - int r = fifo->apply_update(dpp, &fifo->info, version, update, tid); - if (r < 0) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " update failed, marking canceled: r=" << r - << " tid=" << tid << dendl; - canceled = true; - } - } - if (canceled) { - reread = true; - fifo->read_meta(dpp, tid, call(std::move(p))); - return; - } - if (pcanceled) - *pcanceled = false; - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " completing: tid=" << tid << dendl; - complete(std::move(p), 0); - } - - void handle_reread(const DoutPrefixProvider *dpp, Ptr&& p, int r) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " handling async read_meta: tid=" - << tid << dendl; - if (r < 0 && pcanceled) { - *pcanceled = false; - } else if (r >= 0 && pcanceled) { - *pcanceled = true; - } - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " failed dispatching read_meta: r=" << r << " tid=" - << tid << dendl; - } else { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " completing: tid=" << tid << dendl; - } - complete(std::move(p), r); - } -}; - -void FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& update, - fifo::objv version, bool* pcanceled, - std::uint64_t tid, lr::AioCompletion* c) -{ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - lr::ObjectWriteOperation op; - update_meta(&op, info.version, update); - auto updater = std::make_unique(dpp, this, c, update, version, pcanceled, - tid); - [[maybe_unused]] auto r = - ioctx.aio_operate(oid, Updater::call(std::move(updater)), &op); - assert(r >= 0); -} - -int FIFO::create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid, - optional_yield y) -{ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - lr::ObjectWriteOperation op; - op.create(false); /* We don't need exclusivity, part_init ensures - we're creating from the same journal entry. */ - std::unique_lock l(m); - part_init(&op, info.params); - auto oid = info.part_oid(part_num); - l.unlock(); - auto r = rgw_rados_operate(dpp, ioctx, oid, std::move(op), y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " part_init failed: r=" << r << " tid=" - << tid << dendl; - } - return r; -} - -int FIFO::remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid, - optional_yield y) -{ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - lr::ObjectWriteOperation op; - op.remove(); - std::unique_lock l(m); - auto oid = info.part_oid(part_num); - l.unlock(); - auto r = rgw_rados_operate(dpp, ioctx, oid, std::move(op), y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " remove failed: r=" << r << " tid=" - << tid << dendl; - } - return r; -} - -int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y) -{ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - std::vector processed; - - std::unique_lock l(m); - auto tmpjournal = info.journal; - auto new_tail = info.tail_part_num; - auto new_head = info.head_part_num; - auto new_max = info.max_push_part_num; - l.unlock(); - - int r = 0; - for (auto& entry : tmpjournal) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " processing entry: entry=" << entry << " tid=" << tid - << dendl; - switch (entry.op) { - using enum fifo::journal_entry::Op; - case create: - r = create_part(dpp, entry.part_num, tid, y); - if (entry.part_num > new_max) { - new_max = entry.part_num; - } - break; - case set_head: - r = 0; - if (entry.part_num > new_head) { - new_head = entry.part_num; - } - break; - case remove: - r = remove_part(dpp, entry.part_num, tid, y); - if (r == -ENOENT) r = 0; - if (entry.part_num >= new_tail) { - new_tail = entry.part_num + 1; - } - break; - default: - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " unknown journaled op: entry=" << entry << " tid=" - << tid << dendl; - return -EIO; - } - - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " processing entry failed: entry=" << entry - << " r=" << r << " tid=" << tid << dendl; - return -r; - } - - processed.push_back(std::move(entry)); - } - - // Postprocess - bool canceled = true; - - for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " postprocessing: i=" << i << " tid=" << tid << dendl; - - std::optional tail_part_num; - std::optional head_part_num; - std::optional max_part_num; - - std::unique_lock l(m); - auto objv = info.version; - if (new_tail > tail_part_num) tail_part_num = new_tail; - if (new_head > info.head_part_num) head_part_num = new_head; - if (new_max > info.max_push_part_num) max_part_num = new_max; - l.unlock(); - - if (processed.empty() && - !tail_part_num && - !max_part_num) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " nothing to update any more: i=" << i << " tid=" - << tid << dendl; - canceled = false; - break; - } - auto u = fifo::update().tail_part_num(tail_part_num) - .head_part_num(head_part_num).max_push_part_num(max_part_num) - .journal_entries_rm(processed); - r = _update_meta(dpp, u, objv, &canceled, tid, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " _update_meta failed: update=" << u - << " r=" << r << " tid=" << tid << dendl; - break; - } - - if (canceled) { - std::vector new_processed; - std::unique_lock l(m); - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " update canceled, retrying: i=" << i << " tid=" - << tid << dendl; - for (auto& e : processed) { - if (info.journal.contains(e)) { - new_processed.push_back(e); - } - } - processed = std::move(new_processed); - } - } - if (r == 0 && canceled) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled too many times, giving up: tid=" << tid << dendl; - r = -ECANCELED; - } - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " failed, r=: " << r << " tid=" << tid << dendl; - } - return r; -} - -int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, - std::int64_t new_part_num, bool is_head, - std::uint64_t tid, optional_yield y) -{ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - std::unique_lock l(m); - using enum fifo::journal_entry::Op; - std::vector jentries{{ create, new_part_num }}; - if (info.journal.contains({create, new_part_num}) && - (!is_head || info.journal.contains({set_head, new_part_num}))) { - l.unlock(); - ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " new part journaled, but not processed: tid=" - << tid << dendl; - auto r = process_journal(dpp, tid, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " process_journal failed: r=" << r << " tid=" << tid << dendl; - } - return r; - } - auto version = info.version; - - if (is_head) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " needs new head: tid=" << tid << dendl; - jentries.push_back({ set_head, new_part_num }); - } - l.unlock(); - - int r = 0; - bool canceled = true; - for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) { - canceled = false; - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " updating metadata: i=" << i << " tid=" << tid << dendl; - auto u = fifo::update{}.journal_entries_add(jentries); - r = _update_meta(dpp, u, version, &canceled, tid, y); - if (r >= 0 && canceled) { - std::unique_lock l(m); - version = info.version; - auto found = (info.journal.contains({create, new_part_num}) || - info.journal.contains({set_head, new_part_num})); - if ((info.max_push_part_num >= new_part_num && - info.head_part_num >= new_part_num)) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " raced, but journaled and processed: i=" << i - << " tid=" << tid << dendl; - return 0; - } - if (found) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " raced, journaled but not processed: i=" << i - << " tid=" << tid << dendl; - canceled = false; - } - l.unlock(); - } - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " _update_meta failed: update=" << u << " r=" << r - << " tid=" << tid << dendl; - return r; - } - } - if (canceled) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled too many times, giving up: tid=" << tid << dendl; - return -ECANCELED; - } - r = process_journal(dpp, tid, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " process_journal failed: r=" << r << " tid=" << tid << dendl; - } - return r; -} - -int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, - std::int64_t new_head_part_num, - std::uint64_t tid, optional_yield y) -{ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - std::unique_lock l(m); - auto max_push_part_num = info.max_push_part_num; - auto version = info.version; - l.unlock(); - - int r = 0; - if (max_push_part_num < new_head_part_num) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " need new part: tid=" << tid << dendl; - r = _prepare_new_part(dpp, new_head_part_num, true, tid, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " _prepare_new_part failed: r=" << r - << " tid=" << tid << dendl; - return r; - } - std::unique_lock l(m); - if (info.max_push_part_num < new_head_part_num) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " inconsistency, push part less than head part: " - << " tid=" << tid << dendl; - return -EIO; - } - l.unlock(); - return 0; - } - - using enum fifo::journal_entry::Op; - fifo::journal_entry jentry; - jentry.op = set_head; - jentry.part_num = new_head_part_num; - - r = 0; - bool canceled = true; - for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) { - canceled = false; - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " updating metadata: i=" << i << " tid=" << tid << dendl; - auto u = fifo::update{}.journal_entries_add({{ jentry }}); - r = _update_meta(dpp, u, version, &canceled, tid, y); - if (r >= 0 && canceled) { - std::unique_lock l(m); - auto found = (info.journal.contains({create, new_head_part_num}) || - info.journal.contains({set_head, new_head_part_num})); - version = info.version; - if ((info.head_part_num >= new_head_part_num)) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " raced, but journaled and processed: i=" << i - << " tid=" << tid << dendl; - return 0; - } - if (found) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " raced, journaled but not processed: i=" << i - << " tid=" << tid << dendl; - canceled = false; - } - l.unlock(); - } - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " _update_meta failed: update=" << u << " r=" << r - << " tid=" << tid << dendl; - return r; - } - } - if (canceled) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled too many times, giving up: tid=" << tid << dendl; - return -ECANCELED; - } - r = process_journal(dpp, tid, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " process_journal failed: r=" << r << " tid=" << tid << dendl; - } - return r; -} - -struct NewPartPreparer : public Completion { - FIFO* f; - std::vector jentries; - int i = 0; - std::int64_t new_part_num; - bool canceled = false; - uint64_t tid; - - NewPartPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super, - std::vector jentries, - std::int64_t new_part_num, - std::uint64_t tid) - : Completion(dpp, super), f(f), jentries(std::move(jentries)), - new_part_num(new_part_num), tid(tid) {} - - void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " _update_meta failed: r=" << r - << " tid=" << tid << dendl; - complete(std::move(p), r); - return; - } - - if (canceled) { - using enum fifo::journal_entry::Op; - std::unique_lock l(f->m); - auto found = (f->info.journal.contains({create, new_part_num}) || - f->info.journal.contains({set_head, new_part_num})); - auto max_push_part_num = f->info.max_push_part_num; - auto head_part_num = f->info.head_part_num; - auto version = f->info.version; - l.unlock(); - if ((max_push_part_num >= new_part_num && - head_part_num >= new_part_num)) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " raced, but journaled and processed: i=" << i - << " tid=" << tid << dendl; - complete(std::move(p), 0); - return; - } - if (i >= MAX_RACE_RETRIES) { - complete(std::move(p), -ECANCELED); - return; - } - if (!found) { - ++i; - f->_update_meta(dpp, fifo::update{} - .journal_entries_add(jentries), - version, &canceled, tid, call(std::move(p))); - return; - } else { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " raced, journaled but not processed: i=" << i - << " tid=" << tid << dendl; - canceled = false; - } - // Fall through. We still need to process the journal. - } - f->process_journal(dpp, tid, super()); - return; - } -}; - -void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num, - bool is_head, std::uint64_t tid, lr::AioCompletion* c) -{ - std::unique_lock l(m); - using enum fifo::journal_entry::Op; - std::vector jentries{{create, new_part_num}}; - if (info.journal.contains({create, new_part_num}) && - (!is_head || info.journal.contains({set_head, new_part_num}))) { - l.unlock(); - ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " new part journaled, but not processed: tid=" - << tid << dendl; - process_journal(dpp, tid, c); - return; - } - auto version = info.version; - - if (is_head) { - jentries.push_back({ set_head, new_part_num }); - } - l.unlock(); - - auto n = std::make_unique(dpp, this, c, jentries, - new_part_num, tid); - auto np = n.get(); - _update_meta(dpp, fifo::update{}.journal_entries_add(jentries), version, - &np->canceled, tid, NewPartPreparer::call(std::move(n))); -} - -struct NewHeadPreparer : public Completion { - FIFO* f; - int i = 0; - bool newpart; - std::int64_t new_head_part_num; - bool canceled = false; - std::uint64_t tid; - - NewHeadPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super, - bool newpart, std::int64_t new_head_part_num, - std::uint64_t tid) - : Completion(dpp, super), f(f), newpart(newpart), - new_head_part_num(new_head_part_num), tid(tid) {} - - void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) { - if (newpart) - handle_newpart(std::move(p), r); - else - handle_update(dpp, std::move(p), r); - } - - void handle_newpart(Ptr&& p, int r) { - if (r < 0) { - lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " _prepare_new_part failed: r=" << r - << " tid=" << tid << dendl; - complete(std::move(p), r); - return; - } - std::unique_lock l(f->m); - if (f->info.max_push_part_num < new_head_part_num) { - l.unlock(); - lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " _prepare_new_part failed: r=" << r - << " tid=" << tid << dendl; - complete(std::move(p), -EIO); - } else { - l.unlock(); - complete(std::move(p), 0); - } - } - - void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " _update_meta failed: r=" << r - << " tid=" << tid << dendl; - complete(std::move(p), r); - return; - } - - if (canceled) { - using enum fifo::journal_entry::Op; - std::unique_lock l(f->m); - auto found = (f->info.journal.contains({create, new_head_part_num }) || - f->info.journal.contains({set_head, new_head_part_num })); - auto head_part_num = f->info.head_part_num; - auto version = f->info.version; - - l.unlock(); - if ((head_part_num >= new_head_part_num)) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " raced, but journaled and processed: i=" << i - << " tid=" << tid << dendl; - complete(std::move(p), 0); - return; - } - if (i >= MAX_RACE_RETRIES) { - complete(std::move(p), -ECANCELED); - return; - } - if (!found) { - ++i; - fifo::journal_entry jentry; - jentry.op = set_head; - jentry.part_num = new_head_part_num; - f->_update_meta(dpp, fifo::update{} - .journal_entries_add({{jentry}}), - version, &canceled, tid, call(std::move(p))); - return; - } else { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " raced, journaled but not processed: i=" << i - << " tid=" << tid << dendl; - canceled = false; - } - // Fall through. We still need to process the journal. - } - f->process_journal(dpp, tid, super()); - return; - } -}; - -void FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num, - std::uint64_t tid, lr::AioCompletion* c) -{ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - std::unique_lock l(m); - auto max_push_part_num = info.max_push_part_num; - auto version = info.version; - l.unlock(); - - if (max_push_part_num < new_head_part_num) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " need new part: tid=" << tid << dendl; - auto n = std::make_unique(dpp, this, c, true, new_head_part_num, - tid); - _prepare_new_part(dpp, new_head_part_num, true, tid, - NewHeadPreparer::call(std::move(n))); - } else { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " updating head: tid=" << tid << dendl; - auto n = std::make_unique(dpp, this, c, false, new_head_part_num, - tid); - auto np = n.get(); - using enum fifo::journal_entry::Op; - fifo::journal_entry jentry; - jentry.op = set_head; - jentry.part_num = new_head_part_num; - _update_meta(dpp, fifo::update{}.journal_entries_add({{jentry}}), version, - &np->canceled, tid, NewHeadPreparer::call(std::move(n))); - } -} - -int FIFO::push_entries(const DoutPrefixProvider *dpp, const std::deque& data_bufs, - std::uint64_t tid, optional_yield y) -{ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - std::unique_lock l(m); - auto head_part_num = info.head_part_num; - const auto part_oid = info.part_oid(head_part_num); - l.unlock(); - - auto r = push_part(dpp, ioctx, part_oid, data_bufs, tid, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " push_part failed: r=" << r << " tid=" << tid << dendl; - } - return r; -} - -void FIFO::push_entries(const std::deque& data_bufs, - std::uint64_t tid, lr::AioCompletion* c) -{ - std::unique_lock l(m); - auto head_part_num = info.head_part_num; - const auto part_oid = info.part_oid(head_part_num); - l.unlock(); - - push_part(ioctx, part_oid, data_bufs, tid, c); -} - -int FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs, - bool exclusive, std::uint64_t tid, - optional_yield y) -{ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - lr::ObjectWriteOperation op; - std::unique_lock l(m); - const auto part_oid = info.part_oid(part_num); - l.unlock(); - rgw::cls::fifo::trim_part(&op, ofs, exclusive); - auto r = rgw_rados_operate(dpp, ioctx, part_oid, std::move(op), y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " trim_part failed: r=" << r << " tid=" << tid << dendl; - } - return 0; -} - -void FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs, - bool exclusive, std::uint64_t tid, - lr::AioCompletion* c) -{ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - lr::ObjectWriteOperation op; - std::unique_lock l(m); - const auto part_oid = info.part_oid(part_num); - l.unlock(); - rgw::cls::fifo::trim_part(&op, ofs, exclusive); - auto r = ioctx.aio_operate(part_oid, c, &op); - ceph_assert(r >= 0); -} - -int FIFO::open(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, std::string oid, std::unique_ptr* fifo, - optional_yield y, std::optional objv, - bool probe) -{ - ldpp_dout(dpp, 20) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering" << dendl; - fifo::info info; - std::uint32_t size; - std::uint32_t over; - int r = get_meta(dpp, ioctx, oid, objv, &info, &size, &over, 0, y, - probe); - if (r < 0) { - if (!(probe && (r == -ENOENT || r == -ENODATA))) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " get_meta failed: r=" << r << dendl; - } - return r; - } - std::unique_ptr f(new FIFO(std::move(ioctx), oid)); - f->info = info; - f->part_header_size = size; - f->part_entry_overhead = over; - // If there are journal entries, process them, in case - // someone crashed mid-transaction. - if (!info.journal.empty()) { - ldpp_dout(dpp, 20) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " processing leftover journal" << dendl; - r = f->process_journal(dpp, 0, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " process_journal failed: r=" << r << dendl; - return r; - } - } - *fifo = std::move(f); - return 0; -} - -int FIFO::create(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, std::string oid, std::unique_ptr* fifo, - optional_yield y, std::optional objv, - std::optional oid_prefix, - bool exclusive, std::uint64_t max_part_size, - std::uint64_t max_entry_size) -{ - ldpp_dout(dpp, 20) - << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering" << dendl; - lr::ObjectWriteOperation op; - create_meta(&op, oid, objv, oid_prefix, exclusive, max_part_size, - max_entry_size); - auto r = rgw_rados_operate(dpp, ioctx, oid, std::move(op), y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " create_meta failed: r=" << r << dendl; - return r; - } - r = open(dpp, std::move(ioctx), std::move(oid), fifo, y, objv); - return r; -} - -int FIFO::read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - fifo::info _info; - std::uint32_t _phs; - std::uint32_t _peo; - - auto r = get_meta(dpp, ioctx, oid, std::nullopt, &_info, &_phs, &_peo, tid, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " get_meta failed: r=" << r << " tid=" << tid << dendl; - return r; - } - std::unique_lock l(m); - // We have a newer version already! - if (_info.version.same_or_later(this->info.version)) { - info = std::move(_info); - part_header_size = _phs; - part_entry_overhead = _peo; - } - return 0; -} - -int FIFO::read_meta(const DoutPrefixProvider *dpp, optional_yield y) { - std::unique_lock l(m); - auto tid = ++next_tid; - l.unlock(); - return read_meta(dpp, tid, y); -} - -struct Reader : public Completion { - FIFO* fifo; - cb::list bl; - std::uint64_t tid; - Reader(const DoutPrefixProvider *dpp, FIFO* fifo, lr::AioCompletion* super, std::uint64_t tid) - : Completion(dpp, super), fifo(fifo), tid(tid) {} - - void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - if (r >= 0) try { - fifo::op::get_meta_reply reply; - auto iter = bl.cbegin(); - decode(reply, iter); - std::unique_lock l(fifo->m); - if (reply.info.version.same_or_later(fifo->info.version)) { - fifo->info = std::move(reply.info); - fifo->part_header_size = reply.part_header_size; - fifo->part_entry_overhead = reply.part_entry_overhead; - } - } catch (const cb::error& err) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " failed to decode response err=" << err.what() - << " tid=" << tid << dendl; - r = from_error_code(err.code()); - } else { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " read_meta failed r=" << r - << " tid=" << tid << dendl; - } - complete(std::move(p), r); - } -}; - -void FIFO::read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c) -{ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - lr::ObjectReadOperation op; - fifo::op::get_meta gm; - cb::list in; - encode(gm, in); - auto reader = std::make_unique(dpp, this, c, tid); - auto rp = reader.get(); - [[maybe_unused]] auto r = ioctx.aio_exec( - oid, Reader::call(std::move(reader)), fifo::op::CLASS, fifo::op::GET_META, - in, &rp->bl); - assert(r >= 0); -} - -const fifo::info& FIFO::meta() const { - return info; -} - -std::pair FIFO::get_part_layout_info() const { - return {part_header_size, part_entry_overhead}; -} - -int FIFO::push(const DoutPrefixProvider *dpp, const cb::list& bl, optional_yield y) { - return push(dpp, std::vector{ bl }, y); -} - -void FIFO::push(const DoutPrefixProvider *dpp, const cb::list& bl, lr::AioCompletion* c) { - push(dpp, std::vector{ bl }, c); -} - -int FIFO::push(const DoutPrefixProvider *dpp, const std::vector& data_bufs, optional_yield y) -{ - std::unique_lock l(m); - auto tid = ++next_tid; - auto max_entry_size = info.params.max_entry_size; - auto need_new_head = info.need_new_head(); - auto head_part_num = info.head_part_num; - l.unlock(); - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - if (data_bufs.empty()) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " empty push, returning success tid=" << tid << dendl; - return 0; - } - - // Validate sizes - for (const auto& bl : data_bufs) { - if (bl.length() > max_entry_size) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entry bigger than max_entry_size tid=" << tid << dendl; - return -E2BIG; - } - } - - int r = 0; - if (need_new_head) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " need new head tid=" << tid << dendl; - r = _prepare_new_head(dpp, head_part_num + 1, tid, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " _prepare_new_head failed: r=" << r - << " tid=" << tid << dendl; - return r; - } - } - - std::deque remaining(data_bufs.begin(), data_bufs.end()); - std::deque batch; - - uint64_t batch_len = 0; - auto retries = 0; - bool canceled = true; - while ((!remaining.empty() || !batch.empty()) && - (retries <= MAX_RACE_RETRIES)) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " preparing push: remaining=" << remaining.size() - << " batch=" << batch.size() << " retries=" << retries - << " tid=" << tid << dendl; - std::unique_lock l(m); - head_part_num = info.head_part_num; - auto max_part_size = info.params.max_part_size; - auto overhead = part_entry_overhead; - l.unlock(); - - while (!remaining.empty() && - (remaining.front().length() + batch_len <= max_part_size)) { - /* We can send entries with data_len up to max_entry_size, - however, we want to also account the overhead when - dealing with multiple entries. Previous check doesn't - account for overhead on purpose. */ - batch_len += remaining.front().length() + overhead; - batch.push_back(std::move(remaining.front())); - remaining.pop_front(); - } - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " prepared push: remaining=" << remaining.size() - << " batch=" << batch.size() << " retries=" << retries - << " batch_len=" << batch_len - << " tid=" << tid << dendl; - - auto r = push_entries(dpp, batch, tid, y); - if (r == -ERANGE) { - canceled = true; - ++retries; - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " need new head tid=" << tid << dendl; - r = _prepare_new_head(dpp, head_part_num + 1, tid, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " prepare_new_head failed: r=" << r - << " tid=" << tid << dendl; - return r; - } - r = 0; - continue; - } - if (r == -ENOENT) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " racing client trimmed part, rereading metadata " - << "tid=" << tid << dendl; - canceled = true; - ++retries; - r = read_meta(dpp, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " read_meta failed: r=" << r - << " tid=" << tid << dendl; - return r; - } - r = 0; - continue; - } - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " push_entries failed: r=" << r - << " tid=" << tid << dendl; - return r; - } - // Made forward progress! - canceled = false; - retries = 0; - batch_len = 0; - if (r == ssize(batch)) { - batch.clear(); - } else { - batch.erase(batch.begin(), batch.begin() + r); - for (const auto& b : batch) { - batch_len += b.length() + part_entry_overhead; - } - } - } - if (canceled) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled too many times, giving up: tid=" << tid << dendl; - return -ECANCELED; - } - return 0; -} - -struct Pusher : public Completion { - FIFO* f; - std::deque remaining; - std::deque batch; - int i = 0; - std::int64_t head_part_num; - std::uint64_t tid; - enum { pushing, new_heading, meta_reading } state = pushing; - - void prep_then_push(const DoutPrefixProvider *dpp, Ptr&& p, const unsigned successes) { - std::unique_lock l(f->m); - auto max_part_size = f->info.params.max_part_size; - auto part_entry_overhead = f->part_entry_overhead; - head_part_num = f->info.head_part_num; - l.unlock(); - - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " preparing push: remaining=" << remaining.size() - << " batch=" << batch.size() << " i=" << i - << " tid=" << tid << dendl; - - uint64_t batch_len = 0; - if (successes > 0) { - if (successes == batch.size()) { - batch.clear(); - } else { - batch.erase(batch.begin(), batch.begin() + successes); - for (const auto& b : batch) { - batch_len += b.length() + part_entry_overhead; - } - } - } - - if (batch.empty() && remaining.empty()) { - complete(std::move(p), 0); - return; - } - - while (!remaining.empty() && - (remaining.front().length() + batch_len <= max_part_size)) { - - /* We can send entries with data_len up to max_entry_size, - however, we want to also account the overhead when - dealing with multiple entries. Previous check doesn't - account for overhead on purpose. */ - batch_len += remaining.front().length() + part_entry_overhead; - batch.push_back(std::move(remaining.front())); - remaining.pop_front(); - } - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " prepared push: remaining=" << remaining.size() - << " batch=" << batch.size() << " i=" << i - << " batch_len=" << batch_len - << " tid=" << tid << dendl; - push(std::move(p)); - } - - void push(Ptr&& p) { - f->push_entries(batch, tid, call(std::move(p))); - } - - void new_head(const DoutPrefixProvider *dpp, Ptr&& p) { - state = new_heading; - f->_prepare_new_head(dpp, head_part_num + 1, tid, call(std::move(p))); - } - - void read_meta(const DoutPrefixProvider *dpp, Ptr&& p) { - ++i; - state = meta_reading; - f->read_meta(dpp, tid, call(std::move(p))); - } - - void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) { - switch (state) { - case pushing: - if (r == -ERANGE) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " need new head tid=" << tid << dendl; - new_head(dpp, std::move(p)); - return; - } - if (r == -ENOENT) { - if (i > MAX_RACE_RETRIES) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " racing client deleted part, but we're out" - << " of retries: tid=" << tid << dendl; - complete(std::move(p), r); - } - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " racing client deleted part: tid=" << tid << dendl; - read_meta(dpp, std::move(p)); - return; - } - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " push_entries failed: r=" << r - << " tid=" << tid << dendl; - complete(std::move(p), r); - return; - } - i = 0; // We've made forward progress, so reset the race counter! - prep_then_push(dpp, std::move(p), r); - break; - - case new_heading: - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " prepare_new_head failed: r=" << r - << " tid=" << tid << dendl; - complete(std::move(p), r); - return; - } - state = pushing; - handle_new_head(dpp, std::move(p), r); - break; - - case meta_reading: - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " read_meta failed: r=" << r - << " tid=" << tid << dendl; - complete(std::move(p), r); - return; - } - state = pushing; - prep_then_push(dpp, std::move(p), r); - break; - } - } - - void handle_new_head(const DoutPrefixProvider *dpp, Ptr&& p, int r) { - if (r == -ECANCELED) { - if (p->i == MAX_RACE_RETRIES) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled too many times, giving up: tid=" << tid << dendl; - complete(std::move(p), -ECANCELED); - return; - } - ++p->i; - } else if (r) { - complete(std::move(p), r); - return; - } - - if (p->batch.empty()) { - prep_then_push(dpp, std::move(p), 0); - return; - } else { - push(std::move(p)); - return; - } - } - - Pusher(const DoutPrefixProvider *dpp, FIFO* f, std::deque&& remaining, - std::int64_t head_part_num, std::uint64_t tid, - lr::AioCompletion* super) - : Completion(dpp, super), f(f), remaining(std::move(remaining)), - head_part_num(head_part_num), tid(tid) {} -}; - -void FIFO::push(const DoutPrefixProvider *dpp, const std::vector& data_bufs, - lr::AioCompletion* c) -{ - std::unique_lock l(m); - auto tid = ++next_tid; - auto max_entry_size = info.params.max_entry_size; - auto need_new_head = info.need_new_head(); - auto head_part_num = info.head_part_num; - l.unlock(); - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - auto p = std::make_unique(dpp, this, std::deque(data_bufs.begin(), data_bufs.end()), - head_part_num, tid, c); - // Validate sizes - for (const auto& bl : data_bufs) { - if (bl.length() > max_entry_size) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entry bigger than max_entry_size tid=" << tid << dendl; - Pusher::complete(std::move(p), -E2BIG); - return; - } - } - - if (data_bufs.empty() ) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " empty push, returning success tid=" << tid << dendl; - Pusher::complete(std::move(p), 0); - return; - } - - if (need_new_head) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " need new head tid=" << tid << dendl; - p->new_head(dpp, std::move(p)); - } else { - p->prep_then_push(dpp, std::move(p), 0); - } -} - -int FIFO::list(const DoutPrefixProvider *dpp, int max_entries, - std::optional markstr, - std::vector* presult, bool* pmore, - optional_yield y) -{ - std::unique_lock l(m); - auto tid = ++next_tid; - std::int64_t part_num = info.tail_part_num; - l.unlock(); - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - std::uint64_t ofs = 0; - if (markstr) { - auto marker = to_marker(*markstr); - if (!marker) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " invalid marker string: " << markstr - << " tid= "<< tid << dendl; - return -EINVAL; - } - part_num = marker->num; - ofs = marker->ofs; - } - - std::vector result; - result.reserve(max_entries); - bool more = false; - - std::vector entries; - int r = 0; - while (max_entries > 0) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " max_entries=" << max_entries << " tid=" << tid << dendl; - bool part_more = false; - bool part_full = false; - - std::unique_lock l(m); - auto part_oid = info.part_oid(part_num); - l.unlock(); - - r = list_part(dpp, ioctx, part_oid, ofs, max_entries, &entries, - &part_more, &part_full, tid, y); - if (r == -ENOENT) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " missing part, rereading metadata" - << " tid= "<< tid << dendl; - r = read_meta(dpp, tid, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " read_meta failed: r=" << r - << " tid= "<< tid << dendl; - return r; - } - if (part_num < info.tail_part_num) { - /* raced with trim? restart */ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " raced with trim, restarting: tid=" << tid << dendl; - max_entries += result.size(); - result.clear(); - std::unique_lock l(m); - part_num = info.tail_part_num; - l.unlock(); - ofs = 0; - continue; - } - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " assuming part was not written yet, so end of data: " - << "tid=" << tid << dendl; - more = false; - r = 0; - break; - } - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " list_entries failed: r=" << r - << " tid= "<< tid << dendl; - return r; - } - more = part_full || part_more; - for (auto& entry : entries) { - list_entry e; - e.data = std::move(entry.data); - e.marker = marker{part_num, entry.ofs}.to_string(); - e.mtime = entry.mtime; - result.push_back(std::move(e)); - --max_entries; - if (max_entries == 0) - break; - } - entries.clear(); - if (max_entries > 0 && - part_more) { - } - - if (!part_full) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " head part is not full, so we can assume we're done: " - << "tid=" << tid << dendl; - break; - } - if (!part_more) { - ++part_num; - ofs = 0; - } - } - if (presult) - *presult = std::move(result); - if (pmore) - *pmore = more; - return 0; -} - -int FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exclusive, optional_yield y) -{ - bool overshoot = false; - auto marker = to_marker(markstr); - if (!marker) { - return -EINVAL; - } - auto part_num = marker->num; - auto ofs = marker->ofs; - std::unique_lock l(m); - auto tid = ++next_tid; - auto hn = info.head_part_num; - const auto max_part_size = info.params.max_part_size; - if (part_num > hn) { - l.unlock(); - auto r = read_meta(dpp, tid, y); - if (r < 0) { - return r; - } - l.lock(); - auto hn = info.head_part_num; - if (part_num > hn) { - overshoot = true; - part_num = hn; - ofs = max_part_size; - } - } - if (part_num < info.tail_part_num) { - return -ENODATA; - } - auto pn = info.tail_part_num; - l.unlock(); - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - - int r = 0; - while (pn < part_num) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " pn=" << pn << " tid=" << tid << dendl; - std::unique_lock l(m); - l.unlock(); - r = trim_part(dpp, pn, max_part_size, false, tid, y); - if (r < 0 && r == -ENOENT) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " trim_part failed: r=" << r - << " tid= "<< tid << dendl; - return r; - } - ++pn; - } - r = trim_part(dpp, part_num, ofs, exclusive, tid, y); - if (r < 0 && r != -ENOENT) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " trim_part failed: r=" << r - << " tid= "<< tid << dendl; - return r; - } - - l.lock(); - auto tail_part_num = info.tail_part_num; - auto objv = info.version; - l.unlock(); - bool canceled = tail_part_num < part_num; - int retries = 0; - while ((tail_part_num < part_num) && - canceled && - (retries <= MAX_RACE_RETRIES)) { - r = _update_meta(dpp, fifo::update{}.tail_part_num(part_num), objv, &canceled, - tid, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " _update_meta failed: r=" << r - << " tid= "<< tid << dendl; - return r; - } - if (canceled) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled: retries=" << retries - << " tid=" << tid << dendl; - l.lock(); - tail_part_num = info.tail_part_num; - objv = info.version; - l.unlock(); - ++retries; - } - } - if (canceled) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled too many times, giving up: tid=" << tid << dendl; - return -EIO; - } - return overshoot ? -ENODATA : 0; -} - -struct Trimmer : public Completion { - FIFO* fifo; - std::int64_t part_num; - std::uint64_t ofs; - std::int64_t pn; - bool exclusive; - std::uint64_t tid; - bool update = false; - bool reread = false; - bool canceled = false; - bool overshoot = false; - int retries = 0; - - Trimmer(const DoutPrefixProvider *dpp, FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn, - bool exclusive, lr::AioCompletion* super, std::uint64_t tid) - : Completion(dpp, super), fifo(fifo), part_num(part_num), ofs(ofs), pn(pn), - exclusive(exclusive), tid(tid) {} - - void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - - if (reread) { - reread = false; - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " read_meta failed: r=" - << r << " tid=" << tid << dendl; - complete(std::move(p), r); - return; - } - std::unique_lock l(fifo->m); - auto hn = fifo->info.head_part_num; - const auto max_part_size = fifo->info.params.max_part_size; - const auto tail_part_num = fifo->info.tail_part_num; - l.unlock(); - if (part_num > hn) { - part_num = hn; - ofs = max_part_size; - overshoot = true; - } - if (part_num < tail_part_num) { - complete(std::move(p), -ENODATA); - return; - } - pn = tail_part_num; - if (pn < part_num) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " pn=" << pn << " tid=" << tid << dendl; - fifo->trim_part(dpp, pn++, max_part_size, false, tid, - call(std::move(p))); - } else { - update = true; - canceled = tail_part_num < part_num; - fifo->trim_part(dpp, part_num, ofs, exclusive, tid, call(std::move(p))); - } - return; - } - - if (r == -ENOENT) { - r = 0; - } - - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << (update ? " update_meta " : " trim ") << "failed: r=" - << r << " tid=" << tid << dendl; - complete(std::move(p), r); - return; - } - - if (!update) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " handling preceding trim callback: tid=" << tid << dendl; - retries = 0; - if (pn < part_num) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " pn=" << pn << " tid=" << tid << dendl; - std::unique_lock l(fifo->m); - const auto max_part_size = fifo->info.params.max_part_size; - l.unlock(); - fifo->trim_part(dpp, pn++, max_part_size, false, tid, - call(std::move(p))); - return; - } - - std::unique_lock l(fifo->m); - const auto tail_part_num = fifo->info.tail_part_num; - l.unlock(); - update = true; - canceled = tail_part_num < part_num; - fifo->trim_part(dpp, part_num, ofs, exclusive, tid, call(std::move(p))); - return; - } - - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " handling update-needed callback: tid=" << tid << dendl; - std::unique_lock l(fifo->m); - auto tail_part_num = fifo->info.tail_part_num; - auto objv = fifo->info.version; - l.unlock(); - if ((tail_part_num < part_num) && - canceled) { - if (retries > MAX_RACE_RETRIES) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled too many times, giving up: tid=" << tid << dendl; - complete(std::move(p), -EIO); - return; - } - ++retries; - fifo->_update_meta(dpp, fifo::update{} - .tail_part_num(part_num), objv, &canceled, - tid, call(std::move(p))); - } else { - complete(std::move(p), overshoot ? -ENODATA : 0); - } - } -}; - -void FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exclusive, - lr::AioCompletion* c) { - auto marker = to_marker(markstr); - auto realmark = marker.value_or(::rgw::cls::fifo::marker{}); - std::unique_lock l(m); - const auto hn = info.head_part_num; - const auto max_part_size = info.params.max_part_size; - const auto pn = info.tail_part_num; - const auto part_oid = info.part_oid(pn); - auto tid = ++next_tid; - l.unlock(); - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - auto trimmer = std::make_unique(dpp, this, realmark.num, realmark.ofs, - pn, exclusive, c, tid); - if (!marker) { - Trimmer::complete(std::move(trimmer), -EINVAL); - return; - } - ++trimmer->pn; - auto ofs = marker->ofs; - if (marker->num > hn) { - trimmer->reread = true; - read_meta(dpp, tid, Trimmer::call(std::move(trimmer))); - return; - } - if (pn < marker->num) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " pn=" << pn << " tid=" << tid << dendl; - ofs = max_part_size; - } else { - trimmer->update = true; - } - trim_part(dpp, pn, ofs, exclusive, tid, Trimmer::call(std::move(trimmer))); -} - -int FIFO::get_part_info(const DoutPrefixProvider *dpp, int64_t part_num, - fifo::part_header* header, - optional_yield y) -{ - std::unique_lock l(m); - const auto part_oid = info.part_oid(part_num); - auto tid = ++next_tid; - l.unlock(); - auto r = rgw::cls::fifo::get_part_info(dpp, ioctx, part_oid, header, tid, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " get_part_info failed: r=" - << r << " tid=" << tid << dendl; - } - return r; -} - -void FIFO::get_part_info(int64_t part_num, - fifo::part_header* header, - lr::AioCompletion* c) -{ - std::unique_lock l(m); - const auto part_oid = info.part_oid(part_num); - auto tid = ++next_tid; - l.unlock(); - auto op = rgw::cls::fifo::get_part_info(cct, header, tid); - auto r = ioctx.aio_operate(part_oid, c, &op, nullptr); - ceph_assert(r >= 0); -} - -struct InfoGetter : Completion { - FIFO* fifo; - fifo::part_header header; - fu2::function f; - std::uint64_t tid; - bool headerread = false; - - InfoGetter(const DoutPrefixProvider *dpp, FIFO* fifo, fu2::function f, - std::uint64_t tid, lr::AioCompletion* super) - : Completion(dpp, super), fifo(fifo), f(std::move(f)), tid(tid) {} - void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) { - if (!headerread) { - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " read_meta failed: r=" - << r << " tid=" << tid << dendl; - if (f) - f(r, {}); - complete(std::move(p), r); - return; - } - - auto info = fifo->meta(); - auto hpn = info.head_part_num; - if (hpn < 0) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " no head, returning empty partinfo r=" - << r << " tid=" << tid << dendl; - if (f) - f(0, {}); - complete(std::move(p), r); - return; - } - headerread = true; - auto op = rgw::cls::fifo::get_part_info(fifo->cct, &header, tid); - std::unique_lock l(fifo->m); - auto oid = fifo->info.part_oid(hpn); - l.unlock(); - r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op, - nullptr); - ceph_assert(r >= 0); - return; - } - - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " get_part_info failed: r=" - << r << " tid=" << tid << dendl; - } - - if (f) - f(r, std::move(header)); - complete(std::move(p), r); - return; - } -}; - -void FIFO::get_head_info(const DoutPrefixProvider *dpp, fu2::unique_function f, - lr::AioCompletion* c) -{ - std::unique_lock l(m); - auto tid = ++next_tid; - l.unlock(); - auto ig = std::make_unique(dpp, this, std::move(f), tid, c); - read_meta(dpp, tid, InfoGetter::call(std::move(ig))); -} - -struct JournalProcessor : public Completion { -private: - FIFO* const fifo; - - std::vector processed; - decltype(fifo->info.journal) journal; - decltype(journal)::iterator iter; - std::int64_t new_tail; - std::int64_t new_head; - std::int64_t new_max; - int race_retries = 0; - bool first_pp = true; - bool canceled = false; - std::uint64_t tid; - - enum { - entry_callback, - pp_callback, - } state; - - void create_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - state = entry_callback; - lr::ObjectWriteOperation op; - op.create(false); /* We don't need exclusivity, part_init ensures - we're creating from the same journal entry. */ - std::unique_lock l(fifo->m); - part_init(&op, fifo->info.params); - auto oid = fifo->info.part_oid(part_num); - l.unlock(); - auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op); - ceph_assert(r >= 0); - return; - } - - void remove_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - state = entry_callback; - lr::ObjectWriteOperation op; - op.remove(); - std::unique_lock l(fifo->m); - auto oid = fifo->info.part_oid(part_num); - l.unlock(); - auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op); - ceph_assert(r >= 0); - return; - } - - void finish_je(const DoutPrefixProvider *dpp, Ptr&& p, int r, - const fifo::journal_entry& entry) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " finishing entry: entry=" << entry - << " tid=" << tid << dendl; - - using enum fifo::journal_entry::Op; - if (entry.op == remove && r == -ENOENT) - r = 0; - - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " processing entry failed: entry=" << entry - << " r=" << r << " tid=" << tid << dendl; - complete(std::move(p), r); - return; - } else { - switch (entry.op) { - case unknown: - case set_head: - // Can't happen. Filtered out in process. - complete(std::move(p), -EIO); - return; - - case create: - if (entry.part_num > new_max) { - new_max = entry.part_num; - } - break; - case remove: - if (entry.part_num >= new_tail) { - new_tail = entry.part_num + 1; - } - break; - } - processed.push_back(entry); - } - ++iter; - process(dpp, std::move(p)); - } - - void postprocess(const DoutPrefixProvider *dpp, Ptr&& p) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - if (processed.empty()) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " nothing to update any more: race_retries=" - << race_retries << " tid=" << tid << dendl; - complete(std::move(p), 0); - return; - } - pp_run(dpp, std::move(p), 0, false); - } - -public: - - JournalProcessor(const DoutPrefixProvider *dpp, FIFO* fifo, std::uint64_t tid, lr::AioCompletion* super) - : Completion(dpp, super), fifo(fifo), tid(tid) { - std::unique_lock l(fifo->m); - journal = fifo->info.journal; - iter = journal.begin(); - new_tail = fifo->info.tail_part_num; - new_head = fifo->info.head_part_num; - new_max = fifo->info.max_push_part_num; - } - - void pp_run(const DoutPrefixProvider *dpp, Ptr&& p, int r, bool canceled) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - std::optional tail_part_num; - std::optional head_part_num; - std::optional max_part_num; - - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " failed, r=: " << r << " tid=" << tid << dendl; - complete(std::move(p), r); - } - - - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " postprocessing: race_retries=" - << race_retries << " tid=" << tid << dendl; - - if (!first_pp && r == 0 && !canceled) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " nothing to update any more: race_retries=" - << race_retries << " tid=" << tid << dendl; - complete(std::move(p), 0); - return; - } - - first_pp = false; - - if (canceled) { - if (race_retries >= MAX_RACE_RETRIES) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled too many times, giving up: tid=" - << tid << dendl; - complete(std::move(p), -ECANCELED); - return; - } - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " update canceled, retrying: race_retries=" - << race_retries << " tid=" << tid << dendl; - - ++race_retries; - - std::vector new_processed; - std::unique_lock l(fifo->m); - for (auto& e : processed) { - if (fifo->info.journal.contains(e)) { - new_processed.push_back(e); - } - } - processed = std::move(new_processed); - } - - std::unique_lock l(fifo->m); - auto objv = fifo->info.version; - if (new_tail > fifo->info.tail_part_num) { - tail_part_num = new_tail; - } - - if (new_head > fifo->info.head_part_num) { - head_part_num = new_head; - } - - if (new_max > fifo->info.max_push_part_num) { - max_part_num = new_max; - } - l.unlock(); - - if (processed.empty() && - !tail_part_num && - !max_part_num) { - /* nothing to update anymore */ - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " nothing to update any more: race_retries=" - << race_retries << " tid=" << tid << dendl; - complete(std::move(p), 0); - return; - } - state = pp_callback; - fifo->_update_meta(dpp, fifo::update{} - .tail_part_num(tail_part_num) - .head_part_num(head_part_num) - .max_push_part_num(max_part_num) - .journal_entries_rm(processed), - objv, &this->canceled, tid, call(std::move(p))); - return; - } - - JournalProcessor(const JournalProcessor&) = delete; - JournalProcessor& operator =(const JournalProcessor&) = delete; - JournalProcessor(JournalProcessor&&) = delete; - JournalProcessor& operator =(JournalProcessor&&) = delete; - - void process(const DoutPrefixProvider *dpp, Ptr&& p) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - while (iter != journal.end()) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " processing entry: entry=" << *iter - << " tid=" << tid << dendl; - const auto entry = *iter; - switch (entry.op) { - using enum fifo::journal_entry::Op; - case create: - create_part(dpp, std::move(p), entry.part_num); - return; - case set_head: - if (entry.part_num > new_head) { - new_head = entry.part_num; - } - processed.push_back(entry); - ++iter; - continue; - case remove: - remove_part(dpp, std::move(p), entry.part_num); - return; - default: - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " unknown journaled op: entry=" << entry << " tid=" - << tid << dendl; - complete(std::move(p), -EIO); - return; - } - } - postprocess(dpp, std::move(p)); - return; - } - - void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - switch (state) { - case entry_callback: - finish_je(dpp, std::move(p), r, *iter); - return; - case pp_callback: - auto c = canceled; - canceled = false; - pp_run(dpp, std::move(p), r, c); - return; - } - - abort(); - } - -}; - -void FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c) { - auto p = std::make_unique(dpp, this, tid, c); - p->process(dpp, std::move(p)); -} - -struct Lister : Completion { - FIFO* f; - std::vector result; - bool more = false; - std::int64_t part_num; - std::uint64_t ofs; - int max_entries; - int r_out = 0; - std::vector entries; - bool part_more = false; - bool part_full = false; - std::vector* entries_out; - bool* more_out; - std::uint64_t tid; - - bool read = false; - - void complete(Ptr&& p, int r) { - if (r >= 0) { - if (more_out) *more_out = more; - if (entries_out) *entries_out = std::move(result); - } - Completion::complete(std::move(p), r); - } - -public: - Lister(const DoutPrefixProvider *dpp, FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries, - std::vector* entries_out, bool* more_out, - std::uint64_t tid, lr::AioCompletion* super) - : Completion(dpp, super), f(f), part_num(part_num), ofs(ofs), max_entries(max_entries), - entries_out(entries_out), more_out(more_out), tid(tid) { - result.reserve(max_entries); - } - - Lister(const Lister&) = delete; - Lister& operator =(const Lister&) = delete; - Lister(Lister&&) = delete; - Lister& operator =(Lister&&) = delete; - - void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) { - if (read) - handle_read(std::move(p), r); - else - handle_list(dpp, std::move(p), r); - } - - void list(Ptr&& p) { - if (max_entries > 0) { - part_more = false; - part_full = false; - entries.clear(); - - std::unique_lock l(f->m); - auto part_oid = f->info.part_oid(part_num); - l.unlock(); - - read = false; - auto op = list_part(f->cct, ofs, max_entries, &r_out, - &entries, &part_more, &part_full, tid); - f->ioctx.aio_operate(part_oid, call(std::move(p)), &op, nullptr); - } else { - complete(std::move(p), 0); - } - } - - void handle_read(Ptr&& p, int r) { - read = false; - if (r >= 0) r = r_out; - r_out = 0; - - if (r < 0) { - complete(std::move(p), r); - return; - } - - if (part_num < f->info.tail_part_num) { - /* raced with trim? restart */ - max_entries += result.size(); - result.clear(); - part_num = f->info.tail_part_num; - ofs = 0; - list(std::move(p)); - return; - } - /* assuming part was not written yet, so end of data */ - more = false; - complete(std::move(p), 0); - return; - } - - void handle_list(const DoutPrefixProvider *dpp, Ptr&& p, int r) { - if (r >= 0) r = r_out; - r_out = 0; - std::unique_lock l(f->m); - auto part_oid = f->info.part_oid(part_num); - l.unlock(); - if (r == -ENOENT) { - read = true; - f->read_meta(dpp, tid, call(std::move(p))); - return; - } - if (r < 0) { - complete(std::move(p), r); - return; - } - - more = part_full || part_more; - for (auto& entry : entries) { - list_entry e; - e.data = std::move(entry.data); - e.marker = marker{part_num, entry.ofs}.to_string(); - e.mtime = entry.mtime; - result.push_back(std::move(e)); - } - max_entries -= entries.size(); - entries.clear(); - if (max_entries > 0 && part_more) { - list(std::move(p)); - return; - } - - if (!part_full) { /* head part is not full */ - complete(std::move(p), 0); - return; - } - ++part_num; - ofs = 0; - list(std::move(p)); - } -}; - -void FIFO::list(const DoutPrefixProvider *dpp, int max_entries, - std::optional markstr, - std::vector* out, - bool* more, - lr::AioCompletion* c) { - std::unique_lock l(m); - auto tid = ++next_tid; - std::int64_t part_num = info.tail_part_num; - l.unlock(); - std::uint64_t ofs = 0; - std::optional<::rgw::cls::fifo::marker> marker; - - if (markstr) { - marker = to_marker(*markstr); - if (marker) { - part_num = marker->num; - ofs = marker->ofs; - } - } - - auto ls = std::make_unique(dpp, this, part_num, ofs, max_entries, out, - more, tid, c); - if (markstr && !marker) { - auto l = ls.get(); - l->complete(std::move(ls), -EINVAL); - } else { - ls->list(std::move(ls)); - } -} -} diff --git a/src/rgw/driver/rados/cls_fifo_legacy.h b/src/rgw/driver/rados/cls_fifo_legacy.h deleted file mode 100644 index 85e8f539975..00000000000 --- a/src/rgw/driver/rados/cls_fifo_legacy.h +++ /dev/null @@ -1,335 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2020 Red Hat - * Author: Adam C. Emerson - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "include/rados/librados.hpp" -#include "include/buffer.h" -#include "include/function2.hpp" - -#include "common/async/yield_context.h" - -#include "cls/fifo/cls_fifo_types.h" -#include "cls/fifo/cls_fifo_ops.h" - -#include "librados/AioCompletionImpl.h" - -#include "rgw_tools.h" - -namespace rgw::cls::fifo { -namespace cb = ceph::buffer; -namespace fifo = ::rados::cls::fifo; -namespace lr = librados; - -inline constexpr std::uint64_t default_max_part_size = 4 * 1024 * 1024; -inline constexpr std::uint64_t default_max_entry_size = 32 * 1024; - -void create_meta(lr::ObjectWriteOperation* op, std::string_view id, - std::optional objv, - std::optional oid_prefix, - bool exclusive = false, - std::uint64_t max_part_size = default_max_part_size, - std::uint64_t max_entry_size = default_max_entry_size); -int get_meta(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid, - std::optional objv, fifo::info* info, - std::uint32_t* part_header_size, - std::uint32_t* part_entry_overhead, - std::uint64_t tid, optional_yield y, - bool probe = false); -struct marker { - std::int64_t num = 0; - std::uint64_t ofs = 0; - - marker() = default; - marker(std::int64_t num, std::uint64_t ofs) : num(num), ofs(ofs) {} - static marker max() { - return { std::numeric_limits::max(), - std::numeric_limits::max() }; - } - - std::string to_string() { - return fmt::format("{:0>20}:{:0>20}", num, ofs); - } -}; - -struct list_entry { - cb::list data; - std::string marker; - ceph::real_time mtime; -}; - -using part_info = fifo::part_header; - -/// This is an implementation of FIFO using librados to facilitate -/// backports. Please see /src/neorados/cls/fifo.h for full -/// information. -/// -/// This library uses optional_yield. Please see -/// /src/common/async/yield_context.h. In summary, optional_yield -/// contains either a boost::asio::yield_context (in which case the current -/// coroutine is suspended until completion) or null_yield (in which -/// case the current thread is blocked until completion.) -/// -/// Please see the librados documentation for information on -/// AioCompletion and IoCtx. - -class FIFO { - friend struct Reader; - friend struct Updater; - friend struct Trimmer; - friend struct InfoGetter; - friend struct Pusher; - friend struct NewPartPreparer; - friend struct NewHeadPreparer; - friend struct JournalProcessor; - friend struct Lister; - - mutable lr::IoCtx ioctx; - CephContext* cct = static_cast(ioctx.cct()); - const std::string oid; - std::mutex m; - std::uint64_t next_tid = 0; - - fifo::info info; - - std::uint32_t part_header_size = 0xdeadbeef; - std::uint32_t part_entry_overhead = 0xdeadbeef; - - std::optional to_marker(std::string_view s); - - FIFO(lr::IoCtx&& ioc, - std::string oid) - : ioctx(std::move(ioc)), oid(oid) {} - - int apply_update(const DoutPrefixProvider *dpp, - fifo::info* info, - const fifo::objv& objv, - const fifo::update& update, - std::uint64_t tid); - int _update_meta(const DoutPrefixProvider *dpp, const fifo::update& update, - fifo::objv version, bool* pcanceled, - std::uint64_t tid, optional_yield y); - void _update_meta(const DoutPrefixProvider *dpp, const fifo::update& update, - fifo::objv version, bool* pcanceled, - std::uint64_t tid, lr::AioCompletion* c); - int create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid, - optional_yield y); - int remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid, - optional_yield y); - int process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y); - void process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c); - int _prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num, bool is_head, std::uint64_t tid, optional_yield y); - void _prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num, bool is_head, std::uint64_t tid, lr::AioCompletion* c); - int _prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num, - std::uint64_t tid, optional_yield y); - void _prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num, std::uint64_t tid, lr::AioCompletion* c); - int push_entries(const DoutPrefixProvider *dpp, const std::deque& data_bufs, - std::uint64_t tid, optional_yield y); - void push_entries(const std::deque& data_bufs, - std::uint64_t tid, lr::AioCompletion* c); - int trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs, - bool exclusive, std::uint64_t tid, optional_yield y); - void trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs, - bool exclusive, std::uint64_t tid, lr::AioCompletion* c); - - /// Force refresh of metadata, yielding/blocking style - int read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y); - /// Force refresh of metadata, with a librados Completion - void read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c); - -public: - - FIFO(const FIFO&) = delete; - FIFO& operator =(const FIFO&) = delete; - FIFO(FIFO&&) = delete; - FIFO& operator =(FIFO&&) = delete; - - /// Open an existing FIFO. - static int open(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, //< IO Context - std::string oid, //< OID for metadata object - std::unique_ptr* fifo, //< OUT: Pointer to FIFO object - optional_yield y, //< Optional yield context - /// Operation will fail if FIFO is not at this version - std::optional objv = std::nullopt, - /// Probing for existence, don't print errors if we - /// can't find it. - bool probe = false); - /// Create a new or open an existing FIFO. - static int create(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, //< IO Context - std::string oid, //< OID for metadata object - std::unique_ptr* fifo, //< OUT: Pointer to FIFO object - optional_yield y, //< Optional yield context - /// Operation will fail if the FIFO exists and is - /// not of this version. - std::optional objv = std::nullopt, - /// Prefix for all objects - std::optional oid_prefix = std::nullopt, - /// Fail if the FIFO already exists - bool exclusive = false, - /// Maximum allowed size of parts - std::uint64_t max_part_size = default_max_part_size, - /// Maximum allowed size of entries - std::uint64_t max_entry_size = default_max_entry_size); - - /// Force refresh of metadata, yielding/blocking style - int read_meta(const DoutPrefixProvider *dpp, optional_yield y); - /// Get currently known metadata - const fifo::info& meta() const; - /// Get partition header and entry overhead size - std::pair get_part_layout_info() const; - /// Push an entry to the FIFO - int push(const DoutPrefixProvider *dpp, - const cb::list& bl, //< Entry to push - optional_yield y //< Optional yield - ); - /// Push an entry to the FIFO - void push(const DoutPrefixProvider *dpp, const cb::list& bl, //< Entry to push - lr::AioCompletion* c //< Async Completion - ); - /// Push entries to the FIFO - int push(const DoutPrefixProvider *dpp, - const std::vector& data_bufs, //< Entries to push - optional_yield y //< Optional yield - ); - /// Push entries to the FIFO - void push(const DoutPrefixProvider *dpp, const std::vector& data_bufs, //< Entries to push - lr::AioCompletion* c //< Async Completion - ); - /// List entries - int list(const DoutPrefixProvider *dpp, - int max_entries, //< Maximum entries to list - /// Point after which to begin listing. Start at tail if null - std::optional markstr, - std::vector* out, //< OUT: entries - /// OUT: True if more entries in FIFO beyond the last returned - bool* more, - optional_yield y //< Optional yield - ); - void list(const DoutPrefixProvider *dpp, - int max_entries, //< Maximum entries to list - /// Point after which to begin listing. Start at tail if null - std::optional markstr, - std::vector* out, //< OUT: entries - /// OUT: True if more entries in FIFO beyond the last returned - bool* more, - lr::AioCompletion* c //< Async Completion - ); - /// Trim entries, coroutine/block style - int trim(const DoutPrefixProvider *dpp, - std::string_view markstr, //< Position to which to trim, inclusive - bool exclusive, //< If true, do not trim the target entry - //< itself, just all those before it. - optional_yield y //< Optional yield - ); - /// Trim entries, librados AioCompletion style - void trim(const DoutPrefixProvider *dpp, - std::string_view markstr, //< Position to which to trim, inclusive - bool exclusive, //< If true, do not trim the target entry - //< itself, just all those before it. - lr::AioCompletion* c //< librados AIO Completion - ); - /// Get part info - int get_part_info(const DoutPrefixProvider *dpp, int64_t part_num, /// Part number - fifo::part_header* header, //< OUT: Information - optional_yield y //< Optional yield - ); - /// Get part info - void get_part_info(int64_t part_num, //< Part number - fifo::part_header* header, //< OUT: Information - lr::AioCompletion* c //< AIO Completion - ); - /// A convenience method to fetch the part information for the FIFO - /// head, using librados::AioCompletion, since - /// libradio::AioCompletions compose lousily. - void get_head_info(const DoutPrefixProvider *dpp, fu2::unique_function< //< Function to receive info - void(int r, fifo::part_header&&)>, - lr::AioCompletion* c //< AIO Completion - ); -}; - -template -struct Completion { -private: - const DoutPrefixProvider *_dpp; - lr::AioCompletion* _cur = nullptr; - lr::AioCompletion* _super; -public: - - using Ptr = std::unique_ptr; - - lr::AioCompletion* cur() const { - return _cur; - } - lr::AioCompletion* super() const { - return _super; - } - - Completion(const DoutPrefixProvider *dpp, lr::AioCompletion* super) : _dpp(dpp), _super(super) { - super->pc->get(); - } - - ~Completion() { - if (_super) { - _super->pc->put(); - } - if (_cur) - _cur->release(); - _super = nullptr; - _cur = nullptr; - } - - // The only times that aio_operate can return an error are: - // 1. The completion contains a null pointer. This should just - // crash, and in our case it does. - // 2. An attempt is made to write to a snapshot. RGW doesn't use - // snapshots, so we don't care. - // - // So we will just assert that initiating an Aio operation succeeds - // and not worry about recovering. - static lr::AioCompletion* call(Ptr&& p) { - p->_cur = lr::Rados::aio_create_completion(static_cast(p.get()), - &cb); - auto c = p->_cur; - p.release(); - // coverity[leaked_storage:SUPPRESS] - return c; - } - static void complete(Ptr&& p, int r) { - auto c = p->_super; - p->_super = nullptr; - rgw_complete_aio_completion(c, r); - } - - static void cb(lr::completion_t, void* arg) { - auto t = static_cast(arg); - auto r = t->_cur->get_return_value(); - t->_cur->release(); - t->_cur = nullptr; - t->handle(t->_dpp, Ptr(t), r); - } -}; - -} diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index 9bcc4383a8f..0dadae510c1 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -306,13 +306,6 @@ install(TARGETS ceph_test_rgw_gc_log DESTINATION ${CMAKE_INSTALL_BINDIR}) add_ceph_test(test-ceph-diff-sorted.sh ${CMAKE_CURRENT_SOURCE_DIR}/test-ceph-diff-sorted.sh) -# unittest_cls_fifo_legacy -add_executable(unittest_cls_fifo_legacy test_cls_fifo_legacy.cc) -target_include_directories(unittest_cls_fifo_legacy - SYSTEM PRIVATE "${CMAKE_SOURCE_DIR}/src/rgw") -target_link_libraries(unittest_cls_fifo_legacy radostest-cxx ${UNITTEST_LIBS} - ${rgw_libs}) - # unittest_log_backing add_executable(unittest_log_backing test_log_backing.cc) target_include_directories(unittest_log_backing diff --git a/src/test/rgw/test_cls_fifo_legacy.cc b/src/test/rgw/test_cls_fifo_legacy.cc deleted file mode 100644 index 9c383146fd3..00000000000 --- a/src/test/rgw/test_cls_fifo_legacy.cc +++ /dev/null @@ -1,1184 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2019 Red Hat, Inc. - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include -#include -#include - -#include "include/scope_guard.h" -#include "include/types.h" -#include "include/rados/librados.hpp" -#include "common/ceph_context.h" - -#include "cls/fifo/cls_fifo_ops.h" -#include "test/librados/test_cxx.h" -#include "global/global_context.h" - -#include "rgw_tools.h" -#include "cls_fifo_legacy.h" - -#include "gtest/gtest.h" - -using namespace std::literals; -using namespace std::string_literals; - -namespace R = librados; -namespace cb = ceph::buffer; -namespace fifo = rados::cls::fifo; -namespace RCf = rgw::cls::fifo; - -auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT); -const DoutPrefix dp(cct, 1, "test legacy cls fifo: "); - -namespace { -int fifo_create(const DoutPrefixProvider *dpp, R::IoCtx& ioctx, - const std::string& oid, - std::string_view id, - optional_yield y, - std::optional objv = std::nullopt, - std::optional oid_prefix = std::nullopt, - bool exclusive = false, - std::uint64_t max_part_size = RCf::default_max_part_size, - std::uint64_t max_entry_size = RCf::default_max_entry_size) -{ - R::ObjectWriteOperation op; - RCf::create_meta(&op, id, objv, oid_prefix, exclusive, max_part_size, - max_entry_size); - return rgw_rados_operate(dpp, ioctx, oid, std::move(op), y); -} -} - -class LegacyFIFO : public testing::Test { -protected: - const std::string pool_name = get_temp_pool_name(); - const std::string fifo_id = "fifo"; - R::Rados rados; - librados::IoCtx ioctx; - - void SetUp() override { - ASSERT_EQ("", create_one_pool_pp(pool_name, rados)); - ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx)); - } - void TearDown() override { - destroy_one_pool_pp(pool_name, rados); - } -}; - -using LegacyClsFIFO = LegacyFIFO; -using AioLegacyFIFO = LegacyFIFO; - - -TEST_F(LegacyClsFIFO, TestCreate) -{ - auto r = fifo_create(&dp, ioctx, fifo_id, ""s, null_yield); - EXPECT_EQ(-EINVAL, r); - r = fifo_create(&dp, ioctx, fifo_id, fifo_id, null_yield, std::nullopt, - std::nullopt, false, 0); - EXPECT_EQ(-EINVAL, r); - r = fifo_create(&dp, ioctx, fifo_id, {}, null_yield, - std::nullopt, std::nullopt, - false, RCf::default_max_part_size, 0); - EXPECT_EQ(-EINVAL, r); - r = fifo_create(&dp, ioctx, fifo_id, fifo_id, null_yield); - EXPECT_EQ(0, r); - std::uint64_t size; - ioctx.stat(fifo_id, &size, nullptr); - EXPECT_GT(size, 0); - /* test idempotency */ - r = fifo_create(&dp, ioctx, fifo_id, fifo_id, null_yield); - EXPECT_EQ(0, r); - r = fifo_create(&dp, ioctx, fifo_id, {}, null_yield, std::nullopt, - std::nullopt, false); - EXPECT_EQ(-EINVAL, r); - r = fifo_create(&dp, ioctx, fifo_id, {}, null_yield, std::nullopt, - "myprefix"sv, false); - EXPECT_EQ(-EINVAL, r); - r = fifo_create(&dp, ioctx, fifo_id, "foo"sv, null_yield, - std::nullopt, std::nullopt, false); - EXPECT_EQ(-EEXIST, r); -} - -TEST_F(LegacyClsFIFO, TestGetInfo) -{ - auto r = fifo_create(&dp, ioctx, fifo_id, fifo_id, null_yield); - fifo::info info; - std::uint32_t part_header_size; - std::uint32_t part_entry_overhead; - r = RCf::get_meta(&dp, ioctx, fifo_id, std::nullopt, &info, &part_header_size, - &part_entry_overhead, 0, null_yield); - EXPECT_EQ(0, r); - EXPECT_GT(part_header_size, 0); - EXPECT_GT(part_entry_overhead, 0); - EXPECT_FALSE(info.version.instance.empty()); - - r = RCf::get_meta(&dp, ioctx, fifo_id, info.version, &info, &part_header_size, - &part_entry_overhead, 0, null_yield); - EXPECT_EQ(0, r); - fifo::objv objv; - objv.instance = "foo"; - objv.ver = 12; - r = RCf::get_meta(&dp, ioctx, fifo_id, objv, &info, &part_header_size, - &part_entry_overhead, 0, null_yield); - EXPECT_EQ(-ECANCELED, r); -} - -TEST_F(LegacyFIFO, TestOpenDefault) -{ - std::unique_ptr fifo; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &fifo, null_yield); - ASSERT_EQ(0, r); - // force reading from backend - r = fifo->read_meta(&dp, null_yield); - EXPECT_EQ(0, r); - auto info = fifo->meta(); - EXPECT_EQ(info.id, fifo_id); -} - -TEST_F(LegacyFIFO, TestOpenParams) -{ - const std::uint64_t max_part_size = 10 * 1024; - const std::uint64_t max_entry_size = 128; - auto oid_prefix = "foo.123."sv; - fifo::objv objv; - objv.instance = "fooz"s; - objv.ver = 10; - - /* first successful create */ - std::unique_ptr f; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, objv, oid_prefix, - false, max_part_size, max_entry_size); - ASSERT_EQ(0, r); - - /* force reading from backend */ - r = f->read_meta(&dp, null_yield); - auto info = f->meta(); - EXPECT_EQ(info.id, fifo_id); - EXPECT_EQ(info.params.max_part_size, max_part_size); - EXPECT_EQ(info.params.max_entry_size, max_entry_size); - EXPECT_EQ(info.version, objv); -} - -namespace { -template -std::pair decode_entry(const RCf::list_entry& entry) -{ - T val; - auto iter = entry.data.cbegin(); - decode(val, iter); - return std::make_pair(std::move(val), entry.marker); -} -} - - -TEST_F(LegacyFIFO, TestPushListTrim) -{ - std::unique_ptr f; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield); - ASSERT_EQ(0, r); - static constexpr auto max_entries = 10u; - for (uint32_t i = 0; i < max_entries; ++i) { - cb::list bl; - encode(i, bl); - r = f->push(&dp, bl, null_yield); - ASSERT_EQ(0, r); - } - - std::optional marker; - /* get entries one by one */ - std::vector result; - bool more = false; - for (auto i = 0u; i < max_entries; ++i) { - - r = f->list(&dp, 1, marker, &result, &more, null_yield); - ASSERT_EQ(0, r); - - bool expected_more = (i != (max_entries - 1)); - ASSERT_EQ(expected_more, more); - ASSERT_EQ(1, result.size()); - - std::uint32_t val; - std::tie(val, marker) = decode_entry(result.front()); - - ASSERT_EQ(i, val); - result.clear(); - } - - /* get all entries at once */ - std::string markers[max_entries]; - std::uint32_t min_entry = 0; - r = f->list(&dp, max_entries * 10, std::nullopt, &result, &more, null_yield); - ASSERT_EQ(0, r); - - ASSERT_FALSE(more); - ASSERT_EQ(max_entries, result.size()); - for (auto i = 0u; i < max_entries; ++i) { - std::uint32_t val; - std::tie(val, markers[i]) = decode_entry(result[i]); - ASSERT_EQ(i, val); - } - - /* trim one entry */ - r = f->trim(&dp, markers[min_entry], false, null_yield); - ASSERT_EQ(0, r); - ++min_entry; - - r = f->list(&dp, max_entries * 10, std::nullopt, &result, &more, null_yield); - ASSERT_EQ(0, r); - ASSERT_FALSE(more); - ASSERT_EQ(max_entries - min_entry, result.size()); - - for (auto i = min_entry; i < max_entries; ++i) { - std::uint32_t val; - std::tie(val, markers[i - min_entry]) = - decode_entry(result[i - min_entry]); - EXPECT_EQ(i, val); - } -} - - -TEST_F(LegacyFIFO, TestPushTooBig) -{ - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - - std::unique_ptr f; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, - std::nullopt, false, max_part_size, max_entry_size); - ASSERT_EQ(0, r); - - char buf[max_entry_size + 1]; - memset(buf, 0, sizeof(buf)); - - cb::list bl; - bl.append(buf, sizeof(buf)); - - r = f->push(&dp, bl, null_yield); - EXPECT_EQ(-E2BIG, r); -} - - -TEST_F(LegacyFIFO, TestMultipleParts) -{ - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - std::unique_ptr f; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); - ASSERT_EQ(0, r); - - char buf[max_entry_size]; - memset(buf, 0, sizeof(buf)); - const auto [part_header_size, part_entry_overhead] = - f->get_part_layout_info(); - const auto entries_per_part = ((max_part_size - part_header_size) / - (max_entry_size + part_entry_overhead)); - const auto max_entries = entries_per_part * 4 + 1; - /* push enough entries */ - for (auto i = 0u; i < max_entries; ++i) { - cb::list bl; - *(int *)buf = i; - bl.append(buf, sizeof(buf)); - r = f->push(&dp, bl, null_yield); - ASSERT_EQ(0, r); - } - - auto info = f->meta(); - ASSERT_EQ(info.id, fifo_id); - /* head should have advanced */ - ASSERT_GT(info.head_part_num, 0); - - /* list all at once */ - std::vector result; - bool more = false; - r = f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); - ASSERT_EQ(0, r); - EXPECT_EQ(false, more); - ASSERT_EQ(max_entries, result.size()); - - for (auto i = 0u; i < max_entries; ++i) { - auto& bl = result[i].data; - ASSERT_EQ(i, *(int *)bl.c_str()); - } - - std::optional marker; - /* get entries one by one */ - - for (auto i = 0u; i < max_entries; ++i) { - r = f->list(&dp, 1, marker, &result, &more, null_yield); - ASSERT_EQ(0, r); - ASSERT_EQ(result.size(), 1); - const bool expected_more = (i != (max_entries - 1)); - ASSERT_EQ(expected_more, more); - - std::uint32_t val; - std::tie(val, marker) = decode_entry(result.front()); - - auto& entry = result.front(); - auto& bl = entry.data; - ASSERT_EQ(i, *(int *)bl.c_str()); - marker = entry.marker; - } - - /* trim one at a time */ - marker.reset(); - for (auto i = 0u; i < max_entries; ++i) { - /* read single entry */ - r = f->list(&dp, 1, marker, &result, &more, null_yield); - ASSERT_EQ(0, r); - ASSERT_EQ(result.size(), 1); - const bool expected_more = (i != (max_entries - 1)); - ASSERT_EQ(expected_more, more); - - marker = result.front().marker; - r = f->trim(&dp, *marker, false, null_yield); - ASSERT_EQ(0, r); - - /* check tail */ - info = f->meta(); - ASSERT_EQ(info.tail_part_num, i / entries_per_part); - - /* try to read all again, see how many entries left */ - r = f->list(&dp, max_entries, marker, &result, &more, null_yield); - ASSERT_EQ(max_entries - i - 1, result.size()); - ASSERT_EQ(false, more); - } - - /* tail now should point at head */ - info = f->meta(); - ASSERT_EQ(info.head_part_num, info.tail_part_num); - - RCf::part_info partinfo; - /* check old tails are removed */ - for (auto i = 0; i < info.tail_part_num; ++i) { - r = f->get_part_info(&dp, i, &partinfo, null_yield); - ASSERT_EQ(-ENOENT, r); - } - /* check current tail exists */ - r = f->get_part_info(&dp, info.tail_part_num, &partinfo, null_yield); - ASSERT_EQ(0, r); -} - -TEST_F(LegacyFIFO, TestTwoPushers) -{ - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - - std::unique_ptr f; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); - ASSERT_EQ(0, r); - char buf[max_entry_size]; - memset(buf, 0, sizeof(buf)); - - auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); - const auto entries_per_part = ((max_part_size - part_header_size) / - (max_entry_size + part_entry_overhead)); - const auto max_entries = entries_per_part * 4 + 1; - std::unique_ptr f2; - r = RCf::FIFO::open(&dp, ioctx, fifo_id, &f2, null_yield); - std::vector fifos{&f, &f2}; - - for (auto i = 0u; i < max_entries; ++i) { - cb::list bl; - *(int *)buf = i; - bl.append(buf, sizeof(buf)); - auto& f = *fifos[i % fifos.size()]; - r = f->push(&dp, bl, null_yield); - ASSERT_EQ(0, r); - } - - /* list all by both */ - std::vector result; - bool more = false; - r = f2->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); - ASSERT_EQ(0, r); - ASSERT_EQ(false, more); - ASSERT_EQ(max_entries, result.size()); - - r = f2->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); - ASSERT_EQ(0, r); - ASSERT_EQ(false, more); - ASSERT_EQ(max_entries, result.size()); - - for (auto i = 0u; i < max_entries; ++i) { - auto& bl = result[i].data; - ASSERT_EQ(i, *(int *)bl.c_str()); - } -} - -TEST_F(LegacyFIFO, TestTwoPushersTrim) -{ - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - std::unique_ptr f1; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f1, null_yield, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); - ASSERT_EQ(0, r); - - char buf[max_entry_size]; - memset(buf, 0, sizeof(buf)); - - auto [part_header_size, part_entry_overhead] = f1->get_part_layout_info(); - const auto entries_per_part = ((max_part_size - part_header_size) / - (max_entry_size + part_entry_overhead)); - const auto max_entries = entries_per_part * 4 + 1; - - std::unique_ptr f2; - r = RCf::FIFO::open(&dp, ioctx, fifo_id, &f2, null_yield); - ASSERT_EQ(0, r); - - /* push one entry to f2 and the rest to f1 */ - for (auto i = 0u; i < max_entries; ++i) { - cb::list bl; - *(int *)buf = i; - bl.append(buf, sizeof(buf)); - auto& f = (i < 1 ? f2 : f1); - r = f->push(&dp, bl, null_yield); - ASSERT_EQ(0, r); - } - - /* trim half by fifo1 */ - auto num = max_entries / 2; - std::string marker; - std::vector result; - bool more = false; - r = f1->list(&dp, num, std::nullopt, &result, &more, null_yield); - ASSERT_EQ(0, r); - ASSERT_EQ(true, more); - ASSERT_EQ(num, result.size()); - - for (auto i = 0u; i < num; ++i) { - auto& bl = result[i].data; - ASSERT_EQ(i, *(int *)bl.c_str()); - } - - auto& entry = result[num - 1]; - marker = entry.marker; - r = f1->trim(&dp, marker, false, null_yield); - /* list what's left by fifo2 */ - - const auto left = max_entries - num; - f2->list(&dp, left, marker, &result, &more, null_yield); - ASSERT_EQ(left, result.size()); - ASSERT_EQ(false, more); - - for (auto i = num; i < max_entries; ++i) { - auto& bl = result[i - num].data; - ASSERT_EQ(i, *(int *)bl.c_str()); - } -} - -TEST_F(LegacyFIFO, TestPushBatch) -{ - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - - std::unique_ptr f; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); - ASSERT_EQ(0, r); - - char buf[max_entry_size]; - memset(buf, 0, sizeof(buf)); - auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); - auto entries_per_part = ((max_part_size - part_header_size) / - (max_entry_size + part_entry_overhead)); - auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */ - std::vector bufs; - for (auto i = 0u; i < max_entries; ++i) { - cb::list bl; - *(int *)buf = i; - bl.append(buf, sizeof(buf)); - bufs.push_back(bl); - } - ASSERT_EQ(max_entries, bufs.size()); - - r = f->push(&dp, bufs, null_yield); - ASSERT_EQ(0, r); - - /* list all */ - - std::vector result; - bool more = false; - r = f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); - ASSERT_EQ(0, r); - ASSERT_EQ(false, more); - ASSERT_EQ(max_entries, result.size()); - for (auto i = 0u; i < max_entries; ++i) { - auto& bl = result[i].data; - ASSERT_EQ(i, *(int *)bl.c_str()); - } - auto& info = f->meta(); - ASSERT_EQ(info.head_part_num, 4); -} - -TEST_F(LegacyFIFO, TestAioTrim) -{ - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - std::unique_ptr f; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); - ASSERT_EQ(0, r); - - char buf[max_entry_size]; - memset(buf, 0, sizeof(buf)); - const auto [part_header_size, part_entry_overhead] = - f->get_part_layout_info(); - const auto entries_per_part = ((max_part_size - part_header_size) / - (max_entry_size + part_entry_overhead)); - const auto max_entries = entries_per_part * 4 + 1; - /* push enough entries */ - std::vector bufs; - for (auto i = 0u; i < max_entries; ++i) { - cb::list bl; - *(int *)buf = i; - bl.append(buf, sizeof(buf)); - bufs.push_back(std::move(bl)); - } - ASSERT_EQ(max_entries, bufs.size()); - - r = f->push(&dp, bufs, null_yield); - ASSERT_EQ(0, r); - - auto info = f->meta(); - ASSERT_EQ(info.id, fifo_id); - /* head should have advanced */ - ASSERT_GT(info.head_part_num, 0); - - /* list all at once */ - std::vector result; - bool more = false; - r = f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); - ASSERT_EQ(0, r); - ASSERT_EQ(false, more); - ASSERT_EQ(max_entries, result.size()); - - std::optional marker; - /* trim one at a time */ - result.clear(); - more = false; - marker.reset(); - for (auto i = 0u; i < max_entries; ++i) { - /* read single entry */ - r = f->list(&dp, 1, marker, &result, &more, null_yield); - ASSERT_EQ(0, r); - ASSERT_EQ(result.size(), 1); - const bool expected_more = (i != (max_entries - 1)); - ASSERT_EQ(expected_more, more); - - marker = result.front().marker; - std::unique_ptr c(rados.aio_create_completion(nullptr, - nullptr)); - f->trim(&dp, *marker, false, c.get()); - c->wait_for_complete(); - r = c->get_return_value(); - ASSERT_EQ(0, r); - - /* check tail */ - info = f->meta(); - ASSERT_EQ(info.tail_part_num, i / entries_per_part); - - /* try to read all again, see how many entries left */ - r = f->list(&dp, max_entries, marker, &result, &more, null_yield); - ASSERT_EQ(max_entries - i - 1, result.size()); - ASSERT_EQ(false, more); - } - - /* tail now should point at head */ - info = f->meta(); - ASSERT_EQ(info.head_part_num, info.tail_part_num); - - RCf::part_info partinfo; - /* check old tails are removed */ - for (auto i = 0; i < info.tail_part_num; ++i) { - r = f->get_part_info(&dp, i, &partinfo, null_yield); - ASSERT_EQ(-ENOENT, r); - } - /* check current tail exists */ - r = f->get_part_info(&dp, info.tail_part_num, &partinfo, null_yield); - ASSERT_EQ(0, r); -} - -TEST_F(LegacyFIFO, TestTrimExclusive) { - std::unique_ptr f; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield); - ASSERT_EQ(0, r); - std::vector result; - bool more = false; - - static constexpr auto max_entries = 10u; - for (uint32_t i = 0; i < max_entries; ++i) { - cb::list bl; - encode(i, bl); - f->push(&dp, bl, null_yield); - } - - f->list(&dp, 1, std::nullopt, &result, &more, null_yield); - auto [val, marker] = decode_entry(result.front()); - ASSERT_EQ(0, val); - f->trim(&dp, marker, true, null_yield); - - result.clear(); - f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); - std::tie(val, marker) = decode_entry(result.front()); - ASSERT_EQ(0, val); - f->trim(&dp, result[4].marker, true, null_yield); - - result.clear(); - f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); - std::tie(val, marker) = decode_entry(result.front()); - ASSERT_EQ(4, val); - f->trim(&dp, result.back().marker, true, null_yield); - - result.clear(); - f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); - std::tie(val, marker) = decode_entry(result.front()); - ASSERT_EQ(result.size(), 1); - ASSERT_EQ(max_entries - 1, val); -} - -TEST_F(AioLegacyFIFO, TestPushListTrim) -{ - std::unique_ptr f; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield); - ASSERT_EQ(0, r); - static constexpr auto max_entries = 10u; - for (uint32_t i = 0; i < max_entries; ++i) { - cb::list bl; - encode(i, bl); - auto c = R::Rados::aio_create_completion(); - f->push(&dp, bl, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(0, r); - } - - std::optional marker; - /* get entries one by one */ - std::vector result; - bool more = false; - for (auto i = 0u; i < max_entries; ++i) { - auto c = R::Rados::aio_create_completion(); - f->list(&dp, 1, marker, &result, &more, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(0, r); - - bool expected_more = (i != (max_entries - 1)); - ASSERT_EQ(expected_more, more); - ASSERT_EQ(1, result.size()); - - std::uint32_t val; - std::tie(val, marker) = decode_entry(result.front()); - - ASSERT_EQ(i, val); - result.clear(); - } - - /* get all entries at once */ - std::string markers[max_entries]; - std::uint32_t min_entry = 0; - auto c = R::Rados::aio_create_completion(); - f->list(&dp, max_entries * 10, std::nullopt, &result, &more, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(0, r); - - ASSERT_FALSE(more); - ASSERT_EQ(max_entries, result.size()); - for (auto i = 0u; i < max_entries; ++i) { - std::uint32_t val; - std::tie(val, markers[i]) = decode_entry(result[i]); - ASSERT_EQ(i, val); - } - - /* trim one entry */ - c = R::Rados::aio_create_completion(); - f->trim(&dp, markers[min_entry], false, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(0, r); - ++min_entry; - - c = R::Rados::aio_create_completion(); - f->list(&dp, max_entries * 10, std::nullopt, &result, &more, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(0, r); - ASSERT_FALSE(more); - ASSERT_EQ(max_entries - min_entry, result.size()); - - for (auto i = min_entry; i < max_entries; ++i) { - std::uint32_t val; - std::tie(val, markers[i - min_entry]) = - decode_entry(result[i - min_entry]); - EXPECT_EQ(i, val); - } -} - - -TEST_F(AioLegacyFIFO, TestPushTooBig) -{ - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - - std::unique_ptr f; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, - std::nullopt, false, max_part_size, max_entry_size); - ASSERT_EQ(0, r); - - char buf[max_entry_size + 1]; - memset(buf, 0, sizeof(buf)); - - cb::list bl; - bl.append(buf, sizeof(buf)); - - auto c = R::Rados::aio_create_completion(); - f->push(&dp, bl, c); - c->wait_for_complete(); - r = c->get_return_value(); - ASSERT_EQ(-E2BIG, r); - c->release(); - - c = R::Rados::aio_create_completion(); - f->push(&dp, std::vector{}, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - EXPECT_EQ(0, r); -} - - -TEST_F(AioLegacyFIFO, TestMultipleParts) -{ - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - std::unique_ptr f; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); - ASSERT_EQ(0, r); - - { - auto c = R::Rados::aio_create_completion(); - f->get_head_info(&dp, [&](int r, RCf::part_info&& p) { - ASSERT_EQ(0, p.magic); - ASSERT_EQ(0, p.min_ofs); - ASSERT_EQ(0, p.last_ofs); - ASSERT_EQ(0, p.next_ofs); - ASSERT_EQ(0, p.min_index); - ASSERT_EQ(0, p.max_index); - ASSERT_EQ(ceph::real_time{}, p.max_time); - }, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - } - - char buf[max_entry_size]; - memset(buf, 0, sizeof(buf)); - const auto [part_header_size, part_entry_overhead] = - f->get_part_layout_info(); - const auto entries_per_part = ((max_part_size - part_header_size) / - (max_entry_size + part_entry_overhead)); - const auto max_entries = entries_per_part * 4 + 1; - /* push enough entries */ - for (auto i = 0u; i < max_entries; ++i) { - cb::list bl; - *(int *)buf = i; - bl.append(buf, sizeof(buf)); - auto c = R::Rados::aio_create_completion(); - f->push(&dp, bl, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - EXPECT_EQ(0, r); - } - - auto info = f->meta(); - ASSERT_EQ(info.id, fifo_id); - /* head should have advanced */ - ASSERT_GT(info.head_part_num, 0); - - /* list all at once */ - std::vector result; - bool more = false; - auto c = R::Rados::aio_create_completion(); - f->list(&dp, max_entries, std::nullopt, &result, &more, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - EXPECT_EQ(0, r); - EXPECT_EQ(false, more); - ASSERT_EQ(max_entries, result.size()); - - for (auto i = 0u; i < max_entries; ++i) { - auto& bl = result[i].data; - ASSERT_EQ(i, *(int *)bl.c_str()); - } - - std::optional marker; - /* get entries one by one */ - - for (auto i = 0u; i < max_entries; ++i) { - c = R::Rados::aio_create_completion(); - f->list(&dp, 1, marker, &result, &more, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - EXPECT_EQ(0, r); - ASSERT_EQ(result.size(), 1); - const bool expected_more = (i != (max_entries - 1)); - ASSERT_EQ(expected_more, more); - - std::uint32_t val; - std::tie(val, marker) = decode_entry(result.front()); - - auto& entry = result.front(); - auto& bl = entry.data; - ASSERT_EQ(i, *(int *)bl.c_str()); - marker = entry.marker; - } - - /* trim one at a time */ - marker.reset(); - for (auto i = 0u; i < max_entries; ++i) { - /* read single entry */ - c = R::Rados::aio_create_completion(); - f->list(&dp, 1, marker, &result, &more, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - EXPECT_EQ(0, r); - ASSERT_EQ(result.size(), 1); - const bool expected_more = (i != (max_entries - 1)); - ASSERT_EQ(expected_more, more); - - marker = result.front().marker; - c = R::Rados::aio_create_completion(); - f->trim(&dp, *marker, false, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - EXPECT_EQ(0, r); - ASSERT_EQ(result.size(), 1); - - /* check tail */ - info = f->meta(); - ASSERT_EQ(info.tail_part_num, i / entries_per_part); - - /* try to read all again, see how many entries left */ - c = R::Rados::aio_create_completion(); - f->list(&dp, max_entries, marker, &result, &more, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - EXPECT_EQ(0, r); - ASSERT_EQ(max_entries - i - 1, result.size()); - ASSERT_EQ(false, more); - } - - /* tail now should point at head */ - info = f->meta(); - ASSERT_EQ(info.head_part_num, info.tail_part_num); - - /* check old tails are removed */ - for (auto i = 0; i < info.tail_part_num; ++i) { - c = R::Rados::aio_create_completion(); - RCf::part_info partinfo; - f->get_part_info(i, &partinfo, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(-ENOENT, r); - } - /* check current tail exists */ - std::uint64_t next_ofs; - { - c = R::Rados::aio_create_completion(); - RCf::part_info partinfo; - f->get_part_info(info.tail_part_num, &partinfo, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - next_ofs = partinfo.next_ofs; - } - ASSERT_EQ(0, r); - - c = R::Rados::aio_create_completion(); - f->get_head_info(&dp, [&](int r, RCf::part_info&& p) { - ASSERT_EQ(next_ofs, p.next_ofs); - }, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(0, r); -} - -TEST_F(AioLegacyFIFO, TestTwoPushers) -{ - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - - std::unique_ptr f; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); - ASSERT_EQ(0, r); - char buf[max_entry_size]; - memset(buf, 0, sizeof(buf)); - - auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); - const auto entries_per_part = ((max_part_size - part_header_size) / - (max_entry_size + part_entry_overhead)); - const auto max_entries = entries_per_part * 4 + 1; - std::unique_ptr f2; - r = RCf::FIFO::open(&dp, ioctx, fifo_id, &f2, null_yield); - std::vector fifos{&f, &f2}; - - for (auto i = 0u; i < max_entries; ++i) { - cb::list bl; - *(int *)buf = i; - bl.append(buf, sizeof(buf)); - auto& f = *fifos[i % fifos.size()]; - auto c = R::Rados::aio_create_completion(); - f->push(&dp, bl, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(0, r); - } - - /* list all by both */ - std::vector result; - bool more = false; - auto c = R::Rados::aio_create_completion(); - f2->list(&dp, max_entries, std::nullopt, &result, &more, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(0, r); - ASSERT_EQ(false, more); - ASSERT_EQ(max_entries, result.size()); - - c = R::Rados::aio_create_completion(); - f2->list(&dp, max_entries, std::nullopt, &result, &more, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(0, r); - ASSERT_EQ(false, more); - ASSERT_EQ(max_entries, result.size()); - - for (auto i = 0u; i < max_entries; ++i) { - auto& bl = result[i].data; - ASSERT_EQ(i, *(int *)bl.c_str()); - } -} - -TEST_F(AioLegacyFIFO, TestTwoPushersTrim) -{ - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - std::unique_ptr f1; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f1, null_yield, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); - ASSERT_EQ(0, r); - - char buf[max_entry_size]; - memset(buf, 0, sizeof(buf)); - - auto [part_header_size, part_entry_overhead] = f1->get_part_layout_info(); - const auto entries_per_part = ((max_part_size - part_header_size) / - (max_entry_size + part_entry_overhead)); - const auto max_entries = entries_per_part * 4 + 1; - - std::unique_ptr f2; - r = RCf::FIFO::open(&dp, ioctx, fifo_id, &f2, null_yield); - ASSERT_EQ(0, r); - - /* push one entry to f2 and the rest to f1 */ - for (auto i = 0u; i < max_entries; ++i) { - cb::list bl; - *(int *)buf = i; - bl.append(buf, sizeof(buf)); - auto& f = (i < 1 ? f2 : f1); - auto c = R::Rados::aio_create_completion(); - f->push(&dp, bl, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(0, r); - } - - /* trim half by fifo1 */ - auto num = max_entries / 2; - std::string marker; - std::vector result; - bool more = false; - auto c = R::Rados::aio_create_completion(); - f1->list(&dp, num, std::nullopt, &result, &more, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(0, r); - ASSERT_EQ(true, more); - ASSERT_EQ(num, result.size()); - - for (auto i = 0u; i < num; ++i) { - auto& bl = result[i].data; - ASSERT_EQ(i, *(int *)bl.c_str()); - } - - auto& entry = result[num - 1]; - marker = entry.marker; - c = R::Rados::aio_create_completion(); - f1->trim(&dp, marker, false, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(0, r); - /* list what's left by fifo2 */ - - const auto left = max_entries - num; - c = R::Rados::aio_create_completion(); - f2->list(&dp, left, marker, &result, &more, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(0, r); - ASSERT_EQ(left, result.size()); - ASSERT_EQ(false, more); - - for (auto i = num; i < max_entries; ++i) { - auto& bl = result[i - num].data; - ASSERT_EQ(i, *(int *)bl.c_str()); - } -} - -TEST_F(AioLegacyFIFO, TestPushBatch) -{ - static constexpr auto max_part_size = 2048ull; - static constexpr auto max_entry_size = 128ull; - - std::unique_ptr f; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, - std::nullopt, false, max_part_size, - max_entry_size); - ASSERT_EQ(0, r); - - char buf[max_entry_size]; - memset(buf, 0, sizeof(buf)); - auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); - auto entries_per_part = ((max_part_size - part_header_size) / - (max_entry_size + part_entry_overhead)); - auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */ - std::vector bufs; - for (auto i = 0u; i < max_entries; ++i) { - cb::list bl; - *(int *)buf = i; - bl.append(buf, sizeof(buf)); - bufs.push_back(bl); - } - ASSERT_EQ(max_entries, bufs.size()); - - auto c = R::Rados::aio_create_completion(); - f->push(&dp, bufs, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(0, r); - - /* list all */ - - std::vector result; - bool more = false; - c = R::Rados::aio_create_completion(); - f->list(&dp, max_entries, std::nullopt, &result, &more, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(0, r); - ASSERT_EQ(false, more); - ASSERT_EQ(max_entries, result.size()); - for (auto i = 0u; i < max_entries; ++i) { - auto& bl = result[i].data; - ASSERT_EQ(i, *(int *)bl.c_str()); - } - auto& info = f->meta(); - ASSERT_EQ(info.head_part_num, 4); -} - -TEST_F(LegacyFIFO, TrimAll) -{ - std::unique_ptr f; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield); - ASSERT_EQ(0, r); - static constexpr auto max_entries = 10u; - for (uint32_t i = 0; i < max_entries; ++i) { - cb::list bl; - encode(i, bl); - r = f->push(&dp, bl, null_yield); - ASSERT_EQ(0, r); - } - - /* trim one entry */ - r = f->trim(&dp, RCf::marker::max().to_string(), false, null_yield); - ASSERT_EQ(-ENODATA, r); - - std::vector result; - bool more; - r = f->list(&dp, 1, std::nullopt, &result, &more, null_yield); - ASSERT_EQ(0, r); - ASSERT_TRUE(result.empty()); -} - -TEST_F(LegacyFIFO, AioTrimAll) -{ - std::unique_ptr f; - auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield); - ASSERT_EQ(0, r); - static constexpr auto max_entries = 10u; - for (uint32_t i = 0; i < max_entries; ++i) { - cb::list bl; - encode(i, bl); - r = f->push(&dp, bl, null_yield); - ASSERT_EQ(0, r); - } - - auto c = R::Rados::aio_create_completion(); - f->trim(&dp, RCf::marker::max().to_string(), false, c); - c->wait_for_complete(); - r = c->get_return_value(); - c->release(); - ASSERT_EQ(-ENODATA, r); - - std::vector result; - bool more; - r = f->list(&dp, 1, std::nullopt, &result, &more, null_yield); - ASSERT_EQ(0, r); - ASSERT_TRUE(result.empty()); -}