]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
cls/2pc_queue: add async API to read operations
authorYuval Lifshitz <ylifshit@redhat.com>
Tue, 12 May 2020 12:44:44 +0000 (15:44 +0300)
committerYuval Lifshitz <ylifshit@redhat.com>
Sun, 17 May 2020 12:15:31 +0000 (15:15 +0300)
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/cls/2pc_queue/cls_2pc_queue_client.cc
src/cls/2pc_queue/cls_2pc_queue_client.h
src/test/cls_2pc_queue/test_cls_2pc_queue.cc

index 45f31cf401a309b5d7b7616bee1e8e2b394c11a1..648988d6b80e46078e500ca2414724154fa24f0f 100644 (file)
@@ -17,6 +17,21 @@ void cls_2pc_queue_init(ObjectWriteOperation& op, const std::string& queue_name,
   op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_INIT, in);
 }
 
+int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size) {
+  cls_queue_get_capacity_ret op_ret;
+  auto iter = bl.cbegin();
+  try {
+    decode(op_ret, iter);
+  } catch (buffer::error& err) {
+    return -EIO;
+  }
+
+  size = op_ret.queue_capacity;
+
+  return 0;
+}
+
+#ifndef CLS_CLIENT_HIDE_IOCTX
 int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const string& queue_name, uint64_t& size) {
   bufferlist in, out;
   const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, out);
@@ -24,20 +39,32 @@ int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const string& queue_name, uint64_t
     return r;
   }
 
-  cls_queue_get_capacity_ret op_ret;
-  auto iter = out.cbegin();
+  return cls_2pc_queue_get_capacity_result(out, size);
+}
+#endif
+
+// optionally async method for getting capacity (bytes) 
+// after answer is received, call cls_2pc_queue_get_capacity_result() to prase the results
+void cls_2pc_queue_get_capacity(ObjectReadOperation& op, bufferlist* obl, int* prval) {
+  bufferlist in;
+  op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, obl, prval);
+}
+
+
+int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id) {
+  cls_2pc_queue_reserve_ret op_ret;
+  auto iter = bl.cbegin();
   try {
     decode(op_ret, iter);
   } catch (buffer::error& err) {
     return -EIO;
   }
-
-  size = op_ret.queue_capacity;
+  res_id = op_ret.id;
 
   return 0;
 }
 
-int cls_2pc_queue_reserve(IoCtx& io_ctx, const string& queue_name, librados::ObjectWriteOperation& op, 
+int cls_2pc_queue_reserve(IoCtx& io_ctx, const string& queue_name, 
         uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id) {
   bufferlist in, out;
   cls_2pc_queue_reserve_op reserve_op;
@@ -46,22 +73,25 @@ int cls_2pc_queue_reserve(IoCtx& io_ctx, const string& queue_name, librados::Obj
 
   encode(reserve_op, in);
   int rval;
+  ObjectWriteOperation op;
   op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, &out, &rval);
   const auto r = io_ctx.operate(queue_name, &op, librados::OPERATION_RETURNVEC);
+
   if (r < 0) {
     return r;
   }
+  
+  return cls_2pc_queue_reserve_result(out, res_id);
+}
 
-  cls_2pc_queue_reserve_ret op_ret;
-  auto iter = out.cbegin();
-  try {
-    decode(op_ret, iter);
-  } catch (buffer::error& err) {
-    return -EIO;
-  }
-  res_id = op_ret.id;
-
-  return 0;
+void cls_2pc_queue_reserve(ObjectWriteOperation& op, uint64_t res_size, 
+    uint32_t entries, bufferlist* obl, int* prval) {
+  bufferlist in;
+  cls_2pc_queue_reserve_op reserve_op;
+  reserve_op.size = res_size;
+  reserve_op.entries = entries;
+  encode(reserve_op, in);
+  op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, obl, prval);
 }
 
 void cls_2pc_queue_commit(ObjectWriteOperation& op, std::vector<bufferlist> bl_data_vec, 
@@ -82,22 +112,10 @@ void cls_2pc_queue_abort(ObjectWriteOperation& op, cls_2pc_reservation::id_t res
   op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_ABORT, in);
 }
 
-int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const string& marker, uint32_t max,
-                            std::vector<cls_queue_entry>& entries,
+int cls_2pc_queue_list_entries_result(const bufferlist& bl, std::vector<cls_queue_entry>& entries,
                             bool *truncated, std::string& next_marker) {
-  bufferlist in, out;
-  cls_queue_list_op op;
-  op.start_marker = marker;
-  op.max = max;
-  encode(op, in);
-
-  const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, out);
-  if (r < 0) {
-    return r;
-  }
-
   cls_queue_list_ret ret;
-  auto iter = out.cbegin();
+  auto iter = bl.cbegin();
   try {
     decode(ret, iter);
   } catch (buffer::error& err) {
@@ -112,16 +130,37 @@ int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const st
   return 0;
 }
 
-int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations) {
+#ifndef CLS_CLIENT_HIDE_IOCTX
+int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const string& marker, uint32_t max,
+                            std::vector<cls_queue_entry>& entries,
+                            bool *truncated, std::string& next_marker) {
   bufferlist in, out;
+  cls_queue_list_op op;
+  op.start_marker = marker;
+  op.max = max;
+  encode(op, in);
 
-  const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, out);
+  const auto r  = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, out);
   if (r < 0) {
     return r;
   }
+  return cls_2pc_queue_list_entries_result(out, entries, truncated, next_marker);
+}
+#endif
+
+void cls_2pc_queue_list_entries(ObjectReadOperation& op, const std::string& marker, uint32_t max, bufferlist* obl, int* prval) {
+  bufferlist in;
+  cls_queue_list_op list_op;
+  list_op.start_marker = marker;
+  list_op.max = max;
+  encode(list_op, in);
 
+  op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, obl, prval);
+}
+
+int cls_2pc_queue_list_reservations_result(const bufferlist& bl, cls_2pc_reservations& reservations) {
   cls_2pc_queue_reservations_ret ret;
-  auto iter = out.cbegin();
+  auto iter = bl.cbegin();
   try {
     decode(ret, iter);
   } catch (buffer::error& err) {
@@ -133,6 +172,24 @@ int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string&
   return 0;
 }
 
+#ifndef CLS_CLIENT_HIDE_IOCTX
+int cls_2pc_queue_list_reservations(IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations) {
+  bufferlist in, out;
+
+  const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, out);
+  if (r < 0) {
+    return r;
+  }
+  return cls_2pc_queue_list_reservations_result(out, reservations);
+}
+#endif
+
+void cls_2pc_queue_list_reservations(ObjectReadOperation& op, bufferlist* obl, int* prval) {
+  bufferlist in;
+
+  op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, obl, prval);
+}
+
 void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker) {
   bufferlist in;
   cls_queue_remove_op rem_op;
index 6b408e113f6936c029719548a1bf5fa510ad3026..a64805a191c2dacab6c65648da3d861131774719 100644 (file)
 // and more may be allocated as xattrs of the object (depending with the number of concurrent reservations)
 void cls_2pc_queue_init(librados::ObjectWriteOperation& op, const std::string& queue_name, uint64_t size);
 
-// return max capacity (bytes)
+// these overloads which call io_ctx.operate() or io_ctx.exec() should not be called in the rgw.
+// rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate()/exec()
+#ifndef CLS_CLIENT_HIDE_IOCTX
+// return capacity (bytes)
 int cls_2pc_queue_get_capacity(librados::IoCtx& io_ctx, const string& queue_name, uint64_t& size);
 
 // make a reservation on the queue (in bytes) and number of expected entries (to calculate overhead)
 // return a reservation id if reservations is possible, 0 otherwise
-int cls_2pc_queue_reserve(librados::IoCtx& io_ctx, const std::string& queue_name, librados::ObjectWriteOperation& op, 
+int cls_2pc_queue_reserve(librados::IoCtx& io_ctx, const std::string& queue_name, 
         uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id);
 
+// incremental listing of all entries in the queue
+int cls_2pc_queue_list_entries(librados::IoCtx& io_ctx, const std::string& queue_name, const std::string& marker, uint32_t max,
+        std::vector<cls_queue_entry>& entries, bool *truncated, std::string& next_marker);
+
+// list all pending reservations in the queue
+int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations);
+#endif
+
+// optionally async method for getting capacity (bytes) 
+// after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results
+void cls_2pc_queue_get_capacity(librados::ObjectReadOperation& op,  bufferlist* obl, int* prval);
+
+int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size);
+
+// optionally async method for making a reservation on the queue (in bytes) and number of expected entries (to calculate overhead)
+// notes: 
+// (1) make sure that librados::OPERATION_RETURNVEC is passed to the executing function
+// (2) multiple operations cannot be executed in a batch (operations both read and write)
+// after answer is received, call cls_2pc_queue_reserve_result() to parse the results
+void cls_2pc_queue_reserve(librados::ObjectWriteOperation& op, uint64_t res_size, 
+    uint32_t entries, bufferlist* obl, int* prval);
+
+int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id);
+
 // commit data using a reservation done beforehand
 // res_id must be allocated using cls_2pc_queue_reserve, and could be either committed or aborted once
 // the size of bl_data_vec must be equal or smaller to the size reserved for the res_id
@@ -35,12 +62,18 @@ void cls_2pc_queue_commit(librados::ObjectWriteOperation& op, std::vector<buffer
 void cls_2pc_queue_abort(librados::ObjectWriteOperation& op, 
         cls_2pc_reservation::id_t res_id);
 
-// incremental listing of all entries in the queue
-int cls_2pc_queue_list_entries(librados::IoCtx& io_ctx, const std::string& queue_name, const std::string& marker, uint32_t max,
-        std::vector<cls_queue_entry>& entries, bool *truncated, std::string& next_marker);
+// optionally async incremental listing of all entries in the queue
+// after answer is received, call cls_2pc_queue_list_entries_result() to parse the results
+void cls_2pc_queue_list_entries(librados::ObjectReadOperation& op, const std::string& marker, uint32_t max, bufferlist* obl, int* prval);
 
-// list all pending reservations in the queue
-int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations);
+int cls_2pc_queue_list_entries_result(const bufferlist& bl, std::vector<cls_queue_entry>& entries,
+                            bool *truncated, std::string& next_marker);
+
+// optionally async listing of all pending reservations in the queue
+// after answer is received, call cls_2pc_queue_list_reservations_result() to parse the results
+void cls_2pc_queue_list_reservations(librados::ObjectReadOperation& op, bufferlist* obl, int* prval);
+
+int cls_2pc_queue_list_reservations_result(const librados::bufferlist& bl, cls_2pc_reservations& reservations);
 
 // remove all entries up to the given marker
 void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker);
index 9b325b258d079b9324529d985d533d7456add7ea..05db8122a6676dcc9c3930c3dbe743f0a368cf30 100644 (file)
@@ -39,7 +39,7 @@ protected:
 
 TEST_F(TestCls2PCQueue, GetCapacity)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 8*1024;
   librados::ObjectWriteOperation op;
   op.create(true);
@@ -53,9 +53,29 @@ TEST_F(TestCls2PCQueue, GetCapacity)
   ASSERT_EQ(max_size, size);
 }
 
+TEST_F(TestCls2PCQueue, AsyncGetCapacity)
+{
+  const std::string queue_name = __PRETTY_FUNCTION__;
+  const auto max_size = 8*1024;
+  librados::ObjectWriteOperation wop;
+  wop.create(true);
+  cls_2pc_queue_init(wop, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+
+  librados::ObjectReadOperation rop;
+  bufferlist bl;
+  int rc;
+  cls_2pc_queue_get_capacity(rop, &bl, &rc);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr));
+  ASSERT_EQ(0, rc);
+  uint64_t size;
+  ASSERT_EQ(cls_2pc_queue_get_capacity_result(bl, size), 0);
+  ASSERT_EQ(max_size, size);
+}
+
 TEST_F(TestCls2PCQueue, Reserve)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 1024U*1024U;
   const auto number_of_ops = 10U;
   const auto number_of_elements = 23U;
@@ -67,8 +87,8 @@ TEST_F(TestCls2PCQueue, Reserve)
 
   for (auto i = 0U; i < number_of_ops; ++i) {
     cls_2pc_reservation::id_t res_id;
-    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
-    ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
+    ASSERT_EQ(res_id, i+1);
   }
   cls_2pc_reservations reservations;
   ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
@@ -79,9 +99,47 @@ TEST_F(TestCls2PCQueue, Reserve)
   }
 }
 
+TEST_F(TestCls2PCQueue, AsyncReserve)
+{
+  const std::string queue_name = __PRETTY_FUNCTION__;
+  const auto max_size = 1024U*1024U;
+  constexpr auto number_of_ops = 10U;
+  constexpr auto number_of_elements = 23U;
+  const auto size_to_reserve = 250U;
+  librados::ObjectWriteOperation wop;
+  wop.create(true);
+  cls_2pc_queue_init(wop, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+
+  for (auto i = 0U; i < number_of_ops; ++i) {
+    bufferlist res_bl;
+    int res_rc;
+    cls_2pc_queue_reserve(wop, size_to_reserve, number_of_elements, &res_bl, &res_rc);
+    ASSERT_EQ(0, ioctx.operate(queue_name, &wop, librados::OPERATION_RETURNVEC));
+    ASSERT_EQ(res_rc, 0);
+    cls_2pc_reservation::id_t res_id;
+    ASSERT_EQ(0, cls_2pc_queue_reserve_result(res_bl, res_id));
+    ASSERT_EQ(res_id, i+1);
+  }
+
+  bufferlist bl;
+  int rc;
+  librados::ObjectReadOperation rop;
+  cls_2pc_queue_list_reservations(rop, &bl, &rc);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr));
+  ASSERT_EQ(0, rc);
+  cls_2pc_reservations reservations;
+  ASSERT_EQ(0, cls_2pc_queue_list_reservations_result(bl, reservations));
+  ASSERT_EQ(reservations.size(), number_of_ops);
+  for (const auto& r : reservations) {
+      ASSERT_NE(r.first, cls_2pc_reservation::NO_ID);
+      ASSERT_GT(r.second.timestamp.time_since_epoch().count(), 0);
+  }
+}
+
 TEST_F(TestCls2PCQueue, Commit)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 1024*1024*128;
   const auto number_of_ops = 200U;
   const auto number_of_elements = 23U;
@@ -103,7 +161,7 @@ TEST_F(TestCls2PCQueue, Commit)
         });
 
     cls_2pc_reservation::id_t res_id;
-    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0);
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
     ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
     cls_2pc_queue_commit(op, data, res_id);
     ASSERT_EQ(0, ioctx.operate(queue_name, &op));
@@ -115,7 +173,7 @@ TEST_F(TestCls2PCQueue, Commit)
 
 TEST_F(TestCls2PCQueue, Abort)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 1024U*1024U;
   const auto number_of_ops = 17U;
   const auto number_of_elements = 23U;
@@ -127,7 +185,7 @@ TEST_F(TestCls2PCQueue, Abort)
 
   for (auto i = 0U; i < number_of_ops; ++i) {
     cls_2pc_reservation::id_t res_id;
-    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
     ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
     cls_2pc_queue_abort(op, res_id);
     ASSERT_EQ(0, ioctx.operate(queue_name, &op));
@@ -139,7 +197,7 @@ TEST_F(TestCls2PCQueue, Abort)
 
 TEST_F(TestCls2PCQueue, ReserveError)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 256U*1024U;
   const auto number_of_ops = 254U;
   const auto number_of_elements = 1U;
@@ -151,18 +209,18 @@ TEST_F(TestCls2PCQueue, ReserveError)
 
   cls_2pc_reservation::id_t res_id;
   for (auto i = 0U; i < number_of_ops-1; ++i) {
-    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
     ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
   }
   res_id = cls_2pc_reservation::NO_ID;
   // this one is failing because it exceeds the queue size
-  ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+  ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
   ASSERT_EQ(res_id, cls_2pc_reservation::NO_ID);
 
   // this one is failing because it tries to reserve 0 entries
-  ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, 0, res_id), 0);
+  ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, 0, res_id), 0);
   // this one is failing because it tries to reserve 0 bytes
-  ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, op, 0, number_of_elements, res_id), 0);
+  ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, 0, number_of_elements, res_id), 0);
 
   cls_2pc_reservations reservations;
   ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
@@ -175,7 +233,7 @@ TEST_F(TestCls2PCQueue, ReserveError)
 
 TEST_F(TestCls2PCQueue, CommitError)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 1024*1024;
   const auto number_of_ops = 17U;
   const auto number_of_elements = 23U;
@@ -206,7 +264,7 @@ TEST_F(TestCls2PCQueue, CommitError)
         });
 
     cls_2pc_reservation::id_t res_id;
-    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0);
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
     ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
     if (i == invalid_reservation_op) {
       // fail on a commits with invalid reservation id
@@ -229,7 +287,7 @@ TEST_F(TestCls2PCQueue, CommitError)
 
 TEST_F(TestCls2PCQueue, AbortError)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 1024*1024;
   const auto number_of_ops = 17U;
   const auto number_of_elements = 23U;
@@ -243,7 +301,7 @@ TEST_F(TestCls2PCQueue, AbortError)
 
   for (auto i = 0U; i < number_of_ops; ++i) {
     cls_2pc_reservation::id_t res_id;
-    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
     ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
     if (i == invalid_reservation_op) {
       // aborting a reservation which does not exists
@@ -262,7 +320,7 @@ TEST_F(TestCls2PCQueue, AbortError)
 
 TEST_F(TestCls2PCQueue, MultiReserve)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 1024*1024;
   const auto number_of_ops = 11U;
   const auto number_of_elements = 23U;
@@ -279,7 +337,7 @@ TEST_F(TestCls2PCQueue, MultiReserve)
       librados::ObjectWriteOperation op;
          for (auto i = 0U; i < number_of_ops; ++i) {
         cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
-        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
         ASSERT_NE(res_id, 0);
       }
     });
@@ -299,7 +357,7 @@ TEST_F(TestCls2PCQueue, MultiReserve)
 
 TEST_F(TestCls2PCQueue, MultiCommit)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 1024*1024;
   const auto number_of_ops = 11U;
   const auto number_of_elements = 23U;
@@ -325,7 +383,7 @@ TEST_F(TestCls2PCQueue, MultiCommit)
             return bl;
           });
         cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
-        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0);
+        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
         ASSERT_NE(res_id, 0);
         cls_2pc_queue_commit(op, data, res_id);
         ASSERT_EQ(0, ioctx.operate(queue_name, &op));
@@ -342,7 +400,7 @@ TEST_F(TestCls2PCQueue, MultiCommit)
 
 TEST_F(TestCls2PCQueue, MultiAbort)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 1024*1024;
   const auto number_of_ops = 11U;
   const auto number_of_elements = 23U;
@@ -359,7 +417,7 @@ TEST_F(TestCls2PCQueue, MultiAbort)
       librados::ObjectWriteOperation op;
          for (auto i = 0U; i < number_of_ops; ++i) {
         cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
-        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
         ASSERT_NE(res_id, 0);
         cls_2pc_queue_abort(op, res_id);
         ASSERT_EQ(0, ioctx.operate(queue_name, &op));
@@ -376,7 +434,7 @@ TEST_F(TestCls2PCQueue, MultiAbort)
 
 TEST_F(TestCls2PCQueue, ReserveCommit)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 1024*1024;
   const auto number_of_ops = 11U;
   const auto number_of_elements = 23U;
@@ -393,7 +451,7 @@ TEST_F(TestCls2PCQueue, ReserveCommit)
       librados::ObjectWriteOperation op;
          for (auto i = 0U; i < number_of_ops; ++i) {
         cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
-        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
         ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
       }
     });
@@ -431,7 +489,7 @@ TEST_F(TestCls2PCQueue, ReserveCommit)
 
 TEST_F(TestCls2PCQueue, ReserveAbort)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 1024*1024;
   const auto number_of_ops = 17U;
   const auto number_of_elements = 23U;
@@ -448,7 +506,7 @@ TEST_F(TestCls2PCQueue, ReserveAbort)
       librados::ObjectWriteOperation op;
          for (auto i = 0U; i < number_of_ops; ++i) {
         cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
-        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
         ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
       }
     });
@@ -478,7 +536,7 @@ TEST_F(TestCls2PCQueue, ReserveAbort)
 
 TEST_F(TestCls2PCQueue, Cleanup)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 128*1024*1024;
   const auto number_of_ops = 17U;
   const auto number_of_elements = 23U;
@@ -498,7 +556,7 @@ TEST_F(TestCls2PCQueue, Cleanup)
       librados::ObjectWriteOperation op;
          for (auto i = 0U; i < number_of_ops; ++i) {
         cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
-        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
         ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
         // wait for 10ms between each reservation to make sure at least some are stale
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
@@ -552,7 +610,7 @@ TEST_F(TestCls2PCQueue, Cleanup)
 
 TEST_F(TestCls2PCQueue, MultiProducer)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 128*1024*1024;
   const auto number_of_ops = 300U;
   const auto number_of_elements = 23U;
@@ -580,7 +638,7 @@ TEST_F(TestCls2PCQueue, MultiProducer)
             return bl;
           });
         cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
-        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0);
+        ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
         ASSERT_NE(res_id, 0);
         cls_2pc_queue_commit(op, data, res_id);
         ASSERT_EQ(0, ioctx.operate(queue_name, &op));
@@ -611,9 +669,66 @@ TEST_F(TestCls2PCQueue, MultiProducer)
   ASSERT_EQ(consume_count, number_of_ops*number_of_elements*max_producer_count);
 }
 
+TEST_F(TestCls2PCQueue, AsyncConsumer)
+{
+  const std::string queue_name = __PRETTY_FUNCTION__;
+  constexpr auto max_size = 128*1024*1024;
+  constexpr auto number_of_ops = 250U;
+  constexpr auto number_of_elements = 23U;
+  librados::ObjectWriteOperation wop;
+  wop.create(true);
+  cls_2pc_queue_init(wop, queue_name, max_size);
+  ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+
+
+  for (auto i = 0U; i < number_of_ops; ++i) {
+    const std::string element_prefix("op-" +to_string(i) + "-element-");
+    std::vector<bufferlist> data(number_of_elements);
+    auto total_size = 0UL;
+    // create vector of buffer lists
+    std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
+        bufferlist bl;
+        bl.append(element_prefix + to_string(j++));
+        total_size += bl.length();
+        return bl;
+        });
+    cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
+    ASSERT_NE(res_id, 0);
+    cls_2pc_queue_commit(wop, data, res_id);
+    ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+  }
+
+  constexpr auto max_elements = 42;
+  std::string marker;
+  std::string end_marker;
+  librados::ObjectReadOperation rop;
+  auto consume_count = 0U;
+  std::vector<cls_queue_entry> entries;
+       bool truncated = true;
+  while (truncated) {
+               bufferlist bl;
+               int rc;
+    cls_2pc_queue_list_entries(rop, marker, max_elements, &bl, &rc);
+    ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr));
+    ASSERT_EQ(rc, 0);
+    ASSERT_EQ(cls_2pc_queue_list_entries_result(bl, entries, &truncated, end_marker), 0);
+    consume_count += entries.size();
+    cls_2pc_queue_remove_entries(wop, end_marker); 
+               marker = end_marker;
+  }
+
+  ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
+       // execute all delete operations in a batch
+  ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+  // make sure that queue is empty
+  ASSERT_EQ(cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker), 0);
+  ASSERT_EQ(entries.size(), 0);
+}
+
 TEST_F(TestCls2PCQueue, MultiProducerConsumer)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 1024*1024;
   const auto number_of_ops = 300U;
   const auto number_of_elements = 23U;
@@ -643,7 +758,7 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer)
             return bl;
           });
         cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
-        auto rc = cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id);
+        auto rc = cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id);
         while (rc != 0) {
           // other errors should cause test to fail
           ASSERT_EQ(rc, -ENOSPC);
@@ -651,7 +766,7 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer)
           // queue is full, sleep and retry
           retry_happened = true;
           std::this_thread::sleep_for(std::chrono::milliseconds(10));
-          rc = cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id);
+          rc = cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id);
         };
         ASSERT_NE(res_id, 0);
         cls_2pc_queue_commit(op, data, res_id);
@@ -704,7 +819,7 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer)
 
 TEST_F(TestCls2PCQueue, ReserveSpillover)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 1024U*1024U;
   const auto number_of_ops = 1024U;
   const auto number_of_elements = 8U;
@@ -716,7 +831,7 @@ TEST_F(TestCls2PCQueue, ReserveSpillover)
 
   for (auto i = 0U; i < number_of_ops; ++i) {
     cls_2pc_reservation::id_t res_id;
-    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
     ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
   }
   cls_2pc_reservations reservations;
@@ -730,7 +845,7 @@ TEST_F(TestCls2PCQueue, ReserveSpillover)
 
 TEST_F(TestCls2PCQueue, CommitSpillover)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 1024U*1024U;
   const auto number_of_ops = 1024U;
   const auto number_of_elements = 4U;
@@ -742,7 +857,7 @@ TEST_F(TestCls2PCQueue, CommitSpillover)
 
   for (auto i = 0U; i < number_of_ops; ++i) {
     cls_2pc_reservation::id_t res_id;
-    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
     ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
   }
   cls_2pc_reservations reservations;
@@ -768,7 +883,7 @@ TEST_F(TestCls2PCQueue, CommitSpillover)
 
 TEST_F(TestCls2PCQueue, AbortSpillover)
 {
-  const std::string queue_name = "my-queue";
+  const std::string queue_name = __PRETTY_FUNCTION__;
   const auto max_size = 1024U*1024U;
   const auto number_of_ops = 1024U;
   const auto number_of_elements = 4U;
@@ -780,7 +895,7 @@ TEST_F(TestCls2PCQueue, AbortSpillover)
 
   for (auto i = 0U; i < number_of_ops; ++i) {
     cls_2pc_reservation::id_t res_id;
-    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+    ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
     ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
   }
   cls_2pc_reservations reservations;