static int bucket_source_sync_status(rgw::sal::RGWRadosStore *store, const RGWZone& zone,
const RGWZone& source, RGWRESTConn *conn,
const RGWBucketInfo& bucket_info,
+ const rgw_sync_bucket_pipe& pipe,
int width, std::ostream& out)
{
out << indented{width, "source zone"} << source.id << " (" << source.name << ")\n";
// syncing from this zone?
if (!zone.syncs_from(source.name)) {
- out << indented{width} << "not in sync_from\n";
+ out << indented{width} << "does not sync from zone\n";
return 0;
}
std::vector<rgw_bucket_shard_sync_info> status;
- int r = rgw_bucket_sync_status(dpp(), store, source.id, bucket_info, &status);
+ int r = rgw_bucket_sync_status(dpp(), store, pipe, bucket_info, &status);
if (r < 0) {
lderr(store->ctx()) << "failed to read bucket sync status: " << cpp_strerror(r) << dendl;
return r;
return 0;
}
-#warning need to use bucket sources
+ RGWBucketSyncPolicyHandlerRef handler;
+
+ int r = store->ctl()->bucket->get_sync_policy_handler(info.bucket, &handler, null_yield);
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed to get policy handler for bucket (" << info.bucket << "): r=" << r << ": " << cpp_strerror(-r) << dendl;
+ return r;
+ }
+
+ auto& sources = handler->get_sources();
+
auto& zone_conn_map = store->svc()->zone->get_zone_conn_map();
+ set<string> zone_ids;
+
if (!source_zone_id.empty()) {
auto z = zonegroup.zones.find(source_zone_id);
if (z == zonegroup.zones.end()) {
lderr(store->ctx()) << "No connection to zone " << z->second.name << dendl;
return -EINVAL;
}
- return bucket_source_sync_status(store, zone, z->second, c->second,
- info, width, out);
+ zone_ids.insert(source_zone_id);
+ } else {
+ for (const auto& entry : zonegroup.zones) {
+ auto c = zone_conn_map.find(entry.second.id);
+ if (c == zone_conn_map.end()) {
+ continue;
+ }
+ zone_ids.insert(entry.second.name);
+ }
}
- for (const auto& z : zonegroup.zones) {
- auto c = zone_conn_map.find(z.second.id);
- if (c != zone_conn_map.end()) {
- bucket_source_sync_status(store, zone, z.second, c->second,
- info, width, out);
+ for (auto& zone_id : zone_ids) {
+ auto z = zonegroup.zones.find(zone_id);
+ if (z == zonegroup.zones.end()) { /* should't happen */
+ continue;
+ }
+ auto c = zone_conn_map.find(source_zone_id);
+ if (c == zone_conn_map.end()) { /* should't happen */
+ continue;
+ }
+
+ for (auto& m : sources) {
+ for (auto& pipe : m.second.pipes) {
+ if (pipe.source.zone.value_or("") == z->second.id) {
+ bucket_source_sync_status(store, zone, z->second,
+ c->second,
+ info, pipe,
+ width, out);
+ }
+ }
}
}
+
return 0;
}
rgw::sal::RGWRadosStore *const store;
RGWDataSyncCtx *const sc;
RGWDataSyncEnv *const env;
- const int num_shards;
- rgw_bucket_shard bs;
-#warning change this
+ RGWBucketInfo source_bucket_info;
+ RGWBucketInfo dest_bucket_info;
+ rgw_bucket_shard source_bs;
+ rgw_bucket_shard dest_bs;
+
rgw_bucket_sync_pair_info sync_pair;
+ bool shard_to_shard_sync;
+
using Vector = std::vector<rgw_bucket_shard_sync_info>;
Vector::iterator i, end;
public:
RGWCollectBucketSyncStatusCR(rgw::sal::RGWRadosStore *store, RGWDataSyncCtx *sc,
- int num_shards, const rgw_bucket& bucket,
+ const RGWBucketInfo& source_bucket_info,
+ const RGWBucketInfo& dest_bucket_info,
Vector *status)
: RGWShardCollectCR(sc->cct, max_concurrent_shards),
- store(store), sc(sc), env(sc->env), num_shards(num_shards),
- bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
+ store(store), sc(sc), env(sc->env),
+ source_bucket_info(source_bucket_info),
+ dest_bucket_info(dest_bucket_info),
i(status->begin()), end(status->end())
- {}
+ {
+ shard_to_shard_sync = (source_bucket_info.num_shards == dest_bucket_info.num_shards);
+
+ source_bs = rgw_bucket_shard(source_bucket_info.bucket, source_bucket_info.num_shards > 0 ? 0 : -1);
+ dest_bs = rgw_bucket_shard(dest_bucket_info.bucket, dest_bucket_info.num_shards > 0 ? 0 : -1);
+
+ status->clear();
+ status->resize(std::max<size_t>(1, source_bucket_info.num_shards));
+
+ i = status->begin();
+ end = status->end();
+ }
bool spawn_next() override {
if (i == end) {
return false;
}
- sync_pair.source_bs = bs;
+ sync_pair.source_bs = source_bs;
+ sync_pair.dest_bs = dest_bs;
spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i), false);
++i;
- ++bs.shard_id;
+ ++source_bs.shard_id;
+ if (shard_to_shard_sync) {
+ dest_bs.shard_id = source_bs.shard_id;
+ }
return true;
}
};
-int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, const std::string& source_zone,
- const RGWBucketInfo& bucket_info,
+int rgw_bucket_sync_status(const DoutPrefixProvider *dpp,
+ rgw::sal::RGWRadosStore *store,
+ const rgw_sync_bucket_pipe& pipe,
+ const RGWBucketInfo& dest_bucket_info,
std::vector<rgw_bucket_shard_sync_info> *status)
{
- const auto num_shards = bucket_info.num_shards;
- status->clear();
- status->resize(std::max<size_t>(1, num_shards));
+ if (!pipe.source.zone ||
+ !pipe.source.bucket ||
+ !pipe.dest.zone ||
+ !pipe.dest.bucket) {
+ return -EINVAL;
+ }
+
+ if (*pipe.dest.bucket !=
+ dest_bucket_info.bucket) {
+ return -EINVAL;
+ }
+
+ const rgw_bucket& source_bucket = *pipe.source.bucket;
+
+ RGWBucketInfo source_bucket_info;
+
+ auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
+
+ int ret = store->getRados()->get_bucket_instance_info(obj_ctx, source_bucket, source_bucket_info, nullptr, nullptr, null_yield);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to get bucket instance info: bucket=" << source_bucket << ": " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
RGWDataSyncEnv env;
RGWSyncModuleInstanceRef module; // null sync module
nullptr, nullptr, nullptr, module, nullptr);
RGWDataSyncCtx sc;
- sc.init(&env, nullptr, source_zone);
+ sc.init(&env, nullptr, *pipe.source.zone);
RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
- return crs.run(new RGWCollectBucketSyncStatusCR(store, &sc, num_shards,
- bucket_info.bucket, status));
+ return crs.run(new RGWCollectBucketSyncStatusCR(store, &sc,
+ source_bucket_info,
+ dest_bucket_info,
+ status));
}
#include "rgw_sync_trace.h"
class JSONObj;
+struct rgw_sync_bucket_pipe;
struct rgw_bucket_sync_pair_info {
rgw_bucket_shard source_bs;
};
/// read the sync status of all bucket shards from the given source zone
-int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, const std::string& source_zone,
- const RGWBucketInfo& bucket_info,
+int rgw_bucket_sync_status(const DoutPrefixProvider *dpp,
+ rgw::sal::RGWRadosStore *store,
+ const rgw_sync_bucket_pipe& pipe,
+ const RGWBucketInfo& dest_bucket_info,
std::vector<rgw_bucket_shard_sync_info> *status);
class RGWDefaultSyncModule : public RGWSyncModule {
ldpp_dout(s, 4) << "failed to read bucket info: " << cpp_strerror(http_ret) << dendl;
return;
}
- http_ret = rgw_bucket_sync_status(this, store, source_zone, info, &status);
+ rgw_sync_bucket_pipe pipe;
+ pipe.source.zone = source_zone;
+ pipe.source.bucket = info.bucket;
+ pipe.dest.zone = store->svc()->zone->zone_id();
+ pipe.dest.bucket = info.bucket;
+ http_ret = rgw_bucket_sync_status(this, store, pipe, info, &status);
}
void RGWOp_BILog_Status::send_response()
nullopt,
nullptr);
- sync_flow_mgr->init(zonegroup->sync_policy);
+ rgw_sync_policy_info sync_policy = zonegroup->sync_policy;
+ if (sync_policy.empty()) {
+ RGWSyncPolicyCompat::convert_old_sync_config(this, sync_modules_svc, &sync_policy);
+ }
+
+ sync_flow_mgr->init(sync_policy);
+
+ RGWBucketSyncFlowManager::pipe_set zone_sources;
+ RGWBucketSyncFlowManager::pipe_set zone_targets;
+
+ sync_flow_mgr->reflect(nullopt, &zone_sources, &zone_targets);
+
+ set<string> source_zones_by_name;
+ set<string> target_zones_by_name;
+
+ for (auto& pipe : zone_sources.pipes) {
+ if (pipe.source.zone) {
+ source_zones_by_name.insert(*pipe.source.zone);
+ }
+ }
+
+ for (auto& pipe : zone_targets.pipes) {
+ if (pipe.dest.zone) {
+ target_zones_by_name.insert(*pipe.dest.zone);
+ }
+ }
ret = sync_modules_svc->start();
if (ret < 0) {
ldout(cct, 20) << "generating connection object for zone " << z.name << " id " << z.id << dendl;
RGWRESTConn *conn = new RGWRESTConn(cct, this, z.id, z.endpoints);
zone_conn_map[id] = conn;
- if (zone_syncs_from(*zone_public_config, z) ||
- zone_syncs_from(z, *zone_public_config)) {
- if (zone_syncs_from(*zone_public_config, z)) {
+
+ bool zone_is_source = source_zones_by_name.find(z.name) != source_zones_by_name.end();
+ bool zone_is_target = target_zones_by_name.find(z.name) != target_zones_by_name.end();
+
+ if (zone_is_source || zone_is_target) {
+ if (zone_is_source) {
data_sync_source_zones.push_back(&z);
}
- if (zone_syncs_from(z, *zone_public_config)) {
+ if (zone_is_target) {
zone_data_notify_to_map[id] = conn;
}
} else {