out << indented{width} << "does not sync from zone\n";
return 0;
}
+
+ if (!pipe.source.bucket) {
+ lderr(store->ctx()) << __func__ << "(): missing source bucket" << dendl;
+ return -EINVAL;
+ }
+
RGWBucketInfo source_bucket_info;
+ rgw_bucket source_bucket;
+ int r = init_bucket(*pipe.source.bucket, source_bucket_info, source_bucket);
+ if (r < 0) {
+ lderr(store->ctx()) << "failed to read source bucket info: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
std::vector<rgw_bucket_shard_sync_info> status;
- int r = rgw_bucket_sync_status(dpp(), store, pipe, bucket_info, &source_bucket_info, &status);
+ r = rgw_bucket_sync_status(dpp(), store, pipe, bucket_info, &source_bucket_info, &status);
if (r < 0) {
lderr(store->ctx()) << "failed to read bucket sync status: " << cpp_strerror(r) << dendl;
return r;
}
}
-multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_sources()
+multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_sources() const
{
multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
return std::move(m);
}
-multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests()
+multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests() const
{
multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
m.insert(make_pair(*pipe.dest.zone, pipe));
}
- return std::move(m);
+ return m;
+}
+
+multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests_in_zone(const rgw_zone_id& zone_id) const
+{
+ multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
+
+ auto iter = targets.find(zone_id);
+ if (iter != targets.end()) {
+ auto& pipes = iter->second.pipe_map;
+
+ for (auto& entry : pipes) {
+ auto& pipe = entry.second;
+ m.insert(make_pair(zone_id, pipe));
+ }
+ }
+
+ for (auto& pipe : resolved_dests) {
+ if (!pipe.dest.zone ||
+ *pipe.dest.zone != zone_id) {
+ continue;
+ }
+
+ m.insert(make_pair(*pipe.dest.zone, pipe));
+ }
+
+ return m;
}
void RGWBucketSyncPolicyHandler::get_pipes(std::set<rgw_sync_bucket_pipe> *_sources, std::set<rgw_sync_bucket_pipe> *_targets,
return sources;
}
- multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_sources();
- multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests();
+ multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_sources() const;
+ multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests() const;
+ multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests_in_zone(const rgw_zone_id& zone_id) const;
const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& get_targets() {
return targets;
rgw::sal::RGWRadosStore *store,
const rgw_sync_bucket_pipe& pipe,
const RGWBucketInfo& dest_bucket_info,
- RGWBucketInfo *psource_bucket_info,
+ const RGWBucketInfo *psource_bucket_info,
std::vector<rgw_bucket_shard_sync_info> *status)
{
if (!pipe.source.zone ||
RGWBucketInfo source_bucket_info;
- auto& bucket_ctl = store->getRados()->ctl.bucket;
+ if (!psource_bucket_info) {
+ auto& bucket_ctl = store->getRados()->ctl.bucket;
- int ret = bucket_ctl->read_bucket_info(source_bucket, &source_bucket_info, null_yield);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << "ERROR: failed to get bucket instance info: bucket=" << source_bucket << ": " << cpp_strerror(-ret) << dendl;
- return ret;
- }
+ int ret = bucket_ctl->read_bucket_info(source_bucket, &source_bucket_info, null_yield);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to get bucket instance info: bucket=" << source_bucket << ": " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
- if (psource_bucket_info) {
- *psource_bucket_info = source_bucket_info;
+ psource_bucket_info = &source_bucket_info;
}
+
RGWDataSyncEnv env;
RGWSyncModuleInstanceRef module; // null sync module
env.init(dpp, store->ctx(), store, store->svc(), store->svc()->rados->get_async_processor(),
RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
return crs.run(new RGWCollectBucketSyncStatusCR(store, &sc,
- source_bucket_info,
+ *psource_bucket_info,
dest_bucket_info,
status));
}
rgw::sal::RGWRadosStore *store,
const rgw_sync_bucket_pipe& pipe,
const RGWBucketInfo& dest_bucket_info,
- RGWBucketInfo *psource_bucket_info,
+ const RGWBucketInfo *psource_bucket_info,
std::vector<rgw_bucket_shard_sync_info> *status);
class RGWDefaultSyncModule : public RGWSyncModule {
void RGWOp_BILog_Status::execute()
{
+ const auto options = s->info.args.get("options");
+ bool merge = (options == "merge");
const auto source_zone = s->info.args.get("source-zone");
- const auto key = s->info.args.get("bucket");
+ const auto source_key = s->info.args.get("source-bucket");
+ auto key = s->info.args.get("bucket");
+ if (key.empty()) {
+ key = source_key;
+ }
if (key.empty()) {
ldpp_dout(s, 4) << "no 'bucket' provided" << dendl;
http_ret = -EINVAL;
int shard_id{-1}; // unused
http_ret = rgw_bucket_parse_bucket_key(s->cct, key, &bucket, &shard_id);
if (http_ret < 0) {
- ldpp_dout(s, 4) << "no 'bucket' provided" << dendl;
+ ldpp_dout(s, 4) << "invalid 'bucket' provided" << dendl;
http_ret = -EINVAL;
return;
}
ldpp_dout(s, 4) << "failed to read bucket info: " << cpp_strerror(http_ret) << dendl;
return;
}
- rgw_sync_bucket_pipe pipe;
- pipe.source.zone = source_zone;
- pipe.source.bucket = info.bucket;
- pipe.dest.zone = store->svc()->zone->zone_id();
- pipe.dest.bucket = info.bucket;
- http_ret = rgw_bucket_sync_status(this, store, pipe, info, nullptr, &status);
+
+ rgw_bucket source_bucket;
+
+ if (source_key.empty()) {
+ source_bucket = info.bucket;
+ } else {
+ http_ret = rgw_bucket_parse_bucket_key(s->cct, source_key, &source_bucket, nullptr);
+ if (http_ret < 0) {
+ ldpp_dout(s, 4) << "invalid 'source-bucket' provided (key=" << source_key << ")" << dendl;
+ return;
+ }
+ }
+
+ const auto& local_zone_id = store->svc()->zone->zone_id();
+
+ if (!merge) {
+ rgw_sync_bucket_pipe pipe;
+ pipe.source.zone = source_zone;
+ pipe.source.bucket = source_bucket;
+ pipe.dest.zone = local_zone_id;
+ pipe.dest.bucket = info.bucket;
+
+ ldout(s->cct, 20) << "RGWOp_BILog_Status::execute(): getting sync status for pipe=" << pipe << dendl;
+
+ http_ret = rgw_bucket_sync_status(this, store, pipe, info, nullptr, &status);
+
+ if (http_ret < 0) {
+ lderr(s->cct) << "ERROR: rgw_bucket_sync_status() on pipe=" << pipe << " returned ret=" << http_ret << dendl;
+ }
+ return;
+ }
+
+ rgw_zone_id source_zone_id(source_zone);
+
+ RGWBucketSyncPolicyHandlerRef source_handler;
+ http_ret = store->ctl()->bucket->get_sync_policy_handler(source_zone_id, source_bucket, &source_handler, null_yield);
+ if (http_ret < 0) {
+ lderr(s->cct) << "could not get bucket sync policy handler (r=" << http_ret << ")" << dendl;
+ return;
+ }
+
+ auto local_dests = source_handler->get_all_dests_in_zone(local_zone_id);
+
+ std::vector<rgw_bucket_shard_sync_info> current_status;
+ for (auto& entry : local_dests) {
+ auto pipe = entry.second;
+
+ ldout(s->cct, 20) << "RGWOp_BILog_Status::execute(): getting sync status for pipe=" << pipe << dendl;
+
+ RGWBucketInfo *pinfo = &info;
+ std::optional<RGWBucketInfo> opt_dest_info;
+
+ if (!pipe.dest.bucket) {
+ /* Uh oh, something went wrong */
+ ldout(s->cct, 20) << "ERROR: RGWOp_BILog_Status::execute(): BUG: pipe.dest.bucket was not initialized" << pipe << dendl;
+ http_ret = -EIO;
+ return;
+ }
+
+ if (*pipe.dest.bucket != info.bucket) {
+ opt_dest_info.emplace();
+ pinfo = &(*opt_dest_info);
+
+ /* dest bucket might not have a bucket id */
+ http_ret = store->ctl()->bucket->read_bucket_info(*pipe.dest.bucket,
+ pinfo,
+ s->yield,
+ RGWBucketCtl::BucketInstance::GetParams(),
+ nullptr);
+ if (http_ret < 0) {
+ ldpp_dout(s, 4) << "failed to read target bucket info (bucket=: " << cpp_strerror(http_ret) << dendl;
+ return;
+ }
+
+ pipe.dest.bucket = pinfo->bucket;
+ }
+
+ int r = rgw_bucket_sync_status(this, store, pipe, *pinfo, &info, ¤t_status);
+ if (r < 0) {
+ lderr(s->cct) << "ERROR: rgw_bucket_sync_status() on pipe=" << pipe << " returned ret=" << r << dendl;
+ http_ret = r;
+ return;
+ }
+
+ if (status.empty()) {
+ status = std::move(current_status);
+ } else {
+ if (current_status.size() !=
+ status.size()) {
+ http_ret = -EINVAL;
+ lderr(s->cct) << "ERROR: different number of shards for sync status of buckets syncing from the same source: status.size()= " << status.size() << " current_status.size()=" << current_status.size() << dendl;
+ return;
+ }
+ auto m = status.begin();
+ for (auto& cur_shard_status : current_status) {
+ auto& result_shard_status = *m++;
+ // always take the first marker, or any later marker that's smaller
+ if (cur_shard_status.inc_marker.position < result_shard_status.inc_marker.position) {
+ result_shard_status = std::move(cur_shard_status);
+ }
+ }
+ }
+ }
}
void RGWOp_BILog_Status::send_response()
// query peers for sync status
set_status("fetching sync status from relevant peers");
yield {
- // query data sync status from each sync peer
- rgw_http_param_pair params[] = {
- { "type", "bucket-index" },
- { "status", nullptr },
- { "bucket", bucket_instance.c_str() },
- { "source-zone", zone_id.c_str() },
- { nullptr, nullptr }
- };
-
const auto& all_dests = source_policy->policy_handler->get_all_dests();
- set<rgw_zone_id> target_zones;
- rgw_zone_id last_zone;
- for (const auto& entry : all_dests) {
- if (entry.first != last_zone) {
- last_zone = entry.first;
- target_zones.insert(last_zone);
+ vector<rgw_zone_id> zids;
+ rgw_zone_id last_zid;
+ for (auto& diter : all_dests) {
+ const auto& zid = diter.first;
+ if (zid == last_zid) {
+ continue;
}
+ last_zid = zid;
+ zids.push_back(zid);
}
- peer_status.resize(target_zones.size());
+ peer_status.resize(zids.size());
auto& zone_conn_map = store->svc()->zone->get_zone_conn_map();
auto p = peer_status.begin();
- for (auto& zid : target_zones) {
+ for (auto& zid : zids) {
+ // query data sync status from each sync peer
+ rgw_http_param_pair params[] = {
+ { "type", "bucket-index" },
+ { "status", nullptr },
+ { "options", "merge" },
+ { "source-bucket", bucket_instance.c_str() },
+ { "source-zone", zone_id.c_str() },
+ { nullptr, nullptr }
+ };
+
auto ziter = zone_conn_map.find(zid);
if (ziter == zone_conn_map.end()) {
ldout(cct, 0) << "WARNING: no connection to zone " << zid << ", can't trim bucket: " << bucket << dendl;