std::ostringstream ss;
ss << part_header->max_time;
CLS_LOG(5, "%s:%d read part_header:\n"
- "\ttag=%s\n"
"\tmagic=0x%" PRIx64 "\n"
"\tmin_ofs=%" PRId64 "\n"
"\tlast_ofs=%" PRId64 "\n"
"\tmax_index=%" PRId64 "\n"
"\tmax_time=%s\n",
__PRETTY_FUNCTION__, __LINE__,
- part_header->tag.c_str(),
part_header->magic,
part_header->min_ofs,
part_header->last_ofs,
std::uint64_t size;
- if (op.tag.empty()) {
- CLS_ERR("%s: tag required", __PRETTY_FUNCTION__);
- return -EINVAL;
- }
-
int r = cls_cxx_stat2(hctx, &size, nullptr);
if (r < 0 && r != -ENOENT) {
CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__, r);
return r;
}
- if (!(part_header.tag == op.tag &&
- part_header.params == op.params)) {
+ if (!(part_header.params == op.params)) {
CLS_ERR("%s: failed to re-create existing part with different "
"params", __PRETTY_FUNCTION__);
return -EEXIST;
part_header part_header;
- part_header.tag = op.tag;
part_header.params = op.params;
part_header.min_ofs = CLS_FIFO_MAX_PART_HEADER_SIZE;
return -EINVAL;
}
- if (op.tag.empty()) {
- CLS_ERR("%s: tag required", __PRETTY_FUNCTION__);
- return -EINVAL;
- }
-
part_header part_header;
int r = read_part_header(hctx, &part_header);
if (r < 0) {
return r;
}
- if (!(part_header.tag == op.tag)) {
- CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
- return -EINVAL;
- }
-
std::uint64_t effective_len = op.total_len + op.data_bufs.size() *
part_entry_overhead;
return r;
}
- if (op.tag &&
- !(part_header.tag == *op.tag)) {
- CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
- return -EINVAL;
- }
-
if (op.ofs < part_header.min_ofs) {
return 0;
}
return r;
}
- if (op.tag &&
- !(part_header.tag == *op.tag)) {
- CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
- return -EINVAL;
- }
-
EntryReader reader(hctx, part_header, op.ofs);
if (op.ofs >= part_header.min_ofs &&
op::list_part_reply reply;
- reply.tag = part_header.tag;
-
auto max_entries = std::min(op.max_entries, op::MAX_LIST_ENTRIES);
for (int i = 0; i < max_entries && !reader.end(); ++i) {
struct init_part
{
- std::string tag;
data_params params;
void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
+ std::string tag;
encode(tag, bl);
encode(params, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
+ std::string tag;
decode(tag, bl);
decode(params, bl);
DECODE_FINISH(bl);
struct push_part
{
- std::string tag;
std::deque<ceph::buffer::list> data_bufs;
std::uint64_t total_len{0};
void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
+ std::string tag;
encode(tag, bl);
encode(data_bufs, bl);
encode(total_len, bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
+ std::string tag;
decode(tag, bl);
decode(data_bufs, bl);
decode(total_len, bl);
struct trim_part
{
- std::optional<std::string> tag;
std::uint64_t ofs{0};
bool exclusive = false;
void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
+ std::optional<std::string> tag;
encode(tag, bl);
encode(ofs, bl);
encode(exclusive, bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
+ std::optional<std::string> tag;
decode(tag, bl);
decode(ofs, bl);
decode(exclusive, bl);
struct list_part
{
- std::optional<std::string> tag;
std::uint64_t ofs{0};
int max_entries{100};
void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
+ std::optional<std::string> tag;
encode(tag, bl);
encode(ofs, bl);
encode(max_entries, bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
+ std::optional<std::string> tag;
decode(tag, bl);
decode(ofs, bl);
decode(max_entries, bl);
struct list_part_reply
{
- std::string tag;
std::vector<part_list_entry> entries;
bool more{false};
bool full_part{false}; /* whether part is full or still can be written to.
void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
+ std::string tag;
encode(tag, bl);
encode(entries, bl);
encode(more, bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
+ std::string tag;
decode(tag, bl);
decode(entries, bl);
decode(more, bl);
remove = 3,
} op{Op::unknown};
- std::int64_t part_num{0};
- std::string part_tag;
+ std::int64_t part_num{-1};
+
+ journal_entry() = default;
+ journal_entry(Op op, std::int64_t part_num)
+ : op(op), part_num(part_num) {}
void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
encode((int)op, bl);
encode(part_num, bl);
+ std::string part_tag;
encode(part_tag, bl);
ENCODE_FINISH(bl);
}
decode(i, bl);
op = static_cast<Op>(i);
decode(part_num, bl);
+ std::string part_tag;
decode(part_tag, bl);
DECODE_FINISH(bl);
}
friend bool operator ==(const journal_entry& lhs, const journal_entry& rhs) {
return (lhs.op == rhs.op &&
- lhs.part_num == rhs.part_num &&
- lhs.part_tag == rhs.part_tag);
+ lhs.part_num == rhs.part_num);
}
};
WRITE_CLASS_ENCODER(journal_entry)
}
inline std::ostream& operator <<(std::ostream& m, const journal_entry& j) {
return m << "op: " << j.op << ", "
- << "part_num: " << j.part_num << ", "
- << "part_tag: " << j.part_tag;
+ << "part_num: " << j.part_num;
}
// This is actually a useful builder, since otherwise we end up with
std::int64_t min_push_part_num{0};
std::int64_t max_push_part_num{-1};
- std::string head_tag;
- std::map<int64_t, std::string> tags;
-
std::multimap<int64_t, journal_entry> journal;
bool need_new_head() const {
encode(head_part_num, bl);
encode(min_push_part_num, bl);
encode(max_push_part_num, bl);
+ std::string head_tag;
+ std::map<int64_t, std::string> tags;
encode(tags, bl);
encode(head_tag, bl);
encode(journal, bl);
decode(head_part_num, bl);
decode(min_push_part_num, bl);
decode(max_push_part_num, bl);
+ std::string head_tag;
+ std::map<int64_t, std::string> tags;
decode(tags, bl);
decode(head_tag, bl);
decode(journal, bl);
return fmt::format("{}.{}", oid_prefix, part_num);
}
- journal_entry next_journal_entry(std::string tag) const {
- journal_entry entry;
- entry.op = journal_entry::Op::create;
- entry.part_num = max_push_part_num + 1;
- entry.part_tag = std::move(tag);
- return entry;
- }
-
std::optional<std::string>
apply_update(const update& update) {
if (update.tail_part_num()) {
"allowed, part num={}", entry.part_num);
}
- if (entry.op == journal_entry::Op::create) {
- tags[entry.part_num] = entry.part_tag;
- }
-
journal.emplace(entry.part_num, entry);
}
}
if (update.head_part_num()) {
- tags.erase(head_part_num);
head_part_num = *update.head_part_num();
- auto iter = tags.find(head_part_num);
- if (iter != tags.end()) {
- head_tag = iter->second;
- } else {
- head_tag.erase();
- }
}
return std::nullopt;
<< "head_part_num: " << i.head_part_num << ", "
<< "min_push_part_num: " << i.min_push_part_num << ", "
<< "max_push_part_num: " << i.max_push_part_num << ", "
- << "head_tag: " << i.head_tag << ", "
- << "tags: {" << i.tags << "}, "
<< "journal: {" << i.journal;
}
}
struct part_header {
- std::string tag;
-
data_params params;
std::uint64_t magic{0};
void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
+ std::string tag;
encode(tag, bl);
encode(params, bl);
encode(magic, bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
+ std::string tag;
decode(tag, bl);
decode(params, bl);
decode(magic, bl);
WRITE_CLASS_ENCODER(part_header)
inline std::ostream& operator <<(std::ostream& m, const part_header& p) {
using ceph::operator <<;
- return m << "tag: " << p.tag << ", "
- << "params: {" << p.params << "}, "
+ return m << "params: {" << p.params << "}, "
<< "magic: " << p.magic << ", "
<< "min_ofs: " << p.min_ofs << ", "
<< "last_ofs: " << p.last_ofs << ", "
# ${BLKID_LIBRARIES} ${CRYPTO_LIBS} ${EXTRALIBS})
# target_link_libraries(libneorados ${rados_libs})
# install(TARGETS libneorados DESTINATION ${CMAKE_INSTALL_LIBDIR})
-add_library(neorados_cls_fifo STATIC cls/fifo.cc)
-target_link_libraries(neorados_cls_fifo PRIVATE
- libneorados ceph-common fmt::fmt)
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2020 Red Hat <contact@redhat.com>
- * Author: Adam C. Emerson
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include <cstdint>
-#include <numeric>
-#include <optional>
-#include <string_view>
-
-#undef FMT_HEADER_ONLY
-#define FMT_HEADER_ONLY 1
-#include <fmt/format.h>
-
-#include <boost/system/error_code.hpp>
-
-#include "include/neorados/RADOS.hpp"
-
-#include "include/buffer.h"
-
-#include "common/random_string.h"
-
-#include "cls/fifo/cls_fifo_types.h"
-#include "cls/fifo/cls_fifo_ops.h"
-
-#include "fifo.h"
-
-using namespace std;
-
-namespace neorados::cls::fifo {
-namespace bs = boost::system;
-namespace cb = ceph::buffer;
-namespace fifo = rados::cls::fifo;
-
-void create_meta(WriteOp& op, std::string_view id,
- std::optional<fifo::objv> objv,
- std::optional<std::string_view> oid_prefix,
- bool exclusive,
- std::uint64_t max_part_size,
- std::uint64_t max_entry_size)
-{
- fifo::op::create_meta cm;
-
- cm.id = id;
- cm.version = objv;
- cm.oid_prefix = oid_prefix;
- cm.max_part_size = max_part_size;
- cm.max_entry_size = max_entry_size;
- cm.exclusive = exclusive;
-
- cb::list in;
- encode(cm, in);
- op.exec(fifo::op::CLASS, fifo::op::CREATE_META, in);
-}
-
-void get_meta(ReadOp& op, std::optional<fifo::objv> objv,
- bs::error_code* ec_out, fifo::info* info,
- std::uint32_t* part_header_size,
- std::uint32_t* part_entry_overhead)
-{
- fifo::op::get_meta gm;
- gm.version = objv;
- cb::list in;
- encode(gm, in);
- op.exec(fifo::op::CLASS, fifo::op::GET_META, in,
- [ec_out, info, part_header_size,
- part_entry_overhead](bs::error_code ec, const cb::list& bl) {
- fifo::op::get_meta_reply reply;
- if (!ec) try {
- auto iter = bl.cbegin();
- decode(reply, iter);
- } catch (const cb::error& err) {
- ec = err.code();
- }
- if (ec_out) *ec_out = ec;
- if (info) *info = std::move(reply.info);
- if (part_header_size) *part_header_size = reply.part_header_size;
- if (part_entry_overhead)
- *part_entry_overhead = reply.part_entry_overhead;
- });
-};
-
-void update_meta(WriteOp& op, const fifo::objv& objv,
- const fifo::update& update)
-{
- fifo::op::update_meta um;
-
- um.version = objv;
- um.tail_part_num = update.tail_part_num();
- um.head_part_num = update.head_part_num();
- um.min_push_part_num = update.min_push_part_num();
- um.max_push_part_num = update.max_push_part_num();
- um.journal_entries_add = std::move(update).journal_entries_add();
- um.journal_entries_rm = std::move(update).journal_entries_rm();
-
- cb::list in;
- encode(um, in);
- op.exec(fifo::op::CLASS, fifo::op::UPDATE_META, in);
-}
-
-void part_init(WriteOp& op, std::string_view tag,
- fifo::data_params params)
-{
- fifo::op::init_part ip;
-
- ip.tag = tag;
- ip.params = params;
-
- cb::list in;
- encode(ip, in);
- op.exec(fifo::op::CLASS, fifo::op::INIT_PART, in);
-}
-
-void push_part(WriteOp& op, std::string_view tag,
- std::deque<cb::list> data_bufs,
- fu2::unique_function<void(bs::error_code, int)> f)
-{
- fifo::op::push_part pp;
-
- pp.tag = tag;
- pp.data_bufs = data_bufs;
- pp.total_len = 0;
-
- for (const auto& bl : data_bufs)
- pp.total_len += bl.length();
-
- cb::list in;
- encode(pp, in);
- op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in,
- [f = std::move(f)](bs::error_code ec, int r, const cb::list&) mutable {
- std::move(f)(ec, r);
- });
- op.returnvec();
-}
-
-void trim_part(WriteOp& op,
- std::optional<std::string_view> tag,
- std::uint64_t ofs, bool exclusive)
-{
- fifo::op::trim_part tp;
-
- tp.tag = tag;
- tp.ofs = ofs;
- tp.exclusive = exclusive;
-
- bufferlist in;
- encode(tp, in);
- op.exec(fifo::op::CLASS, fifo::op::TRIM_PART, in);
-}
-
-void list_part(ReadOp& op,
- std::optional<string_view> tag,
- std::uint64_t ofs,
- std::uint64_t max_entries,
- bs::error_code* ec_out,
- std::vector<fifo::part_list_entry>* entries,
- bool* more,
- bool* full_part,
- std::string* ptag)
-{
- fifo::op::list_part lp;
-
- lp.tag = tag;
- lp.ofs = ofs;
- lp.max_entries = max_entries;
-
- bufferlist in;
- encode(lp, in);
- op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in,
- [entries, more, full_part, ptag, ec_out](bs::error_code ec,
- const cb::list& bl) {
- if (ec) {
- if (ec_out) *ec_out = ec;
- return;
- }
-
- fifo::op::list_part_reply reply;
- auto iter = bl.cbegin();
- try {
- decode(reply, iter);
- } catch (const cb::error& err) {
- if (ec_out) *ec_out = ec;
- return;
- }
-
- if (entries) *entries = std::move(reply.entries);
- if (more) *more = reply.more;
- if (full_part) *full_part = reply.full_part;
- if (ptag) *ptag = reply.tag;
- });
-}
-
-void get_part_info(ReadOp& op,
- bs::error_code* out_ec,
- fifo::part_header* header)
-{
- fifo::op::get_part_info gpi;
-
- bufferlist in;
- encode(gpi, in);
- op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in,
- [out_ec, header](bs::error_code ec, const cb::list& bl) {
- if (ec) {
- if (out_ec) *out_ec = ec;
- }
- fifo::op::get_part_info_reply reply;
- auto iter = bl.cbegin();
- try {
- decode(reply, iter);
- } catch (const cb::error& err) {
- if (out_ec) *out_ec = ec;
- return;
- }
-
- if (header) *header = std::move(reply.header);
- });
-}
-
-std::optional<marker> FIFO::to_marker(std::string_view s) {
- marker m;
- if (s.empty()) {
- m.num = info.tail_part_num;
- m.ofs = 0;
- return m;
- }
-
- auto pos = s.find(':');
- if (pos == string::npos) {
- return std::nullopt;
- }
-
- auto num = s.substr(0, pos);
- auto ofs = s.substr(pos + 1);
-
- auto n = ceph::parse<decltype(m.num)>(num);
- if (!n) {
- return std::nullopt;
- }
- m.num = *n;
- auto o = ceph::parse<decltype(m.ofs)>(ofs);
- if (!o) {
- return std::nullopt;
- }
- m.ofs = *o;
- return m;
-}
-
-bs::error_code FIFO::apply_update(fifo::info* info,
- const fifo::objv& objv,
- const fifo::update& update) {
- std::unique_lock l(m);
- auto err = info->apply_update(update);
- if (objv != info->version) {
- ldout(r->cct(), 0) << __func__ << "(): Raced locally!" << dendl;
- return errc::raced;
- }
- if (err) {
- ldout(r->cct(), 0) << __func__ << "(): ERROR: " << err << dendl;
- return errc::update_failed;
- }
-
- ++info->version.ver;
-
- return {};
-}
-
-std::string FIFO::generate_tag() const
-{
- static constexpr auto HEADER_TAG_SIZE = 16;
- return gen_rand_alphanumeric_plain(r->cct(), HEADER_TAG_SIZE);
-}
-
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
-class error_category : public ceph::converting_category {
-public:
- error_category(){}
- const char* name() const noexcept override;
- const char* message(int ev, char*, std::size_t) const noexcept override;
- std::string message(int ev) const override;
- bs::error_condition default_error_condition(int ev) const noexcept
- override;
- bool equivalent(int ev, const bs::error_condition& c) const
- noexcept override;
- using ceph::converting_category::equivalent;
- int from_code(int ev) const noexcept override;
-};
-#pragma GCC diagnostic pop
-#pragma clang diagnostic pop
-
-const char* error_category::name() const noexcept {
- return "FIFO";
-}
-
-const char* error_category::message(int ev, char*, std::size_t) const noexcept {
- if (ev == 0)
- return "No error";
-
- switch (static_cast<errc>(ev)) {
- case errc::raced:
- return "Retry-race count exceeded";
-
- case errc::inconsistency:
- return "Inconsistent result! New head before old head";
-
- case errc::entry_too_large:
- return "Pushed entry too large";
-
- case errc::invalid_marker:
- return "Invalid marker string";
-
- case errc::update_failed:
- return "Update failed";
- }
-
- return "Unknown error";
-}
-
-std::string error_category::message(int ev) const {
- return message(ev, nullptr, 0);
-}
-
-bs::error_condition
-error_category::default_error_condition(int ev) const noexcept {
- switch (static_cast<errc>(ev)) {
- case errc::raced:
- return bs::errc::operation_canceled;
-
- case errc::inconsistency:
- return bs::errc::io_error;
-
- case errc::entry_too_large:
- return bs::errc::value_too_large;
-
- case errc::invalid_marker:
- return bs::errc::invalid_argument;
-
- case errc::update_failed:
- return bs::errc::invalid_argument;
- }
-
- return { ev, *this };
-}
-
-bool error_category::equivalent(int ev, const bs::error_condition& c) const noexcept {
- return default_error_condition(ev) == c;
-}
-
-int error_category::from_code(int ev) const noexcept {
- switch (static_cast<errc>(ev)) {
- case errc::raced:
- return -ECANCELED;
-
- case errc::inconsistency:
- return -EIO;
-
- case errc::entry_too_large:
- return -E2BIG;
-
- case errc::invalid_marker:
- return -EINVAL;
-
- case errc::update_failed:
- return -EINVAL;
-
- }
- return -EDOM;
-}
-
-const bs::error_category& error_category() noexcept {
- static const class error_category c;
- return c;
-}
-
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2020 Red Hat <contact@redhat.com>
- * Author: Adam C. Emerson
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef CEPH_NEORADOS_CLS_FIFIO_H
-#define CEPH_NEORADOS_CLS_FIFIO_H
-
-#include <cstdint>
-#include <deque>
-#include <map>
-#include <memory>
-#include <mutex>
-#include <optional>
-#include <string_view>
-#include <vector>
-
-#include <boost/asio.hpp>
-#include <boost/system/error_code.hpp>
-
-#undef FMT_HEADER_ONLY
-#define FMT_HEADER_ONLY 1
-#include <fmt/format.h>
-
-#include "include/neorados/RADOS.hpp"
-#include "include/buffer.h"
-
-#include "common/allocate_unique.h"
-#include "common/async/bind_handler.h"
-#include "common/async/bind_like.h"
-#include "common/async/completion.h"
-#include "common/async/forward_handler.h"
-
-#include "common/dout.h"
-
-#include "cls/fifo/cls_fifo_types.h"
-#include "cls/fifo/cls_fifo_ops.h"
-
-namespace neorados::cls::fifo {
-namespace ba = boost::asio;
-namespace bs = boost::system;
-namespace ca = ceph::async;
-namespace cb = ceph::buffer;
-namespace fifo = rados::cls::fifo;
-
-inline constexpr auto dout_subsys = ceph_subsys_rados;
-inline constexpr std::uint64_t default_max_part_size = 4 * 1024 * 1024;
-inline constexpr std::uint64_t default_max_entry_size = 32 * 1024;
-inline constexpr auto MAX_RACE_RETRIES = 10;
-
-
-const boost::system::error_category& error_category() noexcept;
-
-enum class errc {
- raced = 1,
- inconsistency,
- entry_too_large,
- invalid_marker,
- update_failed
-};
-}
-
-namespace boost::system {
-template<>
-struct is_error_code_enum<::neorados::cls::fifo::errc> {
- static const bool value = true;
-};
-template<>
-struct is_error_condition_enum<::neorados::cls::fifo::errc> {
- static const bool value = false;
-};
-}
-
-namespace neorados::cls::fifo {
-// explicit conversion:
-inline bs::error_code make_error_code(errc e) noexcept {
- return { static_cast<int>(e), error_category() };
-}
-
-inline bs::error_code make_error_category(errc e) noexcept {
- return { static_cast<int>(e), error_category() };
-}
-
-void create_meta(WriteOp& op, std::string_view id,
- std::optional<fifo::objv> objv,
- std::optional<std::string_view> oid_prefix,
- bool exclusive = false,
- std::uint64_t max_part_size = default_max_part_size,
- std::uint64_t max_entry_size = default_max_entry_size);
-void get_meta(ReadOp& op, std::optional<fifo::objv> objv,
- bs::error_code* ec_out, fifo::info* info,
- std::uint32_t* part_header_size,
- std::uint32_t* part_entry_overhead);
-
-void update_meta(WriteOp& op, const fifo::objv& objv,
- const fifo::update& desc);
-
-void part_init(WriteOp& op, std::string_view tag,
- fifo::data_params params);
-
-void push_part(WriteOp& op, std::string_view tag,
- std::deque<cb::list> data_bufs,
- fu2::unique_function<void(bs::error_code, int)>);
-void trim_part(WriteOp& op, std::optional<std::string_view> tag,
- std::uint64_t ofs,
- bool exclusive);
-void list_part(ReadOp& op,
- std::optional<std::string_view> tag,
- std::uint64_t ofs,
- std::uint64_t max_entries,
- bs::error_code* ec_out,
- std::vector<fifo::part_list_entry>* entries,
- bool* more,
- bool* full_part,
- std::string* ptag);
-void get_part_info(ReadOp& op,
- bs::error_code* out_ec,
- fifo::part_header* header);
-
-struct marker {
- std::int64_t num = 0;
- std::uint64_t ofs = 0;
-
- marker() = default;
- marker(std::int64_t num, std::uint64_t ofs) : num(num), ofs(ofs) {}
- static marker max() {
- return { std::numeric_limits<decltype(num)>::max(),
- std::numeric_limits<decltype(ofs)>::max() };
- }
-
- std::string to_string() {
- return fmt::format("{:0>20}:{:0>20}", num, ofs);
- }
-};
-
-struct list_entry {
- cb::list data;
- std::string marker;
- ceph::real_time mtime;
-};
-
-using part_info = fifo::part_header;
-
-namespace detail {
-template<typename Handler>
-class JournalProcessor;
-}
-
-/// Completions, Handlers, and CompletionTokens
-/// ===========================================
-///
-/// This class is based on Boost.Asio. For information, see
-/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio.html
-///
-/// As summary, Asio's design is that of functions taking completion
-/// handlers. Every handler has a signature, like
-/// (boost::system::error_code, std::string). The completion handler
-/// receives the result of the function, and the signature is the type
-/// of that result.
-///
-/// The completion handler is specified with a CompletionToken. The
-/// CompletionToken is any type that has a specialization of
-/// async_complete and async_result. See
-/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/async_completion.html
-/// and https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/async_result.html
-///
-/// The return type of a function taking a CompletionToken is
-/// async_result<CompletionToken, Signature>::return_type.
-///
-/// Functions
-/// ---------
-///
-/// The default implementations treat whatever value is described as a
-/// function, whose parameters correspond to the signature, and calls
-/// it upon completion.
-///
-/// EXAMPLE:
-/// Let f be an asynchronous function whose signature is (bs::error_code, int)
-/// Let g be an asynchronous function whose signature is
-/// (bs::error_code, int, std::string).
-///
-///
-/// f([](bs::error_code ec, int i) { ... });
-/// g([](bs::error_code ec, int i, std::string s) { ... });
-///
-/// Will schedule asynchronous tasks, and the provided lambdas will be
-/// called on completion. In this case, f and g return void.
-///
-/// There are other specializations. Commonly used ones are.
-///
-/// Futures
-/// -------
-///
-/// A CompletionToken of boost::asio::use_future will complete with a
-/// promise whose type matches (minus any initial error_code) the
-/// function's signature. The corresponding future is returned. If the
-/// error_code of the result is non-zero, the future is set with an
-/// exception of type boost::asio::system_error.
-///
-/// See https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/use_future_t.html
-///
-/// EXAMPLE:
-///
-/// std::future<int> = f(ba::use_future);
-/// std::future<std::tuple<int, std::string> = g(ba::use_future).
-///
-/// Coroutines
-/// ----------
-///
-/// A CompletionToken of type spawn::yield_context suspends execution
-/// of the current coroutine until completion of the operation. See
-/// src/spawn/README.md
-/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/spawn.html and
-/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/yield_context.html
-///
-/// Operations given this CompletionToken return their results, modulo
-/// any leading error_code. A non-zero error code will be thrown, by
-/// default, but may be bound to a variable instead with the overload
-/// of the array-subscript oeprator.
-///
-/// EXAMPLE:
-/// // Within a function with a yield_context parameter named y
-///
-/// try {
-/// int i = f(y);
-/// } catch (const bs::system_error& ec) { ... }
-///
-/// bs::error_code ec;
-/// auto [i, s] = g(y[ec]);
-///
-/// Blocking calls
-/// --------------
-///
-/// ceph::async::use_blocked, defined in src/common/async/blocked_completion.h
-/// Suspends the current thread of execution, returning the results of
-/// the operation on resumption. Its calling convention is analogous to
-/// that of yield_context.
-///
-/// EXAMPLE:
-/// try {
-/// int i = f(ca::use_blocked);
-/// } catch (const bs::system_error& e) { ... }
-///
-/// bs::error_code ec;
-/// auto [i, s] = g(ca::use_blocked[ec]);
-///
-/// librados Completions
-/// --------------------
-///
-/// If src/common/async/librados_completion.h is included in the
-/// current translation unit, then librados::AioCompletion* may be used
-/// as a CompletionToken. This is only permitted when the completion
-/// signature is either bs::system_error or void. The return type of
-/// functions provided a CompletionToken of AioCompletion* is void. If
-/// the signature includes an error code and the error code is set,
-/// then the error is translated to an int which is set as the result
-/// of the AioCompletion.
-///
-/// EXAMPLE:
-/// // Assume an asynchronous function h whose signature is bs::error_code.
-///
-/// AioCompletion* c = Rados::aio_create_completion();
-/// h(c);
-/// int r = c.get_return_value();
-///
-/// See also src/test/cls_fifo/bench_cls_fifo.cc for a full, simple
-/// example of a program using this class with coroutines.
-///
-///
-/// Markers
-/// =======
-///
-/// Markers represent a position within the FIFO. Internally, they are
-/// part/offset pairs. Externally, they are ordered but otherwise
-/// opaque strings. Markers that compare lower denote positions closer
-/// to the tail.
-///
-/// A marker is returned with every entry from a list() operation. They
-/// may be supplied to a list operation to resume from a given
-/// position, and must be supplied to trim give the position to which
-/// to trim.
-
-class FIFO {
-public:
-
- FIFO(const FIFO&) = delete;
- FIFO& operator =(const FIFO&) = delete;
- FIFO(FIFO&&) = delete;
- FIFO& operator =(FIFO&&) = delete;
-
- /// Open an existing FIFO.
- /// Signature: (bs::error_code ec, std::unique_ptr<FIFO> f)
- template<typename CT>
- static auto open(RADOS& r, //< RADOS handle
- const IOContext& ioc, //< Context for pool, namespace, etc.
- Object oid, //< OID for the 'main' object of the FIFO
- CT&& ct, //< CompletionToken
- /// Fail if is not this version
- std::optional<fifo::objv> objv = std::nullopt,
- /// Default executor. By default use the one
- /// associated with the RADOS handle.
- std::optional<ba::executor> executor = std::nullopt) {
- ba::async_completion<CT, void(bs::error_code,
- std::unique_ptr<FIFO>)> init(ct);
- auto e = ba::get_associated_executor(init.completion_handler,
- executor.value_or(r.get_executor()));
- auto a = ba::get_associated_allocator(init.completion_handler);
- _read_meta_(
- &r, oid, ioc, objv,
- ca::bind_ea(
- e, a,
- [&r, ioc, oid, executor, handler = std::move(init.completion_handler)]
- (bs::error_code ec, fifo::info info,
- std::uint32_t size, std::uint32_t over) mutable {
- std::unique_ptr<FIFO> f(
- new FIFO(r, ioc, oid, executor.value_or(r.get_executor())));
- f->info = info;
- f->part_header_size = size;
- f->part_entry_overhead = over;
- // If there are journal entries, process them, in case
- // someone crashed mid-transaction.
- if (!ec && !info.journal.empty()) {
- auto e = ba::get_associated_executor(handler, f->get_executor());
- auto a = ba::get_associated_allocator(handler);
- auto g = f.get();
- g->_process_journal(
- ca::bind_ea(
- e, a,
- [f = std::move(f),
- handler = std::move(handler)](bs::error_code ec) mutable {
- std::move(handler)(ec, std::move(f));
- }));
- return;
- }
- std::move(handler)(ec, std::move(f));
- return;
- }));
- return init.result.get();
- }
-
- /// Open an existing or create a new FIFO.
- /// Signature: (bs::error_code ec, std::unique_ptr<FIFO> f)
- template<typename CT>
- static auto create(RADOS& r, /// RADOS handle
- const IOContext& ioc, /// Context for pool, namespace, etc.
- Object oid, /// OID for the 'main' object of the FIFO
- CT&& ct, /// CompletionToken
- /// Fail if FIFO exists and is not this version
- std::optional<fifo::objv> objv = std::nullopt,
- /// Custom prefix for parts
- std::optional<std::string_view> oid_prefix = std::nullopt,
- /// Fail if FIFO already exists
- bool exclusive = false,
- /// Size at which a part is considered full
- std::uint64_t max_part_size = default_max_part_size,
- /// Maximum size of any entry
- std::uint64_t max_entry_size = default_max_entry_size,
- /// Default executor. By default use the one
- /// associated with the RADOS handle.
- std::optional<ba::executor> executor = std::nullopt) {
- ba::async_completion<CT, void(bs::error_code,
- std::unique_ptr<FIFO>)> init(ct);
- WriteOp op;
- create_meta(op, oid, objv, oid_prefix, exclusive, max_part_size,
- max_entry_size);
- auto e = ba::get_associated_executor(init.completion_handler,
- executor.value_or(r.get_executor()));
- auto a = ba::get_associated_allocator(init.completion_handler);
- r.execute(
- oid, ioc, std::move(op),
- ca::bind_ea(
- e, a,
- [objv, &r, ioc, oid, executor, handler = std::move(init.completion_handler)]
- (bs::error_code ec) mutable {
- if (ec) {
- std::move(handler)(ec, nullptr);
- return;
- }
- auto e = ba::get_associated_executor(
- handler, executor.value_or(r.get_executor()));
- auto a = ba::get_associated_allocator(handler);
- FIFO::_read_meta_(
- &r, oid, ioc, objv,
- ca::bind_ea(
- e, a,
- [&r, ioc, executor, oid, handler = std::move(handler)]
- (bs::error_code ec, fifo::info info,
- std::uint32_t size, std::uint32_t over) mutable {
- std::unique_ptr<FIFO> f(
- new FIFO(r, ioc, oid, executor.value_or(r.get_executor())));
- f->info = info;
- f->part_header_size = size;
- f->part_entry_overhead = over;
- if (!ec && !info.journal.empty()) {
- auto e = ba::get_associated_executor(handler,
- f->get_executor());
- auto a = ba::get_associated_allocator(handler);
- auto g = f.get();
- g->_process_journal(
- ca::bind_ea(
- e, a,
- [f = std::move(f), handler = std::move(handler)]
- (bs::error_code ec) mutable {
- std::move(handler)(ec, std::move(f));
- }));
- return;
- }
- std::move(handler)(ec, std::move(f));
- }));
- }));
- return init.result.get();
- }
-
- /// Force a re-read of FIFO metadata.
- /// Signature: (bs::error_code ec)
- template<typename CT>
- auto read_meta(CT&& ct, //< CompletionToken
- /// Fail if FIFO not at this version
- std::optional<fifo::objv> objv = std::nullopt) {
- std::unique_lock l(m);
- auto version = info.version;
- l.unlock();
- ba::async_completion<CT, void(bs::error_code)> init(ct);
- auto e = ba::get_associated_executor(init.completion_handler,
- get_executor());
- auto a = ba::get_associated_allocator(init.completion_handler);
- _read_meta_(
- r, oid, ioc, objv,
- ca::bind_ea(
- e, a,
- [this, version, handler = std::move(init.completion_handler)]
- (bs::error_code ec, fifo::info newinfo,
- std::uint32_t size, std::uint32_t over) mutable {
- std::unique_lock l(m);
- if (version == info.version) {
- info = newinfo;
- part_header_size = size;
- part_entry_overhead = over;
- }
- l.unlock();
- return std::move(handler)(ec);
- }));
- return init.result.get();
- }
-
- /// Return a reference to currently known metadata
- const fifo::info& meta() const {
- return info;
- }
-
- /// Return header size and entry overhead of partitions.
- std::pair<std::uint32_t, std::uint32_t> get_part_layout_info() {
- return {part_header_size, part_entry_overhead};
- }
-
- /// Push a single entry to the FIFO.
- /// Signature: (bs::error_code)
- template<typename CT>
- auto push(const cb::list& bl, //< Bufferlist holding entry to push
- CT&& ct //< CompletionToken
- ) {
- return push(std::vector{ bl }, std::forward<CT>(ct));
- }
-
- /// Push a many entries to the FIFO.
- /// Signature: (bs::error_code)
- template<typename CT>
- auto push(const std::vector<cb::list>& data_bufs, //< Entries to push
- CT&& ct //< CompletionToken
- ) {
- ba::async_completion<CT, void(bs::error_code)> init(ct);
- std::unique_lock l(m);
- auto max_entry_size = info.params.max_entry_size;
- auto need_new_head = info.need_new_head();
- l.unlock();
- auto e = ba::get_associated_executor(init.completion_handler,
- get_executor());
- auto a = ba::get_associated_allocator(init.completion_handler);
- if (data_bufs.empty() ) {
- // Can't fail if you don't try.
- e.post(ca::bind_handler(std::move(init.completion_handler),
- bs::error_code{}), a);
- return init.result.get();
- }
-
- // Validate sizes
- for (const auto& bl : data_bufs) {
- if (bl.length() > max_entry_size) {
- ldout(r->cct(), 10) << __func__ << "(): entry too large: "
- << bl.length() << " > "
- << info.params.max_entry_size << dendl;
- e.post(ca::bind_handler(std::move(init.completion_handler),
- errc::entry_too_large), a);
- return init.result.get();
- }
- }
-
- auto p = ca::bind_ea(e, a,
- Pusher(this, {data_bufs.begin(), data_bufs.end()},
- {}, 0, std::move(init.completion_handler)));
-
- if (need_new_head) {
- _prepare_new_head(std::move(p));
- } else {
- e.dispatch(std::move(p), a);
- }
- return init.result.get();
- }
-
- /// List the entries in a FIFO
- /// Signature(bs::error_code ec, bs::vector<list_entry> entries, bool more)
- ///
- /// More is true if entries beyond the last exist.
- /// The list entries are of the form:
- /// data - Contents of the entry
- /// marker - String representing the position of this entry within the FIFO.
- /// mtime - Time (on the OSD) at which the entry was pushed.
- template<typename CT>
- auto list(int max_entries, //< Maximum number of entries to fetch
- /// Optionally, a marker indicating the position after
- /// which to begin listing. If null, begin at the tail.
- std::optional<std::string_view> markstr,
- CT&& ct //< CompletionToken
- ) {
- ba::async_completion<CT, void(bs::error_code,
- std::vector<list_entry>, bool)> init(ct);
- std::unique_lock l(m);
- std::int64_t part_num = info.tail_part_num;
- l.unlock();
- std::uint64_t ofs = 0;
- auto a = ba::get_associated_allocator(init.completion_handler);
- auto e = ba::get_associated_executor(init.completion_handler);
-
- if (markstr) {
- auto marker = to_marker(*markstr);
- if (!marker) {
- ldout(r->cct(), 0) << __func__
- << "(): failed to parse marker (" << *markstr
- << ")" << dendl;
- e.post(ca::bind_handler(std::move(init.completion_handler),
- errc::invalid_marker,
- std::vector<list_entry>{}, false), a);
- return init.result.get();
- }
- part_num = marker->num;
- ofs = marker->ofs;
- }
-
- using handler_type = decltype(init.completion_handler);
- auto ls = ceph::allocate_unique<Lister<handler_type>>(
- a, this, part_num, ofs, max_entries,
- std::move(init.completion_handler));
- ls.release()->list();
- return init.result.get();
- }
-
- /// Trim entries from the tail to the given position
- /// Signature: (bs::error_code)
- template<typename CT>
- auto trim(std::string_view markstr, //< Position to which to trim, inclusive
- bool exclusive, //< If true, trim markers up to but NOT INCLUDING
- //< markstr, otherwise trim markstr as well.
- CT&& ct //< CompletionToken
- ) {
- auto m = to_marker(markstr);
- ba::async_completion<CT, void(bs::error_code)> init(ct);
- auto a = ba::get_associated_allocator(init.completion_handler);
- auto e = ba::get_associated_executor(init.completion_handler);
- if (!m) {
- ldout(r->cct(), 0) << __func__ << "(): failed to parse marker: marker="
- << markstr << dendl;
- e.post(ca::bind_handler(std::move(init.completion_handler),
- errc::invalid_marker), a);
- return init.result.get();
- } else {
- using handler_type = decltype(init.completion_handler);
- auto t = ceph::allocate_unique<Trimmer<handler_type>>(
- a, this, m->num, m->ofs, exclusive, std::move(init.completion_handler));
- t.release()->trim();
- }
- return init.result.get();
- }
-
- /// Get information about a specific partition
- /// Signature: (bs::error_code, part_info)
- ///
- /// part_info has the following entries
- /// tag - A random string identifying this partition. Used internally
- /// as a sanity check to make sure operations haven't been misdirected
- /// params - Data parameters, identical for every partition within a
- /// FIFO and the same as what is returned from get_part_layout()
- /// magic - A random magic number, used internally as a prefix to
- /// every entry stored on the OSD to ensure sync
- /// min_ofs - Offset of the first entry
- /// max_ofs - Offset of the highest entry
- /// min_index - Minimum entry index
- /// max_index - Maximum entry index
- /// max_time - Time of the latest push
- ///
- /// The difference between ofs and index is that ofs is a byte
- /// offset. Index is a count. Nothing really uses indices, but
- /// they're tracked and sanity-checked as an invariant on the OSD.
- ///
- /// max_ofs and max_time are the two that have been used externally
- /// so far.
- template<typename CT>
- auto get_part_info(int64_t part_num, // The number of the partition
- CT&& ct // CompletionToken
- ) {
-
- ba::async_completion<CT, void(bs::error_code, part_info)> init(ct);
- fifo::op::get_part_info gpi;
- cb::list in;
- encode(gpi, in);
- ReadOp op;
- auto e = ba::get_associated_executor(init.completion_handler,
- get_executor());
- auto a = ba::get_associated_allocator(init.completion_handler);
- auto reply = ceph::allocate_unique<
- ExecDecodeCB<fifo::op::get_part_info_reply>>(a);
-
- op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in,
- std::ref(*reply));
- std::unique_lock l(m);
- auto part_oid = info.part_oid(part_num);
- l.unlock();
- r->execute(part_oid, ioc, std::move(op), nullptr,
- ca::bind_ea(e, a,
- PartInfoGetter(std::move(init.completion_handler),
- std::move(reply))));
- return init.result.get();
- }
-
- using executor_type = ba::executor;
-
- /// Return the default executor, as specified at creation.
- ba::executor get_executor() const {
- return executor;
- }
-
-private:
- template<typename Handler>
- friend class detail::JournalProcessor;
- RADOS* const r;
- const IOContext ioc;
- const Object oid;
- std::mutex m;
-
- fifo::info info;
-
- std::uint32_t part_header_size = 0xdeadbeef;
- std::uint32_t part_entry_overhead = 0xdeadbeef;
-
- ba::executor executor;
-
- std::optional<marker> to_marker(std::string_view s);
-
- template<typename Handler, typename T>
- static void assoc_delete(const Handler& handler, T* t) {
- using Alloc = ba::associated_allocator_t<Handler>;
- using Traits = typename std::allocator_traits<Alloc>;
- using RebindAlloc = typename Traits::template rebind_alloc<T>;
- using RebindTraits = typename std::allocator_traits<RebindAlloc>;
- RebindAlloc a(get_associated_allocator(handler));
- RebindTraits::destroy(a, t);
- RebindTraits::deallocate(a, t, 1);
- }
-
- FIFO(RADOS& r,
- IOContext ioc,
- Object oid,
- ba::executor executor)
- : r(&r), ioc(std::move(ioc)), oid(oid), executor(executor) {}
-
- std::string generate_tag() const;
-
- template <typename T>
- struct ExecDecodeCB {
- bs::error_code ec;
- T result;
- void operator()(bs::error_code e, const cb::list& r) {
- if (e) {
- ec = e;
- return;
- }
- try {
- auto p = r.begin();
- using ceph::decode;
- decode(result, p);
- } catch (const cb::error& err) {
- ec = err.code();
- }
- }
- };
-
- template<typename Handler>
- class MetaReader {
- Handler handler;
- using allocator_type = boost::asio::associated_allocator_t<Handler>;
- using decoder_type = ExecDecodeCB<fifo::op::get_meta_reply>;
- using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>;
- decoder_ptr decoder;
- public:
- MetaReader(Handler&& handler, decoder_ptr&& decoder)
- : handler(std::move(handler)), decoder(std::move(decoder)) {}
-
- void operator ()(bs::error_code ec) {
- if (!ec) {
- ec = decoder->ec;
- }
- auto reply = std::move(decoder->result);
- decoder.reset(); // free handler-allocated memory before dispatching
-
- std::move(handler)(ec, std::move(reply.info),
- std::move(reply.part_header_size),
- std::move(reply.part_entry_overhead));
- }
- };
-
- // Renamed to get around a compiler bug in Bionic that kept
- // complaining we weren't capturing 'this' to make a static function call.
- template<typename Handler>
- static void _read_meta_(RADOS* r, const Object& oid, const IOContext& ioc,
- std::optional<fifo::objv> objv,
- Handler&& handler, /* error_code, info, uint64,
- uint64 */
- std::optional<ba::executor> executor = std::nullopt){
- fifo::op::get_meta gm;
-
- gm.version = objv;
-
- cb::list in;
- encode(gm, in);
- ReadOp op;
-
- auto a = ba::get_associated_allocator(handler);
- auto reply =
- ceph::allocate_unique<ExecDecodeCB<fifo::op::get_meta_reply>>(a);
-
- auto e = ba::get_associated_executor(handler);
- op.exec(fifo::op::CLASS, fifo::op::GET_META, in, std::ref(*reply));
- r->execute(oid, ioc, std::move(op), nullptr,
- ca::bind_ea(e, a, MetaReader(std::move(handler),
- std::move(reply))));
- };
-
- template<typename Handler>
- void _read_meta(Handler&& handler /* error_code */) {
- auto e = ba::get_associated_executor(handler, get_executor());
- auto a = ba::get_associated_allocator(handler);
- _read_meta_(r, oid, ioc,
- std::nullopt,
- ca::bind_ea(
- e, a,
- [this,
- handler = std::move(handler)](bs::error_code ec,
- fifo::info&& info,
- std::uint64_t phs,
- std::uint64_t peo) mutable {
- std::unique_lock l(m);
- if (ec) {
- l.unlock();
- std::move(handler)(ec);
- return;
- }
- // We have a newer version already!
- if (!info.version.same_or_later(this->info.version)) {
- l.unlock();
- std::move(handler)(bs::error_code{});
- return;
- }
- this->info = std::move(info);
- part_header_size = phs;
- part_entry_overhead = peo;
- l.unlock();
- std::move(handler)(bs::error_code{});
- }), get_executor());
- }
-
- bs::error_code apply_update(fifo::info* info,
- const fifo::objv& objv,
- const fifo::update& update);
-
-
- template<typename Handler>
- void _update_meta(const fifo::update& update,
- fifo::objv version,
- Handler&& handler /* error_code, bool */) {
- WriteOp op;
-
- cls::fifo::update_meta(op, info.version, update);
-
- auto a = ba::get_associated_allocator(handler);
- auto e = ba::get_associated_executor(handler, get_executor());
-
- r->execute(
- oid, ioc, std::move(op),
- ca::bind_ea(
- e, a,
- [this, e, a, version, update,
- handler = std::move(handler)](bs::error_code ec) mutable {
- if (ec && ec != bs::errc::operation_canceled) {
- std::move(handler)(ec, bool{});
- return;
- }
-
- auto canceled = (ec == bs::errc::operation_canceled);
-
- if (!canceled) {
- ec = apply_update(&info,
- version,
- update);
- if (ec) {
- canceled = true;
- }
- }
-
- if (canceled) {
- _read_meta(
- ca::bind_ea(
- e, a,
- [handler = std::move(handler)](bs::error_code ec) mutable {
- std::move(handler)(ec, ec ? false : true);
- }));
- return;
- }
- std::move(handler)(ec, false);
- return;
- }));
- }
-
- template<typename Handler>
- auto _process_journal(Handler&& handler /* error_code */) {
- auto a = ba::get_associated_allocator(std::ref(handler));
- auto j = ceph::allocate_unique<detail::JournalProcessor<Handler>>(
- a, this, std::move(handler));
- auto p = j.release();
- p->process();
- }
-
- template<typename Handler>
- class NewPartPreparer {
- FIFO* f;
- Handler handler;
- std::vector<fifo::journal_entry> jentries;
- int i;
- std::int64_t new_head_part_num;
-
- public:
-
- void operator ()(bs::error_code ec, bool canceled) {
- if (ec) {
- std::move(handler)(ec);
- return;
- }
-
- if (canceled) {
- std::unique_lock l(f->m);
- auto iter = f->info.journal.find(jentries.front().part_num);
- auto max_push_part_num = f->info.max_push_part_num;
- auto head_part_num = f->info.head_part_num;
- auto version = f->info.version;
- auto found = (iter != f->info.journal.end());
- l.unlock();
- if ((max_push_part_num >= jentries.front().part_num &&
- head_part_num >= new_head_part_num)) {
- /* raced, but new part was already written */
- std::move(handler)(bs::error_code{});
- return;
- }
- if (i >= MAX_RACE_RETRIES) {
- std::move(handler)(errc::raced);
- return;
- }
- if (!found) {
- auto e = ba::get_associated_executor(handler, f->get_executor());
- auto a = ba::get_associated_allocator(handler);
- f->_update_meta(fifo::update{}
- .journal_entries_add(jentries),
- version,
- ca::bind_ea(
- e, a,
- NewPartPreparer(f, std::move(handler),
- jentries,
- i + 1, new_head_part_num)));
- return;
- }
- // Fall through. We still need to process the journal.
- }
- f->_process_journal(std::move(handler));
- return;
- }
-
- NewPartPreparer(FIFO* f,
- Handler&& handler,
- std::vector<fifo::journal_entry> jentries,
- int i, std::int64_t new_head_part_num)
- : f(f), handler(std::move(handler)), jentries(std::move(jentries)),
- i(i), new_head_part_num(new_head_part_num) {}
- };
-
- template<typename Handler>
- void _prepare_new_part(bool is_head,
- Handler&& handler /* error_code */) {
- std::unique_lock l(m);
- std::vector jentries = { info.next_journal_entry(generate_tag()) };
- std::int64_t new_head_part_num = info.head_part_num;
- auto version = info.version;
-
- if (is_head) {
- auto new_head_jentry = jentries.front();
- new_head_jentry.op = fifo::journal_entry::Op::set_head;
- new_head_part_num = jentries.front().part_num;
- jentries.push_back(std::move(new_head_jentry));
- }
- l.unlock();
-
- auto e = ba::get_associated_executor(handler, get_executor());
- auto a = ba::get_associated_allocator(handler);
- _update_meta(fifo::update{}.journal_entries_add(jentries),
- version,
- ca::bind_ea(
- e, a,
- NewPartPreparer(this, std::move(handler),
- jentries, 0, new_head_part_num)));
- }
-
- template<typename Handler>
- class NewHeadPreparer {
- FIFO* f;
- Handler handler;
- int i;
- std::int64_t new_head_num;
-
- public:
-
- void operator ()(bs::error_code ec, bool canceled) {
- std::unique_lock l(f->m);
- auto head_part_num = f->info.head_part_num;
- auto version = f->info.version;
- l.unlock();
-
- if (ec) {
- std::move(handler)(ec);
- return;
- }
- if (canceled) {
- if (i >= MAX_RACE_RETRIES) {
- std::move(handler)(errc::raced);
- return;
- }
-
- // Raced, but there's still work to do!
- if (head_part_num < new_head_num) {
- auto e = ba::get_associated_executor(handler, f->get_executor());
- auto a = ba::get_associated_allocator(handler);
- f->_update_meta(fifo::update{}.head_part_num(new_head_num),
- version,
- ca::bind_ea(
- e, a,
- NewHeadPreparer(f, std::move(handler),
- i + 1,
- new_head_num)));
- return;
- }
- }
- // Either we succeeded, or we were raced by someone who did it for us.
- std::move(handler)(bs::error_code{});
- return;
- }
-
- NewHeadPreparer(FIFO* f,
- Handler&& handler,
- int i, std::int64_t new_head_num)
- : f(f), handler(std::move(handler)), i(i), new_head_num(new_head_num) {}
- };
-
- template<typename Handler>
- void _prepare_new_head(Handler&& handler /* error_code */) {
- std::unique_lock l(m);
- int64_t new_head_num = info.head_part_num + 1;
- auto max_push_part_num = info.max_push_part_num;
- auto version = info.version;
- l.unlock();
-
- if (max_push_part_num < new_head_num) {
- auto e = ba::get_associated_executor(handler, get_executor());
- auto a = ba::get_associated_allocator(handler);
- _prepare_new_part(
- true,
- ca::bind_ea(
- e, a,
- [this, new_head_num,
- handler = std::move(handler)](bs::error_code ec) mutable {
- if (ec) {
- handler(ec);
- return;
- }
- std::unique_lock l(m);
- if (info.max_push_part_num < new_head_num) {
- l.unlock();
- ldout(r->cct(), 0)
- << "ERROR: " << __func__
- << ": after new part creation: meta_info.max_push_part_num="
- << info.max_push_part_num << " new_head_num="
- << info.max_push_part_num << dendl;
- std::move(handler)(errc::inconsistency);
- } else {
- l.unlock();
- std::move(handler)(bs::error_code{});
- }
- }));
- return;
- }
- auto e = ba::get_associated_executor(handler, get_executor());
- auto a = ba::get_associated_allocator(handler);
- _update_meta(fifo::update{}.head_part_num(new_head_num),
- version,
- ca::bind_ea(
- e, a,
- NewHeadPreparer(this, std::move(handler), 0,
- new_head_num)));
- }
-
- template<typename T>
- struct ExecHandleCB {
- bs::error_code ec;
- T result;
- void operator()(bs::error_code e, const T& t) {
- if (e) {
- ec = e;
- return;
- }
- result = t;
- }
- };
-
- template<typename Handler>
- class EntryPusher {
- Handler handler;
- using allocator_type = boost::asio::associated_allocator_t<Handler>;
- using decoder_type = ExecHandleCB<int>;
- using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>;
- decoder_ptr decoder;
-
- public:
-
- EntryPusher(Handler&& handler, decoder_ptr&& decoder)
- : handler(std::move(handler)), decoder(std::move(decoder)) {}
-
- void operator ()(bs::error_code ec) {
- if (!ec) {
- ec = decoder->ec;
- }
- auto reply = std::move(decoder->result);
- decoder.reset(); // free handler-allocated memory before dispatching
-
- std::move(handler)(ec, std::move(reply));
- }
- };
-
- template<typename Handler>
- auto push_entries(const std::deque<cb::list>& data_bufs,
- Handler&& handler /* error_code, int */) {
- WriteOp op;
- std::unique_lock l(m);
- auto head_part_num = info.head_part_num;
- auto tag = info.head_tag;
- auto oid = info.part_oid(head_part_num);
- l.unlock();
-
- auto a = ba::get_associated_allocator(handler);
- auto reply = ceph::allocate_unique<ExecHandleCB<int>>(a);
-
- auto e = ba::get_associated_executor(handler, get_executor());
- push_part(op, tag, data_bufs, std::ref(*reply));
- return r->execute(oid, ioc, std::move(op),
- ca::bind_ea(e, a, EntryPusher(std::move(handler),
- std::move(reply))));
- }
-
- template<typename CT>
- auto trim_part(int64_t part_num,
- uint64_t ofs,
- std::optional<std::string_view> tag,
- bool exclusive,
- CT&& ct) {
- WriteOp op;
- cls::fifo::trim_part(op, tag, ofs, exclusive);
- return r->execute(info.part_oid(part_num), ioc, std::move(op),
- std::forward<CT>(ct));
- }
-
-
- template<typename Handler>
- class Pusher {
- FIFO* f;
- std::deque<cb::list> remaining;
- std::deque<cb::list> batch;
- int i;
- Handler handler;
-
- void prep_then_push(const unsigned successes) {
- std::unique_lock l(f->m);
- auto max_part_size = f->info.params.max_part_size;
- auto part_entry_overhead = f->part_entry_overhead;
- l.unlock();
-
- uint64_t batch_len = 0;
- if (successes > 0) {
- if (successes == batch.size()) {
- batch.clear();
- } else {
- batch.erase(batch.begin(), batch.begin() + successes);
- for (const auto& b : batch) {
- batch_len += b.length() + part_entry_overhead;
- }
- }
- }
-
- if (batch.empty() && remaining.empty()) {
- std::move(handler)(bs::error_code{});
- return;
- }
-
- while (!remaining.empty() &&
- (remaining.front().length() + batch_len <= max_part_size)) {
-
- /* We can send entries with data_len up to max_entry_size,
- however, we want to also account the overhead when
- dealing with multiple entries. Previous check doesn't
- account for overhead on purpose. */
- batch_len += remaining.front().length() + part_entry_overhead;
- batch.push_back(std::move(remaining.front()));
- remaining.pop_front();
- }
- push();
- }
-
- void push() {
- auto e = ba::get_associated_executor(handler, f->get_executor());
- auto a = ba::get_associated_allocator(handler);
- f->push_entries(batch,
- ca::bind_ea(e, a,
- Pusher(f, std::move(remaining),
- batch, i,
- std::move(handler))));
- }
-
- public:
-
- // Initial call!
- void operator ()() {
- prep_then_push(0);
- }
-
- // Called with response to push_entries
- void operator ()(bs::error_code ec, int r) {
- if (ec == bs::errc::result_out_of_range) {
- auto e = ba::get_associated_executor(handler, f->get_executor());
- auto a = ba::get_associated_allocator(handler);
- f->_prepare_new_head(
- ca::bind_ea(e, a,
- Pusher(f, std::move(remaining),
- std::move(batch), i,
- std::move(handler))));
- return;
- }
- if (ec) {
- std::move(handler)(ec);
- return;
- }
- i = 0; // We've made forward progress, so reset the race counter!
- prep_then_push(r);
- }
-
- // Called with response to prepare_new_head
- void operator ()(bs::error_code ec) {
- if (ec == bs::errc::operation_canceled) {
- if (i == MAX_RACE_RETRIES) {
- ldout(f->r->cct(), 0)
- << "ERROR: " << __func__
- << "(): race check failed too many times, likely a bug" << dendl;
- std::move(handler)(make_error_code(errc::raced));
- return;
- }
- ++i;
- } else if (ec) {
- std::move(handler)(ec);
- return;
- }
-
- if (batch.empty()) {
- prep_then_push(0);
- return;
- } else {
- push();
- return;
- }
- }
-
- Pusher(FIFO* f, std::deque<cb::list>&& remaining,
- std::deque<cb::list> batch, int i,
- Handler&& handler)
- : f(f), remaining(std::move(remaining)),
- batch(std::move(batch)), i(i),
- handler(std::move(handler)) {}
- };
-
- template<typename Handler>
- class Lister {
- FIFO* f;
- std::vector<list_entry> result;
- bool more = false;
- std::int64_t part_num;
- std::uint64_t ofs;
- int max_entries;
- bs::error_code ec_out;
- std::vector<fifo::part_list_entry> entries;
- bool part_more = false;
- bool part_full = false;
- Handler handler;
-
- void handle(bs::error_code ec) {
- auto h = std::move(handler);
- auto m = more;
- auto r = std::move(result);
-
- FIFO::assoc_delete(h, this);
- std::move(h)(ec, std::move(r), m);
- }
-
- public:
- Lister(FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries,
- Handler&& handler)
- : f(f), part_num(part_num), ofs(ofs), max_entries(max_entries),
- handler(std::move(handler)) {
- result.reserve(max_entries);
- }
-
-
- Lister(const Lister&) = delete;
- Lister& operator =(const Lister&) = delete;
- Lister(Lister&&) = delete;
- Lister& operator =(Lister&&) = delete;
-
- void list() {
- if (max_entries > 0) {
- ReadOp op;
- ec_out.clear();
- part_more = false;
- part_full = false;
- entries.clear();
-
- std::unique_lock l(f->m);
- auto part_oid = f->info.part_oid(part_num);
- l.unlock();
-
- list_part(op,
- {},
- ofs,
- max_entries,
- &ec_out,
- &entries,
- &part_more,
- &part_full,
- nullptr);
- auto e = ba::get_associated_executor(handler, f->get_executor());
- auto a = ba::get_associated_allocator(handler);
- f->r->execute(
- part_oid,
- f->ioc,
- std::move(op),
- nullptr,
- ca::bind_ea(
- e, a,
- [t = std::unique_ptr<Lister>(this), this,
- part_oid](bs::error_code ec) mutable {
- t.release();
- if (ec == bs::errc::no_such_file_or_directory) {
- auto e = ba::get_associated_executor(handler,
- f->get_executor());
- auto a = ba::get_associated_allocator(handler);
- f->_read_meta(
- ca::bind_ea(
- e, a,
- [this](bs::error_code ec) mutable {
- if (ec) {
- handle(ec);
- return;
- }
-
- if (part_num < f->info.tail_part_num) {
- /* raced with trim? restart */
- max_entries += result.size();
- result.clear();
- part_num = f->info.tail_part_num;
- ofs = 0;
- list();
- }
- /* assuming part was not written yet, so end of data */
- more = false;
- handle({});
- return;
- }));
- return;
- }
- if (ec) {
- ldout(f->r->cct(), 0)
- << __func__
- << "(): list_part() on oid=" << part_oid
- << " returned ec=" << ec.message() << dendl;
- handle(ec);
- return;
- }
- if (ec_out) {
- ldout(f->r->cct(), 0)
- << __func__
- << "(): list_part() on oid=" << f->info.part_oid(part_num)
- << " returned ec=" << ec_out.message() << dendl;
- handle(ec_out);
- return;
- }
-
- more = part_full || part_more;
- for (auto& entry : entries) {
- list_entry e;
- e.data = std::move(entry.data);
- e.marker = marker{part_num, entry.ofs}.to_string();
- e.mtime = entry.mtime;
- result.push_back(std::move(e));
- }
- max_entries -= entries.size();
- entries.clear();
- if (max_entries > 0 &&
- part_more) {
- list();
- return;
- }
-
- if (!part_full) { /* head part is not full */
- handle({});
- return;
- }
- ++part_num;
- ofs = 0;
- list();
- }));
- } else {
- handle({});
- return;
- }
- }
- };
-
- template<typename Handler>
- class Trimmer {
- FIFO* f;
- std::int64_t part_num;
- std::uint64_t ofs;
- bool exclusive;
- Handler handler;
- std::int64_t pn;
- int i = 0;
-
- void handle(bs::error_code ec) {
- auto h = std::move(handler);
-
- FIFO::assoc_delete(h, this);
- return std::move(h)(ec);
- }
-
- void update() {
- std::unique_lock l(f->m);
- auto objv = f->info.version;
- l.unlock();
- auto a = ba::get_associated_allocator(handler);
- auto e = ba::get_associated_executor(handler, f->get_executor());
- f->_update_meta(
- fifo::update{}.tail_part_num(part_num),
- objv,
- ca::bind_ea(
- e, a,
- [this, t = std::unique_ptr<Trimmer>(this)](bs::error_code ec,
- bool canceled) mutable {
- t.release();
- if (canceled)
- if (i >= MAX_RACE_RETRIES) {
- ldout(f->r->cct(), 0)
- << "ERROR: " << __func__
- << "(): race check failed too many times, likely a bug"
- << dendl;
- handle(errc::raced);
- return;
- }
- std::unique_lock l(f->m);
- auto tail_part_num = f->info.tail_part_num;
- l.unlock();
- if (tail_part_num < part_num) {
- ++i;
- update();
- return;
- }
- handle({});
- return;
- }));
- }
-
- public:
- Trimmer(FIFO* f, std::int64_t part_num, std::uint64_t ofs,
- bool exclusive, Handler&& handler)
- : f(f), part_num(part_num), ofs(ofs), exclusive(exclusive),
- handler(std::move(handler)) {
- std::unique_lock l(f->m);
- pn = f->info.tail_part_num;
- }
-
- void trim() {
- auto a = ba::get_associated_allocator(handler);
- auto e = ba::get_associated_executor(handler, f->get_executor());
- if (pn < part_num) {
- std::unique_lock l(f->m);
- auto max_part_size = f->info.params.max_part_size;
- l.unlock();
- f->trim_part(
- pn, max_part_size, std::nullopt,
- false,
- ca::bind_ea(
- e, a,
- [t = std::unique_ptr<Trimmer>(this),
- this](bs::error_code ec) mutable {
- t.release();
- if (ec && ec != bs::errc::no_such_file_or_directory) {
- ldout(f->r->cct(), 0)
- << __func__ << "(): ERROR: trim_part() on part="
- << pn << " returned ec=" << ec.message() << dendl;
- handle(ec);
- return;
- }
- ++pn;
- trim();
- }));
- return;
- }
- f->trim_part(
- part_num, ofs, std::nullopt, exclusive,
- ca::bind_ea(
- e, a,
- [t = std::unique_ptr<Trimmer>(this),
- this](bs::error_code ec) mutable {
- t.release();
- if (ec && ec != bs::errc::no_such_file_or_directory) {
- ldout(f->r->cct(), 0)
- << __func__ << "(): ERROR: trim_part() on part=" << part_num
- << " returned ec=" << ec.message() << dendl;
- handle(ec);
- return;
- }
- std::unique_lock l(f->m);
- auto tail_part_num = f->info.tail_part_num;
- l.unlock();
- if (part_num <= tail_part_num) {
- /* don't need to modify meta info */
- handle({});
- return;
- }
- update();
- }));
- }
- };
-
- template<typename Handler>
- class PartInfoGetter {
- Handler handler;
- using allocator_type = boost::asio::associated_allocator_t<Handler>;
- using decoder_type = ExecDecodeCB<fifo::op::get_part_info_reply>;
- using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>;
- decoder_ptr decoder;
- public:
- PartInfoGetter(Handler&& handler, decoder_ptr&& decoder)
- : handler(std::move(handler)), decoder(std::move(decoder)) {}
-
- void operator ()(bs::error_code ec) {
- if (!ec) {
- ec = decoder->ec;
- }
- auto reply = std::move(decoder->result);
- decoder.reset(); // free handler-allocated memory before dispatching
-
- auto p = ca::bind_handler(std::move(handler),
- ec, std::move(reply.header));
- std::move(p)();
- }
- };
-
-
-};
-
-namespace detail {
-template<typename Handler>
-class JournalProcessor {
-private:
- FIFO* const fifo;
- Handler handler;
-
- std::vector<fifo::journal_entry> processed;
- std::multimap<std::int64_t, fifo::journal_entry> journal;
- std::multimap<std::int64_t, fifo::journal_entry>::iterator iter;
- std::int64_t new_tail;
- std::int64_t new_head;
- std::int64_t new_max;
- int race_retries = 0;
-
- template<typename CT>
- auto create_part(int64_t part_num, std::string_view tag, CT&& ct) {
- WriteOp op;
- op.create(false); /* We don't need exclusivity, part_init ensures
- we're creating from the same journal entry. */
- std::unique_lock l(fifo->m);
- part_init(op, tag, fifo->info.params);
- auto oid = fifo->info.part_oid(part_num);
- l.unlock();
- return fifo->r->execute(oid, fifo->ioc,
- std::move(op), std::forward<CT>(ct));
- }
-
- template<typename CT>
- auto remove_part(int64_t part_num, std::string_view tag, CT&& ct) {
- WriteOp op;
- op.remove();
- std::unique_lock l(fifo->m);
- auto oid = fifo->info.part_oid(part_num);
- l.unlock();
- return fifo->r->execute(oid, fifo->ioc,
- std::move(op), std::forward<CT>(ct));
- }
-
- template<typename PP>
- void process_journal_entry(const fifo::journal_entry& entry,
- PP&& pp) {
- switch (entry.op) {
- case fifo::journal_entry::Op::unknown:
- std::move(pp)(errc::inconsistency);
- return;
- break;
-
- case fifo::journal_entry::Op::create:
- create_part(entry.part_num, entry.part_tag, std::move(pp));
- return;
- break;
- case fifo::journal_entry::Op::set_head:
- ba::post(ba::get_associated_executor(handler, fifo->get_executor()),
- [pp = std::move(pp)]() mutable {
- std::move(pp)(bs::error_code{});
- });
- return;
- break;
- case fifo::journal_entry::Op::remove:
- remove_part(entry.part_num, entry.part_tag, std::move(pp));
- return;
- break;
- }
- std::move(pp)(errc::inconsistency);
- return;
- }
-
- auto journal_entry_finisher(const fifo::journal_entry& entry) {
- auto a = ba::get_associated_allocator(handler);
- auto e = ba::get_associated_executor(handler, fifo->get_executor());
- return
- ca::bind_ea(
- e, a,
- [t = std::unique_ptr<JournalProcessor>(this), this,
- entry](bs::error_code ec) mutable {
- t.release();
- if (entry.op == fifo::journal_entry::Op::remove &&
- ec == bs::errc::no_such_file_or_directory)
- ec.clear();
-
- if (ec) {
- ldout(fifo->r->cct(), 0)
- << __func__
- << "(): ERROR: failed processing journal entry for part="
- << entry.part_num << " with error " << ec.message()
- << " Bug or inconsistency." << dendl;
- handle(errc::inconsistency);
- return;
- } else {
- switch (entry.op) {
- case fifo::journal_entry::Op::unknown:
- // Can't happen. Filtered out in process_journal_entry.
- abort();
- break;
-
- case fifo::journal_entry::Op::create:
- if (entry.part_num > new_max) {
- new_max = entry.part_num;
- }
- break;
- case fifo::journal_entry::Op::set_head:
- if (entry.part_num > new_head) {
- new_head = entry.part_num;
- }
- break;
- case fifo::journal_entry::Op::remove:
- if (entry.part_num >= new_tail) {
- new_tail = entry.part_num + 1;
- }
- break;
- }
- processed.push_back(entry);
- }
- ++iter;
- process();
- });
- }
-
- struct JournalPostprocessor {
- std::unique_ptr<JournalProcessor> j_;
- bool first;
- void operator ()(bs::error_code ec, bool canceled) {
- std::optional<int64_t> tail_part_num;
- std::optional<int64_t> head_part_num;
- std::optional<int64_t> max_part_num;
-
- auto j = j_.release();
-
- if (!first && !ec && !canceled) {
- j->handle({});
- return;
- }
-
- if (canceled) {
- if (j->race_retries >= MAX_RACE_RETRIES) {
- ldout(j->fifo->r->cct(), 0) << "ERROR: " << __func__ <<
- "(): race check failed too many times, likely a bug" << dendl;
- j->handle(errc::raced);
- return;
- }
-
- ++j->race_retries;
-
- std::vector<fifo::journal_entry> new_processed;
- std::unique_lock l(j->fifo->m);
- for (auto& e : j->processed) {
- auto jiter = j->fifo->info.journal.find(e.part_num);
- /* journal entry was already processed */
- if (jiter == j->fifo->info.journal.end() ||
- !(jiter->second == e)) {
- continue;
- }
- new_processed.push_back(e);
- }
- j->processed = std::move(new_processed);
- }
-
- std::unique_lock l(j->fifo->m);
- auto objv = j->fifo->info.version;
- if (j->new_tail > j->fifo->info.tail_part_num) {
- tail_part_num = j->new_tail;
- }
-
- if (j->new_head > j->fifo->info.head_part_num) {
- head_part_num = j->new_head;
- }
-
- if (j->new_max > j->fifo->info.max_push_part_num) {
- max_part_num = j->new_max;
- }
- l.unlock();
-
- if (j->processed.empty() &&
- !tail_part_num &&
- !max_part_num) {
- /* nothing to update anymore */
- j->handle({});
- return;
- }
- auto a = ba::get_associated_allocator(j->handler);
- auto e = ba::get_associated_executor(j->handler, j->fifo->get_executor());
- j->fifo->_update_meta(fifo::update{}
- .tail_part_num(tail_part_num)
- .head_part_num(head_part_num)
- .max_push_part_num(max_part_num)
- .journal_entries_rm(j->processed),
- objv,
- ca::bind_ea(
- e, a,
- JournalPostprocessor{j, false}));
- return;
- }
-
- JournalPostprocessor(JournalProcessor* j, bool first)
- : j_(j), first(first) {}
- };
-
- void postprocess() {
- if (processed.empty()) {
- handle({});
- return;
- }
- JournalPostprocessor(this, true)({}, false);
- }
-
- void handle(bs::error_code ec) {
- auto e = ba::get_associated_executor(handler, fifo->get_executor());
- auto a = ba::get_associated_allocator(handler);
- auto h = std::move(handler);
- FIFO::assoc_delete(h, this);
- e.dispatch(ca::bind_handler(std::move(h), ec), a);
- return;
- }
-
-public:
-
- JournalProcessor(FIFO* fifo, Handler&& handler)
- : fifo(fifo), handler(std::move(handler)) {
- std::unique_lock l(fifo->m);
- journal = fifo->info.journal;
- iter = journal.begin();
- new_tail = fifo->info.tail_part_num;
- new_head = fifo->info.head_part_num;
- new_max = fifo->info.max_push_part_num;
- }
-
- JournalProcessor(const JournalProcessor&) = delete;
- JournalProcessor& operator =(const JournalProcessor&) = delete;
- JournalProcessor(JournalProcessor&&) = delete;
- JournalProcessor& operator =(JournalProcessor&&) = delete;
-
- void process() {
- if (iter != journal.end()) {
- const auto entry = iter->second;
- process_journal_entry(entry,
- journal_entry_finisher(entry));
- return;
- } else {
- postprocess();
- return;
- }
- }
-};
-}
-}
-
-#endif // CEPH_RADOS_CLS_FIFIO_H
add_executable(radosgw-admin ${radosgw_admin_srcs})
target_link_libraries(radosgw-admin ${rgw_libs} librados
cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
- cls_log_client cls_timeindex_client neorados_cls_fifo
+ cls_log_client cls_timeindex_client
cls_version_client cls_user_client
global ${LIB_RESOLV}
OATH::OATH
add_executable(radosgw-es ${radosgw_es_srcs})
target_link_libraries(radosgw-es ${rgw_libs} librados
cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
- cls_log_client cls_timeindex_client neorados_cls_fifo
+ cls_log_client cls_timeindex_client
cls_version_client cls_user_client
global ${LIB_RESOLV}
${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${BLKID_LIBRARIES})
add_executable(radosgw-object-expirer ${radosgw_object_expirer_srcs})
target_link_libraries(radosgw-object-expirer ${rgw_libs} librados
cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
- cls_log_client cls_timeindex_client neorados_cls_fifo
+ cls_log_client cls_timeindex_client
cls_version_client cls_user_client
global ${LIB_RESOLV}
${CURL_LIBRARIES} ${EXPAT_LIBRARIES})
cls_refcount_client
cls_log_client
cls_timeindex_client
- neorados_cls_fifo
cls_version_client
cls_user_client
${LIB_RESOLV}
op->exec(fifo::op::CLASS, fifo::op::UPDATE_META, in);
}
-void part_init(lr::ObjectWriteOperation* op, std::string_view tag,
- fifo::data_params params)
+void part_init(lr::ObjectWriteOperation* op, fifo::data_params params)
{
fifo::op::init_part ip;
- ip.tag = tag;
ip.params = params;
cb::list in;
op->exec(fifo::op::CLASS, fifo::op::INIT_PART, in);
}
-int push_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
+int push_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
std::deque<cb::list> data_bufs, std::uint64_t tid,
optional_yield y)
{
lr::ObjectWriteOperation op;
fifo::op::push_part pp;
- pp.tag = tag;
pp.data_bufs = data_bufs;
pp.total_len = 0;
return retval;
}
-void push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
+void push_part(lr::IoCtx& ioctx, const std::string& oid,
std::deque<cb::list> data_bufs, std::uint64_t tid,
lr::AioCompletion* c)
{
lr::ObjectWriteOperation op;
fifo::op::push_part pp;
- pp.tag = tag;
pp.data_bufs = data_bufs;
pp.total_len = 0;
}
void trim_part(lr::ObjectWriteOperation* op,
- std::optional<std::string_view> tag,
std::uint64_t ofs, bool exclusive)
{
fifo::op::trim_part tp;
- tp.tag = tag;
tp.ofs = ofs;
tp.exclusive = exclusive;
}
int list_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
- std::optional<std::string_view> tag, std::uint64_t ofs,
- std::uint64_t max_entries,
+ std::uint64_t ofs, std::uint64_t max_entries,
std::vector<fifo::part_list_entry>* entries,
- bool* more, bool* full_part, std::string* ptag,
+ bool* more, bool* full_part,
std::uint64_t tid, optional_yield y)
{
lr::ObjectReadOperation op;
fifo::op::list_part lp;
- lp.tag = tag;
lp.ofs = ofs;
lp.max_entries = max_entries;
if (entries) *entries = std::move(reply.entries);
if (more) *more = reply.more;
if (full_part) *full_part = reply.full_part;
- if (ptag) *ptag = reply.tag;
} catch (const cb::error& err) {
ldpp_dout(dpp, -1)
<< __PRETTY_FUNCTION__ << ":" << __LINE__
std::vector<fifo::part_list_entry>* entries;
bool* more;
bool* full_part;
- std::string* ptag;
std::uint64_t tid;
list_entry_completion(CephContext* cct, int* r_out, std::vector<fifo::part_list_entry>* entries,
- bool* more, bool* full_part, std::string* ptag,
- std::uint64_t tid)
+ bool* more, bool* full_part, std::uint64_t tid)
: cct(cct), r_out(r_out), entries(entries), more(more),
- full_part(full_part), ptag(ptag), tid(tid) {}
+ full_part(full_part), tid(tid) {}
virtual ~list_entry_completion() = default;
void handle_completion(int r, bufferlist& bl) override {
if (r >= 0) try {
if (entries) *entries = std::move(reply.entries);
if (more) *more = reply.more;
if (full_part) *full_part = reply.full_part;
- if (ptag) *ptag = reply.tag;
} catch (const cb::error& err) {
lderr(cct)
<< __PRETTY_FUNCTION__ << ":" << __LINE__
};
lr::ObjectReadOperation list_part(CephContext* cct,
- std::optional<std::string_view> tag,
std::uint64_t ofs,
std::uint64_t max_entries,
int* r_out,
std::vector<fifo::part_list_entry>* entries,
bool* more, bool* full_part,
- std::string* ptag, std::uint64_t tid)
+ std::uint64_t tid)
{
lr::ObjectReadOperation op;
fifo::op::list_part lp;
- lp.tag = tag;
lp.ofs = ofs;
lp.max_entries = max_entries;
encode(lp, in);
op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in,
new list_entry_completion(cct, r_out, entries, more, full_part,
- ptag, tid));
+ tid));
return op;
}
return m;
}
-std::string FIFO::generate_tag() const
-{
- static constexpr auto HEADER_TAG_SIZE = 16;
- return gen_rand_alphanumeric_plain(static_cast<CephContext*>(ioctx.cct()),
- HEADER_TAG_SIZE);
-}
-
-
int FIFO::apply_update(const DoutPrefixProvider *dpp,
fifo::info* info,
const fifo::objv& objv,
assert(r >= 0);
}
-int FIFO::create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid,
+int FIFO::create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid,
optional_yield y)
{
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
op.create(false); /* We don't need exclusivity, part_init ensures
we're creating from the same journal entry. */
std::unique_lock l(m);
- part_init(&op, tag, info.params);
+ part_init(&op, info.params);
auto oid = info.part_oid(part_num);
l.unlock();
auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
return r;
}
-int FIFO::remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid,
+int FIFO::remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid,
optional_yield y)
{
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " processing entry: entry=" << entry << " tid=" << tid
<< dendl;
switch (entry.op) {
- case fifo::journal_entry::Op::create:
- r = create_part(dpp, entry.part_num, entry.part_tag, tid, y);
+ using enum fifo::journal_entry::Op;
+ case create:
+ r = create_part(dpp, entry.part_num, tid, y);
if (entry.part_num > new_max) {
new_max = entry.part_num;
}
break;
- case fifo::journal_entry::Op::set_head:
+ case set_head:
r = 0;
if (entry.part_num > new_head) {
new_head = entry.part_num;
}
break;
- case fifo::journal_entry::Op::remove:
- r = remove_part(dpp, entry.part_num, entry.part_tag, tid, y);
+ case remove:
+ r = remove_part(dpp, entry.part_num, tid, y);
if (r == -ENOENT) r = 0;
if (entry.part_num >= new_tail) {
new_tail = entry.part_num + 1;
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
std::unique_lock l(m);
- std::vector jentries = { info.next_journal_entry(generate_tag()) };
+ using enum fifo::journal_entry::Op;
+ std::vector<fifo::journal_entry> jentries{{
+ create, info.max_push_part_num + 1
+ }};
if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
l.unlock();
ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
if (is_head) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " needs new head: tid=" << tid << dendl;
- auto new_head_jentry = jentries.front();
- new_head_jentry.op = fifo::journal_entry::Op::set_head;
- new_head_part_num = jentries.front().part_num;
- jentries.push_back(std::move(new_head_jentry));
+ jentries.push_back({ set_head, jentries.front().part_num });
}
l.unlock();
lr::AioCompletion* c)
{
std::unique_lock l(m);
- std::vector jentries = { info.next_journal_entry(generate_tag()) };
+ using enum fifo::journal_entry::Op;
+ std::vector<fifo::journal_entry> jentries{{
+ create, info.max_push_part_num + 1
+ }};
if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
l.unlock();
ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
auto version = info.version;
if (is_head) {
- auto new_head_jentry = jentries.front();
- new_head_jentry.op = fifo::journal_entry::Op::set_head;
- new_head_part_num = jentries.front().part_num;
- jentries.push_back(std::move(new_head_jentry));
+ jentries.push_back({ set_head, jentries.front().part_num });
}
l.unlock();
<< " entering: tid=" << tid << dendl;
std::unique_lock l(m);
auto head_part_num = info.head_part_num;
- auto tag = info.head_tag;
const auto part_oid = info.part_oid(head_part_num);
l.unlock();
- auto r = push_part(dpp, ioctx, part_oid, tag, data_bufs, tid, y);
+ auto r = push_part(dpp, ioctx, part_oid, data_bufs, tid, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " push_part failed: r=" << r << " tid=" << tid << dendl;
{
std::unique_lock l(m);
auto head_part_num = info.head_part_num;
- auto tag = info.head_tag;
const auto part_oid = info.part_oid(head_part_num);
l.unlock();
- push_part(ioctx, part_oid, tag, data_bufs, tid, c);
+ push_part(ioctx, part_oid, data_bufs, tid, c);
}
int FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
- std::optional<std::string_view> tag,
bool exclusive, std::uint64_t tid,
optional_yield y)
{
std::unique_lock l(m);
const auto part_oid = info.part_oid(part_num);
l.unlock();
- rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
+ rgw::cls::fifo::trim_part(&op, ofs, exclusive);
auto r = rgw_rados_operate(dpp, ioctx, part_oid, &op, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
}
void FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
- std::optional<std::string_view> tag,
bool exclusive, std::uint64_t tid,
lr::AioCompletion* c)
{
std::unique_lock l(m);
const auto part_oid = info.part_oid(part_num);
l.unlock();
- rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
+ rgw::cls::fifo::trim_part(&op, ofs, exclusive);
auto r = ioctx.aio_operate(part_oid, c, &op);
ceph_assert(r >= 0);
}
auto part_oid = info.part_oid(part_num);
l.unlock();
- r = list_part(dpp, ioctx, part_oid, {}, ofs, max_entries, &entries,
- &part_more, &part_full, nullptr, tid, y);
+ r = list_part(dpp, ioctx, part_oid, ofs, max_entries, &entries,
+ &part_more, &part_full, tid, y);
if (r == -ENOENT) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " missing part, rereading metadata"
<< " pn=" << pn << " tid=" << tid << dendl;
std::unique_lock l(m);
l.unlock();
- r = trim_part(dpp, pn, max_part_size, std::nullopt, false, tid, y);
+ r = trim_part(dpp, pn, max_part_size, false, tid, y);
if (r < 0 && r == -ENOENT) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " trim_part failed: r=" << r
}
++pn;
}
- r = trim_part(dpp, part_num, ofs, std::nullopt, exclusive, tid, y);
+ r = trim_part(dpp, part_num, ofs, exclusive, tid, y);
if (r < 0 && r != -ENOENT) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " trim_part failed: r=" << r
if (pn < part_num) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " pn=" << pn << " tid=" << tid << dendl;
- fifo->trim_part(dpp, pn++, max_part_size, std::nullopt,
- false, tid, call(std::move(p)));
+ fifo->trim_part(dpp, pn++, max_part_size, false, tid,
+ call(std::move(p)));
} else {
update = true;
canceled = tail_part_num < part_num;
- fifo->trim_part(dpp, part_num, ofs, std::nullopt, exclusive, tid,
- call(std::move(p)));
+ fifo->trim_part(dpp, part_num, ofs, exclusive, tid, call(std::move(p)));
}
return;
}
std::unique_lock l(fifo->m);
const auto max_part_size = fifo->info.params.max_part_size;
l.unlock();
- fifo->trim_part(dpp, pn++, max_part_size, std::nullopt,
- false, tid, call(std::move(p)));
+ fifo->trim_part(dpp, pn++, max_part_size, false, tid,
+ call(std::move(p)));
return;
}
l.unlock();
update = true;
canceled = tail_part_num < part_num;
- fifo->trim_part(dpp, part_num, ofs, std::nullopt, exclusive, tid,
- call(std::move(p)));
+ fifo->trim_part(dpp, part_num, ofs, exclusive, tid, call(std::move(p)));
return;
}
} else {
trimmer->update = true;
}
- trim_part(dpp, pn, ofs, std::nullopt, exclusive,
- tid, Trimmer::call(std::move(trimmer)));
+ trim_part(dpp, pn, ofs, exclusive, tid, Trimmer::call(std::move(trimmer)));
}
int FIFO::get_part_info(const DoutPrefixProvider *dpp, int64_t part_num,
pp_callback,
} state;
- void create_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num,
- std::string_view tag) {
+ void create_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
state = entry_callback;
op.create(false); /* We don't need exclusivity, part_init ensures
we're creating from the same journal entry. */
std::unique_lock l(fifo->m);
- part_init(&op, tag, fifo->info.params);
+ part_init(&op, fifo->info.params);
auto oid = fifo->info.part_oid(part_num);
l.unlock();
auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
return;
}
- void remove_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num,
- std::string_view tag) {
+ void remove_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
state = entry_callback;
<< " finishing entry: entry=" << entry
<< " tid=" << tid << dendl;
- if (entry.op == fifo::journal_entry::Op::remove && r == -ENOENT)
+ using enum fifo::journal_entry::Op;
+ if (entry.op == remove && r == -ENOENT)
r = 0;
if (r < 0) {
return;
} else {
switch (entry.op) {
- case fifo::journal_entry::Op::unknown:
- case fifo::journal_entry::Op::set_head:
+ case unknown:
+ case set_head:
// Can't happen. Filtered out in process.
complete(std::move(p), -EIO);
return;
- case fifo::journal_entry::Op::create:
+ case create:
if (entry.part_num > new_max) {
new_max = entry.part_num;
}
break;
- case fifo::journal_entry::Op::remove:
+ case remove:
if (entry.part_num >= new_tail) {
new_tail = entry.part_num + 1;
}
<< " tid=" << tid << dendl;
const auto entry = iter->second;
switch (entry.op) {
- case fifo::journal_entry::Op::create:
- create_part(dpp, std::move(p), entry.part_num, entry.part_tag);
+ using enum fifo::journal_entry::Op;
+ case create:
+ create_part(dpp, std::move(p), entry.part_num);
return;
- case fifo::journal_entry::Op::set_head:
+ case set_head:
if (entry.part_num > new_head) {
new_head = entry.part_num;
}
processed.push_back(entry);
++iter;
continue;
- case fifo::journal_entry::Op::remove:
- remove_part(dpp, std::move(p), entry.part_num, entry.part_tag);
+ case remove:
+ remove_part(dpp, std::move(p), entry.part_num);
return;
default:
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
l.unlock();
read = false;
- auto op = list_part(f->cct, {}, ofs, max_entries, &r_out,
- &entries, &part_more, &part_full,
- nullptr, tid);
+ auto op = list_part(f->cct, ofs, max_entries, &r_out,
+ &entries, &part_more, &part_full, tid);
f->ioctx.aio_operate(part_oid, call(std::move(p)), &op, nullptr);
} else {
complete(std::move(p), 0);
std::string oid)
: ioctx(std::move(ioc)), oid(oid) {}
- std::string generate_tag() const;
-
int apply_update(const DoutPrefixProvider *dpp,
fifo::info* info,
const fifo::objv& objv,
void _update_meta(const DoutPrefixProvider *dpp, const fifo::update& update,
fifo::objv version, bool* pcanceled,
std::uint64_t tid, lr::AioCompletion* c);
- int create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid,
+ int create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid,
optional_yield y);
- int remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid,
+ int remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid,
optional_yield y);
int process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y);
void process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c);
void push_entries(const std::deque<cb::list>& data_bufs,
std::uint64_t tid, lr::AioCompletion* c);
int trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
- std::optional<std::string_view> tag, bool exclusive,
- std::uint64_t tid, optional_yield y);
+ bool exclusive, std::uint64_t tid, optional_yield y);
void trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
- std::optional<std::string_view> tag, bool exclusive,
- std::uint64_t tid, lr::AioCompletion* c);
+ bool exclusive, std::uint64_t tid, lr::AioCompletion* c);
/// Force refresh of metadata, yielding/blocking style
int read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y);
# libcls_* dependencies cascade to osd, kv and other libs that are not
# available on Windows yet.
add_subdirectory(cls_hello)
- add_subdirectory(cls_fifo)
add_subdirectory(cls_cas)
add_subdirectory(cls_lock)
add_subdirectory(cls_log)
+++ /dev/null
-add_executable(ceph_test_cls_fifo
- test_cls_fifo.cc
- )
-target_link_libraries(ceph_test_cls_fifo
- neorados_cls_fifo
- libneorados
- spawn
- ${UNITTEST_LIBS}
- ${BLKID_LIBRARIES}
- ${CMAKE_DL_LIBS}
- ${CRYPTO_LIBS}
- ${EXTRALIBS}
- neoradostest-support
- )
-install(TARGETS
- ceph_test_cls_fifo
- DESTINATION ${CMAKE_INSTALL_BINDIR})
-
-add_executable(ceph_bench_cls_fifo
- bench_cls_fifo.cc
- )
-target_link_libraries(ceph_bench_cls_fifo
- neorados_cls_fifo
- libneorados
- spawn
- ${UNITTEST_LIBS}
- ${BLKID_LIBRARIES}
- ${CMAKE_DL_LIBS}
- ${CRYPTO_LIBS}
- ${EXTRALIBS}
- )
-install(TARGETS
- ceph_test_cls_fifo
- DESTINATION ${CMAKE_INSTALL_BINDIR})
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2020 Red Hat, Inc.
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#include <cerrno>
-#include <chrono>
-#include <cstdint>
-#include <exception>
-#include <future>
-#include <iostream>
-#include <string_view>
-
-#include <boost/asio.hpp>
-#include <boost/system/error_code.hpp>
-#include <boost/program_options.hpp>
-
-#undef FMT_HEADER_ONLY
-#define FMT_HEADER_ONLY 1
-#include <fmt/chrono.h>
-#include <fmt/format.h>
-#include <fmt/ostream.h>
-
-#include <spawn/spawn.hpp>
-
-#include "include/neorados/RADOS.hpp"
-
-#include "neorados/cls/fifo.h"
-
-using namespace std;
-
-namespace ba = boost::asio;
-namespace bs = boost::system;
-namespace bpo = boost::program_options;
-namespace cb = ceph::buffer;
-namespace R = neorados;
-namespace RCf = neorados::cls::fifo;
-namespace fifo = rados::cls::fifo;
-namespace s = spawn;
-namespace sc = std::chrono;
-
-#if FMT_VERSION >= 90000
-template<>
-struct fmt::formatter<bpo::options_description> : fmt::ostream_formatter {};
-#endif
-
-namespace {
-static constexpr auto PUSH = 0x01 << 0;
-static constexpr auto PULL = 0x01 << 1;
-static constexpr auto BOTH = PUSH | PULL;
-static constexpr auto CLEAN = 0x01 << 2;
-static constexpr auto METADATA = 0x01 << 3;
-static constexpr auto PARTINFO = 0x01 << 4;
-static constexpr auto LIST = 0x01 << 5;
-
-struct benchmark {
- std::uint32_t entries = 0;
- sc::duration<double> elapsed = 0ns;
-
- std::uint64_t ratio() const {
- return entries/std::max(elapsed,
- sc::duration<double>(1ns)).count();
- }
- benchmark() = default;
- benchmark(std::uint32_t entries, sc::duration<double> elapsed)
- : entries(entries), elapsed(elapsed) {}
-};
-
-benchmark push(RCf::FIFO& f, const std::uint32_t count,
- const std::uint32_t entry_size, const std::uint32_t push_entries,
- s::yield_context y)
-{
- cb::list entry;
- entry.push_back(cb::create_small_page_aligned(entry_size));
- entry.zero();
-
- std::vector entries(std::min(count, push_entries), entry);
- auto remaining = count;
- auto start = sc::steady_clock::now();
- while (remaining) {
- if (entries.size() > remaining) {
- entries.resize(remaining);
- }
- f.push(entries, y);
- remaining -= entries.size();
- }
- auto finish = sc::steady_clock::now();
- return benchmark(count, (finish - start));
-}
-
-benchmark pull(RCf::FIFO& f, const std::uint32_t count,
- const std::uint32_t pull_entries, s::yield_context y)
-{
- auto remaining = count;
- std::uint32_t got = 0;
-
- auto start = sc::steady_clock::now();
- while (remaining) {
- auto [result, more] = f.list(std::min(remaining, pull_entries),
- std::nullopt, y);
- if (result.empty())
- break;
- got += result.size();
- remaining -= result.size();
- f.trim(result.back().marker, false, y);
- }
- auto finish = sc::steady_clock::now();
- return benchmark(got, (finish - start));
-}
-
-void concurpull(const std::string& oid, const std::int64_t pool,
- const std::uint32_t count, const std::uint32_t pull_entries,
- std::promise<benchmark> notify, const bool* const exit_early)
-{
- ba::io_context c;
- benchmark bench;
- std::exception_ptr ex;
- s::spawn(
- c,
- [&](s::yield_context y) {
- try {
- auto r = R::RADOS::Builder{}.build(c, y);
- R::IOContext ioc(pool);
- auto f = RCf::FIFO::open(r, ioc, oid, y);
- auto remaining = count;
- std::uint32_t got = 0;
-
- auto start = sc::steady_clock::now();
- while (remaining) {
- if (*exit_early) break;
- auto [result, more] =
- f->list(std::min(remaining, pull_entries), std::nullopt, y);
- if (result.empty()) {
- // We just keep going assuming they'll push more.
- continue;
- }
- got += result.size();
- remaining -= result.size();
- if (*exit_early) break;
- f->trim(result.back().marker, false, y);
- }
- auto finish = sc::steady_clock::now();
- bench.entries = got;
- bench.elapsed = finish - start;
- } catch (const std::exception&) {
- ex = std::current_exception();
- }
- });
- c.run();
- if (ex) {
- notify.set_exception(std::current_exception());
- } else {
- notify.set_value(bench);
- }
-}
-
-void clean(R::RADOS& r, const R::IOContext& ioc, RCf::FIFO& f,
- s::yield_context y)
-{
- f.read_meta(y);
- const auto info = f.meta();
- if (info.head_part_num > -1) {
- for (auto i = info.tail_part_num; i <= info.head_part_num; ++i) {
- R::WriteOp op;
- op.remove();
- r.execute(info.part_oid(i), ioc, std::move(op), y);
- }
- }
- R::WriteOp op;
- op.remove();
- r.execute(info.id, ioc, std::move(op), y);
-}
-}
-
-int main(int argc, char* argv[])
-{
- const std::string_view prog(argv[0]);
- std::string command;
- try {
- std::uint32_t count = 0;
- std::string oid;
- std::string pool;
- std::uint32_t entry_size = 0;
- std::uint32_t push_entries = 0;
- std::uint32_t pull_entries = 0;
- std::uint64_t max_part_size = 0;
- std::uint64_t max_entry_size = 0;
- std::int64_t part_num = 0;
- std::string marker;
-
- bpo::options_description desc(fmt::format("{} options", prog));
- desc.add_options()
- ("help", "show help")
- ("oid", bpo::value<std::string>(&oid)->default_value("fifo"s),
- "the base oid for the fifo")
- ("pool", bpo::value<std::string>(&pool)->default_value("fifo_benchmark"s),
- "the base oid for the fifo")
- ("count", bpo::value<std::uint32_t>(&count)->default_value(1024),
- "total count of items")
- ("entry-size", bpo::value<std::uint32_t>(&entry_size)->default_value(64),
- "size of entries to push")
- ("push-entries",
- bpo::value<std::uint32_t>(&push_entries)
- ->default_value(512), "entries to push per call")
- ("max-part-size", bpo::value<std::uint64_t>(&max_part_size)
- ->default_value(RCf::default_max_part_size),
- "maximum entry size allowed by FIFO")
- ("max-entry-size", bpo::value<std::uint64_t>(&max_entry_size)
- ->default_value(RCf::default_max_entry_size),
- "maximum entry size allowed by FIFO")
- ("pull-entries",
- bpo::value<uint32_t>(&pull_entries)
- ->default_value(512), "entries to pull per call")
- ("part-num",
- bpo::value<int64_t>(&part_num)
- ->default_value(-1), "partition number, -1 for head")
- ("marker", bpo::value<std::string>(&marker), "marker to begin list")
- ("command", bpo::value<std::string>(&command),
- "the operation to perform");
-
- bpo::positional_options_description p;
- p.add("command", 1);
-
- bpo::variables_map vm;
-
- bpo::store(bpo::command_line_parser(argc, argv).
- options(desc).positional(p).run(), vm);
-
- bpo::notify(vm);
-
- if (vm.count("help")) {
- fmt::print(std::cout, "{}", desc);
- fmt::print(std::cout, "\n{} commands:\n", prog);
- fmt::print(std::cout, " push\t\t\t push entries into fifo\n");
- fmt::print(std::cout, " pull\t\t\t retrieve and trim entries\n");
- fmt::print(std::cout, " both\t\t\t both at once, in two threads\n");
- fmt::print(std::cout, " metadata\t\t\t print metadata\n");
- fmt::print(std::cout, " partinfo\t\t\t print metadata\n");
- fmt::print(std::cout, " list\t\t\t list entries\n");
- fmt::print(std::cout, " clean\t\t\t clean up\n");
- return 0;
- }
-
-
- if (vm.find("command") == vm.end()) {
- fmt::print(std::cerr, "{}: a command is required\n", prog);
- return 1;
- }
-
- int op = 0;
- if (command == "push"s) {
- op = PUSH;
- } else if (command == "pull"s) {
- op = PULL;
- } else if (command == "both"s) {
- op = BOTH;
- } else if (command == "clean"s) {
- op = CLEAN;
- } else if (command == "metadata"s) {
- op = METADATA;
- } else if (command == "partinfo"s) {
- op = PARTINFO;
- } else if (command == "list"s) {
- op = LIST;
- } else {
- fmt::print(std::cerr, "{}: {} is not a valid command\n",
- prog, command);
- return 1;
- }
-
- if (!(op & PULL) && !vm["pull-entries"].defaulted()) {
- fmt::print(std::cerr, "{}: pull-entries is only meaningful when pulling\n",
- prog);
- return 1;
- }
-
- if (!(op & PUSH)) {
- for (const auto& p : { "entry-size"s, "push-entries"s, "max-part-size"s,
- "max-entry-size"s }) {
- if (!vm[p].defaulted()) {
- fmt::print(std::cerr, "{}: {} is only meaningful when pushing\n",
- prog, p);
- return 1;
- }
- }
- }
-
- if (!(op & BOTH) && !(op & LIST) && !vm["count"].defaulted()) {
- fmt::print(std::cerr, "{}: count is only meaningful when pulling, pushing, both, or listing\n",
- prog);
- return 1;
- }
-
- if (!(op & PARTINFO) && !vm["part-num"].defaulted()) {
- fmt::print(std::cerr, "{}: part-num is only meaningful when getting part info\n",
- prog);
- return 1;
- }
-
- if (count == 0) {
- fmt::print(std::cerr, "{}: count must be nonzero\n", prog);
- return 1;
- }
-
- if ((op & PULL) && (pull_entries == 0)) {
- fmt::print(std::cerr,
- "{}: pull-entries must be nonzero\n", prog);
- return 1;
- }
-
- if (!(op & LIST) && vm.count("marker") > 0) {
- fmt::print(std::cerr, "{}: marker is only meaningful when listing\n",
- prog);
- return 1;
- }
-
- if (op & PUSH) {
- if (entry_size == 0) {
- fmt::print(std::cerr, "{}: entry-size must be nonzero\n", prog);
- return 1;
- }
- if (push_entries== 0) {
- fmt::print(std::cerr, "{}: push-entries must be nonzero\n", prog);
- return 1;
- }
- if (max_entry_size == 0) {
- fmt::print(std::cerr, "{}: max-entry-size must be nonzero\n", prog);
- return 1;
- }
- if (max_part_size == 0) {
- fmt::print(std::cerr, "{}: max-part-size must be nonzero\n", prog);
- return 1;
- }
- if (entry_size > max_entry_size) {
- fmt::print(std::cerr,
- "{}: entry-size may not be greater than max-entry-size\n",
- prog);
- return 1;
- }
- if (max_entry_size >= max_part_size) {
- fmt::print(std::cerr,
- "{}: max-entry-size may be less than max-part-size\n",
- prog);
- return 1;
- }
- }
-
- ba::io_context c;
- benchmark pushmark, pullmark;
- fifo::info meta;
- fifo::part_header partinfo;
- bool more = false;
- std::vector<RCf::list_entry> entries;
- s::spawn(
- c,
- [&](s::yield_context y) {
- auto r = R::RADOS::Builder{}.build(c, y);
- bs::error_code ec;
- std::int64_t pid;
- pid = r.lookup_pool(pool, y[ec]);
- if (ec) {
- r.create_pool(pool, std::nullopt, y);
- pid = r.lookup_pool(pool, y);
- }
- const R::IOContext ioc(pid);
- auto f = RCf::FIFO::create(r, ioc, oid, y, std::nullopt,
- std::nullopt, false, max_part_size,
- max_entry_size);
-
- switch (op) {
- case PUSH:
- pushmark = push(*f, count, entry_size, push_entries, y);
- break;
-
- case PULL:
- pullmark = pull(*f, count, pull_entries, y);
- break;
-
- case METADATA:
- meta = f->meta();
- break;
-
- case PARTINFO:
- meta = f->meta();
- if (part_num == -1) {
- part_num = meta.head_part_num;
- }
- partinfo = f->get_part_info(part_num, y);
- break;
-
- case LIST:
- if (vm.count("marker") == 0) {
- std::tie(entries, more) = f->list(count, std::nullopt, y);
- } else {
- std::tie(entries, more) = f->list(count, marker, y);
- }
- break;
-
- case BOTH: {
- std::promise<benchmark> notify;
- bool exit_early = false;
-
- auto notifier = notify.get_future();
- std::thread t(concurpull, oid, pid, count, pull_entries,
- std::move(notify), &exit_early);
- t.detach();
- try {
- pushmark = push(*f, count, entry_size, push_entries, y);
- } catch (const std::exception&) {
- exit_early = true;
- notifier.wait();
- throw;
- }
- pullmark = notifier.get();
- }
- }
-
- if (op & CLEAN)
- clean(r, ioc, *f, y);
- });
- c.run();
- if (op & PUSH) {
- fmt::print("Pushed {} in {} at {}/s\n",
- pushmark.entries, pushmark.elapsed, pushmark.ratio());
- }
- if (op & PULL) {
- if (pullmark.entries == count) {
- fmt::print(std::cout, "Pulled {} in {} at {}/s\n",
- pullmark.entries, pullmark.elapsed, pullmark.ratio());
- } else {
- fmt::print(std::cout, "Pulled {} (of {} requested), in {} at {}/s\n",
- pullmark.entries, count, pullmark.elapsed, pullmark.ratio());
- }
- }
- if (op & METADATA) {
- fmt::print(std::cout, "Metadata: [{}]\n", meta);
- }
- if (op & PARTINFO) {
- fmt::print(std::cout, "Info for partition {}: [{}]\n", part_num, partinfo);
- }
- if (op & LIST) {
- for (const auto& entry : entries) {
- fmt::print(std::cout, "{}\t{}\n", entry.marker, entry.mtime);
- }
- if (more) {
- fmt::print(std::cout, "...");
- }
- }
- } catch (const std::exception& e) {
- if (command.empty()) {
- fmt::print(std::cerr, "{}: {}\n", prog, e.what());
- } else {
- fmt::print(std::cerr, "{}: {}: {}\n", prog, command, e.what());
- }
- return 1;
- }
-
- return 0;
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2019 Red Hat, Inc.
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include <cerrno>
-#include <iostream>
-#include <string_view>
-
-#include <boost/asio.hpp>
-#include <boost/system/error_code.hpp>
-
-#include <spawn/spawn.hpp>
-
-#include "include/scope_guard.h"
-#include "include/types.h"
-#include "include/neorados/RADOS.hpp"
-
-#include "cls/fifo/cls_fifo_ops.h"
-
-#include "neorados/cls/fifo.h"
-
-#include "test/neorados/common_tests.h"
-
-#include "gtest/gtest.h"
-
-using namespace std;
-
-namespace R = neorados;
-namespace ba = boost::asio;
-namespace bs = boost::system;
-namespace cb = ceph::buffer;
-namespace fifo = rados::cls::fifo;
-namespace RCf = neorados::cls::fifo;
-namespace s = spawn;
-
-namespace {
-void fifo_create(R::RADOS& r,
- const R::IOContext& ioc,
- const R::Object& oid,
- std::string_view id,
- s::yield_context y,
- std::optional<fifo::objv> objv = std::nullopt,
- std::optional<std::string_view> oid_prefix = std::nullopt,
- bool exclusive = false,
- std::uint64_t max_part_size = RCf::default_max_part_size,
- std::uint64_t max_entry_size = RCf::default_max_entry_size)
-{
- R::WriteOp op;
- RCf::create_meta(op, id, objv, oid_prefix, exclusive, max_part_size,
- max_entry_size);
- r.execute(oid, ioc, std::move(op), y);
-}
-}
-
-TEST(ClsFIFO, TestCreate) {
- ba::io_context c;
- auto fifo_id = "fifo"sv;
- R::Object oid(fifo_id);
-
- s::spawn(c, [&](s::yield_context y) {
- auto r = R::RADOS::Builder{}.build(c, y);
- auto pool = create_pool(r, get_temp_pool_name(), y);
- auto sg = make_scope_guard(
- [&] {
- r.delete_pool(pool, y);
- });
- R::IOContext ioc(pool);
- bs::error_code ec;
- fifo_create(r, ioc, oid, ""s, y[ec]);
- EXPECT_EQ(bs::errc::invalid_argument, ec);
- fifo_create(r, ioc, oid, fifo_id, y[ec], std::nullopt,
- std::nullopt, false, 0);
- EXPECT_EQ(bs::errc::invalid_argument, ec);
- fifo_create(r, ioc, oid, {}, y[ec],
- std::nullopt, std::nullopt,
- false, RCf::default_max_part_size, 0);
- EXPECT_EQ(bs::errc::invalid_argument, ec);
- fifo_create(r, ioc, oid, fifo_id, y);
- {
- std::uint64_t size;
- std::uint64_t size2;
- {
- R::ReadOp op;
- op.stat(&size, nullptr);
- r.execute(oid, ioc, std::move(op),
- nullptr, y);
- EXPECT_GT(size, 0);
- }
-
- {
- R::ReadOp op;
- op.stat(&size2, nullptr);
- r.execute(oid, ioc, std::move(op), nullptr, y);
- }
- EXPECT_EQ(size2, size);
- }
- /* test idempotency */
- fifo_create(r, ioc, oid, fifo_id, y);
- fifo_create(r, ioc, oid, {}, y[ec], std::nullopt,
- std::nullopt, false);
- EXPECT_EQ(bs::errc::invalid_argument, ec);
- fifo_create(r, ioc, oid, {}, y[ec], std::nullopt,
- "myprefix"sv, false);
- EXPECT_EQ(bs::errc::invalid_argument, ec);
- fifo_create(r, ioc, oid, "foo"sv, y[ec],
- std::nullopt, std::nullopt, false);
- EXPECT_EQ(bs::errc::file_exists, ec);
- });
- c.run();
-}
-
-TEST(ClsFIFO, TestGetInfo) {
- ba::io_context c;
- auto fifo_id = "fifo"sv;
- R::Object oid(fifo_id);
-
- s::spawn(c, [&](s::yield_context y) {
- auto r = R::RADOS::Builder{}.build(c, y);
- auto pool = create_pool(r, get_temp_pool_name(), y);
- auto sg = make_scope_guard(
- [&] {
- r.delete_pool(pool, y);
- });
- R::IOContext ioc(pool);
- /* first successful create */
- fifo_create(r, ioc, oid, fifo_id, y);
-
- fifo::info info;
- std::uint32_t part_header_size;
- std::uint32_t part_entry_overhead;
- {
- R::ReadOp op;
- RCf::get_meta(op, std::nullopt,
- nullptr, &info, &part_header_size,
- &part_entry_overhead);
- r.execute(oid, ioc, std::move(op), nullptr, y);
- EXPECT_GT(part_header_size, 0);
- EXPECT_GT(part_entry_overhead, 0);
- EXPECT_FALSE(info.version.instance.empty());
- }
- {
- R::ReadOp op;
- RCf::get_meta(op, info.version,
- nullptr, &info, &part_header_size,
- &part_entry_overhead);
- r.execute(oid, ioc, std::move(op), nullptr, y);
- }
- {
- R::ReadOp op;
- fifo::objv objv;
- objv.instance = "foo";
- objv.ver = 12;
- RCf::get_meta(op, objv,
- nullptr, &info, &part_header_size,
- &part_entry_overhead);
- ASSERT_ANY_THROW(r.execute(oid, ioc, std::move(op),
- nullptr, y));
- }
- });
- c.run();
-}
-
-TEST(FIFO, TestOpenDefault) {
- ba::io_context c;
- auto fifo_id = "fifo"s;
-
- s::spawn(c, [&](s::yield_context y) {
- auto r = R::RADOS::Builder{}.build(c, y);
- auto pool = create_pool(r, get_temp_pool_name(), y);
- auto sg = make_scope_guard(
- [&] {
- r.delete_pool(pool, y);
- });
- R::IOContext ioc(pool);
- auto fifo = RCf::FIFO::create(r, ioc, fifo_id, y);
- // force reading from backend
- fifo->read_meta(y);
- auto info = fifo->meta();
- EXPECT_EQ(info.id, fifo_id);
- });
- c.run();
-}
-
-TEST(FIFO, TestOpenParams) {
- ba::io_context c;
- auto fifo_id = "fifo"sv;
-
- s::spawn(c, [&](s::yield_context y) {
- auto r = R::RADOS::Builder{}.build(c, y);
- auto pool = create_pool(r, get_temp_pool_name(), y);
- auto sg = make_scope_guard(
- [&] {
- r.delete_pool(pool, y);
- });
- R::IOContext ioc(pool);
-
- const std::uint64_t max_part_size = 10 * 1024;
- const std::uint64_t max_entry_size = 128;
- auto oid_prefix = "foo.123."sv;
- fifo::objv objv;
- objv.instance = "fooz"s;
- objv.ver = 10;
-
- /* first successful create */
- auto f = RCf::FIFO::create(r, ioc, fifo_id, y, objv, oid_prefix,
- false, max_part_size,
- max_entry_size);
-
-
- /* force reading from backend */
- f->read_meta(y);
- auto info = f->meta();
- ASSERT_EQ(info.id, fifo_id);
- ASSERT_EQ(info.params.max_part_size, max_part_size);
- ASSERT_EQ(info.params.max_entry_size, max_entry_size);
- ASSERT_EQ(info.version, objv);
- });
- c.run();
-}
-
-namespace {
-template<class T>
-std::pair<T, std::string> decode_entry(const RCf::list_entry& entry)
-{
- T val;
- auto iter = entry.data.cbegin();
- decode(val, iter);
- return std::make_pair(std::move(val), entry.marker);
-}
-}
-
-
-
-TEST(FIFO, TestPushListTrim) {
- ba::io_context c;
- auto fifo_id = "fifo"sv;
-
- s::spawn(c, [&](s::yield_context y) mutable {
- auto r = R::RADOS::Builder{}.build(c, y);
- auto pool = create_pool(r, get_temp_pool_name(), y);
- auto sg = make_scope_guard(
- [&] {
- r.delete_pool(pool, y);
- });
- R::IOContext ioc(pool);
- auto f = RCf::FIFO::create(r, ioc, fifo_id, y);
- static constexpr auto max_entries = 10u;
- for (uint32_t i = 0; i < max_entries; ++i) {
- cb::list bl;
- encode(i, bl);
- f->push(bl, y);
- }
-
- std::optional<std::string> marker;
- /* get entries one by one */
-
- for (auto i = 0u; i < max_entries; ++i) {
- auto [result, more] = f->list(1, marker, y);
-
- bool expected_more = (i != (max_entries - 1));
- ASSERT_EQ(expected_more, more);
- ASSERT_EQ(1, result.size());
-
- std::uint32_t val;
- std::tie(val, marker) =
- decode_entry<std::uint32_t>(result.front());
-
- ASSERT_EQ(i, val);
- }
-
- /* get all entries at once */
- std::string markers[max_entries];
- std::uint32_t min_entry = 0;
- {
- auto [result, more] = f->list(max_entries * 10, std::nullopt,
- y);
-
- ASSERT_FALSE(more);
- ASSERT_EQ(max_entries, result.size());
-
-
- for (auto i = 0u; i < max_entries; ++i) {
- std::uint32_t val;
-
- std::tie(val, markers[i]) =
- decode_entry<std::uint32_t>(result[i]);
- ASSERT_EQ(i, val);
- }
-
-
- /* trim one entry */
- f->trim(markers[min_entry], false, y);
- ++min_entry;
- }
-
- auto [result, more] = f->list(max_entries * 10,
- std::nullopt, y);
-
- ASSERT_FALSE(more);
- ASSERT_EQ(max_entries - min_entry, result.size());
-
- for (auto i = min_entry; i < max_entries; ++i) {
- std::uint32_t val;
-
- std::tie(val, markers[i - min_entry]) =
- decode_entry<std::uint32_t>(result[i - min_entry]);
- ASSERT_EQ(i, val);
- }
-
- });
- c.run();
-}
-
-
-TEST(FIFO, TestPushTooBig) {
- ba::io_context c;
- auto fifo_id = "fifo"sv;
- static constexpr auto max_part_size = 2048ull;
- static constexpr auto max_entry_size = 128ull;
-
- s::spawn(c, [&](s::yield_context y) {
- auto r = R::RADOS::Builder{}.build(c, y);
- auto pool = create_pool(r, get_temp_pool_name(), y);
- auto sg = make_scope_guard(
- [&] {
- r.delete_pool(pool, y);
- });
- R::IOContext ioc(pool);
-
- auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt,
- std::nullopt, false, max_part_size,
- max_entry_size);
-
- char buf[max_entry_size + 1];
- memset(buf, 0, sizeof(buf));
-
- cb::list bl;
- bl.append(buf, sizeof(buf));
-
- bs::error_code ec;
- f->push(bl, y[ec]);
- EXPECT_EQ(RCf::errc::entry_too_large, ec);
- });
- c.run();
-}
-
-
-TEST(FIFO, TestMultipleParts) {
- ba::io_context c;
- auto fifo_id = "fifo"sv;
- static constexpr auto max_part_size = 2048ull;
- static constexpr auto max_entry_size = 128ull;
-
- s::spawn(c, [&](s::yield_context y) mutable {
- auto r = R::RADOS::Builder{}.build(c, y);
- auto pool = create_pool(r, get_temp_pool_name(), y);
- auto sg = make_scope_guard(
- [&] {
- r.delete_pool(pool, y);
- });
- R::IOContext ioc(pool);
-
- auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt,
- std::nullopt, false, max_part_size,
- max_entry_size);
-
-
- char buf[max_entry_size];
- memset(buf, 0, sizeof(buf));
-
- const auto [part_header_size, part_entry_overhead] =
- f->get_part_layout_info();
-
- const auto entries_per_part =
- (max_part_size - part_header_size) /
- (max_entry_size + part_entry_overhead);
-
- const auto max_entries = entries_per_part * 4 + 1;
-
- /* push enough entries */
- for (auto i = 0u; i < max_entries; ++i) {
- cb::list bl;
-
- *(int *)buf = i;
- bl.append(buf, sizeof(buf));
-
- f->push(bl, y);
- }
-
- auto info = f->meta();
-
- ASSERT_EQ(info.id, fifo_id);
- /* head should have advanced */
- ASSERT_GT(info.head_part_num, 0);
-
-
- /* list all at once */
- auto [result, more] = f->list(max_entries, std::nullopt, y);
- EXPECT_EQ(false, more);
-
- ASSERT_EQ(max_entries, result.size());
-
- for (auto i = 0u; i < max_entries; ++i) {
- auto& bl = result[i].data;
- ASSERT_EQ(i, *(int *)bl.c_str());
- }
-
- std::optional<std::string> marker;
- /* get entries one by one */
-
- for (auto i = 0u; i < max_entries; ++i) {
- auto [result, more] = f->list(1, marker, y);
- ASSERT_EQ(result.size(), 1);
- const bool expected_more = (i != (max_entries - 1));
- ASSERT_EQ(expected_more, more);
-
- std::uint32_t val;
- std::tie(val, marker) =
- decode_entry<std::uint32_t>(result.front());
-
- auto& entry = result.front();
- auto& bl = entry.data;
- ASSERT_EQ(i, *(int *)bl.c_str());
- marker = entry.marker;
- }
-
- /* trim one at a time */
- marker.reset();
- for (auto i = 0u; i < max_entries; ++i) {
- /* read single entry */
- {
- auto [result, more] = f->list(1, marker, y);
- ASSERT_EQ(result.size(), 1);
- const bool expected_more = (i != (max_entries - 1));
- ASSERT_EQ(expected_more, more);
-
- marker = result.front().marker;
-
- f->trim(*marker, false, y);
- }
-
- /* check tail */
- info = f->meta();
- ASSERT_EQ(info.tail_part_num, i / entries_per_part);
-
- /* try to read all again, see how many entries left */
- auto [result, more] = f->list(max_entries, marker, y);
- ASSERT_EQ(max_entries - i - 1, result.size());
- ASSERT_EQ(false, more);
- }
-
- /* tail now should point at head */
- info = f->meta();
- ASSERT_EQ(info.head_part_num, info.tail_part_num);
-
- /* check old tails are removed */
- for (auto i = 0; i < info.tail_part_num; ++i) {
- bs::error_code ec;
- f->get_part_info(i, y[ec]);
- ASSERT_EQ(bs::errc::no_such_file_or_directory, ec);
- }
- /* check current tail exists */
- f->get_part_info(info.tail_part_num, y);
- });
- c.run();
-}
-
-
-TEST(FIFO, TestTwoPushers) {
- ba::io_context c;
- auto fifo_id = "fifo"sv;
- static constexpr auto max_part_size = 2048ull;
- static constexpr auto max_entry_size = 128ull;
-
- s::spawn(c, [&](s::yield_context y) {
- auto r = R::RADOS::Builder{}.build(c, y);
- auto pool = create_pool(r, get_temp_pool_name(), y);
- auto sg = make_scope_guard(
- [&] {
- r.delete_pool(pool, y);
- });
- R::IOContext ioc(pool);
-
- auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt,
- std::nullopt, false, max_part_size,
- max_entry_size);
-
-
-
- char buf[max_entry_size];
- memset(buf, 0, sizeof(buf));
-
-
- auto [part_header_size, part_entry_overhead] =
- f->get_part_layout_info();
-
- const auto entries_per_part =
- (max_part_size - part_header_size) /
- (max_entry_size + part_entry_overhead);
-
- const auto max_entries = entries_per_part * 4 + 1;
-
- auto f2 = RCf::FIFO::open(r, ioc, fifo_id, y);
-
- std::vector fifos{&f, &f2};
-
- for (auto i = 0u; i < max_entries; ++i) {
- cb::list bl;
- *(int *)buf = i;
- bl.append(buf, sizeof(buf));
-
- auto& f = fifos[i % fifos.size()];
-
- (*f)->push(bl, y);
- }
-
- /* list all by both */
- {
- auto [result, more] = f2->list(max_entries, std::nullopt, y);
-
- ASSERT_EQ(false, more);
- ASSERT_EQ(max_entries, result.size());
- }
- auto [result, more] = f2->list(max_entries, std::nullopt, y);
- ASSERT_EQ(false, more);
- ASSERT_EQ(max_entries, result.size());
-
- for (auto i = 0u; i < max_entries; ++i) {
- auto& bl = result[i].data;
- ASSERT_EQ(i, *(int *)bl.c_str());
- }
- });
- c.run();
-}
-
-
-TEST(FIFO, TestTwoPushersTrim) {
- ba::io_context c;
- auto fifo_id = "fifo"sv;
- static constexpr auto max_part_size = 2048ull;
- static constexpr auto max_entry_size = 128ull;
-
- s::spawn(c, [&](s::yield_context y) {
- auto r = R::RADOS::Builder{}.build(c, y);
- auto pool = create_pool(r, get_temp_pool_name(), y);
- auto sg = make_scope_guard(
- [&] {
- r.delete_pool(pool, y);
- });
- R::IOContext ioc(pool);
-
- auto f1 = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt,
- std::nullopt, false, max_part_size,
- max_entry_size);
-
- char buf[max_entry_size];
- memset(buf, 0, sizeof(buf));
-
-
- auto [part_header_size, part_entry_overhead] =
- f1->get_part_layout_info();
-
- const auto entries_per_part =
- (max_part_size - part_header_size) /
- (max_entry_size + part_entry_overhead);
-
- const auto max_entries = entries_per_part * 4 + 1;
-
- auto f2 = RCf::FIFO::open(r, ioc, fifo_id, y);
-
- /* push one entry to f2 and the rest to f1 */
-
- for (auto i = 0u; i < max_entries; ++i) {
- cb::list bl;
-
- *(int *)buf = i;
- bl.append(buf, sizeof(buf));
-
- auto f = (i < 1 ? &f2 : &f1);
- (*f)->push(bl, y);
- }
-
- /* trim half by fifo1 */
- auto num = max_entries / 2;
-
- std::string marker;
- {
- auto [result, more] = f1->list(num, std::nullopt, y);
-
- ASSERT_EQ(true, more);
- ASSERT_EQ(num, result.size());
-
- for (auto i = 0u; i < num; ++i) {
- auto& bl = result[i].data;
- ASSERT_EQ(i, *(int *)bl.c_str());
- }
-
- auto& entry = result[num - 1];
- marker = entry.marker;
-
- f1->trim(marker, false, y);
-
- /* list what's left by fifo2 */
-
- }
-
- const auto left = max_entries - num;
- auto [result, more] = f2->list(left, marker, y);
- ASSERT_EQ(left, result.size());
- ASSERT_EQ(false, more);
-
- for (auto i = num; i < max_entries; ++i) {
- auto& bl = result[i - num].data;
- ASSERT_EQ(i, *(int *)bl.c_str());
- }
- });
- c.run();
-}
-
-TEST(FIFO, TestPushBatch) {
- ba::io_context c;
- auto fifo_id = "fifo"sv;
- static constexpr auto max_part_size = 2048ull;
- static constexpr auto max_entry_size = 128ull;
-
- s::spawn(c, [&](s::yield_context y) {
- auto r = R::RADOS::Builder{}.build(c, y);
- auto pool = create_pool(r, get_temp_pool_name(), y);
- auto sg = make_scope_guard(
- [&] {
- r.delete_pool(pool, y);
- });
- R::IOContext ioc(pool);
-
- auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt,
- std::nullopt, false, max_part_size,
- max_entry_size);
-
-
- char buf[max_entry_size];
- memset(buf, 0, sizeof(buf));
-
- auto [part_header_size, part_entry_overhead]
- = f->get_part_layout_info();
-
- auto entries_per_part =
- (max_part_size - part_header_size) /
- (max_entry_size + part_entry_overhead);
-
- auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */
-
- std::vector<cb::list> bufs;
-
- for (auto i = 0u; i < max_entries; ++i) {
- cb::list bl;
-
- *(int *)buf = i;
- bl.append(buf, sizeof(buf));
-
- bufs.push_back(bl);
- }
-
- f->push(bufs, y);
-
- /* list all */
-
- auto [result, more] = f->list(max_entries, std::nullopt, y);
- ASSERT_EQ(false, more);
- ASSERT_EQ(max_entries, result.size());
-
- for (auto i = 0u; i < max_entries; ++i) {
- auto& bl = result[i].data;
- ASSERT_EQ(i, *(int *)bl.c_str());
- }
-
- auto& info = f->meta();
- ASSERT_EQ(info.head_part_num, 4);
- });
- c.run();
-}
-
-TEST(FIFO, TestTrimExclusive) {
- ba::io_context c;
- auto fifo_id = "fifo"sv;
-
- s::spawn(c, [&](s::yield_context y) mutable {
- auto r = R::RADOS::Builder{}.build(c, y);
- auto pool = create_pool(r, get_temp_pool_name(), y);
- auto sg = make_scope_guard(
- [&] {
- r.delete_pool(pool, y);
- });
- R::IOContext ioc(pool);
- auto f = RCf::FIFO::create(r, ioc, fifo_id, y);
- static constexpr auto max_entries = 10u;
- for (uint32_t i = 0; i < max_entries; ++i) {
- cb::list bl;
- encode(i, bl);
- f->push(bl, y);
- }
-
- {
- auto [result, more] = f->list(1, std::nullopt, y);
- auto [val, marker] =
- decode_entry<std::uint32_t>(result.front());
- ASSERT_EQ(0, val);
- f->trim(marker, true, y);
- }
- {
- auto [result, more] = f->list(max_entries, std::nullopt, y);
- auto [val, marker] = decode_entry<std::uint32_t>(result.front());
- ASSERT_EQ(0, val);
- f->trim(result[4].marker, true, y);
- }
- {
- auto [result, more] = f->list(max_entries, std::nullopt, y);
- auto [val, marker] =
- decode_entry<std::uint32_t>(result.front());
- ASSERT_EQ(4, val);
- f->trim(result.back().marker, true, y);
- }
- {
- auto [result, more] = f->list(max_entries, std::nullopt, y);
- auto [val, marker] =
- decode_entry<std::uint32_t>(result.front());
- ASSERT_EQ(result.size(), 1);
- ASSERT_EQ(max_entries - 1, val);
- }
- });
- c.run();
-}
{
auto c = R::Rados::aio_create_completion();
f->get_head_info(&dp, [&](int r, RCf::part_info&& p) {
- ASSERT_TRUE(p.tag.empty());
ASSERT_EQ(0, p.magic);
ASSERT_EQ(0, p.min_ofs);
ASSERT_EQ(0, p.last_ofs);