]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: manage bucket sync deps index
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 6 Nov 2019 02:12:05 +0000 (18:12 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:38 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
14 files changed:
src/cls/version/cls_version_types.h
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket_sync.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_service.cc
src/rgw/services/svc_bucket.h
src/rgw/services/svc_bucket_sobj.cc
src/rgw/services/svc_bucket_sobj.h
src/rgw/services/svc_bucket_sync.h
src/rgw/services/svc_bucket_sync_sobj.cc
src/rgw/services/svc_bucket_sync_sobj.h
src/rgw/services/svc_zone.cc
src/rgw/services/svc_zone.h

index 852183f30e57a017f646e8653d3376ea259ed2b0..ffcb73fa1154405ff7fd2994fd443af492af5a2d 100644 (file)
@@ -36,15 +36,20 @@ struct obj_version {
     tag.clear();
   }
 
-  bool empty() {
+  bool empty() const {
     return tag.empty();
   }
 
-  bool compare(struct obj_version *v) {
+  bool compare(struct obj_version *v) const {
     return (ver == v->ver &&
             tag.compare(v->tag) == 0);
   }
 
+  bool operator==(const struct obj_version& v) const {
+    return (ver == v.ver &&
+            tag.compare(v.tag) == 0);
+  }
+
   void dump(Formatter *f) const;
   void decode_json(JSONObj *obj);
   static void generate_test_instances(list<obj_version*>& o);
index 1c5d5b574b8df5227155cb5ff7a2957581185d3c..11ceacaaaa8e5cec5eb33310736db36efba1d312 100644 (file)
@@ -2331,6 +2331,16 @@ void encode_json(const char *name, const RGWBucketSyncFlowManager::pipe_set& pse
   }
 }
 
+static std::vector<string> convert_bucket_set_to_str_vec(const std::set<rgw_bucket>& bs)
+{
+  std::vector<string> result;
+  result.reserve(bs.size());
+  for (auto& b : bs) {
+    result.push_back(b.get_key());
+  }
+  return std::move(result);
+}
+
 static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bucket> opt_bucket, Formatter *formatter)
 {
   std::optional<string> zone_id;
@@ -2369,6 +2379,12 @@ static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bu
       bucket_handler.reset(handler->alloc_child(*eff_bucket, nullopt));
     }
 
+    ret = bucket_handler->init(null_yield);
+    if (ret < 0) {
+      cerr << "ERROR: failed to init bucket sync policy handler: " << cpp_strerror(-ret) << " (ret=" << ret << ")" << std::endl;
+      return ret;
+    }
+
     handler = bucket_handler;
   }
 
@@ -2377,10 +2393,40 @@ static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bu
 
   handler->get_pipes(&sources, &dests);
 
+  auto source_hints_vec = convert_bucket_set_to_str_vec(handler->get_source_hints());
+  auto target_hints_vec = convert_bucket_set_to_str_vec(handler->get_target_hints());
+
+  RGWBucketSyncFlowManager::pipe_set *resolved_sources;
+  RGWBucketSyncFlowManager::pipe_set *resolved_dests;
+
+  for (auto& b : handler->get_source_hints()) {
+    RGWBucketInfo hint_bucket_info;
+    rgw_bucket hint_bucket;
+    int ret = init_bucket(b, hint_bucket_info, hint_bucket);
+    if (ret < 0) {
+      ldout(cct, 20) << "could not init bucket info for hint bucket=" << b << " ... skipping" << dendl;
+      continue;
+    }
+
+    RGWBucketSyncPolicyHandlerRef hint_bucket_handler;
+    hint_bucket_handler.reset(handler->alloc_child(hint_bucket_indo));
+
+  }
+
   {
     Formatter::ObjectSection os(*formatter, "result");
     encode_json("sources", *sources, formatter);
     encode_json("dests", *dests, formatter);
+    {
+      Formatter::ObjectSection hints_section(*formatter, "hints");
+      encode_json("sources", source_hints_vec, formatter);
+      encode_json("dests", target_hints_vec, formatter);
+    }
+    {
+      Formatter::ObjectSection resolved_hints_section(*formatter, "resolved-hints");
+      encode_json("resolved-hints", *sources, formatter);
+      encode_json("dests", *dests, formatter);
+    }
   }
 
   formatter->flush(cout);
index 16adc9df3b4fff4956f90b02fb6b1750290b79ff..a433b30ba18f0eedf6673b07872b5e86a9e772f1 100644 (file)
@@ -2926,7 +2926,7 @@ public:
     if (ret < 0 && ret != -ENOENT)
       return ret;
 
-    return svc.bucket->remove_bucket_instance_info(ctx, entry, &bci.info.objv_tracker, y);
+    return svc.bucket->remove_bucket_instance_info(ctx, entry, bci.info, &bci.info.objv_tracker, y);
   }
 
   int call(std::function<int(RGWSI_Bucket_BI_Ctx& ctx)> f) {
@@ -3267,6 +3267,7 @@ int RGWBucketCtl::remove_bucket_instance_info(const rgw_bucket& bucket,
   return bmi_handler->call([&](RGWSI_Bucket_BI_Ctx& ctx) {
     return svc.bucket->remove_bucket_instance_info(ctx,
                                                    RGWSI_Bucket::get_bi_meta_key(bucket),
+                                                   info,
                                                    &info.objv_tracker,
                                                    y);
   });
index 184a31becff06399505a860b928e86cdf9f89e78..d6e07df72d5e2114da670dc91dbd7f2382909fb5 100644 (file)
@@ -6,6 +6,7 @@
 #include "rgw_zone.h"
 
 #include "services/svc_zone.h"
+#include "services/svc_bucket_sync.h"
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -344,6 +345,7 @@ void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
 void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucket,
                                        RGWBucketSyncFlowManager::pipe_set *source_pipes,
                                        RGWBucketSyncFlowManager::pipe_set *dest_pipes,
+                                      std::optional<rgw_bucket> filter_peer_bucket,
                                        bool only_enabled) const
 
 {
@@ -352,7 +354,7 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke
   entity.bucket = effective_bucket.value_or(rgw_bucket());
 
   if (parent) {
-    parent->reflect(effective_bucket, source_pipes, dest_pipes, only_enabled);
+    parent->reflect(effective_bucket, source_pipes, dest_pipes, filter_peer_bucket, only_enabled);
   }
 
   for (auto& item : flow_groups) {
@@ -369,6 +371,9 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke
       if (!pipe.dest.match_bucket(effective_bucket)) {
         continue;
       }
+      if (!pipe.source.match_bucket(filter_peer_bucket)) {
+       continue;
+      }
 
       pipe.source.apply_bucket(effective_bucket);
       pipe.dest.apply_bucket(effective_bucket);
@@ -382,6 +387,9 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke
       if (!pipe.source.match_bucket(effective_bucket)) {
         continue;
       }
+      if (!pipe.dest.match_bucket(filter_peer_bucket)) {
+       continue;
+      }
 
       pipe.source.apply_bucket(effective_bucket);
       pipe.dest.apply_bucket(effective_bucket);
@@ -449,7 +457,9 @@ void RGWSyncPolicyCompat::convert_old_sync_config(RGWSI_Zone *zone_svc,
 
 RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
                                                        RGWSI_SyncModules *sync_modules_svc,
-                                                       std::optional<string> effective_zone) : zone_svc(_zone_svc) {
+                                                      RGWSI_Bucket_Sync *_bucket_sync_svc,
+                                                       std::optional<string> effective_zone) : zone_svc(_zone_svc) ,
+                                                                                               bucket_sync_svc(_bucket_sync_svc) {
   zone_name = effective_zone.value_or(zone_svc->zone_name());
   flow_mgr.reset(new RGWBucketSyncFlowManager(zone_name,
                                               nullopt,
@@ -459,8 +469,6 @@ RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
   if (sync_policy.empty()) {
     RGWSyncPolicyCompat::convert_old_sync_config(zone_svc, sync_modules_svc, &sync_policy);
   }
-
-  init();
 }
 
 RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
@@ -471,10 +479,10 @@ RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicy
   }
   bucket = _bucket_info.bucket;
   zone_svc = parent->zone_svc;
+  bucket_sync_svc = parent->bucket_sync_svc;
   flow_mgr.reset(new RGWBucketSyncFlowManager(parent->zone_name,
                                               _bucket_info.bucket,
                                               parent->flow_mgr.get()));
-  init();
 }
 
 RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
@@ -485,10 +493,10 @@ RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicy
   }
   bucket = _bucket;
   zone_svc = parent->zone_svc;
+  bucket_sync_svc = parent->bucket_sync_svc;
   flow_mgr.reset(new RGWBucketSyncFlowManager(parent->zone_name,
                                               _bucket,
                                               parent->flow_mgr.get()));
-  init();
 }
 
 RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const RGWBucketInfo& bucket_info) const
@@ -502,8 +510,19 @@ RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const rgw_bu
   return new RGWBucketSyncPolicyHandler(this, bucket, sync_policy);
 }
 
-void RGWBucketSyncPolicyHandler::init()
+int RGWBucketSyncPolicyHandler::init(std::optional<rgw_bucket> filter_peer_bucket,
+                                    optional_yield y)
 {
+  int r = bucket_sync_svc->get_bucket_sync_hints(bucket.value_or(rgw_bucket()),
+                                                &source_hints,
+                                                &target_hints,
+                                                y);
+  if (r < 0) {
+    ldout(bucket_sync_svc->ctx(), 0) << "ERROR: failed to initialize bucket sync policy handler: get_bucket_sync_hints() on bucket="
+      << bucket << " returned r=" << r << dendl;
+    return r;
+  }
+
   flow_mgr->init(sync_policy);
 
   reflect(&sources_by_name,
@@ -512,7 +531,10 @@ void RGWBucketSyncPolicyHandler::init()
           &targets,
           &source_zones,
           &target_zones,
+         filter_peer_bucket,
           true);
+
+  return 0;
 }
 
 void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *psources_by_name,
@@ -521,6 +543,7 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso
                                          map<string, RGWBucketSyncFlowManager::pipe_set> *ptargets,
                                          std::set<string> *psource_zones,
                                          std::set<string> *ptarget_zones,
+                                        std::optional<rgw_bucket> filter_peer_bucket,
                                          bool only_enabled) const
 {
   RGWBucketSyncFlowManager::pipe_set _sources_by_name;
@@ -530,7 +553,7 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso
   std::set<string> _source_zones;
   std::set<string> _target_zones;
 
-  flow_mgr->reflect(bucket, &_sources_by_name, &_targets_by_name, only_enabled);
+  flow_mgr->reflect(bucket, &_sources_by_name, &_targets_by_name, filter_peer_bucket, only_enabled);
 
   for (auto& pipe : _sources_by_name.pipes) {
     if (!pipe.source.zone) {
index 9a0c447ace1c80358d293af1aea3d298176380a9..4be4dda26c6384a84c0dedf574b828ff92d6044c 100644 (file)
@@ -21,6 +21,7 @@
 
 class RGWSI_Zone;
 class RGWSI_SyncModules;
+class RGWSI_Bucket_Sync;
 
 struct rgw_sync_group_pipe_map;
 struct rgw_sync_bucket_pipes;
@@ -167,6 +168,7 @@ public:
   void reflect(std::optional<rgw_bucket> effective_bucket,
                pipe_set *flow_by_source,
                pipe_set *flow_by_dest,  
+              std::optional<rgw_bucket> filter_peer_bucket,
                bool only_enabled) const;
 
 };
@@ -174,6 +176,7 @@ public:
 class RGWBucketSyncPolicyHandler {
   const RGWBucketSyncPolicyHandler *parent{nullptr};
   RGWSI_Zone *zone_svc;
+  RGWSI_Bucket_Sync *bucket_sync_svc;
   string zone_name;
   std::optional<RGWBucketInfo> bucket_info;
   std::optional<rgw_bucket> bucket;
@@ -189,6 +192,9 @@ class RGWBucketSyncPolicyHandler {
   std::set<string> source_zones; /* source zones by name */
   std::set<string> target_zones; /* target zones by name */
 
+  std::set<rgw_bucket> source_hints;
+  std::set<rgw_bucket> target_hints;
+
   bool bucket_is_sync_source() const {
     return !targets.empty();
   }
@@ -197,8 +203,6 @@ class RGWBucketSyncPolicyHandler {
     return !sources.empty();
   }
 
-  void init();
-
   RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
                              const RGWBucketInfo& _bucket_info);
 
@@ -208,18 +212,22 @@ class RGWBucketSyncPolicyHandler {
 public:
   RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
                              RGWSI_SyncModules *sync_modules_svc,
+                            RGWSI_Bucket_Sync *bucket_sync_svc,
                              std::optional<string> effective_zone = std::nullopt);
 
   RGWBucketSyncPolicyHandler *alloc_child(const RGWBucketInfo& bucket_info) const;
   RGWBucketSyncPolicyHandler *alloc_child(const rgw_bucket& bucket,
                                           std::optional<rgw_sync_policy_info> sync_policy) const;
 
+  int init(std::optional<rgw_bucket> filter_peer_bucket, optional_yield y);
+
   void reflect(RGWBucketSyncFlowManager::pipe_set *psources_by_name,
                RGWBucketSyncFlowManager::pipe_set *ptargets_by_name,
                map<string, RGWBucketSyncFlowManager::pipe_set> *psources,
                map<string, RGWBucketSyncFlowManager::pipe_set> *ptargets,
                std::set<string> *psource_zones,
                std::set<string> *ptarget_zones,
+              std::optional<rgw_bucket> filter_peer_bucket,
                bool only_enabled) const;
 
   const std::set<string>& get_source_zones() const {
@@ -247,6 +255,14 @@ public:
     *targets = &targets_by_name;
   }
 
+  const std::set<rgw_bucket>& get_source_hints() const {
+    return source_hints;
+  }
+
+  const std::set<rgw_bucket>& get_target_hints() const {
+    return target_hints;
+  }
+
   bool bucket_exports_data() const;
   bool bucket_imports_data() const;
 };
index c81817f546c2435f46fb02289ad42d73d7d2bd39..00803b81a85a9f8880f50914846a1810ec551c9e 100644 (file)
@@ -82,7 +82,9 @@ int RGWServices_Def::init(CephContext *cct,
   bucket_sobj->init(zone.get(), sysobj.get(), sysobj_cache.get(),
                     bi_rados.get(), meta.get(), meta_be_sobj.get(),
                     sync_modules.get(), bucket_sync_sobj.get());
-  bucket_sync_sobj->init(zone.get(), sysobj_cache.get(),
+  bucket_sync_sobj->init(zone.get(),
+                         sysobj.get(),
+                         sysobj_cache.get(),
                          bucket_sobj.get());
   cls->init(zone.get(), rados.get());
   datalog_rados->init(zone.get(), cls.get());
@@ -93,7 +95,7 @@ int RGWServices_Def::init(CephContext *cct,
   notify->init(zone.get(), rados.get(), finisher.get());
   otp->init(zone.get(), meta.get(), meta_be_otp.get());
   rados->init();
-  zone->init(sysobj.get(), rados.get(), sync_modules.get());
+  zone->init(sysobj.get(), rados.get(), sync_modules.get(), bucket_sync_sobj.get());
   zone_utils->init(rados.get(), zone.get());
   quota->init(zone.get());
   sync_modules->init(zone.get());
index b47b8f711afa609c3a7bf5505d712dbb4895bf0c..7e39302f43ccebe45bc713d1d5a2ac08368239e9 100644 (file)
@@ -86,6 +86,7 @@ public:
 
   virtual int remove_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx,
                                   const string& key,
+                                 const RGWBucketInfo& bucket_info,
                                   RGWObjVersionTracker *objv_tracker,
                                   optional_yield y) = 0;
 
index 0b87485fdebf50ec4d21f2c2d4a6cd3034a43d08..b276969e72d06399bb5a7c2301e7f493bec4b1a7 100644 (file)
@@ -533,7 +533,8 @@ int RGWSI_Bucket_SObj::store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx,
 
   if (ret >= 0) {
     int r = svc.bucket_sync->handle_bi_update(info,
-                                              orig_info.value_or(nullptr));
+                                              orig_info.value_or(nullptr),
+                                              y);
     if (r < 0) {
       return r;
     }
@@ -558,11 +559,27 @@ int RGWSI_Bucket_SObj::store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx,
 
 int RGWSI_Bucket_SObj::remove_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx,
                                                    const string& key,
+                                                   const RGWBucketInfo& info,
                                                    RGWObjVersionTracker *objv_tracker,
                                                    optional_yield y)
 {
   RGWSI_MBSObj_RemoveParams params;
-  return svc.meta_be->remove_entry(ctx.get(), key, params, objv_tracker, y);
+  int ret = svc.meta_be->remove_entry(ctx.get(), key, params, objv_tracker, y);
+
+  if (ret < 0 &&
+      ret != -ENOENT) {
+    return ret;
+  }
+
+  int r = svc.bucket_sync->handle_bi_removal(info, y);
+  if (r < 0) {
+    ldout(cct, 0) << "ERROR: failed to update bucket instance sync index: r=" << r << dendl;
+    /* returning success as index is just keeping hints, so will keep extra hints,
+     * but bucket removal succeeded
+     */
+  }
+
+  return 0;
 }
 
 int RGWSI_Bucket_SObj::read_bucket_stats(const RGWBucketInfo& bucket_info,
index 80695ba2f7384ce767b39abf2f12a6f54fa2fd70..744f4a8931e42cedcb15c05178affd3ebc0bcdfa 100644 (file)
@@ -153,6 +153,7 @@ public:
 
   int remove_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx,
                                   const string& key,
+                                  const RGWBucketInfo& bucket_info,
                                   RGWObjVersionTracker *objv_tracker,
                                   optional_yield y) override;
 
index ed709620276ab1b47575b86a17a89b52ea5f590d..250e874f0068501d3ad9ee8bf273e79cb5c8300e 100644 (file)
@@ -35,8 +35,17 @@ public:
                                  std::optional<rgw_bucket> bucket,
                                  RGWBucketSyncPolicyHandlerRef *handler,
                                  optional_yield y) = 0;
+
   virtual int handle_bi_update(RGWBucketInfo& bucket_info,
-                               RGWBucketInfo *orig_bucket_info) = 0;
+                               RGWBucketInfo *orig_bucket_info,
+                               optional_yield y) = 0;
+  virtual int handle_bi_removal(const RGWBucketInfo& bucket_info,
+                                optional_yield y) = 0;
+
+  virtual int get_bucket_sync_hints(const rgw_bucket& bucket,
+                                    std::set<rgw_bucket> *sources,
+                                    std::set<rgw_bucket> *dests,
+                                    optional_yield y) = 0;
 };
 
 
index 935e0ea776a5f72ca56450a4f753003f307d76f3..c849011e236c68ac5e9031521bcf928af0d26cb3 100644 (file)
@@ -4,20 +4,28 @@
 #include "svc_bucket_sobj.h"
 
 #include "rgw/rgw_bucket_sync.h"
+#include "rgw/rgw_zone.h"
 
 #define dout_subsys ceph_subsys_rgw
 
+static string bucket_sync_sources_oid_prefix = "bucket.sync-source-hints";
+static string bucket_sync_targets_oid_prefix = "bucket.sync-target-hints";
 
 RGWSI_Bucket_Sync_SObj::~RGWSI_Bucket_Sync_SObj() {
 }
 
 void RGWSI_Bucket_Sync_SObj::init(RGWSI_Zone *_zone_svc,
+                                  RGWSI_SysObj *_sysobj_svc,
                                   RGWSI_SysObj_Cache *_cache_svc,
                                   RGWSI_Bucket_SObj *bucket_sobj_svc)
 {
   svc.zone = _zone_svc;
+  svc.sysobj = _sysobj_svc;
   svc.cache = _cache_svc;
   svc.bucket_sobj = bucket_sobj_svc;
+
+  hint_index_mgr.init(svc.zone,
+                      svc.sysobj);
 }
 
 int RGWSI_Bucket_Sync_SObj::do_start()
@@ -79,6 +87,12 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
 
   e.handler.reset(svc.zone->get_sync_policy_handler(zone)->alloc_child(bucket_info));
 
+  r = e.handler->init(y);
+  if (r < 0) {
+    ldout(cct, 20) << "ERROR: failed to init bucket sync policy handler: r=" << r << dendl;
+    return r;
+  }
+
   if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) {
     ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl;
   }
@@ -88,7 +102,7 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
   return 0;
 }
 
-static void diff_sets(std::set<rgw_bucket>& orig_set,
+static bool diff_sets(std::set<rgw_bucket>& orig_set,
                       std::set<rgw_bucket>& new_set,
                       vector<rgw_bucket> *added,
                       vector<rgw_bucket> *removed)
@@ -118,10 +132,499 @@ static void diff_sets(std::set<rgw_bucket>& orig_set,
   for (; niter != new_set.end(); ++niter) {
     added->push_back(*niter);
   }
+
+  return !(removed->empty() && added->empty());
+}
+
+
+class RGWSI_BS_SObj_HintIndexObj
+{
+  friend class RGWSI_Bucket_Sync_SObj;
+
+  CephContext *cct;
+  struct {
+    RGWSI_SysObj *sysobj;
+  } svc;
+
+  RGWSysObjectCtx obj_ctx;
+  rgw_raw_obj obj;
+  RGWSysObj sysobj;
+
+  RGWObjVersionTracker ot;
+
+  bool has_data{false};
+
+public:
+  struct bi_entry {
+    rgw_bucket bucket;
+    map<rgw_bucket /* info_source */, obj_version> sources;
+
+    void encode(bufferlist& bl) const {
+      ENCODE_START(1, 1, bl);
+      encode(bucket, bl);
+      encode(sources, bl);
+      ENCODE_FINISH(bl);
+    }
+
+    void decode(bufferlist::const_iterator& bl) {
+      DECODE_START(1, bl);
+      decode(bucket, bl);
+      decode(sources, bl);
+      DECODE_FINISH(bl);
+    }
+
+    bool add(const rgw_bucket& info_source,
+             const obj_version& info_source_ver) {
+      auto& ver = sources[info_source];
+
+      if (ver == info_source_ver) { /* already updated */
+        return false;
+      }
+
+      if (info_source_ver.tag == ver.tag &&
+          info_source_ver.ver < ver.ver) {
+        return false;
+      }
+
+      ver = info_source_ver;
+
+      return true;
+    }
+
+    bool remove(const rgw_bucket& info_source,
+                const obj_version& info_source_ver) {
+      auto iter = sources.find(info_source);
+      if (iter == sources.end()) {
+        return false;
+      }
+
+      auto& ver = iter->second;
+
+      if (info_source_ver.tag == ver.tag &&
+          info_source_ver.ver < ver.ver) {
+        return false;
+      }
+
+      sources.erase(info_source);
+      return true;
+    }
+
+    bool empty() const {
+      return sources.empty();
+    }
+  };
+
+  struct single_instance_info {
+    map<rgw_bucket, bi_entry> entries;
+
+    void encode(bufferlist& bl) const {
+      ENCODE_START(1, 1, bl);
+      encode(entries, bl);
+      ENCODE_FINISH(bl);
+    }
+
+    void decode(bufferlist::const_iterator& bl) {
+      DECODE_START(1, bl);
+      decode(entries, bl);
+      DECODE_FINISH(bl);
+    }
+
+    bool add_entry(const rgw_bucket& info_source,
+                   const obj_version& info_source_ver,
+                   const rgw_bucket& bucket) {
+      auto& entry = entries[bucket];
+
+      if (!entry.add(info_source, info_source_ver)) {
+        return false;
+      }
+
+      entry.bucket = bucket;
+
+      return true;
+    }
+
+    bool remove_entry(const rgw_bucket& info_source,
+                      const obj_version& info_source_ver,
+                      const rgw_bucket& bucket) {
+      auto iter = entries.find(bucket);
+      if (iter == entries.end()) {
+        return false;
+      }
+
+      if (!iter->second.remove(info_source, info_source_ver)) {
+        return false;
+      }
+
+      if (iter->second.empty()) {
+        entries.erase(iter);
+      }
+
+      return true;
+    }
+
+    void clear() {
+      entries.clear();
+    }
+
+    bool empty() const {
+      return entries.empty();
+    }
+
+    void get_entities(std::set<rgw_bucket> *result) const {
+      for (auto& iter : entries) {
+        result->insert(iter.first);
+      }
+    }
+  };
+
+  struct info_map {
+    map<rgw_bucket, single_instance_info> instances;
+
+    void encode(bufferlist& bl) const {
+      ENCODE_START(1, 1, bl);
+      encode(instances, bl);
+      ENCODE_FINISH(bl);
+    }
+
+    void decode(bufferlist::const_iterator& bl) {
+      DECODE_START(1, bl);
+      decode(instances, bl);
+      DECODE_FINISH(bl);
+    }
+
+    bool empty() const {
+      return instances.empty();
+    }
+
+    void clear() {
+      instances.clear();
+    }
+
+    void get_entities(const rgw_bucket& bucket,
+                      std::set<rgw_bucket> *result) const {
+      auto iter = instances.find(bucket);
+      if (iter == instances.end()) {
+        return;
+      }
+      iter->second.get_entities(result);
+    }
+  } info;
+
+  RGWSI_BS_SObj_HintIndexObj(RGWSI_SysObj *_sysobj_svc,
+                             const rgw_raw_obj& _obj) : cct(_sysobj_svc->ctx()),
+                                                        obj_ctx(_sysobj_svc->init_obj_ctx()),
+                                                        obj(_obj),
+                                                        sysobj(obj_ctx.get_obj(obj))
+  {
+    svc.sysobj = _sysobj_svc;
+  }
+
+  int update(const rgw_bucket& entity,
+             const RGWBucketInfo& info_source,
+             std::optional<std::vector<rgw_bucket> > add,
+             std::optional<std::vector<rgw_bucket> > remove,
+             optional_yield y);
+
+private:
+  void update_entries(const rgw_bucket& info_source,
+                      const obj_version& info_source_ver,
+                      std::optional<std::vector<rgw_bucket> > add,
+                      std::optional<std::vector<rgw_bucket> > remove,
+                      single_instance_info *instance);
+
+  int read(optional_yield y);
+  int flush(optional_yield y);
+
+  void invalidate() {
+    has_data = false;
+    info.clear();
+  }
+
+  void get_entities(const rgw_bucket& bucket,
+                    std::set<rgw_bucket> *result) const {
+    info.get_entities(bucket, result);
+  }
+};
+WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::bi_entry)
+WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::single_instance_info)
+WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::info_map)
+
+int RGWSI_BS_SObj_HintIndexObj::update(const rgw_bucket& entity,
+                                       const RGWBucketInfo& info_source,
+                                       std::optional<std::vector<rgw_bucket> > add,
+                                       std::optional<std::vector<rgw_bucket> > remove,
+                                       optional_yield y)
+{
+  int r = 0;
+
+  auto& info_source_ver = info_source.objv_tracker.read_version;
+
+#define MAX_RETRIES 25
+
+  for (int i = 0; i < MAX_RETRIES; ++i) {
+    if (!has_data) {
+      r = read(y);
+      if (r < 0) {
+        ldout(cct, 0) << "ERROR: cannot update hint index: failed to read: r=" << r << dendl;
+        return r;
+      }
+    }
+
+    auto& instance = info.instances[entity];
+
+    update_entries(info_source.bucket,
+                   info_source_ver,
+                   add, remove,
+                   &instance);
+
+    if (instance.empty()) {
+      info.instances.erase(entity);
+    }
+
+    r = flush(y);
+    if (r >= 0) {
+      return 0;
+    }
+
+    if (r != -ECANCELED) {
+      ldout(cct, 0) << "ERROR: failed to flush hint index: obj=" << obj << " r=" << r << dendl;
+      return r;
+    }
+  }
+  ldout(cct, 0) << "ERROR: failed to flush hint index: too many retries (obj=" << obj << "), likely a bug" << dendl;
+
+  return -EIO;
+}
+
+void RGWSI_BS_SObj_HintIndexObj::update_entries(const rgw_bucket& info_source,
+                                                const obj_version& info_source_ver,
+                                                std::optional<std::vector<rgw_bucket> > add,
+                                                std::optional<std::vector<rgw_bucket> > remove,
+                                                single_instance_info *instance)
+{
+  if (remove) {
+    for (auto& bucket : *remove) {
+      instance->remove_entry(info_source, info_source_ver, bucket);
+    }
+  }
+
+  if (add) {
+    for (auto& bucket : *add) {
+      instance->add_entry(info_source, info_source_ver, bucket);
+    }
+  }
+}
+
+int RGWSI_BS_SObj_HintIndexObj::read(optional_yield y) {
+  RGWObjVersionTracker _ot;
+  bufferlist bl;
+  int r = sysobj.rop()
+    .set_objv_tracker(&_ot) /* forcing read of current version */
+    .read(&bl, y);
+  if (r < 0 && r != -ENOENT) {
+    ldout(cct, 0) << "ERROR: failed reading data (obj=" << obj << "), r=" << r << dendl;
+    return r;
+  }
+
+  ot = _ot;
+
+  if (r >= 0) {
+    auto iter = bl.cbegin();
+    try {
+      decode(info, iter);
+      has_data = true;
+    } catch (buffer::error& err) {
+      ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to decode entries, ignoring" << dendl;
+      info.clear();
+    }
+  } else {
+    info.clear();
+  }
+
+  return 0;
+}
+
+int RGWSI_BS_SObj_HintIndexObj::flush(optional_yield y) {
+  int r;
+
+  if (!info.empty()) {
+    bufferlist bl;
+    encode(info, bl);
+
+    r = sysobj.wop()
+      .set_objv_tracker(&ot) /* forcing read of current version */
+      .write(bl, y);
+
+  } else { /* remove */
+    r = sysobj.wop()
+      .set_objv_tracker(&ot)
+      .remove(y);
+  }
+
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+rgw_raw_obj RGWSI_Bucket_Sync_SObj::HintIndexManager::get_sources_obj(const rgw_bucket& bucket) const
+{
+  rgw_bucket b = bucket;
+  b.bucket_id.clear();
+  return rgw_raw_obj(svc.zone->get_zone_params().log_pool,
+                     bucket_sync_sources_oid_prefix + "." + b.get_key());
+}
+
+rgw_raw_obj RGWSI_Bucket_Sync_SObj::HintIndexManager::get_dests_obj(const rgw_bucket& bucket) const
+{
+  rgw_bucket b = bucket;
+  b.bucket_id.clear();
+  return rgw_raw_obj(svc.zone->get_zone_params().log_pool,
+                     bucket_sync_targets_oid_prefix + "." + b.get_key());
+}
+
+int RGWSI_Bucket_Sync_SObj::do_update_hints(const RGWBucketInfo& bucket_info,
+                                            std::vector<rgw_bucket>& added_dests,
+                                            std::vector<rgw_bucket>& removed_dests,
+                                            std::vector<rgw_bucket>& added_sources,
+                                            std::vector<rgw_bucket>& removed_sources,
+                                            optional_yield y)
+{
+  std::vector<rgw_bucket> self_entity;
+  self_entity.push_back(bucket_info.bucket);
+
+  if (!added_dests.empty() ||
+      !removed_dests.empty()) {
+    /* update our dests */
+    RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
+                                     hint_index_mgr.get_dests_obj(bucket_info.bucket));
+    int r = index.update(bucket_info.bucket,
+                         bucket_info,
+                         added_dests,
+                         removed_dests,
+                         y);
+    if (r < 0) {
+      ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl;
+      return r;
+    }
+
+    /* update added dest buckets */
+    for (auto& dest_bucket : added_dests) {
+      RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
+                                           hint_index_mgr.get_sources_obj(dest_bucket));
+      int r = dep_index.update(dest_bucket,
+                               bucket_info,
+                               self_entity,
+                               std::nullopt,
+                               y);
+      if (r < 0) {
+        ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl;
+        return r;
+      }
+    }
+    /* update removed dest buckets */
+    for (auto& dest_bucket : removed_dests) {
+      RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
+                                           hint_index_mgr.get_sources_obj(dest_bucket));
+      int r = dep_index.update(dest_bucket,
+                               bucket_info,
+                               std::nullopt,
+                               self_entity,
+                               y);
+      if (r < 0) {
+        ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl;
+        return r;
+      }
+    }
+  }
+
+  if (!added_dests.empty() ||
+      !removed_dests.empty()) {
+    RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
+                                     hint_index_mgr.get_sources_obj(bucket_info.bucket));
+    /* update our sources */
+    int r = index.update(bucket_info.bucket,
+                         bucket_info,
+                         added_sources,
+                         removed_sources,
+                         y);
+    if (r < 0) {
+      ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl;
+      return r;
+    }
+
+    /* update added sources buckets */
+    for (auto& source_bucket : added_sources) {
+      RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
+                                           hint_index_mgr.get_dests_obj(source_bucket));
+      int r = dep_index.update(source_bucket,
+                               bucket_info,
+                               self_entity,
+                               std::nullopt,
+                               y);
+      if (r < 0) {
+        ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl;
+        return r;
+      }
+    }
+    /* update removed dest buckets */
+    for (auto& source_bucket : removed_sources) {
+      RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
+                                           hint_index_mgr.get_dests_obj(source_bucket));
+      int r = dep_index.update(source_bucket,
+                               bucket_info,
+                               std::nullopt,
+                               self_entity,
+                               y);
+      if (r < 0) {
+        ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl;
+        return r;
+      }
+    }
+  }
+
+  return 0;
+}
+
+int RGWSI_Bucket_Sync_SObj::handle_bi_removal(const RGWBucketInfo& bucket_info,
+                                              optional_yield y)
+{
+  std::set<rgw_bucket> sources_set;
+  std::set<rgw_bucket> dests_set;
+
+  if (bucket_info.sync_policy) {
+    bucket_info.sync_policy->get_potential_related_buckets(bucket_info.bucket,
+                                                           &sources_set,
+                                                           &dests_set);
+  }
+
+  std::vector<rgw_bucket> removed_sources;
+  removed_sources.reserve(sources_set.size());
+  for (auto& e : sources_set) {
+    removed_sources.push_back(e);
+  }
+
+  std::vector<rgw_bucket> removed_dests;
+  removed_dests.reserve(dests_set.size());
+  for (auto& e : dests_set) {
+    removed_dests.push_back(e);
+  }
+
+  std::vector<rgw_bucket> added_sources;
+  std::vector<rgw_bucket> added_dests;
+
+  return do_update_hints(bucket_info,
+                         added_dests,
+                         removed_dests,
+                         added_sources,
+                         removed_sources,
+                         y);
 }
 
 int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo& bucket_info,
-                                             RGWBucketInfo *orig_bucket_info)
+                                             RGWBucketInfo *orig_bucket_info,
+                                             optional_yield y)
 {
   std::set<rgw_bucket> orig_sources;
   std::set<rgw_bucket> orig_dests;
@@ -143,16 +646,73 @@ int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo& bucket_info,
 
   std::vector<rgw_bucket> removed_sources;
   std::vector<rgw_bucket> added_sources;
-  diff_sets(orig_sources, sources, &added_sources, &removed_sources);
+  bool found = diff_sets(orig_sources, sources, &added_sources, &removed_sources);
   ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_sources=" << orig_sources << " new_sources=" << sources << dendl;
   ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ":  potential sources added=" << added_sources << " removed=" << removed_sources << dendl;
   
   std::vector<rgw_bucket> removed_dests;
   std::vector<rgw_bucket> added_dests;
-  diff_sets(orig_dests, dests, &added_dests, &removed_dests);
+  found = found || diff_sets(orig_dests, dests, &added_dests, &removed_dests);
+
   ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_dests=" << orig_dests << " new_dests=" << dests << dendl;
   ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ":  potential dests added=" << added_dests << " removed=" << removed_dests << dendl;
 
-  return 0;
+  if (!found) {
+    return 0;
+  }
+
+  return do_update_hints(bucket_info,
+                         added_dests,
+                         removed_dests,
+                         added_sources,
+                         removed_sources,
+                         y);
+}
+
+int RGWSI_Bucket_Sync_SObj::get_bucket_sync_hints(const rgw_bucket& bucket,
+                                                  std::set<rgw_bucket> *sources,
+                                                  std::set<rgw_bucket> *dests,
+                                                  optional_yield y)
+{
+  if (!sources && !dests) {
+    return 0;
+  }
+
+  if (sources) {
+    RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
+                                     hint_index_mgr.get_sources_obj(bucket));
+    int r = index.read(y);
+    if (r < 0) {
+      ldout(cct, 0) << "ERROR: failed to update sources index for bucket=" << bucket << " r=" << r << dendl;
+      return r;
+    }
 
+    index.get_entities(bucket, dests);
+
+    if (!bucket.bucket_id.empty()) {
+      rgw_bucket b = bucket;
+      b.bucket_id.clear();
+      index.get_entities(bucket, dests);
+    }
+  }
+
+  if (dests) {
+    RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
+                                     hint_index_mgr.get_dests_obj(bucket));
+    int r = index.read(y);
+    if (r < 0) {
+      ldout(cct, 0) << "ERROR: failed to read targets index for bucket=" << bucket << " r=" << r << dendl;
+      return r;
+    }
+
+    index.get_entities(bucket, dests);
+
+    if (!bucket.bucket_id.empty()) {
+      rgw_bucket b = bucket;
+      b.bucket_id.clear();
+      index.get_entities(bucket, dests);
+    }
+  }
+
+  return 0;
 }
index 0271d9faff66b25aee96062ffed32f6a7ba0b0af..a0305e12b4d93e7afeab2ec81c63ea6cd79a8d2c 100644 (file)
@@ -38,10 +38,37 @@ class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync
   using RGWChainedCacheImpl_bucket_sync_policy_cache_entry = RGWChainedCacheImpl<bucket_sync_policy_cache_entry>;
   unique_ptr<RGWChainedCacheImpl_bucket_sync_policy_cache_entry> sync_policy_cache;
 
+  class HintIndexManager {
+    struct {
+      RGWSI_Zone *zone;
+      RGWSI_SysObj *sysobj;
+    } svc;
+
+  public:
+    HintIndexManager() {}
+
+    void init(RGWSI_Zone *_zone_svc,
+              RGWSI_SysObj *_sysobj_svc) {
+      svc.zone = _zone_svc;
+      svc.sysobj = _sysobj_svc;
+    }
+
+    rgw_raw_obj get_sources_obj(const rgw_bucket& bucket) const;
+    rgw_raw_obj get_dests_obj(const rgw_bucket& bucket) const;
+  } hint_index_mgr;
+
   int do_start() override;
+
+  int do_update_hints(const RGWBucketInfo& bucket_info,
+                      std::vector<rgw_bucket>& added_dests,
+                      std::vector<rgw_bucket>& removed_dests,
+                      std::vector<rgw_bucket>& added_sources,
+                      std::vector<rgw_bucket>& removed_sources,
+                      optional_yield y);
 public:
   struct Svc {
     RGWSI_Zone *zone{nullptr};
+    RGWSI_SysObj *sysobj{nullptr};
     RGWSI_SysObj_Cache *cache{nullptr};
     RGWSI_Bucket_SObj *bucket_sobj{nullptr};
   } svc;
@@ -50,6 +77,7 @@ public:
   ~RGWSI_Bucket_Sync_SObj();
 
   void init(RGWSI_Zone *_zone_svc,
+            RGWSI_SysObj *_sysobj_svc,
             RGWSI_SysObj_Cache *_cache_svc,
             RGWSI_Bucket_SObj *_bucket_sobj_svc);
 
@@ -61,6 +89,14 @@ public:
                          optional_yield y) override;
 
   int handle_bi_update(RGWBucketInfo& bucket_info,
-                       RGWBucketInfo *orig_bucket_info) override;
+                       RGWBucketInfo *orig_bucket_info,
+                       optional_yield y) override;
+  int handle_bi_removal(const RGWBucketInfo& bucket_info,
+                        optional_yield y) override;
+
+  int get_bucket_sync_hints(const rgw_bucket& bucket,
+                            std::set<rgw_bucket> *sources,
+                            std::set<rgw_bucket> *dests,
+                            optional_yield y) override;
 };
 
index c86d28c591f6ce2290a2792413d8ee8cc965b89e..511984f9a292aa34693d3b75d45a703892321a44 100644 (file)
@@ -23,11 +23,13 @@ RGWSI_Zone::RGWSI_Zone(CephContext *cct) : RGWServiceInstance(cct)
 
 void RGWSI_Zone::init(RGWSI_SysObj *_sysobj_svc,
                       RGWSI_RADOS * _rados_svc,
-                      RGWSI_SyncModules * _sync_modules_svc)
+                      RGWSI_SyncModules * _sync_modules_svc,
+                     RGWSI_Bucket_Sync *_bucket_sync_svc)
 {
   sysobj_svc = _sysobj_svc;
   rados_svc = _rados_svc;
   sync_modules_svc = _sync_modules_svc;
+  bucket_sync_svc = _bucket_sync_svc;
 
   realm = new RGWRealm();
   zonegroup = new RGWZoneGroup();
@@ -164,7 +166,13 @@ int RGWSI_Zone::do_start()
   zone_short_id = current_period->get_map().get_zone_short_id(zone_params->get_id());
 
   for (auto ziter : zonegroup->zones) {
-    sync_policy_handlers[ziter.second.id].reset(new RGWBucketSyncPolicyHandler(this, sync_modules_svc, ziter.second.name));
+    auto zone_handler = new RGWBucketSyncPolicyHandler(this, sync_modules_svc, bucket_sync_svc, ziter.second.name);
+    ret = zone_handler->init(null_yield);
+    if (ret < 0) {
+      lderr(cct) << "ERROR: could not initialize zone policy handler for zone=" << ziter.second.name << dendl;
+      return ret;
+      }
+    sync_policy_handlers[ziter.second.id].reset(zone_handler);
   }
 
   sync_policy_handler = sync_policy_handlers[zone_id()]; /* we made sure earlier that zonegroup->zones has our zone */
index 50403343d36fa6ff747e46b3efa0ef82db31f697..e20575f163278b27f820f64afca9d1c527f31436 100644 (file)
@@ -9,6 +9,7 @@
 class RGWSI_RADOS;
 class RGWSI_SysObj;
 class RGWSI_SyncModules;
+class RGWSI_Bucket_Sync;
 
 class RGWRealm;
 class RGWZoneGroup;
@@ -30,6 +31,7 @@ class RGWSI_Zone : public RGWServiceInstance
   RGWSI_SysObj *sysobj_svc{nullptr};
   RGWSI_RADOS *rados_svc{nullptr};
   RGWSI_SyncModules *sync_modules_svc{nullptr};
+  RGWSI_Bucket_Sync *bucket_sync_svc{nullptr};
 
   RGWRealm *realm{nullptr};
   RGWZoneGroup *zonegroup{nullptr};
@@ -55,7 +57,8 @@ class RGWSI_Zone : public RGWServiceInstance
 
   void init(RGWSI_SysObj *_sysobj_svc,
            RGWSI_RADOS *_rados_svc,
-           RGWSI_SyncModules *_sync_modules_svc);
+           RGWSI_SyncModules *_sync_modules_svc,
+          RGWSI_Bucket_Sync *_bucket_sync_svc);
   int do_start() override;
   void shutdown() override;