]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: data sync: rework RGWGetBucketPeersCR
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 31 Oct 2019 01:14:54 +0000 (18:14 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:37 +0000 (10:20 -0800)
so that it returns pipes with bucket info

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_sync_policy.h

index 2368ffb097d80dcc47b8f8e319c3c756d710d46b..17cd4289e98e8c54e0f192c826bfea31bc71bc1e 100644 (file)
@@ -115,6 +115,10 @@ public:
 
     using iterator = std::set<rgw_sync_bucket_pipe>::iterator;
 
+    void clear() {
+      pipes.clear();
+    }
+
     void insert(const rgw_sync_bucket_pipe& pipe) {
       pipes.insert(pipe);
     }
@@ -221,6 +225,10 @@ public:
     return sources;
   }
 
+  const  map<string, RGWBucketSyncFlowManager::pipe_set>& get_targets() {
+    return targets;
+  }
+
   const std::optional<RGWBucketInfo>& get_bucket_info() const {
     return bucket_info;
   }
index 2e0e3ad5f2a9740ba2e71103897c865ae230a2ef..e0a902725ebdbddb550c3907e5365a8b18825a0d 100644 (file)
@@ -3351,19 +3351,19 @@ int RGWBucketShardIncrementalSyncCR::operate()
 
 class RGWBucketSyncPeersManager {
 public:
-  static string sync_sources_oid(const rgw_bucket bucket) {
+  static string sync_sources_oid(const rgw_bucket& bucket) {
     return bucket_sync_sources_oid_prefix + "." + bucket.get_key();
   }
 
   static rgw_raw_obj sync_sources_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) { 
     return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_sources_oid(bucket));
   }
-  static string sync_targets_oid(const rgw_bucket bucket) {
-    return bucket_sync_targets_oid_prefix + "." + bucket.get_key();
+  static string sync_targets_oid(const string& source_zone, const rgw_bucket& source_bucket) {
+    return bucket_sync_targets_oid_prefix + "." + source_zone + "." + source_bucket.get_key();
   }
 
-  static rgw_raw_obj sync_targets_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) { 
-    return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_targets_oid(bucket));
+  static rgw_raw_obj sync_targets_obj(RGWSI_Zone *zone_svc, const string& source_zone, const rgw_bucket& source_bucket) { 
+    return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_targets_oid(source_zone, source_bucket));
   }
 };
 
@@ -3421,6 +3421,142 @@ struct rgw_bucket_sync_sources_local_info {
 };
 WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info)
 
+
+struct rgw_sync_pipe_info_entity 
+{
+private:
+  RGWBucketInfo bucket_info;
+  bool _has_bucket_info{false};
+
+public:
+  string zone;
+
+  rgw_sync_pipe_info_entity() {}
+  rgw_sync_pipe_info_entity(const rgw_sync_bucket_entity& e,
+                            std::optional<RGWBucketInfo>& binfo) {
+    if (e.zone) {
+      zone = *e.zone;
+    }
+    if (!e.bucket) {
+      return;
+    }
+    if (!binfo ||
+        binfo->bucket != *e.bucket) {
+      bucket_info.bucket = *e.bucket;
+    }
+    set_bucket_info(*binfo);
+  }
+
+  void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
+    if (_has_bucket_info) {
+      return;
+    }
+    if (bucket_info.bucket.name.empty()) {
+      return;
+    }
+
+    auto iter = buckets_info.find(bucket_info.bucket);
+    if (iter == buckets_info.end()) {
+      return;
+    }
+
+    set_bucket_info(iter->second);
+  }
+
+  bool has_bucket_info() const {
+    return _has_bucket_info;
+  }
+
+  void set_bucket_info(const RGWBucketInfo& _bucket_info) {
+    bucket_info = _bucket_info;
+    _has_bucket_info = true;
+  }
+
+  const RGWBucketInfo& get_bucket_info() const {
+    return bucket_info;
+  }
+
+  const rgw_bucket& get_bucket() const {
+    return bucket_info.bucket;
+  }
+
+  bool operator<(const rgw_sync_pipe_info_entity& e) const {
+    if (zone < e.zone) {
+      return false;
+    }
+    if (zone > e.zone) {
+      return true;
+    }
+    return (bucket_info.bucket < e.bucket_info.bucket);
+  }
+};
+
+struct rgw_sync_pipe_info {
+  rgw_sync_pipe_info_entity source;
+  rgw_sync_pipe_info_entity target;
+
+  rgw_sync_pipe_info() {}
+  rgw_sync_pipe_info(const rgw_sync_bucket_pipe& pipe,
+                     std::optional<RGWBucketInfo> source_bucket_info,
+                     std::optional<RGWBucketInfo> target_bucket_info) : source(pipe.source, source_bucket_info),
+                                                                        target(pipe.dest, target_bucket_info) {}
+
+  bool operator<(const rgw_sync_pipe_info& p) const {
+    if (source < p.source) {
+      return true;
+    }
+    if (p.source < source) {
+      return false;
+    }
+    return (target < p.target);
+  }
+
+  void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
+    source.update_empty_bucket_info(buckets_info);
+    target.update_empty_bucket_info(buckets_info);
+  }
+};
+
+struct rgw_sync_pipe_info_set {
+  std::set<rgw_sync_pipe_info> pipes;
+
+  using iterator = std::set<rgw_sync_pipe_info>::iterator;
+
+  void clear() {
+    pipes.clear();
+  }
+
+  void insert(const rgw_sync_bucket_pipe& pipe,
+              std::optional<RGWBucketInfo>& source_bucket_info,
+              std::optional<RGWBucketInfo>& target_bucket_info) {
+    rgw_sync_pipe_info p(pipe, source_bucket_info, target_bucket_info);
+    pipes.insert(p);
+  }
+
+  iterator begin() {
+    return pipes.begin();
+  }
+
+  iterator end() {
+    return pipes.end();
+  }
+
+  void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
+    if (buckets_info.empty()) {
+      return;
+    }
+
+    std::set<rgw_sync_pipe_info> p;
+
+    for (auto pipe : pipes) {
+      pipe.update_empty_bucket_info(buckets_info);
+      p.insert(pipe);
+    }
+
+    pipes = std::move(p);
+  }
+};
+
 class RGWGetBucketPeersCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
 
@@ -3428,18 +3564,20 @@ class RGWGetBucketPeersCR : public RGWCoroutine {
   std::optional<string> source_zone;
   std::optional<rgw_bucket> source_bucket;
 
-  RGWBucketSyncFlowManager::pipe_set *sources;
-  map<rgw_bucket, RGWBucketInfo> *sources_info;
+  rgw_sync_pipe_info_set *pipes;
+  map<rgw_bucket, RGWBucketInfo> buckets_info;
   map<rgw_bucket, RGWBucketInfo>::iterator siiter;
-  RGWBucketInfo *pbucket_info;
+  std::optional<RGWBucketInfo> target_bucket_info;
+  std::optional<RGWBucketInfo> source_bucket_info;
 
-  RGWBucketSyncFlowManager::pipe_set::iterator siter;
+  rgw_sync_pipe_info_set::iterator siter;
 
   rgw_bucket_sync_sources_local_info sources_local_info;
   rgw_bucket_sync_sources_local_info expected_local_info;
 
   rgw_bucket_get_sync_policy_params get_policy_params;
-  std::shared_ptr<rgw_bucket_get_sync_policy_result> policy;
+  std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
+  std::shared_ptr<rgw_bucket_get_sync_policy_result> target_policy;
 
   RGWSyncTraceNodeRef tn;
 
@@ -3457,81 +3595,86 @@ class RGWGetBucketPeersCR : public RGWCoroutine {
     return { b, std::next(b) };
   }
 
-  static RGWBucketSyncFlowManager::pipe_set filter_sources(std::optional<string> source_zone,
-                                                           std::optional<rgw_bucket> source_bucket,
-                                                           const map<string, RGWBucketSyncFlowManager::pipe_set>& all_sources) {
-    RGWBucketSyncFlowManager::pipe_set result;
-
+  void filter_sources(std::optional<string> source_zone,
+                      std::optional<rgw_bucket> source_bucket,
+                      const map<string, RGWBucketSyncFlowManager::pipe_set>& all_sources,
+                      rgw_sync_pipe_info_set *result) {
     auto iters = get_pipe_iters(all_sources, source_zone);
     for (auto i = iters.first; i != iters.second; ++i) {
-      for (auto& peer : i->second.pipes) {
+      for (auto& pipe : i->second.pipes) {
+        if (!pipe.specific()) {
+          continue;
+        }
         if (source_bucket &&
-            peer.source.bucket &&
-            *source_bucket != *peer.source.bucket) {
+            *source_bucket != *pipe.source.bucket) {
           continue;
         }
-        result.insert(peer);
+        result->insert(pipe, source_bucket_info, target_bucket_info);
       }
     }
-    return result;
   }
 
-  static RGWBucketSyncFlowManager::pipe_set filter_targets(std::optional<string> target_zone,
-                                                  std::optional<rgw_bucket> target_bucket,
-                                                  const map<string, RGWBucketSyncFlowManager::pipe_set>& all_targets) {
-    RGWBucketSyncFlowManager::pipe_set result;
-
+  void filter_targets(std::optional<string> target_zone,
+                      std::optional<rgw_bucket> target_bucket,
+                      const map<string, RGWBucketSyncFlowManager::pipe_set>& all_targets,
+                      rgw_sync_pipe_info_set *result) {
     auto iters = get_pipe_iters(all_targets, target_zone);
     for (auto i = iters.first; i != iters.second; ++i) {
-      for (auto& peer : i->second.pipes) {
+      for (auto& pipe : i->second.pipes) {
         if (target_bucket &&
-            peer.dest.bucket &&
-            *target_bucket != *peer.dest.bucket) {
+            pipe.dest.bucket &&
+            *target_bucket != *pipe.dest.bucket) {
           continue;
         }
-        result.insert(peer);
+        result->insert(pipe, source_bucket_info, target_bucket_info);
       }
     }
-    return result;
   }
 
+  void update_from_target_bucket_policy();
+  void update_from_source_bucket_policy();
+
 public:
   RGWGetBucketPeersCR(RGWDataSyncEnv *_sync_env,
                       std::optional<rgw_bucket> _target_bucket,
                       std::optional<string> _source_zone,
                       std::optional<rgw_bucket> _source_bucket,
-                      RGWBucketSyncFlowManager::pipe_set *_sources,
-                      map<rgw_bucket, RGWBucketInfo> *_sources_info,
-                      RGWBucketInfo *_pbucket_info,
+                      rgw_sync_pipe_info_set *_pipes,
                       const RGWSyncTraceNodeRef& _tn_parent)
     : RGWCoroutine(_sync_env->cct),
       sync_env(_sync_env),
       target_bucket(_target_bucket),
       source_zone(_source_zone),
-      sources(_sources),
-      sources_info(_sources_info),
-      pbucket_info(_pbucket_info),
-      policy(make_shared<rgw_bucket_get_sync_policy_result>()),
+      pipes(_pipes),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_peers",
                                          SSTR( "target=" << target_bucket.value_or(rgw_bucket())
                                                << ":source=" << target_bucket.value_or(rgw_bucket())
-                                               << ":source_zone=" << source_zone.value_or("*")))) {}
+                                               << ":source_zone=" << source_zone.value_or("*")))) {
+        if (target_bucket) {
+          target_policy = make_shared<rgw_bucket_get_sync_policy_result>();
+        }
+        if (source_bucket) {
+          source_policy = make_shared<rgw_bucket_get_sync_policy_result>();
+        }
+      }
 
   int operate() override;
 };
 
 class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
-  rgw_bucket bucket;
-  RGWBucketInfo bucket_info;
 
+  std::optional<rgw_bucket_shard> target_bs;
   std::optional<std::string> source_zone;
+  std::optional<rgw_bucket_shard> source_bs;
+
+  std::optional<rgw_bucket> target_bucket;
+  std::optional<rgw_bucket> source_bucket;
 
   rgw_raw_obj sources_obj;
 
-  RGWBucketSyncFlowManager::pipe_set sources;
-  map<rgw_bucket, RGWBucketInfo> sources_info;
-  RGWBucketSyncFlowManager::pipe_set::iterator siter;
+  rgw_sync_pipe_info_set pipes;
+  rgw_sync_pipe_info_set::iterator siter;
 
   rgw_bucket_sync_pair_info sync_pair;
 
@@ -3549,16 +3692,24 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
 
 public:
   RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env,
-                            const rgw_bucket& _bucket,
-                            optional<string> _source_zone,
+                            std::optional<rgw_bucket_shard> _target_bs,
+                            std::optional<string> _source_zone,
+                            std::optional<rgw_bucket_shard> _source_bs,
                             const RGWSyncTraceNodeRef& _tn_parent)
     : RGWCoroutine(_sync_env->cct),
       sync_env(_sync_env),
-      bucket(_bucket),
+      target_bs(_target_bs),
       source_zone(_source_zone),
-      sources_obj(RGWBucketSyncPeersManager::sync_sources_obj(_sync_env->svc->zone, bucket)),
+      source_bs(_source_bs),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
-                                         SSTR(bucket))) {
+                                         SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << source_zone.value_or("*")))) {
+    if (target_bs) {
+      target_bucket = target_bs->bucket;
+      sources_obj = RGWBucketSyncPeersManager::sync_sources_obj(_sync_env->svc->zone, *target_bucket);
+    }
+    if (source_bs) {
+      source_bucket = source_bs->bucket;
+    }
   }
   ~RGWRunBucketSourcesSyncCR() override {
     if (lease_cr) {
@@ -3593,7 +3744,7 @@ int RGWRunBucketSourcesSyncCR::operate()
     }
 
     tn->log(10, "took lease");
-    yield call(new RGWGetBucketPeersCR(sync_env, bucket, source_zone, std::nullopt, &sources, &sources_info, &bucket_info, tn));
+    yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, source_zone, source_bucket, &pipes, tn));
     if (retcode < 0 && retcode != -ENOENT) {
       tn->log(0, "ERROR: failed to read sync status for bucket");
       lease_cr->go_down();
@@ -3601,16 +3752,12 @@ int RGWRunBucketSourcesSyncCR::operate()
       return set_cr_error(retcode);
     }
 
-    for (siter = sources.begin(); siter != sources.end(); ++siter) {
+    for (siter = pipes.begin(); siter != pipes.end(); ++siter) {
       scs.emplace_back();
       cur_sc = &scs.back();
-      if (!siter->source.zone ||
-          !siter->source.bucket) {
-        continue;
-      }
 
       {
-        auto& szone = *siter->source.zone;
+        auto& szone = siter->source.zone;
         if (last_zone != szone) {
           conn = sync_env->svc->zone->get_zone_conn_by_id(szone);
           if (!conn) {
@@ -3622,8 +3769,9 @@ int RGWRunBucketSourcesSyncCR::operate()
         cur_sc->init(sync_env, conn, szone);
       }
 
-      sync_pair.source_bs.bucket = *siter->source.bucket;
-      sync_pair.dest_bs.bucket = bucket_info.bucket;
+      sync_pair.source_bs.bucket = siter->source.get_bucket();
+      sync_pair.dest_bs.bucket = siter->target.get_bucket();
+#warning TODO iterate over shards
 
       yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false);
       while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
@@ -3710,9 +3858,58 @@ int RGWSyncGetBucketInfoCR::operate()
   return 0;
 }
 
+void RGWGetBucketPeersCR::update_from_target_bucket_policy()
+{
+  auto handler = target_policy->policy_handler.get();
+
+  if (!pipes) {
+    return;
+  }
+
+  filter_sources(source_zone,
+                 source_bucket,
+                 handler->get_sources(),
+                 pipes);
+
+  for (siter = pipes->begin(); siter != pipes->end(); ++siter) {
+    if (!siter->source.has_bucket_info()) {
+      buckets_info.emplace(siter->source.get_bucket(), RGWBucketInfo());
+    }
+    if (!siter->target.has_bucket_info()) {
+      buckets_info.emplace(siter->target.get_bucket(), RGWBucketInfo());
+    }
+  }
+}
+
+void RGWGetBucketPeersCR::update_from_source_bucket_policy()
+{
+  auto handler = source_policy->policy_handler.get();
+
+  if (!pipes) {
+    return;
+  }
+
+  filter_targets(sync_env->svc->zone->get_zone().id,
+                 target_bucket,
+                 handler->get_targets(),
+                 pipes);
+
+  for (siter = pipes->begin(); siter != pipes->end(); ++siter) {
+    if (!siter->source.has_bucket_info()) {
+      buckets_info.emplace(siter->source.get_bucket(), RGWBucketInfo());
+    }
+    if (!siter->target.has_bucket_info()) {
+      buckets_info.emplace(siter->target.get_bucket(), RGWBucketInfo());
+    }
+  }
+}
+
 int RGWGetBucketPeersCR::operate()
 {
   reenter(this) {
+    if (pipes) {
+      pipes->clear();
+    }
     if (target_bucket) {
       yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
                                                                               sync_env->svc->sysobj,
@@ -3722,55 +3919,53 @@ int RGWGetBucketPeersCR::operate()
           retcode != -ENOENT) {
         return set_cr_error(retcode);
       }
-    }
 
-    get_policy_params.bucket = *target_bucket;
-    yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
-                                                   sync_env->store,
-                                                   get_policy_params,
-                                                   policy));
-    if (retcode < 0 &&
-        retcode != -ENOENT) {
-      return set_cr_error(retcode);
+      get_policy_params.bucket = *target_bucket;
+      yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
+                                                     sync_env->store,
+                                                     get_policy_params,
+                                                     target_policy));
+      if (retcode < 0 &&
+          retcode != -ENOENT) {
+        return set_cr_error(retcode);
+      }
     }
-#warning update local copy of sources and act on changes
 
-    {
-      auto& handler = policy->policy_handler;
-
-      if (sources) {
-        *sources = filter_sources(source_zone,
-                                  source_bucket,
-                                  handler->get_sources());
+    if (source_bucket && source_zone) {
+      yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
+                                                                              sync_env->svc->sysobj,
+                                                                              RGWBucketSyncPeersManager::sync_targets_obj(sync_env->svc->zone, *source_zone, *source_bucket),
+                                                                              &sources_local_info));
+      if (retcode < 0 &&
+          retcode != -ENOENT) {
+        return set_cr_error(retcode);
       }
 
-      auto& binfo = handler->get_bucket_info();
-      if (binfo) {
-        *pbucket_info = *binfo;
-        if (sources_info) {
-          (*sources_info)[binfo->bucket] = *binfo;
-        }
+      get_policy_params.bucket = *source_bucket;
+      yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
+                                                     sync_env->store,
+                                                     get_policy_params,
+                                                     source_policy));
+      if (retcode < 0 &&
+          retcode != -ENOENT) {
+        return set_cr_error(retcode);
       }
+
+      source_bucket_info = source_policy->policy_handler->get_bucket_info();
     }
+#warning update local copy of sources and act on changes
 
-    if (sources && sources_info) {
-      for (siter = sources->begin(); siter != sources->end(); ++siter) {
-        source_bucket = siter->source.bucket;
-        if (!source_bucket) {
-          continue;
-        }
-        if (sources_info->find(*source_bucket) != sources_info->end()) {
-          continue;
-        }
+    update_from_target_bucket_policy();
+    update_from_source_bucket_policy();
 
-        sources_info->emplace(*source_bucket, RGWBucketInfo()); /* reserve space for it, will fetch it later when map cannot change */
+    for (siiter = buckets_info.begin(); siiter != buckets_info.end(); ++siiter) {
+      if (siiter->second.bucket.name.empty()) {
+        yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first, &siiter->second, tn));
       }
+    }
 
-      for (siiter = sources_info->begin(); siiter != sources_info->end(); ++siiter) {
-        if (siiter->second.bucket.name.empty()) {
-          yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first, &siiter->second, tn));
-        }
-      }
+    if (pipes) {
+      pipes->update_empty_bucket_info(buckets_info);
     }
 
     return set_cr_done();
@@ -3921,17 +4116,13 @@ int RGWBucketPipeSyncStatusManager::init()
                 error_logger, store->getRados()->get_sync_tracer(),
                 sync_module, nullptr);
 
-  RGWBucketSyncFlowManager::pipe_set sources;
-  map<rgw_bucket, RGWBucketInfo> sources_info;
-  RGWBucketInfo dest_bucket_info;
+  rgw_sync_pipe_info_set pipes;
 
   ret = cr_mgr.run(new RGWGetBucketPeersCR(&sync_env,
                                            dest_bucket,
                                            source_zone,
                                            std::nullopt,
-                                           &sources,
-                                           &sources_info,
-                                           &dest_bucket_info,
+                                           &pipes,
                                            sync_env.sync_tracer->root_node));
   if (ret < 0) {
     ldpp_dout(this, 0) << "failed to get bucket source peers info: (ret=" << ret << "): " << cpp_strerror(-ret) << dendl;
@@ -3940,12 +4131,8 @@ int RGWBucketPipeSyncStatusManager::init()
 
   string last_zone;
 
-  for (auto& peer : sources) {
-    if (!peer.source.zone) {
-      continue;
-    }
-
-    auto szone = *peer.source.zone;
+  for (auto& pipe : pipes) {
+    auto& szone = pipe.source.zone;
 
     if (last_zone != szone) {
       conn = store->svc()->zone->get_zone_conn_by_id(szone);
@@ -3956,24 +4143,10 @@ int RGWBucketPipeSyncStatusManager::init()
       last_zone = szone;
     }
 
-    auto& source_bucket = peer.source.bucket;
-
-    if (!source_bucket) {
-      continue;
-    }
-
-    auto iter = sources_info.find(*source_bucket);
-    if (iter == sources_info.end()) {
-      ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): failed to find bucket info for bucket=" << *source_bucket << ". Likely a bug" << dendl;
-      return -EIO;
-    }
-
-    auto& source_bucket_info = iter->second;
-
     source_mgrs.push_back(new RGWRemoteBucketManager(this, &sync_env,
                                                      szone, conn,
-                                                     source_bucket_info,
-                                                     dest_bucket_info.bucket));
+                                                     pipe.source.get_bucket_info(),
+                                                     pipe.target.get_bucket()));
   }
 
   return 0;
index 57081a62aae8b0d556bf2d929499f7eba7c3740c..b38df78e46f202605e746fa713e05f5dfc129955 100644 (file)
@@ -86,6 +86,10 @@ struct rgw_sync_bucket_entity {
                          std::optional<rgw_bucket> _bucket) : zone(_zone),
                                                               bucket(_bucket.value_or(rgw_bucket())) {}
 
+  bool specific() const {
+    return zone && bucket;
+  }
+
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
     encode(all_zones, bl);
@@ -166,6 +170,10 @@ public:
   rgw_sync_bucket_entity source;
   rgw_sync_bucket_entity dest;
 
+  bool specific() const {
+    return source.specific() && dest.specific();
+  }
+
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
     encode(source, bl);