From ebe98934cd69a32abb3ddafc1fa94b40dfa6844c Mon Sep 17 00:00:00 2001 From: "J. Eric Ivancich" Date: Wed, 24 Apr 2024 18:05:33 -0400 Subject: [PATCH] rgw: make incomplete multipart upload part of bucket check efficient Previously the incomplete multipart portion of bucket check would list all entries in the _multipart_ namespace across all shards and then analyze them in memory before taking further action. Since all index entries for a given multipart upload are all on the same shard by design, we can work on this asynchronously shard by shard. Furthermore since all entries for a given multipart upload are sequential in the bucket index, we can use a small window to analyze each of the uploads. This should make the operation quicker and use much less memory in the worst cases. Signed-off-by: J. Eric Ivancich --- src/rgw/driver/rados/rgw_bucket.cc | 331 +++++++++++++++++++++-------- src/rgw/driver/rados/rgw_bucket.h | 9 +- 2 files changed, 252 insertions(+), 88 deletions(-) diff --git a/src/rgw/driver/rados/rgw_bucket.cc b/src/rgw/driver/rados/rgw_bucket.cc index c51e61a2755dc..7cf6fbd56f1a2 100644 --- a/src/rgw/driver/rados/rgw_bucket.cc +++ b/src/rgw/driver/rados/rgw_bucket.cc @@ -85,10 +85,10 @@ static void parse_bucket(const string& bucket, } } -static void dump_mulipart_index_results(list& objs_to_unlink, - Formatter *f) +static void dump_multipart_index_results(std::list& objs, + Formatter *f) { - for (const auto& o : objs_to_unlink) { + for (const auto& o : objs) { f->dump_string("object", o.name); } } @@ -317,100 +317,236 @@ static void dump_index_check(map existing_stats formatter->close_section(); } -int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState& op_state, - RGWFormatterFlusher& flusher, - const DoutPrefixProvider *dpp, optional_yield y, - std::string *err_msg) + +/** + * Looks for incomplete and damaged multipart uploads on a single + * shard. While the parts are being uploaded, entries are kept in the + * bucket index to track the components of the upload. There is one + * ".meta" entry and the part entries, all with the same prefix in + * their keys. If we find the part entries but not their corresponding + * shared ".meta" entry we can safely remove the part entries from the + * index shard. + * + * We take advantage of the fact that since all entries for the same + * upload have the same prefix, they're sequential in the index, with + * the ".meta" coming last in the sequence. So we only need a small + * window to track entries until either their ".meta" does or does not + * come up. + */ +static int check_bad_index_multipart(rgw::sal::RadosStore* const rados_store, + rgw::sal::Bucket* const bucket, + const DoutPrefixProvider *dpp, + RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher, + const int shard, + optional_yield y) { + RGWRados* store = rados_store->getRados(); + RGWRados::BucketShard bs(store); + const bool fix_index = op_state.will_fix_index(); - bucket = op_state.get_bucket()->clone(); + int ret = bs.init(dpp, + bucket->get_info(), + bucket->get_info().layout.current_index, shard, y); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR bs.init(bucket=" << bucket << "): " << + cpp_strerror(-ret) << dendl; + return ret; + } - rgw::sal::Bucket::ListParams params; - params.list_versions = true; - params.ns = RGW_OBJ_NS_MULTIPART; + std::string marker = rgw_obj_key(std::string(), + std::string(), + RGW_OBJ_NS_MULTIPART).get_index_key_name(); + bool is_truncated = true; + std::list entries_read; - std::map meta_objs; - std::map all_objs; - bool is_truncated; - do { - rgw::sal::Bucket::ListResults results; - int r = bucket->list(dpp, params, listing_max_entries, results, y); - if (r < 0) { - set_err_msg(err_msg, "failed to list objects in bucket=" + bucket->get_name() + - " err=" + cpp_strerror(-r)); + // holds part entries w/o ".meta" + std::list entries_to_unlink; - return r; - } - is_truncated = results.is_truncated; + // holds entries pending finding of ".meta" + std::list entries_window; - for (const auto& o : results.objs) { - rgw_obj_index_key key = o.key; - rgw_obj obj(bucket->get_key(), key); - std::string oid = obj.get_oid(); + // tracks whether on same multipart upload or not + std::string prev_entry_prefix; + do { + entries_read.clear(); + ret = store->bi_list(bs, "", marker, -1, + &entries_read, &is_truncated, y); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << + dendl; + break; + } - int pos = oid.find_last_of('.'); - if (pos < 0) { - /* obj has no suffix */ - all_objs[key] = oid; - } else { - /* obj has suffix */ - std::string name = oid.substr(0, pos); - std::string suffix = oid.substr(pos + 1); + for (const auto& entry : entries_read) { + marker = entry.idx; - if (suffix.compare("meta") == 0) { - meta_objs[name] = true; - } else { - all_objs[key] = name; - } + rgw_obj_key obj_key; + bool parsed = rgw_obj_key::parse_raw_oid(entry.idx, &obj_key); + if (!parsed) { + ldpp_dout(dpp, 0) << + "WARNING: could not parse index entry; ignoring; key=" << + entry.idx << dendl; + continue; } - } - } while (is_truncated); + const std::string& name = obj_key.name; + const std::string& ns = obj_key.ns; - std::list objs_to_unlink; - Formatter *f = flusher.get_formatter(); + // when we're out of the multipart namespace, we're done + if (entry.type != BIIndexType::Plain || ns != RGW_OBJ_NS_MULTIPART) { + is_truncated = false; + break; + } - f->open_array_section("invalid_multipart_entries"); + auto period = name.rfind("."); + if (period == std::string::npos) { + ldpp_dout(dpp, 0) << + "WARNING: index entry in multipart namespace does not contain" + " suffix indicator ('.'); ignoring; key=" << + entry.idx << dendl; + continue; + } - for (const auto& o : all_objs) { - const std::string& name = o.second; - if (meta_objs.find(name) == meta_objs.end()) { - objs_to_unlink.push_back(o.first); - } + const std::string entry_prefix = name.substr(0, period); + const std::string entry_suffix = name.substr(1 + period); + + // the entries for a given multipart upload will appear + // sequentially with the ".meta" will being the last. So we'll + // cache the entries until we either find the "meta" entry or + // switch to a different upload + if (entry_suffix == "meta") { + if (entry_prefix != prev_entry_prefix) { + entries_to_unlink.insert(entries_to_unlink.end(), + entries_window.cbegin(), + entries_window.cend()); + } - if (objs_to_unlink.size() > listing_max_entries) { - if (fix_index) { - // note: under rados this removes directly from rados index objects - int r = bucket->remove_objs_from_index(dpp, objs_to_unlink); - if (r < 0) { - set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " + - cpp_strerror(-r)); - return r; + // either way start over + entries_window.clear(); + prev_entry_prefix.clear(); + } else { + if (entry_prefix != prev_entry_prefix) { + entries_to_unlink.insert(entries_to_unlink.end(), + entries_window.cbegin(), + entries_window.cend()); + entries_window.clear(); + prev_entry_prefix = entry_prefix; } + + // create an rgw_obj_index_key to store in window + rgw_obj_index_key obj_index_key; + obj_key.get_index_key(&obj_index_key); + entries_window.push_back(obj_index_key); } - dump_mulipart_index_results(objs_to_unlink, f); - flusher.flush(); - objs_to_unlink.clear(); - } - } + // check if this is a good point for intermediate index clean-up + if (entries_to_unlink.size() >= listing_max_entries) { + dump_multipart_index_results(entries_to_unlink, + flusher.get_formatter()); + if (fix_index) { + store->remove_objs_from_index(dpp, bucket->get_info(), + entries_to_unlink); + } + entries_to_unlink.clear(); + } + } // for + } while (is_truncated); - if (fix_index) { - // note: under rados this removes directly from rados index objects - int r = bucket->remove_objs_from_index(dpp, objs_to_unlink); - if (r < 0) { - set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " + - cpp_strerror(-r)); + // any entries left over at the end can be unlinked + entries_to_unlink.insert(entries_to_unlink.end(), + entries_window.cbegin(), + entries_window.cend()); + entries_window.clear(); - return r; + if (! entries_to_unlink.empty()) { + dump_multipart_index_results(entries_to_unlink, + flusher.get_formatter()); + if (fix_index) { + store->remove_objs_from_index(dpp, bucket->get_info(), + entries_to_unlink); } + entries_to_unlink.clear(); } - dump_mulipart_index_results(objs_to_unlink, f); - f->close_section(); flusher.flush(); return 0; +} // static ::check_bad_index_multipart + + +/** + * Checks for damaged incomplete multipart uploads in a bucket + * index. Since all entries for a given multipart upload end up on the + * same shard (by design), we spawn a set of co-routines, each one + * working shard by shard until all work is complete. + * + * TODO: This function takes optional_yield so there's an expectation + * that it can run asynchronously, but io_context::run() is + * synchronous. This is fine for radosgw-admin, but we also serve this + * 'bucket check' operation over the /admin/bucket api, so we'll want + * to address this in the future. + */ +int RGWBucket::check_bad_index_multipart(rgw::sal::RadosStore* const rados_store, + RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher, + const DoutPrefixProvider *dpp, + optional_yield y, + std::string* err_msg) +{ + const RGWBucketInfo& bucket_info = get_bucket_info(); + + Formatter* formatter = flusher.get_formatter(); + formatter->open_array_section("invalid_multipart_entries"); + + const auto& index_layout = bucket_info.layout.current_index.layout; + if (index_layout.type != rgw::BucketIndexType::Normal) { + ldpp_dout(dpp, 0) << "ERROR: cannot check bucket indices with layouts of type " << + current_layout_desc(bucket_info.layout) << + " for bad multipart entries" << dendl; + return -EINVAL; + } + const int num_shards = rgw::num_shards(index_layout.normal); + int next_shard = 0; + + boost::asio::io_context context; + const int max_aio = std::max(1, op_state.get_max_aio()); + int any_error = 0; // first error encountered if any + for (int i = 0; i < max_aio; i++) { + spawn::spawn(context, [&](spawn::yield_context yield) { + while (true) { + const int shard = next_shard++; + if (shard >= num_shards) { + return; + } + + optional_yield y(context, yield); + + int r = ::check_bad_index_multipart(rados_store, &*bucket, dpp, + op_state, flusher, shard, y); + if (r < 0) { + ldpp_dout(dpp, -1) << "WARNING: error processing shard " << shard << + " check_bad_index_multipart(): " << r << "; skipping" << dendl; + if (!any_error) { + // record first error encountered, but continue + any_error = r; + } + } + } // while + }); + } // for + + try { + context.run(); + } catch (const std::system_error& e) { + formatter->close_section(); + *err_msg = e.what(); + return -e.code().value(); + } + + formatter->close_section(); + + return any_error; } int RGWBucket::check_object_index(const DoutPrefixProvider *dpp, @@ -561,6 +697,13 @@ static int check_index_olh(rgw::sal::RadosStore* const rados_store, /** * Spawns separate coroutines to check each bucket shard for leftover * olh entries (and remove them if op_state.fix_index is true). + * + * TODO: Currently this is synchronous as it uses + * io_context::run(). Allow this to run asynchronously by receiving an + * optional_yield parameter and making other adjustments. Synchronous + * is fine for radosgw-admin, but we also serve this 'bucket check' + * operation over the /admin/bucket api, so we'll want to address + * this. */ int RGWBucket::check_index_olh(rgw::sal::RadosStore* const rados_store, const DoutPrefixProvider *dpp, @@ -578,7 +721,14 @@ int RGWBucket::check_index_olh(rgw::sal::RadosStore* const rados_store, formatter->open_array_section(""); } - const int max_shards = rgw::num_shards(bucket_info.layout.current_index); + const auto& index_layout = bucket_info.layout.current_index.layout; + if (index_layout.type != rgw::BucketIndexType::Normal) { + ldpp_dout(dpp, 0) << "ERROR: cannot check bucket indices with layouts of type " << + current_layout_desc(bucket_info.layout) << + " for bad OLH entries" << dendl; + return -EINVAL; + } + const int max_shards = rgw::num_shards(index_layout.normal); std::string verb = op_state.will_fix_index() ? "removed" : "found"; uint64_t count_out = 0; @@ -1196,38 +1346,49 @@ int RGWBucketAdminOp::check_index_unlinked(rgw::sal::RadosStore* store, return 0; } -int RGWBucketAdminOp::check_index(rgw::sal::Driver* driver, RGWBucketAdminOpState& op_state, - RGWFormatterFlusher& flusher, optional_yield y, const DoutPrefixProvider *dpp) +int RGWBucketAdminOp::check_index(rgw::sal::Driver* driver, + RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher, + optional_yield y, + const DoutPrefixProvider* dpp) { int ret; - map existing_stats; - map calculated_stats; - + std::map existing_stats; + std::map calculated_stats; RGWBucket bucket; - ret = bucket.init(driver, op_state, y, dpp); - if (ret < 0) + if (ret < 0) { return ret; + } Formatter *formatter = flusher.get_formatter(); flusher.start(0); formatter->open_object_section("bucket_check"); - ret = bucket.check_bad_index_multipart(op_state, flusher, dpp, y); - if (ret < 0) - return ret; + auto rados_store = dynamic_cast(driver); + if (!rados_store) { + ldpp_dout(dpp, 0) << "WARNING: couldn't access a RadosStore, " + "so skipping bad incomplete multipart check" << dendl; + } else { + ret = bucket.check_bad_index_multipart(rados_store, op_state, flusher, dpp, y); + if (ret < 0) { + return ret; + } + } if (op_state.will_check_objects()) { ret = bucket.check_object_index(dpp, op_state, flusher, y); - if (ret < 0) + if (ret < 0) { return ret; + } } ret = bucket.check_index(dpp, op_state, existing_stats, calculated_stats); - if (ret < 0) + if (ret < 0) { return ret; + } dump_index_check(existing_stats, calculated_stats, formatter); diff --git a/src/rgw/driver/rados/rgw_bucket.h b/src/rgw/driver/rados/rgw_bucket.h index e91c0d7e13953..6bbfba93df215 100644 --- a/src/rgw/driver/rados/rgw_bucket.h +++ b/src/rgw/driver/rados/rgw_bucket.h @@ -338,9 +338,12 @@ public: int init(rgw::sal::Driver* storage, RGWBucketAdminOpState& op_state, optional_yield y, const DoutPrefixProvider *dpp, std::string *err_msg = NULL); - int check_bad_index_multipart(RGWBucketAdminOpState& op_state, - RGWFormatterFlusher& flusher, - const DoutPrefixProvider *dpp, optional_yield y, std::string *err_msg = NULL); + int check_bad_index_multipart(rgw::sal::RadosStore* const rados_store, + RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher, + const DoutPrefixProvider *dpp, + optional_yield y, + std::string *err_msg = nullptr); int check_object_index(const DoutPrefixProvider *dpp, RGWBucketAdminOpState& op_state, -- 2.39.5