From 4fba777a1d15bfd6d5b40c4820f3ec997695dedb Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Mon, 17 Feb 2020 18:45:15 +0200 Subject: [PATCH] cls/queue: add 2-phase-commit queue implementation 2-phase commit (2pc) queue is based on the existing cls_queue together with the abilities to: * make a reservation of entries on the queue * commit entries based on a reservation * abort a reservation and release it * get a list of all pending reservations (so they could be cleaned) Signed-off-by: Yuval Lifshitz --- src/cls/2pc_queue/cls_2pc_queue.cc | 287 ++++++++ src/cls/2pc_queue/cls_2pc_queue_client.cc | 143 ++++ src/cls/2pc_queue/cls_2pc_queue_client.h | 47 ++ src/cls/2pc_queue/cls_2pc_queue_const.h | 10 + src/cls/2pc_queue/cls_2pc_queue_ops.h | 97 +++ src/cls/2pc_queue/cls_2pc_queue_types.h | 58 ++ src/cls/CMakeLists.txt | 18 + src/cls/queue/cls_queue_src.cc | 33 +- src/cls/queue/cls_queue_src.h | 6 +- src/cls/queue/cls_queue_types.h | 3 +- src/osd/CMakeLists.txt | 2 +- src/test/CMakeLists.txt | 1 + src/test/cls_2pc_queue/CMakeLists.txt | 18 + src/test/cls_2pc_queue/test_cls_2pc_queue.cc | 704 +++++++++++++++++++ 14 files changed, 1420 insertions(+), 7 deletions(-) create mode 100644 src/cls/2pc_queue/cls_2pc_queue.cc create mode 100644 src/cls/2pc_queue/cls_2pc_queue_client.cc create mode 100644 src/cls/2pc_queue/cls_2pc_queue_client.h create mode 100644 src/cls/2pc_queue/cls_2pc_queue_const.h create mode 100644 src/cls/2pc_queue/cls_2pc_queue_ops.h create mode 100644 src/cls/2pc_queue/cls_2pc_queue_types.h create mode 100644 src/test/cls_2pc_queue/CMakeLists.txt create mode 100644 src/test/cls_2pc_queue/test_cls_2pc_queue.cc diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc new file mode 100644 index 00000000000..539105f9a28 --- /dev/null +++ b/src/cls/2pc_queue/cls_2pc_queue.cc @@ -0,0 +1,287 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/types.h" + 
+#include "cls/2pc_queue/cls_2pc_queue_types.h" +#include "cls/2pc_queue/cls_2pc_queue_ops.h" +#include "cls/2pc_queue/cls_2pc_queue_const.h" +#include "cls/queue/cls_queue_ops.h" +#include "cls/queue/cls_queue_src.h" +#include "objclass/objclass.h" + +CLS_VER(1,0) +CLS_NAME(2pc_queue) + +static int cls_2pc_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { + auto in_iter = in->cbegin(); + + cls_queue_init_op op; + try { + decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_init: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + cls_2pc_urgent_data urgent_data; + + cls_queue_init_op init_op; + + CLS_LOG(20, "INFO: cls_2pc_queue_init: max size is %lu (bytes)", op.queue_size); + + init_op.queue_size = op.queue_size; + init_op.max_urgent_data_size = 23552; // overall head is 24KB ~ pending 1K reservations ops + encode(urgent_data, init_op.bl_urgent_data); + + return queue_init(hctx, init_op); +} + +static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { + cls_2pc_queue_reserve_op res_op; + try { + auto in_iter = in->cbegin(); + decode(res_op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + if (res_op.size == 0) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: cannot reserve zero bytes"); + return -EINVAL; + } + if (res_op.entries == 0) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: cannot reserve zero entries"); + return -EINVAL; + } + + // get head + cls_queue_head head; + int ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + + cls_2pc_urgent_data urgent_data; + try { + auto in_iter = head.bl_urgent_data.cbegin(); + decode(urgent_data, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + const auto overhead = 
res_op.entries*QUEUE_ENTRY_OVERHEAD; + const auto remaining_size = (head.tail.offset >= head.front.offset) ? + (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size) : + head.front.offset - head.tail.offset; + + + if (res_op.size + urgent_data.reserved_size + overhead > remaining_size) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: reservations exceeded maximum capacity"); + CLS_LOG(10, "INFO: cls_2pc_queue_reserve: remaining size: %lu (bytes)", remaining_size); + CLS_LOG(10, "INFO: cls_2pc_queue_reserve: current reservations: %lu (bytes)", urgent_data.reserved_size); + CLS_LOG(10, "INFO: cls_2pc_queue_reserve: requested size: %lu (bytes)", res_op.size); + return -ENOSPC; + } + + urgent_data.reserved_size += res_op.size + overhead; + bool result; + std::tie(std::ignore, result) = urgent_data.reservations.emplace(std::piecewise_construct, + std::forward_as_tuple(++urgent_data.last_id), + std::forward_as_tuple(res_op.size, ceph::real_clock::now())); + if (!result) { + // an old reservation that was never committed or aborted is in the map + // caller should try again assuming other IDs are ok + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: reservation id conflict after rollover: %u", urgent_data.last_id); + return -EAGAIN; + } + + // write back head + head.bl_urgent_data.clear(); + encode(urgent_data, head.bl_urgent_data); + + const uint64_t urgent_data_length = head.bl_urgent_data.length(); + + if (head.max_urgent_data_size < urgent_data_length) { + // TODO: use objects xattr for spillover + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: urgent data size: %lu exceeded maximum: %lu", urgent_data_length, head.max_urgent_data_size); + return -ENOSPC; + } + + ret = queue_write_head(hctx, head); + if (ret < 0) { + return ret; + } + + CLS_LOG(20, "INFO: cls_2pc_queue_reserve: remaining size: %lu (bytes)", remaining_size); + CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservations: %lu (bytes)", urgent_data.reserved_size); + CLS_LOG(20, "INFO: 
cls_2pc_queue_reserve: requested size: %lu (bytes)", res_op.size); + CLS_LOG(20, "INFO: cls_2pc_queue_reserve: urgent data size: %lu (bytes)", urgent_data_length); + CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation ops: %lu", urgent_data.reservations.size()); + + cls_2pc_queue_reserve_ret op_ret; + op_ret.id = urgent_data.last_id; + encode(op_ret, *out); + + return 0; +} + +static int cls_2pc_queue_commit(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { + cls_2pc_queue_commit_op commit_op; + try { + auto in_iter = in->cbegin(); + decode(commit_op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + // get head + cls_queue_head head; + int ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + + cls_2pc_urgent_data urgent_data; + try { + auto in_iter = head.bl_urgent_data.cbegin(); + decode(urgent_data, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + auto it = urgent_data.reservations.find(commit_op.id); + if (it == urgent_data.reservations.end()) { + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id); + return -ENOENT; + } + + auto& res = it->second; + const auto actual_size = std::accumulate(commit_op.bl_data_vec.begin(), + commit_op.bl_data_vec.end(), 0UL, [] (uint64_t sum, const bufferlist& bl) { + return sum + bl.length(); + }); + + if (res.size < actual_size) { + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: trying to commit %lu bytes to a %lu bytes reservation", + actual_size, + res.size); + return -EINVAL; + } + + // commit the data to the queue + cls_queue_enqueue_op enqueue_op; + enqueue_op.bl_data_vec = std::move(commit_op.bl_data_vec); + ret = queue_enqueue(hctx, enqueue_op, head); + if (ret < 0) { + return ret; + } + + // remove the reservation + urgent_data.reserved_size -= 
res.size; + urgent_data.reservations.erase(it); + + CLS_LOG(20, "INFO: cls_2pc_queue_commit: current reservations: %lu (bytes)", urgent_data.reserved_size); + CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation ops: %lu", urgent_data.reservations.size()); + // write back head + head.bl_urgent_data.clear(); + encode(urgent_data, head.bl_urgent_data); + return queue_write_head(hctx, head); +} + +static int cls_2pc_queue_abort(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { + cls_2pc_queue_abort_op abort_op; + try { + auto in_iter = in->cbegin(); + decode(abort_op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + // get head + cls_queue_head head; + int ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + + cls_2pc_urgent_data urgent_data; + try { + auto in_iter = head.bl_urgent_data.cbegin(); + decode(urgent_data, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + auto it = urgent_data.reservations.find(abort_op.id); + if (it == urgent_data.reservations.end()) { + CLS_LOG(10, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id); + return 0; + } + + // remove the reservation + urgent_data.reserved_size -= it->second.size; + urgent_data.reservations.erase(it); + + CLS_LOG(20, "INFO: cls_2pc_queue_abort: current reservations: %lu (bytes)", urgent_data.reserved_size); + CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservation ops: %lu", urgent_data.reservations.size()); + + // write back head + head.bl_urgent_data.clear(); + encode(urgent_data, head.bl_urgent_data); + return queue_write_head(hctx, head); +} + +static int cls_2pc_queue_list_reservations(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { + //get head + cls_queue_head head; + int ret = queue_read_head(hctx, head); + if (ret < 
0) { + return ret; + } + + cls_2pc_urgent_data urgent_data; + try { + auto in_iter = head.bl_urgent_data.cbegin(); + decode(urgent_data, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_list_reservations: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + cls_2pc_queue_reservations_ret op_ret; + op_ret.reservations = std::move(urgent_data.reservations); + + encode(op_ret, *out); + + return 0; +} + +CLS_INIT(2pc_queue) +{ + CLS_LOG(1, "Loaded 2pc queue class!"); + + cls_handle_t h_class; + cls_method_handle_t h_2pc_queue_init; + cls_method_handle_t h_2pc_queue_reserve; + cls_method_handle_t h_2pc_queue_commit; + cls_method_handle_t h_2pc_queue_abort; + cls_method_handle_t h_2pc_queue_list_reservations; + + cls_register(TPC_QUEUE_CLASS, &h_class); + + cls_register_cxx_method(h_class, TPC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_init, &h_2pc_queue_init); + cls_register_cxx_method(h_class, TPC_QUEUE_RESERVE, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_reserve, &h_2pc_queue_reserve); + cls_register_cxx_method(h_class, TPC_QUEUE_COMMIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_commit, &h_2pc_queue_commit); + cls_register_cxx_method(h_class, TPC_QUEUE_ABORT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_abort, &h_2pc_queue_abort); + cls_register_cxx_method(h_class, TPC_QUEUE_LIST_RESERVATIONS, CLS_METHOD_RD, cls_2pc_queue_list_reservations, &h_2pc_queue_list_reservations); + + return; +} + diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.cc b/src/cls/2pc_queue/cls_2pc_queue_client.cc new file mode 100644 index 00000000000..ebd7c4ada67 --- /dev/null +++ b/src/cls/2pc_queue/cls_2pc_queue_client.cc @@ -0,0 +1,143 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "cls/2pc_queue/cls_2pc_queue_client.h" +#include "cls/2pc_queue/cls_2pc_queue_ops.h" +#include "cls/2pc_queue/cls_2pc_queue_const.h" +#include "cls/queue/cls_queue_ops.h" +#include 
"cls/queue/cls_queue_const.h" + +using namespace librados; + +void cls_2pc_queue_init(ObjectWriteOperation& op, const std::string& queue_name, uint64_t size) { + bufferlist in; + cls_queue_init_op call; + call.queue_size = size; + encode(call, in); + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_INIT, in); +} + +int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const string& queue_name, uint64_t& size) { + bufferlist in, out; + const auto r = io_ctx.exec(queue_name, QUEUE_CLASS, QUEUE_GET_CAPACITY, in, out); + if (r < 0 ) { + return r; + } + + cls_queue_get_capacity_ret op_ret; + auto iter = out.cbegin(); + try { + decode(op_ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + size = op_ret.queue_capacity; + + return 0; +} + +int cls_2pc_queue_reserve(IoCtx& io_ctx, const string& queue_name, librados::ObjectWriteOperation& op, + uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id) { + bufferlist in, out; + cls_2pc_queue_reserve_op reserve_op; + reserve_op.size = res_size; + reserve_op.entries = entries; + + encode(reserve_op, in); + int rval; + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, &out, &rval); + const auto r = io_ctx.operate(queue_name, &op, librados::OPERATION_RETURNVEC); + if (r < 0) { + return r; + } + + cls_2pc_queue_reserve_ret op_ret; + auto iter = out.cbegin(); + try { + decode(op_ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + res_id = op_ret.id; + + return 0; +} + +void cls_2pc_queue_commit(ObjectWriteOperation& op, std::vector bl_data_vec, + cls_2pc_reservation::id_t res_id) { + bufferlist in; + cls_2pc_queue_commit_op commit_op; + commit_op.id = res_id; + commit_op.bl_data_vec = std::move(bl_data_vec); + encode(commit_op, in); + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_COMMIT, in); +} + +void cls_2pc_queue_abort(ObjectWriteOperation& op, cls_2pc_reservation::id_t res_id) { + bufferlist in; + cls_2pc_queue_abort_op abort_op; + abort_op.id = res_id; + encode(abort_op, in); + op.exec(TPC_QUEUE_CLASS, 
TPC_QUEUE_ABORT, in); +} + +int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const string& marker, uint32_t max, + std::vector& entries, + bool *truncated, std::string& next_marker) { + bufferlist in, out; + cls_queue_list_op op; + op.start_marker = marker; + op.max = max; + encode(op, in); + + const auto r = io_ctx.exec(queue_name, QUEUE_CLASS, QUEUE_LIST_ENTRIES, in, out); + if (r < 0) { + return r; + } + + cls_queue_list_ret ret; + auto iter = out.cbegin(); + try { + decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + entries = std::move(ret.entries); + *truncated = ret.is_truncated; + + next_marker = std::move(ret.next_marker); + + return 0; +} + +int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations) { + bufferlist in, out; + + const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, out); + if (r < 0) { + return r; + } + + cls_2pc_queue_reservations_ret ret; + auto iter = out.cbegin(); + try { + decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + reservations = std::move(ret.reservations); + + return 0; +} + +void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker) { + bufferlist in; + cls_queue_remove_op rem_op; + rem_op.end_marker = end_marker; + encode(rem_op, in); + op.exec(QUEUE_CLASS, QUEUE_REMOVE_ENTRIES, in); +} + diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.h b/src/cls/2pc_queue/cls_2pc_queue_client.h new file mode 100644 index 00000000000..6b408e113f6 --- /dev/null +++ b/src/cls/2pc_queue/cls_2pc_queue_client.h @@ -0,0 +1,47 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include +#include "include/rados/librados.hpp" +#include "cls/queue/cls_queue_types.h" +#include "cls/2pc_queue/cls_2pc_queue_types.h" + +// initialize the queue with maximum size 
(bytes) +// note that the actual size of the queue will be larger, as 24K bytes will be allocated in the head object +// and more may be allocated as xattrs of the object (depending on the number of concurrent reservations) +void cls_2pc_queue_init(librados::ObjectWriteOperation& op, const std::string& queue_name, uint64_t size); + +// return max capacity (bytes) +int cls_2pc_queue_get_capacity(librados::IoCtx& io_ctx, const string& queue_name, uint64_t& size); + +// make a reservation on the queue (in bytes) and number of expected entries (to calculate overhead) +// return a reservation id if the reservation is possible, 0 otherwise +int cls_2pc_queue_reserve(librados::IoCtx& io_ctx, const std::string& queue_name, librados::ObjectWriteOperation& op, + uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id); + +// commit data using a reservation done beforehand +// res_id must be allocated using cls_2pc_queue_reserve, and could be either committed or aborted once +// the size of bl_data_vec must be smaller than or equal to the size reserved for the res_id +// note that the number of entries in bl_data_vec does not have to match the number of entries reserved +// only size (including the overhead of the entries) is checked +void cls_2pc_queue_commit(librados::ObjectWriteOperation& op, std::vector bl_data_vec, + cls_2pc_reservation::id_t res_id); + +// abort a reservation +// res_id must be allocated using cls_2pc_queue_reserve +void cls_2pc_queue_abort(librados::ObjectWriteOperation& op, + cls_2pc_reservation::id_t res_id); + +// incremental listing of all entries in the queue +int cls_2pc_queue_list_entries(librados::IoCtx& io_ctx, const std::string& queue_name, const std::string& marker, uint32_t max, + std::vector& entries, bool *truncated, std::string& next_marker); + +// list all pending reservations in the queue +int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations); + +// 
remove all entries up to the given marker +void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker); + diff --git a/src/cls/2pc_queue/cls_2pc_queue_const.h b/src/cls/2pc_queue/cls_2pc_queue_const.h new file mode 100644 index 00000000000..2a22ff1a62c --- /dev/null +++ b/src/cls/2pc_queue/cls_2pc_queue_const.h @@ -0,0 +1,10 @@ +#pragma once + +#define TPC_QUEUE_CLASS "2pc_queue" + +#define TPC_QUEUE_INIT "2pc_queue_init" +#define TPC_QUEUE_RESERVE "2pc_queue_reserve" +#define TPC_QUEUE_COMMIT "2pc_queue_commit" +#define TPC_QUEUE_ABORT "2pc_queue_abort" +#define TPC_QUEUE_LIST_RESERVATIONS "2pc_queue_list_reservations" + diff --git a/src/cls/2pc_queue/cls_2pc_queue_ops.h b/src/cls/2pc_queue/cls_2pc_queue_ops.h new file mode 100644 index 00000000000..5a58dc6c4fa --- /dev/null +++ b/src/cls/2pc_queue/cls_2pc_queue_ops.h @@ -0,0 +1,97 @@ +#pragma once + +#include "include/types.h" +#include "cls_2pc_queue_types.h" + +struct cls_2pc_queue_reserve_op { + uint64_t size; + uint32_t entries; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(size, bl); + encode(entries, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(size, bl); + decode(entries, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_2pc_queue_reserve_op) + +struct cls_2pc_queue_reserve_ret { + cls_2pc_reservation::id_t id; // allocated reservation id + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(id, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_2pc_queue_reserve_ret) + +struct cls_2pc_queue_commit_op { + cls_2pc_reservation::id_t id; // reservation to commit + std::vector bl_data_vec; // the data to enqueue + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + encode(bl_data_vec, bl); + 
ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(id, bl); + decode(bl_data_vec, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(cls_2pc_queue_commit_op) + +struct cls_2pc_queue_abort_op { + cls_2pc_reservation::id_t id; // reservation to abort + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(id, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_2pc_queue_abort_op) + +struct cls_2pc_queue_reservations_ret { + cls_2pc_reservations reservations; // reservation list (keyed by id) + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(reservations, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(reservations, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_2pc_queue_reservations_ret) + diff --git a/src/cls/2pc_queue/cls_2pc_queue_types.h b/src/cls/2pc_queue/cls_2pc_queue_types.h new file mode 100644 index 00000000000..acf6534d3d8 --- /dev/null +++ b/src/cls/2pc_queue/cls_2pc_queue_types.h @@ -0,0 +1,58 @@ +#pragma once + +#include "include/types.h" + +struct cls_2pc_reservation +{ + using id_t = uint32_t; + inline static const id_t NO_ID{0}; + uint64_t size; // how many bytes are reserved + ceph::real_time timestamp; // when the reservation was done (used for cleaning stale reservations) + + cls_2pc_reservation(uint64_t _size, ceph::real_time _timestamp) : + size(_size), timestamp(_timestamp) {} + + cls_2pc_reservation() = default; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(size, bl); + encode(timestamp, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(size, bl); + decode(timestamp, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_2pc_reservation) + +using 
cls_2pc_reservations = ceph::unordered_map; + +struct cls_2pc_urgent_data +{ + uint64_t reserved_size{0}; // pending reservations size in bytes + cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID}; // last allocated id + cls_2pc_reservations reservations; // reservation list (keyed by id) + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(reserved_size, bl); + encode(last_id, bl); + encode(reservations, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(reserved_size, bl); + decode(last_id, bl); + decode(reservations, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_2pc_urgent_data) + diff --git a/src/cls/CMakeLists.txt b/src/cls/CMakeLists.txt index 00f077ce0ec..72c3fd08bbe 100644 --- a/src/cls/CMakeLists.txt +++ b/src/cls/CMakeLists.txt @@ -300,3 +300,21 @@ if (WITH_RADOSGW) rgw_gc/cls_rgw_gc_client.cc) add_library(cls_rgw_gc_client STATIC ${cls_rgw_gc_client_srcs}) endif (WITH_RADOSGW) + +#cls_2pc_queue +set(cls_2pc_queue_srcs + 2pc_queue/cls_2pc_queue.cc + queue/cls_queue_src.cc + ${CMAKE_SOURCE_DIR}/src/common/ceph_json.cc) +add_library(cls_2pc_queue SHARED ${cls_2pc_queue_srcs}) +set_target_properties(cls_2pc_queue PROPERTIES + VERSION "1.0.0" + SOVERSION "1" + INSTALL_RPATH "" + CXX_VISIBILITY_PRESET hidden) +install(TARGETS cls_2pc_queue DESTINATION ${cls_dir}) + +set(cls_2pc_queue_client_srcs + 2pc_queue/cls_2pc_queue_client.cc) +add_library(cls_2pc_queue_client STATIC ${cls_2pc_queue_client_srcs}) + diff --git a/src/cls/queue/cls_queue_src.cc b/src/cls/queue/cls_queue_src.cc index b48dcd19f85..879cfd2b492 100644 --- a/src/cls/queue/cls_queue_src.cc +++ b/src/cls/queue/cls_queue_src.cc @@ -67,10 +67,9 @@ int queue_read_head(cls_method_context_t hctx, cls_queue_head& head) return -EINVAL; } - constexpr auto decoded_head_size = sizeof(queue_head_start) + sizeof(encoded_len); - if (encoded_len > (chunk_size - decoded_head_size)) { + if (encoded_len > 
(chunk_size - QUEUE_ENTRY_OVERHEAD)) { start_offset = chunk_size; - chunk_size = (encoded_len - (chunk_size - decoded_head_size)); + chunk_size = (encoded_len - (chunk_size - QUEUE_ENTRY_OVERHEAD)); bufferlist bl_remaining_head; const auto ret = cls_cxx_read2(hctx, start_offset, chunk_size, &bl_remaining_head, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); if (ret < 0) { @@ -139,6 +138,32 @@ int queue_get_capacity(cls_method_context_t hctx, cls_queue_get_capacity_ret& op return 0; } + +/* +enqueue of new bufferlist happens in the free spaces of the queue, the queue can be in +one of two states: + +(1) split free space ++-------------+--------------------------------------------------------------------+ +| object head | XXXXXXXXXXXXXXXXXXXXXXXXXXX | +| | ^ ^ | +| front tail | | | | ++---+------+--+----------------|-------------------------|-------------------------+ + | | | | + | +-------------------|-------------------------+ + +--------------------------+ + +(2) continuous free space ++-------------+--------------------------------------------------------------------+ +| object head |XXXXXXXXXXXXXXXXX XXXXXXXXXXXXXXXXXXXXXXXXXX| +| | ^ ^ | +| front tail | | | | ++---+------+--+----------------|-------------------------|-------------------------+ + | | | | + | +-------------------+ | + +----------------------------------------------------+ +*/ + int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head) { if ((head.front.offset == head.tail.offset) && (head.tail.gen == head.front.gen + 1)) { @@ -314,7 +339,7 @@ int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, c } // Magic number + Data size - process if not done in previous iteration if (! 
entry_start_processed ) { - if (size_to_process >= (sizeof(uint16_t) + sizeof(uint64_t))) { + if (size_to_process >= QUEUE_ENTRY_OVERHEAD) { // Decode magic number at start try { decode(entry_start, it); diff --git a/src/cls/queue/cls_queue_src.h b/src/cls/queue/cls_queue_src.h index 9111f1f0409..9970b98ea7c 100644 --- a/src/cls/queue/cls_queue_src.h +++ b/src/cls/queue/cls_queue_src.h @@ -1,6 +1,10 @@ #ifndef CEPH_CLS_QUEUE_SRC_H #define CEPH_CLS_QUEUE_SRC_H +#include "objclass/objclass.h" +#include "cls/queue/cls_queue_types.h" +#include "cls/queue/cls_queue_ops.h" + int queue_write_head(cls_method_context_t hctx, cls_queue_head& head); int queue_read_head(cls_method_context_t hctx, cls_queue_head& head); int queue_init(cls_method_context_t hctx, const cls_queue_init_op& op); @@ -9,4 +13,4 @@ int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head); int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head); -#endif /* CEPH_CLS_QUEUE_SRC_H */ \ No newline at end of file +#endif /* CEPH_CLS_QUEUE_SRC_H */ diff --git a/src/cls/queue/cls_queue_types.h b/src/cls/queue/cls_queue_types.h index cf50cb163f4..0a253bdfefb 100644 --- a/src/cls/queue/cls_queue_types.h +++ b/src/cls/queue/cls_queue_types.h @@ -11,6 +11,7 @@ constexpr unsigned int QUEUE_HEAD_START = 0xDEAD; constexpr unsigned int QUEUE_ENTRY_START = 0xBEEF; +constexpr unsigned int QUEUE_ENTRY_OVERHEAD = sizeof(uint16_t) + sizeof(uint64_t); struct cls_queue_entry { @@ -114,4 +115,4 @@ struct cls_queue_head }; WRITE_CLASS_ENCODER(cls_queue_head) -#endif \ No newline at end of file +#endif diff --git a/src/osd/CMakeLists.txt b/src/osd/CMakeLists.txt index c993ac2ff1b..56844af9499 100644 --- a/src/osd/CMakeLists.txt +++ b/src/osd/CMakeLists.txt @@ -67,5 +67,5 @@ if(WITH_RBD) add_dependencies(osd cls_rbd) endif() 
if(WITH_RADOSGW) - add_dependencies(osd cls_otp cls_rgw cls_queue cls_rgw_gc) + add_dependencies(osd cls_otp cls_rgw cls_queue cls_rgw_gc cls_2pc_queue) endif() diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 82208219e78..f4157d184b9 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -22,6 +22,7 @@ add_subdirectory(cls_version) add_subdirectory(cls_lua) add_subdirectory(cls_rgw_gc) add_subdirectory(cls_queue) +add_subdirectory(cls_2pc_queue) add_subdirectory(common) add_subdirectory(compressor) add_subdirectory(crush) diff --git a/src/test/cls_2pc_queue/CMakeLists.txt b/src/test/cls_2pc_queue/CMakeLists.txt new file mode 100644 index 00000000000..1b5a4eda303 --- /dev/null +++ b/src/test/cls_2pc_queue/CMakeLists.txt @@ -0,0 +1,18 @@ +add_executable(ceph_test_cls_2pc_queue + test_cls_2pc_queue.cc +) +target_link_libraries(ceph_test_cls_2pc_queue + cls_2pc_queue_client + cls_queue_client + librados + global + ${UNITTEST_LIBS} + ${EXTRALIBS} + ${BLKID_LIBRARIES} + ${CMAKE_DL_LIBS} + radostest-cxx) + + install(TARGETS + ceph_test_cls_2pc_queue + DESTINATION ${CMAKE_INSTALL_BINDIR}) + diff --git a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc new file mode 100644 index 00000000000..6ca8817e563 --- /dev/null +++ b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc @@ -0,0 +1,704 @@ +// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/types.h" + +#include "cls/2pc_queue/cls_2pc_queue_types.h" +#include "cls/2pc_queue/cls_2pc_queue_client.h" +#include "cls/queue/cls_queue_client.h" +#include "cls/2pc_queue/cls_2pc_queue_types.h" + +#include "gtest/gtest.h" +#include "test/librados/test_cxx.h" +#include "global/global_context.h" + +#include +#include +#include +#include +#include +#include + +class TestCls2PCQueue : public ::testing::Test { +protected: + librados::Rados rados; + std::string pool_name; + librados::IoCtx ioctx; + + 
void SetUp() override { + pool_name = get_temp_pool_name(); + ASSERT_EQ("", create_one_pool_pp(pool_name, rados)); + ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx)); + } + + void TearDown() override { + ioctx.close(); + ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados)); + } +}; + +TEST_F(TestCls2PCQueue, GetCapacity) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 8*1024; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + uint64_t size; + + const int ret = cls_queue_get_capacity(ioctx, queue_name, size); + ASSERT_EQ(0, ret); + ASSERT_EQ(max_size, size); +} + +TEST_F(TestCls2PCQueue, Reserve) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 1024U*1024U; + const auto number_of_ops = 10U; + const auto number_of_elements = 23U; + const auto size_to_reserve = 250U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + for (auto i = 0U; i < number_of_ops; ++i) { + cls_2pc_reservation::id_t res_id; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); + } + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), number_of_ops); + for (const auto& r : reservations) { + ASSERT_NE(r.first, cls_2pc_reservation::NO_ID); + ASSERT_GT(r.second.timestamp.time_since_epoch().count(), 0); + } +} + +TEST_F(TestCls2PCQueue, Commit) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 1024*1024*128; + const auto number_of_ops = 200U; + const auto number_of_elements = 23U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, 
&op)); + + for (auto i = 0U; i < number_of_ops; ++i) { + const std::string element_prefix("op-" +to_string(i) + "-element-"); + auto total_size = 0UL; + std::vector data(number_of_elements); + // create vector of buffer lists + std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable { + bufferlist bl; + bl.append(element_prefix + to_string(j++)); + total_size += bl.length(); + return bl; + }); + + cls_2pc_reservation::id_t res_id; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0); + ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); + cls_2pc_queue_commit(op, data, res_id); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + } + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), 0); +} + +TEST_F(TestCls2PCQueue, Abort) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 1024U*1024U; + const auto number_of_ops = 17U; + const auto number_of_elements = 23U; + const auto size_to_reserve = 250U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + for (auto i = 0U; i < number_of_ops; ++i) { + cls_2pc_reservation::id_t res_id; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); + cls_2pc_queue_abort(op, res_id); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + } + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), 0); +} + +TEST_F(TestCls2PCQueue, ReserveError) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 256U*1024U; + const auto number_of_ops = 254U; + const auto number_of_elements = 1U; + const auto size_to_reserve = 1024U; + 
librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + cls_2pc_reservation::id_t res_id; + for (auto i = 0U; i < number_of_ops-1; ++i) { + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); + } + res_id = cls_2pc_reservation::NO_ID; + // this one is failing because it exceeds the queue size + ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_EQ(res_id, cls_2pc_reservation::NO_ID); + + // this one is failing because it tries to reserve 0 entries + ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, 0, res_id), 0); + // this one is failing because it tries to reserve 0 bytes + ASSERT_NE(cls_2pc_queue_reserve(ioctx, queue_name, op, 0, number_of_elements, res_id), 0); + + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), number_of_ops-1); + for (const auto& r : reservations) { + ASSERT_NE(r.first, cls_2pc_reservation::NO_ID); + ASSERT_GT(r.second.timestamp.time_since_epoch().count(), 0); + } +} + +TEST_F(TestCls2PCQueue, CommitError) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 1024*1024; + const auto number_of_ops = 17U; + const auto number_of_elements = 23U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + const auto invalid_reservation_op = 8; + const auto invalid_elements_op = 11; + std::vector invalid_data(number_of_elements+3); + // create vector of buffer lists + std::generate(invalid_data.begin(), invalid_data.end(), [j = 0] () mutable { + bufferlist bl; + bl.append("invalid data is larger that regular data" + to_string(j++)); + return bl; + }); + for 
(auto i = 0U; i < number_of_ops; ++i) { + const std::string element_prefix("op-" +to_string(i) + "-element-"); + std::vector data(number_of_elements); + auto total_size = 0UL; + // create vector of buffer lists + std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable { + bufferlist bl; + bl.append(element_prefix + to_string(j++)); + total_size += bl.length(); + return bl; + }); + + cls_2pc_reservation::id_t res_id; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0); + ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); + if (i == invalid_reservation_op) { + // fail on a commit with an invalid reservation id + cls_2pc_queue_commit(op, data, res_id+999); + ASSERT_NE(0, ioctx.operate(queue_name, &op)); + } else if (i == invalid_elements_op) { + // fail on a commit when the data size is larger than the reserved one + cls_2pc_queue_commit(op, invalid_data, res_id); + ASSERT_NE(0, ioctx.operate(queue_name, &op)); + } else { + cls_2pc_queue_commit(op, data, res_id); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + } + } + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + // 2 reservations were not committed + ASSERT_EQ(reservations.size(), 2); +} + +TEST_F(TestCls2PCQueue, AbortError) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 1024*1024; + const auto number_of_ops = 17U; + const auto number_of_elements = 23U; + const auto size_to_reserve = 250U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + const auto invalid_reservation_op = 8; + + for (auto i = 0U; i < number_of_ops; ++i) { + cls_2pc_reservation::id_t res_id; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); + if (i == 
invalid_reservation_op) { + // aborting a reservation which does not exist + // is a no-op, not an error + cls_2pc_queue_abort(op, res_id+999); + } else { + cls_2pc_queue_abort(op, res_id); + } + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + } + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + // 1 reservation was not aborted + ASSERT_EQ(reservations.size(), 1); +} + +TEST_F(TestCls2PCQueue, MultiReserve) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 1024*1024; + const auto number_of_ops = 11U; + const auto number_of_elements = 23U; + const auto max_producer_count = 10U; + const auto size_to_reserve = 250U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + std::vector producers(max_producer_count); + for (auto& p : producers) { + p = std::thread([this, &queue_name] { + librados::ObjectWriteOperation op; + for (auto i = 0U; i < number_of_ops; ++i) { + cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_NE(res_id, 0); + } + }); + } + + std::for_each(producers.begin(), producers.end(), [](auto& p) { p.join(); }); + + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), number_of_ops*max_producer_count); + auto total_reservations = 0U; + for (const auto& r : reservations) { + total_reservations += r.second.size; + } + ASSERT_EQ(total_reservations, number_of_ops*max_producer_count*size_to_reserve); +} + +TEST_F(TestCls2PCQueue, MultiCommit) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 1024*1024; + const auto number_of_ops = 11U; + const auto number_of_elements = 23U; + const auto max_producer_count = 10U; + 
librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + std::vector producers(max_producer_count); + for (auto& p : producers) { + p = std::thread([this, &queue_name] { + librados::ObjectWriteOperation op; + for (auto i = 0U; i < number_of_ops; ++i) { + const std::string element_prefix("op-" +to_string(i) + "-element-"); + std::vector data(number_of_elements); + auto total_size = 0UL; + // create vector of buffer lists + std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable { + bufferlist bl; + bl.append(element_prefix + to_string(j++)); + total_size += bl.length(); + return bl; + }); + cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0); + ASSERT_NE(res_id, 0); + cls_2pc_queue_commit(op, data, res_id); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + } + }); + } + + std::for_each(producers.begin(), producers.end(), [](auto& p) { p.join(); }); + + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), 0); +} + +TEST_F(TestCls2PCQueue, MultiAbort) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 1024*1024; + const auto number_of_ops = 11U; + const auto number_of_elements = 23U; + const auto max_producer_count = 10U; + const auto size_to_reserve = 250U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + std::vector producers(max_producer_count); + for (auto& p : producers) { + p = std::thread([this, &queue_name] { + librados::ObjectWriteOperation op; + for (auto i = 0U; i < number_of_ops; ++i) { + cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, 
queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_NE(res_id, 0); + cls_2pc_queue_abort(op, res_id); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + } + }); + } + + std::for_each(producers.begin(), producers.end(), [](auto& p) { p.join(); }); + + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), 0); +} + +TEST_F(TestCls2PCQueue, ReserveCommit) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 1024*1024; + const auto number_of_ops = 11U; + const auto number_of_elements = 23U; + const auto max_workers = 10U; + const auto size_to_reserve = 512U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + std::vector reservers(max_workers); + for (auto& r : reservers) { + r = std::thread([this, &queue_name] { + librados::ObjectWriteOperation op; + for (auto i = 0U; i < number_of_ops; ++i) { + cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); + } + }); + } + + auto committer = std::thread([this, &queue_name] { + librados::ObjectWriteOperation op; + int remaining_ops = number_of_ops*max_workers; + while (remaining_ops > 0) { + const std::string element_prefix("op-" +to_string(remaining_ops) + "-element-"); + std::vector data(number_of_elements); + // create vector of buffer lists + std::generate(data.begin(), data.end(), [j = 0, &element_prefix] () mutable { + bufferlist bl; + bl.append(element_prefix + to_string(j++)); + return bl; + }); + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + for (const auto& r : reservations) { + cls_2pc_queue_commit(op, data, r.first); + ASSERT_EQ(0, 
ioctx.operate(queue_name, &op)); + --remaining_ops; + } + } + }); + + std::for_each(reservers.begin(), reservers.end(), [](auto& r) { r.join(); }); + committer.join(); + + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), 0); +} + +TEST_F(TestCls2PCQueue, ReserveAbort) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 1024*1024; + const auto number_of_ops = 17U; + const auto number_of_elements = 23U; + const auto max_workers = 10U; + const auto size_to_reserve = 250U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + std::vector reservers(max_workers); + for (auto& r : reservers) { + r = std::thread([this, &queue_name] { + librados::ObjectWriteOperation op; + for (auto i = 0U; i < number_of_ops; ++i) { + cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); + } + }); + } + + auto aborter = std::thread([this, &queue_name] { + librados::ObjectWriteOperation op; + int remaining_ops = number_of_ops*max_workers; + while (remaining_ops > 0) { + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + for (const auto& r : reservations) { + cls_2pc_queue_abort(op, r.first); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + --remaining_ops; + } + } + }); + + std::for_each(reservers.begin(), reservers.end(), [](auto& r) { r.join(); }); + aborter.join(); + + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), 0); +} + +TEST_F(TestCls2PCQueue, Cleanup) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 
128*1024*1024; + const auto number_of_ops = 17U; + const auto number_of_elements = 23U; + const auto max_workers = 10U; + const auto size_to_reserve = 512U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + // any reservation stamped later than 100ms from now is considered stale + ceph::real_time stale_time = ceph::real_clock::now() + std::chrono::milliseconds(100); + + std::vector reservers(max_workers); + for (auto& r : reservers) { + r = std::thread([this, &queue_name] { + librados::ObjectWriteOperation op; + for (auto i = 0U; i < number_of_ops; ++i) { + cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, size_to_reserve, number_of_elements, res_id), 0); + ASSERT_NE(res_id, cls_2pc_reservation::NO_ID); + // wait for 10ms between each reservation to make sure at least some are stale + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + }); + } + + auto cleaned_reservations = 0U; + auto committed_reservations = 0U; + auto aborter = std::thread([this, &queue_name, &stale_time, &cleaned_reservations, &committed_reservations] { + librados::ObjectWriteOperation op; + int remaining_ops = number_of_ops*max_workers; + while (remaining_ops > 0) { + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + for (const auto& r : reservations) { + if (r.second.timestamp > stale_time) { + // abort stale reservations + cls_2pc_queue_abort(op, r.first); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + ++cleaned_reservations; + } else { + // commit good reservations + const std::string element_prefix("op-" +to_string(remaining_ops) + "-element-"); + std::vector data(number_of_elements); + // create vector of buffer lists + std::generate(data.begin(), data.end(), [j = 0, &element_prefix] () mutable { + bufferlist bl; + bl.append(element_prefix + 
to_string(j++)); + return bl; + }); + cls_2pc_queue_commit(op, data, r.first); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + ++committed_reservations; + } + --remaining_ops; + } + } + }); + + + std::for_each(reservers.begin(), reservers.end(), [](auto& r) { r.join(); }); + aborter.join(); + + ASSERT_GT(cleaned_reservations, 0); + ASSERT_GT(committed_reservations, 0); + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), 0); +} + +TEST_F(TestCls2PCQueue, MultiProducer) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 128*1024*1024; + const auto number_of_ops = 300U; + const auto number_of_elements = 23U; + const auto max_producer_count = 10U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + auto producer_count = max_producer_count; + + std::vector producers(max_producer_count); + for (auto& p : producers) { + p = std::thread([this, &queue_name, &producer_count] { + librados::ObjectWriteOperation op; + for (auto i = 0U; i < number_of_ops; ++i) { + const std::string element_prefix("op-" +to_string(i) + "-element-"); + std::vector data(number_of_elements); + auto total_size = 0UL; + // create vector of buffer lists + std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable { + bufferlist bl; + bl.append(element_prefix + to_string(j++)); + total_size += bl.length(); + return bl; + }); + cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; + ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id), 0); + ASSERT_NE(res_id, 0); + cls_2pc_queue_commit(op, data, res_id); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + } + --producer_count; + }); + } + + auto consume_count = 0U; + std::thread consumer([this, &queue_name, &consume_count, &producer_count] { + 
librados::ObjectWriteOperation op; + const auto max_elements = 42; + const std::string marker; + bool truncated = false; + std::string end_marker; + std::vector entries; + while (producer_count > 0 || truncated) { + const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker); + ASSERT_EQ(0, ret); + consume_count += entries.size(); + cls_2pc_queue_remove_entries(op, end_marker); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + } + }); + + std::for_each(producers.begin(), producers.end(), [](auto& p) { p.join(); }); + consumer.join(); + ASSERT_EQ(consume_count, number_of_ops*number_of_elements*max_producer_count); +} + +TEST_F(TestCls2PCQueue, MultiProducerConsumer) +{ + const std::string queue_name = "my-queue"; + const auto max_size = 1024*1024; + const auto number_of_ops = 300U; + const auto number_of_elements = 23U; + const auto max_workers = 10U; + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, queue_name, max_size); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + + auto producer_count = max_workers; + + auto retry_happened = false; + + std::vector producers(max_workers); + for (auto& p : producers) { + p = std::thread([this, &queue_name, &producer_count, &retry_happened] { + librados::ObjectWriteOperation op; + for (auto i = 0U; i < number_of_ops; ++i) { + const std::string element_prefix("op-" +to_string(i) + "-element-"); + std::vector data(number_of_elements); + auto total_size = 0UL; + // create vector of buffer lists + std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable { + bufferlist bl; + bl.append(element_prefix + to_string(j++)); + total_size += bl.length(); + return bl; + }); + cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; + auto rc = cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id); + while (rc != 0) { + // other errors should cause test to fail + ASSERT_EQ(rc, -ENOSPC); 
+ ASSERT_EQ(res_id, 0); + // queue is full, sleep and retry + retry_happened = true; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + rc = cls_2pc_queue_reserve(ioctx, queue_name, op, total_size, number_of_elements, res_id); + }; + ASSERT_NE(res_id, 0); + cls_2pc_queue_commit(op, data, res_id); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + } + --producer_count; + }); + } + + const auto max_elements = 128; + std::vector consumers(max_workers/2); + for (auto& c : consumers) { + c = std::thread([this, &queue_name, &producer_count] { + librados::ObjectWriteOperation op; + const std::string marker; + bool truncated = false; + std::string end_marker; + std::vector entries; + while (producer_count > 0 || truncated) { + const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker); + ASSERT_EQ(0, ret); + if (entries.empty()) { + // queue is empty, let it fill + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } else { + cls_2pc_queue_remove_entries(op, end_marker); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + } + } + }); + } + + std::for_each(producers.begin(), producers.end(), [](auto& p) { p.join(); }); + std::for_each(consumers.begin(), consumers.end(), [](auto& c) { c.join(); }); + if (!retry_happened) { + std::cerr << "Queue was never full - all reservations were sucessfull." << + "Please decrease the amount of consumer threads" << std::endl; + } + // make sure that queue is empty and no reservations remain + cls_2pc_reservations reservations; + ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); + ASSERT_EQ(reservations.size(), 0); + const std::string marker; + bool truncated = false; + std::string end_marker; + std::vector entries; + ASSERT_EQ(0, cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker)); + ASSERT_EQ(entries.size(), 0); +} + -- 2.39.5