o.push_back(new rgw_usage_log_entry);
}
+// Returns the human-readable label for a reshard initiator; this is the
+// text dump() publishes under "initiator" and the stream operator prints.
+// A value that matches none of the enumerators yields "error".
+std::string to_string(cls_rgw_reshard_initiator i) {
+  switch (i) {
+  case cls_rgw_reshard_initiator::Unknown:
+    return "unknown";
+  case cls_rgw_reshard_initiator::Admin:
+    return "administrator";
+  case cls_rgw_reshard_initiator::Dynamic:
+    return "dynamic resharding";
+  }
+  // Deliberately no `default:` label: with every enumerator listed, the
+  // compiler's switch-coverage warning fires if a new enumerator is added
+  // without a label here. Values matching no case still land on "error".
+  return "error";
+}
+
void cls_rgw_reshard_entry::generate_key(const string& tenant, const string& bucket_name, string *key)
{
*key = tenant + ":" + bucket_name;
void cls_rgw_reshard_entry::dump(Formatter *f) const
{
utime_t ut(time);
- encode_json("time",ut, f);
+ encode_json("time", ut, f); // `time` is emitted through the utime_t copy built above
encode_json("tenant", tenant, f);
encode_json("bucket_name", bucket_name, f);
encode_json("bucket_id", bucket_id, f);
encode_json("old_num_shards", old_num_shards, f);
encode_json("tentative_new_num_shards", new_num_shards, f);
+ encode_json("initiator", to_string(initiator), f); // written as its to_string() label, not the numeric enum value
}
void cls_rgw_reshard_entry::generate_test_instances(list<cls_rgw_reshard_entry*>& ls)
};
WRITE_CLASS_ENCODER(cls_rgw_lc_entry);
+
+// used to track the initiator of a reshard entry on the reshard queue (log)
+enum class cls_rgw_reshard_initiator : uint8_t { // encoded into cls_rgw_reshard_entry, so presumably must not be renumbered -- TODO confirm
+ Unknown = 0, // default member value; also what decode() assigns when struct_v < 3
+ Admin = 1, // administrator-requested reshard; not subject to the dynamic-reduction checks
+ Dynamic = 2, // dynamic resharding; shard reductions are queued create-only and get extra checks
+};
+std::string to_string(cls_rgw_reshard_initiator i); // human-readable label for an initiator
+// Stream insertion for an initiator: writes the to_string() label and
+// hands the stream back so insertions can be chained.
+inline std::ostream& operator<<(std::ostream& out, cls_rgw_reshard_initiator i) {
+  out << to_string(i);
+  return out;
+}
+
+
struct cls_rgw_reshard_entry
{
ceph::real_time time;
std::string tenant;
std::string bucket_name;
std::string bucket_id;
- uint32_t old_num_shards{0};
- uint32_t new_num_shards{0};
+ uint32_t old_num_shards {0};
+ uint32_t new_num_shards {0};
+ cls_rgw_reshard_initiator initiator {cls_rgw_reshard_initiator::Unknown};
cls_rgw_reshard_entry() {}
void encode(ceph::buffer::list& bl) const {
- ENCODE_START(2, 1, bl);
+ ENCODE_START(3, 1, bl); // first argument raised from 2 to 3 for the appended initiator field
encode(time, bl);
encode(tenant, bl);
encode(bucket_name, bl);
encode(bucket_id, bl);
encode(old_num_shards, bl);
encode(new_num_shards, bl);
+ encode(initiator, bl); // appended after the pre-existing fields; decode() reads it only when struct_v >= 3
ENCODE_FINISH(bl);
}
}
decode(old_num_shards, bl);
decode(new_num_shards, bl);
+ if (struct_v >= 3) {
+ decode(initiator, bl);
+ } else {
+ initiator = cls_rgw_reshard_initiator::Unknown;
+ }
DECODE_FINISH(bl);
}
entry.bucket_id = bucket_info.bucket.bucket_id;
entry.old_num_shards = num_source_shards;
entry.new_num_shards = new_num_shards;
+ entry.initiator = cls_rgw_reshard_initiator::Dynamic;
return reshard.add(dpp, entry, y);
}
int RGWBucketReshard::execute(int num_shards,
ReshardFaultInjector& fault,
int max_op_entries,
- const DoutPrefixProvider *dpp, optional_yield y,
- bool verbose, ostream *out,
+ const cls_rgw_reshard_initiator initiator,
+ const DoutPrefixProvider *dpp,
+ optional_yield y,
+ bool verbose,
+ ostream *out,
Formatter *formatter,
RGWReshard* reshard_log)
{
auto unlock = make_scope_guard([this] { reshard_lock.unlock(); });
if (reshard_log) {
- ret = reshard_log->update(dpp, bucket_info, y);
+ ret = reshard_log->update(dpp, bucket_info, initiator, y);
if (ret < 0) {
return ret;
}
librados::ObjectWriteOperation op;
- // if we're reducing, we don't want to overwrite an existing entry
- // in order to not interfere with the reshard reduction wait period
- const bool create_only = entry.new_num_shards < entry.old_num_shards;
+ // if this is dynamic resharding and we're reducing, we don't want
+ // to overwrite an existing entry in order to not interfere with the
+ // reshard reduction wait period
+ const bool create_only =
+ entry.initiator == cls_rgw_reshard_initiator::Dynamic &&
+ entry.new_num_shards < entry.old_num_shards;
cls_rgw_reshard_add(op, entry, create_only);
int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, y);
if (create_only && ret == -EEXIST) {
- ldpp_dout(dpp, 20) << "INFO: did not write reshard queue entry for oid=" <<
+ ldpp_dout(dpp, 20) <<
+ "INFO: did not write reshard queue entry for oid=" <<
logshard_oid << " tenant=" << entry.tenant << " bucket=" <<
entry.bucket_name <<
- ", because it's a reshard reduction and an entry for that bucket already exists" <<
- dendl;
+ ", because it's a dynamic reshard reduction and an entry for that "
+ "bucket already exists" << dendl;
// this is not an error so just fall through
} else if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed to add entry to reshard log, oid=" <<
return 0;
}
-int RGWReshard::update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, optional_yield y)
+int RGWReshard::update(const DoutPrefixProvider *dpp,
+ const RGWBucketInfo& bucket_info,
+ const cls_rgw_reshard_initiator initiator,
+ optional_yield y)
{
cls_rgw_reshard_entry entry;
entry.bucket_name = bucket_info.bucket.name;
entry.bucket_id = bucket_info.bucket.bucket_id;
entry.tenant = bucket_info.bucket.tenant;
+ entry.initiator = initiator;
int ret = get(dpp, entry);
if (ret < 0) {
ret = add(dpp, entry, y);
if (ret < 0) {
- ldpp_dout(dpp, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
+ ldpp_dout(dpp, 0) << __func__ << ": Error in updating entry bucket " << entry.bucket_name << ": " <<
cpp_strerror(-ret) << dendl;
}
}
}
- // if reshard reduction, perform extra sanity checks in part to
- // prevent chasing constantly changing entry count
- if (entry.new_num_shards < entry.old_num_shards) {
+ // if *dynamic* reshard reduction, perform extra sanity checks in
+ // part to prevent chasing constantly changing entry count. If
+ // *admin*-initiated (or unknown-initiated) reshard reduction, skip
+ // this step and proceed.
+ if (entry.initiator == cls_rgw_reshard_initiator::Dynamic &&
+ entry.new_num_shards < entry.old_num_shards) {
const bool may_reduce =
store->ctx()->_conf.get_val<bool>("rgw_dynamic_resharding_may_reduce");
if (! may_reduce) {
RGWBucketReshard br(store, bucket_info, bucket_attrs, nullptr);
ReshardFaultInjector f; // no fault injected
- ret = br.execute(entry.new_num_shards, f, max_entries, dpp, y,
- false, nullptr, nullptr, this);
+ ret = br.execute(entry.new_num_shards, f, max_entries, entry.initiator,
+ dpp, y, false, nullptr, nullptr, this);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ <<
": Error during resharding bucket " << entry.bucket_name << ":" <<
continue;
}
- for(auto& entry: entries) { // logshard entries
+ for(auto& entry : entries) { // logshard entries
process_entry(entry, max_entries, dpp, y);
Clock::time_point now = Clock::now();
const std::map<std::string, bufferlist>& _bucket_attrs,
RGWBucketReshardLock* _outer_reshard_lock);
int execute(int num_shards, ReshardFaultInjector& f,
- int max_op_entries, const DoutPrefixProvider *dpp, optional_yield y,
+ int max_op_entries, const cls_rgw_reshard_initiator initiator, // initiator: who requested the reshard; the definition forwards it to reshard_log->update() when a reshard log is given
+ const DoutPrefixProvider *dpp, optional_yield y,
bool verbose = false, std::ostream *out = nullptr,
ceph::Formatter *formatter = nullptr,
RGWReshard *reshard_log = nullptr);
public:
RGWReshard(rgw::sal::RadosStore* _store, bool _verbose = false, std::ostream *_out = nullptr, Formatter *_formatter = nullptr);
int add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry, optional_yield y);
- int update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, optional_yield y);
+ int update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const cls_rgw_reshard_initiator initiator, optional_yield y); // NOTE(review): the definition sets entry.initiator before calling get(); confirm get() does not overwrite it with the stored value
int get(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry);
int remove(const DoutPrefixProvider *dpp, const cls_rgw_reshard_entry& entry, optional_yield y);
int list(const DoutPrefixProvider *dpp, int logshard_num, std::string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
} else if (inject_delay_at) {
fault.inject(*inject_delay_at, InjectDelay{inject_delay, dpp()});
}
- ret = br.execute(num_shards, fault, max_entries, dpp(), null_yield,
+ ret = br.execute(num_shards, fault, max_entries,
+ cls_rgw_reshard_initiator::Admin,
+ dpp(), null_yield,
verbose, &cout, formatter.get());
return -ret;
}
entry.bucket_id = bucket->get_info().bucket.bucket_id;
entry.old_num_shards = num_source_shards;
entry.new_num_shards = num_shards;
+ entry.initiator = cls_rgw_reshard_initiator::Admin;
return reshard.add(dpp(), entry, null_yield);
}