return pipes.end();
}
+ bool empty() const {
+ return pipes.empty();
+ }
+
void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
if (buckets_info.empty()) {
return;
std::optional<rgw_bucket> source_bucket,
const map<string, RGWBucketSyncFlowManager::pipe_set>& all_sources,
rgw_sync_pipe_info_set *result) {
+ ldpp_dout(sync_env->dpp, 20) << __func__ << ": source_zone=" << source_zone.value_or("*")
+ << " source_bucket=" << source_bucket.value_or(rgw_bucket())
+ << " all_sources.size()=" << all_sources.size() << dendl;
auto iters = get_pipe_iters(all_sources, source_zone);
for (auto i = iters.first; i != iters.second; ++i) {
for (auto& pipe : i->second.pipes) {
if (!pipe.specific()) {
+ ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": skipping" << dendl;
continue;
}
if (source_bucket &&
*source_bucket != *pipe.source.bucket) {
continue;
}
+ ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": adding" << dendl;
result->insert(pipe, source_bucket_info, target_bucket_info);
}
}
std::optional<rgw_bucket> target_bucket,
const map<string, RGWBucketSyncFlowManager::pipe_set>& all_targets,
rgw_sync_pipe_info_set *result) {
+ ldpp_dout(sync_env->dpp, 20) << __func__ << ": target_zone=" << source_zone.value_or("*")
+ << " target_bucket=" << source_bucket.value_or(rgw_bucket())
+ << " all_targets.size()=" << all_targets.size() << dendl;
auto iters = get_pipe_iters(all_targets, target_zone);
for (auto i = iters.first; i != iters.second; ++i) {
for (auto& pipe : i->second.pipes) {
if (target_bucket &&
pipe.dest.bucket &&
*target_bucket != *pipe.dest.bucket) {
+ ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": skipping" << dendl;
continue;
}
+ ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": adding" << dendl;
result->insert(pipe, source_bucket_info, target_bucket_info);
}
}
sync_env(_sync_env),
target_bucket(_target_bucket),
source_zone(_source_zone),
+ source_bucket(_source_bucket),
pipes(_pipes),
tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_peers",
SSTR( "target=" << target_bucket.value_or(rgw_bucket())
int RGWRunBucketSourcesSyncCR::operate()
{
reenter(this) {
+#if 0
yield {
set_status("acquiring sync lock");
auto store = sync_env->store;
}
tn->log(10, "took lease");
+#endif
yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, "ERROR: failed to read sync status for bucket");
+#if 0
lease_cr->go_down();
drain_all();
+#endif
return set_cr_error(retcode);
}
ldpp_dout(sync_env->dpp, 20) << __func__ << "(): requested source_bs=" << source_bs << " target_bs=" << target_bs << dendl;
+ if (pipes.empty()) {
+ ldpp_dout(sync_env->dpp, 20) << __func__ << "(): no relevant sync pipes found" << dendl;
+ }
+
for (siter = pipes.begin(); siter != pipes.end(); ++siter) {
{
ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;
}
ldpp_dout(sync_env->dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl;
+
for (; num_shards > 0; --num_shards, ++cur_shard) {
sync_pair.source_bs.shard_id = cur_shard;
if (source_num_shards == target_num_shards) {
}
}
+#if 0
lease_cr->go_down();
+#endif
drain_all();
return set_cr_done();
}
void RGWGetBucketPeersCR::update_from_target_bucket_policy()
{
- auto handler = target_policy->policy_handler.get();
-
- if (!pipes) {
+ if (!target_policy ||
+ !pipes) {
return;
}
+ auto handler = target_policy->policy_handler.get();
+
filter_sources(source_zone,
source_bucket,
handler->get_sources(),
void RGWGetBucketPeersCR::update_from_source_bucket_policy()
{
- auto handler = source_policy->policy_handler.get();
-
- if (!pipes) {
+ if (!source_policy ||
+ !pipes) {
return;
}
+ auto handler = source_policy->policy_handler.get();
+
filter_targets(sync_env->svc->zone->get_zone().id,
target_bucket,
handler->get_targets(),