]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: data sync: more work towards new policy integration
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 30 Oct 2019 21:51:12 +0000 (14:51 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:37 +0000 (10:20 -0800)
RGWGetBucketPeersCR will be able to return correct buckets
that need to sync, either by a target bucket, and/or a source
zone and a source bucket (partially implemented).

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h

index c62f4cf2edb476a3ecbf870db024312e9eb58f84..2368ffb097d80dcc47b8f8e319c3c756d710d46b 100644 (file)
@@ -113,6 +113,20 @@ public:
   struct pipe_set {
     std::set<rgw_sync_bucket_pipe> pipes;
 
+    using iterator = std::set<rgw_sync_bucket_pipe>::iterator;
+
+    void insert(const rgw_sync_bucket_pipe& pipe) {
+      pipes.insert(pipe);
+    }
+
+    iterator begin() {
+      return pipes.begin();
+    }
+
+    iterator end() {
+      return pipes.end();
+    }
+
     void dump(ceph::Formatter *f) const;
   };
 
index f86d08582c7efed5e65963ddaff81d44190b825b..2e0e3ad5f2a9740ba2e71103897c865ae230a2ef 100644 (file)
@@ -48,6 +48,7 @@ static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
 static string bucket_status_oid_prefix = "bucket.sync-status";
 static string object_status_oid_prefix = "bucket.sync-status";
 static string bucket_sync_sources_oid_prefix = "bucket.sync-sources";
+static string bucket_sync_targets_oid_prefix = "bucket.sync-targets";
 
 
 void rgw_datalog_info::decode_json(JSONObj *obj) {
@@ -67,29 +68,6 @@ void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("entries", entries, obj);
 };
 
-#warning FIXME
-#if 0
-void rgw_sync_flow_rule::get_zone_peers(const string& zone_id,
-                                        std::set<string> *sources,
-                                        std::set<string> *targets) const
-{
-  sources->clear();
-  targets->clear();
-
-  if (directional) {
-    if (directional->target_zone == zone_id) {
-      sources->insert(directional->source_zone);
-    } else if (directional->source_zone == zone_id) {
-      targets->insert(directional->target_zone);
-    }
-  } else if (symmetrical &&
-             symmetrical->find(zone_id) != symmetrical->end()) {
-    *sources = *symmetrical;
-    sources->erase(zone_id);
-    *targets = *sources;
-  }
-}
-#endif
 
 class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
   static constexpr int MAX_CONCURRENT_SHARDS = 16;
@@ -1071,6 +1049,26 @@ public:
   int operate() override;
 };
 
+class RGWRunBucketsSyncBySourceCR : public RGWCoroutine {
+  RGWDataSyncCtx *sc;
+  RGWDataSyncEnv *sync_env;
+
+  rgw_bucket_shard source;
+
+  RGWSyncTraceNodeRef tn;
+
+public:
+  RGWRunBucketsSyncBySourceCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& _source, const RGWSyncTraceNodeRef& _tn_parent)
+    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source(_source),
+      tn(sync_env->sync_tracer->add_node(_tn_parent, "source",
+                                         SSTR(bucket_shard_str{_source} ))) {
+  }
+  ~RGWRunBucketsSyncBySourceCR() override {
+  }
+
+  int operate() override;
+};
+
 class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
@@ -1124,7 +1122,7 @@ public:
           sync_pair.source_bs = bs;
           sync_pair.dest_bs = bs;
 #warning init pipe fields
-          call(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn));
+          call(new RGWRunBucketsSyncBySourceCR(sc, bs, tn));
         }
       } while (marker_tracker && marker_tracker->need_retry(raw_key));
 
@@ -3351,7 +3349,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
   return 0;
 }
 
-class RGWBucketSyncSourcesManager {
+class RGWBucketSyncPeersManager {
 public:
   static string sync_sources_oid(const rgw_bucket bucket) {
     return bucket_sync_sources_oid_prefix + "." + bucket.get_key();
@@ -3360,6 +3358,13 @@ public:
   static rgw_raw_obj sync_sources_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) { 
     return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_sources_oid(bucket));
   }
+  static string sync_targets_oid(const rgw_bucket bucket) {
+    return bucket_sync_targets_oid_prefix + "." + bucket.get_key();
+  }
+
+  static rgw_raw_obj sync_targets_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) { 
+    return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_targets_oid(bucket));
+  }
 };
 
 struct rgw_bucket_sync_source_local_info {
@@ -3416,25 +3421,19 @@ struct rgw_bucket_sync_sources_local_info {
 };
 WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info)
 
-class RGWGetBucketSourcePeersCR : public RGWCoroutine {
+class RGWGetBucketPeersCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
-  rgw_bucket bucket;
 
+  std::optional<rgw_bucket> target_bucket;
   std::optional<string> source_zone;
+  std::optional<rgw_bucket> source_bucket;
 
-  map<string, RGWBucketSyncFlowManager::pipe_set> *sources;
+  RGWBucketSyncFlowManager::pipe_set *sources;
   map<rgw_bucket, RGWBucketInfo> *sources_info;
   map<rgw_bucket, RGWBucketInfo>::iterator siiter;
   RGWBucketInfo *pbucket_info;
 
-  rgw_raw_obj sources_obj;
-
-  bool found_binfo{false};
-  map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter;
-  map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter_end;
-  set<rgw_sync_bucket_pipe>::iterator piter;
-
-  std::optional<rgw_bucket> source_bucket;
+  RGWBucketSyncFlowManager::pipe_set::iterator siter;
 
   rgw_bucket_sync_sources_local_info sources_local_info;
   rgw_bucket_sync_sources_local_info expected_local_info;
@@ -3444,26 +3443,79 @@ class RGWGetBucketSourcePeersCR : public RGWCoroutine {
 
   RGWSyncTraceNodeRef tn;
 
+  using pipe_const_iter = map<string, RGWBucketSyncFlowManager::pipe_set>::const_iterator;
+
+  static pair<pipe_const_iter, pipe_const_iter> get_pipe_iters(const map<string, RGWBucketSyncFlowManager::pipe_set>& m, std::optional<string> zone) {
+    if (!zone) {
+      return { m.begin(), m.end() };
+    }
+
+    auto b = m.find(*zone);
+    if (b == m.end()) {
+      return { b, b };
+    }
+    return { b, std::next(b) };
+  }
+
+  static RGWBucketSyncFlowManager::pipe_set filter_sources(std::optional<string> source_zone,
+                                                           std::optional<rgw_bucket> source_bucket,
+                                                           const map<string, RGWBucketSyncFlowManager::pipe_set>& all_sources) {
+    RGWBucketSyncFlowManager::pipe_set result;
+
+    auto iters = get_pipe_iters(all_sources, source_zone);
+    for (auto i = iters.first; i != iters.second; ++i) {
+      for (auto& peer : i->second.pipes) {
+        if (source_bucket &&
+            peer.source.bucket &&
+            *source_bucket != *peer.source.bucket) {
+          continue;
+        }
+        result.insert(peer);
+      }
+    }
+    return result;
+  }
+
+  static RGWBucketSyncFlowManager::pipe_set filter_targets(std::optional<string> target_zone,
+                                                  std::optional<rgw_bucket> target_bucket,
+                                                  const map<string, RGWBucketSyncFlowManager::pipe_set>& all_targets) {
+    RGWBucketSyncFlowManager::pipe_set result;
+
+    auto iters = get_pipe_iters(all_targets, target_zone);
+    for (auto i = iters.first; i != iters.second; ++i) {
+      for (auto& peer : i->second.pipes) {
+        if (target_bucket &&
+            peer.dest.bucket &&
+            *target_bucket != *peer.dest.bucket) {
+          continue;
+        }
+        result.insert(peer);
+      }
+    }
+    return result;
+  }
+
 public:
-  RGWGetBucketSourcePeersCR(RGWDataSyncEnv *_sync_env,
-                            const rgw_bucket& _bucket,
-                            std::optional<string> _source_zone,
-                            map<string, RGWBucketSyncFlowManager::pipe_set> *_sources,
-                            map<rgw_bucket, RGWBucketInfo> *_sources_info,
-                            RGWBucketInfo *_pbucket_info,
-                            const RGWSyncTraceNodeRef& _tn_parent)
+  RGWGetBucketPeersCR(RGWDataSyncEnv *_sync_env,
+                      std::optional<rgw_bucket> _target_bucket,
+                      std::optional<string> _source_zone,
+                      std::optional<rgw_bucket> _source_bucket,
+                      RGWBucketSyncFlowManager::pipe_set *_sources,
+                      map<rgw_bucket, RGWBucketInfo> *_sources_info,
+                      RGWBucketInfo *_pbucket_info,
+                      const RGWSyncTraceNodeRef& _tn_parent)
     : RGWCoroutine(_sync_env->cct),
       sync_env(_sync_env),
-      bucket(_bucket),
+      target_bucket(_target_bucket),
       source_zone(_source_zone),
       sources(_sources),
       sources_info(_sources_info),
       pbucket_info(_pbucket_info),
-      sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket)),
       policy(make_shared<rgw_bucket_get_sync_policy_result>()),
-      tn(sync_env->sync_tracer->add_node(_tn_parent, "read_bucket_sources",
-                                         SSTR(bucket))) {
-  }
+      tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_peers",
+                                         SSTR( "target=" << target_bucket.value_or(rgw_bucket())
+                                               << ":source=" << target_bucket.value_or(rgw_bucket())
+                                               << ":source_zone=" << source_zone.value_or("*")))) {}
 
   int operate() override;
 };
@@ -3477,10 +3529,9 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
 
   rgw_raw_obj sources_obj;
 
-  map<string, RGWBucketSyncFlowManager::pipe_set> sources;
+  RGWBucketSyncFlowManager::pipe_set sources;
   map<rgw_bucket, RGWBucketInfo> sources_info;
-  map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter;
-  set<rgw_sync_bucket_pipe>::iterator piter;
+  RGWBucketSyncFlowManager::pipe_set::iterator siter;
 
   rgw_bucket_sync_pair_info sync_pair;
 
@@ -3491,6 +3542,9 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
   std::vector<RGWDataSyncCtx> scs;
   RGWDataSyncCtx *cur_sc{nullptr};
 
+  RGWRESTConn *conn{nullptr};
+  string last_zone;
+
   int ret{0};
 
 public:
@@ -3502,7 +3556,7 @@ public:
       sync_env(_sync_env),
       bucket(_bucket),
       source_zone(_source_zone),
-      sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(_sync_env->svc->zone, bucket)),
+      sources_obj(RGWBucketSyncPeersManager::sync_sources_obj(_sync_env->svc->zone, bucket)),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
                                          SSTR(bucket))) {
   }
@@ -3539,7 +3593,7 @@ int RGWRunBucketSourcesSyncCR::operate()
     }
 
     tn->log(10, "took lease");
-    yield call(new RGWGetBucketSourcePeersCR(sync_env, bucket, source_zone, &sources, &sources_info, &bucket_info, tn));
+    yield call(new RGWGetBucketPeersCR(sync_env, bucket, source_zone, std::nullopt, &sources, &sources_info, &bucket_info, tn));
     if (retcode < 0 && retcode != -ENOENT) {
       tn->log(0, "ERROR: failed to read sync status for bucket");
       lease_cr->go_down();
@@ -3550,32 +3604,39 @@ int RGWRunBucketSourcesSyncCR::operate()
     for (siter = sources.begin(); siter != sources.end(); ++siter) {
       scs.emplace_back();
       cur_sc = &scs.back();
+      if (!siter->source.zone ||
+          !siter->source.bucket) {
+        continue;
+      }
+
       {
-        auto& szone = siter->first;
-        auto conn = sync_env->svc->zone->get_zone_conn_by_id(szone);
-        if (!conn) {
-          ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << szone << " does not exist" << dendl;
-          continue;
+        auto& szone = *siter->source.zone;
+        if (last_zone != szone) {
+          conn = sync_env->svc->zone->get_zone_conn_by_id(szone);
+          if (!conn) {
+            ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << szone << " does not exist" << dendl;
+            continue;
+          }
+          last_zone = szone;
         }
-        cur_sc->init(sync_env, conn, siter->first);
+        cur_sc->init(sync_env, conn, szone);
       }
-      for (piter = siter->second.pipes.begin(); piter != siter->second.pipes.end(); ++piter) {
-        sync_pair.source_bs.bucket = *piter->source.bucket;
-        sync_pair.dest_bs.bucket = bucket_info.bucket;
 
-        yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false);
-        while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
-          set_status() << "num_spawned() > spawn_window";
-          yield wait_for_child();
-          bool again = true;
-          while (again) {
-            again = collect(&ret, nullptr);
-            if (ret < 0) {
-              tn->log(10, "a sync operation returned error");
-              /* we have reported this error */
-            }
-            /* not waiting for child here */
+      sync_pair.source_bs.bucket = *siter->source.bucket;
+      sync_pair.dest_bs.bucket = bucket_info.bucket;
+
+      yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false);
+      while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
+        set_status() << "num_spawned() > spawn_window";
+        yield wait_for_child();
+        bool again = true;
+        while (again) {
+          again = collect(&ret, nullptr);
+          if (ret < 0) {
+            tn->log(10, "a sync operation returned error");
+            /* we have reported this error */
           }
+          /* not waiting for child here */
         }
       }
     }
@@ -3649,19 +3710,21 @@ int RGWSyncGetBucketInfoCR::operate()
   return 0;
 }
 
-int RGWGetBucketSourcePeersCR::operate()
+int RGWGetBucketPeersCR::operate()
 {
   reenter(this) {
-    yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
-                                                                            sync_env->svc->sysobj,
-                                                                            RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket),
-                                                                            &sources_local_info));
-    if (retcode < 0 &&
-        retcode != -ENOENT) {
-      return set_cr_error(retcode);
+    if (target_bucket) {
+      yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
+                                                                              sync_env->svc->sysobj,
+                                                                              RGWBucketSyncPeersManager::sync_sources_obj(sync_env->svc->zone, *target_bucket),
+                                                                              &sources_local_info));
+      if (retcode < 0 &&
+          retcode != -ENOENT) {
+        return set_cr_error(retcode);
+      }
     }
 
-    get_policy_params.bucket = bucket;
+    get_policy_params.bucket = *target_bucket;
     yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
                                                    sync_env->store,
                                                    get_policy_params,
@@ -3675,47 +3738,36 @@ int RGWGetBucketSourcePeersCR::operate()
     {
       auto& handler = policy->policy_handler;
 
-      *sources = handler->get_sources();
+      if (sources) {
+        *sources = filter_sources(source_zone,
+                                  source_bucket,
+                                  handler->get_sources());
+      }
+
       auto& binfo = handler->get_bucket_info();
       if (binfo) {
         *pbucket_info = *binfo;
-        found_binfo = true;
+        if (sources_info) {
+          (*sources_info)[binfo->bucket] = *binfo;
+        }
       }
     }
 
-    if (sources_info) {
-      if (found_binfo) {
-        (*sources_info)[bucket] = *pbucket_info;
-      }
-
-      siter_end = sources->end();
-      if (source_zone) {
-        siter = sources->find(*source_zone);
-        if (siter != sources->end()) {
-          siter_end = siter;
-          ++siter_end;
+    if (sources && sources_info) {
+      for (siter = sources->begin(); siter != sources->end(); ++siter) {
+        source_bucket = siter->source.bucket;
+        if (!source_bucket) {
+          continue;
         }
-      } else {
-        siter = sources->begin();
-      }
-      for (; siter != siter_end; ++siter) {
-        for (piter = siter->second.pipes.begin();
-             piter != siter->second.pipes.end();
-             ++piter) {
-          source_bucket = piter->source.bucket;
-          if (!source_bucket) {
-            continue;
-          }
-          if (sources_info->find(*source_bucket) != sources_info->end()) {
-            continue;
-          }
-
-          (*sources_info)[*source_bucket] = RGWBucketInfo(); /* reserve space for it, will fetch it later when map cannot change */
+        if (sources_info->find(*source_bucket) != sources_info->end()) {
+          continue;
         }
+
+        sources_info->emplace(*source_bucket, RGWBucketInfo()); /* reserve space for it, will fetch it later when map cannot change */
       }
 
       for (siiter = sources_info->begin(); siiter != sources_info->end(); ++siiter) {
-        if (siiter->first != bucket) {
+        if (siiter->second.bucket.name.empty()) {
           yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first, &siiter->second, tn));
         }
       }
@@ -3727,6 +3779,15 @@ int RGWGetBucketSourcePeersCR::operate()
   return 0;
 }
 
+int RGWRunBucketsSyncBySourceCR::operate()
+{
+  reenter(this) {
+    return set_cr_done();
+  }
+
+  return 0;
+}
+
 int RGWRunBucketSyncCoroutine::operate()
 {
   reenter(this) {
@@ -3860,55 +3921,59 @@ int RGWBucketPipeSyncStatusManager::init()
                 error_logger, store->getRados()->get_sync_tracer(),
                 sync_module, nullptr);
 
-  map<string, RGWBucketSyncFlowManager::pipe_set> sources;
+  RGWBucketSyncFlowManager::pipe_set sources;
   map<rgw_bucket, RGWBucketInfo> sources_info;
   RGWBucketInfo dest_bucket_info;
 
-  ret = cr_mgr.run(new RGWGetBucketSourcePeersCR(&sync_env,
-                                                 dest_bucket,
-                                                 source_zone,
-                                                 &sources,
-                                                 &sources_info,
-                                                 &dest_bucket_info,
-                                                 sync_env.sync_tracer->root_node));
+  ret = cr_mgr.run(new RGWGetBucketPeersCR(&sync_env,
+                                           dest_bucket,
+                                           source_zone,
+                                           std::nullopt,
+                                           &sources,
+                                           &sources_info,
+                                           &dest_bucket_info,
+                                           sync_env.sync_tracer->root_node));
   if (ret < 0) {
     ldpp_dout(this, 0) << "failed to get bucket source peers info: (ret=" << ret << "): " << cpp_strerror(-ret) << dendl;
     return ret;
   }
 
-  for (auto siter : sources) {
-    if (source_zone && siter.first != *source_zone) {
+  string last_zone;
+
+  for (auto& peer : sources) {
+    if (!peer.source.zone) {
       continue;
     }
 
-    auto& szone = siter.first;
+    auto szone = *peer.source.zone;
 
-    conn = store->svc()->zone->get_zone_conn_by_id(szone);
-    if (!conn) {
-      ldpp_dout(this, 0) << "connection object to zone " << szone << " does not exist" << dendl;
-      return -EINVAL;
+    if (last_zone != szone) {
+      conn = store->svc()->zone->get_zone_conn_by_id(szone);
+      if (!conn) {
+        ldpp_dout(this, 0) << "connection object to zone " << szone << " does not exist" << dendl;
+        return -EINVAL;
+      }
+      last_zone = szone;
     }
 
-    for (auto& pipe : siter.second.pipes) {
-      auto& source_bucket = pipe.source.bucket;
+    auto& source_bucket = peer.source.bucket;
 
-      if (!source_bucket) {
-        continue;
-      }
+    if (!source_bucket) {
+      continue;
+    }
 
-      auto iter = sources_info.find(*source_bucket);
-      if (iter == sources_info.end()) {
-        ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): failed to find bucket info for bucket=" << *source_bucket << ". Likely a bug" << dendl;
-        return -EIO;
+    auto iter = sources_info.find(*source_bucket);
+    if (iter == sources_info.end()) {
+      ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): failed to find bucket info for bucket=" << *source_bucket << ". Likely a bug" << dendl;
+      return -EIO;
     }
 
-      auto& source_bucket_info = iter->second;
+    auto& source_bucket_info = iter->second;
 
-      source_mgrs.push_back(new RGWRemoteBucketManager(this, &sync_env,
-                                                       szone, conn,
-                                                       source_bucket_info,
-                                                       dest_bucket_info.bucket));
-    }
+    source_mgrs.push_back(new RGWRemoteBucketManager(this, &sync_env,
+                                                     szone, conn,
+                                                     source_bucket_info,
+                                                     dest_bucket_info.bucket));
   }
 
   return 0;
index fc1e0602452b1141d45b0fe4899a1a9fb50f373f..483a3b21781582aa0b08b25f53f83d2632d909de 100644 (file)
@@ -633,7 +633,8 @@ public:
   int init_sync_status();
 
   static string status_oid(const string& source_zone, const rgw_bucket_sync_pair_info& bs);
-  static string obj_status_oid(const string& source_zone, const rgw_obj& obj); /* can be used by sync modules */
+  static string obj_status_oid(const string& source_zone, const rgw_obj& obj); /* specific source obj sync status,
+                                                                                  can be used by sync modules */
 
   // implements DoutPrefixProvider
   CephContext *get_cct() const override;