From: Yehuda Sadeh Date: Sat, 18 Jan 2020 01:33:41 +0000 (-0800) Subject: rgw: bucket autotrim: fetch aggregated bilog info of all bucket targets X-Git-Tag: v15.1.0~22^2~20 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a44f6ace22226b0bc52db003dbc2c62377b36705;p=ceph.git rgw: bucket autotrim: fetch aggregated bilog info of all bucket targets Add a new option to the bilog status request that allows returning the min status for all buckets in a zone that sync from a specific bucket. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index a1b44bf5665a..0fdb6e5976cf 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2266,9 +2266,22 @@ static int bucket_source_sync_status(rgw::sal::RGWRadosStore *store, const RGWZo out << indented{width} << "does not sync from zone\n"; return 0; } + + if (!pipe.source.bucket) { + lderr(store->ctx()) << __func__ << "(): missing source bucket" << dendl; + return -EINVAL; + } + RGWBucketInfo source_bucket_info; + rgw_bucket source_bucket; + int r = init_bucket(*pipe.source.bucket, source_bucket_info, source_bucket); + if (r < 0) { + lderr(store->ctx()) << "failed to read source bucket info: " << cpp_strerror(r) << dendl; + return r; + } + std::vector status; - int r = rgw_bucket_sync_status(dpp(), store, pipe, bucket_info, &source_bucket_info, &status); + r = rgw_bucket_sync_status(dpp(), store, pipe, bucket_info, &source_bucket_info, &status); if (r < 0) { lderr(store->ctx()) << "failed to read bucket sync status: " << cpp_strerror(r) << dendl; return r; diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index 565278902163..da4556a8b6de 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -812,7 +812,7 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso } } -multimap RGWBucketSyncPolicyHandler::get_all_sources() +multimap RGWBucketSyncPolicyHandler::get_all_sources() const { 
multimap m; @@ -838,7 +838,7 @@ multimap RGWBucketSyncPolicyHandler::get_all_ return std::move(m); } -multimap RGWBucketSyncPolicyHandler::get_all_dests() +multimap RGWBucketSyncPolicyHandler::get_all_dests() const { multimap m; @@ -861,7 +861,33 @@ multimap RGWBucketSyncPolicyHandler::get_all_ m.insert(make_pair(*pipe.dest.zone, pipe)); } - return std::move(m); + return m; +} + +multimap RGWBucketSyncPolicyHandler::get_all_dests_in_zone(const rgw_zone_id& zone_id) const +{ + multimap m; + + auto iter = targets.find(zone_id); + if (iter != targets.end()) { + auto& pipes = iter->second.pipe_map; + + for (auto& entry : pipes) { + auto& pipe = entry.second; + m.insert(make_pair(zone_id, pipe)); + } + } + + for (auto& pipe : resolved_dests) { + if (!pipe.dest.zone || + *pipe.dest.zone != zone_id) { + continue; + } + + m.insert(make_pair(*pipe.dest.zone, pipe)); + } + + return m; } void RGWBucketSyncPolicyHandler::get_pipes(std::set *_sources, std::set *_targets, diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index 63ec487f336d..f6a19261eba5 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -366,8 +366,9 @@ public: return sources; } - multimap get_all_sources(); - multimap get_all_dests(); + multimap get_all_sources() const; + multimap get_all_dests() const; + multimap get_all_dests_in_zone(const rgw_zone_id& zone_id) const; const map& get_targets() { return targets; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 91fc9e1b252c..adfc6babc2e4 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -4895,7 +4895,7 @@ int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, const rgw_sync_bucket_pipe& pipe, const RGWBucketInfo& dest_bucket_info, - RGWBucketInfo *psource_bucket_info, + const RGWBucketInfo *psource_bucket_info, std::vector *status) { if (!pipe.source.zone || @@ -4914,18 +4914,19 @@ int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, 
RGWBucketInfo source_bucket_info; - auto& bucket_ctl = store->getRados()->ctl.bucket; + if (!psource_bucket_info) { + auto& bucket_ctl = store->getRados()->ctl.bucket; - int ret = bucket_ctl->read_bucket_info(source_bucket, &source_bucket_info, null_yield); - if (ret < 0) { - ldpp_dout(dpp, 0) << "ERROR: failed to get bucket instance info: bucket=" << source_bucket << ": " << cpp_strerror(-ret) << dendl; - return ret; - } + int ret = bucket_ctl->read_bucket_info(source_bucket, &source_bucket_info, null_yield); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to get bucket instance info: bucket=" << source_bucket << ": " << cpp_strerror(-ret) << dendl; + return ret; + } - if (psource_bucket_info) { - *psource_bucket_info = source_bucket_info; + psource_bucket_info = &source_bucket_info; } + RGWDataSyncEnv env; RGWSyncModuleInstanceRef module; // null sync module env.init(dpp, store->ctx(), store, store->svc(), store->svc()->rados->get_async_processor(), @@ -4936,7 +4937,7 @@ int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry()); return crs.run(new RGWCollectBucketSyncStatusCR(store, &sc, - source_bucket_info, + *psource_bucket_info, dest_bucket_info, status)); } diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 6e841ca8a5ab..29267ca40c45 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -650,7 +650,7 @@ int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, const rgw_sync_bucket_pipe& pipe, const RGWBucketInfo& dest_bucket_info, - RGWBucketInfo *psource_bucket_info, + const RGWBucketInfo *psource_bucket_info, std::vector *status); class RGWDefaultSyncModule : public RGWSyncModule { diff --git a/src/rgw/rgw_rest_log.cc b/src/rgw/rgw_rest_log.cc index 53ac3f386536..735e652014a0 100644 --- a/src/rgw/rgw_rest_log.cc +++ b/src/rgw/rgw_rest_log.cc @@ -832,8 +832,14 @@ public: void RGWOp_BILog_Status::execute() { + 
const auto options = s->info.args.get("options"); + bool merge = (options == "merge"); const auto source_zone = s->info.args.get("source-zone"); - const auto key = s->info.args.get("bucket"); + const auto source_key = s->info.args.get("source-bucket"); + auto key = s->info.args.get("bucket"); + if (key.empty()) { + key = source_key; + } if (key.empty()) { ldpp_dout(s, 4) << "no 'bucket' provided" << dendl; http_ret = -EINVAL; @@ -844,7 +850,7 @@ void RGWOp_BILog_Status::execute() int shard_id{-1}; // unused http_ret = rgw_bucket_parse_bucket_key(s->cct, key, &bucket, &shard_id); if (http_ret < 0) { - ldpp_dout(s, 4) << "no 'bucket' provided" << dendl; + ldpp_dout(s, 4) << "invalid 'bucket' provided" << dendl; http_ret = -EINVAL; return; } @@ -857,12 +863,109 @@ void RGWOp_BILog_Status::execute() ldpp_dout(s, 4) << "failed to read bucket info: " << cpp_strerror(http_ret) << dendl; return; } - rgw_sync_bucket_pipe pipe; - pipe.source.zone = source_zone; - pipe.source.bucket = info.bucket; - pipe.dest.zone = store->svc()->zone->zone_id(); - pipe.dest.bucket = info.bucket; - http_ret = rgw_bucket_sync_status(this, store, pipe, info, nullptr, &status); + + rgw_bucket source_bucket; + + if (source_key.empty()) { + source_bucket = info.bucket; + } else { + http_ret = rgw_bucket_parse_bucket_key(s->cct, source_key, &source_bucket, nullptr); + if (http_ret < 0) { + ldpp_dout(s, 4) << "invalid 'source-bucket' provided (key=" << source_key << ")" << dendl; + return; + } + } + + const auto& local_zone_id = store->svc()->zone->zone_id(); + + if (!merge) { + rgw_sync_bucket_pipe pipe; + pipe.source.zone = source_zone; + pipe.source.bucket = source_bucket; + pipe.dest.zone = local_zone_id; + pipe.dest.bucket = info.bucket; + + ldout(s->cct, 20) << "RGWOp_BILog_Status::execute(): getting sync status for pipe=" << pipe << dendl; + + http_ret = rgw_bucket_sync_status(this, store, pipe, info, nullptr, &status); + + if (http_ret < 0) { + lderr(s->cct) << "ERROR: 
rgw_bucket_sync_status() on pipe=" << pipe << " returned ret=" << http_ret << dendl; + } + return; + } + + rgw_zone_id source_zone_id(source_zone); + + RGWBucketSyncPolicyHandlerRef source_handler; + http_ret = store->ctl()->bucket->get_sync_policy_handler(source_zone_id, source_bucket, &source_handler, null_yield); + if (http_ret < 0) { + lderr(s->cct) << "could not get bucket sync policy handler (r=" << http_ret << ")" << dendl; + return; + } + + auto local_dests = source_handler->get_all_dests_in_zone(local_zone_id); + + std::vector current_status; + for (auto& entry : local_dests) { + auto pipe = entry.second; + + ldout(s->cct, 20) << "RGWOp_BILog_Status::execute(): getting sync status for pipe=" << pipe << dendl; + + RGWBucketInfo *pinfo = &info; + std::optional opt_dest_info; + + if (!pipe.dest.bucket) { + /* Uh oh, something went wrong */ + ldout(s->cct, 20) << "ERROR: RGWOp_BILog_Status::execute(): BUG: pipe.dest.bucket was not initialized" << pipe << dendl; + http_ret = -EIO; + return; + } + + if (*pipe.dest.bucket != info.bucket) { + opt_dest_info.emplace(); + pinfo = &(*opt_dest_info); + + /* dest bucket might not have a bucket id */ + http_ret = store->ctl()->bucket->read_bucket_info(*pipe.dest.bucket, + pinfo, + s->yield, + RGWBucketCtl::BucketInstance::GetParams(), + nullptr); + if (http_ret < 0) { + ldpp_dout(s, 4) << "failed to read target bucket info (bucket=: " << cpp_strerror(http_ret) << dendl; + return; + } + + pipe.dest.bucket = pinfo->bucket; + } + + int r = rgw_bucket_sync_status(this, store, pipe, *pinfo, &info, &current_status); + if (r < 0) { + lderr(s->cct) << "ERROR: rgw_bucket_sync_status() on pipe=" << pipe << " returned ret=" << r << dendl; + http_ret = r; + return; + } + + if (status.empty()) { + status = std::move(current_status); + } else { + if (current_status.size() != + status.size()) { + http_ret = -EINVAL; + lderr(s->cct) << "ERROR: different number of shards for sync status of buckets syncing from the same source: status.size()= " 
<< status.size() << " current_status.size()=" << current_status.size() << dendl; + return; + } + auto m = status.begin(); + for (auto& cur_shard_status : current_status) { + auto& result_shard_status = *m++; + // always take the first marker, or any later marker that's smaller + if (cur_shard_status.inc_marker.position < result_shard_status.inc_marker.position) { + result_shard_status = std::move(cur_shard_status); + } + } + } + } } void RGWOp_BILog_Status::send_response() diff --git a/src/rgw/rgw_trim_bilog.cc b/src/rgw/rgw_trim_bilog.cc index c71f47c7ebae..abfba41331b0 100644 --- a/src/rgw/rgw_trim_bilog.cc +++ b/src/rgw/rgw_trim_bilog.cc @@ -476,32 +476,35 @@ int BucketTrimInstanceCR::operate() // query peers for sync status set_status("fetching sync status from relevant peers"); yield { - // query data sync status from each sync peer - rgw_http_param_pair params[] = { - { "type", "bucket-index" }, - { "status", nullptr }, - { "bucket", bucket_instance.c_str() }, - { "source-zone", zone_id.c_str() }, - { nullptr, nullptr } - }; - const auto& all_dests = source_policy->policy_handler->get_all_dests(); - set target_zones; - rgw_zone_id last_zone; - for (const auto& entry : all_dests) { - if (entry.first != last_zone) { - last_zone = entry.first; - target_zones.insert(last_zone); + vector zids; + rgw_zone_id last_zid; + for (auto& diter : all_dests) { + const auto& zid = diter.first; + if (zid == last_zid) { + continue; } + last_zid = zid; + zids.push_back(zid); } - peer_status.resize(target_zones.size()); + peer_status.resize(zids.size()); auto& zone_conn_map = store->svc()->zone->get_zone_conn_map(); auto p = peer_status.begin(); - for (auto& zid : target_zones) { + for (auto& zid : zids) { + // query data sync status from each sync peer + rgw_http_param_pair params[] = { + { "type", "bucket-index" }, + { "status", nullptr }, + { "options", "merge" }, + { "source-bucket", bucket_instance.c_str() }, + { "source-zone", zone_id.c_str() }, + { nullptr, nullptr } + }; 
+ auto ziter = zone_conn_map.find(zid); if (ziter == zone_conn_map.end()) { ldout(cct, 0) << "WARNING: no connection to zone " << zid << ", can't trim bucket: " << bucket << dendl;