string zone_name = opt_target_zone.value_or(store->svc()->zone->zone_name());
- RGWBucketSyncFlowManager zone_flow(zone_name, nullopt, nullptr);
+ RGWBucketSyncPolicyHandler zone_policy_handler(RGWBucketSyncPolicyHandler(store->svc()->zone,
+ store->svc()->sync_modules,
+ opt_target_zone));
- zone_flow.init(zonegroup.sync_policy);
-
- RGWBucketSyncFlowManager *flow_mgr = &zone_flow;
-
- std::optional<RGWBucketSyncFlowManager> bucket_flow;
+ std::unique_ptr<RGWBucketSyncPolicyHandler> bucket_handler;
std::optional<rgw_bucket> eff_bucket = opt_bucket;
+ auto handler = &zone_policy_handler;
+
if (eff_bucket) {
rgw_bucket bucket;
RGWBucketInfo bucket_info;
return ret;
}
- rgw_sync_policy_info default_policy;
- rgw_sync_policy_info *policy;
-
-
- if (ret == -ENOENT) {
- cerr << "WARNING: bucket not found, simulating result" << std::endl;
- bucket = *eff_bucket;
+ if (ret >= 0) {
+ bucket_handler.reset(handler->alloc_child(bucket_info));
} else {
- eff_bucket = bucket_info.bucket;
+ cerr << "WARNING: bucket not found, simulating result" << std::endl;
+ bucket_handler.reset(handler->alloc_child(*eff_bucket, nullopt));
}
- if (bucket_info.sync_policy) {
- policy = (rgw_sync_policy_info *)bucket_info.sync_policy.get();
- bucket_flow.emplace(zone_name, bucket, &zone_flow);
-
- bucket_flow->init(*policy);
-
- flow_mgr = &(*bucket_flow);
- }
+ handler = bucket_handler.get();
}
- RGWBucketSyncFlowManager::pipe_set sources;
- RGWBucketSyncFlowManager::pipe_set dests;
+ RGWBucketSyncFlowManager::pipe_set *sources;
+ RGWBucketSyncFlowManager::pipe_set *dests;
- flow_mgr->reflect(eff_bucket, &sources, &dests);
+ handler->get_pipes(&sources, &dests);
{
Formatter::ObjectSection os(*formatter, "result");
- encode_json("sources", sources, formatter);
- encode_json("dests", dests, formatter);
+ encode_json("sources", *sources, formatter);
+ encode_json("dests", *dests, formatter);
}
formatter->flush(cout);
}
RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
- RGWBucketInfo& _bucket_info) : zone_svc(_zone_svc),
- bucket_info(_bucket_info) {
- flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->zone_name(),
- bucket_info.bucket,
- zone_svc->get_sync_flow_manager()));
+ RGWSI_SyncModules *sync_modules_svc,
+ std::optional<string> effective_zone) : zone_svc(_zone_svc) {
+ zone_name = effective_zone.value_or(zone_svc->zone_name());
+ flow_mgr.reset(new RGWBucketSyncFlowManager(zone_name,
+ nullopt,
+ nullptr));
+ sync_policy = zone_svc->get_zonegroup().sync_policy;
+
+ if (sync_policy.empty()) {
+ RGWSyncPolicyCompat::convert_old_sync_config(zone_svc, sync_modules_svc, &sync_policy);
+ }
+
+ init();
+}
+
+RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
+ const RGWBucketInfo& _bucket_info) : parent(_parent),
+ bucket_info(_bucket_info) {
+ if (_bucket_info.sync_policy) {
+ sync_policy = *_bucket_info.sync_policy;
+ }
+ bucket = _bucket_info.bucket;
+ zone_svc = parent->zone_svc;
+ flow_mgr.reset(new RGWBucketSyncFlowManager(parent->zone_name,
+ _bucket_info.bucket,
+ parent->flow_mgr.get()));
+ init();
+}
+
+RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
+ const rgw_bucket& _bucket,
+ std::optional<rgw_sync_policy_info> _sync_policy) : parent(_parent) {
+ if (_sync_policy) {
+ sync_policy = *_sync_policy;
+ }
+ bucket = _bucket;
+ zone_svc = parent->zone_svc;
+ flow_mgr.reset(new RGWBucketSyncFlowManager(parent->zone_name,
+ _bucket,
+ parent->flow_mgr.get()));
+ init();
}
-int RGWBucketSyncPolicyHandler::init()
+RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const RGWBucketInfo& bucket_info) const
{
- flow_mgr->init(*bucket_info.sync_policy);
+ return new RGWBucketSyncPolicyHandler(this, bucket_info);
+}
- RGWBucketSyncFlowManager::pipe_set sources_by_name;
- RGWBucketSyncFlowManager::pipe_set targets_by_name;
- flow_mgr->reflect(bucket_info.bucket, &sources_by_name, &targets_by_name);
+RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const rgw_bucket& bucket,
+ std::optional<rgw_sync_policy_info> sync_policy) const
+{
+ return new RGWBucketSyncPolicyHandler(this, bucket, sync_policy);
+}
+
+void RGWBucketSyncPolicyHandler::init()
+{
+ flow_mgr->init(sync_policy);
+
+ flow_mgr->reflect(bucket, &sources_by_name, &targets_by_name);
/* convert to zone ids */
if (!pipe.source.zone) {
continue;
}
+ source_zones.insert(*pipe.source.zone);
rgw_sync_bucket_pipe new_pipe = pipe;
string zone_id;
if (!pipe.dest.zone) {
continue;
}
+ target_zones.insert(*pipe.dest.zone);
rgw_sync_bucket_pipe new_pipe = pipe;
string zone_id;
if (zone_svc->find_zone_id_by_name(*pipe.dest.zone, &zone_id)) {
}
targets[*new_pipe.dest.zone].pipes.insert(new_pipe);
}
-
- return 0;
}
bool RGWBucketSyncPolicyHandler::bucket_exports_data() const
{
+ if (!bucket) {
+ return false;
+ }
+
if (bucket_is_sync_source()) {
return true;
}
return (zone_svc->need_to_log_data() &&
- bucket_info.datasync_flag_enabled());
+ bucket_info->datasync_flag_enabled());
}
bool RGWBucketSyncPolicyHandler::bucket_imports_data() const
class RGWSI_Zone;
class RGWSI_SyncModules;
+
struct rgw_sync_group_pipe_map;
struct rgw_sync_bucket_pipes;
struct rgw_sync_policy_info;
};
class RGWBucketSyncFlowManager {
+ friend class RGWBucketSyncPolicyHandler;
public:
struct pipe_set {
std::set<rgw_sync_bucket_pipe> pipes;
*/
void update_flow_maps(const rgw_sync_bucket_pipes& pipe);
+ void init(const rgw_sync_policy_info& sync_policy);
+
public:
RGWBucketSyncFlowManager(const string& _zone_name,
std::optional<rgw_bucket> _bucket,
const RGWBucketSyncFlowManager *_parent);
- void init(const rgw_sync_policy_info& sync_policy);
void reflect(std::optional<rgw_bucket> effective_bucket,
pipe_set *flow_by_source,
pipe_set *flow_by_dest) const;
};
class RGWBucketSyncPolicyHandler {
+ const RGWBucketSyncPolicyHandler *parent{nullptr};
RGWSI_Zone *zone_svc;
- RGWBucketInfo bucket_info;
+ string zone_name;
+ std::optional<RGWBucketInfo> bucket_info;
+ std::optional<rgw_bucket> bucket;
std::unique_ptr<RGWBucketSyncFlowManager> flow_mgr;
+ rgw_sync_policy_info sync_policy;
+
+ RGWBucketSyncFlowManager::pipe_set sources_by_name;
+ RGWBucketSyncFlowManager::pipe_set targets_by_name;
map<string, RGWBucketSyncFlowManager::pipe_set> sources; /* source pipes by source zone id */
map<string, RGWBucketSyncFlowManager::pipe_set> targets; /* target pipes by target zone id */
+ std::set<string> source_zones; /* source zones by name */
+ std::set<string> target_zones; /* target zones by name */
+
bool bucket_is_sync_source() const {
return !targets.empty();
}
return !sources.empty();
}
+ void init();
+
+ RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
+ const RGWBucketInfo& _bucket_info);
+
+ RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
+ const rgw_bucket& _bucket,
+ std::optional<rgw_sync_policy_info> _sync_policy);
public:
RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
- RGWBucketInfo& _bucket_info);
- int init();
+ RGWSI_SyncModules *sync_modules_svc,
+ std::optional<string> effective_zone = std::nullopt);
+
+ RGWBucketSyncPolicyHandler *alloc_child(const RGWBucketInfo& bucket_info) const;
+ RGWBucketSyncPolicyHandler *alloc_child(const rgw_bucket& bucket,
+ std::optional<rgw_sync_policy_info> sync_policy) const;
+
+ const std::set<string>& get_source_zones() const {
+ return source_zones;
+ }
+
+ const std::set<string>& get_target_zones() const {
+ return target_zones;
+ }
const map<string, RGWBucketSyncFlowManager::pipe_set>& get_sources() {
return sources;
}
- const RGWBucketInfo& get_bucket_info() const {
+ const std::optional<RGWBucketInfo>& get_bucket_info() const {
return bucket_info;
}
+ void get_pipes(RGWBucketSyncFlowManager::pipe_set **sources, RGWBucketSyncFlowManager::pipe_set **targets) { /* return raw pipes (with zone name) */
+ *sources = &sources_by_name;
+ *targets = &targets_by_name;
+ }
+
bool bucket_exports_data() const;
bool bucket_imports_data() const;
};
auto& handler = policy->policy_handler;
*sources = handler->get_sources();
- *pbucket_info = handler->get_bucket_info();
+ auto& binfo = handler->get_bucket_info();
+ if (binfo) {
+ *pbucket_info = *binfo;
+ }
}
return set_cr_done();
}
bucket_sync_policy_cache_entry e;
- e.handler.reset(new RGWBucketSyncPolicyHandler(svc.zone, bucket_info));
-
- r = e.handler->init();
- if (r < 0) {
- ldout(cct, 0) << "ERROR: RGWBucketSyncPolicyHandler::init() returned r=" << r << dendl;
- return r;
- }
+ e.handler.reset(svc.zone->get_sync_policy_handler()->alloc_child(bucket_info));
if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) {
ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl;
RGWSI_Zone::~RGWSI_Zone()
{
- delete sync_flow_mgr;
+ delete sync_policy_handler;
delete realm;
delete zonegroup;
delete zone_public_config;
zone_short_id = current_period->get_map().get_zone_short_id(zone_params->get_id());
- sync_flow_mgr = new RGWBucketSyncFlowManager(zone_params->get_name(),
- nullopt,
- nullptr);
-
- rgw_sync_policy_info sync_policy = zonegroup->sync_policy;
-
- if (sync_policy.empty()) {
- RGWSyncPolicyCompat::convert_old_sync_config(this, sync_modules_svc, &sync_policy);
- }
-
- sync_flow_mgr->init(sync_policy);
-
- RGWBucketSyncFlowManager::pipe_set zone_sources;
- RGWBucketSyncFlowManager::pipe_set zone_targets;
-
- sync_flow_mgr->reflect(nullopt, &zone_sources, &zone_targets);
+ sync_policy_handler = new RGWBucketSyncPolicyHandler(this, sync_modules_svc);
set<string> source_zones_by_name;
set<string> target_zones_by_name;
- for (auto& pipe : zone_sources.pipes) {
- if (pipe.source.zone) {
- source_zones_by_name.insert(*pipe.source.zone);
- }
+ for (auto& zone_name : sync_policy_handler->get_source_zones()) {
+ source_zones_by_name.insert(zone_name);
}
- for (auto& pipe : zone_targets.pipes) {
- if (pipe.dest.zone) {
- target_zones_by_name.insert(*pipe.dest.zone);
- }
+ for (auto& zone_name : sync_policy_handler->get_target_zones()) {
+ target_zones_by_name.insert(zone_name);
}
ret = sync_modules_svc->start();
class RGWPeriod;
class RGWZonePlacementInfo;
-class RGWBucketSyncFlowManager;
+class RGWBucketSyncPolicyHandler;
class RGWRESTConn;
+struct rgw_sync_policy_info;
+
class RGWSI_Zone : public RGWServiceInstance
{
friend struct RGWServices_Def;
uint32_t zone_short_id{0};
bool writeable_zone{false};
- RGWBucketSyncFlowManager *sync_flow_mgr{nullptr};
+ RGWBucketSyncPolicyHandler *sync_policy_handler{nullptr};
RGWRESTConn *rest_master_conn{nullptr};
map<string, RGWRESTConn *> zone_conn_map;
map<string, string> zone_id_by_name;
map<string, RGWZone> zone_by_id;
+ std::unique_ptr<rgw_sync_policy_info> sync_policy;
+
void init(RGWSI_SysObj *_sysobj_svc,
RGWSI_RADOS *_rados_svc,
RGWSI_SyncModules *_sync_modules_svc);
int get_zonegroup(const string& id, RGWZoneGroup& zonegroup) const;
const RGWZone& get_zone() const;
- const RGWBucketSyncFlowManager *get_sync_flow_manager() const {
- return sync_flow_mgr;
+ const RGWBucketSyncPolicyHandler *get_sync_policy_handler() const {
+ return sync_policy_handler;
}
const string& zone_name() const;