]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: make incomplete multipart upload part of bucket check efficient 57083/head
authorJ. Eric Ivancich <ivancich@redhat.com>
Wed, 24 Apr 2024 22:05:33 +0000 (18:05 -0400)
committerJ. Eric Ivancich <ivancich@redhat.com>
Tue, 30 Apr 2024 16:01:13 +0000 (12:01 -0400)
Previously the incomplete multipart portion of bucket check would list
all entries in the _multipart_ namespace across all shards and then
analyze them in memory before taking further action.

Since all index entries for a given multipart upload are all on the
same shard by design, we can work on this asynchronously shard by
shard. Furthermore since all entries for a given multipart upload are
sequential in the bucket index, we can use a small window to analyze
each of the uploads.

This should make the operation quicker and use much less memory in the
worst cases.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
src/rgw/driver/rados/rgw_bucket.cc
src/rgw/driver/rados/rgw_bucket.h

index c51e61a2755dc2176544e110d405e478bb1d6d8d..7cf6fbd56f1a2ad39e28ca3f6543bde67fc68638 100644 (file)
@@ -85,10 +85,10 @@ static void parse_bucket(const string& bucket,
   }
 }
 
-static void dump_mulipart_index_results(list<rgw_obj_index_key>& objs_to_unlink,
-        Formatter *f)
+static void dump_multipart_index_results(std::list<rgw_obj_index_key>& objs,
+                                        Formatter *f)
 {
-  for (const auto& o : objs_to_unlink) {
+  for (const auto& o : objs) {
     f->dump_string("object",  o.name);
   }
 }
@@ -317,100 +317,236 @@ static void dump_index_check(map<RGWObjCategory, RGWStorageStats> existing_stats
   formatter->close_section();
 }
 
-int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState& op_state,
-                                        RGWFormatterFlusher& flusher,
-                                        const DoutPrefixProvider *dpp, optional_yield y,
-                                        std::string *err_msg)
+
+/**
+ * Looks for incomplete and damaged multipart uploads on a single
+ * shard. While the parts are being uploaded, entries are kept in the
+ * bucket index to track the components of the upload. There is one
+ * ".meta" entry and the part entries, all with the same prefix in
+ * their keys. If we find the part entries but not their corresponding
+ * shared ".meta" entry we can safely remove the part entries from the
+ * index shard.
+ *
+ * We take advantage of the fact that since all entries for the same
+ * upload have the same prefix, they're sequential in the index, with
+ * the ".meta" coming last in the sequence. So we only need a small
+ * window to track entries until either their ".meta" does or does not
+ * come up.
+ */
+static int check_bad_index_multipart(rgw::sal::RadosStore* const rados_store,
+                                    rgw::sal::Bucket* const bucket,
+                                    const DoutPrefixProvider *dpp,
+                                    RGWBucketAdminOpState& op_state,
+                                    RGWFormatterFlusher& flusher,
+                                    const int shard,
+                                    optional_yield y)
 {
+  RGWRados* store = rados_store->getRados();
+  RGWRados::BucketShard bs(store);
+
   const bool fix_index = op_state.will_fix_index();
 
-  bucket = op_state.get_bucket()->clone();
+  int ret = bs.init(dpp,
+                   bucket->get_info(),
+                   bucket->get_info().layout.current_index, shard, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "ERROR bs.init(bucket=" << bucket << "): " <<
+      cpp_strerror(-ret) << dendl;
+    return ret;
+  }
 
-  rgw::sal::Bucket::ListParams params;
-  params.list_versions = true;
-  params.ns = RGW_OBJ_NS_MULTIPART;
+  std::string marker = rgw_obj_key(std::string(),
+                                  std::string(),
+                                  RGW_OBJ_NS_MULTIPART).get_index_key_name();
+  bool is_truncated = true;
+  std::list<rgw_cls_bi_entry> entries_read;
 
-  std::map<std::string, bool> meta_objs;
-  std::map<rgw_obj_index_key, std::string> all_objs;
-  bool is_truncated;
-  do {
-    rgw::sal::Bucket::ListResults results;
-    int r = bucket->list(dpp, params, listing_max_entries, results, y);
-    if (r < 0) {
-      set_err_msg(err_msg, "failed to list objects in bucket=" + bucket->get_name() +
-              " err=" +  cpp_strerror(-r));
+  // holds part entries w/o ".meta"
+  std::list<rgw_obj_index_key> entries_to_unlink;
 
-      return r;
-    }
-    is_truncated = results.is_truncated;
+  // holds entries pending finding of ".meta"
+  std::list<rgw_obj_index_key> entries_window;
 
-    for (const auto& o : results.objs) {
-      rgw_obj_index_key key = o.key;
-      rgw_obj obj(bucket->get_key(), key);
-      std::string oid = obj.get_oid();
+  // tracks whether on same multipart upload or not
+  std::string prev_entry_prefix;
+  do {
+    entries_read.clear();
+    ret = store->bi_list(bs, "", marker, -1,
+                        &entries_read, &is_truncated, y);
+    if (ret < 0) {
+      ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) <<
+       dendl;
+      break;
+    }
 
-      int pos = oid.find_last_of('.');
-      if (pos < 0) {
-        /* obj has no suffix */
-        all_objs[key] = oid;
-      } else {
-        /* obj has suffix */
-       std::string name = oid.substr(0, pos);
-       std::string suffix = oid.substr(pos + 1);
+    for (const auto& entry : entries_read) {
+      marker = entry.idx;
 
-        if (suffix.compare("meta") == 0) {
-          meta_objs[name] = true;
-        } else {
-          all_objs[key] = name;
-        }
+      rgw_obj_key obj_key;
+      bool parsed = rgw_obj_key::parse_raw_oid(entry.idx, &obj_key);
+      if (!parsed) {
+       ldpp_dout(dpp, 0) <<
+         "WARNING: could not parse index entry; ignoring; key=" <<
+         entry.idx << dendl;
+       continue;
       }
-    }
-  } while (is_truncated);
+      const std::string& name = obj_key.name;
+      const std::string& ns = obj_key.ns;
 
-  std::list<rgw_obj_index_key> objs_to_unlink;
-  Formatter *f =  flusher.get_formatter();
+      // when we're out of the multipart namespace, we're done
+      if (entry.type != BIIndexType::Plain || ns != RGW_OBJ_NS_MULTIPART) {
+        is_truncated = false;
+        break;
+      }
 
-  f->open_array_section("invalid_multipart_entries");
+      auto period = name.rfind(".");
+      if (period == std::string::npos) {
+       ldpp_dout(dpp, 0) <<
+         "WARNING: index entry in multipart namespace does not contain"
+         " suffix indicator ('.'); ignoring; key=" <<
+         entry.idx << dendl;
+       continue;
+      }
 
-  for (const auto& o : all_objs) {
-    const std::string& name = o.second;
-    if (meta_objs.find(name) == meta_objs.end()) {
-      objs_to_unlink.push_back(o.first);
-    }
+      const std::string entry_prefix = name.substr(0, period);
+      const std::string entry_suffix = name.substr(1 + period);
+
+      // the entries for a given multipart upload will appear
+      // sequentially with the ".meta" will being the last. So we'll
+      // cache the entries until we either find the "meta" entry or
+      // switch to a different upload
+      if (entry_suffix == "meta") {
+       if (entry_prefix != prev_entry_prefix) {
+         entries_to_unlink.insert(entries_to_unlink.end(),
+                                  entries_window.cbegin(),
+                                  entries_window.cend());
+       }
 
-    if (objs_to_unlink.size() > listing_max_entries) {
-      if (fix_index) {
-       // note: under rados this removes directly from rados index objects
-       int r = bucket->remove_objs_from_index(dpp, objs_to_unlink);
-       if (r < 0) {
-         set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
-                     cpp_strerror(-r));
-         return r;
+       // either way start over
+       entries_window.clear();
+       prev_entry_prefix.clear();
+      } else {
+       if (entry_prefix != prev_entry_prefix) {
+         entries_to_unlink.insert(entries_to_unlink.end(),
+                                  entries_window.cbegin(),
+                                  entries_window.cend());
+         entries_window.clear();
+         prev_entry_prefix = entry_prefix;
        }
+
+       // create an rgw_obj_index_key to store in window
+       rgw_obj_index_key obj_index_key;
+       obj_key.get_index_key(&obj_index_key);
+       entries_window.push_back(obj_index_key);
       }
 
-      dump_mulipart_index_results(objs_to_unlink, f);
-      flusher.flush();
-      objs_to_unlink.clear();
-    }
-  }
+      // check if this is a good point for intermediate index clean-up
+      if (entries_to_unlink.size() >= listing_max_entries) {
+       dump_multipart_index_results(entries_to_unlink,
+                                    flusher.get_formatter());
+       if (fix_index) {
+         store->remove_objs_from_index(dpp, bucket->get_info(),
+                                       entries_to_unlink);
+       }
+       entries_to_unlink.clear();
+      }
+    } // for
+  } while (is_truncated);
 
-  if (fix_index) {
-    // note: under rados this removes directly from rados index objects
-    int r = bucket->remove_objs_from_index(dpp, objs_to_unlink);
-    if (r < 0) {
-      set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
-              cpp_strerror(-r));
+  // any entries left over at the end can be unlinked
+  entries_to_unlink.insert(entries_to_unlink.end(),
+                          entries_window.cbegin(),
+                          entries_window.cend());
+  entries_window.clear();
 
-      return r;
+  if (! entries_to_unlink.empty()) {
+    dump_multipart_index_results(entries_to_unlink,
+                                flusher.get_formatter());
+    if (fix_index) {
+      store->remove_objs_from_index(dpp, bucket->get_info(),
+                                   entries_to_unlink);
     }
+    entries_to_unlink.clear();
   }
 
-  dump_mulipart_index_results(objs_to_unlink, f);
-  f->close_section();
   flusher.flush();
 
   return 0;
+} // static ::check_bad_index_multipart
+
+
+/**
+ * Checks for damaged incomplete multipart uploads in a bucket
+ * index. Since all entries for a given multipart upload end up on the
+ * same shard (by design), we spawn a set of co-routines, each one
+ * working shard by shard until all work is complete.
+ *
+ * TODO: This function takes optional_yield so there's an expectation
+ * that it can run asynchronously, but io_context::run() is
+ * synchronous. This is fine for radosgw-admin, but we also serve this
+ * 'bucket check' operation over the /admin/bucket api, so we'll want
+ * to address this in the future.
+ */
+int RGWBucket::check_bad_index_multipart(rgw::sal::RadosStore* const rados_store,
+                                        RGWBucketAdminOpState& op_state,
+                                        RGWFormatterFlusher& flusher,
+                                        const DoutPrefixProvider *dpp,
+                                        optional_yield y,
+                                        std::string* err_msg)
+{
+  const RGWBucketInfo& bucket_info = get_bucket_info();
+
+  Formatter* formatter = flusher.get_formatter();
+  formatter->open_array_section("invalid_multipart_entries");
+
+  const auto& index_layout = bucket_info.layout.current_index.layout;
+  if (index_layout.type != rgw::BucketIndexType::Normal) {
+    ldpp_dout(dpp, 0) << "ERROR: cannot check bucket indices with layouts of type " <<
+      current_layout_desc(bucket_info.layout) <<
+      " for bad multipart entries" << dendl;
+    return -EINVAL;
+  }
+  const int num_shards = rgw::num_shards(index_layout.normal);
+  int next_shard = 0;
+
+  boost::asio::io_context context;
+  const int max_aio = std::max(1, op_state.get_max_aio());
+  int any_error = 0; // first error encountered if any
+  for (int i = 0; i < max_aio; i++) {
+    spawn::spawn(context, [&](spawn::yield_context yield) {
+      while (true) {
+        const int shard = next_shard++;
+        if (shard >= num_shards) {
+          return;
+        }
+
+        optional_yield y(context, yield);
+
+        int r = ::check_bad_index_multipart(rados_store, &*bucket, dpp,
+                                           op_state, flusher, shard, y);
+        if (r < 0) {
+          ldpp_dout(dpp, -1) << "WARNING: error processing shard " << shard <<
+            " check_bad_index_multipart(): " << r << "; skipping" << dendl;
+         if (!any_error) {
+           // record first error encountered, but continue
+           any_error = r;
+         }
+        }
+      } // while
+    });
+  } // for
+
+  try {
+    context.run();
+  } catch (const std::system_error& e) {
+    formatter->close_section();
+    *err_msg = e.what();
+    return -e.code().value();
+  }
+
+  formatter->close_section();
+
+  return any_error;
 }
 
 int RGWBucket::check_object_index(const DoutPrefixProvider *dpp, 
@@ -561,6 +697,13 @@ static int check_index_olh(rgw::sal::RadosStore* const rados_store,
 /**
  * Spawns separate coroutines to check each bucket shard for leftover
  * olh entries (and remove them if op_state.fix_index is true).
+ *
+ * TODO: Currently this is synchronous as it uses
+ * io_context::run(). Allow this to run asynchronously by receiving an
+ * optional_yield parameter and making other adjustments.  Synchronous
+ * is fine for radosgw-admin, but we also serve this 'bucket check'
+ * operation over the /admin/bucket api, so we'll want to address
+ * this.
  */
 int RGWBucket::check_index_olh(rgw::sal::RadosStore* const rados_store,
                                const DoutPrefixProvider *dpp,
@@ -578,7 +721,14 @@ int RGWBucket::check_index_olh(rgw::sal::RadosStore* const rados_store,
     formatter->open_array_section("");
   }
 
-  const int max_shards = rgw::num_shards(bucket_info.layout.current_index);
+  const auto& index_layout = bucket_info.layout.current_index.layout;
+  if (index_layout.type != rgw::BucketIndexType::Normal) {
+    ldpp_dout(dpp, 0) << "ERROR: cannot check bucket indices with layouts of type " <<
+      current_layout_desc(bucket_info.layout) <<
+      " for bad OLH entries" << dendl;
+    return -EINVAL;
+  }
+  const int max_shards = rgw::num_shards(index_layout.normal);
   std::string verb = op_state.will_fix_index() ? "removed" : "found";
   uint64_t count_out = 0;
   
@@ -1196,38 +1346,49 @@ int RGWBucketAdminOp::check_index_unlinked(rgw::sal::RadosStore* store,
   return 0;
 }
 
-int RGWBucketAdminOp::check_index(rgw::sal::Driver* driver, RGWBucketAdminOpState& op_state,
-                  RGWFormatterFlusher& flusher, optional_yield y, const DoutPrefixProvider *dpp)
+int RGWBucketAdminOp::check_index(rgw::sal::Driver* driver,
+                                 RGWBucketAdminOpState& op_state,
+                                 RGWFormatterFlusher& flusher,
+                                 optional_yield y,
+                                 const DoutPrefixProvider* dpp)
 {
   int ret;
-  map<RGWObjCategory, RGWStorageStats> existing_stats;
-  map<RGWObjCategory, RGWStorageStats> calculated_stats;
-
+  std::map<RGWObjCategory, RGWStorageStats> existing_stats;
+  std::map<RGWObjCategory, RGWStorageStats> calculated_stats;
 
   RGWBucket bucket;
-
   ret = bucket.init(driver, op_state, y, dpp);
-  if (ret < 0)
+  if (ret < 0) {
     return ret;
+  }
 
   Formatter *formatter = flusher.get_formatter();
   flusher.start(0);
 
   formatter->open_object_section("bucket_check");
 
-  ret = bucket.check_bad_index_multipart(op_state, flusher, dpp, y);
-  if (ret < 0)
-    return ret;
+  auto rados_store = dynamic_cast<rgw::sal::RadosStore*>(driver);
+  if (!rados_store) {
+    ldpp_dout(dpp, 0) << "WARNING: couldn't access a RadosStore, "
+      "so skipping bad incomplete multipart check" << dendl;
+  } else {
+    ret = bucket.check_bad_index_multipart(rados_store, op_state, flusher, dpp, y);
+    if (ret < 0) {
+      return ret;
+    }
+  }
 
   if (op_state.will_check_objects()) {
     ret = bucket.check_object_index(dpp, op_state, flusher, y);
-    if (ret < 0)
+    if (ret < 0) {
       return ret;
+    }
   }
 
   ret = bucket.check_index(dpp, op_state, existing_stats, calculated_stats);
-  if (ret < 0)
+  if (ret < 0) {
     return ret;
+  }
 
   dump_index_check(existing_stats, calculated_stats, formatter);
   
index e91c0d7e13953bbac9e3c0c7c04fe2bf6ce46957..6bbfba93df2155dfbb3f801590732d851a3d0b21 100644 (file)
@@ -338,9 +338,12 @@ public:
   int init(rgw::sal::Driver* storage, RGWBucketAdminOpState& op_state, optional_yield y,
              const DoutPrefixProvider *dpp, std::string *err_msg = NULL);
 
-  int check_bad_index_multipart(RGWBucketAdminOpState& op_state,
-              RGWFormatterFlusher& flusher,
-              const DoutPrefixProvider *dpp, optional_yield y, std::string *err_msg = NULL);
+  int check_bad_index_multipart(rgw::sal::RadosStore* const rados_store,
+                               RGWBucketAdminOpState& op_state,
+                               RGWFormatterFlusher& flusher,
+                               const DoutPrefixProvider *dpp,
+                               optional_yield y,
+                               std::string *err_msg = nullptr);
 
   int check_object_index(const DoutPrefixProvider *dpp, 
                          RGWBucketAdminOpState& op_state,