int operate() override;
};
+struct rgw_sync_pipe_info_entity
+{
+private:
+ RGWBucketInfo bucket_info;
+ bool _has_bucket_info{false};
+
+public:
+ string zone;
+
+ rgw_sync_pipe_info_entity() {}
+ rgw_sync_pipe_info_entity(const rgw_sync_bucket_entity& e,
+ std::optional<RGWBucketInfo>& binfo) {
+ if (e.zone) {
+ zone = *e.zone;
+ }
+ if (!e.bucket) {
+ return;
+ }
+ if (!binfo ||
+ binfo->bucket != *e.bucket) {
+ bucket_info.bucket = *e.bucket;
+ }
+ set_bucket_info(*binfo);
+ }
+
+ void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
+ if (_has_bucket_info) {
+ return;
+ }
+ if (bucket_info.bucket.name.empty()) {
+ return;
+ }
+
+ auto iter = buckets_info.find(bucket_info.bucket);
+ if (iter == buckets_info.end()) {
+ return;
+ }
+
+ set_bucket_info(iter->second);
+ }
+
+ bool has_bucket_info() const {
+ return _has_bucket_info;
+ }
+
+ void set_bucket_info(const RGWBucketInfo& _bucket_info) {
+ bucket_info = _bucket_info;
+ _has_bucket_info = true;
+ }
+
+ const RGWBucketInfo& get_bucket_info() const {
+ return bucket_info;
+ }
+
+ const rgw_bucket& get_bucket() const {
+ return bucket_info.bucket;
+ }
+
+ bool operator<(const rgw_sync_pipe_info_entity& e) const {
+ if (zone < e.zone) {
+ return false;
+ }
+ if (zone > e.zone) {
+ return true;
+ }
+ return (bucket_info.bucket < e.bucket_info.bucket);
+ }
+};
+
+std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_info_entity& e) {
+ auto& bucket = e.get_bucket_info().bucket;
+
+ out << e.zone << ":" << bucket.get_key();
+ return out;
+}
+
+struct rgw_sync_pipe_info {
+ rgw_sync_pipe_info_entity source;
+ rgw_sync_pipe_info_entity target;
+
+ rgw_sync_pipe_info() {}
+ rgw_sync_pipe_info(const rgw_sync_bucket_pipe& pipe,
+ std::optional<RGWBucketInfo> source_bucket_info,
+ std::optional<RGWBucketInfo> target_bucket_info) : source(pipe.source, source_bucket_info),
+ target(pipe.dest, target_bucket_info) {}
+
+ bool operator<(const rgw_sync_pipe_info& p) const {
+ if (source < p.source) {
+ return true;
+ }
+ if (p.source < source) {
+ return false;
+ }
+ return (target < p.target);
+ }
+
+ void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
+ source.update_empty_bucket_info(buckets_info);
+ target.update_empty_bucket_info(buckets_info);
+ }
+};
+
+std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_info& p) {
+ out << p.source << ">" << p.target;
+ return out;
+}
+
+struct rgw_sync_pipe_info_set {
+ std::set<rgw_sync_pipe_info> pipes;
+
+ using iterator = std::set<rgw_sync_pipe_info>::iterator;
+
+ void clear() {
+ pipes.clear();
+ }
+
+ void insert(const rgw_sync_bucket_pipe& pipe,
+ std::optional<RGWBucketInfo>& source_bucket_info,
+ std::optional<RGWBucketInfo>& target_bucket_info) {
+ rgw_sync_pipe_info p(pipe, source_bucket_info, target_bucket_info);
+ pipes.insert(p);
+ }
+
+ iterator begin() {
+ return pipes.begin();
+ }
+
+ iterator end() {
+ return pipes.end();
+ }
+
+ void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
+ if (buckets_info.empty()) {
+ return;
+ }
+
+ std::set<rgw_sync_pipe_info> p;
+
+ for (auto pipe : pipes) {
+ pipe.update_empty_bucket_info(buckets_info);
+ p.insert(pipe);
+ }
+
+ pipes = std::move(p);
+ }
+};
+
class RGWRunBucketsSyncBySourceCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
int operate() override;
};
+class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ RGWDataSyncEnv *sync_env;
+
+ std::optional<rgw_bucket_shard> target_bs;
+ std::optional<rgw_bucket_shard> source_bs;
+
+ std::optional<rgw_bucket> target_bucket;
+ std::optional<rgw_bucket> source_bucket;
+
+ rgw_raw_obj sources_obj;
+
+ rgw_sync_pipe_info_set pipes;
+ rgw_sync_pipe_info_set::iterator siter;
+
+ rgw_bucket_sync_pair_info sync_pair;
+
+ boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
+ boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
+
+ RGWSyncTraceNodeRef tn;
+ std::vector<RGWDataSyncCtx> scs;
+ RGWDataSyncCtx *cur_sc{nullptr};
+
+ RGWRESTConn *conn{nullptr};
+ string last_zone;
+
+ int ret{0};
+
+ int source_num_shards{0};
+ int target_num_shards{0};
+
+ int num_shards{0};
+ int cur_shard{0};
+
+public:
+ RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
+ std::optional<rgw_bucket_shard> _target_bs,
+ std::optional<rgw_bucket_shard> _source_bs,
+ const RGWSyncTraceNodeRef& _tn_parent);
+ ~RGWRunBucketSourcesSyncCR() override {
+ if (lease_cr) {
+ lease_cr->abort();
+ }
+ }
+
+ int operate() override;
+};
+
class RGWDataSyncSingleEntryCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
string raw_key;
string entry_marker;
- rgw_bucket_shard bs;
+ rgw_bucket_shard source_bs;
rgw_bucket_sync_pair_info sync_pair;
int sync_status;
do {
yield {
int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
- &bs.bucket, &bs.shard_id);
+ &source_bs.bucket, &source_bs.shard_id);
if (ret < 0) {
return set_cr_error(-EIO);
}
if (marker_tracker) {
marker_tracker->reset_need_retry(raw_key);
}
- tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs}));
+ tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{source_bs}));
- sync_pair = rgw_bucket_sync_pair_info();
- sync_pair.source_bs = bs;
- sync_pair.dest_bs = bs;
-#warning init pipe fields
- call(new RGWRunBucketsSyncBySourceCR(sc, bs, tn));
+ call(new RGWRunBucketSourcesSyncCR(sc,
+ std::nullopt, /* target_bs */
+ source_bs,
+ tn));
}
} while (marker_tracker && marker_tracker->need_retry(raw_key));
WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info)
-struct rgw_sync_pipe_info_entity
-{
-private:
- RGWBucketInfo bucket_info;
- bool _has_bucket_info{false};
-
-public:
- string zone;
-
- rgw_sync_pipe_info_entity() {}
- rgw_sync_pipe_info_entity(const rgw_sync_bucket_entity& e,
- std::optional<RGWBucketInfo>& binfo) {
- if (e.zone) {
- zone = *e.zone;
- }
- if (!e.bucket) {
- return;
- }
- if (!binfo ||
- binfo->bucket != *e.bucket) {
- bucket_info.bucket = *e.bucket;
- }
- set_bucket_info(*binfo);
- }
-
- void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
- if (_has_bucket_info) {
- return;
- }
- if (bucket_info.bucket.name.empty()) {
- return;
- }
-
- auto iter = buckets_info.find(bucket_info.bucket);
- if (iter == buckets_info.end()) {
- return;
- }
-
- set_bucket_info(iter->second);
- }
-
- bool has_bucket_info() const {
- return _has_bucket_info;
- }
-
- void set_bucket_info(const RGWBucketInfo& _bucket_info) {
- bucket_info = _bucket_info;
- _has_bucket_info = true;
- }
-
- const RGWBucketInfo& get_bucket_info() const {
- return bucket_info;
- }
-
- const rgw_bucket& get_bucket() const {
- return bucket_info.bucket;
- }
-
- bool operator<(const rgw_sync_pipe_info_entity& e) const {
- if (zone < e.zone) {
- return false;
- }
- if (zone > e.zone) {
- return true;
- }
- return (bucket_info.bucket < e.bucket_info.bucket);
- }
-};
-
-struct rgw_sync_pipe_info {
- rgw_sync_pipe_info_entity source;
- rgw_sync_pipe_info_entity target;
-
- rgw_sync_pipe_info() {}
- rgw_sync_pipe_info(const rgw_sync_bucket_pipe& pipe,
- std::optional<RGWBucketInfo> source_bucket_info,
- std::optional<RGWBucketInfo> target_bucket_info) : source(pipe.source, source_bucket_info),
- target(pipe.dest, target_bucket_info) {}
-
- bool operator<(const rgw_sync_pipe_info& p) const {
- if (source < p.source) {
- return true;
- }
- if (p.source < source) {
- return false;
- }
- return (target < p.target);
- }
-
- void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
- source.update_empty_bucket_info(buckets_info);
- target.update_empty_bucket_info(buckets_info);
- }
-};
-
-struct rgw_sync_pipe_info_set {
- std::set<rgw_sync_pipe_info> pipes;
-
- using iterator = std::set<rgw_sync_pipe_info>::iterator;
-
- void clear() {
- pipes.clear();
- }
-
- void insert(const rgw_sync_bucket_pipe& pipe,
- std::optional<RGWBucketInfo>& source_bucket_info,
- std::optional<RGWBucketInfo>& target_bucket_info) {
- rgw_sync_pipe_info p(pipe, source_bucket_info, target_bucket_info);
- pipes.insert(p);
- }
-
- iterator begin() {
- return pipes.begin();
- }
-
- iterator end() {
- return pipes.end();
- }
-
- void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
- if (buckets_info.empty()) {
- return;
- }
-
- std::set<rgw_sync_pipe_info> p;
-
- for (auto pipe : pipes) {
- pipe.update_empty_bucket_info(buckets_info);
- p.insert(pipe);
- }
-
- pipes = std::move(p);
- }
-};
-
class RGWGetBucketPeersCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
int operate() override;
};
-class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
- RGWDataSyncEnv *sync_env;
-
- std::optional<rgw_bucket_shard> target_bs;
- std::optional<std::string> source_zone;
- std::optional<rgw_bucket_shard> source_bs;
-
- std::optional<rgw_bucket> target_bucket;
- std::optional<rgw_bucket> source_bucket;
-
- rgw_raw_obj sources_obj;
-
- rgw_sync_pipe_info_set pipes;
- rgw_sync_pipe_info_set::iterator siter;
-
- rgw_bucket_sync_pair_info sync_pair;
-
- boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
- boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
-
- RGWSyncTraceNodeRef tn;
- std::vector<RGWDataSyncCtx> scs;
- RGWDataSyncCtx *cur_sc{nullptr};
-
- RGWRESTConn *conn{nullptr};
- string last_zone;
-
- int ret{0};
+std::ostream& operator<<(std::ostream& out, std::optional<rgw_bucket_shard>& bs) {
+ if (!bs) {
+ out << "*";
+ } else {
+ out << *bs;
+ }
+ return out;
+}
-public:
- RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env,
- std::optional<rgw_bucket_shard> _target_bs,
- std::optional<string> _source_zone,
- std::optional<rgw_bucket_shard> _source_bs,
- const RGWSyncTraceNodeRef& _tn_parent)
- : RGWCoroutine(_sync_env->cct),
- sync_env(_sync_env),
+RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
+ std::optional<rgw_bucket_shard> _target_bs,
+ std::optional<rgw_bucket_shard> _source_bs,
+ const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->env->cct),
+ sc(_sc),
+ sync_env(_sc->env),
target_bs(_target_bs),
- source_zone(_source_zone),
source_bs(_source_bs),
tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
- SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << source_zone.value_or("*")))) {
- if (target_bs) {
- target_bucket = target_bs->bucket;
- sources_obj = RGWBucketSyncPeersManager::sync_sources_obj(_sync_env->svc->zone, *target_bucket);
- }
- if (source_bs) {
- source_bucket = source_bs->bucket;
- }
+ SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone)))
+{
+ if (target_bs) {
+ target_bucket = target_bs->bucket;
+ sources_obj = RGWBucketSyncPeersManager::sync_sources_obj(sync_env->svc->zone, *target_bucket);
}
- ~RGWRunBucketSourcesSyncCR() override {
- if (lease_cr) {
- lease_cr->abort();
- }
+ if (source_bs) {
+ source_bucket = source_bs->bucket;
}
-
- int operate() override;
-};
+}
int RGWRunBucketSourcesSyncCR::operate()
{
}
tn->log(10, "took lease");
- yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, source_zone, source_bucket, &pipes, tn));
+ yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, "ERROR: failed to read sync status for bucket");
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
+ ldpp_dout(sync_env->dpp, 20) << __func__ << "(): requested source_bs=" << source_bs << " target_bs=" << target_bs << dendl;
for (siter = pipes.begin(); siter != pipes.end(); ++siter) {
- scs.emplace_back();
- cur_sc = &scs.back();
-
{
- auto& szone = siter->source.zone;
- if (last_zone != szone) {
- conn = sync_env->svc->zone->get_zone_conn_by_id(szone);
- if (!conn) {
- ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << szone << " does not exist" << dendl;
- continue;
- }
- last_zone = szone;
+ ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;
+
+ source_num_shards = siter->source.get_bucket_info().num_shards;
+ target_num_shards = siter->target.get_bucket_info().num_shards;
+ if (source_bs) {
+ sync_pair.source_bs = *source_bs;
+ } else {
+ sync_pair.source_bs.bucket = siter->source.get_bucket();
+ }
+
+ if (sync_pair.source_bs.shard_id >= 0) {
+ num_shards = 1;
+ cur_shard = sync_pair.source_bs.shard_id;
+ } else {
+ num_shards = std::max<int>(1, source_num_shards);
+ cur_shard = std::min<int>(0, source_num_shards);
}
- cur_sc->init(sync_env, conn, szone);
}
- sync_pair.source_bs.bucket = siter->source.get_bucket();
- sync_pair.dest_bs.bucket = siter->target.get_bucket();
-#warning TODO iterate over shards
-
- yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false);
- while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
- set_status() << "num_spawned() > spawn_window";
- yield wait_for_child();
- bool again = true;
- while (again) {
- again = collect(&ret, nullptr);
- if (ret < 0) {
- tn->log(10, "a sync operation returned error");
- /* we have reported this error */
+ ldpp_dout(sync_env->dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl;
+ for (; num_shards > 0; --num_shards, ++cur_shard) {
+ sync_pair.source_bs.shard_id = cur_shard;
+ if (source_num_shards == target_num_shards) {
+ sync_pair.dest_bs.shard_id = cur_shard;
+ } else {
+ sync_pair.dest_bs.shard_id = -1;
+ }
+
+ ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
+
+ yield spawn(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn), false);
+ while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
+ set_status() << "num_spawned() > spawn_window";
+ yield wait_for_child();
+ bool again = true;
+ while (again) {
+ again = collect(&ret, nullptr);
+ if (ret < 0) {
+ tn->log(10, "a sync operation returned error");
+ /* we have reported this error */
+ }
+ /* not waiting for child here */
}
- /* not waiting for child here */
}
}
}