From: Yehuda Sadeh Date: Thu, 31 Oct 2019 23:11:57 +0000 (-0700) Subject: rgw: sync: fixes X-Git-Tag: v15.1.0~22^2~74 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9c82276a4f1697bf77185eb5b72cc6417b4cf0fd;p=ceph.git rgw: sync: fixes Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 04ea6e586984..dce0c03c6005 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1180,6 +1180,10 @@ struct rgw_sync_pipe_info_set { return pipes.end(); } + bool empty() const { + return pipes.empty(); + } + void update_empty_bucket_info(const std::map& buckets_info) { if (buckets_info.empty()) { return; @@ -3659,16 +3663,21 @@ class RGWGetBucketPeersCR : public RGWCoroutine { std::optional source_bucket, const map& all_sources, rgw_sync_pipe_info_set *result) { + ldpp_dout(sync_env->dpp, 20) << __func__ << ": source_zone=" << source_zone.value_or("*") + << " source_bucket=" << source_bucket.value_or(rgw_bucket()) + << " all_sources.size()=" << all_sources.size() << dendl; auto iters = get_pipe_iters(all_sources, source_zone); for (auto i = iters.first; i != iters.second; ++i) { for (auto& pipe : i->second.pipes) { if (!pipe.specific()) { + ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": skipping" << dendl; continue; } if (source_bucket && *source_bucket != *pipe.source.bucket) { continue; } + ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": adding" << dendl; result->insert(pipe, source_bucket_info, target_bucket_info); } } @@ -3678,14 +3687,19 @@ class RGWGetBucketPeersCR : public RGWCoroutine { std::optional target_bucket, const map& all_targets, rgw_sync_pipe_info_set *result) { + ldpp_dout(sync_env->dpp, 20) << __func__ << ": target_zone=" << source_zone.value_or("*") + << " target_bucket=" << source_bucket.value_or(rgw_bucket()) + << " all_targets.size()=" << all_targets.size() << dendl; auto iters = get_pipe_iters(all_targets, target_zone); for (auto i = iters.first; i != iters.second; ++i) { for (auto& pipe : i->second.pipes) { if (target_bucket && pipe.dest.bucket && *target_bucket != *pipe.dest.bucket) { + ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": skipping" << dendl; continue; } + ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": adding" << dendl; result->insert(pipe, source_bucket_info, target_bucket_info); } } @@ -3705,6 +3719,7 @@ public: sync_env(_sync_env), target_bucket(_target_bucket), source_zone(_source_zone), + source_bucket(_source_bucket), pipes(_pipes), tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_peers", SSTR( "target=" << target_bucket.value_or(rgw_bucket()) @@ -3753,6 +3768,7 @@ RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, int RGWRunBucketSourcesSyncCR::operate() { reenter(this) { +#if 0 yield { set_status("acquiring sync lock"); auto store = sync_env->store; @@ -3774,15 +3790,22 @@ int RGWRunBucketSourcesSyncCR::operate() } tn->log(10, "took lease"); +#endif yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); +#if 0 lease_cr->go_down(); drain_all(); +#endif return set_cr_error(retcode); } ldpp_dout(sync_env->dpp, 20) << __func__ << "(): requested source_bs=" << source_bs << " target_bs=" << target_bs << dendl; + if (pipes.empty()) { + ldpp_dout(sync_env->dpp, 20) << __func__ << "(): no relevant sync pipes found" << dendl; + } + for (siter = pipes.begin(); siter != pipes.end(); ++siter) { { ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl; @@ -3805,6 +3828,7 @@ int RGWRunBucketSourcesSyncCR::operate() } ldpp_dout(sync_env->dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl; + for (; num_shards > 0; --num_shards, ++cur_shard) { sync_pair.source_bs.shard_id = cur_shard; if (source_num_shards == target_num_shards) { @@ -3832,7 +3856,9 @@ int RGWRunBucketSourcesSyncCR::operate() } } +#if 0 lease_cr->go_down(); +#endif drain_all(); return set_cr_done(); } @@ -3903,12 +3929,13 @@ int RGWSyncGetBucketInfoCR::operate() void RGWGetBucketPeersCR::update_from_target_bucket_policy() { - auto handler = target_policy->policy_handler.get(); - - if (!pipes) { + if (!target_policy || + !pipes) { return; } + auto handler = target_policy->policy_handler.get(); + filter_sources(source_zone, source_bucket, handler->get_sources(), @@ -3926,12 +3953,13 @@ void RGWGetBucketPeersCR::update_from_target_bucket_policy() void RGWGetBucketPeersCR::update_from_source_bucket_policy() { - auto handler = source_policy->policy_handler.get(); - - if (!pipes) { + if (!source_policy || + !pipes) { return; } + auto handler = source_policy->policy_handler.get(); + filter_targets(sync_env->svc->zone->get_zone().id, target_bucket, handler->get_targets(), diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index b38df78e46f2..979785deb30f 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -312,6 +312,11 @@ struct rgw_sync_bucket_pipes { }; WRITE_CLASS_ENCODER(rgw_sync_bucket_pipes) +std::ostream& operator<<(std::ostream& os, const rgw_sync_bucket_entity& e); +std::ostream& operator<<(std::ostream& os, const rgw_sync_bucket_pipe& pipe); +std::ostream& operator<<(std::ostream& os, const rgw_sync_bucket_entities& e); +std::ostream& operator<<(std::ostream& os, const rgw_sync_bucket_pipes& pipe); + /* * define data flow between zones. Symmetrical: zones sync from each other. * Directional: one zone fetches data from another.