From: J. Eric Ivancich <ivancich@redhat.com>
Date: Wed, 29 May 2024 18:19:25 +0000 (-0400)
Subject: rgw: track initiator of reshard queue entries
X-Git-Tag: testing/wip-vshankar-testing-20240604.051731-debug~12^2
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=9302fbb3f5416871c1978af5d45f3bf568c2c190;p=ceph-ci.git

rgw: track initiator of reshard queue entries

The logic for managing the reshard queue (log) can vary depending on
whether the entry was added by an admin or by dynamic resharding. For
example, if it's a reshard reduction, dynamic resharding won't
overwrite the queue entry so as not to disrupt the reduction wait
period. On the other hand, an admin should be able to overwrite the
entry at will. So we now track the initiator of each entry on the
queue. This adds another field to the at-rest data structure and
updates the logic to make use of it.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
---

diff --git a/src/cls/rgw/cls_rgw_types.cc b/src/cls/rgw/cls_rgw_types.cc
index 05d48d734fc..ccf92c3765f 100644
--- a/src/cls/rgw/cls_rgw_types.cc
+++ b/src/cls/rgw/cls_rgw_types.cc
@@ -814,6 +814,19 @@ void rgw_usage_log_entry::generate_test_instances(list<rgw_usage_log_entry*>& o
   o.push_back(new rgw_usage_log_entry);
 }
 
+std::string to_string(cls_rgw_reshard_initiator i) {
+  switch (i) {
+  case cls_rgw_reshard_initiator::Unknown:
+    return "unknown";
+  case cls_rgw_reshard_initiator::Admin:
+    return "administrator";
+  case cls_rgw_reshard_initiator::Dynamic:
+    return "dynamic resharding";
+  default:
+    return "error";
+  }
+}
+
 void cls_rgw_reshard_entry::generate_key(const string& tenant, const string& bucket_name, string *key)
 {
   *key = tenant + ":" + bucket_name;
@@ -827,12 +840,13 @@ void cls_rgw_reshard_entry::get_key(string *key) const
 void cls_rgw_reshard_entry::dump(Formatter *f) const
 {
   utime_t ut(time);
-  encode_json("time",ut, f);
+  encode_json("time", ut, f);
   encode_json("tenant", tenant, f);
   encode_json("bucket_name", bucket_name, f);
   encode_json("bucket_id", bucket_id, f);
   encode_json("old_num_shards", old_num_shards, f);
   encode_json("tentative_new_num_shards", new_num_shards, f);
+  encode_json("initiator", to_string(initiator), f);
 }
 
 void cls_rgw_reshard_entry::generate_test_instances(list<cls_rgw_reshard_entry*>& ls)
diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h
index e38e06c7ced..07f05bc5be4 100644
--- a/src/cls/rgw/cls_rgw_types.h
+++ b/src/cls/rgw/cls_rgw_types.h
@@ -1325,25 +1325,40 @@ struct cls_rgw_lc_entry {
 };
 WRITE_CLASS_ENCODER(cls_rgw_lc_entry);
 
+
+// used to track the initiator of a reshard entry on the reshard queue (log)
+enum class cls_rgw_reshard_initiator : uint8_t {
+  Unknown = 0,
+  Admin = 1,
+  Dynamic = 2,
+};
+std::string to_string(cls_rgw_reshard_initiator i);
+inline std::ostream& operator<<(std::ostream& out, cls_rgw_reshard_initiator i) {
+  return out << to_string(i);
+}
+
+
 struct cls_rgw_reshard_entry
 {
   ceph::real_time time;
   std::string tenant;
   std::string bucket_name;
   std::string bucket_id;
-  uint32_t old_num_shards{0};
-  uint32_t new_num_shards{0};
+  uint32_t old_num_shards {0};
+  uint32_t new_num_shards {0};
+  cls_rgw_reshard_initiator initiator {cls_rgw_reshard_initiator::Unknown};
 
   cls_rgw_reshard_entry() {}
 
   void encode(ceph::buffer::list& bl) const {
-    ENCODE_START(2, 1, bl);
+    ENCODE_START(3, 1, bl);
     encode(time, bl);
     encode(tenant, bl);
     encode(bucket_name, bl);
     encode(bucket_id, bl);
     encode(old_num_shards, bl);
     encode(new_num_shards, bl);
+    encode(initiator, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -1359,6 +1374,11 @@ struct cls_rgw_reshard_entry
     }
     decode(old_num_shards, bl);
     decode(new_num_shards, bl);
+    if (struct_v >= 3) {
+      decode(initiator, bl);
+    } else {
+      initiator = cls_rgw_reshard_initiator::Unknown;
+    }
     DECODE_FINISH(bl);
   }
 
diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc
index bbfedf8ca9a..ec1bd23f59c 100644
--- a/src/rgw/driver/rados/rgw_rados.cc
+++ b/src/rgw/driver/rados/rgw_rados.cc
@@ -10424,6 +10424,7 @@ int RGWRados::add_bucket_to_reshard(const DoutPrefixProvider *dpp,
   entry.bucket_id = bucket_info.bucket.bucket_id;
   entry.old_num_shards = num_source_shards;
   entry.new_num_shards = new_num_shards;
+  entry.initiator = cls_rgw_reshard_initiator::Dynamic;
 
   return reshard.add(dpp, entry, y);
 }
diff --git a/src/rgw/driver/rados/rgw_reshard.cc b/src/rgw/driver/rados/rgw_reshard.cc
index d57821151fe..3d38aa29aa0 100644
--- a/src/rgw/driver/rados/rgw_reshard.cc
+++ b/src/rgw/driver/rados/rgw_reshard.cc
@@ -1032,8 +1032,11 @@ int RGWBucketReshard::get_status(const DoutPrefixProvider *dpp, list<cls_rgw_bu
-    ret = reshard_log->update(dpp, bucket_info, y);
+    ret = reshard_log->update(dpp, bucket_info, initiator, y);
     if (ret < 0) {
       return ret;
     }
@@ -1134,19 +1137,23 @@ int RGWReshard::add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry,
 
   librados::ObjectWriteOperation op;
 
-  // if we're reducing, we don't want to overwrite an existing entry
-  // in order to not interfere with the reshard reduction wait period
-  const bool create_only = entry.new_num_shards < entry.old_num_shards;
+  // if this is dynamic resharding and we're reducing, we don't want
+  // to overwrite an existing entry in order to not interfere with the
+  // reshard reduction wait period
+  const bool create_only =
+    entry.initiator == cls_rgw_reshard_initiator::Dynamic &&
+    entry.new_num_shards < entry.old_num_shards;
   cls_rgw_reshard_add(op, entry, create_only);
 
   int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx,
                               logshard_oid, &op, y);
   if (create_only && ret == -EEXIST) {
-    ldpp_dout(dpp, 20) << "INFO: did not write reshard queue entry for oid=" <<
+    ldpp_dout(dpp, 20) <<
+      "INFO: did not write reshard queue entry for oid=" <<
       logshard_oid << " tenant=" << entry.tenant << " bucket=" <<
       entry.bucket_name <<
-      ", because it's a reshard reduction and an entry for that bucket already exists" <<
-      dendl;
+      ", because it's a dynamic reshard reduction and an entry for that "
+      "bucket already exists" << dendl;
     // this is not an error so just fall through
   } else if (ret < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed to add entry to reshard log, oid=" <<
@@ -1157,12 +1164,16 @@ int RGWReshard::add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry,
   return 0;
 }
 
-int RGWReshard::update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, optional_yield y)
+int RGWReshard::update(const DoutPrefixProvider *dpp,
+                       const RGWBucketInfo& bucket_info,
+                       const cls_rgw_reshard_initiator initiator,
+                       optional_yield y)
 {
   cls_rgw_reshard_entry entry;
   entry.bucket_name = bucket_info.bucket.name;
   entry.bucket_id = bucket_info.bucket.bucket_id;
   entry.tenant = bucket_info.bucket.tenant;
+  entry.initiator = initiator;
 
   int ret = get(dpp, entry);
   if (ret < 0) {
@@ -1171,7 +1182,7 @@ int RGWReshard::update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucke
 
   ret = add(dpp, entry, y);
   if (ret < 0) {
-    ldpp_dout(dpp, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
+    ldpp_dout(dpp, 0) << __func__ << ": Error in updating entry bucket " << entry.bucket_name << ": " <<
       cpp_strerror(-ret) << dendl;
   }
 
@@ -1354,9 +1365,12 @@ int RGWReshard::process_entry(const cls_rgw_reshard_entry& entry,
     }
   }
 
-  // if reshard reduction, perform extra sanity checks in part to
-  // prevent chasing constantly changing entry count
-  if (entry.new_num_shards < entry.old_num_shards) {
+  // if *dynamic* reshard reduction, perform extra sanity checks in
+  // part to prevent chasing constantly changing entry count. If
+  // *admin*-initiated (or unknown-initiated) reshard reduction, skip
+  // this step and proceed.
+  if (entry.initiator == cls_rgw_reshard_initiator::Dynamic &&
+      entry.new_num_shards < entry.old_num_shards) {
     const bool may_reduce =
       store->ctx()->_conf.get_val<bool>("rgw_dynamic_resharding_may_reduce");
     if (! may_reduce) {
@@ -1446,8 +1460,8 @@ int RGWReshard::process_entry(const cls_rgw_reshard_entry& entry,
   RGWBucketReshard br(store, bucket_info, bucket_attrs, nullptr);
 
   ReshardFaultInjector f; // no fault injected
-  ret = br.execute(entry.new_num_shards, f, max_entries, dpp, y,
-                   false, nullptr, nullptr, this);
+  ret = br.execute(entry.new_num_shards, f, max_entries, entry.initiator,
+                   dpp, y, false, nullptr, nullptr, this);
   if (ret < 0) {
     ldpp_dout(dpp, 0) << __func__ << ": Error during resharding bucket " << entry.bucket_name << ":" <<
@@ -1491,7 +1505,7 @@ int RGWReshard::process_single_logshard(int logshard_num, const DoutPrefixProvid
       continue;
     }
 
-    for(auto& entry: entries) { // logshard entries
+    for(auto& entry : entries) { // logshard entries
       process_entry(entry, max_entries, dpp, y);
 
       Clock::time_point now = Clock::now();
diff --git a/src/rgw/driver/rados/rgw_reshard.h b/src/rgw/driver/rados/rgw_reshard.h
index 0ff01308a64..818bf216495 100644
--- a/src/rgw/driver/rados/rgw_reshard.h
+++ b/src/rgw/driver/rados/rgw_reshard.h
@@ -100,7 +100,8 @@ public:
                    const std::map<std::string, bufferlist>& _bucket_attrs,
                    RGWBucketReshardLock* _outer_reshard_lock);
   int execute(int num_shards, ReshardFaultInjector& f,
-              int max_op_entries, const DoutPrefixProvider *dpp, optional_yield y,
+              int max_op_entries, const cls_rgw_reshard_initiator initiator,
+              const DoutPrefixProvider *dpp, optional_yield y,
               bool verbose = false, std::ostream *out = nullptr,
               ceph::Formatter *formatter = nullptr,
               RGWReshard *reshard_log = nullptr);
@@ -222,7 +223,7 @@ protected:
 public:
   RGWReshard(rgw::sal::RadosStore* _store, bool _verbose = false, std::ostream *_out = nullptr, Formatter *_formatter = nullptr);
   int add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry, optional_yield y);
-  int update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, optional_yield y);
+  int update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const cls_rgw_reshard_initiator initiator, optional_yield y);
   int get(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry);
   int remove(const DoutPrefixProvider *dpp, const cls_rgw_reshard_entry& entry, optional_yield y);
   int list(const DoutPrefixProvider *dpp, int logshard_num, std::string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index 61f5477d2d5..9ef82752bb3 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -8346,7 +8346,9 @@ next:
     } else if (inject_delay_at) {
       fault.inject(*inject_delay_at, InjectDelay{inject_delay, dpp()});
     }
-    ret = br.execute(num_shards, fault, max_entries, dpp(), null_yield,
+    ret = br.execute(num_shards, fault, max_entries,
+                     cls_rgw_reshard_initiator::Admin,
+                     dpp(), null_yield,
                      verbose, &cout, formatter.get());
     return -ret;
   }
@@ -8374,6 +8376,7 @@ next:
     entry.bucket_id = bucket->get_info().bucket.bucket_id;
     entry.old_num_shards = num_source_shards;
    entry.new_num_shards = num_shards;
+    entry.initiator = cls_rgw_reshard_initiator::Admin;
 
     return reshard.add(dpp(), entry, null_yield);
  }