void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucket,
RGWBucketSyncFlowManager::pipe_set *source_pipes,
- RGWBucketSyncFlowManager::pipe_set *dest_pipes)
+ RGWBucketSyncFlowManager::pipe_set *dest_pipes) const
{
rgw_sync_bucket_entity entity;
RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(const string& _zone_name,
std::optional<rgw_bucket> _bucket,
- RGWBucketSyncFlowManager *_parent) : zone_name(_zone_name),
- bucket(_bucket),
- parent(_parent) {}
+ const RGWBucketSyncFlowManager *_parent) : zone_name(_zone_name),
+ bucket(_bucket),
+ parent(_parent) {}
-int RGWBucketSyncPolicyHandler::init()
+void RGWSyncPolicyCompat::convert_old_sync_config(RGWSI_Zone *zone_svc,
+ RGWSI_SyncModules *sync_modules_svc,
+ rgw_sync_policy_info *ppolicy)
{
-#warning FIXME
-#if 0
- const auto& zone_id = zone_svc->get_zone().id;
- auto& zg = zone_svc->get_zonegroup();
+ bool found = false;
- if (!bucket_info.sync_policy) {
- return 0;
- }
+ rgw_sync_policy_info policy;
- auto& sync_policy = *bucket_info.sync_policy;
+ auto& group = policy.groups["default"];
+ auto& zonegroup = zone_svc->get_zonegroup();
+
+ for (const auto& ziter1 : zonegroup.zones) {
+ const string& id1 = ziter1.first;
+ const RGWZone& z1 = ziter1.second;
+
+ for (const auto& ziter2 : zonegroup.zones) {
+ const string& id2 = ziter2.first;
+ const RGWZone& z2 = ziter2.second;
- if (sync_policy.dests) {
- for (auto& dest : *sync_policy.dests) {
- if (!(dest.bucket || *dest.bucket == bucket_info.bucket)) {
+ if (id1 == id2) {
continue;
}
- if (dest.zones.find("*") == dest.zones.end() &&
- dest.zones.find(zone_id) == dest.zones.end()) {
- continue;
+ if (z1.syncs_from(z2.name)) {
+ found = true;
+ rgw_sync_directional_rule *rule;
+ group.data_flow.find_directional(z2.name, z1.name, true, &rule);
}
+ }
+ }
- if (dest.flow_rules) {
- /* populate trivial peers */
- for (auto& rule : *dest.flow_rules) {
- set<string> source_zones;
- set<string> dest_zones;
- rule.get_zone_peers(zone_id, &source_zones, &dest_zones);
-
- for (auto& sz : source_zones) {
- peer_info sinfo;
- sinfo.bucket = bucket_info.bucket;
- sources[sz].insert(sinfo);
- }
+ if (!found) { /* nothing syncs */
+ return;
+ }
- for (auto& tz : dest_zones) {
- peer_info tinfo;
- tinfo.bucket = bucket_info.bucket;
- dests[tz].insert(tinfo);
- }
- }
- }
+ rgw_sync_bucket_pipes pipes;
+ pipes.id = "all";
+ pipes.source.all_zones = true;
+ pipes.dest.all_zones = true;
- /* non trivial sources */
- for (auto& source : dest.sources) {
- if (!source.bucket ||
- *source.bucket == bucket_info.bucket) {
- if ((source.type.empty() || source.type == "rgw") &&
- source.zone &&
- source.bucket) {
- peer_info sinfo;
- sinfo.type = source.type;
- sinfo.bucket = *source.bucket;
- sources[*source.zone].insert(sinfo);
- }
- }
- }
- }
+ group.pipes.emplace_back(std::move(pipes));
+
+
+ group.status = rgw_sync_policy_group::Status::ENABLED;
+
+ *ppolicy = std::move(policy);
+}
+
+RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
+ RGWBucketInfo& _bucket_info) : zone_svc(_zone_svc),
+ bucket_info(_bucket_info) {
+ flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->zone_name(),
+ bucket_info.bucket,
+ zone_svc->get_sync_flow_manager()));
+}
+
+int RGWBucketSyncPolicyHandler::init()
+{
+#warning FIXME
+#if 0
+ const auto& zone_id = zone_svc->get_zone().id;
+ auto& zg = zone_svc->get_zonegroup();
+
+ if (!bucket_info.sync_policy) {
+ return 0;
}
-#endif
+
+ auto& sync_policy = *bucket_info.sync_policy;
return 0;
+#endif
}
bool RGWBucketSyncPolicyHandler::bucket_exports_data() const
{
return bucket_is_sync_target();
}
+
#include "rgw_sync_policy.h"
class RGWSI_Zone;
+class RGWSI_SyncModules;
struct rgw_sync_group_pipe_map;
struct rgw_sync_bucket_pipes;
struct rgw_sync_policy_info;
std::optional<rgw_bucket> dest_bucket) const;
};
+class RGWSyncPolicyCompat {
+public:
+ static void convert_old_sync_config(RGWSI_Zone *zone_svc,
+ RGWSI_SyncModules *sync_modules_svc,
+ rgw_sync_policy_info *ppolicy);
+};
+
class RGWBucketSyncFlowManager {
public:
struct pipe_set {
string zone_name;
std::optional<rgw_bucket> bucket;
- RGWBucketSyncFlowManager *parent{nullptr};
+ const RGWBucketSyncFlowManager *parent{nullptr};
map<string, rgw_sync_group_pipe_map> flow_groups;
RGWBucketSyncFlowManager(const string& _zone_name,
std::optional<rgw_bucket> _bucket,
- RGWBucketSyncFlowManager *_parent);
+ const RGWBucketSyncFlowManager *_parent);
void init(const rgw_sync_policy_info& sync_policy);
void reflect(std::optional<rgw_bucket> effective_bucket,
pipe_set *flow_by_source,
- pipe_set *flow_by_dest);
+ pipe_set *flow_by_dest) const;
};
class RGWBucketSyncPolicyHandler {
RGWSI_Zone *zone_svc;
RGWBucketInfo bucket_info;
+ std::unique_ptr<RGWBucketSyncFlowManager> flow_mgr;
std::set<string> source_zones;
public:
RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
- RGWBucketInfo& _bucket_info) : zone_svc(_zone_svc),
- bucket_info(_bucket_info) {}
+ RGWBucketInfo& _bucket_info);
int init();
std::map<string, std::set<peer_info> >& get_sources() {
class RGWPeriod;
class RGWZonePlacementInfo;
+class RGWBucketSyncFlowManager;
+
class RGWRESTConn;
class RGWSI_Zone : public RGWServiceInstance
uint32_t zone_short_id{0};
bool writeable_zone{false};
+ RGWBucketSyncFlowManager *sync_flow_mgr{nullptr};
+
RGWRESTConn *rest_master_conn{nullptr};
map<string, RGWRESTConn *> zone_conn_map;
std::vector<const RGWZone*> data_sync_source_zones;
int get_zonegroup(const string& id, RGWZoneGroup& zonegroup) const;
const RGWZone& get_zone() const;
+ const RGWBucketSyncFlowManager *get_sync_flow_manager() const {
+ return sync_flow_mgr;
+ }
+
const string& zone_name() const;
const string& zone_id() const;
uint32_t get_zone_short_id() const;