]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: sync: fixes
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 31 Oct 2019 23:11:57 +0000 (16:11 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:38 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_sync_policy.h

index 04ea6e5869848444f4bda7435c8de88b497ce55d..dce0c03c60053d230db8237775f24c681f4ca3dc 100644 (file)
@@ -1180,6 +1180,10 @@ struct rgw_sync_pipe_info_set {
     return pipes.end();
   }
 
+  bool empty() const {
+    return pipes.empty();
+  }
+
   void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
     if (buckets_info.empty()) {
       return;
@@ -3659,16 +3663,21 @@ class RGWGetBucketPeersCR : public RGWCoroutine {
                       std::optional<rgw_bucket> source_bucket,
                       const map<string, RGWBucketSyncFlowManager::pipe_set>& all_sources,
                       rgw_sync_pipe_info_set *result) {
+    ldpp_dout(sync_env->dpp, 20) << __func__ << ": source_zone=" << source_zone.value_or("*")
+                                << " source_bucket=" << source_bucket.value_or(rgw_bucket())
+                                << " all_sources.size()=" << all_sources.size() << dendl;
     auto iters = get_pipe_iters(all_sources, source_zone);
     for (auto i = iters.first; i != iters.second; ++i) {
       for (auto& pipe : i->second.pipes) {
         if (!pipe.specific()) {
+          ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": skipping" << dendl;
           continue;
         }
         if (source_bucket &&
             *source_bucket != *pipe.source.bucket) {
           continue;
         }
+        ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": adding" << dendl;
         result->insert(pipe, source_bucket_info, target_bucket_info);
       }
     }
@@ -3678,14 +3687,19 @@ class RGWGetBucketPeersCR : public RGWCoroutine {
                       std::optional<rgw_bucket> target_bucket,
                       const map<string, RGWBucketSyncFlowManager::pipe_set>& all_targets,
                       rgw_sync_pipe_info_set *result) {
+    ldpp_dout(sync_env->dpp, 20) << __func__ << ": target_zone=" << source_zone.value_or("*")
+                                << " target_bucket=" << source_bucket.value_or(rgw_bucket())
+                                << " all_targets.size()=" << all_targets.size() << dendl;
     auto iters = get_pipe_iters(all_targets, target_zone);
     for (auto i = iters.first; i != iters.second; ++i) {
       for (auto& pipe : i->second.pipes) {
         if (target_bucket &&
             pipe.dest.bucket &&
             *target_bucket != *pipe.dest.bucket) {
+          ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": skipping" << dendl;
           continue;
         }
+        ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": adding" << dendl;
         result->insert(pipe, source_bucket_info, target_bucket_info);
       }
     }
@@ -3705,6 +3719,7 @@ public:
       sync_env(_sync_env),
       target_bucket(_target_bucket),
       source_zone(_source_zone),
+      source_bucket(_source_bucket),
       pipes(_pipes),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_peers",
                                          SSTR( "target=" << target_bucket.value_or(rgw_bucket())
@@ -3753,6 +3768,7 @@ RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
 int RGWRunBucketSourcesSyncCR::operate()
 {
   reenter(this) {
+#if 0
     yield {
       set_status("acquiring sync lock");
       auto store = sync_env->store;
@@ -3774,15 +3790,22 @@ int RGWRunBucketSourcesSyncCR::operate()
     }
 
     tn->log(10, "took lease");
+#endif
     yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn));
     if (retcode < 0 && retcode != -ENOENT) {
       tn->log(0, "ERROR: failed to read sync status for bucket");
+#if 0
       lease_cr->go_down();
       drain_all();
+#endif
       return set_cr_error(retcode);
     }
     ldpp_dout(sync_env->dpp, 20) << __func__ << "(): requested source_bs=" << source_bs << " target_bs=" << target_bs << dendl;
 
+    if (pipes.empty()) {
+      ldpp_dout(sync_env->dpp, 20) << __func__ << "(): no relevant sync pipes found" << dendl;
+    }
+
     for (siter = pipes.begin(); siter != pipes.end(); ++siter) {
       {
         ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;
@@ -3805,6 +3828,7 @@ int RGWRunBucketSourcesSyncCR::operate()
       }
 
       ldpp_dout(sync_env->dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl;
+
       for (; num_shards > 0; --num_shards, ++cur_shard) {
         sync_pair.source_bs.shard_id = cur_shard;
         if (source_num_shards == target_num_shards) {
@@ -3832,7 +3856,9 @@ int RGWRunBucketSourcesSyncCR::operate()
       }
     }
 
+#if 0
     lease_cr->go_down();
+#endif
     drain_all();
     return set_cr_done();
   }
@@ -3903,12 +3929,13 @@ int RGWSyncGetBucketInfoCR::operate()
 
 void RGWGetBucketPeersCR::update_from_target_bucket_policy()
 {
-  auto handler = target_policy->policy_handler.get();
-
-  if (!pipes) {
+  if (!target_policy ||
+      !pipes) {
     return;
   }
 
+  auto handler = target_policy->policy_handler.get();
+
   filter_sources(source_zone,
                  source_bucket,
                  handler->get_sources(),
@@ -3926,12 +3953,13 @@ void RGWGetBucketPeersCR::update_from_target_bucket_policy()
 
 void RGWGetBucketPeersCR::update_from_source_bucket_policy()
 {
-  auto handler = source_policy->policy_handler.get();
-
-  if (!pipes) {
+  if (!source_policy ||
+      !pipes) {
     return;
   }
 
+  auto handler = source_policy->policy_handler.get();
+
   filter_targets(sync_env->svc->zone->get_zone().id,
                  target_bucket,
                  handler->get_targets(),
index b38df78e46f202605e746fa713e05f5dfc129955..979785deb30f5936b284382e80848eb1de918ec7 100644 (file)
@@ -312,6 +312,11 @@ struct rgw_sync_bucket_pipes {
 };
 WRITE_CLASS_ENCODER(rgw_sync_bucket_pipes)
 
+std::ostream& operator<<(std::ostream& os, const rgw_sync_bucket_entity& e);
+std::ostream& operator<<(std::ostream& os, const rgw_sync_bucket_pipe& pipe);
+std::ostream& operator<<(std::ostream& os, const rgw_sync_bucket_entities& e);
+std::ostream& operator<<(std::ostream& os, const rgw_sync_bucket_pipes& pipe);
+
 /*
  * define data flow between zones. Symmetrical: zones sync from each other.
  * Directional: one zone fetches data from another.