return string(buf);
}
-RGWRemoteBucketLog::RGWRemoteBucketLog(const DoutPrefixProvider *_dpp,
- rgw::sal::RGWRadosStore *_store,
- RGWAsyncRadosProcessor *_async_rados,
- RGWHTTPManager *_http_manager)
- : RGWCoroutinesManager(_store->ctx(), _store->getRados()->get_cr_registry()),
- dpp(_dpp), store(_store),
- async_rados(_async_rados), http_manager(_http_manager)
-{
-}
-
-int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
- const rgw_bucket& source_bucket, int shard_id,
- const rgw_bucket& dest_bucket,
- RGWSyncErrorLogger *_error_logger,
- RGWSyncTraceManager *_sync_tracer,
- RGWSyncModuleInstanceRef& _sync_module)
-{
- conn = _conn;
- source_zone = _source_zone;
- sync_pair.source_bs.bucket = source_bucket;
- sync_pair.source_bs.shard_id = shard_id;
- sync_pair.dest_bs.bucket = dest_bucket;
- if (dest_bucket == source_bucket) {
- sync_pair.dest_bs.shard_id = shard_id;
- }
-
- sync_env.init(dpp, store->ctx(), store, store->svc(), async_rados, http_manager,
- _error_logger, _sync_tracer, _sync_module, nullptr);
- sc.init(&sync_env, conn, source_zone);
-
- return 0;
-}
-
class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
}
};
-RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
+RGWRemoteBucketManager::RGWRemoteBucketManager(const DoutPrefixProvider *_dpp,
+ RGWDataSyncEnv *_sync_env,
+ const string& _source_zone,
+ RGWRESTConn *_conn,
+ const RGWBucketInfo& source_bucket_info,
+ const rgw_bucket& dest_bucket) : dpp(_dpp), sync_env(_sync_env)
{
- return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pair, init_status);
+ conn = _conn;
+ source_zone = _source_zone;
+
+ int num_shards = (source_bucket_info.num_shards <= 0 ? 1 : source_bucket_info.num_shards);
+
+ sync_pairs.resize(num_shards);
+
+ int cur_shard = std::min<int>(source_bucket_info.num_shards, 0);
+
+ for (int i = 0; i < num_shards; ++i, ++cur_shard) {
+ auto& sync_pair = sync_pairs[i];
+
+ sync_pair.source_bs.bucket = source_bucket_info.bucket;
+ sync_pair.dest_bs.bucket = dest_bucket;
+
+ sync_pair.source_bs.shard_id = cur_shard;
+
+ if (dest_bucket == source_bucket_info.bucket) {
+ sync_pair.dest_bs.shard_id = sync_pair.source_bs.shard_id;
+ } else {
+ sync_pair.dest_bs.shard_id = -1;
+ }
+ }
+
+ sc.init(sync_env, conn, source_zone);
+}
+
+RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(int num)
+{
+ if ((size_t)num >= sync_pairs.size()) {
+ return nullptr;
+ }
+ return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pairs[num], init_status);
}
#define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
return ret;
}
-RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
+RGWCoroutine *RGWRemoteBucketManager::read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status)
{
- return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pair, sync_status);
+ if ((size_t)num >= sync_pairs.size()) {
+ return nullptr;
+ }
+
+ return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status);
}
-RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store, const string& _source_zone,
+RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store,
+ std::optional<string> _source_zone,
+ std::optional<rgw_bucket> _source_bucket,
const rgw_bucket& _dest_bucket) : store(_store),
cr_mgr(_store->ctx(), _store->getRados()->get_cr_registry()),
http_manager(store->ctx(), cr_mgr.get_completion_mgr()),
- source_zone(_source_zone),
+ source_zone(_source_zone), source_bucket(_source_bucket),
conn(NULL), error_logger(NULL),
dest_bucket(_dest_bucket),
num_shards(0)
RGWBucketPipeSyncStatusManager::~RGWBucketPipeSyncStatusManager()
{
- for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
- delete iter->second;
+ for (vector<RGWRemoteBucketManager *>::iterator iter = source_mgrs.begin(); iter != source_mgrs.end(); ++iter) {
+ delete *iter;
}
delete error_logger;
}
RGWDataSyncEnv *sync_env;
rgw_bucket bucket;
+ std::optional<string> source_zone;
+
map<string, RGWBucketSyncFlowManager::pipe_set> *sources;
+ map<rgw_bucket, RGWBucketInfo> *sources_info;
+ map<rgw_bucket, RGWBucketInfo>::iterator siiter;
RGWBucketInfo *pbucket_info;
rgw_raw_obj sources_obj;
+ bool found_binfo{false};
+ map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter;
+ map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter_end;
+ set<rgw_sync_bucket_pipe>::iterator piter;
+
+ std::optional<rgw_bucket> source_bucket;
+
rgw_bucket_sync_sources_local_info sources_local_info;
rgw_bucket_sync_sources_local_info expected_local_info;
public:
RGWGetBucketSourcePeersCR(RGWDataSyncEnv *_sync_env,
const rgw_bucket& _bucket,
+ std::optional<string> _source_zone,
map<string, RGWBucketSyncFlowManager::pipe_set> *_sources,
+ map<rgw_bucket, RGWBucketInfo> *_sources_info,
RGWBucketInfo *_pbucket_info,
const RGWSyncTraceNodeRef& _tn_parent)
: RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
bucket(_bucket),
+ source_zone(_source_zone),
sources(_sources),
+ sources_info(_sources_info),
pbucket_info(_pbucket_info),
sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket)),
policy(make_shared<rgw_bucket_get_sync_policy_result>()),
int operate() override;
};
-int RGWGetBucketSourcePeersCR::operate()
-{
- reenter(this) {
- yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
- sync_env->svc->sysobj,
- RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket),
- &sources_local_info));
- if (retcode < 0 &&
- retcode != -ENOENT) {
- return set_cr_error(retcode);
- }
-
- get_policy_params.bucket = bucket;
- yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
- sync_env->store,
- get_policy_params,
- policy));
- if (retcode < 0 &&
- retcode != -ENOENT) {
- return set_cr_error(retcode);
- }
-#warning update local copy of sources and act on changes
-
- {
- auto& handler = policy->policy_handler;
-
- *sources = handler->get_sources();
- auto& binfo = handler->get_bucket_info();
- if (binfo) {
- *pbucket_info = *binfo;
- }
- }
-
- return set_cr_done();
- }
-
- return 0;
-}
-
class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
rgw_bucket bucket;
RGWBucketInfo bucket_info;
+ std::optional<std::string> source_zone;
+
rgw_raw_obj sources_obj;
map<string, RGWBucketSyncFlowManager::pipe_set> sources;
+ map<rgw_bucket, RGWBucketInfo> sources_info;
map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter;
set<rgw_sync_bucket_pipe>::iterator piter;
public:
RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env,
const rgw_bucket& _bucket,
+ optional<string> _source_zone,
const RGWSyncTraceNodeRef& _tn_parent)
: RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
bucket(_bucket),
+ source_zone(_source_zone),
sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(_sync_env->svc->zone, bucket)),
tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
SSTR(bucket))) {
}
tn->log(10, "took lease");
- yield call(new RGWGetBucketSourcePeersCR(sync_env, bucket, &sources, &bucket_info, tn));
+ yield call(new RGWGetBucketSourcePeersCR(sync_env, bucket, source_zone, &sources, &sources_info, &bucket_info, tn));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, "ERROR: failed to read sync status for bucket");
lease_cr->go_down();
scs.emplace_back();
cur_sc = &scs.back();
{
- auto& source_zone = siter->first;
- auto conn = sync_env->svc->zone->get_zone_conn_by_id(source_zone);
+ auto& szone = siter->first;
+ auto conn = sync_env->svc->zone->get_zone_conn_by_id(szone);
if (!conn) {
- ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << source_zone << " does not exist" << dendl;
+ ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << szone << " does not exist" << dendl;
continue;
}
cur_sc->init(sync_env, conn, siter->first);
return 0;
}
+int RGWGetBucketSourcePeersCR::operate()
+{
+ reenter(this) {
+ yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
+ sync_env->svc->sysobj,
+ RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket),
+ &sources_local_info));
+ if (retcode < 0 &&
+ retcode != -ENOENT) {
+ return set_cr_error(retcode);
+ }
+
+ get_policy_params.bucket = bucket;
+ yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
+ sync_env->store,
+ get_policy_params,
+ policy));
+ if (retcode < 0 &&
+ retcode != -ENOENT) {
+ return set_cr_error(retcode);
+ }
+#warning update local copy of sources and act on changes
+
+ {
+ auto& handler = policy->policy_handler;
+
+ *sources = handler->get_sources();
+ auto& binfo = handler->get_bucket_info();
+ if (binfo) {
+ *pbucket_info = *binfo;
+ found_binfo = true;
+ }
+ }
+
+ if (sources_info) {
+ if (found_binfo) {
+ (*sources_info)[bucket] = *pbucket_info;
+ }
+
+ siter_end = sources->end();
+ if (source_zone) {
+ siter = sources->find(*source_zone);
+ if (siter != sources->end()) {
+ siter_end = siter;
+ ++siter_end;
+ }
+ } else {
+ siter = sources->begin();
+ }
+ for (; siter != siter_end; ++siter) {
+ for (piter = siter->second.pipes.begin();
+ piter != siter->second.pipes.end();
+ ++piter) {
+ source_bucket = piter->source.bucket;
+ if (!source_bucket) {
+ continue;
+ }
+ if (sources_info->find(*source_bucket) != sources_info->end()) {
+ continue;
+ }
+
+ (*sources_info)[*source_bucket] = RGWBucketInfo(); /* reserve space for it, will fetch it later when map cannot change */
+ }
+ }
+
+ for (siiter = sources_info->begin(); siiter != sources_info->end(); ++siiter) {
+ if (siiter->first != bucket) {
+ yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first, &siiter->second, tn));
+ }
+ }
+ }
+
+ return set_cr_done();
+ }
+
+ return 0;
+}
+
int RGWRunBucketSyncCoroutine::operate()
{
reenter(this) {
return 0;
}
-RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
+RGWCoroutine *RGWRemoteBucketManager::run_sync_cr(int num)
{
- return new RGWRunBucketSyncCoroutine(&sc, sync_pair, sync_env.sync_tracer->root_node);
+ if ((size_t)num >= sync_pairs.size()) {
+ return nullptr;
+ }
+
+ return new RGWRunBucketSyncCoroutine(&sc, sync_pairs[num], sync_env->sync_tracer->root_node);
}
int RGWBucketPipeSyncStatusManager::init()
{
- conn = store->svc()->zone->get_zone_conn_by_id(source_zone);
- if (!conn) {
- ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
- return -EINVAL;
- }
-
int ret = http_manager.start();
if (ret < 0) {
ldpp_dout(this, 0) << "failed in http_manager.start() ret=" << ret << dendl;
return ret;
}
-#warning read specific bucket sources
- rgw_bucket source_bucket = dest_bucket;
+ error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
- const string key = source_bucket.get_key();
+ sync_module.reset(new RGWDefaultSyncModuleInstance());
+ auto async_rados = store->svc()->rados->get_async_processor();
- rgw_http_param_pair pairs[] = { { "key", key.c_str() },
- { NULL, NULL } };
+ sync_env.init(this, store->ctx(), store,
+ store->svc(), async_rados, &http_manager,
+ error_logger, store->getRados()->get_sync_tracer(),
+ sync_module, nullptr);
- string path = string("/admin/metadata/bucket.instance");
+ map<string, RGWBucketSyncFlowManager::pipe_set> sources;
+ map<rgw_bucket, RGWBucketInfo> sources_info;
+ RGWBucketInfo dest_bucket_info;
- bucket_instance_meta_info result;
- ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
+ ret = cr_mgr.run(new RGWGetBucketSourcePeersCR(&sync_env,
+ dest_bucket,
+ source_zone,
+ &sources,
+ &sources_info,
+ &dest_bucket_info,
+ sync_env.sync_tracer->root_node));
if (ret < 0) {
- ldpp_dout(this, 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
+ ldpp_dout(this, 0) << "failed to get bucket source peers info: (ret=" << ret << "): " << cpp_strerror(-ret) << dendl;
return ret;
}
- RGWBucketInfo& bi = result.data.get_bucket_info();
- num_shards = bi.num_shards;
+ for (auto siter : sources) {
+ if (source_zone && siter.first != *source_zone) {
+ continue;
+ }
- error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
+ auto& szone = siter.first;
- sync_module.reset(new RGWDefaultSyncModuleInstance());
+ conn = store->svc()->zone->get_zone_conn_by_id(szone);
+ if (!conn) {
+ ldpp_dout(this, 0) << "connection object to zone " << szone << " does not exist" << dendl;
+ return -EINVAL;
+ }
- int effective_num_shards = (num_shards ? num_shards : 1);
+ for (auto& pipe : siter.second.pipes) {
+ auto& source_bucket = pipe.source.bucket;
- auto async_rados = store->svc()->rados->get_async_processor();
+ if (!source_bucket) {
+ continue;
+ }
- for (int i = 0; i < effective_num_shards; i++) {
- RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, async_rados, &http_manager);
- ret = l->init(source_zone, conn, source_bucket, (num_shards ? i : -1), dest_bucket, error_logger, store->getRados()->get_sync_tracer(), sync_module);
- if (ret < 0) {
- ldpp_dout(this, 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
- return ret;
+ auto iter = sources_info.find(*source_bucket);
+ if (iter == sources_info.end()) {
+ ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): failed to find bucket info for bucket=" << *source_bucket << ". Likely a bug" << dendl;
+ return -EIO;
+ }
+
+ auto& source_bucket_info = iter->second;
+
+ source_mgrs.push_back(new RGWRemoteBucketManager(this, &sync_env,
+ szone, conn,
+ source_bucket_info,
+ dest_bucket_info.bucket));
}
- source_logs[i] = l;
}
return 0;
{
list<RGWCoroutinesStack *> stacks;
- for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
+ for (auto& mgr : source_mgrs) {
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
- RGWRemoteBucketLog *l = iter->second;
- stack->call(l->init_sync_status_cr());
+
+ for (int i = 0; i < mgr->num_pipes(); ++i) {
+ stack->call(mgr->init_sync_status_cr(i));
+ }
stacks.push_back(stack);
}
{
list<RGWCoroutinesStack *> stacks;
- for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
+ for (auto& mgr : source_mgrs) {
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
- RGWRemoteBucketLog *l = iter->second;
- stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
+ for (int i = 0; i < mgr->num_pipes(); ++i) {
+ stack->call(mgr->read_sync_status_cr(i, &sync_status[i]));
+ }
stacks.push_back(stack);
}
{
list<RGWCoroutinesStack *> stacks;
- for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
+ for (auto& mgr : source_mgrs) {
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
- RGWRemoteBucketLog *l = iter->second;
- stack->call(l->run_sync_cr());
+ for (int i = 0; i < mgr->num_pipes(); ++i) {
+ stack->call(mgr->run_sync_cr(i));
+ }
stacks.push_back(stack);
}
std::ostream& RGWBucketPipeSyncStatusManager::gen_prefix(std::ostream& out) const
{
- auto zone = std::string_view{source_zone};
+ auto zone = std::string_view{source_zone.value_or("*")};
return out << "bucket sync zone:" << zone.substr(0, 8)
- << " bucket:" << dest_bucket << ' ';
+ << " bucket:" << dest_bucket << ' ';
}
string RGWBucketPipeSyncStatusManager::status_oid(const string& source_zone,
RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer,
RGWSyncModuleInstanceRef& _sync_module,
PerfCounters* _counters) {
- dpp = _dpp;
+ dpp = _dpp;
cct = _cct;
store = _store;
svc = _svc;
};
-class RGWRemoteBucketLog : public RGWCoroutinesManager {
+class RGWRemoteBucketManager {
const DoutPrefixProvider *dpp;
- rgw::sal::RGWRadosStore *store;
+
+ RGWDataSyncEnv *sync_env;
+
RGWRESTConn *conn{nullptr};
string source_zone;
- rgw_bucket_sync_pair_info sync_pair;
+ vector<rgw_bucket_sync_pair_info> sync_pairs;
- RGWAsyncRadosProcessor *async_rados;
- RGWHTTPManager *http_manager;
-
- RGWDataSyncEnv sync_env;
RGWDataSyncCtx sc;
rgw_bucket_shard_sync_info init_status;
RGWBucketSyncCR *sync_cr{nullptr};
public:
- RGWRemoteBucketLog(const DoutPrefixProvider *_dpp, rgw::sal::RGWRadosStore *_store,
- RGWAsyncRadosProcessor *_async_rados,
- RGWHTTPManager *_http_manager);
-
- int init(const string& _source_zone, RGWRESTConn *_conn,
- const rgw_bucket& source_bucket, int shard_id,
- const rgw_bucket& dest_bucket,
- RGWSyncErrorLogger *_error_logger,
- RGWSyncTraceManager *_sync_tracer,
- RGWSyncModuleInstanceRef& _sync_module);
- void finish();
+ RGWRemoteBucketManager(const DoutPrefixProvider *_dpp,
+ RGWDataSyncEnv *_sync_env,
+ const string& _source_zone, RGWRESTConn *_conn,
+ const RGWBucketInfo& source_bucket_info,
+ const rgw_bucket& dest_bucket);
- RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status);
- RGWCoroutine *init_sync_status_cr();
- RGWCoroutine *run_sync_cr();
+ void init(const string& _source_zone, RGWRESTConn *_conn,
+ const rgw_bucket& source_bucket, int shard_id,
+ const rgw_bucket& dest_bucket);
+
+ RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status);
+ RGWCoroutine *init_sync_status_cr(int num);
+ RGWCoroutine *run_sync_cr(int num);
+
+ int num_pipes() {
+ return sync_pairs.size();
+ }
void wakeup();
};
class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider {
rgw::sal::RGWRadosStore *store;
+ RGWDataSyncEnv sync_env;
+
RGWCoroutinesManager cr_mgr;
RGWHTTPManager http_manager;
- string source_zone;
+ std::optional<string> source_zone;
+ std::optional<rgw_bucket> source_bucket;
+
RGWRESTConn *conn;
RGWSyncErrorLogger *error_logger;
RGWSyncModuleInstanceRef sync_module;
rgw_bucket dest_bucket;
- map<int, RGWRemoteBucketLog *> source_logs;
+ vector<RGWRemoteBucketManager *> source_mgrs;
string source_status_oid;
string source_shard_status_oid_prefix;
public:
RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store,
- const string& _source_zone,
+ std::optional<string> _source_zone,
+ std::optional<rgw_bucket> _source_bucket,
const rgw_bucket& dest_bucket);
~RGWBucketPipeSyncStatusManager();