]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: define bucket sync pipes
authorYehuda Sadeh <yehuda@redhat.com>
Sat, 24 Aug 2019 00:15:43 +0000 (17:15 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:36 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h

index 72e7dedb69b546e8369624fdeb6adfbe1c706376..3858c95991b71d86bf2156d7a53d960453088ea5 100644 (file)
@@ -27,6 +27,7 @@ class RGWBucketSyncPolicyHandler {
 
   std::set<string> source_zones;
 
+public:
   struct peer_info {
     std::string type;
     rgw_bucket bucket;
@@ -38,10 +39,15 @@ class RGWBucketSyncPolicyHandler {
       }
       return (type < si.type);
     }
+
+    bool is_rgw() const {
+      return (type.empty() || type == "rgw");
+    }
   };
 
-  std::map<string, std::set<peer_info> > sources;
-  std::map<string, std::set<peer_info> > targets;
+private:
+  std::map<string, std::set<peer_info> > sources; /* peers by zone */
+  std::map<string, std::set<peer_info> > targets; /* peers by zone */
 
 public:
   RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
@@ -49,6 +55,10 @@ public:
                                                             bucket_info(_bucket_info) {}
   int init();
 
+  std::map<string, std::set<peer_info> >& get_sources() {
+    return sources;
+  }
+
   const RGWBucketInfo& get_bucket_info() const {
     return bucket_info;
   }
index 9fb9cf1ef73770d1117395b0a0eb2f41b0e273a5..43687023c512b6abc3f0c995cfd8238cd7643c20 100644 (file)
@@ -21,6 +21,7 @@
 #include "rgw_cr_tools.h"
 #include "rgw_http_client.h"
 #include "rgw_bucket.h"
+#include "rgw_bucket_sync.h"
 #include "rgw_metadata.h"
 #include "rgw_sync_counters.h"
 #include "rgw_sync_module.h"
@@ -1039,6 +1040,7 @@ std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
 class RGWRunBucketSyncCoroutine : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
+  rgw_bucket_sync_pair_info sync_pair;
   rgw_bucket_sync_pipe sync_pipe;
   rgw_bucket_shard_sync_info sync_status;
   RGWMetaSyncEnv meta_sync_env;
@@ -1051,12 +1053,11 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine {
   RGWSyncTraceNodeRef tn;
 
 public:
-  RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent)
-    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
-      status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pipe)),
+  RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& _sync_pair, const RGWSyncTraceNodeRef& _tn_parent)
+    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pair(_sync_pair),
+      status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
-                                         SSTR(bucket_shard_str{bs}))) {
-    sync_pipe.source_bs = bs;
+                                         SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
   }
   ~RGWRunBucketSyncCoroutine() override {
     if (lease_cr) {
@@ -1075,6 +1076,7 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   string entry_marker;
 
   rgw_bucket_shard bs;
+  rgw_bucket_sync_pair_info sync_pair;
 
   int sync_status;
 
@@ -1114,7 +1116,12 @@ public:
             marker_tracker->reset_need_retry(raw_key);
           }
           tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs}));
-          call(new RGWRunBucketSyncCoroutine(sc, bs, tn));
+
+          sync_pair = rgw_bucket_sync_pair_info();
+          sync_pair.source_bs = bs;
+          sync_pair.dest_bs = bs;
+#warning init pipe fields
+          call(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn));
         }
       } while (marker_tracker && marker_tracker->need_retry(raw_key));
 
@@ -1768,7 +1775,7 @@ int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattabl
 RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
 {
   auto sync_env = sc->env;
-  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.source_bs.bucket,
+  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.info.source_bs.bucket,
                                 std::nullopt, sync_pipe.dest_bucket_info,
                                  key, std::nullopt, versioned_epoch,
                                  true, zones_trace, sync_env->counters, sync_env->dpp);
@@ -1826,7 +1833,7 @@ int RGWArchiveSyncModule::create_instance(CephContext *cct, const JSONFormattabl
 RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
 {
   auto sync_env = sc->env;
-  ldout(sc->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+  ldout(sc->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
   if (!sync_pipe.dest_bucket_info.versioned() ||
      (sync_pipe.dest_bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
       ldout(sc->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
@@ -1849,7 +1856,7 @@ RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_buck
   }
 
   return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
-                                 sync_pipe.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info,
+                                 sync_pipe.info.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info,
                                  key, dest_key, versioned_epoch,
                                  true, zones_trace, nullptr, sync_env->dpp);
 }
@@ -1857,14 +1864,14 @@ RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_buck
 RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
 {
-  ldout(sc->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
+  ldout(sc->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
   return NULL;
 }
 
 RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
 {
-  ldout(sc->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
+  ldout(sc->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
                                    << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
   auto sync_env = sc->env;
   return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
@@ -2111,14 +2118,14 @@ public:
                                         rgw_bucket_shard_sync_info& _status)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       sync_pipe(_sync_pipe),
-      sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pipe)),
+      sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pipe.info)),
       status(_status)
   {}
 
   int operate() override {
     reenter(this) {
       /* fetch current position in logs */
-      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pipe.source_bs, &info));
+      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pipe.info.source_bs, &info));
       if (retcode < 0 && retcode != -ENOENT) {
         ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
         return set_cr_error(retcode);
@@ -2160,6 +2167,7 @@ RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
 #warning FIXME
   rgw_bucket_sync_pipe sync_pipe;
   sync_pipe.source_bs = bs;
+  sync_pipe.dest_bs = bs;
   return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pipe, init_status);
 }
 
@@ -2231,10 +2239,10 @@ class RGWReadBucketPipeSyncStatusCoroutine : public RGWCoroutine {
   map<string, bufferlist> attrs;
 public:
   RGWReadBucketPipeSyncStatusCoroutine(RGWDataSyncCtx *_sc,
-                                   const rgw_bucket_sync_pipe& sync_pipe,
+                                   const rgw_bucket_sync_pair_info& sync_pair,
                                    rgw_bucket_shard_sync_info *_status)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
-      oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pipe)),
+      oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
       status(_status) {}
   int operate() override;
 };
@@ -2435,10 +2443,11 @@ int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_bucke
 
 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
 {
+  rgw_bucket_sync_pair_info sync_pair;
+  sync_pair.source_bs = bs;
 #warning FIXME
-  rgw_bucket_sync_pipe sync_pipe;
-  sync_pipe.source_bs = bs;
-  return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pipe, sync_status);
+  sync_pair.dest_bs = bs;
+  return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pair, sync_status);
 }
 
 RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store, const string& _source_zone,
@@ -2802,7 +2811,7 @@ public:
                             const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
                              RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct),
                                                      sc(_sc), sync_env(_sc->env),
-                                                      sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
+                                                      sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
                                                       key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
                                                       owner(_owner),
                                                       timestamp(_timestamp), op(_op),
@@ -2937,7 +2946,7 @@ public:
                            rgw_bucket_shard_sync_info& sync_info,
                            RGWSyncTraceNodeRef tn_parent)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
-      sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
+      sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
       lease_cr(lease_cr), sync_info(sync_info),
       marker_tracker(sc, status_oid, sync_info.full_marker),
       status_oid(status_oid),
@@ -3089,7 +3098,7 @@ public:
                                   rgw_bucket_shard_sync_info& sync_info,
                                   RGWSyncTraceNodeRef& _tn_parent)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
-      sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
+      sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
       lease_cr(lease_cr), sync_info(sync_info),
       marker_tracker(sc, status_oid, sync_info.inc_marker),
       status_oid(status_oid), zone_id(sync_env->svc->zone->get_zone().id),
@@ -3397,11 +3406,12 @@ struct rgw_bucket_sync_sources_local_info {
 };
 WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info)
 
-class RGWReadBucketSourcesInfoCR : public RGWCoroutine {
+class RGWGetBucketSourcePeersCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   rgw_bucket bucket;
 
-  RGWBucketInfo bucket_info;
+  map<string, set<RGWBucketSyncPolicyHandler::peer_info> > *sources;
+  RGWBucketInfo *pbucket_info;
 
   rgw_raw_obj sources_obj;
 
@@ -3409,19 +3419,23 @@ class RGWReadBucketSourcesInfoCR : public RGWCoroutine {
   rgw_bucket_sync_sources_local_info expected_local_info;
 
   rgw_bucket_get_sync_policy_params get_policy_params;
-  std::shared_ptr<rgw_bucket_get_sync_policy_result> get_policy_result;
+  std::shared_ptr<rgw_bucket_get_sync_policy_result> policy;
 
   RGWSyncTraceNodeRef tn;
 
 public:
-  RGWReadBucketSourcesInfoCR(RGWDataSyncEnv *_sync_env,
-                             const rgw_bucket& _bucket,
-                             const RGWSyncTraceNodeRef& _tn_parent)
+  RGWGetBucketSourcePeersCR(RGWDataSyncEnv *_sync_env,
+                            const rgw_bucket& _bucket,
+                            map<string, set<RGWBucketSyncPolicyHandler::peer_info> > *_sources,
+                            RGWBucketInfo *_pbucket_info,
+                            const RGWSyncTraceNodeRef& _tn_parent)
     : RGWCoroutine(_sync_env->cct),
       sync_env(_sync_env),
       bucket(_bucket),
+      sources(_sources),
+      pbucket_info(_pbucket_info),
       sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket)),
-      get_policy_result(make_shared<rgw_bucket_get_sync_policy_result>()),
+      policy(make_shared<rgw_bucket_get_sync_policy_result>()),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "read_bucket_sources",
                                          SSTR(bucket))) {
   }
@@ -3429,7 +3443,7 @@ public:
   int operate() override;
 };
 
-int RGWReadBucketSourcesInfoCR::operate()
+int RGWGetBucketSourcePeersCR::operate()
 {
   reenter(this) {
     yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
@@ -3445,11 +3459,19 @@ int RGWReadBucketSourcesInfoCR::operate()
     yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
                                                    sync_env->store,
                                                    get_policy_params,
-                                                   get_policy_result));
+                                                   policy));
     if (retcode < 0 &&
         retcode != -ENOENT) {
       return set_cr_error(retcode);
     }
+#warning update local copy of sources and act on changes
+
+    {
+      auto& handler = policy->policy_handler;
+
+      *sources = handler->get_sources();
+      *pbucket_info = handler->get_bucket_info();
+    }
 
     return set_cr_done();
   }
@@ -3460,14 +3482,24 @@ int RGWReadBucketSourcesInfoCR::operate()
 class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   rgw_bucket bucket;
-  rgw_sync_source source;
+  RGWBucketInfo bucket_info;
 
   rgw_raw_obj sources_obj;
 
+  map<string, set<RGWBucketSyncPolicyHandler::peer_info> > sources;
+  map<string, set<RGWBucketSyncPolicyHandler::peer_info> >::iterator siter;
+  set<RGWBucketSyncPolicyHandler::peer_info>::iterator piter;
+
+  rgw_bucket_sync_pair_info sync_pair;
+
   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
   boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
 
   RGWSyncTraceNodeRef tn;
+  std::vector<RGWDataSyncCtx> scs;
+  RGWDataSyncCtx *cur_sc{nullptr};
+
+  int ret{0};
 
 public:
   RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env,
@@ -3513,7 +3545,7 @@ int RGWRunBucketSourcesSyncCR::operate()
     }
 
     tn->log(10, "took lease");
-    yield call(new RGWReadBucketSourcesInfoCR(sync_env, bucket, tn, &info));
+    yield call(new RGWGetBucketSourcePeersCR(sync_env, bucket, &sources, &bucket_info, tn));
     if (retcode < 0 && retcode != -ENOENT) {
       tn->log(0, "ERROR: failed to read sync status for bucket");
       lease_cr->go_down();
@@ -3521,15 +3553,40 @@ int RGWRunBucketSourcesSyncCR::operate()
       return set_cr_error(retcode);
     }
 
-    if (retcode == -ENOENT) {
-      rgw_bucket_sync_pipe sync_pipe;
-      sync_pipe.init_default(bs);
-      info.pipes.push_back(sync_pipe);
-    }
+    for (siter = sources.begin(); siter != sources.end(); ++siter) {
+      scs.emplace_back();
+      cur_sc = &scs.back();
+      {
+        auto& source_zone = siter->first;
+        auto conn = sync_env->svc->zone->get_zone_conn_by_id(source_zone);
+        if (!conn) {
+          ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << source_zone << " does not exist" << dendl;
+          continue;
+        }
+        cur_sc->init(sync_env, conn, siter->first);
+      }
+      for (piter = siter->second.begin(); piter != siter->second.end(); ++piter) {
+        if (!piter->is_rgw()) {
+          continue;
+        }
 
-    yield {
-      for (auto pipe : info.pipes) {
-        spawn(new RGWRunBucketSyncCoroutine(sc, pipe, &tn));
+        sync_pair.source_bs.bucket = piter->bucket;
+        sync_pair.dest_bs.bucket = bucket_info.bucket;
+
+        yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn));
+        while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
+          set_status() << "num_spawned() > spawn_window";
+          yield wait_for_child();
+          bool again = true;
+          while (again) {
+            again = collect(&ret, nullptr);
+            if (ret < 0) {
+              tn->log(10, "a sync operation returned error");
+              /* we have reported this error */
+            }
+            /* not waiting for child here */
+          }
+        }
       }
     }
 
@@ -3541,6 +3598,65 @@ int RGWRunBucketSourcesSyncCR::operate()
   return 0;
 }
 
+class RGWSyncGetBucketInfoCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  rgw_bucket bucket;
+  RGWBucketInfo *pbucket_info;
+  RGWMetaSyncEnv meta_sync_env;
+
+  RGWSyncTraceNodeRef tn;
+
+public:
+  RGWSyncGetBucketInfoCR(RGWDataSyncEnv *_sync_env,
+                         const rgw_bucket& _bucket,
+                         RGWBucketInfo *_pbucket_info,
+                         const RGWSyncTraceNodeRef& _tn_parent)
+    : RGWCoroutine(_sync_env->cct),
+      sync_env(_sync_env),
+      bucket(_bucket),
+      pbucket_info(_pbucket_info),
+      tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_info",
+                                         SSTR(bucket))) {
+  }
+
+  int operate() override;
+};
+
+int RGWSyncGetBucketInfoCR::operate()
+{
+  reenter(this) {
+    yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, pbucket_info));
+    if (retcode == -ENOENT) {
+      /* bucket instance info has not been synced in yet, fetch it now */
+      yield {
+        tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata"));
+        string raw_key = string("bucket.instance:") + bucket.get_key();
+
+        meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->svc->zone->get_master_conn(), sync_env->async_rados,
+                           sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
+
+        call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
+                                          string() /* no marker */,
+                                          MDLOG_STATUS_COMPLETE,
+                                          NULL /* no marker tracker */,
+                                          tn));
+      }
+      if (retcode < 0) {
+        tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bucket}));
+        return set_cr_error(retcode);
+      }
+
+      yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, &pbucket_info));
+    }
+    if (retcode < 0) {
+      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bucket}));
+      return set_cr_error(retcode);
+    }
+
+    return set_cr_done();
+  }
+}
+
 int RGWRunBucketSyncCoroutine::operate()
 {
   reenter(this) {
@@ -3566,7 +3682,7 @@ int RGWRunBucketSyncCoroutine::operate()
     }
 
     tn->log(10, "took lease");
-    yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pipe, &sync_status));
+    yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status));
     if (retcode < 0 && retcode != -ENOENT) {
       tn->log(0, "ERROR: failed to read sync status for bucket");
       lease_cr->go_down();
@@ -3574,33 +3690,17 @@ int RGWRunBucketSyncCoroutine::operate()
       return set_cr_error(retcode);
     }
 
-    tn->log(20, SSTR("sync status for bucket: " << sync_status.state));
-
-    yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, sync_pipe.source_bs.bucket, &sync_pipe.dest_bucket_info));
-    if (retcode == -ENOENT) {
-      /* bucket instance info has not been synced in yet, fetch it now */
-      yield {
-        tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata"));
-        string raw_key = string("bucket.instance:") + sync_pipe.source_bs.bucket.get_key();
-
-        meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->svc->zone->get_master_conn(), sync_env->async_rados,
-                           sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
-
-        call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
-                                          string() /* no marker */,
-                                          MDLOG_STATUS_COMPLETE,
-                                          NULL /* no marker tracker */,
-                                          tn));
-      }
-      if (retcode < 0) {
-        tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{sync_pipe.source_bs.bucket}));
-        lease_cr->go_down();
-        drain_all();
-        return set_cr_error(retcode);
-      }
+    tn->log(20, SSTR("sync status for source bucket: " << sync_status.state));
 
-      yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, sync_pipe.source_bs.bucket, &sync_pipe.dest_bucket_info));
+    yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info, tn));
+    if (retcode < 0) {
+      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.info.source_bs.bucket}));
+      lease_cr->go_down();
+      drain_all();
+      return set_cr_error(retcode);
     }
+
+    yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info, tn));
     if (retcode < 0) {
       tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.source_bs.bucket}));
       lease_cr->go_down();
@@ -3608,6 +3708,8 @@ int RGWRunBucketSyncCoroutine::operate()
       return set_cr_error(retcode);
     }
 
+    sync_pipe.info = sync_pair;
+
     do {
       if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
         yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pipe, sync_status));
@@ -3680,6 +3782,7 @@ int RGWBucketPipeSyncStatusManager::init()
     return ret;
   }
 
+#warning read specific bucket sources
 
   const string key = bucket.get_key();
 
@@ -3791,9 +3894,13 @@ std::ostream& RGWBucketPipeSyncStatusManager::gen_prefix(std::ostream& out) cons
 }
 
 string RGWBucketPipeSyncStatusManager::status_oid(const string& source_zone,
-                                              const rgw_bucket_sync_pipe& sync_pipe)
+                                              const rgw_bucket_sync_pair_info& sync_pair)
 {
-  return bucket_status_oid_prefix + "." + source_zone + ":" + sync_pipe.source_bs.get_key();
+  if (sync_pair.source_bs == sync_pair.dest_bs) {
+    return bucket_status_oid_prefix + "." + source_zone + ":" + sync_pair.dest_bs.get_key();
+  } else {
+    return bucket_status_oid_prefix + "." + source_zone + ":" + sync_pair.dest_bs.get_key() + ":" + sync_pair.source_bs.get_key();
+  }
 }
 
 string RGWBucketPipeSyncStatusManager::obj_status_oid(const string& source_zone,
index 56cae033562bc2c36cfaf351cf2b250f1409c137..cf0ca89d0ac3e9e8ed4b465eaf6afde0c73b9ac0 100644 (file)
 
 class JSONObj;
 
-struct rgw_bucket_sync_pipe {
+struct rgw_bucket_sync_pair_info {
   rgw_bucket_shard source_bs;
-  RGWBucketInfo dest_bucket_info;
+  rgw_bucket_shard dest_bs;
   string source_prefix;
   string dest_prefix;
 };
 
-inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) {
-  if (p.source_bs.bucket == p.dest_bucket_info.bucket &&
+struct rgw_bucket_sync_pipe {
+  rgw_bucket_sync_pair_info info;
+  RGWBucketInfo source_bucket_info;
+  RGWBucketInfo dest_bucket_info;
+};
+
+inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) {
+  if (p.source_bs.bucket == p.dest_bs.bucket &&
       p.source_prefix == p.dest_prefix) {
     return out << p.source_bs;
   }
@@ -37,7 +43,7 @@ inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) {
     out << "/" << p.source_prefix;
   }
 
-  out << " -> " << p.dest_bucket_info.bucket;
+  out << " -> " << p.dest_bs.bucket;
 
   if (!p.dest_prefix.empty()) {
     out << "/" << p.dest_prefix;
@@ -766,7 +772,7 @@ public:
   map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; }
   int init_sync_status();
 
-  static string status_oid(const string& source_zone, const rgw_bucket_sync_pipe& bs);
+  static string status_oid(const string& source_zone, const rgw_bucket_sync_pair_info& bs);
   static string obj_status_oid(const string& source_zone, const rgw_obj& obj); /* can be used by sync modules */
 
   // implements DoutPrefixProvider