From: Casey Bodley Date: Tue, 31 Mar 2020 13:22:58 +0000 (-0400) Subject: rgw: parse bucket-shard before DataSyncSingleEntry X-Git-Tag: v16.1.0~2586^2~19 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5b97f04d37a931df6399be9e4c0639d6623041d7;p=ceph.git rgw: parse bucket-shard before DataSyncSingleEntry it's easier for DataSyncShard to handle parsing failures before calling MarkerTrack::start() and DataSyncSingleEntry Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 1045e01f2422..a55836528639 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1283,12 +1283,11 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; + rgw_bucket_shard source_bs; string raw_key; string entry_marker; ceph::real_time entry_timestamp; - rgw_bucket_shard source_bs; - int sync_status; bufferlist md_bl; @@ -1300,15 +1299,16 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: - RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, + RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& source_bs, const string& _raw_key, const string& _entry_marker, ceph::real_time entry_timestamp, RGWDataSyncShardMarkerTrack *_marker_tracker, - RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct), - sc(_sc), sync_env(_sc->env), - raw_key(_raw_key), entry_marker(_entry_marker), - entry_timestamp(entry_timestamp), sync_status(0), - marker_tracker(_marker_tracker), - error_repo(_error_repo), remove_from_repo(_remove_from_repo) { + RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent) + : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), source_bs(source_bs), + raw_key(_raw_key), entry_marker(_entry_marker), + entry_timestamp(entry_timestamp), sync_status(0), + marker_tracker(_marker_tracker), + error_repo(_error_repo), remove_from_repo(_remove_from_repo) { set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker; tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key); } @@ -1316,22 +1316,15 @@ public: int operate() override { reenter(this) { do { - yield { - int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key, - &source_bs.bucket, &source_bs.shard_id); - if (ret < 0) { - return set_cr_error(-EIO); - } - if (marker_tracker) { - marker_tracker->reset_need_retry(raw_key); - } - tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{source_bs})); - - call(new RGWRunBucketSourcesSyncCR(sc, - std::nullopt, /* target_bs */ - source_bs, - tn)); + if (marker_tracker) { + marker_tracker->reset_need_retry(raw_key); } + tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{source_bs})); + + yield call(new RGWRunBucketSourcesSyncCR(sc, + std::nullopt, /* target_bs */ + source_bs, + tn)); } while (marker_tracker && marker_tracker->need_retry(raw_key)); sync_status = retcode; @@ -1448,6 +1441,13 @@ class RGWDataSyncShardCR : public RGWCoroutine { uint32_t retry_backoff_secs; RGWSyncTraceNodeRef tn; + + rgw_bucket_shard source_bs; + + int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const { + return rgw_bucket_parse_bucket_key(sync_env->cct, key, + &bs.bucket, &bs.shard_id); + } public: RGWDataSyncShardCR(RGWDataSyncCtx *_sc, rgw_pool& _pool, @@ -1572,13 +1572,19 @@ public: tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync")); iter = entries.begin(); for (; iter != entries.end(); ++iter) { + retcode = parse_bucket_key(iter->first, source_bs); + if (retcode < 0) { + tn->log(1, SSTR("failed to parse bucket shard: " << iter->first)); + marker_tracker->try_update_high_marker(iter->first, 0, entry_timestamp); + continue; + } tn->log(20, SSTR("full sync: " << iter->first)); total_entries++; if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) { tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?")); } else { // fetch remote and write locally - yield spawn(new RGWDataSyncSingleEntryCR(sc, iter->first, iter->first, entry_timestamp, marker_tracker, error_repo, false, tn), false); + yield spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, iter->first, iter->first, entry_timestamp, marker_tracker, error_repo, false, tn), false); } sync_marker.marker = iter->first; @@ -1662,10 +1668,13 @@ public: } /* process out of band updates */ for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) { - yield { - tn->log(20, SSTR("received async update notification: " << *modified_iter)); - spawn(new RGWDataSyncSingleEntryCR(sc, *modified_iter, string(), ceph::real_time{}, marker_tracker, nullptr, false, tn), false); + retcode = parse_bucket_key(*modified_iter, source_bs); + if (retcode < 0) { + tn->log(1, SSTR("failed to parse bucket shard: " << *modified_iter)); + continue; } + tn->log(20, SSTR("received async update notification: " << *modified_iter)); + spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, *modified_iter, string(), ceph::real_time{}, marker_tracker, nullptr, false, tn), false); } if (error_retry_time <= ceph::coarse_real_clock::now()) { @@ -1679,8 +1688,15 @@ public: for (; iter != error_entries.end(); ++iter) { error_marker = iter->first; entry_timestamp = rgw_error_repo_decode_value(iter->second); + retcode = parse_bucket_key(error_marker, source_bs); + if (retcode < 0) { + tn->log(1, SSTR("failed to parse bucket shard: " << error_marker)); + spawn(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, rgw_raw_obj{pool, error_oid}, + error_marker, entry_timestamp), false); + continue; + } tn->log(20, SSTR("handle error entry key=" << error_marker << " timestamp=" << entry_timestamp)); - spawn(new RGWDataSyncSingleEntryCR(sc, error_marker, error_marker, + spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, error_marker, error_marker, entry_timestamp, nullptr /* no marker tracker */, error_repo, true, tn), false); } @@ -1716,6 +1732,12 @@ public: for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) { tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key)); + retcode = parse_bucket_key(log_iter->entry.key, source_bs); + if (retcode < 0) { + tn->log(1, SSTR("failed to parse bucket shard: " << log_iter->entry.key)); + marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp); + continue; + } if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) { tn->log(20, SSTR("skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard")); marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp); @@ -1724,7 +1746,7 @@ public: if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) { tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?")); } else { - spawn(new RGWDataSyncSingleEntryCR(sc, log_iter->entry.key, log_iter->log_id, + spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, log_iter->entry.key, log_iter->log_id, log_iter->log_timestamp, marker_tracker, error_repo, false, tn), false); }