install(TARGETS cls_cas DESTINATION ${cls_dir})
+
#cls_queue
set(cls_queue_srcs
queue/cls_queue.cc
add_library(cls_rgw_gc_client STATIC ${cls_rgw_gc_client_srcs})
endif (WITH_RADOSGW)
+
#cls_2pc_queue
set(cls_2pc_queue_srcs
2pc_queue/cls_2pc_queue.cc
INSTALL_RPATH ""
CXX_VISIBILITY_PRESET hidden)
install(TARGETS cls_2pc_queue DESTINATION ${cls_dir})
-
set(cls_2pc_queue_client_srcs
2pc_queue/cls_2pc_queue_client.cc)
add_library(cls_2pc_queue_client STATIC ${cls_2pc_queue_client_srcs})
+
add_subdirectory(cmpomap)
+
+# cls_fifo
+set(cls_fifo_srcs fifo/cls_fifo.cc fifo/cls_fifo_types.cc)
+add_library(cls_fifo SHARED ${cls_fifo_srcs})
+set_target_properties(cls_fifo PROPERTIES
+ VERSION "1.0.0"
+ SOVERSION "1"
+ INSTALL_RPATH ""
+ CXX_VISIBILITY_PRESET hidden)
+install(TARGETS cls_fifo DESTINATION ${cls_dir})
+
+set(cls_fifo_client_srcs
+ fifo/cls_fifo_client.cc
+ fifo/cls_fifo_types.cc
+ fifo/cls_fifo_ops.cc)
+add_library(cls_fifo_client STATIC ${cls_fifo_client_srcs})
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/** \file
+ *
+ * This is an OSD object class that implements methods for the
+ * management and use of a FIFO queue.
+ *
+ */
+
+#include <errno.h>
+
+#include "objclass/objclass.h"
+
+#include "cls/fifo/cls_fifo_ops.h"
+#include "cls/fifo/cls_fifo_types.h"
+
+
+using namespace rados::cls::fifo;
+
+
+CLS_VER(1,0)
+CLS_NAME(fifo)
+
+
+#define CLS_FIFO_MAX_PART_HEADER_SIZE 512
+
+static uint32_t part_entry_overhead;
+
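+
+/*
+ * Each entry in a part object is laid out as:
+ *
+ *   [cls_fifo_entry_header_pre][encoded cls_fifo_entry_header][entry data]
+ *
+ * The fixed-size pre-header is written raw (not through the encoder) so
+ * that readers can locate the encoded header and the data that follow it,
+ * and its magic field ties the entry to the part that owns it.
+ */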
+struct cls_fifo_entry_header_pre {
+ __le64 magic;
+ __le64 pre_size;
+ __le64 header_size;
+ __le64 data_size;
+ __le64 index;
+ __le32 reserved;
+} __attribute__ ((packed));
+
+struct cls_fifo_entry_header {
+ ceph::real_time mtime;
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(mtime, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(mtime, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_fifo_entry_header)
+
+
+static string new_oid_prefix(string id, std::optional<string>& val)
+{
+ if (val) {
+ return *val;
+ }
+
+#define PREFIX_RND_SIZE 12
+
+ char buf[PREFIX_RND_SIZE + 1];
+ buf[PREFIX_RND_SIZE] = 0;
+
+ cls_gen_rand_base64(buf, sizeof(buf) - 1);
+
+ char s[id.size() + 1 + sizeof(buf) + 16];
+ snprintf(s, sizeof(s), "%s.%s", id.c_str(), buf);
+ return s;
+}
+
+static int write_header(cls_method_context_t hctx,
+ fifo_info_t& header,
+ bool inc_ver = true)
+{
+ if (header.objv.instance.empty()) {
+#define HEADER_INSTANCE_SIZE 16
+ char buf[HEADER_INSTANCE_SIZE + 1];
+ buf[HEADER_INSTANCE_SIZE] = 0;
+ cls_gen_rand_base64(buf, sizeof(buf) - 1);
+
+ header.objv.instance = buf;
+ }
+ if (inc_ver) {
+ ++header.objv.ver;
+ }
+ bufferlist bl;
+ encode(header, bl);
+ return cls_cxx_write_full(hctx, &bl);
+}
+
+static int read_part_header(cls_method_context_t hctx,
+ fifo_part_header_t *part_header)
+{
+ bufferlist bl;
+ int r = cls_cxx_read2(hctx, 0, CLS_FIFO_MAX_PART_HEADER_SIZE, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r);
+ return r;
+ }
+
+ auto iter = bl.cbegin();
+ try {
+ decode(*part_header, iter);
+ } catch (buffer::error& err) {
+ CLS_ERR("ERROR: %s(): failed decoding part header", __func__);
+ return -EIO;
+ }
+
+ CLS_LOG(20, "%s():%d read part_header:\n"
+ "\ttag=%s\n"
+ "\tmagic=0x%llx\n"
+ "\tmin_ofs=%lld\n"
+ "\tmax_ofs=%lld\n"
+ "\tmin_index=%lld\n"
+ "\tmax_index=%lld\n",
+ __func__, __LINE__,
+ part_header->tag.c_str(),
+ (long long)part_header->magic,
+ (long long)part_header->min_ofs,
+ (long long)part_header->max_ofs,
+ (long long)part_header->min_index,
+ (long long)part_header->max_index);
+
+ return 0;
+}
+
+static int write_part_header(cls_method_context_t hctx,
+ fifo_part_header_t& part_header)
+{
+ bufferlist bl;
+ encode(part_header, bl);
+
+ if (bl.length() > CLS_FIFO_MAX_PART_HEADER_SIZE) {
+ CLS_LOG(10, "%s(): cannot write part header, buffer exceeds max size", __func__);
+ return -EIO;
+ }
+
+ int r = cls_cxx_write2(hctx, 0, bl.length(),
+ &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_LOG(10, "%s(): failed to write part header: r=%d",
+ __func__, r);
+ return r;
+ }
+
+ return 0;
+}
+
+static int read_header(cls_method_context_t hctx,
+ std::optional<fifo_objv_t> objv,
+ fifo_info_t *info)
+{
+ uint64_t size;
+
+ int r = cls_cxx_stat2(hctx, &size, nullptr);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s(): cls_cxx_stat2() on obj returned %d", __func__, r);
+ return r;
+ }
+
+ bufferlist bl;
+ r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r);
+ return r;
+ }
+
+ try {
+ auto iter = bl.cbegin();
+ decode(*info, iter);
+ } catch (buffer::error& err) {
+ CLS_ERR("ERROR: %s(): failed decoding header", __func__);
+ return -EIO;
+ }
+
+ if (objv &&
+ !(info->objv == *objv)) {
+ string s1 = info->objv.to_str();
+ string s2 = objv->to_str();
+ CLS_LOG(10, "%s(): version mismatch (header=%s, req=%s), cancelled operation", __func__, s1.c_str(), s2.c_str());
+ return -ECANCELED;
+ }
+
+ return 0;
+}
+
+static int fifo_meta_create_op(cls_method_context_t hctx,
+ bufferlist *in, bufferlist *out)
+{
+ CLS_LOG(20, "%s", __func__);
+
+ cls_fifo_meta_create_op op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+ return -EINVAL;
+ }
+
+ uint64_t size;
+
+ int r = cls_cxx_stat2(hctx, &size, nullptr);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("ERROR: %s(): cls_cxx_stat2() on obj returned %d", __func__, r);
+ return r;
+ }
+ if (op.exclusive && r == 0) {
+ CLS_LOG(10, "%s(): exclusive create but queue already exists", __func__);
+ return -EEXIST;
+ }
+
+ if (r == 0) {
+ bufferlist bl;
+ r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r);
+ return r;
+ }
+
+ fifo_info_t header;
+ try {
+ auto iter = bl.cbegin();
+ decode(header, iter);
+ } catch (buffer::error& err) {
+ CLS_ERR("ERROR: %s(): failed decoding header", __func__);
+ return -EIO;
+ }
+
+ if (!(header.id == op.id &&
+ (!op.oid_prefix ||
+ header.oid_prefix == *op.oid_prefix) &&
+ (!op.objv ||
+ header.objv == *op.objv))) {
+ CLS_LOG(10, "%s(): failed to re-create existing queue with different params", __func__);
+ return -EEXIST;
+ }
+
+ return 0; /* already exists */
+ }
+ fifo_info_t header;
+
+ header.id = op.id;
+ if (op.objv) {
+ header.objv = *op.objv;
+ } else {
+#define DEFAULT_INSTANCE_SIZE 16
+ char buf[DEFAULT_INSTANCE_SIZE + 1];
+ buf[DEFAULT_INSTANCE_SIZE] = '\0';
+ cls_gen_rand_base64(buf, sizeof(buf) - 1);
+ header.objv.instance = buf;
+ header.objv.ver = 1;
+ }
+ header.oid_prefix = new_oid_prefix(op.id, op.oid_prefix);
+
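+
+ /* full_size_threshold leaves room for one max-size entry (plus its
+ * fixed per-entry overhead): once max_ofs passes the threshold the
+ * part is treated as full and further pushes return -ERANGE. */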
+ header.data_params.max_part_size = op.max_part_size;
+ header.data_params.max_entry_size = op.max_entry_size;
+ header.data_params.full_size_threshold = op.max_part_size - op.max_entry_size - part_entry_overhead;
+
+ r = write_header(hctx, header, false);
+ if (r < 0) {
+ CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r);
+ return r;
+ }
+
+ return 0;
+}
+
+static int fifo_meta_update_op(cls_method_context_t hctx,
+ bufferlist *in, bufferlist *out)
+{
+ CLS_LOG(20, "%s", __func__);
+
+ cls_fifo_meta_update_op op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+ return -EINVAL;
+ }
+
+ fifo_info_t header;
+
+ int r = read_header(hctx, op.objv, &header);
+ if (r < 0) {
+ return r;
+ }
+
+ string err;
+
+ r = header.apply_update(op.tail_part_num,
+ op.head_part_num,
+ op.min_push_part_num,
+ op.max_push_part_num,
+ op.journal_entries_add,
+ op.journal_entries_rm,
+ &err);
+ if (r < 0) {
+ CLS_LOG(10, "%s(): %s", __func__, err.c_str());
+ return r;
+ }
+
+ r = write_header(hctx, header);
+ if (r < 0) {
+ CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r);
+ return r;
+ }
+
+ return 0;
+}
+
+static int fifo_meta_get_op(cls_method_context_t hctx,
+ bufferlist *in, bufferlist *out)
+{
+ CLS_LOG(20, "%s", __func__);
+
+ cls_fifo_meta_get_op op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+ return -EINVAL;
+ }
+
+ cls_fifo_meta_get_op_reply reply;
+ int r = read_header(hctx, op.objv, &reply.info);
+ if (r < 0) {
+ return r;
+ }
+
+ reply.part_header_size = CLS_FIFO_MAX_PART_HEADER_SIZE;
+ reply.part_entry_overhead = part_entry_overhead;
+
+ encode(reply, *out);
+
+ return 0;
+}
+
+static int fifo_part_init_op(cls_method_context_t hctx,
+ bufferlist *in, bufferlist *out)
+{
+ CLS_LOG(20, "%s", __func__);
+
+ cls_fifo_part_init_op op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+ return -EINVAL;
+ }
+
+ uint64_t size;
+
+ int r = cls_cxx_stat2(hctx, &size, nullptr);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("ERROR: %s(): cls_cxx_stat2() on obj returned %d", __func__, r);
+ return r;
+ }
+ if (r == 0 && size > 0) {
+ fifo_part_header_t part_header;
+ r = read_part_header(hctx, &part_header);
+ if (r < 0) {
+ CLS_LOG(10, "%s(): failed to read part header", __func__);
+ return r;
+ }
+
+ if (!(part_header.tag == op.tag &&
+ part_header.params == op.data_params)) {
+ CLS_LOG(10, "%s(): failed to re-create existing part with different params", __func__);
+ return -EEXIST;
+ }
+
+ return 0; /* already exists */
+ }
+
+ fifo_part_header_t part_header;
+
+ part_header.tag = op.tag;
+ part_header.params = op.data_params;
+
+ part_header.min_ofs = CLS_FIFO_MAX_PART_HEADER_SIZE;
+ part_header.max_ofs = part_header.min_ofs;
+
+ cls_gen_random_bytes((char *)&part_header.magic, sizeof(part_header.magic));
+
+ r = write_part_header(hctx, part_header);
+ if (r < 0) {
+ CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r);
+ return r;
+ }
+
+ return 0;
+}
+
+static bool full_part(const fifo_part_header_t& part_header)
+{
+ return (part_header.max_ofs > part_header.params.full_size_threshold);
+}
+
+static int fifo_part_push_op(cls_method_context_t hctx,
+ bufferlist *in, bufferlist *out)
+{
+ CLS_LOG(20, "%s", __func__);
+
+ cls_fifo_part_push_op op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+ return -EINVAL;
+ }
+
+ fifo_part_header_t part_header;
+ int r = read_part_header(hctx, &part_header);
+ if (r < 0) {
+ CLS_LOG(10, "%s(): failed to read part header", __func__);
+ return r;
+ }
+
+ if (!(part_header.tag == op.tag)) {
+ CLS_LOG(10, "%s(): bad tag", __func__);
+ return -EINVAL;
+ }
+
+ uint64_t effective_len = op.total_len + op.data_bufs.size() * part_entry_overhead;
+
+ if (effective_len > part_header.params.max_entry_size + part_entry_overhead) {
+ return -EINVAL;
+ }
+
+ if (full_part(part_header)) {
+ return -ERANGE;
+ }
+
+ struct cls_fifo_entry_header entry_header;
+ entry_header.mtime = real_clock::now();
+
+ bufferlist entry_header_bl;
+ encode(entry_header, entry_header_bl);
+
+ auto max_index = part_header.max_index;
+ auto ofs = part_header.max_ofs;
+
+ cls_fifo_entry_header_pre pre_header;
+ pre_header.magic = part_header.magic;
+ pre_header.pre_size = sizeof(pre_header);
+ pre_header.reserved = 0;
+
+ uint64_t total_data = 0;
+
+ for (auto& data : op.data_bufs) {
+ total_data += data.length();
+
+ pre_header.header_size = entry_header_bl.length();
+ pre_header.data_size = data.length();
+ pre_header.index = max_index;
+
+ bufferptr pre((char *)&pre_header, sizeof(pre_header));
+ bufferlist all_data;
+ all_data.append(pre);
+ all_data.append(entry_header_bl);
+ all_data.claim_append(data);
+
+ auto write_len = all_data.length();
+
+ r = cls_cxx_write2(hctx, ofs, write_len,
+ &all_data, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_LOG(10, "%s(): failed to write entry (ofs=%lld len=%lld): r=%d",
+ __func__, (long long)ofs, (long long)write_len, r);
+ return r;
+ }
+
+ ofs += write_len;
+ ++max_index;
+ }
+
+ if (total_data != op.total_len) {
+ CLS_LOG(10, "%s(): length mismatch: op.total_len=%lld total data received=%lld",
+ __func__, (long long)op.total_len, (long long)total_data);
+ return -EINVAL;
+ }
+
+ part_header.max_index = max_index;
+ part_header.max_ofs = ofs;
+
+ r = write_part_header(hctx, part_header);
+ if (r < 0) {
+ CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r);
+ return r;
+ }
+
+ return 0;
+}
+
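+
+/*
+ * EntryReader walks the entries of a single part object sequentially,
+ * starting at a given offset (clamped to min_ofs). Reads are buffered:
+ * fetch() prefetches up to 128 KiB at a time, so listing many small
+ * entries does not cost one object read per entry.
+ */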
+class EntryReader {
+ static constexpr uint64_t prefetch_len = (128 * 1024);
+
+ cls_method_context_t hctx;
+
+ fifo_part_header_t& part_header;
+
+ uint64_t ofs;
+ bufferlist data;
+
+ int fetch(uint64_t num_bytes);
+ int read(uint64_t num_bytes, bufferlist *pbl);
+ int peek(uint64_t num_bytes, char *dest);
+ int seek(uint64_t num_bytes);
+
+public:
+ EntryReader(cls_method_context_t _hctx,
+ fifo_part_header_t& _part_header,
+ uint64_t _ofs) : hctx(_hctx),
+ part_header(_part_header),
+ ofs(_ofs) {
+ if (ofs < part_header.min_ofs) {
+ ofs = part_header.min_ofs;
+ }
+ }
+
+ uint64_t get_ofs() const {
+ return ofs;
+ }
+
+ bool end() const {
+ return (ofs >= part_header.max_ofs);
+ }
+
+ int peek_pre_header(cls_fifo_entry_header_pre *pre_header);
+ int get_next_entry(bufferlist *pbl,
+ uint64_t *pofs,
+ ceph::real_time *pmtime);
+};
+
+
+int EntryReader::fetch(uint64_t num_bytes)
+{
+ CLS_LOG(20, "%s(): fetch %d bytes, ofs=%d data.length()=%d", __func__, (int)num_bytes, (int)ofs, (int)data.length());
+ if (data.length() < num_bytes) {
+ bufferlist bl;
+ CLS_LOG(20, "%s(): reading %d bytes at ofs=%d", __func__, (int)prefetch_len, (int)ofs + data.length());
+ int r = cls_cxx_read2(hctx, ofs + data.length(), prefetch_len, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r);
+ return r;
+ }
+ data.claim_append(bl);
+ }
+
+ if ((unsigned)num_bytes > data.length()) {
+ CLS_LOG(20, "%s(): requested %lld bytes, but only %lld were available", __func__, (long long)num_bytes, (long long)data.length());
+ return -ERANGE;
+ }
+
+ return 0;
+}
+
+int EntryReader::read(uint64_t num_bytes, bufferlist *pbl)
+{
+ int r = fetch(num_bytes);
+ if (r < 0) {
+ return r;
+ }
+ data.splice(0, num_bytes, pbl);
+
+ ofs += num_bytes;
+
+ return 0;
+}
+
+int EntryReader::peek(uint64_t num_bytes, char *dest)
+{
+ int r = fetch(num_bytes);
+ if (r < 0) {
+ return r;
+ }
+
+ data.begin().copy(num_bytes, dest);
+
+ return 0;
+}
+
+int EntryReader::seek(uint64_t num_bytes)
+{
+ bufferlist bl;
+
+ CLS_LOG(20, "%s():%d: num_bytes=%d", __func__, __LINE__, (int)num_bytes);
+ return read(num_bytes, &bl);
+}
+
+int EntryReader::peek_pre_header(cls_fifo_entry_header_pre *pre_header)
+{
+ if (end()) {
+ return -ENOENT;
+ }
+
+ int r = peek(sizeof(*pre_header), (char *)pre_header);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s(): peek() size=%d failed: r=%d", __func__, (int)sizeof(pre_header), r);
+ return r;
+ }
+
+ if (pre_header->magic != part_header.magic) {
+ CLS_ERR("ERROR: %s(): unexpected pre_header magic", __func__);
+ return -ERANGE;
+ }
+
+ return 0;
+}
+
+
+int EntryReader::get_next_entry(bufferlist *pbl,
+ uint64_t *pofs,
+ ceph::real_time *pmtime)
+{
+ cls_fifo_entry_header_pre pre_header;
+ int r = peek_pre_header(&pre_header);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s(): peek_pre_header() failed: r=%d", __func__, r);
+ return r;
+ }
+
+ if (pofs) {
+ *pofs = ofs;
+ }
+
+ CLS_LOG(20, "%s():%d: pre_header.pre_size=%d", __func__, __LINE__, (int)pre_header.pre_size);
+ r = seek(pre_header.pre_size);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s(): failed to seek: r=%d", __func__, r);
+ return r;
+ }
+
+ bufferlist header;
+ CLS_LOG(20, "%s():%d: pre_header.header_size=%d", __func__, __LINE__, (int)pre_header.header_size);
+ r = read(pre_header.header_size, &header);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s(): failed to read entry header: r=%d", __func__, r);
+ return r;
+ }
+
+ cls_fifo_entry_header entry_header;
+ auto iter = header.cbegin();
+ try {
+ decode(entry_header, iter);
+ } catch (buffer::error& err) {
+ CLS_ERR("%s(): failed decoding entry header", __func__);
+ return -EIO;
+ }
+
+ if (pmtime) {
+ *pmtime = entry_header.mtime;
+ }
+
+ if (pbl) {
+ r = read(pre_header.data_size, pbl);
+ if (r < 0) {
+ CLS_ERR("%s(): failed reading data: r=%d", __func__, r);
+ return r;
+ }
+ } else {
+ r = seek(pre_header.data_size);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s(): failed to seek: r=%d", __func__, r);
+ return r;
+ }
+ }
+
+ return 0;
+}
+
+static int fifo_part_trim_op(cls_method_context_t hctx,
+ bufferlist *in, bufferlist *out)
+{
+ CLS_LOG(20, "%s", __func__);
+
+ cls_fifo_part_trim_op op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+ return -EINVAL;
+ }
+
+ fifo_part_header_t part_header;
+ int r = read_part_header(hctx, &part_header);
+ if (r < 0) {
+ CLS_LOG(10, "%s(): failed to read part header", __func__);
+ return r;
+ }
+
+ if (op.tag &&
+ !(part_header.tag == *op.tag)) {
+ CLS_LOG(10, "%s(): bad tag", __func__);
+ return -EINVAL;
+ }
+
+ if (op.ofs < part_header.min_ofs) {
+ return 0;
+ }
+
+ if (op.ofs >= part_header.max_ofs) {
+ if (full_part(part_header)) {
+ /*
+ * trim full part completely: remove object
+ */
+
+ r = cls_cxx_remove(hctx);
+ if (r < 0) {
+ CLS_LOG(0, "%s(): ERROR: cls_cxx_remove() returned r=%d", __func__, r);
+ return r;
+ }
+
+ return 0;
+ }
+
+ part_header.min_ofs = part_header.max_ofs;
+ part_header.min_index = part_header.max_index;
+ } else {
+ EntryReader reader(hctx, part_header, op.ofs);
+
+ cls_fifo_entry_header_pre pre_header;
+ int r = reader.peek_pre_header(&pre_header);
+ if (r < 0) {
+ return r;
+ }
+
+ r = reader.get_next_entry(nullptr, nullptr, nullptr);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d", __func__, r);
+ return r;
+ }
+
+ part_header.min_ofs = reader.get_ofs();
+ part_header.min_index = pre_header.index + 1;
+ }
+
+ r = write_part_header(hctx, part_header);
+ if (r < 0) {
+ CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r);
+ return r;
+ }
+
+ return 0;
+}
+
+static int fifo_part_list_op(cls_method_context_t hctx,
+ bufferlist *in, bufferlist *out)
+{
+ CLS_LOG(20, "%s", __func__);
+
+ cls_fifo_part_list_op op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+ return -EINVAL;
+ }
+
+ fifo_part_header_t part_header;
+ int r = read_part_header(hctx, &part_header);
+ if (r < 0) {
+ CLS_LOG(10, "%s(): failed to read part header", __func__);
+ return r;
+ }
+
+ if (op.tag &&
+ !(part_header.tag == *op.tag)) {
+ CLS_LOG(10, "%s(): bad tag", __func__);
+ return -EINVAL;
+ }
+
+ EntryReader reader(hctx, part_header, op.ofs);
+
+ if (op.ofs >= part_header.min_ofs &&
+ !reader.end()) {
+ r = reader.get_next_entry(nullptr, nullptr, nullptr);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d", __func__, r);
+ return r;
+ }
+ }
+
+ cls_fifo_part_list_op_reply reply;
+
+ reply.tag = part_header.tag;
+
+#define LIST_MAX_ENTRIES 512
+
+ auto max_entries = std::min(op.max_entries, (int)LIST_MAX_ENTRIES);
+
+ for (int i = 0; i < max_entries && !reader.end(); ++i) {
+ bufferlist data;
+ ceph::real_time mtime;
+ uint64_t ofs;
+
+ r = reader.get_next_entry(&data, &ofs, &mtime);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d", __func__, r);
+ return r;
+ }
+
+ reply.entries.emplace_back(std::move(data), ofs, mtime);
+ }
+
+ reply.more = !reader.end();
+ reply.full_part = full_part(part_header);
+
+ encode(reply, *out);
+
+ return 0;
+}
+
+static int fifo_part_get_info_op(cls_method_context_t hctx,
+ bufferlist *in, bufferlist *out)
+{
+ CLS_LOG(20, "%s", __func__);
+
+ cls_fifo_part_get_info_op op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+ return -EINVAL;
+ }
+
+ cls_fifo_part_get_info_op_reply reply;
+
+ int r = read_part_header(hctx, &reply.header);
+ if (r < 0) {
+ CLS_LOG(10, "%s(): failed to read part header", __func__);
+ return r;
+ }
+
+ encode(reply, *out);
+
+ return 0;
+}
+
+CLS_INIT(fifo)
+{
+ CLS_LOG(20, "Loaded fifo class!");
+
+ cls_handle_t h_class;
+ cls_method_handle_t h_fifo_meta_create_op;
+ cls_method_handle_t h_fifo_meta_get_op;
+ cls_method_handle_t h_fifo_meta_update_op;
+ cls_method_handle_t h_fifo_part_init_op;
+ cls_method_handle_t h_fifo_part_push_op;
+ cls_method_handle_t h_fifo_part_trim_op;
+ cls_method_handle_t h_fifo_part_list_op;
+ cls_method_handle_t h_fifo_part_get_info_op;
+
+ cls_register("fifo", &h_class);
+ cls_register_cxx_method(h_class, "fifo_meta_create",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ fifo_meta_create_op, &h_fifo_meta_create_op);
+
+ cls_register_cxx_method(h_class, "fifo_meta_get",
+ CLS_METHOD_RD,
+ fifo_meta_get_op, &h_fifo_meta_get_op);
+
+ cls_register_cxx_method(h_class, "fifo_meta_update",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ fifo_meta_update_op, &h_fifo_meta_update_op);
+
+ cls_register_cxx_method(h_class, "fifo_part_init",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ fifo_part_init_op, &h_fifo_part_init_op);
+
+ cls_register_cxx_method(h_class, "fifo_part_push",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ fifo_part_push_op, &h_fifo_part_push_op);
+
+ cls_register_cxx_method(h_class, "fifo_part_trim",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ fifo_part_trim_op, &h_fifo_part_trim_op);
+
+ cls_register_cxx_method(h_class, "fifo_part_list",
+ CLS_METHOD_RD,
+ fifo_part_list_op, &h_fifo_part_list_op);
+
+ cls_register_cxx_method(h_class, "fifo_part_get_info",
+ CLS_METHOD_RD,
+ fifo_part_get_info_op, &h_fifo_part_get_info_op);
+
+ /* calculate entry overhead */
+ struct cls_fifo_entry_header entry_header;
+ bufferlist entry_header_bl;
+ encode(entry_header, entry_header_bl);
+
+ part_entry_overhead = sizeof(cls_fifo_entry_header_pre) + entry_header_bl.length();
+
+ return;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "include/rados/librados.hpp"
+#include "common/dout.h"
+
+#include "auth/Crypto.h"
+
+#include "cls/fifo/cls_fifo_ops.h"
+#include "cls/fifo/cls_fifo_client.h"
+
+using namespace librados;
+
+
+#define dout_subsys ceph_subsys_objclass
+
+
+namespace rados {
+ namespace cls {
+ namespace fifo {
+ int ClsFIFO::meta_create(librados::ObjectWriteOperation *rados_op,
+ const string& id,
+ const MetaCreateParams& params) {
+ cls_fifo_meta_create_op op;
+
+ auto& state = params.state;
+
+ if (id.empty()) {
+ return -EINVAL;
+ }
+
+ op.id = id;
+ op.objv = state.objv;
+ op.oid_prefix = state.oid_prefix;
+ op.max_part_size = state.max_part_size;
+ op.max_entry_size = state.max_entry_size;
+ op.exclusive = state.exclusive;
+
+ if (op.max_part_size == 0 ||
+ op.max_entry_size == 0 ||
+ op.max_entry_size > op.max_part_size) {
+ return -EINVAL;
+ }
+
+ bufferlist in;
+ encode(op, in);
+ rados_op->exec("fifo", "fifo_meta_create", in);
+
+ return 0;
+ }
+
+ int ClsFIFO::meta_get(librados::IoCtx& ioctx,
+ const string& oid,
+ const MetaGetParams& params,
+ fifo_info_t *result,
+ uint32_t *part_header_size,
+ uint32_t *part_entry_overhead) {
+ cls_fifo_meta_get_op op;
+
+ auto& state = params.state;
+
+ op.objv = state.objv;
+
+ librados::ObjectReadOperation rop;
+
+ bufferlist in;
+ bufferlist out;
+ int op_ret;
+ encode(op, in);
+ rop.exec("fifo", "fifo_meta_get", in, &out, &op_ret);
+
+ int r = ioctx.operate(oid, &rop, nullptr);
+ if (r < 0) {
+ return r;
+ }
+
+ if (op_ret < 0) {
+ return op_ret;
+ }
+
+ cls_fifo_meta_get_op_reply reply;
+ auto iter = out.cbegin();
+ try {
+ decode(reply, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ *result = reply.info;
+
+ if (part_header_size) {
+ *part_header_size = reply.part_header_size;
+ }
+
+ if (part_entry_overhead) {
+ *part_entry_overhead = reply.part_entry_overhead;
+ }
+
+ return 0;
+ }
+
+ int ClsFIFO::meta_update(librados::ObjectWriteOperation *rados_op,
+ const MetaUpdateParams& params) {
+ cls_fifo_meta_update_op op;
+
+ auto& state = params.state;
+
+ if (state.objv.empty()) {
+ return -EINVAL;
+ }
+
+ op.objv = state.objv;
+ op.tail_part_num = state.tail_part_num;
+ op.head_part_num = state.head_part_num;
+ op.min_push_part_num = state.min_push_part_num;
+ op.max_push_part_num = state.max_push_part_num;
+ op.journal_entries_add = state.journal_entries_add;
+ op.journal_entries_rm = state.journal_entries_rm;
+
+ bufferlist in;
+ encode(op, in);
+ rados_op->exec("fifo", "fifo_meta_update", in);
+
+ return 0;
+ }
+
+ int ClsFIFO::part_init(librados::ObjectWriteOperation *rados_op,
+ const PartInitParams& params) {
+ cls_fifo_part_init_op op;
+
+ auto& state = params.state;
+
+ if (state.tag.empty()) {
+ return -EINVAL;
+ }
+
+ op.tag = state.tag;
+ op.data_params = state.data_params;
+
+ bufferlist in;
+ encode(op, in);
+ rados_op->exec("fifo", "fifo_part_init", in);
+
+ return 0;
+ }
+
+ int ClsFIFO::push_part(librados::ObjectWriteOperation *rados_op,
+ const PushPartParams& params) {
+ cls_fifo_part_push_op op;
+
+ auto& state = params.state;
+
+ if (state.tag.empty()) {
+ return -EINVAL;
+ }
+
+ op.tag = state.tag;
+ op.data_bufs = state.data_bufs;
+ op.total_len = state.total_len;
+
+ bufferlist in;
+ encode(op, in);
+ rados_op->exec("fifo", "fifo_part_push", in);
+
+ return 0;
+ }
+
+ int ClsFIFO::trim_part(librados::ObjectWriteOperation *rados_op,
+ const TrimPartParams& params) {
+ cls_fifo_part_trim_op op;
+
+ auto& state = params.state;
+
+ op.tag = state.tag;
+ op.ofs = state.ofs;
+
+ bufferlist in;
+ encode(op, in);
+ rados_op->exec("fifo", "fifo_part_trim", in);
+
+ return 0;
+ }
+
+ int ClsFIFO::list_part(librados::IoCtx& ioctx,
+ const string& oid,
+ const ListPartParams& params,
+ std::vector<cls_fifo_part_list_entry_t> *pentries,
+ bool *more,
+ bool *full_part,
+ string *ptag)
+ {
+ cls_fifo_part_list_op op;
+
+ auto& state = params.state;
+
+ op.tag = state.tag;
+ op.ofs = state.ofs;
+ op.max_entries = state.max_entries;
+
+ librados::ObjectReadOperation rop;
+
+ bufferlist in;
+ bufferlist out;
+ int op_ret;
+ encode(op, in);
+ rop.exec("fifo", "fifo_part_list", in, &out, &op_ret);
+
+ int r = ioctx.operate(oid, &rop, nullptr);
+ if (r < 0) {
+ return r;
+ }
+
+ if (op_ret < 0) {
+ return op_ret;
+ }
+
+ cls_fifo_part_list_op_reply reply;
+ auto iter = out.cbegin();
+ try {
+ decode(reply, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ if (pentries) {
+ *pentries = std::move(reply.entries);
+ }
+
+ if (more) {
+ *more = reply.more;
+ }
+
+ if (full_part) {
+ *full_part = reply.full_part;
+ }
+
+ if (ptag) {
+ *ptag = reply.tag;
+ }
+
+ return 0;
+ }
+
+ int ClsFIFO::get_part_info(librados::IoCtx& ioctx,
+ const string& oid,
+ rados::cls::fifo::fifo_part_header_t *header)
+ {
+ cls_fifo_part_get_info_op op;
+
+ librados::ObjectReadOperation rop;
+
+ bufferlist in;
+ bufferlist out;
+ int op_ret;
+ encode(op, in);
+ rop.exec("fifo", "fifo_part_get_info", in, &out, &op_ret);
+
+ int r = ioctx.operate(oid, &rop, nullptr);
+ if (r < 0) {
+ return r;
+ }
+
+ if (op_ret < 0) {
+ return op_ret;
+ }
+
+ cls_fifo_part_get_info_op_reply reply;
+ auto iter = out.cbegin();
+ try {
+ decode(reply, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ if (header) {
+ *header = std::move(reply.header);
+ }
+
+ return 0;
+ }
+
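+ /* Markers encode a position as "<part_num>:<part_ofs>", e.g. "3:512"
+ * for the entry at offset 512 of part 3; an empty marker denotes the
+ * current tail. */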
+ string FIFO::craft_marker(int64_t part_num,
+ uint64_t part_ofs)
+ {
+ char buf[64];
+ snprintf(buf, sizeof(buf), "%lld:%lld", (long long)part_num, (long long)part_ofs);
+ return string(buf);
+ }
+
+ bool FIFO::parse_marker(const string& marker,
+ int64_t *part_num,
+ uint64_t *part_ofs)
+ {
+ if (marker.empty()) {
+ *part_num = meta_info.tail_part_num;
+ *part_ofs = 0;
+ return true;
+ }
+
+ auto pos = marker.find(':');
+ if (pos == string::npos) {
+ return false;
+ }
+
+ auto first = marker.substr(0, pos);
+ auto second = marker.substr(pos + 1);
+
+ string err;
+
+ *part_num = (int64_t)strict_strtoll(first.c_str(), 10, &err);
+ if (!err.empty()) {
+ return false;
+ }
+
+ *part_ofs = (uint64_t)strict_strtoll(second.c_str(), 10, &err);
+ if (!err.empty()) {
+ return false;
+ }
+
+ return true;
+ }
+
+ int FIFO::init_ioctx(librados::Rados *rados,
+ const string& pool,
+ std::optional<string> pool_ns)
+ {
+ _ioctx.emplace();
+ int r = rados->ioctx_create(pool.c_str(), *_ioctx);
+ if (r < 0) {
+ return r;
+ }
+
+ if (pool_ns && !pool_ns->empty()) {
+ _ioctx->set_namespace(*pool_ns);
+ }
+
+ ioctx = &(*_ioctx);
+
+ return 0;
+ }
+
+ int ClsFIFO::MetaUpdateParams::apply_update(CephContext *cct,
+ fifo_info_t *info)
+ {
+ string err;
+
+ int r = info->apply_update(state.tail_part_num,
+ state.head_part_num,
+ state.min_push_part_num,
+ state.max_push_part_num,
+ state.journal_entries_add,
+ state.journal_entries_rm,
+ &err);
+ if (r < 0) {
+ ldout(cct, 0) << __func__ << "(): ERROR: " << err << dendl;
+ return r;
+ }
+
+ ++info->objv.ver;
+
+ return 0;
+ }
+
+ int FIFO::update_meta(ClsFIFO::MetaUpdateParams& update_params,
+ bool *canceled)
+ {
+ update_params.objv(meta_info.objv);
+
+ librados::ObjectWriteOperation wop;
+ int r = ClsFIFO::meta_update(&wop, update_params);
+ if (r < 0) {
+ return r;
+ }
+
+ r = ioctx->operate(meta_oid, &wop);
+ if (r < 0 && r != -ECANCELED) {
+ return r;
+ }
+
+ *canceled = (r == -ECANCELED);
+
+ if (!*canceled) {
+ r = update_params.apply_update(cct, &meta_info);
+ if (r < 0) { /* should really not happen,
+ but if it does, let's treat it as if race was detected */
+ *canceled = true;
+ }
+ }
+
+ if (*canceled) {
+ r = do_read_meta();
+ }
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+ }
+
+ int FIFO::do_read_meta(std::optional<fifo_objv_t> objv)
+ {
+ ClsFIFO::MetaGetParams get_params;
+ if (objv) {
+ get_params.objv(*objv);
+ }
+ int r = ClsFIFO::meta_get(*ioctx,
+ meta_oid,
+ get_params,
+ &meta_info,
+ &part_header_size,
+ &part_entry_overhead);
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+ }
+
+ int FIFO::create_part(int64_t part_num, const string& tag,
+ int64_t& max_part_num) {
+ librados::ObjectWriteOperation op;
+
+ op.create(true); /* exclusive */
+ int r = ClsFIFO::part_init(&op,
+ ClsFIFO::PartInitParams()
+ .tag(tag)
+ .data_params(meta_info.data_params));
+ if (r < 0) {
+ return r;
+ }
+
+ r = ioctx->operate(meta_info.part_oid(part_num), &op);
+ if (r < 0) {
+ return r;
+ }
+
+ if (part_num > max_part_num) {
+ max_part_num = part_num;
+ }
+
+ return 0;
+ }
+
+ int FIFO::remove_part(int64_t part_num, const string& tag,
+ int64_t& tail_part_num) {
+ librados::ObjectWriteOperation op;
+ op.remove();
+ int r = ioctx->operate(meta_info.part_oid(part_num), &op);
+ if (r == -ENOENT) {
+ r = 0;
+ }
+ if (r < 0) {
+ return r;
+ }
+
+ if (part_num >= tail_part_num) {
+ tail_part_num = part_num + 1;
+ }
+
+ return 0;
+ }
+
+ int FIFO::process_journal_entry(const fifo_journal_entry_t& entry,
+ int64_t& tail_part_num,
+ int64_t& head_part_num,
+ int64_t& max_part_num)
+ {
+
+ switch (entry.op) {
+ case fifo_journal_entry_t::Op::OP_CREATE:
+ return create_part(entry.part_num, entry.part_tag, max_part_num);
+ case fifo_journal_entry_t::Op::OP_SET_HEAD:
+ if (entry.part_num > head_part_num) {
+ head_part_num = entry.part_num;
+ }
+ return 0;
+ case fifo_journal_entry_t::Op::OP_REMOVE:
+ return remove_part(entry.part_num, entry.part_tag, tail_part_num);
+ default:
+ /* nothing to do */
+ break;
+ }
+
+ return -EIO;
+ }
+
+ int FIFO::process_journal_entries(vector<fifo_journal_entry_t> *processed,
+ int64_t& tail_part_num,
+ int64_t& head_part_num,
+ int64_t& max_part_num)
+ {
+ for (auto& iter : meta_info.journal) {
+ auto& entry = iter.second;
+ int r = process_journal_entry(entry, tail_part_num, head_part_num, max_part_num);
+ if (r < 0) {
+ ldout(cct, 10) << __func__ << "(): ERROR: failed processing journal entry for part=" << entry.part_num << dendl;
+ } else {
+ processed->push_back(entry);
+ }
+ }
+
+ return 0;
+ }
+
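+ /* The meta header carries a journal of pending part operations
+ * (create / set-head / remove). process_journal() applies them to the
+ * part objects, then removes the successfully applied entries from the
+ * header, retrying on object-version races. */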
+ int FIFO::process_journal()
+ {
+ vector<fifo_journal_entry_t> processed;
+
+ int64_t new_tail = meta_info.tail_part_num;
+ int64_t new_head = meta_info.head_part_num;
+ int64_t new_max = meta_info.max_push_part_num;
+
+ int r = process_journal_entries(&processed, new_tail, new_head, new_max);
+ if (r < 0) {
+ return r;
+ }
+
+ if (processed.empty()) {
+ return 0;
+ }
+
+#define RACE_RETRY 10
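+
+ /* Meta updates are conditional on the header's object version: a racing
+ * writer bumps the version and our update returns -ECANCELED, in which
+ * case we re-read the meta object and retry, up to RACE_RETRY times. */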
+
+ int i;
+
+ for (i = 0; i < RACE_RETRY; ++i) {
+ bool canceled;
+
+ std::optional<int64_t> tail_part_num;
+ std::optional<int64_t> head_part_num;
+ std::optional<int64_t> max_part_num;
+
+ if (new_tail > meta_info.tail_part_num) {
+ tail_part_num = new_tail;
+ }
+
+ if (new_head > meta_info.head_part_num) {
+ head_part_num = new_head;
+ }
+
+ if (new_max > meta_info.max_push_part_num) {
+ max_part_num = new_max;
+ }
+
+ if (processed.empty() &&
+ !tail_part_num &&
+ !max_part_num) {
+ /* nothing to update anymore */
+ break;
+ }
+
+ r = update_meta(ClsFIFO::MetaUpdateParams()
+ .journal_entries_rm(processed)
+ .tail_part_num(tail_part_num)
+ .head_part_num(head_part_num)
+ .max_push_part_num(max_part_num),
+ &canceled);
+ if (r < 0) {
+ return r;
+ }
+
+ if (canceled) {
+ vector<fifo_journal_entry_t> new_processed;
+
+ for (auto& e : processed) {
+ auto jiter = meta_info.journal.find(e.part_num);
+ if (jiter == meta_info.journal.end() || /* journal entry was already processed */
+ !(jiter->second == e)) {
+ continue;
+ }
+
+ new_processed.push_back(e);
+ }
+ processed = std::move(new_processed);
+ continue;
+ }
+ break;
+ }
+ if (i == RACE_RETRY) {
+ ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl;
+ return -ECANCELED;
+ }
+ return 0;
+ }
+
+ static const char alphanum_plain_table[]="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
+
+ void gen_rand_alphanumeric_plain(CephContext *cct, char *dest, int size) /* size should be the required string size + 1 */
+ {
+ cct->random()->get_bytes(dest, size);
+
+ int i;
+ for (i = 0; i < size - 1; i++) {
+ int pos = (unsigned)dest[i];
+ dest[i] = alphanum_plain_table[pos % (sizeof(alphanum_plain_table) - 1)];
+ }
+ dest[i] = '\0';
+ }
+
+ static string generate_tag(CephContext *cct)
+ {
+#define HEADER_TAG_SIZE 16
+ char buf[HEADER_TAG_SIZE + 1];
+ buf[HEADER_TAG_SIZE] = 0;
+ gen_rand_alphanumeric_plain(cct, buf, sizeof(buf));
+ return string(buf);
+ }
+
+ int FIFO::prepare_new_part(bool is_head)
+ {
+ fifo_journal_entry_t jentry;
+
+ meta_info.prepare_next_journal_entry(&jentry, generate_tag(cct));
+
+ int64_t new_head_part_num = meta_info.head_part_num;
+
+ std::optional<fifo_journal_entry_t> new_head_jentry;
+ if (is_head) {
+ new_head_jentry = jentry;
+ new_head_jentry->op = fifo_journal_entry_t::OP_SET_HEAD;
+ new_head_part_num = jentry.part_num;
+ }
+
+ int r;
+ bool canceled;
+
+ int i;
+
+ for (i = 0; i < RACE_RETRY; ++i) {
+ r = update_meta(ClsFIFO::MetaUpdateParams()
+ .journal_entry_add(jentry)
+ .journal_entry_add(new_head_jentry),
+ &canceled);
+ if (r < 0) {
+ return r;
+ }
+
+ if (canceled) {
+ if (meta_info.max_push_part_num >= jentry.part_num &&
+ meta_info.head_part_num >= new_head_part_num) { /* raced, but new part was already written */
+ return 0;
+ }
+
+ auto iter = meta_info.journal.find(jentry.part_num);
+ if (iter == meta_info.journal.end()) {
+ continue;
+ }
+ }
+ break;
+ }
+ if (i == RACE_RETRY) {
+ ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl;
+ return -ECANCELED;
+ }
+
+ r = process_journal();
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+ }
+
+ int FIFO::prepare_new_head()
+ {
+ int64_t new_head_num = meta_info.head_part_num + 1;
+
+ if (meta_info.max_push_part_num < new_head_num) {
+ int r = prepare_new_part(true);
+ if (r < 0) {
+ return r;
+ }
+
+ if (meta_info.max_push_part_num < new_head_num) {
+ ldout(cct, 0) << "ERROR: " << __func__ << ": after new part creation: meta_info.max_push_part_num="
+ << meta_info.max_push_part_num << " new_head_num=" << meta_info.max_push_part_num << dendl;
+ return -EIO;
+ }
+
+ return 0;
+ }
+
+ int i;
+
+ for (i = 0; i < RACE_RETRY; ++i) {
+ bool canceled;
+ int r = update_meta(ClsFIFO::MetaUpdateParams()
+ .head_part_num(new_head_num),
+ &canceled);
+ if (r < 0) {
+ return r;
+ }
+
+ if (canceled) {
+ if (meta_info.head_part_num < new_head_num) {
+ continue;
+ }
+ }
+ break;
+ }
+ if (i == RACE_RETRY) {
+ ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl;
+ return -ECANCELED;
+ }
+
+ return 0;
+ }
+
+ int FIFO::open(bool create,
+ std::optional<ClsFIFO::MetaCreateParams> create_params)
+ {
+ if (!ioctx) {
+ return -EINVAL;
+ }
+
+ if (create) {
+ librados::ObjectWriteOperation op;
+
+ ClsFIFO::MetaCreateParams default_params;
+ ClsFIFO::MetaCreateParams *params = (create_params ? &(*create_params) : &default_params);
+
+ int r = ClsFIFO::meta_create(&op, id, *params);
+ if (r < 0) {
+ return r;
+ }
+
+ r = ioctx->operate(meta_oid, &op);
+ if (r < 0) {
+ return r;
+ }
+ }
+
+ std::optional<fifo_objv_t> objv = (create_params ? create_params->state.objv : nullopt);
+
+ int r = do_read_meta(objv);
+ if (r < 0) {
+ return r;
+ }
+
+ is_open = true;
+
+ return 0;
+ }
+
+ int FIFO::read_meta(std::optional<fifo_objv_t> objv)
+ {
+ if (!is_open) {
+ return -EINVAL;
+ }
+
+ return do_read_meta(objv);
+ }
+
+ int FIFO::push_entries(int64_t part_num, std::vector<bufferlist>& data_bufs)
+ {
+ if (!is_open) {
+ return -EINVAL;
+ }
+
+ librados::ObjectWriteOperation op;
+
+ int r = ClsFIFO::push_part(&op, ClsFIFO::PushPartParams()
+ .tag(meta_info.head_tag)
+ .data_bufs(data_bufs));
+ if (r < 0) {
+ return r;
+ }
+
+ r = ioctx->operate(meta_info.part_oid(part_num), &op);
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+ }
+
+ int FIFO::trim_part(int64_t part_num,
+ uint64_t ofs,
+ std::optional<string> tag)
+ {
+ if (!is_open) {
+ return -EINVAL;
+ }
+
+ librados::ObjectWriteOperation op;
+
+ int r = ClsFIFO::trim_part(&op, ClsFIFO::TrimPartParams()
+ .tag(tag)
+ .ofs(ofs));
+ if (r < 0) {
+ return r;
+ }
+
+ r = ioctx->operate(meta_info.part_oid(part_num), &op);
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+ }
+
+ int FIFO::push(bufferlist& bl)
+ {
+ std::vector<bufferlist> data_bufs;
+ data_bufs.push_back(bl);
+
+ return push(data_bufs);
+ }
+
+ int FIFO::push(vector<bufferlist>& data_bufs)
+ {
+ if (!is_open) {
+ return -EINVAL;
+ }
+
+ int r;
+
+ if (meta_info.need_new_head()) {
+ r = prepare_new_head();
+ if (r < 0) {
+ return r;
+ }
+ }
+
+ int i;
+
+ auto iter = data_bufs.begin();
+
+ while (iter != data_bufs.end()) {
+ uint64_t batch_len = 0;
+
+ vector<bufferlist> batch;
+
+ for (; iter != data_bufs.end(); ++iter) {
+ auto& data = *iter;
+ auto data_len = data.length();
+ auto max_entry_size = meta_info.data_params.max_entry_size;
+
+ if (data_len > max_entry_size) {
+ ldout(cct, 10) << __func__ << "(): entry too large: " << data_len << " > " << meta_info.data_params.max_entry_size << dendl;
+ return -EINVAL;
+ }
+
+ if (batch_len + data_len > max_entry_size) {
+ break;
+ }
+
+ batch_len += data_len + part_entry_overhead; /* we can send entry with data_len up to max_entry_size,
+ however, we want to also account the overhead when dealing
+ with multiple entries. Previous check doesn't account
+ for overhead on purpose. */
+
+ batch.push_back(data);
+ }
+
+
+ for (i = 0; i < RACE_RETRY; ++i) {
+ r = push_entries(meta_info.head_part_num, batch);
+ if (r == -ERANGE) {
+ r = prepare_new_head();
+ if (r < 0) {
+ return r;
+ }
+ continue;
+ }
+ if (r < 0) {
+ return r;
+ }
+ break;
+ }
+ if (i == RACE_RETRY) {
+ ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl;
+ return -ECANCELED;
+ }
+ }
+
+ return 0;
+ }
+
+ int FIFO::list(int max_entries,
+ std::optional<string> marker,
+ vector<fifo_entry> *result,
+ bool *more)
+ {
+ if (!is_open) {
+ return -EINVAL;
+ }
+
+ *more = false;
+
+ int64_t part_num = meta_info.tail_part_num;
+ uint64_t ofs = 0;
+
+ if (marker) {
+ if (!parse_marker(*marker, &part_num, &ofs)) {
+ ldout(cct, 20) << __func__ << "(): failed to parse marker (" << *marker << ")" << dendl;
+ return -EINVAL;
+ }
+ }
+
+ result->clear();
+ result->reserve(max_entries);
+
+ bool part_more{false};
+ bool part_full{false};
+
+ while (max_entries > 0) {
+ std::vector<cls_fifo_part_list_entry_t> entries;
+ int r = ClsFIFO::list_part(*ioctx,
+ meta_info.part_oid(part_num),
+ ClsFIFO::ListPartParams()
+ .ofs(ofs)
+ .max_entries(max_entries),
+ &entries,
+ &part_more,
+ &part_full,
+ nullptr);
+ if (r == -ENOENT) {
+ r = do_read_meta();
+ if (r < 0) {
+ return r;
+ }
+
+ if (part_num < meta_info.tail_part_num) {
+ /* raced with trim? restart */
+ result->clear();
+ part_num = meta_info.tail_part_num;
+ ofs = 0;
+ continue;
+ }
+
+ /* assuming part was not written yet, so end of data */
+
+ *more = false;
+
+ return 0;
+ }
+ if (r < 0) {
+ ldout(cct, 20) << __func__ << "(): ClsFIFO::list_part() on oid=" << meta_info.part_oid(part_num) << " returned r=" << r << dendl;
+ return r;
+ }
+
+ for (auto& entry : entries) {
+ fifo_entry e;
+ e.data = std::move(entry.data);
+ e.marker = craft_marker(part_num, entry.ofs);
+ e.mtime = entry.mtime;
+
+ result->push_back(e);
+ }
+ max_entries -= entries.size();
+
+ if (max_entries > 0 &&
+ part_more) {
+ continue;
+ }
+
+ if (!part_full) { /* head part is not full */
+ break;
+ }
+
+ ++part_num;
+ ofs = 0;
+ }
+
+ *more = part_full || part_more;
+
+ return 0;
+ }
+
+ int FIFO::trim(const string& marker)
+ {
+ if (!is_open) {
+ return -EINVAL;
+ }
+
+ int64_t part_num;
+ uint64_t ofs;
+
+ if (!parse_marker(marker, &part_num, &ofs)) {
+ ldout(cct, 20) << __func__ << "(): failed to parse marker: marker=" << marker << dendl;
+ return -EINVAL;
+ }
+
+ for (int64_t pn = meta_info.tail_part_num; pn < part_num; ++pn) {
+ int r = trim_part(pn, meta_info.data_params.max_part_size, std::nullopt);
+ if (r < 0 &&
+ r != -ENOENT) {
+ ldout(cct, 0) << __func__ << "(): ERROR: trim_part() on part=" << pn << " returned r=" << r << dendl;
+ return r;
+ }
+ }
+
+ int r = trim_part(part_num, ofs, std::nullopt);
+ if (r < 0 &&
+ r != -ENOENT) {
+ ldout(cct, 0) << __func__ << "(): ERROR: trim_part() on part=" << part_num << " returned r=" << r << dendl;
+ return r;
+ }
+
+ if (part_num <= meta_info.tail_part_num) {
+ /* don't need to modify meta info */
+ return 0;
+ }
+
+ int i;
+
+ for (i = 0; i < RACE_RETRY; ++i) {
+ bool canceled;
+ int r = update_meta(ClsFIFO::MetaUpdateParams()
+ .tail_part_num(part_num),
+ &canceled);
+ if (r < 0) {
+ return r;
+ }
+
+ if (canceled) {
+ if (meta_info.tail_part_num < part_num) {
+ continue;
+ }
+ }
+ break;
+ }
+
+ if (i == RACE_RETRY) {
+ ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl;
+ return -ECANCELED;
+ }
+
+ return 0;
+ }
+
+ int FIFO::get_part_info(int64_t part_num,
+ fifo_part_info *result)
+ {
+ if (!is_open) {
+ return -EINVAL;
+ }
+
+ fifo_part_header_t header;
+
+ int r = ClsFIFO::get_part_info(*ioctx,
+ meta_info.part_oid(part_num),
+ &header);
+ if (r < 0) {
+ return r;
+ }
+
+ *result = std::move(header);
+
+ return 0;
+ }
+
+ } // namespace fifo
+ } // namespace cls
+} // namespace rados
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "cls/fifo/cls_fifo_types.h"
+
+namespace rados {
+ namespace cls {
+ namespace fifo {
+
+ class ClsFIFO {
+ public:
+
+ /* create */
+
+ struct MetaCreateParams {
+ struct State {
+ static constexpr uint64_t default_max_part_size = 4 * 1024 * 1024;
+ static constexpr uint64_t default_max_entry_size = 32 * 1024;
+ std::optional<fifo_objv_t> objv;
+ std::optional<std::string> oid_prefix;
+ bool exclusive{false};
+ uint64_t max_part_size{default_max_part_size};
+ uint64_t max_entry_size{default_max_entry_size};
+ } state;
+
+ MetaCreateParams& oid_prefix(const std::string& oid_prefix) {
+ state.oid_prefix = oid_prefix;
+ return *this;
+ }
+ MetaCreateParams& exclusive(bool exclusive) {
+ state.exclusive = exclusive;
+ return *this;
+ }
+ MetaCreateParams& max_part_size(uint64_t max_part_size) {
+ state.max_part_size = max_part_size;
+ return *this;
+ }
+ MetaCreateParams& max_entry_size(uint64_t max_entry_size) {
+ state.max_entry_size = max_entry_size;
+ return *this;
+ }
+ MetaCreateParams& objv(const fifo_objv_t& objv) {
+ state.objv = objv;
+ return *this;
+ }
+ MetaCreateParams& objv(const std::string& instance, uint64_t ver) {
+ state.objv = fifo_objv_t{instance, ver};
+ return *this;
+ }
+ };
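+
+ /* Builder-style usage (values illustrative):
+ * ClsFIFO::MetaCreateParams().max_part_size(1 << 20).exclusive(true) */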
+
+ static int meta_create(librados::ObjectWriteOperation *op,
+ const string& id,
+ const MetaCreateParams& params);
+
+ /* get info */
+
+ struct MetaGetParams {
+ struct State {
+ std::optional<fifo_objv_t> objv;
+ } state;
+
+ MetaGetParams& objv(std::optional<fifo_objv_t>& v) {
+ state.objv = v;
+ return *this;
+ }
+ MetaGetParams& objv(const fifo_objv_t& v) {
+ state.objv = v;
+ return *this;
+ }
+ MetaGetParams& objv(const std::string& instance, uint64_t ver) {
+ state.objv = fifo_objv_t{instance, ver};
+ return *this;
+ }
+ };
+ static int meta_get(librados::IoCtx& ioctx,
+ const string& oid,
+ const MetaGetParams& params,
+ rados::cls::fifo::fifo_info_t *result,
+ uint32_t *part_header_size,
+ uint32_t *part_entry_overhead);
+
+ /* update */
+
+ struct MetaUpdateParams {
+ struct State {
+ rados::cls::fifo::fifo_objv_t objv;
+
+ std::optional<uint64_t> tail_part_num;
+ std::optional<uint64_t> head_part_num;
+ std::optional<uint64_t> min_push_part_num;
+ std::optional<uint64_t> max_push_part_num;
+ std::vector<rados::cls::fifo::fifo_journal_entry_t> journal_entries_add;
+ std::vector<rados::cls::fifo::fifo_journal_entry_t> journal_entries_rm;
+ } state;
+
+ MetaUpdateParams& objv(const fifo_objv_t& objv) {
+ state.objv = objv;
+ return *this;
+ }
+ MetaUpdateParams& tail_part_num(std::optional<uint64_t> tail_part_num) {
+ state.tail_part_num = tail_part_num;
+ return *this;
+ }
+ MetaUpdateParams& tail_part_num(uint64_t tail_part_num) {
+ state.tail_part_num = tail_part_num;
+ return *this;
+ }
+ MetaUpdateParams& head_part_num(std::optional<uint64_t> head_part_num) {
+ state.head_part_num = head_part_num;
+ return *this;
+ }
+ MetaUpdateParams& head_part_num(uint64_t head_part_num) {
+ state.head_part_num = head_part_num;
+ return *this;
+ }
+ MetaUpdateParams& min_push_part_num(uint64_t num) {
+ state.min_push_part_num = num;
+ return *this;
+ }
+ MetaUpdateParams& max_push_part_num(std::optional<uint64_t> num) {
+ state.max_push_part_num = num;
+ return *this;
+ }
+ MetaUpdateParams& max_push_part_num(uint64_t num) {
+ state.max_push_part_num = num;
+ return *this;
+ }
+ MetaUpdateParams& journal_entry_add(std::optional<rados::cls::fifo::fifo_journal_entry_t> entry) {
+ if (entry) {
+ state.journal_entries_add.push_back(*entry);
+ }
+ return *this;
+ }
+ MetaUpdateParams& journal_entry_add(const rados::cls::fifo::fifo_journal_entry_t& entry) {
+ state.journal_entries_add.push_back(entry);
+ return *this;
+ }
+ MetaUpdateParams& journal_entries_rm(std::vector<rados::cls::fifo::fifo_journal_entry_t>& entries) {
+ state.journal_entries_rm = entries;
+ return *this;
+ }
+
+ int apply_update(CephContext *cct,
+ rados::cls::fifo::fifo_info_t *info);
+ };
+
+ static int meta_update(librados::ObjectWriteOperation *rados_op,
+ const MetaUpdateParams& params);
+ /* init part */
+
+ struct PartInitParams {
+ struct State {
+ string tag;
+ rados::cls::fifo::fifo_data_params_t data_params;
+ } state;
+
+ PartInitParams& tag(const std::string& tag) {
+ state.tag = tag;
+ return *this;
+ }
+ PartInitParams& data_params(const rados::cls::fifo::fifo_data_params_t& data_params) {
+ state.data_params = data_params;
+ return *this;
+ }
+ };
+
+ static int part_init(librados::ObjectWriteOperation *op,
+ const PartInitParams& params);
+
+ /* push part */
+
+ struct PushPartParams {
+ struct State {
+ string tag;
+ std::vector<bufferlist> data_bufs;
+ uint64_t total_len{0};
+ } state;
+
+ PushPartParams& tag(const std::string& tag) {
+ state.tag = tag;
+ return *this;
+ }
+ PushPartParams& data(bufferlist& bl) {
+ state.total_len += bl.length();
+ state.data_bufs.emplace_back(bl);
+ return *this;
+ }
+ PushPartParams& data_bufs(std::vector<bufferlist>& dbs) {
+ for (auto& bl : dbs) {
+ data(bl);
+ }
+ return *this;
+ }
+ };
+
+ static int push_part(librados::ObjectWriteOperation *op,
+ const PushPartParams& params);
+ /* trim part */
+
+ struct TrimPartParams {
+ struct State {
+ std::optional<string> tag;
+ uint64_t ofs{0};
+ } state;
+
+ TrimPartParams& tag(std::optional<std::string> tag) {
+ state.tag = tag;
+ return *this;
+ }
+ TrimPartParams& ofs(uint64_t ofs) {
+ state.ofs = ofs;
+ return *this;
+ }
+ };
+
+ static int trim_part(librados::ObjectWriteOperation *op,
+ const TrimPartParams& params);
+ /* list part */
+
+ struct ListPartParams {
+ struct State {
+ std::optional<string> tag;
+ uint64_t ofs{0};
+ int max_entries{100};
+ } state;
+
+ ListPartParams& tag(const std::string& tag) {
+ state.tag = tag;
+ return *this;
+ }
+ ListPartParams& ofs(uint64_t ofs) {
+ state.ofs = ofs;
+ return *this;
+ }
+ ListPartParams& max_entries(int _max_entries) {
+ state.max_entries = _max_entries;
+ return *this;
+ }
+ };
+
+ static int list_part(librados::IoCtx& ioctx,
+ const string& oid,
+ const ListPartParams& params,
+ std::vector<cls_fifo_part_list_entry_t> *pentries,
+ bool *more,
+ bool *full_part = nullptr,
+ string *ptag = nullptr);
+
+ static int get_part_info(librados::IoCtx& ioctx,
+ const string& oid,
+ rados::cls::fifo::fifo_part_header_t *header);
+ };
+
+ struct fifo_entry {
+ bufferlist data;
+ string marker;
+ ceph::real_time mtime;
+ };
+
+ using fifo_part_info = rados::cls::fifo::fifo_part_header_t;
+
+ class FIFO {
+ CephContext *cct;
+ string id;
+
+ string meta_oid;
+
+ std::optional<librados::IoCtx> _ioctx;
+ librados::IoCtx *ioctx{nullptr};
+
+ fifo_info_t meta_info;
+
+ uint32_t part_header_size{0};
+ uint32_t part_entry_overhead{0};
+
+ bool is_open{false};
+
+ string craft_marker(int64_t part_num,
+ uint64_t part_ofs);
+
+ bool parse_marker(const string& marker,
+ int64_t *part_num,
+ uint64_t *part_ofs);
+
+ int update_meta(ClsFIFO::MetaUpdateParams& update_params,
+ bool *canceled);
+ int do_read_meta(std::optional<fifo_objv_t> objv = std::nullopt);
+
+ int create_part(int64_t part_num, const string& tag,
+ int64_t& max_part_num);
+ int remove_part(int64_t part_num, const string& tag,
+ int64_t& tail_part_num);
+
+ int process_journal_entry(const fifo_journal_entry_t& entry,
+ int64_t& tail_part_num,
+ int64_t& head_part_num,
+ int64_t& max_part_num);
+ int process_journal_entries(vector<fifo_journal_entry_t> *processed,
+ int64_t& tail_part_num,
+ int64_t& head_part_num,
+ int64_t& max_part_num);
+ int process_journal();
+
+ int prepare_new_part(bool is_head);
+ int prepare_new_head();
+
+ int push_entries(int64_t part_num, std::vector<bufferlist>& data_bufs);
+ int trim_part(int64_t part_num,
+ uint64_t ofs,
+ std::optional<string> tag);
+
+ public:
+ FIFO(CephContext *_cct,
+ const string& _id,
+ librados::IoCtx *_ioctx = nullptr) : cct(_cct),
+ id(_id),
+ ioctx(_ioctx) {
+ meta_oid = id;
+ }
+
+ int init_ioctx(librados::Rados *rados,
+ const string& pool,
+ std::optional<string> pool_ns);
+
+ void set_ioctx(librados::IoCtx *_ioctx) {
+ ioctx = _ioctx;
+ }
+
+ int open(bool create,
+ std::optional<ClsFIFO::MetaCreateParams> create_params = std::nullopt);
+
+ int read_meta(std::optional<fifo_objv_t> objv = std::nullopt);
+
+ const fifo_info_t& get_meta() const {
+ return meta_info;
+ }
+
+ void get_part_layout_info(uint32_t *header_size, uint32_t *entry_overhead) {
+ if (header_size) {
+ *header_size = part_header_size;
+ }
+
+ if (entry_overhead) {
+ *entry_overhead = part_entry_overhead;
+ }
+ }
+
+ int push(bufferlist& bl);
+ int push(vector<bufferlist>& bl);
+
+ int list(int max_entries,
+ std::optional<string> marker,
+ vector<fifo_entry> *result,
+ bool *more);
+
+ int trim(const string& marker);
+
+ int get_part_info(int64_t part_num,
+ fifo_part_info *result);
+ };
+ } // namespace fifo
+ } // namespace cls
+} // namespace rados
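+
+/* A minimal usage sketch (illustrative only; `cct`, `rados`, "mypool" and
+ * "myfifo" are assumed names, with `rados` already connected):
+ *
+ *   rados::cls::fifo::FIFO f(cct, "myfifo");
+ *   int r = f.init_ioctx(&rados, "mypool", std::nullopt);
+ *   if (r >= 0) {
+ *     r = f.open(true); // create the meta object if needed, then read it
+ *   }
+ *   bufferlist bl;
+ *   bl.append("hello");
+ *   if (r >= 0) {
+ *     r = f.push(bl);
+ *   }
+ *   std::vector<rados::cls::fifo::fifo_entry> entries;
+ *   bool more;
+ *   if (r >= 0) {
+ *     r = f.list(10, std::nullopt, &entries, &more);
+ *   }
+ *   if (r >= 0 && !entries.empty()) {
+ *     r = f.trim(entries.back().marker); // trim up to the last listed entry
+ *   }
+ */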
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ * Copyright (C) 2019 SUSE LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "include/types.h"
+#include "include/utime.h"
+#include "cls/fifo/cls_fifo_types.h"
+
+struct cls_fifo_meta_create_op
+{
+ string id;
+ std::optional<rados::cls::fifo::fifo_objv_t> objv;
+ struct {
+ string name;
+ string ns;
+ } pool;
+ std::optional<string> oid_prefix;
+
+ uint64_t max_part_size{0};
+ uint64_t max_entry_size{0};
+
+ bool exclusive{false};
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(id, bl);
+ encode(objv, bl);
+ encode(pool.name, bl);
+ encode(pool.ns, bl);
+ encode(oid_prefix, bl);
+ encode(max_part_size, bl);
+ encode(max_entry_size, bl);
+ encode(exclusive, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(id, bl);
+ decode(objv, bl);
+ decode(pool.name, bl);
+ decode(pool.ns, bl);
+ decode(oid_prefix, bl);
+ decode(max_part_size, bl);
+ decode(max_entry_size, bl);
+ decode(exclusive, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_fifo_meta_create_op)
+
+struct cls_fifo_meta_get_op
+{
+ std::optional<rados::cls::fifo::fifo_objv_t> objv;
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(objv, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(objv, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_fifo_meta_get_op)
+
+struct cls_fifo_meta_get_op_reply
+{
+ rados::cls::fifo::fifo_info_t info;
+ uint32_t part_header_size{0};
+ uint32_t part_entry_overhead{0}; /* per entry extra data that is stored */
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(info, bl);
+ encode(part_header_size, bl);
+ encode(part_entry_overhead, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(info, bl);
+ decode(part_header_size, bl);
+ decode(part_entry_overhead, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_fifo_meta_get_op_reply)
+
+struct cls_fifo_meta_update_op
+{
+ rados::cls::fifo::fifo_objv_t objv;
+
+ std::optional<uint64_t> tail_part_num;
+ std::optional<uint64_t> head_part_num;
+ std::optional<uint64_t> min_push_part_num;
+ std::optional<uint64_t> max_push_part_num;
+ std::vector<rados::cls::fifo::fifo_journal_entry_t> journal_entries_add;
+ std::vector<rados::cls::fifo::fifo_journal_entry_t> journal_entries_rm;
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(objv, bl);
+ encode(tail_part_num, bl);
+ encode(head_part_num, bl);
+ encode(min_push_part_num, bl);
+ encode(max_push_part_num, bl);
+ encode(journal_entries_add, bl);
+ encode(journal_entries_rm, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(objv, bl);
+ decode(tail_part_num, bl);
+ decode(head_part_num, bl);
+ decode(min_push_part_num, bl);
+ decode(max_push_part_num, bl);
+ decode(journal_entries_add, bl);
+ decode(journal_entries_rm, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_fifo_meta_update_op)
+
+struct cls_fifo_part_init_op
+{
+ string tag;
+ rados::cls::fifo::fifo_data_params_t data_params;
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(tag, bl);
+ encode(data_params, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(tag, bl);
+ decode(data_params, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_fifo_part_init_op)
+
+struct cls_fifo_part_push_op
+{
+ string tag;
+ std::vector<bufferlist> data_bufs;
+ uint64_t total_len{0};
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(tag, bl);
+ encode(data_bufs, bl);
+ encode(total_len, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(tag, bl);
+ decode(data_bufs, bl);
+ decode(total_len, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_fifo_part_push_op)
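+
+/* Inferred invariant (not spelled out in this change): total_len should
+ * equal the summed length of data_bufs, so the OSD side can check part
+ * capacity without walking the buffers first, e.g.:
+ *
+ *   cls_fifo_part_push_op op;
+ *   for (auto& entry : entries) {
+ *     op.total_len += entry.length();
+ *     op.data_bufs.push_back(entry);
+ *   }
+ */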
+
+struct cls_fifo_part_trim_op
+{
+ std::optional<string> tag;
+ uint64_t ofs{0};
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(tag, bl);
+ encode(ofs, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(tag, bl);
+ decode(ofs, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_fifo_part_trim_op)
+
+struct cls_fifo_part_list_op
+{
+ std::optional<string> tag;
+ uint64_t ofs{0};
+ int max_entries{100};
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(tag, bl);
+ encode(ofs, bl);
+ encode(max_entries, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(tag, bl);
+ decode(ofs, bl);
+ decode(max_entries, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_fifo_part_list_op)
+
+struct cls_fifo_part_list_op_reply
+{
+ string tag;
+ vector<rados::cls::fifo::cls_fifo_part_list_entry_t> entries;
+ bool more{false};
+  bool full_part{false}; /* whether the part is full or can still be written to;
+                            a non-full part is, by definition, the head part */
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(tag, bl);
+ encode(entries, bl);
+ encode(more, bl);
+ encode(full_part, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(tag, bl);
+ decode(entries, bl);
+ decode(more, bl);
+ decode(full_part, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_fifo_part_list_op_reply)
+
+struct cls_fifo_part_get_info_op
+{
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_fifo_part_get_info_op)
+
+struct cls_fifo_part_get_info_op_reply
+{
+ rados::cls::fifo::fifo_part_header_t header;
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(header, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(header, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_fifo_part_get_info_op_reply)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "cls_fifo_types.h"
+
+string rados::cls::fifo::fifo_info_t::part_oid(int64_t part_num)
+{
+ char buf[oid_prefix.size() + 32];
+ snprintf(buf, sizeof(buf), "%s.%lld", oid_prefix.c_str(), (long long)part_num);
+
+ return string(buf);
+}
+
+void rados::cls::fifo::fifo_info_t::prepare_next_journal_entry(fifo_journal_entry_t *entry, const string& tag)
+{
+ entry->op = fifo_journal_entry_t::Op::OP_CREATE;
+ entry->part_num = max_push_part_num + 1;
+ entry->part_tag = tag;
+}
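+
+/* A plausible part-creation sequence built on this journal (a reading
+ * of the surrounding code, not a documented contract): the client
+ * journals an OP_CREATE for part max_push_part_num + 1 through a meta
+ * update (journal_entries_add), initializes the new part object, then
+ * removes the journal entry (journal_entries_rm) once the part exists.
+ * If the client dies mid-sequence, the surviving journal entry tells
+ * the next client which part creation to finish or roll back. */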
+
+int rados::cls::fifo::fifo_info_t::apply_update(std::optional<uint64_t>& _tail_part_num,
+ std::optional<uint64_t>& _head_part_num,
+ std::optional<uint64_t>& _min_push_part_num,
+ std::optional<uint64_t>& _max_push_part_num,
+ std::vector<rados::cls::fifo::fifo_journal_entry_t>& journal_entries_add,
+ std::vector<rados::cls::fifo::fifo_journal_entry_t>& journal_entries_rm,
+ string *err)
+{
+ if (_tail_part_num) {
+ tail_part_num = *_tail_part_num;
+ }
+
+ if (_min_push_part_num) {
+ min_push_part_num = *_min_push_part_num;
+ }
+
+ if (_max_push_part_num) {
+ max_push_part_num = *_max_push_part_num;
+ }
+
+ for (auto& entry : journal_entries_add) {
+ auto iter = journal.find(entry.part_num);
+ if (iter != journal.end() &&
+ iter->second.op == entry.op) {
+      /* don't allow multiple concurrent identical operations on the
+         same part; racing clients should use objv to avoid races anyway */
+ if (err) {
+ stringstream ss;
+ ss << "multiple concurrent operations on same part are not allowed, part num=" << entry.part_num;
+ *err = ss.str();
+ }
+ return -EINVAL;
+ }
+
+ if (entry.op == fifo_journal_entry_t::Op::OP_CREATE) {
+ tags[entry.part_num] = entry.part_tag;
+ }
+
+ journal.insert(std::pair<int64_t, fifo_journal_entry_t>(entry.part_num, std::move(entry)));
+ }
+
+ for (auto& entry : journal_entries_rm) {
+ journal.erase(entry.part_num);
+ }
+
+ if (_head_part_num) {
+ tags.erase(head_part_num);
+ head_part_num = *_head_part_num;
+ auto iter = tags.find(head_part_num);
+ if (iter != tags.end()) {
+ head_tag = iter->second;
+ } else {
+ head_tag.erase();
+ }
+ }
+
+ return 0;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+
+#include "include/encoding.h"
+#include "include/types.h"
+
+
+class JSONObj;
+
+namespace rados {
+ namespace cls {
+ namespace fifo {
+ struct fifo_objv_t {
+ string instance;
+ uint64_t ver{0};
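+
+        /* instance is a random string generated the first time the
+         * header is written; ver increments on every subsequent header
+         * change. Together they are what objv-guarded operations are
+         * expected to compare against. */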
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(instance, bl);
+ encode(ver, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(instance, bl);
+ decode(ver, bl);
+ DECODE_FINISH(bl);
+ }
+ void dump(Formatter *f) const;
+ void decode_json(JSONObj *obj);
+
+ bool operator==(const fifo_objv_t& rhs) const {
+ return (instance == rhs.instance &&
+ ver == rhs.ver);
+ }
+
+ bool empty() const {
+ return instance.empty();
+ }
+
+        string to_str() const {
+ char buf[instance.size() + 32];
+ snprintf(buf, sizeof(buf), "%s{%lld}", instance.c_str(), (long long)ver);
+ return string(buf);
+ }
+ };
+ WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_objv_t)
+
+ struct fifo_data_params_t {
+ uint64_t max_part_size{0};
+ uint64_t max_entry_size{0};
+ uint64_t full_size_threshold{0};
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(max_part_size, bl);
+ encode(max_entry_size, bl);
+ encode(full_size_threshold, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(max_part_size, bl);
+ decode(max_entry_size, bl);
+ decode(full_size_threshold, bl);
+ DECODE_FINISH(bl);
+ }
+ void dump(Formatter *f) const;
+ void decode_json(JSONObj *obj);
+
+ bool operator==(const fifo_data_params_t& rhs) const {
+ return (max_part_size == rhs.max_part_size &&
+ max_entry_size == rhs.max_entry_size &&
+ full_size_threshold == rhs.full_size_threshold);
+ }
+ };
+ WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_data_params_t)
+
+ struct fifo_journal_entry_t {
+ enum Op {
+ OP_UNKNOWN = 0,
+ OP_CREATE = 1,
+ OP_SET_HEAD = 2,
+ OP_REMOVE = 3,
+ } op{OP_UNKNOWN};
+
+ int64_t part_num{0};
+ string part_tag;
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode((int)op, bl);
+ encode(part_num, bl);
+ encode(part_tag, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ int i;
+ decode(i, bl);
+ op = (Op)i;
+ decode(part_num, bl);
+ decode(part_tag, bl);
+ DECODE_FINISH(bl);
+ }
+ void dump(Formatter *f) const;
+
+        bool operator==(const fifo_journal_entry_t& e) const {
+ return (op == e.op &&
+ part_num == e.part_num &&
+ part_tag == e.part_tag);
+ }
+ };
+ WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_journal_entry_t)
+
+ struct fifo_info_t {
+ string id;
+ fifo_objv_t objv;
+ string oid_prefix;
+ fifo_data_params_t data_params;
+
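+        /* part bookkeeping, as read from the update logic below:
+         * tail_part_num is the oldest live part, head_part_num the
+         * part currently receiving pushes (-1 until a head is chosen),
+         * and [min_push_part_num, max_push_part_num] bounds the parts
+         * prepared for pushing (max starts at -1: no part exists yet) */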
+ int64_t tail_part_num{0};
+ int64_t head_part_num{-1};
+ int64_t min_push_part_num{0};
+ int64_t max_push_part_num{-1};
+
+ string head_tag;
+ map<int64_t, string> tags;
+
+ std::multimap<int64_t, fifo_journal_entry_t> journal;
+
+        bool need_new_head() const {
+ return (head_part_num < min_push_part_num);
+ }
+
+        bool need_new_part() const {
+ return (max_push_part_num < min_push_part_num);
+ }
+
+ string part_oid(int64_t part_num);
+ void prepare_next_journal_entry(fifo_journal_entry_t *entry, const string& tag);
+
+ int apply_update(std::optional<uint64_t>& _tail_part_num,
+ std::optional<uint64_t>& _head_part_num,
+ std::optional<uint64_t>& _min_push_part_num,
+ std::optional<uint64_t>& _max_push_part_num,
+ std::vector<rados::cls::fifo::fifo_journal_entry_t>& journal_entries_add,
+ std::vector<rados::cls::fifo::fifo_journal_entry_t>& journal_entries_rm,
+ string *err);
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(id, bl);
+ encode(objv, bl);
+ encode(oid_prefix, bl);
+ encode(data_params, bl);
+ encode(tail_part_num, bl);
+ encode(head_part_num, bl);
+ encode(min_push_part_num, bl);
+ encode(max_push_part_num, bl);
+ encode(tags, bl);
+ encode(head_tag, bl);
+ encode(journal, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(id, bl);
+ decode(objv, bl);
+ decode(oid_prefix, bl);
+ decode(data_params, bl);
+ decode(tail_part_num, bl);
+ decode(head_part_num, bl);
+ decode(min_push_part_num, bl);
+ decode(max_push_part_num, bl);
+ decode(tags, bl);
+ decode(head_tag, bl);
+ decode(journal, bl);
+ DECODE_FINISH(bl);
+ }
+ void dump(Formatter *f) const;
+ void decode_json(JSONObj *obj);
+ };
+ WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_info_t)
+
+ struct cls_fifo_part_list_entry_t {
+ bufferlist data;
+        uint64_t ofs{0};
+ ceph::real_time mtime;
+
+        cls_fifo_part_list_entry_t() = default;
+ cls_fifo_part_list_entry_t(bufferlist&& _data,
+ uint64_t _ofs,
+ ceph::real_time _mtime) : data(std::move(_data)), ofs(_ofs), mtime(_mtime) {}
+
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(data, bl);
+ encode(ofs, bl);
+ encode(mtime, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(data, bl);
+ decode(ofs, bl);
+ decode(mtime, bl);
+ DECODE_FINISH(bl);
+ }
+ };
+ WRITE_CLASS_ENCODER(rados::cls::fifo::cls_fifo_part_list_entry_t)
+
+ struct fifo_part_header_t {
+ string tag;
+
+ fifo_data_params_t params;
+
+ uint64_t magic{0};
+
+ uint64_t min_ofs{0};
+ uint64_t max_ofs{0};
+ uint64_t min_index{0};
+ uint64_t max_index{0};
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(tag, bl);
+ encode(params, bl);
+ encode(magic, bl);
+ encode(min_ofs, bl);
+ encode(max_ofs, bl);
+ encode(min_index, bl);
+ encode(max_index, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(tag, bl);
+ decode(params, bl);
+ decode(magic, bl);
+ decode(min_ofs, bl);
+ decode(max_ofs, bl);
+ decode(min_index, bl);
+ decode(max_index, bl);
+ DECODE_FINISH(bl);
+ }
+ };
+ WRITE_CLASS_ENCODER(fifo_part_header_t)
+
+ }
+ }
+}
+
+static inline ostream& operator<<(ostream& os, const rados::cls::fifo::fifo_objv_t& objv)
+{
+ return os << objv.instance << "{" << objv.ver << "}";
+}
+
$<TARGET_PROPERTY:GTest::GTest,INTERFACE_INCLUDE_DIRECTORIES>)
add_subdirectory(cls_hello)
+add_subdirectory(cls_fifo)
add_subdirectory(cls_lock)
add_subdirectory(cls_cas)
add_subdirectory(cls_log)
--- /dev/null
+add_executable(ceph_test_cls_fifo
+ test_cls_fifo.cc
+ )
+target_link_libraries(ceph_test_cls_fifo
+ cls_fifo_client
+ librados
+ global
+ ${UNITTEST_LIBS}
+ ${BLKID_LIBRARIES}
+ ${CMAKE_DL_LIBS}
+ ${CRYPTO_LIBS}
+ ${EXTRALIBS}
+ radostest-cxx
+ )
+install(TARGETS
+ ceph_test_cls_fifo
+ DESTINATION ${CMAKE_INSTALL_BINDIR})
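+# Note: like the other cls test binaries, ceph_test_cls_fifo expects a
+# running cluster (e.g. one started via vstart.sh); it is not a pure
+# unit test.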
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <iostream>
+#include <errno.h>
+
+#include "include/types.h"
+#include "include/rados/librados.hpp"
+
+#include "test/librados/test_cxx.h"
+#include "global/global_context.h"
+
+#include "gtest/gtest.h"
+
+using namespace librados;
+
+#include "cls/fifo/cls_fifo_client.h"
+
+
+using namespace rados::cls::fifo;
+
+static CephContext *cct(librados::IoCtx& ioctx)
+{
+ return reinterpret_cast<CephContext *>(ioctx.cct());
+}
+
+static int fifo_create(IoCtx& ioctx,
+ const string& oid,
+ const string& id,
+ const ClsFIFO::MetaCreateParams& params)
+{
+ ObjectWriteOperation op;
+
+ int r = ClsFIFO::meta_create(&op, id, params);
+ if (r < 0) {
+ return r;
+ }
+
+ return ioctx.operate(oid, &op);
+}
+
+TEST(ClsFIFO, TestCreate) {
+ Rados cluster;
+ std::string pool_name = get_temp_pool_name();
+ ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+ IoCtx ioctx;
+ cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+ string fifo_id = "fifo";
+ string oid = fifo_id;
+
+ ASSERT_EQ(-EINVAL, fifo_create(ioctx, oid, string(),
+ ClsFIFO::MetaCreateParams()));
+
+ ASSERT_EQ(-EINVAL, fifo_create(ioctx, oid, fifo_id,
+ ClsFIFO::MetaCreateParams()
+ .max_part_size(0)));
+
+ ASSERT_EQ(-EINVAL, fifo_create(ioctx, oid, fifo_id,
+ ClsFIFO::MetaCreateParams()
+ .max_entry_size(0)));
+
+ /* first successful create */
+ ASSERT_EQ(0, fifo_create(ioctx, oid, fifo_id,
+ ClsFIFO::MetaCreateParams()));
+
+ uint64_t size;
+ struct timespec ts;
+ ASSERT_EQ(0, ioctx.stat2(oid, &size, &ts));
+ ASSERT_GT(size, 0);
+
+ /* test idempotency */
+ ASSERT_EQ(0, fifo_create(ioctx, oid, fifo_id,
+ ClsFIFO::MetaCreateParams()));
+
+ uint64_t size2;
+ struct timespec ts2;
+ ASSERT_EQ(0, ioctx.stat2(oid, &size2, &ts2));
+ ASSERT_EQ(size2, size);
+
+ ASSERT_EQ(-EEXIST, fifo_create(ioctx, oid, fifo_id,
+ ClsFIFO::MetaCreateParams()
+ .exclusive(true)));
+
+ ASSERT_EQ(-EEXIST, fifo_create(ioctx, oid, fifo_id,
+ ClsFIFO::MetaCreateParams()
+ .oid_prefix("myprefix")
+ .exclusive(false)));
+
+ ASSERT_EQ(-EEXIST, fifo_create(ioctx, oid, "foo",
+ ClsFIFO::MetaCreateParams()
+ .exclusive(false)));
+
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
+TEST(ClsFIFO, TestGetInfo) {
+ Rados cluster;
+ std::string pool_name = get_temp_pool_name();
+ ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+ IoCtx ioctx;
+ cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+ string fifo_id = "fifo";
+ string oid = fifo_id;
+
+ fifo_info_t info;
+
+ /* first successful create */
+ ASSERT_EQ(0, fifo_create(ioctx, oid, fifo_id,
+ ClsFIFO::MetaCreateParams()));
+
+ uint32_t part_header_size;
+ uint32_t part_entry_overhead;
+
+ ASSERT_EQ(0, ClsFIFO::meta_get(ioctx, oid,
+ ClsFIFO::MetaGetParams(), &info,
+ &part_header_size, &part_entry_overhead));
+
+ ASSERT_GT(part_header_size, 0);
+ ASSERT_GT(part_entry_overhead, 0);
+
+ ASSERT_TRUE(!info.objv.instance.empty());
+
+ ASSERT_EQ(0, ClsFIFO::meta_get(ioctx, oid,
+ ClsFIFO::MetaGetParams()
+ .objv(info.objv),
+ &info,
+ &part_header_size, &part_entry_overhead));
+
+ fifo_objv_t objv;
+ objv.instance="foo";
+ objv.ver = 12;
+ ASSERT_EQ(-ECANCELED, ClsFIFO::meta_get(ioctx, oid,
+ ClsFIFO::MetaGetParams()
+ .objv(objv),
+ &info,
+ &part_header_size, &part_entry_overhead));
+
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
+TEST(FIFO, TestOpenDefault) {
+ Rados cluster;
+ std::string pool_name = get_temp_pool_name();
+ ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+ IoCtx ioctx;
+ cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+ string fifo_id = "fifo";
+
+ FIFO fifo(cct(ioctx), fifo_id, &ioctx);
+
+ /* pre-open ops that should fail */
+ ASSERT_EQ(-EINVAL, fifo.read_meta());
+
+ bufferlist bl;
+ ASSERT_EQ(-EINVAL, fifo.push(bl));
+
+ ASSERT_EQ(-EINVAL, fifo.list(100, nullopt, nullptr, nullptr));
+ ASSERT_EQ(-EINVAL, fifo.trim(string()));
+
+ ASSERT_EQ(-ENOENT, fifo.open(false));
+
+ /* first successful create */
+ ASSERT_EQ(0, fifo.open(true));
+
+ ASSERT_EQ(0, fifo.read_meta()); /* force reading from backend */
+
+ auto info = fifo.get_meta();
+
+ ASSERT_EQ(info.id, fifo_id);
+
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
+TEST(FIFO, TestOpenParams) {
+ Rados cluster;
+ std::string pool_name = get_temp_pool_name();
+ ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+ IoCtx ioctx;
+ cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+ string fifo_id = "fifo";
+
+ FIFO fifo(cct(ioctx), fifo_id, &ioctx);
+
+ uint64_t max_part_size = 10 * 1024;
+ uint64_t max_entry_size = 128;
+ string oid_prefix = "foo.123.";
+
+ fifo_objv_t objv;
+ objv.instance = "fooz";
+ objv.ver = 10;
+
+
+ /* first successful create */
+ ASSERT_EQ(0, fifo.open(true,
+ ClsFIFO::MetaCreateParams()
+ .max_part_size(max_part_size)
+ .max_entry_size(max_entry_size)
+ .oid_prefix(oid_prefix)
+ .objv(objv)));
+
+ ASSERT_EQ(0, fifo.read_meta()); /* force reading from backend */
+
+ auto info = fifo.get_meta();
+
+ ASSERT_EQ(info.id, fifo_id);
+ ASSERT_EQ(info.data_params.max_part_size, max_part_size);
+ ASSERT_EQ(info.data_params.max_entry_size, max_entry_size);
+ ASSERT_EQ(info.objv, objv);
+
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
+template <class T>
+static int decode_entry(fifo_entry& entry,
+ T *val,
+ string *marker)
+{
+ *marker = entry.marker;
+ auto iter = entry.data.cbegin();
+
+ try {
+ decode(*val, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ return 0;
+}
+
+TEST(FIFO, TestPushListTrim) {
+ Rados cluster;
+ std::string pool_name = get_temp_pool_name();
+ ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+ IoCtx ioctx;
+ cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+ string fifo_id = "fifo";
+
+ FIFO fifo(cct(ioctx), fifo_id, &ioctx);
+
+ /* first successful create */
+ ASSERT_EQ(0, fifo.open(true));
+
+ uint32_t max_entries = 10;
+
+ for (uint32_t i = 0; i < max_entries; ++i) {
+ bufferlist bl;
+ encode(i, bl);
+ ASSERT_EQ(0, fifo.push(bl));
+ }
+
+ string marker;
+
+ /* get entries one by one */
+
+ for (uint32_t i = 0; i < max_entries; ++i) {
+ vector<fifo_entry> result;
+ bool more;
+ ASSERT_EQ(0, fifo.list(1, marker, &result, &more));
+
+ bool expected_more = (i != (max_entries - 1));
+ ASSERT_EQ(expected_more, more);
+ ASSERT_EQ(1, result.size());
+
+ uint32_t val;
+ ASSERT_EQ(0, decode_entry(result.front(), &val, &marker));
+
+ ASSERT_EQ(i, val);
+ }
+
+ /* get all entries at once */
+ vector<fifo_entry> result;
+ bool more;
+ ASSERT_EQ(0, fifo.list(max_entries * 10, string(), &result, &more));
+
+ ASSERT_FALSE(more);
+ ASSERT_EQ(max_entries, result.size());
+
+  std::vector<string> markers(max_entries);
+
+
+ for (uint32_t i = 0; i < max_entries; ++i) {
+ uint32_t val;
+
+ ASSERT_EQ(0, decode_entry(result[i], &val, &markers[i]));
+ ASSERT_EQ(i, val);
+ }
+
+ uint32_t min_entry = 0;
+
+ /* trim one entry */
+  ASSERT_EQ(0, fifo.trim(markers[min_entry]));
+ ++min_entry;
+
+ ASSERT_EQ(0, fifo.list(max_entries * 10, string(), &result, &more));
+
+ ASSERT_FALSE(more);
+ ASSERT_EQ(max_entries - min_entry, result.size());
+
+ for (uint32_t i = min_entry; i < max_entries; ++i) {
+ uint32_t val;
+
+ ASSERT_EQ(0, decode_entry(result[i - min_entry], &val, &markers[i]));
+ ASSERT_EQ(i, val);
+ }
+
+
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
+TEST(FIFO, TestPushTooBig) {
+ Rados cluster;
+ std::string pool_name = get_temp_pool_name();
+ ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+ IoCtx ioctx;
+ cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+ string fifo_id = "fifo";
+
+ FIFO fifo(cct(ioctx), fifo_id, &ioctx);
+
+ uint64_t max_part_size = 2048;
+ uint64_t max_entry_size = 128;
+
+ char buf[max_entry_size + 1];
+ memset(buf, 0, sizeof(buf));
+
+ /* first successful create */
+ ASSERT_EQ(0, fifo.open(true,
+ ClsFIFO::MetaCreateParams()
+ .max_part_size(max_part_size)
+ .max_entry_size(max_entry_size)));
+
+ bufferlist bl;
+ bl.append(buf, sizeof(buf));
+
+ ASSERT_EQ(-EINVAL, fifo.push(bl));
+
+
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
+TEST(FIFO, TestMultipleParts) {
+ Rados cluster;
+ std::string pool_name = get_temp_pool_name();
+ ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+ IoCtx ioctx;
+ cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+ string fifo_id = "fifo";
+
+ FIFO fifo(cct(ioctx), fifo_id, &ioctx);
+
+ uint64_t max_part_size = 2048;
+ uint64_t max_entry_size = 128;
+
+ char buf[max_entry_size];
+ memset(buf, 0, sizeof(buf));
+
+ /* create */
+ ASSERT_EQ(0, fifo.open(true,
+ ClsFIFO::MetaCreateParams()
+ .max_part_size(max_part_size)
+ .max_entry_size(max_entry_size)));
+
+ uint32_t part_header_size;
+ uint32_t part_entry_overhead;
+
+ fifo.get_part_layout_info(&part_header_size, &part_entry_overhead);
+
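+  /* each entry costs its payload plus a fixed per-entry overhead, and
+     the part header is paid once per part, so roughly this many
+     max-sized entries fit in a single part */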
+ int entries_per_part = (max_part_size - part_header_size) / (max_entry_size + part_entry_overhead);
+
+ int max_entries = entries_per_part * 4 + 1;
+
+ /* push enough entries */
+ for (int i = 0; i < max_entries; ++i) {
+ bufferlist bl;
+
+ *(int *)buf = i;
+ bl.append(buf, sizeof(buf));
+
+ ASSERT_EQ(0, fifo.push(bl));
+ }
+
+ auto info = fifo.get_meta();
+
+ ASSERT_EQ(info.id, fifo_id);
+ ASSERT_GT(info.head_part_num, 0); /* head should have advanced */
+
+
+ /* list all at once */
+ vector<fifo_entry> result;
+ bool more;
+ ASSERT_EQ(0, fifo.list(max_entries, string(), &result, &more));
+ ASSERT_EQ(false, more);
+
+ ASSERT_EQ(max_entries, result.size());
+
+ for (int i = 0; i < max_entries; ++i) {
+ auto& bl = result[i].data;
+ ASSERT_EQ(i, *(int *)bl.c_str());
+ }
+
+
+ /* list one at a time */
+ string marker;
+ for (int i = 0; i < max_entries; ++i) {
+ ASSERT_EQ(0, fifo.list(1, marker, &result, &more));
+
+ ASSERT_EQ(result.size(), 1);
+ bool expected_more = (i != (max_entries - 1));
+ ASSERT_EQ(expected_more, more);
+
+ auto& entry = result[0];
+
+ auto& bl = entry.data;
+ marker = entry.marker;
+
+ ASSERT_EQ(i, *(int *)bl.c_str());
+ }
+
+ /* trim one at a time */
+ marker.clear();
+ for (int i = 0; i < max_entries; ++i) {
+ /* read single entry */
+ ASSERT_EQ(0, fifo.list(1, marker, &result, &more));
+
+ ASSERT_EQ(result.size(), 1);
+ bool expected_more = (i != (max_entries - 1));
+ ASSERT_EQ(expected_more, more);
+
+ marker = result[0].marker;
+
+ /* trim */
+ ASSERT_EQ(0, fifo.trim(marker));
+
+ /* check tail */
+ info = fifo.get_meta();
+ ASSERT_EQ(info.tail_part_num, i / entries_per_part);
+
+ /* try to read all again, see how many entries left */
+ ASSERT_EQ(0, fifo.list(max_entries, marker, &result, &more));
+ ASSERT_EQ(max_entries - i - 1, result.size());
+ ASSERT_EQ(false, more);
+ }
+
+ /* tail now should point at head */
+ info = fifo.get_meta();
+ ASSERT_EQ(info.head_part_num, info.tail_part_num);
+
+ fifo_part_info part_info;
+
+ /* check old tails are removed */
+ for (int i = 0; i < info.tail_part_num; ++i) {
+ ASSERT_EQ(-ENOENT, fifo.get_part_info(i, &part_info));
+ }
+
+  /* check current tail exists */
+ ASSERT_EQ(0, fifo.get_part_info(info.tail_part_num, &part_info));
+
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
+TEST(FIFO, TestTwoPushers) {
+ Rados cluster;
+ std::string pool_name = get_temp_pool_name();
+ ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+ IoCtx ioctx;
+ cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+ string fifo_id = "fifo";
+
+ FIFO fifo(cct(ioctx), fifo_id, &ioctx);
+
+ uint64_t max_part_size = 2048;
+ uint64_t max_entry_size = 128;
+
+ char buf[max_entry_size];
+ memset(buf, 0, sizeof(buf));
+
+ /* create */
+ ASSERT_EQ(0, fifo.open(true,
+ ClsFIFO::MetaCreateParams()
+ .max_part_size(max_part_size)
+ .max_entry_size(max_entry_size)));
+
+ uint32_t part_header_size;
+ uint32_t part_entry_overhead;
+
+ fifo.get_part_layout_info(&part_header_size, &part_entry_overhead);
+
+ int entries_per_part = (max_part_size - part_header_size) / (max_entry_size + part_entry_overhead);
+
+ int max_entries = entries_per_part * 4 + 1;
+
+ FIFO fifo2(cct(ioctx), fifo_id, &ioctx);
+
+ /* open second one */
+ ASSERT_EQ(0, fifo2.open(true,
+ ClsFIFO::MetaCreateParams()));
+
+ vector<FIFO *> fifos(2);
+ fifos[0] = &fifo;
+ fifos[1] = &fifo2;
+
+ for (int i = 0; i < max_entries; ++i) {
+ bufferlist bl;
+
+ *(int *)buf = i;
+ bl.append(buf, sizeof(buf));
+
+ auto& f = fifos[i % fifos.size()];
+
+ ASSERT_EQ(0, f->push(bl));
+ }
+
+ /* list all by both */
+ vector<fifo_entry> result;
+ bool more;
+ ASSERT_EQ(0, fifo2.list(max_entries, string(), &result, &more));
+
+ ASSERT_EQ(false, more);
+
+ ASSERT_EQ(max_entries, result.size());
+
+ ASSERT_EQ(0, fifo.list(max_entries, string(), &result, &more));
+ ASSERT_EQ(false, more);
+
+ ASSERT_EQ(max_entries, result.size());
+
+ for (int i = 0; i < max_entries; ++i) {
+ auto& bl = result[i].data;
+ ASSERT_EQ(i, *(int *)bl.c_str());
+ }
+
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
+TEST(FIFO, TestTwoPushersTrim) {
+ Rados cluster;
+ std::string pool_name = get_temp_pool_name();
+ ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+ IoCtx ioctx;
+ cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+ string fifo_id = "fifo";
+
+ FIFO fifo1(cct(ioctx), fifo_id, &ioctx);
+
+ uint64_t max_part_size = 2048;
+ uint64_t max_entry_size = 128;
+
+ char buf[max_entry_size];
+ memset(buf, 0, sizeof(buf));
+
+ /* create */
+ ASSERT_EQ(0, fifo1.open(true,
+ ClsFIFO::MetaCreateParams()
+ .max_part_size(max_part_size)
+ .max_entry_size(max_entry_size)));
+
+ uint32_t part_header_size;
+ uint32_t part_entry_overhead;
+
+ fifo1.get_part_layout_info(&part_header_size, &part_entry_overhead);
+
+ int entries_per_part = (max_part_size - part_header_size) / (max_entry_size + part_entry_overhead);
+
+ int max_entries = entries_per_part * 4 + 1;
+
+ FIFO fifo2(cct(ioctx), fifo_id, &ioctx);
+
+ /* open second one */
+ ASSERT_EQ(0, fifo2.open(true,
+ ClsFIFO::MetaCreateParams()));
+
+ /* push one entry to fifo2 and the rest to fifo1 */
+
+ for (int i = 0; i < max_entries; ++i) {
+ bufferlist bl;
+
+ *(int *)buf = i;
+ bl.append(buf, sizeof(buf));
+
+ FIFO *f = (i < 1 ? &fifo2 : &fifo1);
+
+ ASSERT_EQ(0, f->push(bl));
+ }
+
+ /* trim half by fifo1 */
+ int num = max_entries / 2;
+
+ vector<fifo_entry> result;
+ bool more;
+ ASSERT_EQ(0, fifo1.list(num, string(), &result, &more));
+
+ ASSERT_EQ(true, more);
+ ASSERT_EQ(num, result.size());
+
+ for (int i = 0; i < num; ++i) {
+ auto& bl = result[i].data;
+ ASSERT_EQ(i, *(int *)bl.c_str());
+ }
+
+ auto& entry = result[num - 1];
+ auto& marker = entry.marker;
+
+ ASSERT_EQ(0, fifo1.trim(marker));
+
+ /* list what's left by fifo2 */
+
+ int left = max_entries - num;
+
+ ASSERT_EQ(0, fifo2.list(left, marker, &result, &more));
+ ASSERT_EQ(left, result.size());
+ ASSERT_EQ(false, more);
+
+ for (int i = num; i < max_entries; ++i) {
+ auto& bl = result[i - num].data;
+ ASSERT_EQ(i, *(int *)bl.c_str());
+ }
+
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
+TEST(FIFO, TestPushBatch) {
+ Rados cluster;
+ std::string pool_name = get_temp_pool_name();
+ ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+ IoCtx ioctx;
+ cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+ string fifo_id = "fifo";
+
+ FIFO fifo(cct(ioctx), fifo_id, &ioctx);
+
+ uint64_t max_part_size = 2048;
+ uint64_t max_entry_size = 128;
+
+ char buf[max_entry_size];
+ memset(buf, 0, sizeof(buf));
+
+ /* create */
+ ASSERT_EQ(0, fifo.open(true,
+ ClsFIFO::MetaCreateParams()
+ .max_part_size(max_part_size)
+ .max_entry_size(max_entry_size)));
+
+ uint32_t part_header_size;
+ uint32_t part_entry_overhead;
+
+ fifo.get_part_layout_info(&part_header_size, &part_entry_overhead);
+
+ int entries_per_part = (max_part_size - part_header_size) / (max_entry_size + part_entry_overhead);
+
+ int max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */
+
+ vector<bufferlist> bufs;
+
+ for (int i = 0; i < max_entries; ++i) {
+ bufferlist bl;
+
+ *(int *)buf = i;
+ bl.append(buf, sizeof(buf));
+
+ bufs.push_back(bl);
+ }
+
+ ASSERT_EQ(0, fifo.push(bufs));
+
+ /* list all */
+
+ vector<fifo_entry> result;
+ bool more;
+ ASSERT_EQ(0, fifo.list(max_entries, string(), &result, &more));
+
+ ASSERT_EQ(false, more);
+ ASSERT_EQ(max_entries, result.size());
+
+ for (int i = 0; i < max_entries; ++i) {
+ auto& bl = result[i].data;
+ ASSERT_EQ(i, *(int *)bl.c_str());
+ }
+
+ auto& info = fifo.get_meta();
+ ASSERT_EQ(info.head_part_num, 4);
+
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}