- cls/test_cls_cmpomap.sh
- cls/test_cls_2pc_queue.sh
- cls/test_cls_user.sh
+ - cls/test_cls_sem_set.sh
- rgw/test_rgw_gc_log.sh
- rgw/test_rgw_obj.sh
- rgw/test_librgw_file.sh
--- /dev/null
+#!/bin/sh -e
+
+ceph_test_cls_sem_set
+
+exit 0
target_link_libraries(cls_fifo ${FMT_LIB})
install(TARGETS cls_fifo DESTINATION ${cls_dir})
-
+# cls_sem_set
+set(cls_sem_set_srcs sem_set/module.cc)
+add_library(cls_sem_set SHARED ${cls_sem_set_srcs})
+set_target_properties(cls_sem_set PROPERTIES
+ VERSION "1.0.0"
+ SOVERSION "1"
+ INSTALL_RPATH ""
+ CXX_VISIBILITY_PRESET hidden)
+install(TARGETS cls_sem_set DESTINATION ${cls_dir})
--- /dev/null
+# Closing the RGWDataChangesLog write hole #
+
+The Cause: The 'window' optimization creates a situation where some
+changes exist only in RAM until a timer expires. If an RGW crashes
+after a write, the data log entry might never be made.
+
+## Requirements ##
+
+* We need to handle syncing to multiple zones
+* Some zones may not sync some buckets
+* We need to maintain a strict ordering within the shard, with shards
+ with the oldest un-swnc activity coming at the top of the listing
+* We need to coalesce writes within a shard
+* Spurious entries are permitted (but should be infrequent). They
+ impact efficiency, not correctness.
+
+## Definitions ##
+
+* `RGWDataChangesLog`: The class implementing the datalog
+ functionality.
+* `cur_cycle`: The current working set of entries that will be written
+ to FIFO at the close of the window.
+* `add_entry`: The function in the datalog API to write an entry.
+* `renew_entries`: Function run periodically to write an entry to FIFO
+ for every bucketshard in `cur_cycle`.
+* `recover`: Proposed function to complete initiated but incomplete datalog writes.
+* semaphore object: Proposed object on the OSD holding a count for
+ every bucketshard, sharded the same as datalog shards.
+* bs1, ...: Arbitrary bucket shards
+
+## Current Proposal ##
+
+* Make `add_entry` a transaction.
+* Reify `cur_cycle` on the OSD in the semaphore object.
+* Add a `recover` function to be run at startup.
+* Keep the window optimization.
+
+
+### RGWDataChangesLog ###
+
+* When `add_entry` is called with bs1:
+ + If `cur_cycle` does not contain bs1, increment bs1's count in
+ the semaphore object
+ + Otherwise, proceed as we do currently.
+* In `renew_entries`, after the FIFO entry for a given shard has been
+ written, decrement the count associated with that bucket shard on
+ the semaphore object.
+* In `recover`:
+ 1. Read the semaphore object, keeping bucketshards and counts in
+ memory (`unordered_map`).
+ 2. For every bucketshard, write an entry into the FIFO.
+ 3. Send a notification on the semaphore object
+ 4. On receiving a notification, RGWDataChangesLog will send
+ `cur_cycle` as the response.
+ 5. On successful notify, go through each response and decrement
+ each bucketshard in the unordered map. (e.g. if three RGWs
+ respond with bs1 in their `cur_cycle`, bs1 will be decremented
+ thrice)
+ 6. For each entry in the unordered map, decrement on the semaphore
+ object only if the object's count is greater than 0.
+ 7. If the `notify` operation errors, don't decrement anything.
+* Have some task call `compress` on a regular basis (Daily? Hourly?),
+ to keep seldom used or deleted bucket shards from slowing down
+ listing
+
+
+### CLS module requirements ###
+
+* Our back-end data store will be omap. We will use exactly one
+ key/value pair for every bucketshard in the system
+* To avoid the performance problems of deletes and reinsertions, don't
+ delete keys when they fall to zero, just set their value to 0.
+
+#### Operations ####
+
+* increment([bucketshard, ...]) -> {}:
+ - For each bucketshard, look up the given key in omap. If it
+ exists, set the value to one more than is currently
+ there. Otherwise create it with a value of 1.
+* decrement([bucketshard, ...], grace) -> {}:
+ - For each bucketshard, look it up in omap. If it does not exist
+ or the value is 0, error. If the last decrement was within the
+ 'grace' timespan, skip it. Otherwise write decremented value.
+ - Should it actually error or do we want saturating arithmetic at
+ 0? Given that the recovery design proposed below allows values
+ to remain spuriously non-decremented but tries to rule out
+ spurious decrements, we likely want to have the system scream
+ bloody murder if we underflow.
+* list(cursor?) -> ([(entry, count), ...], cursor):
+ - Return entries starting from cursor. Skip any with a semaphore of 0.
+* compress(cursor?) -> cursor?:
+ - Go through and delete all entries with a 0 count.
+ - Return cursor if we can't fit more calls the operation
+ and need another go-round
+
+## Analysis ##
+
+* Casey's semaphore idea solves the race between `add_entry` and
+ `renew_entries` within a single process and between RGWs on
+ different hosts
+* The use of watch/notify solves the potential race between `recover`
+ and other RGWs, ensuring that we won't delete someone else's
+ transaction.
+* As described, notify fails safe, as we don't decrement semaphores on
+ error. This can lead to spurious recover in the future, but that's
+ not harmful.
+* In general, unpaired increments are considered acceptable, as
+ they'll tend toward zero over time through recovery.
+* Watch/Notify seems a better choice than locking, as it can't
+ introduce a stall on `add_entry`.
+* `increment`/`decrement` take a vector of bucketshards so they're
+ ready to support batching.
+* We will never have more omap entries than we do bucketshards in the
+ system, and the operations are eventually all read-modify-write. Not
+ deleting saves us from the worst pathologies of omap.
+
+## Questions ##
+
+* Do we want to shard the semaphore object?
+ - [casey] yes, for write parallelism. we don't want all object
+ writes to serialize on a single rados object
+
+* What happens when we delete buckets? Would we need to clean up the
+ asociated bucketshards and delete them?
+ - [casey] same comes up for bucket reshards, where we add :gen to
+ these bucket-shard-gen keys i do think we want to delete keys
+ after they reach 0. ideally we'd do this in batches rocksdb
+ deletes suck because they leave lots of tombstones behind, but
+ that mainly effects listing performance. here we only list for
+ recovery, and recovery only wants to see entries with count > 0
+
+* Is a separate `compress` function really necessary? It might be
+ worth doing test runs between a version that just deletes inline and
+ one that uses a separate compress step.
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <concepts>
+#include <cstdint>
+#include <map>
+#include <string>
+#include <string_view>
+
+#include <boost/system/system_error.hpp>
+
+#include "include/rados/objclass.h"
+
+#include "cls/sem_set/ops.h"
+
+#include "objclass/objclass.h"
+
+
+namespace buffer = ::ceph::buffer;
+namespace sys = ::boost::system;
+namespace ss = ::cls::sem_set;
+
+using namespace std::literals;
+
+namespace {
+/// So we don't clash with other OMAP keys.
+inline constexpr auto PREFIX = "CLS_SEM_SET_"sv;
+
+struct sem_val {
+ uint64_t value = 0;
+ ceph::real_time last_decrement = ceph::real_time::min();
+
+ sem_val() = default;
+
+ sem_val(uint64_t value) : value(value) {}
+
+ void encode(buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(value, bl);
+ encode(last_decrement, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(value, bl);
+ decode(last_decrement, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(sem_val);
+
+
+int increment(cls_method_context_t hctx, buffer::list *in, buffer::list *out)
+{
+ CLS_LOG(10, "%s", __PRETTY_FUNCTION__);
+
+ ss::increment op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const std::exception& e) {
+ CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__,
+ e.what());
+ return -EINVAL;
+ }
+
+ if (op.keys.size() > ::cls::sem_set::max_keys) {
+ CLS_ERR("ERROR: %s: too many keys: %zu", __PRETTY_FUNCTION__,
+ op.keys.size());
+ return -E2BIG;
+ }
+
+ for (const auto& key_ : op.keys) try {
+ buffer::list valbl;
+ auto key = std::string(PREFIX) + key_;
+ auto r = cls_cxx_map_get_val(hctx, key, &valbl);
+ sem_val val;
+ if (r >= 0) {
+ auto bi = valbl.cbegin();
+ decode(val, bi);
+ valbl.clear();
+ } else if (r == -ENOENT) {
+ val.value = 0;
+ } else {
+ CLS_ERR("ERROR: %s: failed to read semaphore: r=%d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ val.value += 1;
+ encode(val, valbl);
+ r = cls_cxx_map_set_val(hctx, key, &valbl);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: failed to update semaphore: r=%d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ } catch (const std::exception& e) {
+ CLS_ERR("CAN'T HAPPEN: %s: failed to decode semaphore: %s",
+ __PRETTY_FUNCTION__, e.what());
+ return -EIO;
+ }
+
+ return 0;
+}
+
+int decrement(cls_method_context_t hctx, buffer::list *in, buffer::list *out)
+{
+ CLS_LOG(10, "%s", __PRETTY_FUNCTION__);
+
+ ss::decrement op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const std::exception& e) {
+ CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__,
+ e.what());
+ return -EINVAL;
+ }
+
+ if (op.keys.size() > ::cls::sem_set::max_keys) {
+ CLS_ERR("ERROR: %s: too many keys: %zu", __PRETTY_FUNCTION__,
+ op.keys.size());
+ return -E2BIG;
+ }
+
+ auto now = ceph::real_clock::now();
+ for (const auto& key_ : op.keys) try {
+ buffer::list valbl;
+ auto key = std::string(PREFIX) + key_;
+ auto r = cls_cxx_map_get_val(hctx, key, &valbl);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: failed to read semaphore: r=%d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ sem_val val;
+ auto bi = valbl.cbegin();
+ decode(val, bi);
+ // Don't decrement if we're within the grace period (or there's
+ // screwy time stuff)
+ if ((now < val.last_decrement) ||
+ (now - val.last_decrement < op.grace)) {
+ continue;
+ }
+ if (val.value > 1) {
+ --(val.value);
+ val.last_decrement = now;
+ valbl.clear();
+ encode(val, valbl);
+ r = cls_cxx_map_set_val(hctx, key, &valbl);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: failed to update semaphore: r=%d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ } else {
+ r = cls_cxx_map_remove_key(hctx, key);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: failed to remove key %s: r=%d",
+ __PRETTY_FUNCTION__, key.c_str(), r);
+ return r;
+ }
+ }
+ } catch (const std::exception& e) {
+ CLS_ERR("CORRUPTION: %s: failed to decode semaphore: %s",
+ __PRETTY_FUNCTION__, e.what());
+ return -EIO;
+ }
+
+ return 0;
+}
+
+int list(cls_method_context_t hctx, buffer::list *in, buffer::list *out)
+{
+ CLS_LOG(10, "%s", __PRETTY_FUNCTION__);
+
+ ss::list_op op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const std::exception& e) {
+ CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__,
+ e.what());
+ return -EINVAL;
+ }
+
+ auto count = op.count;
+ ss::list_ret res;
+ res.cursor = op.cursor;
+
+ while (count > 0) {
+ auto cursor = std::string{PREFIX} + res.cursor + '\001';
+ std::map<std::string, ceph::buffer::list> vals;
+ bool more = false;
+ auto r = cls_cxx_map_get_vals(hctx, cursor,
+ std::string(PREFIX), count,
+ &vals, &more);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: failed to read semaphore: r=%d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ count = vals.size() <= count ? count - vals.size() : 0;
+
+ if (!vals.empty()) {
+ res.cursor = std::string{(--vals.end())->first, PREFIX.size()};
+ }
+ try {
+ for (auto&& [key_, valbl] : vals) {
+ sem_val val;
+ auto bi = valbl.cbegin();
+ decode(val, bi);
+ res.kvs.emplace(std::piecewise_construct,
+ std::forward_as_tuple(std::move(key_), PREFIX.size()),
+ std::forward_as_tuple(val.value));
+ }
+ } catch (const sys::system_error& e) {
+ CLS_ERR("CAN'T HAPPEN: %s: failed to decode seamaphore: %s",
+ __PRETTY_FUNCTION__, e.what());
+ return ceph::from_error_code(e.code());;
+ }
+ if (!more) {
+ res.cursor.clear();
+ break;
+ }
+ }
+
+ encode(res, *out);
+ return 0;
+}
+} // namespace (anonymous)
+
+CLS_INIT(sem_set)
+{
+ cls_handle_t h_class;
+ cls_method_handle_t h_increment;
+ cls_method_handle_t h_decrement;
+ cls_method_handle_t h_list;
+
+ cls_register(ss::CLASS, &h_class);
+ cls_register_cxx_method(h_class, ss::INCREMENT,
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ &increment, &h_increment);
+
+ cls_register_cxx_method(h_class, ss::DECREMENT,
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ &decrement, &h_decrement);
+
+ cls_register_cxx_method(h_class, ss::LIST,
+ CLS_METHOD_RD,
+ &list, &h_list);
+
+ return;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <cstdint>
+#include <initializer_list>
+#include <iterator>
+#include <string>
+
+#include <boost/container/flat_set.hpp>
+#include <boost/container/flat_map.hpp>
+
+#include "include/encoding.h"
+
+namespace cls::sem_set {
+using namespace std::literals;
+
+inline constexpr auto max_keys = 1'000u;
+
+namespace buffer = ceph::buffer;
+
+struct increment {
+ boost::container::flat_set<std::string> keys;
+
+ increment() = default;
+
+ increment(std::string s)
+ : keys({std::move(s)}) {}
+
+ increment(decltype(keys) s)
+ : keys(std::move(s)) {}
+
+ template<std::input_iterator I>
+ increment(I begin, I end)
+ requires std::is_convertible_v<typename std::iterator_traits<I>::value_type,
+ std::string>
+ : keys(begin, end) {}
+
+ void encode(buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(keys, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(keys, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(increment);
+
+struct decrement {
+ boost::container::flat_set<std::string> keys;
+ ceph::timespan grace;
+
+ decrement() = default;
+
+ decrement(std::string s, ceph::timespan grace = 0ns)
+ : keys({std::move(s)}), grace(grace) {}
+
+ decrement(decltype(keys) s, ceph::timespan grace = 0ns)
+ : keys(std::move(s)), grace(grace) {}
+
+ template<std::input_iterator I>
+ decrement(I begin, I end, ceph::timespan grace = 0ns)
+ requires std::is_convertible_v<typename std::iterator_traits<I>::value_type,
+ std::string>
+ : keys(begin, end), grace(grace) {}
+
+ void encode(buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(keys, bl);
+ encode(grace, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(keys, bl);
+ decode(grace, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(decrement);
+
+struct list_op {
+ std::uint64_t count;
+ std::string cursor;
+
+ void encode(buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(count, bl);
+ encode(cursor, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(count, bl);
+ decode(cursor, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(list_op);
+
+struct list_ret {
+ boost::container::flat_map<std::string, std::uint64_t> kvs;
+ std::string cursor;
+
+ list_ret() = default;
+
+ void encode(buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(kvs, bl);
+ encode(cursor, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(kvs, bl);
+ decode(cursor, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(list_ret);
+
+inline constexpr auto CLASS = "sem_set";
+inline constexpr auto INCREMENT = "increment";
+inline constexpr auto DECREMENT = "decrement";
+inline constexpr auto LIST = "list";
+
+} // namespace cls::sem_set
type: str
level: advanced
default: cephfs hello journal lock log numops otp rbd refcount rgw rgw_gc timeindex
- user version cas cmpomap queue 2pc_queue fifo
+ user version cas cmpomap queue 2pc_queue fifo sem_set
with_legacy: true
# list of object classes with default execute perm (allow all: *)
- name: osd_class_default_list
type: str
level: advanced
default: cephfs hello journal lock log numops otp rbd refcount rgw rgw_gc timeindex
- user version cas cmpomap queue 2pc_queue fifo
+ user version cas cmpomap queue 2pc_queue fifo sem_set
with_legacy: true
- name: osd_agent_max_ops
type: int
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2024 IBM
+ *
+ * See file COPYING for license information.
+ *
+ */
+
+#pragma once
+
+/// \file neodrados/cls/sem_set.h
+///
+/// \brief NeoRADOS interface to semaphore set class
+///
+/// The `sem_set` object class stores a set of strings with associated
+/// semaphores in OMAP.
+
+#include <cstdint>
+#include <initializer_list>
+#include <string>
+
+#include <boost/system/error_code.hpp>
+#include <boost/system/system_error.hpp>
+
+#include "include/buffer.h"
+
+#include "include/neorados/RADOS.hpp"
+
+#include "cls/sem_set/ops.h"
+
+namespace neorados::cls::sem_set {
+using namespace std::literals;
+
+/// \brief The maximum number of keys per op
+///
+using ::cls::sem_set::max_keys;
+
+/// \brief Increment semaphore
+///
+/// Append a call to a write operation that increments the semaphore
+/// on a key.
+///
+/// \param key Key to increment
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto increment(std::string key)
+{
+ namespace ss = ::cls::sem_set;
+ buffer::list in;
+ ss::increment call{std::move(key)};
+ encode(call, in);
+ return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+ op.exec(ss::CLASS, ss::INCREMENT, in);
+ }};
+}
+
+/// \brief Increment semaphores
+///
+/// Append a call to a write operation that increments the semaphores
+/// on a set of keys.
+///
+/// \param keys Keys to increment
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto increment(std::initializer_list<std::string> keys)
+{
+ namespace ss = ::cls::sem_set;
+ namespace buffer = ::ceph::buffer;
+ buffer::list in;
+ ss::increment call{keys};
+ encode(call, in);
+ return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+ op.exec(ss::CLASS, ss::INCREMENT, in);
+ }};
+}
+
+/// \brief Increment semaphores
+///
+/// Append a call to a write operation that increments the semaphores
+/// on a set of keys.
+///
+/// \param keys Keys to increment
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto increment(boost::container::flat_set<std::string> keys)
+{
+ namespace ss = ::cls::sem_set;
+ namespace buffer = ::ceph::buffer;
+ buffer::list in;
+ ss::increment call{std::move(keys)};
+ encode(call, in);
+ return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+ op.exec(ss::CLASS, ss::INCREMENT, in);
+ }};
+}
+
+/// \brief Increment semaphores
+///
+/// Append a call to a write operation that increments the semaphores
+/// on a set of keys.
+///
+/// \param keys Keys to increment
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+template<std::input_iterator I>
+[[nodiscard]] inline auto increment(I begin, I end)
+ requires std::is_convertible_v<typename std::iterator_traits<I>::value_type,
+ std::string>
+{
+ namespace ss = ::cls::sem_set;
+ namespace buffer = ::ceph::buffer;
+ buffer::list in;
+ ss::increment call{begin, end};
+ encode(call, in);
+ return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+ op.exec(ss::CLASS, ss::INCREMENT, in);
+ }};
+}
+
+/// \brief Decrement semaphore
+///
+/// Append a call to a write operation that decrements the semaphore
+/// on a key.
+///
+/// \param key Key to decrement
+/// \param grace Don't decrement anything decremented more recently.
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto
+decrement(std::string key, ceph::timespan grace = 0ns)
+{
+ namespace ss = ::cls::sem_set;
+ namespace buffer = ::ceph::buffer;
+ buffer::list in;
+ ss::decrement call{std::move(key), grace};
+ encode(call, in);
+ return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+ op.exec(ss::CLASS, ss::DECREMENT, in);
+ }};
+}
+
+/// \brief Decrement semaphores
+///
+/// Append a call to a write operation that decrements the semaphores
+/// on a set of keys.
+///
+/// \param keys Keys to decrement
+/// \param grace Don't decrement anything decremented more recently.
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto
+decrement(std::initializer_list<std::string> keys, ceph::timespan grace = 0ns)
+{
+ namespace ss = ::cls::sem_set;
+ namespace buffer = ::ceph::buffer;
+ buffer::list in;
+ ss::decrement call{keys, grace};
+ encode(call, in);
+ return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+ op.exec(ss::CLASS, ss::DECREMENT, in);
+ }};
+}
+
+/// \brief Decrement semaphores
+///
+/// Append a call to a write operation that decrements the semaphores
+/// on a set of keys.
+///
+/// \param keys Keys to decrement
+/// \param grace Don't decrement anything decremented more recently.
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto
+decrement(boost::container::flat_set<std::string> keys, ceph::timespan grace = 0ns)
+{
+ namespace ss = ::cls::sem_set;
+ namespace buffer = ::ceph::buffer;
+ buffer::list in;
+ ss::decrement call{std::move(keys), grace};
+ encode(call, in);
+ return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+ op.exec(ss::CLASS, ss::DECREMENT, in);
+ }};
+}
+
+/// \brief decrement semaphores
+///
+/// Append a call to a write operation that decrements the semaphores
+/// on a set of keys.
+///
+/// \param keys Keys to decrement
+/// \param grace Don't decrement anything decremented more recently.
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+template <std::input_iterator I>
+[[nodiscard]] inline auto
+decrement(I begin, I end, ceph::timespan grace = 0ns)
+ requires std::is_convertible_v<typename I::value_type, std::string>
+{
+ namespace ss = ::cls::sem_set;
+ namespace buffer = ::ceph::buffer;
+ buffer::list in;
+ ss::decrement call{begin, end, grace};
+ encode(call, in);
+ return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+ op.exec(ss::CLASS, ss::DECREMENT, in);
+ }};
+}
+
+/// \brief List keys and semaphores
+///
+/// Append a call to a read operation that lists keys and semaphores
+///
+/// \note This *appends* to the `entries`, it does not *clear* it.
+///
+/// \param count Number of entries to fetch
+/// \param cursor Where to start
+/// \param entries Unordered list to which entries are appended
+/// \param new_cursor Where to start for the next iteration, empty if completed
+///
+/// \return The ClsReadOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto list(
+ std::uint64_t count, std::string cursor,
+ boost::container::flat_map<std::string, std::uint64_t>* const entries,
+ std::string* const new_cursor)
+{
+ namespace ss = ::cls::sem_set;
+ namespace sys = ::boost::system;
+ namespace buffer = ::ceph::buffer;
+ buffer::list in;
+ ::cls::sem_set::list_op call;
+ call.count = count;
+ call.cursor = std::move(cursor);
+
+ encode(call, in);
+ return ClsReadOp{[entries, new_cursor,
+ in = std::move(in)](ReadOp& op) {
+ op.exec(ss::CLASS, ss::LIST, in,
+ [entries, new_cursor](sys::error_code ec, const buffer::list& bl) {
+ ss::list_ret ret;
+ if (!ec) {
+ auto iter = bl.cbegin();
+ try {
+ decode(ret, iter);
+ } catch (const sys::system_error& e) {
+ if (e.code() == buffer::errc::end_of_buffer &&
+ bl.length() == 0) {
+ // It looks like if the object doesn't exist the
+ // CLS function isn't called and we don't get
+ // -ENOENT even though we get it for the op, we
+ // just get an empty buffer. This is crap.
+ if (new_cursor) {
+ new_cursor->clear();
+ }
+ return;
+ } else {
+ throw;
+ }
+ }
+ if (entries) {
+ entries->reserve(entries->size() + ret.kvs.size());
+ entries->merge(std::move(ret.kvs));
+ }
+ if (new_cursor) {
+ *new_cursor = std::move(ret.cursor);
+ }
+ }
+ });
+ }};
+}
+
+/// \brief List keys and semaphores
+///
+/// Append a call to a read operation that lists keys and semaphores
+///
+/// \param count Number of entries to fetch
+/// \param cursor Where to start
+/// \param out Output iterator
+/// \param new_cursor Where to start for the next iteration, empty if completed
+///
+/// \return The ClsReadOp to be passed to WriteOp::exec
+template<std::output_iterator<std::pair<std::string, std::uint64_t>> I>
+[[nodiscard]] inline auto list(std::uint64_t count,
+ std::string cursor, I output,
+ std::string* const new_cursor)
+{
+ namespace ss = ::cls::sem_set;
+ using boost::system::error_code;
+ namespace buffer = ::ceph::buffer;
+ buffer::list in;
+ ::cls::sem_set::list_op call;
+ call.count = count;
+ call.cursor = std::move(cursor);
+
+ encode(call, in);
+ return ClsReadOp{[output, new_cursor,
+ in = std::move(in)](ReadOp& op) {
+ op.exec(ss::CLASS, ss::LIST, in,
+ [output, new_cursor](error_code ec, const buffer::list& bl) {
+ ss::list_ret ret;
+ if (!ec) {
+ auto iter = bl.cbegin();
+ decode(ret, iter);
+ std::move(ret.kvs.begin(), ret.kvs.end(), output);
+ if (new_cursor) {
+ *new_cursor = std::move(ret.cursor);
+ }
+ }
+ });
+ }};
+}
+} // namespace neorados::cls::sem_set
add_dependencies(osd cls_rbd)
endif()
if(WITH_RADOSGW)
- add_dependencies(osd cls_otp cls_rgw cls_queue cls_rgw_gc cls_2pc_queue cls_fifo)
+ add_dependencies(osd cls_otp cls_rgw cls_queue cls_rgw_gc cls_2pc_queue cls_fifo cls_sem_set)
endif()
add_subdirectory(cls_2pc_queue)
add_subdirectory(cls_cmpomap)
add_subdirectory(cls_fifo)
+ add_subdirectory(cls_sem_set)
add_subdirectory(journal)
add_subdirectory(erasure-code)
--- /dev/null
+add_executable(ceph_test_cls_sem_set
+ test_cls_sem_set.cc
+ )
+target_link_libraries(ceph_test_cls_sem_set
+ libneorados
+ ${BLKID_LIBRARIES}
+ ${CMAKE_DL_LIBS}
+ ${CRYPTO_LIBS}
+ ${EXTRALIBS}
+ neoradostest-support
+ ${UNITTEST_LIBS}
+ )
+install(TARGETS
+ ceph_test_cls_sem_set
+ DESTINATION ${CMAKE_INSTALL_BINDIR})
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM
+ *
+ * See file COPYING for license information.
+ *
+ */
+
+#include "neorados/cls/sem_set.h"
+
+#include <boost/system/detail/errc.hpp>
+#include <coroutine>
+#include <string>
+
+#include <boost/system/errc.hpp>
+
+#include <fmt/format.h>
+
+#include "gtest/gtest.h"
+
+#include "include/neorados/RADOS.hpp"
+
+#include "test/neorados/common_tests.h"
+
+namespace sem_set = neorados::cls::sem_set;
+namespace sys = boost::system;
+namespace container = boost::container;
+
+using neorados::ReadOp;
+using neorados::WriteOp;
+
+using namespace std::literals;
+
+/// Increment semaphore multiple times. Decrement the same number of
+/// times. Decrement once more and expect an error.
+CORO_TEST_F(cls_sem_set, inc_dec, NeoRadosTest)
+{
+ std::string_view oid = "obj";
+ co_await create_obj(oid);
+
+ auto key = "foo";
+ for (auto i = 0; i < 10; ++i) {
+ co_await execute(oid, WriteOp{}.exec(sem_set::increment(key)));
+ }
+ for (auto i = 0; i < 10; ++i) {
+ co_await execute(oid, WriteOp{}.exec(sem_set::decrement(key)));
+ }
+
+ co_await expect_error_code(
+ execute(oid, WriteOp{}.exec(sem_set::decrement(key))),
+ sys::errc::no_such_file_or_directory);
+ co_return;
+}
+
+// TODO: Uncomment when we get a GCC everywhere that doesn't crash
+// when compiling it.
+
+// /// Increment semaphore multiple times. Decrement the same number of
+// /// times. Decrement once more and expect an error. But this time use
+// /// the initializer list definition and two keys.
+// CORO_TEST_F(cls_sem_set, inc_dec_init_list, NeoRadosTest)
+// {
+// std::string_view oid = "obj";
+// co_await create_obj(oid);
+
+// auto key1 = "foo";
+// auto key2 = "bar";
+// for (auto i = 0; i < 10; ++i) {
+// co_await execute(oid, WriteOp{}.exec(sem_set::increment({key1, key2})));
+// }
+// for (auto i = 0; i < 10; ++i) {
+// co_await execute(oid, WriteOp{}.exec(sem_set::decrement({key1, key2})));
+// }
+
+// co_await expect_error_code(
+// execute(oid, WriteOp{}.exec(sem_set::decrement({key1, key2}))),
+// sys::errc::no_such_file_or_directory);
+// co_return;
+// }
+
+/// Send too many keys to increment, expecting an error. Then send too
+/// many keys to decrement, expecting an error.
+CORO_TEST_F(cls_sem_set, inc_dec_overflow, NeoRadosTest)
+{
+ std::string_view oid = "obj";
+ co_await create_obj(oid);
+
+ container::flat_set<std::string> strings;
+ for (auto i = 0u; i < 2 * sem_set::max_keys; ++i) {
+ strings.insert(fmt::format("key{}", i));
+ }
+
+ co_await expect_error_code(
+ execute(oid, WriteOp{}.exec(sem_set::increment(strings.begin(),
+ strings.end()))),
+ sys::errc::argument_list_too_long);
+ co_await expect_error_code(
+ execute(oid, WriteOp{}.exec(sem_set::decrement(strings.begin(),
+ strings.end()))),
+ sys::errc::argument_list_too_long);
+}
+
+
+/// Write the maximum number of keys we can return in one listing. Do
+/// one listing and ensure that the returned cursor is empty (thus
+/// showing list returned all values.) Repeat twice more to give
+/// coverage to the other two definitions of list.
+CORO_TEST_F(cls_sem_set, list_small, NeoRadosTest)
+{
+ std::string_view oid = "obj";
+ co_await create_obj(oid);
+ const auto max = sem_set::max_keys;
+
+ container::flat_set<std::string> ref;
+ for (auto i = 0u; i < max; ++i) {
+ ref.insert(fmt::format("key{}", i));
+ }
+ co_await execute(oid, WriteOp{}.exec(sem_set::increment(ref.begin(),
+ ref.end())));
+ {
+ container::flat_map<std::string, std::uint64_t> res;
+ std::string cursor;
+ co_await execute(oid, ReadOp{}.exec(sem_set::list(max, {},
+ &res, &cursor)));
+ EXPECT_TRUE(cursor.empty());
+ for (const auto& [key, value] : res) {
+ EXPECT_TRUE(ref.contains(key));
+ }
+ for (const auto& key : ref) {
+ EXPECT_TRUE(res.contains(key));
+ }
+ }
+ {
+ std::map<std::string, std::uint64_t> res;
+ std::string cursor;
+ co_await execute(oid, ReadOp{}.exec(
+ sem_set::list(max, {},
+ std::inserter(res, res.end()), &cursor)));
+ EXPECT_TRUE(cursor.empty());
+ for (const auto& [key, value] : res) {
+ EXPECT_TRUE(ref.contains(key));
+ }
+ for (const auto& key : ref) {
+ EXPECT_TRUE(res.contains(key));
+ }
+ }
+ {
+ std::vector<std::pair<std::string, std::uint64_t>> res;
+ std::string cursor;
+ co_await execute(oid, ReadOp{}.exec(
+ sem_set::list(max, {},
+ std::inserter(res, res.end()), &cursor)));
+ EXPECT_TRUE(cursor.empty());
+ for (const auto& [key, value] : res) {
+ EXPECT_TRUE(ref.contains(key));
+ }
+ EXPECT_EQ(ref.size(), res.size());
+ }
+}
+
+/// Write the maximum number of keys we can return in one listing
+/// several times. Do an iterated listing to test cursor
+/// functionality and append. Check that we have all of and only the
+/// keys we should. Repeat for the other two list functions.
+CORO_TEST_F(cls_sem_set, list_large, NeoRadosTest)
+{
+ std::string_view oid = "obj";
+ co_await create_obj(oid);
+ const auto max = sem_set::max_keys;
+
+ container::flat_set<std::string> ref;
+ for (auto i = 0u; i < 4; ++i) {
+ container::flat_set<std::string> part;
+ for (auto j = 0u; j < max; ++j) {
+ part.insert(fmt::format("key{}", (i * max) + j));
+ }
+ co_await execute(oid, WriteOp{}.exec(sem_set::increment(part.begin(),
+ part.end())));
+ ref.merge(std::move(part));
+ }
+ EXPECT_EQ(4 * max, ref.size());
+ std::string cursor;
+
+ container::flat_map<std::string, std::uint64_t> res_um;
+ do {
+ co_await execute(oid, ReadOp{}.exec(
+ sem_set::list(max, cursor, &res_um, &cursor)));
+ } while (!cursor.empty());
+ for (const auto& [key, value] : res_um) {
+ EXPECT_TRUE(ref.contains(key));
+ }
+ for (const auto& key : ref) {
+ EXPECT_TRUE(res_um.contains(key));
+ }
+
+ std::map<std::string, std::uint64_t> res_m;
+ cursor.clear();
+ do {
+ co_await execute(oid, ReadOp{}.exec(
+ sem_set::list(max, cursor,
+ std::inserter(res_m, res_m.end()),
+ &cursor)));
+ } while (!cursor.empty());
+ for (const auto& [key, value] : res_m) {
+ EXPECT_TRUE(ref.contains(key));
+ }
+ for (const auto& key : ref) {
+ EXPECT_TRUE(res_m.contains(key));
+ }
+
+ std::vector<std::pair<std::string, std::uint64_t>> res_v;
+ cursor.clear();
+ do {
+ co_await execute(oid, ReadOp{}.exec(
+ sem_set::list(max, cursor,
+ std::inserter(res_v, res_v.end()),
+ &cursor)));
+ } while (!cursor.empty());
+ for (const auto& [key, value] : res_v) {
+ EXPECT_TRUE(ref.contains(key));
+ }
+ EXPECT_EQ(ref.size(), res_v.size());
+}
+
+// Increment some semaphores, wait, increment some more, decrement
+// them all with a time.
+CORO_TEST_F(cls_sem_set, inc_dec_time, NeoRadosTest)
+{
+ std::string_view oid = "obj";
+ co_await create_obj(oid);
+
+ const auto lose = "lose"s;
+ const auto remain = "remain"s;
+
+ co_await execute(oid, WriteOp{}.exec(sem_set::increment(lose)));
+ co_await execute(oid, WriteOp{}.exec(sem_set::increment(lose)));
+ co_await execute(oid, WriteOp{}.exec(sem_set::increment(remain)));
+ co_await execute(oid, WriteOp{}.exec(sem_set::increment(remain)));
+
+ co_await execute(oid, WriteOp{}.exec(sem_set::decrement(lose)));
+
+ co_await wait_for(1s);
+ co_await execute(oid, WriteOp{}.exec(sem_set::decrement(remain)));
+
+ container::flat_set<std::string> keys{lose, remain};
+ co_await execute(oid,
+ WriteOp{}.exec(sem_set::decrement(std::move(keys), 100ms)));
+
+ container::flat_map<std::string, std::uint64_t> res;
+ std::string cursor;
+ co_await execute(oid, ReadOp{}.exec(sem_set::list(sem_set::max_keys, {},
+ &res, &cursor)));
+
+ EXPECT_TRUE(cursor.empty());
+ EXPECT_EQ(1u, res.size());
+ EXPECT_EQ(remain, res.begin()->first);
+
+ co_return;
+}