From: Adam C. Emerson Date: Tue, 4 Feb 2020 16:33:48 +0000 (-0500) Subject: cls/fifo: Use neorados interface X-Git-Tag: v17.0.0~1186^2~11 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=6c950a2c2dbbce84eb287993be95972320a3f695;p=ceph.git cls/fifo: Use neorados interface Rewrite around the asynchronous ASIO-based RADOS interface, and include support for calls from multiple threads. `allocate_unique` pattern contributed by Casey Bodley Co-authored-by: Casey Bodley Signed-off-by: Adam C. Emerson --- diff --git a/src/cls/CMakeLists.txt b/src/cls/CMakeLists.txt index a100dd04b68e3..a18b6bf3ea76a 100644 --- a/src/cls/CMakeLists.txt +++ b/src/cls/CMakeLists.txt @@ -329,7 +329,7 @@ add_library(cls_2pc_queue_client STATIC ${cls_2pc_queue_client_srcs}) add_subdirectory(cmpomap) # cls_fifo -set(cls_fifo_srcs fifo/cls_fifo.cc fifo/cls_fifo_types.cc) +set(cls_fifo_srcs fifo/cls_fifo.cc) add_library(cls_fifo SHARED ${cls_fifo_srcs}) set_target_properties(cls_fifo PROPERTIES VERSION "1.0.0" @@ -337,9 +337,3 @@ set_target_properties(cls_fifo PROPERTIES INSTALL_RPATH "" CXX_VISIBILITY_PRESET hidden) install(TARGETS cls_fifo DESTINATION ${cls_dir}) - -set(cls_fifo_client_srcs - fifo/cls_fifo_client.cc - fifo/cls_fifo_types.cc - fifo/cls_fifo_ops.cc) -add_library(cls_fifo_client STATIC ${cls_fifo_client_srcs}) diff --git a/src/cls/fifo/cls_fifo.cc b/src/cls/fifo/cls_fifo.cc index 553ad30357398..baa94dc8eb830 100644 --- a/src/cls/fifo/cls_fifo.cc +++ b/src/cls/fifo/cls_fifo.cc @@ -8,26 +8,32 @@ * */ -#include +#include +#include +#include + +#undef FMT_HEADER_ONLY +#define FMT_HEADER_ONLY 1 +#include + +#include "include/buffer.h" +#include "include/types.h" #include "objclass/objclass.h" #include "cls/fifo/cls_fifo_ops.h" #include "cls/fifo/cls_fifo_types.h" - -using namespace rados::cls::fifo; - - CLS_VER(1,0) CLS_NAME(fifo) +namespace rados::cls::fifo { -#define CLS_FIFO_MAX_PART_HEADER_SIZE 512 +static constexpr auto CLS_FIFO_MAX_PART_HEADER_SIZE = 512; -static uint32_t part_entry_overhead; +static std::uint32_t part_entry_overhead; -struct cls_fifo_entry_header_pre { +struct entry_header_pre { __le64 magic; __le64 pre_size; __le64 header_size; @@ -36,66 +42,64 @@ struct cls_fifo_entry_header_pre { __le32 reserved; } __attribute__ ((packed)); -struct cls_fifo_entry_header { +struct entry_header { ceph::real_time mtime; - void encode(bufferlist &bl) const { + void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); encode(mtime, bl); ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) { + void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); decode(mtime, bl); DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(cls_fifo_entry_header) +WRITE_CLASS_ENCODER(entry_header) +namespace { -static string new_oid_prefix(string id, std::optional& val) +std::string new_oid_prefix(std::string id, std::optional& val) { + static constexpr auto PREFIX_RND_SIZE = 12; if (val) { return *val; } -#define PREFIX_RND_SIZE 12 - char buf[PREFIX_RND_SIZE + 1]; buf[PREFIX_RND_SIZE] = 0; cls_gen_rand_base64(buf, sizeof(buf) - 1); - char s[id.size() + 1 + sizeof(buf) + 16]; - snprintf(s, sizeof(s), "%s.%s", id.c_str(), buf); - return s; + return fmt::format("{}.{}", id, buf); } -static int write_header(cls_method_context_t hctx, - fifo_info_t& header, - bool inc_ver = true) +int write_header(cls_method_context_t hctx, + info& header, + bool inc_ver = true) { - if (header.objv.instance.empty()) { -#define HEADER_INSTANCE_SIZE 16 - char buf[HEADER_INSTANCE_SIZE + 1]; - buf[HEADER_INSTANCE_SIZE] = 0; - cls_gen_rand_base64(buf, sizeof(buf) - 1); - - header.objv.instance = buf; + static constexpr auto HEADER_INSTANCE_SIZE = 16; + if (header.version.instance.empty()) { + char buf[HEADER_INSTANCE_SIZE + 1]; + buf[HEADER_INSTANCE_SIZE] = 0; + cls_gen_rand_base64(buf, sizeof(buf) - 1); + header.version.instance = buf; } if (inc_ver) { - ++header.objv.ver; + ++header.version.ver; } - bufferlist bl; + ceph::buffer::list bl; encode(header, bl); return cls_cxx_write_full(hctx, &bl); } -static int read_part_header(cls_method_context_t hctx, - fifo_part_header_t *part_header) +int read_part_header(cls_method_context_t hctx, + part_header* part_header) { - bufferlist bl; - int r = cls_cxx_read2(hctx, 0, CLS_FIFO_MAX_PART_HEADER_SIZE, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + ceph::buffer::list bl; + int r = cls_cxx_read2(hctx, 0, CLS_FIFO_MAX_PART_HEADER_SIZE, &bl, + CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); if (r < 0) { CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r); return r; @@ -104,34 +108,40 @@ static int read_part_header(cls_method_context_t hctx, auto iter = bl.cbegin(); try { decode(*part_header, iter); - } catch (buffer::error& err) { + } catch (const ceph::buffer::error& err) { CLS_ERR("ERROR: %s(): failed decoding part header", __func__); return -EIO; } - CLS_LOG(20, "%s():%d read part_header:\n" - "\ttag=%s\n" - "\tmagic=0x%llx\n" - "\tmin_ofs=%lld\n" - "\tmax_ofs=%lld\n" - "\tmin_index=%lld\n" - "\tmax_index=%lld\n", - __func__, __LINE__, - part_header->tag.c_str(), - (long long)part_header->magic, - (long long)part_header->min_ofs, - (long long)part_header->max_ofs, - (long long)part_header->min_index, - (long long)part_header->max_index); + using ceph::operator <<; + std::ostringstream ss; + ss << part_header->max_time; + CLS_LOG(10, "%s():%d read part_header:\n" + "\ttag=%s\n" + "\tmagic=0x%" PRIx64 "\n" + "\tmin_ofs=%" PRId64 "\n" + "\tlast_ofs=%" PRId64 "\n" + "\tnext_ofs=%" PRId64 "\n" + "\tmin_index=%" PRId64 "\n" + "\tmax_index=%" PRId64 "\n" + "\tmax_time=%s\n", + __func__, __LINE__, + part_header->tag.c_str(), + part_header->magic, + part_header->min_ofs, + part_header->last_ofs, + part_header->next_ofs, + part_header->min_index, + part_header->max_index, + ss.str().c_str()); return 0; - } -static int write_part_header(cls_method_context_t hctx, - fifo_part_header_t& part_header) +int write_part_header(cls_method_context_t hctx, + part_header& part_header) { - bufferlist bl; + ceph::buffer::list bl; encode(part_header, bl); if (bl.length() > CLS_FIFO_MAX_PART_HEADER_SIZE) { @@ -140,7 +150,7 @@ static int write_part_header(cls_method_context_t hctx, } int r = cls_cxx_write2(hctx, 0, bl.length(), - &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); if (r < 0) { CLS_LOG(10, "%s(): failed to write part header: r=%d", __func__, r); @@ -150,11 +160,11 @@ static int write_part_header(cls_method_context_t hctx, return 0; } -static int read_header(cls_method_context_t hctx, - std::optional objv, - fifo_info_t *info) +int read_header(cls_method_context_t hctx, + std::optional objv, + info* info) { - uint64_t size; + std::uint64_t size; int r = cls_cxx_stat2(hctx, &size, nullptr); if (r < 0) { @@ -162,47 +172,64 @@ static int read_header(cls_method_context_t hctx, return r; } - bufferlist bl; + ceph::buffer::list bl; r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); if (r < 0) { CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r); return r; } + if (r == 0) { + CLS_ERR("ERROR: %s(): Zero length object, returning ENODATA", __func__); + return -ENODATA; + } + try { auto iter = bl.cbegin(); decode(*info, iter); - } catch (buffer::error& err) { + } catch (const ceph::buffer::error& err) { CLS_ERR("ERROR: %s(): failed decoding header", __func__); return -EIO; } - if (objv && - !(info->objv == *objv)) { - string s1 = info->objv.to_str(); - string s2 = objv->to_str(); - CLS_LOG(10, "%s(): version mismatch (header=%s, req=%s), cancelled operation", __func__, s1.c_str(), s2.c_str()); + if (objv && !(info->version== *objv)) { + auto s1 = info->version.to_str(); + auto s2 = objv->to_str(); + CLS_LOG(10, "%s(): version mismatch (header=%s, req=%s), canceled operation", + __func__, s1.c_str(), s2.c_str()); return -ECANCELED; } return 0; } -static int fifo_meta_create_op(cls_method_context_t hctx, - bufferlist *in, bufferlist *out) +int create_meta(cls_method_context_t hctx, + ceph::buffer::list* in, ceph::buffer::list* out) { - CLS_LOG(20, "%s", __func__); + CLS_LOG(10, "%s", __func__); - cls_fifo_meta_create_op op; + op::create_meta op; try { auto iter = in->cbegin(); decode(op, iter); - } catch (const buffer::error &err) { + } catch (const ceph::buffer::error& err) { CLS_ERR("ERROR: %s(): failed to decode request", __func__); return -EINVAL; } - uint64_t size; + if (op.id.empty()) { + CLS_LOG(10, "%s(): ID cannot be empty", __func__); + return -EINVAL; + } + + if (op.max_part_size == 0 || + op.max_entry_size == 0 || + op.max_entry_size > op.max_part_size) { + CLS_ERR("ERROR: %s(): invalid dimensions.", __func__); + return -EINVAL; + } + + std::uint64_t size; int r = cls_cxx_stat2(hctx, &size, nullptr); if (r < 0 && r != -ENOENT) { @@ -215,18 +242,18 @@ static int fifo_meta_create_op(cls_method_context_t hctx, } if (r == 0) { - bufferlist bl; + ceph::buffer::list bl; r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); if (r < 0) { CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r); return r; } - fifo_info_t header; + info header; try { auto iter = bl.cbegin(); decode(header, iter); - } catch (buffer::error& err) { + } catch (const ceph::buffer::error& err) { CLS_ERR("ERROR: %s(): failed decoding header", __func__); return -EIO; } @@ -234,32 +261,33 @@ static int fifo_meta_create_op(cls_method_context_t hctx, if (!(header.id == op.id && (!op.oid_prefix || header.oid_prefix == *op.oid_prefix) && - (!op.objv || - header.objv == *op.objv))) { - CLS_LOG(10, "%s(): failed to re-create existing queue with different params", __func__); + (!op.version || + header.version == *op.version))) { + CLS_LOG(10, "%s(): failed to re-create existing queue " + "with different params", __func__); return -EEXIST; } return 0; /* already exists */ } - fifo_info_t header; - + info header; + header.id = op.id; - if (op.objv) { - header.objv = *op.objv; + if (op.version) { + header.version = *op.version; } else { -#define DEFAULT_INSTANCE_SIZE 16 + static constexpr auto DEFAULT_INSTANCE_SIZE = 16; char buf[DEFAULT_INSTANCE_SIZE + 1]; cls_gen_rand_base64(buf, sizeof(buf)); buf[DEFAULT_INSTANCE_SIZE] = '\0'; - header.objv.instance = buf; - header.objv.ver = 1; + header.version.instance = buf; + header.version.ver = 1; } header.oid_prefix = new_oid_prefix(op.id, op.oid_prefix); - header.data_params.max_part_size = op.max_part_size; - header.data_params.max_entry_size = op.max_entry_size; - header.data_params.full_size_threshold = op.max_part_size - op.max_entry_size - part_entry_overhead; + header.params.max_part_size = op.max_part_size; + header.params.max_entry_size = op.max_entry_size; + header.params.full_size_threshold = op.max_part_size - op.max_entry_size - part_entry_overhead; r = write_header(hctx, header, false); if (r < 0) { @@ -270,41 +298,46 @@ static int fifo_meta_create_op(cls_method_context_t hctx, return 0; } -static int fifo_meta_update_op(cls_method_context_t hctx, - bufferlist *in, bufferlist *out) +int update_meta(cls_method_context_t hctx, ceph::buffer::list* in, + ceph::buffer::list* out) { - CLS_LOG(20, "%s", __func__); + CLS_LOG(10, "%s", __func__); - cls_fifo_meta_update_op op; + op::update_meta op; try { auto iter = in->cbegin(); decode(op, iter); - } catch (const buffer::error &err) { + } catch (const ceph::buffer::error& err) { CLS_ERR("ERROR: %s(): failed to decode request", __func__); return -EINVAL; } - fifo_info_t header; - - int r = read_header(hctx, op.objv, &header); - if (r < 0) { - return r; + if (op.version.empty()) { + CLS_LOG(10, "%s(): no version supplied", __func__); + return -EINVAL; } - string err; + info header; - r = header.apply_update(op.tail_part_num, - op.head_part_num, - op.min_push_part_num, - op.max_push_part_num, - op.journal_entries_add, - op.journal_entries_rm, - &err); + int r = read_header(hctx, op.version, &header); if (r < 0) { - CLS_LOG(10, "%s(): %s", __func__, err.c_str()); return r; } + auto err = header.apply_update(fifo::update() + .tail_part_num(op.tail_part_num) + .head_part_num(op.head_part_num) + .min_push_part_num(op.min_push_part_num) + .max_push_part_num(op.max_push_part_num) + .journal_entries_add( + std::move(op.journal_entries_add)) + .journal_entries_rm( + std::move(op.journal_entries_rm))); + if (err) { + CLS_LOG(10, "%s(): %s", __func__, err->c_str()); + return -EINVAL; + } + r = write_header(hctx, header); if (r < 0) { CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r); @@ -314,22 +347,22 @@ static int fifo_meta_update_op(cls_method_context_t hctx, return 0; } -static int fifo_meta_get_op(cls_method_context_t hctx, - bufferlist *in, bufferlist *out) +int get_meta(cls_method_context_t hctx, ceph::buffer::list* in, + ceph::buffer::list* out) { - CLS_LOG(20, "%s", __func__); + CLS_LOG(10, "%s", __func__); - cls_fifo_meta_get_op op; + op::get_meta op; try { auto iter = in->cbegin(); decode(op, iter); - } catch (const buffer::error &err) { + } catch (const ceph::buffer::error &err) { CLS_ERR("ERROR: %s(): failed to decode request", __func__); return -EINVAL; } - cls_fifo_meta_get_op_reply reply; - int r = read_header(hctx, op.objv, &reply.info); + op::get_meta_reply reply; + int r = read_header(hctx, op.version, &reply.info); if (r < 0) { return r; } @@ -342,21 +375,26 @@ static int fifo_meta_get_op(cls_method_context_t hctx, return 0; } -static int fifo_part_init_op(cls_method_context_t hctx, - bufferlist *in, bufferlist *out) +int init_part(cls_method_context_t hctx, ceph::buffer::list* in, + ceph::buffer::list *out) { - CLS_LOG(20, "%s", __func__); + CLS_LOG(10, "%s", __func__); - cls_fifo_part_init_op op; + op::init_part op; try { auto iter = in->cbegin(); decode(op, iter); - } catch (const buffer::error &err) { + } catch (const ceph::buffer::error &err) { CLS_ERR("ERROR: %s(): failed to decode request", __func__); return -EINVAL; } - uint64_t size; + std::uint64_t size; + + if (op.tag.empty()) { + CLS_LOG(10, "%s(): tag required", __func__); + return -EINVAL; + } int r = cls_cxx_stat2(hctx, &size, nullptr); if (r < 0 && r != -ENOENT) { @@ -364,7 +402,7 @@ static int fifo_part_init_op(cls_method_context_t hctx, return r; } if (r == 0 && size > 0) { - fifo_part_header_t part_header; + part_header part_header; r = read_part_header(hctx, &part_header); if (r < 0) { CLS_LOG(10, "%s(): failed to read part header", __func__); @@ -372,23 +410,27 @@ static int fifo_part_init_op(cls_method_context_t hctx, } if (!(part_header.tag == op.tag && - part_header.params == op.data_params)) { - CLS_LOG(10, "%s(): failed to re-create existing part with different params", __func__); + part_header.params == op.params)) { + CLS_LOG(10, "%s(): failed to re-create existing part with different " + "params", __func__); return -EEXIST; } return 0; /* already exists */ } - fifo_part_header_t part_header; - + part_header part_header; + part_header.tag = op.tag; - part_header.params = op.data_params; + part_header.params = op.params; part_header.min_ofs = CLS_FIFO_MAX_PART_HEADER_SIZE; - part_header.max_ofs = part_header.min_ofs; + part_header.last_ofs = 0; + part_header.next_ofs = part_header.min_ofs; + part_header.max_time = ceph::real_clock::now(); - cls_gen_random_bytes((char *)&part_header.magic, sizeof(part_header.magic)); + cls_gen_random_bytes(reinterpret_cast(&part_header.magic), + sizeof(part_header.magic)); r = write_part_header(hctx, part_header); if (r < 0) { @@ -399,26 +441,31 @@ static int fifo_part_init_op(cls_method_context_t hctx, return 0; } -static bool full_part(const fifo_part_header_t& part_header) +bool full_part(const part_header& part_header) { - return (part_header.max_ofs > part_header.params.full_size_threshold); + return (part_header.next_ofs > part_header.params.full_size_threshold); } -static int fifo_part_push_op(cls_method_context_t hctx, - bufferlist *in, bufferlist *out) +int push_part(cls_method_context_t hctx, ceph::buffer::list* in, + ceph::buffer::list* out) { - CLS_LOG(20, "%s", __func__); + CLS_LOG(10, "%s", __func__); - cls_fifo_part_push_op op; + op::push_part op; try { auto iter = in->cbegin(); decode(op, iter); - } catch (const buffer::error &err) { + } catch (const ceph::buffer::error& err) { CLS_ERR("ERROR: %s(): failed to decode request", __func__); return -EINVAL; } - fifo_part_header_t part_header; + if (op.tag.empty()) { + CLS_LOG(10, "%s(): tag required", __func__); + return -EINVAL; + } + + part_header part_header; int r = read_part_header(hctx, &part_header); if (r < 0) { CLS_LOG(10, "%s(): failed to read part header", __func__); @@ -430,9 +477,10 @@ static int fifo_part_push_op(cls_method_context_t hctx, return -EINVAL; } - uint64_t effective_len = op.total_len + op.data_bufs.size() * part_entry_overhead; + std::uint64_t effective_len = op.total_len + op.data_bufs.size() * + part_entry_overhead; - if (effective_len > part_header.params.max_entry_size + part_entry_overhead) { + if (effective_len > part_header.params.max_part_size) { return -EINVAL; } @@ -440,57 +488,69 @@ static int fifo_part_push_op(cls_method_context_t hctx, return -ERANGE; } - struct cls_fifo_entry_header entry_header; - entry_header.mtime = real_clock::now(); - - bufferlist entry_header_bl; + auto now = ceph::real_clock::now(); + struct entry_header entry_header = { now }; + ceph::buffer::list entry_header_bl; encode(entry_header, entry_header_bl); auto max_index = part_header.max_index; - auto ofs = part_header.max_ofs; + const auto write_ofs = part_header.next_ofs; + auto ofs = part_header.next_ofs; - cls_fifo_entry_header_pre pre_header; + entry_header_pre pre_header; pre_header.magic = part_header.magic; pre_header.pre_size = sizeof(pre_header); pre_header.reserved = 0; - uint64_t total_data = 0; - + std::uint64_t total_data = 0; for (auto& data : op.data_bufs) { total_data += data.length(); + } + if (total_data != op.total_len) { + CLS_LOG(10, "%s(): length mismatch: op.total_len=%" PRId64 + " total data received=%" PRId64, + __func__, op.total_len, total_data); + return -EINVAL; + } + + + int entries_pushed = 0; + ceph::buffer::list all_data; + for (auto& data : op.data_bufs) { + if (full_part(part_header)) + break; pre_header.header_size = entry_header_bl.length(); pre_header.data_size = data.length(); pre_header.index = max_index; - bufferptr pre((char *)&pre_header, sizeof(pre_header)); - bufferlist all_data; + bufferptr pre(reinterpret_cast(&pre_header), sizeof(pre_header)); + auto entry_write_len = pre.length() + entry_header_bl.length() + data.length(); all_data.append(pre); all_data.append(entry_header_bl); all_data.claim_append(data); - auto write_len = all_data.length(); - - r = cls_cxx_write2(hctx, ofs, write_len, - &all_data, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); - if (r < 0) { - CLS_LOG(10, "%s(): failed to write entry (ofs=%lld len=%lld): r=%d", - __func__, (long long)part_header.max_ofs, (long long)write_len, r); - return r; - } - - ofs += write_len; + part_header.last_ofs = ofs; + ofs += entry_write_len; ++max_index; + ++entries_pushed; + part_header.max_index = max_index; + part_header.next_ofs = ofs; } + part_header.max_time = now; - if (total_data != op.total_len) { - CLS_LOG(10, "%s(): length mismatch: op.total_len=%lld total data received=%lld", - __func__, (long long)op.total_len, (long long)total_data); - return -EINVAL; + auto write_len = all_data.length(); + + r = cls_cxx_write2(hctx, write_ofs, write_len, + &all_data, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + + if (r < 0) { + CLS_LOG(10,"%s(): failed to write entries (ofs=%" PRIu64 + " len=%u): r=%d", __func__, write_ofs, + write_len, r); + return r; } - part_header.max_index = max_index; - part_header.max_ofs = ofs; r = write_part_header(hctx, part_header); if (r < 0) { @@ -498,56 +558,60 @@ static int fifo_part_push_op(cls_method_context_t hctx, return r; } - return 0; + if (entries_pushed == 0) { + CLS_LOG(0, "%s(): pushed no entries? Can't happen!", __func__); + return -EFAULT; + } + + return entries_pushed; } class EntryReader { - static constexpr uint64_t prefetch_len = (128 * 1024); + static constexpr std::uint64_t prefetch_len = (128 * 1024); cls_method_context_t hctx; - fifo_part_header_t& part_header; + const fifo::part_header& part_header; - uint64_t ofs; - bufferlist data; + std::uint64_t ofs; + ceph::buffer::list data; - int fetch(uint64_t num_bytes); - int read(uint64_t num_bytes, bufferlist *pbl); - int peek(uint64_t num_bytes, char *dest); - int seek(uint64_t num_bytes); + int fetch(std::uint64_t num_bytes); + int read(std::uint64_t num_bytes, ceph::buffer::list* pbl); + int peek(std::uint64_t num_bytes, char *dest); + int seek(std::uint64_t num_bytes); public: - EntryReader(cls_method_context_t _hctx, - fifo_part_header_t& _part_header, - uint64_t _ofs) : hctx(_hctx), - part_header(_part_header), - ofs(_ofs) { - if (ofs < part_header.min_ofs) { - ofs = part_header.min_ofs; - } - } - - uint64_t get_ofs() const { + EntryReader(cls_method_context_t hctx, + const fifo::part_header& part_header, + uint64_t ofs) : hctx(hctx), + part_header(part_header), + ofs(ofs < part_header.min_ofs ? + part_header.min_ofs : + ofs) {} + + std::uint64_t get_ofs() const { return ofs; } bool end() const { - return (ofs >= part_header.max_ofs); + return (ofs >= part_header.next_ofs); } - int peek_pre_header(cls_fifo_entry_header_pre *pre_header); - int get_next_entry(bufferlist *pbl, - uint64_t *pofs, - ceph::real_time *pmtime); + int peek_pre_header(entry_header_pre* pre_header); + int get_next_entry(ceph::buffer::list* pbl, + std::uint64_t* pofs, + ceph::real_time* pmtime); }; -int EntryReader::fetch(uint64_t num_bytes) +int EntryReader::fetch(std::uint64_t num_bytes) { - CLS_LOG(20, "%s(): fetch %d bytes, ofs=%d data.length()=%d", __func__, (int)num_bytes, (int)ofs, (int)data.length()); + CLS_LOG(10, "%s(): fetch %d bytes, ofs=%d data.length()=%d", __func__, (int)num_bytes, (int)ofs, (int)data.length()); if (data.length() < num_bytes) { - bufferlist bl; - CLS_LOG(20, "%s(): reading %d bytes at ofs=%d", __func__, (int)prefetch_len, (int)ofs + data.length()); + ceph::buffer::list bl; + CLS_LOG(10, "%s(): reading % " PRId64 " bytes at ofs=%" PRId64, __func__, + prefetch_len, ofs + data.length()); int r = cls_cxx_read2(hctx, ofs + data.length(), prefetch_len, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); if (r < 0) { CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r); @@ -556,15 +620,16 @@ int EntryReader::fetch(uint64_t num_bytes) data.claim_append(bl); } - if ((unsigned)num_bytes > data.length()) { - CLS_LOG(20, "%s(): requested %lld bytes, but only %lld were available", __func__, (long long)num_bytes, (long long)data.length()); + if (static_cast(num_bytes) > data.length()) { + CLS_LOG(10, "%s(): requested %" PRId64 " bytes, but only " + "%u were available", __func__, num_bytes, data.length()); return -ERANGE; } return 0; } -int EntryReader::read(uint64_t num_bytes, bufferlist *pbl) +int EntryReader::read(std::uint64_t num_bytes, ceph::buffer::list* pbl) { int r = fetch(num_bytes); if (r < 0) { @@ -577,7 +642,7 @@ int EntryReader::read(uint64_t num_bytes, bufferlist *pbl) return 0; } -int EntryReader::peek(uint64_t num_bytes, char *dest) +int EntryReader::peek(std::uint64_t num_bytes, char* dest) { int r = fetch(num_bytes); if (r < 0) { @@ -589,23 +654,25 @@ int EntryReader::peek(uint64_t num_bytes, char *dest) return 0; } -int EntryReader::seek(uint64_t num_bytes) +int EntryReader::seek(std::uint64_t num_bytes) { - bufferlist bl; + ceph::buffer::list bl; - CLS_LOG(20, "%s():%d: num_bytes=%d", __func__, __LINE__, (int)num_bytes); + CLS_LOG(10, "%s():%d: num_bytes=%" PRIu64, __func__, __LINE__, num_bytes); return read(num_bytes, &bl); } -int EntryReader::peek_pre_header(cls_fifo_entry_header_pre *pre_header) +int EntryReader::peek_pre_header(entry_header_pre* pre_header) { if (end()) { return -ENOENT; } - int r = peek(sizeof(*pre_header), (char *)pre_header); + int r = peek(sizeof(*pre_header), + reinterpret_cast(pre_header)); if (r < 0) { - CLS_ERR("ERROR: %s(): peek() size=%d failed: r=%d", __func__, (int)sizeof(pre_header), r); + CLS_ERR("ERROR: %s(): peek() size=%zu failed: r=%d", __func__, + sizeof(pre_header), r); return r; } @@ -618,11 +685,11 @@ int EntryReader::peek_pre_header(cls_fifo_entry_header_pre *pre_header) } -int EntryReader::get_next_entry(bufferlist *pbl, - uint64_t *pofs, - ceph::real_time *pmtime) +int EntryReader::get_next_entry(ceph::buffer::list* pbl, + std::uint64_t* pofs, + ceph::real_time* pmtime) { - cls_fifo_entry_header_pre pre_header; + entry_header_pre pre_header; int r = peek_pre_header(&pre_header); if (r < 0) { CLS_ERR("ERROR: %s(): peek_pre_header() failed: r=%d", __func__, r); @@ -633,26 +700,27 @@ int EntryReader::get_next_entry(bufferlist *pbl, *pofs = ofs; } - CLS_LOG(20, "%s():%d: pre_header.pre_size=%d", __func__, __LINE__, (int)pre_header.pre_size); + CLS_LOG(10, "%s():%d: pre_header.pre_size=%llu", __func__, __LINE__, + pre_header.pre_size); r = seek(pre_header.pre_size); if (r < 0) { CLS_ERR("ERROR: %s(): failed to seek: r=%d", __func__, r); return r; } - bufferlist header; - CLS_LOG(20, "%s():%d: pre_header.header_size=%d", __func__, __LINE__, (int)pre_header.header_size); + ceph::buffer::list header; + CLS_LOG(10, "%s():%d: pre_header.header_size=%d", __func__, __LINE__, (int)pre_header.header_size); r = read(pre_header.header_size, &header); if (r < 0) { CLS_ERR("ERROR: %s(): failed to read entry header: r=%d", __func__, r); return r; } - cls_fifo_entry_header entry_header; + entry_header entry_header; auto iter = header.cbegin(); try { decode(entry_header, iter); - } catch (buffer::error& err) { + } catch (ceph::buffer::error& err) { CLS_ERR("%s(): failed decoding entry header", __func__); return -EIO; } @@ -678,21 +746,21 @@ int EntryReader::get_next_entry(bufferlist *pbl, return 0; } -static int fifo_part_trim_op(cls_method_context_t hctx, - bufferlist *in, bufferlist *out) +int trim_part(cls_method_context_t hctx, + ceph::buffer::list *in, ceph::buffer::list *out) { - CLS_LOG(20, "%s", __func__); + CLS_LOG(10, "%s", __func__); - cls_fifo_part_trim_op op; + op::trim_part op; try { auto iter = in->cbegin(); decode(op, iter); - } catch (const buffer::error &err) { + } catch (const ceph::buffer::error &err) { CLS_ERR("ERROR: %s(): failed to decode request", __func__); return -EINVAL; } - fifo_part_header_t part_header; + part_header part_header; int r = read_part_header(hctx, &part_header); if (r < 0) { CLS_LOG(10, "%s(): failed to read part header", __func__); @@ -709,7 +777,7 @@ static int fifo_part_trim_op(cls_method_context_t hctx, return 0; } - if (op.ofs >= part_header.max_ofs) { + if (op.ofs >= part_header.next_ofs) { if (full_part(part_header)) { /* * trim full part completely: remove object @@ -718,18 +786,18 @@ static int fifo_part_trim_op(cls_method_context_t hctx, r = cls_cxx_remove(hctx); if (r < 0) { CLS_LOG(0, "%s(): ERROR: cls_cxx_remove() returned r=%d", __func__, r); - return r; + return r; } return 0; } - - part_header.min_ofs = part_header.max_ofs; + + part_header.min_ofs = part_header.next_ofs; part_header.min_index = part_header.max_index; } else { EntryReader reader(hctx, part_header, op.ofs); - cls_fifo_entry_header_pre pre_header; + entry_header_pre pre_header; int r = reader.peek_pre_header(&pre_header); if (r < 0) { return r; @@ -737,7 +805,8 @@ static int fifo_part_trim_op(cls_method_context_t hctx, r = reader.get_next_entry(nullptr, nullptr, nullptr); if (r < 0) { - CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d", __func__, r); + CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d", + __func__, r); return r; } @@ -754,12 +823,12 @@ static int fifo_part_trim_op(cls_method_context_t hctx, return 0; } -static int fifo_part_list_op(cls_method_context_t hctx, - bufferlist *in, bufferlist *out) +int list_part(cls_method_context_t hctx, ceph::buffer::list* in, + ceph::buffer::list* out) { - CLS_LOG(20, "%s", __func__); + CLS_LOG(10, "%s", __func__); - cls_fifo_part_list_op op; + op::list_part op; try { auto iter = in->cbegin(); decode(op, iter); @@ -768,7 +837,7 @@ static int fifo_part_list_op(cls_method_context_t hctx, return -EINVAL; } - fifo_part_header_t part_header; + part_header part_header; int r = read_part_header(hctx, &part_header); if (r < 0) { CLS_LOG(10, "%s(): failed to read part header", __func__); @@ -792,22 +861,21 @@ static int fifo_part_list_op(cls_method_context_t hctx, } } - cls_fifo_part_list_op_reply reply; + op::list_part_reply reply; reply.tag = part_header.tag; -#define LIST_MAX_ENTRIES 512 - - auto max_entries = std::min(op.max_entries, (int)LIST_MAX_ENTRIES); + auto max_entries = std::min(op.max_entries, op::MAX_LIST_ENTRIES); for (int i = 0; i < max_entries && !reader.end(); ++i) { - bufferlist data; + ceph::buffer::list data; ceph::real_time mtime; - uint64_t ofs; + std::uint64_t ofs; r = reader.get_next_entry(&data, &ofs, &mtime); if (r < 0) { - CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d", __func__, r); + CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d", + __func__, r); return r; } @@ -822,21 +890,21 @@ static int fifo_part_list_op(cls_method_context_t hctx, return 0; } -static int fifo_part_get_info_op(cls_method_context_t hctx, - bufferlist *in, bufferlist *out) +int get_part_info(cls_method_context_t hctx, ceph::buffer::list *in, + ceph::buffer::list *out) { - CLS_LOG(20, "%s", __func__); + CLS_LOG(10, "%s", __func__); - cls_fifo_part_get_info_op op; + op::get_part_info op; try { auto iter = in->cbegin(); decode(op, iter); - } catch (const buffer::error &err) { + } catch (const ceph::buffer::error &err) { CLS_ERR("ERROR: %s(): failed to decode request", __func__); return -EINVAL; } - cls_fifo_part_get_info_op_reply reply; + op::get_part_info_reply reply; int r = read_part_header(hctx, &reply.header); if (r < 0) { @@ -848,60 +916,63 @@ static int fifo_part_get_info_op(cls_method_context_t hctx, return 0; } +} +} // namespace rados::cls::fifo CLS_INIT(fifo) { - CLS_LOG(20, "Loaded fifo class!"); + using namespace rados::cls::fifo; + CLS_LOG(10, "Loaded fifo class!"); cls_handle_t h_class; - cls_method_handle_t h_fifo_meta_create_op; - cls_method_handle_t h_fifo_meta_get_op; - cls_method_handle_t h_fifo_meta_update_op; - cls_method_handle_t h_fifo_part_init_op; - cls_method_handle_t h_fifo_part_push_op; - cls_method_handle_t h_fifo_part_trim_op; - cls_method_handle_t h_fifo_part_list_op; - cls_method_handle_t h_fifo_part_get_info_op; - - cls_register("fifo", &h_class); - cls_register_cxx_method(h_class, "fifo_meta_create", + cls_method_handle_t h_create_meta; + cls_method_handle_t h_get_meta; + cls_method_handle_t h_update_meta; + cls_method_handle_t h_init_part; + cls_method_handle_t h_push_part; + cls_method_handle_t h_trim_part; + cls_method_handle_t h_list_part; + cls_method_handle_t h_get_part_info; + + cls_register(op::CLASS, &h_class); + cls_register_cxx_method(h_class, op::CREATE_META, CLS_METHOD_RD | CLS_METHOD_WR, - fifo_meta_create_op, &h_fifo_meta_create_op); + create_meta, &h_create_meta); - cls_register_cxx_method(h_class, "fifo_meta_get", + cls_register_cxx_method(h_class, op::GET_META, CLS_METHOD_RD, - fifo_meta_get_op, &h_fifo_meta_get_op); + get_meta, &h_get_meta); - cls_register_cxx_method(h_class, "fifo_meta_update", + cls_register_cxx_method(h_class, op::UPDATE_META, CLS_METHOD_RD | CLS_METHOD_WR, - fifo_meta_update_op, &h_fifo_meta_update_op); + update_meta, &h_update_meta); - cls_register_cxx_method(h_class, "fifo_part_init", + cls_register_cxx_method(h_class, op::INIT_PART, CLS_METHOD_RD | CLS_METHOD_WR, - fifo_part_init_op, &h_fifo_part_init_op); + init_part, &h_init_part); - cls_register_cxx_method(h_class, "fifo_part_push", + cls_register_cxx_method(h_class, op::PUSH_PART, CLS_METHOD_RD | CLS_METHOD_WR, - fifo_part_push_op, &h_fifo_part_push_op); + push_part, &h_push_part); - cls_register_cxx_method(h_class, "fifo_part_trim", + cls_register_cxx_method(h_class, op::TRIM_PART, CLS_METHOD_RD | CLS_METHOD_WR, - fifo_part_trim_op, &h_fifo_part_trim_op); + trim_part, &h_trim_part); - cls_register_cxx_method(h_class, "fifo_part_list", + cls_register_cxx_method(h_class, op::LIST_PART, CLS_METHOD_RD, - fifo_part_list_op, &h_fifo_part_list_op); + list_part, &h_list_part); - cls_register_cxx_method(h_class, "fifo_part_get_info", + cls_register_cxx_method(h_class, op::GET_PART_INFO, CLS_METHOD_RD, - fifo_part_get_info_op, &h_fifo_part_get_info_op); + get_part_info, &h_get_part_info); /* calculate entry overhead */ - struct cls_fifo_entry_header entry_header; - bufferlist entry_header_bl; + struct entry_header entry_header; + ceph::buffer::list entry_header_bl; encode(entry_header, entry_header_bl); - part_entry_overhead = sizeof(cls_fifo_entry_header_pre) + entry_header_bl.length(); + part_entry_overhead = sizeof(entry_header_pre) + entry_header_bl.length(); return; } diff --git a/src/cls/fifo/cls_fifo_client.cc b/src/cls/fifo/cls_fifo_client.cc deleted file mode 100644 index f4888fda9b1f7..0000000000000 --- a/src/cls/fifo/cls_fifo_client.cc +++ /dev/null @@ -1,1070 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2019 Red Hat, Inc. - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "include/rados/librados.hpp" -#include "common/dout.h" - -#include "auth/Crypto.h" - -using namespace librados; - -#include "cls/fifo/cls_fifo_ops.h" -#include "cls/fifo/cls_fifo_client.h" - - -#define dout_subsys ceph_subsys_objclass - - -namespace rados { - namespace cls { - namespace fifo { - int ClsFIFO::meta_create(librados::ObjectWriteOperation *rados_op, - const string& id, - const MetaCreateParams& params) { - cls_fifo_meta_create_op op; - - auto& state = params.state; - - if (id.empty()) { - return -EINVAL; - } - - op.id = id; - op.objv = state.objv; - op.oid_prefix = state.oid_prefix; - op.max_part_size = state.max_part_size; - op.max_entry_size = state.max_entry_size; - op.exclusive = state.exclusive; - - if (op.max_part_size == 0 || - op.max_entry_size == 0 || - op.max_entry_size > op.max_part_size) { - return -EINVAL; - } - - bufferlist in; - encode(op, in); - rados_op->exec("fifo", "fifo_meta_create", in); - - return 0; - } - - int ClsFIFO::meta_get(librados::IoCtx& ioctx, - const string& oid, - const MetaGetParams& params, - fifo_info_t *result, - uint32_t *part_header_size, - uint32_t *part_entry_overhead) { - cls_fifo_meta_get_op op; - - auto& state = params.state; - - op.objv = state.objv; - - librados::ObjectReadOperation rop; - - bufferlist in; - bufferlist out; - int op_ret; - encode(op, in); - rop.exec("fifo", "fifo_meta_get", in, &out, &op_ret); - - int r = ioctx.operate(oid, &rop, nullptr); - if (r < 0) { - return r; - } - - if (op_ret < 0) { - return op_ret; - } - - cls_fifo_meta_get_op_reply reply; - auto iter = out.cbegin(); - try { - decode(reply, iter); - } catch (buffer::error& err) { - return -EIO; - } - - *result = reply.info; - - if (part_header_size) { - *part_header_size = reply.part_header_size; - } - - if (part_entry_overhead) { - *part_entry_overhead = reply.part_entry_overhead; - } - - return 0; - } - - int ClsFIFO::meta_update(librados::ObjectWriteOperation *rados_op, - const MetaUpdateParams& params) { - cls_fifo_meta_update_op op; - - auto& state = params.state; - - if (state.objv.empty()) { - return -EINVAL; - } - - op.objv = state.objv; - op.tail_part_num = state.tail_part_num; - op.head_part_num = state.head_part_num; - op.min_push_part_num = state.min_push_part_num; - op.max_push_part_num = state.max_push_part_num; - op.journal_entries_add = state.journal_entries_add; - op.journal_entries_rm = state.journal_entries_rm; - - bufferlist in; - encode(op, in); - rados_op->exec("fifo", "fifo_meta_update", in); - - return 0; - } - - int ClsFIFO::part_init(librados::ObjectWriteOperation *rados_op, - const PartInitParams& params) { - cls_fifo_part_init_op op; - - auto& state = params.state; - - if (state.tag.empty()) { - return -EINVAL; - } - - op.tag = state.tag; - op.data_params = state.data_params; - - bufferlist in; - encode(op, in); - rados_op->exec("fifo", "fifo_part_init", in); - - return 0; - } - - int ClsFIFO::push_part(librados::ObjectWriteOperation *rados_op, - const PushPartParams& params) { - cls_fifo_part_push_op op; - - auto& state = params.state; - - if (state.tag.empty()) { - return -EINVAL; - } - - op.tag = state.tag; - op.data_bufs = state.data_bufs; - op.total_len = state.total_len; - - bufferlist in; - encode(op, in); - rados_op->exec("fifo", "fifo_part_push", in); - - return 0; - } - - int ClsFIFO::trim_part(librados::ObjectWriteOperation *rados_op, - const TrimPartParams& params) { - cls_fifo_part_trim_op op; - - auto& state = params.state; - - op.tag = state.tag; - op.ofs = state.ofs; - - bufferlist in; - encode(op, in); - rados_op->exec("fifo", "fifo_part_trim", in); - - return 0; - } - - int ClsFIFO::list_part(librados::IoCtx& ioctx, - const string& oid, - const ListPartParams& params, - std::vector *pentries, - bool *more, - bool *full_part, - string *ptag) - { - cls_fifo_part_list_op op; - - auto& state = params.state; - - op.tag = state.tag; - op.ofs = state.ofs; - op.max_entries = state.max_entries; - - librados::ObjectReadOperation rop; - - bufferlist in; - bufferlist out; - int op_ret; - encode(op, in); - rop.exec("fifo", "fifo_part_list", in, &out, &op_ret); - - int r = ioctx.operate(oid, &rop, nullptr); - if (r < 0) { - return r; - } - - if (op_ret < 0) { - return op_ret; - } - - cls_fifo_part_list_op_reply reply; - auto iter = out.cbegin(); - try { - decode(reply, iter); - } catch (buffer::error& err) { - return -EIO; - } - - if (pentries) { - *pentries = std::move(reply.entries); - } - - if (more) { - *more = reply.more; - } - - if (full_part) { - *full_part = reply.full_part; - } - - if (ptag) { - *ptag = reply.tag; - } - - return 0; - } - - int ClsFIFO::get_part_info(librados::IoCtx& ioctx, - const string& oid, - rados::cls::fifo::fifo_part_header_t *header) - { - cls_fifo_part_get_info_op op; - - librados::ObjectReadOperation rop; - - bufferlist in; - bufferlist out; - int op_ret; - encode(op, in); - rop.exec("fifo", "fifo_part_get_info", in, &out, &op_ret); - - int r = ioctx.operate(oid, &rop, nullptr); - if (r < 0) { - return r; - } - - if (op_ret < 0) { - return op_ret; - } - - cls_fifo_part_get_info_op_reply reply; - auto iter = out.cbegin(); - try { - decode(reply, iter); - } catch (buffer::error& err) { - return -EIO; - } - - if (header) { - *header = std::move(reply.header); - } - - return 0; - } - - string FIFO::craft_marker(int64_t part_num, - uint64_t part_ofs) - { - char buf[64]; - snprintf(buf, sizeof(buf), "%lld:%lld", (long long)part_num, (long long)part_ofs); - return string(buf); - } - - bool FIFO::parse_marker(const string& marker, - int64_t *part_num, - uint64_t *part_ofs) - { - if (marker.empty()) { - *part_num = meta_info.tail_part_num; - *part_ofs = 0; - return true; - } - - auto pos = marker.find(':'); - if (pos == string::npos) { - return false; - } - - auto first = marker.substr(0, pos); - auto second = marker.substr(pos + 1); - - string err; - - *part_num = (int64_t)strict_strtoll(first.c_str(), 10, &err); - if (!err.empty()) { - return false; - } - - *part_ofs = (uint64_t)strict_strtoll(second.c_str(), 10, &err); - if (!err.empty()) { - return false; - } - - return true; - } - - int FIFO::init_ioctx(librados::Rados *rados, - const string& pool, - std::optional pool_ns) - { - _ioctx.emplace(); - int r = rados->ioctx_create(pool.c_str(), *_ioctx); - if (r < 0) { - return r; - } - - if (pool_ns && !pool_ns->empty()) { - _ioctx->set_namespace(*pool_ns); - } - - ioctx = &(*_ioctx); - - return 0; - } - - int ClsFIFO::MetaUpdateParams::apply_update(CephContext *cct, - fifo_info_t *info) - { - string err; - - int r = info->apply_update(state.tail_part_num, - state.head_part_num, - state.min_push_part_num, - state.max_push_part_num, - state.journal_entries_add, - state.journal_entries_rm, - &err); - if (r < 0) { - ldout(cct, 0) << __func__ << "(): ERROR: " << err << dendl; - return r; - } - - ++info->objv.ver; - - return 0; - } - - int FIFO::update_meta(ClsFIFO::MetaUpdateParams& update_params, - bool *canceled) - { - update_params.objv(meta_info.objv); - - librados::ObjectWriteOperation wop; - int r = ClsFIFO::meta_update(&wop, update_params); - if (r < 0) { - return r; - } - - r = ioctx->operate(meta_oid, &wop); - if (r < 0 && r != -ECANCELED) { - return r; - } - - *canceled = (r == -ECANCELED); - - if (!*canceled) { - r = update_params.apply_update(cct, &meta_info); - if (r < 0) { /* should really not happen, - but if it does, let's treat it as if race was detected */ - *canceled = true; - } - } - - if (*canceled) { - r = do_read_meta(); - } - if (r < 0) { - return r; - } - - return 0; - } - - int FIFO::do_read_meta(std::optional objv) - { - ClsFIFO::MetaGetParams get_params; - if (objv) { - get_params.objv(*objv); - } - int r = ClsFIFO::meta_get(*ioctx, - meta_oid, - get_params, - &meta_info, - &part_header_size, - &part_entry_overhead); - if (r < 0) { - return r; - } - - return 0; - } - - int FIFO::create_part(int64_t part_num, const string& tag, - int64_t& max_part_num) { - librados::ObjectWriteOperation op; - - op.create(true); /* exclusive */ - int r = ClsFIFO::part_init(&op, - ClsFIFO::PartInitParams() - .tag(tag) - .data_params(meta_info.data_params)); - if (r < 0) { - return r; - } - - r = ioctx->operate(meta_info.part_oid(part_num), &op); - if (r < 0) { - return r; - } - - if (part_num > max_part_num) { - max_part_num = part_num; - } - - return 0; - } - - int FIFO::remove_part(int64_t part_num, const string& tag, - int64_t& tail_part_num) { - librados::ObjectWriteOperation op; - op.remove(); - int r = ioctx->operate(meta_info.part_oid(part_num), &op); - if (r == -ENOENT) { - r = 0; - } - if (r < 0) { - return r; - } - - if (part_num >= tail_part_num) { - tail_part_num = part_num + 1; - } - - return 0; - } - - int FIFO::process_journal_entry(const fifo_journal_entry_t& entry, - int64_t& tail_part_num, - int64_t& head_part_num, - int64_t& max_part_num) - { - - switch (entry.op) { - case fifo_journal_entry_t::Op::OP_CREATE: - return create_part(entry.part_num, entry.part_tag, max_part_num); - case fifo_journal_entry_t::Op::OP_SET_HEAD: - if (entry.part_num > head_part_num) { - head_part_num = entry.part_num; - } - return 0; - case fifo_journal_entry_t::Op::OP_REMOVE: - return remove_part(entry.part_num, entry.part_tag, tail_part_num); - default: - /* nothing to do */ - break; - } - - return -EIO; - } - - int FIFO::process_journal_entries(vector *processed, - int64_t& tail_part_num, - int64_t& head_part_num, - int64_t& max_part_num) - { - for (auto& iter : meta_info.journal) { - auto& entry = iter.second; - int r = process_journal_entry(entry, tail_part_num, head_part_num, max_part_num); - if (r < 0) { - ldout(cct, 10) << __func__ << "(): ERROR: failed processing journal entry for part=" << entry.part_num << dendl; - } else { - processed->push_back(entry); - } - } - - return 0; - } - - int FIFO::process_journal() - { - vector processed; - - int64_t new_tail = meta_info.tail_part_num; - int64_t new_head = meta_info.head_part_num; - int64_t new_max = meta_info.max_push_part_num; - - int r = process_journal_entries(&processed, new_tail, new_head, new_max); - if (r < 0) { - return r; - } - - if (processed.empty()) { - return 0; - } - -#define RACE_RETRY 10 - - int i; - - for (i = 0; i < RACE_RETRY; ++i) { - bool canceled; - - std::optional tail_part_num; - std::optional head_part_num; - std::optional max_part_num; - - if (new_tail > meta_info.tail_part_num) { - tail_part_num = new_tail; - } - - if (new_head > meta_info.head_part_num) { - head_part_num = new_head; - } - - if (new_max > meta_info.max_push_part_num) { - max_part_num = new_max; - } - - if (processed.empty() && - !tail_part_num && - !max_part_num) { - /* nothing to update anymore */ - break; - } - - r = update_meta(ClsFIFO::MetaUpdateParams() - .journal_entries_rm(processed) - .tail_part_num(tail_part_num) - .head_part_num(head_part_num) - .max_push_part_num(max_part_num), - &canceled); - if (r < 0) { - return r; - } - - if (canceled) { - vector new_processed; - - for (auto& e : processed) { - auto jiter = meta_info.journal.find(e.part_num); - if (jiter == meta_info.journal.end() || /* journal entry was already processed */ - !(jiter->second == e)) { - continue; - } - - new_processed.push_back(e); - } - processed = std::move(new_processed); - continue; - } - break; - } - if (i == RACE_RETRY) { - ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl; - return -ECANCELED; - } - return 0; - } - - static const char alphanum_plain_table[]="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; - - void gen_rand_alphanumeric_plain(CephContext *cct, char *dest, int size) /* size should be the required string size + 1 */ - { - cct->random()->get_bytes(dest, size); - - int i; - for (i = 0; i < size - 1; i++) { - int pos = (unsigned)dest[i]; - dest[i] = alphanum_plain_table[pos % (sizeof(alphanum_plain_table) - 1)]; - } - dest[i] = '\0'; - } - - static string generate_tag(CephContext *cct) - { -#define HEADER_TAG_SIZE 16 - char buf[HEADER_TAG_SIZE + 1]; - buf[HEADER_TAG_SIZE] = 0; - gen_rand_alphanumeric_plain(cct, buf, sizeof(buf)); - return string(buf); - } - - int FIFO::prepare_new_part(bool is_head) - { - fifo_journal_entry_t jentry; - - meta_info.prepare_next_journal_entry(&jentry, generate_tag(cct)); - - int64_t new_head_part_num = meta_info.head_part_num; - - std::optional new_head_jentry; - if (is_head) { - new_head_jentry = jentry; - new_head_jentry->op = fifo_journal_entry_t::OP_SET_HEAD; - new_head_part_num = jentry.part_num; - } - - int r; - bool canceled; - - int i; - - for (i = 0; i < RACE_RETRY; ++i) { - r = update_meta(ClsFIFO::MetaUpdateParams() - .journal_entry_add(jentry) - .journal_entry_add(new_head_jentry), - &canceled); - if (r < 0) { - return r; - } - - if (canceled) { - if (meta_info.max_push_part_num >= jentry.part_num && - meta_info.head_part_num >= new_head_part_num) { /* raced, but new part was already written */ - return 0; - } - - auto iter = meta_info.journal.find(jentry.part_num); - if (iter == meta_info.journal.end()) { - continue; - } - } - break; - } - if (i == RACE_RETRY) { - ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl; - return -ECANCELED; - } - - r = process_journal(); - if (r < 0) { - return r; - } - - return 0; - } - - int FIFO::prepare_new_head() - { - int64_t new_head_num = meta_info.head_part_num + 1; - - if (meta_info.max_push_part_num < new_head_num) { - int r = prepare_new_part(true); - if (r < 0) { - return r; - } - - if (meta_info.max_push_part_num < new_head_num) { - ldout(cct, 0) << "ERROR: " << __func__ << ": after new part creation: meta_info.max_push_part_num=" - << meta_info.max_push_part_num << " new_head_num=" << meta_info.max_push_part_num << dendl; - return -EIO; - } - - return 0; - } - - int i; - - for (i = 0; i < RACE_RETRY; ++i) { - bool canceled; - int r = update_meta(ClsFIFO::MetaUpdateParams() - .head_part_num(new_head_num), - &canceled); - if (r < 0) { - return r; - } - - if (canceled) { - if (meta_info.head_part_num < new_head_num) { - continue; - } - } - break; - } - if (i == RACE_RETRY) { - ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl; - return -ECANCELED; - } - - - return 0; - } - - int FIFO::open(bool create, - std::optional create_params) - { - if (!ioctx) { - return -EINVAL; - } - - if (create) { - librados::ObjectWriteOperation op; - - ClsFIFO::MetaCreateParams default_params; - ClsFIFO::MetaCreateParams *params = (create_params ? &(*create_params) : &default_params); - - int r = ClsFIFO::meta_create(&op, id, *params); - if (r < 0) { - return r; - } - - r = ioctx->operate(meta_oid, &op); - if (r < 0) { - return r; - } - } - - std::optional objv = (create_params ? create_params->state.objv : nullopt); - - int r = do_read_meta(objv); - if (r < 0) { - return r; - } - - is_open = true; - - return 0; - } - - int FIFO::read_meta(std::optional objv) - { - if (!is_open) { - return -EINVAL; - } - - return do_read_meta(objv); - } - - int FIFO::push_entries(int64_t part_num, std::vector& data_bufs) - { - if (!is_open) { - return -EINVAL; - } - - librados::ObjectWriteOperation op; - - int r = ClsFIFO::push_part(&op, ClsFIFO::PushPartParams() - .tag(meta_info.head_tag) - .data_bufs(data_bufs)); - if (r < 0) { - return r; - } - - r = ioctx->operate(meta_info.part_oid(part_num), &op); - if (r < 0) { - return r; - } - - return 0; - } - - int FIFO::trim_part(int64_t part_num, - uint64_t ofs, - std::optional tag) - { - if (!is_open) { - return -EINVAL; - } - - librados::ObjectWriteOperation op; - - int r = ClsFIFO::trim_part(&op, ClsFIFO::TrimPartParams() - .tag(tag) - .ofs(ofs)); - if (r < 0) { - return r; - } - - r = ioctx->operate(meta_info.part_oid(part_num), &op); - if (r < 0) { - return r; - } - - return 0; - } - - int FIFO::push(bufferlist& bl) - { - std::vector data_bufs; - data_bufs.push_back(bl); - - return push(data_bufs); - } - - int FIFO::push(vector& data_bufs) - { - if (!is_open) { - return -EINVAL; - } - - int r; - - if (meta_info.need_new_head()) { - r = prepare_new_head(); - if (r < 0) { - return r; - } - } - - int i; - - auto iter = data_bufs.begin(); - - while (iter != data_bufs.end()) { - uint64_t batch_len = 0; - - vector batch; - - for (; iter != data_bufs.end(); ++iter) { - auto& data = *iter; - auto data_len = data.length(); - auto max_entry_size = meta_info.data_params.max_entry_size; - - if (data_len > max_entry_size) { - ldout(cct, 10) << __func__ << "(): entry too large: " << data_len << " > " << meta_info.data_params.max_entry_size << dendl; - return -EINVAL; - } - - if (batch_len + data_len > max_entry_size) { - break; - } - - batch_len += data_len + part_entry_overhead; /* we can send entry with data_len up to max_entry_size, - however, we want to also account the overhead when dealing - with multiple entries. Previous check doesn't account - for overhead on purpose. */ - - batch.push_back(data); - } - - - for (i = 0; i < RACE_RETRY; ++i) { - r = push_entries(meta_info.head_part_num, batch); - if (r == -ERANGE) { - r = prepare_new_head(); - if (r < 0) { - return r; - } - continue; - } - if (r < 0) { - return r; - } - break; - } - if (i == RACE_RETRY) { - ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl; - return -ECANCELED; - } - } - - return 0; - } - - int FIFO::list(int max_entries, - std::optional marker, - vector *result, - bool *more) - { - if (!is_open) { - return -EINVAL; - } - - *more = false; - - int64_t part_num = meta_info.tail_part_num; - uint64_t ofs = 0; - - if (marker) { - if (!parse_marker(*marker, &part_num, &ofs)) { - ldout(cct, 20) << __func__ << "(): failed to parse marker (" << *marker << ")" << dendl; - return -EINVAL; - } - } - - result->clear(); - result->reserve(max_entries); - - bool part_more{false}; - bool part_full{false}; - - while (max_entries > 0) { - std::vector entries; - int r = ClsFIFO::list_part(*ioctx, - meta_info.part_oid(part_num), - ClsFIFO::ListPartParams() - .ofs(ofs) - .max_entries(max_entries), - &entries, - &part_more, - &part_full, - nullptr); - if (r == -ENOENT) { - r = do_read_meta(); - if (r < 0) { - return r; - } - - if (part_num < meta_info.tail_part_num) { - /* raced with trim? restart */ - result->clear(); - part_num = meta_info.tail_part_num; - ofs = 0; - continue; - } - - /* assuming part was not written yet, so end of data */ - - *more = false; - - return 0; - } - if (r < 0) { - ldout(cct, 20) << __func__ << "(): ClsFIFO::list_part() on oid=" << meta_info.part_oid(part_num) << " returned r=" << r << dendl; - return r; - } - - for (auto& entry : entries) { - fifo_entry e; - e.data = std::move(entry.data); - e.marker = craft_marker(part_num, entry.ofs); - e.mtime = entry.mtime; - - result->push_back(e); - } - max_entries -= entries.size(); - - if (max_entries > 0 && - part_more) { - continue; - } - - if (!part_full) { /* head part is not full */ - break; - } - - ++part_num; - ofs = 0; - } - - *more = part_full || part_more; - - return 0; - } - - int FIFO::trim(const string& marker) - { - if (!is_open) { - return -EINVAL; - } - - int64_t part_num; - uint64_t ofs; - - if (!parse_marker(marker, &part_num, &ofs)) { - ldout(cct, 20) << __func__ << "(): failed to parse marker: marker=" << marker << dendl; - return -EINVAL; - } - - for (int64_t pn = meta_info.tail_part_num; pn < part_num; ++pn) { - int r = trim_part(pn, meta_info.data_params.max_part_size, std::nullopt); - if (r < 0 && - r != -ENOENT) { - ldout(cct, 0) << __func__ << "(): ERROR: trim_part() on part=" << pn << " returned r=" << r << dendl; - return r; - } - } - - int r = trim_part(part_num, ofs, std::nullopt); - if (r < 0 && - r != -ENOENT) { - ldout(cct, 0) << __func__ << "(): ERROR: trim_part() on part=" << part_num << " returned r=" << r << dendl; - return r; - } - - if (part_num <= meta_info.tail_part_num) { - /* don't need to modify meta info */ - return 0; - } - - int i; - - for (i = 0; i < RACE_RETRY; ++i) { - bool canceled; - int r = update_meta(ClsFIFO::MetaUpdateParams() - .tail_part_num(part_num), - &canceled); - if (r < 0) { - return r; - } - - if (canceled) { - if (meta_info.tail_part_num < part_num) { - continue; - } - } - break; - - if (i == RACE_RETRY) { - ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl; - return -ECANCELED; - } - } - - return 0; - } - - int FIFO::get_part_info(int64_t part_num, - fifo_part_info *result) - { - if (!is_open) { - return -EINVAL; - } - - fifo_part_header_t header; - - int r = ClsFIFO::get_part_info(*ioctx, - meta_info.part_oid(part_num), - &header); - if (r < 0) { - return r; - } - - *result = std::move(header); - - return 0; - } - - } // namespace fifo - } // namespace cls -} // namespace rados - diff --git a/src/cls/fifo/cls_fifo_client.h b/src/cls/fifo/cls_fifo_client.h deleted file mode 100644 index 02e5681c47d39..0000000000000 --- a/src/cls/fifo/cls_fifo_client.h +++ /dev/null @@ -1,382 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2019 Red Hat, Inc. - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - - -#pragma once - -#include "cls/fifo/cls_fifo_types.h" - -namespace rados { - namespace cls { - namespace fifo { - - class ClsFIFO { - public: - - /* create */ - - struct MetaCreateParams { - struct State { - static constexpr uint64_t default_max_part_size = 4 * 1024 * 1024; - static constexpr uint64_t default_max_entry_size = 32 * 1024; - std::optional objv; - std::optional oid_prefix; - bool exclusive{false}; - uint64_t max_part_size{default_max_part_size}; - uint64_t max_entry_size{default_max_entry_size}; - } state; - - MetaCreateParams& oid_prefix(const std::string& oid_prefix) { - state.oid_prefix = oid_prefix; - return *this; - } - MetaCreateParams& exclusive(bool exclusive) { - state.exclusive = exclusive; - return *this; - } - MetaCreateParams& max_part_size(uint64_t max_part_size) { - state.max_part_size = max_part_size; - return *this; - } - MetaCreateParams& max_entry_size(uint64_t max_entry_size) { - state.max_entry_size = max_entry_size; - return *this; - } - MetaCreateParams& objv(const fifo_objv_t& objv) { - state.objv = objv; - return *this; - } - MetaCreateParams& objv(const std::string& instance, uint64_t ver) { - state.objv = fifo_objv_t{instance, ver}; - return *this; - } - }; - - static int meta_create(librados::ObjectWriteOperation *op, - const string& id, - const MetaCreateParams& params); - - /* get info */ - - struct MetaGetParams { - struct State { - std::optional objv; - } state; - - MetaGetParams& objv(std::optional& v) { - state.objv = v; - return *this; - } - MetaGetParams& objv(const fifo_objv_t& v) { - state.objv = v; - return *this; - } - MetaGetParams& objv(const std::string& instance, uint64_t ver) { - state.objv = fifo_objv_t{instance, ver}; - return *this; - } - }; - static int meta_get(librados::IoCtx& ioctx, - const string& oid, - const MetaGetParams& params, - rados::cls::fifo::fifo_info_t *result, - uint32_t *part_header_size, - uint32_t *part_entry_overhead); - - /* update */ - - struct MetaUpdateParams { - struct State { - rados::cls::fifo::fifo_objv_t objv; - - std::optional tail_part_num; - std::optional head_part_num; - std::optional min_push_part_num; - std::optional max_push_part_num; - std::vector journal_entries_add; - std::vector journal_entries_rm; - } state; - - MetaUpdateParams& objv(const fifo_objv_t& objv) { - state.objv = objv; - return *this; - } - MetaUpdateParams& tail_part_num(std::optional tail_part_num) { - state.tail_part_num = tail_part_num; - return *this; - } - MetaUpdateParams& tail_part_num(uint64_t tail_part_num) { - state.tail_part_num = tail_part_num; - return *this; - } - MetaUpdateParams& head_part_num(std::optional head_part_num) { - state.head_part_num = head_part_num; - return *this; - } - MetaUpdateParams& head_part_num(uint64_t head_part_num) { - state.head_part_num = head_part_num; - return *this; - } - MetaUpdateParams& min_push_part_num(uint64_t num) { - state.min_push_part_num = num; - return *this; - } - MetaUpdateParams& max_push_part_num(std::optional num) { - state.max_push_part_num = num; - return *this; - } - MetaUpdateParams& max_push_part_num(uint64_t num) { - state.max_push_part_num = num; - return *this; - } - MetaUpdateParams& journal_entry_add(std::optional entry) { - if (entry) { - state.journal_entries_add.push_back(*entry); - } - return *this; - } - MetaUpdateParams& journal_entry_add(const rados::cls::fifo::fifo_journal_entry_t& entry) { - state.journal_entries_add.push_back(entry); - return *this; - } - MetaUpdateParams& journal_entries_rm(std::vector& entries) { - state.journal_entries_rm = entries; - return *this; - } - - int apply_update(CephContext *cct, - rados::cls::fifo::fifo_info_t *info); - }; - - static int meta_update(librados::ObjectWriteOperation *rados_op, - const MetaUpdateParams& params); - /* init part */ - - struct PartInitParams { - struct State { - string tag; - rados::cls::fifo::fifo_data_params_t data_params; - } state; - - PartInitParams& tag(const std::string& tag) { - state.tag = tag; - return *this; - } - PartInitParams& data_params(const rados::cls::fifo::fifo_data_params_t& data_params) { - state.data_params = data_params; - return *this; - } - }; - - static int part_init(librados::ObjectWriteOperation *op, - const PartInitParams& params); - - /* push part */ - - struct PushPartParams { - struct State { - string tag; - std::vector data_bufs; - uint64_t total_len{0}; - } state; - - PushPartParams& tag(const std::string& tag) { - state.tag = tag; - return *this; - } - PushPartParams& data(bufferlist& bl) { - state.total_len += bl.length(); - state.data_bufs.emplace_back(bl); - return *this; - } - PushPartParams& data_bufs(std::vector& dbs) { - for (auto& bl : dbs) { - data(bl); - } - return *this; - } - }; - - static int push_part(librados::ObjectWriteOperation *op, - const PushPartParams& params); - /* trim part */ - - struct TrimPartParams { - struct State { - std::optional tag; - uint64_t ofs; - } state; - - TrimPartParams& tag(std::optional tag) { - state.tag = tag; - return *this; - } - TrimPartParams& ofs(uint64_t ofs) { - state.ofs = ofs; - return *this; - } - }; - - static int trim_part(librados::ObjectWriteOperation *op, - const TrimPartParams& params); - /* list part */ - - struct ListPartParams { - struct State { - std::optional tag; - uint64_t ofs; - int max_entries{100}; - } state; - - ListPartParams& tag(const std::string& tag) { - state.tag = tag; - return *this; - } - ListPartParams& ofs(uint64_t ofs) { - state.ofs = ofs; - return *this; - } - ListPartParams& max_entries(int _max_entries) { - state.max_entries = _max_entries; - return *this; - } - }; - - static int list_part(librados::IoCtx& ioctx, - const string& oid, - const ListPartParams& params, - std::vector *pentries, - bool *more, - bool *full_part = nullptr, - string *ptag = nullptr); - - static int get_part_info(librados::IoCtx& ioctx, - const string& oid, - rados::cls::fifo::fifo_part_header_t *header); - }; - - struct fifo_entry { - bufferlist data; - string marker; - ceph::real_time mtime; - }; - - using fifo_part_info = rados::cls::fifo::fifo_part_header_t; - - class FIFO { - CephContext *cct; - string id; - - string meta_oid; - - std::optional _ioctx; - librados::IoCtx *ioctx{nullptr}; - - fifo_info_t meta_info; - - uint32_t part_header_size; - uint32_t part_entry_overhead; - - bool is_open{false}; - - string craft_marker(int64_t part_num, - uint64_t part_ofs); - - bool parse_marker(const string& marker, - int64_t *part_num, - uint64_t *part_ofs); - - int update_meta(ClsFIFO::MetaUpdateParams& update_params, - bool *canceled); - int do_read_meta(std::optional objv = std::nullopt); - - int create_part(int64_t part_num, const string& tag, - int64_t& max_part_num); - int remove_part(int64_t part_num, const string& tag, - int64_t& tail_part_num); - - int process_journal_entry(const fifo_journal_entry_t& entry, - int64_t& tail_part_num, - int64_t& head_part_num, - int64_t& max_part_num); - int process_journal_entries(vector *processed, - int64_t& tail_part_num, - int64_t& head_part_num, - int64_t& max_part_num); - int process_journal(); - - int prepare_new_part(bool is_head); - int prepare_new_head(); - - int push_entries(int64_t part_num, std::vector& data_bufs); - int trim_part(int64_t part_num, - uint64_t ofs, - std::optional tag); - - public: - FIFO(CephContext *_cct, - const string& _id, - librados::IoCtx *_ioctx = nullptr) : cct(_cct), - id(_id), - ioctx(_ioctx) { - meta_oid = id; - } - - int init_ioctx(librados::Rados *rados, - const string& pool, - std::optional pool_ns); - - void set_ioctx(librados::IoCtx *_ioctx) { - ioctx = ioctx; - } - - int open(bool create, - std::optional create_params = std::nullopt); - - int read_meta(std::optional objv = std::nullopt); - - const fifo_info_t& get_meta() const { - return meta_info; - } - - void get_part_layout_info(uint32_t *header_size, uint32_t *entry_overhead) { - if (header_size) { - *header_size = part_header_size; - } - - if (entry_overhead) { - *entry_overhead = part_entry_overhead; - } - } - - int push(bufferlist& bl); - int push(vector& bl); - - int list(int max_entries, - std::optional marker, - vector *result, - bool *more); - - int trim(const string& marker); - - int get_part_info(int64_t part_num, - fifo_part_info *result); - }; - } // namespace fifo - } // namespace cls -} // namespace rados diff --git a/src/cls/fifo/cls_fifo_ops.cc b/src/cls/fifo/cls_fifo_ops.cc deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/src/cls/fifo/cls_fifo_ops.h b/src/cls/fifo/cls_fifo_ops.h index 2a8ceab3eaaf3..0adae1e5d39ae 100644 --- a/src/cls/fifo/cls_fifo_ops.h +++ b/src/cls/fifo/cls_fifo_ops.h @@ -16,29 +16,37 @@ #pragma once +#include +#include +#include +#include + +#include "include/buffer.h" +#include "include/encoding.h" #include "include/types.h" -#include "include/utime.h" + #include "cls/fifo/cls_fifo_types.h" -struct cls_fifo_meta_create_op +namespace rados::cls::fifo::op { +struct create_meta { - string id; - std::optional objv; + std::string id; + std::optional version; struct { - string name; - string ns; + std::string name; + std::string ns; } pool; - std::optional oid_prefix; + std::optional oid_prefix; - uint64_t max_part_size{0}; - uint64_t max_entry_size{0}; + std::uint64_t max_part_size{0}; + std::uint64_t max_entry_size{0}; bool exclusive{false}; - void encode(bufferlist &bl) const { + void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); encode(id, bl); - encode(objv, bl); + encode(version, bl); encode(pool.name, bl); encode(pool.ns, bl); encode(oid_prefix, bl); @@ -47,10 +55,10 @@ struct cls_fifo_meta_create_op encode(exclusive, bl); ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) { + void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); decode(id, bl); - decode(objv, bl); + decode(version, bl); decode(pool.name, bl); decode(pool.ns, bl); decode(oid_prefix, bl); @@ -60,39 +68,40 @@ struct cls_fifo_meta_create_op DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(cls_fifo_meta_create_op) +WRITE_CLASS_ENCODER(create_meta) -struct cls_fifo_meta_get_op +struct get_meta { - std::optional objv; + std::optional version; - void encode(bufferlist &bl) const { + void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); - encode(objv, bl); + encode(version, bl); ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) { + void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); - decode(objv, bl); + decode(version, bl); DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(cls_fifo_meta_get_op) +WRITE_CLASS_ENCODER(get_meta) -struct cls_fifo_meta_get_op_reply +struct get_meta_reply { - rados::cls::fifo::fifo_info_t info; - uint32_t part_header_size{0}; - uint32_t part_entry_overhead{0}; /* per entry extra data that is stored */ + fifo::info info; + std::uint32_t part_header_size{0}; + /* per entry extra data that is stored */ + std::uint32_t part_entry_overhead{0}; - void encode(bufferlist &bl) const { + void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); encode(info, bl); encode(part_header_size, bl); encode(part_entry_overhead, bl); ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) { + void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); decode(info, bl); decode(part_header_size, bl); @@ -100,22 +109,22 @@ struct cls_fifo_meta_get_op_reply DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(cls_fifo_meta_get_op_reply) +WRITE_CLASS_ENCODER(get_meta_reply) -struct cls_fifo_meta_update_op +struct update_meta { - rados::cls::fifo::fifo_objv_t objv; + objv version; - std::optional tail_part_num; - std::optional head_part_num; - std::optional min_push_part_num; - std::optional max_push_part_num; - std::vector journal_entries_add; - std::vector journal_entries_rm; + std::optional tail_part_num; + std::optional head_part_num; + std::optional min_push_part_num; + std::optional max_push_part_num; + std::vector journal_entries_add; + std::vector journal_entries_rm; - void encode(bufferlist &bl) const { + void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); - encode(objv, bl); + encode(version, bl); encode(tail_part_num, bl); encode(head_part_num, bl); encode(min_push_part_num, bl); @@ -124,9 +133,9 @@ struct cls_fifo_meta_update_op encode(journal_entries_rm, bl); ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) { + void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); - decode(objv, bl); + decode(version, bl); decode(tail_part_num, bl); decode(head_part_num, bl); decode(min_push_part_num, bl); @@ -136,42 +145,42 @@ struct cls_fifo_meta_update_op DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(cls_fifo_meta_update_op) +WRITE_CLASS_ENCODER(update_meta) -struct cls_fifo_part_init_op +struct init_part { - string tag; - rados::cls::fifo::fifo_data_params_t data_params; + std::string tag; + data_params params; - void encode(bufferlist &bl) const { + void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); encode(tag, bl); - encode(data_params, bl); + encode(params, bl); ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) { + void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); decode(tag, bl); - decode(data_params, bl); + decode(params, bl); DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(cls_fifo_part_init_op) +WRITE_CLASS_ENCODER(init_part) -struct cls_fifo_part_push_op +struct push_part { - string tag; - std::vector data_bufs; - uint64_t total_len{0}; + std::string tag; + std::deque data_bufs; + std::uint64_t total_len{0}; - void encode(bufferlist &bl) const { + void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); encode(tag, bl); encode(data_bufs, bl); encode(total_len, bl); ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) { + void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); decode(tag, bl); decode(data_bufs, bl); @@ -179,42 +188,42 @@ struct cls_fifo_part_push_op DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(cls_fifo_part_push_op) +WRITE_CLASS_ENCODER(push_part) -struct cls_fifo_part_trim_op +struct trim_part { - std::optional tag; - uint64_t ofs{0}; + std::optional tag; + std::uint64_t ofs{0}; - void encode(bufferlist &bl) const { + void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); encode(tag, bl); encode(ofs, bl); ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) { + void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); decode(tag, bl); decode(ofs, bl); DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(cls_fifo_part_trim_op) +WRITE_CLASS_ENCODER(trim_part) -struct cls_fifo_part_list_op +struct list_part { std::optional tag; - uint64_t ofs{0}; + std::uint64_t ofs{0}; int max_entries{100}; - void encode(bufferlist &bl) const { + void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); encode(tag, bl); encode(ofs, bl); encode(max_entries, bl); ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) { + void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); decode(tag, bl); decode(ofs, bl); @@ -222,17 +231,18 @@ struct cls_fifo_part_list_op DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(cls_fifo_part_list_op) +WRITE_CLASS_ENCODER(list_part) +inline constexpr int MAX_LIST_ENTRIES = 512; -struct cls_fifo_part_list_op_reply +struct list_part_reply { - string tag; - vector entries; + std::string tag; + std::vector entries; bool more{false}; bool full_part{false}; /* whether part is full or still can be written to. A non full part is by definition head part */ - void encode(bufferlist &bl) const { + void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); encode(tag, bl); encode(entries, bl); @@ -240,7 +250,7 @@ struct cls_fifo_part_list_op_reply encode(full_part, bl); ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) { + void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); decode(tag, bl); decode(entries, bl); @@ -249,34 +259,45 @@ struct cls_fifo_part_list_op_reply DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(cls_fifo_part_list_op_reply) +WRITE_CLASS_ENCODER(list_part_reply) -struct cls_fifo_part_get_info_op +struct get_part_info { - void encode(bufferlist &bl) const { + void encode(ceph::buffer::list &bl) const { ENCODE_START(1, 1, bl); ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) { + void decode(ceph::buffer::list::const_iterator &bl) { DECODE_START(1, bl); DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(cls_fifo_part_get_info_op) +WRITE_CLASS_ENCODER(get_part_info) -struct cls_fifo_part_get_info_op_reply +struct get_part_info_reply { - rados::cls::fifo::fifo_part_header_t header; + part_header header; - void encode(bufferlist &bl) const { + void encode(ceph::buffer::list &bl) const { ENCODE_START(1, 1, bl); encode(header, bl); ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) { + void decode(ceph::buffer::list::const_iterator &bl) { DECODE_START(1, bl); decode(header, bl); DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(cls_fifo_part_get_info_op_reply) +WRITE_CLASS_ENCODER(get_part_info_reply) + +inline constexpr auto CLASS = "fifo"; +inline constexpr auto CREATE_META = "create_meta"; +inline constexpr auto GET_META = "get_meta"; +inline constexpr auto UPDATE_META = "update_meta"; +inline constexpr auto INIT_PART = "init_part"; +inline constexpr auto PUSH_PART = "push_part"; +inline constexpr auto TRIM_PART = "trim_part"; +inline constexpr auto LIST_PART = "part_list"; +inline constexpr auto GET_PART_INFO = "get_part_info"; +} // namespace rados::cls::fifo::op diff --git a/src/cls/fifo/cls_fifo_types.cc b/src/cls/fifo/cls_fifo_types.cc deleted file mode 100644 index b182551e3a856..0000000000000 --- a/src/cls/fifo/cls_fifo_types.cc +++ /dev/null @@ -1,90 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2019 Red Hat, Inc. - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "cls_fifo_types.h" - -string rados::cls::fifo::fifo_info_t::part_oid(int64_t part_num) -{ - char buf[oid_prefix.size() + 32]; - snprintf(buf, sizeof(buf), "%s.%lld", oid_prefix.c_str(), (long long)part_num); - - return string(buf); -} - -void rados::cls::fifo::fifo_info_t::prepare_next_journal_entry(fifo_journal_entry_t *entry, const string& tag) -{ - entry->op = fifo_journal_entry_t::Op::OP_CREATE; - entry->part_num = max_push_part_num + 1; - entry->part_tag = tag; -} - -int rados::cls::fifo::fifo_info_t::apply_update(std::optional& _tail_part_num, - std::optional& _head_part_num, - std::optional& _min_push_part_num, - std::optional& _max_push_part_num, - std::vector& journal_entries_add, - std::vector& journal_entries_rm, - string *err) -{ - if (_tail_part_num) { - tail_part_num = *_tail_part_num; - } - - if (_min_push_part_num) { - min_push_part_num = *_min_push_part_num; - } - - if (_max_push_part_num) { - max_push_part_num = *_max_push_part_num; - } - - for (auto& entry : journal_entries_add) { - auto iter = journal.find(entry.part_num); - if (iter != journal.end() && - iter->second.op == entry.op) { - /* don't allow multiple concurrent (same) operations on the same part, - racing clients should use objv to avoid races anyway */ - if (err) { - stringstream ss; - ss << "multiple concurrent operations on same part are not allowed, part num=" << entry.part_num; - *err = ss.str(); - } - return -EINVAL; - } - - if (entry.op == fifo_journal_entry_t::Op::OP_CREATE) { - tags[entry.part_num] = entry.part_tag; - } - - journal.insert(std::pair(entry.part_num, std::move(entry))); - } - - for (auto& entry : journal_entries_rm) { - journal.erase(entry.part_num); - } - - if (_head_part_num) { - tags.erase(head_part_num); - head_part_num = *_head_part_num; - auto iter = tags.find(head_part_num); - if (iter != tags.end()) { - head_tag = iter->second; - } else { - head_tag.erase(); - } - } - - return 0; -} diff --git a/src/cls/fifo/cls_fifo_types.h b/src/cls/fifo/cls_fifo_types.h index bdc1ed773edf8..749f66e7b96e6 100644 --- a/src/cls/fifo/cls_fifo_types.h +++ b/src/cls/fifo/cls_fifo_types.h @@ -15,260 +15,510 @@ #pragma once - +#include +#include +#include +#include +#include +#include + +#undef FMT_HEADER_ONLY +#define FMT_HEADER_ONLY 1 +#include + +#include "include/buffer.h" #include "include/encoding.h" #include "include/types.h" +#include "common/ceph_time.h" class JSONObj; -namespace rados { - namespace cls { - namespace fifo { - struct fifo_objv_t { - string instance; - uint64_t ver{0}; - - void encode(bufferlist &bl) const { - ENCODE_START(1, 1, bl); - encode(instance, bl); - encode(ver, bl); - ENCODE_FINISH(bl); - } - void decode(bufferlist::const_iterator &bl) { - DECODE_START(1, bl); - decode(instance, bl); - decode(ver, bl); - DECODE_FINISH(bl); - } - void dump(Formatter *f) const; - void decode_json(JSONObj *obj); - - bool operator==(const fifo_objv_t& rhs) const { - return (instance == rhs.instance && - ver == rhs.ver); - } - - bool empty() const { - return instance.empty(); - } - - string to_str() { - char buf[instance.size() + 32]; - snprintf(buf, sizeof(buf), "%s{%lld}", instance.c_str(), (long long)ver); - return string(buf); - } - }; - WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_objv_t) - - struct fifo_data_params_t { - uint64_t max_part_size{0}; - uint64_t max_entry_size{0}; - uint64_t full_size_threshold{0}; - - void encode(bufferlist &bl) const { - ENCODE_START(1, 1, bl); - encode(max_part_size, bl); - encode(max_entry_size, bl); - encode(full_size_threshold, bl); - ENCODE_FINISH(bl); - } - void decode(bufferlist::const_iterator &bl) { - DECODE_START(1, bl); - decode(max_part_size, bl); - decode(max_entry_size, bl); - decode(full_size_threshold, bl); - DECODE_FINISH(bl); - } - void dump(Formatter *f) const; - void decode_json(JSONObj *obj); - - bool operator==(const fifo_data_params_t& rhs) const { - return (max_part_size == rhs.max_part_size && - max_entry_size == rhs.max_entry_size && - full_size_threshold == rhs.full_size_threshold); - } - }; - WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_data_params_t) - - struct fifo_journal_entry_t { - enum Op { - OP_UNKNOWN = 0, - OP_CREATE = 1, - OP_SET_HEAD = 2, - OP_REMOVE = 3, - } op{OP_UNKNOWN}; - - int64_t part_num{0}; - string part_tag; - - void encode(bufferlist &bl) const { - ENCODE_START(1, 1, bl); - encode((int)op, bl); - encode(part_num, bl); - encode(part_tag, bl); - ENCODE_FINISH(bl); - } - void decode(bufferlist::const_iterator &bl) { - DECODE_START(1, bl); - int i; - decode(i, bl); - op = (Op)i; - decode(part_num, bl); - decode(part_tag, bl); - DECODE_FINISH(bl); - } - void dump(Formatter *f) const; - - bool operator==(const fifo_journal_entry_t& e) { - return (op == e.op && - part_num == e.part_num && - part_tag == e.part_tag); - } - }; - WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_journal_entry_t) - - struct fifo_info_t { - string id; - fifo_objv_t objv; - string oid_prefix; - fifo_data_params_t data_params; - - int64_t tail_part_num{0}; - int64_t head_part_num{-1}; - int64_t min_push_part_num{0}; - int64_t max_push_part_num{-1}; - - string head_tag; - map tags; - - std::multimap journal; - - bool need_new_head() { - return (head_part_num < min_push_part_num); - } - - bool need_new_part() { - return (max_push_part_num < min_push_part_num); - } - - string part_oid(int64_t part_num); - void prepare_next_journal_entry(fifo_journal_entry_t *entry, const string& tag); - - int apply_update(std::optional& _tail_part_num, - std::optional& _head_part_num, - std::optional& _min_push_part_num, - std::optional& _max_push_part_num, - std::vector& journal_entries_add, - std::vector& journal_entries_rm, - string *err); - - void encode(bufferlist &bl) const { - ENCODE_START(1, 1, bl); - encode(id, bl); - encode(objv, bl); - encode(oid_prefix, bl); - encode(data_params, bl); - encode(tail_part_num, bl); - encode(head_part_num, bl); - encode(min_push_part_num, bl); - encode(max_push_part_num, bl); - encode(tags, bl); - encode(head_tag, bl); - encode(journal, bl); - ENCODE_FINISH(bl); - } - void decode(bufferlist::const_iterator &bl) { - DECODE_START(1, bl); - decode(id, bl); - decode(objv, bl); - decode(oid_prefix, bl); - decode(data_params, bl); - decode(tail_part_num, bl); - decode(head_part_num, bl); - decode(min_push_part_num, bl); - decode(max_push_part_num, bl); - decode(tags, bl); - decode(head_tag, bl); - decode(journal, bl); - DECODE_FINISH(bl); - } - void dump(Formatter *f) const; - void decode_json(JSONObj *obj); - }; - WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_info_t) - - struct cls_fifo_part_list_entry_t { - bufferlist data; - uint64_t ofs; - ceph::real_time mtime; - - cls_fifo_part_list_entry_t() {} - cls_fifo_part_list_entry_t(bufferlist&& _data, - uint64_t _ofs, - ceph::real_time _mtime) : data(std::move(_data)), ofs(_ofs), mtime(_mtime) {} - - - void encode(bufferlist &bl) const { - ENCODE_START(1, 1, bl); - encode(data, bl); - encode(ofs, bl); - encode(mtime, bl); - ENCODE_FINISH(bl); - } - void decode(bufferlist::const_iterator &bl) { - DECODE_START(1, bl); - decode(data, bl); - decode(ofs, bl); - decode(mtime, bl); - DECODE_FINISH(bl); - } - }; - WRITE_CLASS_ENCODER(rados::cls::fifo::cls_fifo_part_list_entry_t) - - struct fifo_part_header_t { - string tag; - - fifo_data_params_t params; - - uint64_t magic{0}; - - uint64_t min_ofs{0}; - uint64_t max_ofs{0}; - uint64_t min_index{0}; - uint64_t max_index{0}; - - void encode(bufferlist &bl) const { - ENCODE_START(1, 1, bl); - encode(tag, bl); - encode(params, bl); - encode(magic, bl); - encode(min_ofs, bl); - encode(max_ofs, bl); - encode(min_index, bl); - encode(max_index, bl); - ENCODE_FINISH(bl); - } - void decode(bufferlist::const_iterator &bl) { - DECODE_START(1, bl); - decode(tag, bl); - decode(params, bl); - decode(magic, bl); - decode(min_ofs, bl); - decode(max_ofs, bl); - decode(min_index, bl); - decode(max_index, bl); - DECODE_FINISH(bl); - } - }; - WRITE_CLASS_ENCODER(fifo_part_header_t) +namespace rados::cls::fifo { +struct objv { + std::string instance; + std::uint64_t ver{0}; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(instance, bl); + encode(ver, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(instance, bl); + decode(ver, bl); + DECODE_FINISH(bl); + } + void dump(ceph::Formatter* f) const; + void decode_json(JSONObj* obj); + + bool operator ==(const objv& rhs) const { + return (instance == rhs.instance && + ver == rhs.ver); + } + bool operator !=(const objv& rhs) const { + return (instance != rhs.instance || + ver != rhs.ver); + } + bool same_or_later(const objv& rhs) const { + return (instance == rhs.instance || + ver >= rhs.ver); + } + + bool empty() const { + return instance.empty(); + } + + std::string to_str() const { + return fmt::format("{}{{{}}}", instance, ver); + } +}; +WRITE_CLASS_ENCODER(objv) +inline ostream& operator <<(std::ostream& os, const objv& objv) +{ + return os << objv.to_str(); +} + +struct data_params { + std::uint64_t max_part_size{0}; + std::uint64_t max_entry_size{0}; + std::uint64_t full_size_threshold{0}; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(max_part_size, bl); + encode(max_entry_size, bl); + encode(full_size_threshold, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(max_part_size, bl); + decode(max_entry_size, bl); + decode(full_size_threshold, bl); + DECODE_FINISH(bl); + } + void dump(ceph::Formatter* f) const; + void decode_json(JSONObj* obj); + + bool operator ==(const data_params& rhs) const { + return (max_part_size == rhs.max_part_size && + max_entry_size == rhs.max_entry_size && + full_size_threshold == rhs.full_size_threshold); + } +}; +WRITE_CLASS_ENCODER(data_params) +inline std::ostream& operator <<(std::ostream& m, const data_params& d) { + return m << "max_part_size: " << d.max_part_size << ", " + << "max_entry_size: " << d.max_entry_size << ", " + << "full_size_threshold: " << d.full_size_threshold; +} + +struct journal_entry { + enum class Op { + unknown = 0, + create = 1, + set_head = 2, + remove = 3, + } op{Op::unknown}; + + std::int64_t part_num{0}; + std::string part_tag; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode((int)op, bl); + encode(part_num, bl); + encode(part_tag, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + int i; + decode(i, bl); + op = static_cast(i); + decode(part_num, bl); + decode(part_tag, bl); + DECODE_FINISH(bl); + } + void dump(ceph::Formatter* f) const; + + bool operator ==(const journal_entry& e) { + return (op == e.op && + part_num == e.part_num && + part_tag == e.part_tag); + } +}; +WRITE_CLASS_ENCODER(journal_entry) +inline std::ostream& operator <<(std::ostream& m, const journal_entry::Op& o) { + switch (o) { + case journal_entry::Op::unknown: + return m << "Op::unknown"; + case journal_entry::Op::create: + return m << "Op::create"; + case journal_entry::Op::set_head: + return m << "Op::set_head"; + case journal_entry::Op::remove: + return m << "Op::remove"; + } + return m << "Bad value: " << static_cast(o); +} +inline std::ostream& operator <<(std::ostream& m, const journal_entry& j) { + return m << "op: " << j.op << ", " + << "part_num: " << j.part_num << ", " + << "part_tag: " << j.part_tag; +} + +// This is actually a useful builder, since otherwise we end up with +// four uint64_ts in a row and only care about a subset at a time. +class update { + std::optional tail_part_num_; + std::optional head_part_num_; + std::optional min_push_part_num_; + std::optional max_push_part_num_; + std::vector journal_entries_add_; + std::vector journal_entries_rm_; + +public: + + update&& tail_part_num(std::optional num) noexcept { + tail_part_num_ = num; + return std::move(*this); + } + auto tail_part_num() const noexcept { + return tail_part_num_; + } + + update&& head_part_num(std::optional num) noexcept { + head_part_num_ = num; + return std::move(*this); + } + auto head_part_num() const noexcept { + return head_part_num_; + } + + update&& min_push_part_num(std::optional num) + noexcept { + min_push_part_num_ = num; + return std::move(*this); + } + auto min_push_part_num() const noexcept { + return min_push_part_num_; + } + + update&& max_push_part_num(std::optional num) noexcept { + max_push_part_num_ = num; + return std::move(*this); + } + auto max_push_part_num() const noexcept { + return max_push_part_num_; + } + + update&& journal_entry_add(fifo::journal_entry entry) { + journal_entries_add_.push_back(std::move(entry)); + return std::move(*this); + } + update&& journal_entries_add( + std::optional>&& entries) { + if (entries) { + journal_entries_add_ = std::move(*entries); + } else { + journal_entries_add_.clear(); + } + return std::move(*this); + } + const auto& journal_entries_add() const & noexcept { + return journal_entries_add_; + } + auto&& journal_entries_add() && noexcept { + return std::move(journal_entries_add_); + } + + update&& journal_entry_rm(fifo::journal_entry entry) { + journal_entries_rm_.push_back(std::move(entry)); + return std::move(*this); + } + update&& journal_entries_rm( + std::optional>&& entries) { + if (entries) { + journal_entries_rm_ = std::move(*entries); + } else { + journal_entries_rm_.clear(); + } + return std::move(*this); + } + const auto& journal_entries_rm() const & noexcept { + return journal_entries_rm_; + } + auto&& journal_entries_rm() && noexcept { + return std::move(journal_entries_rm_); + } + friend std::ostream& operator <<(std::ostream& m, const update& u); +}; +inline std::ostream& operator <<(std::ostream& m, const update& u) { + bool prev = false; + if (u.tail_part_num_) { + m << "tail_part_num: " << *u.tail_part_num_; + prev = true; + } + if (u.head_part_num_) { + if (prev) + m << ", "; + m << "head_part_num: " << *u.head_part_num_; + prev = true; + } + if (u.min_push_part_num_) { + if (prev) + m << ", "; + m << "min_push_part_num: " << *u.min_push_part_num_; + prev = true; + } + if (u.max_push_part_num_) { + if (prev) + m << ", "; + m << "max_push_part_num: " << *u.max_push_part_num_; + prev = true; + } + if (!u.journal_entries_add_.empty()) { + if (prev) + m << ", "; + m << "journal_entries_add: {" << u.journal_entries_add_ << "}"; + prev = true; + } + if (!u.journal_entries_rm_.empty()) { + if (prev) + m << ", "; + m << "journal_entries_rm: {" << u.journal_entries_rm_ << "}"; + prev = true; + } + if (!prev) + m << "(none)"; + return m; +} +struct info { + std::string id; + objv version; + std::string oid_prefix; + data_params params; + + std::int64_t tail_part_num{0}; + std::int64_t head_part_num{-1}; + std::int64_t min_push_part_num{0}; + std::int64_t max_push_part_num{-1}; + + std::string head_tag; + std::map tags; + + std::multimap journal; + + bool need_new_head() const { + return (head_part_num < min_push_part_num); + } + + bool need_new_part() const { + return (max_push_part_num < min_push_part_num); + } + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + encode(version, bl); + encode(oid_prefix, bl); + encode(params, bl); + encode(tail_part_num, bl); + encode(head_part_num, bl); + encode(min_push_part_num, bl); + encode(max_push_part_num, bl); + encode(tags, bl); + encode(head_tag, bl); + encode(journal, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(id, bl); + decode(version, bl); + decode(oid_prefix, bl); + decode(params, bl); + decode(tail_part_num, bl); + decode(head_part_num, bl); + decode(min_push_part_num, bl); + decode(max_push_part_num, bl); + decode(tags, bl); + decode(head_tag, bl); + decode(journal, bl); + DECODE_FINISH(bl); + } + void dump(ceph::Formatter* f) const; + void decode_json(JSONObj* obj); + + std::string part_oid(std::int64_t part_num) const { + return fmt::format("{}.{}", oid_prefix, part_num); + } + + journal_entry next_journal_entry(std::string tag) const { + journal_entry entry; + entry.op = journal_entry::Op::create; + entry.part_num = max_push_part_num + 1; + entry.part_tag = std::move(tag); + return entry; + } + + std::optional + apply_update(const update& update) { + if (update.tail_part_num()) { + tail_part_num = *update.tail_part_num(); + } + + if (update.min_push_part_num()) { + min_push_part_num = *update.min_push_part_num(); + } + + if (update.max_push_part_num()) { + max_push_part_num = *update.max_push_part_num(); + } + + for (const auto& entry : update.journal_entries_add()) { + auto iter = journal.find(entry.part_num); + if (iter != journal.end() && + iter->second.op == entry.op) { + /* don't allow multiple concurrent (same) operations on the same part, + racing clients should use objv to avoid races anyway */ + return fmt::format("multiple concurrent operations on same part are not " + "allowed, part num={}", entry.part_num); + } + + if (entry.op == journal_entry::Op::create) { + tags[entry.part_num] = entry.part_tag; + } + + journal.emplace(entry.part_num, entry); } + + for (const auto& entry : update.journal_entries_rm()) { + journal.erase(entry.part_num); + } + + if (update.head_part_num()) { + tags.erase(head_part_num); + head_part_num = *update.head_part_num(); + auto iter = tags.find(head_part_num); + if (iter != tags.end()) { + head_tag = iter->second; + } else { + head_tag.erase(); + } + } + + return std::nullopt; } +}; +WRITE_CLASS_ENCODER(info) +inline std::ostream& operator <<(std::ostream& m, const info& i) { + return m << "id: " << i.id << ", " + << "version: " << i.version << ", " + << "oid_prefix: " << i.oid_prefix << ", " + << "params: {" << i.params << "}, " + << "tail_part_num: " << i.tail_part_num << ", " + << "head_part_num: " << i.head_part_num << ", " + << "min_push_part_num: " << i.min_push_part_num << ", " + << "max_push_part_num: " << i.max_push_part_num << ", " + << "head_tag: " << i.head_tag << ", " + << "tags: {" << i.tags << "}, " + << "journal: {" << i.journal; } -static inline ostream& operator<<(ostream& os, const rados::cls::fifo::fifo_objv_t& objv) -{ - return os << objv.instance << "{" << objv.ver << "}"; +struct part_list_entry { + ceph::buffer::list data; + std::uint64_t ofs = 0; + ceph::real_time mtime; + + part_list_entry() {} + part_list_entry(ceph::buffer::list&& data, + uint64_t ofs, + ceph::real_time mtime) + : data(std::move(data)), ofs(ofs), mtime(mtime) {} + + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(data, bl); + encode(ofs, bl); + encode(mtime, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(data, bl); + decode(ofs, bl); + decode(mtime, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(part_list_entry) +inline std::ostream& operator <<(std::ostream& m, + const part_list_entry& p) { + using ceph::operator <<; + return m << "data: " << p.data << ", " + << "ofs: " << p.ofs << ", " + << "mtime: " << p.mtime; } +struct part_header { + std::string tag; + + data_params params; + + std::uint64_t magic{0}; + + std::uint64_t min_ofs{0}; + std::uint64_t last_ofs{0}; + std::uint64_t next_ofs{0}; + std::uint64_t min_index{0}; + std::uint64_t max_index{0}; + ceph::real_time max_time; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(tag, bl); + encode(params, bl); + encode(magic, bl); + encode(min_ofs, bl); + encode(last_ofs, bl); + encode(next_ofs, bl); + encode(min_index, bl); + encode(max_index, bl); + encode(max_time, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(tag, bl); + decode(params, bl); + decode(magic, bl); + decode(min_ofs, bl); + decode(last_ofs, bl); + decode(next_ofs, bl); + decode(min_index, bl); + decode(max_index, bl); + decode(max_time, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(part_header) +inline std::ostream& operator <<(std::ostream& m, const part_header& p) { + using ceph::operator <<; + return m << "tag: " << p.tag << ", " + << "params: {" << p.params << "}, " + << "magic: " << p.magic << ", " + << "min_ofs: " << p.min_ofs << ", " + << "last_ofs: " << p.last_ofs << ", " + << "next_ofs: " << p.next_ofs << ", " + << "min_index: " << p.min_index << ", " + << "max_index: " << p.max_index << ", " + << "max_time: " << p.max_time; +} +} // namespace rados::cls::fifo diff --git a/src/common/options.cc b/src/common/options.cc index 919241515b9fd..b94a4715174ee 100644 --- a/src/common/options.cc +++ b/src/common/options.cc @@ -3306,11 +3306,11 @@ std::vector