From e795acc08e8ebdbb53d4a2018e5e1c1f77ec89b1 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 29 Oct 2019 14:22:15 -0700 Subject: [PATCH] rgw: sync: bucket sync manager adjustments for new system Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_admin.cc | 6 +- src/rgw/rgw_cr_rados.cc | 10 +- src/rgw/rgw_data_sync.cc | 344 +++++++++++++++++++++++++-------------- src/rgw/rgw_data_sync.h | 55 ++++--- 4 files changed, 260 insertions(+), 155 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 6d8378db37ed7..7771b0a85a8b8 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -7579,7 +7579,7 @@ next: if (ret < 0) { return -ret; } - RGWBucketPipeSyncStatusManager sync(store, source_zone, bucket); + RGWBucketPipeSyncStatusManager sync(store, source_zone, opt_source_bucket, bucket); ret = sync.init(); if (ret < 0) { @@ -7654,7 +7654,7 @@ next: if (ret < 0) { return -ret; } - RGWBucketPipeSyncStatusManager sync(store, source_zone, bucket); + RGWBucketPipeSyncStatusManager sync(store, source_zone, opt_source_bucket, bucket); ret = sync.init(); if (ret < 0) { @@ -7687,7 +7687,7 @@ next: if (ret < 0) { return -ret; } - RGWBucketPipeSyncStatusManager sync(store, source_zone, bucket); + RGWBucketPipeSyncStatusManager sync(store, source_zone, opt_source_bucket, bucket); ret = sync.init(); if (ret < 0) { diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc index 9adc54a245550..b34337ff4315e 100644 --- a/src/rgw/rgw_cr_rados.cc +++ b/src/rgw/rgw_cr_rados.cc @@ -7,6 +7,7 @@ #include "rgw_coroutine.h" #include "rgw_cr_rados.h" #include "rgw_sync_counters.h" +#include "rgw_bucket.h" #include "services/svc_zone.h" #include "services/svc_zone_utils.h" @@ -529,8 +530,13 @@ bool RGWOmapAppend::finish() { int RGWAsyncGetBucketInstanceInfo::_send_request() { - RGWSysObjectCtx obj_ctx = store->svc()->sysobj->init_obj_ctx(); - int r = store->getRados()->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr, null_yield); + int r; + if (!bucket.bucket_id.empty()) { + RGWSysObjectCtx obj_ctx = store->svc()->sysobj->init_obj_ctx(); + r = store->getRados()->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr, null_yield); + } else { + r = store->ctl()->bucket->read_bucket_info(bucket, &bucket_info, null_yield); + } if (r < 0) { ldout(store->ctx(), 0) << "ERROR: failed to get bucket instance info for " << bucket << dendl; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 730b9a342eb59..f86d08582c7ef 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -2043,39 +2043,6 @@ string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int s return string(buf); } -RGWRemoteBucketLog::RGWRemoteBucketLog(const DoutPrefixProvider *_dpp, - rgw::sal::RGWRadosStore *_store, - RGWAsyncRadosProcessor *_async_rados, - RGWHTTPManager *_http_manager) - : RGWCoroutinesManager(_store->ctx(), _store->getRados()->get_cr_registry()), - dpp(_dpp), store(_store), - async_rados(_async_rados), http_manager(_http_manager) -{ -} - -int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, - const rgw_bucket& source_bucket, int shard_id, - const rgw_bucket& dest_bucket, - RGWSyncErrorLogger *_error_logger, - RGWSyncTraceManager *_sync_tracer, - RGWSyncModuleInstanceRef& _sync_module) -{ - conn = _conn; - source_zone = _source_zone; - sync_pair.source_bs.bucket = source_bucket; - sync_pair.source_bs.shard_id = shard_id; - sync_pair.dest_bs.bucket = dest_bucket; - if (dest_bucket == source_bucket) { - sync_pair.dest_bs.shard_id = shard_id; - } - - sync_env.init(dpp, store->ctx(), store, store->svc(), async_rados, http_manager, - _error_logger, _sync_tracer, _sync_module, nullptr); - sc.init(&sync_env, conn, source_zone); - - return 0; -} - class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; @@ -2170,9 +2137,46 @@ public: } }; -RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr() +RGWRemoteBucketManager::RGWRemoteBucketManager(const DoutPrefixProvider *_dpp, + RGWDataSyncEnv *_sync_env, + const string& _source_zone, + RGWRESTConn *_conn, + const RGWBucketInfo& source_bucket_info, + const rgw_bucket& dest_bucket) : dpp(_dpp), sync_env(_sync_env) { - return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pair, init_status); + conn = _conn; + source_zone = _source_zone; + + int num_shards = (source_bucket_info.num_shards <= 0 ? 1 : source_bucket_info.num_shards); + + sync_pairs.resize(num_shards); + + int cur_shard = std::min(source_bucket_info.num_shards, 0); + + for (int i = 0; i < num_shards; ++i, ++cur_shard) { + auto& sync_pair = sync_pairs[i]; + + sync_pair.source_bs.bucket = source_bucket_info.bucket; + sync_pair.dest_bs.bucket = dest_bucket; + + sync_pair.source_bs.shard_id = cur_shard; + + if (dest_bucket == source_bucket_info.bucket) { + sync_pair.dest_bs.shard_id = sync_pair.source_bs.shard_id; + } else { + sync_pair.dest_bs.shard_id = -1; + } + } + + sc.init(sync_env, conn, source_zone); +} + +RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(int num) +{ + if ((size_t)num >= sync_pairs.size()) { + return nullptr; + } + return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pairs[num], init_status); } #define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync." @@ -2445,16 +2449,22 @@ int RGWRemoteDataLog::read_shard_status(int shard_id, set& pending_bucke return ret; } -RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status) +RGWCoroutine *RGWRemoteBucketManager::read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status) { - return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pair, sync_status); + if ((size_t)num >= sync_pairs.size()) { + return nullptr; + } + + return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status); } -RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store, const string& _source_zone, +RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store, + std::optional _source_zone, + std::optional _source_bucket, const rgw_bucket& _dest_bucket) : store(_store), cr_mgr(_store->ctx(), _store->getRados()->get_cr_registry()), http_manager(store->ctx(), cr_mgr.get_completion_mgr()), - source_zone(_source_zone), + source_zone(_source_zone), source_bucket(_source_bucket), conn(NULL), error_logger(NULL), dest_bucket(_dest_bucket), num_shards(0) @@ -2463,8 +2473,8 @@ RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRado RGWBucketPipeSyncStatusManager::~RGWBucketPipeSyncStatusManager() { - for (map::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) { - delete iter->second; + for (vector::iterator iter = source_mgrs.begin(); iter != source_mgrs.end(); ++iter) { + delete *iter; } delete error_logger; } @@ -3410,11 +3420,22 @@ class RGWGetBucketSourcePeersCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; rgw_bucket bucket; + std::optional source_zone; + map *sources; + map *sources_info; + map::iterator siiter; RGWBucketInfo *pbucket_info; rgw_raw_obj sources_obj; + bool found_binfo{false}; + map::iterator siter; + map::iterator siter_end; + set::iterator piter; + + std::optional source_bucket; + rgw_bucket_sync_sources_local_info sources_local_info; rgw_bucket_sync_sources_local_info expected_local_info; @@ -3426,13 +3447,17 @@ class RGWGetBucketSourcePeersCR : public RGWCoroutine { public: RGWGetBucketSourcePeersCR(RGWDataSyncEnv *_sync_env, const rgw_bucket& _bucket, + std::optional _source_zone, map *_sources, + map *_sources_info, RGWBucketInfo *_pbucket_info, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bucket(_bucket), + source_zone(_source_zone), sources(_sources), + sources_info(_sources_info), pbucket_info(_pbucket_info), sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket)), policy(make_shared()), @@ -3443,53 +3468,17 @@ public: int operate() override; }; -int RGWGetBucketSourcePeersCR::operate() -{ - reenter(this) { - yield call(new RGWSimpleRadosReadCR(sync_env->async_rados, - sync_env->svc->sysobj, - RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket), - &sources_local_info)); - if (retcode < 0 && - retcode != -ENOENT) { - return set_cr_error(retcode); - } - - get_policy_params.bucket = bucket; - yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados, - sync_env->store, - get_policy_params, - policy)); - if (retcode < 0 && - retcode != -ENOENT) { - return set_cr_error(retcode); - } -#warning update local copy of sources and act on changes - - { - auto& handler = policy->policy_handler; - - *sources = handler->get_sources(); - auto& binfo = handler->get_bucket_info(); - if (binfo) { - *pbucket_info = *binfo; - } - } - - return set_cr_done(); - } - - return 0; -} - class RGWRunBucketSourcesSyncCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; rgw_bucket bucket; RGWBucketInfo bucket_info; + std::optional source_zone; + rgw_raw_obj sources_obj; map sources; + map sources_info; map::iterator siter; set::iterator piter; @@ -3507,10 +3496,12 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { public: RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket& _bucket, + optional _source_zone, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bucket(_bucket), + source_zone(_source_zone), sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(_sync_env->svc->zone, bucket)), tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources", SSTR(bucket))) { @@ -3548,7 +3539,7 @@ int RGWRunBucketSourcesSyncCR::operate() } tn->log(10, "took lease"); - yield call(new RGWGetBucketSourcePeersCR(sync_env, bucket, &sources, &bucket_info, tn)); + yield call(new RGWGetBucketSourcePeersCR(sync_env, bucket, source_zone, &sources, &sources_info, &bucket_info, tn)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); lease_cr->go_down(); @@ -3560,10 +3551,10 @@ int RGWRunBucketSourcesSyncCR::operate() scs.emplace_back(); cur_sc = &scs.back(); { - auto& source_zone = siter->first; - auto conn = sync_env->svc->zone->get_zone_conn_by_id(source_zone); + auto& szone = siter->first; + auto conn = sync_env->svc->zone->get_zone_conn_by_id(szone); if (!conn) { - ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << source_zone << " does not exist" << dendl; + ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << szone << " does not exist" << dendl; continue; } cur_sc->init(sync_env, conn, siter->first); @@ -3658,6 +3649,84 @@ int RGWSyncGetBucketInfoCR::operate() return 0; } +int RGWGetBucketSourcePeersCR::operate() +{ + reenter(this) { + yield call(new RGWSimpleRadosReadCR(sync_env->async_rados, + sync_env->svc->sysobj, + RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket), + &sources_local_info)); + if (retcode < 0 && + retcode != -ENOENT) { + return set_cr_error(retcode); + } + + get_policy_params.bucket = bucket; + yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados, + sync_env->store, + get_policy_params, + policy)); + if (retcode < 0 && + retcode != -ENOENT) { + return set_cr_error(retcode); + } +#warning update local copy of sources and act on changes + + { + auto& handler = policy->policy_handler; + + *sources = handler->get_sources(); + auto& binfo = handler->get_bucket_info(); + if (binfo) { + *pbucket_info = *binfo; + found_binfo = true; + } + } + + if (sources_info) { + if (found_binfo) { + (*sources_info)[bucket] = *pbucket_info; + } + + siter_end = sources->end(); + if (source_zone) { + siter = sources->find(*source_zone); + if (siter != sources->end()) { + siter_end = siter; + ++siter_end; + } + } else { + siter = sources->begin(); + } + for (; siter != siter_end; ++siter) { + for (piter = siter->second.pipes.begin(); + piter != siter->second.pipes.end(); + ++piter) { + source_bucket = piter->source.bucket; + if (!source_bucket) { + continue; + } + if (sources_info->find(*source_bucket) != sources_info->end()) { + continue; + } + + (*sources_info)[*source_bucket] = RGWBucketInfo(); /* reserve space for it, will fetch it later when map cannot change */ + } + } + + for (siiter = sources_info->begin(); siiter != sources_info->end(); ++siiter) { + if (siiter->first != bucket) { + yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first, &siiter->second, tn)); + } + } + } + + return set_cr_done(); + } + + return 0; +} + int RGWRunBucketSyncCoroutine::operate() { reenter(this) { @@ -3764,61 +3833,82 @@ int RGWRunBucketSyncCoroutine::operate() return 0; } -RGWCoroutine *RGWRemoteBucketLog::run_sync_cr() +RGWCoroutine *RGWRemoteBucketManager::run_sync_cr(int num) { - return new RGWRunBucketSyncCoroutine(&sc, sync_pair, sync_env.sync_tracer->root_node); + if ((size_t)num >= sync_pairs.size()) { + return nullptr; + } + + return new RGWRunBucketSyncCoroutine(&sc, sync_pairs[num], sync_env->sync_tracer->root_node); } int RGWBucketPipeSyncStatusManager::init() { - conn = store->svc()->zone->get_zone_conn_by_id(source_zone); - if (!conn) { - ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl; - return -EINVAL; - } - int ret = http_manager.start(); if (ret < 0) { ldpp_dout(this, 0) << "failed in http_manager.start() ret=" << ret << dendl; return ret; } -#warning read specific bucket sources - rgw_bucket source_bucket = dest_bucket; + error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS); - const string key = source_bucket.get_key(); + sync_module.reset(new RGWDefaultSyncModuleInstance()); + auto async_rados = store->svc()->rados->get_async_processor(); - rgw_http_param_pair pairs[] = { { "key", key.c_str() }, - { NULL, NULL } }; + sync_env.init(this, store->ctx(), store, + store->svc(), async_rados, &http_manager, + error_logger, store->getRados()->get_sync_tracer(), + sync_module, nullptr); - string path = string("/admin/metadata/bucket.instance"); + map sources; + map sources_info; + RGWBucketInfo dest_bucket_info; - bucket_instance_meta_info result; - ret = cr_mgr.run(new RGWReadRESTResourceCR(store->ctx(), conn, &http_manager, path, pairs, &result)); + ret = cr_mgr.run(new RGWGetBucketSourcePeersCR(&sync_env, + dest_bucket, + source_zone, + &sources, + &sources_info, + &dest_bucket_info, + sync_env.sync_tracer->root_node)); if (ret < 0) { - ldpp_dout(this, 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl; + ldpp_dout(this, 0) << "failed to get bucket source peers info: (ret=" << ret << "): " << cpp_strerror(-ret) << dendl; return ret; } - RGWBucketInfo& bi = result.data.get_bucket_info(); - num_shards = bi.num_shards; + for (auto siter : sources) { + if (source_zone && siter.first != *source_zone) { + continue; + } - error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS); + auto& szone = siter.first; - sync_module.reset(new RGWDefaultSyncModuleInstance()); + conn = store->svc()->zone->get_zone_conn_by_id(szone); + if (!conn) { + ldpp_dout(this, 0) << "connection object to zone " << szone << " does not exist" << dendl; + return -EINVAL; + } - int effective_num_shards = (num_shards ? num_shards : 1); + for (auto& pipe : siter.second.pipes) { + auto& source_bucket = pipe.source.bucket; - auto async_rados = store->svc()->rados->get_async_processor(); + if (!source_bucket) { + continue; + } - for (int i = 0; i < effective_num_shards; i++) { - RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, async_rados, &http_manager); - ret = l->init(source_zone, conn, source_bucket, (num_shards ? i : -1), dest_bucket, error_logger, store->getRados()->get_sync_tracer(), sync_module); - if (ret < 0) { - ldpp_dout(this, 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl; - return ret; + auto iter = sources_info.find(*source_bucket); + if (iter == sources_info.end()) { + ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): failed to find bucket info for bucket=" << *source_bucket << ". Likely a bug" << dendl; + return -EIO; + } + + auto& source_bucket_info = iter->second; + + source_mgrs.push_back(new RGWRemoteBucketManager(this, &sync_env, + szone, conn, + source_bucket_info, + dest_bucket_info.bucket)); } - source_logs[i] = l; } return 0; @@ -3828,10 +3918,12 @@ int RGWBucketPipeSyncStatusManager::init_sync_status() { list stacks; - for (map::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) { + for (auto& mgr : source_mgrs) { RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr); - RGWRemoteBucketLog *l = iter->second; - stack->call(l->init_sync_status_cr()); + + for (int i = 0; i < mgr->num_pipes(); ++i) { + stack->call(mgr->init_sync_status_cr(i)); + } stacks.push_back(stack); } @@ -3843,10 +3935,11 @@ int RGWBucketPipeSyncStatusManager::read_sync_status() { list stacks; - for (map::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) { + for (auto& mgr : source_mgrs) { RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr); - RGWRemoteBucketLog *l = iter->second; - stack->call(l->read_sync_status_cr(&sync_status[iter->first])); + for (int i = 0; i < mgr->num_pipes(); ++i) { + stack->call(mgr->read_sync_status_cr(i, &sync_status[i])); + } stacks.push_back(stack); } @@ -3865,10 +3958,11 @@ int RGWBucketPipeSyncStatusManager::run() { list stacks; - for (map::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) { + for (auto& mgr : source_mgrs) { RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr); - RGWRemoteBucketLog *l = iter->second; - stack->call(l->run_sync_cr()); + for (int i = 0; i < mgr->num_pipes(); ++i) { + stack->call(mgr->run_sync_cr(i)); + } stacks.push_back(stack); } @@ -3890,9 +3984,9 @@ unsigned RGWBucketPipeSyncStatusManager::get_subsys() const std::ostream& RGWBucketPipeSyncStatusManager::gen_prefix(std::ostream& out) const { - auto zone = std::string_view{source_zone}; + auto zone = std::string_view{source_zone.value_or("*")}; return out << "bucket sync zone:" << zone.substr(0, 8) - << " bucket:" << dest_bucket << ' '; + << " bucket:" << dest_bucket << ' '; } string RGWBucketPipeSyncStatusManager::status_oid(const string& source_zone, diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 2adbf1227b8f1..fc1e0602452b1 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -293,7 +293,7 @@ struct RGWDataSyncEnv { RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module, PerfCounters* _counters) { - dpp = _dpp; + dpp = _dpp; cct = _cct; store = _store; svc = _svc; @@ -555,39 +555,39 @@ struct rgw_bucket_index_marker_info { }; -class RGWRemoteBucketLog : public RGWCoroutinesManager { +class RGWRemoteBucketManager { const DoutPrefixProvider *dpp; - rgw::sal::RGWRadosStore *store; + + RGWDataSyncEnv *sync_env; + RGWRESTConn *conn{nullptr}; string source_zone; - rgw_bucket_sync_pair_info sync_pair; + vector sync_pairs; - RGWAsyncRadosProcessor *async_rados; - RGWHTTPManager *http_manager; - - RGWDataSyncEnv sync_env; RGWDataSyncCtx sc; rgw_bucket_shard_sync_info init_status; RGWBucketSyncCR *sync_cr{nullptr}; public: - RGWRemoteBucketLog(const DoutPrefixProvider *_dpp, rgw::sal::RGWRadosStore *_store, - RGWAsyncRadosProcessor *_async_rados, - RGWHTTPManager *_http_manager); - - int init(const string& _source_zone, RGWRESTConn *_conn, - const rgw_bucket& source_bucket, int shard_id, - const rgw_bucket& dest_bucket, - RGWSyncErrorLogger *_error_logger, - RGWSyncTraceManager *_sync_tracer, - RGWSyncModuleInstanceRef& _sync_module); - void finish(); + RGWRemoteBucketManager(const DoutPrefixProvider *_dpp, + RGWDataSyncEnv *_sync_env, + const string& _source_zone, RGWRESTConn *_conn, + const RGWBucketInfo& source_bucket_info, + const rgw_bucket& dest_bucket); - RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status); - RGWCoroutine *init_sync_status_cr(); - RGWCoroutine *run_sync_cr(); + void init(const string& _source_zone, RGWRESTConn *_conn, + const rgw_bucket& source_bucket, int shard_id, + const rgw_bucket& dest_bucket); + + RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status); + RGWCoroutine *init_sync_status_cr(int num); + RGWCoroutine *run_sync_cr(int num); + + int num_pipes() { + return sync_pairs.size(); + } void wakeup(); }; @@ -595,18 +595,22 @@ public: class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider { rgw::sal::RGWRadosStore *store; + RGWDataSyncEnv sync_env; + RGWCoroutinesManager cr_mgr; RGWHTTPManager http_manager; - string source_zone; + std::optional source_zone; + std::optional source_bucket; + RGWRESTConn *conn; RGWSyncErrorLogger *error_logger; RGWSyncModuleInstanceRef sync_module; rgw_bucket dest_bucket; - map source_logs; + vector source_mgrs; string source_status_oid; string source_shard_status_oid_prefix; @@ -618,7 +622,8 @@ class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider { public: RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store, - const string& _source_zone, + std::optional _source_zone, + std::optional _source_bucket, const rgw_bucket& dest_bucket); ~RGWBucketPipeSyncStatusManager(); -- 2.39.5