git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: data sync: trigger per shard sync by source and target
author Yehuda Sadeh <yehuda@redhat.com>
Thu, 31 Oct 2019 21:31:29 +0000 (14:31 -0700)
committer Yehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:38 +0000 (10:20 -0800)
and tie to data log sync

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h

index e0a902725ebdbddb550c3907e5365a8b18825a0d..04ea6e5869848444f4bda7435c8de88b497ce55d 100644 (file)
@@ -1049,6 +1049,153 @@ public:
   int operate() override;
 };
 
+// One endpoint (source or target) of a sync pipe: a zone plus bucket info.
+// bucket_info may hold only the bucket key until the full info is supplied.
+struct rgw_sync_pipe_info_entity {
+private:
+  RGWBucketInfo bucket_info;
+  bool _has_bucket_info{false}; // true once set_bucket_info() has been called
+
+public:
+  string zone;
+
+  rgw_sync_pipe_info_entity() {}
+  // Takes zone and bucket from e. binfo is adopted only if it is set and
+  // describes that same bucket; otherwise only the bucket key is recorded.
+  rgw_sync_pipe_info_entity(const rgw_sync_bucket_entity& e,
+                            std::optional<RGWBucketInfo>& binfo) {
+    if (e.zone) {
+      zone = *e.zone;
+    }
+    if (!e.bucket) {
+      return;
+    }
+    if (!binfo ||
+        binfo->bucket != *e.bucket) {
+      // no usable info: keep just the key, never dereference an empty binfo
+      bucket_info.bucket = *e.bucket;
+    } else {
+      set_bucket_info(*binfo);
+    }
+  }
+
+  // Fills in the full bucket info from buckets_info when only the key is known.
+  void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
+    if (_has_bucket_info || bucket_info.bucket.name.empty()) {
+      return;
+    }
+
+    auto iter = buckets_info.find(bucket_info.bucket);
+    if (iter != buckets_info.end()) {
+      set_bucket_info(iter->second);
+    }
+  }
+
+  bool has_bucket_info() const { return _has_bucket_info; }
+
+  void set_bucket_info(const RGWBucketInfo& _bucket_info) {
+    bucket_info = _bucket_info;
+    _has_bucket_info = true;
+  }
+
+  const RGWBucketInfo& get_bucket_info() const {
+    return bucket_info;
+  }
+
+  const rgw_bucket& get_bucket() const {
+    return bucket_info.bucket;
+  }
+
+  // Orders by zone first (zones compare in descending order), then by bucket.
+  bool operator<(const rgw_sync_pipe_info_entity& e) const {
+    if (zone < e.zone) {
+      return false;
+    }
+    if (zone > e.zone) {
+      return true;
+    }
+    return (bucket_info.bucket < e.bucket_info.bucket);
+  }
+};
+
+// Prints the entity as its zone, a ':' separator and the bucket key.
+std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_info_entity& e) {
+  const rgw_bucket& b = e.get_bucket();
+  out << e.zone << ":" << b.get_key();
+  return out;
+}
+
+// A sync pipe whose source and target ends are resolved to zone/bucket entities.
+struct rgw_sync_pipe_info {
+  rgw_sync_pipe_info_entity source;
+  rgw_sync_pipe_info_entity target;
+
+  rgw_sync_pipe_info() {}
+  // source is built from pipe.source, target from pipe.dest; each optional
+  // bucket info is passed on to the corresponding entity constructor.
+  rgw_sync_pipe_info(const rgw_sync_bucket_pipe& pipe,
+                     std::optional<RGWBucketInfo> source_bucket_info,
+                     std::optional<RGWBucketInfo> target_bucket_info)
+    : source(pipe.source, source_bucket_info),
+      target(pipe.dest, target_bucket_info) {}
+
+  // Lexicographic order: sources decide first, targets break the tie.
+  bool operator<(const rgw_sync_pipe_info& p) const {
+    return (source < p.source) || (!(p.source < source) && (target < p.target));
+  }
+
+  // Lets both endpoints fill in missing bucket info from buckets_info.
+  void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
+    source.update_empty_bucket_info(buckets_info);
+    target.update_empty_bucket_info(buckets_info);
+  }
+};
+
+// Prints the pipe as its source entity, a '>' separator and its target entity.
+std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_info& p) {
+  return out << p.source << ">" << p.target;
+}
+
+// Ordered collection of unique sync pipes.
+struct rgw_sync_pipe_info_set {
+  std::set<rgw_sync_pipe_info> pipes;
+
+  using iterator = std::set<rgw_sync_pipe_info>::iterator;
+
+  void clear() { pipes.clear(); }
+
+  // Adds the described pipe unless an equivalent one is already present.
+  void insert(const rgw_sync_bucket_pipe& pipe,
+              std::optional<RGWBucketInfo>& source_bucket_info,
+              std::optional<RGWBucketInfo>& target_bucket_info) {
+    // construct in place instead of building a temporary and copying it
+    pipes.emplace(pipe, source_bucket_info, target_bucket_info);
+  }
+
+  iterator begin() {
+    return pipes.begin();
+  }
+
+  iterator end() {
+    return pipes.end();
+  }
+
+  // Lets every pipe fill in missing bucket info from buckets_info.
+  void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
+    if (buckets_info.empty()) {
+      return;
+    }
+
+    // set elements are const, so updated copies go into a fresh set
+    std::set<rgw_sync_pipe_info> p;
+    for (auto pipe : pipes) {
+      pipe.update_empty_bucket_info(buckets_info);
+      p.insert(std::move(pipe));
+    }
+    pipes = std::move(p);
+  }
+};
+
 class RGWRunBucketsSyncBySourceCR : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
@@ -1069,6 +1216,55 @@ public:
   int operate() override;
 };
 
+// Coroutine that walks the sync pipes obtained for an optional target and/or
+// source bucket shard and spawns one RGWRunBucketSyncCoroutine per shard.
+class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
+  RGWDataSyncCtx *sc;
+  RGWDataSyncEnv *sync_env; // taken from sc->env by the constructor
+  // requested shards; either one may be unset (callers pass std::nullopt)
+  std::optional<rgw_bucket_shard> target_bs;
+  std::optional<rgw_bucket_shard> source_bs;
+  // bucket parts of target_bs/source_bs, assigned in the constructor body
+  std::optional<rgw_bucket> target_bucket;
+  std::optional<rgw_bucket> source_bucket;
+  rgw_raw_obj sources_obj; // set by the constructor only when target_bs is given
+
+  rgw_sync_pipe_info_set pipes; // handed to RGWGetBucketPeersCR by operate()
+  rgw_sync_pipe_info_set::iterator siter; // pipe currently being processed
+
+  rgw_bucket_sync_pair_info sync_pair; // shard pair passed to the spawned sync
+
+  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr; // aborted by the destructor
+  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
+  // NOTE(review): tn's label reads target_bucket/source_bucket while they are still unset
+  RGWSyncTraceNodeRef tn;
+  std::vector<RGWDataSyncCtx> scs;
+  RGWDataSyncCtx *cur_sc{nullptr};
+  RGWRESTConn *conn{nullptr};
+  string last_zone;
+
+  int ret{0}; // written by collect() in operate()
+  // num_shards of the current pipe's source and target bucket info
+  int source_num_shards{0};
+  int target_num_shards{0};
+  // shards left to spawn for the current pipe, and the next shard id to use
+  int num_shards{0};
+  int cur_shard{0};
+
+public:
+  RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
+                            std::optional<rgw_bucket_shard> _target_bs,
+                            std::optional<rgw_bucket_shard> _source_bs,
+                            const RGWSyncTraceNodeRef& _tn_parent);
+  ~RGWRunBucketSourcesSyncCR() override {
+    if (lease_cr) {
+      lease_cr->abort();
+    }
+  }
+
+  int operate() override;
+};
+
 class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
@@ -1076,7 +1272,7 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   string raw_key;
   string entry_marker;
 
-  rgw_bucket_shard bs;
+  rgw_bucket_shard source_bs;
   rgw_bucket_sync_pair_info sync_pair;
 
   int sync_status;
@@ -1109,20 +1305,19 @@ public:
       do {
         yield {
           int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
-                                                &bs.bucket, &bs.shard_id);
+                                                &source_bs.bucket, &source_bs.shard_id);
           if (ret < 0) {
             return set_cr_error(-EIO);
           }
           if (marker_tracker) {
             marker_tracker->reset_need_retry(raw_key);
           }
-          tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs}));
+          tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{source_bs}));
 
-          sync_pair = rgw_bucket_sync_pair_info();
-          sync_pair.source_bs = bs;
-          sync_pair.dest_bs = bs;
-#warning init pipe fields
-          call(new RGWRunBucketsSyncBySourceCR(sc, bs, tn));
+          call(new RGWRunBucketSourcesSyncCR(sc,
+                                             std::nullopt, /* target_bs */
+                                             source_bs,
+                                             tn));
         }
       } while (marker_tracker && marker_tracker->need_retry(raw_key));
 
@@ -3422,141 +3617,6 @@ struct rgw_bucket_sync_sources_local_info {
 WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info)
 
 
-struct rgw_sync_pipe_info_entity 
-{
-private:
-  RGWBucketInfo bucket_info;
-  bool _has_bucket_info{false};
-
-public:
-  string zone;
-
-  rgw_sync_pipe_info_entity() {}
-  rgw_sync_pipe_info_entity(const rgw_sync_bucket_entity& e,
-                            std::optional<RGWBucketInfo>& binfo) {
-    if (e.zone) {
-      zone = *e.zone;
-    }
-    if (!e.bucket) {
-      return;
-    }
-    if (!binfo ||
-        binfo->bucket != *e.bucket) {
-      bucket_info.bucket = *e.bucket;
-    }
-    set_bucket_info(*binfo);
-  }
-
-  void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
-    if (_has_bucket_info) {
-      return;
-    }
-    if (bucket_info.bucket.name.empty()) {
-      return;
-    }
-
-    auto iter = buckets_info.find(bucket_info.bucket);
-    if (iter == buckets_info.end()) {
-      return;
-    }
-
-    set_bucket_info(iter->second);
-  }
-
-  bool has_bucket_info() const {
-    return _has_bucket_info;
-  }
-
-  void set_bucket_info(const RGWBucketInfo& _bucket_info) {
-    bucket_info = _bucket_info;
-    _has_bucket_info = true;
-  }
-
-  const RGWBucketInfo& get_bucket_info() const {
-    return bucket_info;
-  }
-
-  const rgw_bucket& get_bucket() const {
-    return bucket_info.bucket;
-  }
-
-  bool operator<(const rgw_sync_pipe_info_entity& e) const {
-    if (zone < e.zone) {
-      return false;
-    }
-    if (zone > e.zone) {
-      return true;
-    }
-    return (bucket_info.bucket < e.bucket_info.bucket);
-  }
-};
-
-struct rgw_sync_pipe_info {
-  rgw_sync_pipe_info_entity source;
-  rgw_sync_pipe_info_entity target;
-
-  rgw_sync_pipe_info() {}
-  rgw_sync_pipe_info(const rgw_sync_bucket_pipe& pipe,
-                     std::optional<RGWBucketInfo> source_bucket_info,
-                     std::optional<RGWBucketInfo> target_bucket_info) : source(pipe.source, source_bucket_info),
-                                                                        target(pipe.dest, target_bucket_info) {}
-
-  bool operator<(const rgw_sync_pipe_info& p) const {
-    if (source < p.source) {
-      return true;
-    }
-    if (p.source < source) {
-      return false;
-    }
-    return (target < p.target);
-  }
-
-  void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
-    source.update_empty_bucket_info(buckets_info);
-    target.update_empty_bucket_info(buckets_info);
-  }
-};
-
-struct rgw_sync_pipe_info_set {
-  std::set<rgw_sync_pipe_info> pipes;
-
-  using iterator = std::set<rgw_sync_pipe_info>::iterator;
-
-  void clear() {
-    pipes.clear();
-  }
-
-  void insert(const rgw_sync_bucket_pipe& pipe,
-              std::optional<RGWBucketInfo>& source_bucket_info,
-              std::optional<RGWBucketInfo>& target_bucket_info) {
-    rgw_sync_pipe_info p(pipe, source_bucket_info, target_bucket_info);
-    pipes.insert(p);
-  }
-
-  iterator begin() {
-    return pipes.begin();
-  }
-
-  iterator end() {
-    return pipes.end();
-  }
-
-  void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
-    if (buckets_info.empty()) {
-      return;
-    }
-
-    std::set<rgw_sync_pipe_info> p;
-
-    for (auto pipe : pipes) {
-      pipe.update_empty_bucket_info(buckets_info);
-      p.insert(pipe);
-    }
-
-    pipes = std::move(p);
-  }
-};
-
 class RGWGetBucketPeersCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
 
@@ -3661,64 +3721,34 @@ public:
   int operate() override;
 };
 
-class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
-  RGWDataSyncEnv *sync_env;
-
-  std::optional<rgw_bucket_shard> target_bs;
-  std::optional<std::string> source_zone;
-  std::optional<rgw_bucket_shard> source_bs;
-
-  std::optional<rgw_bucket> target_bucket;
-  std::optional<rgw_bucket> source_bucket;
-
-  rgw_raw_obj sources_obj;
-
-  rgw_sync_pipe_info_set pipes;
-  rgw_sync_pipe_info_set::iterator siter;
-
-  rgw_bucket_sync_pair_info sync_pair;
-
-  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
-  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
-
-  RGWSyncTraceNodeRef tn;
-  std::vector<RGWDataSyncCtx> scs;
-  RGWDataSyncCtx *cur_sc{nullptr};
-
-  RGWRESTConn *conn{nullptr};
-  string last_zone;
-
-  int ret{0};
+// Prints the contained shard, or "*" when the optional is empty.
+std::ostream& operator<<(std::ostream& out, std::optional<rgw_bucket_shard>& bs) {
+  if (bs.has_value()) {
+    out << bs.value();
+    return out;
+  }
+  return out << "*";
+}
 
-public:
-  RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env,
-                            std::optional<rgw_bucket_shard> _target_bs,
-                            std::optional<string> _source_zone,
-                            std::optional<rgw_bucket_shard> _source_bs,
-                            const RGWSyncTraceNodeRef& _tn_parent)
-    : RGWCoroutine(_sync_env->cct),
-      sync_env(_sync_env),
+RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
+                                                     std::optional<rgw_bucket_shard> _target_bs,
+                                                     std::optional<rgw_bucket_shard> _source_bs,
+                                                     const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->env->cct),
+      sc(_sc),
+      sync_env(_sc->env),
       target_bs(_target_bs),
-      source_zone(_source_zone),
       source_bs(_source_bs),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
-                                         SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << source_zone.value_or("*")))) {
-    if (target_bs) {
-      target_bucket = target_bs->bucket;
-      sources_obj = RGWBucketSyncPeersManager::sync_sources_obj(_sync_env->svc->zone, *target_bucket);
-    }
-    if (source_bs) {
-      source_bucket = source_bs->bucket;
-    }
+                                         SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone)))
+{
+  if (target_bs) {
+    target_bucket = target_bs->bucket;
+    sources_obj = RGWBucketSyncPeersManager::sync_sources_obj(sync_env->svc->zone, *target_bucket);
   }
-  ~RGWRunBucketSourcesSyncCR() override {
-    if (lease_cr) {
-      lease_cr->abort();
-    }
+  if (source_bs) {
+    source_bucket = source_bs->bucket;
   }
-
-  int operate() override;
-};
+}
 
 int RGWRunBucketSourcesSyncCR::operate()
 {
@@ -3744,47 +3774,60 @@ int RGWRunBucketSourcesSyncCR::operate()
     }
 
     tn->log(10, "took lease");
-    yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, source_zone, source_bucket, &pipes, tn));
+    yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn));
     if (retcode < 0 && retcode != -ENOENT) {
       tn->log(0, "ERROR: failed to read sync status for bucket");
       lease_cr->go_down();
       drain_all();
       return set_cr_error(retcode);
     }
+    ldpp_dout(sync_env->dpp, 20) << __func__ << "(): requested source_bs=" << source_bs << " target_bs=" << target_bs << dendl;
 
     for (siter = pipes.begin(); siter != pipes.end(); ++siter) {
-      scs.emplace_back();
-      cur_sc = &scs.back();
-
       {
-        auto& szone = siter->source.zone;
-        if (last_zone != szone) {
-          conn = sync_env->svc->zone->get_zone_conn_by_id(szone);
-          if (!conn) {
-            ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << szone << " does not exist" << dendl;
-            continue;
-          }
-          last_zone = szone;
+        ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;
+
+        source_num_shards = siter->source.get_bucket_info().num_shards;
+        target_num_shards = siter->target.get_bucket_info().num_shards;
+        if (source_bs) {
+          sync_pair.source_bs = *source_bs;
+        } else {
+          sync_pair.source_bs.bucket = siter->source.get_bucket();
+        }
+
+        if (sync_pair.source_bs.shard_id >= 0) {
+          num_shards = 1;
+          cur_shard = sync_pair.source_bs.shard_id;
+        } else {
+          num_shards = std::max<int>(1, source_num_shards);
+          cur_shard = std::min<int>(0, source_num_shards);
         }
-        cur_sc->init(sync_env, conn, szone);
       }
 
-      sync_pair.source_bs.bucket = siter->source.get_bucket();
-      sync_pair.dest_bs.bucket = siter->target.get_bucket();
-#warning TODO iterate over shards
-
-      yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false);
-      while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
-        set_status() << "num_spawned() > spawn_window";
-        yield wait_for_child();
-        bool again = true;
-        while (again) {
-          again = collect(&ret, nullptr);
-          if (ret < 0) {
-            tn->log(10, "a sync operation returned error");
-            /* we have reported this error */
+      ldpp_dout(sync_env->dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl;
+      for (; num_shards > 0; --num_shards, ++cur_shard) {
+        sync_pair.source_bs.shard_id = cur_shard;
+        if (source_num_shards == target_num_shards) {
+          sync_pair.dest_bs.shard_id = cur_shard;
+        } else {
+          sync_pair.dest_bs.shard_id = -1;
+        }
+
+        ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
+
+        yield spawn(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn), false);
+        while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
+          set_status() << "num_spawned() > spawn_window";
+          yield wait_for_child();
+          bool again = true;
+          while (again) {
+            again = collect(&ret, nullptr);
+            if (ret < 0) {
+              tn->log(10, "a sync operation returned error");
+              /* we have reported this error */
+            }
+            /* not waiting for child here */
           }
-          /* not waiting for child here */
         }
       }
     }
index 483a3b21781582aa0b08b25f53f83d2632d909de..9d223ebb2e13ab315896f376a9d59008eb855521 100644 (file)
@@ -38,7 +38,7 @@ inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) {
     out << "/" << p.source_prefix;
   }
 
-  out << " -> " << p.dest_bs.bucket;
+  out << "->" << p.dest_bs.bucket;
 
   if (!p.dest_prefix.empty()) {
     out << "/" << p.dest_prefix;