From 3bc7ca6d8b840ad446282ead9db0874913224fce Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 21 Aug 2019 17:24:17 -0700 Subject: [PATCH] rgw: bucket sync: read local bucket sources And also add functionality to get bucket sync policy handler (in cr). Next we'll need to compare local bucket sources with what policy handler defines, and act if it's different. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_bucket.cc | 24 ++++++++++----- src/rgw/rgw_bucket.h | 4 +++ src/rgw/rgw_cr_tools.cc | 16 ++++++++++ src/rgw/rgw_cr_tools.h | 10 +++++++ src/rgw/rgw_data_sync.cc | 64 +++++++++++++++++++++++++++++----------- 5 files changed, 92 insertions(+), 26 deletions(-) diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 0ea81b46af1..3ba6326e35b 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -3660,17 +3660,28 @@ int RGWBucketCtl::sync_user_stats(const rgw_user& user_id, return ctl.user->flush_bucket_stats(user_id, *pent); } +int RGWBucketCtl::get_sync_policy_handler(const rgw_bucket& bucket, + RGWBucketSyncPolicyHandlerRef *phandler, + optional_yield y) +{ + int r = call([&](RGWSI_Bucket_X_Ctx& ctx) { + return svc.bucket_sync->get_policy_handler(ctx.bi, bucket, phandler, y); + }); + if (r < 0) { + ldout(cct, 20) << __func__ << "(): failed to get policy handler for bucket=" << bucket << " (r=" << r << ")" << dendl; + return r; + } + return 0; +} + int RGWBucketCtl::bucket_exports_data(const rgw_bucket& bucket, optional_yield y) { RGWBucketSyncPolicyHandlerRef handler; - int r = call([&](RGWSI_Bucket_X_Ctx& ctx) { - return svc.bucket_sync->get_policy_handler(ctx.bi, bucket, &handler, y); - }); + int r = get_sync_policy_handler(bucket, &handler, y); if (r < 0) { - ldout(cct, 20) << __func__ << "(): failed to read bucket stats (r=" << r << ")" << dendl; return r; } @@ -3683,11 +3694,8 @@ int RGWBucketCtl::bucket_imports_data(const rgw_bucket& bucket, RGWBucketSyncPolicyHandlerRef handler; - int r = call([&](RGWSI_Bucket_X_Ctx& ctx) { - return svc.bucket_sync->get_policy_handler(ctx.bi, bucket, &handler, y); - }); + int r = get_sync_policy_handler(bucket, &handler, y); if (r < 0) { - ldout(cct, 20) << __func__ << "(): failed to read bucket stats (r=" << r << ")" << dendl; return r; } diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index d3f3946d86a..6c22a9b413b 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -21,6 +21,7 @@ #include "rgw_formats.h" #include "services/svc_bucket_types.h" +#include "services/svc_bucket_sync.h" static constexpr size_t listing_max_entries = 1000; @@ -869,6 +870,9 @@ public: RGWBucketEnt* pent = nullptr); /* bucket sync */ + int get_sync_policy_handler(const rgw_bucket& bucket, + RGWBucketSyncPolicyHandlerRef *phandler, + optional_yield y); int bucket_exports_data(const rgw_bucket& bucket, optional_yield y); int bucket_imports_data(const rgw_bucket& bucket, diff --git a/src/rgw/rgw_cr_tools.cc b/src/rgw/rgw_cr_tools.cc index ff904c3ca51..fe9e355bc39 100644 --- a/src/rgw/rgw_cr_tools.cc +++ b/src/rgw/rgw_cr_tools.cc @@ -273,3 +273,19 @@ int RGWBucketLifecycleConfigCR::Request::_send_request() return 0; } + +template<> +int RGWBucketGetSyncPolicyHandlerCR::Request::_send_request() +{ + CephContext *cct = store->ctx(); + + int r = store->ctl()->bucket->get_sync_policy_handler(params.bucket, + &result->policy_handler, + null_yield); + if (r < 0) { + lderr(cct) << "ERROR: " << __func__ << "(): get_sync_policy_handler() returned " << r << dendl; + return r; + } + + return 0; +} diff --git a/src/rgw/rgw_cr_tools.h b/src/rgw/rgw_cr_tools.h index 98eadf66c0f..33cc27b7a06 100644 --- a/src/rgw/rgw_cr_tools.h +++ b/src/rgw/rgw_cr_tools.h @@ -8,6 +8,7 @@ #include "rgw_tools.h" #include "rgw_lc.h" +#include "services/svc_bucket_sync.h" struct rgw_user_create_params { rgw_user user; @@ -74,5 +75,14 @@ struct rgw_bucket_lifecycle_config_params { using RGWBucketLifecycleConfigCR = RGWSimpleWriteOnlyAsyncCR; +struct rgw_bucket_get_sync_policy_params { + rgw_bucket bucket; +}; + +struct rgw_bucket_get_sync_policy_result { + RGWBucketSyncPolicyHandlerRef policy_handler; +}; + +using RGWBucketGetSyncPolicyHandlerCR = RGWSimpleAsyncCR; #endif diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 79ec05e8f55..9fb9cf1ef73 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -18,6 +18,7 @@ #include "rgw_rest_conn.h" #include "rgw_cr_rados.h" #include "rgw_cr_rest.h" +#include "rgw_cr_tools.h" #include "rgw_http_client.h" #include "rgw_bucket.h" #include "rgw_metadata.h" @@ -3334,11 +3335,11 @@ int RGWBucketShardIncrementalSyncCR::operate() class RGWBucketSyncSourcesManager { public: static string sync_sources_oid(const rgw_bucket bucket) { - return bucket_sync_sources_oid_prefix + "." + bucket.to_str(); + return bucket_sync_sources_oid_prefix + "." + bucket.get_key(); } static rgw_raw_obj sync_sources_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) { - return rgw_raw_obj(zone_svc->get_zone_params().log_pool, status_oid(bucket)), + return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_sources_oid(bucket)); } }; @@ -3391,25 +3392,36 @@ struct rgw_bucket_sync_sources_local_info { } void dump(ceph::Formatter *f) const { - encode_json("sources", type, f); + encode_json("sources", sources, f); } }; WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info) class RGWReadBucketSourcesInfoCR : public RGWCoroutine { + RGWDataSyncEnv *sync_env; rgw_bucket bucket; + RGWBucketInfo bucket_info; + rgw_raw_obj sources_obj; + rgw_bucket_sync_sources_local_info sources_local_info; + rgw_bucket_sync_sources_local_info expected_local_info; + + rgw_bucket_get_sync_policy_params get_policy_params; + std::shared_ptr get_policy_result; + RGWSyncTraceNodeRef tn; public: - RGWReadBucketSourcesInfoCR(RGWSI_Zone *_zone_svc, - const rgw_bucket& _bucket, - const RGWSyncTraceNodeRef& _tn_parent) - : RGWCoroutine(_zone_svc->cct), + RGWReadBucketSourcesInfoCR(RGWDataSyncEnv *_sync_env, + const rgw_bucket& _bucket, + const RGWSyncTraceNodeRef& _tn_parent) + : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), bucket(_bucket), - source_obj(RGWBucketSyncSourcesManager::sync_sources_obj(bucket)), + sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket)), + get_policy_result(make_shared()), tn(sync_env->sync_tracer->add_node(_tn_parent, "read_bucket_sources", SSTR(bucket))) { } @@ -3421,15 +3433,31 @@ int RGWReadBucketSourcesInfoCR::operate() { reenter(this) { yield call(new RGWSimpleRadosReadCR(sync_env->async_rados, - sync_env->svc.sysobj, - RGWBucketSyncSourcesManager::sync_sources_obj(bucket), - &pinfo)); + sync_env->svc->sysobj, + RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket), + &sources_local_info)); + if (retcode < 0 && + retcode != -ENOENT) { + return set_cr_error(retcode); + } + + get_policy_params.bucket = bucket; + yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados, + sync_env->store, + get_policy_params, + get_policy_result)); + if (retcode < 0 && + retcode != -ENOENT) { + return set_cr_error(retcode); + } + return set_cr_done(); } + + return 0; } class RGWRunBucketSourcesSyncCR : public RGWCoroutine { - RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket bucket; rgw_sync_source source; @@ -3442,13 +3470,13 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: - RGWRunBucketSourcesSyncCR(RGWSI_Zone *_zone_svc, + RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket& _bucket, const RGWSyncTraceNodeRef& _tn_parent) - : RGWCoroutine(_sc->cct), - sc(_sc), sync_env(_sc->env), + : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), bucket(_bucket), - source_obj(RGWBucketSyncSourcesManager::sync_sources_obj(bucket)), + sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(_sync_env->svc->zone, bucket)), tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources", SSTR(bucket))) { } @@ -3468,7 +3496,7 @@ int RGWRunBucketSourcesSyncCR::operate() set_status("acquiring sync lock"); auto store = sync_env->store; lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store, - RGWBucketSyncSourcesManager::sync_sources_obj(bucket), + sources_obj, "sync_lock", cct->_conf->rgw_sync_lease_period, this)); @@ -3485,7 +3513,7 @@ int RGWRunBucketSourcesSyncCR::operate() } tn->log(10, "took lease"); - yield call(new RGWReadBucketSourcesInfoCR(sc, bucket, &info)); + yield call(new RGWReadBucketSourcesInfoCR(sync_env, bucket, tn, &info)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); lease_cr->go_down(); -- 2.39.5