log.debug('TEST: reshard bucket with abort at do_reshard\n')
test_bucket_reshard(connection, 'abort-at-do-reshard', abort_at='do_reshard')
+ log.debug('TEST: reshard bucket with EIO injected at logrecord_writes\n')
+ test_bucket_reshard(connection, 'error-at-logrecord-writes', error_at='logrecord_writes')
+ log.debug('TEST: reshard bucket with abort at logrecord_writes\n')
+ test_bucket_reshard(connection, 'abort-at-logrecord-writes', abort_at='logrecord_writes')
+
+ log.debug('TEST: reshard bucket with EIO injected at change_reshard_state\n')
+ test_bucket_reshard(connection, 'error-at-change-reshard-state', error_at='change_reshard_state')
+ log.debug('TEST: reshard bucket with ECANCELED injected at change_reshard_state\n')
+ test_bucket_reshard(connection, 'error-at-change-reshard-state', error_at='change_reshard_state', error_code=errno.ECANCELED)
+ log.debug('TEST: reshard bucket with abort at change_reshard_state\n')
+ test_bucket_reshard(connection, 'abort-at-change-reshard-state', abort_at='change_reshard_state')
+
# TESTCASE 'versioning-reshard','bucket','reshard','versioning reshard','succeeds'
log.debug(' test: reshard versioned bucket')
num_shards_expected = get_bucket_stats(VER_BUCKET_NAME).num_shards + 1
return first > second;
}
+/**
+ * return: Plain, Instance, OLH or Invalid
+ */
+BIIndexType bi_type(const string& s, const string& prefix)
+{
+ int ret = bi_entry_type(s.substr(prefix.size()));
+ if (ret < 0) {
+ return BIIndexType::Invalid;
+ } else if (ret == 0) {
+ return BIIndexType::Plain;
+ }
+ return (BIIndexType)ret;
+}
+
static void get_time_key(real_time& ut, string *key)
{
char buf[32];
rgw_cls_bi_entry& entry = op.entry;
- int r = cls_cxx_map_set_val(hctx, entry.idx, &entry.data);
- if (r < 0) {
- CLS_LOG(0, "ERROR: %s: cls_cxx_map_set_val() returned r=%d", __func__, r);
+ if (entry.type == BIIndexType::ReshardDeleted) {
+ int r = cls_cxx_map_remove_key(hctx, entry.idx);
+ if (r < 0) {
+ CLS_LOG(0, "ERROR: %s: cls_cxx_map_remove_key() returned r=%d", __func__, r);
+ return r;
+ }
+ } else {
+ int r = cls_cxx_map_set_val(hctx, entry.idx, &entry.data);
+ if (r < 0) {
+ CLS_LOG(0, "ERROR: %s: cls_cxx_map_set_val() returned r=%d", __func__, r);
+ }
}
return 0;
}
-
/* The plain entries in the bucket index are divided into two regions
* divided by the special entries that begin with 0x80. Those below
* ("Low") are ascii entries. Those above ("High") bring in unicode
return count;
}
+static int reshard_log_list_entries(cls_method_context_t hctx, const string& marker,
+ uint32_t max, list<rgw_cls_bi_entry>& entries, bool *truncated)
+{
+ string start_key, end_key;
+ start_key = BI_PREFIX_CHAR;
+ start_key.append(bucket_index_prefixes[BI_BUCKET_RESHARD_LOG_INDEX]);
+
+ string bi_type_marker = start_key;
+
+ end_key = BI_PREFIX_CHAR;
+ end_key.append(bucket_index_prefixes[BI_BUCKET_RESHARD_LOG_INDEX + 1]);
+
+ if (!marker.empty()) {
+ start_key.append(marker);
+ }
+
+ map<string, bufferlist> keys;
+ int ret = cls_cxx_map_get_vals(hctx, start_key, string(), max, &keys, truncated);
+ CLS_LOG(20, "%s(): start_key=%s keys.size()=%d", __func__, escape_str(start_key).c_str(), (int)keys.size());
+ if (ret < 0) {
+ return ret;
+ }
+
+ map<string, bufferlist>::iterator iter;
+ for (iter = keys.begin(); iter != keys.end(); ++iter) {
+ if (iter->first.compare(end_key) >= 0) {
+ if (truncated) {
+ *truncated = false;
+ }
+ return 0;
+ }
+
+ rgw_cls_bi_entry entry;
+ auto biter = iter->second.cbegin();
+ try {
+ decode(entry, biter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(0, "ERROR: %s: failed to decode buffer for rgw_cls_bi_entry \"%s\"",
+ __func__, escape_str(iter->first).c_str());
+ return -EIO;
+ }
+ if (entry.type != BIIndexType::ReshardDeleted)
+ entry.type = bi_type(iter->first, bi_type_marker);
+
+ CLS_LOG(20, "reshard_log_list_entries key=%s bl.length=%d\n", entry.idx.c_str(), (int)iter->second.length());
+
+ entries.push_back(entry);
+ }
+ return 0;
+}
+
static int check_index(cls_method_context_t hctx,
rgw_bucket_dir_header *existing_header,
rgw_bucket_dir_header *calc_header)
}
-/* Lists all the entries that appear in a bucket index listing.
+/* Lists all the entries that appear in a bucket index listing,
+ * or list all the entries in reshardlog namespace.
*
* It may not be obvious why this function calls three other "segment"
* functions (list_plain_entries (twice), list_instance_entries,
constexpr uint32_t MAX_BI_LIST_ENTRIES = 1000;
const uint32_t max = std::min(op.max, MAX_BI_LIST_ENTRIES);
- CLS_LOG(20, "%s: op.marker=\"%s\", op.name_filter=\"%s\", op.max=%u max=%u",
+ CLS_LOG(20, "%s: op.marker=\"%s\", op.name_filter=\"%s\", op.max=%u max=%u, op.reshardlog=%d",
__func__, escape_str(op.marker).c_str(), escape_str(op.name_filter).c_str(),
- op.max, max);
+ op.max, max, op.reshardlog);
int ret;
uint32_t count = 0;
bool more = false;
rgw_cls_bi_list_ret op_ret;
+ if (op.reshardlog) {
+ ret = reshard_log_list_entries(hctx, op.marker, op.max, op_ret.entries, &op_ret.is_truncated);
+ if (ret < 0)
+ return ret;
+ CLS_LOG(20, "%s: returning %lu entries, is_truncated=%d", __func__, op_ret.entries.size(), op_ret.is_truncated);
+ encode(op_ret, *out);
+ return 0;
+ }
+
ret = list_plain_entries(hctx, op.name_filter, op.marker, max,
&op_ret.entries, &more, PlainEntriesRegion::Low);
if (ret < 0) {
return write_bucket_header(hctx, &header);
}
-
static void usage_record_prefix_by_time(uint64_t epoch, string& key)
{
char buf[32];
return rc;
}
- if (header.resharding()) {
+ if (header.resharding_in_progress()) {
return op.ret_err;
}
return 0;
}
+
int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, const rgw_cls_bi_entry& entry)
{
bufferlist in, out;
*/
int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
const std::string& name_filter, const std::string& marker, uint32_t max,
- std::list<rgw_cls_bi_entry> *entries, bool *is_truncated)
+ std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog)
{
bufferlist in, out;
rgw_cls_bi_list_op call;
call.name_filter = name_filter;
call.marker = marker;
call.max = max;
+ call.reshardlog = reshardlog;
encode(call, in);
int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_LIST, in, out);
if (r < 0)
void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const std::string oid, const rgw_cls_bi_entry& entry);
int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
const std::string& name, const std::string& marker, uint32_t max,
- std::list<rgw_cls_bi_entry> *entries, bool *is_truncated);
-
+ std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog = false);
void cls_rgw_bucket_link_olh(librados::ObjectWriteOperation& op,
const cls_rgw_obj_key& key, const ceph::buffer::list& olh_tag,
uint32_t max;
std::string name_filter; // limit result to one object and its instances
std::string marker;
+ bool reshardlog;
- rgw_cls_bi_list_op() : max(0) {}
+ rgw_cls_bi_list_op() : max(0), reshardlog(false) {}
void encode(ceph::buffer::list& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(max, bl);
encode(name_filter, bl);
encode(marker, bl);
+ encode(reshardlog, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(max, bl);
decode(name_filter, bl);
decode(marker, bl);
+ if (struct_v >= 2) {
+ decode(reshardlog, bl);
+ }
DECODE_FINISH(bl);
}
f->dump_unsigned("max", max);
f->dump_string("name_filter", name_filter);
f->dump_string("marker", marker);
+ f->dump_bool("reshardlog", reshardlog);
}
static void generate_test_instances(std::list<rgw_cls_bi_list_op*>& o) {
o.back()->max = 100;
o.back()->name_filter = "name_filter";
o.back()->marker = "marker";
+ o.back()->reshardlog = true;
}
};
WRITE_CLASS_ENCODER(rgw_cls_bi_list_op)
case cls_rgw_reshard_status::NOT_RESHARDING:
out << "NOT_RESHARDING";
break;
+ case cls_rgw_reshard_status::IN_LOGRECORD:
+ out << "IN_LOGRECORD";
+ break;
case cls_rgw_reshard_status::IN_PROGRESS:
out << "IN_PROGRESS";
break;
do {
entries_read.clear();
ret = store->bi_list(bs, "", marker, -1,
- &entries_read, &is_truncated, y);
+ &entries_read, &is_truncated, false, y);
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) <<
dendl;
*count_out = 0;
do {
entries.clear();
- ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated, y);
+ ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated, false, y);
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl;
break;
*count_out = 0;
do {
entries.clear();
- ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated, y);
+ ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated, false, y);
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl;
break;
int RGWRados::bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket,
const string& obj_name_filter, const string& marker, uint32_t max,
- list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y)
+ list<rgw_cls_bi_entry> *entries, bool *is_truncated,
+ bool reshardlog, optional_yield y)
{
rgw_obj obj(bucket, obj_name_filter);
BucketShard bs(this);
}
auto& ref = bs.bucket_obj;
- ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated);
+ ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated, reshardlog);
if (ret == -ENOENT) {
*is_truncated = false;
}
}
int RGWRados::bi_list(BucketShard& bs, const string& obj_name_filter, const string& marker, uint32_t max,
- list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y)
+ list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog, optional_yield y)
{
auto& ref = bs.bucket_obj;
- int ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated);
+ int ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated, reshardlog);
if (ret < 0)
return ret;
int RGWRados::bi_list(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info, int shard_id, const string& obj_name_filter, const string& marker, uint32_t max,
- list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y)
+ list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog, optional_yield y)
{
BucketShard bs(this);
int ret = bs.init(dpp, bucket_info,
return ret;
}
- return bi_list(bs, obj_name_filter, marker, max, entries, is_truncated, y);
+ return bi_list(bs, obj_name_filter, marker, max, entries, is_truncated, reshardlog, y);
}
int RGWRados::bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs)
const std::string& marker,
uint32_t max,
std::list<rgw_cls_bi_entry> *entries,
- bool *is_truncated, optional_yield y);
- int bi_list(BucketShard& bs, const std::string& filter_obj, const std::string& marker, uint32_t max, std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y);
+ bool *is_truncated, bool reshardlog, optional_yield y);
+ int bi_list(BucketShard& bs, const std::string& filter_obj, const std::string& marker, uint32_t max, std::list<rgw_cls_bi_entry> *entries,
+ bool *is_truncated, bool reshardlog, optional_yield y);
int bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket, const std::string& obj_name, const std::string& marker, uint32_t max,
- std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y);
+ std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog, optional_yield y);
int bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs);
int cls_obj_usage_log_add(const DoutPrefixProvider *dpp, const std::string& oid, rgw_usage_log_info& info, optional_yield y);
": shard->wait_all_aio() returned ret=" << ret << dendl;
}
}
+ target_shards.clear();
}
int add_entry(int shard_index,
ret = r;
}
}
- target_shards.clear();
return ret;
}
}; // class BucketReshardManager
// retry in case of racing writes to the bucket instance metadata
static constexpr auto max_retries = 10;
int tries = 0;
+
do {
+
// update resharding state
bucket_info.layout.target_index = target;
- bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
+ bucket_info.layout.resharding = rgw::BucketReshardState::InLogrecord;
if (ret = fault.check("set_target_layout");
ret == 0) { // no fault injected, write the bucket instance metadata
return ret;
}
+ if (ret = fault.check("logrecord_writes");
+ ret == 0) { // no fault injected, record log with writing to the current index shards
+ ret = set_resharding_status(dpp, store, bucket_info,
+ cls_rgw_reshard_status::IN_LOGRECORD);
+ }
+
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to pause "
+ "writes to the current index: " << cpp_strerror(ret) << dendl;
+ // clean up the target layout (ignore errors)
+ revert_target_layout(store, bucket_info, bucket_attrs, fault, dpp, y);
+ return ret;
+ }
+ return 0;
+} // init_reshard
+
+static int change_reshard_state(rgw::sal::RadosStore* store,
+ RGWBucketInfo& bucket_info,
+ std::map<std::string, bufferlist>& bucket_attrs,
+ ReshardFaultInjector& fault,
+ const DoutPrefixProvider *dpp, optional_yield y)
+{
+ auto prev = bucket_info.layout; // make a copy for cleanup
+ const auto current = prev.current_index;
+
+ // retry in case of racing writes to the bucket instance metadata
+ static constexpr auto max_retries = 10;
+ int tries = 0;
+ int ret = 0;
+ do {
+ // update resharding state
+ bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
+
+ if (ret = fault.check("change_reshard_state");
+ ret == 0) { // no fault injected, write the bucket instance metadata
+ ret = store->getRados()->put_bucket_instance_info(bucket_info, false,
+ real_time(), &bucket_attrs, dpp, y);
+ } else if (ret == -ECANCELED) {
+ fault.clear(); // clear the fault so a retry can succeed
+ }
+
+ if (ret == -ECANCELED) {
+ // racing write detected, read the latest bucket info and try again
+ int ret2 = store->getRados()->get_bucket_instance_info(
+ bucket_info.bucket, bucket_info,
+ nullptr, &bucket_attrs, y, dpp);
+ if (ret2 < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to read "
+ "bucket info: " << cpp_strerror(ret2) << dendl;
+ ret = ret2;
+ break;
+ }
+
+ // check that we're still in the reshard state we started in
+ if (bucket_info.layout.resharding != rgw::BucketReshardState::InLogrecord ||
+ bucket_info.layout.current_index != current) {
+ ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " raced with "
+ "another reshard" << dendl;
+ break;
+ }
+ }
+ ++tries;
+ } while (ret == -ECANCELED && tries < max_retries);
+
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to commit "
+ "target index layout: " << cpp_strerror(ret) << dendl;
+
+ bucket_info.layout = std::move(prev); // restore in-memory layout
+
+ // unblock writes to the current index shard objects
+ int ret2 = set_resharding_status(dpp, store, bucket_info,
+ cls_rgw_reshard_status::NOT_RESHARDING);
+ if (ret2 < 0) {
+ ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to unblock "
+ "writes to current index objects: " << cpp_strerror(ret2) << dendl;
+ // non-fatal error
+ }
+ return ret;
+ }
+
if (ret = fault.check("block_writes");
ret == 0) { // no fault injected, block writes to the current index shards
ret = set_resharding_status(dpp, store, bucket_info,
}
return 0;
-} // init_reshard
+} // change_reshard_state
static int cancel_reshard(rgw::sal::RadosStore* store,
RGWBucketInfo& bucket_info,
return ret;
}
- if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress) {
+ if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress ||
+ bucket_info.layout.resharding != rgw::BucketReshardState::InLogrecord) {
ldpp_dout(dpp, -1) << "ERROR: bucket is not resharding" << dendl;
ret = -EINVAL;
} else {
return 0;
}
-
-int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& current,
- const rgw::bucket_index_layout_generation& target,
- int max_op_entries, // max num to process per op
- bool verbose,
- ostream *out,
- Formatter *formatter,
- const DoutPrefixProvider *dpp, optional_yield y)
+int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& current,
+ int& max_op_entries,
+ BucketReshardManager& target_shards_mgr,
+ bool verbose_json_out,
+ ostream *out,
+ Formatter *formatter, rgw::BucketReshardState reshard_stage,
+ const DoutPrefixProvider *dpp, optional_yield y)
{
- if (out) {
- (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
- (*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
- }
-
- /* update bucket info -- in progress*/
list<rgw_cls_bi_entry> entries;
- if (max_op_entries <= 0) {
- ldpp_dout(dpp, 0) << __func__ <<
- ": can't reshard, non-positive max_op_entries" << dendl;
+ string stage;
+ bool read_reshardlog;
+ switch (reshard_stage) {
+ case rgw::BucketReshardState::InLogrecord:
+ stage = "inventory";
+ read_reshardlog = false;
+ break;
+ case rgw::BucketReshardState::InProgress:
+ stage = "inc";
+ read_reshardlog = true;
+ break;
+ default:
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " unknown reshard stage" << dendl;
return -EINVAL;
}
-
- BucketReshardManager target_shards_mgr(dpp, store, bucket_info, target);
-
- bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr);
-
+ stage.append("_entries");
if (verbose_json_out) {
- formatter->open_array_section("entries");
+ formatter->open_array_section(stage);
}
- uint64_t total_entries = 0;
-
+ uint64_t stage_entries = 0;
+ stage.append(":");
if (!verbose_json_out && out) {
- (*out) << "total entries:";
+ (*out) << stage;
}
const uint32_t num_source_shards = rgw::num_shards(current.layout.normal);
const std::string null_object_filter; // empty string since we're not filtering by object
while (is_truncated) {
entries.clear();
- int ret = store->getRados()->bi_list(
- dpp, bucket_info, i, null_object_filter, marker, max_op_entries,
- &entries, &is_truncated, y);
+
+ int ret = store->getRados()->bi_list(dpp, bucket_info, i, null_object_filter,
+ marker, max_op_entries, &entries,
+ &is_truncated, read_reshardlog, y);
if (ret == -ENOENT) {
ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to find shard "
<< i << ", skipping" << dendl;
}
for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
- rgw_cls_bi_entry& entry = *iter;
- if (verbose_json_out) {
- formatter->open_object_section("entry");
-
- encode_json("shard_id", i, formatter);
- encode_json("num_entry", total_entries, formatter);
- encode_json("entry", entry, formatter);
- }
- total_entries++;
-
- marker = entry.idx;
-
- int target_shard_id;
- cls_rgw_obj_key cls_key;
- RGWObjCategory category;
- rgw_bucket_category_stats stats;
- bool account = entry.get_info(&cls_key, &category, &stats);
- rgw_obj_key key(cls_key);
- if (entry.type == BIIndexType::OLH && key.empty()) {
- // bogus entry created by https://tracker.ceph.com/issues/46456
- // to fix, skip so it doesn't get include in the new bucket instance
- total_entries--;
- ldpp_dout(dpp, 10) << "Dropping entry with empty name, idx=" << marker << dendl;
- continue;
- }
- rgw_obj obj(bucket_info.bucket, key);
- RGWMPObj mp;
- if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
- // place the multipart .meta object on the same shard as its head object
- obj.index_hash_source = mp.get_key();
- }
- ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal,
- obj.get_hash_object(), &target_shard_id);
- if (ret < 0) {
- ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
- return ret;
- }
-
- int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
-
- ret = target_shards_mgr.add_entry(shard_index, entry, account,
- category, stats);
- if (ret < 0) {
- return ret;
- }
-
- Clock::time_point now = Clock::now();
- if (reshard_lock.should_renew(now)) {
- // assume outer locks have timespans at least the size of ours, so
- // can call inside conditional
- if (outer_reshard_lock) {
- ret = outer_reshard_lock->renew(now);
- if (ret < 0) {
- return ret;
- }
- }
- ret = reshard_lock.renew(now);
- if (ret < 0) {
- ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl;
- return ret;
- }
- }
- if (verbose_json_out) {
- formatter->close_section();
- formatter->flush(*out);
- } else if (out && !(total_entries % 1000)) {
- (*out) << " " << total_entries;
- }
+ rgw_cls_bi_entry& entry = *iter;
+ if (verbose_json_out) {
+ formatter->open_object_section("entry");
+
+ encode_json("shard_id", i, formatter);
+ encode_json("num_entry", stage_entries, formatter);
+ encode_json("entry", entry, formatter);
+ }
+ stage_entries++;
+
+ marker = entry.idx;
+
+ int target_shard_id;
+ cls_rgw_obj_key cls_key;
+ RGWObjCategory category;
+ rgw_bucket_category_stats stats;
+ bool account = entry.get_info(&cls_key, &category, &stats);
+ rgw_obj_key key(cls_key);
+ if (entry.type == BIIndexType::OLH && key.empty()) {
+ // bogus entry created by https://tracker.ceph.com/issues/46456
+ // to fix, skip so it doesn't get include in the new bucket instance
+ stage_entries--;
+ ldpp_dout(dpp, 10) << "Dropping entry with empty name, idx=" << marker << dendl;
+ continue;
+ }
+ rgw_obj obj(bucket_info.bucket, key);
+ RGWMPObj mp;
+ if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
+ // place the multipart .meta object on the same shard as its head object
+ obj.index_hash_source = mp.get_key();
+ }
+ ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal,
+ obj.get_hash_object(), &target_shard_id);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
+
+ ret = target_shards_mgr.add_entry(shard_index, entry, account,
+ category, stats);
+ if (ret < 0) {
+ return ret;
+ }
+
+ Clock::time_point now = Clock::now();
+ if (reshard_lock.should_renew(now)) {
+ // assume outer locks have timespans at least the size of ours, so
+ // can call inside conditional
+ if (outer_reshard_lock) {
+ ret = outer_reshard_lock->renew(now);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+ ret = reshard_lock.renew(now);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl;
+ return ret;
+ }
+ }
+ if (verbose_json_out) {
+ formatter->close_section();
+ formatter->flush(*out);
+ } else if (out && !(stage_entries % 1000)) {
+ (*out) << " " << stage_entries;
+ }
} // entries loop
}
}
formatter->close_section();
formatter->flush(*out);
} else if (out) {
- (*out) << " " << total_entries << std::endl;
+ (*out) << " " << stage_entries << std::endl;
}
int ret = target_shards_mgr.finish();
if (ret < 0) {
- ldpp_dout(dpp, -1) << "ERROR: failed to reshard" << dendl;
+ ldpp_dout(dpp, -1) << "ERROR: failed to reshard: " << ret << dendl;
return -EIO;
}
return 0;
+}
+
+int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& current,
+ const rgw::bucket_index_layout_generation& target,
+ int max_op_entries, // max num to process per op
+ bool verbose,
+ ostream *out,
+ Formatter *formatter,
+ ReshardFaultInjector& fault,
+ const DoutPrefixProvider *dpp, optional_yield y)
+{
+ if (out) {
+ (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
+ (*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
+ }
+
+ if (max_op_entries <= 0) {
+ ldpp_dout(dpp, 0) << __func__ <<
+ ": can't reshard, non-positive max_op_entries" << dendl;
+ return -EINVAL;
+ }
+
+ BucketReshardManager target_shards_mgr(dpp, store, bucket_info, target);
+
+ bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr);
+
+ // a log is written to shard going with client op at this state
+ ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord);
+ int ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
+ formatter, bucket_info.layout.resharding, dpp, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << ": failed in logrecord state of reshard ret = " << ret << dendl;
+ return ret;
+ }
+
+ ret = change_reshard_state(store, bucket_info, bucket_attrs, fault, dpp, y);
+ if (ret < 0) {
+ return ret;
+ }
+
+ // block the client op and complete the resharding
+ ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress);
+ ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
+ formatter, bucket_info.layout.resharding, dpp, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << ": failed in progress state of reshard ret = " << ret << dendl;
+ return ret;
+ }
+ return 0;
} // RGWBucketReshard::do_reshard
int RGWBucketReshard::get_status(const DoutPrefixProvider *dpp, list<cls_rgw_bucket_instance_entry> *status)
if (ret < 0) {
return ret;
}
- // unlock when scope exits
+ // TODO: release the lock when purging the old index shards or unsuccessful new index shards
auto unlock = make_scope_guard([this] { reshard_lock.unlock(); });
if (reshard_log) {
ret == 0) { // no fault injected, do the reshard
ret = do_reshard(bucket_info.layout.current_index,
*bucket_info.layout.target_index,
- max_op_entries, verbose, out, formatter, dpp, y);
+ max_op_entries, verbose, out, formatter, fault, dpp, y);
}
if (ret < 0) {
class RGWReshard;
+class BucketReshardManager;
namespace rgw { namespace sal {
class RadosStore;
} }
// using an initializer_list as an array in contiguous memory
// allocated in at once
static const std::initializer_list<uint16_t> reshard_primes;
-
+ int reshard_process(const rgw::bucket_index_layout_generation& current,
+ int& max_entries,
+ BucketReshardManager& target_shards_mgr,
+ bool verbose_json_out,
+ std::ostream *out,
+ Formatter *formatter, rgw::BucketReshardState reshard_stage,
+ const DoutPrefixProvider *dpp, optional_yield y);
int do_reshard(const rgw::bucket_index_layout_generation& current,
const rgw::bucket_index_layout_generation& target,
int max_entries,
bool verbose,
std::ostream *os,
Formatter *formatter,
+ ReshardFaultInjector& fault,
const DoutPrefixProvider *dpp, optional_yield y);
public:
do {
entries.clear();
// if object is specified, we use that as a filter to only retrieve some entries
- ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->bi_list(bs, object, marker, max_entries, &entries, &is_truncated, null_yield);
+ ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->bi_list(bs, object, marker, max_entries, &entries, &is_truncated, false, null_yield);
if (ret < 0) {
ldpp_dout(dpp(), 0) << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
return -ret;
{
switch (s) {
case BucketReshardState::None: return "None";
+ case BucketReshardState::InLogrecord: return "InLogrecord";
case BucketReshardState::InProgress: return "InProgress";
default: return "Unknown";
}
s = BucketReshardState::None;
return true;
}
+ if (boost::iequals(str, "InLogrecord")) {
+ s = BucketReshardState::InLogrecord;
+ return true;
+ }
if (boost::iequals(str, "InProgress")) {
s = BucketReshardState::InProgress;
return true;
enum class BucketReshardState : uint8_t {
None,
+ InLogrecord,
InProgress,
};
std::string_view to_string(const BucketReshardState& s);
/* prepare both removal and modification on the same object, this time we'll
* first complete modification then remove*/
index_prepare(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag_remove, obj, loc);
- index_prepare(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag_modify, obj, loc);
+ index_prepare(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag_modify, obj, loc);
/* complete modification */
total_size -= meta.size;
test_stats(ioctx, bucket_oid, RGWObjCategory::None, 0, 0);
}
+
+void set_reshard_status(librados::IoCtx& ioctx, const std::string& oid,
+ const cls_rgw_bucket_instance_entry& entry)
+{
+ map<int, string> bucket_objs;
+ bucket_objs[0] = oid;
+ int r = CLSRGWIssueSetBucketResharding(ioctx, bucket_objs, entry, 1)();
+ ASSERT_EQ(0, r);
+}
+
+static int reshardlog_list(librados::IoCtx& ioctx, const std::string& oid,
+ std::list<rgw_cls_bi_entry> *entries, bool *is_truncated)
+{
+ int ret = cls_rgw_bi_list(ioctx, oid, "", "", 100, entries, is_truncated, true);
+ if (ret < 0) {
+ return ret;
+ }
+ return 0;
+}
+
+TEST_F(cls_rgw, reshardlog_list)
+{
+ string bucket_oid = str_int("reshard", 0);
+
+ ObjectWriteOperation op;
+ cls_rgw_bucket_init_index(op);
+ ASSERT_EQ(0, ioctx.operate(bucket_oid, &op));
+
+ cls_rgw_obj_key obj1 = str_int("obj1", 0);
+ string tag = str_int("tag-prepare", 0);
+ string loc = str_int("loc", 0);
+ index_prepare(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, obj1, loc);
+ rgw_bucket_dir_entry_meta meta;
+ index_complete(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, 1, obj1, meta);
+
+ // do not record logs
+ bool is_truncated = false;
+ std::list<rgw_cls_bi_entry> entries;
+ ASSERT_EQ(0, reshardlog_list(ioctx, bucket_oid, &entries, &is_truncated));
+ ASSERT_FALSE(is_truncated);
+ ASSERT_EQ(0u, entries.size());
+
+ // set reshard status to IN_LOGRECORD
+ cls_rgw_bucket_instance_entry entry;
+ entry.reshard_status = cls_rgw_reshard_status::IN_LOGRECORD;
+ set_reshard_status(ioctx, bucket_oid, entry);
+
+ // record a log in prepare
+ cls_rgw_obj_key obj2 = str_int("obj2", 0);
+ index_prepare(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, obj2, loc);
+ ASSERT_EQ(0, reshardlog_list(ioctx, bucket_oid, &entries, &is_truncated));
+ ASSERT_FALSE(is_truncated);
+ ASSERT_EQ(1u, entries.size());
+
+ // overwrite the log writen in prepare
+ entries.clear();
+ index_complete(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, 1, obj2, meta);
+ ASSERT_EQ(0, reshardlog_list(ioctx, bucket_oid, &entries, &is_truncated));
+ ASSERT_FALSE(is_truncated);
+ ASSERT_EQ(1u, entries.size());
+}