op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_INIT, in);
}
+int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size) {
+ cls_queue_get_capacity_ret op_ret;
+ auto iter = bl.cbegin();
+ try {
+ decode(op_ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ size = op_ret.queue_capacity;
+
+ return 0;
+}
+
+#ifndef CLS_CLIENT_HIDE_IOCTX
int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const string& queue_name, uint64_t& size) {
bufferlist in, out;
const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, out);
return r;
}
- cls_queue_get_capacity_ret op_ret;
- auto iter = out.cbegin();
+ return cls_2pc_queue_get_capacity_result(out, size);
+}
+#endif
+
+// optionally async method for getting capacity (bytes)
+// after answer is received, call cls_2pc_queue_get_capacity_result() to prase the results
+void cls_2pc_queue_get_capacity(ObjectReadOperation& op, bufferlist* obl, int* prval) {
+ bufferlist in;
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, obl, prval);
+}
+
+
+int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id) {
+ cls_2pc_queue_reserve_ret op_ret;
+ auto iter = bl.cbegin();
try {
decode(op_ret, iter);
} catch (buffer::error& err) {
return -EIO;
}
-
- size = op_ret.queue_capacity;
+ res_id = op_ret.id;
return 0;
}
-int cls_2pc_queue_reserve(IoCtx& io_ctx, const string& queue_name, librados::ObjectWriteOperation& op,
+int cls_2pc_queue_reserve(IoCtx& io_ctx, const string& queue_name,
uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id) {
bufferlist in, out;
cls_2pc_queue_reserve_op reserve_op;
encode(reserve_op, in);
int rval;
+ ObjectWriteOperation op;
op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, &out, &rval);
const auto r = io_ctx.operate(queue_name, &op, librados::OPERATION_RETURNVEC);
+
if (r < 0) {
return r;
}
+
+ return cls_2pc_queue_reserve_result(out, res_id);
+}
- cls_2pc_queue_reserve_ret op_ret;
- auto iter = out.cbegin();
- try {
- decode(op_ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
- }
- res_id = op_ret.id;
-
- return 0;
+void cls_2pc_queue_reserve(ObjectWriteOperation& op, uint64_t res_size,
+ uint32_t entries, bufferlist* obl, int* prval) {
+ bufferlist in;
+ cls_2pc_queue_reserve_op reserve_op;
+ reserve_op.size = res_size;
+ reserve_op.entries = entries;
+ encode(reserve_op, in);
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, obl, prval);
}
void cls_2pc_queue_commit(ObjectWriteOperation& op, std::vector<bufferlist> bl_data_vec,
op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_ABORT, in);
}
-int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const string& marker, uint32_t max,
- std::vector<cls_queue_entry>& entries,
+int cls_2pc_queue_list_entries_result(const bufferlist& bl, std::vector<cls_queue_entry>& entries,
bool *truncated, std::string& next_marker) {
- bufferlist in, out;
- cls_queue_list_op op;
- op.start_marker = marker;
- op.max = max;
- encode(op, in);
-
- const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, out);
- if (r < 0) {
- return r;
- }
-
cls_queue_list_ret ret;
- auto iter = out.cbegin();
+ auto iter = bl.cbegin();
try {
decode(ret, iter);
} catch (buffer::error& err) {
return 0;
}
-int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations) {
+#ifndef CLS_CLIENT_HIDE_IOCTX
+int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const string& marker, uint32_t max,
+ std::vector<cls_queue_entry>& entries,
+ bool *truncated, std::string& next_marker) {
bufferlist in, out;
+ cls_queue_list_op op;
+ op.start_marker = marker;
+ op.max = max;
+ encode(op, in);
- const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, out);
+ const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, out);
if (r < 0) {
return r;
}
+ return cls_2pc_queue_list_entries_result(out, entries, truncated, next_marker);
+}
+#endif
+
+void cls_2pc_queue_list_entries(ObjectReadOperation& op, const std::string& marker, uint32_t max, bufferlist* obl, int* prval) {
+ bufferlist in;
+ cls_queue_list_op list_op;
+ list_op.start_marker = marker;
+ list_op.max = max;
+ encode(list_op, in);
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, obl, prval);
+}
+
+int cls_2pc_queue_list_reservations_result(const bufferlist& bl, cls_2pc_reservations& reservations) {
cls_2pc_queue_reservations_ret ret;
- auto iter = out.cbegin();
+ auto iter = bl.cbegin();
try {
decode(ret, iter);
} catch (buffer::error& err) {
return 0;
}
+#ifndef CLS_CLIENT_HIDE_IOCTX
+int cls_2pc_queue_list_reservations(IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations) {
+ bufferlist in, out;
+
+ const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, out);
+ if (r < 0) {
+ return r;
+ }
+ return cls_2pc_queue_list_reservations_result(out, reservations);
+}
+#endif
+
+void cls_2pc_queue_list_reservations(ObjectReadOperation& op, bufferlist* obl, int* prval) {
+ bufferlist in;
+
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, obl, prval);
+}
+
void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker) {
bufferlist in;
cls_queue_remove_op rem_op;
TEST_F(TestCls2PCQueue, GetCapacity)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 8*1024;
librados::ObjectWriteOperation op;
op.create(true);
ASSERT_EQ(max_size, size);
}
+TEST_F(TestCls2PCQueue, AsyncGetCapacity)
+{
+ const std::string queue_name = __PRETTY_FUNCTION__;
+ const auto max_size = 8*1024;
+ librados::ObjectWriteOperation wop;
+ wop.create(true);
+ cls_2pc_queue_init(wop, queue_name, max_size);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+
+ librados::ObjectReadOperation rop;
+ bufferlist bl;
+ int rc;
+ cls_2pc_queue_get_capacity(rop, &bl, &rc);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr));
+ ASSERT_EQ(0, rc);
+ uint64_t size;
+ ASSERT_EQ(cls_2pc_queue_get_capacity_result(bl, size), 0);
+ ASSERT_EQ(max_size, size);
+}
+
TEST_F(TestCls2PCQueue, Reserve)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 1024U*1024U;
const auto number_of_ops = 10U;
const auto number_of_elements = 23U;
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id;
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
- ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_EQ(res_id, i+1);
}
cls_2pc_reservations reservations;
ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
}
}
+TEST_F(TestCls2PCQueue, AsyncReserve)
+{
+ const std::string queue_name = __PRETTY_FUNCTION__;
+ const auto max_size = 1024U*1024U;
+ constexpr auto number_of_ops = 10U;
+ constexpr auto number_of_elements = 23U;
+ const auto size_to_reserve = 250U;
+ librados::ObjectWriteOperation wop;
+ wop.create(true);
+ cls_2pc_queue_init(wop, queue_name, max_size);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+
+ for (auto i = 0U; i < number_of_ops; ++i) {
+ bufferlist res_bl;
+ int res_rc;
+ cls_2pc_queue_reserve(wop, size_to_reserve, number_of_elements, &res_bl, &res_rc);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &wop, librados::OPERATION_RETURNVEC));
+ ASSERT_EQ(res_rc, 0);
+ cls_2pc_reservation::id_t res_id;
+ ASSERT_EQ(0, cls_2pc_queue_reserve_result(res_bl, res_id));
+ ASSERT_EQ(res_id, i+1);
+ }
+
+ bufferlist bl;
+ int rc;
+ librados::ObjectReadOperation rop;
+ cls_2pc_queue_list_reservations(rop, &bl, &rc);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr));
+ ASSERT_EQ(0, rc);
+ cls_2pc_reservations reservations;
+ ASSERT_EQ(0, cls_2pc_queue_list_reservations_result(bl, reservations));
+ ASSERT_EQ(reservations.size(), number_of_ops);
+ for (const auto& r : reservations) {
+ ASSERT_NE(r.first, cls_2pc_reservation::NO_ID);
+ ASSERT_GT(r.second.timestamp.time_since_epoch().count(), 0);
+ }
+}
+
TEST_F(TestCls2PCQueue, Commit)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 1024*1024*128;
const auto number_of_ops = 200U;
const auto number_of_elements = 23U;
});
cls_2pc_reservation::id_t res_id;
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
cls_2pc_queue_commit(op, data, res_id);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
TEST_F(TestCls2PCQueue, Abort)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 1024U*1024U;
const auto number_of_ops = 17U;
const auto number_of_elements = 23U;
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id;
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
cls_2pc_queue_abort(op, res_id);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
TEST_F(TestCls2PCQueue, ReserveError)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 256U*1024U;
const auto number_of_ops = 254U;
const auto number_of_elements = 1U;
cls_2pc_reservation::id_t res_id;
for (auto i = 0U; i < number_of_ops-1; ++i) {
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
}
res_id = cls_2pc_reservation::NO_ID;
// this one is failing because it exceeds the queue size
- ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_EQ(res_id, cls_2pc_reservation::NO_ID);
// this one is failing because it tries to reserve 0 entries
- ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, 0, res_id), 0);
+ ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, 0, res_id), 0);
// this one is failing because it tries to reserve 0 bytes
- ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, op, 0, number_of_elements, res_id), 0);
+ ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, 0, number_of_elements, res_id), 0);
cls_2pc_reservations reservations;
ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
TEST_F(TestCls2PCQueue, CommitError)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 1024*1024;
const auto number_of_ops = 17U;
const auto number_of_elements = 23U;
});
cls_2pc_reservation::id_t res_id;
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
if (i == invalid_reservation_op) {
// fail on a commits with invalid reservation id
TEST_F(TestCls2PCQueue, AbortError)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 1024*1024;
const auto number_of_ops = 17U;
const auto number_of_elements = 23U;
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id;
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
if (i == invalid_reservation_op) {
// aborting a reservation which does not exists
TEST_F(TestCls2PCQueue, MultiReserve)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 1024*1024;
const auto number_of_ops = 11U;
const auto number_of_elements = 23U;
librados::ObjectWriteOperation op;
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, 0);
}
});
TEST_F(TestCls2PCQueue, MultiCommit)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 1024*1024;
const auto number_of_ops = 11U;
const auto number_of_elements = 23U;
return bl;
});
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
ASSERT_NE(res_id, 0);
cls_2pc_queue_commit(op, data, res_id);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
TEST_F(TestCls2PCQueue, MultiAbort)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 1024*1024;
const auto number_of_ops = 11U;
const auto number_of_elements = 23U;
librados::ObjectWriteOperation op;
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, 0);
cls_2pc_queue_abort(op, res_id);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
TEST_F(TestCls2PCQueue, ReserveCommit)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 1024*1024;
const auto number_of_ops = 11U;
const auto number_of_elements = 23U;
librados::ObjectWriteOperation op;
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
}
});
TEST_F(TestCls2PCQueue, ReserveAbort)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 1024*1024;
const auto number_of_ops = 17U;
const auto number_of_elements = 23U;
librados::ObjectWriteOperation op;
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
}
});
TEST_F(TestCls2PCQueue, Cleanup)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 128*1024*1024;
const auto number_of_ops = 17U;
const auto number_of_elements = 23U;
librados::ObjectWriteOperation op;
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
// wait for 10ms between each reservation to make sure at least some are stale
std::this_thread::sleep_for(std::chrono::milliseconds(10));
TEST_F(TestCls2PCQueue, MultiProducer)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 128*1024*1024;
const auto number_of_ops = 300U;
const auto number_of_elements = 23U;
return bl;
});
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
ASSERT_NE(res_id, 0);
cls_2pc_queue_commit(op, data, res_id);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
ASSERT_EQ(consume_count, number_of_ops*number_of_elements*max_producer_count);
}
+TEST_F(TestCls2PCQueue, AsyncConsumer)
+{
+ const std::string queue_name = __PRETTY_FUNCTION__;
+ constexpr auto max_size = 128*1024*1024;
+ constexpr auto number_of_ops = 250U;
+ constexpr auto number_of_elements = 23U;
+ librados::ObjectWriteOperation wop;
+ wop.create(true);
+ cls_2pc_queue_init(wop, queue_name, max_size);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+
+
+ for (auto i = 0U; i < number_of_ops; ++i) {
+ const std::string element_prefix("op-" +to_string(i) + "-element-");
+ std::vector<bufferlist> data(number_of_elements);
+ auto total_size = 0UL;
+ // create vector of buffer lists
+ std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
+ bufferlist bl;
+ bl.append(element_prefix + to_string(j++));
+ total_size += bl.length();
+ return bl;
+ });
+ cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
+ ASSERT_NE(res_id, 0);
+ cls_2pc_queue_commit(wop, data, res_id);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+ }
+
+ constexpr auto max_elements = 42;
+ std::string marker;
+ std::string end_marker;
+ librados::ObjectReadOperation rop;
+ auto consume_count = 0U;
+ std::vector<cls_queue_entry> entries;
+ bool truncated = true;
+ while (truncated) {
+ bufferlist bl;
+ int rc;
+ cls_2pc_queue_list_entries(rop, marker, max_elements, &bl, &rc);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr));
+ ASSERT_EQ(rc, 0);
+ ASSERT_EQ(cls_2pc_queue_list_entries_result(bl, entries, &truncated, end_marker), 0);
+ consume_count += entries.size();
+ cls_2pc_queue_remove_entries(wop, end_marker);
+ marker = end_marker;
+ }
+
+ ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
+ // execute all delete operations in a batch
+ ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
+ // make sure that queue is empty
+ ASSERT_EQ(cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker), 0);
+ ASSERT_EQ(entries.size(), 0);
+}
+
TEST_F(TestCls2PCQueue, MultiProducerConsumer)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 1024*1024;
const auto number_of_ops = 300U;
const auto number_of_elements = 23U;
return bl;
});
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
- auto rc = cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id);
+ auto rc = cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id);
while (rc != 0) {
// other errors should cause test to fail
ASSERT_EQ(rc, -ENOSPC);
// queue is full, sleep and retry
retry_happened = true;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
- rc = cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id);
+ rc = cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id);
};
ASSERT_NE(res_id, 0);
cls_2pc_queue_commit(op, data, res_id);
TEST_F(TestCls2PCQueue, ReserveSpillover)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 1024U*1024U;
const auto number_of_ops = 1024U;
const auto number_of_elements = 8U;
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id;
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
}
cls_2pc_reservations reservations;
TEST_F(TestCls2PCQueue, CommitSpillover)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 1024U*1024U;
const auto number_of_ops = 1024U;
const auto number_of_elements = 4U;
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id;
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
}
cls_2pc_reservations reservations;
TEST_F(TestCls2PCQueue, AbortSpillover)
{
- const std::string queue_name = "my-queue";
+ const std::string queue_name = __PRETTY_FUNCTION__;
const auto max_size = 1024U*1024U;
const auto number_of_ops = 1024U;
const auto number_of_elements = 4U;
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id;
- ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0);
+ ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
}
cls_2pc_reservations reservations;