From: Ali Masarwa Date: Tue, 31 Oct 2023 13:55:41 +0000 (+0200) Subject: RGW: make new rados support old RGW 2pc remove X-Git-Tag: v19.0.0~39^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=855098f191ea68c87fb09e6a44e34e7336534895;p=ceph.git RGW: make new rados support old RGW 2pc remove Signed-off-by: Ali Masarwa --- diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc index 019f2c96deafe..1e99503943a35 100644 --- a/src/cls/2pc_queue/cls_2pc_queue.cc +++ b/src/cls/2pc_queue/cls_2pc_queue.cc @@ -578,6 +578,19 @@ static int cls_2pc_queue_list_entries(cls_method_context_t hctx, bufferlist *in, return 0; } +static int cls_2pc_queue_count_entries(cls_method_context_t hctx, cls_queue_list_op& op, cls_queue_head& head, + uint32_t& entries_to_remove) +{ + cls_queue_list_ret op_ret; + auto ret = queue_list_entries(hctx, op, op_ret, head); + if (ret < 0) { + return ret; + } + + entries_to_remove = op_ret.entries.size(); + return 0; +} + static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { auto in_iter = in->cbegin(); @@ -594,6 +607,21 @@ static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *i if (ret < 0) { return ret; } + + // Old RGW is running, and it sent cls_queue_remove_op instead of cls_2pc_queue_remove_op + if (rem_2pc_op.entries_to_remove == 0) { + CLS_LOG(10, "INFO: cls_2pc_queue_remove_entries: incompatible RGW with rados, counting entries to remove..."); + cls_queue_list_op list_op; + list_op.max = std::numeric_limits::max(); // max length because endmarker is the stopping condition. + list_op.end_marker = rem_2pc_op.end_marker; + ret = cls_2pc_queue_count_entries(hctx, list_op, head, rem_2pc_op.entries_to_remove); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_2pc_queue_count_entries: returned: %d", ret); + return ret; + } + CLS_LOG(10, "INFO: cls_2pc_queue_count_entries: counted: %u", rem_2pc_op.entries_to_remove); + } + cls_queue_remove_op rem_op; rem_op.end_marker = std::move(rem_2pc_op.end_marker); ret = queue_remove_entries(hctx, rem_op, head); diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.h b/src/cls/2pc_queue/cls_2pc_queue_client.h index c806d30f59e0f..0d55d68e7a05c 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_client.h +++ b/src/cls/2pc_queue/cls_2pc_queue_client.h @@ -87,5 +87,8 @@ void cls_2pc_queue_expire_reservations(librados::ObjectWriteOperation& op, ceph::coarse_real_time stale_time); // remove all entries up to the given marker -void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove); +// if there is no race condition, providing the number of entries_to_remove is recommended, as it is more efficient. +// if there is no guarantee against two clienst deleting entries at the same time, you can leave the entries_to_remove unprovided or input zero entries_to_remove +// the function will count how many entries it needs to removed +void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove=0); diff --git a/src/cls/2pc_queue/cls_2pc_queue_ops.h b/src/cls/2pc_queue/cls_2pc_queue_ops.h index bb61ef341ac1e..194fdf6da9d56 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_ops.h +++ b/src/cls/2pc_queue/cls_2pc_queue_ops.h @@ -118,21 +118,23 @@ WRITE_CLASS_ENCODER(cls_2pc_queue_reservations_ret) struct cls_2pc_queue_remove_op { std::string end_marker; - uint32_t entries_to_remove; + uint32_t entries_to_remove = 0; cls_2pc_queue_remove_op() {} void encode(ceph::buffer::list& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); encode(end_marker, bl); encode(entries_to_remove, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); decode(end_marker, bl); - decode(entries_to_remove, bl); + if (struct_v > 1) { + decode(entries_to_remove, bl); + } DECODE_FINISH(bl); } }; diff --git a/src/cls/queue/cls_queue_client.cc b/src/cls/queue/cls_queue_client.cc index 87d17bb9e3157..d3d38a9214e50 100644 --- a/src/cls/queue/cls_queue_client.cc +++ b/src/cls/queue/cls_queue_client.cc @@ -48,16 +48,9 @@ void cls_queue_enqueue(ObjectWriteOperation& op, uint32_t expiration_secs, vecto op.exec(QUEUE_CLASS, QUEUE_ENQUEUE, in); } -int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max, - vector& entries, - bool *truncated, string& next_marker) +int cls_queue_list_entries_inner(IoCtx& io_ctx, const string& oid, vector& entries, + bool *truncated, string& next_marker, bufferlist& in, bufferlist& out) { - bufferlist in, out; - cls_queue_list_op op; - op.start_marker = marker; - op.max = max; - encode(op, in); - int r = io_ctx.exec(oid, QUEUE_CLASS, QUEUE_LIST_ENTRIES, in, out); if (r < 0) return r; @@ -78,6 +71,33 @@ int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marke return 0; } +int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max, + vector& entries, + bool *truncated, string& next_marker) +{ + bufferlist in, out; + cls_queue_list_op op; + op.start_marker = marker; + op.max = max; + encode(op, in); + + return cls_queue_list_entries_inner(io_ctx, oid, entries, truncated, next_marker, in, out); +} + +int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, const string& end_marker, + vector& entries, + bool *truncated, string& next_marker) +{ + bufferlist in, out; + cls_queue_list_op op; + op.start_marker = marker; + op.max = std::numeric_limits::max(); + op.end_marker = end_marker; + encode(op, in); + + return cls_queue_list_entries_inner(io_ctx, oid, entries, truncated, next_marker, in, out); +} + void cls_queue_remove_entries(ObjectWriteOperation& op, const string& end_marker) { bufferlist in, out; diff --git a/src/cls/queue/cls_queue_client.h b/src/cls/queue/cls_queue_client.h index 895a51c117379..903448fd480bf 100644 --- a/src/cls/queue/cls_queue_client.h +++ b/src/cls/queue/cls_queue_client.h @@ -11,6 +11,8 @@ int cls_queue_get_capacity(librados::IoCtx& io_ctx, const std::string& oid, uint void cls_queue_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, std::vector bl_data_vec); int cls_queue_list_entries(librados::IoCtx& io_ctx, const std::string& oid, const std::string& marker, uint32_t max, std::vector& entries, bool *truncated, std::string& next_marker); +int cls_queue_list_entries(librados::IoCtx& io_ctx, const std::string& oid, const std::string& marker, const std::string& end_marker, + std::vector& entries, bool *truncated, std::string& next_marker); void cls_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker); #endif diff --git a/src/cls/queue/cls_queue_ops.h b/src/cls/queue/cls_queue_ops.h index 8209659bda907..f41572036c514 100644 --- a/src/cls/queue/cls_queue_ops.h +++ b/src/cls/queue/cls_queue_ops.h @@ -54,20 +54,25 @@ WRITE_CLASS_ENCODER(cls_queue_enqueue_op) struct cls_queue_list_op { uint64_t max; std::string start_marker; + std::string end_marker; cls_queue_list_op() {} void encode(ceph::buffer::list& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); encode(max, bl); encode(start_marker, bl); + encode(end_marker, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); decode(max, bl); decode(start_marker, bl); + if (struct_v > 1) { + decode(end_marker, bl); + } DECODE_FINISH(bl); } }; diff --git a/src/cls/queue/cls_queue_src.cc b/src/cls/queue/cls_queue_src.cc index b34d9929b93a1..028b1cb123c91 100644 --- a/src/cls/queue/cls_queue_src.cc +++ b/src/cls/queue/cls_queue_src.cc @@ -400,6 +400,10 @@ int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, c CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!"); break; } + if (!op.end_marker.empty() && entry.marker == op.end_marker) { + last_marker = entry.marker; + break; + } op_ret.entries.emplace_back(entry); // Resetting some values offset_populated = false; @@ -414,11 +418,17 @@ int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, c } } while(index < bl_chunk.length()); - CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max); + CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu, last_marker: %s and op.end_marker is %s\n", + num_ops, op.max, last_marker.c_str(), op.end_marker.c_str()); - if (num_ops == op.max) { - next_marker = cls_queue_marker{(entry_start_offset + index), gen}; - CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu", next_marker.offset); + if (num_ops == op.max || (!op.end_marker.empty() && op.end_marker == last_marker)) { + if (!op.end_marker.empty()) { + next_marker.from_str(op.end_marker.c_str()); + } else { + next_marker = cls_queue_marker{(entry_start_offset + index), gen}; + } + CLS_LOG(10, "INFO: queue_list_entries(): either num_ops is same as op.max or last_marker is same as op.end_marker, " + "hence breaking out from outer loop with next offset: %lu", next_marker.offset); break; } diff --git a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc index 14947244d41f4..8f392721f5b74 100644 --- a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc +++ b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc @@ -11,6 +11,7 @@ #include "gtest/gtest.h" #include "test/librados/test_cxx.h" #include "global/global_context.h" +#include "cls/2pc_queue/cls_2pc_queue_const.h" #include #include @@ -173,6 +174,131 @@ TEST_F(TestCls2PCQueue, Commit) ASSERT_EQ(reservations.size(), 0); } +TEST_F(TestCls2PCQueue, Stats) +{ + const std::string queue_name = __PRETTY_FUNCTION__; + const auto max_size = 1024*1024*128; + const auto number_of_ops = 200U; + const auto number_of_elements = 23U; + auto total_committed_elements = 0U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + for (auto i = 0U; i < number_of_ops; ++i) { + const std::string element_prefix("op-" +to_string(i) + "-element-"); + auto total_size = 0UL; + std::vector data(number_of_elements); + // create vector of buffer lists + std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable { + bufferlist bl; + bl.append(element_prefix + to_string(j++)); + total_size += bl.length(); + return bl; + }); + + cls_2pc_reservation::id_t res_id; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0); + ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); + cls_2pc_queue_commit(op, data, res_id); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + total_committed_elements += number_of_elements; + uint32_t committed_entries; + uint64_t size; + + ASSERT_EQ(cls_2pc_queue_get_topic_stats(ioctx, queue_name, committed_entries, size), 0); + ASSERT_EQ(committed_entries, total_committed_elements); + } + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), 0); +} + +TEST_F(TestCls2PCQueue, UpgradeFromReef) +{ + const std::string queue_name = __PRETTY_FUNCTION__; + const auto max_size = 1024*1024*128; + const auto number_of_ops = 200U; + const auto number_of_elements = 23U; + auto total_committed_elements = 0U; + librados::ObjectWriteOperation wop; + wop.create(true); + cls_2pc_queue_init(wop, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &wop)); + + for (auto i = 0U; i < number_of_ops; ++i) { + const std::string element_prefix("wop-" +to_string(i) + "-element-"); + auto total_size = 0UL; + std::vector data(number_of_elements); + // create vector of buffer lists + std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable { + bufferlist bl; + bl.append(element_prefix + to_string(j++)); + total_size += bl.length(); + return bl; + }); + + cls_2pc_reservation::id_t res_id; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0); + ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); + cls_2pc_queue_commit(wop, data, res_id); + ASSERT_EQ(0, ioctx.operate(queue_name, &wop)); + + total_committed_elements += number_of_elements; + uint32_t committed_entries; + uint64_t size; + + ASSERT_EQ(cls_2pc_queue_get_topic_stats(ioctx, queue_name, committed_entries, size), 0); + ASSERT_EQ(committed_entries, total_committed_elements); + } + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), 0); + + constexpr auto max_elements = 42U; + std::string marker; + std::string end_marker; + librados::ObjectReadOperation rop; + auto consume_count = 0U; + std::vector entries; + bool truncated = true; + + auto simulate_reef_cls_2pc_queue_remove_entries = [](librados::ObjectWriteOperation& wop, const std::string& end_marker) { + bufferlist in; + cls_queue_remove_op rem_op; + rem_op.end_marker = end_marker; + encode(rem_op, in); + wop.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in); + }; + + while (truncated) { + bufferlist bl; + int rc; + cls_2pc_queue_list_entries(rop, marker, max_elements, &bl, &rc); + ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr)); + ASSERT_EQ(rc, 0); + ASSERT_EQ(cls_2pc_queue_list_entries_result(bl, entries, &truncated, end_marker), 0); + + consume_count += entries.size(); + // simulating reef cls_2pc_queue_remove_entries with cls_queue_remove_op + simulate_reef_cls_2pc_queue_remove_entries(wop, end_marker); + marker = end_marker; + total_committed_elements -= entries.size(); + } + + // execute all delete operations in a batch + ASSERT_EQ(0, ioctx.operate(queue_name, &wop)); + ASSERT_EQ(consume_count, number_of_ops*number_of_elements); + + uint32_t entries_number; + uint64_t size; + ASSERT_EQ(cls_2pc_queue_get_topic_stats(ioctx, queue_name, entries_number, size), 0); + ASSERT_EQ(total_committed_elements, 0); + ASSERT_EQ(entries_number, 0); +} + TEST_F(TestCls2PCQueue, Abort) { const std::string queue_name = __PRETTY_FUNCTION__; diff --git a/src/test/cls_queue/test_cls_queue.cc b/src/test/cls_queue/test_cls_queue.cc index cca615afb0a58..a1f68638c1f41 100644 --- a/src/test/cls_queue/test_cls_queue.cc +++ b/src/test/cls_queue/test_cls_queue.cc @@ -137,6 +137,51 @@ TEST_F(TestClsQueue, List) ASSERT_EQ(total_elements, number_of_ops*number_of_elements); } +TEST_F(TestClsQueue, ListByEndMarker) +{ + const std::string queue_name = "my-queue"; + const uint64_t queue_size = 1024*1024; + librados::ObjectWriteOperation op; + op.create(true); + cls_queue_init(op, queue_name, queue_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + const auto number_of_ops = 10; + const auto number_of_elements = 100; + + // test multiple enqueues + test_enqueue(queue_name, number_of_ops, number_of_elements, 0); + + const auto max_elements = 42; + std::string marker, end_marker; + bool truncated = false; + std::string max_op_next_marker; + auto total_elements = 0; + do { + std::vector entries; + auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, max_op_next_marker); + ASSERT_EQ(0, ret); + end_marker = max_op_next_marker; + + std::vector end_marker_entries; + std::string end_marker_next_marker; + bool end_marker_truncated = false; + ret = cls_queue_list_entries(ioctx, queue_name, marker, end_marker, end_marker_entries, + &end_marker_truncated, end_marker_next_marker); + ASSERT_EQ(0, ret); + + ASSERT_EQ(end_marker_next_marker, end_marker); + ASSERT_EQ(end_marker_entries.size(), entries.size()); + for (auto i = 0U; i < end_marker_entries.size() && i < entries.size(); ++i) { + ASSERT_EQ(end_marker_entries[i].marker, entries[i].marker); + } + + marker = max_op_next_marker; + total_elements += entries.size(); + } while (truncated); + + ASSERT_EQ(total_elements, number_of_ops*number_of_elements); +} + TEST_F(TestClsQueue, Dequeue) { const std::string queue_name = "my-queue";