From: Yehuda Sadeh Date: Thu, 5 Sep 2019 11:15:15 +0000 (-0700) Subject: cls/fifo: FIFO over RADOS X-Git-Tag: v16.1.0~1154^2~12 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=55255343f786057f6b87d1352473d554cfe68eb2;p=ceph.git cls/fifo: FIFO over RADOS This is an implementation of fifo queue over rados. Data is appended to rados object until it's full. At that point data will be written to a new object. Data is read from the tail sequentially (can be iterated using marker). Data can be trimmed (up to and including marker). Queue has a header object (meta), and zero or more data objects (parts). The software has two layers: the higher level client operations side that deals with the application apis, and manages the meta and parts, and there’s the objclass level that deals with the rados layout of meta and part objects. There are different objclass methods that deal with reading and modifying each of these entities. A single part has max possible size, however, it may become full once a certain smaller size is reached (full_size_threshold). It is imperative that once a part has reached its capacity, it will not allow any more writes into it. For this reason, it is important that data being written to the queue does not exceed max_entry_size . This is enforced, by the higher level client api. Written entries go to the current head object, and when it’s full, a new head part is created. When listing entries, data is iterated from tail to the current head. Trim can either change the pointer within the current tail object, and if needed it removes tail objects. A unitest has been created to test functionality. Signed-off-by: Yehuda Sadeh Squashed-by: Adam C. 
Emerson --- diff --git a/src/cls/CMakeLists.txt b/src/cls/CMakeLists.txt index cb427482e92f..a100dd04b68e 100644 --- a/src/cls/CMakeLists.txt +++ b/src/cls/CMakeLists.txt @@ -271,6 +271,7 @@ set_target_properties(cls_cas PROPERTIES install(TARGETS cls_cas DESTINATION ${cls_dir}) + #cls_queue set(cls_queue_srcs queue/cls_queue.cc @@ -307,6 +308,7 @@ if (WITH_RADOSGW) add_library(cls_rgw_gc_client STATIC ${cls_rgw_gc_client_srcs}) endif (WITH_RADOSGW) + #cls_2pc_queue set(cls_2pc_queue_srcs 2pc_queue/cls_2pc_queue.cc @@ -319,9 +321,25 @@ set_target_properties(cls_2pc_queue PROPERTIES INSTALL_RPATH "" CXX_VISIBILITY_PRESET hidden) install(TARGETS cls_2pc_queue DESTINATION ${cls_dir}) - set(cls_2pc_queue_client_srcs 2pc_queue/cls_2pc_queue_client.cc) add_library(cls_2pc_queue_client STATIC ${cls_2pc_queue_client_srcs}) + add_subdirectory(cmpomap) + +# cls_fifo +set(cls_fifo_srcs fifo/cls_fifo.cc fifo/cls_fifo_types.cc) +add_library(cls_fifo SHARED ${cls_fifo_srcs}) +set_target_properties(cls_fifo PROPERTIES + VERSION "1.0.0" + SOVERSION "1" + INSTALL_RPATH "" + CXX_VISIBILITY_PRESET hidden) +install(TARGETS cls_fifo DESTINATION ${cls_dir}) + +set(cls_fifo_client_srcs + fifo/cls_fifo_client.cc + fifo/cls_fifo_types.cc + fifo/cls_fifo_ops.cc) +add_library(cls_fifo_client STATIC ${cls_fifo_client_srcs}) diff --git a/src/cls/fifo/cls_fifo.cc b/src/cls/fifo/cls_fifo.cc new file mode 100644 index 000000000000..553ad3035739 --- /dev/null +++ b/src/cls/fifo/cls_fifo.cc @@ -0,0 +1,907 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/** \file + * + * This is an OSD class that implements methods for management + * and use of fifo + * + */ + +#include + +#include "objclass/objclass.h" + +#include "cls/fifo/cls_fifo_ops.h" +#include "cls/fifo/cls_fifo_types.h" + + +using namespace rados::cls::fifo; + + +CLS_VER(1,0) +CLS_NAME(fifo) + + +#define CLS_FIFO_MAX_PART_HEADER_SIZE 512 + +static uint32_t part_entry_overhead; + 
+struct cls_fifo_entry_header_pre { + __le64 magic; + __le64 pre_size; + __le64 header_size; + __le64 data_size; + __le64 index; + __le32 reserved; +} __attribute__ ((packed)); + +struct cls_fifo_entry_header { + ceph::real_time mtime; + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(mtime, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(mtime, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_fifo_entry_header) + + +static string new_oid_prefix(string id, std::optional& val) +{ + if (val) { + return *val; + } + +#define PREFIX_RND_SIZE 12 + + char buf[PREFIX_RND_SIZE + 1]; + buf[PREFIX_RND_SIZE] = 0; + + cls_gen_rand_base64(buf, sizeof(buf) - 1); + + char s[id.size() + 1 + sizeof(buf) + 16]; + snprintf(s, sizeof(s), "%s.%s", id.c_str(), buf); + return s; +} + +static int write_header(cls_method_context_t hctx, + fifo_info_t& header, + bool inc_ver = true) +{ + if (header.objv.instance.empty()) { +#define HEADER_INSTANCE_SIZE 16 + char buf[HEADER_INSTANCE_SIZE + 1]; + buf[HEADER_INSTANCE_SIZE] = 0; + cls_gen_rand_base64(buf, sizeof(buf) - 1); + + header.objv.instance = buf; + } + if (inc_ver) { + ++header.objv.ver; + } + bufferlist bl; + encode(header, bl); + return cls_cxx_write_full(hctx, &bl); +} + +static int read_part_header(cls_method_context_t hctx, + fifo_part_header_t *part_header) +{ + bufferlist bl; + int r = cls_cxx_read2(hctx, 0, CLS_FIFO_MAX_PART_HEADER_SIZE, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (r < 0) { + CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r); + return r; + } + + auto iter = bl.cbegin(); + try { + decode(*part_header, iter); + } catch (buffer::error& err) { + CLS_ERR("ERROR: %s(): failed decoding part header", __func__); + return -EIO; + } + + CLS_LOG(20, "%s():%d read part_header:\n" + "\ttag=%s\n" + "\tmagic=0x%llx\n" + "\tmin_ofs=%lld\n" + "\tmax_ofs=%lld\n" + "\tmin_index=%lld\n" + "\tmax_index=%lld\n", + 
__func__, __LINE__, + part_header->tag.c_str(), + (long long)part_header->magic, + (long long)part_header->min_ofs, + (long long)part_header->max_ofs, + (long long)part_header->min_index, + (long long)part_header->max_index); + + return 0; + +} + +static int write_part_header(cls_method_context_t hctx, + fifo_part_header_t& part_header) +{ + bufferlist bl; + encode(part_header, bl); + + if (bl.length() > CLS_FIFO_MAX_PART_HEADER_SIZE) { + CLS_LOG(10, "%s(): cannot write part header, buffer exceeds max size", __func__); + return -EIO; + } + + int r = cls_cxx_write2(hctx, 0, bl.length(), + &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (r < 0) { + CLS_LOG(10, "%s(): failed to write part header: r=%d", + __func__, r); + return r; + } + + return 0; +} + +static int read_header(cls_method_context_t hctx, + std::optional objv, + fifo_info_t *info) +{ + uint64_t size; + + int r = cls_cxx_stat2(hctx, &size, nullptr); + if (r < 0) { + CLS_ERR("ERROR: %s(): cls_cxx_stat2() on obj returned %d", __func__, r); + return r; + } + + bufferlist bl; + r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (r < 0) { + CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r); + return r; + } + + try { + auto iter = bl.cbegin(); + decode(*info, iter); + } catch (buffer::error& err) { + CLS_ERR("ERROR: %s(): failed decoding header", __func__); + return -EIO; + } + + if (objv && + !(info->objv == *objv)) { + string s1 = info->objv.to_str(); + string s2 = objv->to_str(); + CLS_LOG(10, "%s(): version mismatch (header=%s, req=%s), cancelled operation", __func__, s1.c_str(), s2.c_str()); + return -ECANCELED; + } + + return 0; +} + +static int fifo_meta_create_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) +{ + CLS_LOG(20, "%s", __func__); + + cls_fifo_meta_create_op op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const buffer::error &err) { + CLS_ERR("ERROR: %s(): failed to decode request", __func__); + return 
-EINVAL; + } + + uint64_t size; + + int r = cls_cxx_stat2(hctx, &size, nullptr); + if (r < 0 && r != -ENOENT) { + CLS_ERR("ERROR: %s(): cls_cxx_stat2() on obj returned %d", __func__, r); + return r; + } + if (op.exclusive && r == 0) { + CLS_LOG(10, "%s(): exclusive create but queue already exists", __func__); + return -EEXIST; + } + + if (r == 0) { + bufferlist bl; + r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (r < 0) { + CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r); + return r; + } + + fifo_info_t header; + try { + auto iter = bl.cbegin(); + decode(header, iter); + } catch (buffer::error& err) { + CLS_ERR("ERROR: %s(): failed decoding header", __func__); + return -EIO; + } + + if (!(header.id == op.id && + (!op.oid_prefix || + header.oid_prefix == *op.oid_prefix) && + (!op.objv || + header.objv == *op.objv))) { + CLS_LOG(10, "%s(): failed to re-create existing queue with different params", __func__); + return -EEXIST; + } + + return 0; /* already exists */ + } + fifo_info_t header; + + header.id = op.id; + if (op.objv) { + header.objv = *op.objv; + } else { +#define DEFAULT_INSTANCE_SIZE 16 + char buf[DEFAULT_INSTANCE_SIZE + 1]; + cls_gen_rand_base64(buf, sizeof(buf)); + buf[DEFAULT_INSTANCE_SIZE] = '\0'; + header.objv.instance = buf; + header.objv.ver = 1; + } + header.oid_prefix = new_oid_prefix(op.id, op.oid_prefix); + + header.data_params.max_part_size = op.max_part_size; + header.data_params.max_entry_size = op.max_entry_size; + header.data_params.full_size_threshold = op.max_part_size - op.max_entry_size - part_entry_overhead; + + r = write_header(hctx, header, false); + if (r < 0) { + CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r); + return r; + } + + return 0; +} + +static int fifo_meta_update_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) +{ + CLS_LOG(20, "%s", __func__); + + cls_fifo_meta_update_op op; + try { + auto iter = in->cbegin(); + decode(op, iter); + 
} catch (const buffer::error &err) { + CLS_ERR("ERROR: %s(): failed to decode request", __func__); + return -EINVAL; + } + + fifo_info_t header; + + int r = read_header(hctx, op.objv, &header); + if (r < 0) { + return r; + } + + string err; + + r = header.apply_update(op.tail_part_num, + op.head_part_num, + op.min_push_part_num, + op.max_push_part_num, + op.journal_entries_add, + op.journal_entries_rm, + &err); + if (r < 0) { + CLS_LOG(10, "%s(): %s", __func__, err.c_str()); + return r; + } + + r = write_header(hctx, header); + if (r < 0) { + CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r); + return r; + } + + return 0; +} + +static int fifo_meta_get_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) +{ + CLS_LOG(20, "%s", __func__); + + cls_fifo_meta_get_op op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const buffer::error &err) { + CLS_ERR("ERROR: %s(): failed to decode request", __func__); + return -EINVAL; + } + + cls_fifo_meta_get_op_reply reply; + int r = read_header(hctx, op.objv, &reply.info); + if (r < 0) { + return r; + } + + reply.part_header_size = CLS_FIFO_MAX_PART_HEADER_SIZE; + reply.part_entry_overhead = part_entry_overhead; + + encode(reply, *out); + + return 0; +} + +static int fifo_part_init_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) +{ + CLS_LOG(20, "%s", __func__); + + cls_fifo_part_init_op op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const buffer::error &err) { + CLS_ERR("ERROR: %s(): failed to decode request", __func__); + return -EINVAL; + } + + uint64_t size; + + int r = cls_cxx_stat2(hctx, &size, nullptr); + if (r < 0 && r != -ENOENT) { + CLS_ERR("ERROR: %s(): cls_cxx_stat2() on obj returned %d", __func__, r); + return r; + } + if (r == 0 && size > 0) { + fifo_part_header_t part_header; + r = read_part_header(hctx, &part_header); + if (r < 0) { + CLS_LOG(10, "%s(): failed to read part header", __func__); + return r; + } + + if 
(!(part_header.tag == op.tag && + part_header.params == op.data_params)) { + CLS_LOG(10, "%s(): failed to re-create existing part with different params", __func__); + return -EEXIST; + } + + return 0; /* already exists */ + } + + fifo_part_header_t part_header; + + part_header.tag = op.tag; + part_header.params = op.data_params; + + part_header.min_ofs = CLS_FIFO_MAX_PART_HEADER_SIZE; + part_header.max_ofs = part_header.min_ofs; + + cls_gen_random_bytes((char *)&part_header.magic, sizeof(part_header.magic)); + + r = write_part_header(hctx, part_header); + if (r < 0) { + CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r); + return r; + } + + return 0; +} + +static bool full_part(const fifo_part_header_t& part_header) +{ + return (part_header.max_ofs > part_header.params.full_size_threshold); +} + +static int fifo_part_push_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) +{ + CLS_LOG(20, "%s", __func__); + + cls_fifo_part_push_op op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const buffer::error &err) { + CLS_ERR("ERROR: %s(): failed to decode request", __func__); + return -EINVAL; + } + + fifo_part_header_t part_header; + int r = read_part_header(hctx, &part_header); + if (r < 0) { + CLS_LOG(10, "%s(): failed to read part header", __func__); + return r; + } + + if (!(part_header.tag == op.tag)) { + CLS_LOG(10, "%s(): bad tag", __func__); + return -EINVAL; + } + + uint64_t effective_len = op.total_len + op.data_bufs.size() * part_entry_overhead; + + if (effective_len > part_header.params.max_entry_size + part_entry_overhead) { + return -EINVAL; + } + + if (full_part(part_header)) { + return -ERANGE; + } + + struct cls_fifo_entry_header entry_header; + entry_header.mtime = real_clock::now(); + + bufferlist entry_header_bl; + encode(entry_header, entry_header_bl); + + auto max_index = part_header.max_index; + auto ofs = part_header.max_ofs; + + cls_fifo_entry_header_pre pre_header; + pre_header.magic = 
part_header.magic; + pre_header.pre_size = sizeof(pre_header); + pre_header.reserved = 0; + + uint64_t total_data = 0; + + for (auto& data : op.data_bufs) { + total_data += data.length(); + + pre_header.header_size = entry_header_bl.length(); + pre_header.data_size = data.length(); + pre_header.index = max_index; + + bufferptr pre((char *)&pre_header, sizeof(pre_header)); + bufferlist all_data; + all_data.append(pre); + all_data.append(entry_header_bl); + all_data.claim_append(data); + + auto write_len = all_data.length(); + + r = cls_cxx_write2(hctx, ofs, write_len, + &all_data, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (r < 0) { + CLS_LOG(10, "%s(): failed to write entry (ofs=%lld len=%lld): r=%d", + __func__, (long long)part_header.max_ofs, (long long)write_len, r); + return r; + } + + ofs += write_len; + ++max_index; + } + + if (total_data != op.total_len) { + CLS_LOG(10, "%s(): length mismatch: op.total_len=%lld total data received=%lld", + __func__, (long long)op.total_len, (long long)total_data); + return -EINVAL; + } + + part_header.max_index = max_index; + part_header.max_ofs = ofs; + + r = write_part_header(hctx, part_header); + if (r < 0) { + CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r); + return r; + } + + return 0; +} + +class EntryReader { + static constexpr uint64_t prefetch_len = (128 * 1024); + + cls_method_context_t hctx; + + fifo_part_header_t& part_header; + + uint64_t ofs; + bufferlist data; + + int fetch(uint64_t num_bytes); + int read(uint64_t num_bytes, bufferlist *pbl); + int peek(uint64_t num_bytes, char *dest); + int seek(uint64_t num_bytes); + +public: + EntryReader(cls_method_context_t _hctx, + fifo_part_header_t& _part_header, + uint64_t _ofs) : hctx(_hctx), + part_header(_part_header), + ofs(_ofs) { + if (ofs < part_header.min_ofs) { + ofs = part_header.min_ofs; + } + } + + uint64_t get_ofs() const { + return ofs; + } + + bool end() const { + return (ofs >= part_header.max_ofs); + } + + int 
peek_pre_header(cls_fifo_entry_header_pre *pre_header); + int get_next_entry(bufferlist *pbl, + uint64_t *pofs, + ceph::real_time *pmtime); +}; + + +int EntryReader::fetch(uint64_t num_bytes) +{ + CLS_LOG(20, "%s(): fetch %d bytes, ofs=%d data.length()=%d", __func__, (int)num_bytes, (int)ofs, (int)data.length()); + if (data.length() < num_bytes) { + bufferlist bl; + CLS_LOG(20, "%s(): reading %d bytes at ofs=%d", __func__, (int)prefetch_len, (int)ofs + data.length()); + int r = cls_cxx_read2(hctx, ofs + data.length(), prefetch_len, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (r < 0) { + CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r); + return r; + } + data.claim_append(bl); + } + + if ((unsigned)num_bytes > data.length()) { + CLS_LOG(20, "%s(): requested %lld bytes, but only %lld were available", __func__, (long long)num_bytes, (long long)data.length()); + return -ERANGE; + } + + return 0; +} + +int EntryReader::read(uint64_t num_bytes, bufferlist *pbl) +{ + int r = fetch(num_bytes); + if (r < 0) { + return r; + } + data.splice(0, num_bytes, pbl); + + ofs += num_bytes; + + return 0; +} + +int EntryReader::peek(uint64_t num_bytes, char *dest) +{ + int r = fetch(num_bytes); + if (r < 0) { + return r; + } + + data.begin().copy(num_bytes, dest); + + return 0; +} + +int EntryReader::seek(uint64_t num_bytes) +{ + bufferlist bl; + + CLS_LOG(20, "%s():%d: num_bytes=%d", __func__, __LINE__, (int)num_bytes); + return read(num_bytes, &bl); +} + +int EntryReader::peek_pre_header(cls_fifo_entry_header_pre *pre_header) +{ + if (end()) { + return -ENOENT; + } + + int r = peek(sizeof(*pre_header), (char *)pre_header); + if (r < 0) { + CLS_ERR("ERROR: %s(): peek() size=%d failed: r=%d", __func__, (int)sizeof(pre_header), r); + return r; + } + + if (pre_header->magic != part_header.magic) { + CLS_ERR("ERROR: %s(): unexpected pre_header magic", __func__); + return -ERANGE; + } + + return 0; +} + + +int EntryReader::get_next_entry(bufferlist *pbl, + uint64_t 
*pofs, + ceph::real_time *pmtime) +{ + cls_fifo_entry_header_pre pre_header; + int r = peek_pre_header(&pre_header); + if (r < 0) { + CLS_ERR("ERROR: %s(): peek_pre_header() failed: r=%d", __func__, r); + return r; + } + + if (pofs) { + *pofs = ofs; + } + + CLS_LOG(20, "%s():%d: pre_header.pre_size=%d", __func__, __LINE__, (int)pre_header.pre_size); + r = seek(pre_header.pre_size); + if (r < 0) { + CLS_ERR("ERROR: %s(): failed to seek: r=%d", __func__, r); + return r; + } + + bufferlist header; + CLS_LOG(20, "%s():%d: pre_header.header_size=%d", __func__, __LINE__, (int)pre_header.header_size); + r = read(pre_header.header_size, &header); + if (r < 0) { + CLS_ERR("ERROR: %s(): failed to read entry header: r=%d", __func__, r); + return r; + } + + cls_fifo_entry_header entry_header; + auto iter = header.cbegin(); + try { + decode(entry_header, iter); + } catch (buffer::error& err) { + CLS_ERR("%s(): failed decoding entry header", __func__); + return -EIO; + } + + if (pmtime) { + *pmtime = entry_header.mtime; + } + + if (pbl) { + r = read(pre_header.data_size, pbl); + if (r < 0) { + CLS_ERR("%s(): failed reading data: r=%d", __func__, r); + return r; + } + } else { + r = seek(pre_header.data_size); + if (r < 0) { + CLS_ERR("ERROR: %s(): failed to seek: r=%d", __func__, r); + return r; + } + } + + return 0; +} + +static int fifo_part_trim_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) +{ + CLS_LOG(20, "%s", __func__); + + cls_fifo_part_trim_op op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const buffer::error &err) { + CLS_ERR("ERROR: %s(): failed to decode request", __func__); + return -EINVAL; + } + + fifo_part_header_t part_header; + int r = read_part_header(hctx, &part_header); + if (r < 0) { + CLS_LOG(10, "%s(): failed to read part header", __func__); + return r; + } + + if (op.tag && + !(part_header.tag == *op.tag)) { + CLS_LOG(10, "%s(): bad tag", __func__); + return -EINVAL; + } + + if (op.ofs < part_header.min_ofs) { + 
return 0; + } + + if (op.ofs >= part_header.max_ofs) { + if (full_part(part_header)) { + /* + * trim full part completely: remove object + */ + + r = cls_cxx_remove(hctx); + if (r < 0) { + CLS_LOG(0, "%s(): ERROR: cls_cxx_remove() returned r=%d", __func__, r); + return r; + } + + return 0; + } + + part_header.min_ofs = part_header.max_ofs; + part_header.min_index = part_header.max_index; + } else { + EntryReader reader(hctx, part_header, op.ofs); + + cls_fifo_entry_header_pre pre_header; + int r = reader.peek_pre_header(&pre_header); + if (r < 0) { + return r; + } + + r = reader.get_next_entry(nullptr, nullptr, nullptr); + if (r < 0) { + CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d", __func__, r); + return r; + } + + part_header.min_ofs = reader.get_ofs(); + part_header.min_index = pre_header.index + 1; + } + + r = write_part_header(hctx, part_header); + if (r < 0) { + CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r); + return r; + } + + return 0; +} + +static int fifo_part_list_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) +{ + CLS_LOG(20, "%s", __func__); + + cls_fifo_part_list_op op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const buffer::error &err) { + CLS_ERR("ERROR: %s(): failed to decode request", __func__); + return -EINVAL; + } + + fifo_part_header_t part_header; + int r = read_part_header(hctx, &part_header); + if (r < 0) { + CLS_LOG(10, "%s(): failed to read part header", __func__); + return r; + } + + if (op.tag && + !(part_header.tag == *op.tag)) { + CLS_LOG(10, "%s(): bad tag", __func__); + return -EINVAL; + } + + EntryReader reader(hctx, part_header, op.ofs); + + if (op.ofs >= part_header.min_ofs && + !reader.end()) { + r = reader.get_next_entry(nullptr, nullptr, nullptr); + if (r < 0) { + CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d", __func__, r); + return r; + } + } + + cls_fifo_part_list_op_reply reply; + + reply.tag = part_header.tag; + 
+#define LIST_MAX_ENTRIES 512 + + auto max_entries = std::min(op.max_entries, (int)LIST_MAX_ENTRIES); + + for (int i = 0; i < max_entries && !reader.end(); ++i) { + bufferlist data; + ceph::real_time mtime; + uint64_t ofs; + + r = reader.get_next_entry(&data, &ofs, &mtime); + if (r < 0) { + CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d", __func__, r); + return r; + } + + reply.entries.emplace_back(std::move(data), ofs, mtime); + } + + reply.more = !reader.end(); + reply.full_part = full_part(part_header); + + encode(reply, *out); + + return 0; +} + +static int fifo_part_get_info_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) +{ + CLS_LOG(20, "%s", __func__); + + cls_fifo_part_get_info_op op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const buffer::error &err) { + CLS_ERR("ERROR: %s(): failed to decode request", __func__); + return -EINVAL; + } + + cls_fifo_part_get_info_op_reply reply; + + int r = read_part_header(hctx, &reply.header); + if (r < 0) { + CLS_LOG(10, "%s(): failed to read part header", __func__); + return r; + } + + encode(reply, *out); + + return 0; +} + +CLS_INIT(fifo) +{ + CLS_LOG(20, "Loaded fifo class!"); + + cls_handle_t h_class; + cls_method_handle_t h_fifo_meta_create_op; + cls_method_handle_t h_fifo_meta_get_op; + cls_method_handle_t h_fifo_meta_update_op; + cls_method_handle_t h_fifo_part_init_op; + cls_method_handle_t h_fifo_part_push_op; + cls_method_handle_t h_fifo_part_trim_op; + cls_method_handle_t h_fifo_part_list_op; + cls_method_handle_t h_fifo_part_get_info_op; + + cls_register("fifo", &h_class); + cls_register_cxx_method(h_class, "fifo_meta_create", + CLS_METHOD_RD | CLS_METHOD_WR, + fifo_meta_create_op, &h_fifo_meta_create_op); + + cls_register_cxx_method(h_class, "fifo_meta_get", + CLS_METHOD_RD, + fifo_meta_get_op, &h_fifo_meta_get_op); + + cls_register_cxx_method(h_class, "fifo_meta_update", + CLS_METHOD_RD | CLS_METHOD_WR, + fifo_meta_update_op, 
&h_fifo_meta_update_op); + + cls_register_cxx_method(h_class, "fifo_part_init", + CLS_METHOD_RD | CLS_METHOD_WR, + fifo_part_init_op, &h_fifo_part_init_op); + + cls_register_cxx_method(h_class, "fifo_part_push", + CLS_METHOD_RD | CLS_METHOD_WR, + fifo_part_push_op, &h_fifo_part_push_op); + + cls_register_cxx_method(h_class, "fifo_part_trim", + CLS_METHOD_RD | CLS_METHOD_WR, + fifo_part_trim_op, &h_fifo_part_trim_op); + + cls_register_cxx_method(h_class, "fifo_part_list", + CLS_METHOD_RD, + fifo_part_list_op, &h_fifo_part_list_op); + + cls_register_cxx_method(h_class, "fifo_part_get_info", + CLS_METHOD_RD, + fifo_part_get_info_op, &h_fifo_part_get_info_op); + + /* calculate entry overhead */ + struct cls_fifo_entry_header entry_header; + bufferlist entry_header_bl; + encode(entry_header, entry_header_bl); + + part_entry_overhead = sizeof(cls_fifo_entry_header_pre) + entry_header_bl.length(); + + return; +} diff --git a/src/cls/fifo/cls_fifo_client.cc b/src/cls/fifo/cls_fifo_client.cc new file mode 100644 index 000000000000..f4888fda9b1f --- /dev/null +++ b/src/cls/fifo/cls_fifo_client.cc @@ -0,0 +1,1070 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include "include/rados/librados.hpp" +#include "common/dout.h" + +#include "auth/Crypto.h" + +using namespace librados; + +#include "cls/fifo/cls_fifo_ops.h" +#include "cls/fifo/cls_fifo_client.h" + + +#define dout_subsys ceph_subsys_objclass + + +namespace rados { + namespace cls { + namespace fifo { + int ClsFIFO::meta_create(librados::ObjectWriteOperation *rados_op, + const string& id, + const MetaCreateParams& params) { + cls_fifo_meta_create_op op; + + auto& state = params.state; + + if (id.empty()) { + return -EINVAL; + } + + op.id = id; + op.objv = state.objv; + op.oid_prefix = state.oid_prefix; + op.max_part_size = state.max_part_size; + op.max_entry_size = state.max_entry_size; + op.exclusive = state.exclusive; + + if (op.max_part_size == 0 || + op.max_entry_size == 0 || + op.max_entry_size > op.max_part_size) { + return -EINVAL; + } + + bufferlist in; + encode(op, in); + rados_op->exec("fifo", "fifo_meta_create", in); + + return 0; + } + + int ClsFIFO::meta_get(librados::IoCtx& ioctx, + const string& oid, + const MetaGetParams& params, + fifo_info_t *result, + uint32_t *part_header_size, + uint32_t *part_entry_overhead) { + cls_fifo_meta_get_op op; + + auto& state = params.state; + + op.objv = state.objv; + + librados::ObjectReadOperation rop; + + bufferlist in; + bufferlist out; + int op_ret; + encode(op, in); + rop.exec("fifo", "fifo_meta_get", in, &out, &op_ret); + + int r = ioctx.operate(oid, &rop, nullptr); + if (r < 0) { + return r; + } + + if (op_ret < 0) { + return op_ret; + } + + cls_fifo_meta_get_op_reply reply; + auto iter = out.cbegin(); + try { + decode(reply, iter); + } catch (buffer::error& err) { + return -EIO; + } + + *result = reply.info; + + if (part_header_size) { + *part_header_size = reply.part_header_size; + } + + if (part_entry_overhead) { + *part_entry_overhead = reply.part_entry_overhead; + } + + return 0; + } + + int ClsFIFO::meta_update(librados::ObjectWriteOperation *rados_op, + const MetaUpdateParams& params) { + 
cls_fifo_meta_update_op op; + + auto& state = params.state; + + if (state.objv.empty()) { + return -EINVAL; + } + + op.objv = state.objv; + op.tail_part_num = state.tail_part_num; + op.head_part_num = state.head_part_num; + op.min_push_part_num = state.min_push_part_num; + op.max_push_part_num = state.max_push_part_num; + op.journal_entries_add = state.journal_entries_add; + op.journal_entries_rm = state.journal_entries_rm; + + bufferlist in; + encode(op, in); + rados_op->exec("fifo", "fifo_meta_update", in); + + return 0; + } + + int ClsFIFO::part_init(librados::ObjectWriteOperation *rados_op, + const PartInitParams& params) { + cls_fifo_part_init_op op; + + auto& state = params.state; + + if (state.tag.empty()) { + return -EINVAL; + } + + op.tag = state.tag; + op.data_params = state.data_params; + + bufferlist in; + encode(op, in); + rados_op->exec("fifo", "fifo_part_init", in); + + return 0; + } + + int ClsFIFO::push_part(librados::ObjectWriteOperation *rados_op, + const PushPartParams& params) { + cls_fifo_part_push_op op; + + auto& state = params.state; + + if (state.tag.empty()) { + return -EINVAL; + } + + op.tag = state.tag; + op.data_bufs = state.data_bufs; + op.total_len = state.total_len; + + bufferlist in; + encode(op, in); + rados_op->exec("fifo", "fifo_part_push", in); + + return 0; + } + + int ClsFIFO::trim_part(librados::ObjectWriteOperation *rados_op, + const TrimPartParams& params) { + cls_fifo_part_trim_op op; + + auto& state = params.state; + + op.tag = state.tag; + op.ofs = state.ofs; + + bufferlist in; + encode(op, in); + rados_op->exec("fifo", "fifo_part_trim", in); + + return 0; + } + + int ClsFIFO::list_part(librados::IoCtx& ioctx, + const string& oid, + const ListPartParams& params, + std::vector *pentries, + bool *more, + bool *full_part, + string *ptag) + { + cls_fifo_part_list_op op; + + auto& state = params.state; + + op.tag = state.tag; + op.ofs = state.ofs; + op.max_entries = state.max_entries; + + librados::ObjectReadOperation rop; + 
+ bufferlist in; + bufferlist out; + int op_ret; + encode(op, in); + rop.exec("fifo", "fifo_part_list", in, &out, &op_ret); + + int r = ioctx.operate(oid, &rop, nullptr); + if (r < 0) { + return r; + } + + if (op_ret < 0) { + return op_ret; + } + + cls_fifo_part_list_op_reply reply; + auto iter = out.cbegin(); + try { + decode(reply, iter); + } catch (buffer::error& err) { + return -EIO; + } + + if (pentries) { + *pentries = std::move(reply.entries); + } + + if (more) { + *more = reply.more; + } + + if (full_part) { + *full_part = reply.full_part; + } + + if (ptag) { + *ptag = reply.tag; + } + + return 0; + } + + int ClsFIFO::get_part_info(librados::IoCtx& ioctx, + const string& oid, + rados::cls::fifo::fifo_part_header_t *header) + { + cls_fifo_part_get_info_op op; + + librados::ObjectReadOperation rop; + + bufferlist in; + bufferlist out; + int op_ret; + encode(op, in); + rop.exec("fifo", "fifo_part_get_info", in, &out, &op_ret); + + int r = ioctx.operate(oid, &rop, nullptr); + if (r < 0) { + return r; + } + + if (op_ret < 0) { + return op_ret; + } + + cls_fifo_part_get_info_op_reply reply; + auto iter = out.cbegin(); + try { + decode(reply, iter); + } catch (buffer::error& err) { + return -EIO; + } + + if (header) { + *header = std::move(reply.header); + } + + return 0; + } + + string FIFO::craft_marker(int64_t part_num, + uint64_t part_ofs) + { + char buf[64]; + snprintf(buf, sizeof(buf), "%lld:%lld", (long long)part_num, (long long)part_ofs); + return string(buf); + } + + bool FIFO::parse_marker(const string& marker, + int64_t *part_num, + uint64_t *part_ofs) + { + if (marker.empty()) { + *part_num = meta_info.tail_part_num; + *part_ofs = 0; + return true; + } + + auto pos = marker.find(':'); + if (pos == string::npos) { + return false; + } + + auto first = marker.substr(0, pos); + auto second = marker.substr(pos + 1); + + string err; + + *part_num = (int64_t)strict_strtoll(first.c_str(), 10, &err); + if (!err.empty()) { + return false; + } + + *part_ofs = 
(uint64_t)strict_strtoll(second.c_str(), 10, &err); + if (!err.empty()) { + return false; + } + + return true; + } + + int FIFO::init_ioctx(librados::Rados *rados, + const string& pool, + std::optional pool_ns) + { + _ioctx.emplace(); + int r = rados->ioctx_create(pool.c_str(), *_ioctx); + if (r < 0) { + return r; + } + + if (pool_ns && !pool_ns->empty()) { + _ioctx->set_namespace(*pool_ns); + } + + ioctx = &(*_ioctx); + + return 0; + } + + int ClsFIFO::MetaUpdateParams::apply_update(CephContext *cct, + fifo_info_t *info) + { + string err; + + int r = info->apply_update(state.tail_part_num, + state.head_part_num, + state.min_push_part_num, + state.max_push_part_num, + state.journal_entries_add, + state.journal_entries_rm, + &err); + if (r < 0) { + ldout(cct, 0) << __func__ << "(): ERROR: " << err << dendl; + return r; + } + + ++info->objv.ver; + + return 0; + } + + int FIFO::update_meta(ClsFIFO::MetaUpdateParams& update_params, + bool *canceled) + { + update_params.objv(meta_info.objv); + + librados::ObjectWriteOperation wop; + int r = ClsFIFO::meta_update(&wop, update_params); + if (r < 0) { + return r; + } + + r = ioctx->operate(meta_oid, &wop); + if (r < 0 && r != -ECANCELED) { + return r; + } + + *canceled = (r == -ECANCELED); + + if (!*canceled) { + r = update_params.apply_update(cct, &meta_info); + if (r < 0) { /* should really not happen, + but if it does, let's treat it as if race was detected */ + *canceled = true; + } + } + + if (*canceled) { + r = do_read_meta(); + } + if (r < 0) { + return r; + } + + return 0; + } + + int FIFO::do_read_meta(std::optional objv) + { + ClsFIFO::MetaGetParams get_params; + if (objv) { + get_params.objv(*objv); + } + int r = ClsFIFO::meta_get(*ioctx, + meta_oid, + get_params, + &meta_info, + &part_header_size, + &part_entry_overhead); + if (r < 0) { + return r; + } + + return 0; + } + + int FIFO::create_part(int64_t part_num, const string& tag, + int64_t& max_part_num) { + librados::ObjectWriteOperation op; + + 
op.create(true); /* exclusive */ + int r = ClsFIFO::part_init(&op, + ClsFIFO::PartInitParams() + .tag(tag) + .data_params(meta_info.data_params)); + if (r < 0) { + return r; + } + + r = ioctx->operate(meta_info.part_oid(part_num), &op); + if (r < 0) { + return r; + } + + if (part_num > max_part_num) { + max_part_num = part_num; + } + + return 0; + } + + int FIFO::remove_part(int64_t part_num, const string& tag, + int64_t& tail_part_num) { + librados::ObjectWriteOperation op; + op.remove(); + int r = ioctx->operate(meta_info.part_oid(part_num), &op); + if (r == -ENOENT) { + r = 0; + } + if (r < 0) { + return r; + } + + if (part_num >= tail_part_num) { + tail_part_num = part_num + 1; + } + + return 0; + } + + int FIFO::process_journal_entry(const fifo_journal_entry_t& entry, + int64_t& tail_part_num, + int64_t& head_part_num, + int64_t& max_part_num) + { + + switch (entry.op) { + case fifo_journal_entry_t::Op::OP_CREATE: + return create_part(entry.part_num, entry.part_tag, max_part_num); + case fifo_journal_entry_t::Op::OP_SET_HEAD: + if (entry.part_num > head_part_num) { + head_part_num = entry.part_num; + } + return 0; + case fifo_journal_entry_t::Op::OP_REMOVE: + return remove_part(entry.part_num, entry.part_tag, tail_part_num); + default: + /* nothing to do */ + break; + } + + return -EIO; + } + + int FIFO::process_journal_entries(vector *processed, + int64_t& tail_part_num, + int64_t& head_part_num, + int64_t& max_part_num) + { + for (auto& iter : meta_info.journal) { + auto& entry = iter.second; + int r = process_journal_entry(entry, tail_part_num, head_part_num, max_part_num); + if (r < 0) { + ldout(cct, 10) << __func__ << "(): ERROR: failed processing journal entry for part=" << entry.part_num << dendl; + } else { + processed->push_back(entry); + } + } + + return 0; + } + + int FIFO::process_journal() + { + vector processed; + + int64_t new_tail = meta_info.tail_part_num; + int64_t new_head = meta_info.head_part_num; + int64_t new_max = 
meta_info.max_push_part_num; + + int r = process_journal_entries(&processed, new_tail, new_head, new_max); + if (r < 0) { + return r; + } + + if (processed.empty()) { + return 0; + } + +#define RACE_RETRY 10 + + int i; + + for (i = 0; i < RACE_RETRY; ++i) { + bool canceled; + + std::optional tail_part_num; + std::optional head_part_num; + std::optional max_part_num; + + if (new_tail > meta_info.tail_part_num) { + tail_part_num = new_tail; + } + + if (new_head > meta_info.head_part_num) { + head_part_num = new_head; + } + + if (new_max > meta_info.max_push_part_num) { + max_part_num = new_max; + } + + if (processed.empty() && + !tail_part_num && + !max_part_num) { + /* nothing to update anymore */ + break; + } + + r = update_meta(ClsFIFO::MetaUpdateParams() + .journal_entries_rm(processed) + .tail_part_num(tail_part_num) + .head_part_num(head_part_num) + .max_push_part_num(max_part_num), + &canceled); + if (r < 0) { + return r; + } + + if (canceled) { + vector new_processed; + + for (auto& e : processed) { + auto jiter = meta_info.journal.find(e.part_num); + if (jiter == meta_info.journal.end() || /* journal entry was already processed */ + !(jiter->second == e)) { + continue; + } + + new_processed.push_back(e); + } + processed = std::move(new_processed); + continue; + } + break; + } + if (i == RACE_RETRY) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl; + return -ECANCELED; + } + return 0; + } + + static const char alphanum_plain_table[]="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + + void gen_rand_alphanumeric_plain(CephContext *cct, char *dest, int size) /* size should be the required string size + 1 */ + { + cct->random()->get_bytes(dest, size); + + int i; + for (i = 0; i < size - 1; i++) { + int pos = (unsigned)dest[i]; + dest[i] = alphanum_plain_table[pos % (sizeof(alphanum_plain_table) - 1)]; + } + dest[i] = '\0'; + } + + static string generate_tag(CephContext *cct) + { +#define 
HEADER_TAG_SIZE 16 + char buf[HEADER_TAG_SIZE + 1]; + buf[HEADER_TAG_SIZE] = 0; + gen_rand_alphanumeric_plain(cct, buf, sizeof(buf)); + return string(buf); + } + + int FIFO::prepare_new_part(bool is_head) + { + fifo_journal_entry_t jentry; + + meta_info.prepare_next_journal_entry(&jentry, generate_tag(cct)); + + int64_t new_head_part_num = meta_info.head_part_num; + + std::optional new_head_jentry; + if (is_head) { + new_head_jentry = jentry; + new_head_jentry->op = fifo_journal_entry_t::OP_SET_HEAD; + new_head_part_num = jentry.part_num; + } + + int r; + bool canceled; + + int i; + + for (i = 0; i < RACE_RETRY; ++i) { + r = update_meta(ClsFIFO::MetaUpdateParams() + .journal_entry_add(jentry) + .journal_entry_add(new_head_jentry), + &canceled); + if (r < 0) { + return r; + } + + if (canceled) { + if (meta_info.max_push_part_num >= jentry.part_num && + meta_info.head_part_num >= new_head_part_num) { /* raced, but new part was already written */ + return 0; + } + + auto iter = meta_info.journal.find(jentry.part_num); + if (iter == meta_info.journal.end()) { + continue; + } + } + break; + } + if (i == RACE_RETRY) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl; + return -ECANCELED; + } + + r = process_journal(); + if (r < 0) { + return r; + } + + return 0; + } + + int FIFO::prepare_new_head() + { + int64_t new_head_num = meta_info.head_part_num + 1; + + if (meta_info.max_push_part_num < new_head_num) { + int r = prepare_new_part(true); + if (r < 0) { + return r; + } + + if (meta_info.max_push_part_num < new_head_num) { + ldout(cct, 0) << "ERROR: " << __func__ << ": after new part creation: meta_info.max_push_part_num=" + << meta_info.max_push_part_num << " new_head_num=" << meta_info.max_push_part_num << dendl; + return -EIO; + } + + return 0; + } + + int i; + + for (i = 0; i < RACE_RETRY; ++i) { + bool canceled; + int r = update_meta(ClsFIFO::MetaUpdateParams() + .head_part_num(new_head_num), + &canceled); + 
if (r < 0) { + return r; + } + + if (canceled) { + if (meta_info.head_part_num < new_head_num) { + continue; + } + } + break; + } + if (i == RACE_RETRY) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl; + return -ECANCELED; + } + + + return 0; + } + + int FIFO::open(bool create, + std::optional create_params) + { + if (!ioctx) { + return -EINVAL; + } + + if (create) { + librados::ObjectWriteOperation op; + + ClsFIFO::MetaCreateParams default_params; + ClsFIFO::MetaCreateParams *params = (create_params ? &(*create_params) : &default_params); + + int r = ClsFIFO::meta_create(&op, id, *params); + if (r < 0) { + return r; + } + + r = ioctx->operate(meta_oid, &op); + if (r < 0) { + return r; + } + } + + std::optional objv = (create_params ? create_params->state.objv : nullopt); + + int r = do_read_meta(objv); + if (r < 0) { + return r; + } + + is_open = true; + + return 0; + } + + int FIFO::read_meta(std::optional objv) + { + if (!is_open) { + return -EINVAL; + } + + return do_read_meta(objv); + } + + int FIFO::push_entries(int64_t part_num, std::vector& data_bufs) + { + if (!is_open) { + return -EINVAL; + } + + librados::ObjectWriteOperation op; + + int r = ClsFIFO::push_part(&op, ClsFIFO::PushPartParams() + .tag(meta_info.head_tag) + .data_bufs(data_bufs)); + if (r < 0) { + return r; + } + + r = ioctx->operate(meta_info.part_oid(part_num), &op); + if (r < 0) { + return r; + } + + return 0; + } + + int FIFO::trim_part(int64_t part_num, + uint64_t ofs, + std::optional tag) + { + if (!is_open) { + return -EINVAL; + } + + librados::ObjectWriteOperation op; + + int r = ClsFIFO::trim_part(&op, ClsFIFO::TrimPartParams() + .tag(tag) + .ofs(ofs)); + if (r < 0) { + return r; + } + + r = ioctx->operate(meta_info.part_oid(part_num), &op); + if (r < 0) { + return r; + } + + return 0; + } + + int FIFO::push(bufferlist& bl) + { + std::vector data_bufs; + data_bufs.push_back(bl); + + return push(data_bufs); + } + + int 
FIFO::push(vector& data_bufs) + { + if (!is_open) { + return -EINVAL; + } + + int r; + + if (meta_info.need_new_head()) { + r = prepare_new_head(); + if (r < 0) { + return r; + } + } + + int i; + + auto iter = data_bufs.begin(); + + while (iter != data_bufs.end()) { + uint64_t batch_len = 0; + + vector batch; + + for (; iter != data_bufs.end(); ++iter) { + auto& data = *iter; + auto data_len = data.length(); + auto max_entry_size = meta_info.data_params.max_entry_size; + + if (data_len > max_entry_size) { + ldout(cct, 10) << __func__ << "(): entry too large: " << data_len << " > " << meta_info.data_params.max_entry_size << dendl; + return -EINVAL; + } + + if (batch_len + data_len > max_entry_size) { + break; + } + + batch_len += data_len + part_entry_overhead; /* we can send entry with data_len up to max_entry_size, + however, we want to also account the overhead when dealing + with multiple entries. Previous check doesn't account + for overhead on purpose. */ + + batch.push_back(data); + } + + + for (i = 0; i < RACE_RETRY; ++i) { + r = push_entries(meta_info.head_part_num, batch); + if (r == -ERANGE) { + r = prepare_new_head(); + if (r < 0) { + return r; + } + continue; + } + if (r < 0) { + return r; + } + break; + } + if (i == RACE_RETRY) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl; + return -ECANCELED; + } + } + + return 0; + } + + int FIFO::list(int max_entries, + std::optional marker, + vector *result, + bool *more) + { + if (!is_open) { + return -EINVAL; + } + + *more = false; + + int64_t part_num = meta_info.tail_part_num; + uint64_t ofs = 0; + + if (marker) { + if (!parse_marker(*marker, &part_num, &ofs)) { + ldout(cct, 20) << __func__ << "(): failed to parse marker (" << *marker << ")" << dendl; + return -EINVAL; + } + } + + result->clear(); + result->reserve(max_entries); + + bool part_more{false}; + bool part_full{false}; + + while (max_entries > 0) { + std::vector entries; + int r = 
ClsFIFO::list_part(*ioctx, + meta_info.part_oid(part_num), + ClsFIFO::ListPartParams() + .ofs(ofs) + .max_entries(max_entries), + &entries, + &part_more, + &part_full, + nullptr); + if (r == -ENOENT) { + r = do_read_meta(); + if (r < 0) { + return r; + } + + if (part_num < meta_info.tail_part_num) { + /* raced with trim? restart */ + result->clear(); + part_num = meta_info.tail_part_num; + ofs = 0; + continue; + } + + /* assuming part was not written yet, so end of data */ + + *more = false; + + return 0; + } + if (r < 0) { + ldout(cct, 20) << __func__ << "(): ClsFIFO::list_part() on oid=" << meta_info.part_oid(part_num) << " returned r=" << r << dendl; + return r; + } + + for (auto& entry : entries) { + fifo_entry e; + e.data = std::move(entry.data); + e.marker = craft_marker(part_num, entry.ofs); + e.mtime = entry.mtime; + + result->push_back(e); + } + max_entries -= entries.size(); + + if (max_entries > 0 && + part_more) { + continue; + } + + if (!part_full) { /* head part is not full */ + break; + } + + ++part_num; + ofs = 0; + } + + *more = part_full || part_more; + + return 0; + } + + int FIFO::trim(const string& marker) + { + if (!is_open) { + return -EINVAL; + } + + int64_t part_num; + uint64_t ofs; + + if (!parse_marker(marker, &part_num, &ofs)) { + ldout(cct, 20) << __func__ << "(): failed to parse marker: marker=" << marker << dendl; + return -EINVAL; + } + + for (int64_t pn = meta_info.tail_part_num; pn < part_num; ++pn) { + int r = trim_part(pn, meta_info.data_params.max_part_size, std::nullopt); + if (r < 0 && + r != -ENOENT) { + ldout(cct, 0) << __func__ << "(): ERROR: trim_part() on part=" << pn << " returned r=" << r << dendl; + return r; + } + } + + int r = trim_part(part_num, ofs, std::nullopt); + if (r < 0 && + r != -ENOENT) { + ldout(cct, 0) << __func__ << "(): ERROR: trim_part() on part=" << part_num << " returned r=" << r << dendl; + return r; + } + + if (part_num <= meta_info.tail_part_num) { + /* don't need to modify meta info */ + return 
0; + } + + int i; + + for (i = 0; i < RACE_RETRY; ++i) { + bool canceled; + int r = update_meta(ClsFIFO::MetaUpdateParams() + .tail_part_num(part_num), + &canceled); + if (r < 0) { + return r; + } + + if (canceled) { + if (meta_info.tail_part_num < part_num) { + continue; + } + } + break; + + if (i == RACE_RETRY) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl; + return -ECANCELED; + } + } + + return 0; + } + + int FIFO::get_part_info(int64_t part_num, + fifo_part_info *result) + { + if (!is_open) { + return -EINVAL; + } + + fifo_part_header_t header; + + int r = ClsFIFO::get_part_info(*ioctx, + meta_info.part_oid(part_num), + &header); + if (r < 0) { + return r; + } + + *result = std::move(header); + + return 0; + } + + } // namespace fifo + } // namespace cls +} // namespace rados + diff --git a/src/cls/fifo/cls_fifo_client.h b/src/cls/fifo/cls_fifo_client.h new file mode 100644 index 000000000000..02e5681c47d3 --- /dev/null +++ b/src/cls/fifo/cls_fifo_client.h @@ -0,0 +1,382 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + + +#pragma once + +#include "cls/fifo/cls_fifo_types.h" + +namespace rados { + namespace cls { + namespace fifo { + + class ClsFIFO { + public: + + /* create */ + + struct MetaCreateParams { + struct State { + static constexpr uint64_t default_max_part_size = 4 * 1024 * 1024; + static constexpr uint64_t default_max_entry_size = 32 * 1024; + std::optional objv; + std::optional oid_prefix; + bool exclusive{false}; + uint64_t max_part_size{default_max_part_size}; + uint64_t max_entry_size{default_max_entry_size}; + } state; + + MetaCreateParams& oid_prefix(const std::string& oid_prefix) { + state.oid_prefix = oid_prefix; + return *this; + } + MetaCreateParams& exclusive(bool exclusive) { + state.exclusive = exclusive; + return *this; + } + MetaCreateParams& max_part_size(uint64_t max_part_size) { + state.max_part_size = max_part_size; + return *this; + } + MetaCreateParams& max_entry_size(uint64_t max_entry_size) { + state.max_entry_size = max_entry_size; + return *this; + } + MetaCreateParams& objv(const fifo_objv_t& objv) { + state.objv = objv; + return *this; + } + MetaCreateParams& objv(const std::string& instance, uint64_t ver) { + state.objv = fifo_objv_t{instance, ver}; + return *this; + } + }; + + static int meta_create(librados::ObjectWriteOperation *op, + const string& id, + const MetaCreateParams& params); + + /* get info */ + + struct MetaGetParams { + struct State { + std::optional objv; + } state; + + MetaGetParams& objv(std::optional& v) { + state.objv = v; + return *this; + } + MetaGetParams& objv(const fifo_objv_t& v) { + state.objv = v; + return *this; + } + MetaGetParams& objv(const std::string& instance, uint64_t ver) { + state.objv = fifo_objv_t{instance, ver}; + return *this; + } + }; + static int meta_get(librados::IoCtx& ioctx, + const string& oid, + const MetaGetParams& params, + rados::cls::fifo::fifo_info_t *result, + uint32_t 
*part_header_size, + uint32_t *part_entry_overhead); + + /* update */ + + struct MetaUpdateParams { + struct State { + rados::cls::fifo::fifo_objv_t objv; + + std::optional tail_part_num; + std::optional head_part_num; + std::optional min_push_part_num; + std::optional max_push_part_num; + std::vector journal_entries_add; + std::vector journal_entries_rm; + } state; + + MetaUpdateParams& objv(const fifo_objv_t& objv) { + state.objv = objv; + return *this; + } + MetaUpdateParams& tail_part_num(std::optional tail_part_num) { + state.tail_part_num = tail_part_num; + return *this; + } + MetaUpdateParams& tail_part_num(uint64_t tail_part_num) { + state.tail_part_num = tail_part_num; + return *this; + } + MetaUpdateParams& head_part_num(std::optional head_part_num) { + state.head_part_num = head_part_num; + return *this; + } + MetaUpdateParams& head_part_num(uint64_t head_part_num) { + state.head_part_num = head_part_num; + return *this; + } + MetaUpdateParams& min_push_part_num(uint64_t num) { + state.min_push_part_num = num; + return *this; + } + MetaUpdateParams& max_push_part_num(std::optional num) { + state.max_push_part_num = num; + return *this; + } + MetaUpdateParams& max_push_part_num(uint64_t num) { + state.max_push_part_num = num; + return *this; + } + MetaUpdateParams& journal_entry_add(std::optional entry) { + if (entry) { + state.journal_entries_add.push_back(*entry); + } + return *this; + } + MetaUpdateParams& journal_entry_add(const rados::cls::fifo::fifo_journal_entry_t& entry) { + state.journal_entries_add.push_back(entry); + return *this; + } + MetaUpdateParams& journal_entries_rm(std::vector& entries) { + state.journal_entries_rm = entries; + return *this; + } + + int apply_update(CephContext *cct, + rados::cls::fifo::fifo_info_t *info); + }; + + static int meta_update(librados::ObjectWriteOperation *rados_op, + const MetaUpdateParams& params); + /* init part */ + + struct PartInitParams { + struct State { + string tag; + 
rados::cls::fifo::fifo_data_params_t data_params; + } state; + + PartInitParams& tag(const std::string& tag) { + state.tag = tag; + return *this; + } + PartInitParams& data_params(const rados::cls::fifo::fifo_data_params_t& data_params) { + state.data_params = data_params; + return *this; + } + }; + + static int part_init(librados::ObjectWriteOperation *op, + const PartInitParams& params); + + /* push part */ + + struct PushPartParams { + struct State { + string tag; + std::vector data_bufs; + uint64_t total_len{0}; + } state; + + PushPartParams& tag(const std::string& tag) { + state.tag = tag; + return *this; + } + PushPartParams& data(bufferlist& bl) { + state.total_len += bl.length(); + state.data_bufs.emplace_back(bl); + return *this; + } + PushPartParams& data_bufs(std::vector& dbs) { + for (auto& bl : dbs) { + data(bl); + } + return *this; + } + }; + + static int push_part(librados::ObjectWriteOperation *op, + const PushPartParams& params); + /* trim part */ + + struct TrimPartParams { + struct State { + std::optional tag; + uint64_t ofs; + } state; + + TrimPartParams& tag(std::optional tag) { + state.tag = tag; + return *this; + } + TrimPartParams& ofs(uint64_t ofs) { + state.ofs = ofs; + return *this; + } + }; + + static int trim_part(librados::ObjectWriteOperation *op, + const TrimPartParams& params); + /* list part */ + + struct ListPartParams { + struct State { + std::optional tag; + uint64_t ofs; + int max_entries{100}; + } state; + + ListPartParams& tag(const std::string& tag) { + state.tag = tag; + return *this; + } + ListPartParams& ofs(uint64_t ofs) { + state.ofs = ofs; + return *this; + } + ListPartParams& max_entries(int _max_entries) { + state.max_entries = _max_entries; + return *this; + } + }; + + static int list_part(librados::IoCtx& ioctx, + const string& oid, + const ListPartParams& params, + std::vector *pentries, + bool *more, + bool *full_part = nullptr, + string *ptag = nullptr); + + static int get_part_info(librados::IoCtx& ioctx, + 
const string& oid, + rados::cls::fifo::fifo_part_header_t *header); + }; + + struct fifo_entry { + bufferlist data; + string marker; + ceph::real_time mtime; + }; + + using fifo_part_info = rados::cls::fifo::fifo_part_header_t; + + class FIFO { + CephContext *cct; + string id; + + string meta_oid; + + std::optional _ioctx; + librados::IoCtx *ioctx{nullptr}; + + fifo_info_t meta_info; + + uint32_t part_header_size; + uint32_t part_entry_overhead; + + bool is_open{false}; + + string craft_marker(int64_t part_num, + uint64_t part_ofs); + + bool parse_marker(const string& marker, + int64_t *part_num, + uint64_t *part_ofs); + + int update_meta(ClsFIFO::MetaUpdateParams& update_params, + bool *canceled); + int do_read_meta(std::optional objv = std::nullopt); + + int create_part(int64_t part_num, const string& tag, + int64_t& max_part_num); + int remove_part(int64_t part_num, const string& tag, + int64_t& tail_part_num); + + int process_journal_entry(const fifo_journal_entry_t& entry, + int64_t& tail_part_num, + int64_t& head_part_num, + int64_t& max_part_num); + int process_journal_entries(vector *processed, + int64_t& tail_part_num, + int64_t& head_part_num, + int64_t& max_part_num); + int process_journal(); + + int prepare_new_part(bool is_head); + int prepare_new_head(); + + int push_entries(int64_t part_num, std::vector& data_bufs); + int trim_part(int64_t part_num, + uint64_t ofs, + std::optional tag); + + public: + FIFO(CephContext *_cct, + const string& _id, + librados::IoCtx *_ioctx = nullptr) : cct(_cct), + id(_id), + ioctx(_ioctx) { + meta_oid = id; + } + + int init_ioctx(librados::Rados *rados, + const string& pool, + std::optional pool_ns); + + void set_ioctx(librados::IoCtx *_ioctx) { + ioctx = ioctx; + } + + int open(bool create, + std::optional create_params = std::nullopt); + + int read_meta(std::optional objv = std::nullopt); + + const fifo_info_t& get_meta() const { + return meta_info; + } + + void get_part_layout_info(uint32_t *header_size, uint32_t 
*entry_overhead) { + if (header_size) { + *header_size = part_header_size; + } + + if (entry_overhead) { + *entry_overhead = part_entry_overhead; + } + } + + int push(bufferlist& bl); + int push(vector& bl); + + int list(int max_entries, + std::optional marker, + vector *result, + bool *more); + + int trim(const string& marker); + + int get_part_info(int64_t part_num, + fifo_part_info *result); + }; + } // namespace fifo + } // namespace cls +} // namespace rados diff --git a/src/cls/fifo/cls_fifo_ops.cc b/src/cls/fifo/cls_fifo_ops.cc new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/cls/fifo/cls_fifo_ops.h b/src/cls/fifo/cls_fifo_ops.h new file mode 100644 index 000000000000..2a8ceab3eaaf --- /dev/null +++ b/src/cls/fifo/cls_fifo_ops.h @@ -0,0 +1,282 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * Copyright (C) 2019 SUSE LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#pragma once + +#include "include/types.h" +#include "include/utime.h" +#include "cls/fifo/cls_fifo_types.h" + +struct cls_fifo_meta_create_op +{ + string id; + std::optional objv; + struct { + string name; + string ns; + } pool; + std::optional oid_prefix; + + uint64_t max_part_size{0}; + uint64_t max_entry_size{0}; + + bool exclusive{false}; + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + encode(objv, bl); + encode(pool.name, bl); + encode(pool.ns, bl); + encode(oid_prefix, bl); + encode(max_part_size, bl); + encode(max_entry_size, bl); + encode(exclusive, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(id, bl); + decode(objv, bl); + decode(pool.name, bl); + decode(pool.ns, bl); + decode(oid_prefix, bl); + decode(max_part_size, bl); + decode(max_entry_size, bl); + decode(exclusive, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_fifo_meta_create_op) + +struct cls_fifo_meta_get_op +{ + std::optional objv; + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(objv, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(objv, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_fifo_meta_get_op) + +struct cls_fifo_meta_get_op_reply +{ + rados::cls::fifo::fifo_info_t info; + uint32_t part_header_size{0}; + uint32_t part_entry_overhead{0}; /* per entry extra data that is stored */ + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(info, bl); + encode(part_header_size, bl); + encode(part_entry_overhead, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(info, bl); + decode(part_header_size, bl); + decode(part_entry_overhead, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_fifo_meta_get_op_reply) + +struct cls_fifo_meta_update_op +{ + rados::cls::fifo::fifo_objv_t objv; + + 
std::optional tail_part_num; + std::optional head_part_num; + std::optional min_push_part_num; + std::optional max_push_part_num; + std::vector journal_entries_add; + std::vector journal_entries_rm; + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(objv, bl); + encode(tail_part_num, bl); + encode(head_part_num, bl); + encode(min_push_part_num, bl); + encode(max_push_part_num, bl); + encode(journal_entries_add, bl); + encode(journal_entries_rm, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(objv, bl); + decode(tail_part_num, bl); + decode(head_part_num, bl); + decode(min_push_part_num, bl); + decode(max_push_part_num, bl); + decode(journal_entries_add, bl); + decode(journal_entries_rm, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_fifo_meta_update_op) + +struct cls_fifo_part_init_op +{ + string tag; + rados::cls::fifo::fifo_data_params_t data_params; + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(tag, bl); + encode(data_params, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(tag, bl); + decode(data_params, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_fifo_part_init_op) + +struct cls_fifo_part_push_op +{ + string tag; + std::vector data_bufs; + uint64_t total_len{0}; + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(tag, bl); + encode(data_bufs, bl); + encode(total_len, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(tag, bl); + decode(data_bufs, bl); + decode(total_len, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_fifo_part_push_op) + +struct cls_fifo_part_trim_op +{ + std::optional tag; + uint64_t ofs{0}; + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(tag, bl); + encode(ofs, bl); + ENCODE_FINISH(bl); + } + void 
decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(tag, bl); + decode(ofs, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_fifo_part_trim_op) + +struct cls_fifo_part_list_op +{ + std::optional tag; + uint64_t ofs{0}; + int max_entries{100}; + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(tag, bl); + encode(ofs, bl); + encode(max_entries, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(tag, bl); + decode(ofs, bl); + decode(max_entries, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_fifo_part_list_op) + +struct cls_fifo_part_list_op_reply +{ + string tag; + vector entries; + bool more{false}; + bool full_part{false}; /* whether part is full or still can be written to. + A non full part is by definition head part */ + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(tag, bl); + encode(entries, bl); + encode(more, bl); + encode(full_part, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(tag, bl); + decode(entries, bl); + decode(more, bl); + decode(full_part, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_fifo_part_list_op_reply) + +struct cls_fifo_part_get_info_op +{ + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_fifo_part_get_info_op) + +struct cls_fifo_part_get_info_op_reply +{ + rados::cls::fifo::fifo_part_header_t header; + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(header, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(header, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_fifo_part_get_info_op_reply) diff --git a/src/cls/fifo/cls_fifo_types.cc 
b/src/cls/fifo/cls_fifo_types.cc new file mode 100644 index 000000000000..b182551e3a85 --- /dev/null +++ b/src/cls/fifo/cls_fifo_types.cc @@ -0,0 +1,90 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "cls_fifo_types.h" + +string rados::cls::fifo::fifo_info_t::part_oid(int64_t part_num) +{ + char buf[oid_prefix.size() + 32]; + snprintf(buf, sizeof(buf), "%s.%lld", oid_prefix.c_str(), (long long)part_num); + + return string(buf); +} + +void rados::cls::fifo::fifo_info_t::prepare_next_journal_entry(fifo_journal_entry_t *entry, const string& tag) +{ + entry->op = fifo_journal_entry_t::Op::OP_CREATE; + entry->part_num = max_push_part_num + 1; + entry->part_tag = tag; +} + +int rados::cls::fifo::fifo_info_t::apply_update(std::optional& _tail_part_num, + std::optional& _head_part_num, + std::optional& _min_push_part_num, + std::optional& _max_push_part_num, + std::vector& journal_entries_add, + std::vector& journal_entries_rm, + string *err) +{ + if (_tail_part_num) { + tail_part_num = *_tail_part_num; + } + + if (_min_push_part_num) { + min_push_part_num = *_min_push_part_num; + } + + if (_max_push_part_num) { + max_push_part_num = *_max_push_part_num; + } + + for (auto& entry : journal_entries_add) { + auto iter = journal.find(entry.part_num); + if (iter != journal.end() && + iter->second.op == entry.op) { + /* don't allow multiple concurrent (same) operations on the same part, + racing clients should use objv to avoid races anyway */ + if (err) { + stringstream ss; + ss << "multiple concurrent operations on same part are not allowed, part num=" << entry.part_num; + *err = ss.str(); + } + 
return -EINVAL; + } + + if (entry.op == fifo_journal_entry_t::Op::OP_CREATE) { + tags[entry.part_num] = entry.part_tag; + } + + journal.insert(std::pair(entry.part_num, std::move(entry))); + } + + for (auto& entry : journal_entries_rm) { + journal.erase(entry.part_num); + } + + if (_head_part_num) { + tags.erase(head_part_num); + head_part_num = *_head_part_num; + auto iter = tags.find(head_part_num); + if (iter != tags.end()) { + head_tag = iter->second; + } else { + head_tag.erase(); + } + } + + return 0; +} diff --git a/src/cls/fifo/cls_fifo_types.h b/src/cls/fifo/cls_fifo_types.h new file mode 100644 index 000000000000..bdc1ed773edf --- /dev/null +++ b/src/cls/fifo/cls_fifo_types.h @@ -0,0 +1,274 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#pragma once + + +#include "include/encoding.h" +#include "include/types.h" + + +class JSONObj; + +namespace rados { + namespace cls { + namespace fifo { + struct fifo_objv_t { + string instance; + uint64_t ver{0}; + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(instance, bl); + encode(ver, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(instance, bl); + decode(ver, bl); + DECODE_FINISH(bl); + } + void dump(Formatter *f) const; + void decode_json(JSONObj *obj); + + bool operator==(const fifo_objv_t& rhs) const { + return (instance == rhs.instance && + ver == rhs.ver); + } + + bool empty() const { + return instance.empty(); + } + + string to_str() { + char buf[instance.size() + 32]; + snprintf(buf, sizeof(buf), "%s{%lld}", instance.c_str(), (long long)ver); + return string(buf); + } + }; + WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_objv_t) + + struct fifo_data_params_t { + uint64_t max_part_size{0}; + uint64_t max_entry_size{0}; + uint64_t full_size_threshold{0}; + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(max_part_size, bl); + encode(max_entry_size, bl); + encode(full_size_threshold, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(max_part_size, bl); + decode(max_entry_size, bl); + decode(full_size_threshold, bl); + DECODE_FINISH(bl); + } + void dump(Formatter *f) const; + void decode_json(JSONObj *obj); + + bool operator==(const fifo_data_params_t& rhs) const { + return (max_part_size == rhs.max_part_size && + max_entry_size == rhs.max_entry_size && + full_size_threshold == rhs.full_size_threshold); + } + }; + WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_data_params_t) + + struct fifo_journal_entry_t { + enum Op { + OP_UNKNOWN = 0, + OP_CREATE = 1, + OP_SET_HEAD = 2, + OP_REMOVE = 3, + } op{OP_UNKNOWN}; + + int64_t part_num{0}; + string part_tag; + + void encode(bufferlist 
&bl) const { + ENCODE_START(1, 1, bl); + encode((int)op, bl); + encode(part_num, bl); + encode(part_tag, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + int i; + decode(i, bl); + op = (Op)i; + decode(part_num, bl); + decode(part_tag, bl); + DECODE_FINISH(bl); + } + void dump(Formatter *f) const; + + bool operator==(const fifo_journal_entry_t& e) { + return (op == e.op && + part_num == e.part_num && + part_tag == e.part_tag); + } + }; + WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_journal_entry_t) + + struct fifo_info_t { + string id; + fifo_objv_t objv; + string oid_prefix; + fifo_data_params_t data_params; + + int64_t tail_part_num{0}; + int64_t head_part_num{-1}; + int64_t min_push_part_num{0}; + int64_t max_push_part_num{-1}; + + string head_tag; + map tags; + + std::multimap journal; + + bool need_new_head() { + return (head_part_num < min_push_part_num); + } + + bool need_new_part() { + return (max_push_part_num < min_push_part_num); + } + + string part_oid(int64_t part_num); + void prepare_next_journal_entry(fifo_journal_entry_t *entry, const string& tag); + + int apply_update(std::optional& _tail_part_num, + std::optional& _head_part_num, + std::optional& _min_push_part_num, + std::optional& _max_push_part_num, + std::vector& journal_entries_add, + std::vector& journal_entries_rm, + string *err); + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + encode(objv, bl); + encode(oid_prefix, bl); + encode(data_params, bl); + encode(tail_part_num, bl); + encode(head_part_num, bl); + encode(min_push_part_num, bl); + encode(max_push_part_num, bl); + encode(tags, bl); + encode(head_tag, bl); + encode(journal, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(id, bl); + decode(objv, bl); + decode(oid_prefix, bl); + decode(data_params, bl); + decode(tail_part_num, bl); + decode(head_part_num, bl); + decode(min_push_part_num, 
bl); + decode(max_push_part_num, bl); + decode(tags, bl); + decode(head_tag, bl); + decode(journal, bl); + DECODE_FINISH(bl); + } + void dump(Formatter *f) const; + void decode_json(JSONObj *obj); + }; + WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_info_t) + + struct cls_fifo_part_list_entry_t { + bufferlist data; + uint64_t ofs; + ceph::real_time mtime; + + cls_fifo_part_list_entry_t() {} + cls_fifo_part_list_entry_t(bufferlist&& _data, + uint64_t _ofs, + ceph::real_time _mtime) : data(std::move(_data)), ofs(_ofs), mtime(_mtime) {} + + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(data, bl); + encode(ofs, bl); + encode(mtime, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(data, bl); + decode(ofs, bl); + decode(mtime, bl); + DECODE_FINISH(bl); + } + }; + WRITE_CLASS_ENCODER(rados::cls::fifo::cls_fifo_part_list_entry_t) + + struct fifo_part_header_t { + string tag; + + fifo_data_params_t params; + + uint64_t magic{0}; + + uint64_t min_ofs{0}; + uint64_t max_ofs{0}; + uint64_t min_index{0}; + uint64_t max_index{0}; + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + encode(tag, bl); + encode(params, bl); + encode(magic, bl); + encode(min_ofs, bl); + encode(max_ofs, bl); + encode(min_index, bl); + encode(max_index, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START(1, bl); + decode(tag, bl); + decode(params, bl); + decode(magic, bl); + decode(min_ofs, bl); + decode(max_ofs, bl); + decode(min_index, bl); + decode(max_index, bl); + DECODE_FINISH(bl); + } + }; + WRITE_CLASS_ENCODER(fifo_part_header_t) + + } + } +} + +static inline ostream& operator<<(ostream& os, const rados::cls::fifo::fifo_objv_t& objv) +{ + return os << objv.instance << "{" << objv.ver << "}"; +} + diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 0d82c929f4c5..c261a313e3bb 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt 
@@ -8,6 +8,7 @@ target_include_directories(unit-main PRIVATE $)

 add_subdirectory(cls_hello)
+add_subdirectory(cls_fifo)
 add_subdirectory(cls_lock)
 add_subdirectory(cls_cas)
 add_subdirectory(cls_log)
diff --git a/src/test/cls_fifo/CMakeLists.txt b/src/test/cls_fifo/CMakeLists.txt
new file mode 100644
index 000000000000..49487f8b795b
--- /dev/null
+++ b/src/test/cls_fifo/CMakeLists.txt
@@ -0,0 +1,17 @@
+# Unit-test binary for the cls_fifo object class (links the static
+# client library and the librados test harness).
+add_executable(ceph_test_cls_fifo
+  test_cls_fifo.cc
+  )
+target_link_libraries(ceph_test_cls_fifo
+  cls_fifo_client
+  librados
+  global
+  ${UNITTEST_LIBS}
+  ${BLKID_LIBRARIES}
+  ${CMAKE_DL_LIBS}
+  ${CRYPTO_LIBS}
+  ${EXTRALIBS}
+  radostest-cxx
+  )
+install(TARGETS
+  ceph_test_cls_fifo
+  DESTINATION ${CMAKE_INSTALL_BINDIR})
diff --git a/src/test/cls_fifo/test_cls_fifo.cc b/src/test/cls_fifo/test_cls_fifo.cc
new file mode 100644
index 000000000000..58286f47cd2f
--- /dev/null
+++ b/src/test/cls_fifo/test_cls_fifo.cc
@@ -0,0 +1,704 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ * + */ + +#include +#include + +#include "include/types.h" +#include "include/rados/librados.hpp" + +#include "test/librados/test_cxx.h" +#include "global/global_context.h" + +#include "gtest/gtest.h" + +using namespace librados; + +#include "cls/fifo/cls_fifo_client.h" + + +using namespace rados::cls::fifo; + +static CephContext *cct(librados::IoCtx& ioctx) +{ + return reinterpret_cast(ioctx.cct()); +} + +static int fifo_create(IoCtx& ioctx, + const string& oid, + const string& id, + const ClsFIFO::MetaCreateParams& params) +{ + ObjectWriteOperation op; + + int r = ClsFIFO::meta_create(&op, id, params); + if (r < 0) { + return r; + } + + return ioctx.operate(oid, &op); +} + +TEST(ClsFIFO, TestCreate) { + Rados cluster; + std::string pool_name = get_temp_pool_name(); + ASSERT_EQ("", create_one_pool_pp(pool_name, cluster)); + IoCtx ioctx; + cluster.ioctx_create(pool_name.c_str(), ioctx); + + string fifo_id = "fifo"; + string oid = fifo_id; + + ASSERT_EQ(-EINVAL, fifo_create(ioctx, oid, string(), + ClsFIFO::MetaCreateParams())); + + ASSERT_EQ(-EINVAL, fifo_create(ioctx, oid, fifo_id, + ClsFIFO::MetaCreateParams() + .max_part_size(0))); + + ASSERT_EQ(-EINVAL, fifo_create(ioctx, oid, fifo_id, + ClsFIFO::MetaCreateParams() + .max_entry_size(0))); + + /* first successful create */ + ASSERT_EQ(0, fifo_create(ioctx, oid, fifo_id, + ClsFIFO::MetaCreateParams())); + + uint64_t size; + struct timespec ts; + ASSERT_EQ(0, ioctx.stat2(oid, &size, &ts)); + ASSERT_GT(size, 0); + + /* test idempotency */ + ASSERT_EQ(0, fifo_create(ioctx, oid, fifo_id, + ClsFIFO::MetaCreateParams())); + + uint64_t size2; + struct timespec ts2; + ASSERT_EQ(0, ioctx.stat2(oid, &size2, &ts2)); + ASSERT_EQ(size2, size); + + ASSERT_EQ(-EEXIST, fifo_create(ioctx, oid, fifo_id, + ClsFIFO::MetaCreateParams() + .exclusive(true))); + + ASSERT_EQ(-EEXIST, fifo_create(ioctx, oid, fifo_id, + ClsFIFO::MetaCreateParams() + .oid_prefix("myprefix") + .exclusive(false))); + + ASSERT_EQ(-EEXIST, fifo_create(ioctx, 
oid, "foo", + ClsFIFO::MetaCreateParams() + .exclusive(false))); + + ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); +} + +TEST(ClsFIFO, TestGetInfo) { + Rados cluster; + std::string pool_name = get_temp_pool_name(); + ASSERT_EQ("", create_one_pool_pp(pool_name, cluster)); + IoCtx ioctx; + cluster.ioctx_create(pool_name.c_str(), ioctx); + + string fifo_id = "fifo"; + string oid = fifo_id; + + fifo_info_t info; + + /* first successful create */ + ASSERT_EQ(0, fifo_create(ioctx, oid, fifo_id, + ClsFIFO::MetaCreateParams())); + + uint32_t part_header_size; + uint32_t part_entry_overhead; + + ASSERT_EQ(0, ClsFIFO::meta_get(ioctx, oid, + ClsFIFO::MetaGetParams(), &info, + &part_header_size, &part_entry_overhead)); + + ASSERT_GT(part_header_size, 0); + ASSERT_GT(part_entry_overhead, 0); + + ASSERT_TRUE(!info.objv.instance.empty()); + + ASSERT_EQ(0, ClsFIFO::meta_get(ioctx, oid, + ClsFIFO::MetaGetParams() + .objv(info.objv), + &info, + &part_header_size, &part_entry_overhead)); + + fifo_objv_t objv; + objv.instance="foo"; + objv.ver = 12; + ASSERT_EQ(-ECANCELED, ClsFIFO::meta_get(ioctx, oid, + ClsFIFO::MetaGetParams() + .objv(objv), + &info, + &part_header_size, &part_entry_overhead)); + + ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); +} + +TEST(FIFO, TestOpenDefault) { + Rados cluster; + std::string pool_name = get_temp_pool_name(); + ASSERT_EQ("", create_one_pool_pp(pool_name, cluster)); + IoCtx ioctx; + cluster.ioctx_create(pool_name.c_str(), ioctx); + + string fifo_id = "fifo"; + + FIFO fifo(cct(ioctx), fifo_id, &ioctx); + + /* pre-open ops that should fail */ + ASSERT_EQ(-EINVAL, fifo.read_meta()); + + bufferlist bl; + ASSERT_EQ(-EINVAL, fifo.push(bl)); + + ASSERT_EQ(-EINVAL, fifo.list(100, nullopt, nullptr, nullptr)); + ASSERT_EQ(-EINVAL, fifo.trim(string())); + + ASSERT_EQ(-ENOENT, fifo.open(false)); + + /* first successful create */ + ASSERT_EQ(0, fifo.open(true)); + + ASSERT_EQ(0, fifo.read_meta()); /* force reading from backend */ + + auto info = 
fifo.get_meta(); + + ASSERT_EQ(info.id, fifo_id); + + ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); +} + +TEST(FIFO, TestOpenParams) { + Rados cluster; + std::string pool_name = get_temp_pool_name(); + ASSERT_EQ("", create_one_pool_pp(pool_name, cluster)); + IoCtx ioctx; + cluster.ioctx_create(pool_name.c_str(), ioctx); + + string fifo_id = "fifo"; + + FIFO fifo(cct(ioctx), fifo_id, &ioctx); + + uint64_t max_part_size = 10 * 1024; + uint64_t max_entry_size = 128; + string oid_prefix = "foo.123."; + + fifo_objv_t objv; + objv.instance = "fooz"; + objv.ver = 10; + + + /* first successful create */ + ASSERT_EQ(0, fifo.open(true, + ClsFIFO::MetaCreateParams() + .max_part_size(max_part_size) + .max_entry_size(max_entry_size) + .oid_prefix(oid_prefix) + .objv(objv))); + + ASSERT_EQ(0, fifo.read_meta()); /* force reading from backend */ + + auto info = fifo.get_meta(); + + ASSERT_EQ(info.id, fifo_id); + ASSERT_EQ(info.data_params.max_part_size, max_part_size); + ASSERT_EQ(info.data_params.max_entry_size, max_entry_size); + ASSERT_EQ(info.objv, objv); + + ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); +} + +template +static int decode_entry(fifo_entry& entry, + T *val, + string *marker) +{ + *marker = entry.marker; + auto iter = entry.data.cbegin(); + + try { + decode(*val, iter); + } catch (buffer::error& err) { + return -EIO; + } + + return 0; +} + +TEST(FIFO, TestPushListTrim) { + Rados cluster; + std::string pool_name = get_temp_pool_name(); + ASSERT_EQ("", create_one_pool_pp(pool_name, cluster)); + IoCtx ioctx; + cluster.ioctx_create(pool_name.c_str(), ioctx); + + string fifo_id = "fifo"; + + FIFO fifo(cct(ioctx), fifo_id, &ioctx); + + /* first successful create */ + ASSERT_EQ(0, fifo.open(true)); + + uint32_t max_entries = 10; + + for (uint32_t i = 0; i < max_entries; ++i) { + bufferlist bl; + encode(i, bl); + ASSERT_EQ(0, fifo.push(bl)); + } + + string marker; + + /* get entries one by one */ + + for (uint32_t i = 0; i < max_entries; ++i) { + vector 
result; + bool more; + ASSERT_EQ(0, fifo.list(1, marker, &result, &more)); + + bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + ASSERT_EQ(1, result.size()); + + uint32_t val; + ASSERT_EQ(0, decode_entry(result.front(), &val, &marker)); + + ASSERT_EQ(i, val); + } + + /* get all entries at once */ + vector result; + bool more; + ASSERT_EQ(0, fifo.list(max_entries * 10, string(), &result, &more)); + + ASSERT_FALSE(more); + ASSERT_EQ(max_entries, result.size()); + + string markers[max_entries]; + + + for (uint32_t i = 0; i < max_entries; ++i) { + uint32_t val; + + ASSERT_EQ(0, decode_entry(result[i], &val, &markers[i])); + ASSERT_EQ(i, val); + } + + uint32_t min_entry = 0; + + /* trim one entry */ + fifo.trim(markers[min_entry]); + ++min_entry; + + ASSERT_EQ(0, fifo.list(max_entries * 10, string(), &result, &more)); + + ASSERT_FALSE(more); + ASSERT_EQ(max_entries - min_entry, result.size()); + + for (uint32_t i = min_entry; i < max_entries; ++i) { + uint32_t val; + + ASSERT_EQ(0, decode_entry(result[i - min_entry], &val, &markers[i])); + ASSERT_EQ(i, val); + } + + + ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); +} + +TEST(FIFO, TestPushTooBig) { + Rados cluster; + std::string pool_name = get_temp_pool_name(); + ASSERT_EQ("", create_one_pool_pp(pool_name, cluster)); + IoCtx ioctx; + cluster.ioctx_create(pool_name.c_str(), ioctx); + + string fifo_id = "fifo"; + + FIFO fifo(cct(ioctx), fifo_id, &ioctx); + + uint64_t max_part_size = 2048; + uint64_t max_entry_size = 128; + + char buf[max_entry_size + 1]; + memset(buf, 0, sizeof(buf)); + + /* first successful create */ + ASSERT_EQ(0, fifo.open(true, + ClsFIFO::MetaCreateParams() + .max_part_size(max_part_size) + .max_entry_size(max_entry_size))); + + bufferlist bl; + bl.append(buf, sizeof(buf)); + + ASSERT_EQ(-EINVAL, fifo.push(bl)); + + + ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); +} + +TEST(FIFO, TestMultipleParts) { + Rados cluster; + std::string pool_name = 
get_temp_pool_name(); + ASSERT_EQ("", create_one_pool_pp(pool_name, cluster)); + IoCtx ioctx; + cluster.ioctx_create(pool_name.c_str(), ioctx); + + string fifo_id = "fifo"; + + FIFO fifo(cct(ioctx), fifo_id, &ioctx); + + uint64_t max_part_size = 2048; + uint64_t max_entry_size = 128; + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + /* create */ + ASSERT_EQ(0, fifo.open(true, + ClsFIFO::MetaCreateParams() + .max_part_size(max_part_size) + .max_entry_size(max_entry_size))); + + uint32_t part_header_size; + uint32_t part_entry_overhead; + + fifo.get_part_layout_info(&part_header_size, &part_entry_overhead); + + int entries_per_part = (max_part_size - part_header_size) / (max_entry_size + part_entry_overhead); + + int max_entries = entries_per_part * 4 + 1; + + /* push enough entries */ + for (int i = 0; i < max_entries; ++i) { + bufferlist bl; + + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + + ASSERT_EQ(0, fifo.push(bl)); + } + + auto info = fifo.get_meta(); + + ASSERT_EQ(info.id, fifo_id); + ASSERT_GT(info.head_part_num, 0); /* head should have advanced */ + + + /* list all at once */ + vector result; + bool more; + ASSERT_EQ(0, fifo.list(max_entries, string(), &result, &more)); + ASSERT_EQ(false, more); + + ASSERT_EQ(max_entries, result.size()); + + for (int i = 0; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + + /* list one at a time */ + string marker; + for (int i = 0; i < max_entries; ++i) { + ASSERT_EQ(0, fifo.list(1, marker, &result, &more)); + + ASSERT_EQ(result.size(), 1); + bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + + auto& entry = result[0]; + + auto& bl = entry.data; + marker = entry.marker; + + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + /* trim one at a time */ + marker.clear(); + for (int i = 0; i < max_entries; ++i) { + /* read single entry */ + ASSERT_EQ(0, fifo.list(1, marker, &result, &more)); + + ASSERT_EQ(result.size(), 1); + bool 
expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + + marker = result[0].marker; + + /* trim */ + ASSERT_EQ(0, fifo.trim(marker)); + + /* check tail */ + info = fifo.get_meta(); + ASSERT_EQ(info.tail_part_num, i / entries_per_part); + + /* try to read all again, see how many entries left */ + ASSERT_EQ(0, fifo.list(max_entries, marker, &result, &more)); + ASSERT_EQ(max_entries - i - 1, result.size()); + ASSERT_EQ(false, more); + } + + /* tail now should point at head */ + info = fifo.get_meta(); + ASSERT_EQ(info.head_part_num, info.tail_part_num); + + fifo_part_info part_info; + + /* check old tails are removed */ + for (int i = 0; i < info.tail_part_num; ++i) { + ASSERT_EQ(-ENOENT, fifo.get_part_info(i, &part_info)); + } + + /* check curent tail exists */ + ASSERT_EQ(0, fifo.get_part_info(info.tail_part_num, &part_info)); + + ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); +} + +TEST(FIFO, TestTwoPushers) { + Rados cluster; + std::string pool_name = get_temp_pool_name(); + ASSERT_EQ("", create_one_pool_pp(pool_name, cluster)); + IoCtx ioctx; + cluster.ioctx_create(pool_name.c_str(), ioctx); + + string fifo_id = "fifo"; + + FIFO fifo(cct(ioctx), fifo_id, &ioctx); + + uint64_t max_part_size = 2048; + uint64_t max_entry_size = 128; + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + /* create */ + ASSERT_EQ(0, fifo.open(true, + ClsFIFO::MetaCreateParams() + .max_part_size(max_part_size) + .max_entry_size(max_entry_size))); + + uint32_t part_header_size; + uint32_t part_entry_overhead; + + fifo.get_part_layout_info(&part_header_size, &part_entry_overhead); + + int entries_per_part = (max_part_size - part_header_size) / (max_entry_size + part_entry_overhead); + + int max_entries = entries_per_part * 4 + 1; + + FIFO fifo2(cct(ioctx), fifo_id, &ioctx); + + /* open second one */ + ASSERT_EQ(0, fifo2.open(true, + ClsFIFO::MetaCreateParams())); + + vector fifos(2); + fifos[0] = &fifo; + fifos[1] = &fifo2; + + for (int i = 0; i < 
max_entries; ++i) { + bufferlist bl; + + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + + auto& f = fifos[i % fifos.size()]; + + ASSERT_EQ(0, f->push(bl)); + } + + /* list all by both */ + vector result; + bool more; + ASSERT_EQ(0, fifo2.list(max_entries, string(), &result, &more)); + + ASSERT_EQ(false, more); + + ASSERT_EQ(max_entries, result.size()); + + ASSERT_EQ(0, fifo.list(max_entries, string(), &result, &more)); + ASSERT_EQ(false, more); + + ASSERT_EQ(max_entries, result.size()); + + for (int i = 0; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); +} + +TEST(FIFO, TestTwoPushersTrim) { + Rados cluster; + std::string pool_name = get_temp_pool_name(); + ASSERT_EQ("", create_one_pool_pp(pool_name, cluster)); + IoCtx ioctx; + cluster.ioctx_create(pool_name.c_str(), ioctx); + + string fifo_id = "fifo"; + + FIFO fifo1(cct(ioctx), fifo_id, &ioctx); + + uint64_t max_part_size = 2048; + uint64_t max_entry_size = 128; + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + /* create */ + ASSERT_EQ(0, fifo1.open(true, + ClsFIFO::MetaCreateParams() + .max_part_size(max_part_size) + .max_entry_size(max_entry_size))); + + uint32_t part_header_size; + uint32_t part_entry_overhead; + + fifo1.get_part_layout_info(&part_header_size, &part_entry_overhead); + + int entries_per_part = (max_part_size - part_header_size) / (max_entry_size + part_entry_overhead); + + int max_entries = entries_per_part * 4 + 1; + + FIFO fifo2(cct(ioctx), fifo_id, &ioctx); + + /* open second one */ + ASSERT_EQ(0, fifo2.open(true, + ClsFIFO::MetaCreateParams())); + + /* push one entry to fifo2 and the rest to fifo1 */ + + for (int i = 0; i < max_entries; ++i) { + bufferlist bl; + + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + + FIFO *f = (i < 1 ? 
&fifo2 : &fifo1); + + ASSERT_EQ(0, f->push(bl)); + } + + /* trim half by fifo1 */ + int num = max_entries / 2; + + vector result; + bool more; + ASSERT_EQ(0, fifo1.list(num, string(), &result, &more)); + + ASSERT_EQ(true, more); + ASSERT_EQ(num, result.size()); + + for (int i = 0; i < num; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + auto& entry = result[num - 1]; + auto& marker = entry.marker; + + ASSERT_EQ(0, fifo1.trim(marker)); + + /* list what's left by fifo2 */ + + int left = max_entries - num; + + ASSERT_EQ(0, fifo2.list(left, marker, &result, &more)); + ASSERT_EQ(left, result.size()); + ASSERT_EQ(false, more); + + for (int i = num; i < max_entries; ++i) { + auto& bl = result[i - num].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); +} + +TEST(FIFO, TestPushBatch) { + Rados cluster; + std::string pool_name = get_temp_pool_name(); + ASSERT_EQ("", create_one_pool_pp(pool_name, cluster)); + IoCtx ioctx; + cluster.ioctx_create(pool_name.c_str(), ioctx); + + string fifo_id = "fifo"; + + FIFO fifo(cct(ioctx), fifo_id, &ioctx); + + uint64_t max_part_size = 2048; + uint64_t max_entry_size = 128; + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + /* create */ + ASSERT_EQ(0, fifo.open(true, + ClsFIFO::MetaCreateParams() + .max_part_size(max_part_size) + .max_entry_size(max_entry_size))); + + uint32_t part_header_size; + uint32_t part_entry_overhead; + + fifo.get_part_layout_info(&part_header_size, &part_entry_overhead); + + int entries_per_part = (max_part_size - part_header_size) / (max_entry_size + part_entry_overhead); + + int max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */ + + vector bufs; + + for (int i = 0; i < max_entries; ++i) { + bufferlist bl; + + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + + bufs.push_back(bl); + } + + ASSERT_EQ(0, fifo.push(bufs)); + + /* list all */ + + vector result; + bool more; + 
ASSERT_EQ(0, fifo.list(max_entries, string(), &result, &more)); + + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + for (int i = 0; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + auto& info = fifo.get_meta(); + ASSERT_EQ(info.head_part_num, 4); + + ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); +}