From: Yehuda Sadeh Date: Thu, 31 Oct 2019 21:31:29 +0000 (-0700) Subject: rgw: data sync: trigger per shard sync by source and target X-Git-Tag: v15.1.0~22^2~75 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=a1a2bd3ce491bab800e87f3d317bb85e14e32354;p=ceph.git rgw: data sync: trigger per shard sync by source and target and tie to data log sync Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index e0a902725ebdb..04ea6e5869848 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1049,6 +1049,153 @@ public: int operate() override; }; +struct rgw_sync_pipe_info_entity +{ +private: + RGWBucketInfo bucket_info; + bool _has_bucket_info{false}; + +public: + string zone; + + rgw_sync_pipe_info_entity() {} + rgw_sync_pipe_info_entity(const rgw_sync_bucket_entity& e, + std::optional& binfo) { + if (e.zone) { + zone = *e.zone; + } + if (!e.bucket) { + return; + } + if (!binfo || + binfo->bucket != *e.bucket) { + bucket_info.bucket = *e.bucket; + } + set_bucket_info(*binfo); + } + + void update_empty_bucket_info(const std::map& buckets_info) { + if (_has_bucket_info) { + return; + } + if (bucket_info.bucket.name.empty()) { + return; + } + + auto iter = buckets_info.find(bucket_info.bucket); + if (iter == buckets_info.end()) { + return; + } + + set_bucket_info(iter->second); + } + + bool has_bucket_info() const { + return _has_bucket_info; + } + + void set_bucket_info(const RGWBucketInfo& _bucket_info) { + bucket_info = _bucket_info; + _has_bucket_info = true; + } + + const RGWBucketInfo& get_bucket_info() const { + return bucket_info; + } + + const rgw_bucket& get_bucket() const { + return bucket_info.bucket; + } + + bool operator<(const rgw_sync_pipe_info_entity& e) const { + if (zone < e.zone) { + return false; + } + if (zone > e.zone) { + return true; + } + return (bucket_info.bucket < e.bucket_info.bucket); + } +}; + +std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_info_entity& e) { + auto& bucket = e.get_bucket_info().bucket; + + out << e.zone << ":" << bucket.get_key(); + return out; +} + +struct rgw_sync_pipe_info { + rgw_sync_pipe_info_entity source; + rgw_sync_pipe_info_entity target; + + rgw_sync_pipe_info() {} + rgw_sync_pipe_info(const rgw_sync_bucket_pipe& pipe, + std::optional source_bucket_info, + std::optional target_bucket_info) : source(pipe.source, source_bucket_info), + target(pipe.dest, target_bucket_info) {} + + bool operator<(const rgw_sync_pipe_info& p) const { + if (source < p.source) { + return true; + } + if (p.source < source) { + return false; + } + return (target < p.target); + } + + void update_empty_bucket_info(const std::map& buckets_info) { + source.update_empty_bucket_info(buckets_info); + target.update_empty_bucket_info(buckets_info); + } +}; + +std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_info& p) { + out << p.source << ">" << p.target; + return out; +} + +struct rgw_sync_pipe_info_set { + std::set pipes; + + using iterator = std::set::iterator; + + void clear() { + pipes.clear(); + } + + void insert(const rgw_sync_bucket_pipe& pipe, + std::optional& source_bucket_info, + std::optional& target_bucket_info) { + rgw_sync_pipe_info p(pipe, source_bucket_info, target_bucket_info); + pipes.insert(p); + } + + iterator begin() { + return pipes.begin(); + } + + iterator end() { + return pipes.end(); + } + + void update_empty_bucket_info(const std::map& buckets_info) { + if (buckets_info.empty()) { + return; + } + + std::set p; + + for (auto pipe : pipes) { + pipe.update_empty_bucket_info(buckets_info); + p.insert(pipe); + } + + pipes = std::move(p); + } +}; + class RGWRunBucketsSyncBySourceCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; @@ -1069,6 +1216,55 @@ public: int operate() override; }; +class RGWRunBucketSourcesSyncCR : public RGWCoroutine { + RGWDataSyncCtx *sc; + RGWDataSyncEnv *sync_env; + + std::optional target_bs; + std::optional source_bs; + + std::optional target_bucket; + std::optional source_bucket; + + rgw_raw_obj sources_obj; + + rgw_sync_pipe_info_set pipes; + rgw_sync_pipe_info_set::iterator siter; + + rgw_bucket_sync_pair_info sync_pair; + + boost::intrusive_ptr lease_cr; + boost::intrusive_ptr lease_stack; + + RGWSyncTraceNodeRef tn; + std::vector scs; + RGWDataSyncCtx *cur_sc{nullptr}; + + RGWRESTConn *conn{nullptr}; + string last_zone; + + int ret{0}; + + int source_num_shards{0}; + int target_num_shards{0}; + + int num_shards{0}; + int cur_shard{0}; + +public: + RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, + std::optional _target_bs, + std::optional _source_bs, + const RGWSyncTraceNodeRef& _tn_parent); + ~RGWRunBucketSourcesSyncCR() override { + if (lease_cr) { + lease_cr->abort(); + } + } + + int operate() override; +}; + class RGWDataSyncSingleEntryCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; @@ -1076,7 +1272,7 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { string raw_key; string entry_marker; - rgw_bucket_shard bs; + rgw_bucket_shard source_bs; rgw_bucket_sync_pair_info sync_pair; int sync_status; @@ -1109,20 +1305,19 @@ public: do { yield { int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key, - &bs.bucket, &bs.shard_id); + &source_bs.bucket, &source_bs.shard_id); if (ret < 0) { return set_cr_error(-EIO); } if (marker_tracker) { marker_tracker->reset_need_retry(raw_key); } - tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs})); + tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{source_bs})); - sync_pair = rgw_bucket_sync_pair_info(); - sync_pair.source_bs = bs; - sync_pair.dest_bs = bs; -#warning init pipe fields - call(new RGWRunBucketsSyncBySourceCR(sc, bs, tn)); + call(new RGWRunBucketSourcesSyncCR(sc, + std::nullopt, /* target_bs */ + source_bs, + tn)); } } while (marker_tracker && marker_tracker->need_retry(raw_key)); @@ -3422,141 +3617,6 @@ struct rgw_bucket_sync_sources_local_info { WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info) -struct rgw_sync_pipe_info_entity -{ -private: - RGWBucketInfo bucket_info; - bool _has_bucket_info{false}; - -public: - string zone; - - rgw_sync_pipe_info_entity() {} - rgw_sync_pipe_info_entity(const rgw_sync_bucket_entity& e, - std::optional& binfo) { - if (e.zone) { - zone = *e.zone; - } - if (!e.bucket) { - return; - } - if (!binfo || - binfo->bucket != *e.bucket) { - bucket_info.bucket = *e.bucket; - } - set_bucket_info(*binfo); - } - - void update_empty_bucket_info(const std::map& buckets_info) { - if (_has_bucket_info) { - return; - } - if (bucket_info.bucket.name.empty()) { - return; - } - - auto iter = buckets_info.find(bucket_info.bucket); - if (iter == buckets_info.end()) { - return; - } - - set_bucket_info(iter->second); - } - - bool has_bucket_info() const { - return _has_bucket_info; - } - - void set_bucket_info(const RGWBucketInfo& _bucket_info) { - bucket_info = _bucket_info; - _has_bucket_info = true; - } - - const RGWBucketInfo& get_bucket_info() const { - return bucket_info; - } - - const rgw_bucket& get_bucket() const { - return bucket_info.bucket; - } - - bool operator<(const rgw_sync_pipe_info_entity& e) const { - if (zone < e.zone) { - return false; - } - if (zone > e.zone) { - return true; - } - return (bucket_info.bucket < e.bucket_info.bucket); - } -}; - -struct rgw_sync_pipe_info { - rgw_sync_pipe_info_entity source; - rgw_sync_pipe_info_entity target; - - rgw_sync_pipe_info() {} - rgw_sync_pipe_info(const rgw_sync_bucket_pipe& pipe, - std::optional source_bucket_info, - std::optional target_bucket_info) : source(pipe.source, source_bucket_info), - target(pipe.dest, target_bucket_info) {} - - bool operator<(const rgw_sync_pipe_info& p) const { - if (source < p.source) { - return true; - } - if (p.source < source) { - return false; - } - return (target < p.target); - } - - void update_empty_bucket_info(const std::map& buckets_info) { - source.update_empty_bucket_info(buckets_info); - target.update_empty_bucket_info(buckets_info); - } -}; - -struct rgw_sync_pipe_info_set { - std::set pipes; - - using iterator = std::set::iterator; - - void clear() { - pipes.clear(); - } - - void insert(const rgw_sync_bucket_pipe& pipe, - std::optional& source_bucket_info, - std::optional& target_bucket_info) { - rgw_sync_pipe_info p(pipe, source_bucket_info, target_bucket_info); - pipes.insert(p); - } - - iterator begin() { - return pipes.begin(); - } - - iterator end() { - return pipes.end(); - } - - void update_empty_bucket_info(const std::map& buckets_info) { - if (buckets_info.empty()) { - return; - } - - std::set p; - - for (auto pipe : pipes) { - pipe.update_empty_bucket_info(buckets_info); - p.insert(pipe); - } - - pipes = std::move(p); - } -}; - class RGWGetBucketPeersCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; @@ -3661,64 +3721,34 @@ public: int operate() override; }; -class RGWRunBucketSourcesSyncCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - - std::optional target_bs; - std::optional source_zone; - std::optional source_bs; - - std::optional target_bucket; - std::optional source_bucket; - - rgw_raw_obj sources_obj; - - rgw_sync_pipe_info_set pipes; - rgw_sync_pipe_info_set::iterator siter; - - rgw_bucket_sync_pair_info sync_pair; - - boost::intrusive_ptr lease_cr; - boost::intrusive_ptr lease_stack; - - RGWSyncTraceNodeRef tn; - std::vector scs; - RGWDataSyncCtx *cur_sc{nullptr}; - - RGWRESTConn *conn{nullptr}; - string last_zone; - - int ret{0}; +std::ostream& operator<<(std::ostream& out, std::optional& bs) { + if (!bs) { + out << "*"; + } else { + out << *bs; + } + return out; +} -public: - RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env, - std::optional _target_bs, - std::optional _source_zone, - std::optional _source_bs, - const RGWSyncTraceNodeRef& _tn_parent) - : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), +RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, + std::optional _target_bs, + std::optional _source_bs, + const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->env->cct), + sc(_sc), + sync_env(_sc->env), target_bs(_target_bs), - source_zone(_source_zone), source_bs(_source_bs), tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources", - SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << source_zone.value_or("*")))) { - if (target_bs) { - target_bucket = target_bs->bucket; - sources_obj = RGWBucketSyncPeersManager::sync_sources_obj(_sync_env->svc->zone, *target_bucket); - } - if (source_bs) { - source_bucket = source_bs->bucket; - } + SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone))) +{ + if (target_bs) { + target_bucket = target_bs->bucket; + sources_obj = RGWBucketSyncPeersManager::sync_sources_obj(sync_env->svc->zone, *target_bucket); } - ~RGWRunBucketSourcesSyncCR() override { - if (lease_cr) { - lease_cr->abort(); - } + if (source_bs) { + source_bucket = source_bs->bucket; } - - int operate() override; -}; +} int RGWRunBucketSourcesSyncCR::operate() { @@ -3744,47 +3774,60 @@ int RGWRunBucketSourcesSyncCR::operate() } tn->log(10, "took lease"); - yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, source_zone, source_bucket, &pipes, tn)); + yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); lease_cr->go_down(); drain_all(); return set_cr_error(retcode); } + ldpp_dout(sync_env->dpp, 20) << __func__ << "(): requested source_bs=" << source_bs << " target_bs=" << target_bs << dendl; for (siter = pipes.begin(); siter != pipes.end(); ++siter) { - scs.emplace_back(); - cur_sc = &scs.back(); - { - auto& szone = siter->source.zone; - if (last_zone != szone) { - conn = sync_env->svc->zone->get_zone_conn_by_id(szone); - if (!conn) { - ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << szone << " does not exist" << dendl; - continue; - } - last_zone = szone; + ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl; + + source_num_shards = siter->source.get_bucket_info().num_shards; + target_num_shards = siter->target.get_bucket_info().num_shards; + if (source_bs) { + sync_pair.source_bs = *source_bs; + } else { + sync_pair.source_bs.bucket = siter->source.get_bucket(); + } + + if (sync_pair.source_bs.shard_id >= 0) { + num_shards = 1; + cur_shard = sync_pair.source_bs.shard_id; + } else { + num_shards = std::max(1, source_num_shards); + cur_shard = std::min(0, source_num_shards); } - cur_sc->init(sync_env, conn, szone); } - sync_pair.source_bs.bucket = siter->source.get_bucket(); - sync_pair.dest_bs.bucket = siter->target.get_bucket(); -#warning TODO iterate over shards - - yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false); - while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) { - set_status() << "num_spawned() > spawn_window"; - yield wait_for_child(); - bool again = true; - while (again) { - again = collect(&ret, nullptr); - if (ret < 0) { - tn->log(10, "a sync operation returned error"); - /* we have reported this error */ + ldpp_dout(sync_env->dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl; + for (; num_shards > 0; --num_shards, ++cur_shard) { + sync_pair.source_bs.shard_id = cur_shard; + if (source_num_shards == target_num_shards) { + sync_pair.dest_bs.shard_id = cur_shard; + } else { + sync_pair.dest_bs.shard_id = -1; + } + + ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl; + + yield spawn(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn), false); + while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) { + set_status() << "num_spawned() > spawn_window"; + yield wait_for_child(); + bool again = true; + while (again) { + again = collect(&ret, nullptr); + if (ret < 0) { + tn->log(10, "a sync operation returned error"); + /* we have reported this error */ + } + /* not waiting for child here */ } - /* not waiting for child here */ } } } diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 483a3b2178158..9d223ebb2e13a 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -38,7 +38,7 @@ inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) { out << "/" << p.source_prefix; } - out << " -> " << p.dest_bs.bucket; + out << "->" << p.dest_bs.bucket; if (!p.dest_prefix.empty()) { out << "/" << p.dest_prefix;