]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: svc.zone: keep policy handler for all zones
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 1 Nov 2019 02:19:58 +0000 (19:19 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:38 +0000 (10:20 -0800)
So that we can use the correct policy handler when checking source bucket.
Also build list of source zones for data sync out of all potential zones,
and not just the ones that are enabled, because it can be enabled at the
bucket level.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
13 files changed:
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_bucket_sync.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_cr_tools.cc
src/rgw/rgw_cr_tools.h
src/rgw/rgw_data_sync.cc
src/rgw/services/svc_bucket_sync.h
src/rgw/services/svc_bucket_sync_sobj.cc
src/rgw/services/svc_bucket_sync_sobj.h
src/rgw/services/svc_zone.cc
src/rgw/services/svc_zone.h

index 7771b0a85a8b8201f809aaf413a21d5953fefeb4..1c5d5b574b8df5227155cb5ff7a2957581185d3c 100644 (file)
@@ -2333,21 +2333,24 @@ void encode_json(const char *name, const RGWBucketSyncFlowManager::pipe_set& pse
 
 static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bucket> opt_bucket, Formatter *formatter)
 {
-  const RGWRealm& realm = store->svc()->zone->get_realm();
-  const RGWZoneGroup& zonegroup = store->svc()->zone->get_zonegroup();
-  const RGWZone& zone = store->svc()->zone->get_zone();
+  std::optional<string> zone_id;
 
-  string zone_name = opt_target_zone.value_or(store->svc()->zone->zone_name());
+  if (opt_target_zone) {
+    string zid;
+    if (!store->svc()->zone->find_zone_id_by_name(*opt_target_zone, &zid)) {
+      cerr << "WARNING: cannot find zone id for zone=" << *opt_target_zone << std::endl;
+      return -ENOENT;
+    }
+    zone_id = zid;
+  }
 
-  RGWBucketSyncPolicyHandler zone_policy_handler(RGWBucketSyncPolicyHandler(store->svc()->zone,
-                                                                            store->svc()->sync_modules,
-                                                                            opt_target_zone));
+  auto zone_policy_handler = store->svc()->zone->get_sync_policy_handler(zone_id);
 
-  std::unique_ptr<RGWBucketSyncPolicyHandler> bucket_handler;
+  RGWBucketSyncPolicyHandlerRef bucket_handler;
 
   std::optional<rgw_bucket> eff_bucket = opt_bucket;
 
-  auto handler = &zone_policy_handler;
+  auto handler = zone_policy_handler;
 
   if (eff_bucket) {
     rgw_bucket bucket;
@@ -2366,7 +2369,7 @@ static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bu
       bucket_handler.reset(handler->alloc_child(*eff_bucket, nullopt));
     }
 
-    handler = bucket_handler.get();
+    handler = bucket_handler;
   }
 
   RGWBucketSyncFlowManager::pipe_set *sources;
@@ -2405,7 +2408,7 @@ static int bucket_sync_info(rgw::sal::RGWRadosStore *store, const RGWBucketInfo&
 
   RGWBucketSyncPolicyHandlerRef handler;
 
-  int r = store->ctl()->bucket->get_sync_policy_handler(info.bucket, &handler, null_yield);
+  int r = store->ctl()->bucket->get_sync_policy_handler(std::nullopt, info.bucket, &handler, null_yield);
   if (r < 0) {
     lderr(store->ctx()) << "ERROR: failed to get policy handler for bucket (" << info.bucket << "): r=" << r << ": " << cpp_strerror(-r) << dendl;
     return r;
@@ -2445,7 +2448,7 @@ static int bucket_sync_status(rgw::sal::RGWRadosStore *store, const RGWBucketInf
 
   RGWBucketSyncPolicyHandlerRef handler;
 
-  int r = store->ctl()->bucket->get_sync_policy_handler(info.bucket, &handler, null_yield);
+  int r = store->ctl()->bucket->get_sync_policy_handler(std::nullopt, info.bucket, &handler, null_yield);
   if (r < 0) {
     lderr(store->ctx()) << "ERROR: failed to get policy handler for bucket (" << info.bucket << "): r=" << r << ": " << cpp_strerror(-r) << dendl;
     return r;
index cbbf015ee1dad20bdcaaf4324703a3a6331df509..ff2829b60696f9fa54e12d990bce07089c2c83e5 100644 (file)
@@ -3662,12 +3662,13 @@ int RGWBucketCtl::sync_user_stats(const rgw_user& user_id,
   return ctl.user->flush_bucket_stats(user_id, *pent);
 }
 
-int RGWBucketCtl::get_sync_policy_handler(const rgw_bucket& bucket,
+int RGWBucketCtl::get_sync_policy_handler(std::optional<string> zone,
+                                          std::optional<rgw_bucket> bucket,
                                           RGWBucketSyncPolicyHandlerRef *phandler,
                                           optional_yield y)
 {
   int r = call([&](RGWSI_Bucket_X_Ctx& ctx) {
-    return svc.bucket_sync->get_policy_handler(ctx.bi, bucket, phandler, y);
+    return svc.bucket_sync->get_policy_handler(ctx.bi, zone, bucket, phandler, y);
   });
   if (r < 0) {
     ldout(cct, 20) << __func__ << "(): failed to get policy handler for bucket=" << bucket << " (r=" << r << ")" << dendl;
@@ -3682,7 +3683,7 @@ int RGWBucketCtl::bucket_exports_data(const rgw_bucket& bucket,
 
   RGWBucketSyncPolicyHandlerRef handler;
 
-  int r = get_sync_policy_handler(bucket, &handler, y);
+  int r = get_sync_policy_handler(std::nullopt, bucket, &handler, y);
   if (r < 0) {
     return r;
   }
@@ -3696,7 +3697,7 @@ int RGWBucketCtl::bucket_imports_data(const rgw_bucket& bucket,
 
   RGWBucketSyncPolicyHandlerRef handler;
 
-  int r = get_sync_policy_handler(bucket, &handler, y);
+  int r = get_sync_policy_handler(std::nullopt, bucket, &handler, y);
   if (r < 0) {
     return r;
   }
index 6c22a9b413b1efc565d2b5027368090f500abcd7..520a1ad80ce8d5ae0e53c548f8cad020f91862c7 100644 (file)
@@ -870,7 +870,8 @@ public:
                       RGWBucketEnt* pent = nullptr);
 
   /* bucket sync */
-  int get_sync_policy_handler(const rgw_bucket& bucket,
+  int get_sync_policy_handler(std::optional<string> zone,
+                              std::optional<rgw_bucket> bucket,
                              RGWBucketSyncPolicyHandlerRef *phandler,
                              optional_yield y);
   int bucket_exports_data(const rgw_bucket& bucket,
index dfc764d2c0efb3cf1e1eeb47a227a2453bc96227..184a31becff06399505a860b928e86cdf9f89e78 100644 (file)
@@ -343,7 +343,8 @@ void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
 
 void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucket,
                                        RGWBucketSyncFlowManager::pipe_set *source_pipes,
-                                       RGWBucketSyncFlowManager::pipe_set *dest_pipes) const
+                                       RGWBucketSyncFlowManager::pipe_set *dest_pipes,
+                                       bool only_enabled) const
 
 {
   rgw_sync_bucket_entity entity;
@@ -351,14 +352,15 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke
   entity.bucket = effective_bucket.value_or(rgw_bucket());
 
   if (parent) {
-    parent->reflect(effective_bucket, source_pipes, dest_pipes);
+    parent->reflect(effective_bucket, source_pipes, dest_pipes, only_enabled);
   }
 
   for (auto& item : flow_groups) {
     auto& flow_group_map = item.second;
 
     /* only return enabled groups */
-    if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED) {
+    if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED &&
+        (only_enabled || flow_group_map.status != rgw_sync_policy_group::Status::ALLOWED)) {
       continue;
     }
 
@@ -504,34 +506,75 @@ void RGWBucketSyncPolicyHandler::init()
 {
   flow_mgr->init(sync_policy);
 
-  flow_mgr->reflect(bucket, &sources_by_name, &targets_by_name);
+  reflect(&sources_by_name,
+          &targets_by_name,
+          &sources,
+          &targets,
+          &source_zones,
+          &target_zones,
+          true);
+}
+
+void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *psources_by_name,
+                                         RGWBucketSyncFlowManager::pipe_set *ptargets_by_name,
+                                         map<string, RGWBucketSyncFlowManager::pipe_set> *psources,
+                                         map<string, RGWBucketSyncFlowManager::pipe_set> *ptargets,
+                                         std::set<string> *psource_zones,
+                                         std::set<string> *ptarget_zones,
+                                         bool only_enabled) const
+{
+  RGWBucketSyncFlowManager::pipe_set _sources_by_name;
+  RGWBucketSyncFlowManager::pipe_set _targets_by_name;
+  map<string, RGWBucketSyncFlowManager::pipe_set> _sources;
+  map<string, RGWBucketSyncFlowManager::pipe_set> _targets;
+  std::set<string> _source_zones;
+  std::set<string> _target_zones;
 
-  /* convert to zone ids */
+  flow_mgr->reflect(bucket, &_sources_by_name, &_targets_by_name, only_enabled);
 
-  for (auto& pipe : sources_by_name.pipes) {
+  for (auto& pipe : _sources_by_name.pipes) {
     if (!pipe.source.zone) {
       continue;
     }
-    source_zones.insert(*pipe.source.zone);
+    _source_zones.insert(*pipe.source.zone);
     rgw_sync_bucket_pipe new_pipe = pipe;
     string zone_id;
 
     if (zone_svc->find_zone_id_by_name(*pipe.source.zone, &zone_id)) {
       new_pipe.source.zone = zone_id;
     }
-    sources[*new_pipe.source.zone].pipes.insert(new_pipe);
+    _sources[*new_pipe.source.zone].pipes.insert(new_pipe);
   }
-  for (auto& pipe : targets_by_name.pipes) {
+  for (auto& pipe : _targets_by_name.pipes) {
     if (!pipe.dest.zone) {
       continue;
     }
-    target_zones.insert(*pipe.dest.zone);
+    _target_zones.insert(*pipe.dest.zone);
     rgw_sync_bucket_pipe new_pipe = pipe;
     string zone_id;
     if (zone_svc->find_zone_id_by_name(*pipe.dest.zone, &zone_id)) {
       new_pipe.dest.zone = zone_id;
     }
-    targets[*new_pipe.dest.zone].pipes.insert(new_pipe);
+    _targets[*new_pipe.dest.zone].pipes.insert(new_pipe);
+  }
+
+  if (psources_by_name) {
+    *psources_by_name = std::move(_sources_by_name);
+  }
+  if (ptargets_by_name) {
+    *ptargets_by_name = std::move(_targets_by_name);
+  }
+  if (psources) {
+    *psources = std::move(_sources);
+  }
+  if (ptargets) {
+    *ptargets = std::move(_targets);
+  }
+  if (psource_zones) {
+    *psource_zones = std::move(_source_zones);
+  }
+  if (ptarget_zones) {
+    *ptarget_zones = std::move(_target_zones);
   }
 }
 
index 17cd4289e98e8c54e0f192c826bfea31bc71bc1e..9a0c447ace1c80358d293af1aea3d298176380a9 100644 (file)
@@ -166,7 +166,8 @@ public:
 
   void reflect(std::optional<rgw_bucket> effective_bucket,
                pipe_set *flow_by_source,
-               pipe_set *flow_by_dest) const;
+               pipe_set *flow_by_dest,  
+               bool only_enabled) const;
 
 };
 
@@ -213,6 +214,14 @@ public:
   RGWBucketSyncPolicyHandler *alloc_child(const rgw_bucket& bucket,
                                           std::optional<rgw_sync_policy_info> sync_policy) const;
 
+  void reflect(RGWBucketSyncFlowManager::pipe_set *psources_by_name,
+               RGWBucketSyncFlowManager::pipe_set *ptargets_by_name,
+               map<string, RGWBucketSyncFlowManager::pipe_set> *psources,
+               map<string, RGWBucketSyncFlowManager::pipe_set> *ptargets,
+               std::set<string> *psource_zones,
+               std::set<string> *ptarget_zones,
+               bool only_enabled) const;
+
   const std::set<string>& get_source_zones() const {
     return source_zones;
   }
index fe9e355bc3949ae47937c254e169b415ff8ea472..9ea45c37127872a44ecf6283ba772997f2d3a724 100644 (file)
@@ -279,7 +279,8 @@ int RGWBucketGetSyncPolicyHandlerCR::Request::_send_request()
 {
   CephContext *cct = store->ctx();
 
-  int r = store->ctl()->bucket->get_sync_policy_handler(params.bucket,
+  int r = store->ctl()->bucket->get_sync_policy_handler(params.zone,
+                                                        params.bucket,
                                                         &result->policy_handler,
                                                         null_yield);
   if (r < 0) {
index 33cc27b7a061597d59666bf1daaddf1b01a8f73a..ccbd4a4d0ffd26a82511ee8b279a084e6764464d 100644 (file)
@@ -76,7 +76,8 @@ struct rgw_bucket_lifecycle_config_params {
 using RGWBucketLifecycleConfigCR = RGWSimpleWriteOnlyAsyncCR<rgw_bucket_lifecycle_config_params>;
 
 struct rgw_bucket_get_sync_policy_params {
-  rgw_bucket bucket;
+  std::optional<string> zone;
+  std::optional<rgw_bucket> bucket;
 };
 
 struct rgw_bucket_get_sync_policy_result {
index dce0c03c60053d230db8237775f24c681f4ca3dc..d0011f848c25a6fcbccb5ce26b0de55a25cfefca 100644 (file)
@@ -1070,8 +1070,9 @@ public:
     if (!binfo ||
         binfo->bucket != *e.bucket) {
       bucket_info.bucket = *e.bucket;
+    } else {
+      set_bucket_info(*binfo);
     }
-    set_bucket_info(*binfo);
   }
 
   void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
@@ -3817,6 +3818,7 @@ int RGWRunBucketSourcesSyncCR::operate()
         } else {
           sync_pair.source_bs.bucket = siter->source.get_bucket();
         }
+        sync_pair.dest_bs.bucket = siter->target.get_bucket();
 
         if (sync_pair.source_bs.shard_id >= 0) {
           num_shards = 1;
@@ -3991,6 +3993,7 @@ int RGWGetBucketPeersCR::operate()
         return set_cr_error(retcode);
       }
 
+      get_policy_params.zone = nullopt;
       get_policy_params.bucket = *target_bucket;
       yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
                                                      sync_env->store,
@@ -4012,6 +4015,7 @@ int RGWGetBucketPeersCR::operate()
         return set_cr_error(retcode);
       }
 
+      get_policy_params.zone = source_zone;
       get_policy_params.bucket = *source_bucket;
       yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
                                                      sync_env->store,
index 51c88789a247fe5a7d6d0ac0756e6cb229a9b86c..1f2a93d2c5380009c7970f64abaa2df318876922 100644 (file)
@@ -31,7 +31,8 @@ public:
   RGWSI_Bucket_Sync(CephContext *cct) : RGWServiceInstance(cct) {}
 
   virtual int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
-                                 const rgw_bucket& bucket,
+                                 std::optional<string> zone,
+                                 std::optional<rgw_bucket> bucket,
                                  RGWBucketSyncPolicyHandlerRef *handler,
                                  optional_yield y) = 0;
 };
index 550fa83e2e480fca6019310ffa3ba5e2fb987a73..71e364be35a2f80f690501473877ba99d7a0260f 100644 (file)
@@ -29,26 +29,41 @@ int RGWSI_Bucket_Sync_SObj::do_start()
 }
 
 int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
-                                               const rgw_bucket& bucket,
+                                               std::optional<string> zone,
+                                               std::optional<rgw_bucket> _bucket,
                                                RGWBucketSyncPolicyHandlerRef *handler,
                                                optional_yield y)
 {
-  string key = RGWSI_Bucket::get_bi_meta_key(bucket);
-  string cache_key("bi/");
-  cache_key.append(key);
+  if (!_bucket) {
+    *handler = svc.zone->get_sync_policy_handler(zone);
+    return 0;
+  }
+
+  auto& bucket = *_bucket;
+
+  string zone_key;
+  string bucket_key;
+
+  if (zone && *zone != svc.zone->zone_id()) {
+    zone_key = *zone;
+  }
+
+  bucket_key = RGWSI_Bucket::get_bi_meta_key(bucket);
+
+  string cache_key("bi/" + zone_key + "/" + bucket_key);
 
   if (auto e = sync_policy_cache->find(cache_key)) {
     *handler = e->handler;
     return 0;
   }
 
-
+  bucket_sync_policy_cache_entry e;
   rgw_cache_entry_info cache_info;
 
   RGWBucketInfo bucket_info;
 
   int r = svc.bucket_sobj->read_bucket_instance_info(ctx,
-                                                     key,
+                                                     bucket_key,
                                                      &bucket_info,
                                                      nullptr,
                                                      nullptr,
@@ -56,13 +71,12 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
                                                      &cache_info);
   if (r < 0) {
     if (r != -ENOENT) {
-      ldout(cct, 0) << "ERROR: svc.bucket->read_bucket_instance_info(key=" << key << ") returned r=" << r << dendl;
+      ldout(cct, 0) << "ERROR: svc.bucket->read_bucket_instance_info(key=" << bucket_key << ") returned r=" << r << dendl;
     }
     return r;
   }
 
-  bucket_sync_policy_cache_entry e;
-  e.handler.reset(svc.zone->get_sync_policy_handler()->alloc_child(bucket_info));
+  e.handler.reset(svc.zone->get_sync_policy_handler(zone)->alloc_child(bucket_info));
 
   if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) {
     ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl;
index 5742168d5cbeb9d130f255ec9a0110ee6a569729..967cb987708036866d845c76ba9a86db25a00212 100644 (file)
@@ -56,7 +56,8 @@ public:
 
 
   int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
-                         const rgw_bucket& bucket,
+                         std::optional<string> zone,
+                         std::optional<rgw_bucket> bucket,
                          RGWBucketSyncPolicyHandlerRef *handler,
                          optional_yield y) override;
 };
index aba47b27ec727b5b93ccc672992dab368c77df0d..c86d28c591f6ce2290a2792413d8ee8cc965b89e 100644 (file)
@@ -38,7 +38,6 @@ void RGWSI_Zone::init(RGWSI_SysObj *_sysobj_svc,
 
 RGWSI_Zone::~RGWSI_Zone()
 {
-  delete sync_policy_handler;
   delete realm;
   delete zonegroup;
   delete zone_public_config;
@@ -46,6 +45,17 @@ RGWSI_Zone::~RGWSI_Zone()
   delete current_period;
 }
 
+std::shared_ptr<RGWBucketSyncPolicyHandler> RGWSI_Zone::get_sync_policy_handler(std::optional<string> zone) const {
+  if (!zone || *zone == zone_id()) {
+    return sync_policy_handler;
+  }
+  auto iter = sync_policy_handlers.find(*zone);
+  if (iter == sync_policy_handlers.end()) {
+    return std::shared_ptr<RGWBucketSyncPolicyHandler>();
+  }
+  return iter->second;
+}
+
 bool RGWSI_Zone::zone_syncs_from(const RGWZone& target_zone, const RGWZone& source_zone) const
 {
   return target_zone.syncs_from(source_zone.name) &&
@@ -153,18 +163,20 @@ int RGWSI_Zone::do_start()
 
   zone_short_id = current_period->get_map().get_zone_short_id(zone_params->get_id());
 
-  sync_policy_handler = new RGWBucketSyncPolicyHandler(this, sync_modules_svc);
+  for (auto ziter : zonegroup->zones) {
+    sync_policy_handlers[ziter.second.id].reset(new RGWBucketSyncPolicyHandler(this, sync_modules_svc, ziter.second.name));
+  }
+
+  sync_policy_handler = sync_policy_handlers[zone_id()]; /* we made sure earlier that zonegroup->zones has our zone */
 
   set<string> source_zones_by_name;
   set<string> target_zones_by_name;
 
-  for (auto& zone_name : sync_policy_handler->get_source_zones()) {
-    source_zones_by_name.insert(zone_name);
-  }
-
-  for (auto& zone_name : sync_policy_handler->get_target_zones()) {
-    target_zones_by_name.insert(zone_name);
-  }
+  sync_policy_handler->reflect(nullptr, nullptr,
+                               nullptr, nullptr,
+                               &source_zones_by_name,
+                               &target_zones_by_name,
+                               false); /* relaxed: also get all zones that we allow to sync to/from */
 
   ret = sync_modules_svc->start();
   if (ret < 0) {
index b4f53e868361c8ce1432235b483ae7771bda0e73..50403343d36fa6ff747e46b3efa0ef82db31f697 100644 (file)
@@ -39,7 +39,8 @@ class RGWSI_Zone : public RGWServiceInstance
   uint32_t zone_short_id{0};
   bool writeable_zone{false};
 
-  RGWBucketSyncPolicyHandler *sync_policy_handler{nullptr};
+  std::shared_ptr<RGWBucketSyncPolicyHandler> sync_policy_handler;
+  std::map<string, std::shared_ptr<RGWBucketSyncPolicyHandler> > sync_policy_handlers;
 
   RGWRESTConn *rest_master_conn{nullptr};
   map<string, RGWRESTConn *> zone_conn_map;
@@ -75,9 +76,7 @@ public:
   int get_zonegroup(const string& id, RGWZoneGroup& zonegroup) const;
   const RGWZone& get_zone() const;
 
-  const RGWBucketSyncPolicyHandler *get_sync_policy_handler() const {
-    return sync_policy_handler;
-  }
+  std::shared_ptr<RGWBucketSyncPolicyHandler> get_sync_policy_handler(std::optional<string> zone = nullopt) const;
 
   const string& zone_name() const;
   const string& zone_id() const;