#include "rgw/rgw_bucket_sync.h"
#include "rgw/rgw_zone.h"
+#include "rgw/rgw_sync_policy.h"
#define dout_subsys ceph_subsys_rgw
return 0;
}
-
-int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
- std::optional<string> zone,
- std::optional<rgw_bucket> _bucket,
- RGWBucketSyncPolicyHandlerRef *handler,
+void RGWSI_Bucket_Sync_SObj::get_hint_entities(RGWSI_Bucket_X_Ctx& ctx,
+ const std::set<string>& zone_names,
+ const std::set<rgw_bucket>& buckets,
+ std::set<rgw_sync_bucket_entity> *hint_entities,
optional_yield y)
+{
+ for (auto& zone : zone_names) {
+ for (auto& b : buckets) {
+ string zid;
+ if (!svc.zone->find_zone_id_by_name(zone, &zid)) {
+ cerr << "WARNING: cannot find zone id for zone=" << zone << ", skippping" << std::endl;
+ continue;
+ }
+
+ RGWBucketInfo hint_bucket_info;
+ int ret = svc.bucket_sobj->read_bucket_info(ctx, b, &hint_bucket_info,
+ nullptr, nullptr, boost::none,
+ y);
+ if (ret < 0) {
+ ldout(cct, 20) << "could not init bucket info for hint bucket=" << b << " ... skipping" << dendl;
+ continue;
+ }
+
+ hint_entities->insert(rgw_sync_bucket_entity(zid, hint_bucket_info.bucket));
+ }
+ }
+}
+
+int RGWSI_Bucket_Sync_SObj::resolve_policy_hints(RGWSI_Bucket_X_Ctx& ctx,
+ rgw_sync_bucket_entity& self_entity,
+ RGWBucketSyncPolicyHandlerRef& handler,
+ RGWBucketSyncPolicyHandlerRef& zone_policy_handler,
+ std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map,
+ optional_yield y)
+{
+ set<string> source_zones;
+ set<string> target_zones;
+
+ zone_policy_handler->reflect(nullptr, nullptr,
+ nullptr, nullptr,
+ &source_zones,
+ &target_zones,
+ false); /* relaxed: also get all zones that we allow to sync to/from */
+
+ std::set<rgw_sync_bucket_entity> hint_entities;
+
+ get_hint_entities(ctx, source_zones, handler->get_source_hints(), &hint_entities, y);
+ get_hint_entities(ctx, target_zones, handler->get_target_hints(), &hint_entities, y);
+
+ std::set<rgw_sync_bucket_pipe> resolved_sources;
+ std::set<rgw_sync_bucket_pipe> resolved_dests;
+
+ for (auto& hint_entity : hint_entities) {
+ if (!hint_entity.zone ||
+ !hint_entity.bucket) {
+ continue; /* shouldn't really happen */
+ }
+
+ auto& zid = *hint_entity.zone;
+ auto& hint_bucket = *hint_entity.bucket;
+
+ RGWBucketSyncPolicyHandlerRef hint_bucket_handler;
+
+ auto iter = temp_map.find(optional_zone_bucket(zid, hint_bucket));
+ if (iter != temp_map.end()) {
+ hint_bucket_handler = iter->second;
+ } else {
+ int r = do_get_policy_handler(ctx, zid, hint_bucket, temp_map, &hint_bucket_handler, y);
+ if (r < 0) {
+ ldout(cct, 20) << "could not get bucket sync policy handler for hint bucket=" << hint_bucket << " ... skipping" << dendl;
+ continue;
+ }
+ }
+
+ hint_bucket_handler->get_pipes(&resolved_dests,
+ &resolved_sources,
+ self_entity); /* flipping resolved dests and sources as these are
+ relative to the remote entity */
+ }
+
+ handler->set_resolved_hints(std::move(resolved_sources), std::move(resolved_dests));
+
+ return 0;
+}
+
+int RGWSI_Bucket_Sync_SObj::do_get_policy_handler(RGWSI_Bucket_X_Ctx& ctx,
+ std::optional<string> zone,
+ std::optional<rgw_bucket> _bucket,
+ std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map,
+ RGWBucketSyncPolicyHandlerRef *handler,
+ optional_yield y)
{
if (!_bucket) {
*handler = svc.zone->get_sync_policy_handler(zone);
RGWBucketInfo bucket_info;
- int r = svc.bucket_sobj->read_bucket_instance_info(ctx,
+ int r = svc.bucket_sobj->read_bucket_instance_info(ctx.bi,
bucket_key,
&bucket_info,
nullptr,
return r;
}
- e.handler.reset(svc.zone->get_sync_policy_handler(zone)->alloc_child(bucket_info));
+ auto zone_policy_handler = svc.zone->get_sync_policy_handler(zone);
+
+ e.handler.reset(zone_policy_handler->alloc_child(bucket_info));
r = e.handler->init(y);
if (r < 0) {
return r;
}
+ temp_map.emplace(optional_zone_bucket{zone, bucket}, e.handler);
+
+ rgw_sync_bucket_entity self_entity(zone.value_or(svc.zone->zone_id()), bucket);
+
+ r = resolve_policy_hints(ctx, self_entity,
+ e.handler,
+ zone_policy_handler,
+ temp_map, y);
+ if (r < 0) {
+ ldout(cct, 20) << "ERROR: failed to resolve policy hints: bucket_key=" << bucket_key << ", r=" << r << dendl;
+ return r;
+ }
+
if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) {
ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl;
}
return 0;
}
+int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_X_Ctx& ctx,
+ std::optional<string> zone,
+ std::optional<rgw_bucket> _bucket,
+ RGWBucketSyncPolicyHandlerRef *handler,
+ optional_yield y)
+{
+ std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef> temp_map;
+ return do_get_policy_handler(ctx, zone, _bucket, temp_map, handler, y);
+}
+
static bool diff_sets(std::set<rgw_bucket>& orig_set,
std::set<rgw_bucket>& new_set,
vector<rgw_bucket> *added,
class RGWSI_Bucket_Sync_SObj_HintIndexManager;
+struct rgw_sync_bucket_entity;
+
class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync
{
struct bucket_sync_policy_cache_entry {
int do_start() override;
+ struct optional_zone_bucket {
+ optional<string> zone;
+ optional<rgw_bucket> bucket;
+
+ optional_zone_bucket(const optional<string>& _zone,
+ const optional<rgw_bucket>& _bucket) : zone(_zone), bucket(_bucket) {}
+
+ bool operator<(const optional_zone_bucket& ozb) const {
+ if (zone < ozb.zone) {
+ return true;
+ }
+ if (zone > ozb.zone) {
+ return false;
+ }
+ return bucket < ozb.bucket;
+ }
+ };
+
+ void get_hint_entities(RGWSI_Bucket_X_Ctx& ctx,
+ const std::set<string>& zone_names,
+ const std::set<rgw_bucket>& buckets,
+ std::set<rgw_sync_bucket_entity> *hint_entities,
+ optional_yield y);
+ int resolve_policy_hints(RGWSI_Bucket_X_Ctx& ctx,
+ rgw_sync_bucket_entity& self_entity,
+ RGWBucketSyncPolicyHandlerRef& handler,
+ RGWBucketSyncPolicyHandlerRef& zone_policy_handler,
+ std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map,
+ optional_yield y);
+ int do_get_policy_handler(RGWSI_Bucket_X_Ctx& ctx,
+ std::optional<string> zone,
+ std::optional<rgw_bucket> _bucket,
+ std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map,
+ RGWBucketSyncPolicyHandlerRef *handler,
+ optional_yield y);
public:
struct Svc {
RGWSI_Zone *zone{nullptr};
RGWSI_Bucket_SObj *_bucket_sobj_svc);
- int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
+ int get_policy_handler(RGWSI_Bucket_X_Ctx& ctx,
std::optional<string> zone,
std::optional<rgw_bucket> bucket,
RGWBucketSyncPolicyHandlerRef *handler,
- optional_yield y) override;
+ optional_yield y);
int handle_bi_update(RGWBucketInfo& bucket_info,
RGWBucketInfo *orig_bucket_info,