const rgw_sync_bucket_pipe& pipe,
int width, std::ostream& out)
{
- out << indented{width, "source zone"} << source.id << " (" << source.name << ")\n";
+ out << indented{width, "source zone"} << source.id << " (" << source.name << ")" << std::endl;
// syncing from this zone?
if (!zone.syncs_from(source.name)) {
out << indented{width} << "does not sync from zone\n";
return 0;
}
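+  // filled in by rgw_bucket_sync_status() with the resolved source bucket info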
+ RGWBucketInfo source_bucket_info;
std::vector<rgw_bucket_shard_sync_info> status;
- int r = rgw_bucket_sync_status(dpp(), store, pipe, bucket_info, &status);
+ int r = rgw_bucket_sync_status(dpp(), store, pipe, bucket_info, &source_bucket_info, &status);
if (r < 0) {
lderr(store->ctx()) << "failed to read bucket sync status: " << cpp_strerror(r) << dendl;
return r;
}
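+  // report which source bucket this pipe pulls from (it may differ from the local bucket)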
+ out << indented{width, "source bucket"} << source_bucket_info.bucket.get_key() << std::endl;
+
int num_full = 0;
int num_inc = 0;
uint64_t full_complete = 0;
out << indented{width} << "incremental sync: " << num_inc << "/" << total_shards << " shards\n";
BucketIndexShardsManager remote_markers;
- r = remote_bilog_markers(store, source, conn, bucket_info, &remote_markers);
+ r = remote_bilog_markers(store, source, conn, source_bucket_info, &remote_markers);
if (r < 0) {
lderr(store->ctx()) << "failed to read remote log: " << cpp_strerror(r) << dendl;
return r;
static int bucket_sync_status(rgw::sal::RGWRadosStore *store, const RGWBucketInfo& info,
const rgw_zone_id& source_zone_id,
+ std::optional<rgw_bucket>& opt_source_bucket,
std::ostream& out)
{
const RGWRealm& realm = store->svc()->zone->get_realm();
out << indented{width, "bucket"} << info.bucket << "\n\n";
if (!store->ctl()->bucket->bucket_imports_data(info.bucket, null_yield)) {
- out << "Sync is disabled for bucket " << info.bucket.name << '\n';
+ out << "Sync is disabled for bucket " << info.bucket.name << " or bucket has no sync sources" << std::endl;
return 0;
}
return r;
}
- auto& sources = handler->get_sources();
+ auto sources = handler->get_all_sources();
auto& zone_conn_map = store->svc()->zone->get_zone_conn_map();
set<rgw_zone_id> zone_ids;
zone_ids.insert(source_zone_id);
} else {
for (const auto& entry : zonegroup.zones) {
- auto c = zone_conn_map.find(entry.second.id);
- if (c == zone_conn_map.end()) {
- continue;
- }
- zone_ids.insert(entry.second.name);
+ zone_ids.insert(entry.second.id);
}
}
if (z == zonegroup.zones.end()) { /* shouldn't happen */
continue;
}
- auto c = zone_conn_map.find(source_zone_id);
+ auto c = zone_conn_map.find(zone_id.id);
if (c == zone_conn_map.end()) { /* shouldn't happen */
continue;
}
- for (auto& m : sources) {
- for (auto& entry : m.second.pipe_map) {
- auto& pipe = entry.second;
- if (pipe.source.zone.value_or(rgw_zone_id()) == z->second.id) {
- bucket_source_sync_status(store, zone, z->second,
- c->second,
- info, pipe,
- width, out);
- }
+ for (auto& entry : sources) {
+ auto& pipe = entry.second;
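+    // when a specific source bucket was requested, only show pipes that match it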
+ if (opt_source_bucket &&
+ pipe.source.bucket != opt_source_bucket) {
+ continue;
+ }
+ if (pipe.source.zone.value_or(rgw_zone_id()) == z->second.id) {
+ bucket_source_sync_status(store, zone, z->second,
+ c->second,
+ info, pipe,
+ width, out);
}
}
}
if (ret < 0) {
return -ret;
}
- bucket_sync_status(store, bucket_info, source_zone, std::cout);
+ bucket_sync_status(store, bucket_info, source_zone, opt_source_bucket, std::cout);
}
if (opt_cmd == OPT::BUCKET_SYNC_MARKERS) {
}
}
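+// flatten 'sources' and 'resolved_sources' into (source zone, pipe) pairs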
+multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_sources()
+{
+ multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
+
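+  // pipes from the per-zone pipe maps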
+ for (auto& source_entry : sources) {
+ auto& zone_id = source_entry.first;
+
+ auto& pipes = source_entry.second.pipe_map;
+
+ for (auto& entry : pipes) {
+ auto& pipe = entry.second;
+ m.insert(make_pair(zone_id, pipe));
+ }
+ }
+
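+  // resolved pipes can only be keyed by zone when their source zone is known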
+ for (auto& pipe : resolved_sources) {
+ if (!pipe.source.zone) {
+ continue;
+ }
+
+ m.insert(make_pair(*pipe.source.zone, pipe));
+ }
+
+ return m;
+}
+
void RGWBucketSyncPolicyHandler::get_pipes(std::set<rgw_sync_bucket_pipe> *_sources, std::set<rgw_sync_bucket_pipe> *_targets,
std::optional<rgw_sync_bucket_entity> filter_peer) { /* return raw pipes */
for (auto& entry : source_pipes.pipe_map) {
ldpp_dout(sync_env->dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl;
for (; num_shards > 0; --num_shards, ++cur_shard) {
+ /*
+  * use a negative shard_id for backward compatibility;
+  * this affects how the status oid is crafted
+  */
sync_pair.source_bs.shard_id = (source_num_shards > 0 ? cur_shard : -1);
if (source_num_shards == target_num_shards) {
sync_pair.dest_bs.shard_id = sync_pair.source_bs.shard_id;
rgw::sal::RGWRadosStore *store,
const rgw_sync_bucket_pipe& pipe,
const RGWBucketInfo& dest_bucket_info,
+ RGWBucketInfo *psource_bucket_info,
std::vector<rgw_bucket_shard_sync_info> *status)
{
if (!pipe.source.zone ||
RGWBucketInfo source_bucket_info;
- auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
+ auto& bucket_ctl = store->getRados()->ctl.bucket;
- int ret = store->getRados()->get_bucket_instance_info(obj_ctx, source_bucket, source_bucket_info, nullptr, nullptr, null_yield);
+ int ret = bucket_ctl->read_bucket_info(source_bucket, &source_bucket_info, null_yield);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to get bucket instance info: bucket=" << source_bucket << ": " << cpp_strerror(-ret) << dendl;
return ret;
}
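+  // hand the resolved source bucket info back to the caller, if requested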
+ if (psource_bucket_info) {
+ *psource_bucket_info = source_bucket_info;
+ }
+
RGWDataSyncEnv env;
RGWSyncModuleInstanceRef module; // null sync module
env.init(dpp, store->ctx(), store, store->svc(), store->svc()->rados->get_async_processor(),