]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: sync: bucket sync status changes
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 25 Oct 2019 01:23:22 +0000 (18:23 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:37 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_rest_log.cc
src/rgw/services/svc_zone.cc

index bb34ed3b597628b37fdfd020486753cb02b557f8..ecfa33f4916274c5973676dd160e9ff00f4a7532 100644 (file)
@@ -2254,17 +2254,18 @@ static int remote_bilog_markers(rgw::sal::RGWRadosStore *store, const RGWZone& s
 static int bucket_source_sync_status(rgw::sal::RGWRadosStore *store, const RGWZone& zone,
                                      const RGWZone& source, RGWRESTConn *conn,
                                      const RGWBucketInfo& bucket_info,
+                                     const rgw_sync_bucket_pipe& pipe,
                                      int width, std::ostream& out)
 {
   out << indented{width, "source zone"} << source.id << " (" << source.name << ")\n";
 
   // syncing from this zone?
   if (!zone.syncs_from(source.name)) {
-    out << indented{width} << "not in sync_from\n";
+    out << indented{width} << "does not sync from zone\n";
     return 0;
   }
   std::vector<rgw_bucket_shard_sync_info> status;
-  int r = rgw_bucket_sync_status(dpp(), store, source.id, bucket_info, &status);
+  int r = rgw_bucket_sync_status(dpp(), store, pipe, bucket_info, &status);
   if (r < 0) {
     lderr(store->ctx()) << "failed to read bucket sync status: " << cpp_strerror(r) << dendl;
     return r;
@@ -2453,8 +2454,19 @@ static int bucket_sync_status(rgw::sal::RGWRadosStore *store, const RGWBucketInf
     return 0;
   }
 
-#warning need to use bucket sources
+  RGWBucketSyncPolicyHandlerRef handler;
+
+  int r = store->ctl()->bucket->get_sync_policy_handler(info.bucket, &handler, null_yield);
+  if (r < 0) {
+    lderr(store->ctx()) << "ERROR: failed to get policy handler for bucket (" << info.bucket << "): r=" << r << ": " << cpp_strerror(-r) << dendl;
+    return r;
+  }
+
+  auto& sources = handler->get_sources();
+
   auto& zone_conn_map = store->svc()->zone->get_zone_conn_map();
+  set<string> zone_ids;
+
   if (!source_zone_id.empty()) {
     auto z = zonegroup.zones.find(source_zone_id);
     if (z == zonegroup.zones.end()) {
@@ -2467,17 +2479,39 @@ static int bucket_sync_status(rgw::sal::RGWRadosStore *store, const RGWBucketInf
       lderr(store->ctx()) << "No connection to zone " << z->second.name << dendl;
       return -EINVAL;
     }
-    return bucket_source_sync_status(store, zone, z->second, c->second,
-                                     info, width, out);
+    zone_ids.insert(source_zone_id);
+  } else {
+    for (const auto& entry : zonegroup.zones) {
+      auto c = zone_conn_map.find(entry.second.id);
+      if (c == zone_conn_map.end()) {
+        continue;
+      }
+      zone_ids.insert(entry.second.name);
+    }
   }
 
-  for (const auto& z : zonegroup.zones) {
-    auto c = zone_conn_map.find(z.second.id);
-    if (c != zone_conn_map.end()) {
-      bucket_source_sync_status(store, zone, z.second, c->second,
-                                info, width, out);
+  for (auto& zone_id : zone_ids) {
+    auto z = zonegroup.zones.find(zone_id);
+    if (z == zonegroup.zones.end()) { /* should't happen */
+      continue;
+    }
+    auto c = zone_conn_map.find(source_zone_id);
+    if (c == zone_conn_map.end()) { /* should't happen */
+      continue;
+    }
+
+    for (auto& m : sources) {
+      for (auto& pipe : m.second.pipes) {
+        if (pipe.source.zone.value_or("") == z->second.id) {
+          bucket_source_sync_status(store, zone, z->second,
+                                    c->second,
+                                    info, pipe,
+                                    width, out);
+        }
+      }
     }
   }
+
   return 0;
 }
 
index 43bfcefaf50726913e93486449a42478854684fc..4bba4bdac4b89789491d30d0488451cf5333579b 100644 (file)
@@ -3915,43 +3915,86 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
   rgw::sal::RGWRadosStore *const store;
   RGWDataSyncCtx *const sc;
   RGWDataSyncEnv *const env;
-  const int num_shards;
-  rgw_bucket_shard bs;
-#warning change this
+  RGWBucketInfo source_bucket_info;
+  RGWBucketInfo dest_bucket_info;
+  rgw_bucket_shard source_bs;
+  rgw_bucket_shard dest_bs;
+
   rgw_bucket_sync_pair_info sync_pair;
 
+  bool shard_to_shard_sync;
+
   using Vector = std::vector<rgw_bucket_shard_sync_info>;
   Vector::iterator i, end;
 
  public:
   RGWCollectBucketSyncStatusCR(rgw::sal::RGWRadosStore *store, RGWDataSyncCtx *sc,
-                               int num_shards, const rgw_bucket& bucket,
+                               const RGWBucketInfo& source_bucket_info,
+                               const RGWBucketInfo& dest_bucket_info,
                                Vector *status)
     : RGWShardCollectCR(sc->cct, max_concurrent_shards),
-      store(store), sc(sc), env(sc->env), num_shards(num_shards),
-      bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
+      store(store), sc(sc), env(sc->env),
+      source_bucket_info(source_bucket_info),
+      dest_bucket_info(dest_bucket_info),
       i(status->begin()), end(status->end())
-  {}
+  {
+    shard_to_shard_sync = (source_bucket_info.num_shards == dest_bucket_info.num_shards);
+
+    source_bs = rgw_bucket_shard(source_bucket_info.bucket, source_bucket_info.num_shards > 0 ? 0 : -1);
+    dest_bs = rgw_bucket_shard(dest_bucket_info.bucket, dest_bucket_info.num_shards > 0 ? 0 : -1);
+
+    status->clear();
+    status->resize(std::max<size_t>(1, source_bucket_info.num_shards));
+
+    i = status->begin();
+    end = status->end();
+  }
 
   bool spawn_next() override {
     if (i == end) {
       return false;
     }
-    sync_pair.source_bs = bs;
+    sync_pair.source_bs = source_bs;
+    sync_pair.dest_bs = dest_bs;
     spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i), false);
     ++i;
-    ++bs.shard_id;
+    ++source_bs.shard_id;
+    if (shard_to_shard_sync) {
+      dest_bs.shard_id = source_bs.shard_id;
+    }
     return true;
   }
 };
 
-int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, const std::string& source_zone,
-                           const RGWBucketInfo& bucket_info,
+int rgw_bucket_sync_status(const DoutPrefixProvider *dpp,
+                           rgw::sal::RGWRadosStore *store,
+                           const rgw_sync_bucket_pipe& pipe,
+                           const RGWBucketInfo& dest_bucket_info,
                            std::vector<rgw_bucket_shard_sync_info> *status)
 {
-  const auto num_shards = bucket_info.num_shards;
-  status->clear();
-  status->resize(std::max<size_t>(1, num_shards));
+  if (!pipe.source.zone ||
+      !pipe.source.bucket ||
+      !pipe.dest.zone ||
+      !pipe.dest.bucket) {
+    return -EINVAL;
+  }
+
+  if (*pipe.dest.bucket !=
+      dest_bucket_info.bucket) {
+    return -EINVAL;
+  }
+
+  const rgw_bucket& source_bucket = *pipe.source.bucket;
+
+  RGWBucketInfo source_bucket_info;
+
+  auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
+
+  int ret = store->getRados()->get_bucket_instance_info(obj_ctx, source_bucket, source_bucket_info, nullptr, nullptr, null_yield);
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: failed to get bucket instance info: bucket=" << source_bucket << ": " << cpp_strerror(-ret) << dendl;
+    return ret;
+  }
 
   RGWDataSyncEnv env;
   RGWSyncModuleInstanceRef module; // null sync module
@@ -3959,9 +4002,11 @@ int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStor
            nullptr, nullptr, nullptr, module, nullptr);
 
   RGWDataSyncCtx sc;
-  sc.init(&env, nullptr, source_zone);
+  sc.init(&env, nullptr, *pipe.source.zone);
 
   RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
-  return crs.run(new RGWCollectBucketSyncStatusCR(store, &sc, num_shards,
-                                                  bucket_info.bucket, status));
+  return crs.run(new RGWCollectBucketSyncStatusCR(store, &sc,
+                                                  source_bucket_info,
+                                                  dest_bucket_info,
+                                                  status));
 }
index 416e35dd9f3026f9168ac6607aea7bdbeff27204..2adbf1227b8f12743eb1e28a7a0046a3e47f84d5 100644 (file)
@@ -17,6 +17,7 @@
 #include "rgw_sync_trace.h"
 
 class JSONObj;
+struct rgw_sync_bucket_pipe;
 
 struct rgw_bucket_sync_pair_info {
   rgw_bucket_shard source_bs;
@@ -639,8 +640,10 @@ public:
 };
 
 /// read the sync status of all bucket shards from the given source zone
-int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, const std::string& source_zone,
-                           const RGWBucketInfo& bucket_info,
+int rgw_bucket_sync_status(const DoutPrefixProvider *dpp,
+                           rgw::sal::RGWRadosStore *store,
+                           const rgw_sync_bucket_pipe& pipe,
+                           const RGWBucketInfo& dest_bucket_info,
                            std::vector<rgw_bucket_shard_sync_info> *status);
 
 class RGWDefaultSyncModule : public RGWSyncModule {
index f74663876b8fd9c9e9e1a0cecd35e26c5e32234b..82256e32ec581befeffe5e9389cb6ca425a5b9c7 100644 (file)
@@ -857,7 +857,12 @@ void RGWOp_BILog_Status::execute()
     ldpp_dout(s, 4) << "failed to read bucket info: " << cpp_strerror(http_ret) << dendl;
     return;
   }
-  http_ret = rgw_bucket_sync_status(this, store, source_zone, info, &status);
+  rgw_sync_bucket_pipe pipe;
+  pipe.source.zone = source_zone;
+  pipe.source.bucket = info.bucket;
+  pipe.dest.zone = store->svc()->zone->zone_id();
+  pipe.dest.bucket = info.bucket;
+  http_ret = rgw_bucket_sync_status(this, store, pipe, info, &status);
 }
 
 void RGWOp_BILog_Status::send_response()
index 15fea6b996894385343a4dcf80a83dab16bf8556..5af307c9caffd2cdae0cdc0cf20536aec4aada5c 100644 (file)
@@ -157,8 +157,33 @@ int RGWSI_Zone::do_start()
                                                nullopt,
                                                nullptr);
 
-  sync_flow_mgr->init(zonegroup->sync_policy);
+  rgw_sync_policy_info sync_policy = zonegroup->sync_policy;
 
+  if (sync_policy.empty()) {
+    RGWSyncPolicyCompat::convert_old_sync_config(this, sync_modules_svc, &sync_policy);
+  }
+
+  sync_flow_mgr->init(sync_policy);
+
+  RGWBucketSyncFlowManager::pipe_set zone_sources;
+  RGWBucketSyncFlowManager::pipe_set zone_targets;
+
+  sync_flow_mgr->reflect(nullopt, &zone_sources, &zone_targets);
+
+  set<string> source_zones_by_name;
+  set<string> target_zones_by_name;
+
+  for (auto& pipe : zone_sources.pipes) {
+    if (pipe.source.zone) {
+      source_zones_by_name.insert(*pipe.source.zone);
+    }
+  }
+
+  for (auto& pipe : zone_targets.pipes) {
+    if (pipe.dest.zone) {
+      target_zones_by_name.insert(*pipe.dest.zone);
+    }
+  }
 
   ret = sync_modules_svc->start();
   if (ret < 0) {
@@ -198,12 +223,15 @@ int RGWSI_Zone::do_start()
     ldout(cct, 20) << "generating connection object for zone " << z.name << " id " << z.id << dendl;
     RGWRESTConn *conn = new RGWRESTConn(cct, this, z.id, z.endpoints);
     zone_conn_map[id] = conn;
-    if (zone_syncs_from(*zone_public_config, z) ||
-        zone_syncs_from(z, *zone_public_config)) {
-      if (zone_syncs_from(*zone_public_config, z)) {
+
+    bool zone_is_source = source_zones_by_name.find(z.name) != source_zones_by_name.end();
+    bool zone_is_target = target_zones_by_name.find(z.name) != target_zones_by_name.end();
+
+    if (zone_is_source || zone_is_target) {
+      if (zone_is_source) {
         data_sync_source_zones.push_back(&z);
       }
-      if (zone_syncs_from(z, *zone_public_config)) {
+      if (zone_is_target) {
         zone_data_notify_to_map[id] = conn;
       }
     } else {