From f929ab342f80dd268785582a38073d13067f1487 Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Fri, 10 Sep 2021 11:38:05 -0400 Subject: [PATCH] rgw: RGWListBucketIndexesCR only needs zero shard We only need to check one shard, and everything has shard zero. Signed-off-by: Adam C. Emerson --- src/rgw/rgw_data_sync.cc | 207 ++++++++++++++++++++------------------- 1 file changed, 108 insertions(+), 99 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 172edef2a6f97..dd8bb6d762196 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -832,94 +832,131 @@ struct bucket_instance_meta_info { } }; -class RGWListBucketIndexesCR : public RGWCoroutine { +class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; + const string instance_key; - rgw::sal::RadosStore* store; + rgw_bucket_index_marker_info *info; + +public: + RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncCtx *_sc, + const rgw_bucket& bucket, + rgw_bucket_index_marker_info *_info) + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), + instance_key(bucket.get_key()), info(_info) {} + + int operate(const DoutPrefixProvider *dpp) override { + reenter(this) { + yield { + rgw_http_param_pair pairs[] = { { "type" , "bucket-index" }, + { "bucket-instance", instance_key.c_str() }, + { "info" , NULL }, + { NULL, NULL } }; + + string p = "/admin/log/"; + call(new RGWReadRESTResourceCR(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, info)); + } + if (retcode < 0) { + return set_cr_error(retcode); + } + + return set_cr_done(); + } + return 0; + } +}; + + +class RGWListBucketIndexesCR : public RGWCoroutine { + RGWDataSyncCtx *sc; + RGWDataSyncEnv *sync_env = sc->env; + + rgw::sal::RadosStore* store = sync_env->store; rgw_data_sync_status *sync_status; - int num_shards; - int req_ret; - int ret; + int req_ret = 0; + int ret = 0; list::iterator iter; - RGWShardedOmapCRManager *entries_index; - - string oid_prefix; + unique_ptr entries_index; + string oid_prefix = + datalog_sync_full_sync_index_prefix + "." + sc->source_zone.id; - string path; + string path = "/admin/metadata/bucket.instance"; bucket_instance_meta_info meta_info; string key; - string s; - int i; - bool failed; - bool truncated; + bool failed = false; + bool truncated = false; read_metadata_list result; public: - RGWListBucketIndexesCR(RGWDataSyncCtx *_sc, - rgw_data_sync_status *_sync_status) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - store(sync_env->store), sync_status(_sync_status), - req_ret(0), ret(0), entries_index(NULL), i(0), failed(false), truncated(false) { - oid_prefix = datalog_sync_full_sync_index_prefix + "." + sc->source_zone.id; - path = "/admin/metadata/bucket.instance"; - num_shards = sync_status->sync_info.num_shards; - } - ~RGWListBucketIndexesCR() override { - delete entries_index; - } + RGWListBucketIndexesCR(RGWDataSyncCtx* sc, + rgw_data_sync_status* sync_status) + : RGWCoroutine(sc->cct), sc(sc), sync_status(sync_status) {} + ~RGWListBucketIndexesCR() override { } int operate(const DoutPrefixProvider *dpp) override { reenter(this) { - entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards, - sync_env->svc->zone->get_zone_params().log_pool, - oid_prefix); + entries_index = std::make_unique( + sync_env->async_rados, store, this, + cct->_conf->rgw_data_log_num_shards, + sync_env->svc->zone->get_zone_params().log_pool, + oid_prefix); yield; // yield so OmapAppendCRs can start do { yield { - string entrypoint = string("/admin/metadata/bucket.instance"); + string entrypoint = "/admin/metadata/bucket.instance"s; rgw_http_param_pair pairs[] = {{"max-entries", "1000"}, {"marker", result.marker.c_str()}, {NULL, NULL}}; - call(new RGWReadRESTResourceCR(sync_env->cct, sc->conn, sync_env->http_manager, - entrypoint, pairs, &result)); - } - if (retcode < 0) { - ldpp_dout(dpp, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl; + call(new RGWReadRESTResourceCR( + sync_env->cct, sc->conn, sync_env->http_manager, + entrypoint, pairs, &result)); + } + if (retcode < 0) { + ldpp_dout(dpp, 0) + << "ERROR: failed to fetch metadata for section bucket.instance" + << dendl; return set_cr_error(retcode); } for (iter = result.keys.begin(); iter != result.keys.end(); ++iter) { - ldpp_dout(dpp, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl; + ldpp_dout(dpp, 20) << "list metadata: section=bucket.instance key=" + << *iter << dendl; key = *iter; yield { rgw_http_param_pair pairs[] = {{"key", key.c_str()}, {NULL, NULL}}; - call(new RGWReadRESTResourceCR(sync_env->cct, sc->conn, sync_env->http_manager, path, pairs, &meta_info)); + call(new RGWReadRESTResourceCR( + sync_env->cct, sc->conn, sync_env->http_manager, path, pairs, + &meta_info)); } - - num_shards = meta_info.data.get_bucket_info().layout.current_index.layout.normal.num_shards; - if (num_shards > 0) { - for (i = 0; i < num_shards; i++) { - char buf[16]; - snprintf(buf, sizeof(buf), ":%d", i); - s = key + buf; - yield entries_index->append(s, sync_env->svc->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i)); - } - } else { - yield entries_index->append(key, sync_env->svc->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1)); - } - } - truncated = result.truncated; + if (retcode < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to fetch metadata for key: " + << key << dendl; + return set_cr_error(retcode); + } + // Now that bucket full sync is bucket-wide instead of + // per-shard, we only need to register a single shard of + // each bucket to guarantee that sync will see everything + // that happened before data full sync starts. This also + // means we don't have to care about the bucket's current + // shard count. + yield entries_index->append( + fmt::format("{}:{}", key, 0), + sync_env->svc->datalog_rados->get_log_shard_id( + meta_info.data.get_bucket_info().bucket, 0)); + } + truncated = result.truncated; } while (truncated); yield { @@ -928,28 +965,35 @@ public: } } if (!failed) { - for (map::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) { + for (auto iter = sync_status->sync_markers.begin(); + iter != sync_status->sync_markers.end(); + ++iter) { int shard_id = (int)iter->first; rgw_data_sync_marker& marker = iter->second; marker.total_entries = entries_index->get_total_entries(shard_id); - spawn(new RGWSimpleRadosWriteCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, - rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)), - marker), - true); - } + spawn(new RGWSimpleRadosWriteCR( + dpp, sync_env->async_rados, sync_env->svc->sysobj, + rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, + RGWDataSyncStatusManager::shard_obj_name( + sc->source_zone, shard_id)), + marker), + true); + } } else { - yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data.init", "", - EIO, string("failed to build bucket instances map"))); + yield call(sync_env->error_logger->log_error_cr( + dpp, sc->conn->get_remote_id(), "data.init", "", + EIO, string("failed to build bucket instances map"))); } while (collect(&ret, NULL)) { - if (ret < 0) { - yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data.init", "", - -ret, string("failed to store sync status: ") + cpp_strerror(-ret))); - req_ret = ret; - } - yield; + if (ret < 0) { + yield call(sync_env->error_logger->log_error_cr( + dpp, sc->conn->get_remote_id(), "data.init", "", + -ret, string("failed to store sync status: ") + + cpp_strerror(-ret))); + req_ret = ret; + } + yield; } - drain_all(); if (req_ret < 0) { yield return set_cr_error(req_ret); @@ -2702,41 +2746,6 @@ string RGWDataSyncStatusManager::shard_obj_name(const rgw_zone_id& source_zone, return string(buf); } -class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine { - RGWDataSyncCtx *sc; - RGWDataSyncEnv *sync_env; - const string instance_key; - - rgw_bucket_index_marker_info *info; - -public: - RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncCtx *_sc, - const rgw_bucket& bucket, - rgw_bucket_index_marker_info *_info) - : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - instance_key(bucket.get_key()), info(_info) {} - - int operate(const DoutPrefixProvider *dpp) override { - reenter(this) { - yield { - rgw_http_param_pair pairs[] = { { "type" , "bucket-index" }, - { "bucket-instance", instance_key.c_str() }, - { "info" , NULL }, - { NULL, NULL } }; - - string p = "/admin/log/"; - call(new RGWReadRESTResourceCR(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, info)); - } - if (retcode < 0) { - return set_cr_error(retcode); - } - - return set_cr_done(); - } - return 0; - } -}; - class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; -- 2.39.5