From: Adam C. Emerson Date: Thu, 14 May 2020 15:11:12 +0000 (-0400) Subject: rgw: FIFO for legacy RADOS client X-Git-Tag: wip-pdonnell-testing-20200918.022351~91^2~10 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=2d2a90c15ab95f7fe926b488cc7c4454a41980dd;p=ceph-ci.git rgw: FIFO for legacy RADOS client Use optional_yield for most operations and provide an AioCompletion* overload for trim. Signed-off-by: Adam C. Emerson --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 13ef176724e..8303f790e27 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -153,7 +153,8 @@ set(librgw_common_srcs rgw_object_lock.cc rgw_kms.cc rgw_url.cc - rgw_oidc_provider) + rgw_oidc_provider + cls_fifo_legacy.cc) if(WITH_RADOSGW_AMQP_ENDPOINT) list(APPEND librgw_common_srcs rgw_amqp.cc) diff --git a/src/rgw/cls_fifo_legacy.cc b/src/rgw/cls_fifo_legacy.cc new file mode 100644 index 00000000000..d44f52cf15e --- /dev/null +++ b/src/rgw/cls_fifo_legacy.cc @@ -0,0 +1,1107 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat + * Author: Adam C. Emerson + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include +#include + +#undef FMT_HEADER_ONLY +#define FMT_HEADER_ONLY 1 +#include + +#include "include/rados/librados.hpp" + +#include "include/buffer.h" + +#include "common/async/yield_context.h" +#include "common/random_string.h" + +#include "cls/fifo/cls_fifo_types.h" +#include "cls/fifo/cls_fifo_ops.h" + +#include "librados/AioCompletionImpl.h" + +#include "rgw_tools.h" + +#include "cls_fifo_legacy.h" + +namespace rgw::cls::fifo { +namespace cb = ceph::buffer; +namespace fifo = rados::cls::fifo; + +using ceph::from_error_code; + +inline constexpr auto MAX_RACE_RETRIES = 10; + +void create_meta(lr::ObjectWriteOperation* op, + std::string_view id, + std::optional objv, + std::optional oid_prefix, + bool exclusive, + std::uint64_t max_part_size, + std::uint64_t max_entry_size) +{ + fifo::op::create_meta cm; + + cm.id = id; + cm.version = objv; + cm.oid_prefix = oid_prefix; + cm.max_part_size = max_part_size; + cm.max_entry_size = max_entry_size; + cm.exclusive = exclusive; + + cb::list in; + encode(cm, in); + op->exec(fifo::op::CLASS, fifo::op::CREATE_META, in); +} + +int get_meta(lr::IoCtx& ioctx, const std::string& oid, + std::optional objv, fifo::info* info, + std::uint32_t* part_header_size, + std::uint32_t* part_entry_overhead, + optional_yield y) +{ + lr::ObjectReadOperation op; + fifo::op::get_meta gm; + gm.version = objv; + cb::list in; + encode(gm, in); + cb::list bl; + + op.exec(fifo::op::CLASS, fifo::op::GET_META, in, + &bl, nullptr); + auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, y); + if (r >= 0) try { + fifo::op::get_meta_reply reply; + auto iter = bl.cbegin(); + decode(reply, iter); + if (info) *info = std::move(reply.info); + if (part_header_size) *part_header_size = reply.part_header_size; + if (part_entry_overhead) + *part_entry_overhead = reply.part_entry_overhead; + } catch (const cb::error& err) { + r = from_error_code(err.code()); + } + return r; +}; + +void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv, + const fifo::update& update) +{ + fifo::op::update_meta um; + + um.version = objv; + um.tail_part_num = update.tail_part_num(); + um.head_part_num = update.head_part_num(); + um.min_push_part_num = update.min_push_part_num(); + um.max_push_part_num = update.max_push_part_num(); + um.journal_entries_add = std::move(update).journal_entries_add(); + um.journal_entries_rm = std::move(update).journal_entries_rm(); + + cb::list in; + encode(um, in); + op->exec(fifo::op::CLASS, fifo::op::UPDATE_META, in); +} + +void part_init(lr::ObjectWriteOperation* op, std::string_view tag, + fifo::data_params params) +{ + fifo::op::init_part ip; + + ip.tag = tag; + ip.params = params; + + cb::list in; + encode(ip, in); + op->exec(fifo::op::CLASS, fifo::op::INIT_PART, in); +} + +int push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag, + std::deque data_bufs, optional_yield y) +{ + lr::ObjectWriteOperation op; + fifo::op::push_part pp; + + pp.tag = tag; + pp.data_bufs = data_bufs; + pp.total_len = 0; + + for (const auto& bl : data_bufs) + pp.total_len += bl.length(); + + cb::list in; + encode(pp, in); + auto retval = 0; + op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in, nullptr, &retval); + auto r = rgw_rados_operate(ioctx, oid, &op, y, lr::OPERATION_RETURNVEC); + return r < 0 ? r : retval; +} + +void trim_part(lr::ObjectWriteOperation* op, + std::optional tag, + std::uint64_t ofs) +{ + fifo::op::trim_part tp; + + tp.tag = tag; + tp.ofs = ofs; + + cb::list in; + encode(tp, in); + op->exec(fifo::op::CLASS, fifo::op::TRIM_PART, in); +} + +int list_part(lr::IoCtx& ioctx, const std::string& oid, + std::optional tag, std::uint64_t ofs, + std::uint64_t max_entries, + std::vector* entries, + bool* more, bool* full_part, std::string* ptag, + optional_yield y) +{ + lr::ObjectReadOperation op; + fifo::op::list_part lp; + + lp.tag = tag; + lp.ofs = ofs; + lp.max_entries = max_entries; + + cb::list in; + encode(lp, in); + cb::list bl; + op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, &bl, nullptr); + auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, y); + if (r >= 0) try { + fifo::op::list_part_reply reply; + auto iter = bl.cbegin(); + decode(reply, iter); + if (entries) *entries = std::move(reply.entries); + if (more) *more = reply.more; + if (full_part) *full_part = reply.full_part; + if (ptag) *ptag = reply.tag; + } catch (const cb::error& err) { + r = from_error_code(err.code()); + } + return r; +} + +int get_part_info(lr::IoCtx& ioctx, const std::string& oid, + fifo::part_header* header, optional_yield y) +{ + lr::ObjectReadOperation op; + fifo::op::get_part_info gpi; + + cb::list in; + cb::list bl; + encode(gpi, in); + op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, &bl, nullptr); + auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, y); + if (r >= 0) try { + fifo::op::get_part_info_reply reply; + auto iter = bl.cbegin(); + decode(reply, iter); + if (header) *header = std::move(reply.header); + } catch (const cb::error& err) { + r = from_error_code(err.code()); + } + return r; +} + +static void complete(lr::AioCompletion* c_, int r) +{ + auto c = c_->pc; + c->lock.lock(); + c->rval = r; + c->complete = true; + c->lock.unlock(); + + auto cb_complete = c->callback_complete; + auto cb_complete_arg = c->callback_complete_arg; + if (cb_complete) + cb_complete(c, cb_complete_arg); + + auto cb_safe = c->callback_safe; + auto cb_safe_arg = c->callback_safe_arg; + if (cb_safe) + cb_safe(c, cb_safe_arg); + + c->lock.lock(); + c->callback_complete = NULL; + c->callback_safe = NULL; + c->cond.notify_all(); + c->put_unlock(); +} + +std::optional FIFO::to_marker(std::string_view s) +{ + marker m; + if (s.empty()) { + m.num = info.tail_part_num; + m.ofs = 0; + return m; + } + + auto pos = s.find(':'); + if (pos == string::npos) { + return std::nullopt; + } + + auto num = s.substr(0, pos); + auto ofs = s.substr(pos + 1); + + auto n = ceph::parse(num); + if (!n) { + return std::nullopt; + } + m.num = *n; + auto o = ceph::parse(ofs); + if (!o) { + return std::nullopt; + } + m.ofs = *o; + return m; +} + +std::string FIFO::generate_tag() const +{ + static constexpr auto HEADER_TAG_SIZE = 16; + return gen_rand_alphanumeric_plain(static_cast(ioctx.cct()), + HEADER_TAG_SIZE); +} + + +int FIFO::apply_update(fifo::info* info, + const fifo::objv& objv, + const fifo::update& update) +{ + std::unique_lock l(m); + auto err = info->apply_update(update); + if (objv != info->version) { + return -ECANCELED; + } + if (err) { + return -ECANCELED; + } + + ++info->version.ver; + + return {}; +} + +int FIFO::_update_meta(const fifo::update& update, + fifo::objv version, bool* pcanceled, + optional_yield y) +{ + lr::ObjectWriteOperation op; + bool canceled = false; + update_meta(&op, info.version, update); + auto r = rgw_rados_operate(ioctx, oid, &op, y); + if (r >= 0 || r == -ECANCELED) { + canceled = (r == -ECANCELED); + if (!canceled) { + r = apply_update(&info, version, update); + if (r < 0) canceled = true; + } + if (canceled) { + r = read_meta(y); + canceled = r < 0 ? false : true; + } + } + if (pcanceled) *pcanceled = canceled; + return r; +} + +struct Updater { + FIFO* fifo; + lr::AioCompletion* super; + lr::AioCompletion* cur = lr::Rados::aio_create_completion( + static_cast(this), &FIFO::update_callback); + fifo::update update; + fifo::objv version; + bool reread = false; + bool* pcanceled = nullptr; + Updater(FIFO* fifo, lr::AioCompletion* super, + const fifo::update& update, fifo::objv version, + bool* pcanceled) + : fifo(fifo), super(super), update(update), version(version), + pcanceled(pcanceled) { + super->pc->get(); + } + ~Updater() { + cur->release(); + } +}; + + +void FIFO::update_callback(lr::completion_t, void* arg) +{ + auto updater = static_cast(arg); + if (!updater->reread) { + int r = updater->cur->get_return_value(); + if (r < 0 && r != -ECANCELED) { + complete(updater->super, r); + delete updater; + return; + } + bool canceled = (r == -ECANCELED); + if (!canceled) { + int r = updater->fifo->apply_update(&updater->fifo->info, updater->version, + updater->update); + if (r < 0) + canceled = true; + } + if (!canceled) { + if (updater->pcanceled) + *updater->pcanceled = false; + complete(updater->super, 0); + } else { + updater->cur->release(); + updater->cur = lr::Rados::aio_create_completion( + arg, &FIFO::update_callback); + assert(uintptr_t(updater->cur) >= 0x1000); + updater->reread = true; + auto r = updater->fifo->read_meta(updater->cur); + if (r < 0) { + complete(updater->super, r); + delete updater; + } + } + } else { + int r = updater->cur->get_return_value(); + if (r < 0 && updater->pcanceled) { + *updater->pcanceled = false; + } else if (r >= 0 && updater->pcanceled) { + *updater->pcanceled = true; + } + complete(updater->super, r); + delete updater; + } +} + +int FIFO::_update_meta(const fifo::update& update, + fifo::objv version, bool* pcanceled, + lr::AioCompletion* c) +{ + lr::ObjectWriteOperation op; + update_meta(&op, info.version, update); + auto updater = new Updater(this, c, update, version, pcanceled); + lr::AioCompletion* cur = lr::Rados::aio_create_completion( + static_cast(updater), &FIFO::update_callback); + updater->cur = cur; + assert(uintptr_t(updater->cur) >= 0x1000); + auto r = ioctx.aio_operate(oid, cur, &op); + if (r < 0) { + delete updater; + } + return r; +} + +int FIFO::create_part(int64_t part_num, std::string_view tag, optional_yield y) +{ + lr::ObjectWriteOperation op; + op.create(false); /* We don't need exclusivity, part_init ensures + we're creating from the same journal entry. */ + std::unique_lock l(m); + part_init(&op, tag, info.params); + auto oid = info.part_oid(part_num); + l.unlock(); + return rgw_rados_operate(ioctx, oid, &op, y); +} + +int FIFO::remove_part(int64_t part_num, std::string_view tag, optional_yield y) +{ + lr::ObjectWriteOperation op; + op.remove(); + std::unique_lock l(m); + auto oid = info.part_oid(part_num); + l.unlock(); + return rgw_rados_operate(ioctx, oid, &op, y); +} + +int FIFO::process_journal(optional_yield y) +{ + std::vector processed; + + std::unique_lock l(m); + auto tmpjournal = info.journal; + auto new_tail = info.tail_part_num; + auto new_head = info.head_part_num; + auto new_max = info.max_push_part_num; + l.unlock(); + + int r; + for (auto& [n, entry] : tmpjournal) { + switch (entry.op) { + case fifo::journal_entry::Op::create: + r = create_part(entry.part_num, entry.part_tag, y); + if (entry.part_num > new_max) { + new_max = entry.part_num; + } + break; + case fifo::journal_entry::Op::set_head: + if (entry.part_num > new_head) { + new_head = entry.part_num; + } + break; + case fifo::journal_entry::Op::remove: + r = remove_part(entry.part_num, entry.part_tag, y); + if (r == -ENOENT) r = 0; + if (entry.part_num >= new_tail) { + new_tail = entry.part_num + 1; + } + break; + default: + return -EIO; + } + + if (r < 0) { + return -EIO; + } + + processed.push_back(std::move(entry)); + } + + // Postprocess + + bool canceled = true; + + for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) { + std::optional tail_part_num; + std::optional head_part_num; + std::optional max_part_num; + + std::unique_lock l(m); + auto objv = info.version; + if (new_tail > tail_part_num) tail_part_num = new_tail; + if (new_head > info.head_part_num) head_part_num = new_head; + if (new_max > info.max_push_part_num) max_part_num = new_max; + l.unlock(); + + if (processed.empty() && + !tail_part_num && + !max_part_num) { + /* nothing to update anymore */ + canceled = false; + break; + } + _update_meta(fifo::update() + .tail_part_num(tail_part_num) + .head_part_num(head_part_num) + .max_push_part_num(max_part_num) + .journal_entries_rm(processed), + objv, &canceled, y); + if (r < 0) break; + + if (canceled) { + std::vector new_processed; + std::unique_lock l(m); + for (auto& e : processed) { + auto jiter = info.journal.find(e.part_num); + /* journal entry was already processed */ + if (jiter == info.journal.end() || + !(jiter->second == e)) { + continue; + } + new_processed.push_back(e); + } + processed = std::move(new_processed); + } + } + if (canceled) + r = -ECANCELED; + return r; +} + +int FIFO::_prepare_new_part(bool is_head, optional_yield y) +{ + std::unique_lock l(m); + std::vector jentries = { info.next_journal_entry(generate_tag()) }; + std::int64_t new_head_part_num = info.head_part_num; + auto version = info.version; + + if (is_head) { + auto new_head_jentry = jentries.front(); + new_head_jentry.op = fifo::journal_entry::Op::set_head; + new_head_part_num = jentries.front().part_num; + jentries.push_back(std::move(new_head_jentry)); + } + l.unlock(); + + int r = 0; + bool canceled = true; + for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) { + canceled = false; + r = _update_meta(fifo::update{}.journal_entries_add(jentries), + version, &canceled, y); + if (r >= 0 && canceled) { + std::unique_lock l(m); + auto found = (info.journal.find(jentries.front().part_num) != + info.journal.end()); + if ((info.max_push_part_num >= jentries.front().part_num && + info.head_part_num >= new_head_part_num)) { + // We don't even need to process the journal. + return 0; + } + if (found) { + // Journaled, but not processed. + canceled = false; + } + l.unlock(); + } + } + if (canceled) + r = -ECANCELED; + if (r >= 0) + r = process_journal(y); + return r; +} + +int FIFO::_prepare_new_head(optional_yield y) { + std::unique_lock l(m); + std::int64_t new_head_num = info.head_part_num + 1; + auto max_push_part_num = info.max_push_part_num; + auto version = info.version; + l.unlock(); + + int r = 0; + if (max_push_part_num < new_head_num) { + r = _prepare_new_part(true, y); + if (r >= 0) { + std::unique_lock l(m); + if (info.max_push_part_num < new_head_num) { + r = -EIO; + } + l.unlock(); + } + } else { + bool canceled = true; + for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) { + _update_meta(fifo::update{}.head_part_num(new_head_num), + version, &canceled, y); + if (r < 0) + break; + std::unique_lock l(m); + auto head_part_num = info.head_part_num; + version = info.version; + l.unlock(); + if (canceled && (head_part_num >= new_head_num)) { + // Race, but someone did it for us + canceled = false; + } + } + if (canceled) + r = -ECANCELED; + } + return 0; +} + +int FIFO::push_entries(const std::deque& data_bufs, + optional_yield y) +{ + std::unique_lock l(m); + auto head_part_num = info.head_part_num; + auto tag = info.head_tag; + const auto part_oid = info.part_oid(head_part_num); + l.unlock(); + + return push_part(ioctx, part_oid, tag, data_bufs, y); +} + +int FIFO::trim_part(int64_t part_num, uint64_t ofs, + std::optional tag, + optional_yield y) +{ + lr::ObjectWriteOperation op; + std::unique_lock l(m); + const auto part_oid = info.part_oid(part_num); + l.unlock(); + rgw::cls::fifo::trim_part(&op, tag, ofs); + return rgw_rados_operate(ioctx, part_oid, &op, y); +} + +int FIFO::trim_part(int64_t part_num, uint64_t ofs, + std::optional tag, + lr::AioCompletion* c) +{ + lr::ObjectWriteOperation op; + std::unique_lock l(m); + const auto part_oid = info.part_oid(part_num); + l.unlock(); + rgw::cls::fifo::trim_part(&op, tag, ofs); + return ioctx.aio_operate(part_oid, c, &op); +} + +int FIFO::open(lr::IoCtx ioctx, std::string oid, std::unique_ptr* fifo, + optional_yield y, std::optional objv) +{ + fifo::info info; + std::uint32_t size; + std::uint32_t over; + int r = get_meta(ioctx, std::move(oid), objv, &info, &size, &over, y); + if (r >= 0) { + std::unique_ptr f(new FIFO(std::move(ioctx), oid)); + f->info = info; + f->part_header_size = size; + f->part_entry_overhead = over; + // If there are journal entries, process them, in case + // someone crashed mid-transaction. + if (!info.journal.empty()) { + r = f->process_journal(y); + if (r >= 0 && fifo) + *fifo = std::move(f); + } else { + *fifo = std::move(f); + } + } + return r; +} + +int FIFO::create(lr::IoCtx ioctx, std::string oid, std::unique_ptr* fifo, + optional_yield y, std::optional objv, + std::optional oid_prefix, + bool exclusive, std::uint64_t max_part_size, + std::uint64_t max_entry_size) +{ + lr::ObjectWriteOperation op; + create_meta(&op, oid, objv, oid_prefix, exclusive, max_part_size, + max_entry_size); + int r = rgw_rados_operate(ioctx, oid, &op, y); + if (r >= 0) { + r = open(std::move(ioctx), std::move(oid), fifo, y, objv); + } + return r; +} + +int FIFO::read_meta(optional_yield y) { + fifo::info _info; + std::uint32_t _phs; + std::uint32_t _peo; + + auto r = get_meta(ioctx, oid, nullopt, &_info, &_phs, &_peo, y); + if (r >= 0) { + std::unique_lock l(m); + // We have a newer version already! + if (_info.version.same_or_later(this->info.version)) { + info = std::move(_info); + part_header_size = _phs; + part_entry_overhead = _peo; + } + } + return r; +} + +struct Reader { + FIFO* fifo; + cb::list bl; + lr::AioCompletion* super; + lr::AioCompletion* cur = lr::Rados::aio_create_completion( + static_cast(this), &FIFO::read_callback); + Reader(FIFO* fifo, lr::AioCompletion* super) + : fifo(fifo), super(super) { + super->pc->get(); + } + ~Reader() { + cur->release(); + } +}; + +void FIFO::read_callback(lr::completion_t, void* arg) +{ + auto reader = static_cast(arg); + auto r = reader->cur->get_return_value(); + if (r >= 0) try { + fifo::op::get_meta_reply reply; + auto iter = reader->bl.cbegin(); + decode(reply, iter); + std::unique_lock l(reader->fifo->m); + if (reply.info.version.same_or_later(reader->fifo->info.version)) { + reader->fifo->info = std::move(reply.info); + reader->fifo->part_header_size = reply.part_header_size; + reader->fifo->part_entry_overhead = reply.part_entry_overhead; + } + } catch (const cb::error& err) { + r = from_error_code(err.code()); + } + complete(reader->super, r); + delete reader; +} + +int FIFO::read_meta(lr::AioCompletion* c) +{ + lr::ObjectReadOperation op; + fifo::op::get_meta gm; + cb::list in; + encode(gm, in); + auto reader = new Reader(this, c); + auto r = ioctx.aio_exec(oid, reader->cur, fifo::op::CLASS, + fifo::op::GET_META, in, &reader->bl); + if (r < 0) { + delete reader; + } + return r; +} + + +const fifo::info& FIFO::meta() const { + return info; +} + +std::pair FIFO::get_part_layout_info() const { + return {part_header_size, part_entry_overhead}; +} + +int FIFO::push(const cb::list& bl, optional_yield y) { + return push(std::vector{ bl }, y); +} + +int FIFO::push(const std::vector& data_bufs, optional_yield y) +{ + std::unique_lock l(m); + auto max_entry_size = info.params.max_entry_size; + auto need_new_head = info.need_new_head(); + l.unlock(); + if (data_bufs.empty()) { + return 0; + } + + // Validate sizes + for (const auto& bl : data_bufs) + if (bl.length() > max_entry_size) + return -E2BIG; + + int r = 0; + if (need_new_head) { + r = _prepare_new_head(y); + if (r < 0) + return r; + } + + std::deque remaining(data_bufs.begin(), data_bufs.end()); + std::deque batch; + + uint64_t batch_len = 0; + auto retries = 0; + bool canceled = true; + while ((!remaining.empty() || !batch.empty()) && + (retries <= MAX_RACE_RETRIES)) { + std::unique_lock l(m); + auto max_part_size = info.params.max_part_size; + auto overhead = part_entry_overhead; + l.unlock(); + + while (!remaining.empty() && + (remaining.front().length() + batch_len <= max_part_size)) { + /* We can send entries with data_len up to max_entry_size, + however, we want to also account the overhead when + dealing with multiple entries. Previous check doesn't + account for overhead on purpose. */ + batch_len += remaining.front().length() + overhead; + batch.push_back(std::move(remaining.front())); + remaining.pop_front(); + } + + auto r = push_entries(batch, y); + if (r >= 0) { + // Made forward progress! + canceled = false; + retries = 0; + batch_len = 0; + if (static_cast(r) == batch.size()) { + batch.clear(); + } else { + batch.erase(batch.begin(), batch.begin() + r); + for (const auto& b : batch) { + batch_len += b.length() + part_entry_overhead; + } + } + } else if (r == -ERANGE) { + canceled = true; + ++retries; + r = _prepare_new_head(y); + } + if (r < 0) + break; + } + if (canceled && r == 0) + r = -ECANCELED; + return r; +} + +int FIFO::list(int max_entries, + std::optional markstr, + std::vector* presult, bool* pmore, + optional_yield y) +{ + std::unique_lock l(m); + std::int64_t part_num = info.tail_part_num; + l.unlock(); + std::uint64_t ofs = 0; + if (markstr) { + auto marker = to_marker(*markstr); + if (!marker) return -EINVAL; + part_num = marker->num; + ofs = marker->ofs; + } + + std::vector result; + result.reserve(max_entries); + bool more = false; + + std::vector entries; + int r = 0; + while (max_entries > 0) { + bool part_more = false; + bool part_full = false; + + std::unique_lock l(m); + auto part_oid = info.part_oid(part_num); + l.unlock(); + + list_part(ioctx, part_oid, {}, ofs, max_entries, &entries, + &part_more, &part_full, nullptr, y); + if (r == -ENOENT) { + r = read_meta(y); + if (r < 0) return r; + if (part_num < info.tail_part_num) { + /* raced with trim? restart */ + max_entries += result.size(); + result.clear(); + std::unique_lock l(m); + part_num = info.tail_part_num; + l.unlock(); + ofs = 0; + continue; + } + /* assuming part was not written yet, so end of data */ + more = false; + r = 0; + break; + } + if (r < 0) { + break; + } + more = part_full || part_more; + for (auto& entry : entries) { + list_entry e; + e.data = std::move(entry.data); + e.marker = marker{part_num, entry.ofs}.to_string(); + e.mtime = entry.mtime; + result.push_back(std::move(e)); + --max_entries; + if (max_entries == 0) + break; + } + entries.clear(); + if (max_entries > 0 && + part_more) { + } + + if (!part_full) { + /* head part is not full, so we can assume we're done. */ + break; + } + if (!part_more) { + ++part_num; + ofs = 0; + } + } + if (r >= 0) { + if (presult) *presult = std::move(result); + if (pmore) *pmore = more; + } + return r; +} + +int FIFO::trim(std::string_view markstr, optional_yield y) +{ + auto marker = to_marker(markstr); + if (!marker) { + return -EINVAL; + } + auto part_num = marker->num; + auto ofs = marker->ofs; + std::unique_lock l(m); + auto pn = info.tail_part_num; + l.unlock(); + + int r; + while (pn < part_num) { + std::unique_lock l(m); + auto max_part_size = info.params.max_part_size; + l.unlock(); + r = trim_part(pn, max_part_size, std::nullopt, y); + if (r < 0 && r == -ENOENT) { + return r; + } + ++pn; + } + r = trim_part(part_num, ofs, std::nullopt, y); + if (r < 0 && r != -ENOENT) { + return r; + } + + l.lock(); + auto tail_part_num = info.tail_part_num; + auto objv = info.version; + l.unlock(); + bool canceled = tail_part_num < part_num; + int retries = 0; + while ((tail_part_num < part_num) && + canceled && + (retries <= MAX_RACE_RETRIES)) { + r = _update_meta(fifo::update{}.tail_part_num(part_num), objv, &canceled, + y); + if (canceled) { + l.lock(); + tail_part_num = info.tail_part_num; + objv = info.version; + l.unlock(); + ++retries; + } + } + if (canceled) { + r = -EIO; + } + return r; +} + +struct Trimmer { + FIFO* fifo; + std::int64_t part_num; + std::uint64_t ofs; + std::int64_t pn; + lr::AioCompletion* super; + lr::AioCompletion* cur = lr::Rados::aio_create_completion( + static_cast(this), &FIFO::trim_callback); + bool update = false; + bool canceled = false; + int retries = 0; + + Trimmer(FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn, + lr::AioCompletion* super) + : fifo(fifo), part_num(part_num), ofs(ofs), pn(pn), super(super) { + super->pc->get(); + } + ~Trimmer() { + cur->release(); + } +}; + +void FIFO::trim_callback(lr::completion_t, void* arg) +{ + auto trimmer = static_cast(arg); + int r = trimmer->cur->get_return_value(); + if (r == -ENOENT) { + r = 0; + } + + if (r < 0) { + complete(trimmer->super, r); + delete trimmer; + } else if (!trimmer->update) { + trimmer->retries = 0; + if (trimmer->pn < trimmer->part_num) { + std::unique_lock l(trimmer->fifo->m); + const auto max_part_size = trimmer->fifo->info.params.max_part_size; + l.unlock(); + trimmer->cur->release(); + trimmer->cur = lr::Rados::aio_create_completion(arg, &FIFO::trim_callback); + r = trimmer->fifo->trim_part(trimmer->pn++, max_part_size, std::nullopt, + trimmer->cur); + if (r < 0) { + complete(trimmer->super, r); + delete trimmer; + } + } else { + std::unique_lock l(trimmer->fifo->m); + const auto tail_part_num = trimmer->fifo->info.tail_part_num; + l.unlock(); + trimmer->cur->release(); + trimmer->cur = lr::Rados::aio_create_completion(arg, &FIFO::trim_callback); + trimmer->update = true; + trimmer->canceled = tail_part_num < trimmer->part_num; + r = trimmer->fifo->trim_part(trimmer->part_num, trimmer->ofs, + std::nullopt, trimmer->cur); + if (r < 0) { + complete(trimmer->super, r); + delete trimmer; + } + } + } else { + std::unique_lock l(trimmer->fifo->m); + auto tail_part_num = trimmer->fifo->info.tail_part_num; + auto objv = trimmer->fifo->info.version; + l.unlock(); + if ((tail_part_num < trimmer->part_num) && + trimmer->canceled) { + if (trimmer->retries > MAX_RACE_RETRIES) { + complete(trimmer->super, -EIO); + delete trimmer; + } else { + trimmer->cur->release(); + trimmer->cur = lr::Rados::aio_create_completion(arg, + &FIFO::trim_callback); + ++trimmer->retries; + auto r = trimmer->fifo->_update_meta(fifo::update{} + .tail_part_num(trimmer->part_num), + objv, &trimmer->canceled, + trimmer->cur); + if (r < 0) { + complete(trimmer->super, r); + delete trimmer; + } + } + } else { + complete(trimmer->super, 0); + } + } +} + +int FIFO::trim(std::string_view markstr, lr::AioCompletion* c) { + auto marker = to_marker(markstr); + if (!marker) { + return -EINVAL; + } + std::unique_lock l(m); + const auto max_part_size = info.params.max_part_size; + const auto pn = info.tail_part_num; + const auto part_oid = info.part_oid(pn); + l.unlock(); + auto trimmer = new Trimmer(this, marker->num, marker->ofs, pn, c); + ++trimmer->pn; + auto ofs = marker->ofs; + if (pn < marker->num) { + ofs = max_part_size; + } else { + trimmer->update = true; + } + auto r = trimmer->fifo->trim_part(pn, ofs, std::nullopt, trimmer->cur); + if (r < 0) { + complete(trimmer->super, r); + delete trimmer; + } + return r; +} + +int FIFO::get_part_info(int64_t part_num, + fifo::part_header* header, + optional_yield y) +{ + std::unique_lock l(m); + const auto part_oid = info.part_oid(part_num); + l.unlock(); + return rgw::cls::fifo::get_part_info(ioctx, part_oid, header, y); +} +} diff --git a/src/rgw/cls_fifo_legacy.h b/src/rgw/cls_fifo_legacy.h new file mode 100644 index 00000000000..19c024719b0 --- /dev/null +++ b/src/rgw/cls_fifo_legacy.h @@ -0,0 +1,231 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat + * Author: Adam C. Emerson + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_RGW_CLS_FIFO_LEGACY_H +#define CEPH_RGW_CLS_FIFO_LEGACY_H + +#include +#include +#include +#include +#include +#include +#include +#include + +#undef FMT_HEADER_ONLY +#define FMT_HEADER_ONLY 1 +#include + +#include "include/rados/librados.hpp" +#include "include/buffer.h" + +#include "common/async/yield_context.h" + +#include "cls/fifo/cls_fifo_types.h" +#include "cls/fifo/cls_fifo_ops.h" + +namespace rgw::cls::fifo { +namespace cb = ceph::buffer; +namespace fifo = rados::cls::fifo; +namespace lr = librados; + +inline constexpr std::uint64_t default_max_part_size = 4 * 1024 * 1024; +inline constexpr std::uint64_t default_max_entry_size = 32 * 1024; + +void create_meta(lr::ObjectWriteOperation* op, std::string_view id, + std::optional objv, + std::optional oid_prefix, + bool exclusive = false, + std::uint64_t max_part_size = default_max_part_size, + std::uint64_t max_entry_size = default_max_entry_size); +int get_meta(lr::IoCtx& ioctx, const std::string& oid, + std::optional objv, fifo::info* info, + std::uint32_t* part_header_size, + std::uint32_t* part_entry_overhead, optional_yield y); +void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv, + const fifo::update& update); +void part_init(lr::ObjectWriteOperation* op, std::string_view tag, + fifo::data_params params); +int push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag, + std::deque data_bufs, optional_yield y); +void trim_part(lr::ObjectWriteOperation* op, + std::optional tag, std::uint64_t ofs); +int list_part(lr::IoCtx& ioctx, const std::string& oid, + std::optional tag, std::uint64_t ofs, + std::uint64_t max_entries, + std::vector* entries, + bool* more, bool* full_part, std::string* ptag, + optional_yield y); +int get_part_info(lr::IoCtx& ioctx, const std::string& oid, + fifo::part_header* header, optional_yield y); + +struct marker { + std::int64_t num = 0; + std::uint64_t ofs = 0; + + marker() = default; + marker(std::int64_t num, std::uint64_t ofs) : num(num), ofs(ofs) {} + static marker max() { + return { std::numeric_limits::max(), + std::numeric_limits::max() }; + } + + std::string to_string() { + return fmt::format("{:0>20}:{:0>20}", num, ofs); + } +}; + +struct list_entry { + cb::list data; + std::string marker; + ceph::real_time mtime; +}; + +using part_info = fifo::part_header; + +/// This is an implementation of FIFO using librados to facilitate +/// backports. Please see /src/neorados/cls/fifo.h for full +/// information. +/// +/// This library uses optional_yield. Please see +/// /src/common/async/yield_context.h. In summary, optional_yield +/// contains either a spawn::yield_context (in which case the current +/// coroutine is suspended until completion) or null_yield (in which +/// case the current thread is blocked until completion.) +/// +/// Please see the librados documentation for information on +/// AioCompletion and IoCtx. + +class FIFO { + friend struct Reader; + friend struct Updater; + friend struct Trimmer; + + mutable lr::IoCtx ioctx; + const std::string oid; + std::mutex m; + + fifo::info info; + + std::uint32_t part_header_size = 0xdeadbeef; + std::uint32_t part_entry_overhead = 0xdeadbeef; + + std::optional to_marker(std::string_view s); + + FIFO(lr::IoCtx&& ioc, + std::string oid) + : ioctx(std::move(ioc)), oid(oid) {} + + std::string generate_tag() const; + + int apply_update(fifo::info* info, + const fifo::objv& objv, + const fifo::update& update); + int _update_meta(const fifo::update& update, + fifo::objv version, bool* pcanceled, + optional_yield y); + int _update_meta(const fifo::update& update, + fifo::objv version, bool* pcanceled, + lr::AioCompletion* c); + int create_part(int64_t part_num, std::string_view tag, optional_yield y); + int remove_part(int64_t part_num, std::string_view tag, optional_yield y); + int process_journal(optional_yield y); + int _prepare_new_part(bool is_head, optional_yield y); + int _prepare_new_head(optional_yield y); + int push_entries(const std::deque& data_bufs, + optional_yield y); + int trim_part(int64_t part_num, uint64_t ofs, + std::optional tag, optional_yield y); + int trim_part(int64_t part_num, uint64_t ofs, + std::optional tag, lr::AioCompletion* c); + + static void trim_callback(lr::completion_t, void* arg); + static void update_callback(lr::completion_t, void* arg); + static void read_callback(lr::completion_t, void* arg); + +public: + + FIFO(const FIFO&) = delete; + FIFO& operator =(const FIFO&) = delete; + FIFO(FIFO&&) = delete; + FIFO& operator =(FIFO&&) = delete; + + /// Open an existing FIFO. + static int open(lr::IoCtx ioctx, //< IO Context + std::string oid, //< OID for metadata object + std::unique_ptr* fifo, //< OUT: Pointer to FIFO object + optional_yield y, //< Optional yield context + /// Operation will fail if FIFO is not at this version + std::optional objv = std::nullopt); + /// Create a new or open an existing FIFO. + static int create(lr::IoCtx ioctx, //< IO Context + std::string oid, //< OID for metadata object + std::unique_ptr* fifo, //< OUT: Pointer to FIFO object + optional_yield y, //< Optional yield context + /// Operation will fail if the FIFO exists and is + /// not of this version. + std::optional objv = std::nullopt, + /// Prefix for all objects + std::optional oid_prefix = std::nullopt, + /// Fail if the FIFO already exists + bool exclusive = false, + /// Maximum allowed size of parts + std::uint64_t max_part_size = default_max_part_size, + /// Maximum allowed size of entries + std::uint64_t max_entry_size = default_max_entry_size); + + /// Force refresh of metadata, yielding/blocking style + int read_meta(optional_yield y); + /// Force refresh of metadata, with a librados Completion + int read_meta(lr::AioCompletion* c); + /// Get currently known metadata + const fifo::info& meta() const; + /// Get partition header and entry overhead size + std::pair get_part_layout_info() const; + /// Push an entry to the FIFO + int push(const cb::list& bl, //< Entry to push + optional_yield y //< Optional yield + ); + /// Push entres to the FIFO + int push(const std::vector& data_bufs, //< Entries to push + /// Optional yield + optional_yield y); + /// List entries + int list(int max_entries, /// Maximum entries to list + /// Point after which to begin listing. Start at tail if null + std::optional markstr, + std::vector* out, //< OUT: entries + /// OUT: True if more entries in FIFO beyond the last returned + bool* more, + optional_yield y //< Optional yield + ); + /// Trim entries, coroutine/block style + int trim(std::string_view markstr, //< Position to which to trim, inclusive + optional_yield y //< Optional yield + ); + /// Trim entries, librados AioCompletion style + int trim(std::string_view markstr, //< Position to which to trim, inclusive + lr::AioCompletion* c //< librados AIO Completion + ); + /// Get part info + int get_part_info(int64_t part_num, /// Part number + fifo::part_header* header, //< OUT: Information + optional_yield y //< Optional yield + ); +}; +} + +#endif // CEPH_RGW_CLS_FIFO_LEGACY_H diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index ab1e2f65a5b..f518b7b2d34 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -205,3 +205,9 @@ install(TARGETS ceph_test_rgw_gc_log DESTINATION ${CMAKE_INSTALL_BINDIR}) add_ceph_test(test-ceph-diff-sorted.sh ${CMAKE_CURRENT_SOURCE_DIR}/test-ceph-diff-sorted.sh) + + +# unittest_cls_fifo_legacy +add_executable(unittest_cls_fifo_legacy test_cls_fifo_legacy.cc) +target_link_libraries(unittest_cls_fifo_legacy radostest-cxx ${UNITTEST_LIBS} + ${rgw_libs}) diff --git a/src/test/rgw/test_cls_fifo_legacy.cc b/src/test/rgw/test_cls_fifo_legacy.cc new file mode 100644 index 00000000000..f8808b87de8 --- /dev/null +++ b/src/test/rgw/test_cls_fifo_legacy.cc @@ -0,0 +1,609 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include + +#include "include/scope_guard.h" +#include "include/types.h" +#include "include/rados/librados.hpp" + +#include "cls/fifo/cls_fifo_ops.h" +#include "test/librados/test_cxx.h" +#include "global/global_context.h" + +#include "rgw/rgw_tools.h" +#include "rgw/cls_fifo_legacy.h" + +#include "gtest/gtest.h" + +namespace R = librados; +namespace cb = ceph::buffer; +namespace fifo = rados::cls::fifo; +namespace RCf = rgw::cls::fifo; + +namespace { +int fifo_create(R::IoCtx& ioctx, + const std::string& oid, + std::string_view id, + optional_yield y, + std::optional objv = std::nullopt, + std::optional oid_prefix = std::nullopt, + bool exclusive = false, + std::uint64_t max_part_size = RCf::default_max_part_size, + std::uint64_t max_entry_size = RCf::default_max_entry_size) +{ + R::ObjectWriteOperation op; + RCf::create_meta(&op, id, objv, oid_prefix, exclusive, max_part_size, + max_entry_size); + return rgw_rados_operate(ioctx, oid, &op, y); +} +} + +class LegacyFIFO : public testing::Test { +protected: + const std::string pool_name = get_temp_pool_name(); + const std::string fifo_id = "fifo"; + R::Rados rados; + librados::IoCtx ioctx; + + void SetUp() override { + ASSERT_EQ("", create_one_pool_pp(pool_name, rados)); + ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx)); + } + void TearDown() override { + destroy_one_pool_pp(pool_name, rados); + } +}; + +using LegacyClsFIFO = LegacyFIFO; + +TEST_F(LegacyClsFIFO, TestCreate) +{ + auto r = fifo_create(ioctx, fifo_id, ""s, null_yield); + EXPECT_EQ(-EINVAL, r); + r = fifo_create(ioctx, fifo_id, fifo_id, null_yield, std::nullopt, + std::nullopt, false, 0); + EXPECT_EQ(-EINVAL, r); + r = fifo_create(ioctx, fifo_id, {}, null_yield, + std::nullopt, std::nullopt, + false, RCf::default_max_part_size, 0); + EXPECT_EQ(-EINVAL, r); + r = fifo_create(ioctx, fifo_id, fifo_id, null_yield); + EXPECT_EQ(0, r); + std::uint64_t size; + ioctx.stat(fifo_id, &size, nullptr); + EXPECT_GT(size, 0); + /* test idempotency */ + r = fifo_create(ioctx, fifo_id, fifo_id, null_yield); + EXPECT_EQ(0, r); + r = fifo_create(ioctx, fifo_id, {}, null_yield, std::nullopt, + std::nullopt, false); + EXPECT_EQ(-EINVAL, r); + r = fifo_create(ioctx, fifo_id, {}, null_yield, std::nullopt, + "myprefix"sv, false); + EXPECT_EQ(-EINVAL, r); + r = fifo_create(ioctx, fifo_id, "foo"sv, null_yield, + std::nullopt, std::nullopt, false); + EXPECT_EQ(-EEXIST, r); +} + +TEST_F(LegacyClsFIFO, TestGetInfo) +{ + auto r = fifo_create(ioctx, fifo_id, fifo_id, null_yield); + fifo::info info; + std::uint32_t part_header_size; + std::uint32_t part_entry_overhead; + r = RCf::get_meta(ioctx, fifo_id, std::nullopt, &info, &part_header_size, + &part_entry_overhead, null_yield); + EXPECT_EQ(0, r); + EXPECT_GT(part_header_size, 0); + EXPECT_GT(part_entry_overhead, 0); + EXPECT_FALSE(info.version.instance.empty()); + + r = RCf::get_meta(ioctx, fifo_id, info.version, &info, &part_header_size, + &part_entry_overhead, null_yield); + EXPECT_EQ(0, r); + fifo::objv objv; + objv.instance = "foo"; + objv.ver = 12; + r = RCf::get_meta(ioctx, fifo_id, objv, &info, &part_header_size, + &part_entry_overhead, null_yield); + EXPECT_EQ(-ECANCELED, r); +} + +TEST_F(LegacyFIFO, TestOpenDefault) +{ + std::unique_ptr fifo; + auto r = RCf::FIFO::create(ioctx, fifo_id, &fifo, null_yield); + ASSERT_EQ(0, r); + // force reading from backend + r = fifo->read_meta(null_yield); + EXPECT_EQ(0, r); + auto info = fifo->meta(); + EXPECT_EQ(info.id, fifo_id); +} + +TEST_F(LegacyFIFO, TestOpenParams) +{ + const std::uint64_t max_part_size = 10 * 1024; + const std::uint64_t max_entry_size = 128; + auto oid_prefix = "foo.123."sv; + fifo::objv objv; + objv.instance = "fooz"s; + objv.ver = 10; + + /* first successful create */ + std::unique_ptr f; + auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, objv, oid_prefix, + false, max_part_size, max_entry_size); + ASSERT_EQ(0, r); + + /* force reading from backend */ + r = f->read_meta(null_yield); + auto info = f->meta(); + EXPECT_EQ(info.id, fifo_id); + EXPECT_EQ(info.params.max_part_size, max_part_size); + EXPECT_EQ(info.params.max_entry_size, max_entry_size); + EXPECT_EQ(info.version, objv); +} + +namespace { +template +std::pair decode_entry(const RCf::list_entry& entry) +{ + T val; + auto iter = entry.data.cbegin(); + decode(val, iter); + return std::make_pair(std::move(val), entry.marker); +} +} + + +TEST_F(LegacyFIFO, TestPushListTrim) +{ + std::unique_ptr f; + auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield); + ASSERT_EQ(0, r); + static constexpr auto max_entries = 10u; + for (uint32_t i = 0; i < max_entries; ++i) { + cb::list bl; + encode(i, bl); + r = f->push(bl, null_yield); + ASSERT_EQ(0, r); + } + + std::optional marker; + /* get entries one by one */ + std::vector result; + bool more = false; + for (auto i = 0u; i < max_entries; ++i) { + + r = f->list(1, marker, &result, &more, null_yield); + ASSERT_EQ(0, r); + + bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + ASSERT_EQ(1, result.size()); + + std::uint32_t val; + std::tie(val, marker) = decode_entry(result.front()); + + ASSERT_EQ(i, val); + result.clear(); + } + + /* get all entries at once */ + std::string markers[max_entries]; + std::uint32_t min_entry = 0; + r = f->list(max_entries * 10, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + + ASSERT_FALSE(more); + ASSERT_EQ(max_entries, result.size()); + for (auto i = 0u; i < max_entries; ++i) { + std::uint32_t val; + std::tie(val, markers[i]) = decode_entry(result[i]); + ASSERT_EQ(i, val); + } + + /* trim one entry */ + r = f->trim(markers[min_entry], null_yield); + ASSERT_EQ(0, r); + ++min_entry; + + r = f->list(max_entries * 10, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_FALSE(more); + ASSERT_EQ(max_entries - min_entry, result.size()); + + for (auto i = min_entry; i < max_entries; ++i) { + std::uint32_t val; + std::tie(val, markers[i - min_entry]) = + decode_entry(result[i - min_entry]); + EXPECT_EQ(i, val); + } +} + + +TEST_F(LegacyFIFO, TestPushTooBig) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + std::unique_ptr f; + auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, max_entry_size); + ASSERT_EQ(0, r); + + char buf[max_entry_size + 1]; + memset(buf, 0, sizeof(buf)); + + cb::list bl; + bl.append(buf, sizeof(buf)); + + r = f->push(bl, null_yield); + EXPECT_EQ(-E2BIG, r); +} + + +TEST_F(LegacyFIFO, TestMultipleParts) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + std::unique_ptr f; + auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + const auto [part_header_size, part_entry_overhead] = + f->get_part_layout_info(); + const auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + const auto max_entries = entries_per_part * 4 + 1; + /* push enough entries */ + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + r = f->push(bl, null_yield); + ASSERT_EQ(0, r); + } + + auto info = f->meta(); + ASSERT_EQ(info.id, fifo_id); + /* head should have advanced */ + ASSERT_GT(info.head_part_num, 0); + + /* list all at once */ + std::vector result; + bool more = false; + r = f->list(max_entries, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + EXPECT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + for (auto i = 0u; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + std::optional marker; + /* get entries one by one */ + + for (auto i = 0u; i < max_entries; ++i) { + r = f->list(1, marker, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(result.size(), 1); + const bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + + std::uint32_t val; + std::tie(val, marker) = decode_entry(result.front()); + + auto& entry = result.front(); + auto& bl = entry.data; + ASSERT_EQ(i, *(int *)bl.c_str()); + marker = entry.marker; + } + + /* trim one at a time */ + marker.reset(); + for (auto i = 0u; i < max_entries; ++i) { + /* read single entry */ + r = f->list(1, marker, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(result.size(), 1); + const bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + + marker = result.front().marker; + r = f->trim(*marker, null_yield); + ASSERT_EQ(0, r); + + /* check tail */ + info = f->meta(); + ASSERT_EQ(info.tail_part_num, i / entries_per_part); + + /* try to read all again, see how many entries left */ + r = f->list(max_entries, marker, &result, &more, null_yield); + ASSERT_EQ(max_entries - i - 1, result.size()); + ASSERT_EQ(false, more); + } + + /* tail now should point at head */ + info = f->meta(); + ASSERT_EQ(info.head_part_num, info.tail_part_num); + + RCf::part_info partinfo; + /* check old tails are removed */ + for (auto i = 0; i < info.tail_part_num; ++i) { + r = f->get_part_info(i, &partinfo, null_yield); + ASSERT_EQ(-ENOENT, r); + } + /* check current tail exists */ + r = f->get_part_info(info.tail_part_num, &partinfo, null_yield); + ASSERT_EQ(0, r); +} + +TEST_F(LegacyFIFO, TestTwoPushers) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + std::unique_ptr f; + auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); + const auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + const auto max_entries = entries_per_part * 4 + 1; + std::unique_ptr f2; + r = RCf::FIFO::open(ioctx, fifo_id, &f2, null_yield); + std::vector fifos{&f, &f2}; + + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + auto& f = *fifos[i % fifos.size()]; + r = f->push(bl, null_yield); + ASSERT_EQ(0, r); + } + + /* list all by both */ + std::vector result; + bool more = false; + r = f2->list(max_entries, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + r = f2->list(max_entries, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + for (auto i = 0u; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } +} + +TEST_F(LegacyFIFO, TestTwoPushersTrim) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + std::unique_ptr f1; + auto r = RCf::FIFO::create(ioctx, fifo_id, &f1, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + auto [part_header_size, part_entry_overhead] = f1->get_part_layout_info(); + const auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + const auto max_entries = entries_per_part * 4 + 1; + + std::unique_ptr f2; + r = RCf::FIFO::open(ioctx, fifo_id, &f2, null_yield); + ASSERT_EQ(0, r); + + /* push one entry to f2 and the rest to f1 */ + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + auto& f = (i < 1 ? f2 : f1); + r = f->push(bl, null_yield); + ASSERT_EQ(0, r); + } + + /* trim half by fifo1 */ + auto num = max_entries / 2; + std::string marker; + std::vector result; + bool more = false; + r = f1->list(num, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(true, more); + ASSERT_EQ(num, result.size()); + + for (auto i = 0u; i < num; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + auto& entry = result[num - 1]; + marker = entry.marker; + r = f1->trim(marker, null_yield); + /* list what's left by fifo2 */ + + const auto left = max_entries - num; + f2->list(left, marker, &result, &more, null_yield); + ASSERT_EQ(left, result.size()); + ASSERT_EQ(false, more); + + for (auto i = num; i < max_entries; ++i) { + auto& bl = result[i - num].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } +} + +TEST_F(LegacyFIFO, TestPushBatch) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + std::unique_ptr f; + auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); + auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */ + std::vector bufs; + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + bufs.push_back(bl); + } + ASSERT_EQ(max_entries, bufs.size()); + + r = f->push(bufs, null_yield); + ASSERT_EQ(0, r); + + /* list all */ + + std::vector result; + bool more = false; + r = f->list(max_entries, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + for (auto i = 0u; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + auto& info = f->meta(); + ASSERT_EQ(info.head_part_num, 4); +} + +TEST_F(LegacyFIFO, TestAioTrim) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + std::unique_ptr f; + auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + const auto [part_header_size, part_entry_overhead] = + f->get_part_layout_info(); + const auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + const auto max_entries = entries_per_part * 4 + 1; + /* push enough entries */ + std::vector bufs; + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + bufs.push_back(std::move(bl)); + } + ASSERT_EQ(max_entries, bufs.size()); + + r = f->push(bufs, null_yield); + ASSERT_EQ(0, r); + + auto info = f->meta(); + ASSERT_EQ(info.id, fifo_id); + /* head should have advanced */ + ASSERT_GT(info.head_part_num, 0); + + /* list all at once */ + std::vector result; + bool more = false; + r = f->list(max_entries, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + std::optional marker; + /* trim one at a time */ + result.clear(); + more = false; + marker.reset(); + for (auto i = 0u; i < max_entries; ++i) { + /* read single entry */ + r = f->list(1, marker, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(result.size(), 1); + const bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + + marker = result.front().marker; + std::unique_ptr c(rados.aio_create_completion(nullptr, + nullptr)); + r = f->trim(*marker, c.get()); + ASSERT_EQ(0, r); + c->wait_for_complete(); + r = c->get_return_value(); + ASSERT_EQ(0, r); + + /* check tail */ + info = f->meta(); + ASSERT_EQ(info.tail_part_num, i / entries_per_part); + + /* try to read all again, see how many entries left */ + r = f->list(max_entries, marker, &result, &more, null_yield); + ASSERT_EQ(max_entries - i - 1, result.size()); + ASSERT_EQ(false, more); + } + + /* tail now should point at head */ + info = f->meta(); + ASSERT_EQ(info.head_part_num, info.tail_part_num); + + RCf::part_info partinfo; + /* check old tails are removed */ + for (auto i = 0; i < info.tail_part_num; ++i) { + r = f->get_part_info(i, &partinfo, null_yield); + ASSERT_EQ(-ENOENT, r); + } + /* check current tail exists */ + r = f->get_part_info(info.tail_part_num, &partinfo, null_yield); + ASSERT_EQ(0, r); +}