From 70f1255ffdbcf38eb6c2fad54181ffdaab2c0289 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 24 Oct 2019 18:23:22 -0700 Subject: [PATCH] rgw: sync: bucket sync status changes Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_admin.cc | 54 +++++++++++++++++++----- src/rgw/rgw_data_sync.cc | 79 ++++++++++++++++++++++++++++-------- src/rgw/rgw_data_sync.h | 7 +++- src/rgw/rgw_rest_log.cc | 7 +++- src/rgw/services/svc_zone.cc | 38 ++++++++++++++--- 5 files changed, 150 insertions(+), 35 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index bb34ed3b597..ecfa33f4916 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2254,17 +2254,18 @@ static int remote_bilog_markers(rgw::sal::RGWRadosStore *store, const RGWZone& s static int bucket_source_sync_status(rgw::sal::RGWRadosStore *store, const RGWZone& zone, const RGWZone& source, RGWRESTConn *conn, const RGWBucketInfo& bucket_info, + const rgw_sync_bucket_pipe& pipe, int width, std::ostream& out) { out << indented{width, "source zone"} << source.id << " (" << source.name << ")\n"; // syncing from this zone? if (!zone.syncs_from(source.name)) { - out << indented{width} << "not in sync_from\n"; + out << indented{width} << "does not sync from zone\n"; return 0; } std::vector status; - int r = rgw_bucket_sync_status(dpp(), store, source.id, bucket_info, &status); + int r = rgw_bucket_sync_status(dpp(), store, pipe, bucket_info, &status); if (r < 0) { lderr(store->ctx()) << "failed to read bucket sync status: " << cpp_strerror(r) << dendl; return r; @@ -2453,8 +2454,19 @@ static int bucket_sync_status(rgw::sal::RGWRadosStore *store, const RGWBucketInf return 0; } -#warning need to use bucket sources + RGWBucketSyncPolicyHandlerRef handler; + + int r = store->ctl()->bucket->get_sync_policy_handler(info.bucket, &handler, null_yield); + if (r < 0) { + lderr(store->ctx()) << "ERROR: failed to get policy handler for bucket (" << info.bucket << "): r=" << r << ": " << cpp_strerror(-r) << dendl; + return r; + } + + auto& sources = handler->get_sources(); + auto& zone_conn_map = store->svc()->zone->get_zone_conn_map(); + set zone_ids; + if (!source_zone_id.empty()) { auto z = zonegroup.zones.find(source_zone_id); if (z == zonegroup.zones.end()) { @@ -2467,17 +2479,39 @@ static int bucket_sync_status(rgw::sal::RGWRadosStore *store, const RGWBucketInf lderr(store->ctx()) << "No connection to zone " << z->second.name << dendl; return -EINVAL; } - return bucket_source_sync_status(store, zone, z->second, c->second, - info, width, out); + zone_ids.insert(source_zone_id); + } else { + for (const auto& entry : zonegroup.zones) { + auto c = zone_conn_map.find(entry.second.id); + if (c == zone_conn_map.end()) { + continue; + } + zone_ids.insert(entry.second.name); + } } - for (const auto& z : zonegroup.zones) { - auto c = zone_conn_map.find(z.second.id); - if (c != zone_conn_map.end()) { - bucket_source_sync_status(store, zone, z.second, c->second, - info, width, out); + for (auto& zone_id : zone_ids) { + auto z = zonegroup.zones.find(zone_id); + if (z == zonegroup.zones.end()) { /* should't happen */ + continue; + } + auto c = zone_conn_map.find(source_zone_id); + if (c == zone_conn_map.end()) { /* should't happen */ + continue; + } + + for (auto& m : sources) { + for (auto& pipe : m.second.pipes) { + if (pipe.source.zone.value_or("") == z->second.id) { + bucket_source_sync_status(store, zone, z->second, + c->second, + info, pipe, + width, out); + } + } } } + return 0; } diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 43bfcefaf50..4bba4bdac4b 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -3915,43 +3915,86 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR { rgw::sal::RGWRadosStore *const store; RGWDataSyncCtx *const sc; RGWDataSyncEnv *const env; - const int num_shards; - rgw_bucket_shard bs; -#warning change this + RGWBucketInfo source_bucket_info; + RGWBucketInfo dest_bucket_info; + rgw_bucket_shard source_bs; + rgw_bucket_shard dest_bs; + rgw_bucket_sync_pair_info sync_pair; + bool shard_to_shard_sync; + using Vector = std::vector; Vector::iterator i, end; public: RGWCollectBucketSyncStatusCR(rgw::sal::RGWRadosStore *store, RGWDataSyncCtx *sc, - int num_shards, const rgw_bucket& bucket, + const RGWBucketInfo& source_bucket_info, + const RGWBucketInfo& dest_bucket_info, Vector *status) : RGWShardCollectCR(sc->cct, max_concurrent_shards), - store(store), sc(sc), env(sc->env), num_shards(num_shards), - bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1 + store(store), sc(sc), env(sc->env), + source_bucket_info(source_bucket_info), + dest_bucket_info(dest_bucket_info), i(status->begin()), end(status->end()) - {} + { + shard_to_shard_sync = (source_bucket_info.num_shards == dest_bucket_info.num_shards); + + source_bs = rgw_bucket_shard(source_bucket_info.bucket, source_bucket_info.num_shards > 0 ? 0 : -1); + dest_bs = rgw_bucket_shard(dest_bucket_info.bucket, dest_bucket_info.num_shards > 0 ? 0 : -1); + + status->clear(); + status->resize(std::max(1, source_bucket_info.num_shards)); + + i = status->begin(); + end = status->end(); + } bool spawn_next() override { if (i == end) { return false; } - sync_pair.source_bs = bs; + sync_pair.source_bs = source_bs; + sync_pair.dest_bs = dest_bs; spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i), false); ++i; - ++bs.shard_id; + ++source_bs.shard_id; + if (shard_to_shard_sync) { + dest_bs.shard_id = source_bs.shard_id; + } return true; } }; -int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, const std::string& source_zone, - const RGWBucketInfo& bucket_info, +int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, + rgw::sal::RGWRadosStore *store, + const rgw_sync_bucket_pipe& pipe, + const RGWBucketInfo& dest_bucket_info, std::vector *status) { - const auto num_shards = bucket_info.num_shards; - status->clear(); - status->resize(std::max(1, num_shards)); + if (!pipe.source.zone || + !pipe.source.bucket || + !pipe.dest.zone || + !pipe.dest.bucket) { + return -EINVAL; + } + + if (*pipe.dest.bucket != + dest_bucket_info.bucket) { + return -EINVAL; + } + + const rgw_bucket& source_bucket = *pipe.source.bucket; + + RGWBucketInfo source_bucket_info; + + auto obj_ctx = store->svc()->sysobj->init_obj_ctx(); + + int ret = store->getRados()->get_bucket_instance_info(obj_ctx, source_bucket, source_bucket_info, nullptr, nullptr, null_yield); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to get bucket instance info: bucket=" << source_bucket << ": " << cpp_strerror(-ret) << dendl; + return ret; + } RGWDataSyncEnv env; RGWSyncModuleInstanceRef module; // null sync module @@ -3959,9 +4002,11 @@ int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStor nullptr, nullptr, nullptr, module, nullptr); RGWDataSyncCtx sc; - sc.init(&env, nullptr, source_zone); + sc.init(&env, nullptr, *pipe.source.zone); RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry()); - return crs.run(new RGWCollectBucketSyncStatusCR(store, &sc, num_shards, - bucket_info.bucket, status)); + return crs.run(new RGWCollectBucketSyncStatusCR(store, &sc, + source_bucket_info, + dest_bucket_info, + status)); } diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 416e35dd9f3..2adbf1227b8 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -17,6 +17,7 @@ #include "rgw_sync_trace.h" class JSONObj; +struct rgw_sync_bucket_pipe; struct rgw_bucket_sync_pair_info { rgw_bucket_shard source_bs; @@ -639,8 +640,10 @@ public: }; /// read the sync status of all bucket shards from the given source zone -int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, const std::string& source_zone, - const RGWBucketInfo& bucket_info, +int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, + rgw::sal::RGWRadosStore *store, + const rgw_sync_bucket_pipe& pipe, + const RGWBucketInfo& dest_bucket_info, std::vector *status); class RGWDefaultSyncModule : public RGWSyncModule { diff --git a/src/rgw/rgw_rest_log.cc b/src/rgw/rgw_rest_log.cc index f74663876b8..82256e32ec5 100644 --- a/src/rgw/rgw_rest_log.cc +++ b/src/rgw/rgw_rest_log.cc @@ -857,7 +857,12 @@ void RGWOp_BILog_Status::execute() ldpp_dout(s, 4) << "failed to read bucket info: " << cpp_strerror(http_ret) << dendl; return; } - http_ret = rgw_bucket_sync_status(this, store, source_zone, info, &status); + rgw_sync_bucket_pipe pipe; + pipe.source.zone = source_zone; + pipe.source.bucket = info.bucket; + pipe.dest.zone = store->svc()->zone->zone_id(); + pipe.dest.bucket = info.bucket; + http_ret = rgw_bucket_sync_status(this, store, pipe, info, &status); } void RGWOp_BILog_Status::send_response() diff --git a/src/rgw/services/svc_zone.cc b/src/rgw/services/svc_zone.cc index 15fea6b9968..5af307c9caf 100644 --- a/src/rgw/services/svc_zone.cc +++ b/src/rgw/services/svc_zone.cc @@ -157,8 +157,33 @@ int RGWSI_Zone::do_start() nullopt, nullptr); - sync_flow_mgr->init(zonegroup->sync_policy); + rgw_sync_policy_info sync_policy = zonegroup->sync_policy; + if (sync_policy.empty()) { + RGWSyncPolicyCompat::convert_old_sync_config(this, sync_modules_svc, &sync_policy); + } + + sync_flow_mgr->init(sync_policy); + + RGWBucketSyncFlowManager::pipe_set zone_sources; + RGWBucketSyncFlowManager::pipe_set zone_targets; + + sync_flow_mgr->reflect(nullopt, &zone_sources, &zone_targets); + + set source_zones_by_name; + set target_zones_by_name; + + for (auto& pipe : zone_sources.pipes) { + if (pipe.source.zone) { + source_zones_by_name.insert(*pipe.source.zone); + } + } + + for (auto& pipe : zone_targets.pipes) { + if (pipe.dest.zone) { + target_zones_by_name.insert(*pipe.dest.zone); + } + } ret = sync_modules_svc->start(); if (ret < 0) { @@ -198,12 +223,15 @@ int RGWSI_Zone::do_start() ldout(cct, 20) << "generating connection object for zone " << z.name << " id " << z.id << dendl; RGWRESTConn *conn = new RGWRESTConn(cct, this, z.id, z.endpoints); zone_conn_map[id] = conn; - if (zone_syncs_from(*zone_public_config, z) || - zone_syncs_from(z, *zone_public_config)) { - if (zone_syncs_from(*zone_public_config, z)) { + + bool zone_is_source = source_zones_by_name.find(z.name) != source_zones_by_name.end(); + bool zone_is_target = target_zones_by_name.find(z.name) != target_zones_by_name.end(); + + if (zone_is_source || zone_is_target) { + if (zone_is_source) { data_sync_source_zones.push_back(&z); } - if (zone_syncs_from(z, *zone_public_config)) { + if (zone_is_target) { zone_data_notify_to_map[id] = conn; } } else { -- 2.39.5