return ret;
}
+ if (pipes.empty()) {
+ ldpp_dout(this, 0) << "No peers. This is not a valid multisite configuration." << dendl;
+ return -EINVAL;
+ }
+
for (auto& pipe : pipes) {
auto& szone = pipe.source.zone;
return 0;
}
+int RGWBucketPipeSyncStatusManager::remote_info(const DoutPrefixProvider *dpp,
+ source& s,
+ uint64_t* oldest_gen,
+ uint64_t* latest_gen,
+ uint64_t* num_shards)
+{
+ rgw_bucket_index_marker_info remote_info;
+ BucketIndexShardsManager remote_markers;
+ auto r = rgw_read_remote_bilog_info(dpp, s.sc.conn, s.info.bucket,
+ remote_info, remote_markers,
+ null_yield);
+
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " rgw_read_remote_bilog_info: r="
+ << r << dendl;
+ return r;
+ }
+ if (oldest_gen)
+ *oldest_gen = remote_info.oldest_gen;
+
+ if (latest_gen)
+ *latest_gen = remote_info.latest_gen;
+
+ if (num_shards)
+ *num_shards = remote_markers.get().size();
+
+ return 0;
+}
+
tl::expected<std::unique_ptr<RGWBucketPipeSyncStatusManager>, int>
RGWBucketPipeSyncStatusManager::construct(
const DoutPrefixProvider* dpp,
std::map<int, rgw_bucket_shard_sync_info> sync_status;
list<RGWCoroutinesStack *> stacks;
- for (auto& mgr : source_mgrs) {
- RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
- for (int i = 0; i < mgr.num_pipes(); ++i) {
- stack->call(mgr.read_sync_status_cr(i, &sync_status[i]));
- }
+ auto sz = sources.begin();
- stacks.push_back(stack);
+ if (source_zone) {
+ sz = std::find_if(sources.begin(), sources.end(),
+ [this](const source& s) {
+ return s.sc.source_zone == *source_zone;
+ }
+ );
+ if (sz == sources.end()) {
+ ldpp_dout(this, 0) << "ERROR: failed to find source zone: "
+ << *source_zone << dendl;
+ return tl::unexpected(-ENOENT);
+ }
+ } else {
+ ldpp_dout(this, 5) << "No source zone specified, using source zone: "
+ << sz->sc.source_zone << dendl;
+ return tl::unexpected(-ENOENT);
+ }
+ uint64_t num_shards, latest_gen;
+ auto ret = remote_info(dpp, *sz, nullptr, &latest_gen, &num_shards);
+ if (!ret) {
+ ldpp_dout(this, 5) << "Unable to get remote info: "
+ << ret << dendl;
+ return tl::unexpected(ret);
+ }
+ auto stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
+ std::vector<rgw_bucket_sync_pair_info> pairs(num_shards);
+ for (auto shard = 0u; shard < num_shards; ++shard) {
+ auto& pair = pairs[shard];
+ pair.source_bs.bucket = sz->info.bucket;
+ pair.dest_bucket = sz->dest;
+ pair.source_bs.shard_id = shard;
+ stack->call(new RGWReadBucketPipeSyncStatusCoroutine(
+ &sz->sc, pair, &sync_status[shard],
+ nullptr, latest_gen));
}
- int ret = cr_mgr.run(dpp, stacks);
+ stacks.push_back(stack);
+
+ ret = cr_mgr.run(dpp, stacks);
if (ret < 0) {
ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
<< bucket_str{dest_bucket} << dendl;