]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
cls: Add sem_set class
authorAdam Emerson <aemerson@redhat.com>
Tue, 5 Mar 2024 14:56:19 +0000 (09:56 -0500)
committerAdam C. Emerson <aemerson@redhat.com>
Tue, 1 Apr 2025 15:10:13 +0000 (11:10 -0400)
This stores a collection of key-based semaphores, intended to serve as
a backend to the project to close the datalog write-hole.

Signed-off-by: Adam Emerson <aemerson@redhat.com>
12 files changed:
qa/suites/rgw/verify/tasks/cls.yaml
qa/workunits/cls/test_cls_sem_set.sh [new file with mode: 0755]
src/cls/CMakeLists.txt
src/cls/sem_set/DESIGN.md [new file with mode: 0644]
src/cls/sem_set/module.cc [new file with mode: 0644]
src/cls/sem_set/ops.h [new file with mode: 0644]
src/common/options/osd.yaml.in
src/neorados/cls/sem_set.h [new file with mode: 0644]
src/osd/CMakeLists.txt
src/test/CMakeLists.txt
src/test/cls_sem_set/CMakeLists.txt [new file with mode: 0644]
src/test/cls_sem_set/test_cls_sem_set.cc [new file with mode: 0644]

index 26f948d42ecabd8a6d3599ca0cad2eb8da290668..4748d1f69f5b0e7f553462ebf4773d877499497e 100644 (file)
@@ -16,6 +16,7 @@ tasks:
         - cls/test_cls_cmpomap.sh
         - cls/test_cls_2pc_queue.sh
         - cls/test_cls_user.sh
+        - cls/test_cls_sem_set.sh
         - rgw/test_rgw_gc_log.sh
         - rgw/test_rgw_obj.sh
         - rgw/test_librgw_file.sh
diff --git a/qa/workunits/cls/test_cls_sem_set.sh b/qa/workunits/cls/test_cls_sem_set.sh
new file mode 100755 (executable)
index 0000000..4624e5e
--- /dev/null
@@ -0,0 +1,5 @@
+#!/bin/sh -e
+
+ceph_test_cls_sem_set
+
+exit 0
index 953ac83195f26ee7f6d39dd81c2c3122bb7faa32..fc98e62c126b9e8225041c57c8907b455ff62299 100644 (file)
@@ -358,4 +358,12 @@ set_target_properties(cls_fifo PROPERTIES
 target_link_libraries(cls_fifo ${FMT_LIB})
 install(TARGETS cls_fifo DESTINATION ${cls_dir})
 
-
+# cls_sem_set
+set(cls_sem_set_srcs sem_set/module.cc)
+add_library(cls_sem_set SHARED ${cls_sem_set_srcs})
+set_target_properties(cls_sem_set PROPERTIES
+  VERSION "1.0.0"
+  SOVERSION "1"
+  INSTALL_RPATH ""
+  CXX_VISIBILITY_PRESET hidden)
+install(TARGETS cls_sem_set DESTINATION ${cls_dir})
diff --git a/src/cls/sem_set/DESIGN.md b/src/cls/sem_set/DESIGN.md
new file mode 100644 (file)
index 0000000..b1ab92a
--- /dev/null
@@ -0,0 +1,134 @@
+# Closing the RGWDataChangesLog write hole #
+
+The Cause: The 'window' optimization creates a situation where some
+changes exist only in RAM until a timer expires. If an RGW crashes
+after a write, the data log entry might never be made.
+
+## Requirements ##
+
+* We need to handle syncing to multiple zones
+* Some zones may not sync some buckets
+* We need to maintain a strict ordering within the shard, with shards
+  with the oldest un-swnc activity coming at the top of the listing
+* We need to coalesce writes within a shard
+* Spurious entries are permitted (but should be infrequent). They
+  impact efficiency, not correctness.
+
+## Definitions ##
+
+* `RGWDataChangesLog`: The class implementing the datalog
+  functionality.
+* `cur_cycle`: The current working set of entries that will be written
+  to FIFO at the close of the window.
+* `add_entry`: The function in the datalog API to write an entry.
+* `renew_entries`: Function run periodically to write an entry to FIFO
+  for every bucketshard in `cur_cycle`.
+* `recover`: Proposed function to complete initiated but incomplete datalog writes.
+* semaphore object: Proposed object on the OSD holding a count for
+  every bucketshard, sharded the same as datalog shards.
+* bs1, ...: Arbitrary bucket shards
+
+## Current Proposal ##
+
+* Make `add_entry` a transaction.
+* Reify `cur_cycle` on the OSD in the semaphore object.
+* Add a `recover` function to be run at startup.
+* Keep the window optimization.
+
+
+### RGWDataChangesLog ###
+
+* When `add_entry` is called with bs1:
+    + If `cur_cycle` does not contain bs1, increment bs1's count in
+      the semaphore object
+       + Otherwise, proceed as we do currently.
+* In `renew_entries`, after the FIFO entry for a given shard has been
+  written, decrement the count associated with that bucket shard on
+  the semaphore object.
+* In `recover`:
+    1. Read the semaphore object, keeping bucketshards and counts in
+       memory (`unordered_map`).
+       2. For every bucketshard, write an entry into the FIFO.
+    3. Send a notification on the semaphore object
+    4. On receiving a notification, RGWDataChangesLog will send
+       `cur_cycle` as the response.
+    5. On successful notify, go through each response and decrement
+       each bucketshard in the unordered map.  (e.g. if three RGWs
+       respond with bs1 in their `cur_cycle`, bs1 will be decremented
+       thrice)
+       6. For each entry in the unordered map, decrement on the semaphore
+       object only if the object's count is greater than 0.
+    7. If the `notify` operation errors, don't decrement anything.
+* Have some task call `compress` on a regular basis (Daily? Hourly?),
+  to keep seldom used or deleted bucket shards from slowing down
+  listing
+
+
+### CLS module requirements ###
+
+* Our back-end data store will be omap. We will use exactly one
+  key/value pair for every bucketshard in the system
+* To avoid the performance problems of deletes and reinsertions, don't
+  delete keys when they fall to zero, just set their value to 0.
+
+#### Operations ####
+
+* increment([bucketshard, ...]) -> {}:
+    - For each bucketshard, look up the given key in omap. If it
+      exists, set the value to one more than is currently
+      there. Otherwise create it with a value of 1.
+* decrement([bucketshard, ...], grace) -> {}:
+    - For each bucketshard, look it up in omap. If it does not exist
+      or the value is 0, error. If the last decrement was within the
+         'grace' timespan, skip it. Otherwise write decremented value.
+    - Should it actually error or do we want saturating arithmetic at
+      0? Given that the recovery design proposed below allows values
+      to remain spuriously non-decremented but tries to rule out
+      spurious decrements, we likely want to have the system scream
+      bloody murder if we underflow.
+* list(cursor?) -> ([(entry, count), ...], cursor):
+    - Return entries starting from cursor. Skip any with a semaphore of 0.
+* compress(cursor?) -> cursor?:
+    - Go through and delete all entries with a 0 count.
+    - Return cursor if we can't fit more calls the operation
+         and need another go-round
+
+## Analysis ##
+
+* Casey's semaphore idea solves the race between `add_entry` and
+  `renew_entries` within a single process and between RGWs on
+  different hosts
+* The use of watch/notify solves the potential race between `recover`
+  and other RGWs, ensuring that we won't delete someone else's
+  transaction.
+* As described, notify fails safe, as we don't decrement semaphores on
+  error. This can lead to spurious recover in the future, but that's
+  not harmful.
+* In general, unpaired increments are considered acceptable, as
+  they'll tend toward zero over time through recovery.
+* Watch/Notify seems a better choice than locking, as it can't
+  introduce a stall on `add_entry`.
+* `increment`/`decrement` take a vector of bucketshards so they're
+  ready to support batching.
+* We will never have more omap entries than we do bucketshards in the
+  system, and the operations are eventually all read-modify-write. Not
+  deleting saves us from the worst pathologies of omap.
+
+## Questions ##
+
+* Do we want to shard the semaphore object?
+    - [casey] yes, for write parallelism. we don't want all object
+      writes to serialize on a single rados object
+
+* What happens when we delete buckets? Would we need to clean up the
+  asociated bucketshards and delete them?
+    - [casey] same comes up for bucket reshards, where we add :gen to
+     these bucket-shard-gen keys i do think we want to delete keys
+     after they reach 0. ideally we'd do this in batches rocksdb
+     deletes suck because they leave lots of tombstones behind, but
+     that mainly effects listing performance. here we only list for
+     recovery, and recovery only wants to see entries with count > 0
+
+* Is a separate `compress` function really necessary? It might be
+  worth doing test runs between a version that just deletes inline and
+  one that uses a separate compress step.
diff --git a/src/cls/sem_set/module.cc b/src/cls/sem_set/module.cc
new file mode 100644 (file)
index 0000000..66d33b9
--- /dev/null
@@ -0,0 +1,256 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <concepts>
+#include <cstdint>
+#include <map>
+#include <string>
+#include <string_view>
+
+#include <boost/system/system_error.hpp>
+
+#include "include/rados/objclass.h"
+
+#include "cls/sem_set/ops.h"
+
+#include "objclass/objclass.h"
+
+
+namespace buffer = ::ceph::buffer;
+namespace sys = ::boost::system;
+namespace ss = ::cls::sem_set;
+
+using namespace std::literals;
+
+namespace {
+/// So we don't clash with other OMAP keys.
+inline constexpr auto PREFIX = "CLS_SEM_SET_"sv;
+
+struct sem_val {
+  uint64_t value = 0;
+  ceph::real_time last_decrement = ceph::real_time::min();
+
+  sem_val() = default;
+
+  sem_val(uint64_t value) : value(value) {}
+
+  void encode(buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(value, bl);
+    encode(last_decrement, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(value, bl);
+    decode(last_decrement, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(sem_val);
+
+
+int increment(cls_method_context_t hctx, buffer::list *in, buffer::list *out)
+{
+  CLS_LOG(10, "%s", __PRETTY_FUNCTION__);
+
+  ss::increment op;
+  try {
+    auto iter = in->cbegin();
+    decode(op, iter);
+  } catch (const std::exception& e) {
+    CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__,
+            e.what());
+    return -EINVAL;
+  }
+
+  if (op.keys.size() > ::cls::sem_set::max_keys) {
+    CLS_ERR("ERROR: %s: too many keys: %zu", __PRETTY_FUNCTION__,
+           op.keys.size());
+    return -E2BIG;
+  }
+
+  for (const auto& key_ : op.keys) try {
+      buffer::list valbl;
+      auto key = std::string(PREFIX) + key_;
+      auto r = cls_cxx_map_get_val(hctx, key, &valbl);
+      sem_val val;
+      if (r >= 0) {
+       auto bi = valbl.cbegin();
+       decode(val, bi);
+       valbl.clear();
+      } else if (r == -ENOENT) {
+       val.value = 0;
+      } else {
+        CLS_ERR("ERROR: %s: failed to read semaphore: r=%d",
+                __PRETTY_FUNCTION__, r);
+        return r;
+      }
+      val.value += 1;
+      encode(val, valbl);
+      r = cls_cxx_map_set_val(hctx, key, &valbl);
+      if (r < 0) {
+        CLS_ERR("ERROR: %s: failed to update semaphore: r=%d",
+                __PRETTY_FUNCTION__, r);
+        return r;
+      }
+    } catch (const std::exception& e) {
+      CLS_ERR("CAN'T HAPPEN: %s: failed to decode semaphore: %s",
+              __PRETTY_FUNCTION__, e.what());
+      return -EIO;
+    }
+
+  return 0;
+}
+
+int decrement(cls_method_context_t hctx, buffer::list *in, buffer::list *out)
+{
+  CLS_LOG(10, "%s", __PRETTY_FUNCTION__);
+
+  ss::decrement op;
+  try {
+    auto iter = in->cbegin();
+    decode(op, iter);
+  } catch (const std::exception& e) {
+    CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__,
+            e.what());
+    return -EINVAL;
+  }
+
+  if (op.keys.size() > ::cls::sem_set::max_keys) {
+    CLS_ERR("ERROR: %s: too many keys: %zu", __PRETTY_FUNCTION__,
+           op.keys.size());
+    return -E2BIG;
+  }
+
+  auto now = ceph::real_clock::now();
+  for (const auto& key_ : op.keys) try {
+      buffer::list valbl;
+      auto key = std::string(PREFIX) + key_;
+      auto r = cls_cxx_map_get_val(hctx, key, &valbl);
+      if (r < 0) {
+        CLS_ERR("ERROR: %s: failed to read semaphore: r=%d",
+                __PRETTY_FUNCTION__, r);
+        return r;
+      }
+      sem_val val;
+      auto bi = valbl.cbegin();
+      decode(val, bi);
+      // Don't decrement if we're within the grace period (or there's
+      // screwy time stuff)
+      if ((now < val.last_decrement) ||
+         (now - val.last_decrement < op.grace)) {
+       continue;
+      }
+      if (val.value > 1) {
+       --(val.value);
+       val.last_decrement = now;
+        valbl.clear();
+        encode(val, valbl);
+        r = cls_cxx_map_set_val(hctx, key, &valbl);
+        if (r < 0) {
+          CLS_ERR("ERROR: %s: failed to update semaphore: r=%d",
+                  __PRETTY_FUNCTION__, r);
+          return r;
+        }
+      } else {
+        r = cls_cxx_map_remove_key(hctx, key);
+        if (r < 0) {
+          CLS_ERR("ERROR: %s: failed to remove key %s: r=%d",
+                  __PRETTY_FUNCTION__, key.c_str(), r);
+          return r;
+        }
+      }
+    } catch (const std::exception& e) {
+      CLS_ERR("CORRUPTION: %s: failed to decode semaphore: %s",
+              __PRETTY_FUNCTION__, e.what());
+      return -EIO;
+    }
+
+  return 0;
+}
+
+int list(cls_method_context_t hctx, buffer::list *in, buffer::list *out)
+{
+  CLS_LOG(10, "%s", __PRETTY_FUNCTION__);
+
+  ss::list_op op;
+  try {
+    auto iter = in->cbegin();
+    decode(op, iter);
+  } catch (const std::exception& e) {
+    CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__,
+            e.what());
+    return -EINVAL;
+  }
+
+  auto count = op.count;
+  ss::list_ret res;
+  res.cursor = op.cursor;
+
+  while (count > 0) {
+    auto cursor = std::string{PREFIX} + res.cursor + '\001';
+    std::map<std::string, ceph::buffer::list> vals;
+    bool more = false;
+    auto r = cls_cxx_map_get_vals(hctx, cursor,
+                                 std::string(PREFIX), count,
+                                 &vals, &more);
+    if (r < 0) {
+      CLS_ERR("ERROR: %s: failed to read semaphore: r=%d",
+              __PRETTY_FUNCTION__, r);
+      return r;
+    }
+
+    count = vals.size() <= count ? count - vals.size() : 0;
+
+    if (!vals.empty()) {
+      res.cursor = std::string{(--vals.end())->first, PREFIX.size()};
+    }
+    try {
+      for (auto&& [key_, valbl] : vals) {
+       sem_val val;
+       auto bi = valbl.cbegin();
+       decode(val, bi);
+       res.kvs.emplace(std::piecewise_construct,
+                       std::forward_as_tuple(std::move(key_), PREFIX.size()),
+                       std::forward_as_tuple(val.value));
+      }
+    } catch (const sys::system_error& e) {
+      CLS_ERR("CAN'T HAPPEN: %s: failed to decode seamaphore: %s",
+             __PRETTY_FUNCTION__, e.what());
+      return ceph::from_error_code(e.code());;
+    }
+    if (!more) {
+      res.cursor.clear();
+      break;
+    }
+  }
+
+  encode(res, *out);
+  return 0;
+}
+} // namespace (anonymous)
+
+CLS_INIT(sem_set)
+{
+  cls_handle_t h_class;
+  cls_method_handle_t h_increment;
+  cls_method_handle_t h_decrement;
+  cls_method_handle_t h_list;
+
+  cls_register(ss::CLASS, &h_class);
+  cls_register_cxx_method(h_class, ss::INCREMENT,
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          &increment, &h_increment);
+
+  cls_register_cxx_method(h_class, ss::DECREMENT,
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          &decrement, &h_decrement);
+
+  cls_register_cxx_method(h_class, ss::LIST,
+                          CLS_METHOD_RD,
+                          &list, &h_list);
+
+  return;
+}
diff --git a/src/cls/sem_set/ops.h b/src/cls/sem_set/ops.h
new file mode 100644 (file)
index 0000000..24d63dc
--- /dev/null
@@ -0,0 +1,135 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <cstdint>
+#include <initializer_list>
+#include <iterator>
+#include <string>
+
+#include <boost/container/flat_set.hpp>
+#include <boost/container/flat_map.hpp>
+
+#include "include/encoding.h"
+
+namespace cls::sem_set {
+using namespace std::literals;
+
+inline constexpr auto max_keys = 1'000u;
+
+namespace buffer = ceph::buffer;
+
+struct increment {
+  boost::container::flat_set<std::string> keys;
+
+  increment() = default;
+
+  increment(std::string s)
+    : keys({std::move(s)}) {}
+
+  increment(decltype(keys) s)
+    : keys(std::move(s)) {}
+
+  template<std::input_iterator I>
+  increment(I begin, I end)
+    requires std::is_convertible_v<typename std::iterator_traits<I>::value_type,
+                                  std::string>
+    : keys(begin, end) {}
+
+  void encode(buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(keys, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(keys, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(increment);
+
+struct decrement {
+  boost::container::flat_set<std::string> keys;
+  ceph::timespan grace;
+
+  decrement() = default;
+
+  decrement(std::string s, ceph::timespan grace = 0ns)
+    : keys({std::move(s)}), grace(grace) {}
+
+  decrement(decltype(keys) s, ceph::timespan grace = 0ns)
+    : keys(std::move(s)), grace(grace) {}
+
+  template<std::input_iterator I>
+  decrement(I begin, I end, ceph::timespan grace = 0ns)
+    requires std::is_convertible_v<typename std::iterator_traits<I>::value_type,
+                                  std::string>
+    : keys(begin, end), grace(grace) {}
+
+  void encode(buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(keys, bl);
+    encode(grace, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(keys, bl);
+    decode(grace, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(decrement);
+
+struct list_op {
+  std::uint64_t count;
+  std::string cursor;
+
+  void encode(buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(count, bl);
+    encode(cursor, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(count, bl);
+    decode(cursor, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(list_op);
+
+struct list_ret {
+  boost::container::flat_map<std::string, std::uint64_t> kvs;
+  std::string cursor;
+
+  list_ret() = default;
+
+  void encode(buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(kvs, bl);
+    encode(cursor, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(kvs, bl);
+    decode(cursor, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(list_ret);
+
+inline constexpr auto CLASS = "sem_set";
+inline constexpr auto INCREMENT = "increment";
+inline constexpr auto DECREMENT = "decrement";
+inline constexpr auto LIST = "list";
+
+} // namespace cls::sem_set
index 3ba8f3d2770e2ad4e33ea1f54a22e73d3fc1309a..ac9072198ae7662756feb77d33d5a15a684efaf8 100644 (file)
@@ -666,14 +666,14 @@ options:
   type: str
   level: advanced
   default: cephfs hello journal lock log numops otp rbd refcount rgw rgw_gc timeindex
-    user version cas cmpomap queue 2pc_queue fifo
+    user version cas cmpomap queue 2pc_queue fifo sem_set
   with_legacy: true
 # list of object classes with default execute perm (allow all: *)
 - name: osd_class_default_list
   type: str
   level: advanced
   default: cephfs hello journal lock log numops otp rbd refcount rgw rgw_gc timeindex
-    user version cas cmpomap queue 2pc_queue fifo
+    user version cas cmpomap queue 2pc_queue fifo sem_set
   with_legacy: true
 - name: osd_agent_max_ops
   type: int
diff --git a/src/neorados/cls/sem_set.h b/src/neorados/cls/sem_set.h
new file mode 100644 (file)
index 0000000..ec810af
--- /dev/null
@@ -0,0 +1,315 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2024 IBM
+ *
+ * See file COPYING for license information.
+ *
+ */
+
+#pragma once
+
+/// \file neodrados/cls/sem_set.h
+///
+/// \brief NeoRADOS interface to semaphore set class
+///
+/// The `sem_set` object class stores a set of strings with associated
+/// semaphores in OMAP.
+
+#include <cstdint>
+#include <initializer_list>
+#include <string>
+
+#include <boost/system/error_code.hpp>
+#include <boost/system/system_error.hpp>
+
+#include "include/buffer.h"
+
+#include "include/neorados/RADOS.hpp"
+
+#include "cls/sem_set/ops.h"
+
+namespace neorados::cls::sem_set {
+using namespace std::literals;
+
+/// \brief The maximum number of keys per op
+///
+using ::cls::sem_set::max_keys;
+
+/// \brief Increment semaphore
+///
+/// Append a call to a write operation that increments the semaphore
+/// on a key.
+///
+/// \param key Key to increment
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto increment(std::string key)
+{
+  namespace ss = ::cls::sem_set;
+  buffer::list in;
+  ss::increment call{std::move(key)};
+  encode(call, in);
+  return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+    op.exec(ss::CLASS, ss::INCREMENT, in);
+  }};
+}
+
+/// \brief Increment semaphores
+///
+/// Append a call to a write operation that increments the semaphores
+/// on a set of keys.
+///
+/// \param keys Keys to increment
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto increment(std::initializer_list<std::string> keys)
+{
+  namespace ss = ::cls::sem_set;
+  namespace buffer = ::ceph::buffer;
+  buffer::list in;
+  ss::increment call{keys};
+  encode(call, in);
+  return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+    op.exec(ss::CLASS, ss::INCREMENT, in);
+  }};
+}
+
+/// \brief Increment semaphores
+///
+/// Append a call to a write operation that increments the semaphores
+/// on a set of keys.
+///
+/// \param keys Keys to increment
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto increment(boost::container::flat_set<std::string> keys)
+{
+  namespace ss = ::cls::sem_set;
+  namespace buffer = ::ceph::buffer;
+  buffer::list in;
+  ss::increment call{std::move(keys)};
+  encode(call, in);
+  return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+    op.exec(ss::CLASS, ss::INCREMENT, in);
+  }};
+}
+
+/// \brief Increment semaphores
+///
+/// Append a call to a write operation that increments the semaphores
+/// on a set of keys.
+///
+/// \param keys Keys to increment
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+template<std::input_iterator I>
+[[nodiscard]] inline auto increment(I begin, I end)
+  requires std::is_convertible_v<typename std::iterator_traits<I>::value_type,
+                                std::string>
+{
+  namespace ss = ::cls::sem_set;
+  namespace buffer = ::ceph::buffer;
+  buffer::list in;
+  ss::increment call{begin, end};
+  encode(call, in);
+  return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+    op.exec(ss::CLASS, ss::INCREMENT, in);
+  }};
+}
+
+/// \brief Decrement semaphore
+///
+/// Append a call to a write operation that decrements the semaphore
+/// on a key.
+///
+/// \param key Key to decrement
+/// \param grace Don't decrement anything decremented more recently.
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto
+decrement(std::string key, ceph::timespan grace = 0ns)
+{
+  namespace ss = ::cls::sem_set;
+  namespace buffer = ::ceph::buffer;
+  buffer::list in;
+  ss::decrement call{std::move(key), grace};
+  encode(call, in);
+  return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+    op.exec(ss::CLASS, ss::DECREMENT, in);
+  }};
+}
+
+/// \brief Decrement semaphores
+///
+/// Append a call to a write operation that decrements the semaphores
+/// on a set of keys.
+///
+/// \param keys Keys to decrement
+/// \param grace Don't decrement anything decremented more recently.
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto
+decrement(std::initializer_list<std::string> keys, ceph::timespan grace = 0ns)
+{
+  namespace ss = ::cls::sem_set;
+  namespace buffer = ::ceph::buffer;
+  buffer::list in;
+  ss::decrement call{keys, grace};
+  encode(call, in);
+  return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+    op.exec(ss::CLASS, ss::DECREMENT, in);
+  }};
+}
+
+/// \brief Decrement semaphores
+///
+/// Append a call to a write operation that decrements the semaphores
+/// on a set of keys.
+///
+/// \param keys Keys to decrement
+/// \param grace Don't decrement anything decremented more recently.
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto
+decrement(boost::container::flat_set<std::string> keys, ceph::timespan grace = 0ns)
+{
+  namespace ss = ::cls::sem_set;
+  namespace buffer = ::ceph::buffer;
+  buffer::list in;
+  ss::decrement call{std::move(keys), grace};
+  encode(call, in);
+  return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+    op.exec(ss::CLASS, ss::DECREMENT, in);
+  }};
+}
+
+/// \brief decrement semaphores
+///
+/// Append a call to a write operation that decrements the semaphores
+/// on a set of keys.
+///
+/// \param keys Keys to decrement
+/// \param grace Don't decrement anything decremented more recently.
+///
+/// \return The ClsWriteOp to be passed to WriteOp::exec
+template <std::input_iterator I>
+[[nodiscard]] inline auto
+decrement(I begin, I end, ceph::timespan grace = 0ns)
+  requires std::is_convertible_v<typename I::value_type, std::string>
+{
+  namespace ss = ::cls::sem_set;
+  namespace buffer = ::ceph::buffer;
+  buffer::list in;
+  ss::decrement call{begin, end, grace};
+  encode(call, in);
+  return ClsWriteOp{[in = std::move(in)](WriteOp& op) {
+    op.exec(ss::CLASS, ss::DECREMENT, in);
+  }};
+}
+
+/// \brief List keys and semaphores
+///
+/// Append a call to a read operation that lists keys and semaphores
+///
+/// \note This *appends* to the `entries`, it does not *clear* it.
+///
+/// \param count Number of entries to fetch
+/// \param cursor Where to start
+/// \param entries Unordered list to which entries are appended
+/// \param new_cursor Where to start for the next iteration, empty if completed
+///
+/// \return The ClsReadOp to be passed to WriteOp::exec
+[[nodiscard]] inline auto list(
+  std::uint64_t count, std::string cursor,
+  boost::container::flat_map<std::string, std::uint64_t>* const entries,
+  std::string* const new_cursor)
+{
+  namespace ss = ::cls::sem_set;
+  namespace sys = ::boost::system;
+  namespace buffer = ::ceph::buffer;
+  buffer::list in;
+  ::cls::sem_set::list_op call;
+  call.count = count;
+  call.cursor = std::move(cursor);
+
+  encode(call, in);
+  return ClsReadOp{[entries, new_cursor,
+                   in = std::move(in)](ReadOp& op) {
+    op.exec(ss::CLASS, ss::LIST, in,
+           [entries, new_cursor](sys::error_code ec, const buffer::list& bl) {
+             ss::list_ret ret;
+             if (!ec) {
+               auto iter = bl.cbegin();
+               try {
+                 decode(ret, iter);
+               } catch (const sys::system_error& e) {
+                 if (e.code() == buffer::errc::end_of_buffer &&
+                     bl.length() == 0) {
+                   // It looks like if the object doesn't exist the
+                   // CLS function isn't called and we don't get
+                   // -ENOENT even though we get it for the op, we
+                   // just get an empty buffer. This is crap.
+                   if (new_cursor) {
+                     new_cursor->clear();
+                   }
+                   return;
+                 } else {
+                   throw;
+                 }
+               }
+               if (entries) {
+                 entries->reserve(entries->size() + ret.kvs.size());
+                 entries->merge(std::move(ret.kvs));
+               }
+               if (new_cursor) {
+                 *new_cursor = std::move(ret.cursor);
+               }
+             }
+           });
+  }};
+}
+
+/// \brief List keys and semaphores
+///
+/// Append a call to a read operation that lists keys and semaphores
+///
+/// \param count Number of entries to fetch
+/// \param cursor Where to start
+/// \param out Output iterator
+/// \param new_cursor Where to start for the next iteration, empty if completed
+///
+/// \return The ClsReadOp to be passed to WriteOp::exec
+template<std::output_iterator<std::pair<std::string, std::uint64_t>> I>
+[[nodiscard]] inline auto list(std::uint64_t count,
+                              std::string cursor, I output,
+                              std::string* const new_cursor)
+{
+  namespace ss = ::cls::sem_set;
+  using boost::system::error_code;
+  namespace buffer = ::ceph::buffer;
+  buffer::list in;
+  ::cls::sem_set::list_op call;
+  call.count = count;
+  call.cursor = std::move(cursor);
+
+  encode(call, in);
+  return ClsReadOp{[output, new_cursor,
+                   in = std::move(in)](ReadOp& op) {
+    op.exec(ss::CLASS, ss::LIST, in,
+           [output, new_cursor](error_code ec, const buffer::list& bl) {
+             ss::list_ret ret;
+             if (!ec) {
+               auto iter = bl.cbegin();
+               decode(ret, iter);
+               std::move(ret.kvs.begin(), ret.kvs.end(), output);
+               if (new_cursor) {
+                 *new_cursor = std::move(ret.cursor);
+               }
+             }
+           });
+  }};
+}
+} // namespace neorados::cls::sem_set
index 94579ba6ef5ebb5059c4eb1383c7d78c2cf76d98..e7f579f38410cbf0a095955012c982e847e7d825 100644 (file)
@@ -88,5 +88,5 @@ if(WITH_RBD)
   add_dependencies(osd cls_rbd)
 endif()
 if(WITH_RADOSGW)
-  add_dependencies(osd cls_otp cls_rgw cls_queue cls_rgw_gc cls_2pc_queue cls_fifo)
+  add_dependencies(osd cls_otp cls_rgw cls_queue cls_rgw_gc cls_2pc_queue cls_fifo cls_sem_set)
 endif()
index 664e51387a997f269f9fdb313ce422dc597a0e18..1fd2bb5e98b220abcd81531c4930040fa00238d4 100644 (file)
@@ -58,6 +58,7 @@ if(NOT WIN32)
   add_subdirectory(cls_2pc_queue)
   add_subdirectory(cls_cmpomap)
   add_subdirectory(cls_fifo)
+  add_subdirectory(cls_sem_set)
   add_subdirectory(journal)
 
   add_subdirectory(erasure-code)
diff --git a/src/test/cls_sem_set/CMakeLists.txt b/src/test/cls_sem_set/CMakeLists.txt
new file mode 100644 (file)
index 0000000..d01cc92
--- /dev/null
@@ -0,0 +1,15 @@
+add_executable(ceph_test_cls_sem_set
+  test_cls_sem_set.cc
+  )
+target_link_libraries(ceph_test_cls_sem_set
+  libneorados
+  ${BLKID_LIBRARIES}
+  ${CMAKE_DL_LIBS}
+  ${CRYPTO_LIBS}
+  ${EXTRALIBS}
+  neoradostest-support
+  ${UNITTEST_LIBS}
+  )
+install(TARGETS
+  ceph_test_cls_sem_set
+  DESTINATION ${CMAKE_INSTALL_BINDIR})
diff --git a/src/test/cls_sem_set/test_cls_sem_set.cc b/src/test/cls_sem_set/test_cls_sem_set.cc
new file mode 100644 (file)
index 0000000..1015fe3
--- /dev/null
@@ -0,0 +1,262 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM
+ *
+ * See file COPYING for license information.
+ *
+ */
+
+#include "neorados/cls/sem_set.h"
+
+#include <boost/system/detail/errc.hpp>
+#include <coroutine>
+#include <string>
+
+#include <boost/system/errc.hpp>
+
+#include <fmt/format.h>
+
+#include "gtest/gtest.h"
+
+#include "include/neorados/RADOS.hpp"
+
+#include "test/neorados/common_tests.h"
+
+namespace sem_set = neorados::cls::sem_set;
+namespace sys = boost::system;
+namespace container = boost::container;
+
+using neorados::ReadOp;
+using neorados::WriteOp;
+
+using namespace std::literals;
+
+/// Increment semaphore multiple times. Decrement the same number of
+/// times. Decrement once more and expect an error.
+CORO_TEST_F(cls_sem_set, inc_dec, NeoRadosTest)
+{
+  std::string_view oid = "obj";
+  co_await create_obj(oid);
+
+  auto key = "foo";
+  for (auto i = 0; i < 10; ++i) {
+    co_await execute(oid, WriteOp{}.exec(sem_set::increment(key)));
+  }
+  for (auto i = 0; i < 10; ++i) {
+    co_await execute(oid, WriteOp{}.exec(sem_set::decrement(key)));
+  }
+
+  co_await expect_error_code(
+    execute(oid, WriteOp{}.exec(sem_set::decrement(key))),
+    sys::errc::no_such_file_or_directory);
+  co_return;
+}
+
+// TODO: Uncomment when we get a GCC everywhere that doesn't crash
+// when compiling it.
+
+// /// Increment semaphore multiple times. Decrement the same number of
+// /// times. Decrement once more and expect an error. But this time use
+// /// the initializer list definition and two keys.
+// CORO_TEST_F(cls_sem_set, inc_dec_init_list, NeoRadosTest)
+// {
+//   std::string_view oid = "obj";
+//   co_await create_obj(oid);
+
+//   auto key1 = "foo";
+//   auto key2 = "bar";
+//   for (auto i = 0; i < 10; ++i) {
+//     co_await execute(oid, WriteOp{}.exec(sem_set::increment({key1, key2})));
+//   }
+//   for (auto i = 0; i < 10; ++i) {
+//     co_await execute(oid, WriteOp{}.exec(sem_set::decrement({key1, key2})));
+//   }
+
+//   co_await expect_error_code(
+//     execute(oid, WriteOp{}.exec(sem_set::decrement({key1, key2}))),
+//     sys::errc::no_such_file_or_directory);
+//   co_return;
+// }
+
+/// Send too many keys to increment, expecting an error. Then send too
+/// many keys to decrement, expecting an error.
+CORO_TEST_F(cls_sem_set, inc_dec_overflow, NeoRadosTest)
+{
+  std::string_view oid = "obj";
+  co_await create_obj(oid);
+
+  container::flat_set<std::string> strings;
+  for (auto i = 0u; i < 2 * sem_set::max_keys; ++i) {
+    strings.insert(fmt::format("key{}", i));
+  }
+
+  co_await expect_error_code(
+    execute(oid, WriteOp{}.exec(sem_set::increment(strings.begin(),
+                                                   strings.end()))),
+    sys::errc::argument_list_too_long);
+  co_await expect_error_code(
+    execute(oid, WriteOp{}.exec(sem_set::decrement(strings.begin(),
+                                                   strings.end()))),
+    sys::errc::argument_list_too_long);
+}
+
+
+/// Write the maximum number of keys we can return in one listing. Do
+/// one listing and ensure that the returned cursor is empty (thus
+/// showing list returned all values.) Repeat twice more to give
+/// coverage to the other two definitions of list.
+CORO_TEST_F(cls_sem_set, list_small, NeoRadosTest)
+{
+  std::string_view oid = "obj";
+  co_await create_obj(oid);
+  const auto max = sem_set::max_keys;
+
+  container::flat_set<std::string> ref;
+  for (auto i = 0u; i < max; ++i) {
+    ref.insert(fmt::format("key{}", i));
+  }
+  co_await execute(oid, WriteOp{}.exec(sem_set::increment(ref.begin(),
+                                                          ref.end())));
+  {
+    container::flat_map<std::string, std::uint64_t> res;
+    std::string cursor;
+    co_await execute(oid, ReadOp{}.exec(sem_set::list(max, {},
+                                                      &res, &cursor)));
+    EXPECT_TRUE(cursor.empty());
+    for (const auto& [key, value] : res) {
+      EXPECT_TRUE(ref.contains(key));
+    }
+    for (const auto& key : ref) {
+      EXPECT_TRUE(res.contains(key));
+    }
+  }
+  {
+    std::map<std::string, std::uint64_t> res;
+    std::string cursor;
+    co_await execute(oid, ReadOp{}.exec(
+                       sem_set::list(max, {},
+                                     std::inserter(res, res.end()), &cursor)));
+    EXPECT_TRUE(cursor.empty());
+    for (const auto& [key, value] : res) {
+      EXPECT_TRUE(ref.contains(key));
+    }
+    for (const auto& key : ref) {
+      EXPECT_TRUE(res.contains(key));
+    }
+  }
+  {
+    std::vector<std::pair<std::string, std::uint64_t>> res;
+    std::string cursor;
+    co_await execute(oid, ReadOp{}.exec(
+                       sem_set::list(max, {},
+                                     std::inserter(res, res.end()), &cursor)));
+    EXPECT_TRUE(cursor.empty());
+    for (const auto& [key, value] : res) {
+      EXPECT_TRUE(ref.contains(key));
+    }
+    EXPECT_EQ(ref.size(), res.size());
+  }
+}
+
+/// Write the maximum number of keys we can return in one listing
+/// several times.  Do an iterated listing to test cursor
+/// functionality and append. Check that we have all of and only the
+/// keys we should. Repeat for the other two list functions.
+CORO_TEST_F(cls_sem_set, list_large, NeoRadosTest)
+{
+  std::string_view oid = "obj";
+  co_await create_obj(oid);
+  const auto max = sem_set::max_keys;
+
+  container::flat_set<std::string> ref;
+  for (auto i = 0u; i < 4; ++i) {
+    container::flat_set<std::string> part;
+    for (auto j = 0u; j < max; ++j) {
+      part.insert(fmt::format("key{}", (i * max) + j));
+    }
+    co_await execute(oid, WriteOp{}.exec(sem_set::increment(part.begin(),
+                                                            part.end())));
+    ref.merge(std::move(part));
+  }
+  EXPECT_EQ(4 * max, ref.size());
+  std::string cursor;
+
+  container::flat_map<std::string, std::uint64_t> res_um;
+  do {
+    co_await execute(oid, ReadOp{}.exec(
+                       sem_set::list(max, cursor, &res_um, &cursor)));
+  } while (!cursor.empty());
+  for (const auto& [key, value] : res_um) {
+    EXPECT_TRUE(ref.contains(key));
+  }
+  for (const auto& key : ref) {
+    EXPECT_TRUE(res_um.contains(key));
+  }
+
+  std::map<std::string, std::uint64_t> res_m;
+  cursor.clear();
+  do {
+    co_await execute(oid, ReadOp{}.exec(
+                       sem_set::list(max, cursor,
+                                     std::inserter(res_m, res_m.end()),
+                                     &cursor)));
+  } while (!cursor.empty());
+  for (const auto& [key, value] : res_m) {
+    EXPECT_TRUE(ref.contains(key));
+  }
+  for (const auto& key : ref) {
+    EXPECT_TRUE(res_m.contains(key));
+  }
+
+  std::vector<std::pair<std::string, std::uint64_t>> res_v;
+  cursor.clear();
+  do {
+    co_await execute(oid, ReadOp{}.exec(
+                       sem_set::list(max, cursor,
+                                     std::inserter(res_v, res_v.end()),
+                                     &cursor)));
+  } while (!cursor.empty());
+  for (const auto& [key, value] : res_v) {
+    EXPECT_TRUE(ref.contains(key));
+  }
+  EXPECT_EQ(ref.size(), res_v.size());
+}
+
+// Increment some semaphores, wait, increment some more, decrement
+// them all with a time.
+CORO_TEST_F(cls_sem_set, inc_dec_time, NeoRadosTest)
+{
+  std::string_view oid = "obj";
+  co_await create_obj(oid);
+
+  const auto lose = "lose"s;
+  const auto remain = "remain"s;
+
+  co_await execute(oid, WriteOp{}.exec(sem_set::increment(lose)));
+  co_await execute(oid, WriteOp{}.exec(sem_set::increment(lose)));
+  co_await execute(oid, WriteOp{}.exec(sem_set::increment(remain)));
+  co_await execute(oid, WriteOp{}.exec(sem_set::increment(remain)));
+
+  co_await execute(oid, WriteOp{}.exec(sem_set::decrement(lose)));
+
+  co_await wait_for(1s);
+  co_await execute(oid, WriteOp{}.exec(sem_set::decrement(remain)));
+
+  container::flat_set<std::string> keys{lose, remain};
+  co_await execute(oid,
+                  WriteOp{}.exec(sem_set::decrement(std::move(keys), 100ms)));
+
+  container::flat_map<std::string, std::uint64_t> res;
+  std::string cursor;
+  co_await execute(oid, ReadOp{}.exec(sem_set::list(sem_set::max_keys, {},
+                                                   &res, &cursor)));
+
+  EXPECT_TRUE(cursor.empty());
+  EXPECT_EQ(1u, res.size());
+  EXPECT_EQ(remain, res.begin()->first);
+
+  co_return;
+}