From 41b14af3320d0eb5ca600501f084801605ef387e Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 19 Aug 2019 15:43:33 -0700 Subject: [PATCH] rgw: defined bucket sync sources manager Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_bucket.cc | 3 + src/rgw/rgw_bucket_sync.h | 4 +- src/rgw/rgw_data_sync.cc | 120 ++++++++++++++++++++++++++++++++++---- 3 files changed, 114 insertions(+), 13 deletions(-) diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 5aef8f3b84dd6..0ea81b46af1c6 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -2181,9 +2181,12 @@ int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) { } int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) { +#warning FIXME +#if 0 if (!ctl.bucket->bucket_exports_data(bucket_info.bucket, null_yield)) { return 0; } +#endif auto& bucket = bucket_info.bucket; diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index aad992d0cba9f..72e7dedb69b54 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -45,8 +45,8 @@ class RGWBucketSyncPolicyHandler { public: RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, - RGWBucketInfo& _bucket_info); - + RGWBucketInfo& _bucket_info) : zone_svc(_zone_svc), + bucket_info(_bucket_info) {} int init(); const RGWBucketInfo& get_bucket_info() const { diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index ed26f3e349afa..79ec05e8f55d6 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -3331,15 +3331,110 @@ int RGWBucketShardIncrementalSyncCR::operate() return 0; } -#if 0 +class RGWBucketSyncSourcesManager { +public: + static string sync_sources_oid(const rgw_bucket bucket) { + return bucket_sync_sources_oid_prefix + "." + bucket.to_str(); + } + + static rgw_raw_obj sync_sources_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) { + return rgw_raw_obj(zone_svc->get_zone_params().log_pool, status_oid(bucket)), + } +}; + +struct rgw_bucket_sync_source_local_info { + string id; + string type; + string zone; + rgw_bucket bucket; + /* FIXME: config */ + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + encode(type, bl); + encode(zone, bl); + encode(bucket, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(id, bl); + decode(type, bl); + decode(zone, bl); + decode(bucket, bl); + DECODE_FINISH(bl); + } + void dump(ceph::Formatter *f) const { + encode_json("id", id, f); + encode_json("type", type, f); + encode_json("zone", zone, f); + encode_json("bucket", bucket, f); + } +}; +WRITE_CLASS_ENCODER(rgw_bucket_sync_source_local_info) + +struct rgw_bucket_sync_sources_local_info { + map sources; /* id -> source */ + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(sources, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(sources, bl); + DECODE_FINISH(bl); + } + + void dump(ceph::Formatter *f) const { + encode_json("sources", type, f); + } +}; +WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info) + +class RGWReadBucketSourcesInfoCR : public RGWCoroutine { + rgw_bucket bucket; + + rgw_raw_obj sources_obj; + + RGWSyncTraceNodeRef tn; + +public: + RGWReadBucketSourcesInfoCR(RGWSI_Zone *_zone_svc, + const rgw_bucket& _bucket, + const RGWSyncTraceNodeRef& _tn_parent) + : RGWCoroutine(_zone_svc->cct), + bucket(_bucket), + source_obj(RGWBucketSyncSourcesManager::sync_sources_obj(bucket)), + tn(sync_env->sync_tracer->add_node(_tn_parent, "read_bucket_sources", + SSTR(bucket))) { + } + + int operate() override; +}; + +int RGWReadBucketSourcesInfoCR::operate() +{ + reenter(this) { + yield call(new RGWSimpleRadosReadCR(sync_env->async_rados, + sync_env->svc.sysobj, + RGWBucketSyncSourcesManager::sync_sources_obj(bucket), + &pinfo)); + return set_cr_done(); + } +} + class RGWRunBucketSourcesSyncCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket bucket; rgw_sync_source source; - RGWMetaSyncEnv meta_sync_env; - const std::string status_oid; + rgw_raw_obj sources_obj; boost::intrusive_ptr lease_cr; boost::intrusive_ptr lease_stack; @@ -3347,11 +3442,15 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: - RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, const rgw_bucket& bucket, const RGWSyncTraceNodeRef& _tn_parent) - : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), bucket(_bucket), - status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, bucket)), - tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_source", - SSTR(bucket_shard_str{bs}))) { + RGWRunBucketSourcesSyncCR(RGWSI_Zone *_zone_svc, + const rgw_bucket& _bucket, + const RGWSyncTraceNodeRef& _tn_parent) + : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), + bucket(_bucket), + source_obj(RGWBucketSyncSourcesManager::sync_sources_obj(bucket)), + tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources", + SSTR(bucket))) { } ~RGWRunBucketSourcesSyncCR() override { if (lease_cr) { @@ -3369,7 +3468,7 @@ int RGWRunBucketSourcesSyncCR::operate() set_status("acquiring sync lock"); auto store = sync_env->store; lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store, - rgw_raw_obj(store->svc->zone->get_zone_params().log_pool, status_oid), + RGWBucketSyncSourcesManager::sync_sources_obj(bucket), "sync_lock", cct->_conf->rgw_sync_lease_period, this)); @@ -3386,7 +3485,7 @@ int RGWRunBucketSourcesSyncCR::operate() } tn->log(10, "took lease"); - yield call(new RGWReadBucketSourcesInfoCR(sc, bs.bucket, &info)); + yield call(new RGWReadBucketSourcesInfoCR(sc, bucket, &info)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); lease_cr->go_down(); @@ -3413,7 +3512,6 @@ int RGWRunBucketSourcesSyncCR::operate() return 0; } -#endif int RGWRunBucketSyncCoroutine::operate() { -- 2.39.5