]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/rados: index operations use async_reads/writes()
authorCasey Bodley <cbodley@redhat.com>
Thu, 7 Nov 2024 21:10:15 +0000 (16:10 -0500)
committerCasey Bodley <cbodley@redhat.com>
Tue, 8 Apr 2025 15:45:23 +0000 (11:45 -0400)
replace the classes derived from CLSRGWConcurrentIO with classes
derived from Reader/Writer/RevertibleWriter and use the async algorithms

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/cls/rgw/cls_rgw_ops.h
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/services/svc_bi_rados.cc
src/rgw/services/svc_bi_rados.h
src/rgw/services/svc_bilog_rados.cc

index 6c97311eb0ad266fd5ffa74e5f017b1a7a3077bf..33cd0de48e30582089bb08dd8d21e9014992633e 100644 (file)
@@ -230,17 +230,22 @@ static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx,
   return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
+void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& op,
+                                    uint64_t timeout)
+{
+  const auto call = rgw_cls_tag_timeout_op{.tag_timeout = timeout};
+  bufferlist in;
+  encode(call, in);
+  op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in);
+}
+
 static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
                                            const int shard_id,
                                            const string& oid,
                                            uint64_t timeout,
                                            BucketIndexAioManager *manager) {
-  bufferlist in;
-  rgw_cls_tag_timeout_op call;
-  call.tag_timeout = timeout;
-  encode(call, in);
   ObjectWriteOperation op;
-  op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in);
+  cls_rgw_bucket_set_tag_timeout(op, timeout);
   return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
@@ -725,11 +730,16 @@ int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid)
   return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
 }
 
+void cls_rgw_bucket_reshard_log_trim(librados::ObjectWriteOperation& op)
+{
+  bufferlist in;
+  op.exec(RGW_CLASS, RGW_RESHARD_LOG_TRIM, in);
+}
+
 static bool issue_reshard_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
                                    BucketIndexAioManager *manager) {
-  bufferlist in;
   ObjectWriteOperation op;
-  op.exec(RGW_CLASS, RGW_RESHARD_LOG_TRIM, in);
+  cls_rgw_bucket_reshard_log_trim(op);
   return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
@@ -738,6 +748,20 @@ int CLSRGWIssueReshardLogTrim::issue_op(int shard_id, const string& oid)
   return issue_reshard_log_trim(io_ctx, oid, shard_id, &manager);
 }
 
+void cls_rgw_bucket_check_index(librados::ObjectReadOperation& op,
+                                bufferlist& out)
+{
+  bufferlist in;
+  op.exec(RGW_CLASS, RGW_BUCKET_CHECK_INDEX, in, &out, nullptr);
+}
+
+void cls_rgw_bucket_check_index_decode(const bufferlist& out,
+                                       rgw_cls_check_index_ret& result)
+{
+  auto p = out.cbegin();
+  decode(result, p);
+}
+
 static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager,
     rgw_cls_check_index_ret *pdata) {
   bufferlist in;
@@ -752,11 +776,16 @@ int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
   return issue_bucket_check_index_op(io_ctx, shard_id, oid, &manager, &result[shard_id]);
 }
 
+void cls_rgw_bucket_rebuild_index(librados::ObjectWriteOperation& op)
+{
+  bufferlist in;
+  op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in);
+}
+
 static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const int shard_id, const string& oid,
     BucketIndexAioManager *manager) {
-  bufferlist in;
   librados::ObjectWriteOperation op;
-  op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in);
+  cls_rgw_bucket_rebuild_index(op);
   return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
@@ -786,11 +815,16 @@ int CLSRGWIssueGetDirHeader::issue_op(const int shard_id, const string& oid)
                              0, false, &manager, &result[shard_id]);
 }
 
-static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
+void cls_rgw_bilog_start(ObjectWriteOperation& op)
 {
   bufferlist in;
-  librados::ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_BI_LOG_RESYNC, in);
+}
+
+static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
+{
+  librados::ObjectWriteOperation op;
+  cls_rgw_bilog_start(op);
   return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
@@ -799,11 +833,16 @@ int CLSRGWIssueResyncBucketBILog::issue_op(const int shard_id, const string& oid
   return issue_resync_bi_log(io_ctx, shard_id, oid, &manager);
 }
 
-static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
+void cls_rgw_bilog_stop(ObjectWriteOperation& op)
 {
   bufferlist in;
-  librados::ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_BI_LOG_STOP, in);
+}
+
+static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
+{
+  librados::ObjectWriteOperation op;
+  cls_rgw_bilog_stop(op);
   return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
@@ -1214,49 +1253,31 @@ void cls_rgw_reshard_remove(librados::ObjectWriteOperation& op, const cls_rgw_re
   op.exec(RGW_CLASS, RGW_RESHARD_REMOVE, in);
 }
 
-int cls_rgw_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
-                                 const cls_rgw_bucket_instance_entry& entry)
-{
-  bufferlist in, out;
-  cls_rgw_set_bucket_resharding_op call;
-  call.entry = entry;
-  encode(call, in);
-  librados::ObjectWriteOperation op;
-  op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in);
-  return io_ctx.operate(oid, &op);
-}
-
-int cls_rgw_clear_bucket_resharding(librados::IoCtx& io_ctx, const string& oid)
+void cls_rgw_clear_bucket_resharding(librados::ObjectWriteOperation& op)
 {
-  bufferlist in, out;
+  bufferlist in;
   cls_rgw_clear_bucket_resharding_op call;
   encode(call, in);
-  librados::ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_CLEAR_BUCKET_RESHARDING, in);
-  return io_ctx.operate(oid, &op);
 }
 
-int cls_rgw_get_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
-                                 cls_rgw_bucket_instance_entry *entry)
+void cls_rgw_get_bucket_resharding(librados::ObjectReadOperation& op,
+                                   bufferlist& out)
 {
-  bufferlist in, out;
+  bufferlist in;
   cls_rgw_get_bucket_resharding_op call;
   encode(call, in);
-  int r= io_ctx.exec(oid, RGW_CLASS, RGW_GET_BUCKET_RESHARDING, in, out);
-  if (r < 0)
-    return r;
+  op.exec(RGW_CLASS, RGW_GET_BUCKET_RESHARDING, in, &out, nullptr);
+}
 
+void cls_rgw_get_bucket_resharding_decode(const bufferlist& out,
+                                          cls_rgw_bucket_instance_entry& entry)
+{
   cls_rgw_get_bucket_resharding_ret op_ret;
   auto iter = out.cbegin();
-  try {
-    decode(op_ret, iter);
-  } catch (ceph::buffer::error& err) {
-    return -EIO;
-  }
-
-  *entry = op_ret.new_instance;
+  decode(op_ret, iter);
 
-  return 0;
+  entry = std::move(op_ret.new_instance);
 }
 
 void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err)
@@ -1268,17 +1289,24 @@ void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err)
   op.exec(RGW_CLASS, RGW_GUARD_BUCKET_RESHARDING, in);
 }
 
-static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx,
-                                       const int shard_id, const string& oid,
-                                        const cls_rgw_bucket_instance_entry& entry,
-                                        BucketIndexAioManager *manager) {
+void cls_rgw_set_bucket_resharding(librados::ObjectWriteOperation& op,
+                                   cls_rgw_reshard_status status)
+{
   bufferlist in;
   cls_rgw_set_bucket_resharding_op call;
-  call.entry = entry;
+  call.entry.reshard_status = status;
   encode(call, in);
-  librados::ObjectWriteOperation op;
+
   op.assert_exists(); // the shard must exist; if not fail rather than recreate
   op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in);
+}
+
+static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx,
+                                       const int shard_id, const string& oid,
+                                        const cls_rgw_bucket_instance_entry& entry,
+                                        BucketIndexAioManager *manager) {
+  librados::ObjectWriteOperation op;
+  cls_rgw_set_bucket_resharding(op, entry.reshard_status);
   return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
index 8776c1b5c0446a4b566cc5a8c650856058997148..7cc7558f66bfc4ab251db728baa3bc398bc97242 100644 (file)
@@ -302,6 +302,8 @@ public:
   int operator()();
 }; // class CLSRGWConcurrentIO
 
+void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& op,
+                                    uint64_t timeout);
 
 class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
 protected:
@@ -542,6 +544,12 @@ public:
       CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio) {}
 };
 
+void cls_rgw_bucket_check_index(librados::ObjectReadOperation& op,
+                                bufferlist& out);
+// decode the response; may throw buffer::error
+void cls_rgw_bucket_check_index_decode(const bufferlist& out,
+                                       rgw_cls_check_index_ret& result);
+
 /**
  * Check the bucket index.
  *
@@ -563,6 +571,8 @@ public:
   virtual ~CLSRGWIssueBucketCheck() override {}
 };
 
+void cls_rgw_bucket_rebuild_index(librados::ObjectWriteOperation& op);
+
 class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO {
 protected:
   int issue_op(int shard_id, const std::string& oid) override;
@@ -594,6 +604,9 @@ public:
   virtual ~CLSRGWIssueSetBucketResharding() override {}
 };
 
+void cls_rgw_bilog_start(librados::ObjectWriteOperation& op);
+void cls_rgw_bilog_stop(librados::ObjectWriteOperation& op);
+
 class CLSRGWIssueResyncBucketBILog : public CLSRGWConcurrentIO {
 protected:
   int issue_op(int shard_id, const std::string& oid);
@@ -684,12 +697,15 @@ int cls_rgw_reshard_get(librados::IoCtx& io_ctx, const std::string& oid, cls_rgw
 // cls_rgw in the T+4 (X) release.
 void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err);
 
-// these overloads which call io_ctx.operate() should not be called in the rgw.
-// rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate()
-#ifndef CLS_CLIENT_HIDE_IOCTX
-int cls_rgw_set_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid,
-                                  const cls_rgw_bucket_instance_entry& entry);
-int cls_rgw_clear_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid);
-int cls_rgw_get_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid,
-                                  cls_rgw_bucket_instance_entry *entry);
-#endif
+void cls_rgw_set_bucket_resharding(librados::ObjectWriteOperation& op,
+                                   cls_rgw_reshard_status status);
+void cls_rgw_clear_bucket_resharding(librados::ObjectWriteOperation& op);
+void cls_rgw_get_bucket_resharding(librados::ObjectReadOperation& op,
+                                   bufferlist& out);
+// decode the entry; may throw buffer::error
+void cls_rgw_get_bucket_resharding_decode(const bufferlist& out,
+                                          cls_rgw_bucket_instance_entry& entry);
+
+// Try to remove all reshard log entries from the bucket index. Return success
+// if any entries were removed, and -ENODATA once they're all gone.
+void cls_rgw_bucket_reshard_log_trim(librados::ObjectWriteOperation& op);
index 2aababad4d683f2f679b1bff13a058f748ac90a1..225df29fe510f84c665d724cea57ee71a676c3e8 100644 (file)
@@ -7,9 +7,7 @@
 
 struct rgw_cls_tag_timeout_op
 {
-  uint64_t tag_timeout;
-
-  rgw_cls_tag_timeout_op() : tag_timeout(0) {}
+  uint64_t tag_timeout = 0;
 
   void encode(ceph::buffer::list &bl) const {
     ENCODE_START(1, 1, bl);
index 0cae52de13d92882390c0a6f2cff0e49b1f35864..179694220ee61bc8d630515318661a15bb56899f 100644 (file)
@@ -5993,53 +5993,27 @@ int RGWRados::bucket_check_index(const DoutPrefixProvider *dpp, optional_yield y
                                 map<RGWObjCategory, RGWStorageStats> *existing_stats,
                                 map<RGWObjCategory, RGWStorageStats> *calculated_stats)
 {
-  librados::IoCtx index_pool;
-
-  // key - bucket index object id
-  // value - bucket index check OP returned result with the given bucket index object (shard)
-  map<int, string> oids;
-
-  int ret = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &oids, nullptr);
+  std::map<int, bufferlist> buffers;
+  int ret = svc.bi_rados->check_index(dpp, y, bucket_info, buffers);
   if (ret < 0) {
     return ret;
   }
 
-  // declare and pre-populate
-  map<int, struct rgw_cls_check_index_ret> bucket_objs_ret;
-  for (auto& iter : oids) {
-    bucket_objs_ret.emplace(iter.first, rgw_cls_check_index_ret());
-  }
-
-  maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
-  ret = CLSRGWIssueBucketCheck(index_pool, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)();
-  if (ret < 0) {
-    return ret;
-  }
+  try {
+    // decode and accumulate the results
+    for (const auto& kv : buffers) {
+      rgw_cls_check_index_ret result;
+      cls_rgw_bucket_check_index_decode(kv.second, result);
 
-  // aggregate results (from different shards if there are any)
-  for (const auto& iter : bucket_objs_ret) {
-    accumulate_raw_stats(iter.second.existing_header, *existing_stats);
-    accumulate_raw_stats(iter.second.calculated_header, *calculated_stats);
+      accumulate_raw_stats(result.existing_header, *existing_stats);
+      accumulate_raw_stats(result.calculated_header, *calculated_stats);
+    }
+  } catch (const ceph::buffer::error&) {
+    return -EIO;
   }
-
   return 0;
 }
 
-int RGWRados::bucket_rebuild_index(const DoutPrefixProvider *dpp, optional_yield y,
-                                   const RGWBucketInfo& bucket_info)
-{
-  librados::IoCtx index_pool;
-  map<int, string> bucket_objs;
-
-  int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
-  if (r < 0) {
-    return r;
-  }
-
-  maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
-  return CLSRGWIssueBucketRebuild(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
-}
-
 static int resync_encrypted_multipart(const DoutPrefixProvider* dpp,
                                       optional_yield y, RGWRados* store,
                                       RGWBucketInfo& bucket_info,
@@ -8341,6 +8315,26 @@ int RGWRados::recover_reshard_logrecord(RGWBucketInfo& bucket_info,
   return 0;
 }
 
+static int get_reshard_status(const DoutPrefixProvider* dpp, optional_yield y,
+                              librados::IoCtx& ioctx, const std::string& oid,
+                              cls_rgw_bucket_instance_entry& entry)
+{
+  librados::ObjectReadOperation op;
+  bufferlist bl;
+  cls_rgw_get_bucket_resharding(op, bl);
+
+  int ret = rgw_rados_operate(dpp, ioctx, oid, std::move(op), nullptr, y);
+  if (ret < 0) {
+    return ret;
+  }
+  try {
+    cls_rgw_get_bucket_resharding_decode(bl, entry);
+    return 0;
+  } catch (const buffer::error&) {
+    return -EIO;
+  }
+}
+
 int RGWRados::block_while_resharding(RGWRados::BucketShard *bs,
                                      const rgw_obj& obj_instance,
                                      RGWBucketInfo& bucket_info,
@@ -8389,7 +8383,7 @@ int RGWRados::block_while_resharding(RGWRados::BucketShard *bs,
   constexpr int num_retries = 10;
   for (int i = 1; i <= num_retries; i++) { // nb: 1-based for loop
     auto& ref = bs->bucket_obj;
-    ret = cls_rgw_get_bucket_resharding(ref.ioctx, ref.obj.oid, &entry);
+    ret = get_reshard_status(dpp, y, ref.ioctx, ref.obj.oid, entry);
     if (ret == -ENOENT) {
       ret = fetch_new_bucket_info("get_bucket_resharding_failed");
       if (ret < 0) {
@@ -10230,11 +10224,9 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
 
   std::map<int, rgw_cls_list_ret> shard_list_results;
   cls_rgw_obj_key start_after_key(start_after.name, start_after.instance);
-  maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
-  r = CLSRGWIssueBucketList(ioctx, start_after_key, prefix, delimiter,
-                           num_entries_per_shard,
-                           list_versions, shard_oids, shard_list_results,
-                           cct->_conf->rgw_bucket_index_max_aio)();
+  r = svc.bi_rados->list_objects(dpp, y, ioctx, shard_oids, start_after_key,
+                                 prefix, delimiter, num_entries_per_shard,
+                                 list_versions, shard_list_results);
   if (r < 0) {
     ldpp_dout(dpp, 0) << __func__ <<
       ": CLSRGWIssueBucketList for " << bucket_info.bucket <<
index 57e60ceb7818ed9dfe2cc6e22a4d7b2cc6adec2f..01be66ca6177d9cdff7716308f0d58fd01b8cb5e 100644 (file)
@@ -1594,8 +1594,6 @@ public:
                          const RGWBucketInfo& bucket_info,
                          std::map<RGWObjCategory, RGWStorageStats> *existing_stats,
                          std::map<RGWObjCategory, RGWStorageStats> *calculated_stats);
-  int bucket_rebuild_index(const DoutPrefixProvider *dpp, optional_yield y,
-                           const RGWBucketInfo& bucket_info);
 
   // Search the bucket for encrypted multipart uploads, and increase their mtime
   // slightly to generate a bilog entry to trigger a resync to repair any
index 447e33ef8d115ec90d7be2816c91e87fa2be034f..14d4323ac96a7d7ef9b1c37e869c41dcd2b72bf9 100644 (file)
@@ -806,12 +806,12 @@ int RadosBucket::check_index(const DoutPrefixProvider *dpp, optional_yield y,
 
 int RadosBucket::rebuild_index(const DoutPrefixProvider *dpp, optional_yield y)
 {
-  return store->getRados()->bucket_rebuild_index(dpp, y, info);
+  return store->svc()->bi_rados->rebuild_index(dpp, y, info);
 }
 
 int RadosBucket::set_tag_timeout(const DoutPrefixProvider *dpp, optional_yield y, uint64_t timeout)
 {
-  return store->getRados()->cls_obj_set_bucket_tag_timeout(dpp, info, timeout);
+  return store->svc()->bi_rados->set_tag_timeout(dpp, y, info, timeout);
 }
 
 int RadosBucket::purge_instance(const DoutPrefixProvider* dpp, optional_yield y)
index a82aae180b8d5baab1053902df7432a052a66211..e0216400a1751fbd3be1ab94aa6a5a3972b54784 100644 (file)
@@ -1,6 +1,9 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab ft=cpp
 
+#include <algorithm>
+#include <iterator>
+
 #include "svc_bi_rados.h"
 #include "svc_bilog_rados.h"
 #include "svc_zone.h"
 #include "rgw_zone.h"
 #include "rgw_datalog.h"
 
+#include "driver/rados/shard_io.h"
 #include "cls/rgw/cls_rgw_client.h"
+#include "common/async/blocked_completion.h"
 #include "common/errno.h"
 
 #define dout_subsys ceph_subsys_rgw
 
 using namespace std;
+using rgwrados::shard_io::Result;
 
 static string dir_oid_prefix = ".dir.";
 
@@ -322,6 +328,32 @@ int RGWSI_BucketIndex_RADOS::open_bucket_index_shard(const DoutPrefixProvider *d
   return 0;
 }
 
+struct IndexHeadReader : rgwrados::shard_io::RadosReader {
+  std::map<int, bufferlist>& buffers;
+
+  IndexHeadReader(const DoutPrefixProvider& dpp,
+                  boost::asio::any_io_executor ex,
+                  librados::IoCtx& ioctx,
+                  std::map<int, bufferlist>& buffers)
+    : RadosReader(dpp, std::move(ex), ioctx), buffers(buffers)
+  {}
+  void prepare_read(int shard, librados::ObjectReadOperation& op) override {
+    auto& bl = buffers[shard];
+    op.omap_get_header(&bl, nullptr);
+  }
+  Result on_complete(int, boost::system::error_code ec) override {
+    // ignore ENOENT
+    if (ec && ec != boost::system::errc::no_such_file_or_directory) {
+      return Result::Error;
+    } else {
+      return Result::Success;
+    }
+  }
+  void add_prefix(std::ostream& out) const override {
+    out << "read dir headers: ";
+  }
+};
+
 int RGWSI_BucketIndex_RADOS::cls_bucket_head(const DoutPrefixProvider *dpp,
                                              const RGWBucketInfo& bucket_info,
                                              const rgw::bucket_index_layout_generation& idx_layout,
@@ -336,24 +368,85 @@ int RGWSI_BucketIndex_RADOS::cls_bucket_head(const DoutPrefixProvider *dpp,
   if (r < 0)
     return r;
 
-  map<int, struct rgw_cls_list_ret> list_results;
-  for (auto& iter : oids) {
-    list_results.emplace(iter.first, rgw_cls_list_ret());
-  }
+  // read omap headers into bufferlists
+  std::map<int, bufferlist> buffers;
 
-  maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
-  r = CLSRGWIssueGetDirHeader(index_pool, oids, list_results,
-                             cct->_conf->rgw_bucket_index_max_aio)();
-  if (r < 0)
-    return r;
+  const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+  boost::system::error_code ec;
+  if (y) {
+    // run on the coroutine's executor and suspend until completion
+    auto yield = y.get_yield_context();
+    auto ex = yield.get_executor();
+    auto reader = IndexHeadReader{*dpp, ex, index_pool, buffers};
+
+    rgwrados::shard_io::async_reads(reader, oids, max_aio, yield[ec]);
+  } else {
+    // run a strand on the system executor and block on a condition variable
+    auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+    auto reader = IndexHeadReader{*dpp, ex, index_pool, buffers};
 
-  map<int, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
-  for(; iter != list_results.end(); ++iter) {
-    headers->push_back(std::move(iter->second.dir.header));
+    maybe_warn_about_blocking(dpp);
+    rgwrados::shard_io::async_reads(reader, oids, max_aio,
+                                    ceph::async::use_blocked[ec]);
+  }
+  if (ec) {
+    return ceph::from_error_code(ec);
+  }
+
+  try {
+    std::transform(buffers.begin(), buffers.end(),
+                   std::back_inserter(*headers),
+                   [] (const auto& kv) {
+                     rgw_bucket_dir_header header;
+                     auto p = kv.second.cbegin();
+                     decode(header, p);
+                     return header;
+                   });
+  } catch (const ceph::buffer::error&) {
+    return -EIO;
   }
   return 0;
 }
 
+// init_index() is all-or-nothing so if we fail to initialize all shards,
+// we undo the creation of others. RevertibleWriter provides these semantics
+struct IndexInitWriter : rgwrados::shard_io::RadosRevertibleWriter {
+  bool judge_support_logrecord;
+
+  IndexInitWriter(const DoutPrefixProvider& dpp,
+                  boost::asio::any_io_executor ex,
+                  librados::IoCtx& ioctx,
+                  bool judge_support_logrecord)
+    : RadosRevertibleWriter(dpp, std::move(ex), ioctx),
+      judge_support_logrecord(judge_support_logrecord)
+  {}
+  void prepare_write(int shard, librados::ObjectWriteOperation& op) override {
+    // don't overwrite. fail with EEXIST if a shard already exists
+    op.create(true);
+    if (judge_support_logrecord) {
+      // fail with EOPNOTSUPP if the osd doesn't support the reshard log
+      cls_rgw_bucket_init_index2(op);
+    } else {
+      cls_rgw_bucket_init_index(op);
+    }
+  }
+  void prepare_revert(int shard, librados::ObjectWriteOperation& op) override {
+    // on failure, remove any of the shards we successfully created
+    op.remove();
+  }
+  Result on_complete(int, boost::system::error_code ec) override {
+    // ignore EEXIST
+    if (ec && ec != boost::system::errc::file_exists) {
+      return Result::Error;
+    } else {
+      return Result::Success;
+    }
+  }
+  void add_prefix(std::ostream& out) const override {
+    out << "init index shards: ";
+  }
+};
+
 int RGWSI_BucketIndex_RADOS::init_index(const DoutPrefixProvider *dpp,
                                         optional_yield y,
                                         const RGWBucketInfo& bucket_info,
@@ -377,18 +470,49 @@ int RGWSI_BucketIndex_RADOS::init_index(const DoutPrefixProvider *dpp,
   map<int, string> bucket_objs;
   get_bucket_index_objects(dir_oid, idx_layout.layout.normal.num_shards, idx_layout.gen, &bucket_objs);
 
-  maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
-  if (judge_support_logrecord) {
-    return CLSRGWIssueBucketIndexInit2(index_pool,
-                                       bucket_objs,
-                                       cct->_conf->rgw_bucket_index_max_aio)();
+  const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+  boost::system::error_code ec;
+  if (y) {
+    // run on the coroutine's executor and suspend until completion
+    auto yield = y.get_yield_context();
+    auto ex = yield.get_executor();
+    auto writer = IndexInitWriter{*dpp, ex, index_pool, judge_support_logrecord};
+
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
   } else {
-    return CLSRGWIssueBucketIndexInit(index_pool,
-                                      bucket_objs,
-                                      cct->_conf->rgw_bucket_index_max_aio)();
+    // run a strand on the system executor and block on a condition variable
+    auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+    auto writer = IndexInitWriter{*dpp, ex, index_pool, judge_support_logrecord};
+
+    maybe_warn_about_blocking(dpp);
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+                                     ceph::async::use_blocked[ec]);
   }
+  return ceph::from_error_code(ec);
 }
 
+struct IndexCleanWriter : rgwrados::shard_io::RadosWriter {
+  IndexCleanWriter(const DoutPrefixProvider& dpp,
+                   boost::asio::any_io_executor ex,
+                   librados::IoCtx& ioctx)
+    : RadosWriter(dpp, std::move(ex), ioctx)
+  {}
+  void prepare_write(int shard, librados::ObjectWriteOperation& op) override {
+    op.remove();
+  }
+  Result on_complete(int, boost::system::error_code ec) override {
+    // ignore ENOENT
+    if (ec && ec != boost::system::errc::no_such_file_or_directory) {
+      return Result::Error;
+    } else {
+      return Result::Success;
+    }
+  }
+  void add_prefix(std::ostream& out) const override {
+    out << "clean index shards: ";
+  }
+};
+
 int RGWSI_BucketIndex_RADOS::clean_index(const DoutPrefixProvider *dpp,
                                          optional_yield y,
                                          const RGWBucketInfo& bucket_info,
@@ -412,10 +536,25 @@ int RGWSI_BucketIndex_RADOS::clean_index(const DoutPrefixProvider *dpp,
   get_bucket_index_objects(dir_oid, idx_layout.layout.normal.num_shards,
                            idx_layout.gen, &bucket_objs);
 
-  maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
-  return CLSRGWIssueBucketIndexClean(index_pool,
-                                    bucket_objs,
-                                    cct->_conf->rgw_bucket_index_max_aio)();
+  const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+  boost::system::error_code ec;
+  if (y) {
+    // run on the coroutine's executor and suspend until completion
+    auto yield = y.get_yield_context();
+    auto ex = yield.get_executor();
+    auto writer = IndexCleanWriter{*dpp, ex, index_pool};
+
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+  } else {
+    // run a strand on the system executor and block on a condition variable
+    auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+    auto writer = IndexCleanWriter{*dpp, ex, index_pool};
+
+    maybe_warn_about_blocking(dpp);
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+                                     ceph::async::use_blocked[ec]);
+  }
+  return ceph::from_error_code(ec);
 }
 
 int RGWSI_BucketIndex_RADOS::read_stats(const DoutPrefixProvider *dpp,
@@ -452,6 +591,32 @@ int RGWSI_BucketIndex_RADOS::read_stats(const DoutPrefixProvider *dpp,
   return 0;
 }
 
+struct ReshardStatusReader : rgwrados::shard_io::RadosReader {
+  std::map<int, bufferlist>& buffers;
+
+  ReshardStatusReader(const DoutPrefixProvider& dpp,
+                      boost::asio::any_io_executor ex,
+                      librados::IoCtx& ioctx,
+                      std::map<int, bufferlist>& buffers)
+    : RadosReader(dpp, std::move(ex), ioctx), buffers(buffers)
+  {}
+  void prepare_read(int shard, librados::ObjectReadOperation& op) override {
+    auto& bl = buffers[shard];
+    cls_rgw_get_bucket_resharding(op, bl);
+  }
+  Result on_complete(int, boost::system::error_code ec) override {
+    // ignore ENOENT
+    if (ec && ec != boost::system::errc::no_such_file_or_directory) {
+      return Result::Error;
+    } else {
+      return Result::Success;
+    }
+  }
+  void add_prefix(std::ostream& out) const override {
+    out << "get resharding status: ";
+  }
+};
+
 int RGWSI_BucketIndex_RADOS::get_reshard_status(const DoutPrefixProvider *dpp,
                                                 optional_yield y,
                                                 const RGWBucketInfo& bucket_info,
@@ -471,28 +636,65 @@ int RGWSI_BucketIndex_RADOS::get_reshard_status(const DoutPrefixProvider *dpp,
     return r;
   }
 
-  for (auto i : bucket_objs) {
-    cls_rgw_bucket_instance_entry entry;
+  std::map<int, bufferlist> buffers;
+  const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+  boost::system::error_code ec;
+  if (y) {
+    // run on the coroutine's executor and suspend until completion
+    auto yield = y.get_yield_context();
+    auto ex = yield.get_executor();
+    auto reader = ReshardStatusReader{*dpp, ex, index_pool, buffers};
 
-    int ret = cls_rgw_get_bucket_resharding(index_pool, i.second, &entry);
-    if (ret < 0 && ret != -ENOENT) {
-      ldpp_dout(dpp, -1) << "ERROR: " << __func__ << ": cls_rgw_get_bucket_resharding() returned ret=" << ret << dendl;
-      return ret;
-    }
+    rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio, yield[ec]);
+  } else {
+    // run a strand on the system executor and block on a condition variable
+    auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+    auto reader = ReshardStatusReader{*dpp, ex, index_pool, buffers};
+
+    maybe_warn_about_blocking(dpp);
+    rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio,
+                                    ceph::async::use_blocked[ec]);
+  }
+  if (ec) {
+    return ceph::from_error_code(ec);
+  }
 
-    status->push_back(entry);
+  try {
+    std::transform(buffers.begin(), buffers.end(),
+                   std::back_inserter(*status),
+                   [] (const auto& kv) {
+                     cls_rgw_bucket_instance_entry entry;
+                     cls_rgw_get_bucket_resharding_decode(kv.second, entry);
+                     return entry;
+                   });
+  } catch (const ceph::buffer::error&) {
+    return -EIO;
   }
 
   return 0;
 }
 
+struct ReshardStatusWriter : rgwrados::shard_io::RadosWriter {
+  cls_rgw_reshard_status status;
+  ReshardStatusWriter(const DoutPrefixProvider& dpp,
+                      boost::asio::any_io_executor ex,
+                      librados::IoCtx& ioctx,
+                      cls_rgw_reshard_status status)
+    : RadosWriter(dpp, std::move(ex), ioctx), status(status)
+  {}
+  void prepare_write(int, librados::ObjectWriteOperation& op) override {
+    cls_rgw_set_bucket_resharding(op, status);
+  }
+  void add_prefix(std::ostream& out) const override {
+    out << "set resharding status: ";
+  }
+};
+
 int RGWSI_BucketIndex_RADOS::set_reshard_status(const DoutPrefixProvider *dpp,
                                                 optional_yield y,
                                                 const RGWBucketInfo& bucket_info,
                                                 cls_rgw_reshard_status status)
 {
-  const auto entry = cls_rgw_bucket_instance_entry{.reshard_status = status};
-
   librados::IoCtx index_pool;
   map<int, string> bucket_objs;
 
@@ -504,16 +706,47 @@ int RGWSI_BucketIndex_RADOS::set_reshard_status(const DoutPrefixProvider *dpp,
     return r;
   }
 
-  maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
-  r = CLSRGWIssueSetBucketResharding(index_pool, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)();
-  if (r < 0) {
-    ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
-      ": unable to issue set bucket resharding, r=" << r << " (" <<
-      cpp_strerror(-r) << ")" << dendl;
+  const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+  boost::system::error_code ec;
+  if (y) {
+    // run on the coroutine's executor and suspend until completion
+    auto yield = y.get_yield_context();
+    auto ex = yield.get_executor();
+    auto writer = ReshardStatusWriter{*dpp, ex, index_pool, status};
+
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+  } else {
+    // run a strand on the system executor and block on a condition variable
+    auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+    auto writer = ReshardStatusWriter{*dpp, ex, index_pool, status};
+
+    maybe_warn_about_blocking(dpp);
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+                                     ceph::async::use_blocked[ec]);
   }
-  return r;
+  return ceph::from_error_code(ec);
 }
 
+struct ReshardTrimWriter : rgwrados::shard_io::RadosWriter {
+  using RadosWriter::RadosWriter;
+  void prepare_write(int, librados::ObjectWriteOperation& op) override {
+    cls_rgw_bucket_reshard_log_trim(op);
+  }
+  Result on_complete(int, boost::system::error_code ec) override {
+    // keep trimming until ENODATA (no_message_available)
+    if (!ec) {
+      return Result::Retry;
+    } else if (ec == boost::system::errc::no_message_available) {
+      return Result::Success;
+    } else {
+      return Result::Error;
+    }
+  }
+  void add_prefix(std::ostream& out) const override {
+    out << "trim reshard logs: ";
+  }
+};
+
 int RGWSI_BucketIndex_RADOS::trim_reshard_log(const DoutPrefixProvider* dpp,
                                               optional_yield y,
                                               const RGWBucketInfo& bucket_info)
@@ -525,7 +758,249 @@ int RGWSI_BucketIndex_RADOS::trim_reshard_log(const DoutPrefixProvider* dpp,
   if (r < 0) {
     return r;
   }
-  return CLSRGWIssueReshardLogTrim(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+
+  const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+  boost::system::error_code ec;
+  if (y) {
+    // run on the coroutine's executor and suspend until completion
+    auto yield = y.get_yield_context();
+    auto ex = yield.get_executor();
+    auto writer = ReshardTrimWriter{*dpp, ex, index_pool};
+
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+  } else {
+    // run a strand on the system executor and block on a condition variable
+    auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+    auto writer = ReshardTrimWriter{*dpp, ex, index_pool};
+
+    maybe_warn_about_blocking(dpp);
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+                                     ceph::async::use_blocked[ec]);
+  }
+  return ceph::from_error_code(ec);
+}
+
+struct TagTimeoutWriter : rgwrados::shard_io::RadosWriter {
+  uint64_t timeout;
+  TagTimeoutWriter(const DoutPrefixProvider& dpp,
+                   boost::asio::any_io_executor ex,
+                   librados::IoCtx& ioctx,
+                   uint64_t timeout)
+    : RadosWriter(dpp, std::move(ex), ioctx), timeout(timeout)
+  {}
+  void prepare_write(int, librados::ObjectWriteOperation& op) override {
+    cls_rgw_bucket_set_tag_timeout(op, timeout);
+  }
+  void add_prefix(std::ostream& out) const override {
+    out << "set tag timeouts: ";
+  }
+};
+
+int RGWSI_BucketIndex_RADOS::set_tag_timeout(const DoutPrefixProvider* dpp,
+                                             optional_yield y,
+                                             const RGWBucketInfo& bucket_info,
+                                             uint64_t timeout)
+{
+  librados::IoCtx index_pool;
+  map<int, string> bucket_objs;
+
+  int r = open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
+  if (r < 0) {
+    return r;
+  }
+
+  const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+  boost::system::error_code ec;
+  if (y) {
+    // run on the coroutine's executor and suspend until completion
+    auto yield = y.get_yield_context();
+    auto ex = yield.get_executor();
+    auto writer = TagTimeoutWriter{*dpp, ex, index_pool, timeout};
+
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+  } else {
+    // run a strand on the system executor and block on a condition variable
+    auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+    auto writer = TagTimeoutWriter{*dpp, ex, index_pool, timeout};
+
+    maybe_warn_about_blocking(dpp);
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+                                     ceph::async::use_blocked[ec]);
+  }
+  return ceph::from_error_code(ec);
+}
+
+struct CheckReader : rgwrados::shard_io::RadosReader {
+  std::map<int, bufferlist>& buffers;
+
+  CheckReader(const DoutPrefixProvider& dpp,
+              boost::asio::any_io_executor ex,
+              librados::IoCtx& ioctx,
+              std::map<int, bufferlist>& buffers)
+    : RadosReader(dpp, std::move(ex), ioctx), buffers(buffers)
+  {}
+  void prepare_read(int shard, librados::ObjectReadOperation& op) override {
+    auto& bl = buffers[shard];
+    cls_rgw_bucket_check_index(op, bl);
+  }
+  void add_prefix(std::ostream& out) const override {
+    out << "check index shards: ";
+  }
+};
+
+int RGWSI_BucketIndex_RADOS::check_index(const DoutPrefixProvider *dpp, optional_yield y,
+                                         const RGWBucketInfo& bucket_info,
+                                         std::map<int, bufferlist>& buffers)
+{
+  librados::IoCtx index_pool;
+  std::map<int, std::string> bucket_objs;
+
+  int r = open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
+  if (r < 0) {
+    return r;
+  }
+
+  const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+  boost::system::error_code ec;
+  if (y) {
+    // run on the coroutine's executor and suspend until completion
+    auto yield = y.get_yield_context();
+    auto ex = yield.get_executor();
+    auto reader = CheckReader{*dpp, ex, index_pool, buffers};
+
+    rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio, yield[ec]);
+  } else {
+    // run a strand on the system executor and block on a condition variable
+    auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+    auto reader = CheckReader{*dpp, ex, index_pool, buffers};
+
+    maybe_warn_about_blocking(dpp);
+    rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio,
+                                    ceph::async::use_blocked[ec]);
+  }
+  return ceph::from_error_code(ec);
+}
+
+struct RebuildWriter : rgwrados::shard_io::RadosWriter {
+  using RadosWriter::RadosWriter;
+  void prepare_write(int, librados::ObjectWriteOperation& op) override {
+    cls_rgw_bucket_rebuild_index(op);
+  }
+  void add_prefix(std::ostream& out) const override {
+    out << "rebuild index shards: ";
+  }
+};
+
+int RGWSI_BucketIndex_RADOS::rebuild_index(const DoutPrefixProvider *dpp,
+                                           optional_yield y,
+                                           const RGWBucketInfo& bucket_info)
+{
+  librados::IoCtx index_pool;
+  map<int, string> bucket_objs;
+
+  int r = open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
+  if (r < 0) {
+    return r;
+  }
+
+  const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+  boost::system::error_code ec;
+  if (y) {
+    // run on the coroutine's executor and suspend until completion
+    auto yield = y.get_yield_context();
+    auto ex = yield.get_executor();
+    auto writer = RebuildWriter{*dpp, ex, index_pool};
+
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+  } else {
+    // run a strand on the system executor and block on a condition variable
+    auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+    auto writer = RebuildWriter{*dpp, ex, index_pool};
+
+    maybe_warn_about_blocking(dpp);
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+                                     ceph::async::use_blocked[ec]);
+  }
+  return ceph::from_error_code(ec);
+}
+
+struct ListReader : rgwrados::shard_io::RadosReader {
+  const cls_rgw_obj_key& start_obj;
+  const std::string& prefix;
+  const std::string& delimiter;
+  uint32_t num_entries;
+  bool list_versions;
+  std::map<int, rgw_cls_list_ret>& results;
+
+  ListReader(const DoutPrefixProvider& dpp,
+             boost::asio::any_io_executor ex,
+             librados::IoCtx& ioctx,
+             const cls_rgw_obj_key& start_obj,
+             const std::string& prefix,
+             const std::string& delimiter,
+             uint32_t num_entries, bool list_versions,
+             std::map<int, rgw_cls_list_ret>& results)
+    : RadosReader(dpp, std::move(ex), ioctx),
+      start_obj(start_obj), prefix(prefix), delimiter(delimiter),
+      num_entries(num_entries), list_versions(list_versions),
+      results(results)
+  {}
+  void prepare_read(int shard, librados::ObjectReadOperation& op) override {
+    // set the marker depending on whether we've already queried this
+    // shard and gotten a RGWBIAdvanceAndRetryError (defined
+    // constant) return value; if we have use the marker in the return
+    // to advance the search, otherwise use the marker passed in by the
+    // caller
+    auto& result = results[shard];
+    const cls_rgw_obj_key& marker =
+        result.marker.empty() ? start_obj : result.marker;
+    cls_rgw_bucket_list_op(op, marker, prefix, delimiter,
+                           num_entries, list_versions, &result);
+  }
+  Result on_complete(int, boost::system::error_code ec) override {
+    if (ec.value() == -RGWBIAdvanceAndRetryError) {
+      return Result::Retry;
+    } else if (ec) {
+      return Result::Error;
+    } else {
+      return Result::Success;
+    }
+  }
+  void add_prefix(std::ostream& out) const override {
+    out << "sharded list objects: ";
+  }
+};
+
+int RGWSI_BucketIndex_RADOS::list_objects(const DoutPrefixProvider* dpp, optional_yield y,
+                                          librados::IoCtx& index_pool,
+                                          const std::map<int, string>& bucket_objs,
+                                          const cls_rgw_obj_key& start_obj,
+                                          const std::string& prefix,
+                                          const std::string& delimiter,
+                                          uint32_t num_entries, bool list_versions,
+                                          std::map<int, rgw_cls_list_ret>& results)
+{
+  const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+  boost::system::error_code ec;
+  if (y) {
+    // run on the coroutine's executor and suspend until completion
+    auto yield = y.get_yield_context();
+    auto ex = yield.get_executor();
+    auto reader = ListReader{*dpp, ex, index_pool, start_obj, prefix, delimiter,
+                             num_entries, list_versions, results};
+
+    rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio, yield[ec]);
+  } else {
+    // run a strand on the system executor and block on a condition variable
+    auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+    auto reader = ListReader{*dpp, ex, index_pool, start_obj, prefix, delimiter,
+                             num_entries, list_versions, results};
+
+    maybe_warn_about_blocking(dpp);
+    rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio,
+                                    ceph::async::use_blocked[ec]);
+  }
+  return ceph::from_error_code(ec);
 }
 
 int RGWSI_BucketIndex_RADOS::handle_overwrite(const DoutPrefixProvider *dpp,
index 24a9fa14a68a083bb2b85dee7e761c4dc78f7c36..2103c1c08a16d50b5ca94b82439795b3bbd8dd02 100644 (file)
@@ -24,6 +24,7 @@
 #include "svc_tier_rados.h"
 
 struct rgw_bucket_dir_header;
+struct rgw_cls_list_ret;
 
 class RGWSI_BILog_RADOS;
 
@@ -145,6 +146,26 @@ public:
   int trim_reshard_log(const DoutPrefixProvider* dpp, optional_yield,
                        const RGWBucketInfo& bucket_info);
 
+  int set_tag_timeout(const DoutPrefixProvider *dpp, optional_yield y,
+                      const RGWBucketInfo& bucket_info, uint64_t timeout);
+
+  int check_index(const DoutPrefixProvider *dpp, optional_yield y,
+                  const RGWBucketInfo& bucket_info,
+                  std::map<int, bufferlist>& buffers);
+
+  int rebuild_index(const DoutPrefixProvider *dpp, optional_yield y,
+                    const RGWBucketInfo& bucket_info);
+
+  /// Read the requested number of entries from each index shard object.
+  int list_objects(const DoutPrefixProvider* dpp, optional_yield y,
+                   librados::IoCtx& index_pool,
+                   const std::map<int, std::string>& bucket_objs,
+                   const cls_rgw_obj_key& start_obj,
+                   const std::string& prefix,
+                   const std::string& delimiter,
+                   uint32_t num_entries, bool list_versions,
+                   std::map<int, rgw_cls_list_ret>& results);
+
   int handle_overwrite(const DoutPrefixProvider *dpp, const RGWBucketInfo& info,
                        const RGWBucketInfo& orig_info,
                       optional_yield y) override;
index 71bcbd5660d585b82041d347905744662ac1f1cc..ac35378f3cfba7efb065c97f1d75da3bffd9987e 100644 (file)
@@ -5,11 +5,14 @@
 #include "svc_bi_rados.h"
 
 #include "rgw_asio_thread.h"
+#include "driver/rados/shard_io.h"
 #include "cls/rgw/cls_rgw_client.h"
+#include "common/async/blocked_completion.h"
 
 #define dout_subsys ceph_subsys_rgw
 
 using namespace std;
+using rgwrados::shard_io::Result;
 
 RGWSI_BILog_RADOS::RGWSI_BILog_RADOS(CephContext *cct) : RGWServiceInstance(cct)
 {
@@ -20,6 +23,35 @@ void RGWSI_BILog_RADOS::init(RGWSI_BucketIndex_RADOS *bi_rados_svc)
   svc.bi = bi_rados_svc;
 }
 
+struct TrimWriter : rgwrados::shard_io::RadosWriter {
+  const BucketIndexShardsManager& start;
+  const BucketIndexShardsManager& end;
+
+  TrimWriter(const DoutPrefixProvider& dpp,
+             boost::asio::any_io_executor ex,
+             librados::IoCtx& ioctx,
+             const BucketIndexShardsManager& start,
+             const BucketIndexShardsManager& end)
+    : RadosWriter(dpp, std::move(ex), ioctx), start(start), end(end)
+  {}
+  void prepare_write(int shard, librados::ObjectWriteOperation& op) override {
+    cls_rgw_bilog_trim(op, start.get(shard, ""), end.get(shard, ""));
+  }
+  Result on_complete(int, boost::system::error_code ec) override {
+    // continue trimming until -ENODATA or other error
+    if (ec == boost::system::errc::no_message_available) {
+      return Result::Success;
+    } else if (ec) {
+      return Result::Error;
+    } else {
+      return Result::Retry;
+    }
+  }
+  void add_prefix(std::ostream& out) const override {
+    out << "trim bilog shards: ";
+  }
+};
+
 int RGWSI_BILog_RADOS::log_trim(const DoutPrefixProvider *dpp, optional_yield y,
                                const RGWBucketInfo& bucket_info,
                                const rgw::bucket_log_layout_generation& log_layout,
@@ -49,11 +81,37 @@ int RGWSI_BILog_RADOS::log_trim(const DoutPrefixProvider *dpp, optional_yield y,
     return r;
   }
 
-  maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
-  return CLSRGWIssueBILogTrim(index_pool, start_marker_mgr, end_marker_mgr, bucket_objs,
-                             cct->_conf->rgw_bucket_index_max_aio)();
+  const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+  boost::system::error_code ec;
+  if (y) {
+    // run on the coroutine's executor and suspend until completion
+    auto yield = y.get_yield_context();
+    auto ex = yield.get_executor();
+    auto writer = TrimWriter{*dpp, ex, index_pool, start_marker_mgr, end_marker_mgr};
+
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+  } else {
+    // run a strand on the system executor and block on a condition variable
+    auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+    auto writer = TrimWriter{*dpp, ex, index_pool, start_marker_mgr, end_marker_mgr};
+
+    maybe_warn_about_blocking(dpp);
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+                                     ceph::async::use_blocked[ec]);
+  }
+  return ceph::from_error_code(ec);
 }
 
+struct StartWriter : rgwrados::shard_io::RadosWriter {
+  using RadosWriter::RadosWriter;
+  void prepare_write(int, librados::ObjectWriteOperation& op) override {
+    cls_rgw_bilog_start(op);
+  }
+  void add_prefix(std::ostream& out) const override {
+    out << "restart bilog shards: ";
+  }
+};
+
 int RGWSI_BILog_RADOS::log_start(const DoutPrefixProvider *dpp, optional_yield y,
                                  const RGWBucketInfo& bucket_info,
                                  const rgw::bucket_log_layout_generation& log_layout,
@@ -66,10 +124,37 @@ int RGWSI_BILog_RADOS::log_start(const DoutPrefixProvider *dpp, optional_yield y
   if (r < 0)
     return r;
 
-  maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
-  return CLSRGWIssueResyncBucketBILog(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+  const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+  boost::system::error_code ec;
+  if (y) {
+    // run on the coroutine's executor and suspend until completion
+    auto yield = y.get_yield_context();
+    auto ex = yield.get_executor();
+    auto writer = StartWriter{*dpp, ex, index_pool};
+
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+  } else {
+    // run a strand on the system executor and block on a condition variable
+    auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+    auto writer = StartWriter{*dpp, ex, index_pool};
+
+    maybe_warn_about_blocking(dpp);
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+                                     ceph::async::use_blocked[ec]);
+  }
+  return ceph::from_error_code(ec);
 }
 
+struct StopWriter : rgwrados::shard_io::RadosWriter {
+  using RadosWriter::RadosWriter;
+  void prepare_write(int, librados::ObjectWriteOperation& op) override {
+    cls_rgw_bilog_stop(op);
+  }
+  void add_prefix(std::ostream& out) const override {
+    out << "stop bilog shards: ";
+  }
+};
+
 int RGWSI_BILog_RADOS::log_stop(const DoutPrefixProvider *dpp, optional_yield y,
                                 const RGWBucketInfo& bucket_info,
                                 const rgw::bucket_log_layout_generation& log_layout,
@@ -82,8 +167,25 @@ int RGWSI_BILog_RADOS::log_stop(const DoutPrefixProvider *dpp, optional_yield y,
   if (r < 0)
     return r;
 
-  maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
-  return CLSRGWIssueBucketBILogStop(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+  const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+  boost::system::error_code ec;
+  if (y) {
+    // run on the coroutine's executor and suspend until completion
+    auto yield = y.get_yield_context();
+    auto ex = yield.get_executor();
+    auto writer = StopWriter{*dpp, ex, index_pool};
+
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+  } else {
+    // run a strand on the system executor and block on a condition variable
+    auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+    auto writer = StopWriter{*dpp, ex, index_pool};
+
+    maybe_warn_about_blocking(dpp);
+    rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+                                     ceph::async::use_blocked[ec]);
+  }
+  return ceph::from_error_code(ec);
 }
 
 static void build_bucket_index_marker(const string& shard_id_str,
@@ -96,6 +198,62 @@ static void build_bucket_index_marker(const string& shard_id_str,
   }
 }
 
+struct LogReader : rgwrados::shard_io::RadosReader {
+  const BucketIndexShardsManager& start;
+  uint32_t max;
+  std::map<int, cls_rgw_bi_log_list_ret>& logs;
+
+  LogReader(const DoutPrefixProvider& dpp, boost::asio::any_io_executor ex,
+            librados::IoCtx& ioctx, const BucketIndexShardsManager& start,
+            uint32_t max, std::map<int, cls_rgw_bi_log_list_ret>& logs)
+    : RadosReader(dpp, std::move(ex), ioctx),
+      start(start), max(max), logs(logs)
+  {}
+  void prepare_read(int shard, librados::ObjectReadOperation& op) override {
+    auto& result = logs[shard];
+    cls_rgw_bilog_list(op, start.get(shard, ""), max, &result, nullptr);
+  }
+  void add_prefix(std::ostream& out) const override {
+    out << "list bilog shards: ";
+  }
+};
+
+static int bilog_list(const DoutPrefixProvider* dpp, optional_yield y,
+                      RGWSI_BucketIndex_RADOS* svc_bi,
+                      const RGWBucketInfo& bucket_info,
+                      const rgw::bucket_log_layout_generation& log_layout,
+                      const BucketIndexShardsManager& start,
+                      int shard_id, uint32_t max,
+                      std::map<int, cls_rgw_bi_log_list_ret>& logs)
+{
+  librados::IoCtx index_pool;
+  map<int, string> oids;
+  const auto& current_index = rgw::log_to_index_layout(log_layout);
+  int r = svc_bi->open_bucket_index(dpp, bucket_info, shard_id, current_index, &index_pool, &oids, nullptr);
+  if (r < 0)
+    return r;
+
+  const size_t max_aio = dpp->get_cct()->_conf->rgw_bucket_index_max_aio;
+  boost::system::error_code ec;
+  if (y) {
+    // run on the coroutine's executor and suspend until completion
+    auto yield = y.get_yield_context();
+    auto ex = yield.get_executor();
+    auto reader = LogReader{*dpp, ex, index_pool, start, max, logs};
+
+    rgwrados::shard_io::async_reads(reader, oids, max_aio, yield[ec]);
+  } else {
+    // run a strand on the system executor and block on a condition variable
+    auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+    auto reader = LogReader{*dpp, ex, index_pool, start, max, logs};
+
+    maybe_warn_about_blocking(dpp);
+    rgwrados::shard_io::async_reads(reader, oids, max_aio,
+                                    ceph::async::use_blocked[ec]);
+  }
+  return ceph::from_error_code(ec);
+}
+
 int RGWSI_BILog_RADOS::log_list(const DoutPrefixProvider *dpp, optional_yield y,
                                const RGWBucketInfo& bucket_info,
                                const rgw::bucket_log_layout_generation& log_layout,
@@ -105,26 +263,19 @@ int RGWSI_BILog_RADOS::log_list(const DoutPrefixProvider *dpp, optional_yield y,
   ldpp_dout(dpp, 20) << __func__ << ": " << bucket_info.bucket << " marker " << marker << " shard_id=" << shard_id << " max " << max << dendl;
   result.clear();
 
-  librados::IoCtx index_pool;
-  map<int, string> oids;
-  map<int, cls_rgw_bi_log_list_ret> bi_log_lists;
-  const auto& current_index = rgw::log_to_index_layout(log_layout);
-  int r = svc.bi->open_bucket_index(dpp, bucket_info, shard_id, current_index, &index_pool, &oids, nullptr);
-  if (r < 0)
-    return r;
-
   BucketIndexShardsManager marker_mgr;
-  bool has_shards = (oids.size() > 1 || shard_id >= 0);
+  const bool has_shards = (shard_id >= 0);
   // If there are multiple shards for the bucket index object, the marker
   // should have the pattern '{shard_id_1}#{shard_marker_1},{shard_id_2}#
   // {shard_marker_2}...', if there is no sharding, the bi_log_list should
   // only contain one record, and the key is the bucket instance id.
-  r = marker_mgr.from_string(marker, shard_id);
+  int r = marker_mgr.from_string(marker, shard_id);
   if (r < 0)
     return r;
 
-  maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
-  r = CLSRGWIssueBILogList(index_pool, marker_mgr, max, oids, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)();
+  std::map<int, cls_rgw_bi_log_list_ret> bi_log_lists;
+  r = bilog_list(dpp, y, svc.bi, bucket_info, log_layout, marker_mgr,
+                 shard_id, max, bi_log_lists);
   if (r < 0)
     return r;