From dd1c4628267575432e3ab430819598de11887940 Mon Sep 17 00:00:00 2001 From: Adam Emerson Date: Tue, 5 Mar 2024 09:56:19 -0500 Subject: [PATCH] cls: Add sem_set class This stores a collection of key-based semaphores, intended to serve as a backend to the project to close the datalog write-hole. Signed-off-by: Adam Emerson --- qa/suites/rgw/verify/tasks/cls.yaml | 1 + qa/workunits/cls/test_cls_sem_set.sh | 5 + src/cls/CMakeLists.txt | 10 +- src/cls/sem_set/DESIGN.md | 134 ++++++++++ src/cls/sem_set/module.cc | 256 ++++++++++++++++++ src/cls/sem_set/ops.h | 135 ++++++++++ src/common/options/osd.yaml.in | 4 +- src/neorados/cls/sem_set.h | 315 +++++++++++++++++++++++ src/osd/CMakeLists.txt | 2 +- src/test/CMakeLists.txt | 1 + src/test/cls_sem_set/CMakeLists.txt | 15 ++ src/test/cls_sem_set/test_cls_sem_set.cc | 262 +++++++++++++++++++ 12 files changed, 1136 insertions(+), 4 deletions(-) create mode 100755 qa/workunits/cls/test_cls_sem_set.sh create mode 100644 src/cls/sem_set/DESIGN.md create mode 100644 src/cls/sem_set/module.cc create mode 100644 src/cls/sem_set/ops.h create mode 100644 src/neorados/cls/sem_set.h create mode 100644 src/test/cls_sem_set/CMakeLists.txt create mode 100644 src/test/cls_sem_set/test_cls_sem_set.cc diff --git a/qa/suites/rgw/verify/tasks/cls.yaml b/qa/suites/rgw/verify/tasks/cls.yaml index 26f948d42ec..4748d1f69f5 100644 --- a/qa/suites/rgw/verify/tasks/cls.yaml +++ b/qa/suites/rgw/verify/tasks/cls.yaml @@ -16,6 +16,7 @@ tasks: - cls/test_cls_cmpomap.sh - cls/test_cls_2pc_queue.sh - cls/test_cls_user.sh + - cls/test_cls_sem_set.sh - rgw/test_rgw_gc_log.sh - rgw/test_rgw_obj.sh - rgw/test_librgw_file.sh diff --git a/qa/workunits/cls/test_cls_sem_set.sh b/qa/workunits/cls/test_cls_sem_set.sh new file mode 100755 index 00000000000..4624e5e8c0a --- /dev/null +++ b/qa/workunits/cls/test_cls_sem_set.sh @@ -0,0 +1,5 @@ +#!/bin/sh -e + +ceph_test_cls_sem_set + +exit 0 diff --git a/src/cls/CMakeLists.txt b/src/cls/CMakeLists.txt index 
953ac83195f..fc98e62c126 100644 --- a/src/cls/CMakeLists.txt +++ b/src/cls/CMakeLists.txt @@ -358,4 +358,12 @@ set_target_properties(cls_fifo PROPERTIES target_link_libraries(cls_fifo ${FMT_LIB}) install(TARGETS cls_fifo DESTINATION ${cls_dir}) - +# cls_sem_set +set(cls_sem_set_srcs sem_set/module.cc) +add_library(cls_sem_set SHARED ${cls_sem_set_srcs}) +set_target_properties(cls_sem_set PROPERTIES + VERSION "1.0.0" + SOVERSION "1" + INSTALL_RPATH "" + CXX_VISIBILITY_PRESET hidden) +install(TARGETS cls_sem_set DESTINATION ${cls_dir}) diff --git a/src/cls/sem_set/DESIGN.md b/src/cls/sem_set/DESIGN.md new file mode 100644 index 00000000000..b1ab92a03fb --- /dev/null +++ b/src/cls/sem_set/DESIGN.md @@ -0,0 +1,134 @@ +# Closing the RGWDataChangesLog write hole # + +The Cause: The 'window' optimization creates a situation where some +changes exist only in RAM until a timer expires. If an RGW crashes +after a write, the data log entry might never be made. + +## Requirements ## + +* We need to handle syncing to multiple zones +* Some zones may not sync some buckets +* We need to maintain a strict ordering within the shard, with shards + with the oldest un-synced activity coming at the top of the listing +* We need to coalesce writes within a shard +* Spurious entries are permitted (but should be infrequent). They + impact efficiency, not correctness. + +## Definitions ## + +* `RGWDataChangesLog`: The class implementing the datalog + functionality. +* `cur_cycle`: The current working set of entries that will be written + to FIFO at the close of the window. +* `add_entry`: The function in the datalog API to write an entry. +* `renew_entries`: Function run periodically to write an entry to FIFO + for every bucketshard in `cur_cycle`. +* `recover`: Proposed function to complete initiated but incomplete datalog writes. +* semaphore object: Proposed object on the OSD holding a count for + every bucketshard, sharded the same as datalog shards. 
+* bs1, ...: Arbitrary bucket shards + +## Current Proposal ## + +* Make `add_entry` a transaction. +* Reify `cur_cycle` on the OSD in the semaphore object. +* Add a `recover` function to be run at startup. +* Keep the window optimization. + + +### RGWDataChangesLog ### + +* When `add_entry` is called with bs1: + + If `cur_cycle` does not contain bs1, increment bs1's count in + the semaphore object + + Otherwise, proceed as we do currently. +* In `renew_entries`, after the FIFO entry for a given shard has been + written, decrement the count associated with that bucket shard on + the semaphore object. +* In `recover`: + 1. Read the semaphore object, keeping bucketshards and counts in + memory (`unordered_map`). + 2. For every bucketshard, write an entry into the FIFO. + 3. Send a notification on the semaphore object + 4. On receiving a notification, RGWDataChangesLog will send + `cur_cycle` as the response. + 5. On successful notify, go through each response and decrement + each bucketshard in the unordered map. (e.g. if three RGWs + respond with bs1 in their `cur_cycle`, bs1 will be decremented + thrice) + 6. For each entry in the unordered map, decrement on the semaphore + object only if the object's count is greater than 0. + 7. If the `notify` operation errors, don't decrement anything. +* Have some task call `compress` on a regular basis (Daily? Hourly?), + to keep seldom used or deleted bucket shards from slowing down + listing + + +### CLS module requirements ### + +* Our back-end data store will be omap. We will use exactly one + key/value pair for every bucketshard in the system +* To avoid the performance problems of deletes and reinsertions, don't + delete keys when they fall to zero, just set their value to 0. + +#### Operations #### + +* increment([bucketshard, ...]) -> {}: + - For each bucketshard, look up the given key in omap. If it + exists, set the value to one more than is currently + there. Otherwise create it with a value of 1. 
+* decrement([bucketshard, ...], grace) -> {}: + - For each bucketshard, look it up in omap. If it does not exist + or the value is 0, error. If the last decrement was within the + 'grace' timespan, skip it. Otherwise write decremented value. + - Should it actually error or do we want saturating arithmetic at + 0? Given that the recovery design proposed below allows values + to remain spuriously non-decremented but tries to rule out + spurious decrements, we likely want to have the system scream + bloody murder if we underflow. +* list(cursor?) -> ([(entry, count), ...], cursor): + - Return entries starting from cursor. Skip any with a semaphore of 0. +* compress(cursor?) -> cursor?: + - Go through and delete all entries with a 0 count. + - Return cursor if we can't fit more calls in the operation + and need another go-round + +## Analysis ## + +* Casey's semaphore idea solves the race between `add_entry` and + `renew_entries` within a single process and between RGWs on + different hosts +* The use of watch/notify solves the potential race between `recover` + and other RGWs, ensuring that we won't delete someone else's + transaction. +* As described, notify fails safe, as we don't decrement semaphores on + error. This can lead to spurious recover in the future, but that's + not harmful. +* In general, unpaired increments are considered acceptable, as + they'll tend toward zero over time through recovery. +* Watch/Notify seems a better choice than locking, as it can't + introduce a stall on `add_entry`. +* `increment`/`decrement` take a vector of bucketshards so they're + ready to support batching. +* We will never have more omap entries than we do bucketshards in the + system, and the operations are eventually all read-modify-write. Not + deleting saves us from the worst pathologies of omap. + +## Questions ## + +* Do we want to shard the semaphore object? + - [casey] yes, for write parallelism. 
we don't want all object + writes to serialize on a single rados object + +* What happens when we delete buckets? Would we need to clean up the + associated bucketshards and delete them? + - [casey] same comes up for bucket reshards, where we add :gen to + these bucket-shard-gen keys i do think we want to delete keys + after they reach 0. ideally we'd do this in batches rocksdb + deletes suck because they leave lots of tombstones behind, but + that mainly affects listing performance. here we only list for + recovery, and recovery only wants to see entries with count > 0 + +* Is a separate `compress` function really necessary? It might be + worth doing test runs between a version that just deletes inline and + one that uses a separate compress step. diff --git a/src/cls/sem_set/module.cc b/src/cls/sem_set/module.cc new file mode 100644 index 00000000000..66d33b9dbc3 --- /dev/null +++ b/src/cls/sem_set/module.cc @@ -0,0 +1,256 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include +#include +#include +#include + +#include + +#include "include/rados/objclass.h" + +#include "cls/sem_set/ops.h" + +#include "objclass/objclass.h" + + +namespace buffer = ::ceph::buffer; +namespace sys = ::boost::system; +namespace ss = ::cls::sem_set; + +using namespace std::literals; + +namespace { +/// So we don't clash with other OMAP keys. 
+inline constexpr auto PREFIX = "CLS_SEM_SET_"sv; + +struct sem_val { + uint64_t value = 0; + ceph::real_time last_decrement = ceph::real_time::min(); + + sem_val() = default; + + sem_val(uint64_t value) : value(value) {} + + void encode(buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(value, bl); + encode(last_decrement, bl); + ENCODE_FINISH(bl); + } + + void decode(buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(value, bl); + decode(last_decrement, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(sem_val); + + +int increment(cls_method_context_t hctx, buffer::list *in, buffer::list *out) +{ + CLS_LOG(10, "%s", __PRETTY_FUNCTION__); + + ss::increment op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const std::exception& e) { + CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__, + e.what()); + return -EINVAL; + } + + if (op.keys.size() > ::cls::sem_set::max_keys) { + CLS_ERR("ERROR: %s: too many keys: %zu", __PRETTY_FUNCTION__, + op.keys.size()); + return -E2BIG; + } + + for (const auto& key_ : op.keys) try { + buffer::list valbl; + auto key = std::string(PREFIX) + key_; + auto r = cls_cxx_map_get_val(hctx, key, &valbl); + sem_val val; + if (r >= 0) { + auto bi = valbl.cbegin(); + decode(val, bi); + valbl.clear(); + } else if (r == -ENOENT) { + val.value = 0; + } else { + CLS_ERR("ERROR: %s: failed to read semaphore: r=%d", + __PRETTY_FUNCTION__, r); + return r; + } + val.value += 1; + encode(val, valbl); + r = cls_cxx_map_set_val(hctx, key, &valbl); + if (r < 0) { + CLS_ERR("ERROR: %s: failed to update semaphore: r=%d", + __PRETTY_FUNCTION__, r); + return r; + } + } catch (const std::exception& e) { + CLS_ERR("CAN'T HAPPEN: %s: failed to decode semaphore: %s", + __PRETTY_FUNCTION__, e.what()); + return -EIO; + } + + return 0; +} + +int decrement(cls_method_context_t hctx, buffer::list *in, buffer::list *out) +{ + CLS_LOG(10, "%s", __PRETTY_FUNCTION__); + + ss::decrement op; + try { + 
auto iter = in->cbegin(); + decode(op, iter); + } catch (const std::exception& e) { + CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__, + e.what()); + return -EINVAL; + } + + if (op.keys.size() > ::cls::sem_set::max_keys) { + CLS_ERR("ERROR: %s: too many keys: %zu", __PRETTY_FUNCTION__, + op.keys.size()); + return -E2BIG; + } + + auto now = ceph::real_clock::now(); + for (const auto& key_ : op.keys) try { + buffer::list valbl; + auto key = std::string(PREFIX) + key_; + auto r = cls_cxx_map_get_val(hctx, key, &valbl); + if (r < 0) { + CLS_ERR("ERROR: %s: failed to read semaphore: r=%d", + __PRETTY_FUNCTION__, r); + return r; + } + sem_val val; + auto bi = valbl.cbegin(); + decode(val, bi); + // Don't decrement if we're within the grace period (or there's + // screwy time stuff) + if ((now < val.last_decrement) || + (now - val.last_decrement < op.grace)) { + continue; + } + if (val.value > 1) { + --(val.value); + val.last_decrement = now; + valbl.clear(); + encode(val, valbl); + r = cls_cxx_map_set_val(hctx, key, &valbl); + if (r < 0) { + CLS_ERR("ERROR: %s: failed to update semaphore: r=%d", + __PRETTY_FUNCTION__, r); + return r; + } + } else { + r = cls_cxx_map_remove_key(hctx, key); + if (r < 0) { + CLS_ERR("ERROR: %s: failed to remove key %s: r=%d", + __PRETTY_FUNCTION__, key.c_str(), r); + return r; + } + } + } catch (const std::exception& e) { + CLS_ERR("CORRUPTION: %s: failed to decode semaphore: %s", + __PRETTY_FUNCTION__, e.what()); + return -EIO; + } + + return 0; +} + +int list(cls_method_context_t hctx, buffer::list *in, buffer::list *out) +{ + CLS_LOG(10, "%s", __PRETTY_FUNCTION__); + + ss::list_op op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const std::exception& e) { + CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__, + e.what()); + return -EINVAL; + } + + auto count = op.count; + ss::list_ret res; + res.cursor = op.cursor; + + while (count > 0) { + auto cursor = std::string{PREFIX} 
+ res.cursor + '\001'; + std::map vals; + bool more = false; + auto r = cls_cxx_map_get_vals(hctx, cursor, + std::string(PREFIX), count, + &vals, &more); + if (r < 0) { + CLS_ERR("ERROR: %s: failed to read semaphore: r=%d", + __PRETTY_FUNCTION__, r); + return r; + } + + count = vals.size() <= count ? count - vals.size() : 0; + + if (!vals.empty()) { + res.cursor = std::string{(--vals.end())->first, PREFIX.size()}; + } + try { + for (auto&& [key_, valbl] : vals) { + sem_val val; + auto bi = valbl.cbegin(); + decode(val, bi); + res.kvs.emplace(std::piecewise_construct, + std::forward_as_tuple(std::move(key_), PREFIX.size()), + std::forward_as_tuple(val.value)); + } + } catch (const sys::system_error& e) { + CLS_ERR("CAN'T HAPPEN: %s: failed to decode seamaphore: %s", + __PRETTY_FUNCTION__, e.what()); + return ceph::from_error_code(e.code());; + } + if (!more) { + res.cursor.clear(); + break; + } + } + + encode(res, *out); + return 0; +} +} // namespace (anonymous) + +CLS_INIT(sem_set) +{ + cls_handle_t h_class; + cls_method_handle_t h_increment; + cls_method_handle_t h_decrement; + cls_method_handle_t h_list; + + cls_register(ss::CLASS, &h_class); + cls_register_cxx_method(h_class, ss::INCREMENT, + CLS_METHOD_RD | CLS_METHOD_WR, + &increment, &h_increment); + + cls_register_cxx_method(h_class, ss::DECREMENT, + CLS_METHOD_RD | CLS_METHOD_WR, + &decrement, &h_decrement); + + cls_register_cxx_method(h_class, ss::LIST, + CLS_METHOD_RD, + &list, &h_list); + + return; +} diff --git a/src/cls/sem_set/ops.h b/src/cls/sem_set/ops.h new file mode 100644 index 00000000000..24d63dca136 --- /dev/null +++ b/src/cls/sem_set/ops.h @@ -0,0 +1,135 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include +#include +#include + +#include +#include + +#include "include/encoding.h" + +namespace cls::sem_set { +using namespace std::literals; + +inline constexpr auto max_keys = 1'000u; + +namespace buffer = 
ceph::buffer; + +struct increment { + boost::container::flat_set keys; + + increment() = default; + + increment(std::string s) + : keys({std::move(s)}) {} + + increment(decltype(keys) s) + : keys(std::move(s)) {} + + template + increment(I begin, I end) + requires std::is_convertible_v::value_type, + std::string> + : keys(begin, end) {} + + void encode(buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(keys, bl); + ENCODE_FINISH(bl); + } + + void decode(buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(keys, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(increment); + +struct decrement { + boost::container::flat_set keys; + ceph::timespan grace; + + decrement() = default; + + decrement(std::string s, ceph::timespan grace = 0ns) + : keys({std::move(s)}), grace(grace) {} + + decrement(decltype(keys) s, ceph::timespan grace = 0ns) + : keys(std::move(s)), grace(grace) {} + + template + decrement(I begin, I end, ceph::timespan grace = 0ns) + requires std::is_convertible_v::value_type, + std::string> + : keys(begin, end), grace(grace) {} + + void encode(buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(keys, bl); + encode(grace, bl); + ENCODE_FINISH(bl); + } + + void decode(buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(keys, bl); + decode(grace, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(decrement); + +struct list_op { + std::uint64_t count; + std::string cursor; + + void encode(buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(count, bl); + encode(cursor, bl); + ENCODE_FINISH(bl); + } + + void decode(buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(count, bl); + decode(cursor, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(list_op); + +struct list_ret { + boost::container::flat_map kvs; + std::string cursor; + + list_ret() = default; + + void encode(buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(kvs, bl); + encode(cursor, bl); + 
ENCODE_FINISH(bl); + } + + void decode(buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(kvs, bl); + decode(cursor, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(list_ret); + +inline constexpr auto CLASS = "sem_set"; +inline constexpr auto INCREMENT = "increment"; +inline constexpr auto DECREMENT = "decrement"; +inline constexpr auto LIST = "list"; + +} // namespace cls::sem_set diff --git a/src/common/options/osd.yaml.in b/src/common/options/osd.yaml.in index 3ba8f3d2770..ac9072198ae 100644 --- a/src/common/options/osd.yaml.in +++ b/src/common/options/osd.yaml.in @@ -666,14 +666,14 @@ options: type: str level: advanced default: cephfs hello journal lock log numops otp rbd refcount rgw rgw_gc timeindex - user version cas cmpomap queue 2pc_queue fifo + user version cas cmpomap queue 2pc_queue fifo sem_set with_legacy: true # list of object classes with default execute perm (allow all: *) - name: osd_class_default_list type: str level: advanced default: cephfs hello journal lock log numops otp rbd refcount rgw rgw_gc timeindex - user version cas cmpomap queue 2pc_queue fifo + user version cas cmpomap queue 2pc_queue fifo sem_set with_legacy: true - name: osd_agent_max_ops type: int diff --git a/src/neorados/cls/sem_set.h b/src/neorados/cls/sem_set.h new file mode 100644 index 00000000000..ec810af9313 --- /dev/null +++ b/src/neorados/cls/sem_set.h @@ -0,0 +1,315 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2024 IBM + * + * See file COPYING for license information. + * + */ + +#pragma once + +/// \file neodrados/cls/sem_set.h +/// +/// \brief NeoRADOS interface to semaphore set class +/// +/// The `sem_set` object class stores a set of strings with associated +/// semaphores in OMAP. 
+ +#include +#include +#include + +#include +#include + +#include "include/buffer.h" + +#include "include/neorados/RADOS.hpp" + +#include "cls/sem_set/ops.h" + +namespace neorados::cls::sem_set { +using namespace std::literals; + +/// \brief The maximum number of keys per op +/// +using ::cls::sem_set::max_keys; + +/// \brief Increment semaphore +/// +/// Append a call to a write operation that increments the semaphore +/// on a key. +/// +/// \param key Key to increment +/// +/// \return The ClsWriteOp to be passed to WriteOp::exec +[[nodiscard]] inline auto increment(std::string key) +{ + namespace ss = ::cls::sem_set; + buffer::list in; + ss::increment call{std::move(key)}; + encode(call, in); + return ClsWriteOp{[in = std::move(in)](WriteOp& op) { + op.exec(ss::CLASS, ss::INCREMENT, in); + }}; +} + +/// \brief Increment semaphores +/// +/// Append a call to a write operation that increments the semaphores +/// on a set of keys. +/// +/// \param keys Keys to increment +/// +/// \return The ClsWriteOp to be passed to WriteOp::exec +[[nodiscard]] inline auto increment(std::initializer_list keys) +{ + namespace ss = ::cls::sem_set; + namespace buffer = ::ceph::buffer; + buffer::list in; + ss::increment call{keys}; + encode(call, in); + return ClsWriteOp{[in = std::move(in)](WriteOp& op) { + op.exec(ss::CLASS, ss::INCREMENT, in); + }}; +} + +/// \brief Increment semaphores +/// +/// Append a call to a write operation that increments the semaphores +/// on a set of keys. 
+/// +/// \param keys Keys to increment +/// +/// \return The ClsWriteOp to be passed to WriteOp::exec +[[nodiscard]] inline auto increment(boost::container::flat_set keys) +{ + namespace ss = ::cls::sem_set; + namespace buffer = ::ceph::buffer; + buffer::list in; + ss::increment call{std::move(keys)}; + encode(call, in); + return ClsWriteOp{[in = std::move(in)](WriteOp& op) { + op.exec(ss::CLASS, ss::INCREMENT, in); + }}; +} + +/// \brief Increment semaphores +/// +/// Append a call to a write operation that increments the semaphores +/// on a set of keys. +/// +/// \param keys Keys to increment +/// +/// \return The ClsWriteOp to be passed to WriteOp::exec +template +[[nodiscard]] inline auto increment(I begin, I end) + requires std::is_convertible_v::value_type, + std::string> +{ + namespace ss = ::cls::sem_set; + namespace buffer = ::ceph::buffer; + buffer::list in; + ss::increment call{begin, end}; + encode(call, in); + return ClsWriteOp{[in = std::move(in)](WriteOp& op) { + op.exec(ss::CLASS, ss::INCREMENT, in); + }}; +} + +/// \brief Decrement semaphore +/// +/// Append a call to a write operation that decrements the semaphore +/// on a key. +/// +/// \param key Key to decrement +/// \param grace Don't decrement anything decremented more recently. +/// +/// \return The ClsWriteOp to be passed to WriteOp::exec +[[nodiscard]] inline auto +decrement(std::string key, ceph::timespan grace = 0ns) +{ + namespace ss = ::cls::sem_set; + namespace buffer = ::ceph::buffer; + buffer::list in; + ss::decrement call{std::move(key), grace}; + encode(call, in); + return ClsWriteOp{[in = std::move(in)](WriteOp& op) { + op.exec(ss::CLASS, ss::DECREMENT, in); + }}; +} + +/// \brief Decrement semaphores +/// +/// Append a call to a write operation that decrements the semaphores +/// on a set of keys. +/// +/// \param keys Keys to decrement +/// \param grace Don't decrement anything decremented more recently. 
+/// +/// \return The ClsWriteOp to be passed to WriteOp::exec +[[nodiscard]] inline auto +decrement(std::initializer_list keys, ceph::timespan grace = 0ns) +{ + namespace ss = ::cls::sem_set; + namespace buffer = ::ceph::buffer; + buffer::list in; + ss::decrement call{keys, grace}; + encode(call, in); + return ClsWriteOp{[in = std::move(in)](WriteOp& op) { + op.exec(ss::CLASS, ss::DECREMENT, in); + }}; +} + +/// \brief Decrement semaphores +/// +/// Append a call to a write operation that decrements the semaphores +/// on a set of keys. +/// +/// \param keys Keys to decrement +/// \param grace Don't decrement anything decremented more recently. +/// +/// \return The ClsWriteOp to be passed to WriteOp::exec +[[nodiscard]] inline auto +decrement(boost::container::flat_set keys, ceph::timespan grace = 0ns) +{ + namespace ss = ::cls::sem_set; + namespace buffer = ::ceph::buffer; + buffer::list in; + ss::decrement call{std::move(keys), grace}; + encode(call, in); + return ClsWriteOp{[in = std::move(in)](WriteOp& op) { + op.exec(ss::CLASS, ss::DECREMENT, in); + }}; +} + +/// \brief decrement semaphores +/// +/// Append a call to a write operation that decrements the semaphores +/// on a set of keys. +/// +/// \param keys Keys to decrement +/// \param grace Don't decrement anything decremented more recently. +/// +/// \return The ClsWriteOp to be passed to WriteOp::exec +template +[[nodiscard]] inline auto +decrement(I begin, I end, ceph::timespan grace = 0ns) + requires std::is_convertible_v +{ + namespace ss = ::cls::sem_set; + namespace buffer = ::ceph::buffer; + buffer::list in; + ss::decrement call{begin, end, grace}; + encode(call, in); + return ClsWriteOp{[in = std::move(in)](WriteOp& op) { + op.exec(ss::CLASS, ss::DECREMENT, in); + }}; +} + +/// \brief List keys and semaphores +/// +/// Append a call to a read operation that lists keys and semaphores +/// +/// \note This *appends* to the `entries`, it does not *clear* it. 
+/// +/// \param count Number of entries to fetch +/// \param cursor Where to start +/// \param entries Unordered list to which entries are appended +/// \param new_cursor Where to start for the next iteration, empty if completed +/// +/// \return The ClsReadOp to be passed to WriteOp::exec +[[nodiscard]] inline auto list( + std::uint64_t count, std::string cursor, + boost::container::flat_map* const entries, + std::string* const new_cursor) +{ + namespace ss = ::cls::sem_set; + namespace sys = ::boost::system; + namespace buffer = ::ceph::buffer; + buffer::list in; + ::cls::sem_set::list_op call; + call.count = count; + call.cursor = std::move(cursor); + + encode(call, in); + return ClsReadOp{[entries, new_cursor, + in = std::move(in)](ReadOp& op) { + op.exec(ss::CLASS, ss::LIST, in, + [entries, new_cursor](sys::error_code ec, const buffer::list& bl) { + ss::list_ret ret; + if (!ec) { + auto iter = bl.cbegin(); + try { + decode(ret, iter); + } catch (const sys::system_error& e) { + if (e.code() == buffer::errc::end_of_buffer && + bl.length() == 0) { + // It looks like if the object doesn't exist the + // CLS function isn't called and we don't get + // -ENOENT even though we get it for the op, we + // just get an empty buffer. This is crap. 
+ if (new_cursor) { + new_cursor->clear(); + } + return; + } else { + throw; + } + } + if (entries) { + entries->reserve(entries->size() + ret.kvs.size()); + entries->merge(std::move(ret.kvs)); + } + if (new_cursor) { + *new_cursor = std::move(ret.cursor); + } + } + }); + }}; +} + +/// \brief List keys and semaphores +/// +/// Append a call to a read operation that lists keys and semaphores +/// +/// \param count Number of entries to fetch +/// \param cursor Where to start +/// \param out Output iterator +/// \param new_cursor Where to start for the next iteration, empty if completed +/// +/// \return The ClsReadOp to be passed to WriteOp::exec +template> I> +[[nodiscard]] inline auto list(std::uint64_t count, + std::string cursor, I output, + std::string* const new_cursor) +{ + namespace ss = ::cls::sem_set; + using boost::system::error_code; + namespace buffer = ::ceph::buffer; + buffer::list in; + ::cls::sem_set::list_op call; + call.count = count; + call.cursor = std::move(cursor); + + encode(call, in); + return ClsReadOp{[output, new_cursor, + in = std::move(in)](ReadOp& op) { + op.exec(ss::CLASS, ss::LIST, in, + [output, new_cursor](error_code ec, const buffer::list& bl) { + ss::list_ret ret; + if (!ec) { + auto iter = bl.cbegin(); + decode(ret, iter); + std::move(ret.kvs.begin(), ret.kvs.end(), output); + if (new_cursor) { + *new_cursor = std::move(ret.cursor); + } + } + }); + }}; +} +} // namespace neorados::cls::sem_set diff --git a/src/osd/CMakeLists.txt b/src/osd/CMakeLists.txt index 94579ba6ef5..e7f579f3841 100644 --- a/src/osd/CMakeLists.txt +++ b/src/osd/CMakeLists.txt @@ -88,5 +88,5 @@ if(WITH_RBD) add_dependencies(osd cls_rbd) endif() if(WITH_RADOSGW) - add_dependencies(osd cls_otp cls_rgw cls_queue cls_rgw_gc cls_2pc_queue cls_fifo) + add_dependencies(osd cls_otp cls_rgw cls_queue cls_rgw_gc cls_2pc_queue cls_fifo cls_sem_set) endif() diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 664e51387a9..1fd2bb5e98b 100644 --- 
a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -58,6 +58,7 @@ if(NOT WIN32) add_subdirectory(cls_2pc_queue) add_subdirectory(cls_cmpomap) add_subdirectory(cls_fifo) + add_subdirectory(cls_sem_set) add_subdirectory(journal) add_subdirectory(erasure-code) diff --git a/src/test/cls_sem_set/CMakeLists.txt b/src/test/cls_sem_set/CMakeLists.txt new file mode 100644 index 00000000000..d01cc928e06 --- /dev/null +++ b/src/test/cls_sem_set/CMakeLists.txt @@ -0,0 +1,15 @@ +add_executable(ceph_test_cls_sem_set + test_cls_sem_set.cc + ) +target_link_libraries(ceph_test_cls_sem_set + libneorados + ${BLKID_LIBRARIES} + ${CMAKE_DL_LIBS} + ${CRYPTO_LIBS} + ${EXTRALIBS} + neoradostest-support + ${UNITTEST_LIBS} + ) +install(TARGETS + ceph_test_cls_sem_set + DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/src/test/cls_sem_set/test_cls_sem_set.cc b/src/test/cls_sem_set/test_cls_sem_set.cc new file mode 100644 index 00000000000..1015fe38670 --- /dev/null +++ b/src/test/cls_sem_set/test_cls_sem_set.cc @@ -0,0 +1,262 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM + * + * See file COPYING for license information. + * + */ + +#include "neorados/cls/sem_set.h" + +#include +#include +#include + +#include + +#include + +#include "gtest/gtest.h" + +#include "include/neorados/RADOS.hpp" + +#include "test/neorados/common_tests.h" + +namespace sem_set = neorados::cls::sem_set; +namespace sys = boost::system; +namespace container = boost::container; + +using neorados::ReadOp; +using neorados::WriteOp; + +using namespace std::literals; + +/// Increment semaphore multiple times. Decrement the same number of +/// times. Decrement once more and expect an error. 
+CORO_TEST_F(cls_sem_set, inc_dec, NeoRadosTest) +{ + std::string_view oid = "obj"; + co_await create_obj(oid); + + auto key = "foo"; + for (auto i = 0; i < 10; ++i) { + co_await execute(oid, WriteOp{}.exec(sem_set::increment(key))); + } + for (auto i = 0; i < 10; ++i) { + co_await execute(oid, WriteOp{}.exec(sem_set::decrement(key))); + } + + co_await expect_error_code( + execute(oid, WriteOp{}.exec(sem_set::decrement(key))), + sys::errc::no_such_file_or_directory); + co_return; +} + +// TODO: Uncomment when we get a GCC everywhere that doesn't crash +// when compiling it. + +// /// Increment semaphore multiple times. Decrement the same number of +// /// times. Decrement once more and expect an error. But this time use +// /// the initializer list definition and two keys. +// CORO_TEST_F(cls_sem_set, inc_dec_init_list, NeoRadosTest) +// { +// std::string_view oid = "obj"; +// co_await create_obj(oid); + +// auto key1 = "foo"; +// auto key2 = "bar"; +// for (auto i = 0; i < 10; ++i) { +// co_await execute(oid, WriteOp{}.exec(sem_set::increment({key1, key2}))); +// } +// for (auto i = 0; i < 10; ++i) { +// co_await execute(oid, WriteOp{}.exec(sem_set::decrement({key1, key2}))); +// } + +// co_await expect_error_code( +// execute(oid, WriteOp{}.exec(sem_set::decrement({key1, key2}))), +// sys::errc::no_such_file_or_directory); +// co_return; +// } + +/// Send too many keys to increment, expecting an error. Then send too +/// many keys to decrement, expecting an error. 
+CORO_TEST_F(cls_sem_set, inc_dec_overflow, NeoRadosTest)
+{
+  std::string_view oid = "obj";
+  co_await create_obj(oid);
+
+  // Build twice as many distinct keys as sem_set::max_keys.
+  container::flat_set<std::string> strings;
+  for (auto i = 0u; i < 2 * sem_set::max_keys; ++i) {
+    strings.insert(fmt::format("key{}", i));
+  }
+
+  // Both operations are expected to reject the oversized key range.
+  co_await expect_error_code(
+    execute(oid, WriteOp{}.exec(sem_set::increment(strings.begin(),
+                                                   strings.end()))),
+    sys::errc::argument_list_too_long);
+  co_await expect_error_code(
+    execute(oid, WriteOp{}.exec(sem_set::decrement(strings.begin(),
+                                                   strings.end()))),
+    sys::errc::argument_list_too_long);
+}
+
+
+/// Write the maximum number of keys we can return in one listing. Do
+/// one listing and ensure that the returned cursor is empty (thus
+/// showing list returned all values.) Repeat twice more to give
+/// coverage to the other two definitions of list.
+CORO_TEST_F(cls_sem_set, list_small, NeoRadosTest)
+{
+  std::string_view oid = "obj";
+  co_await create_obj(oid);
+  const auto max = sem_set::max_keys;
+
+  // Reference set of every key written to the object.
+  container::flat_set<std::string> ref;
+  for (auto i = 0u; i < max; ++i) {
+    ref.insert(fmt::format("key{}", i));
+  }
+  co_await execute(oid, WriteOp{}.exec(sem_set::increment(ref.begin(),
+                                                         ref.end())));
+  // NOTE(review): the container template arguments in this test were
+  // reconstructed. Keys are the std::string values built above; the
+  // counter type is assumed to be std::uint64_t -- confirm against the
+  // sem_set::list overloads.
+  {
+    // Overload filling a flat_map through a pointer.
+    container::flat_map<std::string, std::uint64_t> res;
+    std::string cursor;
+    co_await execute(oid, ReadOp{}.exec(sem_set::list(max, {},
+                                                      &res, &cursor)));
+    EXPECT_TRUE(cursor.empty());
+    for (const auto& [key, value] : res) {
+      EXPECT_TRUE(ref.contains(key));
+    }
+    for (const auto& key : ref) {
+      EXPECT_TRUE(res.contains(key));
+    }
+  }
+  {
+    // Output-iterator overload, inserting into a std::map.
+    std::map<std::string, std::uint64_t> res;
+    std::string cursor;
+    co_await execute(oid, ReadOp{}.exec(
+                       sem_set::list(max, {},
+                                     std::inserter(res, res.end()), &cursor)));
+    EXPECT_TRUE(cursor.empty());
+    for (const auto& [key, value] : res) {
+      EXPECT_TRUE(ref.contains(key));
+    }
+    for (const auto& key : ref) {
+      EXPECT_TRUE(res.contains(key));
+    }
+  }
+  {
+    // Output-iterator overload, inserting into a vector of pairs. A
+    // vector has no contains(), so completeness is checked by size.
+    std::vector<std::pair<std::string, std::uint64_t>> res;
+    std::string cursor;
+    co_await execute(oid, ReadOp{}.exec(
+                       sem_set::list(max, {},
+                                     std::inserter(res, res.end()), &cursor)));
+    EXPECT_TRUE(cursor.empty());
+    for (const auto& [key, value] : res) {
+      EXPECT_TRUE(ref.contains(key));
+    }
+    EXPECT_EQ(ref.size(), res.size());
+  }
+}
+
+/// Write the maximum number of keys we can return in one listing
+/// several times. Do an iterated listing to test cursor
+/// functionality and append. Check that we have all of and only the
+/// keys we should. Repeat for the other two list functions.
+CORO_TEST_F(cls_sem_set, list_large, NeoRadosTest)
+{
+  std::string_view oid = "obj";
+  co_await create_obj(oid);
+  const auto max = sem_set::max_keys;
+
+  // Write four batches of `max` keys each; a single increment call
+  // with more than `max` keys is what inc_dec_overflow expects to fail.
+  container::flat_set<std::string> ref;
+  for (auto i = 0u; i < 4; ++i) {
+    container::flat_set<std::string> part;
+    for (auto j = 0u; j < max; ++j) {
+      part.insert(fmt::format("key{}", (i * max) + j));
+    }
+    co_await execute(oid, WriteOp{}.exec(sem_set::increment(part.begin(),
+                                                           part.end())));
+    ref.merge(std::move(part));
+  }
+  EXPECT_EQ(4 * max, ref.size());
+  std::string cursor;
+
+  // NOTE(review): the container template arguments in this test were
+  // reconstructed. Keys are the std::string values built above; the
+  // counter type is assumed to be std::uint64_t -- confirm against the
+  // sem_set::list overloads.
+
+  // flat_map overload: keep listing from the returned cursor until it
+  // comes back empty.
+  container::flat_map<std::string, std::uint64_t> res_fm;
+  do {
+    co_await execute(oid, ReadOp{}.exec(
+                       sem_set::list(max, cursor, &res_fm, &cursor)));
+  } while (!cursor.empty());
+  for (const auto& [key, value] : res_fm) {
+    EXPECT_TRUE(ref.contains(key));
+  }
+  for (const auto& key : ref) {
+    EXPECT_TRUE(res_fm.contains(key));
+  }
+
+  // Output-iterator overload into a std::map.
+  std::map<std::string, std::uint64_t> res_m;
+  cursor.clear();
+  do {
+    co_await execute(oid, ReadOp{}.exec(
+                       sem_set::list(max, cursor,
+                                     std::inserter(res_m, res_m.end()),
+                                     &cursor)));
+  } while (!cursor.empty());
+  for (const auto& [key, value] : res_m) {
+    EXPECT_TRUE(ref.contains(key));
+  }
+  for (const auto& key : ref) {
+    EXPECT_TRUE(res_m.contains(key));
+  }
+
+  // Output-iterator overload into a vector of pairs; completeness is
+  // checked by size.
+  std::vector<std::pair<std::string, std::uint64_t>> res_v;
+  cursor.clear();
+  do {
+    co_await execute(oid, ReadOp{}.exec(
+                       sem_set::list(max, cursor,
+                                     std::inserter(res_v, res_v.end()),
+                                     &cursor)));
+  } while (!cursor.empty());
+  for (const auto& [key, value] : res_v) {
+    EXPECT_TRUE(ref.contains(key));
+  }
+  EXPECT_EQ(ref.size(), res_v.size());
+}
+
+// Increment two semaphores twice each. Decrement one, wait, then
+// decrement the other. Finally decrement both with a time argument
+// and check that only the recently decremented key is still listed.
+CORO_TEST_F(cls_sem_set, inc_dec_time, NeoRadosTest)
+{
+  std::string_view oid = "obj";
+  co_await create_obj(oid);
+
+  const auto lose = "lose"s;
+  const auto remain = "remain"s;
+
+  // Increment each semaphore twice.
+  co_await execute(oid, WriteOp{}.exec(sem_set::increment(lose)));
+  co_await execute(oid, WriteOp{}.exec(sem_set::increment(lose)));
+  co_await execute(oid, WriteOp{}.exec(sem_set::increment(remain)));
+  co_await execute(oid, WriteOp{}.exec(sem_set::increment(remain)));
+
+  // Decrement `lose` right away, but `remain` only after a one second
+  // pause, so `remain` carries the more recent decrement.
+  co_await execute(oid, WriteOp{}.exec(sem_set::decrement(lose)));
+
+  co_await wait_for(1s);
+  co_await execute(oid, WriteOp{}.exec(sem_set::decrement(remain)));
+
+  // Decrement both keys at once, passing 100ms as the time argument.
+  container::flat_set<std::string> keys{lose, remain};
+  co_await execute(oid,
+                   WriteOp{}.exec(sem_set::decrement(std::move(keys), 100ms)));
+
+  // NOTE(review): template arguments reconstructed. Keys are
+  // std::string; the counter type is assumed to be std::uint64_t --
+  // confirm against sem_set::list.
+  container::flat_map<std::string, std::uint64_t> res;
+  std::string cursor;
+  co_await execute(oid, ReadOp{}.exec(sem_set::list(sem_set::max_keys, {},
+                                                    &res, &cursor)));
+
+  // Only `remain` is expected to be left.
+  EXPECT_TRUE(cursor.empty());
+  EXPECT_EQ(1u, res.size());
+  // EXPECT_EQ is non-fatal, so guard the dereference: on an empty
+  // result res.begin() equals res.end(). ASSERT_* is not usable here
+  // because it expands to a plain `return` inside a coroutine.
+  if (!res.empty()) {
+    EXPECT_EQ(remain, res.begin()->first);
+  }
+
+  co_return;
+}
-- 
2.39.5