Convert zone names to zone ids, adjust interfaces.
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
for (auto& m : sources) {
auto& zone = m.first;
out << indented{width, "source zone"} << zone << std::endl;
- for (auto& s : m.second) {
- out << indented{width, "bucket"} << s << std::endl;
+ for (auto& pipe : m.second.pipes) {
+ out << indented{width, "bucket"} << *pipe.source.bucket << std::endl;
}
}
int RGWBucketSyncPolicyHandler::init()
{
-#warning FIXME
-#if 0
- const auto& zone_id = zone_svc->get_zone().id;
- auto& zg = zone_svc->get_zonegroup();
+ flow_mgr->init(*bucket_info.sync_policy);
- if (!bucket_info.sync_policy) {
- return 0;
- }
+ RGWBucketSyncFlowManager::pipe_set sources_by_name;
+ RGWBucketSyncFlowManager::pipe_set targets_by_name;
+ flow_mgr->reflect(bucket_info.bucket, &sources_by_name, &targets_by_name);
+
+ /* convert to zone ids */
+
+ for (auto& pipe : sources_by_name.pipes) {
+ if (!pipe.source.zone) {
+ continue;
+ }
+ rgw_sync_bucket_pipe new_pipe = pipe;
+ string zone_id;
- auto& sync_policy = *bucket_info.sync_policy;
+ if (zone_svc->find_zone_id_by_name(*pipe.source.zone, &zone_id)) {
+ new_pipe.source.zone = zone_id;
+ }
+ sources[*new_pipe.source.zone].pipes.insert(new_pipe);
+ }
+ for (auto& pipe : targets_by_name.pipes) {
+ if (!pipe.dest.zone) {
+ continue;
+ }
+ rgw_sync_bucket_pipe new_pipe = pipe;
+ string zone_id;
+ if (zone_svc->find_zone_id_by_name(*pipe.dest.zone, &zone_id)) {
+ new_pipe.dest.zone = zone_id;
+ }
+ targets[*new_pipe.dest.zone].pipes.insert(new_pipe);
+ }
return 0;
-#endif
}
bool RGWBucketSyncPolicyHandler::bucket_exports_data() const
RGWBucketInfo bucket_info;
std::unique_ptr<RGWBucketSyncFlowManager> flow_mgr;
- std::set<string> source_zones;
+ map<string, RGWBucketSyncFlowManager::pipe_set> sources; /* source pipes by source zone id */
+ map<string, RGWBucketSyncFlowManager::pipe_set> targets; /* target pipes by target zone id */
-public:
- struct peer_info {
- std::string type;
- rgw_bucket bucket;
- /* need to have config for other type of sources */
-
- bool operator<(const peer_info& si) const {
- if (type == si.type) {
- return (bucket < si.bucket);
- }
- return (type < si.type);
- }
-
- bool is_rgw() const {
- return (type.empty() || type == "rgw");
- }
-
- string get_type() const {
- if (!type.empty()) {
- return type;
- }
- return "rgw";
- }
-
- void dump(Formatter *f) const;
- };
+ bool bucket_is_sync_source() const {
+ return !targets.empty();
+ }
-private:
- std::map<string, std::set<peer_info> > sources; /* peers by zone */
- std::map<string, std::set<peer_info> > targets; /* peers by zone */
+ bool bucket_is_sync_target() const {
+ return !sources.empty();
+ }
public:
RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
RGWBucketInfo& _bucket_info);
int init();
- std::map<string, std::set<peer_info> >& get_sources() {
+ const map<string, RGWBucketSyncFlowManager::pipe_set>& get_sources() {
return sources;
}
return bucket_info;
}
- bool zone_is_source(const string& zone_id) const {
- return sources.find(zone_id) != sources.end();
- }
-
- bool bucket_is_sync_source() const {
- return !targets.empty();
- }
-
- bool bucket_is_sync_target() const {
- return !sources.empty();
- }
-
bool bucket_exports_data() const;
bool bucket_imports_data() const;
};
-inline ostream& operator<<(ostream& out, const RGWBucketSyncPolicyHandler::peer_info& pi) {
- return out << pi.bucket << " (" << pi.get_type() << ")";
-}
RGWDataSyncEnv *sync_env;
rgw_bucket bucket;
- map<string, set<RGWBucketSyncPolicyHandler::peer_info> > *sources;
+ map<string, RGWBucketSyncFlowManager::pipe_set> *sources;
RGWBucketInfo *pbucket_info;
rgw_raw_obj sources_obj;
public:
RGWGetBucketSourcePeersCR(RGWDataSyncEnv *_sync_env,
const rgw_bucket& _bucket,
- map<string, set<RGWBucketSyncPolicyHandler::peer_info> > *_sources,
+ map<string, RGWBucketSyncFlowManager::pipe_set> *_sources,
RGWBucketInfo *_pbucket_info,
const RGWSyncTraceNodeRef& _tn_parent)
: RGWCoroutine(_sync_env->cct),
rgw_raw_obj sources_obj;
- map<string, set<RGWBucketSyncPolicyHandler::peer_info> > sources;
- map<string, set<RGWBucketSyncPolicyHandler::peer_info> >::iterator siter;
- set<RGWBucketSyncPolicyHandler::peer_info>::iterator piter;
+ map<string, RGWBucketSyncFlowManager::pipe_set> sources;
+ map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter;
+ set<rgw_sync_bucket_pipe>::iterator piter;
rgw_bucket_sync_pair_info sync_pair;
}
cur_sc->init(sync_env, conn, siter->first);
}
- for (piter = siter->second.begin(); piter != siter->second.end(); ++piter) {
- if (!piter->is_rgw()) {
- continue;
- }
-
- sync_pair.source_bs.bucket = piter->bucket;
+ for (piter = siter->second.pipes.begin(); piter != siter->second.pipes.end(); ++piter) {
+ sync_pair.source_bs.bucket = *piter->source.bucket;
sync_pair.dest_bs.bucket = bucket_info.bucket;
yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false);
}
}
-void RGWBucketSyncPolicyHandler::peer_info::dump(Formatter *f) const
-{
- encode_json("type", get_type(), f);
- encode_json("bucket", bucket, f);
-}
-
void rgw_obj_key::dump(Formatter *f) const
{
encode_json("name", name, f);