From 9f105fa7ed57ba04599ec808e89d7d2d9cfe1336 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Wed, 11 Mar 2020 20:04:04 +0200 Subject: [PATCH] cls/queue: spillover of urgent data to xattrs Signed-off-by: Yuval Lifshitz --- src/cls/2pc_queue/cls_2pc_queue.cc | 182 +++++++++++++++++-- src/cls/2pc_queue/cls_2pc_queue_types.h | 3 + src/test/cls_2pc_queue/test_cls_2pc_queue.cc | 92 ++++++++++ 3 files changed, 259 insertions(+), 18 deletions(-) diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc index 539105f9a28..a28fb648eda 100644 --- a/src/cls/2pc_queue/cls_2pc_queue.cc +++ b/src/cls/2pc_queue/cls_2pc_queue.cc @@ -13,6 +13,8 @@ CLS_VER(1,0) CLS_NAME(2pc_queue) +constexpr auto CLS_QUEUE_URGENT_DATA_XATTR_NAME = "cls_queue_urgent_data"; + static int cls_2pc_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { auto in_iter = in->cbegin(); @@ -87,9 +89,13 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff } urgent_data.reserved_size += res_op.size + overhead; + // note that last id is incremented regadless of failures + // to avoid "old reservation" issues below + ++urgent_data.last_id; bool result; - std::tie(std::ignore, result) = urgent_data.reservations.emplace(std::piecewise_construct, - std::forward_as_tuple(++urgent_data.last_id), + cls_2pc_reservations::iterator last_reservation; + std::tie(last_reservation, result) = urgent_data.reservations.emplace(std::piecewise_construct, + std::forward_as_tuple(urgent_data.last_id), std::forward_as_tuple(res_op.size, ceph::real_clock::now())); if (!result) { // an old reservation that was never committed or aborted is in the map @@ -103,11 +109,50 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff encode(urgent_data, head.bl_urgent_data); const uint64_t urgent_data_length = head.bl_urgent_data.length(); + auto total_entries = urgent_data.reservations.size(); if (head.max_urgent_data_size < urgent_data_length) { - // TODO: use objects xattr for spillover - CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: urgent data size: %lu exceeded maximum: %lu", urgent_data_length, head.max_urgent_data_size); - return -ENOSPC; + CLS_LOG(10, "INFO: cls_2pc_queue_reserve: urgent data size: %lu exceeded maximum: %lu using xattrs", urgent_data_length, head.max_urgent_data_size); + // add the last reservation to xattrs + bufferlist bl_xattrs; + auto ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to read xattrs with: %d", ret); + return ret; + } + cls_2pc_reservations xattr_reservations; + if (ret >= 0) { + // xattrs exist + auto iter = bl_xattrs.cbegin(); + try { + decode(xattr_reservations, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to decode xattrs urgent data map"); + return -EINVAL; + } //end - catch + } + std::tie(std::ignore, result) = xattr_reservations.emplace(std::piecewise_construct, + std::forward_as_tuple(urgent_data.last_id), + std::forward_as_tuple(res_op.size, ceph::real_clock::now())); + if (!result) { + // an old reservation that was never committed or aborted is in the map + // caller should try again assuming other IDs are ok + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: reservation id conflict inside xattrs after rollover: %u", urgent_data.last_id); + return -EAGAIN; + } + bl_xattrs.clear(); + encode(xattr_reservations, bl_xattrs); + ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to write xattrs with: %d", ret); + return ret; + } + // remove the last reservation from the reservation list + // and indicate that spillover happened + urgent_data.has_xattrs = true; + urgent_data.reservations.erase(last_reservation); + head.bl_urgent_data.clear(); + encode(urgent_data, head.bl_urgent_data); } ret = queue_write_head(hctx, head); @@ -119,7 +164,7 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservations: %lu (bytes)", urgent_data.reserved_size); CLS_LOG(20, "INFO: cls_2pc_queue_reserve: requested size: %lu (bytes)", res_op.size); CLS_LOG(20, "INFO: cls_2pc_queue_reserve: urgent data size: %lu (bytes)", urgent_data_length); - CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation ops: %lu", urgent_data.reservations.size()); + CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation entries: %lu", total_entries); cls_2pc_queue_reserve_ret op_ret; op_ret.id = urgent_data.last_id; @@ -155,9 +200,36 @@ static int cls_2pc_queue_commit(cls_method_context_t hctx, bufferlist *in, buffe } auto it = urgent_data.reservations.find(commit_op.id); + cls_2pc_reservations xattr_reservations; + bufferlist bl_xattrs; if (it == urgent_data.reservations.end()) { - CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id); - return -ENOENT; + if (!urgent_data.has_xattrs) { + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id); + return -ENOENT; + } + // try to look for the reservation in xattrs + auto ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0) { + if (ret == -ENOENT || ret == -ENODATA) { + // no xattrs, reservation does not exists + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id); + return -ENOENT; + } + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to read xattrs with: %d", ret); + return ret; + } + auto iter = bl_xattrs.cbegin(); + try { + decode(xattr_reservations, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to decode xattrs urgent data map"); + return -EINVAL; + } //end - catch + it = xattr_reservations.find(commit_op.id); + if (it == urgent_data.reservations.end()) { + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id); + return -ENOENT; + } } auto& res = it->second; @@ -181,12 +253,27 @@ static int cls_2pc_queue_commit(cls_method_context_t hctx, bufferlist *in, buffe return ret; } - // remove the reservation urgent_data.reserved_size -= res.size; - urgent_data.reservations.erase(it); + + if (xattr_reservations.empty()) { + // remove the reservation from urgent data + urgent_data.reservations.erase(it); + } else { + // remove the reservation from xattrs + xattr_reservations.erase(it); + bl_xattrs.clear(); + encode(xattr_reservations, bl_xattrs); + ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to write xattrs with: %d", ret); + return ret; + } + } CLS_LOG(20, "INFO: cls_2pc_queue_commit: current reservations: %lu (bytes)", urgent_data.reserved_size); - CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation ops: %lu", urgent_data.reservations.size()); + CLS_LOG(20, "INFO: cls_2pc_queue_commit: current reservation entries: %lu", + urgent_data.reservations.size() + xattr_reservations.size()); + // write back head head.bl_urgent_data.clear(); encode(urgent_data, head.bl_urgent_data); @@ -218,19 +305,57 @@ static int cls_2pc_queue_abort(cls_method_context_t hctx, bufferlist *in, buffer CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode entry: %s", err.what()); return -EINVAL; } - + + auto total_entries = urgent_data.reservations.size(); auto it = urgent_data.reservations.find(abort_op.id); if (it == urgent_data.reservations.end()) { - CLS_LOG(10, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id); - return 0; + if (!urgent_data.has_xattrs) { + CLS_LOG(20, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id); + return 0; + } + // try to look for the reservation in xattrs + bufferlist bl_xattrs; + auto ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0) { + if (ret == -ENOENT || ret == -ENODATA) { + // no xattrs, reservation does not exists + CLS_LOG(20, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id); + return 0; + } + CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to read xattrs with: %d", ret); + return ret; + } + auto iter = bl_xattrs.cbegin(); + cls_2pc_reservations xattr_reservations; + try { + decode(xattr_reservations, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode xattrs urgent data map"); + return -EINVAL; + } //end - catch + it = xattr_reservations.find(abort_op.id); + if (it == xattr_reservations.end()) { + CLS_LOG(20, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id); + return 0; + } + total_entries += xattr_reservations.size(); + xattr_reservations.erase(it); + bl_xattrs.clear(); + encode(xattr_reservations, bl_xattrs); + ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to write xattrs with: %d", ret); + return ret; + } + } else { + urgent_data.reservations.erase(it); } // remove the reservation urgent_data.reserved_size -= it->second.size; - urgent_data.reservations.erase(it); CLS_LOG(20, "INFO: cls_2pc_queue_abort: current reservations: %lu (bytes)", urgent_data.reserved_size); - CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation ops: %lu", urgent_data.reservations.size()); + CLS_LOG(20, "INFO: cls_2pc_queue_abort: current reservation entries: %lu", total_entries-1); // write back head head.bl_urgent_data.clear(); @@ -241,7 +366,7 @@ static int cls_2pc_queue_abort(cls_method_context_t hctx, bufferlist *in, buffer static int cls_2pc_queue_list_reservations(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { //get head cls_queue_head head; - int ret = queue_read_head(hctx, head); + auto ret = queue_read_head(hctx, head); if (ret < 0) { return ret; } @@ -255,9 +380,30 @@ static int cls_2pc_queue_list_reservations(cls_method_context_t hctx, bufferlist return -EINVAL; } + CLS_LOG(20, "INFO: cls_2pc_queue_list_reservations: %lu reservation entries found", urgent_data.reservations.size()); cls_2pc_queue_reservations_ret op_ret; op_ret.reservations = std::move(urgent_data.reservations); - + if (urgent_data.has_xattrs) { + // try to look for the reservation in xattrs + cls_2pc_reservations xattr_reservations; + bufferlist bl_xattrs; + ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) { + CLS_LOG(1, "ERROR: cls_2pc_queue_list_reservations: failed to read xattrs with: %d", ret); + return ret; + } + if (ret >= 0) { + auto iter = bl_xattrs.cbegin(); + try { + decode(xattr_reservations, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_list_reservations: failed to decode xattrs urgent data map"); + return -EINVAL; + } //end - catch + CLS_LOG(20, "INFO: cls_2pc_queue_list_reservations: %lu reservation entries found in xatts", xattr_reservations.size()); + op_ret.reservations.merge(xattr_reservations); + } + } encode(op_ret, *out); return 0; diff --git a/src/cls/2pc_queue/cls_2pc_queue_types.h b/src/cls/2pc_queue/cls_2pc_queue_types.h index acf6534d3d8..b0a175cc472 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_types.h +++ b/src/cls/2pc_queue/cls_2pc_queue_types.h @@ -37,12 +37,14 @@ struct cls_2pc_urgent_data uint64_t reserved_size{0}; // pending reservations size in bytes cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID}; // last allocated id cls_2pc_reservations reservations; // reservation list (keyed by id) + bool has_xattrs{false}; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(reserved_size, bl); encode(last_id, bl); encode(reservations, bl); + encode(has_xattrs, bl); ENCODE_FINISH(bl); } @@ -51,6 +53,7 @@ struct cls_2pc_urgent_data decode(reserved_size, bl); decode(last_id, bl); decode(reservations, bl); + decode(has_xattrs, bl); DECODE_FINISH(bl); } }; diff --git a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc index 6ca8817e563..9b325b258d0 100644 --- a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc +++ b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc @@ -702,3 +702,95 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer) ASSERT_EQ(entries.size(), 0); } +TEST_F(TestCls2PCQueue, ReserveSpillover) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 1024U*1024U; + const auto number_of_ops = 1024U; + const auto number_of_elements = 8U; + const auto size_to_reserve = 64U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + for (auto i = 0U; i < number_of_ops; ++i) { + cls_2pc_reservation::id_t res_id; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); + } + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), number_of_ops); + for (const auto& r : reservations) { + ASSERT_NE(r.first, cls_2pc_reservation::NO_ID); + ASSERT_GT(r.second.timestamp.time_since_epoch().count(), 0); + } +} + +TEST_F(TestCls2PCQueue, CommitSpillover) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 1024U*1024U; + const auto number_of_ops = 1024U; + const auto number_of_elements = 4U; + const auto size_to_reserve = 128U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + for (auto i = 0U; i < number_of_ops; ++i) { + cls_2pc_reservation::id_t res_id; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); + } + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + for (const auto& r : reservations) { + const std::string element_prefix("foo"); + std::vector data(number_of_elements); + auto total_size = 0UL; + // create vector of buffer lists + std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable { + bufferlist bl; + bl.append(element_prefix + to_string(j++)); + total_size += bl.length(); + return bl; + }); + ASSERT_NE(r.first, cls_2pc_reservation::NO_ID); + cls_2pc_queue_commit(op, data, r.first); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + } + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), 0); +} + +TEST_F(TestCls2PCQueue, AbortSpillover) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 1024U*1024U; + const auto number_of_ops = 1024U; + const auto number_of_elements = 4U; + const auto size_to_reserve = 128U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + for (auto i = 0U; i < number_of_ops; ++i) { + cls_2pc_reservation::id_t res_id; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); + } + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + for (const auto& r : reservations) { + ASSERT_NE(r.first, cls_2pc_reservation::NO_ID); + cls_2pc_queue_abort(op, r.first); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + } + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), 0); +} + -- 2.39.5