}
}
-static void dump_mulipart_index_results(list<rgw_obj_index_key>& objs_to_unlink,
- Formatter *f)
+/**
+ * Dumps the name of every index key in objs to the formatter f, one
+ * "object" string per key. The list is only read here, so it is
+ * taken by const reference.
+ */
+static void dump_multipart_index_results(const std::list<rgw_obj_index_key>& objs,
+ Formatter *f)
{
- for (const auto& o : objs_to_unlink) {
+ for (const auto& o : objs) {
f->dump_string("object", o.name);
}
}
formatter->close_section();
}
-int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState& op_state,
- RGWFormatterFlusher& flusher,
- const DoutPrefixProvider *dpp, optional_yield y,
- std::string *err_msg)
+
+/**
+ * Looks for incomplete and damaged multipart uploads on a single
+ * shard. While the parts are being uploaded, entries are kept in the
+ * bucket index to track the components of the upload. There is one
+ * ".meta" entry and the part entries, all with the same prefix in
+ * their keys. If we find the part entries but not their corresponding
+ * shared ".meta" entry we can safely remove the part entries from the
+ * index shard.
+ *
+ * We take advantage of the fact that since all entries for the same
+ * upload have the same prefix, they're sequential in the index, with
+ * the ".meta" coming last in the sequence. So we only need a small
+ * window to track entries until either their ".meta" does or does not
+ * come up.
+ *
+ * Entries found to be orphaned are dumped to the flusher's formatter
+ * and, when op_state.will_fix_index() is set, removed from the index.
+ *
+ * Returns 0 on success or a negative error code if the shard cannot
+ * be initialized, listed, or cleaned up. On a listing error nothing
+ * still pending in the window is unlinked, since its ".meta" entry
+ * may simply not have been read yet.
+ */
+static int check_bad_index_multipart(rgw::sal::RadosStore* const rados_store,
+ rgw::sal::Bucket* const bucket,
+ const DoutPrefixProvider *dpp,
+ RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher,
+ const int shard,
+ optional_yield y)
{
+ RGWRados* store = rados_store->getRados();
+ RGWRados::BucketShard bs(store);
+
const bool fix_index = op_state.will_fix_index();
- bucket = op_state.get_bucket()->clone();
+ int ret = bs.init(dpp,
+ bucket->get_info(),
+ bucket->get_info().layout.current_index, shard, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR bs.init(bucket=" << bucket << "): " <<
+ cpp_strerror(-ret) << dendl;
+ return ret;
+ }
- rgw::sal::Bucket::ListParams params;
- params.list_versions = true;
- params.ns = RGW_OBJ_NS_MULTIPART;
+ std::string marker = rgw_obj_key(std::string(),
+ std::string(),
+ RGW_OBJ_NS_MULTIPART).get_index_key_name();
+ bool is_truncated = true;
+ std::list<rgw_cls_bi_entry> entries_read;
- std::map<std::string, bool> meta_objs;
- std::map<rgw_obj_index_key, std::string> all_objs;
- bool is_truncated;
- do {
- rgw::sal::Bucket::ListResults results;
- int r = bucket->list(dpp, params, listing_max_entries, results, y);
- if (r < 0) {
- set_err_msg(err_msg, "failed to list objects in bucket=" + bucket->get_name() +
- " err=" + cpp_strerror(-r));
+ // holds part entries w/o ".meta"
+ std::list<rgw_obj_index_key> entries_to_unlink;
- return r;
- }
- is_truncated = results.is_truncated;
+ // holds entries pending finding of ".meta"
+ std::list<rgw_obj_index_key> entries_window;
- for (const auto& o : results.objs) {
- rgw_obj_index_key key = o.key;
- rgw_obj obj(bucket->get_key(), key);
- std::string oid = obj.get_oid();
+ // tracks whether on same multipart upload or not
+ std::string prev_entry_prefix;
+ do {
+ entries_read.clear();
+ ret = store->bi_list(bs, "", marker, -1,
+ &entries_read, &is_truncated, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) <<
+ dendl;
+ // do not fall through to the clean-up below: entries still in
+ // the window may belong to an upload whose ".meta" entry was
+ // simply not read yet, so unlinking them would be unsafe
+ return ret;
+ }
- int pos = oid.find_last_of('.');
- if (pos < 0) {
- /* obj has no suffix */
- all_objs[key] = oid;
- } else {
- /* obj has suffix */
- std::string name = oid.substr(0, pos);
- std::string suffix = oid.substr(pos + 1);
+ for (const auto& entry : entries_read) {
+ marker = entry.idx;
- if (suffix.compare("meta") == 0) {
- meta_objs[name] = true;
- } else {
- all_objs[key] = name;
- }
+ rgw_obj_key obj_key;
+ bool parsed = rgw_obj_key::parse_raw_oid(entry.idx, &obj_key);
+ if (!parsed) {
+ ldpp_dout(dpp, 0) <<
+ "WARNING: could not parse index entry; ignoring; key=" <<
+ entry.idx << dendl;
+ continue;
}
- }
- } while (is_truncated);
+ const std::string& name = obj_key.name;
+ const std::string& ns = obj_key.ns;
- std::list<rgw_obj_index_key> objs_to_unlink;
- Formatter *f = flusher.get_formatter();
+ // when we're out of the multipart namespace, we're done
+ if (entry.type != BIIndexType::Plain || ns != RGW_OBJ_NS_MULTIPART) {
+ is_truncated = false;
+ break;
+ }
- f->open_array_section("invalid_multipart_entries");
+ auto period = name.rfind(".");
+ if (period == std::string::npos) {
+ ldpp_dout(dpp, 0) <<
+ "WARNING: index entry in multipart namespace does not contain"
+ " suffix indicator ('.'); ignoring; key=" <<
+ entry.idx << dendl;
+ continue;
+ }
- for (const auto& o : all_objs) {
- const std::string& name = o.second;
- if (meta_objs.find(name) == meta_objs.end()) {
- objs_to_unlink.push_back(o.first);
- }
+ const std::string entry_prefix = name.substr(0, period);
+ const std::string entry_suffix = name.substr(1 + period);
+
+ // the entries for a given multipart upload will appear
+ // sequentially with the ".meta" being the last. So we'll
+ // cache the entries until we either find the "meta" entry or
+ // switch to a different upload
+ if (entry_suffix == "meta") {
+ if (entry_prefix != prev_entry_prefix) {
+ entries_to_unlink.insert(entries_to_unlink.end(),
+ entries_window.cbegin(),
+ entries_window.cend());
+ }
- if (objs_to_unlink.size() > listing_max_entries) {
- if (fix_index) {
- // note: under rados this removes directly from rados index objects
- int r = bucket->remove_objs_from_index(dpp, objs_to_unlink);
- if (r < 0) {
- set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
- cpp_strerror(-r));
- return r;
+ // either way start over
+ entries_window.clear();
+ prev_entry_prefix.clear();
+ } else {
+ if (entry_prefix != prev_entry_prefix) {
+ entries_to_unlink.insert(entries_to_unlink.end(),
+ entries_window.cbegin(),
+ entries_window.cend());
+ entries_window.clear();
+ prev_entry_prefix = entry_prefix;
}
+
+ // create an rgw_obj_index_key to store in window
+ rgw_obj_index_key obj_index_key;
+ obj_key.get_index_key(&obj_index_key);
+ entries_window.push_back(obj_index_key);
}
- dump_mulipart_index_results(objs_to_unlink, f);
- flusher.flush();
- objs_to_unlink.clear();
- }
- }
+ // check if this is a good point for intermediate index clean-up
+ if (entries_to_unlink.size() >= listing_max_entries) {
+ dump_multipart_index_results(entries_to_unlink,
+ flusher.get_formatter());
+ if (fix_index) {
+ ret = store->remove_objs_from_index(dpp, bucket->get_info(),
+ entries_to_unlink);
+ if (ret < 0) {
+ // propagate the failure instead of silently dropping it
+ ldpp_dout(dpp, -1) << "ERROR remove_objs_from_index(): " <<
+ cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+ }
+ entries_to_unlink.clear();
+ }
+ } // for
+ } while (is_truncated);
- if (fix_index) {
- // note: under rados this removes directly from rados index objects
- int r = bucket->remove_objs_from_index(dpp, objs_to_unlink);
- if (r < 0) {
- set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
- cpp_strerror(-r));
+ // any entries left over at the end can be unlinked
+ entries_to_unlink.insert(entries_to_unlink.end(),
+ entries_window.cbegin(),
+ entries_window.cend());
+ entries_window.clear();
- return r;
+ if (! entries_to_unlink.empty()) {
+ dump_multipart_index_results(entries_to_unlink,
+ flusher.get_formatter());
+ if (fix_index) {
+ ret = store->remove_objs_from_index(dpp, bucket->get_info(),
+ entries_to_unlink);
+ if (ret < 0) {
+ // propagate the failure instead of silently dropping it
+ ldpp_dout(dpp, -1) << "ERROR remove_objs_from_index(): " <<
+ cpp_strerror(-ret) << dendl;
+ return ret;
+ }
}
+ entries_to_unlink.clear();
}
- dump_mulipart_index_results(objs_to_unlink, f);
- f->close_section();
flusher.flush();
return 0;
+} // static ::check_bad_index_multipart
+
+
+/**
+ * Checks for damaged incomplete multipart uploads in a bucket
+ * index. Since all entries for a given multipart upload end up on the
+ * same shard (by design), we spawn a set of co-routines, each one
+ * working shard by shard until all work is complete.
+ *
+ * The results are emitted inside an "invalid_multipart_entries" array
+ * section of the flusher's formatter. The section is only opened once
+ * the index layout has been validated, so every return path leaves
+ * the formatter balanced.
+ *
+ * Returns 0 on success, -EINVAL if the index layout is not of the
+ * Normal type, the negated error code of a std::system_error thrown
+ * while running the co-routines (with the message stored in *err_msg
+ * when err_msg is non-null), or otherwise the first per-shard error
+ * encountered; a failing shard does not stop the remaining shards
+ * from being processed.
+ *
+ * TODO: This function takes optional_yield so there's an expectation
+ * that it can run asynchronously, but io_context::run() is
+ * synchronous. This is fine for radosgw-admin, but we also serve this
+ * 'bucket check' operation over the /admin/bucket api, so we'll want
+ * to address this in the future.
+ */
+int RGWBucket::check_bad_index_multipart(rgw::sal::RadosStore* const rados_store,
+ RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher,
+ const DoutPrefixProvider *dpp,
+ optional_yield y,
+ std::string* err_msg)
+{
+ const RGWBucketInfo& bucket_info = get_bucket_info();
+
+ // validate the layout before opening any formatter section, so the
+ // early return cannot leave an unterminated array in the output
+ const auto& index_layout = bucket_info.layout.current_index.layout;
+ if (index_layout.type != rgw::BucketIndexType::Normal) {
+ ldpp_dout(dpp, 0) << "ERROR: cannot check bucket indices with layouts of type " <<
+ current_layout_desc(bucket_info.layout) <<
+ " for bad multipart entries" << dendl;
+ return -EINVAL;
+ }
+ const int num_shards = rgw::num_shards(index_layout.normal);
+ int next_shard = 0;
+
+ Formatter* formatter = flusher.get_formatter();
+ formatter->open_array_section("invalid_multipart_entries");
+
+ boost::asio::io_context context;
+ const int max_aio = std::max(1, op_state.get_max_aio());
+ int any_error = 0; // first error encountered if any
+ for (int i = 0; i < max_aio; i++) {
+ spawn::spawn(context, [&](spawn::yield_context yield) {
+ while (true) {
+ // each co-routine claims the next unprocessed shard
+ const int shard = next_shard++;
+ if (shard >= num_shards) {
+ return;
+ }
+
+ optional_yield y(context, yield);
+
+ int r = ::check_bad_index_multipart(rados_store, &*bucket, dpp,
+ op_state, flusher, shard, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << "WARNING: error processing shard " << shard <<
+ " check_bad_index_multipart(): " << r << "; skipping" << dendl;
+ if (!any_error) {
+ // record first error encountered, but continue
+ any_error = r;
+ }
+ }
+ } // while
+ });
+ } // for
+
+ try {
+ context.run();
+ } catch (const std::system_error& e) {
+ formatter->close_section();
+ // err_msg is optional (a caller in this file omits it), so guard
+ // against a null pointer before storing the message
+ if (err_msg) {
+ *err_msg = e.what();
+ }
+ return -e.code().value();
+ }
+
+ formatter->close_section();
+
+ return any_error;
}
int RGWBucket::check_object_index(const DoutPrefixProvider *dpp,
/**
* Spawns separate coroutines to check each bucket shard for leftover
* olh entries (and remove them if op_state.fix_index is true).
+ *
+ * TODO: Currently this is synchronous as it uses
+ * io_context::run(). Allow this to run asynchronously by receiving an
+ * optional_yield parameter and making other adjustments. Synchronous
+ * is fine for radosgw-admin, but we also serve this 'bucket check'
+ * operation over the /admin/bucket api, so we'll want to address
+ * this.
*/
int RGWBucket::check_index_olh(rgw::sal::RadosStore* const rados_store,
const DoutPrefixProvider *dpp,
formatter->open_array_section("");
}
- const int max_shards = rgw::num_shards(bucket_info.layout.current_index);
+ const auto& index_layout = bucket_info.layout.current_index.layout;
+ if (index_layout.type != rgw::BucketIndexType::Normal) {
+ ldpp_dout(dpp, 0) << "ERROR: cannot check bucket indices with layouts of type " <<
+ current_layout_desc(bucket_info.layout) <<
+ " for bad OLH entries" << dendl;
+ return -EINVAL;
+ }
+ const int max_shards = rgw::num_shards(index_layout.normal);
std::string verb = op_state.will_fix_index() ? "removed" : "found";
uint64_t count_out = 0;
return 0;
}
-int RGWBucketAdminOp::check_index(rgw::sal::Driver* driver, RGWBucketAdminOpState& op_state,
- RGWFormatterFlusher& flusher, optional_yield y, const DoutPrefixProvider *dpp)
+int RGWBucketAdminOp::check_index(rgw::sal::Driver* driver,
+ RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher,
+ optional_yield y,
+ const DoutPrefixProvider* dpp)
{
int ret;
- map<RGWObjCategory, RGWStorageStats> existing_stats;
- map<RGWObjCategory, RGWStorageStats> calculated_stats;
-
+ std::map<RGWObjCategory, RGWStorageStats> existing_stats;
+ std::map<RGWObjCategory, RGWStorageStats> calculated_stats;
RGWBucket bucket;
-
ret = bucket.init(driver, op_state, y, dpp);
- if (ret < 0)
+ if (ret < 0) {
return ret;
+ }
Formatter *formatter = flusher.get_formatter();
flusher.start(0);
formatter->open_object_section("bucket_check");
- ret = bucket.check_bad_index_multipart(op_state, flusher, dpp, y);
- if (ret < 0)
- return ret;
+ auto rados_store = dynamic_cast<rgw::sal::RadosStore*>(driver);
+ if (!rados_store) {
+ ldpp_dout(dpp, 0) << "WARNING: couldn't access a RadosStore, "
+ "so skipping bad incomplete multipart check" << dendl;
+ } else {
+ ret = bucket.check_bad_index_multipart(rados_store, op_state, flusher, dpp, y);
+ if (ret < 0) {
+ return ret;
+ }
+ }
if (op_state.will_check_objects()) {
ret = bucket.check_object_index(dpp, op_state, flusher, y);
- if (ret < 0)
+ if (ret < 0) {
return ret;
+ }
}
ret = bucket.check_index(dpp, op_state, existing_stats, calculated_stats);
- if (ret < 0)
+ if (ret < 0) {
return ret;
+ }
dump_index_check(existing_stats, calculated_stats, formatter);