]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cls/fifo: FIFO over RADOS
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 5 Sep 2019 11:15:15 +0000 (04:15 -0700)
committerAdam C. Emerson <aemerson@redhat.com>
Wed, 9 Sep 2020 02:09:40 +0000 (22:09 -0400)
This is an implementation of a FIFO queue over RADOS. Data is appended
to a RADOS object until the object is full; at that point data will be
written to a new object. Data is read from the tail sequentially (and
can be iterated using a marker). Data can be trimmed (up to and
including a given marker).

Queue has a header object (meta), and zero or more data objects (parts).

The software has two layers: the higher-level client operations side,
which deals with the application APIs and manages the meta and part
objects, and the objclass level, which deals with the RADOS layout of
the meta and part objects. There are different objclass methods that
deal with reading and modifying each of these entities.

A single part has a maximum possible size; however, it may become full
once a certain smaller size is reached (full_size_threshold). It is
imperative that once a part has reached its capacity, it will not
allow any more writes into it. For this reason, it is important that
data being written to the queue does not exceed max_entry_size. This
is enforced by the higher-level client API.

Written entries go to the current head object, and when it is full, a
new head part is created. When listing entries, data is iterated from
the tail to the current head. Trim can either advance the pointer
within the current tail object or, if needed, remove tail objects.

A unit test has been created to test the functionality.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
Squashed-by: Adam C. Emerson <aemerson@redhat.com>
src/cls/CMakeLists.txt
src/cls/fifo/cls_fifo.cc [new file with mode: 0644]
src/cls/fifo/cls_fifo_client.cc [new file with mode: 0644]
src/cls/fifo/cls_fifo_client.h [new file with mode: 0644]
src/cls/fifo/cls_fifo_ops.cc [new file with mode: 0644]
src/cls/fifo/cls_fifo_ops.h [new file with mode: 0644]
src/cls/fifo/cls_fifo_types.cc [new file with mode: 0644]
src/cls/fifo/cls_fifo_types.h [new file with mode: 0644]
src/test/CMakeLists.txt
src/test/cls_fifo/CMakeLists.txt [new file with mode: 0644]
src/test/cls_fifo/test_cls_fifo.cc [new file with mode: 0644]

index cb427482e92ff14c67b85e55c6e238cdd5f1f880..a100dd04b68e380de007deed70ea0b2c3f19bc0a 100644 (file)
@@ -271,6 +271,7 @@ set_target_properties(cls_cas PROPERTIES
 install(TARGETS cls_cas DESTINATION ${cls_dir})
 
 
+
 #cls_queue
 set(cls_queue_srcs
   queue/cls_queue.cc
@@ -307,6 +308,7 @@ if (WITH_RADOSGW)
   add_library(cls_rgw_gc_client STATIC ${cls_rgw_gc_client_srcs})
 endif (WITH_RADOSGW)
 
+
 #cls_2pc_queue
 set(cls_2pc_queue_srcs
   2pc_queue/cls_2pc_queue.cc
@@ -319,9 +321,25 @@ set_target_properties(cls_2pc_queue PROPERTIES
   INSTALL_RPATH ""
   CXX_VISIBILITY_PRESET hidden)
 install(TARGETS cls_2pc_queue DESTINATION ${cls_dir})
-
 set(cls_2pc_queue_client_srcs
   2pc_queue/cls_2pc_queue_client.cc)
 add_library(cls_2pc_queue_client STATIC ${cls_2pc_queue_client_srcs})
 
+
 add_subdirectory(cmpomap)
+
+# cls_fifo
+set(cls_fifo_srcs fifo/cls_fifo.cc fifo/cls_fifo_types.cc)
+add_library(cls_fifo SHARED ${cls_fifo_srcs})
+set_target_properties(cls_fifo PROPERTIES
+  VERSION "1.0.0"
+  SOVERSION "1"
+  INSTALL_RPATH ""
+  CXX_VISIBILITY_PRESET hidden)
+install(TARGETS cls_fifo DESTINATION ${cls_dir})
+
+set(cls_fifo_client_srcs
+  fifo/cls_fifo_client.cc
+  fifo/cls_fifo_types.cc
+  fifo/cls_fifo_ops.cc)
+add_library(cls_fifo_client STATIC ${cls_fifo_client_srcs})
diff --git a/src/cls/fifo/cls_fifo.cc b/src/cls/fifo/cls_fifo.cc
new file mode 100644 (file)
index 0000000..553ad30
--- /dev/null
@@ -0,0 +1,907 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/** \file
+ *
+ * This is an OSD class that implements methods for management
+ * and use of fifo
+ *
+ */
+
+#include <errno.h>
+
+#include "objclass/objclass.h"
+
+#include "cls/fifo/cls_fifo_ops.h"
+#include "cls/fifo/cls_fifo_types.h"
+
+
+using namespace rados::cls::fifo;
+
+
+CLS_VER(1,0)
+CLS_NAME(fifo)
+
+
+#define CLS_FIFO_MAX_PART_HEADER_SIZE 512
+
+static uint32_t part_entry_overhead;
+
+/* Fixed-layout preamble written before every entry in a part object.
+ * It is written/read by raw memcpy (not the ceph encoder), packed and
+ * little-endian, so this layout must never change. */
+struct cls_fifo_entry_header_pre {
+  __le64 magic;        // per-part random magic, checked on read to detect misaligned offsets
+  __le64 pre_size;     // size of this preamble itself
+  __le64 header_size;  // size of the encoded cls_fifo_entry_header that follows
+  __le64 data_size;    // size of the entry payload that follows the header
+  __le64 index;        // absolute index of this entry within the part
+  __le32 reserved;
+} __attribute__ ((packed));
+
+/* Per-entry header, versioned via the standard ceph encoder (unlike
+ * the raw preamble above).  Currently carries only the entry's
+ * creation time, set at push. */
+struct cls_fifo_entry_header {
+  ceph::real_time mtime;
+
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(mtime, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator &bl) {
+    DECODE_START(1, bl);
+    decode(mtime, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_fifo_entry_header)
+
+
+/* Return the part-object oid prefix for a queue: the explicitly
+ * requested prefix when one was supplied, otherwise "<id>.<random>"
+ * with a random base64 suffix so a re-created queue cannot collide
+ * with leftover part objects of a prior incarnation. */
+static string new_oid_prefix(string id, std::optional<string>& val)
+{
+  if (val) {
+    return *val;
+  }
+
+#define PREFIX_RND_SIZE 12
+
+  char buf[PREFIX_RND_SIZE + 1];
+  buf[PREFIX_RND_SIZE] = 0;
+
+  cls_gen_rand_base64(buf, sizeof(buf) - 1);
+
+  // std::string concatenation replaces the original variable-length
+  // array + snprintf; VLAs are not standard C++.
+  return id + "." + buf;
+}
+
+/* Serialize and persist the queue's meta (header) object.  If the
+ * header has no object-version instance yet, generate a random one.
+ * The version counter is bumped unless the caller disables it (used
+ * for the initial create, which sets ver itself). */
+static int write_header(cls_method_context_t hctx,
+                        fifo_info_t& header,
+                        bool inc_ver = true)
+{
+  if (header.objv.instance.empty()) {
+#define HEADER_INSTANCE_SIZE 16
+  char buf[HEADER_INSTANCE_SIZE + 1];
+  buf[HEADER_INSTANCE_SIZE] = 0;
+  cls_gen_rand_base64(buf, sizeof(buf) - 1);
+
+    header.objv.instance = buf;
+  }
+  if (inc_ver) {
+    ++header.objv.ver;
+  }
+  bufferlist bl;
+  encode(header, bl);
+  return cls_cxx_write_full(hctx, &bl);
+}
+
+/* Read and decode the part header that occupies the first
+ * CLS_FIFO_MAX_PART_HEADER_SIZE bytes of every part object.
+ * Returns 0 on success, -EIO on decode failure, or the read error. */
+static int read_part_header(cls_method_context_t hctx,
+                            fifo_part_header_t *part_header)
+{
+  bufferlist bl;
+  int r = cls_cxx_read2(hctx, 0, CLS_FIFO_MAX_PART_HEADER_SIZE, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+  if (r < 0) {
+    CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r);
+    return r;
+  }
+
+  auto iter = bl.cbegin();
+  try {
+    decode(*part_header, iter);
+  } catch (buffer::error& err) {
+    CLS_ERR("ERROR: %s(): failed decoding part header", __func__);
+    return -EIO;
+  }
+
+  CLS_LOG(20, "%s():%d read part_header:\n"
+           "\ttag=%s\n"
+           "\tmagic=0x%llx\n"
+           "\tmin_ofs=%lld\n"
+           "\tmax_ofs=%lld\n"
+           "\tmin_index=%lld\n"
+           "\tmax_index=%lld\n",
+           __func__, __LINE__,
+           part_header->tag.c_str(), 
+           (long long)part_header->magic,
+           (long long)part_header->min_ofs,
+           (long long)part_header->max_ofs,
+           (long long)part_header->min_index,
+           (long long)part_header->max_index);
+
+  return 0;
+
+}
+
+/* Encode and write the part header at offset 0.  The encoded form
+ * must fit inside the fixed CLS_FIFO_MAX_PART_HEADER_SIZE region that
+ * entry data is laid out after; refuse with -EIO if it does not. */
+static int write_part_header(cls_method_context_t hctx,
+                             fifo_part_header_t& part_header)
+{
+  bufferlist bl;
+  encode(part_header, bl);
+
+  if (bl.length() > CLS_FIFO_MAX_PART_HEADER_SIZE) {
+    CLS_LOG(10, "%s(): cannot write part header, buffer exceeds max size", __func__);
+    return -EIO;
+  }
+
+  int r = cls_cxx_write2(hctx, 0, bl.length(),
+                     &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+  if (r < 0) {
+    CLS_LOG(10, "%s(): failed to write part header: r=%d",
+            __func__, r);
+    return r;
+  }
+
+  return 0;
+}
+
+/* Read and decode the whole meta object into *info.  When the caller
+ * supplied an expected object version, a mismatch fails with
+ * -ECANCELED so that racing updaters can re-read and retry. */
+static int read_header(cls_method_context_t hctx,
+                       std::optional<fifo_objv_t> objv,
+                       fifo_info_t *info)
+{
+  uint64_t size;
+
+  int r = cls_cxx_stat2(hctx, &size, nullptr);
+  if (r < 0) {
+    CLS_ERR("ERROR: %s(): cls_cxx_stat2() on obj returned %d", __func__, r);
+    return r;
+  }
+
+  bufferlist bl;
+  r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+  if (r < 0) {
+    CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r);
+    return r;
+  }
+
+  try {
+    auto iter = bl.cbegin();
+    decode(*info, iter);
+  } catch (buffer::error& err) {
+    CLS_ERR("ERROR: %s(): failed decoding header", __func__);
+    return -EIO;
+  }
+
+  if (objv &&
+      !(info->objv == *objv)) {
+    string s1 = info->objv.to_str();
+    string s2 = objv->to_str();
+    CLS_LOG(10, "%s(): version mismatch (header=%s, req=%s), cancelled operation", __func__, s1.c_str(), s2.c_str());
+    return -ECANCELED;
+  }
+
+  return 0;
+}
+
+static int fifo_meta_create_op(cls_method_context_t hctx,
+                          bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "%s", __func__);
+
+  cls_fifo_meta_create_op op;
+  try {
+    auto iter = in->cbegin();
+    decode(op, iter);
+  } catch (const buffer::error &err) {
+    CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+    return -EINVAL;
+  }
+
+  uint64_t size;
+
+  int r = cls_cxx_stat2(hctx, &size, nullptr);
+  if (r < 0 && r != -ENOENT) {
+    CLS_ERR("ERROR: %s(): cls_cxx_stat2() on obj returned %d", __func__, r);
+    return r;
+  }
+  if (op.exclusive && r == 0) {
+    CLS_LOG(10, "%s(): exclusive create but queue already exists", __func__);
+    return -EEXIST;
+  }
+
+  if (r == 0) {
+    bufferlist bl;
+    r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+    if (r < 0) {
+      CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r);
+      return r;
+    }
+
+    fifo_info_t header;
+    try {
+      auto iter = bl.cbegin();
+      decode(header, iter);
+    } catch (buffer::error& err) {
+      CLS_ERR("ERROR: %s(): failed decoding header", __func__);
+      return -EIO;
+    }
+
+    if (!(header.id == op.id &&
+          (!op.oid_prefix ||
+           header.oid_prefix == *op.oid_prefix) &&
+          (!op.objv ||
+           header.objv == *op.objv))) {
+      CLS_LOG(10, "%s(): failed to re-create existing queue with different params", __func__);
+      return -EEXIST;
+    }
+
+    return 0; /* already exists */
+  }
+  fifo_info_t header;
+  
+  header.id = op.id;
+  if (op.objv) {
+    header.objv = *op.objv;
+  } else {
+#define DEFAULT_INSTANCE_SIZE 16
+    char buf[DEFAULT_INSTANCE_SIZE + 1];
+    cls_gen_rand_base64(buf, sizeof(buf));
+    buf[DEFAULT_INSTANCE_SIZE] = '\0';
+    header.objv.instance = buf;
+    header.objv.ver = 1;
+  }
+  header.oid_prefix = new_oid_prefix(op.id, op.oid_prefix);
+
+  header.data_params.max_part_size = op.max_part_size;
+  header.data_params.max_entry_size = op.max_entry_size;
+  header.data_params.full_size_threshold = op.max_part_size - op.max_entry_size - part_entry_overhead;
+
+  r = write_header(hctx, header, false);
+  if (r < 0) {
+    CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r);
+    return r;
+  }
+
+  return 0;
+}
+
+/* Apply a set of updates (tail/head part numbers, push-part range,
+ * journal entry additions/removals) to the meta object.  The request
+ * is validated against the caller-supplied object version (see
+ * read_header), and the header version is bumped on write. */
+static int fifo_meta_update_op(cls_method_context_t hctx,
+                                bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "%s", __func__);
+
+  cls_fifo_meta_update_op op;
+  try {
+    auto iter = in->cbegin();
+    decode(op, iter);
+  } catch (const buffer::error &err) {
+    CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+    return -EINVAL;
+  }
+
+  fifo_info_t header;
+
+  int r = read_header(hctx, op.objv, &header);
+  if (r < 0) {
+    return r;
+  }
+
+  string err;
+
+  // apply_update validates and merges the requested changes in-memory
+  r = header.apply_update(op.tail_part_num,
+                          op.head_part_num,
+                          op.min_push_part_num,
+                          op.max_push_part_num,
+                          op.journal_entries_add,
+                          op.journal_entries_rm,
+                          &err);
+  if (r < 0) {
+    CLS_LOG(10, "%s(): %s", __func__, err.c_str());
+    return r;
+  }
+
+  r = write_header(hctx, header);
+  if (r < 0) {
+    CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r);
+    return r;
+  }
+
+  return 0;
+}
+
+/* Return the queue header (optionally version-checked), plus the
+ * fixed part-header size and per-entry overhead that the client needs
+ * in order to size entries and compute part capacity. */
+static int fifo_meta_get_op(cls_method_context_t hctx,
+                          bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "%s", __func__);
+
+  cls_fifo_meta_get_op op;
+  try {
+    auto iter = in->cbegin();
+    decode(op, iter);
+  } catch (const buffer::error &err) {
+    CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+    return -EINVAL;
+  }
+
+  cls_fifo_meta_get_op_reply reply;
+  int r = read_header(hctx, op.objv, &reply.info);
+  if (r < 0) {
+    return r;
+  }
+
+  reply.part_header_size = CLS_FIFO_MAX_PART_HEADER_SIZE;
+  reply.part_entry_overhead = part_entry_overhead;
+
+  encode(reply, *out);
+
+  return 0;
+}
+
+/* Initialize a part object: write its header with the given tag and
+ * data params, a random magic, and both offsets starting just past
+ * the fixed header region.  Re-initializing an existing, non-empty
+ * part succeeds only if tag and params match (replay); otherwise
+ * -EEXIST. */
+static int fifo_part_init_op(cls_method_context_t hctx,
+                             bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "%s", __func__);
+
+  cls_fifo_part_init_op op;
+  try {
+    auto iter = in->cbegin();
+    decode(op, iter);
+  } catch (const buffer::error &err) {
+    CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+    return -EINVAL;
+  }
+
+  uint64_t size;
+
+  int r = cls_cxx_stat2(hctx, &size, nullptr);
+  if (r < 0 && r != -ENOENT) {
+    CLS_ERR("ERROR: %s(): cls_cxx_stat2() on obj returned %d", __func__, r);
+    return r;
+  }
+  if (r == 0 && size > 0) {
+    fifo_part_header_t part_header;
+    r = read_part_header(hctx, &part_header);
+    if (r < 0) {
+      CLS_LOG(10, "%s(): failed to read part header", __func__);
+      return r;
+    }
+
+    if (!(part_header.tag == op.tag &&
+          part_header.params == op.data_params)) {
+      CLS_LOG(10, "%s(): failed to re-create existing part with different params", __func__);
+      return -EEXIST;
+    }
+
+    return 0; /* already exists */
+  }
+
+  fifo_part_header_t part_header;
+  
+  part_header.tag = op.tag;
+  part_header.params = op.data_params;
+
+  // entry data starts right after the fixed header region
+  part_header.min_ofs = CLS_FIFO_MAX_PART_HEADER_SIZE;
+  part_header.max_ofs = part_header.min_ofs;
+
+  // per-part random magic, stamped into every entry preamble
+  cls_gen_random_bytes((char *)&part_header.magic, sizeof(part_header.magic));
+
+  r = write_part_header(hctx, part_header);
+  if (r < 0) {
+    CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r);
+    return r;
+  }
+
+  return 0;
+}
+
+/* A part counts as full once its write offset has passed the
+ * configured full-size threshold; pushes into it are then refused. */
+static bool full_part(const fifo_part_header_t& part_header)
+{
+  const auto& hdr = part_header;
+  return hdr.max_ofs > hdr.params.full_size_threshold;
+}
+
+static int fifo_part_push_op(cls_method_context_t hctx,
+                             bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "%s", __func__);
+
+  cls_fifo_part_push_op op;
+  try {
+    auto iter = in->cbegin();
+    decode(op, iter);
+  } catch (const buffer::error &err) {
+    CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+    return -EINVAL;
+  }
+
+  fifo_part_header_t part_header;
+  int r = read_part_header(hctx, &part_header);
+  if (r < 0) {
+    CLS_LOG(10, "%s(): failed to read part header", __func__);
+    return r;
+  }
+
+  if (!(part_header.tag == op.tag)) {
+    CLS_LOG(10, "%s(): bad tag", __func__);
+    return -EINVAL;
+  }
+
+  uint64_t effective_len = op.total_len + op.data_bufs.size() * part_entry_overhead;
+
+  if (effective_len > part_header.params.max_entry_size + part_entry_overhead) {
+    return -EINVAL;
+  }
+
+  if (full_part(part_header)) {
+    return -ERANGE;
+  }
+
+  struct cls_fifo_entry_header entry_header;
+  entry_header.mtime = real_clock::now();
+
+  bufferlist entry_header_bl;
+  encode(entry_header, entry_header_bl);
+
+  auto max_index = part_header.max_index;
+  auto ofs = part_header.max_ofs;
+
+  cls_fifo_entry_header_pre pre_header;
+  pre_header.magic = part_header.magic;
+  pre_header.pre_size = sizeof(pre_header);
+  pre_header.reserved = 0;
+
+  uint64_t total_data = 0;
+
+  for (auto& data : op.data_bufs) {
+    total_data += data.length();
+
+    pre_header.header_size = entry_header_bl.length();
+    pre_header.data_size = data.length();
+    pre_header.index = max_index;
+
+    bufferptr pre((char *)&pre_header, sizeof(pre_header));
+    bufferlist all_data;
+    all_data.append(pre);
+    all_data.append(entry_header_bl);
+    all_data.claim_append(data);
+
+    auto write_len = all_data.length();
+
+    r = cls_cxx_write2(hctx, ofs, write_len,
+                       &all_data, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+    if (r < 0) {
+      CLS_LOG(10, "%s(): failed to write entry (ofs=%lld len=%lld): r=%d",
+              __func__, (long long)part_header.max_ofs, (long long)write_len, r);
+      return r;
+    }
+
+    ofs += write_len;
+    ++max_index;
+  }
+
+  if (total_data != op.total_len) {
+    CLS_LOG(10, "%s(): length mismatch: op.total_len=%lld total data received=%lld",
+            __func__, (long long)op.total_len, (long long)total_data);
+    return -EINVAL;
+  }
+
+  part_header.max_index = max_index;
+  part_header.max_ofs = ofs;
+
+  r = write_part_header(hctx, part_header);
+  if (r < 0) {
+    CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r);
+    return r;
+  }
+
+  return 0;
+}
+
+/* Sequential reader over a part object's entries, starting at a given
+ * offset (clamped up to the part's min_ofs).  Keeps a prefetched
+ * buffer so entry iteration does not issue one OSD read per entry. */
+class EntryReader {
+  static constexpr uint64_t prefetch_len = (128 * 1024);
+
+  cls_method_context_t hctx;
+
+  fifo_part_header_t& part_header;
+
+  uint64_t ofs;     // absolute object offset of the next unconsumed byte
+  bufferlist data;  // prefetched, not-yet-consumed bytes starting at ofs
+
+  int fetch(uint64_t num_bytes);                  // ensure >= num_bytes buffered
+  int read(uint64_t num_bytes, bufferlist *pbl);  // consume bytes into *pbl
+  int peek(uint64_t num_bytes, char *dest);       // copy bytes without consuming
+  int seek(uint64_t num_bytes);                   // consume and discard bytes
+
+public:
+  EntryReader(cls_method_context_t _hctx,
+              fifo_part_header_t& _part_header,
+              uint64_t _ofs) : hctx(_hctx),
+                               part_header(_part_header),
+                               ofs(_ofs) {
+    if (ofs < part_header.min_ofs) {
+      ofs = part_header.min_ofs;
+    }
+  }
+
+  uint64_t get_ofs() const {
+    return ofs;
+  }
+
+  // true once the reader has consumed up to the part's last entry
+  bool end() const {
+    return (ofs >= part_header.max_ofs);
+  }
+
+  int peek_pre_header(cls_fifo_entry_header_pre *pre_header);
+  int get_next_entry(bufferlist *pbl,
+                     uint64_t *pofs,
+                     ceph::real_time *pmtime);
+};
+
+
+int EntryReader::fetch(uint64_t num_bytes)
+{
+  CLS_LOG(20, "%s(): fetch %d bytes, ofs=%d data.length()=%d", __func__, (int)num_bytes, (int)ofs, (int)data.length());
+  if (data.length() < num_bytes) {
+    bufferlist bl;
+    CLS_LOG(20, "%s(): reading %d bytes at ofs=%d", __func__, (int)prefetch_len, (int)ofs + data.length());
+    int r = cls_cxx_read2(hctx, ofs + data.length(), prefetch_len, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+    if (r < 0) {
+      CLS_ERR("ERROR: %s(): cls_cxx_read2() on obj returned %d", __func__, r);
+      return r;
+    }
+    data.claim_append(bl);
+  }
+
+  if ((unsigned)num_bytes > data.length()) {
+    CLS_LOG(20, "%s(): requested %lld bytes, but only %lld were available", __func__, (long long)num_bytes, (long long)data.length());
+    return -ERANGE;
+  }
+
+  return 0;
+}
+
+/* Consume num_bytes from the buffered stream into *pbl and advance
+ * the reader's offset.  Fails (without consuming) if that many bytes
+ * cannot be fetched. */
+int EntryReader::read(uint64_t num_bytes, bufferlist *pbl)
+{
+  int r = fetch(num_bytes);
+  if (r < 0) {
+    return r;
+  }
+  data.splice(0, num_bytes, pbl);
+
+  ofs += num_bytes;
+
+  return 0;
+}
+
+/* Copy num_bytes into dest without consuming them; the reader's
+ * offset is unchanged.  dest must have room for num_bytes. */
+int EntryReader::peek(uint64_t num_bytes, char *dest)
+{
+  int r = fetch(num_bytes);
+  if (r < 0) {
+    return r;
+  }
+
+  data.begin().copy(num_bytes, dest);
+
+  return 0;
+}
+
+/* Consume and discard num_bytes (implemented as a read into a
+ * throwaway bufferlist). */
+int EntryReader::seek(uint64_t num_bytes)
+{
+  bufferlist bl;
+
+  CLS_LOG(20, "%s():%d: num_bytes=%d", __func__, __LINE__, (int)num_bytes);
+  return read(num_bytes, &bl);
+}
+
+/* Copy the next entry's raw preamble without consuming it, and
+ * validate its magic against the part header to detect a misaligned
+ * read offset.  Returns -ENOENT at end of part, -ERANGE on magic
+ * mismatch. */
+int EntryReader::peek_pre_header(cls_fifo_entry_header_pre *pre_header)
+{
+  if (end()) {
+    return -ENOENT;
+  }
+
+  int r = peek(sizeof(*pre_header), (char *)pre_header);
+  if (r < 0) {
+    /* log the struct size actually peeked; the original logged
+     * sizeof(pre_header), i.e. the size of the pointer */
+    CLS_ERR("ERROR: %s(): peek() size=%d failed: r=%d", __func__, (int)sizeof(*pre_header), r);
+    return r;
+  }
+
+  if (pre_header->magic != part_header.magic) {
+    CLS_ERR("ERROR: %s(): unexpected pre_header magic", __func__);
+    return -ERANGE;
+  }
+
+  return 0;
+}
+
+
+/* Advance past the next entry, optionally returning its payload
+ * (*pbl), starting offset (*pofs), and mtime (*pmtime).  Consumes the
+ * preamble, the encoded entry header, and the payload in sequence;
+ * any nullptr output is skipped over instead of copied. */
+int EntryReader::get_next_entry(bufferlist *pbl,
+                                uint64_t *pofs,
+                                ceph::real_time *pmtime)
+{
+  cls_fifo_entry_header_pre pre_header;
+  int r = peek_pre_header(&pre_header);
+  if (r < 0) {
+    CLS_ERR("ERROR: %s(): peek_pre_header() failed: r=%d", __func__, r);
+    return r;
+  }
+
+  // report the entry's offset before any of it is consumed
+  if (pofs) {
+    *pofs = ofs;
+  }
+
+  CLS_LOG(20, "%s():%d: pre_header.pre_size=%d", __func__, __LINE__, (int)pre_header.pre_size);
+  r = seek(pre_header.pre_size);
+  if (r < 0) {
+    CLS_ERR("ERROR: %s(): failed to seek: r=%d", __func__, r);
+    return r;
+  }
+
+  bufferlist header;
+  CLS_LOG(20, "%s():%d: pre_header.header_size=%d", __func__, __LINE__, (int)pre_header.header_size);
+  r = read(pre_header.header_size, &header);
+  if (r < 0) {
+    CLS_ERR("ERROR: %s(): failed to read entry header: r=%d", __func__, r);
+    return r;
+  }
+
+  cls_fifo_entry_header entry_header;
+  auto iter = header.cbegin();
+  try {
+    decode(entry_header, iter);
+  } catch (buffer::error& err) {
+    CLS_ERR("%s(): failed decoding entry header", __func__);
+    return -EIO;
+  }
+
+  if (pmtime) {
+    *pmtime = entry_header.mtime;
+  }
+
+  if (pbl) {
+    r = read(pre_header.data_size, pbl);
+    if (r < 0) {
+      CLS_ERR("%s(): failed reading data: r=%d", __func__, r);
+      return r;
+    }
+  } else {
+    // caller doesn't want the payload: skip it
+    r = seek(pre_header.data_size);
+    if (r < 0) {
+      CLS_ERR("ERROR: %s(): failed to seek: r=%d", __func__, r);
+      return r;
+    }
+  }
+
+  return 0;
+}
+
+/* Trim a part up to and including the entry at op.ofs.
+ *
+ * Offsets already below min_ofs are a no-op.  Trimming at or past
+ * max_ofs either removes the whole object (when the part is full and
+ * therefore will never be written again) or just collapses min to
+ * max.  Otherwise, min_ofs/min_index advance past the entry at
+ * op.ofs. */
+static int fifo_part_trim_op(cls_method_context_t hctx,
+                             bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "%s", __func__);
+
+  cls_fifo_part_trim_op op;
+  try {
+    auto iter = in->cbegin();
+    decode(op, iter);
+  } catch (const buffer::error &err) {
+    CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+    return -EINVAL;
+  }
+
+  fifo_part_header_t part_header;
+  int r = read_part_header(hctx, &part_header);
+  if (r < 0) {
+    CLS_LOG(10, "%s(): failed to read part header", __func__);
+    return r;
+  }
+
+  if (op.tag &&
+      !(part_header.tag == *op.tag)) {
+    CLS_LOG(10, "%s(): bad tag", __func__);
+    return -EINVAL;
+  }
+
+  // already trimmed past this point
+  if (op.ofs < part_header.min_ofs) {
+    return 0;
+  }
+
+  if (op.ofs >= part_header.max_ofs) {
+    if (full_part(part_header)) {
+      /*
+       * trim full part completely: remove object
+       */
+
+      r = cls_cxx_remove(hctx);
+      if (r < 0) {
+        CLS_LOG(0, "%s(): ERROR: cls_cxx_remove() returned r=%d", __func__, r);
+        return r;
+      }
+
+      return 0;
+    }
+    
+    part_header.min_ofs = part_header.max_ofs;
+    part_header.min_index = part_header.max_index;
+  } else {
+    /* trim is inclusive: advance past the entry located at op.ofs */
+    EntryReader reader(hctx, part_header, op.ofs);
+
+    cls_fifo_entry_header_pre pre_header;
+    int r = reader.peek_pre_header(&pre_header);
+    if (r < 0) {
+      return r;
+    }
+
+    r = reader.get_next_entry(nullptr, nullptr, nullptr);
+    if (r < 0) {
+      CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d", __func__, r);
+      return r;
+    }
+
+    part_header.min_ofs = reader.get_ofs();
+    part_header.min_index = pre_header.index + 1;
+  }
+
+  r = write_part_header(hctx, part_header);
+  if (r < 0) {
+    CLS_LOG(10, "%s(): failed to write header: r=%d", __func__, r);
+    return r;
+  }
+
+  return 0;
+}
+
+/* List up to op.max_entries entries (capped at LIST_MAX_ENTRIES)
+ * starting from marker offset op.ofs, returning payload, offset and
+ * mtime per entry, plus whether more remain and whether the part is
+ * full (so the client knows to move on to the next part). */
+static int fifo_part_list_op(cls_method_context_t hctx,
+                             bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "%s", __func__);
+
+  cls_fifo_part_list_op op;
+  try {
+    auto iter = in->cbegin();
+    decode(op, iter);
+  } catch (const buffer::error &err) {
+    CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+    return -EINVAL;
+  }
+
+  fifo_part_header_t part_header;
+  int r = read_part_header(hctx, &part_header);
+  if (r < 0) {
+    CLS_LOG(10, "%s(): failed to read part header", __func__);
+    return r;
+  }
+
+  if (op.tag &&
+      !(part_header.tag == *op.tag)) {
+    CLS_LOG(10, "%s(): bad tag", __func__);
+    return -EINVAL;
+  }
+
+  EntryReader reader(hctx, part_header, op.ofs);
+
+  /* NOTE(review): when the marker points at a still-live entry, that
+   * entry is skipped — the marker appears to be inclusive of the last
+   * entry already seen; confirm against the client iteration code */
+  if (op.ofs >= part_header.min_ofs &&
+      !reader.end()) {
+    r = reader.get_next_entry(nullptr, nullptr, nullptr);
+    if (r < 0) {
+      CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d", __func__, r);
+      return r;
+    }
+  }
+
+  cls_fifo_part_list_op_reply reply;
+
+  reply.tag = part_header.tag;
+
+#define LIST_MAX_ENTRIES 512
+
+  auto max_entries = std::min(op.max_entries, (int)LIST_MAX_ENTRIES);
+
+  for (int i = 0; i < max_entries && !reader.end(); ++i) {
+    bufferlist data;
+    ceph::real_time mtime;
+    uint64_t ofs;
+
+    r = reader.get_next_entry(&data, &ofs, &mtime);
+    if (r < 0) {
+      CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d", __func__, r);
+      return r;
+    }
+
+    reply.entries.emplace_back(std::move(data), ofs, mtime);
+  }
+
+  reply.more = !reader.end();
+  reply.full_part = full_part(part_header);
+
+  encode(reply, *out);
+
+  return 0;
+}
+
+/* Return the part object's header (offsets, indexes, tag, params)
+ * to the client. */
+static int fifo_part_get_info_op(cls_method_context_t hctx,
+                                 bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "%s", __func__);
+
+  cls_fifo_part_get_info_op op;
+  try {
+    auto iter = in->cbegin();
+    decode(op, iter);
+  } catch (const buffer::error &err) {
+    CLS_ERR("ERROR: %s(): failed to decode request", __func__);
+    return -EINVAL;
+  }
+
+  cls_fifo_part_get_info_op_reply reply;
+
+  int r = read_part_header(hctx, &reply.header);
+  if (r < 0) {
+    CLS_LOG(10, "%s(): failed to read part header", __func__);
+    return r;
+  }
+
+  encode(reply, *out);
+
+  return 0;
+}
+
+/* Class entry point: register the "fifo" class and its methods, and
+ * precompute the fixed per-entry overhead (raw preamble + encoded
+ * empty entry header) reported to clients via fifo_meta_get. */
+CLS_INIT(fifo)
+{
+  CLS_LOG(20, "Loaded fifo class!");
+
+  cls_handle_t h_class;
+  cls_method_handle_t h_fifo_meta_create_op;
+  cls_method_handle_t h_fifo_meta_get_op;
+  cls_method_handle_t h_fifo_meta_update_op;
+  cls_method_handle_t h_fifo_part_init_op;
+  cls_method_handle_t h_fifo_part_push_op;
+  cls_method_handle_t h_fifo_part_trim_op;
+  cls_method_handle_t h_fifo_part_list_op;
+  cls_method_handle_t h_fifo_part_get_info_op;
+
+  cls_register("fifo", &h_class);
+  cls_register_cxx_method(h_class, "fifo_meta_create",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          fifo_meta_create_op, &h_fifo_meta_create_op);
+
+  cls_register_cxx_method(h_class, "fifo_meta_get",
+                          CLS_METHOD_RD,
+                          fifo_meta_get_op, &h_fifo_meta_get_op);
+
+  cls_register_cxx_method(h_class, "fifo_meta_update",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          fifo_meta_update_op, &h_fifo_meta_update_op);
+
+  cls_register_cxx_method(h_class, "fifo_part_init",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          fifo_part_init_op, &h_fifo_part_init_op);
+
+  cls_register_cxx_method(h_class, "fifo_part_push",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          fifo_part_push_op, &h_fifo_part_push_op);
+
+  cls_register_cxx_method(h_class, "fifo_part_trim",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          fifo_part_trim_op, &h_fifo_part_trim_op);
+
+  cls_register_cxx_method(h_class, "fifo_part_list",
+                          CLS_METHOD_RD,
+                          fifo_part_list_op, &h_fifo_part_list_op);
+
+  cls_register_cxx_method(h_class, "fifo_part_get_info",
+                          CLS_METHOD_RD,
+                          fifo_part_get_info_op, &h_fifo_part_get_info_op);
+
+  /* calculate entry overhead */
+  struct cls_fifo_entry_header entry_header;
+  bufferlist entry_header_bl;
+  encode(entry_header, entry_header_bl);
+
+  part_entry_overhead = sizeof(cls_fifo_entry_header_pre) + entry_header_bl.length();
+
+  return;
+}
diff --git a/src/cls/fifo/cls_fifo_client.cc b/src/cls/fifo/cls_fifo_client.cc
new file mode 100644 (file)
index 0000000..f4888fd
--- /dev/null
@@ -0,0 +1,1070 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "include/rados/librados.hpp"
+#include "common/dout.h"
+
+#include "auth/Crypto.h"
+
+using namespace librados;
+
+#include "cls/fifo/cls_fifo_ops.h"
+#include "cls/fifo/cls_fifo_client.h"
+
+
+#define dout_subsys ceph_subsys_objclass
+
+
+namespace rados {
+  namespace cls {
+    namespace fifo {
+      /*
+       * Queue a 'fifo_meta_create' call on *rados_op to create the FIFO's
+       * meta object.  Validates client-side that id is non-empty and that
+       * 0 < max_entry_size <= max_part_size before encoding the op.
+       * Returns 0 on success, -EINVAL on invalid parameters; the rados
+       * operation itself is executed later by the caller.
+       */
+      int ClsFIFO::meta_create(librados::ObjectWriteOperation *rados_op,
+                               const string& id,
+                               const MetaCreateParams& params) {
+        cls_fifo_meta_create_op op;
+
+        auto& state = params.state;
+
+        if (id.empty()) {
+          return -EINVAL;
+        }
+
+        op.id = id;
+        op.objv = state.objv;
+        op.oid_prefix = state.oid_prefix;
+        op.max_part_size = state.max_part_size;
+        op.max_entry_size = state.max_entry_size;
+        op.exclusive = state.exclusive;
+
+        /* a single entry must always fit inside a single part */
+        if (op.max_part_size == 0 ||
+            op.max_entry_size == 0 ||
+            op.max_entry_size > op.max_part_size) {
+          return -EINVAL;
+        }
+
+        bufferlist in;
+        encode(op, in);
+        rados_op->exec("fifo", "fifo_meta_create", in);
+
+        return 0;
+      }
+
+      /*
+       * Synchronously read the FIFO meta object 'oid' and decode its info.
+       * Optionally returns the objclass-reported per-part header size and
+       * per-entry overhead (used by the client to size batches).
+       * Returns 0 on success, negative error from rados/objclass, or -EIO
+       * if the reply fails to decode.
+       */
+      int ClsFIFO::meta_get(librados::IoCtx& ioctx,
+                            const string& oid,
+                            const MetaGetParams& params,
+                            fifo_info_t *result,
+                            uint32_t *part_header_size,
+                            uint32_t *part_entry_overhead) {
+        cls_fifo_meta_get_op op;
+
+        auto& state = params.state;
+
+        op.objv = state.objv;
+
+        librados::ObjectReadOperation rop;
+
+        bufferlist in;
+        bufferlist out;
+        int op_ret;
+        encode(op, in);
+        /* out and op_ret are filled in during ioctx.operate() below */
+        rop.exec("fifo", "fifo_meta_get", in, &out, &op_ret);
+
+        int r = ioctx.operate(oid, &rop, nullptr);
+        if (r < 0) {
+          return r;
+        }
+
+        if (op_ret < 0) {
+          return op_ret;
+        }
+
+        cls_fifo_meta_get_op_reply reply;
+        auto iter = out.cbegin();
+        try {
+          decode(reply, iter);
+        } catch (buffer::error& err) {
+          return -EIO;
+        }
+
+        *result = reply.info;
+
+        if (part_header_size) {
+          *part_header_size = reply.part_header_size;
+        }
+
+        if (part_entry_overhead) {
+          *part_entry_overhead = reply.part_entry_overhead;
+        }
+
+        return 0;
+      }
+
+      /*
+       * Queue a 'fifo_meta_update' call on *rados_op.  An object version
+       * (objv) is mandatory: the objclass uses it to detect racing
+       * updaters (returning -ECANCELED on mismatch).
+       * Returns 0 on success, -EINVAL if objv is missing.
+       */
+      int ClsFIFO::meta_update(librados::ObjectWriteOperation *rados_op,
+                               const MetaUpdateParams& params) {
+        cls_fifo_meta_update_op op;
+
+        auto& state = params.state;
+
+        if (state.objv.empty()) {
+          return -EINVAL;
+        }
+
+        op.objv = state.objv;
+        op.tail_part_num = state.tail_part_num;
+        op.head_part_num = state.head_part_num;
+        op.min_push_part_num = state.min_push_part_num;
+        op.max_push_part_num = state.max_push_part_num;
+        op.journal_entries_add = state.journal_entries_add;
+        op.journal_entries_rm = state.journal_entries_rm;
+
+        bufferlist in;
+        encode(op, in);
+        rados_op->exec("fifo", "fifo_meta_update", in);
+
+        return 0;
+      }
+
+      /*
+       * Queue a 'fifo_part_init' call on *rados_op to initialize a new
+       * part object with the given tag and data parameters.
+       * Returns 0 on success, -EINVAL if the tag is empty.
+       */
+      int ClsFIFO::part_init(librados::ObjectWriteOperation *rados_op,
+                             const PartInitParams& params) {
+        cls_fifo_part_init_op op;
+
+        auto& state = params.state;
+
+        if (state.tag.empty()) {
+          return -EINVAL;
+        }
+
+        op.tag = state.tag;
+        op.data_params = state.data_params;
+
+        bufferlist in;
+        encode(op, in);
+        rados_op->exec("fifo", "fifo_part_init", in);
+
+        return 0;
+      }
+
+      /*
+       * Queue a 'fifo_part_push' call on *rados_op to append one or more
+       * entries to a part.  The tag must match the part's tag on the OSD
+       * side.  Returns 0 on success, -EINVAL if the tag is empty.
+       */
+      int ClsFIFO::push_part(librados::ObjectWriteOperation *rados_op,
+                             const PushPartParams& params) {
+        cls_fifo_part_push_op op;
+
+        auto& state = params.state;
+
+        if (state.tag.empty()) {
+          return -EINVAL;
+        }
+
+        op.tag = state.tag;
+        op.data_bufs = state.data_bufs;
+        op.total_len = state.total_len;
+
+        bufferlist in;
+        encode(op, in);
+        rados_op->exec("fifo", "fifo_part_push", in);
+
+        return 0;
+      }
+
+      /*
+       * Queue a 'fifo_part_trim' call on *rados_op, trimming a part up to
+       * the given offset.  The tag is optional here (unlike push).
+       */
+      int ClsFIFO::trim_part(librados::ObjectWriteOperation *rados_op,
+                             const TrimPartParams& params) {
+        cls_fifo_part_trim_op op;
+
+        auto& state = params.state;
+
+        op.tag = state.tag;
+        op.ofs = state.ofs;
+
+        bufferlist in;
+        encode(op, in);
+        rados_op->exec("fifo", "fifo_part_trim", in);
+
+        return 0;
+      }
+
+      /*
+       * Synchronously list entries of a single part object starting at
+       * the given offset.  Outputs (all optional): the entries, whether
+       * more entries remain in this part, whether the part is full (i.e.
+       * no more writes will land in it), and the part's tag.
+       * Returns 0 on success, negative rados/objclass error, or -EIO on
+       * a malformed reply.
+       */
+      int ClsFIFO::list_part(librados::IoCtx& ioctx,
+                             const string& oid,
+                             const ListPartParams& params,
+                             std::vector<cls_fifo_part_list_entry_t> *pentries,
+                             bool *more,
+                             bool *full_part,
+                             string *ptag)
+      {
+        cls_fifo_part_list_op op;
+
+        auto& state = params.state;
+
+        op.tag = state.tag;
+        op.ofs = state.ofs;
+        op.max_entries = state.max_entries;
+
+        librados::ObjectReadOperation rop;
+
+        bufferlist in;
+        bufferlist out;
+        int op_ret;
+        encode(op, in);
+        /* out and op_ret are filled in during ioctx.operate() below */
+        rop.exec("fifo", "fifo_part_list", in, &out, &op_ret);
+
+        int r = ioctx.operate(oid, &rop, nullptr);
+        if (r < 0) {
+          return r;
+        }
+
+        if (op_ret < 0) {
+          return op_ret;
+        }
+
+        cls_fifo_part_list_op_reply reply;
+        auto iter = out.cbegin();
+        try {
+          decode(reply, iter);
+        } catch (buffer::error& err) {
+          return -EIO;
+        }
+
+        if (pentries) {
+          *pentries = std::move(reply.entries);
+        }
+
+        if (more) {
+          *more = reply.more;
+        }
+
+        if (full_part) {
+          *full_part = reply.full_part;
+        }
+
+        if (ptag) {
+          *ptag = reply.tag;
+        }
+
+        return 0;
+      }
+
+      /*
+       * Synchronously fetch a part object's header (tag, offsets, sizes).
+       * Returns 0 on success, negative rados/objclass error, or -EIO on
+       * a malformed reply.
+       */
+      int ClsFIFO::get_part_info(librados::IoCtx& ioctx,
+                                 const string& oid,
+                                 rados::cls::fifo::fifo_part_header_t *header)
+      {
+        cls_fifo_part_get_info_op op;
+
+        librados::ObjectReadOperation rop;
+
+        bufferlist in;
+        bufferlist out;
+        int op_ret;
+        encode(op, in);
+        /* out and op_ret are filled in during ioctx.operate() below */
+        rop.exec("fifo", "fifo_part_get_info", in, &out, &op_ret);
+
+        int r = ioctx.operate(oid, &rop, nullptr);
+        if (r < 0) {
+          return r;
+        }
+
+        if (op_ret < 0) {
+          return op_ret;
+        }
+
+        cls_fifo_part_get_info_op_reply reply;
+        auto iter = out.cbegin();
+        try {
+          decode(reply, iter);
+        } catch (buffer::error& err) {
+          return -EIO;
+        }
+
+        if (header) {
+          *header = std::move(reply.header);
+        }
+
+        return 0;
+      }
+
+      /*
+       * Build a listing/trim marker string of the form
+       * "<part_num>:<part_ofs>"; the inverse of parse_marker().
+       */
+      string FIFO::craft_marker(int64_t part_num,
+                                   uint64_t part_ofs)
+      {
+        char buf[64];
+        snprintf(buf, sizeof(buf), "%lld:%lld", (long long)part_num, (long long)part_ofs);
+        return string(buf);
+      }
+
+      /*
+       * Parse a "<part_num>:<part_ofs>" marker.  An empty marker means
+       * "start of queue": the current tail part at offset 0.
+       * Returns false if the marker is malformed.
+       */
+      bool FIFO::parse_marker(const string& marker,
+                              int64_t *part_num,
+                              uint64_t *part_ofs)
+      {
+        if (marker.empty()) {
+          *part_num = meta_info.tail_part_num;
+          *part_ofs = 0;
+          return true;
+        }
+
+        auto pos = marker.find(':');
+        if (pos == string::npos) {
+          return false;
+        }
+
+        auto first = marker.substr(0, pos);
+        auto second = marker.substr(pos + 1);
+
+        string err;
+
+        *part_num = (int64_t)strict_strtoll(first.c_str(), 10, &err);
+        if (!err.empty()) {
+          return false;
+        }
+
+        *part_ofs = (uint64_t)strict_strtoll(second.c_str(), 10, &err);
+        if (!err.empty()) {
+          return false;
+        }
+
+        return true;
+      }
+
+      /*
+       * Create and own an IoCtx for the given pool (and optional
+       * namespace), then point the working 'ioctx' at it.
+       * Returns a negative error from ioctx_create() on failure.
+       */
+      int FIFO::init_ioctx(librados::Rados *rados,
+                           const string& pool,
+                           std::optional<string> pool_ns)
+      {
+        _ioctx.emplace();
+        int r = rados->ioctx_create(pool.c_str(), *_ioctx);
+        if (r < 0) {
+          return r;
+        }
+
+        if (pool_ns && !pool_ns->empty()) {
+          _ioctx->set_namespace(*pool_ns);
+        }
+
+        ioctx = &(*_ioctx);
+
+        return 0;
+      }
+
+      /*
+       * Apply this update locally to the cached meta info, mirroring what
+       * the objclass did on the OSD, and bump the object version so the
+       * cache matches the on-disk objv after a successful update.
+       */
+      int ClsFIFO::MetaUpdateParams::apply_update(CephContext *cct,
+                                                  fifo_info_t *info)
+      {
+        string err;
+
+        int r = info->apply_update(state.tail_part_num,
+                                   state.head_part_num,
+                                   state.min_push_part_num,
+                                   state.max_push_part_num,
+                                   state.journal_entries_add,
+                                   state.journal_entries_rm,
+                                   &err);
+        if (r < 0) {
+          ldout(cct, 0) << __func__ << "(): ERROR: " << err << dendl;
+          return r;
+        }
+
+        ++info->objv.ver;
+
+        return 0;
+      }
+
+      /*
+       * Send a meta update conditioned on the cached objv.  On success the
+       * cached meta_info is updated locally; if the OSD reports -ECANCELED
+       * (version mismatch, i.e. a racing updater), *canceled is set and the
+       * meta is re-read so the caller can retry with fresh state.
+       */
+      int FIFO::update_meta(ClsFIFO::MetaUpdateParams& update_params,
+                            bool *canceled)
+      {
+        update_params.objv(meta_info.objv);
+
+        librados::ObjectWriteOperation wop;
+        int r = ClsFIFO::meta_update(&wop, update_params);
+        if (r < 0) {
+          return r;
+        }
+
+        r = ioctx->operate(meta_oid, &wop);
+        if (r < 0 && r != -ECANCELED) {
+          return r;
+        }
+
+        *canceled = (r == -ECANCELED);
+
+        if (!*canceled) {
+          r = update_params.apply_update(cct, &meta_info);
+          if (r < 0) { /* should really not happen,
+                          but if it does, let's treat it as if race was detected */
+            *canceled = true;
+          }
+        }
+
+        if (*canceled) {
+          r = do_read_meta();
+        }
+        if (r < 0) {
+          return r;
+        }
+
+        return 0;
+      }
+
+      /*
+       * Refresh the cached meta_info (and part header size / per-entry
+       * overhead) from the meta object, optionally pinned to a specific
+       * object version.
+       */
+      int FIFO::do_read_meta(std::optional<fifo_objv_t> objv)
+      {
+        ClsFIFO::MetaGetParams get_params;
+        if (objv) {
+          get_params.objv(*objv);
+        }
+        int r = ClsFIFO::meta_get(*ioctx,
+                                  meta_oid,
+                                  get_params,
+                                  &meta_info,
+                                  &part_header_size,
+                                  &part_entry_overhead);
+        if (r < 0) {
+          return r;
+        }
+
+        return 0;
+      }
+
+      /*
+       * Create and initialize a new part object (exclusive create),
+       * tracking the highest part number created so far in max_part_num.
+       * NOTE(review): the exclusive create means a replayed journal entry
+       * for an already-created part surfaces -EEXIST as an error here —
+       * confirm that is the intended journal-replay behavior.
+       */
+      int FIFO::create_part(int64_t part_num, const string& tag,
+                            int64_t& max_part_num) {
+        librados::ObjectWriteOperation op;
+
+        op.create(true); /* exclusive */
+        int r = ClsFIFO::part_init(&op,
+                                   ClsFIFO::PartInitParams()
+                                   .tag(tag)
+                                   .data_params(meta_info.data_params));
+        if (r < 0) {
+          return r;
+        }
+
+        r = ioctx->operate(meta_info.part_oid(part_num), &op);
+        if (r < 0) {
+          return r;
+        }
+
+        if (part_num > max_part_num) {
+          max_part_num = part_num;
+        }
+
+        return 0;
+      }
+
+      /*
+       * Remove a part object and advance the tracked tail past it.
+       * -ENOENT is tolerated (part already gone, e.g. replayed journal
+       * entry or a racing trimmer).
+       */
+      int FIFO::remove_part(int64_t part_num, const string& tag,
+                            int64_t& tail_part_num) {
+        librados::ObjectWriteOperation op;
+        op.remove();
+        int r = ioctx->operate(meta_info.part_oid(part_num), &op);
+        if (r == -ENOENT) {
+          r = 0;
+        }
+        if (r < 0) {
+          return r;
+        }
+
+        /* tail points at the first live part, so it moves past this one */
+        if (part_num >= tail_part_num) {
+          tail_part_num = part_num + 1;
+        }
+
+        return 0;
+      }
+
+      /*
+       * Execute a single journal entry, updating the caller's view of
+       * tail/head/max part numbers as a side effect.  An unrecognized op
+       * falls through to -EIO.
+       */
+      int FIFO::process_journal_entry(const fifo_journal_entry_t& entry,
+                                      int64_t& tail_part_num,
+                                      int64_t& head_part_num,
+                                      int64_t& max_part_num)
+      {
+
+        switch (entry.op) {
+          case fifo_journal_entry_t::Op::OP_CREATE:
+            return create_part(entry.part_num, entry.part_tag, max_part_num);
+          case fifo_journal_entry_t::Op::OP_SET_HEAD:
+            if (entry.part_num > head_part_num) {
+              head_part_num = entry.part_num;
+            }
+            return 0;
+          case fifo_journal_entry_t::Op::OP_REMOVE:
+            return remove_part(entry.part_num, entry.part_tag, tail_part_num);
+        default:
+          /* nothing to do */
+          break;
+        }
+
+        return -EIO;
+      }
+
+      /*
+       * Run every entry in the cached journal.  Failed entries are only
+       * logged (they stay in the journal for a later retry); successful
+       * ones are collected in *processed for removal from the meta.
+       * Always returns 0.
+       */
+      int FIFO::process_journal_entries(vector<fifo_journal_entry_t> *processed,
+                                        int64_t& tail_part_num,
+                                        int64_t& head_part_num,
+                                        int64_t& max_part_num)
+      {
+        for (auto& iter : meta_info.journal) {
+          auto& entry = iter.second;
+          int r = process_journal_entry(entry, tail_part_num, head_part_num, max_part_num);
+          if (r < 0) {
+            ldout(cct, 10) << __func__ << "(): ERROR: failed processing journal entry for part=" << entry.part_num << dendl;
+          } else {
+            processed->push_back(entry);
+          }
+        }
+
+        return 0;
+      }
+
+      /*
+       * Apply all pending journal entries to the part objects, then update
+       * the meta object to remove the processed entries and advance
+       * tail/head/max part numbers.  The meta update is retried up to
+       * RACE_RETRY times on -ECANCELED; after each cancellation, entries
+       * that a racing updater already removed from the journal are dropped
+       * from the 'processed' set before retrying.
+       */
+      int FIFO::process_journal()
+      {
+        vector<fifo_journal_entry_t> processed;
+
+        int64_t new_tail = meta_info.tail_part_num;
+        int64_t new_head = meta_info.head_part_num;
+        int64_t new_max = meta_info.max_push_part_num;
+
+        int r = process_journal_entries(&processed, new_tail, new_head, new_max);
+        if (r < 0) {
+          return r;
+        }
+
+        if (processed.empty()) {
+          return 0;
+        }
+
+#define RACE_RETRY 10
+
+        int i;
+
+        for (i = 0; i < RACE_RETRY; ++i) {
+          bool canceled;
+
+          std::optional<int64_t> tail_part_num;
+          std::optional<int64_t> head_part_num;
+          std::optional<int64_t> max_part_num;
+
+          /* only send fields that actually moved forward; meta_info may
+           * have been refreshed by a canceled previous iteration */
+          if (new_tail > meta_info.tail_part_num) {
+            tail_part_num = new_tail;
+          }
+
+          if (new_head > meta_info.head_part_num) {
+            head_part_num = new_head;
+          }
+
+          if (new_max > meta_info.max_push_part_num) {
+            max_part_num = new_max;
+          }
+
+          if (processed.empty() &&
+              !tail_part_num &&
+              !max_part_num) {
+            /* nothing to update anymore */
+            break;
+          }
+
+          r = update_meta(ClsFIFO::MetaUpdateParams()
+                          .journal_entries_rm(processed)
+                          .tail_part_num(tail_part_num)
+                          .head_part_num(head_part_num)
+                          .max_push_part_num(max_part_num),
+                          &canceled);
+          if (r < 0) {
+            return r;
+          }
+
+          if (canceled) {
+            vector<fifo_journal_entry_t> new_processed;
+
+            /* keep only entries still present (and unchanged) in the
+             * freshly read journal; the rest were handled by the racer */
+            for (auto& e : processed) {
+              auto jiter = meta_info.journal.find(e.part_num);
+              if (jiter == meta_info.journal.end() || /* journal entry was already processed */
+                  !(jiter->second == e)) {
+                continue;
+              }
+
+              new_processed.push_back(e);
+            }
+            processed = std::move(new_processed);
+            continue;
+          }
+          break;
+        }
+        if (i == RACE_RETRY) {
+          ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl;
+          return -ECANCELED;
+        }
+        return 0;
+      }
+
+      static const char alphanum_plain_table[]="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
+
+      /*
+       * Fill dest with (size - 1) random alphanumeric characters and a
+       * trailing NUL, by mapping random bytes onto the alphanumeric table.
+       */
+      void gen_rand_alphanumeric_plain(CephContext *cct, char *dest, int size) /* size should be the required string size + 1 */
+      {
+        cct->random()->get_bytes(dest, size);
+
+        int i;
+        for (i = 0; i < size - 1; i++) {
+          int pos = (unsigned)dest[i];
+          /* -1 excludes the table's terminating NUL from the modulus */
+          dest[i] = alphanum_plain_table[pos % (sizeof(alphanum_plain_table) - 1)];
+        }
+        dest[i] = '\0';
+      }
+
+      /*
+       * Generate a random 16-character alphanumeric tag used to bind part
+       * objects to the journal entries that created them.
+       */
+      static string generate_tag(CephContext *cct)
+      {
+#define HEADER_TAG_SIZE 16
+        char buf[HEADER_TAG_SIZE + 1];
+        buf[HEADER_TAG_SIZE] = 0;
+        gen_rand_alphanumeric_plain(cct, buf, sizeof(buf));
+        return string(buf);
+      }
+
+      /*
+       * Journal (and then execute) the creation of a new part, optionally
+       * also journaling a head switch to it.  The journal add is retried
+       * up to RACE_RETRY times on meta-update races; if a racer already
+       * created the part (and moved the head, when requested) we return
+       * success without journaling again.
+       */
+      int FIFO::prepare_new_part(bool is_head)
+      {
+        fifo_journal_entry_t jentry;
+
+        meta_info.prepare_next_journal_entry(&jentry, generate_tag(cct));
+
+        int64_t new_head_part_num = meta_info.head_part_num;
+
+        std::optional<fifo_journal_entry_t> new_head_jentry;
+        if (is_head) {
+          new_head_jentry = jentry;
+          new_head_jentry->op = fifo_journal_entry_t::OP_SET_HEAD;
+          new_head_part_num = jentry.part_num;
+        }
+
+        int r;
+        bool canceled;
+
+        int i;
+
+        for (i = 0; i < RACE_RETRY; ++i) {
+          r = update_meta(ClsFIFO::MetaUpdateParams()
+                          .journal_entry_add(jentry)
+                          .journal_entry_add(new_head_jentry),
+                          &canceled);
+          if (r < 0) {
+            return r;
+          }
+
+          if (canceled) {
+            if (meta_info.max_push_part_num >= jentry.part_num &&
+                meta_info.head_part_num >= new_head_part_num) { /* raced, but new part was already written */
+              return 0;
+            }
+
+            /* retry only if our entry is not already journaled */
+            auto iter = meta_info.journal.find(jentry.part_num);
+            if (iter == meta_info.journal.end()) {
+              continue;
+            }
+          }
+          break;
+        }
+        if (i == RACE_RETRY) {
+          ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl;
+          return -ECANCELED;
+        }
+
+        /* actually create the part(s) we just journaled */
+        r = process_journal();
+        if (r < 0) {
+          return r;
+        }
+
+        return 0;
+      }
+
+      int FIFO::prepare_new_head()
+      {
+        int64_t new_head_num = meta_info.head_part_num + 1;
+
+        if (meta_info.max_push_part_num < new_head_num) {
+          int r = prepare_new_part(true);
+          if (r < 0) {
+            return r;
+          }
+
+          if (meta_info.max_push_part_num < new_head_num) {
+            ldout(cct, 0) << "ERROR: " << __func__ << ": after new part creation: meta_info.max_push_part_num="
+              << meta_info.max_push_part_num << " new_head_num=" << meta_info.max_push_part_num << dendl;
+            return -EIO;
+          }
+
+          return 0;
+        }
+
+        int i;
+
+        for (i = 0; i < RACE_RETRY; ++i) {
+          bool canceled;
+          int r = update_meta(ClsFIFO::MetaUpdateParams()
+                              .head_part_num(new_head_num),
+                              &canceled);
+          if (r < 0) {
+            return r;
+          }
+
+          if (canceled) {
+            if (meta_info.head_part_num < new_head_num) {
+              continue;
+            }
+          }
+          break;
+        }
+        if (i == RACE_RETRY) {
+          ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl;
+          return -ECANCELED;
+        }
+
+        
+        return 0;
+      }
+
+      /*
+       * Open the FIFO: optionally create the meta object first (using the
+       * supplied or default creation params), then read the meta into the
+       * local cache and mark the handle open.  Requires init_ioctx() (or
+       * an externally supplied ioctx) to have been set up.
+       */
+      int FIFO::open(bool create,
+                        std::optional<ClsFIFO::MetaCreateParams> create_params)
+      {
+        if (!ioctx) {
+          return -EINVAL;
+        }
+
+        if (create) {
+          librados::ObjectWriteOperation op;
+
+          ClsFIFO::MetaCreateParams default_params;
+          ClsFIFO::MetaCreateParams *params = (create_params ? &(*create_params) : &default_params);
+
+          int r = ClsFIFO::meta_create(&op, id, *params);
+          if (r < 0) {
+            return r;
+          }
+
+          r = ioctx->operate(meta_oid, &op);
+          if (r < 0) {
+            return r;
+          }
+        }
+
+        /* pin the read to the creation objv when one was supplied */
+        std::optional<fifo_objv_t> objv = (create_params ?  create_params->state.objv : nullopt);
+
+        int r = do_read_meta(objv);
+        if (r < 0) {
+          return r;
+        }
+
+        is_open = true;
+
+        return 0;
+      }
+
+      /*
+       * Public meta refresh; only valid once the FIFO has been opened.
+       */
+      int FIFO::read_meta(std::optional<fifo_objv_t> objv)
+      {
+        if (!is_open) {
+          return -EINVAL;
+        }
+
+        return do_read_meta(objv);
+      }
+
+      /*
+       * Push a batch of entries to the given part, tagged with the cached
+       * head tag.  The objclass returns -ERANGE (propagated here) when the
+       * part is full and a new head is needed.
+       */
+      int FIFO::push_entries(int64_t part_num, std::vector<bufferlist>& data_bufs)
+      {
+        if (!is_open) {
+          return -EINVAL;
+        }
+
+        librados::ObjectWriteOperation op;
+
+        int r = ClsFIFO::push_part(&op, ClsFIFO::PushPartParams()
+                                   .tag(meta_info.head_tag)
+                                   .data_bufs(data_bufs));
+        if (r < 0) {
+          return r;
+        }
+
+        r = ioctx->operate(meta_info.part_oid(part_num), &op);
+        if (r < 0) {
+          return r;
+        }
+
+        return 0;
+      }
+
+      /*
+       * Trim a single part object up to offset 'ofs', with an optional
+       * tag check.  Helper for FIFO::trim().
+       */
+      int FIFO::trim_part(int64_t part_num,
+                          uint64_t ofs,
+                          std::optional<string> tag)
+      {
+        if (!is_open) {
+          return -EINVAL;
+        }
+
+        librados::ObjectWriteOperation op;
+
+        int r = ClsFIFO::trim_part(&op, ClsFIFO::TrimPartParams()
+                                        .tag(tag)
+                                        .ofs(ofs));
+        if (r < 0) {
+          return r;
+        }
+
+        r = ioctx->operate(meta_info.part_oid(part_num), &op);
+        if (r < 0) {
+          return r;
+        }
+
+        return 0;
+      }
+
+      /*
+       * Single-entry convenience wrapper around the batched push().
+       */
+      int FIFO::push(bufferlist& bl)
+      {
+        std::vector<bufferlist> data_bufs;
+        data_bufs.push_back(bl);
+
+        return push(data_bufs);
+      }
+
+      /*
+       * Push multiple entries, batching them per rados operation and
+       * rolling over to a new head part when the current one fills up
+       * (push_entries() returning -ERANGE).  Each entry must fit within
+       * max_entry_size; oversized entries fail the whole call with
+       * -EINVAL.
+       */
+      int FIFO::push(vector<bufferlist>& data_bufs)
+      {
+        if (!is_open) {
+          return -EINVAL;
+        }
+
+        int r;
+
+        if (meta_info.need_new_head()) {
+          r = prepare_new_head();
+          if (r < 0) {
+            return r;
+          }
+        }
+
+        int i;
+
+        auto iter = data_bufs.begin();
+
+        while (iter != data_bufs.end()) {
+          uint64_t batch_len = 0;
+
+          vector<bufferlist> batch;
+
+          /* NOTE(review): a batch is capped at max_entry_size total
+           * (including per-entry overhead), not at the part size —
+           * confirm this is the intended batching limit */
+          for (; iter != data_bufs.end(); ++iter) {
+            auto& data = *iter;
+            auto data_len = data.length();
+            auto max_entry_size = meta_info.data_params.max_entry_size;
+
+            if (data_len > max_entry_size) {
+              ldout(cct, 10) << __func__ << "(): entry too large: " << data_len << " > " <<  meta_info.data_params.max_entry_size << dendl;
+              return -EINVAL;
+            }
+
+            if (batch_len + data_len > max_entry_size) {
+              break;
+            }
+
+            batch_len +=  data_len + part_entry_overhead; /* we can send entry with data_len up to max_entry_size,
+                                                             however, we want to also account the overhead when dealing
+                                                             with multiple entries. Previous check doesn't account
+                                                             for overhead on purpose. */
+
+            batch.push_back(data);
+          }
+
+
+          /* retry the batch on -ERANGE (head part filled up under us) */
+          for (i = 0; i < RACE_RETRY; ++i) {
+            r = push_entries(meta_info.head_part_num, batch);
+            if (r == -ERANGE) {
+              r = prepare_new_head();
+              if (r < 0) {
+                return r;
+              }
+              continue;
+            }
+            if (r < 0) {
+              return r;
+            }
+            break;
+          }
+          if (i == RACE_RETRY) {
+            ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl;
+            return -ECANCELED;
+          }
+        }
+
+        return 0;
+      }
+
+      /*
+       * List up to max_entries entries starting at 'marker' (or the tail
+       * when no marker is given), walking parts from tail toward head.
+       * Each returned entry carries a marker usable to resume or to trim.
+       * *more is set when additional entries may exist past the last one
+       * returned.  A missing part (-ENOENT) is re-checked against a fresh
+       * meta: either we raced with a trim (restart from the new tail) or
+       * we ran past the head (end of data).
+       */
+      int FIFO::list(int max_entries,
+                     std::optional<string> marker,
+                     vector<fifo_entry> *result,
+                     bool *more)
+      {
+        if (!is_open) {
+          return -EINVAL;
+        }
+
+        *more = false;
+
+        int64_t part_num = meta_info.tail_part_num;
+        uint64_t ofs = 0;
+
+        if (marker) {
+          if (!parse_marker(*marker, &part_num, &ofs)) {
+            ldout(cct, 20) << __func__ << "(): failed to parse marker (" << *marker << ")" << dendl;
+            return -EINVAL;
+          }
+        }
+
+        result->clear();
+        result->reserve(max_entries);
+
+        bool part_more{false};
+        bool part_full{false};
+
+        while (max_entries > 0) {
+          std::vector<cls_fifo_part_list_entry_t> entries;
+          int r = ClsFIFO::list_part(*ioctx,
+                                     meta_info.part_oid(part_num),
+                                     ClsFIFO::ListPartParams()
+                                     .ofs(ofs)
+                                     .max_entries(max_entries),
+                                     &entries,
+                                     &part_more,
+                                     &part_full,
+                                     nullptr);
+          if (r == -ENOENT) {
+            r = do_read_meta();
+            if (r < 0) {
+              return r;
+            }
+
+            if (part_num < meta_info.tail_part_num) {
+              /* raced with trim? restart */
+              result->clear();
+              part_num = meta_info.tail_part_num;
+              ofs = 0;
+              continue;
+            }
+
+            /* assuming part was not written yet, so end of data */
+
+            *more = false;
+
+            return 0;
+          }
+          if (r < 0) {
+            ldout(cct, 20) << __func__ << "(): ClsFIFO::list_part() on oid=" << meta_info.part_oid(part_num) << " returned r=" << r << dendl;
+            return r;
+          }
+
+          for (auto& entry : entries) {
+            fifo_entry e;
+            e.data = std::move(entry.data);
+            e.marker = craft_marker(part_num, entry.ofs);
+            e.mtime = entry.mtime;
+
+            result->push_back(e);
+          }
+          max_entries -= entries.size();
+
+          /* keep draining the same part before moving on */
+          if (max_entries > 0 &&
+              part_more) {
+            continue;
+          }
+
+          if (!part_full) { /* head part is not full */
+            break;
+          }
+
+          /* part exhausted and sealed: advance to the next one */
+          ++part_num;
+          ofs = 0;
+        }
+
+        *more = part_full || part_more;
+
+        return 0;
+      }
+
+      int FIFO::trim(const string& marker)
+      {
+        if (!is_open) {
+          return -EINVAL;
+        }
+
+        int64_t part_num;
+        uint64_t ofs;
+
+        if (!parse_marker(marker, &part_num, &ofs)) {
+          ldout(cct, 20) << __func__ << "(): failed to parse marker: marker=" << marker << dendl;
+          return -EINVAL;
+        }
+
+        for (int64_t pn = meta_info.tail_part_num; pn < part_num; ++pn) {
+          int r = trim_part(pn, meta_info.data_params.max_part_size, std::nullopt);
+          if (r < 0 &&
+              r != -ENOENT) {
+            ldout(cct, 0) << __func__ << "(): ERROR: trim_part() on part=" << pn << " returned r=" << r << dendl;
+            return r;
+          }
+        }
+
+        int r = trim_part(part_num, ofs, std::nullopt);
+        if (r < 0 &&
+            r != -ENOENT) {
+          ldout(cct, 0) << __func__ << "(): ERROR: trim_part() on part=" << part_num << " returned r=" << r << dendl;
+          return r;
+        }
+
+        if (part_num <= meta_info.tail_part_num) {
+          /* don't need to modify meta info */
+          return 0;
+        }
+
+        int i;
+
+        for (i = 0; i < RACE_RETRY; ++i) {
+          bool canceled;
+          int r = update_meta(ClsFIFO::MetaUpdateParams()
+                              .tail_part_num(part_num),
+                              &canceled);
+          if (r < 0) {
+            return r;
+          }
+
+          if (canceled) {
+            if (meta_info.tail_part_num < part_num) {
+              continue;
+            }
+          }
+          break;
+
+          if (i == RACE_RETRY) {
+            ldout(cct, 0) << "ERROR: " << __func__ << "(): race check failed too many times, likely a bug" << dendl;
+            return -ECANCELED;
+          }
+        }
+
+        return 0;
+      }
+
+      /*
+       * Fetch a single part's header and return it as a fifo_part_info.
+       * Only valid once the FIFO has been opened.
+       */
+      int FIFO::get_part_info(int64_t part_num,
+                              fifo_part_info *result)
+      {
+        if (!is_open) {
+          return -EINVAL;
+        }
+
+        fifo_part_header_t header;
+
+        int r = ClsFIFO::get_part_info(*ioctx,
+                                       meta_info.part_oid(part_num),
+                                       &header);
+        if (r < 0) {
+          return r;
+        }
+
+        *result = std::move(header);
+
+        return 0;
+      }
+
+    } // namespace fifo
+  } // namespace cls
+} // namespace rados
+
diff --git a/src/cls/fifo/cls_fifo_client.h b/src/cls/fifo/cls_fifo_client.h
new file mode 100644 (file)
index 0000000..02e5681
--- /dev/null
@@ -0,0 +1,382 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+
+#pragma once
+
+#include "cls/fifo/cls_fifo_types.h"
+
+namespace rados {
+  namespace cls {
+    namespace fifo {
+
+      class ClsFIFO {
+      public:
+
+        /* create */
+
+        struct MetaCreateParams {
+          struct State {
+            static constexpr uint64_t default_max_part_size = 4 * 1024 * 1024;
+            static constexpr uint64_t default_max_entry_size = 32 * 1024;
+            std::optional<fifo_objv_t> objv;
+            std::optional<std::string> oid_prefix;
+            bool exclusive{false};
+            uint64_t max_part_size{default_max_part_size};
+            uint64_t max_entry_size{default_max_entry_size};
+          } state;
+
+          MetaCreateParams& oid_prefix(const std::string& oid_prefix) {
+            state.oid_prefix = oid_prefix;
+            return *this;
+          }
+          MetaCreateParams& exclusive(bool exclusive) {
+            state.exclusive = exclusive;
+            return *this;
+          }
+          MetaCreateParams& max_part_size(uint64_t max_part_size) {
+            state.max_part_size = max_part_size;
+            return *this;
+          }
+          MetaCreateParams& max_entry_size(uint64_t max_entry_size) {
+            state.max_entry_size = max_entry_size;
+            return *this;
+          }
+          MetaCreateParams& objv(const fifo_objv_t& objv) {
+            state.objv = objv;
+            return *this;
+          }
+          MetaCreateParams& objv(const std::string& instance, uint64_t ver) {
+            state.objv = fifo_objv_t{instance, ver};
+            return *this;
+          }
+        };
+
+        static int meta_create(librados::ObjectWriteOperation *op,
+                               const string& id,
+                               const MetaCreateParams& params);
+
+        /* get info */
+
+        struct MetaGetParams {
+          struct State {
+            std::optional<fifo_objv_t> objv;
+          } state;
+
+          MetaGetParams& objv(std::optional<fifo_objv_t>& v) {
+            state.objv = v;
+            return *this;
+          }
+          MetaGetParams& objv(const fifo_objv_t& v) {
+            state.objv = v;
+            return *this;
+          }
+          MetaGetParams& objv(const std::string& instance, uint64_t ver) {
+            state.objv = fifo_objv_t{instance, ver};
+            return *this;
+          }
+        };
+        static int meta_get(librados::IoCtx& ioctx,
+                            const string& oid,
+                            const MetaGetParams& params,
+                            rados::cls::fifo::fifo_info_t *result,
+                            uint32_t *part_header_size,
+                            uint32_t *part_entry_overhead);
+
+        /* update */
+
+        struct MetaUpdateParams {
+          struct State {
+            rados::cls::fifo::fifo_objv_t objv;
+
+            std::optional<uint64_t> tail_part_num;
+            std::optional<uint64_t> head_part_num;
+            std::optional<uint64_t> min_push_part_num;
+            std::optional<uint64_t> max_push_part_num;
+            std::vector<rados::cls::fifo::fifo_journal_entry_t> journal_entries_add;
+            std::vector<rados::cls::fifo::fifo_journal_entry_t> journal_entries_rm;
+          } state;
+
+          MetaUpdateParams& objv(const fifo_objv_t& objv) {
+            state.objv = objv;
+            return *this;
+          }
+          MetaUpdateParams& tail_part_num(std::optional<uint64_t> tail_part_num) {
+            state.tail_part_num = tail_part_num;
+            return *this;
+          }
+          MetaUpdateParams& tail_part_num(uint64_t tail_part_num) {
+            state.tail_part_num = tail_part_num;
+            return *this;
+          }
+          MetaUpdateParams& head_part_num(std::optional<uint64_t> head_part_num) {
+            state.head_part_num = head_part_num;
+            return *this;
+          }
+          MetaUpdateParams& head_part_num(uint64_t head_part_num) {
+            state.head_part_num = head_part_num;
+            return *this;
+          }
+          MetaUpdateParams& min_push_part_num(uint64_t num) {
+            state.min_push_part_num = num;
+            return *this;
+          }
+          MetaUpdateParams& max_push_part_num(std::optional<uint64_t> num) {
+            state.max_push_part_num = num;
+            return *this;
+          }
+          MetaUpdateParams& max_push_part_num(uint64_t num) {
+            state.max_push_part_num = num;
+            return *this;
+          }
+          MetaUpdateParams& journal_entry_add(std::optional<rados::cls::fifo::fifo_journal_entry_t> entry) {
+            if (entry) {
+              state.journal_entries_add.push_back(*entry);
+            }
+            return *this;
+          }
+          MetaUpdateParams& journal_entry_add(const rados::cls::fifo::fifo_journal_entry_t& entry) {
+            state.journal_entries_add.push_back(entry);
+            return *this;
+          }
+          MetaUpdateParams& journal_entries_rm(std::vector<rados::cls::fifo::fifo_journal_entry_t>& entries) {
+            state.journal_entries_rm = entries;
+            return *this;
+          }
+
+          int apply_update(CephContext *cct,
+                           rados::cls::fifo::fifo_info_t *info);
+        };
+
+        static int meta_update(librados::ObjectWriteOperation *rados_op,
+                                const MetaUpdateParams& params);
+        /* init part */
+
+        struct PartInitParams {
+          struct State {
+            string tag;
+            rados::cls::fifo::fifo_data_params_t data_params;
+          } state;
+
+          PartInitParams& tag(const std::string& tag) {
+            state.tag = tag;
+            return *this;
+          }
+          PartInitParams& data_params(const rados::cls::fifo::fifo_data_params_t& data_params) {
+            state.data_params = data_params;
+            return *this;
+          }
+        };
+
+        static int part_init(librados::ObjectWriteOperation *op,
+                             const PartInitParams& params);
+
+       /* push part */
+
+        struct PushPartParams {
+          struct State {
+            string tag;
+           std::vector<bufferlist> data_bufs;
+           uint64_t total_len{0};
+          } state;
+
+          PushPartParams& tag(const std::string& tag) {
+            state.tag = tag;
+            return *this;
+          }
+          PushPartParams& data(bufferlist& bl) {
+           state.total_len += bl.length();
+            state.data_bufs.emplace_back(bl);
+            return *this;
+          }
+          PushPartParams& data_bufs(std::vector<bufferlist>& dbs) {
+           for (auto& bl : dbs) {
+             data(bl);
+           }
+            return *this;
+          }
+        };
+
+        static int push_part(librados::ObjectWriteOperation *op,
+                             const PushPartParams& params);
+       /* trim part */
+
+        struct TrimPartParams {
+          struct State {
+            std::optional<string> tag;
+            uint64_t ofs;
+          } state;
+
+          TrimPartParams& tag(std::optional<std::string> tag) {
+            state.tag = tag;
+            return *this;
+          }
+          TrimPartParams& ofs(uint64_t ofs) {
+            state.ofs = ofs;
+            return *this;
+          }
+        };
+
+        static int trim_part(librados::ObjectWriteOperation *op,
+                             const TrimPartParams& params);
+       /* list part */
+
+        struct ListPartParams {
+          struct State {
+            std::optional<string> tag;
+            uint64_t ofs;
+            int max_entries{100};
+          } state;
+
+          ListPartParams& tag(const std::string& tag) {
+            state.tag = tag;
+            return *this;
+          }
+          ListPartParams& ofs(uint64_t ofs) {
+            state.ofs = ofs;
+            return *this;
+          }
+          ListPartParams& max_entries(int _max_entries) {
+            state.max_entries = _max_entries;
+            return *this;
+          }
+        };
+
+        static int list_part(librados::IoCtx& ioctx,
+                             const string& oid,
+                             const ListPartParams& params,
+                             std::vector<cls_fifo_part_list_entry_t> *pentries,
+                             bool *more,
+                             bool *full_part = nullptr,
+                             string *ptag = nullptr);
+
+        static int get_part_info(librados::IoCtx& ioctx,
+                                 const string& oid,
+                                 rados::cls::fifo::fifo_part_header_t *header);
+      };
+
+      struct fifo_entry {
+        bufferlist data;
+        string marker;
+        ceph::real_time mtime;
+      };
+
+      using fifo_part_info = rados::cls::fifo::fifo_part_header_t;
+
+      class FIFO {
+        CephContext *cct;
+        string id;
+
+        string meta_oid;
+
+        std::optional<librados::IoCtx> _ioctx;
+        librados::IoCtx *ioctx{nullptr};
+
+        fifo_info_t meta_info;
+
+        uint32_t part_header_size;
+        uint32_t part_entry_overhead;
+
+        bool is_open{false};
+
+        string craft_marker(int64_t part_num,
+                        uint64_t part_ofs);
+
+        bool parse_marker(const string& marker,
+                          int64_t *part_num,
+                          uint64_t *part_ofs);
+
+        int update_meta(ClsFIFO::MetaUpdateParams& update_params,
+                        bool *canceled);
+        int do_read_meta(std::optional<fifo_objv_t> objv = std::nullopt);
+
+        int create_part(int64_t part_num, const string& tag,
+                        int64_t& max_part_num);
+        int remove_part(int64_t part_num, const string& tag,
+                        int64_t& tail_part_num);
+
+        int process_journal_entry(const fifo_journal_entry_t& entry,
+                                  int64_t& tail_part_num,
+                                  int64_t& head_part_num,
+                                  int64_t& max_part_num);
+        int process_journal_entries(vector<fifo_journal_entry_t> *processed,
+                                    int64_t& tail_part_num,
+                                    int64_t& head_part_num,
+                                    int64_t& max_part_num);
+        int process_journal();
+
+        int prepare_new_part(bool is_head);
+        int prepare_new_head();
+
+       int push_entries(int64_t part_num, std::vector<bufferlist>& data_bufs);
+        int trim_part(int64_t part_num,
+                      uint64_t ofs,
+                      std::optional<string> tag);
+
+      public:
+        FIFO(CephContext *_cct,
+             const string& _id,
+             librados::IoCtx *_ioctx = nullptr) : cct(_cct),
+                                                  id(_id),
+                                                  ioctx(_ioctx) {
+          meta_oid = id;
+        }
+
+        int init_ioctx(librados::Rados *rados,
+                       const string& pool,
+                       std::optional<string> pool_ns);
+
+        /* Replace the io context used for all rados operations.
+         * Bug fix: was `ioctx = ioctx;` (self-assignment no-op). */
+        void set_ioctx(librados::IoCtx *_ioctx) {
+          ioctx = _ioctx;
+        }
+
+        int open(bool create,
+                 std::optional<ClsFIFO::MetaCreateParams> create_params = std::nullopt);
+
+        int read_meta(std::optional<fifo_objv_t> objv = std::nullopt);
+
+        const fifo_info_t& get_meta() const {
+          return meta_info;
+        }
+
+        void get_part_layout_info(uint32_t *header_size, uint32_t *entry_overhead) {
+          if (header_size) {
+            *header_size = part_header_size;
+          }
+
+          if (entry_overhead) {
+            *entry_overhead = part_entry_overhead;
+          }
+        }
+
+        int push(bufferlist& bl);
+       int push(vector<bufferlist>& bl);
+
+        int list(int max_entries,
+                 std::optional<string> marker,
+                 vector<fifo_entry> *result,
+                 bool *more);
+
+        int trim(const string& marker);
+
+        int get_part_info(int64_t part_num,
+                          fifo_part_info *result);
+      };
+    } // namespace fifo
+  }  // namespace cls
+} // namespace rados
diff --git a/src/cls/fifo/cls_fifo_ops.cc b/src/cls/fifo/cls_fifo_ops.cc
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/cls/fifo/cls_fifo_ops.h b/src/cls/fifo/cls_fifo_ops.h
new file mode 100644 (file)
index 0000000..2a8ceab
--- /dev/null
@@ -0,0 +1,282 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ * Copyright (C) 2019 SUSE LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "include/types.h"
+#include "include/utime.h"
+#include "cls/fifo/cls_fifo_types.h"
+
+struct cls_fifo_meta_create_op
+{
+  string id;
+  std::optional<rados::cls::fifo::fifo_objv_t> objv;
+  struct {
+    string name;
+    string ns;
+  } pool;
+  std::optional<string> oid_prefix;
+
+  uint64_t max_part_size{0};
+  uint64_t max_entry_size{0};
+
+  bool exclusive{false};
+
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(id, bl);
+    encode(objv, bl);
+    encode(pool.name, bl);
+    encode(pool.ns, bl);
+    encode(oid_prefix, bl);
+    encode(max_part_size, bl);
+    encode(max_entry_size, bl);
+    encode(exclusive, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator &bl) {
+    DECODE_START(1, bl);
+    decode(id, bl);
+    decode(objv, bl);
+    decode(pool.name, bl);
+    decode(pool.ns, bl);
+    decode(oid_prefix, bl);
+    decode(max_part_size, bl);
+    decode(max_entry_size, bl);
+    decode(exclusive, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_fifo_meta_create_op)
+
+struct cls_fifo_meta_get_op
+{
+  std::optional<rados::cls::fifo::fifo_objv_t> objv;
+
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(objv, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator &bl) {
+    DECODE_START(1, bl);
+    decode(objv, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_fifo_meta_get_op)
+
+struct cls_fifo_meta_get_op_reply
+{
+  rados::cls::fifo::fifo_info_t info;
+  uint32_t part_header_size{0};
+  uint32_t part_entry_overhead{0}; /* per entry extra data that is stored */
+
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(info, bl);
+    encode(part_header_size, bl);
+    encode(part_entry_overhead, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator &bl) {
+    DECODE_START(1, bl);
+    decode(info, bl);
+    decode(part_header_size, bl);
+    decode(part_entry_overhead, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_fifo_meta_get_op_reply)
+
+struct cls_fifo_meta_update_op
+{
+  rados::cls::fifo::fifo_objv_t objv;
+
+  std::optional<uint64_t> tail_part_num;
+  std::optional<uint64_t> head_part_num;
+  std::optional<uint64_t> min_push_part_num;
+  std::optional<uint64_t> max_push_part_num;
+  std::vector<rados::cls::fifo::fifo_journal_entry_t> journal_entries_add;
+  std::vector<rados::cls::fifo::fifo_journal_entry_t> journal_entries_rm;
+
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(objv, bl);
+    encode(tail_part_num, bl);
+    encode(head_part_num, bl);
+    encode(min_push_part_num, bl);
+    encode(max_push_part_num, bl);
+    encode(journal_entries_add, bl);
+    encode(journal_entries_rm, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator &bl) {
+    DECODE_START(1, bl);
+    decode(objv, bl);
+    decode(tail_part_num, bl);
+    decode(head_part_num, bl);
+    decode(min_push_part_num, bl);
+    decode(max_push_part_num, bl);
+    decode(journal_entries_add, bl);
+    decode(journal_entries_rm, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_fifo_meta_update_op)
+
+struct cls_fifo_part_init_op
+{
+  string tag;
+  rados::cls::fifo::fifo_data_params_t data_params;
+
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(tag, bl);
+    encode(data_params, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator &bl) {
+    DECODE_START(1, bl);
+    decode(tag, bl);
+    decode(data_params, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_fifo_part_init_op)
+
+struct cls_fifo_part_push_op
+{
+  string tag;
+  std::vector<bufferlist> data_bufs;
+  uint64_t total_len{0};
+
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(tag, bl);
+    encode(data_bufs, bl);
+    encode(total_len, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator &bl) {
+    DECODE_START(1, bl);
+    decode(tag, bl);
+    decode(data_bufs, bl);
+    decode(total_len, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_fifo_part_push_op)
+
+struct cls_fifo_part_trim_op
+{
+  std::optional<string> tag;
+  uint64_t ofs{0};
+
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(tag, bl);
+    encode(ofs, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator &bl) {
+    DECODE_START(1, bl);
+    decode(tag, bl);
+    decode(ofs, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_fifo_part_trim_op)
+
+struct cls_fifo_part_list_op
+{
+  std::optional<string> tag;
+  uint64_t ofs{0};
+  int max_entries{100};
+
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(tag, bl);
+    encode(ofs, bl);
+    encode(max_entries, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator &bl) {
+    DECODE_START(1, bl);
+    decode(tag, bl);
+    decode(ofs, bl);
+    decode(max_entries, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_fifo_part_list_op)
+
+struct cls_fifo_part_list_op_reply
+{
+  string tag;
+  vector<rados::cls::fifo::cls_fifo_part_list_entry_t> entries;
+  bool more{false};
+  bool full_part{false}; /* whether part is full or still can be written to.
+                            A non full part is by definition head part */
+
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(tag, bl);
+    encode(entries, bl);
+    encode(more, bl);
+    encode(full_part, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator &bl) {
+    DECODE_START(1, bl);
+    decode(tag, bl);
+    decode(entries, bl);
+    decode(more, bl);
+    decode(full_part, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_fifo_part_list_op_reply)
+
+struct cls_fifo_part_get_info_op
+{
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator &bl) {
+    DECODE_START(1, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_fifo_part_get_info_op)
+
+struct cls_fifo_part_get_info_op_reply
+{
+  rados::cls::fifo::fifo_part_header_t header;
+
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(header, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator &bl) {
+    DECODE_START(1, bl);
+    decode(header, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_fifo_part_get_info_op_reply)
diff --git a/src/cls/fifo/cls_fifo_types.cc b/src/cls/fifo/cls_fifo_types.cc
new file mode 100644 (file)
index 0000000..b182551
--- /dev/null
@@ -0,0 +1,90 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "cls_fifo_types.h"
+
+/* Build the rados object name for a given part: "<oid_prefix>.<part_num>".
+ * Avoids the previous non-standard VLA (char buf[oid_prefix.size() + 32]);
+ * only the fixed-size numeric suffix needs a stack buffer. */
+string rados::cls::fifo::fifo_info_t::part_oid(int64_t part_num)
+{
+  char buf[32];
+  snprintf(buf, sizeof(buf), ".%lld", (long long)part_num);
+
+  return oid_prefix + buf;
+}
+
+void rados::cls::fifo::fifo_info_t::prepare_next_journal_entry(fifo_journal_entry_t *entry, const string& tag)
+{
+  entry->op = fifo_journal_entry_t::Op::OP_CREATE;
+  entry->part_num = max_push_part_num + 1;
+  entry->part_tag = tag;
+}
+
+int rados::cls::fifo::fifo_info_t::apply_update(std::optional<uint64_t>& _tail_part_num,
+                                                std::optional<uint64_t>& _head_part_num,
+                                                std::optional<uint64_t>& _min_push_part_num,
+                                                std::optional<uint64_t>& _max_push_part_num,
+                                                std::vector<rados::cls::fifo::fifo_journal_entry_t>& journal_entries_add,
+                                                std::vector<rados::cls::fifo::fifo_journal_entry_t>& journal_entries_rm,
+                                                string *err)
+{
+  if (_tail_part_num) {
+    tail_part_num = *_tail_part_num;
+  }
+
+  if (_min_push_part_num) {
+    min_push_part_num = *_min_push_part_num;
+  }
+
+  if (_max_push_part_num) {
+    max_push_part_num = *_max_push_part_num;
+  }
+
+  for (auto& entry : journal_entries_add) {
+    auto iter = journal.find(entry.part_num);
+    if (iter != journal.end() &&
+        iter->second.op == entry.op) {
+      /* don't allow multiple concurrent (same) operations on the same part,
+         racing clients should use objv to avoid races anyway */
+      if (err) {
+        stringstream ss;
+        ss << "multiple concurrent operations on same part are not allowed, part num=" << entry.part_num;
+        *err = ss.str();
+      }
+      return -EINVAL;
+    }
+
+    if (entry.op == fifo_journal_entry_t::Op::OP_CREATE) {
+      tags[entry.part_num] = entry.part_tag;
+    }
+
+    journal.insert(std::pair<int64_t, fifo_journal_entry_t>(entry.part_num, std::move(entry)));
+  }
+
+  for (auto& entry : journal_entries_rm) {
+    journal.erase(entry.part_num);
+  }
+
+  if (_head_part_num) {
+    tags.erase(head_part_num);
+    head_part_num = *_head_part_num;
+    auto iter = tags.find(head_part_num);
+    if (iter != tags.end()) {
+      head_tag = iter->second;
+    } else {
+      head_tag.erase();
+    }
+  }
+
+  return 0;
+}
diff --git a/src/cls/fifo/cls_fifo_types.h b/src/cls/fifo/cls_fifo_types.h
new file mode 100644 (file)
index 0000000..bdc1ed7
--- /dev/null
@@ -0,0 +1,274 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+
+#include "include/encoding.h"
+#include "include/types.h"
+
+
+class JSONObj;
+
+namespace rados {
+  namespace cls {
+    namespace fifo {
+      struct fifo_objv_t {
+        string instance;
+        uint64_t ver{0};
+
+        void encode(bufferlist &bl) const {
+          ENCODE_START(1, 1, bl);
+          encode(instance, bl);
+          encode(ver, bl);
+          ENCODE_FINISH(bl);
+        }
+        void decode(bufferlist::const_iterator &bl) {
+          DECODE_START(1, bl);
+          decode(instance, bl);
+          decode(ver, bl);
+          DECODE_FINISH(bl);
+        }
+        void dump(Formatter *f) const;
+        void decode_json(JSONObj *obj);
+
+        bool operator==(const fifo_objv_t& rhs) const {
+          return (instance == rhs.instance &&
+                  ver == rhs.ver);
+        }
+
+        bool empty() const {
+          return instance.empty();
+        }
+
+        /* Human-readable form: "<instance>{<ver>}".
+         * const-qualified and rewritten without the non-standard VLA
+         * (char buf[instance.size() + 32]). */
+        string to_str() const {
+          char buf[32];
+          snprintf(buf, sizeof(buf), "{%lld}", (long long)ver);
+          return instance + buf;
+        }
+      };
+      WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_objv_t)
+
+      struct fifo_data_params_t {
+        uint64_t max_part_size{0};
+        uint64_t max_entry_size{0};
+        uint64_t full_size_threshold{0};
+
+        void encode(bufferlist &bl) const {
+          ENCODE_START(1, 1, bl);
+          encode(max_part_size, bl);
+          encode(max_entry_size, bl);
+          encode(full_size_threshold, bl);
+          ENCODE_FINISH(bl);
+        }
+        void decode(bufferlist::const_iterator &bl) {
+          DECODE_START(1, bl);
+          decode(max_part_size, bl);
+          decode(max_entry_size, bl);
+          decode(full_size_threshold, bl);
+          DECODE_FINISH(bl);
+        }
+        void dump(Formatter *f) const;
+        void decode_json(JSONObj *obj);
+
+        bool operator==(const fifo_data_params_t& rhs) const {
+          return (max_part_size == rhs.max_part_size &&
+                  max_entry_size == rhs.max_entry_size &&
+                  full_size_threshold == rhs.full_size_threshold);
+        }
+      };
+      WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_data_params_t)
+
+      struct fifo_journal_entry_t {
+        enum Op {
+          OP_UNKNOWN  = 0,
+          OP_CREATE   = 1,
+          OP_SET_HEAD = 2,
+          OP_REMOVE   = 3,
+        } op{OP_UNKNOWN};
+
+        int64_t part_num{0};
+        string part_tag;
+
+        void encode(bufferlist &bl) const {
+          ENCODE_START(1, 1, bl);
+          encode((int)op, bl);
+          encode(part_num, bl);
+          encode(part_tag, bl);
+          ENCODE_FINISH(bl);
+        }
+        void decode(bufferlist::const_iterator &bl) {
+          DECODE_START(1, bl);
+          int i;
+          decode(i, bl);
+          op = (Op)i;
+          decode(part_num, bl);
+          decode(part_tag, bl);
+          DECODE_FINISH(bl);
+        }
+        void dump(Formatter *f) const;
+
+        /* const-qualified so const entries (e.g. map values accessed through
+         * a const iterator) can be compared */
+        bool operator==(const fifo_journal_entry_t& e) const {
+          return (op == e.op &&
+                  part_num == e.part_num &&
+                  part_tag == e.part_tag);
+        }
+      };
+      WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_journal_entry_t)
+
+      struct fifo_info_t {
+        string id;
+        fifo_objv_t objv;
+        string oid_prefix;
+        fifo_data_params_t data_params;
+
+        int64_t tail_part_num{0};
+        int64_t head_part_num{-1};
+        int64_t min_push_part_num{0};
+        int64_t max_push_part_num{-1};
+
+        string head_tag;
+        map<int64_t, string> tags;
+
+        std::multimap<int64_t, fifo_journal_entry_t> journal;
+
+        /* Pure predicates; const-qualified so they are callable through the
+         * const reference returned by FIFO::get_meta(). */
+        bool need_new_head() const {
+          return (head_part_num < min_push_part_num);
+        }
+
+        bool need_new_part() const {
+          return (max_push_part_num < min_push_part_num);
+        }
+
+        string part_oid(int64_t part_num);
+        void prepare_next_journal_entry(fifo_journal_entry_t *entry, const string& tag);
+
+        int apply_update(std::optional<uint64_t>& _tail_part_num,
+                         std::optional<uint64_t>& _head_part_num,
+                         std::optional<uint64_t>& _min_push_part_num,
+                         std::optional<uint64_t>& _max_push_part_num,
+                         std::vector<rados::cls::fifo::fifo_journal_entry_t>& journal_entries_add,
+                         std::vector<rados::cls::fifo::fifo_journal_entry_t>& journal_entries_rm,
+                         string *err);
+
+        void encode(bufferlist &bl) const {
+          ENCODE_START(1, 1, bl);
+          encode(id, bl);
+          encode(objv, bl);
+          encode(oid_prefix, bl);
+          encode(data_params, bl);
+          encode(tail_part_num, bl);
+          encode(head_part_num, bl);
+          encode(min_push_part_num, bl);
+          encode(max_push_part_num, bl);
+          encode(tags, bl);
+          encode(head_tag, bl);
+          encode(journal, bl);
+          ENCODE_FINISH(bl);
+        }
+        void decode(bufferlist::const_iterator &bl) {
+          DECODE_START(1, bl);
+          decode(id, bl);
+          decode(objv, bl);
+          decode(oid_prefix, bl);
+          decode(data_params, bl);
+          decode(tail_part_num, bl);
+          decode(head_part_num, bl);
+          decode(min_push_part_num, bl);
+          decode(max_push_part_num, bl);
+          decode(tags, bl);
+          decode(head_tag, bl);
+          decode(journal, bl);
+          DECODE_FINISH(bl);
+        }
+        void dump(Formatter *f) const;
+        void decode_json(JSONObj *obj);
+      };
+      WRITE_CLASS_ENCODER(rados::cls::fifo::fifo_info_t)
+
+      struct cls_fifo_part_list_entry_t {
+        bufferlist data;
+        uint64_t ofs;
+        ceph::real_time mtime;
+
+        cls_fifo_part_list_entry_t() {}
+        cls_fifo_part_list_entry_t(bufferlist&& _data,
+                                   uint64_t _ofs,
+                                   ceph::real_time _mtime) : data(std::move(_data)), ofs(_ofs), mtime(_mtime) {}
+
+
+        void encode(bufferlist &bl) const {
+          ENCODE_START(1, 1, bl);
+          encode(data, bl);
+          encode(ofs, bl);
+          encode(mtime, bl);
+          ENCODE_FINISH(bl);
+        }
+        void decode(bufferlist::const_iterator &bl) {
+          DECODE_START(1, bl);
+          decode(data, bl);
+          decode(ofs, bl);
+          decode(mtime, bl);
+          DECODE_FINISH(bl);
+        }
+      };
+      WRITE_CLASS_ENCODER(rados::cls::fifo::cls_fifo_part_list_entry_t)
+
+      struct fifo_part_header_t {
+        string tag;
+
+        fifo_data_params_t params;
+
+        uint64_t magic{0};
+
+        uint64_t min_ofs{0};
+        uint64_t max_ofs{0};
+        uint64_t min_index{0};
+        uint64_t max_index{0};
+
+        void encode(bufferlist &bl) const {
+          ENCODE_START(1, 1, bl);
+          encode(tag, bl);
+          encode(params, bl);
+          encode(magic, bl);
+          encode(min_ofs, bl);
+          encode(max_ofs, bl);
+          encode(min_index, bl);
+          encode(max_index, bl);
+          ENCODE_FINISH(bl);
+        }
+        void decode(bufferlist::const_iterator &bl) {
+          DECODE_START(1, bl);
+          decode(tag, bl);
+          decode(params, bl);
+          decode(magic, bl);
+          decode(min_ofs, bl);
+          decode(max_ofs, bl);
+          decode(min_index, bl);
+          decode(max_index, bl);
+          DECODE_FINISH(bl);
+        }
+      };
+      WRITE_CLASS_ENCODER(fifo_part_header_t)
+
+    }
+  }
+}
+
+static inline ostream& operator<<(ostream& os, const rados::cls::fifo::fifo_objv_t& objv)
+{
+  return os << objv.instance << "{" << objv.ver << "}";
+}
+
index 0d82c929f4c5a74081da63df9252c9f96c40fada..c261a313e3bb86fd544a2fd29ce642b7497b0df6 100644 (file)
@@ -8,6 +8,7 @@ target_include_directories(unit-main PRIVATE
   $<TARGET_PROPERTY:GTest::GTest,INTERFACE_INCLUDE_DIRECTORIES>)
 
 add_subdirectory(cls_hello)
+add_subdirectory(cls_fifo)
 add_subdirectory(cls_lock)
 add_subdirectory(cls_cas)
 add_subdirectory(cls_log)
diff --git a/src/test/cls_fifo/CMakeLists.txt b/src/test/cls_fifo/CMakeLists.txt
new file mode 100644 (file)
index 0000000..49487f8
--- /dev/null
@@ -0,0 +1,17 @@
# Unit-test binary exercising the cls_fifo objclass through its client API.
add_executable(ceph_test_cls_fifo
  test_cls_fifo.cc
  )
# Links the higher-level FIFO client library plus the shared librados
# test scaffolding (radostest-cxx provides pool create/destroy helpers).
target_link_libraries(ceph_test_cls_fifo
  cls_fifo_client
  librados
  global
  ${UNITTEST_LIBS}
  ${BLKID_LIBRARIES}
  ${CMAKE_DL_LIBS}
  ${CRYPTO_LIBS}
  ${EXTRALIBS}
  radostest-cxx
  )
install(TARGETS
  ceph_test_cls_fifo
  DESTINATION ${CMAKE_INSTALL_BINDIR})
diff --git a/src/test/cls_fifo/test_cls_fifo.cc b/src/test/cls_fifo/test_cls_fifo.cc
new file mode 100644 (file)
index 0000000..58286f4
--- /dev/null
@@ -0,0 +1,704 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
#include <errno.h>

#include <cstring>
#include <iostream>
#include <vector>

#include "include/types.h"
#include "include/rados/librados.hpp"

#include "test/librados/test_cxx.h"
#include "global/global_context.h"
+
+using namespace librados;
+
+#include "cls/fifo/cls_fifo_client.h"
+
+
+using namespace rados::cls::fifo;
+
+static CephContext *cct(librados::IoCtx& ioctx)
+{
+  return reinterpret_cast<CephContext *>(ioctx.cct());
+}
+
+static int fifo_create(IoCtx& ioctx,
+                       const string& oid,
+                       const string& id,
+                       const ClsFIFO::MetaCreateParams& params)
+{
+  ObjectWriteOperation op;
+
+  int r = ClsFIFO::meta_create(&op, id, params);
+  if (r < 0) {
+    return r;
+  }
+
+  return ioctx.operate(oid, &op);
+}
+
/* Validates meta_create parameter checking, idempotency, and the
 * exclusive/conflict error paths. */
TEST(ClsFIFO, TestCreate) {
  Rados cluster;
  std::string pool_name = get_temp_pool_name();
  ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
  IoCtx ioctx;
  cluster.ioctx_create(pool_name.c_str(), ioctx);

  string fifo_id = "fifo";
  string oid = fifo_id;

  /* an empty fifo id is rejected */
  ASSERT_EQ(-EINVAL, fifo_create(ioctx, oid, string(),
                                  ClsFIFO::MetaCreateParams()));

  /* zero-sized part/entry limits are rejected */
  ASSERT_EQ(-EINVAL, fifo_create(ioctx, oid, fifo_id,
                     ClsFIFO::MetaCreateParams()
                     .max_part_size(0)));

  ASSERT_EQ(-EINVAL, fifo_create(ioctx, oid, fifo_id,
                     ClsFIFO::MetaCreateParams()
                     .max_entry_size(0)));
  
  /* first successful create */
  ASSERT_EQ(0, fifo_create(ioctx, oid, fifo_id,
               ClsFIFO::MetaCreateParams()));

  uint64_t size;
  struct timespec ts;
  ASSERT_EQ(0, ioctx.stat2(oid, &size, &ts));
  ASSERT_GT(size, 0);

  /* test idempotency: an identical re-create succeeds without rewriting */
  ASSERT_EQ(0, fifo_create(ioctx, oid, fifo_id,
               ClsFIFO::MetaCreateParams()));

  uint64_t size2;
  struct timespec ts2;
  ASSERT_EQ(0, ioctx.stat2(oid, &size2, &ts2));
  ASSERT_EQ(size2, size);

  /* exclusive create on an existing fifo fails */
  ASSERT_EQ(-EEXIST, fifo_create(ioctx, oid, fifo_id,
               ClsFIFO::MetaCreateParams()
               .exclusive(true)));

  /* non-exclusive create with conflicting params also fails */
  ASSERT_EQ(-EEXIST, fifo_create(ioctx, oid, fifo_id,
               ClsFIFO::MetaCreateParams()
               .oid_prefix("myprefix")
               .exclusive(false)));

  /* ... and so does a create under a different fifo id on the same oid */
  ASSERT_EQ(-EEXIST, fifo_create(ioctx, oid, "foo",
               ClsFIFO::MetaCreateParams()
               .exclusive(false)));

  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
}
+
/* Validates meta_get: layout info is populated, the object version is
 * assigned, and a mismatched objv check returns -ECANCELED. */
TEST(ClsFIFO, TestGetInfo) {
  Rados cluster;
  std::string pool_name = get_temp_pool_name();
  ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
  IoCtx ioctx;
  cluster.ioctx_create(pool_name.c_str(), ioctx);

  string fifo_id = "fifo";
  string oid = fifo_id;

  fifo_info_t info;

  /* first successful create */
  ASSERT_EQ(0, fifo_create(ioctx, oid, fifo_id,
               ClsFIFO::MetaCreateParams()));

  uint32_t part_header_size;
  uint32_t part_entry_overhead;

  ASSERT_EQ(0, ClsFIFO::meta_get(ioctx, oid,
               ClsFIFO::MetaGetParams(), &info,
               &part_header_size, &part_entry_overhead));

  /* layout overheads must be reported as non-zero */
  ASSERT_GT(part_header_size, 0);
  ASSERT_GT(part_entry_overhead, 0);

  /* create must have stamped an object version instance */
  ASSERT_TRUE(!info.objv.instance.empty());

  /* fetching with the matching objv succeeds */
  ASSERT_EQ(0, ClsFIFO::meta_get(ioctx, oid,
               ClsFIFO::MetaGetParams()
               .objv(info.objv),
               &info,
               &part_header_size, &part_entry_overhead));

  /* fetching with a bogus objv is rejected */
  fifo_objv_t objv;
  objv.instance="foo";
  objv.ver = 12;
  ASSERT_EQ(-ECANCELED, ClsFIFO::meta_get(ioctx, oid,
               ClsFIFO::MetaGetParams()
               .objv(objv),
               &info,
               &part_header_size, &part_entry_overhead));

  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
}
+
/* Validates FIFO open() semantics with default params: operations before
 * open fail with -EINVAL, open(false) on a missing fifo returns -ENOENT,
 * and open(true) creates it. */
TEST(FIFO, TestOpenDefault) {
  Rados cluster;
  std::string pool_name = get_temp_pool_name();
  ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
  IoCtx ioctx;
  cluster.ioctx_create(pool_name.c_str(), ioctx);

  string fifo_id = "fifo";

  FIFO fifo(cct(ioctx), fifo_id, &ioctx);

  /* pre-open ops that should fail */
  ASSERT_EQ(-EINVAL, fifo.read_meta());

  bufferlist bl;
  ASSERT_EQ(-EINVAL, fifo.push(bl));

  ASSERT_EQ(-EINVAL, fifo.list(100, nullopt, nullptr, nullptr));
  ASSERT_EQ(-EINVAL, fifo.trim(string()));

  /* open without create on a non-existent fifo */
  ASSERT_EQ(-ENOENT, fifo.open(false));

  /* first successful create */
  ASSERT_EQ(0, fifo.open(true));

  ASSERT_EQ(0, fifo.read_meta()); /* force reading from backend */

  auto info = fifo.get_meta();

  ASSERT_EQ(info.id, fifo_id);

  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
}
+
/* Validates that custom create params (part/entry sizes, oid prefix,
 * explicit objv) survive a round trip through create + read_meta. */
TEST(FIFO, TestOpenParams) {
  Rados cluster;
  std::string pool_name = get_temp_pool_name();
  ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
  IoCtx ioctx;
  cluster.ioctx_create(pool_name.c_str(), ioctx);

  string fifo_id = "fifo";

  FIFO fifo(cct(ioctx), fifo_id, &ioctx);

  uint64_t max_part_size = 10 * 1024;
  uint64_t max_entry_size = 128;
  string oid_prefix = "foo.123.";

  /* caller-chosen object version to stamp on the new meta object */
  fifo_objv_t objv;
  objv.instance = "fooz";
  objv.ver = 10;


  /* first successful create */
  ASSERT_EQ(0, fifo.open(true,
                         ClsFIFO::MetaCreateParams()
                         .max_part_size(max_part_size)
                         .max_entry_size(max_entry_size)
                         .oid_prefix(oid_prefix)
                         .objv(objv)));

  ASSERT_EQ(0, fifo.read_meta()); /* force reading from backend */

  auto info = fifo.get_meta();

  /* all supplied params must be reflected back */
  ASSERT_EQ(info.id, fifo_id);
  ASSERT_EQ(info.data_params.max_part_size, max_part_size);
  ASSERT_EQ(info.data_params.max_entry_size, max_entry_size);
  ASSERT_EQ(info.objv, objv);

  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
}
+
+template <class T>
+static int decode_entry(fifo_entry& entry,
+                        T *val,
+                        string *marker)
+{
+  *marker = entry.marker;
+  auto iter = entry.data.cbegin();
+
+  try {
+    decode(*val, iter);
+  } catch (buffer::error& err) {
+    return -EIO;
+  }
+
+  return 0;
+}
+
+TEST(FIFO, TestPushListTrim) {
+  Rados cluster;
+  std::string pool_name = get_temp_pool_name();
+  ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+  IoCtx ioctx;
+  cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+  string fifo_id = "fifo";
+
+  FIFO fifo(cct(ioctx), fifo_id, &ioctx);
+
+  /* first successful create */
+  ASSERT_EQ(0, fifo.open(true));
+
+  uint32_t max_entries = 10;
+
+  for (uint32_t i = 0; i < max_entries; ++i) {
+    bufferlist bl;
+    encode(i, bl);
+    ASSERT_EQ(0, fifo.push(bl));
+  }
+
+  string marker;
+
+  /* get entries one by one */
+
+  for (uint32_t i = 0; i < max_entries; ++i) {
+    vector<fifo_entry> result;
+    bool more;
+    ASSERT_EQ(0, fifo.list(1, marker, &result, &more));
+
+    bool expected_more = (i != (max_entries - 1));
+    ASSERT_EQ(expected_more, more);
+    ASSERT_EQ(1, result.size());
+
+    uint32_t val;
+    ASSERT_EQ(0, decode_entry(result.front(), &val, &marker));
+
+    ASSERT_EQ(i, val);
+  }
+
+  /* get all entries at once */
+  vector<fifo_entry> result;
+  bool more;
+  ASSERT_EQ(0, fifo.list(max_entries * 10, string(), &result, &more));
+
+  ASSERT_FALSE(more);
+  ASSERT_EQ(max_entries, result.size());
+
+  string markers[max_entries];
+
+
+  for (uint32_t i = 0; i < max_entries; ++i) {
+    uint32_t val;
+
+    ASSERT_EQ(0, decode_entry(result[i], &val, &markers[i]));
+    ASSERT_EQ(i, val);
+  }
+
+  uint32_t min_entry = 0;
+
+  /* trim one entry */
+  fifo.trim(markers[min_entry]);
+  ++min_entry;
+
+  ASSERT_EQ(0, fifo.list(max_entries * 10, string(), &result, &more));
+
+  ASSERT_FALSE(more);
+  ASSERT_EQ(max_entries - min_entry, result.size());
+
+  for (uint32_t i = min_entry; i < max_entries; ++i) {
+    uint32_t val;
+
+    ASSERT_EQ(0, decode_entry(result[i - min_entry], &val, &markers[i]));
+    ASSERT_EQ(i, val);
+  }
+
+
+  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
+TEST(FIFO, TestPushTooBig) {
+  Rados cluster;
+  std::string pool_name = get_temp_pool_name();
+  ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+  IoCtx ioctx;
+  cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+  string fifo_id = "fifo";
+
+  FIFO fifo(cct(ioctx), fifo_id, &ioctx);
+
+  uint64_t max_part_size = 2048;
+  uint64_t max_entry_size = 128;
+
+  char buf[max_entry_size + 1];
+  memset(buf, 0, sizeof(buf));
+
+  /* first successful create */
+  ASSERT_EQ(0, fifo.open(true,
+                         ClsFIFO::MetaCreateParams()
+                         .max_part_size(max_part_size)
+                         .max_entry_size(max_entry_size)));
+
+  bufferlist bl;
+  bl.append(buf, sizeof(buf));
+
+  ASSERT_EQ(-EINVAL, fifo.push(bl));
+
+
+  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
+TEST(FIFO, TestMultipleParts) {
+  Rados cluster;
+  std::string pool_name = get_temp_pool_name();
+  ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+  IoCtx ioctx;
+  cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+  string fifo_id = "fifo";
+
+  FIFO fifo(cct(ioctx), fifo_id, &ioctx);
+
+  uint64_t max_part_size = 2048;
+  uint64_t max_entry_size = 128;
+
+  char buf[max_entry_size];
+  memset(buf, 0, sizeof(buf));
+
+  /* create */
+  ASSERT_EQ(0, fifo.open(true,
+                         ClsFIFO::MetaCreateParams()
+                         .max_part_size(max_part_size)
+                         .max_entry_size(max_entry_size)));
+
+  uint32_t part_header_size;
+  uint32_t part_entry_overhead;
+
+  fifo.get_part_layout_info(&part_header_size, &part_entry_overhead);
+
+  int entries_per_part = (max_part_size - part_header_size) / (max_entry_size + part_entry_overhead);
+
+  int max_entries = entries_per_part * 4 + 1;
+
+  /* push enough entries */
+  for (int i = 0; i < max_entries; ++i) {
+    bufferlist bl;
+
+    *(int *)buf = i;
+    bl.append(buf, sizeof(buf));
+
+    ASSERT_EQ(0, fifo.push(bl));
+  }
+
+  auto info = fifo.get_meta();
+
+  ASSERT_EQ(info.id, fifo_id);
+  ASSERT_GT(info.head_part_num, 0); /* head should have advanced */
+
+
+  /* list all at once */
+  vector<fifo_entry> result;
+  bool more;
+  ASSERT_EQ(0, fifo.list(max_entries, string(), &result, &more));
+  ASSERT_EQ(false, more);
+
+  ASSERT_EQ(max_entries, result.size());
+
+  for (int i = 0; i < max_entries; ++i) {
+    auto& bl = result[i].data;
+    ASSERT_EQ(i, *(int *)bl.c_str());
+  }
+
+
+  /* list one at a time */
+  string marker;
+  for (int i = 0; i < max_entries; ++i) {
+    ASSERT_EQ(0, fifo.list(1, marker, &result, &more));
+
+    ASSERT_EQ(result.size(), 1);
+    bool expected_more = (i != (max_entries - 1));
+    ASSERT_EQ(expected_more, more);
+
+    auto& entry = result[0];
+
+    auto& bl = entry.data;
+    marker = entry.marker;
+
+    ASSERT_EQ(i, *(int *)bl.c_str());
+  }
+
+  /* trim one at a time */
+  marker.clear();
+  for (int i = 0; i < max_entries; ++i) {
+    /* read single entry */
+    ASSERT_EQ(0, fifo.list(1, marker, &result, &more));
+
+    ASSERT_EQ(result.size(), 1);
+    bool expected_more = (i != (max_entries - 1));
+    ASSERT_EQ(expected_more, more);
+
+    marker = result[0].marker;
+
+    /* trim */
+    ASSERT_EQ(0, fifo.trim(marker));
+
+    /* check tail */
+    info = fifo.get_meta();
+    ASSERT_EQ(info.tail_part_num, i / entries_per_part);
+
+    /* try to read all again, see how many entries left */
+    ASSERT_EQ(0, fifo.list(max_entries, marker, &result, &more));
+    ASSERT_EQ(max_entries - i - 1, result.size());
+    ASSERT_EQ(false, more);
+  }
+
+  /* tail now should point at head */
+  info = fifo.get_meta();
+  ASSERT_EQ(info.head_part_num, info.tail_part_num);
+
+  fifo_part_info part_info;
+
+  /* check old tails are removed */
+  for (int i = 0; i < info.tail_part_num; ++i) {
+    ASSERT_EQ(-ENOENT, fifo.get_part_info(i, &part_info));
+  }
+
+  /* check curent tail exists */
+  ASSERT_EQ(0, fifo.get_part_info(info.tail_part_num, &part_info));
+
+  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
+TEST(FIFO, TestTwoPushers) {
+  Rados cluster;
+  std::string pool_name = get_temp_pool_name();
+  ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+  IoCtx ioctx;
+  cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+  string fifo_id = "fifo";
+
+  FIFO fifo(cct(ioctx), fifo_id, &ioctx);
+
+  uint64_t max_part_size = 2048;
+  uint64_t max_entry_size = 128;
+
+  char buf[max_entry_size];
+  memset(buf, 0, sizeof(buf));
+
+  /* create */
+  ASSERT_EQ(0, fifo.open(true,
+                         ClsFIFO::MetaCreateParams()
+                         .max_part_size(max_part_size)
+                         .max_entry_size(max_entry_size)));
+
+  uint32_t part_header_size;
+  uint32_t part_entry_overhead;
+
+  fifo.get_part_layout_info(&part_header_size, &part_entry_overhead);
+
+  int entries_per_part = (max_part_size - part_header_size) / (max_entry_size + part_entry_overhead);
+
+  int max_entries = entries_per_part * 4 + 1;
+
+  FIFO fifo2(cct(ioctx), fifo_id, &ioctx);
+
+  /* open second one */
+  ASSERT_EQ(0, fifo2.open(true,
+                         ClsFIFO::MetaCreateParams()));
+
+  vector<FIFO *> fifos(2);
+  fifos[0] = &fifo;
+  fifos[1] = &fifo2;
+
+  for (int i = 0; i < max_entries; ++i) {
+    bufferlist bl;
+
+    *(int *)buf = i;
+    bl.append(buf, sizeof(buf));
+
+    auto& f = fifos[i % fifos.size()];
+
+    ASSERT_EQ(0, f->push(bl));
+  }
+
+  /* list all by both */
+  vector<fifo_entry> result;
+  bool more;
+  ASSERT_EQ(0, fifo2.list(max_entries, string(), &result, &more));
+
+  ASSERT_EQ(false, more);
+
+  ASSERT_EQ(max_entries, result.size());
+
+  ASSERT_EQ(0, fifo.list(max_entries, string(), &result, &more));
+  ASSERT_EQ(false, more);
+
+  ASSERT_EQ(max_entries, result.size());
+
+  for (int i = 0; i < max_entries; ++i) {
+    auto& bl = result[i].data;
+    ASSERT_EQ(i, *(int *)bl.c_str());
+  }
+
+  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
+TEST(FIFO, TestTwoPushersTrim) {
+  Rados cluster;
+  std::string pool_name = get_temp_pool_name();
+  ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+  IoCtx ioctx;
+  cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+  string fifo_id = "fifo";
+
+  FIFO fifo1(cct(ioctx), fifo_id, &ioctx);
+
+  uint64_t max_part_size = 2048;
+  uint64_t max_entry_size = 128;
+
+  char buf[max_entry_size];
+  memset(buf, 0, sizeof(buf));
+
+  /* create */
+  ASSERT_EQ(0, fifo1.open(true,
+                          ClsFIFO::MetaCreateParams()
+                          .max_part_size(max_part_size)
+                          .max_entry_size(max_entry_size)));
+
+  uint32_t part_header_size;
+  uint32_t part_entry_overhead;
+
+  fifo1.get_part_layout_info(&part_header_size, &part_entry_overhead);
+
+  int entries_per_part = (max_part_size - part_header_size) / (max_entry_size + part_entry_overhead);
+
+  int max_entries = entries_per_part * 4 + 1;
+
+  FIFO fifo2(cct(ioctx), fifo_id, &ioctx);
+
+  /* open second one */
+  ASSERT_EQ(0, fifo2.open(true,
+                         ClsFIFO::MetaCreateParams()));
+
+  /* push one entry to fifo2 and the rest to fifo1 */
+
+  for (int i = 0; i < max_entries; ++i) {
+    bufferlist bl;
+
+    *(int *)buf = i;
+    bl.append(buf, sizeof(buf));
+
+    FIFO *f = (i < 1 ? &fifo2 : &fifo1);
+
+    ASSERT_EQ(0, f->push(bl));
+  }
+
+  /* trim half by fifo1 */
+  int num = max_entries / 2;
+
+  vector<fifo_entry> result;
+  bool more;
+  ASSERT_EQ(0, fifo1.list(num, string(), &result, &more));
+
+  ASSERT_EQ(true, more);
+  ASSERT_EQ(num, result.size());
+
+  for (int i = 0; i < num; ++i) {
+    auto& bl = result[i].data;
+    ASSERT_EQ(i, *(int *)bl.c_str());
+  }
+
+  auto& entry = result[num - 1];
+  auto& marker = entry.marker;
+
+  ASSERT_EQ(0, fifo1.trim(marker));
+
+  /* list what's left by fifo2 */
+
+  int left = max_entries - num;
+
+  ASSERT_EQ(0, fifo2.list(left, marker, &result, &more));
+  ASSERT_EQ(left, result.size());
+  ASSERT_EQ(false, more);
+
+  for (int i = num; i < max_entries; ++i) {
+    auto& bl = result[i - num].data;
+    ASSERT_EQ(i, *(int *)bl.c_str());
+  }
+
+  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
+TEST(FIFO, TestPushBatch) {
+  Rados cluster;
+  std::string pool_name = get_temp_pool_name();
+  ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+  IoCtx ioctx;
+  cluster.ioctx_create(pool_name.c_str(), ioctx);
+
+  string fifo_id = "fifo";
+
+  FIFO fifo(cct(ioctx), fifo_id, &ioctx);
+
+  uint64_t max_part_size = 2048;
+  uint64_t max_entry_size = 128;
+
+  char buf[max_entry_size];
+  memset(buf, 0, sizeof(buf));
+
+  /* create */
+  ASSERT_EQ(0, fifo.open(true,
+                          ClsFIFO::MetaCreateParams()
+                          .max_part_size(max_part_size)
+                          .max_entry_size(max_entry_size)));
+
+  uint32_t part_header_size;
+  uint32_t part_entry_overhead;
+
+  fifo.get_part_layout_info(&part_header_size, &part_entry_overhead);
+
+  int entries_per_part = (max_part_size - part_header_size) / (max_entry_size + part_entry_overhead);
+
+  int max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */
+
+  vector<bufferlist> bufs;
+
+  for (int i = 0; i < max_entries; ++i) {
+    bufferlist bl;
+
+    *(int *)buf = i;
+    bl.append(buf, sizeof(buf));
+
+    bufs.push_back(bl);
+  }
+
+  ASSERT_EQ(0, fifo.push(bufs));
+
+  /* list all */
+
+  vector<fifo_entry> result;
+  bool more;
+  ASSERT_EQ(0, fifo.list(max_entries, string(), &result, &more));
+
+  ASSERT_EQ(false, more);
+  ASSERT_EQ(max_entries, result.size());
+
+  for (int i = 0; i < max_entries; ++i) {
+    auto& bl = result[i].data;
+    ASSERT_EQ(i, *(int *)bl.c_str());
+  }
+
+  auto& info = fifo.get_meta();
+  ASSERT_EQ(info.head_part_num, 4);
+
+  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}