]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: bucket sync: read local bucket sources
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 22 Aug 2019 00:24:17 +0000 (17:24 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:36 +0000 (10:20 -0800)
And also add functionality to get bucket sync policy handler (in cr).
Next we'll need to compare local bucket sources with what policy
handler defines, and act if it's different.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_cr_tools.cc
src/rgw/rgw_cr_tools.h
src/rgw/rgw_data_sync.cc

index 0ea81b46af1c662d1824e5e03eec91cba0e3692c..3ba6326e35b9168152d1a15ed26bc0aeede0bd5e 100644 (file)
@@ -3660,17 +3660,28 @@ int RGWBucketCtl::sync_user_stats(const rgw_user& user_id,
   return ctl.user->flush_bucket_stats(user_id, *pent);
 }
 
+int RGWBucketCtl::get_sync_policy_handler(const rgw_bucket& bucket,
+                                          RGWBucketSyncPolicyHandlerRef *phandler,
+                                          optional_yield y)
+{
+  int r = call([&](RGWSI_Bucket_X_Ctx& ctx) {
+    return svc.bucket_sync->get_policy_handler(ctx.bi, bucket, phandler, y);
+  });
+  if (r < 0) {
+    ldout(cct, 20) << __func__ << "(): failed to get policy handler for bucket=" << bucket << " (r=" << r << ")" << dendl;
+    return r;
+  }
+  return 0;
+}
+
 int RGWBucketCtl::bucket_exports_data(const rgw_bucket& bucket,
                                       optional_yield y)
 {
 
   RGWBucketSyncPolicyHandlerRef handler;
 
-  int r = call([&](RGWSI_Bucket_X_Ctx& ctx) {
-    return svc.bucket_sync->get_policy_handler(ctx.bi, bucket, &handler, y);
-  });
+  int r = get_sync_policy_handler(bucket, &handler, y);
   if (r < 0) {
-    ldout(cct, 20) << __func__ << "(): failed to read bucket stats (r=" << r << ")" << dendl;
     return r;
   }
 
@@ -3683,11 +3694,8 @@ int RGWBucketCtl::bucket_imports_data(const rgw_bucket& bucket,
 
   RGWBucketSyncPolicyHandlerRef handler;
 
-  int r = call([&](RGWSI_Bucket_X_Ctx& ctx) {
-    return svc.bucket_sync->get_policy_handler(ctx.bi, bucket, &handler, y);
-  });
+  int r = get_sync_policy_handler(bucket, &handler, y);
   if (r < 0) {
-    ldout(cct, 20) << __func__ << "(): failed to read bucket stats (r=" << r << ")" << dendl;
     return r;
   }
 
index d3f3946d86a23720df67fa9d702c92e15d5b2439..6c22a9b413b1efc565d2b5027368090f500abcd7 100644 (file)
@@ -21,6 +21,7 @@
 #include "rgw_formats.h"
 
 #include "services/svc_bucket_types.h"
+#include "services/svc_bucket_sync.h"
 
 
 static constexpr size_t listing_max_entries = 1000;
@@ -869,6 +870,9 @@ public:
                       RGWBucketEnt* pent = nullptr);
 
   /* bucket sync */
+  int get_sync_policy_handler(const rgw_bucket& bucket,
+                             RGWBucketSyncPolicyHandlerRef *phandler,
+                             optional_yield y);
   int bucket_exports_data(const rgw_bucket& bucket,
                           optional_yield y);
   int bucket_imports_data(const rgw_bucket& bucket,
index ff904c3ca51f2316437b8c2f9ef67a95813f3e12..fe9e355bc3949ae47937c254e169b415ff8ea472 100644 (file)
@@ -273,3 +273,19 @@ int RGWBucketLifecycleConfigCR::Request::_send_request()
 
   return 0;
 }
+
+template<>
+int RGWBucketGetSyncPolicyHandlerCR::Request::_send_request()
+{
+  CephContext *cct = store->ctx();
+
+  int r = store->ctl()->bucket->get_sync_policy_handler(params.bucket,
+                                                        &result->policy_handler,
+                                                        null_yield);
+  if (r < 0) {
+    lderr(cct) << "ERROR: " << __func__ << "(): get_sync_policy_handler() returned " << r << dendl;
+    return  r;
+  }
+
+  return 0;
+}
index 98eadf66c0f7232882ebbd595e90bac6bc4e9c8e..33cc27b7a061597d59666bf1daaddf1b01a8f73a 100644 (file)
@@ -8,6 +8,7 @@
 #include "rgw_tools.h"
 #include "rgw_lc.h"
 
+#include "services/svc_bucket_sync.h"
 
 struct rgw_user_create_params {
   rgw_user user;
@@ -74,5 +75,14 @@ struct rgw_bucket_lifecycle_config_params {
 
 using RGWBucketLifecycleConfigCR = RGWSimpleWriteOnlyAsyncCR<rgw_bucket_lifecycle_config_params>;
 
+struct rgw_bucket_get_sync_policy_params {
+  rgw_bucket bucket;
+};
+
+struct rgw_bucket_get_sync_policy_result {
+  RGWBucketSyncPolicyHandlerRef policy_handler;
+};
+
+using RGWBucketGetSyncPolicyHandlerCR = RGWSimpleAsyncCR<rgw_bucket_get_sync_policy_params, rgw_bucket_get_sync_policy_result>;
 
 #endif
index 79ec05e8f55d6571f9ae9da459a7119769dd3768..9fb9cf1ef73770d1117395b0a0eb2f41b0e273a5 100644 (file)
@@ -18,6 +18,7 @@
 #include "rgw_rest_conn.h"
 #include "rgw_cr_rados.h"
 #include "rgw_cr_rest.h"
+#include "rgw_cr_tools.h"
 #include "rgw_http_client.h"
 #include "rgw_bucket.h"
 #include "rgw_metadata.h"
@@ -3334,11 +3335,11 @@ int RGWBucketShardIncrementalSyncCR::operate()
 class RGWBucketSyncSourcesManager {
 public:
   static string sync_sources_oid(const rgw_bucket bucket) {
-    return bucket_sync_sources_oid_prefix + "." + bucket.to_str();
+    return bucket_sync_sources_oid_prefix + "." + bucket.get_key();
   }
 
   static rgw_raw_obj sync_sources_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) { 
-    return rgw_raw_obj(zone_svc->get_zone_params().log_pool, status_oid(bucket)),
+    return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_sources_oid(bucket));
   }
 };
 
@@ -3391,25 +3392,36 @@ struct rgw_bucket_sync_sources_local_info {
   }
 
   void dump(ceph::Formatter *f) const {
-    encode_json("sources", type, f);
+    encode_json("sources", sources, f);
   }
 };
 WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info)
 
 class RGWReadBucketSourcesInfoCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
   rgw_bucket bucket;
 
+  RGWBucketInfo bucket_info;
+
   rgw_raw_obj sources_obj;
 
+  rgw_bucket_sync_sources_local_info sources_local_info;
+  rgw_bucket_sync_sources_local_info expected_local_info;
+
+  rgw_bucket_get_sync_policy_params get_policy_params;
+  std::shared_ptr<rgw_bucket_get_sync_policy_result> get_policy_result;
+
   RGWSyncTraceNodeRef tn;
 
 public:
-  RGWReadBucketSourcesInfoCR(RGWSI_Zone *_zone_svc,
-                            const rgw_bucket& _bucket,
-                            const RGWSyncTraceNodeRef& _tn_parent)
-    : RGWCoroutine(_zone_svc->cct),
+  RGWReadBucketSourcesInfoCR(RGWDataSyncEnv *_sync_env,
+                             const rgw_bucket& _bucket,
+                             const RGWSyncTraceNodeRef& _tn_parent)
+    : RGWCoroutine(_sync_env->cct),
+      sync_env(_sync_env),
       bucket(_bucket),
-      source_obj(RGWBucketSyncSourcesManager::sync_sources_obj(bucket)),
+      sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket)),
+      get_policy_result(make_shared<rgw_bucket_get_sync_policy_result>()),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "read_bucket_sources",
                                          SSTR(bucket))) {
   }
@@ -3421,15 +3433,31 @@ int RGWReadBucketSourcesInfoCR::operate()
 {
   reenter(this) {
     yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
-                                                                            sync_env->svc.sysobj,
-                                                                            RGWBucketSyncSourcesManager::sync_sources_obj(bucket),
-                                                                            &pinfo));
+                                                                            sync_env->svc->sysobj,
+                                                                            RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket),
+                                                                            &sources_local_info));
+    if (retcode < 0 &&
+        retcode != -ENOENT) {
+      return set_cr_error(retcode);
+    }
+
+    get_policy_params.bucket = bucket;
+    yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
+                                                   sync_env->store,
+                                                   get_policy_params,
+                                                   get_policy_result));
+    if (retcode < 0 &&
+        retcode != -ENOENT) {
+      return set_cr_error(retcode);
+    }
+
     return set_cr_done();
   }
+
+  return 0;
 }
 
 class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
-  RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
   rgw_bucket bucket;
   rgw_sync_source source;
@@ -3442,13 +3470,13 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
   RGWSyncTraceNodeRef tn;
 
 public:
-  RGWRunBucketSourcesSyncCR(RGWSI_Zone *_zone_svc,
+  RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env,
                             const rgw_bucket& _bucket,
                             const RGWSyncTraceNodeRef& _tn_parent)
-    : RGWCoroutine(_sc->cct),
-      sc(_sc), sync_env(_sc->env),
+    : RGWCoroutine(_sync_env->cct),
+      sync_env(_sync_env),
       bucket(_bucket),
-      source_obj(RGWBucketSyncSourcesManager::sync_sources_obj(bucket)),
+      sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(_sync_env->svc->zone, bucket)),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
                                          SSTR(bucket))) {
   }
@@ -3468,7 +3496,7 @@ int RGWRunBucketSourcesSyncCR::operate()
       set_status("acquiring sync lock");
       auto store = sync_env->store;
       lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
-                                              RGWBucketSyncSourcesManager::sync_sources_obj(bucket),
+                                              sources_obj,
                                               "sync_lock",
                                               cct->_conf->rgw_sync_lease_period,
                                               this));
@@ -3485,7 +3513,7 @@ int RGWRunBucketSourcesSyncCR::operate()
     }
 
     tn->log(10, "took lease");
-    yield call(new RGWReadBucketSourcesInfoCR(sc, bucket, &info));
+    yield call(new RGWReadBucketSourcesInfoCR(sync_env, bucket, tn, &info));
     if (retcode < 0 && retcode != -ENOENT) {
       tn->log(0, "ERROR: failed to read sync status for bucket");
       lease_cr->go_down();