rgw: track initiator of reshard queue entries
author J. Eric Ivancich <ivancich@redhat.com>
Wed, 29 May 2024 18:19:25 +0000 (14:19 -0400)
committer J. Eric Ivancich <ivancich@redhat.com>
Fri, 31 May 2024 21:18:04 +0000 (17:18 -0400)
The logic for managing the reshard queue (log) can vary depending on
whether the entry was added by an admin or by dynamic resharding. For
example, if it's a reshard reduction, dynamic resharding won't
overwrite the queue entry so as not to disrupt the reduction wait
period. On the other hand, an admin should be able to overwrite the
entry at will (the rule is sketched after the file list below).

So we now track the initiator of each entry on the queue. This adds a
new field to the at-rest data structure, with a decode fallback for
pre-existing entries (sketched after the diff), and updates the logic
to make use of it.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
src/cls/rgw/cls_rgw_types.cc
src/cls/rgw/cls_rgw_types.h
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_reshard.cc
src/rgw/driver/rados/rgw_reshard.h
src/rgw/rgw_admin.cc
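
The overwrite rule described above, as a minimal standalone sketch.
Initiator, Entry, and create_only here are illustrative stand-ins, not
the actual cls/RGW names; the real condition is the one added to
RGWReshard::add() in the diff below.

// Minimal sketch of the overwrite rule: only a *dynamic* reshard
// reduction is written create-only (so it cannot clobber an existing
// queue entry); an admin-initiated entry always overwrites.
#include <cstdint>
#include <iostream>

enum class Initiator : uint8_t { Unknown = 0, Admin = 1, Dynamic = 2 };

struct Entry {
  Initiator initiator{Initiator::Unknown};
  uint32_t old_num_shards{0};
  uint32_t new_num_shards{0};
};

// Mirrors the condition added to RGWReshard::add().
bool create_only(const Entry& e) {
  return e.initiator == Initiator::Dynamic &&
         e.new_num_shards < e.old_num_shards;
}

int main() {
  Entry dyn{Initiator::Dynamic, 11, 7};  // dynamic reduction: create-only
  Entry adm{Initiator::Admin, 11, 7};    // admin reduction: overwrites
  std::cout << std::boolalpha
            << create_only(dyn) << '\n'  // true
            << create_only(adm) << '\n'; // false
}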

index 05d48d734fc042602c18a4311c622d5504ffa9c4..ccf92c3765f5b3527f6f434ed33fea79acaa4b64 100644 (file)
@@ -814,6 +814,19 @@ void rgw_usage_log_entry::generate_test_instances(list<rgw_usage_log_entry *> &o
   o.push_back(new rgw_usage_log_entry);
 }
 
+std::string to_string(cls_rgw_reshard_initiator i) {
+  switch (i) {
+  case cls_rgw_reshard_initiator::Unknown:
+    return "unknown";
+  case cls_rgw_reshard_initiator::Admin:
+    return "administrator";
+  case cls_rgw_reshard_initiator::Dynamic:
+    return "dynamic resharding";
+  default:
+    return "error";
+  }
+}
+
 void cls_rgw_reshard_entry::generate_key(const string& tenant, const string& bucket_name, string *key)
 {
   *key = tenant + ":" + bucket_name;
@@ -827,12 +840,13 @@ void cls_rgw_reshard_entry::get_key(string *key) const
 void cls_rgw_reshard_entry::dump(Formatter *f) const
 {
   utime_t ut(time);
-  encode_json("time",ut, f);
+  encode_json("time", ut, f);
   encode_json("tenant", tenant, f);
   encode_json("bucket_name", bucket_name, f);
   encode_json("bucket_id", bucket_id, f);
   encode_json("old_num_shards", old_num_shards, f);
   encode_json("tentative_new_num_shards", new_num_shards, f);
+  encode_json("initiator", to_string(initiator), f);
 }
 
 void cls_rgw_reshard_entry::generate_test_instances(list<cls_rgw_reshard_entry*>& ls)
index e38e06c7ced9a9b2589b7a4d74b229ff7921df02..07f05bc5be420d8585c471944166d21f5f0deeb8 100644 (file)
@@ -1325,25 +1325,40 @@ struct cls_rgw_lc_entry {
 };
 WRITE_CLASS_ENCODER(cls_rgw_lc_entry);
 
+
+// used to track the initiator of a reshard entry on the reshard queue (log)
+enum class cls_rgw_reshard_initiator : uint8_t {
+  Unknown = 0,
+  Admin = 1,
+  Dynamic = 2,
+};
+std::string to_string(cls_rgw_reshard_initiator i);
+inline std::ostream& operator<<(std::ostream& out, cls_rgw_reshard_initiator i) {
+  return out << to_string(i);
+}
+
+
 struct cls_rgw_reshard_entry
 {
   ceph::real_time time;
   std::string tenant;
   std::string bucket_name;
   std::string bucket_id;
-  uint32_t old_num_shards{0};
-  uint32_t new_num_shards{0};
+  uint32_t old_num_shards {0};
+  uint32_t new_num_shards {0};
+  cls_rgw_reshard_initiator initiator {cls_rgw_reshard_initiator::Unknown};
 
   cls_rgw_reshard_entry() {}
 
   void encode(ceph::buffer::list& bl) const {
-    ENCODE_START(2, 1, bl);
+    ENCODE_START(3, 1, bl);
     encode(time, bl);
     encode(tenant, bl);
     encode(bucket_name, bl);
     encode(bucket_id, bl);
     encode(old_num_shards, bl);
     encode(new_num_shards, bl);
+    encode(initiator, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -1359,6 +1374,11 @@ struct cls_rgw_reshard_entry
     }
     decode(old_num_shards, bl);
     decode(new_num_shards, bl);
+    if (struct_v >= 3) {
+      decode(initiator, bl);
+    } else {
+      initiator = cls_rgw_reshard_initiator::Unknown;
+    }
     DECODE_FINISH(bl);
   }
 
index bbfedf8ca9a65bf77ea82f4dbaea33d481bbbb72..ec1bd23f59c895358a3aa35cae48d500e36fa06e 100644 (file)
@@ -10424,6 +10424,7 @@ int RGWRados::add_bucket_to_reshard(const DoutPrefixProvider *dpp,
   entry.bucket_id = bucket_info.bucket.bucket_id;
   entry.old_num_shards = num_source_shards;
   entry.new_num_shards = new_num_shards;
+  entry.initiator = cls_rgw_reshard_initiator::Dynamic;
 
   return reshard.add(dpp, entry, y);
 }
index d57821151fee69a9da20dde1120258c9a3d771e8..3d38aa29aa034489418607b3bcd0bb774d06f8aa 100644 (file)
@@ -1032,8 +1032,11 @@ int RGWBucketReshard::get_status(const DoutPrefixProvider *dpp, list<cls_rgw_buc
 int RGWBucketReshard::execute(int num_shards,
                               ReshardFaultInjector& fault,
                               int max_op_entries,
-                              const DoutPrefixProvider *dpp, optional_yield y,
-                              bool verbose, ostream *out,
+                             const cls_rgw_reshard_initiator initiator,
+                              const DoutPrefixProvider *dpp,
+                             optional_yield y,
+                              bool verbose,
+                             ostream *out,
                               Formatter *formatter,
                               RGWReshard* reshard_log)
 {
@@ -1046,7 +1049,7 @@ int RGWBucketReshard::execute(int num_shards,
   auto unlock = make_scope_guard([this] { reshard_lock.unlock(); });
 
   if (reshard_log) {
-    ret = reshard_log->update(dpp, bucket_info, y);
+    ret = reshard_log->update(dpp, bucket_info, initiator, y);
     if (ret < 0) {
       return ret;
     }
@@ -1134,19 +1137,23 @@ int RGWReshard::add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry,
 
   librados::ObjectWriteOperation op;
 
-  // if we're reducing, we don't want to overwrite an existing entry
-  // in order to not interfere with the reshard reduction wait period
-  const bool create_only = entry.new_num_shards < entry.old_num_shards;
+  // if this is dynamic resharding and we're reducing, we don't want
+  // to overwrite an existing entry in order to not interfere with the
+  // reshard reduction wait period
+  const bool create_only =
+    entry.initiator == cls_rgw_reshard_initiator::Dynamic &&
+    entry.new_num_shards < entry.old_num_shards;
 
   cls_rgw_reshard_add(op, entry, create_only);
 
   int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, y);
   if (create_only && ret == -EEXIST) {
-    ldpp_dout(dpp, 20) << "INFO: did not write reshard queue entry for oid=" <<
+    ldpp_dout(dpp, 20) <<
+      "INFO: did not write reshard queue entry for oid=" <<
       logshard_oid << " tenant=" << entry.tenant << " bucket=" <<
       entry.bucket_name <<
-      ", because it's a reshard reduction and an entry for that bucket already exists" <<
-      dendl;
+      ", because it's a dynamic reshard reduction and an entry for that "
+      "bucket already exists" << dendl;
     // this is not an error so just fall through
   } else if (ret < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed to add entry to reshard log, oid=" <<
@@ -1157,12 +1164,16 @@ int RGWReshard::add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry,
   return 0;
 }
 
-int RGWReshard::update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, optional_yield y)
+int RGWReshard::update(const DoutPrefixProvider *dpp,
+                      const RGWBucketInfo& bucket_info,
+                      const cls_rgw_reshard_initiator initiator,
+                      optional_yield y)
 {
   cls_rgw_reshard_entry entry;
   entry.bucket_name = bucket_info.bucket.name;
   entry.bucket_id = bucket_info.bucket.bucket_id;
   entry.tenant = bucket_info.bucket.tenant;
+  entry.initiator = initiator;
 
   int ret = get(dpp, entry);
   if (ret < 0) {
@@ -1171,7 +1182,7 @@ int RGWReshard::update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucke
 
   ret = add(dpp, entry, y);
   if (ret < 0) {
-    ldpp_dout(dpp, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
+    ldpp_dout(dpp, 0) << __func__ << ": Error in updating entry bucket " << entry.bucket_name << ": " <<
       cpp_strerror(-ret) << dendl;
   }
 
@@ -1354,9 +1365,12 @@ int RGWReshard::process_entry(const cls_rgw_reshard_entry& entry,
     }
   }
 
-  // if reshard reduction, perform extra sanity checks in part to
-  // prevent chasing constantly changing entry count
-  if (entry.new_num_shards < entry.old_num_shards) {
+  // if *dynamic* reshard reduction, perform extra sanity checks in
+  // part to prevent chasing constantly changing entry count. If
+  // *admin*-initiated (or unknown-initiated) reshard reduction, skip
+  // this step and proceed.
+  if (entry.initiator == cls_rgw_reshard_initiator::Dynamic &&
+      entry.new_num_shards < entry.old_num_shards) {
     const bool may_reduce =
       store->ctx()->_conf.get_val<bool>("rgw_dynamic_resharding_may_reduce");
     if (! may_reduce) {
@@ -1446,8 +1460,8 @@ int RGWReshard::process_entry(const cls_rgw_reshard_entry& entry,
   RGWBucketReshard br(store, bucket_info, bucket_attrs, nullptr);
 
   ReshardFaultInjector f; // no fault injected
-  ret = br.execute(entry.new_num_shards, f, max_entries, dpp, y,
-                   false, nullptr, nullptr, this);
+  ret = br.execute(entry.new_num_shards, f, max_entries, entry.initiator,
+                  dpp, y, false, nullptr, nullptr, this);
   if (ret < 0) {
     ldpp_dout(dpp, 0) <<  __func__ <<
         ": Error during resharding bucket " << entry.bucket_name << ":" <<
@@ -1491,7 +1505,7 @@ int RGWReshard::process_single_logshard(int logshard_num, const DoutPrefixProvid
       continue;
     }
 
-    for(auto& entry: entries) { // logshard entries
+    for(auto& entry : entries) { // logshard entries
       process_entry(entry, max_entries, dpp, y);
 
       Clock::time_point now = Clock::now();
index 0ff01308a64b3e12534033e6fc056a633bb0ff00..818bf2164955e1cc5a1a026cff746a2e94b74247 100644 (file)
@@ -100,7 +100,8 @@ public:
                   const std::map<std::string, bufferlist>& _bucket_attrs,
                   RGWBucketReshardLock* _outer_reshard_lock);
   int execute(int num_shards, ReshardFaultInjector& f,
-              int max_op_entries, const DoutPrefixProvider *dpp, optional_yield y,
+              int max_op_entries, const cls_rgw_reshard_initiator initiator,
+             const DoutPrefixProvider *dpp, optional_yield y,
               bool verbose = false, std::ostream *out = nullptr,
               ceph::Formatter *formatter = nullptr,
              RGWReshard *reshard_log = nullptr);
@@ -222,7 +223,7 @@ protected:
 public:
   RGWReshard(rgw::sal::RadosStore* _store, bool _verbose = false, std::ostream *_out = nullptr, Formatter *_formatter = nullptr);
   int add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry, optional_yield y);
-  int update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, optional_yield y);
+  int update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const cls_rgw_reshard_initiator initiator, optional_yield y);
   int get(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry);
   int remove(const DoutPrefixProvider *dpp, const cls_rgw_reshard_entry& entry, optional_yield y);
   int list(const DoutPrefixProvider *dpp, int logshard_num, std::string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
index 61f5477d2d5ba9f59a6469b37fb67738038e5189..9ef82752bb36ec06a1f8996f2f5fb24e4e7c5f78 100644 (file)
@@ -8346,7 +8346,9 @@ next:
     } else if (inject_delay_at) {
       fault.inject(*inject_delay_at, InjectDelay{inject_delay, dpp()});
     }
-    ret = br.execute(num_shards, fault, max_entries, dpp(), null_yield,
+    ret = br.execute(num_shards, fault, max_entries,
+                    cls_rgw_reshard_initiator::Admin,
+                    dpp(), null_yield,
                      verbose, &cout, formatter.get());
     return -ret;
   }
@@ -8374,6 +8376,7 @@ next:
     entry.bucket_id = bucket->get_info().bucket.bucket_id;
     entry.old_num_shards = num_source_shards;
     entry.new_num_shards = num_shards;
+    entry.initiator = cls_rgw_reshard_initiator::Admin;
 
     return reshard.add(dpp(), entry, null_yield);
   }