From: Yehuda Sadeh Date: Sat, 24 Aug 2019 00:15:43 +0000 (-0700) Subject: rgw: define bucket sync pipes X-Git-Tag: v15.1.0~22^2~111 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a0a68effe4a675ce86281162bcd3569135062121;p=ceph.git rgw: define bucket sync pipes Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index 72e7dedb69b..3858c95991b 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -27,6 +27,7 @@ class RGWBucketSyncPolicyHandler { std::set source_zones; +public: struct peer_info { std::string type; rgw_bucket bucket; @@ -38,10 +39,15 @@ class RGWBucketSyncPolicyHandler { } return (type < si.type); } + + bool is_rgw() const { + return (type.empty() || type == "rgw"); + } }; - std::map > sources; - std::map > targets; +private: + std::map > sources; /* peers by zone */ + std::map > targets; /* peers by zone */ public: RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, @@ -49,6 +55,10 @@ public: bucket_info(_bucket_info) {} int init(); + std::map >& get_sources() { + return sources; + } + const RGWBucketInfo& get_bucket_info() const { return bucket_info; } diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 9fb9cf1ef73..43687023c51 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -21,6 +21,7 @@ #include "rgw_cr_tools.h" #include "rgw_http_client.h" #include "rgw_bucket.h" +#include "rgw_bucket_sync.h" #include "rgw_metadata.h" #include "rgw_sync_counters.h" #include "rgw_sync_module.h" @@ -1039,6 +1040,7 @@ std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) { class RGWRunBucketSyncCoroutine : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; + rgw_bucket_sync_pair_info sync_pair; rgw_bucket_sync_pipe sync_pipe; rgw_bucket_shard_sync_info sync_status; RGWMetaSyncEnv meta_sync_env; @@ -1051,12 +1053,11 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: - RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent) - : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pipe)), + RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& _sync_pair, const RGWSyncTraceNodeRef& _tn_parent) + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pair(_sync_pair), + status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)), tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket", - SSTR(bucket_shard_str{bs}))) { - sync_pipe.source_bs = bs; + SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) { } ~RGWRunBucketSyncCoroutine() override { if (lease_cr) { @@ -1075,6 +1076,7 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { string entry_marker; rgw_bucket_shard bs; + rgw_bucket_sync_pair_info sync_pair; int sync_status; @@ -1114,7 +1116,12 @@ public: marker_tracker->reset_need_retry(raw_key); } tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs})); - call(new RGWRunBucketSyncCoroutine(sc, bs, tn)); + + sync_pair = rgw_bucket_sync_pair_info(); + sync_pair.source_bs = bs; + sync_pair.dest_bs = bs; +#warning init pipe fields + call(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn)); } } while (marker_tracker && marker_tracker->need_retry(raw_key)); @@ -1768,7 +1775,7 @@ int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattabl RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) { auto sync_env = sc->env; - return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.source_bs.bucket, + return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.info.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info, key, std::nullopt, versioned_epoch, true, zones_trace, sync_env->counters, sync_env->dpp); @@ -1826,7 +1833,7 @@ int RGWArchiveSyncModule::create_instance(CephContext *cct, const JSONFormattabl RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) { auto sync_env = sc->env; - ldout(sc->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; + ldout(sc->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; if (!sync_pipe.dest_bucket_info.versioned() || (sync_pipe.dest_bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) { ldout(sc->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl; @@ -1849,7 +1856,7 @@ RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_buck } return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, - sync_pipe.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info, + sync_pipe.info.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info, key, dest_key, versioned_epoch, true, zones_trace, nullptr, sync_env->dpp); } @@ -1857,14 +1864,14 @@ RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_buck RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) { - ldout(sc->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; + ldout(sc->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) { - ldout(sc->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime + ldout(sc->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; auto sync_env = sc->env; return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, @@ -2111,14 +2118,14 @@ public: rgw_bucket_shard_sync_info& _status) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pipe(_sync_pipe), - sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pipe)), + sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pipe.info)), status(_status) {} int operate() override { reenter(this) { /* fetch current position in logs */ - yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pipe.source_bs, &info)); + yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pipe.info.source_bs, &info)); if (retcode < 0 && retcode != -ENOENT) { ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl; return set_cr_error(retcode); @@ -2160,6 +2167,7 @@ RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr() #warning FIXME rgw_bucket_sync_pipe sync_pipe; sync_pipe.source_bs = bs; + sync_pipe.dest_bs = bs; return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pipe, init_status); } @@ -2231,10 +2239,10 @@ class RGWReadBucketPipeSyncStatusCoroutine : public RGWCoroutine { map attrs; public: RGWReadBucketPipeSyncStatusCoroutine(RGWDataSyncCtx *_sc, - const rgw_bucket_sync_pipe& sync_pipe, + const rgw_bucket_sync_pair_info& sync_pair, rgw_bucket_shard_sync_info *_status) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pipe)), + oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)), status(_status) {} int operate() override; }; @@ -2435,10 +2443,11 @@ int RGWRemoteDataLog::read_shard_status(int shard_id, set& pending_bucke RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status) { + rgw_bucket_sync_pair_info sync_pair; + sync_pair.source_bs = bs; #warning FIXME - rgw_bucket_sync_pipe sync_pipe; - sync_pipe.source_bs = bs; - return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pipe, sync_status); + sync_pair.dest_bs = bs; + return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pair, sync_status); } RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store, const string& _source_zone, @@ -2802,7 +2811,7 @@ public: const T& _entry_marker, RGWSyncShardMarkerTrack *_marker_tracker, rgw_zone_set& _zones_trace, RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs), + sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs), key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch), owner(_owner), timestamp(_timestamp), op(_op), @@ -2937,7 +2946,7 @@ public: rgw_bucket_shard_sync_info& sync_info, RGWSyncTraceNodeRef tn_parent) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs), + sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs), lease_cr(lease_cr), sync_info(sync_info), marker_tracker(sc, status_oid, sync_info.full_marker), status_oid(status_oid), @@ -3089,7 +3098,7 @@ public: rgw_bucket_shard_sync_info& sync_info, RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs), + sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs), lease_cr(lease_cr), sync_info(sync_info), marker_tracker(sc, status_oid, sync_info.inc_marker), status_oid(status_oid), zone_id(sync_env->svc->zone->get_zone().id), @@ -3397,11 +3406,12 @@ struct rgw_bucket_sync_sources_local_info { }; WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info) -class RGWReadBucketSourcesInfoCR : public RGWCoroutine { +class RGWGetBucketSourcePeersCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; rgw_bucket bucket; - RGWBucketInfo bucket_info; + map > *sources; + RGWBucketInfo *pbucket_info; rgw_raw_obj sources_obj; @@ -3409,19 +3419,23 @@ class RGWReadBucketSourcesInfoCR : public RGWCoroutine { rgw_bucket_sync_sources_local_info expected_local_info; rgw_bucket_get_sync_policy_params get_policy_params; - std::shared_ptr get_policy_result; + std::shared_ptr policy; RGWSyncTraceNodeRef tn; public: - RGWReadBucketSourcesInfoCR(RGWDataSyncEnv *_sync_env, - const rgw_bucket& _bucket, - const RGWSyncTraceNodeRef& _tn_parent) + RGWGetBucketSourcePeersCR(RGWDataSyncEnv *_sync_env, + const rgw_bucket& _bucket, + map > *_sources, + RGWBucketInfo *_pbucket_info, + const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bucket(_bucket), + sources(_sources), + pbucket_info(_pbucket_info), sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket)), - get_policy_result(make_shared()), + policy(make_shared()), tn(sync_env->sync_tracer->add_node(_tn_parent, "read_bucket_sources", SSTR(bucket))) { } @@ -3429,7 +3443,7 @@ public: int operate() override; }; -int RGWReadBucketSourcesInfoCR::operate() +int RGWGetBucketSourcePeersCR::operate() { reenter(this) { yield call(new RGWSimpleRadosReadCR(sync_env->async_rados, @@ -3445,11 +3459,19 @@ int RGWReadBucketSourcesInfoCR::operate() yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados, sync_env->store, get_policy_params, - get_policy_result)); + policy)); if (retcode < 0 && retcode != -ENOENT) { return set_cr_error(retcode); } +#warning update local copy of sources and act on changes + + { + auto& handler = policy->policy_handler; + + *sources = handler->get_sources(); + *pbucket_info = handler->get_bucket_info(); + } return set_cr_done(); } @@ -3460,14 +3482,24 @@ int RGWReadBucketSourcesInfoCR::operate() class RGWRunBucketSourcesSyncCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; rgw_bucket bucket; - rgw_sync_source source; + RGWBucketInfo bucket_info; rgw_raw_obj sources_obj; + map > sources; + map >::iterator siter; + set::iterator piter; + + rgw_bucket_sync_pair_info sync_pair; + boost::intrusive_ptr lease_cr; boost::intrusive_ptr lease_stack; RGWSyncTraceNodeRef tn; + std::vector scs; + RGWDataSyncCtx *cur_sc{nullptr}; + + int ret{0}; public: RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env, @@ -3513,7 +3545,7 @@ int RGWRunBucketSourcesSyncCR::operate() } tn->log(10, "took lease"); - yield call(new RGWReadBucketSourcesInfoCR(sync_env, bucket, tn, &info)); + yield call(new RGWGetBucketSourcePeersCR(sync_env, bucket, &sources, &bucket_info, tn)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); lease_cr->go_down(); @@ -3521,15 +3553,40 @@ int RGWRunBucketSourcesSyncCR::operate() return set_cr_error(retcode); } - if (retcode == -ENOENT) { - rgw_bucket_sync_pipe sync_pipe; - sync_pipe.init_default(bs); - info.pipes.push_back(sync_pipe); - } + for (siter = sources.begin(); siter != sources.end(); ++siter) { + scs.emplace_back(); + cur_sc = &scs.back(); + { + auto& source_zone = siter->first; + auto conn = sync_env->svc->zone->get_zone_conn_by_id(source_zone); + if (!conn) { + ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << source_zone << " does not exist" << dendl; + continue; + } + cur_sc->init(sync_env, conn, siter->first); + } + for (piter = siter->second.begin(); piter != siter->second.end(); ++piter) { + if (!piter->is_rgw()) { + continue; + } - yield { - for (auto pipe : info.pipes) { - spawn(new RGWRunBucketSyncCoroutine(sc, pipe, &tn)); + sync_pair.source_bs.bucket = piter->bucket; + sync_pair.dest_bs.bucket = bucket_info.bucket; + + yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn)); + while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) { + set_status() << "num_spawned() > spawn_window"; + yield wait_for_child(); + bool again = true; + while (again) { + again = collect(&ret, nullptr); + if (ret < 0) { + tn->log(10, "a sync operation returned error"); + /* we have reported this error */ + } + /* not waiting for child here */ + } + } } } @@ -3541,6 +3598,65 @@ int RGWRunBucketSourcesSyncCR::operate() return 0; } +class RGWSyncGetBucketInfoCR : public RGWCoroutine { + RGWDataSyncEnv *sync_env; + rgw_bucket bucket; + RGWBucketInfo *pbucket_info; + RGWMetaSyncEnv meta_sync_env; + + RGWSyncTraceNodeRef tn; + +public: + RGWSyncGetBucketInfoCR(RGWDataSyncEnv *_sync_env, + const rgw_bucket& _bucket, + RGWBucketInfo *_pbucket_info, + const RGWSyncTraceNodeRef& _tn_parent) + : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + bucket(_bucket), + pbucket_info(_pbucket_info), + tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_info", + SSTR(bucket))) { + } + + int operate() override; +}; + +int RGWSyncGetBucketInfoCR::operate() +{ + reenter(this) { + yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, pbucket_info)); + if (retcode == -ENOENT) { + /* bucket instance info has not been synced in yet, fetch it now */ + yield { + tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata")); + string raw_key = string("bucket.instance:") + bucket.get_key(); + + meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->svc->zone->get_master_conn(), sync_env->async_rados, + sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer); + + call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key, + string() /* no marker */, + MDLOG_STATUS_COMPLETE, + NULL /* no marker tracker */, + tn)); + } + if (retcode < 0) { + tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bucket})); + return set_cr_error(retcode); + } + + yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, &pbucket_info)); + } + if (retcode < 0) { + tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bucket})); + return set_cr_error(retcode); + } + + return set_cr_done(); + } +} + int RGWRunBucketSyncCoroutine::operate() { reenter(this) { @@ -3566,7 +3682,7 @@ int RGWRunBucketSyncCoroutine::operate() } tn->log(10, "took lease"); - yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pipe, &sync_status)); + yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); lease_cr->go_down(); @@ -3574,33 +3690,17 @@ int RGWRunBucketSyncCoroutine::operate() return set_cr_error(retcode); } - tn->log(20, SSTR("sync status for bucket: " << sync_status.state)); - - yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, sync_pipe.source_bs.bucket, &sync_pipe.dest_bucket_info)); - if (retcode == -ENOENT) { - /* bucket instance info has not been synced in yet, fetch it now */ - yield { - tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata")); - string raw_key = string("bucket.instance:") + sync_pipe.source_bs.bucket.get_key(); - - meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->svc->zone->get_master_conn(), sync_env->async_rados, - sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer); - - call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key, - string() /* no marker */, - MDLOG_STATUS_COMPLETE, - NULL /* no marker tracker */, - tn)); - } - if (retcode < 0) { - tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{sync_pipe.source_bs.bucket})); - lease_cr->go_down(); - drain_all(); - return set_cr_error(retcode); - } + tn->log(20, SSTR("sync status for source bucket: " << sync_status.state)); - yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, sync_pipe.source_bs.bucket, &sync_pipe.dest_bucket_info)); + yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info, tn)); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.info.source_bs.bucket})); + lease_cr->go_down(); + drain_all(); + return set_cr_error(retcode); } + + yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info, tn)); if (retcode < 0) { tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.source_bs.bucket})); lease_cr->go_down(); @@ -3608,6 +3708,8 @@ int RGWRunBucketSyncCoroutine::operate() return set_cr_error(retcode); } + sync_pipe.info = sync_pair; + do { if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) { yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pipe, sync_status)); @@ -3680,6 +3782,7 @@ int RGWBucketPipeSyncStatusManager::init() return ret; } +#warning read specific bucket sources const string key = bucket.get_key(); @@ -3791,9 +3894,13 @@ std::ostream& RGWBucketPipeSyncStatusManager::gen_prefix(std::ostream& out) cons } string RGWBucketPipeSyncStatusManager::status_oid(const string& source_zone, - const rgw_bucket_sync_pipe& sync_pipe) + const rgw_bucket_sync_pair_info& sync_pair) { - return bucket_status_oid_prefix + "." + source_zone + ":" + sync_pipe.source_bs.get_key(); + if (sync_pair.source_bs == sync_pair.dest_bs) { + return bucket_status_oid_prefix + "." + source_zone + ":" + sync_pair.dest_bs.get_key(); + } else { + return bucket_status_oid_prefix + "." + source_zone + ":" + sync_pair.dest_bs.get_key() + ":" + sync_pair.source_bs.get_key(); + } } string RGWBucketPipeSyncStatusManager::obj_status_oid(const string& source_zone, diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 56cae033562..cf0ca89d0ac 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -18,15 +18,21 @@ class JSONObj; -struct rgw_bucket_sync_pipe { +struct rgw_bucket_sync_pair_info { rgw_bucket_shard source_bs; - RGWBucketInfo dest_bucket_info; + rgw_bucket_shard dest_bs; string source_prefix; string dest_prefix; }; -inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) { - if (p.source_bs.bucket == p.dest_bucket_info.bucket && +struct rgw_bucket_sync_pipe { + rgw_bucket_sync_pair_info info; + RGWBucketInfo source_bucket_info; + RGWBucketInfo dest_bucket_info; +}; + +inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) { + if (p.source_bs.bucket == p.dest_bs.bucket && p.source_prefix == p.dest_prefix) { return out << p.source_bs; } @@ -37,7 +43,7 @@ inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) { out << "/" << p.source_prefix; } - out << " -> " << p.dest_bucket_info.bucket; + out << " -> " << p.dest_bs.bucket; if (!p.dest_prefix.empty()) { out << "/" << p.dest_prefix; @@ -766,7 +772,7 @@ public: map& get_sync_status() { return sync_status; } int init_sync_status(); - static string status_oid(const string& source_zone, const rgw_bucket_sync_pipe& bs); + static string status_oid(const string& source_zone, const rgw_bucket_sync_pair_info& bs); static string obj_status_oid(const string& source_zone, const rgw_obj& obj); /* can be used by sync modules */ // implements DoutPrefixProvider