]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: batch and process bucket instances
authorAbhishek Lekshmanan <alekshmanan@suse.com>
Wed, 17 Oct 2018 22:44:51 +0000 (00:44 +0200)
committerAbhishek Lekshmanan <abhishek@suse.com>
Thu, 29 Nov 2018 13:29:04 +0000 (14:29 +0100)
Sort through and batch bucket instances so that multiple calls to reading
current bucket info and locking can be avoided. For the most trivial case when
the bucket is already deleted we exit early with all the stale instances. When
the bucket reshard is in progress we only process the stale entries with status
done, if the bucket is available for locking then we lock down and mark the
other instances as well.

Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
(cherry picked from commit fb9c0497621fce34283094cfe260480f9122573f)

 Conflicts:
src/rgw/rgw_bucket.cc
Get rid of the following c++17isms:
- split_tenant auto return type -> trailing return type
- tuple destructuring bind for split tenant with std::tie

src/rgw/rgw_bucket.cc

index c6e736832ab7e6429abd5e8ecf0f7beb944a27e8..58447eee197688c537cef79bda23e020f8a57d4b 100644 (file)
@@ -13,6 +13,7 @@
 #include "common/errno.h"
 #include "common/ceph_json.h"
 #include "common/backport14.h"
+#include "include/scope_guard.h"
 #include "rgw_rados.h"
 #include "rgw_acl.h"
 #include "rgw_acl_s3.h"
@@ -26,7 +27,7 @@
 #include "include/rados/librados.hpp"
 // until everything is moved from rgw_common
 #include "rgw_common.h"
-
+#include "rgw_reshard.h"
 #include "cls/user/cls_user_types.h"
 
 #define dout_context g_ceph_context
@@ -1654,98 +1655,141 @@ int RGWBucketAdminOp::set_quota(RGWRados *store, RGWBucketAdminOpState& op_state
   return bucket.set_quota(op_state);
 }
 
-inline std::string bucket_instance_name(const std::string&bucket, const std::string& bucket_id)
-{
-  return bucket + ":" + bucket_id;
-}
-
 static int purge_bucket_instance(RGWRados *store, const RGWBucketInfo& bucket_info)
 {
   int max_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
   for (int i = 0; i < max_shards; i++) {
     RGWRados::BucketShard bs(store);
     int shard_id = (bucket_info.num_shards > 0  ? i : -1);
-    int ret = bs.init(bucket_info.bucket, shard_id);
+    int ret = bs.init(bucket_info.bucket, shard_id, nullptr);
     if (ret < 0) {
       cerr << "ERROR: bs.init(bucket=" << bucket_info.bucket << ", shard=" << shard_id
            << "): " << cpp_strerror(-ret) << std::endl;
-      return -ret;
+      return ret;
     }
     ret = store->bi_remove(bs);
     if (ret < 0) {
       cerr << "ERROR: failed to remove bucket index object: "
            << cpp_strerror(-ret) << std::endl;
-      return -ret;
+      return ret;
     }
   }
   return 0;
 }
 
-static bool is_stale_instance(RGWRados *store, const std::string& bucket_instance,
-                             RGWBucketInfo& bucket_info)
+inline auto split_tenant(const std::string& bucket_name) ->
+  std::pair<std::string,std::string>
+{
+  auto p = bucket_name.find('/');
+  if(p != std::string::npos) {
+    return std::make_pair(bucket_name.substr(0,p), bucket_name.substr(p+1));
+  }
+  return std::make_pair(std::string(), bucket_name);
+}
+
+using bucket_instance_ls = std::vector<RGWBucketInfo>;
+void get_stale_instances(RGWRados *store, const std::string& bucket_name,
+                         const vector<std::string>& lst,
+                         bucket_instance_ls& stale_instances)
 {
+
   RGWObjectCtx obj_ctx(store);
-  int r = store->get_bucket_instance_info(obj_ctx, bucket_instance,
-                                         bucket_info, nullptr,nullptr);
 
-  if (r < 0){
-    cerr << "Bucket instance is invalid!" << bucket_instance
-        << cpp_strerror(-r) << std::endl;
-    return false;
+  bucket_instance_ls other_instances;
+// first iterate over the entries, and pick up the done buckets; these
+// are guaranteed to be stale
+  for (const auto& bucket_instance : lst){
+    RGWBucketInfo binfo;
+    int r = store->get_bucket_instance_info(obj_ctx, bucket_instance,
+                                            binfo, nullptr,nullptr);
+    if (r < 0){
+      // this can only happen if someone deletes us right when we're processing
+      lderr(store->ctx()) << "Bucket instance is invalid: " << bucket_instance
+                          << cpp_strerror(-r) << dendl;
+      continue;
+    }
+    if (binfo.reshard_status == CLS_RGW_RESHARD_DONE)
+      stale_instances.emplace_back(std::move(binfo));
+    else {
+      other_instances.emplace_back(std::move(binfo));
+    }
   }
 
-  if (bucket_info.reshard_status == CLS_RGW_RESHARD_DONE)
-    return true;
-  else if(bucket_info.reshard_status == CLS_RGW_RESHARD_IN_PROGRESS)
-    return false;
-
+  // Read the cur bucket info, if the bucket doesn't exist we can simply return
+  // all the instances
+  std::string tenant,bucket;
+  std::tie(tenant, bucket) = split_tenant(bucket_name);
   RGWBucketInfo cur_bucket_info;
-  rgw_bucket b;
-  int _;
-  rgw_bucket_parse_bucket_key(store->ctx(), bucket_instance, &b, &_);
-  r = store->get_bucket_info(obj_ctx, b.tenant, b.name, cur_bucket_info, nullptr);
+  int r = store->get_bucket_info(obj_ctx, tenant, bucket, cur_bucket_info, nullptr);
+  if (r < 0) {
+    if (r == -ENOENT) {
+      // bucket doesn't exist, everything is stale then
+      stale_instances.insert(std::end(stale_instances),
+                             std::make_move_iterator(other_instances.begin()),
+                             std::make_move_iterator(other_instances.end()));
+    } else {
+      // all bets are off if we can't read the bucket, just return the sureshot stale instances
+      lderr(store->ctx()) << "error: reading bucket info for bucket: "
+                          << bucket << cpp_strerror(-r) << dendl;
+    }
+    return;
+  }
 
-  if (cur_bucket_info.reshard_status == CLS_RGW_RESHARD_IN_PROGRESS &&
-      cur_bucket_info.new_bucket_instance_id == b.bucket_id)
-    return false;
+  // Don't process further in this round if bucket is resharding
+  if (cur_bucket_info.reshard_status == CLS_RGW_RESHARD_IN_PROGRESS)
+    return;
 
-  RGWBucketEntryPoint ep;
-  r = store->get_bucket_entrypoint_info(obj_ctx, b.tenant,
-                                       b.name, ep, nullptr, nullptr, nullptr);
-  return (ep.bucket.bucket_id != b.bucket_id);
-}
+  other_instances.erase(std::remove_if(other_instances.begin(), other_instances.end(),
+                                       [&cur_bucket_info](const RGWBucketInfo& b){
+                                         return (b.bucket.bucket_id == cur_bucket_info.bucket.bucket_id ||
+                                                 b.bucket.bucket_id == cur_bucket_info.new_bucket_instance_id);
+                                       }),
+                        other_instances.end());
+
+  // check if there are still instances left
+  if (other_instances.empty()) {
+    return;
+  }
 
+  // Now we have a bucket with instances where the reshard status is none, this
+  // usually happens when the reshard process couldn't complete, lockdown the
+  // bucket and walk through these instances to make sure no one else interferes
+  // with these
+  {
+    RGWBucketReshardLock reshard_lock(store, cur_bucket_info, true);
+    r = reshard_lock.lock();
+    if (r < 0) {
+      // most likely bucket is under reshard, return the sureshot stale instances
+      ldout(store->ctx(), 5) << __func__
+                             << "failed to take reshard lock; reshard underway likey" << dendl;
+      return;
+    }
+    auto sg = make_scope_guard([&reshard_lock](){ reshard_lock.unlock();} );
+    // this should be fast enough that we may not need to renew locks and check
+    // exit status?, should we read the values of the instances again?
+    stale_instances.insert(std::end(stale_instances),
+                           std::make_move_iterator(other_instances.begin()),
+                           std::make_move_iterator(other_instances.end()));
+  }
+
+  return;
+}
 
-using bucket_instance_list_t = std::list<std::string>;
 static int process_stale_instances(RGWRados *store, RGWBucketAdminOpState& op_state,
-                                  RGWFormatterFlusher& flusher,
-                                  std::function<void(const RGWBucketInfo&,
-                                                     Formatter *,
-                                                     RGWRados*)> process_f)
+                                   RGWFormatterFlusher& flusher,
+                                   std::function<void(const bucket_instance_ls&,
+                                                      Formatter *,
+                                                      RGWRados*)> process_f)
 {
   std::string marker;
   void *handle;
   Formatter *formatter = flusher.get_formatter();
   static constexpr auto default_max_keys = 1000;
-  int ret;
-
-#if 0
-  const auto bucket_name = op_state.get_bucket_name();
-  if (!bucket_name.empty()){
-    // TODO: implement me, marker needs to be ObjectCursor
-    // findout what can convert an oid into this
-    RGWObjectCtx obj_ctx(store);
-    RGWBucketEntryPoint ep;
-    ret = store->get_bucket_entrypoint_info(obj_ctx, op_state.get_user_id().tenant,
-                                           bucket_name, ep, nullptr, nullptr, nullptr);
-    marker = bucket_name + ":" + ep.bucket.bucket_id;
-  }
-#endif
 
-  ret = store->meta_mgr->list_keys_init("bucket.instance", marker, &handle);
+  int ret = store->meta_mgr->list_keys_init("bucket.instance", marker, &handle);
   if (ret < 0) {
     cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
-    return -ret;
+    return ret;
   }
 
   bool truncated;
@@ -1753,18 +1797,25 @@ static int process_stale_instances(RGWRados *store, RGWBucketAdminOpState& op_st
   formatter->open_array_section("keys");
 
   do {
-    bucket_instance_list_t keys;
+    list<std::string> keys;
 
     ret = store->meta_mgr->list_keys_next(handle, default_max_keys, keys, &truncated);
     if (ret < 0 && ret != -ENOENT) {
       cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
-      return -ret;
+      return ret;
     } if (ret != -ENOENT) {
-      for (const auto& iter: keys) {
-       RGWBucketInfo bucket_info;
-       if (is_stale_instance(store, iter, bucket_info)){
-         process_f(bucket_info, formatter, store);
-       }
+      // partition the list of buckets by buckets as the listing is un sorted,
+      // since it would minimize the reads to bucket_info
+      std::unordered_map<std::string, std::vector<std::string>> bucket_instance_map;
+      for (auto &key: keys) {
+        auto pos = key.find(':');
+        if(pos != std::string::npos)
+          bucket_instance_map[key.substr(0,pos)].emplace_back(std::move(key));
+      }
+      for (const auto& kv: bucket_instance_map) {
+        bucket_instance_ls stale_lst;
+        get_stale_instances(store, kv.first, kv.second, stale_lst);
+        process_f(stale_lst, formatter, store);
       }
     }
   } while (truncated);
@@ -1775,31 +1826,38 @@ static int process_stale_instances(RGWRados *store, RGWBucketAdminOpState& op_st
 }
 
 int RGWBucketAdminOp::list_stale_instances(RGWRados *store,
-                                          RGWBucketAdminOpState& op_state,
-                                          RGWFormatterFlusher& flusher)
-{
-  auto process_f = [](const RGWBucketInfo& binfo,
-                     Formatter *formatter,
-                     RGWRados*){
-                    formatter->dump_string("key", binfo.bucket.bucket_id);
-                  };
+                                           RGWBucketAdminOpState& op_state,
+                                           RGWFormatterFlusher& flusher)
+{
+  auto process_f = [](const bucket_instance_ls& lst,
+                      Formatter *formatter,
+                      RGWRados*){
+                     for (const auto& binfo: lst)
+                       formatter->dump_string("key", binfo.bucket.get_key());
+                   };
   return process_stale_instances(store, op_state, flusher, process_f);
 }
 
 
 int RGWBucketAdminOp::clear_stale_instances(RGWRados *store,
-                                           RGWBucketAdminOpState& op_state,
-                                           RGWFormatterFlusher& flusher)
-{
-  auto process_f = [](const RGWBucketInfo& binfo,
-                     Formatter *formatter,
-                     RGWRados *store){
-                    int ret = purge_bucket_instance(store, binfo);
-                    formatter->open_object_section("delete_status");
-                    formatter->dump_string("bucket_instance", binfo.bucket.bucket_id);
-                    formatter->dump_int("status", ret);
-                    formatter->close_section();
-                  };
+                                            RGWBucketAdminOpState& op_state,
+                                            RGWFormatterFlusher& flusher)
+{
+  auto process_f = [](const bucket_instance_ls& lst,
+                      Formatter *formatter,
+                      RGWRados *store){
+                     for (const auto &binfo: lst) {
+                       int ret = purge_bucket_instance(store, binfo);
+                       if (ret == 0){
+                         auto md_key = "bucket.instance:" + binfo.bucket.get_key();
+                         ret = store->meta_mgr->remove(md_key);
+                       }
+                       formatter->open_object_section("delete_status");
+                       formatter->dump_string("bucket_instance", binfo.bucket.get_key());
+                       formatter->dump_int("status", -ret);
+                       formatter->close_section();
+                     }
+                   };
 
   return process_stale_instances(store, op_state, flusher, process_f);
 }