class RGWBucketSyncPeersManager {
public:
- static string sync_sources_oid(const rgw_bucket bucket) {
+ static string sync_sources_oid(const rgw_bucket& bucket) {
return bucket_sync_sources_oid_prefix + "." + bucket.get_key();
}
static rgw_raw_obj sync_sources_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) {
return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_sources_oid(bucket));
}
- static string sync_targets_oid(const rgw_bucket bucket) {
- return bucket_sync_targets_oid_prefix + "." + bucket.get_key();
+ static string sync_targets_oid(const string& source_zone, const rgw_bucket& source_bucket) {
+ return bucket_sync_targets_oid_prefix + "." + source_zone + "." + source_bucket.get_key();
}
- static rgw_raw_obj sync_targets_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) {
- return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_targets_oid(bucket));
+ static rgw_raw_obj sync_targets_obj(RGWSI_Zone *zone_svc, const string& source_zone, const rgw_bucket& source_bucket) {
+ return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_targets_oid(source_zone, source_bucket));
}
};
};
WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info)
+
+struct rgw_sync_pipe_info_entity
+{
+private:
+ RGWBucketInfo bucket_info;
+ bool _has_bucket_info{false};
+
+public:
+ string zone;
+
+ rgw_sync_pipe_info_entity() {}
+ rgw_sync_pipe_info_entity(const rgw_sync_bucket_entity& e,
+ std::optional<RGWBucketInfo>& binfo) {
+ if (e.zone) {
+ zone = *e.zone;
+ }
+ if (!e.bucket) {
+ return;
+ }
+ if (!binfo ||
+ binfo->bucket != *e.bucket) {
+ bucket_info.bucket = *e.bucket;
+ }
+ set_bucket_info(*binfo);
+ }
+
+ void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
+ if (_has_bucket_info) {
+ return;
+ }
+ if (bucket_info.bucket.name.empty()) {
+ return;
+ }
+
+ auto iter = buckets_info.find(bucket_info.bucket);
+ if (iter == buckets_info.end()) {
+ return;
+ }
+
+ set_bucket_info(iter->second);
+ }
+
+ bool has_bucket_info() const {
+ return _has_bucket_info;
+ }
+
+ void set_bucket_info(const RGWBucketInfo& _bucket_info) {
+ bucket_info = _bucket_info;
+ _has_bucket_info = true;
+ }
+
+ const RGWBucketInfo& get_bucket_info() const {
+ return bucket_info;
+ }
+
+ const rgw_bucket& get_bucket() const {
+ return bucket_info.bucket;
+ }
+
+ bool operator<(const rgw_sync_pipe_info_entity& e) const {
+ if (zone < e.zone) {
+ return false;
+ }
+ if (zone > e.zone) {
+ return true;
+ }
+ return (bucket_info.bucket < e.bucket_info.bucket);
+ }
+};
+
+struct rgw_sync_pipe_info {
+ rgw_sync_pipe_info_entity source;
+ rgw_sync_pipe_info_entity target;
+
+ rgw_sync_pipe_info() {}
+ rgw_sync_pipe_info(const rgw_sync_bucket_pipe& pipe,
+ std::optional<RGWBucketInfo> source_bucket_info,
+ std::optional<RGWBucketInfo> target_bucket_info) : source(pipe.source, source_bucket_info),
+ target(pipe.dest, target_bucket_info) {}
+
+ bool operator<(const rgw_sync_pipe_info& p) const {
+ if (source < p.source) {
+ return true;
+ }
+ if (p.source < source) {
+ return false;
+ }
+ return (target < p.target);
+ }
+
+ void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
+ source.update_empty_bucket_info(buckets_info);
+ target.update_empty_bucket_info(buckets_info);
+ }
+};
+
+struct rgw_sync_pipe_info_set {
+ std::set<rgw_sync_pipe_info> pipes;
+
+ using iterator = std::set<rgw_sync_pipe_info>::iterator;
+
+ void clear() {
+ pipes.clear();
+ }
+
+ void insert(const rgw_sync_bucket_pipe& pipe,
+ std::optional<RGWBucketInfo>& source_bucket_info,
+ std::optional<RGWBucketInfo>& target_bucket_info) {
+ rgw_sync_pipe_info p(pipe, source_bucket_info, target_bucket_info);
+ pipes.insert(p);
+ }
+
+ iterator begin() {
+ return pipes.begin();
+ }
+
+ iterator end() {
+ return pipes.end();
+ }
+
+ void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
+ if (buckets_info.empty()) {
+ return;
+ }
+
+ std::set<rgw_sync_pipe_info> p;
+
+ for (auto pipe : pipes) {
+ pipe.update_empty_bucket_info(buckets_info);
+ p.insert(pipe);
+ }
+
+ pipes = std::move(p);
+ }
+};
+
class RGWGetBucketPeersCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
std::optional<string> source_zone;
std::optional<rgw_bucket> source_bucket;
- RGWBucketSyncFlowManager::pipe_set *sources;
- map<rgw_bucket, RGWBucketInfo> *sources_info;
+ rgw_sync_pipe_info_set *pipes;
+ map<rgw_bucket, RGWBucketInfo> buckets_info;
map<rgw_bucket, RGWBucketInfo>::iterator siiter;
- RGWBucketInfo *pbucket_info;
+ std::optional<RGWBucketInfo> target_bucket_info;
+ std::optional<RGWBucketInfo> source_bucket_info;
- RGWBucketSyncFlowManager::pipe_set::iterator siter;
+ rgw_sync_pipe_info_set::iterator siter;
rgw_bucket_sync_sources_local_info sources_local_info;
rgw_bucket_sync_sources_local_info expected_local_info;
rgw_bucket_get_sync_policy_params get_policy_params;
- std::shared_ptr<rgw_bucket_get_sync_policy_result> policy;
+ std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
+ std::shared_ptr<rgw_bucket_get_sync_policy_result> target_policy;
RGWSyncTraceNodeRef tn;
return { b, std::next(b) };
}
- static RGWBucketSyncFlowManager::pipe_set filter_sources(std::optional<string> source_zone,
- std::optional<rgw_bucket> source_bucket,
- const map<string, RGWBucketSyncFlowManager::pipe_set>& all_sources) {
- RGWBucketSyncFlowManager::pipe_set result;
-
+ void filter_sources(std::optional<string> source_zone,
+ std::optional<rgw_bucket> source_bucket,
+ const map<string, RGWBucketSyncFlowManager::pipe_set>& all_sources,
+ rgw_sync_pipe_info_set *result) {
auto iters = get_pipe_iters(all_sources, source_zone);
for (auto i = iters.first; i != iters.second; ++i) {
- for (auto& peer : i->second.pipes) {
+ for (auto& pipe : i->second.pipes) {
+ if (!pipe.specific()) {
+ continue;
+ }
if (source_bucket &&
- peer.source.bucket &&
- *source_bucket != *peer.source.bucket) {
+ *source_bucket != *pipe.source.bucket) {
continue;
}
- result.insert(peer);
+ result->insert(pipe, source_bucket_info, target_bucket_info);
}
}
- return result;
}
- static RGWBucketSyncFlowManager::pipe_set filter_targets(std::optional<string> target_zone,
- std::optional<rgw_bucket> target_bucket,
- const map<string, RGWBucketSyncFlowManager::pipe_set>& all_targets) {
- RGWBucketSyncFlowManager::pipe_set result;
-
+ void filter_targets(std::optional<string> target_zone,
+ std::optional<rgw_bucket> target_bucket,
+ const map<string, RGWBucketSyncFlowManager::pipe_set>& all_targets,
+ rgw_sync_pipe_info_set *result) {
auto iters = get_pipe_iters(all_targets, target_zone);
for (auto i = iters.first; i != iters.second; ++i) {
- for (auto& peer : i->second.pipes) {
+ for (auto& pipe : i->second.pipes) {
if (target_bucket &&
- peer.dest.bucket &&
- *target_bucket != *peer.dest.bucket) {
+ pipe.dest.bucket &&
+ *target_bucket != *pipe.dest.bucket) {
continue;
}
- result.insert(peer);
+ result->insert(pipe, source_bucket_info, target_bucket_info);
}
}
- return result;
}
+ void update_from_target_bucket_policy();
+ void update_from_source_bucket_policy();
+
public:
RGWGetBucketPeersCR(RGWDataSyncEnv *_sync_env,
std::optional<rgw_bucket> _target_bucket,
std::optional<string> _source_zone,
std::optional<rgw_bucket> _source_bucket,
- RGWBucketSyncFlowManager::pipe_set *_sources,
- map<rgw_bucket, RGWBucketInfo> *_sources_info,
- RGWBucketInfo *_pbucket_info,
+ rgw_sync_pipe_info_set *_pipes,
const RGWSyncTraceNodeRef& _tn_parent)
: RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
target_bucket(_target_bucket),
source_zone(_source_zone),
- sources(_sources),
- sources_info(_sources_info),
- pbucket_info(_pbucket_info),
- policy(make_shared<rgw_bucket_get_sync_policy_result>()),
+ pipes(_pipes),
tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_peers",
SSTR( "target=" << target_bucket.value_or(rgw_bucket())
<< ":source=" << target_bucket.value_or(rgw_bucket())
- << ":source_zone=" << source_zone.value_or("*")))) {}
+ << ":source_zone=" << source_zone.value_or("*")))) {
+ if (target_bucket) {
+ target_policy = make_shared<rgw_bucket_get_sync_policy_result>();
+ }
+ if (source_bucket) {
+ source_policy = make_shared<rgw_bucket_get_sync_policy_result>();
+ }
+ }
int operate() override;
};
class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- rgw_bucket bucket;
- RGWBucketInfo bucket_info;
+ std::optional<rgw_bucket_shard> target_bs;
std::optional<std::string> source_zone;
+ std::optional<rgw_bucket_shard> source_bs;
+
+ std::optional<rgw_bucket> target_bucket;
+ std::optional<rgw_bucket> source_bucket;
rgw_raw_obj sources_obj;
- RGWBucketSyncFlowManager::pipe_set sources;
- map<rgw_bucket, RGWBucketInfo> sources_info;
- RGWBucketSyncFlowManager::pipe_set::iterator siter;
+ rgw_sync_pipe_info_set pipes;
+ rgw_sync_pipe_info_set::iterator siter;
rgw_bucket_sync_pair_info sync_pair;
public:
RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env,
- const rgw_bucket& _bucket,
- optional<string> _source_zone,
+ std::optional<rgw_bucket_shard> _target_bs,
+ std::optional<string> _source_zone,
+ std::optional<rgw_bucket_shard> _source_bs,
const RGWSyncTraceNodeRef& _tn_parent)
: RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
- bucket(_bucket),
+ target_bs(_target_bs),
source_zone(_source_zone),
- sources_obj(RGWBucketSyncPeersManager::sync_sources_obj(_sync_env->svc->zone, bucket)),
+ source_bs(_source_bs),
tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
- SSTR(bucket))) {
+ SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << source_zone.value_or("*")))) {
+ if (target_bs) {
+ target_bucket = target_bs->bucket;
+ sources_obj = RGWBucketSyncPeersManager::sync_sources_obj(_sync_env->svc->zone, *target_bucket);
+ }
+ if (source_bs) {
+ source_bucket = source_bs->bucket;
+ }
}
~RGWRunBucketSourcesSyncCR() override {
if (lease_cr) {
}
tn->log(10, "took lease");
- yield call(new RGWGetBucketPeersCR(sync_env, bucket, source_zone, std::nullopt, &sources, &sources_info, &bucket_info, tn));
+ yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, source_zone, source_bucket, &pipes, tn));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, "ERROR: failed to read sync status for bucket");
lease_cr->go_down();
return set_cr_error(retcode);
}
- for (siter = sources.begin(); siter != sources.end(); ++siter) {
+ for (siter = pipes.begin(); siter != pipes.end(); ++siter) {
scs.emplace_back();
cur_sc = &scs.back();
- if (!siter->source.zone ||
- !siter->source.bucket) {
- continue;
- }
{
- auto& szone = *siter->source.zone;
+ auto& szone = siter->source.zone;
if (last_zone != szone) {
conn = sync_env->svc->zone->get_zone_conn_by_id(szone);
if (!conn) {
cur_sc->init(sync_env, conn, szone);
}
- sync_pair.source_bs.bucket = *siter->source.bucket;
- sync_pair.dest_bs.bucket = bucket_info.bucket;
+ sync_pair.source_bs.bucket = siter->source.get_bucket();
+ sync_pair.dest_bs.bucket = siter->target.get_bucket();
+#warning TODO iterate over shards
yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false);
while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
return 0;
}
+void RGWGetBucketPeersCR::update_from_target_bucket_policy()
+{
+ auto handler = target_policy->policy_handler.get();
+
+ if (!pipes) {
+ return;
+ }
+
+ filter_sources(source_zone,
+ source_bucket,
+ handler->get_sources(),
+ pipes);
+
+ for (siter = pipes->begin(); siter != pipes->end(); ++siter) {
+ if (!siter->source.has_bucket_info()) {
+ buckets_info.emplace(siter->source.get_bucket(), RGWBucketInfo());
+ }
+ if (!siter->target.has_bucket_info()) {
+ buckets_info.emplace(siter->target.get_bucket(), RGWBucketInfo());
+ }
+ }
+}
+
+void RGWGetBucketPeersCR::update_from_source_bucket_policy()
+{
+ auto handler = source_policy->policy_handler.get();
+
+ if (!pipes) {
+ return;
+ }
+
+ filter_targets(sync_env->svc->zone->get_zone().id,
+ target_bucket,
+ handler->get_targets(),
+ pipes);
+
+ for (siter = pipes->begin(); siter != pipes->end(); ++siter) {
+ if (!siter->source.has_bucket_info()) {
+ buckets_info.emplace(siter->source.get_bucket(), RGWBucketInfo());
+ }
+ if (!siter->target.has_bucket_info()) {
+ buckets_info.emplace(siter->target.get_bucket(), RGWBucketInfo());
+ }
+ }
+}
+
int RGWGetBucketPeersCR::operate()
{
reenter(this) {
+ if (pipes) {
+ pipes->clear();
+ }
if (target_bucket) {
yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
sync_env->svc->sysobj,
retcode != -ENOENT) {
return set_cr_error(retcode);
}
- }
- get_policy_params.bucket = *target_bucket;
- yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
- sync_env->store,
- get_policy_params,
- policy));
- if (retcode < 0 &&
- retcode != -ENOENT) {
- return set_cr_error(retcode);
+ get_policy_params.bucket = *target_bucket;
+ yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
+ sync_env->store,
+ get_policy_params,
+ target_policy));
+ if (retcode < 0 &&
+ retcode != -ENOENT) {
+ return set_cr_error(retcode);
+ }
}
-#warning update local copy of sources and act on changes
- {
- auto& handler = policy->policy_handler;
-
- if (sources) {
- *sources = filter_sources(source_zone,
- source_bucket,
- handler->get_sources());
+ if (source_bucket && source_zone) {
+ yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
+ sync_env->svc->sysobj,
+ RGWBucketSyncPeersManager::sync_targets_obj(sync_env->svc->zone, *source_zone, *source_bucket),
+ &sources_local_info));
+ if (retcode < 0 &&
+ retcode != -ENOENT) {
+ return set_cr_error(retcode);
}
- auto& binfo = handler->get_bucket_info();
- if (binfo) {
- *pbucket_info = *binfo;
- if (sources_info) {
- (*sources_info)[binfo->bucket] = *binfo;
- }
+ get_policy_params.bucket = *source_bucket;
+ yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
+ sync_env->store,
+ get_policy_params,
+ source_policy));
+ if (retcode < 0 &&
+ retcode != -ENOENT) {
+ return set_cr_error(retcode);
}
+
+ source_bucket_info = source_policy->policy_handler->get_bucket_info();
}
+#warning update local copy of sources and act on changes
- if (sources && sources_info) {
- for (siter = sources->begin(); siter != sources->end(); ++siter) {
- source_bucket = siter->source.bucket;
- if (!source_bucket) {
- continue;
- }
- if (sources_info->find(*source_bucket) != sources_info->end()) {
- continue;
- }
+ update_from_target_bucket_policy();
+ update_from_source_bucket_policy();
- sources_info->emplace(*source_bucket, RGWBucketInfo()); /* reserve space for it, will fetch it later when map cannot change */
+ for (siiter = buckets_info.begin(); siiter != buckets_info.end(); ++siiter) {
+ if (siiter->second.bucket.name.empty()) {
+ yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first, &siiter->second, tn));
}
+ }
- for (siiter = sources_info->begin(); siiter != sources_info->end(); ++siiter) {
- if (siiter->second.bucket.name.empty()) {
- yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first, &siiter->second, tn));
- }
- }
+ if (pipes) {
+ pipes->update_empty_bucket_info(buckets_info);
}
return set_cr_done();
error_logger, store->getRados()->get_sync_tracer(),
sync_module, nullptr);
- RGWBucketSyncFlowManager::pipe_set sources;
- map<rgw_bucket, RGWBucketInfo> sources_info;
- RGWBucketInfo dest_bucket_info;
+ rgw_sync_pipe_info_set pipes;
ret = cr_mgr.run(new RGWGetBucketPeersCR(&sync_env,
dest_bucket,
source_zone,
std::nullopt,
- &sources,
- &sources_info,
- &dest_bucket_info,
+ &pipes,
sync_env.sync_tracer->root_node));
if (ret < 0) {
ldpp_dout(this, 0) << "failed to get bucket source peers info: (ret=" << ret << "): " << cpp_strerror(-ret) << dendl;
string last_zone;
- for (auto& peer : sources) {
- if (!peer.source.zone) {
- continue;
- }
-
- auto szone = *peer.source.zone;
+ for (auto& pipe : pipes) {
+ auto& szone = pipe.source.zone;
if (last_zone != szone) {
conn = store->svc()->zone->get_zone_conn_by_id(szone);
last_zone = szone;
}
- auto& source_bucket = peer.source.bucket;
-
- if (!source_bucket) {
- continue;
- }
-
- auto iter = sources_info.find(*source_bucket);
- if (iter == sources_info.end()) {
- ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): failed to find bucket info for bucket=" << *source_bucket << ". Likely a bug" << dendl;
- return -EIO;
- }
-
- auto& source_bucket_info = iter->second;
-
source_mgrs.push_back(new RGWRemoteBucketManager(this, &sync_env,
szone, conn,
- source_bucket_info,
- dest_bucket_info.bucket));
+ pipe.source.get_bucket_info(),
+ pipe.target.get_bucket()));
}
return 0;