So that we can use the correct policy handler when checking source bucket.
Also build list of source zones for data sync out of all potential zones,
and not just the ones that are enabled, because it can be enabled at the
bucket level.
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bucket> opt_bucket, Formatter *formatter)
{
- const RGWRealm& realm = store->svc()->zone->get_realm();
- const RGWZoneGroup& zonegroup = store->svc()->zone->get_zonegroup();
- const RGWZone& zone = store->svc()->zone->get_zone();
+ std::optional<string> zone_id;
- string zone_name = opt_target_zone.value_or(store->svc()->zone->zone_name());
+ if (opt_target_zone) {
+ string zid;
+ if (!store->svc()->zone->find_zone_id_by_name(*opt_target_zone, &zid)) {
+ cerr << "WARNING: cannot find zone id for zone=" << *opt_target_zone << std::endl;
+ return -ENOENT;
+ }
+ zone_id = zid;
+ }
- RGWBucketSyncPolicyHandler zone_policy_handler(RGWBucketSyncPolicyHandler(store->svc()->zone,
- store->svc()->sync_modules,
- opt_target_zone));
+ auto zone_policy_handler = store->svc()->zone->get_sync_policy_handler(zone_id);
- std::unique_ptr<RGWBucketSyncPolicyHandler> bucket_handler;
+ RGWBucketSyncPolicyHandlerRef bucket_handler;
std::optional<rgw_bucket> eff_bucket = opt_bucket;
- auto handler = &zone_policy_handler;
+ auto handler = zone_policy_handler;
if (eff_bucket) {
rgw_bucket bucket;
bucket_handler.reset(handler->alloc_child(*eff_bucket, nullopt));
}
- handler = bucket_handler.get();
+ handler = bucket_handler;
}
RGWBucketSyncFlowManager::pipe_set *sources;
RGWBucketSyncPolicyHandlerRef handler;
- int r = store->ctl()->bucket->get_sync_policy_handler(info.bucket, &handler, null_yield);
+ int r = store->ctl()->bucket->get_sync_policy_handler(std::nullopt, info.bucket, &handler, null_yield);
if (r < 0) {
lderr(store->ctx()) << "ERROR: failed to get policy handler for bucket (" << info.bucket << "): r=" << r << ": " << cpp_strerror(-r) << dendl;
return r;
RGWBucketSyncPolicyHandlerRef handler;
- int r = store->ctl()->bucket->get_sync_policy_handler(info.bucket, &handler, null_yield);
+ int r = store->ctl()->bucket->get_sync_policy_handler(std::nullopt, info.bucket, &handler, null_yield);
if (r < 0) {
lderr(store->ctx()) << "ERROR: failed to get policy handler for bucket (" << info.bucket << "): r=" << r << ": " << cpp_strerror(-r) << dendl;
return r;
return ctl.user->flush_bucket_stats(user_id, *pent);
}
-int RGWBucketCtl::get_sync_policy_handler(const rgw_bucket& bucket,
+int RGWBucketCtl::get_sync_policy_handler(std::optional<string> zone,
+ std::optional<rgw_bucket> bucket,
RGWBucketSyncPolicyHandlerRef *phandler,
optional_yield y)
{
int r = call([&](RGWSI_Bucket_X_Ctx& ctx) {
- return svc.bucket_sync->get_policy_handler(ctx.bi, bucket, phandler, y);
+ return svc.bucket_sync->get_policy_handler(ctx.bi, zone, bucket, phandler, y);
});
if (r < 0) {
ldout(cct, 20) << __func__ << "(): failed to get policy handler for bucket=" << bucket << " (r=" << r << ")" << dendl;
RGWBucketSyncPolicyHandlerRef handler;
- int r = get_sync_policy_handler(bucket, &handler, y);
+ int r = get_sync_policy_handler(std::nullopt, bucket, &handler, y);
if (r < 0) {
return r;
}
RGWBucketSyncPolicyHandlerRef handler;
- int r = get_sync_policy_handler(bucket, &handler, y);
+ int r = get_sync_policy_handler(std::nullopt, bucket, &handler, y);
if (r < 0) {
return r;
}
RGWBucketEnt* pent = nullptr);
/* bucket sync */
- int get_sync_policy_handler(const rgw_bucket& bucket,
+ int get_sync_policy_handler(std::optional<string> zone,
+ std::optional<rgw_bucket> bucket,
RGWBucketSyncPolicyHandlerRef *phandler,
optional_yield y);
int bucket_exports_data(const rgw_bucket& bucket,
void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucket,
RGWBucketSyncFlowManager::pipe_set *source_pipes,
- RGWBucketSyncFlowManager::pipe_set *dest_pipes) const
+ RGWBucketSyncFlowManager::pipe_set *dest_pipes,
+ bool only_enabled) const
{
rgw_sync_bucket_entity entity;
entity.bucket = effective_bucket.value_or(rgw_bucket());
if (parent) {
- parent->reflect(effective_bucket, source_pipes, dest_pipes);
+ parent->reflect(effective_bucket, source_pipes, dest_pipes, only_enabled);
}
for (auto& item : flow_groups) {
auto& flow_group_map = item.second;
/* only return enabled groups */
- if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED) {
+ if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED &&
+ (only_enabled || flow_group_map.status != rgw_sync_policy_group::Status::ALLOWED)) {
continue;
}
{
flow_mgr->init(sync_policy);
- flow_mgr->reflect(bucket, &sources_by_name, &targets_by_name);
+ reflect(&sources_by_name,
+ &targets_by_name,
+ &sources,
+ &targets,
+ &source_zones,
+ &target_zones,
+ true);
+}
+
+void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *psources_by_name,
+ RGWBucketSyncFlowManager::pipe_set *ptargets_by_name,
+ map<string, RGWBucketSyncFlowManager::pipe_set> *psources,
+ map<string, RGWBucketSyncFlowManager::pipe_set> *ptargets,
+ std::set<string> *psource_zones,
+ std::set<string> *ptarget_zones,
+ bool only_enabled) const
+{
+ RGWBucketSyncFlowManager::pipe_set _sources_by_name;
+ RGWBucketSyncFlowManager::pipe_set _targets_by_name;
+ map<string, RGWBucketSyncFlowManager::pipe_set> _sources;
+ map<string, RGWBucketSyncFlowManager::pipe_set> _targets;
+ std::set<string> _source_zones;
+ std::set<string> _target_zones;
- /* convert to zone ids */
+ flow_mgr->reflect(bucket, &_sources_by_name, &_targets_by_name, only_enabled);
- for (auto& pipe : sources_by_name.pipes) {
+ for (auto& pipe : _sources_by_name.pipes) {
if (!pipe.source.zone) {
continue;
}
- source_zones.insert(*pipe.source.zone);
+ _source_zones.insert(*pipe.source.zone);
rgw_sync_bucket_pipe new_pipe = pipe;
string zone_id;
if (zone_svc->find_zone_id_by_name(*pipe.source.zone, &zone_id)) {
new_pipe.source.zone = zone_id;
}
- sources[*new_pipe.source.zone].pipes.insert(new_pipe);
+ _sources[*new_pipe.source.zone].pipes.insert(new_pipe);
}
- for (auto& pipe : targets_by_name.pipes) {
+ for (auto& pipe : _targets_by_name.pipes) {
if (!pipe.dest.zone) {
continue;
}
- target_zones.insert(*pipe.dest.zone);
+ _target_zones.insert(*pipe.dest.zone);
rgw_sync_bucket_pipe new_pipe = pipe;
string zone_id;
if (zone_svc->find_zone_id_by_name(*pipe.dest.zone, &zone_id)) {
new_pipe.dest.zone = zone_id;
}
- targets[*new_pipe.dest.zone].pipes.insert(new_pipe);
+ _targets[*new_pipe.dest.zone].pipes.insert(new_pipe);
+ }
+
+ if (psources_by_name) {
+ *psources_by_name = std::move(_sources_by_name);
+ }
+ if (ptargets_by_name) {
+ *ptargets_by_name = std::move(_targets_by_name);
+ }
+ if (psources) {
+ *psources = std::move(_sources);
+ }
+ if (ptargets) {
+ *ptargets = std::move(_targets);
+ }
+ if (psource_zones) {
+ *psource_zones = std::move(_source_zones);
+ }
+ if (ptarget_zones) {
+ *ptarget_zones = std::move(_target_zones);
}
}
void reflect(std::optional<rgw_bucket> effective_bucket,
pipe_set *flow_by_source,
- pipe_set *flow_by_dest) const;
+ pipe_set *flow_by_dest,
+ bool only_enabled) const;
};
RGWBucketSyncPolicyHandler *alloc_child(const rgw_bucket& bucket,
std::optional<rgw_sync_policy_info> sync_policy) const;
+ void reflect(RGWBucketSyncFlowManager::pipe_set *psources_by_name,
+ RGWBucketSyncFlowManager::pipe_set *ptargets_by_name,
+ map<string, RGWBucketSyncFlowManager::pipe_set> *psources,
+ map<string, RGWBucketSyncFlowManager::pipe_set> *ptargets,
+ std::set<string> *psource_zones,
+ std::set<string> *ptarget_zones,
+ bool only_enabled) const;
+
const std::set<string>& get_source_zones() const {
return source_zones;
}
{
CephContext *cct = store->ctx();
- int r = store->ctl()->bucket->get_sync_policy_handler(params.bucket,
+ int r = store->ctl()->bucket->get_sync_policy_handler(params.zone,
+ params.bucket,
&result->policy_handler,
null_yield);
if (r < 0) {
using RGWBucketLifecycleConfigCR = RGWSimpleWriteOnlyAsyncCR<rgw_bucket_lifecycle_config_params>;
struct rgw_bucket_get_sync_policy_params {
- rgw_bucket bucket;
+ std::optional<string> zone;
+ std::optional<rgw_bucket> bucket;
};
struct rgw_bucket_get_sync_policy_result {
if (!binfo ||
binfo->bucket != *e.bucket) {
bucket_info.bucket = *e.bucket;
+ } else {
+ set_bucket_info(*binfo);
}
- set_bucket_info(*binfo);
}
void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
} else {
sync_pair.source_bs.bucket = siter->source.get_bucket();
}
+ sync_pair.dest_bs.bucket = siter->target.get_bucket();
if (sync_pair.source_bs.shard_id >= 0) {
num_shards = 1;
return set_cr_error(retcode);
}
+ get_policy_params.zone = nullopt;
get_policy_params.bucket = *target_bucket;
yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
sync_env->store,
return set_cr_error(retcode);
}
+ get_policy_params.zone = source_zone;
get_policy_params.bucket = *source_bucket;
yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
sync_env->store,
RGWSI_Bucket_Sync(CephContext *cct) : RGWServiceInstance(cct) {}
virtual int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
- const rgw_bucket& bucket,
+ std::optional<string> zone,
+ std::optional<rgw_bucket> bucket,
RGWBucketSyncPolicyHandlerRef *handler,
optional_yield y) = 0;
};
}
int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
- const rgw_bucket& bucket,
+ std::optional<string> zone,
+ std::optional<rgw_bucket> _bucket,
RGWBucketSyncPolicyHandlerRef *handler,
optional_yield y)
{
- string key = RGWSI_Bucket::get_bi_meta_key(bucket);
- string cache_key("bi/");
- cache_key.append(key);
+ if (!_bucket) {
+ *handler = svc.zone->get_sync_policy_handler(zone);
+ return 0;
+ }
+
+ auto& bucket = *_bucket;
+
+ string zone_key;
+ string bucket_key;
+
+ if (zone && *zone != svc.zone->zone_id()) {
+ zone_key = *zone;
+ }
+
+ bucket_key = RGWSI_Bucket::get_bi_meta_key(bucket);
+
+ string cache_key("bi/" + zone_key + "/" + bucket_key);
if (auto e = sync_policy_cache->find(cache_key)) {
*handler = e->handler;
return 0;
}
-
+ bucket_sync_policy_cache_entry e;
rgw_cache_entry_info cache_info;
RGWBucketInfo bucket_info;
int r = svc.bucket_sobj->read_bucket_instance_info(ctx,
- key,
+ bucket_key,
&bucket_info,
nullptr,
nullptr,
&cache_info);
if (r < 0) {
if (r != -ENOENT) {
- ldout(cct, 0) << "ERROR: svc.bucket->read_bucket_instance_info(key=" << key << ") returned r=" << r << dendl;
+ ldout(cct, 0) << "ERROR: svc.bucket->read_bucket_instance_info(key=" << bucket_key << ") returned r=" << r << dendl;
}
return r;
}
- bucket_sync_policy_cache_entry e;
- e.handler.reset(svc.zone->get_sync_policy_handler()->alloc_child(bucket_info));
+ e.handler.reset(svc.zone->get_sync_policy_handler(zone)->alloc_child(bucket_info));
if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) {
ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl;
int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
- const rgw_bucket& bucket,
+ std::optional<string> zone,
+ std::optional<rgw_bucket> bucket,
RGWBucketSyncPolicyHandlerRef *handler,
optional_yield y) override;
};
RGWSI_Zone::~RGWSI_Zone()
{
- delete sync_policy_handler;
delete realm;
delete zonegroup;
delete zone_public_config;
delete current_period;
}
+std::shared_ptr<RGWBucketSyncPolicyHandler> RGWSI_Zone::get_sync_policy_handler(std::optional<string> zone) const {
+ if (!zone || *zone == zone_id()) {
+ return sync_policy_handler;
+ }
+ auto iter = sync_policy_handlers.find(*zone);
+ if (iter == sync_policy_handlers.end()) {
+ return std::shared_ptr<RGWBucketSyncPolicyHandler>();
+ }
+ return iter->second;
+}
+
bool RGWSI_Zone::zone_syncs_from(const RGWZone& target_zone, const RGWZone& source_zone) const
{
return target_zone.syncs_from(source_zone.name) &&
zone_short_id = current_period->get_map().get_zone_short_id(zone_params->get_id());
- sync_policy_handler = new RGWBucketSyncPolicyHandler(this, sync_modules_svc);
+ for (auto ziter : zonegroup->zones) {
+ sync_policy_handlers[ziter.second.id].reset(new RGWBucketSyncPolicyHandler(this, sync_modules_svc, ziter.second.name));
+ }
+
+ sync_policy_handler = sync_policy_handlers[zone_id()]; /* we made sure earlier that zonegroup->zones has our zone */
set<string> source_zones_by_name;
set<string> target_zones_by_name;
- for (auto& zone_name : sync_policy_handler->get_source_zones()) {
- source_zones_by_name.insert(zone_name);
- }
-
- for (auto& zone_name : sync_policy_handler->get_target_zones()) {
- target_zones_by_name.insert(zone_name);
- }
+ sync_policy_handler->reflect(nullptr, nullptr,
+ nullptr, nullptr,
+ &source_zones_by_name,
+ &target_zones_by_name,
+ false); /* relaxed: also get all zones that we allow to sync to/from */
ret = sync_modules_svc->start();
if (ret < 0) {
uint32_t zone_short_id{0};
bool writeable_zone{false};
- RGWBucketSyncPolicyHandler *sync_policy_handler{nullptr};
+ std::shared_ptr<RGWBucketSyncPolicyHandler> sync_policy_handler;
+ std::map<string, std::shared_ptr<RGWBucketSyncPolicyHandler> > sync_policy_handlers;
RGWRESTConn *rest_master_conn{nullptr};
map<string, RGWRESTConn *> zone_conn_map;
int get_zonegroup(const string& id, RGWZoneGroup& zonegroup) const;
const RGWZone& get_zone() const;
- const RGWBucketSyncPolicyHandler *get_sync_policy_handler() const {
- return sync_policy_handler;
- }
+ std::shared_ptr<RGWBucketSyncPolicyHandler> get_sync_policy_handler(std::optional<string> zone = nullopt) const;
const string& zone_name() const;
const string& zone_id() const;