From: liangmingyuan Date: Sun, 31 Mar 2024 12:11:02 +0000 (+0800) Subject: rgw/reshard: copy the index entries to dest shards. X-Git-Tag: v20.0.0~1122^2~8 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=55b404afeb6e2f201876b5876f9c086115f032dd;p=ceph.git rgw/reshard: copy the index entries to dest shards. In the logrecord state, copy the inventoried index entries to the dest shards and record a log for each newly written entry. In the progress state, block the writes, list the logs written during the logrecord state, then fetch the corresponding index entries and copy them to the dest shards. Signed-off-by: Mingyuan Liang --- diff --git a/qa/workunits/rgw/test_rgw_reshard.py b/qa/workunits/rgw/test_rgw_reshard.py index 6326e7b173cf..dd7f601998e3 100755 --- a/qa/workunits/rgw/test_rgw_reshard.py +++ b/qa/workunits/rgw/test_rgw_reshard.py @@ -223,6 +223,18 @@ def main(): log.debug('TEST: reshard bucket with abort at do_reshard\n') test_bucket_reshard(connection, 'abort-at-do-reshard', abort_at='do_reshard') + log.debug('TEST: reshard bucket with EIO injected at logrecord_writes\n') + test_bucket_reshard(connection, 'error-at-logrecord-writes', error_at='logrecord_writes') + log.debug('TEST: reshard bucket with abort at logrecord_writes\n') + test_bucket_reshard(connection, 'abort-at-logrecord-writes', abort_at='logrecord_writes') + + log.debug('TEST: reshard bucket with EIO injected at change_reshard_state\n') + test_bucket_reshard(connection, 'error-at-change-reshard-state', error_at='change_reshard_state') + log.debug('TEST: reshard bucket with ECANCELED injected at change_reshard_state\n') + test_bucket_reshard(connection, 'error-at-change-reshard-state', error_at='change_reshard_state', error_code=errno.ECANCELED) + log.debug('TEST: reshard bucket with abort at change_reshard_state\n') + test_bucket_reshard(connection, 'abort-at-change-reshard-state', abort_at='change_reshard_state') + # TESTCASE 'versioning reshard-','bucket', reshard','versioning 
reshard','succeeds' log.debug(' test: reshard versioned bucket') num_shards_expected = get_bucket_stats(VER_BUCKET_NAME).num_shards + 1 diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index aa5ba9ed7dbf..7bd60bf20c85 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -118,6 +118,20 @@ static bool bi_entry_gt(const string& first, const string& second) return first > second; } +/** + * return: Plain, Instance, OLH or Invalid + */ +BIIndexType bi_type(const string& s, const string& prefix) +{ + int ret = bi_entry_type(s.substr(prefix.size())); + if (ret < 0) { + return BIIndexType::Invalid; + } else if (ret == 0) { + return BIIndexType::Plain; + } + return (BIIndexType)ret; +} + static void get_time_key(real_time& ut, string *key) { char buf[32]; @@ -2822,15 +2836,22 @@ static int rgw_bi_put_op(cls_method_context_t hctx, bufferlist *in, bufferlist * rgw_cls_bi_entry& entry = op.entry; - int r = cls_cxx_map_set_val(hctx, entry.idx, &entry.data); - if (r < 0) { - CLS_LOG(0, "ERROR: %s: cls_cxx_map_set_val() returned r=%d", __func__, r); + if (entry.type == BIIndexType::ReshardDeleted) { + int r = cls_cxx_map_remove_key(hctx, entry.idx); + if (r < 0) { + CLS_LOG(0, "ERROR: %s: cls_cxx_map_remove_key() returned r=%d", __func__, r); + return r; + } + } else { + int r = cls_cxx_map_set_val(hctx, entry.idx, &entry.data); + if (r < 0) { + CLS_LOG(0, "ERROR: %s: cls_cxx_map_set_val() returned r=%d", __func__, r); + } } return 0; } - /* The plain entries in the bucket index are divided into two regions * divided by the special entries that begin with 0x80. Those below * ("Low") are ascii entries. 
Those above ("High") bring in unicode @@ -3176,6 +3197,57 @@ static int list_olh_entries(cls_method_context_t hctx, return count; } +static int reshard_log_list_entries(cls_method_context_t hctx, const string& marker, + uint32_t max, list& entries, bool *truncated) +{ + string start_key, end_key; + start_key = BI_PREFIX_CHAR; + start_key.append(bucket_index_prefixes[BI_BUCKET_RESHARD_LOG_INDEX]); + + string bi_type_marker = start_key; + + end_key = BI_PREFIX_CHAR; + end_key.append(bucket_index_prefixes[BI_BUCKET_RESHARD_LOG_INDEX + 1]); + + if (!marker.empty()) { + start_key.append(marker); + } + + map keys; + int ret = cls_cxx_map_get_vals(hctx, start_key, string(), max, &keys, truncated); + CLS_LOG(20, "%s(): start_key=%s keys.size()=%d", __func__, escape_str(start_key).c_str(), (int)keys.size()); + if (ret < 0) { + return ret; + } + + map::iterator iter; + for (iter = keys.begin(); iter != keys.end(); ++iter) { + if (iter->first.compare(end_key) >= 0) { + if (truncated) { + *truncated = false; + } + return 0; + } + + rgw_cls_bi_entry entry; + auto biter = iter->second.cbegin(); + try { + decode(entry, biter); + } catch (ceph::buffer::error& err) { + CLS_LOG(0, "ERROR: %s: failed to decode buffer for rgw_cls_bi_entry \"%s\"", + __func__, escape_str(iter->first).c_str()); + return -EIO; + } + if (entry.type != BIIndexType::ReshardDeleted) + entry.type = bi_type(iter->first, bi_type_marker); + + CLS_LOG(20, "reshard_log_list_entries key=%s bl.length=%d\n", entry.idx.c_str(), (int)iter->second.length()); + + entries.push_back(entry); + } + return 0; +} + static int check_index(cls_method_context_t hctx, rgw_bucket_dir_header *existing_header, rgw_bucket_dir_header *calc_header) @@ -3285,7 +3357,8 @@ int rgw_bucket_check_index(cls_method_context_t hctx, bufferlist *in, bufferlist } -/* Lists all the entries that appear in a bucket index listing. +/* Lists all the entries that appear in a bucket index listing, + * or list all the entries in reshardlog namespace. 
* * It may not be obvious why this function calls three other "segment" * functions (list_plain_entries (twice), list_instance_entries, @@ -3324,15 +3397,24 @@ static int rgw_bi_list_op(cls_method_context_t hctx, constexpr uint32_t MAX_BI_LIST_ENTRIES = 1000; const uint32_t max = std::min(op.max, MAX_BI_LIST_ENTRIES); - CLS_LOG(20, "%s: op.marker=\"%s\", op.name_filter=\"%s\", op.max=%u max=%u", + CLS_LOG(20, "%s: op.marker=\"%s\", op.name_filter=\"%s\", op.max=%u max=%u, op.reshardlog=%d", __func__, escape_str(op.marker).c_str(), escape_str(op.name_filter).c_str(), - op.max, max); + op.max, max, op.reshardlog); int ret; uint32_t count = 0; bool more = false; rgw_cls_bi_list_ret op_ret; + if (op.reshardlog) { + ret = reshard_log_list_entries(hctx, op.marker, op.max, op_ret.entries, &op_ret.is_truncated); + if (ret < 0) + return ret; + CLS_LOG(20, "%s: returning %lu entries, is_truncated=%d", __func__, op_ret.entries.size(), op_ret.is_truncated); + encode(op_ret, *out); + return 0; + } + ret = list_plain_entries(hctx, op.name_filter, op.marker, max, &op_ret.entries, &more, PlainEntriesRegion::Low); if (ret < 0) { @@ -3662,7 +3744,6 @@ static int rgw_bi_log_stop(cls_method_context_t hctx, bufferlist *in, bufferlist return write_bucket_header(hctx, &header); } - static void usage_record_prefix_by_time(uint64_t epoch, string& key) { char buf[32]; @@ -4754,7 +4835,7 @@ static int rgw_guard_bucket_resharding(cls_method_context_t hctx, bufferlist *in return rc; } - if (header.resharding()) { + if (header.resharding_in_progress()) { return op.ret_err; } diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index e65dedf14e42..bfdac5259aa6 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -443,6 +443,7 @@ int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid, return 0; } + int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, const rgw_cls_bi_entry& entry) { bufferlist in, out; @@ -470,13 +471,14 @@ void 
cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, const rgw_cls_bi */ int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid, const std::string& name_filter, const std::string& marker, uint32_t max, - std::list *entries, bool *is_truncated) + std::list *entries, bool *is_truncated, bool reshardlog) { bufferlist in, out; rgw_cls_bi_list_op call; call.name_filter = name_filter; call.marker = marker; call.max = max; + call.reshardlog = reshardlog; encode(call, in); int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_LIST, in, out); if (r < 0) diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 365a51fb5d59..6f39858b10ba 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -372,8 +372,7 @@ int cls_rgw_bi_put(librados::IoCtx& io_ctx, const std::string oid, const rgw_cls void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const std::string oid, const rgw_cls_bi_entry& entry); int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid, const std::string& name, const std::string& marker, uint32_t max, - std::list *entries, bool *is_truncated); - + std::list *entries, bool *is_truncated, bool reshardlog = false); void cls_rgw_bucket_link_olh(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& key, const ceph::buffer::list& olh_tag, diff --git a/src/cls/rgw/cls_rgw_ops.h b/src/cls/rgw/cls_rgw_ops.h index b824c73d3d01..dd8a7779aedd 100644 --- a/src/cls/rgw/cls_rgw_ops.h +++ b/src/cls/rgw/cls_rgw_ops.h @@ -760,22 +760,27 @@ struct rgw_cls_bi_list_op { uint32_t max; std::string name_filter; // limit result to one object and its instances std::string marker; + bool reshardlog; - rgw_cls_bi_list_op() : max(0) {} + rgw_cls_bi_list_op() : max(0), reshardlog(false) {} void encode(ceph::buffer::list& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); encode(max, bl); encode(name_filter, bl); encode(marker, bl); + encode(reshardlog, bl); ENCODE_FINISH(bl); } void 
decode(ceph::buffer::list::const_iterator& bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); decode(max, bl); decode(name_filter, bl); decode(marker, bl); + if (struct_v >= 2) { + decode(reshardlog, bl); + } DECODE_FINISH(bl); } @@ -783,6 +788,7 @@ struct rgw_cls_bi_list_op { f->dump_unsigned("max", max); f->dump_string("name_filter", name_filter); f->dump_string("marker", marker); + f->dump_bool("reshardlog", reshardlog); } static void generate_test_instances(std::list& o) { @@ -791,6 +797,7 @@ struct rgw_cls_bi_list_op { o.back()->max = 100; o.back()->name_filter = "name_filter"; o.back()->marker = "marker"; + o.back()->reshardlog = true; } }; WRITE_CLASS_ENCODER(rgw_cls_bi_list_op) diff --git a/src/cls/rgw/cls_rgw_types.cc b/src/cls/rgw/cls_rgw_types.cc index b58d37691065..ad8e7f16e2fb 100644 --- a/src/cls/rgw/cls_rgw_types.cc +++ b/src/cls/rgw/cls_rgw_types.cc @@ -955,6 +955,9 @@ std::ostream& operator<<(std::ostream& out, cls_rgw_reshard_status status) { case cls_rgw_reshard_status::NOT_RESHARDING: out << "NOT_RESHARDING"; break; + case cls_rgw_reshard_status::IN_LOGRECORD: + out << "IN_LOGRECORD"; + break; case cls_rgw_reshard_status::IN_PROGRESS: out << "IN_PROGRESS"; break; diff --git a/src/rgw/driver/rados/rgw_bucket.cc b/src/rgw/driver/rados/rgw_bucket.cc index 572c24162e77..f002dd78daa1 100644 --- a/src/rgw/driver/rados/rgw_bucket.cc +++ b/src/rgw/driver/rados/rgw_bucket.cc @@ -369,7 +369,7 @@ static int check_bad_index_multipart(rgw::sal::RadosStore* const rados_store, do { entries_read.clear(); ret = store->bi_list(bs, "", marker, -1, - &entries_read, &is_truncated, y); + &entries_read, &is_truncated, false, y); if (ret < 0) { ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl; @@ -630,7 +630,7 @@ static int check_index_olh(rgw::sal::RadosStore* const rados_store, *count_out = 0; do { entries.clear(); - ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated, y); + ret = store->bi_list(bs, "", marker, -1, &entries, 
&is_truncated, false, y); if (ret < 0) { ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl; break; @@ -857,7 +857,7 @@ static int check_index_unlinked(rgw::sal::RadosStore* const rados_store, *count_out = 0; do { entries.clear(); - ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated, y); + ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated, false, y); if (ret < 0) { ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl; break; diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 2b408dbb369b..4939438a68b4 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -9199,7 +9199,8 @@ int RGWRados::bi_put(const DoutPrefixProvider *dpp, rgw_bucket& bucket, rgw_obj& int RGWRados::bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket, const string& obj_name_filter, const string& marker, uint32_t max, - list *entries, bool *is_truncated, optional_yield y) + list *entries, bool *is_truncated, + bool reshardlog, optional_yield y) { rgw_obj obj(bucket, obj_name_filter); BucketShard bs(this); @@ -9210,7 +9211,7 @@ int RGWRados::bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket, } auto& ref = bs.bucket_obj; - ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated); + ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated, reshardlog); if (ret == -ENOENT) { *is_truncated = false; } @@ -9221,10 +9222,10 @@ int RGWRados::bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket, } int RGWRados::bi_list(BucketShard& bs, const string& obj_name_filter, const string& marker, uint32_t max, - list *entries, bool *is_truncated, optional_yield y) + list *entries, bool *is_truncated, bool reshardlog, optional_yield y) { auto& ref = bs.bucket_obj; - int ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated); + int 
ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated, reshardlog); if (ret < 0) return ret; @@ -9233,7 +9234,7 @@ int RGWRados::bi_list(BucketShard& bs, const string& obj_name_filter, const stri int RGWRados::bi_list(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id, const string& obj_name_filter, const string& marker, uint32_t max, - list *entries, bool *is_truncated, optional_yield y) + list *entries, bool *is_truncated, bool reshardlog, optional_yield y) { BucketShard bs(this); int ret = bs.init(dpp, bucket_info, @@ -9244,7 +9245,7 @@ int RGWRados::bi_list(const DoutPrefixProvider *dpp, return ret; } - return bi_list(bs, obj_name_filter, marker, max, entries, is_truncated, y); + return bi_list(bs, obj_name_filter, marker, max, entries, is_truncated, reshardlog, y); } int RGWRados::bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs) diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h index 278d1182b4dd..7d6299d118de 100644 --- a/src/rgw/driver/rados/rgw_rados.h +++ b/src/rgw/driver/rados/rgw_rados.h @@ -1525,10 +1525,11 @@ public: const std::string& marker, uint32_t max, std::list *entries, - bool *is_truncated, optional_yield y); - int bi_list(BucketShard& bs, const std::string& filter_obj, const std::string& marker, uint32_t max, std::list *entries, bool *is_truncated, optional_yield y); + bool *is_truncated, bool reshardlog, optional_yield y); + int bi_list(BucketShard& bs, const std::string& filter_obj, const std::string& marker, uint32_t max, std::list *entries, + bool *is_truncated, bool reshardlog, optional_yield y); int bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket, const std::string& obj_name, const std::string& marker, uint32_t max, - std::list *entries, bool *is_truncated, optional_yield y); + std::list *entries, bool *is_truncated, bool reshardlog, optional_yield y); int bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs); int 
cls_obj_usage_log_add(const DoutPrefixProvider *dpp, const std::string& oid, rgw_usage_log_info& info, optional_yield y); diff --git a/src/rgw/driver/rados/rgw_reshard.cc b/src/rgw/driver/rados/rgw_reshard.cc index a609bd86b6d8..b75bb7bf8cd9 100644 --- a/src/rgw/driver/rados/rgw_reshard.cc +++ b/src/rgw/driver/rados/rgw_reshard.cc @@ -302,6 +302,7 @@ public: ": shard->wait_all_aio() returned ret=" << ret << dendl; } } + target_shards.clear(); } int add_entry(int shard_index, @@ -334,7 +335,6 @@ public: ret = r; } } - target_shards.clear(); return ret; } }; // class BucketReshardManager @@ -464,10 +464,12 @@ static int init_target_layout(rgw::sal::RadosStore* store, // retry in case of racing writes to the bucket instance metadata static constexpr auto max_retries = 10; int tries = 0; + do { + // update resharding state bucket_info.layout.target_index = target; - bucket_info.layout.resharding = rgw::BucketReshardState::InProgress; + bucket_info.layout.resharding = rgw::BucketReshardState::InLogrecord; if (ret = fault.check("set_target_layout"); ret == 0) { // no fault injected, write the bucket instance metadata @@ -607,6 +609,87 @@ static int init_reshard(rgw::sal::RadosStore* store, return ret; } + if (ret = fault.check("logrecord_writes"); + ret == 0) { // no fault injected, record log with writing to the current index shards + ret = set_resharding_status(dpp, store, bucket_info, + cls_rgw_reshard_status::IN_LOGRECORD); + } + + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to pause " + "writes to the current index: " << cpp_strerror(ret) << dendl; + // clean up the target layout (ignore errors) + revert_target_layout(store, bucket_info, bucket_attrs, fault, dpp, y); + return ret; + } + return 0; +} // init_reshard + +static int change_reshard_state(rgw::sal::RadosStore* store, + RGWBucketInfo& bucket_info, + std::map& bucket_attrs, + ReshardFaultInjector& fault, + const DoutPrefixProvider *dpp, optional_yield y) +{ + auto prev = 
bucket_info.layout; // make a copy for cleanup + const auto current = prev.current_index; + + // retry in case of racing writes to the bucket instance metadata + static constexpr auto max_retries = 10; + int tries = 0; + int ret = 0; + do { + // update resharding state + bucket_info.layout.resharding = rgw::BucketReshardState::InProgress; + + if (ret = fault.check("change_reshard_state"); + ret == 0) { // no fault injected, write the bucket instance metadata + ret = store->getRados()->put_bucket_instance_info(bucket_info, false, + real_time(), &bucket_attrs, dpp, y); + } else if (ret == -ECANCELED) { + fault.clear(); // clear the fault so a retry can succeed + } + + if (ret == -ECANCELED) { + // racing write detected, read the latest bucket info and try again + int ret2 = store->getRados()->get_bucket_instance_info( + bucket_info.bucket, bucket_info, + nullptr, &bucket_attrs, y, dpp); + if (ret2 < 0) { + ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to read " + "bucket info: " << cpp_strerror(ret2) << dendl; + ret = ret2; + break; + } + + // check that we're still in the reshard state we started in + if (bucket_info.layout.resharding != rgw::BucketReshardState::InLogrecord || + bucket_info.layout.current_index != current) { + ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " raced with " + "another reshard" << dendl; + break; + } + } + ++tries; + } while (ret == -ECANCELED && tries < max_retries); + + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to commit " + "target index layout: " << cpp_strerror(ret) << dendl; + + bucket_info.layout = std::move(prev); // restore in-memory layout + + // unblock writes to the current index shard objects + int ret2 = set_resharding_status(dpp, store, bucket_info, + cls_rgw_reshard_status::NOT_RESHARDING); + if (ret2 < 0) { + ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to unblock " + "writes to current index objects: " << cpp_strerror(ret2) << dendl; + // non-fatal error + } + return 
ret; + } + if (ret = fault.check("block_writes"); ret == 0) { // no fault injected, block writes to the current index shards ret = set_resharding_status(dpp, store, bucket_info, @@ -622,7 +705,7 @@ static int init_reshard(rgw::sal::RadosStore* store, } return 0; -} // init_reshard +} // change_reshard_state static int cancel_reshard(rgw::sal::RadosStore* store, RGWBucketInfo& bucket_info, @@ -788,7 +871,8 @@ int RGWBucketReshard::cancel(const DoutPrefixProvider* dpp, optional_yield y) return ret; } - if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress) { + if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress || + bucket_info.layout.resharding != rgw::BucketReshardState::InLogrecord) { ldpp_dout(dpp, -1) << "ERROR: bucket is not resharding" << dendl; ret = -EINVAL; } else { @@ -885,41 +969,40 @@ int RGWBucketReshardLock::renew(const Clock::time_point& now) { return 0; } - -int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& current, - const rgw::bucket_index_layout_generation& target, - int max_op_entries, // max num to process per op - bool verbose, - ostream *out, - Formatter *formatter, - const DoutPrefixProvider *dpp, optional_yield y) +int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& current, + int& max_op_entries, + BucketReshardManager& target_shards_mgr, + bool verbose_json_out, + ostream *out, + Formatter *formatter, rgw::BucketReshardState reshard_stage, + const DoutPrefixProvider *dpp, optional_yield y) { - if (out) { - (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl; - (*out) << "bucket name: " << bucket_info.bucket.name << std::endl; - } - - /* update bucket info -- in progress*/ list entries; - if (max_op_entries <= 0) { - ldpp_dout(dpp, 0) << __func__ << - ": can't reshard, non-positive max_op_entries" << dendl; + string stage; + bool read_reshardlog; + switch (reshard_stage) { + case rgw::BucketReshardState::InLogrecord: + stage = 
"inventory"; + read_reshardlog = false; + break; + case rgw::BucketReshardState::InProgress: + stage = "inc"; + read_reshardlog = true; + break; + default: + ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " unknown reshard stage" << dendl; return -EINVAL; } - - BucketReshardManager target_shards_mgr(dpp, store, bucket_info, target); - - bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr); - + stage.append("_entries"); if (verbose_json_out) { - formatter->open_array_section("entries"); + formatter->open_array_section(stage); } - uint64_t total_entries = 0; - + uint64_t stage_entries = 0; + stage.append(":"); if (!verbose_json_out && out) { - (*out) << "total entries:"; + (*out) << stage; } const uint32_t num_source_shards = rgw::num_shards(current.layout.normal); @@ -930,9 +1013,10 @@ int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& curr const std::string null_object_filter; // empty string since we're not filtering by object while (is_truncated) { entries.clear(); - int ret = store->getRados()->bi_list( - dpp, bucket_info, i, null_object_filter, marker, max_op_entries, - &entries, &is_truncated, y); + + int ret = store->getRados()->bi_list(dpp, bucket_info, i, null_object_filter, + marker, max_op_entries, &entries, + &is_truncated, read_reshardlog, y); if (ret == -ENOENT) { ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to find shard " << i << ", skipping" << dendl; @@ -944,74 +1028,74 @@ int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& curr } for (auto iter = entries.begin(); iter != entries.end(); ++iter) { - rgw_cls_bi_entry& entry = *iter; - if (verbose_json_out) { - formatter->open_object_section("entry"); - - encode_json("shard_id", i, formatter); - encode_json("num_entry", total_entries, formatter); - encode_json("entry", entry, formatter); - } - total_entries++; - - marker = entry.idx; - - int target_shard_id; - cls_rgw_obj_key cls_key; - RGWObjCategory category; - 
rgw_bucket_category_stats stats; - bool account = entry.get_info(&cls_key, &category, &stats); - rgw_obj_key key(cls_key); - if (entry.type == BIIndexType::OLH && key.empty()) { - // bogus entry created by https://tracker.ceph.com/issues/46456 - // to fix, skip so it doesn't get include in the new bucket instance - total_entries--; - ldpp_dout(dpp, 10) << "Dropping entry with empty name, idx=" << marker << dendl; - continue; - } - rgw_obj obj(bucket_info.bucket, key); - RGWMPObj mp; - if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) { - // place the multipart .meta object on the same shard as its head object - obj.index_hash_source = mp.get_key(); - } - ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal, - obj.get_hash_object(), &target_shard_id); - if (ret < 0) { - ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl; - return ret; - } - - int shard_index = (target_shard_id > 0 ? target_shard_id : 0); - - ret = target_shards_mgr.add_entry(shard_index, entry, account, - category, stats); - if (ret < 0) { - return ret; - } - - Clock::time_point now = Clock::now(); - if (reshard_lock.should_renew(now)) { - // assume outer locks have timespans at least the size of ours, so - // can call inside conditional - if (outer_reshard_lock) { - ret = outer_reshard_lock->renew(now); - if (ret < 0) { - return ret; - } - } - ret = reshard_lock.renew(now); - if (ret < 0) { - ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl; - return ret; - } - } - if (verbose_json_out) { - formatter->close_section(); - formatter->flush(*out); - } else if (out && !(total_entries % 1000)) { - (*out) << " " << total_entries; - } + rgw_cls_bi_entry& entry = *iter; + if (verbose_json_out) { + formatter->open_object_section("entry"); + + encode_json("shard_id", i, formatter); + encode_json("num_entry", stage_entries, formatter); + encode_json("entry", entry, formatter); + } + stage_entries++; + + 
marker = entry.idx; + + int target_shard_id; + cls_rgw_obj_key cls_key; + RGWObjCategory category; + rgw_bucket_category_stats stats; + bool account = entry.get_info(&cls_key, &category, &stats); + rgw_obj_key key(cls_key); + if (entry.type == BIIndexType::OLH && key.empty()) { + // bogus entry created by https://tracker.ceph.com/issues/46456 + // to fix, skip so it doesn't get include in the new bucket instance + stage_entries--; + ldpp_dout(dpp, 10) << "Dropping entry with empty name, idx=" << marker << dendl; + continue; + } + rgw_obj obj(bucket_info.bucket, key); + RGWMPObj mp; + if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) { + // place the multipart .meta object on the same shard as its head object + obj.index_hash_source = mp.get_key(); + } + ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal, + obj.get_hash_object(), &target_shard_id); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl; + return ret; + } + + int shard_index = (target_shard_id > 0 ? 
target_shard_id : 0); + + ret = target_shards_mgr.add_entry(shard_index, entry, account, + category, stats); + if (ret < 0) { + return ret; + } + + Clock::time_point now = Clock::now(); + if (reshard_lock.should_renew(now)) { + // assume outer locks have timespans at least the size of ours, so + // can call inside conditional + if (outer_reshard_lock) { + ret = outer_reshard_lock->renew(now); + if (ret < 0) { + return ret; + } + } + ret = reshard_lock.renew(now); + if (ret < 0) { + ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl; + return ret; + } + } + if (verbose_json_out) { + formatter->close_section(); + formatter->flush(*out); + } else if (out && !(stage_entries % 1000)) { + (*out) << " " << stage_entries; + } } // entries loop } } @@ -1020,15 +1104,64 @@ int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& curr formatter->close_section(); formatter->flush(*out); } else if (out) { - (*out) << " " << total_entries << std::endl; + (*out) << " " << stage_entries << std::endl; } int ret = target_shards_mgr.finish(); if (ret < 0) { - ldpp_dout(dpp, -1) << "ERROR: failed to reshard" << dendl; + ldpp_dout(dpp, -1) << "ERROR: failed to reshard: " << ret << dendl; return -EIO; } return 0; +} + +int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& current, + const rgw::bucket_index_layout_generation& target, + int max_op_entries, // max num to process per op + bool verbose, + ostream *out, + Formatter *formatter, + ReshardFaultInjector& fault, + const DoutPrefixProvider *dpp, optional_yield y) +{ + if (out) { + (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl; + (*out) << "bucket name: " << bucket_info.bucket.name << std::endl; + } + + if (max_op_entries <= 0) { + ldpp_dout(dpp, 0) << __func__ << + ": can't reshard, non-positive max_op_entries" << dendl; + return -EINVAL; + } + + BucketReshardManager target_shards_mgr(dpp, store, bucket_info, target); + + bool verbose_json_out = verbose && 
(formatter != nullptr) && (out != nullptr); + + // a log is written to shard going with client op at this state + ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord); + int ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out, + formatter, bucket_info.layout.resharding, dpp, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << ": failed in logrecord state of reshard ret = " << ret << dendl; + return ret; + } + + ret = change_reshard_state(store, bucket_info, bucket_attrs, fault, dpp, y); + if (ret < 0) { + return ret; + } + + // block the client op and complete the resharding + ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress); + ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out, + formatter, bucket_info.layout.resharding, dpp, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << ": failed in progress state of reshard ret = " << ret << dendl; + return ret; + } + return 0; } // RGWBucketReshard::do_reshard int RGWBucketReshard::get_status(const DoutPrefixProvider *dpp, list *status) @@ -1052,7 +1185,7 @@ int RGWBucketReshard::execute(int num_shards, if (ret < 0) { return ret; } - // unlock when scope exits + // TODO: release the lock when purging the old index shards or unsucessful new index shards auto unlock = make_scope_guard([this] { reshard_lock.unlock(); }); if (reshard_log) { @@ -1072,7 +1205,7 @@ int RGWBucketReshard::execute(int num_shards, ret == 0) { // no fault injected, do the reshard ret = do_reshard(bucket_info.layout.current_index, *bucket_info.layout.target_index, - max_op_entries, verbose, out, formatter, dpp, y); + max_op_entries, verbose, out, formatter, fault, dpp, y); } if (ret < 0) { diff --git a/src/rgw/driver/rados/rgw_reshard.h b/src/rgw/driver/rados/rgw_reshard.h index a2097318827f..4101f9f06ed3 100644 --- a/src/rgw/driver/rados/rgw_reshard.h +++ b/src/rgw/driver/rados/rgw_reshard.h @@ -26,6 +26,7 @@ 
class RGWReshard; +class BucketReshardManager; namespace rgw { namespace sal { class RadosStore; } } @@ -83,13 +84,20 @@ class RGWBucketReshard { // using an initializer_list as an array in contiguous memory // allocated in at once static const std::initializer_list<uint16_t> reshard_primes; - + int reshard_process(const rgw::bucket_index_layout_generation& current, + int& max_entries, + BucketReshardManager& target_shards_mgr, + bool verbose_json_out, + std::ostream *out, + Formatter *formatter, rgw::BucketReshardState reshard_stage, + const DoutPrefixProvider *dpp, optional_yield y); int do_reshard(const rgw::bucket_index_layout_generation& current, const rgw::bucket_index_layout_generation& target, int max_entries, bool verbose, std::ostream *os, Formatter *formatter, + ReshardFaultInjector& fault, const DoutPrefixProvider *dpp, optional_yield y); public: diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 44900e5d1afa..ee8f1451bfa4 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -7917,7 +7917,7 @@ next: do { entries.clear(); // if object is specified, we use that as a filter to only retrieve some entries - ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->bi_list(bs, object, marker, max_entries, &entries, &is_truncated, null_yield); + ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->bi_list(bs, object, marker, max_entries, &entries, &is_truncated, false, null_yield); if (ret < 0) { ldpp_dout(dpp(), 0) << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl; return -ret; diff --git a/src/rgw/rgw_bucket_layout.cc b/src/rgw/rgw_bucket_layout.cc index 499e8f0cd437..57d37bf381f4 100644 --- a/src/rgw/rgw_bucket_layout.cc +++ b/src/rgw/rgw_bucket_layout.cc @@ -299,6 +299,7 @@ std::string_view to_string(const BucketReshardState& s) { switch (s) { case BucketReshardState::None: return "None"; + case BucketReshardState::InLogrecord: return "InLogrecord"; case BucketReshardState::InProgress: return "InProgress"; default: return "Unknown"; } @@ -309,6 +310,10 @@ bool
parse(std::string_view str, BucketReshardState& s) s = BucketReshardState::None; return true; } + if (boost::iequals(str, "InLogrecord")) { + s = BucketReshardState::InLogrecord; + return true; + } if (boost::iequals(str, "InProgress")) { s = BucketReshardState::InProgress; return true; diff --git a/src/rgw/rgw_bucket_layout.h b/src/rgw/rgw_bucket_layout.h index 114f1f1ff589..25dc30c3b5f0 100644 --- a/src/rgw/rgw_bucket_layout.h +++ b/src/rgw/rgw_bucket_layout.h @@ -219,6 +219,7 @@ inline bucket_index_layout_generation log_to_index_layout(const bucket_log_layou enum class BucketReshardState : uint8_t { None, + InLogrecord, InProgress, }; std::string_view to_string(const BucketReshardState& s); diff --git a/src/test/cls_rgw/test_cls_rgw.cc b/src/test/cls_rgw/test_cls_rgw.cc index a2f2fa66a767..0cee97e02a22 100644 --- a/src/test/cls_rgw/test_cls_rgw.cc +++ b/src/test/cls_rgw/test_cls_rgw.cc @@ -238,7 +238,7 @@ TEST_F(cls_rgw, index_remove_object) /* prepare both removal and modification on the same object, this time we'll * first complete modification then remove*/ index_prepare(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag_remove, obj, loc); - index_prepare(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag_modify, obj, loc); + index_prepare(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag_modify, obj, loc); /* complete modification */ total_size -= meta.size; @@ -1340,3 +1340,64 @@ TEST_F(cls_rgw, index_racing_removes) test_stats(ioctx, bucket_oid, RGWObjCategory::None, 0, 0); } + +void set_reshard_status(librados::IoCtx& ioctx, const std::string& oid, + const cls_rgw_bucket_instance_entry& entry) +{ + map<int, string> bucket_objs; + bucket_objs[0] = oid; + int r = CLSRGWIssueSetBucketResharding(ioctx, bucket_objs, entry, 1)(); + ASSERT_EQ(0, r); +} + +static int reshardlog_list(librados::IoCtx& ioctx, const std::string& oid, + std::list<rgw_cls_bi_entry> *entries, bool *is_truncated) +{ + int ret = cls_rgw_bi_list(ioctx, oid, "", "", 100, entries, is_truncated, true); + if (ret < 0) { + return ret; + } + return 0; +} + 
+TEST_F(cls_rgw, reshardlog_list) +{ + string bucket_oid = str_int("reshard", 0); + + ObjectWriteOperation op; + cls_rgw_bucket_init_index(op); + ASSERT_EQ(0, ioctx.operate(bucket_oid, &op)); + + cls_rgw_obj_key obj1 = str_int("obj1", 0); + string tag = str_int("tag-prepare", 0); + string loc = str_int("loc", 0); + index_prepare(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, obj1, loc); + rgw_bucket_dir_entry_meta meta; + index_complete(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, 1, obj1, meta); + + // do not record logs + bool is_truncated = false; + std::list<rgw_cls_bi_entry> entries; + ASSERT_EQ(0, reshardlog_list(ioctx, bucket_oid, &entries, &is_truncated)); + ASSERT_FALSE(is_truncated); + ASSERT_EQ(0u, entries.size()); + + // set reshard status to IN_LOGRECORD + cls_rgw_bucket_instance_entry entry; + entry.reshard_status = cls_rgw_reshard_status::IN_LOGRECORD; + set_reshard_status(ioctx, bucket_oid, entry); + + // record a log in prepare + cls_rgw_obj_key obj2 = str_int("obj2", 0); + index_prepare(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, obj2, loc); + ASSERT_EQ(0, reshardlog_list(ioctx, bucket_oid, &entries, &is_truncated)); + ASSERT_FALSE(is_truncated); + ASSERT_EQ(1u, entries.size()); + + // overwrite the log written in prepare + entries.clear(); + index_complete(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, 1, obj2, meta); + ASSERT_EQ(0, reshardlog_list(ioctx, bucket_oid, &entries, &is_truncated)); + ASSERT_FALSE(is_truncated); + ASSERT_EQ(1u, entries.size()); +}