]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: sync: bucket sync manager adjustments for new system
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 29 Oct 2019 21:22:15 +0000 (14:22 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:37 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h

index 6d8378db37ed7f1bfbad2847a81be02d0fccc333..7771b0a85a8b8201f809aaf413a21d5953fefeb4 100644 (file)
@@ -7579,7 +7579,7 @@ next:
     if (ret < 0) {
       return -ret;
     }
-    RGWBucketPipeSyncStatusManager sync(store, source_zone, bucket);
+    RGWBucketPipeSyncStatusManager sync(store, source_zone, opt_source_bucket, bucket);
 
     ret = sync.init();
     if (ret < 0) {
@@ -7654,7 +7654,7 @@ next:
     if (ret < 0) {
       return -ret;
     }
-    RGWBucketPipeSyncStatusManager sync(store, source_zone, bucket);
+    RGWBucketPipeSyncStatusManager sync(store, source_zone, opt_source_bucket, bucket);
 
     ret = sync.init();
     if (ret < 0) {
@@ -7687,7 +7687,7 @@ next:
     if (ret < 0) {
       return -ret;
     }
-    RGWBucketPipeSyncStatusManager sync(store, source_zone, bucket);
+    RGWBucketPipeSyncStatusManager sync(store, source_zone, opt_source_bucket, bucket);
 
     ret = sync.init();
     if (ret < 0) {
index 9adc54a2455504cd971252e4c2bb7ea04c6e77d1..b34337ff4315eb2868f483ed58480df96e8a8f22 100644 (file)
@@ -7,6 +7,7 @@
 #include "rgw_coroutine.h"
 #include "rgw_cr_rados.h"
 #include "rgw_sync_counters.h"
+#include "rgw_bucket.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_zone_utils.h"
@@ -529,8 +530,13 @@ bool RGWOmapAppend::finish() {
 
 int RGWAsyncGetBucketInstanceInfo::_send_request()
 {
-  RGWSysObjectCtx obj_ctx = store->svc()->sysobj->init_obj_ctx();
-  int r = store->getRados()->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr, null_yield);
+  int r;
+  if (!bucket.bucket_id.empty()) {
+    RGWSysObjectCtx obj_ctx = store->svc()->sysobj->init_obj_ctx();
+    r = store->getRados()->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr, null_yield);
+  } else {
+    r = store->ctl()->bucket->read_bucket_info(bucket, &bucket_info, null_yield);
+  }
   if (r < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to get bucket instance info for "
         << bucket << dendl;
index 730b9a342eb59ffe68794de71fd8b2f6ca80e62a..f86d08582c7efed5e65963ddaff81d44190b825b 100644 (file)
@@ -2043,39 +2043,6 @@ string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int s
   return string(buf);
 }
 
-RGWRemoteBucketLog::RGWRemoteBucketLog(const DoutPrefixProvider *_dpp,
-                                      rgw::sal::RGWRadosStore *_store,
-                                       RGWAsyncRadosProcessor *_async_rados,
-                                       RGWHTTPManager *_http_manager)
-    : RGWCoroutinesManager(_store->ctx(), _store->getRados()->get_cr_registry()),
-      dpp(_dpp), store(_store),
-      async_rados(_async_rados), http_manager(_http_manager)
-{
-}
-
-int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
-                             const rgw_bucket& source_bucket, int shard_id,
-                             const rgw_bucket& dest_bucket,
-                             RGWSyncErrorLogger *_error_logger,
-                             RGWSyncTraceManager *_sync_tracer,
-                             RGWSyncModuleInstanceRef& _sync_module)
-{
-  conn = _conn;
-  source_zone = _source_zone;
-  sync_pair.source_bs.bucket = source_bucket;
-  sync_pair.source_bs.shard_id = shard_id;
-  sync_pair.dest_bs.bucket = dest_bucket;
-  if (dest_bucket == source_bucket) {
-    sync_pair.dest_bs.shard_id = shard_id;
-  }
-
-  sync_env.init(dpp, store->ctx(), store, store->svc(), async_rados, http_manager,
-                _error_logger, _sync_tracer, _sync_module, nullptr);
-  sc.init(&sync_env, conn, source_zone);
-
-  return 0;
-}
-
 class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
@@ -2170,9 +2137,46 @@ public:
   }
 };
 
-RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
+RGWRemoteBucketManager::RGWRemoteBucketManager(const DoutPrefixProvider *_dpp,
+                                               RGWDataSyncEnv *_sync_env,
+                                               const string& _source_zone,
+                                               RGWRESTConn *_conn,
+                                               const RGWBucketInfo& source_bucket_info,
+                                               const rgw_bucket& dest_bucket) : dpp(_dpp), sync_env(_sync_env)
 {
-  return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pair, init_status);
+  conn = _conn;
+  source_zone = _source_zone;
+
+  int num_shards = (source_bucket_info.num_shards <= 0 ? 1 : source_bucket_info.num_shards);
+
+  sync_pairs.resize(num_shards);
+
+  int cur_shard = std::min<int>(source_bucket_info.num_shards, 0);
+
+  for (int i = 0; i < num_shards; ++i, ++cur_shard) {
+    auto& sync_pair = sync_pairs[i];
+
+    sync_pair.source_bs.bucket = source_bucket_info.bucket;
+    sync_pair.dest_bs.bucket = dest_bucket;
+
+    sync_pair.source_bs.shard_id = cur_shard;
+
+    if (dest_bucket == source_bucket_info.bucket) {
+      sync_pair.dest_bs.shard_id = sync_pair.source_bs.shard_id;
+    } else {
+      sync_pair.dest_bs.shard_id = -1;
+    }
+  }
+
+  sc.init(sync_env, conn, source_zone);
+}
+
+RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(int num)
+{
+  if ((size_t)num >= sync_pairs.size()) {
+    return nullptr;
+  }
+  return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pairs[num], init_status);
 }
 
 #define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
@@ -2445,16 +2449,22 @@ int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_bucke
   return ret;
 }
 
-RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
+RGWCoroutine *RGWRemoteBucketManager::read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status)
 {
-  return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pair, sync_status);
+  if ((size_t)num >= sync_pairs.size()) {
+    return nullptr;
+  }
+
+  return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status);
 }
 
-RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store, const string& _source_zone,
+RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store,
+                                                               std::optional<string> _source_zone,
+                                                               std::optional<rgw_bucket> _source_bucket,
                                                                const rgw_bucket& _dest_bucket) : store(_store),
                                                                                    cr_mgr(_store->ctx(), _store->getRados()->get_cr_registry()),
                                                                                    http_manager(store->ctx(), cr_mgr.get_completion_mgr()),
-                                                                                   source_zone(_source_zone),
+                                                                                   source_zone(_source_zone), source_bucket(_source_bucket),
                                                                                    conn(NULL), error_logger(NULL),
                                                                                    dest_bucket(_dest_bucket),
                                                                                    num_shards(0)
@@ -2463,8 +2473,8 @@ RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRado
 
 RGWBucketPipeSyncStatusManager::~RGWBucketPipeSyncStatusManager()
 {
-  for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
-    delete iter->second;
+  for (vector<RGWRemoteBucketManager *>::iterator iter = source_mgrs.begin(); iter != source_mgrs.end(); ++iter) {
+    delete *iter;
   }
   delete error_logger;
 }
@@ -3410,11 +3420,22 @@ class RGWGetBucketSourcePeersCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   rgw_bucket bucket;
 
+  std::optional<string> source_zone;
+
   map<string, RGWBucketSyncFlowManager::pipe_set> *sources;
+  map<rgw_bucket, RGWBucketInfo> *sources_info;
+  map<rgw_bucket, RGWBucketInfo>::iterator siiter;
   RGWBucketInfo *pbucket_info;
 
   rgw_raw_obj sources_obj;
 
+  bool found_binfo{false};
+  map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter;
+  map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter_end;
+  set<rgw_sync_bucket_pipe>::iterator piter;
+
+  std::optional<rgw_bucket> source_bucket;
+
   rgw_bucket_sync_sources_local_info sources_local_info;
   rgw_bucket_sync_sources_local_info expected_local_info;
 
@@ -3426,13 +3447,17 @@ class RGWGetBucketSourcePeersCR : public RGWCoroutine {
 public:
   RGWGetBucketSourcePeersCR(RGWDataSyncEnv *_sync_env,
                             const rgw_bucket& _bucket,
+                            std::optional<string> _source_zone,
                             map<string, RGWBucketSyncFlowManager::pipe_set> *_sources,
+                            map<rgw_bucket, RGWBucketInfo> *_sources_info,
                             RGWBucketInfo *_pbucket_info,
                             const RGWSyncTraceNodeRef& _tn_parent)
     : RGWCoroutine(_sync_env->cct),
       sync_env(_sync_env),
       bucket(_bucket),
+      source_zone(_source_zone),
       sources(_sources),
+      sources_info(_sources_info),
       pbucket_info(_pbucket_info),
       sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket)),
       policy(make_shared<rgw_bucket_get_sync_policy_result>()),
@@ -3443,53 +3468,17 @@ public:
   int operate() override;
 };
 
-int RGWGetBucketSourcePeersCR::operate()
-{
-  reenter(this) {
-    yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
-                                                                            sync_env->svc->sysobj,
-                                                                            RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket),
-                                                                            &sources_local_info));
-    if (retcode < 0 &&
-        retcode != -ENOENT) {
-      return set_cr_error(retcode);
-    }
-
-    get_policy_params.bucket = bucket;
-    yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
-                                                   sync_env->store,
-                                                   get_policy_params,
-                                                   policy));
-    if (retcode < 0 &&
-        retcode != -ENOENT) {
-      return set_cr_error(retcode);
-    }
-#warning update local copy of sources and act on changes
-
-    {
-      auto& handler = policy->policy_handler;
-
-      *sources = handler->get_sources();
-      auto& binfo = handler->get_bucket_info();
-      if (binfo) {
-        *pbucket_info = *binfo;
-      }
-    }
-
-    return set_cr_done();
-  }
-
-  return 0;
-}
-
 class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   rgw_bucket bucket;
   RGWBucketInfo bucket_info;
 
+  std::optional<std::string> source_zone;
+
   rgw_raw_obj sources_obj;
 
   map<string, RGWBucketSyncFlowManager::pipe_set> sources;
+  map<rgw_bucket, RGWBucketInfo> sources_info;
   map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter;
   set<rgw_sync_bucket_pipe>::iterator piter;
 
@@ -3507,10 +3496,12 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
 public:
   RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env,
                             const rgw_bucket& _bucket,
+                            optional<string> _source_zone,
                             const RGWSyncTraceNodeRef& _tn_parent)
     : RGWCoroutine(_sync_env->cct),
       sync_env(_sync_env),
       bucket(_bucket),
+      source_zone(_source_zone),
       sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(_sync_env->svc->zone, bucket)),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
                                          SSTR(bucket))) {
@@ -3548,7 +3539,7 @@ int RGWRunBucketSourcesSyncCR::operate()
     }
 
     tn->log(10, "took lease");
-    yield call(new RGWGetBucketSourcePeersCR(sync_env, bucket, &sources, &bucket_info, tn));
+    yield call(new RGWGetBucketSourcePeersCR(sync_env, bucket, source_zone, &sources, &sources_info, &bucket_info, tn));
     if (retcode < 0 && retcode != -ENOENT) {
       tn->log(0, "ERROR: failed to read sync status for bucket");
       lease_cr->go_down();
@@ -3560,10 +3551,10 @@ int RGWRunBucketSourcesSyncCR::operate()
       scs.emplace_back();
       cur_sc = &scs.back();
       {
-        auto& source_zone = siter->first;
-        auto conn = sync_env->svc->zone->get_zone_conn_by_id(source_zone);
+        auto& szone = siter->first;
+        auto conn = sync_env->svc->zone->get_zone_conn_by_id(szone);
         if (!conn) {
-          ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << source_zone << " does not exist" << dendl;
+          ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << szone << " does not exist" << dendl;
           continue;
         }
         cur_sc->init(sync_env, conn, siter->first);
@@ -3658,6 +3649,84 @@ int RGWSyncGetBucketInfoCR::operate()
   return 0;
 }
 
+int RGWGetBucketSourcePeersCR::operate()
+{
+  reenter(this) {
+    yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
+                                                                            sync_env->svc->sysobj,
+                                                                            RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket),
+                                                                            &sources_local_info));
+    if (retcode < 0 &&
+        retcode != -ENOENT) {
+      return set_cr_error(retcode);
+    }
+
+    get_policy_params.bucket = bucket;
+    yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
+                                                   sync_env->store,
+                                                   get_policy_params,
+                                                   policy));
+    if (retcode < 0 &&
+        retcode != -ENOENT) {
+      return set_cr_error(retcode);
+    }
+#warning update local copy of sources and act on changes
+
+    {
+      auto& handler = policy->policy_handler;
+
+      *sources = handler->get_sources();
+      auto& binfo = handler->get_bucket_info();
+      if (binfo) {
+        *pbucket_info = *binfo;
+        found_binfo = true;
+      }
+    }
+
+    if (sources_info) {
+      if (found_binfo) {
+        (*sources_info)[bucket] = *pbucket_info;
+      }
+
+      siter_end = sources->end();
+      if (source_zone) {
+        siter = sources->find(*source_zone);
+        if (siter != sources->end()) {
+          siter_end = siter;
+          ++siter_end;
+        }
+      } else {
+        siter = sources->begin();
+      }
+      for (; siter != siter_end; ++siter) {
+        for (piter = siter->second.pipes.begin();
+             piter != siter->second.pipes.end();
+             ++piter) {
+          source_bucket = piter->source.bucket;
+          if (!source_bucket) {
+            continue;
+          }
+          if (sources_info->find(*source_bucket) != sources_info->end()) {
+            continue;
+          }
+
+          (*sources_info)[*source_bucket] = RGWBucketInfo(); /* reserve space for it, will fetch it later when map cannot change */
+        }
+      }
+
+      for (siiter = sources_info->begin(); siiter != sources_info->end(); ++siiter) {
+        if (siiter->first != bucket) {
+          yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first, &siiter->second, tn));
+        }
+      }
+    }
+
+    return set_cr_done();
+  }
+
+  return 0;
+}
+
 int RGWRunBucketSyncCoroutine::operate()
 {
   reenter(this) {
@@ -3764,61 +3833,82 @@ int RGWRunBucketSyncCoroutine::operate()
   return 0;
 }
 
-RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
+RGWCoroutine *RGWRemoteBucketManager::run_sync_cr(int num)
 {
-  return new RGWRunBucketSyncCoroutine(&sc, sync_pair, sync_env.sync_tracer->root_node);
+  if ((size_t)num >= sync_pairs.size()) {
+    return nullptr;
+  }
+
+  return new RGWRunBucketSyncCoroutine(&sc, sync_pairs[num], sync_env->sync_tracer->root_node);
 }
 
 int RGWBucketPipeSyncStatusManager::init()
 {
-  conn = store->svc()->zone->get_zone_conn_by_id(source_zone);
-  if (!conn) {
-    ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
-    return -EINVAL;
-  }
-
   int ret = http_manager.start();
   if (ret < 0) {
     ldpp_dout(this, 0) << "failed in http_manager.start() ret=" << ret << dendl;
     return ret;
   }
 
-#warning read specific bucket sources
-  rgw_bucket source_bucket = dest_bucket;
+  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
 
-  const string key = source_bucket.get_key();
+  sync_module.reset(new RGWDefaultSyncModuleInstance());
+  auto async_rados = store->svc()->rados->get_async_processor();
 
-  rgw_http_param_pair pairs[] = { { "key", key.c_str() },
-                                  { NULL, NULL } };
+  sync_env.init(this, store->ctx(), store,
+                store->svc(), async_rados, &http_manager,
+                error_logger, store->getRados()->get_sync_tracer(),
+                sync_module, nullptr);
 
-  string path = string("/admin/metadata/bucket.instance");
+  map<string, RGWBucketSyncFlowManager::pipe_set> sources;
+  map<rgw_bucket, RGWBucketInfo> sources_info;
+  RGWBucketInfo dest_bucket_info;
 
-  bucket_instance_meta_info result;
-  ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
+  ret = cr_mgr.run(new RGWGetBucketSourcePeersCR(&sync_env,
+                                                 dest_bucket,
+                                                 source_zone,
+                                                 &sources,
+                                                 &sources_info,
+                                                 &dest_bucket_info,
+                                                 sync_env.sync_tracer->root_node));
   if (ret < 0) {
-    ldpp_dout(this, 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
+    ldpp_dout(this, 0) << "failed to get bucket source peers info: (ret=" << ret << "): " << cpp_strerror(-ret) << dendl;
     return ret;
   }
 
-  RGWBucketInfo& bi = result.data.get_bucket_info();
-  num_shards = bi.num_shards;
+  for (auto siter : sources) {
+    if (source_zone && siter.first != *source_zone) {
+      continue;
+    }
 
-  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
+    auto& szone = siter.first;
 
-  sync_module.reset(new RGWDefaultSyncModuleInstance());
+    conn = store->svc()->zone->get_zone_conn_by_id(szone);
+    if (!conn) {
+      ldpp_dout(this, 0) << "connection object to zone " << szone << " does not exist" << dendl;
+      return -EINVAL;
+    }
 
-  int effective_num_shards = (num_shards ? num_shards : 1);
+    for (auto& pipe : siter.second.pipes) {
+      auto& source_bucket = pipe.source.bucket;
 
-  auto async_rados = store->svc()->rados->get_async_processor();
+      if (!source_bucket) {
+        continue;
+      }
 
-  for (int i = 0; i < effective_num_shards; i++) {
-    RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, async_rados, &http_manager);
-    ret = l->init(source_zone, conn, source_bucket, (num_shards ? i : -1), dest_bucket, error_logger, store->getRados()->get_sync_tracer(), sync_module);
-    if (ret < 0) {
-      ldpp_dout(this, 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
-      return ret;
+      auto iter = sources_info.find(*source_bucket);
+      if (iter == sources_info.end()) {
+        ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): failed to find bucket info for bucket=" << *source_bucket << ". Likely a bug" << dendl;
+        return -EIO;
+    }
+
+      auto& source_bucket_info = iter->second;
+
+      source_mgrs.push_back(new RGWRemoteBucketManager(this, &sync_env,
+                                                       szone, conn,
+                                                       source_bucket_info,
+                                                       dest_bucket_info.bucket));
     }
-    source_logs[i] = l;
   }
 
   return 0;
@@ -3828,10 +3918,12 @@ int RGWBucketPipeSyncStatusManager::init_sync_status()
 {
   list<RGWCoroutinesStack *> stacks;
 
-  for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
+  for (auto& mgr : source_mgrs) {
     RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
-    RGWRemoteBucketLog *l = iter->second;
-    stack->call(l->init_sync_status_cr());
+
+    for (int i = 0; i < mgr->num_pipes(); ++i) {
+      stack->call(mgr->init_sync_status_cr(i));
+    }
 
     stacks.push_back(stack);
   }
@@ -3843,10 +3935,11 @@ int RGWBucketPipeSyncStatusManager::read_sync_status()
 {
   list<RGWCoroutinesStack *> stacks;
 
-  for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
+  for (auto& mgr : source_mgrs) {
     RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
-    RGWRemoteBucketLog *l = iter->second;
-    stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
+    for (int i = 0; i < mgr->num_pipes(); ++i) {
+      stack->call(mgr->read_sync_status_cr(i, &sync_status[i]));
+    }
 
     stacks.push_back(stack);
   }
@@ -3865,10 +3958,11 @@ int RGWBucketPipeSyncStatusManager::run()
 {
   list<RGWCoroutinesStack *> stacks;
 
-  for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
+  for (auto& mgr : source_mgrs) {
     RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
-    RGWRemoteBucketLog *l = iter->second;
-    stack->call(l->run_sync_cr());
+    for (int i = 0; i < mgr->num_pipes(); ++i) {
+      stack->call(mgr->run_sync_cr(i));
+    }
 
     stacks.push_back(stack);
   }
@@ -3890,9 +3984,9 @@ unsigned RGWBucketPipeSyncStatusManager::get_subsys() const
 
 std::ostream& RGWBucketPipeSyncStatusManager::gen_prefix(std::ostream& out) const
 {
-  auto zone = std::string_view{source_zone};
+  auto zone = std::string_view{source_zone.value_or("*")};
   return out << "bucket sync zone:" << zone.substr(0, 8)
-      << " bucket:" << dest_bucket << ' ';
+    << " bucket:" << dest_bucket << ' ';
 }
 
 string RGWBucketPipeSyncStatusManager::status_oid(const string& source_zone,
index 2adbf1227b8f12743eb1e28a7a0046a3e47f84d5..fc1e0602452b1141d45b0fe4899a1a9fb50f373f 100644 (file)
@@ -293,7 +293,7 @@ struct RGWDataSyncEnv {
             RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer,
             RGWSyncModuleInstanceRef& _sync_module,
             PerfCounters* _counters) {
-    dpp = _dpp;
+     dpp = _dpp;
     cct = _cct;
     store = _store;
     svc = _svc;
@@ -555,39 +555,39 @@ struct rgw_bucket_index_marker_info {
 };
 
 
-class RGWRemoteBucketLog : public RGWCoroutinesManager {
+class RGWRemoteBucketManager {
   const DoutPrefixProvider *dpp;
-  rgw::sal::RGWRadosStore *store;
+
+  RGWDataSyncEnv *sync_env;
+
   RGWRESTConn *conn{nullptr};
   string source_zone;
 
-  rgw_bucket_sync_pair_info sync_pair;
+  vector<rgw_bucket_sync_pair_info> sync_pairs;
 
-  RGWAsyncRadosProcessor *async_rados;
-  RGWHTTPManager *http_manager;
-
-  RGWDataSyncEnv sync_env;
   RGWDataSyncCtx sc;
   rgw_bucket_shard_sync_info init_status;
 
   RGWBucketSyncCR *sync_cr{nullptr};
 
 public:
-  RGWRemoteBucketLog(const DoutPrefixProvider *_dpp, rgw::sal::RGWRadosStore *_store,
-                     RGWAsyncRadosProcessor *_async_rados,
-                     RGWHTTPManager *_http_manager);
-
-  int init(const string& _source_zone, RGWRESTConn *_conn,
-           const rgw_bucket& source_bucket, int shard_id,
-           const rgw_bucket& dest_bucket,
-           RGWSyncErrorLogger *_error_logger,
-           RGWSyncTraceManager *_sync_tracer,
-           RGWSyncModuleInstanceRef& _sync_module);
-  void finish();
+  RGWRemoteBucketManager(const DoutPrefixProvider *_dpp,
+                     RGWDataSyncEnv *_sync_env,
+                     const string& _source_zone, RGWRESTConn *_conn,
+                     const RGWBucketInfo& source_bucket_info,
+                     const rgw_bucket& dest_bucket);
 
-  RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status);
-  RGWCoroutine *init_sync_status_cr();
-  RGWCoroutine *run_sync_cr();
+  void init(const string& _source_zone, RGWRESTConn *_conn,
+            const rgw_bucket& source_bucket, int shard_id,
+            const rgw_bucket& dest_bucket);
+
+  RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status);
+  RGWCoroutine *init_sync_status_cr(int num);
+  RGWCoroutine *run_sync_cr(int num);
+
+  int num_pipes() {
+    return sync_pairs.size();
+  }
 
   void wakeup();
 };
@@ -595,18 +595,22 @@ public:
 class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider {
   rgw::sal::RGWRadosStore *store;
 
+  RGWDataSyncEnv sync_env;
+
   RGWCoroutinesManager cr_mgr;
 
   RGWHTTPManager http_manager;
 
-  string source_zone;
+  std::optional<string> source_zone;
+  std::optional<rgw_bucket> source_bucket;
+
   RGWRESTConn *conn;
   RGWSyncErrorLogger *error_logger;
   RGWSyncModuleInstanceRef sync_module;
 
   rgw_bucket dest_bucket;
 
-  map<int, RGWRemoteBucketLog *> source_logs;
+  vector<RGWRemoteBucketManager *> source_mgrs;
 
   string source_status_oid;
   string source_shard_status_oid_prefix;
@@ -618,7 +622,8 @@ class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider {
 
 public:
   RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store,
-                             const string& _source_zone,
+                             std::optional<string> _source_zone,
+                             std::optional<rgw_bucket> _source_bucket,
                              const rgw_bucket& dest_bucket);
   ~RGWBucketPipeSyncStatusManager();