return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
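+// prepare a write op that sets the tag timeout on a bucket index object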
+void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& op,
+ uint64_t timeout)
+{
+ const auto call = rgw_cls_tag_timeout_op{.tag_timeout = timeout};
+ bufferlist in;
+ encode(call, in);
+ op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in);
+}
+
static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
const int shard_id,
const string& oid,
uint64_t timeout,
BucketIndexAioManager *manager) {
- bufferlist in;
- rgw_cls_tag_timeout_op call;
- call.tag_timeout = timeout;
- encode(call, in);
ObjectWriteOperation op;
- op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in);
+ cls_rgw_bucket_set_tag_timeout(op, timeout);
return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
}
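+// prepare a write op that trims entries from the index shard's reshard log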
+void cls_rgw_bucket_reshard_log_trim(librados::ObjectWriteOperation& op)
+{
+ bufferlist in;
+ op.exec(RGW_CLASS, RGW_RESHARD_LOG_TRIM, in);
+}
+
static bool issue_reshard_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
BucketIndexAioManager *manager) {
- bufferlist in;
ObjectWriteOperation op;
- op.exec(RGW_CLASS, RGW_RESHARD_LOG_TRIM, in);
+ cls_rgw_bucket_reshard_log_trim(op);
return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
return issue_reshard_log_trim(io_ctx, oid, shard_id, &manager);
}
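+// prepare a read op that checks the bucket index; the encoded
+// rgw_cls_check_index_ret reply is written to 'out' on completion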
+void cls_rgw_bucket_check_index(librados::ObjectReadOperation& op,
+ bufferlist& out)
+{
+ bufferlist in;
+ op.exec(RGW_CLASS, RGW_BUCKET_CHECK_INDEX, in, &out, nullptr);
+}
+
+void cls_rgw_bucket_check_index_decode(const bufferlist& out,
+ rgw_cls_check_index_ret& result)
+{
+ auto p = out.cbegin();
+ decode(result, p);
+}
+
static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager,
rgw_cls_check_index_ret *pdata) {
bufferlist in;
return issue_bucket_check_index_op(io_ctx, shard_id, oid, &manager, &result[shard_id]);
}
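+// prepare a write op that asks the bucket index shard to rebuild itself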
+void cls_rgw_bucket_rebuild_index(librados::ObjectWriteOperation& op)
+{
+ bufferlist in;
+ op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in);
+}
+
static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const int shard_id, const string& oid,
BucketIndexAioManager *manager) {
- bufferlist in;
librados::ObjectWriteOperation op;
- op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in);
+ cls_rgw_bucket_rebuild_index(op);
return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
0, false, &manager, &result[shard_id]);
}
-static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
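+// prepare a write op that restarts bilog recording on the index shard (RGW_BI_LOG_RESYNC)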
+void cls_rgw_bilog_start(ObjectWriteOperation& op)
{
bufferlist in;
- librados::ObjectWriteOperation op;
op.exec(RGW_CLASS, RGW_BI_LOG_RESYNC, in);
+}
+
+static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
+{
+ librados::ObjectWriteOperation op;
+ cls_rgw_bilog_start(op);
return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
return issue_resync_bi_log(io_ctx, shard_id, oid, &manager);
}
-static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
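+// prepare a write op that stops bilog recording on the index shard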
+void cls_rgw_bilog_stop(ObjectWriteOperation& op)
{
bufferlist in;
- librados::ObjectWriteOperation op;
op.exec(RGW_CLASS, RGW_BI_LOG_STOP, in);
+}
+
+static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
+{
+ librados::ObjectWriteOperation op;
+ cls_rgw_bilog_stop(op);
return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
op.exec(RGW_CLASS, RGW_RESHARD_REMOVE, in);
}
-int cls_rgw_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
- const cls_rgw_bucket_instance_entry& entry)
-{
- bufferlist in, out;
- cls_rgw_set_bucket_resharding_op call;
- call.entry = entry;
- encode(call, in);
- librados::ObjectWriteOperation op;
- op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in);
- return io_ctx.operate(oid, &op);
-}
-
-int cls_rgw_clear_bucket_resharding(librados::IoCtx& io_ctx, const string& oid)
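+// prepare a write op that clears the shard's resharding status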
+void cls_rgw_clear_bucket_resharding(librados::ObjectWriteOperation& op)
{
- bufferlist in, out;
+ bufferlist in;
cls_rgw_clear_bucket_resharding_op call;
encode(call, in);
- librados::ObjectWriteOperation op;
op.exec(RGW_CLASS, RGW_CLEAR_BUCKET_RESHARDING, in);
- return io_ctx.operate(oid, &op);
}
-int cls_rgw_get_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
- cls_rgw_bucket_instance_entry *entry)
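+// prepare a read op that returns the shard's encoded resharding status in 'out'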
+void cls_rgw_get_bucket_resharding(librados::ObjectReadOperation& op,
+ bufferlist& out)
{
- bufferlist in, out;
+ bufferlist in;
cls_rgw_get_bucket_resharding_op call;
encode(call, in);
- int r= io_ctx.exec(oid, RGW_CLASS, RGW_GET_BUCKET_RESHARDING, in, out);
- if (r < 0)
- return r;
+ op.exec(RGW_CLASS, RGW_GET_BUCKET_RESHARDING, in, &out, nullptr);
+}
+void cls_rgw_get_bucket_resharding_decode(const bufferlist& out,
+ cls_rgw_bucket_instance_entry& entry)
+{
cls_rgw_get_bucket_resharding_ret op_ret;
auto iter = out.cbegin();
- try {
- decode(op_ret, iter);
- } catch (ceph::buffer::error& err) {
- return -EIO;
- }
-
- *entry = op_ret.new_instance;
+ decode(op_ret, iter);
- return 0;
+ entry = std::move(op_ret.new_instance);
}
void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err)
op.exec(RGW_CLASS, RGW_GUARD_BUCKET_RESHARDING, in);
}
-static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx,
- const int shard_id, const string& oid,
- const cls_rgw_bucket_instance_entry& entry,
- BucketIndexAioManager *manager) {
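+// prepare a write op that sets the shard's resharding status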
+void cls_rgw_set_bucket_resharding(librados::ObjectWriteOperation& op,
+ cls_rgw_reshard_status status)
+{
bufferlist in;
cls_rgw_set_bucket_resharding_op call;
- call.entry = entry;
+ call.entry.reshard_status = status;
encode(call, in);
- librados::ObjectWriteOperation op;
+
op.assert_exists(); // the shard must exist; if not fail rather than recreate
op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in);
+}
+
+static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx,
+ const int shard_id, const string& oid,
+ const cls_rgw_bucket_instance_entry& entry,
+ BucketIndexAioManager *manager) {
+ librados::ObjectWriteOperation op;
+ cls_rgw_set_bucket_resharding(op, entry.reshard_status);
return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
int operator()();
}; // class CLSRGWConcurrentIO
+void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& op,
+ uint64_t timeout);
class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
protected:
CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio) {}
};
+void cls_rgw_bucket_check_index(librados::ObjectReadOperation& op,
+ bufferlist& out);
+// decode the response; may throw buffer::error
+void cls_rgw_bucket_check_index_decode(const bufferlist& out,
+ rgw_cls_check_index_ret& result);
+
/**
* Check the bucket index.
*
virtual ~CLSRGWIssueBucketCheck() override {}
};
+void cls_rgw_bucket_rebuild_index(librados::ObjectWriteOperation& op);
+
class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO {
protected:
int issue_op(int shard_id, const std::string& oid) override;
virtual ~CLSRGWIssueSetBucketResharding() override {}
};
+void cls_rgw_bilog_start(librados::ObjectWriteOperation& op);
+void cls_rgw_bilog_stop(librados::ObjectWriteOperation& op);
+
class CLSRGWIssueResyncBucketBILog : public CLSRGWConcurrentIO {
protected:
int issue_op(int shard_id, const std::string& oid);
// cls_rgw in the T+4 (X) release.
void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err);
-// these overloads which call io_ctx.operate() should not be called in the rgw.
-// rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate()
-#ifndef CLS_CLIENT_HIDE_IOCTX
-int cls_rgw_set_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid,
- const cls_rgw_bucket_instance_entry& entry);
-int cls_rgw_clear_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid);
-int cls_rgw_get_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid,
- cls_rgw_bucket_instance_entry *entry);
-#endif
+void cls_rgw_set_bucket_resharding(librados::ObjectWriteOperation& op,
+ cls_rgw_reshard_status status);
+void cls_rgw_clear_bucket_resharding(librados::ObjectWriteOperation& op);
+void cls_rgw_get_bucket_resharding(librados::ObjectReadOperation& op,
+ bufferlist& out);
+// decode the entry; may throw buffer::error
+void cls_rgw_get_bucket_resharding_decode(const bufferlist& out,
+ cls_rgw_bucket_instance_entry& entry);
+
+// Try to remove all reshard log entries from the bucket index. The op returns
+// success if any entries were removed, and -ENODATA once they're all gone.
+void cls_rgw_bucket_reshard_log_trim(librados::ObjectWriteOperation& op);
struct rgw_cls_tag_timeout_op
{
- uint64_t tag_timeout;
-
- rgw_cls_tag_timeout_op() : tag_timeout(0) {}
+ uint64_t tag_timeout = 0;
void encode(ceph::buffer::list &bl) const {
ENCODE_START(1, 1, bl);
map<RGWObjCategory, RGWStorageStats> *existing_stats,
map<RGWObjCategory, RGWStorageStats> *calculated_stats)
{
- librados::IoCtx index_pool;
-
- // key - bucket index object id
- // value - bucket index check OP returned result with the given bucket index object (shard)
- map<int, string> oids;
-
- int ret = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &oids, nullptr);
+ std::map<int, bufferlist> buffers;
+ int ret = svc.bi_rados->check_index(dpp, y, bucket_info, buffers);
if (ret < 0) {
return ret;
}
- // declare and pre-populate
- map<int, struct rgw_cls_check_index_ret> bucket_objs_ret;
- for (auto& iter : oids) {
- bucket_objs_ret.emplace(iter.first, rgw_cls_check_index_ret());
- }
-
- maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
- ret = CLSRGWIssueBucketCheck(index_pool, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)();
- if (ret < 0) {
- return ret;
- }
+ try {
+ // decode and accumulate the results
+ for (const auto& kv : buffers) {
+ rgw_cls_check_index_ret result;
+ cls_rgw_bucket_check_index_decode(kv.second, result);
- // aggregate results (from different shards if there are any)
- for (const auto& iter : bucket_objs_ret) {
- accumulate_raw_stats(iter.second.existing_header, *existing_stats);
- accumulate_raw_stats(iter.second.calculated_header, *calculated_stats);
+ accumulate_raw_stats(result.existing_header, *existing_stats);
+ accumulate_raw_stats(result.calculated_header, *calculated_stats);
+ }
+ } catch (const ceph::buffer::error&) {
+ return -EIO;
}
-
return 0;
}
-int RGWRados::bucket_rebuild_index(const DoutPrefixProvider *dpp, optional_yield y,
- const RGWBucketInfo& bucket_info)
-{
- librados::IoCtx index_pool;
- map<int, string> bucket_objs;
-
- int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
- if (r < 0) {
- return r;
- }
-
- maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
- return CLSRGWIssueBucketRebuild(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
-}
-
static int resync_encrypted_multipart(const DoutPrefixProvider* dpp,
optional_yield y, RGWRados* store,
RGWBucketInfo& bucket_info,
return 0;
}
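+// read and decode the resharding status from a single bucket index shard object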
+static int get_reshard_status(const DoutPrefixProvider* dpp, optional_yield y,
+ librados::IoCtx& ioctx, const std::string& oid,
+ cls_rgw_bucket_instance_entry& entry)
+{
+ librados::ObjectReadOperation op;
+ bufferlist bl;
+ cls_rgw_get_bucket_resharding(op, bl);
+
+ int ret = rgw_rados_operate(dpp, ioctx, oid, std::move(op), nullptr, y);
+ if (ret < 0) {
+ return ret;
+ }
+ try {
+ cls_rgw_get_bucket_resharding_decode(bl, entry);
+ return 0;
+ } catch (const buffer::error&) {
+ return -EIO;
+ }
+}
+
int RGWRados::block_while_resharding(RGWRados::BucketShard *bs,
const rgw_obj& obj_instance,
RGWBucketInfo& bucket_info,
constexpr int num_retries = 10;
for (int i = 1; i <= num_retries; i++) { // nb: 1-based for loop
auto& ref = bs->bucket_obj;
- ret = cls_rgw_get_bucket_resharding(ref.ioctx, ref.obj.oid, &entry);
+ ret = get_reshard_status(dpp, y, ref.ioctx, ref.obj.oid, entry);
if (ret == -ENOENT) {
ret = fetch_new_bucket_info("get_bucket_resharding_failed");
if (ret < 0) {
std::map<int, rgw_cls_list_ret> shard_list_results;
cls_rgw_obj_key start_after_key(start_after.name, start_after.instance);
- maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
- r = CLSRGWIssueBucketList(ioctx, start_after_key, prefix, delimiter,
- num_entries_per_shard,
- list_versions, shard_oids, shard_list_results,
- cct->_conf->rgw_bucket_index_max_aio)();
+ r = svc.bi_rados->list_objects(dpp, y, ioctx, shard_oids, start_after_key,
+ prefix, delimiter, num_entries_per_shard,
+ list_versions, shard_list_results);
if (r < 0) {
ldpp_dout(dpp, 0) << __func__ <<
": CLSRGWIssueBucketList for " << bucket_info.bucket <<
const RGWBucketInfo& bucket_info,
std::map<RGWObjCategory, RGWStorageStats> *existing_stats,
std::map<RGWObjCategory, RGWStorageStats> *calculated_stats);
- int bucket_rebuild_index(const DoutPrefixProvider *dpp, optional_yield y,
- const RGWBucketInfo& bucket_info);
// Search the bucket for encrypted multipart uploads, and increase their mtime
// slightly to generate a bilog entry to trigger a resync to repair any
int RadosBucket::rebuild_index(const DoutPrefixProvider *dpp, optional_yield y)
{
- return store->getRados()->bucket_rebuild_index(dpp, y, info);
+ return store->svc()->bi_rados->rebuild_index(dpp, y, info);
}
int RadosBucket::set_tag_timeout(const DoutPrefixProvider *dpp, optional_yield y, uint64_t timeout)
{
- return store->getRados()->cls_obj_set_bucket_tag_timeout(dpp, info, timeout);
+ return store->svc()->bi_rados->set_tag_timeout(dpp, y, info, timeout);
}
int RadosBucket::purge_instance(const DoutPrefixProvider* dpp, optional_yield y)
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
+#include <algorithm>
+#include <iterator>
+
#include "svc_bi_rados.h"
#include "svc_bilog_rados.h"
#include "svc_zone.h"
#include "rgw_zone.h"
#include "rgw_datalog.h"
+#include "driver/rados/shard_io.h"
#include "cls/rgw/cls_rgw_client.h"
+#include "common/async/blocked_completion.h"
#include "common/errno.h"
#define dout_subsys ceph_subsys_rgw
using namespace std;
+using rgwrados::shard_io::Result;
static string dir_oid_prefix = ".dir.";
return 0;
}
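+// reads the omap header from each bucket index shard into the caller's buffers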
+struct IndexHeadReader : rgwrados::shard_io::RadosReader {
+ std::map<int, bufferlist>& buffers;
+
+ IndexHeadReader(const DoutPrefixProvider& dpp,
+ boost::asio::any_io_executor ex,
+ librados::IoCtx& ioctx,
+ std::map<int, bufferlist>& buffers)
+ : RadosReader(dpp, std::move(ex), ioctx), buffers(buffers)
+ {}
+ void prepare_read(int shard, librados::ObjectReadOperation& op) override {
+ auto& bl = buffers[shard];
+ op.omap_get_header(&bl, nullptr);
+ }
+ Result on_complete(int, boost::system::error_code ec) override {
+ // ignore ENOENT
+ if (ec && ec != boost::system::errc::no_such_file_or_directory) {
+ return Result::Error;
+ } else {
+ return Result::Success;
+ }
+ }
+ void add_prefix(std::ostream& out) const override {
+ out << "read dir headers: ";
+ }
+};
+
int RGWSI_BucketIndex_RADOS::cls_bucket_head(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info,
const rgw::bucket_index_layout_generation& idx_layout,
if (r < 0)
return r;
- map<int, struct rgw_cls_list_ret> list_results;
- for (auto& iter : oids) {
- list_results.emplace(iter.first, rgw_cls_list_ret());
- }
+ // read omap headers into bufferlists
+ std::map<int, bufferlist> buffers;
- maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
- r = CLSRGWIssueGetDirHeader(index_pool, oids, list_results,
- cct->_conf->rgw_bucket_index_max_aio)();
- if (r < 0)
- return r;
+ const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+ boost::system::error_code ec;
+ if (y) {
+ // run on the coroutine's executor and suspend until completion
+ auto yield = y.get_yield_context();
+ auto ex = yield.get_executor();
+ auto reader = IndexHeadReader{*dpp, ex, index_pool, buffers};
+
+ rgwrados::shard_io::async_reads(reader, oids, max_aio, yield[ec]);
+ } else {
+ // run a strand on the system executor and block on a condition variable
+ auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+ auto reader = IndexHeadReader{*dpp, ex, index_pool, buffers};
- map<int, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
- for(; iter != list_results.end(); ++iter) {
- headers->push_back(std::move(iter->second.dir.header));
+ maybe_warn_about_blocking(dpp);
+ rgwrados::shard_io::async_reads(reader, oids, max_aio,
+ ceph::async::use_blocked[ec]);
+ }
+ if (ec) {
+ return ceph::from_error_code(ec);
+ }
+
+ try {
+ std::transform(buffers.begin(), buffers.end(),
+ std::back_inserter(*headers),
+ [] (const auto& kv) {
+ rgw_bucket_dir_header header;
+ auto p = kv.second.cbegin();
+ decode(header, p);
+ return header;
+ });
+ } catch (const ceph::buffer::error&) {
+ return -EIO;
}
return 0;
}
+// init_index() is all-or-nothing: if we fail to initialize all shards, we
+// undo the creation of the ones that succeeded. RevertibleWriter provides
+// these semantics.
+struct IndexInitWriter : rgwrados::shard_io::RadosRevertibleWriter {
+ bool judge_support_logrecord;
+
+ IndexInitWriter(const DoutPrefixProvider& dpp,
+ boost::asio::any_io_executor ex,
+ librados::IoCtx& ioctx,
+ bool judge_support_logrecord)
+ : RadosRevertibleWriter(dpp, std::move(ex), ioctx),
+ judge_support_logrecord(judge_support_logrecord)
+ {}
+ void prepare_write(int shard, librados::ObjectWriteOperation& op) override {
+ // don't overwrite. fail with EEXIST if a shard already exists
+ op.create(true);
+ if (judge_support_logrecord) {
+ // fail with EOPNOTSUPP if the osd doesn't support the reshard log
+ cls_rgw_bucket_init_index2(op);
+ } else {
+ cls_rgw_bucket_init_index(op);
+ }
+ }
+ void prepare_revert(int shard, librados::ObjectWriteOperation& op) override {
+ // on failure, remove any of the shards we successfully created
+ op.remove();
+ }
+ Result on_complete(int, boost::system::error_code ec) override {
+ // ignore EEXIST
+ if (ec && ec != boost::system::errc::file_exists) {
+ return Result::Error;
+ } else {
+ return Result::Success;
+ }
+ }
+ void add_prefix(std::ostream& out) const override {
+ out << "init index shards: ";
+ }
+};
+
int RGWSI_BucketIndex_RADOS::init_index(const DoutPrefixProvider *dpp,
optional_yield y,
const RGWBucketInfo& bucket_info,
map<int, string> bucket_objs;
get_bucket_index_objects(dir_oid, idx_layout.layout.normal.num_shards, idx_layout.gen, &bucket_objs);
- maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
- if (judge_support_logrecord) {
- return CLSRGWIssueBucketIndexInit2(index_pool,
- bucket_objs,
- cct->_conf->rgw_bucket_index_max_aio)();
+ const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+ boost::system::error_code ec;
+ if (y) {
+ // run on the coroutine's executor and suspend until completion
+ auto yield = y.get_yield_context();
+ auto ex = yield.get_executor();
+ auto writer = IndexInitWriter{*dpp, ex, index_pool, judge_support_logrecord};
+
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
} else {
- return CLSRGWIssueBucketIndexInit(index_pool,
- bucket_objs,
- cct->_conf->rgw_bucket_index_max_aio)();
+ // run a strand on the system executor and block on a condition variable
+ auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+ auto writer = IndexInitWriter{*dpp, ex, index_pool, judge_support_logrecord};
+
+ maybe_warn_about_blocking(dpp);
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+ ceph::async::use_blocked[ec]);
}
+ return ceph::from_error_code(ec);
}
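+// removes each bucket index shard object, ignoring ENOENT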
+struct IndexCleanWriter : rgwrados::shard_io::RadosWriter {
+ IndexCleanWriter(const DoutPrefixProvider& dpp,
+ boost::asio::any_io_executor ex,
+ librados::IoCtx& ioctx)
+ : RadosWriter(dpp, std::move(ex), ioctx)
+ {}
+ void prepare_write(int shard, librados::ObjectWriteOperation& op) override {
+ op.remove();
+ }
+ Result on_complete(int, boost::system::error_code ec) override {
+ // ignore ENOENT
+ if (ec && ec != boost::system::errc::no_such_file_or_directory) {
+ return Result::Error;
+ } else {
+ return Result::Success;
+ }
+ }
+ void add_prefix(std::ostream& out) const override {
+ out << "clean index shards: ";
+ }
+};
+
int RGWSI_BucketIndex_RADOS::clean_index(const DoutPrefixProvider *dpp,
optional_yield y,
const RGWBucketInfo& bucket_info,
get_bucket_index_objects(dir_oid, idx_layout.layout.normal.num_shards,
idx_layout.gen, &bucket_objs);
- maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
- return CLSRGWIssueBucketIndexClean(index_pool,
- bucket_objs,
- cct->_conf->rgw_bucket_index_max_aio)();
+ const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+ boost::system::error_code ec;
+ if (y) {
+ // run on the coroutine's executor and suspend until completion
+ auto yield = y.get_yield_context();
+ auto ex = yield.get_executor();
+ auto writer = IndexCleanWriter{*dpp, ex, index_pool};
+
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+ } else {
+ // run a strand on the system executor and block on a condition variable
+ auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+ auto writer = IndexCleanWriter{*dpp, ex, index_pool};
+
+ maybe_warn_about_blocking(dpp);
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+ ceph::async::use_blocked[ec]);
+ }
+ return ceph::from_error_code(ec);
}
int RGWSI_BucketIndex_RADOS::read_stats(const DoutPrefixProvider *dpp,
return 0;
}
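+// reads the encoded resharding status from each index shard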
+struct ReshardStatusReader : rgwrados::shard_io::RadosReader {
+ std::map<int, bufferlist>& buffers;
+
+ ReshardStatusReader(const DoutPrefixProvider& dpp,
+ boost::asio::any_io_executor ex,
+ librados::IoCtx& ioctx,
+ std::map<int, bufferlist>& buffers)
+ : RadosReader(dpp, std::move(ex), ioctx), buffers(buffers)
+ {}
+ void prepare_read(int shard, librados::ObjectReadOperation& op) override {
+ auto& bl = buffers[shard];
+ cls_rgw_get_bucket_resharding(op, bl);
+ }
+ Result on_complete(int, boost::system::error_code ec) override {
+ // ignore ENOENT
+ if (ec && ec != boost::system::errc::no_such_file_or_directory) {
+ return Result::Error;
+ } else {
+ return Result::Success;
+ }
+ }
+ void add_prefix(std::ostream& out) const override {
+ out << "get resharding status: ";
+ }
+};
+
int RGWSI_BucketIndex_RADOS::get_reshard_status(const DoutPrefixProvider *dpp,
optional_yield y,
const RGWBucketInfo& bucket_info,
return r;
}
- for (auto i : bucket_objs) {
- cls_rgw_bucket_instance_entry entry;
+ std::map<int, bufferlist> buffers;
+ const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+ boost::system::error_code ec;
+ if (y) {
+ // run on the coroutine's executor and suspend until completion
+ auto yield = y.get_yield_context();
+ auto ex = yield.get_executor();
+ auto reader = ReshardStatusReader{*dpp, ex, index_pool, buffers};
- int ret = cls_rgw_get_bucket_resharding(index_pool, i.second, &entry);
- if (ret < 0 && ret != -ENOENT) {
- ldpp_dout(dpp, -1) << "ERROR: " << __func__ << ": cls_rgw_get_bucket_resharding() returned ret=" << ret << dendl;
- return ret;
- }
+ rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio, yield[ec]);
+ } else {
+ // run a strand on the system executor and block on a condition variable
+ auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+ auto reader = ReshardStatusReader{*dpp, ex, index_pool, buffers};
+
+ maybe_warn_about_blocking(dpp);
+ rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio,
+ ceph::async::use_blocked[ec]);
+ }
+ if (ec) {
+ return ceph::from_error_code(ec);
+ }
- status->push_back(entry);
+ try {
+ std::transform(buffers.begin(), buffers.end(),
+ std::back_inserter(*status),
+ [] (const auto& kv) {
+ cls_rgw_bucket_instance_entry entry;
+ cls_rgw_get_bucket_resharding_decode(kv.second, entry);
+ return entry;
+ });
+ } catch (const ceph::buffer::error&) {
+ return -EIO;
}
return 0;
}
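+// writes the given resharding status to each index shard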
+struct ReshardStatusWriter : rgwrados::shard_io::RadosWriter {
+ cls_rgw_reshard_status status;
+ ReshardStatusWriter(const DoutPrefixProvider& dpp,
+ boost::asio::any_io_executor ex,
+ librados::IoCtx& ioctx,
+ cls_rgw_reshard_status status)
+ : RadosWriter(dpp, std::move(ex), ioctx), status(status)
+ {}
+ void prepare_write(int, librados::ObjectWriteOperation& op) override {
+ cls_rgw_set_bucket_resharding(op, status);
+ }
+ void add_prefix(std::ostream& out) const override {
+ out << "set resharding status: ";
+ }
+};
+
int RGWSI_BucketIndex_RADOS::set_reshard_status(const DoutPrefixProvider *dpp,
optional_yield y,
const RGWBucketInfo& bucket_info,
cls_rgw_reshard_status status)
{
- const auto entry = cls_rgw_bucket_instance_entry{.reshard_status = status};
-
librados::IoCtx index_pool;
map<int, string> bucket_objs;
return r;
}
- maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
- r = CLSRGWIssueSetBucketResharding(index_pool, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)();
- if (r < 0) {
- ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
- ": unable to issue set bucket resharding, r=" << r << " (" <<
- cpp_strerror(-r) << ")" << dendl;
+ const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+ boost::system::error_code ec;
+ if (y) {
+ // run on the coroutine's executor and suspend until completion
+ auto yield = y.get_yield_context();
+ auto ex = yield.get_executor();
+ auto writer = ReshardStatusWriter{*dpp, ex, index_pool, status};
+
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+ } else {
+ // run a strand on the system executor and block on a condition variable
+ auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+ auto writer = ReshardStatusWriter{*dpp, ex, index_pool, status};
+
+ maybe_warn_about_blocking(dpp);
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+ ceph::async::use_blocked[ec]);
}
- return r;
+ return ceph::from_error_code(ec);
}
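+// trims reshard log entries from each index shard, retrying until ENODATA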
+struct ReshardTrimWriter : rgwrados::shard_io::RadosWriter {
+ using RadosWriter::RadosWriter;
+ void prepare_write(int, librados::ObjectWriteOperation& op) override {
+ cls_rgw_bucket_reshard_log_trim(op);
+ }
+ Result on_complete(int, boost::system::error_code ec) override {
+ // keep trimming until ENODATA (no_message_available)
+ if (!ec) {
+ return Result::Retry;
+ } else if (ec == boost::system::errc::no_message_available) {
+ return Result::Success;
+ } else {
+ return Result::Error;
+ }
+ }
+ void add_prefix(std::ostream& out) const override {
+ out << "trim reshard logs: ";
+ }
+};
+
int RGWSI_BucketIndex_RADOS::trim_reshard_log(const DoutPrefixProvider* dpp,
optional_yield y,
const RGWBucketInfo& bucket_info)
if (r < 0) {
return r;
}
- return CLSRGWIssueReshardLogTrim(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+
+ const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+ boost::system::error_code ec;
+ if (y) {
+ // run on the coroutine's executor and suspend until completion
+ auto yield = y.get_yield_context();
+ auto ex = yield.get_executor();
+ auto writer = ReshardTrimWriter{*dpp, ex, index_pool};
+
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+ } else {
+ // run a strand on the system executor and block on a condition variable
+ auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+ auto writer = ReshardTrimWriter{*dpp, ex, index_pool};
+
+ maybe_warn_about_blocking(dpp);
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+ ceph::async::use_blocked[ec]);
+ }
+ return ceph::from_error_code(ec);
+}
+
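+// applies the given tag timeout to each index shard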
+struct TagTimeoutWriter : rgwrados::shard_io::RadosWriter {
+ uint64_t timeout;
+ TagTimeoutWriter(const DoutPrefixProvider& dpp,
+ boost::asio::any_io_executor ex,
+ librados::IoCtx& ioctx,
+ uint64_t timeout)
+ : RadosWriter(dpp, std::move(ex), ioctx), timeout(timeout)
+ {}
+ void prepare_write(int, librados::ObjectWriteOperation& op) override {
+ cls_rgw_bucket_set_tag_timeout(op, timeout);
+ }
+ void add_prefix(std::ostream& out) const override {
+ out << "set tag timeouts: ";
+ }
+};
+
+int RGWSI_BucketIndex_RADOS::set_tag_timeout(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const RGWBucketInfo& bucket_info,
+ uint64_t timeout)
+{
+ librados::IoCtx index_pool;
+ map<int, string> bucket_objs;
+
+ int r = open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
+ if (r < 0) {
+ return r;
+ }
+
+ const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+ boost::system::error_code ec;
+ if (y) {
+ // run on the coroutine's executor and suspend until completion
+ auto yield = y.get_yield_context();
+ auto ex = yield.get_executor();
+ auto writer = TagTimeoutWriter{*dpp, ex, index_pool, timeout};
+
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+ } else {
+ // run a strand on the system executor and block on a condition variable
+ auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+ auto writer = TagTimeoutWriter{*dpp, ex, index_pool, timeout};
+
+ maybe_warn_about_blocking(dpp);
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+ ceph::async::use_blocked[ec]);
+ }
+ return ceph::from_error_code(ec);
+}
+
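+// issues a check_index read on each shard and collects the raw replies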
+struct CheckReader : rgwrados::shard_io::RadosReader {
+ std::map<int, bufferlist>& buffers;
+
+ CheckReader(const DoutPrefixProvider& dpp,
+ boost::asio::any_io_executor ex,
+ librados::IoCtx& ioctx,
+ std::map<int, bufferlist>& buffers)
+ : RadosReader(dpp, std::move(ex), ioctx), buffers(buffers)
+ {}
+ void prepare_read(int shard, librados::ObjectReadOperation& op) override {
+ auto& bl = buffers[shard];
+ cls_rgw_bucket_check_index(op, bl);
+ }
+ void add_prefix(std::ostream& out) const override {
+ out << "check index shards: ";
+ }
+};
+
+int RGWSI_BucketIndex_RADOS::check_index(const DoutPrefixProvider *dpp, optional_yield y,
+ const RGWBucketInfo& bucket_info,
+ std::map<int, bufferlist>& buffers)
+{
+ librados::IoCtx index_pool;
+ std::map<int, std::string> bucket_objs;
+
+ int r = open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
+ if (r < 0) {
+ return r;
+ }
+
+ const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+ boost::system::error_code ec;
+ if (y) {
+ // run on the coroutine's executor and suspend until completion
+ auto yield = y.get_yield_context();
+ auto ex = yield.get_executor();
+ auto reader = CheckReader{*dpp, ex, index_pool, buffers};
+
+ rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio, yield[ec]);
+ } else {
+ // run a strand on the system executor and block on a condition variable
+ auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+ auto reader = CheckReader{*dpp, ex, index_pool, buffers};
+
+ maybe_warn_about_blocking(dpp);
+ rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio,
+ ceph::async::use_blocked[ec]);
+ }
+ return ceph::from_error_code(ec);
+}
+
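+// issues a rebuild_index write on each shard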
+struct RebuildWriter : rgwrados::shard_io::RadosWriter {
+ using RadosWriter::RadosWriter;
+ void prepare_write(int, librados::ObjectWriteOperation& op) override {
+ cls_rgw_bucket_rebuild_index(op);
+ }
+ void add_prefix(std::ostream& out) const override {
+ out << "rebuild index shards: ";
+ }
+};
+
+int RGWSI_BucketIndex_RADOS::rebuild_index(const DoutPrefixProvider *dpp,
+ optional_yield y,
+ const RGWBucketInfo& bucket_info)
+{
+ librados::IoCtx index_pool;
+ map<int, string> bucket_objs;
+
+ int r = open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
+ if (r < 0) {
+ return r;
+ }
+
+ const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+ boost::system::error_code ec;
+ if (y) {
+ // run on the coroutine's executor and suspend until completion
+ auto yield = y.get_yield_context();
+ auto ex = yield.get_executor();
+ auto writer = RebuildWriter{*dpp, ex, index_pool};
+
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+ } else {
+ // run a strand on the system executor and block on a condition variable
+ auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+ auto writer = RebuildWriter{*dpp, ex, index_pool};
+
+ maybe_warn_about_blocking(dpp);
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+ ceph::async::use_blocked[ec]);
+ }
+ return ceph::from_error_code(ec);
+}
+
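+// lists entries from each index shard, advancing that shard's marker on retry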
+struct ListReader : rgwrados::shard_io::RadosReader {
+ const cls_rgw_obj_key& start_obj;
+ const std::string& prefix;
+ const std::string& delimiter;
+ uint32_t num_entries;
+ bool list_versions;
+ std::map<int, rgw_cls_list_ret>& results;
+
+ ListReader(const DoutPrefixProvider& dpp,
+ boost::asio::any_io_executor ex,
+ librados::IoCtx& ioctx,
+ const cls_rgw_obj_key& start_obj,
+ const std::string& prefix,
+ const std::string& delimiter,
+ uint32_t num_entries, bool list_versions,
+ std::map<int, rgw_cls_list_ret>& results)
+ : RadosReader(dpp, std::move(ex), ioctx),
+ start_obj(start_obj), prefix(prefix), delimiter(delimiter),
+ num_entries(num_entries), list_versions(list_versions),
+ results(results)
+ {}
+ void prepare_read(int shard, librados::ObjectReadOperation& op) override {
+ // set the marker depending on whether we've already queried this shard
+ // and gotten a RGWBIAdvanceAndRetryError (defined constant) return value;
+ // if we have, use the marker from that reply to advance the listing,
+ // otherwise use the marker passed in by the caller
+ auto& result = results[shard];
+ const cls_rgw_obj_key& marker =
+ result.marker.empty() ? start_obj : result.marker;
+ cls_rgw_bucket_list_op(op, marker, prefix, delimiter,
+ num_entries, list_versions, &result);
+ }
+ Result on_complete(int, boost::system::error_code ec) override {
+ if (ec.value() == -RGWBIAdvanceAndRetryError) {
+ return Result::Retry;
+ } else if (ec) {
+ return Result::Error;
+ } else {
+ return Result::Success;
+ }
+ }
+ void add_prefix(std::ostream& out) const override {
+ out << "sharded list objects: ";
+ }
+};
+
+int RGWSI_BucketIndex_RADOS::list_objects(const DoutPrefixProvider* dpp, optional_yield y,
+ librados::IoCtx& index_pool,
+ const std::map<int, string>& bucket_objs,
+ const cls_rgw_obj_key& start_obj,
+ const std::string& prefix,
+ const std::string& delimiter,
+ uint32_t num_entries, bool list_versions,
+ std::map<int, rgw_cls_list_ret>& results)
+{
+ const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+ boost::system::error_code ec;
+ if (y) {
+ // run on the coroutine's executor and suspend until completion
+ auto yield = y.get_yield_context();
+ auto ex = yield.get_executor();
+ auto reader = ListReader{*dpp, ex, index_pool, start_obj, prefix, delimiter,
+ num_entries, list_versions, results};
+
+ rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio, yield[ec]);
+ } else {
+ // run a strand on the system executor and block on a condition variable
+ auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+ auto reader = ListReader{*dpp, ex, index_pool, start_obj, prefix, delimiter,
+ num_entries, list_versions, results};
+
+ maybe_warn_about_blocking(dpp);
+ rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio,
+ ceph::async::use_blocked[ec]);
+ }
+ return ceph::from_error_code(ec);
}
int RGWSI_BucketIndex_RADOS::handle_overwrite(const DoutPrefixProvider *dpp,
#include "svc_tier_rados.h"
struct rgw_bucket_dir_header;
+struct rgw_cls_list_ret;
class RGWSI_BILog_RADOS;
int trim_reshard_log(const DoutPrefixProvider* dpp, optional_yield,
const RGWBucketInfo& bucket_info);
+ int set_tag_timeout(const DoutPrefixProvider *dpp, optional_yield y,
+ const RGWBucketInfo& bucket_info, uint64_t timeout);
+
+ int check_index(const DoutPrefixProvider *dpp, optional_yield y,
+ const RGWBucketInfo& bucket_info,
+ std::map<int, bufferlist>& buffers);
+
+ int rebuild_index(const DoutPrefixProvider *dpp, optional_yield y,
+ const RGWBucketInfo& bucket_info);
+
+ /// Read the requested number of entries from each index shard object.
+ int list_objects(const DoutPrefixProvider* dpp, optional_yield y,
+ librados::IoCtx& index_pool,
+ const std::map<int, std::string>& bucket_objs,
+ const cls_rgw_obj_key& start_obj,
+ const std::string& prefix,
+ const std::string& delimiter,
+ uint32_t num_entries, bool list_versions,
+ std::map<int, rgw_cls_list_ret>& results);
+
int handle_overwrite(const DoutPrefixProvider *dpp, const RGWBucketInfo& info,
const RGWBucketInfo& orig_info,
optional_yield y) override;
#include "svc_bi_rados.h"
#include "rgw_asio_thread.h"
+#include "driver/rados/shard_io.h"
#include "cls/rgw/cls_rgw_client.h"
+#include "common/async/blocked_completion.h"
#define dout_subsys ceph_subsys_rgw
using namespace std;
+using rgwrados::shard_io::Result;
RGWSI_BILog_RADOS::RGWSI_BILog_RADOS(CephContext *cct) : RGWServiceInstance(cct)
{
svc.bi = bi_rados_svc;
}
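+// trims bilog entries between the per-shard start and end markers, retrying until ENODATA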
+struct TrimWriter : rgwrados::shard_io::RadosWriter {
+ const BucketIndexShardsManager& start;
+ const BucketIndexShardsManager& end;
+
+ TrimWriter(const DoutPrefixProvider& dpp,
+ boost::asio::any_io_executor ex,
+ librados::IoCtx& ioctx,
+ const BucketIndexShardsManager& start,
+ const BucketIndexShardsManager& end)
+ : RadosWriter(dpp, std::move(ex), ioctx), start(start), end(end)
+ {}
+ void prepare_write(int shard, librados::ObjectWriteOperation& op) override {
+ cls_rgw_bilog_trim(op, start.get(shard, ""), end.get(shard, ""));
+ }
+ Result on_complete(int, boost::system::error_code ec) override {
+ // continue trimming until -ENODATA or other error
+ if (ec == boost::system::errc::no_message_available) {
+ return Result::Success;
+ } else if (ec) {
+ return Result::Error;
+ } else {
+ return Result::Retry;
+ }
+ }
+ void add_prefix(std::ostream& out) const override {
+ out << "trim bilog shards: ";
+ }
+};
+
int RGWSI_BILog_RADOS::log_trim(const DoutPrefixProvider *dpp, optional_yield y,
const RGWBucketInfo& bucket_info,
const rgw::bucket_log_layout_generation& log_layout,
return r;
}
- maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
- return CLSRGWIssueBILogTrim(index_pool, start_marker_mgr, end_marker_mgr, bucket_objs,
- cct->_conf->rgw_bucket_index_max_aio)();
+ const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+ boost::system::error_code ec;
+ if (y) {
+ // run on the coroutine's executor and suspend until completion
+ auto yield = y.get_yield_context();
+ auto ex = yield.get_executor();
+ auto writer = TrimWriter{*dpp, ex, index_pool, start_marker_mgr, end_marker_mgr};
+
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+ } else {
+ // run a strand on the system executor and block on a condition variable
+ auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+ auto writer = TrimWriter{*dpp, ex, index_pool, start_marker_mgr, end_marker_mgr};
+
+ maybe_warn_about_blocking(dpp);
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+ ceph::async::use_blocked[ec]);
+ }
+ return ceph::from_error_code(ec);
}
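+// restarts bilog recording on each index shard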
+struct StartWriter : rgwrados::shard_io::RadosWriter {
+ using RadosWriter::RadosWriter;
+ void prepare_write(int, librados::ObjectWriteOperation& op) override {
+ cls_rgw_bilog_start(op);
+ }
+ void add_prefix(std::ostream& out) const override {
+ out << "restart bilog shards: ";
+ }
+};
+
int RGWSI_BILog_RADOS::log_start(const DoutPrefixProvider *dpp, optional_yield y,
const RGWBucketInfo& bucket_info,
const rgw::bucket_log_layout_generation& log_layout,
if (r < 0)
return r;
- maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
- return CLSRGWIssueResyncBucketBILog(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+ const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+ boost::system::error_code ec;
+ if (y) {
+ // run on the coroutine's executor and suspend until completion
+ auto yield = y.get_yield_context();
+ auto ex = yield.get_executor();
+ auto writer = StartWriter{*dpp, ex, index_pool};
+
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+ } else {
+ // run a strand on the system executor and block on a condition variable
+ auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+ auto writer = StartWriter{*dpp, ex, index_pool};
+
+ maybe_warn_about_blocking(dpp);
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+ ceph::async::use_blocked[ec]);
+ }
+ return ceph::from_error_code(ec);
}
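+// stops bilog recording on each index shard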
+struct StopWriter : rgwrados::shard_io::RadosWriter {
+ using RadosWriter::RadosWriter;
+ void prepare_write(int, librados::ObjectWriteOperation& op) override {
+ cls_rgw_bilog_stop(op);
+ }
+ void add_prefix(std::ostream& out) const override {
+ out << "stop bilog shards: ";
+ }
+};
+
int RGWSI_BILog_RADOS::log_stop(const DoutPrefixProvider *dpp, optional_yield y,
const RGWBucketInfo& bucket_info,
const rgw::bucket_log_layout_generation& log_layout,
if (r < 0)
return r;
- maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
- return CLSRGWIssueBucketBILogStop(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+ const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio;
+ boost::system::error_code ec;
+ if (y) {
+ // run on the coroutine's executor and suspend until completion
+ auto yield = y.get_yield_context();
+ auto ex = yield.get_executor();
+ auto writer = StopWriter{*dpp, ex, index_pool};
+
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]);
+ } else {
+ // run a strand on the system executor and block on a condition variable
+ auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+ auto writer = StopWriter{*dpp, ex, index_pool};
+
+ maybe_warn_about_blocking(dpp);
+ rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio,
+ ceph::async::use_blocked[ec]);
+ }
+ return ceph::from_error_code(ec);
}
static void build_bucket_index_marker(const string& shard_id_str,
}
}
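+// lists bilog entries from each index shard starting at its per-shard marker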
+struct LogReader : rgwrados::shard_io::RadosReader {
+ const BucketIndexShardsManager& start;
+ uint32_t max;
+ std::map<int, cls_rgw_bi_log_list_ret>& logs;
+
+ LogReader(const DoutPrefixProvider& dpp, boost::asio::any_io_executor ex,
+ librados::IoCtx& ioctx, const BucketIndexShardsManager& start,
+ uint32_t max, std::map<int, cls_rgw_bi_log_list_ret>& logs)
+ : RadosReader(dpp, std::move(ex), ioctx),
+ start(start), max(max), logs(logs)
+ {}
+ void prepare_read(int shard, librados::ObjectReadOperation& op) override {
+ auto& result = logs[shard];
+ cls_rgw_bilog_list(op, start.get(shard, ""), max, &result, nullptr);
+ }
+ void add_prefix(std::ostream& out) const override {
+ out << "list bilog shards: ";
+ }
+};
+
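+// open the index and list bilog entries from every shard of the given log layout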
+static int bilog_list(const DoutPrefixProvider* dpp, optional_yield y,
+ RGWSI_BucketIndex_RADOS* svc_bi,
+ const RGWBucketInfo& bucket_info,
+ const rgw::bucket_log_layout_generation& log_layout,
+ const BucketIndexShardsManager& start,
+ int shard_id, uint32_t max,
+ std::map<int, cls_rgw_bi_log_list_ret>& logs)
+{
+ librados::IoCtx index_pool;
+ map<int, string> oids;
+ const auto& current_index = rgw::log_to_index_layout(log_layout);
+ int r = svc_bi->open_bucket_index(dpp, bucket_info, shard_id, current_index, &index_pool, &oids, nullptr);
+ if (r < 0)
+ return r;
+
+ const size_t max_aio = dpp->get_cct()->_conf->rgw_bucket_index_max_aio;
+ boost::system::error_code ec;
+ if (y) {
+ // run on the coroutine's executor and suspend until completion
+ auto yield = y.get_yield_context();
+ auto ex = yield.get_executor();
+ auto reader = LogReader{*dpp, ex, index_pool, start, max, logs};
+
+ rgwrados::shard_io::async_reads(reader, oids, max_aio, yield[ec]);
+ } else {
+ // run a strand on the system executor and block on a condition variable
+ auto ex = boost::asio::make_strand(boost::asio::system_executor{});
+ auto reader = LogReader{*dpp, ex, index_pool, start, max, logs};
+
+ maybe_warn_about_blocking(dpp);
+ rgwrados::shard_io::async_reads(reader, oids, max_aio,
+ ceph::async::use_blocked[ec]);
+ }
+ return ceph::from_error_code(ec);
+}
+
int RGWSI_BILog_RADOS::log_list(const DoutPrefixProvider *dpp, optional_yield y,
const RGWBucketInfo& bucket_info,
const rgw::bucket_log_layout_generation& log_layout,
ldpp_dout(dpp, 20) << __func__ << ": " << bucket_info.bucket << " marker " << marker << " shard_id=" << shard_id << " max " << max << dendl;
result.clear();
- librados::IoCtx index_pool;
- map<int, string> oids;
- map<int, cls_rgw_bi_log_list_ret> bi_log_lists;
- const auto& current_index = rgw::log_to_index_layout(log_layout);
- int r = svc.bi->open_bucket_index(dpp, bucket_info, shard_id, current_index, &index_pool, &oids, nullptr);
- if (r < 0)
- return r;
-
BucketIndexShardsManager marker_mgr;
- bool has_shards = (oids.size() > 1 || shard_id >= 0);
+ const bool has_shards = (shard_id >= 0);
// If there are multiple shards for the bucket index object, the marker
// should have the pattern '{shard_id_1}#{shard_marker_1},{shard_id_2}#
// {shard_marker_2}...', if there is no sharding, the bi_log_list should
// only contain one record, and the key is the bucket instance id.
- r = marker_mgr.from_string(marker, shard_id);
+ int r = marker_mgr.from_string(marker, shard_id);
if (r < 0)
return r;
- maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
- r = CLSRGWIssueBILogList(index_pool, marker_mgr, max, oids, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)();
+ std::map<int, cls_rgw_bi_log_list_ret> bi_log_lists;
+ r = bilog_list(dpp, y, svc.bi, bucket_info, log_layout, marker_mgr,
+ shard_id, max, bi_log_lists);
if (r < 0)
return r;