From: Yuval Lifshitz Date: Tue, 12 May 2020 12:44:44 +0000 (+0300) Subject: cls/2pc_queue: add async API to read operations X-Git-Tag: v16.1.0~2267^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F35020%2Fhead;p=ceph.git cls/2pc_queue: add async API to read operations Signed-off-by: Yuval Lifshitz --- diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.cc b/src/cls/2pc_queue/cls_2pc_queue_client.cc index 45f31cf401a3..648988d6b80e 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_client.cc +++ b/src/cls/2pc_queue/cls_2pc_queue_client.cc @@ -17,6 +17,21 @@ void cls_2pc_queue_init(ObjectWriteOperation& op, const std::string& queue_name, op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_INIT, in); } +int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size) { + cls_queue_get_capacity_ret op_ret; + auto iter = bl.cbegin(); + try { + decode(op_ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + size = op_ret.queue_capacity; + + return 0; +} + +#ifndef CLS_CLIENT_HIDE_IOCTX int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const string& queue_name, uint64_t& size) { bufferlist in, out; const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, out); @@ -24,20 +39,32 @@ int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const string& queue_name, uint64_t return r; } - cls_queue_get_capacity_ret op_ret; - auto iter = out.cbegin(); + return cls_2pc_queue_get_capacity_result(out, size); +} +#endif + +// optionally async method for getting capacity (bytes) +// after answer is received, call cls_2pc_queue_get_capacity_result() to prase the results +void cls_2pc_queue_get_capacity(ObjectReadOperation& op, bufferlist* obl, int* prval) { + bufferlist in; + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, obl, prval); +} + + +int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id) { + cls_2pc_queue_reserve_ret op_ret; + auto iter = bl.cbegin(); try { decode(op_ret, iter); } catch (buffer::error& err) { return -EIO; } - - size = op_ret.queue_capacity; + res_id = op_ret.id; return 0; } -int cls_2pc_queue_reserve(IoCtx& io_ctx, const string& queue_name, librados::ObjectWriteOperation& op, +int cls_2pc_queue_reserve(IoCtx& io_ctx, const string& queue_name, uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id) { bufferlist in, out; cls_2pc_queue_reserve_op reserve_op; @@ -46,22 +73,25 @@ int cls_2pc_queue_reserve(IoCtx& io_ctx, const string& queue_name, librados::Obj encode(reserve_op, in); int rval; + ObjectWriteOperation op; op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, &out, &rval); const auto r = io_ctx.operate(queue_name, &op, librados::OPERATION_RETURNVEC); + if (r < 0) { return r; } + + return cls_2pc_queue_reserve_result(out, res_id); +} - cls_2pc_queue_reserve_ret op_ret; - auto iter = out.cbegin(); - try { - decode(op_ret, iter); - } catch (buffer::error& err) { - return -EIO; - } - res_id = op_ret.id; - - return 0; +void cls_2pc_queue_reserve(ObjectWriteOperation& op, uint64_t res_size, + uint32_t entries, bufferlist* obl, int* prval) { + bufferlist in; + cls_2pc_queue_reserve_op reserve_op; + reserve_op.size = res_size; + reserve_op.entries = entries; + encode(reserve_op, in); + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, obl, prval); } void cls_2pc_queue_commit(ObjectWriteOperation& op, std::vector bl_data_vec, @@ -82,22 +112,10 @@ void cls_2pc_queue_abort(ObjectWriteOperation& op, cls_2pc_reservation::id_t res op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_ABORT, in); } -int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const string& marker, uint32_t max, - std::vector& entries, +int cls_2pc_queue_list_entries_result(const bufferlist& bl, std::vector& entries, bool *truncated, std::string& next_marker) { - bufferlist in, out; - cls_queue_list_op op; - op.start_marker = marker; - op.max = max; - encode(op, in); - - const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, out); - if (r < 0) { - return r; - } - cls_queue_list_ret ret; - auto iter = out.cbegin(); + auto iter = bl.cbegin(); try { decode(ret, iter); } catch (buffer::error& err) { @@ -112,16 +130,37 @@ int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const st return 0; } -int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations) { +#ifndef CLS_CLIENT_HIDE_IOCTX +int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const string& marker, uint32_t max, + std::vector& entries, + bool *truncated, std::string& next_marker) { bufferlist in, out; + cls_queue_list_op op; + op.start_marker = marker; + op.max = max; + encode(op, in); - const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, out); + const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, out); if (r < 0) { return r; } + return cls_2pc_queue_list_entries_result(out, entries, truncated, next_marker); +} +#endif + +void cls_2pc_queue_list_entries(ObjectReadOperation& op, const std::string& marker, uint32_t max, bufferlist* obl, int* prval) { + bufferlist in; + cls_queue_list_op list_op; + list_op.start_marker = marker; + list_op.max = max; + encode(list_op, in); + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, obl, prval); +} + +int cls_2pc_queue_list_reservations_result(const bufferlist& bl, cls_2pc_reservations& reservations) { cls_2pc_queue_reservations_ret ret; - auto iter = out.cbegin(); + auto iter = bl.cbegin(); try { decode(ret, iter); } catch (buffer::error& err) { @@ -133,6 +172,24 @@ int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& return 0; } +#ifndef CLS_CLIENT_HIDE_IOCTX +int cls_2pc_queue_list_reservations(IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations) { + bufferlist in, out; + + const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, out); + if (r < 0) { + return r; + } + return cls_2pc_queue_list_reservations_result(out, reservations); +} +#endif + +void cls_2pc_queue_list_reservations(ObjectReadOperation& op, bufferlist* obl, int* prval) { + bufferlist in; + + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, obl, prval); +} + void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker) { bufferlist in; cls_queue_remove_op rem_op; diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.h b/src/cls/2pc_queue/cls_2pc_queue_client.h index 6b408e113f69..a64805a191c2 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_client.h +++ b/src/cls/2pc_queue/cls_2pc_queue_client.h @@ -14,14 +14,41 @@ // and more may be allocated as xattrs of the object (depending with the number of concurrent reservations) void cls_2pc_queue_init(librados::ObjectWriteOperation& op, const std::string& queue_name, uint64_t size); -// return max capacity (bytes) +// these overloads which call io_ctx.operate() or io_ctx.exec() should not be called in the rgw. +// rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate()/exec() +#ifndef CLS_CLIENT_HIDE_IOCTX +// return capacity (bytes) int cls_2pc_queue_get_capacity(librados::IoCtx& io_ctx, const string& queue_name, uint64_t& size); // make a reservation on the queue (in bytes) and number of expected entries (to calculate overhead) // return a reservation id if reservations is possible, 0 otherwise -int cls_2pc_queue_reserve(librados::IoCtx& io_ctx, const std::string& queue_name, librados::ObjectWriteOperation& op, +int cls_2pc_queue_reserve(librados::IoCtx& io_ctx, const std::string& queue_name, uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id); +// incremental listing of all entries in the queue +int cls_2pc_queue_list_entries(librados::IoCtx& io_ctx, const std::string& queue_name, const std::string& marker, uint32_t max, + std::vector& entries, bool *truncated, std::string& next_marker); + +// list all pending reservations in the queue +int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations); +#endif + +// optionally async method for getting capacity (bytes) +// after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results +void cls_2pc_queue_get_capacity(librados::ObjectReadOperation& op, bufferlist* obl, int* prval); + +int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size); + +// optionally async method for making a reservation on the queue (in bytes) and number of expected entries (to calculate overhead) +// notes: +// (1) make sure that librados::OPERATION_RETURNVEC is passed to the executing function +// (2) multiple operations cannot be executed in a batch (operations both read and write) +// after answer is received, call cls_2pc_queue_reserve_result() to parse the results +void cls_2pc_queue_reserve(librados::ObjectWriteOperation& op, uint64_t res_size, + uint32_t entries, bufferlist* obl, int* prval); + +int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id); + // commit data using a reservation done beforehand // res_id must be allocated using cls_2pc_queue_reserve, and could be either committed or aborted once // the size of bl_data_vec must be equal or smaller to the size reserved for the res_id @@ -35,12 +62,18 @@ void cls_2pc_queue_commit(librados::ObjectWriteOperation& op, std::vector& entries, bool *truncated, std::string& next_marker); +// optionally async incremental listing of all entries in the queue +// after answer is received, call cls_2pc_queue_list_entries_result() to parse the results +void cls_2pc_queue_list_entries(librados::ObjectReadOperation& op, const std::string& marker, uint32_t max, bufferlist* obl, int* prval); -// list all pending reservations in the queue -int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations); +int cls_2pc_queue_list_entries_result(const bufferlist& bl, std::vector& entries, + bool *truncated, std::string& next_marker); + +// optionally async listing of all pending reservations in the queue +// after answer is received, call cls_2pc_queue_list_reservations_result() to parse the results +void cls_2pc_queue_list_reservations(librados::ObjectReadOperation& op, bufferlist* obl, int* prval); + +int cls_2pc_queue_list_reservations_result(const librados::bufferlist& bl, cls_2pc_reservations& reservations); // remove all entries up to the given marker void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker); diff --git a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc index 9b325b258d07..05db8122a667 100644 --- a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc +++ b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc @@ -39,7 +39,7 @@ protected: TEST_F(TestCls2PCQueue, GetCapacity) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 8*1024; librados::ObjectWriteOperation op; op.create(true); @@ -53,9 +53,29 @@ TEST_F(TestCls2PCQueue, GetCapacity) ASSERT_EQ(max_size, size); } +TEST_F(TestCls2PCQueue, AsyncGetCapacity) +{ + const std::string queue_name = __PRETTY_FUNCTION__; + const auto max_size = 8*1024; + librados::ObjectWriteOperation wop; + wop.create(true); + cls_2pc_queue_init(wop, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &wop)); + + librados::ObjectReadOperation rop; + bufferlist bl; + int rc; + cls_2pc_queue_get_capacity(rop, &bl, &rc); + ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr)); + ASSERT_EQ(0, rc); + uint64_t size; + ASSERT_EQ(cls_2pc_queue_get_capacity_result(bl, size), 0); + ASSERT_EQ(max_size, size); +} + TEST_F(TestCls2PCQueue, Reserve) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 1024U*1024U; const auto number_of_ops = 10U; const auto number_of_elements = 23U; @@ -67,8 +87,8 @@ TEST_F(TestCls2PCQueue, Reserve) for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id; - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); - ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_EQ(res_id, i+1); } cls_2pc_reservations reservations; ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); @@ -79,9 +99,47 @@ TEST_F(TestCls2PCQueue, Reserve) } } +TEST_F(TestCls2PCQueue, AsyncReserve) +{ + const std::string queue_name = __PRETTY_FUNCTION__; + const auto max_size = 1024U*1024U; + constexpr auto number_of_ops = 10U; + constexpr auto number_of_elements = 23U; + const auto size_to_reserve = 250U; + librados::ObjectWriteOperation wop; + wop.create(true); + cls_2pc_queue_init(wop, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &wop)); + + for (auto i = 0U; i < number_of_ops; ++i) { + bufferlist res_bl; + int res_rc; + cls_2pc_queue_reserve(wop, size_to_reserve, number_of_elements, &res_bl, &res_rc); + ASSERT_EQ(0, ioctx.operate(queue_name, &wop, librados::OPERATION_RETURNVEC)); + ASSERT_EQ(res_rc, 0); + cls_2pc_reservation::id_t res_id; + ASSERT_EQ(0, cls_2pc_queue_reserve_result(res_bl, res_id)); + ASSERT_EQ(res_id, i+1); + } + + bufferlist bl; + int rc; + librados::ObjectReadOperation rop; + cls_2pc_queue_list_reservations(rop, &bl, &rc); + ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr)); + ASSERT_EQ(0, rc); + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations_result(bl, reservations)); + ASSERT_EQ(reservations.size(), number_of_ops); + for (const auto& r : reservations) { + ASSERT_NE(r.first, cls_2pc_reservation::NO_ID); + ASSERT_GT(r.second.timestamp.time_since_epoch().count(), 0); + } +} + TEST_F(TestCls2PCQueue, Commit) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 1024*1024*128; const auto number_of_ops = 200U; const auto number_of_elements = 23U; @@ -103,7 +161,7 @@ TEST_F(TestCls2PCQueue, Commit) }); cls_2pc_reservation::id_t res_id; - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0); ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); cls_2pc_queue_commit(op, data, res_id); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); @@ -115,7 +173,7 @@ TEST_F(TestCls2PCQueue, Commit) TEST_F(TestCls2PCQueue, Abort) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 1024U*1024U; const auto number_of_ops = 17U; const auto number_of_elements = 23U; @@ -127,7 +185,7 @@ TEST_F(TestCls2PCQueue, Abort) for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id; - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); cls_2pc_queue_abort(op, res_id); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); @@ -139,7 +197,7 @@ TEST_F(TestCls2PCQueue, Abort) TEST_F(TestCls2PCQueue, ReserveError) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 256U*1024U; const auto number_of_ops = 254U; const auto number_of_elements = 1U; @@ -151,18 +209,18 @@ TEST_F(TestCls2PCQueue, ReserveError) cls_2pc_reservation::id_t res_id; for (auto i = 0U; i < number_of_ops-1; ++i) { - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); } res_id = cls_2pc_reservation::NO_ID; // this one is failing because it exceeds the queue size - ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_EQ(res_id, cls_2pc_reservation::NO_ID); // this one is failing because it tries to reserve 0 entries - ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, 0, res_id), 0); + ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, 0, res_id), 0); // this one is failing because it tries to reserve 0 bytes - ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, op, 0, number_of_elements, res_id), 0); + ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, 0, number_of_elements, res_id), 0); cls_2pc_reservations reservations; ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); @@ -175,7 +233,7 @@ TEST_F(TestCls2PCQueue, ReserveError) TEST_F(TestCls2PCQueue, CommitError) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 1024*1024; const auto number_of_ops = 17U; const auto number_of_elements = 23U; @@ -206,7 +264,7 @@ TEST_F(TestCls2PCQueue, CommitError) }); cls_2pc_reservation::id_t res_id; - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0); ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); if (i == invalid_reservation_op) { // fail on a commits with invalid reservation id @@ -229,7 +287,7 @@ TEST_F(TestCls2PCQueue, CommitError) TEST_F(TestCls2PCQueue, AbortError) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 1024*1024; const auto number_of_ops = 17U; const auto number_of_elements = 23U; @@ -243,7 +301,7 @@ TEST_F(TestCls2PCQueue, AbortError) for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id; - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); if (i == invalid_reservation_op) { // aborting a reservation which does not exists @@ -262,7 +320,7 @@ TEST_F(TestCls2PCQueue, AbortError) TEST_F(TestCls2PCQueue, MultiReserve) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 1024*1024; const auto number_of_ops = 11U; const auto number_of_elements = 23U; @@ -279,7 +337,7 @@ TEST_F(TestCls2PCQueue, MultiReserve) librados::ObjectWriteOperation op; for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, 0); } }); @@ -299,7 +357,7 @@ TEST_F(TestCls2PCQueue, MultiReserve) TEST_F(TestCls2PCQueue, MultiCommit) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 1024*1024; const auto number_of_ops = 11U; const auto number_of_elements = 23U; @@ -325,7 +383,7 @@ TEST_F(TestCls2PCQueue, MultiCommit) return bl; }); cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0); ASSERT_NE(res_id, 0); cls_2pc_queue_commit(op, data, res_id); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); @@ -342,7 +400,7 @@ TEST_F(TestCls2PCQueue, MultiCommit) TEST_F(TestCls2PCQueue, MultiAbort) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 1024*1024; const auto number_of_ops = 11U; const auto number_of_elements = 23U; @@ -359,7 +417,7 @@ TEST_F(TestCls2PCQueue, MultiAbort) librados::ObjectWriteOperation op; for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, 0); cls_2pc_queue_abort(op, res_id); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); @@ -376,7 +434,7 @@ TEST_F(TestCls2PCQueue, MultiAbort) TEST_F(TestCls2PCQueue, ReserveCommit) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 1024*1024; const auto number_of_ops = 11U; const auto number_of_elements = 23U; @@ -393,7 +451,7 @@ TEST_F(TestCls2PCQueue, ReserveCommit) librados::ObjectWriteOperation op; for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); } }); @@ -431,7 +489,7 @@ TEST_F(TestCls2PCQueue, ReserveCommit) TEST_F(TestCls2PCQueue, ReserveAbort) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 1024*1024; const auto number_of_ops = 17U; const auto number_of_elements = 23U; @@ -448,7 +506,7 @@ TEST_F(TestCls2PCQueue, ReserveAbort) librados::ObjectWriteOperation op; for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); } }); @@ -478,7 +536,7 @@ TEST_F(TestCls2PCQueue, ReserveAbort) TEST_F(TestCls2PCQueue, Cleanup) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 128*1024*1024; const auto number_of_ops = 17U; const auto number_of_elements = 23U; @@ -498,7 +556,7 @@ TEST_F(TestCls2PCQueue, Cleanup) librados::ObjectWriteOperation op; for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); // wait for 10ms between each reservation to make sure at least some are stale std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -552,7 +610,7 @@ TEST_F(TestCls2PCQueue, Cleanup) TEST_F(TestCls2PCQueue, MultiProducer) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 128*1024*1024; const auto number_of_ops = 300U; const auto number_of_elements = 23U; @@ -580,7 +638,7 @@ TEST_F(TestCls2PCQueue, MultiProducer) return bl; }); cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0); ASSERT_NE(res_id, 0); cls_2pc_queue_commit(op, data, res_id); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); @@ -611,9 +669,66 @@ TEST_F(TestCls2PCQueue, MultiProducer) ASSERT_EQ(consume_count, number_of_ops*number_of_elements*max_producer_count); } +TEST_F(TestCls2PCQueue, AsyncConsumer) +{ + const std::string queue_name = __PRETTY_FUNCTION__; + constexpr auto max_size = 128*1024*1024; + constexpr auto number_of_ops = 250U; + constexpr auto number_of_elements = 23U; + librados::ObjectWriteOperation wop; + wop.create(true); + cls_2pc_queue_init(wop, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &wop)); + + + for (auto i = 0U; i < number_of_ops; ++i) { + const std::string element_prefix("op-" +to_string(i) + "-element-"); + std::vector data(number_of_elements); + auto total_size = 0UL; + // create vector of buffer lists + std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable { + bufferlist bl; + bl.append(element_prefix + to_string(j++)); + total_size += bl.length(); + return bl; + }); + cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0); + ASSERT_NE(res_id, 0); + cls_2pc_queue_commit(wop, data, res_id); + ASSERT_EQ(0, ioctx.operate(queue_name, &wop)); + } + + constexpr auto max_elements = 42; + std::string marker; + std::string end_marker; + librados::ObjectReadOperation rop; + auto consume_count = 0U; + std::vector entries; + bool truncated = true; + while (truncated) { + bufferlist bl; + int rc; + cls_2pc_queue_list_entries(rop, marker, max_elements, &bl, &rc); + ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr)); + ASSERT_EQ(rc, 0); + ASSERT_EQ(cls_2pc_queue_list_entries_result(bl, entries, &truncated, end_marker), 0); + consume_count += entries.size(); + cls_2pc_queue_remove_entries(wop, end_marker); + marker = end_marker; + } + + ASSERT_EQ(consume_count, number_of_ops*number_of_elements); + // execute all delete operations in a batch + ASSERT_EQ(0, ioctx.operate(queue_name, &wop)); + // make sure that queue is empty + ASSERT_EQ(cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker), 0); + ASSERT_EQ(entries.size(), 0); +} + TEST_F(TestCls2PCQueue, MultiProducerConsumer) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 1024*1024; const auto number_of_ops = 300U; const auto number_of_elements = 23U; @@ -643,7 +758,7 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer) return bl; }); cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; - auto rc = cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id); + auto rc = cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id); while (rc != 0) { // other errors should cause test to fail ASSERT_EQ(rc, -ENOSPC); @@ -651,7 +766,7 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer) // queue is full, sleep and retry retry_happened = true; std::this_thread::sleep_for(std::chrono::milliseconds(10)); - rc = cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id); + rc = cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id); }; ASSERT_NE(res_id, 0); cls_2pc_queue_commit(op, data, res_id); @@ -704,7 +819,7 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer) TEST_F(TestCls2PCQueue, ReserveSpillover) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 1024U*1024U; const auto number_of_ops = 1024U; const auto number_of_elements = 8U; @@ -716,7 +831,7 @@ TEST_F(TestCls2PCQueue, ReserveSpillover) for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id; - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); } cls_2pc_reservations reservations; @@ -730,7 +845,7 @@ TEST_F(TestCls2PCQueue, ReserveSpillover) TEST_F(TestCls2PCQueue, CommitSpillover) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 1024U*1024U; const auto number_of_ops = 1024U; const auto number_of_elements = 4U; @@ -742,7 +857,7 @@ TEST_F(TestCls2PCQueue, CommitSpillover) for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id; - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); } cls_2pc_reservations reservations; @@ -768,7 +883,7 @@ TEST_F(TestCls2PCQueue, CommitSpillover) TEST_F(TestCls2PCQueue, AbortSpillover) { - const std::string queue_name = "my-queue"; + const std::string queue_name = __PRETTY_FUNCTION__; const auto max_size = 1024U*1024U; const auto number_of_ops = 1024U; const auto number_of_elements = 4U; @@ -780,7 +895,7 @@ TEST_F(TestCls2PCQueue, AbortSpillover) for (auto i = 0U; i < number_of_ops; ++i) { cls_2pc_reservation::id_t res_id; - ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0); ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); } cls_2pc_reservations reservations;