]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cls/queue: add 2-phase-commit queue implementation
authorYuval Lifshitz <yuvalif@yahoo.com>
Mon, 17 Feb 2020 16:45:15 +0000 (18:45 +0200)
committerYuval Lifshitz <yuvalif@yahoo.com>
Wed, 11 Mar 2020 20:28:52 +0000 (22:28 +0200)
2-phase commit (2pc) queue is based on the existing cls_queue
together with the abilities to:
* make a reservation of entries on the queue
* commit entries based on a reservation
* abort a reservation and release it
* get a list of all pending reservations (so they could be cleaned)

Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
14 files changed:
src/cls/2pc_queue/cls_2pc_queue.cc [new file with mode: 0644]
src/cls/2pc_queue/cls_2pc_queue_client.cc [new file with mode: 0644]
src/cls/2pc_queue/cls_2pc_queue_client.h [new file with mode: 0644]
src/cls/2pc_queue/cls_2pc_queue_const.h [new file with mode: 0644]
src/cls/2pc_queue/cls_2pc_queue_ops.h [new file with mode: 0644]
src/cls/2pc_queue/cls_2pc_queue_types.h [new file with mode: 0644]
src/cls/CMakeLists.txt
src/cls/queue/cls_queue_src.cc
src/cls/queue/cls_queue_src.h
src/cls/queue/cls_queue_types.h
src/osd/CMakeLists.txt
src/test/CMakeLists.txt
src/test/cls_2pc_queue/CMakeLists.txt [new file with mode: 0644]
src/test/cls_2pc_queue/test_cls_2pc_queue.cc [new file with mode: 0644]

diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc
new file mode 100644 (file)
index 0000000..539105f
--- /dev/null
@@ -0,0 +1,287 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/types.h"
+
+#include "cls/2pc_queue/cls_2pc_queue_types.h"
+#include "cls/2pc_queue/cls_2pc_queue_ops.h"
+#include "cls/2pc_queue/cls_2pc_queue_const.h"
+#include "cls/queue/cls_queue_ops.h"
+#include "cls/queue/cls_queue_src.h"
+#include "objclass/objclass.h"
+
+CLS_VER(1,0)
+CLS_NAME(2pc_queue)
+
+static int cls_2pc_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+  auto in_iter = in->cbegin();
+
+  cls_queue_init_op op;
+  try {
+    decode(op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_init: failed to decode entry: %s", err.what());
+    return -EINVAL;
+  }
+
+  cls_2pc_urgent_data urgent_data;
+
+  cls_queue_init_op init_op;
+
+  CLS_LOG(20, "INFO: cls_2pc_queue_init: max size is %lu (bytes)", op.queue_size);
+
+  init_op.queue_size = op.queue_size;
+  init_op.max_urgent_data_size = 23552; // overall head is 24KB ~ pending 1K reservations ops
+  encode(urgent_data, init_op.bl_urgent_data);
+
+  return queue_init(hctx, init_op);
+}
+
+static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+  cls_2pc_queue_reserve_op res_op;
+  try {
+    auto in_iter = in->cbegin();
+    decode(res_op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to decode entry: %s", err.what());
+    return -EINVAL;
+  }
+
+  if (res_op.size == 0) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: cannot reserve zero bytes");
+    return -EINVAL;
+  }
+  if (res_op.entries == 0) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: cannot reserve zero entries");
+    return -EINVAL;
+  }
+
+  // get head
+  cls_queue_head head;
+  int ret = queue_read_head(hctx, head);
+  if (ret < 0) {
+    return ret;
+  }
+
+  cls_2pc_urgent_data urgent_data;
+  try {
+    auto in_iter = head.bl_urgent_data.cbegin();
+    decode(urgent_data, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to decode entry: %s", err.what());
+    return -EINVAL;
+  }
+
+  const auto overhead = res_op.entries*QUEUE_ENTRY_OVERHEAD;
+  const auto remaining_size = (head.tail.offset >= head.front.offset) ?
+    (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size) :
+    head.front.offset - head.tail.offset;  
+
+
+  if (res_op.size + urgent_data.reserved_size + overhead > remaining_size) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: reservations exceeded maximum capacity");
+    CLS_LOG(10, "INFO: cls_2pc_queue_reserve: remaining size: %lu (bytes)", remaining_size);
+    CLS_LOG(10, "INFO: cls_2pc_queue_reserve: current reservations: %lu (bytes)", urgent_data.reserved_size);
+    CLS_LOG(10, "INFO: cls_2pc_queue_reserve: requested size: %lu (bytes)", res_op.size);
+    return -ENOSPC;
+  }
+
+  urgent_data.reserved_size += res_op.size + overhead;
+  bool result;
+  std::tie(std::ignore, result) = urgent_data.reservations.emplace(std::piecewise_construct,
+          std::forward_as_tuple(++urgent_data.last_id),
+          std::forward_as_tuple(res_op.size, ceph::real_clock::now()));
+  if (!result) {
+    // an old reservation that was never committed or aborted is in the map
+    // caller should try again assuming other IDs are ok
+    CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: reservation id conflict after rollover: %u", urgent_data.last_id);
+    return -EAGAIN;
+  }
+
+  // write back head
+  head.bl_urgent_data.clear();
+  encode(urgent_data, head.bl_urgent_data);
+
+  const uint64_t urgent_data_length = head.bl_urgent_data.length();
+
+  if (head.max_urgent_data_size < urgent_data_length) {
+    // TODO: use objects xattr for spillover
+    CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: urgent data size: %lu exceeded maximum: %lu", urgent_data_length, head.max_urgent_data_size);
+    return -ENOSPC;
+  }
+
+  ret = queue_write_head(hctx, head);
+  if (ret < 0) {
+    return ret;
+  }
+  
+  CLS_LOG(20, "INFO: cls_2pc_queue_reserve: remaining size: %lu (bytes)", remaining_size);
+  CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservations: %lu (bytes)", urgent_data.reserved_size);
+  CLS_LOG(20, "INFO: cls_2pc_queue_reserve: requested size: %lu (bytes)", res_op.size);
+  CLS_LOG(20, "INFO: cls_2pc_queue_reserve: urgent data size: %lu (bytes)", urgent_data_length);
+  CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation ops: %lu", urgent_data.reservations.size());
+
+  cls_2pc_queue_reserve_ret op_ret;
+  op_ret.id = urgent_data.last_id;
+  encode(op_ret, *out);
+
+  return 0;
+}
+
+static int cls_2pc_queue_commit(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+  cls_2pc_queue_commit_op commit_op;
+  try {
+    auto in_iter = in->cbegin();
+    decode(commit_op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to decode entry: %s", err.what());
+    return -EINVAL;
+  }
+  
+  // get head
+  cls_queue_head head;
+  int ret = queue_read_head(hctx, head);
+  if (ret < 0) {
+    return ret;
+  }
+
+  cls_2pc_urgent_data urgent_data;
+  try {
+    auto in_iter = head.bl_urgent_data.cbegin();
+    decode(urgent_data, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to decode entry: %s", err.what());
+    return -EINVAL;
+  }
+  
+  auto it = urgent_data.reservations.find(commit_op.id);
+  if (it == urgent_data.reservations.end()) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id);
+    return -ENOENT;
+  }
+
+  auto& res = it->second;
+  const auto actual_size = std::accumulate(commit_op.bl_data_vec.begin(), 
+          commit_op.bl_data_vec.end(), 0UL, [] (uint64_t sum, const bufferlist& bl) {
+            return sum + bl.length();
+          });
+   
+  if (res.size < actual_size) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_commit: trying to commit %lu bytes to a %lu bytes reservation", 
+            actual_size,
+            res.size);
+    return -EINVAL;
+  }
+
+  // commit the data to the queue
+  cls_queue_enqueue_op enqueue_op;
+  enqueue_op.bl_data_vec = std::move(commit_op.bl_data_vec);
+  ret = queue_enqueue(hctx, enqueue_op, head);
+  if (ret < 0) {
+    return ret;
+  }
+
+  // remove the reservation
+  urgent_data.reserved_size -= res.size;
+  urgent_data.reservations.erase(it);
+  
+  CLS_LOG(20, "INFO: cls_2pc_queue_commit: current reservations: %lu (bytes)", urgent_data.reserved_size);
+  CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation ops: %lu", urgent_data.reservations.size());
+  // write back head
+  head.bl_urgent_data.clear();
+  encode(urgent_data, head.bl_urgent_data);
+  return queue_write_head(hctx, head);
+}
+
+static int cls_2pc_queue_abort(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+  cls_2pc_queue_abort_op abort_op;
+  try {
+    auto in_iter = in->cbegin();
+    decode(abort_op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode entry: %s", err.what());
+    return -EINVAL;
+  }
+  
+  // get head
+  cls_queue_head head;
+  int ret = queue_read_head(hctx, head);
+  if (ret < 0) {
+    return ret;
+  }
+
+  cls_2pc_urgent_data urgent_data;
+  try {
+    auto in_iter = head.bl_urgent_data.cbegin();
+    decode(urgent_data, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode entry: %s", err.what());
+    return -EINVAL;
+  }
+  
+  auto it = urgent_data.reservations.find(abort_op.id);
+  if (it == urgent_data.reservations.end()) {
+    CLS_LOG(10, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id);
+    return 0;
+  }
+
+  // remove the reservation
+  urgent_data.reserved_size -= it->second.size;
+  urgent_data.reservations.erase(it);
+
+  CLS_LOG(20, "INFO: cls_2pc_queue_abort: current reservations: %lu (bytes)", urgent_data.reserved_size);
+  CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation ops: %lu", urgent_data.reservations.size());
+
+  // write back head
+  head.bl_urgent_data.clear();
+  encode(urgent_data, head.bl_urgent_data);
+  return queue_write_head(hctx, head);
+}
+
+static int cls_2pc_queue_list_reservations(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+  //get head
+  cls_queue_head head;
+  int ret = queue_read_head(hctx, head);
+  if (ret < 0) {
+    return ret;
+  }
+
+  cls_2pc_urgent_data urgent_data;
+  try {
+    auto in_iter = head.bl_urgent_data.cbegin();
+    decode(urgent_data, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_list_reservations: failed to decode entry: %s", err.what());
+    return -EINVAL;
+  }
+  
+  cls_2pc_queue_reservations_ret op_ret;
+  op_ret.reservations = std::move(urgent_data.reservations);
+
+  encode(op_ret, *out);
+
+  return 0;
+}
+
+CLS_INIT(2pc_queue)
+{
+  CLS_LOG(1, "Loaded 2pc queue class!");
+
+  cls_handle_t h_class;
+  cls_method_handle_t h_2pc_queue_init;
+  cls_method_handle_t h_2pc_queue_reserve;
+  cls_method_handle_t h_2pc_queue_commit;
+  cls_method_handle_t h_2pc_queue_abort;
+  cls_method_handle_t h_2pc_queue_list_reservations;
+
+  cls_register(TPC_QUEUE_CLASS, &h_class);
+
+  cls_register_cxx_method(h_class, TPC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_init, &h_2pc_queue_init);
+  cls_register_cxx_method(h_class, TPC_QUEUE_RESERVE, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_reserve, &h_2pc_queue_reserve);
+  cls_register_cxx_method(h_class, TPC_QUEUE_COMMIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_commit, &h_2pc_queue_commit);
+  cls_register_cxx_method(h_class, TPC_QUEUE_ABORT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_abort, &h_2pc_queue_abort);
+  cls_register_cxx_method(h_class, TPC_QUEUE_LIST_RESERVATIONS, CLS_METHOD_RD, cls_2pc_queue_list_reservations, &h_2pc_queue_list_reservations);
+
+  return;
+}
+
diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.cc b/src/cls/2pc_queue/cls_2pc_queue_client.cc
new file mode 100644 (file)
index 0000000..ebd7c4a
--- /dev/null
@@ -0,0 +1,143 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "cls/2pc_queue/cls_2pc_queue_client.h"
+#include "cls/2pc_queue/cls_2pc_queue_ops.h"
+#include "cls/2pc_queue/cls_2pc_queue_const.h"
+#include "cls/queue/cls_queue_ops.h"
+#include "cls/queue/cls_queue_const.h"
+
+using namespace librados;
+
+// Append a 2pc queue initialization call, with the given maximum size (bytes), to `op`.
+// queue_name is not used here: the target object is chosen when `op` is executed.
+void cls_2pc_queue_init(ObjectWriteOperation& op, const std::string& queue_name, uint64_t size) {
+  cls_queue_init_op init_op;
+  init_op.queue_size = size;
+
+  bufferlist payload;
+  encode(init_op, payload);
+  op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_INIT, payload);
+}
+
+int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const string& queue_name, uint64_t& size) {
+  bufferlist in, out;
+  const auto r = io_ctx.exec(queue_name, QUEUE_CLASS, QUEUE_GET_CAPACITY, in, out);
+  if (r < 0 ) {
+    return r;
+  }
+
+  cls_queue_get_capacity_ret op_ret;
+  auto iter = out.cbegin();
+  try {
+    decode(op_ret, iter);
+  } catch (buffer::error& err) {
+    return -EIO;
+  }
+
+  size = op_ret.queue_capacity;
+
+  return 0;
+}
+
+int cls_2pc_queue_reserve(IoCtx& io_ctx, const string& queue_name, librados::ObjectWriteOperation& op, 
+        uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id) {
+  bufferlist in, out;
+  cls_2pc_queue_reserve_op reserve_op;
+  reserve_op.size = res_size;
+  reserve_op.entries = entries;
+
+  encode(reserve_op, in);
+  int rval;
+  op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, &out, &rval);
+  const auto r = io_ctx.operate(queue_name, &op, librados::OPERATION_RETURNVEC);
+  if (r < 0) {
+    return r;
+  }
+
+  cls_2pc_queue_reserve_ret op_ret;
+  auto iter = out.cbegin();
+  try {
+    decode(op_ret, iter);
+  } catch (buffer::error& err) {
+    return -EIO;
+  }
+  res_id = op_ret.id;
+
+  return 0;
+}
+
+// Append to `op` a call that commits bl_data_vec using the reservation res_id.
+// The buffers are taken by value and moved into the encoded request.
+void cls_2pc_queue_commit(ObjectWriteOperation& op, std::vector<bufferlist> bl_data_vec,
+        cls_2pc_reservation::id_t res_id) {
+  cls_2pc_queue_commit_op request;
+  request.id = res_id;
+  request.bl_data_vec = std::move(bl_data_vec);
+
+  bufferlist payload;
+  encode(request, payload);
+  op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_COMMIT, payload);
+}
+
+// Append to `op` a call that aborts the reservation res_id.
+void cls_2pc_queue_abort(ObjectWriteOperation& op, cls_2pc_reservation::id_t res_id) {
+  cls_2pc_queue_abort_op request;
+  request.id = res_id;
+
+  bufferlist payload;
+  encode(request, payload);
+  op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_ABORT, payload);
+}
+
+int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const string& marker, uint32_t max,
+                            std::vector<cls_queue_entry>& entries,
+                            bool *truncated, std::string& next_marker) {
+  bufferlist in, out;
+  cls_queue_list_op op;
+  op.start_marker = marker;
+  op.max = max;
+  encode(op, in);
+
+  const auto r = io_ctx.exec(queue_name, QUEUE_CLASS, QUEUE_LIST_ENTRIES, in, out);
+  if (r < 0) {
+    return r;
+  }
+
+  cls_queue_list_ret ret;
+  auto iter = out.cbegin();
+  try {
+    decode(ret, iter);
+  } catch (buffer::error& err) {
+    return -EIO;
+  }
+
+  entries = std::move(ret.entries);
+  *truncated = ret.is_truncated;
+
+  next_marker = std::move(ret.next_marker);
+
+  return 0;
+}
+
+int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations) {
+  bufferlist in, out;
+
+  const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, out);
+  if (r < 0) {
+    return r;
+  }
+
+  cls_2pc_queue_reservations_ret ret;
+  auto iter = out.cbegin();
+  try {
+    decode(ret, iter);
+  } catch (buffer::error& err) {
+    return -EIO;
+  }
+
+  reservations = std::move(ret.reservations);
+
+  return 0;
+}
+
+// Append to `op` a call that removes queue entries up to end_marker.
+// The call is served by the underlying queue class.
+void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker) {
+  cls_queue_remove_op request;
+  request.end_marker = end_marker;
+
+  bufferlist payload;
+  encode(request, payload);
+  op.exec(QUEUE_CLASS, QUEUE_REMOVE_ENTRIES, payload);
+}
+
diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.h b/src/cls/2pc_queue/cls_2pc_queue_client.h
new file mode 100644 (file)
index 0000000..6b408e1
--- /dev/null
@@ -0,0 +1,47 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include "include/rados/librados.hpp"
+#include "cls/queue/cls_queue_types.h"
+#include "cls/2pc_queue/cls_2pc_queue_types.h"
+
+// initialize the queue with maximum size (bytes)
+// note that the actual size of the queue will be larger, as 24K bytes will be allocated in the head object
+// and more may be allocated as xattrs of the object (depending on the number of concurrent reservations)
+void cls_2pc_queue_init(librados::ObjectWriteOperation& op, const std::string& queue_name, uint64_t size);
+
+// return max capacity (bytes)
+int cls_2pc_queue_get_capacity(librados::IoCtx& io_ctx, const string& queue_name, uint64_t& size);
+
+// make a reservation on the queue (in bytes) and number of expected entries (to calculate overhead)
+// returns 0 and stores the new reservation id in res_id on success, a negative error code otherwise
+int cls_2pc_queue_reserve(librados::IoCtx& io_ctx, const std::string& queue_name, librados::ObjectWriteOperation& op, 
+        uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id);
+
+// commit data using a reservation done beforehand
+// res_id must be allocated using cls_2pc_queue_reserve, and could be either committed or aborted once
+// the size of bl_data_vec must be smaller than or equal to the size reserved for the res_id
+// note that the number of entries in bl_data_vec does not have to match the number of entries reserved
+// only size (including the overhead of the entries) is checked
+void cls_2pc_queue_commit(librados::ObjectWriteOperation& op, std::vector<bufferlist> bl_data_vec, 
+        cls_2pc_reservation::id_t res_id);
+
+// abort a reservation
+// res_id must be allocated using cls_2pc_queue_reserve
+void cls_2pc_queue_abort(librados::ObjectWriteOperation& op, 
+        cls_2pc_reservation::id_t res_id);
+
+// incremental listing of all entries in the queue
+int cls_2pc_queue_list_entries(librados::IoCtx& io_ctx, const std::string& queue_name, const std::string& marker, uint32_t max,
+        std::vector<cls_queue_entry>& entries, bool *truncated, std::string& next_marker);
+
+// list all pending reservations in the queue
+int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations);
+
+// remove all entries up to the given marker
+void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker);
+
diff --git a/src/cls/2pc_queue/cls_2pc_queue_const.h b/src/cls/2pc_queue/cls_2pc_queue_const.h
new file mode 100644 (file)
index 0000000..2a22ff1
--- /dev/null
@@ -0,0 +1,10 @@
+#pragma once
+
+#define TPC_QUEUE_CLASS "2pc_queue"
+
+#define TPC_QUEUE_INIT "2pc_queue_init"
+#define TPC_QUEUE_RESERVE "2pc_queue_reserve"
+#define TPC_QUEUE_COMMIT "2pc_queue_commit"
+#define TPC_QUEUE_ABORT "2pc_queue_abort"
+#define TPC_QUEUE_LIST_RESERVATIONS "2pc_queue_list_reservations"
+
diff --git a/src/cls/2pc_queue/cls_2pc_queue_ops.h b/src/cls/2pc_queue/cls_2pc_queue_ops.h
new file mode 100644 (file)
index 0000000..5a58dc6
--- /dev/null
@@ -0,0 +1,97 @@
+#pragma once
+
+#include "include/types.h"
+#include "cls_2pc_queue_types.h"
+
+// request sent to the reserve method
+// members are value-initialized so that an op that was not fully filled in
+// never encodes indeterminate values
+struct cls_2pc_queue_reserve_op {
+  uint64_t size{0};     // number of payload bytes to reserve
+  uint32_t entries{0};  // number of entries to reserve (used to calculate the overhead)
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(size, bl);
+    encode(entries, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(size, bl);
+    decode(entries, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_2pc_queue_reserve_op)
+
+// reply of the reserve method
+struct cls_2pc_queue_reserve_ret {
+  // allocated reservation id; starts as NO_ID so that a reply that was never
+  // filled in does not carry an indeterminate value
+  cls_2pc_reservation::id_t id{cls_2pc_reservation::NO_ID};
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(id, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(id, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_2pc_queue_reserve_ret)
+
+// request sent to the commit method
+struct cls_2pc_queue_commit_op {
+  // reservation to commit; starts as NO_ID so that an op that was never
+  // filled in does not carry an indeterminate value
+  cls_2pc_reservation::id_t id{cls_2pc_reservation::NO_ID};
+  std::vector<bufferlist> bl_data_vec; // the data to enqueue
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(id, bl);
+    encode(bl_data_vec, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(id, bl);
+    decode(bl_data_vec, bl);
+    DECODE_FINISH(bl);
+  }
+
+};
+WRITE_CLASS_ENCODER(cls_2pc_queue_commit_op)
+
+// request sent to the abort method
+struct cls_2pc_queue_abort_op {
+  // reservation to abort; starts as NO_ID so that an op that was never
+  // filled in does not carry an indeterminate value
+  cls_2pc_reservation::id_t id{cls_2pc_reservation::NO_ID};
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(id, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(id, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_2pc_queue_abort_op)
+
+// reply of the list reservations method
+struct cls_2pc_queue_reservations_ret {
+  cls_2pc_reservations reservations; // reservation list (keyed by id)
+
+  // versioned encoding holding the reservation map only
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(reservations, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  // must read the fields in the order encode() writes them
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(reservations, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_2pc_queue_reservations_ret)
+
diff --git a/src/cls/2pc_queue/cls_2pc_queue_types.h b/src/cls/2pc_queue/cls_2pc_queue_types.h
new file mode 100644 (file)
index 0000000..acf6534
--- /dev/null
@@ -0,0 +1,58 @@
+#pragma once
+
+#include "include/types.h"
+
+struct cls_2pc_reservation
+{
+  using id_t = uint32_t;
+  inline static const id_t NO_ID{0};
+  uint64_t size;        // how many entries are reserved
+  ceph::real_time timestamp;  // when the reservation was done (used for cleaning stale reservations)
+
+  cls_2pc_reservation(uint64_t _size, ceph::real_time _timestamp) :
+      size(_size), timestamp(_timestamp) {}
+
+  cls_2pc_reservation() = default;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(size, bl);
+    encode(timestamp, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(size, bl);
+    decode(timestamp, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_2pc_reservation)
+
+using cls_2pc_reservations = ceph::unordered_map<cls_2pc_reservation::id_t, cls_2pc_reservation>;
+
+// bookkeeping of the pending reservations
+// it is encoded into the urgent data buffer of the queue head (bl_urgent_data)
+struct cls_2pc_urgent_data
+{
+  uint64_t reserved_size{0};   // pending reservations size in bytes
+  cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID}; // last allocated id
+  cls_2pc_reservations reservations; // reservation list (keyed by id)
+
+  // versioned encoding: reserved_size, last_id, reservations - in that order
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(reserved_size, bl);
+    encode(last_id, bl);
+    encode(reservations, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  // must read the fields in the order encode() writes them
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(reserved_size, bl);
+    decode(last_id, bl);
+    decode(reservations, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_2pc_urgent_data)
+
index 00f077ce0ec1d2087a74a0edf8918e324ea8339c..72c3fd08bbe1d92d189021c83860a81ad0f1731b 100644 (file)
@@ -300,3 +300,21 @@ if (WITH_RADOSGW)
     rgw_gc/cls_rgw_gc_client.cc)
   add_library(cls_rgw_gc_client STATIC ${cls_rgw_gc_client_srcs})
 endif (WITH_RADOSGW)
+
+#cls_2pc_queue
+set(cls_2pc_queue_srcs
+  2pc_queue/cls_2pc_queue.cc
+  queue/cls_queue_src.cc
+  ${CMAKE_SOURCE_DIR}/src/common/ceph_json.cc)
+add_library(cls_2pc_queue SHARED ${cls_2pc_queue_srcs})
+set_target_properties(cls_2pc_queue PROPERTIES
+  VERSION "1.0.0"
+  SOVERSION "1"
+  INSTALL_RPATH ""
+  CXX_VISIBILITY_PRESET hidden)
+install(TARGETS cls_2pc_queue DESTINATION ${cls_dir})
+
+set(cls_2pc_queue_client_srcs
+  2pc_queue/cls_2pc_queue_client.cc)
+add_library(cls_2pc_queue_client STATIC ${cls_2pc_queue_client_srcs})
+
index b48dcd19f8517188d6fda54c39b1ef8ed5682143..879cfd2b4921364f800a878f511805efdb9b128c 100644 (file)
@@ -67,10 +67,9 @@ int queue_read_head(cls_method_context_t hctx, cls_queue_head& head)
     return -EINVAL;
   }
 
-  constexpr auto decoded_head_size = sizeof(queue_head_start) + sizeof(encoded_len);
-  if (encoded_len > (chunk_size - decoded_head_size)) {
+  if (encoded_len > (chunk_size - QUEUE_ENTRY_OVERHEAD)) {
     start_offset = chunk_size;
-    chunk_size = (encoded_len - (chunk_size - decoded_head_size));
+    chunk_size = (encoded_len - (chunk_size - QUEUE_ENTRY_OVERHEAD));
     bufferlist bl_remaining_head;
     const auto ret = cls_cxx_read2(hctx, start_offset, chunk_size, &bl_remaining_head, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
     if (ret < 0) {
@@ -139,6 +138,32 @@ int queue_get_capacity(cls_method_context_t hctx, cls_queue_get_capacity_ret& op
   return 0;
 }
 
+
+/*
+enqueue of new bufferlist happens in the free spaces of the queue, the queue can be in
+one of two states:
+
+(1) split free space
++-------------+--------------------------------------------------------------------+
+| object head |                XXXXXXXXXXXXXXXXXXXXXXXXXXX                         |
+|             |                ^                         ^                         |
+| front  tail |                |                         |                         |
++---+------+--+----------------|-------------------------|-------------------------+
+    |      |                   |                         |
+    |      +-------------------|-------------------------+
+    +--------------------------+
+
+(2) continuous free space
++-------------+--------------------------------------------------------------------+
+| object head |XXXXXXXXXXXXXXXXX                         XXXXXXXXXXXXXXXXXXXXXXXXXX|
+|             |                ^                         ^                         |
+| front  tail |                |                         |                         |
++---+------+--+----------------|-------------------------|-------------------------+
+    |      |                   |                         |
+    |      +-------------------+                         |
+    +----------------------------------------------------+
+*/
+
 int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head)
 {
   if ((head.front.offset == head.tail.offset) && (head.tail.gen == head.front.gen + 1)) {
@@ -314,7 +339,7 @@ int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, c
       }
       // Magic number + Data size - process if not done in previous iteration
       if (! entry_start_processed ) {
-        if (size_to_process >= (sizeof(uint16_t) + sizeof(uint64_t))) {
+        if (size_to_process >= QUEUE_ENTRY_OVERHEAD) {
           // Decode magic number at start
           try {
             decode(entry_start, it);
index 9111f1f0409c92e376e159d9f08dcffe1f4b9068..9970b98ea7cf6c9fef05c3ca22e9885f891b98ba 100644 (file)
@@ -1,6 +1,10 @@
 #ifndef CEPH_CLS_QUEUE_SRC_H
 #define CEPH_CLS_QUEUE_SRC_H
 
+#include "objclass/objclass.h"
+#include "cls/queue/cls_queue_types.h"
+#include "cls/queue/cls_queue_ops.h"
+
 int queue_write_head(cls_method_context_t hctx, cls_queue_head& head);
 int queue_read_head(cls_method_context_t hctx, cls_queue_head& head);
 int queue_init(cls_method_context_t hctx, const cls_queue_init_op& op);
@@ -9,4 +13,4 @@ int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue
 int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head);
 int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head);
 
-#endif /* CEPH_CLS_QUEUE_SRC_H */
\ No newline at end of file
+#endif /* CEPH_CLS_QUEUE_SRC_H */
index cf50cb163f45815241caefa0d23e8e90947d44eb..0a253bdfefb8ed19fc263e7b9b1fdb585704c8ab 100644 (file)
@@ -11,6 +11,7 @@
 
 constexpr unsigned int QUEUE_HEAD_START = 0xDEAD;
 constexpr unsigned int QUEUE_ENTRY_START = 0xBEEF;
+constexpr unsigned int QUEUE_ENTRY_OVERHEAD = sizeof(uint16_t) + sizeof(uint64_t);
 
 struct cls_queue_entry
 {
@@ -114,4 +115,4 @@ struct cls_queue_head
 };
 WRITE_CLASS_ENCODER(cls_queue_head)
 
-#endif
\ No newline at end of file
+#endif
index c993ac2ff1b64ffcd5e8a18fd496b51524ae1184..56844af949933c33c97c23babeb1a3de33462f13 100644 (file)
@@ -67,5 +67,5 @@ if(WITH_RBD)
   add_dependencies(osd cls_rbd)
 endif()
 if(WITH_RADOSGW)
-  add_dependencies(osd cls_otp cls_rgw cls_queue cls_rgw_gc)
+  add_dependencies(osd cls_otp cls_rgw cls_queue cls_rgw_gc cls_2pc_queue)
 endif()
index 82208219e7873aa5e14033d02134196ea9f432c0..f4157d184b96cd271357e357134f51dd53554ff5 100644 (file)
@@ -22,6 +22,7 @@ add_subdirectory(cls_version)
 add_subdirectory(cls_lua)
 add_subdirectory(cls_rgw_gc)
 add_subdirectory(cls_queue)
+add_subdirectory(cls_2pc_queue)
 add_subdirectory(common)
 add_subdirectory(compressor)
 add_subdirectory(crush)
diff --git a/src/test/cls_2pc_queue/CMakeLists.txt b/src/test/cls_2pc_queue/CMakeLists.txt
new file mode 100644 (file)
index 0000000..1b5a4ed
--- /dev/null
@@ -0,0 +1,18 @@
+add_executable(ceph_test_cls_2pc_queue
+  test_cls_2pc_queue.cc
+)
+target_link_libraries(ceph_test_cls_2pc_queue
+  cls_2pc_queue_client
+  cls_queue_client
+  librados
+  global
+  ${UNITTEST_LIBS}
+  ${EXTRALIBS}
+  ${BLKID_LIBRARIES}
+  ${CMAKE_DL_LIBS}
+  radostest-cxx)
+
+ install(TARGETS
+  ceph_test_cls_2pc_queue
+  DESTINATION ${CMAKE_INSTALL_BINDIR})
+
diff --git a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc
new file mode 100644 (file)
index 0000000..6ca8817
--- /dev/null
@@ -0,0 +1,704 @@
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/types.h"
+
+#include "cls/2pc_queue/cls_2pc_queue_types.h"
+#include "cls/2pc_queue/cls_2pc_queue_client.h"
+#include "cls/queue/cls_queue_client.h"
+#include "cls/2pc_queue/cls_2pc_queue_types.h"
+
+#include "gtest/gtest.h"
+#include "test/librados/test_cxx.h"
+#include "global/global_context.h"
+
+#include <string>
+#include <vector>
+#include <algorithm>
+#include <thread>
+#include <chrono>
+#include <atomic>
+
+// Fixture shared by all 2pc queue tests: each test case gets its own
+// temporary pool, created in SetUp() and destroyed again in TearDown().
+class TestCls2PCQueue : public ::testing::Test {
+protected:
+  librados::Rados rados;      // handle passed to the pool create/destroy helpers
+  std::string pool_name;      // name of the temporary pool used by the current test
+  librados::IoCtx ioctx;      // I/O context on that pool, used by the test bodies
+
+  // Creates the temporary pool and opens an I/O context on it.
+  void SetUp() override {
+    pool_name = get_temp_pool_name();
+    ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+    ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+  }
+
+  // Closes the I/O context before the pool it refers to is destroyed.
+  void TearDown() override {
+    ioctx.close();
+    ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+  }
+};
+
+// The capacity reported through the plain queue client must match the size
+// the 2pc queue was initialized with.
+TEST_F(TestCls2PCQueue, GetCapacity)
+{
+  const auto capacity_to_set = 8*1024;
+  const std::string name = "my-queue";
+
+  // create the queue object and initialize it in a single write operation
+  librados::ObjectWriteOperation init_op;
+  init_op.create(true);
+  cls_2pc_queue_init(init_op, name, capacity_to_set);
+  ASSERT_EQ(0, ioctx.operate(name, &init_op));
+
+  uint64_t reported_capacity;
+  ASSERT_EQ(0, cls_queue_get_capacity(ioctx, name, reported_capacity));
+  ASSERT_EQ(capacity_to_set, reported_capacity);
+}
+
+// Makes a number of reservations and checks that all of them are listed as
+// pending, each with a valid id and a non-zero timestamp.
+TEST_F(TestCls2PCQueue, Reserve)
+{
+  const auto bytes_per_reservation = 250U;
+  const auto entries_per_reservation = 23U;
+  const auto reservation_count = 10U;
+  const auto queue_capacity = 1024U*1024U;
+  const std::string name = "my-queue";
+
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, name, queue_capacity);
+  ASSERT_EQ(0, ioctx.operate(name, &op));
+
+  for (auto remaining = reservation_count; remaining > 0; --remaining) {
+    cls_2pc_reservation::id_t res_id;
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, name, op, bytes_per_reservation, entries_per_reservation, res_id), 0);
+    ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+  }
+
+  // nothing was committed or aborted, so every reservation is still pending
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, name, reservations));
+  ASSERT_EQ(reservations.size(), reservation_count);
+  for (auto it = reservations.begin(); it != reservations.end(); ++it) {
+    ASSERT_NE(it->first, cls_2pc_reservation::NO_ID);
+    ASSERT_GT(it->second.timestamp.time_since_epoch().count(), 0);
+  }
+}
+
+// Reserves room for a batch of entries and commits the batch, many times
+// over; once every batch is committed no reservation may be left pending.
+TEST_F(TestCls2PCQueue, Commit)
+{
+  const auto entries_per_batch = 23U;
+  const auto batch_count = 200U;
+  const auto queue_capacity = 1024*1024*128;
+  const std::string name = "my-queue";
+
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, name, queue_capacity);
+  ASSERT_EQ(0, ioctx.operate(name, &op));
+
+  for (auto batch = 0U; batch < batch_count; ++batch) {
+    const std::string element_prefix("op-" +to_string(batch) + "-element-");
+    // fill the batch while adding up the payload size that has to be reserved
+    std::vector<bufferlist> data(entries_per_batch);
+    auto total_size = 0UL;
+    auto index = 0;
+    for (auto& bl : data) {
+      bl.append(element_prefix + to_string(index++));
+      total_size += bl.length();
+    }
+
+    cls_2pc_reservation::id_t res_id;
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, name, op, total_size, entries_per_batch, res_id), 0);
+    ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+    cls_2pc_queue_commit(op, data, res_id);
+    ASSERT_EQ(0, ioctx.operate(name, &op));
+  }
+
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, name, reservations));
+  ASSERT_EQ(reservations.size(), 0);
+}
+
+// Every reservation is aborted right after it was made, so none may remain
+// pending at the end.
+TEST_F(TestCls2PCQueue, Abort)
+{
+  const auto bytes_per_reservation = 250U;
+  const auto entries_per_reservation = 23U;
+  const auto reservation_count = 17U;
+  const auto queue_capacity = 1024U*1024U;
+  const std::string name = "my-queue";
+
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, name, queue_capacity);
+  ASSERT_EQ(0, ioctx.operate(name, &op));
+
+  for (auto remaining = reservation_count; remaining > 0; --remaining) {
+    cls_2pc_reservation::id_t res_id;
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, name, op, bytes_per_reservation, entries_per_reservation, res_id), 0);
+    ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+    // give the reservation back immediately
+    cls_2pc_queue_abort(op, res_id);
+    ASSERT_EQ(0, ioctx.operate(name, &op));
+  }
+
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, name, reservations));
+  ASSERT_EQ(reservations.size(), 0);
+}
+
+// Exercises the failure paths of reserve: a reservation that does not fit
+// into the queue, a reservation of zero entries and a reservation of zero
+// bytes. Only the successful reservations may show up as pending.
+TEST_F(TestCls2PCQueue, ReserveError)
+{
+  const auto bytes_per_reservation = 1024U;
+  const auto entries_per_reservation = 1U;
+  const auto attempt_count = 254U;
+  const auto queue_capacity = 256U*1024U;
+  const std::string name = "my-queue";
+
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, name, queue_capacity);
+  ASSERT_EQ(0, ioctx.operate(name, &op));
+
+  cls_2pc_reservation::id_t res_id;
+  // all attempts except the last one are expected to succeed
+  for (auto remaining = attempt_count-1; remaining > 0; --remaining) {
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, name, op, bytes_per_reservation, entries_per_reservation, res_id), 0);
+    ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+  }
+
+  // the last attempt exceeds the queue size: it has to fail and must leave
+  // the id untouched
+  res_id = cls_2pc_reservation::NO_ID;
+  ASSERT_NE(cls_2pc_queue_reserve(ioctx, name, op, bytes_per_reservation, entries_per_reservation, res_id), 0);
+  ASSERT_EQ(res_id, cls_2pc_reservation::NO_ID);
+
+  // reserving 0 entries is rejected
+  ASSERT_NE(cls_2pc_queue_reserve(ioctx, name, op, bytes_per_reservation, 0, res_id), 0);
+  // reserving 0 bytes is rejected
+  ASSERT_NE(cls_2pc_queue_reserve(ioctx, name, op, 0, entries_per_reservation, res_id), 0);
+
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, name, reservations));
+  ASSERT_EQ(reservations.size(), attempt_count-1);
+  for (auto it = reservations.begin(); it != reservations.end(); ++it) {
+    ASSERT_NE(it->first, cls_2pc_reservation::NO_ID);
+    ASSERT_GT(it->second.timestamp.time_since_epoch().count(), 0);
+  }
+}
+
+// Commits must fail when they use an unknown reservation id or carry more
+// data than was reserved; the two reservations behind those failed commits
+// stay pending.
+TEST_F(TestCls2PCQueue, CommitError)
+{
+  const auto entries_per_batch = 23U;
+  const auto batch_count = 17U;
+  const auto queue_capacity = 1024*1024;
+  const std::string name = "my-queue";
+
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, name, queue_capacity);
+  ASSERT_EQ(0, ioctx.operate(name, &op));
+
+  // the two batches whose commit is made to fail
+  const auto invalid_reservation_op = 8;
+  const auto invalid_elements_op = 11;
+
+  // payload that is larger than what the regular batches reserve
+  std::vector<bufferlist> invalid_data(entries_per_batch+3);
+  auto invalid_index = 0;
+  for (auto& bl : invalid_data) {
+    bl.append("invalid data is larger that regular data" + to_string(invalid_index++));
+  }
+
+  for (auto i = 0U; i < batch_count; ++i) {
+    const std::string element_prefix("op-" +to_string(i) + "-element-");
+    // regular payload; its total size is what gets reserved
+    std::vector<bufferlist> data(entries_per_batch);
+    auto total_size = 0UL;
+    auto index = 0;
+    for (auto& bl : data) {
+      bl.append(element_prefix + to_string(index++));
+      total_size += bl.length();
+    }
+
+    cls_2pc_reservation::id_t res_id;
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, name, op, total_size, entries_per_batch, res_id), 0);
+    ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+
+    if (i == invalid_reservation_op) {
+      // commit with an invalid reservation id
+      cls_2pc_queue_commit(op, data, res_id+999);
+    } else if (i == invalid_elements_op) {
+      // commit with data that is larger than the reserved size
+      cls_2pc_queue_commit(op, invalid_data, res_id);
+    } else {
+      // regular commit, expected to succeed
+      cls_2pc_queue_commit(op, data, res_id);
+      ASSERT_EQ(0, ioctx.operate(name, &op));
+      continue;
+    }
+    // both invalid variants are expected to be rejected
+    ASSERT_NE(0, ioctx.operate(name, &op));
+  }
+
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, name, reservations));
+  // 2 reservations were not committed
+  ASSERT_EQ(reservations.size(), 2);
+}
+
+// Aborting with an unknown reservation id must succeed without touching
+// anything, which leaves exactly one reservation pending at the end.
+TEST_F(TestCls2PCQueue, AbortError)
+{
+  const auto bytes_per_reservation = 250U;
+  const auto entries_per_reservation = 23U;
+  const auto reservation_count = 17U;
+  const auto queue_capacity = 1024*1024;
+  const std::string name = "my-queue";
+
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, name, queue_capacity);
+  ASSERT_EQ(0, ioctx.operate(name, &op));
+
+  // the iteration in which a non-existing id is aborted
+  const auto invalid_reservation_op = 8;
+
+  for (auto i = 0U; i < reservation_count; ++i) {
+    cls_2pc_reservation::id_t res_id;
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, name, op, bytes_per_reservation, entries_per_reservation, res_id), 0);
+    ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+    // aborting a reservation that does not exist is a no-op, not an error,
+    // so the operation is expected to succeed in both cases
+    const auto id_to_abort = (i == invalid_reservation_op) ? res_id+999 : res_id;
+    cls_2pc_queue_abort(op, id_to_abort);
+    ASSERT_EQ(0, ioctx.operate(name, &op));
+  }
+
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, name, reservations));
+  // 1 reservation was not aborted
+  ASSERT_EQ(reservations.size(), 1);
+}
+
+// Several threads make reservations concurrently; afterwards the number of
+// pending reservations and their accumulated size must match what was asked for.
+TEST_F(TestCls2PCQueue, MultiReserve)
+{
+  const std::string queue_name = "my-queue";
+  const auto max_size = 1024*1024;
+  const auto number_of_ops = 11U;
+  const auto number_of_elements = 23U;
+  const auto max_producer_count = 10U;
+  const auto size_to_reserve = 250U;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  // work done by each producer thread, using its own operation object
+  const auto reserve_all = [this, &queue_name] {
+    librados::ObjectWriteOperation op;
+    for (auto i = 0U; i < number_of_ops; ++i) {
+      cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
+      ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+      ASSERT_NE(res_id, 0);
+    }
+  };
+
+  std::vector<std::thread> producers;
+  producers.reserve(max_producer_count);
+  for (auto n = 0U; n < max_producer_count; ++n) {
+    producers.emplace_back(reserve_all);
+  }
+  for (auto& p : producers) {
+    p.join();
+  }
+
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+  ASSERT_EQ(reservations.size(), number_of_ops*max_producer_count);
+  // add up the sizes of all pending reservations
+  auto total_reservations = 0U;
+  for (auto it = reservations.begin(); it != reservations.end(); ++it) {
+    total_reservations += it->second.size;
+  }
+  ASSERT_EQ(total_reservations, number_of_ops*max_producer_count*size_to_reserve);
+}
+
+// Several threads reserve and commit batches concurrently; no reservation
+// may be left pending once all of them are done.
+TEST_F(TestCls2PCQueue, MultiCommit)
+{
+  const std::string queue_name = "my-queue";
+  const auto max_size = 1024*1024;
+  const auto number_of_ops = 11U;
+  const auto number_of_elements = 23U;
+  const auto max_producer_count = 10U;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  // work done by each producer thread, using its own operation object
+  const auto produce = [this, &queue_name] {
+    librados::ObjectWriteOperation op;
+    for (auto i = 0U; i < number_of_ops; ++i) {
+      const std::string element_prefix("op-" +to_string(i) + "-element-");
+      // fill the batch while adding up the size to reserve for it
+      std::vector<bufferlist> data(number_of_elements);
+      auto total_size = 0UL;
+      auto index = 0;
+      for (auto& bl : data) {
+        bl.append(element_prefix + to_string(index++));
+        total_size += bl.length();
+      }
+      cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
+      ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0);
+      ASSERT_NE(res_id, 0);
+      cls_2pc_queue_commit(op, data, res_id);
+      ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+    }
+  };
+
+  std::vector<std::thread> producers;
+  producers.reserve(max_producer_count);
+  for (auto n = 0U; n < max_producer_count; ++n) {
+    producers.emplace_back(produce);
+  }
+  for (auto& p : producers) {
+    p.join();
+  }
+
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+  ASSERT_EQ(reservations.size(), 0);
+}
+
+// Several threads reserve and immediately abort concurrently; no reservation
+// may be left pending once all of them are done.
+TEST_F(TestCls2PCQueue, MultiAbort)
+{
+  const std::string queue_name = "my-queue";
+  const auto max_size = 1024*1024;
+  const auto number_of_ops = 11U;
+  const auto number_of_elements = 23U;
+  const auto max_producer_count = 10U;
+  const auto size_to_reserve = 250U;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  // work done by each producer thread, using its own operation object
+  const auto reserve_and_abort = [this, &queue_name] {
+    librados::ObjectWriteOperation op;
+    for (auto i = 0U; i < number_of_ops; ++i) {
+      cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
+      ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+      ASSERT_NE(res_id, 0);
+      cls_2pc_queue_abort(op, res_id);
+      ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+    }
+  };
+
+  std::vector<std::thread> producers;
+  producers.reserve(max_producer_count);
+  for (auto n = 0U; n < max_producer_count; ++n) {
+    producers.emplace_back(reserve_and_abort);
+  }
+  for (auto& p : producers) {
+    p.join();
+  }
+
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+  ASSERT_EQ(reservations.size(), 0);
+}
+
+// Reservations are made by several threads while a single committer thread
+// concurrently lists whatever is pending and commits it. The committer stops
+// once it has handled as many reservations as the reservers make in total,
+// and no reservation may be left pending afterwards.
+TEST_F(TestCls2PCQueue, ReserveCommit)
+{
+  const std::string queue_name = "my-queue";
+  const auto max_size = 1024*1024;
+  const auto number_of_ops = 11U;
+  const auto number_of_elements = 23U;
+  const auto max_workers = 10U;
+  const auto size_to_reserve = 512U;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  // reservers: each thread makes number_of_ops reservations and leaves them pending
+  std::vector<std::thread> reservers(max_workers);
+  for (auto& r : reservers) {
+    r = std::thread([this, &queue_name] {
+      librados::ObjectWriteOperation op;
+         for (auto i = 0U; i < number_of_ops; ++i) {
+        cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
+        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+        ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+      }
+    });
+  }
+
+  auto committer = std::thread([this, &queue_name] {
+    librados::ObjectWriteOperation op;
+    // one commit is expected for every reservation made by the reservers
+    int remaining_ops = number_of_ops*max_workers;
+    while (remaining_ops > 0) {
+      const std::string element_prefix("op-" +to_string(remaining_ops) + "-element-");
+      std::vector<bufferlist> data(number_of_elements);
+      // create vector of buffer lists
+      std::generate(data.begin(), data.end(), [j = 0, &element_prefix] () mutable {
+          bufferlist bl;
+          bl.append(element_prefix + to_string(j++));
+          return bl;
+        });
+      // commit the same payload against every reservation pending right now
+      cls_2pc_reservations reservations;
+      ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+      for (const auto& r : reservations) {
+        cls_2pc_queue_commit(op, data, r.first);
+        ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+        --remaining_ops;
+      }
+    }
+  });
+
+  std::for_each(reservers.begin(), reservers.end(), [](auto& r) { r.join(); });
+  committer.join();
+
+  // everything that was reserved has been committed by now
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+  ASSERT_EQ(reservations.size(), 0);
+}
+
+// Reservations are made by several threads while a single aborter thread
+// concurrently lists whatever is pending and aborts it. The aborter stops
+// once it has handled as many reservations as the reservers make in total,
+// and no reservation may be left pending afterwards.
+TEST_F(TestCls2PCQueue, ReserveAbort)
+{
+  const std::string queue_name = "my-queue";
+  const auto max_size = 1024*1024;
+  const auto number_of_ops = 17U;
+  const auto number_of_elements = 23U;
+  const auto max_workers = 10U;
+  const auto size_to_reserve = 250U;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  // reservers: each thread makes number_of_ops reservations and leaves them pending
+  std::vector<std::thread> reservers(max_workers);
+  for (auto& r : reservers) {
+    r = std::thread([this, &queue_name] {
+      librados::ObjectWriteOperation op;
+         for (auto i = 0U; i < number_of_ops; ++i) {
+        cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
+        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+        ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+      }
+    });
+  }
+  
+  auto aborter = std::thread([this, &queue_name] {
+    librados::ObjectWriteOperation op;
+    // one abort is expected for every reservation made by the reservers
+    int remaining_ops = number_of_ops*max_workers;
+    while (remaining_ops > 0) {
+      // abort every reservation pending right now, then poll again
+      cls_2pc_reservations reservations;
+      ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+      for (const auto& r : reservations) {
+        cls_2pc_queue_abort(op, r.first);
+        ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+        --remaining_ops;
+      }
+    }
+  });
+
+  std::for_each(reservers.begin(), reservers.end(), [](auto& r) { r.join(); });
+  aborter.join();
+
+  // everything that was reserved has been aborted by now
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+  ASSERT_EQ(reservations.size(), 0);
+}
+
+// Reservations are made by several threads while a single thread concurrently
+// lists the pending ones and either aborts or commits each of them depending
+// on its timestamp. Both outcomes must occur at least once and no reservation
+// may be left pending afterwards.
+TEST_F(TestCls2PCQueue, Cleanup)
+{
+  const std::string queue_name = "my-queue";
+  const auto max_size = 128*1024*1024;
+  const auto number_of_ops = 17U;
+  const auto number_of_elements = 23U;
+  const auto max_workers = 10U;
+  const auto size_to_reserve = 512U;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  // reservations stamped later than 100ms from now are the ones handled as
+  // "stale" (aborted) below; reservations stamped up to that point are committed
+  // NOTE(review): this is the reverse of the usual meaning of stale (older
+  // than a threshold) - confirm that it is intended
+  ceph::real_time stale_time = ceph::real_clock::now() + std::chrono::milliseconds(100);
+
+  std::vector<std::thread> reservers(max_workers);
+  for (auto& r : reservers) {
+    r = std::thread([this, &queue_name] {
+      librados::ObjectWriteOperation op;
+         for (auto i = 0U; i < number_of_ops; ++i) {
+        cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
+        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+        ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+        // wait for 10ms between each reservation so that the reservations of
+        // a thread spread over more than the 100ms used for stale_time
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      }
+    });
+  }
+  // both counters are only written by the aborter thread and read after it is joined
+  auto cleaned_reservations = 0U;
+  auto committed_reservations = 0U;
+  auto aborter = std::thread([this, &queue_name, &stale_time, &cleaned_reservations, &committed_reservations] {
+    librados::ObjectWriteOperation op;
+    // one abort or commit is expected for every reservation made by the reservers
+    int remaining_ops = number_of_ops*max_workers;
+    while (remaining_ops > 0) {
+      cls_2pc_reservations reservations;
+      ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+      for (const auto& r : reservations) {
+        if (r.second.timestamp > stale_time) {
+          // abort reservations stamped after stale_time
+          cls_2pc_queue_abort(op, r.first);
+          ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+          ++cleaned_reservations;
+        } else {
+          // commit good reservations
+          const std::string element_prefix("op-" +to_string(remaining_ops) + "-element-");
+          std::vector<bufferlist> data(number_of_elements);
+          // create vector of buffer lists
+          std::generate(data.begin(), data.end(), [j = 0, &element_prefix] () mutable {
+              bufferlist bl;
+              bl.append(element_prefix + to_string(j++));
+              return bl;
+            });
+          cls_2pc_queue_commit(op, data, r.first);
+          ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+          ++committed_reservations;
+        }
+        --remaining_ops;
+      }
+    }
+  });
+
+
+  std::for_each(reservers.begin(), reservers.end(), [](auto& r) { r.join(); });
+  aborter.join();
+
+  // both the abort path and the commit path must have been taken
+  ASSERT_GT(cleaned_reservations, 0);
+  ASSERT_GT(committed_reservations, 0);
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+  ASSERT_EQ(reservations.size(), 0);
+}
+
+// Several producers reserve and commit batches concurrently while a single
+// consumer lists and removes entries; the number of consumed entries must
+// equal the number of committed ones.
+TEST_F(TestCls2PCQueue, MultiProducer)
+{
+  const std::string queue_name = "my-queue";
+  const auto max_size = 128*1024*1024;
+  const auto number_of_ops = 300U;
+  const auto number_of_elements = 23U;
+  const auto max_producer_count = 10U;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  // number of producers still running: decremented by every producer thread
+  // and polled by the consumer thread, hence atomic
+  std::atomic<unsigned> producer_count{max_producer_count};
+
+  std::vector<std::thread> producers(max_producer_count);
+  for (auto& p : producers) {
+    p = std::thread([this, &queue_name, &producer_count] {
+      librados::ObjectWriteOperation op;
+      for (auto i = 0U; i < number_of_ops; ++i) {
+        const std::string element_prefix("op-" +to_string(i) + "-element-");
+        std::vector<bufferlist> data(number_of_elements);
+        auto total_size = 0UL;
+        // create vector of buffer lists
+        std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
+            bufferlist bl;
+            bl.append(element_prefix + to_string(j++));
+            total_size += bl.length();
+            return bl;
+          });
+        cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
+        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0);
+        ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+        cls_2pc_queue_commit(op, data, res_id);
+        ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+      }
+      // signal the consumer that this producer committed everything
+      --producer_count;
+    });
+  }
+
+  // only written by the consumer thread and read after it is joined
+  auto consume_count = 0U;
+  std::thread consumer([this, &queue_name, &consume_count, &producer_count] {
+    librados::ObjectWriteOperation op;
+    const auto max_elements = 42;
+    const std::string marker;
+    bool truncated = false;
+    std::string end_marker;
+    std::vector<cls_queue_entry> entries;
+    bool producers_running = true;
+    while (producers_running || truncated) {
+      // sample the producers' state *before* listing: if they were all done
+      // at this point, the listing below sees every committed entry, so the
+      // loop may only stop after a listing that started with no producer left
+      producers_running = producer_count > 0;
+      const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
+      ASSERT_EQ(0, ret);
+      consume_count += entries.size();
+      cls_2pc_queue_remove_entries(op, end_marker);
+      ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+    }
+  });
+
+  std::for_each(producers.begin(), producers.end(), [](auto& p) { p.join(); });
+  consumer.join();
+  ASSERT_EQ(consume_count, number_of_ops*number_of_elements*max_producer_count);
+}
+
+// Several producers reserve and commit batches into a small queue, retrying
+// while the queue is full, and several consumers list and remove entries
+// concurrently. At the end the queue must hold neither reservations nor entries.
+TEST_F(TestCls2PCQueue, MultiProducerConsumer)
+{
+  const std::string queue_name = "my-queue";
+  const auto max_size = 1024*1024;
+  const auto number_of_ops = 300U;
+  const auto number_of_elements = 23U;
+  const auto max_workers = 10U;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+
+  // number of producers still running: decremented by every producer thread
+  // and polled by the consumer threads, hence atomic
+  std::atomic<unsigned> producer_count{max_workers};
+
+  // set by any producer that found the queue full; written from several
+  // threads, hence atomic
+  std::atomic<bool> retry_happened{false};
+
+  std::vector<std::thread> producers(max_workers);
+  for (auto& p : producers) {
+    p = std::thread([this, &queue_name, &producer_count, &retry_happened] {
+      librados::ObjectWriteOperation op;
+      for (auto i = 0U; i < number_of_ops; ++i) {
+        const std::string element_prefix("op-" +to_string(i) + "-element-");
+        std::vector<bufferlist> data(number_of_elements);
+        auto total_size = 0UL;
+        // create vector of buffer lists
+        std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
+            bufferlist bl;
+            bl.append(element_prefix + to_string(j++));
+            total_size += bl.length();
+            return bl;
+          });
+        cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
+        auto rc = cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id);
+        while (rc != 0) {
+          // other errors should cause test to fail
+          ASSERT_EQ(rc, -ENOSPC);
+          ASSERT_EQ(res_id, cls_2pc_reservation::NO_ID);
+          // queue is full, sleep and retry
+          retry_happened = true;
+          std::this_thread::sleep_for(std::chrono::milliseconds(10));
+          rc = cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id);
+        }
+        ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+        cls_2pc_queue_commit(op, data, res_id);
+        ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+      }
+      // signal the consumers that this producer committed everything
+      --producer_count;
+    });
+  }
+
+  const auto max_elements = 128;
+  std::vector<std::thread> consumers(max_workers/2);
+  for (auto& c : consumers) {
+    c = std::thread([this, &queue_name, &producer_count] {
+      librados::ObjectWriteOperation op;
+      const std::string marker;
+      bool truncated = false;
+      std::string end_marker;
+      std::vector<cls_queue_entry> entries;
+      bool producers_running = true;
+      while (producers_running || truncated) {
+        // sample the producers' state *before* listing: if they were all done
+        // at this point, the listing below sees every remaining entry, so the
+        // loop may only stop after a listing that started with no producer left
+        producers_running = producer_count > 0;
+        const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
+        ASSERT_EQ(0, ret);
+        if (entries.empty()) {
+          // queue is empty, let it fill
+          std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        } else {
+          cls_2pc_queue_remove_entries(op, end_marker);
+          ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+        }
+      }
+    });
+  }
+
+  std::for_each(producers.begin(), producers.end(), [](auto& p) { p.join(); });
+  std::for_each(consumers.begin(), consumers.end(), [](auto& c) { c.join(); });
+  if (!retry_happened) {
+    // not a failure, but the "queue full" path was not exercised in this run
+    std::cerr << "Queue was never full - all reservations were successful. " <<
+        "Please decrease the amount of consumer threads" << std::endl;
+  }
+  // make sure that queue is empty and no reservations remain
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
+  ASSERT_EQ(reservations.size(), 0);
+  const std::string marker;
+  bool truncated = false;
+  std::string end_marker;
+  std::vector<cls_queue_entry> entries;
+  ASSERT_EQ(0, cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker));
+  ASSERT_EQ(entries.size(), 0);
+}
+